package showbridge
import (
"errors"
"reflect"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
"github.com/jwetzell/showbridge-go/internal/route"
)
// GetRunningConfig returns the config value the router is currently running
// with. The value is read while holding the running-config read lock.
func (r *Router) GetRunningConfig() config.Config {
	r.runningConfigMu.RLock()
	current := r.runningConfig
	r.runningConfigMu.RUnlock()
	return current
}
// UpdateConfig replaces the running config with newConfig and rebuilds every
// module and route from it.
//
// Only one update runs at a time: when the running-config lock cannot be taken
// immediately an error is returned and nothing is changed. Otherwise the API
// server is restarted if its section differs, all current modules are stopped
// and waited on, and modules and routes are re-created from newConfig.
// Declarations that fail to build are skipped and reported in the returned
// slices (nil when nothing failed); newConfig is still applied. When
// triggerChangeChan is true, newConfig is also sent on r.ConfigChange.
func (r *Router) UpdateConfig(newConfig config.Config, triggerChangeChan bool) ([]config.ModuleError, []config.RouteError, error) {
	if acquired := r.runningConfigMu.TryLock(); !acquired {
		return nil, nil, errors.New("config update in progress")
	}
	defer r.runningConfigMu.Unlock()

	previous := r.runningConfig
	r.logger.Debug("received config update", "oldConfig", previous, "newConfig", newConfig)

	apiUnchanged := reflect.DeepEqual(previous.Api, newConfig.Api)
	if !apiUnchanged {
		r.logger.Info("applying new API config")
		r.apiServer.Stop()
		r.apiServer.Start(newConfig.Api)
		r.runningConfig.Api = newConfig.Api
	}

	// TODO(jwetzell): handle config update errors better
	for _, running := range r.ModuleInstances {
		running.Stop()
	}
	r.logger.Debug("waiting for modules to exit")
	r.moduleWait.Wait()

	// Start from a clean slate before re-creating everything.
	r.ModuleInstances = make(map[string]common.Module)
	r.RouteInstances = []*route.Route{}

	// The error slices stay nil unless at least one declaration fails.
	var moduleErrors []config.ModuleError
	for idx, decl := range newConfig.Modules {
		if err := r.addModule(decl); err != nil {
			moduleErrors = append(moduleErrors, config.ModuleError{
				Index:  idx,
				Config: decl,
				Error:  err.Error(),
			})
		}
	}

	var routeErrors []config.RouteError
	for idx, decl := range newConfig.Routes {
		if err := r.addRoute(decl); err != nil {
			routeErrors = append(routeErrors, config.RouteError{
				Index:  idx,
				Config: decl,
				Error:  err.Error(),
			})
		}
	}

	r.runningConfig = newConfig
	r.startModules()

	if triggerChangeChan {
		r.ConfigChange <- newConfig
	}
	return moduleErrors, routeErrors, nil
}
package showbridge
import (
"time"
"github.com/jwetzell/showbridge-go/internal/common"
)
// HandleEvent reacts to an event received from sender. A "ping" event is
// answered with a "pong" event carrying the current Unix time in milliseconds,
// sent only to sender; any other event type is logged as unknown.
func (r *Router) HandleEvent(event common.Event, sender common.EventDestination) {
	if event.Type != "ping" {
		r.logger.Warn("unknown event type", "eventType", event.Type)
		return
	}

	pong := common.Event{
		Type: "pong",
		Data: map[string]any{
			"timestamp": time.Now().UnixMilli(),
		},
	}
	r.unicastEvent(pong, sender)
}
// AddEventDestination registers dest in the router's list of event
// destinations, under the destinations lock.
func (r *Router) AddEventDestination(dest common.EventDestination) {
	r.eventDestinationsMu.Lock()
	r.eventDestinations = append(r.eventDestinations, dest)
	r.eventDestinationsMu.Unlock()
}
// RemoveEventDestination removes the first registered destination for which
// Is(dest) reports true. It does nothing when no destination matches.
func (r *Router) RemoveEventDestination(dest common.EventDestination) {
	r.eventDestinationsMu.Lock()
	defer r.eventDestinationsMu.Unlock()

	for i, candidate := range r.eventDestinations {
		if !candidate.Is(dest) {
			continue
		}
		last := len(r.eventDestinations) - 1
		// Shift the tail down over the removed entry, then clear the vacated
		// slot so the backing array does not keep the last destination alive
		// after the slice is shortened.
		copy(r.eventDestinations[i:], r.eventDestinations[i+1:])
		r.eventDestinations[last] = nil
		r.eventDestinations = r.eventDestinations[:last]
		return
	}
}
// unicastEvent sends event to a single destination and logs a send failure
// instead of returning it.
func (r *Router) unicastEvent(event common.Event, dest common.EventDestination) {
	if sendErr := dest.Send(event); sendErr != nil {
		r.logger.Error("failed to send event", "error", sendErr)
	}
}
// broadcastEvent sends event to every registered destination while holding the
// destinations lock. A failed send is logged and does not stop delivery to the
// remaining destinations.
func (r *Router) broadcastEvent(event common.Event) {
	r.eventDestinationsMu.Lock()
	defer r.eventDestinationsMu.Unlock()

	for i := range r.eventDestinations {
		if sendErr := r.eventDestinations[i].Send(event); sendErr != nil {
			r.logger.Error("failed to send event", "error", sendErr)
		}
	}
}
package api
import (
"context"
_ "embed"
"encoding/json"
"fmt"
"io"
"log/slog"
"net/http"
"sync"
"time"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
"github.com/jwetzell/showbridge-go/internal/schema"
)
// ApiServer is the HTTP API component: it serves the websocket, health, config
// and schema endpoints registered in Start and forwards config and event
// requests to the routers it was constructed with.
type ApiServer struct {
// config is the ApiConfig passed to the most recent Start call.
config config.ApiConfig
// serverMu is held by Start and Stop while they replace or shut down server.
serverMu sync.Mutex
// server is the running HTTP server; Stop resets it to nil.
server *http.Server
logger *slog.Logger
// configurableRouter backs the /api/v1/config endpoint.
configurableRouter config.Configurable
// eventRouter receives websocket events and destination registrations.
eventRouter common.EventRouter
}
// NewApiServer builds an ApiServer wired to the given routers. The server is
// not listening until Start is called. Its logger is derived from the default
// slog logger and tagged with component "api".
func NewApiServer(configurableRouter config.Configurable, eventRouter common.EventRouter) *ApiServer {
	apiServer := new(ApiServer)
	apiServer.configurableRouter = configurableRouter
	apiServer.eventRouter = eventRouter
	apiServer.logger = slog.Default().With("component", "api")
	return apiServer
}
// Start records config and, when the API is enabled, begins serving the API
// endpoints on the configured port in a background goroutine. When the API is
// disabled it only logs a warning. Listen errors other than
// http.ErrServerClosed are logged from the serving goroutine.
func (as *ApiServer) Start(config config.ApiConfig) {
	as.config = config

	if !as.config.Enabled {
		as.logger.Warn("not enabled")
		return
	}
	as.logger.Debug("starting", "port", as.config.Port)

	mux := http.NewServeMux()
	mux.HandleFunc("/ws", as.handleWebsocket)
	mux.HandleFunc("/health", as.handleHealthHTTP)
	mux.HandleFunc("/api/v1/config", as.handleConfigHTTP)
	mux.HandleFunc("/schema/config.schema.json", handleConfigSchema)
	mux.HandleFunc("/schema/routes.schema.json", handleRoutesSchema)
	mux.HandleFunc("/schema/modules.schema.json", handleModulesSchema)
	mux.HandleFunc("/schema/processors.schema.json", handleProcessorsSchema)

	server := &http.Server{
		Addr:              fmt.Sprintf(":%d", as.config.Port),
		ReadHeaderTimeout: 5 * time.Second,
		Handler:           mux,
	}

	as.serverMu.Lock()
	as.server = server
	as.serverMu.Unlock()

	// The goroutine uses the local server value rather than as.server: Stop
	// sets as.server to nil, and reading the field here without the lock could
	// dereference nil if Stop runs before this goroutine is scheduled.
	go func() {
		err := server.ListenAndServe()
		if err != nil && err != http.ErrServerClosed {
			as.logger.Error("server error", "error", err)
		}
	}()
}
// Stop shuts the HTTP server down, allowing up to five seconds for in-flight
// requests, and clears the server reference. It is a no-op when no server is
// running. Shutdown errors are logged, not returned.
func (as *ApiServer) Stop() {
	// The nil check happens under the lock; reading as.server before locking
	// would race with Start and with a concurrent Stop.
	as.serverMu.Lock()
	defer as.serverMu.Unlock()

	if as.server == nil {
		return
	}

	apiShutdownCtx, apiShutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer apiShutdownCancel()

	if err := as.server.Shutdown(apiShutdownCtx); err != nil {
		as.logger.Error("error shutting down server", "error", err)
	}
	as.server = nil
	as.logger.Debug("done")
}
// handleHealthHTTP is the liveness endpoint. GET and OPTIONS answer 200 (the
// latter with CORS preflight headers); any other method answers 405. Every
// response allows any origin.
func (as *ApiServer) handleHealthHTTP(w http.ResponseWriter, req *http.Request) {
	headers := w.Header()
	headers.Set("Access-Control-Allow-Origin", "*")

	status := http.StatusOK
	switch req.Method {
	case http.MethodGet:
		// nothing beyond the status code
	case http.MethodOptions:
		headers.Set("Access-Control-Allow-Methods", "GET, OPTIONS")
		headers.Set("Access-Control-Allow-Headers", "Content-Type")
	default:
		status = http.StatusMethodNotAllowed
	}
	w.WriteHeader(status)
}
// handleConfigHTTP serves the running config and accepts config updates.
//
//   - GET returns the running config as JSON.
//   - PUT reads a JSON config, applies schema defaults, validates it, and hands
//     it to the router. Malformed or invalid input answers 400, a rejected
//     update (error from UpdateConfig) answers 409, and per-module/per-route
//     failures answer 400 with a JSON body listing them.
//   - OPTIONS answers the CORS preflight.
//   - Any other method answers 405.
func (as *ApiServer) handleConfigHTTP(w http.ResponseWriter, req *http.Request) {
	// Set the CORS origin header up front so that error responses written via
	// http.Error carry it too; without it a browser client cannot read the
	// error body of a cross-origin request.
	w.Header().Set("Access-Control-Allow-Origin", "*")

	switch req.Method {
	case http.MethodGet:
		configJSON, err := json.Marshal(as.configurableRouter.GetRunningConfig())
		if err != nil {
			http.Error(w, "Internal server error", http.StatusInternalServerError)
			return
		}
		w.Header().Set("Content-Type", "application/json")
		if _, err := w.Write(configJSON); err != nil {
			as.logger.Error("error writing config response", "error", err)
		}
	case http.MethodPut:
		//TODO(jwetzell): again way too much marshaling
		cfgBytes, err := io.ReadAll(req.Body)
		if err != nil {
			http.Error(w, "Bad request", http.StatusBadRequest)
			return
		}

		// Decode into a generic map first so defaults and schema validation
		// run on the raw document before it is bound to config.Config.
		cfgMap := make(map[string]any)
		if err := json.Unmarshal(cfgBytes, &cfgMap); err != nil {
			http.Error(w, "Bad request", http.StatusBadRequest)
			return
		}
		if err := schema.ApplyDefaults(&cfgMap); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		if err := schema.ValidateConfig(cfgMap); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}

		validCfgBytes, err := json.Marshal(cfgMap)
		if err != nil {
			http.Error(w, "Internal server error", http.StatusInternalServerError)
			return
		}
		var newConfig config.Config
		if err := json.Unmarshal(validCfgBytes, &newConfig); err != nil {
			http.Error(w, "Bad request", http.StatusBadRequest)
			return
		}

		moduleErrors, routeErrors, err := as.configurableRouter.UpdateConfig(newConfig, true)
		if err != nil {
			http.Error(w, err.Error(), http.StatusConflict)
			return
		}
		if len(moduleErrors) == 0 && len(routeErrors) == 0 {
			w.WriteHeader(http.StatusOK)
			return
		}

		errorResponse := struct {
			ModuleErrors []config.ModuleError `json:"moduleErrors,omitempty"`
			RouteErrors  []config.RouteError  `json:"routeErrors,omitempty"`
		}{
			ModuleErrors: moduleErrors,
			RouteErrors:  routeErrors,
		}
		errorResponseJSON, err := json.Marshal(errorResponse)
		if err != nil {
			http.Error(w, "Internal server error", http.StatusInternalServerError)
			return
		}
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusBadRequest)
		if _, err := w.Write(errorResponseJSON); err != nil {
			as.logger.Error("error writing config error response", "error", err)
		}
	case http.MethodOptions:
		w.Header().Set("Access-Control-Allow-Methods", "GET, PUT, OPTIONS")
		w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
		w.WriteHeader(http.StatusOK)
	default:
		w.WriteHeader(http.StatusMethodNotAllowed)
	}
}
// handleConfigSchema serves schema.ConfigSchema as JSON. GET returns the
// marshaled schema, OPTIONS answers the CORS preflight, and any other method
// answers 405.
func handleConfigSchema(w http.ResponseWriter, req *http.Request) {
	if req.Method == http.MethodGet {
		body, err := json.Marshal(schema.ConfigSchema)
		if err != nil {
			http.Error(w, "Internal server error", http.StatusInternalServerError)
			return
		}
		headers := w.Header()
		headers.Set("Access-Control-Allow-Origin", "*")
		headers.Set("Content-Type", "application/json")
		w.Write(body)
		return
	}

	headers := w.Header()
	headers.Set("Access-Control-Allow-Origin", "*")
	if req.Method != http.MethodOptions {
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}
	headers.Set("Access-Control-Allow-Methods", "GET, OPTIONS")
	headers.Set("Access-Control-Allow-Headers", "Content-Type")
	w.WriteHeader(http.StatusOK)
}
// handleRoutesSchema serves schema.RoutesConfigSchema as JSON. GET returns the
// marshaled schema, OPTIONS answers the CORS preflight, and any other method
// answers 405.
func handleRoutesSchema(w http.ResponseWriter, req *http.Request) {
	if req.Method == http.MethodGet {
		body, err := json.Marshal(schema.RoutesConfigSchema)
		if err != nil {
			http.Error(w, "Internal server error", http.StatusInternalServerError)
			return
		}
		headers := w.Header()
		headers.Set("Access-Control-Allow-Origin", "*")
		headers.Set("Content-Type", "application/json")
		w.Write(body)
		return
	}

	headers := w.Header()
	headers.Set("Access-Control-Allow-Origin", "*")
	if req.Method != http.MethodOptions {
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}
	headers.Set("Access-Control-Allow-Methods", "GET, OPTIONS")
	headers.Set("Access-Control-Allow-Headers", "Content-Type")
	w.WriteHeader(http.StatusOK)
}
// handleModulesSchema serves the result of schema.GetModulesSchema as JSON.
// GET returns the marshaled schema, OPTIONS answers the CORS preflight, and
// any other method answers 405.
func handleModulesSchema(w http.ResponseWriter, req *http.Request) {
	if req.Method == http.MethodGet {
		body, err := json.Marshal(schema.GetModulesSchema())
		if err != nil {
			http.Error(w, "Internal server error", http.StatusInternalServerError)
			return
		}
		headers := w.Header()
		headers.Set("Access-Control-Allow-Origin", "*")
		headers.Set("Content-Type", "application/json")
		w.Write(body)
		return
	}

	headers := w.Header()
	headers.Set("Access-Control-Allow-Origin", "*")
	if req.Method != http.MethodOptions {
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}
	headers.Set("Access-Control-Allow-Methods", "GET, OPTIONS")
	headers.Set("Access-Control-Allow-Headers", "Content-Type")
	w.WriteHeader(http.StatusOK)
}
// handleProcessorsSchema serves the result of schema.GetProcessorsSchema as
// JSON. GET returns the marshaled schema, OPTIONS answers the CORS preflight,
// and any other method answers 405.
func handleProcessorsSchema(w http.ResponseWriter, req *http.Request) {
	if req.Method == http.MethodGet {
		body, err := json.Marshal(schema.GetProcessorsSchema())
		if err != nil {
			http.Error(w, "Internal server error", http.StatusInternalServerError)
			return
		}
		headers := w.Header()
		headers.Set("Access-Control-Allow-Origin", "*")
		headers.Set("Content-Type", "application/json")
		w.Write(body)
		return
	}

	headers := w.Header()
	headers.Set("Access-Control-Allow-Origin", "*")
	if req.Method != http.MethodOptions {
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}
	headers.Set("Access-Control-Allow-Methods", "GET, OPTIONS")
	headers.Set("Access-Control-Allow-Headers", "Content-Type")
	w.WriteHeader(http.StatusOK)
}
package api
import (
"encoding/json"
"net/http"
"github.com/gorilla/websocket"
"github.com/jwetzell/showbridge-go/internal/common"
)
// upgrader turns HTTP requests into websocket connections. Its CheckOrigin
// hook accepts every request, so no Origin filtering is applied.
var upgrader = websocket.Upgrader{
	CheckOrigin: func(*http.Request) bool { return true },
}
// WebsocketEventDestination delivers events to one websocket connection.
type WebsocketEventDestination struct {
	conn *websocket.Conn
}

// Send serialises event to JSON and writes it to the connection as a single
// text message. It returns the serialisation or write error, if any.
func (d WebsocketEventDestination) Send(event common.Event) error {
	payload, err := event.ToJSON()
	if err == nil {
		err = d.conn.WriteMessage(websocket.TextMessage, payload)
	}
	return err
}

// Is reports whether dest is a WebsocketEventDestination wrapping the same
// connection as d.
func (d WebsocketEventDestination) Is(dest common.EventDestination) bool {
	if other, isWebsocket := dest.(WebsocketEventDestination); isWebsocket {
		return other.conn == d.conn
	}
	return false
}
// handleWebsocket upgrades the request to a websocket, registers the
// connection as an event destination, and forwards every JSON message it
// receives to the event router until the connection fails or is closed. The
// destination is unregistered and the connection closed on return.
func (as *ApiServer) handleWebsocket(w http.ResponseWriter, req *http.Request) {
	conn, err := upgrader.Upgrade(w, req, nil)
	if err != nil {
		as.logger.Error("websocket upgrade error", "error", err)
		return
	}
	defer conn.Close()

	eventDestination := WebsocketEventDestination{conn: conn}
	as.eventRouter.AddEventDestination(eventDestination)
	// Deferred so the destination is always removed (before conn.Close runs),
	// including when a handler panics.
	defer as.eventRouter.RemoveEventDestination(eventDestination)

	for {
		messageType, message, err := conn.ReadMessage()
		if err != nil {
			// Read errors are permanent for a connection: every later read
			// returns the same error, so the loop must end on ANY error, not
			// only on close errors. Previously a non-close error (e.g. a
			// dropped TCP connection) left the handler spinning.
			if _, isClose := err.(*websocket.CloseError); !isClose {
				as.logger.Warn("websocket read error", "error", err)
			}
			return
		}

		switch messageType {
		case websocket.TextMessage, websocket.BinaryMessage:
			event := common.Event{}
			if err := json.Unmarshal(message, &event); err != nil {
				as.logger.Error("websocket message unmarshal error", "error", err)
				continue
			}
			as.eventRouter.HandleEvent(event, eventDestination)
		default:
			// Control frames (close/ping/pong) are consumed by the connection's
			// own handlers and are not expected here.
			as.logger.Warn("unsupported websocket message type", "type", messageType)
		}
	}
}
package common
import (
"math"
"reflect"
)
// GetAnyAs returns value as a T when its dynamic type is exactly T. Otherwise
// it returns the zero value of T and false.
func GetAnyAs[T any](value any) (T, bool) {
	if typed, matches := value.(T); matches {
		return typed, true
	}
	var zero T
	return zero, false
}
// GetAnyAsInt converts value to an int when that can be done without losing
// information.
//
// Accepted dynamic types are int, uint, byte, float32 and float64. A uint must
// fit in an int; a float must be a whole number inside the int range (NaN and
// infinities are rejected). Any other type, or a value that fails these
// checks, yields (0, false).
func GetAnyAsInt(value any) (int, bool) {
	var f float64
	switch v := value.(type) {
	case int:
		return v, true
	case uint:
		// A plain conversion would wrap large values to negative ints.
		if v > math.MaxInt {
			return 0, false
		}
		return int(v), true
	case byte:
		return int(v), true
	case float32:
		f = float64(v)
	case float64:
		f = v
	default:
		return 0, false
	}

	// NaN fails the Floor comparison. The range test uses MinInt on both sides
	// because -MinInt (2^63 or 2^31) is exactly representable as a float64
	// while MaxInt is not.
	if f != math.Floor(f) || f < float64(math.MinInt) || f >= -float64(math.MinInt) {
		return 0, false
	}
	return int(f), true
}
// GetAnyAsByte converts value to a byte when that can be done without losing
// information.
//
// Accepted dynamic types are byte, int, uint, float32 and float64. The value
// must lie in [0, 255] and, for floats, be a whole number. Any other type, or
// a value outside that range, yields (0, false) instead of being truncated.
func GetAnyAsByte(value any) (byte, bool) {
	var f float64
	switch v := value.(type) {
	case byte:
		return v, true
	case int:
		if v < 0 || v > math.MaxUint8 {
			return 0, false
		}
		return byte(v), true
	case uint:
		if v > math.MaxUint8 {
			return 0, false
		}
		return byte(v), true
	case float32:
		f = float64(v)
	case float64:
		f = v
	default:
		return 0, false
	}

	// NaN fails the Floor comparison; infinities fail the range test.
	if f != math.Floor(f) || f < 0 || f > math.MaxUint8 {
		return 0, false
	}
	return byte(f), true
}
// GetAnyAsByteSlice converts value to a []byte. A []byte is returned as is;
// any other slice or array is converted element by element with GetAnyAsByte.
// It returns (nil, false) when value is not a slice or array, or when any
// element cannot be converted.
func GetAnyAsByteSlice(value any) ([]byte, bool) {
	if direct, isBytes := value.([]byte); isBytes {
		return direct, true
	}

	rv := reflect.ValueOf(value)
	switch rv.Kind() {
	case reflect.Slice, reflect.Array:
		// convertible container, handled below
	default:
		return nil, false
	}

	converted := make([]byte, rv.Len())
	for i := range converted {
		b, ok := GetAnyAsByte(rv.Index(i).Interface())
		if !ok {
			return nil, false
		}
		converted[i] = b
	}
	return converted, true
}
// GetAnyAsIntSlice converts value to a []int. A []int is returned as is; any
// other slice or array is converted element by element with GetAnyAsInt. It
// returns (nil, false) when value is not a slice or array, or when any element
// cannot be converted.
func GetAnyAsIntSlice(value any) ([]int, bool) {
	if direct, isInts := value.([]int); isInts {
		return direct, true
	}

	rv := reflect.ValueOf(value)
	switch rv.Kind() {
	case reflect.Slice, reflect.Array:
		// convertible container, handled below
	default:
		return nil, false
	}

	converted := make([]int, rv.Len())
	for i := range converted {
		n, ok := GetAnyAsInt(rv.Index(i).Interface())
		if !ok {
			return nil, false
		}
		converted[i] = n
	}
	return converted, true
}
// GetAnyAsFloat32 converts value to a float32. It accepts float32, float64,
// int, uint and byte values; every other dynamic type yields (0, false).
func GetAnyAsFloat32(value any) (float32, bool) {
	switch v := value.(type) {
	case float32:
		return v, true
	case float64:
		return float32(v), true
	case int:
		return float32(v), true
	case uint:
		return float32(v), true
	case byte:
		return float32(v), true
	default:
		return 0, false
	}
}
// GetAnyAsFloat64 converts value to a float64. It accepts float64, float32,
// int, uint and byte values; every other dynamic type yields (0, false).
func GetAnyAsFloat64(value any) (float64, bool) {
	switch v := value.(type) {
	case float64:
		return v, true
	case float32:
		return float64(v), true
	case int:
		return float64(v), true
	case uint:
		return float64(v), true
	case byte:
		return float64(v), true
	default:
		return 0, false
	}
}
package common
import (
"encoding/json"
)
// Event is the message exchanged with event destinations. Type names the
// event; Data and Error are optional and are left out of the JSON form when
// empty.
type Event struct {
	Type  string `json:"type"`
	Data  any    `json:"data,omitempty"`
	Error string `json:"error,omitempty"`
}

// ToJSON returns the JSON encoding of the event.
func (e Event) ToJSON() ([]byte, error) {
	encoded, err := json.Marshal(e)
	return encoded, err
}
// EventDestination is something an Event can be delivered to.
type EventDestination interface {
// Send delivers event to the destination and reports any delivery error.
Send(event Event) error
// Is reports whether dest refers to the same destination as the receiver.
Is(dest EventDestination) bool
}
// EventRouter handles incoming events and keeps track of the destinations that
// events can be sent to.
type EventRouter interface {
// HandleEvent processes event, which was received from source.
HandleEvent(event Event, source EventDestination)
// AddEventDestination registers dest with the router.
AddEventDestination(dest EventDestination)
// RemoveEventDestination unregisters dest from the router.
RemoveEventDestination(dest EventDestination)
}
package config
import (
"errors"
"github.com/jwetzell/showbridge-go/internal/common"
)
// Params is a string-keyed set of untyped parameter values. Its Get* methods
// return a value converted to a concrete type, or one of the ErrParam* errors.
type Params map[string]any
// Sentinel errors returned by the Params accessors.
var (
// ErrParamNotFound is returned when the requested key is absent.
ErrParamNotFound = errors.New("not found")
ErrParamNotString = errors.New("not a string")
// ErrParamNotNumber is returned by GetInt, GetFloat32 and GetFloat64 when the
// value cannot be converted.
ErrParamNotNumber = errors.New("not a number")
// NOTE(review): ErrParamNotInteger is not returned by any accessor in this
// file (GetInt reports ErrParamNotNumber) — confirm whether it is used elsewhere.
ErrParamNotInteger = errors.New("not an integer")
ErrParamNotBool = errors.New("not a boolean")
// ErrParamNotSlice is returned by GetStringSlice when the value is not a []any.
ErrParamNotSlice = errors.New("not a slice")
// ErrParamNotStringSlice is returned by GetStringSlice when an element is not a string.
ErrParamNotStringSlice = errors.New("not a string slice")
ErrParamNotByteSlice = errors.New("not a byte slice")
ErrParamNotIntSlice = errors.New("not an int slice")
)
// GetString returns the string stored under key. It returns ErrParamNotFound
// when key is absent and ErrParamNotString when the value is not a string.
func (p Params) GetString(key string) (string, error) {
	raw, present := p[key]
	if !present {
		return "", ErrParamNotFound
	}
	if text, isString := raw.(string); isString {
		return text, nil
	}
	return "", ErrParamNotString
}
// GetInt returns the value stored under key converted with
// common.GetAnyAsInt. It returns ErrParamNotFound when key is absent and
// ErrParamNotNumber when the conversion fails.
func (p Params) GetInt(key string) (int, error) {
	raw, present := p[key]
	if !present {
		return 0, ErrParamNotFound
	}
	number, converted := common.GetAnyAsInt(raw)
	if !converted {
		return 0, ErrParamNotNumber
	}
	return number, nil
}
// GetFloat32 returns the value stored under key converted with
// common.GetAnyAsFloat32. It returns ErrParamNotFound when key is absent and
// ErrParamNotNumber when the conversion fails.
func (p Params) GetFloat32(key string) (float32, error) {
	raw, present := p[key]
	if !present {
		return 0, ErrParamNotFound
	}
	number, converted := common.GetAnyAsFloat32(raw)
	if !converted {
		return 0, ErrParamNotNumber
	}
	return number, nil
}
// GetFloat64 returns the value stored under key converted with
// common.GetAnyAsFloat64. It returns ErrParamNotFound when key is absent and
// ErrParamNotNumber when the conversion fails.
func (p Params) GetFloat64(key string) (float64, error) {
	raw, present := p[key]
	if !present {
		return 0, ErrParamNotFound
	}
	number, converted := common.GetAnyAsFloat64(raw)
	if !converted {
		return 0, ErrParamNotNumber
	}
	return number, nil
}
// GetBool returns the bool stored under key. It returns ErrParamNotFound when
// key is absent and ErrParamNotBool when the value is not a bool.
func (p Params) GetBool(key string) (bool, error) {
	raw, present := p[key]
	if !present {
		return false, ErrParamNotFound
	}
	if flag, isBool := raw.(bool); isBool {
		return flag, nil
	}
	return false, ErrParamNotBool
}
// GetStringSlice returns the value stored under key as a []string. The stored
// value must be a []any whose elements are all strings. It returns
// ErrParamNotFound when key is absent, ErrParamNotSlice when the value is not
// a []any, and ErrParamNotStringSlice when an element is not a string.
func (p Params) GetStringSlice(key string) ([]string, error) {
	raw, present := p[key]
	if !present {
		return nil, ErrParamNotFound
	}
	items, isSlice := raw.([]any)
	if !isSlice {
		return nil, ErrParamNotSlice
	}

	result := make([]string, 0, len(items))
	for _, item := range items {
		text, isString := item.(string)
		if !isString {
			return nil, ErrParamNotStringSlice
		}
		result = append(result, text)
	}
	return result, nil
}
// GetIntSlice returns the value stored under key converted with
// common.GetAnyAsIntSlice. It returns ErrParamNotFound when key is absent and
// ErrParamNotIntSlice when the conversion fails.
func (p Params) GetIntSlice(key string) ([]int, error) {
	raw, present := p[key]
	if !present {
		return nil, ErrParamNotFound
	}
	if ints, converted := common.GetAnyAsIntSlice(raw); converted {
		return ints, nil
	}
	return nil, ErrParamNotIntSlice
}
// GetByteSlice returns the value stored under key converted with
// common.GetAnyAsByteSlice. It returns ErrParamNotFound when key is absent and
// ErrParamNotByteSlice when the conversion fails.
func (p Params) GetByteSlice(key string) ([]byte, error) {
	raw, present := p[key]
	if !present {
		return nil, ErrParamNotFound
	}
	if data, converted := common.GetAnyAsByteSlice(raw); converted {
		return data, nil
	}
	return nil, ErrParamNotByteSlice
}
package framer
// Framer splits an incoming byte stream into messages and wraps outgoing
// messages in the matching framing.
type Framer interface {
// Decode consumes newly received bytes and returns the complete messages
// found so far.
Decode([]byte) [][]byte
// Encode returns the framed form of a single message.
Encode([]byte) []byte
// Clear discards any partially received data held by the framer.
Clear()
// Buffer returns the bytes received so far that do not yet form a
// complete message.
Buffer() []byte
}
// GetFramer returns a new Framer for the named framing type: "CR", "LF" and
// "CRLF" split on the corresponding separator bytes, "SLIP" uses SLIP framing
// and "RAW" applies no framing. It returns nil for any other name.
func GetFramer(framingType string) Framer {
	switch framingType {
	case "SLIP":
		return newSlipFramer()
	case "RAW":
		return newRawFramer()
	}

	separators := map[string][]byte{
		"CR":   {'\r'},
		"LF":   {'\n'},
		"CRLF": {'\r', '\n'},
	}
	if separator, known := separators[framingType]; known {
		return newByteSeparatorFramer(separator)
	}
	return nil
}
package framer
// rawFramer is the pass-through framer: data is neither split nor wrapped.
type rawFramer struct{}

// newRawFramer returns a ready-to-use rawFramer.
func newRawFramer() *rawFramer {
	return new(rawFramer)
}

// Decode returns data unchanged as the only message.
func (rf *rawFramer) Decode(data []byte) [][]byte {
	frames := make([][]byte, 1)
	frames[0] = data
	return frames
}

// Encode returns data unchanged.
func (rf *rawFramer) Encode(data []byte) []byte {
	return data
}

// Clear does nothing.
func (rf *rawFramer) Clear() {
	// NOTE(jwetzell): no internal state to clear
}

// Buffer always returns an empty, non-nil slice.
func (rf *rawFramer) Buffer() []byte {
	return make([]byte, 0)
}
package framer
import (
"bytes"
)
// byteSeparatorFramer frames messages by terminating each one with a fixed
// separator byte sequence.
type byteSeparatorFramer struct {
	// buffer holds received bytes that are not yet followed by a separator.
	buffer []byte
	// separator is the byte sequence that ends every message.
	separator []byte
}

// newByteSeparatorFramer returns a framer that splits and joins on separator.
func newByteSeparatorFramer(separator []byte) *byteSeparatorFramer {
	return &byteSeparatorFramer{separator: separator, buffer: []byte{}}
}

// Decode appends data to the pending bytes and returns every message that is
// now terminated by the separator (separators stripped). Bytes after the last
// separator stay buffered for the next call. The result is empty, never nil,
// when no message is complete.
func (bsf *byteSeparatorFramer) Decode(data []byte) [][]byte {
	messages := [][]byte{}
	bsf.buffer = append(bsf.buffer, data...)
	parts := bytes.Split(bsf.buffer, bsf.separator)
	if len(parts) > 0 {
		// The final part is whatever follows the last separator: an incomplete
		// message (possibly empty) that must wait for more data.
		bsf.buffer = parts[len(parts)-1]
		messages = parts[:len(parts)-1]
	}
	return messages
}

// Encode returns a new slice holding data followed by the separator. The
// result never shares memory with data: appending directly onto data could
// overwrite bytes in the caller's backing array when data has spare capacity.
func (bsf *byteSeparatorFramer) Encode(data []byte) []byte {
	framed := make([]byte, 0, len(data)+len(bsf.separator))
	framed = append(framed, data...)
	return append(framed, bsf.separator...)
}

// Clear drops any buffered, incomplete message.
func (bsf *byteSeparatorFramer) Clear() {
	bsf.buffer = []byte{}
}

// Buffer returns the bytes received since the last separator.
func (bsf *byteSeparatorFramer) Buffer() []byte {
	return bsf.buffer
}
package framer
// slipFramer frames messages with SLIP: each packet is delimited by END bytes,
// and END/ESC bytes inside a packet are replaced by two-byte escape sequences.
type slipFramer struct {
	// buffer holds the decoded bytes of the packet currently being received.
	buffer []byte
	// escapeNext records that the last byte seen was ESC, so the next byte is
	// the second half of an escape sequence. It is kept on the framer because
	// an escape sequence may be split across two Decode calls.
	escapeNext bool
}

// newSlipFramer returns a slipFramer with no pending data.
func newSlipFramer() *slipFramer {
	return &slipFramer{buffer: []byte{}}
}

// Decode consumes data and returns every packet completed by an END byte, with
// escape sequences resolved. Partial packets (including a trailing ESC) are
// kept for the next call. Empty packets are discarded, and a byte following
// ESC that is neither ESC_END nor ESC_ESC is dropped. The result is empty,
// never nil, when no packet is complete.
func (sf *slipFramer) Decode(data []byte) [][]byte {
	const (
		end    byte = 0xc0
		esc    byte = 0xdb
		escEnd byte = 0xdc
		escEsc byte = 0xdd
	)

	messages := [][]byte{}
	for _, packetByte := range data {
		switch {
		case packetByte == esc:
			sf.escapeNext = true
		case sf.escapeNext:
			switch packetByte {
			case escEnd:
				sf.buffer = append(sf.buffer, end)
			case escEsc:
				sf.buffer = append(sf.buffer, esc)
			}
			sf.escapeNext = false
		case packetByte == end:
			if len(sf.buffer) == 0 {
				// opening END byte, can discard
				continue
			}
			messages = append(messages, sf.buffer)
			sf.buffer = []byte{}
		default:
			sf.buffer = append(sf.buffer, packetByte)
		}
	}
	return messages
}

// Encode returns data as one SLIP packet: a leading END, the payload with END
// and ESC bytes escaped, and a trailing END.
func (sf *slipFramer) Encode(data []byte) []byte {
	const (
		end    byte = 0xc0
		esc    byte = 0xdb
		escEnd byte = 0xdc
		escEsc byte = 0xdd
	)

	// Two delimiters plus the payload; escapes grow the slice as needed.
	encodedBytes := make([]byte, 0, len(data)+2)
	encodedBytes = append(encodedBytes, end)
	for _, byteToEncode := range data {
		switch byteToEncode {
		case end:
			encodedBytes = append(encodedBytes, esc, escEnd)
		case esc:
			encodedBytes = append(encodedBytes, esc, escEsc)
		default:
			encodedBytes = append(encodedBytes, byteToEncode)
		}
	}
	return append(encodedBytes, end)
}

// Clear drops the partially received packet and any pending escape state.
func (sf *slipFramer) Clear() {
	sf.buffer = []byte{}
	sf.escapeNext = false
}

// Buffer returns the decoded bytes of the packet currently being received.
func (sf *slipFramer) Buffer() []byte {
	return sf.buffer
}
package module
import (
"context"
"database/sql"
"fmt"
"log/slog"
"sync"
"github.com/google/jsonschema-go/jsonschema"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
_ "modernc.org/sqlite"
)
func init() {
RegisterModule(ModuleRegistration{
Type: "db.sqlite",
Title: "SQLite Database",
ParamsSchema: &jsonschema.Schema{
Type: "object",
Properties: map[string]*jsonschema.Schema{
"dsn": {
Title: "Data Source Name",
Description: "the data source name (DSN) for the SQLite database",
Type: "string",
MinLength: new(1),
},
},
Required: []string{"dsn"},
AdditionalProperties: &jsonschema.Schema{Not: &jsonschema.Schema{}},
},
New: func(config config.ModuleConfig) (common.Module, error) {
params := config.Params
dsnString, err := params.GetString("dsn")
if err != nil {
return nil, fmt.Errorf("db.sqlite dsn error: %w", err)
}
return &DbSqlite{Dsn: dsnString, config: config, logger: CreateLogger(config)}, nil
},
})
}
// DbSqlite is the "db.sqlite" module: it opens the SQLite database named by
// Dsn when started and exposes it for queries through QueryContext.
type DbSqlite struct {
config config.ModuleConfig
// Dsn is the data source name passed to sql.Open.
Dsn string
// ctx is the module context created in Start; cancel cancels it.
ctx context.Context
inputHandler common.InputHandler
// db is the open database handle; nil until Start has opened it.
db *sql.DB
logger *slog.Logger
// dbMu is held while db is assigned, queried or closed.
dbMu sync.Mutex
cancel context.CancelFunc
}
// Id returns the module id from the module config.
func (dbs *DbSqlite) Id() string {
return dbs.config.Id
}
// Type returns the module type from the module config.
func (dbs *DbSqlite) Type() string {
return dbs.config.Type
}
// Start opens the database and then blocks until the module context is
// cancelled, either through ctx or by Stop. It returns an error, without
// blocking, when the database cannot be opened.
func (dbs *DbSqlite) Start(ctx context.Context, inputHandler common.InputHandler) error {
	dbs.logger.Debug("running")
	dbs.inputHandler = inputHandler

	moduleContext, cancel := context.WithCancel(ctx)
	dbs.ctx = moduleContext
	dbs.cancel = cancel

	db, err := sql.Open("sqlite", dbs.Dsn)
	if err != nil {
		// Release the derived context; nothing will wait on it after a failed start.
		cancel()
		return fmt.Errorf("db.sqlite error opening database: %w", err)
	}

	dbs.dbMu.Lock()
	dbs.db = db
	dbs.dbMu.Unlock()

	<-dbs.ctx.Done()
	dbs.logger.Debug("done")
	return nil
}
// Stop closes the database, if one was opened, and then cancels the module
// context so that Start returns. A failure to close is logged.
func (dbs *DbSqlite) Stop() {
	if dbs.cancel != nil {
		defer dbs.cancel()
	}

	dbs.dbMu.Lock()
	defer dbs.dbMu.Unlock()

	if dbs.db == nil {
		return
	}
	if err := dbs.db.Close(); err != nil {
		dbs.logger.Error("db.sqlite error closing database", "error", err)
	}
}
// QueryContext runs query with args against the module's database while
// holding the database lock. It returns an error when the database has not
// been opened yet.
func (dbs *DbSqlite) QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error) {
	dbs.dbMu.Lock()
	defer dbs.dbMu.Unlock()

	handle := dbs.db
	if handle == nil {
		return nil, fmt.Errorf("database not initialized")
	}
	rows, err := handle.QueryContext(ctx, query, args...)
	return rows, err
}
package module
import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"net/http"
"sync"
"time"
"github.com/google/jsonschema-go/jsonschema"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
"github.com/jwetzell/showbridge-go/internal/processor"
)
// init registers the "http.server" module type. The module takes one required
// integer parameter, "port" (1024-65535 per the schema), and rejects any other
// parameter.
func init() {
	RegisterModule(ModuleRegistration{
		Type:  "http.server",
		Title: "HTTP Server",
		ParamsSchema: &jsonschema.Schema{
			Type: "object",
			Properties: map[string]*jsonschema.Schema{
				"port": {
					Title:       "Port",
					Description: "the port for the HTTP server to listen on",
					Type:        "integer",
					Minimum:     jsonschema.Ptr[float64](1024),
					Maximum:     jsonschema.Ptr[float64](65535),
				},
			},
			Required:             []string{"port"},
			AdditionalProperties: &jsonschema.Schema{Not: &jsonschema.Schema{}},
		},
		New: func(config config.ModuleConfig) (common.Module, error) {
			params := config.Params
			portNum, err := params.GetInt("port")
			if err != nil {
				return nil, fmt.Errorf("http.server port error: %w", err)
			}
			// Guard the uint16 conversion: when the constructor is reached
			// without schema validation, an out-of-range value would otherwise
			// wrap around to an unrelated port.
			if portNum < 1 || portNum > 65535 {
				return nil, fmt.Errorf("http.server port error: %d is out of range", portNum)
			}
			return &HTTPServer{Port: uint16(portNum), config: config, logger: CreateLogger(config)}, nil
		},
	})
}
// HTTPServer is the "http.server" module: it listens on Port, passes each
// incoming request to the input handler, and writes either the routed output
// or a JSON status document as the response.
type HTTPServer struct {
config config.ModuleConfig
// Port is the TCP port the server listens on.
Port uint16
// ctx is the module context created in Start; cancel cancels it.
ctx context.Context
inputHandler common.InputHandler
logger *slog.Logger
cancel context.CancelFunc
// server is the underlying HTTP server, set by Start.
server *http.Server
// serverMu is held while server is assigned or shut down.
serverMu sync.Mutex
}
// ResponseIOError is the JSON form of one routing error in an IOResponseData.
type ResponseIOError struct {
// Index is copied from the routing error's Index field.
Index int `json:"index"`
// ProcessError is the routing error's process error message, or nil when
// there was none.
ProcessError *string `json:"processError"`
}
// IOResponseData is the JSON document ServeHTTP writes when no route output
// has produced the HTTP response itself.
type IOResponseData struct {
// IOErrors lists the routing errors; it is only populated when routing failed.
IOErrors []ResponseIOError `json:"ioErrors"`
// Message is a human-readable summary of the outcome.
Message string `json:"message"`
// Status is "ok" or "error".
Status string `json:"status"`
}
// httpServerContextKey is the key type for values this module stores in a
// context (the "responseWriter" entry set by ServeHTTP and read by Output).
type httpServerContextKey string
// HTTPServerResponseWriter wraps an http.ResponseWriter and remembers whether
// a status code or body has been written through it.
type HTTPServerResponseWriter struct {
	http.ResponseWriter
	// done is set once WriteHeader or Write has been called.
	done bool
}

// WriteHeader marks the response as written and forwards status to the wrapped
// writer.
func (rw *HTTPServerResponseWriter) WriteHeader(status int) {
	rw.done = true
	rw.ResponseWriter.WriteHeader(status)
}

// Write marks the response as written and forwards data to the wrapped writer.
func (rw *HTTPServerResponseWriter) Write(data []byte) (int, error) {
	rw.done = true
	written, err := rw.ResponseWriter.Write(data)
	return written, err
}
// Id returns the module id from the module config.
func (hs *HTTPServer) Id() string {
return hs.config.Id
}
// Type returns the module type from the module config.
func (hs *HTTPServer) Type() string {
return hs.config.Type
}
// ServeHTTP passes the request to the input handler and answers with a JSON
// status document, unless a route output already wrote the response through
// the HTTPServerResponseWriter placed in the handler's context.
//
// The fallback responses are: 500 when no input handler is registered, 404
// when no route matched, 500 with the per-route errors when routing failed,
// and 200 otherwise.
func (hs *HTTPServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	// respond writes the JSON status document with an explicit content type;
	// without it the body would be content-sniffed as plain text.
	respond := func(status int, response IOResponseData) {
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(status)
		if err := json.NewEncoder(w).Encode(response); err != nil {
			hs.logger.Error("http.server error writing response", "error", err)
		}
	}

	if hs.inputHandler == nil {
		respond(http.StatusInternalServerError, IOResponseData{
			Message: "no router registered",
			Status:  "error",
		})
		return
	}

	responseWriter := HTTPServerResponseWriter{ResponseWriter: w}
	inputContext := context.WithValue(hs.ctx, httpServerContextKey("responseWriter"), &responseWriter)
	aRouteFound, routingErrors := hs.inputHandler(inputContext, hs.Id(), r)

	// A route output has already produced the response; nothing left to do.
	if responseWriter.done {
		return
	}

	switch {
	case !aRouteFound:
		respond(http.StatusNotFound, IOResponseData{
			Message: "no matching routes found",
			Status:  "error",
		})
	case routingErrors != nil:
		response := IOResponseData{
			IOErrors: []ResponseIOError{},
			Message:  "routing failed",
			Status:   "error",
		}
		for _, responseIOError := range routingErrors {
			errorToAdd := ResponseIOError{
				Index: responseIOError.Index,
			}
			if responseIOError.ProcessError != nil {
				errorMsg := responseIOError.ProcessError.Error()
				errorToAdd.ProcessError = &errorMsg
			}
			response.IOErrors = append(response.IOErrors, errorToAdd)
		}
		respond(http.StatusInternalServerError, response)
	default:
		respond(http.StatusOK, IOResponseData{
			Message: "routing successful",
			Status:  "ok",
		})
	}
}
// Start serves HTTP on the configured port and blocks until the server has
// been shut down and the module context is cancelled. It returns the listen
// error when the server fails for any reason other than being closed.
func (hs *HTTPServer) Start(ctx context.Context, inputHandler common.InputHandler) error {
	hs.logger.Debug("running")
	hs.inputHandler = inputHandler

	moduleContext, cancel := context.WithCancel(ctx)
	hs.ctx = moduleContext
	hs.cancel = cancel

	httpServer := &http.Server{
		Addr:              fmt.Sprintf(":%d", hs.Port),
		ReadHeaderTimeout: 5 * time.Second,
		Handler:           hs,
	}

	hs.serverMu.Lock()
	hs.server = httpServer
	hs.serverMu.Unlock()

	err := httpServer.ListenAndServe()
	// TODO(jwetzell): handle server closed error differently
	if err != nil && !errors.Is(err, http.ErrServerClosed) {
		// Release the derived context; nothing will wait on it after a failed start.
		cancel()
		return err
	}

	<-hs.ctx.Done()
	hs.logger.Debug("done")
	return nil
}
// Output writes payload as the HTTP response of the request that produced ctx.
//
// ctx must carry the response writer stored by ServeHTTP, payload must be a
// processor.HTTPResponse, and the response must not have been written yet;
// otherwise an error is returned and nothing is written. A failure to write
// the body is returned as well.
func (hs *HTTPServer) Output(ctx context.Context, payload any) error {
	responseWriter, ok := ctx.Value(httpServerContextKey("responseWriter")).(*HTTPServerResponseWriter)
	if !ok {
		return errors.New("http.server output must originate from an http.server input")
	}

	payloadResponse, ok := common.GetAnyAs[processor.HTTPResponse](payload)
	if !ok {
		return errors.New("http.server is only able to output HTTPResponse")
	}

	if responseWriter.done {
		return errors.New("http.server response writer has already been written to")
	}

	responseWriter.WriteHeader(payloadResponse.Status)
	if _, err := responseWriter.Write(payloadResponse.Body); err != nil {
		return fmt.Errorf("http.server error writing response: %w", err)
	}
	return nil
}
// Stop shuts the HTTP server down, allowing up to five seconds for in-flight
// requests, and then cancels the module context so that Start returns. A
// shutdown error is logged.
func (hs *HTTPServer) Stop() {
	if hs.cancel != nil {
		defer hs.cancel()
	}

	hs.serverMu.Lock()
	defer hs.serverMu.Unlock()

	if hs.server == nil {
		return
	}

	shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer shutdownCancel()

	// Shutdown already blocks until the server has stopped or the timeout
	// expires, so no further wait on shutdownCtx is needed.
	if err := hs.server.Shutdown(shutdownCtx); err != nil {
		hs.logger.Error("http.server error shutting down", "error", err)
	}
}
//go:build cgo
package module
import (
"context"
"fmt"
"log/slog"
"github.com/google/jsonschema-go/jsonschema"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
"gitlab.com/gomidi/midi/v2"
_ "gitlab.com/gomidi/midi/v2/drivers/rtmididrv"
)
func init() {
RegisterModule(ModuleRegistration{
Type: "midi.input",
Title: "MIDI Input",
ParamsSchema: &jsonschema.Schema{
Type: "object",
Properties: map[string]*jsonschema.Schema{
"port": {
Title: "Port",
Description: "the name of the MIDI port to listen to",
Type: "string",
},
},
Required: []string{"port"},
AdditionalProperties: &jsonschema.Schema{Not: &jsonschema.Schema{}},
},
New: func(config config.ModuleConfig) (common.Module, error) {
params := config.Params
portString, err := params.GetString("port")
if err != nil {
return nil, fmt.Errorf("midi.input port error: %w", err)
}
return &MIDIInput{config: config, Port: portString, logger: CreateLogger(config)}, nil
},
})
}
// MIDIInput is the "midi.input" module: it listens on the MIDI input port
// named Port and passes every received message to the input handler.
type MIDIInput struct {
config config.ModuleConfig
// ctx is the module context created in Start; cancel cancels it.
ctx context.Context
inputHandler common.InputHandler
// Port is the name of the MIDI input port to listen to.
Port string
logger *slog.Logger
cancel context.CancelFunc
// stop is the function returned by midi.ListenTo; nil until Start has
// begun listening.
stop func()
}
// Id returns the module id from the module config.
func (mi *MIDIInput) Id() string {
return mi.config.Id
}
// Type returns the module type from the module config.
func (mi *MIDIInput) Type() string {
return mi.config.Type
}
// Start finds the configured MIDI input port, forwards every received message
// (SysEx included) to inputHandler, and blocks until the module context is
// cancelled. It returns an error, without blocking, when the port cannot be
// found or listening cannot be set up.
func (mi *MIDIInput) Start(ctx context.Context, inputHandler common.InputHandler) error {
	mi.logger.Debug("running")
	mi.inputHandler = inputHandler

	moduleContext, cancel := context.WithCancel(ctx)
	mi.ctx = moduleContext
	mi.cancel = cancel

	in, err := midi.FindInPort(mi.Port)
	if err != nil {
		// Release the derived context and keep the underlying cause in the
		// error chain instead of discarding it.
		cancel()
		return fmt.Errorf("midi.input can't find input port: %s: %w", mi.Port, err)
	}

	stop, err := midi.ListenTo(in, func(msg midi.Message, timestampms int32) {
		if mi.inputHandler != nil {
			mi.inputHandler(mi.ctx, mi.Id(), msg)
		}
	}, midi.UseSysEx())
	if err != nil {
		cancel()
		return err
	}
	mi.stop = stop

	<-mi.ctx.Done()
	mi.logger.Debug("done")
	return nil
}
// Stop stops the listener if one is active, closes the MIDI driver, and
// finally cancels the module context so that Start returns.
func (mi *MIDIInput) Stop() {
	if cancel := mi.cancel; cancel != nil {
		defer cancel()
	}

	if stopListening := mi.stop; stopListening != nil {
		stopListening()
	}
	midi.CloseDriver()
}
//go:build cgo
package module
import (
"context"
"errors"
"fmt"
"log/slog"
"sync"
"github.com/google/jsonschema-go/jsonschema"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
"gitlab.com/gomidi/midi/v2"
_ "gitlab.com/gomidi/midi/v2/drivers/rtmididrv"
)
func init() {
RegisterModule(ModuleRegistration{
Type: "midi.output",
Title: "MIDI Output",
ParamsSchema: &jsonschema.Schema{
Type: "object",
Properties: map[string]*jsonschema.Schema{
"port": {
Title: "Port",
Description: "the name of the MIDI port to send messages to",
Type: "string",
},
},
Required: []string{"port"},
AdditionalProperties: &jsonschema.Schema{Not: &jsonschema.Schema{}},
},
New: func(config config.ModuleConfig) (common.Module, error) {
params := config.Params
portString, err := params.GetString("port")
if err != nil {
return nil, fmt.Errorf("midi.output port error: %w", err)
}
return &MIDIOutput{config: config, Port: portString, logger: CreateLogger(config)}, nil
},
})
}
// MIDIOutput is the module created for the "midi.output" type. It sends MIDI
// messages to the output port named by Port.
type MIDIOutput struct {
// config is the module configuration this instance was built from.
config config.ModuleConfig
// ctx is the module-scoped context created in Start.
ctx context.Context
// inputHandler is the handler passed to Start.
inputHandler common.InputHandler
// Port is the name of the MIDI output port to open.
Port string
// sendFunc is the sender returned by midi.SendTo; nil until Start has opened the port.
sendFunc func(midi.Message) error
logger *slog.Logger
// cancel cancels ctx; set in Start and invoked by Stop.
cancel context.CancelFunc
// sendFuncMu guards sendFunc.
sendFuncMu sync.Mutex
}
// Id reports the identifier stored in this module's configuration.
func (mo *MIDIOutput) Id() string {
	id := mo.config.Id
	return id
}
// Type reports the module type string stored in this module's configuration.
func (mo *MIDIOutput) Type() string {
	moduleType := mo.config.Type
	return moduleType
}
// Start opens the configured MIDI output port, publishes its send function
// for Output to use, and blocks until the module context derived from ctx is
// cancelled.
//
// It returns an error when the port cannot be found or opened for sending.
func (mo *MIDIOutput) Start(ctx context.Context, inputHandler common.InputHandler) error {
	mo.logger.Debug("running")
	mo.inputHandler = inputHandler
	mo.ctx, mo.cancel = context.WithCancel(ctx)

	outPort, findErr := midi.FindOutPort(mo.Port)
	if findErr != nil {
		return fmt.Errorf("midi.output can't find output port: %s", mo.Port)
	}

	sender, sendErr := midi.SendTo(outPort)
	if sendErr != nil {
		return sendErr
	}

	mo.sendFuncMu.Lock()
	mo.sendFunc = sender
	mo.sendFuncMu.Unlock()

	<-mo.ctx.Done()
	mo.logger.Debug("done")
	return nil
}
// Output sends payload, which must be convertible to a midi.Message, through
// the send function installed by Start.
//
// It returns an error when Start has not yet set up the sender, when payload
// is not a midi.Message, or when the send itself fails.
func (mo *MIDIOutput) Output(ctx context.Context, payload any) error {
	mo.sendFuncMu.Lock()
	defer mo.sendFuncMu.Unlock()

	send := mo.sendFunc
	if send == nil {
		return errors.New("midi.output output is not setup")
	}

	message, isMessage := common.GetAnyAs[midi.Message](payload)
	if !isMessage {
		return errors.New("midi.output can only output midi.Message")
	}
	return send(message)
}
// Stop closes the MIDI driver and then cancels the module context so Start
// can return.
func (mo *MIDIOutput) Stop() {
	if cancel := mo.cancel; cancel != nil {
		// Deferred so the driver is closed before the context is cancelled.
		defer cancel()
	}
	midi.CloseDriver()
}
package module
import (
"fmt"
"log/slog"
"sync"
"github.com/google/jsonschema-go/jsonschema"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
)
// ModuleRegistration describes a module type that can be added to the module
// registry via RegisterModule.
type ModuleRegistration struct {
// Type is the unique key the module is registered under; RegisterModule panics if it is empty.
Type string `json:"type"`
// Title is an optional display name.
Title string `json:"title,omitempty"`
// Description is an optional description of the module.
Description string `json:"description,omitempty"`
// ParamsSchema is an optional JSON schema for the module's parameters.
ParamsSchema *jsonschema.Schema `json:"paramsSchema,omitempty"`
// New constructs a module instance from its configuration; RegisterModule panics if it is nil.
New func(config.ModuleConfig) (common.Module, error)
}
// RegisterModule adds mod to the package-level module registry.
//
// It panics when mod.Type is empty, when mod.New is nil, or when a module
// with the same type has already been registered.
func RegisterModule(mod ModuleRegistration) {
	switch {
	case mod.Type == "":
		panic("module type is missing")
	case mod.New == nil:
		panic("missing ModuleInfo.New")
	}

	key := string(mod.Type)

	moduleRegistryMu.Lock()
	defer moduleRegistryMu.Unlock()

	if _, exists := moduleRegistry[key]; exists {
		panic(fmt.Sprintf("module already registered: %s", mod.Type))
	}
	moduleRegistry[key] = mod
}
// ModuleRegistry maps a module type string to its registration.
type ModuleRegistry map[string]ModuleRegistration
// GetModuleRegistration looks up the registration for moduleType. The boolean
// result is false when no module of that type has been registered.
func GetModuleRegistration(moduleType string) (ModuleRegistration, bool) {
	moduleRegistryMu.RLock()
	registration, found := moduleRegistry[moduleType]
	moduleRegistryMu.RUnlock()
	return registration, found
}
// GetModuleRegistrations returns a freshly allocated slice holding every
// registered module. The order follows map iteration and is therefore not
// stable between calls.
func GetModuleRegistrations() []ModuleRegistration {
	moduleRegistryMu.RLock()
	defer moduleRegistryMu.RUnlock()

	all := make([]ModuleRegistration, 0, len(moduleRegistry))
	for key := range moduleRegistry {
		all = append(all, moduleRegistry[key])
	}
	return all
}
// Package-level registry of module types, populated through RegisterModule.
var (
// moduleRegistryMu guards moduleRegistry.
moduleRegistryMu sync.RWMutex
// moduleRegistry holds every registration keyed by module type.
moduleRegistry = make(ModuleRegistry)
)
// CreateLogger derives a logger from slog's default logger, tagged with the
// "module" component plus the id and type taken from config.
func CreateLogger(config config.ModuleConfig) *slog.Logger {
	attrs := []any{
		"component", "module",
		"id", config.Id,
		"type", config.Type,
	}
	return slog.Default().With(attrs...)
}
package module
import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"sync"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/google/jsonschema-go/jsonschema"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
)
func init() {
RegisterModule(ModuleRegistration{
Type: "mqtt.client",
Title: "MQTT Client",
ParamsSchema: &jsonschema.Schema{
Type: "object",
Properties: map[string]*jsonschema.Schema{
"broker": {
Title: "Broker URL",
Description: "the URL of the MQTT broker to connect to",
Type: "string",
},
"topic": {
Title: "Topic",
Description: "an MQTT topic to subscribe to",
Type: "string",
},
"clientId": {
Title: "Client ID",
Description: "the client ID to use when connecting to the MQTT broker",
Type: "string",
},
"qos": {
Title: "QoS",
Description: "the QoS level to use when publishing messages",
Type: "integer",
Minimum: jsonschema.Ptr[float64](0),
Maximum: jsonschema.Ptr[float64](2),
Default: json.RawMessage(`0`),
},
"retained": {
Title: "Retained",
Description: "whether to set the retained flag when publishing messages",
Type: "boolean",
Default: json.RawMessage(`false`),
},
},
Required: []string{"broker", "topic", "clientId"},
AdditionalProperties: &jsonschema.Schema{Not: &jsonschema.Schema{}},
},
New: func(moduleConfig config.ModuleConfig) (common.Module, error) {
params := moduleConfig.Params
brokerString, err := params.GetString("broker")
if err != nil {
return nil, fmt.Errorf("mqtt.client broker error: %w", err)
}
topicString, err := params.GetString("topic")
if err != nil {
return nil, fmt.Errorf("mqtt.client topic error: %w", err)
}
clientIdString, err := params.GetString("clientId")
if err != nil {
return nil, fmt.Errorf("mqtt.client clientId error: %w", err)
}
qosString, err := params.GetInt("qos")
if err != nil {
if errors.Is(err, config.ErrParamNotFound) {
qosString = 0
} else {
return nil, fmt.Errorf("mqtt.client qos error: %w", err)
}
}
retainedBool, err := params.GetBool("retained")
if err != nil {
if errors.Is(err, config.ErrParamNotFound) {
retainedBool = false
} else {
return nil, fmt.Errorf("mqtt.client retained error: %w", err)
}
}
return &MQTTClient{config: moduleConfig, Broker: brokerString, Topic: topicString, ClientID: clientIdString, QoS: byte(qosString), Retained: retainedBool, logger: CreateLogger(moduleConfig)}, nil
},
})
}
// MQTTClient is the module created for the "mqtt.client" type. It subscribes
// to Topic on Broker and can publish messages through Publish.
type MQTTClient struct {
// config is the module configuration this instance was built from.
config config.ModuleConfig
// ctx is the module-scoped context created in Start.
ctx context.Context
// inputHandler receives every message delivered on the subscription.
inputHandler common.InputHandler
// Broker is the broker URL passed to the client options.
Broker string
// ClientID is the client identifier used when connecting.
ClientID string
// Topic is the topic subscribed to on connect.
Topic string
// QoS is the QoS level used by Publish.
QoS byte
// Retained is the retained flag used by Publish.
Retained bool
// client is the underlying paho client; nil until Start has created it.
client mqtt.Client
logger *slog.Logger
// cancel cancels ctx; set in Start and invoked by Stop.
cancel context.CancelFunc
// clientMu is held by Start while connecting and by Stop while disconnecting.
clientMu sync.Mutex
}
// Id reports the identifier stored in this module's configuration.
func (mc *MQTTClient) Id() string {
	id := mc.config.Id
	return id
}
// Type reports the module type string stored in this module's configuration.
func (mc *MQTTClient) Type() string {
	moduleType := mc.config.Type
	return moduleType
}
// Start connects to the configured broker, subscribes to Topic every time a
// connection is established, and blocks until the module context derived
// from ctx is cancelled.
//
// Received messages are forwarded to inputHandler. Start returns the
// connection error when the initial connect fails, and nil once the context
// is done.
func (mc *MQTTClient) Start(ctx context.Context, inputHandler common.InputHandler) error {
	mc.logger.Debug("running")
	mc.inputHandler = inputHandler
	moduleContext, cancel := context.WithCancel(ctx)
	mc.ctx = moduleContext
	mc.cancel = cancel

	opts := mqtt.NewClientOptions()
	opts.AddBroker(mc.Broker)
	opts.SetClientID(mc.ClientID)
	opts.SetAutoReconnect(true)
	opts.SetCleanSession(false)
	opts.OnConnect = func(c mqtt.Client) {
		// Subscribe through the client handed to the callback so this does not
		// read mc.client while Start may still be holding clientMu.
		token := c.Subscribe(mc.Topic, 1, func(_ mqtt.Client, m mqtt.Message) {
			if mc.inputHandler != nil {
				mc.inputHandler(mc.ctx, mc.Id(), m)
			}
		})
		token.Wait()
		if err := token.Error(); err != nil {
			mc.logger.Error("mqtt.client subscribe error", "topic", mc.Topic, "error", err)
		}
	}

	mc.clientMu.Lock()
	mc.client = mqtt.NewClient(opts)
	token := mc.client.Connect()
	token.Wait()
	err := token.Error()
	// Release the lock before the error check: returning while still holding
	// it would make a later Stop block forever.
	mc.clientMu.Unlock()
	if err != nil {
		return err
	}

	<-mc.ctx.Done()
	mc.logger.Debug("done")
	return nil
}
// Publish sends payload to topic using the configured QoS and retained flag
// and waits for the publish token to complete.
//
// payload must be convertible to a byte slice or be a string. An error is
// returned for any other payload, when the client has not been created, when
// it is not connected, or when the publish itself fails.
func (mc *MQTTClient) Publish(ctx context.Context, topic string, payload any) error {
	body, isBytes := common.GetAnyAsByteSlice(payload)
	if !isBytes {
		text, isString := common.GetAnyAs[string](payload)
		if !isString {
			return errors.New("mqtt.client is only able to publish bytes or string")
		}
		body = []byte(text)
	}

	switch {
	case mc.client == nil:
		return errors.New("mqtt.client client is not setup")
	case !mc.client.IsConnected():
		return errors.New("mqtt.client is not connected")
	}

	publishToken := mc.client.Publish(topic, mc.QoS, mc.Retained, body)
	publishToken.Wait()
	return publishToken.Error()
}
// Stop cancels the module context and then disconnects the MQTT client, if
// one was created, with a 250 quiesce argument.
func (mc *MQTTClient) Stop() {
	if cancel := mc.cancel; cancel != nil {
		cancel()
	}

	mc.clientMu.Lock()
	defer mc.clientMu.Unlock()

	if mc.client == nil {
		return
	}
	mc.client.Disconnect(250)
}
package module
import (
"context"
"errors"
"log/slog"
"sync"
"github.com/google/jsonschema-go/jsonschema"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
"github.com/nats-io/nats.go"
)
func init() {
RegisterModule(ModuleRegistration{
Type: "nats.client",
Title: "NATS Client",
ParamsSchema: &jsonschema.Schema{
Type: "object",
Properties: map[string]*jsonschema.Schema{
"url": {
Title: "NATS Server URL",
Description: "the URL of the NATS server to connect to",
Type: "string",
},
"subject": {
Title: "Subject",
Description: "the subject to subscribe to",
Type: "string",
},
},
Required: []string{"url", "subject"},
AdditionalProperties: &jsonschema.Schema{Not: &jsonschema.Schema{}},
},
New: func(config config.ModuleConfig) (common.Module, error) {
params := config.Params
urlString, err := params.GetString("url")
if err != nil {
return nil, errors.New("nats.client url error: " + err.Error())
}
subjectString, err := params.GetString("subject")
if err != nil {
return nil, errors.New("nats.client subject error: " + err.Error())
}
return &NATSClient{config: config, URL: urlString, Subject: subjectString, logger: CreateLogger(config)}, nil
},
})
}
// NATSClient is the module created for the "nats.client" type. It subscribes
// to Subject on the server at URL and can publish messages through Publish.
type NATSClient struct {
// config is the module configuration this instance was built from.
config config.ModuleConfig
// ctx is the module-scoped context created in Start.
ctx context.Context
// inputHandler receives every message delivered on the subscription.
inputHandler common.InputHandler
// URL is the server URL passed to nats.Connect.
URL string
// Subject is the subject subscribed to in Start.
Subject string
// client is the NATS connection; nil until Start has connected.
client *nats.Conn
logger *slog.Logger
// cancel cancels ctx; set in Start and invoked by Stop.
cancel context.CancelFunc
// sub is the active subscription; nil until Start has subscribed.
sub *nats.Subscription
// subMu guards sub.
subMu sync.Mutex
// clientMu guards client.
clientMu sync.Mutex
}
// Id reports the identifier stored in this module's configuration.
func (nc *NATSClient) Id() string {
	id := nc.config.Id
	return id
}
// Type reports the module type string stored in this module's configuration.
func (nc *NATSClient) Type() string {
	moduleType := nc.config.Type
	return moduleType
}
// Start connects to the NATS server (retrying on a failed initial connect),
// subscribes to Subject, and blocks until the module context derived from ctx
// is cancelled.
//
// Received messages are forwarded to inputHandler. Start returns the error
// from connecting or subscribing, and nil once the context is done.
func (nc *NATSClient) Start(ctx context.Context, inputHandler common.InputHandler) error {
	nc.logger.Debug("running")
	nc.inputHandler = inputHandler
	nc.ctx, nc.cancel = context.WithCancel(ctx)

	conn, connectErr := nats.Connect(nc.URL, nats.RetryOnFailedConnect(true))
	if connectErr != nil {
		return connectErr
	}
	nc.clientMu.Lock()
	nc.client = conn
	nc.clientMu.Unlock()

	// Messages are dropped silently when no handler has been attached.
	onMessage := func(msg *nats.Msg) {
		if nc.inputHandler == nil {
			return
		}
		nc.inputHandler(nc.ctx, nc.Id(), msg)
	}

	subscription, subscribeErr := nc.client.Subscribe(nc.Subject, onMessage)
	if subscribeErr != nil {
		return subscribeErr
	}
	nc.subMu.Lock()
	nc.sub = subscription
	nc.subMu.Unlock()

	<-nc.ctx.Done()
	nc.logger.Debug("done")
	return nil
}
// Publish sends payload on topic through the NATS connection.
//
// payload must be convertible to a byte slice or be a string. An error is
// returned for any other payload, when the connection has not been created,
// when it is not connected, or when the publish itself fails.
func (nc *NATSClient) Publish(ctx context.Context, topic string, payload any) error {
	body, isBytes := common.GetAnyAsByteSlice(payload)
	if !isBytes {
		text, isString := common.GetAnyAs[string](payload)
		if !isString {
			return errors.New("nats.client is only able to publish bytes or string")
		}
		body = []byte(text)
	}

	nc.clientMu.Lock()
	defer nc.clientMu.Unlock()

	switch {
	case nc.client == nil:
		return errors.New("nats.client client is not setup")
	case !nc.client.IsConnected():
		return errors.New("nats.client is not connected")
	}
	return nc.client.Publish(topic, body)
}
// Stop removes the subscription, drains and closes the NATS connection, and
// finally cancels the module context so Start can return.
func (nc *NATSClient) Stop() {
	if cancel := nc.cancel; cancel != nil {
		// Deferred so cancellation happens after the teardown below.
		defer cancel()
	}

	nc.subMu.Lock()
	defer nc.subMu.Unlock()
	if subscription := nc.sub; subscription != nil {
		subscription.Unsubscribe()
	}

	nc.clientMu.Lock()
	defer nc.clientMu.Unlock()
	if conn := nc.client; conn != nil {
		conn.Drain()
		// TODO(jwetzell): setup closed callback to get when client is fully closed
		conn.Close()
	}
}
package module
import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"net"
"sync"
"time"
"github.com/google/jsonschema-go/jsonschema"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
"github.com/nats-io/nats-server/v2/server"
)
func init() {
RegisterModule(ModuleRegistration{
Type: "nats.server",
Title: "NATS Server",
ParamsSchema: &jsonschema.Schema{
Type: "object",
Properties: map[string]*jsonschema.Schema{
"ip": {
Title: "IP",
Description: "the IP address to bind the NATS server to",
Type: "string",
Default: json.RawMessage(`"0.0.0.0"`),
},
"port": {
Title: "Port",
Description: "the port for the NATS server to listen on",
Type: "integer",
Minimum: jsonschema.Ptr[float64](1024),
Maximum: jsonschema.Ptr[float64](65535),
Default: json.RawMessage(`4222`),
},
},
Required: []string{},
AdditionalProperties: &jsonschema.Schema{Not: &jsonschema.Schema{}},
},
New: func(moduleConfig config.ModuleConfig) (common.Module, error) {
params := moduleConfig.Params
portNum, err := params.GetInt("port")
if err != nil {
if errors.Is(err, config.ErrParamNotFound) {
portNum = 4222
} else {
return nil, fmt.Errorf("nats.server port error: %w", err)
}
}
ipString, err := params.GetString("ip")
if err != nil {
if errors.Is(err, config.ErrParamNotFound) {
ipString = "0.0.0.0"
} else {
return nil, fmt.Errorf("nats.server ip error: %w", err)
}
}
_, err = net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", ipString, uint16(portNum)))
if err != nil {
return nil, err
}
return &NATSServer{config: moduleConfig, logger: CreateLogger(moduleConfig), Ip: ipString, Port: portNum}, nil
},
})
}
// NATSServer is the module created for the "nats.server" type. It runs an
// embedded NATS server bound to Ip:Port.
type NATSServer struct {
// config is the module configuration this instance was built from.
config config.ModuleConfig
// ctx is the module-scoped context created in Start.
ctx context.Context
// Ip is the host the server binds to.
Ip string
// Port is the port the server listens on.
Port int
// inputHandler is the handler passed to Start.
inputHandler common.InputHandler
// server is the embedded NATS server; nil until Start has created it.
server *server.Server
logger *slog.Logger
// cancel cancels ctx; set in Start and invoked by Stop.
cancel context.CancelFunc
// serverMu guards server.
serverMu sync.Mutex
}
// Id reports the identifier stored in this module's configuration.
func (ns *NATSServer) Id() string {
	id := ns.config.Id
	return id
}
// Type reports the module type string stored in this module's configuration.
func (ns *NATSServer) Type() string {
	moduleType := ns.config.Type
	return moduleType
}
// Start creates and starts an embedded NATS server on Ip:Port (logging and
// signal handling disabled), waits up to five seconds for it to accept
// connections, and then blocks until the module context derived from ctx is
// cancelled.
//
// It returns an error when the server cannot be created or does not become
// ready in time, and nil once the context is done.
func (ns *NATSServer) Start(ctx context.Context, inputHandler common.InputHandler) error {
	ns.logger.Debug("running")
	ns.inputHandler = inputHandler
	moduleContext, cancel := context.WithCancel(ctx)
	ns.ctx = moduleContext
	ns.cancel = cancel

	natsServer, err := server.NewServer(&server.Options{
		Host:   ns.Ip,
		Port:   ns.Port,
		NoLog:  true,
		NoSigs: true,
	})
	if err != nil {
		return err
	}

	ns.serverMu.Lock()
	ns.server = natsServer
	natsServer.Start()
	ready := natsServer.ReadyForConnections(5 * time.Second)
	// Release the lock before the readiness check: returning while still
	// holding it would make a later Stop block forever.
	ns.serverMu.Unlock()
	if !ready {
		return errors.New("nats.server failed to start")
	}

	<-ns.ctx.Done()
	ns.logger.Debug("done")
	return nil
}
// Stop shuts the embedded server down, waits for the shutdown to finish, and
// finally cancels the module context so Start can return.
func (ns *NATSServer) Stop() {
	if cancel := ns.cancel; cancel != nil {
		// Deferred so cancellation happens after the server has shut down.
		defer cancel()
	}

	ns.serverMu.Lock()
	defer ns.serverMu.Unlock()

	if ns.server == nil {
		return
	}
	ns.server.Shutdown()
	ns.server.WaitForShutdown()
}
package module
import (
"context"
"log/slog"
"net"
"sync"
"time"
"github.com/jwetzell/psn-go"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
)
func init() {
RegisterModule(ModuleRegistration{
Type: "psn.client",
Title: "PosiStageNet Client",
New: func(config config.ModuleConfig) (common.Module, error) {
return &PSNClient{config: config, decoder: psn.NewDecoder(), logger: CreateLogger(config)}, nil
},
})
}
// PSNClient is the module created for the "psn.client" type. It listens for
// PosiStageNet traffic on a multicast UDP socket and forwards decoded
// trackers to the input handler.
type PSNClient struct {
// config is the module configuration this instance was built from.
config config.ModuleConfig
// conn is the multicast UDP socket; nil until Start has opened it.
conn *net.UDPConn
// ctx is the module-scoped context created in Start.
ctx context.Context
// inputHandler receives each tracker held by the decoder after a packet is read.
inputHandler common.InputHandler
// decoder decodes received packets and exposes the resulting trackers.
decoder *psn.Decoder
logger *slog.Logger
// cancel cancels ctx; set in Start and invoked by Stop.
cancel context.CancelFunc
// connMu guards conn.
connMu sync.Mutex
}
// Id reports the identifier stored in this module's configuration.
func (pc *PSNClient) Id() string {
	id := pc.config.Id
	return id
}
// Type reports the module type string stored in this module's configuration.
func (pc *PSNClient) Type() string {
	moduleType := pc.config.Type
	return moduleType
}
// Start joins the multicast group 236.10.10.10:56565 and reads packets until
// the module context derived from ctx is cancelled.
//
// Each packet is fed to the decoder, after which every tracker the decoder
// holds is passed to inputHandler. Reads use a 200ms deadline so cancellation
// is noticed promptly. Start returns any resolve, listen or non-timeout read
// error, and nil on cancellation.
func (pc *PSNClient) Start(ctx context.Context, inputHandler common.InputHandler) error {
	pc.logger.Debug("running")
	pc.inputHandler = inputHandler
	pc.ctx, pc.cancel = context.WithCancel(ctx)

	groupAddr, resolveErr := net.ResolveUDPAddr("udp", "236.10.10.10:56565")
	if resolveErr != nil {
		return resolveErr
	}

	socket, listenErr := net.ListenMulticastUDP("udp", nil, groupAddr)
	if listenErr != nil {
		return listenErr
	}

	pc.connMu.Lock()
	pc.conn = socket
	pc.connMu.Unlock()

	packet := make([]byte, 2048)
	for pc.ctx.Err() == nil {
		select {
		case <-pc.ctx.Done():
			return nil
		default:
		}

		pc.connMu.Lock()
		pc.conn.SetDeadline(time.Now().Add(time.Millisecond * 200))
		received, _, readErr := pc.conn.ReadFromUDP(packet)
		pc.connMu.Unlock()

		if readErr != nil {
			//NOTE(jwetzell) we hit deadline
			if opErr, isOpErr := readErr.(*net.OpError); isOpErr && opErr.Timeout() {
				continue
			}
			return readErr
		}

		if received <= 0 {
			continue
		}

		if decodeErr := pc.decoder.Decode(packet[:received]); decodeErr != nil {
			pc.logger.Error("problem decoding psn traffic", "error", decodeErr)
		}

		if pc.inputHandler == nil {
			pc.logger.Error("has no input handler")
			continue
		}

		// TODO(jwetzell): better input handling
		for _, tracker := range pc.decoder.Trackers {
			pc.inputHandler(pc.ctx, pc.Id(), tracker)
		}
	}

	<-pc.ctx.Done()
	pc.logger.Debug("done")
	return nil
}
// Stop closes the UDP socket, if one was opened, and then cancels the module
// context so Start can return.
func (pc *PSNClient) Stop() {
	if cancel := pc.cancel; cancel != nil {
		// Deferred so cancellation happens after the socket is closed.
		defer cancel()
	}

	pc.connMu.Lock()
	defer pc.connMu.Unlock()

	if socket := pc.conn; socket != nil {
		socket.Close()
	}
}
package module
import (
"context"
"errors"
"fmt"
"log/slog"
"sync"
"github.com/google/jsonschema-go/jsonschema"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
"github.com/redis/go-redis/v9"
)
// init registers the "redis.client" module type together with the schema of
// its required "host" and "port" parameters.
func init() {
	RegisterModule(ModuleRegistration{
		Type:  "redis.client",
		Title: "Redis Client",
		ParamsSchema: &jsonschema.Schema{
			Type: "object",
			Properties: map[string]*jsonschema.Schema{
				"host": {
					Title:       "Host",
					Description: "the hostname or IP address of the Redis server to connect to",
					Type:        "string",
				},
				"port": {
					Title:       "Port",
					Description: "the port to use when connecting to the Redis server",
					Type:        "integer",
					Minimum:     jsonschema.Ptr[float64](1),
					Maximum:     jsonschema.Ptr[float64](65535),
				},
			},
			Required:             []string{"host", "port"},
			AdditionalProperties: &jsonschema.Schema{Not: &jsonschema.Schema{}},
		},
		// New validates the parameters and builds the RedisClient. Parameter
		// errors are wrapped with %w so callers can still match the cause
		// with errors.Is, as the other modules in this package do.
		New: func(config config.ModuleConfig) (common.Module, error) {
			params := config.Params
			hostString, err := params.GetString("host")
			if err != nil {
				return nil, fmt.Errorf("redis.client host error: %w", err)
			}
			portInt, err := params.GetInt("port")
			if err != nil {
				return nil, fmt.Errorf("redis.client port error: %w", err)
			}
			// Converting an out-of-range value to uint16 would silently wrap
			// to a different port, so reject it here instead.
			if portInt < 1 || portInt > 65535 {
				return nil, fmt.Errorf("redis.client port error: %d is outside 1-65535", portInt)
			}
			return &RedisClient{config: config, Host: hostString, Port: uint16(portInt), logger: CreateLogger(config)}, nil
		},
	})
}
// RedisClient is the module created for the "redis.client" type. It exposes
// Get and Set against the Redis server at Host:Port.
type RedisClient struct {
// config is the module configuration this instance was built from.
config config.ModuleConfig
// ctx is the module-scoped context created in Start.
ctx context.Context
// inputHandler is the handler passed to Start.
inputHandler common.InputHandler
// Host is the Redis server hostname or IP address.
Host string
// Port is the Redis server port.
Port uint16
// client is the go-redis client; nil until Start has created it.
client *redis.Client
logger *slog.Logger
// cancel cancels ctx; set in Start and invoked by Stop.
cancel context.CancelFunc
// clientMu is held by Start and Stop when they touch client.
clientMu sync.Mutex
}
// Id reports the identifier stored in this module's configuration.
func (rc *RedisClient) Id() string {
	id := rc.config.Id
	return id
}
// Type reports the module type string stored in this module's configuration.
func (rc *RedisClient) Type() string {
	moduleType := rc.config.Type
	return moduleType
}
// Printf formats its arguments and writes the result to the module logger at
// debug level. Start passes the client to redis.SetLogger, so this is where
// the redis library's log output ends up.
func (rc *RedisClient) Printf(ctx context.Context, format string, v ...any) {
	rc.logger.Debug(fmt.Sprintf(format, v...))
}
// Start installs this module as the redis library logger, creates a client
// for Host:Port (no password, database 0), and blocks until the module
// context derived from ctx is cancelled. It always returns nil.
func (rc *RedisClient) Start(ctx context.Context, inputHandler common.InputHandler) error {
	redis.SetLogger(rc)
	rc.logger.Debug("running")
	rc.inputHandler = inputHandler
	rc.ctx, rc.cancel = context.WithCancel(ctx)

	options := &redis.Options{
		Addr:     fmt.Sprintf("%s:%d", rc.Host, rc.Port),
		Password: "",
		DB:       0,
	}
	redisClient := redis.NewClient(options)

	rc.clientMu.Lock()
	rc.client = redisClient
	rc.clientMu.Unlock()

	<-rc.ctx.Done()
	rc.logger.Debug("done")
	return nil
}
// Stop closes the Redis client, if one was created, and then cancels the
// module context so Start can return.
func (rc *RedisClient) Stop() {
	if cancel := rc.cancel; cancel != nil {
		// Deferred so cancellation happens after the client is closed.
		defer cancel()
	}

	rc.clientMu.Lock()
	defer rc.clientMu.Unlock()

	if redisClient := rc.client; redisClient != nil {
		redisClient.Close()
	}
}
// Get fetches the value stored under key. It returns an error when the client
// has not been created yet or when the lookup fails.
func (rc *RedisClient) Get(ctx context.Context, key string) (any, error) {
	if rc.client == nil {
		return nil, errors.New("redis.client not setup")
	}

	value, err := rc.client.Get(ctx, key).Result()
	if err != nil {
		return nil, err
	}
	return value, nil
}
// Set stores value under key with an expiration argument of 0. It returns an
// error when the client has not been created yet or when the write fails.
func (rc *RedisClient) Set(ctx context.Context, key string, value any) error {
	if rc.client == nil {
		return errors.New("redis.client not setup")
	}
	return rc.client.Set(ctx, key, value, 0).Err()
}
//go:build cgo
package module
import (
"context"
"errors"
"fmt"
"log/slog"
"sync"
"time"
"github.com/google/jsonschema-go/jsonschema"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
"github.com/jwetzell/showbridge-go/internal/framer"
"go.bug.st/serial"
)
func init() {
RegisterModule(ModuleRegistration{
Type: "serial.client",
Title: "Serial Client",
ParamsSchema: &jsonschema.Schema{
Type: "object",
Properties: map[string]*jsonschema.Schema{
"port": {
Title: "Port",
Description: "the name of the serial port to connect to",
Type: "string",
},
"baudRate": {
Title: "Baud Rate",
Description: "the baud rate to use when connecting to the serial port",
Type: "integer",
},
"framing": {
Title: "Framing Method",
Description: "the method to use for framing messages on the serial port",
Type: "string",
Enum: []any{"LF", "CR", "CRLF", "SLIP", "RAW"},
},
},
Required: []string{"port", "baudRate", "framing"},
AdditionalProperties: &jsonschema.Schema{Not: &jsonschema.Schema{}},
},
New: func(config config.ModuleConfig) (common.Module, error) {
params := config.Params
portString, err := params.GetString("port")
if err != nil {
return nil, fmt.Errorf("serial.client port error: %w", err)
}
framingMethodString, err := params.GetString("framing")
if err != nil {
return nil, fmt.Errorf("serial.client framing error: %w", err)
}
inFramer := framer.GetFramer(framingMethodString)
if inFramer == nil {
return nil, fmt.Errorf("serial.client unknown framing method: %s", framingMethodString)
}
outFramer := framer.GetFramer(framingMethodString)
baudRateInt, err := params.GetInt("baudRate")
if err != nil {
return nil, fmt.Errorf("serial.client baudRate error: %w", err)
}
mode := serial.Mode{
BaudRate: baudRateInt,
}
return &SerialClient{config: config, Port: portString, inFramer: inFramer, outFramer: outFramer, Mode: &mode, logger: CreateLogger(config)}, nil
},
})
}
// SerialClient is the module created for the "serial.client" type. It reads
// framed messages from a serial port and can write framed payloads to it.
type SerialClient struct {
// config is the module configuration this instance was built from.
config config.ModuleConfig
// ctx is the module-scoped context created in Start.
ctx context.Context
// inputHandler receives every message decoded from the port.
inputHandler common.InputHandler
// Port is the name of the serial port to open.
Port string
// inFramer decodes bytes read from the port into messages.
inFramer framer.Framer
// outFramer encodes payloads before they are written to the port.
outFramer framer.Framer
// Mode is the serial mode passed to serial.Open.
Mode *serial.Mode
// port is the open serial port; nil until SetupPort has succeeded.
port serial.Port
logger *slog.Logger
// cancel cancels ctx; set in Start and invoked by Stop.
cancel context.CancelFunc
// portMu is held by SetupPort and Stop while they open or close port.
portMu sync.Mutex
}
// Id reports the identifier stored in this module's configuration.
func (sc *SerialClient) Id() string {
	id := sc.config.Id
	return id
}
// Type reports the module type string stored in this module's configuration.
func (sc *SerialClient) Type() string {
	moduleType := sc.config.Type
	return moduleType
}
// SetupPort opens the configured serial port with the configured mode and
// stores the handle on the client. On failure the stored handle is left
// untouched and the open error is returned.
func (sc *SerialClient) SetupPort() error {
	sc.portMu.Lock()
	defer sc.portMu.Unlock()

	opened, err := serial.Open(sc.Port, sc.Mode)
	if err == nil {
		sc.port = opened
	}
	return err
}
// Start opens the serial port and forwards every message decoded by inFramer
// to inputHandler until the module context derived from ctx is cancelled.
//
// When the port cannot be opened, Start logs the error and retries after two
// seconds. When a read fails, the current port handle is closed, the framer
// state is cleared, and the port is opened again. Start always returns nil.
func (sc *SerialClient) Start(ctx context.Context, inputHandler common.InputHandler) error {
	sc.logger.Debug("running")
	sc.inputHandler = inputHandler
	moduleContext, cancel := context.WithCancel(ctx)
	sc.ctx = moduleContext
	sc.cancel = cancel

	buffer := make([]byte, 1024)
	for sc.ctx.Err() == nil {
		if err := sc.SetupPort(); err != nil {
			if sc.ctx.Err() != nil {
				return nil
			}
			sc.logger.Error("port setup error", "port", sc.Port, "error", err.Error())
			// Back off before retrying, but wake up immediately on shutdown
			// instead of sleeping through the cancellation.
			select {
			case <-sc.ctx.Done():
			case <-time.After(2 * time.Second):
			}
			continue
		}

		for sc.ctx.Err() == nil {
			byteCount, err := sc.port.Read(buffer)
			if err != nil {
				if sc.inFramer != nil {
					sc.inFramer.Clear()
				}
				// Release the failed handle before the outer loop opens the
				// port again; otherwise every reconnect leaks a handle.
				sc.portMu.Lock()
				sc.port.Close()
				sc.portMu.Unlock()
				break
			}
			if sc.inFramer == nil || byteCount <= 0 {
				continue
			}
			for _, message := range sc.inFramer.Decode(buffer[0:byteCount]) {
				if sc.inputHandler != nil {
					sc.inputHandler(sc.ctx, sc.Id(), message)
				} else {
					sc.logger.Error("input received but no router is configured")
				}
			}
		}
	}

	<-sc.ctx.Done()
	sc.logger.Debug("done")
	return nil
}
// Output frames payload with outFramer and writes it to the serial port.
//
// payload must be convertible to a byte slice. An error is returned for any
// other payload, when the port has not been opened yet, or when the write
// fails.
func (sc *SerialClient) Output(ctx context.Context, payload any) error {
	payloadBytes, ok := common.GetAnyAsByteSlice(payload)
	if !ok {
		return errors.New("serial.client can only output bytes")
	}

	// Take the handle under the lock, then write without holding it so a slow
	// write cannot block SetupPort or Stop.
	sc.portMu.Lock()
	port := sc.port
	sc.portMu.Unlock()

	// port stays nil until SetupPort succeeds; calling Write on it would panic.
	if port == nil {
		return errors.New("serial.client port is not setup")
	}

	_, err := port.Write(sc.outFramer.Encode(payloadBytes))
	return err
}
// Stop closes the serial port, clears both framers, and finally cancels the
// module context so Start can return.
func (sc *SerialClient) Stop() {
	if cancel := sc.cancel; cancel != nil {
		// Deferred so cancellation happens after the teardown below.
		defer cancel()
	}

	sc.portMu.Lock()
	defer sc.portMu.Unlock()

	if sc.port != nil {
		sc.port.Close()
	}
	for _, fr := range []framer.Framer{sc.inFramer, sc.outFramer} {
		if fr != nil {
			fr.Clear()
		}
	}
}
package module
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log/slog"
"os"
"sync"
"time"
"github.com/emiago/diago"
"github.com/emiago/diago/media"
"github.com/emiago/sipgo"
"github.com/emiago/sipgo/sip"
"github.com/google/jsonschema-go/jsonschema"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
"github.com/jwetzell/showbridge-go/internal/processor"
)
func init() {
RegisterModule(ModuleRegistration{
Type: "sip.call.server",
Title: "SIP Call Server",
ParamsSchema: &jsonschema.Schema{
Type: "object",
Properties: map[string]*jsonschema.Schema{
"ip": {
Title: "IP",
Description: "the IP address to bind the SIP server to",
Type: "string",
Default: json.RawMessage(`"0.0.0.0"`),
},
"port": {
Title: "Port",
Description: "the port for the SIP server to listen on",
Type: "integer",
Minimum: jsonschema.Ptr[float64](1024),
Maximum: jsonschema.Ptr[float64](65535),
Default: json.RawMessage(`5060`),
},
"transport": {
Title: "Transport",
Description: "the transport protocol to use for the SIP server",
Type: "string",
Enum: []any{"udp", "tcp", "ws", "udp4", "tcp4"},
Default: json.RawMessage(`"udp"`),
},
"userAgent": {
Title: "User Agent",
Description: "the user agent string to use",
Type: "string",
Default: json.RawMessage(`"showbridge"`),
},
},
Required: []string{},
AdditionalProperties: &jsonschema.Schema{Not: &jsonschema.Schema{}},
},
New: func(moduleConfig config.ModuleConfig) (common.Module, error) {
params := moduleConfig.Params
portNum, err := params.GetInt("port")
if err != nil {
if errors.Is(err, config.ErrParamNotFound) {
portNum = 5060
} else {
return nil, fmt.Errorf("sip.call.server port error: %w", err)
}
}
ipString, err := params.GetString("ip")
if err != nil {
if errors.Is(err, config.ErrParamNotFound) {
ipString = "0.0.0.0"
} else {
return nil, fmt.Errorf("sip.call.server ip error: %w", err)
}
}
transportString, err := params.GetString("transport")
if err != nil {
if errors.Is(err, config.ErrParamNotFound) {
transportString = "udp"
} else {
return nil, fmt.Errorf("sip.call.server transport error: %w", err)
}
}
userAgentString, err := params.GetString("userAgent")
if err != nil {
if errors.Is(err, config.ErrParamNotFound) {
userAgentString = "showbridge"
} else {
return nil, fmt.Errorf("sip.call.server userAgent error: %w", err)
}
}
return &SIPCallServer{config: moduleConfig, IP: ipString, Port: int(portNum), Transport: transportString, UserAgent: userAgentString, logger: CreateLogger(moduleConfig)}, nil
},
})
}
// SIPCallServer is the module created for the "sip.call.server" type. It
// answers incoming SIP calls, reports each one to the input handler, and can
// respond on the call through Output.
type SIPCallServer struct {
// config is the module configuration this instance was built from.
config config.ModuleConfig
// ctx is the module-scoped context created in Start.
ctx context.Context
// inputHandler receives a SIPCallMessage for each answered call.
inputHandler common.InputHandler
// IP is the host the SIP transport binds to.
IP string
// Port is the port the SIP transport binds to.
Port int
// Transport is the transport name handed to diago (e.g. "udp").
Transport string
// UserAgent is the user agent string given to sipgo.
UserAgent string
logger *slog.Logger
// cancel cancels ctx; set in Start and invoked by Stop.
cancel context.CancelFunc
// ua is the sipgo user agent; nil until Start has created it.
ua *sipgo.UserAgent
// uaMu guards ua.
uaMu sync.Mutex
}
// SIPCallMessage is the payload SIPCallServer passes to the input handler
// when it answers a call.
type SIPCallMessage struct {
// To is the value of the dialog's ToUser for the incoming call.
To string
}
// SIPCall carries the dialog of an answered call. SIPCallServer stores it in
// the context handed to the input handler so Output can find the call again.
type SIPCall struct {
// inDialog is the server-side dialog session of the call.
inDialog *diago.DialogServerSession
// lock is try-locked by Output before it touches the dialog.
lock sync.Mutex
}
// sipCallContextKey is the context key type under which the SIP modules store
// per-call state (the key value used in this file is "call").
type sipCallContextKey string
// Id reports the identifier stored in this module's configuration.
func (scs *SIPCallServer) Id() string {
	id := scs.config.Id
	return id
}
// Type reports the module type string stored in this module's configuration.
func (scs *SIPCallServer) Type() string {
	moduleType := scs.config.Type
	return moduleType
}
// Start creates the SIP user agent, serves incoming calls on the configured
// transport, IP and port, and blocks until the module context derived from
// ctx is cancelled.
//
// Each incoming dialog is passed to HandleCall. All sipgo/diago logging is
// sent to a discarding logger. Start returns an error when the user agent
// cannot be created; a Serve error is only logged, after which Start still
// waits for cancellation and returns nil.
func (scs *SIPCallServer) Start(ctx context.Context, inputHandler common.InputHandler) error {
	scs.logger.Debug("running")
	scs.inputHandler = inputHandler
	moduleContext, cancel := context.WithCancel(ctx)
	scs.ctx = moduleContext
	scs.cancel = cancel

	diagoLogger := slog.New(slog.NewJSONHandler(io.Discard, nil))
	ua, err := sipgo.NewUA(
		sipgo.WithUserAgent(scs.UserAgent),
		sipgo.WithUserAgentTransportLayerOptions(sip.WithTransportLayerLogger(diagoLogger)),
		sipgo.WithUserAgentTransactionLayerOptions(sip.WithTransactionLayerLogger(diagoLogger)),
	)
	// Without this check a failed NewUA would hand a nil user agent to diago.
	if err != nil {
		return fmt.Errorf("sip.call.server user agent error: %w", err)
	}

	scs.uaMu.Lock()
	scs.ua = ua
	scs.uaMu.Unlock()

	sip.SetDefaultLogger(diagoLogger)
	media.SetDefaultLogger(diagoLogger)

	dg := diago.NewDiago(ua, diago.WithLogger(diagoLogger), diago.WithTransport(
		diago.Transport{
			Transport: scs.Transport,
			BindHost:  scs.IP,
			BindPort:  scs.Port,
		},
	))

	err = dg.Serve(scs.ctx, func(inDialog *diago.DialogServerSession) {
		scs.HandleCall(inDialog)
	})
	if err != nil {
		scs.logger.Error("diago serve error", "error", err)
	}

	<-scs.ctx.Done()
	scs.logger.Debug("done")
	return nil
}
// HandleCall answers an incoming dialog (Trying, Ringing, Answer) and, when
// an input handler is attached, reports the call to it. The context given to
// the handler carries a SIPCall for this dialog so Output can respond on it.
func (scs *SIPCallServer) HandleCall(inDialog *diago.DialogServerSession) {
	inDialog.Trying()
	inDialog.Ringing()
	inDialog.Answer()

	call := &SIPCall{inDialog: inDialog}
	dialogContext := context.WithValue(scs.ctx, sipCallContextKey("call"), call)

	if scs.inputHandler == nil {
		return
	}
	scs.inputHandler(dialogContext, scs.Id(), SIPCallMessage{To: inDialog.ToUser()})
}
// Output responds on the SIP call carried by ctx, which must be a context
// produced by this module's HandleCall.
//
// payload may be a processor.SipDTMFResponse, whose digits are written as
// DTMF, or a processor.SipAudioFileResponse, whose audio file is played as
// "audio/wav". In both cases PreWait and PostWait are slept, in
// milliseconds, before and after the output.
//
// An error is returned when ctx carries no call, when another Output is
// already using the call, when the dialog has ended, when the payload type
// is unsupported, or when writing or playback fails.
func (scs *SIPCallServer) Output(ctx context.Context, payload any) error {
	call, ok := ctx.Value(sipCallContextKey("call")).(*SIPCall)
	if !ok {
		return errors.New("sip.call.server output must originate from sip.call.server input")
	}

	if !call.lock.TryLock() {
		return errors.New("sip.call.server call is already locked")
	}
	// Release the call on every exit path; without this the first Output
	// would keep the lock forever and every later Output would be rejected.
	defer call.lock.Unlock()

	if call.inDialog.LoadState() == sip.DialogStateEnded {
		return errors.New("sip.call.server inDialog already ended")
	}

	payloadDTMFResponse, ok := common.GetAnyAs[processor.SipDTMFResponse](payload)
	if ok {
		dtmfWriter := call.inDialog.AudioWriterDTMF()
		time.Sleep(time.Millisecond * time.Duration(payloadDTMFResponse.PreWait))
		for i, dtmfRune := range payloadDTMFResponse.Digits {
			if err := dtmfWriter.WriteDTMF(dtmfRune); err != nil {
				return fmt.Errorf("sip.call.server error output dtmf digit at index %d: %w", i, err)
			}
		}
		time.Sleep(time.Millisecond * time.Duration(payloadDTMFResponse.PostWait))
		return nil
	}

	payloadAudioFileResponse, ok := common.GetAnyAs[processor.SipAudioFileResponse](payload)
	if ok {
		audioFile, err := os.Open(payloadAudioFileResponse.AudioFile)
		if err != nil {
			return err
		}
		defer audioFile.Close()

		playback, err := call.inDialog.PlaybackCreate()
		if err != nil {
			return err
		}

		time.Sleep(time.Millisecond * time.Duration(payloadAudioFileResponse.PreWait))
		_, err = playback.Play(audioFile, "audio/wav")
		// The post-wait is honoured even when playback failed.
		time.Sleep(time.Millisecond * time.Duration(payloadAudioFileResponse.PostWait))
		if err != nil {
			return err
		}
		return nil
	}

	return errors.New("sip.call.server can only output SipDTMFResponse or SipAudioFileResponse")
}
// Stop closes the SIP user agent, if one was created, and then cancels the
// module context so Start can return.
func (scs *SIPCallServer) Stop() {
	if cancel := scs.cancel; cancel != nil {
		// Deferred so cancellation happens after the user agent is closed.
		defer cancel()
	}

	scs.uaMu.Lock()
	defer scs.uaMu.Unlock()

	if userAgent := scs.ua; userAgent != nil {
		userAgent.Close()
	}
}
package module
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log/slog"
"os"
"strings"
"sync"
"time"
"github.com/emiago/diago"
"github.com/emiago/diago/media"
"github.com/emiago/sipgo"
"github.com/emiago/sipgo/sip"
"github.com/google/jsonschema-go/jsonschema"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
"github.com/jwetzell/showbridge-go/internal/processor"
)
func init() {
RegisterModule(ModuleRegistration{
Type: "sip.dtmf.server",
Title: "SIP DTMF Server",
ParamsSchema: &jsonschema.Schema{
Type: "object",
Properties: map[string]*jsonschema.Schema{
"ip": {
Title: "IP",
Description: "the IP address to bind the SIP server to",
Type: "string",
Default: json.RawMessage(`"0.0.0.0"`),
},
"port": {
Title: "Port",
Description: "the port for the SIP server to listen on",
Type: "integer",
Minimum: jsonschema.Ptr[float64](1024),
Maximum: jsonschema.Ptr[float64](65535),
Default: json.RawMessage(`5060`),
},
"transport": {
Title: "Transport",
Description: "the transport protocol to use for the SIP server",
Type: "string",
Enum: []any{"udp", "tcp", "ws", "udp4", "tcp4"},
Default: json.RawMessage(`"udp"`),
},
"userAgent": {
Title: "User Agent",
Description: "the user agent string to use",
Type: "string",
Default: json.RawMessage(`"showbridge"`),
},
"separator": {
Title: "DTMF Separator",
Description: "the DTMF character to use as a separator between DTMF digit groups",
Type: "string",
MinLength: new(1),
MaxLength: new(1),
},
},
Required: []string{"separator"},
AdditionalProperties: &jsonschema.Schema{Not: &jsonschema.Schema{}},
},
New: func(moduleConfig config.ModuleConfig) (common.Module, error) {
params := moduleConfig.Params
portNum, err := params.GetInt("port")
if err != nil {
if errors.Is(err, config.ErrParamNotFound) {
portNum = 5060
} else {
return nil, fmt.Errorf("sip.dtmf.server port error: %w", err)
}
}
ipString, err := params.GetString("ip")
if err != nil {
if errors.Is(err, config.ErrParamNotFound) {
ipString = "0.0.0.0"
} else {
return nil, fmt.Errorf("sip.dtmf.server ip error: %w", err)
}
}
transportString, err := params.GetString("transport")
if err != nil {
if errors.Is(err, config.ErrParamNotFound) {
transportString = "udp"
} else {
return nil, fmt.Errorf("sip.dtmf.server transport error: %w", err)
}
}
userAgentString, err := params.GetString("userAgent")
if err != nil {
if errors.Is(err, config.ErrParamNotFound) {
userAgentString = "showbridge"
} else {
return nil, fmt.Errorf("sip.dtmf.server userAgent error: %w", err)
}
}
separatorString, err := params.GetString("separator")
if err != nil {
return nil, fmt.Errorf("sip.dtmf.server separator error: %w", err)
}
if len(separatorString) != 1 {
return nil, errors.New("sip.dtmf.server separator must be a single character")
}
if !strings.ContainsRune("0123456789*#ABCD", rune(separatorString[0])) {
return nil, errors.New("sip.dtmf.server separator must be a valid DTMF character")
}
return &SIPDTMFServer{config: moduleConfig, IP: ipString, Port: int(portNum), Transport: transportString, UserAgent: userAgentString, Separator: separatorString, logger: CreateLogger(moduleConfig)}, nil
},
})
}
// SIPDTMFServer is the module created for the "sip.dtmf.server" type. It
// answers incoming SIP calls and reports groups of DTMF digits, delimited by
// Separator, to the input handler.
type SIPDTMFServer struct {
// config is the module configuration this instance was built from.
config config.ModuleConfig
// ctx is the module-scoped context created in Start.
ctx context.Context
// inputHandler receives a SIPDTMFMessage each time the separator is heard.
inputHandler common.InputHandler
// IP is the host the SIP transport binds to.
IP string
// Port is the port the SIP transport binds to.
Port int
// Transport is the transport name handed to diago (e.g. "udp").
Transport string
// UserAgent is the user agent string given to sipgo.
UserAgent string
// Separator is the single DTMF character that terminates a digit group.
Separator string
logger *slog.Logger
// cancel cancels ctx; set in Start.
cancel context.CancelFunc
// ua is the sipgo user agent; nil until Start has created it.
ua *sipgo.UserAgent
// uaMu guards ua.
uaMu sync.Mutex
}
// SIPDTMFMessage is the payload emitted to the input handler for one group of
// DTMF digits.
type SIPDTMFMessage struct {
// To is the user part of the call's To header (inDialog.ToUser()).
To string
// Digits holds the digits received since the previous separator.
Digits string
}
// SIPDTMFCall carries the dialog of the call an input originated from; it is
// stored in the context handed to the input handler so Output can respond on
// the same call.
type SIPDTMFCall struct {
inDialog *diago.DialogServerSession
// lock is taken with TryLock by Output before it writes to the call.
lock sync.Mutex
}
// Id returns the module ID from the module configuration.
func (sds *SIPDTMFServer) Id() string {
return sds.config.Id
}
// Type returns the module type from the module configuration.
func (sds *SIPDTMFServer) Type() string {
return sds.config.Type
}
// Start creates the SIP user agent, serves incoming calls on the configured
// transport and blocks until the module context is cancelled.
//
// It returns an error when the user agent cannot be created or when diago
// fails to serve; otherwise it returns nil once the context is done.
func (sds *SIPDTMFServer) Start(ctx context.Context, inputHandler common.InputHandler) error {
	sds.logger.Debug("running")
	sds.inputHandler = inputHandler
	moduleContext, cancel := context.WithCancel(ctx)
	sds.ctx = moduleContext
	sds.cancel = cancel

	// The SIP libraries log to a discarding handler so they stay silent.
	diagoLogger := slog.New(slog.NewJSONHandler(io.Discard, nil))

	ua, err := sipgo.NewUA(
		sipgo.WithUserAgent(sds.UserAgent),
		sipgo.WithUserAgentTransportLayerOptions(sip.WithTransportLayerLogger(diagoLogger)),
		sipgo.WithUserAgentTransactionLayerOptions(sip.WithTransactionLayerLogger(diagoLogger)),
	)
	if err != nil {
		// Previously ignored: a nil user agent would be handed to diago.
		return fmt.Errorf("sip.dtmf.server user agent error: %w", err)
	}

	sds.uaMu.Lock()
	sds.ua = ua
	sds.uaMu.Unlock()

	sip.SetDefaultLogger(diagoLogger)
	media.SetDefaultLogger(diagoLogger)

	dg := diago.NewDiago(ua, diago.WithLogger(diagoLogger), diago.WithTransport(
		diago.Transport{
			Transport: sds.Transport,
			BindHost:  sds.IP,
			BindPort:  sds.Port,
		},
	))

	err = dg.Serve(sds.ctx, func(inDialog *diago.DialogServerSession) {
		// Surface per-call failures instead of dropping them silently.
		if callErr := sds.HandleCall(inDialog); callErr != nil {
			sds.logger.Debug("call handling ended with error", "error", callErr)
		}
	})
	if err != nil {
		return err
	}

	<-sds.ctx.Done()
	sds.logger.Debug("done")
	return nil
}
// HandleCall answers an incoming call and listens for DTMF digits. Digits are
// accumulated until the configured separator is received, at which point the
// group is emitted to the input handler as a SIPDTMFMessage and the
// accumulator is reset.
//
// It returns an error when no separator is configured, when the call cannot
// be answered, or whatever error the DTMF reader's Listen returns.
func (sds *SIPDTMFServer) HandleCall(inDialog *diago.DialogServerSession) error {
	if sds.Separator == "" {
		// Guards the index below; New enforces this but the struct is exported.
		return errors.New("sip.dtmf.server separator is not configured")
	}
	separator := rune(sds.Separator[0])

	// Provisional responses are best effort; a failure to answer is fatal
	// for the call because there is no media session to read DTMF from.
	if err := inDialog.Trying(); err != nil {
		sds.logger.Debug("unable to send trying response", "error", err)
	}
	if err := inDialog.Ringing(); err != nil {
		sds.logger.Debug("unable to send ringing response", "error", err)
	}
	if err := inDialog.Answer(); err != nil {
		return fmt.Errorf("sip.dtmf.server answer error: %w", err)
	}

	reader := inDialog.AudioReaderDTMF()
	var digits strings.Builder

	return reader.Listen(func(dtmf rune) error {
		if dtmf != separator {
			digits.WriteRune(dtmf)
			return nil
		}
		if sds.inputHandler != nil {
			// The call is attached to the context so Output can reply on it.
			dialogContext := context.WithValue(sds.ctx, sipCallContextKey("call"), &SIPDTMFCall{
				inDialog: inDialog,
			})
			sds.inputHandler(dialogContext, sds.Id(), SIPDTMFMessage{
				To:     inDialog.ToUser(),
				Digits: digits.String(),
			})
		}
		digits.Reset()
		return nil
	}, 5*time.Second)
}
// Output sends a response on the call that produced the input. The call is
// taken from ctx, so the output must be triggered by an input of this module.
//
// payload must be a processor.SipDTMFResponse (digits are written as DTMF) or
// a processor.SipAudioFileResponse (the file is played as audio/wav). PreWait
// and PostWait are applied, in milliseconds, around the response.
//
// An error is returned when ctx carries no call, the call is already being
// written to, the dialog has ended, the payload type is unsupported, or the
// write/playback fails.
func (sds *SIPDTMFServer) Output(ctx context.Context, payload any) error {
	call, ok := ctx.Value(sipCallContextKey("call")).(*SIPDTMFCall)
	if !ok {
		return errors.New("sip.dtmf.server output must originate from sip.dtmf.server input")
	}

	if !call.lock.TryLock() {
		return errors.New("sip.dtmf.server call is already locked")
	}
	// Release the lock on every path; it was previously never released,
	// which made every later output on the same call fail.
	defer call.lock.Unlock()

	if call.inDialog.LoadState() == sip.DialogStateEnded {
		return errors.New("sip.dtmf.server inDialog already ended")
	}

	if dtmfResponse, ok := common.GetAnyAs[processor.SipDTMFResponse](payload); ok {
		dtmfWriter := call.inDialog.AudioWriterDTMF()
		time.Sleep(time.Millisecond * time.Duration(dtmfResponse.PreWait))
		for i, dtmfRune := range dtmfResponse.Digits {
			if err := dtmfWriter.WriteDTMF(dtmfRune); err != nil {
				return fmt.Errorf("sip.dtmf.server error output dtmf digit at index %d: %w", i, err)
			}
		}
		time.Sleep(time.Millisecond * time.Duration(dtmfResponse.PostWait))
		return nil
	}

	if audioResponse, ok := common.GetAnyAs[processor.SipAudioFileResponse](payload); ok {
		audioFile, err := os.Open(audioResponse.AudioFile)
		if err != nil {
			return err
		}
		defer audioFile.Close()

		playback, err := call.inDialog.PlaybackCreate()
		if err != nil {
			return err
		}

		time.Sleep(time.Millisecond * time.Duration(audioResponse.PreWait))
		_, err = playback.Play(audioFile, "audio/wav")
		// The post wait is applied even when playback failed, as before.
		time.Sleep(time.Millisecond * time.Duration(audioResponse.PostWait))
		return err
	}

	return errors.New("sip.dtmf.server can only output SipDTMFResponse or SipAudioFileResponse")
}
// Stop closes the SIP user agent (if one was created) and then cancels the
// module context.
func (sds *SIPDTMFServer) Stop() {
	// Deferred first so the cancel runs last, after the user agent is closed.
	if cancelModule := sds.cancel; cancelModule != nil {
		defer cancelModule()
	}

	sds.uaMu.Lock()
	defer sds.uaMu.Unlock()

	if sds.ua == nil {
		return
	}
	sds.ua.Close()
}
package module
import (
"context"
"errors"
"fmt"
"log/slog"
"net"
"sync"
"time"
"github.com/google/jsonschema-go/jsonschema"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
"github.com/jwetzell/showbridge-go/internal/framer"
)
// init registers the net.tcp.client module: its parameter schema and the
// constructor that builds a TCPClient from a module configuration.
func init() {
	RegisterModule(ModuleRegistration{
		Type:  "net.tcp.client",
		Title: "TCP Client",
		ParamsSchema: &jsonschema.Schema{
			Type: "object",
			Properties: map[string]*jsonschema.Schema{
				"host": {
					Title:       "Host",
					Description: "the hostname or IP address of the TCP server to connect to",
					Type:        "string",
				},
				"port": {
					Title:       "Port",
					Description: "the port of the TCP server to connect to",
					Type:        "integer",
					Minimum:     jsonschema.Ptr[float64](1),
					Maximum:     jsonschema.Ptr[float64](65535),
				},
				"framing": {
					Title:       "Framing Method",
					Description: "the method used to frame messages over the TCP connection",
					Type:        "string",
					Enum:        []any{"LF", "CR", "CRLF", "SLIP", "RAW"},
				},
			},
			Required:             []string{"host", "port", "framing"},
			AdditionalProperties: &jsonschema.Schema{Not: &jsonschema.Schema{}},
		},
		New: func(config config.ModuleConfig) (common.Module, error) {
			params := config.Params

			hostString, err := params.GetString("host")
			if err != nil {
				return nil, fmt.Errorf("net.tcp.client host error: %w", err)
			}

			portNum, err := params.GetInt("port")
			if err != nil {
				return nil, fmt.Errorf("net.tcp.client port error: %w", err)
			}
			// Reject values the uint16 conversion below would silently wrap.
			if portNum < 0 || portNum > 65535 {
				return nil, fmt.Errorf("net.tcp.client port error: %d is outside the range 0-65535", portNum)
			}

			addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", hostString, uint16(portNum)))
			if err != nil {
				return nil, fmt.Errorf("net.tcp.client address error: %w", err)
			}

			framingMethodString, err := params.GetString("framing")
			if err != nil {
				return nil, fmt.Errorf("net.tcp.client framing error: %w", err)
			}

			// Separate framer instances are created for each direction.
			inFramer := framer.GetFramer(framingMethodString)
			if inFramer == nil {
				return nil, fmt.Errorf("net.tcp.client unknown framing method: %s", framingMethodString)
			}
			outFramer := framer.GetFramer(framingMethodString)

			return &TCPClient{inFramer: inFramer, outFramer: outFramer, Addr: addr, config: config, logger: CreateLogger(config)}, nil
		},
	})
}
// TCPClient is a module that connects to a TCP server, decodes incoming bytes
// into framed messages for the input handler and writes framed output.
type TCPClient struct {
config config.ModuleConfig
// inFramer and outFramer are separate framer instances created in New.
inFramer framer.Framer
outFramer framer.Framer
// conn is the current connection; it is set by SetupConn under connMu.
conn *net.TCPConn
// ctx is the module context derived in Start; it is cancelled by Stop.
ctx context.Context
inputHandler common.InputHandler
// Addr is the resolved address of the server to dial.
Addr *net.TCPAddr
logger *slog.Logger
cancel context.CancelFunc
// connMu is held by SetupConn, Output and Stop while they touch conn.
connMu sync.Mutex
}
// Id returns the module ID from the module configuration.
func (tc *TCPClient) Id() string {
return tc.config.Id
}
// Type returns the module type from the module configuration.
func (tc *TCPClient) Type() string {
return tc.config.Type
}
// Start connects to the configured server and reads from it until the module
// context is cancelled. Received bytes are decoded with the input framer and
// each resulting message is passed to inputHandler.
//
// A failed dial is retried after two seconds; a connection that fails while
// reading is closed and re-dialled. Start always returns nil, after the
// context is done.
func (tc *TCPClient) Start(ctx context.Context, inputHandler common.InputHandler) error {
	tc.logger.Debug("running")
	tc.inputHandler = inputHandler
	moduleContext, cancel := context.WithCancel(ctx)
	tc.ctx = moduleContext
	tc.cancel = cancel

	buffer := make([]byte, 1024)

CONNECT_RETRY:
	for tc.ctx.Err() == nil {
		if err := tc.SetupConn(); err != nil {
			if tc.ctx.Err() != nil {
				break CONNECT_RETRY
			}
			tc.logger.Error("connection error", "error", err.Error())
			// Wait before retrying, but wake up immediately on shutdown.
			select {
			case <-tc.ctx.Done():
			case <-time.After(2 * time.Second):
			}
			continue
		}

		// Take the connection under the lock; Output and Stop use it too.
		tc.connMu.Lock()
		conn := tc.conn
		tc.connMu.Unlock()

	READ:
		for tc.ctx.Err() == nil {
			// A short deadline lets the loop notice context cancellation.
			conn.SetReadDeadline(time.Now().Add(time.Millisecond * 200))
			byteCount, err := conn.Read(buffer)
			if err != nil {
				var opErr *net.OpError
				if errors.As(err, &opErr) && opErr.Timeout() {
					//NOTE(jwetzell) we hit deadline
					continue
				}
				if errors.Is(err, net.ErrClosed) {
					// The connection was closed locally (Stop); do not reconnect.
					break CONNECT_RETRY
				}
				break READ
			}

			if tc.inFramer == nil || byteCount == 0 {
				continue
			}
			for _, message := range tc.inFramer.Decode(buffer[0:byteCount]) {
				if tc.inputHandler != nil {
					tc.inputHandler(tc.ctx, tc.Id(), message)
				} else {
					tc.logger.Error("input received but no input handler is configured")
				}
			}
		}

		// Release the socket before dialling again or shutting down; closing
		// an already closed connection only returns an error.
		conn.Close()
	}

	<-tc.ctx.Done()
	tc.logger.Debug("done")
	return nil
}
// SetupConn dials the configured address and stores the new connection. Any
// previously stored connection is closed first so its socket is not leaked.
// On a dial error the stored connection is left nil and the error is returned.
func (tc *TCPClient) SetupConn() error {
	tc.connMu.Lock()
	defer tc.connMu.Unlock()

	if tc.conn != nil {
		tc.conn.Close()
		tc.conn = nil
	}

	client, err := net.DialTCP("tcp", nil, tc.Addr)
	if err != nil {
		return err
	}
	tc.conn = client
	return nil
}
// Output frames payload with the output framer and writes it to the current
// connection. payload must be convertible to a byte slice.
//
// It returns an error when no connection is set up, the payload is not bytes,
// no output framer is configured, or the write fails.
func (tc *TCPClient) Output(ctx context.Context, payload any) error {
	tc.connMu.Lock()
	defer tc.connMu.Unlock()

	if tc.conn == nil {
		return errors.New("net.tcp.client client is not setup")
	}

	payloadBytes, ok := common.GetAnyAsByteSlice(payload)
	if !ok {
		return errors.New("net.tcp.client is only able to output bytes")
	}

	if tc.outFramer == nil {
		return errors.New("net.tcp.client no output framer configured")
	}

	// Encode with the dedicated output framer rather than the input framer,
	// which is used by the read loop for decoding.
	_, err := tc.conn.Write(tc.outFramer.Encode(payloadBytes))
	return err
}
// Stop closes the current connection, clears both framers and finally cancels
// the module context.
func (tc *TCPClient) Stop() {
	// Deferred first so the cancel runs last, after the lock is released.
	if cancelModule := tc.cancel; cancelModule != nil {
		defer cancelModule()
	}

	tc.connMu.Lock()
	defer tc.connMu.Unlock()

	if activeConn := tc.conn; activeConn != nil {
		activeConn.Close()
	}
	if decoder := tc.inFramer; decoder != nil {
		decoder.Clear()
	}
	if encoder := tc.outFramer; encoder != nil {
		encoder.Clear()
	}
}
package module
import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"net"
"slices"
"strings"
"sync"
"time"
"github.com/google/jsonschema-go/jsonschema"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
"github.com/jwetzell/showbridge-go/internal/framer"
)
// init registers the net.tcp.server module: its parameter schema and the
// constructor that builds a TCPServer from a module configuration.
func init() {
	RegisterModule(ModuleRegistration{
		Type:  "net.tcp.server",
		Title: "TCP Server",
		ParamsSchema: &jsonschema.Schema{
			Type: "object",
			Properties: map[string]*jsonschema.Schema{
				"ip": {
					Title:       "IP",
					Description: "the IP address to bind the TCP server to",
					Type:        "string",
					Default:     json.RawMessage(`"0.0.0.0"`),
				},
				"port": {
					Title:       "Port",
					Description: "the port for the TCP server to listen on",
					Type:        "integer",
					Minimum:     jsonschema.Ptr[float64](1024),
					Maximum:     jsonschema.Ptr[float64](65535),
				},
				"framing": {
					Title:       "Framing Method",
					Description: "the method used to frame messages over the TCP connection",
					Type:        "string",
					Enum:        []any{"LF", "CR", "CRLF", "SLIP", "RAW"},
				},
			},
			Required:             []string{"port", "framing"},
			AdditionalProperties: &jsonschema.Schema{Not: &jsonschema.Schema{}},
		},
		New: func(moduleConfig config.ModuleConfig) (common.Module, error) {
			params := moduleConfig.Params

			portNum, err := params.GetInt("port")
			if err != nil {
				return nil, fmt.Errorf("net.tcp.server port error: %w", err)
			}
			// Reject values the uint16 conversion below would silently wrap.
			if portNum < 0 || portNum > 65535 {
				return nil, fmt.Errorf("net.tcp.server port error: %d is outside the range 0-65535", portNum)
			}

			framingMethodString, err := params.GetString("framing")
			if err != nil {
				return nil, fmt.Errorf("net.tcp.server framing error: %w", err)
			}

			inFramer := framer.GetFramer(framingMethodString)
			if inFramer == nil {
				return nil, fmt.Errorf("net.tcp.server unknown framing method: %s", framingMethodString)
			}
			outFramer := framer.GetFramer(framingMethodString)

			// ip is optional and defaults to all interfaces.
			ipString, err := params.GetString("ip")
			if err != nil {
				if !errors.Is(err, config.ErrParamNotFound) {
					return nil, fmt.Errorf("net.tcp.server ip error: %w", err)
				}
				ipString = "0.0.0.0"
			}

			addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", ipString, uint16(portNum)))
			if err != nil {
				return nil, fmt.Errorf("net.tcp.server address error: %w", err)
			}

			return &TCPServer{inFramer: inFramer, outFramer: outFramer, framerType: framingMethodString, Addr: addr, config: moduleConfig, logger: CreateLogger(moduleConfig)}, nil
		},
	})
}
// tcpConnection pairs an accepted client connection with a framer instance
// created for it in handleClient.
type tcpConnection struct {
conn *net.TCPConn
framer framer.Framer
}
// TCPServer is a module that listens for TCP clients, decodes what they send
// into framed messages for the input handler and writes framed output to
// every connected client.
type TCPServer struct {
config config.ModuleConfig
// Addr is the resolved address the listener binds to.
Addr *net.TCPAddr
inFramer framer.Framer
outFramer framer.Framer
// framerType is the framing method name, used to create per-connection framers.
framerType string
// ctx is the module context derived in Start; it is cancelled by Stop.
ctx context.Context
inputHandler common.InputHandler
// wg tracks the accept loop and the client goroutines; Stop waits on it.
wg sync.WaitGroup
// connections lists the currently connected clients, guarded by connectionsMu.
connections []tcpConnection
connectionsMu sync.RWMutex
logger *slog.Logger
cancel context.CancelFunc
// listener is set in Start and closed in Stop, guarded by listenerMu.
listener *net.TCPListener
listenerMu sync.Mutex
// connectionShutdownCtx is created in Start from context.Background and
// cancelled by Stop (via connectionShutdown) to end the client read loops.
connectionShutdownCtx context.Context
connectionShutdown context.CancelFunc
}
// Id returns the module ID from the module configuration.
func (ts *TCPServer) Id() string {
return ts.config.Id
}
// Type returns the module type from the module configuration.
func (ts *TCPServer) Type() string {
return ts.config.Type
}
// handleClient serves one accepted connection: it registers the connection,
// reads from it until the module context or the connection shutdown context
// is done (or a non-timeout read error occurs), decodes the bytes into
// messages and passes each one to the input handler. On return the
// connection is closed and removed from the connection list.
func (ts *TCPServer) handleClient(client *net.TCPConn) {
	// Each connection decodes with its own framer so partial frames from
	// different clients are never mixed in one shared decoder.
	decoder := framer.GetFramer(ts.framerType)
	ts.connectionsMu.Lock()
	ts.connections = append(ts.connections, tcpConnection{conn: client, framer: decoder})
	ts.connectionsMu.Unlock()
	if decoder == nil {
		// Fall back to the shared framer if no per-connection one is available.
		decoder = ts.inFramer
	}

	ts.logger.Debug("connection accepted", "remoteAddr", client.RemoteAddr().String())

	defer func() {
		client.Close()
		ts.connectionsMu.Lock()
		for i := 0; i < len(ts.connections); i++ {
			if ts.connections[i].conn == client {
				ts.connections = slices.Delete(ts.connections, i, i+1)
				break
			}
		}
		ts.connectionsMu.Unlock()
		ts.logger.Debug("connection closed", "remoteAddr", client.RemoteAddr().String())
	}()

	buffer := make([]byte, 1024)
	for ts.ctx.Err() == nil && ts.connectionShutdownCtx.Err() == nil {
		// A short deadline lets the loop notice shutdown while idle.
		client.SetDeadline(time.Now().Add(time.Millisecond * 200))
		byteCount, err := client.Read(buffer)
		if err != nil {
			var opErr *net.OpError
			if errors.As(err, &opErr) && opErr.Timeout() {
				//NOTE(jwetzell) we hit deadline
				continue
			}
			break
		}

		if decoder == nil || byteCount == 0 {
			continue
		}
		for _, message := range decoder.Decode(buffer[0:byteCount]) {
			if ts.inputHandler != nil {
				ts.inputHandler(ts.ctx, ts.Id(), message)
			} else {
				ts.logger.Error("input received but no input handler is configured")
			}
		}
	}
}
// Start binds the listener and accepts clients, handing each one to
// handleClient on its own goroutine, until the listener is closed or the
// module context is cancelled. It returns the listen error if binding fails,
// otherwise nil once the context is done.
func (ts *TCPServer) Start(ctx context.Context, inputHandler common.InputHandler) error {
	ts.logger.Debug("running")
	ts.inputHandler = inputHandler
	ts.ctx, ts.cancel = context.WithCancel(ctx)

	// Client loops watch this separate context so Stop can end them before
	// the module context itself is cancelled.
	ts.connectionShutdownCtx, ts.connectionShutdown = context.WithCancel(context.Background())

	tcpListener, listenErr := net.ListenTCP("tcp", ts.Addr)
	if listenErr != nil {
		return listenErr
	}

	ts.listenerMu.Lock()
	ts.listener = tcpListener
	ts.listenerMu.Unlock()

	// The accept loop itself counts towards the wait group Stop waits on.
	ts.wg.Add(1)
	for ts.ctx.Err() == nil {
		client, acceptErr := tcpListener.AcceptTCP()
		if acceptErr == nil {
			ts.wg.Go(func() {
				ts.handleClient(client)
			})
			continue
		}
		if errors.Is(acceptErr, net.ErrClosed) || ts.ctx.Err() != nil {
			break
		}
		ts.logger.Debug("problem with listener", "error", acceptErr)
	}
	ts.wg.Done()

	<-ts.ctx.Done()
	ts.logger.Debug("done")
	return nil
}
// Output frames payload with the output framer and writes it to every
// connected client. Write failures are collected, one per line, into a single
// returned error; payloads that are not bytes and a missing output framer are
// reported as errors as well.
func (ts *TCPServer) Output(ctx context.Context, payload any) error {
	data, isBytes := common.GetAnyAsByteSlice(payload)
	if !isBytes {
		return errors.New("net.tcp.server is only able to output bytes")
	}

	ts.connectionsMu.Lock()
	defer ts.connectionsMu.Unlock()

	if ts.outFramer == nil {
		return errors.New("no output framer configured")
	}
	framed := ts.outFramer.Encode(data)

	var failures strings.Builder
	for i := range ts.connections {
		if _, writeErr := ts.connections[i].conn.Write(framed); writeErr != nil {
			failures.WriteString(writeErr.Error())
			failures.WriteString("\n")
		}
	}

	if failures.Len() == 0 {
		return nil
	}
	return fmt.Errorf("net.tcp.server error during output: %s", failures.String())
}
// Stop signals the client loops to end, closes the listener, waits for the
// accept loop and all client goroutines to finish and finally cancels the
// module context.
func (ts *TCPServer) Stop() {
	// Deferred first so the cancel runs last, after everything has drained.
	if cancelModule := ts.cancel; cancelModule != nil {
		defer cancelModule()
	}

	if shutdownClients := ts.connectionShutdown; shutdownClients != nil {
		shutdownClients()
	}

	ts.listenerMu.Lock()
	defer ts.listenerMu.Unlock()

	if activeListener := ts.listener; activeListener != nil {
		activeListener.Close()
	}

	ts.logger.Debug("waiting for connections to close")
	ts.wg.Wait()
	ts.logger.Debug("all connections closed")
}
package module
import (
"context"
"fmt"
"log/slog"
"time"
"github.com/google/jsonschema-go/jsonschema"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
)
// init registers the time.interval module: its parameter schema and the
// constructor that builds a TimeInterval from a module configuration.
func init() {
	RegisterModule(ModuleRegistration{
		Type:  "time.interval",
		Title: "Interval",
		ParamsSchema: &jsonschema.Schema{
			Type: "object",
			Properties: map[string]*jsonschema.Schema{
				"duration": {
					Title:       "Duration",
					Description: "time in milliseconds between emitted events",
					Type:        "integer",
				},
			},
			Required:             []string{"duration"},
			AdditionalProperties: &jsonschema.Schema{Not: &jsonschema.Schema{}},
		},
		New: func(config config.ModuleConfig) (common.Module, error) {
			params := config.Params

			durationInt, err := params.GetInt("duration")
			if err != nil {
				return nil, fmt.Errorf("time.interval duration error: %w", err)
			}
			// A ticker needs a positive period, and the value must survive the
			// uint32 conversion below without wrapping.
			if durationInt <= 0 || int64(durationInt) > int64(^uint32(0)) {
				return nil, fmt.Errorf("time.interval duration error: %d is outside the range 1-%d", durationInt, ^uint32(0))
			}

			return &TimeInterval{Duration: uint32(durationInt), config: config, logger: CreateLogger(config)}, nil
		},
	})
}
// TimeInterval is a module that emits the current time to the input handler
// on every tick of a ticker.
type TimeInterval struct {
config config.ModuleConfig
// Duration is the tick period in milliseconds.
Duration uint32
// ctx is the module context derived in Start; it is cancelled by Stop.
ctx context.Context
inputHandler common.InputHandler
// ticker is created in Start and stopped in Stop.
ticker *time.Ticker
logger *slog.Logger
cancel context.CancelFunc
}
// Id returns the module ID from the module configuration.
func (i *TimeInterval) Id() string {
return i.config.Id
}
// Type returns the module type from the module configuration.
func (i *TimeInterval) Type() string {
return i.config.Type
}
// Start emits time.Now() to inputHandler every Duration milliseconds until
// the module context is cancelled, then returns nil. It returns an error
// when Duration is zero, because a ticker requires a positive period.
func (i *TimeInterval) Start(ctx context.Context, inputHandler common.InputHandler) error {
	i.logger.Debug("running")
	i.inputHandler = inputHandler
	moduleContext, cancel := context.WithCancel(ctx)
	i.ctx = moduleContext
	i.cancel = cancel

	if i.Duration == 0 {
		// time.NewTicker panics on a non-positive duration.
		return fmt.Errorf("time.interval duration must be greater than 0")
	}

	ticker := time.NewTicker(time.Millisecond * time.Duration(i.Duration))
	defer ticker.Stop()
	i.ticker = ticker

	// Block on both channels; the previous default branch made this loop
	// spin at full CPU between ticks.
	for {
		select {
		case <-i.ctx.Done():
			i.logger.Debug("done")
			return nil
		case <-ticker.C:
			if i.inputHandler != nil {
				i.inputHandler(i.ctx, i.Id(), time.Now())
			}
		}
	}
}
// Stop stops the ticker (if one was created) and then cancels the module
// context.
func (i *TimeInterval) Stop() {
	// Deferred so the cancel runs after the ticker has been stopped.
	if cancelModule := i.cancel; cancelModule != nil {
		defer cancelModule()
	}

	if i.ticker == nil {
		return
	}
	i.ticker.Stop()
}
package module
import (
"context"
"fmt"
"log/slog"
"time"
"github.com/google/jsonschema-go/jsonschema"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
)
// init registers the time.timer module: its parameter schema and the
// constructor that builds a TimeTimer from a module configuration.
func init() {
	RegisterModule(ModuleRegistration{
		Type:  "time.timer",
		Title: "Timer",
		ParamsSchema: &jsonschema.Schema{
			Type: "object",
			Properties: map[string]*jsonschema.Schema{
				"duration": {
					Title:       "Duration",
					Description: "time in milliseconds before the timer fires",
					Type:        "integer",
				},
			},
			Required:             []string{"duration"},
			AdditionalProperties: &jsonschema.Schema{Not: &jsonschema.Schema{}},
		},
		New: func(config config.ModuleConfig) (common.Module, error) {
			params := config.Params

			durationNum, err := params.GetInt("duration")
			if err != nil {
				return nil, fmt.Errorf("time.timer duration error: %w", err)
			}
			// Reject values the uint32 conversion below would silently wrap
			// (a negative duration would otherwise become a very long delay).
			if durationNum < 0 || int64(durationNum) > int64(^uint32(0)) {
				return nil, fmt.Errorf("time.timer duration error: %d is outside the range 0-%d", durationNum, ^uint32(0))
			}

			return &TimeTimer{Duration: uint32(durationNum), config: config, logger: CreateLogger(config)}, nil
		},
	})
}
// TimeTimer is a module that emits the firing time of a timer to the input
// handler.
type TimeTimer struct {
config config.ModuleConfig
// Duration is the delay before the timer fires, in milliseconds.
Duration uint32
// ctx is the module context derived in Start; it is cancelled by Stop.
ctx context.Context
inputHandler common.InputHandler
// timer is created in Start and stopped in Stop.
timer *time.Timer
logger *slog.Logger
cancel context.CancelFunc
}
// Id returns the module ID from the module configuration.
func (t *TimeTimer) Id() string {
return t.config.Id
}
// Type returns the module type from the module configuration.
func (t *TimeTimer) Type() string {
return t.config.Type
}
// Start arms a timer for Duration milliseconds and passes the time received
// from the timer channel to inputHandler when it fires. It blocks until the
// module context is cancelled and then returns nil.
func (t *TimeTimer) Start(ctx context.Context, inputHandler common.InputHandler) error {
	t.logger.Debug("running")
	t.inputHandler = inputHandler
	moduleContext, cancel := context.WithCancel(ctx)
	t.ctx = moduleContext
	t.cancel = cancel

	timer := time.NewTimer(time.Millisecond * time.Duration(t.Duration))
	t.timer = timer

	for {
		select {
		case <-t.ctx.Done():
			// Log on the path that is actually taken at shutdown.
			t.logger.Debug("done")
			return nil
		case firedAt := <-timer.C:
			// firedAt avoids shadowing the time package.
			if t.inputHandler != nil {
				t.inputHandler(t.ctx, t.Id(), firedAt)
			}
		}
	}
}
// Stop stops the timer (if one was created) and then cancels the module
// context.
func (t *TimeTimer) Stop() {
	// Deferred so the cancel runs after the timer has been stopped.
	if cancelModule := t.cancel; cancelModule != nil {
		defer cancelModule()
	}

	if t.timer == nil {
		return
	}
	t.timer.Stop()
}
package module
import (
"context"
"errors"
"fmt"
"log/slog"
"net"
"sync"
"github.com/google/jsonschema-go/jsonschema"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
)
// init registers the net.udp.client module: its parameter schema and the
// constructor that builds a UDPClient from a module configuration.
func init() {
	RegisterModule(ModuleRegistration{
		Type:  "net.udp.client",
		Title: "UDP Client",
		ParamsSchema: &jsonschema.Schema{
			Type: "object",
			Properties: map[string]*jsonschema.Schema{
				"host": {
					Title:       "Host",
					Description: "the hostname or IP address of the UDP server to connect to",
					Type:        "string",
				},
				"port": {
					Title:       "Port",
					Description: "the port of the UDP server to connect to",
					Type:        "integer",
					Minimum:     jsonschema.Ptr[float64](1),
					Maximum:     jsonschema.Ptr[float64](65535),
				},
			},
			Required:             []string{"host", "port"},
			AdditionalProperties: &jsonschema.Schema{Not: &jsonschema.Schema{}},
		},
		New: func(config config.ModuleConfig) (common.Module, error) {
			params := config.Params

			hostString, err := params.GetString("host")
			if err != nil {
				return nil, fmt.Errorf("net.udp.client host error: %w", err)
			}

			portNum, err := params.GetInt("port")
			if err != nil {
				return nil, fmt.Errorf("net.udp.client port error: %w", err)
			}
			// Reject values the uint16 conversion below would silently wrap.
			if portNum < 0 || portNum > 65535 {
				return nil, fmt.Errorf("net.udp.client port error: %d is outside the range 0-65535", portNum)
			}

			addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", hostString, uint16(portNum)))
			if err != nil {
				return nil, fmt.Errorf("net.udp.client address error: %w", err)
			}

			return &UDPClient{Addr: addr, config: config, logger: CreateLogger(config)}, nil
		},
	})
}
// UDPClient is a module that sends byte payloads to a UDP server.
type UDPClient struct {
config config.ModuleConfig
// Addr is the resolved address of the server to dial.
Addr *net.UDPAddr
// NOTE(review): Port is not set by the constructor in this file; the port is
// carried in Addr — confirm whether anything else reads this field.
Port uint16
// conn is set by SetupConn and guarded by connMu.
conn *net.UDPConn
// ctx is the module context derived in Start; it is cancelled by Stop.
ctx context.Context
inputHandler common.InputHandler
logger *slog.Logger
cancel context.CancelFunc
connMu sync.Mutex
}
// Id returns the module ID from the module configuration.
func (uc *UDPClient) Id() string {
return uc.config.Id
}
// Type returns the module type from the module configuration.
func (uc *UDPClient) Type() string {
return uc.config.Type
}
// SetupConn dials the configured address and stores the result as the
// client's connection, returning the dial error if any.
func (uc *UDPClient) SetupConn() error {
	uc.connMu.Lock()
	defer uc.connMu.Unlock()

	var dialErr error
	uc.conn, dialErr = net.DialUDP("udp", nil, uc.Addr)
	return dialErr
}
// Start sets up the UDP connection and then blocks until the module context
// is cancelled. It returns the connection error if the dial fails, nil
// otherwise.
func (uc *UDPClient) Start(ctx context.Context, inputHandler common.InputHandler) error {
	uc.logger.Debug("running")
	uc.inputHandler = inputHandler
	uc.ctx, uc.cancel = context.WithCancel(ctx)

	if setupErr := uc.SetupConn(); setupErr != nil {
		return setupErr
	}

	<-uc.ctx.Done()
	uc.logger.Debug("done")
	return nil
}
// Output writes payload to the UDP connection. It returns an error when the
// payload is not convertible to bytes, when no connection is set up, or when
// the write fails.
func (uc *UDPClient) Output(ctx context.Context, payload any) error {
	uc.connMu.Lock()
	defer uc.connMu.Unlock()

	data, isBytes := common.GetAnyAsByteSlice(payload)
	if !isBytes {
		return errors.New("net.udp.client is only able to output bytes")
	}
	if uc.conn == nil {
		return errors.New("net.udp.client client is not setup")
	}

	_, writeErr := uc.conn.Write(data)
	return writeErr
}
// Stop closes the UDP connection (if one was set up) and then cancels the
// module context.
func (uc *UDPClient) Stop() {
	// Deferred first so the cancel runs last, after the lock is released.
	if cancelModule := uc.cancel; cancelModule != nil {
		defer cancelModule()
	}

	uc.connMu.Lock()
	defer uc.connMu.Unlock()

	if uc.conn == nil {
		return
	}
	uc.conn.Close()
}
package module
import (
"context"
"errors"
"fmt"
"log/slog"
"net"
"sync"
"time"
"github.com/google/jsonschema-go/jsonschema"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
)
// init registers the net.udp.multicast module: its parameter schema and the
// constructor that builds a UDPMulticast from a module configuration.
func init() {
	RegisterModule(ModuleRegistration{
		Type:  "net.udp.multicast",
		Title: "UDP Multicast",
		ParamsSchema: &jsonschema.Schema{
			Type: "object",
			Properties: map[string]*jsonschema.Schema{
				"ip": {
					Title:       "IP",
					Description: "the multicast address to listen on",
					Type:        "string",
				},
				"port": {
					Title:       "Port",
					Description: "the port to listen on",
					Type:        "integer",
					Minimum:     jsonschema.Ptr[float64](1024),
					Maximum:     jsonschema.Ptr[float64](65535),
				},
			},
			Required:             []string{"ip", "port"},
			AdditionalProperties: &jsonschema.Schema{Not: &jsonschema.Schema{}},
		},
		New: func(moduleConfig config.ModuleConfig) (common.Module, error) {
			params := moduleConfig.Params

			ipString, err := params.GetString("ip")
			if err != nil {
				return nil, fmt.Errorf("net.udp.multicast ip error: %w", err)
			}

			portNum, err := params.GetInt("port")
			if err != nil {
				return nil, fmt.Errorf("net.udp.multicast port error: %w", err)
			}
			// Reject values the uint16 conversion below would silently wrap.
			if portNum < 0 || portNum > 65535 {
				return nil, fmt.Errorf("net.udp.multicast port error: %d is outside the range 0-65535", portNum)
			}

			addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", ipString, uint16(portNum)))
			if err != nil {
				return nil, fmt.Errorf("net.udp.multicast address error: %w", err)
			}

			return &UDPMulticast{config: moduleConfig, Addr: addr, logger: CreateLogger(moduleConfig)}, nil
		},
	})
}
// UDPMulticast is a module that listens on a UDP multicast group and passes
// received datagrams to the input handler.
type UDPMulticast struct {
config config.ModuleConfig
// conn is the multicast socket opened in Start; Start and Stop hold connMu
// when setting or closing it.
conn *net.UDPConn
// ctx is the module context derived in Start; it is cancelled by Stop.
ctx context.Context
inputHandler common.InputHandler
// Addr is the resolved multicast group address and port.
Addr *net.UDPAddr
logger *slog.Logger
cancel context.CancelFunc
connMu sync.Mutex
}
// Id returns the module ID from the module configuration.
func (um *UDPMulticast) Id() string {
return um.config.Id
}
// Type returns the module type from the module configuration.
func (um *UDPMulticast) Type() string {
return um.config.Type
}
// Start joins the configured multicast group and passes every received
// datagram to inputHandler until the module context is cancelled or the
// socket is closed. It returns an error when the socket cannot be opened or
// when a read fails for a reason other than a timeout or a closed socket.
func (um *UDPMulticast) Start(ctx context.Context, inputHandler common.InputHandler) error {
	um.logger.Debug("running")
	um.inputHandler = inputHandler
	moduleContext, cancel := context.WithCancel(ctx)
	um.ctx = moduleContext
	um.cancel = cancel

	client, err := net.ListenMulticastUDP("udp", nil, um.Addr)
	if err != nil {
		return err
	}
	defer client.Close()

	um.connMu.Lock()
	um.conn = client
	um.connMu.Unlock()

	buffer := make([]byte, 2048)
	for um.ctx.Err() == nil {
		// Read through the local handle rather than the shared field, and use
		// a short deadline so the loop notices context cancellation.
		client.SetDeadline(time.Now().Add(time.Millisecond * 200))
		numBytes, _, err := client.ReadFromUDP(buffer)
		if err != nil {
			if errors.Is(err, net.ErrClosed) {
				break
			}
			var opErr *net.OpError
			if errors.As(err, &opErr) && opErr.Timeout() {
				//NOTE(jwetzell) we hit deadline
				continue
			}
			return err
		}

		if numBytes == 0 {
			continue
		}
		if um.inputHandler == nil {
			um.logger.Error("input received but no input handler is configured")
			continue
		}
		// Hand out a copy: the read buffer is reused for the next datagram,
		// so a handler that keeps the payload must not alias it.
		message := append([]byte(nil), buffer[:numBytes]...)
		um.inputHandler(um.ctx, um.Id(), message)
	}

	<-um.ctx.Done()
	um.logger.Debug("done")
	return nil
}
// Output sends payload as one datagram to the configured multicast address.
// It returns an error when the payload is not convertible to bytes, when the
// socket has not been opened yet, or when the send fails.
func (um *UDPMulticast) Output(ctx context.Context, payload any) error {
	payloadBytes, ok := common.GetAnyAsByteSlice(payload)
	if !ok {
		return errors.New("net.udp.multicast can only output bytes")
	}

	um.connMu.Lock()
	defer um.connMu.Unlock()

	if um.conn == nil {
		return errors.New("net.udp.multicast connection is not setup")
	}

	// The socket from ListenMulticastUDP is not connected to a peer, so the
	// destination has to be given explicitly; a plain Write has none.
	_, err := um.conn.WriteToUDP(payloadBytes, um.Addr)
	return err
}
// Stop closes the multicast socket (if one was opened) and then cancels the
// module context.
func (um *UDPMulticast) Stop() {
	// Deferred first so the cancel runs last, after the lock is released.
	if cancelModule := um.cancel; cancelModule != nil {
		defer cancelModule()
	}

	um.connMu.Lock()
	defer um.connMu.Unlock()

	if um.conn == nil {
		return
	}
	um.conn.Close()
}
package module
import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"net"
"sync"
"time"
"github.com/google/jsonschema-go/jsonschema"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
)
// init registers the net.udp.server module: its parameter schema and the
// constructor that builds a UDPServer from a module configuration.
func init() {
	RegisterModule(ModuleRegistration{
		Type:  "net.udp.server",
		Title: "UDP Server",
		ParamsSchema: &jsonschema.Schema{
			Type: "object",
			Properties: map[string]*jsonschema.Schema{
				"ip": {
					Title:       "IP",
					Description: "the IP address to bind the UDP server to",
					Type:        "string",
					Default:     json.RawMessage(`"0.0.0.0"`),
				},
				"port": {
					Title:       "Port",
					Description: "the port for the UDP server to listen on",
					Type:        "integer",
					Minimum:     jsonschema.Ptr[float64](1024),
					Maximum:     jsonschema.Ptr[float64](65535),
				},
				"bufferSize": {
					Title:       "Buffer Size",
					Description: "the size of the buffer for incoming UDP messages",
					Type:        "integer",
					Minimum:     jsonschema.Ptr[float64](1),
					Maximum:     jsonschema.Ptr[float64](65535),
					Default:     json.RawMessage("2048"),
				},
			},
			Required:             []string{"port"},
			AdditionalProperties: &jsonschema.Schema{Not: &jsonschema.Schema{}},
		},
		New: func(moduleConfig config.ModuleConfig) (common.Module, error) {
			params := moduleConfig.Params

			portNum, err := params.GetInt("port")
			if err != nil {
				return nil, fmt.Errorf("net.udp.server port error: %w", err)
			}
			// Reject values the uint16 conversion below would silently wrap.
			if portNum < 0 || portNum > 65535 {
				return nil, fmt.Errorf("net.udp.server port error: %d is outside the range 0-65535", portNum)
			}

			// ip is optional and defaults to all interfaces.
			ipString, err := params.GetString("ip")
			if err != nil {
				if !errors.Is(err, config.ErrParamNotFound) {
					return nil, fmt.Errorf("net.udp.server ip error: %w", err)
				}
				ipString = "0.0.0.0"
			}

			addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", ipString, uint16(portNum)))
			if err != nil {
				return nil, fmt.Errorf("net.udp.server address error: %w", err)
			}

			// bufferSize is optional and defaults to 2048 bytes.
			bufferSizeNum, err := params.GetInt("bufferSize")
			if err != nil {
				if !errors.Is(err, config.ErrParamNotFound) {
					return nil, fmt.Errorf("net.udp.server bufferSize error: %w", err)
				}
				bufferSizeNum = 2048
			}
			// The read buffer is allocated with this size; a negative size
			// panics and a zero size can never hold a datagram.
			if bufferSizeNum < 1 {
				return nil, fmt.Errorf("net.udp.server bufferSize error: %d must be at least 1", bufferSizeNum)
			}

			return &UDPServer{Addr: addr, BufferSize: bufferSizeNum, config: moduleConfig, logger: CreateLogger(moduleConfig)}, nil
		},
	})
}
// UDPServer is a module that listens for UDP datagrams and passes each one to
// the input handler.
type UDPServer struct {
// Addr is the resolved address the listener binds to.
Addr *net.UDPAddr
// BufferSize is the size in bytes of the read buffer allocated in Start.
BufferSize int
config config.ModuleConfig
// ctx is the module context derived in Start; it is cancelled by Stop.
ctx context.Context
inputHandler common.InputHandler
logger *slog.Logger
cancel context.CancelFunc
// listener is set in Start and closed in Stop, guarded by listenerMu.
listener *net.UDPConn
listenerMu sync.Mutex
}
// Id returns the module ID from the module configuration.
func (us *UDPServer) Id() string {
return us.config.Id
}
// Type returns the module type from the module configuration.
func (us *UDPServer) Type() string {
return us.config.Type
}
// Start binds the UDP listener and passes every received datagram to
// inputHandler until the module context is cancelled or a non-timeout read
// error occurs. It returns an error when BufferSize is not positive or the
// listener cannot be bound; otherwise nil once the context is done.
func (us *UDPServer) Start(ctx context.Context, inputHandler common.InputHandler) error {
	us.logger.Debug("running")
	us.inputHandler = inputHandler
	moduleContext, cancel := context.WithCancel(ctx)
	us.ctx = moduleContext
	us.cancel = cancel

	if us.BufferSize < 1 {
		// make panics on a negative size and a zero-length buffer reads nothing.
		return errors.New("net.udp.server bufferSize must be at least 1")
	}

	listener, err := net.ListenUDP("udp", us.Addr)
	if err != nil {
		return err
	}
	// Release the socket when Start returns, even if Stop was never called;
	// closing an already closed listener only returns an error.
	defer listener.Close()

	us.listenerMu.Lock()
	us.listener = listener
	us.listenerMu.Unlock()

	buffer := make([]byte, us.BufferSize)
	for us.ctx.Err() == nil {
		// A short deadline lets the loop notice context cancellation.
		listener.SetDeadline(time.Now().Add(time.Millisecond * 200))
		numBytes, _, err := listener.ReadFromUDP(buffer)
		if err != nil {
			var opErr *net.OpError
			if errors.As(err, &opErr) && opErr.Timeout() {
				//NOTE(jwetzell) we hit deadline
				continue
			}
			if !errors.Is(err, net.ErrClosed) {
				us.logger.Error("read error", "error", err)
			}
			break
		}

		if us.inputHandler == nil {
			us.logger.Error("input received but no input handler is configured")
			continue
		}
		// Hand out a copy: the read buffer is reused for the next datagram,
		// so a handler that keeps the payload must not alias it.
		message := append([]byte(nil), buffer[:numBytes]...)
		us.inputHandler(us.ctx, us.Id(), message)
	}

	<-us.ctx.Done()
	us.logger.Debug("done")
	return nil
}
// Output is not supported by the UDP server; it always returns an error.
func (us *UDPServer) Output(ctx context.Context, payload any) error {
return errors.New("net.udp.server output is not implemented")
}
// Stop closes the UDP listener (if one was bound) and then cancels the module
// context.
func (us *UDPServer) Stop() {
	// Deferred first so the cancel runs last, after the lock is released.
	if cancelModule := us.cancel; cancelModule != nil {
		defer cancelModule()
	}

	us.listenerMu.Lock()
	defer us.listenerMu.Unlock()

	if us.listener == nil {
		return
	}
	us.listener.Close()
}
package module
import (
"context"
"errors"
"fmt"
"log/slog"
"net/url"
"sync"
"time"
"github.com/google/jsonschema-go/jsonschema"
"github.com/gorilla/websocket"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
)
func init() {
RegisterModule(ModuleRegistration{
Type: "websocket.client",
Title: "WebSocket Client",
ParamsSchema: &jsonschema.Schema{
Type: "object",
Properties: map[string]*jsonschema.Schema{
"url": {
Title: "URL",
Description: "the URL of the WebSocket server to connect",
Type: "string",
},
},
Required: []string{"url"},
AdditionalProperties: &jsonschema.Schema{Not: &jsonschema.Schema{}},
},
New: func(config config.ModuleConfig) (common.Module, error) {
params := config.Params
urlString, err := params.GetString("url")
if err != nil {
return nil, fmt.Errorf("websocket.client url error: %w", err)
}
parsedURL, err := url.Parse(urlString)
if err != nil {
return nil, fmt.Errorf("websocket.client url error: %w", err)
}
if parsedURL.Scheme != "ws" && parsedURL.Scheme != "wss" {
return nil, fmt.Errorf("websocket.client url error: scheme must be ws or wss")
}
return &WebSocketClient{URL: *parsedURL, config: config, logger: CreateLogger(config)}, nil
},
})
}
// WebSocketClient is a module that connects to a WebSocket server, passes
// received text and binary messages to the input handler and writes output
// payloads as WebSocket messages.
type WebSocketClient struct {
config config.ModuleConfig
// URL is the parsed server URL; its scheme is ws or wss.
URL url.URL
// ctx is the module context derived in Start; it is cancelled by Stop.
ctx context.Context
// conn is the current connection; SetupConn, Output and Stop hold connMu
// while using it.
conn *websocket.Conn
inputHandler common.InputHandler
logger *slog.Logger
cancel context.CancelFunc
connMu sync.Mutex
}
// Id returns the module ID from the module configuration.
func (wc *WebSocketClient) Id() string {
return wc.config.Id
}
// Type returns the module type from the module configuration.
func (wc *WebSocketClient) Type() string {
return wc.config.Type
}
// SetupConn closes any existing connection, dials the configured URL and
// stores the new connection. The new connection gets a close handler that
// answers the server's close frame and then closes the connection.
//
// On a dial error no connection is stored and the wrapped error is returned.
func (wc *WebSocketClient) SetupConn() error {
	wc.connMu.Lock()
	defer wc.connMu.Unlock()

	if wc.conn != nil {
		wc.conn.Close()
		// Do not keep a closed connection around if the dial below fails.
		wc.conn = nil
	}

	conn, _, err := websocket.DefaultDialer.Dial(wc.URL.String(), nil)
	if err != nil {
		return fmt.Errorf("websocket.client dial error: %w", err)
	}

	conn.SetCloseHandler(func(code int, text string) error {
		// NOTE(jwetzell): attempt to send close message back to server before closing connection
		// The handler works on the connection it was installed on instead of
		// reading wc.conn, which may have been replaced and is lock-guarded.
		err := conn.WriteControl(
			websocket.CloseMessage,
			websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""),
			time.Now().Add(time.Minute),
		)
		wc.connMu.Lock()
		defer wc.connMu.Unlock()
		conn.Close()
		return err
	})

	wc.conn = conn
	return nil
}
// Start keeps a connection to the server alive until the module context is
// cancelled: it connects, reads until the read loop ends, waits two seconds
// and tries again. It always returns nil, after the context is done.
func (wc *WebSocketClient) Start(ctx context.Context, inputHandler common.InputHandler) error {
	wc.logger.Debug("running")
	wc.inputHandler = inputHandler
	moduleContext, cancel := context.WithCancel(ctx)
	wc.ctx = moduleContext
	wc.cancel = cancel

	for wc.ctx.Err() == nil {
		if err := wc.SetupConn(); err != nil {
			wc.logger.Error("connection error", "error", err)
		} else {
			// NOTE(jwetzell): enter read loop until an error occurs
			wc.readLoop()
		}

		// NOTE(jwetzell): if connection is lost or read error wait before trying again
		// The wait ends early on cancellation so shutdown is not delayed.
		select {
		case <-wc.ctx.Done():
		case <-time.After(2 * time.Second):
		}
	}

	<-wc.ctx.Done()
	wc.logger.Debug("done")
	return nil
}
// readLoop reads messages from the current connection until the module
// context is cancelled, the connection is missing, a read fails, a close
// message is seen or a pong cannot be written. Text messages are passed to
// the input handler as strings and binary messages as byte slices.
func (wc *WebSocketClient) readLoop() {
	for wc.ctx.Err() == nil {
		if wc.conn == nil {
			wc.logger.Error("websocket connection is not established")
			return
		}

		// TODO(jwetzell): other ways to timeout?
		wc.conn.SetReadDeadline(time.Now().Add(5 * time.Second))
		kind, data, readErr := wc.conn.ReadMessage()
		if readErr != nil {
			return
		}

		// The handler is checked only after a message has been read.
		if wc.inputHandler == nil {
			wc.logger.Error("input received but no input handler is configured")
			continue
		}

		if kind == websocket.CloseMessage {
			return
		} else if kind == websocket.PingMessage {
			if pongErr := wc.conn.WriteMessage(websocket.PongMessage, nil); pongErr != nil {
				wc.logger.Error("websocket pong error", "error", pongErr)
				return
			}
		} else if kind == websocket.TextMessage {
			wc.inputHandler(wc.ctx, wc.Id(), string(data))
		} else if kind == websocket.BinaryMessage {
			wc.inputHandler(wc.ctx, wc.Id(), data)
		} else {
			wc.logger.Warn("unsupported message type received", "messageType", kind)
		}
	}
}
// outputBytes writes payload as a binary WebSocket message. It returns an
// error when no connection is set up or when the write fails.
func (wc *WebSocketClient) outputBytes(ctx context.Context, payload []byte) error {
	if wc.conn != nil {
		return wc.conn.WriteMessage(websocket.BinaryMessage, payload)
	}
	return errors.New("websocket.client client is not setup")
}
// outputString writes payload as a text WebSocket message. It returns an
// error when no connection is set up or when the write fails.
func (wc *WebSocketClient) outputString(ctx context.Context, payload string) error {
	if wc.conn != nil {
		return wc.conn.WriteMessage(websocket.TextMessage, []byte(payload))
	}
	return errors.New("websocket.client client is not setup")
}
// Output sends payload over the WebSocket connection: byte payloads go out
// as binary messages and string payloads as text messages. Any other payload
// type is rejected with an error.
func (wc *WebSocketClient) Output(ctx context.Context, payload any) error {
	wc.connMu.Lock()
	defer wc.connMu.Unlock()

	// Bytes are tried first, then strings.
	if data, isBytes := common.GetAnyAsByteSlice(payload); isBytes {
		return wc.outputBytes(ctx, data)
	}
	if text, isString := common.GetAnyAs[string](payload); isString {
		return wc.outputString(ctx, text)
	}
	return errors.New("websocket.client payload must be string or []byte")
}
// Stop sends a close control message on the connection (if one is set), closes
// it, and finally invokes the cancel function when one is configured. The
// cancel call is deferred first so it runs after connMu has been released.
func (wc *WebSocketClient) Stop() {
	if wc.cancel != nil {
		defer wc.cancel()
	}

	wc.connMu.Lock()
	defer wc.connMu.Unlock()

	if wc.conn == nil {
		return
	}
	// A failed close handshake is only logged; the connection is closed anyway.
	if closeErr := wc.conn.WriteControl(websocket.CloseMessage, nil, time.Now().Add(time.Minute)); closeErr != nil {
		wc.logger.Error("websocket close error", "error", closeErr)
	}
	wc.conn.Close()
}
package processor
import (
"context"
"fmt"
"github.com/jwetzell/artnet-go"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
)
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "artnet.packet.decode",
Title: "Decode ArtNet Packet",
New: func(config config.ProcessorConfig) (Processor, error) {
return &ArtNetPacketDecode{config: config}, nil
},
})
}
// ArtNetPacketDecode is the processor registered as "artnet.packet.decode".
type ArtNetPacketDecode struct {
// config is the processor config this instance was created from.
config config.ProcessorConfig
}
// Process decodes a byte-slice payload with artnet.Decode and replaces the
// payload with the decoded result. On a non-byte payload or a decode failure
// the wrapped payload is marked End and an error is returned.
func (apd *ArtNetPacketDecode) Process(ctx context.Context, wrappedPayload common.WrappedPayload) (common.WrappedPayload, error) {
	raw, isBytes := common.GetAnyAsByteSlice(wrappedPayload.Payload)
	if !isBytes {
		wrappedPayload.End = true
		return wrappedPayload, fmt.Errorf("artnet.packet.decode processor only accepts a []byte")
	}

	packet, decodeErr := artnet.Decode(raw)
	if decodeErr != nil {
		wrappedPayload.End = true
		return wrappedPayload, decodeErr
	}

	wrappedPayload.Payload = packet
	return wrappedPayload, nil
}
// Type returns the Type field of the config this processor was created from.
func (apd *ArtNetPacketDecode) Type() string {
return apd.config.Type
}
package processor
import (
"context"
"fmt"
"github.com/jwetzell/artnet-go"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
)
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "artnet.packet.encode",
Title: "Encode ArtNet Packet",
New: func(config config.ProcessorConfig) (Processor, error) {
return &ArtNetPacketEncode{config: config}, nil
},
})
}
// ArtNetPacketEncode is the processor registered as "artnet.packet.encode".
type ArtNetPacketEncode struct {
// config is the processor config this instance was created from.
config config.ProcessorConfig
}
// Process marshals an artnet.ArtNetPacket payload to bytes via MarshalBinary
// and replaces the payload with the result. On a payload of another type or a
// marshal failure the wrapped payload is marked End and an error is returned.
func (ape *ArtNetPacketEncode) Process(ctx context.Context, wrappedPayload common.WrappedPayload) (common.WrappedPayload, error) {
	packet, isPacket := common.GetAnyAs[artnet.ArtNetPacket](wrappedPayload.Payload)
	if !isPacket {
		wrappedPayload.End = true
		return wrappedPayload, fmt.Errorf("artnet.packet.encode processor only accepts an ArtNetPacket")
	}

	encoded, marshalErr := packet.MarshalBinary()
	if marshalErr != nil {
		wrappedPayload.End = true
		return wrappedPayload, marshalErr
	}

	wrappedPayload.Payload = encoded
	return wrappedPayload, nil
}
// Type returns the Type field of the config this processor was created from.
func (ape *ArtNetPacketEncode) Type() string {
return ape.config.Type
}
package processor
import (
"bytes"
"context"
"errors"
"fmt"
"log/slog"
"text/template"
"github.com/google/jsonschema-go/jsonschema"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
)
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "db.query",
Title: "Query Database",
ParamsSchema: &jsonschema.Schema{
Type: "object",
Properties: map[string]*jsonschema.Schema{
"module": {
Title: "Module ID",
Description: "ID of the database module to query",
Type: "string",
},
"query": {
Title: "Query",
Description: "SQL query to execute",
Type: "string",
},
},
Required: []string{"module", "query"},
AdditionalProperties: &jsonschema.Schema{Not: &jsonschema.Schema{}},
},
New: func(config config.ProcessorConfig) (Processor, error) {
params := config.Params
moduleIdString, err := params.GetString("module")
if err != nil {
return nil, fmt.Errorf("db.query module error: %w", err)
}
queryString, err := params.GetString("query")
if err != nil {
return nil, fmt.Errorf("db.query query error: %w", err)
}
queryTemplate, err := template.New("query").Parse(queryString)
if err != nil {
return nil, err
}
return &DbQuery{config: config, ModuleId: moduleIdString, Query: queryTemplate, logger: slog.Default().With("component", "processor", "type", config.Type)}, nil
},
})
}
// DbQuery is the processor registered as "db.query".
type DbQuery struct {
// config is the processor config this instance was created from.
config config.ProcessorConfig
// ModuleId is the value of the "module" param; Process uses it as the key
// into the wrapped payload's Modules.
ModuleId string
// Query is the "query" param parsed as a text/template.
Query *template.Template
logger *slog.Logger
// module is the database module resolved on first use in Process; nil until then.
module common.DatabaseModule
}
// Process renders the query template against the wrapped payload, runs the
// resulting query on the configured database module and stores the rows in
// the payload.
//
// The database module is looked up in wrappedPayload.Modules by ModuleId on
// first use and cached on the processor afterwards.
//
// The resulting payload is nil when no rows are returned, a single
// map[string]any (column name to value) for exactly one row, and a
// []map[string]any for more than one row. On any failure the wrapped payload
// is marked End and an error is returned.
func (dq *DbQuery) Process(ctx context.Context, wrappedPayload common.WrappedPayload) (common.WrappedPayload, error) {
	if dq.module == nil {
		if wrappedPayload.Modules == nil {
			wrappedPayload.End = true
			return wrappedPayload, errors.New("db.query wrapped payload has no modules")
		}
		module, ok := wrappedPayload.Modules[dq.ModuleId]
		if !ok {
			wrappedPayload.End = true
			return wrappedPayload, fmt.Errorf("db.query unable to find module with id: %s", dq.ModuleId)
		}
		dbModule, ok := module.(common.DatabaseModule)
		if !ok {
			wrappedPayload.End = true
			return wrappedPayload, fmt.Errorf("db.query module with id %s is not a DatabaseModule", dq.ModuleId)
		}
		dq.module = dbModule
	}

	var queryBuffer bytes.Buffer
	err := dq.Query.Execute(&queryBuffer, wrappedPayload)
	if err != nil {
		wrappedPayload.End = true
		return wrappedPayload, err
	}

	// TODO(jwetzell): support proper parameterized queries; payload values are
	// currently interpolated straight into the query text by the template.
	rows, err := dq.module.QueryContext(ctx, queryBuffer.String())
	if err != nil {
		wrappedPayload.End = true
		return wrappedPayload, fmt.Errorf("db.query error executing query: %w", err)
	}
	defer rows.Close()

	columns, err := rows.Columns()
	if err != nil {
		wrappedPayload.End = true
		return wrappedPayload, fmt.Errorf("db.query error getting columns: %w", err)
	}

	// TODO(jwetzell): optimize this
	results := make([]map[string]any, 0)
	for rows.Next() {
		// Scan needs one pointer destination per column.
		columnValues := make([]any, len(columns))
		for i := range columnValues {
			columnValues[i] = new(any)
		}
		if err := rows.Scan(columnValues...); err != nil {
			wrappedPayload.End = true
			return wrappedPayload, fmt.Errorf("db.query error scanning row: %w", err)
		}
		rowMap := make(map[string]any, len(columns))
		for i, colName := range columns {
			rowMap[colName] = *columnValues[i].(*any)
		}
		results = append(results, rowMap)
	}
	// Next returning false can mean either "no more rows" or "iteration
	// failed"; only Err distinguishes the two, so check it before trusting
	// the collected results.
	if err := rows.Err(); err != nil {
		wrappedPayload.End = true
		return wrappedPayload, fmt.Errorf("db.query error iterating rows: %w", err)
	}

	switch len(results) {
	case 0:
		wrappedPayload.Payload = nil
	case 1:
		wrappedPayload.Payload = results[0]
	default:
		wrappedPayload.Payload = results
	}
	return wrappedPayload, nil
}
// Type returns the Type field of the config this processor was created from.
func (dq *DbQuery) Type() string {
return dq.config.Type
}
package processor
import (
"context"
"fmt"
"log/slog"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
)
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "debug.log",
Title: "Debug Log",
New: func(config config.ProcessorConfig) (Processor, error) {
return &DebugLog{config: config, logger: slog.Default().With("component", "processor", "type", config.Type)}, nil
},
})
}
// DebugLog is the processor registered as "debug.log".
type DebugLog struct {
// config is the processor config this instance was created from.
config config.ProcessorConfig
// logger is the logger Process writes its debug records to.
logger *slog.Logger
}
// TODO(jwetzell): maybe make a more useful logging processor

// Process logs the payload (formatted with %+v) and its Go type (%T) at debug
// level and returns the wrapped payload unchanged. It never returns an error.
func (dl *DebugLog) Process(ctx context.Context, wrappedPayload common.WrappedPayload) (common.WrappedPayload, error) {
	// Formatting the payload can be expensive and this runs for every message,
	// so skip it entirely when debug records would be discarded anyway.
	if !dl.logger.Enabled(ctx, slog.LevelDebug) {
		return wrappedPayload, nil
	}
	payload := wrappedPayload.Payload
	payloadString := fmt.Sprintf("%+v", payload)
	payloadType := fmt.Sprintf("%T", payload)
	dl.logger.Debug("", "payload", payloadString, "payloadType", payloadType)
	return wrappedPayload, nil
}
// Type returns the Type field of the config this processor was created from.
func (dl *DebugLog) Type() string {
return dl.config.Type
}
package processor
import (
"context"
"reflect"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
)
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "filter.change",
Title: "Filter On Change",
New: func(config config.ProcessorConfig) (Processor, error) {
return &FilterChange{config: config}, nil
},
})
}
// FilterChange is the processor registered as "filter.change".
type FilterChange struct {
// config is the processor config this instance was created from.
config config.ProcessorConfig
// previous is the last payload that Process let through; nil until the first one.
previous any
}
// Process marks the wrapped payload End when its payload is deeply equal to
// the previously passed payload; otherwise it remembers the payload and lets
// it through. It never returns an error.
func (fc *FilterChange) Process(ctx context.Context, wrappedPayload common.WrappedPayload) (common.WrappedPayload, error) {
	current := wrappedPayload.Payload
	if unchanged := reflect.DeepEqual(current, fc.previous); unchanged {
		wrappedPayload.End = true
	} else {
		fc.previous = current
	}
	return wrappedPayload, nil
}
// Type returns the Type field of the config this processor was created from.
func (fc *FilterChange) Type() string {
return fc.config.Type
}
package processor
import (
"context"
"fmt"
"github.com/expr-lang/expr"
"github.com/expr-lang/expr/vm"
"github.com/google/jsonschema-go/jsonschema"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
)
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "filter.expr",
Title: "Filter by Expr expression",
ParamsSchema: &jsonschema.Schema{
Type: "object",
Properties: map[string]*jsonschema.Schema{
"expression": {
Title: "Expression",
Description: "Expr expression to evaluate, must return a boolean",
Type: "string",
},
},
Required: []string{"expression"},
AdditionalProperties: &jsonschema.Schema{Not: &jsonschema.Schema{}},
},
New: func(config config.ProcessorConfig) (Processor, error) {
params := config.Params
expressionString, err := params.GetString("expression")
if err != nil {
return nil, fmt.Errorf("filter.expr expression error: %w", err)
}
program, err := expr.Compile(expressionString)
if err != nil {
return nil, err
}
return &FilterExpr{config: config, Program: program}, nil
},
})
}
// FilterExpr is the processor registered as "filter.expr".
//
// NOTE(jwetzell): see language definition https://expr-lang.org/docs/language-definition
type FilterExpr struct {
// config is the processor config this instance was created from.
config config.ProcessorConfig
// Program is the compiled form of the "expression" param.
Program *vm.Program
}
// Process runs the compiled expression with the wrapped payload as its
// environment. The payload passes through when the expression yields true and
// is marked End when it yields false. A run failure or a non-boolean result
// marks the payload End and returns an error.
func (fe *FilterExpr) Process(ctx context.Context, wrappedPayload common.WrappedPayload) (common.WrappedPayload, error) {
	exprEnv := wrappedPayload

	result, runErr := expr.Run(fe.Program, exprEnv)
	if runErr != nil {
		wrappedPayload.End = true
		return wrappedPayload, runErr
	}

	keep, isBool := result.(bool)
	switch {
	case !isBool:
		wrappedPayload.End = true
		return wrappedPayload, fmt.Errorf("filter.expr expression did not return a boolean")
	case !keep:
		wrappedPayload.End = true
		return wrappedPayload, nil
	}

	wrappedPayload.Payload = exprEnv.Payload
	return wrappedPayload, nil
}
// Type returns the Type field of the config this processor was created from.
func (fe *FilterExpr) Type() string {
return fe.config.Type
}
package processor
import (
"context"
"fmt"
"github.com/google/jsonschema-go/jsonschema"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
"golang.org/x/time/rate"
)
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "filter.rate",
Title: "Filter by Rate",
ParamsSchema: &jsonschema.Schema{
Type: "object",
Properties: map[string]*jsonschema.Schema{
"rate": {
Title: "Rate",
Description: "number of events to allow per second",
Type: "integer",
},
},
Required: []string{"rate"},
},
New: func(config config.ProcessorConfig) (Processor, error) {
params := config.Params
rateInt, err := params.GetInt("rate")
if err != nil {
return nil, fmt.Errorf("filter.rate rate error: %w", err)
}
limiter := rate.NewLimiter(rate.Limit(rateInt), rateInt*2)
return &FilterRate{config: config, limiter: limiter}, nil
},
})
}
// FilterRate is the processor registered as "filter.rate".
type FilterRate struct {
// config is the processor config this instance was created from.
config config.ProcessorConfig
// limiter is the rate limiter Process waits on for every payload.
limiter *rate.Limiter
}
// Process calls Wait on the limiter with ctx and then passes the payload
// through unchanged. When Wait returns an error the wrapped payload is marked
// End and that error is returned.
func (fc *FilterRate) Process(ctx context.Context, wrappedPayload common.WrappedPayload) (common.WrappedPayload, error) {
	if waitErr := fc.limiter.Wait(ctx); waitErr != nil {
		wrappedPayload.End = true
		return wrappedPayload, waitErr
	}
	return wrappedPayload, nil
}
// Type returns the Type field of the config this processor was created from.
func (fc *FilterRate) Type() string {
return fc.config.Type
}
package processor
import (
"context"
"errors"
"fmt"
"regexp"
"github.com/google/jsonschema-go/jsonschema"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
)
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "filter.regex",
Title: "Filter by Regex",
ParamsSchema: &jsonschema.Schema{
Type: "object",
Properties: map[string]*jsonschema.Schema{
"pattern": {
Title: "Pattern",
Description: "regex pattern to match against",
Type: "string",
},
},
Required: []string{"pattern"},
AdditionalProperties: &jsonschema.Schema{Not: &jsonschema.Schema{}},
},
New: func(config config.ProcessorConfig) (Processor, error) {
params := config.Params
patternString, err := params.GetString("pattern")
if err != nil {
return nil, fmt.Errorf("filter.regex pattern error: %w", err)
}
patternRegexp, err := regexp.Compile(patternString)
if err != nil {
return nil, err
}
return &FilterRegex{config: config, Pattern: patternRegexp}, nil
},
})
}
// FilterRegex is the processor registered as "filter.regex".
type FilterRegex struct {
// config is the processor config this instance was created from.
config config.ProcessorConfig
// Pattern is the compiled form of the "pattern" param.
Pattern *regexp.Regexp
}
// Process lets a string payload through when it matches Pattern and marks the
// wrapped payload End when it does not. A non-string payload marks the payload
// End and returns an error.
func (fr *FilterRegex) Process(ctx context.Context, wrappedPayload common.WrappedPayload) (common.WrappedPayload, error) {
	text, isString := common.GetAnyAs[string](wrappedPayload.Payload)
	if !isString {
		wrappedPayload.End = true
		return wrappedPayload, errors.New("filter.regex processor only accepts a string")
	}

	if fr.Pattern.MatchString(text) {
		wrappedPayload.Payload = text
		return wrappedPayload, nil
	}

	wrappedPayload.End = true
	return wrappedPayload, nil
}
// Type returns the Type field of the config this processor was created from.
func (fr *FilterRegex) Type() string {
return fr.config.Type
}
package processor
import (
"context"
"encoding/json"
"errors"
"fmt"
"strconv"
"github.com/google/jsonschema-go/jsonschema"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
)
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "float.parse",
Title: "Parse Float",
ParamsSchema: &jsonschema.Schema{
Type: "object",
Properties: map[string]*jsonschema.Schema{
"bitSize": {
Title: "Bit Size",
Description: "bit size of the resulting float",
Type: "integer",
Enum: []any{32, 64},
Default: json.RawMessage("64"),
},
},
AdditionalProperties: &jsonschema.Schema{Not: &jsonschema.Schema{}},
},
New: func(moduleConfig config.ProcessorConfig) (Processor, error) {
params := moduleConfig.Params
bitSizeNum, err := params.GetInt("bitSize")
if err != nil {
if errors.Is(err, config.ErrParamNotFound) {
bitSizeNum = 64
} else {
return nil, fmt.Errorf("float.parse bitSize error: %w", err)
}
}
return &FloatParse{config: moduleConfig, BitSize: bitSizeNum}, nil
},
})
}
// FloatParse is the processor registered as "float.parse".
type FloatParse struct {
// BitSize is passed to strconv.ParseFloat as its bitSize argument.
BitSize int
// config is the processor config this instance was created from.
config config.ProcessorConfig
}
// Process parses a string payload with strconv.ParseFloat using BitSize and
// replaces the payload with the parsed value. A non-string payload or a parse
// failure marks the wrapped payload End and returns an error.
func (fp *FloatParse) Process(ctx context.Context, wrappedPayload common.WrappedPayload) (common.WrappedPayload, error) {
	text, isString := common.GetAnyAs[string](wrappedPayload.Payload)
	if !isString {
		wrappedPayload.End = true
		return wrappedPayload, errors.New("float.parse processor only accepts a string")
	}

	parsed, parseErr := strconv.ParseFloat(text, fp.BitSize)
	if parseErr != nil {
		wrappedPayload.End = true
		return wrappedPayload, parseErr
	}

	wrappedPayload.Payload = parsed
	return wrappedPayload, nil
}
// Type returns the Type field of the config this processor was created from.
func (fp *FloatParse) Type() string {
return fp.config.Type
}
package processor
import (
"context"
"encoding/json"
"errors"
"fmt"
"math/rand/v2"
"github.com/google/jsonschema-go/jsonschema"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
)
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "float.random",
Title: "Random Float",
ParamsSchema: &jsonschema.Schema{
Type: "object",
Properties: map[string]*jsonschema.Schema{
"bitSize": {
Title: "Bit Size",
Description: "bit size of the resulting float",
Type: "integer",
Enum: []any{32, 64},
Default: json.RawMessage("32"),
},
"min": {
Title: "Minimum",
Description: "inclusive minimum value of the random float",
Type: "number",
},
"max": {
Title: "Maximum",
Description: "exclusive maximum value of the random float",
Type: "number",
},
},
Required: []string{"min", "max"},
AdditionalProperties: &jsonschema.Schema{Not: &jsonschema.Schema{}},
},
New: func(processorConfig config.ProcessorConfig) (Processor, error) {
params := processorConfig.Params
bitSizeInt, err := params.GetInt("bitSize")
if err != nil {
if errors.Is(err, config.ErrParamNotFound) {
bitSizeInt = 32
} else {
return nil, fmt.Errorf("float.random bitSize error: %w", err)
}
}
if bitSizeInt != 32 && bitSizeInt != 64 {
return nil, errors.New("float.random bitSize error: must be 32 or 64")
}
minFloat, err := params.GetFloat64("min")
if err != nil {
return nil, fmt.Errorf("float.random min error: %w", err)
}
maxFloat, err := params.GetFloat64("max")
if err != nil {
return nil, fmt.Errorf("float.random max error: %w", err)
}
if maxFloat < minFloat {
return nil, errors.New("float.random max must be greater than min")
}
return &FloatRandom{config: processorConfig, Min: minFloat, Max: maxFloat, BitSize: bitSizeInt}, nil
},
})
}
// FloatRandom is the processor registered as "float.random".
type FloatRandom struct {
// BitSize selects the generated type in Process: 32 for float32, 64 for float64.
BitSize int
// Min is the lower bound of the generated value.
Min float64
// Max is the upper bound of the generated value.
Max float64
// config is the processor config this instance was created from.
config config.ProcessorConfig
}
// Process replaces the payload with a random float scaled into the Min..Max
// range: a float32 when BitSize is 32, a float64 when BitSize is 64. Any other
// BitSize marks the wrapped payload End and returns an error.
func (fr *FloatRandom) Process(ctx context.Context, wrappedPayload common.WrappedPayload) (common.WrappedPayload, error) {
	switch fr.BitSize {
	case 32:
		lower, upper := float32(fr.Min), float32(fr.Max)
		wrappedPayload.Payload = rand.Float32()*(upper-lower) + lower
		return wrappedPayload, nil
	case 64:
		wrappedPayload.Payload = rand.Float64()*(fr.Max-fr.Min) + fr.Min
		return wrappedPayload, nil
	default:
		wrappedPayload.End = true
		return wrappedPayload, errors.New("float.random bitSize error: must be 32 or 64")
	}
}
// Type returns the Type field of the config this processor was created from.
func (fr *FloatRandom) Type() string {
return fr.config.Type
}
package processor
import (
"bytes"
"context"
"fmt"
"strconv"
"text/template"
"github.com/google/jsonschema-go/jsonschema"
freeD "github.com/jwetzell/free-d-go"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
)
// init registers the "freed.create" processor along with its params schema.
// Every param is a required string that is parsed as a text/template.
func init() {
	RegisterProcessor(ProcessorRegistration{
		Type:  "freed.create",
		Title: "Create FreeD",
		ParamsSchema: &jsonschema.Schema{
			Type: "object",
			Properties: map[string]*jsonschema.Schema{
				"id": {
					Title:       "Camera ID",
					Description: "ID of the camera this FreeD position corresponds to",
					Type:        "string",
				},
				"pan": {
					Title:       "Pan",
					Description: "pan angle of the camera",
					Type:        "string",
				},
				"tilt": {
					Title:       "Tilt",
					Description: "tilt angle of the camera",
					Type:        "string",
				},
				"roll": {
					Title:       "Roll",
					Description: "roll angle of the camera",
					Type:        "string",
				},
				"posX": {
					Title:       "Position X",
					Description: "X coordinate of the camera's position",
					Type:        "string",
				},
				"posY": {
					Title:       "Position Y",
					Description: "Y coordinate of the camera's position",
					Type:        "string",
				},
				"posZ": {
					Title:       "Position Z",
					Description: "Z coordinate of the camera's position",
					Type:        "string",
				},
				"zoom": {
					Title:       "Zoom",
					Description: "zoom level of the camera",
					Type:        "string",
				},
				"focus": {
					Title:       "Focus",
					Description: "focus level of the camera",
					Type:        "string",
				},
			},
			Required: []string{
				"id",
				"pan",
				"tilt",
				"roll",
				"posX",
				"posY",
				"posZ",
				"zoom",
				"focus",
			},
			AdditionalProperties: &jsonschema.Schema{Not: &jsonschema.Schema{}},
		},
		New: func(config config.ProcessorConfig) (Processor, error) {
			// TODO(jwetzell): make some params optional
			params := config.Params

			// parseParam reads the named string param and parses it as a
			// template that carries the same name. A missing or mistyped
			// param is wrapped with the param name; a template parse error
			// is returned as-is.
			parseParam := func(name string) (*template.Template, error) {
				value, err := params.GetString(name)
				if err != nil {
					return nil, fmt.Errorf("freed.create %s error: %w", name, err)
				}
				return template.New(name).Parse(value)
			}

			processor := &FreeDCreate{config: config}

			// Params are handled in this fixed order so the first failing
			// param reported is deterministic.
			fields := []struct {
				name string
				dst  **template.Template
			}{
				{"id", &processor.Id},
				{"pan", &processor.Pan},
				{"tilt", &processor.Tilt},
				{"roll", &processor.Roll},
				{"posX", &processor.PosX},
				{"posY", &processor.PosY},
				{"posZ", &processor.PosZ},
				{"zoom", &processor.Zoom},
				{"focus", &processor.Focus},
			}
			for _, field := range fields {
				tmpl, err := parseParam(field.name)
				if err != nil {
					return nil, err
				}
				*field.dst = tmpl
			}
			return processor, nil
		},
	})
}
// FreeDCreate is the processor registered as "freed.create". Each template
// field is the parsed form of the param with the corresponding name and is
// executed against the wrapped payload in Process.
type FreeDCreate struct {
// config is the processor config this instance was created from.
config config.ProcessorConfig
// Id output is parsed as an unsigned base-10 integer with bit size 8.
Id *template.Template
// Pan, Tilt, Roll, PosX, PosY and PosZ outputs are parsed as floats with bit size 32.
Pan *template.Template
Tilt *template.Template
Roll *template.Template
PosX *template.Template
PosY *template.Template
PosZ *template.Template
// Zoom and Focus outputs are parsed as signed base-10 integers with bit size 32.
Zoom *template.Template
Focus *template.Template
}
// Process executes every field template against the wrapped payload, parses
// the rendered text as a number and replaces the payload with the resulting
// freeD.FreeDPosition. Fields are handled in the order id, pan, tilt, roll,
// posX, posY, posZ, zoom, focus; the first template or parse failure marks the
// wrapped payload End and is returned unchanged.
func (fc *FreeDCreate) Process(ctx context.Context, wrappedPayload common.WrappedPayload) (common.WrappedPayload, error) {
	templateData := wrappedPayload

	// render executes one field template and returns its text output.
	render := func(tmpl *template.Template) (string, error) {
		var out bytes.Buffer
		if err := tmpl.Execute(&out, templateData); err != nil {
			return "", err
		}
		return out.String(), nil
	}
	// renderFloat renders a template and parses the output as a 32-bit float.
	renderFloat := func(tmpl *template.Template) (float32, error) {
		text, err := render(tmpl)
		if err != nil {
			return 0, err
		}
		parsed, err := strconv.ParseFloat(text, 32)
		return float32(parsed), err
	}
	// renderInt renders a template and parses the output as a base-10 32-bit integer.
	renderInt := func(tmpl *template.Template) (int32, error) {
		text, err := render(tmpl)
		if err != nil {
			return 0, err
		}
		parsed, err := strconv.ParseInt(text, 10, 32)
		return int32(parsed), err
	}
	// abort ends the payload and hands the error back to the caller.
	abort := func(err error) (common.WrappedPayload, error) {
		wrappedPayload.End = true
		return wrappedPayload, err
	}

	idText, err := render(fc.Id)
	if err != nil {
		return abort(err)
	}
	idValue, err := strconv.ParseUint(idText, 10, 8)
	if err != nil {
		return abort(err)
	}

	pan, err := renderFloat(fc.Pan)
	if err != nil {
		return abort(err)
	}
	tilt, err := renderFloat(fc.Tilt)
	if err != nil {
		return abort(err)
	}
	roll, err := renderFloat(fc.Roll)
	if err != nil {
		return abort(err)
	}
	posX, err := renderFloat(fc.PosX)
	if err != nil {
		return abort(err)
	}
	posY, err := renderFloat(fc.PosY)
	if err != nil {
		return abort(err)
	}
	posZ, err := renderFloat(fc.PosZ)
	if err != nil {
		return abort(err)
	}

	zoom, err := renderInt(fc.Zoom)
	if err != nil {
		return abort(err)
	}
	focus, err := renderInt(fc.Focus)
	if err != nil {
		return abort(err)
	}

	wrappedPayload.Payload = freeD.FreeDPosition{
		ID:    uint8(idValue),
		Pan:   pan,
		Tilt:  tilt,
		Roll:  roll,
		PosX:  posX,
		PosY:  posY,
		PosZ:  posZ,
		Zoom:  zoom,
		Focus: focus,
	}
	return wrappedPayload, nil
}
// Type returns the Type field of the config this processor was created from.
func (fc *FreeDCreate) Type() string {
return fc.config.Type
}
package processor
import (
"context"
"errors"
freeD "github.com/jwetzell/free-d-go"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
)
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "freed.decode",
Title: "Decode FreeD",
New: func(config config.ProcessorConfig) (Processor, error) {
return &FreeDDecode{config: config}, nil
},
})
}
// FreeDDecode is the processor registered as "freed.decode".
type FreeDDecode struct {
// config is the processor config this instance was created from.
config config.ProcessorConfig
// buf is a fixed 29-byte array that Process copies each incoming packet
// into before decoding; it is overwritten on every call.
buf [29]byte
}
// Process decodes a 29-byte payload with freeD.Decode and replaces the payload
// with the decoded result. A non-byte payload, a payload of any other length,
// or a decode failure marks the wrapped payload End and returns an error.
func (fd *FreeDDecode) Process(ctx context.Context, wrappedPayload common.WrappedPayload) (common.WrappedPayload, error) {
	raw, isBytes := common.GetAnyAsByteSlice(wrappedPayload.Payload)
	switch {
	case !isBytes:
		wrappedPayload.End = true
		return wrappedPayload, errors.New("freed.decode processor only accepts a []byte")
	case len(raw) != 29:
		wrappedPayload.End = true
		return wrappedPayload, errors.New("freeD packet must be exactly 29 bytes")
	}

	// Copy the packet into the fixed-size array passed to Decode.
	copy(fd.buf[:], raw)
	position, decodeErr := freeD.Decode(fd.buf)
	if decodeErr != nil {
		wrappedPayload.End = true
		return wrappedPayload, decodeErr
	}

	wrappedPayload.Payload = position
	return wrappedPayload, nil
}
// Type returns the Type field of the config this processor was created from.
func (fd *FreeDDecode) Type() string {
return fd.config.Type
}
package processor
import (
"context"
"errors"
freeD "github.com/jwetzell/free-d-go"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
)
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "freed.encode",
Title: "Encode FreeD",
New: func(config config.ProcessorConfig) (Processor, error) {
return &FreeDEncode{config: config}, nil
},
})
}
// FreeDEncode is the processor registered as "freed.encode".
type FreeDEncode struct {
// config is the processor config this instance was created from.
config config.ProcessorConfig
}
// Process encodes a freeD.FreeDPosition payload with freeD.Encode and replaces
// the payload with the result. A payload of any other type marks the wrapped
// payload End and returns an error.
func (fe *FreeDEncode) Process(ctx context.Context, wrappedPayload common.WrappedPayload) (common.WrappedPayload, error) {
	position, isPosition := common.GetAnyAs[freeD.FreeDPosition](wrappedPayload.Payload)
	if isPosition {
		wrappedPayload.Payload = freeD.Encode(position)
		return wrappedPayload, nil
	}

	wrappedPayload.End = true
	return wrappedPayload, errors.New("freed.encode processor only accepts a FreeDPosition")
}
// Type returns the Type field of the config this processor was created from.
func (fe *FreeDEncode) Type() string {
return fe.config.Type
}
package processor
import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"text/template"
"time"
"github.com/google/jsonschema-go/jsonschema"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
)
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "http.request.do",
Title: "Do HTTP Request",
ParamsSchema: &jsonschema.Schema{
Type: "object",
Properties: map[string]*jsonschema.Schema{
"method": {
Title: "HTTP Method",
Description: "method to use for the HTTP request",
Type: "string",
Enum: []any{"GET", "POST", "PUT", "PATCH", "DELETE"},
},
"url": {
Title: "URL",
Description: "URL to send the HTTP request to",
Type: "string",
},
},
Required: []string{"method", "url"},
AdditionalProperties: &jsonschema.Schema{Not: &jsonschema.Schema{}},
},
New: func(config config.ProcessorConfig) (Processor, error) {
params := config.Params
methodString, err := params.GetString("method")
if err != nil {
return nil, fmt.Errorf("http.request.do method error: %w", err)
}
urlString, err := params.GetString("url")
if err != nil {
return nil, fmt.Errorf("http.request.do url error: %w", err)
}
urlTemplate, err := template.New("url").Parse(urlString)
if err != nil {
return nil, err
}
client := &http.Client{
Timeout: 10 * time.Second,
}
return &HTTPRequestDo{config: config, URL: urlTemplate, Method: methodString, client: client}, nil
},
})
}
// HTTPRequestDo is the processor registered as "http.request.do".
type HTTPRequestDo struct {
// config is the processor config this instance was created from.
config config.ProcessorConfig
// client is the HTTP client used to send every request.
client *http.Client
// Method is the value of the "method" param.
Method string
// URL is the "url" param parsed as a text/template.
URL *template.Template
}
// Process renders the URL template against the wrapped payload, sends a
// request with the configured method and an empty body, and replaces the
// payload with an HTTPResponse holding the status code and the full response
// body. Any template, request, transport or read failure marks the wrapped
// payload End and returns the error.
func (hrd *HTTPRequestDo) Process(ctx context.Context, wrappedPayload common.WrappedPayload) (common.WrappedPayload, error) {
	templateData := wrappedPayload

	var urlBuffer bytes.Buffer
	err := hrd.URL.Execute(&urlBuffer, templateData)
	if err != nil {
		wrappedPayload.End = true
		return wrappedPayload, err
	}

	urlString := urlBuffer.String()

	//TODO(jwetzell): support body
	request, err := http.NewRequest(hrd.Method, urlString, bytes.NewBuffer([]byte{}))
	if err != nil {
		wrappedPayload.End = true
		return wrappedPayload, err
	}

	response, err := hrd.client.Do(request)
	if err != nil {
		wrappedPayload.End = true
		return wrappedPayload, err
	}
	// The body must always be closed, otherwise the underlying connection is
	// leaked and cannot be reused by the client's transport.
	defer response.Body.Close()

	body, err := io.ReadAll(response.Body)
	if err != nil {
		wrappedPayload.End = true
		return wrappedPayload, err
	}

	//TODO(jwetzell): support headers, etc
	wrappedPayload.Payload = HTTPResponse{
		Status: response.StatusCode,
		Body:   body,
	}
	return wrappedPayload, nil
}
// Type returns the Type field of the config this processor was created from.
func (hrd *HTTPRequestDo) Type() string {
return hrd.config.Type
}
package processor
import (
"bytes"
"context"
"fmt"
"text/template"
"github.com/google/jsonschema-go/jsonschema"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
)
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "http.response.create",
Title: "Create HTTP Response",
ParamsSchema: &jsonschema.Schema{
Type: "object",
Properties: map[string]*jsonschema.Schema{
"status": {
Title: "Status Code",
Description: "status code to set on the response",
Type: "integer",
},
"bodyTemplate": {
Title: "Body Template",
Description: "template for the response body",
Type: "string",
},
},
Required: []string{"status", "bodyTemplate"},
AdditionalProperties: &jsonschema.Schema{Not: &jsonschema.Schema{}},
},
New: func(config config.ProcessorConfig) (Processor, error) {
params := config.Params
statusNum, err := params.GetInt("status")
if err != nil {
return nil, fmt.Errorf("http.response.create status error: %w", err)
}
bodyTemplateString, err := params.GetString("bodyTemplate")
if err != nil {
return nil, fmt.Errorf("http.response.create bodyTemplate error: %w", err)
}
bodyTemplate, err := template.New("body").Parse(bodyTemplateString)
if err != nil {
return nil, err
}
// TODO(jwetzell): support other body kind (direct bytes from input, from file?)
return &HTTPResponseCreate{config: config, Status: int(statusNum), BodyTmpl: bodyTemplate}, nil
},
})
}
// HTTPResponseCreate is the processor registered as "http.response.create".
type HTTPResponseCreate struct {
// Status is the value of the "status" param, copied into every response.
Status int
// BodyTmpl is the "bodyTemplate" param parsed as a text/template.
BodyTmpl *template.Template
// config is the processor config this instance was created from.
config config.ProcessorConfig
}
// HTTPResponse is the payload value produced by the HTTP processors: a status
// code together with the raw body bytes.
type HTTPResponse struct {
// Status is the HTTP status code.
Status int
// Body is the response body.
Body []byte
}
// Process renders the body template against the wrapped payload and replaces
// the payload with an HTTPResponse carrying the configured status and the
// rendered body. A template failure marks the wrapped payload End and returns
// the error.
func (hrc *HTTPResponseCreate) Process(ctx context.Context, wrappedPayload common.WrappedPayload) (common.WrappedPayload, error) {
	templateData := wrappedPayload

	var rendered bytes.Buffer
	if execErr := hrc.BodyTmpl.Execute(&rendered, templateData); execErr != nil {
		wrappedPayload.End = true
		return wrappedPayload, execErr
	}

	response := HTTPResponse{
		Status: hrc.Status,
		Body:   rendered.Bytes(),
	}
	wrappedPayload.Payload = response
	return wrappedPayload, nil
}
// Type returns the Type field of the config this processor was created from.
func (hrc *HTTPResponseCreate) Type() string {
return hrc.config.Type
}
package processor
import (
"context"
"encoding/json"
"errors"
"fmt"
"strconv"
"github.com/google/jsonschema-go/jsonschema"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
)
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "int.parse",
Title: "Parse Int",
ParamsSchema: &jsonschema.Schema{
Type: "object",
Properties: map[string]*jsonschema.Schema{
"base": {
Title: "Base",
Description: "numerical base to use for parsing the integer",
Type: "integer",
Enum: []any{0, 2, 8, 10, 16},
Default: json.RawMessage("10"),
},
"bitSize": {
Title: "Bit Size",
Description: "bit size of the resulting integer",
Type: "integer",
Enum: []any{0, 8, 16, 32, 64},
Default: json.RawMessage("64"),
},
},
AdditionalProperties: &jsonschema.Schema{Not: &jsonschema.Schema{}},
},
New: func(moduleConfig config.ProcessorConfig) (Processor, error) {
params := moduleConfig.Params
baseNum, err := params.GetInt("base")
if err != nil {
if errors.Is(err, config.ErrParamNotFound) {
baseNum = 10
} else {
return nil, fmt.Errorf("int.parse base error: %w", err)
}
}
bitSizeNum, err := params.GetInt("bitSize")
if err != nil {
if errors.Is(err, config.ErrParamNotFound) {
bitSizeNum = 64
} else {
return nil, fmt.Errorf("int.parse bitSize error: %w", err)
}
}
return &IntParse{config: moduleConfig, Base: baseNum, BitSize: bitSizeNum}, nil
},
})
}
// IntParse is the processor registered as "int.parse"; its Process method
// converts a string payload to an integer with strconv.ParseInt.
type IntParse struct {
// Base is handed to strconv.ParseInt as the numeric base.
Base int
// BitSize is handed to strconv.ParseInt as the bit size.
BitSize int
// config is the configuration the processor was built from; Type reads it.
config config.ProcessorConfig
}
// Process parses a string payload into an integer using the configured base
// and bit size, storing the strconv.ParseInt result as the new payload.
// A non-string payload or a parse failure flags the wrapped payload with End
// and returns an error.
func (ip *IntParse) Process(ctx context.Context, wrappedPayload common.WrappedPayload) (common.WrappedPayload, error) {
	text, isString := common.GetAnyAs[string](wrappedPayload.Payload)
	if !isString {
		wrappedPayload.End = true
		return wrappedPayload, errors.New("int.parse processor only accepts a string")
	}

	parsed, parseErr := strconv.ParseInt(text, ip.Base, ip.BitSize)
	if parseErr != nil {
		wrappedPayload.End = true
		return wrappedPayload, parseErr
	}

	wrappedPayload.Payload = parsed
	return wrappedPayload, nil
}
// Type reports the processor type string stored in the configuration this
// instance holds.
func (ip *IntParse) Type() string {
	processorType := ip.config.Type
	return processorType
}
package processor
import (
"context"
"errors"
"fmt"
"math/rand/v2"
"github.com/google/jsonschema-go/jsonschema"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
)
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "int.random",
Title: "Random Int",
ParamsSchema: &jsonschema.Schema{
Type: "object",
Properties: map[string]*jsonschema.Schema{
"min": {
Title: "Minimum",
Description: "inclusive minimum value of the random integer",
Type: "integer",
},
"max": {
Title: "Maximum",
Description: "inclusive maximum value of the random integer",
Type: "integer",
},
},
Required: []string{"min", "max"},
AdditionalProperties: &jsonschema.Schema{Not: &jsonschema.Schema{}},
},
New: func(config config.ProcessorConfig) (Processor, error) {
params := config.Params
minInt, err := params.GetInt("min")
if err != nil {
return nil, fmt.Errorf("int.random min error: %w", err)
}
maxInt, err := params.GetInt("max")
if err != nil {
return nil, fmt.Errorf("int.random max error: %w", err)
}
if maxInt < minInt {
return nil, errors.New("int.random max must be greater than min")
}
return &IntRandom{config: config, Min: int(minInt), Max: int(maxInt)}, nil
},
})
}
// IntRandom is the processor registered as "int.random"; its Process method
// replaces the payload with a random int between Min and Max.
type IntRandom struct {
// Min is the lower bound added to the generated offset.
Min int
// Max is the upper bound; Process draws from a span of Max-Min+1 values.
Max int
// config is the configuration the processor was built from; Type reads it.
config config.ProcessorConfig
}
// Process discards the incoming payload value and replaces it with a random
// int drawn from the inclusive range [Min, Max].
func (ir *IntRandom) Process(ctx context.Context, wrappedPayload common.WrappedPayload) (common.WrappedPayload, error) {
	// rand.IntN yields [0, span), so shifting by Min covers Min..Max.
	span := ir.Max - ir.Min + 1
	wrappedPayload.Payload = ir.Min + rand.IntN(span)
	return wrappedPayload, nil
}
// Type reports the processor type string stored in the configuration this
// instance holds.
func (ir *IntRandom) Type() string {
	processorType := ir.config.Type
	return processorType
}
package processor
import (
"context"
"errors"
"fmt"
"github.com/google/jsonschema-go/jsonschema"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
)
// init registers the "int.scale" processor. Construction reads the four
// required range params and validates them:
//   - inMax must be strictly greater than inMin, because IntScale.Process
//     divides by (InMax - InMin) and an empty input range would panic with an
//     integer divide by zero on every payload;
//   - outMax must not be below outMin (an equal output range is accepted).
func init() {
	RegisterProcessor(ProcessorRegistration{
		Type:  "int.scale",
		Title: "Scale Int",
		ParamsSchema: &jsonschema.Schema{
			Type: "object",
			Properties: map[string]*jsonschema.Schema{
				"inMin": {
					Title:       "Input Minimum",
					Description: "minimum value of the input integer",
					Type:        "integer",
				},
				"inMax": {
					Title:       "Input Maximum",
					Description: "maximum value of the input integer",
					Type:        "integer",
				},
				"outMin": {
					Title:       "Output Minimum",
					Description: "minimum value of the output integer",
					Type:        "integer",
				},
				"outMax": {
					Title:       "Output Maximum",
					Description: "maximum value of the output integer",
					Type:        "integer",
				},
			},
			Required:             []string{"inMin", "inMax", "outMin", "outMax"},
			AdditionalProperties: &jsonschema.Schema{Not: &jsonschema.Schema{}},
		},
		New: func(config config.ProcessorConfig) (Processor, error) {
			params := config.Params

			inMinInt, err := params.GetInt("inMin")
			if err != nil {
				return nil, fmt.Errorf("int.scale inMin error: %w", err)
			}

			inMaxInt, err := params.GetInt("inMax")
			if err != nil {
				return nil, fmt.Errorf("int.scale inMax error: %w", err)
			}

			// <= rather than <: an input range of width zero is the divisor in
			// Process and must be refused here instead of panicking later.
			if inMaxInt <= inMinInt {
				return nil, errors.New("int.scale inMax must be greater than inMin")
			}

			outMinInt, err := params.GetInt("outMin")
			if err != nil {
				return nil, fmt.Errorf("int.scale outMin error: %w", err)
			}

			outMaxInt, err := params.GetInt("outMax")
			if err != nil {
				return nil, fmt.Errorf("int.scale outMax error: %w", err)
			}

			if outMaxInt < outMinInt {
				return nil, errors.New("int.scale outMax must be greater than outMin")
			}

			return &IntScale{config: config, InMin: inMinInt, InMax: inMaxInt, OutMin: outMinInt, OutMax: outMaxInt}, nil
		},
	})
}
// IntScale is the processor registered as "int.scale"; its Process method
// linearly maps an int payload from the input range onto the output range.
type IntScale struct {
// OutMin is the output value that InMin maps to.
OutMin int
// OutMax bounds the output range; Process multiplies by OutMax-OutMin.
OutMax int
// InMin is subtracted from the payload before scaling.
InMin int
// InMax bounds the input range; Process divides by InMax-InMin.
InMax int
// config is the configuration the processor was built from; Type reads it.
config config.ProcessorConfig
}
// Process linearly maps an int payload from [InMin, InMax] onto
// [OutMin, OutMax] using integer arithmetic (the division truncates).
//
// The wrapped payload is flagged with End and an error is returned when the
// payload is not an int, or when InMin equals InMax: the input range width is
// the divisor, so a zero width would otherwise trigger a divide-by-zero panic.
func (is *IntScale) Process(ctx context.Context, wrappedPayload common.WrappedPayload) (common.WrappedPayload, error) {
	payloadInt, ok := common.GetAnyAs[int](wrappedPayload.Payload)
	if !ok {
		wrappedPayload.End = true
		return wrappedPayload, errors.New("int.scale can only process an int")
	}

	inSpan := is.InMax - is.InMin
	if inSpan == 0 {
		wrappedPayload.End = true
		return wrappedPayload, errors.New("int.scale inMax must not equal inMin")
	}

	wrappedPayload.Payload = (payloadInt-is.InMin)*(is.OutMax-is.OutMin)/inSpan + is.OutMin
	return wrappedPayload, nil
}
// Type reports the processor type string stored in the configuration this
// instance holds.
func (is *IntScale) Type() string {
	processorType := is.config.Type
	return processorType
}
package processor
import (
"context"
"encoding/json"
"errors"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
)
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "json.decode",
Title: "Decode JSON",
New: func(config config.ProcessorConfig) (Processor, error) {
return &JsonDecode{config: config}, nil
},
})
}
// JsonDecode is the processor registered as "json.decode"; its Process method
// unmarshals a string or []byte payload into a map[string]any.
type JsonDecode struct {
// config is the configuration the processor was built from; Type reads it.
config config.ProcessorConfig
}
// Process unmarshals the payload as JSON into a map[string]any and stores the
// map as the new payload. The payload is read as a byte slice first and as a
// string second; any other payload, or JSON that does not unmarshal into a
// map, flags the wrapped payload with End and returns an error.
func (jd *JsonDecode) Process(ctx context.Context, wrappedPayload common.WrappedPayload) (common.WrappedPayload, error) {
	raw, isBytes := common.GetAnyAsByteSlice(wrappedPayload.Payload)
	if !isBytes {
		text, isString := common.GetAnyAs[string](wrappedPayload.Payload)
		if !isString {
			wrappedPayload.End = true
			return wrappedPayload, errors.New("json.decode can only process a string or []byte")
		}
		raw = []byte(text)
	}

	decoded := map[string]any{}
	if unmarshalErr := json.Unmarshal(raw, &decoded); unmarshalErr != nil {
		wrappedPayload.End = true
		return wrappedPayload, unmarshalErr
	}

	wrappedPayload.Payload = decoded
	return wrappedPayload, nil
}
// Type reports the processor type string stored in the configuration this
// instance holds.
func (jd *JsonDecode) Type() string {
	processorType := jd.config.Type
	return processorType
}
package processor
import (
"bytes"
"context"
"encoding/json"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
)
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "json.encode",
Title: "Encode JSON",
New: func(config config.ProcessorConfig) (Processor, error) {
return &JsonEncode{config: config}, nil
},
})
}
// JsonEncode is the processor registered as "json.encode"; its Process method
// replaces the payload with its JSON encoding as a byte slice.
type JsonEncode struct {
// config is the configuration the processor was built from; Type reads it.
config config.ProcessorConfig
}
// Process JSON-encodes the payload and stores the resulting bytes as the new
// payload. An encoding failure flags the wrapped payload with End and returns
// the encoder's error.
func (je *JsonEncode) Process(ctx context.Context, wrappedPayload common.WrappedPayload) (common.WrappedPayload, error) {
	encoded := new(bytes.Buffer)
	if encodeErr := json.NewEncoder(encoded).Encode(wrappedPayload.Payload); encodeErr != nil {
		wrappedPayload.End = true
		return wrappedPayload, encodeErr
	}

	// Encoder.Encode terminates its output with a single newline; strip it so
	// the payload is just the JSON document.
	wrappedPayload.Payload = bytes.TrimSuffix(encoded.Bytes(), []byte("\n"))
	return wrappedPayload, nil
}
// Type reports the processor type string stored in the configuration this
// instance holds.
func (je *JsonEncode) Type() string {
	processorType := je.config.Type
	return processorType
}
package processor
import (
"context"
"errors"
"fmt"
"log/slog"
"github.com/google/jsonschema-go/jsonschema"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
)
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "kv.get",
Title: "Get Key",
ParamsSchema: &jsonschema.Schema{
Type: "object",
Properties: map[string]*jsonschema.Schema{
"module": {
Title: "Module ID",
Description: "ID of the key-value module to get the value from",
Type: "string",
},
"key": {
Title: "Key",
Description: "key to retrieve from the key-value module",
Type: "string",
},
},
Required: []string{"module", "key"},
AdditionalProperties: &jsonschema.Schema{Not: &jsonschema.Schema{}},
},
New: func(config config.ProcessorConfig) (Processor, error) {
params := config.Params
moduleIdString, err := params.GetString("module")
if err != nil {
return nil, fmt.Errorf("kv.get module error: %w", err)
}
keyString, err := params.GetString("key")
if err != nil {
return nil, fmt.Errorf("kv.get key error: %w", err)
}
return &KVGet{config: config, ModuleId: moduleIdString, Key: keyString, logger: slog.Default().With("component", "processor", "type", config.Type)}, nil
},
})
}
// KVGet is the processor registered as "kv.get"; its Process method replaces
// the payload with the value stored under Key in a key-value module.
type KVGet struct {
// config is the configuration the processor was built from; Type reads it.
config config.ProcessorConfig
// ModuleId is the key used to look the module up in WrappedPayload.Modules.
ModuleId string
// Key is the key passed to the module's Get method.
Key string
// logger is created by the constructor with component/type attributes.
logger *slog.Logger
// module is nil until the first successful lookup in Process, which stores
// the resolved module here for reuse.
module common.KeyValueModule
}
// Process replaces the payload with the value the key-value module returns
// for Key. On first use the module is looked up by ModuleId in the wrapped
// payload's Modules and remembered on the processor; later calls reuse it.
// A missing module map, an unknown id, a module that is not a KeyValueModule,
// or a failing Get flags the wrapped payload with End and returns an error.
func (kvg *KVGet) Process(ctx context.Context, wrappedPayload common.WrappedPayload) (common.WrappedPayload, error) {
	if kvg.module == nil {
		available := wrappedPayload.Modules
		if available == nil {
			wrappedPayload.End = true
			return wrappedPayload, errors.New("kv.get wrapped payload has no modules")
		}

		candidate, found := available[kvg.ModuleId]
		if !found {
			wrappedPayload.End = true
			return wrappedPayload, fmt.Errorf("kv.get unable to find module with id: %s", kvg.ModuleId)
		}

		store, isKeyValue := candidate.(common.KeyValueModule)
		if !isKeyValue {
			wrappedPayload.End = true
			return wrappedPayload, fmt.Errorf("kv.get module with id %s is not a KeyValueModule", kvg.ModuleId)
		}

		kvg.module = store
	}

	stored, getErr := kvg.module.Get(ctx, kvg.Key)
	if getErr != nil {
		wrappedPayload.End = true
		return wrappedPayload, fmt.Errorf("kv.get error getting key: %w", getErr)
	}

	wrappedPayload.Payload = stored
	return wrappedPayload, nil
}
// Type reports the processor type string stored in the configuration this
// instance holds.
func (kvg *KVGet) Type() string {
	processorType := kvg.config.Type
	return processorType
}
package processor
import (
"context"
"errors"
"fmt"
"log/slog"
"github.com/google/jsonschema-go/jsonschema"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
)
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "kv.set",
Title: "Set Key",
ParamsSchema: &jsonschema.Schema{
Type: "object",
Properties: map[string]*jsonschema.Schema{
"module": {
Title: "Module ID",
Description: "ID of the key-value module to set the value in",
Type: "string",
},
"key": {
Title: "Key",
Description: "key to set in the key-value module",
Type: "string",
},
},
Required: []string{"module", "key"},
AdditionalProperties: &jsonschema.Schema{Not: &jsonschema.Schema{}},
},
New: func(config config.ProcessorConfig) (Processor, error) {
params := config.Params
moduleIdString, err := params.GetString("module")
if err != nil {
return nil, fmt.Errorf("kv.set module error: %w", err)
}
keyString, err := params.GetString("key")
if err != nil {
return nil, fmt.Errorf("kv.set key error: %w", err)
}
return &KVSet{config: config, ModuleId: moduleIdString, Key: keyString, logger: slog.Default().With("component", "processor", "type", config.Type)}, nil
},
})
}
// KVSet is the processor registered as "kv.set"; its Process method stores the
// current payload under Key in a key-value module.
type KVSet struct {
// config is the configuration the processor was built from; Type reads it.
config config.ProcessorConfig
// ModuleId is the key used to look the module up in WrappedPayload.Modules.
ModuleId string
// Key is the key passed to the module's Set method.
Key string
// logger is created by the constructor with component/type attributes.
logger *slog.Logger
// module is nil until the first successful lookup in Process, which stores
// the resolved module here for reuse.
module common.KeyValueModule
}
// Process writes the current payload to the key-value module under Key and
// passes the wrapped payload through unchanged. On first use the module is
// looked up by ModuleId in the wrapped payload's Modules and remembered on
// the processor; later calls reuse it. A missing module map, an unknown id, a
// module that is not a KeyValueModule, or a failing Set flags the wrapped
// payload with End and returns an error.
func (kvs *KVSet) Process(ctx context.Context, wrappedPayload common.WrappedPayload) (common.WrappedPayload, error) {
	if kvs.module == nil {
		available := wrappedPayload.Modules
		if available == nil {
			wrappedPayload.End = true
			return wrappedPayload, errors.New("kv.set wrapped payload has no modules")
		}

		candidate, found := available[kvs.ModuleId]
		if !found {
			wrappedPayload.End = true
			return wrappedPayload, fmt.Errorf("kv.set unable to find module with id: %s", kvs.ModuleId)
		}

		store, isKeyValue := candidate.(common.KeyValueModule)
		if !isKeyValue {
			wrappedPayload.End = true
			return wrappedPayload, fmt.Errorf("kv.set module with id %s is not a KeyValueModule", kvs.ModuleId)
		}

		kvs.module = store
	}

	if setErr := kvs.module.Set(ctx, kvs.Key, wrappedPayload.Payload); setErr != nil {
		wrappedPayload.End = true
		return wrappedPayload, fmt.Errorf("kv.set error setting key: %w", setErr)
	}

	return wrappedPayload, nil
}
// Type reports the processor type string stored in the configuration this
// instance holds.
func (kvs *KVSet) Type() string {
	processorType := kvs.config.Type
	return processorType
}
//go:build cgo
package processor
import (
"bytes"
"context"
"fmt"
"strconv"
"text/template"
"github.com/google/jsonschema-go/jsonschema"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
"gitlab.com/gomidi/midi/v2"
)
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "midi.control_change.create",
Title: "Create MIDI Control Change Message",
ParamsSchema: &jsonschema.Schema{
Type: "object",
Properties: map[string]*jsonschema.Schema{
"channel": {
Title: "Channel",
Description: "channel number for the control change message",
Type: "string",
},
"control": {
Title: "Control",
Description: "control number for the control change message",
Type: "string",
},
"value": {
Title: "Value",
Description: "value for the control change message",
Type: "string",
},
},
Required: []string{"channel", "control", "value"},
AdditionalProperties: &jsonschema.Schema{Not: &jsonschema.Schema{}},
},
New: func(config config.ProcessorConfig) (Processor, error) {
params := config.Params
channelString, err := params.GetString("channel")
if err != nil {
return nil, fmt.Errorf("midi.control_change.create channel error: %w", err)
}
channelTemplate, err := template.New("channel").Parse(channelString)
if err != nil {
return nil, err
}
controlString, err := params.GetString("control")
if err != nil {
return nil, fmt.Errorf("midi.control_change.create control error: %w", err)
}
controlTemplate, err := template.New("control").Parse(controlString)
if err != nil {
return nil, err
}
valueString, err := params.GetString("value")
if err != nil {
return nil, fmt.Errorf("midi.control_change.create value error: %w", err)
}
valueTemplate, err := template.New("value").Parse(valueString)
if err != nil {
return nil, err
}
return &MIDIControlChangeCreate{
config: config,
Channel: channelTemplate,
Control: controlTemplate,
Value: valueTemplate,
}, nil
},
})
}
// MIDIControlChangeCreate is the processor registered as
// "midi.control_change.create". Each template is executed against the wrapped
// payload in Process and its output parsed as an unsigned 8-bit decimal.
type MIDIControlChangeCreate struct {
// config is the configuration the processor was built from; Type reads it.
config config.ProcessorConfig
// Channel renders the channel argument of the control change message.
Channel *template.Template
// Control renders the controller argument of the control change message.
Control *template.Template
// Value renders the value argument of the control change message.
Value *template.Template
}
// Process renders the channel, control and value templates (in that order)
// against the wrapped payload, parses each result as a base-10 unsigned 8-bit
// number, and replaces the payload with the midi.ControlChange message built
// from them. The first template or parse failure flags the wrapped payload
// with End and is returned unchanged.
func (mccc *MIDIControlChangeCreate) Process(ctx context.Context, wrappedPayload common.WrappedPayload) (common.WrappedPayload, error) {
	// renderByte executes one template and converts its output to a uint8.
	renderByte := func(tmpl *template.Template) (uint8, error) {
		var rendered bytes.Buffer
		if err := tmpl.Execute(&rendered, wrappedPayload); err != nil {
			return 0, err
		}
		parsed, err := strconv.ParseUint(rendered.String(), 10, 8)
		if err != nil {
			return 0, err
		}
		return uint8(parsed), nil
	}

	var fields [3]uint8
	for i, tmpl := range []*template.Template{mccc.Channel, mccc.Control, mccc.Value} {
		field, err := renderByte(tmpl)
		if err != nil {
			wrappedPayload.End = true
			return wrappedPayload, err
		}
		fields[i] = field
	}

	wrappedPayload.Payload = midi.ControlChange(fields[0], fields[1], fields[2])
	return wrappedPayload, nil
}
// Type reports the processor type string stored in the configuration this
// instance holds.
func (mccc *MIDIControlChangeCreate) Type() string {
	processorType := mccc.config.Type
	return processorType
}
//go:build cgo
package processor
import (
"context"
"errors"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
"gitlab.com/gomidi/midi/v2"
)
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "midi.message.decode",
Title: "Decode MIDI Message",
New: func(config config.ProcessorConfig) (Processor, error) {
return &MIDIMessageDecode{config: config}, nil
},
})
}
// MIDIMessageDecode is the processor registered as "midi.message.decode"; its
// Process method converts a byte-slice payload to a midi.Message.
type MIDIMessageDecode struct {
// config is the configuration the processor was built from; Type reads it.
config config.ProcessorConfig
}
// Process converts a byte-slice payload into a midi.Message and stores it as
// the new payload. A payload that cannot be read as a byte slice flags the
// wrapped payload with End and returns an error.
func (mmd *MIDIMessageDecode) Process(ctx context.Context, wrappedPayload common.WrappedPayload) (common.WrappedPayload, error) {
	raw, isBytes := common.GetAnyAsByteSlice(wrappedPayload.Payload)
	if !isBytes {
		wrappedPayload.End = true
		return wrappedPayload, errors.New("midi.message.decode processor only accepts a []byte")
	}

	wrappedPayload.Payload = midi.Message(raw)
	return wrappedPayload, nil
}
// Type reports the processor type string stored in the configuration this
// instance holds.
func (mmd *MIDIMessageDecode) Type() string {
	processorType := mmd.config.Type
	return processorType
}
//go:build cgo
package processor
import (
"context"
"errors"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
"gitlab.com/gomidi/midi/v2"
)
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "midi.message.encode",
Title: "Encode MIDI Message",
New: func(config config.ProcessorConfig) (Processor, error) {
return &MIDIMessageEncode{config: config}, nil
},
})
}
// MIDIMessageEncode is the processor registered as "midi.message.encode"; its
// Process method replaces a midi.Message payload with its Bytes() result.
type MIDIMessageEncode struct {
// config is the configuration the processor was built from; Type reads it.
config config.ProcessorConfig
}
// Process replaces a midi.Message payload with the bytes returned by its
// Bytes method. Any other payload flags the wrapped payload with End and
// returns an error.
func (mme *MIDIMessageEncode) Process(ctx context.Context, wrappedPayload common.WrappedPayload) (common.WrappedPayload, error) {
	message, isMessage := common.GetAnyAs[midi.Message](wrappedPayload.Payload)
	if !isMessage {
		wrappedPayload.End = true
		return wrappedPayload, errors.New("midi.message.encode processor only accepts a midi.Message")
	}

	encoded := message.Bytes()
	wrappedPayload.Payload = encoded
	return wrappedPayload, nil
}
// Type reports the processor type string stored in the configuration this
// instance holds.
func (mme *MIDIMessageEncode) Type() string {
	processorType := mme.config.Type
	return processorType
}
//go:build cgo
package processor
import (
"context"
"errors"
"fmt"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
"gitlab.com/gomidi/midi/v2"
)
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "midi.message.unpack",
Title: "Unpack MIDI Message",
New: func(config config.ProcessorConfig) (Processor, error) {
return &MIDIMessageUnpack{config: config}, nil
},
})
}
// MIDIMessageUnpack is the processor registered as "midi.message.unpack"; its
// Process method turns a midi.Message payload into a plain struct whose type
// depends on the message type.
type MIDIMessageUnpack struct {
// config is the configuration the processor was built from; Type reads it.
config config.ProcessorConfig
}
// MIDINoteOn is the payload MIDIMessageUnpack.Process produces for a
// midi.NoteOnMsg; its fields are filled by the message's GetNoteOn method.
type MIDINoteOn struct {
Channel uint8
Note uint8
Velocity uint8
}
// MIDINoteOff is the payload MIDIMessageUnpack.Process produces for a
// midi.NoteOffMsg; its fields are filled by the message's GetNoteOff method.
type MIDINoteOff struct {
Channel uint8
Note uint8
Velocity uint8
}
// MIDIControlChange is the payload MIDIMessageUnpack.Process produces for a
// midi.ControlChangeMsg; its fields are filled by the message's
// GetControlChange method.
type MIDIControlChange struct {
Channel uint8
Control uint8
Value uint8
}
// MIDIProgramChange is the payload MIDIMessageUnpack.Process produces for a
// midi.ProgramChangeMsg; its fields are filled by the message's
// GetProgramChange method.
type MIDIProgramChange struct {
Channel uint8
Program uint8
}
// MIDIPitchBend is the payload MIDIMessageUnpack.Process produces for a
// midi.PitchBendMsg; its fields are filled by the message's GetPitchBend
// method, which reports the bend both as a signed relative value and as an
// unsigned absolute value.
type MIDIPitchBend struct {
Channel uint8
Relative int16
Absolute uint16
}
func (mmu *MIDIMessageUnpack) Process(ctx context.Context, wrappedPayload common.WrappedPayload) (common.WrappedPayload, error) {
payload := wrappedPayload.Payload
payloadMidi, ok := common.GetAnyAs[midi.Message](payload)
if !ok {
wrappedPayload.End = true
return wrappedPayload, errors.New("midi.message.unpack processor only accepts a midi.Message")
}
switch payloadMidi.Type() {
case midi.NoteOnMsg:
noteOnMsg := MIDINoteOn{}
payloadMidi.GetNoteOn(¬eOnMsg.Channel, ¬eOnMsg.Note, ¬eOnMsg.Velocity)
wrappedPayload.Payload = noteOnMsg
return wrappedPayload, nil
case midi.NoteOffMsg:
noteOffMsg := MIDINoteOff{}
payloadMidi.GetNoteOff(¬eOffMsg.Channel, ¬eOffMsg.Note, ¬eOffMsg.Velocity)
wrappedPayload.Payload = noteOffMsg
return wrappedPayload, nil
case midi.ControlChangeMsg:
controlChangeMsg := MIDIControlChange{}
payloadMidi.GetControlChange(&controlChangeMsg.Channel, &controlChangeMsg.Control, &controlChangeMsg.Value)
wrappedPayload.Payload = controlChangeMsg
return wrappedPayload, nil
case midi.ProgramChangeMsg:
programChangeMsg := MIDIProgramChange{}
payloadMidi.GetProgramChange(&programChangeMsg.Channel, &programChangeMsg.Program)
wrappedPayload.Payload = programChangeMsg
return wrappedPayload, nil
case midi.PitchBendMsg:
pitchBendMsg := MIDIPitchBend{}
payloadMidi.GetPitchBend(&pitchBendMsg.Channel, &pitchBendMsg.Relative, &pitchBendMsg.Absolute)
wrappedPayload.Payload = pitchBendMsg
return wrappedPayload, nil
default:
wrappedPayload.End = true
return wrappedPayload, fmt.Errorf("midi.message.unpack message type not supported %v", payloadMidi.Type())
}
}
// Type reports the processor type string stored in the configuration this
// instance holds.
func (mmu *MIDIMessageUnpack) Type() string {
	processorType := mmu.config.Type
	return processorType
}
//go:build cgo
package processor
import (
"bytes"
"context"
"fmt"
"strconv"
"text/template"
"github.com/google/jsonschema-go/jsonschema"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
"gitlab.com/gomidi/midi/v2"
)
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "midi.note_off.create",
Title: "Create MIDI Note Off Message",
ParamsSchema: &jsonschema.Schema{
Type: "object",
Properties: map[string]*jsonschema.Schema{
"channel": {
Title: "Channel",
Description: "channel number for the note off message",
Type: "string",
},
"note": {
Title: "Note",
Description: "note number for the note off message",
Type: "string",
},
"velocity": {
Title: "Velocity",
Description: "velocity for the note off message",
Type: "string",
},
},
Required: []string{"channel", "note", "velocity"},
AdditionalProperties: &jsonschema.Schema{Not: &jsonschema.Schema{}},
},
New: func(config config.ProcessorConfig) (Processor, error) {
params := config.Params
channelString, err := params.GetString("channel")
if err != nil {
return nil, fmt.Errorf("midi.note_off.create channel error: %w", err)
}
channelTemplate, err := template.New("channel").Parse(channelString)
if err != nil {
return nil, err
}
noteString, err := params.GetString("note")
if err != nil {
return nil, fmt.Errorf("midi.note_off.create note error: %w", err)
}
noteTemplate, err := template.New("note").Parse(noteString)
if err != nil {
return nil, err
}
velocityString, err := params.GetString("velocity")
if err != nil {
return nil, fmt.Errorf("midi.note_off.create velocity error: %w", err)
}
velocityTemplate, err := template.New("velocity").Parse(velocityString)
if err != nil {
return nil, err
}
return &MIDINoteOffCreate{
config: config,
Channel: channelTemplate,
Note: noteTemplate,
Velocity: velocityTemplate,
}, nil
},
})
}
// MIDINoteOffCreate is the processor registered as "midi.note_off.create".
// Each template is executed against the wrapped payload in Process and its
// output parsed as an unsigned 8-bit decimal.
type MIDINoteOffCreate struct {
// config is the configuration the processor was built from; Type reads it.
config config.ProcessorConfig
// Channel renders the channel argument of the note off message.
Channel *template.Template
// Note renders the note argument of the note off message.
Note *template.Template
// Velocity renders the velocity argument of the note off message.
Velocity *template.Template
}
func (mnoc *MIDINoteOffCreate) Process(ctx context.Context, wrappedPayload common.WrappedPayload) (common.WrappedPayload, error) {
templateData := wrappedPayload
var channelBuffer bytes.Buffer
err := mnoc.Channel.Execute(&channelBuffer, templateData)
if err != nil {
wrappedPayload.End = true
return wrappedPayload, err
}
channelValue, err := strconv.ParseUint(channelBuffer.String(), 10, 8)
if err != nil {
wrappedPayload.End = true
return wrappedPayload, err
}
var noteBuffer bytes.Buffer
err = mnoc.Note.Execute(¬eBuffer, templateData)
if err != nil {
wrappedPayload.End = true
return wrappedPayload, err
}
noteValue, err := strconv.ParseUint(noteBuffer.String(), 10, 8)
if err != nil {
wrappedPayload.End = true
return wrappedPayload, err
}
var velocityBuffer bytes.Buffer
err = mnoc.Velocity.Execute(&velocityBuffer, templateData)
if err != nil {
wrappedPayload.End = true
return wrappedPayload, err
}
velocityValue, err := strconv.ParseUint(velocityBuffer.String(), 10, 8)
if err != nil {
wrappedPayload.End = true
return wrappedPayload, err
}
payloadMessage := midi.NoteOffVelocity(uint8(channelValue), uint8(noteValue), uint8(velocityValue))
wrappedPayload.Payload = payloadMessage
return wrappedPayload, nil
}
// Type reports the processor type string stored in the configuration this
// instance holds.
func (mnoc *MIDINoteOffCreate) Type() string {
	processorType := mnoc.config.Type
	return processorType
}
//go:build cgo
package processor
import (
"bytes"
"context"
"fmt"
"strconv"
"text/template"
"github.com/google/jsonschema-go/jsonschema"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
"gitlab.com/gomidi/midi/v2"
)
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "midi.note_on.create",
Title: "Create MIDI Note On Message",
ParamsSchema: &jsonschema.Schema{
Type: "object",
Properties: map[string]*jsonschema.Schema{
"channel": {
Title: "Channel",
Description: "channel number for the note on message",
Type: "string",
},
"note": {
Title: "Note",
Description: "note number for the note on message",
Type: "string",
},
"velocity": {
Title: "Velocity",
Description: "velocity for the note on message",
Type: "string",
},
},
Required: []string{"channel", "note", "velocity"},
AdditionalProperties: &jsonschema.Schema{Not: &jsonschema.Schema{}},
},
New: func(config config.ProcessorConfig) (Processor, error) {
params := config.Params
channelString, err := params.GetString("channel")
if err != nil {
return nil, fmt.Errorf("midi.note_on.create channel error: %w", err)
}
channelTemplate, err := template.New("channel").Parse(channelString)
if err != nil {
return nil, err
}
noteString, err := params.GetString("note")
if err != nil {
return nil, fmt.Errorf("midi.note_on.create note error: %w", err)
}
noteTemplate, err := template.New("note").Parse(noteString)
if err != nil {
return nil, err
}
velocityString, err := params.GetString("velocity")
if err != nil {
return nil, fmt.Errorf("midi.note_on.create velocity error: %w", err)
}
velocityTemplate, err := template.New("velocity").Parse(velocityString)
if err != nil {
return nil, err
}
return &MIDINoteOnCreate{
config: config,
Channel: channelTemplate,
Note: noteTemplate,
Velocity: velocityTemplate,
}, nil
},
})
}
// MIDINoteOnCreate is the processor registered as "midi.note_on.create".
// Each template is executed against the wrapped payload in Process and its
// output parsed as an unsigned 8-bit decimal.
type MIDINoteOnCreate struct {
// config is the configuration the processor was built from; Type reads it.
config config.ProcessorConfig
// Channel renders the channel argument of the note on message.
Channel *template.Template
// Note renders the note argument of the note on message.
Note *template.Template
// Velocity renders the velocity argument of the note on message.
Velocity *template.Template
}
func (mnoc *MIDINoteOnCreate) Process(ctx context.Context, wrappedPayload common.WrappedPayload) (common.WrappedPayload, error) {
templateData := wrappedPayload
var channelBuffer bytes.Buffer
err := mnoc.Channel.Execute(&channelBuffer, templateData)
if err != nil {
wrappedPayload.End = true
return wrappedPayload, err
}
channelValue, err := strconv.ParseUint(channelBuffer.String(), 10, 8)
if err != nil {
wrappedPayload.End = true
return wrappedPayload, err
}
var noteBuffer bytes.Buffer
err = mnoc.Note.Execute(¬eBuffer, templateData)
if err != nil {
wrappedPayload.End = true
return wrappedPayload, err
}
noteValue, err := strconv.ParseUint(noteBuffer.String(), 10, 8)
if err != nil {
wrappedPayload.End = true
return wrappedPayload, err
}
var velocityBuffer bytes.Buffer
err = mnoc.Velocity.Execute(&velocityBuffer, templateData)
if err != nil {
wrappedPayload.End = true
return wrappedPayload, err
}
velocityValue, err := strconv.ParseUint(velocityBuffer.String(), 10, 8)
if err != nil {
wrappedPayload.End = true
return wrappedPayload, err
}
payloadMessage := midi.NoteOn(uint8(channelValue), uint8(noteValue), uint8(velocityValue))
wrappedPayload.Payload = payloadMessage
return wrappedPayload, nil
}
// Type reports the processor type string stored in the configuration this
// instance holds.
func (mnoc *MIDINoteOnCreate) Type() string {
	processorType := mnoc.config.Type
	return processorType
}
//go:build cgo
package processor
import (
"bytes"
"context"
"fmt"
"strconv"
"text/template"
"github.com/google/jsonschema-go/jsonschema"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
"gitlab.com/gomidi/midi/v2"
)
// init registers the "midi.program_change.create" processor. Each of the
// required "channel" and "program" params is a string that is parsed as a
// text/template at construction time; a missing param yields a wrapped error
// and a template parse failure is returned as-is.
//
// The schema's Required list names only the declared properties. Requiring
// an undeclared name while AdditionalProperties forbids everything else
// would make the schema impossible to satisfy.
func init() {
	RegisterProcessor(ProcessorRegistration{
		Type:  "midi.program_change.create",
		Title: "Create MIDI Program Change Message",
		ParamsSchema: &jsonschema.Schema{
			Type: "object",
			Properties: map[string]*jsonschema.Schema{
				"channel": {
					Title:       "Channel",
					Description: "channel number for the program change message",
					Type:        "string",
				},
				"program": {
					Title:       "Program",
					Description: "program number for the program change message",
					Type:        "string",
				},
			},
			Required:             []string{"channel", "program"},
			AdditionalProperties: &jsonschema.Schema{Not: &jsonschema.Schema{}},
		},
		New: func(config config.ProcessorConfig) (Processor, error) {
			params := config.Params

			channelString, err := params.GetString("channel")
			if err != nil {
				return nil, fmt.Errorf("midi.program_change.create channel error: %w", err)
			}
			channelTemplate, err := template.New("channel").Parse(channelString)
			if err != nil {
				return nil, err
			}

			programString, err := params.GetString("program")
			if err != nil {
				return nil, fmt.Errorf("midi.program_change.create program error: %w", err)
			}
			programTemplate, err := template.New("program").Parse(programString)
			if err != nil {
				return nil, err
			}

			return &MIDIProgramChangeCreate{
				config:  config,
				Channel: channelTemplate,
				Program: programTemplate,
			}, nil
		},
	})
}
// MIDIProgramChangeCreate is the processor registered as
// "midi.program_change.create". Each template is executed against the wrapped
// payload in Process and its output parsed as an unsigned 8-bit decimal.
type MIDIProgramChangeCreate struct {
// config is the configuration the processor was built from; Type reads it.
config config.ProcessorConfig
// Channel renders the channel argument of the program change message.
Channel *template.Template
// Program renders the program argument of the program change message.
Program *template.Template
}
// Process renders the channel and program templates (in that order) against
// the wrapped payload, parses each result as a base-10 unsigned 8-bit number,
// and replaces the payload with the midi.ProgramChange message built from
// them. The first template or parse failure flags the wrapped payload with
// End and is returned unchanged.
func (mpcc *MIDIProgramChangeCreate) Process(ctx context.Context, wrappedPayload common.WrappedPayload) (common.WrappedPayload, error) {
	// renderByte executes one template and converts its output to a uint8.
	renderByte := func(tmpl *template.Template) (uint8, error) {
		var rendered bytes.Buffer
		if err := tmpl.Execute(&rendered, wrappedPayload); err != nil {
			return 0, err
		}
		parsed, err := strconv.ParseUint(rendered.String(), 10, 8)
		if err != nil {
			return 0, err
		}
		return uint8(parsed), nil
	}

	var fields [2]uint8
	for i, tmpl := range []*template.Template{mpcc.Channel, mpcc.Program} {
		field, err := renderByte(tmpl)
		if err != nil {
			wrappedPayload.End = true
			return wrappedPayload, err
		}
		fields[i] = field
	}

	wrappedPayload.Payload = midi.ProgramChange(fields[0], fields[1])
	return wrappedPayload, nil
}
// Type reports the processor type string stored in the configuration this
// instance holds.
func (mpcc *MIDIProgramChangeCreate) Type() string {
	processorType := mpcc.config.Type
	return processorType
}
package processor
import (
"context"
"errors"
"fmt"
"log/slog"
"github.com/google/jsonschema-go/jsonschema"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
)
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "module.output",
Title: "Module Output",
ParamsSchema: &jsonschema.Schema{
Type: "object",
Properties: map[string]*jsonschema.Schema{
"module": {
Title: "Module ID",
Description: "ID of module to send output to",
Type: "string",
},
},
Required: []string{"module"},
AdditionalProperties: &jsonschema.Schema{Not: &jsonschema.Schema{}},
},
New: func(config config.ProcessorConfig) (Processor, error) {
params := config.Params
moduleId, err := params.GetString("module")
if err != nil {
return nil, fmt.Errorf("module.output module error: %w", err)
}
return &ModuleOutput{config: config, ModuleId: moduleId, logger: slog.Default().With("component", "processor", "type", config.Type)}, nil
},
})
}
// ModuleOutput is the processor registered as "module.output"; its Process
// method hands the current payload to an output module.
type ModuleOutput struct {
// config is the configuration the processor was built from; Type reads it.
config config.ProcessorConfig
// ModuleId is the key used to look the module up in WrappedPayload.Modules.
ModuleId string
// logger is created by the constructor with component/type attributes.
logger *slog.Logger
// module is nil until the first successful lookup in Process, which stores
// the resolved module here for reuse.
module common.OutputModule
}
// Process passes the current payload to the output module's Output method
// and returns the wrapped payload unchanged. On first use the module is
// looked up by ModuleId in the wrapped payload's Modules and remembered on
// the processor; later calls reuse it. A missing module map, an unknown id, a
// module that is not an OutputModule, or a failing Output flags the wrapped
// payload with End and returns an error.
func (mo *ModuleOutput) Process(ctx context.Context, wrappedPayload common.WrappedPayload) (common.WrappedPayload, error) {
	if mo.module == nil {
		available := wrappedPayload.Modules
		if available == nil {
			wrappedPayload.End = true
			return wrappedPayload, errors.New("module.output wrapped payload has no modules")
		}

		candidate, found := available[mo.ModuleId]
		if !found {
			wrappedPayload.End = true
			return wrappedPayload, fmt.Errorf("module.output unable to find module with id: %s", mo.ModuleId)
		}

		target, isOutput := candidate.(common.OutputModule)
		if !isOutput {
			wrappedPayload.End = true
			return wrappedPayload, fmt.Errorf("module.output module with id %s is not an OutputModule", mo.ModuleId)
		}

		mo.module = target
	}

	if outputErr := mo.module.Output(ctx, wrappedPayload.Payload); outputErr != nil {
		wrappedPayload.End = true
		return wrappedPayload, fmt.Errorf("module.output failed to send output: %w", outputErr)
	}

	return wrappedPayload, nil
}
// Type reports the processor type string stored in the configuration this
// instance holds.
func (mo *ModuleOutput) Type() string {
	processorType := mo.config.Type
	return processorType
}
package processor
import (
"bytes"
"context"
"encoding/hex"
"errors"
"fmt"
"strconv"
"text/template"
"github.com/google/jsonschema-go/jsonschema"
"github.com/jwetzell/osc-go"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
)
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "osc.message.create",
Title: "Create OSC Message",
ParamsSchema: &jsonschema.Schema{
Type: "object",
Properties: map[string]*jsonschema.Schema{
"address": {
Title: "Address",
Description: "OSC address for the message",
Type: "string",
},
"args": {
Title: "Arguments",
Description: "arguments for the OSC message",
Type: "array",
Items: &jsonschema.Schema{
Type: "string",
},
},
"types": {
Title: "Argument Types",
Description: "string of OSC types corresponding to the arguments in args",
Type: "string",
},
},
Required: []string{"address"},
AdditionalProperties: &jsonschema.Schema{Not: &jsonschema.Schema{}},
},
New: func(processorConfig config.ProcessorConfig) (Processor, error) {
params := processorConfig.Params
addressString, err := params.GetString("address")
if err != nil {
return nil, fmt.Errorf("osc.message.create address error: %w", err)
}
addressTemplate, err := template.New("address").Parse(addressString)
if err != nil {
return nil, err
}
argStrings, err := params.GetStringSlice("args")
if err != nil {
if errors.Is(err, config.ErrParamNotFound) {
return &OSCMessageCreate{config: processorConfig, Address: addressTemplate}, nil
} else {
return nil, fmt.Errorf("osc.message.create args error: %w", err)
}
}
typesString, err := params.GetString("types")
if err != nil {
return nil, fmt.Errorf("osc.message.create types error: %w", err)
}
if len(argStrings) != len(typesString) {
return nil, errors.New("osc.message.create args and types must be the same length")
}
argTemplates := []*template.Template{}
for _, argString := range argStrings {
argTemplate, err := template.New("arg").Parse(argString)
if err != nil {
return nil, err
}
argTemplates = append(argTemplates, argTemplate)
}
return &OSCMessageCreate{config: processorConfig, Address: addressTemplate, Args: argTemplates, Types: typesString}, nil
},
})
}
// OSCMessageCreate is the "osc.message.create" processor. It holds the parsed
// templates used to build an OSC message for every processed payload.
type OSCMessageCreate struct {
// config is the processor configuration this instance was created from.
config config.ProcessorConfig
// Address is the template rendered to produce the OSC address.
Address *template.Template
// Args are the templates rendered to produce each OSC argument, in order.
Args []*template.Template
// Types holds one OSC type character per entry in Args (indexed by argument position).
Types string
}
// Process renders the address template and every argument template against the
// incoming wrapped payload and replaces the payload with the resulting
// *osc.OSCMessage.
//
// On any failure (template execution error, empty address, address that does
// not start with '/', fewer types than args, argument conversion error) End is
// set on the returned payload and the error is returned.
func (omc *OSCMessageCreate) Process(ctx context.Context, wrappedPayload common.WrappedPayload) (common.WrappedPayload, error) {
	templateData := wrappedPayload

	var addressBuffer bytes.Buffer
	if err := omc.Address.Execute(&addressBuffer, templateData); err != nil {
		wrappedPayload.End = true
		return wrappedPayload, err
	}

	addressString := addressBuffer.String()
	if len(addressString) == 0 {
		wrappedPayload.End = true
		return wrappedPayload, errors.New("osc.message.create address must not be empty")
	}
	if addressString[0] != '/' {
		wrappedPayload.End = true
		return wrappedPayload, errors.New("osc.message.create address must start with '/'")
	}

	payloadMessage := &osc.OSCMessage{
		Address: addressString,
	}

	// Args and Types are exported fields, so an instance built outside the
	// registered constructor may carry fewer types than args; fail cleanly
	// instead of panicking on the Types index below.
	if len(omc.Args) > len(omc.Types) {
		wrappedPayload.End = true
		return wrappedPayload, errors.New("osc.message.create args and types must be the same length")
	}

	args := make([]osc.OSCArg, 0, len(omc.Args))
	for argIndex, argTemplate := range omc.Args {
		var argBuffer bytes.Buffer
		if err := argTemplate.Execute(&argBuffer, templateData); err != nil {
			wrappedPayload.End = true
			return wrappedPayload, err
		}

		typedArg, err := argToTypedArg(argBuffer.String(), omc.Types[argIndex])
		if err != nil {
			wrappedPayload.End = true
			return wrappedPayload, err
		}
		args = append(args, typedArg)
	}

	// Leave Args unset when there are no arguments.
	if len(args) > 0 {
		payloadMessage.Args = args
	}

	wrappedPayload.Payload = payloadMessage
	return wrappedPayload, nil
}
// Type returns the processor type stored in the config this processor was created from.
func (omc *OSCMessageCreate) Type() string {
return omc.config.Type
}
// argToTypedArg converts the rendered argument text rawArg into an OSC
// argument of the type named by the single OSC type character oscType.
//
// Supported types: 's' (string, used as-is), 'i' (base-10 int32), 'f'
// (float32), 'b' (hex-encoded blob), 'h' (base-10 int64), 'd' (float64) and
// the value-less 'T', 'F' and 'N', which ignore rawArg. Any parse failure is
// returned unchanged; an unsupported type character yields an error.
func argToTypedArg(rawArg string, oscType byte) (osc.OSCArg, error) {
	var value any

	switch oscType {
	case 's':
		value = rawArg
	case 'i':
		parsed, err := strconv.ParseInt(rawArg, 10, 32)
		if err != nil {
			return osc.OSCArg{}, err
		}
		value = int32(parsed)
	case 'f':
		parsed, err := strconv.ParseFloat(rawArg, 32)
		if err != nil {
			return osc.OSCArg{}, err
		}
		value = float32(parsed)
	case 'b':
		blob, err := hex.DecodeString(rawArg)
		if err != nil {
			return osc.OSCArg{}, err
		}
		value = blob
	case 'h':
		parsed, err := strconv.ParseInt(rawArg, 10, 64)
		if err != nil {
			return osc.OSCArg{}, err
		}
		value = parsed
	case 'd':
		parsed, err := strconv.ParseFloat(rawArg, 64)
		if err != nil {
			return osc.OSCArg{}, err
		}
		value = parsed
	case 'T':
		value = true
	case 'F':
		value = false
	case 'N':
		value = nil
	default:
		return osc.OSCArg{}, fmt.Errorf("osc.message.create unhandled osc type: %c", oscType)
	}

	// Only the ASCII type characters handled above reach this point, so the
	// type tag is exactly the one-character string for oscType.
	return osc.OSCArg{
		Value: value,
		Type:  string(oscType),
	}, nil
}
package processor
import (
"context"
"errors"
"fmt"
osc "github.com/jwetzell/osc-go"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
)
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "osc.message.decode",
Title: "Decode OSC Message",
New: func(config config.ProcessorConfig) (Processor, error) {
return &OSCMessageDecode{config: config}, nil
},
})
}
// OSCMessageDecode is the "osc.message.decode" processor; it decodes a byte
// slice payload into an OSC message.
type OSCMessageDecode struct {
// config is the processor configuration this instance was created from.
config config.ProcessorConfig
}
// Process decodes the byte slice payload into an OSC message and stores the
// decoded message as the new payload.
//
// The payload must be convertible to a non-empty byte slice whose first byte
// is '/'. On any failure End is set on the returned payload and an error is
// returned.
func (omd *OSCMessageDecode) Process(ctx context.Context, wrappedPayload common.WrappedPayload) (common.WrappedPayload, error) {
	// reject stops the route and reports err.
	reject := func(err error) (common.WrappedPayload, error) {
		wrappedPayload.End = true
		return wrappedPayload, err
	}

	raw, isBytes := common.GetAnyAsByteSlice(wrappedPayload.Payload)
	switch {
	case !isBytes:
		return reject(errors.New("osc.message.decode processor only accepts a []byte payload"))
	case len(raw) == 0:
		return reject(errors.New("osc.message.decode processor can't work on empty []byte"))
	case raw[0] != '/':
		return reject(errors.New("osc.message.decode processor needs an OSC looking []byte"))
	}

	decoded, decodeErr := osc.MessageFromBytes(raw)
	if decodeErr != nil {
		return reject(fmt.Errorf("osc.message.decode processor failed to decode OSC message: %w", decodeErr))
	}

	wrappedPayload.Payload = decoded
	return wrappedPayload, nil
}
// Type returns the processor type stored in the config this processor was created from.
func (omd *OSCMessageDecode) Type() string {
return omd.config.Type
}
package processor
import (
"context"
"errors"
"fmt"
osc "github.com/jwetzell/osc-go"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
)
// OSCMessageEncode is the "osc.message.encode" processor; it encodes an
// *osc.OSCMessage payload into bytes.
type OSCMessageEncode struct {
// config is the processor configuration this instance was created from.
config config.ProcessorConfig
}
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "osc.message.encode",
Title: "Encode OSC Message",
New: func(config config.ProcessorConfig) (Processor, error) {
return &OSCMessageEncode{config: config}, nil
},
})
}
// Process encodes the *osc.OSCMessage payload and stores the encoded result as
// the new payload.
//
// If the payload is not an *osc.OSCMessage, or encoding fails, End is set on
// the returned payload and an error is returned.
func (ome *OSCMessageEncode) Process(ctx context.Context, wrappedPayload common.WrappedPayload) (common.WrappedPayload, error) {
	message, isMessage := common.GetAnyAs[*osc.OSCMessage](wrappedPayload.Payload)
	if !isMessage {
		wrappedPayload.End = true
		return wrappedPayload, errors.New("osc.message.encode processor only accepts an *OSCMessage")
	}

	encoded, encodeErr := message.ToBytes()
	if encodeErr != nil {
		wrappedPayload.End = true
		return wrappedPayload, fmt.Errorf("osc.message.encode processor failed to encode OSCMessage: %w", encodeErr)
	}

	wrappedPayload.Payload = encoded
	return wrappedPayload, nil
}
// Type returns the processor type stored in the config this processor was created from.
func (ome *OSCMessageEncode) Type() string {
return ome.config.Type
}
package processor
import (
"context"
"fmt"
"sync"
"github.com/google/jsonschema-go/jsonschema"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
)
// Processor is a single processing step applied to a wrapped payload.
type Processor interface {
// Type returns the processor's type identifier.
Type() string
// Process takes a wrapped payload and returns the resulting wrapped payload
// together with any error encountered.
Process(context.Context, common.WrappedPayload) (common.WrappedPayload, error)
}
// ProcessorRegistration describes a processor type that can be registered with
// RegisterProcessor: its identifying metadata, an optional JSON schema for its
// parameters, and the constructor used to create instances.
type ProcessorRegistration struct {
	// Type is the unique identifier of the processor; RegisterProcessor
	// requires it to be non-empty.
	Type string `json:"type"`
	// Title is an optional display name.
	Title string `json:"title,omitempty"`
	// Description is an optional longer description.
	Description string `json:"description,omitempty"`
	// ParamsSchema optionally describes the parameters accepted by New.
	ParamsSchema *jsonschema.Schema `json:"paramsSchema,omitempty"`
	// New builds a processor from its configuration; RegisterProcessor requires
	// it to be non-nil. It is excluded from JSON because encoding/json cannot
	// encode function values and would otherwise fail to marshal the struct.
	New func(config.ProcessorConfig) (Processor, error) `json:"-"`
}
// RegisterProcessor adds processor to the package-level registry, keyed by its
// Type.
//
// It panics when Type is empty, when New is nil, or when a processor with the
// same Type has already been registered.
func RegisterProcessor(processor ProcessorRegistration) {
	switch {
	case processor.Type == "":
		panic("processor type is missing")
	case processor.New == nil:
		panic("missing ProcessorRegistration.New")
	}

	processorRegistryMu.Lock()
	defer processorRegistryMu.Unlock()

	key := processor.Type
	if _, exists := processorRegistry[key]; exists {
		panic(fmt.Sprintf("processor already registered: %s", key))
	}
	processorRegistry[key] = processor
}
// ProcessorRegistry maps a processor type identifier to its registration.
type ProcessorRegistry map[string]ProcessorRegistration
// GetProcessorRegistration looks up the registration for processorType. The
// boolean result reports whether a processor of that type has been registered.
func GetProcessorRegistration(processorType string) (ProcessorRegistration, bool) {
	processorRegistryMu.RLock()
	registration, found := processorRegistry[processorType]
	processorRegistryMu.RUnlock()

	return registration, found
}
// GetProcessorRegistrations returns a new slice holding every registered
// processor. The order follows map iteration and is therefore unspecified.
func GetProcessorRegistrations() []ProcessorRegistration {
	processorRegistryMu.RLock()
	defer processorRegistryMu.RUnlock()

	all := make([]ProcessorRegistration, len(processorRegistry))
	next := 0
	for _, registration := range processorRegistry {
		all[next] = registration
		next++
	}
	return all
}
// Package-level processor registry state.
var (
// processorRegistryMu guards processorRegistry.
processorRegistryMu sync.RWMutex
// processorRegistry holds every registration added through RegisterProcessor, keyed by processor type.
processorRegistry = make(map[string]ProcessorRegistration)
)
package processor
import (
"bytes"
"context"
"errors"
"fmt"
"log/slog"
"text/template"
"github.com/google/jsonschema-go/jsonschema"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
)
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "pubsub.publish",
Title: "Publish to Pub/Sub Topic",
ParamsSchema: &jsonschema.Schema{
Type: "object",
Properties: map[string]*jsonschema.Schema{
"module": {
Title: "Module ID",
Description: "ID of the module to publish to",
Type: "string",
},
"topic": {
Title: "Topic",
Description: "topic to publish to",
Type: "string",
},
},
Required: []string{"module", "topic"},
AdditionalProperties: &jsonschema.Schema{Not: &jsonschema.Schema{}},
},
New: func(config config.ProcessorConfig) (Processor, error) {
params := config.Params
moduleIdString, err := params.GetString("module")
if err != nil {
return nil, fmt.Errorf("pubsub.publish module error: %w", err)
}
topicString, err := params.GetString("topic")
if err != nil {
return nil, fmt.Errorf("pubsub.publish topic error: %w", err)
}
topicTemplate, err := template.New("topic").Parse(topicString)
if err != nil {
return nil, err
}
return &PubSubPublish{config: config, ModuleId: moduleIdString, Topic: topicTemplate, logger: slog.Default().With("component", "processor", "type", config.Type)}, nil
},
})
}
// PubSubPublish is the "pubsub.publish" processor. It publishes the payload to
// a topic on a pub/sub module looked up by id.
type PubSubPublish struct {
// config is the processor configuration this instance was created from.
config config.ProcessorConfig
// ModuleId is the id used to look up the target module in the wrapped payload's modules.
ModuleId string
// Topic is the template rendered to produce the topic for each payload.
Topic *template.Template
// logger is the structured logger created for this processor.
logger *slog.Logger
// module is the resolved target module; Process sets it on the first successful lookup and reuses it afterwards.
module common.PubSubModule
}
// Process publishes the payload to the rendered topic on the configured module.
//
// The target module is resolved from wrappedPayload.Modules by ModuleId on
// first use and cached on the processor for subsequent calls. The payload is
// passed through unchanged on success. On any failure (no modules, unknown
// module id, module that is not a common.PubSubModule, topic template error,
// publish error) End is set on the returned payload and an error is returned.
func (psp *PubSubPublish) Process(ctx context.Context, wrappedPayload common.WrappedPayload) (common.WrappedPayload, error) {
	if psp.module == nil {
		if wrappedPayload.Modules == nil {
			wrappedPayload.End = true
			return wrappedPayload, errors.New("pubsub.publish wrapped payload has no modules")
		}

		module, ok := wrappedPayload.Modules[psp.ModuleId]
		if !ok {
			wrappedPayload.End = true
			return wrappedPayload, fmt.Errorf("pubsub.publish unable to find module with id: %s", psp.ModuleId)
		}

		pubSubModule, ok := module.(common.PubSubModule)
		if !ok {
			wrappedPayload.End = true
			// The assertion is against common.PubSubModule, so name that
			// interface in the message.
			return wrappedPayload, fmt.Errorf("pubsub.publish module with id %s is not a PubSubModule", psp.ModuleId)
		}
		psp.module = pubSubModule
	}

	var topicBuffer bytes.Buffer
	if err := psp.Topic.Execute(&topicBuffer, wrappedPayload); err != nil {
		wrappedPayload.End = true
		return wrappedPayload, err
	}

	if err := psp.module.Publish(ctx, topicBuffer.String(), wrappedPayload.Payload); err != nil {
		wrappedPayload.End = true
		return wrappedPayload, fmt.Errorf("pubsub.publish error publishing: %w", err)
	}

	return wrappedPayload, nil
}
// Type returns the processor type stored in the config this processor was created from.
func (psp *PubSubPublish) Type() string {
return psp.config.Type
}
package processor
import (
"context"
"errors"
"fmt"
"log/slog"
"github.com/google/jsonschema-go/jsonschema"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
)
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "router.input",
Title: "Router Input",
ParamsSchema: &jsonschema.Schema{
Type: "object",
Properties: map[string]*jsonschema.Schema{
"source": {
Title: "Source",
Description: "source to report as to the router",
Type: "string",
},
},
Required: []string{"source"},
AdditionalProperties: &jsonschema.Schema{Not: &jsonschema.Schema{}},
},
New: func(config config.ProcessorConfig) (Processor, error) {
params := config.Params
sourceId, err := params.GetString("source")
if err != nil {
return nil, fmt.Errorf("router.input source error: %w", err)
}
return &RouterInput{config: config, SourceId: sourceId, logger: slog.Default().With("component", "processor", "type", config.Type)}, nil
},
})
}
// RouterInput is the "router.input" processor. It passes the payload to the
// wrapped payload's input handler, reporting SourceId as the source.
type RouterInput struct {
// config is the processor configuration this instance was created from.
config config.ProcessorConfig
// SourceId is the source identifier passed to the input handler.
SourceId string
// logger is the structured logger created for this processor.
logger *slog.Logger
}
// Process hands the payload to wrappedPayload.InputHandler using SourceId as
// the source and returns the payload unchanged.
//
// If no input handler is set, or the handler returns an error, End is set on
// the returned payload and an error is returned. The handler's error is wrapped
// so callers can see why the input was rejected.
func (ri *RouterInput) Process(ctx context.Context, wrappedPayload common.WrappedPayload) (common.WrappedPayload, error) {
	if wrappedPayload.InputHandler == nil {
		wrappedPayload.End = true
		return wrappedPayload, errors.New("router.input no input handler found")
	}

	if _, err := wrappedPayload.InputHandler(ctx, ri.SourceId, wrappedPayload.Payload); err != nil {
		wrappedPayload.End = true
		return wrappedPayload, fmt.Errorf("router.input failed to send input: %w", err)
	}

	return wrappedPayload, nil
}
// Type returns the processor type stored in the config this processor was created from.
func (ri *RouterInput) Type() string {
return ri.config.Type
}
package processor
import (
"context"
"fmt"
"github.com/expr-lang/expr"
"github.com/expr-lang/expr/vm"
"github.com/google/jsonschema-go/jsonschema"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
)
// ScriptExpr is the "script.expr" processor. It runs a pre-compiled Expr
// program against each wrapped payload.
//
// NOTE(jwetzell): see language definition https://expr-lang.org/docs/language-definition
type ScriptExpr struct {
// config is the processor configuration this instance was created from.
config config.ProcessorConfig
// Program is the compiled expression run by Process.
Program *vm.Program
}
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "script.expr",
Title: "Evaluate Expr Expression",
ParamsSchema: &jsonschema.Schema{
Type: "object",
Properties: map[string]*jsonschema.Schema{
"expression": {
Title: "Expression",
Description: "Expr expression to evaluate",
Type: "string",
},
},
Required: []string{"expression"},
AdditionalProperties: &jsonschema.Schema{Not: &jsonschema.Schema{}},
},
New: func(config config.ProcessorConfig) (Processor, error) {
params := config.Params
expressionString, err := params.GetString("expression")
if err != nil {
return nil, fmt.Errorf("script.expr expression error: %w", err)
}
program, err := expr.Compile(expressionString)
if err != nil {
return nil, err
}
return &ScriptExpr{config: config, Program: program}, nil
},
})
}
// Process runs the compiled expression with the wrapped payload as its
// environment and stores the result as the new payload. If the run fails, End
// is set on the returned payload and the error is returned.
func (se *ScriptExpr) Process(ctx context.Context, wrappedPayload common.WrappedPayload) (common.WrappedPayload, error) {
	result, runErr := expr.Run(se.Program, wrappedPayload)
	if runErr != nil {
		wrappedPayload.End = true
		return wrappedPayload, runErr
	}

	wrappedPayload.Payload = result
	return wrappedPayload, nil
}
// Type returns the processor type stored in the config this processor was created from.
func (se *ScriptExpr) Type() string {
return se.config.Type
}
package processor
import (
"context"
"fmt"
"github.com/google/jsonschema-go/jsonschema"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
"modernc.org/quickjs"
)
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "script.js",
Title: "Run JavaScript",
ParamsSchema: &jsonschema.Schema{
Type: "object",
Properties: map[string]*jsonschema.Schema{
"program": {
Title: "Program",
Description: "JavaScript program to run",
Type: "string",
},
},
Required: []string{"program"},
AdditionalProperties: &jsonschema.Schema{Not: &jsonschema.Schema{}},
},
New: func(config config.ProcessorConfig) (Processor, error) {
params := config.Params
programString, err := params.GetString("program")
if err != nil {
return nil, fmt.Errorf("script.js program error: %w", err)
}
vm, err := quickjs.NewVM()
if err != nil {
return nil, err
}
payloadAtom, err := vm.NewAtom("payload")
if err != nil {
return nil, err
}
senderAtom, err := vm.NewAtom("sender")
if err != nil {
return nil, err
}
return &ScriptJS{config: config, Program: programString, vm: vm, payloadAtom: payloadAtom, senderAtom: senderAtom}, nil
},
})
}
// ScriptJS is the "script.js" processor. It evaluates a JavaScript program in
// a QuickJS VM, exchanging data through a global property.
type ScriptJS struct {
// config is the processor configuration this instance was created from.
config config.ProcessorConfig
// vm is the QuickJS VM the program is evaluated in.
vm *quickjs.VM
// payloadAtom is the atom for the global name "payload", used to set and read the payload.
payloadAtom quickjs.Atom
// senderAtom is the atom for the global name "sender".
senderAtom quickjs.Atom
// Program is the JavaScript source evaluated by Process.
Program string
}
// Process exposes the payload to the script as the global "payload", evaluates
// the program, and reads the global "payload" back as the new payload.
//
// Byte slice payloads are converted to an int slice before being handed to the
// VM. An undefined result ends the route with a nil payload and no error.
// Object results are converted to []any when possible, otherwise to
// map[string]any. On any VM or conversion error End is set on the returned
// payload and the error is returned.
func (sj *ScriptJS) Process(ctx context.Context, wrappedPayload common.WrappedPayload) (common.WrappedPayload, error) {
	// abort stops the route and reports err.
	abort := func(err error) (common.WrappedPayload, error) {
		wrappedPayload.End = true
		return wrappedPayload, err
	}

	//NOTE(jwetzell): some weird conversion going on with these types
	_, isUint8Slice := common.GetAnyAs[[]uint8](wrappedPayload.Payload)
	_, isByteSlice := common.GetAnyAs[[]byte](wrappedPayload.Payload)
	if isUint8Slice || isByteSlice {
		if asInts, converted := common.GetAnyAsIntSlice(wrappedPayload.Payload); converted {
			wrappedPayload.Payload = asInts
		}
	}

	if setErr := sj.vm.SetProperty(sj.vm.GlobalObject(), sj.payloadAtom, wrappedPayload.Payload); setErr != nil {
		return abort(setErr)
	}

	if _, evalErr := sj.vm.Eval(sj.Program, quickjs.EvalGlobal); evalErr != nil {
		return abort(evalErr)
	}

	result, getErr := sj.vm.GetProperty(sj.vm.GlobalObject(), sj.payloadAtom)
	if getErr != nil {
		return abort(getErr)
	}

	// NOTE(jwetzell): turn undefined into nil
	if _, isUndefined := result.(quickjs.Undefined); isUndefined {
		wrappedPayload.End = true
		wrappedPayload.Payload = nil
		return wrappedPayload, nil
	}

	// NOTE(jwetzell): turn object into a slice, falling back to map[string]any
	if object, isObject := result.(*quickjs.Object); isObject {
		var asSlice []any
		if sliceErr := object.Into(&asSlice); sliceErr == nil {
			wrappedPayload.Payload = asSlice
			return wrappedPayload, nil
		}

		var asMap map[string]any
		if mapErr := object.Into(&asMap); mapErr != nil {
			return abort(mapErr)
		}
		wrappedPayload.Payload = asMap
		return wrappedPayload, nil
	}

	wrappedPayload.Payload = result
	return wrappedPayload, nil
}
// Type returns the processor type stored in the config this processor was created from.
func (sj *ScriptJS) Type() string {
return sj.config.Type
}
package processor
import (
"context"
"encoding/json"
"errors"
"fmt"
extism "github.com/extism/go-sdk"
"github.com/google/jsonschema-go/jsonschema"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
)
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "script.wasm",
Title: "Run WASM Plugin",
ParamsSchema: &jsonschema.Schema{
Type: "object",
Properties: map[string]*jsonschema.Schema{
"path": {
Title: "Path",
Description: "path to the WASM plugin to load",
Type: "string",
},
"function": {
Title: "Function",
Description: "exported function to call on the WASM plugin",
Type: "string",
Default: json.RawMessage(`"process"`),
},
"enableWasi": {
Title: "Enable WASI",
Description: "whether to enable WASI when running the WASM plugin",
Type: "boolean",
Default: json.RawMessage("false"),
},
},
Required: []string{"path"},
AdditionalProperties: &jsonschema.Schema{Not: &jsonschema.Schema{}},
},
New: func(processorConfig config.ProcessorConfig) (Processor, error) {
params := processorConfig.Params
pathString, err := params.GetString("path")
if err != nil {
return nil, fmt.Errorf("script.wasm path error: %w", err)
}
functionString, err := params.GetString("function")
if err != nil {
if errors.Is(err, config.ErrParamNotFound) {
functionString = "process"
} else {
return nil, fmt.Errorf("script.wasm function error: %w", err)
}
}
enableWasiBool, err := params.GetBool("enableWasi")
if err != nil {
if errors.Is(err, config.ErrParamNotFound) {
enableWasiBool = false
} else {
return nil, fmt.Errorf("script.wasm enableWasi error: %w", err)
}
}
manifest := extism.Manifest{
Wasm: []extism.Wasm{
extism.WasmFile{
Path: pathString,
},
},
}
program, err := extism.NewCompiledPlugin(context.Background(), manifest, extism.PluginConfig{
EnableWasi: enableWasiBool,
}, []extism.HostFunction{})
if err != nil {
return nil, err
}
programInstance, err := program.Instance(context.Background(), extism.PluginInstanceConfig{})
if err != nil {
return nil, err
}
return &ScriptWASM{config: processorConfig, Program: programInstance, Function: functionString}, nil
},
})
}
// ScriptWASM is the "script.wasm" processor. It calls an exported function on
// an instantiated WASM plugin with the payload bytes.
type ScriptWASM struct {
// config is the processor configuration this instance was created from.
config config.ProcessorConfig
// Program is the plugin instance that Process calls into.
Program *extism.Plugin
// Function is the name of the exported plugin function to call.
Function string
}
// Process calls the configured plugin function with the payload bytes and
// stores the function's output as the new payload.
//
// The payload must be convertible to a byte slice. The caller's ctx is handed
// to the plugin call rather than being dropped. If the payload is not a byte
// slice, or the call fails, End is set on the returned payload and an error is
// returned.
func (sw *ScriptWASM) Process(ctx context.Context, wrappedPayload common.WrappedPayload) (common.WrappedPayload, error) {
	payloadBytes, ok := common.GetAnyAsByteSlice(wrappedPayload.Payload)
	if !ok {
		wrappedPayload.End = true
		return wrappedPayload, errors.New("script.wasm can only process a byte array")
	}

	// The first result (the plugin's exit code) is not used; failures are
	// reported through err.
	_, output, err := sw.Program.CallWithContext(ctx, sw.Function, payloadBytes)
	if err != nil {
		wrappedPayload.End = true
		return wrappedPayload, err
	}

	wrappedPayload.Payload = output
	return wrappedPayload, nil
}
// Type returns the processor type stored in the config this processor was created from.
func (sw *ScriptWASM) Type() string {
return sw.config.Type
}
package processor
import (
"bytes"
"context"
"fmt"
"text/template"
"github.com/google/jsonschema-go/jsonschema"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
)
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "sip.response.audio.create",
Title: "Create SIP Audio Response",
ParamsSchema: &jsonschema.Schema{
Type: "object",
Properties: map[string]*jsonschema.Schema{
"preWait": {
Title: "Pre Wait (ms)",
Description: "number of milliseconds to wait before playing the audio",
Type: "integer",
},
"audioFile": {
Title: "Audio File",
Description: "path to the audio file to play",
Type: "string",
},
"postWait": {
Title: "Post Wait (ms)",
Description: "number of milliseconds to wait after playing the audio",
Type: "integer",
},
},
Required: []string{"preWait", "postWait", "audioFile"},
AdditionalProperties: &jsonschema.Schema{Not: &jsonschema.Schema{}},
},
New: func(config config.ProcessorConfig) (Processor, error) {
params := config.Params
preWaitNum, err := params.GetInt("preWait")
if err != nil {
return nil, fmt.Errorf("sip.response.audio.create preWait error: %w", err)
}
postWaitNum, err := params.GetInt("postWait")
if err != nil {
return nil, fmt.Errorf("sip.response.audio.create postWait error: %w", err)
}
audioFileString, err := params.GetString("audioFile")
if err != nil {
return nil, fmt.Errorf("sip.response.audio.create audioFile error: %w", err)
}
audioFileTemplate, err := template.New("audioFile").Parse(audioFileString)
if err != nil {
return nil, err
}
return &SipResponseAudioCreate{config: config, AudioFile: audioFileTemplate, PreWait: int(preWaitNum), PostWait: int(postWaitNum)}, nil
},
})
}
// SipResponseAudioCreate is the "sip.response.audio.create" processor. It
// replaces the payload with a SipAudioFileResponse.
type SipResponseAudioCreate struct {
// config is the processor configuration this instance was created from.
config config.ProcessorConfig
// PreWait is the configured "preWait" value (described in the schema as milliseconds before playing).
PreWait int
// PostWait is the configured "postWait" value (described in the schema as milliseconds after playing).
PostWait int
// AudioFile is the template rendered to produce the audio file path.
AudioFile *template.Template
}
// SipAudioFileResponse is the payload produced by SipResponseAudioCreate.
type SipAudioFileResponse struct {
// PreWait is copied from the processor's PreWait.
PreWait int
// PostWait is copied from the processor's PostWait.
PostWait int
// AudioFile is the rendered result of the processor's AudioFile template.
AudioFile string
}
// Process renders the audio file template against the wrapped payload and
// replaces the payload with a SipAudioFileResponse carrying the rendered path
// and the configured waits. If the template fails, End is set on the returned
// payload and the error is returned.
func (srac *SipResponseAudioCreate) Process(ctx context.Context, wrappedPayload common.WrappedPayload) (common.WrappedPayload, error) {
	var rendered bytes.Buffer
	if execErr := srac.AudioFile.Execute(&rendered, wrappedPayload); execErr != nil {
		wrappedPayload.End = true
		return wrappedPayload, execErr
	}

	response := SipAudioFileResponse{
		AudioFile: rendered.String(),
		PreWait:   srac.PreWait,
		PostWait:  srac.PostWait,
	}
	wrappedPayload.Payload = response
	return wrappedPayload, nil
}
// Type returns the processor type stored in the config this processor was created from.
func (srac *SipResponseAudioCreate) Type() string {
return srac.config.Type
}
package processor
import (
"bytes"
"context"
"errors"
"fmt"
"regexp"
"text/template"
"github.com/google/jsonschema-go/jsonschema"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
)
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "sip.response.dtmf.create",
Title: "Create SIP DTMF Response",
ParamsSchema: &jsonschema.Schema{
Type: "object",
Properties: map[string]*jsonschema.Schema{
"preWait": {
Title: "Pre Wait (ms)",
Description: "number of milliseconds to wait before sending the DTMF tones",
Type: "integer",
},
"digits": {
Title: "Digits",
Description: "DTMF digits to send",
Type: "string",
},
"postWait": {
Title: "Post Wait (ms)",
Description: "number of milliseconds to wait after sending the DTMF tones",
Type: "integer",
},
},
Required: []string{"preWait", "postWait", "digits"},
AdditionalProperties: &jsonschema.Schema{Not: &jsonschema.Schema{}},
},
New: func(config config.ProcessorConfig) (Processor, error) {
params := config.Params
preWaitNum, err := params.GetInt("preWait")
if err != nil {
return nil, fmt.Errorf("sip.response.dtmf.create preWait error: %w", err)
}
postWaitNum, err := params.GetInt("postWait")
if err != nil {
return nil, fmt.Errorf("sip.response.dtmf.create postWait error: %w", err)
}
digitsString, err := params.GetString("digits")
if err != nil {
return nil, fmt.Errorf("sip.response.dtmf.create digits error: %w", err)
}
digitsTemplate, err := template.New("digits").Parse(digitsString)
if err != nil {
return nil, err
}
return &SipResponseDTMFCreate{config: config, Digits: digitsTemplate, PreWait: int(preWaitNum), PostWait: int(postWaitNum), validDTMF: validDTMFRegex}, nil
},
})
}
// SipResponseDTMFCreate is the "sip.response.dtmf.create" processor. It
// replaces the payload with a SipDTMFResponse.
type SipResponseDTMFCreate struct {
// config is the processor configuration this instance was created from.
config config.ProcessorConfig
// PreWait is the configured "preWait" value (described in the schema as milliseconds before sending).
PreWait int
// PostWait is the configured "postWait" value (described in the schema as milliseconds after sending).
PostWait int
// Digits is the template rendered to produce the DTMF digit string.
Digits *template.Template
// validDTMF is the pattern the rendered digit string must match.
validDTMF *regexp.Regexp
}
// SipDTMFResponse is the payload produced by SipResponseDTMFCreate.
type SipDTMFResponse struct {
// PreWait is copied from the processor's PreWait.
PreWait int
// PostWait is copied from the processor's PostWait.
PostWait int
// Digits is the rendered and validated result of the processor's Digits template.
Digits string
}
// validDTMFRegex matches a non-empty string made up only of the characters
// 0-9, '*', '#', A-D and a-d.
var validDTMFRegex = regexp.MustCompile(`^[0-9*#A-Da-d]+$`)
// Process renders the digits template against the wrapped payload, validates
// the result against the processor's DTMF pattern, and replaces the payload
// with a SipDTMFResponse. If the template fails or the rendered digits do not
// match, End is set on the returned payload and an error is returned.
func (srdc *SipResponseDTMFCreate) Process(ctx context.Context, wrappedPayload common.WrappedPayload) (common.WrappedPayload, error) {
	var rendered bytes.Buffer
	if execErr := srdc.Digits.Execute(&rendered, wrappedPayload); execErr != nil {
		wrappedPayload.End = true
		return wrappedPayload, execErr
	}

	digits := rendered.String()
	if matches := srdc.validDTMF.MatchString(digits); !matches {
		wrappedPayload.End = true
		return wrappedPayload, errors.New("sip.response.dtmf.create result of digits template contains invalid characters")
	}

	response := SipDTMFResponse{
		Digits:   digits,
		PreWait:  srdc.PreWait,
		PostWait: srdc.PostWait,
	}
	wrappedPayload.Payload = response
	return wrappedPayload, nil
}
// Type returns the processor type stored in the config this processor was created from.
func (srdc *SipResponseDTMFCreate) Type() string {
return srdc.config.Type
}
package processor
import (
"bytes"
"context"
"fmt"
"text/template"
"github.com/google/jsonschema-go/jsonschema"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
)
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "string.create",
Title: "Create String",
ParamsSchema: &jsonschema.Schema{
Type: "object",
Properties: map[string]*jsonschema.Schema{
"template": {
Title: "Template",
Description: "template to evaluate",
Type: "string",
},
},
Required: []string{"template"},
AdditionalProperties: &jsonschema.Schema{Not: &jsonschema.Schema{}},
},
New: func(config config.ProcessorConfig) (Processor, error) {
params := config.Params
templateString, err := params.GetString("template")
if err != nil {
return nil, fmt.Errorf("string.create template error: %w", err)
}
templateTemplate, err := template.New("template").Parse(templateString)
if err != nil {
return nil, err
}
return &StringCreate{config: config, Template: templateTemplate}, nil
},
})
}
// StringCreate is the "string.create" processor. It renders a template against
// the wrapped payload to produce a string payload.
type StringCreate struct {
// config is the processor configuration this instance was created from.
config config.ProcessorConfig
// Template is the parsed template executed by Process.
Template *template.Template
}
// Process renders the template against the wrapped payload and stores the
// rendered string as the new payload. If the template fails, End is set on the
// returned payload and the error is returned.
func (sc *StringCreate) Process(ctx context.Context, wrappedPayload common.WrappedPayload) (common.WrappedPayload, error) {
	var rendered bytes.Buffer
	if execErr := sc.Template.Execute(&rendered, wrappedPayload); execErr != nil {
		wrappedPayload.End = true
		return wrappedPayload, execErr
	}

	wrappedPayload.Payload = rendered.String()
	return wrappedPayload, nil
}
// Type returns the processor type stored in the config this processor was created from.
func (sc *StringCreate) Type() string {
return sc.config.Type
}
package processor
import (
"context"
"errors"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
)
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "string.decode",
Title: "Decode String",
New: func(config config.ProcessorConfig) (Processor, error) {
return &StringDecode{config: config}, nil
},
})
}
// StringDecode is the "string.decode" processor; it converts a byte slice
// payload into a string.
type StringDecode struct {
// config is the processor configuration this instance was created from.
config config.ProcessorConfig
}
// Process converts the byte slice payload into a string payload. If the
// payload cannot be treated as a byte slice, End is set on the returned
// payload and an error is returned.
func (sd *StringDecode) Process(ctx context.Context, wrappedPayload common.WrappedPayload) (common.WrappedPayload, error) {
	raw, isBytes := common.GetAnyAsByteSlice(wrappedPayload.Payload)
	if isBytes {
		wrappedPayload.Payload = string(raw)
		return wrappedPayload, nil
	}

	wrappedPayload.End = true
	return wrappedPayload, errors.New("string.decode processor only accepts a []byte")
}
// Type returns the processor type stored in the config this processor was created from.
func (sd *StringDecode) Type() string {
return sd.config.Type
}
package processor
import (
"context"
"errors"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
)
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "string.encode",
Title: "Encode String",
New: func(config config.ProcessorConfig) (Processor, error) {
return &StringEncode{config: config}, nil
},
})
}
// StringEncode is the "string.encode" processor; it converts a string payload
// into a byte slice.
type StringEncode struct {
// config is the processor configuration this instance was created from.
config config.ProcessorConfig
}
// Process converts the string payload into a []byte payload. If the payload is
// not a string, End is set on the returned payload and an error is returned.
func (se *StringEncode) Process(ctx context.Context, wrappedPayload common.WrappedPayload) (common.WrappedPayload, error) {
	text, isString := common.GetAnyAs[string](wrappedPayload.Payload)
	if isString {
		wrappedPayload.Payload = []byte(text)
		return wrappedPayload, nil
	}

	wrappedPayload.End = true
	return wrappedPayload, errors.New("string.encode processor only accepts a string")
}
// Type returns the processor type stored in the config this processor was created from.
func (se *StringEncode) Type() string {
return se.config.Type
}
package processor
import (
"context"
"errors"
"fmt"
"strings"
"github.com/google/jsonschema-go/jsonschema"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
)
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "string.split",
Title: "Split String",
ParamsSchema: &jsonschema.Schema{
Type: "object",
Properties: map[string]*jsonschema.Schema{
"separator": {
Title: "Separator",
Description: "separator to split the string on",
Type: "string",
},
},
Required: []string{"separator"},
AdditionalProperties: &jsonschema.Schema{Not: &jsonschema.Schema{}},
},
New: func(config config.ProcessorConfig) (Processor, error) {
params := config.Params
separatorString, err := params.GetString("separator")
if err != nil {
return nil, fmt.Errorf("string.split separator error: %w", err)
}
return &StringSplit{config: config, Separator: separatorString}, nil
},
})
}
// StringSplit is the "string.split" processor; it splits a string payload into
// a slice of strings.
type StringSplit struct {
// config is the processor configuration this instance was created from.
config config.ProcessorConfig
// Separator is the separator passed to strings.Split.
Separator string
}
// Process splits the string payload on Separator and stores the resulting
// []string as the new payload. If the payload is not a string, End is set on
// the returned payload and an error is returned.
func (ss *StringSplit) Process(ctx context.Context, wrappedPayload common.WrappedPayload) (common.WrappedPayload, error) {
	text, isString := common.GetAnyAs[string](wrappedPayload.Payload)
	if isString {
		wrappedPayload.Payload = strings.Split(text, ss.Separator)
		return wrappedPayload, nil
	}

	wrappedPayload.End = true
	return wrappedPayload, errors.New("string.split only accepts a string")
}
// Type returns the processor type stored in the config this processor was created from.
func (ss *StringSplit) Type() string {
return ss.config.Type
}
package processor
import (
"context"
"errors"
"fmt"
"reflect"
"github.com/google/jsonschema-go/jsonschema"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
)
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "struct.field.get",
Title: "Get Struct Field",
ParamsSchema: &jsonschema.Schema{
Type: "object",
Properties: map[string]*jsonschema.Schema{
"name": {
Title: "Field Name",
Description: "name of the struct field to extract",
Type: "string",
},
},
Required: []string{"name"},
AdditionalProperties: &jsonschema.Schema{Not: &jsonschema.Schema{}},
},
New: func(config config.ProcessorConfig) (Processor, error) {
params := config.Params
nameString, err := params.GetString("name")
if err != nil {
return nil, fmt.Errorf("struct.field.get name error: %w", err)
}
return &StructFieldGet{config: config, Name: nameString}, nil
},
})
}
// StructFieldGet is the "struct.field.get" processor; it replaces a struct
// payload with the value of one of its fields.
type StructFieldGet struct {
// config is the processor configuration this instance was created from.
config config.ProcessorConfig
// Name is the name of the struct field to extract.
Name string
}
// Process replaces the payload with the value of the field called Name on the
// struct payload.
//
// The payload must be a struct or a non-nil pointer to a struct. If it is not,
// if the field does not exist, or if the field is unexported (its value cannot
// be read through reflection without panicking), End is set on the returned
// payload and an error is returned.
func (sfg *StructFieldGet) Process(ctx context.Context, wrappedPayload common.WrappedPayload) (common.WrappedPayload, error) {
	s := reflect.ValueOf(wrappedPayload.Payload)

	// Follow a single level of pointer; a nil pointer yields an invalid value
	// that fails the struct check below.
	if s.Kind() == reflect.Pointer {
		s = s.Elem()
	}
	if s.Kind() != reflect.Struct {
		wrappedPayload.End = true
		return wrappedPayload, errors.New("struct.field.get processor only accepts a struct payload")
	}

	field := s.FieldByName(sfg.Name)
	if !field.IsValid() {
		wrappedPayload.End = true
		return wrappedPayload, fmt.Errorf("struct.field.get field '%s' does not exist", sfg.Name)
	}

	// FieldByName also finds unexported fields, but Interface panics on them.
	if !field.CanInterface() {
		wrappedPayload.End = true
		return wrappedPayload, fmt.Errorf("struct.field.get field '%s' is not exported", sfg.Name)
	}

	wrappedPayload.Payload = field.Interface()
	return wrappedPayload, nil
}
// Type returns the processor type stored in the config this processor was created from.
func (sfg *StructFieldGet) Type() string {
return sfg.config.Type
}
package processor
import (
"context"
"errors"
"fmt"
"reflect"
"github.com/google/jsonschema-go/jsonschema"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
)
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "struct.method.get",
Title: "Get Struct Method",
ParamsSchema: &jsonschema.Schema{
Type: "object",
Properties: map[string]*jsonschema.Schema{
"name": {
Title: "Method Name",
Description: "name of the struct method to extract and call (with no arguments)",
Type: "string",
},
},
Required: []string{"name"},
AdditionalProperties: &jsonschema.Schema{Not: &jsonschema.Schema{}},
},
New: func(config config.ProcessorConfig) (Processor, error) {
params := config.Params
nameString, err := params.GetString("name")
if err != nil {
return nil, fmt.Errorf("struct.method.get name error: %w", err)
}
return &StructMethodGet{config: config, Name: nameString}, nil
},
})
}
// StructMethodGet is the "struct.method.get" processor; it replaces a struct
// payload with the result of calling one of its methods without arguments.
type StructMethodGet struct {
// config is the processor configuration this instance was created from.
config config.ProcessorConfig
// Name is the name of the method to call.
Name string
}
func (smg *StructMethodGet) Process(ctx context.Context, wrappedPayload common.WrappedPayload) (common.WrappedPayload, error) {
payload := wrappedPayload.Payload
s := reflect.ValueOf(payload)
if s.Kind() != reflect.Struct {
if s.Kind() == reflect.Pointer && s.Elem().Kind() == reflect.Struct {
s = s.Elem()
} else {
wrappedPayload.End = true
return wrappedPayload, errors.New("struct.method.get processor only accepts a struct payload")
}
}
method := s.MethodByName(smg.Name)
if !method.IsValid() {
wrappedPayload.End = true
return wrappedPayload, fmt.Errorf("struct.method.get method '%s' does not exist", smg.Name)
}
value := method.Call(nil)
if len(value) == 0 {
wrappedPayload.End = true
wrappedPayload.Payload = nil
return wrappedPayload, nil
}
if len(value) == 1 {
wrappedPayload.Payload = value[0].Interface()
return wrappedPayload, nil
}
results := make([]any, len(value))
for i, v := range value {
results[i] = v.Interface()
}
wrappedPayload.Payload = results
return wrappedPayload, nil
}
func (smg *StructMethodGet) Type() string {
return smg.config.Type
}
package processor
import (
"context"
"fmt"
"time"
"github.com/google/jsonschema-go/jsonschema"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
)
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "time.sleep",
Title: "Sleep",
ParamsSchema: &jsonschema.Schema{
Type: "object",
Properties: map[string]*jsonschema.Schema{
"duration": {
Title: "Duration",
Description: "time to sleep in milliseconds",
Type: "integer",
},
},
Required: []string{"duration"},
AdditionalProperties: &jsonschema.Schema{Not: &jsonschema.Schema{}},
},
New: func(config config.ProcessorConfig) (Processor, error) {
params := config.Params
durationNum, err := params.GetInt("duration")
if err != nil {
return nil, fmt.Errorf("time.sleep duration error: %w", err)
}
return &TimeSleep{config: config, Duration: time.Millisecond * time.Duration(durationNum)}, nil
},
})
}
// TimeSleep is a processor that delays the payload for a fixed duration and
// then passes it on unchanged.
type TimeSleep struct {
	config config.ProcessorConfig
	// Duration is how long Process waits before returning.
	Duration time.Duration
}

// Process waits for Duration and then returns the payload unchanged.
//
// The wait is abandoned as soon as ctx is done; in that case the returned
// payload has End set and the error wraps ctx.Err(). A zero or negative
// Duration returns immediately.
func (ts *TimeSleep) Process(ctx context.Context, wrappedPayload common.WrappedPayload) (common.WrappedPayload, error) {
	if ts.Duration <= 0 {
		return wrappedPayload, nil
	}

	// Use a timer instead of time.Sleep so a cancelled context (e.g. during
	// shutdown) does not have to wait out the full duration.
	timer := time.NewTimer(ts.Duration)
	defer timer.Stop()

	select {
	case <-timer.C:
		return wrappedPayload, nil
	case <-ctx.Done():
		wrappedPayload.End = true
		return wrappedPayload, fmt.Errorf("time.sleep interrupted: %w", ctx.Err())
	}
}

// Type returns the processor type taken from the config this processor was
// created with.
func (ts *TimeSleep) Type() string {
	return ts.config.Type
}
package route
import (
"context"
"fmt"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
"github.com/jwetzell/showbridge-go/internal/processor"
)
// Route ties an input identifier to an ordered chain of processors.
type Route struct {
	input      string
	processors []processor.Processor
}

// NewRoute builds a Route from its config, instantiating every declared
// processor in order. It fails if a processor type has no registration or if a
// processor cannot be constructed from its declaration.
func NewRoute(config config.RouteConfig) (*Route, error) {
	chain := make([]processor.Processor, 0, len(config.Processors))
	for _, decl := range config.Processors {
		registration, found := processor.GetProcessorRegistration(decl.Type)
		if !found {
			return nil, fmt.Errorf("problem loading processor registration for processor type: %s", decl.Type)
		}
		instance, err := registration.New(decl)
		if err != nil {
			return nil, err
		}
		chain = append(chain, instance)
	}
	return &Route{input: config.Input, processors: chain}, nil
}

// Input returns the input identifier this route was configured with.
func (r *Route) Input() string {
	return r.input
}

// ProcessPayload runs the payload through each processor in order, feeding the
// output of one into the next, and returns the final payload value.
//
// A processor error aborts the chain and is returned wrapped with the index of
// the failing processor. A processor that marks its result with End stops the
// chain early and that result's payload is returned without error.
func (r *Route) ProcessPayload(ctx context.Context, wrappedPayload common.WrappedPayload) (any, error) {
	current := wrappedPayload
	for index, step := range r.processors {
		next, err := step.Process(ctx, current)
		if err != nil {
			return nil, fmt.Errorf("processor[%d] error: %w", index, err)
		}
		//NOTE(jwetzell) payload has been marked as an end without error
		if next.End {
			return next.Payload, nil
		}
		current = next
	}
	return current.Payload, nil
}
package schema
import (
"github.com/google/jsonschema-go/jsonschema"
)
// ConfigSchema is the root JSON schema (draft 2020-12) for a showbridge
// configuration document. The "api" property is described inline by
// ApiConfigSchema, while "modules" and "routes" are references to separately
// identified schemas that have to be supplied when the schema is resolved.
var ConfigSchema = jsonschema.Schema{
Schema: "https://json-schema.org/draft/2020-12/schema",
ID: "https://showbridge.io/config.schema.json",
Title: "Config",
Description: "showbridge configuration",
Type: "object",
Properties: map[string]*jsonschema.Schema{
"api": &ApiConfigSchema,
"modules": {
Ref: "https://showbridge.io/modules.schema.json",
},
"routes": {
Ref: "https://showbridge.io/routes.schema.json",
},
},
// "not {}" matches nothing, so any property other than the ones listed
// above is rejected.
AdditionalProperties: &jsonschema.Schema{Not: &jsonschema.Schema{}},
}
// ApplyDefaults resolves the config schema and applies its defaults to cfg.
// It returns the resolution error, if any, otherwise the result of applying
// the defaults.
func ApplyDefaults(cfg *map[string]any) error {
	resolved, err := GetResolvedConfigSchema()
	if err == nil {
		err = resolved.ApplyDefaults(cfg)
	}
	return err
}
// ValidateConfig resolves the config schema and validates cfg against it.
// It returns the resolution error, if any, otherwise the validation result.
func ValidateConfig(cfg map[string]any) error {
	resolved, err := GetResolvedConfigSchema()
	if err == nil {
		err = resolved.Validate(cfg)
	}
	return err
}
package schema
import (
"encoding/json"
"github.com/google/jsonschema-go/jsonschema"
"github.com/jwetzell/showbridge-go/internal/module"
)
// GetModulesSchema builds the JSON schema for the "modules" config array.
// Each array item must match exactly one of the per-module schemas, which are
// generated from the current module registrations.
func GetModulesSchema() *jsonschema.Schema {
	variants := []*jsonschema.Schema{}
	for _, registration := range module.GetModuleRegistrations() {
		// Every module needs a non-empty id and a type fixed to its own name.
		properties := map[string]*jsonschema.Schema{
			"id": {
				Type:      "string",
				MinLength: new(1),
			},
			"type": {
				Const: jsonschema.Ptr[any](registration.Type),
			},
		}
		required := []string{"id", "type"}

		// "params" is only described when the module declares a params schema,
		// and only mandatory when that schema itself has required properties.
		if params := registration.ParamsSchema; params != nil {
			properties["params"] = params
			if len(params.Required) > 0 {
				required = append(required, "params")
			}
		}

		variants = append(variants, &jsonschema.Schema{
			ID:                   registration.Type,
			Title:                registration.Title,
			Description:          registration.Description,
			Type:                 "object",
			Properties:           properties,
			Required:             required,
			AdditionalProperties: &jsonschema.Schema{Not: &jsonschema.Schema{}},
		})
	}

	return &jsonschema.Schema{
		Schema:      "https://json-schema.org/draft/2020-12/schema",
		ID:          "https://showbridge.io/modules.schema.json",
		Title:       "Modules",
		Description: "module configurations",
		Type:        "array",
		Default:     json.RawMessage(`[]`),
		Items:       &jsonschema.Schema{OneOf: variants},
	}
}
package schema
import (
"encoding/json"
"github.com/google/jsonschema-go/jsonschema"
"github.com/jwetzell/showbridge-go/internal/processor"
)
// GetProcessorsSchema builds the JSON schema for a processors array. Each
// array item must match exactly one of the per-processor schemas, which are
// generated from the current processor registrations.
func GetProcessorsSchema() *jsonschema.Schema {
	variants := []*jsonschema.Schema{}
	for _, registration := range processor.GetProcessorRegistrations() {
		// Every processor has a type fixed to its own name.
		properties := map[string]*jsonschema.Schema{
			"type": {
				Const: jsonschema.Ptr[any](registration.Type),
			},
		}
		required := []string{"type"}

		// "params" is only described when the processor declares a params
		// schema, and only mandatory when that schema has required properties.
		if params := registration.ParamsSchema; params != nil {
			properties["params"] = params
			if len(params.Required) > 0 {
				required = append(required, "params")
			}
		}

		variants = append(variants, &jsonschema.Schema{
			ID:                   registration.Type,
			Title:                registration.Title,
			Description:          registration.Description,
			Type:                 "object",
			Properties:           properties,
			Required:             required,
			AdditionalProperties: &jsonschema.Schema{Not: &jsonschema.Schema{}},
		})
	}

	return &jsonschema.Schema{
		Schema:      "https://json-schema.org/draft/2020-12/schema",
		ID:          "https://showbridge.io/processors.schema.json",
		Title:       "Processors",
		Description: "processor configurations",
		Type:        "array",
		Default:     json.RawMessage(`[]`),
		Items:       &jsonschema.Schema{OneOf: variants},
	}
}
package schema
import (
"fmt"
"net/url"
"github.com/google/jsonschema-go/jsonschema"
)
// GetResolvedConfigSchema resolves ConfigSchema, supplying the modules,
// processors and routes schemas for the references it encounters. Any other
// reference is reported as an error. Defaults are validated during resolution.
func GetResolvedConfigSchema() (*jsonschema.Resolved, error) {
	// loadReferenced maps a referenced schema URI to its schema; the modules and
	// processors schemas are generated on demand.
	loadReferenced := func(uri *url.URL) (*jsonschema.Schema, error) {
		ref := uri.String()
		if ref == "https://showbridge.io/modules.schema.json" {
			return GetModulesSchema(), nil
		}
		if ref == "https://showbridge.io/processors.schema.json" {
			return GetProcessorsSchema(), nil
		}
		if ref == "https://showbridge.io/routes.schema.json" {
			return &RoutesConfigSchema, nil
		}
		return nil, fmt.Errorf("unknown schema reference: %s", ref)
	}

	options := &jsonschema.ResolveOptions{
		Loader:           loadReferenced,
		ValidateDefaults: true,
	}
	return ConfigSchema.Resolve(options)
}
package test
import (
"context"
"database/sql"
"github.com/jwetzell/showbridge-go/internal/common"
_ "modernc.org/sqlite"
)
// NewTestModule returns a TestModule with the given id.
func NewTestModule(id string) *TestModule {
	return &TestModule{
		id: id,
	}
}

// TestModule is a minimal module for tests: it does nothing except stay
// running until its context is done.
type TestModule struct {
	id string
}

// Start blocks until ctx is done and then returns nil. The input handler is
// never invoked.
func (m *TestModule) Start(ctx context.Context, inputHandler common.InputHandler) error {
	<-ctx.Done()
	return nil
}

// Stop does nothing; Start only returns once its context is done.
func (m *TestModule) Stop() {}

// Type returns the fixed module type "test.plain".
func (m *TestModule) Type() string {
	return "test.plain"
}

// Id returns the id the module was created with, matching the behavior of the
// other test modules.
func (m *TestModule) Id() string {
	return m.id
}
// NewTestOutputModule returns a TestOutputModule with the given id.
func NewTestOutputModule(id string) *TestOutputModule {
	created := new(TestOutputModule)
	created.id = id
	return created
}

// TestOutputModule is a test module that accepts output and discards it.
type TestOutputModule struct {
	id string
}

// Start blocks until ctx is done and then returns nil. The input handler is
// never invoked.
func (om *TestOutputModule) Start(ctx context.Context, inputHandler common.InputHandler) error {
	done := ctx.Done()
	<-done
	return nil
}

// Output ignores the payload and always succeeds.
func (om *TestOutputModule) Output(ctx context.Context, payload any) error {
	return nil
}

// Stop does nothing; Start only returns once its context is done.
func (om *TestOutputModule) Stop() {}

// Type returns the fixed module type "test.output".
func (om *TestOutputModule) Type() string {
	return "test.output"
}

// Id returns the id the module was created with.
func (om *TestOutputModule) Id() string {
	return om.id
}
// NewTestKVModule returns a TestKVModule with the given id. presetValues is
// used directly as the backing store (it is not copied) and may be nil.
func NewTestKVModule(id string, presetValues map[string]any) *TestKVModule {
	created := new(TestKVModule)
	created.id = id
	created.kvData = presetValues
	return created
}

// TestKVModule is a test module backed by an in-memory key/value map.
type TestKVModule struct {
	id     string
	kvData map[string]any
}

// Start blocks until ctx is done and then returns nil. The input handler is
// never invoked.
func (kv *TestKVModule) Start(ctx context.Context, inputHandler common.InputHandler) error {
	done := ctx.Done()
	<-done
	return nil
}

// Stop does nothing; Start only returns once its context is done.
func (kv *TestKVModule) Stop() {}

// Type returns the fixed module type "test.kv".
func (kv *TestKVModule) Type() string {
	return "test.kv"
}

// Id returns the id the module was created with.
func (kv *TestKVModule) Id() string {
	return kv.id
}

// Get returns the value stored under key, or nil when there is none. It never
// returns an error.
func (kv *TestKVModule) Get(ctx context.Context, key string) (any, error) {
	// Indexing a nil map or a missing key yields the zero value (nil), which
	// is exactly what a miss should report.
	return kv.kvData[key], nil
}

// Set stores value under key, creating the backing map on first use. It never
// returns an error.
func (kv *TestKVModule) Set(ctx context.Context, key string, value any) error {
	if kv.kvData == nil {
		kv.kvData = map[string]any{}
	}
	kv.kvData[key] = value
	return nil
}
// NewTestDBModule returns a TestDBModule with the given id. The database is
// not opened until the first query.
func NewTestDBModule(id string) *TestDBModule {
	return &TestDBModule{
		id: id,
	}
}

// TestDBModule is a test module that answers queries from an in-memory SQLite
// database seeded with a small "test" table.
type TestDBModule struct {
	id string
	db *sql.DB
}

// Start blocks until ctx is done and then returns nil. The input handler is
// never invoked.
func (m *TestDBModule) Start(ctx context.Context, inputHandler common.InputHandler) error {
	<-ctx.Done()
	return nil
}

// QueryContext runs query against the module's database. On first use it opens
// an in-memory SQLite database and seeds it with a "test" table holding two
// rows; errors from opening or seeding are returned and leave the module
// without a database so the next call tries again.
func (m *TestDBModule) QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error) {
	if m.db == nil {
		db, err := sql.Open("sqlite", ":memory:")
		if err != nil {
			return nil, err
		}
		_, err = db.Exec(`
CREATE TABLE test (
id INTEGER PRIMARY KEY,
value TEXT
);
INSERT INTO test (id, value) VALUES (1, 'test-1'), (2, 'test-2');
`)
		if err != nil {
			// The handle is not kept on failure, so close it rather than
			// leaking it; the seeding error is the one worth reporting.
			_ = db.Close()
			return nil, err
		}
		m.db = db
	}
	return m.db.QueryContext(ctx, query, args...)
}

// Stop does nothing; Start only returns once its context is done.
func (m *TestDBModule) Stop() {}

// Type returns the fixed module type "test.db".
func (m *TestDBModule) Type() string {
	return "test.db"
}

// Id returns the id the module was created with.
func (m *TestDBModule) Id() string {
	return m.id
}
// NewTestPubSubModule returns a TestPubSubModule with the given id.
func NewTestPubSubModule(id string) *TestPubSubModule {
	created := new(TestPubSubModule)
	created.id = id
	return created
}

// TestPubSubModule is a test module that accepts published messages and
// discards them.
type TestPubSubModule struct {
	id string
}

// Start blocks until ctx is done and then returns nil. The input handler is
// never invoked.
func (ps *TestPubSubModule) Start(ctx context.Context, inputHandler common.InputHandler) error {
	done := ctx.Done()
	<-done
	return nil
}

// Publish ignores the topic and payload and always succeeds.
func (ps *TestPubSubModule) Publish(ctx context.Context, topic string, payload any) error {
	return nil
}

// Stop does nothing; Start only returns once its context is done.
func (ps *TestPubSubModule) Stop() {}

// Type returns the fixed module type "test.pubsub".
func (ps *TestPubSubModule) Type() string {
	return "test.pubsub"
}

// Id returns the id the module was created with.
func (ps *TestPubSubModule) Id() string {
	return ps.id
}
package test
import (
"context"
"github.com/jwetzell/showbridge-go/internal/common"
)
// TestProcessor is a pass-through processor for tests.
type TestProcessor struct{}

// Type returns the fixed processor type "test".
func (tp *TestProcessor) Type() string {
	return "test"
}

// Process returns the payload it was given, unchanged, and never fails.
func (tp *TestProcessor) Process(ctx context.Context, wrappedPayload common.WrappedPayload) (common.WrappedPayload, error) {
	unchanged := wrappedPayload
	return unchanged, nil
}
package test
import (
"context"
"github.com/jwetzell/showbridge-go/internal/common"
)
// TestRouter is a stand-in router for tests whose input handling does nothing.
type TestRouter struct{}

// HandleInput ignores its arguments and always reports that no route was
// found, with no errors.
func (tr *TestRouter) HandleInput(ctx context.Context, sourceId string, payload any) (bool, []common.RouteIOError) {
	var noErrors []common.RouteIOError
	return false, noErrors
}

// GetNewTestRouter returns a new TestRouter.
func GetNewTestRouter() *TestRouter {
	return new(TestRouter)
}
package test
// TestStruct is a fixture with one exported field per common kind of value
// and a matching set of value-receiver methods.
type TestStruct struct {
	String   string
	Int      int
	Float    float64
	Bool     bool
	Data     any
	IntSlice []int
}

// GetString returns the String field.
func (ts TestStruct) GetString() string {
	return ts.String
}

// GetInt returns the Int field.
func (ts TestStruct) GetInt() int {
	return ts.Int
}

// GetFloat returns the Float field.
func (ts TestStruct) GetFloat() float64 {
	return ts.Float
}

// GetBool returns the Bool field.
func (ts TestStruct) GetBool() bool {
	return ts.Bool
}

// GetData returns the Data field.
func (ts TestStruct) GetData() any {
	return ts.Data
}

// GetIntSlice returns the IntSlice field (the same slice, not a copy).
func (ts TestStruct) GetIntSlice() []int {
	return ts.IntSlice
}

// Void takes no arguments, returns nothing and does nothing.
func (ts TestStruct) Void() {}

// MultipleReturnValues returns the String and Int fields, in that order.
func (ts TestStruct) MultipleReturnValues() (string, int) {
	text, number := ts.String, ts.Int
	return text, number
}
package showbridge
import (
"context"
"errors"
"log/slog"
"sync"
"sync/atomic"
"github.com/jwetzell/showbridge-go/internal/api"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
"github.com/jwetzell/showbridge-go/internal/module"
"github.com/jwetzell/showbridge-go/internal/route"
)
// Router owns the module and route instances built from a config and
// dispatches module input to every route whose input matches the source.
//
// TODO(jwetzell): can/should this be split into different "components"?
type Router struct {
// contextCancel cancels Context; it is set by Start and called by Stop.
contextCancel context.CancelFunc
// Context is derived from the context given to Start and is the context
// modules are started with.
Context context.Context
// ModuleInstances holds the created modules keyed by module id.
// TODO(jwetzell): do these need to be guarded against concurrency?
ModuleInstances map[string]common.Module
// RouteInstances holds the created routes in the order they were added.
// TODO(jwetzell): change to something easier to lookup
RouteInstances []*route.Route
// ConfigChange is created by NewRouter with a buffer of one.
ConfigChange chan config.Config
// moduleWait tracks the goroutines running each module's Start.
moduleWait sync.WaitGroup
logger *slog.Logger
// runningConfig is the config currently in effect, guarded by runningConfigMu.
runningConfig config.Config
// runningConfigMu is also held for reading while input is being handled.
runningConfigMu sync.RWMutex
apiServer *api.ApiServer
// NOTE(review): presumably the receivers of broadcast events, guarded by
// eventDestinationsMu — their use is not visible here, confirm.
eventDestinations []common.EventDestination
eventDestinationsMu sync.Mutex
}
// addModule creates the module described by moduleDecl and stores it under its
// id. It fails when the id is empty, the module type has no registration, the
// id is already in use, or the module cannot be constructed.
func (r *Router) addModule(moduleDecl config.ModuleConfig) error {
	id := moduleDecl.Id
	if id == "" {
		return errors.New("module id cannot be empty")
	}

	registration, registered := module.GetModuleRegistration(moduleDecl.Type)
	if !registered {
		return errors.New("module type not defined")
	}

	if _, taken := r.ModuleInstances[id]; taken {
		return errors.New("module id already exists")
	}

	instance, err := registration.New(moduleDecl)
	if err != nil {
		return err
	}
	r.ModuleInstances[id] = instance
	return nil
}
// removeModule stops the module with the given id and removes it from
// ModuleInstances. If the module cannot be stopped the error is returned and
// nothing is removed.
func (r *Router) removeModule(moduleId string) error {
	if err := r.stopModule(moduleId); err != nil {
		return err
	}
	delete(r.ModuleInstances, moduleId)
	return nil
}
// startModule runs the module with the given id in its own goroutine, tracked
// by moduleWait, with the router's HandleInput as its input handler. It
// returns an error only when no module with that id exists; an error returned
// by the module's Start is logged, not returned.
func (r *Router) startModule(ctx context.Context, moduleId string) error {
	target := r.getModule(moduleId)
	if target == nil {
		return errors.New("module id not found")
	}

	run := func() {
		if err := target.Start(ctx, r.HandleInput); err != nil {
			// TODO(jwetzell): propagate module run errors better
			r.logger.Error("error encountered running module", "moduleId", moduleId, "error", err)
		}
	}
	r.moduleWait.Go(run)
	return nil
}
// stopModule calls Stop on the module with the given id. It returns an error
// when no module with that id exists.
func (r *Router) stopModule(moduleId string) error {
	if target := r.getModule(moduleId); target != nil {
		target.Stop()
		return nil
	}
	return errors.New("module id not found")
}
// addRoute builds a route from routeDecl and appends it to RouteInstances. If
// the route cannot be built the error is returned and nothing is appended.
//
// TODO(jwetzell): support removing route
func (r *Router) addRoute(routeDecl config.RouteConfig) error {
	created, err := route.NewRoute(routeDecl)
	if err == nil {
		r.RouteInstances = append(r.RouteInstances, created)
	}
	return err
}
// getModule returns the module stored under moduleId, or nil when there is
// none.
func (r *Router) getModule(moduleId string) common.Module {
	// A missing key yields the zero value of the interface type, i.e. nil.
	return r.ModuleInstances[moduleId]
}
// NewRouter creates a Router for routerConfig, instantiating its modules and
// routes and creating (but not starting) the API server.
//
// A module or route that cannot be created does not abort construction: it is
// skipped and reported in the returned error slices together with its index
// and declaration. Both slices are nil when nothing failed.
func NewRouter(routerConfig config.Config) (*Router, []config.ModuleError, []config.RouteError) {
	router := &Router{
		ModuleInstances: make(map[string]common.Module),
		RouteInstances:  []*route.Route{},
		ConfigChange:    make(chan config.Config, 1),
		logger:          slog.Default().With("component", "router"),
		runningConfig:   routerConfig,
	}
	router.logger.Debug("creating")

	var moduleErrors []config.ModuleError
	for index, decl := range routerConfig.Modules {
		if err := router.addModule(decl); err != nil {
			moduleErrors = append(moduleErrors, config.ModuleError{
				Index:  index,
				Config: decl,
				Error:  err.Error(),
			})
		}
	}

	var routeErrors []config.RouteError
	for index, decl := range routerConfig.Routes {
		if err := router.addRoute(decl); err != nil {
			routeErrors = append(routeErrors, config.RouteError{
				Index:  index,
				Config: decl,
				Error:  err.Error(),
			})
		}
	}

	// The router is handed to the API server for both of its arguments.
	router.apiServer = api.NewApiServer(router, router)
	return router, moduleErrors, routeErrors
}
// Start derives the router's cancellable context from ctx, starts every
// module with it, and then starts the API server with the running API config.
func (r *Router) Start(ctx context.Context) {
	r.logger.Info("running")
	r.Context, r.contextCancel = context.WithCancel(ctx)
	r.startModules()
	apiConfig := r.GetRunningConfig().Api
	r.apiServer.Start(apiConfig)
}
// Stop shuts the router down: it stops the API server, asks every module to
// stop, waits for all module goroutines to return, and finally cancels the
// router context created by Start.
//
// Calling Stop on a router that was never started skips the context
// cancellation, since no context has been created yet.
func (r *Router) Stop() {
	r.logger.Info("stopping")
	r.logger.Debug("shutting down api server")
	r.apiServer.Stop()
	r.logger.Debug("stopping modules")
	r.stopModules()
	r.logger.Debug("waiting for modules to exit")
	r.moduleWait.Wait()
	r.logger.Debug("canceling router context")
	// contextCancel is only set by Start; calling a nil func would panic.
	if r.contextCancel != nil {
		r.contextCancel()
	}
	r.logger.Info("done")
}
// HandleInput delivers a payload from the module identified by sourceId to
// every route whose input matches it. Matching routes are processed
// concurrently and HandleInput returns once all of them have finished.
//
// It returns whether at least one route matched, and one RouteIOError per
// route whose processing failed (nil when none failed). Because routes run
// concurrently, the order of the returned errors is not defined.
//
// An "input" event is broadcast up front and a "route" event is broadcast for
// every processed route, carrying the error text when processing failed. The
// running-config read lock is held for the whole call.
func (r *Router) HandleInput(ctx context.Context, sourceId string, payload any) (bool, []common.RouteIOError) {
	r.runningConfigMu.RLock()
	defer r.runningConfigMu.RUnlock()

	var (
		routeFound    atomic.Bool
		routeIOErrors []common.RouteIOError
		// routeIOErrorsMu guards routeIOErrors, which is appended to from the
		// per-route goroutines below.
		routeIOErrorsMu sync.Mutex
		routeWaitGroup  sync.WaitGroup
	)

	r.broadcastEvent(common.Event{
		Type: "input",
		Data: map[string]any{
			"source": sourceId,
		},
	})

	for routeIndex, routeInstance := range r.RouteInstances {
		if routeInstance == nil {
			r.logger.Error("nil route instance found", "routeIndex", routeIndex)
			continue
		}
		if routeInstance.Input() != sourceId {
			continue
		}
		routeWaitGroup.Go(func() {
			routeFound.Store(true)
			_, err := routeInstance.ProcessPayload(ctx, common.WrappedPayload{
				Payload:      payload,
				Source:       sourceId,
				Modules:      r.ModuleInstances,
				InputHandler: r.HandleInput,
				End:          false,
			})
			if err != nil {
				r.logger.Error("unable to process input", "route", routeIndex, "source", sourceId, "error", err)

				routeIOErrorsMu.Lock()
				routeIOErrors = append(routeIOErrors, common.RouteIOError{
					Index:        routeIndex,
					ProcessError: err,
				})
				routeIOErrorsMu.Unlock()

				r.broadcastEvent(common.Event{
					Type: "route",
					Data: map[string]any{
						"index": routeIndex,
					},
					Error: err.Error(),
				})
				return
			}
			r.broadcastEvent(common.Event{
				Type: "route",
				Data: map[string]any{
					"index": routeIndex,
				},
			})
		})
	}

	// After Wait every goroutine has finished, so reading the results without
	// the mutex is safe.
	routeWaitGroup.Wait()
	return routeFound.Load(), routeIOErrors
}
// startModules starts every module in ModuleInstances with the router
// context. A module that cannot be started is logged and skipped.
func (r *Router) startModules() {
	for id := range r.ModuleInstances {
		// TODO(jwetzell): handle module run errors
		if err := r.startModule(r.Context, id); err != nil {
			r.logger.Error("error starting module", "moduleId", id, "error", err)
		}
	}
}
// stopModules asks every module in ModuleInstances to stop. A module that
// cannot be stopped is logged and skipped.
func (r *Router) stopModules() {
	for id := range r.ModuleInstances {
		// TODO(jwetzell): handle module stop errors?
		if err := r.stopModule(id); err != nil {
			r.logger.Error("error stopping module", "moduleId", id, "error", err)
		}
	}
}