62 changes: 45 additions & 17 deletions dimos/agents/skills/navigation.py
@@ -52,6 +52,7 @@ class NavigationSkillContainer(SkillModule):
"WavefrontFrontierExplorer.stop_exploration",
"WavefrontFrontierExplorer.explore",
"WavefrontFrontierExplorer.is_exploration_active",
"ObjectDBModule.lookup",
]

color_image: In[Image]
@@ -93,7 +94,7 @@ def tag_location(self, location_name: str) -> str:

if not self._skill_started:
raise ValueError(f"{self} has not been started.")
tf = self.tf.get("map", "base_link", time_tolerance=2.0)
tf = self.tf.get("world", "base_link", time_tolerance=2.0)
Contributor: This was a breaking change that I think was never tested or caught.

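A possible mitigation, sketched only to illustrate the idea (it relies on the `if not tf` pattern already used below; `_get_base_transform` itself is a hypothetical helper, not part of this PR):

def _get_base_transform(self, time_tolerance: float = 2.0):
    # Prefer the renamed "world" frame, but fall back to "map" so deployments
    # that still publish the old frame keep working during the transition.
    for frame in ("world", "map"):
        tf = self.tf.get(frame, "base_link", time_tolerance=time_tolerance)
        if tf:
            return tf
    return None
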
if not tf:
return "Could not get the robot's current transform."

@@ -114,37 +115,64 @@ def tag_location(self, location_name: str) -> str:
return f"Tagged '{location_name}': ({position.x},{position.y})."

@skill()
def navigate_with_text(self, query: str) -> str:
"""Navigate to a location by querying the existing semantic map using natural language.

First attempts to locate an object in the robot's camera view using vision.
If the object is found, navigates to it. If not, falls back to querying the
semantic map for a location matching the description.
CALL THIS SKILL FOR ONE SUBJECT AT A TIME. For example: "Go to the person wearing a blue shirt in the living room",
you should call this skill twice, once for the person wearing a blue shirt and once for the living room.
def navigate_to_detected_object(self, object_name: str) -> str:
Contributor: object_name can't be a plain string; it needs a proper type.

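One possible shape for a structured query, shown only as a sketch (`DetectedObjectQuery` is hypothetical and not part of this PR; the skill signature would take it instead of a bare string):

from dataclasses import dataclass

@dataclass
class DetectedObjectQuery:
    # Hypothetical typed query for detected objects; not part of this PR.
    name: str                    # e.g. "red coffee mug"
    color: str | None = None     # optional attribute filter
    min_confidence: float = 0.5  # ignore detections below this confidence
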
"""Navigate to an object detected by the vision system.

Use this skill to navigate to specific objects that have been detected
by the robot's cameras, such as:
- "Navigate to person in white"
- "Go to red coffee mug"
- "Move to the wooden chair"

Args:
query: Text query to search for in the semantic map
object_name: Description or name of the object to navigate to

Returns:
Status message indicating success or failure
"""
if not self._skill_started:
raise ValueError(f"{self} has not been started.")

lookup_rpc = self.get_rpc_calls("ObjectDBModule.lookup")
Contributor: @paul-nechifor we need to fix this RPC setup with modules; it's super weird and hard-coded.

objects = lookup_rpc(object_name)

if not objects:
return f"No objects found matching '{object_name}'"

obj = objects[0]

goal_pose = PoseStamped(
position=obj.pose.position,
orientation=Quaternion(),
Contributor: Let's calculate goal_pose via a separate method. What we have here is really a goal point, which is just a position. Computing goal_pose requires /global_map, /odom, and the /goal_point: pick the closest point within some buffer (to account for the robot's size) that is not in collision, then point the right way using trig.

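A rough sketch of what such a helper could look like, purely illustrative (it assumes an occupancy-grid handle with an `is_occupied(x, y)` query and the robot's current position from odometry; `_compute_goal_pose` and its parameters are hypothetical, not part of this PR):

import math

def _compute_goal_pose(self, goal_point, robot_position, global_map, buffer: float = 0.5) -> PoseStamped:
    # Sample standoff candidates on a circle of radius `buffer` around the goal
    # and keep the collision-free one closest to the robot's current position.
    best = None
    for i in range(36):
        angle = 2.0 * math.pi * i / 36
        x = goal_point.x + buffer * math.cos(angle)
        y = goal_point.y + buffer * math.sin(angle)
        if global_map.is_occupied(x, y):  # assumed occupancy-grid query
            continue
        dist = math.hypot(x - robot_position.x, y - robot_position.y)
        if best is None or dist < best[0]:
            best = (dist, x, y)
    if best is None:
        x, y = goal_point.x, goal_point.y  # no free standoff point; use the raw goal
    else:
        _, x, y = best
    # Face the object: yaw from the chosen standoff point toward the goal point.
    yaw = math.atan2(goal_point.y - y, goal_point.x - x)
    return PoseStamped(
        position=make_vector3(x, y, 0),
        orientation=Quaternion.from_euler(make_vector3(0, 0, yaw)),
        frame_id="world",
    )
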
frame_id=obj.pose.frame_id
)

result = self._navigate_to(goal_pose)

if result:
return f"Successfully navigated to '{obj.name}'"
else:
return f"Failed to reach '{obj.name}'"

#@skill()
def navigate_with_text(self, query: str) -> str:
Contributor: Suggested change (restore the @skill() decorator):

@skill()
def navigate_with_text(self, query: str) -> str:

"""Navigate to a location by querying the existing semantic map..."""
if not self._skill_started:
raise ValueError(f"{self} has not been started.")

success_msg = self._navigate_by_tagged_location(query)
if success_msg:
return success_msg

logger.info(f"No tagged location found for {query}")
Contributor: Revert this.

success_msg = self._navigate_to_object(query)
if success_msg:
return success_msg

logger.info(f"No object in view found for {query}")

success_msg = self._navigate_using_semantic_map(query)
if success_msg:
return success_msg

return f"No tagged location called '{query}'. No object in view matching '{query}'. No matching location found in semantic map for '{query}'."
return f"Could not find '{query}' using any method"

def _navigate_by_tagged_location(self, query: str) -> str | None:
try:
@@ -162,7 +190,7 @@ def _navigate_by_tagged_location(self, query: str) -> str | None:
goal_pose = PoseStamped(
position=make_vector3(*robot_location.position),
orientation=Quaternion.from_euler(Vector3(*robot_location.rotation)),
frame_id="map",
Contributor: Bug fix from the frame rename.

frame_id="world",
)

result = self._navigate_to(goal_pose)
@@ -393,7 +421,7 @@ def _get_goal_pose_from_result(self, result: dict[str, Any]) -> PoseStamped | None:
return PoseStamped(
position=make_vector3(pos_x, pos_y, 0),
orientation=Quaternion.from_euler(make_vector3(0, 0, theta)),
frame_id="map",
frame_id="world",
)


6 changes: 3 additions & 3 deletions dimos/agents_deprecated/memory/spatial_vector_db.py
@@ -132,7 +132,7 @@ def add_image_vector(
ids=[vector_id], embeddings=[embedding.tolist()], metadatas=[metadata]
)

logger.info(f"Added image vector {vector_id} with metadata: {metadata}")
#logger.info(f"Added image vector {vector_id} with metadata: {metadata}")
Contributor: Revert this.

def query_by_embedding(self, embedding: np.ndarray, limit: int = 5) -> list[dict]: # type: ignore[type-arg]
"""
@@ -225,8 +225,8 @@ def _process_query_results(self, results) -> list[dict]: # type: ignore[no-unty
)

# Get the image from visual memory
image = self.visual_memory.get(lookup_id)
result["image"] = image
#image = self.visual_memory.get(lookup_id)
#result["image"] = image

processed_results.append(result)
