Introduced condition variable in ROS2 subscriber to solve 500ms latency spikes

This commit is contained in:
Peter van der Perk
2019-11-05 15:02:12 +01:00
committed by Nuno Marques
parent ef475fa9d8
commit 08a27492b4
5 changed files with 26 additions and 25 deletions
+2 -2
View File
@@ -61,12 +61,12 @@ except AttributeError:
#include "RtpsTopics.h" #include "RtpsTopics.h"
bool RtpsTopics::init() bool RtpsTopics::init(std::condition_variable* cv)
{ {
@[if recv_topics]@ @[if recv_topics]@
// Initialise subscribers // Initialise subscribers
@[for topic in recv_topics]@ @[for topic in recv_topics]@
if (_@(topic)_sub.init()) { if (_@(topic)_sub.init(cv)) {
std::cout << "@(topic) subscriber started" << std::endl; std::cout << "@(topic) subscriber started" << std::endl;
} else { } else {
std::cout << "ERROR starting @(topic) subscriber" << std::endl; std::cout << "ERROR starting @(topic) subscriber" << std::endl;
+2 -1
View File
@@ -54,6 +54,7 @@ recv_topics = [(alias[idx] if alias[idx] else s.short_name) for idx, s in enumer
****************************************************************************/ ****************************************************************************/
#include <fastcdr/Cdr.h> #include <fastcdr/Cdr.h>
#include <condition_variable>
@[for topic in send_topics]@ @[for topic in send_topics]@
#include "@(topic)_Publisher.h" #include "@(topic)_Publisher.h"
@@ -64,7 +65,7 @@ recv_topics = [(alias[idx] if alias[idx] else s.short_name) for idx, s in enumer
class RtpsTopics { class RtpsTopics {
public: public:
bool init(); bool init(std::condition_variable* cv);
@[if send_topics]@ @[if send_topics]@
void publish(uint8_t topic_ID, char data_buffer[], size_t len); void publish(uint8_t topic_ID, char data_buffer[], size_t len);
@[end if]@ @[end if]@
+7 -17
View File
@@ -74,8 +74,10 @@ except AttributeError:
@(topic)_Subscriber::~@(topic)_Subscriber() { Domain::removeParticipant(mp_participant);} @(topic)_Subscriber::~@(topic)_Subscriber() { Domain::removeParticipant(mp_participant);}
bool @(topic)_Subscriber::init() bool @(topic)_Subscriber::init(std::condition_variable* cv)
{ {
m_listener.cv_msg = cv;
// Create RTPSParticipant // Create RTPSParticipant
ParticipantAttributes PParam; ParticipantAttributes PParam;
PParam.rtps.builtin.domainId = 0; // MUST BE THE SAME AS IN THE PUBLISHER PParam.rtps.builtin.domainId = 0; // MUST BE THE SAME AS IN THE PUBLISHER
@@ -131,21 +133,7 @@ void @(topic)_Subscriber::SubListener::onSubscriptionMatched(Subscriber* sub, Ma
void @(topic)_Subscriber::SubListener::onNewDataMessage(Subscriber* sub) void @(topic)_Subscriber::SubListener::onNewDataMessage(Subscriber* sub)
{ {
// Take data // Take data
@[if 1.5 <= fastrtpsgen_version <= 1.7]@ if(sub->takeNextData(&msg, &m_info))
@[ if ros2_distro]@
@(package)::msg::dds_::@(topic)_ st;
@[ else]@
@(topic)_ st;
@[ end if]@
@[else]@
@[ if ros2_distro]@
@(package)::msg::@(topic) st;
@[ else]@
@(topic) st;
@[ end if]@
@[end if]@
if(sub->takeNextData(&st, &m_info))
{ {
if(m_info.sampleKind == ALIVE) if(m_info.sampleKind == ALIVE)
{ {
@@ -153,6 +141,8 @@ void @(topic)_Subscriber::SubListener::onNewDataMessage(Subscriber* sub)
++n_msg; ++n_msg;
//std::cout << "Sample received, count=" << n_msg << std::endl; //std::cout << "Sample received, count=" << n_msg << std::endl;
has_msg = true; has_msg = true;
cv_msg->notify_all();
} }
} }
@@ -167,7 +157,7 @@ void @(topic)_Subscriber::run()
bool @(topic)_Subscriber::hasMsg() bool @(topic)_Subscriber::hasMsg()
{ {
return m_listener.has_msg; return m_listener.has_msg.load();
} }
@[if 1.5 <= fastrtpsgen_version <= 1.7]@ @[if 1.5 <= fastrtpsgen_version <= 1.7]@
+7 -3
View File
@@ -74,6 +74,9 @@ except AttributeError:
#include "@(topic)PubSubTypes.h" #include "@(topic)PubSubTypes.h"
@[end if]@ @[end if]@
#include <atomic>
#include <condition_variable>
using namespace eprosima::fastrtps; using namespace eprosima::fastrtps;
using namespace eprosima::fastrtps::rtps; using namespace eprosima::fastrtps::rtps;
@@ -82,7 +85,7 @@ class @(topic)_Subscriber
public: public:
@(topic)_Subscriber(); @(topic)_Subscriber();
virtual ~@(topic)_Subscriber(); virtual ~@(topic)_Subscriber();
bool init(); bool init(std::condition_variable* cv);
void run(); void run();
bool hasMsg(); bool hasMsg();
@[if 1.5 <= fastrtpsgen_version <= 1.7]@ @[if 1.5 <= fastrtpsgen_version <= 1.7]@
@@ -105,7 +108,7 @@ private:
class SubListener : public SubscriberListener class SubListener : public SubscriberListener
{ {
public: public:
SubListener() : n_matched(0), n_msg(0){}; SubListener() : n_matched(0), n_msg(0), has_msg(false){};
~SubListener(){}; ~SubListener(){};
void onSubscriptionMatched(Subscriber* sub, MatchingInfo& info); void onSubscriptionMatched(Subscriber* sub, MatchingInfo& info);
void onNewDataMessage(Subscriber* sub); void onNewDataMessage(Subscriber* sub);
@@ -125,7 +128,8 @@ private:
@(topic) msg; @(topic) msg;
@[ end if]@ @[ end if]@
@[end if]@ @[end if]@
bool has_msg = false; std::atomic_bool has_msg;
std::condition_variable* cv_msg;
} m_listener; } m_listener;
@[if 1.5 <= fastrtpsgen_version <= 1.7]@ @[if 1.5 <= fastrtpsgen_version <= 1.7]@
+8 -2
View File
@@ -59,6 +59,7 @@ recv_topics = [(alias[idx] if alias[idx] else s.short_name) for idx, s in enumer
#include <ctime> #include <ctime>
#include <csignal> #include <csignal>
#include <termios.h> #include <termios.h>
#include <condition_variable>
#include <fastcdr/Cdr.h> #include <fastcdr/Cdr.h>
#include <fastcdr/FastCdr.h> #include <fastcdr/FastCdr.h>
@@ -188,11 +189,16 @@ void signal_handler(int signum)
@[if recv_topics]@ @[if recv_topics]@
std::atomic<bool> exit_sender_thread(false); std::atomic<bool> exit_sender_thread(false);
std::condition_variable cv_msg;
std::mutex cv_m;
void t_send(void *data) void t_send(void *data)
{ {
char data_buffer[BUFFER_SIZE] = {}; char data_buffer[BUFFER_SIZE] = {};
int length = 0; int length = 0;
uint8_t topic_ID = 255; uint8_t topic_ID = 255;
std::unique_lock<std::mutex> lk(cv_m);
while (running) while (running)
{ {
@@ -214,7 +220,7 @@ void t_send(void *data)
} }
} }
usleep(_options.sleep_us); cv_msg.wait_for(lk, std::chrono::microseconds(_options.sleep_us));
} }
} }
@[end if]@ @[end if]@
@@ -268,7 +274,7 @@ int main(int argc, char** argv)
std::chrono::time_point<std::chrono::steady_clock> start, end; std::chrono::time_point<std::chrono::steady_clock> start, end;
@[end if]@ @[end if]@
topics.init(); topics.init(&cv_msg);
running = true; running = true;
@[if recv_topics]@ @[if recv_topics]@