diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp index 0624f92c62..a35345962d 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp @@ -31,6 +31,8 @@ #include "rclcpp/experimental/subscription_intra_process_buffer.hpp" #include "rclcpp/qos.hpp" #include "rclcpp/type_support_decl.hpp" +#include "rclcpp/topic_statistics/subscription_topic_statistics.hpp" +#include "rclcpp/logging.hpp" #include "tracetools/tracetools.h" namespace rclcpp @@ -70,6 +72,8 @@ class SubscriptionIntraProcess using ConstMessageSharedPtr = typename SubscriptionIntraProcessBufferT::ConstDataSharedPtr; using MessageUniquePtr = typename SubscriptionIntraProcessBufferT::SubscribedTypeUniquePtr; using BufferUniquePtr = typename SubscriptionIntraProcessBufferT::BufferUniquePtr; + using SubscriptionTopicStatisticsSharedPtr = + std::shared_ptr; SubscriptionIntraProcess( AnySubscriptionCallback callback, @@ -77,7 +81,8 @@ class SubscriptionIntraProcess rclcpp::Context::SharedPtr context, const std::string & topic_name, const rclcpp::QoS & qos_profile, - rclcpp::IntraProcessBufferType buffer_type) + rclcpp::IntraProcessBufferType buffer_type, + SubscriptionTopicStatisticsSharedPtr subscription_topic_statistics = nullptr) : SubscriptionIntraProcessBuffer( std::make_shared(*allocator), @@ -85,7 +90,8 @@ class SubscriptionIntraProcess topic_name, qos_profile, buffer_type), - any_callback_(callback) + any_callback_(callback), + subscription_topic_statistics_(std::move(subscription_topic_statistics)) { TRACETOOLS_TRACEPOINT( rclcpp_subscription_callback_added, @@ -174,6 +180,14 @@ class SubscriptionIntraProcess msg_info.publisher_gid = {0, {0}}; msg_info.from_intra_process = true; + std::chrono::time_point now; + if (subscription_topic_statistics_) { + RCLCPP_WARN_ONCE( + rclcpp::get_logger("rclcpp"), + "Intra-process communication does not support accurate message age statistics"); + now = std::chrono::system_clock::now(); + } + auto shared_ptr = std::static_pointer_cast>( data); @@ -185,9 +199,17 @@ class SubscriptionIntraProcess any_callback_.dispatch_intra_process(std::move(unique_msg), msg_info); } shared_ptr.reset(); + + if (subscription_topic_statistics_) { + const auto nanos = std::chrono::time_point_cast(now); + const auto time = rclcpp::Time(nanos.time_since_epoch().count()); + subscription_topic_statistics_->handle_message(msg_info, time); + } } AnySubscriptionCallback any_callback_; + /// Optional statistics calculator for intra-process deliveries. + SubscriptionTopicStatisticsSharedPtr subscription_topic_statistics_; }; } // namespace experimental diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index 20f2b575f7..01f685e457 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -172,7 +172,8 @@ class Subscription : public SubscriptionBase context, this->get_topic_name(), // important to get like this, as it has the fully-qualified name qos_profile, - resolve_intra_process_buffer_type(options_.intra_process_buffer_type, callback)); + resolve_intra_process_buffer_type(options_.intra_process_buffer_type, callback), + subscription_topic_statistics); TRACETOOLS_TRACEPOINT( rclcpp_subscription_init, static_cast(get_subscription_handle().get()), diff --git a/rclcpp/test/rclcpp/test_intra_process_manager.cpp b/rclcpp/test/rclcpp/test_intra_process_manager.cpp index 6a253a4278..77066d787b 100644 --- a/rclcpp/test/rclcpp/test_intra_process_manager.cpp +++ b/rclcpp/test/rclcpp/test_intra_process_manager.cpp @@ -25,6 +25,7 @@ #include "rclcpp/context.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/qos.hpp" +#include "rclcpp/topic_statistics/subscription_topic_statistics.hpp" #include "rmw/types.h" #include "rmw/qos_profiles.h" @@ -376,7 +377,10 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBuffer< public: RCLCPP_SMART_PTR_DEFINITIONS(SubscriptionIntraProcess) - explicit SubscriptionIntraProcess(const std::string & topic, const rclcpp::QoS & qos) + explicit SubscriptionIntraProcess( + const std::string & topic, + const rclcpp::QoS & qos, + std::shared_ptr = nullptr) : SubscriptionIntraProcessBuffer(topic, qos) { } diff --git a/rclcpp/test/rclcpp/topic_statistics/test_subscription_topic_statistics.cpp b/rclcpp/test/rclcpp/topic_statistics/test_subscription_topic_statistics.cpp index 0137caf036..47c22f1ef0 100644 --- a/rclcpp/test/rclcpp/topic_statistics/test_subscription_topic_statistics.cpp +++ b/rclcpp/test/rclcpp/topic_statistics/test_subscription_topic_statistics.cpp @@ -181,8 +181,9 @@ class SubscriberWithTopicStatistics : public rclcpp::Node public: SubscriberWithTopicStatistics( const std::string & name, const std::string & topic, - std::chrono::milliseconds publish_period = defaultStatisticsPublishPeriod) - : Node(name) + std::chrono::milliseconds publish_period = defaultStatisticsPublishPeriod, + bool use_intra_process = false) + : Node(name, rclcpp::NodeOptions().use_intra_process_comms(use_intra_process)) { // Manually enable topic statistics via options auto options = rclcpp::SubscriptionOptions(); @@ -265,7 +266,7 @@ TEST_F(TestSubscriptionTopicStatisticsFixture, test_invalid_publish_period) { ASSERT_THROW( SubscriberWithTopicStatistics( - "test_period_node", "should_throw_invalid_arg", std::chrono::milliseconds(0) + "test_period_node", "should_throw_invalid_arg", std::chrono::milliseconds(0), false ), std::invalid_argument); } @@ -278,7 +279,9 @@ TEST_F(TestSubscriptionTopicStatisticsFixture, test_manual_construction) { auto empty_subscriber = std::make_shared>( kTestSubNodeName, - kTestSubStatsEmptyTopic); + kTestSubStatsEmptyTopic, + defaultStatisticsPublishPeriod, + false); // Manually create publisher tied to the node auto topic_stats_publisher = @@ -322,7 +325,9 @@ TEST_F(TestSubscriptionTopicStatisticsFixture, test_receive_stats_for_message_no auto empty_subscriber = std::make_shared>( kTestSubNodeName, - kTestSubStatsEmptyTopic); + kTestSubStatsEmptyTopic, + defaultStatisticsPublishPeriod, + false); rclcpp::executors::SingleThreadedExecutor ex; ex.add_node(empty_publisher); @@ -367,7 +372,9 @@ TEST_F(TestSubscriptionTopicStatisticsFixture, test_receive_stats_include_window auto msg_subscriber_with_topic_statistics = std::make_shared>( kTestSubNodeName, - kTestSubStatsTopic); + kTestSubStatsTopic, + defaultStatisticsPublishPeriod, + false); // Create a message publisher auto msg_publisher = @@ -421,3 +428,45 @@ TEST_F(TestSubscriptionTopicStatisticsFixture, test_receive_stats_include_window } } } + +void run_intra_process_test(bool use_intra_process) +{ + auto empty_publisher = std::make_shared>( + kTestPubNodeName, + kTestSubStatsEmptyTopic); + auto statistics_listener = std::make_shared( + use_intra_process ? "stats_listener_intra" : "stats_listener_inter", + "/statistics", + kNumExpectedWindows); + auto empty_subscriber = std::make_shared>( + kTestSubNodeName, + kTestSubStatsEmptyTopic, + defaultStatisticsPublishPeriod, + use_intra_process); + + rclcpp::executors::SingleThreadedExecutor ex; + ex.add_node(empty_publisher); + ex.add_node(statistics_listener); + ex.add_node(empty_subscriber); + + ex.spin_until_future_complete(statistics_listener->GetFuture(), kTestTimeout); + + const auto received_messages = statistics_listener->GetReceivedMessages(); + EXPECT_EQ(kNumExpectedWindows, received_messages.size()); + + for (const auto & msg : received_messages) { + if (msg.metrics_source == kMessagePeriodSourceLabel) { + check_if_statistic_message_is_populated(msg); + } + } +} + +TEST_F(TestSubscriptionTopicStatisticsFixture, test_stats_period_inter_process) +{ + run_intra_process_test(false); +} + +TEST_F(TestSubscriptionTopicStatisticsFixture, test_stats_period_intra_process) +{ + run_intra_process_test(true); +} \ No newline at end of file