Added syncronization to t_send worker thread

Which avoids possible deadlocks
This commit is contained in:
Peter van der Perk
2019-11-07 18:20:39 +01:00
committed by Nuno Marques
parent f0d22d3962
commit 991399f105
5 changed files with 69 additions and 71 deletions
+3 -30
View File
@@ -61,12 +61,12 @@ except AttributeError:
#include "RtpsTopics.h" #include "RtpsTopics.h"
bool RtpsTopics::init(std::condition_variable* cv) bool RtpsTopics::init(std::condition_variable* t_send_queue_cv, std::mutex* t_send_queue_mutex, std::queue<uint8_t>* t_send_queue)
{ {
@[if recv_topics]@ @[if recv_topics]@
// Initialise subscribers // Initialise subscribers
@[for topic in recv_topics]@ @[for topic in recv_topics]@
if (_@(topic)_sub.init(cv)) { if (_@(topic)_sub.init(@(rtps_message_id(ids, topic)), t_send_queue_cv, t_send_queue_mutex, t_send_queue)) {
std::cout << "@(topic) subscriber started" << std::endl; std::cout << "@(topic) subscriber started" << std::endl;
} else { } else {
std::cout << "ERROR starting @(topic) subscriber" << std::endl; std::cout << "ERROR starting @(topic) subscriber" << std::endl;
@@ -126,34 +126,6 @@ void RtpsTopics::publish(uint8_t topic_ID, char data_buffer[], size_t len)
@[end if]@ @[end if]@
@[if recv_topics]@ @[if recv_topics]@
bool RtpsTopics::hasMsg(uint8_t *topic_ID)
{
if (nullptr == topic_ID) return false;
*topic_ID = 0;
while (_next_sub_idx < @(len(recv_topics)) && 0 == *topic_ID)
{
switch (_sub_topics[_next_sub_idx])
{
@[for topic in recv_topics]@
case @(rtps_message_id(ids, topic)): if (_@(topic)_sub.hasMsg()) *topic_ID = @(rtps_message_id(ids, topic)); break;
@[end for]@
default:
printf("Unexpected topic ID to check hasMsg\n");
break;
}
_next_sub_idx++;
}
if (0 == *topic_ID)
{
_next_sub_idx = 0;
return false;
}
return true;
}
bool RtpsTopics::getMsg(const uint8_t topic_ID, eprosima::fastcdr::Cdr &scdr) bool RtpsTopics::getMsg(const uint8_t topic_ID, eprosima::fastcdr::Cdr &scdr)
{ {
bool ret = false; bool ret = false;
@@ -178,6 +150,7 @@ bool RtpsTopics::getMsg(const uint8_t topic_ID, eprosima::fastcdr::Cdr &scdr)
@[ end if]@ @[ end if]@
msg.serialize(scdr); msg.serialize(scdr);
ret = true; ret = true;
_@(topic)_sub.unlockMsg();
} }
break; break;
@[end for]@ @[end for]@
+2 -8
View File
@@ -55,6 +55,7 @@ recv_topics = [(alias[idx] if alias[idx] else s.short_name) for idx, s in enumer
#include <fastcdr/Cdr.h> #include <fastcdr/Cdr.h>
#include <condition_variable> #include <condition_variable>
#include <queue>
@[for topic in send_topics]@ @[for topic in send_topics]@
#include "@(topic)_Publisher.h" #include "@(topic)_Publisher.h"
@@ -65,12 +66,11 @@ recv_topics = [(alias[idx] if alias[idx] else s.short_name) for idx, s in enumer
class RtpsTopics { class RtpsTopics {
public: public:
bool init(std::condition_variable* cv); bool init(std::condition_variable* t_send_queue_cv, std::mutex* t_send_queue_mutex, std::queue<uint8_t>* t_send_queue);
@[if send_topics]@ @[if send_topics]@
void publish(uint8_t topic_ID, char data_buffer[], size_t len); void publish(uint8_t topic_ID, char data_buffer[], size_t len);
@[end if]@ @[end if]@
@[if recv_topics]@ @[if recv_topics]@
bool hasMsg(uint8_t *topic_ID);
bool getMsg(const uint8_t topic_ID, eprosima::fastcdr::Cdr &scdr); bool getMsg(const uint8_t topic_ID, eprosima::fastcdr::Cdr &scdr);
@[end if]@ @[end if]@
@@ -88,11 +88,5 @@ private:
@(topic)_Subscriber _@(topic)_sub; @(topic)_Subscriber _@(topic)_sub;
@[end for]@ @[end for]@
unsigned _next_sub_idx = 0;
unsigned char _sub_topics[@(len(recv_topics))] = {
@[for topic in recv_topics]@
@(rtps_message_id(ids, topic)), // @(topic)
@[end for]@
};
@[end if]@ @[end if]@
}; };
+26 -6
View File
@@ -74,9 +74,12 @@ except AttributeError:
@(topic)_Subscriber::~@(topic)_Subscriber() { Domain::removeParticipant(mp_participant);} @(topic)_Subscriber::~@(topic)_Subscriber() { Domain::removeParticipant(mp_participant);}
bool @(topic)_Subscriber::init(std::condition_variable* cv) bool @(topic)_Subscriber::init(uint8_t topic_ID, std::condition_variable* t_send_queue_cv, std::mutex* t_send_queue_mutex, std::queue<uint8_t>* t_send_queue)
{ {
m_listener.cv_msg = cv; m_listener.topic_ID = topic_ID;
m_listener.t_send_queue_cv = t_send_queue_cv;
m_listener.t_send_queue_mutex = t_send_queue_mutex;
m_listener.t_send_queue = t_send_queue;
// Create RTPSParticipant // Create RTPSParticipant
ParticipantAttributes PParam; ParticipantAttributes PParam;
@@ -132,17 +135,27 @@ void @(topic)_Subscriber::SubListener::onSubscriptionMatched(Subscriber* sub, Ma
void @(topic)_Subscriber::SubListener::onNewDataMessage(Subscriber* sub) void @(topic)_Subscriber::SubListener::onNewDataMessage(Subscriber* sub)
{ {
std::unique_lock<std::mutex> has_msg_lock(has_msg_mutex);
if(has_msg.load() == true) // Check if msg has been fetched
{
has_msg_cv.wait(has_msg_lock); // Wait till msg has been fetched
}
has_msg_lock.unlock();
// Take data // Take data
if(sub->takeNextData(&msg, &m_info)) if(sub->takeNextData(&msg, &m_info))
{ {
if(m_info.sampleKind == ALIVE) if(m_info.sampleKind == ALIVE)
{ {
// Print your structure data here. std::unique_lock<std::mutex> lk(*t_send_queue_mutex);
++n_msg; ++n_msg;
//std::cout << "Sample received, count=" << n_msg << std::endl;
has_msg = true; has_msg = true;
cv_msg->notify_all(); t_send_queue->push(topic_ID);
lk.unlock();
t_send_queue_cv->notify_one();
} }
} }
@@ -174,6 +187,13 @@ bool @(topic)_Subscriber::hasMsg()
@[ end if]@ @[ end if]@
@[end if]@ @[end if]@
{ {
m_listener.has_msg = false;
return m_listener.msg; return m_listener.msg;
} }
void @(topic)_Subscriber::unlockMsg()
{
std::unique_lock<std::mutex> has_msg_lock(m_listener.has_msg_mutex);
m_listener.has_msg = false;
has_msg_lock.unlock();
m_listener.has_msg_cv.notify_one();
}
+10 -2
View File
@@ -76,6 +76,7 @@ except AttributeError:
#include <atomic> #include <atomic>
#include <condition_variable> #include <condition_variable>
#include <queue>
using namespace eprosima::fastrtps; using namespace eprosima::fastrtps;
using namespace eprosima::fastrtps::rtps; using namespace eprosima::fastrtps::rtps;
@@ -85,7 +86,7 @@ class @(topic)_Subscriber
public: public:
@(topic)_Subscriber(); @(topic)_Subscriber();
virtual ~@(topic)_Subscriber(); virtual ~@(topic)_Subscriber();
bool init(std::condition_variable* cv); bool init(uint8_t topic_ID, std::condition_variable* t_send_queue_cv, std::mutex* t_send_queue_mutex, std::queue<uint8_t>* t_send_queue);
void run(); void run();
bool hasMsg(); bool hasMsg();
@[if 1.5 <= fastrtpsgen_version <= 1.7]@ @[if 1.5 <= fastrtpsgen_version <= 1.7]@
@@ -101,6 +102,8 @@ public:
@(topic) getMsg(); @(topic) getMsg();
@[ end if]@ @[ end if]@
@[end if]@ @[end if]@
void unlockMsg();
private: private:
Participant *mp_participant; Participant *mp_participant;
Subscriber *mp_subscriber; Subscriber *mp_subscriber;
@@ -129,7 +132,12 @@ private:
@[ end if]@ @[ end if]@
@[end if]@ @[end if]@
std::atomic_bool has_msg; std::atomic_bool has_msg;
std::condition_variable* cv_msg; uint8_t topic_ID;
std::condition_variable* t_send_queue_cv;
std::mutex* t_send_queue_mutex;
std::queue<uint8_t>* t_send_queue;
std::condition_variable has_msg_cv;
std::mutex has_msg_mutex;
} m_listener; } m_listener;
@[if 1.5 <= fastrtpsgen_version <= 1.7]@ @[if 1.5 <= fastrtpsgen_version <= 1.7]@
+28 -25
View File
@@ -60,6 +60,7 @@ recv_topics = [(alias[idx] if alias[idx] else s.short_name) for idx, s in enumer
#include <csignal> #include <csignal>
#include <termios.h> #include <termios.h>
#include <condition_variable> #include <condition_variable>
#include <queue>
#include <fastcdr/Cdr.h> #include <fastcdr/Cdr.h>
#include <fastcdr/FastCdr.h> #include <fastcdr/FastCdr.h>
@@ -189,38 +190,39 @@ void signal_handler(int signum)
@[if recv_topics]@ @[if recv_topics]@
std::atomic<bool> exit_sender_thread(false); std::atomic<bool> exit_sender_thread(false);
std::condition_variable cv_msg; std::condition_variable t_send_queue_cv;
std::mutex cv_m; std::mutex t_send_queue_mutex;
std::queue<uint8_t> t_send_queue;
void t_send(void *data) void t_send(void *data)
{ {
char data_buffer[BUFFER_SIZE] = {}; char data_buffer[BUFFER_SIZE] = {};
int length = 0; int length = 0;
uint8_t topic_ID = 255;
std::unique_lock<std::mutex> lk(cv_m);
while (running) while (running && !exit_sender_thread.load())
{ {
// Send subscribed topics over UART std::unique_lock<std::mutex> lk(t_send_queue_mutex);
while (topics.hasMsg(&topic_ID) && !exit_sender_thread.load()) while (t_send_queue.empty() && !exit_sender_thread.load())
{ {
uint16_t header_length = transport_node->get_header_length(); t_send_queue_cv.wait(lk);
/* make room for the header to fill in later */ }
eprosima::fastcdr::FastBuffer cdrbuffer(&data_buffer[header_length], sizeof(data_buffer)-header_length); uint8_t topic_ID = t_send_queue.front();
eprosima::fastcdr::Cdr scdr(cdrbuffer); t_send_queue.pop();
if (topics.getMsg(topic_ID, scdr)) lk.unlock();
uint16_t header_length = transport_node->get_header_length();
/* make room for the header to fill in later */
eprosima::fastcdr::FastBuffer cdrbuffer(&data_buffer[header_length], sizeof(data_buffer)-header_length);
eprosima::fastcdr::Cdr scdr(cdrbuffer);
if (topics.getMsg(topic_ID, scdr))
{
length = scdr.getSerializedDataLength();
if (0 < (length = transport_node->write(topic_ID, data_buffer, length)))
{ {
length = scdr.getSerializedDataLength(); total_sent += length;
if (0 < (length = transport_node->write(topic_ID, data_buffer, length))) ++sent;
{
total_sent += length;
++sent;
}
} }
} }
cv_msg.wait_for(lk, std::chrono::microseconds(_options.sleep_us));
} }
} }
@[end if]@ @[end if]@
@@ -274,7 +276,7 @@ int main(int argc, char** argv)
std::chrono::time_point<std::chrono::steady_clock> start, end; std::chrono::time_point<std::chrono::steady_clock> start, end;
@[end if]@ @[end if]@
topics.init(&cv_msg); topics.init(&t_send_queue_cv, &t_send_queue_mutex, &t_send_queue);
running = true; running = true;
@[if recv_topics]@ @[if recv_topics]@
@@ -307,12 +309,13 @@ int main(int argc, char** argv)
received = sent = total_read = total_sent = 0; received = sent = total_read = total_sent = 0;
receiving = false; receiving = false;
} }
@[else]@
@[end if]@
usleep(_options.sleep_us); usleep(_options.sleep_us);
@[end if]@
} }
@[if recv_topics]@ @[if recv_topics]@
exit_sender_thread = true; exit_sender_thread = true;
t_send_queue_cv.notify_one();
sender_thread.join(); sender_thread.join();
@[end if]@ @[end if]@
delete transport_node; delete transport_node;