37#ifndef REALTIME_TOOLS__REALTIME_PUBLISHER_HPP_
38#define REALTIME_TOOLS__REALTIME_PUBLISHER_HPP_
42#include <condition_variable>
49#include "rclcpp/publisher.hpp"
53template <
class MessageT>
59 using PublisherSharedPtr =
typename rclcpp::Publisher<MessageT>::SharedPtr;
61 using PublishedType =
typename rclcpp::TypeAdapter<MessageT>::custom_type;
62 using ROSMessageType =
typename rclcpp::TypeAdapter<MessageT>::ros_message_type;
76 : publisher_(publisher), is_running_(false), keep_running_(true), turn_(State::LOOP_NOT_STARTED)
78 thread_ = std::thread(&RealtimePublisher::publishingLoop,
this);
83 while (!thread_.joinable() ||
84 turn_.load(std::memory_order_acquire) == State::LOOP_NOT_STARTED) {
85 std::this_thread::sleep_for(std::chrono::microseconds(100));
92 RCLCPP_DEBUG(rclcpp::get_logger(
"realtime_tools"),
"Waiting for publishing thread to stop....");
94 while (is_running()) {
95 std::this_thread::sleep_for(std::chrono::microseconds(100));
98 rclcpp::get_logger(
"realtime_tools"),
"Publishing thread stopped, joining thread....");
99 if (thread_.joinable()) {
114 std::unique_lock<std::mutex> lock(msg_mutex_);
115 keep_running_ =
false;
117 updated_cond_.notify_one();
126 std::unique_lock<std::mutex> lock(msg_mutex_, std::try_to_lock);
141 std::unique_lock<std::mutex> lock(msg_mutex_, std::try_to_lock);
144 std::unique_lock<std::mutex> scoped_lock(std::move(lock));
146 turn_.store(State::NON_REALTIME, std::memory_order_release);
148 updated_cond_.notify_one();
176 const std::mutex &
get_mutex()
const {
return msg_mutex_; }
184 bool can_publish(std::unique_lock<std::mutex> & lock)
const
186 return turn_.load(std::memory_order_acquire) == State::REALTIME && lock.owns_lock();
190 RealtimePublisher(
const RealtimePublisher &) =
delete;
191 RealtimePublisher & operator=(
const RealtimePublisher &) =
delete;
193 bool is_running()
const {
return is_running_; }
206 void publishingLoop()
210 while (keep_running_) {
214 turn_.store(State::REALTIME, std::memory_order_release);
216 std::unique_lock<std::mutex> lock_(msg_mutex_);
217 updated_cond_.wait(lock_, [&] {
return turn_ == State::NON_REALTIME || !keep_running_; });
223 publisher_->publish(outgoing);
229 PublisherSharedPtr publisher_;
230 std::atomic<bool> is_running_;
231 std::atomic<bool> keep_running_;
237 mutable std::mutex msg_mutex_;
238 std::condition_variable updated_cond_;
240 enum class State :
int { REALTIME, NON_REALTIME, LOOP_NOT_STARTED };
241 std::atomic<State> turn_;
244template <
class MessageT>
245using RealtimePublisherSharedPtr = std::shared_ptr<RealtimePublisher<MessageT>>;