/*
MIT License
Copyright (c) 2024 Publieke Dienstverlening op de Kaart
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
package controller
import (
"context"
m "github.com/PDOK/uptime-operator/internal/model"
"github.com/PDOK/uptime-operator/internal/service"
traefikio "github.com/traefik/traefik/v3/pkg/provider/kubernetes/crd/traefikio/v1alpha1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)
// IngressRouteReconciler reconciles Traefik IngressRoutes with an uptime monitoring (SaaS) provider
type IngressRouteReconciler struct {
	client.Client
	// Scheme maps Go types to Kubernetes GroupVersionKinds.
	Scheme *runtime.Scheme
	// UptimeCheckService performs the actual create/update/delete of uptime checks with the configured provider.
	UptimeCheckService *service.UptimeCheckService
}
//+kubebuilder:rbac:groups=traefik.io,resources=ingressroutes,verbs=get;list;watch
//+kubebuilder:rbac:groups=traefik.io,resources=ingressroutes/finalizers,verbs=update
// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.17.0/pkg/reconcile
func (r *IngressRouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	ingressRoute, err := r.getIngressRoute(ctx, req)
	if err != nil {
		// Not-found means the route is gone; nothing left to reconcile.
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}
	// Ensure our finalizer is attached. When the route is under deletion the
	// callback below deletes the corresponding uptime check first.
	shouldContinue, err := finalizeIfNecessary(ctx, r.Client, ingressRoute, m.AnnotationFinalizer, func() error {
		r.UptimeCheckService.Mutate(ctx, m.Delete, ingressRoute.GetName(), ingressRoute.GetAnnotations())
		return nil
	})
	if !shouldContinue || err != nil {
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}
	// Route exists and is not being deleted: sync the uptime check with the provider.
	r.UptimeCheckService.Mutate(ctx, m.CreateOrUpdate, ingressRoute.GetName(), ingressRoute.GetAnnotations())
	return ctrl.Result{}, nil
}
// getIngressRoute fetches the "traefik.io/v1alpha1" IngressRoute named in the
// request. Returns the (not-found or other) error from the API server as-is so
// the caller can decide whether to ignore it.
func (r *IngressRouteReconciler) getIngressRoute(ctx context.Context, req ctrl.Request) (client.Object, error) {
	ingressIo := &traefikio.IngressRoute{}
	if err := r.Get(ctx, req.NamespacedName, ingressIo); err != nil {
		logger := log.FromContext(ctx)
		if apierrors.IsNotFound(err) {
			logger.Info("IngressRoute resource not found", "name", req.NamespacedName)
		} else {
			// logger.Error already records err; passing it again as an
			// "error" key/value pair duplicated it in the log output.
			logger.Error(err, "unable to fetch IngressRoute resource")
		}
		return nil, err
	}
	return ingressIo, nil
}
// finalizeIfNecessary implements the standard Kubernetes finalizer dance:
//   - object alive: make sure our finalizer is registered, reconcile may continue;
//   - object under deletion with our finalizer: run the cleanup callback, then
//     remove the finalizer so the API server can actually delete the object;
//   - object under deletion without our finalizer: nothing to do.
//
// shouldContinue is true only when the caller should proceed with a normal
// (non-deletion) reconcile of the object.
func finalizeIfNecessary(ctx context.Context, c client.Client, obj client.Object, finalizerName string, finalizer func() error) (shouldContinue bool, err error) {
	// not under deletion, ensure finalizer annotation
	if obj.GetDeletionTimestamp().IsZero() {
		if !controllerutil.ContainsFinalizer(obj, finalizerName) {
			controllerutil.AddFinalizer(obj, finalizerName)
			err = c.Update(ctx, obj)
			return true, err
		}
		return true, nil
	}
	// under deletion but not our finalizer annotation, do nothing
	if !controllerutil.ContainsFinalizer(obj, finalizerName) {
		return false, nil
	}
	// run finalizer and remove annotation
	if err = finalizer(); err != nil {
		return false, err
	}
	controllerutil.RemoveFinalizer(obj, finalizerName)
	err = c.Update(ctx, obj)
	return false, err
}
// SetupWithManager sets up the controller with the Manager.
// SetupWithManager sets up the controller with the Manager.
func (r *IngressRouteReconciler) SetupWithManager(mgr ctrl.Manager) error {
	// Only react to spec (generation) or annotation changes; status-only
	// updates would otherwise trigger needless reconciles.
	preCondition := predicate.Or(predicate.GenerationChangedPredicate{}, predicate.AnnotationChangedPredicate{})
	return ctrl.NewControllerManagedBy(mgr).
		Named(m.OperatorName).
		Watches(
			&traefikio.IngressRoute{}, // watch "traefik.io/v1alpha1" ingresses
			&handler.EnqueueRequestForObject{},
			builder.WithPredicates(preCondition)).
		Complete(r)
}
package model
import (
"fmt"
"slices"
"strconv"
"strings"
)
const (
	// OperatorName identifies this operator in controller names, user agents and tags.
	OperatorName = "uptime-operator"
	// TagManagedBy Indicate to humans that the given check is managed by the operator.
	TagManagedBy = "managed-by-" + OperatorName
	// AnnotationBase is the shared prefix of all ingress-route annotations read by this operator.
	AnnotationBase = "uptime.pdok.nl"
	// AnnotationID holds the stable custom check ID (required).
	AnnotationID = AnnotationBase + "/id"
	// AnnotationName holds the human-readable check name (required).
	AnnotationName = AnnotationBase + "/name"
	// AnnotationURL holds the URL to probe (required).
	AnnotationURL = AnnotationBase + "/url"
	// AnnotationTags holds a comma-separated list of tags.
	AnnotationTags = AnnotationBase + "/tags"
	// AnnotationInterval holds the check interval in minutes (integer).
	AnnotationInterval = AnnotationBase + "/interval-in-minutes"
	// AnnotationRequestHeaders holds comma-separated "key: value" request headers.
	AnnotationRequestHeaders = AnnotationBase + "/request-headers"
	// AnnotationStringContains asserts the response contains the given string.
	AnnotationStringContains = AnnotationBase + "/response-check-for-string-contains"
	// AnnotationStringNotContains asserts the response does not contain the given string.
	AnnotationStringNotContains = AnnotationBase + "/response-check-for-string-not-contains"
	// AnnotationFinalizer is the finalizer added to watched ingress routes.
	AnnotationFinalizer = AnnotationBase + "/finalizer"
	// AnnotationIgnore, when present, excludes the route from uptime monitoring.
	AnnotationIgnore = AnnotationBase + "/ignore"
)
// UptimeCheck is the provider-agnostic model of a single uptime/health check,
// assembled from the annotations on an ingress route.
type UptimeCheck struct {
	ID                string            `json:"id"`         // stable custom ID, from the /id annotation
	Name              string            `json:"name"`       // human-readable name
	URL               string            `json:"url"`        // URL to probe
	Tags              []string          `json:"tags"`       // always includes TagManagedBy
	Interval          int               `json:"resolution"` // check interval in minutes
	RequestHeaders    map[string]string `json:"request_headers"`
	StringContains    string            `json:"string_contains"`     // response must contain this string (optional)
	StringNotContains string            `json:"string_not_contains"` // response must not contain this string (optional)
}
// NewUptimeCheck builds an UptimeCheck from the annotations on the given
// ingress route. The ID, name and URL annotations are required; tags, interval,
// request headers and response-content checks are optional. The managed-by tag
// is always added so humans can recognize operator-managed checks.
func NewUptimeCheck(ingressName string, annotations map[string]string) (*UptimeCheck, error) {
	id, ok := annotations[AnnotationID]
	if !ok {
		return nil, fmt.Errorf("%s annotation not found on ingress route: %s", AnnotationID, ingressName)
	}
	name, ok := annotations[AnnotationName]
	if !ok {
		return nil, fmt.Errorf("%s annotation not found on ingress route: %s", AnnotationName, ingressName)
	}
	url, ok := annotations[AnnotationURL]
	if !ok {
		// message format kept consistent with the ID/name errors above (was missing the colon)
		return nil, fmt.Errorf("%s annotation not found on ingress route: %s", AnnotationURL, ingressName)
	}
	interval, err := getInterval(annotations)
	if err != nil {
		return nil, err
	}
	check := &UptimeCheck{
		ID:                id,
		Name:              name,
		URL:               url,
		Tags:              stringToSlice(annotations[AnnotationTags]),
		Interval:          interval,
		RequestHeaders:    kvStringToMap(annotations[AnnotationRequestHeaders]),
		StringContains:    annotations[AnnotationStringContains],
		StringNotContains: annotations[AnnotationStringNotContains],
	}
	if !slices.Contains(check.Tags, TagManagedBy) {
		check.Tags = append(check.Tags, TagManagedBy)
	}
	return check, nil
}
// getInterval reads the interval annotation, defaulting to 1 (minute) when absent.
// A non-integer value yields an error (with the default as fallback value).
func getInterval(annotations map[string]string) (int, error) {
	raw, ok := annotations[AnnotationInterval]
	if !ok {
		return 1, nil
	}
	interval, err := strconv.Atoi(raw)
	if err != nil {
		return 1, fmt.Errorf("%s annotation should contain integer value: %w", AnnotationInterval, err)
	}
	return interval, nil
}
// kvStringToMap parses a comma-separated list of "key:value" pairs into a map.
// Whitespace around keys and values is trimmed; pairs without a ':' are
// skipped. Returns nil for empty input.
func kvStringToMap(s string) map[string]string {
	if s == "" {
		return nil
	}
	result := make(map[string]string)
	for _, kvPair := range strings.Split(s, ",") {
		// Split on the first ':' only, so values may themselves contain colons
		// (the previous len(parts) != 2 check silently dropped such pairs).
		key, value, found := strings.Cut(kvPair, ":")
		if !found {
			continue
		}
		result[strings.TrimSpace(key)] = strings.TrimSpace(value)
	}
	return result
}
// stringToSlice splits a comma-separated string into trimmed parts.
// Returns nil for empty input.
func stringToSlice(s string) []string {
	if s == "" {
		return nil
	}
	parts := strings.Split(s, ",")
	result := make([]string, 0, len(parts))
	for _, part := range parts {
		result = append(result, strings.TrimSpace(part))
	}
	return result
}
package betterstack
import (
"context"
"fmt"
classiclog "log"
"net/http"
"strconv"
"time"
"github.com/PDOK/uptime-operator/internal/model"
p "github.com/PDOK/uptime-operator/internal/service/providers"
"sigs.k8s.io/controller-runtime/pkg/log"
)
const betterStackBaseURL = "https://uptime.betterstack.com"

// Settings configures the Better Stack provider.
type Settings struct {
	// APIToken authenticates against the Better Stack uptime API (required).
	APIToken string
	// PageSize is the page size used when listing metadata; defaults to 50.
	PageSize int
}

// BetterStack is an uptime provider backed by the Better Stack uptime API.
type BetterStack struct {
	client Client
}
// New creates a BetterStack
func New(settings Settings) *BetterStack {
if settings.APIToken == "" {
classiclog.Fatal("Better Stack API token is not provided")
}
if settings.PageSize < 1 {
settings.PageSize = 50 // default https://betterstack.com/docs/uptime/api/pagination/
}
return &BetterStack{
Client{
httpClient: &http.Client{Timeout: time.Duration(5) * time.Minute},
settings: settings,
},
}
}
// CreateOrUpdateCheck create the given check with Better Stack, or update an existing check. Needs to be idempotent!
func (b *BetterStack) CreateOrUpdateCheck(ctx context.Context, check model.UptimeCheck) (err error) {
existingCheckID, err := b.findCheck(check)
if err != nil {
return fmt.Errorf("failed to find check %s, error: %w", check.ID, err)
}
if existingCheckID == p.CheckNotFound { //nolint:nestif // clean enough
log.FromContext(ctx).Info("creating check", "check", check)
monitorID, err := b.client.createMonitor(check)
if err != nil {
return fmt.Errorf("failed to create monitor for check %s, error: %w", check.ID, err)
}
if err = b.client.createMetadata(check.ID, monitorID, check.Tags); err != nil {
return fmt.Errorf("failed to create metadata for check %s, error: %w", check.ID, err)
}
} else {
log.FromContext(ctx).Info("updating check", "check", check, "betterstack ID", existingCheckID)
existingMonitor, err := b.client.getMonitor(existingCheckID)
if err != nil {
return fmt.Errorf("failed to get monitor for check %s, error: %w", check.ID, err)
}
if err = b.client.updateMonitor(check, existingMonitor); err != nil {
return fmt.Errorf("failed to update monitor for check %s (betterstack ID: %d), "+
"error: %w", check.ID, existingCheckID, err)
}
if err = b.client.updateMetadata(check.ID, existingCheckID, check.Tags); err != nil {
return fmt.Errorf("failed to update metdata for check %s (betterstack ID: %d), "+
"error: %w", check.ID, existingCheckID, err)
}
}
return err
}
// DeleteCheck deletes the given check from Better Stack
func (b *BetterStack) DeleteCheck(ctx context.Context, check model.UptimeCheck) error {
log.FromContext(ctx).Info("deleting check", "check", check)
existingCheckID, err := b.findCheck(check)
if err != nil {
return fmt.Errorf("failed to find check %s, error: %w", check.ID, err)
}
if existingCheckID == p.CheckNotFound {
log.FromContext(ctx).Info(fmt.Sprintf("check with ID '%s' is already deleted", check.ID))
return nil
}
if err = b.client.deleteMetadata(check.ID, existingCheckID); err != nil {
return fmt.Errorf("failed to delete metadata for check %s (betterstack ID: %d), "+
"error: %w", check.ID, existingCheckID, err)
}
if err = b.client.deleteMonitor(existingCheckID); err != nil {
return fmt.Errorf("failed to delete monitor for check %s (betterstack ID: %d), "+
"error: %w", check.ID, existingCheckID, err)
}
return nil
}
// findCheck resolves our custom check ID to the numeric Better Stack monitor
// ID by paging through all metadata records (the custom ID is stored as the
// metadata key, the monitor ID as its owner). Returns p.CheckNotFound when no
// matching record exists.
func (b *BetterStack) findCheck(check model.UptimeCheck) (int64, error) {
	result := p.CheckNotFound
	metadata, err := b.client.listMetadata()
	if err != nil {
		return result, err
	}
	for {
		for _, md := range metadata.Data {
			if md.Attributes != nil && md.Attributes.Key == check.ID {
				// OwnerID is returned as a JSON string; callers need it numeric.
				result, err = strconv.ParseInt(md.Attributes.OwnerID, 10, 64)
				if err != nil {
					return result, fmt.Errorf("failed to parse monitor ID %s to integer", md.Attributes.OwnerID)
				}
				return result, nil
			}
		}
		if !metadata.HasNext() {
			break // exit infinite loop
		}
		metadata, err = metadata.Next(b.client)
		if err != nil {
			return result, err
		}
	}
	return result, nil
}
package betterstack
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"strconv"
"github.com/PDOK/uptime-operator/internal/model"
p "github.com/PDOK/uptime-operator/internal/service/providers"
)
// typeMonitor is the owner_type used for metadata records attached to monitors.
const typeMonitor = "Monitor"

// Client is a thin HTTP client for the Better Stack uptime API.
type Client struct {
	httpClient *http.Client
	settings   Settings
}
// execRequest adds authentication and JSON headers, executes the request and
// verifies the response status. On success the caller must close resp.Body.
func (h Client) execRequest(req *http.Request, expectedStatus int) (*http.Response, error) {
	req.Header.Set(p.HeaderAuthorization, "Bearer "+h.settings.APIToken)
	req.Header.Set(p.HeaderAccept, p.MediaTypeJSON)
	req.Header.Set(p.HeaderContentType, p.MediaTypeJSON)
	req.Header.Add(p.HeaderUserAgent, model.OperatorName)
	resp, err := h.httpClient.Do(req)
	if err != nil {
		return nil, err
	}
	if resp.StatusCode != expectedStatus {
		defer resp.Body.Close()
		result, _ := io.ReadAll(resp.Body)
		// %s renders the (JSON) body as text; the previous %b verb printed it
		// as a sequence of binary numbers, making errors unreadable.
		return nil, fmt.Errorf("got status %d, expected %d. Body: %s", resp.StatusCode, expectedStatus, result)
	}
	return resp, nil // caller should close resp.Body!
}
// execRequestIgnoreResponseBody executes the request and discards the
// response body, closing it when present.
func (h Client) execRequestIgnoreResponseBody(req *http.Request, expectedStatus int) error {
	resp, err := h.execRequest(req, expectedStatus)
	if resp != nil {
		defer resp.Body.Close()
	}
	return err
}
// MetadataListResponse models a (paginated) page of Better Stack metadata
// records, as returned by the list-metadata endpoint.
type MetadataListResponse struct {
	Data []struct {
		ID         string `json:"id"`
		Type       string `json:"type"`
		Attributes *struct {
			// Key holds our custom check ID.
			Key    string `json:"key"`
			Values []struct {
				Type  string `json:"type"`
				Value string `json:"value"`
			} `json:"values"`
			TeamName string `json:"team_name"`
			// OwnerID is the numeric monitor ID, encoded as a string.
			OwnerID   string `json:"owner_id"`
			OwnerType string `json:"owner_type"`
		} `json:"attributes"`
	} `json:"data"`
	// Pagination links; Next is empty on the last page.
	Pagination *struct {
		First string `json:"first"`
		Last  string `json:"last"`
		Prev  string `json:"prev"`
		Next  string `json:"next"`
	} `json:"pagination"`
}
// listMetadata https://betterstack.com/docs/uptime/api/list-all-existing-metadata/
func (h Client) listMetadata() (*MetadataListResponse, error) {
url := fmt.Sprintf("%s/api/v3/metadata?owner_type=Monitor&per_page=%d", betterStackBaseURL, h.settings.PageSize)
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return nil, err
}
// Make initial HTTP request
resp, err := h.execRequest(req, http.StatusOK)
if err != nil {
return nil, err
}
defer resp.Body.Close()
// Parse response
var metadata MetadataListResponse
err = json.NewDecoder(resp.Body).Decode(&metadata)
if err != nil {
return nil, err
}
return &metadata, nil
}
// HasNext reports whether another page of metadata records is available.
func (m MetadataListResponse) HasNext() bool {
	if m.Pagination == nil {
		return false
	}
	return m.Pagination.Next != ""
}
// Next paginate though metadata, see https://betterstack.com/docs/uptime/api/pagination/
func (m MetadataListResponse) Next(client Client) (*MetadataListResponse, error) {
if !m.HasNext() {
return nil, nil
}
// Make HTTP request to the next URL
req, err := http.NewRequest(http.MethodGet, m.Pagination.Next, nil)
if err != nil {
return nil, err
}
resp, err := client.execRequest(req, http.StatusOK)
if err != nil {
return nil, err
}
defer resp.Body.Close()
// Parse response
var nextPage MetadataListResponse
err = json.NewDecoder(resp.Body).Decode(&nextPage)
if err != nil {
return nil, err
}
return &nextPage, nil
}
// MetadataValue is a single tag value inside a metadata record.
type MetadataValue struct {
	Value string `json:"value"`
}

// MetadataUpdateRequest is the payload for creating, updating or (with empty
// Values) deleting a metadata record.
type MetadataUpdateRequest struct {
	Key       string          `json:"key"`      // our custom check ID
	Values    []MetadataValue `json:"values"`   // tags; an empty (non-nil) slice deletes the record
	OwnerID   string          `json:"owner_id"` // numeric monitor ID as string
	OwnerType string          `json:"owner_type"`
}
// createMetadata https://betterstack.com/docs/uptime/api/update-an-existing-metadata-record/
func (h Client) createMetadata(key string, monitorID int64, tags []string) error {
metadataUpdateRequest := MetadataUpdateRequest{
Key: key,
OwnerID: strconv.FormatInt(monitorID, 10),
OwnerType: typeMonitor,
}
for _, tag := range tags {
metadataUpdateRequest.Values = append(metadataUpdateRequest.Values, MetadataValue{tag})
}
body := &bytes.Buffer{}
err := json.NewEncoder(body).Encode(&metadataUpdateRequest)
if err != nil {
return err
}
req, err := http.NewRequest(http.MethodPost, betterStackBaseURL+"/api/v3/metadata", body)
if err != nil {
return err
}
if err := h.execRequestIgnoreResponseBody(req, http.StatusCreated); err != nil {
return err
}
return nil
}
// updateMetadata https://betterstack.com/docs/uptime/api/update-an-existing-metadata-record/
func (h Client) updateMetadata(key string, monitorID int64, tags []string) error {
metadataUpdateRequest := MetadataUpdateRequest{
Key: key,
OwnerID: strconv.FormatInt(monitorID, 10),
OwnerType: typeMonitor,
}
for _, tag := range tags {
metadataUpdateRequest.Values = append(metadataUpdateRequest.Values, MetadataValue{tag})
}
body := &bytes.Buffer{}
err := json.NewEncoder(body).Encode(&metadataUpdateRequest)
if err != nil {
return err
}
req, err := http.NewRequest(http.MethodPost, betterStackBaseURL+"/api/v3/metadata", body)
if err != nil {
return err
}
if err = h.execRequestIgnoreResponseBody(req, http.StatusOK); err != nil {
return err
}
return nil
}
// deleteMetadata https://betterstack.com/docs/uptime/api/update-an-existing-metadata-record/
func (h Client) deleteMetadata(key string, monitorID int64) error {
metadataDeleteRequest := MetadataUpdateRequest{
Key: key,
OwnerID: strconv.FormatInt(monitorID, 10),
OwnerType: typeMonitor,
Values: []MetadataValue{}, // empty values will result in delete of metadata record
}
body := &bytes.Buffer{}
err := json.NewEncoder(body).Encode(&metadataDeleteRequest)
if err != nil {
return err
}
req, err := http.NewRequest(http.MethodPost, betterStackBaseURL+"/api/v3/metadata", body)
if err != nil {
return err
}
if err = h.execRequestIgnoreResponseBody(req, http.StatusNoContent); err != nil {
return err
}
return nil
}
// MonitorRequestHeader is a single request header on a monitor. Setting
// Destroy on an existing header (identified by ID) removes it.
type MonitorRequestHeader struct {
	ID      string `json:"id,omitempty"`
	Name    string `json:"name"`
	Value   string `json:"value"`
	Destroy bool   `json:"_destroy"` //nolint:tagliatelle
}

// MonitorCreateOrUpdateRequest is the payload for creating or patching a monitor.
type MonitorCreateOrUpdateRequest struct {
	MonitorType       string                 `json:"monitor_type"` // "status", "keyword" or "keyword_absence"
	URL               string                 `json:"url"`
	PronounceableName string                 `json:"pronounceable_name"`
	Port              int                    `json:"port"`
	Email             bool                   `json:"email"`
	Sms               bool                   `json:"sms"`
	Call              bool                   `json:"call"`
	RequiredKeyword   string                 `json:"required_keyword"` // only for keyword(-absence) monitors
	CheckFrequency    int                    `json:"check_frequency"`  // in seconds
	RequestHeaders    []MonitorRequestHeader `json:"request_headers"`
}

// MonitorCreateResponse is the relevant part of the create-monitor response.
type MonitorCreateResponse struct {
	Data struct {
		// ID is the numeric monitor ID, encoded as a string.
		ID   string `json:"id"`
		Type string `json:"type"`
	} `json:"data"`
}
// createMonitor https://betterstack.com/docs/uptime/api/create-a-new-monitor/
func (h Client) createMonitor(check model.UptimeCheck) (int64, error) {
createRequest := checkToMonitor(check)
body := &bytes.Buffer{}
err := json.NewEncoder(body).Encode(createRequest)
if err != nil {
return -1, err
}
req, err := http.NewRequest(http.MethodPost, betterStackBaseURL+"/api/v2/monitors", body)
if err != nil {
return -1, err
}
resp, err := h.execRequest(req, http.StatusCreated)
if err != nil {
return -1, err
}
defer resp.Body.Close()
var createResponse *MonitorCreateResponse
err = json.NewDecoder(resp.Body).Decode(&createResponse)
if err != nil {
return -1, err
}
monitorID, err := strconv.ParseInt(createResponse.Data.ID, 10, 64)
if err != nil {
return -1, err
}
return monitorID, nil
}
// updateMonitor https://betterstack.com/docs/uptime/api/update-an-existing-monitor/
func (h Client) updateMonitor(check model.UptimeCheck, existingMonitor *MonitorGetResponse) error {
updateRequest := checkToMonitor(check)
if existingMonitor == nil || existingMonitor.Data == nil || existingMonitor.Data.Attributes == nil {
return fmt.Errorf("invalid monitor response, expected values are nil: %v", existingMonitor)
}
// Remove all existing headers (since the API works with HTTP PATCH, to avoid duplicate headers).
for _, existingHeader := range existingMonitor.Data.Attributes.RequestHeaders {
updateRequest.RequestHeaders = append(updateRequest.RequestHeaders, MonitorRequestHeader{
ID: existingHeader.ID,
Destroy: true,
})
}
body := &bytes.Buffer{}
err := json.NewEncoder(body).Encode(&updateRequest)
if err != nil {
return err
}
req, err := http.NewRequest(http.MethodPatch, fmt.Sprintf("%s/api/v2/monitors/%s", betterStackBaseURL, existingMonitor.Data.ID), body)
if err != nil {
return err
}
if err = h.execRequestIgnoreResponseBody(req, http.StatusOK); err != nil {
return err
}
return nil
}
// deleteMonitor https://betterstack.com/docs/uptime/api/delete-an-existing-monitor/
func (h Client) deleteMonitor(monitorID int64) error {
req, err := http.NewRequest(http.MethodDelete, fmt.Sprintf("%s/api/v2/monitors/%d", betterStackBaseURL, monitorID), nil)
if err != nil {
return err
}
if err = h.execRequestIgnoreResponseBody(req, http.StatusNoContent); err != nil {
return err
}
return nil
}
// MonitorGetResponse is the relevant part of the get-monitor response.
// Pointer fields stay nil when the API omits them; callers must check.
type MonitorGetResponse struct {
	Data *struct {
		ID         string `json:"id"` // numeric monitor ID, encoded as a string
		Type       string `json:"type"`
		Attributes *struct {
			URL               string                 `json:"url"`
			PronounceableName string                 `json:"pronounceable_name"`
			MonitorType       string                 `json:"monitor_type"`
			RequiredKeyword   string                 `json:"required_keyword"`
			CheckFrequency    int                    `json:"check_frequency"`
			RequestHeaders    []MonitorRequestHeader `json:"request_headers"`
		} `json:"attributes"`
	} `json:"data"`
}
// getMonitor retrieves an existing monitor by its numeric Better Stack ID.
func (h Client) getMonitor(monitorID int64) (*MonitorGetResponse, error) {
	url := fmt.Sprintf("%s/api/v2/monitors/%d", betterStackBaseURL, monitorID)
	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		return nil, err
	}
	resp, err := h.execRequest(req, http.StatusOK)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	var existingMonitor *MonitorGetResponse
	if err = json.NewDecoder(resp.Body).Decode(&existingMonitor); err != nil {
		return nil, err
	}
	return existingMonitor, nil
}
// checkToMonitor translates the provider-agnostic check into a Better Stack
// monitor payload. StringContains takes precedence over StringNotContains.
func checkToMonitor(check model.UptimeCheck) MonitorCreateOrUpdateRequest {
	request := MonitorCreateOrUpdateRequest{MonitorType: "status"}
	if check.StringContains != "" {
		request.MonitorType = "keyword"
		request.RequiredKeyword = check.StringContains
	} else if check.StringNotContains != "" {
		request.MonitorType = "keyword_absence"
		request.RequiredKeyword = check.StringNotContains
	}
	request.URL = check.URL
	request.PronounceableName = check.Name
	request.Port = 443 // checks are assumed to run over HTTPS
	request.CheckFrequency = toSupportedInterval(check.Interval)
	// No paging/calling; alerting channels are managed outside this operator.
	request.Email = false
	request.Sms = false
	request.Call = false
	for name, value := range check.RequestHeaders {
		request.RequestHeaders = append(request.RequestHeaders, MonitorRequestHeader{
			Name:  name,
			Value: value,
		})
	}
	return request
}
package betterstack
import "math"
// toSupportedInterval maps an interval in minutes to the nearest check
// frequency (in seconds) that Better Stack accepts. Non-positive input yields
// the smallest supported frequency; on a tie the smaller frequency wins.
func toSupportedInterval(intervalInMin int) int {
	// Better Stack only accepts a specific sets of intervals
	supported := []int{30, 45, 60, 120, 180, 300, 600, 900, 1800}
	seconds := intervalInMin * 60
	if seconds <= 0 {
		return supported[0] // use the smallest supported interval
	}
	best := supported[0]
	bestDiff := math.MaxInt
	for _, candidate := range supported {
		diff := seconds - candidate
		if diff < 0 {
			diff = -diff
		}
		if diff < bestDiff {
			bestDiff = diff
			best = candidate
		}
	}
	return best
}
package mock
import (
"context"
"encoding/json"
"fmt"
"github.com/PDOK/uptime-operator/internal/model"
"sigs.k8s.io/controller-runtime/pkg/log"
)
// Mock is an in-memory uptime provider for testing; checks are kept in a map
// keyed by their custom ID and every mutation is logged.
type Mock struct {
	checks map[string]model.UptimeCheck
}

// New creates an empty Mock provider.
func New() *Mock {
	return &Mock{
		checks: make(map[string]model.UptimeCheck),
	}
}
// CreateOrUpdateCheck stores the check in memory and logs the mutation.
func (m *Mock) CreateOrUpdateCheck(ctx context.Context, check model.UptimeCheck) error {
	m.checks[check.ID] = check
	checkJSON, _ := json.Marshal(check)
	msg := fmt.Sprintf("MOCK: created or updated check %s\n", checkJSON)
	log.FromContext(ctx).Info(msg)
	return nil
}
// DeleteCheck removes the check from memory and logs the mutation.
func (m *Mock) DeleteCheck(ctx context.Context, check model.UptimeCheck) error {
	delete(m.checks, check.ID)
	checkJSON, _ := json.Marshal(check)
	msg := fmt.Sprintf("MOCK: deleted check %s\n", checkJSON)
	log.FromContext(ctx).Info(msg)
	return nil
}
package pingdom
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
classiclog "log"
"net/http"
"net/url"
"sort"
"strconv"
"strings"
"time"
"github.com/PDOK/uptime-operator/internal/model"
"github.com/PDOK/uptime-operator/internal/service/providers"
"sigs.k8s.io/controller-runtime/pkg/log"
)
// pingdomURL is the base endpoint of the Pingdom checks API.
const pingdomURL = "https://api.pingdom.com/api/3.1/checks"

// customIDPrefix prefixes our stable check ID when stored as a Pingdom tag.
const customIDPrefix = "id:"

// Rate-limit headers returned by Pingdom on every response.
const headerReqLimitShort = "Req-Limit-Short"
const headerReqLimitLong = "Req-Limit-Long"

// Settings configures the Pingdom provider.
type Settings struct {
	// APIToken authenticates against the Pingdom API (required).
	APIToken string
	// UserIDs to alert for the created checks (optional).
	UserIDs []int
	// IntegrationIDs to notify for the created checks (optional).
	IntegrationIDs []int
}

// Pingdom is an uptime provider backed by the Pingdom API.
type Pingdom struct {
	settings   Settings
	httpClient *http.Client
}
// New creates a Pingdom
func New(settings Settings) *Pingdom {
if settings.APIToken == "" {
classiclog.Fatal("Pingdom API token is not provided")
}
return &Pingdom{
settings: settings,
httpClient: &http.Client{Timeout: time.Duration(5) * time.Minute},
}
}
// CreateOrUpdateCheck create the given check with Pingdom, or update an existing check. Needs to be idempotent!
func (p *Pingdom) CreateOrUpdateCheck(ctx context.Context, check model.UptimeCheck) (err error) {
existingCheckID, err := p.findCheck(ctx, check)
if err != nil {
return err
}
if existingCheckID == providers.CheckNotFound {
err = p.createCheck(ctx, check)
} else {
err = p.updateCheck(ctx, existingCheckID, check)
}
return err
}
// DeleteCheck deletes the given check from Pingdom
func (p *Pingdom) DeleteCheck(ctx context.Context, check model.UptimeCheck) error {
log.FromContext(ctx).Info("deleting check", "check", check)
existingCheckID, err := p.findCheck(ctx, check)
if err != nil {
return err
}
if existingCheckID == providers.CheckNotFound {
log.FromContext(ctx).Info(fmt.Sprintf("check with ID '%s' is already deleted", check.ID))
return nil
}
req, err := http.NewRequest(http.MethodDelete, fmt.Sprintf("%s/%d", pingdomURL, existingCheckID), nil)
if err != nil {
return err
}
resp, err := p.execRequest(ctx, req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
resultBody, _ := io.ReadAll(resp.Body)
return fmt.Errorf("got status %d, expected HTTP OK when deleting existing check. Error %s", resp.StatusCode, resultBody)
}
return nil
}
// findCheck resolves our custom check ID (stored as a Pingdom tag with the
// "id:" prefix) to the numeric Pingdom check ID, returning
// providers.CheckNotFound when no managed check carries the ID.
func (p *Pingdom) findCheck(ctx context.Context, check model.UptimeCheck) (int64, error) {
	result := providers.CheckNotFound
	// list all checks managed by uptime-operator. Can be at most 25.000, which is probably sufficient.
	req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s?include_tags=true&limit=25000&tags=%s", pingdomURL, model.TagManagedBy), nil)
	if err != nil {
		return result, err
	}
	req.Header.Add(providers.HeaderAccept, providers.MediaTypeJSON)
	resp, err := p.execRequest(ctx, req)
	if err != nil {
		return result, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return result, fmt.Errorf("got status %d, expected HTTP OK when listing existing checks", resp.StatusCode)
	}
	checksResponse := make(map[string]any)
	if err = json.NewDecoder(resp.Body).Decode(&checksResponse); err != nil {
		return result, err
	}
	// Use checked type assertions throughout: an unexpected response shape
	// should surface as an error/skip, not panic inside the reconcile loop
	// (the previous unchecked assertions panicked on malformed responses).
	pingdomChecks, ok := checksResponse["checks"].([]any)
	if !ok {
		return result, fmt.Errorf("unexpected response when listing existing checks, missing 'checks' list")
	}
	for _, rawCheck := range pingdomChecks {
		pingdomCheck, ok := rawCheck.(map[string]any)
		if !ok {
			continue
		}
		tags, ok := pingdomCheck["tags"].([]any)
		if !ok {
			continue
		}
		for _, rawTag := range tags {
			tag, ok := rawTag.(map[string]any)
			if !ok {
				continue
			}
			tagName, ok := tag["name"].(string)
			if !ok {
				continue
			}
			if strings.HasSuffix(tagName, check.ID) {
				// bingo, we've found the Pingdom check based on our custom ID (check.ID which is stored in a Pingdom tag).
				// now we return the actual Pingdom ID which we need for updates/deletes/etc.
				if pingdomCheckID, ok := pingdomCheck["id"].(float64); ok && pingdomCheckID > 0 {
					result = int64(pingdomCheckID)
				}
			}
		}
	}
	return result, nil
}
// createCheck registers a brand-new check with Pingdom.
func (p *Pingdom) createCheck(ctx context.Context, check model.UptimeCheck) error {
	log.FromContext(ctx).Info("creating check", "check", check)
	message, err := p.checkToJSON(check, true)
	if err != nil {
		return err
	}
	req, err := http.NewRequest(http.MethodPost, pingdomURL, bytes.NewBuffer(message))
	if err != nil {
		return err
	}
	return p.execRequestWithBody(ctx, req)
}
// updateCheck modifies the existing Pingdom check (identified by its numeric
// Pingdom ID) to match the desired state.
func (p *Pingdom) updateCheck(ctx context.Context, existingPingdomID int64, check model.UptimeCheck) error {
	log.FromContext(ctx).Info("updating check", "check", check, "pingdom ID", existingPingdomID)
	message, err := p.checkToJSON(check, false)
	if err != nil {
		return err
	}
	target := fmt.Sprintf("%s/%d", pingdomURL, existingPingdomID)
	req, err := http.NewRequest(http.MethodPut, target, bytes.NewBuffer(message))
	if err != nil {
		return err
	}
	return p.execRequestWithBody(ctx, req)
}
// checkToJSON converts the check into the JSON message body expected by the
// Pingdom API. includeType controls whether the 'type' field is sent (only
// allowed on create, since the type of a check can't be modified in Pingdom).
func (p *Pingdom) checkToJSON(check model.UptimeCheck, includeType bool) ([]byte, error) {
	checkURL, err := url.ParseRequestURI(check.URL)
	if err != nil {
		return nil, err
	}
	port, err := getPort(checkURL)
	if err != nil {
		return nil, err
	}
	relativeURL := checkURL.Path
	if checkURL.RawQuery != "" {
		relativeURL += "?" + checkURL.RawQuery
	}
	// add the check id (from the k8s annotation) as a tag, so
	// we can later retrieve the check during update or delete.
	// Copy the tags into a fresh slice first: check is passed by value but its
	// Tags slice shares the caller's backing array, so the truncation below
	// would otherwise mutate the caller's tags in place.
	tags := make([]string, 0, len(check.Tags)+1)
	tags = append(tags, check.Tags...)
	tags = append(tags, customIDPrefix+check.ID)
	// tags can be at most 64 chars long, cut off longer ones
	for k, tag := range tags {
		if len(tag) > 64 {
			tags[k] = tag[:64]
		}
	}
	message := map[string]any{
		"name":       check.Name,
		"host":       checkURL.Hostname(),
		"url":        relativeURL,
		"encryption": true, // assume all checks run over HTTPS
		"port":       port,
		"resolution": check.Interval,
		"tags":       tags,
	}
	if includeType {
		// update messages shouldn't include 'type', since the type of check can't be modified in Pingdom.
		message["type"] = "http"
	}
	if len(p.settings.UserIDs) > 0 {
		message["userids"] = p.settings.UserIDs
	}
	if len(p.settings.IntegrationIDs) > 0 {
		message["integrationids"] = p.settings.IntegrationIDs
	}
	// request header need to be submitted in numbered JSON keys
	// for example "requestheader1": key:value, "requestheader2": key:value, etc
	// sort header names so the numbering is deterministic
	headers := make([]string, 0, len(check.RequestHeaders))
	for header := range check.RequestHeaders {
		headers = append(headers, header)
	}
	sort.Strings(headers)
	for i, header := range headers {
		message[fmt.Sprintf("requestheader%d", i)] = fmt.Sprintf("%s:%s", header, check.RequestHeaders[header])
	}
	// Pingdom doesn't allow both "shouldcontain" and "shouldnotcontain"
	if check.StringContains != "" {
		message["shouldcontain"] = check.StringContains
	} else if check.StringNotContains != "" {
		message["shouldnotcontain"] = check.StringNotContains
	}
	return json.Marshal(message)
}
// execRequestWithBody executes a JSON request and requires an HTTP 200 answer;
// the response body is only read to enrich error messages.
func (p *Pingdom) execRequestWithBody(ctx context.Context, req *http.Request) error {
	req.Header.Add(providers.HeaderContentType, providers.MediaTypeJSON)
	resp, err := p.execRequest(ctx, req)
	// execRequest can return a non-nil response together with a (rate-limit)
	// error; always close the body when present to avoid leaking connections.
	if resp != nil {
		defer resp.Body.Close()
	}
	if err != nil {
		return err
	}
	if resp.StatusCode != http.StatusOK {
		resultBody, _ := io.ReadAll(resp.Body)
		return fmt.Errorf("got http status %d, while expected 200. Error: %s", resp.StatusCode, resultBody)
	}
	return nil
}
// execRequest adds authentication and executes the request, then inspects the
// Pingdom rate-limit headers and sleeps preemptively when the budget runs low.
// Note: may return a non-nil response together with a rate-limit error; the
// caller is responsible for closing resp.Body.
func (p *Pingdom) execRequest(ctx context.Context, req *http.Request) (*http.Response, error) {
	req.Header.Add(providers.HeaderAuthorization, "Bearer "+p.settings.APIToken)
	req.Header.Add(providers.HeaderUserAgent, model.OperatorName)
	resp, err := p.httpClient.Do(req)
	if err != nil {
		return resp, err
	}
	// Pingdom enforces both a short- and a long-window rate limit.
	rateLimitErr := errors.Join(
		handleRateLimits(ctx, resp.Header.Get(headerReqLimitShort)),
		handleRateLimits(ctx, resp.Header.Get(headerReqLimitLong)),
	)
	return resp, rateLimitErr
}
// handleRateLimits parses a Pingdom rate-limit header (e.g. "Remaining: 394
// Time until reset: 3589") and, when fewer than 25 requests remain in the
// window, sleeps until the window resets to avoid hitting the limit.
func handleRateLimits(ctx context.Context, rateLimitHeader string) error {
	if rateLimitHeader == "" {
		// header absent (e.g. on some error responses): nothing to enforce
		return nil
	}
	remaining, resetTime, err := parseRateLimitHeader(rateLimitHeader)
	if err != nil {
		return err
	}
	if remaining < 25 {
		log.FromContext(ctx).Info(
			fmt.Sprintf("Waiting for %d seconds to avoid hitting Pingdom rate limit", resetTime+1),
			rateLimitHeader, remaining)
		// Sleep until the window resets. The previous code slept remaining+1
		// seconds — the leftover request count, not a duration — contradicting
		// the log message above.
		time.Sleep(time.Duration(resetTime+1) * time.Second)
	}
	return nil
}
// parseRateLimitHeader extracts the remaining request count and the seconds
// until reset from a Pingdom rate-limit header value.
func parseRateLimitHeader(header string) (remaining int, resetTime int, err error) {
	_, err = fmt.Sscanf(header, "Remaining: %d Time until reset: %d", &remaining, &resetTime)
	return remaining, resetTime, err
}
// getPort returns the explicit port of the URL, defaulting to 443 (HTTPS)
// when none is specified.
func getPort(checkURL *url.URL) (int, error) {
	if port := checkURL.Port(); port != "" {
		return strconv.Atoi(port)
	}
	return 443, nil
}
package service
import (
"context"
"fmt"
classiclog "log"
m "github.com/PDOK/uptime-operator/internal/model"
p "github.com/PDOK/uptime-operator/internal/service/providers"
"github.com/PDOK/uptime-operator/internal/service/providers/betterstack"
"github.com/PDOK/uptime-operator/internal/service/providers/mock"
"github.com/PDOK/uptime-operator/internal/service/providers/pingdom"
"sigs.k8s.io/controller-runtime/pkg/log"
)
// UptimeCheckOption is a functional option used to configure an UptimeCheckService.
type UptimeCheckOption func(*UptimeCheckService) *UptimeCheckService

// UptimeCheckService creates, updates and deletes uptime checks with the
// configured uptime monitoring provider, optionally mirroring the outcome
// of each mutation to Slack.
type UptimeCheckService struct {
	// provider is the uptime monitoring (SaaS) backend for the checks.
	provider UptimeProvider
	// slack is optional; nil means no Slack notifications are sent.
	slack *Slack
	// enableDeletes guards actual deletion of checks; when false, deletes
	// are logged but not executed.
	enableDeletes bool
}
// New creates an UptimeCheckService configured by the given functional options.
func New(options ...UptimeCheckOption) *UptimeCheckService {
	result := &UptimeCheckService{}
	for _, apply := range options {
		result = apply(result)
	}
	return result
}
// WithProvider configures the service with a pre-constructed uptime provider.
func WithProvider(provider UptimeProvider) UptimeCheckOption {
	return func(s *UptimeCheckService) *UptimeCheckService {
		s.provider = provider
		return s
	}
}
// WithProviderAndSettings constructs the uptime provider identified by the
// given ID using provider-specific settings. Exits the process (fail-fast at
// startup) when the provider is unknown or the settings have the wrong type;
// previously a wrong settings type caused an opaque type-assertion panic.
func WithProviderAndSettings(provider p.UptimeProviderID, settings any) UptimeCheckOption {
	return func(service *UptimeCheckService) *UptimeCheckService {
		switch provider {
		case p.ProviderMock:
			service.provider = mock.New()
		case p.ProviderPingdom:
			pingdomSettings, ok := settings.(pingdom.Settings)
			if !ok {
				classiclog.Fatalf("pingdom provider requires pingdom.Settings, got %T", settings)
			}
			service.provider = pingdom.New(pingdomSettings)
		case p.ProviderBetterStack:
			betterStackSettings, ok := settings.(betterstack.Settings)
			if !ok {
				classiclog.Fatalf("betterstack provider requires betterstack.Settings, got %T", settings)
			}
			service.provider = betterstack.New(betterStackSettings)
		default:
			classiclog.Fatalf("unsupported provider specified: %s", provider)
		}
		return service
	}
}
// WithSlack enables Slack notifications; both a webhook URL and a channel
// must be provided, otherwise Slack stays disabled.
func WithSlack(slackWebhookURL string, slackChannel string) UptimeCheckOption {
	return func(s *UptimeCheckService) *UptimeCheckService {
		if slackWebhookURL == "" || slackChannel == "" {
			return s
		}
		s.slack = NewSlack(slackWebhookURL, slackChannel)
		return s
	}
}
// WithDeletes controls whether delete mutations are actually executed
// against the uptime provider.
func WithDeletes(enableDeletes bool) UptimeCheckOption {
	return func(s *UptimeCheckService) *UptimeCheckService {
		s.enableDeletes = enableDeletes
		return s
	}
}
// Mutate applies the given mutation (create-or-update, or delete) of the
// uptime check belonging to the named ingress route. Routes annotated to be
// ignored are skipped; deletes are skipped when deletes are disabled.
func (r *UptimeCheckService) Mutate(ctx context.Context, mutation m.Mutation, ingressName string, annotations map[string]string) {
	if _, ignored := annotations[m.AnnotationIgnore]; ignored {
		r.logRouteIgnore(ctx, mutation, ingressName)
		return
	}
	check, err := m.NewUptimeCheck(ingressName, annotations)
	if err != nil {
		r.logAnnotationErr(ctx, err)
		return
	}
	switch mutation {
	case m.CreateOrUpdate:
		r.logMutation(ctx, r.provider.CreateOrUpdateCheck(ctx, *check), mutation, check)
	case m.Delete:
		if !r.enableDeletes {
			r.logDeleteDisabled(ctx, check)
			return
		}
		r.logMutation(ctx, r.provider.DeleteCheck(ctx, *check), mutation, check)
	}
}
// logDeleteDisabled reports that a delete was skipped because deletes are
// disabled ('enable-deletes=false'), also notifying Slack when configured.
func (r *UptimeCheckService) logDeleteDisabled(ctx context.Context, check *m.UptimeCheck) {
	msg := fmt.Sprintf("delete of uptime check '%s' (id: %s) not executed since 'enable-deletes=false'.", check.Name, check.ID)
	log.FromContext(ctx).Info(msg, "check", check)
	if r.slack != nil {
		r.slack.Send(ctx, ":information_source: "+msg)
	}
}
// logRouteIgnore reports that the mutation was skipped because the ingress
// route is annotated to be excluded from uptime monitoring.
func (r *UptimeCheckService) logRouteIgnore(ctx context.Context, mutation m.Mutation, name string) {
	msg := fmt.Sprintf("ignoring %s for ingress route %s, since this route is marked to be excluded from uptime monitoring", mutation, name)
	log.FromContext(ctx).Info(msg)
	if r.slack != nil {
		r.slack.Send(ctx, ":information_source: "+msg)
	}
}
// logAnnotationErr reports missing or invalid uptime check annotations,
// also notifying Slack when configured.
func (r *UptimeCheckService) logAnnotationErr(ctx context.Context, err error) {
	msg := fmt.Sprintf("missing or invalid uptime check annotation(s) encountered: %v", err)
	log.FromContext(ctx).Error(err, msg)
	if r.slack != nil {
		r.slack.Send(ctx, ":large_red_square: "+msg)
	}
}
// logMutation logs the success or failure of a mutation and mirrors the
// outcome to Slack when configured. Delete successes get an extra warning,
// since a flood of deletes may indicate Traefik itself is down.
func (r *UptimeCheckService) logMutation(ctx context.Context, err error, mutation m.Mutation, check *m.UptimeCheck) {
	if err != nil {
		msg := fmt.Sprintf("%s of uptime check '%s' (id: %s) failed.", string(mutation), check.Name, check.ID)
		log.FromContext(ctx).Error(err, msg, "check", check)
		if r.slack != nil {
			r.slack.Send(ctx, ":large_red_square: "+msg)
		}
		return
	}
	msg := fmt.Sprintf("%s of uptime check '%s' (id: %s) succeeded.", string(mutation), check.Name, check.ID)
	log.FromContext(ctx).Info(msg)
	if r.slack == nil {
		return
	}
	if mutation == m.Delete {
		// msg already ends with a period; previously an extra "." was
		// appended here, rendering "succeeded.." in Slack.
		r.slack.Send(ctx, ":warning: "+msg+"\n _Beware: a flood of these delete messages may indicate Traefik itself is down!_")
	} else {
		r.slack.Send(ctx, ":large_green_square: "+msg)
	}
}
package service
import (
"context"
"github.com/PDOK/uptime-operator/internal/model"
"github.com/slack-go/slack"
"golang.org/x/time/rate"
"sigs.k8s.io/controller-runtime/pkg/log"
)
const (
	// Slack's webhook rate limits (https://api.slack.com/apis/rate-limits):
	// a sustained rate of one message per second, with short bursts allowed.
	nrOfMessagesPerSec   = 1
	nrOfMessagesPerBurst = 10
)

// Slack posts messages to a Slack channel through an incoming webhook,
// rate limited to stay within Slack's posting limits.
type Slack struct {
	webhookURL string
	channelID  string
	rateLimit  *rate.Limiter
}
// NewSlack creates a Slack notifier for the given webhook URL and channel,
// rate limited per https://api.slack.com/apis/rate-limits.
func NewSlack(webhookURL, channelID string) *Slack {
	limiter := rate.NewLimiter(nrOfMessagesPerSec, nrOfMessagesPerBurst)
	return &Slack{
		webhookURL: webhookURL,
		channelID:  channelID,
		rateLimit:  limiter,
	}
}
// Send posts the given message to the configured Slack channel, blocking when
// necessary to respect Slack's rate limits. Failures are logged but not
// returned: notifications are best-effort.
func (s *Slack) Send(ctx context.Context, message string) {
	if err := s.rateLimit.Wait(ctx); err != nil {
		// The context was cancelled or its deadline precludes the wait;
		// don't post on behalf of a cancelled operation. (Previously the
		// message was posted anyway, ignoring cancellation.)
		log.FromContext(ctx).Error(err, "failed waiting for slack rate limit")
		return
	}
	err := slack.PostWebhook(s.webhookURL, &slack.WebhookMessage{
		Channel:   s.channelID,
		Text:      message,
		Username:  model.OperatorName,
		IconEmoji: ":up:",
	})
	if err != nil {
		log.FromContext(ctx).Error(err, "failed to post Slack message",
			"message", message, "channel", s.channelID)
	}
}
package util
import (
"strconv"
)
// StringsToInts converts a slice of decimal strings to a slice of ints,
// returning the first conversion error encountered (if any).
func StringsToInts(ss []string) ([]int, error) {
	ints := make([]int, len(ss))
	for i, s := range ss {
		var err error
		if ints[i], err = strconv.Atoi(s); err != nil {
			return nil, err
		}
	}
	return ints, nil
}
package util
import (
"strings"
)
// SliceFlag is a flag.Value implementation that collects repeated
// occurrences of a command-line flag into a slice of strings.
type SliceFlag []string

// String renders the collected values as a comma-separated list.
func (sf *SliceFlag) String() string {
	var b strings.Builder
	for i, v := range *sf {
		if i > 0 {
			b.WriteByte(',')
		}
		b.WriteString(v)
	}
	return b.String()
}

// Set appends the given value; called once per occurrence of the flag.
func (sf *SliceFlag) Set(value string) error {
	*sf = append(*sf, value)
	return nil
}