Skip to content

Commit

Permalink
write database entries async in bulk
Browse files Browse the repository at this point in the history
  • Loading branch information
0xERR0R committed Sep 13, 2021
1 parent 26f42e9 commit e6ca896
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 18 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ serve_docs: ## serves online docs
mkdocs serve

build: ## Build binary
go install github.com/abice/go-enum
go get -d github.com/abice/go-enum
go generate ./...
go build -v -ldflags="-w -s -X github.com/0xERR0R/blocky/util.Version=${VERSION} -X github.com/0xERR0R/blocky/util.BuildTime=${BUILD_TIME}" -o $(BIN_OUT_DIR)/$(BINARY_NAME)$(BINARY_SUFFIX)

Expand Down
69 changes: 59 additions & 10 deletions querylog/database_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package querylog

import (
"strings"
"sync"
"time"

"gorm.io/gorm/logger"

"github.com/0xERR0R/blocky/log"

"github.com/0xERR0R/blocky/util"
Expand All @@ -29,15 +32,27 @@ type logEntry struct {

type DatabaseWriter struct {
db *gorm.DB
logRetentionDays int
logRetentionDays uint64
pendingEntries []*logEntry
lock sync.RWMutex
dbFlushPeriod time.Duration
}

func NewDatabaseWriter(target string, logRetentionDays uint64) *DatabaseWriter {
return newDatabaseWriter(mysql.Open(target), logRetentionDays)
func NewDatabaseWriter(target string, logRetentionDays uint64, dbFlushPeriod time.Duration) *DatabaseWriter {
return newDatabaseWriter(mysql.Open(target), logRetentionDays, dbFlushPeriod)
}

func newDatabaseWriter(target gorm.Dialector, logRetentionDays uint64) *DatabaseWriter {
db, err := gorm.Open(target, &gorm.Config{})
func newDatabaseWriter(target gorm.Dialector, logRetentionDays uint64, dbFlushPeriod time.Duration) *DatabaseWriter {
db, err := gorm.Open(target, &gorm.Config{
Logger: logger.New(
log.Log(),
logger.Config{
SlowThreshold: time.Minute,
LogLevel: logger.Warn,
IgnoreRecordNotFoundError: false,
Colorful: false,
}),
})

if err != nil {
util.FatalOnError("can't create database connection", err)
Expand All @@ -47,16 +62,31 @@ func newDatabaseWriter(target gorm.Dialector, logRetentionDays uint64) *Database
// Migrate the schema
util.FatalOnError("can't perform auto migration", db.AutoMigrate(&logEntry{}))

return &DatabaseWriter{
w := &DatabaseWriter{
db: db,
logRetentionDays: int(logRetentionDays)}
logRetentionDays: logRetentionDays,
dbFlushPeriod: dbFlushPeriod}

go w.periodicFlush()

return w
}

func (d *DatabaseWriter) periodicFlush() {
ticker := time.NewTicker(d.dbFlushPeriod)
defer ticker.Stop()

for {
<-ticker.C
d.doDBWrite()
}
}

func (d *DatabaseWriter) Write(entry *Entry) {
domain := util.ExtractDomain(entry.Request.Req.Question[0])
eTLD, _ := publicsuffix.EffectiveTLDPlusOne(domain)

d.db.Create(&logEntry{
e := &logEntry{
RequestTS: &entry.Start,
ClientIP: entry.Request.ClientIP.String(),
ClientName: strings.Join(entry.Request.ClientNames, "; "),
Expand All @@ -68,12 +98,31 @@ func (d *DatabaseWriter) Write(entry *Entry) {
EffectiveTLDP: eTLD,
Answer: util.AnswerToString(entry.Response.Res.Answer),
ResponseCode: dns.RcodeToString[entry.Response.Res.Rcode],
})
}

d.lock.Lock()
defer d.lock.Unlock()

d.pendingEntries = append(d.pendingEntries, e)
}

func (d *DatabaseWriter) CleanUp() {
deletionDate := time.Now().AddDate(0, 0, -d.logRetentionDays)
deletionDate := time.Now().AddDate(0, 0, int(-d.logRetentionDays))

log.PrefixedLog("database_writer").Debugf("deleting log entries with request_ts < %s", deletionDate)
d.db.Where("request_ts < ?", deletionDate).Delete(&logEntry{})
}

func (d *DatabaseWriter) doDBWrite() {
d.lock.Lock()
defer d.lock.Unlock()

if len(d.pendingEntries) > 0 {
log.Log().Tracef("%d entries to write", len(d.pendingEntries))

// write bulk
d.db.Create(d.pendingEntries)
// clear the slice with pending entries
d.pendingEntries = nil
}
}
9 changes: 6 additions & 3 deletions querylog/database_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ var _ = Describe("DatabaseWriter", func() {
When("New log entry was created", func() {
It("should be persisted in the database", func() {
sqlite := sqlite.Open("file::memory:")
writer := newDatabaseWriter(sqlite, 7)
writer := newDatabaseWriter(sqlite, 7, 1)
request := &model.Request{
Req: util.NewMsgWithQuestion("google.de.", dns.TypeA),
Log: logrus.NewEntry(logrus.New()),
Expand All @@ -41,6 +41,7 @@ var _ = Describe("DatabaseWriter", func() {
Start: time.Now(),
DurationMs: 20,
})
time.Sleep(500 * time.Millisecond)

result := writer.db.Find(&logEntry{})

Expand All @@ -55,7 +56,7 @@ var _ = Describe("DatabaseWriter", func() {
When("There are log entries with timestamp exceeding the retention period", func() {
It("these old entries should be deleted", func() {
sqlite := sqlite.Open("file::memory:")
writer := newDatabaseWriter(sqlite, 1)
writer := newDatabaseWriter(sqlite, 1, 1)
request := &model.Request{
Req: util.NewMsgWithQuestion("google.de.", dns.TypeA),
Log: logrus.NewEntry(logrus.New()),
Expand Down Expand Up @@ -87,6 +88,8 @@ var _ = Describe("DatabaseWriter", func() {

result := writer.db.Find(&logEntry{})

time.Sleep(500 * time.Millisecond)

var cnt int64
result.Count(&cnt)

Expand All @@ -106,7 +109,7 @@ var _ = Describe("DatabaseWriter", func() {
When("connection parameters wrong", func() {
It("should be log with fatal", func() {
helpertest.ShouldLogFatal(func() {
NewDatabaseWriter("wrong param", 7)
NewDatabaseWriter("wrong param", 7, 1)
})

})
Expand Down
5 changes: 2 additions & 3 deletions resolver/query_logging_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ const (
logChanCap = 1000
)

// QueryLoggingResolver writes query information (question, answer, duration, ...) into
// log file or as log entry (if log directory is not configured)
// QueryLoggingResolver writes query information (question, answer, duration, ...)
type QueryLoggingResolver struct {
NextResolver
target string
Expand All @@ -35,7 +34,7 @@ func NewQueryLoggingResolver(cfg config.QueryLogConfig) ChainedResolver {
case config.QueryLogTypeCsvClient:
writer = querylog.NewCSVWriter(cfg.Target, true, cfg.LogRetentionDays)
case config.QueryLogTypeMysql:
writer = querylog.NewDatabaseWriter(cfg.Target, cfg.LogRetentionDays)
writer = querylog.NewDatabaseWriter(cfg.Target, cfg.LogRetentionDays, 30*time.Second)
case config.QueryLogTypeNone:
writer = querylog.NewLoggerWriter()
}
Expand Down
2 changes: 1 addition & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func getServerAddress(addr string) string {
func NewServer(cfg *config.Config) (server *Server, err error) {
address := getServerAddress(cfg.Port)

log.ConfigureLogger(log.LevelInfo, cfg.LogFormat, cfg.LogTimestamp)
log.ConfigureLogger(cfg.LogLevel, cfg.LogFormat, cfg.LogTimestamp)

udpServer := createUDPServer(address)
tcpServer := createTCPServer(address)
Expand Down

0 comments on commit e6ca896

Please sign in to comment.