diff --git a/go.mod b/go.mod index 32866f43..9073585c 100644 --- a/go.mod +++ b/go.mod @@ -37,10 +37,18 @@ require ( github.com/stretchr/testify v1.11.1 go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin v0.67.0 go.opentelemetry.io/otel v1.42.0 + go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.18.0 + go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.18.0 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.42.0 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.42.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.42.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.42.0 + go.opentelemetry.io/otel/log v0.18.0 go.opentelemetry.io/otel/metric v1.42.0 + go.opentelemetry.io/otel/sdk v1.42.0 + go.opentelemetry.io/otel/sdk/log v0.18.0 go.opentelemetry.io/otel/sdk/metric v1.42.0 + go.opentelemetry.io/otel/trace v1.42.0 go.uber.org/zap v1.27.1 golang.org/x/crypto v0.49.0 golang.org/x/exp v0.0.0-20250218142911-aa4b98e5adaa @@ -137,8 +145,7 @@ require ( go.mongodb.org/mongo-driver/v2 v2.5.0 // indirect go.opencensus.io v0.22.5 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect - go.opentelemetry.io/otel/sdk v1.42.0 // indirect - go.opentelemetry.io/otel/trace v1.42.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.42.0 // indirect go.opentelemetry.io/proto/otlp v1.9.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.yaml.in/yaml/v3 v3.0.4 // indirect diff --git a/go.sum b/go.sum index 0edf211d..0d741bb6 100644 --- a/go.sum +++ b/go.sum @@ -359,16 +359,32 @@ go.opentelemetry.io/contrib/propagators/b3 v1.42.0 h1:B2Pew5ufEtgkjLF+tSkXjgYZXQ go.opentelemetry.io/contrib/propagators/b3 v1.42.0/go.mod h1:iPgUcSEF5DORW6+yNbdw/YevUy+QqJ508ncjhrRSCjc= go.opentelemetry.io/otel v1.42.0 h1:lSQGzTgVR3+sgJDAU/7/ZMjN9Z+vUip7leaqBKy4sho= go.opentelemetry.io/otel v1.42.0/go.mod h1:lJNsdRMxCUIWuMlVJWzecSMuNjE7dOYyWlqOXWkdqCc= +go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.18.0 h1:deI9UQMoGFgrg5iLPgzueqFPHevDl+28YKfSpPTI6rY= +go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.18.0/go.mod h1:PFx9NgpNUKXdf7J4Q3agRxMs3Y07QhTCVipKmLsMKnU= +go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.18.0 h1:icqq3Z34UrEFk2u+HMhTtRsvo7Ues+eiJVjaJt62njs= +go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.18.0/go.mod h1:W2m8P+d5Wn5kipj4/xmbt9uMqezEKfBjzVJadfABSBE= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.42.0 h1:MdKucPl/HbzckWWEisiNqMPhRrAOQX8r4jTuGr636gk= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.42.0/go.mod h1:RolT8tWtfHcjajEH5wFIZ4Dgh5jpPdFXYV9pTAk/qjc= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.42.0 h1:H7O6RlGOMTizyl3R08Kn5pdM06bnH8oscSj7o11tmLA= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.42.0/go.mod h1:mBFWu/WOVDkWWsR7Tx7h6EpQB8wsv7P0Yrh0Pb7othc= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.42.0 h1:THuZiwpQZuHPul65w4WcwEnkX2QIuMT+UFoOrygtoJw= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.42.0/go.mod h1:J2pvYM5NGHofZ2/Ru6zw/TNWnEQp5crgyDeSrYpXkAw= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.42.0 h1:zWWrB1U6nqhS/k6zYB74CjRpuiitRtLLi68VcgmOEto= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.42.0/go.mod h1:2qXPNBX1OVRC0IwOnfo1ljoid+RD0QK3443EaqVlsOU= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.42.0 h1:uLXP+3mghfMf7XmV4PkGfFhFKuNWoCvvx5wP/wOXo0o= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.42.0/go.mod h1:v0Tj04armyT59mnURNUJf7RCKcKzq+lgJs6QSjHjaTc= go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.42.0 h1:s/1iRkCKDfhlh1JF26knRneorus8aOwVIDhvYx9WoDw= go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.42.0/go.mod h1:UI3wi0FXg1Pofb8ZBiBLhtMzgoTm1TYkMvn71fAqDzs= +go.opentelemetry.io/otel/log v0.18.0 h1:XgeQIIBjZZrliksMEbcwMZefoOSMI1hdjiLEiiB0bAg= +go.opentelemetry.io/otel/log v0.18.0/go.mod h1:KEV1kad0NofR3ycsiDH4Yjcoj0+8206I6Ox2QYFSNgI= go.opentelemetry.io/otel/metric v1.42.0 h1:2jXG+3oZLNXEPfNmnpxKDeZsFI5o4J+nz6xUlaFdF/4= go.opentelemetry.io/otel/metric v1.42.0/go.mod h1:RlUN/7vTU7Ao/diDkEpQpnz3/92J9ko05BIwxYa2SSI= go.opentelemetry.io/otel/sdk v1.42.0 h1:LyC8+jqk6UJwdrI/8VydAq/hvkFKNHZVIWuslJXYsDo= go.opentelemetry.io/otel/sdk v1.42.0/go.mod h1:rGHCAxd9DAph0joO4W6OPwxjNTYWghRWmkHuGbayMts= +go.opentelemetry.io/otel/sdk/log v0.18.0 h1:n8OyZr7t7otkeTnPTbDNom6rW16TBYGtvyy2Gk6buQw= +go.opentelemetry.io/otel/sdk/log v0.18.0/go.mod h1:C0+wxkTwKpOCZLrlJ3pewPiiQwpzycPI/u6W0Z9fuYk= +go.opentelemetry.io/otel/sdk/log/logtest v0.18.0 h1:l3mYuPsuBx6UKE47BVcPrZoZ0q/KER57vbj2qkgDLXA= +go.opentelemetry.io/otel/sdk/log/logtest v0.18.0/go.mod h1:7cHtiVJpZebB3wybTa4NG+FUo5NPe3PROz1FqB0+qdw= go.opentelemetry.io/otel/sdk/metric v1.42.0 h1:D/1QR46Clz6ajyZ3G8SgNlTJKBdGp84q9RKCAZ3YGuA= go.opentelemetry.io/otel/sdk/metric v1.42.0/go.mod h1:Ua6AAlDKdZ7tdvaQKfSmnFTdHx37+J4ba8MwVCYM5hc= go.opentelemetry.io/otel/trace v1.42.0 h1:OUCgIPt+mzOnaUTpOQcBiM/PLQ/Op7oq6g4LenLmOYY= diff --git a/internal/controller/api_vms_exec.go b/internal/controller/api_vms_exec.go index a3ebf0df..c632edc0 100644 --- a/internal/controller/api_vms_exec.go +++ b/internal/controller/api_vms_exec.go @@ -62,28 +62,37 @@ func (controller *Controller) execVMLegacy( runCommand string, wait uint64, ) responder.Responder { + key := execSessionKey{vmName: name} + setupTelemetry := controller.execTelemetry.startSetup(ctx.Request.Context(), key, spec) + // Look-up the VM waitContext, waitContextCancel := context.WithTimeout(ctx, time.Duration(wait)*time.Second) defer waitContextCancel() vm, responderImpl := controller.waitForVM(waitContext, name) if responderImpl != nil { + setupTelemetry.finish(errors.New("failed to wait for VM")) + return responderImpl } + setupTelemetry.setVM(vm) session, err := controller.newSSHExecSession( - ctx, waitContext, vm, - execSessionKey{vmName: name}, + key, spec, runCommand, nil, legacyExecSessionPolicy, + setupTelemetry, ) if err != nil { + setupTelemetry.finish(err) + return responder.JSON(http.StatusServiceUnavailable, NewErrorResponse("%v", err)) } + setupTelemetry.finish(nil) // Upgrade HTTP request to a WebSocket connection wsConn, err := websocket.Accept(ctx.Writer, ctx.Request, &websocket.AcceptOptions{ @@ -130,18 +139,22 @@ func (controller *Controller) execVMReconnectable( NewErrorResponse("exec session %q does not exist", sessionID)) } + setupTelemetry := controller.execTelemetry.startSetup(ctx.Request.Context(), key, spec) + waitContext, waitContextCancel := context.WithTimeout(ctx, time.Duration(wait)*time.Second) defer waitContextCancel() vm, responderImpl := controller.waitForVM(waitContext, name) if responderImpl != nil { + setupTelemetry.finish(errors.New("failed to wait for VM")) + return responderImpl } + setupTelemetry.setVM(vm) var err error session, _, err = controller.execSessions.getOrCreate(waitContext, key, func() (*execSession, error) { return controller.newSSHExecSession( - ctx, waitContext, vm, key, @@ -149,11 +162,15 @@ func (controller *Controller) execVMReconnectable( runCommand, controller.execSessions, reconnectableExecSessionPolicy, + setupTelemetry, ) }) if err != nil { + setupTelemetry.finish(err) + return responder.JSON(http.StatusServiceUnavailable, NewErrorResponse("%v", err)) } + setupTelemetry.finish(nil) if !session.specMatches(spec) { return responder.JSON(http.StatusConflict, @@ -177,7 +194,6 @@ func (controller *Controller) execVMReconnectable( } func (controller *Controller) newSSHExecSession( - _ *gin.Context, waitContext context.Context, vm *v1.VM, key execSessionKey, @@ -185,9 +201,8 @@ func (controller *Controller) newSSHExecSession( runCommand string, registry *execSessionRegistry, policy execSessionPolicy, + setupTelemetry *execSetupTelemetry, ) (*execSession, error) { - sessionContext, sessionContextCancel := context.WithCancel(context.Background()) - type sshExecAttempt struct { exec *sshexec.Exec } @@ -234,11 +249,12 @@ func (controller *Controller) newSSHExecSession( }, nil }) if err != nil { - sessionContextCancel() - return nil, err } + sessionTraceContext, sessionTelemetry := setupTelemetry.startSession() + sessionContext, sessionContextCancel := context.WithCancel(sessionTraceContext) + return newExecSessionWithContextAndSpec( sessionContext, sessionContextCancel, @@ -250,6 +266,7 @@ func (controller *Controller) newSSHExecSession( registry, controller.execSessionRetentionTTL, policy, + sessionTelemetry, ), nil } @@ -258,7 +275,7 @@ func (controller *Controller) serveExecSession( wsConn *websocket.Conn, session *execSession, ) responder.Responder { - subscriber, err := session.attach() + subscriber, err := session.attach(ctx.Request.Context()) if err != nil { _ = wsConn.Close(websocket.StatusNormalClosure, err.Error()) diff --git a/internal/controller/controller.go b/internal/controller/controller.go index 70090d8d..703002d7 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -68,6 +68,7 @@ type Controller struct { sshServer *sshserver.SSHServer execSessions *execSessionRegistry execSSHClients *execSSHClientPool + execTelemetry *execTelemetry single singleflight.Group @@ -106,6 +107,11 @@ func New(opts ...Option) (*Controller, error) { controller.execSSHConnectionKeepaliveInterval, controller.logger.With("component", "exec-ssh"), ) + execTelemetry, err := newDefaultExecTelemetry() + if err != nil { + return nil, err + } + controller.execTelemetry = execTelemetry // Instantiate the database store, err := badger.NewBadgerStore(controller.dataDir.DBPath(), controller.disableDBCompression, diff --git a/internal/controller/exec_sessions.go b/internal/controller/exec_sessions.go index 8ef0f31a..16d0a648 100644 --- a/internal/controller/exec_sessions.go +++ b/internal/controller/exec_sessions.go @@ -238,12 +238,14 @@ type execSessionSubscriber struct { closeOnce sync.Once sendMu sync.Mutex sentWatermark uint64 + telemetry *execAttachmentTelemetry } -func newExecSessionSubscriber() *execSessionSubscriber { +func newExecSessionSubscriber(telemetry *execAttachmentTelemetry) *execSessionSubscriber { return &execSessionSubscriber{ - frames: make(chan *execstream.Frame, 128), - closed: make(chan struct{}), + frames: make(chan *execstream.Frame, 128), + closed: make(chan struct{}), + telemetry: telemetry, } } @@ -315,8 +317,12 @@ func (subscriber *execSessionSubscriber) markSentLocked(frame *execstream.Frame) return frame } -func (subscriber *execSessionSubscriber) close() { +func (subscriber *execSessionSubscriber) close(outcome string) { subscriber.closeOnce.Do(func() { + if subscriber.telemetry != nil { + subscriber.telemetry.finish(outcome) + } + close(subscriber.closed) subscriber.sendMu.Lock() close(subscriber.frames) @@ -347,6 +353,9 @@ type execSession struct { closed bool expiryTimer *time.Timer + telemetry *execSessionTelemetry + attachmentCount uint64 + startOnce sync.Once done chan struct{} doneOnce sync.Once @@ -374,6 +383,7 @@ func newExecSession( registry, retentionTTL, policy, + nil, ) } @@ -388,6 +398,7 @@ func newExecSessionWithContextAndSpec( registry *execSessionRegistry, retentionTTL time.Duration, policy execSessionPolicy, + telemetry *execSessionTelemetry, ) *execSession { if ctx == nil || cancel == nil { ctx, cancel = context.WithCancel(context.Background()) @@ -406,6 +417,7 @@ func newExecSessionWithContextAndSpec( cancel: cancel, stdin: exec.Stdin(), subscribers: map[*execSessionSubscriber]struct{}{}, + telemetry: telemetry, done: make(chan struct{}), } @@ -427,6 +439,10 @@ func (session *execSession) start() { session.started = true session.mu.Unlock() + if session.telemetry != nil { + session.telemetry.start() + } + go session.run() }) } @@ -441,7 +457,7 @@ func (session *execSession) closeIfUnused() { } } -func (session *execSession) attach() (*execSessionSubscriber, error) { +func (session *execSession) attach(requestCtx context.Context) (*execSessionSubscriber, error) { session.mu.Lock() defer session.mu.Unlock() @@ -449,7 +465,18 @@ func (session *execSession) attach() (*execSessionSubscriber, error) { return nil, errors.New("exec session is closed") } - subscriber := newExecSessionSubscriber() + attachmentKind := execTelemetryAttachmentInitial + if session.attachmentCount > 0 { + attachmentKind = execTelemetryAttachmentReconnect + } + session.attachmentCount++ + + var telemetry *execAttachmentTelemetry + if session.telemetry != nil { + telemetry = session.telemetry.attach(requestCtx, attachmentKind) + } + + subscriber := newExecSessionSubscriber(telemetry) session.subscribers[subscriber] = struct{}{} return subscriber, nil @@ -457,6 +484,7 @@ func (session *execSession) attach() (*execSessionSubscriber, error) { func (session *execSession) detach(subscriber *execSessionSubscriber) { if session.policy.closeOnDetach { + session.detachSubscriber(subscriber, execTelemetryOutcomeDetached) session.close() return @@ -465,16 +493,23 @@ func (session *execSession) detach(subscriber *execSessionSubscriber) { session.mu.Lock() defer session.mu.Unlock() - session.detachLocked(subscriber) + session.detachLocked(subscriber, execTelemetryOutcomeDetached) } -func (session *execSession) detachLocked(subscriber *execSessionSubscriber) { +func (session *execSession) detachSubscriber(subscriber *execSessionSubscriber, outcome string) { + session.mu.Lock() + defer session.mu.Unlock() + + session.detachLocked(subscriber, outcome) +} + +func (session *execSession) detachLocked(subscriber *execSessionSubscriber, outcome string) { if _, ok := session.subscribers[subscriber]; !ok { return } delete(session.subscribers, subscriber) - subscriber.close() + subscriber.close(outcome) } func (session *execSession) writeStdin(data []byte) error { @@ -569,9 +604,14 @@ func (session *execSession) close() { } subscribers := session.takeSubscribersLocked() + started := session.started session.mu.Unlock() - closeSubscribers(subscribers) + closeSubscribers(subscribers, execTelemetryOutcomeClosed) + + if !started && session.telemetry != nil { + session.telemetry.finish(execTelemetryOutcomeClosed, nil) + } session.cancel() _ = session.exec.Close() @@ -602,9 +642,12 @@ func (session *execSession) run() { Type: execstream.FrameTypeError, Error: runErr.Error(), }) + if session.telemetry != nil { + session.telemetry.fail(runErr) + } } - session.markFinished() + session.markFinished(runErr) } func (session *execSession) recordFrame(frame *execstream.Frame) { @@ -635,7 +678,7 @@ func (session *execSession) recordFrame(frame *execstream.Frame) { } } -func (session *execSession) markFinished() { +func (session *execSession) markFinished(runErr error) { session.mu.Lock() if session.finished { session.mu.Unlock() @@ -655,7 +698,11 @@ func (session *execSession) markFinished() { } session.mu.Unlock() - closeSubscribers(subscribers) + closeSubscribers(subscribers, execTelemetryOutcomeFinished) + + if session.telemetry != nil { + session.telemetry.finish(execTelemetryOutcomeFromError(runErr), runErr) + } session.doneOnce.Do(func() { close(session.done) @@ -680,9 +727,9 @@ func (session *execSession) takeSubscribersLocked() []*execSessionSubscriber { return subscribers } -func closeSubscribers(subscribers []*execSessionSubscriber) { +func closeSubscribers(subscribers []*execSessionSubscriber, outcome string) { for _, subscriber := range subscribers { - subscriber.close() + subscriber.close(outcome) } } @@ -690,7 +737,7 @@ func (session *execSession) dropSubscriber(subscriber *execSessionSubscriber) { session.mu.Lock() defer session.mu.Unlock() - session.detachLocked(subscriber) + session.detachLocked(subscriber, execTelemetryOutcomeDropped) } func cloneExecFrame(frame *execstream.Frame) *execstream.Frame { diff --git a/internal/controller/exec_sessions_test.go b/internal/controller/exec_sessions_test.go index e569def7..2e10d375 100644 --- a/internal/controller/exec_sessions_test.go +++ b/internal/controller/exec_sessions_test.go @@ -208,7 +208,7 @@ func TestExecSessionHistoryReplayAndAck(t *testing.T) { Exit: &execstream.Exit{Code: 7}, }) - subscriber, err := session.attach() + subscriber, err := session.attach(context.Background()) require.NoError(t, err) session.sendHistory(subscriber, 0) @@ -237,7 +237,7 @@ func TestExecSessionHistoryReplayStreamsPastSubscriberBuffer(t *testing.T) { }) } - subscriber, err := session.attach() + subscriber, err := session.attach(context.Background()) require.NoError(t, err) done := make(chan struct{}) @@ -271,7 +271,7 @@ func TestExecSessionDetachKeepsProcessAlive(t *testing.T) { session := newManualExecSessionForTest(execSessionKey{vmName: "vm", sessionID: "session"}, registry) exec := session.exec.(*fakeExec) - subscriber, err := session.attach() + subscriber, err := session.attach(context.Background()) require.NoError(t, err) session.detach(subscriber) @@ -286,7 +286,7 @@ func TestLegacyExecSessionDetachStopsProcess(t *testing.T) { session.policy = legacyExecSessionPolicy exec := session.exec.(*fakeExec) - subscriber, err := session.attach() + subscriber, err := session.attach(context.Background()) require.NoError(t, err) session.detach(subscriber) @@ -323,7 +323,7 @@ func TestExecSessionCloseIfUnusedKeepsAttachedSession(t *testing.T) { registry := newExecSessionRegistry() session := newManualExecSessionForTest(execSessionKey{vmName: "vm", sessionID: "session"}, registry) - _, err := session.attach() + _, err := session.attach(context.Background()) require.NoError(t, err) session.closeIfUnused() @@ -353,7 +353,7 @@ func TestExecSessionFinishedEntryExpiresAfterTTL(t *testing.T) { session.retentionTTL = 10 * time.Millisecond registry.sessions[key] = session - session.markFinished() + session.markFinished(nil) require.Eventually(t, func() bool { _, ok := registry.get(key) @@ -366,7 +366,7 @@ func TestExecSessionFinishKeepsReconnectableSubscriberOpen(t *testing.T) { registry := newExecSessionRegistry() session := newManualExecSessionForTest(execSessionKey{vmName: "vm", sessionID: "session"}, registry) - subscriber, err := session.attach() + subscriber, err := session.attach(context.Background()) require.NoError(t, err) session.recordFrame(&execstream.Frame{Type: execstream.FrameTypeStdout, Data: []byte("out")}) @@ -374,7 +374,7 @@ func TestExecSessionFinishKeepsReconnectableSubscriberOpen(t *testing.T) { Type: execstream.FrameTypeExit, Exit: &execstream.Exit{Code: 0}, }) - session.markFinished() + session.markFinished(nil) require.Equal(t, execstream.FrameTypeStdout, (<-subscriber.frames).Type) require.Equal(t, execstream.FrameTypeExit, (<-subscriber.frames).Type) diff --git a/internal/controller/exec_telemetry.go b/internal/controller/exec_telemetry.go new file mode 100644 index 00000000..e62af174 --- /dev/null +++ b/internal/controller/exec_telemetry.go @@ -0,0 +1,411 @@ +package controller + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "errors" + "sync" + "time" + + "github.com/cirruslabs/orchard/internal/opentelemetry" + v1 "github.com/cirruslabs/orchard/pkg/resource/v1" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/log" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/trace" +) + +const ( + execTelemetryEventSessionStarted = "session_started" + execTelemetryEventAttached = "attached" + execTelemetryEventDetached = "detached" + execTelemetryEventFinished = "finished" + execTelemetryEventFailed = "failed" + + execTelemetryModeLegacy = "legacy" + execTelemetryModeReconnectable = "reconnectable" + + execTelemetryAttachmentInitial = "initial" + execTelemetryAttachmentReconnect = "reconnect" + + execTelemetryOutcomeSuccess = "success" + execTelemetryOutcomeError = "error" + execTelemetryOutcomeCanceled = "canceled" + execTelemetryOutcomeClosed = "closed" + execTelemetryOutcomeDetached = "detached" + execTelemetryOutcomeDropped = "dropped" + execTelemetryOutcomeFinished = "finished" +) + +type execTelemetry struct { + tracer trace.Tracer + logger log.Logger + + setupAttempts metric.Int64Counter + sessionsStarted metric.Int64Counter + sessionsFinished metric.Int64Counter + activeSessions metric.Int64UpDownCounter + attachments metric.Int64Counter + setupDuration metric.Float64Histogram + sessionDuration metric.Float64Histogram + + now func() time.Time +} + +func (telemetry *execTelemetry) startSetup( + ctx context.Context, + key execSessionKey, + spec execSessionSpec, +) *execSetupTelemetry { + if ctx == nil { + ctx = context.Background() + } + + metadata := newExecTelemetryMetadata(key, spec) + ctx, span := telemetry.tracer.Start(ctx, "orchard.exec.setup", + trace.WithAttributes(metadata.spanAttributes()...)) + + return &execSetupTelemetry{ + telemetry: telemetry, + metadata: metadata, + ctx: ctx, + span: span, + startedAt: telemetry.now(), + } +} + +func (telemetry *execTelemetry) emitLog( + ctx context.Context, + event string, + metadata execTelemetryMetadata, + extra ...log.KeyValue, +) { + record := log.Record{} + record.SetTimestamp(telemetry.now()) + record.SetObservedTimestamp(telemetry.now()) + record.SetSeverity(log.SeverityInfo) + record.SetBody(log.StringValue(event)) + record.AddAttributes(metadata.logAttributes(extra...)...) + telemetry.logger.Emit(ctx, record) +} + +func newDefaultExecTelemetry() (*execTelemetry, error) { + return newExecTelemetry( + opentelemetry.DefaultMeter, + opentelemetry.DefaultTracer, + opentelemetry.DefaultLogger, + ) +} + +func newExecTelemetry( + meter metric.Meter, + tracer trace.Tracer, + logger log.Logger, +) (*execTelemetry, error) { + telemetry := &execTelemetry{ + tracer: tracer, + logger: logger, + now: time.Now, + } + + var err error + + telemetry.setupAttempts, err = meter.Int64Counter("org.cirruslabs.orchard.controller.exec.setup_attempts") + if err != nil { + return nil, err + } + + telemetry.sessionsStarted, err = meter.Int64Counter("org.cirruslabs.orchard.controller.exec.sessions_started") + if err != nil { + return nil, err + } + + telemetry.sessionsFinished, err = meter.Int64Counter("org.cirruslabs.orchard.controller.exec.sessions_finished") + if err != nil { + return nil, err + } + + telemetry.activeSessions, err = meter.Int64UpDownCounter("org.cirruslabs.orchard.controller.exec.active_sessions") + if err != nil { + return nil, err + } + + telemetry.attachments, err = meter.Int64Counter("org.cirruslabs.orchard.controller.exec.attachments") + if err != nil { + return nil, err + } + + telemetry.setupDuration, err = meter.Float64Histogram("org.cirruslabs.orchard.controller.exec.setup_time") + if err != nil { + return nil, err + } + + telemetry.sessionDuration, err = meter.Float64Histogram("org.cirruslabs.orchard.controller.exec.session_time") + if err != nil { + return nil, err + } + + return telemetry, nil +} + +type execTelemetryMetadata struct { + vmName string + vmUID string + sessionID string + worker string + mode string + tty bool + interactive bool + commandHash string +} + +func newExecTelemetryMetadata(key execSessionKey, spec execSessionSpec) execTelemetryMetadata { + mode := execTelemetryModeLegacy + if key.sessionID != "" { + mode = execTelemetryModeReconnectable + } + + return execTelemetryMetadata{ + vmName: key.vmName, + sessionID: key.sessionID, + mode: mode, + tty: spec.tty, + interactive: spec.interactive, + commandHash: execCommandHash(spec.command), + } +} + +func (metadata *execTelemetryMetadata) setVM(vm *v1.VM) { + metadata.vmUID = vm.UID + metadata.worker = vm.Worker +} + +func (metadata execTelemetryMetadata) spanAttributes(extra ...attribute.KeyValue) []attribute.KeyValue { + attributes := []attribute.KeyValue{ + attribute.String("vm_name", metadata.vmName), + attribute.String("vm_uid", metadata.vmUID), + attribute.String("session_id", metadata.sessionID), + attribute.String("mode", metadata.mode), + attribute.Bool("tty", metadata.tty), + attribute.Bool("interactive", metadata.interactive), + attribute.String("worker", metadata.worker), + attribute.String("command_sha256", metadata.commandHash), + } + + return append(attributes, extra...) +} + +func (metadata execTelemetryMetadata) metricAttributes(extra ...attribute.KeyValue) []attribute.KeyValue { + attributes := []attribute.KeyValue{ + attribute.String("mode", metadata.mode), + attribute.Bool("tty", metadata.tty), + attribute.Bool("interactive", metadata.interactive), + } + + return append(attributes, extra...) +} + +func (metadata execTelemetryMetadata) logAttributes(extra ...log.KeyValue) []log.KeyValue { + attributes := []log.KeyValue{ + log.String("vm_name", metadata.vmName), + log.String("vm_uid", metadata.vmUID), + log.String("session_id", metadata.sessionID), + log.String("mode", metadata.mode), + log.Bool("tty", metadata.tty), + log.Bool("interactive", metadata.interactive), + log.String("worker", metadata.worker), + log.String("command_sha256", metadata.commandHash), + } + + return append(attributes, extra...) +} + +type execSetupTelemetry struct { + telemetry *execTelemetry + metadata execTelemetryMetadata + ctx context.Context + span trace.Span + startedAt time.Time + + finishOnce sync.Once +} + +func (setup *execSetupTelemetry) setVM(vm *v1.VM) { + setup.metadata.setVM(vm) + setup.span.SetAttributes(setup.metadata.spanAttributes()...) +} + +func (setup *execSetupTelemetry) startSession() (context.Context, *execSessionTelemetry) { + sessionCtx, span := setup.telemetry.tracer.Start( + context.WithoutCancel(setup.ctx), + "orchard.exec.session", + trace.WithAttributes(setup.metadata.spanAttributes()...), + ) + + return sessionCtx, &execSessionTelemetry{ + telemetry: setup.telemetry, + metadata: setup.metadata, + ctx: sessionCtx, + span: span, + } +} + +func (setup *execSetupTelemetry) finish(err error) { + setup.finishOnce.Do(func() { + outcome := execTelemetryOutcomeFromError(err) + setup.telemetry.setupAttempts.Add(setup.ctx, 1, + metric.WithAttributes(setup.metadata.metricAttributes(attribute.String("outcome", outcome))...)) + setup.telemetry.setupDuration.Record(setup.ctx, + setup.telemetry.now().Sub(setup.startedAt).Seconds(), + metric.WithAttributes(setup.metadata.metricAttributes(attribute.String("outcome", outcome))...)) + + setup.span.SetAttributes(attribute.String("outcome", outcome)) + if err != nil { + setup.span.RecordError(err) + setup.span.SetStatus(codes.Error, err.Error()) + setup.telemetry.emitLog(setup.ctx, execTelemetryEventFailed, setup.metadata, + log.String("outcome", outcome)) + } + setup.span.End() + }) +} + +type execSessionTelemetry struct { + telemetry *execTelemetry + metadata execTelemetryMetadata + ctx context.Context + span trace.Span + + mu sync.Mutex + started bool + startedAt time.Time + + finishOnce sync.Once +} + +func (session *execSessionTelemetry) start() { + session.mu.Lock() + if session.started { + session.mu.Unlock() + + return + } + + session.started = true + session.startedAt = session.telemetry.now() + session.mu.Unlock() + + session.telemetry.sessionsStarted.Add(session.ctx, 1, + metric.WithAttributes(session.metadata.metricAttributes()...)) + session.telemetry.activeSessions.Add(session.ctx, 1, + metric.WithAttributes(session.metadata.metricAttributes()...)) + session.telemetry.emitLog(session.ctx, execTelemetryEventSessionStarted, session.metadata) +} + +func (session *execSessionTelemetry) attach( + requestCtx context.Context, + kind string, +) *execAttachmentTelemetry { + var links []trace.Link + if spanContext := trace.SpanContextFromContext(requestCtx); spanContext.IsValid() { + links = append(links, trace.Link{SpanContext: spanContext}) + } + + ctx, span := session.telemetry.tracer.Start( + session.ctx, + "orchard.exec.attachment", + trace.WithAttributes(session.metadata.spanAttributes(attribute.String("attachment_kind", kind))...), + trace.WithLinks(links...), + ) + + session.telemetry.attachments.Add(ctx, 1, + metric.WithAttributes(session.metadata.metricAttributes(attribute.String("attachment_kind", kind))...)) + session.telemetry.emitLog(ctx, execTelemetryEventAttached, session.metadata, + log.String("attachment_kind", kind)) + + return &execAttachmentTelemetry{ + session: session, + ctx: ctx, + span: span, + kind: kind, + } +} + +func (session *execSessionTelemetry) fail(err error) { + if err == nil || errors.Is(err, context.Canceled) { + return + } + + session.telemetry.emitLog(session.ctx, execTelemetryEventFailed, session.metadata, + log.String("outcome", execTelemetryOutcomeError)) +} + +func (session *execSessionTelemetry) finish(outcome string, err error) { + session.finishOnce.Do(func() { + session.mu.Lock() + started := session.started + startedAt := session.startedAt + session.mu.Unlock() + + session.span.SetAttributes(attribute.String("outcome", outcome)) + if err != nil && !errors.Is(err, context.Canceled) { + session.span.RecordError(err) + session.span.SetStatus(codes.Error, err.Error()) + } + + if started { + session.telemetry.sessionsFinished.Add(session.ctx, 1, + metric.WithAttributes(session.metadata.metricAttributes(attribute.String("outcome", outcome))...)) + session.telemetry.activeSessions.Add(session.ctx, -1, + metric.WithAttributes(session.metadata.metricAttributes()...)) + session.telemetry.sessionDuration.Record(session.ctx, + session.telemetry.now().Sub(startedAt).Seconds(), + metric.WithAttributes(session.metadata.metricAttributes(attribute.String("outcome", outcome))...)) + session.telemetry.emitLog(session.ctx, execTelemetryEventFinished, session.metadata, + log.String("outcome", outcome)) + } + + session.span.End() + }) +} + +type execAttachmentTelemetry struct { + session *execSessionTelemetry + ctx context.Context + span trace.Span + kind string + + finishOnce sync.Once +} + +func (attachment *execAttachmentTelemetry) finish(outcome string) { + attachment.finishOnce.Do(func() { + attachment.span.SetAttributes(attribute.String("outcome", outcome)) + if outcome != execTelemetryOutcomeFinished { + attachment.session.telemetry.emitLog(attachment.ctx, execTelemetryEventDetached, + attachment.session.metadata, + log.String("attachment_kind", attachment.kind), + log.String("outcome", outcome)) + } + attachment.span.End() + }) +} + +func execCommandHash(command string) string { + sum := sha256.Sum256([]byte(command)) + + return hex.EncodeToString(sum[:]) +} + +func execTelemetryOutcomeFromError(err error) string { + switch { + case err == nil: + return execTelemetryOutcomeSuccess + case errors.Is(err, context.Canceled): + return execTelemetryOutcomeCanceled + default: + return execTelemetryOutcomeError + } +} diff --git a/internal/controller/exec_telemetry_test.go b/internal/controller/exec_telemetry_test.go new file mode 100644 index 00000000..88310364 --- /dev/null +++ b/internal/controller/exec_telemetry_test.go @@ -0,0 +1,496 @@ +//nolint:testpackage // telemetry tests exercise controller internals directly +package controller + +import ( + "context" + "errors" + "sync" + "testing" + "time" + + "github.com/cirruslabs/orchard/internal/execstream" + v1 "github.com/cirruslabs/orchard/pkg/resource/v1" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + otellog "go.opentelemetry.io/otel/log" + sdklog "go.opentelemetry.io/otel/sdk/log" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" +) + +type execTelemetryHarness struct { + telemetry *execTelemetry + logExporter *memoryLogExporter + metricReader *sdkmetric.ManualReader + tracerProvider *sdktrace.TracerProvider + spanRecorder *tracetest.SpanRecorder +} + +func newExecTelemetryHarness(t *testing.T) *execTelemetryHarness { + t.Helper() + + logExporter := &memoryLogExporter{} + loggerProvider := sdklog.NewLoggerProvider( + sdklog.WithProcessor(sdklog.NewSimpleProcessor(logExporter)), + ) + t.Cleanup(func() { + require.NoError(t, loggerProvider.Shutdown(context.Background())) + }) + + metricReader := sdkmetric.NewManualReader() + meterProvider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(metricReader)) + t.Cleanup(func() { + require.NoError(t, meterProvider.Shutdown(context.Background())) + }) + + spanRecorder := tracetest.NewSpanRecorder() + tracerProvider := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(spanRecorder)) + t.Cleanup(func() { + require.NoError(t, tracerProvider.Shutdown(context.Background())) + }) + + telemetry, err := newExecTelemetry( + meterProvider.Meter("exec-telemetry-test"), + tracerProvider.Tracer("exec-telemetry-test"), + loggerProvider.Logger("exec-telemetry-test"), + ) + require.NoError(t, err) + + return &execTelemetryHarness{ + telemetry: telemetry, + logExporter: logExporter, + metricReader: metricReader, + tracerProvider: tracerProvider, + spanRecorder: spanRecorder, + } +} + +type memoryLogExporter struct { + mu sync.Mutex + records []sdklog.Record +} + +func (exporter *memoryLogExporter) Export(_ context.Context, records []sdklog.Record) error { + exporter.mu.Lock() + defer exporter.mu.Unlock() + + for _, record := range records { + exporter.records = append(exporter.records, record.Clone()) + } + + return nil +} + +func (exporter *memoryLogExporter) Shutdown(context.Context) error { + return nil +} + +func (exporter *memoryLogExporter) ForceFlush(context.Context) error { + return nil +} + +func (exporter *memoryLogExporter) snapshot() []sdklog.Record { + exporter.mu.Lock() + defer exporter.mu.Unlock() + + records := make([]sdklog.Record, 0, len(exporter.records)) + for _, record := range exporter.records { + records = append(records, record.Clone()) + } + + return records +} + +func TestExecTelemetryCapturesReconnectableSessionAcrossSignals(t *testing.T) { + harness := newExecTelemetryHarness(t) + spec := execSessionSpec{ + command: "printf secret", + interactive: true, + tty: true, + } + key := execSessionKey{vmName: "vm-1", sessionID: "session-1"} + vm := &v1.VM{ + Meta: v1.Meta{Name: "vm-1"}, + UID: "uid-1", + Worker: "worker-1", + } + + setupTelemetry := harness.telemetry.startSetup(context.Background(), key, spec) + setupTelemetry.setVM(vm) + sessionTraceContext, sessionTelemetry := setupTelemetry.startSession() + setupTelemetry.finish(nil) + + runRelease := make(chan struct{}) + sessionContext, sessionContextCancel := context.WithCancel(sessionTraceContext) + session := newExecSessionWithContextAndSpec( + sessionContext, + sessionContextCancel, + key, + spec, + spec.command, + &fakeExec{ + run: func(context.Context, string, chan<- *execstream.Frame) error { + <-runRelease + + return nil + }, + }, + nil, + nil, + time.Minute, + reconnectableExecSessionPolicy, + sessionTelemetry, + ) + + firstSubscriber, err := session.attach(context.Background()) + require.NoError(t, err) + session.start() + session.detach(firstSubscriber) + + secondSubscriber, err := session.attach(context.Background()) + require.NoError(t, err) + + close(runRelease) + <-session.done + session.detach(secondSubscriber) + + require.NoError(t, harness.tracerProvider.ForceFlush(context.Background())) + + spans := harness.spanRecorder.Ended() + setupSpan := findSpanByName(t, spans, "orchard.exec.setup") + sessionSpan := findSpanByName(t, spans, "orchard.exec.session") + attachmentSpans := findSpansByName(spans, "orchard.exec.attachment") + require.Len(t, attachmentSpans, 2) + require.Equal(t, setupSpan.SpanContext().SpanID(), sessionSpan.Parent().SpanID()) + for _, attachmentSpan := range attachmentSpans { + require.Equal(t, sessionSpan.SpanContext().SpanID(), attachmentSpan.Parent().SpanID()) + } + + logs := harness.logExporter.snapshot() + require.Equal(t, []string{ + execTelemetryEventAttached, + execTelemetryEventSessionStarted, + execTelemetryEventDetached, + execTelemetryEventAttached, + execTelemetryEventFinished, + execTelemetryEventDetached, + }, logBodies(logs)) + for _, record := range logs { + require.Equal(t, sessionSpan.SpanContext().TraceID(), record.TraceID()) + require.NotContains(t, record.Body().AsString(), spec.command) + require.NotContains(t, logAttributes(record), "command") + require.Equal(t, execCommandHash(spec.command), logAttributes(record)["command_sha256"]) + } + + metrics := collectMetrics(t, harness.metricReader) + require.EqualValues(t, 1, metricInt64Value(t, metrics, + "org.cirruslabs.orchard.controller.exec.setup_attempts", nil)) + require.EqualValues(t, 1, metricInt64Value(t, metrics, + "org.cirruslabs.orchard.controller.exec.sessions_started", nil)) + require.EqualValues(t, 1, metricInt64Value(t, metrics, + "org.cirruslabs.orchard.controller.exec.sessions_finished", + map[string]string{"outcome": execTelemetryOutcomeSuccess})) + require.EqualValues(t, 0, metricInt64Value(t, metrics, + "org.cirruslabs.orchard.controller.exec.active_sessions", nil)) + require.EqualValues(t, 1, metricInt64Value(t, metrics, + "org.cirruslabs.orchard.controller.exec.attachments", + map[string]string{"attachment_kind": execTelemetryAttachmentInitial})) + require.EqualValues(t, 1, metricInt64Value(t, metrics, + "org.cirruslabs.orchard.controller.exec.attachments", + map[string]string{"attachment_kind": execTelemetryAttachmentReconnect})) + require.EqualValues(t, 1, metricHistogramCount(t, metrics, + "org.cirruslabs.orchard.controller.exec.setup_time")) + require.EqualValues(t, 1, metricHistogramCount(t, metrics, + "org.cirruslabs.orchard.controller.exec.session_time")) +} + +func TestExecTelemetryRecordsSetupFailure(t *testing.T) { + harness := newExecTelemetryHarness(t) + setupTelemetry := harness.telemetry.startSetup(context.Background(), + execSessionKey{vmName: "vm-1"}, + execSessionSpec{command: "echo hello"}) + + setupErr := errors.New("setup failed") + setupTelemetry.finish(setupErr) + require.NoError(t, harness.tracerProvider.ForceFlush(context.Background())) + + setupSpan := findSpanByName(t, harness.spanRecorder.Ended(), "orchard.exec.setup") + require.Equal(t, codes.Error, setupSpan.Status().Code) + require.Equal(t, setupErr.Error(), setupSpan.Status().Description) + + logs := harness.logExporter.snapshot() + require.Equal(t, []string{execTelemetryEventFailed}, logBodies(logs)) + + metrics := collectMetrics(t, harness.metricReader) + require.EqualValues(t, 1, metricInt64Value(t, metrics, + "org.cirruslabs.orchard.controller.exec.setup_attempts", + map[string]string{"outcome": execTelemetryOutcomeError})) +} + +func TestExecTelemetryRecordsLegacyDetachAsCanceledSession(t *testing.T) { + harness := newExecTelemetryHarness(t) + spec := execSessionSpec{command: "sleep forever"} + key := execSessionKey{vmName: "vm-1"} + + setupTelemetry := harness.telemetry.startSetup(context.Background(), key, spec) + setupTelemetry.setVM(&v1.VM{Meta: v1.Meta{Name: "vm-1"}, UID: "uid-1", Worker: "worker-1"}) + sessionTraceContext, sessionTelemetry := setupTelemetry.startSession() + setupTelemetry.finish(nil) + + sessionContext, sessionContextCancel := context.WithCancel(sessionTraceContext) + session := newExecSessionWithContextAndSpec( + sessionContext, + sessionContextCancel, + key, + spec, + spec.command, + &fakeExec{ + run: func(ctx context.Context, _ string, _ chan<- *execstream.Frame) error { + <-ctx.Done() + + return ctx.Err() + }, + }, + nil, + nil, + time.Minute, + legacyExecSessionPolicy, + sessionTelemetry, + ) + + subscriber, err := session.attach(context.Background()) + require.NoError(t, err) + session.start() + session.detach(subscriber) + <-session.done + + metrics := collectMetrics(t, harness.metricReader) + require.EqualValues(t, 1, metricInt64Value(t, metrics, + "org.cirruslabs.orchard.controller.exec.sessions_finished", + map[string]string{"outcome": execTelemetryOutcomeCanceled})) +} + +func TestExecTelemetryRecordsSessionFailure(t *testing.T) { + harness := newExecTelemetryHarness(t) + spec := execSessionSpec{command: "exit 1"} + key := execSessionKey{vmName: "vm-1"} + + setupTelemetry := harness.telemetry.startSetup(context.Background(), key, spec) + setupTelemetry.setVM(&v1.VM{Meta: v1.Meta{Name: "vm-1"}, UID: "uid-1", Worker: "worker-1"}) + sessionTraceContext, sessionTelemetry := setupTelemetry.startSession() + setupTelemetry.finish(nil) + + sessionContext, sessionContextCancel := context.WithCancel(sessionTraceContext) + session := newExecSessionWithContextAndSpec( + sessionContext, + sessionContextCancel, + key, + spec, + spec.command, + &fakeExec{ + run: func(context.Context, string, chan<- *execstream.Frame) error { + return errors.New("command failed") + }, + }, + nil, + nil, + time.Minute, + legacyExecSessionPolicy, + sessionTelemetry, + ) + + _, err := session.attach(context.Background()) + require.NoError(t, err) + session.start() + <-session.done + require.NoError(t, harness.tracerProvider.ForceFlush(context.Background())) + + require.Contains(t, logBodies(harness.logExporter.snapshot()), execTelemetryEventFailed) + sessionSpan := findSpanByName(t, harness.spanRecorder.Ended(), "orchard.exec.session") + require.Equal(t, codes.Error, sessionSpan.Status().Code) +} + +func TestExecCommandHashIsStableAndDoesNotExposeRawCommand(t *testing.T) { + const command = "printf secret" + + require.Equal(t, execCommandHash(command), execCommandHash(command)) + require.NotEqual(t, command, execCommandHash(command)) + require.NotEqual(t, execCommandHash(command), execCommandHash(command+"!")) +} + +func TestExecTelemetryMetricsAvoidHighCardinalityAttributes(t *testing.T) { + harness := newExecTelemetryHarness(t) + setupTelemetry := harness.telemetry.startSetup(context.Background(), + execSessionKey{vmName: "vm-1", sessionID: "session-1"}, + execSessionSpec{command: "printf secret", interactive: true, tty: true}) + setupTelemetry.setVM(&v1.VM{Meta: v1.Meta{Name: "vm-1"}, UID: "uid-1", Worker: "worker-1"}) + setupTelemetry.finish(nil) + + metrics := collectMetrics(t, harness.metricReader) + for _, metric := range flattenMetrics(metrics) { + for _, attributes := range metricAttributeSets(metric) { + require.NotContains(t, attributes, "vm_name") + require.NotContains(t, attributes, "vm_uid") + require.NotContains(t, attributes, "session_id") + require.NotContains(t, attributes, "command_sha256") + } + } +} + +func findSpanByName( + t *testing.T, + spans []sdktrace.ReadOnlySpan, + name string, +) sdktrace.ReadOnlySpan { + t.Helper() + + matches := findSpansByName(spans, name) + require.Len(t, matches, 1) + + return matches[0] +} + +func findSpansByName(spans []sdktrace.ReadOnlySpan, name string) []sdktrace.ReadOnlySpan { + var matches []sdktrace.ReadOnlySpan + for _, span := range spans { + if span.Name() == name { + matches = append(matches, span) + } + } + + return matches +} + +func logBodies(records []sdklog.Record) []string { + result := make([]string, 0, len(records)) + for _, record := range records { + result = append(result, record.Body().AsString()) + } + + return result +} + +func logAttributes(record sdklog.Record) map[string]string { + result := map[string]string{} + record.WalkAttributes(func(attribute otellog.KeyValue) bool { + result[attribute.Key] = attribute.Value.AsString() + + return true + }) + + return result +} + +func collectMetrics(t *testing.T, reader *sdkmetric.ManualReader) metricdata.ResourceMetrics { + t.Helper() + + var metrics metricdata.ResourceMetrics + require.NoError(t, reader.Collect(context.Background(), &metrics)) + + return metrics +} + +func flattenMetrics(metrics metricdata.ResourceMetrics) []metricdata.Metrics { + var result []metricdata.Metrics + for _, scopeMetrics := range metrics.ScopeMetrics { + result = append(result, scopeMetrics.Metrics...) + } + + return result +} + +func metricInt64Value( + t *testing.T, + metrics metricdata.ResourceMetrics, + name string, + requiredAttributes map[string]string, +) int64 { + t.Helper() + + for _, metric := range flattenMetrics(metrics) { + if metric.Name != name { + continue + } + + sum, ok := metric.Data.(metricdata.Sum[int64]) + require.True(t, ok) + + for _, dataPoint := range sum.DataPoints { + if attributesContain(dataPoint.Attributes.ToSlice(), requiredAttributes) { + return dataPoint.Value + } + } + } + + t.Fatalf("metric %q with attributes %v not found", name, requiredAttributes) + + return 0 +} + +func metricHistogramCount(t *testing.T, metrics metricdata.ResourceMetrics, name string) uint64 { + t.Helper() + + for _, metric := range flattenMetrics(metrics) { + if metric.Name != name { + continue + } + + histogram, ok := metric.Data.(metricdata.Histogram[float64]) + require.True(t, ok) + require.Len(t, histogram.DataPoints, 1) + + return histogram.DataPoints[0].Count + } + + t.Fatalf("metric %q not found", name) + + return 0 +} + +func metricAttributeSets(metric metricdata.Metrics) []map[string]struct{} { + var result []map[string]struct{} + + switch data := metric.Data.(type) { + case metricdata.Sum[int64]: + for _, dataPoint := range data.DataPoints { + result = append(result, attributeSet(dataPoint.Attributes.ToSlice())) + } + case metricdata.Histogram[float64]: + for _, dataPoint := range data.DataPoints { + result = append(result, attributeSet(dataPoint.Attributes.ToSlice())) + } + } + + return result +} + +func attributeSet(attributes []attribute.KeyValue) map[string]struct{} { + result := make(map[string]struct{}, len(attributes)) + for _, attr := range attributes { + result[string(attr.Key)] = struct{}{} + } + + return result +} + +func attributesContain(attributes []attribute.KeyValue, required map[string]string) bool { + if len(required) == 0 { + return true + } + + actual := map[string]string{} + for _, attr := range attributes { + actual[string(attr.Key)] = attr.Value.AsString() + } + + for key, value := range required { + if actual[key] != value { + return false + } + } + + return true +} diff --git a/internal/opentelemetry/opentelemetry.go b/internal/opentelemetry/opentelemetry.go index 520e0d83..07540270 100644 --- a/internal/opentelemetry/opentelemetry.go +++ b/internal/opentelemetry/opentelemetry.go @@ -2,15 +2,25 @@ package opentelemetry import ( "context" + "os" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc" + "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" + otellogglobal "go.opentelemetry.io/otel/log/global" + sdklog "go.opentelemetry.io/otel/sdk/log" "go.opentelemetry.io/otel/sdk/metric" - "os" + sdktrace "go.opentelemetry.io/otel/sdk/trace" ) var ( - DefaultMeter = otel.Meter("") + DefaultLogger = otellogglobal.Logger("") + DefaultMeter = otel.Meter("") + DefaultTracer = otel.Tracer("") ) func Configure(ctx context.Context) error { @@ -30,6 +40,12 @@ func Configure(ctx context.Context) error { if err := setupMeterProvider(ctx); err != nil { return err } + if err := setupTracerProvider(ctx); err != nil { + return err + } + if err := setupLoggerProvider(ctx); err != nil { + return err + } return nil } @@ -54,3 +70,45 @@ func setupMeterProvider(ctx context.Context) error { return nil } + +func setupTracerProvider(ctx context.Context) error { + httpExporter, err := otlptracehttp.New(ctx) + if err != nil { + return err + } + + grpcExporter, err := otlptracegrpc.New(ctx) + if err != nil { + return err + } + + tracerProvider := sdktrace.NewTracerProvider( + sdktrace.WithBatcher(httpExporter), + sdktrace.WithBatcher(grpcExporter), + ) + + otel.SetTracerProvider(tracerProvider) + + return nil +} + +func setupLoggerProvider(ctx context.Context) error { + httpExporter, err := otlploghttp.New(ctx) + if err != nil { + return err + } + + grpcExporter, err := otlploggrpc.New(ctx) + if err != nil { + return err + } + + loggerProvider := sdklog.NewLoggerProvider( + sdklog.WithProcessor(sdklog.NewBatchProcessor(httpExporter)), + sdklog.WithProcessor(sdklog.NewBatchProcessor(grpcExporter)), + ) + + otellogglobal.SetLoggerProvider(loggerProvider) + + return nil +}