// Package collector holds the resources needed to start an OTEL collector testcontainer package collector import ( "context" "fmt" "strings" "time" "github.com/docker/go-connections/nat" "github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go/network" "github.com/testcontainers/testcontainers-go/wait" ) // Collector hold the testcontainer, ports and network used by the OTEL collector. // If instantiating yourself, be sure to populate Collector.Network, otherwise a new network will be generated. type Collector struct { Ports map[int]nat.Port config string Network *testcontainers.DockerNetwork Name string } // Start starts the OTEL collector container. func (c *Collector) Start(ctx context.Context, jaegerName string, seqName string) (func(context.Context) error, error) { emptyFunc := func(context.Context) error { return nil } var err error c.Ports = make(map[int]nat.Port) if c.Network == nil { c.Network, err = network.New(ctx) if err != nil { return emptyFunc, fmt.Errorf("collector: network not provided and could not create a new one: %w", err) } } c.generateConfig(jaegerName, seqName) container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ ContainerRequest: testcontainers.ContainerRequest{ Image: "otel/opentelemetry-collector:0.117.0", ExposedPorts: []string{"4317/tcp", "4318/tcp", "13133/tcp"}, Networks: []string{c.Network.Name}, WaitingFor: wait.ForLog("Everything is ready. 
Begin running and processing data"), Files: []testcontainers.ContainerFile{{ ContainerFilePath: "/etc/otelcol/config.yaml", Reader: strings.NewReader(c.config), FileMode: 0644, }}, }, Started: true, }) if err != nil { return emptyFunc, fmt.Errorf("collector: could not start the testcontainer: %w", err) } c.Name, err = container.Name(ctx) if err != nil { return emptyFunc, fmt.Errorf("collector: could not read the name of the container from the testcontainer: %w", err) } c.Name = c.Name[1:] for _, portNum := range []int{4317, 4318, 13133} { c.Ports[portNum], err = container.MappedPort(ctx, nat.Port(fmt.Sprintf("%d", portNum))) if err != nil { return emptyFunc, fmt.Errorf("collector: could not retrieve port %d from the testcontainer: %w", portNum, err) } } return func(ctx context.Context) error { return container.Terminate(ctx, testcontainers.StopTimeout(time.Second*30)) }, nil } func (c *Collector) generateConfig(jaegerName string, seqName string) { c.config = fmt.Sprintf(` receivers: otlp: protocols: grpc: endpoint: 0.0.0.0:4317 http: endpoint: 0.0.0.0:4318 exporters: otlp: endpoint: %s:4317 tls: insecure: true otlphttp/logs: endpoint: http://%s/ingest/otlp prometheus: endpoint: "0.0.0.0:8889" send_timestamps: true metric_expiration: 180m resource_to_telemetry_conversion: enabled: true extensions: health_check: endpoint: "0.0.0.0:13133" path: "/health/status" check_collector_pipeline: enabled: true interval: "10s" exporter_failure_threshold: 5 service: extensions: [health_check] pipelines: traces: receivers: [otlp] exporters: [otlp] logs: receivers: [otlp] exporters: [otlphttp/logs] metrics: receivers: [otlp] exporters: [prometheus] `, jaegerName, seqName) }
// Package jaeger holds the resources needed to start a Jaeger testcontainer container. package jaeger import ( "context" "fmt" "strings" "time" "github.com/docker/go-connections/nat" "github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go/network" "github.com/testcontainers/testcontainers-go/wait" ) // Jaeger hold the testcontainer, ports and network used by Jaeger. If instantiating yourself, // be sure to populate Jaeger.Network, otherwise a new network will be generated. type Jaeger struct { Ports map[int]nat.Port Network *testcontainers.DockerNetwork Name string } // Start starts the Jaeger container. func (j *Jaeger) Start(ctx context.Context) (func(context.Context) error, error) { emptyFunc := func(context.Context) error { return nil } var err error j.Ports = make(map[int]nat.Port) if j.Network == nil { j.Network, err = network.New(ctx) if err != nil { return emptyFunc, fmt.Errorf("jaeger: network not provided and could not create a new one: %w", err) } } container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ ContainerRequest: testcontainers.ContainerRequest{ Image: "jaegertracing/jaeger:latest", ExposedPorts: []string{"16686/tcp", "4318/tcp"}, Networks: []string{j.Network.Name}, WaitingFor: wait.ForLog("Everything is ready."), Cmd: []string{"--config", "/etc/jaeger/config.yaml"}, Files: []testcontainers.ContainerFile{{ ContainerFilePath: "/etc/jaeger/config.yaml", Reader: strings.NewReader(config), FileMode: 0644, }}, }, Started: true, }) if err != nil { return emptyFunc, fmt.Errorf("jaeger: could not start the testcontainer: %w", err) } j.Name, err = container.Name(ctx) if err != nil { return emptyFunc, fmt.Errorf("jaeger: could not read the name of the container from the testcontainer: %w", err) } j.Name = j.Name[1:] for _, portNum := range []int{16686, 4318} { j.Ports[portNum], err = container.MappedPort(ctx, nat.Port(fmt.Sprintf("%d", portNum))) if err != nil { return emptyFunc, 
fmt.Errorf("jaeger: could not retrieve port %d from the testcontainer: %w", portNum, err) } } return func(ctx context.Context) error { return container.Terminate(ctx, testcontainers.StopTimeout(time.Second*30)) }, nil } var config = `service: extensions: [jaeger_storage, jaeger_query] pipelines: traces: receivers: [otlp] processors: [] exporters: [jaeger_storage_exporter] extensions: jaeger_query: storage: traces: some_storage jaeger_storage: backends: some_storage: memory: max_traces: 100000 receivers: otlp: protocols: grpc: endpoint: "0.0.0.0:4317" http: endpoint: "0.0.0.0:4318" exporters: jaeger_storage_exporter: trace_storage: some_storage `
type (
	// unmarshalStruct is the envelope of Jaeger's /api/traces response; the
	// traces themselves sit under the "data" key.
	unmarshalStruct struct {
		Traces Traces `json:"data"`
		Total  int    `json:"total"`
		Limit  int    `json:"limit"`
		Offset int    `json:"offset"`
		Errors any    `json:"errors"`
	}

	// Traces is the list of traces returned by Jaeger.
	Traces []struct {
		TraceID   string `json:"traceID"`
		Spans     []Span `json:"spans"`
		Processes struct {
			// P1 is decoded from the "p1" process entry only.
			P1 struct {
				ServiceName string `json:"serviceName"`
				Tags        []any  `json:"tags"`
			} `json:"p1"`
		} `json:"processes"`
		Warnings any `json:"warnings"`
	}

	// Span is a single span within a trace.
	Span struct {
		TraceID       string      `json:"traceID"`
		SpanID        string      `json:"spanID"`
		OperationName string      `json:"operationName"`
		References    []Reference `json:"references"`
		StartTime     int64       `json:"startTime"`
		Duration      int         `json:"duration"`
		Tags          []KeyValue  `json:"tags"`
		Logs          []Log       `json:"logs"`
		ProcessID     string      `json:"processID"`
		Warnings      any         `json:"warnings"`
	}

	// KeyValue is one typed key/value entry, used for span tags and log fields.
	KeyValue struct {
		Key   string `json:"key"`
		Type  string `json:"type"`
		Value any    `json:"value"`
	}

	// Reference describes the relationship of a span to another span.
	Reference struct {
		RefType string `json:"refType"`
		TraceID string `json:"traceID"`
		SpanID  string `json:"spanID"`
	}

	// Log is a timestamped log event attached to a span.
	Log struct {
		Timestamp int64      `json:"timestamp"`
		Fields    []KeyValue `json:"fields"`
	}
)

// errRespCode is this package's own "not status 200" sentinel.
var errRespCode = fmt.Errorf("the return was not of status 200")
func (j *Jaeger) GetTraces(expectedTraces int, maxRetries int, service string) (Traces, string, error) { var traces Traces endpoint := fmt.Sprintf("http://localhost:%d/api/traces?service=%s&limit=%d", j.Ports[16686].Int(), url.QueryEscape(service), expectedTraces) var attempts int for { attempts++ if attempts > 1 { time.Sleep(time.Second * 2) } var u unmarshalStruct err := request.Request(endpoint, &u) if err != nil && !errors.Is(err, errRespCode) { return traces, endpoint, fmt.Errorf("jaeger: request returned a non-retryable error: %w", err) } traces = u.Traces if len(traces) >= expectedTraces { return traces, endpoint, nil } if attempts >= maxRetries { return traces, endpoint, fmt.Errorf("jaeger: could not get %d traces in %d attempts", expectedTraces, maxRetries) } } }
// Package otelstack provides a full OTEL collector and receiver clients // conveniently contained within testcontainers. It removes the hassle // of managing inter-container communication, has built in querying // for validating your tests, and uses lightweight services (seq and Jaeger) to keep // start time low. package otelstack import ( "context" "errors" "fmt" "slices" "testing" "github.com/adreasnow/otelstack/collector" "github.com/adreasnow/otelstack/jaeger" "github.com/adreasnow/otelstack/prometheus" "github.com/adreasnow/otelstack/seq" "github.com/testcontainers/testcontainers-go/network" ) // Stack holds structs containing to all the testcontainers. type Stack struct { Collector collector.Collector Jaeger jaeger.Jaeger Seq seq.Seq Prometheus prometheus.Prometheus metrics bool logs bool traces bool } // New creates a new Stack and populates it with child container structs. // Setting the services toggles will disables or enable the respective receiver containers. func New(metrics bool, logs bool, traces bool) *Stack { return &Stack{ Collector: collector.Collector{}, Jaeger: jaeger.Jaeger{}, Seq: seq.Seq{}, Prometheus: prometheus.Prometheus{}, metrics: metrics, logs: logs, traces: traces, } } // SetTestEnvGRPC sets the environment variableOTEL_EXPORTER_OTLP_ENDPOINT // to the gRPC endpoint. func (s *Stack) SetTestEnvGRPC(t *testing.T) { endpoint := fmt.Sprintf("http://localhost:%d", s.Collector.Ports[4317].Int()) t.Logf(" setting endpoint to %s", endpoint) t.Setenv("OTEL_EXPORTER_OTLP_ENDPOINT", endpoint) } // SetTestEnvHTTP sets the environment variableOTEL_EXPORTER_OTLP_ENDPOINT // to the HTTP endpoint func (s *Stack) SetTestEnvHTTP(t *testing.T) { endpoint := fmt.Sprintf("http://localhost:%d", s.Collector.Ports[4318].Int()) t.Logf(" setting endpoint to %s", endpoint) t.Setenv("OTEL_EXPORTER_OTLP_ENDPOINT", endpoint) } // Start creates a testcontainer network and starts up all the child containers. 
func (s *Stack) Start(ctx context.Context) (func(context.Context) error, error) { shutdownFuncs := []func(context.Context) error{} emptyFunc := func(context.Context) error { return nil } shutdown := func(ctx context.Context) error { // Reverse the slice so that the network is shut down last slices.Reverse(shutdownFuncs) for _, f := range shutdownFuncs { var err error err = errors.Join(err, f(ctx)) if err != nil { err = fmt.Errorf("otelstack: error shutting down container: %w", err) return err } } return nil } network, err := network.New(ctx) if err != nil { return shutdown, fmt.Errorf("otelstack: could not create new network: %w", err) } shutdownFuncs = append(shutdownFuncs, network.Remove) if s.traces { s.Jaeger.Network = network jaegerShutdown, err := s.Jaeger.Start(ctx) if err != nil { err = fmt.Errorf("otelstack: could not start jaeger: %w", err) if shutdownErr := shutdown(ctx); shutdownErr != nil { err = errors.Join( err, fmt.Errorf("otelstack: error occurred while shutting down services after failed jaeger start: %w", shutdownErr), ) } return emptyFunc, err } shutdownFuncs = append(shutdownFuncs, jaegerShutdown) } if s.logs { s.Seq.Network = network seqShutdown, err := s.Seq.Start(ctx) if err != nil { err = fmt.Errorf("otelstack: could not start seq: %w", err) if shutdownErr := shutdown(ctx); shutdownErr != nil { err = errors.Join( err, fmt.Errorf("otelstack: error occurred while shutting down services after failed seq start: %w", shutdownErr), ) } return emptyFunc, err } shutdownFuncs = append(shutdownFuncs, seqShutdown) } s.Collector.Network = network collectorShutdown, err := s.Collector.Start(ctx, s.Jaeger.Name, s.Seq.Name) if err != nil { err = fmt.Errorf("otelstack: could not start collector: %w", err) if shutdownErr := shutdown(ctx); shutdownErr != nil { err = errors.Join( err, fmt.Errorf("otelstack: error occurred while shutting down services after failed collector start: %w", shutdownErr), ) } return emptyFunc, err } shutdownFuncs = 
append(shutdownFuncs, collectorShutdown) if s.metrics { s.Prometheus.Network = network prometheusShutdown, err := s.Prometheus.Start(ctx, s.Collector.Name) if err != nil { err = fmt.Errorf("otelstack: could not start prometheus: %w", err) if shutdownErr := shutdown(ctx); shutdownErr != nil { err = errors.Join( err, fmt.Errorf("otelstack: error occurred while shutting down services after failed prometheus start: %w", shutdownErr), ) } return emptyFunc, err } shutdownFuncs = append(shutdownFuncs, prometheusShutdown) } return shutdown, nil }
type (
	// unmarshalStruct is the envelope of a Prometheus query_range response.
	unmarshalStruct struct {
		Status string `json:"status"`
		Data   struct {
			ResultType string    `json:"resultType"`
			Result     []Metrics `json:"result"`
		} `json:"data"`
	}

	// Metrics represents a Prometheus metric series: its label set and the
	// sampled values decoded from the "values" array.
	Metrics struct {
		Metric map[string]string `json:"metric"`
		Values [][]any           `json:"values"`
	}

	// requestStruct carries the query parameters of a range query; the `url`
	// tags name the parameters, and the optional ones are omitted when empty.
	requestStruct struct {
		Query string `url:"query"`
		Start string `url:"start,omitempty"`
		End   string `url:"end,omitempty"`
		Step  string `url:"step,omitempty"`
	}
)

// errRespCode is this package's own "not status 200" sentinel.
var errRespCode = fmt.Errorf("the return was not of status 200")
func (p *Prometheus) GetMetrics(expectedDataPoints int, maxRetries int, metricName string, service string, since time.Duration) (Metrics, string, error) { var endpoint string var metrics Metrics startTime := time.Now() var attempts int for { attempts++ if attempts > 1 { time.Sleep(time.Second * 2) } sinceStart := time.Since(startTime) r := requestStruct{ Query: fmt.Sprintf("%s{service_name=\"%s\"}", metricName, service), Start: time.Now().Add(-since - sinceStart).Format(time.RFC3339), End: time.Now().Format(time.RFC3339), Step: "10s", } v, queryErr := query.Values(r) if queryErr != nil { return metrics, "", fmt.Errorf("prometheus: could not marshal values into a url query for request %v: %w", r, queryErr) } endpoint = fmt.Sprintf("http://localhost:%d/api/v1/query_range?%s", p.Ports[9090].Int(), v.Encode()) var u unmarshalStruct err := request.Request(endpoint, &u) if err != nil && !errors.Is(err, errRespCode) { return metrics, endpoint, fmt.Errorf("prometheus: request returned a non-retryable error: %w", err) } if len(u.Data.Result) > 0 { metrics = u.Data.Result[0] } if len(metrics.Values) >= expectedDataPoints { return metrics, endpoint, nil } if attempts >= maxRetries { return metrics, endpoint, fmt.Errorf("prometheus: could not get %d metrics in %d attempts", expectedDataPoints, maxRetries) } } }
// Package prometheus holds the resources needed to start a Prometheus testcontainer container. package prometheus import ( "context" "fmt" "strings" "time" "github.com/docker/go-connections/nat" "github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go/network" "github.com/testcontainers/testcontainers-go/wait" ) // Prometheus holds the testcontainer, ports and network used by Jaeger. If instantiating yourself, // be sure to populate Jaeger.Network, otherwise a new network will be generated. type Prometheus struct { Ports map[int]nat.Port Network *testcontainers.DockerNetwork Name string config string } // Start starts the Prometheus container. func (p *Prometheus) Start(ctx context.Context, collectorName string) (func(context.Context) error, error) { emptyFunc := func(context.Context) error { return nil } var err error p.Ports = make(map[int]nat.Port) if p.Network == nil { p.Network, err = network.New(ctx) if err != nil { return emptyFunc, fmt.Errorf("prometheus: network not provided and could not create a new one: %w", err) } } p.generateConfig(collectorName) container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ ContainerRequest: testcontainers.ContainerRequest{ Image: "prom/prometheus:v3.2.1", ExposedPorts: []string{"9090/tcp"}, Networks: []string{p.Network.Name}, WaitingFor: wait.ForLog("Server is ready to receive web requests."), Files: []testcontainers.ContainerFile{{ ContainerFilePath: "/etc/prometheus/prometheus.yml", Reader: strings.NewReader(p.config), FileMode: 0644, }}, }, Started: true, }) if err != nil { return emptyFunc, fmt.Errorf("prometheus: could not start the testcontainer: %w", err) } p.Name, err = container.Name(ctx) if err != nil { return emptyFunc, fmt.Errorf("prometheus: could not read the name of the container from the testcontainer: %w", err) } p.Name = p.Name[1:] for _, portNum := range []int{9090} { p.Ports[portNum], err = container.MappedPort(ctx, 
nat.Port(fmt.Sprintf("%d", portNum))) if err != nil { return emptyFunc, fmt.Errorf("prometheus: could not retrieve port %d from the testcontainer: %w", portNum, err) } } return func(ctx context.Context) error { return container.Terminate(ctx, testcontainers.StopTimeout(time.Second*30)) }, nil } func (p *Prometheus) generateConfig(collectorName string) { p.config = fmt.Sprintf(` global: scrape_interval: 2s evaluation_interval: 2s scrape_configs: - job_name: otel static_configs: - targets: ["%s:8889"] otlp: keep_identifying_resource_attributes: true # Recommended attributes to be promoted to labels. promote_resource_attributes: - service.instance.id - service.name - service.namespace - service.version storage: tsdb: out_of_order_time_window: 10m `, collectorName) }
// ErrRetryableCode is wrapped by the error Request returns when the response
// status is not 200 and is one of the statuses worth retrying. Detect it with
// errors.Is.
var ErrRetryableCode = fmt.Errorf("retryable response status")

// ErrNonRetryableCode is wrapped by the error Request returns when the
// response status is not 200 and retrying is not expected to help. Detect it
// with errors.Is.
var ErrNonRetryableCode = fmt.Errorf("non-retryable response status")

// retryCodes lists the response statuses reported as ErrRetryableCode.
var retryCodes = []int{
	http.StatusRequestTimeout,
	http.StatusTooManyRequests,
	http.StatusInternalServerError,
	http.StatusBadGateway,
	http.StatusServiceUnavailable,
	http.StatusGatewayTimeout,
	http.StatusConflict,
	http.StatusLocked,
	http.StatusTooEarly,
}

// Request sends a GET request to endpoint and unmarshals the JSON response
// body into unmarshal.
//
// A non-200 response yields an error wrapping ErrRetryableCode or
// ErrNonRetryableCode, depending on the status. Transport failures, body read
// failures and JSON decoding failures are returned wrapped with context. A
// failure to close the response body is reported only when nothing else went
// wrong, so it never hides the primary error.
func Request[U any](endpoint string, unmarshal *U) (err error) {
	resp, err := http.Get(endpoint)
	if err != nil {
		return fmt.Errorf("request: could not get response on endpoint %s: %w", endpoint, err)
	}
	defer func() {
		closeErr := resp.Body.Close()
		// Keep the primary error (and the sentinel it wraps) if there is one.
		if closeErr != nil && err == nil {
			err = fmt.Errorf("request: error while closing response body %s: %w", endpoint, closeErr)
		}
	}()

	if resp.StatusCode != http.StatusOK {
		sentinel := ErrNonRetryableCode
		if slices.Contains(retryCodes, resp.StatusCode) {
			sentinel = ErrRetryableCode
		}
		return fmt.Errorf("request: response was not 200: got %d on endpoint %s: %w", resp.StatusCode, endpoint, sentinel)
	}

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return fmt.Errorf("request: could not read body from response for endpoint %s: %w", endpoint, err)
	}

	if err = json.Unmarshal(body, unmarshal); err != nil {
		return fmt.Errorf("request: could not unmarshal response body %s: %w", string(body), err)
	}

	return nil
}
type (
	// Events is the list of logging events returned by Seq.
	Events []struct {
		Timestamp  time.Time  `json:"Timestamp"`
		Properties []Property `json:"Properties"`
		// Messages is decoded from the "MessageTemplateTokens" key.
		Messages  []Message  `json:"MessageTemplateTokens"`
		EventType string     `json:"EventType"`
		Exception string     `json:"Exception"`
		Level     string     `json:"Level"`
		TraceID   string     `json:"TraceId"`
		SpanID    string     `json:"SpanId"`
		SpanKind  string     `json:"SpanKind"`
		Resource  []Resource `json:"Resource"`
		ID        string     `json:"Id"`
		Links     struct {
			Self  string `json:"Self"`
			Group string `json:"Group"`
		} `json:"Links"`
	}

	// Message is one message template token of an event.
	Message struct {
		Text string `json:"Text"`
	}

	// Property is a named property of an event with an arbitrary value.
	Property struct {
		Name  string `json:"Name"`
		Value any    `json:"Value"`
	}

	// Resource is a named resource entry of an event; only the "name" key of
	// its value is decoded.
	Resource struct {
		Name  string `json:"Name"`
		Value struct {
			Name string `json:"name"`
		} `json:"Value"`
	}
)

// errRespCode is this package's own "not status 200" sentinel.
var errRespCode = fmt.Errorf("the return was not of status 200")
func (s *Seq) GetEvents(expectedEvents int, maxRetries int) (Events, string, error) { var events Events endpoint := fmt.Sprintf("http://localhost:%d/api/events?count=%d", s.Ports[80].Int(), expectedEvents) var attempts int for { attempts++ if attempts > 1 { time.Sleep(time.Second * 2) } err := request.Request(endpoint, &events) if err != nil && !errors.Is(err, errRespCode) { return events, endpoint, fmt.Errorf("seq: request returned a non-retryable error: %w", err) } if len(events) >= expectedEvents { return events, endpoint, nil } if attempts >= maxRetries { return events, endpoint, fmt.Errorf("seq: could not get %d events in %d attempts", expectedEvents, maxRetries) } } }
// Package seq holds the resources needed to start a Seq testcontainer. package seq import ( "context" "fmt" "time" "github.com/docker/go-connections/nat" "github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go/network" "github.com/testcontainers/testcontainers-go/wait" ) // Seq hold the testcontainer, ports and network used by Seq. If instantiating yourself, // be sure to populate Seq.Network, otherwise a new network will be generated. type Seq struct { Ports map[int]nat.Port Network *testcontainers.DockerNetwork Name string } // Start starts the Seq container. func (s *Seq) Start(ctx context.Context) (func(context.Context) error, error) { emptyFunc := func(context.Context) error { return nil } var err error s.Ports = make(map[int]nat.Port) if s.Network == nil { s.Network, err = network.New(ctx) if err != nil { return emptyFunc, fmt.Errorf("seq: network not provided and could not create a new one: %w", err) } } container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ ContainerRequest: testcontainers.ContainerRequest{ Image: "datalust/seq:2024.3", ExposedPorts: []string{"80/tcp", "5341/tcp"}, Networks: []string{s.Network.Name}, WaitingFor: wait.ForLog("Seq listening on"), Env: map[string]string{"ACCEPT_EULA": "Y"}, }, Started: true, }) if err != nil { return emptyFunc, fmt.Errorf("seq: could not start the testcontainer: %w", err) } s.Name, err = container.Name(ctx) if err != nil { return emptyFunc, fmt.Errorf("seq: could not read the name of the container from the testcontainer: %w", err) } s.Name = s.Name[1:] for _, portNum := range []int{80, 5341} { s.Ports[portNum], err = container.MappedPort(ctx, nat.Port(fmt.Sprintf("%d", portNum))) if err != nil { return emptyFunc, fmt.Errorf("seq: could not retrieve port %d from the testcontainer: %w", portNum, err) } } return func(ctx context.Context) error { return container.Terminate(ctx, testcontainers.StopTimeout(time.Second*30)) }, nil }