package clock
import "time"
type NowFunc func() time.Time
type ManagedTime interface {
Now() time.Time
Sleep(time.Duration)
TickPeriodically(duration time.Duration, handler func(now time.Time)) func()
AfterFunc(d time.Duration, f func(now time.Time))
}
// ParseTime parses an RFC3339 timestamp, ignoring parse errors; it is meant
// for known-good literals (e.g. in tests), where the zero time is an
// acceptable fallback.
func ParseTime(nowString string) time.Time {
now, _ := time.Parse(time.RFC3339, nowString)
return now
}
package clock
import (
"sync"
"time"
)
// ManualClock is a deterministic clock for tests: time only moves when
// Advance or Sleep is called (or by advanceOnNow on every Now call), and
// registered tickers fire synchronously while time is advanced.
type ManualClock struct {
now time.Time
tickers map[int64]*manualTicker
tickerIDs int64
s *sync.Mutex
advanceOnNow time.Duration
}
func NewManualClock(now time.Time, advanceOnNow time.Duration) *ManualClock {
return &ManualClock{
now: now,
tickers: make(map[int64]*manualTicker),
s: &sync.Mutex{},
advanceOnNow: advanceOnNow,
}
}
type manualTicker struct {
handler func(now time.Time)
monotonicTime time.Time
duration time.Duration
}
func (m *manualTicker) tick(now time.Time) {
tick := m.monotonicTime.Add(m.duration)
for tick.Before(now) || tick.Equal(now) {
m.monotonicTime = tick
m.handler(m.monotonicTime)
tick = m.monotonicTime.Add(m.duration)
}
}
func (m *manualTicker) reset(now time.Time) {
m.monotonicTime = now
}
func (t *ManualClock) Now() time.Time {
t.s.Lock()
currentTime := t.now
t.s.Unlock()
if t.advanceOnNow > 0 {
t.Advance(t.advanceOnNow)
}
return currentTime
}
func (t *ManualClock) Sleep(d time.Duration) {
t.Advance(d)
}
func (t *ManualClock) Advance(duration time.Duration) {
t.s.Lock()
t.now = t.now.Add(duration)
currentTime := t.now
currentTickers := t.tickers
t.s.Unlock()
for _, ticker := range currentTickers {
ticker.tick(currentTime)
}
}
func (t *ManualClock) TickPeriodically(duration time.Duration, handler func(t time.Time)) func() {
now := t.Now()
t.s.Lock()
t.tickerIDs++
key := t.tickerIDs
t.tickers[key] = &manualTicker{
handler: handler,
monotonicTime: now,
duration: duration,
}
t.s.Unlock()
return func() {
t.s.Lock()
delete(t.tickers, key)
t.s.Unlock()
}
}
func (t *ManualClock) AfterFunc(duration time.Duration, f func(now time.Time)) {
now := t.Now()
t.s.Lock()
t.tickerIDs++
key := t.tickerIDs
t.tickers[key] = &manualTicker{
handler: func(now time.Time) {
t.s.Lock()
delete(t.tickers, key)
t.s.Unlock()
f(now)
},
monotonicTime: now,
duration: duration,
}
t.s.Unlock()
}
func (t *ManualClock) Reset(now time.Time) {
t.s.Lock()
defer t.s.Unlock()
t.now = now
for _, ticker := range t.tickers {
ticker.reset(t.now)
}
}
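// Usage sketch (illustrative example, not part of the original source):
// ManualClock drives TickPeriodically deterministically; advancing the clock
// fires every tick that falls inside the advanced span, synchronously.
package clock

import (
	"fmt"
	"time"
)

func ExampleManualClock_Advance() {
	start := ParseTime("2024-01-01T00:00:00Z")
	c := NewManualClock(start, 0)
	stop := c.TickPeriodically(time.Second, func(now time.Time) {
		fmt.Println(now.Format(time.RFC3339))
	})
	defer stop()
	c.Advance(3 * time.Second)
	// Output:
	// 2024-01-01T00:00:01Z
	// 2024-01-01T00:00:02Z
	// 2024-01-01T00:00:03Z
}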
package clock
import "time"
type SystemClock struct {
}
func NewSystemClock() *SystemClock {
return &SystemClock{}
}
func (m *SystemClock) Now() time.Time {
return time.Now()
}
func (m *SystemClock) Sleep(d time.Duration) {
time.Sleep(d)
}
func (m *SystemClock) TickPeriodically(duration time.Duration, handler func(t time.Time)) func() {
	ticker := time.NewTicker(duration)
	stop := make(chan struct{})
	go func() {
		defer ticker.Stop()
		for {
			select {
			case <-stop: // stopping also ends this goroutine, so it does not leak
				return
			case t := <-ticker.C:
				handler(t)
			}
		}
	}()
	return func() { close(stop) }
}
func (m *SystemClock) AfterFunc(d time.Duration, f func(now time.Time)) {
time.AfterFunc(d, func() {
f(time.Now())
})
}
package concurrency
import (
"context"
"hash/fnv"
)
type FanOut[M any, S any] struct {
channels []chan M
scopes *Scopes[S]
}
func NewFanOut[M any, S any](scopes *Scopes[S], queueProcessor func(ctx context.Context, m M, scope *S)) *FanOut[M, S] {
channels := make([]chan M, scopes.Len())
i := 0
for queueID, scope := range scopes.ForEachScope() {
messages := make(chan M)
channels[i] = messages
go func(ctx context.Context, messages chan M, processID string, queueScope *S) {
for message := range messages {
queueProcessor(ctx, message, queueScope)
}
}(scope.Ctx, messages, queueID, scope.Value)
i++
}
return &FanOut[M, S]{
channels: channels,
scopes: scopes,
}
}
func (f *FanOut[M, S]) Send(id []byte, m M) {
if len(f.channels) == 0 {
return
}
index := 0
hash := fnv.New32()
_, hashErr := hash.Write(id)
if hashErr == nil {
idHash := int(hash.Sum32())
index = idHash % len(f.channels)
}
f.channels[index] <- m
}
func (f *FanOut[M, S]) Broadcast(m M) {
for i := range f.channels {
f.channels[i] <- m
}
}
func (f *FanOut[M, S]) Close() {
for i := range f.channels {
close(f.channels[i])
}
f.scopes = nil
}
type ScopedAction[S any] interface {
Execute(ctx context.Context, scope *S)
}
func NewActionFanOut[S any](scopes *Scopes[S]) *FanOut[ScopedAction[S], S] {
return NewFanOut(scopes, func(ctx context.Context, action ScopedAction[S], scope *S) {
action.Execute(ctx, scope)
})
}
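// Usage sketch (illustrative example, not part of the original source; the
// scope type and ids are made up). Messages sent with the same id hash to
// the same queue, so one scope processes them in order, while different ids
// may be handled in parallel.
package concurrency

import (
	"context"
	"sync"
)

type exampleScope struct {
	seen []string
}

func ExampleNewFanOut() {
	scopes := NewScopes([]string{"q0", "q1"}, func(ctx context.Context) *exampleScope {
		return &exampleScope{}
	})
	var wg sync.WaitGroup
	fanOut := NewFanOut(scopes, func(ctx context.Context, msg string, scope *exampleScope) {
		scope.seen = append(scope.seen, msg) // safe: each scope is owned by a single goroutine
		wg.Done()
	})
	wg.Add(2)
	fanOut.Send([]byte("integration-a"), "first")
	fanOut.Send([]byte("integration-a"), "second") // same id, same queue, processed after "first"
	wg.Wait()
	fanOut.Close()
}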
package concurrency
import (
"context"
"iter"
"maps"
)
type contextQueueID struct{}
var contextQueueIDName = contextQueueID{}
func GetScopeIDFromContext(ctx context.Context) string {
name, _ := ctx.Value(contextQueueIDName).(string)
return name
}
type Scope[S any] struct {
Ctx context.Context
Value *S
}
type Scopes[S any] struct {
names []string
scopes map[string]*Scope[S]
}
func NewScopes[S any](names []string, createScope func(ctx context.Context) *S) *Scopes[S] {
scopes := make(map[string]*Scope[S], len(names))
for _, name := range names {
ctx := context.WithValue(context.Background(), contextQueueIDName, name)
scopes[name] = &Scope[S]{
Ctx: ctx,
Value: createScope(ctx),
}
}
return &Scopes[S]{
names: names,
scopes: scopes,
}
}
func (s *Scopes[S]) Len() int {
return len(s.names)
}
func (s *Scopes[S]) ForEachScope() iter.Seq2[string, *Scope[S]] {
return maps.All(s.scopes)
}
package concurrency
import "context"
type ScopeWorkers[S any, W any, M any] struct {
workers map[string]*scopedWorker[S, W]
inputChannel chan *M
}
type scopedWorker[S any, W any] struct {
worker *W
scope *Scope[S]
}
func NewScopeWorkers[W any, S any, M any](
scopes *Scopes[S],
workerCreator func(cxt context.Context, scope *S) *W,
executor func(ctx context.Context, scope *S, worker *W, message *M),
inputChannelLength int,
) *ScopeWorkers[S, W, M] {
inputChannel := make(chan *M, inputChannelLength)
workers := make(map[string]*scopedWorker[S, W], scopes.Len())
for scopeID, scope := range scopes.ForEachScope() {
worker := workerCreator(scope.Ctx, scope.Value)
workers[scopeID] = &scopedWorker[S, W]{
worker: worker,
scope: scope,
}
go func(scope *Scope[S], worker *W) {
for msg := range inputChannel {
executor(scope.Ctx, scope.Value, worker, msg)
}
}(scope, worker)
}
return &ScopeWorkers[S, W, M]{
workers: workers,
inputChannel: inputChannel,
}
}
func (w *ScopeWorkers[S, W, M]) Execute(m *M) {
w.inputChannel <- m
}
func (w *ScopeWorkers[S, W, M]) Close() {
close(w.inputChannel)
}
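// Usage sketch (illustrative example, not part of the original source; the
// worker type is made up). Unlike FanOut, all workers consume one shared
// input channel, so any idle scope picks up the next message and there is
// no per-id ordering.
package concurrency

import (
	"context"
	"sync"
)

type exampleWorker struct{}

func ExampleNewScopeWorkers() {
	scopes := NewScopes([]string{"w0", "w1"}, func(ctx context.Context) *struct{} {
		return &struct{}{}
	})
	var wg sync.WaitGroup
	workers := NewScopeWorkers(scopes,
		func(ctx context.Context, scope *struct{}) *exampleWorker { return &exampleWorker{} },
		func(ctx context.Context, scope *struct{}, worker *exampleWorker, msg *string) {
			wg.Done()
		},
		16)
	wg.Add(1)
	msg := "report"
	workers.Execute(&msg)
	wg.Wait()
	workers.Close()
}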
package http
import (
"fmt"
"iter"
"sort"
"strings"
)
type RequestLocator struct {
Method string
Path string
Host string
Port int
}
func (l *RequestLocator) Normalize() RequestLocator {
return RequestLocator{
Method: strings.ToUpper(l.Method),
Path: strings.ToLower(l.Path),
Host: strings.ToLower(l.Host),
Port: l.Port,
}
}
type Mux[H any] struct {
entries []patternEntry[H]
}
type patternEntry[H any] struct {
route Route
pattern *RoutePattern
handler *H
}
// LocaleHandler returns the first registered handler whose pattern matches
// the locator; Upsert keeps entries ordered most-specific-first.
func (m *Mux[H]) LocaleHandler(locator RequestLocator) *H {
for _, entry := range m.entries {
if entry.pattern.Matches(locator) {
return entry.handler
}
}
return nil
}
func (m *Mux[H]) Upsert(route Route, handler *H) {
for index, entry := range m.entries {
if entry.route == route {
m.entries[index].handler = handler
return
}
}
pattern := NewRoutePattern(route)
m.entries = append(m.entries, patternEntry[H]{
route: route,
pattern: pattern,
handler: handler,
})
	// longer pattern IDs are more specific, so they are matched first
	sort.SliceStable(m.entries, func(i, j int) bool {
return len(m.entries[i].pattern.ID) > len(m.entries[j].pattern.ID)
})
}
func (m *Mux[H]) Handlers() iter.Seq2[Route, H] {
return func(yield func(Route, H) bool) {
for _, entry := range m.entries {
if !yield(entry.route, *entry.handler) {
break
}
}
}
}
const AnyPort = 0
const AnyMethod = ""
const AnyHost = ""
type Route struct {
Method string
PathPattern string // /user/{user-id}/login/{session-id}/
Host string
Port int
}
func (r *Route) Normalize() Route {
return Route{
Method: strings.ToUpper(r.Method),
PathPattern: strings.ToLower(r.PathPattern),
Host: strings.ToLower(r.Host),
Port: r.Port,
}
}
func (r *Route) ID() string {
if r.Port == AnyPort {
return fmt.Sprintf("%s:%s::%s", r.Method, r.Host, r.PathPattern)
}
return fmt.Sprintf("%s:%s:%d:%s", r.Method, r.Host, r.Port, r.PathPattern)
}
type RoutePattern struct {
ID string
route Route
preParsed []pathPart
}
func NewRoutePattern(route Route) *RoutePattern {
normalized := route.Normalize()
parsed := parsePath(normalized.PathPattern)
return &RoutePattern{
ID: fmt.Sprintf("%s:%s:%d:%s", normalized.Method, normalized.Host, normalized.Port, parsed.ID()),
route: normalized,
preParsed: parsed,
}
}
type pathPart struct {
valueLower string
isWildcard bool
name string // name of the wildcard (e.g., "user-id" for {user-id})
}
const urlDelimiter = "/"
type pathParts []pathPart
func (p pathParts) ID() string {
id := ""
for _, part := range p {
if part.isWildcard {
id += "/{}"
} else {
id += "/" + part.valueLower
}
}
return id
}
func parsePath(pathPattern string) pathParts {
normalized := normalizedPathParts(pathPattern)
if len(normalized) == 0 {
return pathParts{}
}
parts := make(pathParts, 0, len(normalized))
for _, part := range normalized {
isWildcard, name := parseWildcard(part)
parts = append(parts, pathPart{
valueLower: part,
isWildcard: isWildcard,
name: name,
})
}
return parts
}
func parseWildcard(part string) (bool, string) {
if len(part) >= 2 && part[0] == '{' && part[len(part)-1] == '}' {
name := part[1 : len(part)-1]
return true, name
}
return false, ""
}
func (p *RoutePattern) Matches(locator RequestLocator) bool {
locator = locator.Normalize()
if p.route.Method != AnyMethod && p.route.Method != locator.Method {
return false
}
if p.route.Host != AnyHost && p.route.Host != locator.Host {
return false
}
if p.route.Port != AnyPort && p.route.Port != locator.Port {
return false
}
return p.matchesPath(locator.Path)
}
func (p *RoutePattern) matchesPath(path string) bool {
if len(p.preParsed) == 0 {
return true
}
pathParts := normalizedPathParts(path)
if len(pathParts) < len(p.preParsed) {
return false
}
for i, part := range p.preParsed {
if part.isWildcard {
continue
}
if part.valueLower != pathParts[i] {
return false
}
}
return true
}
func normalizedPathParts(path string) []string {
var pathParts []string
parts := strings.Split(strings.Trim(path, urlDelimiter), urlDelimiter)
pathParts = make([]string, 0, len(parts))
for _, part := range parts {
if len(part) != 0 {
pathParts = append(pathParts, part)
}
}
return pathParts
}
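// Usage sketch (illustrative example, not part of the original source; the
// route and host are made up). A {user-id} segment matches any value, and a
// route that leaves Host and Port unset matches any host and port.
package http

import "fmt"

func ExampleMux_Upsert() {
	handler := "user-login"
	mux := &Mux[string]{}
	mux.Upsert(Route{
		Method:      "GET",
		PathPattern: "/user/{user-id}/login",
	}, &handler)
	found := mux.LocaleHandler(RequestLocator{
		Method: "GET",
		Path:   "/user/42/login",
		Host:   "api.example.com",
		Port:   443,
	})
	fmt.Println(*found)
	// Output:
	// user-login
}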
package http
import (
"bytes"
"io"
"math"
"net/http"
"time"
)
type RetryRoundTripper struct {
origin http.RoundTripper
shouldRetry IsRetryableResponse
retryMax int
backoff BackoffFunc
sleep func(t time.Duration)
}
func RetryStatusCodes(retryStatus ...int) func(resp *http.Response, err error) bool {
retryCodes := make(map[int]int, len(retryStatus))
for _, status := range retryStatus {
retryCodes[status] = 0
}
return func(resp *http.Response, err error) bool {
if err != nil {
return false
}
_, found := retryCodes[resp.StatusCode]
return found
}
}
func WrapWithRetries(origin http.RoundTripper, shouldRetry func(resp *http.Response, err error) bool, retryMax int, inSeconds float64, sleep func(t time.Duration)) *RetryRoundTripper {
backoff := ExponentialBackoff{
Exponent: inSeconds,
}
return &RetryRoundTripper{
origin: origin,
shouldRetry: shouldRetry,
retryMax: retryMax,
backoff: backoff.Delay,
sleep: sleep,
}
}
type IsRetryableResponse func(resp *http.Response, err error) bool
type BackoffFunc func(retryCount int) time.Duration
type ExponentialBackoff struct {
Exponent float64
}
// Delay returns Exponent^retryCount seconds as a Duration; Exponent acts as
// the base of the backoff curve.
func (e *ExponentialBackoff) Delay(retryCount int) time.Duration {
millis := int64(math.Pow(e.Exponent, float64(retryCount)) * 1000)
return time.Duration(millis) * time.Millisecond
}
func (t *RetryRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
var bodyBytes []byte
if req.Body != nil {
bodyBytes, _ = io.ReadAll(req.Body)
req.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
}
response, responseErr := t.origin.RoundTrip(req)
retries := 0
	for t.shouldRetry(response, responseErr) && retries < t.retryMax {
		if response != nil && response.Body != nil {
			// drain and close the body so the underlying connection can be reused
			_, _ = io.ReadAll(response.Body)
			_ = response.Body.Close()
		}
t.sleep(t.backoff(retries + 1))
if req.Body != nil {
req.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
}
response, responseErr = t.origin.RoundTrip(req)
retries++
}
return response, responseErr
}
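// Usage sketch (illustrative example, not part of the original source; the
// status codes, retry count and backoff base are arbitrary choices). Wraps a
// transport so matching responses are retried with exponential backoff.
package http

import (
	"net/http"
	"time"
)

func NewExampleRetryingClient() *http.Client {
	retrying := WrapWithRetries(
		http.DefaultTransport,
		RetryStatusCodes(http.StatusTooManyRequests, http.StatusServiceUnavailable),
		3,   // retryMax
		1.5, // backoff base, in seconds: sleeps 1.5^1, 1.5^2, ... seconds between attempts
		time.Sleep,
	)
	return &http.Client{
		Timeout:   10 * time.Second,
		Transport: retrying,
	}
}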
package egress
import (
"context"
"errors"
"hotline/clock"
"hotline/ingestions"
"hotline/integrations"
"hotline/uuid"
"io"
"log"
"net/http"
"net/http/httptrace"
"strconv"
"time"
)
type RequestSemantics struct {
IntegrationIDName string
RequestIDName string
}
var DefaultRequestSemantics = RequestSemantics{
IntegrationIDName: "User-Agent", // required
RequestIDName: "x-request-id",
}
type Proxy struct {
transport http.RoundTripper
timeout time.Duration
ingestion func(req *ingestions.HttpRequest)
time clock.ManagedTime
v7 uuid.V7StringGenerator
semantics *RequestSemantics
}
func New(
transport http.RoundTripper,
ingestion func(req *ingestions.HttpRequest),
time clock.ManagedTime,
timeout time.Duration,
v7 uuid.V7StringGenerator,
semantics *RequestSemantics) *Proxy {
return &Proxy{
ingestion: ingestion,
transport: transport,
timeout: timeout,
time: time,
v7: v7,
semantics: semantics,
}
}
func (p *Proxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
receivedTime := p.time.Now()
v7String, v7Err := p.v7(receivedTime)
if v7Err != nil {
log.Printf("Error generating v7 string: %s", v7Err.Error())
rw.WriteHeader(http.StatusInternalServerError)
return
}
integrationID, parseErr := parseIntegrationID(req.Header.Get(p.semantics.IntegrationIDName))
if parseErr != nil {
log.Printf("Error parsing integration id: %s", parseErr.Error())
rw.WriteHeader(http.StatusBadRequest)
return
}
ingestedRequest := &ingestions.HttpRequest{
ID: v7String,
IntegrationID: integrationID,
ProtocolVersion: req.Proto,
Method: req.Method,
URL: req.URL,
StartTime: receivedTime,
CorrelationID: req.Header.Get(p.semantics.RequestIDName),
}
reqCtx, cancel := context.WithCancel(req.Context())
defer cancel()
p.time.AfterFunc(p.timeout, func(_ time.Time) {
cancel()
})
trace := &httptrace.ClientTrace{}
resp, respErr := p.transport.RoundTrip(req.WithContext(httptrace.WithClientTrace(reqCtx, trace)))
if respErr != nil {
if errors.Is(respErr, context.Canceled) {
log.Printf("Error proxying request: timeout")
rw.WriteHeader(http.StatusGatewayTimeout)
ingestedRequest.ErrorType = "timeout"
p.ingestion(ingestedRequest)
return
}
log.Printf("Error proxying request: %s", respErr.Error())
rw.WriteHeader(http.StatusBadGateway)
ingestedRequest.ErrorType = "unknown"
p.ingestion(ingestedRequest)
return
}
defer func() {
_ = resp.Body.Close()
}()
for key, values := range resp.Header {
rw.Header()[key] = values
}
rw.WriteHeader(resp.StatusCode)
_, copyErr := io.Copy(rw, resp.Body)
if copyErr != nil {
log.Printf("Error copying response, http status already sent,: %s", copyErr.Error())
ingestedRequest.ErrorType = "proxy_copy_err"
p.ingestion(ingestedRequest)
return
}
ingestedRequest.StatusCode = strconv.Itoa(resp.StatusCode)
ingestedRequest.EndTime = p.time.Now()
p.ingestion(ingestedRequest)
}
func parseIntegrationID(headerValue string) (integrations.ID, error) {
if len(headerValue) == 0 {
return "", errors.New("integration id not found")
}
return integrations.ID(headerValue), nil
}
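// Wiring sketch (illustrative example, not part of the original source; the
// listen address, timeout and logging ingestion are arbitrary stand-ins).
// Shows how the proxy's collaborators fit together as an egress HTTP server.
package main

import (
	"crypto/rand"
	"log"
	"net/http"
	"time"

	"hotline/clock"
	"hotline/egress"
	"hotline/ingestions"
	"hotline/uuid"
)

func main() {
	proxy := egress.New(
		http.DefaultTransport,
		func(req *ingestions.HttpRequest) { log.Printf("ingested %s %s", req.Method, req.URL) },
		clock.NewSystemClock(),
		30*time.Second,
		uuid.NewDeterministicV7(rand.Reader),
		&egress.DefaultRequestSemantics,
	)
	log.Fatal(http.ListenAndServe(":8080", proxy))
}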
package ingestions
import (
"hotline/integrations"
"hotline/servicelevels"
"net/url"
"time"
)
type IngestHttpRequests func(req []*HttpRequest)
type HttpRequest struct {
ID string
IntegrationID integrations.ID
ProtocolVersion string
Method string
StatusCode string
URL *url.URL
StartTime time.Time
EndTime time.Time
ErrorType string
CorrelationID string
}
func ToSLORequestMessage(requests []*HttpRequest, now time.Time) []*servicelevels.IngestRequestsMessage {
byIntegrationId := make(map[integrations.ID][]*HttpRequest)
for _, request := range requests {
byIntegrationId[request.IntegrationID] = append(byIntegrationId[request.IntegrationID], request)
}
var result []*servicelevels.IngestRequestsMessage
for integrationID, httpRequests := range byIntegrationId {
var reqs []*servicelevels.HttpRequest
for _, httpRequest := range httpRequests {
reqs = append(reqs, ToSLORequest(httpRequest))
}
result = append(result, &servicelevels.IngestRequestsMessage{
Now: now,
ID: integrationID,
Reqs: reqs,
})
}
return result
}
func ToSLOSingleRequestMessage(request *HttpRequest, now time.Time) *servicelevels.IngestRequestsMessage {
return &servicelevels.IngestRequestsMessage{
ID: request.IntegrationID,
Now: now,
Reqs: []*servicelevels.HttpRequest{
ToSLORequest(request),
},
}
}
func ToSLORequest(httpRequest *HttpRequest) *servicelevels.HttpRequest {
latency := servicelevels.LatencyMs(
httpRequest.EndTime.Sub(httpRequest.StartTime).Milliseconds())
state := httpRequest.ErrorType
if len(httpRequest.StatusCode) > 0 {
state = httpRequest.StatusCode
}
return &servicelevels.HttpRequest{
Latency: latency,
State: state,
Method: httpRequest.Method,
URL: httpRequest.URL,
}
}
package otel
import (
"fmt"
coltracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1"
"hotline/ingestions"
"hotline/integrations"
"net/url"
"time"
)
type EnvoyAttributeNames struct {
HttpRequestMethod string // required
HttpStatusCode string // conditionally required if no errorType
UrlFull string // required
NetworkProtocolVersion string // Recommended
IntegrationID string // Recommended
CorrelationID string // optional
}
var EnvoyMappingNames = EnvoyAttributeNames{
HttpRequestMethod: "http.method",
HttpStatusCode: "http.status_code",
UrlFull: "http.url",
NetworkProtocolVersion: "http.protocol",
IntegrationID: "user_agent",
CorrelationID: "guid:x-request-id",
}
type EnvoyMapping struct {
attNames EnvoyAttributeNames
}
func NewEnvoyMapping() *EnvoyMapping {
return &EnvoyMapping{attNames: EnvoyMappingNames}
}
func (h *EnvoyMapping) ConvertMessageToHttp(reqProto *coltracepb.ExportTraceServiceRequest) []*ingestions.HttpRequest {
var requests []*ingestions.HttpRequest
for _, resource := range reqProto.ResourceSpans {
for _, scope := range resource.ScopeSpans {
for _, span := range scope.Spans {
attrs := toMap(span.Attributes)
				id := fmt.Sprintf("%x:%x", span.TraceId, span.SpanId) // trace and span IDs are raw bytes; hex-encode them
correlationID, _ := attrs.GetStringValue(h.attNames.CorrelationID)
method, foundMethod := attrs.GetStringValue(h.attNames.HttpRequestMethod)
if !foundMethod {
continue
}
statusCode, foundStatusCode := attrs.GetStringValue(h.attNames.HttpStatusCode)
if !foundStatusCode {
continue
}
fullUrlString, foundFullUrl := attrs.GetStringValue(h.attNames.UrlFull)
if !foundFullUrl {
continue
}
fullUrl, fullUrlParseErr := url.Parse(fullUrlString)
if fullUrl == nil || fullUrlParseErr != nil {
continue
}
protocolVersion, _ := attrs.GetStringValue(h.attNames.NetworkProtocolVersion)
integrationID := fullUrl.Host
hotlineIntegrationId, found := attrs.GetStringValue(h.attNames.IntegrationID)
if found {
integrationID = hotlineIntegrationId
}
requests = append(requests, &ingestions.HttpRequest{
ID: id,
IntegrationID: integrations.ID(integrationID),
ProtocolVersion: protocolVersion,
Method: method,
StatusCode: statusCode,
URL: fullUrl,
StartTime: time.Unix(0, int64(span.StartTimeUnixNano)).UTC(),
EndTime: time.Unix(0, int64(span.EndTimeUnixNano)).UTC(),
CorrelationID: correlationID,
})
}
}
}
return requests
}
package otel
import (
coltracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1"
"google.golang.org/protobuf/proto"
"hotline/ingestions"
"io"
"net/http"
)
type MessageConverter interface {
Convert(c *coltracepb.ExportTraceServiceRequest) []*ingestions.HttpRequest
}
type TracesHandler struct {
ingestion ingestions.IngestHttpRequests
messageConverter MessageConverter
}
func NewTracesHandler(ingestion ingestions.IngestHttpRequests, messageConverter MessageConverter) *TracesHandler {
return &TracesHandler{
ingestion: ingestion,
messageConverter: messageConverter,
}
}
func (h *TracesHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
raw, readErr := io.ReadAll(req.Body)
defer req.Body.Close()
if readErr != nil {
http.Error(w, "could not read body", http.StatusInternalServerError)
return
}
var reqProto coltracepb.ExportTraceServiceRequest
unmarshalErr := proto.Unmarshal(raw, &reqProto)
if unmarshalErr != nil {
http.Error(w, "could not parse proto", http.StatusBadRequest)
return
}
h.ingestion(h.messageConverter.Convert(&reqProto))
w.WriteHeader(http.StatusCreated)
}
package otel
import (
coltracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1"
"hotline/ingestions"
)
type ProtoConverter struct {
standard *StandardMapping
envoy *EnvoyMapping
}
func NewProtoConverter() *ProtoConverter {
return &ProtoConverter{
standard: NewStandardMapping(),
envoy: NewEnvoyMapping(),
}
}
func (p *ProtoConverter) Convert(c *coltracepb.ExportTraceServiceRequest) []*ingestions.HttpRequest {
reqs := p.envoy.ConvertMessageToHttp(c)
if len(reqs) > 0 {
return reqs
}
return p.standard.ConvertMessageToHttp(c)
}
package otel
import (
"fmt"
coltracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1"
commonpb "go.opentelemetry.io/proto/otlp/common/v1"
tracepb "go.opentelemetry.io/proto/otlp/trace/v1"
"hotline/ingestions"
"hotline/integrations"
"net/url"
"time"
)
type AttributeNames struct {
HttpRequestMethod string // required
HttpStatusCode string // conditionally required if no errorType
UrlFull string // required
NetworkProtocolVersion string // Recommended
IntegrationID string // Recommended
ErrorType string // conditionally required if no status code
	CorrelationID string // optional
}
var StandardMappingNames = AttributeNames{
HttpRequestMethod: "http.request.method",
HttpStatusCode: "http.response.status_code",
UrlFull: "url.full",
NetworkProtocolVersion: "network.protocol.version",
IntegrationID: "user_agent.original",
ErrorType: "error.type",
}
type StandardMapping struct {
attNames AttributeNames
}
func NewStandardMapping() *StandardMapping {
return &StandardMapping{attNames: StandardMappingNames}
}
// https://opentelemetry.io/docs/specs/semconv/http/http-spans/#http-client
func (h *StandardMapping) ConvertMessageToHttp(reqProto *coltracepb.ExportTraceServiceRequest) []*ingestions.HttpRequest {
var requests []*ingestions.HttpRequest
for _, resource := range reqProto.ResourceSpans {
for _, scope := range resource.ScopeSpans {
for _, span := range scope.Spans {
if span.Kind != tracepb.Span_SPAN_KIND_CLIENT {
continue
}
attrs := toMap(span.Attributes)
				id := fmt.Sprintf("%x:%x", span.TraceId, span.SpanId) // trace and span IDs are raw bytes; hex-encode them
correlationID, _ := attrs.GetStringValue(h.attNames.CorrelationID)
method, foundMethod := attrs.GetStringValue(h.attNames.HttpRequestMethod)
if !foundMethod {
continue
}
statusCode, foundStatusCode := attrs.GetStringValue(h.attNames.HttpStatusCode)
errorType, foundErrorType := attrs.GetStringValue(h.attNames.ErrorType)
if !foundStatusCode && !foundErrorType {
continue
}
fullUrlString, foundFullUrl := attrs.GetStringValue(h.attNames.UrlFull)
if !foundFullUrl {
continue
}
fullUrl, fullUrlParseErr := url.Parse(fullUrlString)
if fullUrl == nil || fullUrlParseErr != nil {
continue
}
protocolVersion, _ := attrs.GetStringValue(h.attNames.NetworkProtocolVersion)
integrationID := fullUrl.Host
hotlineIntegrationId, found := attrs.GetStringValue(h.attNames.IntegrationID)
if found {
integrationID = hotlineIntegrationId
}
if len(integrationID) == 0 {
continue
}
requests = append(requests, &ingestions.HttpRequest{
ID: id,
IntegrationID: integrations.ID(integrationID),
ProtocolVersion: protocolVersion,
Method: method,
StatusCode: statusCode,
URL: fullUrl,
StartTime: time.Unix(0, int64(span.StartTimeUnixNano)).UTC(),
EndTime: time.Unix(0, int64(span.EndTimeUnixNano)).UTC(),
ErrorType: errorType,
CorrelationID: correlationID,
})
}
}
}
return requests
}
type AttributePBMap map[string]*commonpb.KeyValue
func toMap(attributes []*commonpb.KeyValue) AttributePBMap {
values := make(AttributePBMap)
for _, attribute := range attributes {
values[attribute.Key] = attribute
}
return values
}
func (m AttributePBMap) GetStringValue(name string) (string, bool) {
attr, found := m[name]
if found {
return attr.Value.GetStringValue(), true
}
return "", false
}
package reporters
import (
"bytes"
"compress/gzip"
"context"
"fmt"
colmetricspb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
commonpb "go.opentelemetry.io/proto/otlp/common/v1"
metricspb "go.opentelemetry.io/proto/otlp/metrics/v1"
"google.golang.org/protobuf/proto"
http2 "hotline/http"
"hotline/servicelevels"
"net/http"
"net/url"
"time"
)
type OtelUrl string
func NewOtelUrl(secure bool, host string) (OtelUrl, error) {
scheme := "https"
if !secure {
scheme = "http"
}
otelUrl, parseErr := url.ParseRequestURI(fmt.Sprintf("%s://%s/v1/metrics", scheme, host))
if parseErr != nil {
return "", parseErr
}
return OtelUrl(otelUrl.String()), nil
}
func (o *OtelUrl) String() string {
return string(*o)
}
type OtelReporterConfig struct {
OtelUrl OtelUrl
Method string
UserAgent string
}
type OtelReporter struct {
client *http.Client
cfg *OtelReporterConfig
protoMarshal func(proto.Message) ([]byte, error)
gzipWriter *gzip.Writer
}
func NewOtelReporter(cfg *OtelReporterConfig, client *http.Client, gzipWriter *gzip.Writer, protoMarshal func(proto.Message) ([]byte, error)) *OtelReporter {
return &OtelReporter{
client: client,
cfg: cfg,
protoMarshal: protoMarshal,
gzipWriter: gzipWriter,
}
}
func (o *OtelReporter) ReportChecks(ctx context.Context, report *servicelevels.CheckReport) error {
var allMetrics []*metricspb.Metric
for _, check := range report.Checks {
metrics := toMetrics(report.Now, check)
allMetrics = append(allMetrics, metrics...)
}
if len(allMetrics) == 0 {
return nil
}
message := &colmetricspb.ExportMetricsServiceRequest{
ResourceMetrics: []*metricspb.ResourceMetrics{
{
ScopeMetrics: []*metricspb.ScopeMetrics{
{
Metrics: allMetrics,
},
},
},
},
}
marshalledBytes, marshalErr := o.protoMarshal(message)
if marshalErr != nil {
return marshalErr
}
bodyReader := bytes.NewReader(compressGzip(o.gzipWriter, marshalledBytes))
postReq, reqErr := http.NewRequestWithContext(
ctx,
o.cfg.Method,
o.cfg.OtelUrl.String(),
bodyReader)
if reqErr != nil {
return reqErr
}
postReq.Header.Set("User-Agent", o.cfg.UserAgent)
postReq.Header.Set("Content-Encoding", "gzip")
postReq.Header.Set("Content-Type", "application/x-protobuf")
postReq.ContentLength = -1
response, respErr := o.client.Do(postReq)
if respErr != nil {
return respErr
}
defer response.Body.Close()
if sc := response.StatusCode; sc >= 200 && sc <= 299 {
return nil
}
return fmt.Errorf("received unexpected status code: %d for req %s %s", response.StatusCode, postReq.Method, postReq.URL.String())
}
func toMetrics(now time.Time, check servicelevels.Check) []*metricspb.Metric {
var metrics []*metricspb.Metric
for _, slo := range check.SLO {
attributes := []*commonpb.KeyValue{
StringAttribute("integration_id", string(check.IntegrationID)),
StringAttribute("metric", slo.Metric.Name),
BoolAttribute("breached", slo.Breach != nil),
}
for key, val := range slo.Tags {
attributes = append(attributes, StringAttribute(key, val))
}
metricID := fmt.Sprintf("service_levels_%s", slo.Namespace)
metricIDEvents := metricID + "_events"
metrics = append(metrics, &metricspb.Metric{
Name: metricID,
Unit: slo.Metric.Unit,
Data: &metricspb.Metric_Gauge{
Gauge: &metricspb.Gauge{
DataPoints: []*metricspb.NumberDataPoint{
{
Attributes: attributes,
TimeUnixNano: uint64(now.UnixNano()),
Value: &metricspb.NumberDataPoint_AsDouble{
AsDouble: slo.Metric.Value,
},
},
},
},
},
}, &metricspb.Metric{
Name: metricIDEvents,
Unit: "#",
Data: &metricspb.Metric_Sum{
Sum: &metricspb.Sum{
DataPoints: []*metricspb.NumberDataPoint{
{
Attributes: attributes,
TimeUnixNano: uint64(now.UnixNano()),
Value: &metricspb.NumberDataPoint_AsInt{
AsInt: slo.Metric.EventsCount,
},
},
},
AggregationTemporality: metricspb.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA,
},
},
})
breakDownMetricID := metricID + "_breakdown"
breakDownCountID := breakDownMetricID + "_events"
for _, breakdown := range slo.Breakdown {
attributes = []*commonpb.KeyValue{
StringAttribute("integration_id", string(check.IntegrationID)),
StringAttribute("breakdown", breakdown.Name),
StringAttribute("metric", slo.Metric.Name),
BoolAttribute("breached", slo.Breach != nil),
}
for key, val := range slo.Tags {
attributes = append(attributes, StringAttribute(key, val))
}
metrics = append(metrics, &metricspb.Metric{
Name: breakDownMetricID,
Unit: breakdown.Unit,
Data: &metricspb.Metric_Gauge{
Gauge: &metricspb.Gauge{
DataPoints: []*metricspb.NumberDataPoint{
{
Attributes: attributes,
TimeUnixNano: uint64(now.UnixNano()),
Value: &metricspb.NumberDataPoint_AsDouble{
AsDouble: breakdown.Value,
},
},
},
},
},
}, &metricspb.Metric{
Name: breakDownCountID,
Unit: "#",
Data: &metricspb.Metric_Sum{
Sum: &metricspb.Sum{
DataPoints: []*metricspb.NumberDataPoint{
{
Attributes: attributes,
TimeUnixNano: uint64(now.UnixNano()),
Value: &metricspb.NumberDataPoint_AsInt{
AsInt: breakdown.EventsCount,
},
},
},
AggregationTemporality: metricspb.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA,
},
},
})
}
}
return metrics
}
func StringAttribute(key string, value string) *commonpb.KeyValue {
return &commonpb.KeyValue{
Key: key,
Value: &commonpb.AnyValue{
Value: &commonpb.AnyValue_StringValue{StringValue: value},
},
}
}
func BoolAttribute(key string, value bool) *commonpb.KeyValue {
return &commonpb.KeyValue{
Key: key,
Value: &commonpb.AnyValue{
Value: &commonpb.AnyValue_BoolValue{BoolValue: value},
},
}
}
func DefaultOtelHttpClient(sleep func(t time.Duration)) *http.Client {
transport := &http.Transport{}
roundTripper := http2.WrapWithRetries(
transport,
http2.RetryStatusCodes(
http.StatusTooManyRequests,
http.StatusBadGateway,
http.StatusServiceUnavailable,
http.StatusGatewayTimeout,
),
5,
1.2,
sleep)
client := &http.Client{
Timeout: 5 * time.Second,
Transport: roundTripper,
}
return client
}
func compressGzip(writer *gzip.Writer, in []byte) []byte {
	var compressedBytes bytes.Buffer
	writer.Reset(&compressedBytes)
	_, _ = writer.Write(in)
	_ = writer.Close()
	return compressedBytes.Bytes()
}
package reporters
import (
"compress/gzip"
"context"
"fmt"
"google.golang.org/protobuf/proto"
"hotline/concurrency"
"hotline/servicelevels"
"io"
"log/slog"
"net/http"
"time"
)
type ScopedOtelReporter struct {
workers *concurrency.ScopeWorkers[OtelReporterScope, OtelReporter, servicelevels.CheckReport]
}
type OtelReporterScope struct {
client *http.Client
gzip *gzip.Writer
}
func NewEmptyOtelReporterScope(_ context.Context) *OtelReporterScope {
return &OtelReporterScope{}
}
func NewScopedOtelReporter(
scopes *concurrency.Scopes[OtelReporterScope],
sleep func(t time.Duration),
cfg *OtelReporterConfig,
inputChannelLength int,
) *ScopedOtelReporter {
workers := concurrency.NewScopeWorkers(
scopes,
func(cxt context.Context, scope *OtelReporterScope) *OtelReporter {
scope.client = DefaultOtelHttpClient(sleep)
scope.gzip = gzip.NewWriter(io.Discard)
userAgent := fmt.Sprintf("%s-%s", cfg.UserAgent, concurrency.GetScopeIDFromContext(cxt))
scopedConfig := *cfg
scopedConfig.UserAgent = userAgent
return NewOtelReporter(&scopedConfig, scope.client, scope.gzip, proto.Marshal)
},
func(ctx context.Context, _ *OtelReporterScope, worker *OtelReporter, message *servicelevels.CheckReport) {
reportErr := worker.ReportChecks(ctx, message)
if reportErr != nil {
				slog.ErrorContext(ctx, "Failed to report SLO checks", slog.Any("error", reportErr))
}
},
inputChannelLength)
return &ScopedOtelReporter{
workers: workers,
}
}
func (r *ScopedOtelReporter) ReportChecks(_ context.Context, check *servicelevels.CheckReport) {
r.workers.Execute(check)
}
package servicelevels
import (
"maps"
"slices"
"unsafe"
)
type bucketIndex int
type splitCounter struct {
latency float64
counter int64
}
type bucketCounter struct {
key bucketIndex
counter int64
splits []splitCounter
}
type bucketedCounters struct {
buckets map[bucketIndex]*bucketCounter
}
func (c *bucketedCounters) Add(key bucketIndex, latency float64) {
bucket, found := c.buckets[key]
if !found {
bucket = &bucketCounter{
key: key,
}
c.buckets[key] = bucket
}
bucket.Inc(latency)
}
func (c *bucketedCounters) SizeInBytes() int {
	numberOfBuckets := len(c.buckets)
	sizeOfBucket := int(unsafe.Sizeof(bucketCounter{}))
	k := bucketIndex(0)
	sizeOfKey := int(unsafe.Sizeof(k))
	// rough estimate: each map entry holds one key and one bucket counter
	return (sizeOfKey + sizeOfBucket) * numberOfBuckets
}
func (c *bucketedCounters) Sum() int64 {
sum := int64(0)
for _, bucket := range c.buckets {
sum += bucket.Sum()
}
return sum
}
func (c *bucketedCounters) CreateSplit(key bucketIndex, splits []float64) {
splitCounters := make([]splitCounter, len(splits))
for i, latencyForKey := range splits {
splitCounters[i] = splitCounter{
latency: latencyForKey,
counter: 0,
}
}
c.buckets[key] = &bucketCounter{
key: key,
counter: 0,
splits: splitCounters,
}
}
func (c *bucketedCounters) GetCounter(index bucketIndex) *bucketCounter {
return c.buckets[index]
}
func (c *bucketedCounters) GetSortedIndexes() []bucketIndex {
return slices.SortedFunc(maps.Keys(c.buckets), func(index bucketIndex, index2 bucketIndex) int {
return int(index) - int(index2)
})
}
func newBucketedCounters() *bucketedCounters {
return &bucketedCounters{
buckets: make(map[bucketIndex]*bucketCounter),
}
}
func (b *bucketCounter) Inc(latency float64) {
for i, split := range b.splits {
if latency <= split.latency {
b.splits[i].counter++
return
}
}
b.counter++
}
func (b *bucketCounter) Sum() int64 {
sum := b.counter
for _, split := range b.splits {
sum += split.counter
}
return sum
}
func (b *bucketCounter) Split(toDistribute int64) *float64 {
added := int64(0)
for _, split := range b.splits {
added += split.counter
if added >= toDistribute {
return &split.latency
}
}
return nil
}
package servicelevels
import (
"net/url"
"time"
hotlinehttp "hotline/http"
)
type HttpRequest struct {
Latency LatencyMs
State string
Method string
URL *url.URL
}
type HttpApiSLO struct {
mux *hotlinehttp.Mux[HttpRouteSLO]
}
type HttpApiSLODefinition struct {
RouteSLOs []HttpRouteSLODefinition
}
type HttpRouteSLODefinition struct {
Route hotlinehttp.Route
Latency HttpLatencySLODefinition
Status HttpStatusSLODefinition
}
type HttpLatencySLODefinition struct {
Percentiles []PercentileDefinition
WindowDuration time.Duration
}
type HttpStatusSLODefinition struct {
Expected []string
BreachThreshold Percent
WindowDuration time.Duration
}
func NewHttpApiSLO(definition HttpApiSLODefinition) *HttpApiSLO {
apiSlo := &HttpApiSLO{
mux: &hotlinehttp.Mux[HttpRouteSLO]{},
}
for _, routeDefinition := range definition.RouteSLOs {
apiSlo.UpsertRoute(routeDefinition)
}
return apiSlo
}
func (s *HttpApiSLO) AddRequest(now time.Time, req *HttpRequest) {
locator := hotlinehttp.RequestLocator{
Method: req.Method,
Path: req.URL.Path,
Host: req.URL.Hostname(),
		Port: 80, // the port is not derived from the URL; route patterns normally use AnyPort
}
handler := s.mux.LocaleHandler(locator)
if handler != nil {
handler.AddRequest(now, req)
}
}
func (s *HttpApiSLO) Check(now time.Time) []SLOCheck {
var checks []SLOCheck
for _, slo := range s.mux.Handlers() {
check := slo.Check(now)
checks = append(checks, check...)
}
return checks
}
func (s *HttpApiSLO) UpsertRoute(routeDefinition HttpRouteSLODefinition) {
slo := NewHttpPathSLO(routeDefinition)
s.mux.Upsert(slo.route, slo)
}
var httpRangeBreakdown = NewHttpStateRangeBreakdown()
type HttpRouteSLO struct {
route hotlinehttp.Route
stateSLO *StateSLO
latencySLO *LatencySLO
expected map[string]bool
}
func NewHttpPathSLO(slo HttpRouteSLODefinition) *HttpRouteSLO {
tags := map[string]string{
"http_route": slo.Route.ID(),
}
expected := make(map[string]bool)
for _, status := range slo.Status.Expected {
expected[status] = true
}
return &HttpRouteSLO{
route: slo.Route,
stateSLO: NewStateSLO(
slo.Status.Expected,
httpRangeBreakdown.GetRanges(),
slo.Status.BreachThreshold,
slo.Status.WindowDuration,
"http_route_status",
tags,
),
latencySLO: NewLatencySLO(
slo.Latency.Percentiles,
slo.Latency.WindowDuration,
"http_route_latency",
tags,
),
expected: expected,
}
}
func (s *HttpRouteSLO) AddRequest(now time.Time, req *HttpRequest) {
s.latencySLO.AddLatency(now, req.Latency)
_, isExpected := s.expected[req.State]
if isExpected {
s.stateSLO.AddState(now, req.State)
return
}
httpRange := httpRangeBreakdown.GetRange(req.State)
if httpRange != nil {
s.stateSLO.AddState(now, *httpRange)
return
}
s.stateSLO.AddState(now, "unknown")
}
func (s *HttpRouteSLO) Check(now time.Time) []SLOCheck {
latencyCheck := s.latencySLO.Check(now)
stateCheck := s.stateSLO.Check(now)
	checks := make([]SLOCheck, 0, len(latencyCheck)+len(stateCheck))
checks = append(checks, latencyCheck...)
checks = append(checks, stateCheck...)
return checks
}
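// Configuration sketch (illustrative example, not part of the original
// source; the route, thresholds and windows are made-up values). Declares a
// p99 latency target and an expected-status target for a single route.
package servicelevels

import (
	"time"

	hotlinehttp "hotline/http"
)

func newExampleApiSLO() *HttpApiSLO {
	return NewHttpApiSLO(HttpApiSLODefinition{
		RouteSLOs: []HttpRouteSLODefinition{
			{
				Route: hotlinehttp.Route{
					Method:      "GET",
					PathPattern: "/user/{user-id}",
				},
				Latency: HttpLatencySLODefinition{
					Percentiles: []PercentileDefinition{
						{Percentile: P99, Threshold: 250, Name: "p99"},
					},
					WindowDuration: time.Minute,
				},
				Status: HttpStatusSLODefinition{
					Expected:        []string{"200", "201"},
					BreachThreshold: 99.5,
					WindowDuration:  time.Hour,
				},
			},
		},
	})
}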
package servicelevels
const httpRange1xx = "1xx"
const httpRange2xx = "2xx"
const httpRange3xx = "3xx"
const httpRange4xx = "4xx"
const httpRange5xx = "5xx"
const httpRangeUnknown = "unknown"
var httpRanges = []string{
httpRange1xx,
httpRange2xx,
httpRange3xx,
httpRange4xx,
httpRange5xx,
httpRangeUnknown,
}
type HttpStateRangeBreakdown struct {
states map[string]string
}
func NewHttpStateRangeBreakdown() *HttpStateRangeBreakdown {
return &HttpStateRangeBreakdown{
map[string]string{
"100": httpRange1xx,
"101": httpRange1xx,
"102": httpRange1xx,
"103": httpRange1xx,
"200": httpRange2xx,
"201": httpRange2xx,
"202": httpRange2xx,
"203": httpRange2xx,
"204": httpRange2xx,
"205": httpRange2xx,
"206": httpRange2xx,
"207": httpRange2xx,
"208": httpRange2xx,
"226": httpRange2xx,
"300": httpRange3xx,
"301": httpRange3xx,
"302": httpRange3xx,
"303": httpRange3xx,
"304": httpRange3xx,
"305": httpRange3xx,
"306": httpRange3xx,
"307": httpRange3xx,
"308": httpRange3xx,
"400": httpRange4xx,
"401": httpRange4xx,
"402": httpRange4xx,
"403": httpRange4xx,
"404": httpRange4xx,
"405": httpRange4xx,
"406": httpRange4xx,
"407": httpRange4xx,
"408": httpRange4xx,
"409": httpRange4xx,
"410": httpRange4xx,
"411": httpRange4xx,
"412": httpRange4xx,
"413": httpRange4xx,
"414": httpRange4xx,
"415": httpRange4xx,
"416": httpRange4xx,
"417": httpRange4xx,
"418": httpRange4xx,
"421": httpRange4xx,
"422": httpRange4xx,
"423": httpRange4xx,
"424": httpRange4xx,
"425": httpRange4xx,
"426": httpRange4xx,
"428": httpRange4xx,
"429": httpRange4xx,
"431": httpRange4xx,
"451": httpRange4xx,
"500": httpRange5xx,
"501": httpRange5xx,
"502": httpRange5xx,
"503": httpRange5xx,
"504": httpRange5xx,
"505": httpRange5xx,
"506": httpRange5xx,
"507": httpRange5xx,
"508": httpRange5xx,
"510": httpRange5xx,
"511": httpRange5xx,
},
}
}
func (b *HttpStateRangeBreakdown) GetRanges() []string {
return httpRanges
}
func (b *HttpStateRangeBreakdown) GetRange(state string) *string {
httpRange, found := b.states[state]
if !found {
return nil
}
return &httpRange
}
package servicelevels
import (
"math"
"slices"
"unsafe"
)
type LatencyHistogram struct {
buckets *bucketedCounters
layout *exponentialBucketLayout
splitLength int
}
func NewLatencyHistogram(splitLatencies []float64) *LatencyHistogram {
h := &LatencyHistogram{
buckets: newBucketedCounters(),
layout: newExponentialLayout(),
splitLength: len(splitLatencies),
}
slices.Sort(splitLatencies)
latenciesByKey := make(map[bucketIndex][]float64)
for _, splitLatency := range splitLatencies {
key := h.layout.key(splitLatency)
latenciesForKey, found := latenciesByKey[key]
if !found {
latenciesForKey = []float64{splitLatency}
} else {
latenciesForKey = append(latenciesForKey, splitLatency)
}
latenciesByKey[key] = latenciesForKey
}
for key, latenciesForKey := range latenciesByKey {
h.buckets.CreateSplit(key, latenciesForKey)
}
return h
}
type Bucket struct {
From float64
To float64
}
func (h *LatencyHistogram) ComputePercentile(percentile float64) (Bucket, int64) {
count := h.buckets.Sum()
if count <= 2 {
return Bucket{}, count
}
pThreshold := int64(math.Ceil(float64(count) * percentile))
index, toDistributeInsideBucket := h.findFirstBucketOverThreshold(pThreshold)
bucket := h.buckets.GetCounter(index)
split := bucket.Split(toDistributeInsideBucket)
var to float64
if split == nil {
to = h.layout.bucketTo(index)
} else {
to = *split
}
return Bucket{
From: h.layout.bucketFrom(index),
To: to,
},
count
}
func (h *LatencyHistogram) findFirstBucketOverThreshold(threshold int64) (bucketIndex, int64) {
entries := int64(0)
sortedKeys := h.buckets.GetSortedIndexes()
firstBucketIndexOverThreshold := sortedKeys[len(sortedKeys)-1]
toDistributeInsideBucket := int64(0)
if len(sortedKeys) == 1 {
return firstBucketIndexOverThreshold, toDistributeInsideBucket
}
for _, sortedKey := range sortedKeys {
bucket := h.buckets.GetCounter(sortedKey)
bucketSum := bucket.Sum()
if entries+bucketSum >= threshold {
firstBucketIndexOverThreshold = sortedKey
toDistributeInsideBucket = threshold - entries
break
} else {
entries += bucketSum
}
}
return firstBucketIndexOverThreshold, toDistributeInsideBucket
}
func (h *LatencyHistogram) Add(latency float64) {
key := h.layout.key(latency)
h.buckets.Add(key, latency)
}
func (h *LatencyHistogram) SizeInBytes() int {
	sizeOfSplit := int(unsafe.Sizeof(&splitCounter{}))
	return h.buckets.SizeInBytes() +
		(h.splitLength * sizeOfSplit)
}
type exponentialBucketLayout struct {
growthFactor float64
growthDivisor float64
zeroBucketThreshold float64
}
func newExponentialLayout() *exponentialBucketLayout {
growthFactor := 1.15
return &exponentialBucketLayout{
growthFactor: growthFactor,
growthDivisor: math.Log(growthFactor),
zeroBucketThreshold: 1.0,
}
}
func (l *exponentialBucketLayout) key(latency float64) bucketIndex {
if latency < l.zeroBucketThreshold {
return bucketIndex(0)
}
return bucketIndex(math.Floor(math.Log(latency) / l.growthDivisor))
}
func (l *exponentialBucketLayout) bucketFrom(index bucketIndex) float64 {
if index == 0 {
return 0
}
return math.Pow(l.growthFactor, float64(index))
}
func (l *exponentialBucketLayout) bucketTo(index bucketIndex) float64 {
if index == 0 {
return 1
}
return math.Pow(l.growthFactor, float64(index+1))
}
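// Usage sketch (illustrative example, not part of the original source; the
// latencies are made up). The histogram answers percentile queries with the
// exponential bucket that contains them, so with 99 fast requests and one
// slow outlier the p99 still falls in the ~50ms bucket, well under 100ms.
package servicelevels

import "fmt"

func ExampleLatencyHistogram_ComputePercentile() {
	h := NewLatencyHistogram([]float64{100, 200}) // split boundaries at the SLO thresholds
	for i := 0; i < 99; i++ {
		h.Add(50) // fast requests
	}
	h.Add(1000) // one slow outlier
	bucket, count := h.ComputePercentile(0.99)
	fmt.Println(count, bucket.To < 100)
	// Output:
	// 100 true
}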
package servicelevels
import (
"time"
)
type LatencySLO struct {
window *SlidingWindow[float64]
percentiles []PercentileDefinition
namespace string
tags map[string]string
}
type PercentileDefinition struct {
Percentile Percentile
Threshold LatencyMs
Name string
}
func NewLatencySLO(percentiles []PercentileDefinition, windowDuration time.Duration, namespace string, tags map[string]string) *LatencySLO {
var splitLatencies []float64
for i := range percentiles {
splitLatencies = append(splitLatencies, float64(percentiles[i].Threshold))
}
window := NewSlidingWindow(func() Accumulator[float64] {
return NewLatencyHistogram(splitLatencies)
}, windowDuration, 10*time.Second)
return &LatencySLO{
percentiles: percentiles,
window: window,
namespace: namespace,
tags: tags,
}
}
func (s *LatencySLO) Check(now time.Time) []SLOCheck {
activeWindow := s.window.GetActiveWindow(now)
if activeWindow == nil {
return nil
}
histogram := activeWindow.Accumulator.(*LatencyHistogram)
metrics := make([]SLOCheck, len(s.percentiles))
for i, definition := range s.percentiles {
bucket, eventsCount := histogram.ComputePercentile(definition.Percentile.Normalized())
metric := bucket.To
var breach *SLOBreach
if !(metric < float64(definition.Threshold)) {
breach = &SLOBreach{
ThresholdValue: float64(definition.Threshold),
ThresholdUnit: "ms",
Operation: OperationL,
WindowDuration: s.window.Size,
}
}
metrics[i] = SLOCheck{
Namespace: s.namespace,
Metric: Metric{
Name: definition.Name,
Value: metric,
Unit: "ms",
EventsCount: eventsCount,
},
Tags: s.tags,
Breach: breach,
}
}
return metrics
}
func (s *LatencySLO) AddLatency(now time.Time, latency LatencyMs) {
s.window.AddValue(now, float64(latency))
}
package servicelevels
import "time"
type Window[T any] struct {
StartTime time.Time
EndTime time.Time
Accumulator Accumulator[T]
}
func (w *Window[T]) IsActive(now time.Time) bool {
nowSec := now.Unix()
startSec := w.StartTime.Unix()
endSec := w.EndTime.Unix()
return nowSec >= startSec && nowSec < endSec
}
func (w *Window[T]) IsActiveGracePeriod(now time.Time, gracePeriod time.Duration) bool {
graceEnd := w.EndTime
graceStart := w.EndTime.Add(-gracePeriod)
nowSec := now.Unix()
graceStartSec := graceStart.Unix()
graceEndSec := graceEnd.Unix()
return nowSec >= graceStartSec && nowSec < graceEndSec
}
type Accumulator[T any] interface {
Add(value T)
}
type SlidingWindow[T any] struct {
Size time.Duration
GracePeriod time.Duration
windows map[int64]*Window[T]
createAcc func() Accumulator[T]
}
func NewSlidingWindow[T any](createAcc func() Accumulator[T], size time.Duration, gracePeriod time.Duration) *SlidingWindow[T] {
return &SlidingWindow[T]{
Size: size,
GracePeriod: gracePeriod,
createAcc: createAcc,
windows: make(map[int64]*Window[T]),
}
}
func (w *SlidingWindow[T]) GetActiveWindow(now time.Time) *Window[T] {
if len(w.windows) == 0 {
return nil
}
w.pruneObsoleteWindows(now)
for _, window := range w.windows {
if window.IsActiveGracePeriod(now, w.GracePeriod) {
return window
}
}
return nil
}
func (w *SlidingWindow[T]) pruneObsoleteWindows(now time.Time) {
for key, window := range w.windows {
if !window.IsActive(now) {
delete(w.windows, key)
}
}
}
func (w *SlidingWindow[T]) AddValue(now time.Time, value T) {
w.pruneObsoleteWindows(now)
windowStart := now.Truncate(w.GracePeriod)
for offset := time.Duration(0); offset <= w.Size; offset += w.GracePeriod {
startTime := windowStart.Add(-offset)
endTime := startTime.Add(w.Size)
key := startTime.Unix()
_, found := w.windows[key]
if !found {
w.windows[key] = &Window[T]{
StartTime: startTime,
EndTime: endTime,
Accumulator: w.createAcc(),
}
}
}
for key := range w.windows {
w.windows[key].Accumulator.Add(value)
}
}
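// Usage sketch (illustrative example, not part of the original source; the
// counting accumulator is a stand-in). Values are fanned into every
// overlapping window; GetActiveWindow only returns a window while "now" sits
// in its trailing grace period, and expired windows are pruned.
package servicelevels

import (
	"fmt"
	"time"
)

type countingAccumulator struct{ n int }

func (c *countingAccumulator) Add(string) { c.n++ }

func ExampleSlidingWindow() {
	w := NewSlidingWindow(func() Accumulator[string] {
		return &countingAccumulator{}
	}, time.Minute, 10*time.Second)
	t0 := time.Date(2024, 1, 1, 0, 0, 5, 0, time.UTC)
	w.AddValue(t0, "request")
	active := w.GetActiveWindow(t0.Add(3 * time.Second))
	fmt.Println(active != nil, active.Accumulator.(*countingAccumulator).n)
	fmt.Println(w.GetActiveWindow(t0.Add(2*time.Minute)) == nil)
	// Output:
	// true 1
	// true
}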
package servicelevels
import (
"context"
"hotline/concurrency"
"hotline/http"
"hotline/integrations"
"time"
)
type SLODefinitionRepository interface {
GetConfig(ctx context.Context, id integrations.ID) *HttpApiSLODefinition
}
type ChecksReporter interface {
ReportChecks(ctx context.Context, report *CheckReport)
}
type SLOPipeline struct {
fanOut *concurrency.FanOut[concurrency.ScopedAction[IntegrationsScope], IntegrationsScope]
}
func NewSLOPipeline(scopes *concurrency.Scopes[IntegrationsScope]) *SLOPipeline {
p := &SLOPipeline{
fanOut: concurrency.NewActionFanOut(scopes),
}
return p
}
func (p *SLOPipeline) IngestHttpRequest(m *IngestRequestsMessage) {
p.fanOut.Send(m.GetMessageID(), m)
}
func (p *SLOPipeline) Check(m *CheckMessage) {
p.fanOut.Broadcast(m)
}
func (p *SLOPipeline) ModifyRoute(m *ModifyRouteMessage) {
p.fanOut.Send(m.GetMessageID(), m)
}
type Check struct {
SLO []SLOCheck
IntegrationID integrations.ID
}
type CheckReport struct {
Now time.Time
Checks []Check
}
type IntegrationsScope struct {
Integrations map[integrations.ID]*HttpApiSLO
LastObservedTime time.Time
sloRepository SLODefinitionRepository
checkReporter ChecksReporter
}
func (scope *IntegrationsScope) AdvanceTime(now time.Time) {
if now.After(scope.LastObservedTime) {
scope.LastObservedTime = now
}
}
func NewEmptyIntegrationsScope(sloRepository SLODefinitionRepository, checkReporter ChecksReporter) *IntegrationsScope {
return &IntegrationsScope{
Integrations: make(map[integrations.ID]*HttpApiSLO),
LastObservedTime: time.Time{},
sloRepository: sloRepository,
checkReporter: checkReporter,
}
}
type CheckMessage struct {
Now time.Time
}
func (message *CheckMessage) Execute(ctx context.Context, scope *IntegrationsScope) {
scope.AdvanceTime(message.Now)
var checks []Check
for id, integration := range scope.Integrations {
metrics := integration.Check(scope.LastObservedTime)
checks = append(checks, Check{
SLO: metrics,
IntegrationID: id,
})
}
scope.checkReporter.ReportChecks(ctx, &CheckReport{
Now: scope.LastObservedTime,
Checks: checks,
})
}
type IngestRequestsMessage struct {
ID integrations.ID
Now time.Time
Reqs []*HttpRequest
}
func (message *IngestRequestsMessage) GetMessageID() []byte {
return []byte(message.ID)
}
func (message *IngestRequestsMessage) Execute(ctx context.Context, scope *IntegrationsScope) {
scope.AdvanceTime(message.Now)
slo, found := scope.Integrations[message.ID]
if !found {
config := scope.sloRepository.GetConfig(ctx, message.ID)
if config == nil {
return
}
slo = NewHttpApiSLO(*config)
scope.Integrations[message.ID] = slo
}
for _, req := range message.Reqs {
slo.AddRequest(scope.LastObservedTime, req)
}
}
type ModifyRouteMessage struct {
ID integrations.ID
Now time.Time
Route http.Route
}
func (message *ModifyRouteMessage) Execute(ctx context.Context, scope *IntegrationsScope) {
scope.AdvanceTime(message.Now)
slo, found := scope.Integrations[message.ID]
if !found {
return
}
config := scope.sloRepository.GetConfig(ctx, message.ID)
if config == nil {
delete(scope.Integrations, message.ID)
return
}
for _, slosConfig := range config.RouteSLOs {
if slosConfig.Route == message.Route {
slo.UpsertRoute(slosConfig)
break
}
}
}
func (message *ModifyRouteMessage) GetMessageID() []byte {
return []byte(message.ID)
}
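// Wiring sketch (illustrative example, not part of the original source; the
// in-memory repository and logging reporter are stand-ins for real
// implementations). One scope per queue keeps all SLO state single-threaded
// per integration; IngestRequestsMessage, CheckMessage and ModifyRouteMessage
// are then fed in through the pipeline methods.
package servicelevels

import (
	"context"
	"log"

	"hotline/concurrency"
	"hotline/integrations"
)

type staticRepository struct{ cfg HttpApiSLODefinition }

func (r *staticRepository) GetConfig(context.Context, integrations.ID) *HttpApiSLODefinition {
	return &r.cfg
}

type loggingReporter struct{}

func (loggingReporter) ReportChecks(_ context.Context, report *CheckReport) {
	log.Printf("checked %d integrations at %s", len(report.Checks), report.Now)
}

func newExamplePipeline(cfg HttpApiSLODefinition) *SLOPipeline {
	scopes := concurrency.NewScopes(
		[]string{"slo-1", "slo-2"},
		func(ctx context.Context) *IntegrationsScope {
			return NewEmptyIntegrationsScope(&staticRepository{cfg: cfg}, loggingReporter{})
		})
	return NewSLOPipeline(scopes)
}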
package servicelevels
import (
"math"
"strings"
"time"
)
type StateSLO struct {
window *SlidingWindow[string]
expectedStates []string
expectedStatesMap map[string]int
unexpectedStates []string
unexpectedStatesMap map[string]int
breachThreshold float64
unexpectedBreachThreshold float64
namespace string
tags map[string]string
}
const unexpectedStateName = "unexpected"
const expectedStateName = "expected"
func NewStateSLO(
expectedStates []string,
unexpectedStates []string,
breachThreshold Percent,
windowDuration time.Duration,
namespace string,
tags map[string]string) *StateSLO {
expectedStates = uniqueSlice(filterOutUnknownTag(expectedStates))
unexpectedStates = uniqueSlice(filterOutUnknownTag(unexpectedStates))
unexpectedStates = append(unexpectedStates, unexpectedStateName)
statesMap := make(map[string]int, len(expectedStates))
for i, state := range expectedStates {
statesMap[state] = i
}
unexpectedStatesMap := make(map[string]int, len(unexpectedStates))
for i, state := range unexpectedStates {
unexpectedStatesMap[state] = i
}
stateNames := append([]string{}, expectedStates...)
stateNames = append(stateNames, unexpectedStates...)
window := NewSlidingWindow(func() Accumulator[string] {
return NewTagsHistogram(stateNames)
}, windowDuration, 1*time.Minute)
expectedBreachThreshold := roundTo(breachThreshold.Value(), 5)
unexpectedBreachThreshold := roundTo(100.0-expectedBreachThreshold, 5)
return &StateSLO{
window: window,
expectedStates: expectedStates,
expectedStatesMap: statesMap,
unexpectedStates: unexpectedStates,
unexpectedStatesMap: unexpectedStatesMap,
breachThreshold: expectedBreachThreshold,
unexpectedBreachThreshold: unexpectedBreachThreshold,
namespace: namespace,
tags: tags,
}
}
func filterOutUnknownTag(states []string) []string {
	filteredStates := make([]string, 0, len(states))
for _, state := range states {
if !strings.EqualFold(state, unexpectedStateName) {
filteredStates = append(filteredStates, state)
}
}
return filteredStates
}
func (s *StateSLO) AddState(now time.Time, state string) {
_, found := s.expectedStatesMap[state]
if !found {
_, found = s.unexpectedStatesMap[state]
if !found {
state = unexpectedStateName
}
}
s.window.AddValue(now, state)
}
func (s *StateSLO) Check(now time.Time) []SLOCheck {
activeWindow := s.window.GetActiveWindow(now)
if activeWindow == nil {
return nil
}
histogram := activeWindow.Accumulator.(*TagHistogram)
expectedBreach, expectedMetric, expectedEventsCount, expectedBreakdown := s.checkExpectedBreach(histogram)
unexpectedBreach, unexpectedMetric, unexpectedEventsCount, unexpectedBreakdown := s.checkUnexpectedBreach(histogram)
	checks := make([]SLOCheck, 0, 2)
if len(expectedBreakdown) > 0 {
checks = append(checks, SLOCheck{
Namespace: s.namespace,
Metric: Metric{
Name: expectedStateName,
Value: expectedMetric,
Unit: "%",
EventsCount: expectedEventsCount,
},
Breakdown: expectedBreakdown,
Breach: expectedBreach,
Tags: s.tags,
})
}
if unexpectedBreach != nil {
checks = append(checks, SLOCheck{
Namespace: s.namespace,
Metric: Metric{
Name: unexpectedStateName,
Value: unexpectedMetric,
Unit: "%",
EventsCount: unexpectedEventsCount,
},
Breakdown: unexpectedBreakdown,
Breach: unexpectedBreach,
Tags: s.tags,
})
}
return checks
}
func (s *StateSLO) checkUnexpectedBreach(histogram *TagHistogram) (*SLOBreach, float64, int64, []Metric) {
	breakDown := make([]Metric, 0, len(s.unexpectedStatesMap))
unexpectedSum := float64(0)
eventsSum := int64(0)
for _, state := range s.unexpectedStates {
metric, count := histogram.ComputePercentile(state)
eventsSum += count
if metric != nil {
breakDown = append(breakDown, Metric{
Name: state,
Value: *metric,
Unit: "%",
EventsCount: count,
})
unexpectedSum += *metric
}
}
var breach *SLOBreach
if unexpectedSum > s.unexpectedBreachThreshold {
breach = &SLOBreach{
ThresholdValue: s.unexpectedBreachThreshold,
ThresholdUnit: "%",
Operation: OperationL,
WindowDuration: s.window.Size,
}
}
return breach, unexpectedSum, eventsSum, breakDown
}
func (s *StateSLO) checkExpectedBreach(histogram *TagHistogram) (*SLOBreach, float64, int64, []Metric) {
	breakDown := make([]Metric, 0, len(s.expectedStates))
expectedSum := float64(0)
eventsSum := int64(0)
for _, state := range s.expectedStates {
metric, count := histogram.ComputePercentile(state)
eventsSum += count
if metric != nil {
breakDown = append(breakDown, Metric{
Name: state,
Value: *metric,
Unit: "%",
EventsCount: count,
})
expectedSum += *metric
}
}
var breach *SLOBreach
sloHolds := expectedSum >= s.breachThreshold
if !sloHolds {
breach = &SLOBreach{
ThresholdValue: s.breachThreshold,
ThresholdUnit: "%",
Operation: OperationGE,
WindowDuration: s.window.Size,
}
}
return breach, expectedSum, eventsSum, breakDown
}
func uniqueSlice(values []string) []string {
uniqueValues := make(map[string]bool, len(values))
	newArr := make([]string, 0, len(values))
for _, elem := range values {
if !uniqueValues[elem] {
newArr = append(newArr, elem)
uniqueValues[elem] = true
}
}
return newArr
}
func roundTo(value float64, decimals uint32) float64 {
return math.Round(value*math.Pow(10, float64(decimals))) / math.Pow(10, float64(decimals))
}
package servicelevels
type TagHistogram struct {
buckets *bucketedCounters
layout *tagsLayout
}
func NewTagsHistogram(tags []string) *TagHistogram {
return &TagHistogram{
buckets: newBucketedCounters(),
layout: newTagsLayout(tags),
}
}
func (h *TagHistogram) Add(tag string) {
key := h.layout.key(tag)
if key == nil {
return
}
h.buckets.Add(*key, 1)
}
func (h *TagHistogram) ComputePercentile(tag string) (*float64, int64) {
key := h.layout.key(tag)
if key == nil {
return nil, 0
}
total := float64(h.buckets.Sum())
counter := h.buckets.GetCounter(*key)
if counter == nil {
return nil, 0
}
sum := counter.Sum()
percentile := float64(sum) / total * 100.0
return &percentile, sum
}
type tagsLayout struct {
tags []string
toIndex map[string]*bucketIndex
indexedByTagOrder []bucketIndex
}
func newTagsLayout(tags []string) *tagsLayout {
toIndex := make(map[string]*bucketIndex, len(tags))
sortedIndexes := make([]bucketIndex, len(tags))
for i, tag := range tags {
index := bucketIndex(i)
toIndex[tag] = &index
sortedIndexes[i] = index
}
return &tagsLayout{
tags: tags,
toIndex: toIndex,
indexedByTagOrder: sortedIndexes,
}
}
func (l *tagsLayout) key(tag string) *bucketIndex {
key, found := l.toIndex[tag]
if !found {
return nil
}
return key
}
package servicelevels
import "errors"
var P50, _ = ParsePercentile(50)
var P70, _ = ParsePercentile(70)
var P90, _ = ParsePercentile(90)
var P99, _ = ParsePercentile(99)
var P999, _ = ParsePercentile(99.9)
type Percentile float64
func ParsePercentile(value float64) (Percentile, error) {
if value > 0 && value <= 100.0 {
return Percentile(value / 100.0), nil
}
return Percentile(0), errors.New("value out of range")
}
func (p *Percentile) Normalized() float64 {
return float64(*p)
}
type LatencyMs int64
type Percent float64
func ParsePercent(value float64) (Percent, error) {
if value > 0 && value <= 100.0 {
return Percent(value), nil
}
return Percent(0), errors.New("value out of range")
}
func (p *Percent) Value() float64 {
return float64(*p)
}
package uuid
import (
"github.com/gofrs/uuid/v5"
"io"
"time"
)
type V7StringGenerator func(time.Time) (string, error)
func NewDeterministicV7(randReader io.Reader) V7StringGenerator {
gen := uuid.NewGenWithOptions(
uuid.WithRandomReader(randReader),
)
	return func(at time.Time) (string, error) {
		uuidV7, err := gen.NewV7AtTime(at)
if err != nil {
return "", err
}
return uuidV7.String(), nil
}
}