package freecache import ( "encoding/binary" "sync" "sync/atomic" "github.com/cespare/xxhash/v2" ) const ( // segmentCount represents the number of segments within a freecache instance. segmentCount = 256 // segmentAndOpVal is bitwise AND applied to the hashVal to find the segment id. segmentAndOpVal = 255 minBufSize = 512 * 1024 ) // Cache is a freecache instance. type Cache struct { locks [segmentCount]sync.Mutex segments [segmentCount]segment } type Updater func(value []byte, found bool) (newValue []byte, replace bool, expireSeconds int) func hashFunc(data []byte) uint64 { return xxhash.Sum64(data) } // NewCache returns a newly initialize cache by size. // The cache size will be set to 512KB at minimum. // If the size is set relatively large, you should call // `debug.SetGCPercent()`, set it to a much smaller value // to limit the memory consumption and GC pause time. func NewCache(size int) (cache *Cache) { return NewCacheCustomTimer(size, defaultTimer{}) } // NewCacheCustomTimer returns new cache with custom timer. func NewCacheCustomTimer(size int, timer Timer) (cache *Cache) { if size < minBufSize { size = minBufSize } if timer == nil { timer = defaultTimer{} } cache = new(Cache) for i := 0; i < segmentCount; i++ { cache.segments[i] = newSegment(size/segmentCount, i, timer) } return } // Set sets a key, value and expiration for a cache entry and stores it in the cache. // If the key is larger than 65535 or value is larger than 1/1024 of the cache size, // the entry will not be written to the cache. expireSeconds <= 0 means no expire, // but it can be evicted when cache is full. func (cache *Cache) Set(key, value []byte, expireSeconds int) (err error) { hashVal := hashFunc(key) segID := hashVal & segmentAndOpVal cache.locks[segID].Lock() err = cache.segments[segID].set(key, value, hashVal, expireSeconds) cache.locks[segID].Unlock() return } // Touch updates the expiration time of an existing key. expireSeconds <= 0 means no expire, // but it can be evicted when cache is full. func (cache *Cache) Touch(key []byte, expireSeconds int) (err error) { hashVal := hashFunc(key) segID := hashVal & segmentAndOpVal cache.locks[segID].Lock() err = cache.segments[segID].touch(key, hashVal, expireSeconds) cache.locks[segID].Unlock() return } // Get returns the value or not found error. func (cache *Cache) Get(key []byte) (value []byte, err error) { hashVal := hashFunc(key) segID := hashVal & segmentAndOpVal cache.locks[segID].Lock() value, _, err = cache.segments[segID].get(key, nil, hashVal, false) cache.locks[segID].Unlock() return } // GetFn is equivalent to Get or GetWithBuf, but it attempts to be zero-copy, // calling the provided function with slice view over the current underlying // value of the key in memory. The slice is constrained in length and capacity. // // In moth cases, this method will not alloc a byte buffer. The only exception // is when the value wraps around the underlying segment ring buffer. // // The method will return ErrNotFound is there's a miss, and the function will // not be called. Errors returned by the function will be propagated. func (cache *Cache) GetFn(key []byte, fn func([]byte) error) (err error) { hashVal := hashFunc(key) segID := hashVal & segmentAndOpVal cache.locks[segID].Lock() err = cache.segments[segID].view(key, fn, hashVal, false) cache.locks[segID].Unlock() return } // GetOrSet returns existing value or if record doesn't exist // it sets a new key, value and expiration for a cache entry and stores it in the cache, returns nil in that case func (cache *Cache) GetOrSet(key, value []byte, expireSeconds int) (retValue []byte, err error) { hashVal := hashFunc(key) segID := hashVal & segmentAndOpVal cache.locks[segID].Lock() defer cache.locks[segID].Unlock() retValue, _, err = cache.segments[segID].get(key, nil, hashVal, false) if err != nil { err = cache.segments[segID].set(key, value, hashVal, expireSeconds) } return } // SetAndGet sets a key, value and expiration for a cache entry and stores it in the cache. // If the key is larger than 65535 or value is larger than 1/1024 of the cache size, // the entry will not be written to the cache. expireSeconds <= 0 means no expire, // but it can be evicted when cache is full. Returns existing value if record exists // with a bool value to indicate whether an existing record was found func (cache *Cache) SetAndGet(key, value []byte, expireSeconds int) (retValue []byte, found bool, err error) { hashVal := hashFunc(key) segID := hashVal & segmentAndOpVal cache.locks[segID].Lock() defer cache.locks[segID].Unlock() retValue, _, err = cache.segments[segID].get(key, nil, hashVal, false) if err == nil { found = true } err = cache.segments[segID].set(key, value, hashVal, expireSeconds) return } // Update gets value for a key, passes it to updater function that decides if set should be called as well // This allows for an atomic Get plus Set call using the existing value to decide on whether to call Set. // If the key is larger than 65535 or value is larger than 1/1024 of the cache size, // the entry will not be written to the cache. expireSeconds <= 0 means no expire, // but it can be evicted when cache is full. Returns bool value to indicate if existing record was found along with bool // value indicating the value was replaced and error if any func (cache *Cache) Update(key []byte, updater Updater) (found bool, replaced bool, err error) { hashVal := hashFunc(key) segID := hashVal & segmentAndOpVal cache.locks[segID].Lock() defer cache.locks[segID].Unlock() retValue, _, err := cache.segments[segID].get(key, nil, hashVal, false) if err == nil { found = true } else { err = nil // Clear ErrNotFound error since we're returning found flag } value, replaced, expireSeconds := updater(retValue, found) if !replaced { return } err = cache.segments[segID].set(key, value, hashVal, expireSeconds) return } // Peek returns the value or not found error, without updating access time or counters. // Warning: No expiry check is performed so if an expired value is found, it will be // returned without error func (cache *Cache) Peek(key []byte) (value []byte, err error) { hashVal := hashFunc(key) segID := hashVal & segmentAndOpVal cache.locks[segID].Lock() value, _, err = cache.segments[segID].get(key, nil, hashVal, true) cache.locks[segID].Unlock() return } // PeekFn is equivalent to Peek, but it attempts to be zero-copy, calling the // provided function with slice view over the current underlying value of the // key in memory. The slice is constrained in length and capacity. // // In most cases, this method will not alloc a byte buffer. The only exception // is when the value wraps around the underlying segment ring buffer. // // The method will return ErrNotFound is there's a miss, and the function will // not be called. Errors returned by the function will be propagated. // Warning: No expiry check is performed so if an expired value is found, it will be // returned without error func (cache *Cache) PeekFn(key []byte, fn func([]byte) error) (err error) { hashVal := hashFunc(key) segID := hashVal & segmentAndOpVal cache.locks[segID].Lock() err = cache.segments[segID].view(key, fn, hashVal, true) cache.locks[segID].Unlock() return } // GetWithBuf copies the value to the buf or returns not found error. // This method doesn't allocate memory when the capacity of buf is greater or equal to value. func (cache *Cache) GetWithBuf(key, buf []byte) (value []byte, err error) { hashVal := hashFunc(key) segID := hashVal & segmentAndOpVal cache.locks[segID].Lock() value, _, err = cache.segments[segID].get(key, buf, hashVal, false) cache.locks[segID].Unlock() return } // GetWithExpiration returns the value with expiration or not found error. func (cache *Cache) GetWithExpiration(key []byte) (value []byte, expireAt uint32, err error) { hashVal := hashFunc(key) segID := hashVal & segmentAndOpVal cache.locks[segID].Lock() value, expireAt, err = cache.segments[segID].get(key, nil, hashVal, false) cache.locks[segID].Unlock() return } // TTL returns the TTL time left for a given key or a not found error. func (cache *Cache) TTL(key []byte) (timeLeft uint32, err error) { hashVal := hashFunc(key) segID := hashVal & segmentAndOpVal cache.locks[segID].Lock() timeLeft, err = cache.segments[segID].ttl(key, hashVal) cache.locks[segID].Unlock() return } // Del deletes an item in the cache by key and returns true or false if a delete occurred. func (cache *Cache) Del(key []byte) (affected bool) { hashVal := hashFunc(key) segID := hashVal & segmentAndOpVal cache.locks[segID].Lock() affected = cache.segments[segID].del(key, hashVal) cache.locks[segID].Unlock() return } // SetInt stores in integer value in the cache. func (cache *Cache) SetInt(key int64, value []byte, expireSeconds int) (err error) { var bKey [8]byte binary.LittleEndian.PutUint64(bKey[:], uint64(key)) return cache.Set(bKey[:], value, expireSeconds) } // GetInt returns the value for an integer within the cache or a not found error. func (cache *Cache) GetInt(key int64) (value []byte, err error) { var bKey [8]byte binary.LittleEndian.PutUint64(bKey[:], uint64(key)) return cache.Get(bKey[:]) } // GetIntWithExpiration returns the value and expiration or a not found error. func (cache *Cache) GetIntWithExpiration(key int64) (value []byte, expireAt uint32, err error) { var bKey [8]byte binary.LittleEndian.PutUint64(bKey[:], uint64(key)) return cache.GetWithExpiration(bKey[:]) } // DelInt deletes an item in the cache by int key and returns true or false if a delete occurred. func (cache *Cache) DelInt(key int64) (affected bool) { var bKey [8]byte binary.LittleEndian.PutUint64(bKey[:], uint64(key)) return cache.Del(bKey[:]) } // EvacuateCount is a metric indicating the number of times an eviction occurred. func (cache *Cache) EvacuateCount() (count int64) { for i := range cache.segments { count += atomic.LoadInt64(&cache.segments[i].totalEvacuate) } return } // ExpiredCount is a metric indicating the number of times an expire occurred. func (cache *Cache) ExpiredCount() (count int64) { for i := range cache.segments { count += atomic.LoadInt64(&cache.segments[i].totalExpired) } return } // EntryCount returns the number of items currently in the cache. func (cache *Cache) EntryCount() (entryCount int64) { for i := range cache.segments { entryCount += atomic.LoadInt64(&cache.segments[i].entryCount) } return } // AverageAccessTime returns the average unix timestamp when a entry being accessed. // Entries have greater access time will be evacuated when it // is about to be overwritten by new value. func (cache *Cache) AverageAccessTime() int64 { var entryCount, totalTime int64 for i := range cache.segments { totalTime += atomic.LoadInt64(&cache.segments[i].totalTime) entryCount += atomic.LoadInt64(&cache.segments[i].totalCount) } if entryCount == 0 { return 0 } else { return totalTime / entryCount } } // HitCount is a metric that returns number of times a key was found in the cache. func (cache *Cache) HitCount() (count int64) { for i := range cache.segments { count += atomic.LoadInt64(&cache.segments[i].hitCount) } return } // MissCount is a metric that returns the number of times a miss occurred in the cache. func (cache *Cache) MissCount() (count int64) { for i := range cache.segments { count += atomic.LoadInt64(&cache.segments[i].missCount) } return } // LookupCount is a metric that returns the number of times a lookup for a given key occurred. func (cache *Cache) LookupCount() int64 { return cache.HitCount() + cache.MissCount() } // HitRate is the ratio of hits over lookups. func (cache *Cache) HitRate() float64 { hitCount, missCount := cache.HitCount(), cache.MissCount() lookupCount := hitCount + missCount if lookupCount == 0 { return 0 } else { return float64(hitCount) / float64(lookupCount) } } // OverwriteCount indicates the number of times entries have been overridden. func (cache *Cache) OverwriteCount() (overwriteCount int64) { for i := range cache.segments { overwriteCount += atomic.LoadInt64(&cache.segments[i].overwrites) } return } // TouchedCount indicates the number of times entries have had their expiration time extended. func (cache *Cache) TouchedCount() (touchedCount int64) { for i := range cache.segments { touchedCount += atomic.LoadInt64(&cache.segments[i].touched) } return } // Clear clears the cache. func (cache *Cache) Clear() { for i := range cache.segments { cache.locks[i].Lock() cache.segments[i].clear() cache.locks[i].Unlock() } } // ResetStatistics refreshes the current state of the statistics. func (cache *Cache) ResetStatistics() { for i := range cache.segments { cache.locks[i].Lock() cache.segments[i].resetStatistics() cache.locks[i].Unlock() } }
package freecache import ( "unsafe" ) // Iterator iterates the entries for the cache. type Iterator struct { cache *Cache segmentIdx int slotIdx int entryIdx int } // Entry represents a key/value pair. type Entry struct { Key []byte Value []byte ExpireAt uint32 } // Next returns the next entry for the iterator. // The order of the entries is not guaranteed. // If there is no more entries to return, nil will be returned. func (it *Iterator) Next() *Entry { for it.segmentIdx < 256 { entry := it.nextForSegment(it.segmentIdx) if entry != nil { return entry } it.segmentIdx++ it.slotIdx = 0 it.entryIdx = 0 } return nil } func (it *Iterator) nextForSegment(segIdx int) *Entry { it.cache.locks[segIdx].Lock() defer it.cache.locks[segIdx].Unlock() seg := &it.cache.segments[segIdx] for it.slotIdx < 256 { entry := it.nextForSlot(seg, it.slotIdx) if entry != nil { return entry } it.slotIdx++ it.entryIdx = 0 } return nil } func (it *Iterator) nextForSlot(seg *segment, slotId int) *Entry { slotOff := int32(it.slotIdx) * seg.slotCap slot := seg.slotsData[slotOff : slotOff+seg.slotLens[it.slotIdx] : slotOff+seg.slotCap] for it.entryIdx < len(slot) { ptr := slot[it.entryIdx] it.entryIdx++ now := seg.timer.Now() var hdrBuf [ENTRY_HDR_SIZE]byte seg.rb.ReadAt(hdrBuf[:], ptr.offset) hdr := (*entryHdr)(unsafe.Pointer(&hdrBuf[0])) if hdr.expireAt == 0 || hdr.expireAt > now { entry := new(Entry) entry.Key = make([]byte, hdr.keyLen) entry.Value = make([]byte, hdr.valLen) entry.ExpireAt = hdr.expireAt seg.rb.ReadAt(entry.Key, ptr.offset+ENTRY_HDR_SIZE) seg.rb.ReadAt(entry.Value, ptr.offset+ENTRY_HDR_SIZE+int64(hdr.keyLen)) return entry } } return nil } // NewIterator creates a new iterator for the cache. func (cache *Cache) NewIterator() *Iterator { return &Iterator{ cache: cache, } }
package freecache import ( "bytes" "errors" "fmt" "io" ) var ErrOutOfRange = errors.New("out of range") // Ring buffer has a fixed size, when data exceeds the // size, old data will be overwritten by new data. // It only contains the data in the stream from begin to end type RingBuf struct { begin int64 // beginning offset of the data stream. end int64 // ending offset of the data stream. data []byte index int //range from '0' to 'len(rb.data)-1' } func NewRingBuf(size int, begin int64) (rb RingBuf) { rb.data = make([]byte, size) rb.Reset(begin) return } // Reset the ring buffer // // Parameters: // begin: beginning offset of the data stream func (rb *RingBuf) Reset(begin int64) { rb.begin = begin rb.end = begin rb.index = 0 } // Create a copy of the buffer. func (rb *RingBuf) Dump() []byte { dump := make([]byte, len(rb.data)) copy(dump, rb.data) return dump } func (rb *RingBuf) String() string { return fmt.Sprintf("[size:%v, start:%v, end:%v, index:%v]", len(rb.data), rb.begin, rb.end, rb.index) } func (rb *RingBuf) Size() int64 { return int64(len(rb.data)) } func (rb *RingBuf) Begin() int64 { return rb.begin } func (rb *RingBuf) End() int64 { return rb.end } // read up to len(p), at off of the data stream. func (rb *RingBuf) ReadAt(p []byte, off int64) (n int, err error) { if off > rb.end || off < rb.begin { err = ErrOutOfRange return } readOff := rb.getDataOff(off) readEnd := readOff + int(rb.end-off) if readEnd <= len(rb.data) { n = copy(p, rb.data[readOff:readEnd]) } else { n = copy(p, rb.data[readOff:]) if n < len(p) { n += copy(p[n:], rb.data[:readEnd-len(rb.data)]) } } if n < len(p) { err = io.EOF } return } func (rb *RingBuf) getDataOff(off int64) int { var dataOff int if rb.end-rb.begin < int64(len(rb.data)) { dataOff = int(off - rb.begin) } else { dataOff = rb.index + int(off-rb.begin) } if dataOff >= len(rb.data) { dataOff -= len(rb.data) } return dataOff } // Slice returns a slice of the supplied range of the ring buffer. It will // not alloc unless the requested range wraps the ring buffer. func (rb *RingBuf) Slice(off, length int64) ([]byte, error) { if off > rb.end || off < rb.begin { return nil, ErrOutOfRange } readOff := rb.getDataOff(off) readEnd := readOff + int(length) if readEnd <= len(rb.data) { return rb.data[readOff:readEnd:readEnd], nil } buf := make([]byte, length) n := copy(buf, rb.data[readOff:]) if n < int(length) { n += copy(buf[n:], rb.data[:readEnd-len(rb.data)]) } if n < int(length) { return nil, io.EOF } return buf, nil } func (rb *RingBuf) Write(p []byte) (n int, err error) { if len(p) > len(rb.data) { err = ErrOutOfRange return } for n < len(p) { written := copy(rb.data[rb.index:], p[n:]) rb.end += int64(written) n += written rb.index += written if rb.index >= len(rb.data) { rb.index -= len(rb.data) } } if int(rb.end-rb.begin) > len(rb.data) { rb.begin = rb.end - int64(len(rb.data)) } return } func (rb *RingBuf) WriteAt(p []byte, off int64) (n int, err error) { if off+int64(len(p)) > rb.end || off < rb.begin { err = ErrOutOfRange return } writeOff := rb.getDataOff(off) writeEnd := writeOff + int(rb.end-off) if writeEnd <= len(rb.data) { n = copy(rb.data[writeOff:writeEnd], p) } else { n = copy(rb.data[writeOff:], p) if n < len(p) { n += copy(rb.data[:writeEnd-len(rb.data)], p[n:]) } } return } func (rb *RingBuf) EqualAt(p []byte, off int64) bool { if off+int64(len(p)) > rb.end || off < rb.begin { return false } readOff := rb.getDataOff(off) readEnd := readOff + len(p) if readEnd <= len(rb.data) { return bytes.Equal(p, rb.data[readOff:readEnd]) } else { firstLen := len(rb.data) - readOff equal := bytes.Equal(p[:firstLen], rb.data[readOff:]) if equal { secondLen := len(p) - firstLen equal = bytes.Equal(p[firstLen:], rb.data[:secondLen]) } return equal } } // Evacuate read the data at off, then write it to the the data stream, // Keep it from being overwritten by new data. func (rb *RingBuf) Evacuate(off int64, length int) (newOff int64) { if off+int64(length) > rb.end || off < rb.begin { return -1 } readOff := rb.getDataOff(off) if readOff == rb.index { // no copy evacuate rb.index += length if rb.index >= len(rb.data) { rb.index -= len(rb.data) } } else if readOff < rb.index { var n = copy(rb.data[rb.index:], rb.data[readOff:readOff+length]) rb.index += n if rb.index == len(rb.data) { rb.index = copy(rb.data, rb.data[readOff+n:readOff+length]) } } else { var readEnd = readOff + length var n int if readEnd <= len(rb.data) { n = copy(rb.data[rb.index:], rb.data[readOff:readEnd]) rb.index += n } else { n = copy(rb.data[rb.index:], rb.data[readOff:]) rb.index += n var tail = length - n n = copy(rb.data[rb.index:], rb.data[:tail]) rb.index += n if rb.index == len(rb.data) { rb.index = copy(rb.data, rb.data[n:tail]) } } } newOff = rb.end rb.end += int64(length) if rb.begin < rb.end-int64(len(rb.data)) { rb.begin = rb.end - int64(len(rb.data)) } return } func (rb *RingBuf) Resize(newSize int) { if len(rb.data) == newSize { return } newData := make([]byte, newSize) var offset int if rb.end-rb.begin == int64(len(rb.data)) { offset = rb.index } if int(rb.end-rb.begin) > newSize { discard := int(rb.end-rb.begin) - newSize offset = (offset + discard) % len(rb.data) rb.begin = rb.end - int64(newSize) } n := copy(newData, rb.data[offset:]) if n < newSize { copy(newData[n:], rb.data[:offset]) } rb.data = newData rb.index = 0 } func (rb *RingBuf) Skip(length int64) { rb.end += length rb.index += int(length) for rb.index >= len(rb.data) { rb.index -= len(rb.data) } if int(rb.end-rb.begin) > len(rb.data) { rb.begin = rb.end - int64(len(rb.data)) } }
package freecache import ( "errors" "sync/atomic" "unsafe" ) const HASH_ENTRY_SIZE = 16 const ENTRY_HDR_SIZE = 24 var ErrLargeKey = errors.New("The key is larger than 65535") var ErrLargeEntry = errors.New("The entry size is larger than 1/1024 of cache size") var ErrNotFound = errors.New("Entry not found") // entry pointer struct points to an entry in ring buffer type entryPtr struct { offset int64 // entry offset in ring buffer hash16 uint16 // entries are ordered by hash16 in a slot. keyLen uint16 // used to compare a key reserved uint32 } // entry header struct in ring buffer, followed by key and value. type entryHdr struct { accessTime uint32 expireAt uint32 keyLen uint16 hash16 uint16 valLen uint32 valCap uint32 deleted bool slotId uint8 reserved uint16 } // a segment contains 256 slots, a slot is an array of entry pointers ordered by hash16 value // the entry can be looked up by hash value of the key. type segment struct { rb RingBuf // ring buffer that stores data segId int _ uint32 missCount int64 hitCount int64 entryCount int64 totalCount int64 // number of entries in ring buffer, including deleted entries. totalTime int64 // used to calculate least recent used entry. timer Timer // Timer giving current time totalEvacuate int64 // used for debug totalExpired int64 // used for debug overwrites int64 // used for debug touched int64 // used for debug vacuumLen int64 // up to vacuumLen, new data can be written without overwriting old data. slotLens [256]int32 // The actual length for every slot. slotCap int32 // max number of entry pointers a slot can hold. slotsData []entryPtr // shared by all 256 slots } func newSegment(bufSize int, segId int, timer Timer) (seg segment) { seg.rb = NewRingBuf(bufSize, 0) seg.segId = segId seg.timer = timer seg.vacuumLen = int64(bufSize) seg.slotCap = 1 seg.slotsData = make([]entryPtr, 256*seg.slotCap) return } func (seg *segment) set(key, value []byte, hashVal uint64, expireSeconds int) (err error) { if len(key) > 65535 { return ErrLargeKey } maxKeyValLen := len(seg.rb.data)/4 - ENTRY_HDR_SIZE if len(key)+len(value) > maxKeyValLen { // Do not accept large entry. return ErrLargeEntry } now := seg.timer.Now() expireAt := uint32(0) if expireSeconds > 0 { expireAt = now + uint32(expireSeconds) } slotId := uint8(hashVal >> 8) hash16 := uint16(hashVal >> 16) slot := seg.getSlot(slotId) idx, match := seg.lookup(slot, hash16, key) var hdrBuf [ENTRY_HDR_SIZE]byte hdr := (*entryHdr)(unsafe.Pointer(&hdrBuf[0])) if match { matchedPtr := &slot[idx] seg.rb.ReadAt(hdrBuf[:], matchedPtr.offset) hdr.slotId = slotId hdr.hash16 = hash16 hdr.keyLen = uint16(len(key)) originAccessTime := hdr.accessTime hdr.accessTime = now hdr.expireAt = expireAt hdr.valLen = uint32(len(value)) if hdr.valCap >= hdr.valLen { // in place overwrite atomic.AddInt64(&seg.totalTime, int64(hdr.accessTime)-int64(originAccessTime)) seg.rb.WriteAt(hdrBuf[:], matchedPtr.offset) seg.rb.WriteAt(value, matchedPtr.offset+ENTRY_HDR_SIZE+int64(hdr.keyLen)) atomic.AddInt64(&seg.overwrites, 1) return } // avoid unnecessary memory copy. seg.delEntryPtr(slotId, slot, idx) match = false // increase capacity and limit entry len. for hdr.valCap < hdr.valLen { hdr.valCap *= 2 } if hdr.valCap > uint32(maxKeyValLen-len(key)) { hdr.valCap = uint32(maxKeyValLen - len(key)) } } else { hdr.slotId = slotId hdr.hash16 = hash16 hdr.keyLen = uint16(len(key)) hdr.accessTime = now hdr.expireAt = expireAt hdr.valLen = uint32(len(value)) hdr.valCap = uint32(len(value)) if hdr.valCap == 0 { // avoid infinite loop when increasing capacity. hdr.valCap = 1 } } entryLen := ENTRY_HDR_SIZE + int64(len(key)) + int64(hdr.valCap) slotModified := seg.evacuate(entryLen, slotId, now) if slotModified { // the slot has been modified during evacuation, we need to looked up for the 'idx' again. // otherwise there would be index out of bound error. slot = seg.getSlot(slotId) idx, match = seg.lookup(slot, hash16, key) // assert(match == false) } newOff := seg.rb.End() seg.insertEntryPtr(slotId, hash16, newOff, idx, hdr.keyLen) seg.rb.Write(hdrBuf[:]) seg.rb.Write(key) seg.rb.Write(value) seg.rb.Skip(int64(hdr.valCap - hdr.valLen)) atomic.AddInt64(&seg.totalTime, int64(now)) atomic.AddInt64(&seg.totalCount, 1) seg.vacuumLen -= entryLen return } func (seg *segment) touch(key []byte, hashVal uint64, expireSeconds int) (err error) { if len(key) > 65535 { return ErrLargeKey } slotId := uint8(hashVal >> 8) hash16 := uint16(hashVal >> 16) slot := seg.getSlot(slotId) idx, match := seg.lookup(slot, hash16, key) if !match { err = ErrNotFound return } matchedPtr := &slot[idx] var hdrBuf [ENTRY_HDR_SIZE]byte seg.rb.ReadAt(hdrBuf[:], matchedPtr.offset) hdr := (*entryHdr)(unsafe.Pointer(&hdrBuf[0])) now := seg.timer.Now() if isExpired(hdr.expireAt, now) { seg.delEntryPtr(slotId, slot, idx) atomic.AddInt64(&seg.totalExpired, 1) err = ErrNotFound atomic.AddInt64(&seg.missCount, 1) return } expireAt := uint32(0) if expireSeconds > 0 { expireAt = now + uint32(expireSeconds) } originAccessTime := hdr.accessTime hdr.accessTime = now hdr.expireAt = expireAt // in place overwrite atomic.AddInt64(&seg.totalTime, int64(hdr.accessTime)-int64(originAccessTime)) seg.rb.WriteAt(hdrBuf[:], matchedPtr.offset) atomic.AddInt64(&seg.touched, 1) return } func (seg *segment) evacuate(entryLen int64, slotId uint8, now uint32) (slotModified bool) { var oldHdrBuf [ENTRY_HDR_SIZE]byte consecutiveEvacuate := 0 for seg.vacuumLen < entryLen { oldOff := seg.rb.End() + seg.vacuumLen - seg.rb.Size() seg.rb.ReadAt(oldHdrBuf[:], oldOff) oldHdr := (*entryHdr)(unsafe.Pointer(&oldHdrBuf[0])) oldEntryLen := ENTRY_HDR_SIZE + int64(oldHdr.keyLen) + int64(oldHdr.valCap) if oldHdr.deleted { consecutiveEvacuate = 0 atomic.AddInt64(&seg.totalTime, -int64(oldHdr.accessTime)) atomic.AddInt64(&seg.totalCount, -1) seg.vacuumLen += oldEntryLen continue } expired := isExpired(oldHdr.expireAt, now) leastRecentUsed := int64(oldHdr.accessTime)*atomic.LoadInt64(&seg.totalCount) <= atomic.LoadInt64(&seg.totalTime) if expired || leastRecentUsed || consecutiveEvacuate > 5 { seg.delEntryPtrByOffset(oldHdr.slotId, oldHdr.hash16, oldOff) if oldHdr.slotId == slotId { slotModified = true } consecutiveEvacuate = 0 atomic.AddInt64(&seg.totalTime, -int64(oldHdr.accessTime)) atomic.AddInt64(&seg.totalCount, -1) seg.vacuumLen += oldEntryLen if expired { atomic.AddInt64(&seg.totalExpired, 1) } else { atomic.AddInt64(&seg.totalEvacuate, 1) } } else { // evacuate an old entry that has been accessed recently for better cache hit rate. newOff := seg.rb.Evacuate(oldOff, int(oldEntryLen)) seg.updateEntryPtr(oldHdr.slotId, oldHdr.hash16, oldOff, newOff) consecutiveEvacuate++ atomic.AddInt64(&seg.totalEvacuate, 1) } } return } func (seg *segment) get(key, buf []byte, hashVal uint64, peek bool) (value []byte, expireAt uint32, err error) { hdr, ptrOffset, err := seg.locate(key, hashVal, peek) if err != nil { return } expireAt = hdr.expireAt if cap(buf) >= int(hdr.valLen) { value = buf[:hdr.valLen] } else { value = make([]byte, hdr.valLen) } seg.rb.ReadAt(value, ptrOffset+ENTRY_HDR_SIZE+int64(hdr.keyLen)) if !peek { atomic.AddInt64(&seg.hitCount, 1) } return } // view provides zero-copy access to the element's value, without copying to // an intermediate buffer. func (seg *segment) view(key []byte, fn func([]byte) error, hashVal uint64, peek bool) (err error) { hdr, ptrOffset, err := seg.locate(key, hashVal, peek) if err != nil { return } start := ptrOffset + ENTRY_HDR_SIZE + int64(hdr.keyLen) val, err := seg.rb.Slice(start, int64(hdr.valLen)) if err != nil { return err } err = fn(val) if !peek { atomic.AddInt64(&seg.hitCount, 1) } return } func (seg *segment) locate(key []byte, hashVal uint64, peek bool) (hdrEntry entryHdr, ptrOffset int64, err error) { slotId := uint8(hashVal >> 8) hash16 := uint16(hashVal >> 16) slot := seg.getSlot(slotId) idx, match := seg.lookup(slot, hash16, key) if !match { err = ErrNotFound if !peek { atomic.AddInt64(&seg.missCount, 1) } return } ptr := &slot[idx] var hdrBuf [ENTRY_HDR_SIZE]byte seg.rb.ReadAt(hdrBuf[:], ptr.offset) hdr := (*entryHdr)(unsafe.Pointer(&hdrBuf[0])) if !peek { now := seg.timer.Now() if isExpired(hdr.expireAt, now) { seg.delEntryPtr(slotId, slot, idx) atomic.AddInt64(&seg.totalExpired, 1) err = ErrNotFound atomic.AddInt64(&seg.missCount, 1) return } atomic.AddInt64(&seg.totalTime, int64(now-hdr.accessTime)) hdr.accessTime = now seg.rb.WriteAt(hdrBuf[:], ptr.offset) } return *hdr, ptr.offset, nil } func (seg *segment) del(key []byte, hashVal uint64) (affected bool) { slotId := uint8(hashVal >> 8) hash16 := uint16(hashVal >> 16) slot := seg.getSlot(slotId) idx, match := seg.lookup(slot, hash16, key) if !match { return false } seg.delEntryPtr(slotId, slot, idx) return true } func (seg *segment) ttl(key []byte, hashVal uint64) (timeLeft uint32, err error) { slotId := uint8(hashVal >> 8) hash16 := uint16(hashVal >> 16) slot := seg.getSlot(slotId) idx, match := seg.lookup(slot, hash16, key) if !match { err = ErrNotFound return } ptr := &slot[idx] var hdrBuf [ENTRY_HDR_SIZE]byte seg.rb.ReadAt(hdrBuf[:], ptr.offset) hdr := (*entryHdr)(unsafe.Pointer(&hdrBuf[0])) if hdr.expireAt == 0 { return } else { now := seg.timer.Now() if !isExpired(hdr.expireAt, now) { timeLeft = hdr.expireAt - now return } } err = ErrNotFound return } func (seg *segment) expand() { newSlotData := make([]entryPtr, seg.slotCap*2*256) for i := 0; i < 256; i++ { off := int32(i) * seg.slotCap copy(newSlotData[off*2:], seg.slotsData[off:off+seg.slotLens[i]]) } seg.slotCap *= 2 seg.slotsData = newSlotData } func (seg *segment) updateEntryPtr(slotId uint8, hash16 uint16, oldOff, newOff int64) { slot := seg.getSlot(slotId) idx, match := seg.lookupByOff(slot, hash16, oldOff) if !match { return } ptr := &slot[idx] ptr.offset = newOff } func (seg *segment) insertEntryPtr(slotId uint8, hash16 uint16, offset int64, idx int, keyLen uint16) { if seg.slotLens[slotId] == seg.slotCap { seg.expand() } seg.slotLens[slotId]++ atomic.AddInt64(&seg.entryCount, 1) slot := seg.getSlot(slotId) copy(slot[idx+1:], slot[idx:]) slot[idx].offset = offset slot[idx].hash16 = hash16 slot[idx].keyLen = keyLen } func (seg *segment) delEntryPtrByOffset(slotId uint8, hash16 uint16, offset int64) { slot := seg.getSlot(slotId) idx, match := seg.lookupByOff(slot, hash16, offset) if !match { return } seg.delEntryPtr(slotId, slot, idx) } func (seg *segment) delEntryPtr(slotId uint8, slot []entryPtr, idx int) { offset := slot[idx].offset var entryHdrBuf [ENTRY_HDR_SIZE]byte seg.rb.ReadAt(entryHdrBuf[:], offset) entryHdr := (*entryHdr)(unsafe.Pointer(&entryHdrBuf[0])) entryHdr.deleted = true seg.rb.WriteAt(entryHdrBuf[:], offset) copy(slot[idx:], slot[idx+1:]) seg.slotLens[slotId]-- atomic.AddInt64(&seg.entryCount, -1) } func entryPtrIdx(slot []entryPtr, hash16 uint16) (idx int) { high := len(slot) for idx < high { mid := (idx + high) >> 1 oldEntry := &slot[mid] if oldEntry.hash16 < hash16 { idx = mid + 1 } else { high = mid } } return } func (seg *segment) lookup(slot []entryPtr, hash16 uint16, key []byte) (idx int, match bool) { idx = entryPtrIdx(slot, hash16) for idx < len(slot) { ptr := &slot[idx] if ptr.hash16 != hash16 { break } match = int(ptr.keyLen) == len(key) && seg.rb.EqualAt(key, ptr.offset+ENTRY_HDR_SIZE) if match { return } idx++ } return } func (seg *segment) lookupByOff(slot []entryPtr, hash16 uint16, offset int64) (idx int, match bool) { idx = entryPtrIdx(slot, hash16) for idx < len(slot) { ptr := &slot[idx] if ptr.hash16 != hash16 { break } match = ptr.offset == offset if match { return } idx++ } return } func (seg *segment) resetStatistics() { atomic.StoreInt64(&seg.totalEvacuate, 0) atomic.StoreInt64(&seg.totalExpired, 0) atomic.StoreInt64(&seg.overwrites, 0) atomic.StoreInt64(&seg.hitCount, 0) atomic.StoreInt64(&seg.missCount, 0) } func (seg *segment) clear() { bufSize := len(seg.rb.data) seg.rb.Reset(0) seg.vacuumLen = int64(bufSize) seg.slotCap = 1 seg.slotsData = make([]entryPtr, 256*seg.slotCap) for i := 0; i < len(seg.slotLens); i++ { seg.slotLens[i] = 0 } atomic.StoreInt64(&seg.hitCount, 0) atomic.StoreInt64(&seg.missCount, 0) atomic.StoreInt64(&seg.entryCount, 0) atomic.StoreInt64(&seg.totalCount, 0) atomic.StoreInt64(&seg.totalTime, 0) atomic.StoreInt64(&seg.totalEvacuate, 0) atomic.StoreInt64(&seg.totalExpired, 0) atomic.StoreInt64(&seg.overwrites, 0) } func (seg *segment) getSlot(slotId uint8) []entryPtr { slotOff := int32(slotId) * seg.slotCap return seg.slotsData[slotOff : slotOff+seg.slotLens[slotId] : slotOff+seg.slotCap] } // isExpired checks if a key is expired. func isExpired(keyExpireAt, now uint32) bool { return keyExpireAt != 0 && keyExpireAt <= now }
//A basic freecache server supports redis protocol package main import ( "bufio" "bytes" "errors" "github.com/coocood/freecache" "io" "log" "net" "net/http" _ "net/http/pprof" "runtime" "runtime/debug" "strconv" "time" ) var ( protocolErr = errors.New("protocol error") CRLF = []byte("\r\n") PING = []byte("ping") DBSIZE = []byte("dbsize") ERROR_UNSUPPORTED = []byte("-ERR unsupported command\r\n") OK = []byte("+OK\r\n") PONG = []byte("+PONG\r\n") GET = []byte("get") SET = []byte("set") SETEX = []byte("setex") DEL = []byte("del") NIL = []byte("$-1\r\n") CZERO = []byte(":0\r\n") CONE = []byte(":1\r\n") BulkSign = []byte("$") ) type Request struct { args [][]byte buf *bytes.Buffer } func (req *Request) Reset() { req.args = req.args[:0] req.buf.Reset() } type operation struct { req Request replyChan chan *bytes.Buffer } type Session struct { server *Server conn net.Conn addr string reader *bufio.Reader replyChan chan *bytes.Buffer } type Server struct { cache *freecache.Cache } func NewServer(cacheSize int) (server *Server) { server = new(Server) server.cache = freecache.NewCache(cacheSize) return } func (server *Server) Start(addr string) error { l, err := net.Listen("tcp", addr) if err != nil { log.Println(err) return err } defer l.Close() log.Println("Listening on port", addr) for { tcpListener := l.(*net.TCPListener) tcpListener.SetDeadline(time.Now().Add(time.Second)) conn, err := l.Accept() if err != nil { if ne, ok := err.(net.Error); ok && ne.Temporary() { continue } return err } session := new(Session) session.conn = conn session.replyChan = make(chan *bytes.Buffer, 100) session.addr = conn.RemoteAddr().String() session.server = server session.reader = bufio.NewReader(conn) go session.readLoop() go session.writeLoop() } } func copyN(buffer *bytes.Buffer, r *bufio.Reader, n int64) (err error) { if n <= 512 { var buf [512]byte _, err = r.Read(buf[:n]) if err != nil { return } buffer.Write(buf[:n]) } else { _, err = io.CopyN(buffer, r, n) } return } func (server *Server) ReadClient(r *bufio.Reader, req *Request) (err error) { line, err := readLine(r) if err != nil { return } if len(line) == 0 || line[0] != '*' { err = protocolErr return } argc, err := btoi(line[1:]) if err != nil { return } if argc <= 0 || argc > 4 { err = protocolErr return } var argStarts [4]int var argEnds [4]int req.buf.Write(line) req.buf.Write(CRLF) cursor := len(line) + 2 for i := 0; i < argc; i++ { line, err = readLine(r) if err != nil { return } if len(line) == 0 || line[0] != '$' { err = protocolErr return } var argLen int argLen, err = btoi(line[1:]) if err != nil { return } if argLen < 0 || argLen > 512*1024*1024 { err = protocolErr return } req.buf.Write(line) req.buf.Write(CRLF) cursor += len(line) + 2 err = copyN(req.buf, r, int64(argLen)+2) if err != nil { return } argStarts[i] = cursor argEnds[i] = cursor + argLen cursor += argLen + 2 } data := req.buf.Bytes() for i := 0; i < argc; i++ { req.args = append(req.args, data[argStarts[i]:argEnds[i]]) } lower(req.args[0]) return } func (down *Session) readLoop() { var req = new(Request) req.buf = new(bytes.Buffer) for { req.Reset() err := down.server.ReadClient(down.reader, req) if err != nil { close(down.replyChan) return } reply := new(bytes.Buffer) if len(req.args) == 4 && bytes.Equal(req.args[0], SETEX) { expire, err := btoi(req.args[2]) if err != nil { reply.Write(ERROR_UNSUPPORTED) } else { down.server.cache.Set(req.args[1], req.args[3], expire) reply.Write(OK) } } else if len(req.args) == 3 && bytes.Equal(req.args[0], SET) { down.server.cache.Set(req.args[1], req.args[2], 0) reply.Write(OK) } else if len(req.args) == 2 { if bytes.Equal(req.args[0], GET) { value, err := down.server.cache.Get(req.args[1]) if err != nil { reply.Write(NIL) } else { bukLen := strconv.Itoa(len(value)) reply.Write(BulkSign) reply.WriteString(bukLen) reply.Write(CRLF) reply.Write(value) reply.Write(CRLF) } } else if bytes.Equal(req.args[0], DEL) { if down.server.cache.Del(req.args[1]) { reply.Write(CONE) } else { reply.Write(CZERO) } } } else if len(req.args) == 1 { if bytes.Equal(req.args[0], PING) { reply.Write(PONG) } else if bytes.Equal(req.args[0], DBSIZE) { entryCount := down.server.cache.EntryCount() reply.WriteString(":") reply.WriteString(strconv.Itoa(int(entryCount))) reply.Write(CRLF) } else { reply.Write(ERROR_UNSUPPORTED) } } down.replyChan <- reply } } func (down *Session) writeLoop() { var buffer = bytes.NewBuffer(nil) var replies = make([]*bytes.Buffer, 1) for { buffer.Reset() select { case reply, ok := <-down.replyChan: if !ok { down.conn.Close() return } replies = replies[:1] replies[0] = reply queueLen := len(down.replyChan) for i := 0; i < queueLen; i++ { reply = <-down.replyChan replies = append(replies, reply) } for _, reply := range replies { if reply == nil { buffer.Write(NIL) continue } buffer.Write(reply.Bytes()) } _, err := down.conn.Write(buffer.Bytes()) if err != nil { down.conn.Close() return } } } } func readLine(r *bufio.Reader) ([]byte, error) { p, err := r.ReadSlice('\n') if err != nil { return nil, err } i := len(p) - 2 if i < 0 || p[i] != '\r' { return nil, protocolErr } return p[:i], nil } func btoi(data []byte) (int, error) { if len(data) == 0 { return 0, nil } i := 0 sign := 1 if data[0] == '-' { i++ sign *= -1 } if i >= len(data) { return 0, protocolErr } var l int for ; i < len(data); i++ { c := data[i] if c < '0' || c > '9' { return 0, protocolErr } l = l*10 + int(c-'0') } return sign * l, nil } func lower(data []byte) { for i := 0; i < len(data); i++ { if data[i] >= 'A' && data[i] <= 'Z' { data[i] += 'a' - 'A' } } } func main() { runtime.GOMAXPROCS(runtime.NumCPU() - 1) server := NewServer(256 * 1024 * 1024) debug.SetGCPercent(10) go func() { log.Println(http.ListenAndServe("localhost:6060", nil)) }() server.Start(":7788") }
package freecache import ( "sync/atomic" "time" ) // Timer holds representation of current time. type Timer interface { // Give current time (in seconds) Now() uint32 } // Timer that must be stopped. type StoppableTimer interface { Timer // Release resources of the timer, functionality may or may not be affected // It is not called automatically, so user must call it just once Stop() } // Helper function that returns Unix time in seconds func getUnixTime() uint32 { return uint32(time.Now().Unix()) } // Default timer reads Unix time always when requested type defaultTimer struct{} func (timer defaultTimer) Now() uint32 { return getUnixTime() } // Cached timer stores Unix time every second and returns the cached value type cachedTimer struct { now uint32 ticker *time.Ticker done chan bool } // Create cached timer and start runtime timer that updates time every second func NewCachedTimer() StoppableTimer { timer := &cachedTimer{ now: getUnixTime(), ticker: time.NewTicker(time.Second), done: make(chan bool), } go timer.update() return timer } func (timer *cachedTimer) Now() uint32 { return atomic.LoadUint32(&timer.now) } // Stop runtime timer and finish routine that updates time func (timer *cachedTimer) Stop() { timer.ticker.Stop() timer.done <- true close(timer.done) timer.done = nil timer.ticker = nil } // Periodically check and update of time func (timer *cachedTimer) update() { for { select { case <-timer.done: return case <-timer.ticker.C: atomic.StoreUint32(&timer.now, getUnixTime()) } } }