package main
import (
"context"
"fmt"
"io"
"github.com/spf13/cobra"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
"github.com/gerrowadat/nomad-botherer/internal/grpcapi"
)
// newDiffsCmd builds the "diffs" subcommand. It calls the GetDiffs RPC and
// prints either the raw response (json mode) or a human-readable summary
// followed by one entry per diff.
func newDiffsCmd(cfg *rootConfig) *cobra.Command {
	run := func(cmd *cobra.Command, _ []string) error {
		client, closeConn, err := cfg.dial()
		if err != nil {
			return err
		}
		defer closeConn()

		ctx, cancel := context.WithTimeout(context.Background(), cfg.timeout)
		defer cancel()

		resp, err := client.GetDiffs(ctx, &grpcapi.GetDiffsRequest{})
		if err != nil {
			return fmt.Errorf("GetDiffs: %w", err)
		}

		// renderText is only used when the output format is not "json".
		renderText := func(w io.Writer) {
			// Summary header.
			if count := len(resp.Diffs); count > 0 {
				fmt.Fprintf(w, "%d diff(s) detected\n", count)
			} else {
				fmt.Fprintln(w, "no diffs detected")
			}
			if ts := resp.LastCheckTime; ts != "" {
				fmt.Fprintf(w, "last check: %s\n", ts)
			}
			if sha := resp.LastCommit; sha != "" {
				fmt.Fprintf(w, "last commit: %s\n", sha)
			}

			// One blank-line-separated entry per diff; the HCL file and the
			// detail line are printed only when the server supplied them.
			for _, entry := range resp.Diffs {
				fmt.Fprintln(w)
				if entry.HclFile == "" {
					fmt.Fprintf(w, "[%s] %s\n", entry.DiffType, entry.JobId)
				} else {
					fmt.Fprintf(w, "[%s] %s (%s)\n", entry.DiffType, entry.JobId, entry.HclFile)
				}
				if entry.Detail != "" {
					fmt.Fprintf(w, " %s\n", entry.Detail)
				}
			}
		}
		return printOutput(cmd.OutOrStdout(), cfg.outFmt, resp, renderText)
	}

	return &cobra.Command{
		Use:   "diffs",
		Short: "Show current job diffs",
		RunE:  run,
	}
}
// newStatusCmd builds the "status" subcommand. It calls the GetStatus RPC and
// prints the last commit and last update time, substituting "(none)" for
// empty values in text mode.
func newStatusCmd(cfg *rootConfig) *cobra.Command {
	// orNone replaces an empty field with a visible placeholder.
	orNone := func(s string) string {
		if s == "" {
			return "(none)"
		}
		return s
	}

	run := func(cmd *cobra.Command, _ []string) error {
		client, closeConn, err := cfg.dial()
		if err != nil {
			return err
		}
		defer closeConn()

		ctx, cancel := context.WithTimeout(context.Background(), cfg.timeout)
		defer cancel()

		resp, err := client.GetStatus(ctx, &grpcapi.GetStatusRequest{})
		if err != nil {
			return fmt.Errorf("GetStatus: %w", err)
		}

		return printOutput(cmd.OutOrStdout(), cfg.outFmt, resp, func(w io.Writer) {
			fmt.Fprintf(w, "last commit: %s\n", orNone(resp.LastCommit))
			fmt.Fprintf(w, "last updated: %s\n", orNone(resp.LastUpdateTime))
		})
	}

	return &cobra.Command{
		Use:   "status",
		Short: "Show git watcher status",
		RunE:  run,
	}
}
// newRefreshCmd builds the "refresh" subcommand. It calls the TriggerRefresh
// RPC and prints the message the server returns.
func newRefreshCmd(cfg *rootConfig) *cobra.Command {
	run := func(cmd *cobra.Command, _ []string) error {
		client, closeConn, err := cfg.dial()
		if err != nil {
			return err
		}
		defer closeConn()

		ctx, cancel := context.WithTimeout(context.Background(), cfg.timeout)
		defer cancel()

		resp, err := client.TriggerRefresh(ctx, &grpcapi.TriggerRefreshRequest{})
		if err != nil {
			return fmt.Errorf("TriggerRefresh: %w", err)
		}

		renderText := func(w io.Writer) {
			fmt.Fprintln(w, resp.Message)
		}
		return printOutput(cmd.OutOrStdout(), cfg.outFmt, resp, renderText)
	}

	return &cobra.Command{
		Use:   "refresh",
		Short: "Trigger an immediate git pull and diff check",
		RunE:  run,
	}
}
// newVersionCmd builds the "version" subcommand. It calls the GetVersion RPC
// and prints the version, commit and build date reported by the server.
func newVersionCmd(cfg *rootConfig) *cobra.Command {
	run := func(cmd *cobra.Command, _ []string) error {
		client, closeConn, err := cfg.dial()
		if err != nil {
			return err
		}
		defer closeConn()

		ctx, cancel := context.WithTimeout(context.Background(), cfg.timeout)
		defer cancel()

		resp, err := client.GetVersion(ctx, &grpcapi.GetVersionRequest{})
		if err != nil {
			return fmt.Errorf("GetVersion: %w", err)
		}

		renderText := func(w io.Writer) {
			fmt.Fprintf(w, "version: %s\n", resp.Version)
			fmt.Fprintf(w, "commit: %s\n", resp.Commit)
			fmt.Fprintf(w, "build date: %s\n", resp.BuildDate)
		}
		return printOutput(cmd.OutOrStdout(), cfg.outFmt, resp, renderText)
	}

	return &cobra.Command{
		Use:   "version",
		Short: "Show the server's build version",
		RunE:  run,
	}
}
// printOutput renders a response to w.
//
// When format is "json" the proto message is serialised with protojson
// (indented, using the proto field names) and written followed by a newline.
// For any other format the caller-supplied textFn is invoked to write the
// human-readable form. Only a marshalling failure is reported as an error.
func printOutput(w io.Writer, format string, msg proto.Message, textFn func(io.Writer)) error {
	if format != "json" {
		textFn(w)
		return nil
	}

	marshaller := protojson.MarshalOptions{Indent: " ", UseProtoNames: true}
	encoded, err := marshaller.Marshal(msg)
	if err != nil {
		return fmt.Errorf("marshalling json: %w", err)
	}
	fmt.Fprintln(w, string(encoded))
	return nil
}
package main
import "os"
// main runs the nbctl root command and exits with status 1 on failure.
//
// The root command is built with SilenceErrors set, so cobra does not print
// the error returned by Execute. main therefore reports it on stderr itself;
// without this a failed command would exit non-zero with no explanation.
func main() {
	if err := newRootCmd().Execute(); err != nil {
		// Best-effort write: there is nothing useful to do if stderr fails.
		os.Stderr.WriteString("Error: " + err.Error() + "\n")
		os.Exit(1)
	}
}
package main
import (
"context"
"crypto/tls"
"fmt"
"os"
"time"
"github.com/spf13/cobra"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"github.com/gerrowadat/nomad-botherer/internal/grpcapi"
)
// version is the build version string used as the root command's Version.
// It defaults to "dev" and is injected at build time via -ldflags.
var version = "dev"
// rootConfig holds the values of the persistent (global) flags registered on
// the root command. A single instance is shared by all subcommands.
type rootConfig struct {
// server is the gRPC server address (--server / NBCTL_SERVER).
server string
// apiKey is the API key from --api-key; when empty, dial falls back to the
// NBCTL_API_KEY environment variable.
apiKey string
// timeout is the per-request timeout applied to each RPC (--timeout).
timeout time.Duration
// outFmt is the output format, "text" or "json" (--output).
outFmt string
// useTLS selects TLS transport credentials instead of plaintext (--tls).
useTLS bool
}
// newRootCmd assembles the nbctl command tree: the root command, its
// persistent flags (bound to a shared rootConfig), and the diffs, status,
// refresh and version subcommands.
func newRootCmd() *cobra.Command {
	cfg := new(rootConfig)

	root := &cobra.Command{
		Use:           "nbctl",
		Short:         "Query and control a nomad-botherer instance via gRPC",
		Version:       version,
		SilenceUsage:  true,
		SilenceErrors: true,
	}

	// Persistent flags are inherited by every subcommand.
	pf := root.PersistentFlags()
	pf.StringVarP(&cfg.server, "server", "s",
		envOrDefault("NBCTL_SERVER", "localhost:9090"),
		"gRPC server address (env: NBCTL_SERVER)")
	pf.StringVarP(&cfg.apiKey, "api-key", "k", "",
		"API key (env: NBCTL_API_KEY)")
	pf.DurationVar(&cfg.timeout, "timeout", 10*time.Second,
		"per-request timeout")
	pf.StringVarP(&cfg.outFmt, "output", "o", "text",
		"output format: text or json")
	pf.BoolVar(&cfg.useTLS, "tls", false,
		"use TLS for the gRPC connection")

	// Each constructor receives the same cfg so flag values are shared.
	constructors := []func(*rootConfig) *cobra.Command{
		newDiffsCmd,
		newStatusCmd,
		newRefreshCmd,
		newVersionCmd,
	}
	for _, build := range constructors {
		root.AddCommand(build(cfg))
	}
	return root
}
// dial resolves the API key (flag first, then NBCTL_API_KEY), creates a gRPC
// client connection to cfg.server and returns a NomadBotherer client together
// with a function that closes the connection. The caller must invoke the
// returned closer when finished. An error is returned when no API key is
// available or the client cannot be created.
func (cfg *rootConfig) dial() (grpcapi.NomadBothererClient, func(), error) {
	token := cfg.apiKey
	if token == "" {
		token = os.Getenv("NBCTL_API_KEY")
	}
	if token == "" {
		return nil, nil, fmt.Errorf("API key required: set --api-key or NBCTL_API_KEY")
	}

	// Transport security is opt-in via --tls; otherwise plaintext is used.
	var transportCreds credentials.TransportCredentials
	switch {
	case cfg.useTLS:
		transportCreds = credentials.NewTLS(&tls.Config{})
	default:
		transportCreds = insecure.NewCredentials()
	}

	dialOpts := []grpc.DialOption{
		grpc.WithTransportCredentials(transportCreds),
		grpc.WithPerRPCCredentials(bearerCreds{key: token}),
	}
	conn, err := grpc.NewClient(cfg.server, dialOpts...)
	if err != nil {
		return nil, nil, fmt.Errorf("connecting to %s: %w", cfg.server, err)
	}

	closer := func() { conn.Close() }
	return grpcapi.NewNomadBothererClient(conn), closer, nil
}
// bearerCreds is a per-RPC credential that attaches the API key to every
// outgoing call as an "authorization: Bearer <key>" metadata entry.
type bearerCreds struct{ key string }

// GetRequestMetadata returns the authorization metadata for a request. The
// context and URI arguments are not used, and the error is always nil.
func (b bearerCreds) GetRequestMetadata(_ context.Context, _ ...string) (map[string]string, error) {
	md := make(map[string]string, 1)
	md["authorization"] = "Bearer " + b.key
	return md, nil
}

// RequireTransportSecurity reports false so that the token may also be sent
// over a plaintext connection (expected to be secured at the network layer).
func (b bearerCreds) RequireTransportSecurity() bool {
	return false
}
// envOrDefault returns the value of the environment variable key, or def when
// the variable is unset or set to the empty string.
func envOrDefault(key, def string) string {
	value := os.Getenv(key)
	if value == "" {
		return def
	}
	return value
}
package main
import (
"context"
"log/slog"
"os"
"os/signal"
"syscall"
"time"
"github.com/joho/godotenv"
"github.com/gerrowadat/nomad-botherer/internal/config"
"github.com/gerrowadat/nomad-botherer/internal/gitwatch"
"github.com/gerrowadat/nomad-botherer/internal/grpcserver"
"github.com/gerrowadat/nomad-botherer/internal/nomad"
"github.com/gerrowadat/nomad-botherer/internal/server"
)
// Build metadata, injected at build time via -ldflags. The defaults below are
// what an un-stamped build reports; all three values are logged at startup and
// passed to the gRPC server as its BuildInfo.
var (
version = "dev"
commit = "unknown"
buildDate = "unknown"
)
// main wires the daemon together: it loads configuration, clones the watched
// repository, runs an initial diff check, starts the background loops
// (periodic diff, optional git/Nomad staleness checks, the git watcher and the
// optional gRPC server) and finally blocks in the HTTP server until the
// process is signalled or the server fails.
func main() {
	// .env is a local-development convenience, so a missing file is not
	// reported; any other load failure is logged as a warning only.
	if loadErr := godotenv.Load(); loadErr != nil && !os.IsNotExist(loadErr) {
		slog.Warn("Error loading .env file", "err", loadErr)
	}

	cfg, err := config.Load()
	if err != nil {
		slog.Error("Loading config", "err", err)
		os.Exit(1)
	}
	setupLogging(cfg.LogLevel)
	slog.Info("Starting nomad-botherer", "version", version, "commit", commit, "buildDate", buildDate)

	// ctx is cancelled on SIGINT/SIGTERM and stops every background loop.
	ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer cancel()

	differ, err := nomad.NewDiffer(cfg)
	if err != nil {
		slog.Error("Creating Nomad differ", "err", err)
		os.Exit(1)
	}

	// onChange runs whenever the branch HEAD advances. It closes over
	// watcher, which is assigned just below, before anything can call it.
	var watcher *gitwatch.Watcher
	onChange := func(newCommit string) {
		files, readErr := watcher.ReadHCLFiles()
		if readErr != nil {
			slog.Error("Reading HCL files from repo", "err", readErr)
			return
		}
		if checkErr := differ.Check(files, newCommit); checkErr != nil {
			slog.Error("Running diff check", "err", checkErr)
		}
	}
	watcher = gitwatch.New(cfg, onChange)

	if cloneErr := watcher.Clone(ctx); cloneErr != nil {
		slog.Error("Cloning repository", "err", cloneErr)
		os.Exit(1)
	}

	// First diff check straight after the clone.
	onChange(watcher.LastCommit())

	// Periodic diff checks, independent of git changes, to catch drift that
	// originates on the Nomad side.
	go func() {
		tick := time.NewTicker(cfg.DiffInterval)
		defer tick.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-tick.C:
			}

			head, _ := watcher.Status()
			files, readErr := watcher.ReadHCLFiles()
			if readErr != nil {
				slog.Error("Reading HCL files for periodic check", "err", readErr)
				continue
			}
			if checkErr := differ.Check(files, head); checkErr != nil {
				slog.Error("Periodic diff check failed", "err", checkErr)
			}
		}
	}()

	// Git staleness checker: requests a fetch when the last successful fetch
	// is older than MaxGitStaleness. A zero value disables it.
	if cfg.MaxGitStaleness > 0 {
		go func() {
			// Poll at half the limit, but never more often than every 10s.
			every := cfg.MaxGitStaleness / 2
			if every < 10*time.Second {
				every = 10 * time.Second
			}
			tick := time.NewTicker(every)
			defer tick.Stop()
			for {
				select {
				case <-ctx.Done():
					return
				case <-tick.C:
				}

				_, fetchedAt := watcher.Status()
				if fetchedAt.IsZero() || time.Since(fetchedAt) <= cfg.MaxGitStaleness {
					continue
				}
				slog.Info("Git repo is stale, triggering refresh", "age", time.Since(fetchedAt), "max", cfg.MaxGitStaleness)
				watcher.TriggerStale()
			}
		}()
	}

	// Nomad staleness checker: forces a diff check when the last check is
	// older than MaxNomadStaleness. A zero value disables it.
	if cfg.MaxNomadStaleness > 0 {
		go func() {
			// Poll at half the limit, but never more often than every 10s.
			every := cfg.MaxNomadStaleness / 2
			if every < 10*time.Second {
				every = 10 * time.Second
			}
			tick := time.NewTicker(every)
			defer tick.Stop()
			for {
				select {
				case <-ctx.Done():
					return
				case <-tick.C:
				}

				_, checkedAt, _ := differ.Diffs()
				if checkedAt.IsZero() || time.Since(checkedAt) <= cfg.MaxNomadStaleness {
					continue
				}
				slog.Info("Nomad state is stale, forcing diff check", "age", time.Since(checkedAt), "max", cfg.MaxNomadStaleness)

				head, _ := watcher.Status()
				files, readErr := watcher.ReadHCLFiles()
				if readErr != nil {
					slog.Error("Reading HCL files for staleness check", "err", readErr)
					continue
				}
				if checkErr := differ.ForceCheck(files, head); checkErr != nil {
					slog.Error("Staleness diff check failed", "err", checkErr)
				}
			}
		}()
	}

	// The watcher polls git and invokes onChange on new commits.
	go watcher.Run(ctx)

	// Optional gRPC server.
	if cfg.GRPCListenAddr != "" {
		if cfg.GRPCAPIKey == "" {
			slog.Error("grpc-listen-addr is set but grpc-api-key is empty; refusing to start an unauthenticated gRPC server")
			os.Exit(1)
		}

		build := grpcserver.BuildInfo{
			Version:   version,
			Commit:    commit,
			BuildDate: buildDate,
		}
		grpcSrv := grpcserver.New(cfg.GRPCAPIKey, differ, watcher, build)

		// Bind synchronously so a bind failure is fatal rather than only
		// logged from the serving goroutine.
		listener, listenErr := grpcSrv.Listen(cfg.GRPCListenAddr)
		if listenErr != nil {
			slog.Error("gRPC server failed to bind", "addr", cfg.GRPCListenAddr, "err", listenErr)
			os.Exit(1)
		}
		go func() {
			if serveErr := grpcSrv.Serve(ctx, listener); serveErr != nil {
				slog.Error("gRPC server error", "err", serveErr)
			}
		}()
	}

	// The HTTP server runs in the foreground; main returns when it does.
	httpSrv := server.New(cfg, differ, watcher, version)
	if runErr := httpSrv.Run(ctx); runErr != nil {
		slog.Error("HTTP server error", "err", runErr)
		os.Exit(1)
	}
}
// setupLogging installs a JSON slog handler writing to stderr as the default
// logger. level selects the minimum level: "debug", "warn" or "error"; any
// other value (including "info" and the empty string) selects info.
func setupLogging(level string) {
	named := map[string]slog.Level{
		"debug": slog.LevelDebug,
		"warn":  slog.LevelWarn,
		"error": slog.LevelError,
	}
	minLevel, known := named[level]
	if !known {
		minLevel = slog.LevelInfo
	}

	handler := slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{Level: minLevel})
	slog.SetDefault(slog.New(handler))
}
package config
import (
"flag"
"fmt"
"os"
"strings"
"time"
)
// Config holds every runtime setting of the daemon. Each field is populated by
// LoadFromArgs from a command-line flag whose default comes from an
// environment variable.
type Config struct {
// Git
// RepoURL is the remote git repo URL (required).
RepoURL string
// Branch is the branch to watch.
Branch string
// PollInterval is how often the repository is polled for changes.
PollInterval time.Duration
// HCLDir is the directory within the repo containing HCL job files
// (empty means the repo root).
HCLDir string
// GitToken is an HTTP token for private repos.
GitToken string
// GitSSHKeyPath is the path to an SSH private key used for git auth.
GitSSHKeyPath string
// GitSSHKeyPass is the passphrase for the SSH private key.
GitSSHKeyPass string
// Nomad
// NomadAddr is the Nomad API address.
NomadAddr string
// NomadToken is the Nomad ACL token.
NomadToken string
// NomadNamespace is the Nomad namespace.
NomadNamespace string
// Server
// ListenAddr is the HTTP listen address.
ListenAddr string
// WebhookSecret is the GitHub webhook HMAC secret.
WebhookSecret string
// WebhookPath is the HTTP path of the webhook endpoint.
WebhookPath string
// gRPC
// GRPCListenAddr is the gRPC listen address (empty disables gRPC).
GRPCListenAddr string
// GRPCAPIKey is the pre-shared API key for gRPC authentication.
GRPCAPIKey string
// Diff
// DiffInterval is how often a diff check runs regardless of git changes.
DiffInterval time.Duration
// IncludeDeadJobs treats dead Nomad jobs like running ones.
IncludeDeadJobs bool
// Job selection
// JobSelectorGlob is a glob pattern selecting jobs by name.
JobSelectorGlob string
// ManagedMetaPrefix is the prefix for job meta keys used for opt-in.
ManagedMetaPrefix string
// Staleness
// MaxGitStaleness is the maximum time since the last successful git fetch
// before a refresh is forced (0 disables).
MaxGitStaleness time.Duration
// MaxNomadStaleness is the maximum time since the last successful Nomad
// diff check before a refresh is forced (0 disables).
MaxNomadStaleness time.Duration
// Logging
// LogLevel is one of: debug, info, warn, error.
LogLevel string
}
// Load builds the configuration from the process command line, registering
// the flags on flag.CommandLine. Flag defaults fall back to environment
// variables; see LoadFromArgs.
func Load() (*Config, error) {
	args := os.Args[1:] // skip the program name
	return LoadFromArgs(flag.CommandLine, args)
}
// LoadFromArgs registers every configuration flag on fs, parses args and
// returns the resulting Config. Each flag's default is taken from the matching
// environment variable, so an explicit flag overrides the environment.
// Tests pass a fresh flag.NewFlagSet to avoid touching flag.CommandLine.
//
// An error is returned when parsing fails, when no repository URL is
// configured, or when the poll or diff interval is not strictly positive.
func LoadFromArgs(fs *flag.FlagSet, args []string) (*Config, error) {
	c := &Config{}

	// Git
	fs.StringVar(&c.RepoURL, "repo-url", envOrDefault("GIT_REPO_URL", ""), "Remote git repo URL (required)")
	fs.StringVar(&c.Branch, "branch", envOrDefault("GIT_BRANCH", "main"), "Branch to watch")
	fs.DurationVar(&c.PollInterval, "poll-interval", envDurationOrDefault("POLL_INTERVAL", 5*time.Minute), "Git poll interval (e.g. 5m, 30s)")
	fs.StringVar(&c.HCLDir, "hcl-dir", envOrDefault("HCL_DIR", ""), "Directory within repo containing HCL job files (empty = repo root)")
	fs.StringVar(&c.GitToken, "git-token", envOrDefault("GIT_TOKEN", ""), "Git HTTP token for private repos (e.g. GitHub PAT)")
	fs.StringVar(&c.GitSSHKeyPath, "git-ssh-key", envOrDefault("GIT_SSH_KEY", ""), "Path to SSH private key for git auth")
	fs.StringVar(&c.GitSSHKeyPass, "git-ssh-key-password", envOrDefault("GIT_SSH_KEY_PASSWORD", ""), "SSH private key passphrase")

	// Nomad
	fs.StringVar(&c.NomadAddr, "nomad-addr", envOrDefault("NOMAD_ADDR", "http://127.0.0.1:4646"), "Nomad API address")
	fs.StringVar(&c.NomadToken, "nomad-token", envOrDefault("NOMAD_TOKEN", ""), "Nomad ACL token")
	fs.StringVar(&c.NomadNamespace, "nomad-namespace", envOrDefault("NOMAD_NAMESPACE", "default"), "Nomad namespace")

	// HTTP server
	fs.StringVar(&c.ListenAddr, "listen-addr", envOrDefault("LISTEN_ADDR", ":8080"), "HTTP listen address")
	fs.StringVar(&c.WebhookSecret, "webhook-secret", envOrDefault("WEBHOOK_SECRET", ""), "GitHub webhook HMAC secret")
	fs.StringVar(&c.WebhookPath, "webhook-path", envOrDefault("WEBHOOK_PATH", "/webhook"), "HTTP path for webhook endpoint")

	// gRPC
	fs.StringVar(&c.GRPCListenAddr, "grpc-listen-addr", envOrDefault("GRPC_LISTEN_ADDR", ":9090"), "gRPC listen address (empty string disables gRPC)")
	fs.StringVar(&c.GRPCAPIKey, "grpc-api-key", envOrDefault("GRPC_API_KEY", ""), "Pre-shared API key for gRPC authentication (required when gRPC is enabled)")

	// Diff
	fs.DurationVar(&c.DiffInterval, "diff-interval", envDurationOrDefault("DIFF_INTERVAL", time.Minute), "How often to run a diff check regardless of git changes")
	fs.BoolVar(&c.IncludeDeadJobs, "include-dead-jobs", envBoolOrDefault("INCLUDE_DEAD_JOBS", false), "Treat dead Nomad jobs like running ones (by default dead jobs are treated as missing)")

	// Job selection
	fs.StringVar(&c.JobSelectorGlob, "job-selector-glob", envOrDefault("JOB_SELECTOR_GLOB", ""), "Glob pattern selecting jobs by name (e.g. 'myprefix-*', '*' for all). Jobs matching either this or --managed-meta-prefix are watched. Empty means no glob selection.")
	fs.StringVar(&c.ManagedMetaPrefix, "managed-meta-prefix", envOrDefault("MANAGED_META_PREFIX", "gitops"), "Prefix for job meta keys used by nomad-botherer (e.g. 'gitops' means 'gitops.managed' opts a job in). Empty disables meta-based selection.")

	// Staleness
	fs.DurationVar(&c.MaxGitStaleness, "max-git-staleness", envDurationOrDefault("MAX_GIT_STALENESS", 0), "Maximum time since last successful git fetch before forcing a refresh (0 disables)")
	fs.DurationVar(&c.MaxNomadStaleness, "max-nomad-staleness", envDurationOrDefault("MAX_NOMAD_STALENESS", 0), "Maximum time since last successful Nomad diff check before forcing a refresh (0 disables)")

	// Logging
	fs.StringVar(&c.LogLevel, "log-level", envOrDefault("LOG_LEVEL", "info"), "Log level: debug, info, warn, error")

	if err := fs.Parse(args); err != nil {
		return nil, fmt.Errorf("parsing flags: %w", err)
	}

	if c.RepoURL == "" {
		return nil, fmt.Errorf("--repo-url / GIT_REPO_URL is required")
	}

	// Both intervals are handed to time.NewTicker, which panics on a
	// non-positive duration. Reject them here with a clear message instead of
	// crashing later inside a background goroutine.
	if c.PollInterval <= 0 {
		return nil, fmt.Errorf("--poll-interval / POLL_INTERVAL must be greater than zero, got %s", c.PollInterval)
	}
	if c.DiffInterval <= 0 {
		return nil, fmt.Errorf("--diff-interval / DIFF_INTERVAL must be greater than zero, got %s", c.DiffInterval)
	}

	return c, nil
}
// envOrDefault looks up the environment variable key and returns its value.
// An unset or empty variable yields def instead.
func envOrDefault(key, def string) string {
	switch fromEnv := os.Getenv(key); fromEnv {
	case "":
		return def
	default:
		return fromEnv
	}
}
// envDurationOrDefault returns the environment variable key parsed with
// time.ParseDuration. It returns def when the variable is unset or empty.
//
// A value that does not parse also yields def, but a warning is written to
// stderr so a typo such as "5" (missing unit) is not silently ignored.
func envDurationOrDefault(key string, def time.Duration) time.Duration {
	raw := os.Getenv(key)
	if raw == "" {
		return def
	}
	d, err := time.ParseDuration(raw)
	if err != nil {
		fmt.Fprintf(os.Stderr, "warning: ignoring invalid duration %q in %s, using default %s: %v\n", raw, key, def, err)
		return def
	}
	return d
}
// envBoolOrDefault interprets the environment variable key as a boolean.
// "true", "1" and "yes" (case-insensitive) yield true; "false", "0" and "no"
// yield false. It returns def when the variable is unset or empty.
//
// Any other value also yields def, but a warning is written to stderr so a
// misspelt setting is not silently ignored.
func envBoolOrDefault(key string, def bool) bool {
	raw := os.Getenv(key)
	if raw == "" {
		return def
	}
	switch strings.ToLower(raw) {
	case "true", "1", "yes":
		return true
	case "false", "0", "no":
		return false
	}
	fmt.Fprintf(os.Stderr, "warning: ignoring invalid boolean %q in %s, using default %t\n", raw, key, def)
	return def
}
// Package gitwatch clones a remote git repo into memory and watches it for
// changes, triggering a callback whenever the watched branch advances.
package gitwatch
import (
"context"
"fmt"
"log/slog"
"strings"
"sync"
"time"
"github.com/go-git/go-billy/v5/memfs"
"github.com/go-git/go-git/v5"
"github.com/go-git/go-git/v5/plumbing"
"github.com/go-git/go-git/v5/plumbing/object"
githttp "github.com/go-git/go-git/v5/plumbing/transport/http"
gitssh "github.com/go-git/go-git/v5/plumbing/transport/ssh"
"github.com/go-git/go-git/v5/plumbing/transport"
"github.com/go-git/go-git/v5/storage/memory"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/gerrowadat/nomad-botherer/internal/config"
)
// Watcher holds a live in-memory clone of a git repo and keeps it up to date.
type Watcher struct {
// cfg supplies the repo URL, branch, poll interval, HCL dir and credentials.
cfg *config.Config
// mu guards repo, lastCommit and lastUpdate.
mu sync.RWMutex
// repo is the in-memory clone; nil until Clone has succeeded.
repo *git.Repository
// lastCommit is the most recently observed HEAD hash.
lastCommit string
// lastUpdate is when lastCommit was last refreshed by a clone or pull.
lastUpdate time.Time
// triggerCh carries requests for an immediate pull; it is created with a
// buffer of one so pending triggers coalesce.
triggerCh chan struct{}
// onChange, when non-nil, is called by pull when the HEAD commit changes.
onChange func(commit string)
// Prometheus metrics.
gitFetches prometheus.Counter
gitFetchErrors prometheus.Counter
gitLastUpdate prometheus.Gauge
staleRefreshes prometheus.Counter
}
// New returns a Watcher whose metrics are registered with the default
// Prometheus registerer. It is a convenience wrapper around NewWithRegistry.
func New(cfg *config.Config, onChange func(commit string)) *Watcher {
	registerer := prometheus.DefaultRegisterer
	return NewWithRegistry(cfg, onChange, registerer)
}
// NewWithRegistry returns a Watcher whose metrics are registered with reg.
// Tests use it with a private registry to avoid duplicate-registration
// panics. The returned Watcher has not cloned anything yet; call Clone first.
func NewWithRegistry(cfg *config.Config, onChange func(commit string), reg prometheus.Registerer) *Watcher {
	factory := promauto.With(reg)

	w := &Watcher{
		cfg:      cfg,
		onChange: onChange,
		// Buffer of one: a pending trigger absorbs any further ones.
		triggerCh: make(chan struct{}, 1),
	}

	w.gitFetches = factory.NewCounter(prometheus.CounterOpts{
		Name: "nomad_botherer_git_fetches_total",
		Help: "Total number of remote git fetch/clone attempts.",
	})
	w.gitFetchErrors = factory.NewCounter(prometheus.CounterOpts{
		Name: "nomad_botherer_git_fetch_errors_total",
		Help: "Total number of remote git fetch/clone failures.",
	})
	w.gitLastUpdate = factory.NewGauge(prometheus.GaugeOpts{
		Name: "nomad_botherer_git_last_update_timestamp_seconds",
		Help: "Unix timestamp of the most recent successful git fetch.",
	})
	w.staleRefreshes = factory.NewCounter(prometheus.CounterOpts{
		Name: "nomad_botherer_git_staleness_refreshes_total",
		Help: "Total number of git fetches triggered by the staleness check.",
	})

	return w
}
// Clone performs a fresh single-branch clone of the configured repository
// into memory and records its HEAD commit and the current time. It must be
// called before Run. The fetch counter is incremented for every attempt and
// the error counter for every failure (clone error or unreadable HEAD).
func (w *Watcher) Clone(ctx context.Context) error {
	auth, err := w.buildAuth()
	if err != nil {
		return fmt.Errorf("building git auth: %w", err)
	}

	slog.Info("Cloning repository", "url", w.cfg.RepoURL, "branch", w.cfg.Branch)
	w.gitFetches.Inc()

	cloneOpts := &git.CloneOptions{
		URL:           w.cfg.RepoURL,
		ReferenceName: plumbing.NewBranchReferenceName(w.cfg.Branch),
		SingleBranch:  true,
		Auth:          auth,
		Progress:      nil,
	}
	// Both the object store and the worktree filesystem live in memory.
	cloned, err := git.CloneContext(ctx, memory.NewStorage(), memfs.New(), cloneOpts)
	if err != nil {
		w.gitFetchErrors.Inc()
		return fmt.Errorf("cloning %s: %w", w.cfg.RepoURL, err)
	}

	head, err := headCommit(cloned)
	if err != nil {
		w.gitFetchErrors.Inc()
		return err
	}

	// Publish the new repository and its HEAD atomically for readers.
	clonedAt := time.Now()
	w.mu.Lock()
	w.repo, w.lastCommit, w.lastUpdate = cloned, head, clonedAt
	w.mu.Unlock()
	w.gitLastUpdate.Set(float64(clonedAt.Unix()))

	slog.Info("Repository cloned", "commit", head)
	return nil
}
// Run pulls on every tick of the configured poll interval and whenever a
// trigger is pending on triggerCh. It blocks until ctx is cancelled.
func (w *Watcher) Run(ctx context.Context) {
	poll := time.NewTicker(w.cfg.PollInterval)
	defer poll.Stop()

	for {
		// Wait for a reason to pull; cancellation is the only way out.
		select {
		case <-ctx.Done():
			return
		case <-w.triggerCh:
			slog.Info("Webhook trigger received, pulling")
		case <-poll.C:
		}
		w.pull(ctx)
	}
}
// Trigger requests an immediate fetch, e.g. when a webhook fires. It never
// blocks: when a trigger is already queued the new request is dropped, since
// the pending one will cause the same pull.
func (w *Watcher) Trigger() {
	var request struct{}
	select {
	case w.triggerCh <- request:
		// Queued for Run to pick up.
	default:
		// A trigger is already pending; coalesce.
	}
}
// TriggerStale schedules an immediate fetch because the repo has exceeded the
// configured maximum staleness. It increments the staleness-refresh counter
// and then delegates to Trigger, so it is non-blocking and coalesces with any
// trigger that is already pending.
func (w *Watcher) TriggerStale() {
// Counted even when the trigger below is coalesced with a pending one.
w.staleRefreshes.Inc()
w.Trigger()
}
// Ready reports whether a repository has been stored by a successful Clone.
// Until then Status returns zero values and ReadHCLFiles returns an error.
func (w *Watcher) Ready() bool {
	w.mu.RLock()
	cloned := w.repo != nil
	w.mu.RUnlock()
	return cloned
}
// Status returns the last recorded commit hash together with the time it was
// recorded. Both are zero values before the first successful Clone.
func (w *Watcher) Status() (lastCommit string, lastUpdate time.Time) {
	w.mu.RLock()
	lastCommit, lastUpdate = w.lastCommit, w.lastUpdate
	w.mu.RUnlock()
	return lastCommit, lastUpdate
}
// LastCommit returns only the last recorded commit hash (empty before the
// first successful Clone).
func (w *Watcher) LastCommit() string {
	w.mu.RLock()
	hash := w.lastCommit
	w.mu.RUnlock()
	return hash
}
// ReadHCLFiles returns a map of repo-relative path → file content for every
// file in the HEAD commit whose name ends in ".hcl" and, when HCLDir is set,
// lies under that directory. Files whose content cannot be read are logged
// and skipped. An error is returned when no repository has been cloned yet or
// the HEAD commit or its tree cannot be resolved.
func (w *Watcher) ReadHCLFiles() (map[string]string, error) {
	w.mu.RLock()
	repo := w.repo
	w.mu.RUnlock()
	if repo == nil {
		return nil, fmt.Errorf("repository not cloned yet")
	}

	// Resolve HEAD → commit → tree.
	head, err := repo.Head()
	if err != nil {
		return nil, fmt.Errorf("getting HEAD: %w", err)
	}
	tip, err := repo.CommitObject(head.Hash())
	if err != nil {
		return nil, fmt.Errorf("getting commit object: %w", err)
	}
	tree, err := tip.Tree()
	if err != nil {
		return nil, fmt.Errorf("getting commit tree: %w", err)
	}

	// An empty prefix means the whole repository is searched.
	dirPrefix := normalizeHCLDir(w.cfg.HCLDir)
	files := map[string]string{}

	walkErr := tree.Files().ForEach(func(f *object.File) error {
		inScope := dirPrefix == "" || strings.HasPrefix(f.Name, dirPrefix)
		if !strings.HasSuffix(f.Name, ".hcl") || !inScope {
			return nil
		}
		body, readErr := f.Contents()
		if readErr != nil {
			// Skip unreadable files rather than aborting the whole walk.
			slog.Warn("Could not read HCL file from git tree", "file", f.Name, "err", readErr)
			return nil
		}
		files[f.Name] = body
		return nil
	})
	if walkErr != nil {
		return nil, fmt.Errorf("walking commit tree: %w", walkErr)
	}

	slog.Debug("Read HCL files from repo", "count", len(files), "hcl_dir", w.cfg.HCLDir)
	return files, nil
}
// pull fetches the latest changes for the watched branch and calls onChange
// when the branch HEAD differs from the commit that was recorded before the
// fetch started.
//
// When the pull fails for any reason other than "already up to date", the
// repository is re-cloned from scratch. HEAD is then read from the fresh
// clone rather than from the repository handle the failed pull was using.
// Failures are logged; pull never returns an error.
func (w *Watcher) pull(ctx context.Context) {
	auth, err := w.buildAuth()
	if err != nil {
		slog.Error("Building git auth for pull", "err", err)
		return
	}

	// Snapshot the repository handle and the commit known *before* fetching.
	// Clone overwrites lastCommit, so reading it afterwards would hide a HEAD
	// change that arrived through the re-clone path.
	w.mu.RLock()
	repo := w.repo
	prev := w.lastCommit
	w.mu.RUnlock()
	if repo == nil {
		// Clone has not succeeded yet; there is nothing to pull into.
		slog.Error("Pull requested before repository was cloned")
		return
	}

	wt, err := repo.Worktree()
	if err != nil {
		slog.Error("Getting worktree", "err", err)
		return
	}

	w.gitFetches.Inc()
	err = wt.PullContext(ctx, &git.PullOptions{
		RemoteName:    "origin",
		ReferenceName: plumbing.NewBranchReferenceName(w.cfg.Branch),
		SingleBranch:  true,
		Force:         true,
		Auth:          auth,
	})
	if err != nil && err != git.NoErrAlreadyUpToDate {
		w.gitFetchErrors.Inc()
		slog.Warn("Pull failed, attempting re-clone", "err", err)
		if err2 := w.Clone(ctx); err2 != nil {
			slog.Error("Re-clone failed", "err", err2)
			return
		}
		// Clone replaced w.repo; switch to the fresh clone so HEAD below is
		// read from it and not from the repository whose pull just failed.
		w.mu.RLock()
		repo = w.repo
		w.mu.RUnlock()
	}

	commit, err := headCommit(repo)
	if err != nil {
		slog.Error("Getting HEAD after pull", "err", err)
		return
	}

	now := time.Now()
	w.mu.Lock()
	w.lastCommit = commit
	w.lastUpdate = now
	w.mu.Unlock()
	w.gitLastUpdate.Set(float64(now.Unix()))

	if commit != prev {
		slog.Info("New commit on branch", "branch", w.cfg.Branch, "commit", commit, "prev", prev)
		if w.onChange != nil {
			w.onChange(commit)
		}
	}
}
// buildAuth selects the git authentication method from the configuration.
// An SSH key path takes precedence over an HTTP token; when neither is set it
// returns a nil method (anonymous / SSH agent). An error is returned only when
// the SSH key file cannot be loaded.
func (w *Watcher) buildAuth() (transport.AuthMethod, error) {
	switch {
	case w.cfg.GitSSHKeyPath != "":
		keys, err := gitssh.NewPublicKeysFromFile("git", w.cfg.GitSSHKeyPath, w.cfg.GitSSHKeyPass)
		if err != nil {
			return nil, fmt.Errorf("loading SSH key from %s: %w", w.cfg.GitSSHKeyPath, err)
		}
		return keys, nil

	case w.cfg.GitToken != "":
		basic := &githttp.BasicAuth{
			Username: "x-token", // username is ignored by GitHub for token auth
			Password: w.cfg.GitToken,
		}
		return basic, nil

	default:
		return nil, nil // anonymous / SSH agent
	}
}
// headCommit returns the hash of repo's HEAD reference as a hex string, or a
// wrapped error when HEAD cannot be resolved.
func headCommit(repo *git.Repository) (string, error) {
	head, err := repo.Head()
	if err == nil {
		return head.Hash().String(), nil
	}
	return "", fmt.Errorf("getting HEAD: %w", err)
}
// normalizeHCLDir converts a user-supplied HCLDir value to the prefix used
// when filtering tree file paths (e.g. "jobs" → "jobs/", "" → "").
//
// Leading and trailing slashes are removed, and so are leading "./" segments:
// tree paths are repo-relative without a "./" prefix, so a value such as
// "./jobs" would otherwise produce a prefix that matches no file at all.
// The empty string and "." both mean the repo root and yield "".
func normalizeHCLDir(dir string) string {
	dir = strings.Trim(dir, "/")
	for strings.HasPrefix(dir, "./") {
		// Drop the "./" and any slashes that immediately follow it.
		dir = strings.TrimLeft(dir[2:], "/")
	}
	if dir == "" || dir == "." {
		return ""
	}
	return dir + "/"
}
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.36.11
// protoc (unknown)
// source: nomad_botherer.proto
package grpcapi
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
unsafe "unsafe"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
// GetDiffsRequest is the request message passed to the GetDiffs RPC. It
// declares no fields of its own, only the protobuf runtime bookkeeping.
type GetDiffsRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
// Reset overwrites x with an empty GetDiffsRequest and stores the message
// info for message type index 0 in its state.
func (x *GetDiffsRequest) Reset() {
*x = GetDiffsRequest{}
mi := &file_nomad_botherer_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
// String returns the string form produced by protoimpl.X.MessageStringOf.
func (x *GetDiffsRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
// ProtoMessage is an empty marker method.
func (*GetDiffsRequest) ProtoMessage() {}
// ProtoReflect returns the reflective view of x. For a non-nil x the message
// info is stored on first use; for a nil x the view comes from the message
// info directly.
func (x *GetDiffsRequest) ProtoReflect() protoreflect.Message {
mi := &file_nomad_botherer_proto_msgTypes[0]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use GetDiffsRequest.ProtoReflect.Descriptor instead.
func (*GetDiffsRequest) Descriptor() ([]byte, []int) {
return file_nomad_botherer_proto_rawDescGZIP(), []int{0}
}
// GetDiffsResponse is the response message of the GetDiffs RPC: the list of
// job diffs plus the time and commit of the last diff check.
type GetDiffsResponse struct {
state protoimpl.MessageState `protogen:"open.v1"`
// Diffs holds one entry per job diff.
Diffs []*JobDiff `protobuf:"bytes,1,rep,name=diffs,proto3" json:"diffs,omitempty"`
// RFC3339 timestamp of the last diff check.
LastCheckTime string `protobuf:"bytes,2,opt,name=last_check_time,json=lastCheckTime,proto3" json:"last_check_time,omitempty"`
// Git commit hash that was current during the last diff check.
LastCommit string `protobuf:"bytes,3,opt,name=last_commit,json=lastCommit,proto3" json:"last_commit,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
// Reset overwrites x with an empty GetDiffsResponse and stores the message
// info for message type index 1 in its state.
func (x *GetDiffsResponse) Reset() {
*x = GetDiffsResponse{}
mi := &file_nomad_botherer_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
// String returns the string form produced by protoimpl.X.MessageStringOf.
func (x *GetDiffsResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
// ProtoMessage is an empty marker method.
func (*GetDiffsResponse) ProtoMessage() {}
// ProtoReflect returns the reflective view of x. For a non-nil x the message
// info is stored on first use; for a nil x the view comes from the message
// info directly.
func (x *GetDiffsResponse) ProtoReflect() protoreflect.Message {
mi := &file_nomad_botherer_proto_msgTypes[1]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use GetDiffsResponse.ProtoReflect.Descriptor instead.
func (*GetDiffsResponse) Descriptor() ([]byte, []int) {
return file_nomad_botherer_proto_rawDescGZIP(), []int{1}
}
// GetDiffs returns x.Diffs, or nil when x is nil.
func (x *GetDiffsResponse) GetDiffs() []*JobDiff {
if x != nil {
return x.Diffs
}
return nil
}
// GetLastCheckTime returns x.LastCheckTime, or "" when x is nil.
func (x *GetDiffsResponse) GetLastCheckTime() string {
if x != nil {
return x.LastCheckTime
}
return ""
}
// GetLastCommit returns x.LastCommit, or "" when x is nil.
func (x *GetDiffsResponse) GetLastCommit() string {
if x != nil {
return x.LastCommit
}
return ""
}
// JobDiff describes a single diff entry: the job it concerns, the HCL file
// (if any), the kind of diff and a human-readable detail string.
type JobDiff struct {
state protoimpl.MessageState `protogen:"open.v1"`
// JobId identifies the job this diff concerns.
JobId string `protobuf:"bytes,1,opt,name=job_id,json=jobId,proto3" json:"job_id,omitempty"`
// Path to the HCL file; empty when diff_type is missing_from_hcl.
HclFile string `protobuf:"bytes,2,opt,name=hcl_file,json=hclFile,proto3" json:"hcl_file,omitempty"`
// One of: modified, missing_from_nomad, missing_from_hcl.
DiffType string `protobuf:"bytes,3,opt,name=diff_type,json=diffType,proto3" json:"diff_type,omitempty"`
// Human-readable summary of the diff (nomad plan output excerpt).
Detail string `protobuf:"bytes,4,opt,name=detail,proto3" json:"detail,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
// Reset overwrites x with an empty JobDiff and stores the message info for
// message type index 2 in its state.
func (x *JobDiff) Reset() {
*x = JobDiff{}
mi := &file_nomad_botherer_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
// String returns the string form produced by protoimpl.X.MessageStringOf.
func (x *JobDiff) String() string {
return protoimpl.X.MessageStringOf(x)
}
// ProtoMessage is an empty marker method.
func (*JobDiff) ProtoMessage() {}
// ProtoReflect returns the reflective view of x. For a non-nil x the message
// info is stored on first use; for a nil x the view comes from the message
// info directly.
func (x *JobDiff) ProtoReflect() protoreflect.Message {
mi := &file_nomad_botherer_proto_msgTypes[2]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use JobDiff.ProtoReflect.Descriptor instead.
func (*JobDiff) Descriptor() ([]byte, []int) {
return file_nomad_botherer_proto_rawDescGZIP(), []int{2}
}
// GetJobId returns x.JobId, or "" when x is nil.
func (x *JobDiff) GetJobId() string {
if x != nil {
return x.JobId
}
return ""
}
// GetHclFile returns x.HclFile, or "" when x is nil.
func (x *JobDiff) GetHclFile() string {
if x != nil {
return x.HclFile
}
return ""
}
// GetDiffType returns x.DiffType, or "" when x is nil.
func (x *JobDiff) GetDiffType() string {
if x != nil {
return x.DiffType
}
return ""
}
// GetDetail returns x.Detail, or "" when x is nil.
func (x *JobDiff) GetDetail() string {
if x != nil {
return x.Detail
}
return ""
}
// GetStatusRequest is the request message passed to the GetStatus RPC. It
// declares no fields of its own, only the protobuf runtime bookkeeping.
type GetStatusRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
// Reset overwrites x with an empty GetStatusRequest and stores the message
// info for message type index 3 in its state.
func (x *GetStatusRequest) Reset() {
*x = GetStatusRequest{}
mi := &file_nomad_botherer_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
// String returns the string form produced by protoimpl.X.MessageStringOf.
func (x *GetStatusRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
// ProtoMessage is an empty marker method.
func (*GetStatusRequest) ProtoMessage() {}
// ProtoReflect returns the reflective view of x. For a non-nil x the message
// info is stored on first use; for a nil x the view comes from the message
// info directly.
func (x *GetStatusRequest) ProtoReflect() protoreflect.Message {
mi := &file_nomad_botherer_proto_msgTypes[3]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use GetStatusRequest.ProtoReflect.Descriptor instead.
func (*GetStatusRequest) Descriptor() ([]byte, []int) {
return file_nomad_botherer_proto_rawDescGZIP(), []int{3}
}
type GetStatusResponse struct {
state protoimpl.MessageState `protogen:"open.v1"`
LastCommit string `protobuf:"bytes,1,opt,name=last_commit,json=lastCommit,proto3" json:"last_commit,omitempty"`
// RFC3339 timestamp of the last successful git fetch.
LastUpdateTime string `protobuf:"bytes,2,opt,name=last_update_time,json=lastUpdateTime,proto3" json:"last_update_time,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *GetStatusResponse) Reset() {
*x = GetStatusResponse{}
mi := &file_nomad_botherer_proto_msgTypes[4]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *GetStatusResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*GetStatusResponse) ProtoMessage() {}
func (x *GetStatusResponse) ProtoReflect() protoreflect.Message {
mi := &file_nomad_botherer_proto_msgTypes[4]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use GetStatusResponse.ProtoReflect.Descriptor instead.
func (*GetStatusResponse) Descriptor() ([]byte, []int) {
return file_nomad_botherer_proto_rawDescGZIP(), []int{4}
}
// GetLastCommit returns x.LastCommit, or the empty string when x is nil.
func (x *GetStatusResponse) GetLastCommit() string {
	if x == nil {
		return ""
	}
	return x.LastCommit
}
// GetLastUpdateTime returns x.LastUpdateTime, or the empty string when x is
// nil.
func (x *GetStatusResponse) GetLastUpdateTime() string {
	if x == nil {
		return ""
	}
	return x.LastUpdateTime
}
// TriggerRefreshRequest is the request message for the TriggerRefresh RPC.
// It declares no exported fields.
type TriggerRefreshRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
// Reset overwrites x with a zero TriggerRefreshRequest and then stores the
// message info for this type (entry 5 of the message-type table) into its
// state.
func (x *TriggerRefreshRequest) Reset() {
	*x = TriggerRefreshRequest{}
	info := &file_nomad_botherer_proto_msgTypes[5]
	protoimpl.X.MessageStateOf(protoimpl.Pointer(x)).StoreMessageInfo(info)
}
// String returns the text produced by protoimpl.X.MessageStringOf for x.
func (x *TriggerRefreshRequest) String() string {
	text := protoimpl.X.MessageStringOf(x)
	return text
}
// ProtoMessage has an empty body; presumably it only marks
// TriggerRefreshRequest as a protobuf message type.
func (*TriggerRefreshRequest) ProtoMessage() {}
// ProtoReflect returns a protoreflect.Message view of x. For a nil receiver
// the type's message info wraps it directly; otherwise the message state
// obtained from x is returned, after recording the message info in it when
// none has been stored yet.
func (x *TriggerRefreshRequest) ProtoReflect() protoreflect.Message {
	info := &file_nomad_botherer_proto_msgTypes[5]
	if x == nil {
		return info.MessageOf(x)
	}
	state := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
	if state.LoadMessageInfo() == nil {
		state.StoreMessageInfo(info)
	}
	return state
}
// Descriptor returns the gzip-compressed raw file descriptor together with
// the index path []int{5}.
//
// Deprecated: Use TriggerRefreshRequest.ProtoReflect.Descriptor instead.
func (*TriggerRefreshRequest) Descriptor() ([]byte, []int) {
	indexPath := []int{5}
	return file_nomad_botherer_proto_rawDescGZIP(), indexPath
}
// TriggerRefreshResponse is the message returned by the TriggerRefresh RPC.
type TriggerRefreshResponse struct {
state protoimpl.MessageState `protogen:"open.v1"`
// Free-form text returned by the server (proto field 1).
Message string `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
// Reset overwrites x with a zero TriggerRefreshResponse and then stores the
// message info for this type (entry 6 of the message-type table) into its
// state.
func (x *TriggerRefreshResponse) Reset() {
	*x = TriggerRefreshResponse{}
	info := &file_nomad_botherer_proto_msgTypes[6]
	protoimpl.X.MessageStateOf(protoimpl.Pointer(x)).StoreMessageInfo(info)
}
// String returns the text produced by protoimpl.X.MessageStringOf for x.
func (x *TriggerRefreshResponse) String() string {
	text := protoimpl.X.MessageStringOf(x)
	return text
}
// ProtoMessage has an empty body; presumably it only marks
// TriggerRefreshResponse as a protobuf message type.
func (*TriggerRefreshResponse) ProtoMessage() {}
// ProtoReflect returns a protoreflect.Message view of x. For a nil receiver
// the type's message info wraps it directly; otherwise the message state
// obtained from x is returned, after recording the message info in it when
// none has been stored yet.
func (x *TriggerRefreshResponse) ProtoReflect() protoreflect.Message {
	info := &file_nomad_botherer_proto_msgTypes[6]
	if x == nil {
		return info.MessageOf(x)
	}
	state := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
	if state.LoadMessageInfo() == nil {
		state.StoreMessageInfo(info)
	}
	return state
}
// Descriptor returns the gzip-compressed raw file descriptor together with
// the index path []int{6}.
//
// Deprecated: Use TriggerRefreshResponse.ProtoReflect.Descriptor instead.
func (*TriggerRefreshResponse) Descriptor() ([]byte, []int) {
	indexPath := []int{6}
	return file_nomad_botherer_proto_rawDescGZIP(), indexPath
}
// GetMessage returns x.Message, or the empty string when x is nil.
func (x *TriggerRefreshResponse) GetMessage() string {
	if x == nil {
		return ""
	}
	return x.Message
}
// GetVersionRequest is the request message for the GetVersion RPC.
// It declares no exported fields.
type GetVersionRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
// Reset overwrites x with a zero GetVersionRequest and then stores the
// message info for this type (entry 7 of the message-type table) into its
// state.
func (x *GetVersionRequest) Reset() {
	*x = GetVersionRequest{}
	info := &file_nomad_botherer_proto_msgTypes[7]
	protoimpl.X.MessageStateOf(protoimpl.Pointer(x)).StoreMessageInfo(info)
}
// String returns the text produced by protoimpl.X.MessageStringOf for x.
func (x *GetVersionRequest) String() string {
	text := protoimpl.X.MessageStringOf(x)
	return text
}
// ProtoMessage has an empty body; presumably it only marks GetVersionRequest
// as a protobuf message type.
func (*GetVersionRequest) ProtoMessage() {}
// ProtoReflect returns a protoreflect.Message view of x. For a nil receiver
// the type's message info wraps it directly; otherwise the message state
// obtained from x is returned, after recording the message info in it when
// none has been stored yet.
func (x *GetVersionRequest) ProtoReflect() protoreflect.Message {
	info := &file_nomad_botherer_proto_msgTypes[7]
	if x == nil {
		return info.MessageOf(x)
	}
	state := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
	if state.LoadMessageInfo() == nil {
		state.StoreMessageInfo(info)
	}
	return state
}
// Descriptor returns the gzip-compressed raw file descriptor together with
// the index path []int{7}.
//
// Deprecated: Use GetVersionRequest.ProtoReflect.Descriptor instead.
func (*GetVersionRequest) Descriptor() ([]byte, []int) {
	indexPath := []int{7}
	return file_nomad_botherer_proto_rawDescGZIP(), indexPath
}
// GetVersionResponse is the message returned by the GetVersion RPC. It
// carries three string fields: version, commit and build date.
type GetVersionResponse struct {
state protoimpl.MessageState `protogen:"open.v1"`
// Semver tag or "dev" when built without -ldflags.
Version string `protobuf:"bytes,1,opt,name=version,proto3" json:"version,omitempty"`
// Git commit hash at build time, or "unknown".
Commit string `protobuf:"bytes,2,opt,name=commit,proto3" json:"commit,omitempty"`
// RFC3339 build timestamp, or "unknown".
BuildDate string `protobuf:"bytes,3,opt,name=build_date,json=buildDate,proto3" json:"build_date,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
// Reset overwrites x with a zero GetVersionResponse and then stores the
// message info for this type (entry 8 of the message-type table) into its
// state.
func (x *GetVersionResponse) Reset() {
	*x = GetVersionResponse{}
	info := &file_nomad_botherer_proto_msgTypes[8]
	protoimpl.X.MessageStateOf(protoimpl.Pointer(x)).StoreMessageInfo(info)
}
// String returns the text produced by protoimpl.X.MessageStringOf for x.
func (x *GetVersionResponse) String() string {
	text := protoimpl.X.MessageStringOf(x)
	return text
}
// ProtoMessage has an empty body; presumably it only marks GetVersionResponse
// as a protobuf message type.
func (*GetVersionResponse) ProtoMessage() {}
// ProtoReflect returns a protoreflect.Message view of x. For a nil receiver
// the type's message info wraps it directly; otherwise the message state
// obtained from x is returned, after recording the message info in it when
// none has been stored yet.
func (x *GetVersionResponse) ProtoReflect() protoreflect.Message {
	info := &file_nomad_botherer_proto_msgTypes[8]
	if x == nil {
		return info.MessageOf(x)
	}
	state := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
	if state.LoadMessageInfo() == nil {
		state.StoreMessageInfo(info)
	}
	return state
}
// Descriptor returns the gzip-compressed raw file descriptor together with
// the index path []int{8}.
//
// Deprecated: Use GetVersionResponse.ProtoReflect.Descriptor instead.
func (*GetVersionResponse) Descriptor() ([]byte, []int) {
	indexPath := []int{8}
	return file_nomad_botherer_proto_rawDescGZIP(), indexPath
}
// GetVersion returns x.Version, or the empty string when x is nil.
func (x *GetVersionResponse) GetVersion() string {
	if x == nil {
		return ""
	}
	return x.Version
}
// GetCommit returns x.Commit, or the empty string when x is nil.
func (x *GetVersionResponse) GetCommit() string {
	if x == nil {
		return ""
	}
	return x.Commit
}
// GetBuildDate returns x.BuildDate, or the empty string when x is nil.
func (x *GetVersionResponse) GetBuildDate() string {
	if x == nil {
		return ""
	}
	return x.BuildDate
}
// File_nomad_botherer_proto is the file descriptor for nomad_botherer.proto.
// It is nil until file_nomad_botherer_proto_init assigns the built descriptor.
var File_nomad_botherer_proto protoreflect.FileDescriptor
// file_nomad_botherer_proto_rawDesc is the raw descriptor for
// nomad_botherer.proto, stored as a string constant. It is handed to the
// descriptor builder in file_nomad_botherer_proto_init and compressed on
// demand by file_nomad_botherer_proto_rawDescGZIP. The bytes must not be
// edited by hand.
const file_nomad_botherer_proto_rawDesc = "" +
"\n" +
"\x14nomad_botherer.proto\x12\x11nomad_botherer.v1\"\x11\n" +
"\x0fGetDiffsRequest\"\x8d\x01\n" +
"\x10GetDiffsResponse\x120\n" +
"\x05diffs\x18\x01 \x03(\v2\x1a.nomad_botherer.v1.JobDiffR\x05diffs\x12&\n" +
"\x0flast_check_time\x18\x02 \x01(\tR\rlastCheckTime\x12\x1f\n" +
"\vlast_commit\x18\x03 \x01(\tR\n" +
"lastCommit\"p\n" +
"\aJobDiff\x12\x15\n" +
"\x06job_id\x18\x01 \x01(\tR\x05jobId\x12\x19\n" +
"\bhcl_file\x18\x02 \x01(\tR\ahclFile\x12\x1b\n" +
"\tdiff_type\x18\x03 \x01(\tR\bdiffType\x12\x16\n" +
"\x06detail\x18\x04 \x01(\tR\x06detail\"\x12\n" +
"\x10GetStatusRequest\"^\n" +
"\x11GetStatusResponse\x12\x1f\n" +
"\vlast_commit\x18\x01 \x01(\tR\n" +
"lastCommit\x12(\n" +
"\x10last_update_time\x18\x02 \x01(\tR\x0elastUpdateTime\"\x17\n" +
"\x15TriggerRefreshRequest\"2\n" +
"\x16TriggerRefreshResponse\x12\x18\n" +
"\amessage\x18\x01 \x01(\tR\amessage\"\x13\n" +
"\x11GetVersionRequest\"e\n" +
"\x12GetVersionResponse\x12\x18\n" +
"\aversion\x18\x01 \x01(\tR\aversion\x12\x16\n" +
"\x06commit\x18\x02 \x01(\tR\x06commit\x12\x1d\n" +
"\n" +
"build_date\x18\x03 \x01(\tR\tbuildDate2\xfe\x02\n" +
"\rNomadBotherer\x12S\n" +
"\bGetDiffs\x12\".nomad_botherer.v1.GetDiffsRequest\x1a#.nomad_botherer.v1.GetDiffsResponse\x12V\n" +
"\tGetStatus\x12#.nomad_botherer.v1.GetStatusRequest\x1a$.nomad_botherer.v1.GetStatusResponse\x12e\n" +
"\x0eTriggerRefresh\x12(.nomad_botherer.v1.TriggerRefreshRequest\x1a).nomad_botherer.v1.TriggerRefreshResponse\x12Y\n" +
"\n" +
"GetVersion\x12$.nomad_botherer.v1.GetVersionRequest\x1a%.nomad_botherer.v1.GetVersionResponseB7Z5github.com/gerrowadat/nomad-botherer/internal/grpcapib\x06proto3"
// State for the lazily computed compressed descriptor: the Once guards the
// single compression run in file_nomad_botherer_proto_rawDescGZIP and the
// byte slice caches its result.
var (
file_nomad_botherer_proto_rawDescOnce sync.Once
file_nomad_botherer_proto_rawDescData []byte
)
func file_nomad_botherer_proto_rawDescGZIP() []byte {
file_nomad_botherer_proto_rawDescOnce.Do(func() {
file_nomad_botherer_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_nomad_botherer_proto_rawDesc), len(file_nomad_botherer_proto_rawDesc)))
})
return file_nomad_botherer_proto_rawDescData
}
// file_nomad_botherer_proto_msgTypes holds one protoimpl.MessageInfo per
// message (9 in total). Each message's Reset and ProtoReflect methods refer
// to their entry by index, and the slice is passed to the type builder in
// file_nomad_botherer_proto_init.
var file_nomad_botherer_proto_msgTypes = make([]protoimpl.MessageInfo, 9)
// file_nomad_botherer_proto_goTypes lists a typed nil pointer for each
// message, in index order. It is passed to the type builder in
// file_nomad_botherer_proto_init and set to nil afterwards.
var file_nomad_botherer_proto_goTypes = []any{
(*GetDiffsRequest)(nil), // 0: nomad_botherer.v1.GetDiffsRequest
(*GetDiffsResponse)(nil), // 1: nomad_botherer.v1.GetDiffsResponse
(*JobDiff)(nil), // 2: nomad_botherer.v1.JobDiff
(*GetStatusRequest)(nil), // 3: nomad_botherer.v1.GetStatusRequest
(*GetStatusResponse)(nil), // 4: nomad_botherer.v1.GetStatusResponse
(*TriggerRefreshRequest)(nil), // 5: nomad_botherer.v1.TriggerRefreshRequest
(*TriggerRefreshResponse)(nil), // 6: nomad_botherer.v1.TriggerRefreshResponse
(*GetVersionRequest)(nil), // 7: nomad_botherer.v1.GetVersionRequest
(*GetVersionResponse)(nil), // 8: nomad_botherer.v1.GetVersionResponse
}
// file_nomad_botherer_proto_depIdxs is the dependency index table: the first
// nine entries are indexes into the Go type list for field types and method
// input/output types, and the trailing five entries give the start offsets of
// the sub-lists named in their comments. It is passed to the type builder in
// file_nomad_botherer_proto_init and set to nil afterwards.
var file_nomad_botherer_proto_depIdxs = []int32{
2, // 0: nomad_botherer.v1.GetDiffsResponse.diffs:type_name -> nomad_botherer.v1.JobDiff
0, // 1: nomad_botherer.v1.NomadBotherer.GetDiffs:input_type -> nomad_botherer.v1.GetDiffsRequest
3, // 2: nomad_botherer.v1.NomadBotherer.GetStatus:input_type -> nomad_botherer.v1.GetStatusRequest
5, // 3: nomad_botherer.v1.NomadBotherer.TriggerRefresh:input_type -> nomad_botherer.v1.TriggerRefreshRequest
7, // 4: nomad_botherer.v1.NomadBotherer.GetVersion:input_type -> nomad_botherer.v1.GetVersionRequest
1, // 5: nomad_botherer.v1.NomadBotherer.GetDiffs:output_type -> nomad_botherer.v1.GetDiffsResponse
4, // 6: nomad_botherer.v1.NomadBotherer.GetStatus:output_type -> nomad_botherer.v1.GetStatusResponse
6, // 7: nomad_botherer.v1.NomadBotherer.TriggerRefresh:output_type -> nomad_botherer.v1.TriggerRefreshResponse
8, // 8: nomad_botherer.v1.NomadBotherer.GetVersion:output_type -> nomad_botherer.v1.GetVersionResponse
5, // [5:9] is the sub-list for method output_type
1, // [1:5] is the sub-list for method input_type
1, // [1:1] is the sub-list for extension type_name
1, // [1:1] is the sub-list for extension extendee
0, // [0:1] is the sub-list for field type_name
}
// init builds the descriptor and type tables for nomad_botherer.proto when
// the package is initialized.
func init() {
	file_nomad_botherer_proto_init()
}
// file_nomad_botherer_proto_init builds the file descriptor from the raw
// descriptor and the type tables, and publishes it as
// File_nomad_botherer_proto. It returns immediately if the descriptor has
// already been built. The Go type and dependency index tables are set to nil
// once the build is done.
func file_nomad_botherer_proto_init() {
	if File_nomad_botherer_proto != nil {
		return
	}
	// A local type is declared only so reflection can report this package's path.
	type x struct{}
	descBuilder := protoimpl.DescBuilder{
		GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
		RawDescriptor: unsafe.Slice(unsafe.StringData(file_nomad_botherer_proto_rawDesc), len(file_nomad_botherer_proto_rawDesc)),
		NumEnums:      0,
		NumMessages:   9,
		NumExtensions: 0,
		NumServices:   1,
	}
	built := protoimpl.TypeBuilder{
		File:              descBuilder,
		GoTypes:           file_nomad_botherer_proto_goTypes,
		DependencyIndexes: file_nomad_botherer_proto_depIdxs,
		MessageInfos:      file_nomad_botherer_proto_msgTypes,
	}.Build()
	File_nomad_botherer_proto = built.File
	file_nomad_botherer_proto_goTypes, file_nomad_botherer_proto_depIdxs = nil, nil
}
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.6.1
// - protoc (unknown)
// source: nomad_botherer.proto
package grpcapi
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.64.0 or later.
// The value is assigned to the blank identifier; only the reference to
// grpc.SupportPackageIsVersion9 matters.
const _ = grpc.SupportPackageIsVersion9
// Full method names of the NomadBotherer RPCs, in the form
// "/<service name>/<method>". The client passes them to Invoke and the
// server handlers put them in grpc.UnaryServerInfo.FullMethod.
const (
NomadBotherer_GetDiffs_FullMethodName = "/nomad_botherer.v1.NomadBotherer/GetDiffs"
NomadBotherer_GetStatus_FullMethodName = "/nomad_botherer.v1.NomadBotherer/GetStatus"
NomadBotherer_TriggerRefresh_FullMethodName = "/nomad_botherer.v1.NomadBotherer/TriggerRefresh"
NomadBotherer_GetVersion_FullMethodName = "/nomad_botherer.v1.NomadBotherer/GetVersion"
)
// NomadBothererClient is the client API for NomadBotherer service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
//
// NomadBotherer allows operators to query drift state and trigger refreshes.
//
// All four methods are unary: each takes one request message and returns one
// response message or an error.
type NomadBothererClient interface {
// GetDiffs returns the current set of job diffs detected between git and Nomad.
GetDiffs(ctx context.Context, in *GetDiffsRequest, opts ...grpc.CallOption) (*GetDiffsResponse, error)
// GetStatus returns git watcher status (last commit, last update time).
GetStatus(ctx context.Context, in *GetStatusRequest, opts ...grpc.CallOption) (*GetStatusResponse, error)
// TriggerRefresh causes an immediate git pull and diff check.
TriggerRefresh(ctx context.Context, in *TriggerRefreshRequest, opts ...grpc.CallOption) (*TriggerRefreshResponse, error)
// GetVersion returns the server build version, commit hash, and build date.
GetVersion(ctx context.Context, in *GetVersionRequest, opts ...grpc.CallOption) (*GetVersionResponse, error)
}
// nomadBothererClient is the NomadBothererClient implementation returned by
// NewNomadBothererClient; every call goes through cc.Invoke.
type nomadBothererClient struct {
cc grpc.ClientConnInterface
}
// NewNomadBothererClient returns a NomadBothererClient that issues its RPCs
// over cc.
func NewNomadBothererClient(cc grpc.ClientConnInterface) NomadBothererClient {
	client := &nomadBothererClient{cc: cc}
	return client
}
// GetDiffs invokes the GetDiffs RPC with in and returns the decoded response.
// grpc.StaticMethod() is placed ahead of the caller's options. On failure the
// response is nil and the Invoke error is returned as-is.
func (c *nomadBothererClient) GetDiffs(ctx context.Context, in *GetDiffsRequest, opts ...grpc.CallOption) (*GetDiffsResponse, error) {
	callOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
	resp := &GetDiffsResponse{}
	if err := c.cc.Invoke(ctx, NomadBotherer_GetDiffs_FullMethodName, in, resp, callOpts...); err != nil {
		return nil, err
	}
	return resp, nil
}
// GetStatus invokes the GetStatus RPC with in and returns the decoded
// response. grpc.StaticMethod() is placed ahead of the caller's options. On
// failure the response is nil and the Invoke error is returned as-is.
func (c *nomadBothererClient) GetStatus(ctx context.Context, in *GetStatusRequest, opts ...grpc.CallOption) (*GetStatusResponse, error) {
	callOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
	resp := &GetStatusResponse{}
	if err := c.cc.Invoke(ctx, NomadBotherer_GetStatus_FullMethodName, in, resp, callOpts...); err != nil {
		return nil, err
	}
	return resp, nil
}
// TriggerRefresh invokes the TriggerRefresh RPC with in and returns the
// decoded response. grpc.StaticMethod() is placed ahead of the caller's
// options. On failure the response is nil and the Invoke error is returned
// as-is.
func (c *nomadBothererClient) TriggerRefresh(ctx context.Context, in *TriggerRefreshRequest, opts ...grpc.CallOption) (*TriggerRefreshResponse, error) {
	callOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
	resp := &TriggerRefreshResponse{}
	if err := c.cc.Invoke(ctx, NomadBotherer_TriggerRefresh_FullMethodName, in, resp, callOpts...); err != nil {
		return nil, err
	}
	return resp, nil
}
// GetVersion invokes the GetVersion RPC with in and returns the decoded
// response. grpc.StaticMethod() is placed ahead of the caller's options. On
// failure the response is nil and the Invoke error is returned as-is.
func (c *nomadBothererClient) GetVersion(ctx context.Context, in *GetVersionRequest, opts ...grpc.CallOption) (*GetVersionResponse, error) {
	callOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
	resp := &GetVersionResponse{}
	if err := c.cc.Invoke(ctx, NomadBotherer_GetVersion_FullMethodName, in, resp, callOpts...); err != nil {
		return nil, err
	}
	return resp, nil
}
// NomadBothererServer is the server API for NomadBotherer service.
// All implementations must embed UnimplementedNomadBothererServer
// for forward compatibility.
//
// NomadBotherer allows operators to query drift state and trigger refreshes.
//
// The unexported mustEmbedUnimplementedNomadBothererServer method means the
// interface can only be satisfied by types that embed
// UnimplementedNomadBothererServer.
type NomadBothererServer interface {
// GetDiffs returns the current set of job diffs detected between git and Nomad.
GetDiffs(context.Context, *GetDiffsRequest) (*GetDiffsResponse, error)
// GetStatus returns git watcher status (last commit, last update time).
GetStatus(context.Context, *GetStatusRequest) (*GetStatusResponse, error)
// TriggerRefresh causes an immediate git pull and diff check.
TriggerRefresh(context.Context, *TriggerRefreshRequest) (*TriggerRefreshResponse, error)
// GetVersion returns the server build version, commit hash, and build date.
GetVersion(context.Context, *GetVersionRequest) (*GetVersionResponse, error)
mustEmbedUnimplementedNomadBothererServer()
}
// UnimplementedNomadBothererServer must be embedded to have
// forward compatible implementations.
//
// Its RPC methods all return a codes.Unimplemented status error.
//
// NOTE: this should be embedded by value instead of pointer to avoid a nil
// pointer dereference when methods are called.
type UnimplementedNomadBothererServer struct{}
// GetDiffs is the fallback implementation: it returns a nil response and a
// codes.Unimplemented status error.
func (UnimplementedNomadBothererServer) GetDiffs(context.Context, *GetDiffsRequest) (*GetDiffsResponse, error) {
	err := status.Error(codes.Unimplemented, "method GetDiffs not implemented")
	return nil, err
}
// GetStatus is the fallback implementation: it returns a nil response and a
// codes.Unimplemented status error.
func (UnimplementedNomadBothererServer) GetStatus(context.Context, *GetStatusRequest) (*GetStatusResponse, error) {
	err := status.Error(codes.Unimplemented, "method GetStatus not implemented")
	return nil, err
}
// TriggerRefresh is the fallback implementation: it returns a nil response
// and a codes.Unimplemented status error.
func (UnimplementedNomadBothererServer) TriggerRefresh(context.Context, *TriggerRefreshRequest) (*TriggerRefreshResponse, error) {
	err := status.Error(codes.Unimplemented, "method TriggerRefresh not implemented")
	return nil, err
}
// GetVersion is the fallback implementation: it returns a nil response and a
// codes.Unimplemented status error.
func (UnimplementedNomadBothererServer) GetVersion(context.Context, *GetVersionRequest) (*GetVersionResponse, error) {
	err := status.Error(codes.Unimplemented, "method GetVersion not implemented")
	return nil, err
}
// mustEmbedUnimplementedNomadBothererServer is an empty method. It supplies
// the unexported method that the NomadBothererServer interface requires.
func (UnimplementedNomadBothererServer) mustEmbedUnimplementedNomadBothererServer() {}
// testEmbeddedByValue is an empty method with a value receiver.
// RegisterNomadBothererServer calls it on the server being registered, if the
// server has it, as an initialization-time check.
func (UnimplementedNomadBothererServer) testEmbeddedByValue() {}
// UnsafeNomadBothererServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to NomadBothererServer will
// result in compilation errors.
//
// Its only method is the unexported marker that NomadBothererServer requires.
type UnsafeNomadBothererServer interface {
mustEmbedUnimplementedNomadBothererServer()
}
// RegisterNomadBothererServer registers srv on s under the NomadBotherer
// service descriptor.
func RegisterNomadBothererServer(s grpc.ServiceRegistrar, srv NomadBothererServer) {
	// A panic in the call below means UnimplementedNomadBothererServer was
	// embedded by pointer and is nil. Such a server would panic whenever an
	// unimplemented method is invoked, so the problem is surfaced here at
	// initialization time rather than later at runtime due to I/O.
	type embedChecker interface{ testEmbeddedByValue() }
	if checker, ok := srv.(embedChecker); ok {
		checker.testEmbeddedByValue()
	}
	s.RegisterService(&NomadBotherer_ServiceDesc, srv)
}
// _NomadBotherer_GetDiffs_Handler decodes a GetDiffsRequest with dec and
// dispatches it to srv's GetDiffs method. A decode error is returned
// unchanged. When an interceptor is supplied, the call is routed through it
// with the method's full name in the UnaryServerInfo.
func _NomadBotherer_GetDiffs_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
	request := new(GetDiffsRequest)
	if err := dec(request); err != nil {
		return nil, err
	}
	if interceptor != nil {
		invoke := func(ctx context.Context, req interface{}) (interface{}, error) {
			return srv.(NomadBothererServer).GetDiffs(ctx, req.(*GetDiffsRequest))
		}
		info := &grpc.UnaryServerInfo{
			Server:     srv,
			FullMethod: NomadBotherer_GetDiffs_FullMethodName,
		}
		return interceptor(ctx, request, info, invoke)
	}
	return srv.(NomadBothererServer).GetDiffs(ctx, request)
}
// _NomadBotherer_GetStatus_Handler decodes a GetStatusRequest with dec and
// dispatches it to srv's GetStatus method. A decode error is returned
// unchanged. When an interceptor is supplied, the call is routed through it
// with the method's full name in the UnaryServerInfo.
func _NomadBotherer_GetStatus_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
	request := new(GetStatusRequest)
	if err := dec(request); err != nil {
		return nil, err
	}
	if interceptor != nil {
		invoke := func(ctx context.Context, req interface{}) (interface{}, error) {
			return srv.(NomadBothererServer).GetStatus(ctx, req.(*GetStatusRequest))
		}
		info := &grpc.UnaryServerInfo{
			Server:     srv,
			FullMethod: NomadBotherer_GetStatus_FullMethodName,
		}
		return interceptor(ctx, request, info, invoke)
	}
	return srv.(NomadBothererServer).GetStatus(ctx, request)
}
// _NomadBotherer_TriggerRefresh_Handler decodes a TriggerRefreshRequest with
// dec and dispatches it to srv's TriggerRefresh method. A decode error is
// returned unchanged. When an interceptor is supplied, the call is routed
// through it with the method's full name in the UnaryServerInfo.
func _NomadBotherer_TriggerRefresh_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
	request := new(TriggerRefreshRequest)
	if err := dec(request); err != nil {
		return nil, err
	}
	if interceptor != nil {
		invoke := func(ctx context.Context, req interface{}) (interface{}, error) {
			return srv.(NomadBothererServer).TriggerRefresh(ctx, req.(*TriggerRefreshRequest))
		}
		info := &grpc.UnaryServerInfo{
			Server:     srv,
			FullMethod: NomadBotherer_TriggerRefresh_FullMethodName,
		}
		return interceptor(ctx, request, info, invoke)
	}
	return srv.(NomadBothererServer).TriggerRefresh(ctx, request)
}
// _NomadBotherer_GetVersion_Handler decodes a GetVersionRequest with dec and
// dispatches it to srv's GetVersion method. A decode error is returned
// unchanged. When an interceptor is supplied, the call is routed through it
// with the method's full name in the UnaryServerInfo.
func _NomadBotherer_GetVersion_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
	request := new(GetVersionRequest)
	if err := dec(request); err != nil {
		return nil, err
	}
	if interceptor != nil {
		invoke := func(ctx context.Context, req interface{}) (interface{}, error) {
			return srv.(NomadBothererServer).GetVersion(ctx, req.(*GetVersionRequest))
		}
		info := &grpc.UnaryServerInfo{
			Server:     srv,
			FullMethod: NomadBotherer_GetVersion_FullMethodName,
		}
		return interceptor(ctx, request, info, invoke)
	}
	return srv.(NomadBothererServer).GetVersion(ctx, request)
}
// NomadBotherer_ServiceDesc is the grpc.ServiceDesc for NomadBotherer service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var NomadBotherer_ServiceDesc = grpc.ServiceDesc{
ServiceName: "nomad_botherer.v1.NomadBotherer",
HandlerType: (*NomadBothererServer)(nil),
// One entry per unary RPC, each bound to its generated handler function.
Methods: []grpc.MethodDesc{
{
MethodName: "GetDiffs",
Handler: _NomadBotherer_GetDiffs_Handler,
},
{
MethodName: "GetStatus",
Handler: _NomadBotherer_GetStatus_Handler,
},
{
MethodName: "TriggerRefresh",
Handler: _NomadBotherer_TriggerRefresh_Handler,
},
{
MethodName: "GetVersion",
Handler: _NomadBotherer_GetVersion_Handler,
},
},
// The stream list is empty: no streaming RPCs are declared.
Streams: []grpc.StreamDesc{},
Metadata: "nomad_botherer.proto",
}
// Package grpcserver provides a gRPC endpoint for querying and controlling
// nomad-botherer. All RPCs require a pre-shared API key supplied in the
// "authorization" metadata header as "Bearer <key>".
package grpcserver
import (
"context"
"crypto/subtle"
"fmt"
"log/slog"
"net"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"github.com/gerrowadat/nomad-botherer/internal/grpcapi"
"github.com/gerrowadat/nomad-botherer/internal/nomad"
)
// DiffSource is satisfied by *nomad.Differ.
type DiffSource interface {
// Diffs returns the current job diffs, the time of the last check and the
// last commit string, in that order (this is how Server.GetDiffs reads them).
Diffs() ([]nomad.JobDiff, time.Time, string)
// Ready reports whether at least one diff check has completed.
Ready() bool
}
// GitStatusSource is satisfied by *gitwatch.Watcher.
type GitStatusSource interface {
// Trigger is called by Server.TriggerRefresh to request a refresh; it
// returns nothing, so the caller gets no completion or error signal from it.
Trigger()
// Status returns the last commit and the time of the last update.
Status() (lastCommit string, lastUpdate time.Time)
// Ready reports whether the initial git clone has completed.
Ready() bool
}
// BuildInfo holds the version strings injected at link time.
// Server.GetVersion copies the three fields verbatim into its response.
type BuildInfo struct {
Version string
Commit string
BuildDate string
}
// Server implements grpcapi.NomadBothererServer.
// Construct it with New or NewWithRegistry so the token and metrics are set.
type Server struct {
// Embedded by value, as the generated code requires.
grpcapi.UnimplementedNomadBothererServer
// expectedToken is pre-computed as "Bearer <apiKey>" to avoid allocation
// per request and to allow constant-time comparison in the interceptor.
expectedToken string
// diffs supplies the job diffs served by GetDiffs.
diffs DiffSource
// git supplies watcher status and receives refresh triggers.
git GitStatusSource
// buildInfo is returned as-is by GetVersion.
buildInfo BuildInfo
// Prometheus metrics
// rpcTotal counts requests that passed authentication, labelled by method and code.
rpcTotal *prometheus.CounterVec
// rpcErrors counts requests rejected by the auth interceptor, labelled by method.
rpcErrors *prometheus.CounterVec
}
// New builds a Server whose metrics are registered with
// prometheus.DefaultRegisterer. It is shorthand for NewWithRegistry.
func New(apiKey string, diffs DiffSource, git GitStatusSource, info BuildInfo) *Server {
	registerer := prometheus.DefaultRegisterer
	return NewWithRegistry(apiKey, diffs, git, info, registerer)
}
// NewWithRegistry builds a Server that registers its two counter vectors
// through promauto on reg. The expected authorization value is computed once
// here as "Bearer " followed by apiKey.
func NewWithRegistry(apiKey string, diffs DiffSource, git GitStatusSource, info BuildInfo, reg prometheus.Registerer) *Server {
	factory := promauto.With(reg)

	requests := factory.NewCounterVec(prometheus.CounterOpts{
		Name: "nomad_botherer_grpc_requests_total",
		Help: "Authenticated gRPC requests completed, by method and gRPC status code. Does not include requests rejected before auth — see nomad_botherer_grpc_auth_errors_total for those.",
	}, []string{"method", "code"})

	authFailures := factory.NewCounterVec(prometheus.CounterOpts{
		Name: "nomad_botherer_grpc_auth_errors_total",
		Help: "gRPC requests rejected due to a missing or invalid API key, by method.",
	}, []string{"method"})

	server := &Server{
		expectedToken: "Bearer " + apiKey,
		diffs:         diffs,
		git:           git,
		buildInfo:     info,
		rpcTotal:      requests,
		rpcErrors:     authFailures,
	}
	return server
}
// GRPCServer creates a new *grpc.Server with s.authInterceptor installed as
// its unary interceptor and s registered as the NomadBotherer service.
// Attaching it to a listener and shutting it down gracefully are left to the
// caller.
func (s *Server) GRPCServer() *grpc.Server {
	withAuth := grpc.UnaryInterceptor(s.authInterceptor)
	server := grpc.NewServer(withAuth)
	grpcapi.RegisterNomadBothererServer(server, s)
	return server
}
// Listen opens a TCP listener on addr. It is kept separate from Serve so a
// caller can detect a bind failure up front, before handing the listener to a
// goroutine. A bind failure is returned wrapped with the address.
func (s *Server) Listen(addr string) (net.Listener, error) {
	listener, listenErr := net.Listen("tcp", addr)
	if listenErr == nil {
		return listener, nil
	}
	return nil, fmt.Errorf("grpc listen %s: %w", addr, listenErr)
}
// Serve starts accepting gRPC connections on lis and blocks until ctx is
// cancelled, at which point it drains in-flight requests via GracefulStop.
//
// If the gRPC server stops serving for another reason before ctx is
// cancelled, Serve returns that error wrapped in "grpc serve: ...", except
// for grpc.ErrServerStopped, which is reported as nil. In either case the
// goroutine that watches ctx is released when Serve returns instead of
// staying parked until ctx eventually ends.
func (s *Server) Serve(ctx context.Context, lis net.Listener) error {
	srv := s.GRPCServer()

	// served is closed on return so the watcher goroutine below can never
	// outlive this call.
	served := make(chan struct{})
	defer close(served)

	go func() {
		select {
		case <-ctx.Done():
			srv.GracefulStop()
		case <-served:
			// Serve already returned; there is nothing left to stop.
		}
	}()

	slog.Info("gRPC server listening", "addr", lis.Addr())
	if err := srv.Serve(lis); err != nil && err != grpc.ErrServerStopped {
		return fmt.Errorf("grpc serve: %w", err)
	}
	return nil
}
// authInterceptor is the unary interceptor that checks the pre-shared API key.
// The first "authorization" metadata value must equal "Bearer <key>"; the
// comparison is done in constant time. Requests with no metadata or a wrong
// or missing value are rejected with codes.Unauthenticated and counted in
// rpcErrors. Accepted requests run the handler and are counted in rpcTotal
// under the resulting gRPC status code.
func (s *Server) authInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
	md, found := metadata.FromIncomingContext(ctx)
	if !found {
		s.rpcErrors.WithLabelValues(info.FullMethod).Inc()
		return nil, status.Error(codes.Unauthenticated, "missing metadata")
	}

	presented := md.Get("authorization")
	authorized := len(presented) > 0 &&
		subtle.ConstantTimeCompare([]byte(presented[0]), []byte(s.expectedToken)) == 1
	if !authorized {
		s.rpcErrors.WithLabelValues(info.FullMethod).Inc()
		return nil, status.Error(codes.Unauthenticated, "invalid or missing API key")
	}

	resp, err := handler(ctx, req)

	// Errors that do not carry a gRPC status are recorded as Unknown.
	code := codes.OK
	if err != nil {
		code = codes.Unknown
		if st, isStatus := status.FromError(err); isStatus {
			code = st.Code()
		}
	}
	s.rpcTotal.WithLabelValues(info.FullMethod, code.String()).Inc()
	return resp, err
}
// GetDiffs reports the diffs currently held by the diff source, converted to
// their protobuf form, together with the last commit and, when a check time is
// set, that time as a UTC RFC3339 string. It returns codes.Unavailable until
// both the git source and the diff source report ready.
func (s *Server) GetDiffs(_ context.Context, _ *grpcapi.GetDiffsRequest) (*grpcapi.GetDiffsResponse, error) {
	if ready := s.git.Ready() && s.diffs.Ready(); !ready {
		return nil, status.Error(codes.Unavailable, "server is not ready: initial state not yet built")
	}

	found, checkedAt, commit := s.diffs.Diffs()

	converted := make([]*grpcapi.JobDiff, len(found))
	for i := range found {
		src := &found[i]
		converted[i] = &grpcapi.JobDiff{
			JobId:    src.JobID,
			HclFile:  src.HCLFile,
			DiffType: string(src.DiffType),
			Detail:   src.Detail,
		}
	}

	// A zero check time leaves LastCheckTime empty.
	var checkTime string
	if !checkedAt.IsZero() {
		checkTime = checkedAt.UTC().Format(time.RFC3339)
	}

	return &grpcapi.GetDiffsResponse{
		Diffs:         converted,
		LastCheckTime: checkTime,
		LastCommit:    commit,
	}, nil
}
// GetStatus reports the git source's last commit and, when an update time is
// set, that time as a UTC RFC3339 string. It returns codes.Unavailable until
// the git source reports ready.
func (s *Server) GetStatus(_ context.Context, _ *grpcapi.GetStatusRequest) (*grpcapi.GetStatusResponse, error) {
	if ready := s.git.Ready(); !ready {
		return nil, status.Error(codes.Unavailable, "server is not ready: git state not yet initialized")
	}

	commit, updatedAt := s.git.Status()

	// A zero update time leaves LastUpdateTime empty.
	var updateTime string
	if !updatedAt.IsZero() {
		updateTime = updatedAt.UTC().Format(time.RFC3339)
	}

	return &grpcapi.GetStatusResponse{
		LastCommit:     commit,
		LastUpdateTime: updateTime,
	}, nil
}
// TriggerRefresh calls Trigger on the git source and acknowledges with a fixed
// message. It never returns an error and does not check readiness first.
func (s *Server) TriggerRefresh(_ context.Context, _ *grpcapi.TriggerRefreshRequest) (*grpcapi.TriggerRefreshResponse, error) {
	s.git.Trigger()
	ack := &grpcapi.TriggerRefreshResponse{Message: "refresh triggered"}
	return ack, nil
}
// GetVersion copies the server's BuildInfo (version, commit, build date) into
// the response. It never returns an error.
func (s *Server) GetVersion(_ context.Context, _ *grpcapi.GetVersionRequest) (*grpcapi.GetVersionResponse, error) {
	build := s.buildInfo
	resp := &grpcapi.GetVersionResponse{
		Version:   build.Version,
		Commit:    build.Commit,
		BuildDate: build.BuildDate,
	}
	return resp, nil
}
// Package nomad compares HCL job definitions against a live Nomad cluster and
// reports any diffs it finds.
package nomad
import (
"fmt"
"log/slog"
"path"
"regexp"
"strings"
"sync"
"time"
nomadapi "github.com/hashicorp/nomad/api"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/gerrowadat/nomad-botherer/internal/config"
)
// jobBlockRe matches a Nomad job stanza header in HCL: a line that begins,
// after optional leading whitespace, with the word job, whitespace, and an
// opening double quote. Because leading whitespace is allowed, an indented
// line of that shape matches too, not only a strictly top-level one.
// Files without this pattern are silently skipped (e.g. ACL policies, volumes, namespaces).
var jobBlockRe = regexp.MustCompile(`(?m)^\s*job\s+"`)
// DiffType describes the relationship between a job in HCL and in Nomad.
// It is a string type; the defined values are the DiffType* constants.
type DiffType string
// The DiffType values a JobDiff can carry.
const (
// DiffTypeModified means the job exists in both HCL and Nomad but the
// definitions differ (Nomad plan shows changes).
DiffTypeModified DiffType = "modified"
// DiffTypeMissingFromNomad means the job is defined in HCL but not
// currently registered in Nomad.
DiffTypeMissingFromNomad DiffType = "missing_from_nomad"
// DiffTypeMissingFromHCL means the job is running in Nomad but there is
// no corresponding HCL file in the repo.
DiffTypeMissingFromHCL DiffType = "missing_from_hcl"
)
// JobDiff describes a single divergence between the git repo and Nomad.
// PlanDiff is tagged json:"-", so it is left out of JSON encodings.
type JobDiff struct {
JobID string `json:"job_id"`
HCLFile string `json:"hcl_file,omitempty"` // empty for MissingFromHCL
DiffType DiffType `json:"diff_type"`
Detail string `json:"detail"`
// PlanDiff holds the structured diff from the Nomad plan API.
// Only populated for DiffTypeModified entries.
PlanDiff *nomadapi.JobDiff `json:"-"`
}
// NomadJobsClient is the subset of the Nomad API jobs client we use.
// The concrete *nomadapi.Jobs satisfies this interface; tests inject a mock.
// The four methods cover parsing HCL into a job, planning a job, fetching one
// job by ID, and listing jobs.
type NomadJobsClient interface {
ParseHCL(jobHCL string, canonicalize bool) (*nomadapi.Job, error)
Plan(job *nomadapi.Job, diff bool, q *nomadapi.WriteOptions) (*nomadapi.JobPlanResponse, *nomadapi.WriteMeta, error)
Info(jobID string, q *nomadapi.QueryOptions) (*nomadapi.Job, *nomadapi.QueryMeta, error)
List(q *nomadapi.QueryOptions) ([]*nomadapi.JobListStub, *nomadapi.QueryMeta, error)
}
// Differ runs periodic diff checks and stores the latest results.
// It contains a mutex, so it is handled by pointer.
type Differ struct {
// Nomad access and job selection settings, fixed at construction.
jobs NomadJobsClient
namespace string
includeDeadJobs bool
// jobSelectorGlob is matched against job IDs with path.Match; empty disables it.
jobSelectorGlob string
// managedMetaPrefix selects jobs whose meta "<prefix>.managed" is "true"; empty disables it.
managedMetaPrefix string
// mu guards the result fields below.
mu sync.RWMutex
diffs []JobDiff
lastCheckTime time.Time
lastCommit string
lastNomadIndex uint64 // Raft index from the last successful List(); protected by mu
driftFirstSeen map[string]time.Time // key: driftKey(jobID, diffType); protected by mu
// Prometheus collectors, created in newDifferBase.
hclParseErrors prometheus.Counter
hclFilesSkipped prometheus.Counter
diffChecks prometheus.Counter
diffChecksSkipped prometheus.Counter
staleChecks prometheus.Counter
jobsSkippedBySel *prometheus.CounterVec
nomadAPIErrors *prometheus.CounterVec
lastCheck prometheus.Gauge
jobDiffs *prometheus.GaugeVec
driftedJobs *prometheus.GaugeVec
jobDriftSince *prometheus.GaugeVec
}
// newDifferBase assembles a Differ around the given jobs client and selection
// settings. All Prometheus collectors are created through promauto on reg,
// and the drift-tracking map starts out empty.
func newDifferBase(jobs NomadJobsClient, namespace string, includeDeadJobs bool, jobSelectorGlob, managedMetaPrefix string, reg prometheus.Registerer) *Differ {
	factory := promauto.With(reg)

	// Plain counters.
	parseErrors := factory.NewCounter(prometheus.CounterOpts{
		Name: "nomad_botherer_hcl_parse_errors_total",
		Help: "Total number of HCL files that failed to parse as Nomad job definitions.",
	})
	nonJobFiles := factory.NewCounter(prometheus.CounterOpts{
		Name: "nomad_botherer_hcl_non_job_files_skipped_total",
		Help: "Total number of HCL files skipped because they lack a top-level job stanza (e.g. ACL policies, volumes).",
	})
	checksRun := factory.NewCounter(prometheus.CounterOpts{
		Name: "nomad_botherer_diff_checks_total",
		Help: "Total number of diff checks run against the Nomad cluster.",
	})
	checksSkipped := factory.NewCounter(prometheus.CounterOpts{
		Name: "nomad_botherer_diff_checks_skipped_total",
		Help: "Total number of diff checks skipped because neither the Nomad index nor the git commit changed.",
	})
	stalenessChecks := factory.NewCounter(prometheus.CounterOpts{
		Name: "nomad_botherer_nomad_staleness_checks_total",
		Help: "Total number of Nomad diff checks triggered by the staleness check.",
	})

	// Labelled counters.
	skippedBySelector := factory.NewCounterVec(prometheus.CounterOpts{
		Name: "nomad_botherer_jobs_skipped_by_selector_total",
		Help: "Total number of jobs skipped because they did not match the configured selection criteria, by source (hcl or nomad).",
	}, []string{"source"})
	apiErrors := factory.NewCounterVec(prometheus.CounterOpts{
		Name: "nomad_botherer_nomad_api_errors_total",
		Help: "Total number of Nomad API errors by operation.",
	}, []string{"op"})

	// Gauges.
	lastCheckTS := factory.NewGauge(prometheus.GaugeOpts{
		Name: "nomad_botherer_last_check_timestamp_seconds",
		Help: "Unix timestamp of the most recent diff check.",
	})
	perJobDiffs := factory.NewGaugeVec(prometheus.GaugeOpts{
		Name: "nomad_botherer_job_diffs",
		Help: "1 for each job/diff-type combination currently detected.",
	}, []string{"job", "diff_type"})
	driftedByType := factory.NewGaugeVec(prometheus.GaugeOpts{
		Name: "nomad_botherer_drifted_jobs",
		Help: "Number of jobs currently in each drift state.",
	}, []string{"diff_type"})
	driftSince := factory.NewGaugeVec(prometheus.GaugeOpts{
		Name: "nomad_botherer_job_drift_first_seen_timestamp_seconds",
		Help: "Unix timestamp when drift was first detected for each job. Cleared when drift resolves. Use time()-metric to get seconds in drift state.",
	}, []string{"job", "diff_type"})

	differ := &Differ{
		jobs:              jobs,
		namespace:         namespace,
		includeDeadJobs:   includeDeadJobs,
		jobSelectorGlob:   jobSelectorGlob,
		managedMetaPrefix: managedMetaPrefix,
		driftFirstSeen:    map[string]time.Time{},
		hclParseErrors:    parseErrors,
		hclFilesSkipped:   nonJobFiles,
		diffChecks:        checksRun,
		diffChecksSkipped: checksSkipped,
		staleChecks:       stalenessChecks,
		jobsSkippedBySel:  skippedBySelector,
		nomadAPIErrors:    apiErrors,
		lastCheck:         lastCheckTS,
		jobDiffs:          perJobDiffs,
		driftedJobs:       driftedByType,
		jobDriftSince:     driftSince,
	}
	return differ
}
// NewDiffer builds a Differ that talks to a real Nomad cluster. The client is
// configured from cfg.NomadAddr and, when cfg.NomadToken is non-empty, that
// token as the secret ID. Metrics go to prometheus.DefaultRegisterer. A
// client construction failure is returned wrapped.
func NewDiffer(cfg *config.Config) (*Differ, error) {
	apiCfg := nomadapi.DefaultConfig()
	apiCfg.Address = cfg.NomadAddr
	if token := cfg.NomadToken; token != "" {
		apiCfg.SecretID = token
	}

	client, err := nomadapi.NewClient(apiCfg)
	if err != nil {
		return nil, fmt.Errorf("creating nomad client: %w", err)
	}

	differ := newDifferBase(
		client.Jobs(),
		cfg.NomadNamespace,
		cfg.IncludeDeadJobs,
		cfg.JobSelectorGlob,
		cfg.ManagedMetaPrefix,
		prometheus.DefaultRegisterer,
	)
	return differ, nil
}
// NewWithClient builds a Differ around the supplied jobs client, intended for
// tests. Metrics go into a fresh, private Prometheus registry so that several
// Differs can coexist without registration conflicts.
func NewWithClient(cfg *config.Config, jobs NomadJobsClient) *Differ {
	privateReg := prometheus.NewRegistry()
	return NewWithClientAndRegistry(cfg, jobs, privateReg)
}
// NewWithClientAndRegistry builds a Differ around the supplied jobs client and
// registers its metrics on reg. Tests that need to read metric values back
// should use this constructor and keep a handle on the registry.
func NewWithClientAndRegistry(cfg *config.Config, jobs NomadJobsClient, reg prometheus.Registerer) *Differ {
	return newDifferBase(
		jobs,
		cfg.NomadNamespace,
		cfg.IncludeDeadJobs,
		cfg.JobSelectorGlob,
		cfg.ManagedMetaPrefix,
		reg,
	)
}
// jobIsSelected reports whether a job should be watched. Two independent
// criteria are consulted, and either one is sufficient:
//
//   - the job ID matches the configured glob pattern, or
//   - the job's meta map has "<managedMetaPrefix>.managed" set to "true".
//
// A criterion whose configuration value is empty is not consulted, so with
// both empty no job is ever selected. The error from path.Match is discarded;
// only a positive match from the glob selects the job.
func (d *Differ) jobIsSelected(jobID string, meta map[string]string) bool {
	if glob := d.jobSelectorGlob; glob != "" {
		globHit, _ := path.Match(glob, jobID)
		if globHit {
			return true
		}
	}

	if d.managedMetaPrefix == "" {
		return false
	}
	return meta[d.managedMetaPrefix+".managed"] == "true"
}
// Check compares the given HCL files (path → content) against the live Nomad
// cluster and stores the results. commit is recorded for informational purposes.
//
// The work proceeds in these steps:
//  1. List the jobs known to Nomad. If the list index and the commit both
//     match the values stored by the previous completed check, the check is
//     counted as skipped and nothing else happens.
//  2. Parse every HCL file that contains a job stanza and keep the jobs that
//     pass jobIsSelected.
//  3. For each kept HCL job, query Nomad and record a diff when the job is
//     missing, dead (unless dead jobs are included), or its plan shows changes.
//  4. For each listed Nomad job that passes jobIsSelected but has no HCL
//     counterpart, record a missing-from-HCL diff.
//  5. Store the results and first-seen times under the write lock, then
//     rebuild the gauge metrics from the new result set.
//
// Every return statement in this function returns nil: Nomad API failures and
// HCL parse failures are logged, counted in metrics, and the affected job or
// file is skipped rather than aborting the check.
func (d *Differ) Check(hclFiles map[string]string, commit string) error {
q := &nomadapi.QueryOptions{Namespace: d.namespace}
wq := &nomadapi.WriteOptions{Namespace: d.namespace}
// List all Nomad jobs first. The returned Raft index lets us skip the
// expensive per-job work when neither Nomad state nor the git commit has
// changed since the last check.
allJobs, listMeta, err := d.jobs.List(q)
if err != nil {
d.nomadAPIErrors.WithLabelValues("list").Inc()
slog.Warn("Failed to list Nomad jobs", "err", err)
// Carry on without the listing: a nil listMeta disables the skip test
// below, and a nil allJobs makes the missing-from-HCL loop a no-op.
allJobs = nil
listMeta = nil
}
// Read the state left by the previous completed check.
d.mu.RLock()
prevCommit := d.lastCommit
prevIndex := d.lastNomadIndex
d.mu.RUnlock()
if listMeta != nil && listMeta.LastIndex == prevIndex && commit == prevCommit {
slog.Debug("Skipping diff: Nomad index and commit unchanged", "index", listMeta.LastIndex, "commit", commit)
d.diffChecksSkipped.Inc()
return nil
}
slog.Info("Running diff check", "commit", commit, "hcl_files", len(hclFiles))
d.diffChecks.Inc()
// Parse all HCL files via the Nomad API.
hclJobs := make(map[string]*nomadapi.Job) // jobID → parsed job
hclJobFile := make(map[string]string) // jobID → source HCL file path
for filename, content := range hclFiles {
// Files without a job stanza are counted and skipped before any API call.
if !jobBlockRe.MatchString(content) {
slog.Debug("Skipping HCL file with no job stanza", "file", filename)
d.hclFilesSkipped.Inc()
continue
}
job, err := d.jobs.ParseHCL(content, true)
if err != nil {
slog.Warn("Failed to parse HCL file, skipping", "file", filename, "err", err)
d.hclParseErrors.Inc()
continue
}
if job == nil || job.ID == nil || *job.ID == "" {
slog.Warn("HCL file yielded no job ID, skipping", "file", filename)
continue
}
jobID := *job.ID
if !d.jobIsSelected(jobID, job.Meta) {
slog.Debug("Skipping job not matching selection criteria", "job", jobID, "file", filename)
d.jobsSkippedBySel.WithLabelValues("hcl").Inc()
continue
}
// If two files yield the same job ID, whichever is visited later in
// this map iteration overwrites the earlier entry.
hclJobs[jobID] = job
hclJobFile[jobID] = filename
slog.Debug("Parsed HCL file", "file", filename, "job_id", jobID)
}
var diffs []JobDiff
// For each job defined in HCL, check Nomad.
for jobID, job := range hclJobs {
filename := hclJobFile[jobID]
nomadJob, _, err := d.jobs.Info(jobID, q)
if err != nil {
if isNotFound(err) {
diffs = append(diffs, JobDiff{
JobID: jobID,
HCLFile: filename,
DiffType: DiffTypeMissingFromNomad,
Detail: "job is defined in HCL but not registered in Nomad",
})
continue
}
// Any other lookup failure is counted and the job is left out of this
// round's results entirely.
d.nomadAPIErrors.WithLabelValues("info").Inc()
slog.Warn("Failed to query job from Nomad", "job", jobID, "err", err)
continue
}
// Unless the caller explicitly wants dead jobs included, treat a dead
// job the same as a missing one.
if !d.includeDeadJobs && nomadJob != nil && nomadJob.Status != nil && *nomadJob.Status == "dead" {
slog.Debug("Job is dead in Nomad, treating as missing", "job", jobID)
diffs = append(diffs, JobDiff{
JobID: jobID,
HCLFile: filename,
DiffType: DiffTypeMissingFromNomad,
Detail: "job is defined in HCL but is in 'dead' state in Nomad",
})
continue
}
// Job exists and is live — run a plan to detect config drift.
plan, _, err := d.jobs.Plan(job, true, wq)
if err != nil {
d.nomadAPIErrors.WithLabelValues("plan").Inc()
slog.Warn("Failed to plan job", "job", jobID, "err", err)
continue
}
// A plan whose diff type is empty or "None" means no drift for this job.
if plan.Diff != nil && plan.Diff.Type != "" && plan.Diff.Type != "None" {
diffs = append(diffs, JobDiff{
JobID: jobID,
HCLFile: filename,
DiffType: DiffTypeModified,
Detail: fmt.Sprintf("Nomad plan shows diff type %q", plan.Diff.Type),
PlanDiff: plan.Diff,
})
}
}
// Find jobs in Nomad that have no corresponding HCL file.
// Dead jobs are skipped unless --include-dead-jobs is set, since a dead
// job without HCL is expected (it was stopped intentionally).
// Only jobs that match the configured selection criteria are considered managed.
for _, j := range allJobs {
if !d.includeDeadJobs && j.Status == "dead" {
continue
}
if !d.jobIsSelected(j.ID, j.Meta) {
d.jobsSkippedBySel.WithLabelValues("nomad").Inc()
continue
}
if _, ok := hclJobs[j.ID]; !ok {
diffs = append(diffs, JobDiff{
JobID: j.ID,
DiffType: DiffTypeMissingFromHCL,
Detail: fmt.Sprintf("job is running in Nomad (status: %s) but has no HCL definition in the repo", j.Status),
})
}
}
now := time.Now()
// Build the set of currently-drifting job+type keys.
currentKeys := make(map[string]struct{}, len(diffs))
for _, diff := range diffs {
currentKeys[driftKey(diff.JobID, string(diff.DiffType))] = struct{}{}
}
// Publish the new results and reconcile first-seen times in one critical
// section so readers never observe a half-updated state.
d.mu.Lock()
d.diffs = diffs
d.lastCheckTime = now
d.lastCommit = commit
// The stored index is only advanced when the listing succeeded.
if listMeta != nil {
d.lastNomadIndex = listMeta.LastIndex
}
// Remove entries that are no longer drifting.
for k := range d.driftFirstSeen {
if _, ok := currentKeys[k]; !ok {
delete(d.driftFirstSeen, k)
}
}
// Record the first time each new drift is observed.
for k := range currentKeys {
if _, ok := d.driftFirstSeen[k]; !ok {
d.driftFirstSeen[k] = now
}
}
// Snapshot first-seen times for metric updates below (outside the lock).
firstSeenSnapshot := make(map[string]time.Time, len(d.driftFirstSeen))
for k, v := range d.driftFirstSeen {
firstSeenSnapshot[k] = v
}
d.mu.Unlock()
d.lastCheck.Set(float64(now.Unix()))
// The gauge vectors are reset and repopulated from the current diffs, so
// label combinations for drift that has resolved no longer appear.
d.jobDiffs.Reset()
d.driftedJobs.Reset()
d.jobDriftSince.Reset()
typeCounts := make(map[string]int)
for _, diff := range diffs {
d.jobDiffs.WithLabelValues(diff.JobID, string(diff.DiffType)).Set(1)
typeCounts[string(diff.DiffType)]++
}
for typ, count := range typeCounts {
d.driftedJobs.WithLabelValues(typ).Set(float64(count))
}
for _, diff := range diffs {
k := driftKey(diff.JobID, string(diff.DiffType))
if t, ok := firstSeenSnapshot[k]; ok {
d.jobDriftSince.WithLabelValues(diff.JobID, string(diff.DiffType)).Set(float64(t.Unix()))
}
}
slog.Info("Diff check complete", "diffs", len(diffs), "commit", commit)
return nil
}
// ForceCheck is the entry point used when the Nomad state has exceeded the
// configured maximum staleness. It increments the staleness counter and then
// delegates to Check with the same arguments, returning Check's result.
//
// NOTE(review): the delegation goes through Check's normal path, so Check's
// own "index and commit unchanged" short-circuit still applies here — confirm
// whether a forced check is meant to bypass it.
func (d *Differ) ForceCheck(hclFiles map[string]string, commit string) error {
	d.staleChecks.Inc()

	err := d.Check(hclFiles, commit)
	return err
}
// driftKey builds the map key identifying one (jobID, diffType) pair. The two
// parts are joined with a NUL byte as the separator.
func driftKey(jobID, diffType string) string {
	parts := []string{jobID, diffType}
	return strings.Join(parts, "\x00")
}
// Ready reports whether a diff check has run to completion at least once,
// i.e. whether a last-check time has been recorded. Until then an empty diff
// list cannot be told apart from "not checked yet", so callers should treat
// the Differ as unavailable.
func (d *Differ) Ready() bool {
	d.mu.RLock()
	checked := !d.lastCheckTime.IsZero()
	d.mu.RUnlock()
	return checked
}
// Diffs returns a copy of the most recently stored diffs, the time at which
// they were computed, and the git commit they were computed against. The
// returned slice is always non-nil and is independent of the Differ's own
// slice, so callers may modify it freely.
func (d *Differ) Diffs() ([]JobDiff, time.Time, string) {
	d.mu.RLock()
	defer d.mu.RUnlock()

	snapshot := make([]JobDiff, 0, len(d.diffs))
	snapshot = append(snapshot, d.diffs...)
	return snapshot, d.lastCheckTime, d.lastCommit
}
// isNotFound reports whether err looks like a "job does not exist" response.
// The decision is made on the error text because that is all the caller has:
//
//   - any message containing "not found" (case-insensitive) qualifies;
//   - otherwise the message must contain 404 as a standalone number, meaning
//     it is not directly preceded or followed by another digit.
//
// The standalone requirement keeps unrelated numbers from being mistaken for
// an HTTP status: a connection error mentioning an address such as
// "nomad:4040" contains the substring "404" but is not a not-found response,
// and must not cause the job to be reported as missing from Nomad.
//
// A nil error is never a not-found error.
func isNotFound(err error) bool {
	if err == nil {
		return false
	}
	msg := err.Error()
	if strings.Contains(strings.ToLower(msg), "not found") {
		return true
	}

	isDigit := func(c byte) bool { return c >= '0' && c <= '9' }
	const code = "404"
	// Scan every occurrence of the code; accept the first one that is not
	// embedded in a longer run of digits.
	for from := 0; ; {
		rel := strings.Index(msg[from:], code)
		if rel < 0 {
			return false
		}
		start := from + rel
		end := start + len(code)
		leftClear := start == 0 || !isDigit(msg[start-1])
		rightClear := end == len(msg) || !isDigit(msg[end])
		if leftClear && rightClear {
			return true
		}
		from = start + 1
	}
}
package server
import (
"fmt"
"sort"
"strings"
"time"
nomadapi "github.com/hashicorp/nomad/api"
"github.com/gerrowadat/nomad-botherer/internal/nomad"
)
// renderDiffsText builds the plain-text diff report, loosely modelled on the
// output of a Nomad job plan. The report starts with a title line, then a
// "Last check / Commit" line when lastCheck is set, then either a
// "no differences" notice or one section per diff. Modified jobs that carry a
// plan diff are rendered in full; all other entries get a one-line header
// followed by a short explanation.
func renderDiffsText(diffs []nomad.JobDiff, lastCheck time.Time, commit string) string {
	var out strings.Builder

	out.WriteString("nomad-botherer diff report\n")
	if !lastCheck.IsZero() {
		fmt.Fprintf(&out, "Last check: %s | Commit: %s\n", lastCheck.Format(time.RFC3339), commit)
	}
	out.WriteString("\n")

	total := len(diffs)
	if total == 0 {
		out.WriteString("No differences detected.\n")
		return out.String()
	}

	fmt.Fprintf(&out, "%d difference(s) detected:\n", total)
	for _, entry := range diffs {
		// Every entry is preceded by a blank separator line, including
		// entries whose diff type is not recognised below.
		out.WriteString("\n")

		switch entry.DiffType {
		case nomad.DiffTypeModified:
			if entry.PlanDiff == nil {
				fmt.Fprintf(&out, "+/- Job: %q\n", entry.JobID)
				fmt.Fprintf(&out, " %s\n", entry.Detail)
				break
			}
			renderJobDiff(&out, entry.PlanDiff, entry.HCLFile)
		case nomad.DiffTypeMissingFromHCL:
			fmt.Fprintf(&out, "- Job: %q\n", entry.JobID)
			fmt.Fprintf(&out, " %s\n", entry.Detail)
		case nomad.DiffTypeMissingFromNomad:
			fmt.Fprintf(&out, "+ Job: %q\n", entry.JobID)
			fmt.Fprintf(&out, " Defined in %s but not registered in Nomad.\n", entry.HCLFile)
		}
	}
	return out.String()
}
// renderJobDiff writes one job-level plan diff to b: a header line carrying
// the diff symbol and job ID (plus the HCL file name when one is known),
// followed by the job's field diffs, object diffs, and task group diffs, all
// one indent level in.
func renderJobDiff(b *strings.Builder, jd *nomadapi.JobDiff, hclFile string) {
	symbol := diffSymbol(jd.Type)
	if hclFile == "" {
		fmt.Fprintf(b, "%s Job: %q\n", symbol, jd.ID)
	} else {
		fmt.Fprintf(b, "%s Job: %q (%s)\n", symbol, jd.ID, hclFile)
	}

	const childIndent = " "
	renderFields(b, jd.Fields, childIndent)
	renderObjects(b, jd.Objects, childIndent)
	for i := range jd.TaskGroups {
		renderTaskGroupDiff(b, jd.TaskGroups[i], childIndent)
	}
}
// renderTaskGroupDiff writes one task group diff to b at the given indent.
// The header line may end with a parenthesised, alphabetically sorted summary
// of the group's update counts; entries whose count is not positive are left
// out, and the summary is omitted altogether when nothing remains. The
// group's fields, objects, and tasks follow one indent level deeper.
func renderTaskGroupDiff(b *strings.Builder, tg *nomadapi.TaskGroupDiff, indent string) {
	counts := make([]string, 0, len(tg.Updates))
	for kind, n := range tg.Updates {
		if n > 0 {
			counts = append(counts, fmt.Sprintf("%d %s", n, kind))
		}
	}

	summary := ""
	if len(counts) > 0 {
		// Map iteration order is random; sort for stable output.
		sort.Strings(counts)
		summary = " (" + strings.Join(counts, ", ") + ")"
	}

	fmt.Fprintf(b, "%s%s Task Group: %q%s\n", indent, diffSymbol(tg.Type), tg.Name, summary)

	inner := indent + " "
	renderFields(b, tg.Fields, inner)
	renderObjects(b, tg.Objects, inner)
	for i := range tg.Tasks {
		renderTaskDiff(b, tg.Tasks[i], inner)
	}
}
// renderTaskDiff writes one task diff to b at the given indent: a header line
// with the diff symbol, task name, and any annotations in parentheses,
// followed by the task's field and object diffs one indent level deeper.
func renderTaskDiff(b *strings.Builder, t *nomadapi.TaskDiff, indent string) {
	var notes string
	if len(t.Annotations) != 0 {
		notes = " (" + strings.Join(t.Annotations, ", ") + ")"
	}
	fmt.Fprintf(b, "%s%s Task: %q%s\n", indent, diffSymbol(t.Type), t.Name, notes)

	inner := indent + " "
	renderFields(b, t.Fields, inner)
	renderObjects(b, t.Objects, inner)
}
// renderFields writes one line per field diff to b at the given indent.
// Added fields show the new value, deleted fields the old value, and edited
// fields both ("old => new"); any annotations are appended in parentheses.
// Fields of any other diff type produce no output.
func renderFields(b *strings.Builder, fields []*nomadapi.FieldDiff, indent string) {
	for _, field := range fields {
		var notes string
		if len(field.Annotations) != 0 {
			notes = " (" + strings.Join(field.Annotations, ", ") + ")"
		}

		switch {
		case field.Type == "Added":
			fmt.Fprintf(b, "%s+ %s: %q%s\n", indent, field.Name, field.New, notes)
		case field.Type == "Deleted":
			fmt.Fprintf(b, "%s- %s: %q%s\n", indent, field.Name, field.Old, notes)
		case field.Type == "Edited":
			fmt.Fprintf(b, "%s~ %s: %q => %q%s\n", indent, field.Name, field.Old, field.New, notes)
		}
	}
}
// renderObjects writes each object diff to b as a brace-delimited block at
// the given indent. The object's own fields and nested objects are rendered
// recursively, one indent level deeper, between the opening and closing
// brace lines.
func renderObjects(b *strings.Builder, objects []*nomadapi.ObjectDiff, indent string) {
	inner := indent + " "
	for i := range objects {
		obj := objects[i]
		fmt.Fprintf(b, "%s%s %s {\n", indent, diffSymbol(obj.Type), obj.Name)
		renderFields(b, obj.Fields, inner)
		renderObjects(b, obj.Objects, inner)
		fmt.Fprintf(b, "%s}\n", indent)
	}
}
// diffSymbol maps a diff type name to the marker printed in front of the
// item: "+" for "Added", "-" for "Deleted", "+/-" for "Edited". Any other
// value, including the empty string, yields "?".
func diffSymbol(t string) string {
	if t == "Added" {
		return "+"
	}
	if t == "Deleted" {
		return "-"
	}
	if t == "Edited" {
		return "+/-"
	}
	return "?"
}
// Package server provides the HTTP server exposing the HTML index page,
// /healthz, /diffs, /metrics, and the git webhook endpoint.
package server
import (
"context"
"encoding/json"
"fmt"
"html/template"
"log/slog"
"net/http"
"strings"
"sync"
"time"
webhookgithub "github.com/go-playground/webhooks/v6/github"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/gerrowadat/nomad-botherer/internal/config"
"github.com/gerrowadat/nomad-botherer/internal/nomad"
)
// DiffSource is satisfied by *nomad.Differ.
// It is the read-only view of diff state that the HTTP handlers depend on.
type DiffSource interface {
// Diffs returns the current job diffs along with a timestamp and a string
// that the handlers in this package present as the last check time and
// the git commit.
Diffs() ([]nomad.JobDiff, time.Time, string)
// Ready reports whether at least one diff check has completed.
Ready() bool
}
// GitStatusSource is satisfied by *gitwatch.Watcher.
type GitStatusSource interface {
// Trigger is called by the webhook handler when a push for the configured
// branch arrives. Presumably it asks the watcher to re-sync — confirm
// against gitwatch.Watcher.
Trigger()
// Status returns the last known commit and the time of the last update.
Status() (lastCommit string, lastUpdate time.Time)
// Ready reports whether the initial git clone has completed.
Ready() bool
}
// Server holds the HTTP mux and all dependencies.
type Server struct {
// cfg supplies the listen address, webhook path and secret, watched branch,
// and the job selection settings shown on the index page.
cfg *config.Config
// diffs and git are the data sources the handlers read from.
diffs DiffSource
git GitStatusSource
// version is shown on the index page and exported as a metric label.
version string
mux *http.ServeMux
// webhookMu guards lastWebhookSuccess and lastWebhookFailure, which are
// written by the webhook handler and read by the index handler.
webhookMu sync.RWMutex
lastWebhookSuccess time.Time
lastWebhookFailure time.Time
// Prometheus metrics
webhookEvents *prometheus.CounterVec
lastWebhookSuccessGauge prometheus.Gauge
lastWebhookFailureGauge prometheus.Gauge
}
// New builds a Server whose Prometheus metrics are registered on the global
// default registry. Creating more than one Server this way in the same
// process will collide on metric names; use NewWithRegistry in that case.
func New(cfg *config.Config, diffs DiffSource, git GitStatusSource, version string) *Server {
	defaultReg := prometheus.DefaultRegisterer
	return NewWithRegistry(cfg, diffs, git, version, defaultReg)
}
// NewWithRegistry builds a Server whose metrics are registered on reg. Tests
// use this to give each Server its own registry and so avoid
// duplicate-registration panics.
//
// Besides the webhook counters and gauges, a constant "info" gauge labelled
// with the build version is registered. The /metrics endpoint serves from reg
// when reg can also gather metrics, and from the global default otherwise.
// All routes, including the configured webhook path, are mounted on a fresh
// ServeMux.
func NewWithRegistry(cfg *config.Config, diffs DiffSource, git GitStatusSource, version string, reg prometheus.Registerer) *Server {
	factory := promauto.With(reg)

	events := factory.NewCounterVec(prometheus.CounterOpts{
		Name: "nomad_botherer_webhook_events_total",
		Help: "Total number of webhook events received, by event type.",
	}, []string{"event"})
	successGauge := factory.NewGauge(prometheus.GaugeOpts{
		Name: "nomad_botherer_last_webhook_success_timestamp_seconds",
		Help: "Unix timestamp of the most recent successfully parsed webhook.",
	})
	failureGauge := factory.NewGauge(prometheus.GaugeOpts{
		Name: "nomad_botherer_last_webhook_failure_timestamp_seconds",
		Help: "Unix timestamp of the most recent webhook that failed to parse.",
	})

	// Constant gauge whose only purpose is to carry the build version label.
	info := factory.NewGaugeVec(prometheus.GaugeOpts{
		Name: "nomad_botherer_info",
		Help: "Build information.",
	}, []string{"version"})
	info.WithLabelValues(version).Set(1)

	srv := &Server{
		cfg:                     cfg,
		diffs:                   diffs,
		git:                     git,
		version:                 version,
		webhookEvents:           events,
		lastWebhookSuccessGauge: successGauge,
		lastWebhookFailureGauge: failureGauge,
	}

	// Serve metrics from the supplied registry when it can gather; only fall
	// back to the global handler when it cannot.
	var metrics http.Handler
	if gatherer, ok := reg.(prometheus.Gatherer); ok {
		metrics = promhttp.HandlerFor(gatherer, promhttp.HandlerOpts{})
	} else {
		metrics = promhttp.Handler()
	}

	mux := http.NewServeMux()
	mux.HandleFunc("/{$}", srv.handleIndex)
	mux.HandleFunc("/healthz", srv.handleHealthz)
	mux.HandleFunc("/diffs", srv.handleDiffs)
	mux.Handle("/metrics", metrics)
	mux.HandleFunc(cfg.WebhookPath, srv.handleWebhook())
	srv.mux = mux

	return srv
}
// Run starts the HTTP server on the configured listen address and blocks
// until ctx is cancelled or the listener fails.
//
// On cancellation the server is shut down gracefully with a five second
// budget, and Run does not return until that shutdown has finished, so
// in-flight requests are given the chance to complete before the caller
// proceeds (ListenAndServe itself returns as soon as Shutdown begins). A
// shutdown that does not complete within the budget is logged; Run still
// returns nil in that case.
//
// A non-nil error is returned only when the listener stops for a reason other
// than a requested shutdown, for example when the address is already in use.
func (s *Server) Run(ctx context.Context) error {
	srv := &http.Server{
		Addr:    s.cfg.ListenAddr,
		Handler: s.mux,
		// Bound the time a client may take to send request headers so that
		// idle or deliberately slow connections cannot be held open forever.
		ReadHeaderTimeout: 10 * time.Second,
	}

	slog.Info("HTTP server listening", "addr", s.cfg.ListenAddr)

	// Buffered so the serving goroutine can always deliver its result and
	// exit, whichever branch below is taken.
	serveErr := make(chan error, 1)
	go func() {
		serveErr <- srv.ListenAndServe()
	}()

	select {
	case err := <-serveErr:
		// The listener stopped on its own before any shutdown was requested.
		if err != nil && err != http.ErrServerClosed {
			return fmt.Errorf("http server: %w", err)
		}
		return nil
	case <-ctx.Done():
	}

	shutCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := srv.Shutdown(shutCtx); err != nil {
		slog.Warn("HTTP server shutdown did not complete cleanly", "err", err)
	}

	// Collect the listener's result; anything other than the expected
	// "server closed" means it had already failed for another reason.
	if err := <-serveErr; err != nil && err != http.ErrServerClosed {
		return fmt.Errorf("http server: %w", err)
	}
	return nil
}
// Handler exposes the server's request multiplexer as an http.Handler so
// tests can drive it directly without opening a real listener.
func (s *Server) Handler() http.Handler {
	var h http.Handler = s.mux
	return h
}
// indexTmpl is the HTML template for the index page. It is parsed once when
// the package is initialised; template.Must panics if the text does not parse.
// The template reads these fields from the data it is executed with: Version,
// Starting, DiffCount, SelectionGlob, ManagedMetaKey, LastCheck, Commit,
// LastWebhookOK, and LastWebhookFail. String fields left empty suppress the
// corresponding section of the page.
var indexTmpl = template.Must(template.New("index").Parse(`<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>nomad-botherer</title>
<style>
body { font-family: sans-serif; max-width: 640px; margin: 2em auto; color: #222; }
h1 { margin-bottom: 0.2em; }
.ok { color: #2a7a2a; font-weight: bold; }
.bad { color: #b94040; font-weight: bold; }
.starting { color: #7a6a00; font-weight: bold; }
code { background: #f4f4f4; padding: 0.1em 0.3em; border-radius: 3px; }
ul { line-height: 1.8; }
</style>
</head>
<body>
<h1>nomad-botherer <small>{{.Version}}</small></h1>
<p>Status:
{{- if .Starting}}
<span class="starting">starting — initial state not yet built</span>
{{- else if .DiffCount}}
<span class="bad">{{.DiffCount}} difference(s) detected</span>
{{- else}}
<span class="ok">OK — no differences</span>
{{- end}}
</p>
<p>Watching:
{{- if .SelectionGlob}} jobs matching <code>{{.SelectionGlob}}</code>{{end}}
{{- if and .SelectionGlob .ManagedMetaKey}}, or{{end}}
{{- if .ManagedMetaKey}} jobs with <code>{{.ManagedMetaKey}}=true</code> in job meta{{end}}
{{- if not (or .SelectionGlob .ManagedMetaKey)}} <em>no jobs — no selection criteria configured</em>{{end}}
</p>
{{- if .ManagedMetaKey}}
<p><small>To include a job, add <code>meta { "{{.ManagedMetaKey}}" = "true" }</code> to its HCL definition.</small></p>
{{- end}}
{{- if .LastCheck}}
<p>Last diff check: {{.LastCheck}}{{if .Commit}} (commit <code>{{.Commit}}</code>){{end}}</p>
{{- end}}
{{- if (or .LastWebhookOK .LastWebhookFail)}}
<p>Last webhook:
{{- if .LastWebhookOK}} ok <code>{{.LastWebhookOK}}</code>{{end}}
{{- if .LastWebhookFail}} failed <code>{{.LastWebhookFail}}</code>{{end}}
</p>
{{- end}}
<ul>
<li><a href="/diffs">/diffs</a> — current job diffs (plan-style)</li>
<li><a href="/healthz">/healthz</a> — JSON health check</li>
<li><a href="/metrics">/metrics</a> — Prometheus metrics</li>
</ul>
</body>
</html>
`))
// handleIndex serves the HTML status page. While either the git watcher or
// the differ is not yet ready, the page is rendered in its "starting" state
// with HTTP 503 and without consulting the diff source for results.
// Timestamps are shown in RFC 3339 form and are left blank when unset.
func (s *Server) handleIndex(w http.ResponseWriter, r *http.Request) {
	type indexPage struct {
		Version         string
		Starting        bool
		DiffCount       int
		LastCheck       string
		Commit          string
		LastWebhookOK   string
		LastWebhookFail string
		SelectionGlob   string
		ManagedMetaKey  string
	}

	// stamp formats a timestamp for display; the zero time becomes "".
	stamp := func(t time.Time) string {
		if t.IsZero() {
			return ""
		}
		return t.Format(time.RFC3339)
	}

	page := indexPage{
		Version:       s.version,
		Starting:      !s.git.Ready() || !s.diffs.Ready(),
		SelectionGlob: s.cfg.JobSelectorGlob,
	}

	if !page.Starting {
		current, checkedAt, commit := s.diffs.Diffs()
		page.DiffCount = len(current)
		page.Commit = commit
		page.LastCheck = stamp(checkedAt)
	}

	s.webhookMu.RLock()
	okAt, failAt := s.lastWebhookSuccess, s.lastWebhookFailure
	s.webhookMu.RUnlock()
	page.LastWebhookOK = stamp(okAt)
	page.LastWebhookFail = stamp(failAt)

	if prefix := s.cfg.ManagedMetaPrefix; prefix != "" {
		page.ManagedMetaKey = prefix + ".managed"
	}

	w.Header().Set("Content-Type", "text/html; charset=utf-8")
	if page.Starting {
		w.WriteHeader(http.StatusServiceUnavailable)
	}
	_ = indexTmpl.Execute(w, page)
}
// handleDiffs serves the plain-text diff report. Until both the git watcher
// and the differ report ready, it answers HTTP 503 with a short explanation
// instead of a report.
func (s *Server) handleDiffs(w http.ResponseWriter, r *http.Request) {
	if ready := s.git.Ready() && s.diffs.Ready(); !ready {
		http.Error(w, "not ready: initial state not yet built", http.StatusServiceUnavailable)
		return
	}

	current, checkedAt, commit := s.diffs.Diffs()
	report := renderDiffsText(current, checkedAt, commit)

	w.Header().Set("Content-Type", "text/plain; charset=utf-8")
	fmt.Fprint(w, report)
}
// HealthResponse is the JSON body returned by /healthz.
type HealthResponse struct {
// Status is "starting" before the initial state is built, otherwise "ok"
// or "diffs_detected".
Status string `json:"status"`
// Message carries a human-readable explanation; the handler sets it only
// for the "starting" response.
Message string `json:"message,omitempty"`
// DiffCount is the number of entries in Diffs.
DiffCount int `json:"diff_count"`
// Diffs is always emitted (no omitempty); it is left nil in the "starting"
// response.
Diffs []DiffEntry `json:"diffs"`
// LastCheck, GitCommit, and GitUpdated are omitted when empty. The two
// timestamps are formatted as RFC 3339 by the handler.
LastCheck string `json:"last_check,omitempty"`
GitCommit string `json:"git_commit,omitempty"`
GitUpdated string `json:"git_updated,omitempty"`
}
// DiffEntry is one element of HealthResponse.Diffs.
// It mirrors the identifying fields of a nomad.JobDiff; the plan diff itself
// is not included.
type DiffEntry struct {
JobID string `json:"job_id"`
// HCLFile is omitted from the JSON when empty.
HCLFile string `json:"hcl_file,omitempty"`
// DiffType is the string form of the nomad diff type.
DiffType string `json:"diff_type"`
Detail string `json:"detail"`
}
// handleHealthz serves the JSON health check. Until both the git watcher and
// the differ report ready it answers HTTP 503 with status "starting".
// Afterwards it answers HTTP 200 with status "ok" or "diffs_detected", the
// list of current diffs, and whichever of the last-check time, git commit,
// and git update time are known.
func (s *Server) handleHealthz(w http.ResponseWriter, r *http.Request) {
	if ready := s.git.Ready() && s.diffs.Ready(); !ready {
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusServiceUnavailable)
		notReady := HealthResponse{
			Status:  "starting",
			Message: "initial state not yet built",
		}
		_ = json.NewEncoder(w).Encode(notReady)
		return
	}

	current, checkedAt, commit := s.diffs.Diffs()
	_, gitUpdated := s.git.Status()

	// Always a non-nil slice so the JSON field is an array, never null.
	entries := make([]DiffEntry, len(current))
	for i := range current {
		entries[i] = DiffEntry{
			JobID:    current[i].JobID,
			HCLFile:  current[i].HCLFile,
			DiffType: string(current[i].DiffType),
			Detail:   current[i].Detail,
		}
	}

	body := HealthResponse{
		Status:    "ok",
		DiffCount: len(current),
		Diffs:     entries,
		GitCommit: commit,
	}
	if len(current) > 0 {
		body.Status = "diffs_detected"
	}
	if !checkedAt.IsZero() {
		body.LastCheck = checkedAt.Format(time.RFC3339)
	}
	if !gitUpdated.IsZero() {
		body.GitUpdated = gitUpdated.Format(time.RFC3339)
	}

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)
	_ = json.NewEncoder(w).Encode(body)
}
// handleWebhook builds the handler for the GitHub webhook endpoint, using the
// configured webhook secret. If the webhook parser cannot be created, the
// failure is logged once and a stub handler that always answers HTTP 500 is
// returned instead.
//
// The real handler accepts push and ping events:
//   - events of any other type are counted as "unknown" and acknowledged
//     with HTTP 200;
//   - any other parse failure is counted as "error", recorded as the last
//     webhook failure, and answered with HTTP 400;
//   - a push is logged and, when it targets the configured branch, triggers
//     the git source; a ping is only logged.
//
// Every successfully parsed event is recorded as the last webhook success.
func (s *Server) handleWebhook() http.HandlerFunc {
	hook, err := webhookgithub.New(webhookgithub.Options.Secret(s.cfg.WebhookSecret))
	if err != nil {
		// This only errors with an invalid secret; log and serve a stub.
		slog.Error("Failed to initialise GitHub webhook handler", "err", err)
		return func(w http.ResponseWriter, _ *http.Request) {
			http.Error(w, "webhook handler misconfigured", http.StatusInternalServerError)
		}
	}

	// recordOutcome stamps the current time into the given field (under the
	// webhook mutex) and mirrors it to the matching gauge.
	recordOutcome := func(field *time.Time, gauge prometheus.Gauge) {
		stamp := time.Now()
		s.webhookMu.Lock()
		*field = stamp
		s.webhookMu.Unlock()
		gauge.Set(float64(stamp.Unix()))
	}

	return func(w http.ResponseWriter, r *http.Request) {
		event := r.Header.Get("X-GitHub-Event")
		delivery := r.Header.Get("X-GitHub-Delivery")

		payload, parseErr := hook.Parse(r, webhookgithub.PushEvent, webhookgithub.PingEvent)
		if parseErr == webhookgithub.ErrEventNotFound {
			s.webhookEvents.WithLabelValues("unknown").Inc()
			slog.Debug("Ignoring unhandled webhook event", "event", event, "delivery", delivery)
			w.WriteHeader(http.StatusOK)
			return
		}
		if parseErr != nil {
			s.webhookEvents.WithLabelValues("error").Inc()
			slog.Warn("Webhook rejected", "event", event, "delivery", delivery, "err", parseErr)
			recordOutcome(&s.lastWebhookFailure, s.lastWebhookFailureGauge)
			http.Error(w, "bad webhook payload", http.StatusBadRequest)
			return
		}

		switch body := payload.(type) {
		case webhookgithub.PingPayload:
			s.webhookEvents.WithLabelValues("ping").Inc()
			slog.Info("Received ping webhook",
				"delivery", delivery,
				"hook_id", body.HookID,
				"repo", body.Repository.FullName,
				"events", body.Hook.Events,
			)
		case webhookgithub.PushPayload:
			s.webhookEvents.WithLabelValues("push").Inc()
			branch := strings.TrimPrefix(body.Ref, "refs/heads/")
			slog.Info("Received push webhook",
				"delivery", delivery,
				"repo", body.Repository.FullName,
				"branch", branch,
				"before", body.Before,
				"after", body.After,
				"commits", len(body.Commits),
				"pusher", body.Pusher.Name,
				"compare", body.Compare,
			)
			// Pushes to other branches are acknowledged but do not trigger.
			if branch == s.cfg.Branch {
				s.git.Trigger()
			}
		}

		recordOutcome(&s.lastWebhookSuccess, s.lastWebhookSuccessGauge)
		w.WriteHeader(http.StatusOK)
	}
}