package based
import "time"
// Clock is a simple clock interface.
// Abstracting the time source lets callers substitute a fixed or simulated
// clock (see ClockFunc) in place of the system clock.
type Clock interface {
	// Now returns the current time according to this Clock.
	Now() time.Time
}
// ClockFunc is a functional Clock adapter.
type ClockFunc func() time.Time
func (fn ClockFunc) Now() time.Time {
return fn()
}
// StandardClock provides the current local time using time.Now.
// Safe for concurrent use, as time.Now is.
var StandardClock Clock = ClockFunc(time.Now)
package based
import "context"
// Goroutine describes goroutine handle.
// It pairs cancellation with the ability to wait for completion.
type Goroutine interface {
	// Cancel signals cancellation to the goroutine.
	// It does not wait for the goroutine to exit; use Join for that.
	Cancel()
	// Join waits for goroutine to finish.
	// It returns ctx's error if ctx is done before the goroutine exits.
	Join(ctx context.Context) error
}
// goroutine is the default Goroutine implementation backed by a future.
type goroutine struct {
	cancel context.CancelFunc // cancels the context passed to the function
	join   Ref[Unit]          // future that resolves when the function returns
}
// Cancel signals cancellation by canceling the goroutine's context.
func (gr *goroutine) Cancel() {
	gr.cancel()
}
// Join blocks until the goroutine's future resolves (the function returned)
// or ctx is done, returning any resulting error.
func (gr *goroutine) Join(ctx context.Context) error {
	if _, err := gr.join.Get(ctx); err != nil {
		return err
	}
	return nil
}
// Go starts the provided function in a new goroutine and returns the Goroutine handle.
// The function receives a child context that is canceled via Cancel (or when the
// parent ctx is canceled); Join waits on a future that resolves when fn returns.
func Go(ctx context.Context, fn func(ctx context.Context)) Goroutine {
	runCtx, cancel := context.WithCancel(ctx)
	// The future completes once fn returns; cancel is deferred so the child
	// context is always released, even if fn panics inside the safe wrapper.
	join := FutureFuncRef[Unit](runCtx, func(ctx context.Context) (unit Unit, _ error) {
		defer cancel()
		fn(ctx)
		return unit, nil
	})
	return &goroutine{cancel: cancel, join: join}
}
package based
import (
"context"
"math"
"sync"
"time"
)
// Locker provides synchronization semantics similar to mutex.
// Implementations must provide interruptibility and reentrant locking.
// Implementations in this package achieve reentrancy via ReentrantLock.
type Locker interface {
	// Lock acquires the lock.
	// It returns a new Context which should be checked for error before proceeding with the execution.
	// Context scope must be limited to holding the lock.
	// CancelFunc must not be nil and is used in defer statement to release the lock or perform other cleanup,
	// even in cases of context errors.
	//
	// Example usage:
	// ctx, cancel := mu.Lock(ctx)
	// defer cancel()
	// if err := ctx.Err(); err != nil {
	// return err
	// }
	Lock(ctx context.Context) (context.Context, context.CancelFunc)
}
// LockerFunc is a functional Locker adapter.
type LockerFunc func(ctx context.Context) (context.Context, context.CancelFunc)
func (fn LockerFunc) Lock(ctx context.Context) (context.Context, context.CancelFunc) {
return fn(ctx)
}
// Lockers provides a way to simultaneously acquire and release multiple locks.
type Lockers []Locker

// Lock acquires each lock in order, stopping early if the context reports an
// error after any acquisition. The returned CancelFunc releases every lock
// that was acquired, in reverse order.
func (ls Lockers) Lock(ctx context.Context) (context.Context, context.CancelFunc) {
	acquired := make([]context.CancelFunc, 0, len(ls))
	for _, locker := range ls {
		var release context.CancelFunc
		ctx, release = locker.Lock(ctx)
		acquired = append(acquired, release)
		if ctx.Err() != nil {
			break
		}
	}
	releaseAll := func() {
		for i := len(acquired) - 1; i >= 0; i-- {
			acquired[i]()
		}
	}
	return ctx, releaseAll
}
// semaphore implements Locker using a buffered channel of tokens,
// where each token carries the timestamp of its last release.
type semaphore struct {
	clock    Clock          // time source used for rate calculations
	interval time.Duration  // minimum spacing between reuses of one token
	c        chan time.Time // tokens; buffered to the semaphore size
}
// Semaphore allows at most `size` events at given time interval.
// This becomes a classic semaphore when interval = 0, and a simple mutex when size = 1.
// A non-positive size yields a Locker that never blocks.
func Semaphore(clock Clock, size int, interval time.Duration) Locker {
	if size <= 0 {
		return Unlocker
	}
	// Pre-fill with tokens stamped one interval in the past so the first
	// `size` acquisitions proceed without waiting.
	tokens := make(chan time.Time, size)
	for i := 0; i < size; i++ {
		tokens <- clock.Now().Add(-interval)
	}
	return &semaphore{clock: clock, interval: interval, c: tokens}
}
// Lock acquires one semaphore token (reentrancy level 1), keyed by the
// semaphore instance so nested Lock calls on the same context are no-ops.
func (s *semaphore) Lock(ctx context.Context) (context.Context, context.CancelFunc) {
	return ReentrantLock(ctx, s, 1, s.doLock)
}
// doLock takes a token from the channel, honoring the rate interval: the
// token may only be reused once `interval` has elapsed since its release.
// On context cancellation any token already taken is put back, so the
// semaphore does not permanently lose capacity (the original code dropped it).
func (s *semaphore) doLock(ctx context.Context) (context.Context, context.CancelFunc) {
	select {
	case occ := <-s.c:
		if s.interval > 0 {
			// Wait out the remainder of the rate-limit window, if any.
			wait := occ.Add(s.interval).Sub(s.clock.Now())
			if wait > 0 {
				select {
				case <-time.After(wait):
				case <-ctx.Done():
					// Return the unused token with its original timestamp
					// so other callers can still acquire it.
					s.c <- occ
					return ctx, Nop
				}
			}
		}
		// Release stamps the token with the current time for rate accounting.
		return ctx, func() { s.c <- s.clock.Now() }
	case <-ctx.Done():
		return ctx, Nop
	}
}
// RWMutex is a sync.RWMutex implementation.
// It supports context cancellation and reentrancy (via ReentrantLock).
// The zero value is ready to use; it must not be copied after first use
// because it contains channels and a sync.Once.
type RWMutex struct {
	w    chan bool // occupied while the write lock (or the first read lock) is held
	r    chan int  // holds the live reader count while any read lock is held
	once sync.Once // guards lazy channel initialization
}
// init lazily creates the lock channels; invoked via once from doLock/doRLock.
func (m *RWMutex) init() {
	m.w = make(chan bool, 1)
	m.r = make(chan int, 1)
}
// Lock acquires the write lock (reentrancy level 1), keyed by this mutex.
func (m *RWMutex) Lock(ctx context.Context) (context.Context, context.CancelFunc) {
	return ReentrantLock(ctx, m, 1, m.doLock)
}
// RLock acquires the read lock (reentrancy level 2). A context already
// holding the write lock (level 1) satisfies RLock without re-acquiring.
func (m *RWMutex) RLock(ctx context.Context) (context.Context, context.CancelFunc) {
	return ReentrantLock(ctx, m, 2, m.doRLock)
}
// doLock performs the actual write-lock acquisition.
// Sending into m.w succeeds only when no writer and no readers hold the lock.
func (m *RWMutex) doLock(ctx context.Context) (context.Context, context.CancelFunc) {
	m.once.Do(m.init)
	select {
	case m.w <- true:
		// Acquired; release drains the writer slot.
		return ctx, func() { <-m.w }
	case <-ctx.Done():
		return ctx, Nop
	}
}
// doRLock performs the actual read-lock acquisition.
func (m *RWMutex) doRLock(ctx context.Context) (context.Context, context.CancelFunc) {
	m.once.Do(m.init)
	var rs int
	select {
	case m.w <- true:
		// First reader: claim the writer slot so writers block (rs stays 0).
	case rs = <-m.r:
		// Subsequent reader: take the current count; m.r is briefly empty here.
	case <-ctx.Done():
		return ctx, Nop
	}
	rs++
	m.r <- rs
	return ctx, func() {
		rs := <-m.r
		rs--
		if rs == 0 {
			// Last reader out: free the writer slot instead of re-publishing 0.
			<-m.w
		} else {
			m.r <- rs
		}
	}
}
// Unlocker does nothing.
// It is returned where locking is unnecessary (e.g. Semaphore with size <= 0).
var Unlocker LockerFunc = func(ctx context.Context) (context.Context, context.CancelFunc) {
	return ctx, Nop
}
// ReentrantLock will check context to see if the lock with required level or higher is already acquired.
// If not, it will call provided LockerFunc. Levels are ordered with lower
// numbers stronger (write = 1 satisfies read = 2). The returned CancelFunc is
// idempotent: repeated calls release the lock only once.
func ReentrantLock(ctx context.Context, key any, requiredLevel uint, lock LockerFunc) (context.Context, context.CancelFunc) {
	// An absent context value means "not held": treat as the weakest level.
	heldLevel := uint(math.MaxUint)
	if lvl, ok := ctx.Value(key).(uint); ok {
		heldLevel = lvl
	}
	if heldLevel <= requiredLevel {
		// Already held at an equal or stronger level: reentrant no-op.
		return ctx, Nop
	}
	lockedCtx, cancel := lock(ctx)
	if lockedCtx.Err() != nil {
		return lockedCtx, Nop
	}
	var once sync.Once
	release := func() { once.Do(cancel) }
	// Record the held level so nested acquisitions become no-ops.
	return context.WithValue(lockedCtx, key, requiredLevel), release
}
package based
import (
"context"
"sync"
"sync/atomic"
"github.com/pkg/errors"
)
// Ref references a value which may require calculation.
type Ref[T any] interface {
	// Get returns the value, computing it if necessary.
	// It may return ctx's error if the context is done before the value is ready.
	Get(ctx context.Context) (T, error)
}
type FuncRef[T any] func(ctx context.Context) (T, error)
func (r FuncRef[T]) Get(ctx context.Context) (T, error) {
return r(ctx)
}
// safeRef wraps a Ref, converting panics raised by Get into errors.
type safeRef[T any] struct {
	ref Ref[T]
}
// Get evaluates the wrapped ref, converting any panic into a returned error.
// A nil underlying ref yields an error rather than a nil-pointer panic.
func (r *safeRef[T]) Get(ctx context.Context) (result T, err error) {
	defer func() {
		rec := recover()
		if rec == nil {
			return
		}
		if e, ok := rec.(error); ok {
			err = e
			return
		}
		err = errors.Errorf("panic: %v", rec)
	}()
	if r.ref == nil {
		return result, errors.New("ref is nil")
	}
	return r.ref.Get(ctx)
}
// SafeRef calls Ref with panic handling and other safeguards.
// Wrapping an already-safe ref returns it unchanged to avoid double wrapping.
func SafeRef[T any](ref Ref[T]) Ref[T] {
	if wrapped, ok := ref.(*safeRef[T]); ok {
		return wrapped
	}
	return &safeRef[T]{ref: ref}
}
// SafeFuncRef is a convenience alias for SafeRef.
// NOTE(review): unlike LazyFuncRef/FutureFuncRef, this accepts Ref[T] rather
// than FuncRef[T]; narrowing the parameter now would break existing callers,
// so the asymmetry is only flagged here.
func SafeFuncRef[T any](ref Ref[T]) Ref[T] {
	return SafeRef[T](ref)
}
// lazyRef computes the underlying Ref at most once, caching result and error.
type lazyRef[T any] struct {
	ref     Ref[T]         // source of the value; set to nil after first execution
	result  T              // cached result of the first execution
	err     error          // cached error of the first execution
	mu      RWMutex        // write-held during computation; read-held by waiters
	start   sync.WaitGroup // released once the computing caller holds the write lock
	started atomic.Bool    // true once some caller has claimed the computation
	done    atomic.Bool    // true once result/err are final (fast path)
}
// Get returns the cached value, computing it on first call.
// Exactly one caller (the one that flips `started`) performs the computation
// while holding the write lock; all other callers wait on a read lock until
// the computation finishes, then read the cached result/err.
func (r *lazyRef[T]) Get(ctx context.Context) (T, error) {
	var zero T
	if !r.done.Load() {
		if r.started.CompareAndSwap(false, true) {
			ctx, cancel := r.mu.Lock(ctx)
			defer cancel()
			// Let waiters proceed to RLock only after the write lock is held.
			r.start.Done()
			if r.err = ctx.Err(); r.err != nil {
				// NOTE(review): if the first caller's ctx is already canceled,
				// this error is cached and the underlying ref never runs —
				// later calls return the same error without retrying. Confirm
				// this is the intended semantics.
				return zero, r.err
			}
			r.result, r.err = SafeRef(r.ref).Get(ctx)
			// Drop the source ref so it can be garbage collected.
			r.ref = nil
			r.done.Store(true)
		} else {
			r.start.Wait()
			ctx, cancel := r.mu.RLock(ctx)
			defer cancel()
			if err := ctx.Err(); err != nil {
				return zero, err
			}
		}
	}
	return r.result, r.err
}
// Close releases the cached value if it implements io.Closer.
// Taking the read lock ensures any in-flight computation completes first.
func (r *lazyRef[T]) Close() error {
	_, cancel := r.mu.RLock(context.Background())
	defer cancel()
	return Close(r.result)
}
// LazyRef creates a Ref with on-demand execution of the provided Ref.
// The result of the first execution will be cached and returned immediately for all consequent calls.
func LazyRef[T any](ref Ref[T]) Ref[T] {
	lazy := &lazyRef[T]{ref: ref}
	lazy.start.Add(1) // released by the first Get once it holds the write lock
	return lazy
}
// LazyFuncRef is a FuncRef-typed convenience wrapper around LazyRef.
func LazyFuncRef[T any](ref FuncRef[T]) Ref[T] {
	return LazyRef[T](ref)
}
// futureRef holds the result of an asynchronous computation started by FutureRef.
type futureRef[T any] struct {
	result T           // written by the producer before it releases the write lock
	err    error       // written by the producer before it releases the write lock
	mu     RWMutex     // write-held by the producer until the computation finishes
	done   atomic.Bool // fast-path flag: true once result/err are final
}
// Get returns the computed value, blocking until it is available.
// Once done is set, the lock is skipped entirely; otherwise the read lock
// blocks until the producing goroutine releases its write lock.
func (r *futureRef[T]) Get(ctx context.Context) (T, error) {
	var zero T
	if !r.done.Load() {
		ctx, cancel := r.mu.RLock(ctx)
		defer cancel()
		if err := ctx.Err(); err != nil {
			return zero, err
		}
	}
	return r.result, r.err
}
// Close releases the computed value if it implements io.Closer.
// Taking the read lock ensures the asynchronous computation has finished.
func (r *futureRef[T]) Close() error {
	_, cancel := r.mu.RLock(context.Background())
	defer cancel()
	return Close(r.result)
}
// FutureRef represents a result of an asynchronous computation that may or may not be available yet.
// The write lock is acquired synchronously, before the goroutine starts, so
// that Get callers block until the goroutine finishes and releases it.
func FutureRef[T any](ctx context.Context, ref Ref[T]) Ref[T] {
	r := new(futureRef[T])
	ctx, cancel := r.mu.Lock(ctx)
	go func() {
		defer cancel()
		if r.err = ctx.Err(); r.err != nil {
			// Canceled before the computation ran: done stays false, so Get
			// takes the (now uncontended) lock path and observes the error.
			return
		}
		r.result, r.err = SafeRef(ref).Get(ctx)
		r.done.Store(true)
	}()
	return r
}
// FutureFuncRef is a FuncRef-typed convenience wrapper around FutureRef.
func FutureFuncRef[T any](ctx context.Context, ref FuncRef[T]) Ref[T] {
	return FutureRef[T](ctx, ref)
}
package based
import (
"context"
"os"
"os/signal"
)
// AwaitSignal waits for specified signals.
func AwaitSignal(ctx context.Context, signals ...os.Signal) error {
c := make(chan os.Signal, 1)
go signal.Notify(c, signals...)
select {
case <-c:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
package based
import (
"io"
"github.com/go-playground/validator/v10"
)
// Validate is a default shared validator.Validate instance.
var validate = validator.New()

// Validate checks the value's struct-level validation tags using the shared
// validator instance.
func Validate(value any) error {
	return validate.Struct(value)
}
// Unit represents an empty type, similar to Unit in Scala.
// It is used where a value is required but carries no information (e.g. Ref[Unit]).
type Unit struct{}
// Nop is an empty function.
// It is returned as a CancelFunc when there is nothing to release.
func Nop() {}
func Close(value any) error {
if closer, ok := value.(io.Closer); ok {
return closer.Close()
}
return nil
}
package based
import (
"context"
"github.com/pkg/errors"
)
// WriteThroughCacheStorage defines an interface for the underlying storage of WriteThroughCache.
// NOTE(review): WriteThroughCache treats a zero V as "absent" when deciding
// what to cache — confirm implementations return zero V (not an error) for
// missing keys.
type WriteThroughCacheStorage[K comparable, V comparable] interface {
	// Load loads the value from storage.
	Load(ctx context.Context, key K) (V, error)
	// Update persists the value in storage.
	Update(ctx context.Context, key K, value V) error
}
// WriteThroughCacheStorageFunc provides a functional adapter for WriteThroughCacheStorage.
type WriteThroughCacheStorageFunc[K comparable, V comparable] struct {
	LoadFn   func(ctx context.Context, key K) (V, error)        // implements Load
	UpdateFn func(ctx context.Context, key K, value V) error    // implements Update
}

// Load delegates to LoadFn.
func (f WriteThroughCacheStorageFunc[K, V]) Load(ctx context.Context, key K) (V, error) {
	return f.LoadFn(ctx, key)
}

// Update delegates to UpdateFn.
func (f WriteThroughCacheStorageFunc[K, V]) Update(ctx context.Context, key K, value V) error {
	return f.UpdateFn(ctx, key, value)
}
// WriteThroughCache provides a simple write-through cache implementation.
// Reads prefer the in-memory map; writes go to storage first, then the map.
type WriteThroughCache[K comparable, V comparable] struct {
	storage WriteThroughCacheStorage[K, V] // backing store of record
	values  map[K]V                        // in-memory cache, guarded by mu
	mu      RWMutex                        // protects values
}
// NewWriteThroughCache creates a WriteThroughCache instance.
func NewWriteThroughCache[K comparable, V comparable](storage WriteThroughCacheStorage[K, V]) *WriteThroughCache[K, V] {
	cache := &WriteThroughCache[K, V]{storage: storage}
	cache.values = make(map[K]V)
	return cache
}
// Update updates the value in cache and underlying storage.
// Storage is written first; the cache entry is refreshed only on success.
func (c *WriteThroughCache[K, V]) Update(ctx context.Context, key K, value V) error {
	lockCtx, cancel := c.mu.Lock(ctx)
	defer cancel()
	if err := lockCtx.Err(); err != nil {
		return err
	}
	if err := c.storage.Update(lockCtx, key, value); err != nil {
		return errors.Wrap(err, "update value in storage")
	}
	c.values[key] = value
	return nil
}
// Get attempts to retrieve the value from cache. If no cached value is present,
// it will be retrieved from underlying storage. If the value retrieved from storage
// is not zero, it will be cached.
func (c *WriteThroughCache[K, V]) Get(ctx context.Context, key K) (V, error) {
	var zero V
	cached, err := c.getFromCache(ctx, key)
	if err != nil || cached != zero {
		return cached, err
	}
	ctx, cancel := c.mu.Lock(ctx)
	defer cancel()
	if err := ctx.Err(); err != nil {
		return zero, err
	}
	// Re-check under the write lock: another goroutine may have filled the
	// entry between the read-locked probe and acquiring the write lock.
	if value, ok := c.values[key]; ok {
		return value, nil
	}
	value, err := c.storage.Load(ctx, key)
	if err == nil && value != zero {
		c.values[key] = value
	}
	return value, errors.Wrap(err, "get value from storage")
}
// getFromCache returns the cached value under a read lock.
// A zero V with nil error signals "not cached" to the caller.
func (c *WriteThroughCache[K, V]) getFromCache(ctx context.Context, key K) (V, error) {
	var zero V
	ctx, cancel := c.mu.RLock(ctx)
	defer cancel()
	if err := ctx.Err(); err != nil {
		return zero, err
	}
	// Absent keys read as the zero value, which callers treat as a miss.
	return c.values[key], nil
}
// WriteThroughCached provides write-through cache semantics for a single value.
type WriteThroughCached[V comparable] struct {
	getFn    func(ctx context.Context) (V, error)      // bound to a fixed key in the backing cache
	updateFn func(ctx context.Context, value V) error  // bound to a fixed key in the backing cache
}
// NewWriteThroughCached creates a WriteThroughCached instance.
// It binds a full WriteThroughCache to a single fixed key.
func NewWriteThroughCached[K comparable, V comparable](storage WriteThroughCacheStorage[K, V], key K) *WriteThroughCached[V] {
	backing := NewWriteThroughCache[K, V](storage)
	get := func(ctx context.Context) (V, error) { return backing.Get(ctx, key) }
	update := func(ctx context.Context, value V) error { return backing.Update(ctx, key, value) }
	return &WriteThroughCached[V]{getFn: get, updateFn: update}
}
// Get attempts to retrieve the value from cache. If no cached value is present,
// it will be retrieved from underlying storage. If the value retrieved from storage
// is not zero, it will be cached.
// Delegates to the underlying WriteThroughCache bound to the fixed key.
func (c *WriteThroughCached[V]) Get(ctx context.Context) (V, error) {
	return c.getFn(ctx)
}
// Update updates the value in cache and underlying storage.
// Delegates to the underlying WriteThroughCache bound to the fixed key.
func (c *WriteThroughCached[V]) Update(ctx context.Context, value V) error {
	return c.updateFn(ctx, value)
}