253 std::function<T(
const rclcpp::Time &,
const rclcpp::Duration &)> callback,
254 int thread_priority = 50)
256 if (callback ==
nullptr) {
257 throw std::runtime_error(
258 "AsyncFunctionHandler: parsed function to call asynchronously is not valid!");
260 if (thread_.joinable()) {
261 throw std::runtime_error(
262 "AsyncFunctionHandler: Cannot reinitialize while the thread is "
263 "running. Please stop the async callback first!");
265 async_function_ = callback;
266 thread_priority_ = thread_priority;
284 std::function<T(
const rclcpp::Time &,
const rclcpp::Duration &)> callback,
285 std::function<
bool()> trigger_predicate,
int thread_priority = 50)
287 if (trigger_predicate ==
nullptr) {
288 throw std::runtime_error(
"AsyncFunctionHandler: parsed trigger predicate is not valid!");
290 init(callback, thread_priority);
291 trigger_predicate_ = trigger_predicate;
295 std::function<T(
const rclcpp::Time &,
const rclcpp::Duration &)> callback,
299 init(callback, params.trigger_predicate, params.thread_priority);
301 pause_thread_ = params.wait_until_initial_trigger;
325 const rclcpp::Time & time,
const rclcpp::Duration & period)
328 throw std::runtime_error(
"AsyncFunctionHandler: need to be initialized first!");
330 if (async_exception_ptr_) {
332 params_.logger,
"AsyncFunctionHandler: Exception caught in the async callback thread!");
333 std::rethrow_exception(async_exception_ptr_);
338 "AsyncFunctionHandler is configured with DETACHED scheduling policy. "
339 "This means that the async callback may not be synchronized with the main thread. ");
340 if (pause_thread_.load(std::memory_order_relaxed)) {
342 std::unique_lock<std::mutex> lock(async_mtx_);
343 pause_thread_ =
false;
344 RCLCPP_INFO(params_.logger,
"AsyncFunctionHandler: Resuming the async callback thread.");
345 async_callback_return_ = T();
346 auto const sync_period = std::chrono::nanoseconds(1'000'000'000 / params_.exec_rate);
347 previous_time_ = params_.clock->now() - rclcpp::Duration(sync_period);
349 async_callback_condition_.notify_one();
351 return std::make_pair(
true, async_callback_return_.load(std::memory_order_relaxed));
354 throw std::runtime_error(
355 "AsyncFunctionHandler: need to start the async callback thread first before triggering!");
357 std::unique_lock<std::mutex> lock(async_mtx_, std::try_to_lock);
358 bool trigger_status =
false;
359 if (lock.owns_lock() && !trigger_in_progress_ && trigger_predicate_()) {
361 std::unique_lock<std::mutex> scoped_lock(std::move(lock));
362 trigger_in_progress_ =
true;
363 current_callback_time_ = time;
364 current_callback_period_ = period;
366 async_callback_condition_.notify_one();
367 trigger_status =
true;
369 const T return_value = async_callback_return_;
370 return std::make_pair(trigger_status, return_value);
401 std::unique_lock<std::mutex> lock(async_mtx_);
402 stop_async_callback_ =
false;
403 trigger_in_progress_ =
false;
404 current_callback_time_ = rclcpp::Time(0, 0, RCL_CLOCK_UNINITIALIZED);
405 current_callback_period_ = rclcpp::Duration(0, 0);
406 last_execution_time_ = std::chrono::nanoseconds(0);
407 async_callback_return_ = T();
408 async_exception_ptr_ =
nullptr;
418 std::unique_lock<std::mutex> lock(async_mtx_);
419 cycle_end_condition_.wait(lock, [
this] {
return !trigger_in_progress_; });
435 RCLCPP_INFO_EXPRESSION(
436 params_.logger, !pause_thread_,
"AsyncFunctionHandler: Pausing the async callback thread.");
438 pause_thread_ =
true;
442 pause_thread_.store(
true, std::memory_order_relaxed);
443 std::unique_lock<std::mutex> lock(async_mtx_);
447 return pause_thread_.load(std::memory_order_relaxed);
478 bool is_stopped()
const {
return stop_async_callback_.load(std::memory_order_relaxed); }
484 bool is_paused()
const {
return pause_thread_.load(std::memory_order_relaxed); }
519 stop_async_callback_.store(
true, std::memory_order_relaxed);
520 std::unique_lock<std::mutex> lock(async_mtx_);
522 async_callback_condition_.notify_one();
533 return last_execution_time_.load(std::memory_order_relaxed);
545 throw std::runtime_error(
"AsyncFunctionHandler: need to be initialized first!");
547 if (!thread_.joinable()) {
549 thread_ = std::thread([
this]() ->
void {
553 "Could not enable FIFO RT scheduling policy. Consider setting up your user to do FIFO "
554 "RT scheduling. See "
555 "[https://control.ros.org/master/doc/ros2_control/controller_manager/doc/userdoc.html] "
558 if (!params_.cpu_affinity_cores.empty()) {
559 const auto affinity_result =
561 RCLCPP_WARN_EXPRESSION(
562 params_.logger, !affinity_result.first,
563 "Could not set CPU affinity for the async worker thread. Error: %s",
564 affinity_result.second.c_str());
565 RCLCPP_WARN_EXPRESSION(
566 params_.logger, affinity_result.first,
567 "Async worker thread is successfully pinned to the requested CPU cores!");
570 execute_synchronized_callback();
572 execute_detached_callback();
579 void execute_synchronized_callback()
581 while (!stop_async_callback_.load(std::memory_order_relaxed)) {
583 std::unique_lock<std::mutex> lock(async_mtx_);
584 async_callback_condition_.wait(
585 lock, [
this] {
return trigger_in_progress_ || stop_async_callback_; });
586 if (!stop_async_callback_) {
587 const auto start_time = std::chrono::steady_clock::now();
589 async_callback_return_ =
590 async_function_(current_callback_time_, current_callback_period_);
592 async_exception_ptr_ = std::current_exception();
594 const auto end_time = std::chrono::steady_clock::now();
595 last_execution_time_ =
596 std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - start_time);
598 trigger_in_progress_ =
false;
600 cycle_end_condition_.notify_all();
604 void execute_detached_callback()
606 if (!params_.clock) {
607 throw std::runtime_error(
608 "AsyncFunctionHandler: Clock must be set when using DETACHED scheduling policy.");
610 if (params_.exec_rate == 0u) {
611 throw std::runtime_error(
612 "AsyncFunctionHandler: Execution rate must be set when using DETACHED scheduling policy.");
615 auto const period = std::chrono::nanoseconds(1'000'000'000 / params_.exec_rate);
618 std::unique_lock<std::mutex> lock(async_mtx_);
619 async_callback_condition_.wait(
620 lock, [
this] {
return !pause_thread_ || stop_async_callback_; });
623 previous_time_ = params_.clock->now();
624 std::this_thread::sleep_for(period);
625 std::chrono::steady_clock::time_point next_iteration_time{std::chrono::steady_clock::now()};
626 while (!stop_async_callback_.load(std::memory_order_relaxed)) {
628 std::unique_lock<std::mutex> lock(async_mtx_);
629 async_callback_condition_.wait(
630 lock, [
this] {
return !pause_thread_ || stop_async_callback_; });
631 if (!stop_async_callback_) {
633 auto const current_time = params_.clock->now();
634 auto const measured_period = current_time - previous_time_;
635 previous_time_ = current_time;
636 current_callback_time_ = current_time;
637 current_callback_period_ = measured_period;
639 const auto start_time = std::chrono::steady_clock::now();
641 async_callback_return_ = async_function_(current_time, measured_period);
643 async_exception_ptr_ = std::current_exception();
645 last_execution_time_ = std::chrono::duration_cast<std::chrono::nanoseconds>(
646 std::chrono::steady_clock::now() - start_time);
648 next_iteration_time += period;
649 const auto time_now = std::chrono::steady_clock::now();
650 if (next_iteration_time < time_now) {
651 const double time_diff =
652 std::chrono::duration<double, std::milli>(time_now - next_iteration_time).count();
653 const double cm_period = 1.e3 /
static_cast<double>(params_.exec_rate);
654 const int overrun_count =
static_cast<int>(std::ceil(time_diff / cm_period));
655 if (params_.print_warnings) {
656 RCLCPP_WARN_THROTTLE(
657 params_.logger, *params_.clock, 1000,
658 "Overrun detected! The async callback missed its desired rate of %d Hz. The loop "
659 "took %f ms (missed cycles : %d).",
660 params_.exec_rate, time_diff + cm_period, overrun_count + 1);
662 next_iteration_time += (overrun_count * period);
664 std::this_thread::sleep_until(next_iteration_time);
666 trigger_in_progress_ =
false;
668 cycle_end_condition_.notify_all();
672 rclcpp::Time current_callback_time_ = rclcpp::Time(0, 0, RCL_CLOCK_UNINITIALIZED);
673 rclcpp::Duration current_callback_period_{0, 0};
675 std::function<T(
const rclcpp::Time &,
const rclcpp::Duration &)> async_function_;
676 std::function<bool()> trigger_predicate_ = []() {
return true; };
680 AsyncFunctionHandlerParams params_;
681 rclcpp::Time previous_time_{0, 0, RCL_CLOCK_UNINITIALIZED};
682 int thread_priority_ = std::numeric_limits<int>::quiet_NaN();
683 std::atomic_bool stop_async_callback_{
false};
684 std::atomic_bool trigger_in_progress_{
false};
685 std::atomic_bool pause_thread_{
false};
686 std::atomic<T> async_callback_return_;
687 std::condition_variable async_callback_condition_;
688 std::condition_variable cycle_end_condition_;
689 std::mutex async_mtx_;
690 std::atomic<std::chrono::nanoseconds> last_execution_time_;
691 std::atomic<double> periodicity_;
692 std::exception_ptr async_exception_ptr_;