diff --git a/.doc_gen/metadata/guardduty_metadata.yaml b/.doc_gen/metadata/guardduty_metadata.yaml new file mode 100644 index 00000000000..119cf128248 --- /dev/null +++ b/.doc_gen/metadata/guardduty_metadata.yaml @@ -0,0 +1,158 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +# GuardDuty code examples for the SDK for Python. +guardduty_Hello: + title: Hello &GD; + title_abbrev: Hello &GD; + synopsis: get started using &GD;. + category: Hello + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/guardduty + excerpts: + - description: + snippet_tags: + - python.example_code.guardduty.Hello + services: + guardduty: {ListDetectors} +guardduty_CreateDetector: + title: Create a &GD; detector using an &AWS; SDK + title_abbrev: Create a detector + synopsis: create a &GD; detector. + category: Actions + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/guardduty + excerpts: + - description: + snippet_tags: + - python.example_code.guardduty.GuardDutyWrapper.decl + - python.example_code.guardduty.CreateDetector + services: + guardduty: {CreateDetector} +guardduty_GetDetector: + title: Get a &GD; detector using an &AWS; SDK + title_abbrev: Get a detector + synopsis: get a &GD; detector. + category: Actions + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/guardduty + excerpts: + - description: + snippet_tags: + - python.example_code.guardduty.GuardDutyWrapper.decl + - python.example_code.guardduty.GetDetector + services: + guardduty: {GetDetector} +guardduty_ListDetectors: + title: List &GD; detectors using an &AWS; SDK + title_abbrev: List detectors + synopsis: list &GD; detectors. + category: Actions + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/guardduty + excerpts: + - description: + snippet_tags: + - python.example_code.guardduty.GuardDutyWrapper.decl + - python.example_code.guardduty.ListDetectors + services: + guardduty: {ListDetectors} +guardduty_CreateSampleFindings: + title: Create &GD; sample findings using an &AWS; SDK + title_abbrev: Create sample findings + synopsis: create &GD; sample findings. + category: Actions + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/guardduty + excerpts: + - description: + snippet_tags: + - python.example_code.guardduty.GuardDutyWrapper.decl + - python.example_code.guardduty.CreateSampleFindings + services: + guardduty: {CreateSampleFindings} +guardduty_ListFindings: + title: List &GD; findings using an &AWS; SDK + title_abbrev: List findings + synopsis: list &GD; findings. + category: Actions + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/guardduty + excerpts: + - description: + snippet_tags: + - python.example_code.guardduty.GuardDutyWrapper.decl + - python.example_code.guardduty.ListFindings + services: + guardduty: {ListFindings} +guardduty_GetFindings: + title: Get &GD; findings using an &AWS; SDK + title_abbrev: Get findings + synopsis: get &GD; findings. + category: Actions + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/guardduty + excerpts: + - description: + snippet_tags: + - python.example_code.guardduty.GuardDutyWrapper.decl + - python.example_code.guardduty.GetFindings + services: + guardduty: {GetFindings} +guardduty_DeleteDetector: + title: Delete a &GD; detector using an &AWS; SDK + title_abbrev: Delete a detector + synopsis: delete a &GD; detector. + category: Actions + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/guardduty + excerpts: + - description: + snippet_tags: + - python.example_code.guardduty.GuardDutyWrapper.decl + - python.example_code.guardduty.DeleteDetector + services: + guardduty: {DeleteDetector} +guardduty_Scenario: + title: Learn the basics of &GD; using an &AWS; SDK + title_abbrev: Learn the basics + synopsis: learn the basics of &GD;. + category: Scenarios + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/guardduty + excerpts: + - description: Create a wrapper class that encapsulates &GD; functions. + snippet_tags: + - python.example_code.guardduty.GuardDutyWrapper.class + - description: Use the wrapper class to run an interactive scenario at a command prompt. + snippet_tags: + - python.example_code.guardduty.GuardDutyScenario + services: + guardduty: {CreateDetector, GetDetector, ListDetectors, CreateSampleFindings, ListFindings, GetFindings, DeleteDetector} \ No newline at end of file diff --git a/python/example_code/guardduty/README.md b/python/example_code/guardduty/README.md new file mode 100644 index 00000000000..5931e7ff082 --- /dev/null +++ b/python/example_code/guardduty/README.md @@ -0,0 +1,115 @@ +# Amazon GuardDuty code examples for the SDK for Python + +## Overview + +Shows how to use the AWS SDK for Python (Boto3) to work with Amazon GuardDuty. + + + + +_Amazon GuardDuty is a threat detection service that continuously monitors for malicious activity and unauthorized behavior to protect your AWS accounts and workloads._ + +## ⚠ Important + +* Running this code might result in charges to your AWS account. For more details, see [AWS Pricing](https://aws.amazon.com/pricing/) and [Free Tier](https://aws.amazon.com/free/). +* Running the tests might result in charges to your AWS account. +* We recommend that you grant your code least privilege. At most, grant only the minimum permissions required to perform the task. For more information, see [Grant least privilege](https://docs.aws.amazon.com/IAM/latest/UserGuide/best-practices.html#grant-least-privilege). +* This code is not tested in every AWS Region. For more information, see [AWS Regional Services](https://aws.amazon.com/about-aws/global-infrastructure/regional-product-services). + + + + +## Code examples + +### Prerequisites + +For prerequisites, see the [README](../../README.md#Prerequisites) in the `python` folder. + +Install the packages required by these examples by running the following in a virtual environment: + +``` +python -m pip install -r requirements.txt +``` + + + + +### Get started + +- [Hello GuardDuty](guardduty_hello.py#L15) (`ListDetectors`) + +### Single actions + +Code excerpts that show you how to call individual service functions. + +- [CreateDetector](guardduty_wrapper.py#L35) +- [CreateSampleFindings](guardduty_wrapper.py#L108) +- [DeleteDetector](guardduty_wrapper.py#L189) +- [GetDetector](guardduty_wrapper.py#L78) +- [GetFindings](guardduty_wrapper.py#L162) +- [ListDetectors](guardduty_wrapper.py#L56) +- [ListFindings](guardduty_wrapper.py#L135) + +### Scenarios + +Code examples that show you how to accomplish a specific task by calling multiple functions within the same service. + +- [Learn the basics of GuardDuty](scenario_guardduty_basics.py) + +## Run the examples + +### Instructions + + + + + +#### Hello GuardDuty + +This example shows you how to get started using GuardDuty. + +``` +python guardduty_hello.py +``` + + +#### Learn the basics of GuardDuty + +This example shows you how to do the following: + +* Create a GuardDuty detector to enable threat detection. +* Generate sample findings for demonstration purposes. +* List and examine findings by severity. +* Delete the detector to clean up resources. + +``` +python scenario_guardduty_basics.py +``` + +### Tests + +⚠ Running tests might result in charges to your AWS account. + + +To find instructions for running these tests, see the [README](../../README.md#Tests) +in the `python` folder. + + + + + + +## Additional resources + +- [Amazon GuardDuty User Guide](https://docs.aws.amazon.com/guardduty/latest/ug/what-is-guardduty.html) +- [Amazon GuardDuty API Reference](https://docs.aws.amazon.com/guardduty/latest/APIReference/Welcome.html) +- [AWS SDK for Python (Boto3) GuardDuty reference](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/guardduty.html) + + + + +--- + +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 \ No newline at end of file diff --git a/python/example_code/guardduty/guardduty_hello.py b/python/example_code/guardduty/guardduty_hello.py new file mode 100644 index 00000000000..4aef9662a68 --- /dev/null +++ b/python/example_code/guardduty/guardduty_hello.py @@ -0,0 +1,63 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +Purpose + +Shows how to use the AWS SDK for Python (Boto3) to get started with Amazon GuardDuty. +This example demonstrates the most basic GuardDuty operation: listing existing detectors +in the current region. +""" + +import logging +import boto3 +from botocore.exceptions import ClientError + +from guardduty_wrapper import GuardDutyWrapper + + +# snippet-start:[python.example_code.guardduty.Hello] +def hello_guardduty(): + """ + Use the AWS SDK for Python (Boto3) to check if GuardDuty is available + in the current region and list any existing detectors. + This function is typically used to verify GuardDuty service connectivity. + """ + print("Hello, Amazon GuardDuty!") + + try: + # Create GuardDuty wrapper + guardduty_wrapper = GuardDutyWrapper.from_client() + + # List existing detectors + detector_ids = guardduty_wrapper.list_detectors() + + if detector_ids: + print(f"Found {len(detector_ids)} GuardDuty detector(s) in this region:") + for detector_id in detector_ids: + print(f" - {detector_id}") + else: + print("No GuardDuty detectors found in this region.") + print( + "You can create a detector to start using GuardDuty threat detection." + ) + + except ClientError as e: + error_code = e.response["Error"]["Code"] + if error_code == "AccessDeniedException": + print("Access denied. Please check your AWS credentials and permissions.") + elif error_code == "UnauthorizedOperation": + print( + "Unauthorized operation. Please ensure you have GuardDuty permissions." + ) + else: + print(f"Error accessing GuardDuty: {e}") + except Exception as e: + print(f"Unexpected error: {e}") + + +# snippet-end:[python.example_code.guardduty.Hello] + +if __name__ == "__main__": + logging.basicConfig(level=logging.WARNING, format="%(levelname)s: %(message)s") + hello_guardduty() diff --git a/python/example_code/guardduty/guardduty_wrapper.py b/python/example_code/guardduty/guardduty_wrapper.py new file mode 100644 index 00000000000..7696e6700c1 --- /dev/null +++ b/python/example_code/guardduty/guardduty_wrapper.py @@ -0,0 +1,236 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +Purpose + +Shows how to use the AWS SDK for Python (Boto3) with Amazon GuardDuty to +manage threat detection and security findings. +""" + +import logging +from typing import Dict, List, Optional, Any +import boto3 +from botocore.exceptions import ClientError + +logger = logging.getLogger(__name__) + + +# snippet-start:[python.example_code.guardduty.GuardDutyWrapper.class] +# snippet-start:[python.example_code.guardduty.GuardDutyWrapper.decl] +class GuardDutyWrapper: + """Encapsulates Amazon GuardDuty functionality.""" + + def __init__(self, guardduty_client: boto3.client): + """ + :param guardduty_client: A Boto3 GuardDuty client. + """ + self.guardduty_client = guardduty_client + + @classmethod + def from_client(cls): + """ + Creates a GuardDutyWrapper instance with a default GuardDuty client. + + :return: An instance of GuardDutyWrapper. + """ + guardduty_client = boto3.client("guardduty") + return cls(guardduty_client) + + # snippet-end:[python.example_code.guardduty.GuardDutyWrapper.decl] + + # snippet-start:[python.example_code.guardduty.CreateDetector] + def create_detector( + self, enable: bool = True, finding_publishing_frequency: str = "FIFTEEN_MINUTES" + ) -> str: + """ + Creates a GuardDuty detector to enable threat detection. + + :param enable: Whether to enable the detector immediately. + :param finding_publishing_frequency: How often to publish findings. + :return: The detector ID. + """ + try: + response = self.guardduty_client.create_detector( + Enable=enable, FindingPublishingFrequency=finding_publishing_frequency + ) + detector_id = response["DetectorId"] + logger.info(f"Created GuardDuty detector with ID: {detector_id}") + return detector_id + except ClientError as e: + error_code = e.response["Error"]["Code"] + if error_code == "BadRequestException": + logger.error("Invalid parameters provided for detector creation") + elif error_code == "InternalServerErrorException": + logger.error("Internal server error occurred during detector creation") + else: + logger.error(f"Error creating detector: {e}") + raise + + # snippet-end:[python.example_code.guardduty.CreateDetector] + + # snippet-start:[python.example_code.guardduty.ListDetectors] + def list_detectors(self) -> List[str]: + """ + Lists all GuardDuty detectors in the current region. + + :return: A list of detector IDs. + """ + try: + response = self.guardduty_client.list_detectors() + detector_ids = response.get("DetectorIds", []) + logger.info(f"Found {len(detector_ids)} GuardDuty detectors") + return detector_ids + except ClientError as e: + error_code = e.response["Error"]["Code"] + if error_code == "BadRequestException": + logger.error("Invalid request parameters for listing detectors") + elif error_code == "InternalServerErrorException": + logger.error("Internal server error occurred while listing detectors") + else: + logger.error(f"Error listing detectors: {e}") + raise + + # snippet-end:[python.example_code.guardduty.ListDetectors] + + # snippet-start:[python.example_code.guardduty.GetDetector] + def get_detector(self, detector_id: str) -> Dict[str, Any]: + """ + Gets detailed information about a GuardDuty detector. + + :param detector_id: The ID of the detector to retrieve. + :return: Detector details. + """ + try: + response = self.guardduty_client.get_detector(DetectorId=detector_id) + logger.info(f"Retrieved detector details for ID: {detector_id}") + return response + except ClientError as e: + error_code = e.response["Error"]["Code"] + if error_code == "BadRequestException": + logger.error( + f"Invalid detector ID format or detector not found: {detector_id}" + ) + elif error_code == "InternalServerErrorException": + logger.error("Internal server error occurred while retrieving detector") + else: + logger.error(f"Error getting detector {detector_id}: {e}") + raise + + # snippet-end:[python.example_code.guardduty.GetDetector] + + # snippet-start:[python.example_code.guardduty.CreateSampleFindings] + def create_sample_findings( + self, detector_id: str, finding_types: Optional[List[str]] = None + ) -> None: + """ + Creates sample findings for testing and demonstration purposes. + + :param detector_id: The ID of the detector to create sample findings for. + :param finding_types: Optional list of specific finding types to generate. + """ + try: + params = {"DetectorId": detector_id} + if finding_types: + params["FindingTypes"] = finding_types + + self.guardduty_client.create_sample_findings(**params) + logger.info(f"Created sample findings for detector: {detector_id}") + except ClientError as e: + error_code = e.response["Error"]["Code"] + if error_code == "BadRequestException": + logger.error(f"Invalid detector ID or finding types: {detector_id}") + elif error_code == "InternalServerErrorException": + logger.error( + "Internal server error occurred while creating sample findings" + ) + else: + logger.error( + f"Error creating sample findings for detector {detector_id}: {e}" + ) + raise + + # snippet-end:[python.example_code.guardduty.CreateSampleFindings] + + # snippet-start:[python.example_code.guardduty.ListFindings] + def list_findings(self, detector_id: str, max_results: int = 50) -> List[str]: + """ + Lists finding IDs for a GuardDuty detector. + + :param detector_id: The ID of the detector to list findings for. + :param max_results: Maximum number of findings to return. + :return: A list of finding IDs. + """ + try: + response = self.guardduty_client.list_findings( + DetectorId=detector_id, MaxResults=max_results + ) + finding_ids = response.get("FindingIds", []) + logger.info(f"Found {len(finding_ids)} findings for detector {detector_id}") + return finding_ids + except ClientError as e: + error_code = e.response["Error"]["Code"] + if error_code == "BadRequestException": + logger.error(f"Invalid parameters for listing findings: {detector_id}") + elif error_code == "InternalServerErrorException": + logger.error("Internal server error occurred while listing findings") + else: + logger.error(f"Error listing findings for detector {detector_id}: {e}") + raise + + # snippet-end:[python.example_code.guardduty.ListFindings] + + # snippet-start:[python.example_code.guardduty.GetFindings] + def get_findings( + self, detector_id: str, finding_ids: List[str] + ) -> List[Dict[str, Any]]: + """ + Gets detailed information about specific findings. + + :param detector_id: The ID of the detector. + :param finding_ids: List of finding IDs to retrieve. + :return: List of finding details. + """ + try: + response = self.guardduty_client.get_findings( + DetectorId=detector_id, FindingIds=finding_ids + ) + findings = response.get("Findings", []) + logger.info(f"Retrieved {len(findings)} finding details") + return findings + except ClientError as e: + error_code = e.response["Error"]["Code"] + if error_code == "BadRequestException": + logger.error(f"Invalid finding IDs format: {finding_ids}") + elif error_code == "InternalServerErrorException": + logger.error("Internal server error occurred while retrieving findings") + else: + logger.error(f"Error getting findings: {e}") + raise + + # snippet-end:[python.example_code.guardduty.GetFindings] + + # snippet-start:[python.example_code.guardduty.DeleteDetector] + def delete_detector(self, detector_id: str) -> None: + """ + Deletes a GuardDuty detector. + + :param detector_id: The ID of the detector to delete. + """ + try: + self.guardduty_client.delete_detector(DetectorId=detector_id) + logger.info(f"Deleted GuardDuty detector: {detector_id}") + except ClientError as e: + error_code = e.response["Error"]["Code"] + if error_code == "BadRequestException": + logger.error(f"Detector not found or invalid ID: {detector_id}") + elif error_code == "InternalServerErrorException": + logger.error("Internal server error occurred while deleting detector") + else: + logger.error(f"Error deleting detector {detector_id}: {e}") + raise + + # snippet-end:[python.example_code.guardduty.DeleteDetector] + + +# snippet-end:[python.example_code.guardduty.GuardDutyWrapper.class] diff --git a/python/example_code/guardduty/requirements.txt b/python/example_code/guardduty/requirements.txt new file mode 100644 index 00000000000..9137f2a9ed8 --- /dev/null +++ b/python/example_code/guardduty/requirements.txt @@ -0,0 +1,2 @@ +boto3>=1.26.137 +botocore>=1.29.137 \ No newline at end of file diff --git a/python/example_code/guardduty/scenario_guardduty_basics.py b/python/example_code/guardduty/scenario_guardduty_basics.py new file mode 100644 index 00000000000..e1172bac5b4 --- /dev/null +++ b/python/example_code/guardduty/scenario_guardduty_basics.py @@ -0,0 +1,296 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +Purpose + +Shows how to use the AWS SDK for Python (Boto3) to manage Amazon GuardDuty +detectors and findings. This scenario demonstrates: + +1. Creating a GuardDuty detector to enable threat detection +2. Generating sample findings for demonstration +3. Listing and examining findings +4. Cleaning up resources + +This example runs interactively and uses the demo_tools module for user input. +""" + +import logging +import os +import sys +import time +from typing import Optional + +import boto3 +from botocore.exceptions import ClientError + +from guardduty_wrapper import GuardDutyWrapper + +# Add relative path to include demo_tools +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../..")) +from demo_tools import question as q + +logger = logging.getLogger(__name__) + + +# snippet-start:[python.example_code.guardduty.GuardDutyScenario] +class GuardDutyScenario: + """Runs an interactive scenario that shows how to use Amazon GuardDuty.""" + + def __init__(self, guardduty_wrapper: GuardDutyWrapper): + """ + :param guardduty_wrapper: An instance of the GuardDutyWrapper class. + """ + self.guardduty_wrapper = guardduty_wrapper + self.detector_id = None + + def run_scenario(self): + """Runs the GuardDuty basics scenario.""" + print("-" * 88) + print("Welcome to the Amazon GuardDuty basics scenario!") + print("-" * 88) + + print( + "Amazon GuardDuty is a threat detection service that continuously monitors " + "for malicious activity and unauthorized behavior to protect your AWS accounts " + "and workloads." + ) + print() + + try: + self._setup_phase() + self._demonstration_phase() + self._examination_phase() + except Exception as e: + logger.error(f"Scenario failed: {e}") + print(f"The scenario encountered an error: {e}") + finally: + self._cleanup_phase() + + def _setup_phase(self): + """Setup phase: Create or use existing GuardDuty detector.""" + print("=" * 60) + print("Setup: Creating GuardDuty detector") + print("=" * 60) + + # Check for existing detectors + existing_detectors = self.guardduty_wrapper.list_detectors() + + if existing_detectors: + print(f"Found {len(existing_detectors)} existing detector(s):") + for detector_id in existing_detectors: + detector_info = self.guardduty_wrapper.get_detector(detector_id) + status = detector_info.get("Status", "Unknown") + print(f" - {detector_id} (Status: {status})") + + if q.ask("Do you want to use an existing detector? (y/n): ", q.is_yesno): + self.detector_id = existing_detectors[0] + print(f"Using existing detector: {self.detector_id}") + else: + self.detector_id = self._create_new_detector() + else: + print("No existing detectors found. Creating a new one...") + self.detector_id = self._create_new_detector() + + # Display detector information + detector_info = self.guardduty_wrapper.get_detector(self.detector_id) + print(f"\nDetector Details:") + print(f" ID: {self.detector_id}") + print(f" Status: {detector_info.get('Status', 'Unknown')}") + print(f" Service Role: {detector_info.get('ServiceRole', 'Not specified')}") + print( + f" Finding Publishing Frequency: {detector_info.get('FindingPublishingFrequency', 'Unknown')}" + ) + + def _create_new_detector(self) -> str: + """Create a new GuardDuty detector.""" + print("Creating a new GuardDuty detector...") + + # Ask user for finding publishing frequency + frequencies = {"1": "FIFTEEN_MINUTES", "2": "ONE_HOUR", "3": "SIX_HOURS"} + + print("Choose finding publishing frequency:") + print("1. Every 15 minutes") + print("2. Every hour") + print("3. Every 6 hours") + + choice = q.ask("Enter your choice (1-3): ", q.is_int, q.in_range(1, 3)) + frequency = frequencies[str(choice)] + + detector_id = self.guardduty_wrapper.create_detector( + enable=True, finding_publishing_frequency=frequency + ) + + print(f"Successfully created detector: {detector_id}") + return detector_id + + def _demonstration_phase(self): + """Demonstration phase: Generate and explore sample findings.""" + print("\n" + "=" * 60) + print("Demonstration: Working with GuardDuty findings") + print("=" * 60) + + print( + "GuardDuty generates findings when it detects potential security threats. " + "Let's create some sample findings to explore GuardDuty's capabilities." + ) + + if q.ask("Create sample findings for demonstration? (y/n): ", q.is_yesno): + print("Creating sample findings...") + self.guardduty_wrapper.create_sample_findings(self.detector_id) + + print( + "Sample findings created! Waiting a moment for them to be processed..." + ) + time.sleep(5) # Give time for findings to be created + + # List findings + finding_ids = self.guardduty_wrapper.list_findings(self.detector_id) + + if finding_ids: + print(f"Found {len(finding_ids)} findings.") + + if q.ask( + "Would you like to examine the findings in detail? (y/n): ", + q.is_yesno, + ): + self._examine_findings(finding_ids[:5]) # Examine first 5 findings + else: + print("No findings found yet. They may take a few moments to appear.") + + def _examination_phase(self): + """Examination phase: Explore findings in detail.""" + print("\n" + "=" * 60) + print("Examination: Detailed finding analysis") + print("=" * 60) + + finding_ids = self.guardduty_wrapper.list_findings(self.detector_id) + + if not finding_ids: + print("No findings available for examination.") + return + + print(f"Total findings available: {len(finding_ids)}") + + if q.ask("Would you like to examine findings by severity? (y/n): ", q.is_yesno): + self._examine_findings_by_severity(finding_ids) + + def _examine_findings(self, finding_ids): + """Examine specific findings in detail.""" + if not finding_ids: + print("No findings to examine.") + return + + findings = self.guardduty_wrapper.get_findings(self.detector_id, finding_ids) + + for i, finding in enumerate(findings, 1): + print(f"\n--- Finding {i} ---") + print(f"ID: {finding.get('Id', 'Unknown')}") + print(f"Type: {finding.get('Type', 'Unknown')}") + print(f"Severity: {finding.get('Severity', 'Unknown')}") + print(f"Title: {finding.get('Title', 'No title')}") + print(f"Description: {finding.get('Description', 'No description')}") + + # Show service information if available + service_info = finding.get("Service", {}) + if service_info: + print(f"Service: {service_info.get('ServiceName', 'Unknown')}") + print(f"Detector ID: {service_info.get('DetectorId', 'Unknown')}") + + if i < len(findings) and q.ask( + "Continue to next finding? (y/n): ", q.is_yesno + ): + continue + else: + break + + def _examine_findings_by_severity(self, finding_ids): + """Group and examine findings by severity level.""" + findings = self.guardduty_wrapper.get_findings(self.detector_id, finding_ids) + + # Group findings by severity + severity_groups = {} + for finding in findings: + severity = finding.get("Severity", 0) + if severity >= 7.0: + level = "HIGH" + elif severity >= 4.0: + level = "MEDIUM" + else: + level = "LOW" + + if level not in severity_groups: + severity_groups[level] = [] + severity_groups[level].append(finding) + + # Display findings by severity + for level in ["HIGH", "MEDIUM", "LOW"]: + if level in severity_groups: + findings_in_level = severity_groups[level] + print(f"\n{level} Severity Findings ({len(findings_in_level)}):") + + for finding in findings_in_level: + print( + f" - {finding.get('Type', 'Unknown')} (Score: {finding.get('Severity', 'Unknown')})" + ) + print(f" {finding.get('Title', 'No title')}") + + def _cleanup_phase(self): + """Cleanup phase: Optionally delete the detector.""" + print("\n" + "=" * 60) + print("Cleanup: Managing GuardDuty resources") + print("=" * 60) + + if not self.detector_id: + print("No detector to clean up.") + return + + print( + "GuardDuty detectors continue to monitor your account and may incur charges. " + "You can disable the detector to stop monitoring, or delete it entirely." + ) + + cleanup_choice = q.ask( + "What would you like to do?\n" + "1. Keep the detector running\n" + "2. Delete the detector\n" + "Enter your choice (1-2): ", + q.is_int, + q.in_range(1, 2), + ) + + if cleanup_choice == 2: + if q.ask( + f"Are you sure you want to delete detector {self.detector_id}? (y/n): ", + q.is_yesno, + ): + try: + self.guardduty_wrapper.delete_detector(self.detector_id) + print(f"Detector {self.detector_id} has been deleted.") + except Exception as e: + print(f"Error deleting detector: {e}") + else: + print("Detector deletion cancelled.") + else: + print(f"Detector {self.detector_id} will continue running.") + print("You can manage it through the AWS Console or CLI.") + + +# snippet-end:[python.example_code.guardduty.GuardDutyScenario] + + +def main(): + """Runs the GuardDuty basics scenario.""" + logging.basicConfig(level=logging.WARNING, format="%(levelname)s: %(message)s") + + try: + guardduty_wrapper = GuardDutyWrapper.from_client() + scenario = GuardDutyScenario(guardduty_wrapper) + scenario.run_scenario() + except Exception as e: + logger.error(f"Failed to run scenario: {e}") + print(f"Failed to run the scenario: {e}") + + +if __name__ == "__main__": + main() diff --git a/python/example_code/guardduty/test/conftest.py b/python/example_code/guardduty/test/conftest.py new file mode 100644 index 00000000000..c24fdc49369 --- /dev/null +++ b/python/example_code/guardduty/test/conftest.py @@ -0,0 +1,39 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +Contains common test fixtures used to run unit tests. +""" + +import sys +import os +import boto3 +import pytest + +script_dir = os.path.dirname(os.path.abspath(__file__)) + +# Add relative path to include GuardDutyWrapper. +sys.path.append(script_dir) +sys.path.append(os.path.dirname(script_dir)) +import scenario_guardduty_basics +from guardduty_wrapper import GuardDutyWrapper + +# Add relative path to include demo_tools in this code example without need for setup. +sys.path.append(os.path.join(script_dir, "../..")) + +from test_tools.fixtures.common import * + + +class ScenarioData: + def __init__(self, guardduty_client, guardduty_stubber): + self.guardduty_client = guardduty_client + self.guardduty_stubber = guardduty_stubber + self.wrapper = GuardDutyWrapper(guardduty_client) + self.scenario = scenario_guardduty_basics.GuardDutyScenario(self.wrapper) + + +@pytest.fixture +def scenario_data(make_stubber): + guardduty_client = boto3.client("guardduty") + guardduty_stubber = make_stubber(guardduty_client) + return ScenarioData(guardduty_client, guardduty_stubber) diff --git a/python/example_code/guardduty/test/test_guardduty_integration.py b/python/example_code/guardduty/test/test_guardduty_integration.py new file mode 100644 index 00000000000..c9732851694 --- /dev/null +++ b/python/example_code/guardduty/test/test_guardduty_integration.py @@ -0,0 +1,90 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +Integration tests for guardduty_wrapper.py functions. +""" + +import boto3 +import pytest +import time + +from guardduty_wrapper import GuardDutyWrapper + + +@pytest.mark.integ +def test_guardduty_integration(): + """ + Integration test that works with existing detectors or creates a new one, + generates sample findings, examines them, and cleans up appropriately. + """ + wrapper = GuardDutyWrapper.from_client() + detector_id = None + created_detector = False + + try: + # Check for existing detectors first + existing_detectors = wrapper.list_detectors() + + if existing_detectors: + # Use existing detector + detector_id = existing_detectors[0] + print(f"Using existing detector: {detector_id}") + else: + # Create a new detector + detector_id = wrapper.create_detector( + enable=True, finding_publishing_frequency="FIFTEEN_MINUTES" + ) + created_detector = True + print(f"Created new detector: {detector_id}") + + assert detector_id is not None + + # Verify detector exists and get its info + detector_info = wrapper.get_detector(detector_id) + assert detector_info["Status"] == "ENABLED" + + # List detectors should include our detector + detector_ids = wrapper.list_detectors() + assert detector_id in detector_ids + + # Create sample findings + wrapper.create_sample_findings(detector_id) + + # Wait a moment for findings to be processed + time.sleep(10) + + # List findings + finding_ids = wrapper.list_findings(detector_id) + + # If we have findings, examine them + if finding_ids: + findings = wrapper.get_findings(detector_id, finding_ids[:5]) + assert len(findings) > 0 + + # Verify finding structure + for finding in findings: + assert "Id" in finding + assert "Type" in finding + assert "Severity" in finding + + finally: + # Only delete the detector if we created it + if detector_id and created_detector: + wrapper.delete_detector(detector_id) + + # Verify deletion + detector_ids = wrapper.list_detectors() + assert detector_id not in detector_ids + + +@pytest.mark.integ +def test_list_existing_detectors(): + """ + Simple integration test to list existing detectors. + """ + wrapper = GuardDutyWrapper.from_client() + + # This should not raise an exception + detector_ids = wrapper.list_detectors() + assert isinstance(detector_ids, list) diff --git a/python/example_code/guardduty/test/test_guardduty_scenario.py b/python/example_code/guardduty/test/test_guardduty_scenario.py new file mode 100644 index 00000000000..f416f9d547d --- /dev/null +++ b/python/example_code/guardduty/test/test_guardduty_scenario.py @@ -0,0 +1,199 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +Unit tests for scenario_guardduty_basics.py functions. +""" + +import pytest +from unittest.mock import patch +import boto3 +from botocore.exceptions import ClientError + + +def test_scenario_create_detector_success(scenario_data, monkeypatch): + """Test successful detector creation in scenario.""" + detector_id = "test-detector-123" + + scenario_data.guardduty_stubber.stub_create_detector(detector_id) + + with patch("scenario_guardduty_basics.q.ask") as mock_ask: + mock_ask.return_value = 1 # Choose frequency 1 (15 minutes) + + result = scenario_data.scenario._create_new_detector() + assert result == detector_id + + +def test_scenario_setup_phase_with_existing_detector(scenario_data, monkeypatch): + """Test setup phase when existing detectors are found.""" + existing_detector_id = "existing-detector-123" + + scenario_data.guardduty_stubber.stub_list_detectors([existing_detector_id]) + scenario_data.guardduty_stubber.stub_get_detector(existing_detector_id) + scenario_data.guardduty_stubber.stub_get_detector( + existing_detector_id + ) # Called twice + + with patch("scenario_guardduty_basics.q.ask") as mock_ask: + mock_ask.return_value = True # Use existing detector + + scenario_data.scenario._setup_phase() + assert scenario_data.scenario.detector_id == existing_detector_id + + +def test_scenario_create_sample_findings(scenario_data): + """Test creating sample findings.""" + detector_id = "test-detector-123" + scenario_data.scenario.detector_id = detector_id + + scenario_data.guardduty_stubber.stub_create_sample_findings(detector_id) + scenario_data.guardduty_stubber.stub_list_findings( + detector_id, ["finding-1", "finding-2"] + ) + + with patch("scenario_guardduty_basics.q.ask") as mock_ask: + mock_ask.side_effect = [True, False] # Create findings, don't examine details + + with patch("time.sleep"): # Mock sleep to speed up test + scenario_data.scenario._demonstration_phase() + + +def test_scenario_examine_findings(scenario_data): + """Test examining findings in detail.""" + detector_id = "test-detector-123" + scenario_data.scenario.detector_id = detector_id # Set the detector_id + finding_ids = ["finding-1", "finding-2"] + findings = [ + { + "Id": "finding-1", + "AccountId": "123456789012", + "Arn": "arn:aws:guardduty:us-east-1:123456789012:detector/test-detector-id/finding/finding-1", + "Type": "Backdoor:EC2/C&CActivity.B!DNS", + "Severity": 8.0, + "Title": "Sample finding 1", + "Description": "Test finding description", + "CreatedAt": "2023-01-01T00:00:00.000Z", + "UpdatedAt": "2023-01-01T00:00:00.000Z", + "Region": "us-east-1", + "SchemaVersion": "2.0", + "Resource": { + "ResourceType": "Instance", + "InstanceDetails": {"InstanceId": "i-1234567890abcdef0"}, + }, + "Service": {"ServiceName": "GuardDuty", "DetectorId": detector_id}, + }, + { + "Id": "finding-2", + "AccountId": "123456789012", + "Arn": "arn:aws:guardduty:us-east-1:123456789012:detector/test-detector-id/finding/finding-2", + "Type": "Trojan:EC2/BlackholeTraffic", + "Severity": 5.0, + "Title": "Sample finding 2", + "Description": "Another test finding", + "CreatedAt": "2023-01-01T00:00:00.000Z", + "UpdatedAt": "2023-01-01T00:00:00.000Z", + "Region": "us-east-1", + "SchemaVersion": "2.0", + "Resource": { + "ResourceType": "Instance", + "InstanceDetails": {"InstanceId": "i-0987654321fedcba0"}, + }, + "Service": {"ServiceName": "GuardDuty", "DetectorId": detector_id}, + }, + ] + + scenario_data.guardduty_stubber.stub_get_findings( + detector_id, finding_ids, findings + ) + + with patch("scenario_guardduty_basics.q.ask") as mock_ask: + mock_ask.return_value = False # Don't continue to next finding + + scenario_data.scenario._examine_findings(finding_ids) + + +def test_scenario_cleanup_keep_detector(scenario_data): + """Test cleanup phase when keeping the detector.""" + detector_id = "test-detector-123" + scenario_data.scenario.detector_id = detector_id + + with patch("scenario_guardduty_basics.q.ask") as mock_ask: + mock_ask.return_value = 1 # Keep detector running + + scenario_data.scenario._cleanup_phase() + + +def test_scenario_cleanup_delete_detector(scenario_data): + """Test cleanup phase when deleting the detector.""" + detector_id = "test-detector-123" + scenario_data.scenario.detector_id = detector_id + + scenario_data.guardduty_stubber.stub_delete_detector(detector_id) + + with patch("scenario_guardduty_basics.q.ask") as mock_ask: + mock_ask.side_effect = [2, True] # Delete detector, confirm deletion + + scenario_data.scenario._cleanup_phase() + + +def test_scenario_examine_findings_by_severity(scenario_data): + """Test examining findings grouped by severity.""" + detector_id = "test-detector-123" + scenario_data.scenario.detector_id = detector_id # Set the detector_id + finding_ids = ["finding-1", "finding-2", "finding-3"] + findings = [ + { + "Id": "finding-1", + "AccountId": "123456789012", + "Arn": "arn:aws:guardduty:us-east-1:123456789012:detector/test-detector-id/finding/finding-1", + "Type": "High severity", + "Severity": 8.0, + "Title": "High finding", + "CreatedAt": "2023-01-01T00:00:00.000Z", + "UpdatedAt": "2023-01-01T00:00:00.000Z", + "Region": "us-east-1", + "SchemaVersion": "2.0", + "Resource": { + "ResourceType": "Instance", + "InstanceDetails": {"InstanceId": "i-1234567890abcdef0"}, + }, + }, + { + "Id": "finding-2", + "AccountId": "123456789012", + "Arn": "arn:aws:guardduty:us-east-1:123456789012:detector/test-detector-id/finding/finding-2", + "Type": "Medium severity", + "Severity": 5.0, + "Title": "Medium finding", + "CreatedAt": "2023-01-01T00:00:00.000Z", + "UpdatedAt": "2023-01-01T00:00:00.000Z", + "Region": "us-east-1", + "SchemaVersion": "2.0", + "Resource": { + "ResourceType": "Instance", + "InstanceDetails": {"InstanceId": "i-0987654321fedcba0"}, + }, + }, + { + "Id": "finding-3", + "AccountId": "123456789012", + "Arn": "arn:aws:guardduty:us-east-1:123456789012:detector/test-detector-id/finding/finding-3", + "Type": "Low severity", + "Severity": 2.0, + "Title": "Low finding", + "CreatedAt": "2023-01-01T00:00:00.000Z", + "UpdatedAt": "2023-01-01T00:00:00.000Z", + "Region": "us-east-1", + "SchemaVersion": "2.0", + "Resource": { + "ResourceType": "Instance", + "InstanceDetails": {"InstanceId": "i-1111222233334444"}, + }, + }, + ] + + scenario_data.guardduty_stubber.stub_get_findings( + detector_id, finding_ids, findings + ) + + scenario_data.scenario._examine_findings_by_severity(finding_ids) diff --git a/python/example_code/guardduty/test/test_guardduty_wrapper.py b/python/example_code/guardduty/test/test_guardduty_wrapper.py new file mode 100644 index 00000000000..546bf574f69 --- /dev/null +++ b/python/example_code/guardduty/test/test_guardduty_wrapper.py @@ -0,0 +1,217 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +Unit tests for guardduty_wrapper.py functions. +""" + +import boto3 +import pytest +from botocore.exceptions import ClientError + +from guardduty_wrapper import GuardDutyWrapper + + +@pytest.mark.parametrize( + "error_code", [None, "BadRequestException", "InternalServerErrorException"] +) +def test_create_detector(make_stubber, error_code): + guardduty_client = boto3.client("guardduty", region_name="us-east-1") + guardduty_stubber = make_stubber(guardduty_client) + wrapper = GuardDutyWrapper(guardduty_client) + + detector_id = "test-detector-id" + enable = True + frequency = "FIFTEEN_MINUTES" + + guardduty_stubber.stub_create_detector( + detector_id, enable, frequency, error_code=error_code + ) + + if error_code is None: + result = wrapper.create_detector(enable, frequency) + assert result == detector_id + else: + with pytest.raises(ClientError) as exc_info: + wrapper.create_detector(enable, frequency) + assert exc_info.value.response["Error"]["Code"] == error_code + + +@pytest.mark.parametrize( + "error_code", [None, "BadRequestException", "InternalServerErrorException"] +) +def test_list_detectors(make_stubber, error_code): + guardduty_client = boto3.client("guardduty", region_name="us-east-1") + guardduty_stubber = make_stubber(guardduty_client) + wrapper = GuardDutyWrapper(guardduty_client) + + detector_ids = ["detector-1", "detector-2"] + + guardduty_stubber.stub_list_detectors(detector_ids, error_code=error_code) + + if error_code is None: + result = wrapper.list_detectors() + assert result == detector_ids + else: + with pytest.raises(ClientError) as exc_info: + wrapper.list_detectors() + assert exc_info.value.response["Error"]["Code"] == error_code + + +@pytest.mark.parametrize( + "error_code", [None, "BadRequestException", "InternalServerErrorException"] +) +def test_get_detector(make_stubber, error_code): + guardduty_client = boto3.client("guardduty", region_name="us-east-1") + guardduty_stubber = make_stubber(guardduty_client) + wrapper = GuardDutyWrapper(guardduty_client) + + detector_id = "test-detector-id" + status = "ENABLED" + service_role = "arn:aws:iam::123456789012:role/aws-guardduty-service-role" + frequency = "FIFTEEN_MINUTES" + + guardduty_stubber.stub_get_detector( + detector_id, status, service_role, frequency, error_code=error_code + ) + + if error_code is None: + result = wrapper.get_detector(detector_id) + assert result["Status"] == status + assert result["ServiceRole"] == service_role + assert result["FindingPublishingFrequency"] == frequency + else: + with pytest.raises(ClientError) as exc_info: + wrapper.get_detector(detector_id) + assert exc_info.value.response["Error"]["Code"] == error_code + + +@pytest.mark.parametrize( + "error_code", [None, "BadRequestException", "InternalServerErrorException"] +) +def test_create_sample_findings(make_stubber, error_code): + guardduty_client = boto3.client("guardduty", region_name="us-east-1") + guardduty_stubber = make_stubber(guardduty_client) + wrapper = GuardDutyWrapper(guardduty_client) + + detector_id = "test-detector-id" + finding_types = ["Backdoor:EC2/C&CActivity.B!DNS"] + + guardduty_stubber.stub_create_sample_findings( + detector_id, finding_types, error_code=error_code + ) + + if error_code is None: + wrapper.create_sample_findings(detector_id, finding_types) + else: + with pytest.raises(ClientError) as exc_info: + wrapper.create_sample_findings(detector_id, finding_types) + assert exc_info.value.response["Error"]["Code"] == error_code + + +@pytest.mark.parametrize( + "error_code", [None, "BadRequestException", "InternalServerErrorException"] +) +def test_list_findings(make_stubber, error_code): + guardduty_client = boto3.client("guardduty", region_name="us-east-1") + guardduty_stubber = make_stubber(guardduty_client) + wrapper = GuardDutyWrapper(guardduty_client) + + detector_id = "test-detector-id" + finding_ids = ["finding-1", "finding-2"] + max_results = 50 + + guardduty_stubber.stub_list_findings( + detector_id, finding_ids, max_results, error_code=error_code + ) + + if error_code is None: + result = wrapper.list_findings(detector_id, max_results) + assert result == finding_ids + else: + with pytest.raises(ClientError) as exc_info: + wrapper.list_findings(detector_id, max_results) + assert exc_info.value.response["Error"]["Code"] == error_code + + +@pytest.mark.parametrize( + "error_code", [None, "BadRequestException", "InternalServerErrorException"] +) +def test_get_findings(make_stubber, error_code): + guardduty_client = boto3.client("guardduty", region_name="us-east-1") + guardduty_stubber = make_stubber(guardduty_client) + wrapper = GuardDutyWrapper(guardduty_client) + + detector_id = "test-detector-id" + finding_ids = ["finding-1", "finding-2"] + findings = [ + { + "Id": "finding-1", + "AccountId": "123456789012", + "Arn": "arn:aws:guardduty:us-east-1:123456789012:detector/test-detector-id/finding/finding-1", + "Type": "Backdoor:EC2/C&CActivity.B!DNS", + "Severity": 8.0, + "Title": "Sample finding 1", + "Description": "Test finding description", + "CreatedAt": "2023-01-01T00:00:00.000Z", + "UpdatedAt": "2023-01-01T00:00:00.000Z", + "Region": "us-east-1", + "SchemaVersion": "2.0", + "Resource": { + "ResourceType": "Instance", + "InstanceDetails": {"InstanceId": "i-1234567890abcdef0"}, + }, + }, + { + "Id": "finding-2", + "AccountId": "123456789012", + "Arn": "arn:aws:guardduty:us-east-1:123456789012:detector/test-detector-id/finding/finding-2", + "Type": "Trojan:EC2/BlackholeTraffic", + "Severity": 5.0, + "Title": "Sample finding 2", + "Description": "Another test finding", + "CreatedAt": "2023-01-01T00:00:00.000Z", + "UpdatedAt": "2023-01-01T00:00:00.000Z", + "Region": "us-east-1", + "SchemaVersion": "2.0", + "Resource": { + "ResourceType": "Instance", + "InstanceDetails": {"InstanceId": "i-0987654321fedcba0"}, + }, + }, + ] + + guardduty_stubber.stub_get_findings( + detector_id, finding_ids, findings, error_code=error_code + ) + + if error_code is None: + result = wrapper.get_findings(detector_id, finding_ids) + assert result == findings + assert len(result) == 2 + assert result[0]["Id"] == "finding-1" + assert result[1]["Id"] == "finding-2" + else: + with pytest.raises(ClientError) as exc_info: + wrapper.get_findings(detector_id, finding_ids) + assert exc_info.value.response["Error"]["Code"] == error_code + + +@pytest.mark.parametrize( + "error_code", [None, "BadRequestException", "InternalServerErrorException"] +) +def test_delete_detector(make_stubber, error_code): + guardduty_client = boto3.client("guardduty", region_name="us-east-1") + guardduty_stubber = make_stubber(guardduty_client) + wrapper = GuardDutyWrapper(guardduty_client) + + detector_id = "test-detector-id" + + guardduty_stubber.stub_delete_detector(detector_id, error_code=error_code) + + if error_code is None: + wrapper.delete_detector(detector_id) + else: + with pytest.raises(ClientError) as exc_info: + wrapper.delete_detector(detector_id) + assert exc_info.value.response["Error"]["Code"] == error_code diff --git a/python/test_tools/guardduty_stubber.py b/python/test_tools/guardduty_stubber.py new file mode 100644 index 00000000000..faaeb206f0e --- /dev/null +++ b/python/test_tools/guardduty_stubber.py @@ -0,0 +1,176 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +Stub functions that are used by the Amazon GuardDuty unit tests. + +When tests are run against an actual AWS account, the stubber class does not +set up stubs and passes all calls through to the Boto 3 client. +""" + +from botocore.stub import ANY +from boto3 import client + +from test_tools.example_stubber import ExampleStubber + + +class GuardDutyStubber(ExampleStubber): + """ + A class that implements stub functions used by Amazon GuardDuty unit tests. + + The stubbed functions expect certain parameters to be passed to them as + part of the tests, and will raise errors when the actual parameters differ from + the expected. + """ + + def __init__(self, guardduty_client: client, use_stubs=True) -> None: + """ + Initializes the object with a specific client and configures it for + stubbing or AWS passthrough. + + :param guardduty_client: A Boto 3 Amazon GuardDuty client. + :param use_stubs: When True, use stubs to intercept requests. Otherwise, + pass requests through to AWS. + """ + super().__init__(guardduty_client, use_stubs) + + def stub_create_detector(self, detector_id: str, enable: bool = True, + finding_publishing_frequency: str = "FIFTEEN_MINUTES", + error_code: str = None) -> None: + """ + Stub the create_detector function. + + :param detector_id: The detector ID to return. + :param enable: Whether the detector is enabled. + :param finding_publishing_frequency: How often to publish findings. + :param error_code: Simulated error code to raise. + """ + expected_params = { + "Enable": enable, + "FindingPublishingFrequency": finding_publishing_frequency + } + response = { + "DetectorId": detector_id + } + self._stub_bifurcator( + "create_detector", expected_params, response, error_code=error_code + ) + + def stub_list_detectors(self, detector_ids: list, error_code: str = None) -> None: + """ + Stub the list_detectors function. + + :param detector_ids: List of detector IDs to return. + :param error_code: Simulated error code to raise. + """ + expected_params = {} + response = { + "DetectorIds": detector_ids + } + self._stub_bifurcator( + "list_detectors", expected_params, response, error_code=error_code + ) + + def stub_get_detector(self, detector_id: str, status: str = "ENABLED", + service_role: str = "arn:aws:iam::123456789012:role/aws-guardduty-service-role", + finding_publishing_frequency: str = "FIFTEEN_MINUTES", + error_code: str = None) -> None: + """ + Stub the get_detector function. + + :param detector_id: The detector ID. + :param status: The detector status. + :param service_role: The service role ARN. + :param finding_publishing_frequency: How often to publish findings. + :param error_code: Simulated error code to raise. + """ + expected_params = { + "DetectorId": detector_id + } + response = { + "Status": status, + "ServiceRole": service_role, + "FindingPublishingFrequency": finding_publishing_frequency, + "CreatedAt": "2023-01-01T00:00:00.000Z", + "UpdatedAt": "2023-01-01T00:00:00.000Z" + } + self._stub_bifurcator( + "get_detector", expected_params, response, error_code=error_code + ) + + def stub_create_sample_findings(self, detector_id: str, finding_types: list = None, + error_code: str = None) -> None: + """ + Stub the create_sample_findings function. + + :param detector_id: The detector ID. + :param finding_types: Optional list of finding types. + :param error_code: Simulated error code to raise. + """ + expected_params = { + "DetectorId": detector_id + } + if finding_types: + expected_params["FindingTypes"] = finding_types + + response = {} + self._stub_bifurcator( + "create_sample_findings", expected_params, response, error_code=error_code + ) + + def stub_list_findings(self, detector_id: str, finding_ids: list, + max_results: int = 50, error_code: str = None) -> None: + """ + Stub the list_findings function. + + :param detector_id: The detector ID. + :param finding_ids: List of finding IDs to return. + :param max_results: Maximum number of results. + :param error_code: Simulated error code to raise. + """ + expected_params = { + "DetectorId": detector_id, + "MaxResults": max_results + } + response = { + "FindingIds": finding_ids + } + self._stub_bifurcator( + "list_findings", expected_params, response, error_code=error_code + ) + + def stub_get_findings(self, detector_id: str, finding_ids: list, + findings: list, error_code: str = None) -> None: + """ + Stub the get_findings function. + + :param detector_id: The detector ID. + :param finding_ids: List of finding IDs to retrieve. + :param findings: List of finding details to return. + :param error_code: Simulated error code to raise. + """ + expected_params = { + "DetectorId": detector_id, + "FindingIds": finding_ids + } + response = { + "Findings": findings + } + self._stub_bifurcator( + "get_findings", expected_params, response, error_code=error_code + ) + + def stub_delete_detector(self, detector_id: str, error_code: str = None) -> None: + """ + Stub the delete_detector function. + + :param detector_id: The detector ID to delete. + :param error_code: Simulated error code to raise. + """ + expected_params = { + "DetectorId": detector_id + } + response = {} + self._stub_bifurcator( + "delete_detector", expected_params, response, error_code=error_code + ) \ No newline at end of file diff --git a/python/test_tools/stubber_factory.py b/python/test_tools/stubber_factory.py index d43bd83e24a..e6772dd2879 100644 --- a/python/test_tools/stubber_factory.py +++ b/python/test_tools/stubber_factory.py @@ -35,6 +35,7 @@ from test_tools.eventbridge_stubber import EventBridgeStubber from test_tools.glacier_stubber import GlacierStubber from test_tools.glue_stubber import GlueStubber +from test_tools.guardduty_stubber import GuardDutyStubber from test_tools.iam_stubber import IamStubber from test_tools.iot_sitewise_stubber import IoTSitewiseStubber from test_tools.healthlake_stubber import HealthLakeStubber