diff --git a/topological_navigation/launch/restriction_handling_example.launch.xml b/topological_navigation/launch/restriction_handling_example.launch.xml new file mode 100755 index 00000000..cc5cd09b --- /dev/null +++ b/topological_navigation/launch/restriction_handling_example.launch.xml @@ -0,0 +1,57 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/topological_navigation/launch/restrictions.launch.py b/topological_navigation/launch/restrictions.launch.py new file mode 100644 index 00000000..1833ee85 --- /dev/null +++ b/topological_navigation/launch/restrictions.launch.py @@ -0,0 +1,72 @@ +# -*- coding: utf-8 -*- +#! /usr/bin/env python3 +# ---------------------------------- +# @author: jheselden +# @email: jheselden@lincoln.ac.uk +# @date: +# ---------------------------------- + +import os +from os.path import join + +from ament_index_python.packages import get_package_share_directory + +from launch_ros.actions import Node +from launch import LaunchDescription +from launch.actions import IncludeLaunchDescription, DeclareLaunchArgument, GroupAction +from launch.launch_description_sources import PythonLaunchDescriptionSource +from launch.substitutions import LaunchConfiguration +from launch_ros.actions import Node, SetParameter + + +def generate_launch_description(): + LD = LaunchDescription() + + ## Define Arguments + tmap_input = LaunchConfiguration('tmap') + tmap_default = os.getenv('TMAP_FILE') + LD.add_action(DeclareLaunchArgument('tmap', default_value=tmap_default)) + + ## Topological Map Server + LD.add_action(Node( + package='topological_navigation', + executable='map_manager2.py', + name='topomap2_server', + arguments=[tmap_default] + )) + LD.add_action(Node( + package='topological_navigation', + executable='topological_transform_publisher.py', + name='topological_transform_publisher' + )) + LD.add_action(Node( + package='topological_navigation', + executable='topomap_marker2.py', + name='topomap_marker2' + )) + + ## Restrictions Handler Example + LD.add_action(Node( + package='topological_navigation', + executable='restrictions_handler.py', + name='restrictions_handler', + parameters=[{'enable_eval_sub': True}, + {'initial_restriction': "'robot_short' in '$' or '$' == 'True'"}], + remappings=[('/topological_map_2', '/topological_map_2')] + )) + LD.add_action(Node( + package='topological_navigation', + executable='topological_transform_publisher.py', + name='restricted_topological_transform_publisher', + remappings=[('/topological_map_2', '/restrictions_handler/topological_map_2')] + )) + LD.add_action(Node( + package='topological_navigation', + executable='topomap_marker2.py', + name='restricted_topomap_marker2', + remappings=[('/topological_map_2', '/restrictions_handler/topological_map_2')] + )) + + + ## Execute all Components + return LD diff --git a/topological_navigation/setup.py b/topological_navigation/setup.py index 772f2b6e..f5001dfc 100644 --- a/topological_navigation/setup.py +++ b/topological_navigation/setup.py @@ -36,6 +36,7 @@ 'navigation2.py = topological_navigation.scripts.navigation2:main', 'navstats_logger.py = topological_navigation.scripts.navstats_logger:main', 'reconf_at_edges_server.py = topological_navigation.scripts.reconf_at_edges_server:main', + 'restrictions_handler.py = topological_navigation.scripts.restrictions_handler:main', 'restrictions_manager.py = topological_navigation.scripts.restrictions_manager:main', 'search_route.py = topological_navigation.scripts.search_route:main', 'speed_based_prediction.py = topological_navigation.scripts.speed_based_prediction:main', diff --git a/topological_navigation/topological_navigation/scripts/restrictions_handler.py b/topological_navigation/topological_navigation/scripts/restrictions_handler.py new file mode 100755 index 00000000..097e313e --- /dev/null +++ b/topological_navigation/topological_navigation/scripts/restrictions_handler.py @@ -0,0 +1,178 @@ +# -*- coding: utf-8 -*- +#! /usr/bin/env python3 +# ---------------------------------- +# @author: jheselden +# @email: jheselden@lincoln.ac.uk +# @date: 16 november 2023 +# ---------------------------------- + +# Builtin imports +import yaml, json +from pprint import pprint + +# ROS2 imports +import rclpy +from rclpy.node import Node +from rclpy import Parameter +from rclpy.qos import QoSProfile, DurabilityPolicy + +# Msg imports +from std_msgs.msg import String +from topological_navigation_msgs.msg import TopologicalMap as TMap + + + +class Restrictor(Node): + + def __init__(self): + super().__init__('restriction_handler') + + # Declare Parameters + self.declare_parameter('initial_restriction', "") + self.declare_parameter('enable_eval_sub', True) + ir = Parameter('str', Parameter.Type.STRING, "") + es = Parameter('bool', Parameter.Type.BOOL, True) + + # Set initial eval restriction + initial_restriction = self.get_parameter_or("initial_restriction", ir).value + self.eval = str(initial_restriction) if initial_restriction else None + self.get_logger().info(f"restrictor launched with initial restriction as: {self.eval}") + + # Setup publisher for map + self.res_tmap2_pub = self.create_publisher(String, '~/restricted_topological_map_2', 2) + + # Check if eval subscriber is needed to enable changes to eval restriction condition + enable_eval_sub = self.get_parameter_or("~/enable_eval_sub", es).value + if enable_eval_sub: + qos = QoSProfile(depth=1, durability=DurabilityPolicy.TRANSIENT_LOCAL) + self.res_eval_sub = self.create_subscription(String, '~/restriction_evalator', self.py_eval, qos) + + # Subscribe to map + self.tmap = None + qos = QoSProfile(depth=1, durability=DurabilityPolicy.TRANSIENT_LOCAL) + self.tmap2_sub = self.create_subscription(String, '/topological_map_2', self.tmap_cb, qos) + + def tmap_cb(self, msg): + #Recieve and decode map, and send through filter is evaluation paramaters are already set + self.tmap = yaml.safe_load(msg.data) + self.get_logger().info(f"map recieved with {len(self.tmap['nodes'])} nodes") + if self.eval: + self.py_eval(String(data=self.eval)) + + + def py_eval(self, msg): + """ This eval is compared with the restriction condition for the topological map, only those which match the evaluation remain. + e.g. | node.restriction = "['short', 'tall']" + | msg.data = "'short' in $" + | eval( "'short' in $".replace('$', node.restriction) ) + | eval( "'short' in ['short', 'tall']" ) = True + """ + self.get_logger().info(f"py_eval: {msg.data}") + + # Early exit for if tmap not available + if not self.tmap: + self.eval = msg + self.get_logger().info("tmap not recieved, will apply eval once map arrives") + return + tmap = self.tmap + nodes_to_keep = [] + + # Quick pass to reduce evals for multilples of the same restriction condition + condition = msg.data + self.condition_results = {True:[], False:[]} + + # Filter nodes + self.get_logger().info('\n\n\n') + + total_nodes = len(tmap['nodes']) + tmap['nodes'] = [n for n in tmap['nodes'] if self.check_outcome(condition, n['node']['restrictions_planning'])] + remaining_nodes = len(tmap['nodes']) + self.get_logger().info(f"filter removed {total_nodes - remaining_nodes} nodes") + + # Filter edges + kept_nodes = [n['node']['name'] for n in tmap['nodes']] + total_edges = sum([len(n['node']['edges']) for n in tmap['nodes']]) + + for node in tmap['nodes']: + if 'edges' in node['node']: + # Only allow edges which go to a kept node + node['node']['edges'] = [e for e in node['node']['edges'] if e['node'] in kept_nodes] + # Only allow edges which match the condition + node['node']['edges'] = [e for e in node['node']['edges'] if self.check_outcome(condition, e['restrictions_planning'])] + + remaining_edges = sum([len(n['node']['edges']) for n in tmap['nodes']]) + self.get_logger().info(f"filter removed {total_edges-remaining_edges} edges") + + # Output results of each restriction evaluation + self.get_logger().info("Restriction evaluations:") + for c in self.condition_results[True]: self.get_logger().info(f" True: {c}") + for c in self.condition_results[False]: self.get_logger().info(f" False: {c}") + + # Save and publish new map + self.tmap = tmap + s = json.dumps(self.tmap) + self.res_tmap2_pub.publish(String(data=s)) + + + def check_outcome(self, query, restriction): + # Format query and restriction together for eval + check = query.replace('$', str(restriction)) + + # Early exit for if eval already processed for previous component + if check in self.condition_results[True]: return True + if check in self.condition_results[False]: return False + + # Process check condition + result = eval(check) + self.get_logger().info(f"checking: {check} = {result}") + + # Save result for early exit + self.condition_results[result] += [check] + + return result + + + +def main(args=None): + rclpy.init(args=args) + + R = Restrictor() + rclpy.spin(R) + + R.destroy_node() + rclpy.shutdown() + + +def example(args=None): + print('Example 1:') + node_restriction = "['short', 'tall']" + msg_data = "'short' in $" + query = msg_data.replace('$', node_restriction) + print('restriction', node_restriction) + print('condition', msg_data) + print('full query', query) + print('query result', eval(query)) + print('\n') + + print('Example 2:') + node_restriction = "robot_short & robot_tall" + msg_data = "'robot_short' in '$'" + query = msg_data.replace('$', node_restriction) + print('restriction', node_restriction) + print('condition', msg_data) + print('full query', query) + print('query result', eval(query)) + print('\n') + + print('Example 3:') + node_restriction = "'True'" + msg_data = "'robot_short' in '$' or '$' == 'True'" + query = msg_data.replace('$', node_restriction) + print('restriction', node_restriction) + print('condition', msg_data) + print('full query', query) + print('query result', eval(query)) + print('\n') + +if __name__ == '__main__': + main() diff --git a/topological_navigation/topological_navigation/topomap_marker2.py b/topological_navigation/topological_navigation/topomap_marker2.py index c8e0e425..9f1d01e2 100644 --- a/topological_navigation/topological_navigation/topomap_marker2.py +++ b/topological_navigation/topological_navigation/topomap_marker2.py @@ -39,6 +39,9 @@ def rescale_callback(self, msg): self.get_logger().info('new scale recieved') self.get_logger().info(f'scale: {msg.data}') self.scale = float(msg.data) + if self.map == None: + self.get_logger().warn(f'map not recieved') + return self.map_callback(self.map) def map_callback(self, msg):