/*
MIT License

Copyright (c) 2024 Publieke Dienstverlening op de Kaart

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/

package controller

import (
	"context"

	m "github.com/PDOK/uptime-operator/internal/model"
	"github.com/PDOK/uptime-operator/internal/service"
	traefikio "github.com/traefik/traefik/v3/pkg/provider/kubernetes/crd/traefikio/v1alpha1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/builder"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/log"
	"sigs.k8s.io/controller-runtime/pkg/predicate"
)

// IngressRouteReconciler reconciles Traefik IngressRoutes with an uptime monitoring (SaaS) provider
type IngressRouteReconciler struct {
	client.Client
	Scheme             *runtime.Scheme
	UptimeCheckService *service.UptimeCheckService
}

//+kubebuilder:rbac:groups=traefik.io,resources=ingressroutes,verbs=get;list;watch
//+kubebuilder:rbac:groups=traefik.io,resources=ingressroutes/finalizers,verbs=update

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.17.0/pkg/reconcile
func (r *IngressRouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	ingressRoute, err := r.getIngressRoute(ctx, req)
	if err != nil {
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}
	shouldContinue, err := finalizeIfNecessary(ctx, r.Client, ingressRoute, m.AnnotationFinalizer, func() error {
		r.UptimeCheckService.Mutate(ctx, m.Delete, ingressRoute.GetName(), ingressRoute.GetAnnotations())
		return nil
	})
	if !shouldContinue || err != nil {
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}
	r.UptimeCheckService.Mutate(ctx, m.CreateOrUpdate, ingressRoute.GetName(), ingressRoute.GetAnnotations())
	return ctrl.Result{}, nil
}

func (r *IngressRouteReconciler) getIngressRoute(ctx context.Context, req ctrl.Request) (client.Object, error) {
	// get the "traefik.io/v1alpha1" IngressRoute
	ingressIo := &traefikio.IngressRoute{}
	if err := r.Get(ctx, req.NamespacedName, ingressIo); err != nil {
		logger := log.FromContext(ctx)
		if apierrors.IsNotFound(err) {
			logger.Info("IngressRoute resource not found", "name", req.NamespacedName)
		} else {
			logger.Error(err, "unable to fetch IngressRoute resource", "error", err)
		}
		return nil, err
	}
	return ingressIo, nil
}

func finalizeIfNecessary(ctx context.Context, c client.Client, obj client.Object, finalizerName string, finalizer func() error) (shouldContinue bool, err error) {
	// not under deletion, ensure our finalizer is present
	if obj.GetDeletionTimestamp().IsZero() {
		if !controllerutil.ContainsFinalizer(obj, finalizerName) {
			controllerutil.AddFinalizer(obj, finalizerName)
			err = c.Update(ctx, obj)
			return true, err
		}
		return true, nil
	}
	// under deletion but without our finalizer, do nothing
	if !controllerutil.ContainsFinalizer(obj, finalizerName) {
		return false, nil
	}
	// run the finalizer logic and remove the finalizer
	if err = finalizer(); err != nil {
		return false, err
	}
	controllerutil.RemoveFinalizer(obj, finalizerName)
	err = c.Update(ctx, obj)
	return false, err
}

// SetupWithManager sets up the controller with the Manager.
func (r *IngressRouteReconciler) SetupWithManager(mgr ctrl.Manager) error {
	preCondition := predicate.Or(predicate.GenerationChangedPredicate{}, predicate.AnnotationChangedPredicate{})
	return ctrl.NewControllerManagedBy(mgr).
		Named(m.OperatorName).
		Watches(
			&traefikio.IngressRoute{}, // watch "traefik.io/v1alpha1" IngressRoutes
			&handler.EnqueueRequestForObject{},
			builder.WithPredicates(preCondition)).
		Complete(r)
}
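// Illustrative sketch (not part of the original package): registers the reconciler with a
// controller-runtime manager, roughly as a main.go would. The package paths, the scheme setup
// (assuming the Traefik v1alpha1 package exposes AddToScheme) and the bare service.New() call
// are assumptions; only IngressRouteReconciler and its fields come from the code above.
package main

import (
	"os"

	"github.com/PDOK/uptime-operator/internal/controller"
	"github.com/PDOK/uptime-operator/internal/service"
	traefikio "github.com/traefik/traefik/v3/pkg/provider/kubernetes/crd/traefikio/v1alpha1"
	"k8s.io/apimachinery/pkg/runtime"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	ctrl "sigs.k8s.io/controller-runtime"
)

func main() {
	scheme := runtime.NewScheme()
	utilruntime.Must(traefikio.AddToScheme(scheme)) // the manager must know the Traefik CRD types

	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{Scheme: scheme})
	if err != nil {
		os.Exit(1)
	}
	reconciler := &controller.IngressRouteReconciler{
		Client:             mgr.GetClient(),
		Scheme:             mgr.GetScheme(),
		UptimeCheckService: service.New(), // provider/Slack options omitted in this sketch
	}
	if err := reconciler.SetupWithManager(mgr); err != nil {
		os.Exit(1)
	}
	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
		os.Exit(1)
	}
}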
package model

import (
	"fmt"
	"slices"
	"strconv"
	"strings"
)

const (
	OperatorName = "uptime-operator"

	// TagManagedBy indicates to humans that the given check is managed by the operator.
	TagManagedBy = "managed-by-" + OperatorName

	AnnotationBase              = "uptime.pdok.nl"
	AnnotationID                = AnnotationBase + "/id"
	AnnotationName              = AnnotationBase + "/name"
	AnnotationURL               = AnnotationBase + "/url"
	AnnotationTags              = AnnotationBase + "/tags"
	AnnotationInterval          = AnnotationBase + "/interval-in-minutes"
	AnnotationRequestHeaders    = AnnotationBase + "/request-headers"
	AnnotationStringContains    = AnnotationBase + "/response-check-for-string-contains"
	AnnotationStringNotContains = AnnotationBase + "/response-check-for-string-not-contains"
	AnnotationFinalizer         = AnnotationBase + "/finalizer"
	AnnotationIgnore            = AnnotationBase + "/ignore"
)

type UptimeCheck struct {
	ID                string            `json:"id"`
	Name              string            `json:"name"`
	URL               string            `json:"url"`
	Tags              []string          `json:"tags"`
	Interval          int               `json:"resolution"`
	RequestHeaders    map[string]string `json:"request_headers"`
	StringContains    string            `json:"string_contains"`
	StringNotContains string            `json:"string_not_contains"`
}

func NewUptimeCheck(ingressName string, annotations map[string]string) (*UptimeCheck, error) {
	id, ok := annotations[AnnotationID]
	if !ok {
		return nil, fmt.Errorf("%s annotation not found on ingress route: %s", AnnotationID, ingressName)
	}
	name, ok := annotations[AnnotationName]
	if !ok {
		return nil, fmt.Errorf("%s annotation not found on ingress route: %s", AnnotationName, ingressName)
	}
	url, ok := annotations[AnnotationURL]
	if !ok {
		return nil, fmt.Errorf("%s annotation not found on ingress route: %s", AnnotationURL, ingressName)
	}
	interval, err := getInterval(annotations)
	if err != nil {
		return nil, err
	}
	check := &UptimeCheck{
		ID:                id,
		Name:              name,
		URL:               url,
		Tags:              stringToSlice(annotations[AnnotationTags]),
		Interval:          interval,
		RequestHeaders:    kvStringToMap(annotations[AnnotationRequestHeaders]),
		StringContains:    annotations[AnnotationStringContains],
		StringNotContains: annotations[AnnotationStringNotContains],
	}
	if !slices.Contains(check.Tags, TagManagedBy) {
		check.Tags = append(check.Tags, TagManagedBy)
	}
	return check, nil
}

func getInterval(annotations map[string]string) (int, error) {
	if _, ok := annotations[AnnotationInterval]; ok {
		interval, err := strconv.Atoi(annotations[AnnotationInterval])
		if err != nil {
			return 1, fmt.Errorf("%s annotation should contain integer value: %w", AnnotationInterval, err)
		}
		return interval, nil
	}
	return 1, nil
}

func kvStringToMap(s string) map[string]string {
	if s == "" {
		return nil
	}
	result := make(map[string]string)
	kvPairs := strings.Split(s, ",")
	for _, kvPair := range kvPairs {
		parts := strings.Split(kvPair, ":")
		if len(parts) != 2 {
			continue
		}
		key := strings.TrimSpace(parts[0])
		value := strings.TrimSpace(parts[1])
		result[key] = value
	}
	return result
}

func stringToSlice(s string) []string {
	if s == "" {
		return nil
	}
	var result []string
	splits := strings.Split(s, ",")
	for _, part := range splits {
		result = append(result, strings.TrimSpace(part))
	}
	return result
}
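// Illustrative sketch (not part of the original package): an example-test showing how
// ingress route annotations map to an UptimeCheck. The annotation values and the ingress
// route name below are made up; only the annotation keys come from the constants above.
package model

import "fmt"

func ExampleNewUptimeCheck() {
	annotations := map[string]string{
		AnnotationID:             "my-service-check",
		AnnotationName:           "My Service",
		AnnotationURL:            "https://example.com/healthz",
		AnnotationTags:           "team-a, production",
		AnnotationInterval:       "5",
		AnnotationRequestHeaders: "Accept: application/json",
	}
	check, err := NewUptimeCheck("my-ingressroute", annotations)
	if err != nil {
		panic(err)
	}
	// the managed-by tag is appended automatically
	fmt.Println(check.Name, check.Interval, check.Tags)
	// Output: My Service 5 [team-a production managed-by-uptime-operator]
}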
package betterstack

import (
	"context"
	"fmt"
	classiclog "log"
	"net/http"
	"strconv"
	"time"

	"github.com/PDOK/uptime-operator/internal/model"
	p "github.com/PDOK/uptime-operator/internal/service/providers"
	"sigs.k8s.io/controller-runtime/pkg/log"
)

const betterStackBaseURL = "https://uptime.betterstack.com"

type Settings struct {
	APIToken string
	PageSize int
}

type BetterStack struct {
	client Client
}

// New creates a BetterStack
func New(settings Settings) *BetterStack {
	if settings.APIToken == "" {
		classiclog.Fatal("Better Stack API token is not provided")
	}
	if settings.PageSize < 1 {
		settings.PageSize = 50 // default, see https://betterstack.com/docs/uptime/api/pagination/
	}
	return &BetterStack{
		Client{
			httpClient: &http.Client{Timeout: time.Duration(5) * time.Minute},
			settings:   settings,
		},
	}
}

// CreateOrUpdateCheck creates the given check with Better Stack, or updates an existing check. Needs to be idempotent!
func (b *BetterStack) CreateOrUpdateCheck(ctx context.Context, check model.UptimeCheck) (err error) {
	existingCheckID, err := b.findCheck(check)
	if err != nil {
		return fmt.Errorf("failed to find check %s, error: %w", check.ID, err)
	}
	if existingCheckID == p.CheckNotFound { //nolint:nestif // clean enough
		log.FromContext(ctx).Info("creating check", "check", check)
		monitorID, err := b.client.createMonitor(check)
		if err != nil {
			return fmt.Errorf("failed to create monitor for check %s, error: %w", check.ID, err)
		}
		if err = b.client.createMetadata(check.ID, monitorID, check.Tags); err != nil {
			return fmt.Errorf("failed to create metadata for check %s, error: %w", check.ID, err)
		}
	} else {
		log.FromContext(ctx).Info("updating check", "check", check, "betterstack ID", existingCheckID)
		existingMonitor, err := b.client.getMonitor(existingCheckID)
		if err != nil {
			return fmt.Errorf("failed to get monitor for check %s, error: %w", check.ID, err)
		}
		if err = b.client.updateMonitor(check, existingMonitor); err != nil {
			return fmt.Errorf("failed to update monitor for check %s (betterstack ID: %d), "+
				"error: %w", check.ID, existingCheckID, err)
		}
		if err = b.client.updateMetadata(check.ID, existingCheckID, check.Tags); err != nil {
			return fmt.Errorf("failed to update metadata for check %s (betterstack ID: %d), "+
				"error: %w", check.ID, existingCheckID, err)
		}
	}
	return err
}

// DeleteCheck deletes the given check from Better Stack
func (b *BetterStack) DeleteCheck(ctx context.Context, check model.UptimeCheck) error {
	log.FromContext(ctx).Info("deleting check", "check", check)
	existingCheckID, err := b.findCheck(check)
	if err != nil {
		return fmt.Errorf("failed to find check %s, error: %w", check.ID, err)
	}
	if existingCheckID == p.CheckNotFound {
		log.FromContext(ctx).Info(fmt.Sprintf("check with ID '%s' is already deleted", check.ID))
		return nil
	}
	if err = b.client.deleteMetadata(check.ID, existingCheckID); err != nil {
		return fmt.Errorf("failed to delete metadata for check %s (betterstack ID: %d), "+
			"error: %w", check.ID, existingCheckID, err)
	}
	if err = b.client.deleteMonitor(existingCheckID); err != nil {
		return fmt.Errorf("failed to delete monitor for check %s (betterstack ID: %d), "+
			"error: %w", check.ID, existingCheckID, err)
	}
	return nil
}

func (b *BetterStack) findCheck(check model.UptimeCheck) (int64, error) {
	result := p.CheckNotFound
	metadata, err := b.client.listMetadata()
	if err != nil {
		return result, err
	}
	for {
		for _, md := range metadata.Data {
			if md.Attributes != nil && md.Attributes.Key == check.ID {
				result, err = strconv.ParseInt(md.Attributes.OwnerID, 10, 64)
				if err != nil {
					return result, fmt.Errorf("failed to parse monitor ID %s to integer", md.Attributes.OwnerID)
				}
				return result, nil
			}
		}
		if !metadata.HasNext() {
			break // no more pages
		}
		metadata, err = metadata.Next(b.client)
		if err != nil {
			return result, err
		}
	}
	return result, nil
}
package betterstack

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"strconv"

	"github.com/PDOK/uptime-operator/internal/model"
	p "github.com/PDOK/uptime-operator/internal/service/providers"
)

const typeMonitor = "Monitor"

type Client struct {
	httpClient *http.Client
	settings   Settings
}

func (h Client) execRequest(req *http.Request, expectedStatus int) (*http.Response, error) {
	req.Header.Set(p.HeaderAuthorization, "Bearer "+h.settings.APIToken)
	req.Header.Set(p.HeaderAccept, p.MediaTypeJSON)
	req.Header.Set(p.HeaderContentType, p.MediaTypeJSON)
	req.Header.Add(p.HeaderUserAgent, model.OperatorName)
	resp, err := h.httpClient.Do(req)
	if err != nil {
		return nil, err
	}
	if resp.StatusCode != expectedStatus {
		defer resp.Body.Close()
		result, _ := io.ReadAll(resp.Body)
		return nil, fmt.Errorf("got status %d, expected %d. Body: %s", resp.StatusCode, expectedStatus, result)
	}
	return resp, nil // caller should close resp.Body!
}

func (h Client) execRequestIgnoreResponseBody(req *http.Request, expectedStatus int) error {
	resp, err := h.execRequest(req, expectedStatus)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	return nil
}

type MetadataListResponse struct {
	Data []struct {
		ID         string `json:"id"`
		Type       string `json:"type"`
		Attributes *struct {
			Key    string `json:"key"`
			Values []struct {
				Type  string `json:"type"`
				Value string `json:"value"`
			} `json:"values"`
			TeamName  string `json:"team_name"`
			OwnerID   string `json:"owner_id"`
			OwnerType string `json:"owner_type"`
		} `json:"attributes"`
	} `json:"data"`
	Pagination *struct {
		First string `json:"first"`
		Last  string `json:"last"`
		Prev  string `json:"prev"`
		Next  string `json:"next"`
	} `json:"pagination"`
}

// listMetadata https://betterstack.com/docs/uptime/api/list-all-existing-metadata/
func (h Client) listMetadata() (*MetadataListResponse, error) {
	url := fmt.Sprintf("%s/api/v3/metadata?owner_type=Monitor&per_page=%d", betterStackBaseURL, h.settings.PageSize)
	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		return nil, err
	}
	// Make initial HTTP request
	resp, err := h.execRequest(req, http.StatusOK)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	// Parse response
	var metadata MetadataListResponse
	err = json.NewDecoder(resp.Body).Decode(&metadata)
	if err != nil {
		return nil, err
	}
	return &metadata, nil
}

func (m MetadataListResponse) HasNext() bool {
	return m.Pagination != nil && m.Pagination.Next != ""
}

// Next paginates through metadata, see https://betterstack.com/docs/uptime/api/pagination/
func (m MetadataListResponse) Next(client Client) (*MetadataListResponse, error) {
	if !m.HasNext() {
		return nil, nil
	}
	// Make HTTP request to the next URL
	req, err := http.NewRequest(http.MethodGet, m.Pagination.Next, nil)
	if err != nil {
		return nil, err
	}
	resp, err := client.execRequest(req, http.StatusOK)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	// Parse response
	var nextPage MetadataListResponse
	err = json.NewDecoder(resp.Body).Decode(&nextPage)
	if err != nil {
		return nil, err
	}
	return &nextPage, nil
}

type MetadataValue struct {
	Value string `json:"value"`
}

type MetadataUpdateRequest struct {
	Key       string          `json:"key"`
	Values    []MetadataValue `json:"values"`
	OwnerID   string          `json:"owner_id"`
	OwnerType string          `json:"owner_type"`
}

// createMetadata https://betterstack.com/docs/uptime/api/update-an-existing-metadata-record/
func (h Client) createMetadata(key string, monitorID int64, tags []string) error {
	metadataUpdateRequest := MetadataUpdateRequest{
		Key:       key,
		OwnerID:   strconv.FormatInt(monitorID, 10),
		OwnerType: typeMonitor,
	}
	for _, tag := range tags {
		metadataUpdateRequest.Values = append(metadataUpdateRequest.Values, MetadataValue{tag})
	}
	body := &bytes.Buffer{}
	err := json.NewEncoder(body).Encode(&metadataUpdateRequest)
	if err != nil {
		return err
	}
	req, err := http.NewRequest(http.MethodPost, betterStackBaseURL+"/api/v3/metadata", body)
	if err != nil {
		return err
	}
	if err := h.execRequestIgnoreResponseBody(req, http.StatusCreated); err != nil {
		return err
	}
	return nil
}

// updateMetadata https://betterstack.com/docs/uptime/api/update-an-existing-metadata-record/
func (h Client) updateMetadata(key string, monitorID int64, tags []string) error {
	metadataUpdateRequest := MetadataUpdateRequest{
		Key:       key,
		OwnerID:   strconv.FormatInt(monitorID, 10),
		OwnerType: typeMonitor,
	}
	for _, tag := range tags {
		metadataUpdateRequest.Values = append(metadataUpdateRequest.Values, MetadataValue{tag})
	}
	body := &bytes.Buffer{}
	err := json.NewEncoder(body).Encode(&metadataUpdateRequest)
	if err != nil {
		return err
	}
	req, err := http.NewRequest(http.MethodPost, betterStackBaseURL+"/api/v3/metadata", body)
	if err != nil {
		return err
	}
	if err = h.execRequestIgnoreResponseBody(req, http.StatusOK); err != nil {
		return err
	}
	return nil
}

// deleteMetadata https://betterstack.com/docs/uptime/api/update-an-existing-metadata-record/
func (h Client) deleteMetadata(key string, monitorID int64) error {
	metadataDeleteRequest := MetadataUpdateRequest{
		Key:       key,
		OwnerID:   strconv.FormatInt(monitorID, 10),
		OwnerType: typeMonitor,
		Values:    []MetadataValue{}, // empty values will result in deletion of the metadata record
	}
	body := &bytes.Buffer{}
	err := json.NewEncoder(body).Encode(&metadataDeleteRequest)
	if err != nil {
		return err
	}
	req, err := http.NewRequest(http.MethodPost, betterStackBaseURL+"/api/v3/metadata", body)
	if err != nil {
		return err
	}
	if err = h.execRequestIgnoreResponseBody(req, http.StatusNoContent); err != nil {
		return err
	}
	return nil
}

type MonitorRequestHeader struct {
	ID      string `json:"id,omitempty"`
	Name    string `json:"name"`
	Value   string `json:"value"`
	Destroy bool   `json:"_destroy"` //nolint:tagliatelle
}

type MonitorCreateOrUpdateRequest struct {
	MonitorType       string                 `json:"monitor_type"`
	URL               string                 `json:"url"`
	PronounceableName string                 `json:"pronounceable_name"`
	Port              int                    `json:"port"`
	Email             bool                   `json:"email"`
	Sms               bool                   `json:"sms"`
	Call              bool                   `json:"call"`
	RequiredKeyword   string                 `json:"required_keyword"`
	CheckFrequency    int                    `json:"check_frequency"`
	RequestHeaders    []MonitorRequestHeader `json:"request_headers"`
}

type MonitorCreateResponse struct {
	Data struct {
		ID   string `json:"id"`
		Type string `json:"type"`
	} `json:"data"`
}

// createMonitor https://betterstack.com/docs/uptime/api/create-a-new-monitor/
func (h Client) createMonitor(check model.UptimeCheck) (int64, error) {
	createRequest := checkToMonitor(check)
	body := &bytes.Buffer{}
	err := json.NewEncoder(body).Encode(createRequest)
	if err != nil {
		return -1, err
	}
	req, err := http.NewRequest(http.MethodPost, betterStackBaseURL+"/api/v2/monitors", body)
	if err != nil {
		return -1, err
	}
	resp, err := h.execRequest(req, http.StatusCreated)
	if err != nil {
		return -1, err
	}
	defer resp.Body.Close()
	var createResponse *MonitorCreateResponse
	err = json.NewDecoder(resp.Body).Decode(&createResponse)
	if err != nil {
		return -1, err
	}
	monitorID, err := strconv.ParseInt(createResponse.Data.ID, 10, 64)
	if err != nil {
		return -1, err
	}
	return monitorID, nil
}

// updateMonitor https://betterstack.com/docs/uptime/api/update-an-existing-monitor/
func (h Client) updateMonitor(check model.UptimeCheck, existingMonitor *MonitorGetResponse) error {
	updateRequest := checkToMonitor(check)
	if existingMonitor == nil || existingMonitor.Data == nil || existingMonitor.Data.Attributes == nil {
		return fmt.Errorf("invalid monitor response, expected values are nil: %v", existingMonitor)
	}
	// Remove all existing headers (since the API works with HTTP PATCH), to avoid duplicate headers.
	for _, existingHeader := range existingMonitor.Data.Attributes.RequestHeaders {
		updateRequest.RequestHeaders = append(updateRequest.RequestHeaders, MonitorRequestHeader{
			ID:      existingHeader.ID,
			Destroy: true,
		})
	}
	body := &bytes.Buffer{}
	err := json.NewEncoder(body).Encode(&updateRequest)
	if err != nil {
		return err
	}
	req, err := http.NewRequest(http.MethodPatch,
		fmt.Sprintf("%s/api/v2/monitors/%s", betterStackBaseURL, existingMonitor.Data.ID), body)
	if err != nil {
		return err
	}
	if err = h.execRequestIgnoreResponseBody(req, http.StatusOK); err != nil {
		return err
	}
	return nil
}

// deleteMonitor https://betterstack.com/docs/uptime/api/delete-an-existing-monitor/
func (h Client) deleteMonitor(monitorID int64) error {
	req, err := http.NewRequest(http.MethodDelete,
		fmt.Sprintf("%s/api/v2/monitors/%d", betterStackBaseURL, monitorID), nil)
	if err != nil {
		return err
	}
	if err = h.execRequestIgnoreResponseBody(req, http.StatusNoContent); err != nil {
		return err
	}
	return nil
}

type MonitorGetResponse struct {
	Data *struct {
		ID         string `json:"id"`
		Type       string `json:"type"`
		Attributes *struct {
			URL               string                 `json:"url"`
			PronounceableName string                 `json:"pronounceable_name"`
			MonitorType       string                 `json:"monitor_type"`
			RequiredKeyword   string                 `json:"required_keyword"`
			CheckFrequency    int                    `json:"check_frequency"`
			RequestHeaders    []MonitorRequestHeader `json:"request_headers"`
		} `json:"attributes"`
	} `json:"data"`
}

func (h Client) getMonitor(monitorID int64) (*MonitorGetResponse, error) {
	req, err := http.NewRequest(http.MethodGet,
		fmt.Sprintf("%s/api/v2/monitors/%d", betterStackBaseURL, monitorID), nil)
	if err != nil {
		return nil, err
	}
	resp, err := h.execRequest(req, http.StatusOK)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	var existingMonitor *MonitorGetResponse
	err = json.NewDecoder(resp.Body).Decode(&existingMonitor)
	if err != nil {
		return nil, err
	}
	return existingMonitor, nil
}

func checkToMonitor(check model.UptimeCheck) MonitorCreateOrUpdateRequest {
	var request MonitorCreateOrUpdateRequest
	switch {
	case check.StringContains != "":
		request = MonitorCreateOrUpdateRequest{
			MonitorType:     "keyword",
			RequiredKeyword: check.StringContains,
		}
	case check.StringNotContains != "":
		request = MonitorCreateOrUpdateRequest{
			MonitorType:     "keyword_absence",
			RequiredKeyword: check.StringNotContains,
		}
	default:
		request = MonitorCreateOrUpdateRequest{
			MonitorType: "status",
		}
	}
	request.URL = check.URL
	request.PronounceableName = check.Name
	request.Port = 443
	request.CheckFrequency = toSupportedInterval(check.Interval)
	request.Email = false
	request.Sms = false
	request.Call = false
	for name, value := range check.RequestHeaders {
		request.RequestHeaders = append(request.RequestHeaders, MonitorRequestHeader{
			Name:  name,
			Value: value,
		})
	}
	return request
}
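// Illustrative sketch (not part of the original package): a test-style example of the
// checkToMonitor mapping above. A check with StringContains becomes a "keyword" monitor,
// and its interval in minutes is converted to a supported Better Stack frequency in
// seconds. All concrete values in the check are made up.
package betterstack

import (
	"testing"

	"github.com/PDOK/uptime-operator/internal/model"
)

func TestCheckToMonitor(t *testing.T) {
	check := model.UptimeCheck{
		ID:             "example-check",
		Name:           "Example",
		URL:            "https://example.com/healthz",
		Interval:       5,
		StringContains: "OK",
	}
	monitor := checkToMonitor(check)
	if monitor.MonitorType != "keyword" || monitor.RequiredKeyword != "OK" {
		t.Errorf("unexpected monitor type/keyword: %+v", monitor)
	}
	if monitor.CheckFrequency != 300 { // 5 minutes is 300 seconds, a supported interval
		t.Errorf("expected check frequency 300, got %d", monitor.CheckFrequency)
	}
}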
package betterstack import "math" func toSupportedInterval(intervalInMin int) int { // Better Stack only accepts a specific sets of intervals supportedIntervals := []int{30, 45, 60, 120, 180, 300, 600, 900, 1800} intervalInSec := intervalInMin * 60 if intervalInSec <= 0 { return supportedIntervals[0] // use the smallest supported interval } if intervalInSec > supportedIntervals[len(supportedIntervals)-1] { return supportedIntervals[len(supportedIntervals)-1] // use the largest supported interval } nearestInterval := supportedIntervals[0] prevDiff := math.MaxInt // use nearest supported interval for _, si := range supportedIntervals { diff := int(math.Abs(float64(intervalInSec - si))) if diff < prevDiff { prevDiff = diff nearestInterval = si } } return nearestInterval }
package mock

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/PDOK/uptime-operator/internal/model"
	"sigs.k8s.io/controller-runtime/pkg/log"
)

type Mock struct {
	checks map[string]model.UptimeCheck
}

func New() *Mock {
	return &Mock{
		checks: make(map[string]model.UptimeCheck),
	}
}

func (m *Mock) CreateOrUpdateCheck(ctx context.Context, check model.UptimeCheck) error {
	m.checks[check.ID] = check
	checkJSON, _ := json.Marshal(check)
	log.FromContext(ctx).Info(fmt.Sprintf("MOCK: created or updated check %s\n", checkJSON))
	return nil
}

func (m *Mock) DeleteCheck(ctx context.Context, check model.UptimeCheck) error {
	delete(m.checks, check.ID)
	checkJSON, _ := json.Marshal(check)
	log.FromContext(ctx).Info(fmt.Sprintf("MOCK: deleted check %s\n", checkJSON))
	return nil
}
package pingdom

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	classiclog "log"
	"net/http"
	"net/url"
	"sort"
	"strconv"
	"strings"
	"time"

	"github.com/PDOK/uptime-operator/internal/model"
	"github.com/PDOK/uptime-operator/internal/service/providers"
	"sigs.k8s.io/controller-runtime/pkg/log"
)

const pingdomURL = "https://api.pingdom.com/api/3.1/checks"
const customIDPrefix = "id:"
const headerReqLimitShort = "Req-Limit-Short"
const headerReqLimitLong = "Req-Limit-Long"

type Settings struct {
	APIToken       string
	UserIDs        []int
	IntegrationIDs []int
}

type Pingdom struct {
	settings   Settings
	httpClient *http.Client
}

// New creates a Pingdom
func New(settings Settings) *Pingdom {
	if settings.APIToken == "" {
		classiclog.Fatal("Pingdom API token is not provided")
	}
	return &Pingdom{
		settings:   settings,
		httpClient: &http.Client{Timeout: time.Duration(5) * time.Minute},
	}
}

// CreateOrUpdateCheck creates the given check with Pingdom, or updates an existing check. Needs to be idempotent!
func (p *Pingdom) CreateOrUpdateCheck(ctx context.Context, check model.UptimeCheck) (err error) {
	existingCheckID, err := p.findCheck(ctx, check)
	if err != nil {
		return err
	}
	if existingCheckID == providers.CheckNotFound {
		err = p.createCheck(ctx, check)
	} else {
		err = p.updateCheck(ctx, existingCheckID, check)
	}
	return err
}

// DeleteCheck deletes the given check from Pingdom
func (p *Pingdom) DeleteCheck(ctx context.Context, check model.UptimeCheck) error {
	log.FromContext(ctx).Info("deleting check", "check", check)
	existingCheckID, err := p.findCheck(ctx, check)
	if err != nil {
		return err
	}
	if existingCheckID == providers.CheckNotFound {
		log.FromContext(ctx).Info(fmt.Sprintf("check with ID '%s' is already deleted", check.ID))
		return nil
	}
	req, err := http.NewRequest(http.MethodDelete, fmt.Sprintf("%s/%d", pingdomURL, existingCheckID), nil)
	if err != nil {
		return err
	}
	resp, err := p.execRequest(ctx, req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		resultBody, _ := io.ReadAll(resp.Body)
		return fmt.Errorf("got status %d, expected HTTP OK when deleting existing check. Error: %s",
			resp.StatusCode, resultBody)
	}
	return nil
}

func (p *Pingdom) findCheck(ctx context.Context, check model.UptimeCheck) (int64, error) {
	result := providers.CheckNotFound
	// list all checks managed by uptime-operator. Can be at most 25.000, which is probably sufficient.
	req, err := http.NewRequest(http.MethodGet,
		fmt.Sprintf("%s?include_tags=true&limit=25000&tags=%s", pingdomURL, model.TagManagedBy), nil)
	if err != nil {
		return result, err
	}
	req.Header.Add(providers.HeaderAccept, providers.MediaTypeJSON)
	resp, err := p.execRequest(ctx, req)
	if err != nil {
		return result, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return result, fmt.Errorf("got status %d, expected HTTP OK when listing existing checks", resp.StatusCode)
	}
	checksResponse := make(map[string]any)
	err = json.NewDecoder(resp.Body).Decode(&checksResponse)
	if err != nil {
		return result, err
	}
	pingdomChecks := checksResponse["checks"].([]any)
	for _, rawCheck := range pingdomChecks {
		pingdomCheck := rawCheck.(map[string]any)
		tags := pingdomCheck["tags"]
		if tags == nil {
			continue
		}
		for _, rawTag := range tags.([]any) {
			tag := rawTag.(map[string]any)
			if tag == nil {
				continue
			}
			tagName := tag["name"].(string)
			if strings.HasSuffix(tagName, check.ID) {
				// bingo, we've found the Pingdom check based on our custom ID (check.ID which is stored in a Pingdom tag).
				// now we return the actual Pingdom ID which we need for updates/deletes/etc.
				pingdomCheckID := pingdomCheck["id"]
				if pingdomCheckID != nil && pingdomCheckID.(float64) > 0 {
					result = int64(pingdomCheckID.(float64))
				}
			}
		}
	}
	return result, nil
}

func (p *Pingdom) createCheck(ctx context.Context, check model.UptimeCheck) error {
	log.FromContext(ctx).Info("creating check", "check", check)
	message, err := p.checkToJSON(check, true)
	if err != nil {
		return err
	}
	req, err := http.NewRequest(http.MethodPost, pingdomURL, bytes.NewBuffer(message))
	if err != nil {
		return err
	}
	err = p.execRequestWithBody(ctx, req)
	if err != nil {
		return err
	}
	return nil
}

func (p *Pingdom) updateCheck(ctx context.Context, existingPingdomID int64, check model.UptimeCheck) error {
	log.FromContext(ctx).Info("updating check", "check", check, "pingdom ID", existingPingdomID)
	message, err := p.checkToJSON(check, false)
	if err != nil {
		return err
	}
	req, err := http.NewRequest(http.MethodPut, fmt.Sprintf("%s/%d", pingdomURL, existingPingdomID), bytes.NewBuffer(message))
	if err != nil {
		return err
	}
	err = p.execRequestWithBody(ctx, req)
	if err != nil {
		return err
	}
	return nil
}

func (p *Pingdom) checkToJSON(check model.UptimeCheck, includeType bool) ([]byte, error) {
	checkURL, err := url.ParseRequestURI(check.URL)
	if err != nil {
		return nil, err
	}
	port, err := getPort(checkURL)
	if err != nil {
		return nil, err
	}
	relativeURL := checkURL.Path
	if checkURL.RawQuery != "" {
		relativeURL += "?" + checkURL.RawQuery
	}
	// add the check id (from the k8s annotation) as a tag, so
	// we can later retrieve the check during update or delete.
	check.Tags = append(check.Tags, customIDPrefix+check.ID)
	// tags can be at most 64 chars long, cut off longer ones
	for k := range check.Tags {
		tag := check.Tags[k]
		if len(tag) > 64 {
			tag = tag[:64]
		}
		check.Tags[k] = tag
	}
	message := map[string]any{
		"name":       check.Name,
		"host":       checkURL.Hostname(),
		"url":        relativeURL,
		"encryption": true, // assume all checks run over HTTPS
		"port":       port,
		"resolution": check.Interval,
		"tags":       check.Tags,
	}
	if includeType {
		// update messages shouldn't include 'type', since the type of check can't be modified in Pingdom.
		message["type"] = "http"
	}
	if len(p.settings.UserIDs) > 0 {
		message["userids"] = p.settings.UserIDs
	}
	if len(p.settings.IntegrationIDs) > 0 {
		message["integrationids"] = p.settings.IntegrationIDs
	}
	// request headers need to be submitted as numbered JSON keys,
	// for example "requestheader1": key:value, "requestheader2": key:value, etc.
	var headers []string
	for header := range check.RequestHeaders {
		headers = append(headers, header)
	}
	sort.Strings(headers)
	for i, header := range headers {
		message[fmt.Sprintf("requestheader%d", i)] = fmt.Sprintf("%s:%s", header, check.RequestHeaders[header])
	}
	// Pingdom doesn't allow both "shouldcontain" and "shouldnotcontain"
	if check.StringContains != "" {
		message["shouldcontain"] = check.StringContains
	} else if check.StringNotContains != "" {
		message["shouldnotcontain"] = check.StringNotContains
	}
	return json.Marshal(message)
}

func (p *Pingdom) execRequestWithBody(ctx context.Context, req *http.Request) error {
	req.Header.Add(providers.HeaderContentType, providers.MediaTypeJSON)
	resp, err := p.execRequest(ctx, req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		resultBody, _ := io.ReadAll(resp.Body)
		return fmt.Errorf("got http status %d, expected 200. Error: %s", resp.StatusCode, resultBody)
	}
	return nil
}

func (p *Pingdom) execRequest(ctx context.Context, req *http.Request) (*http.Response, error) {
	req.Header.Add(providers.HeaderAuthorization, "Bearer "+p.settings.APIToken)
	req.Header.Add(providers.HeaderUserAgent, model.OperatorName)
	resp, err := p.httpClient.Do(req)
	if err != nil {
		return resp, err
	}
	rateLimitErr := errors.Join(
		handleRateLimits(ctx, resp.Header.Get(headerReqLimitShort)),
		handleRateLimits(ctx, resp.Header.Get(headerReqLimitLong)),
	)
	return resp, rateLimitErr
}

func handleRateLimits(ctx context.Context, rateLimitHeader string) error {
	remaining, resetTime, err := parseRateLimitHeader(rateLimitHeader)
	if err != nil {
		return err
	}
	if remaining < 25 {
		log.FromContext(ctx).Info(
			fmt.Sprintf("Waiting for %d seconds to avoid hitting Pingdom rate limit", resetTime+1),
			"header", rateLimitHeader, "remaining", remaining)
		time.Sleep(time.Duration(resetTime+1) * time.Second)
	}
	return nil
}

func parseRateLimitHeader(header string) (remaining int, resetTime int, err error) {
	_, err = fmt.Sscanf(header, "Remaining: %d Time until reset: %d", &remaining, &resetTime)
	return
}

func getPort(checkURL *url.URL) (int, error) {
	port := checkURL.Port()
	if port == "" {
		port = "443"
	}
	return strconv.Atoi(port)
}
package service

import (
	"context"
	"fmt"
	classiclog "log"

	m "github.com/PDOK/uptime-operator/internal/model"
	p "github.com/PDOK/uptime-operator/internal/service/providers"
	"github.com/PDOK/uptime-operator/internal/service/providers/betterstack"
	"github.com/PDOK/uptime-operator/internal/service/providers/mock"
	"github.com/PDOK/uptime-operator/internal/service/providers/pingdom"
	"sigs.k8s.io/controller-runtime/pkg/log"
)

type UptimeCheckOption func(*UptimeCheckService) *UptimeCheckService

type UptimeCheckService struct {
	provider      UptimeProvider
	slack         *Slack
	enableDeletes bool
}

func New(options ...UptimeCheckOption) *UptimeCheckService {
	service := &UptimeCheckService{}
	for _, option := range options {
		service = option(service)
	}
	return service
}

func WithProvider(provider UptimeProvider) UptimeCheckOption {
	return func(service *UptimeCheckService) *UptimeCheckService {
		service.provider = provider
		return service
	}
}

func WithProviderAndSettings(provider p.UptimeProviderID, settings any) UptimeCheckOption {
	return func(service *UptimeCheckService) *UptimeCheckService {
		switch provider {
		case p.ProviderMock:
			service.provider = mock.New()
		case p.ProviderPingdom:
			service.provider = pingdom.New(settings.(pingdom.Settings))
		case p.ProviderBetterStack:
			service.provider = betterstack.New(settings.(betterstack.Settings))
		default:
			classiclog.Fatalf("unsupported provider specified: %s", provider)
		}
		return service
	}
}

func WithSlack(slackWebhookURL string, slackChannel string) UptimeCheckOption {
	return func(service *UptimeCheckService) *UptimeCheckService {
		if slackWebhookURL != "" && slackChannel != "" {
			service.slack = NewSlack(slackWebhookURL, slackChannel)
		}
		return service
	}
}

func WithDeletes(enableDeletes bool) UptimeCheckOption {
	return func(service *UptimeCheckService) *UptimeCheckService {
		service.enableDeletes = enableDeletes
		return service
	}
}

func (r *UptimeCheckService) Mutate(ctx context.Context, mutation m.Mutation, ingressName string, annotations map[string]string) {
	_, ignore := annotations[m.AnnotationIgnore]
	if ignore {
		r.logRouteIgnore(ctx, mutation, ingressName)
		return
	}
	check, err := m.NewUptimeCheck(ingressName, annotations)
	if err != nil {
		r.logAnnotationErr(ctx, err)
		return
	}
	if mutation == m.CreateOrUpdate {
		err = r.provider.CreateOrUpdateCheck(ctx, *check)
		r.logMutation(ctx, err, mutation, check)
	} else if mutation == m.Delete {
		if !r.enableDeletes {
			r.logDeleteDisabled(ctx, check)
			return
		}
		err = r.provider.DeleteCheck(ctx, *check)
		r.logMutation(ctx, err, mutation, check)
	}
}

func (r *UptimeCheckService) logDeleteDisabled(ctx context.Context, check *m.UptimeCheck) {
	msg := fmt.Sprintf("delete of uptime check '%s' (id: %s) not executed since 'enable-deletes=false'.", check.Name, check.ID)
	log.FromContext(ctx).Info(msg, "check", check)
	if r.slack == nil {
		return
	}
	r.slack.Send(ctx, ":information_source: "+msg)
}

func (r *UptimeCheckService) logRouteIgnore(ctx context.Context, mutation m.Mutation, name string) {
	msg := fmt.Sprintf("ignoring %s for ingress route %s, since this route is marked to be excluded from uptime monitoring", mutation, name)
	log.FromContext(ctx).Info(msg)
	if r.slack == nil {
		return
	}
	r.slack.Send(ctx, ":information_source: "+msg)
}

func (r *UptimeCheckService) logAnnotationErr(ctx context.Context, err error) {
	msg := fmt.Sprintf("missing or invalid uptime check annotation(s) encountered: %v", err)
	log.FromContext(ctx).Error(err, msg)
	if r.slack == nil {
		return
	}
	r.slack.Send(ctx, ":large_red_square: "+msg)
}

func (r *UptimeCheckService) logMutation(ctx context.Context, err error, mutation m.Mutation, check *m.UptimeCheck) {
	if err != nil {
		msg := fmt.Sprintf("%s of uptime check '%s' (id: %s) failed.", string(mutation), check.Name, check.ID)
		log.FromContext(ctx).Error(err, msg, "check", check)
		if r.slack == nil {
			return
		}
		r.slack.Send(ctx, ":large_red_square: "+msg)
		return
	}
	msg := fmt.Sprintf("%s of uptime check '%s' (id: %s) succeeded.", string(mutation), check.Name, check.ID)
	log.FromContext(ctx).Info(msg)
	if r.slack == nil {
		return
	}
	if mutation == m.Delete {
		r.slack.Send(ctx, ":warning: "+msg+".\n _Beware: a flood of these delete messages may indicate Traefik itself is down!_")
	} else {
		r.slack.Send(ctx, ":large_green_square: "+msg)
	}
}
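// Illustrative sketch (not part of the original package): shows how the functional options
// above can be combined to construct an UptimeCheckService, for example from a main package.
// The token, webhook URL and channel values are placeholders.
package main

import (
	"github.com/PDOK/uptime-operator/internal/service"
	p "github.com/PDOK/uptime-operator/internal/service/providers"
	"github.com/PDOK/uptime-operator/internal/service/providers/pingdom"
)

func newUptimeCheckService() *service.UptimeCheckService {
	return service.New(
		service.WithProviderAndSettings(p.ProviderPingdom, pingdom.Settings{APIToken: "<token>"}),
		service.WithSlack("<slack-webhook-url>", "#uptime-alerts"),
		service.WithDeletes(false), // keep deletes disabled unless explicitly opted in
	)
}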
package service

import (
	"context"

	"github.com/PDOK/uptime-operator/internal/model"
	"github.com/slack-go/slack"
	"golang.org/x/time/rate"
	"sigs.k8s.io/controller-runtime/pkg/log"
)

const (
	nrOfMessagesPerSec   = 1
	nrOfMessagesPerBurst = 10
)

type Slack struct {
	webhookURL string
	channelID  string
	rateLimit  *rate.Limiter
}

func NewSlack(webhookURL, channelID string) *Slack {
	return &Slack{
		webhookURL: webhookURL,
		channelID:  channelID,
		// see https://api.slack.com/apis/rate-limits
		rateLimit: rate.NewLimiter(nrOfMessagesPerSec, nrOfMessagesPerBurst),
	}
}

func (s *Slack) Send(ctx context.Context, message string) {
	err := s.rateLimit.Wait(ctx)
	if err != nil {
		log.FromContext(ctx).Error(err, "failed waiting for slack rate limit")
	}
	err = slack.PostWebhook(s.webhookURL, &slack.WebhookMessage{
		Channel:   s.channelID,
		Text:      message,
		Username:  model.OperatorName,
		IconEmoji: ":up:",
	})
	if err != nil {
		log.FromContext(ctx).Error(err, "failed to post Slack message", "message", message, "channel", s.channelID)
	}
}
package util import ( "strconv" ) func StringsToInts(ss []string) ([]int, error) { result := make([]int, 0, len(ss)) for _, s := range ss { n, err := strconv.Atoi(s) if err != nil { return nil, err } result = append(result, n) } return result, nil }
package util import ( "strings" ) type SliceFlag []string func (sf *SliceFlag) String() string { return strings.Join(*sf, ",") } func (sf *SliceFlag) Set(value string) error { *sf = append(*sf, value) return nil }