package eventbus
import (
"context"
"fmt"
"hash/fnv"
"reflect"
"sync"
"sync/atomic"
"time"
)
// Handler is a generic event handler function that receives an event of type T.
// It is the handler signature accepted by Subscribe.
type Handler[T any] func(T)
// ContextHandler is a generic event handler function that receives a
// context.Context followed by an event of type T. It is the handler signature
// accepted by SubscribeContext.
type ContextHandler[T any] func(context.Context, T)
// SubscribeOption configures a subscription by mutating the internalHandler
// created for it. Subscribe and SubscribeContext apply options in the order
// given and return an error when they encounter a nil option.
type SubscribeOption func(*internalHandler)
// internalHandler wraps a subscribed handler together with the metadata the
// bus needs to dispatch events to it.
type internalHandler struct {
// handler is the user-supplied function (a Handler[T] or ContextHandler[T]).
handler any
// handlerType is reflect.TypeOf(handler); it is reported to the PanicHandler
// and used by the reflection fallback when invoking the handler.
handlerType reflect.Type
// eventType is the reflect.Type of T the handler was subscribed for.
eventType reflect.Type
// once, async and sequential are set by the Once, Async and Sequential options.
once bool
async bool
sequential bool
// acceptsContext is true for handlers registered through SubscribeContext.
acceptsContext bool
filter any // Optional predicate set by WithFilter; publish asserts it to func(T) bool
// mu serializes invocations when sequential is set.
mu sync.Mutex
executed uint32 // For once handlers: flipped 0 -> 1 with compare-and-swap by the publish that claims it
}
// PanicHandler is called when a handler panics. It receives the event being
// handled, the reflect.Type of the handler function, and the recovered value.
type PanicHandler func(event any, handlerType reflect.Type, panicValue any)
// PersistenceErrorHandler is called when event persistence fails, either
// while marshaling the event to JSON or while appending it to the store.
// The err argument is wrapped with a message saying which step failed.
type PersistenceErrorHandler func(event any, eventType reflect.Type, err error)
// PublishHook is called when an event is published. Hooks of this type are
// installed as before-publish or after-publish hooks and receive the event's
// dynamic reflect.Type together with the event value.
type PublishHook func(eventType reflect.Type, event any)
// PublishHookContext is called when an event is published (with context).
// It is the context-aware counterpart of PublishHook and receives the
// context in use for the publish call.
type PublishHookContext func(ctx context.Context, eventType reflect.Type, event any)
// numShards is the number of lock shards in an EventBus. It must remain a
// power of two because getShard reduces the hash with the mask numShards-1.
const numShards = 32 // Power of 2 for efficient modulo
// shard represents a single shard with its own mutex. Each shard owns the
// handler lists for the event types that hash to it.
type shard struct {
// mu guards handlers: read-locked while publishing takes a snapshot,
// write-locked when handlers are added or removed.
mu sync.RWMutex
// handlers maps an event type to its subscribed handlers in subscription order.
handlers map[reflect.Type][]*internalHandler
}
// EventBus is an event bus whose handler registry is split across numShards
// independently locked shards, selected by hashing the event type name.
type EventBus struct {
// shards holds the handler registry; New allocates every shard.
shards [numShards]*shard
// panicHandler, when set, is called for a panic recovered from a handler.
panicHandler PanicHandler
// beforePublish and afterPublish are optional hooks run around handler dispatch.
beforePublish PublishHook
afterPublish PublishHook
// beforePublishCtx and afterPublishCtx are the context-aware hook variants.
beforePublishCtx PublishHookContext
afterPublishCtx PublishHookContext
// wg counts in-flight async handler goroutines; Wait blocks on it.
wg sync.WaitGroup
// Optional persistence fields (nil if not using persistence)
store EventStore
subscriptionStore SubscriptionStore
// lastOffset is the offset returned by the most recent successful Append.
// It is guarded by storeMu, which is also held for the duration of Append.
lastOffset Offset
storeMu sync.RWMutex
persistenceErrorHandler PersistenceErrorHandler
// persistenceTimeout, when positive, bounds each persistence call.
persistenceTimeout time.Duration
replayBatchSize int // Batch size for Replay (default: 100)
// Upcast registry for event migration
upcastRegistry *upcastRegistry
// Optional observability (metrics & tracing)
observability Observability
}
// TypeNamer is an optional interface that events can implement to provide
// their own type name. This gives explicit control over event type naming,
// which is useful for:
//   - Stable type names across package refactoring
//   - Custom versioning schemes (e.g., "UserCreated.v2")
//   - Compatibility with external event stores
//
// If an event implements TypeNamer, EventType() will use the provided name
// instead of the reflection-based name (reflect.Type.String(), e.g.
// "mypackage.MyEvent").
//
// Example:
//
//	type UserCreatedEvent struct {
//		UserID string
//	}
//
//	func (e UserCreatedEvent) EventTypeName() string {
//		return "user.created.v1"
//	}
type TypeNamer interface {
EventTypeName() string
}
// EventType returns the type name used to identify an event.
//
// If the event implements TypeNamer, the custom name is returned. Otherwise
// the name comes from reflect.Type.String(), which is the short,
// package-name-qualified form (for example "mypackage.MyEvent" or
// "*mypackage.MyEvent"), not the full import path.
//
// A nil event has no dynamic type; EventType returns "" for it instead of
// panicking.
//
// This is useful for comparing with StoredEvent.Type during replay.
//
// Example with reflection (default):
//
//	eventType := EventType(MyEvent{})
//	// Returns: "mypackage.MyEvent"
//
// Example with TypeNamer:
//
//	type MyEvent struct{}
//	func (e MyEvent) EventTypeName() string { return "my-event.v1" }
//	eventType := EventType(MyEvent{})
//	// Returns: "my-event.v1"
//
// Usage in replay:
//
//	bus.Replay(ctx, "", func(event *StoredEvent) error {
//		if event.Type == EventType(MyEvent{}) {
//			// Process MyEvent
//		}
//		return nil
//	})
func EventType(event any) string {
	// An explicit name always wins over the reflection-based one.
	if namer, ok := event.(TypeNamer); ok {
		return namer.EventTypeName()
	}

	// reflect.TypeOf returns a nil Type for a nil interface value; calling
	// String on it would panic.
	t := reflect.TypeOf(event)
	if t == nil {
		return ""
	}
	return t.String()
}
// Observability is an optional interface for metrics and tracing.
// Implementations can track event publishing, handler execution, and errors.
//
// Every call site checks for a nil Observability first, so a bus without
// one configured skips these calls entirely.
//
// The context returned from each method can be used to propagate trace
// spans and other context-specific data through the event processing pipeline.
//
// Example implementation: see github.com/jilio/ebu/otel package for
// OpenTelemetry integration.
type Observability interface {
// OnPublishStart is called when an event is about to be published.
// Returns a context that will be passed to handlers and subsequent hooks.
// The event parameter allows implementations to extract custom attributes.
OnPublishStart(ctx context.Context, eventType string, event any) context.Context
// OnPublishComplete is called after all synchronous handlers complete.
// Note: This is called before async handlers complete.
OnPublishComplete(ctx context.Context, eventType string)
// OnHandlerStart is called before a handler executes.
// Returns a context for the handler execution.
// The async flag reports whether the handler runs in its own goroutine.
OnHandlerStart(ctx context.Context, eventType string, async bool) context.Context
// OnHandlerComplete is called after a handler completes.
// The error parameter is non-nil if the handler panicked.
OnHandlerComplete(ctx context.Context, duration time.Duration, err error)
// OnPersistStart is called before persisting an event.
// The bus currently always passes 0 as position.
OnPersistStart(ctx context.Context, eventType string, position int64) context.Context
// OnPersistComplete is called after persisting an event.
// The error parameter is the error returned by the store's Append, if any.
OnPersistComplete(ctx context.Context, duration time.Duration, err error)
}
// Option is a function that configures the EventBus. Options are passed to
// New and applied in the order given.
type Option func(*EventBus)
// New creates a new EventBus with every shard allocated and an empty upcast
// registry, then applies opts in the order given.
//
// Nil options are skipped rather than invoked, so a conditionally built
// option list cannot panic the constructor.
func New(opts ...Option) *EventBus {
	bus := &EventBus{
		upcastRegistry: newUpcastRegistry(),
	}

	// Every shard needs its own map before any Subscribe/Publish call.
	for i := range bus.shards {
		bus.shards[i] = &shard{
			handlers: make(map[reflect.Type][]*internalHandler),
		}
	}

	for _, opt := range opts {
		if opt == nil {
			continue
		}
		opt(bus)
	}

	return bus
}
// getShard picks the shard responsible for eventType. The type's string form
// is hashed with 32-bit FNV-1a and reduced with a bit mask, which is
// equivalent to a modulo because numShards is a power of two.
func (bus *EventBus) getShard(eventType reflect.Type) *shard {
	hasher := fnv.New32a()
	hasher.Write([]byte(eventType.String()))
	return bus.shards[hasher.Sum32()&(numShards-1)]
}
// Subscribe registers handler for events of type T and appends it to the
// handler list of the shard that owns T.
//
// It returns an error when bus or handler is nil, or when a nil option is
// encountered; options before the nil one have already been applied to the
// (discarded) subscription at that point.
func Subscribe[T any](bus *EventBus, handler Handler[T], opts ...SubscribeOption) error {
	switch {
	case bus == nil:
		return fmt.Errorf("eventbus: bus cannot be nil")
	case handler == nil:
		return fmt.Errorf("eventbus: handler cannot be nil")
	}

	var typed *T
	key := reflect.TypeOf(typed).Elem()

	entry := &internalHandler{
		handler:     handler,
		handlerType: reflect.TypeOf(handler),
		eventType:   key,
	}
	for i := range opts {
		if opts[i] == nil {
			return fmt.Errorf("eventbus: subscribe option cannot be nil")
		}
		opts[i](entry)
	}

	target := bus.getShard(key)
	target.mu.Lock()
	target.handlers[key] = append(target.handlers[key], entry)
	target.mu.Unlock()

	return nil
}
// SubscribeContext registers a context-aware handler for events of type T.
// It mirrors Subscribe, except that the stored subscription is flagged as
// accepting a context.
//
// It returns an error when bus or handler is nil, or when a nil option is
// encountered.
func SubscribeContext[T any](bus *EventBus, handler ContextHandler[T], opts ...SubscribeOption) error {
	if bus == nil {
		return fmt.Errorf("eventbus: bus cannot be nil")
	}
	if handler == nil {
		return fmt.Errorf("eventbus: handler cannot be nil")
	}

	key := reflect.TypeOf((*T)(nil)).Elem()

	sub := new(internalHandler)
	sub.handler = handler
	sub.handlerType = reflect.TypeOf(handler)
	sub.eventType = key
	sub.acceptsContext = true

	for _, apply := range opts {
		if apply == nil {
			return fmt.Errorf("eventbus: subscribe option cannot be nil")
		}
		apply(sub)
	}

	owner := bus.getShard(key)
	owner.mu.Lock()
	owner.handlers[key] = append(owner.handlers[key], sub)
	owner.mu.Unlock()

	return nil
}
// Unsubscribe removes the first handler registered for events of type T
// whose function pointer equals that of handler.
//
// Handlers are matched by the code pointer reported by reflect, so two
// closures created from the same function literal may be indistinguishable;
// pass named functions or method values when precise removal matters.
//
// It returns an error when bus is nil, when handler is not a non-nil
// function, or when no matching handler is registered.
func Unsubscribe[T any, H any](bus *EventBus, handler H) error {
	if bus == nil {
		return fmt.Errorf("eventbus: bus cannot be nil")
	}

	// reflect.Value.Pointer panics for kinds such as int or struct and for
	// the zero Value, so validate before asking for the pointer.
	target := reflect.ValueOf(handler)
	if !target.IsValid() || target.Kind() != reflect.Func || target.IsNil() {
		return fmt.Errorf("eventbus: handler must be a non-nil function")
	}
	handlerPtr := target.Pointer()

	eventType := reflect.TypeOf((*T)(nil)).Elem()
	s := bus.getShard(eventType)

	s.mu.Lock()
	defer s.mu.Unlock()

	handlers := s.handlers[eventType]
	for i, h := range handlers {
		if reflect.ValueOf(h.handler).Pointer() != handlerPtr {
			continue
		}

		// Shift the tail down and clear the vacated slot so the removed
		// handler is not kept alive by the backing array.
		last := len(handlers) - 1
		copy(handlers[i:], handlers[i+1:])
		handlers[last] = nil

		if last == 0 {
			delete(s.handlers, eventType)
		} else {
			s.handlers[eventType] = handlers[:last]
		}
		return nil
	}

	return fmt.Errorf("handler not found")
}
// Publish delivers event to every handler registered for its type. It is
// shorthand for PublishContext with context.Background().
func Publish[T any](bus *EventBus, event T) {
	background := context.Background()
	PublishContext[T](bus, background, event)
}
// PublishContext publishes an event with context to all registered handlers.
//
// The sequence is: observability start, before-publish hooks, handler
// dispatch, removal of consumed once-handlers, after-publish hooks,
// observability complete. Handlers are looked up by the event's dynamic type
// and invoked from a snapshot, so no shard lock is held while they run.
//
// Synchronous handlers run inline in subscription order. Async handlers run
// in their own goroutines and are NOT waited for here; call bus.Wait (or
// Shutdown) to wait for them.
//
// Handlers are skipped once ctx is done. The cancellation check happens
// before a once-handler is claimed, so a once-handler skipped because of a
// cancelled context stays registered for a later publish. An async
// once-handler can still be consumed without running if ctx is cancelled
// between dispatch and the start of its goroutine.
func PublishContext[T any](bus *EventBus, ctx context.Context, event T) {
	eventType := reflect.TypeOf(event)
	eventTypeName := EventType(event)

	// Observability: Track publish start
	if bus.observability != nil {
		ctx = bus.observability.OnPublishStart(ctx, eventTypeName, event)
	}

	// Call before publish hooks
	if bus.beforePublish != nil {
		bus.beforePublish(eventType, event)
	}
	if bus.beforePublishCtx != nil {
		bus.beforePublishCtx(ctx, eventType, event)
	}

	// Snapshot the handlers so the lock is not held during execution.
	s := bus.getShard(eventType)
	s.mu.RLock()
	registered := s.handlers[eventType]
	handlersCopy := make([]*internalHandler, len(registered))
	copy(handlersCopy, registered)
	s.mu.RUnlock()

	var onceHandlersToRemove []*internalHandler

	for _, h := range handlersCopy {
		// A filter of a different signature is ignored, as before.
		if h.filter != nil {
			if filterFunc, ok := h.filter.(func(T) bool); ok && !filterFunc(event) {
				continue // Event doesn't match this handler's filter
			}
		}

		// Check cancellation BEFORE claiming a once-handler; otherwise the
		// handler would be marked executed and removed without ever running.
		if ctx.Err() != nil {
			continue
		}

		// For once handlers, compare-and-swap ensures a single publish wins.
		if h.once {
			if !atomic.CompareAndSwapUint32(&h.executed, 0, 1) {
				continue // Already executed
			}
			onceHandlersToRemove = append(onceHandlersToRemove, h)
		}

		if h.async {
			bus.wg.Add(1)
			go func(handler *internalHandler) {
				defer bus.wg.Done()
				// The context may have been cancelled while the goroutine
				// was waiting to be scheduled.
				if ctx.Err() != nil {
					return
				}
				callHandlerWithContext(handler, ctx, event, bus.panicHandler, bus.observability, eventTypeName, true)
			}(h)
			continue
		}

		callHandlerWithContext(h, ctx, event, bus.panicHandler, bus.observability, eventTypeName, false)
	}

	// Remove once handlers that were claimed by this publish.
	if len(onceHandlersToRemove) > 0 {
		s.mu.Lock()
		remaining := s.handlers[eventType]
		for _, spent := range onceHandlersToRemove {
			for i, h := range remaining {
				if h == spent {
					remaining = append(remaining[:i], remaining[i+1:]...)
					break
				}
			}
		}
		s.handlers[eventType] = remaining
		s.mu.Unlock()
	}

	// Call after publish hooks
	if bus.afterPublish != nil {
		bus.afterPublish(eventType, event)
	}
	if bus.afterPublishCtx != nil {
		bus.afterPublishCtx(ctx, eventType, event)
	}

	// Observability: Track publish complete (sync handlers done)
	if bus.observability != nil {
		bus.observability.OnPublishComplete(ctx, eventTypeName)
	}
}
// Clear drops every handler registered for events of type T by deleting the
// type's entry from its shard.
func Clear[T any](bus *EventBus) {
	var typed *T
	key := reflect.TypeOf(typed).Elem()

	owner := bus.getShard(key)
	owner.mu.Lock()
	delete(owner.handlers, key)
	owner.mu.Unlock()
}
// ClearAll drops every handler for every event type by replacing each
// shard's handler map with a fresh one. Shards are locked one at a time.
func ClearAll(bus *EventBus) {
	for i := range bus.shards {
		current := bus.shards[i]
		current.mu.Lock()
		current.handlers = make(map[reflect.Type][]*internalHandler)
		current.mu.Unlock()
	}
}
// HasHandlers reports whether at least one handler is currently registered
// for events of type T.
func HasHandlers[T any](bus *EventBus) bool {
	var typed *T
	key := reflect.TypeOf(typed).Elem()

	owner := bus.getShard(key)
	owner.mu.RLock()
	defer owner.mu.RUnlock()

	registered := owner.handlers[key]
	return len(registered) > 0
}
// HandlerCount returns how many handlers are currently registered for events
// of type T.
func HandlerCount[T any](bus *EventBus) int {
	var typed *T
	key := reflect.TypeOf(typed).Elem()

	owner := bus.getShard(key)
	owner.mu.RLock()
	defer owner.mu.RUnlock()

	registered := owner.handlers[key]
	return len(registered)
}
// Wait blocks until every async handler goroutine started by a publish has
// finished.
func (bus *EventBus) Wait() {
	pending := &bus.wg
	pending.Wait()
}
// callHandlerWithContext invokes a single handler for event, recovering from
// any panic it raises.
//
// A recovered panic is forwarded to panicHandler (when set) and reported to
// obs.OnHandlerComplete as an error. When obs is set, the context returned by
// OnHandlerStart is the one passed to the handler and to OnHandlerComplete.
// Handlers marked sequential are serialized through the handler's own mutex;
// the measured duration starts before that lock is acquired.
//
// Known function shapes are dispatched through a type switch; anything else
// falls back to reflection and is called with (event) or (ctx, event)
// depending on its parameter count.
func callHandlerWithContext[T any](h *internalHandler, ctx context.Context, event T, panicHandler PanicHandler, obs Observability, eventTypeName string, async bool) {
	var (
		began     = time.Now()
		recovered error
	)

	defer func() {
		elapsed := time.Since(began)

		if cause := recover(); cause != nil {
			recovered = fmt.Errorf("handler panic: %v", cause)
			if panicHandler != nil {
				panicHandler(event, h.handlerType, cause)
			}
		}

		// Observability: Track handler complete
		if obs != nil {
			obs.OnHandlerComplete(ctx, elapsed, recovered)
		}
	}()

	// Observability: Track handler start
	if obs != nil {
		ctx = obs.OnHandlerStart(ctx, eventTypeName, async)
	}

	// The unlock is deferred after the recover closure, so it runs first and
	// the mutex is already released when panicHandler is called.
	if h.sequential {
		h.mu.Lock()
		defer h.mu.Unlock()
	}

	// Fast path: direct calls for the common handler shapes.
	switch fn := h.handler.(type) {
	case Handler[T]:
		fn(event)
		return
	case ContextHandler[T]:
		fn(ctx, event)
		return
	case func(T):
		fn(event)
		return
	case func(context.Context, T):
		fn(ctx, event)
		return
	case func(any):
		fn(event)
		return
	case func(context.Context, any):
		fn(ctx, event)
		return
	case func(context.Context, any) error:
		_ = fn(ctx, event)
		return
	}

	// Slow path: reflection for any other function type.
	callee := reflect.ValueOf(h.handler)
	arg := reflect.ValueOf(event)
	if h.handlerType.Kind() != reflect.Func {
		return
	}
	switch h.handlerType.NumIn() {
	case 1:
		callee.Call([]reflect.Value{arg})
	case 2:
		callee.Call([]reflect.Value{reflect.ValueOf(ctx), arg})
	}
}
// Subscribe Options
// Once returns an option that marks the subscription as one-shot: the
// handler is claimed by a single publish and then removed.
func Once() SubscribeOption {
	return SubscribeOption(func(sub *internalHandler) {
		sub.once = true
	})
}
// Async returns an option that makes the handler run in its own goroutine
// instead of inline with the publisher.
func Async() SubscribeOption {
	return SubscribeOption(func(sub *internalHandler) {
		sub.async = true
	})
}
// Sequential returns an option that serializes invocations of the handler
// through a per-handler mutex, so it never runs concurrently with itself.
func Sequential() SubscribeOption {
	return SubscribeOption(func(sub *internalHandler) {
		sub.sequential = true
	})
}
// WithFilter configures the handler to only receive events that match the
// predicate.
//
// A nil predicate leaves the subscription unfiltered. Storing it would put a
// typed nil function into the filter field, which publish would then call
// and panic on.
//
// The predicate's type parameter should match the subscribed event type;
// publish ignores a filter whose signature is not func(T) bool for the
// published T.
func WithFilter[T any](predicate func(T) bool) SubscribeOption {
	return func(h *internalHandler) {
		if predicate == nil {
			return
		}
		h.filter = predicate
	}
}
// Bus Options
// WithPanicHandler returns an option that installs handler as the bus's
// panic handler, called when a subscribed handler panics.
func WithPanicHandler(handler PanicHandler) Option {
	return Option(func(target *EventBus) {
		target.panicHandler = handler
	})
}
// WithBeforePublish returns an option that installs hook to run before
// handlers are dispatched on every publish. It replaces any previously set
// before-publish hook.
func WithBeforePublish(hook PublishHook) Option {
	return Option(func(target *EventBus) {
		target.beforePublish = hook
	})
}
// WithAfterPublish returns an option that installs hook to run after the
// synchronous handlers of every publish. It replaces any previously set
// after-publish hook.
func WithAfterPublish(hook PublishHook) Option {
	return Option(func(target *EventBus) {
		target.afterPublish = hook
	})
}
// WithBeforePublishContext returns an option that installs a context-aware
// hook to run before handlers are dispatched. It overwrites whatever
// context-aware before-publish hook is currently set on the bus.
func WithBeforePublishContext(hook PublishHookContext) Option {
	return Option(func(target *EventBus) {
		target.beforePublishCtx = hook
	})
}
// WithAfterPublishContext returns an option that installs a context-aware
// hook to run after the synchronous handlers of every publish.
func WithAfterPublishContext(hook PublishHookContext) Option {
	return Option(func(target *EventBus) {
		target.afterPublishCtx = hook
	})
}
// WithPersistenceErrorHandler returns an option that installs handler to be
// notified when marshaling or storing an event fails.
func WithPersistenceErrorHandler(handler PersistenceErrorHandler) Option {
	return Option(func(target *EventBus) {
		target.persistenceErrorHandler = handler
	})
}
// WithPersistenceTimeout returns an option that bounds each persistence
// operation with the given timeout. Values of zero or less leave persistence
// calls without a bus-imposed deadline.
func WithPersistenceTimeout(timeout time.Duration) Option {
	return Option(func(target *EventBus) {
		target.persistenceTimeout = timeout
	})
}
// WithReplayBatchSize returns an option that sets how many events Replay
// requests per Read call when the store does not implement
// EventStoreStreamer. Replay falls back to 100 when the size is zero or
// negative.
func WithReplayBatchSize(size int) Option {
	return Option(func(target *EventBus) {
		target.replayBatchSize = size
	})
}
// WithObservability returns an option that installs obs as the bus's metrics
// and tracing implementation.
func WithObservability(obs Observability) Option {
	return Option(func(target *EventBus) {
		target.observability = obs
	})
}
// Shutdown waits for in-flight async handlers to finish, then closes the
// store if it has a Close() error method.
//
// If ctx is done first, Shutdown returns ctx.Err() immediately; in that case
// the store is not closed, and the goroutine waiting on the handlers keeps
// running until they complete.
func (bus *EventBus) Shutdown(ctx context.Context) error {
	drained := make(chan struct{})
	go func() {
		bus.Wait()
		close(drained)
	}()

	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-drained:
	}

	// A nil store fails the assertion, so no separate nil check is needed.
	closer, closable := bus.store.(interface{ Close() error })
	if !closable {
		return nil
	}
	if err := closer.Close(); err != nil {
		return fmt.Errorf("failed to close store: %w", err)
	}
	return nil
}
// Backward compatibility methods
// SetPanicHandler replaces the bus's panic handler (kept for backward
// compatibility). The field is written without synchronization.
func (bus *EventBus) SetPanicHandler(handler PanicHandler) {
	WithPanicHandler(handler)(bus)
}
// SetBeforePublishHook replaces the bus's before-publish hook (kept for
// backward compatibility). The field is written without synchronization.
func (bus *EventBus) SetBeforePublishHook(hook PublishHook) {
	WithBeforePublish(hook)(bus)
}
// SetAfterPublishHook replaces the bus's after-publish hook (kept for
// backward compatibility). The field is written without synchronization.
func (bus *EventBus) SetAfterPublishHook(hook PublishHook) {
	WithAfterPublish(hook)(bus)
}
// SetPersistenceErrorHandler replaces the persistence error handler at
// runtime. The field is written without synchronization.
func (bus *EventBus) SetPersistenceErrorHandler(handler PersistenceErrorHandler) {
	WithPersistenceErrorHandler(handler)(bus)
}
package eventbus
import (
"context"
"encoding/json"
"fmt"
"iter"
"reflect"
"sync"
"time"
)
// Offset represents an opaque position in an event stream.
// Implementations define the format (e.g., "123", "abc_456", timestamp-based).
// Offsets are lexicographically comparable within the same store; MemoryStore
// zero-pads its numeric offsets so that string order matches append order.
type Offset string
// Sentinel offsets understood by stores in addition to the concrete offsets
// they hand out.
const (
// OffsetOldest represents the beginning of the stream.
// When passed to Read, returns events from the start.
OffsetOldest Offset = ""
// OffsetNewest represents the current end of the stream.
// Useful for subscribing to only new events.
OffsetNewest Offset = "$"
)
// EventStore defines the core interface for persisting events.
// This is a minimal interface with just 2 methods for basic event storage.
// Additional capabilities are provided through optional interfaces
// (EventStoreStreamer, EventStoreSubscriber, SubscriptionStore).
type EventStore interface {
// Append stores an event and returns its assigned offset.
// The store is responsible for generating unique, monotonically increasing offsets.
Append(ctx context.Context, event *Event) (Offset, error)
// Read returns events starting after the given offset.
// Use OffsetOldest to read from the beginning.
// The limit parameter controls max events returned (0 = no limit).
// Returns the events, the offset to use for the next read, and any error.
// Replay stops when Read returns no events and fails if a non-empty batch
// comes back with a next offset equal to the one that was requested.
Read(ctx context.Context, from Offset, limit int) ([]*StoredEvent, Offset, error)
}
// EventStoreStreamer is an optional interface for memory-efficient streaming.
// When implemented, the Replay method will automatically use streaming
// instead of batched Read calls.
//
// Implementation notes:
//   - Database-backed stores should use cursor-based iteration to minimize memory
//   - In-memory stores may need to take a snapshot to avoid holding locks during iteration,
//     trading memory for deadlock safety (see MemoryStore.ReadStream for an example)
type EventStoreStreamer interface {
// ReadStream returns an iterator yielding events starting after the given offset.
// Use OffsetOldest to read from the beginning.
// The iterator checks ctx.Done() before each yield and returns ctx.Err() when cancelled.
// A yielded error terminates iteration.
ReadStream(ctx context.Context, from Offset) iter.Seq2[*StoredEvent, error]
}
// EventStoreSubscriber is an optional interface for stores that support live subscriptions.
// This enables real-time event streaming for remote stores.
type EventStoreSubscriber interface {
// Subscribe starts a live subscription from the given offset.
// Returns a channel that yields events as they arrive, a cancel function,
// and an error if the subscription could not be started.
// Call the returned cancel function to stop the subscription and close the channel.
Subscribe(ctx context.Context, from Offset) (<-chan *StoredEvent, func(), error)
}
// SubscriptionStore tracks subscription progress separately from event storage.
// This interface is optional and enables resumable subscriptions
// (see SubscribeWithReplay).
type SubscriptionStore interface {
// SaveOffset persists the current offset for a subscription.
SaveOffset(ctx context.Context, subscriptionID string, offset Offset) error
// LoadOffset retrieves the last saved offset for a subscription.
// Returns OffsetOldest if the subscription has no saved offset.
LoadOffset(ctx context.Context, subscriptionID string) (Offset, error)
}
// Event represents an event to be stored (before it has an offset).
type Event struct {
// Type is the event type name as produced by EventType.
Type string `json:"type"`
// Data is the JSON encoding of the event value.
Data json.RawMessage `json:"data"`
// Timestamp is the time at which the bus built this record for persistence.
Timestamp time.Time `json:"timestamp"`
}
// StoredEvent represents an event that has been persisted with an offset.
// It carries the same fields as Event plus the store-assigned position.
type StoredEvent struct {
// Offset is the position assigned by the store on Append.
Offset Offset `json:"offset"`
Type string `json:"type"`
Data json.RawMessage `json:"data"`
Timestamp time.Time `json:"timestamp"`
}
// WithStore returns an option that enables persistence through store.
//
// Persistence is wired in as a context-aware before-publish hook: the hook
// that is set when this option runs is called first, then the event is
// persisted. Because the hook slot is shared, an option that assigns the
// context-aware before-publish hook AFTER WithStore (such as
// WithBeforePublishContext) replaces this chain and disables persistence;
// apply WithStore last.
func WithStore(store EventStore) Option {
	return func(bus *EventBus) {
		bus.store = store

		previous := bus.beforePublishCtx
		persistAfterPrevious := func(ctx context.Context, eventType reflect.Type, event any) {
			if previous != nil {
				previous(ctx, eventType, event)
			}
			bus.persistEvent(ctx, eventType, event)
		}
		bus.beforePublishCtx = persistAfterPrevious
	}
}
// WithSubscriptionStore returns an option that sets the store used to track
// subscription offsets. SubscribeWithReplay prefers it over an event store
// that also implements SubscriptionStore.
func WithSubscriptionStore(store SubscriptionStore) Option {
	return Option(func(target *EventBus) {
		target.subscriptionStore = store
	})
}
// persistEvent JSON-encodes event and appends it to the configured store.
// It is a no-op when no store is configured.
//
// Failures (marshal or append) are reported to the persistence error handler
// when one is set and are otherwise dropped; they never stop the publish.
// The store mutex is held across Append so that lastOffset always reflects
// the most recent successful append.
func (bus *EventBus) persistEvent(ctx context.Context, eventType reflect.Type, event any) {
	if bus.store == nil {
		return
	}

	payload, marshalErr := json.Marshal(event)
	if marshalErr != nil {
		if bus.persistenceErrorHandler != nil {
			bus.persistenceErrorHandler(event, eventType, fmt.Errorf("failed to marshal event: %w", marshalErr))
		}
		return
	}

	// EventType honors TypeNamer, so custom names end up in the store.
	name := EventType(event)
	record := &Event{
		Type:      name,
		Data:      payload,
		Timestamp: time.Now(),
	}

	if bus.persistenceTimeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, bus.persistenceTimeout)
		defer cancel()
	}

	if bus.observability != nil {
		ctx = bus.observability.OnPersistStart(ctx, name, 0)
	}
	began := time.Now()

	bus.storeMu.Lock()
	assigned, appendErr := bus.store.Append(ctx, record)
	if appendErr == nil {
		bus.lastOffset = assigned
	}
	bus.storeMu.Unlock()

	if bus.observability != nil {
		bus.observability.OnPersistComplete(ctx, time.Since(began), appendErr)
	}

	if appendErr != nil && bus.persistenceErrorHandler != nil {
		bus.persistenceErrorHandler(event, eventType, fmt.Errorf("failed to save event: %w", appendErr))
	}
}
// Replay feeds every stored event after the from offset to handler, in store
// order, and stops at the first error.
//
// Stores implementing EventStoreStreamer are iterated through ReadStream.
// All others are read in batches of the configured replay batch size (100 by
// default), with the context checked before each batch. A store that returns
// a non-empty batch without advancing the offset is reported as an error
// rather than looped on forever.
func (bus *EventBus) Replay(ctx context.Context, from Offset, handler func(*StoredEvent) error) error {
	if bus.store == nil {
		return fmt.Errorf("replay requires persistence (use WithStore option)")
	}

	// Streaming path.
	if streamer, ok := bus.store.(EventStoreStreamer); ok {
		for stored, streamErr := range streamer.ReadStream(ctx, from) {
			if streamErr != nil {
				return fmt.Errorf("stream events: %w", streamErr)
			}
			if handleErr := handler(stored); handleErr != nil {
				return fmt.Errorf("handle event at offset %s: %w", stored.Offset, handleErr)
			}
		}
		return nil
	}

	// Batched path.
	limit := bus.replayBatchSize
	if limit <= 0 {
		limit = 100
	}

	for cursor := from; ; {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		batch, next, readErr := bus.store.Read(ctx, cursor, limit)
		if readErr != nil {
			return fmt.Errorf("read events: %w", readErr)
		}
		if len(batch) == 0 {
			return nil
		}

		for _, stored := range batch {
			if handleErr := handler(stored); handleErr != nil {
				return fmt.Errorf("handle event at offset %s: %w", stored.Offset, handleErr)
			}
		}

		if next == cursor {
			return fmt.Errorf("store returned non-advancing offset %s: possible bug in EventStore.Read implementation", cursor)
		}
		cursor = next
	}
}
// ReplayWithUpcast works like Replay but runs each event through the upcast
// registry first. When upcasting succeeds, handler receives a new StoredEvent
// carrying the upcasted type and data with the original offset and
// timestamp. When there is no registry or the upcast fails, handler receives
// the original event unchanged.
func (bus *EventBus) ReplayWithUpcast(ctx context.Context, from Offset, handler func(*StoredEvent) error) error {
	upcasting := func(stored *StoredEvent) error {
		if bus.upcastRegistry == nil {
			return handler(stored)
		}

		data, typeName, err := bus.upcastRegistry.apply(stored.Data, stored.Type)
		if err != nil {
			// Best effort: fall back to the event as stored.
			return handler(stored)
		}

		return handler(&StoredEvent{
			Offset:    stored.Offset,
			Type:      typeName,
			Data:      data,
			Timestamp: stored.Timestamp,
		})
	}

	return bus.Replay(ctx, from, upcasting)
}
// IsPersistent reports whether an event store is configured on the bus.
func (bus *EventBus) IsPersistent() bool {
	configured := bus.store != nil
	return configured
}
// GetStore returns the configured event store, or nil when the bus has no
// persistence.
func (bus *EventBus) GetStore() EventStore {
	configured := bus.store
	return configured
}
// SubscribeWithReplay subscribes and replays missed events.
// Requires both an EventStore (for replay) and a SubscriptionStore (for tracking).
// If the store implements SubscriptionStore, it will be used automatically.
//
// Stored events are matched against the name EventType would produce for T:
// the TypeNamer name when T implements TypeNamer (evaluated on a zero value
// of T, or on a freshly allocated value when T is a pointer type), otherwise
// the reflection-based name. This is the same naming used when events are
// persisted.
//
// Errors are returned when bus or handler is nil, when no store or
// subscription store is available, when the saved offset cannot be loaded,
// when a replayed event cannot be decoded, or when the offset of a replayed
// event cannot be saved. Offset-save failures in the live handler are
// ignored because it has no way to report them.
//
// Context usage:
//   - The context is used for the replay phase (loading historical events)
//   - The context is used for saving subscription offsets, including those
//     saved later by the live handler
//   - The context is NOT passed to the live subscription handler (which follows the bus's lifecycle)
//
// Limitations: events published between the end of the replay and the
// registration of the live handler are not delivered to handler. The live
// handler records the bus's most recently persisted offset, which under
// concurrent publishing may belong to a later event than the one handled.
//
// Note: If the saved offset is OffsetOldest (""), replay starts from the beginning.
// The OffsetNewest ("$") constant is typically not stored and is only used for live subscriptions.
func SubscribeWithReplay[T any](
	ctx context.Context,
	bus *EventBus,
	subscriptionID string,
	handler Handler[T],
	opts ...SubscribeOption,
) error {
	if bus == nil {
		return fmt.Errorf("eventbus: bus cannot be nil")
	}
	if handler == nil {
		// Subscribe only sees the wrapper below, which is never nil.
		return fmt.Errorf("eventbus: handler cannot be nil")
	}
	if bus.store == nil {
		return fmt.Errorf("SubscribeWithReplay requires persistence (use WithStore option)")
	}

	// Get subscription store - either explicit or from the event store
	subStore := bus.subscriptionStore
	if subStore == nil {
		ss, ok := bus.store.(SubscriptionStore)
		if !ok {
			return fmt.Errorf("SubscribeWithReplay requires a SubscriptionStore (use WithSubscriptionStore option or use a store that implements SubscriptionStore)")
		}
		subStore = ss
	}

	// A failed load must not silently restart the subscription from the
	// beginning of the stream.
	lastOffset, err := subStore.LoadOffset(ctx, subscriptionID)
	if err != nil {
		return fmt.Errorf("load subscription offset: %w", err)
	}

	// Resolve the stored type name the same way persistence does.
	eventType := reflect.TypeOf((*T)(nil)).Elem()
	typeName := eventType.String()
	var probe any
	if eventType.Kind() == reflect.Pointer {
		// Avoid calling EventTypeName on a nil pointer.
		probe = reflect.New(eventType.Elem()).Interface()
	} else {
		var zero T
		probe = zero
	}
	if namer, ok := probe.(TypeNamer); ok {
		typeName = namer.EventTypeName()
	}

	// Replay missed events
	err = bus.Replay(ctx, lastOffset, func(stored *StoredEvent) error {
		eventData, eventTypeName := stored.Data, stored.Type
		if bus.upcastRegistry != nil {
			// Continue with the original event if the upcast fails; the
			// type check below decides whether it is still relevant.
			if upcastedData, upcastedType, upcastErr := bus.upcastRegistry.apply(eventData, eventTypeName); upcastErr == nil {
				eventData, eventTypeName = upcastedData, upcastedType
			}
		}

		// Only replay events of the correct type
		if eventTypeName != typeName {
			return nil
		}

		var event T
		if err := json.Unmarshal(eventData, &event); err != nil {
			return err
		}
		handler(event)

		if err := subStore.SaveOffset(ctx, subscriptionID, stored.Offset); err != nil {
			return fmt.Errorf("save subscription offset: %w", err)
		}
		return nil
	})
	if err != nil {
		return fmt.Errorf("replay events: %w", err)
	}

	// Subscribe for future events with offset tracking
	wrappedHandler := func(event T) {
		handler(event)

		bus.storeMu.RLock()
		offset := bus.lastOffset
		bus.storeMu.RUnlock()

		// Best effort: a handler cannot return an error to the publisher.
		_ = subStore.SaveOffset(ctx, subscriptionID, offset)
	}
	return Subscribe[T](bus, wrappedHandler, opts...)
}
// MemoryStore is a simple in-memory implementation of EventStore,
// EventStoreStreamer and SubscriptionStore. Use NewMemoryStore to create one;
// the zero value has a nil subscriptions map.
type MemoryStore struct {
// events holds the stored events in append order.
events []*StoredEvent
// subscriptions maps a subscription ID to its last saved offset.
subscriptions map[string]Offset
// nextOffset is the counter the most recently assigned offset was formatted from.
nextOffset int64
// mu guards all fields above.
mu sync.RWMutex
}
// Compile-time checks that *MemoryStore satisfies the EventStore,
// EventStoreStreamer and SubscriptionStore interfaces.
var _ EventStore = (*MemoryStore)(nil)
var _ EventStoreStreamer = (*MemoryStore)(nil)
var _ SubscriptionStore = (*MemoryStore)(nil)
// NewMemoryStore returns an empty in-memory store with its event list and
// subscription map initialized.
func NewMemoryStore() *MemoryStore {
	store := new(MemoryStore)
	store.events = make([]*StoredEvent, 0)
	store.subscriptions = make(map[string]Offset)
	return store
}
// Append implements EventStore. It assigns the next sequential offset,
// formatted as a 20-digit zero-padded decimal so that string comparison
// matches numeric order, stores a copy of the event's fields and returns the
// offset. The context is not consulted and the error is always nil.
func (m *MemoryStore) Append(ctx context.Context, event *Event) (Offset, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	m.nextOffset++
	assigned := Offset(fmt.Sprintf("%020d", m.nextOffset))

	m.events = append(m.events, &StoredEvent{
		Offset:    assigned,
		Type:      event.Type,
		Data:      event.Data,
		Timestamp: event.Timestamp,
	})

	return assigned, nil
}
// Read implements EventStore. It returns up to limit events (0 means no
// limit) whose offset is strictly after from, together with the offset to
// pass to the next Read. When nothing matches, the returned offset equals
// from.
//
// OffsetOldest reads from the beginning. OffsetNewest returns no events and
// resolves to the offset of the last stored event (OffsetOldest for an empty
// store), so the next Read sees only events appended afterwards. Without
// this special case "$" would sort before every numeric offset and the whole
// stream would be returned.
//
// The context is not consulted and the error is always nil.
func (m *MemoryStore) Read(ctx context.Context, from Offset, limit int) ([]*StoredEvent, Offset, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()

	if from == OffsetNewest {
		if n := len(m.events); n > 0 {
			return nil, m.events[n-1].Offset, nil
		}
		return nil, OffsetOldest, nil
	}

	var result []*StoredEvent
	lastOffset := from
	for _, event := range m.events {
		// Skip events at or before the requested position.
		if from != OffsetOldest && event.Offset <= from {
			continue
		}
		result = append(result, event)
		lastOffset = event.Offset
		if limit > 0 && len(result) >= limit {
			break
		}
	}

	return result, lastOffset, nil
}
// SaveOffset implements SubscriptionStore by recording offset as the latest
// position for subscriptionID, replacing any earlier value. The context is
// not consulted and the error is always nil.
func (m *MemoryStore) SaveOffset(ctx context.Context, subscriptionID string, offset Offset) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	saved := m.subscriptions
	saved[subscriptionID] = offset

	return nil
}
// LoadOffset implements SubscriptionStore. It returns the offset last saved
// for subscriptionID, or OffsetOldest when none has been saved. The context
// is not consulted and the error is always nil.
func (m *MemoryStore) LoadOffset(ctx context.Context, subscriptionID string) (Offset, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()

	if saved, found := m.subscriptions[subscriptionID]; found {
		return saved, nil
	}
	return OffsetOldest, nil
}
// ReadStream implements EventStoreStreamer. The returned iterator yields the
// events whose offset is strictly after from (all events for OffsetOldest).
//
// Each run of the iterator takes a filtered snapshot under the read lock and
// releases the lock before yielding, so consumers may call other store
// methods without deadlocking; events appended after the snapshot are not
// seen. The context is checked before each event; on cancellation the
// iterator yields (nil, ctx.Err()) and stops.
//
// OffsetNewest yields nothing: there are no events after the current end of
// the stream. Without this special case "$" would sort before every numeric
// offset and the whole stream would be yielded.
func (m *MemoryStore) ReadStream(ctx context.Context, from Offset) iter.Seq2[*StoredEvent, error] {
	return func(yield func(*StoredEvent, error) bool) {
		var snapshot []*StoredEvent
		if from != OffsetNewest {
			m.mu.RLock()
			for _, event := range m.events {
				if from == OffsetOldest || event.Offset > from {
					snapshot = append(snapshot, event)
				}
			}
			m.mu.RUnlock()
		}

		for _, event := range snapshot {
			if err := ctx.Err(); err != nil {
				yield(nil, err)
				return
			}
			if !yield(event, nil) {
				return // Consumer stopped iteration
			}
		}
	}
}
package state
import (
"encoding/json"
"fmt"
"time"
)
// Insert builds a change message with the insert operation for the entity
// stored under key, carrying value as the new value and no old value.
// The entity type name is derived from T unless an option overrides it.
//
// Example:
//
//	msg, err := state.Insert("user:1", User{Name: "Alice"})
func Insert[T any](key string, value T, opts ...ChangeOption) (*ChangeMessage, error) {
	inserted := &value
	return newChangeMessage[T](OperationInsert, key, inserted, nil, opts...)
}
// Update builds a change message with the update operation for the entity
// stored under key, carrying value as the new value and no old value.
// The entity type name is derived from T unless an option overrides it.
//
// Example:
//
//	msg, err := state.Update("user:1", User{Name: "Alice Smith"})
func Update[T any](key string, value T, opts ...ChangeOption) (*ChangeMessage, error) {
	updated := &value
	return newChangeMessage[T](OperationUpdate, key, updated, nil, opts...)
}
// UpdateWithOldValue creates an update change message with the old value for conflict detection.
// The old value can be used by consumers to detect concurrent modifications.
// Both value and oldValue are JSON-encoded, into the Value and OldValue fields respectively.
//
// It returns an error if key is empty or if either value cannot be marshaled to JSON.
//
// Example:
//
// msg, err := state.UpdateWithOldValue("user:1", newUser, oldUser)
func UpdateWithOldValue[T any](key string, value, oldValue T, opts ...ChangeOption) (*ChangeMessage, error) {
return newChangeMessage[T](OperationUpdate, key, &value, &oldValue, opts...)
}
// Delete creates a delete change message for an entity.
// The type parameter T determines the entity type name (unless overridden with WithEntityType).
// The message carries neither a value nor an old value.
//
// It returns an error if key is empty.
//
// Example:
//
// msg, err := state.Delete[User]("user:1")
func Delete[T any](key string, opts ...ChangeOption) (*ChangeMessage, error) {
return newChangeMessage[T](OperationDelete, key, nil, nil, opts...)
}
// DeleteWithOldValue creates a delete change message with the old value preserved.
// This is useful for consumers that need to know what was deleted.
// oldValue is JSON-encoded into the OldValue field; the Value field is left empty.
//
// It returns an error if key is empty or if oldValue cannot be marshaled to JSON.
//
// Example:
//
// msg, err := state.DeleteWithOldValue("user:1", user)
func DeleteWithOldValue[T any](key string, oldValue T, opts ...ChangeOption) (*ChangeMessage, error) {
return newChangeMessage[T](OperationDelete, key, nil, &oldValue, opts...)
}
// newChangeMessage is the shared constructor behind Insert, Update, Delete and
// their variants.
//
// It rejects an empty key, applies opts in order, and derives the entity type
// from T unless an option supplied a non-empty override. An explicit timestamp
// wins over the automatic one; when neither is requested the timestamp header
// stays empty. value and oldValue are JSON-encoded only when non-nil, so a nil
// pointer leaves the corresponding field unset.
func newChangeMessage[T any](op Operation, key string, value, oldValue *T, opts ...ChangeOption) (*ChangeMessage, error) {
	if key == "" {
		return nil, fmt.Errorf("state: key cannot be empty")
	}

	settings := new(changeConfig)
	for _, apply := range opts {
		apply(settings)
	}

	// The type name comes from the zero value of T unless overridden.
	var sample T
	typeName := EntityType(sample)
	if override := settings.entityType; override != "" {
		typeName = override
	}

	var stamp string
	switch {
	case settings.timestamp != nil:
		stamp = settings.timestamp.Format(time.RFC3339Nano)
	case settings.autoTimestamp:
		stamp = time.Now().UTC().Format(time.RFC3339Nano)
	}

	result := &ChangeMessage{Type: typeName, Key: key}
	result.Headers.Operation = op
	result.Headers.TxID = settings.txID
	result.Headers.Timestamp = stamp

	// Present for insert/update, absent for delete.
	if value != nil {
		encoded, err := json.Marshal(value)
		if err != nil {
			return nil, fmt.Errorf("state: marshal value: %w", err)
		}
		result.Value = encoded
	}

	if oldValue != nil {
		encoded, err := json.Marshal(oldValue)
		if err != nil {
			return nil, fmt.Errorf("state: marshal old_value: %w", err)
		}
		result.OldValue = encoded
	}

	return result, nil
}
// SnapshotStart creates a snapshot-start control message.
// This marks the beginning of a snapshot in the stream; offset is copied
// verbatim into the message headers.
func SnapshotStart(offset string) *ControlMessage {
	msg := new(ControlMessage)
	msg.Headers.Control = ControlSnapshotStart
	msg.Headers.Offset = offset
	return msg
}
// SnapshotEnd creates a snapshot-end control message.
// This marks the end of a snapshot in the stream; offset is copied verbatim
// into the message headers.
func SnapshotEnd(offset string) *ControlMessage {
	msg := new(ControlMessage)
	msg.Headers.Control = ControlSnapshotEnd
	msg.Headers.Offset = offset
	return msg
}
// Reset creates a reset control message.
// This signals that all state should be cleared and rebuilt from subsequent
// events; offset is copied verbatim into the message headers.
func Reset(offset string) *ControlMessage {
	msg := new(ControlMessage)
	msg.Headers.Control = ControlReset
	msg.Headers.Offset = offset
	return msg
}
package state
import (
"context"
"encoding/json"
"fmt"
"sync"
eventbus "github.com/jilio/ebu"
)
// Store is a generic interface for state storage.
// Implementations can use any backing store (memory, database, etc.).
// Keys are composite keys as produced by CompositeKey (entity type + "/" + key).
type Store[T any] interface {
// Get retrieves an entity by its composite key.
// The boolean result reports whether an entry was found.
Get(compositeKey string) (T, bool)
// Set stores an entity with the given composite key, replacing any existing entry.
Set(compositeKey string, value T)
// Delete removes an entity by its composite key.
Delete(compositeKey string)
// Clear removes all entities from the store.
Clear()
// All returns a copy of all entities in the store, keyed by composite key.
All() map[string]T
}
// MemoryStore is an in-memory implementation of Store backed by a map.
// Every method takes the embedded RWMutex, so a MemoryStore is safe for
// concurrent access.
type MemoryStore[T any] struct {
	data map[string]T
	mu   sync.RWMutex
}

// NewMemoryStore creates a new, empty in-memory store.
func NewMemoryStore[T any]() *MemoryStore[T] {
	store := new(MemoryStore[T])
	store.data = map[string]T{}
	return store
}

// Get retrieves an entity by its composite key.
// The boolean reports whether the key was present.
func (s *MemoryStore[T]) Get(key string) (T, bool) {
	s.mu.RLock()
	value, found := s.data[key]
	s.mu.RUnlock()
	return value, found
}

// Set stores an entity with the given composite key, replacing any existing
// entry.
func (s *MemoryStore[T]) Set(key string, value T) {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.data[key] = value
}

// Delete removes an entity by its composite key. Deleting a missing key is a
// no-op.
func (s *MemoryStore[T]) Delete(key string) {
	s.mu.Lock()
	delete(s.data, key)
	s.mu.Unlock()
}

// Clear removes all entities from the store by swapping in a fresh map.
func (s *MemoryStore[T]) Clear() {
	fresh := make(map[string]T)

	s.mu.Lock()
	s.data = fresh
	s.mu.Unlock()
}

// All returns a copy of all entities in the store. Mutating the returned map
// does not affect the store.
func (s *MemoryStore[T]) All() map[string]T {
	s.mu.RLock()
	defer s.mu.RUnlock()

	snapshot := make(map[string]T, len(s.data))
	for key, value := range s.data {
		snapshot[key] = value
	}
	return snapshot
}
// TypedCollection provides type-safe access to a collection of entities.
// It pairs a Store with the entity type name used to build composite keys.
type TypedCollection[T any] struct {
	store      Store[T]
	entityType string
}

// NewTypedCollection creates a collection for a specific entity type.
// The entity type name is derived from the zero value of T via EntityType.
func NewTypedCollection[T any](store Store[T]) *TypedCollection[T] {
	var sample T
	return NewTypedCollectionWithType(store, EntityType(sample))
}

// NewTypedCollectionWithType creates a collection with an explicit type name.
// Use this when the type name should differ from the Go type name.
func NewTypedCollectionWithType[T any](store Store[T], entityType string) *TypedCollection[T] {
	c := new(TypedCollection[T])
	c.store, c.entityType = store, entityType
	return c
}

// EntityType returns the entity type name for this collection.
func (c *TypedCollection[T]) EntityType() string {
	return c.entityType
}

// Get retrieves an entity by its key (without the type prefix); the composite
// key is built from the collection's entity type.
func (c *TypedCollection[T]) Get(key string) (T, bool) {
	composite := CompositeKey(c.entityType, key)
	return c.store.Get(composite)
}

// All returns the result of the underlying store's All.
// The keys in the returned map are composite keys (type/key).
func (c *TypedCollection[T]) All() map[string]T {
	entries := c.store.All()
	return entries
}
// collectionApplier is an internal interface for applying changes to collections.
// It lets the Materializer hold collections of different element types in one map.
type collectionApplier interface {
// applyChange applies a single change message to the collection's store.
applyChange(msg *ChangeMessage) error
// clear removes everything from the collection's store.
clear()
}
// typedCollectionApplier wraps TypedCollection to implement collectionApplier.
type typedCollectionApplier[T any] struct {
	collection *TypedCollection[T]
}

// applyChange writes msg into the collection's store under the composite key
// built from the message's type and key. Insert and update decode msg.Value
// into a T and store it; delete removes the entry; any other operation is
// ignored. The only error returned is a failure to decode msg.Value.
func (a *typedCollectionApplier[T]) applyChange(msg *ChangeMessage) error {
	compositeKey := CompositeKey(msg.Type, msg.Key)
	op := msg.Headers.Operation

	if op == OperationDelete {
		a.collection.store.Delete(compositeKey)
		return nil
	}
	if op != OperationInsert && op != OperationUpdate {
		return nil
	}

	var decoded T
	if err := json.Unmarshal(msg.Value, &decoded); err != nil {
		return fmt.Errorf("state: unmarshal value for %s/%s: %w", msg.Type, msg.Key, err)
	}
	a.collection.store.Set(compositeKey, decoded)
	return nil
}

// clear empties the collection's underlying store.
func (a *typedCollectionApplier[T]) clear() {
	a.collection.store.Clear()
}
// Materializer processes state protocol messages and maintains state.
// It applies change messages to registered collections and handles control messages.
type Materializer struct {
// collections maps an entity type name to the applier of its registered collection.
collections map[string]collectionApplier
// cfg holds the callbacks and flags set through MaterializerOption values.
cfg *materializerConfig
// mu guards collections and lastOffset.
mu sync.RWMutex
// lastOffset is the offset of the most recent event that Apply processed without error.
lastOffset eventbus.Offset
}
// NewMaterializer creates a new Materializer with no registered collections.
// The options are applied, in order, to a fresh configuration.
func NewMaterializer(opts ...MaterializerOption) *Materializer {
	m := &Materializer{
		collections: map[string]collectionApplier{},
		cfg:         new(materializerConfig),
	}
	for _, configure := range opts {
		configure(m.cfg)
	}
	return m
}
// RegisterCollection registers a typed collection with the materializer.
// The collection's entity type determines which change messages are routed to
// it; registering a second collection under the same entity type replaces the
// first.
func RegisterCollection[T any](m *Materializer, collection *TypedCollection[T]) {
	m.mu.Lock()
	defer m.mu.Unlock()

	typeName := collection.EntityType()
	applier := &typedCollectionApplier[T]{collection: collection}
	m.collections[typeName] = applier
}
// Apply processes a single StoredEvent containing a state protocol message.
// This is the primary integration point with ebu's Replay functionality.
//
// The event's Data field should contain a JSON-encoded ChangeMessage or
// ControlMessage. A message whose headers decode with a non-empty "control"
// field is treated as a control message; anything else is decoded and applied
// as a change message. On success the event's offset is recorded as the last
// applied offset; on any error the recorded offset is left untouched.
func (m *Materializer) Apply(event *eventbus.StoredEvent) error {
	// Peek at the headers only, so we can tell the two message kinds apart.
	var envelope struct {
		Headers json.RawMessage `json:"headers"`
	}
	if err := json.Unmarshal(event.Data, &envelope); err != nil {
		return fmt.Errorf("state: unmarshal event: %w", err)
	}

	var control ControlHeaders
	headersDecoded := json.Unmarshal(envelope.Headers, &control) == nil
	if headersDecoded && control.Control != "" {
		m.applyControl(&ControlMessage{Headers: control})
	} else {
		var change ChangeMessage
		if err := json.Unmarshal(event.Data, &change); err != nil {
			return fmt.Errorf("state: unmarshal change message: %w", err)
		}
		if err := m.applyChange(&change); err != nil {
			return err
		}
	}

	m.mu.Lock()
	m.lastOffset = event.Offset
	m.mu.Unlock()
	return nil
}
// ApplyChangeMessage processes a ChangeMessage directly.
// Use this when you have a ChangeMessage that's not wrapped in a StoredEvent.
// Unlike Apply, it does not update the offset reported by LastOffset.
func (m *Materializer) ApplyChangeMessage(msg *ChangeMessage) error {
return m.applyChange(msg)
}
// ApplyControlMessage processes a ControlMessage directly.
// Use this when you have a ControlMessage that's not wrapped in a StoredEvent.
// Unlike Apply, it does not update the offset reported by LastOffset.
func (m *Materializer) ApplyControlMessage(msg *ControlMessage) {
m.applyControl(msg)
}
// applyChange routes a change message to the collection registered for its
// entity type.
//
// When no collection is registered for msg.Type the message is ignored, unless
// strict schema mode is on, in which case an error is returned. When the
// collection itself fails to apply the message, the configured error callback
// (if any) is invoked before the error is returned.
func (m *Materializer) applyChange(msg *ChangeMessage) error {
	m.mu.RLock()
	target, registered := m.collections[msg.Type]
	m.mu.RUnlock()

	switch {
	case registered:
		err := target.applyChange(msg)
		if err != nil && m.cfg.onError != nil {
			m.cfg.onError(err)
		}
		return err
	case m.cfg.strictSchema:
		return fmt.Errorf("state: unknown entity type: %s", msg.Type)
	default:
		return nil // Ignore unknown types in non-strict mode
	}
}
// applyControl applies a control message.
//
// A reset clears every registered collection while holding the write lock and
// then, after releasing it, invokes the reset callback. Snapshot start and end
// invoke the snapshot callback with true and false respectively. Any other
// control value is ignored.
func (m *Materializer) applyControl(msg *ControlMessage) {
	kind := msg.Headers.Control

	if kind == ControlReset {
		m.mu.Lock()
		for _, applier := range m.collections {
			applier.clear()
		}
		m.mu.Unlock()

		if hook := m.cfg.onReset; hook != nil {
			hook()
		}
		return
	}

	if kind != ControlSnapshotStart && kind != ControlSnapshotEnd {
		return
	}
	if notify := m.cfg.onSnapshot; notify != nil {
		notify(kind == ControlSnapshotStart)
	}
}
// LastOffset returns the offset of the last event applied successfully through
// Apply. The value is read under the materializer's read lock.
func (m *Materializer) LastOffset() eventbus.Offset {
	m.mu.RLock()
	offset := m.lastOffset
	m.mu.RUnlock()
	return offset
}
// Replay is a convenience method that replays events from an EventBus.
// It calls bus.Replay with m.Apply as the per-event handler and returns
// whatever error bus.Replay returns.
func (m *Materializer) Replay(ctx context.Context, bus *eventbus.EventBus, from eventbus.Offset) error {
return bus.Replay(ctx, from, m.Apply)
}
package state
import (
"encoding/json"
"reflect"
)
// Operation represents the type of change operation per the State Protocol.
// The string value is what appears in the "operation" header of a change message.
type Operation string
const (
// OperationInsert indicates a new entity is being created.
OperationInsert Operation = "insert"
// OperationUpdate indicates an existing entity is being modified.
OperationUpdate Operation = "update"
// OperationDelete indicates an entity is being removed.
OperationDelete Operation = "delete"
)
// Control represents the type of control message per the State Protocol.
// The string value is what appears in the "control" header of a control message.
type Control string
const (
// ControlSnapshotStart marks the beginning of a snapshot.
ControlSnapshotStart Control = "snapshot-start"
// ControlSnapshotEnd marks the end of a snapshot.
ControlSnapshotEnd Control = "snapshot-end"
// ControlReset signals that all state should be cleared.
ControlReset Control = "reset"
)
// Headers contains metadata for change messages per the State Protocol.
type Headers struct {
// Operation is the type of change (insert, update, delete).
Operation Operation `json:"operation"`
// TxID is an optional transaction identifier for grouping related changes.
// It is omitted from the JSON encoding when empty.
TxID string `json:"txid,omitempty"`
// Timestamp is an optional RFC 3339 formatted timestamp.
// It is omitted from the JSON encoding when empty.
Timestamp string `json:"timestamp,omitempty"`
}
// ControlHeaders contains metadata for control messages.
type ControlHeaders struct {
// Control is the type of control message.
// The Materializer treats a message as a control message only when this is non-empty.
Control Control `json:"control"`
// Offset is an optional reference to a stream position.
// It is omitted from the JSON encoding when empty.
Offset string `json:"offset,omitempty"`
}
// ChangeMessage represents a state change event per the State Protocol.
// It contains an entity mutation (insert, update, or delete) with a composite key.
type ChangeMessage struct {
// Type is the entity type discriminator (e.g., "user", "order").
Type string `json:"type"`
// Key is the unique identifier within the entity type.
Key string `json:"key"`
// Value contains the entity data as raw JSON (required for insert/update).
// It is omitted from the JSON encoding when empty, as for delete messages.
Value json.RawMessage `json:"value,omitempty"`
// OldValue contains the previous entity data as raw JSON (optional, for conflict detection).
OldValue json.RawMessage `json:"old_value,omitempty"`
// Headers contains operation metadata.
Headers Headers `json:"headers"`
}
// ControlMessage represents a control event per the State Protocol.
// Control messages manage stream lifecycle (snapshots, resets).
// Unlike ChangeMessage it carries no type, key or value, only headers.
type ControlMessage struct {
// Headers contains control message metadata.
Headers ControlHeaders `json:"headers"`
}
// TypeNamer is an optional interface that entity types can implement to provide
// their own type name. This mirrors ebu's TypeNamer pattern.
//
// Example:
//
//	type User struct { ... }
//	func (u User) StateTypeName() string { return "user" }
type TypeNamer interface {
	StateTypeName() string
}

// EntityType returns the type name for an entity.
//
// If the entity implements TypeNamer, the custom name is returned; otherwise
// the reflect-based package-qualified name is returned. An untyped nil yields
// "nil".
//
// A typed nil pointer (for example the zero value of a type parameter
// instantiated as *User) is handled without panicking: when the pointed-to
// type implements TypeNamer with a value receiver, the name is obtained from
// the zero value of that type instead of dereferencing the nil pointer.
func EntityType(entity any) string {
	if entity == nil {
		return "nil"
	}

	namer, ok := entity.(TypeNamer)
	if !ok {
		return reflect.TypeOf(entity).String()
	}

	// *T inherits T's value-receiver methods, but calling one through a nil *T
	// dereferences the pointer and panics. Ask the zero T instead. If T does
	// not implement TypeNamer itself, the method has a pointer receiver and is
	// called on the nil pointer exactly as before.
	if rv := reflect.ValueOf(entity); rv.Kind() == reflect.Pointer && rv.IsNil() {
		if elemNamer, ok := reflect.Zero(rv.Type().Elem()).Interface().(TypeNamer); ok {
			return elemNamer.StateTypeName()
		}
	}

	return namer.StateTypeName()
}
// CompositeKey returns a composite key from type and key components.
// The State Protocol uses type + key as a composite identifier; the two parts
// are joined with a single "/" and neither part is escaped.
func CompositeKey(entityType, key string) string {
	const separator = "/"
	return entityType + separator + key
}
// EventTypeName implements ebu's TypeNamer interface for ChangeMessage.
// This allows ChangeMessage to be published directly to an EventBus.
// The name is the fixed string "state.ChangeMessage", independent of the message contents.
func (m ChangeMessage) EventTypeName() string {
return "state.ChangeMessage"
}
// EventTypeName implements ebu's TypeNamer interface for ControlMessage.
// This allows ControlMessage to be published directly to an EventBus.
// The name is the fixed string "state.ControlMessage", independent of the message contents.
func (m ControlMessage) EventTypeName() string {
return "state.ControlMessage"
}
package state
import "time"
// ChangeOption configures a change message.
// Options are applied in the order they are passed to the change constructors.
type ChangeOption func(*changeConfig)
// changeConfig collects the settings gathered from ChangeOption values.
type changeConfig struct {
// txID is copied into the message's TxID header.
txID string
// timestamp, when non-nil, is the explicit timestamp; it takes precedence over autoTimestamp.
timestamp *time.Time
// autoTimestamp requests the current UTC time when no explicit timestamp is set.
autoTimestamp bool
// entityType, when non-empty, overrides the type name derived from the type parameter.
entityType string
}
// WithTxID sets the transaction ID for grouping related changes.
// Transaction IDs allow consumers to process related changes atomically.
// The value ends up in the message's TxID header.
func WithTxID(txID string) ChangeOption {
return func(c *changeConfig) {
c.txID = txID
}
}
// WithTimestamp sets an explicit timestamp for the change message.
// The timestamp is formatted with time.RFC3339Nano in t's own location (it is
// not converted to UTC). An explicit timestamp takes precedence over
// WithAutoTimestamp when both are given.
func WithTimestamp(t time.Time) ChangeOption {
return func(c *changeConfig) {
c.timestamp = &t
}
}
// WithAutoTimestamp automatically sets the timestamp to the current time.
// The time is taken in UTC when the message is constructed and formatted with
// time.RFC3339Nano. It has no effect when WithTimestamp is also given.
func WithAutoTimestamp() ChangeOption {
return func(c *changeConfig) {
c.autoTimestamp = true
}
}
// WithEntityType overrides the automatic entity type name.
// Use this when the type name should differ from the Go type name.
// An empty typeName leaves the automatic name in place.
func WithEntityType(typeName string) ChangeOption {
return func(c *changeConfig) {
c.entityType = typeName
}
}
// MaterializerOption configures a Materializer.
type MaterializerOption func(*materializerConfig)
// materializerConfig collects the settings gathered from MaterializerOption values.
type materializerConfig struct {
// onReset, if set, is called after a reset control message has cleared the collections.
onReset func()
// onSnapshot, if set, is called with true on snapshot-start and false on snapshot-end.
onSnapshot func(start bool)
// onError, if set, is called when a registered collection fails to apply a change message.
onError func(error)
// strictSchema makes change messages for unregistered entity types an error.
strictSchema bool
}
// WithOnReset sets a callback invoked when a reset control message is received.
// The callback is called after all collections have been cleared and after the
// materializer's lock has been released.
func WithOnReset(fn func()) MaterializerOption {
return func(c *materializerConfig) {
c.onReset = fn
}
}
// WithOnSnapshot sets a callback invoked on snapshot-start/end messages.
// The boolean parameter is true for snapshot-start, false for snapshot-end.
func WithOnSnapshot(fn func(start bool)) MaterializerOption {
return func(c *materializerConfig) {
c.onSnapshot = fn
}
}
// WithOnError sets an error handler for materialization errors.
// It is called when a registered collection fails to apply a change message
// (for example when the value cannot be decoded); the error is still returned
// to the caller afterwards. It is not called for unknown entity types in
// strict mode, nor for events that fail to decode as a message at all.
func WithOnError(fn func(error)) MaterializerOption {
return func(c *materializerConfig) {
c.onError = fn
}
}
// WithStrictSchema enables strict schema validation.
// When enabled, the materializer returns an error for change messages whose
// entity type has no registered collection.
// When disabled (default), unknown types are silently ignored.
func WithStrictSchema() MaterializerOption {
return func(c *materializerConfig) {
c.strictSchema = true
}
}
package eventbus
import (
"encoding/json"
"fmt"
"reflect"
"sync"
)
// UpcastFunc transforms event data from one version to another.
// It receives the raw JSON data and returns transformed data with the new type name.
type UpcastFunc func(data json.RawMessage) (json.RawMessage, string, error)

// UpcastErrorHandler is called when an upcast operation fails.
// It receives the type and data that were being upcast and the error returned
// by the upcast function.
type UpcastErrorHandler func(eventType string, data json.RawMessage, err error)

// Upcaster represents a transformation from one event type to another.
type Upcaster struct {
	FromType string     // Source event type
	ToType   string     // Target event type
	Upcast   UpcastFunc // Transformation function
}

// upcastRegistry manages all registered upcasters.
type upcastRegistry struct {
	upcasters    map[string][]Upcaster // Map from source type to list of upcasters
	mu           sync.RWMutex
	errorHandler UpcastErrorHandler
}

// newUpcastRegistry creates a new, empty upcast registry.
func newUpcastRegistry() *upcastRegistry {
	return &upcastRegistry{
		upcasters: make(map[string][]Upcaster),
	}
}

// register adds an upcaster to the registry.
//
// It returns an error when either type name is empty, when both names are
// equal, when upcast is nil, or when the new edge would close a cycle among
// the already registered upcasters.
func (r *upcastRegistry) register(fromType, toType string, upcast UpcastFunc) error {
	if fromType == "" || toType == "" {
		return fmt.Errorf("eventbus: upcast types cannot be empty")
	}
	if fromType == toType {
		return fmt.Errorf("eventbus: cannot upcast type to itself")
	}
	if upcast == nil {
		return fmt.Errorf("eventbus: upcast function cannot be nil")
	}

	r.mu.Lock()
	defer r.mu.Unlock()

	// Check for circular dependencies
	if r.wouldCreateCycle(fromType, toType) {
		return fmt.Errorf("eventbus: upcast would create circular dependency")
	}

	r.upcasters[fromType] = append(r.upcasters[fromType], Upcaster{
		FromType: fromType,
		ToType:   toType,
		Upcast:   upcast,
	})
	return nil
}

// wouldCreateCycle checks if adding an upcast would create a circular
// dependency, i.e. whether toType can already reach fromType.
// The caller must hold r.mu.
func (r *upcastRegistry) wouldCreateCycle(fromType, toType string) bool {
	visited := make(map[string]bool)
	return r.hasCycleDFS(toType, fromType, visited)
}

// hasCycleDFS performs depth-first search over the registered upcasters and
// reports whether target is reachable from current. visited prevents
// re-exploring a type. The caller must hold r.mu.
func (r *upcastRegistry) hasCycleDFS(current, target string, visited map[string]bool) bool {
	if current == target {
		return true
	}
	if visited[current] {
		return false
	}
	visited[current] = true

	for _, upcaster := range r.upcasters[current] {
		if r.hasCycleDFS(upcaster.ToType, target, visited) {
			return true
		}
	}
	return false
}

// apply attempts to apply upcasts to transform data to the latest version.
//
// Starting from eventType it repeatedly applies the first upcaster registered
// for the current type until no upcaster is registered for it, and returns the
// final data and type name. The chain follows the type name returned by each
// upcast function, which need not equal the declared ToType.
//
// On failure the original data and eventType are returned together with the
// error. An upcast function error is also reported to the error handler, if
// one is set. A loop, whether through a declared target or through the type
// name an upcast function returns, yields an "upcast loop detected" error
// rather than iterating forever.
func (r *upcastRegistry) apply(data json.RawMessage, eventType string) (json.RawMessage, string, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()

	// Fast path: nothing registered for this type, no bookkeeping needed.
	if len(r.upcasters[eventType]) == 0 {
		return data, eventType, nil
	}

	currentData, currentType := data, eventType
	appliedTypes := make(map[string]bool) // Types already visited in this chain

	for {
		appliedTypes[currentType] = true

		candidates := r.upcasters[currentType]
		if len(candidates) == 0 {
			break // No more upcasts available
		}

		// Apply the first available upcaster (could be enhanced to choose best path)
		upcaster := candidates[0]
		if appliedTypes[upcaster.ToType] {
			return data, eventType, fmt.Errorf("eventbus: upcast loop detected")
		}

		newData, newType, err := upcaster.Upcast(currentData)
		if err != nil {
			if r.errorHandler != nil {
				r.errorHandler(currentType, currentData, err)
			}
			return data, eventType, fmt.Errorf("eventbus: upcast failed from %s to %s: %w",
				upcaster.FromType, upcaster.ToType, err)
		}

		// The chain continues from the type the function actually returned, so
		// that is the name that must be checked. Registration-time cycle
		// detection only sees the declared ToType and cannot catch this.
		if appliedTypes[newType] {
			return data, eventType, fmt.Errorf("eventbus: upcast loop detected")
		}

		currentData, currentType = newData, newType
	}

	return currentData, currentType, nil
}

// clear removes all registered upcasters.
func (r *upcastRegistry) clear() {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.upcasters = make(map[string][]Upcaster)
}

// clearType removes all upcasters for a specific source type.
func (r *upcastRegistry) clearType(eventType string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.upcasters, eventType)
}
// RegisterUpcast registers a type-safe upcast function.
//
// The source and target type names are the reflect-based names of From and To.
// The registered upcaster decodes the stored JSON into a From, passes it to
// upcast, and encodes the resulting To back to JSON. It returns an error if
// bus or upcast is nil, or if the registry rejects the registration.
func RegisterUpcast[From any, To any](bus *EventBus, upcast func(From) To) error {
	switch {
	case bus == nil:
		return fmt.Errorf("eventbus: bus cannot be nil")
	case upcast == nil:
		return fmt.Errorf("eventbus: upcast function cannot be nil")
	}

	// Going through a nil pointer's element type also works for interface types.
	var (
		sourceName = reflect.TypeOf((*From)(nil)).Elem().String()
		targetName = reflect.TypeOf((*To)(nil)).Elem().String()
	)

	convert := func(raw json.RawMessage) (json.RawMessage, string, error) {
		var source From
		if err := json.Unmarshal(raw, &source); err != nil {
			return nil, "", fmt.Errorf("unmarshal source: %w", err)
		}

		encoded, err := json.Marshal(upcast(source))
		if err != nil {
			return nil, "", fmt.Errorf("marshal target: %w", err)
		}
		return encoded, targetName, nil
	}

	return bus.upcastRegistry.register(sourceName, targetName, convert)
}
// RegisterUpcastFunc registers a raw upcast function for complex transformations.
// It returns an error if bus is nil or if the registry rejects the registration
// (empty or identical type names, nil function, or a circular dependency).
func RegisterUpcastFunc(bus *EventBus, fromType, toType string, upcast UpcastFunc) error {
if bus == nil {
return fmt.Errorf("eventbus: bus cannot be nil")
}
return bus.upcastRegistry.register(fromType, toType, upcast)
}
// WithUpcast adds an upcast function during bus creation.
// NOTE(review): the error returned by register is discarded, so an invalid
// registration (empty or identical types, nil function, cycle) is silently
// dropped; use RegisterUpcastFunc when the error matters.
func WithUpcast(fromType, toType string, upcast UpcastFunc) Option {
return func(bus *EventBus) {
bus.upcastRegistry.register(fromType, toType, upcast)
}
}
// WithUpcastErrorHandler sets the error handler for upcast failures.
// The handler is invoked when an upcast function returns an error while an
// upcast chain is being applied.
func WithUpcastErrorHandler(handler UpcastErrorHandler) Option {
return func(bus *EventBus) {
bus.upcastRegistry.errorHandler = handler
}
}
// SetUpcastErrorHandler sets the upcast error handler at runtime.
// NOTE(review): the field is written without holding the registry mutex, while
// the registry reads it under its read lock when applying upcasts — confirm
// callers do not invoke this concurrently with upcasting.
func (bus *EventBus) SetUpcastErrorHandler(handler UpcastErrorHandler) {
bus.upcastRegistry.errorHandler = handler
}
// ClearUpcasts removes all registered upcasters.
// The registry's write lock is held while the upcaster map is replaced.
func (bus *EventBus) ClearUpcasts() {
bus.upcastRegistry.clear()
}
// ClearUpcastsForType removes all upcasters for a specific source type.
// Upcasters registered for other source types are left in place, including
// ones that target eventType.
func (bus *EventBus) ClearUpcastsForType(eventType string) {
bus.upcastRegistry.clearType(eventType)
}