Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dimos/hardware/manipulators/xarm/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2025-2026 Dimensional Inc.
# Copyright 2025 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down
11 changes: 1 addition & 10 deletions dimos/hardware/manipulators/xarm/components/gripper_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,9 @@
- Robotiq gripper
"""

from typing import TYPE_CHECKING, Any

from dimos.core import rpc
from dimos.utils.logging_config import setup_logger

if TYPE_CHECKING:
from xarm.wrapper import XArmAPI

logger = setup_logger()


Expand All @@ -42,10 +37,6 @@ class GripperControlComponent:
- self.config: XArmDriverConfig instance
"""

# Type hints for attributes expected from parent class
arm: "XArmAPI"
config: Any # Config dict accessed as object (dict with attribute access)

# =========================================================================
# Standard xArm Gripper
# =========================================================================
Expand Down Expand Up @@ -314,7 +305,7 @@ def robotiq_close(
return (-1, str(e))

@rpc
def robotiq_get_status(self) -> tuple[int, dict[str, Any] | None]:
def robotiq_get_status(self) -> tuple[int, dict | None]:
"""Get Robotiq gripper status."""
try:
ret = self.arm.robotiq_get_status()
Expand Down
9 changes: 0 additions & 9 deletions dimos/hardware/manipulators/xarm/components/kinematics.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,9 @@
- Inverse kinematics
"""

from typing import TYPE_CHECKING, Any

from dimos.core import rpc
from dimos.utils.logging_config import setup_logger

if TYPE_CHECKING:
from xarm.wrapper import XArmAPI

logger = setup_logger()


Expand All @@ -40,10 +35,6 @@ class KinematicsComponent:
- self.config: XArmDriverConfig instance
"""

# Type hints for attributes expected from parent class
arm: "XArmAPI"
config: Any # Config dict accessed as object (dict with attribute access)

@rpc
def get_inverse_kinematics(self, pose: list[float]) -> tuple[int, list[float] | None]:
"""
Expand Down
11 changes: 0 additions & 11 deletions dimos/hardware/manipulators/xarm/components/motion_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,10 @@
"""

import math
import threading
from typing import TYPE_CHECKING, Any

from dimos.core import rpc
from dimos.utils.logging_config import setup_logger

if TYPE_CHECKING:
from xarm.wrapper import XArmAPI

logger = setup_logger()


Expand All @@ -46,12 +41,6 @@ class MotionControlComponent:
- self._joint_cmd_: Optional[list[float]]
"""

# Type hints for attributes expected from parent class
arm: "XArmAPI"
config: Any # Config dict accessed as object (dict with attribute access)
_joint_cmd_lock: threading.Lock
_joint_cmd_: list[float] | None

@rpc
def set_joint_angles(self, angles: list[float]) -> tuple[int, str]:
"""
Expand Down
97 changes: 85 additions & 12 deletions dimos/hardware/manipulators/xarm/components/state_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,12 @@
- Firmware version
"""

import threading
from typing import TYPE_CHECKING, Any
from typing import Optional

from dimos.core import rpc
from dimos.msgs.sensor_msgs import JointState, RobotState
from dimos.utils.logging_config import setup_logger

if TYPE_CHECKING:
from xarm.wrapper import XArmAPI

logger = setup_logger()


Expand All @@ -47,13 +43,6 @@ class StateQueryComponent:
- self._robot_state_: Optional[RobotState]
"""

# Type hints for attributes expected from parent class
arm: "XArmAPI"
config: Any # Config dict accessed as object (dict with attribute access)
_joint_state_lock: threading.Lock
_joint_states_: JointState | None
_robot_state_: RobotState | None

@rpc
def get_joint_state(self) -> JointState | None:
"""
Expand Down Expand Up @@ -183,3 +172,87 @@ def get_ft_sensor_mode(self) -> tuple[int, int | None]:
return (code, mode if code == 0 else None)
except Exception:
return (-1, None)

@rpc
def get_servo_angle(self) -> tuple[int, list[float] | None]:
"""Get joint angles."""
try:
code, angles = self.arm.get_servo_angle(is_radian=self.config.is_radian)
return (code, list(angles) if code == 0 else None)
except Exception as e:
logger.error(f"get_servo_angle failed: {e}")
return (-1, None)

@rpc
def get_position_aa(self) -> tuple[int, list[float] | None]:
"""Get TCP position in axis-angle format."""
try:
code, position = self.arm.get_position_aa(is_radian=self.config.is_radian)
return (code, list(position) if code == 0 else None)
except Exception as e:
logger.error(f"get_position_aa failed: {e}")
return (-1, None)

# =========================================================================
# Robot State Queries
# =========================================================================

@rpc
def get_state(self) -> tuple[int, int | None]:
"""Get robot state (0=ready, 3=pause, 4=stop)."""
try:
code, state = self.arm.get_state()
return (code, state if code == 0 else None)
except Exception:
return (-1, None)

@rpc
def get_cmdnum(self) -> tuple[int, int | None]:
"""Get command queue length."""
try:
code, cmdnum = self.arm.get_cmdnum()
return (code, cmdnum if code == 0 else None)
except Exception:
return (-1, None)

@rpc
def get_err_warn_code(self) -> tuple[int, list[int] | None]:
"""Get error and warning codes."""
try:
err_warn = [0, 0]
code = self.arm.get_err_warn_code(err_warn)
return (code, err_warn if code == 0 else None)
except Exception:
return (-1, None)

# =========================================================================
# Force/Torque Sensor Queries
# =========================================================================

@rpc
def get_ft_sensor_data(self) -> tuple[int, list[float] | None]:
"""Get force/torque sensor data [fx, fy, fz, tx, ty, tz]."""
try:
code, ft_data = self.arm.get_ft_sensor_data()
return (code, list(ft_data) if code == 0 else None)
except Exception as e:
logger.error(f"get_ft_sensor_data failed: {e}")
return (-1, None)

@rpc
def get_ft_sensor_error(self) -> tuple[int, int | None]:
"""Get FT sensor error code."""
try:
code, error = self.arm.get_ft_sensor_error()
return (code, error if code == 0 else None)
except Exception:
return (-1, None)

@rpc
def get_ft_sensor_mode(self) -> tuple[int, int | None]:
"""Get FT sensor application mode."""
try:
code, mode = self.arm.get_ft_sensor_app_get()
return (code, mode if code == 0 else None)
except Exception:
return (-1, None)
16 changes: 0 additions & 16 deletions dimos/hardware/manipulators/xarm/components/system_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,9 @@
- Emergency stop
"""

from typing import TYPE_CHECKING, Any, Protocol

from dimos.core import rpc
from dimos.utils.logging_config import setup_logger

if TYPE_CHECKING:
from xarm.wrapper import XArmAPI

class XArmConfig(Protocol):
"""Protocol for XArm configuration."""

is_radian: bool
velocity_control: bool


logger = setup_logger()


Expand All @@ -49,10 +37,6 @@ class SystemControlComponent:
- self.config: XArmDriverConfig instance
"""

# Type hints for attributes expected from parent class
arm: "XArmAPI"
config: Any # Should be XArmConfig but accessed as dict

@rpc
def enable_servo_mode(self) -> tuple[int, str]:
"""
Expand Down
7 changes: 4 additions & 3 deletions dimos/hardware/manipulators/xarm/spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@
from typing import Protocol

from dimos.core import In, Out
from dimos.msgs.geometry_msgs import WrenchStamped
from dimos.msgs.sensor_msgs import JointCommand, JointState
from dimos.msgs.geometry_msgs import PoseStamped, Twist, WrenchStamped
from dimos.msgs.nav_msgs import Path
from dimos.msgs.sensor_msgs import JointCommand, JointState, RobotState


@dataclass
class RobotState:
"""Custom message containing full robot state (deprecated - use RobotStateMsg)."""
"""Custom message containing full robot state."""

state: int = 0 # Robot state (0: ready, 3: paused, 4: stopped, etc.)
mode: int = 0 # Control mode (0: position, 1: servo, 4: joint velocity, 5: cartesian velocity)
Expand Down
9 changes: 7 additions & 2 deletions dimos/hardware/manipulators/xarm/xarm_blueprints.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
from dimos.hardware.manipulators.xarm.xarm_driver import xarm_driver as xarm_driver_blueprint
from dimos.manipulation.control import cartesian_motion_controller, joint_trajectory_controller
from dimos.msgs.geometry_msgs import PoseStamped
from dimos.msgs.sensor_msgs import (
from dimos.msgs.sensor_msgs import ( # type: ignore[attr-defined]
JointCommand,
JointState,
RobotState,
Expand All @@ -57,6 +57,7 @@ def xarm_driver(**config: Any) -> Any:
- has_force_torque: Whether F/T sensor is attached (default: False)
- control_rate: Control loop + joint feedback rate in Hz (default: 100)
- monitor_rate: Robot state monitoring rate in Hz (default: 10)
- connection_type: "hardware" for real XArm or "sim" for simulation (default: "hardware")

Returns:
Blueprint configuration for XArmDriver
Expand All @@ -68,6 +69,7 @@ def xarm_driver(**config: Any) -> Any:
config.setdefault("has_force_torque", False)
config.setdefault("control_rate", 100)
config.setdefault("monitor_rate", 10)
config.setdefault("connection_type", "hardware")

# Return the xarm_driver blueprint with the config
return xarm_driver_blueprint(**config)
Expand Down Expand Up @@ -165,8 +167,9 @@ def xarm_driver(**config: Any) -> Any:
dof=6, # XArm6
has_gripper=False,
has_force_torque=False,
control_rate=500,
control_rate=100,
monitor_rate=10,
connection_type="hardware",
),
joint_trajectory_controller(
control_frequency=100.0,
Expand Down Expand Up @@ -196,6 +199,7 @@ def xarm_driver(**config: Any) -> Any:
has_force_torque=False,
control_rate=100,
monitor_rate=10,
connection_type="hardware",
),
joint_trajectory_controller(
control_frequency=100.0,
Expand Down Expand Up @@ -226,6 +230,7 @@ def xarm_driver(**config: Any) -> Any:
has_force_torque=False,
control_rate=100,
monitor_rate=10,
connection_type="hardware",
),
cartesian_motion_controller(
control_frequency=20.0,
Expand Down
1 change: 1 addition & 0 deletions dimos/robot/all_blueprints.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
"xarm7-servo": "dimos.hardware.manipulators.xarm.xarm_blueprints:xarm7_servo",
"xarm-cartesian": "dimos.hardware.manipulators.xarm.xarm_blueprints:xarm_cartesian",
"xarm-trajectory": "dimos.hardware.manipulators.xarm.xarm_blueprints:xarm_trajectory",
"xarm7-trajectory-sim": "dimos.hardware.manipulators.xarm.xarm_blueprints:xarm7_trajectory_sim",
# Piper manipulator blueprints
"piper-servo": "dimos.hardware.manipulators.piper.piper_blueprints:piper_servo",
"piper-cartesian": "dimos.hardware.manipulators.piper.piper_blueprints:piper_cartesian",
Expand Down
19 changes: 19 additions & 0 deletions dimos/simulation/manipulators/mujoco_sim/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Copyright 2025 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""MuJoCo simulation bridge base classes and utilities."""

from dimos.simulation.manipulators.mujoco_sim.bridge_base import MujocoSimBridgeBase

__all__ = ["MujocoSimBridgeBase"]
Loading