diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index 65d0a930cb..59f3065860 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -560,9 +560,9 @@ class Executor virtual void spin_once_impl(std::chrono::nanoseconds timeout); - typedef std::map> + typedef std::map< + rclcpp::CallbackGroup::WeakPtr, rclcpp::GuardCondition::SharedPtr, + std::owner_less> WeakCallbackGroupsToGuardConditionsMap; /// maps callback groups to guard conditions diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index a406cb6e8e..4cae552981 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -107,8 +107,7 @@ Executor::~Executor() weak_groups_to_nodes_associated_with_executor_.clear(); weak_groups_to_nodes_.clear(); for (const auto & pair : weak_groups_to_guard_conditions_) { - auto guard_condition = pair.second; - memory_strategy_->remove_guard_condition(guard_condition); + memory_strategy_->remove_guard_condition(pair.second.get()); } weak_groups_to_guard_conditions_.clear(); @@ -218,7 +217,10 @@ Executor::add_callback_group_to_map( if (node_ptr->get_context()->is_valid()) { auto callback_group_guard_condition = group_ptr->get_notify_guard_condition(node_ptr->get_context()); - weak_groups_to_guard_conditions_[weak_group_ptr] = callback_group_guard_condition.get(); + // Store shared_ptr to keep the guard condition alive while registered with the executor. + // This prevents the guard condition from being finalized (impl set to NULL) while the + // memory strategy still holds a raw pointer to it during wait_for_work(). + weak_groups_to_guard_conditions_[weak_group_ptr] = callback_group_guard_condition; // Add the callback_group's notify condition to the guard condition handles memory_strategy_->add_guard_condition(*callback_group_guard_condition); } @@ -304,7 +306,7 @@ Executor::remove_callback_group_from_map( { auto iter = weak_groups_to_guard_conditions_.find(weak_group_ptr); if (iter != weak_groups_to_guard_conditions_.end()) { - memory_strategy_->remove_guard_condition(iter->second); + memory_strategy_->remove_guard_condition(iter->second.get()); } weak_groups_to_guard_conditions_.erase(weak_group_ptr); @@ -730,7 +732,7 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout) if (callback_guard_pair != weak_groups_to_guard_conditions_.end()) { auto guard_condition = callback_guard_pair->second; weak_groups_to_guard_conditions_.erase(group_ptr); - memory_strategy_->remove_guard_condition(guard_condition); + memory_strategy_->remove_guard_condition(guard_condition.get()); } weak_groups_to_nodes_.erase(group_ptr); }); diff --git a/rclcpp/test/rclcpp/test_add_callback_groups_to_executor.cpp b/rclcpp/test/rclcpp/test_add_callback_groups_to_executor.cpp index 07ca1e87d8..113d887578 100644 --- a/rclcpp/test/rclcpp/test_add_callback_groups_to_executor.cpp +++ b/rclcpp/test/rclcpp/test_add_callback_groups_to_executor.cpp @@ -14,10 +14,13 @@ #include +#include #include +#include #include #include #include +#include #include #include @@ -340,6 +343,92 @@ TYPED_TEST(TestAddCallbackGroupsToExecutor, subscriber_triggered_to_receive_mess EXPECT_TRUE(received_message_future.get()); } +/* + * Test destroying the last strong callback group reference while the executor is spinning. + * This exercises the callback-group lifetime path from https://github.com/ros2/rclcpp/issues/2163. + */ +TYPED_TEST(TestAddCallbackGroupsToExecutor, callback_group_destroyed_while_spinning) +{ + using ExecutorType = TypeParam; + + ExecutorType executor; + auto node = std::make_shared("callback_group_destroyed_while_spinning", "/ns"); + executor.add_node(node); + + auto count_live_callback_groups = [&executor]() { + size_t count = 0; + for (const auto & weak_group : executor.get_all_callback_groups()) { + if (weak_group.lock()) { + ++count; + } + } + return count; + }; + + auto wait_for_live_callback_groups = + [&count_live_callback_groups](size_t expected_count, std::chrono::milliseconds timeout) { + const auto deadline = std::chrono::steady_clock::now() + timeout; + while (std::chrono::steady_clock::now() < deadline) { + if (count_live_callback_groups() == expected_count) { + return true; + } + std::this_thread::sleep_for(1ms); + } + return count_live_callback_groups() == expected_count; + }; + + const auto initial_callback_group_count = count_live_callback_groups(); + + std::exception_ptr spin_exception; + std::thread spin_thread([&executor, &spin_exception]() { + try { + executor.spin(); + } catch (...) { + spin_exception = std::current_exception(); + } + }); + + auto heartbeat_group = node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive, true); + auto heartbeat_timer = node->create_wall_timer(1ms, []() {}, heartbeat_group); + + bool callback_groups_tracked = + wait_for_live_callback_groups(initial_callback_group_count + 1u, 2s); + const auto steady_state_callback_group_count = count_live_callback_groups(); + bool callback_groups_cleaned_up = callback_groups_tracked; + + for (size_t attempt = 0; attempt < 50 && callback_groups_cleaned_up; ++attempt) { + auto transient_group = node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive, true); + auto transient_timer = node->create_wall_timer(1ms, []() {}, transient_group); + + callback_groups_cleaned_up = wait_for_live_callback_groups( + steady_state_callback_group_count + 1u, 2s); + + transient_timer.reset(); + transient_group.reset(); + + callback_groups_cleaned_up = callback_groups_cleaned_up && + wait_for_live_callback_groups(steady_state_callback_group_count, 2s); + } + + executor.cancel(); + spin_thread.join(); + + EXPECT_TRUE(callback_groups_tracked); + EXPECT_TRUE(callback_groups_cleaned_up); + + if (spin_exception) { + try { + std::rethrow_exception(spin_exception); + } catch (const std::exception & exception) { + FAIL() << "executor.spin() threw: " << exception.what(); + } catch (...) { + FAIL() << "executor.spin() threw a non-standard exception"; + } + } +} + /* * Test removing callback group from executor that its not associated with. */