17 #ifndef REALTIME_TOOLS__ASYNC_FUNCTION_HANDLER_HPP_
18 #define REALTIME_TOOLS__ASYNC_FUNCTION_HANDLER_HPP_
21 #include <condition_variable>
32 #include "rclcpp/duration.hpp"
33 #include "rclcpp/logging.hpp"
34 #include "rclcpp/time.hpp"
35 #include "realtime_tools/thread_priority.hpp"
60 std::function<T(
const rclcpp::Time &,
const rclcpp::Duration &)> callback,
61 int thread_priority = 50)
63 if (callback ==
nullptr) {
64 throw std::runtime_error(
65 "AsyncFunctionHandler: parsed function to call asynchronously is not valid!");
67 if (thread_.joinable()) {
68 throw std::runtime_error(
69 "AsyncFunctionHandler: Cannot reinitialize while the thread is "
70 "running. Please stop the async callback first!");
72 async_function_ = callback;
73 thread_priority_ = thread_priority;
91 std::function<T(
const rclcpp::Time &,
const rclcpp::Duration &)> callback,
92 std::function<
bool()> trigger_predicate,
int thread_priority = 50)
94 if (trigger_predicate ==
nullptr) {
95 throw std::runtime_error(
"AsyncFunctionHandler: parsed trigger predicate is not valid!");
97 init(callback, thread_priority);
98 trigger_predicate_ = trigger_predicate;
118 const rclcpp::Time & time,
const rclcpp::Duration & period)
121 throw std::runtime_error(
"AsyncFunctionHandler: need to be initialized first!");
124 throw std::runtime_error(
125 "AsyncFunctionHandler: need to start the async callback thread first before triggering!");
127 std::unique_lock<std::mutex> lock(async_mtx_, std::try_to_lock);
128 bool trigger_status =
false;
129 if (lock.owns_lock() && !trigger_in_progress_ && trigger_predicate_()) {
131 std::unique_lock<std::mutex> scoped_lock(std::move(lock));
132 trigger_in_progress_ =
true;
133 current_callback_time_ = time;
134 current_callback_period_ = period;
136 async_callback_condition_.notify_one();
137 trigger_status =
true;
139 const T return_value = async_callback_return_;
140 return std::make_pair(trigger_status, return_value);
150 std::unique_lock<std::mutex> lock(async_mtx_);
151 cycle_end_condition_.wait(lock, [
this] {
return !trigger_in_progress_; });
206 std::unique_lock<std::mutex> lock(async_mtx_);
207 stop_async_callback_ =
true;
209 async_callback_condition_.notify_one();
229 throw std::runtime_error(
"AsyncFunctionHandler: need to be initialized first!");
231 if (!thread_.joinable()) {
232 stop_async_callback_ =
false;
233 trigger_in_progress_ =
false;
234 last_execution_time_ = 0.0;
235 async_callback_return_ = T();
236 thread_ = std::thread([
this]() ->
void {
239 rclcpp::get_logger(
"AsyncFunctionHandler"),
240 "Could not enable FIFO RT scheduling policy. Consider setting up your user to do FIFO "
243 "[https://control.ros.org/master/doc/ros2_control/controller_manager/doc/userdoc.html] "
247 while (!stop_async_callback_.load(std::memory_order_relaxed)) {
249 std::unique_lock<std::mutex> lock(async_mtx_);
250 async_callback_condition_.wait(
251 lock, [
this] {
return trigger_in_progress_ || stop_async_callback_; });
252 if (!stop_async_callback_) {
253 const auto start_time = std::chrono::steady_clock::now();
254 async_callback_return_ =
255 async_function_(current_callback_time_, current_callback_period_);
256 const auto end_time = std::chrono::steady_clock::now();
257 last_execution_time_ = std::chrono::duration<double>(end_time - start_time).count();
259 trigger_in_progress_ =
false;
261 cycle_end_condition_.notify_all();
268 rclcpp::Time current_callback_time_;
269 rclcpp::Duration current_callback_period_{0, 0};
271 std::function<T(
const rclcpp::Time &,
const rclcpp::Duration &)> async_function_;
272 std::function<bool()> trigger_predicate_ = []() {
return true; };
276 int thread_priority_ = std::numeric_limits<int>::quiet_NaN();
277 std::atomic_bool stop_async_callback_{
false};
278 std::atomic_bool trigger_in_progress_{
false};
279 std::atomic<T> async_callback_return_;
280 std::condition_variable async_callback_condition_;
281 std::condition_variable cycle_end_condition_;
282 std::mutex async_mtx_;
283 std::atomic<double> last_execution_time_;