|
| 1 | +import logging |
| 2 | +from unittest.mock import MagicMock |
| 3 | + |
| 4 | +from src.logging_levels import TRACE_LEVEL |
| 5 | +import src.writer_eventbridge as we |
| 6 | +import src.writer_kafka as wk |
| 7 | +import src.writer_postgres as wp |
| 8 | + |
| 9 | + |
| 10 | +def test_trace_eventbridge(caplog): |
| 11 | + logger = logging.getLogger("trace.eventbridge") |
| 12 | + logger.setLevel(TRACE_LEVEL) |
| 13 | + we.STATE["logger"] = logger |
| 14 | + we.STATE["event_bus_arn"] = "arn:aws:events:region:acct:event-bus/test" |
| 15 | + mock_client = MagicMock() |
| 16 | + mock_client.put_events.return_value = {"FailedEntryCount": 0, "Entries": []} |
| 17 | + we.STATE["client"] = mock_client |
| 18 | + caplog.set_level(TRACE_LEVEL) |
| 19 | + ok, err = we.write("topic.eb", {"k": 1}) |
| 20 | + assert ok and err is None |
| 21 | + assert any("EventBridge payload" in rec.message for rec in caplog.records) |
| 22 | + |
| 23 | + |
| 24 | +def test_trace_kafka(caplog): |
| 25 | + class FakeProducer: |
| 26 | + def produce(self, *a, **kw): |
| 27 | + cb = kw.get("callback") |
| 28 | + if cb: |
| 29 | + cb(None, object()) |
| 30 | + |
| 31 | + def flush(self, *a, **kw): # noqa: D401 |
| 32 | + return 0 |
| 33 | + |
| 34 | + logger = logging.getLogger("trace.kafka") |
| 35 | + logger.setLevel(TRACE_LEVEL) |
| 36 | + wk.STATE["logger"] = logger |
| 37 | + wk.STATE["producer"] = FakeProducer() |
| 38 | + caplog.set_level(TRACE_LEVEL) |
| 39 | + ok, err = wk.write("topic.kf", {"k": 2}) |
| 40 | + assert ok and err is None |
| 41 | + assert any("Kafka payload" in rec.message for rec in caplog.records) |
| 42 | + |
| 43 | + |
| 44 | +def test_trace_postgres(caplog, monkeypatch): |
| 45 | + # Prepare dummy psycopg2 connection machinery |
| 46 | + store = [] |
| 47 | + |
| 48 | + class DummyCursor: |
| 49 | + def execute(self, sql, params): |
| 50 | + store.append((sql, params)) |
| 51 | + |
| 52 | + def __enter__(self): |
| 53 | + return self |
| 54 | + |
| 55 | + def __exit__(self, exc_type, exc, tb): |
| 56 | + return False |
| 57 | + |
| 58 | + class DummyConnection: |
| 59 | + def cursor(self): |
| 60 | + return DummyCursor() |
| 61 | + |
| 62 | + def commit(self): |
| 63 | + pass |
| 64 | + |
| 65 | + def __enter__(self): |
| 66 | + return self |
| 67 | + |
| 68 | + def __exit__(self, exc_type, exc, tb): |
| 69 | + return False |
| 70 | + |
| 71 | + class DummyPsycopg2: |
| 72 | + def connect(self, **kwargs): # noqa: D401 |
| 73 | + return DummyConnection() |
| 74 | + |
| 75 | + monkeypatch.setattr(wp, "psycopg2", DummyPsycopg2()) |
| 76 | + |
| 77 | + logger = logging.getLogger("trace.postgres") |
| 78 | + logger.setLevel(TRACE_LEVEL) |
| 79 | + wp._logger = logger # type: ignore |
| 80 | + wp.POSTGRES = {"database": "db", "host": "h", "user": "u", "password": "p", "port": 5432} |
| 81 | + |
| 82 | + caplog.set_level(TRACE_LEVEL) |
| 83 | + message = {"event_id": "e", "tenant_id": "t", "source_app": "a", "environment": "dev", "timestamp": 1} |
| 84 | + ok, err = wp.write("public.cps.za.test", message) |
| 85 | + assert ok and err is None |
| 86 | + assert any("Postgres payload" in rec.message for rec in caplog.records) |
0 commit comments