37#ifndef REALTIME_TOOLS__REALTIME_PUBLISHER_HPP_
38#define REALTIME_TOOLS__REALTIME_PUBLISHER_HPP_
42#include <condition_variable>
48#include "rclcpp/publisher.hpp"
52template <
class MessageT>
58 using PublisherSharedPtr =
typename rclcpp::Publisher<MessageT>::SharedPtr;
60 using PublishedType =
typename rclcpp::TypeAdapter<MessageT>::custom_type;
61 using ROSMessageType =
typename rclcpp::TypeAdapter<MessageT>::ros_message_type;
78 : publisher_(publisher), is_running_(false), keep_running_(true), turn_(State::LOOP_NOT_STARTED)
80 thread_ = std::thread(&RealtimePublisher::publishingLoop,
this);
85 while (!thread_.joinable() ||
86 turn_.load(std::memory_order_acquire) == State::LOOP_NOT_STARTED) {
87 std::this_thread::sleep_for(std::chrono::microseconds(100));
92 "Use constructor with rclcpp::Publisher<T>::SharedPtr instead - this class does not make sense "
93 "without a real publisher")]]
95 : is_running_(false), keep_running_(false), turn_(State::LOOP_NOT_STARTED)
102 RCLCPP_DEBUG(rclcpp::get_logger(
"realtime_tools"),
"Waiting for publishing thread to stop....");
104 while (is_running()) {
105 std::this_thread::sleep_for(std::chrono::microseconds(100));
108 rclcpp::get_logger(
"realtime_tools"),
"Publishing thread stopped, joining thread....");
109 if (thread_.joinable()) {
124 std::unique_lock<std::mutex>
lock(msg_mutex_);
125 keep_running_ =
false;
127 updated_cond_.notify_one();
140 if (turn_.load(std::memory_order_acquire) == State::REALTIME && msg_mutex_.try_lock()) {
176 turn_.store(State::NON_REALTIME, std::memory_order_release);
186 void lock() { msg_mutex_.lock(); }
195 updated_cond_.notify_one();
203 bool is_running()
const {
return is_running_; }
216 void publishingLoop()
220 while (keep_running_) {
224 turn_.store(State::REALTIME, std::memory_order_release);
226 std::unique_lock<std::mutex> lock_(msg_mutex_);
227 updated_cond_.wait(lock_, [&] {
return turn_ == State::NON_REALTIME || !keep_running_; });
233 publisher_->publish(outgoing);
239 PublisherSharedPtr publisher_;
240 std::atomic<bool> is_running_;
241 std::atomic<bool> keep_running_;
245 std::mutex msg_mutex_;
246 std::condition_variable updated_cond_;
248 enum class State :
int { REALTIME, NON_REALTIME, LOOP_NOT_STARTED };
249 std::atomic<State> turn_;
252template <
class MessageT>
253using RealtimePublisherSharedPtr = std::shared_ptr<RealtimePublisher<MessageT>>;