package eventbus

import (
	"context"
	"fmt"
	"hash/fnv"
	"reflect"
	"sync"
	"sync/atomic"
	"time"
)

// Handler is a generic event handler function.
type Handler[T any] func(T)

// ContextHandler is a generic event handler function that also receives the
// context given to PublishContext.
type ContextHandler[T any] func(context.Context, T)

// SubscribeOption configures a subscription.
type SubscribeOption func(*internalHandler)

// internalHandler wraps a handler with its subscription metadata.
type internalHandler struct {
	handler        any
	handlerType    reflect.Type
	eventType      reflect.Type
	once           bool
	async          bool
	sequential     bool
	acceptsContext bool
	// filter is an optional predicate of type func(T) bool, stored untyped.
	filter any
	// mu serializes invocations when sequential is set.
	mu sync.Mutex
	// executed is set to 1 atomically when a once-handler has been claimed.
	executed uint32
}

// PanicHandler is called when a handler panics.
type PanicHandler func(event any, handlerType reflect.Type, panicValue any)

// PersistenceErrorHandler is called when event persistence fails.
type PersistenceErrorHandler func(event any, eventType reflect.Type, err error)

// PublishHook is called when an event is published.
type PublishHook func(eventType reflect.Type, event any)

// numShards must stay a power of two: getShard masks the hash with
// numShards-1 instead of taking a modulo.
const numShards = 32

// shard holds the handlers of a subset of event types behind its own lock.
type shard struct {
	mu       sync.RWMutex
	handlers map[reflect.Type][]*internalHandler
}

// EventBus is an event bus whose handler tables are split across numShards
// independently locked shards.
type EventBus struct {
	shards        [numShards]*shard
	panicHandler  PanicHandler
	beforePublish PublishHook
	afterPublish  PublishHook
	wg            sync.WaitGroup

	// Optional persistence fields (nil if not using persistence)
	store                   EventStore
	storePosition           int64
	storeMu                 sync.RWMutex
	persistenceErrorHandler PersistenceErrorHandler
	persistenceTimeout      time.Duration

	// Upcast registry for event migration
	upcastRegistry *upcastRegistry
}

// EventType returns the type name of an event as reported by
// reflect.Type.String: the package name (not the full import path) followed
// by the type name. It returns "" when event is a nil interface value, which
// has no dynamic type.
//
// This is useful for comparing with StoredEvent.Type during replay.
//
// Example:
//
//	eventType := EventType(MyEvent{})
//	// Returns: "mypackage.MyEvent"
//
//	// Usage in replay:
//	bus.Replay(ctx, 0, func(event *StoredEvent) error {
//		if event.Type == EventType(MyEvent{}) {
//			// Process MyEvent
//		}
//		return nil
//	})
func EventType(event any) string {
	t := reflect.TypeOf(event)
	if t == nil {
		return ""
	}
	return t.String()
}

// Option is a function that configures the EventBus.
type Option func(*EventBus)

// New creates an EventBus, initializes every shard, and applies opts in
// order. When an option installed a store, the event processor is started in
// its own goroutine.
func New(opts ...Option) *EventBus {
	bus := &EventBus{
		upcastRegistry: newUpcastRegistry(),
	}

	for i := range bus.shards {
		bus.shards[i] = &shard{
			handlers: make(map[reflect.Type][]*internalHandler),
		}
	}

	for _, opt := range opts {
		opt(bus)
	}

	if bus.store != nil {
		go bus.startEventProcessor()
	}

	return bus
}

// getShard returns the shard responsible for eventType, chosen by the FNV-1a
// hash of the type's string form.
func (bus *EventBus) getShard(eventType reflect.Type) *shard {
	h := fnv.New32a()
	// hash.Hash documents that Write never returns an error.
	_, _ = h.Write([]byte(eventType.String()))
	return bus.shards[h.Sum32()&(numShards-1)]
}

// Subscribe registers a handler for events of type T.
//
// It returns an error when bus, handler, or any option is nil; in that case
// nothing is registered.
func Subscribe[T any](bus *EventBus, handler Handler[T], opts ...SubscribeOption) error {
	if bus == nil {
		return fmt.Errorf("eventbus: bus cannot be nil")
	}
	if handler == nil {
		return fmt.Errorf("eventbus: handler cannot be nil")
	}

	eventType := reflect.TypeOf((*T)(nil)).Elem()
	h := &internalHandler{
		handler:        handler,
		handlerType:    reflect.TypeOf(handler),
		eventType:      eventType,
		acceptsContext: false,
	}

	for _, opt := range opts {
		if opt == nil {
			return fmt.Errorf("eventbus: subscribe option cannot be nil")
		}
		opt(h)
	}

	s := bus.getShard(eventType)
	s.mu.Lock()
	s.handlers[eventType] = append(s.handlers[eventType], h)
	s.mu.Unlock()

	return nil
}

// SubscribeContext registers a context-aware handler for events of type T.
//
// It returns an error when bus, handler, or any option is nil; in that case
// nothing is registered.
func SubscribeContext[T any](bus *EventBus, handler ContextHandler[T], opts ...SubscribeOption) error {
	if bus == nil {
		return fmt.Errorf("eventbus: bus cannot be nil")
	}
	if handler == nil {
		return fmt.Errorf("eventbus: handler cannot be nil")
	}

	eventType := reflect.TypeOf((*T)(nil)).Elem()
	h := &internalHandler{
		handler:        handler,
		handlerType:    reflect.TypeOf(handler),
		eventType:      eventType,
		acceptsContext: true,
	}

	for _, opt := range opts {
		if opt == nil {
			return fmt.Errorf("eventbus: subscribe option cannot be nil")
		}
		opt(h)
	}

	s := bus.getShard(eventType)
	s.mu.Lock()
	s.handlers[eventType] = append(s.handlers[eventType], h)
	s.mu.Unlock()

	return nil
}

// Unsubscribe removes the first handler registered for events of type T whose
// function pointer equals that of handler.
//
// Handlers are matched by code pointer (reflect.Value.Pointer), which the
// reflect package documents as not necessarily unique per function value:
// two closures created from the same function literal compare equal here.
//
// It returns an error when bus is nil, when handler is not a non-nil
// function, or when no matching handler is registered.
func Unsubscribe[T any, H any](bus *EventBus, handler H) error {
	if bus == nil {
		return fmt.Errorf("eventbus: bus cannot be nil")
	}

	// reflect.Value.Pointer panics for kinds that are not pointer-like and
	// for the zero Value, so validate before calling it.
	target := reflect.ValueOf(handler)
	if !target.IsValid() || target.Kind() != reflect.Func || target.IsNil() {
		return fmt.Errorf("eventbus: handler must be a non-nil function")
	}
	targetPtr := target.Pointer()

	eventType := reflect.TypeOf((*T)(nil)).Elem()
	s := bus.getShard(eventType)
	s.mu.Lock()
	defer s.mu.Unlock()

	handlers := s.handlers[eventType]
	for i, h := range handlers {
		if reflect.ValueOf(h.handler).Pointer() != targetPtr {
			continue
		}
		// Shift the tail down and nil the vacated slot so the backing array
		// does not keep the removed handler reachable.
		copy(handlers[i:], handlers[i+1:])
		handlers[len(handlers)-1] = nil
		s.handlers[eventType] = handlers[:len(handlers)-1]
		return nil
	}

	return fmt.Errorf("handler not found")
}

// Publish publishes an event to all registered handlers using
// context.Background().
func Publish[T any](bus *EventBus, event T) {
	PublishContext(bus, context.Background(), event)
}

// PublishContext publishes an event with context to all handlers registered
// for the event's dynamic type.
//
// Order of operations: the before-publish hook runs, a snapshot of the
// handlers is taken under the shard's read lock, each handler whose filter
// accepts the event is invoked (in a new goroutine for Async handlers,
// inline otherwise), claimed Once handlers are removed, and finally the
// after-publish hook runs. Async handlers are not awaited here; call
// bus.Wait() to wait for them.
//
// Once ctx is cancelled no further handlers are started. A Once handler is
// only claimed while ctx is still live, so a cancelled publish does not use
// up a Once subscription without running it. For Async handlers the context
// is checked again when the goroutine starts.
//
// An event that is a nil interface value has no dynamic type and therefore
// cannot match any subscription; it is dropped without calling the hooks.
func PublishContext[T any](bus *EventBus, ctx context.Context, event T) {
	eventType := reflect.TypeOf(event)
	if eventType == nil {
		return
	}

	if bus.beforePublish != nil {
		bus.beforePublish(eventType, event)
	}

	// Snapshot the handlers so none of them runs while the lock is held.
	s := bus.getShard(eventType)
	s.mu.RLock()
	registered := s.handlers[eventType]
	snapshot := make([]*internalHandler, len(registered))
	copy(snapshot, registered)
	s.mu.RUnlock()

	var claimed []*internalHandler

	for _, h := range snapshot {
		// A filter of a different predicate type is ignored rather than
		// treated as a mismatch.
		if h.filter != nil {
			if matches, ok := h.filter.(func(T) bool); ok && !matches(event) {
				continue
			}
		}

		// A cancelled context stays cancelled, so nothing after this point
		// could run; stop before claiming any further Once handlers.
		if ctx.Err() != nil {
			break
		}

		if h.once {
			// Only the publisher that wins the swap may run the handler.
			if !atomic.CompareAndSwapUint32(&h.executed, 0, 1) {
				continue
			}
			claimed = append(claimed, h)
		}

		if h.async {
			bus.wg.Add(1)
			go func(handler *internalHandler) {
				defer bus.wg.Done()
				if ctx.Err() != nil {
					return
				}
				callHandlerWithContext(handler, ctx, event, bus.panicHandler)
			}(h)
			continue
		}

		callHandlerWithContext(h, ctx, event, bus.panicHandler)
	}

	if len(claimed) > 0 {
		s.mu.Lock()
		current := s.handlers[eventType]
		for _, done := range claimed {
			for i, h := range current {
				if h != done {
					continue
				}
				copy(current[i:], current[i+1:])
				current[len(current)-1] = nil
				current = current[:len(current)-1]
				break
			}
		}
		s.handlers[eventType] = current
		s.mu.Unlock()
	}

	if bus.afterPublish != nil {
		bus.afterPublish(eventType, event)
	}
}

// Clear removes all handlers for events of type T.
func Clear[T any](bus *EventBus) {
	eventType := reflect.TypeOf((*T)(nil)).Elem()
	s := bus.getShard(eventType)
	s.mu.Lock()
	delete(s.handlers, eventType)
	s.mu.Unlock()
}

// ClearAll removes all handlers for all event types, one shard at a time.
func ClearAll(bus *EventBus) {
	for _, s := range bus.shards {
		s.mu.Lock()
		s.handlers = make(map[reflect.Type][]*internalHandler)
		s.mu.Unlock()
	}
}

// HasHandlers reports whether at least one handler is registered for events
// of type T.
func HasHandlers[T any](bus *EventBus) bool {
	return HandlerCount[T](bus) > 0
}

// HandlerCount returns the number of handlers registered for events of type T.
func HandlerCount[T any](bus *EventBus) int {
	eventType := reflect.TypeOf((*T)(nil)).Elem()
	s := bus.getShard(eventType)
	s.mu.RLock()
	defer s.mu.RUnlock()
	return len(s.handlers[eventType])
}

// Wait blocks until all async handlers started so far have completed.
func (bus *EventBus) Wait() {
	bus.wg.Wait()
}

// callHandlerWithContext invokes h with event, recovering from any panic.
//
// A recovered panic is forwarded to panicHandler when one is set and is
// otherwise discarded. Handlers marked sequential are invoked while holding
// the handler's own mutex. Known function shapes are called directly; any
// other function with one or two parameters is called through reflection.
func callHandlerWithContext[T any](h *internalHandler, ctx context.Context, event T, panicHandler PanicHandler) {
	defer func() {
		if r := recover(); r != nil && panicHandler != nil {
			panicHandler(event, h.handlerType, r)
		}
	}()

	if h.sequential {
		h.mu.Lock()
		defer h.mu.Unlock()
	}

	// Try common handler types first to avoid the cost of reflection.
	switch fn := h.handler.(type) {
	case Handler[T]:
		fn(event)
	case ContextHandler[T]:
		fn(ctx, event)
	case func(T):
		fn(event)
	case func(context.Context, T):
		fn(ctx, event)
	case func(any):
		fn(event)
	case func(context.Context, any):
		fn(ctx, event)
	case func(context.Context, any) error:
		// The handler's error has nowhere to go in this API.
		_ = fn(ctx, event)
	default:
		// Reached, for example, when the event was published with a static
		// type that differs from the type the handler was subscribed with.
		if h.handlerType.Kind() != reflect.Func {
			return
		}
		handlerValue := reflect.ValueOf(h.handler)
		eventValue := reflect.ValueOf(event)
		switch h.handlerType.NumIn() {
		case 1:
			handlerValue.Call([]reflect.Value{eventValue})
		case 2:
			handlerValue.Call([]reflect.Value{reflect.ValueOf(ctx), eventValue})
		}
	}
}

// Subscribe Options

// Once makes the handler execute only once.
func Once() SubscribeOption {
	return func(h *internalHandler) {
		h.once = true
	}
}

// Async makes the handler execute asynchronously, in its own goroutine.
func Async() SubscribeOption {
	return func(h *internalHandler) {
		h.async = true
	}
}

// Sequential ensures invocations of the handler never overlap by guarding
// each call with a per-handler mutex.
func Sequential() SubscribeOption {
	return func(h *internalHandler) {
		h.sequential = true
	}
}

// WithFilter configures the handler to only receive events that match the
// predicate.
func WithFilter[T any](predicate func(T) bool) SubscribeOption {
	return func(h *internalHandler) {
		h.filter = predicate
	}
}

// Bus Options

// WithPanicHandler sets a panic handler for the event bus.
func WithPanicHandler(handler PanicHandler) Option {
	return func(bus *EventBus) {
		bus.panicHandler = handler
	}
}

// WithBeforePublish sets a hook that is called before publishing events.
func WithBeforePublish(hook PublishHook) Option {
	return func(bus *EventBus) {
		bus.beforePublish = hook
	}
}

// WithAfterPublish sets a hook that is called after publishing events.
func WithAfterPublish(hook PublishHook) Option {
	return func(bus *EventBus) {
		bus.afterPublish = hook
	}
}

// WithPersistenceErrorHandler sets the error handler for persistence failures.
func WithPersistenceErrorHandler(handler PersistenceErrorHandler) Option {
	return func(bus *EventBus) {
		bus.persistenceErrorHandler = handler
	}
}

// WithPersistenceTimeout sets the timeout for persistence operations.
func WithPersistenceTimeout(timeout time.Duration) Option {
	return func(bus *EventBus) {
		bus.persistenceTimeout = timeout
	}
}

// startEventProcessor is a placeholder for processing persisted events; it
// currently does nothing.
func (bus *EventBus) startEventProcessor() {
	// This will be implemented when needed for event replay.
}

// Backward compatibility methods

// SetPanicHandler sets the panic handler (for backward compatibility).
func (bus *EventBus) SetPanicHandler(handler PanicHandler) {
	bus.panicHandler = handler
}

// SetBeforePublishHook sets the before publish hook (for backward
// compatibility). It replaces the current hook.
func (bus *EventBus) SetBeforePublishHook(hook PublishHook) {
	bus.beforePublish = hook
}

// SetAfterPublishHook sets the after publish hook (for backward compatibility).
func (bus *EventBus) SetAfterPublishHook(hook PublishHook) {
	bus.afterPublish = hook
}

// SetPersistenceErrorHandler sets the persistence error handler (for runtime
// configuration).
func (bus *EventBus) SetPersistenceErrorHandler(handler PersistenceErrorHandler) {
	bus.persistenceErrorHandler = handler
}
package eventbus

import (
	"context"
	"encoding/json"
	"fmt"
	"reflect"
	"sync"
	"time"
)

// EventStore defines the interface for persisting events.
type EventStore interface {
	// Save an event to storage
	Save(ctx context.Context, event *StoredEvent) error

	// Load events from storage within a range
	Load(ctx context.Context, from, to int64) ([]*StoredEvent, error)

	// Get the current position (highest event number)
	GetPosition(ctx context.Context) (int64, error)

	// Save subscription position for resumable subscriptions
	SaveSubscriptionPosition(ctx context.Context, subscriptionID string, position int64) error

	// Load subscription position
	LoadSubscriptionPosition(ctx context.Context, subscriptionID string) (int64, error)
}

// StoredEvent represents an event in storage.
type StoredEvent struct {
	Position  int64           `json:"position"`
	Type      string          `json:"type"`
	Data      json.RawMessage `json:"data"`
	Timestamp time.Time       `json:"timestamp"`
}

// WithStore enables persistence with the given store.
//
// The option records the store, seeds the bus position from
// store.GetPosition (leaving it at its current value when that call fails),
// and wraps the before-publish hook so that every published event is
// persisted after any previously configured hook has run. A nil store leaves
// the bus unchanged.
func WithStore(store EventStore) Option {
	return func(bus *EventBus) {
		if store == nil {
			return
		}
		bus.store = store

		// An Option cannot report errors, so a failed lookup is tolerated.
		if pos, err := store.GetPosition(context.Background()); err == nil {
			bus.storePosition = pos
		}

		// Chain the persistence hook behind whatever hook is already set.
		previous := bus.beforePublish
		bus.beforePublish = func(eventType reflect.Type, event any) {
			if previous != nil {
				previous(eventType, event)
			}
			bus.persistEvent(eventType, event)
		}
	}
}

// persistEvent saves an event to storage (only if a store is configured).
//
// The event is JSON-encoded, stamped with the next position and the current
// time, and handed to the store. The position is read, used, and advanced
// under a single hold of storeMu, so concurrent publishes cannot be given the
// same position; the bus position only moves forward after a successful
// save. If the store assigned a higher position to the event, that position
// is adopted. Failures are reported to the persistence error handler, if
// any, after the lock has been released.
func (bus *EventBus) persistEvent(eventType reflect.Type, event any) {
	if bus.store == nil {
		return
	}

	data, err := json.Marshal(event)
	if err != nil {
		if bus.persistenceErrorHandler != nil {
			bus.persistenceErrorHandler(event, eventType, fmt.Errorf("failed to marshal event: %w", err))
		}
		return
	}

	save := func() error {
		bus.storeMu.Lock()
		defer bus.storeMu.Unlock()

		// Start the timeout only once the lock is held so it bounds the
		// store call rather than the wait for other publishers.
		ctx := context.Background()
		if bus.persistenceTimeout > 0 {
			var cancel context.CancelFunc
			ctx, cancel = context.WithTimeout(ctx, bus.persistenceTimeout)
			defer cancel()
		}

		position := bus.storePosition + 1
		stored := &StoredEvent{
			Position: position,
			// Same naming as the EventType() function.
			Type:      eventType.String(),
			Data:      data,
			Timestamp: time.Now(),
		}

		if err := bus.store.Save(ctx, stored); err != nil {
			return err
		}

		if stored.Position > position {
			position = stored.Position
		}
		bus.storePosition = position
		return nil
	}

	if err := save(); err != nil && bus.persistenceErrorHandler != nil {
		bus.persistenceErrorHandler(event, eventType, fmt.Errorf("failed to save event: %w", err))
	}
}

// Replay loads the stored events from position from up to the bus's current
// position and passes them to handler in the order the store returns them.
//
// It returns an error when persistence is not configured, when handler is
// nil, when loading fails, or as soon as handler returns an error.
func (bus *EventBus) Replay(ctx context.Context, from int64, handler func(*StoredEvent) error) error {
	if bus.store == nil {
		return fmt.Errorf("replay requires persistence (use WithStore option)")
	}
	if handler == nil {
		return fmt.Errorf("eventbus: handler cannot be nil")
	}

	bus.storeMu.RLock()
	to := bus.storePosition
	bus.storeMu.RUnlock()

	events, err := bus.store.Load(ctx, from, to)
	if err != nil {
		return fmt.Errorf("load events: %w", err)
	}

	for _, event := range events {
		if err := handler(event); err != nil {
			return fmt.Errorf("handle event at position %d: %w", event.Position, err)
		}
	}

	return nil
}

// ReplayWithUpcast replays events from a position, applying upcasts before
// passing each event to handler. An event whose upcast fails is passed on
// unchanged.
func (bus *EventBus) ReplayWithUpcast(ctx context.Context, from int64, handler func(*StoredEvent) error) error {
	if handler == nil {
		return fmt.Errorf("eventbus: handler cannot be nil")
	}

	return bus.Replay(ctx, from, func(event *StoredEvent) error {
		if bus.upcastRegistry == nil {
			return handler(event)
		}

		data, typeName, err := bus.upcastRegistry.apply(event.Data, event.Type)
		if err != nil {
			// Upcast failed: fall back to the event as stored.
			return handler(event)
		}

		// Hand over a copy so the stored event itself is left untouched.
		return handler(&StoredEvent{
			Position:  event.Position,
			Type:      typeName,
			Data:      data,
			Timestamp: event.Timestamp,
		})
	})
}

// IsPersistent returns true if persistence is enabled.
func (bus *EventBus) IsPersistent() bool {
	return bus.store != nil
}

// GetStore returns the event store (or nil if not persistent).
func (bus *EventBus) GetStore() EventStore {
	return bus.store
}

// SubscribeWithReplay replays the stored events of type T that follow the
// subscription's last saved position, then subscribes handler for future
// events.
//
// During replay each stored event is upcast when possible, skipped unless
// its resulting type name equals T's, decoded from JSON, and passed to
// handler; the subscription position is saved after each handled event. The
// live subscription saves the bus's current position after every handled
// event. Position bookkeeping is best effort: errors from loading or saving
// the subscription position are not reported.
//
// It returns an error when bus or handler is nil, when persistence is not
// configured, when replay fails, or when Subscribe rejects the subscription.
func SubscribeWithReplay[T any](
	bus *EventBus,
	subscriptionID string,
	handler Handler[T],
	opts ...SubscribeOption,
) error {
	if bus == nil {
		return fmt.Errorf("eventbus: bus cannot be nil")
	}
	if handler == nil {
		return fmt.Errorf("eventbus: handler cannot be nil")
	}
	if bus.store == nil {
		return fmt.Errorf("SubscribeWithReplay requires persistence (use WithStore option)")
	}

	ctx := context.Background()

	// NOTE(review): a load error is treated like "no position saved", which
	// replays from the beginning - confirm this is the intended fallback.
	lastPos, _ := bus.store.LoadSubscriptionPosition(ctx, subscriptionID)

	// Same naming as the EventType() function.
	wantType := reflect.TypeOf((*T)(nil)).Elem().String()

	err := bus.Replay(ctx, lastPos+1, func(stored *StoredEvent) error {
		data, typeName := stored.Data, stored.Type
		if bus.upcastRegistry != nil {
			// A failed upcast keeps the stored form; the type check below
			// then decides whether the event is relevant.
			if upData, upType, err := bus.upcastRegistry.apply(data, typeName); err == nil {
				data, typeName = upData, upType
			}
		}

		if typeName != wantType {
			return nil
		}

		var event T
		if err := json.Unmarshal(data, &event); err != nil {
			return err
		}

		handler(event)

		// Best effort: the event has already been handled.
		_ = bus.store.SaveSubscriptionPosition(ctx, subscriptionID, stored.Position)
		return nil
	})
	if err != nil {
		return fmt.Errorf("replay events: %w", err)
	}

	// Subscribe for future events with position tracking.
	tracked := func(event T) {
		handler(event)

		bus.storeMu.RLock()
		pos := bus.storePosition
		bus.storeMu.RUnlock()

		// Best effort: the event has already been handled.
		_ = bus.store.SaveSubscriptionPosition(ctx, subscriptionID, pos)
	}

	return Subscribe(bus, tracked, opts...)
}

// MemoryStore is a simple in-memory implementation of EventStore.
type MemoryStore struct {
	events        []*StoredEvent
	subscriptions map[string]int64
	mu            sync.RWMutex
}

// Compile-time check that MemoryStore satisfies EventStore.
var _ EventStore = (*MemoryStore)(nil)

// NewMemoryStore creates a new in-memory event store.
func NewMemoryStore() *MemoryStore {
	return &MemoryStore{
		events:        make([]*StoredEvent, 0),
		subscriptions: make(map[string]int64),
	}
}

// Save implements EventStore. It overwrites event.Position with the next
// sequential position (starting at 1) and keeps the given pointer, so later
// changes to *event by the caller are visible through Load. A nil event is
// rejected with an error.
func (m *MemoryStore) Save(ctx context.Context, event *StoredEvent) error {
	if event == nil {
		return fmt.Errorf("eventbus: event cannot be nil")
	}

	m.mu.Lock()
	defer m.mu.Unlock()

	event.Position = int64(len(m.events)) + 1
	m.events = append(m.events, event)
	return nil
}

// Load implements EventStore. It returns the events whose position lies in
// [from, to]; a to of -1 means no upper bound.
func (m *MemoryStore) Load(ctx context.Context, from, to int64) ([]*StoredEvent, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()

	var result []*StoredEvent
	for _, event := range m.events {
		if event.Position < from {
			continue
		}
		if to != -1 && event.Position > to {
			continue
		}
		result = append(result, event)
	}
	return result, nil
}

// GetPosition implements EventStore. It returns the position of the most
// recently saved event, or 0 when the store is empty.
func (m *MemoryStore) GetPosition(ctx context.Context) (int64, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()

	if len(m.events) == 0 {
		return 0, nil
	}
	return m.events[len(m.events)-1].Position, nil
}

// SaveSubscriptionPosition implements EventStore.
func (m *MemoryStore) SaveSubscriptionPosition(ctx context.Context, subscriptionID string, position int64) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	m.subscriptions[subscriptionID] = position
	return nil
}

// LoadSubscriptionPosition implements EventStore. An unknown subscription ID
// yields position 0 and no error.
func (m *MemoryStore) LoadSubscriptionPosition(ctx context.Context, subscriptionID string) (int64, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()

	// A missing key yields the zero value, which is the documented result.
	return m.subscriptions[subscriptionID], nil
}
package eventbus

import (
	"encoding/json"
	"fmt"
	"reflect"
	"sync"
)

// UpcastFunc transforms event data from one version to another.
// It receives the raw JSON data and returns transformed data with the new
// type name.
type UpcastFunc func(data json.RawMessage) (json.RawMessage, string, error)

// UpcastErrorHandler is called when an upcast operation fails.
type UpcastErrorHandler func(eventType string, data json.RawMessage, err error)

// Upcaster represents a transformation from one event type to another.
type Upcaster struct {
	FromType string     // Source event type
	ToType   string     // Target event type
	Upcast   UpcastFunc // Transformation function
}

// upcastRegistry manages all registered upcasters.
type upcastRegistry struct {
	// upcasters maps a source type to the upcasters registered for it, in
	// registration order.
	upcasters    map[string][]Upcaster
	mu           sync.RWMutex
	errorHandler UpcastErrorHandler
}

// newUpcastRegistry creates a new, empty upcast registry.
func newUpcastRegistry() *upcastRegistry {
	return &upcastRegistry{
		upcasters: make(map[string][]Upcaster),
	}
}

// register adds an upcaster from fromType to toType.
//
// It returns an error when either type name is empty, when both are equal,
// when upcast is nil, or when the registered upcasters already lead from
// toType back to fromType.
func (r *upcastRegistry) register(fromType, toType string, upcast UpcastFunc) error {
	switch {
	case fromType == "" || toType == "":
		return fmt.Errorf("eventbus: upcast types cannot be empty")
	case fromType == toType:
		return fmt.Errorf("eventbus: cannot upcast type to itself")
	case upcast == nil:
		return fmt.Errorf("eventbus: upcast function cannot be nil")
	}

	r.mu.Lock()
	defer r.mu.Unlock()

	if r.wouldCreateCycle(fromType, toType) {
		return fmt.Errorf("eventbus: upcast would create circular dependency")
	}

	r.upcasters[fromType] = append(r.upcasters[fromType], Upcaster{
		FromType: fromType,
		ToType:   toType,
		Upcast:   upcast,
	})
	return nil
}

// wouldCreateCycle reports whether the registered upcasters already form a
// path from toType back to fromType. The caller must hold r.mu.
func (r *upcastRegistry) wouldCreateCycle(fromType, toType string) bool {
	return r.hasCycleDFS(toType, fromType, make(map[string]bool))
}

// hasCycleDFS reports whether target is reachable from current by following
// registered upcasters depth-first. visited prevents revisiting a type.
func (r *upcastRegistry) hasCycleDFS(current, target string, visited map[string]bool) bool {
	if current == target {
		return true
	}
	if visited[current] {
		return false
	}
	visited[current] = true

	for _, upcaster := range r.upcasters[current] {
		if r.hasCycleDFS(upcaster.ToType, target, visited) {
			return true
		}
	}
	return false
}

// apply upcasts data step by step until it reaches a type that has no
// registered upcaster, and returns the final data and type name.
//
// At every step the first upcaster registered for the current type is used.
// On any failure the original data and eventType are returned together with
// the error; a failing upcast function is also reported to the error
// handler, if one is set.
//
// The chain continues from the type name the upcast function returns, which
// for functions registered through RegisterUpcastFunc or WithUpcast need not
// equal the declared ToType. Both the declared and the returned type are
// therefore checked against the types already visited, so a function that
// returns an earlier type cannot make the chain run forever.
func (r *upcastRegistry) apply(data json.RawMessage, eventType string) (json.RawMessage, string, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()

	// Fast path: nothing is registered for this type.
	if len(r.upcasters[eventType]) == 0 {
		return data, eventType, nil
	}

	currentData, currentType := data, eventType
	visited := make(map[string]bool)

	for {
		visited[currentType] = true

		candidates := r.upcasters[currentType]
		if len(candidates) == 0 {
			break // Reached the latest version.
		}

		// Use the first registered upcaster (could be enhanced to choose the
		// best path).
		upcaster := candidates[0]
		if visited[upcaster.ToType] {
			return data, eventType, fmt.Errorf("eventbus: upcast loop detected")
		}

		newData, newType, err := upcaster.Upcast(currentData)
		if err != nil {
			if r.errorHandler != nil {
				r.errorHandler(currentType, currentData, err)
			}
			return data, eventType, fmt.Errorf("eventbus: upcast failed from %s to %s: %w", upcaster.FromType, upcaster.ToType, err)
		}
		if visited[newType] {
			return data, eventType, fmt.Errorf("eventbus: upcast loop detected")
		}

		currentData, currentType = newData, newType
	}

	return currentData, currentType, nil
}

// clear removes all registered upcasters.
func (r *upcastRegistry) clear() {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.upcasters = make(map[string][]Upcaster)
}

// clearType removes all upcasters for a specific source type.
func (r *upcastRegistry) clearType(eventType string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.upcasters, eventType)
}

// RegisterUpcast registers a type-safe upcast function.
//
// The stored data is JSON-decoded into From, converted with upcast, and
// JSON-encoded again; the type names are those reported by
// reflect.Type.String for From and To. It returns an error when bus or
// upcast is nil or when the registry rejects the registration.
func RegisterUpcast[From any, To any](bus *EventBus, upcast func(From) To) error {
	if bus == nil {
		return fmt.Errorf("eventbus: bus cannot be nil")
	}
	if upcast == nil {
		return fmt.Errorf("eventbus: upcast function cannot be nil")
	}

	fromType := reflect.TypeOf((*From)(nil)).Elem().String()
	toType := reflect.TypeOf((*To)(nil)).Elem().String()

	convert := func(data json.RawMessage) (json.RawMessage, string, error) {
		var from From
		if err := json.Unmarshal(data, &from); err != nil {
			return nil, "", fmt.Errorf("unmarshal source: %w", err)
		}

		newData, err := json.Marshal(upcast(from))
		if err != nil {
			return nil, "", fmt.Errorf("marshal target: %w", err)
		}

		return newData, toType, nil
	}

	return bus.upcastRegistry.register(fromType, toType, convert)
}

// RegisterUpcastFunc registers a raw upcast function for complex
// transformations. It returns an error when bus is nil or when the registry
// rejects the registration.
func RegisterUpcastFunc(bus *EventBus, fromType, toType string, upcast UpcastFunc) error {
	if bus == nil {
		return fmt.Errorf("eventbus: bus cannot be nil")
	}
	return bus.upcastRegistry.register(fromType, toType, upcast)
}

// WithUpcast adds an upcast function during bus creation.
//
// An Option cannot report errors, so an invalid registration (empty or equal
// type names, nil function, circular dependency) is dropped silently. Use
// RegisterUpcastFunc after construction when the error matters.
func WithUpcast(fromType, toType string, upcast UpcastFunc) Option {
	return func(bus *EventBus) {
		_ = bus.upcastRegistry.register(fromType, toType, upcast)
	}
}

// WithUpcastErrorHandler sets the error handler for upcast failures.
func WithUpcastErrorHandler(handler UpcastErrorHandler) Option {
	return func(bus *EventBus) {
		bus.upcastRegistry.errorHandler = handler
	}
}

// SetUpcastErrorHandler sets the upcast error handler at runtime.
func (bus *EventBus) SetUpcastErrorHandler(handler UpcastErrorHandler) {
	bus.upcastRegistry.errorHandler = handler
}

// ClearUpcasts removes all registered upcasters.
func (bus *EventBus) ClearUpcasts() {
	bus.upcastRegistry.clear()
}

// ClearUpcastsForType removes all upcasters for a specific source type.
func (bus *EventBus) ClearUpcastsForType(eventType string) {
	bus.upcastRegistry.clearType(eventType)
}