diff --git a/.doc_gen/metadata/inspector_metadata.yaml b/.doc_gen/metadata/inspector_metadata.yaml new file mode 100644 index 00000000000..7d4c8c00c45 --- /dev/null +++ b/.doc_gen/metadata/inspector_metadata.yaml @@ -0,0 +1,146 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +# snippet-start:[inspector.yaml] +# Inspector code examples for the AWS SDK for Python (Boto3). +inspector_Hello: + title: Hello &Inspector; + title_abbrev: Hello &Inspector; + synopsis: get started using &Inspector;. + category: Hello + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/inspector + excerpts: + - description: + snippet_tags: + - python.example_code.inspector.Hello + services: + inspector2: {BatchGetAccountStatus} +inspector_Enable: + title: Enable &Inspector; scanning + title_abbrev: Enable scanning + synopsis: enable &Inspector; scanning for your account. + category: Basics + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/inspector + excerpts: + - description: + snippet_tags: + - python.example_code.inspector.InspectorWrapper.class + - python.example_code.inspector.InspectorWrapper.decl + - python.example_code.inspector.Enable + services: + inspector2: {Enable} +inspector_BatchGetAccountStatus: + title: Get &Inspector; account status + title_abbrev: Get account status + synopsis: get the status of &Inspector; for your account. + category: Basics + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/inspector + excerpts: + - description: + snippet_tags: + - python.example_code.inspector.InspectorWrapper.class + - python.example_code.inspector.InspectorWrapper.decl + - python.example_code.inspector.BatchGetAccountStatus + services: + inspector2: {BatchGetAccountStatus} +inspector_ListFindings: + title: List &Inspector; findings + title_abbrev: List findings + synopsis: list security findings from &Inspector;. + category: Basics + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/inspector + excerpts: + - description: + snippet_tags: + - python.example_code.inspector.InspectorWrapper.class + - python.example_code.inspector.InspectorWrapper.decl + - python.example_code.inspector.ListFindings + services: + inspector2: {ListFindings} +inspector_BatchGetFindingDetails: + title: Get &Inspector; finding details + title_abbrev: Get finding details + synopsis: get detailed information for specific &Inspector; findings. + category: Basics + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/inspector + excerpts: + - description: + snippet_tags: + - python.example_code.inspector.InspectorWrapper.class + - python.example_code.inspector.InspectorWrapper.decl + - python.example_code.inspector.BatchGetFindingDetails + services: + inspector2: {BatchGetFindingDetails} +inspector_ListCoverage: + title: List &Inspector; coverage + title_abbrev: List coverage + synopsis: list coverage statistics for resources scanned by &Inspector;. + category: Basics + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/inspector + excerpts: + - description: + snippet_tags: + - python.example_code.inspector.InspectorWrapper.class + - python.example_code.inspector.InspectorWrapper.decl + - python.example_code.inspector.ListCoverage + services: + inspector2: {ListCoverage} +inspector_Disable: + title: Disable &Inspector; scanning + title_abbrev: Disable scanning + synopsis: disable &Inspector; scanning for your account. + category: Basics + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/inspector + excerpts: + - description: + snippet_tags: + - python.example_code.inspector.InspectorWrapper.class + - python.example_code.inspector.InspectorWrapper.decl + - python.example_code.inspector.Disable + services: + inspector2: {Disable} +inspector_Scenario: + title: Learn the basics of &Inspector; + title_abbrev: Learn the basics + synopsis: learn the basics of &Inspector;. + category: Scenarios + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/inspector + excerpts: + - description: Run an interactive scenario demonstrating &Inspector; features. + snippet_tags: + - python.example_code.inspector.InspectorScenario + services: + inspector2: {Enable, BatchGetAccountStatus, ListFindings, BatchGetFindingDetails, ListCoverage, Disable} +# snippet-end:[inspector.yaml] \ No newline at end of file diff --git a/python/example_code/inspector/README.md b/python/example_code/inspector/README.md new file mode 100644 index 00000000000..e2456042bb3 --- /dev/null +++ b/python/example_code/inspector/README.md @@ -0,0 +1,125 @@ +# Amazon Inspector code examples for the SDK for Python + +## Overview + +Shows how to use the AWS SDK for Python (Boto3) to work with Amazon Inspector. + + + + +_Amazon Inspector is a vulnerability management service that continuously scans AWS workloads for software vulnerabilities and unintended network exposure._ + +## ⚠ Important + +* Running this code might result in charges to your AWS account. For more details, see [AWS Pricing](https://aws.amazon.com/pricing/) and [Free Tier](https://aws.amazon.com/free/). +* Running the tests might result in charges to your AWS account. +* We recommend that you grant your code least privilege. At most, grant only the minimum permissions required to perform the task. For more information, see [Grant least privilege](https://docs.aws.amazon.com/IAM/latest/UserGuide/best-practices.html#grant-least-privilege). +* This code is not tested in every AWS Region. For more information, see [AWS Regional Services](https://aws.amazon.com/about-aws/global-infrastructure/regional-product-services). + + + + +## Code examples + +### Prerequisites + +For prerequisites, see the [README](../../README.md#Prerequisites) in the `python` folder. + +Install the packages required by these examples by running the following in a virtual environment: + +``` +python -m pip install -r requirements.txt +``` + + + + +### Get started + +- [Hello Amazon Inspector](inspector_hello.py#L15) (`BatchGetAccountStatus`) + + +### Single actions + +Code excerpts that show you how to call individual service functions. + +- [BatchGetAccountStatus](inspector_wrapper.py#L35) +- [BatchGetFindingDetails](inspector_wrapper.py#L125) +- [Disable](inspector_wrapper.py#L175) +- [Enable](inspector_wrapper.py#L25) +- [ListCoverage](inspector_wrapper.py#L145) +- [ListFindings](inspector_wrapper.py#L75) + +### Scenarios + +Code examples that show you how to accomplish a specific task by calling multiple +functions within the same service. + +- [Learn the basics of Amazon Inspector](scenario_inspector_basics.py) + + + + + +## Run the examples + +### Instructions + + + + + +#### Hello Amazon Inspector + +This example shows you how to get started using Amazon Inspector. + +``` +python inspector_hello.py +``` + + +#### Learn the basics of Amazon Inspector + +This example shows you how to learn the basics of Amazon Inspector. + + + + + +Start the example by running the following at a command prompt: + +``` +python scenario_inspector_basics.py +``` + + + + + +### Tests + +⚠ Running tests might result in charges to your AWS account. + + +To find instructions for running these tests, see the [README](../../README.md#Tests) +in the `python` folder. + + + + + + +## Additional resources + +- [Amazon Inspector User Guide](https://docs.aws.amazon.com/inspector/latest/user/) +- [Amazon Inspector API Reference](https://docs.aws.amazon.com/inspector/v2/APIReference/) +- [SDK for Python Amazon Inspector reference](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/inspector2.html) + + + + +--- + +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 \ No newline at end of file diff --git a/python/example_code/inspector/inspector_hello.py b/python/example_code/inspector/inspector_hello.py new file mode 100644 index 00000000000..b092dece5e7 --- /dev/null +++ b/python/example_code/inspector/inspector_hello.py @@ -0,0 +1,71 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +Purpose + +Shows how to get started with Amazon Inspector by checking the current account status. +""" + +import logging +import boto3 +from botocore.exceptions import ClientError + +from inspector_wrapper import InspectorWrapper + +# snippet-start:[python.example_code.inspector.Hello] +logger = logging.getLogger(__name__) + + +def hello_inspector(inspector_wrapper: InspectorWrapper): + """ + Use the AWS SDK for Python (Boto3) to check the current account status for Amazon Inspector. + This function is intended to get you started with Amazon Inspector. + + :param inspector_wrapper: An InspectorWrapper object that wraps Inspector actions. + """ + print("Hello, Amazon Inspector! Let's check your account status.") + try: + # Get the current account status + response = inspector_wrapper.get_account_status() + + if "accounts" in response and response["accounts"]: + account = response["accounts"][0] + account_id = account.get("accountId", "Unknown") + + print(f"\nAccount ID: {account_id}") + + # Display overall status + if "state" in account: + status = account["state"].get("status", "Unknown") + print(f"Inspector Status: {status}") + + # Display resource-specific status + if "resourceState" in account: + resource_state = account["resourceState"] + print("\nResource Scanning Status:") + + for resource_type, state in resource_state.items(): + resource_status = state.get("status", "Unknown") + print(f" {resource_type.upper()}: {resource_status}") + + print( + "\nAmazon Inspector is ready to help you identify security vulnerabilities!" + ) + + else: + print("No account information available.") + + except ClientError as e: + error_code = e.response["Error"]["Code"] + if error_code == "AccessDeniedException": + print( + "Access denied. Please ensure you have the necessary permissions to use Amazon Inspector." + ) + else: + print(f"Error checking Inspector status: {e}") + + +if __name__ == "__main__": + hello_inspector(InspectorWrapper.from_client()) +# snippet-end:[python.example_code.inspector.Hello] diff --git a/python/example_code/inspector/inspector_wrapper.py b/python/example_code/inspector/inspector_wrapper.py new file mode 100644 index 00000000000..1a5efb16f32 --- /dev/null +++ b/python/example_code/inspector/inspector_wrapper.py @@ -0,0 +1,251 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +Purpose + +Shows how to use the AWS SDK for Python (Boto3) with Amazon Inspector +to manage vulnerability assessments and findings. +""" + +import logging +import boto3 +from botocore.exceptions import ClientError +from typing import Dict, List, Optional, Any + +logger = logging.getLogger(__name__) + + +# snippet-start:[python.example_code.inspector.InspectorWrapper.class] +# snippet-start:[python.example_code.inspector.InspectorWrapper.decl] +class InspectorWrapper: + """Encapsulates Amazon Inspector functionality.""" + + def __init__(self, inspector_client: boto3.client): + """ + :param inspector_client: A Boto3 Amazon Inspector client. + """ + self.inspector_client = inspector_client + + @classmethod + def from_client(cls): + inspector_client = boto3.client("inspector2") + return cls(inspector_client) + + # snippet-end:[python.example_code.inspector.InspectorWrapper.decl] + + # snippet-start:[python.example_code.inspector.Enable] + def enable_inspector( + self, + account_ids: Optional[List[str]] = None, + resource_types: Optional[List[str]] = None, + ) -> Dict[str, Any]: + """ + Enable Amazon Inspector for the specified accounts and resource types. + + :param account_ids: List of account IDs to enable Inspector for. If None, enables for current account. + :param resource_types: List of resource types to enable scanning for (EC2, ECR, LAMBDA). + :return: Response from the Enable operation. + """ + try: + params = {} + if account_ids is not None: + params["accountIds"] = account_ids + if resource_types is not None: + params["resourceTypes"] = resource_types + + response = self.inspector_client.enable(**params) + logger.info("Successfully enabled Inspector") + return response + except ClientError as e: + error_code = e.response["Error"]["Code"] + if error_code == "ValidationException": + logger.error("Validation error when enabling Inspector: %s", e) + elif error_code == "AccessDeniedException": + logger.error("Access denied when enabling Inspector: %s", e) + else: + logger.error("Error enabling Inspector: %s", e) + raise + + # snippet-end:[python.example_code.inspector.Enable] + + # snippet-start:[python.example_code.inspector.BatchGetAccountStatus] + def get_account_status( + self, account_ids: Optional[List[str]] = None + ) -> Dict[str, Any]: + """ + Get the status of Amazon Inspector for the specified accounts. + + :param account_ids: List of account IDs to get status for. If None, gets status for current account. + :return: Response containing account status information. + """ + try: + params = {} + if account_ids is not None: + params["accountIds"] = account_ids + + response = self.inspector_client.batch_get_account_status(**params) + logger.info("Successfully retrieved account status") + return response + except ClientError as e: + error_code = e.response["Error"]["Code"] + if error_code == "ValidationException": + logger.error("Validation error when getting account status: %s", e) + elif error_code == "AccessDeniedException": + logger.error("Access denied when getting account status: %s", e) + else: + logger.error("Error getting account status: %s", e) + raise + + # snippet-end:[python.example_code.inspector.BatchGetAccountStatus] + + # snippet-start:[python.example_code.inspector.ListFindings] + def list_findings( + self, + filter_criteria: Optional[Dict[str, Any]] = None, + max_results: Optional[int] = None, + next_token: Optional[str] = None, + sort_criteria: Optional[Dict[str, Any]] = None, + ) -> Dict[str, Any]: + """ + List security findings from Amazon Inspector. + + :param filter_criteria: Criteria to filter findings. + :param max_results: Maximum number of results to return. + :param next_token: Token for pagination. + :param sort_criteria: Criteria to sort findings. + :return: Response containing findings. + """ + try: + params = {} + if filter_criteria is not None: + params["filterCriteria"] = filter_criteria + if max_results is not None: + params["maxResults"] = max_results + if next_token is not None: + params["nextToken"] = next_token + if sort_criteria is not None: + params["sortCriteria"] = sort_criteria + + response = self.inspector_client.list_findings(**params) + logger.info( + "Successfully listed %d findings", len(response.get("findings", [])) + ) + return response + except ClientError as e: + error_code = e.response["Error"]["Code"] + if error_code == "ValidationException": + logger.error("Validation error when listing findings: %s", e) + elif error_code == "InternalServerException": + logger.error("Internal server error when listing findings: %s", e) + else: + logger.error("Error listing findings: %s", e) + raise + + # snippet-end:[python.example_code.inspector.ListFindings] + + # snippet-start:[python.example_code.inspector.BatchGetFindingDetails] + def get_finding_details(self, finding_arns: List[str]) -> Dict[str, Any]: + """ + Get detailed information for specific findings. + + :param finding_arns: List of finding ARNs to get details for. + :return: Response containing finding details. + """ + try: + response = self.inspector_client.batch_get_finding_details( + findingArns=finding_arns + ) + logger.info( + "Successfully retrieved details for %d findings", len(finding_arns) + ) + return response + except ClientError as e: + error_code = e.response["Error"]["Code"] + if error_code == "ValidationException": + logger.error("Validation error when getting finding details: %s", e) + elif error_code == "AccessDeniedException": + logger.error("Access denied when getting finding details: %s", e) + else: + logger.error("Error getting finding details: %s", e) + raise + + # snippet-end:[python.example_code.inspector.BatchGetFindingDetails] + + # snippet-start:[python.example_code.inspector.ListCoverage] + def list_coverage( + self, + filter_criteria: Optional[Dict[str, Any]] = None, + max_results: Optional[int] = None, + next_token: Optional[str] = None, + ) -> Dict[str, Any]: + """ + List coverage statistics for resources scanned by Amazon Inspector. + + :param filter_criteria: Criteria to filter coverage results. + :param max_results: Maximum number of results to return. + :param next_token: Token for pagination. + :return: Response containing coverage information. + """ + try: + params = {} + if filter_criteria is not None: + params["filterCriteria"] = filter_criteria + if max_results is not None: + params["maxResults"] = max_results + if next_token is not None: + params["nextToken"] = next_token + + response = self.inspector_client.list_coverage(**params) + logger.info( + "Successfully listed coverage for %d resources", + len(response.get("coveredResources", [])), + ) + return response + except ClientError as e: + error_code = e.response["Error"]["Code"] + if error_code == "ValidationException": + logger.error("Validation error when listing coverage: %s", e) + else: + logger.error("Error listing coverage: %s", e) + raise + + # snippet-end:[python.example_code.inspector.ListCoverage] + + # snippet-start:[python.example_code.inspector.Disable] + def disable_inspector( + self, + account_ids: Optional[List[str]] = None, + resource_types: Optional[List[str]] = None, + ) -> Dict[str, Any]: + """ + Disable Amazon Inspector for the specified accounts and resource types. + + :param account_ids: List of account IDs to disable Inspector for. If None, disables for current account. + :param resource_types: List of resource types to disable scanning for (EC2, ECR, LAMBDA). + :return: Response from the Disable operation. + """ + try: + params = {} + if account_ids is not None: + params["accountIds"] = account_ids + if resource_types is not None: + params["resourceTypes"] = resource_types + + response = self.inspector_client.disable(**params) + logger.info("Successfully disabled Inspector") + return response + except ClientError as e: + error_code = e.response["Error"]["Code"] + if error_code == "ValidationException": + logger.error("Validation error when disabling Inspector: %s", e) + elif error_code == "ConflictException": + logger.error("Conflict error when disabling Inspector: %s", e) + else: + logger.error("Error disabling Inspector: %s", e) + raise + + # snippet-end:[python.example_code.inspector.Disable] + + +# snippet-end:[python.example_code.inspector.InspectorWrapper.class] diff --git a/python/example_code/inspector/requirements.txt b/python/example_code/inspector/requirements.txt new file mode 100644 index 00000000000..9137f2a9ed8 --- /dev/null +++ b/python/example_code/inspector/requirements.txt @@ -0,0 +1,2 @@ +boto3>=1.26.137 +botocore>=1.29.137 \ No newline at end of file diff --git a/python/example_code/inspector/scenario_inspector_basics.py b/python/example_code/inspector/scenario_inspector_basics.py new file mode 100644 index 00000000000..8387b43f364 --- /dev/null +++ b/python/example_code/inspector/scenario_inspector_basics.py @@ -0,0 +1,497 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +Purpose + +Shows how to use Amazon Inspector to manage vulnerability assessments and findings. +This scenario demonstrates: +1. Enabling Amazon Inspector for your account +2. Checking account status and enabled scan types +3. Listing coverage statistics for resources +4. Managing security findings and vulnerability analysis +5. Optionally disabling Inspector scanning + +Amazon Inspector is a vulnerability management service that continuously scans +AWS workloads for software vulnerabilities and unintended network exposure. +""" + +import logging +import os +import sys +from typing import Optional + +import boto3 +from botocore.exceptions import ClientError + +from inspector_wrapper import InspectorWrapper + +# Add relative path to include demo_tools +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../..")) +from demo_tools import question as q + +logger = logging.getLogger(__name__) + + +# snippet-start:[python.example_code.inspector.InspectorScenario] +class InspectorScenario: + """Runs an interactive scenario that shows how to use Amazon Inspector.""" + + def __init__(self, inspector_wrapper: InspectorWrapper): + """ + :param inspector_wrapper: An instance of the InspectorWrapper class. + """ + self.inspector_wrapper = inspector_wrapper + + def run_scenario(self): + """Runs the Amazon Inspector basics scenario.""" + print("-" * 88) + print("Welcome to the Amazon Inspector basics scenario!") + print("-" * 88) + + print( + "Amazon Inspector is a vulnerability management service that continuously scans\n" + "AWS workloads for software vulnerabilities and unintended network exposure.\n" + "This scenario will walk you through the basic operations of Amazon Inspector.\n" + ) + + try: + self._setup_phase() + self._coverage_assessment_phase() + self._findings_management_phase() + self._vulnerability_analysis_phase() + except Exception as e: + logger.error(f"Scenario failed: {e}") + finally: + self._cleanup_phase() + + def _setup_phase(self): + """Setup phase: Enable Inspector and verify activation.""" + print("-" * 88) + print("Setup Phase: Enabling Amazon Inspector") + print("-" * 88) + + # Check current account status first + print("Checking current Amazon Inspector status...") + try: + status_response = self.inspector_wrapper.get_account_status() + if status_response.get("accounts"): + account = status_response["accounts"][0] + current_status = account.get("state", {}).get("status", "UNKNOWN") + print(f"Current Inspector status: {current_status}") + + if current_status == "ENABLED": + print("Amazon Inspector is already enabled for your account.") + self._display_account_status(account) + return + except ClientError as e: + print(f"Could not check current status: {e}") + + # Enable Inspector if not already enabled + enable_inspector = q.ask( + "Would you like to enable Amazon Inspector for your account? (y/n): ", + q.is_yesno, + ) + + if enable_inspector: + print("Enabling Amazon Inspector...") + try: + # Enable for all resource types by default + resource_types = ["EC2", "ECR", "LAMBDA"] + response = self.inspector_wrapper.enable_inspector( + resource_types=resource_types + ) + + print("Amazon Inspector has been enabled successfully!") + + # Display the results + if "accounts" in response: + for account in response["accounts"]: + account_id = account.get("accountId", "Unknown") + status = account.get("status", "Unknown") + print(f"Account {account_id}: {status}") + + if "resourceStatus" in account: + print("Resource scanning status:") + for resource_type, resource_status in account[ + "resourceStatus" + ].items(): + print(f" {resource_type}: {resource_status}") + + except ClientError as e: + error_code = e.response["Error"]["Code"] + if error_code == "ValidationException": + print( + "Validation error: Please check your account permissions and resource types." + ) + elif error_code == "AccessDeniedException": + print( + "Access denied: You don't have permission to enable Amazon Inspector." + ) + else: + print(f"Error enabling Inspector: {e}") + raise + else: + print( + "Skipping Inspector enablement. Some operations may not work without enabling Inspector." + ) + + # Verify Inspector is activated + print("\nVerifying Amazon Inspector activation...") + try: + status_response = self.inspector_wrapper.get_account_status() + if status_response.get("accounts"): + account = status_response["accounts"][0] + self._display_account_status(account) + except ClientError as e: + print(f"Error verifying Inspector status: {e}") + + def _coverage_assessment_phase(self): + """Coverage assessment phase: List coverage statistics.""" + print("-" * 88) + print("Coverage Assessment Phase") + print("-" * 88) + + print("Listing coverage statistics for your AWS resources...") + + try: + # List coverage for all resource types + coverage_response = self.inspector_wrapper.list_coverage(max_results=10) + + covered_resources = coverage_response.get("coveredResources", []) + if covered_resources: + print(f"Found {len(covered_resources)} covered resources:") + + # Group resources by type + resource_types = {} + for resource in covered_resources: + resource_type = resource.get("resourceType", "Unknown") + if resource_type not in resource_types: + resource_types[resource_type] = [] + resource_types[resource_type].append(resource) + + # Display coverage by resource type + for resource_type, resources in resource_types.items(): + print(f"\n{resource_type} Resources ({len(resources)}):") + for resource in resources[ + :3 + ]: # Show first 3 resources of each type + resource_id = resource.get("resourceId", "Unknown") + scan_status = resource.get("scanStatus", {}).get( + "statusCode", "Unknown" + ) + last_scanned = resource.get("lastScannedAt", "Never") + print(f" Resource: {resource_id}") + print(f" Status: {scan_status}") + print(f" Last Scanned: {last_scanned}") + + if len(resources) > 3: + print(f" ... and {len(resources) - 3} more resources") + else: + print("No covered resources found. This might be because:") + print( + "- Inspector was recently enabled and hasn't scanned resources yet" + ) + print( + "- No supported resources (EC2, ECR, Lambda) exist in your account" + ) + print("- Resources are still being discovered") + + except ClientError as e: + print(f"Error listing coverage: {e}") + + def _findings_management_phase(self): + """Findings management phase: List and filter security findings.""" + print("-" * 88) + print("Findings Management Phase") + print("-" * 88) + + print("Listing security findings from Amazon Inspector...") + + try: + # List findings with basic filtering + findings_response = self.inspector_wrapper.list_findings(max_results=10) + + findings = findings_response.get("findings", []) + if findings: + print(f"Found {len(findings)} security findings:") + + # Group findings by severity + severity_groups = {} + for finding in findings: + severity = finding.get("severity", "UNKNOWN") + if severity not in severity_groups: + severity_groups[severity] = [] + severity_groups[severity].append(finding) + + # Display findings by severity + severity_order = [ + "CRITICAL", + "HIGH", + "MEDIUM", + "LOW", + "INFORMATIONAL", + "UNKNOWN", + ] + for severity in severity_order: + if severity in severity_groups: + findings_list = severity_groups[severity] + print(f"\n{severity} Severity Findings ({len(findings_list)}):") + + for finding in findings_list[ + :2 + ]: # Show first 2 findings per severity + title = finding.get("title", "No title") + finding_type = finding.get("type", "Unknown") + inspector_score = finding.get("inspectorScore", "N/A") + print(f" • {title}") + print(f" Type: {finding_type}") + print(f" Inspector Score: {inspector_score}") + + # Show affected resources + resources = finding.get("resources", []) + if resources: + resource = resources[0] # Show first resource + resource_id = resource.get("id", "Unknown") + resource_type = resource.get("type", "Unknown") + print( + f" Affected Resource: {resource_id} ({resource_type})" + ) + + if len(findings_list) > 2: + print( + f" ... and {len(findings_list) - 2} more {severity.lower()} findings" + ) + + # Ask if user wants to see detailed information for a finding + show_details = q.ask( + "Would you like to see detailed information for a finding? (y/n): ", + q.is_yesno, + ) + + if show_details and findings: + self._show_finding_details( + findings[0] + ) # Show details for first finding + + else: + print("No security findings found. This might be because:") + print( + "- Inspector was recently enabled and hasn't completed scanning yet" + ) + print("- No vulnerabilities were detected in your resources") + print("- Resources are still being scanned") + + except ClientError as e: + print(f"Error listing findings: {e}") + + def _vulnerability_analysis_phase(self): + """Vulnerability analysis phase: Display vulnerability details and remediation guidance.""" + print("-" * 88) + print("Vulnerability Analysis Phase") + print("-" * 88) + + print("Analyzing vulnerabilities by resource type...") + + # Filter findings by resource type + resource_types = [ + "AWS_EC2_INSTANCE", + "AWS_ECR_CONTAINER_IMAGE", + "AWS_LAMBDA_FUNCTION", + ] + + for resource_type in resource_types: + print(f"\nAnalyzing {resource_type} vulnerabilities...") + + try: + # Create filter for specific resource type + filter_criteria = { + "resourceType": [{"comparison": "EQUALS", "value": resource_type}] + } + + findings_response = self.inspector_wrapper.list_findings( + filter_criteria=filter_criteria, max_results=5 + ) + + findings = findings_response.get("findings", []) + if findings: + print(f" Found {len(findings)} findings for {resource_type}") + + for finding in findings[:2]: # Show first 2 findings + title = finding.get("title", "No title") + severity = finding.get("severity", "Unknown") + print(f" • {title} ({severity})") + + # Show vulnerability details if available + vuln_details = finding.get("packageVulnerabilityDetails") + if vuln_details: + vuln_id = vuln_details.get("vulnerabilityId", "Unknown") + source = vuln_details.get("source", "Unknown") + print(f" Vulnerability ID: {vuln_id}") + print(f" Source: {source}") + + # Show vulnerable packages + vulnerable_packages = vuln_details.get( + "vulnerablePackages", [] + ) + if vulnerable_packages: + package = vulnerable_packages[0] + package_name = package.get("name", "Unknown") + current_version = package.get("version", "Unknown") + fixed_version = package.get( + "fixedInVersion", "Not available" + ) + print( + f" Package: {package_name} (v{current_version})" + ) + print(f" Fixed in: {fixed_version}") + else: + print(f" No findings for {resource_type}") + + except ClientError as e: + print(f" Error analyzing {resource_type}: {e}") + + def _show_finding_details(self, finding): + """Show detailed information for a specific finding.""" + finding_arn = finding.get("findingArn") + if not finding_arn: + print("Cannot show details: Finding ARN not available") + return + + print(f"\nDetailed information for finding:") + print(f"ARN: {finding_arn}") + + try: + details_response = self.inspector_wrapper.get_finding_details([finding_arn]) + finding_details = details_response.get("findingDetails", []) + + if finding_details: + details = finding_details[0] + + # Show CISA data if available + cisa_data = details.get("cisaData") + if cisa_data: + print("CISA Information:") + print(f" Action: {cisa_data.get('action', 'N/A')}") + print(f" Date Added: {cisa_data.get('dateAdded', 'N/A')}") + + # Show CWE information + cwes = details.get("cwes", []) + if cwes: + print(f"CWE IDs: {', '.join(cwes)}") + + # Show EPSS score + epss_score = details.get("epssScore") + if epss_score: + print(f"EPSS Score: {epss_score}") + + # Show reference URLs + reference_urls = details.get("referenceUrls", []) + if reference_urls: + print("Reference URLs:") + for url in reference_urls[:3]: # Show first 3 URLs + print(f" • {url}") + + except ClientError as e: + print(f"Error getting finding details: {e}") + + def _display_account_status(self, account): + """Display account status information.""" + account_id = account.get("accountId", "Unknown") + print(f"\nAccount ID: {account_id}") + + # Display overall status + if "state" in account: + status = account["state"].get("status", "Unknown") + print(f"Inspector Status: {status}") + + # Display resource-specific status + if "resourceState" in account: + resource_state = account["resourceState"] + print("Resource Scanning Status:") + + for resource_type, state in resource_state.items(): + resource_status = state.get("status", "Unknown") + print(f" {resource_type.upper()}: {resource_status}") + + def _cleanup_phase(self): + """Cleanup phase: Optionally disable Inspector scanning.""" + print("-" * 88) + print("Cleanup Phase") + print("-" * 88) + + disable_inspector = q.ask( + "Would you like to disable Amazon Inspector for your account? (y/n): ", + q.is_yesno, + ) + + if disable_inspector: + print( + "WARNING: Disabling Amazon Inspector will stop vulnerability scanning for your resources." + ) + confirm_disable = q.ask( + "Are you sure you want to disable Inspector? (y/n): ", q.is_yesno + ) + + if confirm_disable: + try: + # Disable all resource types + resource_types = ["EC2", "ECR", "LAMBDA"] + response = self.inspector_wrapper.disable_inspector( + resource_types=resource_types + ) + + print("Amazon Inspector has been disabled.") + + # Display the results + if "accounts" in response: + for account in response["accounts"]: + account_id = account.get("accountId", "Unknown") + status = account.get("status", "Unknown") + print(f"Account {account_id}: {status}") + + except ClientError as e: + error_code = e.response["Error"]["Code"] + if error_code == "ConflictException": + print( + "Cannot disable Inspector: There may be active scans or other conflicts." + ) + else: + print(f"Error disabling Inspector: {e}") + else: + print("Inspector remains enabled.") + else: + print("Inspector remains enabled.") + + # Show final status + print("\nFinal Amazon Inspector status:") + try: + status_response = self.inspector_wrapper.get_account_status() + if status_response.get("accounts"): + account = status_response["accounts"][0] + self._display_account_status(account) + except ClientError as e: + print(f"Error getting final status: {e}") + + print("-" * 88) + print("Thanks for using the Amazon Inspector basics scenario!") + print("-" * 88) + + +# snippet-end:[python.example_code.inspector.InspectorScenario] + + +def main(): + """Runs the Amazon Inspector basics scenario.""" + logging.basicConfig(level=logging.WARNING, format="%(levelname)s: %(message)s") + + try: + inspector_wrapper = InspectorWrapper.from_client() + scenario = InspectorScenario(inspector_wrapper) + scenario.run_scenario() + except Exception as e: + logger.error(f"Failed to run scenario: {e}") + + +if __name__ == "__main__": + main() diff --git a/python/example_code/inspector/test/conftest.py b/python/example_code/inspector/test/conftest.py new file mode 100644 index 00000000000..19cab7332bd --- /dev/null +++ b/python/example_code/inspector/test/conftest.py @@ -0,0 +1,28 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +import sys +import boto3 +import pytest + +sys.path.append("../..") +from test_tools.fixtures.common import * + + +class ScenarioData: + def __init__(self, inspector_client, inspector_stubber): + self.inspector_client = inspector_client + self.inspector_stubber = inspector_stubber + # Import here to avoid circular imports + from inspector_wrapper import InspectorWrapper + import scenario_inspector_basics + + self.wrapper = InspectorWrapper(inspector_client) + self.scenario = scenario_inspector_basics.InspectorScenario(self.wrapper) + + +@pytest.fixture +def scenario_data(make_stubber): + inspector_client = boto3.client("inspector2") + inspector_stubber = make_stubber(inspector_client) + return ScenarioData(inspector_client, inspector_stubber) diff --git a/python/example_code/inspector/test/test_inspector_integration.py b/python/example_code/inspector/test/test_inspector_integration.py new file mode 100644 index 00000000000..197958bd060 --- /dev/null +++ b/python/example_code/inspector/test/test_inspector_integration.py @@ -0,0 +1,100 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +Integration tests for inspector_wrapper.py functions. +""" + +import pytest +import sys +import boto3 +from botocore.exceptions import ClientError + +sys.path.append("..") +from inspector_wrapper import InspectorWrapper + + +@pytest.mark.integ +def test_get_account_status_integration(): + """Test getting account status against real AWS service.""" + inspector_wrapper = InspectorWrapper.from_client() + + try: + response = inspector_wrapper.get_account_status() + assert "accounts" in response + # The response should contain at least one account (the current account) + assert len(response["accounts"]) >= 1 + + account = response["accounts"][0] + assert "accountId" in account + assert "state" in account or "resourceState" in account + + except ClientError as e: + # Access denied is acceptable for integration tests + if e.response["Error"]["Code"] != "AccessDeniedException": + raise + + +@pytest.mark.integ +def test_list_findings_integration(): + """Test listing findings against real AWS service.""" + inspector_wrapper = InspectorWrapper.from_client() + + try: + response = inspector_wrapper.list_findings(max_results=5) + assert "findings" in response + + # If there are findings, validate their structure + for finding in response.get("findings", []): + assert "findingArn" in finding + assert "awsAccountId" in finding + assert "type" in finding + + except ClientError as e: + # Access denied or validation errors are acceptable for integration tests + if e.response["Error"]["Code"] not in [ + "AccessDeniedException", + "ValidationException", + ]: + raise + + +@pytest.mark.integ +def test_list_coverage_integration(): + """Test listing coverage against real AWS service.""" + inspector_wrapper = InspectorWrapper.from_client() + + try: + response = inspector_wrapper.list_coverage(max_results=5) + assert "coveredResources" in response + + # If there are covered resources, validate their structure + for resource in response.get("coveredResources", []): + assert "accountId" in resource + assert "resourceId" in resource + assert "resourceType" in resource + + except ClientError as e: + # Access denied or validation errors are acceptable for integration tests + if e.response["Error"]["Code"] not in [ + "AccessDeniedException", + "ValidationException", + ]: + raise + + +@pytest.mark.integ +def test_hello_inspector_integration(): + """Test the hello Inspector function against real AWS service.""" + from inspector_hello import hello_inspector + + inspector_wrapper = InspectorWrapper.from_client() + + # This should not raise an exception, even if Inspector is not enabled + # The function should handle errors gracefully + try: + hello_inspector(inspector_wrapper) + except Exception as e: + # Only fail if it's an unexpected error + if "AccessDeniedException" not in str(e): + raise diff --git a/python/example_code/inspector/test/test_inspector_wrapper.py b/python/example_code/inspector/test/test_inspector_wrapper.py new file mode 100644 index 00000000000..e3b2420b579 --- /dev/null +++ b/python/example_code/inspector/test/test_inspector_wrapper.py @@ -0,0 +1,184 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +Unit tests for inspector_wrapper.py functions. +""" + +import pytest +import sys +from botocore.exceptions import ClientError + +sys.path.append("..") +from inspector_wrapper import InspectorWrapper + + +@pytest.mark.parametrize( + "error_code", [None, "ValidationException", "AccessDeniedException"] +) +def test_enable_inspector(scenario_data, error_code): + """Test enabling Inspector with various parameters and error conditions.""" + account_ids = ["123456789012"] if error_code != "ValidationException" else None + resource_types = ["EC2", "ECR", "LAMBDA"] + + scenario_data.inspector_stubber.stub_enable( + account_ids=account_ids, resource_types=resource_types, error_code=error_code + ) + + if error_code is None: + response = scenario_data.wrapper.enable_inspector( + account_ids=account_ids, resource_types=resource_types + ) + assert "accounts" in response + assert len(response["accounts"]) > 0 + assert response["accounts"][0]["status"] == "ENABLED" + else: + with pytest.raises(ClientError) as exc_info: + scenario_data.wrapper.enable_inspector( + account_ids=account_ids, resource_types=resource_types + ) + assert exc_info.value.response["Error"]["Code"] == error_code + + +@pytest.mark.parametrize( + "error_code", [None, "ValidationException", "AccessDeniedException"] +) +def test_get_account_status(scenario_data, error_code): + """Test getting account status with various parameters and error conditions.""" + account_ids = ["123456789012"] if error_code != "ValidationException" else None + + scenario_data.inspector_stubber.stub_batch_get_account_status( + account_ids=account_ids, error_code=error_code + ) + + if error_code is None: + response = scenario_data.wrapper.get_account_status(account_ids=account_ids) + assert "accounts" in response + assert len(response["accounts"]) > 0 + assert "state" in response["accounts"][0] + else: + with pytest.raises(ClientError) as exc_info: + scenario_data.wrapper.get_account_status(account_ids=account_ids) + assert exc_info.value.response["Error"]["Code"] == error_code + + +@pytest.mark.parametrize( + "error_code", [None, "ValidationException", "InternalServerException"] +) +def test_list_findings(scenario_data, error_code): + """Test listing findings with various parameters and error conditions.""" + filter_criteria = ( + {"severity": [{"comparison": "EQUALS", "value": "CRITICAL"}]} + if error_code != "ValidationException" + else None + ) + max_results = 10 + sort_criteria = {"field": "SEVERITY", "sortOrder": "DESC"} + + scenario_data.inspector_stubber.stub_list_findings( + filter_criteria=filter_criteria, + max_results=max_results, + sort_criteria=sort_criteria, + error_code=error_code, + ) + + if error_code is None: + response = scenario_data.wrapper.list_findings( + filter_criteria=filter_criteria, + max_results=max_results, + sort_criteria=sort_criteria, + ) + assert "findings" in response + if response["findings"]: + finding = response["findings"][0] + assert "findingArn" in finding + assert "severity" in finding + else: + with pytest.raises(ClientError) as exc_info: + scenario_data.wrapper.list_findings( + filter_criteria=filter_criteria, + max_results=max_results, + sort_criteria=sort_criteria, + ) + assert exc_info.value.response["Error"]["Code"] == error_code + + +@pytest.mark.parametrize( + "error_code", [None, "ValidationException", "AccessDeniedException"] +) +def test_get_finding_details(scenario_data, error_code): + """Test getting finding details with various parameters and error conditions.""" + finding_arns = ["arn:aws:inspector2:us-east-1:123456789012:finding/finding-1"] + + scenario_data.inspector_stubber.stub_batch_get_finding_details( + finding_arns=finding_arns, error_code=error_code + ) + + if error_code is None: + response = scenario_data.wrapper.get_finding_details(finding_arns=finding_arns) + assert "findingDetails" in response + if response["findingDetails"]: + details = response["findingDetails"][0] + assert "findingArn" in details + else: + with pytest.raises(ClientError) as exc_info: + scenario_data.wrapper.get_finding_details(finding_arns=finding_arns) + assert exc_info.value.response["Error"]["Code"] == error_code + + +@pytest.mark.parametrize("error_code", [None, "ValidationException"]) +def test_list_coverage(scenario_data, error_code): + """Test listing coverage with various parameters and error conditions.""" + filter_criteria = ( + {"resourceType": [{"comparison": "EQUALS", "value": "AWS_EC2_INSTANCE"}]} + if error_code != "ValidationException" + else None + ) + max_results = 10 + + scenario_data.inspector_stubber.stub_list_coverage( + filter_criteria=filter_criteria, max_results=max_results, error_code=error_code + ) + + if error_code is None: + response = scenario_data.wrapper.list_coverage( + filter_criteria=filter_criteria, max_results=max_results + ) + assert "coveredResources" in response + if response["coveredResources"]: + resource = response["coveredResources"][0] + assert "resourceId" in resource + assert "resourceType" in resource + else: + with pytest.raises(ClientError) as exc_info: + scenario_data.wrapper.list_coverage( + filter_criteria=filter_criteria, max_results=max_results + ) + assert exc_info.value.response["Error"]["Code"] == error_code + + +@pytest.mark.parametrize( + "error_code", [None, "ValidationException", "ConflictException"] +) +def test_disable_inspector(scenario_data, error_code): + """Test disabling Inspector with various parameters and error conditions.""" + account_ids = ["123456789012"] if error_code != "ValidationException" else None + resource_types = ["EC2", "ECR", "LAMBDA"] + + scenario_data.inspector_stubber.stub_disable( + account_ids=account_ids, resource_types=resource_types, error_code=error_code + ) + + if error_code is None: + response = scenario_data.wrapper.disable_inspector( + account_ids=account_ids, resource_types=resource_types + ) + assert "accounts" in response + assert len(response["accounts"]) > 0 + assert response["accounts"][0]["status"] == "DISABLED" + else: + with pytest.raises(ClientError) as exc_info: + scenario_data.wrapper.disable_inspector( + account_ids=account_ids, resource_types=resource_types + ) + assert exc_info.value.response["Error"]["Code"] == error_code diff --git a/python/test_tools/inspector_stubber.py b/python/test_tools/inspector_stubber.py new file mode 100644 index 00000000000..7c0178d2923 --- /dev/null +++ b/python/test_tools/inspector_stubber.py @@ -0,0 +1,362 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +Stub functions that are used by the AWS Inspector unit tests. + +When tests are run against an actual AWS account, the stubber class does not +set up stubs and passes all calls through to the Boto 3 client. +""" + +from botocore.stub import ANY +from boto3 import client + +from test_tools.example_stubber import ExampleStubber + + +class InspectorStubber(ExampleStubber): + """ + A class that implements stub functions used by AWS Inspector unit tests. + + The stubbed functions expect certain parameters to be passed to them as + part of the tests, and will raise errors when the actual parameters differ from + the expected. + """ + + def __init__(self, inspector_client: client, use_stubs=True) -> None: + """ + Initializes the object with a specific client and configures it for + stubbing or AWS passthrough. + + :param inspector_client: A Boto 3 AWS Inspector client. + :param use_stubs: When True, use stubs to intercept requests. Otherwise, + pass requests through to AWS. + """ + super().__init__(inspector_client, use_stubs) + + def stub_enable(self, account_ids=None, resource_types=None, error_code=None): + """ + Stub the Enable operation. + + :param account_ids: List of account IDs to enable Inspector for. + :param resource_types: List of resource types to enable scanning for. + :param error_code: Error code to simulate an error response. + """ + expected_params = {} + if account_ids is not None: + expected_params["accountIds"] = account_ids + if resource_types is not None: + expected_params["resourceTypes"] = resource_types + + response = { + "accounts": [ + { + "accountId": "123456789012", + "status": "ENABLED", + "resourceStatus": { + "ec2": "ENABLED", + "ecr": "ENABLED", + "lambda": "ENABLED" + } + } + ] + } + self._stub_bifurcator( + "enable", expected_params, response, error_code=error_code + ) + + def stub_batch_get_account_status(self, account_ids=None, error_code=None): + """ + Stub the BatchGetAccountStatus operation. + + :param account_ids: List of account IDs to get status for. + :param error_code: Error code to simulate an error response. + """ + expected_params = {} + if account_ids is not None: + expected_params["accountIds"] = account_ids + + response = { + "accounts": [ + { + "accountId": "123456789012", + "state": { + "status": "ENABLED", + "errorCode": "ALREADY_ENABLED", + "errorMessage": "Inspector is already enabled for this account" + }, + "resourceState": { + "ec2": { + "status": "ENABLED", + "errorCode": "ALREADY_ENABLED", + "errorMessage": "EC2 scanning is already enabled" + }, + "ecr": { + "status": "ENABLED", + "errorCode": "ALREADY_ENABLED", + "errorMessage": "ECR scanning is already enabled" + }, + "lambda": { + "status": "ENABLED", + "errorCode": "ALREADY_ENABLED", + "errorMessage": "Lambda scanning is already enabled" + } + } + } + ] + } + self._stub_bifurcator( + "batch_get_account_status", expected_params, response, error_code=error_code + ) + + def stub_list_findings(self, filter_criteria=None, max_results=None, next_token=None, sort_criteria=None, error_code=None): + """ + Stub the ListFindings operation. + + :param filter_criteria: Filter criteria for findings. + :param max_results: Maximum number of results to return. + :param next_token: Token for pagination. + :param sort_criteria: Sort criteria for findings. + :param error_code: Error code to simulate an error response. + """ + expected_params = {} + if filter_criteria is not None: + expected_params["filterCriteria"] = filter_criteria + if max_results is not None: + expected_params["maxResults"] = max_results + if next_token is not None: + expected_params["nextToken"] = next_token + if sort_criteria is not None: + expected_params["sortCriteria"] = sort_criteria + + response = { + "findings": [ + { + "findingArn": "arn:aws:inspector2:us-east-1:123456789012:finding/finding-1", + "awsAccountId": "123456789012", + "type": "PACKAGE_VULNERABILITY", + "description": "CVE-2023-1234 - Critical vulnerability in package xyz", + "severity": "CRITICAL", + "firstObservedAt": "2023-01-01T00:00:00.000Z", + "lastObservedAt": "2023-01-01T00:00:00.000Z", + "updatedAt": "2023-01-01T00:00:00.000Z", + "status": "ACTIVE", + "title": "CVE-2023-1234 found in package xyz", + "inspectorScore": 9.8, + "remediation": { + "recommendation": { + "text": "Update package xyz to version 1.0.1 or later" + } + }, + "inspectorScoreDetails": { + "adjustedCvss": { + "score": 9.8, + "scoreSource": "NVD", + "scoringVector": "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H", + "version": "3.1" + } + }, + "networkReachabilityDetails": { + "networkPath": { + "steps": [ + { + "componentId": "i-1234567890abcdef0", + "componentType": "AWS_EC2_INSTANCE" + } + ] + }, + "openPortRange": { + "begin": 80, + "end": 80 + }, + "protocol": "TCP" + }, + "packageVulnerabilityDetails": { + "cvss": [ + { + "baseScore": 9.8, + "scoringVector": "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H", + "source": "NVD", + "version": "3.1" + } + ], + "source": "NVD", + "sourceUrl": "https://nvd.nist.gov/vuln/detail/CVE-2023-1234", + "vendorCreatedAt": "2023-01-01T00:00:00.000Z", + "vendorSeverity": "CRITICAL", + "vulnerabilityId": "CVE-2023-1234", + "vulnerablePackages": [ + { + "name": "xyz", + "version": "1.0.0", + "packageManager": "OS", + "fixedInVersion": "1.0.1" + } + ] + }, + "resources": [ + { + "id": "i-1234567890abcdef0", + "type": "AWS_EC2_INSTANCE", + "region": "us-east-1", + "partition": "aws", + "details": { + "awsEc2Instance": { + "iamInstanceProfileArn": "arn:aws:iam::123456789012:instance-profile/EC2-Inspector-Role", + "imageId": "ami-12345678", + "ipV4Addresses": ["10.0.0.1"], + "ipV6Addresses": [], + "keyName": "my-key-pair", + "launchedAt": "2023-01-01T00:00:00.000Z", + "platform": "LINUX", + "subnetId": "subnet-12345678", + "type": "t3.micro", + "vpcId": "vpc-12345678" + } + } + } + ] + } + ] + } + self._stub_bifurcator( + "list_findings", expected_params, response, error_code=error_code + ) + + def stub_batch_get_finding_details(self, finding_arns, error_code=None): + """ + Stub the BatchGetFindingDetails operation. + + :param finding_arns: List of finding ARNs to get details for. + :param error_code: Error code to simulate an error response. + """ + expected_params = {"findingArns": finding_arns} + + response = { + "findingDetails": [ + { + "findingArn": "arn:aws:inspector2:us-east-1:123456789012:finding/finding-1", + "cisaData": { + "action": "Apply updates per vendor instructions", + "dateAdded": "2023-01-01T00:00:00.000Z" + }, + "cwes": ["CWE-79", "CWE-89"], + "epssScore": 0.95, + "evidences": [ + { + "evidenceDetail": "Package xyz version 1.0.0 is vulnerable", + "evidenceRule": "INSPECTOR_PACKAGE_VULNERABILITY", + "severity": "HIGH" + } + ], + "exploitObserved": { + "firstSeen": "2023-01-01T00:00:00.000Z", + "lastSeen": "2023-01-01T00:00:00.000Z" + }, + "referenceUrls": [ + "https://nvd.nist.gov/vuln/detail/CVE-2023-1234", + "https://example.com/security-advisory" + ], + "riskScore": 8, + "tools": ["INSPECTOR"], + "ttps": ["T1190", "T1203"] + } + ] + } + self._stub_bifurcator( + "batch_get_finding_details", expected_params, response, error_code=error_code + ) + + def stub_list_coverage(self, filter_criteria=None, max_results=None, next_token=None, error_code=None): + """ + Stub the ListCoverage operation. + + :param filter_criteria: Filter criteria for coverage. + :param max_results: Maximum number of results to return. + :param next_token: Token for pagination. + :param error_code: Error code to simulate an error response. + """ + expected_params = {} + if filter_criteria is not None: + expected_params["filterCriteria"] = filter_criteria + if max_results is not None: + expected_params["maxResults"] = max_results + if next_token is not None: + expected_params["nextToken"] = next_token + + response = { + "coveredResources": [ + { + "accountId": "123456789012", + "resourceId": "i-1234567890abcdef0", + "resourceType": "AWS_EC2_INSTANCE", + "scanType": "PACKAGE", + "scanStatus": { + "statusCode": "ACTIVE", + "reason": "SUCCESSFUL" + }, + "resourceMetadata": { + "ec2": { + "amiId": "ami-12345678", + "platform": "LINUX", + "tags": { + "Name": "test-instance", + "Environment": "development" + } + } + }, + "lastScannedAt": "2023-01-01T00:00:00.000Z" + }, + { + "accountId": "123456789012", + "resourceId": "123456789012.dkr.ecr.us-east-1.amazonaws.com/my-repo:latest", + "resourceType": "AWS_ECR_CONTAINER_IMAGE", + "scanType": "PACKAGE", + "scanStatus": { + "statusCode": "ACTIVE", + "reason": "SUCCESSFUL" + }, + "resourceMetadata": { + "ecrRepository": { + "name": "my-repo", + "scanFrequency": "SCAN_ON_PUSH" + } + }, + "lastScannedAt": "2023-01-01T00:00:00.000Z" + } + ] + } + self._stub_bifurcator( + "list_coverage", expected_params, response, error_code=error_code + ) + + def stub_disable(self, account_ids=None, resource_types=None, error_code=None): + """ + Stub the Disable operation. + + :param account_ids: List of account IDs to disable Inspector for. + :param resource_types: List of resource types to disable scanning for. + :param error_code: Error code to simulate an error response. + """ + expected_params = {} + if account_ids is not None: + expected_params["accountIds"] = account_ids + if resource_types is not None: + expected_params["resourceTypes"] = resource_types + + response = { + "accounts": [ + { + "accountId": "123456789012", + "status": "DISABLED", + "resourceStatus": { + "ec2": "DISABLED", + "ecr": "DISABLED", + "lambda": "DISABLED" + } + } + ] + } + self._stub_bifurcator( + "disable", expected_params, response, error_code=error_code + ) \ No newline at end of file diff --git a/python/test_tools/stubber_factory.py b/python/test_tools/stubber_factory.py index d43bd83e24a..3b5524e4cb2 100644 --- a/python/test_tools/stubber_factory.py +++ b/python/test_tools/stubber_factory.py @@ -36,6 +36,7 @@ from test_tools.glacier_stubber import GlacierStubber from test_tools.glue_stubber import GlueStubber from test_tools.iam_stubber import IamStubber +from test_tools.inspector_stubber import InspectorStubber from test_tools.iot_sitewise_stubber import IoTSitewiseStubber from test_tools.healthlake_stubber import HealthLakeStubber from test_tools.keyspaces_stubber import KeyspacesStubber