package freecache
import (
"encoding/binary"
"sync"
"sync/atomic"
"github.com/cespare/xxhash/v2"
)
const (
// segmentCount represents the number of segments within a freecache instance.
segmentCount = 256
// segmentAndOpVal is the bitwise AND mask applied to the hashVal to find the segment id.
segmentAndOpVal = 255
minBufSize = 512 * 1024
)
// Cache is a freecache instance.
type Cache struct {
locks [segmentCount]sync.Mutex
segments [segmentCount]segment
}
type Updater func(value []byte, found bool) (newValue []byte, replace bool, expireSeconds int)
func hashFunc(data []byte) uint64 {
return xxhash.Sum64(data)
}
// NewCache returns a newly initialized cache by size.
// The cache size will be set to 512KB at minimum.
// If the size is set relatively large, you should call
// `debug.SetGCPercent()` with a much smaller value
// to limit the memory consumption and GC pause time.
func NewCache(size int) (cache *Cache) {
return NewCacheCustomTimer(size, defaultTimer{})
}
// NewCacheCustomTimer returns a new cache with a custom timer.
func NewCacheCustomTimer(size int, timer Timer) (cache *Cache) {
if size < minBufSize {
size = minBufSize
}
if timer == nil {
timer = defaultTimer{}
}
cache = new(Cache)
for i := 0; i < segmentCount; i++ {
cache.segments[i] = newSegment(size/segmentCount, i, timer)
}
return
}
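// Usage sketch (illustrative, not part of the library): creating a cache and
// tuning the GC as suggested above. The 100 MB size is an arbitrary example.
//
//	cacheSize := 100 * 1024 * 1024
//	cache := freecache.NewCache(cacheSize)
//	debug.SetGCPercent(20) // from "runtime/debug"
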
// Set sets a key, value and expiration for a cache entry and stores it in the cache.
// If the key is larger than 65535 bytes or the value is larger than 1/1024 of the cache size,
// the entry will not be written to the cache. expireSeconds <= 0 means no expire,
// but the entry can still be evicted when the cache is full.
func (cache *Cache) Set(key, value []byte, expireSeconds int) (err error) {
hashVal := hashFunc(key)
segID := hashVal & segmentAndOpVal
cache.locks[segID].Lock()
err = cache.segments[segID].set(key, value, hashVal, expireSeconds)
cache.locks[segID].Unlock()
return
}
// Touch updates the expiration time of an existing key. expireSeconds <= 0 means no expire,
// but the entry can still be evicted when the cache is full.
func (cache *Cache) Touch(key []byte, expireSeconds int) (err error) {
hashVal := hashFunc(key)
segID := hashVal & segmentAndOpVal
cache.locks[segID].Lock()
err = cache.segments[segID].touch(key, hashVal, expireSeconds)
cache.locks[segID].Unlock()
return
}
// Get returns the value or a not found error.
func (cache *Cache) Get(key []byte) (value []byte, err error) {
hashVal := hashFunc(key)
segID := hashVal & segmentAndOpVal
cache.locks[segID].Lock()
value, _, err = cache.segments[segID].get(key, nil, hashVal, false)
cache.locks[segID].Unlock()
return
}
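// Usage sketch (illustrative): a basic Set/Get round trip on a *Cache named
// cache; the key, value and TTL are hypothetical.
//
//	err := cache.Set([]byte("abc"), []byte("def"), 60)
//	if err != nil {
//		// the entry was too large to be written
//	}
//	got, err := cache.Get([]byte("abc"))
//	if err == freecache.ErrNotFound {
//		// miss: never set, expired, or evicted
//	}
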
// GetFn is equivalent to Get or GetWithBuf, but it attempts to be zero-copy,
// calling the provided function with a slice view over the current underlying
// value of the key in memory. The slice is constrained in length and capacity.
//
// In most cases, this method will not allocate a byte buffer. The only
// exception is when the value wraps around the underlying segment ring buffer.
//
// The method will return ErrNotFound if there's a miss, and the function will
// not be called. Errors returned by the function will be propagated.
func (cache *Cache) GetFn(key []byte, fn func([]byte) error) (err error) {
hashVal := hashFunc(key)
segID := hashVal & segmentAndOpVal
cache.locks[segID].Lock()
err = cache.segments[segID].view(key, fn, hashVal, false)
cache.locks[segID].Unlock()
return
}
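// Usage sketch (illustrative): summing a value's bytes without copying it.
// The slice passed to fn aliases the segment's ring buffer and the segment
// lock is held while fn runs, so fn should be short and must not retain the
// slice or call back into the cache.
//
//	var sum int
//	err := cache.GetFn([]byte("abc"), func(val []byte) error {
//		for _, b := range val {
//			sum += int(b)
//		}
//		return nil
//	})
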
// GetOrSet returns the existing value for the key if present. Otherwise it sets
// a new key, value and expiration for a cache entry, stores it in the cache, and returns nil.
func (cache *Cache) GetOrSet(key, value []byte, expireSeconds int) (retValue []byte, err error) {
hashVal := hashFunc(key)
segID := hashVal & segmentAndOpVal
cache.locks[segID].Lock()
defer cache.locks[segID].Unlock()
retValue, _, err = cache.segments[segID].get(key, nil, hashVal, false)
if err != nil {
err = cache.segments[segID].set(key, value, hashVal, expireSeconds)
}
return
}
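// Usage sketch (illustrative): GetOrSet as an "insert if absent" that is
// atomic within the key's segment. A nil return value with a nil error means
// this call stored the new entry.
//
//	existing, err := cache.GetOrSet([]byte("abc"), []byte("def"), 60)
//	if err == nil && existing == nil {
//		// the value was inserted by this call
//	}
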
// SetAndGet sets a key, value and expiration for a cache entry and stores it in the cache.
// If the key is larger than 65535 bytes or the value is larger than 1/1024 of the cache size,
// the entry will not be written to the cache. expireSeconds <= 0 means no expire,
// but the entry can still be evicted when the cache is full. It returns the existing value,
// if any, along with a bool indicating whether an existing record was found.
func (cache *Cache) SetAndGet(key, value []byte, expireSeconds int) (retValue []byte, found bool, err error) {
hashVal := hashFunc(key)
segID := hashVal & segmentAndOpVal
cache.locks[segID].Lock()
defer cache.locks[segID].Unlock()
retValue, _, err = cache.segments[segID].get(key, nil, hashVal, false)
if err == nil {
found = true
}
err = cache.segments[segID].set(key, value, hashVal, expireSeconds)
return
}
// Update gets the value for a key and passes it to the updater function, which
// decides whether set should be called as well. This allows for an atomic Get
// plus Set, using the existing value to decide whether to call Set.
// If the key is larger than 65535 bytes or the value is larger than 1/1024 of the cache size,
// the entry will not be written to the cache. expireSeconds <= 0 means no expire,
// but the entry can still be evicted when the cache is full. It returns a bool indicating
// whether an existing record was found, a bool indicating whether the value was
// replaced, and an error, if any.
func (cache *Cache) Update(key []byte, updater Updater) (found bool, replaced bool, err error) {
hashVal := hashFunc(key)
segID := hashVal & segmentAndOpVal
cache.locks[segID].Lock()
defer cache.locks[segID].Unlock()
retValue, _, err := cache.segments[segID].get(key, nil, hashVal, false)
if err == nil {
found = true
} else {
err = nil // Clear ErrNotFound error since we're returning found flag
}
value, replaced, expireSeconds := updater(retValue, found)
if !replaced {
return
}
err = cache.segments[segID].set(key, value, hashVal, expireSeconds)
return
}
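// Usage sketch (illustrative): an atomic read-modify-write that appends a
// byte to the current value, creating the entry on a miss. The Updater runs
// under the segment lock, so it should be short and must not call back into
// the cache.
//
//	found, replaced, err := cache.Update([]byte("abc"),
//		func(value []byte, found bool) ([]byte, bool, int) {
//			if !found {
//				return []byte("initial"), true, 60
//			}
//			return append(value, '!'), true, 60
//		})
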
// Peek returns the value or a not found error, without updating access time or counters.
// Warning: No expiry check is performed, so if an expired value is found, it will be
// returned without error.
func (cache *Cache) Peek(key []byte) (value []byte, err error) {
hashVal := hashFunc(key)
segID := hashVal & segmentAndOpVal
cache.locks[segID].Lock()
value, _, err = cache.segments[segID].get(key, nil, hashVal, true)
cache.locks[segID].Unlock()
return
}
// PeekFn is equivalent to Peek, but it attempts to be zero-copy, calling the
// provided function with a slice view over the current underlying value of the
// key in memory. The slice is constrained in length and capacity.
//
// In most cases, this method will not allocate a byte buffer. The only
// exception is when the value wraps around the underlying segment ring buffer.
//
// The method will return ErrNotFound if there's a miss, and the function will
// not be called. Errors returned by the function will be propagated.
// Warning: No expiry check is performed, so if an expired value is found, it will be
// returned without error.
func (cache *Cache) PeekFn(key []byte, fn func([]byte) error) (err error) {
hashVal := hashFunc(key)
segID := hashVal & segmentAndOpVal
cache.locks[segID].Lock()
err = cache.segments[segID].view(key, fn, hashVal, true)
cache.locks[segID].Unlock()
return
}
// GetWithBuf copies the value to the buf or returns a not found error.
// This method doesn't allocate memory when the capacity of buf is greater than or equal to the value length.
func (cache *Cache) GetWithBuf(key, buf []byte) (value []byte, err error) {
hashVal := hashFunc(key)
segID := hashVal & segmentAndOpVal
cache.locks[segID].Lock()
value, _, err = cache.segments[segID].get(key, buf, hashVal, false)
cache.locks[segID].Unlock()
return
}
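// Usage sketch (illustrative): reusing one buffer across lookups to avoid a
// per-call allocation. When the value fits in cap(buf), the returned slice
// aliases buf; otherwise a fresh slice is allocated.
//
//	buf := make([]byte, 4096)
//	value, err := cache.GetWithBuf([]byte("abc"), buf)
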
// GetWithExpiration returns the value with its expiration or a not found error.
func (cache *Cache) GetWithExpiration(key []byte) (value []byte, expireAt uint32, err error) {
hashVal := hashFunc(key)
segID := hashVal & segmentAndOpVal
cache.locks[segID].Lock()
value, expireAt, err = cache.segments[segID].get(key, nil, hashVal, false)
cache.locks[segID].Unlock()
return
}
// TTL returns the TTL time left for a given key or a not found error.
func (cache *Cache) TTL(key []byte) (timeLeft uint32, err error) {
hashVal := hashFunc(key)
segID := hashVal & segmentAndOpVal
cache.locks[segID].Lock()
timeLeft, err = cache.segments[segID].ttl(key, hashVal)
cache.locks[segID].Unlock()
return
}
// Del deletes an item in the cache by key and reports whether a delete occurred.
func (cache *Cache) Del(key []byte) (affected bool) {
hashVal := hashFunc(key)
segID := hashVal & segmentAndOpVal
cache.locks[segID].Lock()
affected = cache.segments[segID].del(key, hashVal)
cache.locks[segID].Unlock()
return
}
// SetInt stores a value in the cache using an int64 key.
func (cache *Cache) SetInt(key int64, value []byte, expireSeconds int) (err error) {
var bKey [8]byte
binary.LittleEndian.PutUint64(bKey[:], uint64(key))
return cache.Set(bKey[:], value, expireSeconds)
}
// GetInt returns the value for an int64 key in the cache or a not found error.
func (cache *Cache) GetInt(key int64) (value []byte, err error) {
var bKey [8]byte
binary.LittleEndian.PutUint64(bKey[:], uint64(key))
return cache.Get(bKey[:])
}
// GetIntWithExpiration returns the value and expiration for an int64 key, or a not found error.
func (cache *Cache) GetIntWithExpiration(key int64) (value []byte, expireAt uint32, err error) {
var bKey [8]byte
binary.LittleEndian.PutUint64(bKey[:], uint64(key))
return cache.GetWithExpiration(bKey[:])
}
// DelInt deletes an item in the cache by int64 key and reports whether a delete occurred.
func (cache *Cache) DelInt(key int64) (affected bool) {
var bKey [8]byte
binary.LittleEndian.PutUint64(bKey[:], uint64(key))
return cache.Del(bKey[:])
}
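// Usage sketch (illustrative): the *Int helpers encode the int64 key as 8
// little-endian bytes, so they interoperate with the []byte API given the
// same encoding.
//
//	_ = cache.SetInt(42, []byte("answer"), 0)
//	v, err := cache.GetInt(42)
//
//	// equivalent manual form:
//	var bKey [8]byte
//	binary.LittleEndian.PutUint64(bKey[:], 42)
//	v, err = cache.Get(bKey[:])
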
// EvacuateCount is a metric indicating the number of times an eviction occurred.
func (cache *Cache) EvacuateCount() (count int64) {
for i := range cache.segments {
count += atomic.LoadInt64(&cache.segments[i].totalEvacuate)
}
return
}
// ExpiredCount is a metric indicating the number of times an expire occurred.
func (cache *Cache) ExpiredCount() (count int64) {
for i := range cache.segments {
count += atomic.LoadInt64(&cache.segments[i].totalExpired)
}
return
}
// EntryCount returns the number of items currently in the cache.
func (cache *Cache) EntryCount() (entryCount int64) {
for i := range cache.segments {
entryCount += atomic.LoadInt64(&cache.segments[i].entryCount)
}
return
}
// AverageAccessTime returns the average Unix timestamp at which cache entries
// were last accessed. Entries with an above-average access time are preserved
// (relocated) when they are about to be overwritten by new values.
func (cache *Cache) AverageAccessTime() int64 {
var entryCount, totalTime int64
for i := range cache.segments {
totalTime += atomic.LoadInt64(&cache.segments[i].totalTime)
entryCount += atomic.LoadInt64(&cache.segments[i].totalCount)
}
if entryCount == 0 {
return 0
} else {
return totalTime / entryCount
}
}
// HitCount is a metric that returns number of times a key was found in the cache.
func (cache *Cache) HitCount() (count int64) {
for i := range cache.segments {
count += atomic.LoadInt64(&cache.segments[i].hitCount)
}
return
}
// MissCount is a metric that returns the number of times a miss occurred in the cache.
func (cache *Cache) MissCount() (count int64) {
for i := range cache.segments {
count += atomic.LoadInt64(&cache.segments[i].missCount)
}
return
}
// LookupCount is a metric that returns the number of times a lookup for a given key occurred.
func (cache *Cache) LookupCount() int64 {
return cache.HitCount() + cache.MissCount()
}
// HitRate is the ratio of hits over lookups.
func (cache *Cache) HitRate() float64 {
hitCount, missCount := cache.HitCount(), cache.MissCount()
lookupCount := hitCount + missCount
if lookupCount == 0 {
return 0
} else {
return float64(hitCount) / float64(lookupCount)
}
}
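// Usage sketch (illustrative): reading the counters. Each total is gathered
// per segment with atomic loads, so under concurrent use the values are
// approximate snapshots rather than a consistent view.
//
//	if lookups := cache.LookupCount(); lookups > 0 {
//		log.Printf("hit rate %.2f over %d lookups", cache.HitRate(), lookups)
//	}
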
// OverwriteCount indicates the number of times entries have been overwritten.
func (cache *Cache) OverwriteCount() (overwriteCount int64) {
for i := range cache.segments {
overwriteCount += atomic.LoadInt64(&cache.segments[i].overwrites)
}
return
}
// TouchedCount indicates the number of times entries have had their expiration time extended.
func (cache *Cache) TouchedCount() (touchedCount int64) {
for i := range cache.segments {
touchedCount += atomic.LoadInt64(&cache.segments[i].touched)
}
return
}
// Clear clears the cache.
func (cache *Cache) Clear() {
for i := range cache.segments {
cache.locks[i].Lock()
cache.segments[i].clear()
cache.locks[i].Unlock()
}
}
// ResetStatistics resets all statistics counters to zero.
func (cache *Cache) ResetStatistics() {
for i := range cache.segments {
cache.locks[i].Lock()
cache.segments[i].resetStatistics()
cache.locks[i].Unlock()
}
}
package freecache
import (
"unsafe"
)
// Iterator iterates over the entries in the cache.
type Iterator struct {
cache *Cache
segmentIdx int
slotIdx int
entryIdx int
}
// Entry represents a key/value pair.
type Entry struct {
Key []byte
Value []byte
ExpireAt uint32
}
// Next returns the next entry for the iterator.
// The order of the entries is not guaranteed.
// If there are no more entries to return, nil will be returned.
func (it *Iterator) Next() *Entry {
for it.segmentIdx < 256 {
entry := it.nextForSegment(it.segmentIdx)
if entry != nil {
return entry
}
it.segmentIdx++
it.slotIdx = 0
it.entryIdx = 0
}
return nil
}
func (it *Iterator) nextForSegment(segIdx int) *Entry {
it.cache.locks[segIdx].Lock()
defer it.cache.locks[segIdx].Unlock()
seg := &it.cache.segments[segIdx]
for it.slotIdx < 256 {
entry := it.nextForSlot(seg, it.slotIdx)
if entry != nil {
return entry
}
it.slotIdx++
it.entryIdx = 0
}
return nil
}
func (it *Iterator) nextForSlot(seg *segment, slotId int) *Entry {
slotOff := int32(it.slotIdx) * seg.slotCap
slot := seg.slotsData[slotOff : slotOff+seg.slotLens[it.slotIdx] : slotOff+seg.slotCap]
for it.entryIdx < len(slot) {
ptr := slot[it.entryIdx]
it.entryIdx++
now := seg.timer.Now()
var hdrBuf [ENTRY_HDR_SIZE]byte
seg.rb.ReadAt(hdrBuf[:], ptr.offset)
hdr := (*entryHdr)(unsafe.Pointer(&hdrBuf[0]))
if hdr.expireAt == 0 || hdr.expireAt > now {
entry := new(Entry)
entry.Key = make([]byte, hdr.keyLen)
entry.Value = make([]byte, hdr.valLen)
entry.ExpireAt = hdr.expireAt
seg.rb.ReadAt(entry.Key, ptr.offset+ENTRY_HDR_SIZE)
seg.rb.ReadAt(entry.Value, ptr.offset+ENTRY_HDR_SIZE+int64(hdr.keyLen))
return entry
}
}
return nil
}
// NewIterator creates a new iterator for the cache.
func (cache *Cache) NewIterator() *Iterator {
return &Iterator{
cache: cache,
}
}
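// Usage sketch (illustrative): draining the iterator. Each segment is locked
// only while its entries are being read, so the traversal is not a globally
// consistent snapshot. process is a hypothetical callback.
//
//	it := cache.NewIterator()
//	for entry := it.Next(); entry != nil; entry = it.Next() {
//		process(entry.Key, entry.Value)
//	}
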
package freecache
import (
"bytes"
"errors"
"fmt"
"io"
)
var ErrOutOfRange = errors.New("out of range")
// RingBuf is a ring buffer with a fixed size. When the data written exceeds
// the size, old data is overwritten by new data.
// It only contains the data in the stream from begin to end.
type RingBuf struct {
begin int64 // beginning offset of the data stream.
end int64 // ending offset of the data stream.
data []byte
	index int // ranges from 0 to len(rb.data)-1
}
func NewRingBuf(size int, begin int64) (rb RingBuf) {
rb.data = make([]byte, size)
rb.Reset(begin)
return
}
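// Behavior sketch (illustrative): once writes exceed the buffer size, Begin()
// advances and the oldest bytes become unreadable. Traced against the
// implementation below:
//
//	rb := NewRingBuf(4, 0)
//	rb.Write([]byte("abcd"))  // stream [0,4): "abcd"
//	rb.Write([]byte("ef"))    // stream [2,6): "cdef"; "ab" was overwritten
//	p := make([]byte, 2)
//	rb.ReadAt(p, 2)           // p == "cd"
//	_, err := rb.ReadAt(p, 0) // ErrOutOfRange: offset 0 is before Begin()
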
// Reset the ring buffer
//
// Parameters:
// begin: beginning offset of the data stream
func (rb *RingBuf) Reset(begin int64) {
rb.begin = begin
rb.end = begin
rb.index = 0
}
// Dump creates a copy of the buffer's underlying data.
func (rb *RingBuf) Dump() []byte {
dump := make([]byte, len(rb.data))
copy(dump, rb.data)
return dump
}
func (rb *RingBuf) String() string {
return fmt.Sprintf("[size:%v, start:%v, end:%v, index:%v]", len(rb.data), rb.begin, rb.end, rb.index)
}
func (rb *RingBuf) Size() int64 {
return int64(len(rb.data))
}
func (rb *RingBuf) Begin() int64 {
return rb.begin
}
func (rb *RingBuf) End() int64 {
return rb.end
}
// ReadAt reads up to len(p) bytes at offset off of the data stream.
func (rb *RingBuf) ReadAt(p []byte, off int64) (n int, err error) {
if off > rb.end || off < rb.begin {
err = ErrOutOfRange
return
}
readOff := rb.getDataOff(off)
readEnd := readOff + int(rb.end-off)
if readEnd <= len(rb.data) {
n = copy(p, rb.data[readOff:readEnd])
} else {
n = copy(p, rb.data[readOff:])
if n < len(p) {
n += copy(p[n:], rb.data[:readEnd-len(rb.data)])
}
}
if n < len(p) {
err = io.EOF
}
return
}
func (rb *RingBuf) getDataOff(off int64) int {
var dataOff int
if rb.end-rb.begin < int64(len(rb.data)) {
dataOff = int(off - rb.begin)
} else {
dataOff = rb.index + int(off-rb.begin)
}
if dataOff >= len(rb.data) {
dataOff -= len(rb.data)
}
return dataOff
}
// Slice returns a slice of the supplied range of the ring buffer. It will
// not allocate unless the requested range wraps the ring buffer.
func (rb *RingBuf) Slice(off, length int64) ([]byte, error) {
if off > rb.end || off < rb.begin {
return nil, ErrOutOfRange
}
readOff := rb.getDataOff(off)
readEnd := readOff + int(length)
if readEnd <= len(rb.data) {
return rb.data[readOff:readEnd:readEnd], nil
}
buf := make([]byte, length)
n := copy(buf, rb.data[readOff:])
if n < int(length) {
n += copy(buf[n:], rb.data[:readEnd-len(rb.data)])
}
if n < int(length) {
return nil, io.EOF
}
return buf, nil
}
func (rb *RingBuf) Write(p []byte) (n int, err error) {
if len(p) > len(rb.data) {
err = ErrOutOfRange
return
}
for n < len(p) {
written := copy(rb.data[rb.index:], p[n:])
rb.end += int64(written)
n += written
rb.index += written
if rb.index >= len(rb.data) {
rb.index -= len(rb.data)
}
}
if int(rb.end-rb.begin) > len(rb.data) {
rb.begin = rb.end - int64(len(rb.data))
}
return
}
func (rb *RingBuf) WriteAt(p []byte, off int64) (n int, err error) {
if off+int64(len(p)) > rb.end || off < rb.begin {
err = ErrOutOfRange
return
}
writeOff := rb.getDataOff(off)
writeEnd := writeOff + int(rb.end-off)
if writeEnd <= len(rb.data) {
n = copy(rb.data[writeOff:writeEnd], p)
} else {
n = copy(rb.data[writeOff:], p)
if n < len(p) {
n += copy(rb.data[:writeEnd-len(rb.data)], p[n:])
}
}
return
}
func (rb *RingBuf) EqualAt(p []byte, off int64) bool {
if off+int64(len(p)) > rb.end || off < rb.begin {
return false
}
readOff := rb.getDataOff(off)
readEnd := readOff + len(p)
if readEnd <= len(rb.data) {
return bytes.Equal(p, rb.data[readOff:readEnd])
} else {
firstLen := len(rb.data) - readOff
equal := bytes.Equal(p[:firstLen], rb.data[readOff:])
if equal {
secondLen := len(p) - firstLen
equal = bytes.Equal(p[firstLen:], rb.data[:secondLen])
}
return equal
}
}
// Evacuate reads the data at off, then rewrites it at the end of the data
// stream, keeping it from being overwritten by new data.
func (rb *RingBuf) Evacuate(off int64, length int) (newOff int64) {
if off+int64(length) > rb.end || off < rb.begin {
return -1
}
readOff := rb.getDataOff(off)
if readOff == rb.index {
// no copy evacuate
rb.index += length
if rb.index >= len(rb.data) {
rb.index -= len(rb.data)
}
} else if readOff < rb.index {
var n = copy(rb.data[rb.index:], rb.data[readOff:readOff+length])
rb.index += n
if rb.index == len(rb.data) {
rb.index = copy(rb.data, rb.data[readOff+n:readOff+length])
}
} else {
var readEnd = readOff + length
var n int
if readEnd <= len(rb.data) {
n = copy(rb.data[rb.index:], rb.data[readOff:readEnd])
rb.index += n
} else {
n = copy(rb.data[rb.index:], rb.data[readOff:])
rb.index += n
var tail = length - n
n = copy(rb.data[rb.index:], rb.data[:tail])
rb.index += n
if rb.index == len(rb.data) {
rb.index = copy(rb.data, rb.data[n:tail])
}
}
}
newOff = rb.end
rb.end += int64(length)
if rb.begin < rb.end-int64(len(rb.data)) {
rb.begin = rb.end - int64(len(rb.data))
}
return
}
func (rb *RingBuf) Resize(newSize int) {
if len(rb.data) == newSize {
return
}
newData := make([]byte, newSize)
var offset int
if rb.end-rb.begin == int64(len(rb.data)) {
offset = rb.index
}
if int(rb.end-rb.begin) > newSize {
discard := int(rb.end-rb.begin) - newSize
offset = (offset + discard) % len(rb.data)
rb.begin = rb.end - int64(newSize)
}
n := copy(newData, rb.data[offset:])
if n < newSize {
copy(newData[n:], rb.data[:offset])
}
rb.data = newData
rb.index = 0
}
func (rb *RingBuf) Skip(length int64) {
rb.end += length
rb.index += int(length)
for rb.index >= len(rb.data) {
rb.index -= len(rb.data)
}
if int(rb.end-rb.begin) > len(rb.data) {
rb.begin = rb.end - int64(len(rb.data))
}
}
package freecache
import (
"errors"
"sync/atomic"
"unsafe"
)
const HASH_ENTRY_SIZE = 16
const ENTRY_HDR_SIZE = 24
var ErrLargeKey = errors.New("The key is larger than 65535")
var ErrLargeEntry = errors.New("The entry size is larger than 1/1024 of cache size")
var ErrNotFound = errors.New("Entry not found")
// entry pointer struct points to an entry in ring buffer
type entryPtr struct {
offset int64 // entry offset in ring buffer
hash16 uint16 // entries are ordered by hash16 in a slot.
keyLen uint16 // used to compare a key
reserved uint32
}
// entry header struct in ring buffer, followed by key and value.
type entryHdr struct {
accessTime uint32
expireAt uint32
keyLen uint16
hash16 uint16
valLen uint32
valCap uint32
deleted bool
slotId uint8
reserved uint16
}
// a segment contains 256 slots. A slot is an array of entry pointers, ordered
// by hash16 value; an entry can be looked up by the hash value of its key.
type segment struct {
rb RingBuf // ring buffer that stores data
segId int
_ uint32
missCount int64
hitCount int64
entryCount int64
totalCount int64 // number of entries in ring buffer, including deleted entries.
totalTime int64 // used to calculate least recent used entry.
timer Timer // Timer giving current time
totalEvacuate int64 // used for debug
totalExpired int64 // used for debug
overwrites int64 // used for debug
touched int64 // used for debug
vacuumLen int64 // up to vacuumLen, new data can be written without overwriting old data.
slotLens [256]int32 // The actual length for every slot.
slotCap int32 // max number of entry pointers a slot can hold.
slotsData []entryPtr // shared by all 256 slots
}
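// How a 64-bit hash value is consumed (a restatement of the code, not new
// behavior): the low byte selects one of the 256 segments (see cache.go), the
// next byte selects one of the segment's 256 slots, and the following 16 bits
// order and locate entry pointers within the slot.
//
//	segID  := hashVal & 255         // bits 0-7
//	slotId := uint8(hashVal >> 8)   // bits 8-15
//	hash16 := uint16(hashVal >> 16) // bits 16-31
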
func newSegment(bufSize int, segId int, timer Timer) (seg segment) {
seg.rb = NewRingBuf(bufSize, 0)
seg.segId = segId
seg.timer = timer
seg.vacuumLen = int64(bufSize)
seg.slotCap = 1
seg.slotsData = make([]entryPtr, 256*seg.slotCap)
return
}
func (seg *segment) set(key, value []byte, hashVal uint64, expireSeconds int) (err error) {
if len(key) > 65535 {
return ErrLargeKey
}
maxKeyValLen := len(seg.rb.data)/4 - ENTRY_HDR_SIZE
if len(key)+len(value) > maxKeyValLen {
// Do not accept large entry.
return ErrLargeEntry
}
now := seg.timer.Now()
expireAt := uint32(0)
if expireSeconds > 0 {
expireAt = now + uint32(expireSeconds)
}
slotId := uint8(hashVal >> 8)
hash16 := uint16(hashVal >> 16)
slot := seg.getSlot(slotId)
idx, match := seg.lookup(slot, hash16, key)
var hdrBuf [ENTRY_HDR_SIZE]byte
hdr := (*entryHdr)(unsafe.Pointer(&hdrBuf[0]))
if match {
matchedPtr := &slot[idx]
seg.rb.ReadAt(hdrBuf[:], matchedPtr.offset)
hdr.slotId = slotId
hdr.hash16 = hash16
hdr.keyLen = uint16(len(key))
originAccessTime := hdr.accessTime
hdr.accessTime = now
hdr.expireAt = expireAt
hdr.valLen = uint32(len(value))
if hdr.valCap >= hdr.valLen {
// in place overwrite
atomic.AddInt64(&seg.totalTime, int64(hdr.accessTime)-int64(originAccessTime))
seg.rb.WriteAt(hdrBuf[:], matchedPtr.offset)
seg.rb.WriteAt(value, matchedPtr.offset+ENTRY_HDR_SIZE+int64(hdr.keyLen))
atomic.AddInt64(&seg.overwrites, 1)
return
}
// avoid unnecessary memory copy.
seg.delEntryPtr(slotId, slot, idx)
match = false
// increase capacity and limit entry len.
for hdr.valCap < hdr.valLen {
hdr.valCap *= 2
}
if hdr.valCap > uint32(maxKeyValLen-len(key)) {
hdr.valCap = uint32(maxKeyValLen - len(key))
}
} else {
hdr.slotId = slotId
hdr.hash16 = hash16
hdr.keyLen = uint16(len(key))
hdr.accessTime = now
hdr.expireAt = expireAt
hdr.valLen = uint32(len(value))
hdr.valCap = uint32(len(value))
if hdr.valCap == 0 { // avoid infinite loop when increasing capacity.
hdr.valCap = 1
}
}
entryLen := ENTRY_HDR_SIZE + int64(len(key)) + int64(hdr.valCap)
slotModified := seg.evacuate(entryLen, slotId, now)
if slotModified {
		// the slot has been modified during evacuation, so we need to look up 'idx' again;
		// otherwise there would be an index out of bounds error.
slot = seg.getSlot(slotId)
idx, match = seg.lookup(slot, hash16, key)
// assert(match == false)
}
newOff := seg.rb.End()
seg.insertEntryPtr(slotId, hash16, newOff, idx, hdr.keyLen)
seg.rb.Write(hdrBuf[:])
seg.rb.Write(key)
seg.rb.Write(value)
seg.rb.Skip(int64(hdr.valCap - hdr.valLen))
atomic.AddInt64(&seg.totalTime, int64(now))
atomic.AddInt64(&seg.totalCount, 1)
seg.vacuumLen -= entryLen
return
}
func (seg *segment) touch(key []byte, hashVal uint64, expireSeconds int) (err error) {
if len(key) > 65535 {
return ErrLargeKey
}
slotId := uint8(hashVal >> 8)
hash16 := uint16(hashVal >> 16)
slot := seg.getSlot(slotId)
idx, match := seg.lookup(slot, hash16, key)
if !match {
err = ErrNotFound
return
}
matchedPtr := &slot[idx]
var hdrBuf [ENTRY_HDR_SIZE]byte
seg.rb.ReadAt(hdrBuf[:], matchedPtr.offset)
hdr := (*entryHdr)(unsafe.Pointer(&hdrBuf[0]))
now := seg.timer.Now()
if isExpired(hdr.expireAt, now) {
seg.delEntryPtr(slotId, slot, idx)
atomic.AddInt64(&seg.totalExpired, 1)
err = ErrNotFound
atomic.AddInt64(&seg.missCount, 1)
return
}
expireAt := uint32(0)
if expireSeconds > 0 {
expireAt = now + uint32(expireSeconds)
}
originAccessTime := hdr.accessTime
hdr.accessTime = now
hdr.expireAt = expireAt
// in place overwrite
atomic.AddInt64(&seg.totalTime, int64(hdr.accessTime)-int64(originAccessTime))
seg.rb.WriteAt(hdrBuf[:], matchedPtr.offset)
atomic.AddInt64(&seg.touched, 1)
return
}
func (seg *segment) evacuate(entryLen int64, slotId uint8, now uint32) (slotModified bool) {
var oldHdrBuf [ENTRY_HDR_SIZE]byte
consecutiveEvacuate := 0
for seg.vacuumLen < entryLen {
oldOff := seg.rb.End() + seg.vacuumLen - seg.rb.Size()
seg.rb.ReadAt(oldHdrBuf[:], oldOff)
oldHdr := (*entryHdr)(unsafe.Pointer(&oldHdrBuf[0]))
oldEntryLen := ENTRY_HDR_SIZE + int64(oldHdr.keyLen) + int64(oldHdr.valCap)
if oldHdr.deleted {
consecutiveEvacuate = 0
atomic.AddInt64(&seg.totalTime, -int64(oldHdr.accessTime))
atomic.AddInt64(&seg.totalCount, -1)
seg.vacuumLen += oldEntryLen
continue
}
expired := isExpired(oldHdr.expireAt, now)
leastRecentUsed := int64(oldHdr.accessTime)*atomic.LoadInt64(&seg.totalCount) <= atomic.LoadInt64(&seg.totalTime)
if expired || leastRecentUsed || consecutiveEvacuate > 5 {
seg.delEntryPtrByOffset(oldHdr.slotId, oldHdr.hash16, oldOff)
if oldHdr.slotId == slotId {
slotModified = true
}
consecutiveEvacuate = 0
atomic.AddInt64(&seg.totalTime, -int64(oldHdr.accessTime))
atomic.AddInt64(&seg.totalCount, -1)
seg.vacuumLen += oldEntryLen
if expired {
atomic.AddInt64(&seg.totalExpired, 1)
} else {
atomic.AddInt64(&seg.totalEvacuate, 1)
}
} else {
// evacuate an old entry that has been accessed recently for better cache hit rate.
newOff := seg.rb.Evacuate(oldOff, int(oldEntryLen))
seg.updateEntryPtr(oldHdr.slotId, oldHdr.hash16, oldOff, newOff)
consecutiveEvacuate++
atomic.AddInt64(&seg.totalEvacuate, 1)
}
}
return
}
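// Note on the heuristic above (a restatement, not new behavior): "least
// recently used" is approximated by comparing the candidate's access time
// against the segment-wide mean, since
//
//	accessTime*totalCount <= totalTime  is  accessTime <= totalTime/totalCount
//
// Entries accessed no later than the average are dropped; more recently used
// ones are copied to the head of the ring buffer, and after more than 5
// consecutive copies an eviction is forced to guarantee progress.
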
func (seg *segment) get(key, buf []byte, hashVal uint64, peek bool) (value []byte, expireAt uint32, err error) {
hdr, ptrOffset, err := seg.locate(key, hashVal, peek)
if err != nil {
return
}
expireAt = hdr.expireAt
if cap(buf) >= int(hdr.valLen) {
value = buf[:hdr.valLen]
} else {
value = make([]byte, hdr.valLen)
}
seg.rb.ReadAt(value, ptrOffset+ENTRY_HDR_SIZE+int64(hdr.keyLen))
if !peek {
atomic.AddInt64(&seg.hitCount, 1)
}
return
}
// view provides zero-copy access to the element's value, without copying to
// an intermediate buffer.
func (seg *segment) view(key []byte, fn func([]byte) error, hashVal uint64, peek bool) (err error) {
hdr, ptrOffset, err := seg.locate(key, hashVal, peek)
if err != nil {
return
}
start := ptrOffset + ENTRY_HDR_SIZE + int64(hdr.keyLen)
val, err := seg.rb.Slice(start, int64(hdr.valLen))
if err != nil {
return err
}
err = fn(val)
if !peek {
atomic.AddInt64(&seg.hitCount, 1)
}
return
}
func (seg *segment) locate(key []byte, hashVal uint64, peek bool) (hdrEntry entryHdr, ptrOffset int64, err error) {
slotId := uint8(hashVal >> 8)
hash16 := uint16(hashVal >> 16)
slot := seg.getSlot(slotId)
idx, match := seg.lookup(slot, hash16, key)
if !match {
err = ErrNotFound
if !peek {
atomic.AddInt64(&seg.missCount, 1)
}
return
}
ptr := &slot[idx]
var hdrBuf [ENTRY_HDR_SIZE]byte
seg.rb.ReadAt(hdrBuf[:], ptr.offset)
hdr := (*entryHdr)(unsafe.Pointer(&hdrBuf[0]))
if !peek {
now := seg.timer.Now()
if isExpired(hdr.expireAt, now) {
seg.delEntryPtr(slotId, slot, idx)
atomic.AddInt64(&seg.totalExpired, 1)
err = ErrNotFound
atomic.AddInt64(&seg.missCount, 1)
return
}
atomic.AddInt64(&seg.totalTime, int64(now-hdr.accessTime))
hdr.accessTime = now
seg.rb.WriteAt(hdrBuf[:], ptr.offset)
}
return *hdr, ptr.offset, nil
}
func (seg *segment) del(key []byte, hashVal uint64) (affected bool) {
slotId := uint8(hashVal >> 8)
hash16 := uint16(hashVal >> 16)
slot := seg.getSlot(slotId)
idx, match := seg.lookup(slot, hash16, key)
if !match {
return false
}
seg.delEntryPtr(slotId, slot, idx)
return true
}
func (seg *segment) ttl(key []byte, hashVal uint64) (timeLeft uint32, err error) {
slotId := uint8(hashVal >> 8)
hash16 := uint16(hashVal >> 16)
slot := seg.getSlot(slotId)
idx, match := seg.lookup(slot, hash16, key)
if !match {
err = ErrNotFound
return
}
ptr := &slot[idx]
var hdrBuf [ENTRY_HDR_SIZE]byte
seg.rb.ReadAt(hdrBuf[:], ptr.offset)
hdr := (*entryHdr)(unsafe.Pointer(&hdrBuf[0]))
if hdr.expireAt == 0 {
return
} else {
now := seg.timer.Now()
if !isExpired(hdr.expireAt, now) {
timeLeft = hdr.expireAt - now
return
}
}
err = ErrNotFound
return
}
func (seg *segment) expand() {
newSlotData := make([]entryPtr, seg.slotCap*2*256)
for i := 0; i < 256; i++ {
off := int32(i) * seg.slotCap
copy(newSlotData[off*2:], seg.slotsData[off:off+seg.slotLens[i]])
}
seg.slotCap *= 2
seg.slotsData = newSlotData
}
func (seg *segment) updateEntryPtr(slotId uint8, hash16 uint16, oldOff, newOff int64) {
slot := seg.getSlot(slotId)
idx, match := seg.lookupByOff(slot, hash16, oldOff)
if !match {
return
}
ptr := &slot[idx]
ptr.offset = newOff
}
func (seg *segment) insertEntryPtr(slotId uint8, hash16 uint16, offset int64, idx int, keyLen uint16) {
if seg.slotLens[slotId] == seg.slotCap {
seg.expand()
}
seg.slotLens[slotId]++
atomic.AddInt64(&seg.entryCount, 1)
slot := seg.getSlot(slotId)
copy(slot[idx+1:], slot[idx:])
slot[idx].offset = offset
slot[idx].hash16 = hash16
slot[idx].keyLen = keyLen
}
func (seg *segment) delEntryPtrByOffset(slotId uint8, hash16 uint16, offset int64) {
slot := seg.getSlot(slotId)
idx, match := seg.lookupByOff(slot, hash16, offset)
if !match {
return
}
seg.delEntryPtr(slotId, slot, idx)
}
func (seg *segment) delEntryPtr(slotId uint8, slot []entryPtr, idx int) {
offset := slot[idx].offset
var entryHdrBuf [ENTRY_HDR_SIZE]byte
seg.rb.ReadAt(entryHdrBuf[:], offset)
entryHdr := (*entryHdr)(unsafe.Pointer(&entryHdrBuf[0]))
entryHdr.deleted = true
seg.rb.WriteAt(entryHdrBuf[:], offset)
copy(slot[idx:], slot[idx+1:])
seg.slotLens[slotId]--
atomic.AddInt64(&seg.entryCount, -1)
}
func entryPtrIdx(slot []entryPtr, hash16 uint16) (idx int) {
high := len(slot)
for idx < high {
mid := (idx + high) >> 1
oldEntry := &slot[mid]
if oldEntry.hash16 < hash16 {
idx = mid + 1
} else {
high = mid
}
}
return
}
func (seg *segment) lookup(slot []entryPtr, hash16 uint16, key []byte) (idx int, match bool) {
idx = entryPtrIdx(slot, hash16)
for idx < len(slot) {
ptr := &slot[idx]
if ptr.hash16 != hash16 {
break
}
match = int(ptr.keyLen) == len(key) && seg.rb.EqualAt(key, ptr.offset+ENTRY_HDR_SIZE)
if match {
return
}
idx++
}
return
}
func (seg *segment) lookupByOff(slot []entryPtr, hash16 uint16, offset int64) (idx int, match bool) {
idx = entryPtrIdx(slot, hash16)
for idx < len(slot) {
ptr := &slot[idx]
if ptr.hash16 != hash16 {
break
}
match = ptr.offset == offset
if match {
return
}
idx++
}
return
}
func (seg *segment) resetStatistics() {
atomic.StoreInt64(&seg.totalEvacuate, 0)
atomic.StoreInt64(&seg.totalExpired, 0)
atomic.StoreInt64(&seg.overwrites, 0)
atomic.StoreInt64(&seg.hitCount, 0)
atomic.StoreInt64(&seg.missCount, 0)
}
func (seg *segment) clear() {
bufSize := len(seg.rb.data)
seg.rb.Reset(0)
seg.vacuumLen = int64(bufSize)
seg.slotCap = 1
seg.slotsData = make([]entryPtr, 256*seg.slotCap)
for i := 0; i < len(seg.slotLens); i++ {
seg.slotLens[i] = 0
}
atomic.StoreInt64(&seg.hitCount, 0)
atomic.StoreInt64(&seg.missCount, 0)
atomic.StoreInt64(&seg.entryCount, 0)
atomic.StoreInt64(&seg.totalCount, 0)
atomic.StoreInt64(&seg.totalTime, 0)
atomic.StoreInt64(&seg.totalEvacuate, 0)
atomic.StoreInt64(&seg.totalExpired, 0)
atomic.StoreInt64(&seg.overwrites, 0)
}
func (seg *segment) getSlot(slotId uint8) []entryPtr {
slotOff := int32(slotId) * seg.slotCap
return seg.slotsData[slotOff : slotOff+seg.slotLens[slotId] : slotOff+seg.slotCap]
}
// isExpired checks if a key is expired.
func isExpired(keyExpireAt, now uint32) bool {
return keyExpireAt != 0 && keyExpireAt <= now
}
// A basic freecache server that supports the Redis protocol.
package main
import (
"bufio"
"bytes"
"errors"
"github.com/coocood/freecache"
"io"
"log"
"net"
"net/http"
_ "net/http/pprof"
"runtime"
"runtime/debug"
"strconv"
"time"
)
var (
protocolErr = errors.New("protocol error")
CRLF = []byte("\r\n")
PING = []byte("ping")
DBSIZE = []byte("dbsize")
ERROR_UNSUPPORTED = []byte("-ERR unsupported command\r\n")
OK = []byte("+OK\r\n")
PONG = []byte("+PONG\r\n")
GET = []byte("get")
SET = []byte("set")
SETEX = []byte("setex")
DEL = []byte("del")
NIL = []byte("$-1\r\n")
CZERO = []byte(":0\r\n")
CONE = []byte(":1\r\n")
BulkSign = []byte("$")
)
type Request struct {
args [][]byte
buf *bytes.Buffer
}
func (req *Request) Reset() {
req.args = req.args[:0]
req.buf.Reset()
}
type operation struct {
req Request
replyChan chan *bytes.Buffer
}
type Session struct {
server *Server
conn net.Conn
addr string
reader *bufio.Reader
replyChan chan *bytes.Buffer
}
type Server struct {
cache *freecache.Cache
}
func NewServer(cacheSize int) (server *Server) {
server = new(Server)
server.cache = freecache.NewCache(cacheSize)
return
}
func (server *Server) Start(addr string) error {
l, err := net.Listen("tcp", addr)
if err != nil {
log.Println(err)
return err
}
defer l.Close()
log.Println("Listening on port", addr)
for {
tcpListener := l.(*net.TCPListener)
tcpListener.SetDeadline(time.Now().Add(time.Second))
conn, err := l.Accept()
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Temporary() {
continue
}
return err
}
session := new(Session)
session.conn = conn
session.replyChan = make(chan *bytes.Buffer, 100)
session.addr = conn.RemoteAddr().String()
session.server = server
session.reader = bufio.NewReader(conn)
go session.readLoop()
go session.writeLoop()
}
}
func copyN(buffer *bytes.Buffer, r *bufio.Reader, n int64) (err error) {
if n <= 512 {
var buf [512]byte
		// io.ReadFull guarantees all n bytes are read; a bare r.Read may
		// return fewer bytes than requested.
		_, err = io.ReadFull(r, buf[:n])
if err != nil {
return
}
buffer.Write(buf[:n])
} else {
_, err = io.CopyN(buffer, r, n)
}
return
}
func (server *Server) ReadClient(r *bufio.Reader, req *Request) (err error) {
line, err := readLine(r)
if err != nil {
return
}
if len(line) == 0 || line[0] != '*' {
err = protocolErr
return
}
argc, err := btoi(line[1:])
if err != nil {
return
}
if argc <= 0 || argc > 4 {
err = protocolErr
return
}
var argStarts [4]int
var argEnds [4]int
req.buf.Write(line)
req.buf.Write(CRLF)
cursor := len(line) + 2
for i := 0; i < argc; i++ {
line, err = readLine(r)
if err != nil {
return
}
if len(line) == 0 || line[0] != '$' {
err = protocolErr
return
}
var argLen int
argLen, err = btoi(line[1:])
if err != nil {
return
}
if argLen < 0 || argLen > 512*1024*1024 {
err = protocolErr
return
}
req.buf.Write(line)
req.buf.Write(CRLF)
cursor += len(line) + 2
err = copyN(req.buf, r, int64(argLen)+2)
if err != nil {
return
}
argStarts[i] = cursor
argEnds[i] = cursor + argLen
cursor += argLen + 2
}
data := req.buf.Bytes()
for i := 0; i < argc; i++ {
req.args = append(req.args, data[argStarts[i]:argEnds[i]])
}
lower(req.args[0])
return
}
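// For reference, a "set foo bar" command arrives in Redis protocol (RESP)
// framing, which is what ReadClient parses above:
//
//	*3\r\n
//	$3\r\nset\r\n
//	$3\r\nfoo\r\n
//	$3\r\nbar\r\n
//
// '*' carries the argument count and each '$' line carries the byte length
// of the argument that follows it.
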
func (down *Session) readLoop() {
var req = new(Request)
req.buf = new(bytes.Buffer)
for {
req.Reset()
err := down.server.ReadClient(down.reader, req)
if err != nil {
close(down.replyChan)
return
}
reply := new(bytes.Buffer)
if len(req.args) == 4 && bytes.Equal(req.args[0], SETEX) {
expire, err := btoi(req.args[2])
if err != nil {
reply.Write(ERROR_UNSUPPORTED)
} else {
down.server.cache.Set(req.args[1], req.args[3], expire)
reply.Write(OK)
}
} else if len(req.args) == 3 && bytes.Equal(req.args[0], SET) {
down.server.cache.Set(req.args[1], req.args[2], 0)
reply.Write(OK)
} else if len(req.args) == 2 {
if bytes.Equal(req.args[0], GET) {
value, err := down.server.cache.Get(req.args[1])
if err != nil {
reply.Write(NIL)
} else {
bukLen := strconv.Itoa(len(value))
reply.Write(BulkSign)
reply.WriteString(bukLen)
reply.Write(CRLF)
reply.Write(value)
reply.Write(CRLF)
}
} else if bytes.Equal(req.args[0], DEL) {
if down.server.cache.Del(req.args[1]) {
reply.Write(CONE)
} else {
reply.Write(CZERO)
}
}
} else if len(req.args) == 1 {
if bytes.Equal(req.args[0], PING) {
reply.Write(PONG)
} else if bytes.Equal(req.args[0], DBSIZE) {
entryCount := down.server.cache.EntryCount()
reply.WriteString(":")
reply.WriteString(strconv.Itoa(int(entryCount)))
reply.Write(CRLF)
} else {
reply.Write(ERROR_UNSUPPORTED)
}
}
down.replyChan <- reply
}
}
func (down *Session) writeLoop() {
var buffer = bytes.NewBuffer(nil)
var replies = make([]*bytes.Buffer, 1)
for {
buffer.Reset()
select {
case reply, ok := <-down.replyChan:
if !ok {
down.conn.Close()
return
}
replies = replies[:1]
replies[0] = reply
queueLen := len(down.replyChan)
for i := 0; i < queueLen; i++ {
reply = <-down.replyChan
replies = append(replies, reply)
}
for _, reply := range replies {
if reply == nil {
buffer.Write(NIL)
continue
}
buffer.Write(reply.Bytes())
}
_, err := down.conn.Write(buffer.Bytes())
if err != nil {
down.conn.Close()
return
}
}
}
}
func readLine(r *bufio.Reader) ([]byte, error) {
p, err := r.ReadSlice('\n')
if err != nil {
return nil, err
}
i := len(p) - 2
if i < 0 || p[i] != '\r' {
return nil, protocolErr
}
return p[:i], nil
}
func btoi(data []byte) (int, error) {
if len(data) == 0 {
return 0, nil
}
i := 0
sign := 1
if data[0] == '-' {
i++
sign *= -1
}
if i >= len(data) {
return 0, protocolErr
}
var l int
for ; i < len(data); i++ {
c := data[i]
if c < '0' || c > '9' {
return 0, protocolErr
}
l = l*10 + int(c-'0')
}
return sign * l, nil
}
func lower(data []byte) {
for i := 0; i < len(data); i++ {
if data[i] >= 'A' && data[i] <= 'Z' {
data[i] += 'a' - 'A'
}
}
}
func main() {
runtime.GOMAXPROCS(runtime.NumCPU() - 1)
server := NewServer(256 * 1024 * 1024)
debug.SetGCPercent(10)
go func() {
log.Println(http.ListenAndServe("localhost:6060", nil))
}()
server.Start(":7788")
}
package freecache
import (
"sync/atomic"
"time"
)
// Timer holds a representation of the current time.
type Timer interface {
	// Now returns the current time (in seconds).
Now() uint32
}
// StoppableTimer is a Timer that must be stopped.
type StoppableTimer interface {
Timer
	// Stop releases the resources of the timer; functionality may or may not be affected.
	// It is not called automatically, so the user must call it exactly once.
Stop()
}
// getUnixTime returns the current Unix time in seconds.
func getUnixTime() uint32 {
return uint32(time.Now().Unix())
}
// defaultTimer reads the Unix time anew each time it is requested.
type defaultTimer struct{}
func (timer defaultTimer) Now() uint32 {
return getUnixTime()
}
// cachedTimer stores the Unix time, refreshed every second, and returns the cached value.
type cachedTimer struct {
now uint32
ticker *time.Ticker
done chan bool
}
// NewCachedTimer creates a cached timer and starts a goroutine that updates the cached time every second.
func NewCachedTimer() StoppableTimer {
timer := &cachedTimer{
now: getUnixTime(),
ticker: time.NewTicker(time.Second),
done: make(chan bool),
}
go timer.update()
return timer
}
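// Usage sketch (illustrative): trading up to a second of timestamp accuracy
// for cheaper time reads on hot paths. Stop must be called exactly once when
// the timer is no longer needed.
//
//	timer := freecache.NewCachedTimer()
//	cache := freecache.NewCacheCustomTimer(256*1024*1024, timer)
//	defer timer.Stop()
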
func (timer *cachedTimer) Now() uint32 {
return atomic.LoadUint32(&timer.now)
}
// Stop stops the ticker and terminates the goroutine that updates the time.
func (timer *cachedTimer) Stop() {
timer.ticker.Stop()
timer.done <- true
close(timer.done)
timer.done = nil
timer.ticker = nil
}
// update periodically refreshes the cached time.
func (timer *cachedTimer) update() {
for {
select {
case <-timer.done:
return
case <-timer.ticker.C:
atomic.StoreUint32(&timer.now, getUnixTime())
}
}
}