package main
import (
"context"
"flag"
"log"
"os"
"os/signal"
"syscall"
"github.com/mekops-labs/siphon/internal/config"
_ "github.com/mekops-labs/siphon/internal/modules"
"github.com/mekops-labs/siphon/pkg/bus"
"github.com/mekops-labs/siphon/pkg/collector"
"github.com/mekops-labs/siphon/pkg/editor"
"github.com/mekops-labs/siphon/pkg/pipeline"
"github.com/mekops-labs/siphon/pkg/sink"
)
// version is stamped at build time via -ldflags "-X main.version=...".
var version = "unknown"

// main wires the Siphon runtime together: parse flags, load the YAML config,
// build the in-memory event bus, instantiate collectors and sinks from their
// registries, start the pipeline runner, and block until SIGINT/SIGTERM.
func main() {
	// Add an optional flag for the editor port
	editorPort := flag.Int("editor-port", 0, "Enable the web-based config editor on this port (e.g. 8099)")
	flag.Parse()
	// Read the config file path from the remaining args
	if flag.NArg() < 1 {
		log.Fatal("need config file path as argument! Usage: siphon [flags] <config.yaml>")
	}
	configPath := flag.Arg(0)
	log.Print("Starting Siphon v. ", version)
	if *editorPort > 0 {
		go editor.Start(*editorPort, configPath)
	}
	// 1. Load Config
	cfg, err := config.Load(configPath)
	if err != nil {
		log.Fatal(err)
	}
	// 2. Initialize the Event Bus
	eventBus := bus.NewMemoryBus()
	// 3. Initialize Collectors
	collectors := make(map[string]collector.Collector)
	for name, colCfg := range cfg.Collectors {
		collectorInit, ok := collector.Registry[colCfg.Type]
		if !ok {
			log.Printf("unknown collector type: %s", colCfg.Type)
			continue
		}
		c := collectorInit(colCfg.Params)
		// Constructors return nil on invalid params; skip those instead of
		// panicking on the RegisterTopic call below.
		if c == nil {
			log.Printf("collector %s (type %s) failed to initialize; skipping", name, colCfg.Type)
			continue
		}
		// Register all aliases defined in config.yaml for this collector
		for alias, rawTopic := range colCfg.Topics {
			c.RegisterTopic(alias, rawTopic)
			log.Printf("Collector [%s] registered alias '%s' for '%s'", name, alias, rawTopic)
		}
		collectors[name] = c
	}
	// 4. Initialize Sinks
	sinks := make(map[string]sink.Sink)
	for name, sinkCfg := range cfg.Sinks {
		sinkInit, ok := sink.Registry[sinkCfg.Type]
		if !ok {
			log.Printf("unknown sink type: %s", sinkCfg.Type)
			continue
		}
		s, err := sinkInit(sinkCfg.Params, eventBus)
		if err != nil {
			log.Printf("can't initialize sink %s: %v", name, err)
			continue
		}
		sinks[name] = s
		log.Printf("added sink: %s, type: %s", name, sinkCfg.Type)
	}
	// 5. Start the Pipeline Runner
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	runner := pipeline.NewRunner(eventBus, sinks)
	runner.Start(ctx, cfg.Pipelines)
	// 6. Start Collectors (Injecting the bus so they can publish)
	for _, c := range collectors {
		go c.Start(eventBus)
	}
	// 7. Wait for interrupt
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, os.Interrupt, syscall.SIGTERM)
	<-sig
	log.Print("Exiting application...")
	cancel() // Stop pipelines
	for _, c := range collectors {
		c.End() // Stop collectors
	}
	// Flush and close all sinks now that nothing can publish anymore.
	runner.Close()
}
package config
import (
"fmt"
"os"
"github.com/goccy/go-yaml"
"github.com/mekops-labs/siphon/internal/utils"
)
// Config is the root of the Siphon v2 configuration
type Config struct {
Version int `yaml:"version"`
Collectors map[string]CollectorConfig `yaml:"collectors"`
Sinks map[string]SinkConfig `yaml:"sinks"`
Pipelines []PipelineConfig `yaml:"pipelines"`
}
type CollectorConfig struct {
Type string `yaml:"type"`
Topics map[string]string `yaml:"topics"`
Params map[string]interface{} `yaml:"params"` // Generic params for module
}
type SinkConfig struct {
Type string `yaml:"type"`
Params map[string]interface{} `yaml:"params"` // Generic params for module
}
// PipelineConfig defines the linear data flow
type PipelineConfig struct {
Name string `yaml:"name"`
BusMode string `yaml:"bus_mode,omitempty"` // "volatile" or "durable"
Stateful bool `yaml:"stateful,omitempty"`
Type string `yaml:"type,omitempty"` // "cron" or empty
Schedule string `yaml:"schedule,omitempty"`
Topics []string `yaml:"topics,omitempty"`
Parser *ParserConfig `yaml:"parser,omitempty"`
Transform []map[string]string `yaml:"transform,omitempty"`
Sinks []PipelineSinkConfig `yaml:"sinks"`
}
type ParserConfig struct {
Type string `yaml:"type"`
Vars map[string]string `yaml:"vars"`
}
type PipelineSinkConfig struct {
Name string `yaml:"name"`
Format string `yaml:"format"`
Spec string `yaml:"spec"`
}
// Load reads the YAML file at path, expands %%ENV%% placeholders, and
// unmarshals the result into a Config. Only config version 2 is accepted;
// any other version yields an error.
func Load(path string) (*Config, error) {
	contents, err := os.ReadFile(path)
	if err != nil {
		return nil, fmt.Errorf("can't read config file: %w", err)
	}
	// Substitute %%VAR%% placeholders with environment values before parsing.
	expanded := utils.ReplaceWithEnvVars(string(contents))
	cfg := &Config{}
	if err := yaml.Unmarshal([]byte(expanded), cfg); err != nil {
		return nil, fmt.Errorf("failed to parse yaml: %w", err)
	}
	if cfg.Version != 2 {
		return nil, fmt.Errorf("unsupported config version: %d (expected 2)", cfg.Version)
	}
	return cfg, nil
}
package utils
import (
"math/rand"
"os"
"regexp"
"strings"
"time"
)
// charset is the alphabet used for random identifier generation.
const charset = "abcdefghijklmnopqrstuvwxyz" +
	"ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"

// seededRand is a dedicated PRNG, seeded once at startup so repeated process
// runs do not reproduce the same identifier sequence.
var seededRand = rand.New(
	rand.NewSource(time.Now().UnixNano()))

// stringWithCharset draws length characters uniformly at random from charset.
func stringWithCharset(length int, charset string) string {
	out := make([]byte, length)
	for i := 0; i < length; i++ {
		out[i] = charset[seededRand.Intn(len(charset))]
	}
	return string(out)
}

// RandomString returns a random alphanumeric string of the given length.
func RandomString(length int) string {
	return stringWithCharset(length, charset)
}
// envVarPattern matches placeholders of the form %%NAME%% where NAME starts
// with a letter followed by at least one more word character (so names are
// two characters minimum). Compiled once at package init instead of on every
// call — the previous per-call regexp.MustCompile was pure overhead on a
// function invoked for every config load.
var envVarPattern = regexp.MustCompile(`%%[a-zA-Z][0-9a-zA-Z_]+%%`)

// ReplaceWithEnvVars expands every %%NAME%% placeholder in the input with the
// value of the corresponding environment variable. Unset variables expand to
// the empty string.
func ReplaceWithEnvVars(in string) string {
	return envVarPattern.ReplaceAllStringFunc(in, func(match string) string {
		return os.Getenv(strings.Trim(match, "%"))
	})
}
package bus
import (
"sync"
"time"
)
// DeliveryMode selects how events are delivered to subscribers.
type DeliveryMode int

const (
	// ModeVolatile: best-effort in-memory delivery; events may be dropped.
	ModeVolatile DeliveryMode = iota
	// ModeDurable: reserved for persistent delivery (MemoryBus always publishes volatile).
	ModeDurable
)

// Event is the standard unit of data moving through Siphon
type Event struct {
	ID uint64 // monotonically increasing sequence number assigned by the bus
	Topic string
	Payload []byte // raw bytes exactly as published by the collector
	Mode DeliveryMode
	Timestamp time.Time // time the event was published
	Ack func() // acknowledgement callback (no-op in volatile mode)
	Nack func() // negative-acknowledgement callback (no-op in volatile mode)
}

// Bus defines the interface for publishing and subscribing
type Bus interface {
	Publish(topic string, payload []byte) error
	Subscribe(topic string) <-chan Event
}
// MemoryBus is a high-speed, volatile event bus using Go channels
type MemoryBus struct {
	// subscribers maps a topic to every channel subscribed to it.
	subscribers map[string][]chan Event
	// lock guards subscribers (RLock on publish) and seqCounter (Lock).
	lock sync.RWMutex
	// seqCounter is the source of Event.ID values.
	seqCounter uint64
}

// NewMemoryBus returns an empty, ready-to-use in-memory bus.
func NewMemoryBus() *MemoryBus {
	return &MemoryBus{
		subscribers: make(map[string][]chan Event),
	}
}
// Publish sends data to all subscribers of a topic without blocking
//
// The write lock is held only to bump the sequence counter; the subscriber
// slice is then snapshotted under the read lock. Delivery is best-effort:
// a full subscriber channel has its oldest unread event dropped to make
// room for the newest one (ring-buffer semantics). Publish never blocks
// and always returns nil.
func (b *MemoryBus) Publish(topic string, payload []byte) error {
	b.lock.Lock()
	b.seqCounter++
	id := b.seqCounter
	b.lock.Unlock()
	event := Event{
		ID: id,
		Topic: topic,
		Payload: payload,
		Mode: ModeVolatile, // MemoryBus only supports volatile delivery
		Timestamp: time.Now(),
		Ack: func() {}, // No-op for volatile mode
		Nack: func() {},
	}
	b.lock.RLock()
	subs := b.subscribers[topic]
	b.lock.RUnlock()
	for _, ch := range subs {
		// Non-blocking Ring Buffer logic
		select {
		case ch <- event:
		default:
			// Channel full: drop oldest unread message to make room for newest
			select {
			case <-ch:
			default:
				// Receiver drained the channel between selects; nothing to drop.
			}
			// Push new event
			select {
			case ch <- event:
			default:
				// Still full (a concurrent publisher won the race); event is dropped.
			}
		}
	}
	return nil
}
// Subscribe registers a new listener for topic and returns the channel its
// events will be delivered on. The channel is buffered with a single slot;
// Publish applies drop-oldest semantics when it is full.
func (b *MemoryBus) Subscribe(topic string) <-chan Event {
	events := make(chan Event, 1) // single-slot buffer for simple backpressure handling
	b.lock.Lock()
	b.subscribers[topic] = append(b.subscribers[topic], events)
	b.lock.Unlock()
	return events
}
package collector
import "github.com/mekops-labs/siphon/pkg/bus"
// Collector defines the interface for all ingestion modules
type Collector interface {
	// Start begins collecting and publishing events onto the given bus.
	Start(b bus.Bus)
	// End stops the collector and waits for its goroutines to finish.
	End()
	// RegisterTopic maps a bus alias to a module-specific source
	// (file path, MQTT topic, entity ID, shell command, URL, ...).
	RegisterTopic(name string, value string)
}

// Init constructs a collector from its raw config params.
// Implementations return nil when the params are invalid.
type Init func(params any) Collector

type registry map[string]Init

// Main registry of all available collector classes
var Registry = make(registry)

// Add registers a collector constructor under a type name.
// Called from each module's init() function.
func (c registry) Add(name string, constructor Init) {
	c[name] = constructor
}
package file
import (
"context"
"log"
"os"
"sync"
"time"
"github.com/mekops-labs/siphon/pkg/bus"
"github.com/mekops-labs/siphon/pkg/collector"
"github.com/mitchellh/mapstructure"
)
// FileParams holds the decoded config params for the file collector.
type FileParams struct {
	Interval int `mapstructure:"interval"` // in seconds
}

// fileSource periodically reads a set of files and publishes their raw
// contents to the event bus, one bus topic per file.
type fileSource struct {
	params FileParams
	paths map[string]string // bus topic -> file path
	lock sync.Mutex // guards paths
	bus bus.Bus
	ctx context.Context // cancelled by End() to stop the poll loop
	cancel context.CancelFunc
	wg sync.WaitGroup // tracks the poll goroutine
}

// Ensure fileSource implements the Collector interface
var _ collector.Collector = (*fileSource)(nil)

// init registers this module under the "file" collector type.
func init() {
	collector.Registry.Add("file", New)
}
// New builds a file collector from the generic params map.
// Returns nil when the params cannot be decoded, signalling the caller
// to skip this collector.
func New(p any) collector.Collector {
	var params FileParams
	if err := mapstructure.Decode(p, &params); err != nil {
		log.Printf("File collector config error: %v", err)
		return nil
	}
	if params.Interval <= 0 {
		params.Interval = 10 // default polling interval, seconds
	}
	ctx, cancel := context.WithCancel(context.Background())
	src := &fileSource{
		params: params,
		paths:  map[string]string{},
		ctx:    ctx,
		cancel: cancel,
	}
	return src
}
// RegisterTopic maps a bus topic alias to a file path to be polled.
func (f *fileSource) RegisterTopic(name string, value string) {
	f.lock.Lock()
	f.paths[name] = value
	f.lock.Unlock()
}
// Start launches the background polling goroutine: an immediate read,
// then one read per configured interval until End() cancels the context.
func (f *fileSource) Start(b bus.Bus) {
	f.bus = b
	f.wg.Add(1)
	go func() {
		defer f.wg.Done()
		ticker := time.NewTicker(time.Duration(f.params.Interval) * time.Second)
		defer ticker.Stop()
		// Do an immediate initial read
		f.readFiles()
		for {
			select {
			case <-f.ctx.Done():
				return
			case <-ticker.C:
				f.readFiles()
			}
		}
	}()
}
// readFiles reads every registered file and publishes its raw contents to
// the bus under the associated topic. Read failures are logged and skipped.
// NOTE: the lock is held for the duration of all file I/O, which blocks
// RegisterTopic while a poll is in progress.
func (f *fileSource) readFiles() {
	f.lock.Lock()
	defer f.lock.Unlock()
	for topic, path := range f.paths {
		data, err := os.ReadFile(path)
		if err != nil {
			log.Printf("File read error (%s): %v", path, err)
			continue
		}
		// Publish raw file contents to the Event Bus
		if err := f.bus.Publish(topic, data); err != nil {
			log.Printf("File Bus Publish Error (%s): %v", topic, err)
		}
	}
}
// End cancels the polling context and blocks until the poll goroutine exits.
func (f *fileSource) End() {
	f.cancel()
	f.wg.Wait()
}
package hass
import (
"context"
"fmt"
"io"
"log"
"net/http"
"net/url"
"os"
"sync"
"time"
"github.com/mekops-labs/siphon/pkg/bus"
"github.com/mekops-labs/siphon/pkg/collector"
"github.com/mitchellh/mapstructure"
)
// HassParams holds the decoded config params for the Home Assistant collector.
type HassParams struct {
	Interval int `mapstructure:"interval"` // Polling interval in seconds
}

// hassSource polls the Home Assistant Core API (via the Supervisor proxy)
// for entity states and publishes the raw JSON responses to the event bus.
type hassSource struct {
	params HassParams
	entities map[string]string // alias -> entity_id
	lock sync.Mutex // guards entities
	bus bus.Bus
	client *http.Client // shared client with 10s timeout (set in New)
	ctx context.Context // cancelled by End() to stop polling
	cancel context.CancelFunc
	wg sync.WaitGroup // tracks the poll goroutine
}

// Ensure hassSource implements the Collector interface
var _ collector.Collector = (*hassSource)(nil)

// init registers this module under the "hass" collector type.
func init() {
	collector.Registry.Add("hass", New)
}
// New builds a Home Assistant collector from the generic params map.
// Returns nil when the params cannot be decoded so the caller can skip it.
func New(p any) collector.Collector {
	var params HassParams
	if err := mapstructure.Decode(p, &params); err != nil {
		log.Printf("HASS collector config error: %v", err)
		return nil
	}
	if params.Interval <= 0 {
		params.Interval = 30 // default polling interval, seconds
	}
	ctx, cancel := context.WithCancel(context.Background())
	src := &hassSource{
		params:   params,
		entities: map[string]string{},
		client:   &http.Client{Timeout: 10 * time.Second},
		ctx:      ctx,
		cancel:   cancel,
	}
	return src
}
// RegisterTopic maps a bus alias to a Home Assistant entity ID
// ("*" means poll the full states endpoint).
func (h *hassSource) RegisterTopic(alias, topic string) {
	h.lock.Lock()
	h.entities[alias] = topic
	h.lock.Unlock()
}
// Start launches the background polling goroutine: an immediate fetch,
// then one fetch per configured interval until End() cancels the context.
func (h *hassSource) Start(b bus.Bus) {
	h.bus = b
	h.wg.Add(1)
	go func() {
		defer h.wg.Done()
		ticker := time.NewTicker(time.Duration(h.params.Interval) * time.Second)
		defer ticker.Stop()
		h.fetchStates() // Initial run
		for {
			select {
			case <-h.ctx.Done():
				return
			case <-ticker.C:
				h.fetchStates()
			}
		}
	}()
}
// fetchStates polls the Home Assistant Core API (through the Supervisor
// proxy) for every registered entity and publishes the raw JSON responses
// to the bus under the entity's alias.
//
// Fix: a body-read failure on an HTTP 200 response used to fall into the
// generic else branch and be logged as "HASS API Error ...: HTTP 200",
// which is misleading. Status errors and read errors are now reported
// separately.
//
// NOTE: the lock is held for the duration of all network I/O, which blocks
// RegisterTopic while a poll is in progress.
func (h *hassSource) fetchStates() {
	h.lock.Lock()
	defer h.lock.Unlock()
	// Grab the token automatically injected by the HA Supervisor
	token := os.Getenv("SUPERVISOR_TOKEN")
	if token == "" {
		log.Println("HASS Collector Warning: SUPERVISOR_TOKEN not found. Are you running inside Home Assistant?")
		return
	}
	for alias, entityID := range h.entities {
		var addr string
		// Use the internal Supervisor proxy to hit the Core API safely
		if entityID == "*" {
			addr = "http://supervisor/core/api/states"
		} else {
			// Ensure the entity ID is URL-encoded
			encodedEntityID := url.PathEscape(entityID)
			addr = fmt.Sprintf("http://supervisor/core/api/states/%s", encodedEntityID)
		}
		req, err := http.NewRequestWithContext(h.ctx, http.MethodGet, addr, nil)
		if err != nil {
			continue
		}
		req.Header.Set("Authorization", "Bearer "+token)
		req.Header.Set("Content-Type", "application/json")
		resp, err := h.client.Do(req)
		if err != nil {
			log.Printf("HASS API Request failed [%s]: %v", entityID, err)
			continue
		}
		body, readErr := io.ReadAll(resp.Body)
		resp.Body.Close()
		if resp.StatusCode != 200 {
			log.Printf("HASS API Error [%s]: HTTP %d", entityID, resp.StatusCode)
			continue
		}
		if readErr != nil {
			log.Printf("HASS API body read failed [%s]: %v", entityID, readErr)
			continue
		}
		// Publish the raw JSON response to the Event Bus using the ALIAS
		if err := h.bus.Publish(alias, body); err != nil {
			log.Printf("HASS Bus Publish Error [%s]: %v", alias, err)
		}
	}
}
// End cancels the polling context and blocks until the poll goroutine exits.
func (h *hassSource) End() {
	h.cancel()
	h.wg.Wait()
}
package mqtt
import (
"crypto/tls"
"log"
"sync"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/mekops-labs/siphon/internal/utils"
"github.com/mekops-labs/siphon/pkg/bus"
"github.com/mekops-labs/siphon/pkg/collector"
"github.com/mitchellh/mapstructure"
)
// mqttSource bridges an MQTT broker to the internal event bus: each
// subscribed broker topic is re-published on the bus under its alias.
type mqttSource struct {
	client mqtt.Client
	mqttOptions *mqtt.ClientOptions
	topics map[string]string // List of topics requested by the V2 Pipelines
	lock sync.Mutex // guards topics
	bus bus.Bus // Reference to the central Event Bus
}

// MqttParams holds the decoded config params for the MQTT collector.
type MqttParams struct {
	Url string
	User string
	Pass string
}

// Ensure mqttSource implements the new Collector interface
var _ collector.Collector = (*mqttSource)(nil)

// init registers this module under the "mqtt" collector type.
func init() {
	collector.Registry.Add("mqtt", New)
}
// New builds an MQTT collector from the generic params map. Returns nil
// (skipped by the caller) when the params can't be decoded or no broker
// URL is configured.
// NOTE(review): unlike the other collectors, these failures are silent —
// consider logging them.
func New(p any) collector.Collector {
	var opt MqttParams
	if err := mapstructure.Decode(p, &opt); err != nil {
		return nil
	}
	if opt.Url == "" {
		return nil
	}
	m := &mqttSource{
		topics: make(map[string]string),
	}
	opts := mqtt.NewClientOptions()
	opts.AddBroker(opt.Url)
	// Random suffix avoids client-ID collisions between Siphon instances.
	opts.SetClientID("siphon-" + utils.RandomString(5))
	opts.SetUsername(opt.User)
	opts.SetPassword(opt.Pass)
	opts.SetAutoReconnect(true)
	opts.SetConnectRetry(true)
	// RootCAs nil means the host's system certificate pool is used.
	opts.SetTLSConfig(&tls.Config{RootCAs: nil})
	// Connection handlers
	opts.OnConnect = func(client mqtt.Client) {
		log.Print("MQTT connected to broker. Subscribing to pipeline topics...")
		// (Re-)subscribe after every connect/reconnect.
		m.subscribe()
	}
	opts.OnConnectionLost = func(client mqtt.Client, err error) {
		log.Printf("MQTT connection lost: %v", err)
	}
	m.mqttOptions = opts
	m.client = mqtt.NewClient(opts)
	return m
}
// subscribe registers an MQTT subscription for every configured topic and
// forwards incoming payloads to the internal event bus. It is a no-op until
// both the broker connection and the bus reference exist; it is re-invoked
// from the OnConnect handler after every (re)connect.
//
// Fix: busTopic is captured by the message callback, which outlives the loop
// iteration. Without shadowing, every callback observes the final iteration's
// value under pre-Go-1.22 loop semantics (which this codebase targets — see
// the explicit shadowing in the webhook collector), so all messages would be
// published to one bus topic.
func (m *mqttSource) subscribe() {
	if !m.client.IsConnected() || m.bus == nil {
		return
	}
	m.lock.Lock()
	defer m.lock.Unlock()
	for busTopic, topic := range m.topics {
		busTopic, topic := busTopic, topic // per-iteration copies for the closure
		// Subscribe and push raw payloads directly to the Event Bus
		if token := m.client.Subscribe(topic, 0, func(c mqtt.Client, msg mqtt.Message) {
			if err := m.bus.Publish(busTopic, msg.Payload()); err != nil {
				log.Printf("MQTT Bus Publish Error (%s): %v", msg.Topic(), err)
			}
		}); token.Wait() && token.Error() != nil {
			log.Printf("MQTT Subscribe Error (%s): %v", topic, token.Error())
		} else {
			log.Printf("MQTT Subscribed to topic: %s", topic)
		}
	}
}
// Start stores the bus reference and connects to the MQTT broker.
// Actual subscriptions happen in the OnConnect callback once the session
// is established; a failed connect is logged (auto-retry is enabled).
func (m *mqttSource) Start(b bus.Bus) {
	m.bus = b
	token := m.client.Connect()
	if token.Wait() && token.Error() != nil {
		log.Printf("MQTT connect error: %v", token.Error())
	}
}
// End disconnects from the broker, allowing 100ms for in-flight work to finish.
func (m *mqttSource) End() {
	m.client.Disconnect(100)
}
// RegisterTopic tells the collector what to listen for based on pipeline configs.
// name is the internal bus alias; value is the raw MQTT topic filter.
func (m *mqttSource) RegisterTopic(name string, value string) {
	m.lock.Lock()
	m.topics[name] = value
	m.lock.Unlock()
	// If already connected (e.g., config reload), subscribe immediately
	if m.client.IsConnected() {
		m.subscribe()
	}
}
package rest
import (
"bytes"
"context"
"io"
"log"
"net/http"
"sync"
"time"
"github.com/mekops-labs/siphon/pkg/bus"
"github.com/mekops-labs/siphon/pkg/collector"
"github.com/mitchellh/mapstructure"
)
// RestParams holds the decoded config params for the REST polling collector.
type RestParams struct {
	Interval int `mapstructure:"interval"` // Polling interval in seconds
	Method string `mapstructure:"method"` // HTTP Method (GET, POST, etc.)
	Headers map[string]string `mapstructure:"headers"` // Custom headers (e.g. Authorization)
	Body string `mapstructure:"body"` // Optional request body
	Timeout int `mapstructure:"timeout"` // Request timeout in seconds
}

// restSource polls a set of HTTP endpoints on a fixed interval and publishes
// successful (2xx) response bodies to the event bus under their aliases.
type restSource struct {
	params RestParams
	urls map[string]string // alias -> Target URL
	lock sync.Mutex // guards urls
	bus bus.Bus
	client *http.Client // shared client with configured timeout
	ctx context.Context // cancelled by End() to stop polling
	cancel context.CancelFunc
	wg sync.WaitGroup // tracks the poll goroutine
}

// Ensure restSource implements the Collector interface
var _ collector.Collector = (*restSource)(nil)

// init registers this module under the "rest" collector type.
func init() {
	collector.Registry.Add("rest", New)
}
// New builds a REST collector from the generic params map, applying sensible
// defaults for interval, method, and timeout. Returns nil when the params
// cannot be decoded so the caller can skip this collector.
func New(p any) collector.Collector {
	var params RestParams
	if err := mapstructure.Decode(p, &params); err != nil {
		log.Printf("REST collector config error: %v", err)
		return nil
	}
	// Defaults.
	if params.Interval <= 0 {
		params.Interval = 60
	}
	if params.Method == "" {
		params.Method = http.MethodGet
	}
	if params.Timeout <= 0 {
		params.Timeout = 10
	}
	ctx, cancel := context.WithCancel(context.Background())
	src := &restSource{
		params: params,
		urls:   map[string]string{},
		client: &http.Client{Timeout: time.Duration(params.Timeout) * time.Second},
		ctx:    ctx,
		cancel: cancel,
	}
	return src
}
// RegisterTopic maps a bus alias to a target URL to be polled.
func (r *restSource) RegisterTopic(alias, topic string) {
	r.lock.Lock()
	r.urls[alias] = topic
	r.lock.Unlock()
}
// Start launches the background polling goroutine: an immediate fetch,
// then one fetch per configured interval until End() cancels the context.
func (r *restSource) Start(b bus.Bus) {
	r.bus = b
	r.wg.Add(1)
	go func() {
		defer r.wg.Done()
		ticker := time.NewTicker(time.Duration(r.params.Interval) * time.Second)
		defer ticker.Stop()
		r.fetchUrls() // Fire immediately on startup
		for {
			select {
			case <-r.ctx.Done():
				return
			case <-ticker.C:
				r.fetchUrls()
			}
		}
	}()
}
// fetchUrls performs one HTTP request per registered URL and publishes every
// successful (2xx) response body to the bus under the URL's alias.
//
// Fix: a body-read failure on a 2xx response used to fall into the generic
// else branch and be logged as "REST API Error ...: HTTP 200", which is
// misleading. Status errors and read errors are now reported separately.
//
// NOTE: the lock is held for the duration of all network I/O, which blocks
// RegisterTopic while a poll is in progress.
func (r *restSource) fetchUrls() {
	r.lock.Lock()
	defer r.lock.Unlock()
	for alias, targetURL := range r.urls {
		var reqBody io.Reader
		if r.params.Body != "" {
			reqBody = bytes.NewBufferString(r.params.Body)
		}
		req, err := http.NewRequestWithContext(r.ctx, r.params.Method, targetURL, reqBody)
		if err != nil {
			log.Printf("REST Collector failed to build request [%s]: %v", alias, err)
			continue
		}
		// Inject custom headers
		for key, val := range r.params.Headers {
			req.Header.Set(key, val)
		}
		resp, err := r.client.Do(req)
		if err != nil {
			log.Printf("REST Collector request failed [%s]: %v", alias, err)
			continue
		}
		bodyBytes, readErr := io.ReadAll(resp.Body)
		resp.Body.Close()
		// Only publish successful responses to the Event Bus
		if resp.StatusCode < 200 || resp.StatusCode >= 300 {
			log.Printf("REST API Error [%s]: HTTP %d", targetURL, resp.StatusCode)
			continue
		}
		if readErr != nil {
			log.Printf("REST Collector body read failed [%s]: %v", alias, readErr)
			continue
		}
		if err := r.bus.Publish(alias, bodyBytes); err != nil {
			log.Printf("REST Bus Publish Error [%s]: %v", alias, err)
		}
	}
}
// End cancels the polling context and blocks until the poll goroutine exits.
func (r *restSource) End() {
	r.cancel()
	r.wg.Wait()
	log.Println("REST Collector successfully stopped.")
}
package shell
import (
"context"
"log"
"os/exec"
"sync"
"time"
"github.com/mekops-labs/siphon/pkg/bus"
"github.com/mekops-labs/siphon/pkg/collector"
"github.com/mitchellh/mapstructure"
)
// ShellParams holds the decoded config params for the shell collector.
type ShellParams struct {
	Interval int `mapstructure:"interval"` // in seconds
}

// shellSource periodically runs a set of shell commands and publishes each
// command's stdout to the event bus under its topic.
type shellSource struct {
	params ShellParams
	commands map[string]string // bus topic -> shell command line
	lock sync.Mutex // guards commands
	bus bus.Bus
	ctx context.Context // cancelled by End(); also kills in-flight commands
	cancel context.CancelFunc
	wg sync.WaitGroup // tracks the poll goroutine
}

// Ensure shellSource implements the Collector interface
var _ collector.Collector = (*shellSource)(nil)

// init registers this module under the "shell" collector type.
func init() {
	collector.Registry.Add("shell", New)
}
// New builds a shell collector from the generic params map.
// Returns nil when the params cannot be decoded so the caller can skip it.
func New(p any) collector.Collector {
	var params ShellParams
	if err := mapstructure.Decode(p, &params); err != nil {
		log.Printf("Shell collector config error: %v", err)
		return nil
	}
	if params.Interval <= 0 {
		params.Interval = 10 // default polling interval, seconds
	}
	ctx, cancel := context.WithCancel(context.Background())
	src := &shellSource{
		params:   params,
		commands: map[string]string{},
		ctx:      ctx,
		cancel:   cancel,
	}
	return src
}
// RegisterTopic maps a bus topic to a shell command line to execute.
func (s *shellSource) RegisterTopic(name string, value string) {
	s.lock.Lock()
	defer s.lock.Unlock()
	// For the shell collector, the "topic" is the actual shell command
	s.commands[name] = value
}
// Start launches the background execution goroutine: an immediate run,
// then one run per configured interval until End() cancels the context.
func (s *shellSource) Start(b bus.Bus) {
	s.bus = b
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		ticker := time.NewTicker(time.Duration(s.params.Interval) * time.Second)
		defer ticker.Stop()
		s.executeCommands() // Initial run
		for {
			select {
			case <-s.ctx.Done():
				return
			case <-ticker.C:
				s.executeCommands()
			}
		}
	}()
}
// executeCommands runs every registered command via "sh -c" and publishes
// each command's stdout to the bus under its topic. Failures are logged and
// skipped.
//
// Improvement: cmd.Output() captures stderr into ExitError.Stderr on
// failure; the previous code discarded it, hiding the command's actual
// error message. It is now included in the log line when available.
//
// NOTE: the lock is held while all commands run, which blocks RegisterTopic
// for the duration of a poll.
func (s *shellSource) executeCommands() {
	s.lock.Lock()
	defer s.lock.Unlock()
	for topic, cmdStr := range s.commands {
		// Use "sh -c" to support piping and shell builtins (like awk/grep)
		cmd := exec.CommandContext(s.ctx, "sh", "-c", cmdStr)
		output, err := cmd.Output() // captures stdout
		if err != nil {
			if ee, ok := err.(*exec.ExitError); ok && len(ee.Stderr) > 0 {
				log.Printf("Shell command failed [%s]: %v: %s", cmdStr, err, ee.Stderr)
			} else {
				log.Printf("Shell command failed [%s]: %v", cmdStr, err)
			}
			continue
		}
		// Publish standard output to the Event Bus
		if err := s.bus.Publish(topic, output); err != nil {
			log.Printf("Shell Bus Publish Error [%s]: %v", topic, err)
		}
	}
}
// End cancels the context (killing any in-flight command) and waits for
// the execution goroutine to exit.
func (s *shellSource) End() {
	s.cancel()
	s.wg.Wait()
}
package webhook
import (
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"io"
"log"
"net/http"
"sync"
"time"
"golang.org/x/time/rate"
"github.com/mekops-labs/siphon/pkg/bus"
"github.com/mekops-labs/siphon/pkg/collector"
"github.com/mitchellh/mapstructure"
)
// webhookParams holds the decoded config params for the webhook listener.
type webhookParams struct {
	Port int `mapstructure:"port"`
	Token string `mapstructure:"token"` // optional static bearer token for auth
	MaxBodyMB int64 `mapstructure:"max_body_mb"` // request body size cap
	RPS float64 `mapstructure:"rps"` // global rate limit, requests/second
	Burst int `mapstructure:"burst"` // rate limiter burst size
	DedupeTTL int `mapstructure:"dedupe_ttl"` // NEW: Time-to-live for duplicate detection in seconds
}

// webhookSource runs an HTTP server and republishes POST/PUT bodies onto the
// event bus; duplicate payloads (by SHA-256 hash) are suppressed for DedupeTTL.
type webhookSource struct {
	params webhookParams
	routes map[string]string // bus alias -> HTTP path
	lock sync.Mutex // guards routes
	bus bus.Bus
	limiter *rate.Limiter // global limiter shared by all routes
	// NEW: Idempotency Cache State
	seenCache map[string]time.Time // payload hash -> time first seen
	cacheLock sync.RWMutex // guards seenCache
	server *http.Server
	ctx context.Context // cancelled by End() to stop the cleanup goroutine
	cancel context.CancelFunc
	wg sync.WaitGroup // tracks the HTTP listener goroutine
}

// Ensure webhookSource implements the Collector interface
var _ collector.Collector = (*webhookSource)(nil)

// init registers this module under the "webhook" collector type.
func init() {
	collector.Registry.Add("webhook", New)
}
// New builds a webhook collector from the generic params map, applying
// defaults for every unset tuning knob. Returns nil on undecodable params
// so the caller can skip this collector.
func New(p any) collector.Collector {
	var params webhookParams
	if err := mapstructure.Decode(p, &params); err != nil {
		log.Printf("Webhook collector config error: %v", err)
		return nil
	}
	// Defaults.
	if params.Port == 0 {
		params.Port = 8080
	}
	if params.MaxBodyMB <= 0 {
		params.MaxBodyMB = 2
	}
	if params.RPS <= 0 {
		params.RPS = 10.0
	}
	if params.Burst <= 0 {
		params.Burst = 20
	}
	if params.DedupeTTL <= 0 {
		params.DedupeTTL = 300 // remember payload hashes for 5 minutes
	}
	ctx, cancel := context.WithCancel(context.Background())
	src := &webhookSource{
		params:    params,
		routes:    map[string]string{},
		limiter:   rate.NewLimiter(rate.Limit(params.RPS), params.Burst),
		seenCache: map[string]time.Time{},
		ctx:       ctx,
		cancel:    cancel,
	}
	return src
}
// RegisterTopic maps a bus alias to an HTTP path to listen on.
// Must be called before Start, which snapshots the routes.
func (w *webhookSource) RegisterTopic(alias, topic string) {
	w.lock.Lock()
	w.routes[alias] = topic
	w.lock.Unlock()
}
// Start wires the collector to the event bus, builds one HTTP handler per
// registered route, and launches the listener plus the dedupe-cache cleanup
// goroutine. Routes are snapshotted here, so RegisterTopic must be called
// before Start; aliases registered later are not served.
//
// Fix: the dedupe check used to be RLock-check followed by a separate
// Lock-set, so two concurrent identical requests could both pass the check
// and both be published. The check-and-mark is now atomic under a single
// write lock.
func (w *webhookSource) Start(b bus.Bus) {
	w.bus = b
	w.wg.Add(1)
	// Start the background cache cleanup routine
	go w.cleanupCache()
	mux := http.NewServeMux()
	for alias, path := range w.routes {
		// Shadow loop variables for capture by the handler closure.
		a := alias
		p := path
		mux.HandleFunc(p, func(rw http.ResponseWriter, req *http.Request) {
			// Global rate limit shared by all routes.
			if !w.limiter.Allow() {
				http.Error(rw, "429 Too Many Requests", http.StatusTooManyRequests)
				return
			}
			if req.Method != http.MethodPost && req.Method != http.MethodPut {
				http.Error(rw, "method not allowed", http.StatusMethodNotAllowed)
				return
			}
			// Optional static bearer-token auth.
			if w.params.Token != "" {
				authHeader := req.Header.Get("Authorization")
				if authHeader != "Bearer "+w.params.Token {
					http.Error(rw, "unauthorized", http.StatusUnauthorized)
					return
				}
			}
			// Cap the request body to avoid memory exhaustion.
			maxBytes := w.params.MaxBodyMB * 1024 * 1024
			req.Body = http.MaxBytesReader(rw, req.Body, maxBytes)
			body, err := io.ReadAll(req.Body)
			if err != nil {
				http.Error(rw, "payload too large or malformed", http.StatusRequestEntityTooLarge)
				return
			}
			defer req.Body.Close()
			if len(body) == 0 {
				rw.WriteHeader(http.StatusOK)
				return
			}
			// ==========================================
			// SECURITY 6: Deduplication / Idempotency
			// ==========================================
			// Hash the body and atomically check-and-mark it in the seen
			// cache under a single write lock (closes the race where two
			// concurrent identical payloads both slipped past a read-lock
			// check and were both published).
			hashBytes := sha256.Sum256(body)
			hashStr := hex.EncodeToString(hashBytes[:])
			w.cacheLock.Lock()
			_, seen := w.seenCache[hashStr]
			if !seen {
				w.seenCache[hashStr] = time.Now()
			}
			w.cacheLock.Unlock()
			if seen {
				// We already processed this exact payload recently.
				// Return 200 OK so the sender stops retrying, but DO NOT publish to the bus.
				log.Printf("Webhook [%s]: Ignored duplicate payload (Hash: %s)", a, hashStr[:8])
				rw.WriteHeader(http.StatusOK)
				rw.Write([]byte("OK"))
				return
			}
			// ==========================================
			if err := w.bus.Publish(a, body); err != nil {
				log.Printf("Webhook Bus Publish Error [%s]: %v", a, err)
				http.Error(rw, "internal routing error", http.StatusInternalServerError)
				return
			}
			rw.WriteHeader(http.StatusOK)
			rw.Write([]byte("OK"))
		})
	}
	w.server = &http.Server{
		Addr:         fmt.Sprintf(":%d", w.params.Port),
		Handler:      mux,
		ReadTimeout:  5 * time.Second,
		WriteTimeout: 10 * time.Second,
		IdleTimeout:  15 * time.Second,
	}
	go func() {
		defer w.wg.Done()
		log.Printf("Starting Webhook HTTP listener on port %d...", w.params.Port)
		if err := w.server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			log.Printf("Webhook listener error: %v", err)
		}
	}()
}
// cleanupCache runs until the collector context is cancelled, sweeping
// expired payload hashes out of seenCache once per minute so the dedupe
// cache cannot grow without bound and slowly exhaust memory.
func (w *webhookSource) cleanupCache() {
	ttl := time.Duration(w.params.DedupeTTL) * time.Second
	sweep := time.NewTicker(1 * time.Minute)
	defer sweep.Stop()
	for {
		select {
		case <-w.ctx.Done():
			return
		case <-sweep.C:
			now := time.Now()
			w.cacheLock.Lock()
			for hash, seenAt := range w.seenCache {
				if now.Sub(seenAt) > ttl {
					delete(w.seenCache, hash)
				}
			}
			w.cacheLock.Unlock()
		}
	}
}
// End stops the collector: cancels the cleanup goroutine, gives the HTTP
// server up to 5 seconds to drain in-flight requests, then waits for the
// listener goroutine to exit.
func (w *webhookSource) End() {
	w.cancel() // Stops the cache cleanup goroutine
	if w.server != nil {
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		w.server.Shutdown(ctx)
	}
	w.wg.Wait()
	log.Println("Webhook Collector successfully stopped.")
}
package editor
import (
"embed"
"fmt"
"io"
"log"
"net/http"
"os"
)
//go:embed index.html
var ui embed.FS
// Start launches a standalone web server serving the Ace.js editor.
func Start(port int, configPath string) {
mux := http.NewServeMux()
// 1. Serve the embedded HTML UI
mux.Handle("/", http.FileServer(http.FS(ui)))
// 2. API Endpoint to Read/Write the YAML file
mux.HandleFunc("/api/config", func(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case http.MethodGet:
data, err := os.ReadFile(configPath)
if err != nil {
http.Error(w, "Failed to read config", http.StatusInternalServerError)
log.Printf("Editor: Failed to read config (%s): %v", configPath, err)
return
}
w.Header().Set("Content-Type", "text/yaml")
w.Write(data)
case http.MethodPost:
body, err := io.ReadAll(r.Body)
if err != nil || len(body) == 0 {
http.Error(w, "Invalid request body", http.StatusBadRequest)
log.Printf("Editor: Failed to read request body: %v", err)
return
}
// Save the file
if err := os.WriteFile(configPath, body, 0644); err != nil {
log.Printf("Editor: Failed to write config (%s): %v", configPath, err)
http.Error(w, "Failed to save file", http.StatusInternalServerError)
return
}
log.Println("Editor: config.yaml updated via Web UI")
w.WriteHeader(http.StatusOK)
w.Write([]byte("Saved"))
default:
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
}
})
addr := fmt.Sprintf("0.0.0.0:%d", port)
log.Printf("Starting embedded config editor on http://%s", addr)
if err := http.ListenAndServe(addr, mux); err != nil {
log.Fatalf("Editor server crashed: %v", err)
}
}
package jsonpath
import (
"encoding/json"
"fmt"
"github.com/mekops-labs/siphon/pkg/parser"
"github.com/PaesslerAG/jsonpath"
)
// jsonpathParser extracts variables from JSON payloads using JSONPath expressions.
type jsonpathParser struct{}

// init registers this parser under the "jsonpath" type name.
func init() {
	parser.Register("jsonpath", func() parser.Parser { return &jsonpathParser{} })
}
// Parse unmarshals the payload as JSON and evaluates each configured
// JSONPath expression against it, returning the extracted values keyed by
// variable name. Any unmarshal or path-evaluation failure aborts the parse.
func (p *jsonpathParser) Parse(payload []byte, vars map[string]string) (map[string]interface{}, error) {
	var doc interface{}
	if err := json.Unmarshal(payload, &doc); err != nil {
		return nil, fmt.Errorf("invalid json in payload: %w", err)
	}
	out := make(map[string]interface{}, len(vars))
	for name, path := range vars {
		val, err := jsonpath.Get(path, doc)
		if err != nil {
			return nil, fmt.Errorf("jsonpath error for %s: %w", name, err)
		}
		out[name] = val
	}
	return out, nil
}
package parser
// Parser defines the interface for converting raw bytes into structured variables.
type Parser interface {
	// Parse takes the raw payload and a map of variable extraction rules (e.g., {"temp": "$.T"})
	// and returns the extracted variables.
	Parse(payload []byte, vars map[string]string) (map[string]any, error)
}

// Registry holds the constructor functions for available parsers
var Registry = make(map[string]func() Parser)

// Register adds a parser factory under the given type name.
// Called from each parser module's init() function.
func Register(name string, factory func() Parser) {
	Registry[name] = factory
}
package regex
import (
"fmt"
"regexp"
"github.com/mekops-labs/siphon/pkg/parser"
)
// regexParser extracts variables from text payloads via regular expressions.
type regexParser struct{}

// init registers this parser under the "regex" type name.
func init() {
	parser.Register("regex", func() parser.Parser { return &regexParser{} })
}
// Parse applies each configured regular expression to the payload text and
// returns the captured values keyed by variable name: the first capturing
// group when one exists, the whole match otherwise, and an empty string
// when the pattern does not match at all.
// NOTE(review): patterns are recompiled on every call; consider caching if
// this sits on a hot event path.
func (p *regexParser) Parse(payload []byte, vars map[string]string) (map[string]any, error) {
	text := string(payload)
	out := make(map[string]any, len(vars))
	for name, pattern := range vars {
		re, err := regexp.Compile(pattern)
		if err != nil {
			return nil, fmt.Errorf("invalid regex for %s: %w", name, err)
		}
		groups := re.FindStringSubmatch(text)
		switch {
		case groups == nil:
			out[name] = "" // no match: expose an empty string
		case len(groups) > 1:
			out[name] = groups[1] // first capturing group
		default:
			out[name] = groups[0] // no capturing group: whole match
		}
	}
	return out, nil
}
package pipeline
import (
"bytes"
"context"
"encoding/json"
"fmt"
"log"
"maps"
"sync"
"text/template"
"time"
"github.com/antonmedv/expr"
"github.com/antonmedv/expr/vm"
"github.com/go-co-op/gocron"
"github.com/mekops-labs/siphon/internal/config"
"github.com/mekops-labs/siphon/pkg/bus"
"github.com/mekops-labs/siphon/pkg/parser"
"github.com/mekops-labs/siphon/pkg/sink"
)
// compiledSink holds the pre-compiled formatters for maximum performance
type compiledSink struct {
	Name string
	Sink sink.Sink
	Format string // "expr", "template", or empty
	ExprProg *vm.Program // set when Format == "expr"
	Template *template.Template // set when Format == "template"
}

// transformProgram is one named, pre-compiled transform expression.
type transformProgram struct {
	VarName string // variable the expression result is assigned to
	Program *vm.Program
}

// compiledPipeline holds the runtime state of a pipeline
type compiledPipeline struct {
	Config config.PipelineConfig
	Parser parser.Parser
	Transforms []transformProgram
	Sinks []compiledSink
	state map[string]interface{} // Pipeline-specific state
	topicData map[string]map[string]any // Stores latest parsed data for each topic, keyed by topic name
	stateLock sync.Mutex // Lock for pipeline-specific state updates
	dispatchMu sync.Mutex // Ensures ordered dispatch for stateful pipelines
}

// Runner owns compilation and execution of all configured pipelines.
type Runner struct {
	bus bus.Bus
	sinks map[string]sink.Sink // shared sink instances, keyed by config name
}
// NewRunner constructs a pipeline Runner bound to the given bus and sink set.
func NewRunner(b bus.Bus, sinks map[string]sink.Sink) *Runner {
	r := &Runner{bus: b, sinks: sinks}
	return r
}
// Close gracefully shuts down all sinks managed by the runner
// so buffered output can be flushed; close errors are logged, not returned.
func (r *Runner) Close() {
	for name, s := range r.sinks {
		if err := s.Close(); err != nil {
			log.Printf("Failed to close sink '%s': %v", name, err)
		}
	}
}
// Start initializes all configured pipelines
// Each pipeline is compiled once (parser, expr programs, templates) and then
// run on its own goroutine — cron-scheduled or event-driven depending on its
// Type. Pipelines that fail to compile are logged and skipped.
func (r *Runner) Start(ctx context.Context, pipelines []config.PipelineConfig) {
	for _, pCfg := range pipelines {
		cp, err := r.compile(pCfg)
		if err != nil {
			log.Printf("Failed to compile pipeline '%s': %v", pCfg.Name, err)
			continue
		}
		if pCfg.Type == "cron" {
			log.Printf("Pipeline [%s]: Cron active with schedule '%s'", pCfg.Name, pCfg.Schedule)
			go r.runCronPipeline(ctx, cp)
			continue
		}
		// Event-driven pipeline
		log.Printf("Pipeline [%s]: event-triggered type active", pCfg.Name)
		go r.runEventPipeline(ctx, cp)
	}
}
// compile prepares the expr programs and templates so they don't re-compile on every event
//
// It resolves the parser factory, compiles each transform expression, and
// pre-compiles every sink's formatter (expr program or text/template).
// Any resolution or compilation failure aborts the whole pipeline.
func (r *Runner) compile(cfg config.PipelineConfig) (*compiledPipeline, error) {
	cp := &compiledPipeline{
		Config: cfg,
		Transforms: make([]transformProgram, 0, len(cfg.Transform)), // Initialize as slice
		state: make(map[string]any), // Initialize state during compilation
		topicData: make(map[string]map[string]any), // Initialize new map
	}
	// Setup Parser
	if cfg.Parser != nil {
		factory, ok := parser.Registry[cfg.Parser.Type]
		if !ok {
			return nil, fmt.Errorf("unknown parser type: %s", cfg.Parser.Type)
		}
		cp.Parser = factory()
	}
	// Helper function for compiling expr programs
	compileExprProgram := func(exprStr string) (*vm.Program, error) {
		exprOpts := []expr.Option{
			expr.Env(map[string]any{}), // The environment will be passed at runtime
			expr.AllowUndefinedVariables(),
		}
		return expr.Compile(exprStr, exprOpts...)
	}
	// Each transform entry is a single-key map: variable name -> expression.
	for _, transformation := range cfg.Transform {
		var varName, exprStr string
		for varName, exprStr = range transformation {
			if varName == "" || exprStr == "" {
				return nil, fmt.Errorf("invalid transform entry: variable name and expression must be non-empty")
			}
			break // We only expect one key-value pair per transform map
		}
		prog, err := compileExprProgram(exprStr)
		if err != nil {
			return nil, fmt.Errorf("failed to compile transform '%s': %w", varName, err)
		}
		cp.Transforms = append(cp.Transforms, transformProgram{VarName: varName, Program: prog})
	}
	// Compile Sinks
	var sinkConfigs []config.PipelineSinkConfig
	if len(cfg.Sinks) > 0 {
		sinkConfigs = cfg.Sinks // V2 config places sinks directly under cron
	}
	for _, sCfg := range sinkConfigs {
		// Each pipeline sink must reference a sink instance created at startup.
		targetSink, ok := r.sinks[sCfg.Name]
		if !ok {
			return nil, fmt.Errorf("sink not found: %s", sCfg.Name)
		}
		cs := compiledSink{Name: sCfg.Name, Sink: targetSink, Format: sCfg.Format}
		// Pre-compile the output formatter; an unknown Format leaves both
		// ExprProg and Template nil.
		switch sCfg.Format {
		case "expr":
			exprOpts := []expr.Option{
				expr.Env(map[string]any{}),
				expr.AllowUndefinedVariables(),
			}
			prog, err := expr.Compile(sCfg.Spec, exprOpts...)
			if err != nil {
				return nil, fmt.Errorf("failed to compile sink expr '%s': %w", cs.Name, err)
			}
			cs.ExprProg = prog
		case "template":
			// "now" lets templates embed a formatted current timestamp.
			fMap := template.FuncMap{
				"now": func(f string) string { return time.Now().Format(f) },
			}
			tmpl, err := template.New(sCfg.Name).Funcs(fMap).Parse(sCfg.Spec)
			if err != nil {
				return nil, fmt.Errorf("failed to compile sink template '%s': %w", cs.Name, err)
			}
			cs.Template = tmpl
		}
		cp.Sinks = append(cp.Sinks, cs)
	}
	return cp, nil
}
// runEventPipeline listens to the bus and triggers processing for matching
// events. One listener goroutine is spawned per subscribed topic; each
// exits when ctx is cancelled.
func (r *Runner) runEventPipeline(ctx context.Context, cp *compiledPipeline) {
	for _, topic := range cp.Config.Topics {
		log.Printf("Pipeline [%s]: Subscribing to topic '%s'", cp.Config.Name, topic)
		events := r.bus.Subscribe(topic)
		go func(t string, in <-chan bus.Event) {
			for {
				select {
				case <-ctx.Done():
					return
				case ev := <-in:
					// Fold the event into the pipeline's state; a non-nil
					// snapshot means it should be dispatched right away.
					if snapshot := r.updateStateFromEvent(cp, t, ev); snapshot != nil {
						r.dispatchState(cp, snapshot)
					}
				}
			}
		}(topic, events)
	}
}
// runCronPipeline triggers on a schedule and gathers requested state.
//
// Stateful pipelines continuously consume events to accumulate state and
// dispatch a snapshot on every cron tick. Stateless pipelines only drain
// whatever events are pending on their topics at tick time and dispatch
// each one individually.
//
// Fix: the error returned by gocron's Do was previously ignored, so an
// invalid cron expression failed silently; it is now logged and the
// pipeline goroutine exits.
func (r *Runner) runCronPipeline(ctx context.Context, cp *compiledPipeline) {
	if cp.Config.Stateful {
		// For stateful cron pipelines, continuously consume events and fold
		// them into the pipeline's state; the cron job dispatches that state.
		for _, topic := range cp.Config.Topics {
			log.Printf("Pipeline [%s]: Subscribing to topic '%s' for state accumulation", cp.Config.Name, topic)
			ch := r.bus.Subscribe(topic)
			go func(topic string, topicChan <-chan bus.Event) {
				for {
					select {
					case <-ctx.Done():
						return
					case event := <-topicChan:
						// Update state only; dispatch happens on the cron
						// tick. updateStateFromEvent handles Ack/Nack.
						r.updateStateFromEvent(cp, topic, event)
					}
				}
			}(topic, ch)
		}
		// The cron job only triggers the dispatch of the accumulated state.
		s := gocron.NewScheduler(time.UTC)
		if _, err := s.CronWithSeconds(cp.Config.Schedule).Do(func() {
			r.dispatchAccumulatedState(cp)
		}); err != nil {
			log.Printf("Pipeline [%s]: failed to schedule cron job '%s': %v", cp.Config.Name, cp.Config.Schedule, err)
			return
		}
		s.StartAsync()
		<-ctx.Done()
		s.Stop()
		return // Exit after setting up state accumulation and cron dispatch
	}
	// Stateless cron pipelines: process events that are available on the
	// bus at the time of the cron tick.
	var topicChannels []struct {
		topic string
		ch    <-chan bus.Event
	}
	for _, topic := range cp.Config.Topics {
		log.Printf("Pipeline [%s]: Subscribing to topic '%s'", cp.Config.Name, topic)
		ch := r.bus.Subscribe(topic)
		topicChannels = append(topicChannels, struct {
			topic string
			ch    <-chan bus.Event
		}{topic, ch})
	}
	s := gocron.NewScheduler(time.UTC)
	if _, err := s.CronWithSeconds(cp.Config.Schedule).Do(func() {
		for _, tc := range topicChannels {
			select {
			case event := <-tc.ch:
				// Process and dispatch each pending event individually.
				state := r.updateStateFromEvent(cp, tc.topic, event)
				if state != nil {
					r.dispatchState(cp, state)
				}
			case <-time.After(10 * time.Millisecond):
				// No event on this topic during this tick; try the next one.
			}
		}
	}); err != nil {
		log.Printf("Pipeline [%s]: failed to schedule cron job '%s': %v", cp.Config.Name, cp.Config.Schedule, err)
		return
	}
	s.StartAsync()
	<-ctx.Done()
	s.Stop()
}
// updateStateFromEvent processes an incoming event and updates the pipeline's state.
// It returns a snapshot of the state for immediate dispatch if stateless or a stateful event pipeline,
// or nil if it's a stateful cron pipeline (as dispatch happens on cron tick).
//
// Ack/Nack discipline: the event is Nacked only when its payload cannot be
// parsed; it is Acked once transforms have run (individual transform errors
// do NOT Nack — they are logged and skipped).
func (r *Runner) updateStateFromEvent(cp *compiledPipeline, eventTopic string, event bus.Event) map[string]any {
	var parsedEventData map[string]any
	// 1. EXTRACT: Parse raw payload into variables
	if cp.Parser != nil {
		extracted, err := cp.Parser.Parse(event.Payload, cp.Config.Parser.Vars)
		if err != nil {
			log.Printf("Parse error in pipeline '%s' for topic '%s': %v", cp.Config.Name, eventTopic, err)
			event.Nack() // Nack if parsing fails
			return nil
		}
		parsedEventData = extracted
	} else {
		// If no parser, try to unmarshal raw payload as JSON into a generic "payload" variable
		var genericPayload map[string]any
		if err := json.Unmarshal(event.Payload, &genericPayload); err != nil {
			log.Printf("Failed to unmarshal event payload as JSON in pipeline '%s' for topic '%s': %v", cp.Config.Name, eventTopic, err)
			event.Nack() // Nack if we can't parse the payload at all
			return nil
		}
		// A JSON "null" payload unmarshals to a nil map; normalize it.
		if genericPayload == nil {
			genericPayload = make(map[string]any)
		}
		parsedEventData = genericPayload
	}
	// Defensive: a parser could also hand back nil without error.
	if parsedEventData == nil {
		parsedEventData = make(map[string]any)
	}
	// State is shared with other topic listeners and the cron dispatcher;
	// everything below (topicData, transforms, cp.state) runs under the lock.
	cp.stateLock.Lock()
	defer cp.stateLock.Unlock()
	// Prepare environment for transforms
	var transformEnv map[string]any
	if cp.Config.Stateful {
		// Update topic-specific data store so it's available via topicName.variableName in transforms
		cp.topicData[eventTopic] = parsedEventData
		transformEnv = maps.Clone(cp.state) // Start with pipeline-level accumulated state
		// Inject all known topic data into the transform environment
		for t, data := range cp.topicData {
			transformEnv[t] = data
		}
	} else {
		transformEnv = make(map[string]any)
		// For stateless, only the current topic's data is available nested
		transformEnv[eventTopic] = parsedEventData
	}
	maps.Copy(transformEnv, parsedEventData) // Merge current event's parsed data into transformEnv
	// 2. TRANSFORM: Run expr formulas
	for _, ot := range cp.Transforms { // Iterate over ordered slice
		result, err := expr.Run(ot.Program, transformEnv)
		if err != nil {
			log.Printf("Transform error (%s) in '%s' for topic '%s': %v", ot.VarName, cp.Config.Name, eventTopic, err)
			// Continue even if transform fails, as other transforms might succeed
			// and we don't want to Nack the event for a transform error.
			continue
		}
		// Results go back into the env so later transforms can reference
		// earlier ones by variable name.
		transformEnv[ot.VarName] = result
	}
	event.Ack() // Acknowledge event after successfully updating state
	if cp.Config.Stateful {
		// Update pipeline's accumulated state (cp.state) with results from transforms
		// Only update variables that are explicitly defined in transforms.
		for _, ot := range cp.Transforms {
			if val, ok := transformEnv[ot.VarName]; ok {
				cp.state[ot.VarName] = val
			}
		}
	}
	// Decide what to return for dispatch
	if cp.Config.Type == "cron" && cp.Config.Stateful {
		// Stateful cron pipelines dispatch on cron tick, not immediately after event
		return nil
	}
	var dispatchEnv map[string]any
	if !cp.Config.Stateful {
		// Stateless (event or cron): return parsed event data at root, transform results, and topic data.
		dispatchEnv = make(map[string]any)
		// 1. Add parsed event data at the root for direct access by current pipeline's sinks/expr
		maps.Copy(dispatchEnv, parsedEventData)
		// 2. Add transform results, potentially overwriting parsed data if var names clash
		for _, ot := range cp.Transforms { // This loop is skipped if no transforms
			if val, ok := transformEnv[ot.VarName]; ok {
				dispatchEnv[ot.VarName] = val
			}
		}
		// 3. Add the full parsed event data nested under its topic name for downstream pipelines
		dispatchEnv[eventTopic] = parsedEventData
		return dispatchEnv
	} else {
		// Stateful event (or cron fallback): return full accumulated state
		dispatchEnv = maps.Clone(cp.state) // Start with pipeline-level accumulated state
		for topic, data := range cp.topicData {
			dispatchEnv[topic] = data
		}
	}
	return dispatchEnv
}
// dispatchAccumulatedState dispatches the current accumulated state of a
// pipeline. It's typically called by cron jobs or after an event in
// event-driven pipelines. A consistent snapshot is taken under the state
// lock; dispatching happens outside it.
func (r *Runner) dispatchAccumulatedState(cp *compiledPipeline) {
	cp.stateLock.Lock()
	snapshot := maps.Clone(cp.state)
	// Nest each topic's last-seen data under its topic name.
	for topic, data := range cp.topicData {
		snapshot[topic] = data
	}
	cp.stateLock.Unlock()
	r.dispatchState(cp, snapshot)
}
// dispatchState handles the routing of a state snapshot to the bus or
// configured sinks. With no sinks configured, the snapshot is re-published
// on the bus under the pipeline's own name so downstream pipelines can
// chain off it.
func (r *Runner) dispatchState(cp *compiledPipeline, state map[string]any) {
	if state == nil {
		return
	}
	// Serialize dispatches for stateful pipelines so snapshots produced by
	// concurrent topic listeners don't interleave at the sinks.
	if cp.Config.Stateful {
		cp.dispatchMu.Lock()
		defer cp.dispatchMu.Unlock()
	}
	if len(cp.Sinks) == 0 {
		serializedState, err := json.Marshal(state)
		if err != nil {
			// Previously discarded, which published a nil payload on error.
			log.Printf("Pipeline [%s]: failed to marshal state for bus publish: %v", cp.Config.Name, err)
			return
		}
		r.bus.Publish(cp.Config.Name, serializedState)
		return
	}
	r.dispatchToSinks(cp, state)
}
// dispatchToSinks is the shared formatter logic: it renders the state
// snapshot for each sink according to its configured format ("expr",
// "template", or raw JSON by default) and forwards the bytes. Failures are
// logged per sink so one bad sink doesn't block the others.
//
// Fix: json.Marshal errors are no longer silently ignored (a failed
// marshal previously sent a nil payload to the sink).
func (r *Runner) dispatchToSinks(cp *compiledPipeline, state map[string]any) {
	for _, cs := range cp.Sinks {
		var outputBytes []byte
		switch cs.Format {
		case "expr":
			result, err := expr.Run(cs.ExprProg, state)
			if err != nil {
				log.Printf("Sink expr error [%s]: %v", cs.Name, err)
				continue
			}
			outputBytes, err = json.Marshal(result)
			if err != nil {
				log.Printf("Sink marshal error [%s]: %v", cs.Name, err)
				continue
			}
		case "template":
			var buf bytes.Buffer
			if err := cs.Template.Execute(&buf, state); err != nil {
				log.Printf("Sink template error [%s]: %v", cs.Name, err)
				continue
			}
			outputBytes = buf.Bytes()
		default:
			// No format specified: send the entire state as JSON.
			var err error
			outputBytes, err = json.Marshal(state)
			if err != nil {
				log.Printf("Sink marshal error [%s]: %v", cs.Name, err)
				continue
			}
		}
		if err := cs.Sink.Send(outputBytes); err != nil {
			log.Printf("Sink Send error [%s]: %v", cs.Name, err)
		}
	}
}
package bus
import (
"fmt"
eventBus "github.com/mekops-labs/siphon/pkg/bus"
"github.com/mekops-labs/siphon/pkg/sink"
"github.com/mitchellh/mapstructure"
)
// bus is a sink that re-publishes every payload onto the internal event
// bus, letting one pipeline's output feed another pipeline's input.
type bus struct {
	bus    eventBus.Bus // shared in-process event bus
	params busParams
}
// busParams holds the decoded config for the bus sink.
type busParams struct {
	// Topic is the bus topic every payload is published to (required).
	Topic string `mapstructure:"topic"`
}
// Ensure bus implements sink.Sink
var _ sink.Sink = (*bus)(nil)
// init registers this sink type under the name "bus".
func init() {
	sink.Registry.Add("bus", New)
}
// New constructs a bus sink from the decoded params. The "topic" param is
// required; payloads are forwarded to it verbatim.
func New(params any, eventBus eventBus.Bus) (sink.Sink, error) {
	var cfg busParams
	if err := mapstructure.Decode(params, &cfg); err != nil {
		return nil, fmt.Errorf("failed to decode bus params: %w", err)
	}
	if cfg.Topic == "" {
		return nil, fmt.Errorf("bus requires a topic")
	}
	return &bus{bus: eventBus, params: cfg}, nil
}
// Send publishes the payload to the configured bus topic; it never fails.
func (b *bus) Send(data []byte) error {
	b.bus.Publish(b.params.Topic, data)
	return nil
}
// Close is a no-op; the shared bus is owned by the application, not this sink.
func (b *bus) Close() error {
	// No resources to clean up for the bus sink
	return nil
}
package gotify
import (
"bytes"
"encoding/json"
"fmt"
"html/template"
"log"
"net/http"
"net/url"
"time"
"github.com/mekops-labs/siphon/pkg/bus"
"github.com/mekops-labs/siphon/pkg/sink"
"github.com/mitchellh/mapstructure"
)
// defaultTimeout bounds every HTTP request to the Gotify server.
const defaultTimeout = 5 * time.Second
// gotifyParams holds the decoded config for the gotify sink.
type gotifyParams struct {
	URL      string `mapstructure:"url"`      // Gotify server base URL (required)
	Token    string `mapstructure:"token"`    // application token (required)
	Title    string `mapstructure:"title"`    // optional message title
	Priority int    `mapstructure:"priority"` // Gotify message priority
}
// gotifySink delivers notifications via Gotify's HTTP message API.
type gotifySink struct {
	params gotifyParams
	client *http.Client
}
// Compile-time check that gotifySink satisfies sink.Sink.
var _ sink.Sink = (*gotifySink)(nil)
// init registers this sink type under the name "gotify".
func init() {
	sink.Registry.Add("gotify", New)
}
// reformatTitle renders the configured title string as a template,
// expanding helper functions such as {{now "2006-01-02"}}. It returns an
// error if the template fails to parse or execute.
func reformatTitle(in string) (string, error) {
	fMap := template.FuncMap{
		"now": func(f string) string { return time.Now().Format(f) },
	}
	tmpl, err := template.New("title").Funcs(fMap).Parse(in)
	if err != nil {
		return "", err
	}
	// A zero-value bytes.Buffer is ready to use; the previous
	// NewBufferString(emptyString) indirection was redundant.
	var buf bytes.Buffer
	if err := tmpl.Execute(&buf, nil); err != nil {
		return "", err
	}
	return buf.String(), nil
}
// New builds a gotify sink. Both url and token are mandatory; the HTTP
// client gets a fixed timeout so a slow server cannot stall the pipeline.
func New(params any, _ bus.Bus) (sink.Sink, error) {
	var cfg gotifyParams
	if err := mapstructure.Decode(params, &cfg); err != nil {
		return nil, fmt.Errorf("failed to decode gotify params: %w", err)
	}
	if cfg.URL == "" || cfg.Token == "" {
		return nil, fmt.Errorf("gotify sink: url and token are required fields")
	}
	httpClient := &http.Client{Timeout: defaultTimeout}
	return &gotifySink{params: cfg, client: httpClient}, nil
}
// Send wraps the message bytes into Gotify's JSON message format and POSTs
// it to the server's /message endpoint, authenticating via the
// X-Gotify-Key header. A non-200 response is an error.
//
// Fix: the title template was previously rendered only when no title was
// configured — the rendered empty string then clobbered the "Siphon Alert"
// default, and a configured title was never templated (unlike the parallel
// ntfy sink). The title is now defaulted when unset and templated otherwise.
func (s *gotifySink) Send(b []byte) error {
	// Resolve the title: default when unset, otherwise expand template
	// functions such as {{now "..."}} in the configured title.
	title := s.params.Title
	if title == "" {
		title = "Siphon Alert" // Default title
	} else if formatted, err := reformatTitle(title); err != nil {
		// Fallback to the raw configured title if formatting fails.
		log.Printf("Failed to reformat gotify title: %v", err)
	} else {
		title = formatted
	}
	payload := map[string]interface{}{
		"message":  string(b),
		"title":    title,
		"priority": s.params.Priority,
	}
	jsonPayload, err := json.Marshal(payload)
	if err != nil {
		return fmt.Errorf("failed to marshal gotify payload: %w", err)
	}
	targetURL, err := url.JoinPath(s.params.URL, "message")
	if err != nil {
		return fmt.Errorf("failed to construct target URL: %w", err)
	}
	req, err := http.NewRequest(http.MethodPost, targetURL, bytes.NewReader(jsonPayload))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-Gotify-Key", s.params.Token)
	resp, err := s.client.Do(req)
	if err != nil {
		return fmt.Errorf("gotify request failed: %w", err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("gotify returned non-200 status: %d", resp.StatusCode)
	}
	return nil
}
// Close is a no-op: the gotify sink holds no persistent connections.
func (s *gotifySink) Close() error { // No persistent connections to close for the gotify sink
	return nil
}
package hass
import (
"crypto/tls"
"encoding/json"
"fmt"
"log"
"strings"
"time"
paho "github.com/eclipse/paho.mqtt.golang"
"github.com/mekops-labs/siphon/internal/utils"
"github.com/mekops-labs/siphon/pkg/bus"
"github.com/mekops-labs/siphon/pkg/sink"
"github.com/mitchellh/mapstructure"
)
// hassParams holds the decoded config for the Home Assistant MQTT
// discovery sink. Defaults (applied in New): discovery_prefix
// "homeassistant", component "sensor", name derived from object_id.
type hassParams struct {
	URL  string `mapstructure:"url"`  // MQTT broker URL (required)
	User string `mapstructure:"user"` // optional broker username
	Pass string `mapstructure:"pass"` // optional broker password
	// Home Assistant Discovery specifics
	DiscoveryPrefix   string `mapstructure:"discovery_prefix"`
	Component         string `mapstructure:"component"`
	NodeID            string `mapstructure:"node_id"` // optional extra topic path segment
	ObjectID          string `mapstructure:"object_id"` // unique entity id (required)
	Name              string `mapstructure:"name"`
	DeviceClass       string `mapstructure:"device_class"`
	StateClass        string `mapstructure:"state_class"`
	UnitOfMeasurement string `mapstructure:"unit_of_measurement"`
	ValueTemplate     string `mapstructure:"value_template"`
	Icon              string `mapstructure:"icon"`
	AvailabilityTopic string `mapstructure:"availability_topic"` // defaults to <base>/availability
}
// hassSink publishes sensor state over MQTT using Home Assistant's
// auto-discovery convention.
type hassSink struct {
	params            hassParams
	client            paho.Client
	stateTopic        string // <prefix>/<component>[/<node>]/<object>/state
	availabilityTopic string // online/offline topic, also used as the LWT
}
// Ensure hassSink implements sink.Sink
var _ sink.Sink = (*hassSink)(nil)
// init registers this sink type under the name "hass".
func init() {
	sink.Registry.Add("hass", New)
}
// New builds a Home Assistant MQTT-discovery sink: it validates params,
// applies defaults, constructs the discovery/state/availability topics,
// publishes the discovery config on every (re)connect, and returns a sink
// that writes state updates to the state topic.
func New(params any, _ bus.Bus) (sink.Sink, error) {
	var opt hassParams
	if err := mapstructure.Decode(params, &opt); err != nil {
		return nil, fmt.Errorf("failed to decode hass sink params: %w", err)
	}
	// Apply Defaults
	if opt.URL == "" {
		return nil, fmt.Errorf("hass sink requires an mqtt url")
	}
	if opt.ObjectID == "" {
		return nil, fmt.Errorf("hass sink requires an object_id")
	}
	if opt.DiscoveryPrefix == "" {
		opt.DiscoveryPrefix = "homeassistant"
	}
	if opt.Component == "" {
		opt.Component = "sensor"
	}
	if opt.Name == "" {
		// NOTE(review): strings.Title is deprecated; it is fine for ASCII
		// object_ids but consider golang.org/x/text/cases long-term.
		opt.Name = "Siphon " + strings.Title(strings.ReplaceAll(opt.ObjectID, "_", " "))
	}
	// Construct Topics: <prefix>/<component>[/<node_id>]/<object_id>
	baseTopic := fmt.Sprintf("%s/%s", opt.DiscoveryPrefix, opt.Component)
	if opt.NodeID != "" {
		baseTopic = fmt.Sprintf("%s/%s", baseTopic, opt.NodeID)
	}
	baseTopic = fmt.Sprintf("%s/%s", baseTopic, opt.ObjectID)
	configTopic := baseTopic + "/config"
	stateTopic := baseTopic + "/state"
	// Default to a sub-topic for availability if not explicitly provided
	availabilityTopic := opt.AvailabilityTopic
	if availabilityTopic == "" {
		availabilityTopic = baseTopic + "/availability"
	}
	// Build the Discovery Config Payload
	configPayload := map[string]interface{}{
		"name":                  opt.Name,
		"state_topic":           stateTopic,
		"availability_topic":    availabilityTopic,
		"payload_available":     "online",
		"payload_not_available": "offline",
		"unique_id":             fmt.Sprintf("siphon_%s", opt.ObjectID),
		"device": map[string]interface{}{
			"identifiers":  []string{"siphon_etl_engine"},
			"name":         "Siphon ETL Engine",
			"manufacturer": "Mekops Labs",
			"model":        "Siphon V2",
		},
	}
	// Add optional fields
	if opt.DeviceClass != "" {
		configPayload["device_class"] = opt.DeviceClass
	}
	if opt.StateClass != "" {
		configPayload["state_class"] = opt.StateClass
	}
	if opt.UnitOfMeasurement != "" {
		configPayload["unit_of_measurement"] = opt.UnitOfMeasurement
	}
	if opt.ValueTemplate != "" {
		configPayload["value_template"] = opt.ValueTemplate
	}
	if opt.Icon != "" {
		configPayload["icon"] = opt.Icon
	}
	configBytes, err := json.Marshal(configPayload)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal hass discovery config: %w", err)
	}
	// Setup MQTT Client
	opts := paho.NewClientOptions()
	opts.AddBroker(opt.URL)
	opts.SetClientID("siphon-hass-sink-" + utils.RandomString(5))
	if opt.User != "" {
		opts.SetUsername(opt.User)
	}
	if opt.Pass != "" {
		opts.SetPassword(opt.Pass)
	}
	opts.SetAutoReconnect(true)
	opts.SetConnectRetry(true)
	opts.SetConnectRetryInterval(5 * time.Second)
	// NOTE(review): certificate verification is disabled; acceptable for
	// local brokers with self-signed certs but worth making configurable.
	opts.SetTLSConfig(&tls.Config{InsecureSkipVerify: true})
	// Last-will: the broker marks the entity offline if we vanish uncleanly.
	opts.SetWill(availabilityTopic, "offline", 1, true)
	// OnConnect Callback — runs after every reconnect too, so the discovery
	// config and "online" birth message survive broker restarts.
	opts.OnConnect = func(client paho.Client) {
		log.Printf("HASS Sink connected. Publishing auto-discovery for '%s'", opt.ObjectID)
		// 1. Publish Discovery Config
		client.Publish(configTopic, 1, true, configBytes)
		// 2. Publish Birth Message (Online)
		client.Publish(availabilityTopic, 1, true, []byte("online"))
	}
	opts.OnConnectionLost = func(client paho.Client, err error) {
		log.Printf("HASS Sink MQTT connection lost: %v", err)
	}
	client := paho.NewClient(opts)
	token := client.Connect()
	// Bound the initial connect; auto-reconnect keeps retrying afterwards.
	if !token.WaitTimeout(2 * time.Second) {
		return nil, fmt.Errorf("hass sink timed out connecting to mqtt")
	}
	if err := token.Error(); err != nil {
		return nil, fmt.Errorf("hass sink failed to connect to mqtt: %w", err)
	}
	return &hassSink{
		params:            opt,
		client:            client,
		stateTopic:        stateTopic,
		availabilityTopic: availabilityTopic,
	}, nil
}
// Send publishes the formatted state payload to the sensor's state topic
// (QoS 0, not retained). It fails fast when the MQTT client is offline.
func (s *hassSink) Send(b []byte) error {
	if !s.client.IsConnected() {
		return fmt.Errorf("hass sink is not connected to the broker")
	}
	// Availability is already handled by the OnConnect birth message + LWT.
	token := s.client.Publish(s.stateTopic, 0, false, b)
	token.Wait()
	if err := token.Error(); err != nil {
		return fmt.Errorf("hass sink publish failed: %w", err)
	}
	return nil
}
// Close publishes a retained "offline" availability message (so Home
// Assistant marks the entity unavailable immediately rather than waiting
// for the LWT) and then disconnects. Safe to call when never connected.
func (s *hassSink) Close() error {
	if s.client != nil && s.client.IsConnected() {
		log.Printf("HASS Sink [%s]: Closing connection and sending offline message", s.params.ObjectID)
		// Explicitly publish offline message so HA updates immediately
		s.client.Publish(s.availabilityTopic, 1, true, "offline").Wait()
		s.client.Disconnect(250)
	}
	return nil
}
package iotplotter
import (
"bytes"
"fmt"
"net/http"
"net/url"
"time"
"github.com/mekops-labs/siphon/pkg/bus"
"github.com/mekops-labs/siphon/pkg/sink"
"github.com/mitchellh/mapstructure"
)
const (
	// defaultTimeout bounds every HTTP request to the IoTPlotter API.
	defaultTimeout = 10 * time.Second
	// defaultURL is the public IoTPlotter endpoint, used when no url is configured.
	defaultURL = "https://iotplotter.com"
)
// plotterParams holds the decoded config for the iotplotter sink.
type plotterParams struct {
	URL    string `mapstructure:"url"`    // API base URL; defaults to defaultURL
	ApiKey string `mapstructure:"apikey"` // feed API key (required)
	Feed   string `mapstructure:"feed"`   // feed identifier (required)
}
// plotterSink pushes JSON payloads to an IoTPlotter feed over HTTP.
type plotterSink struct {
	params plotterParams
	client *http.Client
}
// Ensure plotterSink implements sink.Sink
var _ sink.Sink = (*plotterSink)(nil)
// init registers this sink type under the name "iotplotter".
func init() {
	sink.Registry.Add("iotplotter", New)
}
// New builds an iotplotter sink. apikey and feed are required; the base
// URL falls back to the public iotplotter.com endpoint when unset.
func New(params any, _ bus.Bus) (sink.Sink, error) {
	var cfg plotterParams
	if err := mapstructure.Decode(params, &cfg); err != nil {
		return nil, fmt.Errorf("failed to decode iotplotter params: %w", err)
	}
	if cfg.ApiKey == "" || cfg.Feed == "" {
		return nil, fmt.Errorf("iotplotter requires both apikey and feed")
	}
	if cfg.URL == "" {
		cfg.URL = defaultURL
	}
	return &plotterSink{
		params: cfg,
		client: &http.Client{Timeout: defaultTimeout},
	}, nil
}
// Send POSTs the JSON payload to the IoTPlotter v2 feed endpoint,
// authenticating via the api-key header. Any non-2xx response is an error.
func (s *plotterSink) Send(b []byte) error {
	// IoTPlotter v2 API endpoint format: /api/v2/feed/<feed>
	target, err := url.JoinPath(s.params.URL, "api", "v2", "feed", s.params.Feed)
	if err != nil {
		return fmt.Errorf("failed to construct target URL: %w", err)
	}
	req, err := http.NewRequest(http.MethodPost, target, bytes.NewReader(b))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("api-key", s.params.ApiKey)
	resp, err := s.client.Do(req)
	if err != nil {
		return fmt.Errorf("iotplotter request failed: %w", err)
	}
	defer resp.Body.Close()
	if code := resp.StatusCode; code < 200 || code >= 300 {
		return fmt.Errorf("iotplotter returned non-2xx status: %d", code)
	}
	return nil
}
// Close is a no-op: the iotplotter sink holds no persistent connections.
func (s *plotterSink) Close() error { // No persistent connections to close for the iotplotter sink
	return nil
}
package mqtt
import (
"crypto/tls"
"fmt"
"log"
"time"
paho "github.com/eclipse/paho.mqtt.golang" // Aliased to paho to avoid package name conflicts
"github.com/mekops-labs/siphon/internal/utils"
"github.com/mekops-labs/siphon/pkg/bus"
"github.com/mekops-labs/siphon/pkg/sink"
"github.com/mitchellh/mapstructure"
)
// mqttParams holds the decoded config for the plain MQTT sink.
type mqttParams struct {
	URL      string `mapstructure:"url"`   // broker URL (required)
	User     string `mapstructure:"user"`  // optional username
	Pass     string `mapstructure:"pass"`  // optional password
	Topic    string `mapstructure:"topic"` // publish topic (required)
	QoS      byte   `mapstructure:"qos"`
	Retained bool   `mapstructure:"retained"`
}
// mqttSink publishes each payload verbatim to a single MQTT topic.
type mqttSink struct {
	params mqttParams
	client paho.Client
}
// Ensure mqttSink implements sink.Sink
var _ sink.Sink = (*mqttSink)(nil)
// init registers this sink type under the name "mqtt".
func init() {
	sink.Registry.Add("mqtt", New)
}
// New builds an MQTT sink: it validates params, configures a paho client
// with auto-reconnect and permissive TLS (matching the collector's setup
// for local SSL brokers), and blocks briefly for the initial connection.
func New(params any, _ bus.Bus) (sink.Sink, error) {
	var cfg mqttParams
	if err := mapstructure.Decode(params, &cfg); err != nil {
		return nil, fmt.Errorf("failed to decode mqtt sink params: %w", err)
	}
	if cfg.URL == "" {
		return nil, fmt.Errorf("mqtt sink requires a url")
	}
	if cfg.Topic == "" {
		return nil, fmt.Errorf("mqtt sink requires a topic")
	}
	clientOpts := paho.NewClientOptions().
		AddBroker(cfg.URL).
		SetClientID("siphon-sink-" + utils.RandomString(5)).
		SetAutoReconnect(true).
		SetConnectRetry(true).
		SetConnectRetryInterval(5 * time.Second).
		SetTLSConfig(&tls.Config{InsecureSkipVerify: true})
	if cfg.User != "" {
		clientOpts.SetUsername(cfg.User)
	}
	if cfg.Pass != "" {
		clientOpts.SetPassword(cfg.Pass)
	}
	clientOpts.OnConnect = func(_ paho.Client) {
		log.Printf("MQTT Sink connected to broker: %s", cfg.URL)
	}
	clientOpts.OnConnectionLost = func(_ paho.Client, err error) {
		log.Printf("MQTT Sink connection lost: %v", err)
	}
	mc := paho.NewClient(clientOpts)
	// Bound the initial connect; auto-reconnect keeps retrying afterwards.
	tok := mc.Connect()
	if !tok.WaitTimeout(2 * time.Second) {
		return nil, fmt.Errorf("mqtt sink timed out connecting to mqtt")
	}
	if err := tok.Error(); err != nil {
		return nil, fmt.Errorf("mqtt sink failed to connect: %w", err)
	}
	return &mqttSink{params: cfg, client: mc}, nil
}
// Send publishes the raw bytes to the configured topic using the QoS and
// retained flag from the sink's params. It returns an error when offline
// or when the broker rejects the publish.
func (s *mqttSink) Send(b []byte) error {
	if !s.client.IsConnected() {
		return fmt.Errorf("mqtt sink is not connected to the broker")
	}
	token := s.client.Publish(s.params.Topic, s.params.QoS, s.params.Retained, b)
	token.Wait()
	if err := token.Error(); err != nil {
		return fmt.Errorf("mqtt sink publish failed: %w", err)
	}
	return nil
}
// Close disconnects cleanly from the broker, giving paho a 250ms grace
// period to flush pending messages. Safe to call when never connected.
func (s *mqttSink) Close() error {
	if s.client != nil && s.client.IsConnected() {
		log.Printf("MQTT Sink: Disconnecting from broker %s", s.params.URL)
		s.client.Disconnect(250) // Wait 250ms to ensure pending messages are sent
	}
	return nil
}
package ntfy
import (
"bytes"
"fmt"
"log"
"net/http"
"net/url"
"text/template"
"time"
"github.com/mekops-labs/siphon/pkg/bus"
"github.com/mekops-labs/siphon/pkg/sink"
"github.com/mitchellh/mapstructure"
)
// defaultTimeout bounds every HTTP request to the ntfy server.
const defaultTimeout = 5 * time.Second
// ntfyParams holds the decoded config for the ntfy sink.
type ntfyParams struct {
	URL      string `mapstructure:"url"`      // ntfy server base URL (required)
	Topic    string `mapstructure:"topic"`    // ntfy topic (required)
	Token    string `mapstructure:"token"`    // optional bearer token
	Title    string `mapstructure:"title"`    // optional title; rendered via reformatTitle in Send
	Priority int    `mapstructure:"priority"` // 1..5; normalized to 3 in New
}
// ntfySink delivers notifications over ntfy's HTTP publish API.
type ntfySink struct {
	params ntfyParams
	client *http.Client
}
// Ensure ntfySink implements sink.Sink
var _ sink.Sink = (*ntfySink)(nil)
// init registers this sink type under the name "ntfy".
func init() {
	sink.Registry.Add("ntfy", New)
}
// reformatTitle renders the configured title string as a template,
// expanding helper functions such as {{now "2006-01-02"}}. It returns an
// error if the template fails to parse or execute.
func reformatTitle(in string) (string, error) {
	fMap := template.FuncMap{
		"now": func(f string) string { return time.Now().Format(f) },
	}
	tmpl, err := template.New("title").Funcs(fMap).Parse(in)
	if err != nil {
		return "", err
	}
	// A zero-value bytes.Buffer is ready to use; the previous
	// NewBufferString(emptyString) indirection was redundant.
	var buf bytes.Buffer
	if err := tmpl.Execute(&buf, nil); err != nil {
		return "", err
	}
	return buf.String(), nil
}
// New builds an ntfy sink. url and topic are required; a priority outside
// the valid 1..5 range is normalized to the ntfy default of 3.
func New(params any, _ bus.Bus) (sink.Sink, error) {
	var cfg ntfyParams
	if err := mapstructure.Decode(params, &cfg); err != nil {
		return nil, fmt.Errorf("failed to decode ntfy sink params: %w", err)
	}
	if cfg.URL == "" || cfg.Topic == "" {
		return nil, fmt.Errorf("ntfy sink: url and topic are required fields")
	}
	if cfg.Priority < 1 || cfg.Priority > 5 {
		cfg.Priority = 3
	}
	return &ntfySink{
		params: cfg,
		client: &http.Client{Timeout: defaultTimeout},
	}, nil
}
// Send delivers the payload to the configured ntfy topic. The message body
// is the raw bytes; title, auth token, and priority travel as HTTP headers
// per the ntfy publish API. Any non-2xx response is reported as an error.
func (s *ntfySink) Send(b []byte) error {
	target, err := url.JoinPath(s.params.URL, s.params.Topic)
	if err != nil {
		return fmt.Errorf("failed to construct target URL: %w", err)
	}
	req, err := http.NewRequest(http.MethodPost, target, bytes.NewReader(b))
	if err != nil {
		return err
	}
	// Optional headers derived from config.
	if tok := s.params.Token; tok != "" {
		req.Header.Set("Authorization", "Bearer "+tok)
	}
	if s.params.Title != "" {
		rendered, terr := reformatTitle(s.params.Title)
		if terr != nil {
			log.Printf("Failed to reformat ntfy title: %v", terr)
			rendered = s.params.Title // Fallback to original title if formatting fails
		}
		req.Header.Set("Title", rendered)
	}
	if p := s.params.Priority; p > 0 {
		req.Header.Set("Priority", fmt.Sprintf("%d", p))
	}
	resp, err := s.client.Do(req)
	if err != nil {
		return fmt.Errorf("ntfy request failed: %w", err)
	}
	defer resp.Body.Close()
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return fmt.Errorf("ntfy returned non-2xx status: %d", resp.StatusCode)
	}
	return nil
}
// Close is a no-op: the ntfy sink holds no persistent connections.
func (s *ntfySink) Close() error { // No persistent connections to close for the ntfy sink
	return nil
}
package sink
import "github.com/mekops-labs/siphon/pkg/bus"
// Sink is the output side of a pipeline: Send delivers one formatted
// payload, Close releases any underlying resources.
type Sink interface {
	Send(b []byte) error
	Close() error
}
// SinkCfg describes a configured sink instance.
// NOTE(review): not referenced in this chunk — confirm callers still use
// it before removing.
type SinkCfg struct {
	Name string
	Type string
	Spec string
}
// Init is the constructor signature every sink module registers; it
// decodes its own params and may publish to the shared event bus.
type Init func(params any, eventBus bus.Bus) (Sink, error)
// registry maps a sink type name to its constructor.
type registry map[string]Init
// Registry is the global sink registry, populated by module init() funcs.
var Registry = make(registry)
// Add registers a constructor under the given type name, overwriting any
// previous registration with the same name.
func (s registry) Add(name string, constructor Init) {
	s[name] = constructor
}
package stdout
import (
"fmt"
"github.com/mekops-labs/siphon/pkg/bus"
"github.com/mekops-labs/siphon/pkg/sink"
)
// stdout is a sink that prints each payload to standard output (handy for
// debugging pipelines).
type stdout struct{}
// Compile-time check that stdout satisfies sink.Sink.
var _ sink.Sink = (*stdout)(nil)
// init registers this sink type under the name "stdout".
func init() {
	sink.Registry.Add("stdout", New)
}
// New returns a stdout sink; it takes no params and cannot fail.
func New(_ any, _ bus.Bus) (sink.Sink, error) {
	return &stdout{}, nil
}
// Send writes the payload to stdout followed by a newline.
func (*stdout) Send(b []byte) error {
	fmt.Println(string(b))
	return nil
}
// Close is a no-op; stdout needs no cleanup.
func (*stdout) Close() error {
	return nil
}
package windy
import (
"encoding/json"
"fmt"
"log"
"net/http"
"net/url"
"time"
"github.com/mekops-labs/siphon/pkg/bus"
"github.com/mekops-labs/siphon/pkg/sink"
"github.com/mitchellh/mapstructure"
)
// Endpoint pieces for the Windy stations observation-update API.
var (
	windyProto = "https"
	windyHost  = "stations.windy.com"
	windyPath  = "/api/v2/observation/update"
)
// windyParams holds the decoded config for the windy sink.
type windyParams struct {
	Password string `mapstructure:"password"` // Station Password
	// ID is `any` — presumably so YAML may supply either a numeric or
	// string station id (it is stringified with %v when sending);
	// TODO confirm against config examples.
	ID any `mapstructure:"id"` // Station ID
}
// windySink reports weather observations to windy.com over HTTP.
type windySink struct {
	params windyParams
	client *http.Client
}
// Ensure windySink implements sink.Sink
var _ sink.Sink = (*windySink)(nil)
// init registers this sink type under the name "windy".
func init() {
	sink.Registry.Add("windy", New)
}
// New constructs a windy sink from the decoded params. Both the station
// password and station id are required; the id may be numeric or string
// (hence `any`) and is stringified when building the request.
//
// Fix: the unused bus parameter was named `_bus`; the blank identifier is
// idiomatic and matches every sibling sink constructor.
func New(params any, _ bus.Bus) (sink.Sink, error) {
	var opt windyParams
	if err := mapstructure.Decode(params, &opt); err != nil {
		return nil, fmt.Errorf("failed to decode windy params: %w", err)
	}
	if opt.Password == "" {
		return nil, fmt.Errorf("windy requires a station password")
	}
	if opt.ID == nil {
		return nil, fmt.Errorf("windy requires a station id")
	}
	return &windySink{
		params: opt,
		client: &http.Client{Timeout: 10 * time.Second},
	}, nil
}
// Send converts the incoming JSON payload into query parameters and issues
// a GET to the Windy observation-update endpoint, authenticating with the
// station password as a bearer token. Any non-2xx response is an error.
//
// Fixes: the success log used the %s verb on the `any`-typed station ID
// (go vet flags this for non-string values — now %v), and the redundant
// else-after-return is removed.
func (s *windySink) Send(b []byte) error {
	var payload map[string]interface{}
	if err := json.Unmarshal(b, &payload); err != nil {
		return fmt.Errorf("windy sink failed to parse incoming payload: %w", err)
	}
	// Windy's update API takes observations as URL query parameters.
	query := url.Values{}
	query.Add("station", fmt.Sprintf("%v", s.params.ID))
	for key, val := range payload {
		query.Add(key, fmt.Sprintf("%v", val))
	}
	targetURL := url.URL{
		Scheme:   windyProto,
		Host:     windyHost,
		Path:     windyPath,
		RawQuery: query.Encode(),
	}
	req, err := http.NewRequest(http.MethodGet, targetURL.String(), nil)
	if err != nil {
		return err
	}
	req.Header.Set("Authorization", "Bearer "+s.params.Password)
	resp, err := s.client.Do(req)
	if err != nil {
		return fmt.Errorf("windy request failed: %w", err)
	}
	defer resp.Body.Close()
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return fmt.Errorf("windy returned non-2xx status: %d", resp.StatusCode)
	}
	log.Printf("Successfully sent data to Windy for station %v: %s", s.params.ID, targetURL.String())
	return nil
}
// Close is a no-op: the windy sink holds no persistent connections.
func (s *windySink) Close() error { // No persistent connections to close for the windy sink
	return nil
}
package main
import (
"bytes"
"fmt"
"go/format"
"io/fs"
"os"
"path/filepath"
"sort"
"strings"
)
// groups lists the pkg/ subdirectories scanned for module packages whose
// init()-time registrations must be linked into the binary.
var groups = []string{"collector", "parser", "dispatcher", "sink"}
// modulePath walks upward from the working directory until it finds a
// go.mod, returning the module path declared there and the directory that
// contains it. This lets the generator run from any subdirectory of the
// repository.
func modulePath() (string, string, error) {
	dir, err := os.Getwd()
	if err != nil {
		return "", "", err
	}
	for {
		modFile := filepath.Join(dir, "go.mod")
		if _, statErr := os.Stat(modFile); statErr == nil {
			content, readErr := os.ReadFile(modFile)
			if readErr != nil {
				return "", "", readErr
			}
			for _, line := range strings.Split(string(content), "\n") {
				if strings.HasPrefix(strings.TrimSpace(line), "module ") {
					return strings.TrimSpace(strings.TrimPrefix(line, "module ")), dir, nil
				}
			}
			return "", "", fmt.Errorf("module line not found in %s", modFile)
		}
		parent := filepath.Dir(dir)
		if parent == dir {
			// Reached the filesystem root without finding go.mod.
			break
		}
		dir = parent
	}
	return "", "", fmt.Errorf("go.mod not found in any parent directory")
}
// main generates internal/modules/modules.go: a file of blank imports for
// every module package found under pkg/<group>/<name>, so that each
// module's init()-time registry registration gets linked into the binary.
func main() {
	mod, base, err := modulePath()
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(2)
	}
	var imports []string
	for _, g := range groups {
		dir := filepath.Join(base, "pkg", g)
		// A missing group directory simply yields no entries; the read
		// error is ignored deliberately (best-effort scan).
		ents, _ := os.ReadDir(dir)
		for _, e := range ents {
			// Skip files and hidden/underscore-prefixed directories.
			if !e.IsDir() || strings.HasPrefix(e.Name(), ".") || strings.HasPrefix(e.Name(), "_") {
				continue
			}
			p := filepath.Join(dir, e.Name())
			hasGo := false
			// Walk until the first .go file proves the package is non-empty.
			// Returning fs.SkipDir from a file callback skips the remainder
			// of that file's directory, ending the scan early.
			_ = filepath.WalkDir(p, func(path string, d fs.DirEntry, walkErr error) error {
				if walkErr != nil || d.IsDir() {
					return nil
				}
				if strings.HasSuffix(d.Name(), ".go") {
					hasGo = true
					return fs.SkipDir
				}
				return nil
			})
			if hasGo {
				imports = append(imports, fmt.Sprintf("_ \"%s/pkg/%s/%s\"", mod, g, e.Name()))
			}
		}
	}
	// Sort for deterministic output regardless of directory read order.
	sort.Strings(imports)
	var buf bytes.Buffer
	fmt.Fprintln(&buf, "package modules")
	fmt.Fprintln(&buf)
	fmt.Fprintln(&buf, "//go:generate go run ../../tools/genmodules")
	fmt.Fprintln(&buf, "// Code generated by tools/genmodules; DO NOT EDIT.")
	fmt.Fprintln(&buf)
	fmt.Fprintln(&buf, "import (")
	for _, imp := range imports {
		fmt.Fprintln(&buf, "\t"+imp)
	}
	fmt.Fprintln(&buf, ")")
	// gofmt the generated file; fall back to the raw bytes on failure.
	formatted, err := format.Source(buf.Bytes())
	if err != nil {
		// fallback to unformatted output
		formatted = buf.Bytes()
	}
	outPath := filepath.Join(base, "internal", "modules", "modules.go")
	if err := os.WriteFile(outPath, formatted, 0644); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(2)
	}
}