Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,18 @@ require (
github.com/stretchr/testify v1.11.1
go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin v0.67.0
go.opentelemetry.io/otel v1.42.0
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.18.0
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.18.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.42.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.42.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.42.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.42.0
go.opentelemetry.io/otel/log v0.18.0
go.opentelemetry.io/otel/metric v1.42.0
go.opentelemetry.io/otel/sdk v1.42.0
go.opentelemetry.io/otel/sdk/log v0.18.0
go.opentelemetry.io/otel/sdk/metric v1.42.0
go.opentelemetry.io/otel/trace v1.42.0
go.uber.org/zap v1.27.1
golang.org/x/crypto v0.49.0
golang.org/x/exp v0.0.0-20250218142911-aa4b98e5adaa
Expand Down Expand Up @@ -137,8 +145,7 @@ require (
go.mongodb.org/mongo-driver/v2 v2.5.0 // indirect
go.opencensus.io v0.22.5 // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
go.opentelemetry.io/otel/sdk v1.42.0 // indirect
go.opentelemetry.io/otel/trace v1.42.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.42.0 // indirect
go.opentelemetry.io/proto/otlp v1.9.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.yaml.in/yaml/v3 v3.0.4 // indirect
Expand Down
16 changes: 16 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -359,16 +359,32 @@ go.opentelemetry.io/contrib/propagators/b3 v1.42.0 h1:B2Pew5ufEtgkjLF+tSkXjgYZXQ
go.opentelemetry.io/contrib/propagators/b3 v1.42.0/go.mod h1:iPgUcSEF5DORW6+yNbdw/YevUy+QqJ508ncjhrRSCjc=
go.opentelemetry.io/otel v1.42.0 h1:lSQGzTgVR3+sgJDAU/7/ZMjN9Z+vUip7leaqBKy4sho=
go.opentelemetry.io/otel v1.42.0/go.mod h1:lJNsdRMxCUIWuMlVJWzecSMuNjE7dOYyWlqOXWkdqCc=
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.18.0 h1:deI9UQMoGFgrg5iLPgzueqFPHevDl+28YKfSpPTI6rY=
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.18.0/go.mod h1:PFx9NgpNUKXdf7J4Q3agRxMs3Y07QhTCVipKmLsMKnU=
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.18.0 h1:icqq3Z34UrEFk2u+HMhTtRsvo7Ues+eiJVjaJt62njs=
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.18.0/go.mod h1:W2m8P+d5Wn5kipj4/xmbt9uMqezEKfBjzVJadfABSBE=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.42.0 h1:MdKucPl/HbzckWWEisiNqMPhRrAOQX8r4jTuGr636gk=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.42.0/go.mod h1:RolT8tWtfHcjajEH5wFIZ4Dgh5jpPdFXYV9pTAk/qjc=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.42.0 h1:H7O6RlGOMTizyl3R08Kn5pdM06bnH8oscSj7o11tmLA=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.42.0/go.mod h1:mBFWu/WOVDkWWsR7Tx7h6EpQB8wsv7P0Yrh0Pb7othc=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.42.0 h1:THuZiwpQZuHPul65w4WcwEnkX2QIuMT+UFoOrygtoJw=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.42.0/go.mod h1:J2pvYM5NGHofZ2/Ru6zw/TNWnEQp5crgyDeSrYpXkAw=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.42.0 h1:zWWrB1U6nqhS/k6zYB74CjRpuiitRtLLi68VcgmOEto=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.42.0/go.mod h1:2qXPNBX1OVRC0IwOnfo1ljoid+RD0QK3443EaqVlsOU=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.42.0 h1:uLXP+3mghfMf7XmV4PkGfFhFKuNWoCvvx5wP/wOXo0o=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.42.0/go.mod h1:v0Tj04armyT59mnURNUJf7RCKcKzq+lgJs6QSjHjaTc=
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.42.0 h1:s/1iRkCKDfhlh1JF26knRneorus8aOwVIDhvYx9WoDw=
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.42.0/go.mod h1:UI3wi0FXg1Pofb8ZBiBLhtMzgoTm1TYkMvn71fAqDzs=
go.opentelemetry.io/otel/log v0.18.0 h1:XgeQIIBjZZrliksMEbcwMZefoOSMI1hdjiLEiiB0bAg=
go.opentelemetry.io/otel/log v0.18.0/go.mod h1:KEV1kad0NofR3ycsiDH4Yjcoj0+8206I6Ox2QYFSNgI=
go.opentelemetry.io/otel/metric v1.42.0 h1:2jXG+3oZLNXEPfNmnpxKDeZsFI5o4J+nz6xUlaFdF/4=
go.opentelemetry.io/otel/metric v1.42.0/go.mod h1:RlUN/7vTU7Ao/diDkEpQpnz3/92J9ko05BIwxYa2SSI=
go.opentelemetry.io/otel/sdk v1.42.0 h1:LyC8+jqk6UJwdrI/8VydAq/hvkFKNHZVIWuslJXYsDo=
go.opentelemetry.io/otel/sdk v1.42.0/go.mod h1:rGHCAxd9DAph0joO4W6OPwxjNTYWghRWmkHuGbayMts=
go.opentelemetry.io/otel/sdk/log v0.18.0 h1:n8OyZr7t7otkeTnPTbDNom6rW16TBYGtvyy2Gk6buQw=
go.opentelemetry.io/otel/sdk/log v0.18.0/go.mod h1:C0+wxkTwKpOCZLrlJ3pewPiiQwpzycPI/u6W0Z9fuYk=
go.opentelemetry.io/otel/sdk/log/logtest v0.18.0 h1:l3mYuPsuBx6UKE47BVcPrZoZ0q/KER57vbj2qkgDLXA=
go.opentelemetry.io/otel/sdk/log/logtest v0.18.0/go.mod h1:7cHtiVJpZebB3wybTa4NG+FUo5NPe3PROz1FqB0+qdw=
go.opentelemetry.io/otel/sdk/metric v1.42.0 h1:D/1QR46Clz6ajyZ3G8SgNlTJKBdGp84q9RKCAZ3YGuA=
go.opentelemetry.io/otel/sdk/metric v1.42.0/go.mod h1:Ua6AAlDKdZ7tdvaQKfSmnFTdHx37+J4ba8MwVCYM5hc=
go.opentelemetry.io/otel/trace v1.42.0 h1:OUCgIPt+mzOnaUTpOQcBiM/PLQ/Op7oq6g4LenLmOYY=
Expand Down
35 changes: 26 additions & 9 deletions internal/controller/api_vms_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,28 +62,37 @@ func (controller *Controller) execVMLegacy(
runCommand string,
wait uint64,
) responder.Responder {
key := execSessionKey{vmName: name}
setupTelemetry := controller.execTelemetry.startSetup(ctx.Request.Context(), key, spec)

// Look-up the VM
waitContext, waitContextCancel := context.WithTimeout(ctx, time.Duration(wait)*time.Second)
defer waitContextCancel()

vm, responderImpl := controller.waitForVM(waitContext, name)
if responderImpl != nil {
setupTelemetry.finish(errors.New("failed to wait for VM"))

return responderImpl
}
setupTelemetry.setVM(vm)

session, err := controller.newSSHExecSession(
ctx,
waitContext,
vm,
execSessionKey{vmName: name},
key,
spec,
runCommand,
nil,
legacyExecSessionPolicy,
setupTelemetry,
)
if err != nil {
setupTelemetry.finish(err)

return responder.JSON(http.StatusServiceUnavailable, NewErrorResponse("%v", err))
}
setupTelemetry.finish(nil)

// Upgrade HTTP request to a WebSocket connection
wsConn, err := websocket.Accept(ctx.Writer, ctx.Request, &websocket.AcceptOptions{
Expand Down Expand Up @@ -130,30 +139,38 @@ func (controller *Controller) execVMReconnectable(
NewErrorResponse("exec session %q does not exist", sessionID))
}

setupTelemetry := controller.execTelemetry.startSetup(ctx.Request.Context(), key, spec)

waitContext, waitContextCancel := context.WithTimeout(ctx, time.Duration(wait)*time.Second)
defer waitContextCancel()

vm, responderImpl := controller.waitForVM(waitContext, name)
if responderImpl != nil {
setupTelemetry.finish(errors.New("failed to wait for VM"))

return responderImpl
}
setupTelemetry.setVM(vm)

var err error
session, _, err = controller.execSessions.getOrCreate(waitContext, key, func() (*execSession, error) {
return controller.newSSHExecSession(
ctx,
waitContext,
vm,
key,
spec,
runCommand,
controller.execSessions,
reconnectableExecSessionPolicy,
setupTelemetry,
)
})
if err != nil {
setupTelemetry.finish(err)

return responder.JSON(http.StatusServiceUnavailable, NewErrorResponse("%v", err))
}
setupTelemetry.finish(nil)

if !session.specMatches(spec) {
return responder.JSON(http.StatusConflict,
Expand All @@ -177,17 +194,15 @@ func (controller *Controller) execVMReconnectable(
}

func (controller *Controller) newSSHExecSession(
_ *gin.Context,
waitContext context.Context,
vm *v1.VM,
key execSessionKey,
spec execSessionSpec,
runCommand string,
registry *execSessionRegistry,
policy execSessionPolicy,
setupTelemetry *execSetupTelemetry,
) (*execSession, error) {
sessionContext, sessionContextCancel := context.WithCancel(context.Background())

type sshExecAttempt struct {
exec *sshexec.Exec
}
Expand Down Expand Up @@ -234,11 +249,12 @@ func (controller *Controller) newSSHExecSession(
}, nil
})
if err != nil {
sessionContextCancel()

return nil, err
}

sessionTraceContext, sessionTelemetry := setupTelemetry.startSession()
sessionContext, sessionContextCancel := context.WithCancel(sessionTraceContext)

return newExecSessionWithContextAndSpec(
sessionContext,
sessionContextCancel,
Expand All @@ -250,6 +266,7 @@ func (controller *Controller) newSSHExecSession(
registry,
controller.execSessionRetentionTTL,
policy,
sessionTelemetry,
), nil
}

Expand All @@ -258,7 +275,7 @@ func (controller *Controller) serveExecSession(
wsConn *websocket.Conn,
session *execSession,
) responder.Responder {
subscriber, err := session.attach()
subscriber, err := session.attach(ctx.Request.Context())
if err != nil {
_ = wsConn.Close(websocket.StatusNormalClosure, err.Error())

Expand Down
6 changes: 6 additions & 0 deletions internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type Controller struct {
sshServer *sshserver.SSHServer
execSessions *execSessionRegistry
execSSHClients *execSSHClientPool
execTelemetry *execTelemetry

single singleflight.Group

Expand Down Expand Up @@ -106,6 +107,11 @@ func New(opts ...Option) (*Controller, error) {
controller.execSSHConnectionKeepaliveInterval,
controller.logger.With("component", "exec-ssh"),
)
execTelemetry, err := newDefaultExecTelemetry()
if err != nil {
return nil, err
}
controller.execTelemetry = execTelemetry

// Instantiate the database
store, err := badger.NewBadgerStore(controller.dataDir.DBPath(), controller.disableDBCompression,
Expand Down
Loading