package main
import (
"errors"
"flag"
"fmt"
"os"
"github.com/lucmq/go-shelve/shelve"
)
// Shelf is the shelve.Shelf instantiation used by this CLI: string keys mapped
// to string values.
type Shelf = shelve.Shelf[string, string]
// exitOnError controls whether main calls exit after run reports an error.
var exitOnError = true
// exit is the function main uses to terminate the process. It defaults to
// os.Exit.
var exit = os.Exit
// main is the CLI entry point. A failure reported by run is written to
// standard error and, when exitOnError is set, the process terminates with
// status 1 through the exit hook.
func main() {
	err := run()
	if err == nil {
		return
	}
	// Nothing useful can be done if writing to stderr itself fails.
	_, _ = fmt.Fprintf(os.Stderr, "run failed: %v\n", err)
	if !exitOnError {
		return
	}
	exit(1)
}
// run parses the command line, opens the shelve store and dispatches to the
// handler of the requested command.
//
// Global options:
//   - -path:  directory of the shelve store (default ".store")
//   - -codec: value serialization format: gob, json, or text (default "json")
//
// When no command is given, the usage text is printed and nil is returned.
// An unknown codec, a failure to open the store, an unknown command or a
// handler failure is returned as an error. A failure to close the store is
// also reported, unless the command itself already failed, in which case the
// command's error takes precedence.
func run() (err error) {
	flag.Usage = printUsage
	storePath := flag.String("path", ".store", "Path to the shelve store")
	codecName := flag.String("codec", "json", "value serialization format: gob, json, or text")
	flag.Parse()

	args := flag.Args()
	if len(args) < 1 {
		printUsage()
		return nil
	}
	command, commandArgs := args[0], args[1:]

	codec, err := getCodec(*codecName)
	if err != nil {
		return fmt.Errorf("get codec: %w", err)
	}

	// Open the shelve store
	store, err := shelve.Open[string, string](
		*storePath,
		shelve.WithCodec(codec),
	)
	if err != nil {
		return fmt.Errorf("open store: %w", err)
	}
	// Closing flushes the store; surface a close failure instead of silently
	// dropping it so that a failed final sync does not exit with status 0.
	defer func() {
		if closeErr := store.Close(); closeErr != nil && err == nil {
			err = fmt.Errorf("close store: %w", closeErr)
		}
	}()

	// Execute the appropriate command
	switch command {
	case "put":
		return handlePut(store, commandArgs)
	case "get":
		return handleGet(store, commandArgs)
	case "has":
		return handleHas(store, commandArgs)
	case "delete":
		return handleDelete(store, commandArgs)
	case "len":
		return handleLen(store)
	case "items":
		return handleItems(store, "items", commandArgs)
	case "keys":
		return handleItems(store, "keys", commandArgs)
	case "values":
		return handleItems(store, "values", commandArgs)
	default:
		return fmt.Errorf("unknown command: %s", command)
	}
}
// getCodec maps a codec name given on the command line ("gob", "json" or
// "text") to the matching shelve codec. Any other name yields an
// "unsupported codec" error.
func getCodec(name string) (shelve.Codec, error) {
	constructors := map[string]func() shelve.Codec{
		"gob":  shelve.GobCodec,
		"json": shelve.JSONCodec,
		"text": shelve.TextCodec,
	}
	newCodec, known := constructors[name]
	if !known {
		return nil, fmt.Errorf("unsupported codec: %s", name)
	}
	return newCodec(), nil
}
// handlePut stores every <key> <value> pair found in args and prints "OK"
// once all of them were written. args must hold a non-empty, even number of
// elements; otherwise a usage error is returned. The first failing write
// aborts the command.
func handlePut(store *Shelf, args []string) error {
	if n := len(args); n < 2 || n%2 != 0 {
		return errors.New("usage: shelve put <key> <value> [<key> <value> ...]")
	}
	// Consume the arguments two at a time: key first, then its value.
	for rest := args; len(rest) > 0; rest = rest[2:] {
		key, value := rest[0], rest[1]
		err := store.Put(key, value)
		if err != nil {
			return fmt.Errorf("put key-value pair (%s, %s): %w", key, value, err)
		}
	}
	fmt.Println("OK")
	return nil
}
// handleGet prints the value stored under args[0]. It returns a usage error
// when no key is given; arguments after the first are ignored.
func handleGet(store *Shelf, args []string) error {
	if len(args) == 0 {
		return errors.New("usage: shelve get <key>")
	}
	// NOTE(review): the second result of store.Get is discarded. It presumably
	// reports whether the key exists; if so, a missing key is printed as an
	// empty line and is indistinguishable from an empty value. Confirm.
	value, _, err := store.Get(args[0])
	if err != nil {
		return fmt.Errorf("get key: %w", err)
	}
	fmt.Println(value)
	return nil
}
// handleHas prints "true" when args[0] is present in the store and "false"
// otherwise. It returns a usage error when no key is given.
func handleHas(store *Shelf, args []string) error {
	if len(args) == 0 {
		return errors.New("usage: shelve has <key>")
	}
	found, err := store.Has(args[0])
	if err != nil {
		return fmt.Errorf("check key existence: %w", err)
	}
	// A bool is printed as the literal text true / false.
	fmt.Println(found)
	return nil
}
// handleDelete removes args[0] from the store and prints "OK" on success. It
// returns a usage error when no key is given.
func handleDelete(store *Shelf, args []string) error {
	if len(args) == 0 {
		return errors.New("usage: shelve delete <key>")
	}
	err := store.Delete(args[0])
	if err != nil {
		return fmt.Errorf("delete key: %w", err)
	}
	fmt.Println("OK")
	return nil
}
// handleLen prints the number of keys reported by the store. A count of -1 is
// treated as a failure and turned into an error.
func handleLen(store *Shelf) error {
	if count := store.Len(); count != -1 {
		fmt.Println(count)
		return nil
	}
	return errors.New("failed to get length")
}
// handleItems implements the "items", "keys" and "values" commands. It parses
// the per-command flags -start, -end and -limit from args and then delegates
// to the printer that matches mode. A flag parsing failure or an unknown mode
// is returned as an error.
func handleItems(store *Shelf, mode string, args []string) error {
	fs := flag.NewFlagSet(mode, flag.ContinueOnError)
	start := fs.String("start", "", "Start key (inclusive)")
	end := fs.String("end", "", "End key (exclusive)")
	limit := fs.Int("limit", shelve.All, "Maximum number of items")
	if err := fs.Parse(args); err != nil {
		return fmt.Errorf("parse flags: %w", err)
	}

	printers := map[string]func(*Shelf, *string, *string, int) error{
		"items":  printItems,
		"keys":   printKeys,
		"values": printValues,
	}
	emit, known := printers[mode]
	if !known {
		return fmt.Errorf("invalid mode: %s", mode)
	}
	return emit(store, start, end, *limit)
}
// printItems walks the store in ascending order and prints each "key value"
// pair on its own line. Iteration stops at the first key that is not smaller
// than *end, unless *end is empty.
func printItems(store *Shelf, start, end *string, limit int) error {
	visit := func(k, v string) (bool, error) {
		if upper := *end; upper != "" && k >= upper {
			return false, nil
		}
		fmt.Println(k, v)
		return true, nil
	}
	return store.Items(start, limit, shelve.Asc, visit)
}
// printKeys walks the store keys in ascending order and prints one key per
// line. Iteration stops at the first key that is not smaller than *end, unless
// *end is empty.
func printKeys(store *Shelf, start, end *string, limit int) error {
	visit := func(k, _ string) (bool, error) {
		if upper := *end; upper != "" && k >= upper {
			return false, nil
		}
		fmt.Println(k)
		return true, nil
	}
	return store.Keys(start, limit, shelve.Asc, visit)
}
// printValues walks the store in ascending key order and prints one value per
// line. Iteration stops at the first key that is not smaller than *end, unless
// *end is empty.
func printValues(store *Shelf, start, end *string, limit int) error {
	visit := func(k, v string) (bool, error) {
		if upper := *end; upper != "" && k >= upper {
			return false, nil
		}
		fmt.Println(v)
		return true, nil
	}
	return store.Items(start, limit, shelve.Asc, visit)
}
func printUsage() {
fmt.Println(`shelve is a CLI tool for managing a shelve key-value store.
Usage:
shelve [options] <command> [arguments]
The commands are:
put store one or more key-value pairs
get retrieve the value of a key
has check if a key exists
delete remove a key
len count total keys in the store
items list key-value pairs
keys list only the keys
values list only the values
Options:
`)
flag.PrintDefaults()
}
// Package sdb offers a simple key-value database that can be utilized with the
// go-shelve project.
//
// It should be suitable for a wide range of applications, but the driver
// directory (go-shelve/driver) provides additional options for configuring the
// Shelf with other supported databases from the Go ecosystem.
//
// # DB Records
//
// In sdb, each database record is represented by a distinct file stored in a
// bucket, which is a corresponding filesystem directory. The number of
// documents stored in each bucket is unlimited, and modern filesystems should
// be able to handle large buckets without significantly affecting performance.
//
// Each file record's name is the "base32hex" encoding of the key, which
// preserves lexical sort order [1]. Keys are limited to 128 characters. The
// record file is stored as binary data. With this design, users do not need to
// worry about hitting the maximum filename length or storing keys with
// forbidden characters.
//
// # Cache
//
// The sdb database uses a memory-based cache to speed up operations. By
// default, the cache size is unlimited, but it can be configured to a fixed
// size or disabled altogether.
//
// The cache's design, albeit simple, can enhance the performance of "DB.Get"
// and "DB.Items" to more than 1 million reads per second on standard hardware.
//
// # Atomicity
//
// New records are written atomically to the key-value store. With a
// file-per-record design, sdb achieves this by using atomic file writes, which
// consist of creating a temporary file and then renaming it [2].
//
// This ensures that the database's methods are always performed with one
// atomic operation, significantly simplifying the recovery process.
//
// Currently, the only data that can become inconsistent is the count of stored
// records, but if this happens, it is detected and corrected at the DB
// initialization.
//
// As an optimization, records might be written directly without needing a
// temporary file if the data fits in a single sector since a single-sector
// write can be assumed to be atomic on some systems [3] [4].
//
// # Durability
//
// By default, sdb leverages the filesystem cache to speed up the database
// writes. This is generally suitable for most applications for which sdb is
// intended, as modern hardware can offer sufficient protection against
// crashes and ensure durability.
//
// For the highest level of durability, the WithSynchronousWrites option makes
// the database synchronize data to persistent storage on each write.
//
// # Notes
//
// [1] https://datatracker.ietf.org/doc/html/rfc4648#section-7
//
// [2] On Windows, additional configuration is involved.
//
// [3] https://stackoverflow.com/questions/2009063/are-disk-sector-writes-atomic
//
// [4] https://web.cs.ucla.edu/classes/spring07/cs111-2/scribe/lecture14.html
package sdb
import (
"encoding/base32"
"errors"
"fmt"
"os"
"path/filepath"
"sync"
"time"
"github.com/lucmq/go-shelve/sdb/internal"
)
const (
// DefaultCacheSize is the default size of the cache used to speed up the
// database operations. A value of -1 represents an unlimited cache.
DefaultCacheSize = -1
// MaxKeyLength is the maximum size of a key. Put compares it against
// len(key), i.e. the key length in bytes.
MaxKeyLength = 128
// metadataSyncInterval is the interval at which the metadata is synced to
// disk.
metadataSyncInterval = 1 * time.Minute
)
// Names of the sub-directories, relative to the database path, that hold the
// record files and the metadata file respectively.
const (
dataDirectory = "data"
metadataDirectory = "meta"
)
// version is the metadata format version; metadata.Validate rejects any other
// stored value.
const version = 1
var (
// ErrKeyTooLarge is returned when a key exceeds the maximum length.
ErrKeyTooLarge = errors.New("key exceeds maximum length")
// ErrDatabaseClosed is returned when the database is closed.
ErrDatabaseClosed = errors.New("database is closed")
)
// Yield is the callback invoked for each key-value pair while iterating over
// the database. Iteration stops as soon as Yield returns false or a non-nil
// error.
type Yield = func(key, value []byte) (bool, error)
// DB represents a database, which is created with the Open function.
//
// Client applications must call DB.Close() when done with the database.
//
// A DB is safe for concurrent use by multiple goroutines.
type DB struct {
// mu guards the remaining fields: read operations take it shared, mutations
// and syncs take it exclusively.
mu sync.RWMutex
// path is the root directory of the database.
path string
// metadata holds the entry count and the generation/checkpoint counters
// that are persisted on sync.
metadata metadata
// cache keeps values in memory, keyed by the string form of the key.
cache internal.Cache[cacheEntry]
// closed is set by Close; afterwards operations fail with
// ErrDatabaseClosed (Len returns -1).
closed bool
// Controls the background sync loop.
done chan struct{}
wg sync.WaitGroup
// syncWrites is forwarded to the atomic writer used for record files.
syncWrites bool
// autoSync enables the background sync loop. Can be removed if a WAL
// is adopted for consistency, since the WAL would make the sync loop
// unnecessary.
autoSync bool
}
// cacheEntry is the value type stored in the cache: the raw bytes of a
// record's value.
type cacheEntry = []byte
// Open opens the database at the given path. If the path does not exist, it is
// created.
//
// The database starts with a cache of DefaultCacheSize, asynchronous writes
// and the background metadata sync loop enabled; options are applied on top of
// these defaults before the storage is initialized.
//
// Client applications must call DB.Close() when done with the database.
func Open(path string, options ...Option) (*DB, error) {
	db := &DB{
		path:     path,
		metadata: makeMetadata(),
		// Use the documented default instead of repeating its literal value.
		cache:      internal.NewCache[cacheEntry](DefaultCacheSize),
		done:       make(chan struct{}),
		syncWrites: false,
		autoSync:   true,
	}

	// Apply options.
	for _, option := range options {
		option(db)
	}

	if err := initializeDatabase(db); err != nil {
		return nil, fmt.Errorf("initialize database: %w", err)
	}

	// Start the background loop if autoSync is enabled.
	if db.autoSync {
		db.wg.Add(1)
		go syncMetadata(db)
	}
	return db, nil
}
// Close synchronizes and closes the database. Users must ensure no pending
// operations are in progress before calling Close().
//
// Calling Close on an already closed database is a no-op that returns nil.
// Otherwise the error of the final metadata sync, if any, is returned.
//
// Example:
//
//	var wg sync.WaitGroup
//	db, _ := sdb.Open("path")
//
//	// Start concurrent writes
//	for i := 0; i < 10; i++ {
//		wg.Add(1)
//		go func(i int) {
//			defer wg.Done()
//			db.Put([]byte(fmt.Sprintf("key-%d", i)), []byte("value"))
//		}(i)
//	}
//
//	wg.Wait()  // Ensure all writes are done
//	db.Close() // Safe to close now
func (db *DB) Close() error {
	db.mu.Lock()
	if db.closed {
		db.mu.Unlock()
		return nil
	}
	db.closed = true

	// Signal the background goroutine to stop.
	close(db.done)

	// Final sync, still under the lock. No other operation can mutate the
	// metadata anymore because closed is already set.
	err := syncInternal(db)
	db.mu.Unlock()

	// Wait for the background goroutine only after releasing the lock: it may
	// be blocked inside db.Sync() waiting for db.mu, and waiting for it while
	// holding the lock would deadlock.
	db.wg.Wait()
	return err
}
// Len reports how many items the database holds according to its metadata.
// It returns -1 when the database is closed.
func (db *DB) Len() int64 {
	db.mu.RLock()
	defer db.mu.RUnlock()

	if !db.closed {
		return int64(db.metadata.TotalEntries)
	}
	return -1
}
// Sync writes the database metadata to persistent storage, marking the current
// generation as the checkpoint. It returns ErrDatabaseClosed when the database
// has been closed.
func (db *DB) Sync() error {
	db.mu.Lock()
	defer db.mu.Unlock()

	if !db.closed {
		return syncInternal(db)
	}
	return ErrDatabaseClosed
}
// Has reports whether a key exists in the database. The cache is consulted
// first; on a miss the presence of the record file decides. It returns
// ErrDatabaseClosed when the database has been closed.
func (db *DB) Has(key []byte) (bool, error) {
	db.mu.RLock()
	defer db.mu.RUnlock()

	if db.closed {
		return false, ErrDatabaseClosed
	}
	if _, cached := db.cache.Get(string(key)); cached {
		return true, nil
	}

	_, err := os.Stat(keyPath(db, key))
	switch {
	case err == nil:
		return true, nil
	case os.IsNotExist(err):
		return false, nil
	default:
		return false, fmt.Errorf("stat: %w", err)
	}
}
// Get returns the value stored under key, or nil (with a nil error) when the
// key is not present. The cache is consulted first; on a miss the record file
// is read from disk. It returns ErrDatabaseClosed when the database has been
// closed.
func (db *DB) Get(key []byte) ([]byte, error) {
	db.mu.RLock()
	defer db.mu.RUnlock()

	if db.closed {
		return nil, ErrDatabaseClosed
	}
	if cached, ok := db.cache.Get(string(key)); ok {
		return cached, nil
	}

	data, err := os.ReadFile(keyPath(db, key))
	switch {
	case err == nil:
		return data, nil
	case os.IsNotExist(err):
		// A missing record file means the key is absent, not a failure.
		return nil, nil
	default:
		return nil, fmt.Errorf("read file: %w", err)
	}
}
// Put stores value under key, replacing any value already stored there.
//
// It returns ErrKeyTooLarge when the key is longer than [MaxKeyLength] and
// ErrDatabaseClosed when the database has been closed. On success the entry
// count (for new keys), the generation counter and the cache are updated.
func (db *DB) Put(key, value []byte) error {
	err := prepareForMutation(db)
	if err != nil {
		return fmt.Errorf("prepare for mutation: %w", err)
	}
	if MaxKeyLength < len(key) {
		return ErrKeyTooLarge
	}

	db.mu.Lock()
	defer db.mu.Unlock()

	if db.closed {
		return ErrDatabaseClosed
	}

	existed, err := putPath(db, keyPath(db, key), value)
	if err != nil {
		return fmt.Errorf("put path: %w", err)
	}
	// Only a brand new record changes the number of entries.
	if !existed {
		db.metadata.TotalEntries++
	}
	db.metadata.Generation++

	// Cache aside
	db.cache.Put(string(key), value)
	return nil
}
// putPath writes value to the record file at path through an atomic writer.
// updated reports whether the file already existed before the write. New
// files are created exclusively; existing ones are overwritten.
func putPath(db *DB, path string, value []byte) (updated bool, err error) {
	switch _, statErr := os.Stat(path); {
	case statErr == nil:
		updated = true
	case !os.IsNotExist(statErr):
		return false, fmt.Errorf("stat: %w", statErr)
	}

	err = newAtomicWriter(db.syncWrites).WriteFile(path, value, !updated)
	return updated, err
}
// Delete removes the record stored under key. Deleting a key that does not
// exist is not an error. It returns ErrDatabaseClosed when the database has
// been closed.
func (db *DB) Delete(key []byte) error {
	if err := prepareForMutation(db); err != nil {
		return fmt.Errorf("prepare for mutation: %w", err)
	}

	db.mu.Lock()
	defer db.mu.Unlock()

	if db.closed {
		return ErrDatabaseClosed
	}

	switch err := os.Remove(keyPath(db, key)); {
	case err == nil:
		// A record file was actually removed.
		db.metadata.TotalEntries--
	case !os.IsNotExist(err):
		return fmt.Errorf("remove: %w", err)
	}

	db.metadata.Generation++
	db.cache.Delete(string(key))
	return nil
}
// Items calls fn(k, v) for every key-value pair in the database. Iteration
// ends early when fn returns false or an error.
//
// The start and order parameters exist for compatibility with the shelve.DB
// interface and are currently ignored.
//
// A read lock is acquired for each record and held while fn runs for that
// record. Callers that need the lock released quickly should copy the pair
// and return from fn as soon as possible.
//
// fn must not modify the database from the iterating goroutine; doing so
// would deadlock.
func (db *DB) Items(start []byte, order int, fn Yield) error {
	_, _ = start, order

	db.mu.RLock()
	closed := db.closed
	db.mu.RUnlock()
	if closed {
		return ErrDatabaseClosed
	}

	dataRoot := filepath.Join(db.path, dataDirectory)
	_, err := items(db, dataRoot, fn)
	if err != nil {
		return fmt.Errorf("walk data directory: %w", err)
	}
	return nil
}
// items visits every directory entry under root, handing each record to fn
// via handlePathWithLock. count is the number of entries visited, including
// the one on which the iteration stopped.
func items(
	db *DB,
	root string,
	fn func(key, value []byte) (bool, error),
) (
	count int,
	err error,
) {
	visit := func(name string) (bool, error) {
		count++
		return handlePathWithLock(db, filepath.Join(root, name), fn)
	}
	err = readDir(root, visit)
	return count, err
}
// handlePathWithLock decodes the key from the record file name at path, loads
// the record value (from the cache when present, otherwise from disk) and
// passes the pair to fn. It returns fn's verdict on whether to continue.
//
// A record file that disappeared before it could be read is skipped and the
// iteration continues.
func handlePathWithLock(
	db *DB,
	path string,
	fn func(key, value []byte) (bool, error),
) (bool, error) {
	key, err := parseKey(path)
	if err != nil {
		return false, fmt.Errorf("parse key: %w", err)
	}

	// The read lock stays held while fn executes, so the pair handed to fn
	// cannot change in the database for as long as fn is running. Do not
	// release it right after the read. The flip side: fn deadlocks if it calls
	// Put or Delete, since those need the write lock.
	db.mu.RLock()
	defer db.mu.RUnlock()

	if db.closed {
		return false, ErrDatabaseClosed
	}

	// Read through the cache but never populate it here: filling it during a
	// full scan would churn it with keys that may never be requested again.
	value, cached := db.cache.Get(string(key))
	if !cached {
		value, err = os.ReadFile(path)
		switch {
		case errors.Is(err, os.ErrNotExist):
			// Deleted while iterating? Ignore.
			return true, nil
		case err != nil:
			return false, fmt.Errorf("read key-value: %w", err)
		}
	}
	return fn(key, value)
}
// Helpers
// keyPath returns the path of the record file for key: the base32hex encoding
// of the key inside the database's data directory.
func keyPath(db *DB, key []byte) string {
	return filepath.Join(
		db.path,
		dataDirectory,
		base32.HexEncoding.EncodeToString(key),
	)
}
// parseKey recovers the key from a record file path by base32hex-decoding the
// last path element. It returns the decoder's error when that element is not
// valid base32hex.
func parseKey(path string) ([]byte, error) {
	return base32.HexEncoding.DecodeString(filepath.Base(path))
}
// prepareForMutation makes sure persistent storage records that the database
// is about to change, so that an interrupted run can be detected later.
//
// Generation and Checkpoint are compared before each mutation. When they are
// equal, Generation is moved one past Checkpoint and the metadata is saved.
// When they already differ, unsynced state is pending and nothing needs to be
// written.
//
// The lock is only tried, never waited for: if it is currently held, the
// function returns nil without doing anything.
//
// The I/O done by this function should be amortized between many mutations.
func prepareForMutation(db *DB) error {
	if !db.mu.TryLock() {
		return nil
	}
	defer db.mu.Unlock()

	switch {
	case db.closed:
		return ErrDatabaseClosed
	case db.metadata.Generation != db.metadata.Checkpoint:
		// Already drifted
		return nil
	}

	// Record the pending change and persist it.
	db.metadata.Generation = db.metadata.Checkpoint + 1
	return db.metadata.Save(db.path)
}
// syncInternal marks the current generation as the checkpoint and saves the
// metadata. It does no locking itself; Close and Sync call it with db.mu
// held.
func syncInternal(db *DB) error {
	meta := &db.metadata
	meta.Checkpoint = meta.Generation
	return meta.Save(db.path)
}
// syncMetadata is the background loop that calls db.Sync every
// metadataSyncInterval until db.done is closed.
//
// Note: The loop only lowers the odds of a recovery being triggered at
// initialization after a missed DB.Close() or a system crash. The database
// does not depend on it, which is why Sync errors are ignored here.
func syncMetadata(db *DB) {
	defer db.wg.Done()

	tick := time.NewTicker(metadataSyncInterval)
	defer tick.Stop()

	for {
		select {
		case <-db.done:
			// Closed by Close(): stop the loop.
			return
		case <-tick.C:
			_ = db.Sync()
		}
	}
}
package sdb
import (
"errors"
"fmt"
"io"
"math/rand/v2"
"os"
"path/filepath"
"runtime"
"time"
)
var (
// defaultDiskSectorSize is the payload size, in bytes, up to which
// atomicWriter.WriteFile writes directly instead of going through a
// temporary file (on Linux only).
defaultDiskSectorSize = 4096
// defaultPermissions is the mode used when creating record files.
defaultPermissions = os.FileMode(0600)
// defaultDirPermissions is the mode used when creating database directories.
defaultDirPermissions = os.FileMode(0700)
)
// The main objective of atomicWriter is to protect against incomplete writes.
// When used together with O_SYNC, atomicWriter also provides some additional
// durability guarantees.
type atomicWriter struct {
// syncWrites adds O_SYNC to file opens and makes WriteFile sync the parent
// directory after a write.
syncWrites bool
// diskSectorSize is the largest payload WriteFile writes in place.
diskSectorSize int
// perm is the permission mode for newly created files.
perm os.FileMode
}
// newAtomicWriter returns a writer configured with the default sector size
// and file permissions. syncWrites selects synchronous writes.
//
// Note: should the sector size ever be queried from the host system, doing it
// once in a Go `init` function would keep this constructor free of error
// returns and avoid asking the OS repeatedly.
func newAtomicWriter(syncWrites bool) *atomicWriter {
	w := new(atomicWriter)
	w.syncWrites = syncWrites
	w.diskSectorSize = defaultDiskSectorSize
	w.perm = defaultPermissions
	return w
}
// flag builds the os.OpenFile flags for a write: the file is opened
// write-only, created if missing and truncated. O_SYNC is added for
// synchronous writers and O_EXCL when excl is set.
func (w *atomicWriter) flag(excl bool) int {
	mode := os.O_WRONLY | os.O_CREATE | os.O_TRUNC
	if w.syncWrites {
		mode |= os.O_SYNC
	}
	if excl {
		mode |= os.O_EXCL
	}
	return mode
}
// WriteFile writes data to path so that readers never observe a partially
// written file.
//
// On Linux, data that fits in a single disk sector is written directly to
// path. Otherwise the data goes to a temporary file that is then renamed over
// path. excl requests exclusive creation of the file that is opened.
//
// When the writer is synchronous and the write succeeded, the parent
// directory of path is synced as well (best effort). If writing the temporary
// file or renaming it fails, the temporary file is removed.
func (w *atomicWriter) WriteFile(path string, data []byte, excl bool) (err error) {
	// err is a named result so that the deferred function observes the value
	// actually returned, including the results of the return expressions below.
	defer func() {
		// Sync the parent directory for more durability guarantees. See:
		//  - https://lwn.net/Articles/457667/#:~:text=When%20should%20you%20Fsync
		if err == nil && w.syncWrites {
			_ = syncFile(filepath.Dir(path))
		}
	}()

	if runtime.GOOS == "linux" && len(data) <= w.diskSectorSize {
		// Optimization: Write directly if the data fits in a single sector,
		// since a single-sector write can be assumed to be atomic. See:
		//
		// - https://stackoverflow.com/questions/2009063/are-disk-sector-writes-atomic
		// - https://web.cs.ucla.edu/classes/spring07/cs111-2/scribe/lecture14.html
		//
		// This optimization assumes that the host supports atomic writes to a
		// disk sector.
		return w._writeFile(path, data, excl)
	}

	tmpPath := makeTempPath(path)

	// w._writeFile will sync, if configured to do so.
	if err = w._writeFile(tmpPath, data, excl); err != nil {
		// Best-effort cleanup; the file may not even have been created.
		_ = os.Remove(tmpPath)
		return fmt.Errorf("write: %w", err)
	}
	if err = renameFile(tmpPath, path); err != nil {
		// Do not leave the orphaned temporary file behind.
		_ = os.Remove(tmpPath)
		return err
	}
	return nil
}
// _writeFile writes data to the named file, creating it with the writer's
// permissions (before umask) if it does not exist and truncating it otherwise.
// The open flags come from w.flag(excl).
//
// Because several system calls are involved, a failure midway can leave the
// file partially written. A write error takes precedence over a close error.
func (w *atomicWriter) _writeFile(name string, data []byte, excl bool) error {
	// Adapted from `os.WriteFile()`
	file, err := os.OpenFile(name, w.flag(excl), w.perm)
	if err != nil {
		return err
	}
	_, writeErr := file.Write(data)
	closeErr := file.Close()
	if writeErr != nil {
		return writeErr
	}
	return closeErr
}
// mkdirs creates every directory in paths, including missing parents, with the
// given permissions. It stops at the first failure and returns it wrapped.
func mkdirs(paths []string, perm os.FileMode) error {
	for i := range paths {
		err := os.MkdirAll(paths[i], perm)
		if err != nil {
			return fmt.Errorf("MkdirAll: %w", err)
		}
	}
	return nil
}
// countFiles returns the number of directory entries found in path.
func countFiles(path string) (uint64, error) {
	var total uint64
	tally := func(string) (bool, error) {
		total++
		return true, nil
	}
	err := readDir(path, tally)
	return total, err
}
// readDir lists the entries of the directory at path in batches and calls fn
// with each entry name. Iteration ends when the directory is exhausted, when
// fn returns false, or when fn returns an error (which is returned wrapped).
// An error from closing the directory is reported only if nothing else failed.
func readDir(path string, fn func(name string) (bool, error)) (err error) {
	dir, openErr := os.Open(path)
	if openErr != nil {
		return fmt.Errorf("open: %w", openErr)
	}
	defer func() {
		// Close the directory and surface the close error through the named
		// result unless an earlier error is already being returned.
		if closeErr := dir.Close(); closeErr != nil && err == nil {
			err = closeErr
		}
	}()

	const batchSize = 256
	for {
		// Note: We may need to acquire a read lock (`DB.mu.RLock()`) both here
		// and within DB.handlePathWithLock, as we already do. This ensures
		// consistency when reading directory entries and accessing database records.
		names, readErr := dir.Readdirnames(batchSize)
		if readErr != nil {
			// EOF or unreadable dir
			if errors.Is(readErr, io.EOF) {
				return nil
			}
			return readErr
		}

		for _, name := range names {
			keepGoing, fnErr := fn(name)
			if fnErr != nil {
				return fmt.Errorf("fn: %w", fnErr)
			}
			if !keepGoing {
				// Stop Iteration
				return nil
			}
		}
	}
}
// Helpers
func makeTempPath(path string) string {
tmpBase := fmt.Sprintf(
"%s-%d-%d",
filepath.Base(path),
rand.Uint32(),
time.Now().UnixNano(),
)
tmpPath := filepath.Join(os.TempDir(), tmpBase)
return tmpPath
}
// syncFile opens path, flushes it to stable storage with Sync and closes it
// again. A sync error takes precedence over a close error.
func syncFile(path string) error {
	file, err := os.Open(path)
	if err != nil {
		return fmt.Errorf("open: %w", err)
	}
	syncErr := file.Sync()
	closeErr := file.Close()
	if syncErr != nil {
		return syncErr
	}
	return closeErr
}
//go:build !windows
// +build !windows
package sdb
import "os"
// renameFile moves oldpath to newpath with os.Rename, replacing newpath if it
// exists. The intent is that the target is either replaced entirely or both
// files are left unchanged.
func renameFile(oldpath, newpath string) error {
	err := os.Rename(oldpath, newpath)
	return err
}
package sdb
import (
"fmt"
"os"
"path/filepath"
)
// initializeDatabase prepares db for use. A missing db.path leads to the
// creation of fresh storage. An existing path must be a directory on which
// the owner has read, write and execute permission; its metadata is then
// loaded and checked, which may trigger a recovery.
func initializeDatabase(db *DB) error {
	info, err := os.Stat(db.path)
	switch {
	case os.IsNotExist(err):
		return createDatabaseStorage(db)
	case err != nil:
		return fmt.Errorf("stat path: %w", err)
	case !info.IsDir():
		return fmt.Errorf("path is not a directory")
	case info.Mode().Perm()&0700 != 0700:
		return fmt.Errorf("path permissions are not 0700")
	}

	// Start from empty metadata and fill it from disk.
	db.metadata = metadata{}
	if err = db.metadata.Load(db.path); err != nil {
		return fmt.Errorf("load metadata: %w", err)
	}

	// Verify consistency; a mismatch between generation and checkpoint is
	// repaired here.
	return sanityCheck(db)
}
// createDatabaseStorage creates the database root together with its data and
// metadata directories and then writes the initial metadata through db.Sync.
func createDatabaseStorage(db *DB) error {
	dirs := []string{
		db.path,
		filepath.Join(db.path, dataDirectory),
		filepath.Join(db.path, metadataDirectory),
	}
	err := mkdirs(dirs, defaultDirPermissions)
	if err != nil {
		return fmt.Errorf("create directories: %w", err)
	}

	// Persist the initial metadata.
	if err = db.Sync(); err != nil {
		return fmt.Errorf("sync: %w", err)
	}
	return nil
}
// sanityCheck validates the loaded metadata and compares the generation with
// the checkpoint. Differing values mean the last run did not sync cleanly, in
// which case the database is recovered.
func sanityCheck(db *DB) error {
	err := db.metadata.Validate()
	if err != nil {
		return err
	}
	if db.metadata.Generation == db.metadata.Checkpoint {
		return nil
	}
	return recoverDatabase(db)
}
package internal
// TKey is the key type used by the cache.
type TKey = string
// Cache is a generic cache interface mapping TKey keys to values of type
// TValue.
type Cache[TValue any] interface {
// Get retrieves a value from the cache based on the provided key. It
// returns the value and a boolean indicating whether the value was
// found in the cache.
Get(key TKey) (TValue, bool)
// Put adds a new key-value pair to the cache.
Put(key TKey, value TValue)
// Delete removes a value from the cache based on the provided key.
Delete(key TKey)
}
// Default Cache
// DefaultCache is the default implementation of the Cache interface. It wraps
// another Cache and counts the hits and misses of Get. It is not safe for
// concurrent use, as it is meant to be embedded in code that does the
// concurrency control.
type DefaultCache[TValue any] struct {
// cache is the wrapped implementation that actually stores the entries.
cache Cache[TValue]
// hits and misses count the Get calls that did and did not find a value.
hits int
misses int
}
// Assert DefaultCache implements Cache
var _ Cache[any] = (*DefaultCache[any])(nil)
// NewCache returns a cache whose behavior depends on maxLength:
//
//   - negative: entries are never evicted (unbounded cache);
//   - zero: a pass-through cache that stores nothing;
//   - positive: at most maxLength entries, evicting an arbitrary one when full.
func NewCache[TValue any](maxLength int) *DefaultCache[TValue] {
	if maxLength < 0 {
		return newCacheWithBase[TValue](newUnboundedCache[TValue]())
	}
	if maxLength == 0 {
		return newCacheWithBase[TValue](newPassThroughCache[TValue]())
	}
	return newCacheWithBase[TValue](newRandomCache[TValue](maxLength))
}
// newCacheWithBase wraps c in a DefaultCache with zeroed hit/miss counters.
func newCacheWithBase[TValue any](c Cache[TValue]) *DefaultCache[TValue] {
	wrapped := new(DefaultCache[TValue])
	wrapped.cache = c
	return wrapped
}
// Get looks key up in the wrapped cache and returns the value together with a
// flag telling whether it was found. Every call is counted as a hit or a miss.
//
// NOTE(review): Get mutates the hit/miss counters, so callers that guard the
// cache with a shared (read) lock may still race on them. Confirm.
func (c *DefaultCache[TValue]) Get(key TKey) (TValue, bool) {
	value, found := c.cache.Get(key)
	if found {
		c.hits++
	} else {
		c.misses++
	}
	return value, found
}
// Put stores value under key in the wrapped cache.
func (c *DefaultCache[TValue]) Put(key TKey, value TValue) {
	inner := c.cache
	inner.Put(key, value)
}

// Delete drops key from the wrapped cache.
func (c *DefaultCache[TValue]) Delete(key TKey) {
	inner := c.cache
	inner.Delete(key)
}

// Hits reports how many Get calls found their value in the cache since
// creation or the last ResetRatio.
func (c *DefaultCache[TValue]) Hits() int {
	return c.hits
}

// Misses reports how many Get calls did not find their value in the cache
// since creation or the last ResetRatio.
func (c *DefaultCache[TValue]) Misses() int {
	return c.misses
}

// ResetRatio sets both the hit and the miss counter back to zero.
func (c *DefaultCache[TValue]) ResetRatio() {
	c.hits, c.misses = 0, 0
}
// Unbounded Cache

// unboundedCache is a map-backed cache that never evicts entries.
type unboundedCache[TValue any] struct {
	m map[TKey]TValue
}

// Check unboundedCache implements Cache interface
var _ Cache[any] = (*unboundedCache[any])(nil)

// newUnboundedCache returns an empty unboundedCache.
func newUnboundedCache[TValue any]() *unboundedCache[TValue] {
	entries := make(map[TKey]TValue)
	return &unboundedCache[TValue]{m: entries}
}

// Get returns the value stored under key and whether it was present.
func (c *unboundedCache[TValue]) Get(key TKey) (TValue, bool) {
	if value, found := c.m[key]; found {
		return value, true
	}
	var zero TValue
	return zero, false
}

// Put stores value under key, replacing any previous value.
func (c *unboundedCache[TValue]) Put(key TKey, value TValue) {
	c.m[key] = value
}

// Delete removes key from the cache; a missing key is a no-op.
func (c *unboundedCache[TValue]) Delete(key TKey) {
	delete(c.m, key)
}
// Pass-Through Cache

// passThroughCache is a cache that stores nothing: every Get misses and Put
// and Delete do nothing.
type passThroughCache[TValue any] struct{}

// Check passThroughCache implements Cache interface
var _ Cache[any] = (*passThroughCache[any])(nil)

// newPassThroughCache creates a new pass-through cache.
func newPassThroughCache[TValue any]() *passThroughCache[TValue] {
	return new(passThroughCache[TValue])
}

// Get always reports a miss and returns the zero value.
func (passThroughCache[TValue]) Get(TKey) (TValue, bool) {
	var zero TValue
	return zero, false
}

// Put discards the pair.
func (passThroughCache[TValue]) Put(TKey, TValue) {
}

// Delete does nothing.
func (passThroughCache[TValue]) Delete(TKey) {
}
// Random Cache

// randomCache is a bounded, map-backed cache. When it is full, Put makes room
// by dropping an arbitrary entry.
type randomCache[TValue any] struct {
	cache   map[TKey]TValue
	maxSize int
}

// Check randomCache implements Cache interface
var _ Cache[any] = (*randomCache[any])(nil)

// newRandomCache returns an empty randomCache limited to maxSize entries.
//
// A maxSize of 0 or less turns eviction off, so the cache grows without
// bound.
func newRandomCache[TValue any](maxSize int) *randomCache[TValue] {
	c := new(randomCache[TValue])
	c.cache = make(map[TKey]TValue)
	c.maxSize = maxSize
	return c
}

// Get returns the value stored under key and whether it was present.
func (c *randomCache[TValue]) Get(key TKey) (value TValue, ok bool) {
	if found, present := c.cache[key]; present {
		return found, true
	}
	return value, false
}
// Put stores value under key. When the cache is bounded (maxSize > 0), already
// holds maxSize entries and key is not one of them, an arbitrary entry is
// evicted first to make room. Overwriting an existing key never evicts
// anything, because the number of entries does not grow.
func (c *randomCache[TValue]) Put(key TKey, value TValue) {
	_, exists := c.cache[key]
	full := c.maxSize > 0 && len(c.cache) >= c.maxSize
	if full && !exists {
		// Map iteration order is unspecified, so the first key visited serves
		// as the arbitrary victim.
		for victim := range c.cache {
			delete(c.cache, victim)
			break
		}
	}
	c.cache[key] = value
}
// Delete removes key from the cache; a missing key is a no-op.
func (c *randomCache[TValue]) Delete(key TKey) {
	entries := c.cache
	delete(entries, key)
}
package sdb
import (
"bytes"
"encoding/gob"
"fmt"
"os"
"path/filepath"
)
// metadata is the bookkeeping record persisted alongside the database records.
type metadata struct {
// Version is the metadata format version, checked by Validate.
Version uint64
// TotalEntries is the number of records in the database.
TotalEntries uint64
// Generation is advanced on every mutation of the database.
Generation uint64
// Checkpoint is the Generation value at the last sync. A Generation that
// differs from Checkpoint means there is unsynced state.
Checkpoint uint64
}
// makeMetadata returns the metadata of a fresh database: the current format
// version with all counters at zero.
func makeMetadata() metadata {
	var m metadata
	m.Version = version
	return m
}
// FilePath returns the location of the metadata file relative to the database
// root.
func (*metadata) FilePath() string {
	const fileName = "meta.gob"
	return filepath.Join(metadataDirectory, fileName)
}
// Validate checks that the stored format version matches the version this
// package supports.
func (m *metadata) Validate() error {
	if m.Version == version {
		return nil
	}
	return fmt.Errorf("version mismatch: expected %d, got %d",
		version, m.Version)
}
// Load reads the metadata file found under the database root path and decodes
// it into m.
func (m *metadata) Load(path string) error {
	file := filepath.Join(path, m.FilePath())
	raw, err := os.ReadFile(file)
	if err != nil {
		return fmt.Errorf("read file: %w", err)
	}
	return m.Decode(raw)
}
// Save encodes m and writes it to the metadata file under the database root
// path, using a non-synchronous atomic writer and overwriting any existing
// file.
func (m *metadata) Save(path string) error {
	encoded, err := m.Encode()
	if err != nil {
		return fmt.Errorf("encode metadata: %w", err)
	}
	target := filepath.Join(path, m.FilePath())
	return newAtomicWriter(false).WriteFile(target, encoded, false)
}
// Encode serializes m in the gob format.
func (m *metadata) Encode() ([]byte, error) {
	out := new(bytes.Buffer)
	if err := gob.NewEncoder(out).Encode(m); err != nil {
		return nil, fmt.Errorf("encode gob: %w", err)
	}
	return out.Bytes(), nil
}
// Decode fills m from gob-encoded data, returning the decoder's error
// unchanged.
func (m *metadata) Decode(data []byte) error {
	src := bytes.NewReader(data)
	return gob.NewDecoder(src).Decode(m)
}
package sdb
import "github.com/lucmq/go-shelve/sdb/internal"
// Option is passed to the Open function to create a customized DB. Open
// applies the options, in order, before the database storage is initialized.
type Option func(*DB)
// WithCacheSize replaces the database cache with one of the given size. A
// size of -1 means an unlimited cache and a size of 0 disables caching. The
// default cache size is -1.
func WithCacheSize(size int64) Option {
	apply := func(db *DB) {
		db.cache = internal.NewCache[cacheEntry](int(size))
	}
	return apply
}
// WithSynchronousWrites turns synchronous record writes on or off. They are
// off by default.
func WithSynchronousWrites(sync bool) Option {
	apply := func(db *DB) {
		db.syncWrites = sync
	}
	return apply
}
package sdb
import (
"fmt"
"path/filepath"
)
// recoverDatabase repairs the metadata after initialization found the
// generation and the checkpoint to differ.
//
// Rationale:
//   - The DB design is simple, and all operations require at most one file
//     mutation.
//   - The metadata is stored in a single file, and all operations require at
//     most one file mutation.
//   - Currently, the only thing that can get corrupted is the metadata, in
//     particular, the metadata.TotalEntries count.
//
// Recovery therefore consists of recounting the files in the data directory,
// storing that count, marking the metadata as consistent and saving it.
func recoverDatabase(db *DB) error {
	recordCount, err := countItems(filepath.Join(db.path, dataDirectory))
	if err != nil {
		return fmt.Errorf("count items: %w", err)
	}

	meta := &db.metadata
	meta.TotalEntries = recordCount
	meta.Checkpoint = meta.Generation

	if err = meta.Save(db.path); err != nil {
		return fmt.Errorf("sync metadata: %w", err)
	}
	return nil
}
// countItems returns the number of records stored under path. Since every
// record is one file, this is the number of directory entries.
func countItems(path string) (uint64, error) {
	total, err := countFiles(path)
	return total, err
}
package shelve
import (
"bytes"
"encoding"
"encoding/gob"
"encoding/hex"
"encoding/json"
"fmt"
"reflect"
"strconv"
)
// Codec is the interface for encoding and decoding data stored by Shelf.
//
// The go-shelve module natively supports the following codecs:
// - [GobCodec]: Returns a Codec for the [gob] format.
// - [JSONCodec]: Returns a Codec for the [json] format.
// - [TextCodec]: Returns a Codec for values that can be represented as
// plain text.
//
// Additional codecs are provided by the packages in [driver/encoding].
//
// [driver/encoding]: https://pkg.go.dev/github.com/lucmq/go-shelve/driver/encoding
type Codec interface {
// Encode returns the Codec encoding of v as a byte slice, or an error if
// v cannot be encoded.
Encode(v any) ([]byte, error)
// Decode parses the encoded data and stores the result in the value
// pointed to by v. It is the inverse of Encode. The codecs in this
// package expect v to be a pointer to the destination value.
Decode(data []byte, v any) error
}
// GobCodec returns a Codec for the [gob] format, a self-describing
// serialization format native to Go.
//
// Gob is a binary format and is more compact than text-based formats like
// JSON.
func GobCodec() Codec { return gobCodec{} }
// JSONCodec returns a Codec for the JSON format.
func JSONCodec() Codec { return jsonCodec{} }
// TextCodec returns a Codec for values that can be represented as plain text.
func TextCodec() Codec { return textCodec{} }
// Gob Codec

// gobCodec is the Codec implementation backed by encoding/gob.
type gobCodec struct{}

// Encode serializes value with a fresh gob encoder and returns the produced
// bytes. Encoder failures are wrapped with the "encode gob" context.
func (gobCodec) Encode(value any) ([]byte, error) {
	out := new(bytes.Buffer)
	if err := gob.NewEncoder(out).Encode(value); err != nil {
		return nil, fmt.Errorf("encode gob: %w", err)
	}
	return out.Bytes(), nil
}

// Decode reads one gob-encoded value from data into value. Decoder failures
// are wrapped with the "decode gob" context.
func (gobCodec) Decode(data []byte, value any) error {
	src := bytes.NewReader(data)
	if err := gob.NewDecoder(src).Decode(value); err != nil {
		return fmt.Errorf("decode gob: %w", err)
	}
	return nil
}
// JSON Codec

// jsonCodec is the Codec implementation backed by encoding/json.
type jsonCodec struct{}

// Encode returns the indented JSON encoding of value (no line prefix, one
// space per indentation level).
func (jsonCodec) Encode(value any) ([]byte, error) {
	const (
		prefix = ""
		indent = " "
	)
	return json.MarshalIndent(value, prefix, indent)
}

// Decode parses the JSON document in data into value, which is handed to
// json.Unmarshal unchanged.
func (jsonCodec) Decode(data []byte, value any) error {
	err := json.Unmarshal(data, value)
	return err
}
// Text Codec

// textCodec encodes scalar values, fixed-size byte arrays, and types that
// implement encoding.TextMarshaler.
// It supports strings, booleans, integers, floats, and [N]byte arrays (encoded
// as hex).
type textCodec struct{}

// Encode returns the plain-text representation of value.
//
// Strings are returned as-is, integers are formatted in base 10, floats use
// the shortest 'g' representation that round-trips, booleans become "true" or
// "false", encoding.TextMarshaler values use MarshalText, and arrays whose
// elements are bytes are hex encoded. Any other type yields an error.
func (textCodec) Encode(value any) ([]byte, error) {
	switch v := value.(type) {
	case string:
		return []byte(v), nil
	case int:
		return []byte(strconv.Itoa(v)), nil
	case int8:
		return []byte(strconv.FormatInt(int64(v), 10)), nil
	case int16:
		return []byte(strconv.FormatInt(int64(v), 10)), nil
	case int32:
		return []byte(strconv.FormatInt(int64(v), 10)), nil
	case int64:
		return []byte(strconv.FormatInt(v, 10)), nil
	case uint:
		return []byte(strconv.FormatUint(uint64(v), 10)), nil
	case uint8:
		return []byte(strconv.FormatUint(uint64(v), 10)), nil
	case uint16:
		return []byte(strconv.FormatUint(uint64(v), 10)), nil
	case uint32:
		return []byte(strconv.FormatUint(uint64(v), 10)), nil
	case uint64:
		return []byte(strconv.FormatUint(v, 10)), nil
	case float32:
		return []byte(strconv.FormatFloat(float64(v), 'g', -1, 32)), nil
	case float64:
		return []byte(strconv.FormatFloat(v, 'g', -1, 64)), nil
	case bool:
		return []byte(strconv.FormatBool(v)), nil
	case encoding.TextMarshaler:
		return v.MarshalText()
	default:
		if encoded, ok := encodeFixedByteArray(value); ok {
			return encoded, nil
		}
		return nil, fmt.Errorf("textCodec: unsupported type %T", value)
	}
}

// Decode parses the plain-text data and stores the result in the value
// pointed to by value. It is the inverse of Encode.
//
// Supported targets are pointers to string, bool, the built-in integer and
// float types, pointers to byte arrays, and encoding.TextUnmarshaler
// implementations. For the scalar and byte-array targets, the destination is
// only written when parsing succeeds. A byte-array target whose data is not
// valid hex of the right length reports that specific failure rather than an
// "unsupported decode target" error.
func (textCodec) Decode(data []byte, value any) error {
	str := string(data)
	switch v := value.(type) {
	case *string:
		*v = str
		return nil
	case *int:
		i, err := strconv.Atoi(str)
		if err == nil {
			*v = i
		}
		return err
	case *int8:
		i, err := strconv.ParseInt(str, 10, 8)
		if err == nil {
			*v = int8(i)
		}
		return err
	case *int16:
		i, err := strconv.ParseInt(str, 10, 16)
		if err == nil {
			*v = int16(i)
		}
		return err
	case *int32:
		i, err := strconv.ParseInt(str, 10, 32)
		if err == nil {
			*v = int32(i)
		}
		return err
	case *int64:
		i, err := strconv.ParseInt(str, 10, 64)
		if err == nil {
			*v = i
		}
		return err
	case *uint:
		// A bit size of 0 selects the platform size of uint.
		u, err := strconv.ParseUint(str, 10, 0)
		if err == nil {
			*v = uint(u)
		}
		return err
	case *uint8:
		u, err := strconv.ParseUint(str, 10, 8)
		if err == nil {
			*v = uint8(u)
		}
		return err
	case *uint16:
		u, err := strconv.ParseUint(str, 10, 16)
		if err == nil {
			*v = uint16(u)
		}
		return err
	case *uint32:
		u, err := strconv.ParseUint(str, 10, 32)
		if err == nil {
			*v = uint32(u)
		}
		return err
	case *uint64:
		u, err := strconv.ParseUint(str, 10, 64)
		if err == nil {
			*v = u
		}
		return err
	case *float32:
		f, err := strconv.ParseFloat(str, 32)
		if err == nil {
			*v = float32(f)
		}
		return err
	case *float64:
		f, err := strconv.ParseFloat(str, 64)
		if err == nil {
			*v = f
		}
		return err
	case *bool:
		b, err := strconv.ParseBool(str)
		if err == nil {
			*v = b
		}
		return err
	case encoding.TextUnmarshaler:
		return v.UnmarshalText(data)
	default:
		if !isFixedByteArrayPtr(reflect.ValueOf(value)) {
			return fmt.Errorf("textCodec: unsupported decode target %T", value)
		}
		// The target is a valid byte array, so any failure here is about
		// the data and must be surfaced as such.
		if err := decodeFixedByteArray(data, value); err != nil {
			return fmt.Errorf("textCodec: %w", err)
		}
		return nil
	}
}

// isFixedByteArrayPtr reports whether val is a non-nil pointer to an array
// whose element kind is uint8.
func isFixedByteArrayPtr(val reflect.Value) bool {
	if val.Kind() != reflect.Ptr {
		return false
	}
	elem := val.Elem() // zero Value (Kind Invalid) for a nil pointer
	return elem.Kind() == reflect.Array && elem.Type().Elem().Kind() == reflect.Uint8
}

// encodeFixedByteArray hex encodes v when it is an array whose element kind
// is uint8. The boolean result is false for every other kind of value.
func encodeFixedByteArray(v any) ([]byte, bool) {
	val := reflect.ValueOf(v)
	if val.Kind() != reflect.Array || val.Type().Elem().Kind() != reflect.Uint8 {
		return nil, false
	}
	// The array obtained from an interface is not addressable, so its
	// elements are copied one by one.
	n := val.Len()
	buf := make([]byte, n)
	for i := 0; i < n; i++ {
		buf[i] = byte(val.Index(i).Uint())
	}
	return []byte(hex.EncodeToString(buf)), true
}

// decodeFixedByteArray hex decodes data into the byte array pointed to by
// out. It fails when out is not a pointer to a byte array, when data is not
// valid hex, or when the decoded length differs from the array length. The
// array is only modified on success.
func decodeFixedByteArray(data []byte, out any) error {
	val := reflect.ValueOf(out)
	if !isFixedByteArrayPtr(val) {
		return fmt.Errorf("unsupported decode target: %T", out)
	}
	arr := val.Elem()
	expectedLen := arr.Len()
	decoded, err := hex.DecodeString(string(data))
	if err != nil {
		return fmt.Errorf("hex decode failed: %w", err)
	}
	if len(decoded) != expectedLen {
		return fmt.Errorf("invalid hex length: got %d bytes, want %d", len(decoded), expectedLen)
	}
	for i := 0; i < expectedLen; i++ {
		arr.Index(i).SetUint(uint64(decoded[i]))
	}
	return nil
}
// Package shelve provides a persistent, map-like object called Shelf. It lets you
// store and retrieve Go objects directly, with the serialization and storage handled
// automatically by the Shelf. Additionally, you can customize the underlying
// key-value storage and serialization codec to better suit your application's needs.
//
// This package is inspired by the `shelve` module from the Python standard library
// and aims to provide a similar set of functionalities.
//
// By default, a Shelf serializes data using the JSON format and stores it using `sdb`
// (for "shelve-db"), a simple key-value storage created for this project. This
// database should be good enough for a broad range of applications, but the modules
// in [go-shelve/driver] provide additional options for configuring the `Shelf` with
// other databases and Codecs.
//
// [go-shelve/driver]: https://pkg.go.dev/github.com/lucmq/go-shelve/driver
package shelve
import (
"encoding"
"fmt"
"reflect"
"github.com/lucmq/go-shelve/sdb"
)
// Convenience values for the step and n arguments of the Shelf iteration
// methods.
const (
// Asc and Desc can be used with the Shelf.Items method to make the
// iteration order ascending or descending respectively.
//
// They are just syntactic sugar to make the iteration order more
// explicit: a positive step iterates in ascending order and a negative
// step in descending order.
Asc = 1
// Desc is the opposite of Asc.
Desc = -1
// All can be used with the Shelf.Items method to iterate over all
// items in the database. It is the same as the -1 value.
All = -1
)
// Yield is a function called when iterating over key-value pairs in the
// Shelf. It receives the decoded key and value of the current pair. If Yield
// returns false or an error, the iteration stops.
type Yield[K, V any] func(key K, value V) (bool, error)
// A Shelf is a persistent, map-like object. It is used together with an
// underlying key-value storage to store Go objects directly.
//
// Stored values can be of arbitrary types, but the keys must be comparable.
//
// By default, values are encoded using the JSON codec, and keys using the
// TextCodec.
//
// For storage, the underlying database is an instance of the [sdb.DB]
// ("shelve-db") key-value store.
//
// The underlying storage and codec Shelf uses can be configured with the
// [Option] functions.
type Shelf[K comparable, V any] struct {
// db is the key-value store that holds the encoded keys and values.
db DB
// codec encodes and decodes values.
codec Codec
// keyCodec encodes and decodes keys.
keyCodec Codec
}
// Option is passed to the Open function to create a customized Shelf. The
// options defined in this package expect to be called with a pointer to the
// internal options struct.
type Option func(any)
// options collects the settings that the Option functions can customize
// before Open builds the Shelf.
type options struct {
// DB is the underlying key-value store.
DB DB
// Codec encodes and decodes values.
Codec Codec
// KeyCodec encodes and decodes keys.
KeyCodec Codec
}
// WithDatabase selects the underlying database of the Shelf. When this option
// is not given, the [sdb.DB] ("shelve-db") key-value storage is used.
//
// The packages in [driver/db] provide support for other databases in the Go
// ecosystem, like [Bolt] and [Badger].
//
// [driver/db]: https://pkg.go.dev/github.com/lucmq/go-shelve/driver/db
// [Bolt]: https://pkg.go.dev/github.com/etcd-io/bbolt
// [Badger]: https://pkg.go.dev/github.com/dgraph-io/badger
func WithDatabase(db DB) Option {
	return func(target any) {
		target.(*options).DB = db
	}
}
// WithCodec selects the Codec used to encode values. When this option is not
// given, a codec for the JSON format is used.
//
// Additional Codecs can be found in the packages in [driver/encoding].
//
// [driver/encoding]: https://pkg.go.dev/github.com/lucmq/go-shelve/driver/encoding
func WithCodec(c Codec) Option {
	return func(target any) {
		target.(*options).Codec = c
	}
}
// WithKeyCodec selects the Codec used to encode keys.
//
// When this option is not given, keys of type string, boolean, integer
// (signed or unsigned), float, [N]byte arrays (e.g., [12]byte), or types that
// implement [encoding.TextMarshaler] are encoded using [TextCodec].
//
// Additional Codecs can be found in the packages in [driver/encoding].
//
// [driver/encoding]: https://pkg.go.dev/github.com/lucmq/go-shelve/driver/encoding
func WithKeyCodec(c Codec) Option {
	return func(target any) {
		target.(*options).KeyCodec = c
	}
}
// Open creates a new Shelf.
//
// The path parameter specifies the filesystem path to the database files. It
// can be a directory or a regular file, depending on the underlying database
// implementation. With the default database [sdb.DB], it will point to a
// directory.
//
// Options are applied first; defaults are then filled in for whatever is
// still unset: the JSON codec for values, the default key codec for K, and a
// [sdb.DB] opened at path. Key types without a default key codec are
// therefore accepted as long as [WithKeyCodec] is supplied; otherwise an
// error is returned before any database is opened.
func Open[K comparable, V any](path string, opts ...Option) (
	*Shelf[K, V],
	error,
) {
	var o options
	for _, option := range opts {
		option(&o)
	}
	if o.Codec == nil {
		o.Codec = JSONCodec()
	}
	if o.KeyCodec == nil {
		// Only require a default key codec when the caller did not provide
		// one; key types such as structs have no default.
		var k K
		keyCodec, err := defaultKeyCodec(k)
		if err != nil {
			return nil, err
		}
		o.KeyCodec = keyCodec
	}
	if o.DB == nil {
		db, err := sdb.Open(path)
		if err != nil {
			return nil, fmt.Errorf("open db: %w", err)
		}
		o.DB = db
	}
	return &Shelf[K, V]{
		db:       o.DB,
		codec:    o.Codec,
		keyCodec: o.KeyCodec,
	}, nil
}
// Close synchronizes and closes the Shelf. It delegates to the Close method of
// the underlying database and returns its error.
func (s *Shelf[K, V]) Close() error {
return s.db.Close()
}
// Len returns the number of items in the Shelf as an int64, as reported by
// the underlying database. If an error occurs, it returns -1.
func (s *Shelf[K, V]) Len() int64 {
return s.db.Len()
}
// Sync synchronizes the Shelf contents to persistent storage. It delegates to
// the Sync method of the underlying database and returns its error.
func (s *Shelf[K, V]) Sync() error {
return s.db.Sync()
}
// Has reports whether the Shelf contains the given key. The key is encoded
// with the key codec and looked up in the underlying database; failures of
// either step are returned wrapped.
func (s *Shelf[K, V]) Has(key K) (bool, error) {
	rawKey, err := s.keyCodec.Encode(key)
	if err != nil {
		return false, fmt.Errorf("encode: %w", err)
	}

	found, err := s.db.Has(rawKey)
	if err != nil {
		return false, fmt.Errorf("has: %w", err)
	}
	return found, nil
}
// Get retrieves the value associated with a key from the Shelf.
//
// The ok result reports whether the key exists in the underlying database.
// If the key is not found, Get returns the zero value of V, false and a nil
// error. If the key exists but its stored data cannot be decoded, Get returns
// the zero value of V, true and the wrapped decode error.
func (s *Shelf[K, V]) Get(key K) (value V, ok bool, err error) {
	var zero V
	data, err := s.keyCodec.Encode(key)
	if err != nil {
		return zero, false, fmt.Errorf("encode: %w", err)
	}
	vData, err := s.db.Get(data)
	if err != nil {
		return zero, false, fmt.Errorf("get: %w", err)
	}
	if vData == nil {
		return zero, false, nil
	}
	if err := s.codec.Decode(vData, &value); err != nil {
		// Do not hand back a possibly half-decoded value with the error.
		return zero, true, fmt.Errorf("decode: %w", err)
	}
	return value, true, nil
}
// Put stores a key-value pair in the Shelf, overwriting the value of a key
// that already exists. Key and value are encoded with their respective codecs
// before being written to the underlying database.
func (s *Shelf[K, V]) Put(key K, value V) error {
	rawKey, err := s.keyCodec.Encode(key)
	if err != nil {
		return fmt.Errorf("encode key: %w", err)
	}

	rawValue, err := s.codec.Encode(value)
	if err != nil {
		return fmt.Errorf("encode value: %w", err)
	}

	if err := s.db.Put(rawKey, rawValue); err != nil {
		return fmt.Errorf("put: %w", err)
	}
	return nil
}
// Delete removes the pair stored under key from the Shelf. The key is encoded
// with the key codec and the deletion is delegated to the underlying
// database.
func (s *Shelf[K, V]) Delete(key K) error {
	rawKey, err := s.keyCodec.Encode(key)
	if err != nil {
		return fmt.Errorf("encode: %w", err)
	}

	if err := s.db.Delete(rawKey); err != nil {
		return fmt.Errorf("delete: %w", err)
	}
	return nil
}
// Items walks the key-value pairs of the Shelf and calls fn(k, v) for each
// visited pair. The walk ends early when fn returns false or an error.
//
// start is the key from which the iteration should begin; a nil start begins
// at the first key in the Shelf.
//
// n is the maximum number of items to iterate over. If n is -1 or less, all
// items are iterated.
//
// step is the number of items to advance between two visited pairs. A
// negative step requests iteration in reverse order.
//
// The order of iteration may not be sorted. Some database implementations
// may ignore the start parameter or not support iteration in reverse order.
//
// The default database used with Shelf (sdb.DB) does not yield items in any
// particular order and ignores the start parameter.
//
// A pair whose stored value is empty is passed to fn with the zero value of
// V, without invoking the value codec.
func (s *Shelf[K, V]) Items(start *K, n, step int, fn Yield[K, V]) error {
	visit := func(rawKey, rawValue []byte) (bool, error) {
		var (
			key   K
			value V
		)
		if err := s.keyCodec.Decode(rawKey, &key); err != nil {
			return false, fmt.Errorf("decode key: %w", err)
		}
		if len(rawValue) == 0 {
			return fn(key, value)
		}
		if err := s.codec.Decode(rawValue, &value); err != nil {
			return false, fmt.Errorf("decode value: %w", err)
		}
		return fn(key, value)
	}
	return s.iterate(start, n, step, visit)
}
// Keys walks the keys of the Shelf and calls the user-provided function fn
// for each visited key. The iteration parameters behave as described for
// [Shelf.Items].
//
// Stored values are not decoded: the value argument of fn is always the zero
// value of V.
func (s *Shelf[K, V]) Keys(start *K, n, step int, fn Yield[K, V]) error {
	var noValue V
	visit := func(rawKey, _ []byte) (bool, error) {
		var key K
		if err := s.keyCodec.Decode(rawKey, &key); err != nil {
			return false, fmt.Errorf("decode: %w", err)
		}
		return fn(key, noValue)
	}
	return s.iterate(start, n, step, visit)
}
// Values walks the values of the Shelf and calls the user-provided function
// fn for each visited value. The iteration parameters behave as described for
// [Shelf.Items].
//
// Stored keys are not decoded: the key argument of fn is always the zero
// value of K.
func (s *Shelf[K, V]) Values(start *K, n, step int, fn Yield[K, V]) error {
	var noKey K
	visit := func(_, rawValue []byte) (bool, error) {
		var value V
		if err := s.codec.Decode(rawValue, &value); err != nil {
			return false, fmt.Errorf("decode: %w", err)
		}
		return fn(noKey, value)
	}
	return s.iterate(start, n, step, visit)
}
// iterate drives the underlying database iteration on behalf of Items, Keys
// and Values.
//
// start, when non-nil, is encoded with the key codec and handed to the
// database as the starting key. The sign of step selects the order (Asc for
// positive, Desc for negative) and its magnitude the stride: the first pair
// is visited, then every step-th pair after it. A step of 0 visits nothing.
// When n is positive, at most n pairs are passed to fn; otherwise there is no
// limit.
func (s *Shelf[K, V]) iterate(
	start *K,
	n, step int,
	fn func(k, v []byte) (bool, error),
) error {
	var from []byte
	if start != nil {
		encoded, err := s.keyCodec.Encode(*start)
		if err != nil {
			return fmt.Errorf("encode start: %w", err)
		}
		from = encoded
	}

	var order int
	switch {
	case step == 0:
		return nil
	case step > 0:
		order = Asc
	default:
		order = Desc
		step = -step
	}

	visited := 0
	// skipped counts the pairs passed over since the last visited one. It
	// starts at step-1 so that the very first pair is visited.
	skipped := step - 1
	visit := func(k, v []byte) (bool, error) {
		if n > 0 && visited >= n {
			return false, nil
		}
		if skipped < step-1 {
			skipped++
			return true, nil
		}
		skipped = 0
		visited++
		return fn(k, v)
	}
	return s.db.Items(from, order, visit)
}
// Helpers

// defaultKeyCodec returns the Codec used for keys when none is configured.
// TextCodec is chosen for strings, booleans, the built-in integer and float
// types, encoding.TextMarshaler implementations and arrays whose elements are
// bytes. Any other key type yields an error asking for an explicit key codec.
func defaultKeyCodec(key any) (Codec, error) {
	switch key.(type) {
	case string,
		bool,
		int, int8, int16, int32, int64,
		uint, uint8, uint16, uint32, uint64,
		float32, float64:
		return TextCodec(), nil
	case encoding.TextMarshaler:
		return TextCodec(), nil
	}

	// [N]byte arrays can only be recognized through reflection.
	if t := reflect.TypeOf(key); t != nil &&
		t.Kind() == reflect.Array &&
		t.Elem().Kind() == reflect.Uint8 {
		return TextCodec(), nil
	}
	return nil, fmt.Errorf("unsupported key type %T: must explicitly set a key codec", key)
}