// Package sdb offers a simple key-value database that can be utilized with the
// go-shelve project.
//
// It should be suitable for a wide range of applications, but the driver
// directory (go-shelve/driver) provides additional options for configuring the
// Shelf with other supported databases from the Go ecosystem.
//
// # DB Records
//
// In sdb, each database record is stored as a separate file inside a
// bucket, which is a filesystem directory. A bucket can hold any number of
// records, and modern filesystems should handle large buckets without
// significantly affecting performance.
//
// Each record file is named with the "base32hex" encoding of its key, which
// preserves lexical sort order [1]. Keys are limited to 128 bytes
// (MaxKeyLength). The record's value is stored in the file as raw binary
// data. With this design, users do not need to worry about hitting the
// maximum filename length or storing keys with forbidden characters.
//
// # Cache
//
// The sdb database uses a memory-based cache to speed up operations. By
// default, the cache size is unlimited, but it can be configured to a fixed
// size or disabled altogether.
//
// The cache's design, albeit simple, can enhance the performance of "DB.Get"
// and "DB.Items" to more than 1 million reads per second on standard hardware.
//
// # Atomicity
//
// New records are written atomically to the key-value store. With a
// file-per-record design, sdb achieves this by using atomic file writes, which
// consist of creating a temporary file and then renaming it [2].
//
// This ensures that the database's methods are always performed with one
// atomic operation, significantly simplifying the recovery process.
//
// Currently, the only data that can become inconsistent is the count of stored
// records, but if this happens, it is detected and corrected at the DB
// initialization.
//
// As an optimization, on Linux, records that fit in a single disk sector
// (4096 bytes by default) are written directly, without a temporary file,
// since a single-sector write can be assumed to be atomic on some systems
// [3] [4].
//
// # Durability
//
// By default, sdb leverages the filesystem cache to speed up the database
// writes. This is generally suitable for most applications for which sdb is
// intended, as modern hardware can offer sufficient protection against
// crashes and ensure durability.
//
// For the highest level of durability, the WithSynchronousWrites option makes
// the database synchronize data to persistent storage on each write.
//
// # Notes
//
// [1] https://datatracker.ietf.org/doc/html/rfc4648#section-7
//
// [2] On Windows, additional configuration is involved.
//
// [3] https://stackoverflow.com/questions/2009063/are-disk-sector-writes-atomic
//
// [4] https://web.cs.ucla.edu/classes/spring07/cs111-2/scribe/lecture14.html
package sdb
import (
"encoding/base32"
"errors"
"fmt"
"os"
"path/filepath"
"sync"
"time"
"github.com/lucmq/go-shelve/sdb/internal"
)
// Public tuning constants and internal timing.
const (
	// DefaultCacheSize is the cache size used when WithCacheSize is not
	// given. A value of -1 means the cache never evicts entries.
	DefaultCacheSize = -1

	// MaxKeyLength is the largest key length, in bytes, accepted by Put.
	MaxKeyLength = 128

	// metadataSyncInterval is the period of the background loop that syncs
	// the metadata to disk.
	metadataSyncInterval = time.Minute
)

// On-disk layout: records live in <root>/data, metadata in <root>/meta.
const (
	dataDirectory     = "data"
	metadataDirectory = "meta"
)

// version is the on-disk format version stored in, and validated against,
// the metadata.
const version = 1

// Sentinel errors returned by DB methods.
var (
	// ErrKeyTooLarge is returned by Put when a key is longer than
	// MaxKeyLength.
	ErrKeyTooLarge = errors.New("key exceeds maximum length")

	// ErrDatabaseClosed is returned by DB methods called after Close.
	ErrDatabaseClosed = errors.New("database is closed")
)

// Yield is the callback invoked for each key-value pair during iteration.
// Returning false, or a non-nil error, stops the iteration.
type Yield = func(key, value []byte) (bool, error)
// DB represents a database, which is created with the Open function.
//
// Client applications must call DB.Close() when done with the database.
//
// A DB is safe for concurrent use by multiple goroutines.
type DB struct {
// mu guards metadata, cache and closed. Read paths (Has, Get, Len and the
// per-record step of Items) take the read lock; Put, Delete, Sync and Close
// take the write lock.
mu sync.RWMutex
// path is the database root directory; records are stored under
// <path>/data and the metadata under <path>/meta.
path string
// metadata holds the record count and the generation/checkpoint counters
// that are persisted to the meta directory.
metadata metadata
// cache maps the raw key (as a string) to the record value.
cache internal.Cache[cacheEntry]
// closed is set by Close; most methods then return ErrDatabaseClosed.
closed bool
// Controls the background sync loop: done is closed by Close to stop it,
// and wg is used to wait for it to exit.
done chan struct{}
wg sync.WaitGroup
// syncWrites makes record writes use O_SYNC and also sync the parent
// directory (see WithSynchronousWrites).
syncWrites bool
// autoSync enables the background sync loop. It could be removed if a WAL
// were adopted for consistency, since a WAL would make the sync loop
// unnecessary.
autoSync bool
}
// cacheEntry is the value type stored in the cache: a record's raw bytes.
type cacheEntry = []byte
// Open opens the database at the given path. If the path does not exist, it is
// created.
//
// Options are applied in the order given; nil options are ignored. Unless
// WithCacheSize is used, the cache is created with DefaultCacheSize (-1, an
// unbounded cache).
//
// On success, a background goroutine periodically syncs the metadata (every
// metadataSyncInterval) until Close is called.
//
// Client applications must call DB.Close() when done with the database.
func Open(path string, options ...Option) (*DB, error) {
	db := &DB{
		path:     path,
		metadata: makeMetadata(),
		// Use the documented default instead of repeating its value here.
		cache:      internal.NewCache[cacheEntry](DefaultCacheSize),
		done:       make(chan struct{}),
		syncWrites: false,
		autoSync:   true,
	}

	// Apply options.
	for _, option := range options {
		if option == nil {
			continue
		}
		option(db)
	}

	if err := initializeDatabase(db); err != nil {
		return nil, fmt.Errorf("initialize database: %w", err)
	}

	// Start the background loop if autoSync is enabled.
	if db.autoSync {
		db.wg.Add(1)
		go syncMetadata(db)
	}
	return db, nil
}
// Close synchronizes and closes the database. Users must ensure no pending
// operations are in progress before calling Close().
//
// Close is idempotent: calling it on an already closed DB returns nil.
//
// Example:
//
//	var wg sync.WaitGroup
//	db, _ := sdb.Open("path")
//
//	// Start concurrent writes
//	for i := 0; i < 10; i++ {
//		wg.Add(1)
//		go func(i int) {
//			defer wg.Done()
//			db.Put([]byte(fmt.Sprintf("key-%d", i)), []byte("value"))
//		}(i)
//	}
//
//	wg.Wait()  // Ensure all writes are done
//	db.Close() // Safe to close now
func (db *DB) Close() error {
	db.mu.Lock()
	if db.closed {
		db.mu.Unlock()
		return nil
	}
	db.closed = true
	// Signal the background goroutine to stop.
	close(db.done)
	db.mu.Unlock()

	// Wait for the background loop WITHOUT holding db.mu. The loop may be
	// blocked in db.Sync() waiting for the lock; waiting for it while holding
	// the lock would deadlock. Once it acquires the lock it sees db.closed,
	// returns, and then observes db.done.
	db.wg.Wait()

	// Final sync.
	db.mu.Lock()
	defer db.mu.Unlock()
	return syncInternal(db)
}
// Len reports how many records the database holds, as tracked by its
// metadata. It returns -1 once the database has been closed.
func (db *DB) Len() int64 {
	db.mu.RLock()
	defer db.mu.RUnlock()

	n := int64(-1)
	if !db.closed {
		n = int64(db.metadata.TotalEntries)
	}
	return n
}
// Sync marks the in-memory state as consistent and persists the metadata.
// It returns ErrDatabaseClosed after Close.
func (db *DB) Sync() error {
	db.mu.Lock()
	defer db.mu.Unlock()

	if !db.closed {
		return syncInternal(db)
	}
	return ErrDatabaseClosed
}
// Has reports whether key is stored in the database. It consults the cache
// first and falls back to checking for the record file on disk.
func (db *DB) Has(key []byte) (bool, error) {
	db.mu.RLock()
	defer db.mu.RUnlock()

	if db.closed {
		return false, ErrDatabaseClosed
	}
	if _, hit := db.cache.Get(string(key)); hit {
		return true, nil
	}

	switch _, err := os.Stat(keyPath(db, key)); {
	case err == nil:
		return true, nil
	case os.IsNotExist(err):
		return false, nil
	default:
		return false, fmt.Errorf("stat: %w", err)
	}
}
// Get returns the value stored under key, or nil (with a nil error) when the
// key does not exist.
//
// A cache hit returns the cache's own slice, so callers must not modify the
// returned bytes. A cache miss reads the record file but does not populate
// the cache.
func (db *DB) Get(key []byte) ([]byte, error) {
	db.mu.RLock()
	defer db.mu.RUnlock()

	if db.closed {
		return nil, ErrDatabaseClosed
	}
	if cached, hit := db.cache.Get(string(key)); hit {
		return cached, nil
	}

	data, err := os.ReadFile(keyPath(db, key))
	switch {
	case err == nil:
		return data, nil
	case os.IsNotExist(err):
		return nil, nil
	default:
		return nil, fmt.Errorf("read file: %w", err)
	}
}
// Put adds a key-value pair to the database. If the key already exists, it
// overwrites the existing value.
//
// It returns [ErrKeyTooLarge] if the key is longer than [MaxKeyLength].
//
// The value is copied before it is cached, so callers may reuse or modify the
// value slice after Put returns.
func (db *DB) Put(key, value []byte) error {
	// Validate the key before doing any I/O, so that a rejected key never
	// causes the metadata to be marked as dirty.
	if len(key) > MaxKeyLength {
		return ErrKeyTooLarge
	}
	if err := prepareForMutation(db); err != nil {
		return fmt.Errorf("prepare for mutation: %w", err)
	}

	db.mu.Lock()
	defer db.mu.Unlock()
	if db.closed {
		return ErrDatabaseClosed
	}

	// prepareForMutation gives up when it cannot take the lock immediately,
	// and a Sync may run between it and the Lock above. Re-check while
	// holding the lock, so the "dirty" marker is always persisted before the
	// record file is touched. The in-memory state only changes after Save
	// succeeds.
	if db.metadata.Generation == db.metadata.Checkpoint {
		next := db.metadata
		next.Generation = next.Checkpoint + 1
		if err := next.Save(db.path); err != nil {
			return fmt.Errorf("prepare for mutation: %w", err)
		}
		db.metadata = next
	}

	updated, err := putPath(db, keyPath(db, key), value)
	if err != nil {
		return fmt.Errorf("put path: %w", err)
	}
	if !updated {
		db.metadata.TotalEntries++
	}
	db.metadata.Generation++

	// Cache aside. Store a private copy: caching the caller's slice would let
	// later changes to it make the cache diverge from the file on disk.
	cached := make([]byte, len(value))
	copy(cached, value)
	db.cache.Put(string(key), cached)
	return nil
}
// putPath writes value to the record file at path. It reports updated=true
// when the file already existed (an overwrite) and false when it was created.
// A new file is opened exclusively; an existing one is replaced.
func putPath(db *DB, path string, value []byte) (updated bool, err error) {
	_, statErr := os.Stat(path)
	switch {
	case statErr == nil:
		updated = true
	case !os.IsNotExist(statErr):
		return false, fmt.Errorf("stat: %w", statErr)
	}

	err = newAtomicWriter(db.syncWrites).WriteFile(path, value, !updated)
	return updated, err
}
// Delete removes a key-value pair from the database. Deleting a key that does
// not exist is not an error.
func (db *DB) Delete(key []byte) error {
	if err := prepareForMutation(db); err != nil {
		return fmt.Errorf("prepare for mutation: %w", err)
	}

	db.mu.Lock()
	defer db.mu.Unlock()
	if db.closed {
		return ErrDatabaseClosed
	}

	// prepareForMutation gives up when it cannot take the lock immediately,
	// and a Sync may run between it and the Lock above. Re-check while
	// holding the lock, so the "dirty" marker is always persisted before the
	// record file is touched. The in-memory state only changes after Save
	// succeeds.
	if db.metadata.Generation == db.metadata.Checkpoint {
		next := db.metadata
		next.Generation = next.Checkpoint + 1
		if err := next.Save(db.path); err != nil {
			return fmt.Errorf("prepare for mutation: %w", err)
		}
		db.metadata = next
	}

	err := os.Remove(keyPath(db, key))
	switch {
	case err == nil:
		// Guard against unsigned underflow if the stored count is ever
		// out of sync with the files on disk.
		if db.metadata.TotalEntries > 0 {
			db.metadata.TotalEntries--
		}
	case !os.IsNotExist(err):
		return fmt.Errorf("remove: %w", err)
	}

	db.metadata.Generation++
	db.cache.Delete(string(key))
	return nil
}
// Items calls fn(k, v) for every key-value pair stored in the database and
// stops early as soon as fn returns false or an error.
//
// start and order exist only for compatibility with the shelve.DB interface
// and are ignored: records are visited in directory-listing order.
//
// A read lock is held while each record is read AND while fn runs for it.
// Callers that need the lock released quickly should copy the pair and
// return from fn promptly.
//
// fn must not modify the database (Put/Delete) from the iterating
// goroutine: those take the write lock and would deadlock.
func (db *DB) Items(start []byte, order int, fn Yield) error {
	_, _ = start, order // unused, see above

	db.mu.RLock()
	closed := db.closed
	db.mu.RUnlock()
	if closed {
		return ErrDatabaseClosed
	}

	dataRoot := filepath.Join(db.path, dataDirectory)
	if _, err := items(db, dataRoot, fn); err != nil {
		return fmt.Errorf("walk data directory: %w", err)
	}
	return nil
}
// items walks the entries of root, handing each one to handlePathWithLock,
// and returns how many entries were visited before the walk ended.
func items(
	db *DB,
	root string,
	fn func(key, value []byte) (bool, error),
) (
	count int,
	err error,
) {
	visit := func(name string) (bool, error) {
		count++
		return handlePathWithLock(db, filepath.Join(root, name), fn)
	}
	err = readDir(root, visit)
	return count, err
}
// handlePathWithLock decodes the key from the record file name at path,
// loads its value (cache first, then disk) and invokes fn with the pair.
//
// A record that disappeared between listing and reading is skipped, and the
// iteration continues.
func handlePathWithLock(
	db *DB,
	path string,
	fn func(key, value []byte) (bool, error),
) (bool, error) {
	key, err := parseKey(path)
	if err != nil {
		return false, fmt.Errorf("parse key: %w", err)
	}

	// The read lock stays held while fn runs, not just while the record is
	// read: fn must never observe a pair that changed underneath it. As a
	// consequence, fn calling Put or Delete (write lock) would deadlock.
	db.mu.RLock()
	defer db.mu.RUnlock()
	if db.closed {
		return false, ErrDatabaseClosed
	}

	// Read through the cache, but do not populate it: iterating would
	// otherwise churn the cache with keys that may never be needed again.
	if cached, hit := db.cache.Get(string(key)); hit {
		return fn(key, cached)
	}

	data, readErr := os.ReadFile(path)
	switch {
	case errors.Is(readErr, os.ErrNotExist):
		// Deleted while iterating? Ignore.
		return true, nil
	case readErr != nil:
		return false, fmt.Errorf("read key-value: %w", readErr)
	}
	return fn(key, data)
}
// Helpers

// keyPath returns the record file path for key: the base32hex encoding of
// the key, inside the database's data directory.
func keyPath(db *DB, key []byte) string {
	name := base32.HexEncoding.EncodeToString(key)
	dataDir := filepath.Join(db.path, dataDirectory)
	return filepath.Join(dataDir, name)
}
// parseKey is the inverse of keyPath: it decodes the base32hex file name at
// the end of path back into the raw key.
func parseKey(path string) ([]byte, error) {
	return base32.HexEncoding.DecodeString(filepath.Base(path))
}
// prepareForMutation ensures we have enough information saved in persistent
// storage to be able to recover the database in the event of an error.
//
// Before each mutation, we compare the database generation value with the
// checkpoint. If they are equal, the database is marked as "dirty" by
// persisting a generation one past the checkpoint. Different values for
// generation and checkpoint indicate that the database has pending state to
// be synced to persistent storage. At initialization, sanityCheck treats such
// a state as a crash and recounts the records.
//
// This is best effort: if db.mu cannot be acquired immediately, the function
// returns nil without doing anything. Callers that need the guarantee must
// re-check the generation while holding the lock.
//
// The in-memory generation is only advanced after the metadata was saved
// successfully. Advancing it first would leave memory "dirty" while the disk
// still says "clean" if Save failed, and every later mutation would then skip
// this step.
//
// The I/O done by this function should be amortized between many mutations.
func prepareForMutation(db *DB) error {
	if !db.mu.TryLock() {
		return nil
	}
	defer db.mu.Unlock()

	if db.closed {
		return ErrDatabaseClosed
	}
	if db.metadata.Generation != db.metadata.Checkpoint {
		// Already drifted: the dirty marker is already on disk.
		return nil
	}

	next := db.metadata
	next.Generation = next.Checkpoint + 1
	if err := next.Save(db.path); err != nil {
		return err
	}
	db.metadata = next
	return nil
}
// syncInternal marks the current generation as checkpointed (consistent) and
// persists the metadata. The caller must hold db.mu for writing.
func syncInternal(db *DB) error {
	m := &db.metadata
	m.Checkpoint = m.Generation
	return m.Save(db.path)
}
// syncMetadata calls db.Sync every metadataSyncInterval until db.done is
// closed, then marks db.wg as done.
//
// This only lowers the chance of an initialization-time recovery after a
// crash or a missing DB.Close(); the database does not depend on it, so Sync
// errors are deliberately ignored.
func syncMetadata(db *DB) {
	defer db.wg.Done()

	tick := time.NewTicker(metadataSyncInterval)
	defer tick.Stop()

	for {
		select {
		case <-db.done:
			// Closed by Close(); stop the loop.
			return
		case <-tick.C:
			_ = db.Sync() // best effort, see above
		}
	}
}
package sdb
import (
"errors"
"fmt"
"io"
"math/rand/v2"
"os"
"path/filepath"
"runtime"
"time"
)
// Defaults for record files, record directories and the direct-write size
// limit used by atomicWriter.
var (
	defaultDiskSectorSize = 4096
	defaultPermissions    = os.FileMode(0o600)
	defaultDirPermissions = os.FileMode(0o700)
)

// atomicWriter writes files so that incomplete writes are not observed; see
// WriteFile. With syncWrites set, files are opened with O_SYNC, which adds
// some durability guarantees on top.
type atomicWriter struct {
	syncWrites     bool
	diskSectorSize int
	perm           os.FileMode
}

// newAtomicWriter returns a writer using the package defaults for sector size
// and file permissions.
//
// The sector size is not queried from the host. If that is ever needed, doing
// it once in an init function would keep this constructor error-free and
// avoid asking the OS repeatedly.
func newAtomicWriter(syncWrites bool) *atomicWriter {
	w := new(atomicWriter)
	w.syncWrites = syncWrites
	w.diskSectorSize = defaultDiskSectorSize
	w.perm = defaultPermissions
	return w
}

// flag returns the os.OpenFile flags: always write-only, create and truncate;
// plus O_SYNC when syncWrites is set, and O_EXCL when excl is true.
func (w *atomicWriter) flag(excl bool) int {
	mode := os.O_CREATE | os.O_TRUNC | os.O_WRONLY
	if w.syncWrites {
		mode |= os.O_SYNC
	}
	if excl {
		mode |= os.O_EXCL
	}
	return mode
}
// WriteFile writes data to path so that an interrupted write does not leave a
// partially written file behind (see the package documentation on
// Atomicity).
//
// On Linux, payloads of at most w.diskSectorSize bytes are written in place.
// Larger payloads, and all payloads on other systems, are written to a
// temporary file that is then renamed onto path.
//
// excl requests O_EXCL. On the direct-write path it makes the call fail if
// path already exists. On the temporary-file path it only applies to the
// freshly named temporary file, and the rename replaces any existing target.
//
// When w.syncWrites is set and the write succeeded, the parent directory of
// path is synced as well; errors from that extra sync are ignored.
//
// NOTE(review): the direct-write path opens with O_TRUNC and then writes,
// which are two separate operations. A crash between them could leave an
// empty or short file. Confirm this is an acceptable trade-off.
func (w *atomicWriter) WriteFile(path string, data []byte, excl bool) (err error) {
	// err is the named result, so this deferred check sees the real outcome
	// of every return below, including the direct-write and rename paths.
	defer func() {
		// Sync the parent directory for more durability guarantees. See:
		// - https://lwn.net/Articles/457667/#:~:text=When%20should%20you%20Fsync
		if err == nil && w.syncWrites {
			_ = syncFile(filepath.Dir(path))
		}
	}()

	if runtime.GOOS == "linux" && len(data) <= w.diskSectorSize {
		// Optimization: Write directly if the data fits in a single sector,
		// since a single-sector write can be assumed to be atomic. See:
		//
		// - https://stackoverflow.com/questions/2009063/are-disk-sector-writes-atomic
		// - https://web.cs.ucla.edu/classes/spring07/cs111-2/scribe/lecture14.html
		//
		// This optimization assumes that the host supports atomic writes to a
		// disk sector.
		return w._writeFile(path, data, excl)
	}

	tmpPath := makeTempPath(path)
	// w._writeFile syncs the data, if configured to do so.
	if err = w._writeFile(tmpPath, data, excl); err != nil {
		// Do not leave a partially written temporary file behind.
		_ = os.Remove(tmpPath)
		return fmt.Errorf("write: %w", err)
	}
	if err = renameFile(tmpPath, path); err != nil {
		_ = os.Remove(tmpPath)
		return err
	}
	return nil
}
// _writeFile writes data to the named file, creating it with permissions
// w.perm (before umask) if needed and truncating it otherwise. The open flags
// come from w.flag(excl), so O_SYNC and O_EXCL apply when configured.
//
// It is adapted from os.WriteFile and needs several system calls, so a
// failure mid-operation can leave the file partially written. A write error
// takes precedence over a close error.
func (w *atomicWriter) _writeFile(name string, data []byte, excl bool) (err error) {
	f, err := os.OpenFile(name, w.flag(excl), w.perm)
	if err != nil {
		return err
	}
	defer func() {
		if cerr := f.Close(); cerr != nil && err == nil {
			err = cerr
		}
	}()

	_, err = f.Write(data)
	return err
}
// mkdirs creates every directory in paths, including missing parents, with
// the given permissions. It stops at the first failure.
func mkdirs(paths []string, perm os.FileMode) error {
	for i := range paths {
		err := os.MkdirAll(paths[i], perm)
		if err == nil {
			continue
		}
		return fmt.Errorf("MkdirAll: %w", err)
	}
	return nil
}
// countFiles returns the number of entries in the directory at path, together
// with any error from listing it. On error, the count covers the entries seen
// so far.
func countFiles(path string) (uint64, error) {
	var n uint64
	tally := func(string) (bool, error) {
		n++
		return true, nil
	}
	if err := readDir(path, tally); err != nil {
		return n, err
	}
	return n, nil
}
// readDir calls fn with the name of every entry in the directory at path,
// reading the directory in batches. It stops early, returning nil, when fn
// returns false. An error from fn is returned wrapped with "fn: ". Errors
// from opening, listing or closing the directory are returned as well.
func readDir(path string, fn func(name string) (bool, error)) (err error) {
	f, err := os.Open(path)
	if err != nil {
		return fmt.Errorf("open: %w", err)
	}
	defer func() {
		// Safely close the file and assign to the return value.
		if cerr := f.Close(); cerr != nil && err == nil {
			err = cerr
		}
	}()

	const batchSize = 256
	for {
		// Note: We may need to acquire a read lock (`DB.mu.RLock()`) both here
		// and within DB.handlePathWithLock, as we already do. This ensures
		// consistency when reading directory entries and accessing database
		// records.
		names, readErr := f.Readdirnames(batchSize)

		// Readdirnames may return some names together with an error, so
		// handle the names before looking at the error.
		for _, name := range names {
			ok, fnErr := fn(name)
			if fnErr != nil {
				return fmt.Errorf("fn: %w", fnErr)
			}
			if !ok {
				// Stop iteration.
				return nil
			}
		}

		if readErr != nil {
			if errors.Is(readErr, io.EOF) {
				// End of directory.
				return nil
			}
			return readErr
		}
	}
}
// Helpers
func makeTempPath(path string) string {
tmpBase := fmt.Sprintf(
"%s-%d-%d",
filepath.Base(path),
rand.Uint32(),
time.Now().UnixNano(),
)
tmpPath := filepath.Join(os.TempDir(), tmpBase)
return tmpPath
}
// syncFile opens the file or directory at path and flushes it to stable
// storage with File.Sync. A sync error takes precedence over a close error.
func syncFile(path string) (err error) {
	f, openErr := os.Open(path)
	if openErr != nil {
		return fmt.Errorf("open: %w", openErr)
	}
	defer func() {
		if cerr := f.Close(); err == nil {
			err = cerr
		}
	}()
	return f.Sync()
}
//go:build !windows
// +build !windows
package sdb
import "os"
// renameFile moves oldpath to newpath with os.Rename, replacing newpath if it
// exists. On the non-Windows systems this file builds for, the replacement is
// atomic (the target is either fully replaced or left untouched), provided
// both paths are on the same filesystem.
func renameFile(oldpath, newpath string) error {
	err := os.Rename(oldpath, newpath)
	return err
}
package sdb
import (
"fmt"
"os"
"path/filepath"
)
// initializeDatabase prepares db for use. A missing db.path is created from
// scratch. An existing path must be a directory whose owner has read, write
// and execute permission; its metadata is then loaded and sanity-checked,
// which may trigger a recovery.
func initializeDatabase(db *DB) error {
	fi, err := os.Stat(db.path)
	switch {
	case os.IsNotExist(err):
		return createDatabaseStorage(db)
	case err != nil:
		return fmt.Errorf("stat path: %w", err)
	case !fi.IsDir():
		return fmt.Errorf("path is not a directory")
	case fi.Mode().Perm()&0700 != 0700:
		return fmt.Errorf("path permissions are not 0700")
	}

	db.metadata = metadata{}
	if err := db.metadata.Load(db.path); err != nil {
		return fmt.Errorf("load metadata: %w", err)
	}

	// Check the DB consistency and possibly recover from a corrupted state.
	return sanityCheck(db)
}
// createDatabaseStorage creates the root, data and metadata directories with
// defaultDirPermissions, then persists the initial metadata via db.Sync.
func createDatabaseStorage(db *DB) error {
	root := db.path
	dirs := []string{
		root,
		filepath.Join(root, dataDirectory),
		filepath.Join(root, metadataDirectory),
	}
	if err := mkdirs(dirs, defaultDirPermissions); err != nil {
		return fmt.Errorf("create directories: %w", err)
	}

	if err := db.Sync(); err != nil {
		return fmt.Errorf("sync: %w", err)
	}
	return nil
}
// sanityCheck validates the loaded metadata (format version). When the
// persisted generation differs from the checkpoint, the previous session
// did not shut down cleanly, so the record count is rebuilt via
// recoverDatabase.
func sanityCheck(db *DB) error {
	err := db.metadata.Validate()
	if err != nil {
		return err
	}
	if db.metadata.Generation == db.metadata.Checkpoint {
		return nil
	}
	return recoverDatabase(db)
}
package internal

import "sync/atomic"
// TKey is the key type used by the cache.
type TKey = string

// Cache is a generic cache interface.
type Cache[TValue any] interface {
	// Get retrieves a value from the cache based on the provided key. It
	// returns the value and a boolean indicating whether the value was
	// found in the cache.
	Get(key TKey) (TValue, bool)

	// Put adds a new key-value pair to the cache.
	Put(key TKey, value TValue)

	// Delete removes a value from the cache based on the provided key.
	Delete(key TKey)
}

// Default Cache

// DefaultCache is the default implementation of the Cache interface. It wraps
// one of the strategies below and keeps hit/miss statistics.
//
// DefaultCache does not synchronize access to the wrapped cache. It is meant
// to be embedded in code that does the concurrency control: Put and Delete
// must be serialized against every other call. Concurrent Get calls are fine
// as long as no Put or Delete runs at the same time (e.g. readers holding a
// read lock), because the hit/miss counters are updated atomically.
type DefaultCache[TValue any] struct {
	cache Cache[TValue]

	// Atomic so that concurrent Get calls do not race on the statistics.
	hits   atomic.Int64
	misses atomic.Int64
}

// Assert DefaultCache implements Cache
var _ Cache[any] = (*DefaultCache[any])(nil)

// NewCache creates a new cache based on the provided maximum length.
//
// Setting the maxLength to -1 or less will disable the eviction of elements
// from the cache. A maxLength of 0 will create a pass-through cache that
// does nothing. Any positive value bounds the cache to that many entries,
// evicting an arbitrary entry when it is full.
func NewCache[TValue any](maxLength int) *DefaultCache[TValue] {
	var c Cache[TValue]
	switch {
	case maxLength <= -1:
		c = newUnboundedCache[TValue]()
	case maxLength == 0:
		c = newPassThroughCache[TValue]()
	default:
		c = newRandomCache[TValue](maxLength)
	}
	return newCacheWithBase[TValue](c)
}

// newCacheWithBase wraps c with hit/miss accounting.
func newCacheWithBase[TValue any](c Cache[TValue]) *DefaultCache[TValue] {
	return &DefaultCache[TValue]{cache: c}
}

// Get retrieves a value from the cache based on the provided key. It
// returns the value and a boolean indicating whether the value was
// found in the cache. Every call counts as either a hit or a miss.
func (c *DefaultCache[TValue]) Get(key TKey) (TValue, bool) {
	v, ok := c.cache.Get(key)
	if !ok {
		c.misses.Add(1)
		return v, false
	}
	c.hits.Add(1)
	return v, true
}

// Put adds a new key-value pair to the cache.
func (c *DefaultCache[TValue]) Put(key TKey, value TValue) {
	c.cache.Put(key, value)
}

// Delete removes a value from the cache based on the provided key.
func (c *DefaultCache[TValue]) Delete(key TKey) { c.cache.Delete(key) }

// Hits returns the number of cache hits (i.e. the number of Get calls that
// found the value in the cache).
func (c *DefaultCache[TValue]) Hits() int { return int(c.hits.Load()) }

// Misses returns the number of cache misses (i.e. the number of Get calls that
// did not find the value in the cache).
func (c *DefaultCache[TValue]) Misses() int { return int(c.misses.Load()) }

// ResetRatio resets the hit and miss counters to zero.
func (c *DefaultCache[TValue]) ResetRatio() {
	c.misses.Store(0)
	c.hits.Store(0)
}

// Unbounded Cache

// unboundedCache is a plain map that never evicts.
type unboundedCache[TValue any] struct {
	m map[TKey]TValue
}

// Check unboundedCache implements Cache interface
var _ Cache[any] = (*unboundedCache[any])(nil)

func newUnboundedCache[TValue any]() *unboundedCache[TValue] {
	return &unboundedCache[TValue]{
		m: make(map[TKey]TValue),
	}
}

func (c *unboundedCache[TValue]) Get(key TKey) (TValue, bool) {
	v, ok := c.m[key]
	return v, ok
}

func (c *unboundedCache[TValue]) Put(key TKey, value TValue) {
	c.m[key] = value
}

func (c *unboundedCache[TValue]) Delete(key TKey) {
	delete(c.m, key)
}

// Pass-Through Cache

// passThroughCache is a simple pass-through cache: it stores nothing and
// every Get misses.
type passThroughCache[TValue any] struct{}

// Check passThroughCache implements Cache interface
var _ Cache[any] = (*passThroughCache[any])(nil)

// newPassThroughCache creates a new pass-through cache.
func newPassThroughCache[TValue any]() *passThroughCache[TValue] {
	return &passThroughCache[TValue]{}
}

func (passThroughCache[TValue]) Get(TKey) (v TValue, ok bool) { return }
func (passThroughCache[TValue]) Put(TKey, TValue)              {}
func (passThroughCache[TValue]) Delete(TKey)                   {}

// Random Cache

// randomCache provides a bounded cache that, when full, evicts an arbitrary
// entry (whichever Go's unordered map iteration yields first).
type randomCache[TValue any] struct {
	cache   map[TKey]TValue
	maxSize int
}

// Check randomCache implements Cache interface
var _ Cache[any] = (*randomCache[any])(nil)

// newRandomCache creates a new instance of the randomCache struct, that can
// hold up to maxSize elements.
//
// Setting the maxSize to 0 or less will disable the eviction of elements
// from the cache.
func newRandomCache[TValue any](maxSize int) *randomCache[TValue] {
	return &randomCache[TValue]{
		cache:   make(map[TKey]TValue),
		maxSize: maxSize,
	}
}

func (c *randomCache[TValue]) Get(key TKey) (value TValue, ok bool) {
	value, ok = c.cache[key]
	return value, ok
}

// Put stores value under key. Eviction only happens when a NEW key is added
// to a full cache; overwriting an existing key does not grow the map, so it
// must not evict another entry.
func (c *randomCache[TValue]) Put(key TKey, value TValue) {
	_, exists := c.cache[key]
	if !exists && c.maxSize > 0 && len(c.cache) >= c.maxSize {
		// Remove any key to make room for the new one.
		for k := range c.cache {
			delete(c.cache, k)
			break
		}
	}
	c.cache[key] = value
}

func (c *randomCache[TValue]) Delete(key TKey) {
	delete(c.cache, key)
}
package sdb
import (
"bytes"
"encoding/gob"
"fmt"
"os"
"path/filepath"
)
// metadata is the database state persisted (gob-encoded) in
// <root>/meta/meta.gob.
type metadata struct {
	Version      uint64
	TotalEntries uint64
	Generation   uint64
	Checkpoint   uint64
}

// makeMetadata returns fresh metadata for the current format version, with
// all counters at zero.
func makeMetadata() metadata {
	return metadata{Version: version}
}

// FilePath returns the metadata file location relative to the database root.
func (*metadata) FilePath() string {
	return filepath.Join(metadataDirectory, "meta.gob")
}

// Validate reports an error when the stored format version is not the one
// this package understands.
func (m *metadata) Validate() error {
	if m.Version == version {
		return nil
	}
	return fmt.Errorf("version mismatch: expected %d, got %d",
		version, m.Version)
}

// Load reads and decodes the metadata file under the database root path.
func (m *metadata) Load(path string) error {
	file := filepath.Join(path, m.FilePath())
	raw, err := os.ReadFile(file)
	if err != nil {
		return fmt.Errorf("read file: %w", err)
	}
	return m.Decode(raw)
}

// Save encodes the metadata and writes it under the database root path using
// an atomicWriter.
//
// NOTE(review): the writer is always created with syncWrites=false, so the
// metadata is never written with O_SYNC, even for a DB opened
// WithSynchronousWrites. Confirm this is intended.
func (m *metadata) Save(path string) error {
	encoded, err := m.Encode()
	if err != nil {
		return fmt.Errorf("encode metadata: %w", err)
	}
	target := filepath.Join(path, m.FilePath())
	return newAtomicWriter(false).WriteFile(target, encoded, false)
}

// Encode serializes the metadata with encoding/gob.
func (m *metadata) Encode() ([]byte, error) {
	buf := new(bytes.Buffer)
	if err := gob.NewEncoder(buf).Encode(m); err != nil {
		return nil, fmt.Errorf("encode gob: %w", err)
	}
	return buf.Bytes(), nil
}

// Decode deserializes gob-encoded metadata into m.
func (m *metadata) Decode(data []byte) error {
	return gob.NewDecoder(bytes.NewReader(data)).Decode(m)
}
package sdb
import "github.com/lucmq/go-shelve/sdb/internal"
// Option customizes a DB created by Open.
type Option func(*DB)

// WithCacheSize sets how many entries the cache may hold. -1 (the default,
// see DefaultCacheSize) means unlimited, and 0 disables caching.
func WithCacheSize(size int64) Option {
	n := int(size)
	return func(db *DB) {
		db.cache = internal.NewCache[cacheEntry](n)
	}
}

// WithSynchronousWrites turns synchronous writes on or off. When enabled,
// every record write is synced to persistent storage. Disabled by default.
func WithSynchronousWrites(sync bool) Option {
	return func(db *DB) { db.syncWrites = sync }
}
package sdb
import (
"fmt"
"path/filepath"
)
// recoverDatabase repairs the metadata after an unclean shutdown, which is
// detected at initialization.
//
// The design keeps recovery trivial:
//   - every DB operation mutates at most one record file;
//   - the metadata lives in a single file and is also mutated at most once
//     per operation;
//   - hence the only value that can drift is metadata.TotalEntries.
//
// Recovery therefore recounts the files in the data directory, marks the
// current generation as checkpointed and saves the metadata.
func recoverDatabase(db *DB) error {
	n, err := countItems(filepath.Join(db.path, dataDirectory))
	if err != nil {
		return fmt.Errorf("count items: %w", err)
	}

	db.metadata.TotalEntries = n
	db.metadata.Checkpoint = db.metadata.Generation
	if err := db.metadata.Save(db.path); err != nil {
		return fmt.Errorf("sync metadata: %w", err)
	}
	return nil
}

// countItems returns the number of records under path. Each record is one
// file, so this is the directory's entry count.
func countItems(path string) (uint64, error) { return countFiles(path) }
package shelve
import (
"bytes"
"encoding/gob"
"encoding/json"
"fmt"
"strconv"
)
// Codec is the interface for encoding and decoding data stored by Shelf.
//
// The go-shelve module natively supports the following codecs:
//   - [GobCodec]: Returns a Codec for the [gob] format.
//   - [JSONCodec]: Returns a Codec for the JSON format.
//   - [StringCodec]: Returns a Codec for values that can be represented as
//     strings.
//
// Additional codecs are provided by the packages in [driver/encoding].
//
// [driver/encoding]: https://pkg.go.dev/github.com/lucmq/go-shelve/driver/encoding
type Codec interface {
	// Encode returns the Codec encoding of v as a byte slice.
	Encode(v any) ([]byte, error)

	// Decode parses the encoded data and stores the result in the value
	// pointed to by v. It is the inverse of Encode.
	Decode(data []byte, v any) error
}

// GobCodec returns a Codec for the [gob] format, a self-describing
// serialization format native to Go. Gob is a binary format and is more
// compact than text-based formats like JSON.
func GobCodec() Codec { return gobCodec{} }

// JSONCodec returns a Codec for the JSON format. Encoded output is indented.
func JSONCodec() Codec { return jsonCodec{} }

// StringCodec returns a Codec for values that can be represented as strings.
func StringCodec() Codec { return stringCodec{} }

// Gob Codec

type gobCodec struct{}

func (gobCodec) Encode(value any) ([]byte, error) {
	buf := new(bytes.Buffer)
	if err := gob.NewEncoder(buf).Encode(value); err != nil {
		return nil, fmt.Errorf("encode gob: %w", err)
	}
	return buf.Bytes(), nil
}

func (gobCodec) Decode(data []byte, value any) error {
	if err := gob.NewDecoder(bytes.NewReader(data)).Decode(value); err != nil {
		return fmt.Errorf("decode gob: %w", err)
	}
	return nil
}

// JSON Codec

type jsonCodec struct{}

func (jsonCodec) Encode(value any) ([]byte, error) {
	return json.MarshalIndent(value, "", " ")
}

func (jsonCodec) Decode(data []byte, value any) error {
	return json.Unmarshal(data, value)
}

// String Codec

// stringCodec formats string, int, int64 and uint64 values directly and
// falls back to fmt's %v verb for everything else.
type stringCodec struct{}

func (stringCodec) Encode(value any) ([]byte, error) {
	var s string
	switch v := value.(type) {
	case string:
		s = v
	case int:
		s = strconv.Itoa(v)
	case int64:
		s = strconv.FormatInt(v, 10)
	case uint64:
		s = strconv.FormatUint(v, 10)
	default:
		s = fmt.Sprintf("%v", value)
	}
	return []byte(s), nil
}

// Decode parses data into the pointer value. For *int, *int64 and *uint64 the
// target is assigned even when parsing fails (strconv's zero/limit result).
// Other pointer types are scanned with fmt.Sscanf("%v").
func (stringCodec) Decode(data []byte, value any) error {
	text := string(data)
	var err error
	switch v := value.(type) {
	case *string:
		*v = text
	case *int:
		*v, err = strconv.Atoi(text)
	case *int64:
		*v, err = strconv.ParseInt(text, 10, 64)
	case *uint64:
		*v, err = strconv.ParseUint(text, 10, 64)
	default:
		_, err = fmt.Sscanf(text, "%v", value)
	}
	return err
}
// Package shelve provides a persistent, map-like object called Shelf. It lets you
// store and retrieve Go objects directly, with the serialization and storage handled
// automatically by the Shelf. Additionally, you can customize the underlying
// key-value storage and serialization codec to better suit your application's needs.
//
// This package is inspired by the `shelve` module from the Python standard library
// and aims to provide a similar set of functionalities.
//
// By default, a Shelf serializes data using the Gob format and stores it using `sdb`
// (for "shelve-db"), a simple key-value storage created for this project. This
// database should be good enough for a broad range of applications, but the modules
// in [go-shelve/driver] provide additional options for configuring the `Shelf` with
// other databases and Codecs.
//
// [go-shelve/driver]: https://pkg.go.dev/github.com/lucmq/go-shelve/driver
package shelve
import (
"fmt"
"github.com/lucmq/go-shelve/sdb"
)
// Values accepted by the order argument of Shelf.Items. They are syntactic
// sugar that makes the intended iteration order explicit.
const (
	// Asc selects ascending iteration order.
	Asc = 1

	// Desc selects descending iteration order, the opposite of Asc.
	Desc = -1

	// All can be passed to Shelf.Items to iterate over all items in the
	// database. Its value is -1.
	All = -1
)

// Yield is the callback invoked for each key-value pair while iterating a
// Shelf. Returning false, or a non-nil error, stops the iteration.
type Yield[K, V any] func(key K, value V) (bool, error)
// A Shelf is a persistent, map-like object. It is used together with an
// underlying key-value storage to store Go objects directly.
//
// Stored values can be of arbitrary types, but the keys must be comparable.
//
// By default, values are encoded as [Gob], and keys, if they are strings or
// integers, are encoded as strings. Otherwise, keys are also encoded as Gob.
//
// By default, the underlying database is an instance of the [sdb.DB]
// ("shelve-db") key-value store.
//
// The underlying storage and codec Shelf uses can be configured with the
// [Option] functions.
//
// [Gob]: https://pkg.go.dev/encoding/gob
type Shelf[K comparable, V any] struct {
// db is the underlying key-value store (configurable with WithDatabase).
db DB
// codec encodes and decodes values (configurable with WithCodec).
codec Codec
// keyCodec encodes and decodes keys (configurable with WithKeyCodec).
keyCodec Codec
}
// Option is passed to the Open function to create a customized Shelf.
//
// Open calls each Option with a pointer to its internal, unexported options
// value. The Option constructors in this package (WithDatabase, WithCodec,
// WithKeyCodec) type-assert their argument to that type.
type Option func(any)
// options collects the settings that Option functions apply before Open
// builds a Shelf.
type options struct {
	// DB is the key-value store to use. When it is still nil after all
	// options run, Open creates the default sdb.DB at the given path.
	DB DB
	// Codec encodes and decodes values.
	Codec Codec
	// KeyCodec encodes and decodes keys.
	KeyCodec Codec
}
// WithDatabase returns an Option that makes the Shelf use db as its
// underlying key-value store. Without it, Open creates an [sdb.DB]
// ("shelve-db") store at the given path.
//
// The packages in [driver/db] provide support for other databases in the Go
// ecosystem, like [Bolt] and [Badger].
//
// [driver/db]: https://pkg.go.dev/github.com/lucmq/go-shelve/driver/db
// [Bolt]: https://pkg.go.dev/go.etcd.io/bbolt
// [Badger]: https://pkg.go.dev/github.com/dgraph-io/badger
func WithDatabase(db DB) Option {
	return func(target any) {
		target.(*options).DB = db
	}
}
// WithCodec returns an Option that sets the Codec used for values. Without
// it, values are encoded with the Gob format from the standard library
// ([encoding/gob]).
//
// Additional Codecs can be found in the packages in [driver/encoding].
//
// [driver/encoding]: https://pkg.go.dev/github.com/lucmq/go-shelve/driver/encoding
func WithCodec(c Codec) Option {
	return func(target any) {
		target.(*options).Codec = c
	}
}
// WithKeyCodec returns an Option that sets the Codec used for keys.
//
// Without it, the [StringCodec] is used when the key type is string or one
// of the predeclared signed or unsigned integer types (int, int8 ... int64,
// uint, uint8 ... uint64). Every other key type, including named types such
// as `type ID int`, is encoded as Gob ([encoding/gob]).
//
// Additional Codecs can be found in the packages in [driver/encoding].
//
// [driver/encoding]: https://pkg.go.dev/github.com/lucmq/go-shelve/driver/encoding
func WithKeyCodec(c Codec) Option {
	return func(target any) {
		target.(*options).KeyCodec = c
	}
}
// Open creates a new Shelf.
//
// The path parameter specifies the filesystem path to the database files. It
// can be a directory or a regular file, depending on the underlying database
// implementation. With the default database [sdb.DB], it will point to a
// directory. When a database is supplied with [WithDatabase], path is not
// used.
//
// Options are applied in order. Nil Options are ignored, and a nil Codec or
// key Codec set by an option is replaced with the corresponding default.
func Open[K comparable, V any](path string, opts ...Option) (
	*Shelf[K, V],
	error,
) {
	var k K
	o := options{
		Codec:    GobCodec(),
		KeyCodec: defaultKeyCodec(k),
	}
	for _, option := range opts {
		if option == nil {
			continue
		}
		option(&o)
	}

	// A nil codec would otherwise be stored as-is and panic with a nil
	// dereference on the first Encode/Decode; fall back to the defaults.
	if o.Codec == nil {
		o.Codec = GobCodec()
	}
	if o.KeyCodec == nil {
		o.KeyCodec = defaultKeyCodec(k)
	}

	if o.DB == nil {
		db, err := sdb.Open(path)
		if err != nil {
			return nil, fmt.Errorf("open db: %w", err)
		}
		o.DB = db
	}

	return &Shelf[K, V]{
		db:       o.DB,
		codec:    o.Codec,
		keyCodec: o.KeyCodec,
	}, nil
}
// Close synchronizes and closes the Shelf by closing its underlying database.
// The database's error, if any, is returned unchanged.
func (s *Shelf[K, V]) Close() error {
	db := s.db
	return db.Close()
}
// Len returns the number of items in the Shelf as an int64. The count comes
// straight from the underlying database, whose contract is to report -1 when
// an error occurs.
func (s *Shelf[K, V]) Len() int64 {
	count := s.db.Len()
	return count
}
// Sync asks the underlying database to flush the Shelf contents to
// persistent storage, returning the database's error unchanged.
func (s *Shelf[K, V]) Sync() error {
	db := s.db
	return db.Sync()
}
// Has reports whether key is present in the Shelf. The key is encoded with
// the Shelf's key codec before the underlying database is queried. On error,
// Has always reports false.
func (s *Shelf[K, V]) Has(key K) (bool, error) {
	encodedKey, encErr := s.keyCodec.Encode(key)
	if encErr != nil {
		return false, fmt.Errorf("encode: %w", encErr)
	}
	found, dbErr := s.db.Has(encodedKey)
	if dbErr != nil {
		return false, fmt.Errorf("has: %w", dbErr)
	}
	return found, nil
}
// Get retrieves the value associated with key from the Shelf.
//
// If the key is not found, Get returns the zero value of V, ok == false and a
// nil error (the underlying database signals "not found" with a nil payload).
// If encoding the key, reading from the database or decoding the value fails,
// Get returns the zero value of V, ok == false and a wrapped error.
func (s *Shelf[K, V]) Get(key K) (value V, ok bool, err error) {
	data, err := s.keyCodec.Encode(key)
	if err != nil {
		return *new(V), false, fmt.Errorf("encode: %w", err)
	}
	vData, err := s.db.Get(data)
	if err != nil {
		return *new(V), false, fmt.Errorf("get: %w", err)
	}
	if vData == nil {
		return *new(V), false, nil
	}
	var v V
	if err = s.codec.Decode(vData, &v); err != nil {
		// Don't hand back a partially decoded value flagged as found.
		return *new(V), false, fmt.Errorf("decode: %w", err)
	}
	return v, true, nil
}
// Put stores value under key, replacing any value already associated with
// that key. Both are encoded with the Shelf's codecs before being written to
// the underlying database.
func (s *Shelf[K, V]) Put(key K, value V) error {
	rawKey, err := s.keyCodec.Encode(key)
	if err != nil {
		return fmt.Errorf("encode key: %w", err)
	}
	rawValue, err := s.codec.Encode(value)
	if err != nil {
		return fmt.Errorf("encode value: %w", err)
	}
	if err := s.db.Put(rawKey, rawValue); err != nil {
		return fmt.Errorf("put: %w", err)
	}
	return nil
}
// Delete removes key, and its associated value, from the Shelf. The key is
// encoded with the Shelf's key codec before the underlying database is asked
// to delete it.
func (s *Shelf[K, V]) Delete(key K) error {
	rawKey, err := s.keyCodec.Encode(key)
	if err != nil {
		return fmt.Errorf("encode: %w", err)
	}
	if err = s.db.Delete(rawKey); err != nil {
		return fmt.Errorf("delete: %w", err)
	}
	return nil
}
// Items iterates over key-value pairs in the Shelf, calling fn(k, v) for each
// selected pair. The iteration stops early if fn returns false or an error.
//
// The start parameter specifies the key from which the iteration should
// start. If start is nil, the iteration begins from the first key in the
// Shelf.
//
// The n parameter specifies the maximum number of items passed to fn. If n is
// zero or negative (for example [All]), there is no limit.
//
// The step parameter is a stride: starting with the first item visited,
// every step-th item is passed to fn. A step of 1 ([Asc]) yields every item,
// 2 yields every other item, and so on. A negative step iterates in
// descending order with a stride of -step ([Desc] yields every item in
// reverse). A step of 0 yields nothing and returns nil.
//
// When iterating over key-value pairs in a Shelf, the order of iteration may
// not be sorted. Some database implementations may ignore the start parameter
// or not support iteration in reverse order.
//
// The default database used with Shelf (sdb.DB) does not yield items in any
// particular order and ignores the start parameter.
//
// Records with an empty stored payload are passed to fn with the zero value
// of V instead of being decoded.
func (s *Shelf[K, V]) Items(start *K, n, step int, fn Yield[K, V]) error {
	dbFn := func(k, v []byte) (bool, error) {
		var key K
		var value V
		if err := s.keyCodec.Decode(k, &key); err != nil {
			return false, fmt.Errorf("decode key: %w", err)
		}
		if len(v) != 0 {
			if err := s.codec.Decode(v, &value); err != nil {
				return false, fmt.Errorf("decode value: %w", err)
			}
		}
		return fn(key, value)
	}
	return s.iterate(start, n, step, dbFn)
}
// Keys iterates over the keys in the Shelf, calling fn for each selected key.
// The start, n and step parameters behave exactly as for [Shelf.Items].
//
// Stored values are never decoded: the value passed to fn is always the zero
// value of V.
func (s *Shelf[K, V]) Keys(start *K, n, step int, fn Yield[K, V]) error {
	var zeroValue V
	decodeKey := func(rawKey, _ []byte) (bool, error) {
		var key K
		if err := s.keyCodec.Decode(rawKey, &key); err != nil {
			return false, fmt.Errorf("decode: %w", err)
		}
		return fn(key, zeroValue)
	}
	return s.iterate(start, n, step, decodeKey)
}
// Values iterates over all values in the Shelf and calls the user-provided
// function fn for each value. The details of the iteration are the same as
// for [Shelf.Items].
//
// Keys are never decoded: the key parameter for fn is always the zero value
// of K. As in Items, a record with an empty stored payload is passed to fn
// with the zero value of V instead of being decoded.
func (s *Shelf[K, V]) Values(start *K, n, step int, fn Yield[K, V]) error {
	dbFn := func(_, v []byte) (bool, error) {
		var zero K
		var value V
		// Keep Items and Values consistent: an empty payload is not handed
		// to the codec, which (for example with Gob) may reject empty input.
		if len(v) != 0 {
			if err := s.codec.Decode(v, &value); err != nil {
				return false, fmt.Errorf("decode: %w", err)
			}
		}
		return fn(zero, value)
	}
	return s.iterate(start, n, step, dbFn)
}
// iterate is the shared driver behind Items, Keys and Values.
//
// It encodes the optional start key with the key codec and derives the
// database iteration order from the sign of step (Asc for positive, Desc for
// negative). It then passes every step-th raw record to fn, starting with the
// first one, until n records have been passed (n <= 0 means no limit) or fn
// asks to stop. A step of 0 selects nothing and returns nil immediately.
func (s *Shelf[K, V]) iterate(
	start *K,
	n, step int,
	fn func(k, v []byte) (bool, error),
) error {
	var from []byte
	if start != nil {
		var err error
		from, err = s.keyCodec.Encode(*start)
		if err != nil {
			return fmt.Errorf("encode start: %w", err)
		}
	}

	var order int
	switch {
	case step > 0:
		order = Asc
	case step < 0:
		order = Desc
		step = -step
	default:
		return nil
	}

	var total int
	// counter starts at step-1 so the very first record is yielded; after
	// each yield, step-1 records are skipped before the next one.
	counter := step - 1
	return s.db.Items(from, order, func(k, v []byte) (bool, error) {
		// Defensive only: the limit is normally enforced right after the
		// n-th yield below, so this triggers solely if the database keeps
		// calling back after being told to stop.
		if n > 0 && total >= n {
			return false, nil
		}
		if counter < step-1 {
			counter++
			return true, nil
		}
		counter = 0
		total++
		ok, err := fn(k, v)
		if err != nil || !ok {
			return ok, err
		}
		// Stop as soon as the limit is reached instead of letting the
		// database read and deliver one more record first.
		return n <= 0 || total < n, nil
	})
}
// Helpers
// defaultKeyCodec chooses the key Codec used when none is configured.
//
// key is normally the zero value of the Shelf's key type; only its dynamic
// type matters. Strings and the predeclared signed and unsigned integer types
// use StringCodec. Every other type, including named types whose underlying
// type is an integer or string (a type switch matches exact types), uses
// GobCodec.
func defaultKeyCodec(key any) Codec {
	switch key.(type) {
	case string,
		int, int8, int16, int32, int64,
		uint, uint8, uint16, uint32, uint64:
		return StringCodec()
	}
	return GobCodec()
}