package main
import "fmt"
// printHelp writes a short usage summary for the dish binary to standard output.
func printHelp() {
	const (
		usage      = "Usage: dish [FLAGS] SOURCE\n\n"
		summary    = "A lightweight, one-shot socket checker\n\n"
		sourceHelp = "SOURCE must be a file path leading to a JSON file with a list of sockets to be checked or a URL leading to a remote JSON API from which the list of sockets can be retrieved"
		flagsHelp  = "Use the `-h` flag for a list of available flags"
	)

	// Both operands are strings, so Print concatenates them without a separator.
	fmt.Print(usage, summary)
	fmt.Println(sourceHelp)
	fmt.Println(flagsHelp)
}
package main
import (
"errors"
"flag"
"log"
"os"
"go.vxn.dev/dish/pkg/alert"
"go.vxn.dev/dish/pkg/config"
)
// main loads the configuration from the command line, runs all socket checks,
// hands the outcome to the alerting layer and logs a final summary.
func main() {
	cfg, err := config.NewConfig(flag.CommandLine, os.Args[1:])
	switch {
	case errors.Is(err, config.ErrNoSourceProvided):
		// A missing source is a usage problem: show help and exit non-zero.
		printHelp()
		os.Exit(1)
	case err != nil:
		// Any other configuration problem is only reported.
		log.Print("error loading config: ", err)
		return
	}

	log.Println("dish run: started")

	// Check every socket from the configured source.
	res, err := runTests(cfg)
	if err != nil {
		log.Println(err)
		return
	}

	// Forward results to the configured text and machine channels.
	alert.HandleAlerts(res.messengerText, res.results, res.failedCount, cfg)

	if res.failedCount == 0 {
		log.Println("dish run: all tests ok")
		return
	}
	log.Println("dish run: some tests failed:\n", res.messengerText)
}
package main
import (
"fmt"
"sync"
"go.vxn.dev/dish/pkg/alert"
"go.vxn.dev/dish/pkg/config"
"go.vxn.dev/dish/pkg/netrunner"
"go.vxn.dev/dish/pkg/socket"
)
// testResults holds the overall results of all socket checks combined.
type testResults struct {
// messengerText is the concatenation of the per-socket lines produced by
// alert.FormatMessengerText for the sockets that are reported to text channels.
messengerText string
// results maps each socket ID to whether its check passed.
results *alert.Results
// failedCount is the number of results that did not pass or carried an error.
failedCount int
}
// fanInChannels merges the provided channels into a single receive-only channel.
// Every value received from any input is forwarded to the returned channel, which
// is closed once all inputs have been closed and drained.
func fanInChannels(channels ...chan socket.Result) <-chan socket.Result {
	merged := make(chan socket.Result)

	var pending sync.WaitGroup
	pending.Add(len(channels))

	// forward drains a single input into the merged channel.
	forward := func(src <-chan socket.Result) {
		defer pending.Done()
		for r := range src {
			merged <- r
		}
	}
	for _, c := range channels {
		go forward(c)
	}

	// Close the merged channel only after every forwarder has finished.
	go func() {
		pending.Wait()
		close(merged)
	}()

	return merged
}
// runTests fetches the socket list described by cfg, checks every socket in its
// own goroutine and aggregates the outcomes.
//
// It returns an error only when the socket list cannot be loaded. Individual
// check failures are reported through the returned testResults.
func runTests(cfg *config.Config) (*testResults, error) {
	list, err := socket.FetchSocketList(cfg)
	if err != nil {
		return nil, fmt.Errorf("error loading socket list: %w", err)
	}

	if cfg.Verbose {
		socket.PrintSockets(list)
	}

	summary := &testResults{
		results: &alert.Results{Map: make(map[string]bool)},
	}

	// Each check gets its own channel, which the check closes when it is done.
	// A single shared channel would leave it unclear which goroutine may close it.
	var wg sync.WaitGroup
	channels := make([]chan socket.Result, 0, len(list.Sockets))
	for _, sock := range list.Sockets {
		ch := make(chan socket.Result)
		channels = append(channels, ch)

		wg.Add(1)
		go netrunner.RunSocketTest(sock, ch, &wg, cfg)
	}

	merged := fanInChannels(channels...)
	wg.Wait()

	for res := range merged {
		failed := !res.Passed
		if failed || res.Error != nil {
			summary.failedCount++
		}
		if failed || cfg.TextNotifySuccess {
			summary.messengerText += alert.FormatMessengerText(res)
		}
		summary.results.Map[res.Socket.ID] = res.Passed
	}

	return summary, nil
}
// Package alert provides functionality to handle alert and result submission
// to different text (e.g. Telegram) and machine (e.g. webhooks) integration channels.
package alert
import (
"log"
"net/http"
"go.vxn.dev/dish/pkg/config"
)
// HandleAlerts builds a notifier from config and uses it to send the chat message
// and the machine-readable results. Errors returned by either step are logged and
// do not prevent the other step from running.
func HandleAlerts(messengerText string, results *Results, failedCount int, config *config.Config) {
	n := NewNotifier(http.DefaultClient, config)

	chatErr := n.SendChatNotifications(messengerText, failedCount)
	if chatErr != nil {
		log.Printf("some error(s) encountered when sending chat notifications: \n%v", chatErr)
	}

	machineErr := n.SendMachineNotifications(results, failedCount)
	if machineErr != nil {
		log.Printf("some error(s) encountered when sending machine notifications: \n%v", machineErr)
	}
}
package alert
import (
"bytes"
"encoding/json"
"fmt"
"log"
"net/http"
"go.vxn.dev/dish/pkg/config"
)
// apiSender pushes check results as JSON to a remote API endpoint.
type apiSender struct {
// httpClient performs the outgoing request.
httpClient HTTPClient
// url is the validated endpoint the results are posted to.
url string
// headerName and headerValue form an optional custom header; it is only sent
// when both are non-empty.
headerName string
headerValue string
// verbose enables additional log output.
verbose bool
// notifySuccess makes the sender report runs in which no check failed.
notifySuccess bool
}
// NewAPISender validates config.ApiURL and returns an apiSender configured from
// the remaining API-related settings. An error is returned when the URL is invalid.
func NewAPISender(httpClient HTTPClient, config *config.Config) (*apiSender, error) {
	target, err := parseAndValidateURL(config.ApiURL, nil)
	if err != nil {
		return nil, err
	}

	sender := &apiSender{
		httpClient:    httpClient,
		url:           target.String(),
		verbose:       config.Verbose,
		notifySuccess: config.MachineNotifySuccess,
	}
	sender.headerName = config.ApiHeaderName
	sender.headerValue = config.ApiHeaderValue

	return sender, nil
}
// send marshals m to JSON and POSTs it to the remote API. Nothing is sent when no
// check failed and success notifications are disabled. The custom header is added
// only when both its name and value are set.
func (s *apiSender) send(m *Results, failedCount int) error {
	if nothingToReport := failedCount == 0 && !s.notifySuccess; nothingToReport {
		if s.verbose {
			log.Println("no sockets failed, nothing will be sent to remote API")
		}
		return nil
	}

	payload, err := json.Marshal(m)
	if err != nil {
		return fmt.Errorf("failed to marshal JSON: %w", err)
	}

	if s.verbose {
		log.Printf("prepared remote API data: %s", string(payload))
	}

	// The custom header is mostly used for authentication.
	var opts []func(*submitOptions)
	if s.headerName != "" && s.headerValue != "" {
		opts = append(opts, withHeader(s.headerName, s.headerValue))
	}

	if err := handleSubmit(s.httpClient, http.MethodPost, s.url, bytes.NewReader(payload), opts...); err != nil {
		return fmt.Errorf("error pushing results to remote API: %w", err)
	}

	log.Println("results pushed to remote API")
	return nil
}
package alert
import (
"fmt"
"go.vxn.dev/dish/pkg/socket"
)
// FormatMessengerText renders a single check result as one newline-terminated
// line for text channels, in the form
//
//	• host:port[path] -- success ✅
//	• host:port[path] -- failed ❌ -- <error>
//
// A failed result without an error is reported with a placeholder reason instead
// of dereferencing the nil error.
func FormatMessengerText(result socket.Result) string {
	text := fmt.Sprintf("• %s:%d", result.Socket.Host, result.Socket.Port)
	text += result.Socket.PathHTTP
	text += " -- "

	if result.Passed {
		return text + "success \u2705\n" // ✅
	}

	// Guard against a failed result that carries no error value.
	reason := "unknown error"
	if result.Error != nil {
		reason = result.Error.Error()
	}

	return text + "failed \u274C -- " + reason + "\n" // ❌
}
package alert
import (
"errors"
"io"
"log"
"net/http"
"go.vxn.dev/dish/pkg/config"
)
// Results is the machine-readable outcome of a dish run. It is serialized to JSON
// with the map stored under the "dish_results" key.
type Results struct {
Map map[string]bool `json:"dish_results"`
}
// ChatNotifier is implemented by senders that deliver a text message to a chat
// integration. send receives the message text and the number of failed checks.
type ChatNotifier interface {
send(string, int) error
}
// MachineNotifier is implemented by senders that deliver results to a machine
// interface. send receives the results and the number of failed checks.
type MachineNotifier interface {
send(*Results, int) error
}
// notifier holds the chat and machine senders that were configured for this run.
type notifier struct {
verbose bool
// chatNotifiers are used by SendChatNotifications.
chatNotifiers []ChatNotifier
// machineNotifiers are used by SendMachineNotifications.
machineNotifiers []MachineNotifier
}
// HTTPClient describes the HTTP client methods available to the alert senders.
// The method set matches the corresponding methods of *http.Client, so
// http.DefaultClient can be passed wherever an HTTPClient is expected.
type HTTPClient interface {
Do(req *http.Request) (*http.Response, error)
Get(url string) (*http.Response, error)
Post(url string, contentType string, body io.Reader) (*http.Response, error)
}
// NewNotifier creates a new instance of notifier. Based on the flags used, it spawns new instances of ChatNotifiers (e.g. Telegram) and MachineNotifiers (e.g. Webhooks) and stores them on the notifier struct to be used for alert notifications.
func NewNotifier(httpClient HTTPClient, config *config.Config) *notifier {
// Set chat integrations to be notified (e.g. Telegram)
notificationSenders := make([]ChatNotifier, 0)
// Telegram
if config.TelegramBotToken != "" && config.TelegramChatID != "" {
notificationSenders = append(notificationSenders, NewTelegramSender(httpClient, config))
}
// Set machine interface integrations to be notified (e.g. Webhooks)
payloadSenders := make([]MachineNotifier, 0)
// Remote API
if config.ApiURL != "" {
apiSender, err := NewAPISender(httpClient, config)
if err != nil {
log.Println("error creating new remote API sender:", err)
} else {
payloadSenders = append(payloadSenders, apiSender)
}
}
// Webhooks
if config.WebhookURL != "" {
webhookSender, err := NewWebhookSender(httpClient, config)
if err != nil {
log.Println("error creating new webhook sender:", err)
} else {
payloadSenders = append(payloadSenders, webhookSender)
}
}
// Pushgateway
if config.PushgatewayURL != "" {
pgwSender, err := NewPushgatewaySender(httpClient, config)
if err != nil {
log.Println("error creating new Pushgateway sender:", err)
} else {
payloadSenders = append(payloadSenders, pgwSender)
}
}
return ¬ifier{
verbose: config.Verbose,
chatNotifiers: notificationSenders,
machineNotifiers: payloadSenders,
}
}
// SendChatNotifications sends m through every configured chat notifier. Each
// failure is logged, and all failures are returned joined into a single error.
// It returns nil when no chat notifier is configured or every send succeeds.
func (n *notifier) SendChatNotifications(m string, failedCount int) error {
	if len(n.chatNotifiers) == 0 {
		log.Println("no chat notification receivers configured, no notifications will be sent")
		return nil
	}

	var failures []error
	for i := range n.chatNotifiers {
		sender := n.chatNotifiers[i]
		err := sender.send(m, failedCount)
		if err == nil {
			continue
		}
		log.Printf("failed to send notification using %T: %v", sender, err)
		failures = append(failures, err)
	}

	// errors.Join yields nil when there is nothing to join.
	return errors.Join(failures...)
}
// SendMachineNotifications sends m through every configured machine notifier.
// Each failure is logged, and all failures are returned joined into a single
// error. It returns nil when no machine notifier is configured or every send
// succeeds.
func (n *notifier) SendMachineNotifications(m *Results, failedCount int) error {
	if len(n.machineNotifiers) == 0 {
		log.Println("no machine interface payload receivers configured, no notifications will be sent")
		return nil
	}

	var failures []error
	for i := range n.machineNotifiers {
		sender := n.machineNotifiers[i]
		err := sender.send(m, failedCount)
		if err == nil {
			continue
		}
		log.Printf("failed to send notification using %T: %v", sender, err)
		failures = append(failures, err)
	}

	// errors.Join yields nil when there is nothing to join.
	return errors.Join(failures...)
}
package alert
import (
"bytes"
"fmt"
"log"
"net/http"
"text/template"
"go.vxn.dev/dish/pkg/config"
)
const (
// jobName is the name of the Prometheus job used for dish results. It is placed
// into the Pushgateway request path after "/metrics/job/".
jobName = "dish_results"
)
// messageTemplate is a template used for the Pushgateway message generated by the createMessage method.
// It is rendered with a messageData value; .FailedCount becomes the sample value
// of the dish_failed_count metric.
//
// NOTE(review): the Prometheus text exposition format describes metadata lines as
// "# HELP <metric_name> <docstring>" and "# TYPE <metric_name> <type>". The HELP
// line below carries no metric name — confirm that Pushgateway accepts it as is.
var messageTemplate = `
#HELP failed sockets registered by dish
#TYPE dish_failed_count counter
dish_failed_count {{ .FailedCount }}
`
// messageData is a struct used to store Pushgateway message template variables.
type messageData struct {
// FailedCount is the number of failed checks rendered into the message.
FailedCount int
}
// pushgatewaySender pushes the failed-check count to a Prometheus Pushgateway.
type pushgatewaySender struct {
// httpClient performs the outgoing request.
httpClient HTTPClient
// url is the validated base URL of the Pushgateway.
url string
// instanceName is appended to the request path after "/instance/".
instanceName string
// verbose enables additional log output.
verbose bool
// notifySuccess makes the sender report runs in which no check failed.
notifySuccess bool
// tmpl is the parsed message template executed by createMessage.
tmpl *template.Template
}
// NewPushgatewaySender builds a pushgatewaySender from config. It first validates
// config.PushgatewayURL and then parses the message template used for pushing
// results; a failure in either step is returned as an error.
func NewPushgatewaySender(httpClient HTTPClient, config *config.Config) (*pushgatewaySender, error) {
	target, err := parseAndValidateURL(config.PushgatewayURL, nil)
	if err != nil {
		return nil, err
	}

	parsed, err := template.New("pushgatewayMessage").Parse(messageTemplate)
	if err != nil {
		return nil, fmt.Errorf("error creating Pushgateway message template: %w", err)
	}

	sender := &pushgatewaySender{
		httpClient:    httpClient,
		url:           target.String(),
		instanceName:  config.InstanceName,
		verbose:       config.Verbose,
		notifySuccess: config.MachineNotifySuccess,
		tmpl:          parsed,
	}
	return sender, nil
}
// createMessage renders the sender's template with the given failed-check count
// and returns the resulting Pushgateway message text.
func (s *pushgatewaySender) createMessage(failedCount int) (string, error) {
	out := new(bytes.Buffer)
	data := messageData{FailedCount: failedCount}

	if err := s.tmpl.Execute(out, data); err != nil {
		return "", fmt.Errorf("error executing Pushgateway message template: %w", err)
	}
	return out.String(), nil
}
// send pushes the failed-check count to Pushgateway with an HTTP PUT.
//
// The first argument exists only to satisfy the MachineNotifier interface; the
// request body is produced by createMessage instead. Nothing is sent when no
// check failed and success notifications are disabled.
func (s *pushgatewaySender) send(_ *Results, failedCount int) error {
	if nothingToReport := failedCount == 0 && !s.notifySuccess; nothingToReport {
		if s.verbose {
			log.Println("no sockets failed, nothing will be sent to Pushgateway")
		}
		return nil
	}

	msg, err := s.createMessage(failedCount)
	if err != nil {
		return err
	}

	target := s.url + "/metrics/job/" + jobName + "/instance/" + s.instanceName
	payload := bytes.NewReader([]byte(msg))

	if err := handleSubmit(s.httpClient, http.MethodPut, target, payload, withContentType("application/byte")); err != nil {
		return fmt.Errorf("error pushing results to Pushgateway: %w", err)
	}

	log.Println("results pushed to Pushgateway")
	return nil
}
package alert
import (
"fmt"
"log"
"net/http"
"net/url"
"go.vxn.dev/dish/pkg/config"
)
const (
// baseURL is the root of the Telegram Bot API; the bot token and method name are
// appended to it when the request URL is built.
baseURL = "https://api.telegram.org"
// messageTitle is the HTML-formatted heading placed before the results text.
messageTitle = "\U0001F4E1 <b>dish run results</b>:" // 📡
)
// telegramSender delivers the results text to a Telegram chat.
type telegramSender struct {
// httpClient performs the outgoing request.
httpClient HTTPClient
// chatID identifies the chat the message is sent to.
chatID string
// token is the bot token embedded in the request URL.
token string
// verbose enables additional log output.
verbose bool
// notifySuccess makes the sender report runs in which no check failed.
notifySuccess bool
}
// NewTelegramSender returns a telegramSender populated from the Telegram-related
// settings, the verbosity flag and the text success-notification flag in config.
func NewTelegramSender(httpClient HTTPClient, config *config.Config) *telegramSender {
	return &telegramSender{
		httpClient:    httpClient,
		chatID:        config.TelegramChatID,
		token:         config.TelegramBotToken,
		verbose:       config.Verbose,
		notifySuccess: config.TextNotifySuccess,
	}
}
// send delivers rawMessage, prefixed with the message title, to the configured
// Telegram chat through the Bot API sendMessage method. Nothing is sent when no
// check failed and success notifications are disabled.
func (s *telegramSender) send(rawMessage string, failedCount int) error {
	if nothingToReport := failedCount == 0 && !s.notifySuccess; nothingToReport {
		if s.verbose {
			log.Printf("no sockets failed, nothing will be sent to Telegram")
		}
		return nil
	}

	// All parameters, including the message text, travel in the query string.
	query := url.Values{
		"chat_id":                  {s.chatID},
		"disable_web_page_preview": {"true"},
		"parse_mode":               {"HTML"},
		"text":                     {messageTitle + "\n\n" + rawMessage},
	}
	endpoint := fmt.Sprintf("%s/bot%s/sendMessage", baseURL, s.token)
	fullURL := endpoint + "?" + query.Encode()

	if err := handleSubmit(s.httpClient, http.MethodGet, fullURL, nil); err != nil {
		return fmt.Errorf("error submitting Telegram alert: %w", err)
	}

	log.Println("Telegram alert sent")
	return nil
}
package alert
import (
"fmt"
"io"
"log"
"net/http"
)
// submitOptions holds optional parameters for submitting HTTP requests using handleSubmit.
type submitOptions struct {
// contentType is the value of the Content-Type request header.
contentType string
// headers holds additional request headers keyed by header name.
headers map[string]string
}
// withContentType returns an option that sets the Content-Type header value used
// for the request to contentType.
func withContentType(contentType string) func(*submitOptions) {
	apply := func(o *submitOptions) {
		o.contentType = contentType
	}
	return apply
}
// withHeader returns an option that adds the key:value pair to the request's
// HTTP headers, creating the header map first when it does not exist yet.
func withHeader(key string, value string) func(*submitOptions) {
	return func(o *submitOptions) {
		if o.headers != nil {
			o.headers[key] = value
			return
		}
		o.headers = map[string]string{key: value}
	}
}
// handleSubmit sends an HTTP request with the given method and body (nil when no
// body is needed) to url using client.
//
// The Content-Type header defaults to application/json and can be changed with
// the withContentType option. Extra headers can be added with withHeader; they
// are applied after the content type.
//
// Any status code outside the 2xx range is treated as a failure: the response
// body is logged and an error containing the received status code is returned.
func handleSubmit(client HTTPClient, method string, url string, body io.Reader, opts ...func(*submitOptions)) error {
	// Start from the defaults and let the caller's options override them.
	settings := submitOptions{
		contentType: "application/json",
		headers:     map[string]string{},
	}
	for i := range opts {
		opts[i](&settings)
	}

	req, err := http.NewRequest(method, url, body)
	if err != nil {
		return err
	}

	req.Header.Set("Content-Type", settings.contentType)
	for name, val := range settings.headers {
		req.Header.Set(name, val)
	}

	res, err := client.Do(req)
	if err != nil {
		return err
	}
	defer res.Body.Close()

	if succeeded := res.StatusCode >= http.StatusOK && res.StatusCode < http.StatusMultipleChoices; succeeded {
		return nil
	}

	// Log whatever the server sent back to help diagnose the failure.
	payload, readErr := io.ReadAll(res.Body)
	if readErr != nil {
		log.Printf("error reading response body: %v", readErr)
	} else {
		log.Printf("response from %s: %s", url, string(payload))
	}

	return fmt.Errorf("unexpected response code received (expected: %d, got: %d)", http.StatusOK, res.StatusCode)
}
package alert
import (
"fmt"
"net/url"
"slices"
"strings"
)
// defaultSchemes lists the URL schemes accepted when the caller supplies none.
var defaultSchemes = []string{"http", "https"}

// parseAndValidateURL parses rawURL and checks that it has a supported scheme and
// a host. When supportedSchemes is nil, HTTP and HTTPS are accepted.
//
// An error is returned when rawURL is blank, cannot be parsed, has no scheme,
// uses an unsupported scheme or has no host.
func parseAndValidateURL(rawURL string, supportedSchemes []string) (*url.URL, error) {
	if strings.TrimSpace(rawURL) == "" {
		return nil, fmt.Errorf("URL cannot be empty")
	}

	allowed := supportedSchemes
	if allowed == nil {
		allowed = defaultSchemes
	}

	u, err := url.ParseRequestURI(rawURL)
	if err != nil {
		return nil, fmt.Errorf("error parsing URL: %w", err)
	}

	if u.Scheme == "" {
		return nil, fmt.Errorf("protocol must be specified in the provided URL (e.g. https://...)")
	}
	if !slices.Contains(allowed, u.Scheme) {
		return nil, fmt.Errorf("unsupported protocol provided in URL: %s (supported protocols: %v)", u.Scheme, allowed)
	}
	if u.Host == "" {
		return nil, fmt.Errorf("URL must contain a host")
	}

	return u, nil
}
package alert
import (
"bytes"
"encoding/json"
"fmt"
"log"
"net/http"
"go.vxn.dev/dish/pkg/config"
)
// webhookSender pushes check results as JSON to a webhook endpoint.
type webhookSender struct {
// httpClient performs the outgoing request.
httpClient HTTPClient
// url is the validated webhook endpoint.
url string
// verbose enables additional log output.
verbose bool
// notifySuccess makes the sender report runs in which no check failed.
notifySuccess bool
}
// NewWebhookSender validates config.WebhookURL and returns a webhookSender that
// posts to it. An error is returned when the URL is invalid.
func NewWebhookSender(httpClient HTTPClient, config *config.Config) (*webhookSender, error) {
	target, err := parseAndValidateURL(config.WebhookURL, nil)
	if err != nil {
		return nil, err
	}

	sender := &webhookSender{
		httpClient:    httpClient,
		url:           target.String(),
		verbose:       config.Verbose,
		notifySuccess: config.MachineNotifySuccess,
	}
	return sender, nil
}
// send marshals m to JSON and POSTs it to the webhook URL. Nothing is sent when
// no check failed and success notifications are disabled.
func (s *webhookSender) send(m *Results, failedCount int) error {
	if nothingToReport := failedCount == 0 && !s.notifySuccess; nothingToReport {
		if s.verbose {
			log.Printf("no sockets failed, nothing will be sent to webhook")
		}
		return nil
	}

	payload, err := json.Marshal(m)
	if err != nil {
		return err
	}

	if s.verbose {
		log.Printf("prepared webhook data: %s", string(payload))
	}

	if err := handleSubmit(s.httpClient, http.MethodPost, s.url, bytes.NewReader(payload)); err != nil {
		return fmt.Errorf("error pushing results to webhook: %w", err)
	}

	log.Println("results pushed to webhook")
	return nil
}
// Package config provides access to configuration parameters
// set via flags or args.
package config
import (
"errors"
"flag"
"fmt"
)
// Config holds the configuration parameters.
//
// Every field except Source is bound to a command-line flag by defineFlags;
// Source is taken from the first positional argument by NewConfig.
type Config struct {
// InstanceName is the dish instance name (flag: name).
InstanceName string
// ApiHeaderName and ApiHeaderValue describe a custom header used when talking to
// the remote API (flags: hname, hvalue).
ApiHeaderName string
ApiHeaderValue string
// ApiCacheSockets, ApiCacheDirectory and ApiCacheTTLMinutes control caching of the
// socket list fetched from a remote API source (flags: cache, cacheDir, cacheTTL).
ApiCacheSockets bool
ApiCacheDirectory string
ApiCacheTTLMinutes uint
// Source is the socket list source given as the first positional argument.
Source string
// Verbose toggles additional log output (flag: verbose).
Verbose bool
// PushgatewayURL is the Pushgateway URL results are pushed to (flag: target).
PushgatewayURL string
// TelegramBotToken and TelegramChatID configure the Telegram integration
// (flags: telegramBotToken, telegramChatID).
TelegramBotToken string
TelegramChatID string
// TimeoutSeconds is the timeout for http and tcp calls (flag: timeout).
TimeoutSeconds uint
// ApiURL is the API endpoint results are pushed to (flag: updateURL).
ApiURL string
// WebhookURL is the webhook endpoint results are pushed to (flag: webhookURL).
WebhookURL string
// TextNotifySuccess and MachineNotifySuccess control whether runs without failures
// are reported to text and machine channels (flags: textNotifySuccess,
// machineNotifySuccess).
TextNotifySuccess bool
MachineNotifySuccess bool
}
// Default values for the configuration parameters. They are used as the flag
// defaults in defineFlags and as the initial Config values in NewConfig.
const (
defaultInstanceName = "generic-dish"
defaultApiHeaderName = ""
defaultApiHeaderValue = ""
defaultApiCacheSockets = false
defaultApiCacheDir = ".cache"
defaultApiCacheTTLMinutes = 10
defaultVerbose = false
defaultPushgatewayURL = ""
defaultTelegramBotToken = ""
defaultTelegramChatID = ""
defaultTimeoutSeconds = 10
defaultApiURL = ""
defaultWebhookURL = ""
defaultTextNotifySuccess = false
defaultMachineNotifySuccess = false
)
// ErrNoSourceProvided is returned by NewConfig when no positional argument
// naming the source of sockets is present.
var ErrNoSourceProvided = errors.New("no source provided")
// defineFlags registers every supported flag on fs. Each flag is bound to the
// matching field of cfg, so the field holds the flag's default right after this
// call and the parsed value once fs has been parsed.
func defineFlags(fs *flag.FlagSet, cfg *Config) {
	// String-valued flags.
	stringFlags := []struct {
		target *string
		name   string
		value  string
		usage  string
	}{
		// System
		{&cfg.InstanceName, "name", defaultInstanceName, "a string, dish instance name"},
		// API socket source
		{&cfg.ApiHeaderName, "hname", defaultApiHeaderName, "a string, name of a custom additional header to be used when fetching and pushing results to the remote API (used mainly for auth purposes)"},
		{&cfg.ApiHeaderValue, "hvalue", defaultApiHeaderValue, "a string, value of the custom additional header to be used when fetching and pushing results to the remote API (used mainly for auth purposes)"},
		{&cfg.ApiCacheDirectory, "cacheDir", defaultApiCacheDir, "a string, specifies the directory used to cache the socket list fetched from the remote API source"},
		// Pushgateway
		{&cfg.PushgatewayURL, "target", defaultPushgatewayURL, "a string, result update path/URL to pushgateway, plaintext/byte output"},
		// Telegram
		{&cfg.TelegramBotToken, "telegramBotToken", defaultTelegramBotToken, "a string, Telegram bot private token"},
		{&cfg.TelegramChatID, "telegramChatID", defaultTelegramChatID, "a string, Telegram chat/channel ID"},
		// API for pushing results
		{&cfg.ApiURL, "updateURL", defaultApiURL, "a string, API endpoint URL for pushing results"},
		// Webhooks
		{&cfg.WebhookURL, "webhookURL", defaultWebhookURL, "a string, URL of webhook endpoint"},
	}
	for _, f := range stringFlags {
		fs.StringVar(f.target, f.name, f.value, f.usage)
	}

	// Boolean flags.
	boolFlags := []struct {
		target *bool
		name   string
		value  bool
		usage  string
	}{
		{&cfg.Verbose, "verbose", defaultVerbose, "a bool, console stdout logging toggle"},
		{&cfg.TextNotifySuccess, "textNotifySuccess", defaultTextNotifySuccess, "a bool, specifies whether successful checks with no failures should be reported to text channels"},
		{&cfg.MachineNotifySuccess, "machineNotifySuccess", defaultMachineNotifySuccess, "a bool, specifies whether successful checks with no failures should be reported to machine channels"},
		{&cfg.ApiCacheSockets, "cache", defaultApiCacheSockets, "a bool, specifies whether to cache the socket list fetched from the remote API source"},
	}
	for _, f := range boolFlags {
		fs.BoolVar(f.target, f.name, f.value, f.usage)
	}

	// Unsigned integer flags.
	uintFlags := []struct {
		target *uint
		name   string
		value  uint
		usage  string
	}{
		{&cfg.TimeoutSeconds, "timeout", defaultTimeoutSeconds, "an int, timeout in seconds for http and tcp calls"},
		{&cfg.ApiCacheTTLMinutes, "cacheTTL", defaultApiCacheTTLMinutes, "an int, time duration (in minutes) for which the cached list of sockets is valid"},
	}
	for _, f := range uintFlags {
		fs.UintVar(f.target, f.name, f.value, f.usage)
	}
}
// NewConfig returns a new instance of Config built from args.
//
// Flags are defined on fs and parsed from args. If a flag is used for a supported
// config parameter, the parameter's value is set according to the provided flag.
// Otherwise, a default value is used for the given parameter. The first
// positional argument left after flag parsing becomes the Source.
//
// It returns an error wrapping the parse failure when the flags cannot be parsed,
// and ErrNoSourceProvided when no positional argument is present.
func NewConfig(fs *flag.FlagSet, args []string) (*Config, error) {
	cfg := &Config{
		InstanceName:         defaultInstanceName,
		ApiHeaderName:        defaultApiHeaderName,
		ApiHeaderValue:       defaultApiHeaderValue,
		ApiCacheSockets:      defaultApiCacheSockets,
		ApiCacheDirectory:    defaultApiCacheDir,
		ApiCacheTTLMinutes:   defaultApiCacheTTLMinutes,
		Verbose:              defaultVerbose,
		PushgatewayURL:       defaultPushgatewayURL,
		TelegramBotToken:     defaultTelegramBotToken,
		TelegramChatID:       defaultTelegramChatID,
		TimeoutSeconds:       defaultTimeoutSeconds,
		ApiURL:               defaultApiURL,
		WebhookURL:           defaultWebhookURL,
		TextNotifySuccess:    defaultTextNotifySuccess,
		MachineNotifySuccess: defaultMachineNotifySuccess,
	}

	defineFlags(fs, cfg)

	// Parse flags
	if err := fs.Parse(args); err != nil {
		return nil, fmt.Errorf("error parsing flags: %w", err)
	}

	// Read the positional arguments from the FlagSet that was just parsed. Using
	// the global flag.CommandLine here would ignore args whenever the caller
	// passes a different FlagSet.
	parsedArgs := fs.Args()

	// If no source is provided, return an error
	if len(parsedArgs) == 0 {
		return nil, ErrNoSourceProvided
	}

	// Otherwise, store the source in the config
	cfg.Source = parsedArgs[0]

	return cfg, nil
}
package logger
import (
"fmt"
"log"
"os"
)
// consoleLogger logs output to stderr.
type consoleLogger struct {
// stdLogger is the underlying standard library logger that writes the messages.
stdLogger *log.Logger
// logLevel is the threshold compared against each message's level in log.
logLevel LogLevel
}
// NewConsoleLogger returns a consoleLogger that writes to stderr with the
// standard log flags. The log level is TRACE when verbose is true and INFO
// otherwise.
func NewConsoleLogger(verbose bool) *consoleLogger {
	var level LogLevel = INFO
	if verbose {
		level = TRACE
	}

	return &consoleLogger{
		stdLogger: log.New(os.Stderr, "", log.LstdFlags),
		logLevel:  level,
	}
}
// log writes a message when level is not below the logger's configured level.
// The message is prefix, a space and the formatted arguments: v is formatted with
// fmt.Sprintf when format is non-empty and with fmt.Sprint otherwise. After a
// PANIC-level message has been written, log panics with that message.
func (l *consoleLogger) log(level LogLevel, prefix string, format string, v ...any) {
	if level < l.logLevel {
		return
	}

	var body string
	if format == "" {
		body = fmt.Sprint(v...)
	} else {
		body = fmt.Sprintf(format, v...)
	}
	msg := prefix + " " + body

	l.stdLogger.Print(msg)

	if level == PANIC {
		panic(msg)
	}
}
// Trace logs v at the TRACE level with the "TRACE:" prefix, formatting the
// arguments in the manner of fmt.Sprint.
func (l *consoleLogger) Trace(v ...any) {
l.log(TRACE, "TRACE:", "", v...)
}
// Tracef logs at the TRACE level with the "TRACE:" prefix, formatting v according
// to the format string f.
func (l *consoleLogger) Tracef(f string, v ...any) {
l.log(TRACE, "TRACE:", f, v...)
}
// Debug logs v at the DEBUG level with the "DEBUG:" prefix, formatting the
// arguments in the manner of fmt.Sprint.
func (l *consoleLogger) Debug(v ...any) {
l.log(DEBUG, "DEBUG:", "", v...)
}
// Debugf logs at the DEBUG level with the "DEBUG:" prefix, formatting v according
// to the format string f.
func (l *consoleLogger) Debugf(f string, v ...any) {
l.log(DEBUG, "DEBUG:", f, v...)
}
// Info logs v at the INFO level with the "INFO:" prefix, formatting the arguments
// in the manner of fmt.Sprint.
func (l *consoleLogger) Info(v ...any) {
l.log(INFO, "INFO:", "", v...)
}
// Infof logs at the INFO level with the "INFO:" prefix, formatting v according to
// the format string f.
func (l *consoleLogger) Infof(f string, v ...any) {
l.log(INFO, "INFO:", f, v...)
}
// Warn logs v at the WARN level with the "WARN:" prefix, formatting the arguments
// in the manner of fmt.Sprint.
func (l *consoleLogger) Warn(v ...any) {
l.log(WARN, "WARN:", "", v...)
}
// Warnf logs at the WARN level with the "WARN:" prefix, formatting v according to
// the format string f.
func (l *consoleLogger) Warnf(f string, v ...any) {
l.log(WARN, "WARN:", f, v...)
}
// Error logs v at the ERROR level with the "ERROR:" prefix, formatting the
// arguments in the manner of fmt.Sprint.
func (l *consoleLogger) Error(v ...any) {
l.log(ERROR, "ERROR:", "", v...)
}
// Errorf logs at the ERROR level with the "ERROR:" prefix, formatting v according
// to the format string f.
func (l *consoleLogger) Errorf(f string, v ...any) {
l.log(ERROR, "ERROR:", f, v...)
}
// Panic logs v at the PANIC level with the "PANIC:" prefix, formatting the
// arguments in the manner of fmt.Sprint, and then panics with the logged message.
// Neither happens if the logger's level filters the message out.
func (l *consoleLogger) Panic(v ...any) {
l.log(PANIC, "PANIC:", "", v...)
}
// Panicf logs at the PANIC level with the "PANIC:" prefix, formatting v according
// to the format string f, and then panics with the logged message. Neither
// happens if the logger's level filters the message out.
func (l *consoleLogger) Panicf(f string, v ...any) {
l.log(PANIC, "PANIC:", f, v...)
}
package netrunner
import (
"context"
"fmt"
"log"
"net"
"net/http"
"regexp"
"slices"
"strconv"
"sync"
"time"
"go.vxn.dev/dish/pkg/config"
"go.vxn.dev/dish/pkg/socket"
)
// agentVersion is the version reported in the User-Agent header ("dish/<version>")
// of requests made by the HTTP runner.
const agentVersion = "1.11"
// RunSocketTest is intended to be invoked in a separate goroutine.
// It tests the given socket with a timeout of cfg.TimeoutSeconds and sends the
// result through out. If no runner can be created for the socket, the error is
// logged and no result is sent. On return, out is closed and Done() is called on
// the WaitGroup.
func RunSocketTest(sock socket.Socket, out chan<- socket.Result, wg *sync.WaitGroup, cfg *config.Config) {
	defer wg.Done()
	defer close(out)

	timeout := time.Duration(cfg.TimeoutSeconds) * time.Second
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	runner, err := NewNetRunner(sock, cfg.Verbose)
	if err == nil {
		out <- runner.RunTest(ctx, sock)
		return
	}

	log.Printf("failed to test socket: %v", err.Error())
}
// NetRunner is used to run tests for a socket.
// RunTest checks sock within the lifetime of ctx and reports the outcome as a
// socket.Result.
type NetRunner interface {
RunTest(ctx context.Context, sock socket.Socket) socket.Result
}
// httpSchemePattern matches hosts that start with an explicit HTTP or HTTPS
// scheme. It is compiled once at package initialization instead of on every
// NewNetRunner call.
var httpSchemePattern = regexp.MustCompile("^(http|https)://")

// NewNetRunner determines the protocol used for the socket test and creates a
// new NetRunner for it.
//
// Rules for the test method determination (first matching rule applies):
//   - If socket.Host starts with 'http://' or 'https://', a HTTP runner is returned.
//   - If socket.Port is between 1 and 65535, a TCP runner is returned.
//   - If socket.Host is not empty, an ICMP runner is returned.
//   - If none of the above conditions are met, a non-nil error is returned.
func NewNetRunner(sock socket.Socket, verbose bool) (NetRunner, error) {
	switch {
	case httpSchemePattern.MatchString(sock.Host):
		return httpRunner{client: &http.Client{}, verbose: verbose}, nil
	case sock.Port >= 1 && sock.Port <= 65535:
		return tcpRunner{verbose: verbose}, nil
	case sock.Host != "":
		return icmpRunner{verbose: verbose}, nil
	}

	return nil, fmt.Errorf("no protocol could be determined from the socket %s", sock.ID)
}
// tcpRunner tests sockets by opening a TCP connection to them.
type tcpRunner struct {
// verbose enables logging of the endpoint being dialed.
verbose bool
}
// RunTest checks a TCP socket by dialing host:port within the lifetime of ctx.
// The check passes when the connection is established; the connection is closed
// again before returning. A dial error is reported in the result.
func (runner tcpRunner) RunTest(ctx context.Context, sock socket.Socket) socket.Result {
	endpoint := net.JoinHostPort(sock.Host, strconv.Itoa(sock.Port))

	if runner.verbose {
		log.Println("TCP runner: connect: " + endpoint)
	}

	var dialer net.Dialer
	conn, err := dialer.DialContext(ctx, "tcp", endpoint)
	if err == nil {
		// Only reachability matters, so the connection is released right away.
		conn.Close()
		return socket.Result{Socket: sock, Passed: true}
	}

	return socket.Result{Socket: sock, Error: err, Passed: false}
}
// httpRunner tests HTTP/S endpoints by issuing GET requests.
type httpRunner struct {
// client executes the requests.
client *http.Client
// verbose enables logging of the URL being requested.
verbose bool
}
// RunTest checks an HTTP/S endpoint with a GET request to host:port followed by
// the socket's HTTP path. The check passes when the request completes without an
// error and the response status is one of the socket's expected HTTP codes. A
// status mismatch is reported as an error alongside the received code.
func (runner httpRunner) RunTest(ctx context.Context, sock socket.Socket) socket.Result {
	target := sock.Host + ":" + strconv.Itoa(sock.Port) + sock.PathHTTP

	if runner.verbose {
		log.Println("HTTP runner: connect:", target)
	}

	// Result reported for any failure that happens before a response arrives.
	failure := func(err error) socket.Result {
		return socket.Result{Socket: sock, Passed: false, Error: err}
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, target, nil)
	if err != nil {
		return failure(err)
	}
	req.Header.Set("User-Agent", fmt.Sprintf("dish/%s", agentVersion))

	resp, err := runner.client.Do(req)
	if err != nil {
		return failure(err)
	}
	defer resp.Body.Close()

	expected := slices.Contains(sock.ExpectedHTTPCodes, resp.StatusCode)
	if !expected {
		err = fmt.Errorf("expected codes: %v, got %d", sock.ExpectedHTTPCodes, resp.StatusCode)
	}

	return socket.Result{
		Socket:       sock,
		Passed:       expected,
		ResponseCode: resp.StatusCode,
		Error:        err,
	}
}
//go:build linux || darwin
package netrunner
import (
"bytes"
"context"
"errors"
"fmt"
"log"
"net"
"syscall"
"time"
"go.vxn.dev/dish/pkg/socket"
)
// icmpRunner tests hosts by sending an ICMP Echo Request over a non-privileged
// ICMP socket.
type icmpRunner struct {
// verbose enables logging of the resolution, send and receive steps.
verbose bool
}
// RunTest is used to test ICMP sockets. It sends an ICMP Echo Request to the given socket using
// non-privileged ICMP and verifies the reply. The test passes if the reply has the same payload
// as the request. Returns an error if the socket host cannot be resolved to an IPv4 address. If
// the host resolves to more than one address, only the first one is used.
func (runner icmpRunner) RunTest(ctx context.Context, sock socket.Socket) socket.Result {
	if runner.verbose {
		log.Printf("Resolving host '%s' to an IP address", sock.Host)
	}

	addr, err := net.DefaultResolver.LookupIP(ctx, "ip4", sock.Host)
	if err != nil {
		return socket.Result{Socket: sock, Error: fmt.Errorf("failed to resolve socket host: %w", err)}
	}
	if len(addr) == 0 {
		return socket.Result{Socket: sock, Error: errors.New("failed to resolve socket host: no addresses returned")}
	}

	// A net.IP holding an IPv4 address may be in 16-byte form. Converting such a
	// slice to [4]byte directly would copy its first four bytes, which are not the
	// IPv4 address. To4 normalizes the address to its 4-byte representation and
	// returns nil when the address is not IPv4.
	ip := addr[0].To4()
	if ip == nil {
		return socket.Result{Socket: sock, Error: errors.New("failed to resolve socket host: resolved address is not an IPv4 address")}
	}

	sockAddr := &syscall.SockaddrInet4{Addr: [4]byte(ip)}

	// When using ICMP over DGRAM, Linux Kernel automatically sets (overwrites) and
	// validates the id, seq and checksum of each incoming and outgoing ICMP message.
	// This is largely non-documented in the linux man pages. The closest I found is:
	// - (Linux news) lwn.net/Articles/420800/
	// - (MacOS man) https://www.manpagez.com/man/4/icmp/
	// - (Third-party article) https://inc0x0.com/icmp-ip-packets-ping-manually-create-and-send-icmp-ip-packets/
	// "[...] most Linux systems use a unique identifier for every ping process, and sequence
	// number is an increasing number within that process. Windows uses a fixed identifier, which
	// varies between Windows versions, and a sequence number that is only reset at boot time."
	sysSocket, err := syscall.Socket(syscall.AF_INET, syscall.SOCK_DGRAM, syscall.IPPROTO_ICMP)
	if err != nil {
		return socket.Result{Socket: sock, Error: fmt.Errorf("failed to create a non-privileged icmp socket: %w", err)}
	}
	defer syscall.Close(sysSocket)

	if d, ok := ctx.Deadline(); ok {
		// Set a socket receive timeout.
		t := syscall.NsecToTimeval(time.Until(d).Nanoseconds())
		if err := syscall.SetsockoptTimeval(sysSocket, syscall.SOL_SOCKET, syscall.SO_RCVTIMEO, &t); err != nil {
			return socket.Result{Socket: sock, Error: fmt.Errorf("failed to set a timeout on a non-privileged icmp socket: %w", err)}
		}
	}

	payload := []byte("ICMP echo")

	// ICMP Header size is 8 bytes.
	reqBuf := make([]byte, 8+len(payload))

	// ICMP Header.
	// ID, Seq and Checksum are filled in automatically by the kernel.
	reqBuf[0] = 8 // Type: Echo
	copy(reqBuf[8:], payload)

	if runner.verbose {
		log.Println("ICMP runner: send to " + ip.String())
	}

	if err := syscall.Sendto(sysSocket, reqBuf, 0, sockAddr); err != nil {
		return socket.Result{Socket: sock, Error: fmt.Errorf("failed to send an echo request: %w", err)}
	}

	// Maximum Transmission Unit (MTU) equals 1500 bytes.
	// Recvfrom before writing to the buffer, checks its length (not capacity).
	// If the length of the buffer is too small to fit the data then it's silently truncated.
	replyBuf := make([]byte, 1500)

	if runner.verbose {
		log.Println("ICMP runner: recv from " + ip.String())
	}

	n, _, err := syscall.Recvfrom(sysSocket, replyBuf, 0)
	if err != nil {
		return socket.Result{Socket: sock, Error: fmt.Errorf("failed to receive a reply from a socket: %w", err)}
	}

	// The reply must at least contain the 8-byte ICMP header.
	if n < 8 {
		return socket.Result{Socket: sock, Error: fmt.Errorf("reply is too short: received %d bytes ", n)}
	}

	// Type 0 is an Echo Reply.
	if replyBuf[0] != 0 {
		return socket.Result{Socket: sock, Error: errors.New("received unexpected reply type")}
	}

	if !bytes.Equal(reqBuf[8:], replyBuf[8:n]) {
		return socket.Result{Socket: sock, Error: errors.New("failed to validate echo reply: payloads are not equal")}
	}

	return socket.Result{Socket: sock, Passed: true}
}
package socket
import (
"crypto/sha1"
"encoding/hex"
"errors"
"io"
"os"
"path/filepath"
"time"
)
// ErrExpiredCache is the sentinel error reported when the cache file for a
// source is older than the allowed time-to-live.
var ErrExpiredCache = errors.New("cache file for this source is outdated")
// hashUrlToFilePath derives the cache file path for the given URL: the
// hex-encoded SHA-1 digest of the URL with a ".json" extension, placed inside
// cacheDir.
func hashUrlToFilePath(url string, cacheDir string) string {
	digest := sha1.Sum([]byte(url))
	return filepath.Join(cacheDir, hex.EncodeToString(digest[:])+".json")
}
// saveSocketsToCache writes the socket data to filePath, creating the cache
// directory (and any missing parents) first.
//
// The directory is created with mode 0o700: a directory needs the
// execute (search) bit for files inside it to be created or opened, so the
// previous 0o600 mode made the subsequent write fail for non-root users.
// The cache file itself stays readable and writable by the owner only.
//
// It returns the error from creating the directory or from writing the file.
func saveSocketsToCache(filePath string, cacheDir string, data []byte) error {
	// Make sure that cache directory exists
	if err := os.MkdirAll(cacheDir, 0o700); err != nil {
		return err
	}
	return os.WriteFile(filePath, data, 0o600)
}
// loadCachedSockets opens the cache file at filePath and reports whether it is
// still fresh.
//
// On success it returns the opened file and its modification time. When the
// file is older than cacheTTL minutes, the opened file and modification time
// are still returned, together with ErrExpiredCache, so the caller may decide
// to use the stale data. If the file cannot be stat'ed or opened, a nil reader
// and the underlying error are returned. The caller owns the returned reader
// and must close it.
func loadCachedSockets(filePath string, cacheTTL uint) (io.ReadCloser, time.Time, error) {
	stat, statErr := os.Stat(filePath)
	if statErr != nil {
		return nil, time.Time{}, statErr
	}

	file, openErr := os.Open(filePath)
	if openErr != nil {
		return nil, time.Time{}, openErr
	}

	modTime := stat.ModTime()
	ttl := time.Duration(cacheTTL) * time.Minute
	if age := time.Since(modTime); age > ttl {
		return file, modTime, ErrExpiredCache
	}
	return file, modTime, nil
}
package socket
import (
"io"
"log"
"os"
"go.vxn.dev/dish/pkg/config"
)
// fetchSocketsFromFile opens the file named by config.Source and returns it as
// an [io.ReadCloser]. The caller is responsible for closing it. When
// config.Verbose is set, a log line is emitted after the file was opened
// successfully.
func fetchSocketsFromFile(config *config.Config) (io.ReadCloser, error) {
	source := config.Source

	f, openErr := os.Open(source)
	if openErr != nil {
		return nil, openErr
	}

	// TODO: Replace with logger
	if config.Verbose {
		log.Printf("fetching sockets from file (%s)", source)
	}
	return f, nil
}
package socket
import (
"bytes"
"fmt"
"io"
"log"
"net/http"
"time"
"go.vxn.dev/dish/pkg/config"
)
// copyBody drains body into buf and always closes body before returning.
// It returns the read error, if any.
func copyBody(body io.ReadCloser, buf *bytes.Buffer) error {
	defer body.Close()

	if _, readErr := buf.ReadFrom(body); readErr != nil {
		return readErr
	}
	return nil
}
// fetchSocketsFromRemote loads the sockets to be monitored from a remote RESTful API endpoint. It returns the response body implementing [io.ReadCloser] for reading from and closing the stream.
//
// It uses a local cache if enabled and falls back to the network if the cache is not present or expired. If the network request fails and expired cache is available, it will be used.
// After a successful network fetch the response is buffered, written to the cache (a failure to do so is only logged) and returned as an in-memory reader.
//
// config.Source must be a complete URL to a remote http/s server, including:
//   - Scheme (http:// or https://)
//   - Host (domain or IP)
//   - Optional port
//   - Optional path
//   - Optional query parameters
//
// Example url: http://api.example.com:5569/stream?query=variable
func fetchSocketsFromRemote(config *config.Config) (io.ReadCloser, error) {
	// If we do not want to cache sockets to the file, fetch from network
	if !config.ApiCacheSockets {
		return loadFreshSockets(config)
	}

	cacheFilePath := hashUrlToFilePath(config.Source, config.ApiCacheDirectory)

	// If cache is enabled, try to load sockets from it first
	cachedReader, cacheTime, err := loadCachedSockets(cacheFilePath, config.ApiCacheTTLMinutes)
	if err == nil {
		// Cache is valid (not expired, no error from file read)
		log.Println("loading sockets from cache...")
		return cachedReader, nil
	}

	// Cache is expired or failed to load, attempt to fetch fresh sockets
	log.Printf("cache unavailable for URL: %s (reason: %v); attempting network fetch", config.Source, err)

	respBody, fetchErr := loadFreshSockets(config)
	if fetchErr != nil {
		log.Printf("fetching socket list from remote API at %s failed: %v", config.Source, fetchErr)

		// If the fetch fails and expired cache is not available, return the fetch error
		if err != ErrExpiredCache {
			return nil, fetchErr
		}

		// If the fetch fails and expired cache is available, return the expired cache and log a warning
		log.Printf("using expired cache from %s", cacheTime.Format(time.RFC3339))
		return cachedReader, nil
	}

	// Fresh data is available, so the expired cache file (which
	// loadCachedSockets hands back already opened) is no longer needed.
	// Close it here; otherwise its file descriptor would leak.
	if cachedReader != nil {
		if closeErr := cachedReader.Close(); closeErr != nil {
			log.Printf("failed to close expired cache file: %v", closeErr)
		}
	}

	// Buffer the body so the same bytes can be both cached and returned
	var buf bytes.Buffer
	if err := copyBody(respBody, &buf); err != nil {
		return nil, fmt.Errorf("failed to copy response body: %w", err)
	}

	// A cache write failure is not fatal: the freshly fetched data is still returned
	if err := saveSocketsToCache(cacheFilePath, config.ApiCacheDirectory, buf.Bytes()); err != nil {
		log.Printf("failed to save fetched sockets to cache: %v", err)
	}

	return io.NopCloser(bytes.NewReader(buf.Bytes())), nil
}
// loadFreshSockets fetches fresh sockets from the remote source.
//
// It issues a GET request to config.Source, adding the custom header from
// config.ApiHeaderName / config.ApiHeaderValue when both are non-empty.
// On an HTTP 200 response the still-open response body is returned and the
// caller must close it. Any other status code, a request construction failure
// or a transport failure results in a nil reader and an error.
//
// NOTE(review): the HTTP client is created without a timeout, so a stalled
// server can block this call indefinitely — consider wiring in a configured
// timeout.
func loadFreshSockets(config *config.Config) (io.ReadCloser, error) {
	req, err := http.NewRequest(http.MethodGet, config.Source, nil)
	if err != nil {
		return nil, fmt.Errorf("failed to create HTTP request: %w", err)
	}

	req.Header.Set("Content-Type", "application/json")
	if config.ApiHeaderName != "" && config.ApiHeaderValue != "" {
		req.Header.Set(config.ApiHeaderName, config.ApiHeaderValue)
	}

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("network request failed: %w", err)
	}

	if resp.StatusCode != http.StatusOK {
		// The body is not handed to the caller on this path, so it has to be
		// closed here to release the underlying connection. The close error
		// is irrelevant because the request is reported as failed anyway.
		_ = resp.Body.Close()
		return nil, fmt.Errorf("failed to fetch sockets from remote source --- got %d (%s)", resp.StatusCode, resp.Status)
	}

	return resp.Body, nil
}
package socket
import (
"encoding/json"
"fmt"
"io"
"log"
"go.vxn.dev/dish/pkg/config"
)
// Result describes the outcome of checking a single Socket.
type Result struct {
// Socket is the socket the check was run against.
Socket Socket
// Passed is set to true when the check succeeded.
Passed bool
// ResponseCode is presumably the HTTP status code received from the endpoint
// for HTTP checks — TODO confirm against the HTTP runner.
ResponseCode int
// Error holds the reason the check could not be completed or failed, if any.
Error error
}
// SocketList is the top-level JSON document holding the sockets to be checked
// under the "sockets" key.
type SocketList struct {
// Sockets is the list of sockets decoded from the "sockets" JSON array.
Sockets []Socket `json:"sockets"`
}
// Socket describes a single endpoint to be checked, as decoded from the JSON
// socket list.
type Socket struct {
// ID is a unique identifier of the socket.
ID string `json:"id"`
// Name is the socket name, a unique snake_cased identifier.
Name string `json:"socket_name"`
// Host is the remote endpoint hostname or URL.
Host string `json:"host_name"`
// Port is the remote port used to assemble the socket.
Port int `json:"port_tcp"`
// ExpectedHTTPCodes lists the HTTP status codes expected when giving the endpoint a HEAD/GET request.
ExpectedHTTPCodes []int `json:"expected_http_code_array"`
// PathHTTP is the HTTP path to test on Host.
PathHTTP string `json:"path_http"`
}
// PrintSockets logs a header line followed by one line per socket in list,
// showing the socket's host, port and expected HTTP codes.
func PrintSockets(list *SocketList) {
	log.Println("loaded sockets:")

	for i := range list.Sockets {
		s := &list.Sockets[i]
		log.Printf("Host: %s, Port: %d, ExpectedHTTPCodes: %v", s.Host, s.Port, s.ExpectedHTTPCodes)
	}
}
// LoadSocketList reads a JSON encoded SocketList from reader and returns the
// decoded list. The reader is always closed before the function returns.
// A decoding failure is returned wrapped with additional context.
func LoadSocketList(reader io.ReadCloser) (*SocketList, error) {
	defer reader.Close()

	var list SocketList
	decoder := json.NewDecoder(reader)
	if decodeErr := decoder.Decode(&list); decodeErr != nil {
		return nil, fmt.Errorf("error decoding sockets json: %w", decodeErr)
	}
	return &list, nil
}
// FetchSocketList fetches and decodes the list of sockets to be checked.
// config.Source should be a string like '/path/filename.json', or an HTTP URL
// string; file paths are read from disk, URLs are fetched from the remote
// source.
func FetchSocketList(config *config.Config) (*SocketList, error) {
	// Pick the loader matching the kind of source
	fetch := fetchSocketsFromRemote
	if IsFilePath(config.Source) {
		fetch = fetchSocketsFromFile
	}

	reader, err := fetch(config)
	if err != nil {
		return nil, err
	}
	return LoadSocketList(reader)
}
package socket
import "regexp"
// remoteSourcePattern matches sources that start with an http:// or https://
// scheme. It is compiled once at package initialisation instead of on every
// IsFilePath call.
var remoteSourcePattern = regexp.MustCompile(`^https?://`)

// IsFilePath reports whether source should be treated as a file path.
// Anything that does not start with "http://" or "https://" (case-sensitive)
// is considered a file path, including the empty string.
func IsFilePath(source string) bool {
	return !remoteSourcePattern.MatchString(source)
}
package testhelpers
import (
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"testing"
)
// TestSocketList is a JSON encoded socket list containing a single socket.
// It is used across tests.
const TestSocketList string = `{ "sockets": [ { "id": "vxn_dev_https", "socket_name": "vxn-dev HTTPS", "host_name": "https://vxn.dev", "port_tcp": 443, "path_http": "/", "expected_http_code_array": [200] } ] }`
// TestFile writes data to a file called filename inside a fresh temporary
// directory and returns the full path of that file.
// The temporary directory, including the file, is removed automatically when
// the test using it finishes. The test is failed immediately if the file
// cannot be written.
func TestFile(t *testing.T, filename string, data []byte) string {
	t.Helper()

	target := filepath.Join(t.TempDir(), filename)
	if err := os.WriteFile(target, data, 0o600); err != nil {
		t.Fatal(err)
	}
	return target
}
// NewMockServer starts an httptest.Server that simulates an expected API endpoint.
//
// When both expectedHeaderName and expectedHeaderValue are non-empty, requests
// whose header does not carry the expected value are rejected with HTTP 403
// and a JSON error body. All other requests receive responseBody with the
// given statusCode and a JSON content type.
// The server is shut down automatically when the test completes or fails.
func NewMockServer(t *testing.T, expectedHeaderName, expectedHeaderValue, responseBody string, statusCode int) *httptest.Server {
	t.Helper()

	handler := func(w http.ResponseWriter, r *http.Request) {
		// The header is only validated when both its name and value were provided
		headerRequired := expectedHeaderName != "" && expectedHeaderValue != ""
		if headerRequired && r.Header.Get(expectedHeaderName) != expectedHeaderValue {
			http.Error(w, `{"error":"Invalid or missing header"}`, http.StatusForbidden)
			return
		}

		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(statusCode)
		w.Write([]byte(responseBody))
	}

	srv := httptest.NewServer(http.HandlerFunc(handler))
	t.Cleanup(srv.Close)

	return srv
}
package testhelpers
import (
"fmt"
"io"
"net/http"
"strings"
)
// internalServerErrorResponse is the response body returned by the
// ErrorStatusHTTPClient mock.
const internalServerErrorResponse = "internal server error"
// SuccessStatusHTTPClient is a mock HTTP client implementation which returns HTTP Success (200) status responses.
// The arguments passed to its methods are ignored.
type SuccessStatusHTTPClient struct{}

// mockedOKResponse builds a 200 response whose body contains the given text.
func mockedOKResponse(body string) *http.Response {
	resp := &http.Response{StatusCode: http.StatusOK}
	resp.Body = io.NopCloser(strings.NewReader(body))
	return resp
}

// Do returns a 200 response with the body "mocked Do response".
func (c *SuccessStatusHTTPClient) Do(req *http.Request) (*http.Response, error) {
	return mockedOKResponse("mocked Do response"), nil
}

// Get returns a 200 response with the body "mocked Get response".
func (c *SuccessStatusHTTPClient) Get(url string) (*http.Response, error) {
	return mockedOKResponse("mocked Get response"), nil
}

// Post returns a 200 response with the body "mocked Post response".
func (c *SuccessStatusHTTPClient) Post(url string, contentType string, body io.Reader) (*http.Response, error) {
	return mockedOKResponse("mocked Post response"), nil
}
// ErrorStatusHTTPClient is a mock HTTP client implementation which returns HTTP Internal Server Error (500) status responses.
// The arguments passed to its methods are ignored.
type ErrorStatusHTTPClient struct{}

// mockedServerErrorResponse builds a 500 response carrying the shared
// internal server error body.
func mockedServerErrorResponse() *http.Response {
	resp := &http.Response{StatusCode: http.StatusInternalServerError}
	resp.Body = io.NopCloser(strings.NewReader(internalServerErrorResponse))
	return resp
}

// Do returns a 500 response and a nil error.
func (e *ErrorStatusHTTPClient) Do(req *http.Request) (*http.Response, error) {
	return mockedServerErrorResponse(), nil
}

// Get returns a 500 response and a nil error.
func (e *ErrorStatusHTTPClient) Get(url string) (*http.Response, error) {
	return mockedServerErrorResponse(), nil
}

// Post returns a 500 response and a nil error.
func (e *ErrorStatusHTTPClient) Post(url, contentType string, body io.Reader) (*http.Response, error) {
	return mockedServerErrorResponse(), nil
}
// FailureHTTPClient is a mock HTTP client implementation which simulates a failure to process the given request, returning nil as the response and an error.
type FailureHTTPClient struct{}

// mockedFailure returns a nil response and an error naming the failed operation.
func mockedFailure(op string) (*http.Response, error) {
	return nil, fmt.Errorf("mocked %s error", op)
}

// Do always fails with the error "mocked Do error".
func (f *FailureHTTPClient) Do(req *http.Request) (*http.Response, error) {
	return mockedFailure("Do")
}

// Get always fails with the error "mocked Get error".
func (f *FailureHTTPClient) Get(url string) (*http.Response, error) {
	return mockedFailure("Get")
}

// Post always fails with the error "mocked Post error".
func (f *FailureHTTPClient) Post(url, contentType string, body io.Reader) (*http.Response, error) {
	return mockedFailure("Post")
}
// InvalidBodyReadCloser implements the [io.ReadCloser] interface and simulates an error when calling Read().
type InvalidBodyReadCloser struct{}

// Read never fills p; it always reports zero bytes read and an "invalid body" error.
func (i *InvalidBodyReadCloser) Read(p []byte) (n int, err error) {
	err = fmt.Errorf("invalid body")
	return 0, err
}

// Close is a no-op and always succeeds.
func (i *InvalidBodyReadCloser) Close() error {
	return nil
}
// InvalidResponseBodyHTTPClient is a mock HTTP client implementation which simulates an invalid response body to trigger an error when trying to read it.
// The arguments passed to its methods are ignored.
type InvalidResponseBodyHTTPClient struct{}

// mockedInvalidBodyResponse builds a 500 response whose body fails on every Read.
func mockedInvalidBodyResponse() *http.Response {
	resp := &http.Response{StatusCode: http.StatusInternalServerError}
	resp.Body = &InvalidBodyReadCloser{}
	return resp
}

// Do returns a 500 response with an unreadable body and a nil error.
func (i *InvalidResponseBodyHTTPClient) Do(req *http.Request) (*http.Response, error) {
	return mockedInvalidBodyResponse(), nil
}

// Get returns a 500 response with an unreadable body and a nil error.
func (i *InvalidResponseBodyHTTPClient) Get(url string) (*http.Response, error) {
	return mockedInvalidBodyResponse(), nil
}

// Post returns a 500 response with an unreadable body and a nil error.
func (i *InvalidResponseBodyHTTPClient) Post(url, contentType string, body io.Reader) (*http.Response, error) {
	return mockedInvalidBodyResponse(), nil
}