// Package sdb offers a simple key-value database that can be used with the
// go-shelve project.
//
// It should be suitable for a wide range of applications, but the driver
// directory (go-shelve/driver) provides additional options for configuring the
// Shelf with other supported databases from the Go ecosystem.
//
// # DB Records
//
// In sdb, each database record is represented by a distinct file stored in a
// bucket, which is a corresponding filesystem directory. The number of
// records stored in each bucket is unlimited, and modern filesystems should
// be able to handle large buckets without significantly affecting performance.
//
// Each record file is named after the "base32hex" encoding of its key, which
// preserves lexical sort order [1]. Keys are limited to 128 bytes
// (MaxKeyLength). The record file stores the value as raw binary data. With
// this design, users do not need to worry about hitting the maximum filename
// length or storing keys with forbidden characters.
//
// # Cache
//
// The sdb database uses a memory-based cache to speed up operations. By
// default, the cache size is unlimited, but it can be configured to a fixed
// size or disabled altogether.
//
// The cache's design, albeit simple, can enhance the performance of "DB.Get"
// and "DB.Items" to more than 1 million reads per second on standard hardware.
//
// # Atomicity
//
// New records are written atomically to the key-value store. With a
// file-per-record design, sdb achieves this by using atomic file writes, which
// consist of creating a temporary file and then renaming it [2].
//
// This ensures that each database method is performed as a single atomic
// operation, significantly simplifying the recovery process.
//
// Currently, the only data that can become inconsistent is the count of stored
// records, but if this happens, it is detected and corrected at DB
// initialization.
// // As an optimization, records might be written directly without needing a // temporary file if the data fits in a single sector since a single-sector // write can be assumed to be atomic on some systems [3] [4]. // // # Durability // // By default, sdb leverages the filesystem cache to speed up the database // writes. This is generally suitable for most applications for which sdb is // intended, as modern hardware can offer sufficient protection against // crashes and ensure durability. // // For the highest level of durability, the WithSynchronousWrites option makes // the database synchronize data to persistent storage on each write. // // # Notes // // [1] https://datatracker.ietf.org/doc/html/rfc4648#section-7 // // [2] On Windows, additional configuration is involved. // // [3] https://stackoverflow.com/questions/2009063/are-disk-sector-writes-atomic // // [4] https://web.cs.ucla.edu/classes/spring07/cs111-2/scribe/lecture14.html package sdb import ( "encoding/base32" "errors" "fmt" "os" "path/filepath" "sync" "time" "github.com/lucmq/go-shelve/sdb/internal" ) const ( // DefaultCacheSize is the default size of the cache used to speed up the // database operations. A value of -1 represents an unlimited cache. DefaultCacheSize = -1 // MaxKeyLength is the maximum size of a key. MaxKeyLength = 128 // metadataSyncInterval is the interval at which the metadata is synced to // disk. metadataSyncInterval = 1 * time.Minute ) const ( dataDirectory = "data" metadataDirectory = "meta" ) const version = 1 var ( // ErrKeyTooLarge is returned when a key exceeds the maximum length. ErrKeyTooLarge = errors.New("key exceeds maximum length") ) // Yield is a function called when iterating over key-value pairs in the // database. If Yield returns false or an error, the iteration stops. type Yield = func(key, value []byte) (bool, error) // DB represents a database, which is created with the Open function. 
// // Client applications must call DB.Close() when done with the database. // // A DB is safe for concurrent use by multiple goroutines. type DB struct { mu sync.RWMutex path string metadata metadata cache internal.Cache[cacheEntry] syncWrites bool } // cacheEntry represents an entry in the cache. type cacheEntry = []byte // Open opens the database at the given path. If the path does not exist, it is // created. // // Client applications must call DB.Close() when done with the database. func Open(path string, options ...Option) (*DB, error) { db := DB{ path: path, metadata: makeMetadata(), cache: internal.NewCache[cacheEntry](-1), syncWrites: false, } // Apply options for _, option := range options { option(&db) } err := initializeDatabase(&db) if err != nil { return nil, fmt.Errorf("initialize database: %w", err) } go syncMetadata(&db) return &db, nil } // Close synchronizes and closes the database. func (db *DB) Close() error { return db.Sync() } // Len returns the number of items in the database. If an error occurs, it // returns -1. func (db *DB) Len() int64 { return int64(db.metadata.TotalEntries) } // Sync synchronizes the database to persistent storage. func (db *DB) Sync() error { db.mu.Lock() defer db.mu.Unlock() // Mark as consistent db.metadata.Checkpoint = db.metadata.Generation return db.metadata.Save(db.path) } // Has reports whether a key exists in the database. func (db *DB) Has(key []byte) (bool, error) { db.mu.RLock() defer db.mu.RUnlock() _, ok := db.cache.Get(string(key)) if ok { return true, nil } _, err := os.Stat(keyPath(db, key)) if err != nil && !os.IsNotExist(err) { return false, fmt.Errorf("stat: %w", err) } return !os.IsNotExist(err), nil } // Get retrieves the value associated with a key from the database. If the key // is not found, it returns nil. 
func (db *DB) Get(key []byte) ([]byte, error) { db.mu.RLock() defer db.mu.RUnlock() v, ok := db.cache.Get(string(key)) if ok { return v, nil } value, err := os.ReadFile(keyPath(db, key)) if err != nil && !os.IsNotExist(err) { return nil, fmt.Errorf("read file: %w", err) } if os.IsNotExist(err) { return nil, nil } return value, err } // Put adds a key-value pair to the database. If the key already exists, it // overwrites the existing value. // // It returns an error if the key is greater than [MaxKeyLength]. func (db *DB) Put(key, value []byte) error { if err := prepareForMutation(db); err != nil { return fmt.Errorf("prepare for mutation: %w", err) } if len(key) > MaxKeyLength { return ErrKeyTooLarge } db.mu.Lock() defer db.mu.Unlock() updated, err := putPath(db, keyPath(db, key), value) if err != nil { return fmt.Errorf("put path: %w", err) } if !updated { db.metadata.TotalEntries++ } db.metadata.Generation++ // Cache aside db.cache.Put(string(key), value) return nil } func putPath(db *DB, path string, value []byte) (updated bool, err error) { _, err = os.Stat(path) if err != nil && !os.IsNotExist(err) { return false, fmt.Errorf("stat: %w", err) } if err == nil { updated = true } writer := newAtomicWriter(db.syncWrites) err = writer.WriteFile(path, value, !updated) return updated, err } // Delete removes a key-value pair from the database. func (db *DB) Delete(key []byte) error { if err := prepareForMutation(db); err != nil { return fmt.Errorf("prepare for mutation: %w", err) } db.mu.Lock() defer db.mu.Unlock() var deleted bool err := os.Remove(keyPath(db, key)) if err != nil && !os.IsNotExist(err) { return fmt.Errorf("remove: %w", err) } if err == nil { deleted = true } if deleted { db.metadata.TotalEntries-- } db.metadata.Generation++ db.cache.Delete(string(key)) return nil } // Items iterates over key-value pairs in the database, calling fn(k, v) // for each pair in the sequence. The iteration stops early if the function // fn returns false. 
// // The start and order parameters exist for compatibility with the shelve.DB // interface and are not currently used. // // The operation will acquire a read lock everytime a database record is read // and will hold for the duration of the fn callback. Implementations that need // to quickly release the lock, should copy the key-value pair and return as // soon as possible from the callback. func (db *DB) Items(start []byte, order int, fn Yield) error { _, _ = start, order root := filepath.Join(db.path, dataDirectory) _, err := items(db, root, fn) if err != nil { return fmt.Errorf("walk data directory: %w", err) } return nil } func items( db *DB, root string, fn func(key, value []byte) (bool, error), ) ( count int, err error, ) { err = readDir(root, func(name string) (bool, error) { path := filepath.Join(root, name) count++ return handlePathWithLock(db, path, fn) }) return count, err } func handlePathWithLock( db *DB, path string, fn func(key, value []byte) (bool, error), ) (bool, error) { db.mu.RLock() defer db.mu.RUnlock() // Note: Hold the lock while the callback fn is being executed. Do not // assume we can release it earlier (after the record read). key, err := parseKey(path) if err != nil { return false, fmt.Errorf("parse key: %w", err) } // Use the cache (but do not cache aside while iterating) value, ok := db.cache.Get(string(key)) if ok { return fn(key, value) } // Read from the disk v, err := os.ReadFile(path) if errors.Is(err, os.ErrNotExist) { // Deleted while iterating? Ignore. 
return true, nil } if err != nil { return false, fmt.Errorf("read key-value: %w", err) } return fn(key, v) } // Helpers func keyPath(db *DB, key []byte) string { base := base32.HexEncoding.EncodeToString(key) return filepath.Join(db.path, dataDirectory, base) } func parseKey(path string) ([]byte, error) { base := filepath.Base(path) return base32.HexEncoding.DecodeString(base) } // prepareForMutation ensures we have enough information saved in persistent // storage to be able to recover the database in the event of an error. // // Before each mutation, we compare the database generation value with the // checkpoint. If they are equal, we increase generation and sync the metadata. // Different values for generation and checkpoint indicates that the database // has pending state to be synced to persistent storage. // // The I/O done by this function should be amortized between many mutations. func prepareForMutation(db *DB) error { ok := db.mu.TryLock() if !ok { return nil } defer db.mu.Unlock() if db.metadata.Generation != db.metadata.Checkpoint { // Already drifted return nil } // Mark as loaded db.metadata.Generation = db.metadata.Checkpoint + 1 // Sync the metadata return db.metadata.Save(db.path) } // syncMetadata periodically syncs the metadata to persistent storage. func syncMetadata(d *DB) { // Note: This is only done to decrease the chance of a recovery triggered // in the initialization due to a user forgetting to call DB.Close() or a // system crash. The database doesn't really depend on this mechanism and // errors here can be ignored. for { time.Sleep(metadataSyncInterval) _ = d.Sync() } }
package sdb import ( "errors" "fmt" "io" "math/rand/v2" "os" "path/filepath" "runtime" "time" ) var ( defaultDiskSectorSize = 4096 defaultPermissions = os.FileMode(0600) defaultDirPermissions = os.FileMode(0700) ) // The main object of atomicWrite is to protect against incomplete writes. // When used together with O_SYNC, atomicWrite also provides some additional // durability guarantees. type atomicWriter struct { syncWrites bool diskSectorSize int perm os.FileMode } func newAtomicWriter(syncWrites bool) *atomicWriter { // Note: If we decide to ask the host system for the disk sector size, // we can use the go `init` function for that and keep this constructor // cleaner, without the need to return an error and also, without the // need to query the os multiple times. diskSectorSize := defaultDiskSectorSize return &atomicWriter{ syncWrites: syncWrites, diskSectorSize: diskSectorSize, perm: defaultPermissions, } } func (w *atomicWriter) flag(excl bool) int { flag := os.O_WRONLY | os.O_CREATE | os.O_TRUNC if w.syncWrites { flag |= os.O_SYNC } if excl { flag |= os.O_EXCL } return flag } func (w *atomicWriter) WriteFile(path string, data []byte, excl bool) error { var err error defer func() { // Sync the parent directory for more durability guarantees. See: // - https://lwn.net/Articles/457667/#:~:text=When%20should%20you%20Fsync if err == nil && w.syncWrites { _ = syncFile(filepath.Dir(path)) } }() if runtime.GOOS == "linux" && len(data) <= w.diskSectorSize { // Optimization: Write directly if the data fits in a single sector, // since a single-sector write can be assumed to be atomic. See: // // - https://stackoverflow.com/questions/2009063/are-disk-sector-writes-atomic // - https://web.cs.ucla.edu/classes/spring07/cs111-2/scribe/lecture14.html // // This optimization assumes that the host supports atomic writes to a // disk sector. return w._writeFile(path, data, excl) } tmpPath := makeTempPath(path) // w.writeFile will sync, if configured to do so. 
err = w._writeFile(tmpPath, data, excl) if err != nil { return fmt.Errorf("write: %w", err) } return renameFile(tmpPath, path) } // renameFile atomically replaces the destination file or directory with the // source. It is guaranteed to either replace the target file entirely, or not // change either file. func renameFile(oldpath, newpath string) error { return os.Rename(oldpath, newpath) } // writeFile writes data to the named file, creating it if necessary. // If the file does not exist, WriteFile creates it with permissions perm (before umask); // otherwise writeFile truncates it before writing, without changing permissions. // Since writeFile requires multiple system calls to complete, a failure mid-operation // can leave the file in a partially written state. func (w *atomicWriter) _writeFile(name string, data []byte, excl bool) error { // Adapted from `os.WriteFile()` f, err := os.OpenFile(name, w.flag(excl), w.perm) if err != nil { return err } _, err = f.Write(data) if err1 := f.Close(); err1 != nil && err == nil { err = err1 } return err } func mkdirs(paths []string, perm os.FileMode) error { for _, path := range paths { if err := os.MkdirAll(path, perm); err != nil { return fmt.Errorf("MkdirAll: %w", err) } } return nil } func countFiles(path string) (uint64, error) { var count uint64 err := readDir(path, func(name string) (bool, error) { count++ return true, nil }) return count, err } func readDir(path string, fn func(name string) (bool, error)) (err error) { var f *os.File f, err = os.Open(path) if err != nil { return fmt.Errorf("open: %w", err) } defer func() { // Safely close the file and assign to the return value if err1 := f.Close(); err1 != nil && err == nil { err = err1 } }() for { var names []string batchSize := 256 names, err = f.Readdirnames(batchSize) if err != nil { // EOF or unreadable dir if errors.Is(err, io.EOF) { err = nil } return err } for _, name := range names { ok, err := fn(name) if err != nil { return fmt.Errorf("fn: %w", err) } 
if !ok { // Stop Iteration return nil } } } } // Helpers func makeTempPath(path string) string { tmpBase := fmt.Sprintf( "%s-%d-%d", filepath.Base(path), rand.Uint32(), time.Now().UnixNano(), ) tmpPath := filepath.Join(os.TempDir(), tmpBase) return tmpPath } func syncFile(path string) error { f, err := os.Open(path) if err != nil { return fmt.Errorf("open: %w", err) } err = f.Sync() if err1 := f.Close(); err1 != nil && err == nil { err = err1 } return err }
package sdb import ( "fmt" "os" "path/filepath" ) func initializeDatabase(db *DB) error { // Check if the database already exists fi, err := os.Stat(db.path) if err != nil && !os.IsNotExist(err) { return fmt.Errorf("stat path: %w", err) } if os.IsNotExist(err) { return createDatabaseStorage(db) } else { // Check permissions if !fi.IsDir() { return fmt.Errorf("path is not a directory") } else if fi.Mode().Perm()&0700 != 0700 { return fmt.Errorf("path permissions are not 0700") } } // Load the metadata db.metadata = metadata{} err = db.metadata.Load(db.path) if err != nil { return fmt.Errorf("load metadata: %w", err) } // Check the DB consistency and possibly recover from a corrupted // state return sanityCheck(db) } func createDatabaseStorage(db *DB) error { paths := []string{ db.path, filepath.Join(db.path, dataDirectory), filepath.Join(db.path, metadataDirectory), } if err := mkdirs(paths, defaultDirPermissions); err != nil { return fmt.Errorf("create directories: %w", err) } // Sync the database err := db.Sync() if err != nil { return fmt.Errorf("sync: %w", err) } return nil } // Check version, totalBuckets and the generations. Recover from a corrupted // database if the generation checkpoint doesn't match the current generation. func sanityCheck(db *DB) error { if err := db.metadata.Validate(); err != nil { return err } // Check generations if db.metadata.Generation != db.metadata.Checkpoint { return recoverDatabase(db) } return nil }
package internal // TKey is the key type used by the cache. type TKey = string // Cache is a generic cache interface. type Cache[TValue any] interface { // Get retrieves a value from the cache based on the provided key. It // returns the value and a boolean indicating whether the value was // found in the cache. Get(key TKey) (TValue, bool) // Put adds a new key-value pair to the cache. Put(key TKey, value TValue) // Delete removes a value from the cache based on the provided key. Delete(key TKey) } // Default Cache // DefaultCache is the default implementation of the Cache interface. It is not // safe for concurrent use, as it meant to be embedded in code that does the // concurrency control. type DefaultCache[TValue any] struct { cache Cache[TValue] hits int misses int } // Assert DefaultCache implements Cache var _ Cache[any] = (*DefaultCache[any])(nil) // NewCache creates a new cache based on the provided maximum length. // // Setting the maxLength to -1 or less will disable the eviction of elements // from the cache. A maxLength of 0 will create a pass-through cache that // does nothing. func NewCache[TValue any](maxLength int) *DefaultCache[TValue] { var c Cache[TValue] switch { case maxLength <= -1: c = newUnboundedCache[TValue]() case maxLength == 0: c = newPassThroughCache[TValue]() default: c = newRandomCache[TValue](maxLength) } return newCacheWithBase[TValue](c) } func newCacheWithBase[TValue any](c Cache[TValue]) *DefaultCache[TValue] { return &DefaultCache[TValue]{cache: c} } // Get retrieves a value from the cache based on the provided key. It // returns the value and a boolean indicating whether the value was // found in the cache. func (c *DefaultCache[TValue]) Get(key TKey) (TValue, bool) { v, ok := c.cache.Get(key) if !ok { c.misses++ return v, false } c.hits++ return v, true } // Put adds a new key-value pair to the cache. 
func (c *DefaultCache[TValue]) Put(key TKey, value TValue) { c.cache.Put(key, value) } // Delete removes a value from the cache based on the provided key. func (c *DefaultCache[TValue]) Delete(key TKey) { c.cache.Delete(key) } // Hits returns the number of cache hits (i.e. the number of Get calls that // found the value in the cache). func (c *DefaultCache[TValue]) Hits() int { return c.hits } // Misses returns the number of cache misses (i.e. the number of Get calls that // did not find the value in the cache). func (c *DefaultCache[TValue]) Misses() int { return c.misses } // ResetRatio resets the ratio of hits to misses. func (c *DefaultCache[TValue]) ResetRatio() { c.misses = 0 c.hits = 0 } // Unbounded Cache type unboundedCache[TValue any] struct { m map[TKey]TValue } // Check unboundedCache implements Cache interface var _ Cache[any] = (*unboundedCache[any])(nil) func newUnboundedCache[TValue any]() *unboundedCache[TValue] { return &unboundedCache[TValue]{ m: make(map[TKey]TValue), } } func (c *unboundedCache[TValue]) Get(key TKey) (TValue, bool) { v, ok := c.m[key] return v, ok } func (c *unboundedCache[TValue]) Put(key TKey, value TValue) { c.m[key] = value } func (c *unboundedCache[TValue]) Delete(key TKey) { delete(c.m, key) } // Pass-Through Cache // passThroughCache is a simple pass-through cache. type passThroughCache[TValue any] struct{} // Check passThroughCache implements Cache interface var _ Cache[any] = (*passThroughCache[any])(nil) // newPassThroughCache creates a new pass-through cache. func newPassThroughCache[TValue any]() *passThroughCache[TValue] { return &passThroughCache[TValue]{} } func (passThroughCache[TValue]) Get(TKey) (v TValue, ok bool) { return } func (passThroughCache[TValue]) Put(TKey, TValue) {} func (passThroughCache[TValue]) Delete(TKey) {} // Random Cache // randomCache provides a cache that evicts elements randomly. 
type randomCache[TValue any] struct { cache map[TKey]TValue maxSize int } // Check randomCache implements Cache interface var _ Cache[any] = (*randomCache[any])(nil) // newRandomCache creates a new instance of the randomCache struct, that can // hold up to maxSize elements. // // Setting the maxSize to 0 or less will disable the eviction of elements // from the cache. func newRandomCache[TValue any](maxSize int) *randomCache[TValue] { return &randomCache[TValue]{ cache: make(map[TKey]TValue), maxSize: maxSize, } } func (c *randomCache[TValue]) Get(key TKey) (value TValue, ok bool) { value, ok = c.cache[key] return value, ok } func (c *randomCache[TValue]) Put(key TKey, value TValue) { if c.maxSize <= 0 || len(c.cache) < c.maxSize { c.cache[key] = value return } // Remove any key and save the new one for k := range c.cache { delete(c.cache, k) break } c.cache[key] = value } func (c *randomCache[TValue]) Delete(key TKey) { delete(c.cache, key) }
package sdb import ( "bytes" "encoding/gob" "fmt" "os" "path/filepath" ) type metadata struct { Version uint64 TotalEntries uint64 Generation uint64 Checkpoint uint64 } func makeMetadata() metadata { return metadata{ Version: version, TotalEntries: 0, Generation: 0, Checkpoint: 0, } } func (*metadata) FilePath() string { return filepath.Join(metadataDirectory, "meta.gob") } func (m *metadata) Validate() error { if m.Version != version { return fmt.Errorf("version mismatch: expected %d, got %d", version, m.Version) } return nil } func (m *metadata) Load(path string) error { data, err := os.ReadFile(filepath.Join(path, m.FilePath())) if err != nil { return fmt.Errorf("read file: %w", err) } return m.Decode(data) } func (m *metadata) Save(path string) error { data, err := m.Encode() if err != nil { return fmt.Errorf("encode metadata: %w", err) } w := newAtomicWriter(false) return w.WriteFile( filepath.Join(path, m.FilePath()), data, false, ) } func (m *metadata) Encode() ([]byte, error) { var buf bytes.Buffer enc := gob.NewEncoder(&buf) err := enc.Encode(m) if err != nil { return nil, fmt.Errorf("encode gob: %w", err) } return buf.Bytes(), nil } func (m *metadata) Decode(data []byte) error { dec := gob.NewDecoder(bytes.NewReader(data)) return dec.Decode(m) }
package sdb import "github.com/lucmq/go-shelve/sdb/internal" // Option is passed to the Open function to create a customized DB. type Option func(*DB) // WithCacheSize sets the size of the cache used by the database. A value of -1 // represents an unlimited cache and a value of 0 disables the cache. The // default cache size is -1. func WithCacheSize(size int64) Option { return func(db *DB) { db.cache = internal.NewCache[cacheEntry](int(size)) } } // WithSynchronousWrites enables synchronous writes to the database. By default, // synchronous writes are disabled. func WithSynchronousWrites(sync bool) Option { return func(db *DB) { db.syncWrites = sync } }
package sdb import ( "fmt" "path/filepath" ) // Recover the database from a corrupted state, detected at initialization. func recoverDatabase(db *DB) error { // Note: We have the following: // - The DB design is simple, and all operations require at most one file // mutation. // - The metadata is stored in a single file, and all operations require // at most one file mutation. // - Currently, the only thing that can get corrupted is the metadata, in // particular, the metadata.TotalEntries count. // // Thus, the recovery process can be limited to counting the number of // files in the data folder and updating the metadata. dataRoot := filepath.Join(db.path, dataDirectory) totalItems, err := countItems(dataRoot) if err != nil { return fmt.Errorf("count items: %w", err) } db.metadata.TotalEntries = totalItems db.metadata.Checkpoint = db.metadata.Generation err = db.metadata.Save(db.path) if err != nil { return fmt.Errorf("sync metadata: %w", err) } return nil } func countItems(path string) (uint64, error) { // Each database record is represented by a regular file. return countFiles(path) }
package shelve import ( "bytes" "encoding/gob" "encoding/json" "fmt" "strconv" ) // Codec is the interface for encoding and decoding data stored by Shelf. // // The go-shelve module natively supports the following codecs: // - [GobCodec]: Returns a Codec for the [gob] format. // - [JSONCodec]: Returns a Codec for the JSON format. // - [StringCodec]: Returns a Codec for values that can be represented as // strings. // // Additional codecs are provided by the packages in [driver/encoding]. // // [driver/encoding]: https://pkg.go.dev/github.com/lucmq/go-shelve/driver/encoding type Codec interface { // Encode returns the Codec encoding of v as a byte slice. Encode(v any) ([]byte, error) // Decode parses the encoded data and stores the result in the value // pointed to by v. It is the inverse of Encode. Decode(data []byte, v any) error } // GobCodec Returns a Codec for the [gob] format, a self-describing // serialization format native to Go. // // Gob is a binary format and is more compact than text-based formats like // JSON. func GobCodec() Codec { return gobCodec{} } // JSONCodec Returns a Codec for the JSON format. func JSONCodec() Codec { return jsonCodec{} } // StringCodec Returns a Codec for values that can be represented as strings. 
func StringCodec() Codec { return stringCodec{} }

// Gob Codec

// gobCodec encodes and decodes values with encoding/gob.
type gobCodec struct{}

func (gobCodec) Encode(value any) ([]byte, error) {
	buf := new(bytes.Buffer)
	if err := gob.NewEncoder(buf).Encode(value); err != nil {
		return nil, fmt.Errorf("encode gob: %w", err)
	}
	return buf.Bytes(), nil
}

func (gobCodec) Decode(data []byte, value any) error {
	if err := gob.NewDecoder(bytes.NewReader(data)).Decode(value); err != nil {
		return fmt.Errorf("decode gob: %w", err)
	}
	return nil
}

// Json Codec

// jsonCodec encodes values as indented JSON and decodes any JSON.
type jsonCodec struct{}

func (jsonCodec) Encode(value any) ([]byte, error) {
	return json.MarshalIndent(value, "", " ")
}

func (jsonCodec) Decode(data []byte, value any) error {
	return json.Unmarshal(data, value)
}

// String Codec

// stringCodec stores values as their textual representation. string, int,
// int64 and uint64 use dedicated conversions; any other type falls back to
// fmt's %v formatting and scanning.
type stringCodec struct{}

func (stringCodec) Encode(value any) ([]byte, error) {
	var text string
	switch v := value.(type) {
	case string:
		text = v
	case int:
		text = strconv.Itoa(v)
	case int64:
		text = strconv.FormatInt(v, 10)
	case uint64:
		text = strconv.FormatUint(v, 10)
	default:
		text = fmt.Sprintf("%v", value)
	}
	return []byte(text), nil
}

func (stringCodec) Decode(data []byte, value any) (err error) {
	text := string(data)
	switch v := value.(type) {
	case *string:
		*v = text
	case *int:
		*v, err = strconv.Atoi(text)
	case *int64:
		*v, err = strconv.ParseInt(text, 10, 64)
	case *uint64:
		*v, err = strconv.ParseUint(text, 10, 64)
	default:
		_, err = fmt.Sscanf(text, "%v", value)
	}
	return err
}
// Package shelve provides a persistent, map-like object called Shelf. It lets you // store and retrieve Go objects directly, with the serialization and storage handled // automatically by the Shelf. Additionally, you can customize the underlying // key-value storage and serialization codec to better suit your application's needs. // // This package is inspired by the `shelve` module from the Python standard library // and aims to provide a similar set of functionalities. // // By default, a Shelf serializes data using the Gob format and stores it using `sdb` // (for "shelve-db"), a simple key-value storage created for this project. This // database should be good enough for a broad range of applications, but the modules // in [go-shelve/driver] provide additional options for configuring the `Shelf` with // other databases and Codecs. // // [go-shelve/driver]: https://pkg.go.dev/github.com/lucmq/go-shelve/driver package shelve import ( "fmt" "github.com/lucmq/go-shelve/sdb" ) const ( // Asc and Desc can be used with the Shelf.Items method to make the // iteration order ascending or descending respectively. // // They are just syntactic sugar to make the iteration order more // explicit. Asc = 1 // Desc is the opposite of Asc. Desc = -1 // All can be used with the Shelf.Items method to iterate over all // items in the database. It is the same as the -1 value. All = -1 ) // Yield is a function called when iterating over key-value pairs in the // Shelf. If Yield returns false or an error, the iteration stops. type Yield[K, V any] func(key K, value V) (bool, error) // A Shelf is a persistent, map-like object. It is used together with an // underlying key-value storage to store Go objects directly. // // Stored values can be of arbitrary types, but the keys must be comparable. // // The values are encoded as [Gob], and keys, if they are strings or integers, // are encoded as strings. Otherwise, they will also be encoded as Gob. 
// // For storage, the underlying database is an instance of the [sdb.DB] // ("shelve-db") key-value store. // // The underlying storage and codec Shelf uses can be configured with the // [Option] functions. // // [Gob]: https://pkg.go.dev/encoding/gob type Shelf[K comparable, V any] struct { db DB codec Codec keyCodec Codec } // Option is passed to the Open function to create a customized Shelf. type Option func(any) type options struct { DB DB Codec Codec KeyCodec Codec } // WithDatabase specifies the underlying database to use. By default, the // [sdb.DB] ("shelve-db") key-value storage is used. // // The packages in [driver/db] packages provide support for others databases in // the Go ecosystem, like [Bolt] and [Badger]. // // [driver/db]: https://pkg.go.dev/github.com/lucmq/go-shelve/driver/db // [Bolt]: https://pkg.go.dev/github.com/etcd-io/bbolt // [Badger]: https://pkg.go.dev/github.com/dgraph-io/badger func WithDatabase(db DB) Option { return func(v any) { opt := v.(*options) opt.DB = db } } // WithCodec specifies the Codec to use. By default, a codec for the Gob format // from the standard library ([encoding/gob]) is used. // // Additional Codecs can be found in the packages in [driver/encoding]. // // [driver/encoding]: https://pkg.go.dev/github.com/lucmq/go-shelve/driver/encoding func WithCodec(c Codec) Option { return func(v any) { opt := v.(*options) opt.Codec = c } } // WithKeyCodec specifies the Codec to use with keys. By default, if the key is // a string or an integer type (both signed and unsigned), the [StringCodec] is // used. Otherwise, keys are encoded as Gob ([encoding/gob]). // // Additional Codecs can be found in the packages in [driver/encoding]. // // [driver/encoding]: https://pkg.go.dev/github.com/lucmq/go-shelve/driver/encoding func WithKeyCodec(c Codec) Option { return func(v any) { opt := v.(*options) opt.KeyCodec = c } } // Open creates a new Shelf. // // The path parameter specifies the filesystem path to the database files. 
It // can be a directory or a regular file, depending on the underlying database // implementation. With the default database [sdb.DB], it will point to a // directory. func Open[K comparable, V any](path string, opts ...Option) ( *Shelf[K, V], error, ) { var k K o := options{ Codec: GobCodec(), KeyCodec: defaultKeyCodec(k), } for _, option := range opts { option(&o) } if o.DB == nil { db, err := sdb.Open(path) if err != nil { return nil, fmt.Errorf("open db: %w", err) } o.DB = db } return &Shelf[K, V]{ db: o.DB, codec: o.Codec, keyCodec: o.KeyCodec, }, nil } // Close synchronizes and closes the Shelf. func (s *Shelf[K, V]) Close() error { return s.db.Close() } // Len returns the number of items in the Shelf. It returns the number // of items as an int64. If an error occurs, it returns -1. func (s *Shelf[K, V]) Len() int64 { return s.db.Len() } // Sync synchronizes the Shelf contents to persistent storage. func (s *Shelf[K, V]) Sync() error { return s.db.Sync() } // Has reports whether a key exists in the Shelf. func (s *Shelf[K, V]) Has(key K) (bool, error) { data, err := s.keyCodec.Encode(key) if err != nil { return false, fmt.Errorf("encode: %w", err) } ok, err := s.db.Has(data) if err != nil { return false, fmt.Errorf("has: %w", err) } return ok, nil } // Get retrieves the value associated with a key from the Shelf. If the key is // not found, it returns nil. func (s *Shelf[K, V]) Get(key K) (value V, ok bool, err error) { data, err := s.keyCodec.Encode(key) if err != nil { return *new(V), false, fmt.Errorf("encode: %w", err) } vData, err := s.db.Get(data) if err != nil { return *new(V), false, fmt.Errorf("get: %w", err) } if vData == nil { return *new(V), false, nil } var v V err = s.codec.Decode(vData, &v) return v, true, err } // Put adds a key-value pair to the Shelf. If the key already exists, it // overwrites the existing value. 
func (s *Shelf[K, V]) Put(key K, value V) error { data, err := s.keyCodec.Encode(key) if err != nil { return fmt.Errorf("encode key: %w", err) } vData, err := s.codec.Encode(value) if err != nil { return fmt.Errorf("encode value: %w", err) } err = s.db.Put(data, vData) if err != nil { return fmt.Errorf("put: %w", err) } return nil } // Delete removes a key-value pair from the Shelf. func (s *Shelf[K, V]) Delete(key K) error { data, err := s.keyCodec.Encode(key) if err != nil { return fmt.Errorf("encode: %w", err) } err = s.db.Delete(data) if err != nil { return fmt.Errorf("delete: %w", err) } return nil } // Items iterates over key-value pairs in the Shelf, calling fn(k, v) for each // pair in the sequence. The iteration stops early if the function fn returns // false. // // The start parameter specifies the key from which the iteration should start. // If the start parameter is nil, the iteration will begin from the first key // in the Shelf. // // The n parameter specifies the maximum number of items to iterate over. If n // is -1 or less, all items will be iterated. // // The step parameter specifies the number of items to skip between each // iteration. A negative value for step will cause the iteration to occur in // reverse order. // // When iterating over key-value pairs in a Shelf, the order of iteration may // not be sorted. Some database implementations may ignore the start parameter // or not support iteration in reverse order. // // The default database used with Shelf (sdb.DB) does not yield items in any // particular order and ignores the start parameter. 
func (s *Shelf[K, V]) Items(start *K, n, step int, fn Yield[K, V]) error { dbFn := func(k, v []byte) (bool, error) { var key K var value V err := s.keyCodec.Decode(k, &key) if err != nil { return false, fmt.Errorf("decode key: %w", err) } if len(v) != 0 { err = s.codec.Decode(v, &value) if err != nil { return false, fmt.Errorf("decode value: %w", err) } } return fn(key, value) } return s.iterate(start, n, step, dbFn) } // Keys iterates over all keys in the Shelf and calls the user-provided // function fn for each key. The details of the iteration are the same as // for [Shelf.Items]. // // The value parameter for fn will always be the zero value for the type V. func (s *Shelf[K, V]) Keys(start *K, n, step int, fn Yield[K, V]) error { dbFn := func(k, _ []byte) (bool, error) { var key K var zero V err := s.keyCodec.Decode(k, &key) if err != nil { return false, fmt.Errorf("decode: %w", err) } return fn(key, zero) } return s.iterate(start, n, step, dbFn) } // Values iterates over all values in the Shelf and calls the user-provided // function fn for each value. The details of the iteration are the same as // for [Shelf.Items]. // // The key parameter for fn will always be the zero value for the type K. 
func (s *Shelf[K, V]) Values(start *K, n, step int, fn Yield[K, V]) error { dbFn := func(_, v []byte) (bool, error) { var zero K var value V err := s.codec.Decode(v, &value) if err != nil { return false, fmt.Errorf("decode: %w", err) } return fn(zero, value) } return s.iterate(start, n, step, dbFn) } func (s *Shelf[K, V]) iterate( start *K, n, step int, fn func(k, v []byte) (bool, error), ) error { var from []byte = nil var err error if start != nil { from, err = s.keyCodec.Encode(*start) if err != nil { return fmt.Errorf("encode start: %w", err) } } var order int if step > 0 { order = Asc } else if step < 0 { order = Desc step = -step } else { return nil } var total int var counter = step - 1 // 0, 1, ..., step - 1 return s.db.Items(from, order, func(k, v []byte) (bool, error) { if n > 0 && total >= n { return false, nil } // Increase counter until the step is reached if counter < step-1 { counter++ return true, nil } counter = 0 total++ return fn(k, v) }) } // Helpers func defaultKeyCodec(key any) Codec { switch key.(type) { case int8, int16, int32, int64, int: return StringCodec() case uint8, uint16, uint32, uint64, uint: return StringCodec() case string: return StringCodec() default: return GobCodec() } }