Skip to content
14 changes: 8 additions & 6 deletions src/secops/chronicle/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -873,7 +873,8 @@ def search_udm(
max_attempts: int = 30,
timeout: int = 30,
debug: bool = False,
) -> dict[str, Any]:
as_list: bool = False,
) -> dict[str, Any] | list[dict[str, Any]]:
"""Search UDM events in Chronicle.

Args:
Expand All @@ -885,13 +886,13 @@ def search_udm(
max_attempts: Maximum number of polling attempts (default: 30)
timeout: Timeout in seconds for each API request (default: 30)
debug: Print debug information during execution
as_list: If True, return a list of events instead of a dict
with events list and nextPageToken.

Returns:
Dictionary with search results containing:
- events: List of UDM events with 'name' and 'udm' fields
- total_events: Number of events returned
- more_data_available: Boolean indicating
if more results are available
If as_list is True: List of Events.
If as_list is False: Dict with event list, total number of event and
flag to check if more data is available.

Raises:
APIError: If the API request fails
Expand All @@ -906,6 +907,7 @@ def search_udm(
max_attempts,
timeout,
debug,
as_list,
)

def find_udm_field_values(
Expand Down
82 changes: 32 additions & 50 deletions src/secops/chronicle/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,19 @@
"""UDM search functionality for Chronicle."""

from datetime import datetime
from typing import Any
from typing import Any, TYPE_CHECKING

import requests
from secops.chronicle.models import APIVersion
from secops.chronicle.utils.request_utils import (
chronicle_request,
)

from secops.exceptions import APIError
if TYPE_CHECKING:
from secops.chronicle.client import ChronicleClient


def search_udm(
client,
client: "ChronicleClient",
query: str,
start_time: datetime,
end_time: datetime,
Expand All @@ -32,7 +36,8 @@ def search_udm(
max_attempts: int = 30,
timeout: int = 30,
debug: bool = False,
) -> dict[str, Any]:
as_list: bool = False,
) -> dict[str, Any] | list[dict[str, Any]]:
"""Perform a UDM search query using the Chronicle V1alpha API.

Args:
Expand All @@ -46,23 +51,19 @@ def search_udm(
for backwards compatibility)
timeout: Timeout in seconds for each API request (default: 30)
debug: Print debug information during execution
as_list: Whether to return results as a list or dictionary

Returns:
Dict containing the search results with events
If as_list is True: List of Events.
If as_list is False: Dict with event list, total number of event and
flag to check if more data is available.

Raises:
APIError: If the API request fails
"""

# Unused parameters, kept for backward compatibility
_ = (case_insensitive, max_attempts)

# Format the instance ID for the API call
instance = client.instance_id

# Endpoint for UDM search
url = f"{client.base_url}/{instance}:udmSearch"

# Format times for the API
start_time_str = start_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
end_time_str = end_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
Expand All @@ -79,40 +80,21 @@ def search_udm(
print(f"Executing UDM search: {query}")
print(f"Time range: {start_time_str} to {end_time_str}")

try:
response = client.session.get(url, params=params, timeout=timeout)

if response.status_code != 200:
error_msg = (
f"Error executing search: Status {response.status_code}, "
f"Response: {response.text}"
)
if debug:
print(f"Error: {error_msg}")
raise APIError(error_msg)

# Parse the response
response_data = response.json()

# Extract events and metadata
events = response_data.get("events", [])
more_data_available = response_data.get("moreDataAvailable", False)

if debug:
print(f"Found {len(events)} events")
print(f"More data available: {more_data_available}")

# Build the result structure to match the expected format
result = {
"events": events,
"total_events": len(events),
"more_data_available": more_data_available,
}

return result

except requests.exceptions.RequestException as e:
error_msg = f"Request failed: {str(e)}"
if debug:
print(f"Error: {error_msg}")
raise APIError(error_msg) from e
result = chronicle_request(
client,
method="GET",
endpoint_path=":udmSearch",
api_version=APIVersion.V1ALPHA,
params=params,
timeout=timeout,
)

if as_list:
return result.get("events", [])

events = result.get("events", [])
return {
"events": events,
"total_events": len(events),
"more_data_available": result.get("moreDataAvailable", False),
}
35 changes: 0 additions & 35 deletions tests/chronicle/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,41 +90,6 @@ def test_chronicle_client_custom_session_user_agent():
assert client.session.headers.get("User-Agent") == "secops-wrapper-sdk"


def test_search_udm(chronicle_client):
"""Test UDM search functionality."""
# Mock the search request
mock_response = Mock()
mock_response.status_code = 200
mock_response.json.return_value = {
"events": [
{
"name": "projects/test-project/locations/us/instances/test-instance/events/event1",
"udm": {
"metadata": {
"eventTimestamp": "2024-01-01T00:00:00Z",
"eventType": "NETWORK_CONNECTION",
},
"target": {"ip": "192.168.1.1", "hostname": "test-host"},
},
}
],
"moreDataAvailable": False,
}

with patch.object(chronicle_client.session, "get", return_value=mock_response):
result = chronicle_client.search_udm(
query='target.ip != ""',
start_time=datetime(2024, 1, 1, tzinfo=timezone.utc),
end_time=datetime(2024, 1, 2, tzinfo=timezone.utc),
max_events=10,
)

assert "events" in result
assert "total_events" in result
assert result["total_events"] == 1
assert result["events"][0]["udm"]["target"]["ip"] == "192.168.1.1"


@patch("secops.chronicle.entity._detect_value_type_for_query")
@patch("secops.chronicle.entity._summarize_entity_by_id")
def test_summarize_entity_ip(mock_summarize_by_id, mock_detect, chronicle_client):
Expand Down
Loading
Loading