17#ifndef REALTIME_TOOLS__ASYNC_FUNCTION_HANDLER_HPP_
18#define REALTIME_TOOLS__ASYNC_FUNCTION_HANDLER_HPP_
21#include <condition_variable>
32#include "rclcpp/duration.hpp"
33#include "rclcpp/logging.hpp"
34#include "rclcpp/time.hpp"
35#include "realtime_tools/realtime_helpers.hpp"
60 std::function<T(
const rclcpp::Time &,
const rclcpp::Duration &)> callback,
61 int thread_priority = 50)
63 if (callback ==
nullptr) {
64 throw std::runtime_error(
65 "AsyncFunctionHandler: parsed function to call asynchronously is not valid!");
67 if (thread_.joinable()) {
68 throw std::runtime_error(
69 "AsyncFunctionHandler: Cannot reinitialize while the thread is "
70 "running. Please stop the async callback first!");
72 async_function_ = callback;
73 thread_priority_ = thread_priority;
91 std::function<T(
const rclcpp::Time &,
const rclcpp::Duration &)> callback,
92 std::function<
bool()> trigger_predicate,
int thread_priority = 50)
94 if (trigger_predicate ==
nullptr) {
95 throw std::runtime_error(
"AsyncFunctionHandler: parsed trigger predicate is not valid!");
97 init(callback, thread_priority);
98 trigger_predicate_ = trigger_predicate;
122 const rclcpp::Time & time,
const rclcpp::Duration & period)
125 throw std::runtime_error(
"AsyncFunctionHandler: need to be initialized first!");
127 if (async_exception_ptr_) {
129 rclcpp::get_logger(
"AsyncFunctionHandler"),
130 "AsyncFunctionHandler: Exception caught in the async callback thread!");
131 std::rethrow_exception(async_exception_ptr_);
134 throw std::runtime_error(
135 "AsyncFunctionHandler: need to start the async callback thread first before triggering!");
137 std::unique_lock<std::mutex> lock(async_mtx_, std::try_to_lock);
138 bool trigger_status =
false;
139 if (lock.owns_lock() && !trigger_in_progress_ && trigger_predicate_()) {
141 std::unique_lock<std::mutex> scoped_lock(std::move(lock));
142 trigger_in_progress_ =
true;
143 current_callback_time_ = time;
144 current_callback_period_ = period;
146 async_callback_condition_.notify_one();
147 trigger_status =
true;
149 const T return_value = async_callback_return_;
150 return std::make_pair(trigger_status, return_value);
181 std::unique_lock<std::mutex> lock(async_mtx_);
182 stop_async_callback_ =
false;
183 trigger_in_progress_ =
false;
184 current_callback_time_ = rclcpp::Time(0, 0, RCL_CLOCK_UNINITIALIZED);
185 current_callback_period_ = rclcpp::Duration(0, 0);
186 last_execution_time_ = std::chrono::nanoseconds(0);
187 async_callback_return_ = T();
188 async_exception_ptr_ =
nullptr;
198 std::unique_lock<std::mutex> lock(async_mtx_);
199 cycle_end_condition_.wait(lock, [
this] {
return !trigger_in_progress_; });
260 std::unique_lock<std::mutex> lock(async_mtx_);
261 stop_async_callback_ =
true;
263 async_callback_condition_.notify_one();
274 return last_execution_time_.load(std::memory_order_relaxed);
286 throw std::runtime_error(
"AsyncFunctionHandler: need to be initialized first!");
288 if (!thread_.joinable()) {
290 thread_ = std::thread([
this]() ->
void {
293 rclcpp::get_logger(
"AsyncFunctionHandler"),
294 "Could not enable FIFO RT scheduling policy. Consider setting up your user to do FIFO "
297 "[https://control.ros.org/master/doc/ros2_control/controller_manager/doc/userdoc.html] "
301 while (!stop_async_callback_.load(std::memory_order_relaxed)) {
303 std::unique_lock<std::mutex> lock(async_mtx_);
304 async_callback_condition_.wait(
305 lock, [
this] {
return trigger_in_progress_ || stop_async_callback_; });
306 if (!stop_async_callback_) {
307 const auto start_time = std::chrono::steady_clock::now();
309 async_callback_return_ =
310 async_function_(current_callback_time_, current_callback_period_);
312 async_exception_ptr_ = std::current_exception();
314 const auto end_time = std::chrono::steady_clock::now();
315 last_execution_time_ =
316 std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - start_time);
318 trigger_in_progress_ =
false;
320 cycle_end_condition_.notify_all();
327 rclcpp::Time current_callback_time_ = rclcpp::Time(0, 0, RCL_CLOCK_UNINITIALIZED);
328 rclcpp::Duration current_callback_period_{0, 0};
330 std::function<T(
const rclcpp::Time &,
const rclcpp::Duration &)> async_function_;
331 std::function<bool()> trigger_predicate_ = []() {
return true; };
335 int thread_priority_ = std::numeric_limits<int>::quiet_NaN();
336 std::atomic_bool stop_async_callback_{
false};
337 std::atomic_bool trigger_in_progress_{
false};
338 std::atomic<T> async_callback_return_;
339 std::condition_variable async_callback_condition_;
340 std::condition_variable cycle_end_condition_;
341 std::mutex async_mtx_;
342 std::atomic<std::chrono::nanoseconds> last_execution_time_;
343 std::exception_ptr async_exception_ptr_;