diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 4d6f98597..0562c7a6f 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -6,6 +6,14 @@ dcicutils Change Log ---------- +8.18.5 +====== +* wrr / 2026-07-01 / branch: fm/sec-fix-util1 / PR-332 + - Fixed Zip Slip/Tar Slip path traversal in zip_utils.py archive extraction. + - Redacted credentials from AuthenticationError exception messages in ff_utils.py. + - Expanded credential-redaction regex in obfuscation_utils.py to cover token/api_key/authorization/bearer/jwt/private_key. + + 8.18.4 ====== * ajs / 2025-09-30 / branch: ajs_upd_es_metadata_fxns_250925 / PR-330 diff --git a/dcicutils/ff_utils.py b/dcicutils/ff_utils.py index 5fc077ab5..3967d821b 100644 --- a/dcicutils/ff_utils.py +++ b/dcicutils/ff_utils.py @@ -1338,10 +1338,28 @@ class UnifiedAuthenticator: class AuthenticationError(Exception): + @staticmethod + def _redact_auth(auth: Optional[AnyAuthData]) -> Optional[AnyAuthData]: + # auth is often the caller's actual (key, secret) credential; it must never be interpolated + # verbatim into an exception message, since that message tends to end up in logs + # (CloudWatch, Sentry, Foursight check output, bare `print(exc)`/`logger.exception`, etc.) + # Note this redacts ALL dict values (not just ones matching an "is this sensitive" key-name + # heuristic like obfuscate_json uses elsewhere), since the whole auth argument is, by + # definition, credential material -- including whichever value the caller put under an + # unexpected/misspelled key, which is exactly the malformed-auth case that reaches here. + if isinstance(auth, dict): + return {key: "" for key in auth} + elif isinstance(auth, (tuple, list)) and len(auth) == 2: + return type(auth)(("", "")) + elif auth is None: + return None + else: + return f"" + def __init__(self, message: str, auth: Optional[AnyAuthData], ff_env: Optional[PortalEnvName]): self.auth = auth self.ff_env = ff_env - super().__init__(f"{message} You gave auth={auth}, ff_env={ff_env}") + super().__init__(f"{message} You gave auth={self._redact_auth(auth)}, ff_env={ff_env}") @classmethod def unified_authentication(cls, auth: Optional[AnyAuthDict] = None, diff --git a/dcicutils/obfuscation_utils.py b/dcicutils/obfuscation_utils.py index b51491137..e026aacf1 100644 --- a/dcicutils/obfuscation_utils.py +++ b/dcicutils/obfuscation_utils.py @@ -10,16 +10,30 @@ # The _SENSITIVE_KEY_NAMES_REGEX regex defines key names representing sensitive values, case-insensitive. # Note the below 'crypt(?!_key_id$)' regex matches any thing with 'crypt' except for 'crypt_key_id'. +# +# NOTE: token/authoriz(e|ation)/api(_)?key/access(_)?key/bearer/jwt/private(_)?key were added because, +# without them, this (and callers like trace_utils.Trace's TRACE_REDACT logic, which exists specifically +# to keep credentials out of logs) would silently fail to redact common credential shapes such as an +# "Authorization: Bearer " header, an "api_key", or an "access_token" -- letting live secrets +# leak into logs/traces even though the obfuscation machinery is in place and believed to be protecting +# against exactly that. _SENSITIVE_KEY_NAMES_REGEX = re.compile( r""" .*( - password | - passwd | - secret | - secrt | - scret | - session.*token | - session.*id | + password | + passwd | + secret | + secrt | + scret | + session.*token | + session.*id | + token | + authoriz(e|ation) | + api.?key | + access.?key | + bearer | + jwt | + private.?key | crypt(?!_key_id$) ).* """, re.VERBOSE | re.IGNORECASE) @@ -28,7 +42,8 @@ def should_obfuscate(key: str, value: Any = None) -> bool: """ Returns True if the given key looks as if it represents a sensitive value. - Just sees if it contains "secret" or "password" or "crypt" some obvious variants, + Just sees if it contains "secret", "password", "crypt", "token", "authorization", + "api_key", "access_key", "bearer", "jwt", "private_key", or some obvious variants, case-insensitive; i.e. whatever is in the _SENSITIVE_KEY_NAMES_REGEX list containing regular expressions; add more to if/when needed. diff --git a/dcicutils/zip_utils.py b/dcicutils/zip_utils.py index 93c5cfef7..8472d1be7 100644 --- a/dcicutils/zip_utils.py +++ b/dcicutils/zip_utils.py @@ -9,10 +9,22 @@ import zipfile +def _assert_within_directory(member_path: str, target_directory: str) -> None: + # Guards against "zip slip"/"tar slip" path traversal, where a malicious archive + # entry (e.g. named "../../etc/cron.d/evil" or with an absolute path) would + # otherwise be extracted outside of the intended target directory. + target_directory = os.path.realpath(target_directory) + resolved_path = os.path.realpath(os.path.join(target_directory, member_path)) + if os.path.commonpath([target_directory, resolved_path]) != target_directory: + raise ValueError(f"Refusing to extract archive member outside of target directory: {member_path}") + + @contextmanager def unpack_zip_file_to_temporary_directory(file: str) -> str: with temporary_directory() as tmp_directory_name: with zipfile.ZipFile(file, "r") as zipf: + for member in zipf.namelist(): + _assert_within_directory(member, tmp_directory_name) zipf.extractall(tmp_directory_name) yield tmp_directory_name @@ -21,7 +33,11 @@ def unpack_zip_file_to_temporary_directory(file: str) -> str: def unpack_tar_file_to_temporary_directory(file: str) -> str: with temporary_directory() as tmp_directory_name: with tarfile.open(file, "r") as tarf: - tarf.extractall(tmp_directory_name) + for member in tarf.getmembers(): + _assert_within_directory(member.name, tmp_directory_name) + # The "data" filter (PEP 706) additionally rejects unsafe member types/permissions + # (e.g. device files, symlinks escaping the target, setuid bits) on extraction. + tarf.extractall(tmp_directory_name, filter="data") yield tmp_directory_name diff --git a/pyproject.toml b/pyproject.toml index 7e62db89c..fe9c3528b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "dcicutils" -version = "8.18.4" +version = "8.18.5" description = "Utility package for interacting with the 4DN Data Portal and other 4DN resources" authors = ["4DN-DCIC Team "] license = "MIT" diff --git a/test/test_ff_utils.py b/test/test_ff_utils.py index b221ae693..3955abf9d 100644 --- a/test/test_ff_utils.py +++ b/test/test_ff_utils.py @@ -348,6 +348,28 @@ def test_unified_authentication_unit(): ff_utils.unified_authentication(None, None) +def test_unified_authenticator_authentication_error_redacts_auth_in_message(): + # A malformed-but-truthy auth (e.g. a misspelled 'key' field) reaches AuthenticationError with + # the caller's actual credential material. That credential must never appear in the exception's + # message text (which tends to end up in logs/Sentry/Foursight check output), even though the + # original value must still be available programmatically via the .auth attribute. + ts = TestScenarios + bad_auth = ts.some_badly_formed_auth_dict + assert ts.some_auth_key in bad_auth.values() and ts.some_auth_secret in bad_auth.values() + + with mock.patch.object(s3_utils, "s3Utils") as MockS3Utils: + MockS3Utils.side_effect = AssertionError("s3Utils() should not be used for this locally-checkable auth.") + try: + ff_utils.unified_authentication(bad_auth, ts.foo_env) + raise AssertionError("Expected an AuthenticationError to be raised.") + except ff_utils.UnifiedAuthenticator.AuthenticationError as exc: + message = str(exc) + assert ts.some_auth_key not in message + assert ts.some_auth_secret not in message + # The original (unredacted) auth remains available on the exception for legitimate use. + assert exc.auth == bad_auth + + # Integration tests @pytest.mark.integratedx diff --git a/test/test_obfuscation_utils.py b/test/test_obfuscation_utils.py index 58477d364..7b2a3e56b 100644 --- a/test/test_obfuscation_utils.py +++ b/test/test_obfuscation_utils.py @@ -25,6 +25,20 @@ def test_should_obfuscate() -> None: assert_should_obfuscate_with_different_cases("crypt_key_id", False) assert_should_obfuscate_with_different_cases("not_a_s3cret_foo", False) + # Common HTTP auth credential key shapes that must be redacted (previously silently NOT matched). + assert_should_obfuscate_with_different_cases("token", True) + assert_should_obfuscate_with_different_cases("access_token", True) + assert_should_obfuscate_with_different_cases("authorization", True) + assert_should_obfuscate_with_different_cases("Authorization", True) + assert_should_obfuscate_with_different_cases("api_key", True) + assert_should_obfuscate_with_different_cases("apikey", True) + assert_should_obfuscate_with_different_cases("access_key", True) + assert_should_obfuscate_with_different_cases("accesskey", True) + assert_should_obfuscate_with_different_cases("bearer", True) + assert_should_obfuscate_with_different_cases("jwt", True) + assert_should_obfuscate_with_different_cases("private_key", True) + assert_should_obfuscate_with_different_cases("privatekey", True) + # Edge cases that are really soft errors... assert should_obfuscate(None) is False # NoQA - Argument is not intended, but function returns False assert should_obfuscate(17) is False # NoQA - ditto @@ -172,3 +186,25 @@ def test_obfuscate_json_with_tuple(): x = obfuscate_json(d, obfuscated="") assert x == o assert d == d_copy + + +def test_obfuscate_json_with_http_auth_credential_shapes(): + # End-to-end: a realistic log/trace payload carrying common HTTP auth credential shapes + # (as would flow through trace_utils.Trace's TRACE_REDACT) must have all of them scrubbed. + d = { + "url": "https://example.com/api", + "headers": {"Authorization": "Bearer abc123.def456.ghi789", "Content-Type": "application/json"}, + "api_key": "live_sk_1234567890", + "access_token": "ya29.live-access-token-value", + "private_key": "-----BEGIN PRIVATE KEY-----abc-----END PRIVATE KEY-----", + } + d_copy = copy.deepcopy(d) + x = obfuscate_json(d, obfuscated="") + assert x == { + "url": "https://example.com/api", + "headers": {"Authorization": "", "Content-Type": "application/json"}, + "api_key": "", + "access_token": "", + "private_key": "", + } + assert d == d_copy # original not mutated diff --git a/test/test_zip_utils.py b/test/test_zip_utils.py new file mode 100644 index 000000000..764e045d1 --- /dev/null +++ b/test/test_zip_utils.py @@ -0,0 +1,80 @@ +import io +import os +import tarfile +import zipfile + +import pytest + +from dcicutils.tmpfile_utils import temporary_directory +from dcicutils.zip_utils import ( + unpack_zip_file_to_temporary_directory, unpack_tar_file_to_temporary_directory, +) + + +def _make_zip_with_member(zip_path: str, member_name: str, content: bytes = b"data") -> None: + with zipfile.ZipFile(zip_path, "w") as zf: + zf.writestr(member_name, content) + + +def _make_tar_with_member(tar_path: str, member_name: str, content: bytes = b"data") -> None: + with tarfile.open(tar_path, "w") as tf: + info = tarfile.TarInfo(name=member_name) + info.size = len(content) + tf.addfile(info, io.BytesIO(content)) + + +def test_unpack_zip_file_to_temporary_directory_rejects_path_traversal(): + # Zip Slip: a malicious archive entry named with ../ segments must not be extracted + # outside of the target directory, even though zipfile.extractall would otherwise allow it. + with temporary_directory() as work_dir: + zip_path = os.path.join(work_dir, "evil.zip") + _make_zip_with_member(zip_path, "../../../tmp/dcicutils_zip_slip_poc.txt", b"pwned") + with pytest.raises(ValueError): + with unpack_zip_file_to_temporary_directory(zip_path): + pass + # The malicious path must never have landed on disk anywhere. + assert not os.path.exists("/tmp/dcicutils_zip_slip_poc.txt") + + +def test_unpack_zip_file_to_temporary_directory_rejects_absolute_path(): + with temporary_directory() as work_dir: + zip_path = os.path.join(work_dir, "evil_abs.zip") + _make_zip_with_member(zip_path, "/tmp/dcicutils_zip_slip_abs_poc.txt", b"pwned") + with pytest.raises(ValueError): + with unpack_zip_file_to_temporary_directory(zip_path): + pass + assert not os.path.exists("/tmp/dcicutils_zip_slip_abs_poc.txt") + + +def test_unpack_zip_file_to_temporary_directory_extracts_benign_archive(): + # Legitimate archives must still work normally after the guard is added. + with temporary_directory() as work_dir: + zip_path = os.path.join(work_dir, "benign.zip") + _make_zip_with_member(zip_path, "subdir/hello.txt", b"hello world") + with unpack_zip_file_to_temporary_directory(zip_path) as tmp_dir: + extracted_path = os.path.join(tmp_dir, "subdir", "hello.txt") + assert os.path.exists(extracted_path) + with open(extracted_path, "rb") as f: + assert f.read() == b"hello world" + + +def test_unpack_tar_file_to_temporary_directory_rejects_path_traversal(): + # Tar Slip: same attack, via tarfile.extractall. + with temporary_directory() as work_dir: + tar_path = os.path.join(work_dir, "evil.tar") + _make_tar_with_member(tar_path, "../../../tmp/dcicutils_tar_slip_poc.txt", b"pwned") + with pytest.raises(ValueError): + with unpack_tar_file_to_temporary_directory(tar_path): + pass + assert not os.path.exists("/tmp/dcicutils_tar_slip_poc.txt") + + +def test_unpack_tar_file_to_temporary_directory_extracts_benign_archive(): + with temporary_directory() as work_dir: + tar_path = os.path.join(work_dir, "benign.tar") + _make_tar_with_member(tar_path, "subdir/hello.txt", b"hello world") + with unpack_tar_file_to_temporary_directory(tar_path) as tmp_dir: + extracted_path = os.path.join(tmp_dir, "subdir", "hello.txt") + assert os.path.exists(extracted_path) + with open(extracted_path, "rb") as f: + assert f.read() == b"hello world"