/* MIT License Copyright (c) 2024 Publieke Dienstverlening op de Kaart Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
*/ package controller import ( "context" m "github.com/PDOK/uptime-operator/internal/model" "github.com/PDOK/uptime-operator/internal/service" traefikcontainous "github.com/traefik/traefik/v2/pkg/provider/kubernetes/crd/traefikcontainous/v1alpha1" traefikio "github.com/traefik/traefik/v2/pkg/provider/kubernetes/crd/traefikio/v1alpha1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/predicate" ) // IngressRouteReconciler reconciles Traefik IngressRoutes with an uptime monitoring (SaaS) provider type IngressRouteReconciler struct { client.Client Scheme *runtime.Scheme UptimeCheckService *service.UptimeCheckService } //+kubebuilder:rbac:groups=traefik.containo.us,resources=ingressroutes,verbs=get;list;watch //+kubebuilder:rbac:groups=traefik.containo.us,resources=ingressroutes/finalizers,verbs=update //+kubebuilder:rbac:groups=traefik.io,resources=ingressroutes,verbs=get;list;watch //+kubebuilder:rbac:groups=traefik.io,resources=ingressroutes/finalizers,verbs=update // Reconcile is part of the main kubernetes reconciliation loop which aims to // move the current state of the cluster closer to the desired state. 
// // For more details, check Reconcile and its Result here: // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.17.0/pkg/reconcile func (r *IngressRouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { ingressRoute, err := r.getIngressRoute(ctx, req) if err != nil { return ctrl.Result{}, client.IgnoreNotFound(err) } shouldContinue, err := finalizeIfNecessary(ctx, r.Client, ingressRoute, m.AnnotationFinalizer, func() error { r.UptimeCheckService.Mutate(ctx, m.Delete, ingressRoute.GetName(), ingressRoute.GetAnnotations()) return nil }) if !shouldContinue || err != nil { return ctrl.Result{}, client.IgnoreNotFound(err) } r.UptimeCheckService.Mutate(ctx, m.CreateOrUpdate, ingressRoute.GetName(), ingressRoute.GetAnnotations()) return ctrl.Result{}, nil } func (r *IngressRouteReconciler) getIngressRoute(ctx context.Context, req ctrl.Request) (client.Object, error) { // first try getting "traefik.containo.us/v1alpha1" ingress ingressContainous := &traefikcontainous.IngressRoute{} if err := r.Get(ctx, req.NamespacedName, ingressContainous); err != nil { // not found, now try getting "traefik.io/v1alpha1" ingress ingressIo := &traefikio.IngressRoute{} if err = r.Get(ctx, req.NamespacedName, ingressIo); err != nil { // still not found, handle error logger := log.FromContext(ctx) if apierrors.IsNotFound(err) { logger.Info("IngressRoute resource not found", "name", req.NamespacedName) } else { logger.Error(err, "unable to fetch IngressRoute resource", "error", err) } return nil, err } return ingressIo, nil } return ingressContainous, nil } func finalizeIfNecessary(ctx context.Context, c client.Client, obj client.Object, finalizerName string, finalizer func() error) (shouldContinue bool, err error) { // not under deletion, ensure finalizer annotation if obj.GetDeletionTimestamp().IsZero() { if !controllerutil.ContainsFinalizer(obj, finalizerName) { controllerutil.AddFinalizer(obj, finalizerName) err = c.Update(ctx, obj) return true, err 
} return true, nil } // under deletion but not our finalizer annotation, do nothing if !controllerutil.ContainsFinalizer(obj, finalizerName) { return false, nil } // run finalizer and remove annotation if err = finalizer(); err != nil { return false, err } controllerutil.RemoveFinalizer(obj, finalizerName) err = c.Update(ctx, obj) return false, err } // SetupWithManager sets up the controller with the Manager. func (r *IngressRouteReconciler) SetupWithManager(mgr ctrl.Manager) error { preCondition := predicate.Or(predicate.GenerationChangedPredicate{}, predicate.AnnotationChangedPredicate{}) return ctrl.NewControllerManagedBy(mgr). Named(m.OperatorName). Watches( &traefikcontainous.IngressRoute{}, // watch "traefik.containo.us/v1alpha1" ingresses &handler.EnqueueRequestForObject{}, builder.WithPredicates(preCondition)). Watches( &traefikio.IngressRoute{}, // watch "traefik.io/v1alpha1" ingresses &handler.EnqueueRequestForObject{}, builder.WithPredicates(preCondition)). Complete(r) }
const (
	// OperatorName is the name of this operator.
	OperatorName = "uptime-operator"

	// TagManagedBy Indicate to humans that the given check is managed by the operator.
	TagManagedBy = "managed-by-" + OperatorName

	// AnnotationBase is the prefix shared by all annotation keys below.
	AnnotationBase              = "uptime.pdok.nl"
	AnnotationID                = AnnotationBase + "/id"
	AnnotationName              = AnnotationBase + "/name"
	AnnotationURL               = AnnotationBase + "/url"
	AnnotationTags              = AnnotationBase + "/tags"
	AnnotationInterval          = AnnotationBase + "/interval-in-minutes"
	AnnotationRequestHeaders    = AnnotationBase + "/request-headers"
	AnnotationStringContains    = AnnotationBase + "/response-check-for-string-contains"
	AnnotationStringNotContains = AnnotationBase + "/response-check-for-string-not-contains"
	AnnotationFinalizer         = AnnotationBase + "/finalizer"
	AnnotationIgnore            = AnnotationBase + "/ignore"
)

// UptimeCheck is the provider independent description of an uptime check,
// as derived from the annotations on an ingress route.
type UptimeCheck struct {
	ID                string            `json:"id"`
	Name              string            `json:"name"`
	URL               string            `json:"url"`
	Tags              []string          `json:"tags"`
	Interval          int               `json:"resolution"`
	RequestHeaders    map[string]string `json:"request_headers"`     //nolint:tagliatelle // grandfathered in
	StringContains    string            `json:"string_contains"`     //nolint:tagliatelle // grandfathered in
	StringNotContains string            `json:"string_not_contains"` //nolint:tagliatelle // grandfathered in
}

// NewUptimeCheck builds an UptimeCheck from the annotations of the ingress route named ingressName.
//
// The id, name and url annotations are mandatory, a missing one results in an error. The
// interval annotation is optional (default 1) but must hold an integer when present. All
// other annotations are optional. TagManagedBy is always added to the tags of the check.
func NewUptimeCheck(ingressName string, annotations map[string]string) (*UptimeCheck, error) {
	id, err := requiredAnnotation(annotations, AnnotationID, ingressName)
	if err != nil {
		return nil, err
	}
	name, err := requiredAnnotation(annotations, AnnotationName, ingressName)
	if err != nil {
		return nil, err
	}
	checkURL, err := requiredAnnotation(annotations, AnnotationURL, ingressName)
	if err != nil {
		return nil, err
	}
	interval, err := getInterval(annotations)
	if err != nil {
		return nil, err
	}

	check := &UptimeCheck{
		ID:                id,
		Name:              name,
		URL:               checkURL,
		Tags:              stringToSlice(annotations[AnnotationTags]),
		Interval:          interval,
		RequestHeaders:    kvStringToMap(annotations[AnnotationRequestHeaders]),
		StringContains:    annotations[AnnotationStringContains],
		StringNotContains: annotations[AnnotationStringNotContains],
	}
	if !slices.Contains(check.Tags, TagManagedBy) {
		check.Tags = append(check.Tags, TagManagedBy)
	}
	return check, nil
}

// requiredAnnotation returns the value of the annotation with the given key, or an
// error (mentioning the key and the ingress route) when the annotation is absent.
func requiredAnnotation(annotations map[string]string, key string, ingressName string) (string, error) {
	value, ok := annotations[key]
	if !ok {
		return "", fmt.Errorf("%s annotation not found on ingress route: %s", key, ingressName)
	}
	return value, nil
}

// getInterval returns the value of the interval annotation. It returns 1 when the
// annotation is absent, and 1 plus an error when the value isn't an integer.
func getInterval(annotations map[string]string) (int, error) {
	const defaultInterval = 1

	raw, ok := annotations[AnnotationInterval]
	if !ok {
		return defaultInterval, nil
	}
	interval, err := strconv.Atoi(raw)
	if err != nil {
		return defaultInterval, fmt.Errorf("%s annotation should contain integer value: %w", AnnotationInterval, err)
	}
	return interval, nil
}

// kvStringToMap parses a comma-separated list of "key:value" pairs into a map. Keys and
// values are trimmed. Each pair is split on its FIRST colon, so values may contain colons
// themselves (e.g. "Referer: https://example.com"). Pairs without any colon are skipped.
// An empty string results in a nil map.
//
// Values can't contain a comma, since the comma separates the pairs.
func kvStringToMap(s string) map[string]string {
	if s == "" {
		return nil
	}
	result := make(map[string]string)
	for _, kvPair := range strings.Split(s, ",") {
		key, value, found := strings.Cut(kvPair, ":")
		if !found {
			continue
		}
		result[strings.TrimSpace(key)] = strings.TrimSpace(value)
	}
	return result
}

// stringToSlice splits a comma-separated string into its trimmed parts.
// An empty string results in a nil slice.
func stringToSlice(s string) []string {
	if s == "" {
		return nil
	}
	splits := strings.Split(s, ",")
	result := make([]string, 0, len(splits))
	for _, part := range splits {
		result = append(result, strings.TrimSpace(part))
	}
	return result
}
package providers import ( "context" "encoding/json" "fmt" "github.com/PDOK/uptime-operator/internal/model" "sigs.k8s.io/controller-runtime/pkg/log" ) type MockUptimeProvider struct { checks map[string]model.UptimeCheck } func NewMockUptimeProvider() *MockUptimeProvider { return &MockUptimeProvider{ checks: make(map[string]model.UptimeCheck), } } func (m *MockUptimeProvider) CreateOrUpdateCheck(ctx context.Context, check model.UptimeCheck) error { m.checks[check.ID] = check checkJSON, _ := json.Marshal(check) log.FromContext(ctx).Info(fmt.Sprintf("MOCK: created or updated check %s\n", checkJSON)) return nil } func (m *MockUptimeProvider) DeleteCheck(ctx context.Context, check model.UptimeCheck) error { delete(m.checks, check.ID) checkJSON, _ := json.Marshal(check) log.FromContext(ctx).Info(fmt.Sprintf("MOCK: deleted check %s\n", checkJSON)) return nil }
const (
	// pingdomURL is the endpoint of the Pingdom checks API.
	pingdomURL = "https://api.pingdom.com/api/3.1/checks"

	// checkNotFound is the sentinel ID used when no matching Pingdom check exists.
	checkNotFound int64 = -1

	// customIDPrefix prefixes the tag in which the check ID from the annotation is stored.
	customIDPrefix = "id:"

	headerAuthorization = "Authorization"
	headerAccept        = "Accept"
	headerContentType   = "Content-Type"
	headerReqLimitShort = "Req-Limit-Short"
	headerReqLimitLong  = "Req-Limit-Long"
)

// PingdomSettings holds the configuration of the Pingdom provider: the API token used
// as bearer token, plus the optional user and integration IDs to attach to checks.
type PingdomSettings struct {
	APIToken       string
	UserIDs        []int
	IntegrationIDs []int
}

// PingdomUptimeProvider manages uptime checks through the Pingdom API.
type PingdomUptimeProvider struct {
	settings   PingdomSettings
	httpClient *http.Client
}

// NewPingdomUptimeProvider creates a PingdomUptimeProvider with an HTTP client that times
// out after 5 minutes. The process is terminated when settings hold no API token.
func NewPingdomUptimeProvider(settings PingdomSettings) *PingdomUptimeProvider {
	if settings.APIToken == "" {
		classiclog.Fatal("Pingdom API token is not provided")
	}
	provider := &PingdomUptimeProvider{settings: settings}
	provider.httpClient = &http.Client{Timeout: 5 * time.Minute}
	return provider
}

// CreateOrUpdateCheck create the given check with Pingdom, or update an existing check. Needs to be idempotent!
func (m *PingdomUptimeProvider) CreateOrUpdateCheck(ctx context.Context, check model.UptimeCheck) (err error) { existingCheckID, err := m.findCheck(ctx, check) if err != nil { return err } if existingCheckID == checkNotFound { err = m.createCheck(ctx, check) } else { err = m.updateCheck(ctx, existingCheckID, check) } return err } // DeleteCheck deletes the given check from Pingdom func (m *PingdomUptimeProvider) DeleteCheck(ctx context.Context, check model.UptimeCheck) error { log.FromContext(ctx).Info("deleting check", "check", check) existingCheckID, err := m.findCheck(ctx, check) if err != nil { return err } if existingCheckID == checkNotFound { log.FromContext(ctx).Info(fmt.Sprintf("check with ID '%s' is already deleted", check.ID)) return nil } req, err := http.NewRequest(http.MethodDelete, fmt.Sprintf("%s/%d", pingdomURL, existingCheckID), nil) if err != nil { return err } resp, err := m.execRequest(ctx, req) if err != nil { return err } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { resultBody, _ := io.ReadAll(resp.Body) return fmt.Errorf("got status %d, expected HTTP OK when deleting existing check. Error %s", resp.StatusCode, resultBody) } return nil } func (m *PingdomUptimeProvider) findCheck(ctx context.Context, check model.UptimeCheck) (int64, error) { result := checkNotFound // list all checks managed by uptime-operator. Can be at most 25.000, which is probably sufficient. 
req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s?include_tags=true&limit=25000&tags=%s", pingdomURL, model.TagManagedBy), nil) if err != nil { return result, err } req.Header.Add(headerAccept, "application/json") resp, err := m.execRequest(ctx, req) if err != nil { return result, err } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { return result, fmt.Errorf("got status %d, expected HTTP OK when listing existing checks", resp.StatusCode) } checksResponse := make(map[string]any) err = json.NewDecoder(resp.Body).Decode(&checksResponse) if err != nil { return result, err } pingdomChecks := checksResponse["checks"].([]any) for _, rawCheck := range pingdomChecks { pingdomCheck := rawCheck.(map[string]any) tags := pingdomCheck["tags"] if tags == nil { continue } for _, rawTag := range tags.([]any) { tag := rawTag.(map[string]any) if tag == nil { continue } tagName := tag["name"].(string) if strings.HasSuffix(tagName, check.ID) { // bingo, we've found the Pingdom check based on our custom ID (check.ID which is stored in a Pingdom tag). // now we return the actual Pingdom ID which we need for updates/deletes/etc. 
pingdomCheckID := pingdomCheck["id"] if pingdomCheckID != nil && pingdomCheckID.(float64) > 0 { result = int64(pingdomCheckID.(float64)) } } } } return result, nil } func (m *PingdomUptimeProvider) createCheck(ctx context.Context, check model.UptimeCheck) error { log.FromContext(ctx).Info("creating check", "check", check) message, err := m.checkToJSON(check, true) if err != nil { return err } req, err := http.NewRequest(http.MethodPost, pingdomURL, bytes.NewBuffer(message)) if err != nil { return err } err = m.execRequestWithBody(ctx, req) if err != nil { return err } return nil } func (m *PingdomUptimeProvider) updateCheck(ctx context.Context, existingPingdomID int64, check model.UptimeCheck) error { log.FromContext(ctx).Info("updating check", "check", check, "pingdom ID", existingPingdomID) message, err := m.checkToJSON(check, false) if err != nil { return err } req, err := http.NewRequest(http.MethodPut, fmt.Sprintf("%s/%d", pingdomURL, existingPingdomID), bytes.NewBuffer(message)) if err != nil { return err } err = m.execRequestWithBody(ctx, req) if err != nil { return err } return nil } func (m *PingdomUptimeProvider) checkToJSON(check model.UptimeCheck, includeType bool) ([]byte, error) { checkURL, err := url.ParseRequestURI(check.URL) if err != nil { return nil, err } port, err := getPort(checkURL) if err != nil { return nil, err } relativeURL := checkURL.Path if checkURL.RawQuery != "" { relativeURL += "?" + checkURL.RawQuery } // add the check id (from the k8s annotation) as a tag, so // we can latter retrieve the check during update or delete. 
check.Tags = append(check.Tags, customIDPrefix+check.ID) // tags can be at most 64 chars long, cut off longer ones for k := range check.Tags { tag := check.Tags[k] if len(tag) > 64 { tag = tag[:64] } check.Tags[k] = tag } message := map[string]any{ "name": check.Name, "host": checkURL.Hostname(), "url": relativeURL, "encryption": true, // assume all checks run over HTTPS "port": port, "resolution": check.Interval, "tags": check.Tags, } if includeType { // update messages shouldn't include 'type', since the type of check can't be modified in Pingdom. message["type"] = "http" } if len(m.settings.UserIDs) > 0 { message["userids"] = m.settings.UserIDs } if len(m.settings.IntegrationIDs) > 0 { message["integrationids"] = m.settings.IntegrationIDs } // request header need to be submitted in numbered JSON keys // for example "requestheader1": key:value, "requestheader2": key:value, etc var headers []string for header := range check.RequestHeaders { headers = append(headers, header) } sort.Strings(headers) for i, header := range headers { message[fmt.Sprintf("requestheader%d", i)] = fmt.Sprintf("%s:%s", header, check.RequestHeaders[header]) } // Pingdom doesn't allow both "shouldcontain" and "shouldnotcontain" if check.StringContains != "" { message["shouldcontain"] = check.StringContains } else if check.StringNotContains != "" { message["shouldnotcontain"] = check.StringNotContains } return json.Marshal(message) } func (m *PingdomUptimeProvider) execRequestWithBody(ctx context.Context, req *http.Request) error { req.Header.Add(headerContentType, "application/json") resp, err := m.execRequest(ctx, req) if err != nil { return err } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { resultBody, _ := io.ReadAll(resp.Body) return fmt.Errorf("got http status %d, while expected 200. 
Error: %s", resp.StatusCode, resultBody) } return nil } func (m *PingdomUptimeProvider) execRequest(ctx context.Context, req *http.Request) (*http.Response, error) { req.Header.Add(headerAuthorization, "Bearer "+m.settings.APIToken) resp, err := m.httpClient.Do(req) if err != nil { return resp, err } rateLimitErr := errors.Join( handleRateLimits(ctx, resp.Header.Get(headerReqLimitShort)), handleRateLimits(ctx, resp.Header.Get(headerReqLimitLong)), ) return resp, rateLimitErr } func handleRateLimits(ctx context.Context, rateLimitHeader string) error { remaining, resetTime, err := parseRateLimitHeader(rateLimitHeader) if err != nil { return err } if remaining < 25 { log.FromContext(ctx).Info( fmt.Sprintf("Waiting for %d seconds to avoid hitting Pingdom rate limit", resetTime+1), rateLimitHeader, remaining) time.Sleep(time.Duration(remaining+1) * time.Second) } return nil } func parseRateLimitHeader(header string) (remaining int, resetTime int, err error) { _, err = fmt.Sscanf(header, "Remaining: %d Time until reset: %d", &remaining, &resetTime) return } func getPort(checkURL *url.URL) (int, error) { port := checkURL.Port() if port == "" { port = "443" } return strconv.Atoi(port) }
package service import ( "context" "fmt" classiclog "log" m "github.com/PDOK/uptime-operator/internal/model" "github.com/PDOK/uptime-operator/internal/service/providers" "sigs.k8s.io/controller-runtime/pkg/log" ) type UptimeCheckOption func(*UptimeCheckService) *UptimeCheckService type UptimeCheckService struct { provider UptimeProvider slack *Slack enableDeletes bool } func New(options ...UptimeCheckOption) *UptimeCheckService { service := &UptimeCheckService{} for _, option := range options { service = option(service) } return service } func WithProvider(provider UptimeProvider) UptimeCheckOption { return func(service *UptimeCheckService) *UptimeCheckService { service.provider = provider return service } } func WithProviderAndSettings(provider string, settings any) UptimeCheckOption { return func(service *UptimeCheckService) *UptimeCheckService { switch provider { case "mock": service.provider = providers.NewMockUptimeProvider() case "pingdom": service.provider = providers.NewPingdomUptimeProvider(settings.(providers.PingdomSettings)) default: classiclog.Fatalf("unsupported provider specified: %s", provider) } return service } } func WithSlack(slackWebhookURL string, slackChannel string) UptimeCheckOption { return func(service *UptimeCheckService) *UptimeCheckService { if slackWebhookURL != "" && slackChannel != "" { service.slack = NewSlack(slackWebhookURL, slackChannel) } return service } } func WithDeletes(enableDeletes bool) UptimeCheckOption { return func(service *UptimeCheckService) *UptimeCheckService { service.enableDeletes = enableDeletes return service } } func (r *UptimeCheckService) Mutate(ctx context.Context, mutation m.Mutation, ingressName string, annotations map[string]string) { _, ignore := annotations[m.AnnotationIgnore] if ignore { r.logRouteIgnore(ctx, mutation, ingressName) return } check, err := m.NewUptimeCheck(ingressName, annotations) if err != nil { r.logAnnotationErr(ctx, err) return } if mutation == m.CreateOrUpdate { err = 
r.provider.CreateOrUpdateCheck(ctx, *check) r.logMutation(ctx, err, mutation, check) } else if mutation == m.Delete { if !r.enableDeletes { r.logDeleteDisabled(ctx, check) return } err = r.provider.DeleteCheck(ctx, *check) r.logMutation(ctx, err, mutation, check) } } func (r *UptimeCheckService) logDeleteDisabled(ctx context.Context, check *m.UptimeCheck) { msg := fmt.Sprintf("delete of uptime check '%s' (id: %s) not executed since 'enable-deletes=false'.", check.Name, check.ID) log.FromContext(ctx).Info(msg, "check", check) if r.slack == nil { return } r.slack.Send(ctx, ":information_source: "+msg) } func (r *UptimeCheckService) logRouteIgnore(ctx context.Context, mutation m.Mutation, name string) { msg := fmt.Sprintf("ignoring %s for ingress route %s, since this route is marked to be excluded from uptime monitoring", mutation, name) log.FromContext(ctx).Info(msg) if r.slack == nil { return } r.slack.Send(ctx, ":information_source: "+msg) } func (r *UptimeCheckService) logAnnotationErr(ctx context.Context, err error) { msg := fmt.Sprintf("missing or invalid uptime check annotation(s) encountered: %v", err) log.FromContext(ctx).Error(err, msg) if r.slack == nil { return } r.slack.Send(ctx, ":large_red_square: "+msg) } func (r *UptimeCheckService) logMutation(ctx context.Context, err error, mutation m.Mutation, check *m.UptimeCheck) { if err != nil { msg := fmt.Sprintf("%s of uptime check '%s' (id: %s) failed.", string(mutation), check.Name, check.ID) log.FromContext(ctx).Error(err, msg, "check", check) if r.slack == nil { return } r.slack.Send(ctx, ":large_red_square: "+msg) return } msg := fmt.Sprintf("%s of uptime check '%s' (id: %s) succeeded.", string(mutation), check.Name, check.ID) log.FromContext(ctx).Info(msg) if r.slack == nil { return } if mutation == m.Delete { r.slack.Send(ctx, ":warning: "+msg+".\n _Beware: a flood of these delete messages may indicate Traefik itself is down!_") } else { r.slack.Send(ctx, ":large_green_square: "+msg) } }
package service import ( "context" "github.com/PDOK/uptime-operator/internal/model" "github.com/slack-go/slack" "golang.org/x/time/rate" "sigs.k8s.io/controller-runtime/pkg/log" ) const ( nrOfMessagesPerSec = 1 nrOfMessagesPerBurst = 10 ) type Slack struct { webhookURL string channelID string rateLimit *rate.Limiter } func NewSlack(webhookURL, channelID string) *Slack { return &Slack{ webhookURL: webhookURL, channelID: channelID, // see https://api.slack.com/apis/rate-limits rateLimit: rate.NewLimiter(nrOfMessagesPerSec, nrOfMessagesPerBurst), } } func (s *Slack) Send(ctx context.Context, message string) { err := s.rateLimit.Wait(ctx) if err != nil { log.FromContext(ctx).Error(err, "failed waiting for slack rate limit") } err = slack.PostWebhook(s.webhookURL, &slack.WebhookMessage{ Channel: s.channelID, Text: message, Username: model.OperatorName, IconEmoji: ":up:", }) if err != nil { log.FromContext(ctx).Error(err, "failed to post Slack message", "message", message, "channel", s.channelID) } }
// StringsToInts converts every string in ss to an int (base 10). The result has the same
// length and order as ss. On the first string that isn't a valid int, the conversion
// stops and a nil slice plus the strconv error are returned.
func StringsToInts(ss []string) ([]int, error) {
	ints := make([]int, len(ss))
	for i := range ss {
		value, err := strconv.Atoi(ss[i])
		if err != nil {
			return nil, err
		}
		ints[i] = value
	}
	return ints, nil
}
// SliceFlag is a list of strings that grows with every call to Set. Together with
// String this matches the method set of flag.Value, so a flag can be repeated.
type SliceFlag []string

// String returns the collected values joined by commas. It is safe to call on a nil
// receiver (the flag package may call String on a zero-valued receiver), in which
// case the empty string is returned.
func (sf *SliceFlag) String() string {
	if sf == nil {
		return ""
	}
	return strings.Join(*sf, ",")
}

// Set appends the given value to the list. It never fails.
func (sf *SliceFlag) Set(value string) error {
	*sf = append(*sf, value)
	return nil
}