diff --git a/src/snowflake/connector/crl.py b/src/snowflake/connector/crl.py index 0b1203fc95..0d774ebe18 100644 --- a/src/snowflake/connector/crl.py +++ b/src/snowflake/connector/crl.py @@ -1,6 +1,7 @@ #!/usr/bin/env python from __future__ import annotations +from collections import defaultdict from dataclasses import dataclass from datetime import datetime, timedelta, timezone from enum import Enum, unique @@ -11,6 +12,7 @@ from cryptography import x509 from cryptography.hazmat._oid import ExtensionOID from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import ec, padding, rsa from OpenSSL.SSL import Connection as SSLConnection @@ -53,15 +55,15 @@ class CRLConfig: CertRevocationCheckMode.DISABLED ) allow_certificates_without_crl_url: bool = False - connection_timeout_ms: int = 3000 - read_timeout_ms: int = 3000 + connection_timeout_ms: int = 5000 + read_timeout_ms: int = 5000 # 5s cache_validity_time: timedelta = timedelta(hours=24) enable_crl_cache: bool = True enable_crl_file_cache: bool = True crl_cache_dir: Path | str | None = None crl_cache_removal_delay_days: int = 7 crl_cache_cleanup_interval_hours: int = 1 - crl_cache_start_cleanup: bool = False + crl_cache_start_cleanup: bool = True @classmethod def from_connection(cls, sf_connection) -> CRLConfig: @@ -176,6 +178,7 @@ class CRLValidator: def __init__( self, session_manager: SessionManager | Any, + trusted_certificates: list[x509.Certificate], cert_revocation_check_mode: CertRevocationCheckMode = CRLConfig.cert_revocation_check_mode, allow_certificates_without_crl_url: bool = CRLConfig.allow_certificates_without_crl_url, connection_timeout_ms: int = CRLConfig.connection_timeout_ms, @@ -191,9 +194,22 @@ def __init__( self._cache_validity_time = cache_validity_time self._cache_manager = cache_manager or CRLCacheManager.noop() + # list of trusted CA and their certificates + self._trusted_ca: dict[x509.Name, list[x509.Certificate]] = defaultdict(list) + for cert in trusted_certificates: + self._trusted_ca[cert.subject].append(cert) + + # declaration of validate_certificate_is_not_revoked function cache + self._cache_for__validate_certificate_is_not_revoked: dict[ + x509.Certificate, CRLValidationResult + ] = {} + @classmethod def from_config( - cls, config: CRLConfig, session_manager: SessionManager + cls, + config: CRLConfig, + session_manager: SessionManager, + trusted_certificates: list[x509.Certificate], ) -> CRLValidator: """ Create a CRLValidator instance from a CRLConfig. @@ -204,6 +220,7 @@ def from_config( Args: config: CRLConfig instance containing CRL-related parameters session_manager: SessionManager instance + trusted_certificates: List of trusted CA certificates Returns: CRLValidator: Configured CRLValidator instance @@ -244,6 +261,7 @@ def from_config( return cls( session_manager=session_manager, + trusted_certificates=trusted_certificates, cert_revocation_check_mode=config.cert_revocation_check_mode, allow_certificates_without_crl_url=config.allow_certificates_without_crl_url, connection_timeout_ms=config.connection_timeout_ms, @@ -272,9 +290,7 @@ def validate_certificate_chains( if certificate_chains is None or len(certificate_chains) == 0: logger.warning("Certificate chains are empty") - if self._cert_revocation_check_mode == CertRevocationCheckMode.ADVISORY: - return True - return False + return self._cert_revocation_check_mode == CertRevocationCheckMode.ADVISORY results = [] for chain in certificate_chains: @@ -294,24 +310,133 @@ def validate_certificate_chains( def _validate_single_chain( self, chain: list[x509.Certificate] ) -> CRLValidationResult: - """Validate a single certificate chain""" + """ + Returns: + UNREVOKED: If there is a path to any trusted certificate where all certificates are unrevoked. + REVOKED: If all paths to trusted certificates are revoked. + ERROR: If there is a path to any trusted certificate on which none certificate is revoked, + but some certificates can't be verified. + """ # An empty chain is considered an error if len(chain) == 0: return CRLValidationResult.ERROR - # the last certificate of the chain is considered the root and isn't validated - results = [] - for i in range(len(chain) - 1): - result = self._validate_certificate(chain[i], chain[i + 1]) - if result == CRLValidationResult.REVOKED: - return CRLValidationResult.REVOKED - results.append(result) - if CRLValidationResult.ERROR in results: + subject_certificates: dict[x509.Name, list[x509.Certificate]] = defaultdict( + list + ) + for cert in chain: + subject_certificates[cert.subject].append(cert) + currently_visited_subjects: set[x509.Name] = set() + + def traverse_chain(cert: x509.Certificate) -> CRLValidationResult | None: + # UNREVOKED - unrevoked path to a trusted certificate found + # REVOKED - all paths are revoked + # ERROR - some certificates on potentially unrevoked paths can't be verified, or no path to a trusted CA is detected + # None - ignore this path (cycle detected) + if self._is_certificate_trusted_by_os(cert): + logger.debug("Found trusted certificate: %s", cert.subject) + return CRLValidationResult.UNREVOKED + + if trusted_ca_issuer := self._get_trusted_ca_issuer(cert): + logger.debug("Certificate signed by trusted CA: %s", cert.subject) + return self._validate_certificate_is_not_revoked_with_cache( + cert, trusted_ca_issuer + ) + + if cert.issuer in currently_visited_subjects: + # cycle detected - invalid path + return None + + valid_results: list[tuple[CRLValidationResult, x509.Certificate]] = [] + for ca_cert in subject_certificates[cert.issuer]: + if not self._verify_certificate_signature(cert, ca_cert): + logger.debug( + "Certificate signature verification failed for %s, looking for other paths", + cert, + ) + continue + + currently_visited_subjects.add(cert.issuer) + ca_result = traverse_chain(ca_cert) + currently_visited_subjects.remove(cert.issuer) + if ca_result is None: + # ignore invalid path result + continue + if ca_result == CRLValidationResult.UNREVOKED: + # good path found + return self._validate_certificate_is_not_revoked_with_cache( + cert, ca_cert + ) + valid_results.append((ca_result, ca_cert)) + + if len(valid_results) == 0: + # "root" certificate not cought by "is_trusted_by_os" check + logger.debug("No path towards trusted anchor: %s", cert.subject) + return CRLValidationResult.ERROR + + # check if there exists an ERROR path + for ca_result, ca_cert in valid_results: + if ca_result == CRLValidationResult.ERROR: + cert_result = self._validate_certificate_is_not_revoked_with_cache( + cert, ca_cert + ) + if cert_result == CRLValidationResult.REVOKED: + return CRLValidationResult.REVOKED + return CRLValidationResult.ERROR + + # no ERROR result found, all paths are REVOKED + return CRLValidationResult.REVOKED + + currently_visited_subjects.add(chain[0].subject) + error_result = False + revoked_result = False + for cert in subject_certificates[chain[0].subject]: + result = traverse_chain(cert) + if result == CRLValidationResult.UNREVOKED: + return result + error_result |= result == CRLValidationResult.ERROR + revoked_result |= result == CRLValidationResult.REVOKED + + if error_result or not revoked_result: return CRLValidationResult.ERROR + return CRLValidationResult.REVOKED - return CRLValidationResult.UNREVOKED + def _is_certificate_trusted_by_os(self, cert: x509.Certificate) -> bool: + if cert.subject not in self._trusted_ca: + return False + + cert_der = cert.public_bytes(serialization.Encoding.DER) + return any( + cert_der == trusted_cert.public_bytes(serialization.Encoding.DER) + for trusted_cert in self._trusted_ca[cert.subject] + ) - def _validate_certificate( + def _get_trusted_ca_issuer(self, cert: x509.Certificate) -> x509.Certificate | None: + for trusted_cert in self._trusted_ca[cert.issuer]: + if self._verify_certificate_signature(cert, trusted_cert): + return trusted_cert + return None + + def _verify_certificate_signature( + self, cert: x509.Certificate, ca_cert: x509.Certificate + ) -> bool: + try: + cert.verify_directly_issued_by(ca_cert) + return True + except Exception: + return False + + def _validate_certificate_is_not_revoked_with_cache( + self, cert: x509.Certificate, ca_cert: x509.Certificate + ) -> CRLValidationResult: + # validate certificate can be called multiple times with the same certificate + if cert not in self._cache_for__validate_certificate_is_not_revoked: + self._cache_for__validate_certificate_is_not_revoked[cert] = ( + self._validate_certificate_is_not_revoked(cert, ca_cert) + ) + return self._cache_for__validate_certificate_is_not_revoked[cert] + + def _validate_certificate_is_not_revoked( self, cert: x509.Certificate, ca_cert: x509.Certificate ) -> CRLValidationResult: """Validate a single certificate against CRL""" @@ -343,14 +468,29 @@ def _validate_certificate( @staticmethod def _is_short_lived_certificate(cert: x509.Certificate) -> bool: - """Check if certificate is short-lived (validity <= 5 days)""" + """Check if certificate is short-lived according to CA/Browser Forum definition: + - For certificates issued on or after 15 March 2024 and prior to 15 March 2026: + validity period <= 10 days (864,000 seconds) + - For certificates issued on or after 15 March 2026: + validity period <= 7 days (604,800 seconds) + """ try: # Use timezone.utc versions to avoid deprecation warnings + issue_date = cert.not_valid_before_utc validity_period = cert.not_valid_after_utc - cert.not_valid_before_utc except AttributeError: # Fallback for older versions + issue_date = cert.not_valid_before validity_period = cert.not_valid_after - cert.not_valid_before - return validity_period.days <= 5 + + # Convert issue_date to UTC if it's not timezone-aware + if issue_date.tzinfo is None: + issue_date = issue_date.replace(tzinfo=timezone.utc) + + march_15_2026 = datetime(2026, 3, 15, tzinfo=timezone.utc) + if issue_date >= march_15_2026: + return validity_period.total_seconds() <= 604800 # 7 days in seconds + return validity_period.total_seconds() <= 864000 # 10 days in seconds @staticmethod def _extract_crl_distribution_points(cert: x509.Certificate) -> list[str]: @@ -446,7 +586,7 @@ def _check_certificate_against_crl_url( ca_cert.subject, crl_url, ) - # In most cases this indicates a configuration issue, but we'll still try verification + return CRLValidationResult.ERROR if not self._verify_crl_signature(crl, ca_cert): logger.warning("CRL signature verification failed for URL: %s", crl_url) @@ -545,25 +685,16 @@ def _extract_certificate_chains_from_connection( Returns: List of certificate chains, where each chain is a list of x509.Certificate objects """ - from OpenSSL.crypto import FILETYPE_ASN1, dump_certificate - try: - cert_chain = connection.get_peer_cert_chain() + # Convert OpenSSL certificates to cryptography x509 certificates + cert_chain = connection.get_peer_cert_chain(as_cryptography=True) if not cert_chain: logger.debug("No certificate chain found in connection") return [] - - # Convert OpenSSL certificates to cryptography x509 certificates - x509_chain = [] - for cert_openssl in cert_chain: - cert_der = dump_certificate(FILETYPE_ASN1, cert_openssl) - cert_x509 = x509.load_der_x509_certificate(cert_der, default_backend()) - x509_chain.append(cert_x509) - logger.debug( - "Extracted %d certificates for CRL validation", len(x509_chain) + "Extracted %d certificates for CRL validation", len(cert_chain) ) - return [x509_chain] # Return as a single chain + return [cert_chain] # Return as a single chain except Exception as e: logger.warning( diff --git a/src/snowflake/connector/ssl_wrap_socket.py b/src/snowflake/connector/ssl_wrap_socket.py index cf3ee92f91..48a34d47bc 100644 --- a/src/snowflake/connector/ssl_wrap_socket.py +++ b/src/snowflake/connector/ssl_wrap_socket.py @@ -16,7 +16,7 @@ from functools import wraps from inspect import signature as _sig from socket import socket -from typing import Any +from typing import TYPE_CHECKING, Any import certifi import OpenSSL.SSL @@ -30,6 +30,9 @@ from .vendored.urllib3.contrib.pyopenssl import PyOpenSSLContext, WrappedSocket from .vendored.urllib3.util import ssl_ as ssl_ +if TYPE_CHECKING: + from cryptography import x509 + DEFAULT_OCSP_MODE: OCSPMode = OCSPMode.FAIL_OPEN FEATURE_OCSP_MODE: OCSPMode = DEFAULT_OCSP_MODE FEATURE_ROOT_CERTS_DICT_LOCK_TIMEOUT: int = ( @@ -150,6 +153,17 @@ def inject_into_urllib3() -> None: connection_.ssl_wrap_socket = ssl_wrap_socket_with_cert_revocation_checks +def _load_trusted_certificates(cafile: str | None) -> list[x509.Certificate]: + # Use default SSL context to load the CA file and get the certificates + ctx = ssl.create_default_context() + ctx.load_verify_locations(cafile=cafile) + certs = ctx.get_ca_certs(binary_form=True) + from cryptography.hazmat.backends import default_backend + from cryptography.x509 import load_der_x509_certificate + + return [load_der_x509_certificate(cert, default_backend()) for cert in certs] + + @wraps(ssl_.ssl_wrap_socket) def ssl_wrap_socket_with_cert_revocation_checks( *args: Any, **kwargs: Any @@ -166,12 +180,12 @@ def ssl_wrap_socket_with_cert_revocation_checks( # Ensure PyOpenSSL context with partial-chain is used if none or wrong type provided provided_ctx = params.get("ssl_context") + cafile_for_ctx = _resolve_cafile(params) if not isinstance(provided_ctx, PyOpenSSLContext): - cafile_for_ctx = _resolve_cafile(params) params["ssl_context"] = _build_context_with_partial_chain(cafile_for_ctx) else: # If a PyOpenSSLContext is provided, ensure it trusts the provided CA and partial-chain is enabled - _ensure_partial_chain_on_context(provided_ctx, _resolve_cafile(params)) + _ensure_partial_chain_on_context(provided_ctx, cafile_for_ctx) ret = ssl_.ssl_wrap_socket(**params) @@ -184,7 +198,9 @@ def ssl_wrap_socket_with_cert_revocation_checks( != CertRevocationCheckMode.DISABLED ): crl_validator = CRLValidator.from_config( - FEATURE_CRL_CONFIG, get_current_session_manager() + FEATURE_CRL_CONFIG, + get_current_session_manager(), + trusted_certificates=_load_trusted_certificates(cafile_for_ctx), ) if not crl_validator.validate_connection(ret.connection): raise OperationalError( diff --git a/test/integ/test_crl.py b/test/integ/test_crl.py index 4b9ff21304..8f97907646 100644 --- a/test/integ/test_crl.py +++ b/test/integ/test_crl.py @@ -8,8 +8,18 @@ from __future__ import annotations import tempfile +import warnings import pytest +from cryptography.utils import CryptographyDeprecationWarning + + +@pytest.fixture(autouse=True) +def _ignore_deprecation_warnings(): + """Fixture to handle deprecation warnings in all tests in this module.""" + with warnings.catch_warnings(): + warnings.simplefilter("ignore", CryptographyDeprecationWarning) + yield @pytest.mark.skipolddriver diff --git a/test/unit/test_crl.py b/test/unit/test_crl.py index 67ae3e6781..f85184840b 100644 --- a/test/unit/test_crl.py +++ b/test/unit/test_crl.py @@ -62,6 +62,27 @@ class CertificateChain: leaf_cert: x509.Certificate +@dataclass +class CrossSignedCertificateChain: + # CA + # / \ + # rootA rootB + # / \ + # A --(AsignB)--> B + # <-(BsignA)-- + # \ / + # leafA leafB + # \/ + # subject + + rootA: x509.Certificate + rootB: x509.Certificate + AsignB: x509.Certificate + BsignA: x509.Certificate + leafA: x509.Certificate + leafB: x509.Certificate + + @pytest.fixture(scope="module") def cert_gen(): class CertificateGeneratorUtil: @@ -242,6 +263,123 @@ def create_simple_chain(self) -> CertificateChain: return CertificateChain(root_cert, intermediate_cert, leaf_cert) + def create_cross_signed_chain(self) -> CertificateChain: + A_key = rsa.generate_private_key(public_exponent=65537, key_size=2048) + B_key = rsa.generate_private_key(public_exponent=65537, key_size=2048) + leaf_key = rsa.generate_private_key(public_exponent=65537, key_size=2048) + + A_name = x509.Name( + [ + x509.NameAttribute( + NameOID.COMMON_NAME, + f"Test CA A {self.random.randint(1, 10000)}", + ) + ] + ) + B_name = x509.Name( + [ + x509.NameAttribute( + NameOID.COMMON_NAME, + f"Test CA B {self.random.randint(1, 10000)}", + ) + ] + ) + leaf_name = x509.Name( + [ + x509.NameAttribute( + NameOID.COMMON_NAME, + f"Test Leaf {self.random.randint(1, 10000)}", + ) + ] + ) + rootA_cert = ( + x509.CertificateBuilder() + .subject_name(A_name) + .issuer_name(self.ca_certificate.subject) + .public_key(A_key.public_key()) + .serial_number(x509.random_serial_number()) + .not_valid_before(datetime.now(timezone.utc)) + .not_valid_after(datetime.now(timezone.utc) + timedelta(days=365)) + .add_extension( + x509.BasicConstraints(ca=True, path_length=None), + critical=True, + ) + .sign(self.ca_private_key, hashes.SHA256()) + ) + rootB_cert = ( + x509.CertificateBuilder() + .subject_name(B_name) + .issuer_name(self.ca_certificate.subject) + .public_key(B_key.public_key()) + .serial_number(x509.random_serial_number()) + .not_valid_before(datetime.now(timezone.utc)) + .not_valid_after(datetime.now(timezone.utc) + timedelta(days=365)) + .add_extension( + x509.BasicConstraints(ca=True, path_length=None), + critical=True, + ) + .sign(self.ca_private_key, hashes.SHA256()) + ) + BsignA_cert = ( + x509.CertificateBuilder() + .subject_name(A_name) + .issuer_name(B_name) + .public_key(A_key.public_key()) + .serial_number(x509.random_serial_number()) + .not_valid_before(datetime.now(timezone.utc)) + .not_valid_after(datetime.now(timezone.utc) + timedelta(days=365)) + .add_extension( + x509.BasicConstraints(ca=False, path_length=None), + critical=True, + ) + .sign(B_key, hashes.SHA256()) + ) + AsignB_cert = ( + x509.CertificateBuilder() + .subject_name(B_name) + .issuer_name(A_name) + .public_key(B_key.public_key()) + .serial_number(x509.random_serial_number()) + .not_valid_before(datetime.now(timezone.utc)) + .not_valid_after(datetime.now(timezone.utc) + timedelta(days=365)) + .add_extension( + x509.BasicConstraints(ca=False, path_length=None), + critical=True, + ) + .sign(A_key, hashes.SHA256()) + ) + leafA_cert = ( + x509.CertificateBuilder() + .subject_name(leaf_name) + .issuer_name(A_name) + .public_key(leaf_key.public_key()) + .serial_number(x509.random_serial_number()) + .not_valid_before(datetime.now(timezone.utc)) + .not_valid_after(datetime.now(timezone.utc) + timedelta(days=365)) + .add_extension( + x509.BasicConstraints(ca=False, path_length=None), + critical=True, + ) + .sign(A_key, hashes.SHA256()) + ) + leafB_cert = ( + x509.CertificateBuilder() + .subject_name(leaf_name) + .issuer_name(B_name) + .public_key(leaf_key.public_key()) + .serial_number(x509.random_serial_number()) + .not_valid_before(datetime.now(timezone.utc)) + .not_valid_after(datetime.now(timezone.utc) + timedelta(days=365)) + .add_extension( + x509.BasicConstraints(ca=False, path_length=None), + critical=True, + ) + .sign(B_key, hashes.SHA256()) + ) + return CrossSignedCertificateChain( + rootA_cert, rootB_cert, AsignB_cert, BsignA_cert, leafA_cert, leafB_cert + ) + def create_short_lived_certificate( self, validity_days: int, issuance_date: datetime ) -> x509.Certificate: @@ -262,7 +400,7 @@ def create_short_lived_certificate( cert = ( x509.CertificateBuilder() .subject_name(name) - .issuer_name(name) # Self-signed for simplicity + .issuer_name(self.ca_certificate.subject) .public_key(key.public_key()) .serial_number(x509.random_serial_number()) .not_valid_before(issuance_date) @@ -271,7 +409,7 @@ def create_short_lived_certificate( x509.BasicConstraints(ca=False, path_length=None), critical=True, ) - .sign(key, hashes.SHA256()) + .sign(self.ca_private_key, hashes.SHA256(), backend=default_backend()) ) return cert @@ -347,6 +485,7 @@ def test_should_allow_connection_when_crl_validation_disabled( validator = CRLValidator( session_manager, cert_revocation_check_mode=CertRevocationCheckMode.DISABLED, + trusted_certificates=[chain.root_cert], ) assert validator.validate_certificate_chains(chains) @@ -358,6 +497,7 @@ def test_should_allow_connection_when_crl_validation_disabled_and_no_cert_chain( validator = CRLValidator( session_manager, cert_revocation_check_mode=CertRevocationCheckMode.DISABLED, + trusted_certificates=[], ) assert validator.validate_certificate_chains([]) assert validator.validate_certificate_chains(None) @@ -368,6 +508,7 @@ def test_should_fail_with_null_or_empty_certificate_chains(cert_gen, session_man validator = CRLValidator( session_manager, cert_revocation_check_mode=CertRevocationCheckMode.ENABLED, + trusted_certificates=[], ) assert not validator.validate_certificate_chains([]) assert not validator.validate_certificate_chains(None) @@ -383,6 +524,7 @@ def test_should_handle_certificates_without_crl_urls_in_enabled_mode( session_manager, cert_revocation_check_mode=CertRevocationCheckMode.ENABLED, allow_certificates_without_crl_url=False, + trusted_certificates=[chain.root_cert], ) assert not validator.validate_certificate_chains(chains) @@ -398,6 +540,7 @@ def test_should_allow_certificates_without_crl_urls_when_configured( session_manager, cert_revocation_check_mode=CertRevocationCheckMode.ENABLED, allow_certificates_without_crl_url=True, + trusted_certificates=[chain.root_cert], ) assert validator.validate_certificate_chains(chains) @@ -410,14 +553,14 @@ def test_should_pass_in_advisory_mode_even_with_errors(cert_gen, session_manager validator = CRLValidator( session_manager, cert_revocation_check_mode=CertRevocationCheckMode.ADVISORY, + trusted_certificates=[chain.root_cert], ) assert validator.validate_certificate_chains(chains) def test_should_validate_multiple_chains_and_return_first_valid_with_no_crl_urls( - cert_gen, - session_manager, + cert_gen, session_manager ): """Test validation of multiple chains and return first valid""" # Create a certificate that would be considered invalid (before March 2024) @@ -437,12 +580,169 @@ def test_should_validate_multiple_chains_and_return_first_valid_with_no_crl_urls session_manager, cert_revocation_check_mode=CertRevocationCheckMode.ENABLED, allow_certificates_without_crl_url=True, + trusted_certificates=[valid_chain.root_cert], ) result = validator.validate_certificate_chains(chains) assert result, "Should return true when at least one valid chain is found" +def test_cross_signed_certificate_chain(cert_gen, session_manager): + """Test validation of cross-signed certificate chain""" + chain = cert_gen.create_cross_signed_chain() + validator = CRLValidator( + session_manager, + cert_revocation_check_mode=CertRevocationCheckMode.ENABLED, + allow_certificates_without_crl_url=True, + trusted_certificates=[cert_gen.ca_certificate], + ) + + # provide full chain in arbitrary order + chains = [ + [ + chain.leafA, + chain.AsignB, + chain.leafB, + chain.BsignA, + chain.rootB, + chain.rootA, + ] + ] + assert validator.validate_certificate_chains(chains) + + # only A is signed by CA + chains = [ + [ + chain.leafA, + chain.AsignB, + chain.leafB, + chain.BsignA, + # chain.rootB, + chain.rootA, + ] + ] + assert validator.validate_certificate_chains(chains) + + # nor A nor B is signed by CA + chains = [ + [ + chain.leafA, + chain.AsignB, + chain.leafB, + chain.BsignA, + # chain.rootB, + # chain.rootA, + ] + ] + assert not validator.validate_certificate_chains(chains) + + # mingled A and B paths passed in one chain - A has no connection to CA, B has + chains = [ + [ + chain.leafA, + chain.AsignB, + chain.leafB, + # chain.BsignA, + chain.rootB, + # chain.rootA, + ] + ] + assert validator.validate_certificate_chains(chains) + + +def test_starfield_incident(cert_gen, session_manager): + # leaf is signed by A, who is signed by CA (revoked) and B, who is also a trusted CA + chain = cert_gen.create_cross_signed_chain() + validator = CRLValidator( + session_manager, + cert_revocation_check_mode=CertRevocationCheckMode.ENABLED, + trusted_certificates=[cert_gen.ca_certificate, chain.rootB], + ) + + def mock_validate(cert, _): + if cert == chain.rootA: + return CRLValidationResult.REVOKED + return CRLValidationResult.UNREVOKED + + validator._validate_certificate_is_not_revoked = mock_validate + + assert ( + validator._validate_single_chain([chain.leafA, chain.BsignA, chain.rootA]) + == CRLValidationResult.UNREVOKED + ) + + +def test_validate_single_chain(cert_gen, session_manager): + chain = cert_gen.create_cross_signed_chain() + validator = CRLValidator( + session_manager, + cert_revocation_check_mode=CertRevocationCheckMode.ENABLED, + trusted_certificates=[cert_gen.ca_certificate], + ) + + input_chain = [chain.leafA, chain.leafB, chain.rootA, chain.rootB] + + # case 1: at least one valid path + def mock_validate_with_special_cert(revoked_cert, error_result): + validator._validate_certificate_is_not_revoked_with_cache = lambda cert, _: ( + error_result if cert == revoked_cert else CRLValidationResult.UNREVOKED + ) + + for error_result in [CRLValidationResult.ERROR, CRLValidationResult.REVOKED]: + for revoked_cert in [chain.rootA, chain.rootB, chain.leafA, chain.leafB]: + mock_validate_with_special_cert(revoked_cert, error_result) + assert ( + validator._validate_single_chain(input_chain) + == CRLValidationResult.UNREVOKED + ) + + # case 2: all paths revoked + def mock_validate(cert, _): + if cert in [chain.rootA, chain.rootB]: + return CRLValidationResult.REVOKED + return CRLValidationResult.UNREVOKED + + validator._validate_certificate_is_not_revoked_with_cache = mock_validate + assert validator._validate_single_chain(input_chain) == CRLValidationResult.REVOKED + + # case 3: revoked + error should result in revoked\ + def mock_validate(cert, _): + if cert in [chain.rootA, chain.leafB]: + return CRLValidationResult.REVOKED + return CRLValidationResult.ERROR + + validator._validate_certificate_is_not_revoked_with_cache = mock_validate + assert validator._validate_single_chain(input_chain) == CRLValidationResult.REVOKED + + # case 4: no path to trusted certificate + def mock_validate(cert, _): + return CRLValidationResult.UNREVOKED + + validator._validate_certificate_is_not_revoked_with_cache = mock_validate + assert ( + validator._validate_single_chain( + [chain.leafA, chain.leafB, chain.AsignB, chain.BsignA] + ) + == CRLValidationResult.ERROR + ) + + # case 5: only unrevoked path has an error + def mock_validate(cert, _): + if cert in [chain.rootA, chain.leafB]: + return CRLValidationResult.REVOKED + if cert == chain.BsignA: + return CRLValidationResult.ERROR + return CRLValidationResult.UNREVOKED + + validator._validate_certificate_is_not_revoked_with_cache = mock_validate + assert ( + validator._validate_single_chain( + [chain.leafA, chain.rootA, chain.leafB, chain.rootB, chain.BsignA] + ) + == CRLValidationResult.ERROR + ) + + @responses.activate def test_should_validate_non_revoked_certificate_successfully( cert_gen, crl_urls, session_manager @@ -467,6 +767,38 @@ def test_should_validate_non_revoked_certificate_successfully( validator = CRLValidator( session_manager, cert_revocation_check_mode=CertRevocationCheckMode.ENABLED, + trusted_certificates=[cert_gen.ca_certificate], + ) + + assert validator.validate_certificate_chains([chain]) + assert resp.call_count + + +@responses.activate +def test_should_validate_non_revoked_certificate_successfully_if_root_not_provided_on_chain( + cert_gen, crl_urls, session_manager +): + """Test validation of non-revoked certificate""" + # Setup mock HTTP client + crl_content = cert_gen.generate_valid_crl() + resp = responses.add( + responses.GET, + crl_urls.test_ca, + body=crl_content, + status=200, + content_type="application/pkcs7-mime", + ) + + # Create certificate with CRL distribution point + cert = cert_gen.create_certificate_with_crl_distribution_points( + "CN=Test Server", [crl_urls.test_ca] + ) + chain = [cert] + + validator = CRLValidator( + session_manager, + cert_revocation_check_mode=CertRevocationCheckMode.ENABLED, + trusted_certificates=[cert_gen.ca_certificate], ) assert validator.validate_certificate_chains([chain]) @@ -495,6 +827,7 @@ def test_should_fail_for_revoked_certificate(cert_gen, crl_urls, session_manager validator = CRLValidator( session_manager, cert_revocation_check_mode=CertRevocationCheckMode.ENABLED, + trusted_certificates=[cert_gen.ca_certificate], ) assert not validator.validate_certificate_chains([chain]) @@ -524,6 +857,7 @@ def test_should_allow_revoked_certificate_when_crl_validation_disabled( validator = CRLValidator( session_manager, cert_revocation_check_mode=CertRevocationCheckMode.DISABLED, + trusted_certificates=[cert_gen.ca_certificate], ) assert validator.validate_certificate_chains([chain]) @@ -546,6 +880,7 @@ def test_should_pass_in_advisory_mode_with_crl_errors( validator = CRLValidator( session_manager, cert_revocation_check_mode=CertRevocationCheckMode.ADVISORY, + trusted_certificates=[cert_gen.ca_certificate], ) assert validator.validate_certificate_chains([chain]) @@ -568,6 +903,7 @@ def test_should_fail_in_enabled_mode_with_crl_errors( validator = CRLValidator( session_manager, cert_revocation_check_mode=CertRevocationCheckMode.ENABLED, + trusted_certificates=[cert_gen.ca_certificate], ) assert not validator.validate_certificate_chains([chain]) @@ -606,6 +942,7 @@ def test_should_validate_multiple_chains_and_success_if_just_one_valid( validator = CRLValidator( session_manager, cert_revocation_check_mode=CertRevocationCheckMode.ENABLED, + trusted_certificates=[cert_gen.ca_certificate], ) assert validator.validate_certificate_chains([invalid_chain, valid_chain]) @@ -633,6 +970,7 @@ def test_should_reject_expired_crl(cert_gen, crl_urls, session_manager): validator = CRLValidator( session_manager, cert_revocation_check_mode=CertRevocationCheckMode.ENABLED, + trusted_certificates=[cert_gen.ca_certificate], ) assert not validator.validate_certificate_chains([chain]) @@ -650,6 +988,7 @@ def test_should_skip_short_lived_certificates(cert_gen, session_manager): validator = CRLValidator( session_manager, cert_revocation_check_mode=CertRevocationCheckMode.ENABLED, + trusted_certificates=[cert_gen.ca_certificate], ) # Should pass without any HTTP calls (no responses setup) @@ -691,6 +1030,7 @@ def test_should_handle_multiple_crl_distribution_points( validator = CRLValidator( session_manager, cert_revocation_check_mode=CertRevocationCheckMode.ENABLED, + trusted_certificates=[cert_gen.ca_certificate], ) assert validator.validate_certificate_chains([chain]) @@ -702,12 +1042,14 @@ def test_crl_validator_creation(session_manager): """Test that CRLValidator can be created properly""" # Test basic instantiation - validator = CRLValidator(session_manager) + validator = CRLValidator(session_manager, trusted_certificates=[]) assert validator is not None assert isinstance(validator, CRLValidator) # Test that it works with from_config class method - validator = CRLValidator.from_config(CRLConfig(), session_manager) + validator = CRLValidator.from_config( + CRLConfig(), session_manager, trusted_certificates=[] + ) assert validator is not None assert isinstance(validator, CRLValidator) @@ -725,7 +1067,7 @@ def test_crl_validator_atexit_cleanup(session_manager): try: # Create validator which should start cleanup - CRLValidator.from_config(config, session_manager) + CRLValidator.from_config(config, session_manager, trusted_certificates=[]) # Verify cleanup is running through factory assert CRLCacheFactory.is_periodic_cleanup_running() @@ -750,14 +1092,16 @@ def test_crl_validator_validate_connection(session_manager): # Test with no certificate chain mock_connection.get_peer_cert_chain.return_value = [] - validator = CRLValidator(session_manager) + validator = CRLValidator(session_manager, trusted_certificates=[]) # Should return True when disabled (default) assert validator.validate_connection(mock_connection) # Test with enabled mode and no certificates validator = CRLValidator( - session_manager, cert_revocation_check_mode=CertRevocationCheckMode.ENABLED + session_manager, + cert_revocation_check_mode=CertRevocationCheckMode.ENABLED, + trusted_certificates=[], ) assert not validator.validate_connection(mock_connection) @@ -766,7 +1110,9 @@ def test_crl_validator_extract_certificate_chains_from_connection( cert_gen, session_manager ): """Test the _extract_certificate_chains_from_connection method""" - validator = CRLValidator(session_manager) + chain = cert_gen.create_simple_chain() + + validator = CRLValidator(session_manager, trusted_certificates=[chain.root_cert]) # Test with no certificate chain mock_connection = Mock() @@ -776,7 +1122,6 @@ def test_crl_validator_extract_certificate_chains_from_connection( assert chains == [] # Test with mock certificate chain - chain = cert_gen.create_simple_chain() mock_certs = [] # Create mock OpenSSL certificates @@ -992,7 +1337,9 @@ def test_crl_validator_download_crl_success(cert_gen, session_manager): content_type="application/pkcs7-mime", ) - validator = CRLValidator(session_manager) + validator = CRLValidator( + session_manager, trusted_certificates=[cert_gen.ca_certificate] + ) # Test the download method - it returns a tuple (crl, timestamp) crl, timestamp = validator._download_crl(crl_url) @@ -1008,7 +1355,7 @@ def test_crl_validator_download_crl_http_error(session_manager): responses.add(responses.GET, crl_url, status=404) - validator = CRLValidator(session_manager) + validator = CRLValidator(session_manager, trusted_certificates=[]) # Should return (None, None) on HTTP error crl, timestamp = validator._download_crl(crl_url) @@ -1024,7 +1371,10 @@ def test_crl_validator_download_crl_network_timeout(session_manager): crl_url = "http://example.com/slow.crl" validator = CRLValidator( - session_manager, connection_timeout_ms=1000, read_timeout_ms=1000 + session_manager, + connection_timeout_ms=1000, + read_timeout_ms=1000, + trusted_certificates=[], ) # Mock requests to raise timeout @@ -1045,7 +1395,7 @@ def test_crl_validator_download_crl_network_error(session_manager): crl_url = "http://example.com/unreachable.crl" - validator = CRLValidator(session_manager) + validator = CRLValidator(session_manager, trusted_certificates=[]) # Mock requests to raise connection error with mock_patch.object( @@ -1064,7 +1414,7 @@ def test_crl_validator_extract_crl_distribution_points_success( crl_urls = ["http://example.com/ca.crl", "http://backup.com/ca.crl"] cert = cert_gen.create_certificate_with_crl_distribution_points("CN=Test", crl_urls) - validator = CRLValidator(session_manager) + validator = CRLValidator(session_manager, trusted_certificates=[]) extracted_urls = validator._extract_crl_distribution_points(cert) @@ -1081,7 +1431,7 @@ def test_crl_validator_extract_crl_distribution_points_no_extension( chain = cert_gen.create_simple_chain() cert = chain.leaf_cert - validator = CRLValidator(session_manager) + validator = CRLValidator(session_manager, trusted_certificates=[]) # Should return empty list when no CRL extension found extracted_urls = validator._extract_crl_distribution_points(cert) @@ -1102,7 +1452,7 @@ def test_crl_validator_check_certificate_against_crl_not_revoked( mock_crl = Mock(spec=CertificateRevocationList) mock_crl.get_revoked_certificate_by_serial_number.return_value = None - validator = CRLValidator(session_manager) + validator = CRLValidator(session_manager, trusted_certificates=[]) # Should return UNREVOKED result = validator._check_certificate_against_crl(cert, mock_crl) @@ -1122,7 +1472,7 @@ def test_crl_validator_check_certificate_against_crl_revoked(cert_gen, session_m mock_crl = Mock(spec=CertificateRevocationList) mock_crl.get_revoked_certificate_by_serial_number.return_value = mock_revoked_cert - validator = CRLValidator(session_manager) + validator = CRLValidator(session_manager, trusted_certificates=[]) # Should return REVOKED result = validator._check_certificate_against_crl(cert, mock_crl) @@ -1143,12 +1493,15 @@ def test_crl_validator_check_certificate_against_crl_expired( mock_crl = Mock(spec=x509.CertificateRevocationList) mock_crl.next_update_utc = datetime.now(timezone.utc) - timedelta(days=1) # Expired mock_crl.get_revoked_certificate_by_serial_number.return_value = None + mock_crl.issuer = parent.subject # Cache will return an expired CRL mock_cache_mgr = Mock(spec=CRLCacheManager) mock_cache_mgr.get.return_value = CRLCacheEntry(mock_crl, datetime.now()) - validator = CRLValidator(session_manager, cache_manager=mock_cache_mgr) + validator = CRLValidator( + session_manager, cache_manager=mock_cache_mgr, trusted_certificates=[] + ) with mock_patch.object( validator, "_download_crl", return_value=(mock_crl, datetime.now()) ) as mock_download, mock_patch.object( @@ -1177,11 +1530,12 @@ def test_crl_validator_validate_certificate_with_cache_hit( # Mock cache manager with cache hit mock_crl = Mock(spec=x509.CertificateRevocationList) mock_crl.next_update_utc = datetime.now(timezone.utc) + timedelta(days=7) + mock_crl.issuer = ca_cert.subject mock_cache_manager = Mock() cached_entry = CRLCacheEntry(mock_crl, datetime.now(timezone.utc)) mock_cache_manager.get.return_value = cached_entry - validator = CRLValidator(session_manager) + validator = CRLValidator(session_manager, trusted_certificates=[]) validator._cache_manager = mock_cache_manager # Mock CRL parsing and validation @@ -1192,7 +1546,7 @@ def test_crl_validator_validate_certificate_with_cache_hit( ) as mock_check, mock_patch.object( validator, "_verify_crl_signature", return_value=True ) as mock_verify: - result = validator._validate_certificate(cert, ca_cert) + result = validator._validate_certificate_is_not_revoked(cert, ca_cert) # Should use cached CRL assert result == CRLValidationResult.UNREVOKED @@ -1215,7 +1569,11 @@ def test_crl_validator_validate_certificate_with_cache_miss( mock_cache_manager = Mock() mock_cache_manager.get.return_value = None - validator = CRLValidator(session_manager, cache_manager=mock_cache_manager) + validator = CRLValidator( + session_manager, + cache_manager=mock_cache_manager, + trusted_certificates=[], + ) # Mock successful download and validation with mock_patch.object( @@ -1229,12 +1587,11 @@ def test_crl_validator_validate_certificate_with_cache_miss( ) as mock_check, mock_patch.object( validator, "_verify_crl_signature", return_value=True ) as mock_verify: - mock_crl = Mock() mock_crl.next_update_utc = datetime.now(timezone.utc) + timedelta(days=7) + mock_crl.issuer = ca_cert.subject # Set the CRL issuer to match CA subject mock_load_crl.return_value = mock_crl - - result = validator._validate_certificate(cert, ca_cert) + result = validator._validate_certificate_is_not_revoked(cert, ca_cert) # Should download CRL and cache it assert result == CRLValidationResult.UNREVOKED @@ -1251,7 +1608,7 @@ def test_crl_signature_verification_success(cert_gen, session_manager): crl_bytes = cert_gen.generate_valid_crl() crl = x509.load_der_x509_crl(crl_bytes, backend=default_backend()) - validator = CRLValidator(session_manager) + validator = CRLValidator(session_manager, trusted_certificates=[]) # Should successfully verify the signature result = validator._verify_crl_signature(crl, cert_gen.ca_certificate) @@ -1286,7 +1643,7 @@ def test_crl_signature_verification_failure_wrong_ca(cert_gen, session_manager): .sign(different_ca_key, hashes.SHA256(), backend=default_backend()) ) - validator = CRLValidator(session_manager) + validator = CRLValidator(session_manager, trusted_certificates=[]) # Should fail to verify the signature with wrong CA result = validator._verify_crl_signature(crl, different_ca_cert) @@ -1339,7 +1696,7 @@ def test_crl_signature_verification_with_ec_key(session_manager): ec_crl = builder.sign(ec_private_key, hashes.SHA256(), backend=default_backend()) - validator = CRLValidator(session_manager) + validator = CRLValidator(session_manager, trusted_certificates=[]) # Should successfully verify EC signature result = validator._verify_crl_signature(ec_crl, ec_ca_cert) @@ -1359,7 +1716,7 @@ def test_crl_signature_verification_with_corrupted_signature(cert_gen, session_m corrupted_crl.signature = b"corrupted_signature_bytes" corrupted_crl.tbs_certlist_bytes = crl.tbs_certlist_bytes - validator = CRLValidator(session_manager) + validator = CRLValidator(session_manager, trusted_certificates=[]) # Should fail to verify corrupted signature result = validator._verify_crl_signature(corrupted_crl, cert_gen.ca_certificate) @@ -1373,10 +1730,12 @@ def test_crl_signature_verification_exception_handling(cert_gen, session_manager crl = x509.load_der_x509_crl(crl_bytes, backend=default_backend()) # Mock CA certificate that will cause an exception + mock_public_key = Mock() + mock_public_key.verify.side_effect = Exception("Test exception") mock_ca_cert = Mock(spec=x509.Certificate) - mock_ca_cert.public_key.side_effect = Exception("Test exception") + mock_ca_cert.public_key.return_value = mock_public_key - validator = CRLValidator(session_manager) + validator = CRLValidator(session_manager, trusted_certificates=[]) # Should handle exception gracefully and return False result = validator._verify_crl_signature(crl, mock_ca_cert) @@ -1415,12 +1774,15 @@ def test_crl_signature_verification_integration_with_validation_flow( validator_enabled = CRLValidator( session_manager, cert_revocation_check_mode=CertRevocationCheckMode.ENABLED, + trusted_certificates=[], ) with mock_patch.object( validator_enabled, "_fetch_crl_from_url", return_value=invalid_crl_bytes ): - result = validator_enabled._validate_certificate(cert, cert_gen.ca_certificate) + result = validator_enabled._validate_certificate_is_not_revoked( + cert, cert_gen.ca_certificate + ) assert result == CRLValidationResult.ERROR # Test in ADVISORY mode - should also fail due to signature verification failure @@ -1428,12 +1790,15 @@ def test_crl_signature_verification_integration_with_validation_flow( validator_advisory = CRLValidator( session_manager, cert_revocation_check_mode=CertRevocationCheckMode.ADVISORY, + trusted_certificates=[], ) with mock_patch.object( validator_advisory, "_fetch_crl_from_url", return_value=invalid_crl_bytes ): - result = validator_advisory._validate_certificate(cert, cert_gen.ca_certificate) + result = validator_advisory._validate_certificate_is_not_revoked( + cert, cert_gen.ca_certificate + ) # Even in ADVISORY mode, signature verification failure should return ERROR # We cannot trust a CRL whose signature cannot be verified assert result == CRLValidationResult.ERROR @@ -1444,8 +1809,8 @@ def test_crl_signature_verification_with_issuer_mismatch_warning( ): """Test that we log a warning when CRL issuer doesn't match CA certificate subject""" # Create a valid CRL signed by the test CA - crl_bytes = cert_gen.generate_valid_crl() - crl = x509.load_der_x509_crl(crl_bytes, backend=default_backend()) + crl = Mock(spec=x509.CertificateRevocationList) + crl.issuer = cert_gen.ca_certificate.subject # Create a different CA certificate with different subject different_ca_key = rsa.generate_private_key( @@ -1469,7 +1834,7 @@ def test_crl_signature_verification_with_issuer_mismatch_warning( .sign(different_ca_key, hashes.SHA256(), backend=default_backend()) ) - validator = CRLValidator(session_manager) + validator = CRLValidator(session_manager, trusted_certificates=[]) # Mock the _verify_crl_signature to return True to focus on the issuer check with mock_patch.object( @@ -1491,8 +1856,8 @@ def test_crl_signature_verification_with_issuer_mismatch_warning( "http://test.crl", ) - # Should still return UNREVOKED since signature verification was mocked to succeed - assert result == CRLValidationResult.UNREVOKED + # Should return ERROR due to issuer mismatch + assert result == CRLValidationResult.ERROR # Verify that the warning was logged assert len(caplog.records) > 0 @@ -1505,3 +1870,215 @@ def test_crl_signature_verification_with_issuer_mismatch_warning( assert ( warning_found ), f"Expected warning about CRL issuer mismatch not found in logs: {[r.message for r in caplog.records]}" + + +@pytest.mark.parametrize( + "issue_date,validity_days,expected", + [ + ( + # Issued on March 15, 2024, should use 10-day rule + datetime(2024, 3, 15, tzinfo=timezone.utc), + 10, + True, + ), + ( + # Issued on March 15, 2024, should use 10-day rule + datetime(2024, 3, 15, tzinfo=timezone.utc), + 11, + False, + ), + ( + # Issued on March 15, 2024, should use 10-day rule + datetime(2024, 3, 15), + 10, + True, + ), + ( + # Issued on March 15, 2024, should use 10-day rule + datetime(2024, 3, 15), + 11, + False, + ), + ( + # Issued on March 15, 2026, should use 7-day rule + datetime(2026, 3, 15, tzinfo=timezone.utc), + 7, + True, + ), + ( + # Issued on March 15, 2026, should use 7-day rule + datetime(2026, 3, 15, tzinfo=timezone.utc), + 8, + False, + ), + ( + # Issued on March 15, 2026, should use 7-day rule + datetime(2026, 3, 15), + 7, + True, + ), + ( + # Issued on March 15, 2026, should use 7-day rule + datetime(2026, 3, 15), + 8, + False, + ), + ], +) +def test_is_short_lived_certificate(cert_gen, issue_date, validity_days, expected): + cert = cert_gen.create_short_lived_certificate(validity_days, issue_date) + assert CRLValidator._is_short_lived_certificate(cert) == expected + + +def test_validate_certificate_signatures(cert_gen, session_manager): + """Test that certificate validation fails with ERROR when signed by wrong key""" + # Create a certificate signed by the test CA + valid_cert = cert_gen.create_certificate_with_crl_distribution_points( + "CN=Test Server", [] + ) + + # Create a different CA key pair + different_ca_key = rsa.generate_private_key( + public_exponent=65537, key_size=2048, backend=default_backend() + ) + different_cert = ( + x509.CertificateBuilder() + .subject_name(valid_cert.subject) + .issuer_name(cert_gen.ca_certificate.subject) + .public_key(cert_gen.ca_private_key.public_key()) + .serial_number(x509.random_serial_number()) + .not_valid_before(datetime.now(timezone.utc)) + .not_valid_after(datetime.now(timezone.utc) + timedelta(days=365)) + .add_extension( + x509.BasicConstraints(ca=True, path_length=None), + critical=True, + ) + .sign(different_ca_key, hashes.SHA256(), backend=default_backend()) + ) + short_lived_different_cert = ( + x509.CertificateBuilder() + .subject_name(valid_cert.subject) + .issuer_name(cert_gen.ca_certificate.subject) + .public_key(different_ca_key.public_key()) + .serial_number(x509.random_serial_number()) + .not_valid_before(datetime.now(timezone.utc)) + .not_valid_after(datetime.now(timezone.utc) + timedelta(days=3)) + .add_extension( + x509.BasicConstraints(ca=True, path_length=None), + critical=True, + ) + .sign(different_ca_key, hashes.SHA256(), backend=default_backend()) + ) + + validator = CRLValidator( + session_manager, + cert_revocation_check_mode=CertRevocationCheckMode.ENABLED, + allow_certificates_without_crl_url=True, + trusted_certificates=[cert_gen.ca_certificate], + ) + + # wrong signature - no path found = ERROR + assert ( + validator._validate_single_chain([different_cert]) == CRLValidationResult.ERROR + ) + # wrong signature - short-lived - no path found = ERROR + assert ( + validator._validate_single_chain([short_lived_different_cert]) + == CRLValidationResult.ERROR + ) + # wrong signature does not stop from searching of new path + assert ( + validator._validate_single_chain( + [different_cert, short_lived_different_cert, valid_cert] + ) + == CRLValidationResult.UNREVOKED + ) + + +def test_validate_certificate_signatures_in_chain(cert_gen, session_manager): + """Test that certificate validation fails with ERROR when signed by wrong key""" + # Create a certificate chain signed by the test CA: leaf -> A -> B -> CA + # mingle with A -> B + chain = cert_gen.create_cross_signed_chain() + + valid_cert = chain.BsignA + + # Create a different CA key pair + different_key = rsa.generate_private_key( + public_exponent=65537, key_size=2048, backend=default_backend() + ) + different_cert = ( + x509.CertificateBuilder() + .subject_name(valid_cert.subject) + .issuer_name(cert_gen.ca_certificate.subject) + .public_key(cert_gen.ca_private_key.public_key()) + .serial_number(x509.random_serial_number()) + .not_valid_before(datetime.now(timezone.utc)) + .not_valid_after(datetime.now(timezone.utc) + timedelta(days=365)) + .add_extension( + x509.BasicConstraints(ca=True, path_length=None), + critical=True, + ) + .sign(different_key, hashes.SHA256(), backend=default_backend()) + ) + short_lived_different_cert = ( + x509.CertificateBuilder() + .subject_name(valid_cert.subject) + .issuer_name(cert_gen.ca_certificate.subject) + .public_key(different_key.public_key()) + .serial_number(x509.random_serial_number()) + .not_valid_before(datetime.now(timezone.utc)) + .not_valid_after(datetime.now(timezone.utc) + timedelta(days=3)) + .add_extension( + x509.BasicConstraints(ca=True, path_length=None), + critical=True, + ) + .sign(different_key, hashes.SHA256(), backend=default_backend()) + ) + + validator = CRLValidator( + session_manager, + allow_certificates_without_crl_url=True, + cert_revocation_check_mode=CertRevocationCheckMode.ENABLED, + trusted_certificates=[cert_gen.ca_certificate], + ) + + # wrong signature - no path found = ERROR + assert ( + validator._validate_single_chain([chain.leafA, different_cert, chain.rootB]) + == CRLValidationResult.ERROR + ) + # wrong signature - short-lived - no path found = ERROR + assert ( + validator._validate_single_chain( + [chain.leafA, short_lived_different_cert, chain.rootB] + ) + == CRLValidationResult.ERROR + ) + # wrong signature does not stop from searching of new path + assert ( + validator._validate_single_chain( + [ + chain.leafA, + different_cert, + short_lived_different_cert, + valid_cert, + chain.rootB, + ] + ) + == CRLValidationResult.UNREVOKED + ) + + +def test_trusted_certificates_helpers(cert_gen): + chain = cert_gen.create_simple_chain() + + validator = CRLValidator( + session_manager=Mock(), trusted_certificates=[chain.root_cert] + ) + + assert validator._is_certificate_trusted_by_os(chain.root_cert) is True + assert validator._is_certificate_trusted_by_os(chain.intermediate_cert) is False + + assert validator._get_trusted_ca_issuer(chain.intermediate_cert) is chain.root_cert + assert validator._get_trusted_ca_issuer(chain.leaf_cert) is None