ros2_control - kilted
Loading...
Searching...
No Matches
realtime_publisher.hpp
1// Copyright (c) 2008, Willow Garage, Inc.
2//
3// Redistribution and use in source and binary forms, with or without
4// modification, are permitted provided that the following conditions are met:
5//
6// * Redistributions of source code must retain the above copyright
7// notice, this list of conditions and the following disclaimer.
8//
9// * Redistributions in binary form must reproduce the above copyright
10// notice, this list of conditions and the following disclaimer in the
11// documentation and/or other materials provided with the distribution.
12//
13// * Neither the name of the Willow Garage, Inc. nor the names of its
14// contributors may be used to endorse or promote products derived from
15// this software without specific prior written permission.
16//
17// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27// POSSIBILITY OF SUCH DAMAGE.
28
29/*
30 * Publishing ROS messages is difficult, as the publish function is
31 * not realtime safe. This class provides the proper locking so that
32 * you can call publish in realtime and a separate (non-realtime)
33 * thread will ensure that the message gets published over ROS.
34 *
35 * Author: Stuart Glaser
36 */
37#ifndef REALTIME_TOOLS__REALTIME_PUBLISHER_HPP_
38#define REALTIME_TOOLS__REALTIME_PUBLISHER_HPP_
39
40#include <atomic>
41#include <chrono>
42#include <condition_variable>
43#include <memory>
44#include <mutex>
45#include <string>
46#include <thread>
47#include <utility>
48
49#include "rclcpp/publisher.hpp"
50
51namespace realtime_tools
52{
53template <class MessageT>
55{
56public:
58 using PublisherType = rclcpp::Publisher<MessageT>;
59 using PublisherSharedPtr = typename rclcpp::Publisher<MessageT>::SharedPtr;
60
61 using PublishedType = typename rclcpp::TypeAdapter<MessageT>::custom_type;
62 using ROSMessageType = typename rclcpp::TypeAdapter<MessageT>::ros_message_type;
63
64 RCLCPP_SMART_PTR_DEFINITIONS(RealtimePublisher<MessageT>)
65
66 [[deprecated(
67 "This variable is deprecated, it is recommended to use the try_publish() method instead.")]]
68 MessageT msg_;
69
79#ifdef _MSC_VER
80#pragma warning(push)
81#pragma warning(disable : 4996)
82#else
83#pragma GCC diagnostic push
84#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
85#endif
86 explicit RealtimePublisher(PublisherSharedPtr publisher)
87 : publisher_(publisher), is_running_(false), keep_running_(true), turn_(State::LOOP_NOT_STARTED)
88 {
89 thread_ = std::thread(&RealtimePublisher::publishingLoop, this);
90
91 // Wait for the thread to be ready before proceeding
92 // This is important to ensure that the thread is properly initialized and ready to handle
93 // messages before any other operations are performed on the RealtimePublisher instance.
94 while (!thread_.joinable() ||
95 turn_.load(std::memory_order_acquire) == State::LOOP_NOT_STARTED) {
96 std::this_thread::sleep_for(std::chrono::microseconds(100));
97 }
98 }
99
102 {
103 RCLCPP_DEBUG(rclcpp::get_logger("realtime_tools"), "Waiting for publishing thread to stop....");
104 stop();
105 while (is_running()) {
106 std::this_thread::sleep_for(std::chrono::microseconds(100));
107 }
108 RCLCPP_DEBUG(
109 rclcpp::get_logger("realtime_tools"), "Publishing thread stopped, joining thread....");
110 if (thread_.joinable()) {
111 thread_.join();
112 }
113 }
114#ifdef _MSC_VER
115#pragma warning(pop)
116#else
117#pragma GCC diagnostic pop
118#endif
119
127 void stop()
128 {
129 {
130 std::unique_lock<std::mutex> lock(msg_mutex_);
131 keep_running_ = false;
132 }
133 updated_cond_.notify_one(); // So the publishing loop can exit
134 }
135
144 [[deprecated(
145 "Use try_publish() method instead of this method. This method may be removed in future "
146 "versions.")]]
147 bool trylock()
148 {
149 return turn_.load(std::memory_order_acquire) == State::REALTIME && msg_mutex_.try_lock();
150 }
151
156 bool can_publish() const
157 {
158 std::unique_lock<std::mutex> lock(msg_mutex_, std::try_to_lock);
159 return can_publish(lock);
160 }
161
171 bool try_publish(const MessageT & msg)
172 {
173 std::unique_lock<std::mutex> lock(msg_mutex_, std::try_to_lock);
174 if (can_publish(lock)) {
175 {
176 std::unique_lock<std::mutex> scoped_lock(std::move(lock));
177#ifdef _MSC_VER
178#pragma warning(push)
179#pragma warning(disable : 4996)
180#else
181#pragma GCC diagnostic push
182#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
183#endif
184 msg_ = msg;
185#ifdef _MSC_VER
186#pragma warning(pop)
187#else
188#pragma GCC diagnostic pop
189#endif
190 turn_.store(State::NON_REALTIME, std::memory_order_release);
191 }
192 updated_cond_.notify_one(); // Notify the publishing thread
193 return true;
194 }
195 return false;
196 }
197
209 [[deprecated(
210 "Use try_publish() method instead of this method. This method may be removed in future "
211 "versions.")]]
212 bool tryPublish(const MessageT & msg)
213 {
214 return try_publish(msg);
215 }
216
224 [[deprecated(
225 "Use the try_publish() method to publish the message instead of using this method. This method "
226 "may be removed in future versions.")]]
228 {
229 turn_.store(State::NON_REALTIME, std::memory_order_release);
230#pragma GCC diagnostic push
231#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
232 unlock();
233#pragma GCC diagnostic pop
234 }
235
242 [[deprecated(
243 "Use the try_publish() method to publish the message instead of using this method. This method "
244 "may be removed in future versions.")]]
245 void lock()
246 {
247 msg_mutex_.lock();
248 }
249
254 [[deprecated(
255 "Use the try_publish() method to publish the message instead of using this method. This method "
256 "may be removed in future versions.")]]
257 void unlock()
258 {
259 msg_mutex_.unlock();
260 updated_cond_.notify_one();
261 }
262
268 std::thread & get_thread() { return thread_; }
269
275 const std::thread & get_thread() const { return thread_; }
276
277 [[deprecated(
278 "This getter method will be removed. It is recommended to use the try_publish() instead of "
279 "accessing the msg_ variable.")]]
280 const MessageT & get_msg() const
281 {
282#ifdef _MSC_VER
283#pragma warning(push)
284#pragma warning(disable : 4996)
285#else
286#pragma GCC diagnostic push
287#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
288#endif
289 return msg_;
290#ifdef _MSC_VER
291#pragma warning(pop)
292#else
293#pragma GCC diagnostic pop
294#endif
295 }
296
300 std::mutex & get_mutex() { return msg_mutex_; }
301
305 const std::mutex & get_mutex() const { return msg_mutex_; }
306
307private:
313 bool can_publish(std::unique_lock<std::mutex> & lock) const
314 {
315 return turn_.load(std::memory_order_acquire) == State::REALTIME && lock.owns_lock();
316 }
317
318 // non-copyable
319 RealtimePublisher(const RealtimePublisher &) = delete;
320 RealtimePublisher & operator=(const RealtimePublisher &) = delete;
321
322 bool is_running() const { return is_running_; }
323
335 void publishingLoop()
336 {
337 is_running_ = true;
338
339 while (keep_running_) {
340 MessageT outgoing;
341
342 {
343 turn_.store(State::REALTIME, std::memory_order_release);
344 // Locks msg_ and copies it to outgoing
345 std::unique_lock<std::mutex> lock_(msg_mutex_);
346 updated_cond_.wait(lock_, [&] { return turn_ == State::NON_REALTIME || !keep_running_; });
347#ifdef _MSC_VER
348#pragma warning(push)
349#pragma warning(disable : 4996)
350#else
351#pragma GCC diagnostic push
352#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
353#endif
354 outgoing = msg_;
355#ifdef _MSC_VER
356#pragma warning(pop)
357#else
358#pragma GCC diagnostic pop
359#endif
360 }
361
362 // Sends the outgoing message
363 if (keep_running_) {
364 publisher_->publish(outgoing);
365 }
366 }
367 is_running_ = false;
368 }
369
370 PublisherSharedPtr publisher_;
371 std::atomic<bool> is_running_;
372 std::atomic<bool> keep_running_;
373
374 std::thread thread_;
375
376 mutable std::mutex msg_mutex_; // Protects msg_
377 std::condition_variable updated_cond_;
378
379 enum class State : int { REALTIME, NON_REALTIME, LOOP_NOT_STARTED };
380 std::atomic<State> turn_; // Who's turn is it to use msg_?
381};
382
383template <class MessageT>
384using RealtimePublisherSharedPtr = std::shared_ptr<RealtimePublisher<MessageT>>;
385
386} // namespace realtime_tools
387#endif // REALTIME_TOOLS__REALTIME_PUBLISHER_HPP_
Definition realtime_publisher.hpp:55
std::mutex & get_mutex()
Get the mutex protecting the stored message.
Definition realtime_publisher.hpp:300
void stop()
Stop the realtime publisher.
Definition realtime_publisher.hpp:127
bool try_publish(const MessageT &msg)
Try to publish the given message.
Definition realtime_publisher.hpp:171
RealtimePublisher(PublisherSharedPtr publisher)
Constructor for the realtime publisher.
Definition realtime_publisher.hpp:86
bool tryPublish(const MessageT &msg)
Try to publish the given message (deprecated)
Definition realtime_publisher.hpp:212
bool trylock()
Try to acquire the data lock for non-realtime message publishing.
Definition realtime_publisher.hpp:147
std::thread & get_thread()
Get the thread object for the publishing thread.
Definition realtime_publisher.hpp:268
~RealtimePublisher()
Destructor.
Definition realtime_publisher.hpp:101
void unlock()
Unlocks the data without publishing anything.
Definition realtime_publisher.hpp:257
void unlockAndPublish()
Unlock the msg_ variable for the non-realtime thread to start publishing.
Definition realtime_publisher.hpp:227
rclcpp::Publisher< MessageT > PublisherType
Provide various typedefs to resemble the rclcpp::Publisher type.
Definition realtime_publisher.hpp:58
bool can_publish() const
Check if the realtime publisher is in a state to publish messages.
Definition realtime_publisher.hpp:156
const std::mutex & get_mutex() const
Get the mutex protecting the stored message.
Definition realtime_publisher.hpp:305
void lock()
Acquire the data lock.
Definition realtime_publisher.hpp:245
const std::thread & get_thread() const
Get the thread object for the publishing thread.
Definition realtime_publisher.hpp:275
A pthread mutex wrapper that provides a mutex with the priority inheritance protocol and a priority c...
Definition async_function_handler.hpp:40