microRTPS: agent: match the code style from the PX4 Firmware

This commit is contained in:
TSC21
2021-05-01 11:33:08 +02:00
committed by Nuno Marques
parent 323ce797f8
commit 63571b3e3f
12 changed files with 754 additions and 664 deletions
+79 -71
View File
@@ -74,107 +74,115 @@ except AttributeError:
#include "@(topic)_Publisher.h"
@(topic)_Publisher::@(topic)_Publisher()
: mp_participant(nullptr),
mp_publisher(nullptr)
: mp_participant(nullptr),
mp_publisher(nullptr)
{ }
@(topic)_Publisher::~@(topic)_Publisher()
{
Domain::removeParticipant(mp_participant);
Domain::removeParticipant(mp_participant);
}
bool @(topic)_Publisher::init(const std::string& ns)
bool @(topic)_Publisher::init(const std::string &ns)
{
// Create RTPSParticipant
ParticipantAttributes PParam;
// Create RTPSParticipant
ParticipantAttributes PParam;
@[if version.parse(fastrtps_version) < version.parse('2.0')]@
PParam.rtps.builtin.domainId = 0;
PParam.rtps.builtin.domainId = 0;
@[else]@
PParam.domainId = 0;
PParam.domainId = 0;
@[end if]@
@[if version.parse(fastrtps_version) <= version.parse('1.8.4')]@
PParam.rtps.builtin.leaseDuration = c_TimeInfinite;
PParam.rtps.builtin.leaseDuration = c_TimeInfinite;
@[else]@
PParam.rtps.builtin.discovery_config.leaseDuration = c_TimeInfinite;
PParam.rtps.builtin.discovery_config.leaseDuration = c_TimeInfinite;
@[end if]@
std::string nodeName = ns;
nodeName.append("@(topic)_publisher");
PParam.rtps.setName(nodeName.c_str());
std::string nodeName = ns;
nodeName.append("@(topic)_publisher");
PParam.rtps.setName(nodeName.c_str());
// Check if ROS_LOCALHOST_ONLY is set. This means that one wants to use
// only the localhost network for data sharing
const char* localhost_only = std::getenv("ROS_LOCALHOST_ONLY");
if (localhost_only && strcmp(localhost_only, "1") == 0) {
// Create a custom network transport descriptor to whitelist the localhost
auto localhostDescriptor = std::make_shared<UDPv4TransportDescriptor>();
localhostDescriptor->interfaceWhiteList.emplace_back("127.0.0.1");
// Check if ROS_LOCALHOST_ONLY is set. This means that one wants to use
// only the localhost network for data sharing
const char *localhost_only = std::getenv("ROS_LOCALHOST_ONLY");
// Disable the built-in Transport Layer
PParam.rtps.useBuiltinTransports = false;
if (localhost_only && strcmp(localhost_only, "1") == 0) {
// Create a custom network transport descriptor to whitelist the localhost
auto localhostDescriptor = std::make_shared<UDPv4TransportDescriptor>();
localhostDescriptor->interfaceWhiteList.emplace_back("127.0.0.1");
// Add the descriptor as a custom user transport
PParam.rtps.userTransports.push_back(localhostDescriptor);
}
// Disable the built-in Transport Layer
PParam.rtps.useBuiltinTransports = false;
mp_participant = Domain::createParticipant(PParam);
if(mp_participant == nullptr)
return false;
// Add the descriptor as a custom user transport
PParam.rtps.userTransports.push_back(localhostDescriptor);
}
// Register the type
Domain::registerType(mp_participant, static_cast<TopicDataType*>(&@(topic)DataType));
mp_participant = Domain::createParticipant(PParam);
// Create Publisher
PublisherAttributes Wparam;
Wparam.topic.topicKind = NO_KEY;
Wparam.topic.topicDataType = @(topic)DataType.getName();
if (mp_participant == nullptr) {
return false;
}
// Register the type
Domain::registerType(mp_participant, static_cast<TopicDataType *>(&@(topic)DataType));
// Create Publisher
PublisherAttributes Wparam;
Wparam.topic.topicKind = NO_KEY;
Wparam.topic.topicDataType = @(topic)DataType.getName();
@[if ros2_distro]@
@[ if ros2_distro == "ardent"]@
Wparam.qos.m_partition.push_back("rt");
std::string topicName = ns;
topicName.append("@(topic)_PubSubTopic");
Wparam.topic.topicName = topicName;
Wparam.qos.m_partition.push_back("rt");
std::string topicName = ns;
topicName.append("@(topic)_PubSubTopic");
Wparam.topic.topicName = topicName;
@[ else]@
std::string topicName = "rt/";
topicName.append(ns);
topicName.append("@(topic)_PubSubTopic");
Wparam.topic.topicName = topicName;
std::string topicName = "rt/";
topicName.append(ns);
topicName.append("@(topic)_PubSubTopic");
Wparam.topic.topicName = topicName;
@[ end if]@
@[else]@
std::string topicName = ns;
topicName.append("@(topic)PubSubTopic");
Wparam.topic.topicName = topicName;
std::string topicName = ns;
topicName.append("@(topic)PubSubTopic");
Wparam.topic.topicName = topicName;
@[end if]@
mp_publisher = Domain::createPublisher(mp_participant, Wparam, static_cast<PublisherListener*>(&m_listener));
if(mp_publisher == nullptr)
return false;
return true;
mp_publisher = Domain::createPublisher(mp_participant, Wparam, static_cast<PublisherListener *>(&m_listener));
if (mp_publisher == nullptr) {
return false;
}
return true;
}
void @(topic)_Publisher::PubListener::onPublicationMatched(Publisher* pub, MatchingInfo& info)
void @(topic)_Publisher::PubListener::onPublicationMatched(Publisher *pub, MatchingInfo &info)
{
// The first 6 values of the ID guidPrefix of an entity in a DDS-RTPS Domain
// are the same for all its subcomponents (publishers, subscribers)
bool is_different_endpoint = false;
for (size_t i = 0; i < 6; i++) {
if (pub->getGuid().guidPrefix.value[i] != info.remoteEndpointGuid.guidPrefix.value[i]) {
is_different_endpoint = true;
break;
}
}
// The first 6 values of the ID guidPrefix of an entity in a DDS-RTPS Domain
// are the same for all its subcomponents (publishers, subscribers)
bool is_different_endpoint = false;
// If the matching happens for the same entity, do not make a match
if (is_different_endpoint) {
if (info.status == MATCHED_MATCHING) {
n_matched++;
std::cout << "\033[0;37m[ micrortps_agent ]\t@(topic) publisher matched\033[0m" << std::endl;
} else {
n_matched--;
std::cout << "\033[0;37m[ micrortps_agent ]\t@(topic) publisher unmatched\033[0m" << std::endl;
}
}
for (size_t i = 0; i < 6; i++) {
if (pub->getGuid().guidPrefix.value[i] != info.remoteEndpointGuid.guidPrefix.value[i]) {
is_different_endpoint = true;
break;
}
}
// If the matching happens for the same entity, do not make a match
if (is_different_endpoint) {
if (info.status == MATCHED_MATCHING) {
n_matched++;
std::cout << "\033[0;37m[ micrortps_agent ]\t@(topic) publisher matched\033[0m" << std::endl;
} else {
n_matched--;
std::cout << "\033[0;37m[ micrortps_agent ]\t@(topic) publisher unmatched\033[0m" << std::endl;
}
}
}
void @(topic)_Publisher::publish(@(topic)_msg_t* st)
void @(topic)_Publisher::publish(@(topic)_msg_t *st)
{
mp_publisher->write(st);
mp_publisher->write(st);
}
+17 -17
View File
@@ -25,7 +25,7 @@ except AttributeError:
/****************************************************************************
*
* Copyright 2017 Proyectos y Sistemas de Mantenimiento SL (eProsima).
* Copyright (c) 2018-2019 PX4 Development Team. All rights reserved.
* Copyright (c) 2018-2021 PX4 Development Team. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
@@ -99,24 +99,24 @@ using @(topic)_msg_datatype = @(topic)PubSubType;
class @(topic)_Publisher
{
public:
@(topic)_Publisher();
virtual ~@(topic)_Publisher();
bool init(const std::string& ns);
void run();
void publish(@(topic)_msg_t* st);
@(topic)_Publisher();
virtual ~@(topic)_Publisher();
bool init(const std::string &ns);
void run();
void publish(@(topic)_msg_t *st);
private:
Participant *mp_participant;
Publisher *mp_publisher;
Participant *mp_participant;
Publisher *mp_publisher;
class PubListener : public PublisherListener
{
public:
PubListener() : n_matched(0){};
~PubListener(){};
void onPublicationMatched(Publisher* pub, MatchingInfo& info);
int n_matched;
} m_listener;
@(topic)_msg_datatype @(topic)DataType;
class PubListener : public PublisherListener
{
public:
PubListener() : n_matched(0) {};
~PubListener() {};
void onPublicationMatched(Publisher *pub, MatchingInfo &info);
int n_matched;
} m_listener;
@(topic)_msg_datatype @(topic)DataType;
};
#endif // _@(topic)__PUBLISHER_H_
+95 -81
View File
@@ -24,7 +24,7 @@ package = package[0]
/****************************************************************************
*
* Copyright 2017 Proyectos y Sistemas de Mantenimiento SL (eProsima).
* Copyright (c) 2018-2019 PX4 Development Team. All rights reserved.
* Copyright (c) 2018-2021 PX4 Development Team. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
@@ -56,113 +56,127 @@ package = package[0]
#include "RtpsTopics.h"
bool RtpsTopics::init(std::condition_variable* t_send_queue_cv, std::mutex* t_send_queue_mutex, std::queue<uint8_t>* t_send_queue, const std::string& ns)
bool RtpsTopics::init(std::condition_variable *t_send_queue_cv, std::mutex *t_send_queue_mutex,
std::queue<uint8_t> *t_send_queue, const std::string &ns)
{
@[if recv_topics]@
// Initialise subscribers
std::cout << "\033[0;36m--- Subscribers ---\033[0m" << std::endl;
// Initialise subscribers
std::cout << "\033[0;36m--- Subscribers ---\033[0m" << std::endl;
@[for topic in recv_topics]@
if (_@(topic)_sub.init(@(rtps_message_id(ids, topic)), t_send_queue_cv, t_send_queue_mutex, t_send_queue, ns)) {
std::cout << "- @(topic) subscriber started" << std::endl;
} else {
std::cerr << "Failed starting @(topic) subscriber" << std::endl;
return false;
}
if (_@(topic)_sub.init(@(rtps_message_id(ids, topic)), t_send_queue_cv, t_send_queue_mutex, t_send_queue, ns)) {
std::cout << "- @(topic) subscriber started" << std::endl;
} else {
std::cerr << "Failed starting @(topic) subscriber" << std::endl;
return false;
}
@[end for]@
std::cout << "\033[0;36m-----------------------\033[0m" << std::endl << std::endl;
std::cout << "\033[0;36m-----------------------\033[0m" << std::endl << std::endl;
@[end if]@
@[if send_topics]@
// Initialise publishers
std::cout << "\033[0;36m---- Publishers ----\033[0m" << std::endl;
// Initialise publishers
std::cout << "\033[0;36m---- Publishers ----\033[0m" << std::endl;
@[for topic in send_topics]@
if (_@(topic)_pub.init(ns)) {
std::cout << "- @(topic) publisher started" << std::endl;
if (_@(topic)_pub.init(ns)) {
std::cout << "- @(topic) publisher started" << std::endl;
@[ if topic == 'Timesync' or topic == 'timesync']@
_timesync->start(&_@(topic)_pub);
_timesync->start(&_@(topic)_pub);
@[ end if]@
} else {
std::cerr << "ERROR starting @(topic) publisher" << std::endl;
return false;
}
} else {
std::cerr << "ERROR starting @(topic) publisher" << std::endl;
return false;
}
@[end for]@
std::cout << "\033[0;36m-----------------------\033[0m" << std::endl;
std::cout << "\033[0;36m-----------------------\033[0m" << std::endl;
@[end if]@
return true;
return true;
}
@[if send_topics]@
void RtpsTopics::publish(uint8_t topic_ID, char data_buffer[], size_t len)
{
switch (topic_ID)
{
switch (topic_ID) {
@[for topic in send_topics]@
case @(rtps_message_id(ids, topic)): // @(topic)
{
@(topic)_msg_t st;
eprosima::fastcdr::FastBuffer cdrbuffer(data_buffer, len);
eprosima::fastcdr::Cdr cdr_des(cdrbuffer);
st.deserialize(cdr_des);
@[ if topic == 'Timesync' or topic == 'timesync']@
_timesync->processTimesyncMsg(&st);
if (getMsgSysID(&st) == 1) {
@[ end if]@
// apply timestamp offset
uint64_t timestamp = getMsgTimestamp(&st);
uint64_t timestamp_sample = getMsgTimestampSample(&st);
_timesync->subtractOffset(timestamp);
setMsgTimestamp(&st, timestamp);
_timesync->subtractOffset(timestamp_sample);
setMsgTimestampSample(&st, timestamp_sample);
_@(topic)_pub.publish(&st);
case @(rtps_message_id(ids, topic)): { // @(topic)
@(topic)_msg_t st;
eprosima::fastcdr::FastBuffer cdrbuffer(data_buffer, len);
eprosima::fastcdr::Cdr cdr_des(cdrbuffer);
st.deserialize(cdr_des);
@[ if topic == 'Timesync' or topic == 'timesync']@
}
_timesync->processTimesyncMsg(&st);
if (getMsgSysID(&st) == 1) {
@[ end if]@
}
break;
// apply timestamp offset
uint64_t timestamp = getMsgTimestamp(&st);
uint64_t timestamp_sample = getMsgTimestampSample(&st);
_timesync->subtractOffset(timestamp);
setMsgTimestamp(&st, timestamp);
_timesync->subtractOffset(timestamp_sample);
setMsgTimestampSample(&st, timestamp_sample);
_@(topic)_pub.publish(&st);
@[ if topic == 'Timesync' or topic == 'timesync']@
}
@[ end if]@
}
break;
@[end for]@
default:
printf("\033[1;33m[ micrortps_agent ]\tUnexpected topic ID '%hhu' to publish Please make sure the agent is capable of parsing the message associated to the topic ID '%hhu'\033[0m\n", topic_ID, topic_ID);
break;
}
default:
printf("\033[1;33m[ micrortps_agent ]\tUnexpected topic ID '%hhu' to publish Please make sure the agent is capable of parsing the message associated to the topic ID '%hhu'\033[0m\n",
topic_ID, topic_ID);
break;
}
}
@[end if]@
@[if recv_topics]@
bool RtpsTopics::getMsg(const uint8_t topic_ID, eprosima::fastcdr::Cdr &scdr)
{
bool ret = false;
switch (topic_ID)
{
@[for topic in recv_topics]@
case @(rtps_message_id(ids, topic)): // @(topic)
if (_@(topic)_sub.hasMsg())
{
@(topic)_msg_t msg = _@(topic)_sub.getMsg();
@[ if topic == 'Timesync' or topic == 'timesync']@
if (getMsgSysID(&msg) == 0) {
@[ end if]@
// apply timestamps offset
uint64_t timestamp = getMsgTimestamp(&msg);
uint64_t timestamp_sample = getMsgTimestampSample(&msg);
_timesync->addOffset(timestamp);
setMsgTimestamp(&msg, timestamp);
_timesync->addOffset(timestamp_sample);
setMsgTimestampSample(&msg, timestamp_sample);
msg.serialize(scdr);
ret = true;
@[ if topic == 'Timesync' or topic == 'timesync']@
}
@[ end if]@
_@(topic)_sub.unlockMsg();
}
break;
@[end for]@
default:
printf("\033[1;33m[ micrortps_agent ]\tUnexpected topic ID '%hhu' to getMsg. Please make sure the agent is capable of parsing the message associated to the topic ID '%hhu'\033[0m\n", topic_ID, topic_ID);
break;
}
bool ret = false;
return ret;
switch (topic_ID) {
@[for topic in recv_topics]@
case @(rtps_message_id(ids, topic)): // @(topic)
if (_@(topic)_sub.hasMsg()) {
@(topic)_msg_t msg = _@(topic)_sub.getMsg();
@[ if topic == 'Timesync' or topic == 'timesync']@
if (getMsgSysID(&msg) == 0) {
@[ end if]@
// apply timestamps offset
uint64_t timestamp = getMsgTimestamp(&msg);
uint64_t timestamp_sample = getMsgTimestampSample(&msg);
_timesync->addOffset(timestamp);
setMsgTimestamp(&msg, timestamp);
_timesync->addOffset(timestamp_sample);
setMsgTimestampSample(&msg, timestamp_sample);
msg.serialize(scdr);
ret = true;
@[ if topic == 'Timesync' or topic == 'timesync']@
}
@[ end if]@
_@(topic)_sub.unlockMsg();
}
break;
@[end for]@
default:
printf("\033[1;33m[ micrortps_agent ]\tUnexpected topic ID '%hhu' to getMsg. Please make sure the agent is capable of parsing the message associated to the topic ID '%hhu'\033[0m\n",
topic_ID, topic_ID);
break;
}
return ret;
}
@[end if]@
+77 -75
View File
@@ -30,7 +30,7 @@ except AttributeError:
/****************************************************************************
*
* Copyright 2017 Proyectos y Sistemas de Mantenimiento SL (eProsima).
* Copyright (c) 2018-2019 PX4 Development Team. All rights reserved.
* Copyright (c) 2018-2021 PX4 Development Team. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
@@ -91,120 +91,122 @@ using @(topic)_msg_t = @(topic);
@[ end if]@
@[end for]@
class RtpsTopics {
class RtpsTopics
{
public:
bool init(std::condition_variable* t_send_queue_cv, std::mutex* t_send_queue_mutex, std::queue<uint8_t>* t_send_queue, const std::string& ns);
void set_timesync(const std::shared_ptr<TimeSync>& timesync) { _timesync = timesync; };
bool init(std::condition_variable *t_send_queue_cv, std::mutex *t_send_queue_mutex, std::queue<uint8_t> *t_send_queue,
const std::string &ns);
void set_timesync(const std::shared_ptr<TimeSync> &timesync) { _timesync = timesync; };
@[if send_topics]@
void publish(uint8_t topic_ID, char data_buffer[], size_t len);
void publish(uint8_t topic_ID, char data_buffer[], size_t len);
@[end if]@
@[if recv_topics]@
bool getMsg(const uint8_t topic_ID, eprosima::fastcdr::Cdr &scdr);
bool getMsg(const uint8_t topic_ID, eprosima::fastcdr::Cdr &scdr);
@[end if]@
private:
@[if send_topics]@
/** Publishers **/
/** Publishers **/
@[for topic in send_topics]@
@(topic)_Publisher _@(topic)_pub;
@(topic)_Publisher _@(topic)_pub;
@[end for]@
@[end if]@
@[if recv_topics]@
/** Subscribers **/
/** Subscribers **/
@[for topic in recv_topics]@
@(topic)_Subscriber _@(topic)_sub;
@(topic)_Subscriber _@(topic)_sub;
@[end for]@
@[end if]@
// SFINAE
template<typename T> struct hasTimestampSample{
private:
template<typename U,
typename = decltype(std::declval<U>().timestamp_sample(int64_t()))>
static std::true_type detect(int);
template<typename U>
static std::false_type detect(...);
public:
static constexpr bool value = decltype(detect<T>(0))::value;
};
// SFINAE
template<typename T> struct hasTimestampSample{
private:
template<typename U,
typename = decltype(std::declval<U>().timestamp_sample(int64_t()))>
static std::true_type detect(int);
template<typename U>
static std::false_type detect(...);
public:
static constexpr bool value = decltype(detect<T>(0))::value;
};
template<typename T>
inline typename std::enable_if<!hasTimestampSample<T>::value, uint64_t>::type
getMsgTimestampSample_impl(const T*) { return 0; }
template<typename T>
inline typename std::enable_if < !hasTimestampSample<T>::value, uint64_t >::type
getMsgTimestampSample_impl(const T *) { return 0; }
/** Msg metada Getters **/
/** Msg metada Getters **/
@[if version.parse(fastrtps_version) <= version.parse('1.7.2') or not ros2_distro]@
template <class T>
inline uint64_t getMsgTimestamp(const T* msg) { return msg->timestamp_(); }
template <class T>
inline uint64_t getMsgTimestamp(const T *msg) { return msg->timestamp_(); }
template<typename T>
inline typename std::enable_if<hasTimestampSample<T>::value, uint64_t>::type
getMsgTimestampSample_impl(const T* msg) { return msg->timestamp_sample_(); }
template<typename T>
inline typename std::enable_if<hasTimestampSample<T>::value, uint64_t>::type
getMsgTimestampSample_impl(const T *msg) { return msg->timestamp_sample_(); }
template <class T>
inline uint8_t getMsgSysID(const T* msg) { return msg->sys_id_(); }
template <class T>
inline uint8_t getMsgSysID(const T *msg) { return msg->sys_id_(); }
template <class T>
inline uint8_t getMsgSeq(const T* msg) { return msg->seq_(); }
template <class T>
inline uint8_t getMsgSeq(const T *msg) { return msg->seq_(); }
@[elif ros2_distro]@
template <class T>
inline uint64_t getMsgTimestamp(const T* msg) { return msg->timestamp(); }
template <class T>
inline uint64_t getMsgTimestamp(const T *msg) { return msg->timestamp(); }
template<typename T>
inline typename std::enable_if<hasTimestampSample<T>::value, uint64_t>::type
getMsgTimestampSample_impl(const T* msg) { return msg->timestamp_sample(); }
template<typename T>
inline typename std::enable_if<hasTimestampSample<T>::value, uint64_t>::type
getMsgTimestampSample_impl(const T *msg) { return msg->timestamp_sample(); }
template <class T>
inline uint8_t getMsgSysID(const T* msg) { return msg->sys_id(); }
template <class T>
inline uint8_t getMsgSysID(const T *msg) { return msg->sys_id(); }
template <class T>
inline uint8_t getMsgSeq(const T* msg) { return msg->seq(); }
template <class T>
inline uint8_t getMsgSeq(const T *msg) { return msg->seq(); }
@[end if]@
template <class T>
inline uint64_t getMsgTimestampSample(const T* msg) { return getMsgTimestampSample_impl(msg); }
template <class T>
inline uint64_t getMsgTimestampSample(const T *msg) { return getMsgTimestampSample_impl(msg); }
template<typename T>
inline typename std::enable_if<!hasTimestampSample<T>::value, void>::type
setMsgTimestampSample_impl(T*, const uint64_t&) {}
template<typename T>
inline typename std::enable_if <!hasTimestampSample<T>::value, void>::type
setMsgTimestampSample_impl(T *, const uint64_t &) {}
/** Msg metadata Setters **/
/** Msg metadata Setters **/
@[if version.parse(fastrtps_version) <= version.parse('1.7.2') or not ros2_distro]@
template <class T>
inline void setMsgTimestamp(T* msg, const uint64_t& timestamp) { msg->timestamp_() = timestamp; }
template <class T>
inline void setMsgTimestamp(T *msg, const uint64_t &timestamp) { msg->timestamp_() = timestamp; }
template <class T>
inline typename std::enable_if<hasTimestampSample<T>::value, void>::type
setMsgTimestampSample_impl(T* msg, const uint64_t& timestamp_sample) { msg->timestamp_sample_() = timestamp_sample; }
template <class T>
inline typename std::enable_if<hasTimestampSample<T>::value, void>::type
setMsgTimestampSample_impl(T *msg, const uint64_t &timestamp_sample) { msg->timestamp_sample_() = timestamp_sample; }
template <class T>
inline void setMsgSysID(T* msg, const uint8_t& sys_id) { msg->sys_id_() = sys_id; }
template <class T>
inline void setMsgSysID(T *msg, const uint8_t &sys_id) { msg->sys_id_() = sys_id; }
template <class T>
inline void setMsgSeq(T* msg, const uint8_t& seq) { msg->seq_() = seq; }
template <class T>
inline void setMsgSeq(T *msg, const uint8_t &seq) { msg->seq_() = seq; }
@[elif ros2_distro]@
template <class T>
inline void setMsgTimestamp(T* msg, const uint64_t& timestamp) { msg->timestamp() = timestamp; }
template <class T>
inline void setMsgTimestamp(T *msg, const uint64_t &timestamp) { msg->timestamp() = timestamp; }
template <class T>
inline typename std::enable_if<hasTimestampSample<T>::value, void>::type
setMsgTimestampSample_impl(T* msg, const uint64_t& timestamp_sample) { msg->timestamp_sample() = timestamp_sample; }
template <class T>
inline typename std::enable_if<hasTimestampSample<T>::value, void>::type
setMsgTimestampSample_impl(T *msg, const uint64_t &timestamp_sample) { msg->timestamp_sample() = timestamp_sample; }
template <class T>
inline void setMsgSysID(T* msg, const uint8_t& sys_id) { msg->sys_id() = sys_id; }
template <class T>
inline void setMsgSysID(T *msg, const uint8_t &sys_id) { msg->sys_id() = sys_id; }
template <class T>
inline void setMsgSeq(T* msg, const uint8_t& seq) { msg->seq() = seq; }
template <class T>
inline void setMsgSeq(T *msg, const uint8_t &seq) { msg->seq() = seq; }
@[end if]@
template <class T>
inline void setMsgTimestampSample(T* msg, const uint64_t& timestamp_sample) { setMsgTimestampSample_impl(msg, timestamp_sample); }
template <class T>
inline void setMsgTimestampSample(T *msg, const uint64_t &timestamp_sample) { setMsgTimestampSample_impl(msg, timestamp_sample); }
/**
* @@brief Timesync object ptr.
* This object is used to compuyte and apply the time offsets to the
* messages timestamps.
*/
std::shared_ptr<TimeSync> _timesync;
/**
* @@brief Timesync object ptr.
* This object is used to compuyte and apply the time offsets to the
* messages timestamps.
*/
std::shared_ptr<TimeSync> _timesync;
};
+122 -112
View File
@@ -73,172 +73,182 @@ except AttributeError:
#include "@(topic)_Subscriber.h"
@(topic)_Subscriber::@(topic)_Subscriber()
: mp_participant(nullptr),
mp_subscriber(nullptr)
: mp_participant(nullptr),
mp_subscriber(nullptr)
{ }
@(topic)_Subscriber::~@(topic)_Subscriber()
{
Domain::removeParticipant(mp_participant);
Domain::removeParticipant(mp_participant);
}
bool @(topic)_Subscriber::init(uint8_t topic_ID, std::condition_variable* t_send_queue_cv, std::mutex* t_send_queue_mutex, std::queue<uint8_t>* t_send_queue, const std::string& ns)
bool @(topic)_Subscriber::init(uint8_t topic_ID, std::condition_variable *t_send_queue_cv,
std::mutex *t_send_queue_mutex, std::queue<uint8_t> *t_send_queue, const std::string &ns)
{
m_listener.topic_ID = topic_ID;
m_listener.t_send_queue_cv = t_send_queue_cv;
m_listener.t_send_queue_mutex = t_send_queue_mutex;
m_listener.t_send_queue = t_send_queue;
m_listener.topic_ID = topic_ID;
m_listener.t_send_queue_cv = t_send_queue_cv;
m_listener.t_send_queue_mutex = t_send_queue_mutex;
m_listener.t_send_queue = t_send_queue;
// Create RTPSParticipant
ParticipantAttributes PParam;
// Create RTPSParticipant
ParticipantAttributes PParam;
@[if version.parse(fastrtps_version) < version.parse('2.0')]@
PParam.rtps.builtin.domainId = 0;
PParam.rtps.builtin.domainId = 0;
@[else]@
PParam.domainId = 0;
PParam.domainId = 0;
@[end if]@
@[if version.parse(fastrtps_version) <= version.parse('1.8.4')]@
PParam.rtps.builtin.leaseDuration = c_TimeInfinite;
PParam.rtps.builtin.leaseDuration = c_TimeInfinite;
@[else]@
PParam.rtps.builtin.discovery_config.leaseDuration = c_TimeInfinite;
PParam.rtps.builtin.discovery_config.leaseDuration = c_TimeInfinite;
@[end if]@
std::string nodeName = ns;
nodeName.append("@(topic)_subscriber");
PParam.rtps.setName(nodeName.c_str());
std::string nodeName = ns;
nodeName.append("@(topic)_subscriber");
PParam.rtps.setName(nodeName.c_str());
// Check if ROS_LOCALHOST_ONLY is set. This means that one wants to use
// only the localhost network for data sharing
const char* localhost_only = std::getenv("ROS_LOCALHOST_ONLY");
if (localhost_only && strcmp(localhost_only, "1") == 0) {
// Create a custom network transport descriptor to whitelist the localhost
auto localhostDescriptor = std::make_shared<UDPv4TransportDescriptor>();
localhostDescriptor->interfaceWhiteList.emplace_back("127.0.0.1");
// Check if ROS_LOCALHOST_ONLY is set. This means that one wants to use
// only the localhost network for data sharing
const char *localhost_only = std::getenv("ROS_LOCALHOST_ONLY");
// Disable the built-in Transport Layer
PParam.rtps.useBuiltinTransports = false;
if (localhost_only && strcmp(localhost_only, "1") == 0) {
// Create a custom network transport descriptor to whitelist the localhost
auto localhostDescriptor = std::make_shared<UDPv4TransportDescriptor>();
localhostDescriptor->interfaceWhiteList.emplace_back("127.0.0.1");
// Add the descriptor as a custom user transport
PParam.rtps.userTransports.push_back(localhostDescriptor);
}
// Disable the built-in Transport Layer
PParam.rtps.useBuiltinTransports = false;
mp_participant = Domain::createParticipant(PParam);
if(mp_participant == nullptr)
return false;
// Add the descriptor as a custom user transport
PParam.rtps.userTransports.push_back(localhostDescriptor);
}
//Register the type
Domain::registerType(mp_participant, static_cast<TopicDataType*>(&@(topic)DataType));
mp_participant = Domain::createParticipant(PParam);
// Create Subscriber
SubscriberAttributes Rparam;
Rparam.topic.topicKind = NO_KEY;
Rparam.topic.topicDataType = @(topic)DataType.getName();
if (mp_participant == nullptr) {
return false;
}
// Register the type
Domain::registerType(mp_participant, static_cast<TopicDataType *>(&@(topic)DataType));
// Create Subscriber
SubscriberAttributes Rparam;
Rparam.topic.topicKind = NO_KEY;
Rparam.topic.topicDataType = @(topic)DataType.getName();
@[if ros2_distro]@
@[ if ros2_distro == "ardent"]@
Rparam.qos.m_partition.push_back("rt");
std::string topicName = ns;
topicName.append("@(topic)_PubSubTopic");
Rparam.topic.topicName = topicName;
Rparam.qos.m_partition.push_back("rt");
std::string topicName = ns;
topicName.append("@(topic)_PubSubTopic");
Rparam.topic.topicName = topicName;
@[ else]@
std::string topicName = "rt/";
topicName.append(ns);
topicName.append("@(topic)_PubSubTopic");
Rparam.topic.topicName = topicName;
std::string topicName = "rt/";
topicName.append(ns);
topicName.append("@(topic)_PubSubTopic");
Rparam.topic.topicName = topicName;
@[ end if]@
@[else]@
std::string topicName = ns;
topicName.append("@(topic)PubSubTopic");
Rparam.topic.topicName = topicName;
std::string topicName = ns;
topicName.append("@(topic)PubSubTopic");
Rparam.topic.topicName = topicName;
@[end if]@
mp_subscriber = Domain::createSubscriber(mp_participant, Rparam, static_cast<SubscriberListener*>(&m_listener));
if(mp_subscriber == nullptr)
return false;
return true;
mp_subscriber = Domain::createSubscriber(mp_participant, Rparam, static_cast<SubscriberListener *>(&m_listener));
if (mp_subscriber == nullptr) {
return false;
}
return true;
}
void @(topic)_Subscriber::SubListener::onSubscriptionMatched(Subscriber* sub, MatchingInfo& info)
void @(topic)_Subscriber::SubListener::onSubscriptionMatched(Subscriber *sub, MatchingInfo &info)
{
@# Since the time sync runs on the bridge itself, it is required that there is a
@# match between two topics of the same entity
@[if topic != 'Timesync' and topic != 'timesync']@
// The first 6 values of the ID guidPrefix of an entity in a DDS-RTPS Domain
// are the same for all its subcomponents (publishers, subscribers)
bool is_different_endpoint = false;
for (size_t i = 0; i < 6; i++) {
if (sub->getGuid().guidPrefix.value[i] != info.remoteEndpointGuid.guidPrefix.value[i]) {
is_different_endpoint = true;
break;
}
}
// The first 6 values of the ID guidPrefix of an entity in a DDS-RTPS Domain
// are the same for all its subcomponents (publishers, subscribers)
bool is_different_endpoint = false;
for (size_t i = 0; i < 6; i++) {
if (sub->getGuid().guidPrefix.value[i] != info.remoteEndpointGuid.guidPrefix.value[i]) {
is_different_endpoint = true;
break;
}
}
// If the matching happens for the same entity, do not make a match
if (is_different_endpoint) {
if (info.status == MATCHED_MATCHING) {
n_matched++;
std::cout << "\033[0;37m[ micrortps_agent ]\t@(topic) subscriber matched\033[0m" << std::endl;
} else {
n_matched--;
std::cout << "\033[0;37m[ micrortps_agent ]\t@(topic) subscriber unmatched\033[0m" << std::endl;
}
}
// If the matching happens for the same entity, do not make a match
if (is_different_endpoint) {
if (info.status == MATCHED_MATCHING) {
n_matched++;
std::cout << "\033[0;37m[ micrortps_agent ]\t@(topic) subscriber matched\033[0m" << std::endl;
} else {
n_matched--;
std::cout << "\033[0;37m[ micrortps_agent ]\t@(topic) subscriber unmatched\033[0m" << std::endl;
}
}
@[else]@
(void)sub;
(void)sub;
if (info.status == MATCHED_MATCHING) {
n_matched++;
} else {
n_matched--;
}
if (info.status == MATCHED_MATCHING) {
n_matched++;
} else {
n_matched--;
}
@[end if]@
}
void @(topic)_Subscriber::SubListener::onNewDataMessage(Subscriber* sub)
void @(topic)_Subscriber::SubListener::onNewDataMessage(Subscriber *sub)
{
if (n_matched > 0) {
std::unique_lock<std::mutex> has_msg_lock(has_msg_mutex);
if(has_msg.load() == true) // Check if msg has been fetched
{
has_msg_cv.wait(has_msg_lock); // Wait till msg has been fetched
}
has_msg_lock.unlock();
if (n_matched > 0) {
std::unique_lock<std::mutex> has_msg_lock(has_msg_mutex);
// Take data
if(sub->takeNextData(&msg, &m_info))
{
if(m_info.sampleKind == ALIVE)
{
std::unique_lock<std::mutex> lk(*t_send_queue_mutex);
if (has_msg.load() == true) { // Check if msg has been fetched
has_msg_cv.wait(has_msg_lock); // Wait till msg has been fetched
}
++n_msg;
has_msg = true;
has_msg_lock.unlock();
t_send_queue->push(topic_ID);
lk.unlock();
t_send_queue_cv->notify_one();
// Take data
if (sub->takeNextData(&msg, &m_info)) {
if (m_info.sampleKind == ALIVE) {
std::unique_lock<std::mutex> lk(*t_send_queue_mutex);
}
}
}
++n_msg;
has_msg = true;
t_send_queue->push(topic_ID);
lk.unlock();
t_send_queue_cv->notify_one();
}
}
}
}
bool @(topic)_Subscriber::hasMsg()
{
if (m_listener.n_matched > 0) {
return m_listener.has_msg.load();
}
if (m_listener.n_matched > 0) {
return m_listener.has_msg.load();
}
return false;
return false;
}
@(topic)_msg_t @(topic)_Subscriber::getMsg()
{
return m_listener.msg;
return m_listener.msg;
}
void @(topic)_Subscriber::unlockMsg()
{
if (m_listener.n_matched > 0) {
std::unique_lock<std::mutex> has_msg_lock(m_listener.has_msg_mutex);
m_listener.has_msg = false;
has_msg_lock.unlock();
m_listener.has_msg_cv.notify_one();
}
if (m_listener.n_matched > 0) {
std::unique_lock<std::mutex> has_msg_lock(m_listener.has_msg_mutex);
m_listener.has_msg = false;
has_msg_lock.unlock();
m_listener.has_msg_cv.notify_one();
}
}
+31 -30
View File
@@ -25,7 +25,7 @@ except AttributeError:
/****************************************************************************
*
* Copyright 2017 Proyectos y Sistemas de Mantenimiento SL (eProsima).
* Copyright (c) 2018-2019 PX4 Development Team. All rights reserved.
* Copyright (c) 2018-2021 PX4 Development Team. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
@@ -103,39 +103,40 @@ using @(topic)_msg_datatype = @(topic)PubSubType;
class @(topic)_Subscriber
{
public:
@(topic)_Subscriber();
virtual ~@(topic)_Subscriber();
bool init(uint8_t topic_ID, std::condition_variable* t_send_queue_cv, std::mutex* t_send_queue_mutex, std::queue<uint8_t>* t_send_queue, const std::string& ns);
void run();
bool hasMsg();
@(topic)_msg_t getMsg();
void unlockMsg();
@(topic)_Subscriber();
virtual ~@(topic)_Subscriber();
bool init(uint8_t topic_ID, std::condition_variable *t_send_queue_cv, std::mutex *t_send_queue_mutex,
std::queue<uint8_t> *t_send_queue, const std::string &ns);
void run();
bool hasMsg();
@(topic)_msg_t getMsg();
void unlockMsg();
private:
Participant *mp_participant;
Subscriber *mp_subscriber;
Participant *mp_participant;
Subscriber *mp_subscriber;
class SubListener : public SubscriberListener
{
public:
SubListener() : n_matched(0), n_msg(0), has_msg(false){};
~SubListener(){};
void onSubscriptionMatched(Subscriber* sub, MatchingInfo& info);
void onNewDataMessage(Subscriber* sub);
SampleInfo_t m_info;
int n_matched;
int n_msg;
@(topic)_msg_t msg;
std::atomic_bool has_msg;
uint8_t topic_ID;
std::condition_variable* t_send_queue_cv;
std::mutex* t_send_queue_mutex;
std::queue<uint8_t>* t_send_queue;
std::condition_variable has_msg_cv;
std::mutex has_msg_mutex;
class SubListener : public SubscriberListener
{
public:
SubListener() : n_matched(0), n_msg(0), has_msg(false) {};
~SubListener() {};
void onSubscriptionMatched(Subscriber *sub, MatchingInfo &info);
void onNewDataMessage(Subscriber *sub);
SampleInfo_t m_info;
int n_matched;
int n_msg;
@(topic)_msg_t msg;
std::atomic_bool has_msg;
uint8_t topic_ID;
std::condition_variable *t_send_queue_cv;
std::mutex *t_send_queue_mutex;
std::queue<uint8_t> *t_send_queue;
std::condition_variable has_msg_cv;
std::mutex has_msg_mutex;
} m_listener;
@(topic)_msg_datatype @(topic)DataType;
} m_listener;
@(topic)_msg_datatype @(topic)DataType;
};
#endif // _@(topic)__SUBSCRIBER_H_
+188 -181
View File
@@ -91,89 +91,98 @@ RtpsTopics topics;
uint32_t total_sent = 0, sent = 0;
struct options {
enum class eTransports
{
UART,
UDP
};
eTransports transport = options::eTransports::UART;
char device[64] = DEVICE;
int sleep_us = SLEEP_US;
uint32_t baudrate = BAUDRATE;
int poll_ms = POLL_MS;
uint16_t recv_port = DEFAULT_RECV_PORT;
uint16_t send_port = DEFAULT_SEND_PORT;
char ip[16] = DEFAULT_IP;
bool sw_flow_control = false;
bool hw_flow_control = false;
bool verbose_debug = false;
std::string ns = "";
enum class eTransports {
UART,
UDP
};
eTransports transport = options::eTransports::UART;
char device[64] = DEVICE;
int sleep_us = SLEEP_US;
uint32_t baudrate = BAUDRATE;
int poll_ms = POLL_MS;
uint16_t recv_port = DEFAULT_RECV_PORT;
uint16_t send_port = DEFAULT_SEND_PORT;
char ip[16] = DEFAULT_IP;
bool sw_flow_control = false;
bool hw_flow_control = false;
bool verbose_debug = false;
std::string ns = "";
} _options;
static void usage(const char *name)
{
printf("usage: %s [options]\n\n"
" -b <baudrate> UART device baudrate. Default 460800\n"
" -d <device> UART device. Default /dev/ttyACM0\n"
" -f <sw flow control> Activates UART link SW flow control\n"
" -h <hw flow control> Activates UART link HW flow control\n"
" -i <ip_address> Target IP for UDP. Default 127.0.0.1\n"
" -n <namespace> ROS 2 topics namespace. Identifies the vehicle in a multi-agent network\n"
" -p <poll_ms> Time in ms to poll over UART. Default 1ms\n"
" -r <reception port> UDP port for receiving. Default 2020\n"
" -s <sending port> UDP port for sending. Default 2019\n"
" -t <transport> [UART|UDP] Default UART\n"
" -v <debug verbosity> Add more verbosity\n"
" -w <sleep_time_us> Time in us for which each iteration sleep. Default 1ms\n",
name);
printf("usage: %s [options]\n\n"
" -b <baudrate> UART device baudrate. Default 460800\n"
" -d <device> UART device. Default /dev/ttyACM0\n"
" -f <sw flow control> Activates UART link SW flow control\n"
" -h <hw flow control> Activates UART link HW flow control\n"
" -i <ip_address> Target IP for UDP. Default 127.0.0.1\n"
" -n <namespace> ROS 2 topics namespace. Identifies the vehicle in a multi-agent network\n"
" -p <poll_ms> Time in ms to poll over UART. Default 1ms\n"
" -r <reception port> UDP port for receiving. Default 2020\n"
" -s <sending port> UDP port for sending. Default 2019\n"
" -t <transport> [UART|UDP] Default UART\n"
" -v <debug verbosity> Add more verbosity\n"
" -w <sleep_time_us> Time in us for which each iteration sleep. Default 1ms\n",
name);
}
static int parse_options(int argc, char **argv)
{
int ch;
int ch;
while ((ch = getopt(argc, argv, "t:d:w:b:p:r:s:i:fhvn:")) != EOF)
{
switch (ch)
{
case 't': _options.transport = strcmp(optarg, "UDP") == 0?
options::eTransports::UDP
:options::eTransports::UART; break;
case 'd': if (nullptr != optarg) strcpy(_options.device, optarg); break;
case 'w': _options.sleep_us = strtol(optarg, nullptr, 10); break;
case 'b': _options.baudrate = strtoul(optarg, nullptr, 10); break;
case 'p': _options.poll_ms = strtol(optarg, nullptr, 10); break;
case 'r': _options.recv_port = strtoul(optarg, nullptr, 10); break;
case 's': _options.send_port = strtoul(optarg, nullptr, 10); break;
case 'i': if (nullptr != optarg) strcpy(_options.ip, optarg); break;
case 'f': _options.sw_flow_control = true; break;
case 'h': _options.hw_flow_control = true; break;
case 'v': _options.verbose_debug = true; break;
case 'n': if (nullptr != optarg) _options.ns = std::string(optarg) + "/"; break;
default:
usage(argv[0]);
return -1;
}
}
while ((ch = getopt(argc, argv, "t:d:w:b:p:r:s:i:fhvn:")) != EOF) {
switch (ch) {
case 't': _options.transport = strcmp(optarg, "UDP") == 0 ?
options::eTransports::UDP
: options::eTransports::UART; break;
if (_options.poll_ms < 1) {
_options.poll_ms = 1;
printf("\033[1;33m[ micrortps_agent ]\tPoll timeout too low, using 1 ms\033[0m");
}
case 'd': if (nullptr != optarg) strcpy(_options.device, optarg); break;
if (_options.hw_flow_control && _options.sw_flow_control) {
printf("\033[0;31m[ micrortps_agent ]\tHW and SW flow control set. Please set only one or another\033[0m");
return -1;
}
case 'w': _options.sleep_us = strtol(optarg, nullptr, 10); break;
return 0;
case 'b': _options.baudrate = strtoul(optarg, nullptr, 10); break;
case 'p': _options.poll_ms = strtol(optarg, nullptr, 10); break;
case 'r': _options.recv_port = strtoul(optarg, nullptr, 10); break;
case 's': _options.send_port = strtoul(optarg, nullptr, 10); break;
case 'i': if (nullptr != optarg) strcpy(_options.ip, optarg); break;
case 'f': _options.sw_flow_control = true; break;
case 'h': _options.hw_flow_control = true; break;
case 'v': _options.verbose_debug = true; break;
case 'n': if (nullptr != optarg) _options.ns = std::string(optarg) + "/"; break;
default:
usage(argv[0]);
return -1;
}
}
if (_options.poll_ms < 1) {
_options.poll_ms = 1;
printf("\033[1;33m[ micrortps_agent ]\tPoll timeout too low, using 1 ms\033[0m");
}
if (_options.hw_flow_control && _options.sw_flow_control) {
printf("\033[0;31m[ micrortps_agent ]\tHW and SW flow control set. Please set only one or another\033[0m");
return -1;
}
return 0;
}
void signal_handler(int signum)
{
printf("\033[1;33m[ micrortps_agent ]\tInterrupt signal (%d) received.\033[0m\n", signum);
running = 0;
transport_node->close();
printf("\033[1;33m[ micrortps_agent ]\tInterrupt signal (%d) received.\033[0m\n", signum);
running = 0;
transport_node->close();
}
@[if recv_topics]@
@@ -182,152 +191,150 @@ std::condition_variable t_send_queue_cv;
std::mutex t_send_queue_mutex;
std::queue<uint8_t> t_send_queue;
void t_send(void*)
void t_send(void *)
{
char data_buffer[BUFFER_SIZE] = {};
uint32_t length = 0;
char data_buffer[BUFFER_SIZE] = {};
uint32_t length = 0;
while (running && !exit_sender_thread.load())
{
std::unique_lock<std::mutex> lk(t_send_queue_mutex);
while (t_send_queue.empty() && !exit_sender_thread.load())
{
t_send_queue_cv.wait(lk);
}
uint8_t topic_ID = t_send_queue.front();
t_send_queue.pop();
lk.unlock();
while (running && !exit_sender_thread.load()) {
std::unique_lock<std::mutex> lk(t_send_queue_mutex);
size_t header_length = transport_node->get_header_length();
/* make room for the header to fill in later */
eprosima::fastcdr::FastBuffer cdrbuffer(&data_buffer[header_length], sizeof(data_buffer)-header_length);
eprosima::fastcdr::Cdr scdr(cdrbuffer);
while (t_send_queue.empty() && !exit_sender_thread.load()) {
t_send_queue_cv.wait(lk);
}
if (topics.getMsg(topic_ID, scdr))
{
length = scdr.getSerializedDataLength();
if (0 < (length = transport_node->write(topic_ID, data_buffer, length)))
{
total_sent += length;
++sent;
}
}
}
uint8_t topic_ID = t_send_queue.front();
t_send_queue.pop();
lk.unlock();
size_t header_length = transport_node->get_header_length();
/* make room for the header to fill in later */
eprosima::fastcdr::FastBuffer cdrbuffer(&data_buffer[header_length], sizeof(data_buffer) - header_length);
eprosima::fastcdr::Cdr scdr(cdrbuffer);
if (topics.getMsg(topic_ID, scdr)) {
length = scdr.getSerializedDataLength();
if (0 < (length = transport_node->write(topic_ID, data_buffer, length))) {
total_sent += length;
++sent;
}
}
}
}
@[end if]@
int main(int argc, char** argv)
int main(int argc, char **argv)
{
if (-1 == parse_options(argc, argv))
{
printf("\033[1;33m[ micrortps_agent ]\tEXITING...\033[0m\n");
return -1;
}
if (-1 == parse_options(argc, argv)) {
printf("\033[1;33m[ micrortps_agent ]\tEXITING...\033[0m\n");
return -1;
}
// register signal SIGINT and signal handler
signal(SIGINT, signal_handler);
// register signal SIGINT and signal handler
signal(SIGINT, signal_handler);
printf("\033[0;37m--- MicroRTPS Agent ---\033[0m\n");
printf("[ micrortps_agent ]\tStarting link...\n");
printf("\033[0;37m--- MicroRTPS Agent ---\033[0m\n");
printf("[ micrortps_agent ]\tStarting link...\n");
const char* localhost_only = std::getenv("ROS_LOCALHOST_ONLY");
if (localhost_only && strcmp(localhost_only, "1") == 0) {
printf("[ micrortps_agent ]\tUsing only the localhost network for data sharing...\n");
}
const char *localhost_only = std::getenv("ROS_LOCALHOST_ONLY");
switch (_options.transport)
{
case options::eTransports::UART:
{
transport_node = new UART_node(_options.device, _options.baudrate, _options.poll_ms,
_options.sw_flow_control, _options.hw_flow_control, _options.verbose_debug);
printf("[ micrortps_agent ]\tUART transport: device: %s; baudrate: %d; sleep: %dus; poll: %dms; flow_control: %s\n",
_options.device, _options.baudrate, _options.sleep_us, _options.poll_ms,
_options.sw_flow_control ? "SW enabled" : (_options.hw_flow_control ? "HW enabled" : "No"));
}
break;
case options::eTransports::UDP:
{
transport_node = new UDP_node(_options.ip, _options.recv_port, _options.send_port, _options.verbose_debug);
printf("[ micrortps_agent ]\tUDP transport: ip address: %s; recv port: %u; send port: %u; sleep: %dus\n",
_options.ip, _options.recv_port, _options.send_port, _options.sleep_us);
}
break;
default:
printf("\033[0;37m[ micrortps_agent ]\tEXITING...\033[0m\n");
return -1;
}
if (localhost_only && strcmp(localhost_only, "1") == 0) {
printf("[ micrortps_agent ]\tUsing only the localhost network for data sharing...\n");
}
if (0 > transport_node->init())
{
printf("\033[0;37m[ micrortps_agent ]\tEXITING...\033[0m\n");
return -1;
}
switch (_options.transport) {
case options::eTransports::UART: {
transport_node = new UART_node(_options.device, _options.baudrate, _options.poll_ms,
_options.sw_flow_control, _options.hw_flow_control, _options.verbose_debug);
printf("[ micrortps_agent ]\tUART transport: device: %s; baudrate: %d; sleep: %dus; poll: %dms; flow_control: %s\n",
_options.device, _options.baudrate, _options.sleep_us, _options.poll_ms,
_options.sw_flow_control ? "SW enabled" : (_options.hw_flow_control ? "HW enabled" : "No"));
}
break;
sleep(1);
case options::eTransports::UDP: {
transport_node = new UDP_node(_options.ip, _options.recv_port, _options.send_port, _options.verbose_debug);
printf("[ micrortps_agent ]\tUDP transport: ip address: %s; recv port: %u; send port: %u; sleep: %dus\n",
_options.ip, _options.recv_port, _options.send_port, _options.sleep_us);
}
break;
default:
printf("\033[0;37m[ micrortps_agent ]\tEXITING...\033[0m\n");
return -1;
}
if (0 > transport_node->init()) {
printf("\033[0;37m[ micrortps_agent ]\tEXITING...\033[0m\n");
return -1;
}
sleep(1);
@[if send_topics]@
char data_buffer[BUFFER_SIZE] = {};
int received = 0, loop = 0;
int length = 0, total_read = 0;
bool receiving = false;
uint8_t topic_ID = 255;
std::chrono::time_point<std::chrono::steady_clock> start, end;
char data_buffer[BUFFER_SIZE] = {};
int received = 0, loop = 0;
int length = 0, total_read = 0;
bool receiving = false;
uint8_t topic_ID = 255;
std::chrono::time_point<std::chrono::steady_clock> start, end;
@[end if]@
// Init timesync
std::shared_ptr<TimeSync> timeSync = std::make_shared<TimeSync>(_options.verbose_debug);
// Init timesync
std::shared_ptr<TimeSync> timeSync = std::make_shared<TimeSync>(_options.verbose_debug);
topics.set_timesync(timeSync);
topics.set_timesync(timeSync);
@[if recv_topics]@
topics.init(&t_send_queue_cv, &t_send_queue_mutex, &t_send_queue, _options.ns);
topics.init(&t_send_queue_cv, &t_send_queue_mutex, &t_send_queue, _options.ns);
@[end if]@
running = true;
running = true;
@[if recv_topics]@
std::thread sender_thread(t_send, nullptr);
std::thread sender_thread(t_send, nullptr);
@[end if]@
while (running)
{
while (running) {
@[if send_topics]@
++loop;
if (!receiving) start = std::chrono::steady_clock::now();
// Publish messages received from UART
while (0 < (length = transport_node->read(&topic_ID, data_buffer, BUFFER_SIZE)))
{
topics.publish(topic_ID, data_buffer, sizeof(data_buffer));
++received;
total_read += length;
receiving = true;
end = std::chrono::steady_clock::now();
}
++loop;
if (!receiving) { start = std::chrono::steady_clock::now(); }
// Publish messages received from UART
while (0 < (length = transport_node->read(&topic_ID, data_buffer, BUFFER_SIZE))) {
topics.publish(topic_ID, data_buffer, sizeof(data_buffer));
++received;
total_read += length;
receiving = true;
end = std::chrono::steady_clock::now();
}
if ((receiving && std::chrono::duration<double>(std::chrono::steady_clock::now() - end).count() > WAIT_CNST) ||
(!running && loop > 1)) {
std::chrono::duration<double> elapsed_secs = end - start;
printf("[ micrortps_agent ]\tSENT: %lumessages \t- %lubytes\n", (unsigned long)sent, (unsigned long)total_sent);
printf("[ micrortps_agent ]\tRECEIVED: %dmessages \t- %dbytes; %d LOOPS - %.03f seconds - %.02fKB/s\n",
received, total_read, loop, elapsed_secs.count(), (double)total_read / (1000 * elapsed_secs.count()));
received = sent = total_read = total_sent = 0;
receiving = false;
}
if ((receiving && std::chrono::duration<double>(std::chrono::steady_clock::now() - end).count() > WAIT_CNST) ||
(!running && loop > 1))
{
std::chrono::duration<double> elapsed_secs = end - start;
printf("[ micrortps_agent ]\tSENT: %lumessages \t- %lubytes\n", (unsigned long)sent, (unsigned long)total_sent);
printf("[ micrortps_agent ]\tRECEIVED: %dmessages \t- %dbytes; %d LOOPS - %.03f seconds - %.02fKB/s\n",
received, total_read, loop, elapsed_secs.count(), (double)total_read/(1000*elapsed_secs.count()));
received = sent = total_read = total_sent = 0;
receiving = false;
}
@[else]@
usleep(_options.sleep_us);
usleep(_options.sleep_us);
@[end if]@
}
}
@[if recv_topics]@
exit_sender_thread = true;
t_send_queue_cv.notify_one();
sender_thread.join();
exit_sender_thread = true;
t_send_queue_cv.notify_one();
sender_thread.join();
@[end if]@
delete transport_node;
transport_node = nullptr;
delete transport_node;
transport_node = nullptr;
timeSync->stop();
timeSync->reset();
timeSync->stop();
timeSync->reset();
return 0;
return 0;
}
@@ -1,7 +1,7 @@
################################################################################
#
# Copyright 2017 Proyectos y Sistemas de Mantenimiento SL (eProsima).
# Copyright (c) 2018-2019 PX4 Development Team. All rights reserved.
# Copyright (c) 2018-2021 PX4 Development Team. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
@@ -42,12 +42,12 @@ find_package(fastcdr REQUIRED)
include(CheckCXXCompilerFlag)
if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_COMPILER_IS_CLANG OR
CMAKE_CXX_COMPILER_ID MATCHES "Clang")
check_cxx_compiler_flag(--std=c++14 SUPPORTS_CXX14)
if(SUPPORTS_CXX14)
add_compile_options(--std=c++14)
else()
message(FATAL_ERROR "Compiler doesn't support C++14")
endif()
check_cxx_compiler_flag(--std=c++14 SUPPORTS_CXX14)
if(SUPPORTS_CXX14)
add_compile_options(--std=c++14)
else()
message(FATAL_ERROR "Compiler doesn't support C++14")
endif()
endif()
file(GLOB MICRORTPS_AGENT_SOURCES src/*.cpp src/*.h)
+42 -27
View File
@@ -25,7 +25,7 @@ except AttributeError:
}@
/****************************************************************************
*
* Copyright (c) 2020 PX4 Development Team. All rights reserved.
* Copyright (c) 2020-2021 PX4 Development Team. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
@@ -67,18 +67,19 @@ except AttributeError:
#include "microRTPS_timesync.h"
TimeSync::TimeSync(bool debug)
: _offset_ns(-1),
_skew_ns_per_sync(0.0),
_num_samples(0),
_request_reset_counter(0),
_last_msg_seq(0),
_last_remote_msg_seq(0),
_debug(debug)
: _offset_ns(-1),
_skew_ns_per_sync(0.0),
_num_samples(0),
_request_reset_counter(0),
_last_msg_seq(0),
_last_remote_msg_seq(0),
_debug(debug)
{ }
TimeSync::~TimeSync() { stop(); }
void TimeSync::start(const TimesyncPublisher* pub) {
void TimeSync::start(const TimesyncPublisher *pub)
{
stop();
_timesync_pub = (*pub);
@@ -96,28 +97,35 @@ void TimeSync::start(const TimesyncPublisher* pub) {
_send_timesync_thread.reset(new std::thread(run));
}
void TimeSync::stop() {
void TimeSync::stop()
{
_request_stop = true;
if (_send_timesync_thread && _send_timesync_thread->joinable()) _send_timesync_thread->join();
if (_send_timesync_thread && _send_timesync_thread->joinable()) { _send_timesync_thread->join(); }
_send_timesync_thread.reset();
}
void TimeSync::reset() {
void TimeSync::reset()
{
_num_samples = 0;
_request_reset_counter = 0;
}
int64_t TimeSync::getTimeNSec() {
int64_t TimeSync::getTimeNSec()
{
auto time = std::chrono::steady_clock::now();
return std::chrono::time_point_cast<std::chrono::nanoseconds>(time).time_since_epoch().count();
}
int64_t TimeSync::getTimeUSec() {
int64_t TimeSync::getTimeUSec()
{
auto time = std::chrono::steady_clock::now();
return std::chrono::time_point_cast<std::chrono::microseconds>(time).time_since_epoch().count();
}
bool TimeSync::addMeasurement(int64_t local_t1_ns, int64_t remote_t2_ns, int64_t local_t3_ns) {
bool TimeSync::addMeasurement(int64_t local_t1_ns, int64_t remote_t2_ns, int64_t local_t3_ns)
{
int64_t rtti = local_t3_ns - local_t1_ns;
// assume rtti is evenly split both directions
@@ -127,19 +135,23 @@ bool TimeSync::addMeasurement(int64_t local_t1_ns, int64_t remote_t2_ns, int64_t
if (_request_reset_counter > REQUEST_RESET_COUNTER_THRESHOLD) {
reset();
if (_debug) std::cout << "\033[1;33m[ micrortps__timesync ]\tTimesync clock changed, resetting\033[0m" << std::endl;
if (_debug) { std::cout << "\033[1;33m[ micrortps__timesync ]\tTimesync clock changed, resetting\033[0m" << std::endl; }
}
if (_num_samples == 0) {
updateOffset(measurement_offset);
_skew_ns_per_sync = 0;
}
if (_num_samples == 0) {
updateOffset(measurement_offset);
_skew_ns_per_sync = 0;
}
if (_num_samples >= WINDOW_SIZE) {
if (std::abs(measurement_offset - _offset_ns.load()) > TRIGGER_RESET_THRESHOLD_NS) {
_request_reset_counter++;
if (_debug) std::cout << "\033[1;33m[ micrortps__timesync ]\tTimesync offset outlier, discarding\033[0m" << std::endl;
if (_debug) { std::cout << "\033[1;33m[ micrortps__timesync ]\tTimesync offset outlier, discarding\033[0m" << std::endl; }
return false;
} else {
_request_reset_counter = 0;
}
@@ -147,7 +159,8 @@ bool TimeSync::addMeasurement(int64_t local_t1_ns, int64_t remote_t2_ns, int64_t
// ignore if rtti > 50ms
if (rtti > 50ll * 1000ll * 1000ll) {
if (_debug) std::cout << "\033[1;33m[ micrortps__timesync ]\tRTTI too high for timesync: " << rtti / (1000ll * 1000ll) << "ms\033[0m" << std::endl;
if (_debug) { std::cout << "\033[1;33m[ micrortps__timesync ]\tRTTI too high for timesync: " << rtti / (1000ll * 1000ll) << "ms\033[0m" << std::endl; }
return false;
}
@@ -165,20 +178,21 @@ bool TimeSync::addMeasurement(int64_t local_t1_ns, int64_t remote_t2_ns, int64_t
updateOffset(static_cast<int64_t>((_skew_ns_per_sync + _offset_ns.load()) * (1. - alpha) +
measurement_offset * alpha));
_skew_ns_per_sync =
static_cast<int64_t>(beta * (_offset_ns.load() - offset_prev) + (1. - beta) * _skew_ns_per_sync);
static_cast<int64_t>(beta * (_offset_ns.load() - offset_prev) + (1. - beta) * _skew_ns_per_sync);
_num_samples++;
return true;
}
void TimeSync::processTimesyncMsg(timesync_msg_t * msg) {
void TimeSync::processTimesyncMsg(timesync_msg_t *msg)
{
if (getMsgSysID(msg) == 1 && getMsgSeq(msg) != _last_remote_msg_seq) {
_last_remote_msg_seq = getMsgSeq(msg);
_last_remote_msg_seq = getMsgSeq(msg);
if (getMsgTC1(msg) > 0) {
if (!addMeasurement(getMsgTS1(msg), getMsgTC1(msg), getTimeNSec())) {
if (_debug) std::cerr << "\033[1;33m[ micrortps__timesync ]\tOffset not updated\033[0m" << std::endl;
if (_debug) { std::cerr << "\033[1;33m[ micrortps__timesync ]\tOffset not updated\033[0m" << std::endl; }
}
} else if (getMsgTC1(msg) == 0) {
@@ -192,7 +206,8 @@ void TimeSync::processTimesyncMsg(timesync_msg_t * msg) {
}
}
timesync_msg_t TimeSync::newTimesyncMsg() {
timesync_msg_t TimeSync::newTimesyncMsg()
{
timesync_msg_t msg{};
setMsgTimestamp(&msg, getTimeUSec());
+29 -28
View File
@@ -26,7 +26,7 @@ except AttributeError:
}@
/****************************************************************************
*
* Copyright (c) 2020 PX4 Development Team. All rights reserved.
* Copyright (c) 2020-2021 PX4 Development Team. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
@@ -107,7 +107,8 @@ using TimesyncPublisher = Timesync_Publisher;
using TimesyncPublisher = timesync_Publisher;
@[end if]@
class TimeSync {
class TimeSync
{
public:
TimeSync(bool debug);
virtual ~TimeSync();
@@ -116,7 +117,7 @@ public:
* @@brief Starts the timesync publishing thread
* @@param[in] pub The timesync publisher entity to use
*/
void start(const TimesyncPublisher* pub);
void start(const TimesyncPublisher *pub);
/**
* @@brief Resets the filter
@@ -153,7 +154,7 @@ public:
* @@brief Processes DDS timesync message
* @@param[in,out] msg The timestamp msg to be processed
*/
void processTimesyncMsg(timesync_msg_t* msg);
void processTimesyncMsg(timesync_msg_t *msg);
/**
* @@brief Creates a new timesync DDS message to be sent from the agent to the client
@@ -171,13 +172,13 @@ public:
* @@brief Sums the time sync offset to the timestamp
* @@param[in,out] timestamp The timestamp to add the offset to
*/
inline void addOffset(uint64_t& timestamp) { timestamp = (timestamp * 1000LL + _offset_ns.load()) / 1000ULL; }
inline void addOffset(uint64_t &timestamp) { timestamp = (timestamp * 1000LL + _offset_ns.load()) / 1000ULL; }
/**
* @@brief Substracts the time sync offset to the timestamp
* @@param[in,out] timestamp The timestamp to subtract the offset of
*/
inline void subtractOffset(uint64_t& timestamp) { timestamp = (timestamp * 1000LL - _offset_ns.load()) / 1000ULL; }
inline void subtractOffset(uint64_t &timestamp) { timestamp = (timestamp * 1000LL - _offset_ns.load()) / 1000ULL; }
private:
std::atomic<int64_t> _offset_ns;
@@ -205,35 +206,35 @@ private:
* @@brief Updates the offset of the time sync filter
* @@param[in] offset The value of the offset to update to
*/
inline void updateOffset(const uint64_t& offset) { _offset_ns.store(offset, std::memory_order_relaxed); }
inline void updateOffset(const uint64_t &offset) { _offset_ns.store(offset, std::memory_order_relaxed); }
/** Timesync msg Getters **/
@[if version.parse(fastrtps_version) <= version.parse('1.7.2') or not ros2_distro]@
inline uint64_t getMsgTimestamp(const timesync_msg_t* msg) { return msg->timestamp_(); }
inline uint8_t getMsgSysID(const timesync_msg_t* msg) { return msg->sys_id_(); }
inline uint8_t getMsgSeq(const timesync_msg_t* msg) { return msg->seq_(); }
inline int64_t getMsgTC1(const timesync_msg_t* msg) { return msg->tc1_(); }
inline int64_t getMsgTS1(const timesync_msg_t* msg) { return msg->ts1_(); }
inline uint64_t getMsgTimestamp(const timesync_msg_t *msg) { return msg->timestamp_(); }
inline uint8_t getMsgSysID(const timesync_msg_t *msg) { return msg->sys_id_(); }
inline uint8_t getMsgSeq(const timesync_msg_t *msg) { return msg->seq_(); }
inline int64_t getMsgTC1(const timesync_msg_t *msg) { return msg->tc1_(); }
inline int64_t getMsgTS1(const timesync_msg_t *msg) { return msg->ts1_(); }
@[elif ros2_distro]@
inline uint64_t getMsgTimestamp(const timesync_msg_t* msg) { return msg->timestamp(); }
inline uint8_t getMsgSysID(const timesync_msg_t* msg) { return msg->sys_id(); }
inline uint8_t getMsgSeq(const timesync_msg_t* msg) { return msg->seq(); }
inline int64_t getMsgTC1(const timesync_msg_t* msg) { return msg->tc1(); }
inline int64_t getMsgTS1(const timesync_msg_t* msg) { return msg->ts1(); }
@[end if]@
inline uint64_t getMsgTimestamp(const timesync_msg_t *msg) { return msg->timestamp(); }
inline uint8_t getMsgSysID(const timesync_msg_t *msg) { return msg->sys_id(); }
inline uint8_t getMsgSeq(const timesync_msg_t *msg) { return msg->seq(); }
inline int64_t getMsgTC1(const timesync_msg_t *msg) { return msg->tc1(); }
inline int64_t getMsgTS1(const timesync_msg_t *msg) { return msg->ts1(); }
@[end if]@
/** Timesync msg Setters **/
@[if version.parse(fastrtps_version) <= version.parse('1.7.2') or not ros2_distro]@
inline void setMsgTimestamp(timesync_msg_t* msg, const uint64_t& timestamp) { msg->timestamp_() = timestamp; }
inline void setMsgSysID(timesync_msg_t* msg, const uint8_t& sys_id) { msg->sys_id_() = sys_id; }
inline void setMsgSeq(timesync_msg_t* msg, const uint8_t& seq) { msg->seq_() = seq; }
inline void setMsgTC1(timesync_msg_t* msg, const int64_t& tc1) { msg->tc1_() = tc1; }
inline void setMsgTS1(timesync_msg_t* msg, const int64_t& ts1) { msg->ts1_() = ts1; }
inline void setMsgTimestamp(timesync_msg_t *msg, const uint64_t &timestamp) { msg->timestamp_() = timestamp; }
inline void setMsgSysID(timesync_msg_t *msg, const uint8_t &sys_id) { msg->sys_id_() = sys_id; }
inline void setMsgSeq(timesync_msg_t *msg, const uint8_t &seq) { msg->seq_() = seq; }
inline void setMsgTC1(timesync_msg_t *msg, const int64_t &tc1) { msg->tc1_() = tc1; }
inline void setMsgTS1(timesync_msg_t *msg, const int64_t &ts1) { msg->ts1_() = ts1; }
@[elif ros2_distro]@
inline void setMsgTimestamp(timesync_msg_t* msg, const uint64_t& timestamp) { msg->timestamp() = timestamp; }
inline void setMsgSysID(timesync_msg_t* msg, const uint8_t& sys_id) { msg->sys_id() = sys_id; }
inline void setMsgSeq(timesync_msg_t* msg, const uint8_t& seq) { msg->seq() = seq; }
inline void setMsgTC1(timesync_msg_t* msg, const int64_t& tc1) { msg->tc1() = tc1; }
inline void setMsgTS1(timesync_msg_t* msg, const int64_t& ts1) { msg->ts1() = ts1; }
inline void setMsgTimestamp(timesync_msg_t *msg, const uint64_t &timestamp) { msg->timestamp() = timestamp; }
inline void setMsgSysID(timesync_msg_t *msg, const uint8_t &sys_id) { msg->sys_id() = sys_id; }
inline void setMsgSeq(timesync_msg_t *msg, const uint8_t &seq) { msg->seq() = seq; }
inline void setMsgTC1(timesync_msg_t *msg, const int64_t &tc1) { msg->tc1() = tc1; }
inline void setMsgTS1(timesync_msg_t *msg, const int64_t &ts1) { msg->ts1() = ts1; }
@[end if]@
};
+62 -30
View File
@@ -1,7 +1,7 @@
/****************************************************************************
*
* Copyright 2017 Proyectos y Sistemas de Mantenimiento SL (eProsima).
* Copyright (c) 2018-2019 PX4 Development Team. All rights reserved.
* Copyright (c) 2018-2021 PX4 Development Team. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
@@ -80,8 +80,8 @@ uint16_t const crc16_table[256] = {
};
Transport_node::Transport_node(const bool _debug):
rx_buff_pos(0),
debug(_debug)
rx_buff_pos(0),
debug(_debug)
{
}
@@ -120,9 +120,13 @@ ssize_t Transport_node::read(uint8_t *topic_ID, char out_buffer[], size_t buffer
if (errsv && EAGAIN != errsv && ETIMEDOUT != errsv) {
#ifndef PX4_DEBUG
if (debug) printf("\033[0;31m[ micrortps_transport ]\tRead fail %d\033[0m\n", errsv);
if (debug) { printf("\033[0;31m[ micrortps_transport ]\tRead fail %d\033[0m\n", errsv); }
#else
if (debug) PX4_DEBUG("Read fail %d", errsv);
if (debug) { PX4_DEBUG("Read fail %d", errsv); }
#endif /* PX4_DEBUG */
}
@@ -150,9 +154,13 @@ ssize_t Transport_node::read(uint8_t *topic_ID, char out_buffer[], size_t buffer
// Start not found
if (msg_start_pos > (rx_buff_pos - header_size)) {
#ifndef PX4_DEBUG
if (debug) printf("\033[1;33m[ micrortps_transport ]\t (↓↓ %" PRIu32 ")\033[0m\n", msg_start_pos);
if (debug) { printf("\033[1;33m[ micrortps_transport ]\t (↓↓ %" PRIu32 ")\033[0m\n", msg_start_pos); }
#else
if (debug) PX4_DEBUG(" (↓↓ %" PRIu32 ")", msg_start_pos);
if (debug) { PX4_DEBUG(" (↓↓ %" PRIu32 ")", msg_start_pos); }
#endif /* PX4_DEBUG */
// All we've checked so far is garbage, drop it - but save unchecked bytes
@@ -178,9 +186,13 @@ ssize_t Transport_node::read(uint8_t *topic_ID, char out_buffer[], size_t buffer
// If there's garbage at the beginning, drop it
if (msg_start_pos > 0) {
#ifndef PX4_DEBUG
if (debug) printf("\033[1;33m[ micrortps_transport ]\t (↓ %" PRIu32 ")\033[0m\n", msg_start_pos);
if (debug) { printf("\033[1;33m[ micrortps_transport ]\t (↓ %" PRIu32 ")\033[0m\n", msg_start_pos); }
#else
if (debug) PX4_DEBUG(" (↓ %" PRIu32 ")", msg_start_pos);
if (debug) { PX4_DEBUG(" (↓ %" PRIu32 ")", msg_start_pos); }
#endif /* PX4_DEBUG */
memmove(rx_buffer, rx_buffer + msg_start_pos, rx_buff_pos - msg_start_pos);
rx_buff_pos -= msg_start_pos;
@@ -194,9 +206,13 @@ ssize_t Transport_node::read(uint8_t *topic_ID, char out_buffer[], size_t buffer
if (read_crc != calc_crc) {
#ifndef PX4_DEBUG
if (debug) printf("\033[0;31m[ micrortps_transport ]\tBad CRC %" PRIu16 " != %" PRIu16 "\t\t(↓ %lu)\033[0m\n", read_crc, calc_crc, (unsigned long)(header_size + payload_len));
if (debug) { printf("\033[0;31m[ micrortps_transport ]\tBad CRC %" PRIu16 " != %" PRIu16 "\t\t(↓ %lu)\033[0m\n", read_crc, calc_crc, (unsigned long)(header_size + payload_len)); }
#else
if (debug) PX4_DEBUG("Bad CRC %u != %u\t\t(↓ %lu)", read_crc, calc_crc, (unsigned long)(header_size + payload_len));
if (debug) { PX4_DEBUG("Bad CRC %u != %u\t\t(↓ %lu)", read_crc, calc_crc, (unsigned long)(header_size + payload_len)); }
#endif /* PX4_DEBUG */
// Drop garbage up just beyond the start of the message
@@ -223,7 +239,7 @@ ssize_t Transport_node::read(uint8_t *topic_ID, char out_buffer[], size_t buffer
size_t Transport_node::get_header_length()
{
return sizeof(struct Header);
return sizeof(struct Header);
}
ssize_t Transport_node::write(const uint8_t topic_ID, char buffer[], size_t length)
@@ -248,15 +264,17 @@ ssize_t Transport_node::write(const uint8_t topic_ID, char buffer[], size_t leng
/* Fill in the header in the same payload buffer to call a single node_write */
memcpy(buffer, &header, sizeof(header));
ssize_t len = node_write(buffer, length + sizeof(header));
if (len != ssize_t(length + sizeof(header))) {
return len;
}
return len + sizeof(header);
}
UART_node::UART_node(const char *_uart_name, const uint32_t _baudrate,
const uint32_t _poll_ms, const bool _hw_flow_control,
const bool _sw_flow_control, const bool _debug):
const uint32_t _poll_ms, const bool _hw_flow_control,
const bool _sw_flow_control, const bool _debug):
Transport_node(_debug),
uart_fd(-1),
baudrate(_baudrate),
@@ -302,7 +320,8 @@ int UART_node::init()
if ((termios_state = tcgetattr(uart_fd, &uart_config)) < 0) {
int errno_bkp = errno;
#ifndef PX4_ERR
printf("\033[0;31m[ micrortps_transport ]\tUART transport: ERR GET CONF %s: %d (%d)\n\033[0m", uart_name, termios_state, errno);
printf("\033[0;31m[ micrortps_transport ]\tUART transport: ERR GET CONF %s: %d (%d)\n\033[0m", uart_name, termios_state,
errno);
#else
PX4_ERR("UART transport: ERR GET CONF %s: %d (%d)", uart_name, termios_state, errno);
#endif /* PX4_ERR */
@@ -326,6 +345,7 @@ int UART_node::init()
if (hw_flow_control) {
// HW flow control
uart_config.c_lflag |= CRTSCTS;
} else if (sw_flow_control) {
// SW flow control
uart_config.c_lflag |= (IXON | IXOFF | IXANY);
@@ -337,7 +357,7 @@ int UART_node::init()
if (!baudrate_to_speed(baudrate, &speed)) {
#ifndef PX4_ERR
printf("\033[0;31m[ micrortps_transport ]\tUART transport: ERR SET BAUD %s: Unsupported baudrate: %d\n\tsupported examples:\n\t9600, 19200, 38400, 57600, 115200, 230400, 460800, 500000, 921600, 1000000\033[0m\n",
uart_name, baudrate);
uart_name, baudrate);
#else
PX4_ERR("UART transport: ERR SET BAUD %s: Unsupported baudrate: %" PRIu32 "\n\tsupported examples:\n\t9600, 19200, 38400, 57600, 115200, 230400, 460800, 500000, 921600, 1000000\n",
uart_name, baudrate);
@@ -349,7 +369,8 @@ int UART_node::init()
if (cfsetispeed(&uart_config, speed) < 0 || cfsetospeed(&uart_config, speed) < 0) {
int errno_bkp = errno;
#ifndef PX4_ERR
printf("\033[0;31m[ micrortps_transport ]\tUART transport: ERR SET BAUD %s: %d (%d)\033[0m\n", uart_name, termios_state, errno);
printf("\033[0;31m[ micrortps_transport ]\tUART transport: ERR SET BAUD %s: %d (%d)\033[0m\n", uart_name, termios_state,
errno);
#else
PX4_ERR("ERR SET BAUD %s: %d (%d)", uart_name, termios_state, errno);
#endif /* PX4_ERR */
@@ -373,10 +394,10 @@ int UART_node::init()
while (0 < ::read(uart_fd, (void *)&aux, 64)) {
flush = true;
/**
* According to px4_time.h, px4_usleep() is only defined when lockstep is set
* to be used
*/
/**
* According to px4_time.h, px4_usleep() is only defined when lockstep is set
* to be used
*/
#ifndef px4_usleep
usleep(1000);
#else
@@ -386,15 +407,24 @@ int UART_node::init()
if (flush) {
#ifndef PX4_DEBUG
if (debug) printf("[ micrortps_transport ]\tUART transport: Flush\n");
if (debug) { printf("[ micrortps_transport ]\tUART transport: Flush\n"); }
#else
if (debug) PX4_DEBUG("UART transport: Flush");
if (debug) { PX4_DEBUG("UART transport: Flush"); }
#endif /* PX4_DEBUG */
} else {
#ifndef PX4_DEBUG
if (debug) printf("[ micrortps_transport ]\tUART transport: No flush\n");
if (debug) { printf("[ micrortps_transport ]\tUART transport: No flush\n"); }
#else
if (debug) PX4_DEBUG("UART transport: No flush");
if (debug) { PX4_DEBUG("UART transport: No flush"); }
#endif /* PX4_INFO */
}
@@ -549,17 +579,17 @@ bool UART_node::baudrate_to_speed(uint32_t bauds, speed_t *speed)
return true;
}
UDP_node::UDP_node(const char* _udp_ip, uint16_t _udp_port_recv,
uint16_t _udp_port_send, const bool _debug):
UDP_node::UDP_node(const char *_udp_ip, uint16_t _udp_port_recv,
uint16_t _udp_port_send, const bool _debug):
Transport_node(_debug),
sender_fd(-1),
receiver_fd(-1),
udp_port_recv(_udp_port_recv),
udp_port_send(_udp_port_send)
{
if (nullptr != _udp_ip) {
strcpy(udp_ip, _udp_ip);
}
if (nullptr != _udp_ip) {
strcpy(udp_ip, _udp_ip);
}
}
UDP_node::~UDP_node()
@@ -598,6 +628,7 @@ int UDP_node::init_receiver(uint16_t udp_port)
#endif /* PX4_ERR */
return -1;
}
#ifndef PX4_INFO
printf("[ micrortps_transport ]\tUDP transport: Trying to connect...");
#else
@@ -612,6 +643,7 @@ int UDP_node::init_receiver(uint16_t udp_port)
#endif /* PX4_ERR */
return -1;
}
#ifndef PX4_INFO
printf("[ micrortps_transport ]\tUDP transport: Connected to server!\n\n");
#else
+5 -5
View File
@@ -1,7 +1,7 @@
/****************************************************************************
*
* Copyright 2017 Proyectos y Sistemas de Mantenimiento SL (eProsima).
* Copyright (c) 2018-2019 PX4 Development Team. All rights reserved.
* Copyright (c) 2018-2021 PX4 Development Team. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
@@ -97,8 +97,8 @@ class UART_node: public Transport_node
{
public:
UART_node(const char *_uart_name, const uint32_t _baudrate,
const uint32_t _poll_ms, const bool _hw_flow_control,
const bool _sw_flow_control, const bool _debug);
const uint32_t _poll_ms, const bool _hw_flow_control,
const bool _sw_flow_control, const bool _debug);
virtual ~UART_node();
int init();
@@ -122,8 +122,8 @@ protected:
class UDP_node: public Transport_node
{
public:
UDP_node(const char* _udp_ip, uint16_t udp_port_recv, uint16_t udp_port_send,
const bool _debug);
UDP_node(const char *_udp_ip, uint16_t udp_port_recv, uint16_t udp_port_send,
const bool _debug);
virtual ~UDP_node();
int init();