diff --git a/README.md b/README.md index be9916e46..1ddc51deb 100644 --- a/README.md +++ b/README.md @@ -66,6 +66,32 @@ func init() { ### Collecting Metrics Once the collector has been initialized with `instana.InitCollector`, application metrics such as memory, CPU consumption, active goroutine count etc will be automatically collected and reported to the Agent without further actions or configurations to the SDK. + +#### Metrics Transmission Interval + +Metrics are transmitted to the Instana Agent at a configurable interval, which is set in the agent's `configuration.yaml` file. + +**Configuration:** + +In the agent's `configuration.yaml`: +```yaml +# Configure the metrics transmission interval for Go applications +com.instana.plugin.golang: + poll_rate: 5 # Valid range: 1-3600 (seconds) +``` + +**Valid Values:** +- Any integer value between `1` and `3600` seconds (inclusive). +- Default: `1` second (if not configured). +- Minimum: `1` second. +- Maximum: `3600` seconds (1 hour). + +**Behavior:** +- If `poll_rate` is not configured, the interval defaults to 1 second. +- Values less than 1 are raised to the minimum value of 1 second. +- Values greater than 3600 are lowered to the maximum value of 3600 seconds. +- The configuration is applied when the Go sensor announces itself to the agent. + This data is then already available in the dashboard. 
### Tracing Calls diff --git a/agent.go b/agent.go index 84fbcbb0a..a010f1424 100644 --- a/agent.go +++ b/agent.go @@ -54,6 +54,9 @@ type agentResponse struct { ExtraHTTPHeaders []string `json:"extra-http-headers"` Disable []map[string]bool `json:"disable"` } `json:"tracing"` + PluginConfig struct { + PollRate int `json:"poll_rate"` // Poll rate in seconds + } `json:"plugin.golang"` } func (a *agentResponse) getExtraHTTPHeaders() []string { diff --git a/fsm.go b/fsm.go index 4601028ad..b98dc4a48 100644 --- a/fsm.go +++ b/fsm.go @@ -268,10 +268,30 @@ func (r *fsmS) applyHostAgentSettings(resp agentResponse) { } r.applyDisableTracingConfig(resp) + r.applyMetricsPollRateConfig(resp) r.logger.Debug("CollectableHTTPHeaders used: ", sensor.options.Tracer.CollectableHTTPHeaders) } +// applyMetricsPollRateConfig applies the metrics poll rate configuration from agent response. +func (r *fsmS) applyMetricsPollRateConfig(resp agentResponse) { + // Check if sensor is initialized + if sensor == nil || sensor.options == nil { + r.logger.Debug("Sensor not initialized, skipping poll_rate configuration") + return + } + + // If no poll rate is provided by agent, use default (1 second) + if resp.PluginConfig.PollRate == 0 { + r.logger.Debug("No poll_rate configuration received from agent, using default 1 second") + sensor.options.Metrics.setTransmissionInterval(1) + return + } + + r.logger.Debug("Applying metrics poll_rate configuration from agent: ", resp.PluginConfig.PollRate, " second(s)") + sensor.options.Metrics.setTransmissionInterval(resp.PluginConfig.PollRate) +} + func (r *fsmS) applyDisableTracingConfig(resp agentResponse) { // Do nothing if we have no configuration from the agent if len(resp.Tracing.Disable) == 0 { @@ -420,6 +440,7 @@ func (r *fsmS) reset() { func (r *fsmS) ready(_ context.Context, e *f.Event) { go delayed.flush() + go sensor.meter.reset(sensor.options.Metrics.getTransmissionInterval()) } func (r *fsmS) cpuSetFileContent(pid int) string { diff --git 
a/fsm_test.go b/fsm_test.go index 6c8e69581..a293597ae 100644 --- a/fsm_test.go +++ b/fsm_test.go @@ -635,3 +635,77 @@ func TestApplyDisableTracingConfig(t *testing.T) { }) } } + +func Test_fsmS_applyMetricsPollRateConfig(t *testing.T) { + tests := []struct { + name string + pollRate int + expectedSecs int + }{ + { + name: "Valid 1 second (minimum)", + pollRate: 1, + expectedSecs: 1, + }, + { + name: "Valid 5 seconds", + pollRate: 5, + expectedSecs: 5, + }, + { + name: "Valid 10 seconds", + pollRate: 10, + expectedSecs: 10, + }, + { + name: "Valid 60 seconds", + pollRate: 60, + expectedSecs: 60, + }, + { + name: "Valid 3600 seconds (maximum)", + pollRate: 3600, + expectedSecs: 3600, + }, + { + name: "Zero seconds - sets to minimum (1)", + pollRate: 0, + expectedSecs: 1, + }, + { + name: "Negative value - sets to minimum (1)", + pollRate: -5, + expectedSecs: 1, + }, + { + name: "Exceeds maximum (5000) - sets to maximum (3600)", + pollRate: 5000, + expectedSecs: 3600, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Initialize sensor with default options + sensor = newSensor(DefaultOptions()) + defer func() { sensor = nil }() + + fsm := &fsmS{ + logger: &testLogger{}, + } + + resp := agentResponse{ + PluginConfig: struct { + PollRate int `json:"poll_rate"` + }{ + PollRate: tt.pollRate, + }, + } + + fsm.applyMetricsPollRateConfig(resp) + + interval := sensor.options.Metrics.getTransmissionInterval() + assert.Equal(t, time.Duration(tt.expectedSecs)*time.Second, interval) + }) + } +} diff --git a/meter.go b/meter.go index ef77bca6e..8fff6930a 100644 --- a/meter.go +++ b/meter.go @@ -5,11 +5,19 @@ package instana import ( "runtime" + "sync" "time" "github.com/instana/go-sensor/acceptor" ) +const ( + // Metrics transmission interval constraints (in seconds) + defaultTransmissionInterval = 1 + minTransmissionInterval = 1 + maxTransmissionInterval = 3600 +) + // SnapshotS struct to hold snapshot data type SnapshotS acceptor.RuntimeInfo @@ 
-23,8 +31,57 @@ type MetricsS acceptor.Metrics type EntityData acceptor.GoProcessData type meterS struct { - numGC uint32 - done chan struct{} + numGC uint32 + running bool + done chan struct{} + mu sync.Mutex +} + +// MetricsOptions contains configuration for metrics collection and transmission. +// This configuration is managed internally and populated from agent configuration. +type MetricsOptions struct { + mu sync.RWMutex + transmissionInterval time.Duration +} + +// GetTransmissionInterval returns the current metrics transmission interval. +// This value is configured through the agent's configuration.yaml file. +func (m *MetricsOptions) getTransmissionInterval() time.Duration { + m.mu.RLock() + defer m.mu.RUnlock() + + if m.transmissionInterval == 0 { + return defaultTransmissionInterval * time.Second + } + return m.transmissionInterval +} + +// setTransmissionInterval sets the metrics transmission interval. +// This is an internal method called when agent configuration is received. +// Valid range: minTransmissionInterval-maxTransmissionInterval seconds. +// Values < minTransmissionInterval are set to minTransmissionInterval, +// values > maxTransmissionInterval are set to maxTransmissionInterval. +func (m *MetricsOptions) setTransmissionInterval(seconds int) { + m.mu.Lock() + defer m.mu.Unlock() + + // Apply minimum value constraint + if seconds < minTransmissionInterval { + defaultLogger.Warn("poll_rate value from agent (", seconds, ") is less than minimum. Setting to minimum value of ", minTransmissionInterval, " second.") + m.transmissionInterval = minTransmissionInterval * time.Second + return + } + + // Apply maximum value constraint + if seconds > maxTransmissionInterval { + defaultLogger.Warn("poll_rate value from agent (", seconds, ") exceeds maximum. 
Setting to maximum value of ", maxTransmissionInterval, " seconds.") + m.transmissionInterval = maxTransmissionInterval * time.Second + return + } + + // Valid value within range + m.transmissionInterval = time.Duration(seconds) * time.Second + defaultLogger.Info("Metrics transmission interval set to ", seconds, " second(s) from agent configuration") } func newMeter(logger LeveledLogger) *meterS { @@ -35,31 +92,69 @@ func newMeter(logger LeveledLogger) *meterS { } } +func (m *meterS) prepareRun() chan struct{} { + m.mu.Lock() + defer m.mu.Unlock() + + // If already running, stop first + if m.running { + close(m.done) + } + + // Create new channel and mark as running + m.done = make(chan struct{}) + m.running = true + + return m.done +} + func (m *meterS) Run(collectInterval time.Duration) { - ticker := time.NewTicker(collectInterval) - defer ticker.Stop() - for { - select { - case <-m.done: - return - case <-ticker.C: - if isAgentReady() { - go func() { - s, err := getSensor() - if err != nil { - defaultLogger.Error("meter: ", err.Error()) - return - } - - _ = s.Agent().SendMetrics(m.collectMetrics()) - }() + done := m.prepareRun() + + go func(done chan struct{}) { + ticker := time.NewTicker(collectInterval) + defer ticker.Stop() + for { + select { + case <-done: + return + case <-ticker.C: + if isAgentReady() { + go func() { + s, err := getSensor() + if err != nil { + defaultLogger.Error("meter: ", err.Error()) + return + } + + _ = s.Agent().SendMetrics(m.collectMetrics()) + }() + } } } + }(done) +} + +func (m *meterS) reset(interval time.Duration) { + if m == nil { + return } + m.Stop() + m.Run(interval) } func (m *meterS) Stop() { - m.done <- struct{}{} + m.mu.Lock() + defer m.mu.Unlock() + + if m == nil { + return + } + + if m.running { + close(m.done) + m.running = false + } } func (m *meterS) collectMemoryMetrics() acceptor.MemoryStats { diff --git a/meter_test.go b/meter_test.go index edcbdb1d0..606c17f1f 100644 --- a/meter_test.go +++ b/meter_test.go @@ 
// TestMetricsOptions_GetTransmissionInterval_Default checks that a zero-value
// MetricsOptions reports a transmission interval of one second.
func TestMetricsOptions_GetTransmissionInterval_Default(t *testing.T) {
	opts := &MetricsOptions{}

	interval := opts.getTransmissionInterval()

	assert.Equal(t, 1*time.Second, interval, "Default transmission interval should be 1 second")
}

// TestMetricsOptions_SetTransmissionInterval checks, case by case, the interval
// reported by getTransmissionInterval after setTransmissionInterval is called
// with in-range, zero, negative, and too-large values.
func TestMetricsOptions_SetTransmissionInterval(t *testing.T) {
	tests := []struct {
		name     string
		seconds  int
		expected time.Duration
	}{
		{
			name:     "Valid 1 second (minimum)",
			seconds:  1,
			expected: 1 * time.Second,
		},
		{
			name:     "Valid 5 seconds",
			seconds:  5,
			expected: 5 * time.Second,
		},
		{
			name:     "Valid 60 seconds",
			seconds:  60,
			expected: 60 * time.Second,
		},
		{
			name:     "Valid 300 seconds",
			seconds:  300,
			expected: 300 * time.Second,
		},
		{
			name:     "Valid 3600 seconds (maximum)",
			seconds:  3600,
			expected: 3600 * time.Second,
		},
		{
			name:     "Zero seconds sets to minimum (1 second)",
			seconds:  0,
			expected: 1 * time.Second,
		},
		{
			name:     "Negative value sets to minimum (1 second)",
			seconds:  -1,
			expected: 1 * time.Second,
		},
		{
			name:     "Negative value -100 sets to minimum (1 second)",
			seconds:  -100,
			expected: 1 * time.Second,
		},
		{
			name:     "Value exceeding maximum (3601) sets to maximum (3600 seconds)",
			seconds:  3601,
			expected: 3600 * time.Second,
		},
		{
			name:     "Value exceeding maximum (5000) sets to maximum (3600 seconds)",
			seconds:  5000,
			expected: 3600 * time.Second,
		},
		{
			name:     "Value exceeding maximum (10000) sets to maximum (3600 seconds)",
			seconds:  10000,
			expected: 3600 * time.Second,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// A fresh MetricsOptions per case keeps the cases independent.
			opts := &MetricsOptions{}

			opts.setTransmissionInterval(tt.seconds)

			assert.Equal(t, tt.expected, opts.getTransmissionInterval())
		})
	}
}

// TestMeterS_Reset checks the meter's running flag across reset calls: on a
// meter that is already running, after several resets in a row, and on a meter
// that was never started.
func TestMeterS_Reset(t *testing.T) {
	t.Run("reset running meter", func(t *testing.T) {
		m := newMeter(defaultLogger)
		var wg sync.WaitGroup
		wg.Add(1)

		go func() {
			defer wg.Done()
			m.Run(100 * time.Millisecond)
		}()

		// Give the wrapper goroutine time to call Run.
		time.Sleep(150 * time.Millisecond)

		m.mu.Lock()
		assert.True(t, m.running, "Meter should be running before reset")
		m.mu.Unlock()

		m.reset(200 * time.Millisecond)
		time.Sleep(100 * time.Millisecond)

		m.mu.Lock()
		assert.True(t, m.running, "Meter should be running after reset")
		m.mu.Unlock()

		m.Stop()

		// Wait for the wrapper goroutine, bounded by a timeout below.
		// NOTE(review): Run starts its loop in its own goroutine and returns
		// right away, so this waits for the wrapper only, not for the ticker
		// loop itself — confirm this is the intended check.
		done := make(chan struct{})
		go func() {
			wg.Wait()
			close(done)
		}()

		select {
		case <-done:
		case <-time.After(2 * time.Second):
			t.Fatal("meter.Run() did not exit after Stop() was called")
		}
	})

	t.Run("multiple resets with different intervals", func(t *testing.T) {
		m := newMeter(defaultLogger)

		m.Run(100 * time.Millisecond)
		time.Sleep(150 * time.Millisecond)

		m.reset(50 * time.Millisecond)
		time.Sleep(100 * time.Millisecond)

		m.reset(200 * time.Millisecond)
		time.Sleep(100 * time.Millisecond)

		// Read the flag under the lock, assert outside of it.
		m.mu.Lock()
		running := m.running
		m.mu.Unlock()

		assert.True(t, running, "Meter should still be running after multiple resets")

		m.Stop()

		m.mu.Lock()
		assert.False(t, m.running, "Meter should be stopped")
		m.mu.Unlock()
	})

	t.Run("reset without initial run", func(t *testing.T) {
		m := newMeter(defaultLogger)

		m.mu.Lock()
		assert.False(t, m.running, "Meter should not be running initially")
		m.mu.Unlock()

		// reset on a never-started meter is expected to start it.
		m.reset(100 * time.Millisecond)
		time.Sleep(150 * time.Millisecond)

		m.mu.Lock()
		running := m.running
		m.mu.Unlock()

		assert.True(t, running, "Meter should be running after reset")

		m.Stop()

		m.mu.Lock()
		assert.False(t, m.running, "Meter should be stopped")
		m.mu.Unlock()
	})
}

// TestMeterS_ConcurrentOperations calls Stop, Run and reset from many
// goroutines at once and then checks the final running flag.
func TestMeterS_ConcurrentOperations(t *testing.T) {
	t.Run("concurrent stop and run", func(t *testing.T) {
		m := newMeter(defaultLogger)

		m.Run(100 * time.Millisecond)
		time.Sleep(50 * time.Millisecond)

		// 10 Stop calls and 10 Run calls race against each other.
		var wg sync.WaitGroup
		for i := 0; i < 10; i++ {
			wg.Add(2)
			go func() {
				defer wg.Done()
				m.Stop()
			}()
			go func() {
				defer wg.Done()
				m.Run(100 * time.Millisecond)
			}()
		}

		wg.Wait()
		// The interleaving above is arbitrary, so stop explicitly before
		// asserting the final state.
		m.Stop()

		m.mu.Lock()
		assert.False(t, m.running, "Meter should be stopped after concurrent operations")
		m.mu.Unlock()
	})

	t.Run("concurrent reset", func(t *testing.T) {
		m := newMeter(defaultLogger)

		m.Run(100 * time.Millisecond)
		time.Sleep(50 * time.Millisecond)

		var wg sync.WaitGroup
		for i := 0; i < 5; i++ {
			wg.Add(1)
			// The interval (50ms, 60ms, ... 90ms) is passed as an argument, so
			// it is computed before the goroutine starts.
			go func(interval time.Duration) {
				defer wg.Done()
				m.reset(interval)
			}(time.Duration(50+i*10) * time.Millisecond)
		}

		wg.Wait()
		time.Sleep(100 * time.Millisecond)

		m.mu.Lock()
		running := m.running
		m.mu.Unlock()

		// Every reset ends with Run, so the last operation overall is a Run.
		assert.True(t, running, "Meter should be running after concurrent resets")

		m.Stop()

		m.mu.Lock()
		assert.False(t, m.running, "Meter should be stopped")
		m.mu.Unlock()
	})
}

// TestMeterS_StopAndRestart checks that Stop can be called repeatedly and that
// a stopped meter can be started again with Run.
func TestMeterS_StopAndRestart(t *testing.T) {
	t.Run("stop multiple times", func(t *testing.T) {
		m := newMeter(defaultLogger)

		m.Run(100 * time.Millisecond)
		time.Sleep(50 * time.Millisecond)

		// Stop multiple times - should not panic
		m.Stop()
		m.Stop()
		m.Stop()

		m.mu.Lock()
		assert.False(t, m.running, "Meter should be stopped")
		m.mu.Unlock()
	})

	t.Run("run after stop", func(t *testing.T) {
		m := newMeter(defaultLogger)

		m.Run(100 * time.Millisecond)
		time.Sleep(150 * time.Millisecond)

		m.Stop()
		time.Sleep(50 * time.Millisecond)

		m.mu.Lock()
		assert.False(t, m.running, "Meter should be stopped")
		m.mu.Unlock()

		// Start again
		m.Run(100 * time.Millisecond)
		time.Sleep(150 * time.Millisecond)

		m.mu.Lock()
		running := m.running
		m.mu.Unlock()

		assert.True(t, running, "Meter should be running after restart")

		m.Stop()
	})
}

// TestMeterS_Reset_InternalState checks what reset does to the meter's fields:
// numGC must not go backwards, and done must be replaced by a new channel.
func TestMeterS_Reset_InternalState(t *testing.T) {
	t.Run("preserves numGC", func(t *testing.T) {
		m := newMeter(defaultLogger)

		// Collect once before sampling numGC as the baseline.
		// NOTE(review): presumably collectMetrics is what updates numGC —
		// confirm against its implementation.
		_ = m.collectMetrics()

		m.mu.Lock()
		initialNumGC := m.numGC
		m.mu.Unlock()

		m.Run(100 * time.Millisecond)
		time.Sleep(50 * time.Millisecond)
		m.reset(200 * time.Millisecond)
		time.Sleep(50 * time.Millisecond)

		m.mu.Lock()
		currentNumGC := m.numGC
		m.mu.Unlock()

		assert.GreaterOrEqual(t, currentNumGC, initialNumGC, "numGC should be preserved or increased")

		m.Stop()
	})

	t.Run("creates new done channel", func(t *testing.T) {
		m := newMeter(defaultLogger)

		m.Run(100 * time.Millisecond)
		time.Sleep(50 * time.Millisecond)

		// Capture the channel of the first loop.
		m.mu.Lock()
		firstDone := m.done
		m.mu.Unlock()

		m.reset(200 * time.Millisecond)
		time.Sleep(50 * time.Millisecond)

		// Capture the channel of the loop started by reset.
		m.mu.Lock()
		secondDone := m.done
		m.mu.Unlock()

		assert.NotEqual(t, firstDone, secondDone, "Reset should create a new done channel")

		m.Stop()
	})
}
diff --git a/sensor.go b/sensor.go index 332ef95f4..5acf84a90 100644 --- a/sensor.go +++ b/sensor.go @@ -115,6 +115,7 @@ func newSensor(options *Options) *sensorS { } var agent AgentClient + var isServerless bool if options.AgentClient != nil { agent = options.AgentClient @@ -122,6 +123,7 @@ func newSensor(options *Options) *sensorS { if agentEndpoint := os.Getenv("INSTANA_ENDPOINT_URL"); agentEndpoint != "" && agent == nil { s.logger.Debug("INSTANA_ENDPOINT_URL= is set, switching to the serverless mode") + isServerless = true timeout, err := parseInstanaTimeout(os.Getenv("INSTANA_TIMEOUT")) if err != nil { @@ -142,12 +144,17 @@ func newSensor(options *Options) *sensorS { agent = newServerlessAgent(s.serviceOrBinaryName(), agentEndpoint, os.Getenv("INSTANA_AGENT_KEY"), client, s.logger) } + s.meter = newMeter(s.logger) if agent == nil { agent = newAgent(s.serviceOrBinaryName(), s.options.AgentHost, s.options.AgentPort, s.logger) } s.setAgent(agent) - s.meter = newMeter(s.logger) + + // For serverless agents, start the meter immediately since they don't use the FSM + if isServerless { + s.meter.Run(s.options.Metrics.getTransmissionInterval()) + } return s } @@ -224,9 +231,6 @@ func InitSensor(options *Options) { // configure auto-profiling configureAutoProfiling(options) - // start collecting metrics - go sensor.meter.Run(1 * time.Second) - sensor.logger.Debug("initialized Instana sensor v", Version) }