Skip to content

Commit

Permalink
Rewrite DNS response cache (#378) (#413)
Browse files Browse the repository at this point in the history
  • Loading branch information
0xERR0R authored Jan 20, 2022
1 parent 35d257b commit 759f55d
Show file tree
Hide file tree
Showing 14 changed files with 589 additions and 192 deletions.
17 changes: 17 additions & 0 deletions cache/expirationcache/cache_interface.go
Original file line number Diff line number Diff line change
@@ -0,0 1,17 @@
package expirationcache

import "time"

type ExpiringCache interface {
// Put adds the value to the cache unter the passed key with expiration. If expiration <=0, entry will NOT be cached
Put(key string, val interface{}, expiration time.Duration)

// Get returns the value of cached entry with remained TTL. If entry is not cached, returns nil
Get(key string) (val interface{}, expiration time.Duration)

// TotalCount returns the total count of valid (not expired) elements
TotalCount() int

// Clear removes all cache entries
Clear()
}
171 changes: 171 additions & 0 deletions cache/expirationcache/expiration_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 1,171 @@
package expirationcache

import (
"time"

lru "github.com/hashicorp/golang-lru"
)

const (
defaultCleanUpInterval = 10 * time.Second
defaultSize = 10_000
)

type element struct {
val interface{}
expiresEpochMs int64
}

type ExpiringLRUCache struct {
cleanUpInterval time.Duration
preExpirationFn OnExpirationCallback
lru *lru.Cache
}

type CacheOption func(c *ExpiringLRUCache)

func WithCleanUpInterval(d time.Duration) CacheOption {
return func(e *ExpiringLRUCache) {
e.cleanUpInterval = d
}
}

// OnExpirationCallback will be called just before an element gets expired and will
// be removed from cache. This function can return new value and TTL to leave the
// element in the cache or nil to remove it
type OnExpirationCallback func(key string) (val interface{}, ttl time.Duration)

func WithOnExpiredFn(fn OnExpirationCallback) CacheOption {
return func(c *ExpiringLRUCache) {
c.preExpirationFn = fn
}
}

func WithMaxSize(size uint) CacheOption {
return func(c *ExpiringLRUCache) {
if size > 0 {
l, _ := lru.New(int(size))
c.lru = l
}
}
}

func NewCache(options ...CacheOption) *ExpiringLRUCache {
l, _ := lru.New(defaultSize)
c := &ExpiringLRUCache{
cleanUpInterval: defaultCleanUpInterval,
preExpirationFn: func(key string) (val interface{}, ttl time.Duration) {
return nil, 0
},
lru: l,
}

for _, opt := range options {
opt(c)
}

go periodicCleanup(c)

return c
}

func periodicCleanup(c *ExpiringLRUCache) {
ticker := time.NewTicker(c.cleanUpInterval)
defer ticker.Stop()

for {
<-ticker.C
c.cleanUp()
}
}

func (e *ExpiringLRUCache) cleanUp() {
var expiredKeys []string

// check for expired items and collect expired keys
for _, k := range e.lru.Keys() {
if v, ok := e.lru.Get(k); ok {
if isExpired(v.(*element)) {
expiredKeys = append(expiredKeys, k.(string))
}
}
}

if len(expiredKeys) > 0 {
var keysToDelete []string

for _, key := range expiredKeys {
newVal, newTTL := e.preExpirationFn(key)
if newVal != nil {
e.Put(key, newVal, newTTL)
} else {
keysToDelete = append(keysToDelete, key)
}
}

for _, key := range keysToDelete {
e.lru.Remove(key)
}
}
}

func (e *ExpiringLRUCache) Put(key string, val interface{}, ttl time.Duration) {
if ttl <= 0 {
// entry should be considered as already expired
return
}

expiresEpochMs := time.Now().UnixMilli() ttl.Milliseconds()

el, found := e.lru.Get(key)
if found {
// update existing item
el.(*element).val = val
el.(*element).expiresEpochMs = expiresEpochMs
} else {
// add new item
e.lru.Add(key, &element{
val: val,
expiresEpochMs: expiresEpochMs,
})
}
}

func (e *ExpiringLRUCache) Get(key string) (val interface{}, ttl time.Duration) {
el, found := e.lru.Get(key)

if found {
return el.(*element).val, calculateRemainTTL(el.(*element).expiresEpochMs)
}

return nil, 0
}

func isExpired(el *element) bool {
return el.expiresEpochMs > 0 && time.Now().UnixMilli() > el.expiresEpochMs
}

func calculateRemainTTL(expiresEpoch int64) time.Duration {
now := time.Now().UnixMilli()
if now < expiresEpoch {
return time.Duration(expiresEpoch-now) * time.Millisecond
}

return 0
}

func (e *ExpiringLRUCache) TotalCount() (count int) {
for _, k := range e.lru.Keys() {
if v, ok := e.lru.Get(k); ok {
if !isExpired(v.(*element)) {
count
}
}
}

return count
}

func (e *ExpiringLRUCache) Clear() {
e.lru.Purge()
}
16 changes: 16 additions & 0 deletions cache/expirationcache/expiration_cache_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 1,16 @@
package expirationcache_test

import (
"testing"

. "github.com/0xERR0R/blocky/log"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)

func TestCache(t *testing.T) {
ConfigureLogger(LevelFatal, FormatTypeText, true)
RegisterFailHandler(Fail)
RunSpecs(t, "Expiration cache suite")
}
178 changes: 178 additions & 0 deletions cache/expirationcache/expiration_cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 1,178 @@
package expirationcache

import (
"time"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)

var _ = Describe("Expiration cache", func() {
Describe("Basic operations", func() {
When("string cache was created", func() {

It("Initial cache should be empty", func() {
cache := NewCache()
Expect(cache.TotalCount()).Should(Equal(0))
})
It("Initial cache should not contain any elements", func() {
cache := NewCache()
val, expiration := cache.Get("key1")
Expect(val).Should(BeNil())
Expect(expiration).Should(Equal(time.Duration(0)))
})
})
When("Put new value with positive TTL", func() {
It("Should return the value before element expires", func() {
cache := NewCache(WithCleanUpInterval(100 * time.Millisecond))
cache.Put("key1", "val1", 50*time.Millisecond)
val, expiration := cache.Get("key1")
Expect(val).Should(Equal("val1"))
Expect(expiration.Milliseconds()).Should(BeNumerically("<=", 50))

Expect(cache.TotalCount()).Should(Equal(1))
})
It("Should return nil after expiration", func() {
cache := NewCache(WithCleanUpInterval(100 * time.Millisecond))
cache.Put("key1", "val1", 50*time.Millisecond)

// wait for expiration
Eventually(func(g Gomega) {
val, ttl := cache.Get("key1")
g.Expect(val).Should(Equal("val1"))
g.Expect(ttl.Milliseconds()).Should(BeNumerically("==", 0))
}, "60ms").Should(Succeed())

Expect(cache.TotalCount()).Should(Equal(0))
// internal map has still the expired item
Expect(cache.lru.Len()).Should(Equal(1))

// wait for cleanup run
Eventually(func() int {
return cache.lru.Len()
}, "100ms").Should(Equal(0))
})
})
When("Put new value without expiration", func() {
It("Should not cache the value", func() {
cache := NewCache(WithCleanUpInterval(50 * time.Millisecond))
cache.Put("key1", "val1", 0)
val, expiration := cache.Get("key1")
Expect(val).Should(BeNil())
Expect(expiration.Milliseconds()).Should(BeNumerically("==", 0))
Expect(cache.TotalCount()).Should(Equal(0))
})
})
When("Put updated value", func() {
It("Should return updated value", func() {
cache := NewCache()
cache.Put("key1", "val1", 50*time.Millisecond)
cache.Put("key1", "val2", 200*time.Millisecond)

val, expiration := cache.Get("key1")

Expect(val).Should(Equal("val2"))
Expect(expiration.Milliseconds()).Should(BeNumerically(">", 100))
Expect(expiration.Milliseconds()).Should(BeNumerically("<=", 200))
Expect(cache.TotalCount()).Should(Equal(1))
})
})
When("Purging after usage", func() {
It("Should be empty after purge", func() {
cache := NewCache()
cache.Put("key1", "val1", time.Second)

Expect(cache.TotalCount()).Should(Equal(1))

cache.Clear()

Expect(cache.TotalCount()).Should(Equal(0))
})
})
})
Describe("preExpiration function", func() {
When(" function is defined", func() {
It("should update the value and TTL if function returns values", func() {
fn := func(key string) (val interface{}, ttl time.Duration) {
return "val2", time.Second
}
cache := NewCache(WithOnExpiredFn(fn))
cache.Put("key1", "val1", 50*time.Millisecond)

// wait for expiration
Eventually(func(g Gomega) {
val, ttl := cache.Get("key1")
g.Expect(val).Should(Equal("val1"))
g.Expect(ttl.Milliseconds()).Should(
BeNumerically("==", 0))
}, "150ms").Should(Succeed())
})

It("should update the value and TTL if function returns values on cleanup if element is expired", func() {
fn := func(key string) (val interface{}, ttl time.Duration) {
return "val2", time.Second
}
cache := NewCache(WithOnExpiredFn(fn))
cache.Put("key1", "val1", time.Millisecond)

time.Sleep(2 * time.Millisecond)

// trigger cleanUp manually -> onExpiredFn will be executed, because element is expired
cache.cleanUp()

// wait for expiration
val, ttl := cache.Get("key1")
Expect(val).Should(Equal("val2"))
Expect(ttl.Milliseconds()).Should(And(
BeNumerically(">", 900)),
BeNumerically("<=", 1000))
})

It("should delete the key if function returns nil", func() {
fn := func(key string) (val interface{}, ttl time.Duration) {
return nil, 0
}
cache := NewCache(WithCleanUpInterval(100*time.Millisecond), WithOnExpiredFn(fn))
cache.Put("key1", "val1", 50*time.Millisecond)

Eventually(func() (interface{}, time.Duration) {
return cache.Get("key1")
}, "200ms").Should(BeNil())
})

})
})
Describe("LRU behaviour", func() {
When("Defined max size is reached", func() {
It("should remove old elements", func() {
cache := NewCache(WithMaxSize(3))

cache.Put("key1", "val1", time.Second)
cache.Put("key2", "val2", time.Second)
cache.Put("key3", "val3", time.Second)
cache.Put("key4", "val4", time.Second)

Expect(cache.TotalCount()).Should(Equal(3))

// key1 was removed
Expect(cache.Get("key1")).Should(BeNil())
// key2,3,4 still in the cache
Expect(cache.lru.Contains("key2")).Should(BeTrue())
Expect(cache.lru.Contains("key3")).Should(BeTrue())
Expect(cache.lru.Contains("key4")).Should(BeTrue())

// now get key2 to increase usage count
_, _ = cache.Get("key2")

// put key5
cache.Put("key5", "val5", time.Second)

// now key3 should be removed
Expect(cache.lru.Contains("key2")).Should(BeTrue())
Expect(cache.lru.Contains("key3")).Should(BeFalse())
Expect(cache.lru.Contains("key4")).Should(BeTrue())
Expect(cache.lru.Contains("key5")).Should(BeTrue())
})
})
})
})
Loading

0 comments on commit 759f55d

Please sign in to comment.