Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,14 @@ dcicutils
Change Log
----------

8.18.5
======
* wrr / 2026-07-01 / branch: fm/sec-fix-util1 / PR-332
- Fixed Zip Slip/Tar Slip path traversal in zip_utils.py archive extraction.
- Redacted credentials from AuthenticationError exception messages in ff_utils.py.
- Expanded credential-redaction regex in obfuscation_utils.py to cover token/api_key/authorization/bearer/jwt/private_key.


8.18.4
======
* ajs / 2025-09-30 / branch: ajs_upd_es_metadata_fxns_250925 / PR-330
Expand Down
20 changes: 19 additions & 1 deletion dcicutils/ff_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1338,10 +1338,28 @@ class UnifiedAuthenticator:

class AuthenticationError(Exception):

@staticmethod
def _redact_auth(auth: Optional[AnyAuthData]) -> Optional[AnyAuthData]:
# auth is often the caller's actual (key, secret) credential; it must never be interpolated
# verbatim into an exception message, since that message tends to end up in logs
# (CloudWatch, Sentry, Foursight check output, bare `print(exc)`/`logger.exception`, etc.)
# Note this redacts ALL dict values (not just ones matching an "is this sensitive" key-name
# heuristic like obfuscate_json uses elsewhere), since the whole auth argument is, by
# definition, credential material -- including whichever value the caller put under an
# unexpected/misspelled key, which is exactly the malformed-auth case that reaches here.
if isinstance(auth, dict):
return {key: "<REDACTED>" for key in auth}
elif isinstance(auth, (tuple, list)) and len(auth) == 2:
return type(auth)(("<REDACTED>", "<REDACTED>"))
elif auth is None:
return None
else:
return f"<REDACTED {type(auth).__name__}>"

def __init__(self, message: str, auth: Optional[AnyAuthData], ff_env: Optional[PortalEnvName]):
self.auth = auth
self.ff_env = ff_env
super().__init__(f"{message} You gave auth={auth}, ff_env={ff_env}")
super().__init__(f"{message} You gave auth={self._redact_auth(auth)}, ff_env={ff_env}")

@classmethod
def unified_authentication(cls, auth: Optional[AnyAuthDict] = None,
Expand Down
31 changes: 23 additions & 8 deletions dcicutils/obfuscation_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,30 @@

# The _SENSITIVE_KEY_NAMES_REGEX regex defines key names representing sensitive values, case-insensitive.
# Note the below 'crypt(?!_key_id$)' regex matches any thing with 'crypt' except for 'crypt_key_id'.
#
# NOTE: token/authoriz(e|ation)/api(_)?key/access(_)?key/bearer/jwt/private(_)?key were added because,
# without them, this (and callers like trace_utils.Trace's TRACE_REDACT logic, which exists specifically
# to keep credentials out of logs) would silently fail to redact common credential shapes such as an
# "Authorization: Bearer <token>" header, an "api_key", or an "access_token" -- letting live secrets
# leak into logs/traces even though the obfuscation machinery is in place and believed to be protecting
# against exactly that.
_SENSITIVE_KEY_NAMES_REGEX = re.compile(
r"""
.*(
password |
passwd |
secret |
secrt |
scret |
session.*token |
session.*id |
password |
passwd |
secret |
secrt |
scret |
session.*token |
session.*id |
token |
authoriz(e|ation) |
api.?key |
access.?key |
bearer |
jwt |
private.?key |
crypt(?!_key_id$)
).*
""", re.VERBOSE | re.IGNORECASE)
Expand All @@ -28,7 +42,8 @@
def should_obfuscate(key: str, value: Any = None) -> bool:
"""
Returns True if the given key looks as if it represents a sensitive value.
Just sees if it contains "secret" or "password" or "crypt" some obvious variants,
Just sees if it contains "secret", "password", "crypt", "token", "authorization",
"api_key", "access_key", "bearer", "jwt", "private_key", or some obvious variants,
case-insensitive; i.e. whatever is in the _SENSITIVE_KEY_NAMES_REGEX list
containing regular expressions; add more to if/when needed.

Expand Down
18 changes: 17 additions & 1 deletion dcicutils/zip_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,22 @@
import zipfile


def _assert_within_directory(member_path: str, target_directory: str) -> None:
# Guards against "zip slip"/"tar slip" path traversal, where a malicious archive
# entry (e.g. named "../../etc/cron.d/evil" or with an absolute path) would
# otherwise be extracted outside of the intended target directory.
target_directory = os.path.realpath(target_directory)
resolved_path = os.path.realpath(os.path.join(target_directory, member_path))
if os.path.commonpath([target_directory, resolved_path]) != target_directory:
raise ValueError(f"Refusing to extract archive member outside of target directory: {member_path}")


@contextmanager
def unpack_zip_file_to_temporary_directory(file: str) -> str:
with temporary_directory() as tmp_directory_name:
with zipfile.ZipFile(file, "r") as zipf:
for member in zipf.namelist():
_assert_within_directory(member, tmp_directory_name)
zipf.extractall(tmp_directory_name)
yield tmp_directory_name

Expand All @@ -21,7 +33,11 @@ def unpack_zip_file_to_temporary_directory(file: str) -> str:
def unpack_tar_file_to_temporary_directory(file: str) -> str:
with temporary_directory() as tmp_directory_name:
with tarfile.open(file, "r") as tarf:
tarf.extractall(tmp_directory_name)
for member in tarf.getmembers():
_assert_within_directory(member.name, tmp_directory_name)
# The "data" filter (PEP 706) additionally rejects unsafe member types/permissions
# (e.g. device files, symlinks escaping the target, setuid bits) on extraction.
tarf.extractall(tmp_directory_name, filter="data")
yield tmp_directory_name


Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "dcicutils"
version = "8.18.4"
version = "8.18.5"
description = "Utility package for interacting with the 4DN Data Portal and other 4DN resources"
authors = ["4DN-DCIC Team <support@4dnucleome.org>"]
license = "MIT"
Expand Down
22 changes: 22 additions & 0 deletions test/test_ff_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,28 @@ def test_unified_authentication_unit():
ff_utils.unified_authentication(None, None)


def test_unified_authenticator_authentication_error_redacts_auth_in_message():
# A malformed-but-truthy auth (e.g. a misspelled 'key' field) reaches AuthenticationError with
# the caller's actual credential material. That credential must never appear in the exception's
# message text (which tends to end up in logs/Sentry/Foursight check output), even though the
# original value must still be available programmatically via the .auth attribute.
ts = TestScenarios
bad_auth = ts.some_badly_formed_auth_dict
assert ts.some_auth_key in bad_auth.values() and ts.some_auth_secret in bad_auth.values()

with mock.patch.object(s3_utils, "s3Utils") as MockS3Utils:
MockS3Utils.side_effect = AssertionError("s3Utils() should not be used for this locally-checkable auth.")
try:
ff_utils.unified_authentication(bad_auth, ts.foo_env)
raise AssertionError("Expected an AuthenticationError to be raised.")
except ff_utils.UnifiedAuthenticator.AuthenticationError as exc:
message = str(exc)
assert ts.some_auth_key not in message
assert ts.some_auth_secret not in message
# The original (unredacted) auth remains available on the exception for legitimate use.
assert exc.auth == bad_auth


# Integration tests

@pytest.mark.integratedx
Expand Down
36 changes: 36 additions & 0 deletions test/test_obfuscation_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,20 @@ def test_should_obfuscate() -> None:
assert_should_obfuscate_with_different_cases("crypt_key_id", False)
assert_should_obfuscate_with_different_cases("not_a_s3cret_foo", False)

# Common HTTP auth credential key shapes that must be redacted (previously silently NOT matched).
assert_should_obfuscate_with_different_cases("token", True)
assert_should_obfuscate_with_different_cases("access_token", True)
assert_should_obfuscate_with_different_cases("authorization", True)
assert_should_obfuscate_with_different_cases("Authorization", True)
assert_should_obfuscate_with_different_cases("api_key", True)
assert_should_obfuscate_with_different_cases("apikey", True)
assert_should_obfuscate_with_different_cases("access_key", True)
assert_should_obfuscate_with_different_cases("accesskey", True)
assert_should_obfuscate_with_different_cases("bearer", True)
assert_should_obfuscate_with_different_cases("jwt", True)
assert_should_obfuscate_with_different_cases("private_key", True)
assert_should_obfuscate_with_different_cases("privatekey", True)

# Edge cases that are really soft errors...
assert should_obfuscate(None) is False # NoQA - Argument is not intended, but function returns False
assert should_obfuscate(17) is False # NoQA - ditto
Expand Down Expand Up @@ -172,3 +186,25 @@ def test_obfuscate_json_with_tuple():
x = obfuscate_json(d, obfuscated="<REDACTED>")
assert x == o
assert d == d_copy


def test_obfuscate_json_with_http_auth_credential_shapes():
# End-to-end: a realistic log/trace payload carrying common HTTP auth credential shapes
# (as would flow through trace_utils.Trace's TRACE_REDACT) must have all of them scrubbed.
d = {
"url": "https://example.com/api",
"headers": {"Authorization": "Bearer abc123.def456.ghi789", "Content-Type": "application/json"},
"api_key": "live_sk_1234567890",
"access_token": "ya29.live-access-token-value",
"private_key": "-----BEGIN PRIVATE KEY-----abc-----END PRIVATE KEY-----",
}
d_copy = copy.deepcopy(d)
x = obfuscate_json(d, obfuscated="<REDACTED>")
assert x == {
"url": "https://example.com/api",
"headers": {"Authorization": "<REDACTED>", "Content-Type": "application/json"},
"api_key": "<REDACTED>",
"access_token": "<REDACTED>",
"private_key": "<REDACTED>",
}
assert d == d_copy # original not mutated
80 changes: 80 additions & 0 deletions test/test_zip_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import io
import os
import tarfile
import zipfile

import pytest

from dcicutils.tmpfile_utils import temporary_directory
from dcicutils.zip_utils import (
unpack_zip_file_to_temporary_directory, unpack_tar_file_to_temporary_directory,
)


def _make_zip_with_member(zip_path: str, member_name: str, content: bytes = b"data") -> None:
with zipfile.ZipFile(zip_path, "w") as zf:
zf.writestr(member_name, content)


def _make_tar_with_member(tar_path: str, member_name: str, content: bytes = b"data") -> None:
with tarfile.open(tar_path, "w") as tf:
info = tarfile.TarInfo(name=member_name)
info.size = len(content)
tf.addfile(info, io.BytesIO(content))


def test_unpack_zip_file_to_temporary_directory_rejects_path_traversal():
# Zip Slip: a malicious archive entry named with ../ segments must not be extracted
# outside of the target directory, even though zipfile.extractall would otherwise allow it.
with temporary_directory() as work_dir:
zip_path = os.path.join(work_dir, "evil.zip")
_make_zip_with_member(zip_path, "../../../tmp/dcicutils_zip_slip_poc.txt", b"pwned")
with pytest.raises(ValueError):
with unpack_zip_file_to_temporary_directory(zip_path):
pass
# The malicious path must never have landed on disk anywhere.
assert not os.path.exists("/tmp/dcicutils_zip_slip_poc.txt")


def test_unpack_zip_file_to_temporary_directory_rejects_absolute_path():
with temporary_directory() as work_dir:
zip_path = os.path.join(work_dir, "evil_abs.zip")
_make_zip_with_member(zip_path, "/tmp/dcicutils_zip_slip_abs_poc.txt", b"pwned")
with pytest.raises(ValueError):
with unpack_zip_file_to_temporary_directory(zip_path):
pass
assert not os.path.exists("/tmp/dcicutils_zip_slip_abs_poc.txt")


def test_unpack_zip_file_to_temporary_directory_extracts_benign_archive():
# Legitimate archives must still work normally after the guard is added.
with temporary_directory() as work_dir:
zip_path = os.path.join(work_dir, "benign.zip")
_make_zip_with_member(zip_path, "subdir/hello.txt", b"hello world")
with unpack_zip_file_to_temporary_directory(zip_path) as tmp_dir:
extracted_path = os.path.join(tmp_dir, "subdir", "hello.txt")
assert os.path.exists(extracted_path)
with open(extracted_path, "rb") as f:
assert f.read() == b"hello world"


def test_unpack_tar_file_to_temporary_directory_rejects_path_traversal():
# Tar Slip: same attack, via tarfile.extractall.
with temporary_directory() as work_dir:
tar_path = os.path.join(work_dir, "evil.tar")
_make_tar_with_member(tar_path, "../../../tmp/dcicutils_tar_slip_poc.txt", b"pwned")
with pytest.raises(ValueError):
with unpack_tar_file_to_temporary_directory(tar_path):
pass
assert not os.path.exists("/tmp/dcicutils_tar_slip_poc.txt")


def test_unpack_tar_file_to_temporary_directory_extracts_benign_archive():
with temporary_directory() as work_dir:
tar_path = os.path.join(work_dir, "benign.tar")
_make_tar_with_member(tar_path, "subdir/hello.txt", b"hello world")
with unpack_tar_file_to_temporary_directory(tar_path) as tmp_dir:
extracted_path = os.path.join(tmp_dir, "subdir", "hello.txt")
assert os.path.exists(extracted_path)
with open(extracted_path, "rb") as f:
assert f.read() == b"hello world"
Loading