ros2_control - rolling
Loading...
Searching...
No Matches
async_function_handler.hpp
1// Copyright 2024 PAL Robotics S.L.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
16
17#ifndef REALTIME_TOOLS__ASYNC_FUNCTION_HANDLER_HPP_
18#define REALTIME_TOOLS__ASYNC_FUNCTION_HANDLER_HPP_
19
20#include <atomic>
21#include <cmath>
22#include <condition_variable>
23#include <functional>
24#include <limits>
25#include <memory>
26#include <mutex>
27#include <stdexcept>
28#include <string>
29#include <thread>
30#include <utility>
31#include <vector>
32
33#include "rclcpp/clock.hpp"
34#include "rclcpp/duration.hpp"
35#include "rclcpp/logging.hpp"
36#include "rclcpp/time.hpp"
37#include "realtime_tools/realtime_helpers.hpp"
38
40{
50{
51public:
52 enum Value : int8_t {
53 UNKNOWN = -1,
56 };
57
58 AsyncSchedulingPolicy() = default;
59 constexpr AsyncSchedulingPolicy(Value value) : value_(value) {} // NOLINT(runtime/explicit)
60 explicit AsyncSchedulingPolicy(const std::string & data_type)
61 {
62 if (data_type == "synchronized") {
63 value_ = SYNCHRONIZED;
64 } else if (data_type == "detached") {
65 value_ = DETACHED;
66 } else {
67 value_ = UNKNOWN;
68 }
69 }
70
71 operator Value() const { return value_; }
72
73 explicit operator bool() const = delete;
74
75 constexpr bool operator==(AsyncSchedulingPolicy other) const { return value_ == other.value_; }
76 constexpr bool operator!=(AsyncSchedulingPolicy other) const { return value_ != other.value_; }
77
78 constexpr bool operator==(Value other) const { return value_ == other; }
79 constexpr bool operator!=(Value other) const { return value_ != other; }
80
81 std::string to_string() const
82 {
83 switch (value_) {
84 case SYNCHRONIZED:
85 return "synchronized";
86 case DETACHED:
87 return "detached";
88 default:
89 return "unknown";
90 }
91 }
92
93 AsyncSchedulingPolicy from_string(const std::string & data_type)
94 {
95 return AsyncSchedulingPolicy(data_type);
96 }
97
98private:
99 Value value_ = UNKNOWN;
100};
101
127{
133 bool validate() const
134 {
135 if (thread_priority < 0 || thread_priority > 99) {
136 RCLCPP_ERROR(
137 logger, "Invalid thread priority: %d. It should be between 0 and 99.", thread_priority);
138 return false;
139 }
140 if (scheduling_policy == AsyncSchedulingPolicy::DETACHED) {
141 if (!clock) {
142 RCLCPP_ERROR(logger, "Clock must be set when using DETACHED scheduling policy.");
143 return false;
144 }
145 if (exec_rate == 0u) {
146 RCLCPP_ERROR(logger, "Execution rate must be set when using DETACHED scheduling policy.");
147 return false;
148 }
149 }
150 if (scheduling_policy == AsyncSchedulingPolicy::UNKNOWN) {
151 throw std::runtime_error(
152 "AsyncFunctionHandlerParams: scheduling policy is unknown. "
153 "Please set it to either 'synchronized' or 'detached'.");
154 }
155 if (trigger_predicate == nullptr) {
156 RCLCPP_ERROR(logger, "The parsed trigger predicate is not valid!");
157 return false;
158 }
159 for (const int & core : cpu_affinity_cores) {
160 if (core < 0) {
161 RCLCPP_ERROR(logger, "Invalid CPU core id: %d. It should be a non-negative integer.", core);
162 return false;
163 }
164 }
165 return true;
166 }
167
184 template <typename NodeT>
185 void initialize(NodeT & node, const std::string & prefix)
186 {
187 if (node->has_parameter(prefix + "thread_priority")) {
188 thread_priority = static_cast<int>(node->get_parameter(prefix + "thread_priority").as_int());
189 }
190 if (node->has_parameter(prefix + "cpu_affinity")) {
191 const auto cpu_affinity_param =
192 node->get_parameter(prefix + "cpu_affinity").as_integer_array();
193 for (const auto & core : cpu_affinity_param) {
194 cpu_affinity_cores.push_back(static_cast<int>(core));
195 }
196 }
197 if (node->has_parameter(prefix + "scheduling_policy")) {
198 scheduling_policy =
199 AsyncSchedulingPolicy(node->get_parameter(prefix + "scheduling_policy").as_string());
200 }
201 if (
202 scheduling_policy == AsyncSchedulingPolicy::DETACHED &&
203 node->has_parameter(prefix + "execution_rate")) {
204 const int execution_rate =
205 static_cast<int>(node->get_parameter(prefix + "execution_rate").as_int());
206 if (execution_rate <= 0) {
207 throw std::runtime_error(
208 "AsyncFunctionHandler: execution_rate parameter must be positive.");
209 }
210 exec_rate = static_cast<unsigned int>(execution_rate);
211 }
212 if (node->has_parameter(prefix + "wait_until_initial_trigger")) {
213 wait_until_initial_trigger =
214 node->get_parameter(prefix + "wait_until_initial_trigger").as_bool();
215 }
216 if (node->has_parameter(prefix + "print_warnings")) {
217 print_warnings = node->get_parameter(prefix + "print_warnings").as_bool();
218 }
219 }
220
221 int thread_priority = 50;
222 std::vector<int> cpu_affinity_cores = {};
223 AsyncSchedulingPolicy scheduling_policy = AsyncSchedulingPolicy::SYNCHRONIZED;
224 unsigned int exec_rate = 0u;
225 rclcpp::Clock::SharedPtr clock = nullptr;
226 rclcpp::Logger logger = rclcpp::get_logger("AsyncFunctionHandler");
227 std::function<bool()> trigger_predicate = []() { return true; };
228 bool wait_until_initial_trigger = true;
229 bool print_warnings = true;
230};
231
237template <typename T>
239{
240public:
241 AsyncFunctionHandler() = default;
242
244
246
252 void init(
253 std::function<T(const rclcpp::Time &, const rclcpp::Duration &)> callback,
254 int thread_priority = 50)
255 {
256 if (callback == nullptr) {
257 throw std::runtime_error(
258 "AsyncFunctionHandler: parsed function to call asynchronously is not valid!");
259 }
260 if (thread_.joinable()) {
261 throw std::runtime_error(
262 "AsyncFunctionHandler: Cannot reinitialize while the thread is "
263 "running. Please stop the async callback first!");
264 }
265 async_function_ = callback;
266 thread_priority_ = thread_priority;
267 }
268
270
283 void init(
284 std::function<T(const rclcpp::Time &, const rclcpp::Duration &)> callback,
285 std::function<bool()> trigger_predicate, int thread_priority = 50)
286 {
287 if (trigger_predicate == nullptr) {
288 throw std::runtime_error("AsyncFunctionHandler: parsed trigger predicate is not valid!");
289 }
290 init(callback, thread_priority);
291 trigger_predicate_ = trigger_predicate;
292 }
293
294 void init(
295 std::function<T(const rclcpp::Time &, const rclcpp::Duration &)> callback,
296 const AsyncFunctionHandlerParams & params)
297 {
298 params.validate();
299 init(callback, params.trigger_predicate, params.thread_priority);
300 params_ = params;
301 pause_thread_ = params.wait_until_initial_trigger;
302 }
303
305
324 std::pair<bool, T> trigger_async_callback(
325 const rclcpp::Time & time, const rclcpp::Duration & period)
326 {
327 if (!is_initialized()) {
328 throw std::runtime_error("AsyncFunctionHandler: need to be initialized first!");
329 }
330 if (async_exception_ptr_) {
331 RCLCPP_ERROR(
332 params_.logger, "AsyncFunctionHandler: Exception caught in the async callback thread!");
333 std::rethrow_exception(async_exception_ptr_);
334 }
335 if (params_.scheduling_policy == AsyncSchedulingPolicy::DETACHED) {
336 RCLCPP_WARN_ONCE(
337 params_.logger,
338 "AsyncFunctionHandler is configured with DETACHED scheduling policy. "
339 "This means that the async callback may not be synchronized with the main thread. ");
340 if (pause_thread_.load(std::memory_order_relaxed)) {
341 {
342 std::unique_lock<std::mutex> lock(async_mtx_);
343 pause_thread_ = false;
344 RCLCPP_INFO(params_.logger, "AsyncFunctionHandler: Resuming the async callback thread.");
345 async_callback_return_ = T();
346 auto const sync_period = std::chrono::nanoseconds(1'000'000'000 / params_.exec_rate);
347 previous_time_ = params_.clock->now() - rclcpp::Duration(sync_period);
348 }
349 async_callback_condition_.notify_one();
350 }
351 return std::make_pair(true, async_callback_return_.load(std::memory_order_relaxed));
352 }
353 if (!is_running()) {
354 throw std::runtime_error(
355 "AsyncFunctionHandler: need to start the async callback thread first before triggering!");
356 }
357 std::unique_lock<std::mutex> lock(async_mtx_, std::try_to_lock);
358 bool trigger_status = false;
359 if (lock.owns_lock() && !trigger_in_progress_ && trigger_predicate_()) {
360 {
361 std::unique_lock<std::mutex> scoped_lock(std::move(lock));
362 trigger_in_progress_ = true;
363 current_callback_time_ = time;
364 current_callback_period_ = period;
365 }
366 async_callback_condition_.notify_one();
367 trigger_status = true;
368 }
369 const T return_value = async_callback_return_;
370 return std::make_pair(trigger_status, return_value);
371 }
372
374
377 T get_last_return_value() const { return async_callback_return_; }
378
380
383 const rclcpp::Time & get_current_callback_time() const { return current_callback_time_; }
384
386
389 const rclcpp::Duration & get_current_callback_period() const { return current_callback_period_; }
390
392
400 {
401 std::unique_lock<std::mutex> lock(async_mtx_);
402 stop_async_callback_ = false;
403 trigger_in_progress_ = false;
404 current_callback_time_ = rclcpp::Time(0, 0, RCL_CLOCK_UNINITIALIZED);
405 current_callback_period_ = rclcpp::Duration(0, 0);
406 last_execution_time_ = std::chrono::nanoseconds(0);
407 async_callback_return_ = T();
408 async_exception_ptr_ = nullptr;
409 }
410
412
416 {
417 if (is_running()) {
418 std::unique_lock<std::mutex> lock(async_mtx_);
419 cycle_end_condition_.wait(lock, [this] { return !trigger_in_progress_; });
420 return true;
421 }
422 return false;
423 }
424
426
434 {
435 RCLCPP_INFO_EXPRESSION(
436 params_.logger, !pause_thread_, "AsyncFunctionHandler: Pausing the async callback thread.");
437 if (params_.scheduling_policy == AsyncSchedulingPolicy::SYNCHRONIZED) {
438 pause_thread_ = true;
440 } else {
441 if (is_running()) {
442 pause_thread_.store(true, std::memory_order_relaxed);
443 std::unique_lock<std::mutex> lock(async_mtx_);
444 return true;
445 }
446 }
447 return pause_thread_.load(std::memory_order_relaxed);
448 }
449
451
454 bool is_initialized() const { return async_function_ && trigger_predicate_; }
455
457
462 {
463 if (is_running()) {
464 thread_.join();
465 }
466 }
467
469
472 bool is_running() const { return thread_.joinable(); }
473
475
478 bool is_stopped() const { return stop_async_callback_.load(std::memory_order_relaxed); }
479
481
484 bool is_paused() const { return pause_thread_.load(std::memory_order_relaxed); }
485
487
490 std::thread & get_thread() { return thread_; }
491
493
496 const std::thread & get_thread() const { return thread_; }
497
499
502 const AsyncFunctionHandlerParams & get_params() const { return params_; }
503
505
508 bool is_trigger_cycle_in_progress() const { return trigger_in_progress_; }
509
511
516 {
517 if (is_running()) {
518 {
519 stop_async_callback_.store(true, std::memory_order_relaxed);
520 std::unique_lock<std::mutex> lock(async_mtx_);
521 }
522 async_callback_condition_.notify_one();
523 thread_.join();
524 }
525 }
526
528
531 std::chrono::nanoseconds get_last_execution_time() const
532 {
533 return last_execution_time_.load(std::memory_order_relaxed);
534 }
535
537
543 {
544 if (!is_initialized()) {
545 throw std::runtime_error("AsyncFunctionHandler: need to be initialized first!");
546 }
547 if (!thread_.joinable()) {
549 thread_ = std::thread([this]() -> void {
550 if (!realtime_tools::configure_sched_fifo(thread_priority_)) {
551 RCLCPP_WARN(
552 params_.logger,
553 "Could not enable FIFO RT scheduling policy. Consider setting up your user to do FIFO "
554 "RT scheduling. See "
555 "[https://control.ros.org/master/doc/ros2_control/controller_manager/doc/userdoc.html] "
556 "for details.");
557 }
558 if (!params_.cpu_affinity_cores.empty()) {
559 const auto affinity_result =
560 realtime_tools::set_current_thread_affinity(params_.cpu_affinity_cores);
561 RCLCPP_WARN_EXPRESSION(
562 params_.logger, !affinity_result.first,
563 "Could not set CPU affinity for the async worker thread. Error: %s",
564 affinity_result.second.c_str());
565 RCLCPP_WARN_EXPRESSION(
566 params_.logger, affinity_result.first,
567 "Async worker thread is successfully pinned to the requested CPU cores!");
568 }
569 if (params_.scheduling_policy == AsyncSchedulingPolicy::SYNCHRONIZED) {
570 execute_synchronized_callback();
571 } else {
572 execute_detached_callback();
573 }
574 });
575 }
576 }
577
578private:
579 void execute_synchronized_callback()
580 {
581 while (!stop_async_callback_.load(std::memory_order_relaxed)) {
582 {
583 std::unique_lock<std::mutex> lock(async_mtx_);
584 async_callback_condition_.wait(
585 lock, [this] { return trigger_in_progress_ || stop_async_callback_; });
586 if (!stop_async_callback_) {
587 const auto start_time = std::chrono::steady_clock::now();
588 try {
589 async_callback_return_ =
590 async_function_(current_callback_time_, current_callback_period_);
591 } catch (...) {
592 async_exception_ptr_ = std::current_exception();
593 }
594 const auto end_time = std::chrono::steady_clock::now();
595 last_execution_time_ =
596 std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - start_time);
597 }
598 trigger_in_progress_ = false;
599 }
600 cycle_end_condition_.notify_all();
601 }
602 }
603
604 void execute_detached_callback()
605 {
606 if (!params_.clock) {
607 throw std::runtime_error(
608 "AsyncFunctionHandler: Clock must be set when using DETACHED scheduling policy.");
609 }
610 if (params_.exec_rate == 0u) {
611 throw std::runtime_error(
612 "AsyncFunctionHandler: Execution rate must be set when using DETACHED scheduling policy.");
613 }
614
615 auto const period = std::chrono::nanoseconds(1'000'000'000 / params_.exec_rate);
616
617 if (pause_thread_) {
618 std::unique_lock<std::mutex> lock(async_mtx_);
619 async_callback_condition_.wait(
620 lock, [this] { return !pause_thread_ || stop_async_callback_; });
621 }
622 // for calculating the measured period of the loop
623 previous_time_ = params_.clock->now();
624 std::this_thread::sleep_for(period);
625 std::chrono::steady_clock::time_point next_iteration_time{std::chrono::steady_clock::now()};
626 while (!stop_async_callback_.load(std::memory_order_relaxed)) {
627 {
628 std::unique_lock<std::mutex> lock(async_mtx_);
629 async_callback_condition_.wait(
630 lock, [this] { return !pause_thread_ || stop_async_callback_; });
631 if (!stop_async_callback_) {
632 // calculate measured period
633 auto const current_time = params_.clock->now();
634 auto const measured_period = current_time - previous_time_;
635 previous_time_ = current_time;
636 current_callback_time_ = current_time;
637 current_callback_period_ = measured_period;
638
639 const auto start_time = std::chrono::steady_clock::now();
640 try {
641 async_callback_return_ = async_function_(current_time, measured_period);
642 } catch (...) {
643 async_exception_ptr_ = std::current_exception();
644 }
645 last_execution_time_ = std::chrono::duration_cast<std::chrono::nanoseconds>(
646 std::chrono::steady_clock::now() - start_time);
647
648 next_iteration_time += period;
649 const auto time_now = std::chrono::steady_clock::now();
650 if (next_iteration_time < time_now) {
651 const double time_diff =
652 std::chrono::duration<double, std::milli>(time_now - next_iteration_time).count();
653 const double cm_period = 1.e3 / static_cast<double>(params_.exec_rate);
654 const int overrun_count = static_cast<int>(std::ceil(time_diff / cm_period));
655 if (params_.print_warnings) {
656 RCLCPP_WARN_THROTTLE(
657 params_.logger, *params_.clock, 1000,
658 "Overrun detected! The async callback missed its desired rate of %d Hz. The loop "
659 "took %f ms (missed cycles : %d).",
660 params_.exec_rate, time_diff + cm_period, overrun_count + 1);
661 }
662 next_iteration_time += (overrun_count * period);
663 }
664 std::this_thread::sleep_until(next_iteration_time);
665 }
666 trigger_in_progress_ = false;
667 }
668 cycle_end_condition_.notify_all();
669 }
670 }
671
672 rclcpp::Time current_callback_time_ = rclcpp::Time(0, 0, RCL_CLOCK_UNINITIALIZED);
673 rclcpp::Duration current_callback_period_{0, 0};
674
675 std::function<T(const rclcpp::Time &, const rclcpp::Duration &)> async_function_;
676 std::function<bool()> trigger_predicate_ = []() { return true; };
677
678 // Async related variables
679 std::thread thread_;
680 AsyncFunctionHandlerParams params_;
681 rclcpp::Time previous_time_{0, 0, RCL_CLOCK_UNINITIALIZED};
682 int thread_priority_ = std::numeric_limits<int>::quiet_NaN();
683 std::atomic_bool stop_async_callback_{false};
684 std::atomic_bool trigger_in_progress_{false};
685 std::atomic_bool pause_thread_{false};
686 std::atomic<T> async_callback_return_;
687 std::condition_variable async_callback_condition_;
688 std::condition_variable cycle_end_condition_;
689 std::mutex async_mtx_;
690 std::atomic<std::chrono::nanoseconds> last_execution_time_;
691 std::atomic<double> periodicity_;
692 std::exception_ptr async_exception_ptr_;
693};
694} // namespace realtime_tools
695
696#endif // REALTIME_TOOLS__ASYNC_FUNCTION_HANDLER_HPP_
Class to handle asynchronous function calls. AsyncFunctionHandler is a class that allows the user to ...
Definition async_function_handler.hpp:239
bool wait_for_trigger_cycle_to_finish()
Waits until the current async callback method trigger cycle is finished.
Definition async_function_handler.hpp:415
std::pair< bool, T > trigger_async_callback(const rclcpp::Time &time, const rclcpp::Duration &period)
Triggers the async callback method cycle.
Definition async_function_handler.hpp:324
std::thread & get_thread()
Get the async worker thread.
Definition async_function_handler.hpp:490
bool is_trigger_cycle_in_progress() const
Check if the async callback method is in progress.
Definition async_function_handler.hpp:508
std::chrono::nanoseconds get_last_execution_time() const
Get the last execution time of the async callback method.
Definition async_function_handler.hpp:531
const rclcpp::Time & get_current_callback_time() const
Get the current callback time.
Definition async_function_handler.hpp:383
void start_thread()
Initializes and starts the callback thread.
Definition async_function_handler.hpp:542
void init(std::function< T(const rclcpp::Time &, const rclcpp::Duration &)> callback, std::function< bool()> trigger_predicate, int thread_priority=50)
Initialize the AsyncFunctionHandler with the callback, trigger_predicate and thread_priority.
Definition async_function_handler.hpp:283
const AsyncFunctionHandlerParams & get_params() const
Get the parameters used to configure the AsyncFunctionHandler.
Definition async_function_handler.hpp:502
void reset_variables()
Resets the internal variables of the AsyncFunctionHandler.
Definition async_function_handler.hpp:399
bool is_running() const
Check if the async worker thread is running.
Definition async_function_handler.hpp:472
bool is_paused() const
Check if the async callback thread is paused.
Definition async_function_handler.hpp:484
bool pause_execution()
Pauses the execution of the async callback thread.
Definition async_function_handler.hpp:433
const rclcpp::Duration & get_current_callback_period() const
Get the current callback period.
Definition async_function_handler.hpp:389
void init(std::function< T(const rclcpp::Time &, const rclcpp::Duration &)> callback, int thread_priority=50)
Initialize the AsyncFunctionHandler with the callback and thread_priority.
Definition async_function_handler.hpp:252
void stop_thread()
Stops the callback thread.
Definition async_function_handler.hpp:515
void join_async_callback_thread()
Join the async callback thread.
Definition async_function_handler.hpp:461
bool is_stopped() const
Check if the async callback is triggered to stop the cycle.
Definition async_function_handler.hpp:478
bool is_initialized() const
Check if the AsyncFunctionHandler is initialized.
Definition async_function_handler.hpp:454
const std::thread & get_thread() const
Get the const version of async worker thread.
Definition async_function_handler.hpp:496
T get_last_return_value() const
Get the last return value of the async callback method.
Definition async_function_handler.hpp:377
Enum class to define the scheduling policy for the async worker thread. SYNCHRONIZED: The async worke...
Definition async_function_handler.hpp:50
Value
Definition async_function_handler.hpp:52
@ DETACHED
Synchronized scheduling policy.
Definition async_function_handler.hpp:55
@ SYNCHRONIZED
Unknown scheduling policy.
Definition async_function_handler.hpp:54
A pthread mutex wrapper that provides a mutex with the priority inheritance protocol and a priority c...
Definition async_function_handler.hpp:40
std::pair< bool, std::string > set_current_thread_affinity(int core)
Definition realtime_helpers.cpp:238
bool configure_sched_fifo(int priority)
Definition realtime_helpers.cpp:74
The AsyncFunctionHandlerParams struct is used to configure the AsyncFunctionHandler....
Definition async_function_handler.hpp:127
void initialize(NodeT &node, const std::string &prefix)
Initialize the parameters from a node's parameters. The node should have the following parameters:
Definition async_function_handler.hpp:185
bool validate() const
Validate the parameters.
Definition async_function_handler.hpp:133