From cabd03220777f40d02427f7e143cd54a977dabd5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20=C5=9Awi=C4=85tek?= Date: Wed, 22 Oct 2025 21:50:48 +0200 Subject: [PATCH 1/2] Make all beats receiver integration tests use standard tools (#10717) This way, the tests generate diagnostics and ensure agent is not running after test failure. (cherry picked from commit 020adf82d65f358e413b0cb42708db43163f5f65) # Conflicts: # testing/integration/ess/beat_receivers_test.go --- .../integration/ess/beat_receivers_test.go | 283 ++++++++++++++++-- 1 file changed, 251 insertions(+), 32 deletions(-) diff --git a/testing/integration/ess/beat_receivers_test.go b/testing/integration/ess/beat_receivers_test.go index be099336cd1..021f3a73e42 100644 --- a/testing/integration/ess/beat_receivers_test.go +++ b/testing/integration/ess/beat_receivers_test.go @@ -13,7 +13,11 @@ import ( "fmt" "io" "net/http" +<<<<<<< HEAD "os/exec" +======= + "os" +>>>>>>> 020adf82d (Make all beats receiver integration tests use standard tools (#10717)) "runtime" "strings" "testing" @@ -515,17 +519,17 @@ outputs: ctx, cancel := testcontext.WithDeadline(t, t.Context(), time.Now().Add(5*time.Minute)) defer cancel() - fixture, cmd, output := prepareAgentCmd(t, ctx, configContents) + // set up a standalone agent + fixture, err := define.NewFixtureFromLocalBuild(t, define.Version()) + require.NoError(t, err) - err = cmd.Start() + err = fixture.Prepare(ctx) + require.NoError(t, err) + err = fixture.Configure(ctx, configContents) require.NoError(t, err) - t.Cleanup(func() { - if t.Failed() { - t.Log("Elastic-Agent output:") - t.Log(output.String()) - } - }) + output, err := fixture.Install(ctx, &atesting.InstallOpts{Privileged: true, Force: true}) + require.NoError(t, err, "failed to install agent: %s", output) require.Eventually(t, func() bool { err = fixture.IsHealthy(ctx) @@ -570,9 +574,6 @@ outputs: 30*time.Second, 1*time.Second, "Expected to find at least one document for metricset %s in index %s and runtime %q, got 0", mset, index, tt.runtimeExperimental) } - - cancel() - cmd.Wait() }) } @@ -876,27 +877,6 @@ func getBeatStartLogRecords(logs string) []map[string]any { return logRecords } -func prepareAgentCmd(t *testing.T, ctx context.Context, config []byte) (*atesting.Fixture, *exec.Cmd, *strings.Builder) { - // set up a standalone agent - fixture, err := define.NewFixtureFromLocalBuild(t, define.Version()) - require.NoError(t, err) - - err = fixture.Prepare(ctx) - require.NoError(t, err) - err = fixture.Configure(ctx, config) - require.NoError(t, err) - - cmd, err := fixture.PrepareAgentCommand(ctx, nil) - require.NoError(t, err) - cmd.WaitDelay = 1 * time.Second - - var output strings.Builder - cmd.Stderr = &output - cmd.Stdout = &output - - return fixture, cmd, &output -} - func genIgnoredFields(goos string) []string { switch goos { case "windows": @@ -916,3 +896,242 @@ func genIgnoredFields(goos string) []string { } } } +<<<<<<< HEAD +======= + +// TestSensitiveLogsESExporter tests sensitive logs from ex-exporter are not sent to fleet +func TestSensitiveLogsESExporter(t *testing.T) { + // The ES exporter logs the original document on indexing failure only if + // the "telemetry::log_failed_docs_input" setting is enabled and the log level is set to debug. + info := define.Require(t, define.Requirements{ + Group: integration.Default, + Local: true, + Sudo: true, + OS: []define.OS{ + {Type: define.Windows}, + {Type: define.Linux}, + {Type: define.Darwin}, + }, + Stack: &define.Stack{}, + }) + tmpDir := t.TempDir() + numEvents := 50 + // Create the data file to ingest + inputFile, err := os.CreateTemp(tmpDir, "input.txt") + require.NoError(t, err, "failed to create temp file to hold data to ingest") + inputFilePath := inputFile.Name() + + // these messages will fail to index as message is expected to be of integer type + for i := 0; i < numEvents; i++ { + _, err = inputFile.Write([]byte(fmt.Sprintf("Line %d\n", i))) + require.NoErrorf(t, err, "failed to write line %d to temp file", i) + } + err = inputFile.Close() + require.NoError(t, err, "failed to close data temp file") + + fixture, err := define.NewFixtureFromLocalBuild(t, define.Version()) + require.NoError(t, err) + + // Create the otel configuration file + type otelConfigOptions struct { + InputPath string + ESEndpoint string + ESApiKey string + Namespace string + } + esEndpoint, err := integration.GetESHost() + require.NoError(t, err, "error getting elasticsearch endpoint") + esApiKey, err := createESApiKey(info.ESClient) + require.NoError(t, err, "error creating API key") + require.True(t, len(esApiKey.Encoded) > 1, "api key is invalid %q", esApiKey) + decodedApiKey, err := getDecodedApiKey(esApiKey) + require.NoError(t, err) + + configTemplate := ` +inputs: + - type: filestream + id: filestream-e2e + use_output: default + _runtime_experimental: otel + streams: + - id: e2e + data_stream: + dataset: sensitive + namespace: {{ .Namespace }} + paths: + - {{.InputPath}} + prospector.scanner.fingerprint.enabled: false + file_identity.native: ~ +outputs: + default: + type: elasticsearch + hosts: [{{.ESEndpoint}}] + api_key: "{{.ESApiKey}}" +agent: + monitoring: + enabled: true + metrics: false + logs: true + _runtime_experimental: otel +agent.logging.level: debug +agent.logging.stderr: true +` + index := "logs-sensitive-" + info.Namespace + var configBuffer bytes.Buffer + require.NoError(t, + template.Must(template.New("config").Parse(configTemplate)).Execute(&configBuffer, + otelConfigOptions{ + InputPath: inputFilePath, + ESEndpoint: esEndpoint, + ESApiKey: decodedApiKey, + Namespace: info.Namespace, + })) + + ctx, cancel := context.WithTimeout(t.Context(), 5*time.Minute) + defer cancel() + err = fixture.Prepare(ctx) + require.NoError(t, err) + + err = fixture.Configure(ctx, configBuffer.Bytes()) + require.NoError(t, err) + + err = setStrictMapping(info.ESClient, index) + require.NoError(t, err, "could not set strict mapping due to %v", err) + + timestamp := time.Now().UTC().Format("2006-01-02T15:04:05.000Z") + + output, err := fixture.Install(ctx, &atesting.InstallOpts{Privileged: true, Force: true}) + require.NoError(t, err, "Elastic Agent installation failed with error: %w, output: %s", err, string(output)) + + require.EventuallyWithT(t, func(collect *assert.CollectT) { + var statusErr error + status, statusErr := fixture.ExecStatus(ctx) + assert.NoError(collect, statusErr) + assertBeatsHealthy(collect, &status, component.OtelRuntimeManager, 2) + }, 1*time.Minute, 1*time.Second) + + // Check 1: + // Ensure sensitive logs from ES exporter are not shipped to ES + rawQuery := map[string]any{ + "query": map[string]any{ + "bool": map[string]any{ + "must": map[string]any{ + "match_phrase": map[string]any{ + // this message comes from ES exporter + "message": "failed to index document; input may contain sensitive data", + }, + }, + "filter": map[string]any{"range": map[string]any{"@timestamp": map[string]any{"gte": timestamp}}}, + }, + }, + "sort": []map[string]any{ + {"@timestamp": map[string]any{"order": "asc"}}, + }, + } + + var monitoringDoc estools.Documents + assert.EventuallyWithT(t, + func(ct *assert.CollectT) { + findCtx, findCancel := context.WithTimeout(t.Context(), 10*time.Second) + defer findCancel() + + monitoringDoc, err = estools.PerformQueryForRawQuery(findCtx, rawQuery, "logs-elastic_agent-default*", info.ESClient) + require.NoError(ct, err) + + assert.GreaterOrEqual(ct, monitoringDoc.Hits.Total.Value, 1) + }, + 2*time.Minute, 5*time.Second, + "Expected at least %d log, got %d", 1, monitoringDoc.Hits.Total.Value) + + inputField := monitoringDoc.Hits.Hits[0].Source["input"] + inputFieldStr, ok := inputField.(string) + if ok { + // we check if it contains the original message line + assert.NotContains(t, inputFieldStr, "message: Line", "monitoring logs contain original input") + } + + // Check 2: + // Ensure event logs from elastic owned components is not shipped i.e drop_processor works correctly + rawQuery = map[string]any{ + "query": map[string]any{ + "bool": map[string]any{ + "must": map[string]any{ + "match": map[string]any{ + // event logs contain a special field on them + "log.type": "event", + }, + }, + "filter": map[string]any{"range": map[string]any{"@timestamp": map[string]any{"gte": timestamp}}}, + }, + }, + "sort": []map[string]any{ + {"@timestamp": map[string]any{"order": "asc"}}, + }, + } + + findCtx, findCancel := context.WithTimeout(t.Context(), 10*time.Second) + defer findCancel() + + docs, err := estools.GetLogsForIndexWithContext(findCtx, info.ESClient, "logs-elastic_agent*", map[string]interface{}{ + "log.type": "event", + }) + + assert.NoError(t, err) + assert.Zero(t, docs.Hits.Total.Value) +} + +// setStrictMapping takes es client and index name +// and sets strict mapping for that index. +// Useful to reproduce mapping conflicts required for testing +func setStrictMapping(client *elasticsearch.Client, index string) error { + // Define the body + body := map[string]interface{}{ + "index_patterns": []string{index + "*"}, + "template": map[string]interface{}{ + "mappings": map[string]interface{}{ + "dynamic": "strict", + "properties": map[string]interface{}{ + "@timestamp": map[string]string{"type": "date"}, + "message": map[string]string{"type": "integer"}, // we set message type to integer to cause mapping conflict + }, + }, + }, + "priority": 500, + } + + // Marshal body to JSON + jsonData, err := json.Marshal(body) + if err != nil { + panic(err) + } + + esEndpoint, err := integration.GetESHost() + if err != nil { + return fmt.Errorf("error getting elasticsearch endpoint: %v", err) + } + + // Create a context + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // Build request + req, err := http.NewRequestWithContext(ctx, http.MethodPut, + esEndpoint+"/_index_template/no-dynamic-template", + bytes.NewReader(jsonData)) + if err != nil { + return fmt.Errorf("could not create http request to ES server: %v", err) + } + + // Set content type header + req.Header.Set("Content-Type", "application/json") + + resp, err := client.Perform(req) + if err != nil { + return fmt.Errorf("error performing request: %v", err) + } + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("incorrect response code: %v", err) + } + return nil +} +>>>>>>> 020adf82d (Make all beats receiver integration tests use standard tools (#10717)) From 1ddc2329a572ef896ccf404c703225c02cca818d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20=C5=9Awi=C4=85tek?= Date: Fri, 24 Oct 2025 16:57:45 +0200 Subject: [PATCH 2/2] Resolve conflicts --- .../integration/ess/beat_receivers_test.go | 244 ------------------ 1 file changed, 244 deletions(-) diff --git a/testing/integration/ess/beat_receivers_test.go b/testing/integration/ess/beat_receivers_test.go index 021f3a73e42..066fd56d88a 100644 --- a/testing/integration/ess/beat_receivers_test.go +++ b/testing/integration/ess/beat_receivers_test.go @@ -13,11 +13,6 @@ import ( "fmt" "io" "net/http" -<<<<<<< HEAD - "os/exec" -======= - "os" ->>>>>>> 020adf82d (Make all beats receiver integration tests use standard tools (#10717)) "runtime" "strings" "testing" @@ -896,242 +891,3 @@ func genIgnoredFields(goos string) []string { } } } -<<<<<<< HEAD -======= - -// TestSensitiveLogsESExporter tests sensitive logs from ex-exporter are not sent to fleet -func TestSensitiveLogsESExporter(t *testing.T) { - // The ES exporter logs the original document on indexing failure only if - // the "telemetry::log_failed_docs_input" setting is enabled and the log level is set to debug. - info := define.Require(t, define.Requirements{ - Group: integration.Default, - Local: true, - Sudo: true, - OS: []define.OS{ - {Type: define.Windows}, - {Type: define.Linux}, - {Type: define.Darwin}, - }, - Stack: &define.Stack{}, - }) - tmpDir := t.TempDir() - numEvents := 50 - // Create the data file to ingest - inputFile, err := os.CreateTemp(tmpDir, "input.txt") - require.NoError(t, err, "failed to create temp file to hold data to ingest") - inputFilePath := inputFile.Name() - - // these messages will fail to index as message is expected to be of integer type - for i := 0; i < numEvents; i++ { - _, err = inputFile.Write([]byte(fmt.Sprintf("Line %d\n", i))) - require.NoErrorf(t, err, "failed to write line %d to temp file", i) - } - err = inputFile.Close() - require.NoError(t, err, "failed to close data temp file") - - fixture, err := define.NewFixtureFromLocalBuild(t, define.Version()) - require.NoError(t, err) - - // Create the otel configuration file - type otelConfigOptions struct { - InputPath string - ESEndpoint string - ESApiKey string - Namespace string - } - esEndpoint, err := integration.GetESHost() - require.NoError(t, err, "error getting elasticsearch endpoint") - esApiKey, err := createESApiKey(info.ESClient) - require.NoError(t, err, "error creating API key") - require.True(t, len(esApiKey.Encoded) > 1, "api key is invalid %q", esApiKey) - decodedApiKey, err := getDecodedApiKey(esApiKey) - require.NoError(t, err) - - configTemplate := ` -inputs: - - type: filestream - id: filestream-e2e - use_output: default - _runtime_experimental: otel - streams: - - id: e2e - data_stream: - dataset: sensitive - namespace: {{ .Namespace }} - paths: - - {{.InputPath}} - prospector.scanner.fingerprint.enabled: false - file_identity.native: ~ -outputs: - default: - type: elasticsearch - hosts: [{{.ESEndpoint}}] - api_key: "{{.ESApiKey}}" -agent: - monitoring: - enabled: true - metrics: false - logs: true - _runtime_experimental: otel -agent.logging.level: debug -agent.logging.stderr: true -` - index := "logs-sensitive-" + info.Namespace - var configBuffer bytes.Buffer - require.NoError(t, - template.Must(template.New("config").Parse(configTemplate)).Execute(&configBuffer, - otelConfigOptions{ - InputPath: inputFilePath, - ESEndpoint: esEndpoint, - ESApiKey: decodedApiKey, - Namespace: info.Namespace, - })) - - ctx, cancel := context.WithTimeout(t.Context(), 5*time.Minute) - defer cancel() - err = fixture.Prepare(ctx) - require.NoError(t, err) - - err = fixture.Configure(ctx, configBuffer.Bytes()) - require.NoError(t, err) - - err = setStrictMapping(info.ESClient, index) - require.NoError(t, err, "could not set strict mapping due to %v", err) - - timestamp := time.Now().UTC().Format("2006-01-02T15:04:05.000Z") - - output, err := fixture.Install(ctx, &atesting.InstallOpts{Privileged: true, Force: true}) - require.NoError(t, err, "Elastic Agent installation failed with error: %w, output: %s", err, string(output)) - - require.EventuallyWithT(t, func(collect *assert.CollectT) { - var statusErr error - status, statusErr := fixture.ExecStatus(ctx) - assert.NoError(collect, statusErr) - assertBeatsHealthy(collect, &status, component.OtelRuntimeManager, 2) - }, 1*time.Minute, 1*time.Second) - - // Check 1: - // Ensure sensitive logs from ES exporter are not shipped to ES - rawQuery := map[string]any{ - "query": map[string]any{ - "bool": map[string]any{ - "must": map[string]any{ - "match_phrase": map[string]any{ - // this message comes from ES exporter - "message": "failed to index document; input may contain sensitive data", - }, - }, - "filter": map[string]any{"range": map[string]any{"@timestamp": map[string]any{"gte": timestamp}}}, - }, - }, - "sort": []map[string]any{ - {"@timestamp": map[string]any{"order": "asc"}}, - }, - } - - var monitoringDoc estools.Documents - assert.EventuallyWithT(t, - func(ct *assert.CollectT) { - findCtx, findCancel := context.WithTimeout(t.Context(), 10*time.Second) - defer findCancel() - - monitoringDoc, err = estools.PerformQueryForRawQuery(findCtx, rawQuery, "logs-elastic_agent-default*", info.ESClient) - require.NoError(ct, err) - - assert.GreaterOrEqual(ct, monitoringDoc.Hits.Total.Value, 1) - }, - 2*time.Minute, 5*time.Second, - "Expected at least %d log, got %d", 1, monitoringDoc.Hits.Total.Value) - - inputField := monitoringDoc.Hits.Hits[0].Source["input"] - inputFieldStr, ok := inputField.(string) - if ok { - // we check if it contains the original message line - assert.NotContains(t, inputFieldStr, "message: Line", "monitoring logs contain original input") - } - - // Check 2: - // Ensure event logs from elastic owned components is not shipped i.e drop_processor works correctly - rawQuery = map[string]any{ - "query": map[string]any{ - "bool": map[string]any{ - "must": map[string]any{ - "match": map[string]any{ - // event logs contain a special field on them - "log.type": "event", - }, - }, - "filter": map[string]any{"range": map[string]any{"@timestamp": map[string]any{"gte": timestamp}}}, - }, - }, - "sort": []map[string]any{ - {"@timestamp": map[string]any{"order": "asc"}}, - }, - } - - findCtx, findCancel := context.WithTimeout(t.Context(), 10*time.Second) - defer findCancel() - - docs, err := estools.GetLogsForIndexWithContext(findCtx, info.ESClient, "logs-elastic_agent*", map[string]interface{}{ - "log.type": "event", - }) - - assert.NoError(t, err) - assert.Zero(t, docs.Hits.Total.Value) -} - -// setStrictMapping takes es client and index name -// and sets strict mapping for that index. -// Useful to reproduce mapping conflicts required for testing -func setStrictMapping(client *elasticsearch.Client, index string) error { - // Define the body - body := map[string]interface{}{ - "index_patterns": []string{index + "*"}, - "template": map[string]interface{}{ - "mappings": map[string]interface{}{ - "dynamic": "strict", - "properties": map[string]interface{}{ - "@timestamp": map[string]string{"type": "date"}, - "message": map[string]string{"type": "integer"}, // we set message type to integer to cause mapping conflict - }, - }, - }, - "priority": 500, - } - - // Marshal body to JSON - jsonData, err := json.Marshal(body) - if err != nil { - panic(err) - } - - esEndpoint, err := integration.GetESHost() - if err != nil { - return fmt.Errorf("error getting elasticsearch endpoint: %v", err) - } - - // Create a context - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - // Build request - req, err := http.NewRequestWithContext(ctx, http.MethodPut, - esEndpoint+"/_index_template/no-dynamic-template", - bytes.NewReader(jsonData)) - if err != nil { - return fmt.Errorf("could not create http request to ES server: %v", err) - } - - // Set content type header - req.Header.Set("Content-Type", "application/json") - - resp, err := client.Perform(req) - if err != nil { - return fmt.Errorf("error performing request: %v", err) - } - if resp.StatusCode != http.StatusOK { - return fmt.Errorf("incorrect response code: %v", err) - } - return nil -} ->>>>>>> 020adf82d (Make all beats receiver integration tests use standard tools (#10717))