-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathsniff_mqtt.py
More file actions
84 lines (60 loc) · 2.14 KB
/
sniff_mqtt.py
File metadata and controls
84 lines (60 loc) · 2.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import os
import json
from datetime import datetime
import paho.mqtt.client as mqtt
# Path of the JSON configuration file. It is relative, so it is resolved
# against the current working directory when the file is opened.
CFG_PATH = os.path.join("ADaMpy", "config", "adam_config.json")
def now():
    """Return the current local time as an ``HH:MM:SS`` string for log prefixes."""
    stamp = datetime.now()
    return f"{stamp:%H:%M:%S}"
def load_cfg():
    """Read the file at ``CFG_PATH`` and return its parsed JSON content.

    The file is decoded with ``utf-8-sig`` so that a leading byte-order mark
    is dropped before the text reaches the JSON parser.
    """
    with open(CFG_PATH, mode="r", encoding="utf-8-sig") as cfg_file:
        text = cfg_file.read()
    return json.loads(text)
def classify(payload: str) -> str:
    """Label a decoded payload by inspecting its first characters.

    Leading whitespace is ignored. The rules are tried in order and the first
    match wins:

    * ``EMPTY``         - nothing left after stripping leading whitespace
    * ``JSON``          - starts with ``{``
    * ``GPAP_ALARM``    - ``a`` followed by a digit
    * ``GPAP_RESPONSE`` - ``o`` followed by one of ``a``, ``c``, ``d``, ``s``
    * ``MUTE_TOGGLE``   - exactly ``m`` or ``u``
    * ``LEGACY_ACK``    - starts with ``k|``
    * ``UNKNOWN``       - anything else
    """
    text = (payload or "").lstrip()
    if not text:
        return "EMPTY"

    # Second character, or "" when the payload is a single character.
    second = text[1:2]

    rules = (
        ("JSON", text.startswith("{")),
        ("GPAP_ALARM", text.startswith("a") and second.isdigit()),
        (
            "GPAP_RESPONSE",
            # "" is contained in every string, so rule out the empty case first.
            text.startswith("o") and second != "" and second in "acds",
        ),
        ("MUTE_TOGGLE", text == "m" or text == "u"),
        ("LEGACY_ACK", text.startswith("k|")),
    )
    return next((label for label, matched in rules if matched), "UNKNOWN")
def on_connect(client, userdata, flags, rc):
    """Connection callback: subscribe to the watched topics once connected.

    Args:
        client: MQTT client the callback fired for; ``subscribe`` is called on it.
        userdata: Mapping whose ``"topics"`` entry lists the topics to subscribe to.
        flags: Connection flags passed by the client library (unused).
        rc: Connection result code; ``0`` means the broker accepted the connection.
    """
    print(f"[{now()}] CONNECT rc={rc}")
    if rc != 0:
        # The broker refused the connection: subscribing cannot succeed and
        # printing SUB lines would wrongly suggest the topics are being watched.
        print(f"[{now()}] CONNECT refused, not subscribing")
        return
    for topic in userdata["topics"]:
        client.subscribe(topic, qos=1)
        print(f"[{now()}] SUB {topic}")
def on_message(client, userdata, msg):
    """Message callback: print one log line describing the received message.

    The line carries the time, the payload classification, the topic and a
    preview of the payload limited to 200 characters.
    """
    text = msg.payload.decode("utf-8", errors="replace")
    label = classify(text)

    # Show CR/LF as literal escapes so each message stays on one console line.
    shown = text.translate(str.maketrans({"\r": "\\r", "\n": "\\n"}))
    shown = shown if len(shown) <= 200 else shown[:200] + "..."

    print(f"[{now()}] {label} topic={msg.topic} payload={shown}")
def main():
    """Load the configuration, connect to the broker and log traffic until interrupted.

    Connection settings and topic names are read from the configuration
    returned by ``load_cfg()``, falling back to the defaults below for any
    missing key. The watched topics are the alarm topic, the ack topic, every
    topic below the ack topic, and each configured annunciator topic.
    """
    cfg = load_cfg()
    broker = cfg.get("broker_host", "public.cloud.shiftr.io")
    port = int(cfg.get("broker_port", 1883))
    user = cfg.get("username", "public")
    pw = cfg.get("password", "public")
    alarm_topic = cfg.get("alarm_topic", "adam/in/alarms")
    ack_topic = cfg.get("ack_topic", "adam/acks")
    # `or []` also covers an explicit `"annunciators": null` in the config,
    # which would otherwise make list() raise TypeError.
    annunciators = list(cfg.get("annunciators") or [])
    topics = [alarm_topic, ack_topic, f"{ack_topic}/#"] + annunciators

    client = mqtt.Client(client_id="sniff_mqtt")
    client.username_pw_set(user, pw)
    # on_connect reads the topic list back from the client's user data.
    client.user_data_set({"topics": topics})
    client.on_connect = on_connect
    client.on_message = on_message

    print(f"[{now()}] Connecting to {broker}:{port}")
    print(f"[{now()}] Watching topics:")
    for t in topics:
        print(" -", t)

    client.connect(broker, port)
    try:
        client.loop_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the sniffer: exit without a
        # traceback and tell the broker we are leaving.
        print(f"[{now()}] Interrupted, disconnecting")
        client.disconnect()
# Start the sniffer only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()