package eventbus
import (
"context"
"fmt"
"hash/fnv"
"reflect"
"sync"
"sync/atomic"
"time"
)
// Handler is a callback that receives a published event of type T.
// It is the handler shape accepted by Subscribe.
type Handler[T any] func(T)
// ContextHandler is a callback that receives the publish context together
// with the event of type T. It is the handler shape accepted by
// SubscribeContext.
type ContextHandler[T any] func(context.Context, T)
// SubscribeOption configures a subscription by mutating the internalHandler
// that is being built for it (see Once, Async, Sequential and WithFilter).
type SubscribeOption func(*internalHandler)
// internalHandler wraps a registered handler together with the delivery
// options chosen at subscription time. Instances are created by buildHandler
// and stored per event type inside a shard.
type internalHandler struct {
// handler is the user callback, stored as Handler[T] or ContextHandler[T].
handler any
// handlerType is reflect.TypeOf(handler); it is reported to the PanicHandler.
handlerType reflect.Type
// eventType is the event type the handler was subscribed for.
eventType reflect.Type
// once marks a handler that is delivered at most one event (see Once).
once bool
// async makes delivery run in its own goroutine (see Async).
async bool
// sequential serializes invocations of this handler through mu (see Sequential).
sequential bool
filter any // Predicate function for filtering events
// mu is held for the duration of each call when sequential is set.
mu sync.Mutex
executed uint32 // For once handlers, atomically tracks if executed
// replayErrorPolicy controls how SubscribeWithReplay treats stored
// events that cannot be decoded; it has no effect on live delivery.
replayErrorPolicy ReplayErrorPolicy
}
// PanicHandler is called when a handler panics. It receives the event being
// delivered, the reflect.Type of the handler function and the recovered value.
type PanicHandler func(event any, handlerType reflect.Type, panicValue any)
// PersistenceErrorHandler is called when event persistence fails, either
// because the event could not be marshaled or because the store rejected it.
type PersistenceErrorHandler func(event any, eventType reflect.Type, err error)
// PublishHook is called when an event is published. eventType is the
// dynamic type of the event.
type PublishHook func(eventType reflect.Type, event any)
// PublishHookContext is called when an event is published (with context).
// It receives the same context that is handed to the handlers' publish path.
type PublishHookContext func(ctx context.Context, eventType reflect.Type, event any)
// numShards is the number of independently locked handler partitions. The
// shard index is computed with a bit mask, which only works for powers of 2.
const numShards = 32 // Power of 2 for efficient modulo
// shard represents a single shard with its own mutex. mu guards handlers,
// which maps an event type to the handlers subscribed to it.
type shard struct {
mu sync.RWMutex
handlers map[reflect.Type][]*internalHandler
}
// EventBus is a high-performance event bus with sharded locks. Handlers are
// partitioned across shards by a hash of the event type name so that
// subscriptions for unrelated event types do not contend on one mutex.
//
// Create instances with New; the zero value is not usable because the shards
// and the async condition variable are initialized there.
type EventBus struct {
// shards holds the handler registry, one lock per shard.
shards [numShards]*shard
// panicHandler, when non-nil, is told about panics recovered from handlers.
panicHandler PanicHandler
// Publish hooks; each is optional and skipped when nil.
beforePublish PublishHook
afterPublish PublishHook
beforePublishCtx PublishHookContext
afterPublishCtx PublishHookContext
// Async handler tracking. A plain sync.WaitGroup would be racy here:
// Publish (Add) may legally run concurrently with Wait/Shutdown, which
// WaitGroup forbids when the counter is at zero.
// asyncMu guards asyncCount; asyncCond is built on asyncMu and is
// broadcast when asyncCount drops to zero.
asyncMu sync.Mutex
asyncCond *sync.Cond
asyncCount int
// asyncSem, when non-nil, bounds the number of concurrently running
// async handler goroutines (see WithAsyncHandlerLimit).
asyncSem chan struct{}
// Optional persistence fields (nil if not using persistence)
store EventStore
subscriptionStore SubscriptionStore
persistenceErrorHandler PersistenceErrorHandler
// persistenceTimeout, when positive, bounds each store Append call.
persistenceTimeout time.Duration
replayBatchSize int // Batch size for Replay (default: 100)
// Upcast registry for event migration
upcastRegistry *upcastRegistry
// Optional observability (metrics & tracing)
observability Observability
}
// TypeNamer is an optional interface that events can implement to provide
// their own type name. This gives explicit control over event type naming,
// which is useful for:
// - Stable type names across package refactoring
// - Custom versioning schemes (e.g., "UserCreated.v2")
// - Compatibility with external event stores
//
// If an event implements TypeNamer, EventType() will use the provided name
// instead of the reflection-based name. The name is what gets written to
// Event.Type when the event is persisted and what observability callbacks
// receive as the event type.
//
// Note that TypeNamer only affects the name: handler routing still uses the
// event's Go type.
//
// Example:
//
// type UserCreatedEvent struct {
// UserID string
// }
//
// func (e UserCreatedEvent) EventTypeName() string {
// return "user.created.v1"
// }
type TypeNamer interface {
EventTypeName() string
}
// EventType returns the type name of an event.
// If the event implements TypeNamer, it returns the custom name.
// Otherwise, it returns the reflection-based name produced by
// reflect.Type.String, which is qualified by the package name (not the full
// import path), e.g. "mypackage.MyEvent".
//
// A nil event (an interface value holding nothing) has no type; EventType
// returns "<nil>" for it instead of panicking.
//
// This is useful for comparing with StoredEvent.Type during replay.
//
// Example with reflection (default):
//
//	eventType := EventType(MyEvent{})
//	// Returns: "mypackage.MyEvent"
//
// Example with TypeNamer:
//
//	type MyEvent struct{}
//	func (e MyEvent) EventTypeName() string { return "my-event.v1" }
//	eventType := EventType(MyEvent{})
//	// Returns: "my-event.v1"
//
// Usage in replay:
//
//	bus.Replay(ctx, 0, func(event *StoredEvent) error {
//		if event.Type == EventType(MyEvent{}) {
//			// Process MyEvent
//		}
//		return nil
//	})
func EventType(event any) string {
	// An explicit name always wins over reflection.
	if namer, ok := event.(TypeNamer); ok {
		return namer.EventTypeName()
	}

	// reflect.TypeOf returns nil for a nil interface value; calling String
	// on that would panic, so report a placeholder name instead.
	t := reflect.TypeOf(event)
	if t == nil {
		return "<nil>"
	}
	return t.String()
}
// Observability is an optional interface for metrics and tracing.
// Implementations can track event publishing, handler execution, and errors.
//
// This interface is designed to be zero-cost when not used - if no
// observability is configured, there is no performance overhead.
//
// The context returned from each method can be used to propagate trace
// spans and other context-specific data through the event processing pipeline.
//
// The eventType string passed to every method is the value of EventType for
// the event, so it honors TypeNamer.
//
// Example implementation: see github.com/jilio/ebu/otel package for
// OpenTelemetry integration.
type Observability interface {
// OnPublishStart is called when an event is about to be published.
// Returns a context that will be passed to handlers and subsequent hooks.
// The event parameter allows implementations to extract custom attributes.
OnPublishStart(ctx context.Context, eventType string, event any) context.Context
// OnPublishComplete is called after all synchronous handlers complete.
// Note: This is called before async handlers complete.
OnPublishComplete(ctx context.Context, eventType string)
// OnHandlerStart is called before a handler executes.
// Returns a context for the handler execution.
// For Sequential handlers it is called before the handler's lock is
// acquired, so the reported duration includes time spent waiting for it.
OnHandlerStart(ctx context.Context, eventType string, async bool) context.Context
// OnHandlerComplete is called after a handler completes.
// The error parameter is non-nil if the handler panicked.
// It receives the context returned by OnHandlerStart.
OnHandlerComplete(ctx context.Context, eventType string, duration time.Duration, err error)
// OnPersistStart is called before persisting an event. The assigned
// offset is not known yet at this point; it is reported to
// OnPersistComplete instead.
// It is not called when the event cannot be marshaled to JSON.
OnPersistStart(ctx context.Context, eventType string) context.Context
// OnPersistComplete is called after persisting an event. On success,
// offset is the offset the store assigned; on failure it is empty and
// err is non-nil.
// It receives the context returned by OnPersistStart.
OnPersistComplete(ctx context.Context, eventType string, duration time.Duration, offset Offset, err error)
}
// Option is a function that configures the EventBus. Options are applied by
// New, in the order given, after the bus internals have been initialized.
type Option func(*EventBus)
// New creates a new EventBus with sharded locks for better performance.
//
// The bus internals (upcast registry, async condition variable and every
// shard) are initialized first, then opts are applied in order, so a later
// option overrides an earlier one that sets the same field. Nil options are
// skipped rather than called, so a conditionally built option list cannot
// crash construction.
func New(opts ...Option) *EventBus {
	bus := &EventBus{
		upcastRegistry: newUpcastRegistry(),
	}
	// The condition variable must share the mutex that guards asyncCount.
	bus.asyncCond = sync.NewCond(&bus.asyncMu)

	for i := range bus.shards {
		bus.shards[i] = &shard{
			handlers: make(map[reflect.Type][]*internalHandler),
		}
	}

	for _, opt := range opts {
		if opt == nil {
			continue // calling a nil func would panic
		}
		opt(bus)
	}
	return bus
}
// getShard picks the shard responsible for eventType. The type's string
// name is hashed with 32-bit FNV-1a and reduced modulo the shard count, so
// a given type always maps to the same shard.
func (bus *EventBus) getShard(eventType reflect.Type) *shard {
	hasher := fnv.New32a()
	hasher.Write([]byte(eventType.String()))
	return bus.shards[hasher.Sum32()%numShards]
}
// buildHandler is the one place where subscriptions are validated. It
// rejects unroutable event types, wraps handler in an internalHandler,
// applies every option exactly once (a nil option is an error) and finally
// checks that a configured filter matches the event type. Every subscription
// entry point is expected to go through it so that no check is skipped.
func buildHandler(handler any, eventType reflect.Type, opts []SubscribeOption) (*internalHandler, error) {
	if err := validateEventType(eventType); err != nil {
		return nil, err
	}

	built := new(internalHandler)
	built.handler = handler
	built.handlerType = reflect.TypeOf(handler)
	built.eventType = eventType

	for i := range opts {
		apply := opts[i]
		if apply == nil {
			return nil, fmt.Errorf("eventbus: subscribe option cannot be nil")
		}
		apply(built)
	}

	// The filter can only be checked once all options have run.
	if err := validateFilter(built, eventType); err != nil {
		return nil, err
	}
	return built, nil
}
// addHandler appends an already validated handler to the list kept for
// eventType, holding the owning shard's write lock while it does so.
func (bus *EventBus) addHandler(eventType reflect.Type, h *internalHandler) {
	target := bus.getShard(eventType)
	target.mu.Lock()
	defer target.mu.Unlock()
	target.handlers[eventType] = append(target.handlers[eventType], h)
}
// Subscribe registers handler to receive events of type T.
//
// T must be a concrete type. Interface types are rejected with an error:
// Publish routes events by their dynamic (concrete) type, so a handler
// registered under an interface type could never receive an event — even
// one published through a variable of that interface type.
//
// An error is also returned when bus or handler is nil, when an option is
// nil, or when a WithFilter predicate does not match T.
func Subscribe[T any](bus *EventBus, handler Handler[T], opts ...SubscribeOption) error {
	switch {
	case bus == nil:
		return fmt.Errorf("eventbus: bus cannot be nil")
	case handler == nil:
		return fmt.Errorf("eventbus: handler cannot be nil")
	}

	// Derive T's reflect.Type without needing a value of T.
	var zero *T
	evType := reflect.TypeOf(zero).Elem()

	built, err := buildHandler(handler, evType, opts)
	if err != nil {
		return err
	}
	bus.addHandler(evType, built)
	return nil
}
// SubscribeContext registers a handler that receives the publish context in
// addition to the event of type T.
//
// T must be a concrete type; interface types are rejected (see Subscribe).
// The same validation as Subscribe applies to bus, handler and opts.
func SubscribeContext[T any](bus *EventBus, handler ContextHandler[T], opts ...SubscribeOption) error {
	if bus == nil {
		return fmt.Errorf("eventbus: bus cannot be nil")
	}
	if handler == nil {
		return fmt.Errorf("eventbus: handler cannot be nil")
	}

	evType := reflect.TypeOf((*T)(nil)).Elem()
	built, buildErr := buildHandler(handler, evType, opts)
	if buildErr == nil {
		bus.addHandler(evType, built)
	}
	return buildErr
}
// validateEventType refuses event types that can never be routed. Publish
// finds handlers by the dynamic (concrete) type of the event, so a
// subscription keyed by an interface type would never be matched.
func validateEventType(eventType reflect.Type) error {
	if eventType.Kind() != reflect.Interface {
		return nil
	}
	return fmt.Errorf("eventbus: cannot subscribe to interface type %s: events are routed by their concrete type, so an interface subscription would never receive events; subscribe to each concrete event type instead", eventType)
}
// validateFilter checks that a predicate installed by WithFilter is a
// one-argument function whose parameter is exactly the subscribed event
// type. A handler without a filter is always valid. Catching a mismatch here
// (e.g. WithFilter[U] on Subscribe[T]) turns what would otherwise be a
// silently ineffective filter into a subscription error.
func validateFilter(h *internalHandler, eventType reflect.Type) error {
	if h.filter == nil {
		return nil
	}
	predType := reflect.TypeOf(h.filter)
	if predType.Kind() == reflect.Func && predType.NumIn() == 1 && predType.In(0) == eventType {
		return nil
	}
	return fmt.Errorf("eventbus: filter predicate type %s does not match event type %s", predType, eventType)
}
// Unsubscribe removes a handler for events of type T.
//
// Handlers are matched by function code pointer. This has two known
// limitations inherent to Go:
//   - Two closures created from the same function literal share a code
//     pointer and are indistinguishable; the first registered one is removed.
//   - Method values on different receivers share a code pointer.
//
// If you need precise unsubscription of closures, keep a reference to the
// exact handler you registered and unsubscribe with it, or use Clear[T].
//
// It returns an error when bus is nil, when handler is not a function value
// (H is unconstrained, so this can only be checked at run time), or when no
// registered handler for T matches.
func Unsubscribe[T any, H any](bus *EventBus, handler H) error {
	if bus == nil {
		return fmt.Errorf("eventbus: bus cannot be nil")
	}

	// reflect.Value.Pointer panics for kinds that carry no pointer and for
	// the zero Value produced by a nil interface, so validate first.
	hv := reflect.ValueOf(handler)
	if !hv.IsValid() || hv.Kind() != reflect.Func {
		return fmt.Errorf("eventbus: handler must be a function, got %T", handler)
	}
	handlerPtr := hv.Pointer()

	eventType := reflect.TypeOf((*T)(nil)).Elem()
	shard := bus.getShard(eventType)
	shard.mu.Lock()
	defer shard.mu.Unlock()

	handlers := shard.handlers[eventType]
	for i, h := range handlers {
		if reflect.ValueOf(h.handler).Pointer() != handlerPtr {
			continue
		}
		// Shift the tail down in place and clear the vacated slot so the
		// backing array does not keep the removed handler reachable.
		last := len(handlers) - 1
		copy(handlers[i:], handlers[i+1:])
		handlers[last] = nil
		shard.handlers[eventType] = handlers[:last]
		return nil
	}
	return fmt.Errorf("handler not found")
}
// Publish delivers event to every handler registered for its type, using a
// background context. It is shorthand for PublishContext and, like it,
// panics if bus is nil.
func Publish[T any](bus *EventBus, event T) {
	PublishContext[T](bus, context.Background(), event)
}
// PublishContext publishes an event with context to all registered handlers.
// It panics if bus is nil.
//
// Handlers are looked up by the dynamic type of event (reflect.TypeOf).
// event must therefore not be a nil interface value: reflect.TypeOf returns
// nil for it and the shard lookup calls String() on that type.
//
// The steps run in this order: OnPublishStart, before-publish hooks,
// persistence (if a store is configured), handler delivery, removal of
// executed once-handlers, after-publish hooks, OnPublishComplete.
// Synchronous handlers run on the caller's goroutine; async handlers are
// started in their own goroutines and are not waited for (use Wait).
//
// If ctx is cancelled, remaining handlers are skipped, but the
// after-publish hooks and OnPublishComplete still run.
//
// When the bus has a store configured (WithStore), the event is persisted
// before handlers run. Persistence is best-effort: on failure the event is
// still delivered to handlers and the error is reported to the
// PersistenceErrorHandler (see WithPersistenceErrorHandler).
func PublishContext[T any](bus *EventBus, ctx context.Context, event T) {
if bus == nil {
panic("eventbus: Publish called with nil bus")
}
eventType := reflect.TypeOf(event)
eventTypeName := EventType(event)
// Observability: Track publish start
if bus.observability != nil {
ctx = bus.observability.OnPublishStart(ctx, eventTypeName, event)
}
// Call before publish hooks
if bus.beforePublish != nil {
bus.beforePublish(eventType, event)
}
if bus.beforePublishCtx != nil {
bus.beforePublishCtx(ctx, eventType, event)
}
// Persist the event before handlers run. On success the assigned offset
// is attached to ctx and can be retrieved with OffsetFromContext.
if bus.store != nil {
ctx = bus.persistEvent(ctx, eventType, event)
}
// Get handlers from appropriate shard
shard := bus.getShard(eventType)
shard.mu.RLock()
handlers := shard.handlers[eventType]
// Create a copy to avoid holding the lock during handler execution
handlersCopy := make([]*internalHandler, len(handlers))
copy(handlersCopy, handlers)
shard.mu.RUnlock()
// Execute handlers without holding the lock
var onceHandlersToRemove []*internalHandler
for _, h := range handlersCopy {
// Check context cancellation before doing anything with the handler.
// This must happen before the Once CAS so a cancelled publish never
// consumes a once-handler without executing it.
select {
case <-ctx.Done():
continue
default:
}
// Check filter if present. The predicate type is validated at
// Subscribe time; if the assertion still fails, fail closed.
// The filter runs before the Once CAS, so a filtered-out event does
// not consume a once-handler.
if h.filter != nil {
filterFunc, ok := h.filter.(func(T) bool)
if !ok || !filterFunc(event) {
continue // Skip this handler as event doesn't match filter
}
}
// For once handlers, use CompareAndSwap to ensure atomic execution
if h.once {
if !atomic.CompareAndSwapUint32(&h.executed, 0, 1) {
continue // Already executed
}
// Mark for removal after execution
onceHandlersToRemove = append(onceHandlersToRemove, h)
}
if h.async {
// Bounded async concurrency: acquire a slot before spawning.
// This intentionally blocks the publisher when the limit is
// reached so a publish spike cannot create an unbounded number
// of goroutines (see WithAsyncHandlerLimit).
if bus.asyncSem != nil {
bus.asyncSem <- struct{}{}
}
// Count the handler before the goroutine starts so that a
// concurrent Wait cannot miss it.
bus.asyncStarted()
go func(handler *internalHandler) {
defer bus.asyncFinished()
if bus.asyncSem != nil {
defer func() { <-bus.asyncSem }()
}
// Once handlers always run: their execution slot was already
// consumed by the CAS above, so skipping here would silently
// drop them forever. Other handlers re-check the context.
if !handler.once {
select {
case <-ctx.Done():
return
default:
}
}
callHandlerWithContext(handler, ctx, event, bus.panicHandler, bus.observability, eventTypeName, true)
}(h)
} else {
callHandlerWithContext(h, ctx, event, bus.panicHandler, bus.observability, eventTypeName, false)
}
}
// Remove once handlers that were executed
if len(onceHandlersToRemove) > 0 {
shard.mu.Lock()
// Re-read the list under the write lock: it may have changed since
// the snapshot above was taken.
handlers := shard.handlers[eventType]
for _, onceHandler := range onceHandlersToRemove {
for i, h := range handlers {
if h == onceHandler {
handlers = append(handlers[:i], handlers[i+1:]...)
break
}
}
}
shard.handlers[eventType] = handlers
shard.mu.Unlock()
}
// For async handlers, we don't wait inline to avoid blocking
// Users can call bus.Wait() if they need to wait for completion
// Call after publish hooks
if bus.afterPublish != nil {
bus.afterPublish(eventType, event)
}
if bus.afterPublishCtx != nil {
bus.afterPublishCtx(ctx, eventType, event)
}
// Observability: Track publish complete (sync handlers done)
if bus.observability != nil {
bus.observability.OnPublishComplete(ctx, eventTypeName)
}
}
// Clear drops every handler registered for events of type T. Handlers for
// other event types are untouched. bus must be non-nil.
func Clear[T any](bus *EventBus) {
	var zero *T
	evType := reflect.TypeOf(zero).Elem()

	target := bus.getShard(evType)
	target.mu.Lock()
	defer target.mu.Unlock()
	delete(target.handlers, evType)
}
// ClearAll drops every handler for every event type by replacing each
// shard's registry with an empty map. Shards are locked one at a time, so
// the operation is not atomic across shards.
func ClearAll(bus *EventBus) {
	for i := range bus.shards {
		s := bus.shards[i]
		s.mu.Lock()
		s.handlers = make(map[reflect.Type][]*internalHandler)
		s.mu.Unlock()
	}
}
// HasHandlers reports whether at least one handler is currently registered
// for events of type T.
func HasHandlers[T any](bus *EventBus) bool {
	evType := reflect.TypeOf((*T)(nil)).Elem()
	target := bus.getShard(evType)

	target.mu.RLock()
	registered := len(target.handlers[evType])
	target.mu.RUnlock()

	return registered > 0
}
// HandlerCount reports how many handlers are currently registered for
// events of type T.
func HandlerCount[T any](bus *EventBus) int {
	evType := reflect.TypeOf((*T)(nil)).Elem()
	target := bus.getShard(evType)

	target.mu.RLock()
	count := len(target.handlers[evType])
	target.mu.RUnlock()

	return count
}
// asyncStarted bumps the in-flight async handler counter. It is called
// before the handler goroutine is launched.
func (bus *EventBus) asyncStarted() {
	bus.asyncMu.Lock()
	defer bus.asyncMu.Unlock()
	bus.asyncCount++
}
// asyncFinished decrements the in-flight async handler counter and, when it
// reaches zero, wakes every goroutine blocked in Wait.
func (bus *EventBus) asyncFinished() {
	bus.asyncMu.Lock()
	defer bus.asyncMu.Unlock()

	if bus.asyncCount--; bus.asyncCount == 0 {
		bus.asyncCond.Broadcast()
	}
}
// Wait blocks until no async handler is in flight.
// It is safe to call concurrently with Publish.
func (bus *EventBus) Wait() {
	bus.asyncMu.Lock()
	defer bus.asyncMu.Unlock()

	// Cond.Wait can wake spuriously with respect to our predicate, so the
	// counter is re-checked on every wake-up.
	for bus.asyncCount > 0 {
		bus.asyncCond.Wait()
	}
}
// callHandlerWithContext invokes one handler for one event.
//
// A panic raised by the handler is recovered, reported to panicHandler (if
// set) and surfaced to obs.OnHandlerComplete as an error. When obs is set,
// OnHandlerStart runs first and the context it returns is the one passed to
// a ContextHandler and to OnHandlerComplete. Sequential handlers hold their
// mutex for the duration of the call; the measured duration starts before
// that lock is taken.
func callHandlerWithContext[T any](h *internalHandler, ctx context.Context, event T, panicHandler PanicHandler, obs Observability, eventTypeName string, async bool) {
	begin := time.Now()

	defer func() {
		elapsed := time.Since(begin)

		var handlerErr error
		if recovered := recover(); recovered != nil {
			handlerErr = fmt.Errorf("handler panic: %v", recovered)
			if panicHandler != nil {
				panicHandler(event, h.handlerType, recovered)
			}
		}

		if obs == nil {
			return
		}
		obs.OnHandlerComplete(ctx, eventTypeName, elapsed, handlerErr)
	}()

	if obs != nil {
		ctx = obs.OnHandlerStart(ctx, eventTypeName, async)
	}

	// Deferred after the recovery closure, so the lock is released first
	// when unwinding.
	if h.sequential {
		h.mu.Lock()
		defer h.mu.Unlock()
	}

	// Only Handler[T] (Subscribe) and ContextHandler[T] (SubscribeContext)
	// are ever stored; anything else is ignored.
	if plain, ok := h.handler.(Handler[T]); ok {
		plain(event)
		return
	}
	if withCtx, ok := h.handler.(ContextHandler[T]); ok {
		withCtx(ctx, event)
	}
}
// Subscribe Options
// Once limits the subscription to a single delivery: after the handler has
// been selected for one event it is skipped and then removed.
func Once() SubscribeOption {
	return func(sub *internalHandler) { sub.once = true }
}
// Async makes each delivery to the handler run in its own goroutine instead
// of on the publisher's goroutine. Use EventBus.Wait to wait for completion.
func Async() SubscribeOption {
	return func(sub *internalHandler) { sub.async = true }
}
// Sequential serializes invocations of the handler with a per-handler
// mutex, so at most one call to it is running at any time.
func Sequential() SubscribeOption {
	return func(sub *internalHandler) { sub.sequential = true }
}
// WithFilter configures the handler to only receive events for which
// predicate returns true. T must be the same type the handler is subscribed
// to; a mismatch is reported as an error by Subscribe.
//
// A nil predicate means "no filter" and leaves the subscription unchanged.
// Storing it would put a typed-nil function into the filter slot, which
// looks configured but panics when it is called during Publish.
func WithFilter[T any](predicate func(T) bool) SubscribeOption {
	return func(h *internalHandler) {
		if predicate == nil {
			return
		}
		h.filter = predicate
	}
}
// Bus Options
// WithPanicHandler installs the callback that is notified whenever a
// handler panics during delivery.
func WithPanicHandler(handler PanicHandler) Option {
	return func(b *EventBus) { b.panicHandler = handler }
}
// WithBeforePublish installs a hook that runs at the start of every publish,
// before the event is persisted or delivered to handlers.
func WithBeforePublish(hook PublishHook) Option {
	return func(b *EventBus) { b.beforePublish = hook }
}
// WithAfterPublish installs a hook that runs at the end of every publish,
// once synchronous handlers have returned. Async handlers may still be
// running at that point.
func WithAfterPublish(hook PublishHook) Option {
	return func(b *EventBus) { b.afterPublish = hook }
}
// WithBeforePublishContext installs a context-aware hook that runs at the
// start of every publish, right after the plain before-publish hook.
func WithBeforePublishContext(hook PublishHookContext) Option {
	return func(b *EventBus) { b.beforePublishCtx = hook }
}
// WithAfterPublishContext installs a context-aware hook that runs at the end
// of every publish, right after the plain after-publish hook.
func WithAfterPublishContext(hook PublishHookContext) Option {
	return func(b *EventBus) { b.afterPublishCtx = hook }
}
// WithPersistenceErrorHandler installs the callback that receives errors
// raised while marshaling an event or appending it to the store.
func WithPersistenceErrorHandler(handler PersistenceErrorHandler) Option {
	return func(b *EventBus) { b.persistenceErrorHandler = handler }
}
// WithPersistenceTimeout bounds each store Append made while publishing.
// A value that is not positive disables the timeout.
func WithPersistenceTimeout(timeout time.Duration) Option {
	return func(b *EventBus) { b.persistenceTimeout = timeout }
}
// WithReplayBatchSize sets how many events Replay requests per Read call.
// It only matters for stores that do not implement EventStoreStreamer,
// because streaming stores are replayed through ReadStream instead.
// Replay falls back to 100 when the size is unset, zero or negative.
func WithReplayBatchSize(size int) Option {
	return func(b *EventBus) { b.replayBatchSize = size }
}
// WithObservability installs the Observability implementation that receives
// publish, handler and persistence callbacks for metrics and tracing.
func WithObservability(obs Observability) Option {
	return func(b *EventBus) { b.observability = obs }
}
// WithAsyncHandlerLimit bounds the number of concurrently running async
// handler goroutines. When the limit is reached, Publish blocks until a
// running async handler finishes, providing backpressure instead of
// unbounded goroutine growth during publish spikes.
//
// A limit <= 0 means unlimited (the default). This also holds when the
// option follows an earlier WithAsyncHandlerLimit with a positive value: the
// later option wins and removes the bound.
//
// Deadlock warning: do not publish events that have async subscribers from
// inside an async handler when a limit is set. Such a nested Publish blocks
// waiting for a free slot while the publishing handler occupies one; if all
// slots are held by handlers blocked the same way, none can ever be
// released. Publish re-entrantly only from synchronous handlers, or size
// the limit above the maximum possible nesting fan-out.
func WithAsyncHandlerLimit(n int) Option {
	return func(bus *EventBus) {
		if n <= 0 {
			// A nil semaphore is what Publish treats as "no limit".
			bus.asyncSem = nil
			return
		}
		bus.asyncSem = make(chan struct{}, n)
	}
}
// Shutdown waits for in-flight async handlers to finish and then closes the
// store if it has a Close() error method (i.e. satisfies io.Closer).
//
// If ctx is done before the handlers finish, Shutdown returns ctx.Err()
// without closing the store. The helper goroutine that waits on the handlers
// keeps running until they complete.
func (bus *EventBus) Shutdown(ctx context.Context) error {
	drained := make(chan struct{})
	go func() {
		bus.Wait()
		close(drained)
	}()

	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-drained:
	}

	if bus.store == nil {
		return nil
	}
	closer, closable := bus.store.(interface{ Close() error })
	if !closable {
		return nil
	}
	if err := closer.Close(); err != nil {
		return fmt.Errorf("failed to close store: %w", err)
	}
	return nil
}
// Backward compatibility methods
//
// The Set* methods below write bus fields without synchronization. They are
// intended for configuration before the bus is shared across goroutines;
// calling them concurrently with Publish is a data race.
// SetPanicHandler sets the panic handler, replacing any handler configured
// earlier. The field is written without synchronization.
// Not safe to call concurrently with Publish; configure before use.
//
// Deprecated: use the WithPanicHandler option with New instead.
func (bus *EventBus) SetPanicHandler(handler PanicHandler) {
bus.panicHandler = handler
}
// SetBeforePublishHook sets the before publish hook, replacing any hook
// configured earlier. The field is written without synchronization.
// Not safe to call concurrently with Publish; configure before use.
//
// Deprecated: use the WithBeforePublish option with New instead.
func (bus *EventBus) SetBeforePublishHook(hook PublishHook) {
bus.beforePublish = hook
}
// SetAfterPublishHook sets the after publish hook, replacing any hook
// configured earlier. The field is written without synchronization.
// Not safe to call concurrently with Publish; configure before use.
//
// Deprecated: use the WithAfterPublish option with New instead.
func (bus *EventBus) SetAfterPublishHook(hook PublishHook) {
bus.afterPublish = hook
}
// SetPersistenceErrorHandler sets the persistence error handler (for runtime configuration),
// replacing any handler configured earlier. The construction-time
// equivalent is the WithPersistenceErrorHandler option.
// Not safe to call concurrently with Publish; configure before use.
func (bus *EventBus) SetPersistenceErrorHandler(handler PersistenceErrorHandler) {
bus.persistenceErrorHandler = handler
}
package eventbus
import (
"context"
"encoding/json"
"fmt"
"iter"
"reflect"
"sort"
"sync"
"time"
)
// Offset represents an opaque position in an event stream.
// Implementations define the format (e.g., "123", "abc_456", timestamp-based).
//
// Offsets are resumption tokens: the only operations the bus performs on
// them are equality checks and passing them back to the store that issued
// them. Ordering semantics are store-defined — the bundled MemoryStore and
// the sqlite store produce lexicographically ordered offsets, but stores
// backed by external systems (e.g. durablestream) may not. Treat offsets
// from one store as meaningless to any other store.
//
// Replay relies on the equality check to detect a store whose Read does not
// advance the offset.
type Offset string
const (
// OffsetOldest represents the beginning of the stream.
// When passed to Read, returns events from the start.
// It is the zero value of Offset.
OffsetOldest Offset = ""
// OffsetNewest represents the current end of the stream.
// Useful for subscribing to only new events.
OffsetNewest Offset = "$"
)
// EventStore defines the core interface for persisting events.
// This is a minimal interface with just 2 methods for basic event storage.
// Additional capabilities are provided through optional interfaces.
//
// The bus does not serialize calls into the store, so implementations must
// be safe for concurrent use.
type EventStore interface {
// Append stores an event and returns its assigned offset.
// The store is responsible for generating unique, monotonically increasing offsets.
Append(ctx context.Context, event *Event) (Offset, error)
// Read returns events starting after the given offset.
// Use OffsetOldest to read from the beginning.
// The limit parameter controls max events returned (0 = no limit).
// Returns the events, the offset to use for the next read, and any error.
//
// Replay treats an empty result as the end of the stream and reports an
// error if a non-empty batch comes back with a next offset equal to the
// one that was requested.
Read(ctx context.Context, from Offset, limit int) ([]*StoredEvent, Offset, error)
}
// EventStoreStreamer is an optional interface for memory-efficient streaming.
// When implemented, the Replay method will automatically use streaming
// instead of batched Read calls; the replay batch size is then ignored.
//
// Implementation notes:
// - Database-backed stores should use cursor-based iteration to minimize memory
// - In-memory stores may need to take a snapshot to avoid holding locks during iteration,
// trading memory for deadlock safety (see MemoryStore.ReadStream for an example)
type EventStoreStreamer interface {
// ReadStream returns an iterator yielding events starting after the given offset.
// Use OffsetOldest to read from the beginning.
// The iterator checks ctx.Done() before each yield and returns ctx.Err() when cancelled.
// A yielded error terminates iteration.
// Replay does not check ctx itself on this path; it relies on the
// iterator to surface cancellation.
ReadStream(ctx context.Context, from Offset) iter.Seq2[*StoredEvent, error]
}
// SubscriptionStore tracks subscription progress separately from event storage.
// This interface is optional and enables resumable subscriptions.
// It is attached to a bus with WithSubscriptionStore.
type SubscriptionStore interface {
// SaveOffset persists the current offset for a subscription.
SaveOffset(ctx context.Context, subscriptionID string, offset Offset) error
// LoadOffset retrieves the last saved offset for a subscription.
// Returns OffsetOldest if the subscription has no saved offset.
LoadOffset(ctx context.Context, subscriptionID string) (Offset, error)
}
// EventStoreSnapshotter is an optional interface for stores that can persist a
// materialized projection snapshot, keyed by an id and tagged with the Offset the
// snapshot reflects. It lets a caller compact a high-churn log: save the reduced
// state, then (with EventStoreTruncator) drop the events the snapshot subsumes,
// bounding cold-start replay. The blob is opaque to ebu — callers define its
// encoding. A store that does not implement this interface is simply not
// compactable; callers fall back to a full replay from OffsetOldest.
//
// A typical cold start is: LoadSnapshot, restore state from the blob, then
// Replay from the returned offset to apply the events recorded after it.
type EventStoreSnapshotter interface {
// SaveSnapshot upserts the snapshot for snapshotID, recording that blob
// reflects the projection state as of (and including) atOffset. atOffset MUST
// be a real, resumable Offset previously returned by Append/Read for THIS
// store (never OffsetNewest, never synthetic): a caller resumes with
// Replay(ctx, atOffset, ...), which reads only events strictly after it.
SaveSnapshot(ctx context.Context, snapshotID string, atOffset Offset, blob json.RawMessage) error
// LoadSnapshot returns the last saved snapshot for snapshotID. When none
// exists it returns (OffsetOldest, nil, nil) — "replay from the beginning",
// never an error — mirroring LoadOffset's OffsetOldest default.
LoadSnapshot(ctx context.Context, snapshotID string) (atOffset Offset, blob json.RawMessage, err error)
}
// EventStoreTruncator is an optional interface for log compaction: it deletes
// events at or before a given Offset. It is only safe to call once a snapshot
// covering that Offset is durably saved AND no live reader/subscription still
// needs the truncated prefix. Stores whose Offsets are not deletable positions
// (e.g. a remote append-only broker) MUST NOT implement this interface.
type EventStoreTruncator interface {
// TruncateBefore deletes every event whose Offset <= beforeOffset and returns
// the number deleted. beforeOffset == OffsetOldest is a no-op. Idempotent:
// re-running with the same offset deletes nothing more.
//
// Despite the name, the bound is inclusive: the event at beforeOffset
// itself is deleted too.
TruncateBefore(ctx context.Context, beforeOffset Offset) (deleted int64, err error)
}
// Event represents an event to be stored (before it has an offset).
// When the bus persists a published event, Type is the EventType name of
// the event, Data is its JSON encoding and Timestamp is the time at which
// the record was built.
type Event struct {
Type string `json:"type"`
Data json.RawMessage `json:"data"`
Timestamp time.Time `json:"timestamp"`
}
// StoredEvent represents an event that has been persisted with an offset.
// It carries the same fields as Event plus the Offset assigned by the store,
// and is what Replay hands to its handler.
type StoredEvent struct {
Offset Offset `json:"offset"`
Type string `json:"type"`
Data json.RawMessage `json:"data"`
Timestamp time.Time `json:"timestamp"`
}
// WithStore turns on persistence by attaching store to the bus.
//
// Each published event is appended to the store after the before-publish
// hooks and before any handler runs. Persistence is best-effort: if Append
// fails the event is still delivered, and the failure is passed to the
// PersistenceErrorHandler.
func WithStore(store EventStore) Option {
	return func(b *EventBus) { b.store = store }
}
// offsetCtxKey is the context key under which the offset assigned to the
// event being published is stored. Being an unexported struct type, it
// cannot collide with context keys defined by other packages.
type offsetCtxKey struct{}
// OffsetFromContext extracts the offset the store assigned to the event that
// is currently being handled. The boolean is false when ctx carries no
// offset, which is the case outside handlers of a bus configured with
// WithStore and whenever persisting the event failed.
func OffsetFromContext(ctx context.Context) (Offset, bool) {
	if assigned, found := ctx.Value(offsetCtxKey{}).(Offset); found {
		return assigned, true
	}
	return "", false
}
// WithSubscriptionStore attaches the store used to track how far each
// subscription has progressed through the event stream.
func WithSubscriptionStore(store SubscriptionStore) Option {
	return func(b *EventBus) { b.subscriptionStore = store }
}
// persistEvent JSON-encodes event, appends it to the configured store and
// returns a context carrying the assigned offset (see OffsetFromContext).
// If marshaling or Append fails, the error is handed to the
// PersistenceErrorHandler (when set) and ctx is returned unchanged.
//
// The persistence timeout and the context returned by OnPersistStart apply
// only to the Append call; they never leak into the returned context.
//
// Concurrency note: the bus does not serialize Append calls; stores must be
// safe for concurrent use (all bundled stores are).
func (bus *EventBus) persistEvent(ctx context.Context, eventType reflect.Type, event any) context.Context {
	// report forwards a wrapped failure to the error handler, if any.
	report := func(format string, cause error) {
		if bus.persistenceErrorHandler != nil {
			bus.persistenceErrorHandler(event, eventType, fmt.Errorf(format, cause))
		}
	}

	payload, err := json.Marshal(event)
	if err != nil {
		report("failed to marshal event: %w", err)
		return ctx
	}

	// EventType honors TypeNamer, so stored names stay stable.
	name := EventType(event)
	record := &Event{
		Type:      name,
		Data:      payload,
		Timestamp: time.Now(),
	}

	storeCtx := ctx
	if bus.persistenceTimeout > 0 {
		var cancel context.CancelFunc
		storeCtx, cancel = context.WithTimeout(ctx, bus.persistenceTimeout)
		defer cancel()
	}

	obs := bus.observability
	if obs != nil {
		storeCtx = obs.OnPersistStart(storeCtx, name)
	}

	began := time.Now()
	assigned, appendErr := bus.store.Append(storeCtx, record)

	if obs != nil {
		obs.OnPersistComplete(storeCtx, name, time.Since(began), assigned, appendErr)
	}

	if appendErr != nil {
		report("failed to save event: %w", appendErr)
		return ctx
	}
	return context.WithValue(ctx, offsetCtxKey{}, assigned)
}
// Replay feeds every stored event after offset from to handler, in store
// order. It fails immediately when the bus has no store.
//
// Stores implementing EventStoreStreamer are iterated through ReadStream;
// all others are read in batches of replayBatchSize (100 when unset), with
// context cancellation checked before each batch. The first error from the
// store or from handler stops the replay and is returned wrapped. A batch
// read that returns events without advancing the offset is reported as an
// error instead of looping forever.
func (bus *EventBus) Replay(ctx context.Context, from Offset, handler func(*StoredEvent) error) error {
	if bus.store == nil {
		return fmt.Errorf("replay requires persistence (use WithStore option)")
	}

	// deliver hands one stored event to handler and tags a failure with the
	// event's offset.
	deliver := func(stored *StoredEvent) error {
		if err := handler(stored); err != nil {
			return fmt.Errorf("handle event at offset %s: %w", stored.Offset, err)
		}
		return nil
	}

	// Streaming path: preferred because it avoids materializing batches.
	if streamer, ok := bus.store.(EventStoreStreamer); ok {
		for stored, streamErr := range streamer.ReadStream(ctx, from) {
			if streamErr != nil {
				return fmt.Errorf("stream events: %w", streamErr)
			}
			if err := deliver(stored); err != nil {
				return err
			}
		}
		return nil
	}

	// Batched path for stores without streaming support.
	size := bus.replayBatchSize
	if size <= 0 {
		size = 100
	}

	for cursor := from; ; {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		batch, next, err := bus.store.Read(ctx, cursor, size)
		if err != nil {
			return fmt.Errorf("read events: %w", err)
		}
		if len(batch) == 0 {
			return nil
		}

		for _, stored := range batch {
			if err := deliver(stored); err != nil {
				return err
			}
		}

		// A store that keeps returning the same offset would spin forever.
		if next == cursor {
			return fmt.Errorf("store returned non-advancing offset %s: possible bug in EventStore.Read implementation", cursor)
		}
		cursor = next
	}
}
// ReplayWithUpcast behaves like Replay, but runs each stored event through
// the bus's upcast registry first. When upcasting succeeds, handler receives
// a new StoredEvent carrying the upcasted type and data along with the
// original offset and timestamp. When no registry is configured, or the
// upcast fails, handler receives the original event untouched.
func (bus *EventBus) ReplayWithUpcast(ctx context.Context, from Offset, handler func(*StoredEvent) error) error {
	upcasting := func(stored *StoredEvent) error {
		if bus.upcastRegistry == nil {
			return handler(stored)
		}
		data, typeName, err := bus.upcastRegistry.apply(stored.Data, stored.Type)
		if err != nil {
			// Upcast failed: fall back to the event exactly as stored.
			return handler(stored)
		}
		return handler(&StoredEvent{
			Offset:    stored.Offset,
			Type:      typeName,
			Data:      data,
			Timestamp: stored.Timestamp,
		})
	}
	return bus.Replay(ctx, from, upcasting)
}
// IsPersistent reports whether the bus has an event store configured.
func (bus *EventBus) IsPersistent() bool {
	hasStore := bus.store != nil
	return hasStore
}
// GetStore exposes the bus's event store. The result is nil when the bus was
// built without persistence.
func (bus *EventBus) GetStore() EventStore {
	store := bus.store
	return store
}
// typeNamerType is the reflect.Type of the TypeNamer interface.
var typeNamerType = reflect.TypeOf((*TypeNamer)(nil)).Elem()

// typeNameOf returns the persisted type name for an event type, honoring the
// TypeNamer interface the same way EventType does for event values.
//
// EventTypeName is always invoked on a usable instance, never on a nil
// pointer or nil interface:
//   - interface types have no concrete value to ask, so their reflected name
//     is returned;
//   - pointer types get a pointer to a fresh zero element, because the zero
//     value of a pointer type is nil and a value-receiver method called
//     through a nil pointer panics;
//   - value types implementing TypeNamer directly use their zero value;
//   - value types whose pointer implements TypeNamer use a fresh instance.
//
// Types that do not implement TypeNamer fall back to t.String().
func typeNameOf(t reflect.Type) string {
	switch {
	case t.Kind() == reflect.Interface:
		// reflect.Zero(t).Interface() would be a nil interface; asserting it
		// to TypeNamer panics.
		return t.String()
	case t.Kind() == reflect.Pointer && t.Implements(typeNamerType):
		return reflect.New(t.Elem()).Interface().(TypeNamer).EventTypeName()
	case t.Implements(typeNamerType):
		return reflect.Zero(t).Interface().(TypeNamer).EventTypeName()
	case reflect.PointerTo(t).Implements(typeNamerType):
		// EventTypeName has a pointer receiver; call it on a fresh instance
		// rather than a nil pointer.
		return reflect.New(t).Interface().(TypeNamer).EventTypeName()
	}
	return t.String()
}
// ReplayErrorPolicy determines how SubscribeWithReplay handles a stored
// event that cannot be decoded into the subscription's event type. The zero
// value is ReplayAbort.
type ReplayErrorPolicy int
const (
// ReplayAbort stops the replay and returns the decode error (default).
// The subscription's saved offset does not advance past the failing
// event, so the next SubscribeWithReplay hits it again. Use this when a
// decode failure means a bug that must be fixed before proceeding.
ReplayAbort ReplayErrorPolicy = iota
// ReplaySkip reports the decode error to the PersistenceErrorHandler
// (with the *StoredEvent as the event argument) and continues with the
// next event. The skip is durable: the poison event's offset is saved,
// so it is not re-scanned and re-reported on later restarts — recover
// its payload from the reported *StoredEvent if needed.
//
// Scope: the policy fires only when a stored event OF THE SUBSCRIBED
// TYPE fails to decode (malformed or type-incompatible JSON). It does
// not cover events stored under a different type name — those are
// always skipped silently, by design, since streams may carry many
// event types — nor JSON that decodes leniently despite schema drift
// (unknown fields are dropped, missing fields zero-filled; use upcasts
// for schema evolution).
ReplaySkip
)
// WithReplayErrorPolicy returns a SubscribeOption that selects what
// SubscribeWithReplay does with stored events it cannot decode. Live delivery
// through Subscribe/SubscribeContext is unaffected. When the option is not
// supplied, the policy is ReplayAbort.
func WithReplayErrorPolicy(policy ReplayErrorPolicy) SubscribeOption {
	apply := func(h *internalHandler) {
		h.replayErrorPolicy = policy
	}
	return apply
}
// SubscribeWithReplay subscribes and replays missed events.
// Requires both an EventStore (for replay) and a SubscriptionStore (for tracking).
// If the store implements SubscriptionStore, it will be used automatically.
//
// Context usage:
//   - The context is used for the replay phase (loading the saved offset and
//     historical events)
//   - Live-phase offset saves use the per-publish context of each event
//
// Offset tracking: after each handled event the subscription saves that
// event's own offset (replay phase) or the offset assigned during publish
// (live phase, via OffsetFromContext). Delivery is at-least-once: after a
// crash between handling and offset save, the event is redelivered on the
// next SubscribeWithReplay.
//
// Handoff: after the live subscription registers, one catch-up replay pass
// runs from the last replayed position, so events published between the end
// of replay and subscription registration are not missed. Events published
// in that window may be delivered twice (once by the catch-up pass, once
// live) — handlers must be idempotent.
//
// LoadOffset failures abort the call before any event is delivered: silently
// falling back to the beginning of the stream would redeliver the entire
// history because of what may be a transient store error. SaveOffset failures
// do not stop delivery; they are reported to the PersistenceErrorHandler.
//
// Validation: all argument and option validation (nil bus/handler/options,
// interface event types, filter shape) happens before the persisted type
// name is resolved and before the replay pass, so a call that returns a
// validation error has delivered no events and saved no offsets.
//
// Filtering: a WithFilter predicate applies to the replay and catch-up
// passes exactly as it does to live delivery — the handler never sees
// non-matching events, and their offsets are not saved.
//
// Note: If the saved offset is OffsetOldest (""), replay starts from the beginning.
// The OffsetNewest ("$") constant is typically not stored and is only used for live subscriptions.
func SubscribeWithReplay[T any](
	ctx context.Context,
	bus *EventBus,
	subscriptionID string,
	handler Handler[T],
	opts ...SubscribeOption,
) error {
	if bus == nil {
		return fmt.Errorf("eventbus: bus cannot be nil")
	}
	if handler == nil {
		return fmt.Errorf("eventbus: handler cannot be nil")
	}
	if bus.store == nil {
		return fmt.Errorf("SubscribeWithReplay requires persistence (use WithStore option)")
	}

	// Get subscription store - either explicit or from the event store
	subStore := bus.subscriptionStore
	if subStore == nil {
		ss, ok := bus.store.(SubscriptionStore)
		if !ok {
			return fmt.Errorf("SubscribeWithReplay requires a SubscriptionStore (use WithSubscriptionStore option or use a store that implements SubscriptionStore)")
		}
		subStore = ss
	}

	eventType := reflect.TypeOf((*T)(nil)).Elem()

	saveOffset := func(saveCtx context.Context, event T, offset Offset) {
		if err := subStore.SaveOffset(saveCtx, subscriptionID, offset); err != nil && bus.persistenceErrorHandler != nil {
			bus.persistenceErrorHandler(event, eventType,
				fmt.Errorf("failed to save offset for subscription %q: %w", subscriptionID, err))
		}
	}

	// Build the live subscription up front: options are applied exactly once
	// and ALL subscription-time validation (event type, nil options, filter
	// shape) runs before the replay pass can deliver events or save offsets.
	// The handler is registered in the shard only after the first replay pass.
	var wrappedHandler ContextHandler[T] = func(hctx context.Context, event T) {
		handler(event)
		// The publish path attaches the persisted offset of the event being
		// delivered to its context, so each handled event saves its own offset.
		if offset, ok := OffsetFromContext(hctx); ok {
			saveOffset(hctx, event, offset)
		}
	}
	h, err := buildHandler(wrappedHandler, eventType, opts)
	if err != nil {
		return err
	}

	// Match the name events were persisted under (TypeNamer-aware). Resolved
	// only after validation, so an event type that validation rejects is
	// never asked for its name.
	typeName := typeNameOf(eventType)

	// The filter's shape was validated by buildHandler; honor it during the
	// replay passes the same way PublishContext honors it live.
	filter, _ := h.filter.(func(T) bool)

	// Load last offset for this subscription. A failure here must not be
	// mistaken for "no saved offset": that would replay the whole stream.
	lastOffset, err := subStore.LoadOffset(ctx, subscriptionID)
	if err != nil {
		return fmt.Errorf("load offset for subscription %q: %w", subscriptionID, err)
	}

	// lastSeen tracks the stream position of the last event observed during
	// replay (regardless of type), so the catch-up pass below can resume
	// where the first pass left off.
	lastSeen := lastOffset

	replayFn := func(stored *StoredEvent) error {
		lastSeen = stored.Offset

		// Apply upcasts if available
		eventData, eventTypeName := stored.Data, stored.Type
		if bus.upcastRegistry != nil {
			upcastedData, upcastedType, err := bus.upcastRegistry.apply(eventData, eventTypeName)
			if err == nil {
				eventData = upcastedData
				eventTypeName = upcastedType
			}
			// Continue even if upcast fails, let the type check handle it
		}

		// Only replay events of the correct type
		if eventTypeName != typeName {
			return nil
		}

		var event T
		if err := json.Unmarshal(eventData, &event); err != nil {
			if h.replayErrorPolicy == ReplaySkip {
				// Poison event: report and move on rather than wedging the
				// subscription on it forever. The skip is durable — saving the
				// poison event's own offset stops it from being re-scanned and
				// re-reported on every restart. Its payload was just handed to
				// the error handler as a *StoredEvent for out-of-band recovery.
				if bus.persistenceErrorHandler != nil {
					bus.persistenceErrorHandler(stored, eventType,
						fmt.Errorf("skipping undecodable event at offset %s for subscription %q: %w", stored.Offset, subscriptionID, err))
				}
				var zero T
				saveOffset(ctx, zero, stored.Offset)
				return nil
			}
			return err
		}

		// Apply the subscription's filter, mirroring live delivery: the
		// handler never sees non-matching events and their offsets are not
		// saved (the next handled event advances past them, as it does live).
		if filter != nil && !filter(event) {
			return nil
		}

		handler(event)

		// Save this event's own offset after successful handling
		saveOffset(ctx, event, stored.Offset)
		return nil
	}

	if err := bus.Replay(ctx, lastOffset, replayFn); err != nil {
		return fmt.Errorf("replay events: %w", err)
	}

	// Register the pre-built, pre-validated subscription for live events.
	// Options were already applied by buildHandler above — registering the
	// same handler instance keeps each option's effect applied exactly once.
	bus.addHandler(eventType, h)

	// Catch-up pass: an event persisted after the first replay finished but
	// before the live subscription registered would otherwise be missed
	// until the next SubscribeWithReplay. Persistence happens before live
	// delivery in the publish path, so replaying once more after the
	// subscription is registered closes the gap: every event is now seen by
	// this pass and/or the live subscription. An event published in the
	// overlap window may be delivered twice (at-least-once semantics — make
	// handlers idempotent).
	if err := bus.Replay(ctx, lastSeen, replayFn); err != nil {
		return fmt.Errorf("catch-up replay: %w", err)
	}
	return nil
}
// MemoryStore is a simple in-memory implementation of EventStore and SubscriptionStore.
// All state lives in process memory and every method synchronizes on mu.
type MemoryStore struct {
// events holds the stored events in append order (ascending offset).
events []*StoredEvent
// subscriptions maps a subscription ID to its last saved offset.
subscriptions map[string]Offset
// nextOffset is the counter Append increments to derive the next offset.
nextOffset int64
// mu guards all fields above.
mu sync.RWMutex
}
// Ensure MemoryStore implements all required interfaces
// (compile-time assertions; they generate no runtime work).
var _ EventStore = (*MemoryStore)(nil)
var _ EventStoreStreamer = (*MemoryStore)(nil)
var _ SubscriptionStore = (*MemoryStore)(nil)
// NewMemoryStore returns an empty in-memory event store with its event list
// and subscription map already allocated.
func NewMemoryStore() *MemoryStore {
	store := new(MemoryStore)
	store.events = make([]*StoredEvent, 0)
	store.subscriptions = make(map[string]Offset)
	return store
}
// Append implements EventStore. It assigns the next sequential offset to the
// event, stores a copy of its type, data and timestamp, and returns the
// offset. It never returns an error; ctx is not consulted.
func (m *MemoryStore) Append(ctx context.Context, event *Event) (Offset, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	m.nextOffset++
	// Zero-padding to a fixed width keeps lexicographic order equal to
	// numeric order, which the offset comparisons in this store rely on.
	assigned := Offset(fmt.Sprintf("%020d", m.nextOffset))

	m.events = append(m.events, &StoredEvent{
		Offset:    assigned,
		Type:      event.Type,
		Data:      event.Data,
		Timestamp: event.Timestamp,
	})
	return assigned, nil
}
// searchAfter locates the first stored event whose offset is strictly greater
// than from and returns its index (len(m.events) when there is none).
// OffsetOldest always maps to index 0. Because events are kept in ascending,
// zero-padded offset order, a binary search is sufficient. The caller must
// hold at least a read lock.
func (m *MemoryStore) searchAfter(from Offset) int {
	if from != OffsetOldest {
		isAfter := func(i int) bool {
			return m.events[i].Offset > from
		}
		return sort.Search(len(m.events), isAfter)
	}
	return 0
}
// Read implements EventStore. It returns up to limit events with offsets
// strictly greater than from, together with the offset of the last event
// returned. A non-positive limit means "no limit". When nothing follows
// from, it returns a nil slice and from itself, so callers can detect the
// end of the stream. It never returns an error; ctx is not consulted.
func (m *MemoryStore) Read(ctx context.Context, from Offset, limit int) ([]*StoredEvent, Offset, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()

	start := m.searchAfter(from)
	end := len(m.events)
	// Compare limit with the number of remaining events instead of computing
	// start+limit first: the sum overflows for very large limits, which
	// would yield a negative end and a panic in make below.
	if limit > 0 && limit < end-start {
		end = start + limit
	}
	if start == end {
		return nil, from, nil
	}

	// Copy so callers cannot observe later appends through a shared slice.
	result := make([]*StoredEvent, end-start)
	copy(result, m.events[start:end])
	return result, result[len(result)-1].Offset, nil
}
// SaveOffset implements SubscriptionStore. It records offset as the current
// position of subscriptionID, replacing any earlier value, and always
// succeeds. ctx is not consulted.
func (m *MemoryStore) SaveOffset(ctx context.Context, subscriptionID string, offset Offset) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	positions := m.subscriptions
	positions[subscriptionID] = offset
	return nil
}
// LoadOffset implements SubscriptionStore. It returns the offset last saved
// for subscriptionID, or OffsetOldest when the subscription has never saved
// one. It always succeeds; ctx is not consulted.
func (m *MemoryStore) LoadOffset(ctx context.Context, subscriptionID string) (Offset, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()

	if saved, known := m.subscriptions[subscriptionID]; known {
		return saved, nil
	}
	return OffsetOldest, nil
}
// ReadStream implements EventStoreStreamer. The returned iterator yields the
// events stored after from, in offset order.
//
// When iteration starts, the matching events are copied under the read lock
// and the lock is released before the first yield; the consumer may
// therefore call other store methods without deadlocking, and events
// appended afterwards are not part of the iteration. Before every event the
// context is checked; once it is done the iterator yields (nil, ctx.Err())
// and stops.
func (m *MemoryStore) ReadStream(ctx context.Context, from Offset) iter.Seq2[*StoredEvent, error] {
	// snapshot copies the tail of the event list while holding the read lock.
	snapshot := func() []*StoredEvent {
		m.mu.RLock()
		defer m.mu.RUnlock()
		tail := m.events[m.searchAfter(from):]
		copied := make([]*StoredEvent, len(tail))
		copy(copied, tail)
		return copied
	}

	return func(yield func(*StoredEvent, error) bool) {
		for _, stored := range snapshot() {
			select {
			case <-ctx.Done():
				yield(nil, ctx.Err())
				return
			default:
			}
			if !yield(stored, nil) {
				// The consumer broke out of its range loop.
				return
			}
		}
	}
}
package state
import (
"encoding/json"
"fmt"
"time"
)
// Insert builds an insert change message for the entity stored under key.
// The entity type name is derived from the type parameter T unless it is
// overridden with WithEntityType.
//
// Example:
//
//	msg, err := state.Insert("user:1", User{Name: "Alice"})
func Insert[T any](key string, value T, opts ...ChangeOption) (*ChangeMessage, error) {
	inserted := &value
	return newChangeMessage[T](OperationInsert, key, inserted, nil, opts...)
}
// Update builds an update change message carrying the new value of the
// entity stored under key. The entity type name is derived from the type
// parameter T unless it is overridden with WithEntityType.
//
// Example:
//
//	msg, err := state.Update("user:1", User{Name: "Alice Smith"})
func Update[T any](key string, value T, opts ...ChangeOption) (*ChangeMessage, error) {
	updated := &value
	return newChangeMessage[T](OperationUpdate, key, updated, nil, opts...)
}
// UpdateWithOldValue builds an update change message that carries both the
// new and the previous value of the entity. Consumers can compare the
// previous value with their own state to detect concurrent modifications.
//
// Example:
//
//	msg, err := state.UpdateWithOldValue("user:1", newUser, oldUser)
func UpdateWithOldValue[T any](key string, value, oldValue T, opts ...ChangeOption) (*ChangeMessage, error) {
	current, previous := &value, &oldValue
	return newChangeMessage[T](OperationUpdate, key, current, previous, opts...)
}
// Delete builds a delete change message for the entity stored under key. It
// carries neither a value nor an old value; the entity type name comes from
// the type parameter T.
//
// Example:
//
//	msg, err := state.Delete[User]("user:1")
func Delete[T any](key string, opts ...ChangeOption) (*ChangeMessage, error) {
	var none *T
	return newChangeMessage[T](OperationDelete, key, none, none, opts...)
}
// DeleteWithOldValue builds a delete change message that preserves the value
// the entity had before deletion, for consumers that need to know what was
// removed.
//
// Example:
//
//	msg, err := state.DeleteWithOldValue("user:1", user)
func DeleteWithOldValue[T any](key string, oldValue T, opts ...ChangeOption) (*ChangeMessage, error) {
	previous := &oldValue
	return newChangeMessage[T](OperationDelete, key, nil, previous, opts...)
}
// newChangeMessage is the shared constructor behind Insert, Update, Delete
// and their variants.
//
// It rejects an empty key, applies opts to a fresh changeConfig, resolves the
// entity type (derived from T unless the options override it), fills in the
// operation, transaction ID and optional timestamp headers, and JSON-encodes
// value and oldValue when they are non-nil. A marshaling failure is returned
// wrapped and no message is produced.
func newChangeMessage[T any](op Operation, key string, value, oldValue *T, opts ...ChangeOption) (*ChangeMessage, error) {
	if len(key) == 0 {
		return nil, fmt.Errorf("state: key cannot be empty")
	}

	cfg := new(changeConfig)
	for i := range opts {
		opts[i](cfg)
	}

	// The type-derived name is the default; an explicit option wins.
	var zero T
	entityType := EntityType(zero)
	if override := cfg.entityType; override != "" {
		entityType = override
	}

	msg := &ChangeMessage{
		Type: entityType,
		Key:  key,
		Headers: Headers{
			Operation: op,
			TxID:      cfg.txID,
		},
	}

	// An explicit timestamp takes precedence over the automatic one.
	switch {
	case cfg.timestamp != nil:
		msg.Headers.Timestamp = cfg.timestamp.Format(time.RFC3339Nano)
	case cfg.autoTimestamp:
		msg.Headers.Timestamp = time.Now().UTC().Format(time.RFC3339Nano)
	}

	// Present for insert/update, absent for delete.
	if value != nil {
		encoded, err := json.Marshal(value)
		if err != nil {
			return nil, fmt.Errorf("state: marshal value: %w", err)
		}
		msg.Value = encoded
	}

	if oldValue != nil {
		encoded, err := json.Marshal(oldValue)
		if err != nil {
			return nil, fmt.Errorf("state: marshal old_value: %w", err)
		}
		msg.OldValue = encoded
	}

	return msg, nil
}
// SnapshotStart builds the control message that marks the beginning of a
// snapshot in the stream, tagged with offset.
func SnapshotStart(offset string) *ControlMessage {
	msg := new(ControlMessage)
	msg.Headers.Control = ControlSnapshotStart
	msg.Headers.Offset = offset
	return msg
}
// SnapshotEnd builds the control message that marks the end of a snapshot in
// the stream, tagged with offset.
func SnapshotEnd(offset string) *ControlMessage {
	msg := new(ControlMessage)
	msg.Headers.Control = ControlSnapshotEnd
	msg.Headers.Offset = offset
	return msg
}
// Reset builds the control message telling consumers to discard all state
// and rebuild it from the events that follow. The message is tagged with
// offset.
func Reset(offset string) *ControlMessage {
	msg := new(ControlMessage)
	msg.Headers.Control = ControlReset
	msg.Headers.Offset = offset
	return msg
}
package state
import (
"context"
"encoding/json"
"fmt"
"sync"
eventbus "github.com/jilio/ebu"
)
// Store is a generic interface for state storage.
// Implementations can use any backing store (memory, database, etc.).
// Every method addresses entities by composite key (see CompositeKey).
type Store[T any] interface {
// Get retrieves an entity by its composite key.
// The boolean reports whether the key was present.
Get(compositeKey string) (T, bool)
// Set stores an entity with the given composite key.
Set(compositeKey string, value T)
// Delete removes an entity by its composite key.
Delete(compositeKey string)
// Clear removes all entities from the store.
Clear()
// All returns a copy of all entities in the store.
All() map[string]T
}
// MemoryStore is an in-memory implementation of Store.
// It is safe for concurrent access.
type MemoryStore[T any] struct {
// data maps composite keys to entities.
data map[string]T
// mu guards data.
mu sync.RWMutex
}
// NewMemoryStore returns an empty, ready-to-use in-memory store for entities
// of type T.
func NewMemoryStore[T any]() *MemoryStore[T] {
	store := new(MemoryStore[T])
	store.data = make(map[string]T)
	return store
}
// Get looks up the entity stored under the composite key. The boolean is
// false, and the entity is the zero value of T, when the key is absent.
func (s *MemoryStore[T]) Get(key string) (T, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()

	if entity, found := s.data[key]; found {
		return entity, true
	}
	var zero T
	return zero, false
}
// Set stores value under the composite key, replacing any entity already
// stored there.
func (s *MemoryStore[T]) Set(key string, value T) {
	s.mu.Lock()
	defer s.mu.Unlock()

	entries := s.data
	entries[key] = value
}
// Delete drops the entity stored under the composite key. Deleting a key
// that is not present is a no-op.
func (s *MemoryStore[T]) Delete(key string) {
	s.mu.Lock()
	defer s.mu.Unlock()

	entries := s.data
	delete(entries, key)
}
// Clear empties the store by swapping in a freshly allocated map.
func (s *MemoryStore[T]) Clear() {
	s.mu.Lock()
	defer s.mu.Unlock()

	fresh := make(map[string]T)
	s.data = fresh
}
// All returns a snapshot of the store as a new map keyed by composite key.
// Modifying the returned map does not affect the store.
func (s *MemoryStore[T]) All() map[string]T {
	s.mu.RLock()
	defer s.mu.RUnlock()

	snapshot := make(map[string]T, len(s.data))
	for key := range s.data {
		snapshot[key] = s.data[key]
	}
	return snapshot
}
// TypedCollection provides type-safe access to a collection of entities.
// It wraps a Store and associates it with a specific entity type.
type TypedCollection[T any] struct {
// store holds the entities, addressed by composite key.
store Store[T]
// entityType is the entity type name used to build composite keys.
entityType string
}
// NewTypedCollection wraps store in a collection whose entity type name is
// derived from the type parameter T via EntityType.
func NewTypedCollection[T any](store Store[T]) *TypedCollection[T] {
	var zero T
	collection := new(TypedCollection[T])
	collection.store = store
	collection.entityType = EntityType(zero)
	return collection
}
// NewTypedCollectionWithType wraps store in a collection that uses the given
// entity type name instead of one derived from the Go type.
func NewTypedCollectionWithType[T any](store Store[T], entityType string) *TypedCollection[T] {
	collection := new(TypedCollection[T])
	collection.store = store
	collection.entityType = entityType
	return collection
}
// EntityType reports the entity type name this collection was created with.
func (c *TypedCollection[T]) EntityType() string {
	name := c.entityType
	return name
}
// Get looks up an entity by its bare key. The collection's entity type is
// combined with key into the composite key the underlying store expects.
func (c *TypedCollection[T]) Get(key string) (T, bool) {
	compositeKey := CompositeKey(c.entityType, key)
	return c.store.Get(compositeKey)
}
// All returns everything held by the collection's underlying store, exactly
// as the store's own All reports it. The keys of the returned map are
// composite keys (type/key).
func (c *TypedCollection[T]) All() map[string]T {
	entities := c.store.All()
	return entities
}
// collectionApplier is an internal interface for applying changes to collections.
// It lets the Materializer drive collections of different entity types
// through a single non-generic interface.
type collectionApplier interface {
// applyChange applies one change message to the collection.
applyChange(msg *ChangeMessage) error
// clear removes every entity from the collection.
clear()
// snapshot serializes every entity in the collection, keyed by composite key.
snapshot() (map[string]json.RawMessage, error)
// restore clears the collection and repopulates it from serialized entities.
restore(entities map[string]json.RawMessage) error
}
// typedCollectionApplier wraps TypedCollection to implement collectionApplier.
type typedCollectionApplier[T any] struct {
// collection is the typed collection whose store receives the changes.
collection *TypedCollection[T]
}
// applyChange applies msg to the collection's store. Inserts and updates
// decode msg.Value into a T and store it under the message's composite key;
// deletes remove that key. Any other operation, or a value that does not
// decode, yields an error and leaves the store untouched.
func (a *typedCollectionApplier[T]) applyChange(msg *ChangeMessage) error {
	compositeKey := CompositeKey(msg.Type, msg.Key)
	op := msg.Headers.Operation

	if op == OperationDelete {
		a.collection.store.Delete(compositeKey)
		return nil
	}
	if op != OperationInsert && op != OperationUpdate {
		return fmt.Errorf("state: unknown operation %q for %s/%s", op, msg.Type, msg.Key)
	}

	var entity T
	if err := json.Unmarshal(msg.Value, &entity); err != nil {
		return fmt.Errorf("state: unmarshal value for %s/%s: %w", msg.Type, msg.Key, err)
	}
	a.collection.store.Set(compositeKey, entity)
	return nil
}
// clear removes every entity from the collection's underlying store.
func (a *typedCollectionApplier[T]) clear() {
	store := a.collection.store
	store.Clear()
}
// snapshot JSON-encodes every entity in the collection's store and returns
// the encodings keyed by composite key. The first entity that fails to
// marshal aborts the snapshot with an error.
func (a *typedCollectionApplier[T]) snapshot() (map[string]json.RawMessage, error) {
	current := a.collection.store.All()
	encoded := make(map[string]json.RawMessage, len(current))
	for compositeKey, entity := range current {
		raw, err := json.Marshal(entity)
		if err != nil {
			return nil, fmt.Errorf("state: marshal snapshot entity %s: %w", compositeKey, err)
		}
		encoded[compositeKey] = raw
	}
	return encoded, nil
}
// restore empties the collection's store and refills it from entities, a map
// of composite key to JSON-encoded entity. It stops at the first entity that
// fails to decode; entities stored before the failure remain in the store.
func (a *typedCollectionApplier[T]) restore(entities map[string]json.RawMessage) error {
	a.collection.store.Clear()
	for compositeKey, encoded := range entities {
		var entity T
		err := json.Unmarshal(encoded, &entity)
		if err != nil {
			return fmt.Errorf("state: unmarshal snapshot entity %s: %w", compositeKey, err)
		}
		a.collection.store.Set(compositeKey, entity)
	}
	return nil
}
// Materializer processes state protocol messages and maintains state.
// It applies change messages to registered collections and handles control messages.
//
// Message application is serialized: concurrent Apply, ApplyChangeMessage, and
// ApplyControlMessage calls are applied one at a time, so a reset can never be
// overwritten by a logically-earlier change, and LastOffset only advances past
// events that were applied successfully. Callbacks configured via options run
// while an application is in flight and must not call back into Apply,
// ApplyChangeMessage, ApplyControlMessage, SaveSnapshotTo, or LoadSnapshotFrom.
type Materializer struct {
// collections maps an entity type name to the applier for its collection.
collections map[string]collectionApplier
// cfg holds the options supplied to NewMaterializer.
cfg *materializerConfig
// applyMu serializes message application. It is held for the entirety of
// Apply/ApplyChangeMessage/ApplyControlMessage — application plus the
// lastOffset update happen as one atomic step — and for snapshot
// save/restore, which need a consistent view across collections.
applyMu sync.Mutex
// mu guards the collections map and lastOffset.
mu sync.RWMutex
// lastOffset is the offset of the last event Apply processed successfully.
lastOffset eventbus.Offset
}
// NewMaterializer builds a Materializer with no registered collections,
// configured by applying opts in order.
func NewMaterializer(opts ...MaterializerOption) *Materializer {
	cfg := new(materializerConfig)
	for i := range opts {
		opts[i](cfg)
	}

	m := new(Materializer)
	m.collections = make(map[string]collectionApplier)
	m.cfg = cfg
	return m
}
// RegisterCollection attaches collection to m under the collection's entity
// type name, which is the key change messages are routed by. Registering a
// second collection under the same name replaces the first.
func RegisterCollection[T any](m *Materializer, collection *TypedCollection[T]) {
	m.mu.Lock()
	defer m.mu.Unlock()

	entityType := collection.EntityType()
	applier := &typedCollectionApplier[T]{collection: collection}
	m.collections[entityType] = applier
}
// Apply processes one StoredEvent holding a state protocol message. It is the
// main integration point with ebu's Replay functionality.
//
// event.Data is expected to be a JSON-encoded ChangeMessage or
// ControlMessage. Both kinds always carry a "headers" field on the wire, so
// an event without one (for example a regular event on a mixed stream) is
// not a state protocol message and is skipped silently, in strict and
// non-strict mode alike.
//
// LastOffset advances only when the event was applied or skipped
// successfully; a failed event never advances it, so a resumed replay picks
// it up again. Every failure is passed to the WithOnError callback before it
// is returned.
func (m *Materializer) Apply(event *eventbus.StoredEvent) error {
	m.applyMu.Lock()
	defer m.applyMu.Unlock()

	// Peek at the headers only; they decide how the event is interpreted.
	var envelope struct {
		Headers json.RawMessage `json:"headers"`
	}
	if err := json.Unmarshal(event.Data, &envelope); err != nil {
		return m.reportError(fmt.Errorf("state: unmarshal event: %w", err))
	}

	// Events without headers fall straight through to the offset update, so
	// materializers keep working on streams that mix in regular events.
	if envelope.Headers != nil {
		var control ControlHeaders
		isControl := json.Unmarshal(envelope.Headers, &control) == nil && control.Control != ""
		if isControl {
			m.applyControl(&ControlMessage{Headers: control})
		} else {
			var change ChangeMessage
			if err := json.Unmarshal(event.Data, &change); err != nil {
				return m.reportError(fmt.Errorf("state: unmarshal change message: %w", err))
			}
			// applyChange has already reported its own failures.
			if err := m.applyChange(&change); err != nil {
				return err
			}
		}
	}

	m.setLastOffset(event.Offset)
	return nil
}
// ApplyChangeMessage applies a ChangeMessage that is not wrapped in a
// StoredEvent. The call is serialized with all other message application and
// does not touch LastOffset.
func (m *Materializer) ApplyChangeMessage(msg *ChangeMessage) error {
	m.applyMu.Lock()
	defer m.applyMu.Unlock()

	err := m.applyChange(msg)
	return err
}
// ApplyControlMessage applies a ControlMessage that is not wrapped in a
// StoredEvent. The call is serialized with all other message application and
// does not touch LastOffset.
func (m *Materializer) ApplyControlMessage(msg *ControlMessage) {
	m.applyMu.Lock()
	defer m.applyMu.Unlock()

	m.applyControl(msg)
}
// applyChange routes msg to the collection registered for msg.Type. A message
// for an unregistered type is an error in strict-schema mode and is ignored
// otherwise. Every error it returns has already been passed to reportError.
func (m *Materializer) applyChange(msg *ChangeMessage) error {
	m.mu.RLock()
	target, registered := m.collections[msg.Type]
	m.mu.RUnlock()

	switch {
	case registered:
		if err := target.applyChange(msg); err != nil {
			return m.reportError(err)
		}
		return nil
	case m.cfg.strictSchema:
		return m.reportError(fmt.Errorf("state: unknown entity type: %s", msg.Type))
	default:
		// Non-strict mode tolerates entity types nobody registered.
		return nil
	}
}
// reportError passes err to the configured error callback, when there is
// one, and hands err back so callers can write `return m.reportError(err)`.
// Each materialization error is meant to flow through here exactly once.
func (m *Materializer) reportError(err error) error {
	if notify := m.cfg.onError; notify != nil {
		notify(err)
	}
	return err
}
// setLastOffset stores offset as the position of the last successfully
// applied event. Callers must hold applyMu.
func (m *Materializer) setLastOffset(offset eventbus.Offset) {
	m.mu.Lock()
	defer m.mu.Unlock()

	m.lastOffset = offset
}
// applyControl carries out a control message. A reset clears every
// registered collection and then invokes the onReset callback; snapshot-start
// and snapshot-end invoke the onSnapshot callback with true and false
// respectively. Any other control value is ignored.
func (m *Materializer) applyControl(msg *ControlMessage) {
	control := msg.Headers.Control

	if control == ControlReset {
		m.mu.Lock()
		for entityType := range m.collections {
			m.collections[entityType].clear()
		}
		m.mu.Unlock()

		// The callback runs after the lock is released.
		if m.cfg.onReset != nil {
			m.cfg.onReset()
		}
		return
	}

	if control != ControlSnapshotStart && control != ControlSnapshotEnd {
		return
	}
	if m.cfg.onSnapshot != nil {
		m.cfg.onSnapshot(control == ControlSnapshotStart)
	}
}
// LastOffset reports the offset of the most recent event that Apply
// processed successfully.
func (m *Materializer) LastOffset() eventbus.Offset {
	m.mu.RLock()
	current := m.lastOffset
	m.mu.RUnlock()
	return current
}
// Replay is a convenience wrapper that replays the bus's stored events after
// from through this materializer. It delegates to bus.ReplayWithUpcast with
// Apply as the handler, so events with registered schema migrations are
// upcasted before they are applied.
func (m *Materializer) Replay(ctx context.Context, bus *eventbus.EventBus, from eventbus.Offset) error {
	apply := m.Apply
	return bus.ReplayWithUpcast(ctx, from, apply)
}
// SaveSnapshotTo serializes every registered collection and stores the result
// in s under snapshotID, tagged with the offset of the last applied event.
// The blob format is {"entityType": {"compositeKey": <entity JSON>}}.
//
// An error is returned when no event has been applied yet (LastOffset is
// empty): such a snapshot would claim OffsetOldest, and a later
// TruncateBefore based on it could silently discard the whole log.
//
// The intended compaction sequence is:
//
//	if err := mat.SaveSnapshotTo(ctx, snapshotter, "users"); err != nil { ... }
//	// Optionally, once the snapshot is durably saved, compact the log:
//	if tr, ok := bus.GetStore().(eventbus.EventStoreTruncator); ok {
//		tr.TruncateBefore(ctx, offset) // offset the snapshot was saved at
//	}
//
// Truncation is only safe once the snapshot is durably saved AND no other
// reader or subscription still needs the truncated prefix. The offset to
// truncate at is the snapshot's offset (retrievable via the snapshotter's
// LoadSnapshot, or LastOffset when no events are applied concurrently) —
// never a later one, or events not covered by the snapshot would be lost.
//
// Message application is paused for the duration of the call, so the blob is
// a consistent view of all collections at a single offset.
func (m *Materializer) SaveSnapshotTo(ctx context.Context, s eventbus.EventStoreSnapshotter, snapshotID string) error {
	m.applyMu.Lock()
	defer m.applyMu.Unlock()

	// Capture the offset and the registered appliers under the read lock, so
	// the collections can be serialized without holding it.
	m.mu.RLock()
	at := m.lastOffset
	appliers := make(map[string]collectionApplier, len(m.collections))
	for name := range m.collections {
		appliers[name] = m.collections[name]
	}
	m.mu.RUnlock()

	if at == eventbus.OffsetOldest {
		return fmt.Errorf("state: refusing to save snapshot %q: no events applied yet (snapshot would claim OffsetOldest)", snapshotID)
	}

	contents := make(map[string]map[string]json.RawMessage, len(appliers))
	for name, applier := range appliers {
		serialized, err := applier.snapshot()
		if err != nil {
			return err
		}
		contents[name] = serialized
	}

	// contents holds only string keys and json.RawMessage values produced by
	// json.Marshal, so encoding cannot fail.
	payload, _ := json.Marshal(contents)

	if saveErr := s.SaveSnapshot(ctx, snapshotID, at, payload); saveErr != nil {
		return fmt.Errorf("state: save snapshot %q: %w", snapshotID, saveErr)
	}
	return nil
}
// LoadSnapshotFrom rebuilds the materializer from the snapshot stored under
// snapshotID and returns the offset that snapshot was taken at. On success
// every registered collection has been cleared and refilled from the blob,
// and the last offset equals the returned offset. Typical use:
//
//	offset, err := mat.LoadSnapshotFrom(ctx, snapshotter, "users")
//	if err != nil { ... }
//	if err := mat.Replay(ctx, bus, offset); err != nil { ... }
//
// Outcomes other than success:
//
//   - No snapshot stored (LoadSnapshot yields OffsetOldest and a nil blob):
//     returns OffsetOldest and nil, leaving the collections untouched, so the
//     caller's Replay rebuilds from the beginning.
//   - LoadSnapshot fails, or the blob is not valid JSON: returns OffsetOldest
//     and a wrapped error; the collections are not modified.
//   - A collection fails to restore: all collections are cleared, the last
//     offset is reset to OffsetOldest, and the collection's error is
//     returned, so a full replay from OffsetOldest rebuilds the state.
//
// Snapshot data for entity types that have no registered collection is
// dropped silently.
func (m *Materializer) LoadSnapshotFrom(ctx context.Context, s eventbus.EventStoreSnapshotter, snapshotID string) (eventbus.Offset, error) {
	m.applyMu.Lock()
	defer m.applyMu.Unlock()

	snapOffset, raw, loadErr := s.LoadSnapshot(ctx, snapshotID)
	if loadErr != nil {
		return eventbus.OffsetOldest, fmt.Errorf("state: load snapshot %q: %w", snapshotID, loadErr)
	}
	if raw == nil && snapOffset == eventbus.OffsetOldest {
		// Nothing stored under this ID: the caller replays from the start.
		return eventbus.OffsetOldest, nil
	}

	var byType map[string]map[string]json.RawMessage
	if decodeErr := json.Unmarshal(raw, &byType); decodeErr != nil {
		return eventbus.OffsetOldest, fmt.Errorf("state: unmarshal snapshot %q: %w", snapshotID, decodeErr)
	}

	m.mu.Lock()
	defer m.mu.Unlock()

	// wipe empties every registered collection; used before restoring and
	// again to undo a partial restore.
	wipe := func() {
		for _, coll := range m.collections {
			coll.clear()
		}
	}

	wipe()
	for entityType, entities := range byType {
		coll, registered := m.collections[entityType]
		if !registered {
			// The entity type is no longer registered: skip its data.
			continue
		}
		if restoreErr := coll.restore(entities); restoreErr != nil {
			// Prefer an empty materializer over a half-restored one.
			wipe()
			m.lastOffset = eventbus.OffsetOldest
			return eventbus.OffsetOldest, restoreErr
		}
	}

	m.lastOffset = snapOffset
	return snapOffset, nil
}
package state
import (
"encoding/json"
"reflect"
)
// Operation names the kind of entity mutation carried by a change message
// in the State Protocol.
type Operation string

// The operations a change message can carry.
const (
	// OperationInsert marks the creation of a new entity.
	OperationInsert = Operation("insert")
	// OperationUpdate marks the modification of an existing entity.
	OperationUpdate = Operation("update")
	// OperationDelete marks the removal of an entity.
	OperationDelete = Operation("delete")
)
// Control names the kind of control message in the State Protocol.
type Control string

// The control signals a control message can carry.
const (
	// ControlSnapshotStart opens a snapshot.
	ControlSnapshotStart = Control("snapshot-start")
	// ControlSnapshotEnd closes a snapshot.
	ControlSnapshotEnd = Control("snapshot-end")
	// ControlReset tells consumers to clear all state.
	ControlReset = Control("reset")
)
// Headers carries the metadata of a change message per the State Protocol.
// In JSON, "operation" is always emitted; "txid" and "timestamp" are omitted
// when empty.
type Headers struct {
// Operation is the kind of change: insert, update, or delete.
Operation Operation `json:"operation"`
// TxID optionally identifies a transaction so that related changes can be
// grouped together.
TxID string `json:"txid,omitempty"`
// Timestamp is an optional timestamp, formatted per RFC 3339.
Timestamp string `json:"timestamp,omitempty"`
}
// ControlHeaders carries the metadata of a control message. In JSON,
// "control" is always emitted; "offset" is omitted when empty.
type ControlHeaders struct {
// Control is the kind of control message (snapshot-start, snapshot-end,
// or reset).
Control Control `json:"control"`
// Offset optionally references a position in the stream.
Offset string `json:"offset,omitempty"`
}
// ChangeMessage is a state change event per the State Protocol: one entity
// mutation (insert, update, or delete) addressed by entity type plus key.
// In JSON, "value" and "old_value" are omitted when empty.
type ChangeMessage struct {
// Type is the entity type discriminator (e.g., "user", "order").
Type string `json:"type"`
// Key identifies the entity uniquely within its entity type.
Key string `json:"key"`
// Value holds the entity data as raw JSON (required for insert/update).
Value json.RawMessage `json:"value,omitempty"`
// OldValue holds the previous entity data as raw JSON (optional, for
// conflict detection).
OldValue json.RawMessage `json:"old_value,omitempty"`
// Headers holds the operation metadata.
Headers Headers `json:"headers"`
}
// ControlMessage is a control event per the State Protocol. Control messages
// manage the stream lifecycle (snapshots, resets) and carry no entity data:
// the headers are the entire payload.
type ControlMessage struct {
// Headers holds the control message metadata.
Headers ControlHeaders `json:"headers"`
}
// TypeNamer is an optional interface that entity types can implement to
// provide their own type name; EntityType prefers it over the reflect-based
// name. This mirrors ebu's TypeNamer pattern.
//
// Example:
//
// type User struct { ... }
// func (u User) StateTypeName() string { return "user" }
type TypeNamer interface {
// StateTypeName returns the entity type name to use for the value.
StateTypeName() string
}
// EntityType reports the type name to use for entity:
//
//   - "nil" when entity is nil;
//   - the result of StateTypeName when entity implements TypeNamer;
//   - otherwise the package-qualified name given by reflect, e.g.
//     "pkg.User" or "*pkg.User".
func EntityType(entity any) string {
	switch v := entity.(type) {
	case nil:
		return "nil"
	case TypeNamer:
		return v.StateTypeName()
	default:
		return reflect.TypeOf(v).String()
	}
}
// CompositeKey joins an entity type and a key into the composite identifier
// used by the State Protocol: "<entityType>/<key>". Neither part is escaped
// or validated.
func CompositeKey(entityType, key string) string {
	const separator = "/"
	return entityType + separator + key
}
// EventTypeName satisfies ebu's TypeNamer interface so that a ChangeMessage
// can be published directly to an EventBus. It always returns
// "state.ChangeMessage".
func (ChangeMessage) EventTypeName() string {
	const name = "state.ChangeMessage"
	return name
}
// EventTypeName satisfies ebu's TypeNamer interface so that a ControlMessage
// can be published directly to an EventBus. It always returns
// "state.ControlMessage".
func (ControlMessage) EventTypeName() string {
	const name = "state.ControlMessage"
	return name
}
package state
import "time"
// ChangeOption configures a change message. It is a functional option: each
// option mutates the changeConfig it is handed (see WithTxID, WithTimestamp,
// WithAutoTimestamp, and WithEntityType).
type ChangeOption func(*changeConfig)
// changeConfig accumulates the settings applied by ChangeOption values.
type changeConfig struct {
// txID is the transaction identifier set by WithTxID.
txID string
// timestamp is the explicit timestamp set by WithTimestamp; nil when that
// option was not used.
timestamp *time.Time
// autoTimestamp is set by WithAutoTimestamp.
autoTimestamp bool
// entityType is the entity type name override set by WithEntityType.
entityType string
}
// WithTxID returns an option that records txID as the change's transaction
// ID. Transaction IDs let consumers group related changes and process them
// atomically.
func WithTxID(txID string) ChangeOption {
	return ChangeOption(func(cfg *changeConfig) {
		cfg.txID = txID
	})
}
// WithTimestamp returns an option that records t as the change's explicit
// timestamp. The timestamp will be formatted as RFC 3339.
func WithTimestamp(t time.Time) ChangeOption {
	return ChangeOption(func(cfg *changeConfig) {
		cfg.timestamp = &t
	})
}
// WithAutoTimestamp returns an option that requests an automatic timestamp:
// the change is stamped with the current time. The option itself only sets
// the flag; the time is taken by the code that consumes the configuration.
func WithAutoTimestamp() ChangeOption {
	enable := func(cfg *changeConfig) {
		cfg.autoTimestamp = true
	}
	return enable
}
// WithEntityType returns an option that overrides the automatically derived
// entity type name with typeName. Use it when the name on the wire should
// differ from the Go type name.
func WithEntityType(typeName string) ChangeOption {
	return ChangeOption(func(cfg *changeConfig) {
		cfg.entityType = typeName
	})
}
// MaterializerOption configures a Materializer. It is a functional option:
// each option mutates the materializerConfig it is handed (see WithOnReset,
// WithOnSnapshot, WithOnError, and WithStrictSchema).
type MaterializerOption func(*materializerConfig)
// materializerConfig accumulates the settings applied by MaterializerOption
// values.
type materializerConfig struct {
// onReset is the callback registered with WithOnReset; nil when unset.
onReset func()
// onSnapshot is the callback registered with WithOnSnapshot; nil when
// unset. Its argument is true for snapshot-start, false for snapshot-end.
onSnapshot func(start bool)
// onError is the callback registered with WithOnError; nil when unset.
onError func(error)
// strictSchema is set by WithStrictSchema.
strictSchema bool
}
// WithOnReset returns an option that registers fn as the reset callback.
// The callback runs when a reset control message is received, after all
// collections have been cleared.
func WithOnReset(fn func()) MaterializerOption {
	return MaterializerOption(func(cfg *materializerConfig) {
		cfg.onReset = fn
	})
}
// WithOnSnapshot returns an option that registers fn as the snapshot
// callback, invoked on snapshot-start and snapshot-end control messages.
// fn receives true for snapshot-start and false for snapshot-end.
func WithOnSnapshot(fn func(start bool)) MaterializerOption {
	return MaterializerOption(func(cfg *materializerConfig) {
		cfg.onSnapshot = fn
	})
}
// WithOnError returns an option that registers fn as the handler for
// materialization errors. The handler is invoked exactly once for every
// failed application: envelope or change-message decode failures, unknown
// entity types in strict mode, unknown operations, and collection apply
// errors.
func WithOnError(fn func(error)) MaterializerOption {
	return MaterializerOption(func(cfg *materializerConfig) {
		cfg.onError = fn
	})
}
// WithStrictSchema returns an option that turns on strict schema
// validation. In strict mode the materializer returns an error for unknown
// entity types; without it (the default) unknown types are silently ignored.
func WithStrictSchema() MaterializerOption {
	enable := func(cfg *materializerConfig) {
		cfg.strictSchema = true
	}
	return enable
}
package eventbus
import (
"encoding/json"
"fmt"
"reflect"
"sync"
)
// UpcastFunc transforms event data from one version to another.
// It receives the raw JSON data and returns the transformed data together
// with the new type name, or an error if the transformation fails. The
// returned type name is what the registry uses to look up the next upcast in
// a chain.
type UpcastFunc func(data json.RawMessage) (json.RawMessage, string, error)
// UpcastErrorHandler is called when an upcast operation fails. It receives
// the event type and data that were being upcast at the failing step, along
// with the error returned by the upcast function.
type UpcastErrorHandler func(eventType string, data json.RawMessage, err error)
// Upcaster represents a registered transformation from one event type to
// another. The registry indexes upcasters by FromType.
type Upcaster struct {
FromType string // Name of the source event type
ToType string // Name of the target event type declared at registration
Upcast UpcastFunc // Function that performs the transformation
}
// upcastRegistry manages all registered upcasters and the optional handler
// notified when an upcast fails.
type upcastRegistry struct {
upcasters map[string][]Upcaster // Source type name -> upcasters registered for it, in registration order
// mu guards upcasters and errorHandler.
mu sync.RWMutex
// errorHandler, when non-nil, is called by apply when an upcast function
// returns an error.
errorHandler UpcastErrorHandler
}
// newUpcastRegistry returns an empty registry whose upcaster map is ready
// for use. No error handler is installed.
func newUpcastRegistry() *upcastRegistry {
	reg := new(upcastRegistry)
	reg.upcasters = map[string][]Upcaster{}
	return reg
}
// register validates and stores an upcaster from fromType to toType.
//
// It rejects, in this order: an empty fromType or toType, fromType equal to
// toType, a nil upcast function, and a registration that would close a cycle
// in the upcast graph. On success the upcaster is appended to the list kept
// for fromType and nil is returned.
func (r *upcastRegistry) register(fromType, toType string, upcast UpcastFunc) error {
	// Argument checks need no lock.
	switch {
	case fromType == "" || toType == "":
		return fmt.Errorf("eventbus: upcast types cannot be empty")
	case fromType == toType:
		return fmt.Errorf("eventbus: cannot upcast type to itself")
	case upcast == nil:
		return fmt.Errorf("eventbus: upcast function cannot be nil")
	}

	r.mu.Lock()
	defer r.mu.Unlock()

	// The cycle check and the insertion happen under the same write lock, so
	// two registrations cannot jointly introduce a cycle.
	if r.wouldCreateCycle(fromType, toType) {
		return fmt.Errorf("eventbus: upcast would create circular dependency")
	}

	r.upcasters[fromType] = append(r.upcasters[fromType], Upcaster{
		FromType: fromType,
		ToType:   toType,
		Upcast:   upcast,
	})
	return nil
}
// wouldCreateCycle reports whether adding an upcast fromType -> toType would
// close a cycle, i.e. whether toType already leads back to fromType through
// registered upcasts. It reads r.upcasters without locking.
func (r *upcastRegistry) wouldCreateCycle(fromType, toType string) bool {
	// Search from the new edge's head for a path back to its tail.
	return r.hasCycleDFS(toType, fromType, map[string]bool{})
}
// hasCycleDFS reports whether target is reachable from current by following
// registered upcasts, using a depth-first search. visited records the types
// already expanded so each is explored at most once; it is updated in place.
// A type is considered reachable from itself.
func (r *upcastRegistry) hasCycleDFS(current, target string, visited map[string]bool) bool {
	switch {
	case current == target:
		return true
	case visited[current]:
		// Already explored without success.
		return false
	}
	visited[current] = true

	edges := r.upcasters[current]
	for i := range edges {
		if r.hasCycleDFS(edges[i].ToType, target, visited) {
			return true
		}
	}
	return false
}
// apply runs data through the chain of registered upcasts starting at
// eventType and returns the transformed data with its final type name. When
// no upcaster is registered for eventType, the input is returned unchanged.
//
// At each step the first upcaster registered for the current type is used,
// and the type name returned by the upcast function selects the next step.
//
// On failure apply returns the ORIGINAL data and eventType with an error:
//
//   - an upcast function failed: the error handler, if any, is called with
//     the type and data of the failing step, and the error is wrapped;
//   - the chain would revisit a type it has already passed through. This is
//     checked against both the registered ToType and the type name the
//     function actually returned, so a function that reports an
//     already-seen type (for example its own source type) is rejected
//     instead of being applied forever.
//
// The registry lock is only held for map lookups, never while a user upcast
// function runs: upcast functions may therefore safely call back into the
// registry (e.g. RegisterUpcast) without deadlocking. A registration that
// races with apply may or may not be observed by the in-flight chain.
func (r *upcastRegistry) apply(data json.RawMessage, eventType string) (json.RawMessage, string, error) {
	currentData := data
	currentType := eventType
	// Every type the chain has passed through, including the starting one.
	seen := map[string]bool{eventType: true}

	for {
		// Pick the next upcaster and snapshot the error handler under the
		// read lock; user code runs only after the lock is released.
		r.mu.RLock()
		var upcaster Upcaster
		found := false
		if ups := r.upcasters[currentType]; len(ups) > 0 {
			// The first registered upcaster wins (could be enhanced to
			// choose the best path).
			upcaster = ups[0]
			found = true
		}
		errorHandler := r.errorHandler
		r.mu.RUnlock()

		if !found {
			// End of the chain.
			return currentData, currentType, nil
		}

		// Cheap check before running user code: the declared target was
		// already visited.
		if seen[upcaster.ToType] {
			return data, eventType, fmt.Errorf("eventbus: upcast loop detected")
		}

		newData, newType, err := upcaster.Upcast(currentData)
		if err != nil {
			if errorHandler != nil {
				errorHandler(currentType, currentData, err)
			}
			return data, eventType, fmt.Errorf("eventbus: upcast failed from %s to %s: %w",
				upcaster.FromType, upcaster.ToType, err)
		}

		// The returned name, not the declared ToType, drives the next
		// lookup, so it must be checked too.
		if seen[newType] {
			return data, eventType, fmt.Errorf("eventbus: upcast loop detected")
		}
		seen[newType] = true
		currentData, currentType = newData, newType
	}
}
// clear drops every registered upcaster by swapping in a fresh, empty map.
// The error handler is left in place.
func (r *upcastRegistry) clear() {
	empty := make(map[string][]Upcaster)

	r.mu.Lock()
	r.upcasters = empty
	r.mu.Unlock()
}
// clearType drops every upcaster registered with eventType as its source
// type. Upcasters that merely target eventType are kept. Unknown types are
// ignored.
func (r *upcastRegistry) clearType(eventType string) {
	r.mu.Lock()
	delete(r.upcasters, eventType)
	r.mu.Unlock()
}
// RegisterUpcast registers a type-safe upcast from From to To on bus.
//
// The registration is keyed by the reflect type names of From and To. The
// stored function decodes the raw JSON into a From value, passes it to
// upcast, and re-encodes the result as JSON.
//
// From and To must be concrete types. Interface types are rejected with an
// error: upcasts are keyed by type name, and events are stored under their
// concrete type's name (or TypeNamer name), so an upcast keyed by an
// interface type name could never match a stored event — a silently dead
// registration.
//
// It returns an error when bus or upcast is nil, when either type is an
// interface type, or when the registry rejects the registration.
func RegisterUpcast[From any, To any](bus *EventBus, upcast func(From) To) error {
	switch {
	case bus == nil:
		return fmt.Errorf("eventbus: bus cannot be nil")
	case upcast == nil:
		return fmt.Errorf("eventbus: upcast function cannot be nil")
	}

	// Going through a nil *T yields the static type even for interfaces.
	srcType := reflect.TypeOf((*From)(nil)).Elem()
	dstType := reflect.TypeOf((*To)(nil)).Elem()
	if srcType.Kind() == reflect.Interface || dstType.Kind() == reflect.Interface {
		return fmt.Errorf("eventbus: cannot register upcast between interface types (%s -> %s): events are stored under concrete type names, so an interface-keyed upcast would never match", srcType, dstType)
	}
	srcName, dstName := srcType.String(), dstType.String()

	// convert adapts the typed function to the raw JSON signature.
	convert := func(raw json.RawMessage) (json.RawMessage, string, error) {
		var src From
		if err := json.Unmarshal(raw, &src); err != nil {
			return nil, "", fmt.Errorf("unmarshal source: %w", err)
		}
		encoded, err := json.Marshal(upcast(src))
		if err != nil {
			return nil, "", fmt.Errorf("marshal target: %w", err)
		}
		return encoded, dstName, nil
	}

	return bus.upcastRegistry.register(srcName, dstName, convert)
}
// RegisterUpcastFunc registers a raw, JSON-level upcast function on bus for
// transformations too complex for the typed RegisterUpcast. It returns an
// error when bus is nil or when the registry rejects the registration.
func RegisterUpcastFunc(bus *EventBus, fromType, toType string, upcast UpcastFunc) error {
	if bus != nil {
		return bus.upcastRegistry.register(fromType, toType, upcast)
	}
	return fmt.Errorf("eventbus: bus cannot be nil")
}
// WithUpcast returns a bus option that registers upcast from fromType to
// toType when the bus is created.
//
// The option panics if the registration is invalid (empty types,
// self-upcast, nil function, or a circular dependency): these are
// programming errors that would otherwise be silently ignored at startup.
// Use RegisterUpcastFunc if you need an error value instead.
func WithUpcast(fromType, toType string, upcast UpcastFunc) Option {
	return func(bus *EventBus) {
		err := bus.upcastRegistry.register(fromType, toType, upcast)
		if err == nil {
			return
		}
		panic(fmt.Sprintf("eventbus: WithUpcast(%q, %q): %v", fromType, toType, err))
	}
}
// setErrorHandler installs handler as the upcast error handler, replacing
// any previous one. The write happens under the registry lock, which makes
// it safe to call concurrently with apply.
func (r *upcastRegistry) setErrorHandler(handler UpcastErrorHandler) {
	r.mu.Lock()
	r.errorHandler = handler
	r.mu.Unlock()
}
// WithUpcastErrorHandler returns a bus option that installs handler as the
// handler for upcast failures.
func WithUpcastErrorHandler(handler UpcastErrorHandler) Option {
	return func(bus *EventBus) {
		registry := bus.upcastRegistry
		registry.setErrorHandler(handler)
	}
}
// SetUpcastErrorHandler replaces the upcast error handler at runtime.
// It is safe to call concurrently with replay/publish.
func (bus *EventBus) SetUpcastErrorHandler(handler UpcastErrorHandler) {
	registry := bus.upcastRegistry
	registry.setErrorHandler(handler)
}
// ClearUpcasts removes every upcaster registered on the bus.
func (bus *EventBus) ClearUpcasts() {
	registry := bus.upcastRegistry
	registry.clear()
}
// ClearUpcastsForType removes every upcaster registered on the bus with
// eventType as its source type.
func (bus *EventBus) ClearUpcastsForType(eventType string) {
	registry := bus.upcastRegistry
	registry.clearType(eventType)
}