Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 165 additions & 34 deletions src/snowflake/connector/crl.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#!/usr/bin/env python
from __future__ import annotations

from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from enum import Enum, unique
Expand All @@ -11,6 +12,7 @@
from cryptography import x509
from cryptography.hazmat._oid import ExtensionOID
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ec, padding, rsa
from OpenSSL.SSL import Connection as SSLConnection

Expand Down Expand Up @@ -53,15 +55,15 @@ class CRLConfig:
CertRevocationCheckMode.DISABLED
)
allow_certificates_without_crl_url: bool = False
connection_timeout_ms: int = 3000
read_timeout_ms: int = 3000
connection_timeout_ms: int = 5000
read_timeout_ms: int = 5000 # 5s
cache_validity_time: timedelta = timedelta(hours=24)
enable_crl_cache: bool = True
enable_crl_file_cache: bool = True
crl_cache_dir: Path | str | None = None
crl_cache_removal_delay_days: int = 7
crl_cache_cleanup_interval_hours: int = 1
crl_cache_start_cleanup: bool = False
crl_cache_start_cleanup: bool = True

@classmethod
def from_connection(cls, sf_connection) -> CRLConfig:
Expand Down Expand Up @@ -176,6 +178,7 @@ class CRLValidator:
def __init__(
self,
session_manager: SessionManager | Any,
trusted_certificates: list[x509.Certificate],
cert_revocation_check_mode: CertRevocationCheckMode = CRLConfig.cert_revocation_check_mode,
allow_certificates_without_crl_url: bool = CRLConfig.allow_certificates_without_crl_url,
connection_timeout_ms: int = CRLConfig.connection_timeout_ms,
Expand All @@ -191,9 +194,22 @@ def __init__(
self._cache_validity_time = cache_validity_time
self._cache_manager = cache_manager or CRLCacheManager.noop()

# list of trusted CA and their certificates
self._trusted_ca: dict[x509.Name, list[x509.Certificate]] = defaultdict(list)
for cert in trusted_certificates:
self._trusted_ca[cert.subject].append(cert)

# declaration of validate_certificate_is_not_revoked function cache
self._cache_for__validate_certificate_is_not_revoked: dict[
x509.Certificate, CRLValidationResult
] = {}

@classmethod
def from_config(
cls, config: CRLConfig, session_manager: SessionManager
cls,
config: CRLConfig,
session_manager: SessionManager,
trusted_certificates: list[x509.Certificate],
) -> CRLValidator:
"""
Create a CRLValidator instance from a CRLConfig.
Expand All @@ -204,6 +220,7 @@ def from_config(
Args:
config: CRLConfig instance containing CRL-related parameters
session_manager: SessionManager instance
trusted_certificates: List of trusted CA certificates

Returns:
CRLValidator: Configured CRLValidator instance
Expand Down Expand Up @@ -244,6 +261,7 @@ def from_config(

return cls(
session_manager=session_manager,
trusted_certificates=trusted_certificates,
cert_revocation_check_mode=config.cert_revocation_check_mode,
allow_certificates_without_crl_url=config.allow_certificates_without_crl_url,
connection_timeout_ms=config.connection_timeout_ms,
Expand Down Expand Up @@ -272,9 +290,7 @@ def validate_certificate_chains(

if certificate_chains is None or len(certificate_chains) == 0:
logger.warning("Certificate chains are empty")
if self._cert_revocation_check_mode == CertRevocationCheckMode.ADVISORY:
return True
return False
return self._cert_revocation_check_mode == CertRevocationCheckMode.ADVISORY

results = []
for chain in certificate_chains:
Expand All @@ -294,24 +310,133 @@ def validate_certificate_chains(
def _validate_single_chain(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is _validate_certificate_list more accurate? (Many chains are being checked. certificate_list is the name of the field in the TLS 1.3 RFC.)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's called "chain" all over the code base (including openSSL and other drivers) - I'd leave it like this.

self, chain: list[x509.Certificate]
) -> CRLValidationResult:
"""Validate a single certificate chain"""
"""
Returns:
UNREVOKED: If there is a path to any trusted certificate where all certificates are unrevoked.
REVOKED: If all paths to trusted certificates are revoked.
ERROR: If there is a path to any trusted certificate on which none certificate is revoked,
but some certificates can't be verified.
"""
# An empty chain is considered an error
if len(chain) == 0:
return CRLValidationResult.ERROR
# the last certificate of the chain is considered the root and isn't validated
results = []
for i in range(len(chain) - 1):
result = self._validate_certificate(chain[i], chain[i + 1])
if result == CRLValidationResult.REVOKED:
return CRLValidationResult.REVOKED
results.append(result)

if CRLValidationResult.ERROR in results:
subject_certificates: dict[x509.Name, list[x509.Certificate]] = defaultdict(
list
)
for cert in chain:
Copy link
Contributor

@sfc-gh-jkasten sfc-gh-jkasten Oct 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Perhaps add a quick check on the chain size? It would be a weird attack, but I get a bit more concerned as we look towards PQ.

subject_certificates[cert.subject].append(cert)
currently_visited_subjects: set[x509.Name] = set()

def traverse_chain(cert: x509.Certificate) -> CRLValidationResult | None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry - another revocation bypass. We need to fully validate the certificate. In this additional case, the expiration time of the certificate is not being checked.

  1. Revoked target.com
  2. Legit intermediate signing target.com that chains to root
  3. Expired target.com certificate (can be easily retrieved via crt.sh, etc.)
  4. Expired target.com cert's legit intermediate that chains to root.

1 + 2 satisfies normal certificate validation. 3 + 4 satisfies the revocation check (It is extremely likely the CRL endpoint is still in use)
Sorry for not calling out this additional detail last night.

# UNREVOKED - unrevoked path to a trusted certificate found
# REVOKED - all paths are revoked
# ERROR - some certificates on potentially unrevoked paths can't be verified, or no path to a trusted CA is detected
# None - ignore this path (cycle detected)
if self._is_certificate_trusted_by_os(cert):
logger.debug("Found trusted certificate: %s", cert.subject)
return CRLValidationResult.UNREVOKED

if trusted_ca_issuer := self._get_trusted_ca_issuer(cert):
logger.debug("Certificate signed by trusted CA: %s", cert.subject)
return self._validate_certificate_is_not_revoked_with_cache(
cert, trusted_ca_issuer
)

if cert.issuer in currently_visited_subjects:
# cycle detected - invalid path
return None

valid_results: list[tuple[CRLValidationResult, x509.Certificate]] = []
for ca_cert in subject_certificates[cert.issuer]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to make sure that the "issuer" is an actual issuer (basicConstraints == true).

Consider an attacker sending

  1. Revoked example.com
  2. Legit intermediate
  3. example.com signed by a leaf certificate jdkasten.com
  4. jdkasten.com
  5. jdkasten.com's intermediate.

All of the signatures would work out. L378 means that revocation would also work. We need to verify that all CAs are actually CAs. (pathLen is probably not necessary to check)

This attack is related to the lack of signature attack. I hope there is a library that will allow you to check both the signature and the CA properties with the same code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @snowflakedb/pki-oversight

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed :)

Copy link
Contributor

@sfc-gh-jkasten sfc-gh-jkasten Oct 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am so sorry. I think the function you used does not check that the CA certificate is an actual CA certificate?

Here is the documentation for the function.
https://cryptography.io/en/42.0.2/x509/reference/#cryptography.x509.Certificate.verify_directly_issued_by

Here is the code.
https://github.com/pyca/cryptography/blob/fe8438c62eb470c976114b75581b8c7ea3e9e854/src/rust/src/x509/certificate.rs#L321

Let me know if I completely missed the control. (If so, sorry)

if not self._verify_certificate_signature(cert, ca_cert):
logger.debug(
"Certificate signature verification failed for %s, looking for other paths",
cert,
)
continue

currently_visited_subjects.add(cert.issuer)
ca_result = traverse_chain(ca_cert)
currently_visited_subjects.remove(cert.issuer)
if ca_result is None:
# ignore invalid path result
continue
if ca_result == CRLValidationResult.UNREVOKED:
# good path found
return self._validate_certificate_is_not_revoked_with_cache(
cert, ca_cert
)
valid_results.append((ca_result, ca_cert))

if len(valid_results) == 0:
# "root" certificate not cought by "is_trusted_by_os" check
logger.debug("No path towards trusted anchor: %s", cert.subject)
return CRLValidationResult.ERROR

# check if there exists an ERROR path
for ca_result, ca_cert in valid_results:
if ca_result == CRLValidationResult.ERROR:
cert_result = self._validate_certificate_is_not_revoked_with_cache(
cert, ca_cert
)
if cert_result == CRLValidationResult.REVOKED:
return CRLValidationResult.REVOKED
return CRLValidationResult.ERROR

# no ERROR result found, all paths are REVOKED
return CRLValidationResult.REVOKED

currently_visited_subjects.add(chain[0].subject)
error_result = False
revoked_result = False
for cert in subject_certificates[chain[0].subject]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding is that this is allowing for multiple leaf certificates in the chain. (This is outside the spec and fine in theory.)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's right - just to be safe

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After thinking about this more, I think you would need to make sure that the public key contained within each of the leaf certificates is equivalent at minimum.

What we are missing is the ability to tie the leaf certificate to the TLS handshake.

In essence consider the attack...

  1. Revoked target.com
  2. Legit intermediate signing target.com that chains to root
  3. New target.com certificate you pulled from CT/crt.sh - it is completely valid, but you don't have the key.
  4. New target.com's intermediate

(1 + 2, get past the normal certificate verification. 3+4 bypass revocation checks)

In general, this is outside of the TLS spec and would require that TLS servers get multiple certificates from trusted CAs. I suspect that this capability is not worth the additional trouble. It contributes to the other attacks I listed, but its removal does not prevent them.

result = traverse_chain(cert)
if result == CRLValidationResult.UNREVOKED:
return result
error_result |= result == CRLValidationResult.ERROR
revoked_result |= result == CRLValidationResult.REVOKED

if error_result or not revoked_result:
return CRLValidationResult.ERROR
return CRLValidationResult.REVOKED

return CRLValidationResult.UNREVOKED
def _is_certificate_trusted_by_os(self, cert: x509.Certificate) -> bool:
if cert.subject not in self._trusted_ca:
return False

cert_der = cert.public_bytes(serialization.Encoding.DER)
return any(
cert_der == trusted_cert.public_bytes(serialization.Encoding.DER)
for trusted_cert in self._trusted_ca[cert.subject]
)

def _validate_certificate(
def _get_trusted_ca_issuer(self, cert: x509.Certificate) -> x509.Certificate | None:
for trusted_cert in self._trusted_ca[cert.issuer]:
if self._verify_certificate_signature(cert, trusted_cert):
return trusted_cert
return None

def _verify_certificate_signature(
self, cert: x509.Certificate, ca_cert: x509.Certificate
) -> bool:
try:
cert.verify_directly_issued_by(ca_cert)
return True
except Exception:
return False

def _validate_certificate_is_not_revoked_with_cache(
self, cert: x509.Certificate, ca_cert: x509.Certificate
) -> CRLValidationResult:
# validate certificate can be called multiple times with the same certificate
if cert not in self._cache_for__validate_certificate_is_not_revoked:
self._cache_for__validate_certificate_is_not_revoked[cert] = (
self._validate_certificate_is_not_revoked(cert, ca_cert)
)
return self._cache_for__validate_certificate_is_not_revoked[cert]

def _validate_certificate_is_not_revoked(
self, cert: x509.Certificate, ca_cert: x509.Certificate
) -> CRLValidationResult:
"""Validate a single certificate against CRL"""
Expand Down Expand Up @@ -343,14 +468,29 @@ def _validate_certificate(

@staticmethod
def _is_short_lived_certificate(cert: x509.Certificate) -> bool:
"""Check if certificate is short-lived (validity <= 5 days)"""
"""Check if certificate is short-lived according to CA/Browser Forum definition:
- For certificates issued on or after 15 March 2024 and prior to 15 March 2026:
validity period <= 10 days (864,000 seconds)
- For certificates issued on or after 15 March 2026:
validity period <= 7 days (604,800 seconds)
"""
try:
# Use timezone.utc versions to avoid deprecation warnings
issue_date = cert.not_valid_before_utc
validity_period = cert.not_valid_after_utc - cert.not_valid_before_utc
except AttributeError:
# Fallback for older versions
issue_date = cert.not_valid_before
validity_period = cert.not_valid_after - cert.not_valid_before
Copy link
Contributor

@sfc-gh-jkasten sfc-gh-jkasten Oct 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: The not_valid_after field is inclusive. This will calculate that the lifetime is 1 second shorter than it actually is. I think this falls on the safe side of the equation down below, (it will treat certificates as short-lived which are technically not).

RFC 5280 Section 4.1.2.5 text

The other thing you have to worry here is an underlying library trying to include leap seconds on you.
It is common in the WebPKI to give a little buffer and to stay away from the edges[design doc]. Competent CAs will always give a lot of breathing room away from the edges.

Edit: There is also some complexity around the definition of "issued_at". Most certificates are actually backdated and so browsers use the earliest SCT contained within the certificate for their definition. We should ignore that complexity though and maybe just give a bit more breathing room.

return validity_period.days <= 5

# Convert issue_date to UTC if it's not timezone-aware
if issue_date.tzinfo is None:
issue_date = issue_date.replace(tzinfo=timezone.utc)

march_15_2026 = datetime(2026, 3, 15, tzinfo=timezone.utc)
if issue_date >= march_15_2026:
return validity_period.total_seconds() <= 604800 # 7 days in seconds
return validity_period.total_seconds() <= 864000 # 10 days in seconds

@staticmethod
def _extract_crl_distribution_points(cert: x509.Certificate) -> list[str]:
Expand Down Expand Up @@ -446,7 +586,7 @@ def _check_certificate_against_crl_url(
ca_cert.subject,
crl_url,
)
# In most cases this indicates a configuration issue, but we'll still try verification
return CRLValidationResult.ERROR

if not self._verify_crl_signature(crl, ca_cert):
logger.warning("CRL signature verification failed for URL: %s", crl_url)
Expand Down Expand Up @@ -545,25 +685,16 @@ def _extract_certificate_chains_from_connection(
Returns:
List of certificate chains, where each chain is a list of x509.Certificate objects
"""
from OpenSSL.crypto import FILETYPE_ASN1, dump_certificate

try:
cert_chain = connection.get_peer_cert_chain()
# Convert OpenSSL certificates to cryptography x509 certificates
cert_chain = connection.get_peer_cert_chain(as_cryptography=True)
if not cert_chain:
logger.debug("No certificate chain found in connection")
return []

# Convert OpenSSL certificates to cryptography x509 certificates
x509_chain = []
for cert_openssl in cert_chain:
cert_der = dump_certificate(FILETYPE_ASN1, cert_openssl)
cert_x509 = x509.load_der_x509_certificate(cert_der, default_backend())
x509_chain.append(cert_x509)

logger.debug(
"Extracted %d certificates for CRL validation", len(x509_chain)
"Extracted %d certificates for CRL validation", len(cert_chain)
)
return [x509_chain] # Return as a single chain
return [cert_chain] # Return as a single chain

except Exception as e:
logger.warning(
Expand Down
24 changes: 20 additions & 4 deletions src/snowflake/connector/ssl_wrap_socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from functools import wraps
from inspect import signature as _sig
from socket import socket
from typing import Any
from typing import TYPE_CHECKING, Any

import certifi
import OpenSSL.SSL
Expand All @@ -30,6 +30,9 @@
from .vendored.urllib3.contrib.pyopenssl import PyOpenSSLContext, WrappedSocket
from .vendored.urllib3.util import ssl_ as ssl_

if TYPE_CHECKING:
from cryptography import x509

DEFAULT_OCSP_MODE: OCSPMode = OCSPMode.FAIL_OPEN
FEATURE_OCSP_MODE: OCSPMode = DEFAULT_OCSP_MODE
FEATURE_ROOT_CERTS_DICT_LOCK_TIMEOUT: int = (
Expand Down Expand Up @@ -150,6 +153,17 @@ def inject_into_urllib3() -> None:
connection_.ssl_wrap_socket = ssl_wrap_socket_with_cert_revocation_checks


def _load_trusted_certificates(cafile: str | None) -> list[x509.Certificate]:
# Use default SSL context to load the CA file and get the certificates
ctx = ssl.create_default_context()
ctx.load_verify_locations(cafile=cafile)
certs = ctx.get_ca_certs(binary_form=True)
from cryptography.hazmat.backends import default_backend
from cryptography.x509 import load_der_x509_certificate

return [load_der_x509_certificate(cert, default_backend()) for cert in certs]


@wraps(ssl_.ssl_wrap_socket)
def ssl_wrap_socket_with_cert_revocation_checks(
*args: Any, **kwargs: Any
Expand All @@ -166,12 +180,12 @@ def ssl_wrap_socket_with_cert_revocation_checks(

# Ensure PyOpenSSL context with partial-chain is used if none or wrong type provided
provided_ctx = params.get("ssl_context")
cafile_for_ctx = _resolve_cafile(params)
if not isinstance(provided_ctx, PyOpenSSLContext):
cafile_for_ctx = _resolve_cafile(params)
params["ssl_context"] = _build_context_with_partial_chain(cafile_for_ctx)
else:
# If a PyOpenSSLContext is provided, ensure it trusts the provided CA and partial-chain is enabled
_ensure_partial_chain_on_context(provided_ctx, _resolve_cafile(params))
_ensure_partial_chain_on_context(provided_ctx, cafile_for_ctx)

ret = ssl_.ssl_wrap_socket(**params)

Expand All @@ -184,7 +198,9 @@ def ssl_wrap_socket_with_cert_revocation_checks(
!= CertRevocationCheckMode.DISABLED
):
crl_validator = CRLValidator.from_config(
FEATURE_CRL_CONFIG, get_current_session_manager()
FEATURE_CRL_CONFIG,
get_current_session_manager(),
trusted_certificates=_load_trusted_certificates(cafile_for_ctx),
)
if not crl_validator.validate_connection(ret.connection):
raise OperationalError(
Expand Down
10 changes: 10 additions & 0 deletions test/integ/test_crl.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,18 @@
from __future__ import annotations

import tempfile
import warnings

import pytest
from cryptography.utils import CryptographyDeprecationWarning


@pytest.fixture(autouse=True)
def _ignore_deprecation_warnings():
"""Fixture to handle deprecation warnings in all tests in this module."""
with warnings.catch_warnings():
warnings.simplefilter("ignore", CryptographyDeprecationWarning)
yield


@pytest.mark.skipolddriver
Expand Down
Loading
Loading