-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathprotocol.py
More file actions
128 lines (109 loc) · 3.88 KB
/
protocol.py
File metadata and controls
128 lines (109 loc) · 3.88 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# -*- coding: utf-8 -*-
"""Lightweight UDP command server for SSL robot control.
Notes
-----
- Author: Mauricio Moraes Godoy
- Date: 2025-09-15
Protocol
--------
JSON messages over UDP. Supported commands:
- {"cmd": "vel", "vx": float, "vy": float, "vtheta": float, "heading": float|null}
- {"cmd": "stop"}
- {"cmd": "heading", "theta": float} # set desired heading target
- {"cmd": "imu", "theta": float} # update measured heading
- {"cmd": "ping"}
- {"cmd": "drive", "vx": float, "vy": float, "vtheta": float,
"theta": float|null, "theta_target": float|null}
Responses (sent only when ack=True): {"ok": true|false, "cmd": "<name>"}
("ok" is false when the command name is not recognized.)
"""
import json
import socket
from typing import Optional, Tuple
from ssl_robot import SSLRobot
class UDPCommandServer:
    """Simple UDP server that decodes JSON commands and drives the robot.

    Each datagram is expected to hold one UTF-8 encoded JSON object with a
    ``"cmd"`` key. Datagrams that are not valid JSON objects are dropped
    silently; commands whose numeric fields cannot be converted to ``float``
    are rejected without touching the robot (and reported as
    ``{"ok": false}`` when acknowledgments are enabled).

    Parameters
    ----------
    robot : SSLRobot
        Robot controller instance. The server calls ``set_velocity``,
        ``stop``, ``set_heading_target`` and ``update_heading`` on it.
    host : str, default "0.0.0.0"
        Bind address.
    port : int, default 20011
        UDP port to listen on.
    ack : bool, default False
        If True, send a small JSON acknowledgment back to the sender.
    """

    def __init__(
        self,
        robot: "SSLRobot",
        host: str = "0.0.0.0",
        port: int = 20011,
        ack: bool = False,
    ):
        self.robot = robot
        self.host = host
        self.port = port
        self.ack = ack
        self.sock: Optional[socket.socket] = None
        self._running: bool = False

    def start(self) -> None:
        """Create the UDP socket, bind it and mark the server as running.

        Raises
        ------
        OSError
            If the socket cannot be bound. The freshly created socket is
            closed before the error propagates so it is not leaked.
        """
        # Release a previously opened socket explicitly instead of relying on
        # garbage collection to free the port before re-binding.
        if self.sock is not None:
            self.sock.close()
            self.sock = None

        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            sock.bind((self.host, self.port))
        except OSError:
            sock.close()
            raise
        self.sock = sock
        self._running = True

    def stop(self) -> None:
        """Ask the serve loop to exit and close the socket (idempotent)."""
        self._running = False
        if self.sock is not None:
            try:
                self.sock.close()
            finally:
                self.sock = None

    def serve_forever(self, poll_interval: Optional[float] = 0.5) -> None:
        """Receive and handle datagrams until :meth:`stop` is called.

        Starts the server first if no socket is open.

        Parameters
        ----------
        poll_interval : float or None, default 0.5
            Socket timeout in seconds. The running flag is re-checked each
            time the timeout expires, so a ``stop()`` issued from another
            thread is honoured even when no packets arrive. ``None`` blocks
            indefinitely between packets.

        Raises
        ------
        OSError
            If receiving fails while the server is still meant to be running.
        """
        if self.sock is None:
            self.start()
        # Work on a local reference: stop() may reset self.sock to None while
        # this loop is in flight.
        sock = self.sock
        if sock is None:
            return
        sock.settimeout(poll_interval)

        while self._running:
            try:
                data, addr = sock.recvfrom(4096)
            except socket.timeout:
                continue
            except OSError:
                if not self._running:
                    # The socket was closed by stop(); leave quietly.
                    break
                raise
            self._handle_packet(data, addr)

    def _handle_packet(self, data: bytes, addr: Tuple[str, int]) -> None:
        """Decode one datagram, run its command and optionally acknowledge.

        Parameters
        ----------
        data : bytes
            Raw datagram payload.
        addr : tuple of (str, int)
            Sender address, used as the acknowledgment destination.
        """
        try:
            payload = json.loads(data.decode("utf-8"))
        except (ValueError, RecursionError):
            # ValueError covers both invalid UTF-8 and invalid JSON;
            # RecursionError covers pathologically nested documents.
            return
        if not isinstance(payload, dict):
            # Valid JSON that is not an object (e.g. a list) has no "cmd".
            return

        cmd = payload.get("cmd")
        ok = self._dispatch(cmd, payload)

        sock = self.sock
        if self.ack and sock is not None:
            try:
                resp = json.dumps({"ok": ok, "cmd": cmd}).encode("utf-8")
                sock.sendto(resp, addr)
            except OSError:
                # Acknowledgments are best-effort; a send failure must not
                # take the command loop down.
                pass

    def _dispatch(self, cmd, payload: dict) -> bool:
        """Execute *cmd* with the arguments in *payload*.

        Returns
        -------
        bool
            True if the command was recognized and its required numeric
            fields were valid; False otherwise. The robot is not commanded
            when False is returned.
        """
        if cmd == "vel":
            velocity = self._read_velocity(payload)
            if velocity is None:
                return False
            heading = payload.get("heading")
            if heading is not None:
                heading = self._to_float(heading)
                if heading is None:
                    return False
            vx, vy, vtheta = velocity
            self.robot.set_velocity(vx, vy, vtheta, heading=heading)
            return True

        if cmd == "drive":
            velocity = self._read_velocity(payload)
            if velocity is None:
                return False
            # The heading updates are deliberately best-effort: a bad or
            # failing heading value must not block the velocity command.
            theta = self._to_float(payload.get("theta"))
            if theta is not None:
                try:
                    self.robot.update_heading(theta)
                except Exception:
                    pass
            theta_target = self._to_float(payload.get("theta_target"))
            if theta_target is not None:
                try:
                    self.robot.set_heading_target(theta_target)
                except Exception:
                    pass
            vx, vy, vtheta = velocity
            self.robot.set_velocity(vx, vy, vtheta)
            return True

        if cmd == "stop":
            self.robot.stop()
            return True

        if cmd == "heading" or cmd == "imu":
            # A missing "theta" defaults to 0.0; an invalid one is rejected.
            theta = self._to_float(payload.get("theta", 0.0))
            if theta is None:
                return False
            if cmd == "heading":
                self.robot.set_heading_target(theta)
            else:
                self.robot.update_heading(theta)
            return True

        return cmd == "ping"

    @classmethod
    def _read_velocity(cls, payload: dict) -> Optional[Tuple[float, float, float]]:
        """Return ``(vx, vy, vtheta)`` from *payload*, or None if any is invalid.

        Missing components default to 0.0.
        """
        vx = cls._to_float(payload.get("vx", 0.0))
        vy = cls._to_float(payload.get("vy", 0.0))
        vtheta = cls._to_float(payload.get("vtheta", 0.0))
        if vx is None or vy is None or vtheta is None:
            return None
        return vx, vy, vtheta

    @staticmethod
    def _to_float(value) -> Optional[float]:
        """Convert *value* to float, returning None when it is not convertible."""
        try:
            return float(value)
        except (TypeError, ValueError, OverflowError):
            # TypeError: None/list/dict; ValueError: non-numeric string;
            # OverflowError: integer too large for a float.
            return None