ros2_control - humble
async_function_handler.hpp
1 // Copyright 2024 PAL Robotics S.L.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
16 
17 #ifndef REALTIME_TOOLS__ASYNC_FUNCTION_HANDLER_HPP_
18 #define REALTIME_TOOLS__ASYNC_FUNCTION_HANDLER_HPP_
19 
20 #include <atomic>
21 #include <condition_variable>
22 #include <functional>
23 #include <limits>
24 #include <memory>
25 #include <mutex>
26 #include <stdexcept>
27 #include <string>
28 #include <thread>
29 #include <utility>
30 #include <vector>
31 
32 #include "rclcpp/duration.hpp"
33 #include "rclcpp/logging.hpp"
34 #include "rclcpp/time.hpp"
35 #include "realtime_tools/realtime_helpers.hpp"
36 
37 namespace realtime_tools
38 {
44 template <typename T>
46 {
47 public:
48  AsyncFunctionHandler() = default;
49 
51 
53 
59  void init(
60  std::function<T(const rclcpp::Time &, const rclcpp::Duration &)> callback,
61  int thread_priority = 50)
62  {
63  if (callback == nullptr) {
64  throw std::runtime_error(
65  "AsyncFunctionHandler: parsed function to call asynchronously is not valid!");
66  }
67  if (thread_.joinable()) {
68  throw std::runtime_error(
69  "AsyncFunctionHandler: Cannot reinitialize while the thread is "
70  "running. Please stop the async callback first!");
71  }
72  async_function_ = callback;
73  thread_priority_ = thread_priority;
74  }
75 
77 
90  void init(
91  std::function<T(const rclcpp::Time &, const rclcpp::Duration &)> callback,
92  std::function<bool()> trigger_predicate, int thread_priority = 50)
93  {
94  if (trigger_predicate == nullptr) {
95  throw std::runtime_error("AsyncFunctionHandler: parsed trigger predicate is not valid!");
96  }
97  init(callback, thread_priority);
98  trigger_predicate_ = trigger_predicate;
99  }
100 
102 
121  std::pair<bool, T> trigger_async_callback(
122  const rclcpp::Time & time, const rclcpp::Duration & period)
123  {
124  if (!is_initialized()) {
125  throw std::runtime_error("AsyncFunctionHandler: need to be initialized first!");
126  }
127  if (async_exception_ptr_) {
128  RCLCPP_ERROR(
129  rclcpp::get_logger("AsyncFunctionHandler"),
130  "AsyncFunctionHandler: Exception caught in the async callback thread!");
131  std::rethrow_exception(async_exception_ptr_);
132  }
133  if (!is_running()) {
134  throw std::runtime_error(
135  "AsyncFunctionHandler: need to start the async callback thread first before triggering!");
136  }
137  std::unique_lock<std::mutex> lock(async_mtx_, std::try_to_lock);
138  bool trigger_status = false;
139  if (lock.owns_lock() && !trigger_in_progress_ && trigger_predicate_()) {
140  {
141  std::unique_lock<std::mutex> scoped_lock(std::move(lock));
142  trigger_in_progress_ = true;
143  current_callback_time_ = time;
144  current_callback_period_ = period;
145  }
146  async_callback_condition_.notify_one();
147  trigger_status = true;
148  }
149  const T return_value = async_callback_return_;
150  return std::make_pair(trigger_status, return_value);
151  }
152 
154 
157  T get_last_return_value() const { return async_callback_return_; }
158 
160 
163  const rclcpp::Time & get_current_callback_time() const { return current_callback_time_; }
164 
166 
169  const rclcpp::Duration & get_current_callback_period() const { return current_callback_period_; }
170 
172 
180  {
181  std::unique_lock<std::mutex> lock(async_mtx_);
182  stop_async_callback_ = false;
183  trigger_in_progress_ = false;
184  current_callback_time_ = rclcpp::Time(0, 0, RCL_CLOCK_UNINITIALIZED);
185  current_callback_period_ = rclcpp::Duration(0, 0);
186  last_execution_time_ = std::chrono::nanoseconds(0);
187  async_callback_return_ = T();
188  async_exception_ptr_ = nullptr;
189  }
190 
192 
196  {
197  if (is_running()) {
198  std::unique_lock<std::mutex> lock(async_mtx_);
199  cycle_end_condition_.wait(lock, [this] { return !trigger_in_progress_; });
200  }
201  }
202 
204 
207  bool is_initialized() const { return async_function_ && trigger_predicate_; }
208 
210 
215  {
216  if (is_running()) {
217  thread_.join();
218  }
219  }
220 
222 
225  bool is_running() const { return thread_.joinable(); }
226 
228 
231  bool is_stopped() const { return stop_async_callback_; }
232 
234 
237  std::thread & get_thread() { return thread_; }
238 
240 
243  const std::thread & get_thread() const { return thread_; }
244 
246 
249  bool is_trigger_cycle_in_progress() const { return trigger_in_progress_; }
250 
252 
256  void stop_thread()
257  {
258  if (is_running()) {
259  {
260  std::unique_lock<std::mutex> lock(async_mtx_);
261  stop_async_callback_ = true;
262  }
263  async_callback_condition_.notify_one();
264  thread_.join();
265  }
266  }
267 
269 
272  std::chrono::nanoseconds get_last_execution_time() const
273  {
274  return last_execution_time_.load(std::memory_order_relaxed);
275  }
276 
278 
284  {
285  if (!is_initialized()) {
286  throw std::runtime_error("AsyncFunctionHandler: need to be initialized first!");
287  }
288  if (!thread_.joinable()) {
289  reset_variables();
290  thread_ = std::thread([this]() -> void {
291  if (!realtime_tools::configure_sched_fifo(thread_priority_)) {
292  RCLCPP_WARN(
293  rclcpp::get_logger("AsyncFunctionHandler"),
294  "Could not enable FIFO RT scheduling policy. Consider setting up your user to do FIFO "
295  "RT "
296  "scheduling. See "
297  "[https://control.ros.org/master/doc/ros2_control/controller_manager/doc/userdoc.html] "
298  "for details.");
299  }
300 
301  while (!stop_async_callback_.load(std::memory_order_relaxed)) {
302  {
303  std::unique_lock<std::mutex> lock(async_mtx_);
304  async_callback_condition_.wait(
305  lock, [this] { return trigger_in_progress_ || stop_async_callback_; });
306  if (!stop_async_callback_) {
307  const auto start_time = std::chrono::steady_clock::now();
308  try {
309  async_callback_return_ =
310  async_function_(current_callback_time_, current_callback_period_);
311  } catch (...) {
312  async_exception_ptr_ = std::current_exception();
313  }
314  const auto end_time = std::chrono::steady_clock::now();
315  last_execution_time_ =
316  std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - start_time);
317  }
318  trigger_in_progress_ = false;
319  }
320  cycle_end_condition_.notify_all();
321  }
322  });
323  }
324  }
325 
326 private:
327  rclcpp::Time current_callback_time_ = rclcpp::Time(0, 0, RCL_CLOCK_UNINITIALIZED);
328  rclcpp::Duration current_callback_period_{0, 0};
329 
330  std::function<T(const rclcpp::Time &, const rclcpp::Duration &)> async_function_;
331  std::function<bool()> trigger_predicate_ = []() { return true; };
332 
333  // Async related variables
334  std::thread thread_;
335  int thread_priority_ = std::numeric_limits<int>::quiet_NaN();
336  std::atomic_bool stop_async_callback_{false};
337  std::atomic_bool trigger_in_progress_{false};
338  std::atomic<T> async_callback_return_;
339  std::condition_variable async_callback_condition_;
340  std::condition_variable cycle_end_condition_;
341  std::mutex async_mtx_;
342  std::atomic<std::chrono::nanoseconds> last_execution_time_;
343  std::exception_ptr async_exception_ptr_;
344 };
345 } // namespace realtime_tools
346 
347 #endif // REALTIME_TOOLS__ASYNC_FUNCTION_HANDLER_HPP_
Class to handle asynchronous function calls. AsyncFunctionHandler is a class that allows the user to ...
Definition: async_function_handler.hpp:46
void wait_for_trigger_cycle_to_finish()
Waits until the current async callback method trigger cycle is finished.
Definition: async_function_handler.hpp:195
bool is_trigger_cycle_in_progress() const
Check if the async callback method is in progress.
Definition: async_function_handler.hpp:249
std::chrono::nanoseconds get_last_execution_time() const
Get the last execution time of the async callback method.
Definition: async_function_handler.hpp:272
void start_thread()
Initializes and starts the callback thread.
Definition: async_function_handler.hpp:283
void init(std::function< T(const rclcpp::Time &, const rclcpp::Duration &)> callback, std::function< bool()> trigger_predicate, int thread_priority=50)
Initialize the AsyncFunctionHandler with the callback, trigger_predicate and thread_priority.
Definition: async_function_handler.hpp:90
void reset_variables()
Resets the internal variables of the AsyncFunctionHandler.
Definition: async_function_handler.hpp:179
bool is_running() const
Check if the async worker thread is running.
Definition: async_function_handler.hpp:225
const std::thread & get_thread() const
Get the const version of async worker thread.
Definition: async_function_handler.hpp:243
const rclcpp::Time & get_current_callback_time() const
Get the current callback time.
Definition: async_function_handler.hpp:163
void init(std::function< T(const rclcpp::Time &, const rclcpp::Duration &)> callback, int thread_priority=50)
Initialize the AsyncFunctionHandler with the callback and thread_priority.
Definition: async_function_handler.hpp:59
std::thread & get_thread()
Get the async worker thread.
Definition: async_function_handler.hpp:237
void stop_thread()
Stops the callback thread.
Definition: async_function_handler.hpp:256
void join_async_callback_thread()
Join the async callback thread.
Definition: async_function_handler.hpp:214
const rclcpp::Duration & get_current_callback_period() const
Get the current callback period.
Definition: async_function_handler.hpp:169
bool is_stopped() const
Check if the async callback is triggered to stop the cycle.
Definition: async_function_handler.hpp:231
bool is_initialized() const
Check if the AsyncFunctionHandler is initialized.
Definition: async_function_handler.hpp:207
T get_last_return_value() const
Get the last return value of the async callback method.
Definition: async_function_handler.hpp:157
std::pair< bool, T > trigger_async_callback(const rclcpp::Time &time, const rclcpp::Duration &period)
Triggers the async callback method cycle.
Definition: async_function_handler.hpp:121
A pthread mutex wrapper that provides a mutex with the priority inheritance protocol and a priority c...
Definition: async_function_handler.hpp:38
bool configure_sched_fifo(int priority)
Definition: realtime_helpers.cpp:54