From 9cfe35c4d7cbfe77c11a99afe1f6a9aa465dfa0c Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Fri, 17 Oct 2025 18:57:08 +0530 Subject: [PATCH 01/10] added initial working cel code without auth --- .../data_stream/siem/agent/stream/cel.yml.hbs | 109 ++++++++++++++++++ 1 file changed, 109 insertions(+) create mode 100644 packages/akamai/data_stream/siem/agent/stream/cel.yml.hbs diff --git a/packages/akamai/data_stream/siem/agent/stream/cel.yml.hbs b/packages/akamai/data_stream/siem/agent/stream/cel.yml.hbs new file mode 100644 index 00000000000..1e8575fc6fb --- /dev/null +++ b/packages/akamai/data_stream/siem/agent/stream/cel.yml.hbs @@ -0,0 +1,109 @@ +config_version: 2 +interval: {{interval}} +resource: + url: https://{{api_host}}/siem/v1/configs/{{config_ids}} + {{#if ssl}} + ssl: {{ssl}} + {{/if}} + {{#if http_client_timeout}} + timeout: {{http_client_timeout}} + {{/if}} + {{#if proxy_url }} + proxy_url: {{proxy_url}} + {{/if}} + tracer: + enabled: {{enable_request_tracer}} + filename: "../../logs/cel/http-request-trace-*.ndjson" + maxbackups: 5 + +state: + client_token: {{client_token}} + access_token: {{access_token}} + client_secret: {{client_secret}} + initial_interval: {{initial_interval}} + event_limit: {{event_limit}} + +program: | + state.with( + ( + state.?cursor.?recovery_mode.orValue(false) ? + { + "from": int(now - duration("12h")), + "to": int(now) + } + : + state.?cursor.?offset.hasValue() ? + { + "offset": state.cursor.offset + } + : + { + "from": max(int(now - duration(state.initial_interval)),int(now - duration("12h"))), + "to": int(now) + } + ).as(params, + request( + "GET", + state.url + "?" + { + "limit": [string(state.event_limit)], + ?"from": params.?from.orValue("") != "" ? optional.of([string(params.from)]) : optional.none(), + ?"to": params.?to.orValue("") != "" ? optional.of([string(params.to)]) : optional.none(), + ?"offset": params.?offset.orValue("") != "" ? optional.of([string(params.offset)]) : optional.none(), + }.format_query() + ).with({ + "Header": + { + "Authorization": ["CEL " + state.client_token + ":" + state.access_token] + } + }).do_request().as(resp, resp.StatusCode == 200 ? + { + "events": string(resp.Body).split("\n").map(e, e!="", + { + "message": e, + } + ), + "cursor": { + ?"offset": resp.Body.?offset.orValue("") != "" ? optional.of(resp.Body.offset) : optional.none(), + "recovery_mode": false + }, + "want_more": has(resp.Body.offset) && resp.Body.offset != "" + } + : + resp.StatusCode == 406 ? + { + "events": { + "error": { + "code": string(resp.StatusCode), + "id": string(resp.Status), + "message": "GET " + state.url.trim_right("/") + ( + size(resp.Body) != 0 ? + string(resp.Body) + : + string(resp.Status) + ' (' + string(resp.StatusCode) + ')' + ), + }, + }, + "cursor": { + "recovery_mode": true + }, + "want_more": true + } + : + { + "events": { + "error": { + "code": string(resp.StatusCode), + "id": string(resp.Status), + "message": "GET " + state.url.trim_right("/") + ( + size(resp.Body) != 0 ? + string(resp.Body) + : + string(resp.Status) + ' (' + string(resp.StatusCode) + ')' + ), + }, + }, + "want_more": false + } + ) + ) + ) \ No newline at end of file From 5c11cffb82da77a3ad241e91f02fdf216cf0ca1f Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Mon, 20 Oct 2025 18:58:48 +0530 Subject: [PATCH 02/10] updated with working CEL code with proper auth signature --- .../data_stream/siem/agent/stream/cel.yml.hbs | 111 +++++++++++------- 1 file changed, 68 insertions(+), 43 deletions(-) diff --git a/packages/akamai/data_stream/siem/agent/stream/cel.yml.hbs b/packages/akamai/data_stream/siem/agent/stream/cel.yml.hbs index 1e8575fc6fb..427e77a77eb 100644 --- a/packages/akamai/data_stream/siem/agent/stream/cel.yml.hbs +++ b/packages/akamai/data_stream/siem/agent/stream/cel.yml.hbs @@ -22,7 +22,13 @@ state: client_secret: {{client_secret}} initial_interval: {{initial_interval}} event_limit: {{event_limit}} - + +redact: + fields: + - client_secret + - access_token + - client_token + program: | state.with( ( @@ -38,37 +44,61 @@ program: | } : { - "from": max(int(now - duration(state.initial_interval)),int(now - duration("12h"))), + "from": max(int(now - duration(state.initial_interval)), int(now - duration("12h"))), "to": int(now) } ).as(params, - request( - "GET", - state.url + "?" + { - "limit": [string(state.event_limit)], - ?"from": params.?from.orValue("") != "" ? optional.of([string(params.from)]) : optional.none(), - ?"to": params.?to.orValue("") != "" ? optional.of([string(params.to)]) : optional.none(), - ?"offset": params.?offset.orValue("") != "" ? optional.of([string(params.offset)]) : optional.none(), - }.format_query() - ).with({ - "Header": - { - "Authorization": ["CEL " + state.client_token + ":" + state.access_token] + now.format("20060102T15:04:05-0700").as(timestamp, + uuid().as(nonce, + sprintf("EG1-HMAC-SHA256 client_token=%s;access_token=%s;timestamp=%s;nonce=%s;", + [state.client_token, state.access_token, timestamp, nonce]).as(sig_base, + base64(hmac(timestamp, "sha256", bytes(state.client_secret))).as(sig_key, + (state.url + "?" + { + "limit": [string(state.event_limit)], + ?"from": params.?from.orValue("") != "" ? optional.of([string(params.from)]) : optional.none(), + ?"to": params.?to.orValue("") != "" ? optional.of([string(params.to)]) : optional.none(), + ?"offset": params.?offset.orValue("") != "" ? optional.of([string(params.offset)]) : optional.none(), + }.format_query()).parse_url().as(u, + ( + "GET\t" + + string(u.Scheme) + "\t" + + string(u.Host) + "\t" + + string(u.Path) + "?" + + string(u.RawQuery) + + "\t\t\t" + sig_base + ).as(to_sign, + base64(hmac(to_sign, "sha256", bytes(sig_key))).as(signature, + sig_base + "signature=" + signature + ) + ) + ) + ) + ) + ) + ).as(auth_header, + request( + "GET", + state.url + "?" + { + "limit": [string(state.event_limit)], + ?"from": params.?from.orValue("") != "" ? optional.of([string(params.from)]) : optional.none(), + ?"to": params.?to.orValue("") != "" ? optional.of([string(params.to)]) : optional.none(), + ?"offset": params.?offset.orValue("") != "" ? optional.of([string(params.offset)]) : optional.none(), + }.format_query() + ).with({ + "Header": { + "Authorization": [auth_header] } - }).do_request().as(resp, resp.StatusCode == 200 ? - { - "events": string(resp.Body).split("\n").map(e, e!="", - { - "message": e, - } - ), - "cursor": { - ?"offset": resp.Body.?offset.orValue("") != "" ? optional.of(resp.Body.offset) : optional.none(), - "recovery_mode": false - }, - "want_more": has(resp.Body.offset) && resp.Body.offset != "" - } - : + }).do_request().as(resp, + resp.StatusCode == 200 ? + { + "events": string(resp.Body).split("\n").map(e, e!="", { "message": e }), + "cursor": { + ?"offset": resp.Body.?offset.orValue("") != "" ? optional.of(resp.Body.offset) : optional.none(), + "recovery_mode": false + }, + "want_more": has(resp.Body.offset) && resp.Body.offset != "" + } + : resp.StatusCode == 406 ? { "events": { @@ -76,16 +106,12 @@ program: | "code": string(resp.StatusCode), "id": string(resp.Status), "message": "GET " + state.url.trim_right("/") + ( - size(resp.Body) != 0 ? - string(resp.Body) - : - string(resp.Status) + ' (' + string(resp.StatusCode) + ')' - ), - }, + size(resp.Body) != 0 ? string(resp.Body) + : string(resp.Status) + ' (' + string(resp.StatusCode) + ')' + ), }, - "cursor": { - "recovery_mode": true }, + "cursor": { "recovery_mode": true }, "want_more": true } : @@ -95,15 +121,14 @@ program: | "code": string(resp.StatusCode), "id": string(resp.Status), "message": "GET " + state.url.trim_right("/") + ( - size(resp.Body) != 0 ? - string(resp.Body) - : - string(resp.Status) + ' (' + string(resp.StatusCode) + ')' - ), - }, + size(resp.Body) != 0 ? string(resp.Body) + : string(resp.Status) + ' (' + string(resp.StatusCode) + ')' + ), }, + }, "want_more": false } + ) ) ) - ) \ No newline at end of file + ) From d2e92ce934b3b5af0d3ad36de99feb55499fd89f Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Mon, 20 Oct 2025 19:01:33 +0530 Subject: [PATCH 03/10] updated state.url --- packages/akamai/data_stream/siem/agent/stream/cel.yml.hbs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/akamai/data_stream/siem/agent/stream/cel.yml.hbs b/packages/akamai/data_stream/siem/agent/stream/cel.yml.hbs index 427e77a77eb..ad51762ff0d 100644 --- a/packages/akamai/data_stream/siem/agent/stream/cel.yml.hbs +++ b/packages/akamai/data_stream/siem/agent/stream/cel.yml.hbs @@ -53,7 +53,7 @@ program: | sprintf("EG1-HMAC-SHA256 client_token=%s;access_token=%s;timestamp=%s;nonce=%s;", [state.client_token, state.access_token, timestamp, nonce]).as(sig_base, base64(hmac(timestamp, "sha256", bytes(state.client_secret))).as(sig_key, - (state.url + "?" + { + (state.url.trim_right("/") + "?" + { "limit": [string(state.event_limit)], ?"from": params.?from.orValue("") != "" ? optional.of([string(params.from)]) : optional.none(), ?"to": params.?to.orValue("") != "" ? optional.of([string(params.to)]) : optional.none(), @@ -78,7 +78,7 @@ program: | ).as(auth_header, request( "GET", - state.url + "?" + { + state.url.trim_right("/") + "?" + { "limit": [string(state.event_limit)], ?"from": params.?from.orValue("") != "" ? optional.of([string(params.from)]) : optional.none(), ?"to": params.?to.orValue("") != "" ? optional.of([string(params.to)]) : optional.none(), From 175690c91adf884dfcf688b33eaad7fe7bda5e99 Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Wed, 22 Oct 2025 19:17:36 +0530 Subject: [PATCH 04/10] migrated to cel with working system tests --- packages/akamai/changelog.yml | 5 + ...log-config.yml => test-cel.log-config.yml} | 0 .../_dev/test/system/test-emulator-config.yml | 7 +- .../data_stream/siem/agent/stream/cel.yml.hbs | 34 +++--- .../siem/agent/stream/httpjson.yml.hbs | 103 ------------------ packages/akamai/data_stream/siem/manifest.yml | 12 +- packages/akamai/manifest.yml | 4 +- 7 files changed, 41 insertions(+), 124 deletions(-) rename packages/akamai/data_stream/siem/_dev/test/pipeline/{test-http-json.log-config.yml => test-cel.log-config.yml} (100%) delete mode 100644 packages/akamai/data_stream/siem/agent/stream/httpjson.yml.hbs diff --git a/packages/akamai/changelog.yml b/packages/akamai/changelog.yml index 1619841355f..9f26a1d9faa 100644 --- a/packages/akamai/changelog.yml +++ b/packages/akamai/changelog.yml @@ -1,4 +1,9 @@ # newer versions go on top +- version: "2.29.0" + changes: + - description: Migrated siem data stream from Httpjson to CEL. + type: enhancement + link: https://github.com/elastic/integrations/pull/1111 - version: "2.28.1" changes: - description: Fixed time duration handling in request parameters to conform to API guidelines. diff --git a/packages/akamai/data_stream/siem/_dev/test/pipeline/test-http-json.log-config.yml b/packages/akamai/data_stream/siem/_dev/test/pipeline/test-cel.log-config.yml similarity index 100% rename from packages/akamai/data_stream/siem/_dev/test/pipeline/test-http-json.log-config.yml rename to packages/akamai/data_stream/siem/_dev/test/pipeline/test-cel.log-config.yml diff --git a/packages/akamai/data_stream/siem/_dev/test/system/test-emulator-config.yml b/packages/akamai/data_stream/siem/_dev/test/system/test-emulator-config.yml index abb4756aaee..f8a7cdf1165 100644 --- a/packages/akamai/data_stream/siem/_dev/test/system/test-emulator-config.yml +++ b/packages/akamai/data_stream/siem/_dev/test/system/test-emulator-config.yml @@ -1,4 +1,4 @@ -input: httpjson +input: cel service: akamai-siem-emulator vars: ~ data_stream: @@ -12,10 +12,9 @@ data_stream: access_token: at-6b8c7217-8748-490d-b0f5-bfeb72b2e7cd config_ids: 123456 event_limit: 20 + # The akamai-siem emulator does not limit the number of events or pages returned, so we set a large number of max_executions. + max_executions: 50000 enable_request_tracer: true assert: # 12 hours at 5 minutes between events. hit_count: 144 # = 12 * 60/5 -skip: - reason: "The fleet health status changes to degraded when the HTTPJSON template's value evaluation comes up empty, which leads to system test failures but does not interrupt the data flow." - link: https://github.com/elastic/beats/issues/45664 diff --git a/packages/akamai/data_stream/siem/agent/stream/cel.yml.hbs b/packages/akamai/data_stream/siem/agent/stream/cel.yml.hbs index ad51762ff0d..ae4e75004fc 100644 --- a/packages/akamai/data_stream/siem/agent/stream/cel.yml.hbs +++ b/packages/akamai/data_stream/siem/agent/stream/cel.yml.hbs @@ -1,7 +1,8 @@ config_version: 2 interval: {{interval}} +max_executions: {{max_executions}} resource: - url: https://{{api_host}}/siem/v1/configs/{{config_ids}} + url: {{api_host}}/siem/v1/configs/{{config_ids}} {{#if ssl}} ssl: {{ssl}} {{/if}} @@ -38,9 +39,9 @@ program: | "to": int(now) } : - state.?cursor.?offset.hasValue() ? + state.?cursor.?last_offset.hasValue() ? { - "offset": state.cursor.offset + "offset": state.cursor.last_offset } : { @@ -90,16 +91,23 @@ program: | } }).do_request().as(resp, resp.StatusCode == 200 ? - { - "events": string(resp.Body).split("\n").map(e, e!="", { "message": e }), - "cursor": { - ?"offset": resp.Body.?offset.orValue("") != "" ? optional.of(resp.Body.offset) : optional.none(), - "recovery_mode": false - }, - "want_more": has(resp.Body.offset) && resp.Body.offset != "" - } - : - resp.StatusCode == 406 ? + string(resp.Body).split("\n").filter(line, line != "").as(lines, { + "events": lines.map(line, { "message": line }), + "cursor": { + ?"last_offset": lines.size() > 0 ? + lines[lines.size() - 1].decode_json().as(lastEvent, + has(lastEvent.offset) ? optional.of(lastEvent.offset) : optional.none() + ) + : optional.none(), + "recovery_mode": false + }, + "want_more": lines.size() > 0 ? + lines[lines.size() - 1].decode_json().as(lastEvent, + has(lastEvent.offset) && lastEvent.offset != "" + ) + : false + }) + : resp.StatusCode == 416 ? { "events": { "error": { diff --git a/packages/akamai/data_stream/siem/agent/stream/httpjson.yml.hbs b/packages/akamai/data_stream/siem/agent/stream/httpjson.yml.hbs deleted file mode 100644 index 7701e19aeaa..00000000000 --- a/packages/akamai/data_stream/siem/agent/stream/httpjson.yml.hbs +++ /dev/null @@ -1,103 +0,0 @@ -config_version: "2" -interval: {{interval}} -request.method: "GET" -request.url: "{{api_host}}/siem/v1/configs/{{config_ids}}" -{{#if ssl}} -request.ssl: {{ssl}} -{{/if}} -{{#if http_client_timeout}} -request.timeout: {{http_client_timeout}} -{{/if}} -{{#if proxy_url }} -request.proxy_url: {{proxy_url}} -{{/if}} -{{#if enable_request_tracer}} -request.tracer.filename: "../../logs/httpjson/http-request-trace-*.ndjson" -request.tracer.maxbackups: 5 -{{/if}} -request.transforms: - - set: - target: url.params.from - # On the initial request (no cursor), this calculates the start time. - # It takes the more recent of two values: the user-defined 'initial_interval' - # or the API's maximum 12-hour look-back, preventing API errors. - value: >- - [[- if not (index .cursor "last_offset") -]] - [[- $initialTime := (now (parseDuration "-{{initial_interval}}")).Unix -]] - [[- $maxLookbackTime := (now (parseDuration "-12h")).Unix -]] - [[- max $maxLookbackTime $initialTime -]] - [[- end -]] - - set: - target: url.params.to - value: >- - [[ if not (index .cursor "last_offset") ]][[ (now).Unix ]][[ end ]] - - set: - target: url.params.offset - value: >- - [[ if (index .cursor "last_offset") ]][[ .cursor.last_offset ]][[ end ]] -{{#if event_limit}} - - set: - target: url.params.limit - value: '{{event_limit}}' -{{/if}} - - set: - target: header.XTimestamp - value: '[[ formatDate (now) "20060102T15:04:05-0700" ]]' - - set: - target: header.XSignatureBase - value: '[[ sprintf "EG1-HMAC-SHA256 client_token=%s;access_token=%s;timestamp=%s;nonce=%s;" "{{client_token}}" "{{access_token}}" (.header.Get "XTimestamp") uuid ]]' - - set: - target: header.XSignatureKey - value: '[[ hmacBase64 "sha256" "{{client_secret}}" (.header.Get "XTimestamp") ]]' - - set: - target: header.XSignature - value: '[[ hmacBase64 "sha256" (.header.Get "XSignatureKey") "GET\t" .url.Scheme "\t" .url.Host "\t" .url.Path "?" .url.RawQuery "\t\t\t" (.header.Get "XSignatureBase") ]]' - - set: - target: header.Authorization - value: '[[ sprintf "%ssignature=%s" (.header.Get "XSignatureBase") (.header.Get "XSignature") ]]' - - delete: - target: header.XSignature - - delete: - target: header.XSignatureKey - - delete: - target: header.XSignatureBase - - delete: - target: header.XTimestamp - -response.decode_as: application/x-ndjson - -response.pagination: - - set: - target: url.params.offset - # This template evaluates to an empty string when the response contains no events - # as indicated by the 'total' field in the ResponseContext. This stops pagination. - value: '[[ if not (eq (toInt .last_event.total) 0) ]][[ .last_event.offset ]][[ end ]]' - fail_on_template_error: true - - delete: - target: url.params.from - - delete: - target: url.params.to - -cursor: - last_offset: - value: '[[ .last_event.offset ]]' - -{{#if tags.length}} -tags: -{{else if preserve_original_event}} -tags: -{{/if}} -{{#each tags as |tag i|}} - - {{tag}} -{{/each}} -{{#if preserve_original_event}} - - preserve_original_event -{{/if}} -{{#contains "forwarded" tags}} -publisher_pipeline.disable_host: true -{{/contains}} - -{{#if processors}} -processors: -{{processors}} -{{/if}} \ No newline at end of file diff --git a/packages/akamai/data_stream/siem/manifest.yml b/packages/akamai/data_stream/siem/manifest.yml index 29ddf08d435..3afcb588146 100644 --- a/packages/akamai/data_stream/siem/manifest.yml +++ b/packages/akamai/data_stream/siem/manifest.yml @@ -1,8 +1,8 @@ type: logs title: Akamai SIEM Logs streams: - - input: httpjson - template_path: httpjson.yml.hbs + - input: cel + template_path: cel.yml.hbs title: Akamai SIEM logs description: Collect Akamai logs via the SIEM API vars: @@ -76,6 +76,14 @@ streams: show_user: false title: Event Limit description: Defines the approximate maximum number of security events each fetch returns, in both offset and time-based modes. The default limit is 10000 and the maximum limit available is 600000. Listing an unlimited number of logs isn't possible. Expect requests to return a slightly higher number of security events than you set in the limit parameter, because data is stored in different buckets. + - name: max_executions + type: integer + title: Maximum Pages Per Interval + description: Maximum Pages Per Interval is the maximum number of pages that can be collected at each interval. + multi: false + required: false + show_user: false + default: 5000 - name: proxy_url type: text title: Proxy URL diff --git a/packages/akamai/manifest.yml b/packages/akamai/manifest.yml index edc05d0b1f2..ac6f95aed60 100644 --- a/packages/akamai/manifest.yml +++ b/packages/akamai/manifest.yml @@ -1,6 +1,6 @@ name: akamai title: Akamai -version: "2.28.1" +version: "2.29.0" description: Collect logs from Akamai with Elastic Agent. type: integration format_version: "3.0.2" @@ -18,7 +18,7 @@ policy_templates: title: Akamai logs description: Collect SIEM logs from Akamai inputs: - - type: httpjson + - type: cel title: "Collect Akamai SIEM logs via API" description: "Collecting SIEM logs from Akamai via API" - type: gcs From ffd666ea298aa3a0c24618f4d5a2ea39dfed4dfa Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Thu, 23 Oct 2025 10:32:20 +0530 Subject: [PATCH 05/10] updated changelog --- packages/akamai/changelog.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/akamai/changelog.yml b/packages/akamai/changelog.yml index 9f26a1d9faa..327f640469e 100644 --- a/packages/akamai/changelog.yml +++ b/packages/akamai/changelog.yml @@ -3,7 +3,7 @@ changes: - description: Migrated siem data stream from Httpjson to CEL. type: enhancement - link: https://github.com/elastic/integrations/pull/1111 + link: https://github.com/elastic/integrations/pull/15713 - version: "2.28.1" changes: - description: Fixed time duration handling in request parameters to conform to API guidelines. From d05fc95ec29c52657fde6fcc705e60557eb8fd14 Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Fri, 24 Oct 2025 16:54:15 +0530 Subject: [PATCH 06/10] updated stack version to support cel functions max(dyn,dyn) & sprintf() --- packages/akamai/manifest.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/akamai/manifest.yml b/packages/akamai/manifest.yml index ac6f95aed60..76669cf7e01 100644 --- a/packages/akamai/manifest.yml +++ b/packages/akamai/manifest.yml @@ -7,7 +7,7 @@ format_version: "3.0.2" categories: [security, cdn_security] conditions: kibana: - version: "^8.13.0 || ^9.0.0" + version: "^8.18.0 || ^9.0.0" icons: - src: /img/akamai_logo.svg title: Akamai From 42dcd3d87b32aabf0b64734f2d11f557af32d7c4 Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Mon, 27 Oct 2025 11:28:54 +0530 Subject: [PATCH 07/10] addressed Dan's suggestions --- .../data_stream/siem/agent/stream/cel.yml.hbs | 235 ++++++++++-------- 1 file changed, 126 insertions(+), 109 deletions(-) diff --git a/packages/akamai/data_stream/siem/agent/stream/cel.yml.hbs b/packages/akamai/data_stream/siem/agent/stream/cel.yml.hbs index ae4e75004fc..465ef8a1359 100644 --- a/packages/akamai/data_stream/siem/agent/stream/cel.yml.hbs +++ b/packages/akamai/data_stream/siem/agent/stream/cel.yml.hbs @@ -1,6 +1,5 @@ config_version: 2 interval: {{interval}} -max_executions: {{max_executions}} resource: url: {{api_host}}/siem/v1/configs/{{config_ids}} {{#if ssl}} @@ -16,6 +15,9 @@ resource: enabled: {{enable_request_tracer}} filename: "../../logs/cel/http-request-trace-*.ndjson" maxbackups: 5 +{{#if max_executions}} +max_executions: {{max_executions}} +{{/if}} state: client_token: {{client_token}} @@ -32,111 +34,126 @@ redact: program: | state.with( - ( - state.?cursor.?recovery_mode.orValue(false) ? - { - "from": int(now - duration("12h")), - "to": int(now) - } - : - state.?cursor.?last_offset.hasValue() ? - { - "offset": state.cursor.last_offset - } - : - { - "from": max(int(now - duration(state.initial_interval)), int(now - duration("12h"))), - "to": int(now) - } - ).as(params, - now.format("20060102T15:04:05-0700").as(timestamp, - uuid().as(nonce, - sprintf("EG1-HMAC-SHA256 client_token=%s;access_token=%s;timestamp=%s;nonce=%s;", - [state.client_token, state.access_token, timestamp, nonce]).as(sig_base, - base64(hmac(timestamp, "sha256", bytes(state.client_secret))).as(sig_key, - (state.url.trim_right("/") + "?" + { - "limit": [string(state.event_limit)], - ?"from": params.?from.orValue("") != "" ? optional.of([string(params.from)]) : optional.none(), - ?"to": params.?to.orValue("") != "" ? optional.of([string(params.to)]) : optional.none(), - ?"offset": params.?offset.orValue("") != "" ? optional.of([string(params.offset)]) : optional.none(), - }.format_query()).parse_url().as(u, - ( - "GET\t" + - string(u.Scheme) + "\t" + - string(u.Host) + "\t" + - string(u.Path) + "?" + - string(u.RawQuery) + - "\t\t\t" + sig_base - ).as(to_sign, - base64(hmac(to_sign, "sha256", bytes(sig_key))).as(signature, - sig_base + "signature=" + signature - ) - ) - ) - ) - ) - ) - ).as(auth_header, - request( - "GET", - state.url.trim_right("/") + "?" + { - "limit": [string(state.event_limit)], - ?"from": params.?from.orValue("") != "" ? optional.of([string(params.from)]) : optional.none(), - ?"to": params.?to.orValue("") != "" ? optional.of([string(params.to)]) : optional.none(), - ?"offset": params.?offset.orValue("") != "" ? optional.of([string(params.offset)]) : optional.none(), - }.format_query() - ).with({ - "Header": { - "Authorization": [auth_header] - } - }).do_request().as(resp, - resp.StatusCode == 200 ? - string(resp.Body).split("\n").filter(line, line != "").as(lines, { - "events": lines.map(line, { "message": line }), - "cursor": { - ?"last_offset": lines.size() > 0 ? - lines[lines.size() - 1].decode_json().as(lastEvent, - has(lastEvent.offset) ? optional.of(lastEvent.offset) : optional.none() - ) - : optional.none(), - "recovery_mode": false - }, - "want_more": lines.size() > 0 ? - lines[lines.size() - 1].decode_json().as(lastEvent, - has(lastEvent.offset) && lastEvent.offset != "" - ) - : false - }) - : resp.StatusCode == 416 ? - { - "events": { - "error": { - "code": string(resp.StatusCode), - "id": string(resp.Status), - "message": "GET " + state.url.trim_right("/") + ( - size(resp.Body) != 0 ? string(resp.Body) - : string(resp.Status) + ' (' + string(resp.StatusCode) + ')' - ), - }, - }, - "cursor": { "recovery_mode": true }, - "want_more": true - } - : - { - "events": { - "error": { - "code": string(resp.StatusCode), - "id": string(resp.Status), - "message": "GET " + state.url.trim_right("/") + ( - size(resp.Body) != 0 ? string(resp.Body) - : string(resp.Status) + ' (' + string(resp.StatusCode) + ')' - ), - }, - }, - "want_more": false - } - ) - ) - ) - ) + ( + state.?cursor.recovery_mode.orValue(false) ? + { + "from": int(now - duration("12h")), + "to": int(now), + } + : state.?cursor.last_offset.hasValue() ? + { + "offset": state.cursor.last_offset, + } + : + { + "from": max(int(now - duration(state.initial_interval)), int(now - duration("12h"))), + "to": int(now), + } + ).as(params, + now.format("20060102T15:04:05-0700").as(timestamp, + uuid().as(nonce, + sprintf( + "EG1-HMAC-SHA256 client_token=%s;access_token=%s;timestamp=%s;nonce=%s;", + [state.client_token, state.access_token, timestamp, nonce] + ).as(sig_base, + base64(hmac(timestamp, "sha256", bytes(state.client_secret))).as(sig_key, + ( + state.url.trim_right("/") + "?" + { + "limit": [string(state.event_limit)], + ?"from": (params.?from.orValue("") != "") ? optional.of([string(params.from)]) : optional.none(), + ?"to": (params.?to.orValue("") != "") ? optional.of([string(params.to)]) : optional.none(), + ?"offset": (params.?offset.orValue("") != "") ? optional.of([string(params.offset)]) : optional.none(), + }.format_query() + ).parse_url().as(u, + sprintf( + "GET\t%s\t%s\t%s?%s\t\t\t%s", + [ + string(u.Scheme), + string(u.Host), + string(u.Path), + string(u.RawQuery), + sig_base, + ] + ).as(to_sign, + base64(hmac(to_sign, "sha256", bytes(sig_key))).as(signature, + sig_base + "signature=" + signature + ) + ) + ) + ) + ) + ) + ).as(auth_header, + request( + "GET", + state.url.trim_right("/") + "?" + { + "limit": [string(state.event_limit)], + ?"from": (params.?from.orValue("") != "") ? optional.of([string(params.from)]) : optional.none(), + ?"to": (params.?to.orValue("") != "") ? optional.of([string(params.to)]) : optional.none(), + ?"offset": (params.?offset.orValue("") != "") ? optional.of([string(params.offset)]) : optional.none(), + }.format_query() + ).with( + { + "Header": { + "Authorization": [auth_header], + }, + } + ).do_request().as(resp, + (resp.StatusCode == 200) ? + string(resp.Body).split("\n").filter(line, line != "").as(lines, + { + "events": lines.map(line, {"message": line}), + "cursor": { + ?"last_offset": (lines.size() > 0) ? + lines[lines.size() - 1].decode_json().as(lastEvent, + lastEvent.?offset + ) + : + optional.none(), + "recovery_mode": false, + }, + "want_more": (lines.size() > 0) ? + lines[lines.size() - 1].decode_json().as(lastEvent, + has(lastEvent.offset) && lastEvent.offset != "" + ) + : + false, + } + ) + : (resp.StatusCode == 416) ? + { + "events": { + "error": { + "code": string(resp.StatusCode), + "id": string(resp.Status), + "message": "GET " + state.url.trim_right("/") + ( + (size(resp.Body) != 0) ? + string(resp.Body) + : + string(resp.Status) + " (" + string(resp.StatusCode) + ")" + ), + }, + }, + "cursor": {"recovery_mode": true}, + "want_more": true, + } + : + { + "events": { + "error": { + "code": string(resp.StatusCode), + "id": string(resp.Status), + "message": "GET " + state.url.trim_right("/") + ( + (size(resp.Body) != 0) ? + string(resp.Body) + : + string(resp.Status) + " (" + string(resp.StatusCode) + ")" + ), + }, + }, + "want_more": false, + } + ) + ) + ) +) From b4206f4510938addb383de744e4b479c76305c71 Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Mon, 27 Oct 2025 11:30:57 +0530 Subject: [PATCH 08/10] updated changelog --- packages/akamai/changelog.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/akamai/changelog.yml b/packages/akamai/changelog.yml index 327f640469e..4015509dd1d 100644 --- a/packages/akamai/changelog.yml +++ b/packages/akamai/changelog.yml @@ -1,7 +1,7 @@ # newer versions go on top - version: "2.29.0" changes: - - description: Migrated siem data stream from Httpjson to CEL. + - description: Migrated siem data stream from HTTP JSON to CEL. type: enhancement link: https://github.com/elastic/integrations/pull/15713 - version: "2.28.1" From 2d32dc50142e7b1b33b8ee5bb78a3dfb5e50485c Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Mon, 27 Oct 2025 12:29:16 +0530 Subject: [PATCH 09/10] fixed yml formatting issues --- .../data_stream/siem/agent/stream/cel.yml.hbs | 246 +++++++++--------- 1 file changed, 123 insertions(+), 123 deletions(-) diff --git a/packages/akamai/data_stream/siem/agent/stream/cel.yml.hbs b/packages/akamai/data_stream/siem/agent/stream/cel.yml.hbs index 465ef8a1359..30e408025ab 100644 --- a/packages/akamai/data_stream/siem/agent/stream/cel.yml.hbs +++ b/packages/akamai/data_stream/siem/agent/stream/cel.yml.hbs @@ -34,126 +34,126 @@ redact: program: | state.with( - ( - state.?cursor.recovery_mode.orValue(false) ? - { - "from": int(now - duration("12h")), - "to": int(now), - } - : state.?cursor.last_offset.hasValue() ? - { - "offset": state.cursor.last_offset, - } - : - { - "from": max(int(now - duration(state.initial_interval)), int(now - duration("12h"))), - "to": int(now), - } - ).as(params, - now.format("20060102T15:04:05-0700").as(timestamp, - uuid().as(nonce, - sprintf( - "EG1-HMAC-SHA256 client_token=%s;access_token=%s;timestamp=%s;nonce=%s;", - [state.client_token, state.access_token, timestamp, nonce] - ).as(sig_base, - base64(hmac(timestamp, "sha256", bytes(state.client_secret))).as(sig_key, - ( - state.url.trim_right("/") + "?" + { - "limit": [string(state.event_limit)], - ?"from": (params.?from.orValue("") != "") ? optional.of([string(params.from)]) : optional.none(), - ?"to": (params.?to.orValue("") != "") ? optional.of([string(params.to)]) : optional.none(), - ?"offset": (params.?offset.orValue("") != "") ? optional.of([string(params.offset)]) : optional.none(), - }.format_query() - ).parse_url().as(u, - sprintf( - "GET\t%s\t%s\t%s?%s\t\t\t%s", - [ - string(u.Scheme), - string(u.Host), - string(u.Path), - string(u.RawQuery), - sig_base, - ] - ).as(to_sign, - base64(hmac(to_sign, "sha256", bytes(sig_key))).as(signature, - sig_base + "signature=" + signature - ) - ) - ) - ) - ) - ) - ).as(auth_header, - request( - "GET", - state.url.trim_right("/") + "?" + { - "limit": [string(state.event_limit)], - ?"from": (params.?from.orValue("") != "") ? optional.of([string(params.from)]) : optional.none(), - ?"to": (params.?to.orValue("") != "") ? optional.of([string(params.to)]) : optional.none(), - ?"offset": (params.?offset.orValue("") != "") ? optional.of([string(params.offset)]) : optional.none(), - }.format_query() - ).with( - { - "Header": { - "Authorization": [auth_header], - }, - } - ).do_request().as(resp, - (resp.StatusCode == 200) ? - string(resp.Body).split("\n").filter(line, line != "").as(lines, - { - "events": lines.map(line, {"message": line}), - "cursor": { - ?"last_offset": (lines.size() > 0) ? - lines[lines.size() - 1].decode_json().as(lastEvent, - lastEvent.?offset - ) - : - optional.none(), - "recovery_mode": false, - }, - "want_more": (lines.size() > 0) ? - lines[lines.size() - 1].decode_json().as(lastEvent, - has(lastEvent.offset) && lastEvent.offset != "" - ) - : - false, - } - ) - : (resp.StatusCode == 416) ? - { - "events": { - "error": { - "code": string(resp.StatusCode), - "id": string(resp.Status), - "message": "GET " + state.url.trim_right("/") + ( - (size(resp.Body) != 0) ? - string(resp.Body) - : - string(resp.Status) + " (" + string(resp.StatusCode) + ")" - ), - }, - }, - "cursor": {"recovery_mode": true}, - "want_more": true, - } - : - { - "events": { - "error": { - "code": string(resp.StatusCode), - "id": string(resp.Status), - "message": "GET " + state.url.trim_right("/") + ( - (size(resp.Body) != 0) ? - string(resp.Body) - : - string(resp.Status) + " (" + string(resp.StatusCode) + ")" - ), - }, - }, - "want_more": false, - } - ) - ) - ) -) + ( + state.?cursor.recovery_mode.orValue(false) ? + { + "from": int(now - duration("12h")), + "to": int(now), + } + : state.?cursor.last_offset.hasValue() ? + { + "offset": state.cursor.last_offset, + } + : + { + "from": max(int(now - duration(state.initial_interval)), int(now - duration("12h"))), + "to": int(now), + } + ).as(params, + now.format("20060102T15:04:05-0700").as(timestamp, + uuid().as(nonce, + sprintf( + "EG1-HMAC-SHA256 client_token=%s;access_token=%s;timestamp=%s;nonce=%s;", + [state.client_token, state.access_token, timestamp, nonce] + ).as(sig_base, + base64(hmac(timestamp, "sha256", bytes(state.client_secret))).as(sig_key, + ( + state.url.trim_right("/") + "?" + { + "limit": [string(state.event_limit)], + ?"from": (params.?from.orValue("") != "") ? optional.of([string(params.from)]) : optional.none(), + ?"to": (params.?to.orValue("") != "") ? optional.of([string(params.to)]) : optional.none(), + ?"offset": (params.?offset.orValue("") != "") ? optional.of([string(params.offset)]) : optional.none(), + }.format_query() + ).parse_url().as(u, + sprintf( + "GET\t%s\t%s\t%s?%s\t\t\t%s", + [ + string(u.Scheme), + string(u.Host), + string(u.Path), + string(u.RawQuery), + sig_base, + ] + ).as(to_sign, + base64(hmac(to_sign, "sha256", bytes(sig_key))).as(signature, + sig_base + "signature=" + signature + ) + ) + ) + ) + ) + ) + ).as(auth_header, + request( + "GET", + state.url.trim_right("/") + "?" + { + "limit": [string(state.event_limit)], + ?"from": (params.?from.orValue("") != "") ? optional.of([string(params.from)]) : optional.none(), + ?"to": (params.?to.orValue("") != "") ? optional.of([string(params.to)]) : optional.none(), + ?"offset": (params.?offset.orValue("") != "") ? optional.of([string(params.offset)]) : optional.none(), + }.format_query() + ).with( + { + "Header": { + "Authorization": [auth_header], + }, + } + ).do_request().as(resp, + (resp.StatusCode == 200) ? + string(resp.Body).split("\n").filter(line, line != "").as(lines, + { + "events": lines.map(line, {"message": line}), + "cursor": { + ?"last_offset": (lines.size() > 0) ? + lines[lines.size() - 1].decode_json().as(lastEvent, + lastEvent.?offset + ) + : + optional.none(), + "recovery_mode": false, + }, + "want_more": (lines.size() > 0) ? + lines[lines.size() - 1].decode_json().as(lastEvent, + has(lastEvent.offset) && lastEvent.offset != "" + ) + : + false, + } + ) + : (resp.StatusCode == 416) ? + { + "events": { + "error": { + "code": string(resp.StatusCode), + "id": string(resp.Status), + "message": "GET " + state.url.trim_right("/") + ( + (size(resp.Body) != 0) ? + string(resp.Body) + : + string(resp.Status) + " (" + string(resp.StatusCode) + ")" + ), + }, + }, + "cursor": {"recovery_mode": true}, + "want_more": true, + } + : + { + "events": { + "error": { + "code": string(resp.StatusCode), + "id": string(resp.Status), + "message": "GET " + state.url.trim_right("/") + ( + (size(resp.Body) != 0) ? + string(resp.Body) + : + string(resp.Status) + " (" + string(resp.StatusCode) + ")" + ), + }, + }, + "want_more": false, + } + ) + ) + ) + ) From d97ebf28cc06983f4bb44da2e6c7c0b6ddbd2352 Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Tue, 28 Oct 2025 12:47:44 +0530 Subject: [PATCH 10/10] formatted with celfmt --- packages/akamai/data_stream/siem/agent/stream/cel.yml.hbs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/akamai/data_stream/siem/agent/stream/cel.yml.hbs b/packages/akamai/data_stream/siem/agent/stream/cel.yml.hbs index 30e408025ab..6ee72bd06a9 100644 --- a/packages/akamai/data_stream/siem/agent/stream/cel.yml.hbs +++ b/packages/akamai/data_stream/siem/agent/stream/cel.yml.hbs @@ -32,7 +32,7 @@ redact: - access_token - client_token -program: | +program: |- state.with( ( state.?cursor.recovery_mode.orValue(false) ?