diff --git a/.flake8 b/.flake8 new file mode 100644 index 0000000..a61f6fb --- /dev/null +++ b/.flake8 @@ -0,0 +1,4 @@ +[flake8] +max-line-length = 110 +extend-ignore = E203 +exclude = .venv, .git, __pycache__, build, dist diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b6e5000..54d47bb 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,6 +1,10 @@ name: CI -on: [push, pull_request] +on: + pull_request: + branches: + - dev + - main jobs: build: @@ -13,10 +17,9 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - if [ -f requirements.txt ]; then pip install -r requirements.txt; fi + pip install -r requirements.txt - name: Lint with flake8 run: | - pip install flake8 flake8 src tests - name: Run tests run: | diff --git a/.prettierrc.json b/.prettierrc.json index 1376935..25cb2cb 100644 --- a/.prettierrc.json +++ b/.prettierrc.json @@ -3,7 +3,7 @@ "useTabs": false, "singleQuote": true, "trailingComma": "all", - "printWidth": 120, + "printWidth": 110, "semi": true, "endOfLine": "lf", "overrides": [ diff --git a/.vscode/settings.json b/.vscode/settings.json index 1dc798a..4614b91 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -28,5 +28,6 @@ "python.analysis.typeCheckingMode": "basic", "python.analysis.autoImportCompletions": true, "python.defaultInterpreterPath": ".venv/bin/python", - "python.terminal.activateEnvInCurrentTerminal": true + "python.terminal.activateEnvInCurrentTerminal": true, + "cSpell.words": ["MQTT"] } diff --git a/docs/Makefile b/docs/Makefile index b4f7145..2b3851d 100644 --- a/docs/Makefile +++ b/docs/Makefile @@ -25,5 +25,5 @@ apidoc: html: $(SPHINXBUILD) -b html "$(SOURCEDIR)" "$(BUILDDIR)/html" -livehtml: +livehtml:clean sphinx-autobuild --host 0.0.0.0 --port ${PORT} -c . "$(SOURCEDIR)" "$(BUILDDIR)/html" diff --git a/docs/api/robot.communication.rst b/docs/api/robot.communication.rst index e53e76d..f558f3e 100644 --- a/docs/api/robot.communication.rst +++ b/docs/api/robot.communication.rst @@ -4,6 +4,14 @@ robot.communication package Submodules ---------- +robot.communication.communication module +---------------------------------------- + +.. automodule:: robot.communication.communication + :members: + :show-inheritance: + :undoc-members: + robot.communication.directed\_comm module ----------------------------------------- diff --git a/docs/api/robot.exception.rst b/docs/api/robot.exception.rst new file mode 100644 index 0000000..842e054 --- /dev/null +++ b/docs/api/robot.exception.rst @@ -0,0 +1,10 @@ +robot.exception package +======================= + +Module contents +--------------- + +.. automodule:: robot.exception + :members: + :show-inheritance: + :undoc-members: diff --git a/docs/api/robot.helpers.rst b/docs/api/robot.helpers.rst index b84e488..a51b3a6 100644 --- a/docs/api/robot.helpers.rst +++ b/docs/api/robot.helpers.rst @@ -12,6 +12,14 @@ robot.helpers.coordinate module :show-inheritance: :undoc-members: +robot.helpers.motion\_controller module +--------------------------------------- + +.. automodule:: robot.helpers.motion_controller + :members: + :show-inheritance: + :undoc-members: + robot.helpers.robot\_mqtt module -------------------------------- diff --git a/docs/api/robot.indicators.rst b/docs/api/robot.indicators.rst index 3b831c6..d00b035 100644 --- a/docs/api/robot.indicators.rst +++ b/docs/api/robot.indicators.rst @@ -4,6 +4,14 @@ robot.indicators package Submodules ---------- +robot.indicators.abstract\_indicator module +------------------------------------------- + +.. automodule:: robot.indicators.abstract_indicator + :members: + :show-inheritance: + :undoc-members: + robot.indicators.neopixel module -------------------------------- diff --git a/docs/api/robot.interfaces.rst b/docs/api/robot.interfaces.rst new file mode 100644 index 0000000..2fbf4d4 --- /dev/null +++ b/docs/api/robot.interfaces.rst @@ -0,0 +1,10 @@ +robot.interfaces package +======================== + +Module contents +--------------- + +.. automodule:: robot.interfaces + :members: + :show-inheritance: + :undoc-members: diff --git a/docs/api/robot.mqtt.rst b/docs/api/robot.mqtt.rst new file mode 100644 index 0000000..bf9d252 --- /dev/null +++ b/docs/api/robot.mqtt.rst @@ -0,0 +1,29 @@ +robot.mqtt package +================== + +Submodules +---------- + +robot.mqtt.mqtt\_msg module +--------------------------- + +.. automodule:: robot.mqtt.mqtt_msg + :members: + :show-inheritance: + :undoc-members: + +robot.mqtt.robot\_mqtt\_client module +------------------------------------- + +.. automodule:: robot.mqtt.robot_mqtt_client + :members: + :show-inheritance: + :undoc-members: + +Module contents +--------------- + +.. automodule:: robot.mqtt + :members: + :show-inheritance: + :undoc-members: diff --git a/docs/api/robot.rst b/docs/api/robot.rst index 547e914..28d36a0 100644 --- a/docs/api/robot.rst +++ b/docs/api/robot.rst @@ -8,9 +8,13 @@ Subpackages :maxdepth: 4 robot.communication + robot.exception robot.helpers robot.indicators + robot.interfaces + robot.mqtt robot.sensors + robot.types Submodules ---------- @@ -39,6 +43,14 @@ robot.robot\_base module :show-inheritance: :undoc-members: +robot.virtual\_robot module +--------------------------- + +.. automodule:: robot.virtual_robot + :members: + :show-inheritance: + :undoc-members: + Module contents --------------- diff --git a/docs/api/robot.sensors.rst b/docs/api/robot.sensors.rst index 9cf6702..7ae6661 100644 --- a/docs/api/robot.sensors.rst +++ b/docs/api/robot.sensors.rst @@ -4,6 +4,14 @@ robot.sensors package Submodules ---------- +robot.sensors.abstract\_sensor module +------------------------------------- + +.. automodule:: robot.sensors.abstract_sensor + :members: + :show-inheritance: + :undoc-members: + robot.sensors.color module -------------------------- diff --git a/docs/api/robot.types.rst b/docs/api/robot.types.rst new file mode 100644 index 0000000..45c93dd --- /dev/null +++ b/docs/api/robot.types.rst @@ -0,0 +1,29 @@ +robot.types package +=================== + +Submodules +---------- + +robot.types.proximity\_reading\_type module +------------------------------------------- + +.. automodule:: robot.types.proximity_reading_type + :members: + :show-inheritance: + :undoc-members: + +robot.types.rgb\_color\_type module +----------------------------------- + +.. automodule:: robot.types.rgb_color_type + :members: + :show-inheritance: + :undoc-members: + +Module contents +--------------- + +.. automodule:: robot.types + :members: + :show-inheritance: + :undoc-members: diff --git a/docs/index.md b/docs/index.md index 957b2d8..12845db 100644 --- a/docs/index.md +++ b/docs/index.md @@ -14,5 +14,5 @@ Welcome to the documentation for the Python robot library. This site includes in README installation usage -api/index +api/modules ``` diff --git a/examples/my_test_robot.py b/examples/my_test_robot.py new file mode 100644 index 0000000..93e461e --- /dev/null +++ b/examples/my_test_robot.py @@ -0,0 +1,43 @@ +from __future__ import annotations + +import threading +import time + +from robot import MQTTSettings, VirtualRobot +from robot.interfaces import RobotState + + +class MyTestRobot(VirtualRobot): + def setup(self) -> None: # type: ignore[override] + print("My Test Robot Started") + super().setup() + + def loop(self) -> None: # type: ignore[override] + super().loop() + if self.state == RobotState.RUN: + print("Test") + self.delay(1000) + + def communication_interrupt(self, msg: str) -> None: + print( + f"communicationInterrupt on {self.id} " + f"with msg:{msg}" + ) + + +if __name__ == "__main__": + # Configure MQTT (fill with your broker details) + MQTTSettings.server = "localhost" + MQTTSettings.port = 1883 + MQTTSettings.user_name = "" + MQTTSettings.password = "" + MQTTSettings.channel = "v1" + + robot = MyTestRobot(10, 0, 0, 90) + t = threading.Thread(target=robot.run, daemon=True) + t.start() + + # Example to send start after a short delay + time.sleep(1) + robot.start() + t.join() diff --git a/pyproject.toml b/pyproject.toml index 9615291..168bbfe 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,7 +8,6 @@ version = "0.1.0" description = "A modular robot control library" authors = [ { name = "Nuwan Jaliyagoda", email = "nuwanjaliyagoda@gmail.com" }, - { name = "Nuwan Jaliyagoda", email = "kavindumethpura@gmail.com" }, { name = "Kavindu Prabhath Methpura", email = "kavindumethpura@gmail.com" }, ] readme = "README.md" diff --git a/requirements.txt b/requirements.txt index e410884..1c5ba72 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,4 @@ -paho-mqtt==2.1.0 \ No newline at end of file +flake8==7.3.0 +packaging==25.0 +paho-mqtt==2.1.0 +pytest==8.4.2 diff --git a/src/robot/__init__.py b/src/robot/__init__.py index d27dffa..3861fb5 100644 --- a/src/robot/__init__.py +++ b/src/robot/__init__.py @@ -1,11 +1,17 @@ -"""Top-level package for robot library.""" +"""Top-level package for robot library (Python port of robot-library-java).""" from .robot_base import Robot -from .mqtt_client import RobotMqttClient from .motion import MotionController +from .virtual_robot import VirtualRobot + +# Expose common subpackages +from .configs.mqtt_settings import MQTTSettings +from .configs.robot_settings import RobotSettings __all__ = [ "Robot", - "RobotMqttClient", "MotionController", + "VirtualRobot", + "MQTTSettings", + "RobotSettings", ] diff --git a/src/robot/communication/communication.py b/src/robot/communication/communication.py new file mode 100644 index 0000000..5fbb453 --- /dev/null +++ b/src/robot/communication/communication.py @@ -0,0 +1,16 @@ +from __future__ import annotations + +from robot.interfaces import IMqttHandler +from robot.mqtt.robot_mqtt_client import RobotMqttClient + + +class Communication(IMqttHandler): + def __init__(self, robot_id: int, mqtt_client: RobotMqttClient): + self.robot_mqtt_client = mqtt_client + self.robot_id = robot_id + + def send_message(self, msg: str) -> None: # abstract + raise NotImplementedError + + def send_message_with_distance(self, msg: str, distance: int) -> None: # abstract + raise NotImplementedError diff --git a/src/robot/communication/directed_comm.py b/src/robot/communication/directed_comm.py index c0317d8..d773a77 100644 --- a/src/robot/communication/directed_comm.py +++ b/src/robot/communication/directed_comm.py @@ -1,7 +1,35 @@ -"""Directed communication module.""" +"""Directed communication module mirroring Java logic.""" +from __future__ import annotations -class DirectedCommunication: - """Placeholder directed communication class.""" +import json - pass +from robot.communication.communication import Communication +from robot.mqtt.mqtt_msg import MqttMsg +from robot.mqtt.robot_mqtt_client import RobotMqttClient + + +class DirectedCommunication(Communication): + def __init__(self, robot_id: int, mqtt_client: RobotMqttClient): + super().__init__(robot_id, mqtt_client) + self._topics_sub: dict[str, str] = {} + self._subscribe("COMMUNICATION_IN_DIR", f"comm/in/direct/{robot_id}") + + def _subscribe(self, key: str, topic: str) -> None: + self._topics_sub[key] = topic + self.robot_mqtt_client.subscribe(topic) + + def send_message(self, msg: str) -> None: + obj = {"id": self.robot_id, "msg": msg} + self.robot_mqtt_client.publish("comm/out/direct", json.dumps(obj)) + + def send_message_with_distance(self, msg: str, distance: int) -> None: + obj = {"id": self.robot_id, "msg": msg, "dist": distance} + self.robot_mqtt_client.publish("comm/out/direct", json.dumps(obj)) + + def handle_subscription(self, robot, m: MqttMsg) -> None: + topic, msg = m.topic, m.message + if topic == self._topics_sub.get("COMMUNICATION_IN_DIR"): + robot.communication_interrupt(msg) + else: + print(f"Received (unknown dir): {topic}> {msg}") diff --git a/src/robot/communication/simple_comm.py b/src/robot/communication/simple_comm.py index 7ae7394..f7217b3 100644 --- a/src/robot/communication/simple_comm.py +++ b/src/robot/communication/simple_comm.py @@ -1,7 +1,36 @@ -"""Simple communication module.""" +"""Simple communication module mirroring Java logic.""" +from __future__ import annotations -class SimpleCommunication: - """Placeholder simple communication class.""" +import json +from robot.mqtt.robot_mqtt_client import RobotMqttClient +from robot.mqtt.mqtt_msg import MqttMsg +from robot.communication.communication import Communication - pass + +class SimpleCommunication(Communication): + def __init__(self, robot_id: int, mqtt_client: RobotMqttClient): + super().__init__(robot_id, mqtt_client) + self._topics_sub: dict[str, str] = {} + self._subscribe("COMMUNICATION_IN_SIMP", f"comm/in/simple/{robot_id}") + + def _subscribe(self, key: str, topic: str) -> None: + self._topics_sub[key] = topic + self.robot_mqtt_client.subscribe(topic) + + def send_message(self, msg: str) -> None: + obj = {"id": self.robot_id, "msg": msg} + self.robot_mqtt_client.publish("comm/out/simple", json.dumps(obj)) + + def send_message_with_distance(self, msg: str, distance: int) -> None: + obj = {"id": self.robot_id, "msg": msg, "dist": distance} + self.robot_mqtt_client.publish("comm/out/simple", json.dumps(obj)) + + # IMqttHandler + def handle_subscription(self, robot, m: MqttMsg) -> None: + topic, msg = m.topic, m.message + if topic == self._topics_sub.get("COMMUNICATION_IN_SIMP"): + robot.communication_interrupt(msg) + else: + print(self._topics_sub.get("COMMUNICATION_IN_SIMP")) + print(f"Received (unknown simp): {topic}> {msg}") diff --git a/src/robot/configs/mqtt_settings.py b/src/robot/configs/mqtt_settings.py new file mode 100644 index 0000000..7c3e42e --- /dev/null +++ b/src/robot/configs/mqtt_settings.py @@ -0,0 +1,23 @@ +"""MQTT settings analogous to swarm.configs.MQTTSettings (Java). + +Set these values before starting a robot to connect to the broker. +""" + + +class MQTTSettings: + server: str | None = None + port: int = 1883 + user_name: str | None = None + password: str | None = None + channel: str | None = None + + @classmethod + def print(cls) -> None: + print(f"server: {cls.server}") + print(f"port: {cls.port}") + print(f"username: {cls.user_name}") + print(f"password: {'(set)' if cls.password else '(unset)'}") + print(f"channel: {cls.channel}") + + +__all__ = ["MQTTSettings"] diff --git a/src/robot/configs/robot_settings.py b/src/robot/configs/robot_settings.py new file mode 100644 index 0000000..9c662f6 --- /dev/null +++ b/src/robot/configs/robot_settings.py @@ -0,0 +1,16 @@ +"""Robot settings analogous to swarm.configs.RobotSettings (Java).""" + + +class RobotSettings: + ROBOT_SPEED_MAX: int = 255 + ROBOT_SPEED_MIN: int = 50 + + ROBOT_RADIUS: int = 6 # in cm + ROBOT_WIDTH: int = 12 # in cm + ROBOT_WHEEL_RADIUS: float = 3.5 # in cm + + # 0: no logs (TODO: implement levels) + ROBOT_LOG_LEVEL: int = 0 + + +__all__ = ["RobotSettings"] diff --git a/src/robot/exception/__init__.py b/src/robot/exception/__init__.py new file mode 100644 index 0000000..7c915d5 --- /dev/null +++ b/src/robot/exception/__init__.py @@ -0,0 +1,46 @@ +"""Exception types mirroring Java exceptions (slim wrappers).""" + + +class MotionControllerException(Exception): + def __init__(self, message: str): + super().__init__(message) + print(f"Motion Error: {message}") + + +class MqttClientException(Exception): + def __init__(self, message: str): + super().__init__(message) + print(f"MQTT Error: {message}") + + +class SensorException(Exception): + def __init__(self, message: str): + super().__init__(message) + print(f"Sensor Error: {message}") + + +class ProximityException(Exception): + def __init__(self, message: str): + super().__init__(message) + print(f"Proximity reading error: {message}") + + +class RGBColorException(Exception): + def __init__( + self, + R: int | None = None, + G: int | None = None, + B: int | None = None, + ): + msg = f"Invalid RGB values: R={R}, G={G}, B={B}" + super().__init__(msg) + print(msg) + + +__all__ = [ + "MotionControllerException", + "MqttClientException", + "SensorException", + "ProximityException", + "RGBColorException", +] diff --git a/src/robot/helpers/__init__.py b/src/robot/helpers/__init__.py index 76c58a1..7752bbc 100644 --- a/src/robot/helpers/__init__.py +++ b/src/robot/helpers/__init__.py @@ -2,8 +2,10 @@ from .coordinate import Coordinate from .robot_mqtt import RobotMQTT +from .motion_controller import MotionController __all__ = [ "Coordinate", "RobotMQTT", + "MotionController", ] diff --git a/src/robot/helpers/coordinate.py b/src/robot/helpers/coordinate.py index 09e5a61..c9f09a0 100644 --- a/src/robot/helpers/coordinate.py +++ b/src/robot/helpers/coordinate.py @@ -1,7 +1,108 @@ -"""Coordinate tracking helper.""" +from __future__ import annotations +from robot.interfaces import IMqttHandler +from robot.mqtt.mqtt_msg import MqttMsg +from robot.mqtt.robot_mqtt_client import RobotMqttClient -class Coordinate: - """Placeholder coordinate tracker.""" - pass +class Coordinate(IMqttHandler): + def __init__( + self, + robot_id: int, + x: float, + y: float, + heading: float, + mqtt_client: RobotMqttClient, + ): + self._x = float(x) + self._y = float(y) + self._heading = float(heading) + self._robot_id = robot_id + self.robot_mqtt_client = mqtt_client + + # subscriptions + self._topics_sub: dict[str, str] = {} + self._subscribe("ROBOT_LOCALIZATION", "localization/update/?") + + # Subscriptions ------------------------------------------------------ + def _subscribe(self, key: str, topic: str) -> None: + self._topics_sub[key] = topic + self.robot_mqtt_client.subscribe(topic) + + def handle_subscription(self, robot, message: MqttMsg) -> None: # noqa: D401 + topic = message.topic + if topic == self._topics_sub.get("ROBOT_LOCALIZATION"): + print(f"publishing the localization data of robot {robot.get_id()}") + self.publish_coordinate() + + # Getters/Setters ---------------------------------------------------- + def get_x(self) -> float: + return self._x + + def get_y(self) -> float: + return self._y + + def set_x(self, x: float) -> None: + self._x = float(x) + + def set_y(self, y: float) -> None: + self._y = float(y) + + def get_heading(self) -> float: + return self._heading + + def get_heading_rad(self) -> float: + import math + + return float(math.radians(self._heading)) + + def set_heading(self, heading: float) -> None: + self._heading = float(self._normalize_heading(heading)) + + def set_heading_rad(self, heading: float) -> None: + import math + + self.set_heading(math.degrees(heading)) + + def set_coordinate(self, x: float, y: float) -> None: + self.set_x(x) + self.set_y(y) + + def set_coordinate_heading(self, x: float, y: float, heading: float) -> None: + self.set_coordinate(x, y) + self.set_heading(heading) + + # Utilities ----------------------------------------------------------- + def __str__(self) -> str: + return ( + f"x:{self._round2(self._x)} y:{self._round2(self._y)} " + f"heading:{self._round2(self._heading)}" + ) + + def print(self) -> None: # noqa: A003 - mirror Java name + print(str(self)) + + def publish_coordinate(self) -> None: + # Keep format identical to Java using a JSON array of one object + coord = { + "id": self._robot_id, + "x": self.get_x(), + "y": self.get_y(), + "heading": self.get_heading(), + "reality": "V", + } + data = [coord] + + import json + + self.robot_mqtt_client.publish("localization/update", json.dumps(data)) + + # Internal helpers ---------------------------------------------------- + def _round2(self, v: float) -> float: + return round(v * 100) / 100.0 + + def _normalize_heading(self, heading: float) -> float: + import math + + # normalize to [-180, 180] + return heading - math.ceil(heading / 360.0 - 0.5) * 360.0 diff --git a/src/robot/helpers/motion_controller.py b/src/robot/helpers/motion_controller.py new file mode 100644 index 0000000..09df5d8 --- /dev/null +++ b/src/robot/helpers/motion_controller.py @@ -0,0 +1,174 @@ +from __future__ import annotations + +import math +import time + +from robot.configs.robot_settings import RobotSettings +from robot.exception import MotionControllerException +from robot.helpers.coordinate import Coordinate + + +class MotionController: + # This is the maximum duration allowed to coordinate calculation + _MAX_DURATION = 100 + + CM_2_MM = 10 + SEC_2_MS = 1000 + + # Additional delays for simulation time + ADDITIONAL_DELAY = 500 + + # Match cm/s speed + SPEED_FACTOR = 0.1 + + def __init__(self, coordinate: Coordinate | None = None): + if coordinate is None: + # Lightweight fallback coordinate for tests or standalone use + class _StubCoord: + def __init__(self): + self._x = 0.0 + self._y = 0.0 + self._h = 0.0 + + def get_x(self): + return self._x + + def get_y(self): + return self._y + + def get_heading(self): + return self._h + + def get_heading_rad(self): + return math.radians(self._h) + + def set_coordinate_heading(self, x, y, h): + self._x, self._y, self._h = x, y, h + + def publish_coordinate(self): + pass + + coordinate = _StubCoord() # type: ignore[assignment] + self.c = coordinate # type: ignore[assignment] + + # Wrappers ----------------------------------------------------------- + def move( + self, + left_speed: int, + right_speed: int, + duration: int | float | None = None, + ) -> None: + if duration is None: + duration = self._MAX_DURATION + self._move(left_speed, right_speed, float(duration)) + + def rotate(self, speed: int, duration: int | float | None = None) -> None: + if duration is None: + duration = self._MAX_DURATION + self._rotate(speed, float(duration)) + + def rotate_radians(self, speed: int, radians: float) -> None: + self.rotate_degree(speed, math.degrees(radians)) + + def rotate_degree(self, speed: int, degree: float) -> None: + try: + if degree == 0 or degree < -180 or degree > 180: + raise MotionControllerException( + "Degree should in range [-180, 180], except 0" + ) + if speed < RobotSettings.ROBOT_SPEED_MIN: + raise MotionControllerException( + "Speed should be greater than ROBOT_SPEED_MIN" + ) + + sign = int(degree / abs(degree)) + distance = ( + 2 * math.pi * RobotSettings.ROBOT_RADIUS * (abs(degree) / 360) + ) * self.CM_2_MM + duration = float(distance / abs(speed)) * self.SEC_2_MS + self._debug( + f"Sign: {sign} Distance: {distance} Duration: {duration}" + ) + self._rotate(sign * speed, duration) + except MotionControllerException: # noqa: F841 + pass + + # Core movement ------------------------------------------------------ + def _rotate(self, speed: int, duration: float) -> None: + self._move(speed, -1 * speed, duration) + + def _move( + self, + left_speed: int, + right_speed: int, + duration: float, + ) -> None: + if not ( + self._is_speed_in_range(left_speed) + and self._is_speed_in_range(right_speed) + ): + try: + raise MotionControllerException( + "One of the provided speeds is out of " + "range in move() function" + ) + except MotionControllerException: + return + + # step interval in ms, break the duration into slices + step_interval = 100 + cumulative_interval = 0 + steps = int(duration // step_interval) if duration > 0 else 0 + + for _ in range(steps): + dL = left_speed * self.SPEED_FACTOR * (step_interval / 1000.0) + dR = right_speed * self.SPEED_FACTOR * (step_interval / 1000.0) + d = (dL + dR) / 2.0 + h = self.c.get_heading_rad() + + x = self.c.get_x() + d * math.cos(h) + y = self.c.get_y() + d * math.sin(h) + heading = ( + self.c.get_heading_rad() + + (dR - dL) / (RobotSettings.ROBOT_WIDTH) + ) + + self.c.set_coordinate_heading(x, y, math.degrees(heading)) + + cumulative_interval += step_interval + if cumulative_interval >= self.ADDITIONAL_DELAY: + self._debug(f"Adding extra delay of {self.ADDITIONAL_DELAY}") + self._delay(self.ADDITIONAL_DELAY) + self.c.publish_coordinate() + cumulative_interval -= self.ADDITIONAL_DELAY + + # Round to nearest int + self.c.set_coordinate_heading( + round(self.c.get_x()), + round(self.c.get_y()), + round(self.c.get_heading()), + ) + self.c.publish_coordinate() + + # Helpers ------------------------------------------------------------ + def _is_speed_in_range(self, speed: int) -> bool: + if speed > 0: + return ( + RobotSettings.ROBOT_SPEED_MIN <= speed + <= RobotSettings.ROBOT_SPEED_MAX + ) + elif speed < 0: + return ( + -RobotSettings.ROBOT_SPEED_MAX + <= speed + <= -RobotSettings.ROBOT_SPEED_MIN + ) + return True # 0 acceptable + + def _delay(self, duration_ms: int) -> None: + time.sleep(max(0, duration_ms) / 1000.0) + + @staticmethod + def _debug(msg: str, level: int = 0) -> None: + if level > 0: + print(f"[DEBUG]\t{msg}") diff --git a/src/robot/helpers/robot_mqtt.py b/src/robot/helpers/robot_mqtt.py index 0af763c..e01eddd 100644 --- a/src/robot/helpers/robot_mqtt.py +++ b/src/robot/helpers/robot_mqtt.py @@ -1,7 +1,51 @@ -"""MQTT helper functions.""" +from __future__ import annotations + +import json + +from robot.mqtt.mqtt_msg import MqttMsg +from robot.mqtt.robot_mqtt_client import RobotMqttClient class RobotMQTT: - """Placeholder MQTT helper.""" + def __init__(self, robot_id: int, mqtt: RobotMqttClient, reality: str): + self.robot_mqtt_client = mqtt + self.robot_id = robot_id + self.reality = reality + + self._topics_sub: dict[str, str] = {} + self._subscribe("ROBOT_MSG", f"robot/msg/{robot_id}") + self._subscribe("ROBOT_MSG_BROADCAST", "robot/msg/broadcast") + + def robot_create(self, x: float, y: float, heading: float) -> None: + msg = { + "id": self.robot_id, + "x": x, + "y": y, + "heading": heading, + "reality": self.reality, + } + self.robot_mqtt_client.publish("robot/create", json.dumps(msg)) + + def _subscribe(self, key: str, topic: str) -> None: + self._topics_sub[key] = topic + self.robot_mqtt_client.subscribe(topic) - pass + def handle_subscription(self, robot, m: MqttMsg) -> None: + topic, msg = m.topic, m.message + if topic in ( + self._topics_sub.get("ROBOT_MSG"), + self._topics_sub.get("ROBOT_MSG_BROADCAST"), + ): + msg_topic = msg.split(" ")[0] + if msg_topic == "ID?": + obj = {"id": self.robot_id, "reality": "V"} + self.robot_mqtt_client.publish("robot/live", json.dumps(obj)) + print(f"robot/live > {json.dumps(obj)}") + elif msg_topic == "START": + robot.start() + elif msg_topic == "STOP": + robot.stop() + elif msg_topic == "RESET": + robot.reset() + else: + print(f"Received (unknown): {topic}> {msg}") diff --git a/src/robot/indicators/abstract_indicator.py b/src/robot/indicators/abstract_indicator.py new file mode 100644 index 0000000..36e2d3c --- /dev/null +++ b/src/robot/indicators/abstract_indicator.py @@ -0,0 +1,13 @@ +from __future__ import annotations + +from robot.mqtt.robot_mqtt_client import RobotMqttClient + + +class AbstractIndicator: + def __init__(self, robot, mqtt_client: RobotMqttClient): + self.robot_mqtt_client = mqtt_client + self.robot_id = robot.get_id() + self.robot = robot + + def handle_subscription(self, r, m): # must be overridden in subclasses + raise NotImplementedError diff --git a/src/robot/indicators/neopixel.py b/src/robot/indicators/neopixel.py index 0cd1144..489404a 100644 --- a/src/robot/indicators/neopixel.py +++ b/src/robot/indicators/neopixel.py @@ -1,7 +1,46 @@ -"""NeoPixel indicator controller.""" +"""NeoPixel indicator controller mirroring Java logic.""" +from __future__ import annotations -class NeoPixel: - """Placeholder NeoPixel controller.""" +import json - pass +from robot.mqtt.mqtt_msg import MqttMsg +from robot.mqtt.robot_mqtt_client import RobotMqttClient +from robot.types import RGBColorType + +from .abstract_indicator import AbstractIndicator + + +class NeoPixel(AbstractIndicator): + def __init__(self, robot, mqtt_client: RobotMqttClient): + super().__init__(robot, mqtt_client) + self._topics_sub: dict[str, str] = {} + self._subscribe("NEOPIXEL_IN", f"output/neopixel/{self.robot_id}") + + # Set the default color at beginning + self.change_color(66, 66, 66) + + def _subscribe(self, key: str, topic: str) -> None: + self._topics_sub[key] = topic + self.robot_mqtt_client.subscribe(topic) + + def handle_subscription(self, r, m: MqttMsg) -> None: + topic, msg = m.topic, m.message + if topic == self._topics_sub.get("NEOPIXEL_IN"): + colors = msg.split(" ") + R = int(colors[0]) + G = int(colors[1]) + B = int(colors[2]) + self.change_color(R, G, B) + else: + print(f"Received (unknown): {topic}> {msg}") + + def change_color(self, red: int, green: int, blue: int) -> None: + color = RGBColorType(red, green, blue) + obj = { + "id": self.robot_id, + "R": color.get_r(), + "G": color.get_g(), + "B": color.get_b(), + } + self.robot_mqtt_client.publish("output/neopixel", json.dumps(obj)) diff --git a/src/robot/interfaces/__init__.py b/src/robot/interfaces/__init__.py new file mode 100644 index 0000000..8be57b2 --- /dev/null +++ b/src/robot/interfaces/__init__.py @@ -0,0 +1,49 @@ +"""Interfaces mirroring the Java interfaces (snake_case in Python). + +This module defines protocol-like interfaces and enums to preserve the design +from the Java implementation while remaining Pythonic. +""" + +from __future__ import annotations + +from enum import Enum +from typing import TYPE_CHECKING, Protocol, runtime_checkable + +if TYPE_CHECKING: # pragma: no cover - for type hints only + from robot.mqtt.mqtt_msg import MqttMsg + from robot.robot_base import Robot + + +class RobotState(Enum): + WAIT = "WAIT" + RUN = "RUN" + BEGIN = "BEGIN" + + +@runtime_checkable +class IRobotState(Protocol): + state: RobotState + + def loop(self) -> None: ... + + def sensor_interrupt(self, sensor: str, value: str) -> None: ... + + def communication_interrupt(self, msg: str) -> None: ... + + def start(self) -> None: ... + + def stop(self) -> None: ... + + def reset(self) -> None: ... + + +@runtime_checkable +class IMqttHandler(Protocol): + def handle_subscription(self, r: "Robot", m: "MqttMsg") -> None: ... + + +__all__ = [ + "RobotState", + "IRobotState", + "IMqttHandler", +] diff --git a/src/robot/motion.py b/src/robot/motion.py index 3945773..f956ce2 100644 --- a/src/robot/motion.py +++ b/src/robot/motion.py @@ -1,7 +1,3 @@ -"""Motion controller algorithms.""" +"""Motion controller algorithms (re-export).""" - -class MotionController: - """Placeholder motion controller.""" - - pass +from .helpers.motion_controller import MotionController # noqa: F401 diff --git a/src/robot/mqtt/__init__.py b/src/robot/mqtt/__init__.py new file mode 100644 index 0000000..d7f0957 --- /dev/null +++ b/src/robot/mqtt/__init__.py @@ -0,0 +1,7 @@ +from .mqtt_msg import MqttMsg +from .robot_mqtt_client import RobotMqttClient + +__all__ = [ + "MqttMsg", + "RobotMqttClient", +] diff --git a/src/robot/mqtt/mqtt_msg.py b/src/robot/mqtt/mqtt_msg.py new file mode 100644 index 0000000..c9fee96 --- /dev/null +++ b/src/robot/mqtt/mqtt_msg.py @@ -0,0 +1,19 @@ +from __future__ import annotations + + +class MqttMsg: + _next_id = 0 + + def __init__(self, topic: str, message: str, qos: int = 0): + self.id = MqttMsg._next_id + MqttMsg._next_id += 1 + + self.topic = topic + self.message = message + self.topic_groups = topic.split("/") + self.channel = self.topic_groups[0] if len(self.topic_groups) > 1 else "" + self.qos = qos + + def __lt__(self, other: "MqttMsg") -> bool: + # Define an arbitrary but stable ordering for potential priority queues + return self.id < other.id diff --git a/src/robot/mqtt/robot_mqtt_client.py b/src/robot/mqtt/robot_mqtt_client.py new file mode 100644 index 0000000..e8d7c73 --- /dev/null +++ b/src/robot/mqtt/robot_mqtt_client.py @@ -0,0 +1,123 @@ +from __future__ import annotations + +from collections import deque +from typing import Deque + +import paho.mqtt.client as mqtt + +from robot.exception import MqttClientException +from robot.mqtt.mqtt_msg import MqttMsg + + +class RobotMqttClient: + """MQTT client wrapper mirroring Java's RobotMqttClient. + + - Maintains `in_queue` and `out_queue` of `MqttMsg`. + - Prefixes publish/subscribe topics with `MQTTSettings.channel` externally. + """ + + def __init__( + self, + server: str, + port: int, + user_name: str | None, + password: str | None, + channel: str, + ): + self.server = server + self.port = int(port) + self.user_name = user_name or "" + self.password = password or "" + self.channel = channel or "" + + self._client = mqtt.Client() + self._connected = False + + # Incoming/outgoing queues + self.in_queue: Deque[MqttMsg] = deque() + self.out_queue: Deque[MqttMsg] = deque() + + # Setup callbacks + self._client.on_connect = self._on_connect + self._client.on_message = self._on_message + self._client.on_disconnect = self._on_disconnect + + self._connect() + + # Connection management ------------------------------------------------- + def _connect(self) -> None: + try: + if self.user_name: + self._client.username_pw_set(self.user_name, self.password) + # Start network loop in a background thread + self._client.connect(self.server, self.port, keepalive=60) + self._client.loop_start() + self._connected = True + print("MQTT: Connected") + except Exception as e: + # broad but acceptable for connection bootstrapping + print(f"MQTT connection error: {e}") + self._connected = False + + def _on_connect(self, client: mqtt.Client, userdata, flags, rc): # noqa: ANN001, ANN201 + if rc == 0: + self._connected = True + else: + print(f"MQTT: Connection failed with code {rc}") + + def _on_disconnect(self, client: mqtt.Client, userdata, rc): # noqa: ANN001, ANN201 + self._connected = False + print("Connection lost!") + + # MQTT callbacks -------------------------------------------------------- + def _on_message( + self, + client: mqtt.Client, + userdata, + msg: mqtt.MQTTMessage, + ): # noqa: ANN001, ANN201 + try: + topic = msg.topic + except AttributeError: + topic = msg[0] + try: + payload = msg.payload.decode("utf-8") + except Exception: + payload = str(msg.payload) + + # Strip channel prefix before enqueueing, matching Java behavior + if "/" in topic: + t = topic[topic.find("/") + 1 :] + else: + t = topic + + if payload: + self.in_queue.append(MqttMsg(t, payload)) + + # API ------------------------------------------------------------------- + def publish( + self, topic: str, body: str, qos: int = 0, retained: bool = False + ) -> None: + if not (self._connected and topic and body): + raise MqttClientException("Not Connected or empty topic/body") + full_topic = f"{self.channel}/{topic}" if self.channel else topic + try: + self._client.publish(full_topic, body, qos=qos, retain=retained) + except Exception as e: + raise MqttClientException(str(e)) + + def subscribe(self, topic: str) -> None: + if not (self._connected and topic): + raise MqttClientException("Not Connected or empty topic") + full_topic = f"{self.channel}/{topic}" if self.channel else topic + try: + self._client.subscribe(full_topic) + print(f"Subscribed to {full_topic}") + except Exception as e: + raise MqttClientException(str(e)) + + def in_queue_pop(self) -> MqttMsg | None: + return self.in_queue.popleft() if self.in_queue else None + + def out_queue_pop(self) -> MqttMsg | None: + return self.out_queue.popleft() if self.out_queue else None diff --git a/src/robot/mqtt_client.py b/src/robot/mqtt_client.py index 4367bb4..3796230 100644 --- a/src/robot/mqtt_client.py +++ b/src/robot/mqtt_client.py @@ -1,7 +1,6 @@ -"""MQTT client wrapper for robot communication.""" +"""Compatibility re-export for RobotMqttClient. +The implementation lives in `robot.mqtt.robot_mqtt_client`. +""" -class RobotMqttClient: - """Placeholder MQTT client wrapper.""" - - pass +from .mqtt.robot_mqtt_client import RobotMqttClient # noqa: F401 diff --git a/src/robot/robot_base.py b/src/robot/robot_base.py index 54d0c4b..e971737 100644 --- a/src/robot/robot_base.py +++ b/src/robot/robot_base.py @@ -1,7 +1,180 @@ -"""Base Robot class.""" +"""Abstract Robot base class mirroring swarm.robot.Robot (Java). +Method names are provided in snake_case. +""" -class Robot: - """Base robot class providing lifecycle management.""" +from __future__ import annotations - pass +import time +from typing import Optional + +from robot.communication.directed_comm import DirectedCommunication +from robot.communication.simple_comm import SimpleCommunication +from robot.configs.mqtt_settings import MQTTSettings +from robot.helpers.coordinate import Coordinate +from robot.helpers.motion_controller import MotionController +from robot.helpers.robot_mqtt import RobotMQTT +from robot.indicators.neopixel import NeoPixel +from robot.interfaces import IRobotState, RobotState +from robot.mqtt.mqtt_msg import MqttMsg +from robot.mqtt.robot_mqtt_client import RobotMqttClient +from robot.sensors.color import ColorSensor +from robot.sensors.distance import DistanceSensor +from robot.sensors.proximity import ProximitySensor + + +class Robot(IRobotState): + # Sensors + dist_sensor: DistanceSensor + proximity_sensor: ProximitySensor + color_sensor: ColorSensor + + # Communication + simple_comm: SimpleCommunication + directed_comm: DirectedCommunication + + # Output + neo_pixel: NeoPixel + + # Helpers + motion: MotionController + robot_mqtt: RobotMQTT + coordinates: Coordinate + robot_mqtt_client: RobotMqttClient + + # Vars + id: int + reality: str + state: RobotState = RobotState.WAIT + + def __init__( + self, + id: int, + x: float, + y: float, + heading: float, + reality: str, + ): + self.id = id + self.reality = reality + + # Create mqtt client + self.robot_mqtt_client = RobotMqttClient( + MQTTSettings.server or "localhost", + int(MQTTSettings.port), + MQTTSettings.user_name or "", + MQTTSettings.password or "", + MQTTSettings.channel or "v1", + ) + + self.coordinates = Coordinate(id, x, y, heading, self.robot_mqtt_client) + self.robot_mqtt = RobotMQTT(id, self.robot_mqtt_client, reality) + + # Request simulator to create robot instance + self.robot_mqtt.robot_create( + self.coordinates.get_x(), + self.coordinates.get_y(), + self.coordinates.get_heading(), + ) + + self.delay(1000) + self.motion = MotionController(self.coordinates) + + # Lifecycle --------------------------------------------------------- + def setup(self) -> None: + # Setup each module + self.dist_sensor = DistanceSensor(self, self.robot_mqtt_client) + self.proximity_sensor = ProximitySensor(self, self.robot_mqtt_client) + self.color_sensor = ColorSensor(self, self.robot_mqtt_client) + + self.neo_pixel = NeoPixel(self, self.robot_mqtt_client) + + self.simple_comm = SimpleCommunication(self.id, self.robot_mqtt_client) + self.directed_comm = DirectedCommunication(self.id, self.robot_mqtt_client) + + self.coordinates.set_coordinate_heading( + self.coordinates.get_x(), + self.coordinates.get_y(), + self.coordinates.get_heading(), + ) + self.coordinates.publish_coordinate() + + def get_id(self) -> int: + return self.id + + def run(self) -> None: + self.setup() + while True: + begin_time = time.time() * 1000 + try: + self.loop() + except Exception as e: # noqa: BLE001 + print(e) + end_time = time.time() * 1000 + + # Maintain ~1Hz loop rate as in Java implementation + self.delay(int(max(0, 1000 - (end_time - begin_time)))) + + try: + self.handle_subscribe_queue() + except Exception as e: # noqa: BLE001 + print(e) + + # Subscription handler ---------------------------------------------- + def handle_subscribe_queue(self) -> None: + while self.robot_mqtt_client.in_queue: + m: Optional[MqttMsg] = self.robot_mqtt_client.in_queue_pop() + if not m: + continue + tg0 = m.topic_groups[0] if m.topic_groups else "" + if tg0 == "robot": + self.robot_mqtt.handle_subscription(self, m) + elif tg0 == "sensor": + if len(m.topic_groups) > 1: + if m.topic_groups[1] == "distance": + self.dist_sensor.handle_subscription(self, m) + elif m.topic_groups[1] == "color": + self.color_sensor.handle_subscription(self, m) + elif m.topic_groups[1] == "proximity": + self.proximity_sensor.handle_subscription(self, m) + elif tg0 == "localization": + if m.topic == "localization/update/?": + self.coordinates.handle_subscription(self, m) + elif tg0 == "comm": + if len(m.topic_groups) > 2 and m.topic_groups[2] == "simple": + self.simple_comm.handle_subscription(self, m) + else: + self.directed_comm.handle_subscription(self, m) + elif tg0 == "output": + if len(m.topic_groups) > 1 and m.topic_groups[1] == "NeoPixel": + self.neo_pixel.handle_subscription(self, m) + + # State handlers ----------------------------------------------------- + def start(self) -> None: + print(f"Robot start on {self.id}") + self.state = RobotState.RUN + + def stop(self) -> None: + print(f"Robot stop on {self.id}") + self.state = RobotState.WAIT + + def reset(self) -> None: + print(f"Robot reset on {self.id}") + self.state = RobotState.BEGIN + + # Utility ------------------------------------------------------------ + def delay(self, milliseconds: int) -> None: + try: + time.sleep(max(0, milliseconds) / 1000.0) + except Exception: + pass + + # Abstracts to implement in subclasses ------------------------------ + def loop(self) -> None: + pass + + def sensor_interrupt(self, sensor: str, value: str) -> None: + pass + + def communication_interrupt(self, msg: str) -> None: + pass diff --git a/src/robot/sensors/abstract_sensor.py b/src/robot/sensors/abstract_sensor.py new file mode 100644 index 0000000..08d12b4 --- /dev/null +++ b/src/robot/sensors/abstract_sensor.py @@ -0,0 +1,14 @@ +from __future__ import annotations + +from robot.interfaces import IMqttHandler +from robot.mqtt.robot_mqtt_client import RobotMqttClient + + +class AbstractSensor(IMqttHandler): + def __init__(self, robot, mqtt_client: RobotMqttClient): + self.robot_mqtt_client = mqtt_client + self.robot_id = robot.get_id() + self.robot = robot + + def handle_subscription(self, r, m): # default no-op + pass diff --git a/src/robot/sensors/color.py b/src/robot/sensors/color.py index 94fe0be..24df54f 100644 --- a/src/robot/sensors/color.py +++ b/src/robot/sensors/color.py @@ -1,7 +1,67 @@ -"""Color sensor implementation.""" +"""Color sensor implementation mirroring Java logic.""" +from __future__ import annotations -class ColorSensor: - """Placeholder color sensor.""" +import json +from time import time - pass +from robot.exception import SensorException +from robot.mqtt.mqtt_msg import MqttMsg +from robot.mqtt.robot_mqtt_client import RobotMqttClient +from robot.types import RGBColorType + +from .abstract_sensor import AbstractSensor + + +class ColorSensor(AbstractSensor): + MQTT_TIMEOUT = 1000 # ms + + def __init__(self, robot, mqtt_client: RobotMqttClient): + super().__init__(robot, mqtt_client) + self._topics_sub: dict[str, str] = {} + self.color = RGBColorType(0, 0, 0) + self._subscribe("COLOR_IN", f"sensor/color/{self.robot_id}") + self._subscribe("COLOR_LOOK", f"sensor/color/{self.robot_id}/?") + self._col_lock = False + + def _subscribe(self, key: str, topic: str) -> None: + self._topics_sub[key] = topic + self.robot_mqtt_client.subscribe(topic) + + def handle_subscription(self, robot, m: MqttMsg) -> None: + topic, msg = m.topic, m.message + if topic == self._topics_sub.get("COLOR_IN"): + self.color.set_color_from_str(msg) + self._col_lock = False + else: + print(f"Received (unknown): {topic}> {m}") + + def get_color(self) -> RGBColorType: + msg = {"id": self.robot_id, "reality": "M"} + self._col_lock = True + self.robot_mqtt_client.publish("sensor/color", json.dumps(msg)) + self.robot.delay(250) + + start_time = time() * 1000 + timeout = False + while self._col_lock and not timeout: + try: + self.robot.handle_subscribe_queue() + except Exception as e: # noqa: BLE001 + print(e) + self.robot.delay(100) + timeout = time() * 1000 - start_time > self.MQTT_TIMEOUT + + if timeout: + raise SensorException("Color sensor timeout") + return self.color + + def send_color(self, red: int, green: int, blue: int, ambient: int) -> None: + obj = { + "id": self.robot_id, + "R": red, + "G": green, + "B": blue, + "ambient": ambient, + } + self.robot_mqtt_client.publish("sensor/color/", json.dumps(obj)) diff --git a/src/robot/sensors/distance.py b/src/robot/sensors/distance.py index 3c8c035..1d9b097 100644 --- a/src/robot/sensors/distance.py +++ b/src/robot/sensors/distance.py @@ -1,7 +1,63 @@ -"""Distance sensor implementation.""" +"""Distance sensor implementation mirroring Java logic.""" +from __future__ import annotations -class DistanceSensor: - """Placeholder distance sensor.""" +import json +from time import time - pass +from robot.mqtt.robot_mqtt_client import RobotMqttClient +from robot.mqtt.mqtt_msg import MqttMsg +from robot.exception import SensorException +from .abstract_sensor import AbstractSensor + + +class DistanceSensor(AbstractSensor): + MQTT_TIMEOUT = 1000 # ms + + def __init__(self, robot, mqtt_client: RobotMqttClient): + super().__init__(robot, mqtt_client) + self._topics_sub: dict[str, str] = {} + self._subscribe("DISTANCE_IN", f"sensor/distance/{self.robot_id}") + self._subscribe("DISTANCE_LOOK", f"sensor/distance/{self.robot_id}/?") + self._dist_lock = False + self._dist_value = 0 + + def _subscribe(self, key: str, topic: str) -> None: + self._topics_sub[key] = topic + self.robot_mqtt_client.subscribe(topic) + + def handle_subscription(self, robot, m: MqttMsg) -> None: + topic, msg = m.topic, m.message + if topic == self._topics_sub.get("DISTANCE_IN"): + if msg == "Infinity": + self._dist_value = -1 + else: + self._dist_value = int(msg) + self._dist_lock = False + else: + print(f"Received (unknown): {topic}> {msg}") + + def get_distance(self) -> float: + msg = {"id": self.robot_id, "reality": "M"} + self._dist_lock = True + self.robot_mqtt_client.publish("sensor/distance", json.dumps(msg)) + self.robot.delay(250) + + start_time = time() * 1000 + timeout = False + while self._dist_lock and not timeout: + try: + self.robot.handle_subscribe_queue() + except Exception as e: # noqa: BLE001 + print(e) + self.robot.delay(100) + timeout = (time() * 1000 - start_time > self.MQTT_TIMEOUT) + + if timeout: + raise SensorException("Distance sensor timeout") + + return float(self._dist_value) + + def send_distance(self, dist: float) -> None: + obj = {"id": self.robot_id, "dist": dist} + self.robot_mqtt_client.publish("sensor/distance/", json.dumps(obj)) diff --git a/src/robot/sensors/proximity.py b/src/robot/sensors/proximity.py index 683a83d..8adf3a5 100644 --- a/src/robot/sensors/proximity.py +++ b/src/robot/sensors/proximity.py @@ -1,7 +1,80 @@ -"""Proximity sensor implementation.""" +"""Proximity sensor implementation mirroring Java logic.""" +from __future__ import annotations -class ProximitySensor: - """Placeholder proximity sensor.""" +import json +from time import time - pass +from robot.exception import SensorException +from robot.mqtt.mqtt_msg import MqttMsg +from robot.mqtt.robot_mqtt_client import RobotMqttClient +from robot.types import ProximityReadingType + +from .abstract_sensor import AbstractSensor + + +class ProximitySensor(AbstractSensor): + MQTT_TIMEOUT = 2000 # ms + + def __init__(self, robot, mqtt_client: RobotMqttClient): + super().__init__(robot, mqtt_client) + self._topics_sub: dict[str, str] = {} + self._subscribe("PROXIMITY_IN", f"sensor/proximity/{self.robot_id}") + self._proximity_lock = False + self._proximity: ProximityReadingType | None = None + self._angles: list[int] = [0] + + def __init__with_angles( + self, robot, angles: list[int], mqtt_client: RobotMqttClient + ): # helper to mirror overload + super().__init__(robot, mqtt_client) + self._topics_sub = {} + self._angles = list(angles) + self._subscribe("PROXIMITY_IN", f"sensor/proximity/{self.robot_id}") + self._proximity_lock = False + self._proximity = None + + def _subscribe(self, key: str, topic: str) -> None: + self._topics_sub[key] = topic + self.robot_mqtt_client.subscribe(topic) + + def handle_subscription(self, robot, m: MqttMsg) -> None: + topic, msg = m.topic, m.message + if topic == self._topics_sub.get("PROXIMITY_IN"): + try: + self._proximity = ProximityReadingType(self._angles, msg) + except Exception as e: # noqa: BLE001 + print(e) + self._proximity_lock = False + else: + print(f"Received (unknown): {topic}> {msg}") + + def set_angles(self, proximity_angles: list[int]) -> None: + self._angles = list(proximity_angles) + + def get_proximity(self) -> ProximityReadingType: + angle_array = list(self._angles) + msg = {"id": self.robot_id, "angles": angle_array, "reality": "V"} + self._proximity_lock = True + self.robot_mqtt_client.publish("sensor/proximity", json.dumps(msg)) + self.robot.delay(250 * len(self._angles)) + + start_time = time() * 1000 + timeout = False + while self._proximity_lock and not timeout: + try: + self.robot.handle_subscribe_queue() + except Exception as e: # noqa: BLE001 + print(e) + self.robot.delay(100) + timeout = time() * 1000 - start_time > self.MQTT_TIMEOUT + + if timeout: + raise SensorException("Proximity sensor timeout") + assert self._proximity is not None + return self._proximity + + def send_proximity(self) -> None: + assert self._proximity is not None + obj = {"id": self.robot_id, "proximity": str(self._proximity)} + self.robot_mqtt_client.publish("sensor/proximity/", json.dumps(obj)) diff --git a/src/robot/types/__init__.py b/src/robot/types/__init__.py new file mode 100644 index 0000000..0111fa1 --- /dev/null +++ b/src/robot/types/__init__.py @@ -0,0 +1,7 @@ +from .proximity_reading_type import ProximityReadingType +from .rgb_color_type import RGBColorType + +__all__ = [ + "RGBColorType", + "ProximityReadingType", +] diff --git a/src/robot/types/proximity_reading_type.py b/src/robot/types/proximity_reading_type.py new file mode 100644 index 0000000..e36345a --- /dev/null +++ b/src/robot/types/proximity_reading_type.py @@ -0,0 +1,46 @@ +from __future__ import annotations + +from robot.exception import ProximityException + +from .rgb_color_type import RGBColorType + + +class ProximityReadingType: + def __init__(self, angles: list[int] | tuple[int, ...], s: str): + reading_count = len(angles) + values = s.split() + + self._distances: list[int] = [0] * reading_count + self._colors: list[RGBColorType] = [ + RGBColorType(0, 0, 0) for _ in range(reading_count) + ] + + if len(values) != reading_count * 2: + raise ProximityException( + f"ProximityReadingType: length mismatch {len(values)}" + ) + + for i in range(reading_count): + vi = values[i] + if vi == "Infinity": + print(f"Proximity: Infinity reading received for {i}") + self._distances[i] = -1 + else: + self._distances[i] = int(vi) + + color = RGBColorType(0, 0, 0) + color.set_color_from_hex_code(values[reading_count + i]) + self._colors[i] = color + + def distances(self) -> list[int]: + return self._distances + + def colors(self) -> list[RGBColorType]: + return self._colors + + def __str__(self) -> str: + parts: list[str] = [] + parts.extend(str(d) for d in self._distances) + parts.append(" ") + parts.extend(str(c) for c in self._colors) + return " ".join(parts) diff --git a/src/robot/types/rgb_color_type.py b/src/robot/types/rgb_color_type.py new file mode 100644 index 0000000..412183f --- /dev/null +++ b/src/robot/types/rgb_color_type.py @@ -0,0 +1,81 @@ +from __future__ import annotations + +from typing import Iterable + +from robot.exception import RGBColorException + + +class RGBColorType: + def __init__( + self, + R: int | str | Iterable[int], + G: int | None = None, + B: int | None = None, + ): + # Overloads similar to Java constructors + if isinstance(R, str): + # "R G B" format or hex code like "#00AAFF" + s = R.strip() + if s.startswith("#"): + self.set_color_from_hex_code(s) + else: + self.set_color_from_str(s) + elif G is None and B is None and not isinstance(R, int): + vals = list(R) + if not (len(vals) == 3 or len(vals) == 4): + raise ValueError( + "length of the color[] should be equal to 3 (ambient ignored)" + ) + self.set_color(vals[0], vals[1], vals[2]) + else: + assert G is not None and B is not None + self.set_color(R, G, B) + + def set_color_from_str(self, s: str) -> None: + parts = s.split() + if not (len(parts) == 3 or len(parts) == 4): + raise RGBColorException() + self.set_color(int(parts[0]), int(parts[1]), int(parts[2])) + + def set_color_from_hex_code(self, hex_code: str) -> None: + # Expecting format like "#RRGGBB" + self.R = self._validate(int(hex_code[1:3], 16)) + self.G = self._validate(int(hex_code[3:5], 16)) + self.B = self._validate(int(hex_code[5:7], 16)) + + def set_color(self, R: int, G: int, B: int) -> None: + if not (0 <= R <= 255): + raise RGBColorException(R, G, B) + if not (0 <= G <= 255): + raise RGBColorException(R, G, B) + if not (0 <= B <= 255): + raise RGBColorException(R, G, B) + + self.R = R + self.G = G + self.B = B + + def get_r(self) -> int: + return self.R + + def get_g(self) -> int: + return self.G + + def get_b(self) -> int: + return self.B + + def get_color(self) -> list[int]: + return [self.R, self.G, self.B] + + def __str__(self) -> str: + return f"R:{self.R}, G:{self.G}, B:{self.B}" + + def to_string_color(self) -> str: + return f"{self.R} {self.G} {self.B}" + + def compare_to(self, color: "RGBColorType") -> bool: + return ( + (color.get_r() == self.R) + and (color.get_g() == self.G) + and (color.get_b() == self.B) + ) diff --git a/src/robot/virtual_robot.py b/src/robot/virtual_robot.py new file mode 100644 index 0000000..25baa29 --- /dev/null +++ b/src/robot/virtual_robot.py @@ -0,0 +1,26 @@ +from __future__ import annotations + +from .robot_base import Robot + + +class VirtualRobot(Robot): + def __init__(self, id: int, x: float, y: float, heading: float): + super().__init__(id, x, y, heading, "V") + + def loop(self) -> None: + # To be implemented by user subclasses + pass + + def sensor_interrupt(self, sensor: str, value: str) -> None: + if sensor == "distance": + print(f"Distance sensor interrupt on {self.id}with value{value}") + elif sensor == "color": + print(f"Color sensor interrupt on {self.id}with value{value}") + elif sensor == "proximity": + print(f"Proximity sensor interrupt on {self.id}with value{value}") + else: + print("Unknown sensor type") + + def communication_interrupt(self, msg: str) -> None: + # To be implemented in subclass if desired + pass