package main
import "fmt"
// printHelp writes a short usage summary to standard output.
func printHelp() {
	// Entries ending in "\n" are followed by an empty line in the output,
	// because Println appends one more newline to each entry.
	lines := []string{
		"Usage: dish [FLAGS] SOURCE\n",
		"A lightweight, one-shot socket checker\n",
		"SOURCE must be a file path leading to a JSON file with a list of sockets to be checked or a URL leading to a remote JSON API from which the list of sockets can be retrieved",
		"Use the `-h` flag for a list of available flags",
	}
	for _, line := range lines {
		fmt.Println(line)
	}
}
// Package main implements a simple, one-shot monitoring tool which checks the provided target endpoints or sockets
// and reports the results to the configured channels (if any).
package main
import (
"errors"
"flag"
"fmt"
"io"
"os"
"go.vxn.dev/dish/pkg/alert"
"go.vxn.dev/dish/pkg/config"
"go.vxn.dev/dish/pkg/logger"
)
// run performs a single dish run and returns the process exit code.
//
// Exit codes:
//   - 0: all tests passed
//   - 1: no source was provided (the help text is printed)
//   - 2: the configuration could not be loaded
//   - 3: the tests could not be run
//   - 4: at least one test failed
//
// The third parameter is ignored; configuration errors are written to stderr.
func run(fs *flag.FlagSet, args []string, _, stderr io.Writer) int {
	cfg, cfgErr := config.NewConfig(fs, args)
	switch {
	case cfgErr == nil:
		// Configuration loaded, carry on.
	case errors.Is(cfgErr, config.ErrNoSourceProvided):
		// A missing source is treated as a request for help
		printHelp()
		return 1
	default:
		fmt.Fprintln(stderr, "error loading config:", cfgErr)
		return 2
	}

	log := logger.NewConsoleLogger(cfg.Verbose, nil)
	log.Info("dish run: started")

	// Run tests on sockets
	outcome, testErr := runTests(cfg, log)
	if testErr != nil {
		log.Error(testErr)
		return 3
	}

	// Submit results and alerts
	alert.NewAlerter(log).HandleAlerts(outcome.messengerText, outcome.results, outcome.failedCount, cfg)

	if outcome.failedCount <= 0 {
		log.Info("dish run: all tests ok")
		return 0
	}

	log.Warn("dish run: some tests failed:\n", outcome.messengerText)
	return 4
}
// main runs dish with the process-wide flag set, arguments and standard streams,
// and exits with the code returned by run.
func main() {
	exitCode := run(flag.CommandLine, os.Args[1:], os.Stdout, os.Stderr)
	os.Exit(exitCode)
}
package main
import (
"fmt"
"sync"
"go.vxn.dev/dish/pkg/alert"
"go.vxn.dev/dish/pkg/config"
"go.vxn.dev/dish/pkg/logger"
"go.vxn.dev/dish/pkg/netrunner"
"go.vxn.dev/dish/pkg/socket"
)
// testResults holds the overall results of all socket checks combined.
type testResults struct {
// messengerText is the concatenated, human-readable text built from the formatted check results.
messengerText string
// results maps each socket ID to whether its check passed.
results *alert.Results
// failedCount is the number of results that did not pass or carried an error.
failedCount int
}
// fanInChannels merges the provided channels into a single output channel.
//
// Every value received from any input channel is forwarded to the returned channel.
// The returned channel is closed once all input channels have been closed and drained.
func fanInChannels(channels ...chan socket.Result) <-chan socket.Result {
	merged := make(chan socket.Result)

	var pending sync.WaitGroup
	pending.Add(len(channels))

	// forward relays everything from src to the merged channel until src is closed
	forward := func(src <-chan socket.Result) {
		defer pending.Done()
		for r := range src {
			merged <- r
		}
	}

	for _, c := range channels {
		go forward(c)
	}

	// Close the merged channel once every forwarder has finished
	go func() {
		pending.Wait()
		close(merged)
	}()

	return merged
}
// runTests fetches the socket list, checks every socket concurrently and returns the combined results.
//
// An error is returned only if the socket list cannot be loaded.
func runTests(cfg *config.Config, logger logger.Logger) (*testResults, error) {
	list, err := socket.FetchSocketList(cfg, logger)
	if err != nil {
		return nil, fmt.Errorf("error loading socket list: %w", err)
	}

	// Print loaded sockets in verbose mode
	if cfg.Verbose {
		socket.PrintSockets(list, logger)
	}

	summary := &testResults{
		results: &alert.Results{Map: make(map[string]bool)},
	}

	// Each goroutine gets its own channel, which it closes after performing its check.
	// A single shared channel would leave it unclear which goroutine should close it.
	var running sync.WaitGroup
	perSocket := make([]chan socket.Result, 0, len(list.Sockets))
	for _, sock := range list.Sockets {
		ch := make(chan socket.Result)
		perSocket = append(perSocket, ch)

		running.Add(1)
		go netrunner.RunSocketTest(sock, ch, &running, cfg, logger)
	}

	// Merge the per-socket channels into one and wait for the checks to finish
	merged := fanInChannels(perSocket...)
	running.Wait()

	// Collect results
	for res := range merged {
		failed := !res.Passed || res.Error != nil
		if failed {
			summary.failedCount++
		}
		if cfg.TextNotifySuccess || !res.Passed {
			summary.messengerText += alert.FormatMessengerText(res)
		}
		summary.results.Map[res.Socket.ID] = res.Passed
	}

	return summary, nil
}
// Package alert provides functionality to handle alert and result submission
// to different text (e.g. Telegram) and machine (e.g. webhooks) integration channels.
package alert
import (
"net/http"
"go.vxn.dev/dish/pkg/config"
"go.vxn.dev/dish/pkg/logger"
)
// alerter provides a centralized method of alerting the configured channels with the results of the performed checks
// while hiding implementation details of the channels.
type alerter struct {
// logger is handed to the notifier and used to report notification errors.
logger logger.Logger
}
// NewAlerter creates an alerter which logs through l.
// It returns nil if l is nil.
func NewAlerter(l logger.Logger) *alerter {
	if l != nil {
		return &alerter{logger: l}
	}
	return nil
}
// HandleAlerts notifies all configured channels with either the provided message (if text channel)
// or the structured results (if machine channel).
//
// It is a no-op if the receiver, results or config is nil, or if no notifier can be created.
// Errors returned by the notifier are logged, not returned.
func (a *alerter) HandleAlerts(messengerText string, results *Results, failedCount int, config *config.Config) {
	// NewAlerter returns nil when given a nil logger, so the receiver itself may be nil
	if a == nil || results == nil || config == nil {
		return
	}

	// NewNotifier returns nil when it is given a nil logger; calling its methods would panic
	notifier := NewNotifier(http.DefaultClient, config, a.logger)
	if notifier == nil {
		return
	}

	if err := notifier.SendChatNotifications(messengerText, failedCount); err != nil {
		a.logger.Errorf("some error(s) encountered when sending chat notifications: \n%v", err)
	}
	if err := notifier.SendMachineNotifications(results, failedCount); err != nil {
		a.logger.Errorf("some error(s) encountered when sending machine notifications: \n%v", err)
	}
}
package alert
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"go.vxn.dev/dish/pkg/config"
"go.vxn.dev/dish/pkg/logger"
)
// apiSender pushes check results to a remote API endpoint as JSON.
type apiSender struct {
// httpClient performs the HTTP request.
httpClient HTTPClient
// url is the validated endpoint the results are posted to.
url string
// headerName and headerValue form an optional custom header; it is only sent when both are non-empty.
headerName string
headerValue string
// verbose is copied from the config's Verbose setting.
verbose bool
// notifySuccess controls whether results are sent when no check failed.
notifySuccess bool
logger logger.Logger
}
// NewAPISender validates the remote API URL from the config and returns an apiSender configured with it.
// An error is returned if the URL is not valid.
func NewAPISender(httpClient HTTPClient, config *config.Config, logger logger.Logger) (*apiSender, error) {
	target, err := parseAndValidateURL(config.ApiURL, nil)
	if err != nil {
		return nil, err
	}

	sender := new(apiSender)
	sender.httpClient = httpClient
	sender.url = target.String()
	sender.headerName = config.ApiHeaderName
	sender.headerValue = config.ApiHeaderValue
	sender.verbose = config.Verbose
	sender.notifySuccess = config.MachineNotifySuccess
	sender.logger = logger

	return sender, nil
}
// send posts the results as JSON to the remote API.
//
// Nothing is sent if no checks failed and success notifications are disabled.
// The custom header is attached only when both its name and value are set.
func (s *apiSender) send(m *Results, failedCount int) error {
	if !s.notifySuccess && failedCount == 0 {
		s.logger.Debug("no sockets failed, nothing will be sent to remote API")
		return nil
	}

	payload, err := json.Marshal(m)
	if err != nil {
		return fmt.Errorf("failed to marshal JSON: %w", err)
	}
	s.logger.Debugf("prepared remote API data: %s", string(payload))

	// Custom header & value are mostly used for auth purposes
	var opts []func(*submitOptions)
	if s.headerName != "" && s.headerValue != "" {
		opts = append(opts, withHeader(s.headerName, s.headerValue))
	}

	res, err := handleSubmit(s.httpClient, http.MethodPost, s.url, bytes.NewReader(payload), opts...)
	if err != nil {
		return fmt.Errorf("error pushing results to remote API: %w", err)
	}

	if readErr := handleRead(res, s.logger); readErr != nil {
		return readErr
	}

	s.logger.Info("results pushed to remote API")
	return nil
}
package alert
import (
"fmt"
"go.vxn.dev/dish/pkg/socket"
)
// FormatMessengerText formats a single check result as one line of text for chat notifications.
//
// The line has the form "• host:port[path] -- status emoji", terminated by a newline.
// For failed results the error text is appended after the emoji when an error is present.
func FormatMessengerText(result socket.Result) string {
	text := fmt.Sprintf("• %s:%d", result.Socket.Host, result.Socket.Port)
	if result.Socket.PathHTTP != "" {
		text += result.Socket.PathHTTP
	}

	if result.Passed {
		return text + " -- success \u2705\n" // ✅
	}

	text += " -- failed \u274C" // ❌

	// A failed result is not guaranteed to carry an error; guard against a nil dereference
	if result.Error != nil {
		text += " -- " + result.Error.Error()
	}

	return text + "\n"
}
package alert
import (
"errors"
"io"
"net/http"
"go.vxn.dev/dish/pkg/config"
"go.vxn.dev/dish/pkg/logger"
)
// Results holds the outcome of the performed checks in the form submitted to machine channels.
type Results struct {
// Map is serialized under the "dish_results" JSON key.
Map map[string]bool `json:"dish_results"`
}
// ChatNotifier is implemented by senders which deliver a text message to a chat channel.
type ChatNotifier interface {
// send delivers the given message; the int is the number of failed checks.
send(string, int) error
}
// MachineNotifier is implemented by senders which deliver structured results to a machine channel.
type MachineNotifier interface {
// send delivers the given results; the int is the number of failed checks.
send(*Results, int) error
}
// notifier holds the configured chat and machine senders and dispatches notifications to them.
type notifier struct {
verbose bool
chatNotifiers []ChatNotifier
machineNotifiers []MachineNotifier
logger logger.Logger
}
// HTTPClient is the set of HTTP client methods required by the senders.
// http.DefaultClient is passed as an HTTPClient by HandleAlerts.
type HTTPClient interface {
Do(req *http.Request) (*http.Response, error)
Get(url string) (*http.Response, error)
Post(url string, contentType string, body io.Reader) (*http.Response, error)
}
// NewNotifier creates a new instance of notifier. Based on the flags used, it spawns new instances of ChatNotifiers (e.g. Telegram) and MachineNotifiers (e.g. Webhooks) and stores them on the notifier struct to be used for alert notifications.
func NewNotifier(httpClient HTTPClient, config *config.Config, logger logger.Logger) *notifier {
if logger == nil {
return nil
}
if config == nil {
logger.Error("nil pointer to config")
return nil
}
// Set chat integrations to be notified (e.g. Telegram)
notificationSenders := make([]ChatNotifier, 0)
// Telegram
if config.TelegramBotToken != "" && config.TelegramChatID != "" {
notificationSenders = append(notificationSenders, NewTelegramSender(httpClient, config, logger))
}
// Set machine interface integrations to be notified (e.g. Webhooks)
payloadSenders := make([]MachineNotifier, 0)
// Remote API
if config.ApiURL != "" {
apiSender, err := NewAPISender(httpClient, config, logger)
if err != nil {
logger.Error("error creating new remote API sender: ", err)
} else {
payloadSenders = append(payloadSenders, apiSender)
}
}
// Webhooks
if config.WebhookURL != "" {
webhookSender, err := NewWebhookSender(httpClient, config, logger)
if err != nil {
logger.Error("error creating new webhook sender: ", err)
} else {
payloadSenders = append(payloadSenders, webhookSender)
}
}
// Pushgateway
if config.PushgatewayURL != "" {
pgwSender, err := NewPushgatewaySender(httpClient, config, logger)
if err != nil {
logger.Error("error creating new Pushgateway sender:", err)
} else {
payloadSenders = append(payloadSenders, pgwSender)
}
}
return ¬ifier{
verbose: config.Verbose,
chatNotifiers: notificationSenders,
machineNotifiers: payloadSenders,
logger: logger,
}
}
// SendChatNotifications sends the message m to every configured chat notifier.
//
// Each failure is logged and the remaining notifiers are still tried. All failures
// are returned joined into a single error; nil is returned if there were none or
// if no chat notifiers are configured.
func (n *notifier) SendChatNotifications(m string, failedCount int) error {
	if len(n.chatNotifiers) == 0 {
		n.logger.Debug("no chat notification receivers configured, no notifications will be sent")
		return nil
	}

	var failures []error
	for _, s := range n.chatNotifiers {
		err := s.send(m, failedCount)
		if err == nil {
			continue
		}
		n.logger.Errorf("failed to send notification using %T: %v", s, err)
		failures = append(failures, err)
	}

	// errors.Join yields nil when there are no failures
	return errors.Join(failures...)
}
// SendMachineNotifications sends the results m to every configured machine notifier.
//
// Each failure is logged and the remaining notifiers are still tried. All failures
// are returned joined into a single error; nil is returned if there were none or
// if no machine notifiers are configured.
func (n *notifier) SendMachineNotifications(m *Results, failedCount int) error {
	if len(n.machineNotifiers) == 0 {
		n.logger.Debug("no machine interface payload receivers configured, no notifications will be sent")
		return nil
	}

	var failures []error
	for _, s := range n.machineNotifiers {
		err := s.send(m, failedCount)
		if err == nil {
			continue
		}
		n.logger.Errorf("failed to send notification using %T: %v", s, err)
		failures = append(failures, err)
	}

	// errors.Join yields nil when there are no failures
	return errors.Join(failures...)
}
package alert
import (
"bytes"
"fmt"
"net/http"
"text/template"
"go.vxn.dev/dish/pkg/config"
"go.vxn.dev/dish/pkg/logger"
)
const (
// jobName is the name of the Prometheus job used for dish results
jobName = "dish_results"
)
// messageTemplate is a template used for the Pushgateway message generated by the createMessage method.
// The FailedCount field of messageData is substituted into it.
var messageTemplate = `
#HELP failed sockets registered by dish
#TYPE dish_failed_count counter
dish_failed_count {{ .FailedCount }}
`
// messageData is a struct used to store Pushgateway message template variables.
type messageData struct {
// FailedCount is the number of failed checks rendered into the message.
FailedCount int
}
// pushgatewaySender pushes the number of failed checks to a Pushgateway instance.
type pushgatewaySender struct {
// httpClient performs the HTTP request.
httpClient HTTPClient
// url is the validated Pushgateway base URL.
url string
// instanceName is appended to the push URL as the instance label value.
instanceName string
// verbose is copied from the config's Verbose setting.
verbose bool
// notifySuccess controls whether a message is pushed when no check failed.
notifySuccess bool
// tmpl is the parsed messageTemplate.
tmpl *template.Template
logger logger.Logger
}
// NewPushgatewaySender validates the Pushgateway URL from the config, parses the message
// template used for pushing results and returns a pushgatewaySender built from them.
// An error is returned if the URL is invalid or the template cannot be parsed.
func NewPushgatewaySender(httpClient HTTPClient, config *config.Config, logger logger.Logger) (*pushgatewaySender, error) {
	target, err := parseAndValidateURL(config.PushgatewayURL, nil)
	if err != nil {
		return nil, err
	}

	// The template is parsed once here and reused for every push
	parsed, err := template.New("pushgatewayMessage").Parse(messageTemplate)
	if err != nil {
		return nil, fmt.Errorf("error creating Pushgateway message template: %w", err)
	}

	sender := new(pushgatewaySender)
	sender.httpClient = httpClient
	sender.url = target.String()
	sender.instanceName = config.InstanceName
	sender.verbose = config.Verbose
	sender.notifySuccess = config.MachineNotifySuccess
	sender.tmpl = parsed
	sender.logger = logger

	return sender, nil
}
// createMessage renders the Pushgateway message template with the given number of failed checks
// and returns the resulting text.
func (s *pushgatewaySender) createMessage(failedCount int) (string, error) {
	rendered := new(bytes.Buffer)
	data := messageData{FailedCount: failedCount}

	if err := s.tmpl.Execute(rendered, data); err != nil {
		return "", fmt.Errorf("error executing Pushgateway message template: %w", err)
	}

	return rendered.String(), nil
}
// send pushes the number of failed checks to Pushgateway.
//
// The first argument is only present to satisfy the MachineNotifier interface; it is ignored
// in favor of the message produced by createMessage. Nothing is sent if no checks failed and
// success notifications are disabled.
func (s *pushgatewaySender) send(_ *Results, failedCount int) error {
	if !s.notifySuccess && failedCount == 0 {
		s.logger.Debug("no sockets failed, nothing will be sent to Pushgateway")
		return nil
	}

	payload, err := s.createMessage(failedCount)
	if err != nil {
		return err
	}

	// The message is pushed under the dish job and this instance's name
	endpoint := fmt.Sprintf("%s/metrics/job/%s/instance/%s", s.url, jobName, s.instanceName)

	res, err := handleSubmit(s.httpClient, http.MethodPut, endpoint, bytes.NewReader([]byte(payload)), withContentType("application/byte"))
	if err != nil {
		return fmt.Errorf("error pushing results to Pushgateway: %w", err)
	}

	if readErr := handleRead(res, s.logger); readErr != nil {
		return readErr
	}

	s.logger.Info("results pushed to Pushgateway")
	return nil
}
package alert
import (
"fmt"
"net/http"
"net/url"
"go.vxn.dev/dish/pkg/config"
"go.vxn.dev/dish/pkg/logger"
)
const (
// baseURL is the base URL of the Telegram Bot API.
baseURL = "https://api.telegram.org"
// messageTitle is the heading prepended to every Telegram message.
messageTitle = "\U0001F4E1 <b>dish run results</b>:" // 📡
)
// telegramSender sends check results as a text message to a Telegram chat.
type telegramSender struct {
// httpClient performs the HTTP request.
httpClient HTTPClient
// chatID identifies the chat/channel the message is sent to.
chatID string
// token is the bot token; it is embedded in the request URL.
token string
// verbose is copied from the config's Verbose setting.
verbose bool
// notifySuccess controls whether a message is sent when no check failed.
notifySuccess bool
logger logger.Logger
}
// NewTelegramSender returns a telegramSender populated with the Telegram settings from the config.
func NewTelegramSender(httpClient HTTPClient, config *config.Config, logger logger.Logger) *telegramSender {
	sender := new(telegramSender)
	sender.httpClient = httpClient
	sender.chatID = config.TelegramChatID
	sender.token = config.TelegramBotToken
	sender.verbose = config.Verbose
	sender.notifySuccess = config.TextNotifySuccess
	sender.logger = logger
	return sender
}
// send delivers rawMessage, prefixed with the message title, to the configured Telegram chat
// using the sendMessage method of the Bot API.
//
// Nothing is sent if no checks failed and success notifications are disabled.
func (s *telegramSender) send(rawMessage string, failedCount int) error {
	if !s.notifySuccess && failedCount == 0 {
		s.logger.Debug("no sockets failed, nothing will be sent to Telegram")
		return nil
	}

	// All message parameters travel in the query string
	query := url.Values{
		"chat_id":                  {s.chatID},
		"disable_web_page_preview": {"true"},
		"parse_mode":               {"HTML"},
		"text":                     {messageTitle + "\n\n" + rawMessage},
	}
	endpoint := baseURL + "/bot" + s.token + "/sendMessage?" + query.Encode()

	res, err := handleSubmit(s.httpClient, http.MethodGet, endpoint, nil)
	if err != nil {
		return fmt.Errorf("error submitting Telegram alert: %w", err)
	}

	if readErr := handleRead(res, s.logger); readErr != nil {
		return readErr
	}

	s.logger.Info("Telegram alert sent")
	return nil
}
package alert
import (
"fmt"
"io"
"net/http"
"go.vxn.dev/dish/pkg/logger"
)
// submitOptions carries the optional settings applied by handleSubmit to an outgoing HTTP request.
type submitOptions struct {
	// contentType is the value of the Content-Type header.
	contentType string
	// headers are additional header key:value pairs.
	headers map[string]string
}

// withContentType returns an option which sets the Content-Type header value to contentType.
func withContentType(contentType string) func(*submitOptions) {
	return func(o *submitOptions) { o.contentType = contentType }
}

// withHeader returns an option which adds the key:value pair to the request's HTTP headers.
// The headers map is created on first use if the options do not have one yet.
func withHeader(key string, value string) func(*submitOptions) {
	return func(o *submitOptions) {
		if o.headers == nil {
			o.headers = map[string]string{key: value}
			return
		}
		o.headers[key] = value
	}
}
// handleSubmit builds an HTTP request with the given method, url and body (nil if no body is
// required), sends it using client and returns the response.
//
// The Content-Type header defaults to application/json; use withContentType to override it.
// Additional headers can be supplied with withHeader and are applied after Content-Type.
func handleSubmit(client HTTPClient, method string, url string, body io.Reader, opts ...func(*submitOptions)) (*http.Response, error) {
	// Start from the defaults and let each provided option adjust them
	settings := submitOptions{
		contentType: "application/json",
		headers:     map[string]string{},
	}
	for i := range opts {
		opts[i](&settings)
	}

	req, err := http.NewRequest(method, url, body)
	if err != nil {
		return nil, err
	}

	req.Header.Set("Content-Type", settings.contentType)
	for name, value := range settings.headers {
		req.Header.Set(name, value)
	}

	res, err := client.Do(req)
	if err != nil {
		return nil, err
	}

	return res, nil
}
// handleRead closes the response body and verifies that the status code is within the
// expected <200, 299> range.
//
// For any other status code the response body is logged as a warning (or, if it cannot be
// read, the read error is logged) and an error describing the unexpected code is returned.
func handleRead(res *http.Response, logger logger.Logger) error {
	defer res.Body.Close()

	if res.StatusCode >= http.StatusOK && res.StatusCode < http.StatusMultipleChoices {
		return nil
	}

	body, err := io.ReadAll(res.Body)
	if err != nil {
		logger.Errorf("error reading response body: %v", err)
	} else {
		// res.Request is only populated by net/http's own client; responses from
		// other HTTPClient implementations may leave it nil.
		source := "unknown URL"
		if res.Request != nil && res.Request.URL != nil {
			source = res.Request.URL.String()
		}
		logger.Warnf("response from %s: %s", source, string(body))
	}

	return fmt.Errorf("unexpected response code received (expected: 200-299, got: %d)", res.StatusCode)
}
package alert
import (
"fmt"
"net/url"
"slices"
"strings"
)
// defaultSchemes lists the URL schemes accepted when the caller does not specify any.
var defaultSchemes = []string{"http", "https"}

// parseAndValidateURL parses rawURL and checks that it has a supported scheme and a host.
//
// supportedSchemes lists the accepted schemes; if it is nil, HTTP and HTTPS are accepted.
// An error is returned if rawURL is empty or blank, cannot be parsed, has no scheme,
// has an unsupported scheme, or has no host.
func parseAndValidateURL(rawURL string, supportedSchemes []string) (*url.URL, error) {
	if strings.TrimSpace(rawURL) == "" {
		return nil, fmt.Errorf("URL cannot be empty")
	}

	allowed := supportedSchemes
	if allowed == nil {
		allowed = defaultSchemes
	}

	u, err := url.ParseRequestURI(rawURL)
	if err != nil {
		return nil, fmt.Errorf("error parsing URL: %w", err)
	}

	if u.Scheme == "" {
		return nil, fmt.Errorf("protocol must be specified in the provided URL (e.g. https://...)")
	}
	if !slices.Contains(allowed, u.Scheme) {
		return nil, fmt.Errorf("unsupported protocol provided in URL: %s (supported protocols: %v)", u.Scheme, allowed)
	}
	if u.Host == "" {
		return nil, fmt.Errorf("URL must contain a host")
	}

	return u, nil
}
package alert
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"go.vxn.dev/dish/pkg/config"
"go.vxn.dev/dish/pkg/logger"
)
// webhookSender pushes check results to a webhook endpoint as JSON.
type webhookSender struct {
// httpClient performs the HTTP request.
httpClient HTTPClient
// url is the validated webhook endpoint.
url string
// verbose is copied from the config's Verbose setting.
verbose bool
// notifySuccess controls whether results are sent when no check failed.
notifySuccess bool
logger logger.Logger
}
// NewWebhookSender validates the webhook URL from the config and returns a webhookSender configured with it.
// An error is returned if the URL is not valid.
func NewWebhookSender(httpClient HTTPClient, config *config.Config, logger logger.Logger) (*webhookSender, error) {
	target, err := parseAndValidateURL(config.WebhookURL, nil)
	if err != nil {
		return nil, err
	}

	sender := new(webhookSender)
	sender.httpClient = httpClient
	sender.url = target.String()
	sender.verbose = config.Verbose
	sender.notifySuccess = config.MachineNotifySuccess
	sender.logger = logger

	return sender, nil
}
// send posts the results as JSON to the webhook URL.
//
// Nothing is sent if no checks failed and success notifications are disabled.
func (s *webhookSender) send(m *Results, failedCount int) error {
	// If no checks failed and success should not be notified, there is nothing to send
	if failedCount == 0 && !s.notifySuccess {
		s.logger.Debug("no sockets failed, nothing will be sent to webhook")
		return nil
	}

	jsonData, err := json.Marshal(m)
	if err != nil {
		// Wrapped with context, consistent with the remote API sender
		return fmt.Errorf("failed to marshal JSON: %w", err)
	}

	bodyReader := bytes.NewReader(jsonData)
	s.logger.Debugf("prepared webhook data: %s", string(jsonData))

	res, err := handleSubmit(s.httpClient, http.MethodPost, s.url, bodyReader)
	if err != nil {
		return fmt.Errorf("error pushing results to webhook: %w", err)
	}

	if err := handleRead(res, s.logger); err != nil {
		return err
	}

	s.logger.Info("results pushed to webhook")
	return nil
}
// Package config provides access to configuration parameters
// set via flags or args.
package config
import (
"errors"
"flag"
"fmt"
)
// Config holds the configuration parameters.
// All fields except Source are bound to command-line flags by defineFlags.
type Config struct {
// InstanceName is the dish instance name (-name).
InstanceName string
// ApiHeaderName and ApiHeaderValue form a custom additional header for the remote API (-hname, -hvalue).
ApiHeaderName string
ApiHeaderValue string
// ApiCacheSockets toggles caching of the socket list fetched from the remote API source (-cache).
ApiCacheSockets bool
// ApiCacheDirectory is the directory used for the socket list cache (-cacheDir).
ApiCacheDirectory string
// ApiCacheTTLMinutes is the validity of the cached socket list in minutes (-cacheTTL).
ApiCacheTTLMinutes uint
// Source is the first positional argument: where the socket list is loaded from.
Source string
// Verbose toggles verbose console logging (-verbose).
Verbose bool
// PushgatewayURL is the Pushgateway URL results are pushed to (-target).
PushgatewayURL string
// TelegramBotToken and TelegramChatID configure the Telegram channel (-telegramBotToken, -telegramChatID).
TelegramBotToken string
TelegramChatID string
// TimeoutSeconds is the timeout in seconds for http and tcp calls (-timeout).
TimeoutSeconds uint
// ApiURL is the API endpoint URL for pushing results (-updateURL).
ApiURL string
// WebhookURL is the URL of the webhook endpoint (-webhookURL).
WebhookURL string
// TextNotifySuccess toggles reporting of fully successful runs to text channels (-textNotifySuccess).
TextNotifySuccess bool
// MachineNotifySuccess toggles reporting of fully successful runs to machine channels (-machineNotifySuccess).
MachineNotifySuccess bool
}
// Default values of the configuration parameters, used when the corresponding flag is not provided.
const (
defaultInstanceName = "generic-dish"
defaultApiHeaderName = ""
defaultApiHeaderValue = ""
defaultApiCacheSockets = false
defaultApiCacheDir = ".cache"
// defaultApiCacheTTLMinutes is expressed in minutes.
defaultApiCacheTTLMinutes = 10
defaultVerbose = false
defaultPushgatewayURL = ""
defaultTelegramBotToken = ""
defaultTelegramChatID = ""
// defaultTimeoutSeconds is expressed in seconds.
defaultTimeoutSeconds = 10
defaultApiURL = ""
defaultWebhookURL = ""
defaultTextNotifySuccess = false
defaultMachineNotifySuccess = false
)
// ErrNoSourceProvided is returned when no source of sockets is specified.
var ErrNoSourceProvided = errors.New("no source provided")
// defineFlags registers all supported flags on fs. Each flag is bound to the matching field
// of cfg, so the parsed (or default) values end up in cfg once fs is parsed.
func defineFlags(fs *flag.FlagSet, cfg *Config) {
	stringFlags := []struct {
		target *string
		name   string
		value  string
		usage  string
	}{
		// System
		{&cfg.InstanceName, "name", defaultInstanceName, "a string, dish instance name"},
		// API socket source
		{&cfg.ApiHeaderName, "hname", defaultApiHeaderName, "a string, name of a custom additional header to be used when fetching and pushing results to the remote API (used mainly for auth purposes)"},
		{&cfg.ApiHeaderValue, "hvalue", defaultApiHeaderValue, "a string, value of the custom additional header to be used when fetching and pushing results to the remote API (used mainly for auth purposes)"},
		{&cfg.ApiCacheDirectory, "cacheDir", defaultApiCacheDir, "a string, specifies the directory used to cache the socket list fetched from the remote API source"},
		// Pushgateway
		{&cfg.PushgatewayURL, "target", defaultPushgatewayURL, "a string, result update path/URL to pushgateway, plaintext/byte output"},
		// Telegram
		{&cfg.TelegramBotToken, "telegramBotToken", defaultTelegramBotToken, "a string, Telegram bot private token"},
		{&cfg.TelegramChatID, "telegramChatID", defaultTelegramChatID, "a string, Telegram chat/channel ID"},
		// API for pushing results
		{&cfg.ApiURL, "updateURL", defaultApiURL, "a string, API endpoint URL for pushing results"},
		// Webhooks
		{&cfg.WebhookURL, "webhookURL", defaultWebhookURL, "a string, URL of webhook endpoint"},
	}
	for _, f := range stringFlags {
		fs.StringVar(f.target, f.name, f.value, f.usage)
	}

	boolFlags := []struct {
		target *bool
		name   string
		value  bool
		usage  string
	}{
		// System
		{&cfg.Verbose, "verbose", defaultVerbose, "a bool, console stdout logging toggle, output is colored unless disabled by NO_COLOR=true environment variable"},
		// Integration channels, general
		{&cfg.TextNotifySuccess, "textNotifySuccess", defaultTextNotifySuccess, "a bool, specifies whether successful checks with no failures should be reported to text channels"},
		{&cfg.MachineNotifySuccess, "machineNotifySuccess", defaultMachineNotifySuccess, "a bool, specifies whether successful checks with no failures should be reported to machine channels"},
		// API socket source
		{&cfg.ApiCacheSockets, "cache", defaultApiCacheSockets, "a bool, specifies whether to cache the socket list fetched from the remote API source"},
	}
	for _, f := range boolFlags {
		fs.BoolVar(f.target, f.name, f.value, f.usage)
	}

	uintFlags := []struct {
		target *uint
		name   string
		value  uint
		usage  string
	}{
		// System
		{&cfg.TimeoutSeconds, "timeout", defaultTimeoutSeconds, "an int, timeout in seconds for http and tcp calls"},
		// API socket source
		{&cfg.ApiCacheTTLMinutes, "cacheTTL", defaultApiCacheTTLMinutes, "an int, time duration (in minutes) for which the cached list of sockets is valid"},
	}
	for _, f := range uintFlags {
		fs.UintVar(f.target, f.name, f.value, f.usage)
	}
}
// NewConfig builds a Config from the provided flag set and arguments.
//
// Parameters for which a flag is given take the flag's value; all others keep their defaults.
// The first positional argument is stored as the source. An error is returned if fs is nil,
// if the flags cannot be parsed, or (ErrNoSourceProvided) if no positional argument is given.
func NewConfig(fs *flag.FlagSet, args []string) (*Config, error) {
	if fs == nil {
		return nil, fmt.Errorf("flagset argument cannot be nil")
	}

	cfg := &Config{
		InstanceName:       defaultInstanceName,
		ApiHeaderName:      defaultApiHeaderName,
		ApiHeaderValue:     defaultApiHeaderValue,
		ApiCacheSockets:    defaultApiCacheSockets,
		ApiCacheDirectory:  defaultApiCacheDir,
		ApiCacheTTLMinutes: defaultApiCacheTTLMinutes,
		Verbose:            defaultVerbose,
		PushgatewayURL:     defaultPushgatewayURL,
		TelegramBotToken:   defaultTelegramBotToken,
		TelegramChatID:     defaultTelegramChatID,
		TimeoutSeconds:     defaultTimeoutSeconds,
		ApiURL:             defaultApiURL,
		WebhookURL:         defaultWebhookURL,
	}
	defineFlags(fs, cfg)

	if parseErr := fs.Parse(args); parseErr != nil {
		return nil, fmt.Errorf("error parsing flags: %w", parseErr)
	}

	// The source is the first argument left over after the flags
	if fs.NArg() == 0 {
		return nil, ErrNoSourceProvided
	}
	cfg.Source = fs.Arg(0)

	return cfg, nil
}
package logger
import (
"fmt"
"io"
"log"
"os"
)
// consoleLogger logs to the output provided when instantiating it via NewConsoleLogger.
type consoleLogger struct {
// stdLogger writes the formatted messages to the output.
stdLogger *log.Logger
// logLevel is the lowest level that is still printed.
logLevel logLevel
// withColors toggles ANSI colors in the level prefix.
withColors bool
}
// defaultOut is the output used by NewConsoleLogger when none is provided.
var defaultOut = os.Stderr
// NewConsoleLogger returns a consoleLogger writing to out, or to stderr if out is nil.
//
// In verbose mode the log level is TRACE and the level prefix is colored unless the NO_COLOR
// environment variable is set to "true". Otherwise the log level is INFO and no colors are used.
func NewConsoleLogger(verbose bool, out io.Writer) *consoleLogger {
	if out == nil {
		out = defaultOut
	}

	level := INFO
	if verbose {
		level = TRACE
	}

	colored := verbose && os.Getenv("NO_COLOR") != "true"

	return &consoleLogger{
		stdLogger:  log.New(out, "", log.LstdFlags),
		logLevel:   level,
		withColors: colored,
	}
}
// log prints a message prefixed with prefix, provided that level is not below the logger's
// configured level. The values v are formatted with format if it is non-empty and with
// default formatting otherwise. A PANIC level message triggers a panic after being printed.
func (l *consoleLogger) log(level logLevel, prefix string, format string, v ...any) {
	if level < l.logLevel {
		return
	}

	var body string
	if format == "" {
		body = fmt.Sprint(v...)
	} else {
		body = fmt.Sprintf(format, v...)
	}
	msg := prefix + body

	l.stdLogger.Print(msg)

	if level == PANIC {
		panic(msg)
	}
}
// Trace logs v at the TRACE level using default formatting.
func (l *consoleLogger) Trace(v ...any) {
l.log(TRACE, TRACE.Prefix(l.withColors), "", v...)
}
// Tracef logs v at the TRACE level, formatted according to f.
func (l *consoleLogger) Tracef(f string, v ...any) {
l.log(TRACE, TRACE.Prefix(l.withColors), f, v...)
}
// Debug logs v at the DEBUG level using default formatting.
func (l *consoleLogger) Debug(v ...any) {
l.log(DEBUG, DEBUG.Prefix(l.withColors), "", v...)
}
// Debugf logs v at the DEBUG level, formatted according to f.
func (l *consoleLogger) Debugf(f string, v ...any) {
l.log(DEBUG, DEBUG.Prefix(l.withColors), f, v...)
}
// Info logs v at the INFO level using default formatting.
func (l *consoleLogger) Info(v ...any) {
l.log(INFO, INFO.Prefix(l.withColors), "", v...)
}
// Infof logs v at the INFO level, formatted according to f.
func (l *consoleLogger) Infof(f string, v ...any) {
l.log(INFO, INFO.Prefix(l.withColors), f, v...)
}
// Warn logs v at the WARN level using default formatting.
func (l *consoleLogger) Warn(v ...any) {
l.log(WARN, WARN.Prefix(l.withColors), "", v...)
}
// Warnf logs v at the WARN level, formatted according to f.
func (l *consoleLogger) Warnf(f string, v ...any) {
l.log(WARN, WARN.Prefix(l.withColors), f, v...)
}
// Error logs v at the ERROR level using default formatting.
func (l *consoleLogger) Error(v ...any) {
l.log(ERROR, ERROR.Prefix(l.withColors), "", v...)
}
// Errorf logs v at the ERROR level, formatted according to f.
func (l *consoleLogger) Errorf(f string, v ...any) {
l.log(ERROR, ERROR.Prefix(l.withColors), f, v...)
}
// Panic logs v at the PANIC level using default formatting and then panics with the logged message.
func (l *consoleLogger) Panic(v ...any) {
l.log(PANIC, PANIC.Prefix(l.withColors), "", v...)
}
// Panicf logs v at the PANIC level, formatted according to f, and then panics with the logged message.
func (l *consoleLogger) Panicf(f string, v ...any) {
l.log(PANIC, PANIC.Prefix(l.withColors), f, v...)
}
// Package logger provides a logging interface and implementations for formatted log output.
package logger
import "fmt"
// logLevel specifies a level from which logs are printed.
type logLevel int32

// Supported log levels, ordered from the most to the least verbose.
const (
	TRACE logLevel = iota
	DEBUG
	INFO
	WARN
	ERROR
	PANIC
)

// logPrefixFormat is the layout of a log prefix: color start, label, color reset.
const logPrefixFormat = "%s[ %s ]%s: "

// logColors maps each log level to the ANSI escape sequence of its color.
var logColors = map[logLevel]string{
	TRACE: "\033[34m", // Blue
	DEBUG: "\033[36m", // Cyan
	INFO:  "\033[32m", // Green
	WARN:  "\033[33m", // Yellow
	ERROR: "\033[31m", // Red
	PANIC: "\033[35m", // Magenta
}

// logLabel maps each log level to the label shown in the log prefix.
var logLabel = map[logLevel]string{
	TRACE: "TRACE",
	DEBUG: "DEBUG",
	INFO:  "INFO",
	WARN:  "WARN",
	ERROR: "ERROR",
	PANIC: "PANIC",
}

// Color returns the ANSI escape sequence of the level's color.
// The reset sequence is returned for levels without an assigned color.
func (l logLevel) Color() string {
	color, ok := logColors[l]
	if !ok {
		return "\033[0m" // Default color (reset)
	}
	return color
}

// Prefix returns the prefix printed in front of messages of this level, optionally wrapped
// in the level's color. Levels without a label get an uncolored "UNKNOWN" prefix.
func (l logLevel) Prefix(withColor bool) string {
	label, known := logLabel[l]
	if !known {
		return "[ UNKNOWN ]: "
	}

	if !withColor {
		return fmt.Sprintf(logPrefixFormat, "", label, "")
	}

	return fmt.Sprintf(logPrefixFormat, l.Color(), label, "\033[0m")
}
// Logger interface defines methods for logging at various levels.
//
// Each level has a plain variant taking the values to log and an "f" variant
// additionally taking a format string.
type Logger interface {
// Trace and Tracef log at the TRACE level.
Trace(v ...any)
Tracef(format string, v ...any)
// Debug and Debugf log at the DEBUG level.
Debug(v ...any)
Debugf(format string, v ...any)
// Info and Infof log at the INFO level.
Info(v ...any)
Infof(format string, v ...any)
// Warn and Warnf log at the WARN level.
Warn(v ...any)
Warnf(format string, v ...any)
// Error and Errorf log at the ERROR level.
Error(v ...any)
Errorf(format string, v ...any)
// Panic and Panicf log at the PANIC level; the console implementation panics after printing the message.
Panic(v ...any)
Panicf(format string, v ...any)
}
// Package netrunner provides functionality for checking the availability of sockets and/or endpoints.
// It provides tcpRunner, httpRunner and icmpRunner structs implementing the NetRunner interface, which can be used to
// run checks on the provided targets.
package netrunner
import (
"context"
"fmt"
"net"
"net/http"
"regexp"
"slices"
"strconv"
"sync"
"time"
"go.vxn.dev/dish/pkg/config"
"go.vxn.dev/dish/pkg/logger"
"go.vxn.dev/dish/pkg/socket"
)
// agentVersion is the version reported in the User-Agent header of HTTP checks.
const agentVersion = "1.11"
// RunSocketTest is intended to be invoked in a separate goroutine.
// It runs a test for the given socket, bounded by the configured timeout, and sends the
// result through the given channel. If no runner can be created for the socket, the error
// is logged using the provided logger and no result is sent.
// On return, the channel is closed and Done() is called on the WaitGroup.
func RunSocketTest(sock socket.Socket, out chan<- socket.Result, wg *sync.WaitGroup, cfg *config.Config, logger logger.Logger) {
	// Close the channel first, then signal completion
	defer func() {
		close(out)
		wg.Done()
	}()

	timeout := time.Duration(cfg.TimeoutSeconds) * time.Second
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	runner, err := NewNetRunner(sock, logger)
	if err != nil {
		logger.Errorf("failed to test socket: %v", err.Error())
		return
	}

	result := runner.RunTest(ctx, sock)
	out <- result
}
// NetRunner is used to run tests for a socket.
type NetRunner interface {
// RunTest checks the given socket and returns the result of the check.
// The provided context carries the timeout set by RunSocketTest.
RunTest(ctx context.Context, sock socket.Socket) socket.Result
}
// httpSchemePattern matches hosts which start with an explicit HTTP or HTTPS scheme.
// It is compiled once at package initialization instead of on every NewNetRunner call.
var httpSchemePattern = regexp.MustCompile("^(http|https)://")

// NewNetRunner determines the protocol used for the socket test and creates a
// new NetRunner for it.
//
// Rules for the test method determination (first matching rule applies):
//   - If socket.Host starts with 'http://' or 'https://', a HTTP runner is returned.
//   - If socket.Port is between 1 and 65535, a TCP runner is returned.
//   - If socket.Host is not empty, an ICMP runner is returned.
//   - If none of the above conditions are met, a non-nil error is returned.
func NewNetRunner(sock socket.Socket, logger logger.Logger) (NetRunner, error) {
	switch {
	case httpSchemePattern.MatchString(sock.Host):
		return &httpRunner{client: &http.Client{}, logger: logger}, nil
	case sock.Port >= 1 && sock.Port <= 65535:
		return &tcpRunner{logger: logger}, nil
	case sock.Host != "":
		return &icmpRunner{logger: logger}, nil
	}

	return nil, fmt.Errorf("no protocol could be determined from the socket %s", sock.ID)
}
// tcpRunner checks sockets by opening a TCP connection to them.
type tcpRunner struct {
	logger logger.Logger
}

// RunTest is used to test TCP sockets. It dials the socket's host and port over TCP.
// The test passes if the connection is opened without an error; the connection is
// closed again before returning.
func (runner *tcpRunner) RunTest(ctx context.Context, sock socket.Socket) socket.Result {
	address := net.JoinHostPort(sock.Host, strconv.Itoa(sock.Port))

	runner.logger.Debug("TCP runner: connect: " + address)

	var dialer net.Dialer
	conn, dialErr := dialer.DialContext(ctx, "tcp", address)
	if dialErr != nil {
		return socket.Result{Socket: sock, Error: dialErr, Passed: false}
	}
	defer conn.Close()

	return socket.Result{Socket: sock, Passed: true}
}
// httpRunner is the NetRunner implementation used for HTTP/S endpoints.
type httpRunner struct {
	// client executes the HTTP requests.
	client *http.Client
	// logger receives the debug output of the runner.
	logger logger.Logger
}
// RunTest checks an HTTP/S endpoint. It sends a GET request to the URL built
// from sock.Host, sock.Port and sock.PathHTTP, identifying itself with a
// "dish/<version>" User-Agent header.
//
// The test passes when the request completes without an error and the
// response status code is one of sock.ExpectedHTTPCodes. On a status mismatch
// the result carries both the received code and a descriptive error.
func (runner *httpRunner) RunTest(ctx context.Context, sock socket.Socket) socket.Result {
	target := sock.Host + ":" + strconv.Itoa(sock.Port) + sock.PathHTTP
	runner.logger.Debug("HTTP runner: connect:", target)

	// failed builds the result shared by all early error exits.
	failed := func(cause error) socket.Result {
		return socket.Result{Socket: sock, Passed: false, Error: cause}
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, target, nil)
	if err != nil {
		return failed(err)
	}
	req.Header.Set("User-Agent", fmt.Sprintf("dish/%s", agentVersion))

	resp, err := runner.client.Do(req)
	if err != nil {
		return failed(err)
	}
	defer resp.Body.Close()

	result := socket.Result{
		Socket:       sock,
		Passed:       slices.Contains(sock.ExpectedHTTPCodes, resp.StatusCode),
		ResponseCode: resp.StatusCode,
	}
	if !result.Passed {
		result.Error = fmt.Errorf("expected codes: %v, got %d", sock.ExpectedHTTPCodes, resp.StatusCode)
	}
	return result
}
//go:build linux || darwin
package netrunner
import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"net"
"runtime"
"syscall"
"time"
"go.vxn.dev/dish/pkg/logger"
"go.vxn.dev/dish/pkg/socket"
)
// ICMPType is the value of the type field of an ICMP message.
type ICMPType int

// ICMP message types used by the ICMP runner.
const (
	echoReply   ICMPType = 0
	echoRequest ICMPType = 8
)

const (
	// ipStripHdr is the option number passed to setsockopt at the IPPROTO_IP
	// level on darwin ("ip strip header").
	ipStripHdr = 23
	// testID is the identifier written into the echo request on darwin.
	testID = 0x1234
	// testSeq is the sequence number written into the echo request on darwin.
	testSeq = 0x0001
)
// icmpRunner is the NetRunner implementation used for ICMP echo checks.
type icmpRunner struct {
	// logger receives the debug output of the runner.
	logger logger.Logger
}
// RunTest is used to test ICMP sockets. It sends an ICMP Echo Request to the given socket using
// non-privileged ICMP and verifies the reply. The test passes if the reply is an Echo Reply that
// carries the same payload as the request.
//
// The socket host is resolved first and the first IPv4 address among the results is used. An
// error is returned if the host cannot be resolved or none of its addresses is an IPv4 address.
// When ctx carries a deadline, the remaining time is applied as the receive timeout of the
// underlying socket.
func (runner *icmpRunner) RunTest(ctx context.Context, sock socket.Socket) socket.Result {
	runner.logger.Debugf("Resolving host '%s' to an IP address", sock.Host)

	addrs, err := net.DefaultResolver.LookupIPAddr(ctx, sock.Host)
	if err != nil {
		return socket.Result{Socket: sock, Error: fmt.Errorf("failed to resolve socket host: %w", err)}
	}

	// Use the first IPv4 address. Looking only at the first result would fail for
	// dual-stack hosts whose IPv6 address happens to be listed first.
	var ip net.IP
	for _, addr := range addrs {
		if v4 := addr.IP.To4(); v4 != nil {
			ip = v4
			break
		}
	}
	if ip == nil {
		return socket.Result{Socket: sock, Error: errors.New("not a valid IPv4 address")}
	}

	sockAddr := &syscall.SockaddrInet4{Addr: [4]byte(ip)}

	// When using ICMP over DGRAM, Linux Kernel automatically sets (overwrites) and
	// validates the id, seq and checksum of each incoming and outgoing ICMP message.
	// This is largely non-documented in the linux man pages. The closest I found is:
	// - (Linux news) lwn.net/Articles/420800/
	// - (MacOS man) https://www.manpagez.com/man/4/icmp/
	// - (Third-party article) https://inc0x0.com/icmp-ip-packets-ping-manually-create-and-send-icmp-ip-packets/
	// "[...] most Linux systems use a unique identifier for every ping process, and sequence
	// number is an increasing number within that process. Windows uses a fixed identifier, which
	// varies between Windows versions, and a sequence number that is only reset at boot time."
	sysSocket, err := syscall.Socket(syscall.AF_INET, syscall.SOCK_DGRAM, syscall.IPPROTO_ICMP)
	if err != nil {
		return socket.Result{Socket: sock, Error: fmt.Errorf("failed to create a non-privileged icmp socket: %w", err)}
	}
	defer syscall.Close(sysSocket)

	if runtime.GOOS == "darwin" {
		if err := syscall.SetsockoptInt(sysSocket, syscall.IPPROTO_IP, ipStripHdr, 1); err != nil {
			return socket.Result{Socket: sock, Error: fmt.Errorf("failed to set ip strip header: %w", err)}
		}
	}

	if d, ok := ctx.Deadline(); ok {
		remaining := time.Until(d)

		// A zero timeval means "no timeout" for SO_RCVTIMEO, so a deadline that has
		// (almost) passed must not be converted into one: fail right away instead of
		// risking a receive call that blocks indefinitely.
		if remaining < time.Microsecond {
			return socket.Result{Socket: sock, Error: fmt.Errorf("deadline exceeded before sending an echo request: %w", context.DeadlineExceeded)}
		}

		// Set a socket receive timeout.
		t := syscall.NsecToTimeval(remaining.Nanoseconds())
		if err := syscall.SetsockoptTimeval(sysSocket, syscall.SOL_SOCKET, syscall.SO_RCVTIMEO, &t); err != nil {
			return socket.Result{Socket: sock, Error: fmt.Errorf("failed to set a timeout on a non-privileged icmp socket: %w", err)}
		}
	}

	payload := []byte("ICMP echo")

	// ICMP Header size is 8 bytes.
	reqBuf := make([]byte, 8+len(payload))

	// ICMP Header.
	// ID, Seq and Checksum are filled in automatically by the kernel on linux machines, not on darwin ipv4
	reqBuf[0] = byte(echoRequest) // Type: Echo
	copy(reqBuf[8:], payload)

	// Set the ID, Seq and Checksum for the darwin based machines
	if runtime.GOOS == "darwin" {
		binary.BigEndian.PutUint16(reqBuf[4:6], testID)
		binary.BigEndian.PutUint16(reqBuf[6:8], testSeq)

		// The checksum bytes are still zero here, so XOR simply stores the value.
		csum := checksum(reqBuf)
		reqBuf[2] ^= byte(csum)
		reqBuf[3] ^= byte(csum >> 8)
	}

	runner.logger.Debug("ICMP runner: send to " + ip.String())

	if err := syscall.Sendto(sysSocket, reqBuf, 0, sockAddr); err != nil {
		return socket.Result{Socket: sock, Error: fmt.Errorf("failed to send an echo request: %w", err)}
	}

	// Maximum Transmission Unit (MTU) equals 1500 bytes.
	// Recvfrom before writing to the buffer, checks its length (not capacity).
	// If the length of the buffer is too small to fit the data then it's silently truncated.
	replyBuf := make([]byte, 1500)

	runner.logger.Debug("ICMP runner: recv from " + ip.String())

	n, _, err := syscall.Recvfrom(sysSocket, replyBuf, 0)
	if err != nil {
		return socket.Result{Socket: sock, Error: fmt.Errorf("failed to receive a reply from a socket: %w", err)}
	}

	// A reply must contain at least the 8 byte ICMP header.
	if n < 8 {
		return socket.Result{Socket: sock, Error: fmt.Errorf("reply is too short: received %d bytes ", n)}
	}

	if replyBuf[0] != byte(echoReply) {
		return socket.Result{Socket: sock, Error: errors.New("received unexpected reply type")}
	}

	if !bytes.Equal(reqBuf[8:], replyBuf[8:n]) {
		return socket.Result{Socket: sock, Error: errors.New("failed to validate echo reply: payloads are not equal")}
	}

	return socket.Result{Socket: sock, Passed: true}
}
// checksum computes the internet checksum of b: the one's complement of the
// one's complement sum of its 16-bit words. The result is laid out so that its
// low byte belongs at the lower address of the checksum field.
// The algorithm follows the x/net/icmp package, which is not part of the standard library.
// https://godoc.org/golang.org/x/net/icmp
func checksum(b []byte) uint16 {
	var sum uint32

	// Consume the input two bytes at a time.
	rest := b
	for len(rest) >= 2 {
		sum += uint32(rest[0]) | uint32(rest[1])<<8
		rest = rest[2:]
	}

	// A trailing odd byte is added as is.
	if len(rest) == 1 {
		sum += uint32(rest[0])
	}

	// Fold the carries back into the lower 16 bits.
	sum = (sum >> 16) + (sum & 0xffff)
	sum += sum >> 16

	return ^uint16(sum)
}
package socket
import (
"crypto/sha1"
"encoding/hex"
"errors"
"io"
"os"
"path/filepath"
"time"
)
// ErrExpiredCache is returned by loadCachedSockets when the cache file exists
// but is older than the configured TTL.
var ErrExpiredCache = errors.New("cache file for this source is outdated")
// hashUrlToFilePath derives the cache file path for the given URL. The file
// name is the hex-encoded SHA-1 digest of the URL followed by ".json", placed
// inside cacheDir.
func hashUrlToFilePath(url string, cacheDir string) string {
	hasher := sha1.New()
	hasher.Write([]byte(url))

	name := hex.EncodeToString(hasher.Sum(nil)) + ".json"
	return filepath.Join(cacheDir, name)
}
// saveSocketsToCache writes data to filePath after making sure that the cache
// directory cacheDir exists (missing parent directories are created as well).
//
// The directory is created with mode 0o700 and the file with mode 0o600, so
// that only the owner can access the cached data. A directory needs the
// execute bit to be traversable: without it, files inside the freshly created
// directory cannot be created or opened by a non-root user.
//
// It returns the error of the directory creation or of the file write, if any.
func saveSocketsToCache(filePath string, cacheDir string, data []byte) error {
	// Make sure that cache directory exists
	if err := os.MkdirAll(cacheDir, 0o700); err != nil {
		return err
	}

	return os.WriteFile(filePath, data, 0o600)
}
// loadCachedSockets opens the cache file at filePath and checks whether it is
// still valid. The cache counts as expired when the modification time of the
// file is more than cacheTTL minutes in the past.
//
// It returns the opened file as a data stream together with the modification
// time of the cache. For an expired cache both values are still returned,
// accompanied by ErrExpiredCache, so that the caller may fall back to the
// stale data; the caller is responsible for closing the stream in either case.
// If the file cannot be inspected or opened, a nil stream, the zero time and
// the underlying error are returned.
func loadCachedSockets(filePath string, cacheTTL uint) (io.ReadCloser, time.Time, error) {
	stat, statErr := os.Stat(filePath)
	if statErr != nil {
		return nil, time.Time{}, statErr
	}

	file, openErr := os.Open(filePath)
	if openErr != nil {
		return nil, time.Time{}, openErr
	}

	modified := stat.ModTime()
	ttl := time.Duration(cacheTTL) * time.Minute

	var expiry error
	if time.Since(modified) > ttl {
		expiry = ErrExpiredCache
	}

	return file, modified, expiry
}
package socket
import (
"bytes"
"fmt"
"io"
"net/http"
"os"
"time"
"go.vxn.dev/dish/pkg/config"
"go.vxn.dev/dish/pkg/logger"
)
// fetchHandler groups the methods used to obtain the socket list, either from
// a local file or from a remote API source.
type fetchHandler struct {
	// logger receives the messages emitted while fetching.
	logger logger.Logger
}
// NewFetchHandler returns a fetchHandler that logs through l.
func NewFetchHandler(l logger.Logger) *fetchHandler {
	handler := new(fetchHandler)
	handler.logger = l
	return handler
}
// fetchSocketsFromFile opens the file at config.Source and returns it as an
// [io.ReadCloser] for reading from the stream. The caller is responsible for
// closing it. If the file cannot be opened, the error is returned as is.
func (f *fetchHandler) fetchSocketsFromFile(config *config.Config) (io.ReadCloser, error) {
	source, openErr := os.Open(config.Source)
	if openErr == nil {
		f.logger.Debugf("fetching sockets from file (%s)", config.Source)
		return source, nil
	}

	return nil, openErr
}
// copyBody reads body until EOF, appending everything to buf. The body is
// always closed before returning, whether or not the read succeeded.
func (f *fetchHandler) copyBody(body io.ReadCloser, buf *bytes.Buffer) error {
	defer body.Close()

	if _, readErr := buf.ReadFrom(body); readErr != nil {
		return readErr
	}
	return nil
}
// fetchSocketsFromRemote loads the sockets to be monitored from a remote RESTful API endpoint. It returns the response body implementing [io.ReadCloser] for reading from and closing the stream.
//
// It uses a local cache if enabled and falls back to the network if the cache is not present or expired. If the network request fails and expired cache is available, it will be used.
// Freshly fetched data is written back to the cache; a failure to do so is only logged.
//
// config.Source must be a complete URL to a remote http/s server, including:
//   - Scheme (http:// or https://)
//   - Host (domain or IP)
//   - Optional port
//   - Optional path
//   - Optional query parameters
//
// Example url: http://api.example.com:5569/stream?query=variable
func (f *fetchHandler) fetchSocketsFromRemote(config *config.Config) (io.ReadCloser, error) {
	// If we do not want to cache sockets to the file, fetch from network
	if !config.ApiCacheSockets {
		return f.loadFreshSockets(config)
	}

	cacheFilePath := hashUrlToFilePath(config.Source, config.ApiCacheDirectory)

	// If cache is enabled, try to load sockets from it first
	cachedReader, cacheTime, err := loadCachedSockets(cacheFilePath, config.ApiCacheTTLMinutes)
	if err == nil {
		// Cache is valid (not expired, no error from file read)
		f.logger.Info("socket list fetched from cache")
		return cachedReader, nil
	}

	// Cache is expired or failed to load, attempt to fetch fresh sockets
	f.logger.Warnf("cache unavailable for URL: %s (reason: %v); attempting network fetch", config.Source, err)

	respBody, fetchErr := f.loadFreshSockets(config)
	if fetchErr != nil {
		// If the fetch fails and expired cache is not available, return the fetch error
		if err != ErrExpiredCache {
			return nil, fetchErr
		}

		// If the fetch fails and expired cache is available, return the expired cache and log a warning
		f.logger.Errorf("fetching socket list from remote API at %s failed: %v.", config.Source, fetchErr)
		f.logger.Warnf("using expired cache from %s", cacheTime.Format(time.RFC3339))

		return cachedReader, nil
	}

	f.logger.Infof("socket list fetched from %s", config.Source)

	// The expired cache file was opened by loadCachedSockets but is not needed
	// anymore. Close it so that the file handle does not leak.
	if cachedReader != nil {
		if closeErr := cachedReader.Close(); closeErr != nil {
			f.logger.Warnf("failed to close expired cache file: %v", closeErr)
		}
	}

	// Buffer the body so that it can be both cached and handed to the caller
	var buf bytes.Buffer
	if err := f.copyBody(respBody, &buf); err != nil {
		return nil, fmt.Errorf("failed to copy response body: %w", err)
	}

	// Caching is best-effort: the fetched data is still returned if it fails
	if err := saveSocketsToCache(cacheFilePath, config.ApiCacheDirectory, buf.Bytes()); err != nil {
		f.logger.Warnf("failed to save fetched sockets to cache: %v", err)
	}

	return io.NopCloser(bytes.NewReader(buf.Bytes())), nil
}
// loadFreshSockets fetches fresh sockets from the remote source by sending a
// GET request to config.Source. If both config.ApiHeaderName and
// config.ApiHeaderValue are set, they are sent as an additional request header.
//
// On success (HTTP 200) the response body is returned and the caller is
// responsible for closing it. Any other status code results in an error; the
// response body is closed in that case.
func (f *fetchHandler) loadFreshSockets(config *config.Config) (io.ReadCloser, error) {
	req, err := http.NewRequest(http.MethodGet, config.Source, nil)
	if err != nil {
		return nil, fmt.Errorf("failed to create HTTP request: %w", err)
	}

	client := &http.Client{}

	req.Header.Set("Content-Type", "application/json")

	if config.ApiHeaderName != "" && config.ApiHeaderValue != "" {
		req.Header.Set(config.ApiHeaderName, config.ApiHeaderValue)
	}

	resp, err := client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("network request failed: %w", err)
	}

	if resp.StatusCode != http.StatusOK {
		// The body is not handed over to the caller on this path, so it has to
		// be closed here to release the underlying connection.
		resp.Body.Close()

		return nil, fmt.Errorf("failed to fetch sockets from remote source --- got %d (%s)", resp.StatusCode, resp.Status)
	}

	return resp.Body, nil
}
// Package socket provides functionality related to handling sockets, which is a structure
// representing the target endpoint/socket to be checked.
package socket
import (
"encoding/json"
"fmt"
"io"
"go.vxn.dev/dish/pkg/config"
"go.vxn.dev/dish/pkg/logger"
)
// Result holds the outcome of a single socket test.
type Result struct {
	// Socket is the socket that was tested.
	Socket Socket
	// Passed reports whether the test succeeded.
	Passed bool
	// ResponseCode is the HTTP status code received by an HTTP test.
	ResponseCode int
	// Error describes why the test did not pass, if available.
	Error error
}
// SocketList is the top-level JSON document holding the sockets to be checked.
type SocketList struct {
	// Sockets is decoded from the "sockets" JSON array.
	Sockets []Socket `json:"sockets"`
}
// Socket describes a single target endpoint to be checked.
type Socket struct {
	// ID is a unique identifier of the socket.
	ID string `json:"id"`

	// Name is the socket name, a unique snake_cased identifier.
	Name string `json:"socket_name"`

	// Host is the remote endpoint hostname or URL.
	Host string `json:"host_name"`

	// Port is the remote port used to assemble the socket.
	Port int `json:"port_tcp"`

	// ExpectedHTTPCodes lists the HTTP status codes expected when giving the
	// endpoint a HEAD/GET request.
	ExpectedHTTPCodes []int `json:"expected_http_code_array"`

	// PathHTTP is the HTTP path to test on Host.
	PathHTTP string `json:"path_http"`
}
// PrintSockets writes the host, port and expected HTTP codes of every socket
// in list to the debug output of logger, preceded by a heading line.
func PrintSockets(list *SocketList, logger logger.Logger) {
	logger.Debug("loaded sockets:")

	for i := range list.Sockets {
		entry := &list.Sockets[i]
		logger.Debugf("Host: %s, Port: %d, ExpectedHTTPCodes: %v", entry.Host, entry.Port, entry.ExpectedHTTPCodes)
	}
}
// LoadSocketList decodes a JSON encoded SocketList from reader. The reader is
// always closed before returning. A decoding failure is returned as a wrapped
// error together with a nil list.
func LoadSocketList(reader io.ReadCloser) (*SocketList, error) {
	defer reader.Close()

	var decoded SocketList
	decodeErr := json.NewDecoder(reader).Decode(&decoded)
	if decodeErr != nil {
		return nil, fmt.Errorf("error decoding sockets json: %w", decodeErr)
	}

	return &decoded, nil
}
// FetchSocketList fetches and decodes the list of sockets to be checked.
// config.Source should be a string like '/path/filename.json', or an HTTP URL
// string; the former is read from the local file system, the latter is fetched
// from the remote API (optionally using the local cache).
func FetchSocketList(config *config.Config, logger logger.Logger) (*SocketList, error) {
	handler := NewFetchHandler(logger)

	// Pick the fetch method matching the kind of source.
	fetch := handler.fetchSocketsFromRemote
	if IsFilePath(config.Source) {
		fetch = handler.fetchSocketsFromFile
	}

	stream, fetchErr := fetch(config)
	if fetchErr != nil {
		return nil, fetchErr
	}

	return LoadSocketList(stream)
}
package socket
import "regexp"
// remoteSourceRegexp matches sources that begin with an explicit http:// or
// https:// scheme. It is compiled once at package initialization, which also
// removes the need to discard a compilation error on every IsFilePath call.
var remoteSourceRegexp = regexp.MustCompile("^(http|https)://")

// IsFilePath checks whether input is a file path or URL. It returns false if
// source starts with 'http://' or 'https://' (the match is case-sensitive) and
// true for everything else, including an empty string.
func IsFilePath(source string) bool {
	return !remoteSourceRegexp.MatchString(source)
}