37#ifndef REALTIME_TOOLS__REALTIME_PUBLISHER_HPP_
38#define REALTIME_TOOLS__REALTIME_PUBLISHER_HPP_
42#include <condition_variable>
49#include "rclcpp/publisher.hpp"
53template <
class MessageT>
59 using PublisherSharedPtr =
typename rclcpp::Publisher<MessageT>::SharedPtr;
61 using PublishedType =
typename rclcpp::TypeAdapter<MessageT>::custom_type;
62 using ROSMessageType =
typename rclcpp::TypeAdapter<MessageT>::ros_message_type;
67 "This variable is deprecated, it is recommended to use the try_publish() method instead.")]]
81#pragma warning(disable : 4996)
83#pragma GCC diagnostic push
84#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
87 : publisher_(publisher), is_running_(false), keep_running_(true), turn_(State::LOOP_NOT_STARTED)
89 thread_ = std::thread(&RealtimePublisher::publishingLoop,
this);
94 while (!thread_.joinable() ||
95 turn_.load(std::memory_order_acquire) == State::LOOP_NOT_STARTED) {
96 std::this_thread::sleep_for(std::chrono::microseconds(100));
103 RCLCPP_DEBUG(rclcpp::get_logger(
"realtime_tools"),
"Waiting for publishing thread to stop....");
105 while (is_running()) {
106 std::this_thread::sleep_for(std::chrono::microseconds(100));
109 rclcpp::get_logger(
"realtime_tools"),
"Publishing thread stopped, joining thread....");
110 if (thread_.joinable()) {
117#pragma GCC diagnostic pop
130 std::unique_lock<std::mutex>
lock(msg_mutex_);
131 keep_running_ =
false;
133 updated_cond_.notify_one();
145 "Use try_publish() method instead of this method. This method may be removed in future "
149 return turn_.load(std::memory_order_acquire) == State::REALTIME && msg_mutex_.try_lock();
158 std::unique_lock<std::mutex>
lock(msg_mutex_, std::try_to_lock);
173 std::unique_lock<std::mutex>
lock(msg_mutex_, std::try_to_lock);
176 std::unique_lock<std::mutex> scoped_lock(std::move(
lock));
179#pragma warning(disable : 4996)
181#pragma GCC diagnostic push
182#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
188#pragma GCC diagnostic pop
190 turn_.store(State::NON_REALTIME, std::memory_order_release);
192 updated_cond_.notify_one();
210 "Use try_publish() method instead of this method. This method may be removed in future "
225 "Use the try_publish() method to publish the message instead of using this method. This method "
226 "may be removed in future versions.")]]
229 turn_.store(State::NON_REALTIME, std::memory_order_release);
230#pragma GCC diagnostic push
231#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
233#pragma GCC diagnostic pop
243 "Use the try_publish() method to publish the message instead of using this method. This method "
244 "may be removed in future versions.")]]
255 "Use the try_publish() method to publish the message instead of using this method. This method "
256 "may be removed in future versions.")]]
260 updated_cond_.notify_one();
278 "This getter method will be removed. It is recommended to use the try_publish() instead of "
279 "accessing the msg_ variable.")]]
280 const MessageT & get_msg()
const
284#pragma warning(disable : 4996)
286#pragma GCC diagnostic push
287#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
293#pragma GCC diagnostic pop
305 const std::mutex &
get_mutex()
const {
return msg_mutex_; }
315 return turn_.load(std::memory_order_acquire) == State::REALTIME &&
lock.owns_lock();
319 RealtimePublisher(
const RealtimePublisher &) =
delete;
320 RealtimePublisher & operator=(
const RealtimePublisher &) =
delete;
322 bool is_running()
const {
return is_running_; }
335 void publishingLoop()
339 while (keep_running_) {
343 turn_.store(State::REALTIME, std::memory_order_release);
345 std::unique_lock<std::mutex> lock_(msg_mutex_);
346 updated_cond_.wait(lock_, [&] {
return turn_ == State::NON_REALTIME || !keep_running_; });
349#pragma warning(disable : 4996)
351#pragma GCC diagnostic push
352#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
358#pragma GCC diagnostic pop
364 publisher_->publish(outgoing);
370 PublisherSharedPtr publisher_;
371 std::atomic<bool> is_running_;
372 std::atomic<bool> keep_running_;
376 mutable std::mutex msg_mutex_;
377 std::condition_variable updated_cond_;
379 enum class State :
int { REALTIME, NON_REALTIME, LOOP_NOT_STARTED };
380 std::atomic<State> turn_;
383template <
class MessageT>
384using RealtimePublisherSharedPtr = std::shared_ptr<RealtimePublisher<MessageT>>;