diff --git a/scripts/qlc_final_wiring_smoke.py b/scripts/qlc_final_wiring_smoke.py index 5d10265..a806c7e 100644 --- a/scripts/qlc_final_wiring_smoke.py +++ b/scripts/qlc_final_wiring_smoke.py @@ -58,21 +58,20 @@ def main(argv: list[str] | None = None) -> int: def _run_gateway(bundle: dict[str, Any], gateway_repo: Path, simulator_url: str, *, real_submit: bool) -> dict[str, Any]: - if gateway_repo.exists(): - sys.path.insert(0, str(gateway_repo)) try: - from fnpqnn_gateway_mvp.qlc_submit import qlc_submit + module = _load_repo_module(gateway_repo, "fnpqnn_gateway_mvp", "qlc_submit") + qlc_submit = module.qlc_submit except Exception as exc: return {"success": False, "stage": "gateway_import", "error": _compact_error(exc)} return qlc_submit(bundle, simulator_url=simulator_url, dry_run=not real_submit, timeout=5) def _run_fnp_testclient(bundle: dict[str, Any], fnp_repo: Path) -> dict[str, Any]: - if fnp_repo.exists(): - sys.path.insert(0, str(fnp_repo)) try: from fastapi.testclient import TestClient - from api.main import app + + module = _load_repo_module(fnp_repo, "api", "main") + app = module.app except Exception as exc: return {"success": False, "stage": "fnp_import", "error": _compact_error(exc)} mesh_payload = bundle["gateway_submission"]["mesh_payload"] @@ -94,6 +93,38 @@ def _run_fnp_testclient(bundle: dict[str, Any], fnp_repo: Path) -> dict[str, Any } +def _load_repo_module(repo: Path, package_name: str, module_name: str): + import importlib.util + + package_dir = repo.resolve() / package_name + init_file = package_dir / "__init__.py" + target_file = package_dir / f"{module_name}.py" + if not init_file.is_file(): + raise ImportError(f"{package_name} package not found at {init_file}") + if not target_file.is_file(): + raise ImportError(f"{package_name}.{module_name} not found at {target_file}") + + package_spec = importlib.util.spec_from_file_location( + package_name, + str(init_file), + submodule_search_locations=[str(package_dir)], + ) + if package_spec is None or package_spec.loader is None: + raise ImportError(f"Cannot load package {package_name}") + package_module = importlib.util.module_from_spec(package_spec) + sys.modules[package_name] = package_module + package_spec.loader.exec_module(package_module) + + qualified_name = f"{package_name}.{module_name}" + module_spec = importlib.util.spec_from_file_location(qualified_name, str(target_file)) + if module_spec is None or module_spec.loader is None: + raise ImportError(f"Cannot load module {qualified_name}") + module = importlib.util.module_from_spec(module_spec) + sys.modules[qualified_name] = module + module_spec.loader.exec_module(module) + return module + + def _compact_inspection(inspection: dict[str, Any]) -> dict[str, Any]: return { "success": inspection.get("success"), diff --git a/src/ffed_qlc/cli.py b/src/ffed_qlc/cli.py index d51cdd5..fafb3e6 100644 --- a/src/ffed_qlc/cli.py +++ b/src/ffed_qlc/cli.py @@ -33,18 +33,15 @@ def main(argv: list[str] | None = None) -> int: pack = sub.add_parser("pack", help="Pack a file into an authenticated QLC-style container") pack.add_argument("--input", required=True) pack.add_argument("--output", required=True) - pack.add_argument("--passphrase") pack.add_argument("--passphrase-env", default="FFED_QLC_PASSPHRASE") unpack = sub.add_parser("unpack", help="Unpack an authenticated QLC-style container") unpack.add_argument("--input", required=True) unpack.add_argument("--output", required=True) - unpack.add_argument("--passphrase") unpack.add_argument("--passphrase-env", default="FFED_QLC_PASSPHRASE") verify = sub.add_parser("verify", help="Authenticate a QLC container and print a safe manifest") verify.add_argument("--input", required=True) - verify.add_argument("--passphrase") verify.add_argument("--passphrase-env", default="FFED_QLC_PASSPHRASE") verify.add_argument("--output") verify.add_argument("--no-decrypt", action="store_true", help="Inspect manifest without authenticating plaintext") @@ -74,7 +71,6 @@ def main(argv: list[str] | None = None) -> int: yolo_pack.add_argument("--output", required=True) yolo_pack.add_argument("--source-id", required=True) yolo_pack.add_argument("--proof-output", required=True) - yolo_pack.add_argument("--passphrase") yolo_pack.add_argument("--passphrase-env", default="FFED_QLC_PASSPHRASE") yolo_pack.add_argument("--detections-json", help="YOLO detections JSON file; metadata only, no image bytes") yolo_pack.add_argument("--codeproject-url", default="http://localhost:32168") @@ -149,14 +145,14 @@ def main(argv: list[str] | None = None) -> int: return 0 if args.command == "pack": - passphrase = _resolve_passphrase(args.passphrase, args.passphrase_env) + passphrase = _resolve_passphrase(args.passphrase_env) plaintext = Path(args.input).read_bytes() Path(args.output).write_bytes(pack_bytes(plaintext, passphrase)) print(args.output) return 0 if args.command == "unpack": - passphrase = _resolve_passphrase(args.passphrase, args.passphrase_env) + passphrase = _resolve_passphrase(args.passphrase_env) container = Path(args.input).read_bytes() Path(args.output).write_bytes(unpack_bytes(container, passphrase)) print(args.output) @@ -167,7 +163,7 @@ def main(argv: list[str] | None = None) -> int: if args.no_decrypt: record = inspect_container(container) else: - passphrase = _resolve_passphrase(args.passphrase, args.passphrase_env) + passphrase = _resolve_passphrase(args.passphrase_env) record = verify_container(container, passphrase) output = json.dumps(record, indent=2, sort_keys=True) if args.output: @@ -198,7 +194,7 @@ def main(argv: list[str] | None = None) -> int: return 0 if args.command == "yolo-pack": - passphrase = _resolve_passphrase(args.passphrase, args.passphrase_env) + passphrase = _resolve_passphrase(args.passphrase_env) plaintext = Path(args.input).read_bytes() container = pack_bytes(plaintext, passphrase) Path(args.output).write_bytes(container) @@ -270,9 +266,7 @@ def main(argv: list[str] | None = None) -> int: return 2 -def _resolve_passphrase(value: str | None, env_name: str | None) -> str: - if value: - return value +def _resolve_passphrase(env_name: str | None) -> str: if env_name: env_value = os.environ.get(env_name) if env_value: diff --git a/src/ffed_qlc/key_schedule.py b/src/ffed_qlc/key_schedule.py index ef2956c..12bb74e 100644 --- a/src/ffed_qlc/key_schedule.py +++ b/src/ffed_qlc/key_schedule.py @@ -49,10 +49,20 @@ def _manifest_from_container(container: bytes) -> Mapping[str, Any]: if not container.startswith(MAGIC): raise ValueError("invalid QLC container magic") offset = len(MAGIC) + if len(container) < offset + HEADER_LENGTH_BYTES: + raise ValueError("truncated QLC container header") header_length = int.from_bytes(container[offset : offset + HEADER_LENGTH_BYTES], "big") header_start = offset + HEADER_LENGTH_BYTES header_end = header_start + header_length - header = json.loads(container[header_start:header_end].decode("utf-8")) + if len(container) < header_end: + raise ValueError("truncated QLC container body") + try: + header_str = container[header_start:header_end].decode("utf-8") + header = json.loads(header_str) + except (UnicodeDecodeError, json.JSONDecodeError) as exc: + raise ValueError("invalid QLC container header JSON") from exc + if not isinstance(header, dict): + raise ValueError("QLC container header must be a JSON object") manifest = header.get("qlc_manifest") if not isinstance(manifest, Mapping): raise ValueError("QLC container has no qlc_manifest") diff --git a/src/ffed_qlc/structural_transform.py b/src/ffed_qlc/structural_transform.py index 4a2763c..9a3bdf1 100644 --- a/src/ffed_qlc/structural_transform.py +++ b/src/ffed_qlc/structural_transform.py @@ -268,6 +268,7 @@ def _encode_header(header: dict[str, object]) -> bytes: return json.dumps(header, sort_keys=True, separators=(",", ":")).encode("utf-8") + def _split_container(container: bytes) -> tuple[dict[str, object], bytes, bytes]: if not container.startswith(MAGIC): raise QLCTransformError("invalid QLC container magic") @@ -281,8 +282,11 @@ def _split_container(container: bytes) -> tuple[dict[str, object], bytes, bytes] raise QLCTransformError("truncated QLC container body") header_bytes = container[header_start:header_end] try: - header = json.loads(header_bytes.decode("utf-8")) - except json.JSONDecodeError as exc: + header_str = header_bytes.decode("utf-8") + header = json.loads(header_str) + if not isinstance(header, dict): + raise QLCTransformError("QLC header must be a JSON object") + except (UnicodeDecodeError, json.JSONDecodeError) as exc: raise QLCTransformError("invalid QLC container header JSON") from exc _validate_header(header) return header, header_bytes, container[header_end:] @@ -304,11 +308,11 @@ def _validate_header(header: dict[str, object]) -> None: missing = required.difference(header) if missing: raise QLCTransformError(f"missing QLC header fields: {', '.join(sorted(missing))}") - if header["version"] != 1: + if header.get("version") != 1: raise QLCTransformError("unsupported QLC container version") - if header["transform"] != "phi_cut_project_permutation_v1": + if header.get("transform") != "phi_cut_project_permutation_v1": raise QLCTransformError("unsupported QLC transform") - if header["cipher"] != "ChaCha20-Poly1305" or header["kdf"] != "scrypt": + if header.get("cipher") != "ChaCha20-Poly1305" or header.get("kdf") != "scrypt": raise QLCTransformError("unsupported QLC cryptographic profile") diff --git a/src/ffed_qlc/telemetry.py b/src/ffed_qlc/telemetry.py index e327b48..58291bc 100644 --- a/src/ffed_qlc/telemetry.py +++ b/src/ffed_qlc/telemetry.py @@ -1,6 +1,7 @@ from __future__ import annotations import os +import re import socket from typing import Any, Mapping @@ -16,11 +17,19 @@ } +def _sanitize(value: str, *, limit: int = 120) -> str: + cleaned = re.sub(r"[^A-Za-z0-9_.\-/]", "_", str(value)).strip("_.-/") + cleaned = re.sub(r"_+", "_", cleaned) + return (cleaned[:limit] or "unknown") + + def emit_dogstatsd_counter(name: str, value: int = 1, tags: tuple[str, ...] = ()) -> None: host = os.environ.get("DD_DOGSTATSD_HOST", "127.0.0.1") port = int(os.environ.get("DD_DOGSTATSD_PORT", "8125")) - tag_suffix = f"|#{','.join(tags)}" if tags else "" - payload = f"{name}:{value}|c{tag_suffix}".encode("utf-8") + safe_name = _sanitize(name, limit=200) + safe_tags = [_sanitize(t) for t in tags] + tag_suffix = f"|#{','.join(safe_tags)}" if safe_tags else "" + payload = f"{safe_name}:{value}|c{tag_suffix}".encode("utf-8") with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock: sock.sendto(payload, (host, port)) diff --git a/tests/test_key_schedule.py b/tests/test_key_schedule.py index 2b94dd4..3cb17ae 100644 --- a/tests/test_key_schedule.py +++ b/tests/test_key_schedule.py @@ -1,4 +1,7 @@ +import pytest + from ffed_qlc import derive_chunk_key_schedule, inspect_container, pack_bytes +from ffed_qlc.key_schedule import HEADER_LENGTH_BYTES, MAGIC def test_chunk_key_schedule_is_deterministic_for_manifest() -> None: @@ -45,3 +48,11 @@ def test_container_manifest_includes_chunk_key_schedule() -> None: assert manifest["chunk_key_schedule"]["schema"] == "ffed.qlc.granular_chunk_key_schedule.v1" assert manifest["chunk_key_schedule"]["chunks"][0]["key_material_exposed"] is False assert derive_chunk_key_schedule(container, 1) == manifest["chunk_key_schedule"] + + +def test_manifest_extraction_rejects_malformed_header_json() -> None: + header = b"\xff" + container = MAGIC + len(header).to_bytes(HEADER_LENGTH_BYTES, "big") + header + + with pytest.raises(ValueError, match="invalid QLC container header JSON"): + derive_chunk_key_schedule(container, 1) diff --git a/tests/test_structural_transform.py b/tests/test_structural_transform.py index 065175b..fa4a299 100644 --- a/tests/test_structural_transform.py +++ b/tests/test_structural_transform.py @@ -3,6 +3,7 @@ import pytest from ffed_qlc import QLCTransformError, inspect_container, pack_bytes, quasicrystal_coordinates, unpack_bytes, verify_container +from ffed_qlc.structural_transform import HEADER_LENGTH_BYTES, MAGIC def test_pack_unpack_round_trip() -> None: @@ -21,6 +22,22 @@ def test_unpack_rejects_wrong_passphrase() -> None: unpack_bytes(container, "wrong-passphrase") +def test_inspect_container_rejects_malformed_non_utf8_header() -> None: + header = b"\xff\xff" + container = MAGIC + len(header).to_bytes(HEADER_LENGTH_BYTES, "big") + header + + with pytest.raises(QLCTransformError, match="invalid QLC container header JSON"): + inspect_container(container) + + +def test_inspect_container_rejects_non_object_header() -> None: + header = b"[]" + container = MAGIC + len(header).to_bytes(HEADER_LENGTH_BYTES, "big") + header + + with pytest.raises(QLCTransformError, match="QLC header must be a JSON object"): + inspect_container(container) + + def test_inspect_container_returns_public_safe_manifest() -> None: container = pack_bytes(b"secret", "passphrase") diff --git a/tests/test_workflow.py b/tests/test_workflow.py index 0098c3b..8dcfab0 100644 --- a/tests/test_workflow.py +++ b/tests/test_workflow.py @@ -8,6 +8,7 @@ emit_qlc_workflow_counter, pack_bytes, ) +from ffed_qlc.telemetry import _sanitize as sanitize_metric_token def _container() -> bytes: @@ -113,3 +114,14 @@ def fake_emit(name: str, value: int = 1, tags: tuple[str, ...] = ()) -> None: assert emitted[0][0] == "ffed_qlc.workflow.accepted" assert "qlc_schema:ffed.qlc.protection_workflow_bundle.v1" in emitted[0][2] assert "simulator_status:ok" in emitted[0][2] + + +def test_dogstatsd_sanitizer_rejects_injection_delimiters_and_bounds_length() -> None: + unsafe = "metric|c\nsecond:tag,another" + ("x" * 200) + safe = sanitize_metric_token(unsafe) + + assert "|" not in safe + assert "\n" not in safe + assert ":" not in safe + assert "," not in safe + assert len(safe) <= 120