diff --git a/msg/templates/urtps/RtpsTopics.cpp.em b/msg/templates/urtps/RtpsTopics.cpp.em index 666089ed858..eea687d2e4c 100644 --- a/msg/templates/urtps/RtpsTopics.cpp.em +++ b/msg/templates/urtps/RtpsTopics.cpp.em @@ -61,12 +61,12 @@ except AttributeError: #include "RtpsTopics.h" -bool RtpsTopics::init() +bool RtpsTopics::init(std::condition_variable* cv) { @[if recv_topics]@ // Initialise subscribers @[for topic in recv_topics]@ - if (_@(topic)_sub.init()) { + if (_@(topic)_sub.init(cv)) { std::cout << "@(topic) subscriber started" << std::endl; } else { std::cout << "ERROR starting @(topic) subscriber" << std::endl; diff --git a/msg/templates/urtps/RtpsTopics.h.em b/msg/templates/urtps/RtpsTopics.h.em index afe0dba74f3..052ec984cbb 100644 --- a/msg/templates/urtps/RtpsTopics.h.em +++ b/msg/templates/urtps/RtpsTopics.h.em @@ -54,6 +54,7 @@ recv_topics = [(alias[idx] if alias[idx] else s.short_name) for idx, s in enumer ****************************************************************************/ #include +#include @[for topic in send_topics]@ #include "@(topic)_Publisher.h" @@ -64,7 +65,7 @@ recv_topics = [(alias[idx] if alias[idx] else s.short_name) for idx, s in enumer class RtpsTopics { public: - bool init(); + bool init(std::condition_variable* cv); @[if send_topics]@ void publish(uint8_t topic_ID, char data_buffer[], size_t len); @[end if]@ diff --git a/msg/templates/urtps/Subscriber.cpp.em b/msg/templates/urtps/Subscriber.cpp.em index e475f61045a..4699555103d 100644 --- a/msg/templates/urtps/Subscriber.cpp.em +++ b/msg/templates/urtps/Subscriber.cpp.em @@ -74,8 +74,10 @@ except AttributeError: @(topic)_Subscriber::~@(topic)_Subscriber() { Domain::removeParticipant(mp_participant);} -bool @(topic)_Subscriber::init() +bool @(topic)_Subscriber::init(std::condition_variable* cv) { + m_listener.cv_msg = cv; + // Create RTPSParticipant ParticipantAttributes PParam; PParam.rtps.builtin.domainId = 0; // MUST BE THE SAME AS IN THE PUBLISHER @@ -131,21 +133,7 @@ void @(topic)_Subscriber::SubListener::onSubscriptionMatched(Subscriber* sub, Ma void @(topic)_Subscriber::SubListener::onNewDataMessage(Subscriber* sub) { // Take data -@[if 1.5 <= fastrtpsgen_version <= 1.7]@ -@[ if ros2_distro]@ - @(package)::msg::dds_::@(topic)_ st; -@[ else]@ - @(topic)_ st; -@[ end if]@ -@[else]@ -@[ if ros2_distro]@ - @(package)::msg::@(topic) st; -@[ else]@ - @(topic) st; -@[ end if]@ -@[end if]@ - - if(sub->takeNextData(&st, &m_info)) + if(sub->takeNextData(&msg, &m_info)) { if(m_info.sampleKind == ALIVE) { @@ -153,6 +141,8 @@ void @(topic)_Subscriber::SubListener::onNewDataMessage(Subscriber* sub) ++n_msg; //std::cout << "Sample received, count=" << n_msg << std::endl; has_msg = true; + + cv_msg->notify_all(); } } @@ -167,7 +157,7 @@ void @(topic)_Subscriber::run() bool @(topic)_Subscriber::hasMsg() { - return m_listener.has_msg; + return m_listener.has_msg.load(); } @[if 1.5 <= fastrtpsgen_version <= 1.7]@ diff --git a/msg/templates/urtps/Subscriber.h.em b/msg/templates/urtps/Subscriber.h.em index 64e5fbcecdf..1714ef08059 100644 --- a/msg/templates/urtps/Subscriber.h.em +++ b/msg/templates/urtps/Subscriber.h.em @@ -74,6 +74,9 @@ except AttributeError: #include "@(topic)PubSubTypes.h" @[end if]@ +#include +#include + using namespace eprosima::fastrtps; using namespace eprosima::fastrtps::rtps; @@ -82,7 +85,7 @@ class @(topic)_Subscriber public: @(topic)_Subscriber(); virtual ~@(topic)_Subscriber(); - bool init(); + bool init(std::condition_variable* cv); void run(); bool hasMsg(); @[if 1.5 <= fastrtpsgen_version <= 1.7]@ @@ -105,7 +108,7 @@ private: class SubListener : public SubscriberListener { public: - SubListener() : n_matched(0), n_msg(0){}; + SubListener() : n_matched(0), n_msg(0), has_msg(false){}; ~SubListener(){}; void onSubscriptionMatched(Subscriber* sub, MatchingInfo& info); void onNewDataMessage(Subscriber* sub); @@ -125,7 +128,8 @@ private: @(topic) msg; @[ end if]@ @[end if]@ - bool has_msg = false; + std::atomic_bool has_msg; + std::condition_variable* cv_msg; } m_listener; @[if 1.5 <= fastrtpsgen_version <= 1.7]@ diff --git a/msg/templates/urtps/microRTPS_agent.cpp.em b/msg/templates/urtps/microRTPS_agent.cpp.em index 8df408edd8e..84f5ff1500e 100644 --- a/msg/templates/urtps/microRTPS_agent.cpp.em +++ b/msg/templates/urtps/microRTPS_agent.cpp.em @@ -59,6 +59,7 @@ recv_topics = [(alias[idx] if alias[idx] else s.short_name) for idx, s in enumer #include #include #include +#include #include #include @@ -188,11 +189,16 @@ void signal_handler(int signum) @[if recv_topics]@ std::atomic exit_sender_thread(false); +std::condition_variable cv_msg; +std::mutex cv_m; + void t_send(void *data) { char data_buffer[BUFFER_SIZE] = {}; int length = 0; uint8_t topic_ID = 255; + + std::unique_lock lk(cv_m); while (running) { @@ -214,7 +220,7 @@ void t_send(void *data) } } - usleep(_options.sleep_us); + cv_msg.wait_for(lk, std::chrono::microseconds(_options.sleep_us)); } } @[end if]@ @@ -268,7 +274,7 @@ int main(int argc, char** argv) std::chrono::time_point start, end; @[end if]@ - topics.init(); + topics.init(&cv_msg); running = true; @[if recv_topics]@