package mvfifo
import (
"container/list"
"iter"
"sync"
)
var (
// DefaultMaxSizeBytes sets the default max size at 256 MiB
DefaultMaxSizeBytes = 256 << 20
overhead = 16
itemPool = sync.Pool{
New: func() any {
return new(item)
},
}
)
type item struct {
key string
cur uint64
val []byte
}
// Cache implements a multi value FIFO cache.
type Cache struct {
items *list.List
maxSize int
mutex sync.RWMutex
size int
vals map[string]*list.List
}
// NewCache returns a new [Cache].
func NewCache(opts ...Option) *Cache {
c := &Cache{
maxSize: DefaultMaxSizeBytes,
vals: map[string]*list.List{},
items: list.New(),
}
for _, fn := range opts {
fn(c)
}
return c
}
// Add adds an item to the cache by key, cursor and value.
func (c *Cache) Add(key string, cur uint64, val []byte) {
c.mutex.Lock()
defer c.mutex.Unlock()
c.size += len(key) + len(val) + overhead
for c.size > c.maxSize {
c.evict()
}
kl, ok := c.vals[key]
if !ok {
kl = list.New()
c.vals[key] = kl
}
i := itemPool.Get().(*item)
i.key = key
i.cur = cur
i.val = val
kl.PushBack(i)
c.items.PushBack(i)
}
// Resize changes the maximum size of the cache.
func (c *Cache) Resize(maxBytes int) {
c.mutex.Lock()
defer c.mutex.Unlock()
c.maxSize = maxBytes
for c.size > c.maxSize {
c.evict()
}
}
// Size returns the approximate size of the cache in bytes.
func (c *Cache) Size() int {
c.mutex.RLock()
defer c.mutex.RUnlock()
return c.size
}
// Len returns the number of items in the cache.
func (c *Cache) Len() int {
c.mutex.RLock()
defer c.mutex.RUnlock()
return c.items.Len()
}
// First returns the oldest cursor and value in the cache.
func (c *Cache) First() (cur uint64, val []byte) {
c.mutex.RLock()
defer c.mutex.RUnlock()
if el := c.items.Front(); el != nil {
cur = el.Value.(*item).cur
val = el.Value.(*item).val
}
return
}
// Last returns the newest cursor and value in the cache.
func (c *Cache) Last() (cur uint64, val []byte) {
c.mutex.RLock()
defer c.mutex.RUnlock()
if el := c.items.Back(); el != nil {
cur = el.Value.(*item).cur
val = el.Value.(*item).val
}
return
}
// Iter returns an iterator over the cache at a certain key.
func (c *Cache) Iter(key string) iter.Seq2[uint64, []byte] {
return func(yield func(cur uint64, val []byte) bool) {
c.mutex.RLock()
defer c.mutex.RUnlock()
kl, ok := c.vals[key]
if !ok {
return
}
var i *item
for el := kl.Front(); el != nil; el = el.Next() {
i = el.Value.(*item)
if !yield(i.cur, i.val) {
return
}
}
}
}
// IterAfter returns an iterator over the cache at a certain key after a specific cursor.
// Complexity is O(2n). We iterate backward over the list then forward to optimize for access to later values.
func (c *Cache) IterAfter(key string, cur uint64) iter.Seq2[uint64, []byte] {
return func(yield func(cur uint64, val []byte) bool) {
c.mutex.RLock()
defer c.mutex.RUnlock()
kl, ok := c.vals[key]
if !ok {
return
}
var i *item
el := kl.Back()
for ; el != nil && el.Value.(*item).cur > cur; el = el.Prev() {
}
if el == nil {
el = kl.Front()
}
for ; el != nil; el = el.Next() {
i = el.Value.(*item)
if i.cur > cur && !yield(i.cur, i.val) {
return
}
}
}
}
func (c *Cache) evict() {
el := c.items.Front()
item := el.Value.(*item)
defer itemPool.Put(item)
kl := c.vals[item.key]
if kl.Len() == 1 {
delete(c.vals, item.key)
} else {
kl.Remove(kl.Front())
}
c.items.Remove(el)
c.size -= len(item.key) + len(item.val) + overhead
}
package mvfifo
type Option func(*Cache)
func WithMaxSizeBytes(s int) Option {
return func(c *Cache) {
c.maxSize = max(0, s)
}
}