Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 19 additions & 24 deletions src/secops/chronicle/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,20 @@
#
"""Statistics functionality for Chronicle searches."""
from datetime import datetime
from typing import Any
from typing import Any, TYPE_CHECKING

from secops.chronicle.models import APIVersion
from secops.chronicle.utils.request_utils import (
chronicle_request,
)
from secops.exceptions import APIError

if TYPE_CHECKING:
from secops.chronicle.client import ChronicleClient


def get_stats(
client,
client: "ChronicleClient",
query: str,
start_time: datetime,
end_time: datetime,
Expand Down Expand Up @@ -56,35 +63,23 @@ def get_stats(
# Unused parameters, kept for backward compatibility
_ = (max_events, case_insensitive, max_attempts)

# Format the instance ID for the API call
instance = client.instance_id

# Endpoint for UDM search
url = f"{client.base_url}/{instance}:udmSearch"

# Format times for the API
start_time_str = start_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
end_time_str = end_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ")

# Query parameters for the API call
params = {
"query": query,
"timeRange.start_time": start_time_str,
"timeRange.end_time": end_time_str,
"timeRange.start_time": start_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
"timeRange.end_time": end_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
"limit": max_values, # Limit to specified number of results
}

# Make the API request
response = client.session.get(url, params=params, timeout=timeout)
if response.status_code != 200:
raise APIError(
f"Error executing stats search: Status {response.status_code}, "
f"Response: {response.text}"
)

results = response.json()
results = chronicle_request(
client,
method="GET",
endpoint_path=":udmSearch",
api_version=APIVersion.V1ALPHA,
params=params,
timeout=timeout,
)

# Check if stats data is available in the response
if "stats" not in results:
raise APIError("No stats found in response")

Expand Down
9 changes: 8 additions & 1 deletion src/secops/chronicle/utils/request_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ def chronicle_request(
json: dict[str, Any] | None = None,
expected_status: int | set[int] | tuple[int, ...] | list[int] = 200,
error_message: str | None = None,
timeout: int | None = None,
) -> dict[str, Any] | list[Any]:
"""Perform an HTTP request and return JSON, raising APIError on failure.

Expand All @@ -216,6 +217,7 @@ def chronicle_request(
(e.g. 200) or an iterable of acceptable status codes (e.g. {200, 204}).
If the response status is not acceptable, an APIError is raised.
error_message: Optional base error message to include on failure
timeout: Optional timeout in seconds for the request

Returns:
Parsed JSON response.
Expand All @@ -237,7 +239,12 @@ def chronicle_request(

try:
response = client.session.request(
method=method, url=url, params=params, json=json, headers=headers
method=method,
url=url,
params=params,
json=json,
headers=headers,
timeout=timeout,
)
except GoogleAuthError as exc:
base_msg = error_message or "Google authentication failed"
Expand Down
38 changes: 0 additions & 38 deletions tests/chronicle/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,44 +90,6 @@ def test_chronicle_client_custom_session_user_agent():
assert client.session.headers.get("User-Agent") == "secops-wrapper-sdk"


def test_get_stats(chronicle_client):
"""Test stats search functionality."""
# Mock the search request
mock_response = Mock()
mock_response.status_code = 200
mock_response.json.return_value = {
"stats": {
"results": [
{"column": "count", "values": [{"value": {"int64Val": "42"}}]},
{
"column": "hostname",
"values": [{"value": {"stringVal": "test-host"}}],
},
]
}
}

with patch.object(chronicle_client.session, "get", return_value=mock_response):
result = chronicle_client.get_stats(
query="""target.ip != ""
match:
target.ip, principal.hostname
outcome:
$count = count(metadata.id)
order:
principal.hostname asc""",
start_time=datetime(2024, 1, 1, tzinfo=timezone.utc),
end_time=datetime(2024, 1, 2, tzinfo=timezone.utc),
max_events=10,
max_values=10,
)

assert result["total_rows"] == 1
assert result["columns"] == ["count", "hostname"]
assert result["rows"][0]["count"] == 42
assert result["rows"][0]["hostname"] == "test-host"


def test_search_udm(chronicle_client):
"""Test UDM search functionality."""
# Mock the search request
Expand Down
Loading
Loading