37#ifndef REALTIME_TOOLS__REALTIME_PUBLISHER_HPP_
38#define REALTIME_TOOLS__REALTIME_PUBLISHER_HPP_
42#include <condition_variable>
49#include "rclcpp/publisher.hpp"
53template <
class MessageT>
59 using PublisherSharedPtr =
typename rclcpp::Publisher<MessageT>::SharedPtr;
61 using PublishedType =
typename rclcpp::TypeAdapter<MessageT>::custom_type;
62 using ROSMessageType =
typename rclcpp::TypeAdapter<MessageT>::ros_message_type;
78 : publisher_(publisher), is_running_(false), keep_running_(true), turn_(State::LOOP_NOT_STARTED)
80 thread_ = std::thread(&RealtimePublisher::publishingLoop,
this);
85 while (!thread_.joinable() ||
86 turn_.load(std::memory_order_acquire) == State::LOOP_NOT_STARTED) {
87 std::this_thread::sleep_for(std::chrono::microseconds(100));
94 RCLCPP_DEBUG(rclcpp::get_logger(
"realtime_tools"),
"Waiting for publishing thread to stop....");
96 while (is_running()) {
97 std::this_thread::sleep_for(std::chrono::microseconds(100));
100 rclcpp::get_logger(
"realtime_tools"),
"Publishing thread stopped, joining thread....");
101 if (thread_.joinable()) {
116 std::unique_lock<std::mutex>
lock(msg_mutex_);
117 keep_running_ =
false;
119 updated_cond_.notify_one();
131 "Use try_publish() method instead of this method. This method may be removed in future "
135 return turn_.load(std::memory_order_acquire) == State::REALTIME && msg_mutex_.try_lock();
145 std::unique_lock<std::mutex>
lock(msg_mutex_, std::try_to_lock);
160 std::unique_lock<std::mutex>
lock(msg_mutex_, std::try_to_lock);
163 std::unique_lock<std::mutex> scoped_lock(std::move(
lock));
165 turn_.store(State::NON_REALTIME, std::memory_order_release);
167 updated_cond_.notify_one();
185 "Use try_publish() method instead of this method. This method may be removed in future "
200 "Use the try_publish() method to publish the message instead of using this method. This method "
201 "may be removed in future versions.")]]
204 turn_.store(State::NON_REALTIME, std::memory_order_release);
205#pragma GCC diagnostic push
206#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
208#pragma GCC diagnostic pop
218 "Use the try_publish() method to publish the message instead of using this method. This method "
219 "may be removed in future versions.")]]
230 "Use the try_publish() method to publish the message instead of using this method. This method "
231 "may be removed in future versions.")]]
235 updated_cond_.notify_one();
238 std::thread & get_thread() {
return thread_; }
240 const std::thread & get_thread()
const {
return thread_; }
242 const MessageT & get_msg()
const {
return msg_; }
244 std::mutex & get_mutex() {
return msg_mutex_; }
246 const std::mutex & get_mutex()
const {
return msg_mutex_; }
257 return turn_.load(std::memory_order_acquire) == State::REALTIME &&
lock.owns_lock();
264 bool is_running()
const {
return is_running_; }
277 void publishingLoop()
281 while (keep_running_) {
285 turn_.store(State::REALTIME, std::memory_order_release);
287 std::unique_lock<std::mutex> lock_(msg_mutex_);
288 updated_cond_.wait(lock_, [&] {
return turn_ == State::NON_REALTIME || !keep_running_; });
294 publisher_->publish(outgoing);
300 PublisherSharedPtr publisher_;
301 std::atomic<bool> is_running_;
302 std::atomic<bool> keep_running_;
306 mutable std::mutex msg_mutex_;
307 std::condition_variable updated_cond_;
309 enum class State :
int { REALTIME, NON_REALTIME, LOOP_NOT_STARTED };
310 std::atomic<State> turn_;
313template <
class MessageT>
314using RealtimePublisherSharedPtr = std::shared_ptr<RealtimePublisher<MessageT>>;