diff --git a/msg/templates/urtps/Publisher.cpp.em b/msg/templates/urtps/Publisher.cpp.em index e24c03417f..d783428b1a 100644 --- a/msg/templates/urtps/Publisher.cpp.em +++ b/msg/templates/urtps/Publisher.cpp.em @@ -74,107 +74,115 @@ except AttributeError: #include "@(topic)_Publisher.h" @(topic)_Publisher::@(topic)_Publisher() - : mp_participant(nullptr), - mp_publisher(nullptr) + : mp_participant(nullptr), + mp_publisher(nullptr) { } @(topic)_Publisher::~@(topic)_Publisher() { - Domain::removeParticipant(mp_participant); + Domain::removeParticipant(mp_participant); } -bool @(topic)_Publisher::init(const std::string& ns) +bool @(topic)_Publisher::init(const std::string &ns) { - // Create RTPSParticipant - ParticipantAttributes PParam; + // Create RTPSParticipant + ParticipantAttributes PParam; @[if version.parse(fastrtps_version) < version.parse('2.0')]@ - PParam.rtps.builtin.domainId = 0; + PParam.rtps.builtin.domainId = 0; @[else]@ - PParam.domainId = 0; + PParam.domainId = 0; @[end if]@ @[if version.parse(fastrtps_version) <= version.parse('1.8.4')]@ - PParam.rtps.builtin.leaseDuration = c_TimeInfinite; + PParam.rtps.builtin.leaseDuration = c_TimeInfinite; @[else]@ - PParam.rtps.builtin.discovery_config.leaseDuration = c_TimeInfinite; + PParam.rtps.builtin.discovery_config.leaseDuration = c_TimeInfinite; @[end if]@ - std::string nodeName = ns; - nodeName.append("@(topic)_publisher"); - PParam.rtps.setName(nodeName.c_str()); + std::string nodeName = ns; + nodeName.append("@(topic)_publisher"); + PParam.rtps.setName(nodeName.c_str()); - // Check if ROS_LOCALHOST_ONLY is set. This means that one wants to use - // only the localhost network for data sharing - const char* localhost_only = std::getenv("ROS_LOCALHOST_ONLY"); - if (localhost_only && strcmp(localhost_only, "1") == 0) { - // Create a custom network transport descriptor to whitelist the localhost - auto localhostDescriptor = std::make_shared(); - localhostDescriptor->interfaceWhiteList.emplace_back("127.0.0.1"); + // Check if ROS_LOCALHOST_ONLY is set. This means that one wants to use + // only the localhost network for data sharing + const char *localhost_only = std::getenv("ROS_LOCALHOST_ONLY"); - // Disable the built-in Transport Layer - PParam.rtps.useBuiltinTransports = false; + if (localhost_only && strcmp(localhost_only, "1") == 0) { + // Create a custom network transport descriptor to whitelist the localhost + auto localhostDescriptor = std::make_shared(); + localhostDescriptor->interfaceWhiteList.emplace_back("127.0.0.1"); - // Add the descriptor as a custom user transport - PParam.rtps.userTransports.push_back(localhostDescriptor); - } + // Disable the built-in Transport Layer + PParam.rtps.useBuiltinTransports = false; - mp_participant = Domain::createParticipant(PParam); - if(mp_participant == nullptr) - return false; + // Add the descriptor as a custom user transport + PParam.rtps.userTransports.push_back(localhostDescriptor); + } - // Register the type - Domain::registerType(mp_participant, static_cast(&@(topic)DataType)); + mp_participant = Domain::createParticipant(PParam); - // Create Publisher - PublisherAttributes Wparam; - Wparam.topic.topicKind = NO_KEY; - Wparam.topic.topicDataType = @(topic)DataType.getName(); + if (mp_participant == nullptr) { + return false; + } + + // Register the type + Domain::registerType(mp_participant, static_cast(&@(topic)DataType)); + + // Create Publisher + PublisherAttributes Wparam; + Wparam.topic.topicKind = NO_KEY; + Wparam.topic.topicDataType = @(topic)DataType.getName(); @[if ros2_distro]@ @[ if ros2_distro == "ardent"]@ - Wparam.qos.m_partition.push_back("rt"); - std::string topicName = ns; - topicName.append("@(topic)_PubSubTopic"); - Wparam.topic.topicName = topicName; + Wparam.qos.m_partition.push_back("rt"); + std::string topicName = ns; + topicName.append("@(topic)_PubSubTopic"); + Wparam.topic.topicName = topicName; @[ else]@ - std::string topicName = "rt/"; - topicName.append(ns); - topicName.append("@(topic)_PubSubTopic"); - Wparam.topic.topicName = topicName; + std::string topicName = "rt/"; + topicName.append(ns); + topicName.append("@(topic)_PubSubTopic"); + Wparam.topic.topicName = topicName; @[ end if]@ @[else]@ - std::string topicName = ns; - topicName.append("@(topic)PubSubTopic"); - Wparam.topic.topicName = topicName; + std::string topicName = ns; + topicName.append("@(topic)PubSubTopic"); + Wparam.topic.topicName = topicName; @[end if]@ - mp_publisher = Domain::createPublisher(mp_participant, Wparam, static_cast(&m_listener)); - if(mp_publisher == nullptr) - return false; - return true; + mp_publisher = Domain::createPublisher(mp_participant, Wparam, static_cast(&m_listener)); + + if (mp_publisher == nullptr) { + return false; + } + + return true; } -void @(topic)_Publisher::PubListener::onPublicationMatched(Publisher* pub, MatchingInfo& info) +void @(topic)_Publisher::PubListener::onPublicationMatched(Publisher *pub, MatchingInfo &info) { - // The first 6 values of the ID guidPrefix of an entity in a DDS-RTPS Domain - // are the same for all its subcomponents (publishers, subscribers) - bool is_different_endpoint = false; - for (size_t i = 0; i < 6; i++) { - if (pub->getGuid().guidPrefix.value[i] != info.remoteEndpointGuid.guidPrefix.value[i]) { - is_different_endpoint = true; - break; - } - } + // The first 6 values of the ID guidPrefix of an entity in a DDS-RTPS Domain + // are the same for all its subcomponents (publishers, subscribers) + bool is_different_endpoint = false; - // If the matching happens for the same entity, do not make a match - if (is_different_endpoint) { - if (info.status == MATCHED_MATCHING) { - n_matched++; - std::cout << "\033[0;37m[ micrortps_agent ]\t@(topic) publisher matched\033[0m" << std::endl; - } else { - n_matched--; - std::cout << "\033[0;37m[ micrortps_agent ]\t@(topic) publisher unmatched\033[0m" << std::endl; - } - } + for (size_t i = 0; i < 6; i++) { + if (pub->getGuid().guidPrefix.value[i] != info.remoteEndpointGuid.guidPrefix.value[i]) { + is_different_endpoint = true; + break; + } + } + + // If the matching happens for the same entity, do not make a match + if (is_different_endpoint) { + if (info.status == MATCHED_MATCHING) { + n_matched++; + std::cout << "\033[0;37m[ micrortps_agent ]\t@(topic) publisher matched\033[0m" << std::endl; + + } else { + n_matched--; + std::cout << "\033[0;37m[ micrortps_agent ]\t@(topic) publisher unmatched\033[0m" << std::endl; + } + } } -void @(topic)_Publisher::publish(@(topic)_msg_t* st) +void @(topic)_Publisher::publish(@(topic)_msg_t *st) { - mp_publisher->write(st); + mp_publisher->write(st); } diff --git a/msg/templates/urtps/Publisher.h.em b/msg/templates/urtps/Publisher.h.em index 76236b832b..9b701b669c 100644 --- a/msg/templates/urtps/Publisher.h.em +++ b/msg/templates/urtps/Publisher.h.em @@ -25,7 +25,7 @@ except AttributeError: /**************************************************************************** * * Copyright 2017 Proyectos y Sistemas de Mantenimiento SL (eProsima). - * Copyright (c) 2018-2019 PX4 Development Team. All rights reserved. + * Copyright (c) 2018-2021 PX4 Development Team. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: @@ -99,24 +99,24 @@ using @(topic)_msg_datatype = @(topic)PubSubType; class @(topic)_Publisher { public: - @(topic)_Publisher(); - virtual ~@(topic)_Publisher(); - bool init(const std::string& ns); - void run(); - void publish(@(topic)_msg_t* st); + @(topic)_Publisher(); + virtual ~@(topic)_Publisher(); + bool init(const std::string &ns); + void run(); + void publish(@(topic)_msg_t *st); private: - Participant *mp_participant; - Publisher *mp_publisher; + Participant *mp_participant; + Publisher *mp_publisher; - class PubListener : public PublisherListener - { - public: - PubListener() : n_matched(0){}; - ~PubListener(){}; - void onPublicationMatched(Publisher* pub, MatchingInfo& info); - int n_matched; - } m_listener; - @(topic)_msg_datatype @(topic)DataType; + class PubListener : public PublisherListener + { + public: + PubListener() : n_matched(0) {}; + ~PubListener() {}; + void onPublicationMatched(Publisher *pub, MatchingInfo &info); + int n_matched; + } m_listener; + @(topic)_msg_datatype @(topic)DataType; }; #endif // _@(topic)__PUBLISHER_H_ diff --git a/msg/templates/urtps/RtpsTopics.cpp.em b/msg/templates/urtps/RtpsTopics.cpp.em index 32586121ff..d9435fa95e 100644 --- a/msg/templates/urtps/RtpsTopics.cpp.em +++ b/msg/templates/urtps/RtpsTopics.cpp.em @@ -24,7 +24,7 @@ package = package[0] /**************************************************************************** * * Copyright 2017 Proyectos y Sistemas de Mantenimiento SL (eProsima). - * Copyright (c) 2018-2019 PX4 Development Team. All rights reserved. + * Copyright (c) 2018-2021 PX4 Development Team. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: @@ -56,113 +56,127 @@ package = package[0] #include "RtpsTopics.h" -bool RtpsTopics::init(std::condition_variable* t_send_queue_cv, std::mutex* t_send_queue_mutex, std::queue* t_send_queue, const std::string& ns) +bool RtpsTopics::init(std::condition_variable *t_send_queue_cv, std::mutex *t_send_queue_mutex, + std::queue *t_send_queue, const std::string &ns) { @[if recv_topics]@ - // Initialise subscribers - std::cout << "\033[0;36m--- Subscribers ---\033[0m" << std::endl; + // Initialise subscribers + std::cout << "\033[0;36m--- Subscribers ---\033[0m" << std::endl; @[for topic in recv_topics]@ - if (_@(topic)_sub.init(@(rtps_message_id(ids, topic)), t_send_queue_cv, t_send_queue_mutex, t_send_queue, ns)) { - std::cout << "- @(topic) subscriber started" << std::endl; - } else { - std::cerr << "Failed starting @(topic) subscriber" << std::endl; - return false; - } + + if (_@(topic)_sub.init(@(rtps_message_id(ids, topic)), t_send_queue_cv, t_send_queue_mutex, t_send_queue, ns)) { + std::cout << "- @(topic) subscriber started" << std::endl; + + } else { + std::cerr << "Failed starting @(topic) subscriber" << std::endl; + return false; + } + @[end for]@ - std::cout << "\033[0;36m-----------------------\033[0m" << std::endl << std::endl; + std::cout << "\033[0;36m-----------------------\033[0m" << std::endl << std::endl; @[end if]@ @[if send_topics]@ - // Initialise publishers - std::cout << "\033[0;36m---- Publishers ----\033[0m" << std::endl; + // Initialise publishers + std::cout << "\033[0;36m---- Publishers ----\033[0m" << std::endl; @[for topic in send_topics]@ - if (_@(topic)_pub.init(ns)) { - std::cout << "- @(topic) publisher started" << std::endl; + + if (_@(topic)_pub.init(ns)) { + std::cout << "- @(topic) publisher started" << std::endl; @[ if topic == 'Timesync' or topic == 'timesync']@ - _timesync->start(&_@(topic)_pub); + _timesync->start(&_@(topic)_pub); @[ end if]@ - } else { - std::cerr << "ERROR starting @(topic) publisher" << std::endl; - return false; - } + + } else { + std::cerr << "ERROR starting @(topic) publisher" << std::endl; + return false; + } + @[end for]@ - std::cout << "\033[0;36m-----------------------\033[0m" << std::endl; + std::cout << "\033[0;36m-----------------------\033[0m" << std::endl; @[end if]@ - return true; + return true; } @[if send_topics]@ void RtpsTopics::publish(uint8_t topic_ID, char data_buffer[], size_t len) { - switch (topic_ID) - { + switch (topic_ID) { @[for topic in send_topics]@ - case @(rtps_message_id(ids, topic)): // @(topic) - { - @(topic)_msg_t st; - eprosima::fastcdr::FastBuffer cdrbuffer(data_buffer, len); - eprosima::fastcdr::Cdr cdr_des(cdrbuffer); - st.deserialize(cdr_des); -@[ if topic == 'Timesync' or topic == 'timesync']@ - _timesync->processTimesyncMsg(&st); - if (getMsgSysID(&st) == 1) { -@[ end if]@ - // apply timestamp offset - uint64_t timestamp = getMsgTimestamp(&st); - uint64_t timestamp_sample = getMsgTimestampSample(&st); - _timesync->subtractOffset(timestamp); - setMsgTimestamp(&st, timestamp); - _timesync->subtractOffset(timestamp_sample); - setMsgTimestampSample(&st, timestamp_sample); - _@(topic)_pub.publish(&st); + case @(rtps_message_id(ids, topic)): { // @(topic) + @(topic)_msg_t st; + eprosima::fastcdr::FastBuffer cdrbuffer(data_buffer, len); + eprosima::fastcdr::Cdr cdr_des(cdrbuffer); + st.deserialize(cdr_des); @[ if topic == 'Timesync' or topic == 'timesync']@ - } + _timesync->processTimesyncMsg(&st); + + if (getMsgSysID(&st) == 1) { @[ end if]@ - } - break; + // apply timestamp offset + uint64_t timestamp = getMsgTimestamp(&st); + uint64_t timestamp_sample = getMsgTimestampSample(&st); + _timesync->subtractOffset(timestamp); + setMsgTimestamp(&st, timestamp); + _timesync->subtractOffset(timestamp_sample); + setMsgTimestampSample(&st, timestamp_sample); + _@(topic)_pub.publish(&st); +@[ if topic == 'Timesync' or topic == 'timesync']@ + } + +@[ end if]@ + } + break; @[end for]@ - default: - printf("\033[1;33m[ micrortps_agent ]\tUnexpected topic ID '%hhu' to publish Please make sure the agent is capable of parsing the message associated to the topic ID '%hhu'\033[0m\n", topic_ID, topic_ID); - break; - } + + default: + printf("\033[1;33m[ micrortps_agent ]\tUnexpected topic ID '%hhu' to publish Please make sure the agent is capable of parsing the message associated to the topic ID '%hhu'\033[0m\n", + topic_ID, topic_ID); + break; + } } @[end if]@ @[if recv_topics]@ bool RtpsTopics::getMsg(const uint8_t topic_ID, eprosima::fastcdr::Cdr &scdr) { - bool ret = false; - switch (topic_ID) - { -@[for topic in recv_topics]@ - case @(rtps_message_id(ids, topic)): // @(topic) - if (_@(topic)_sub.hasMsg()) - { - @(topic)_msg_t msg = _@(topic)_sub.getMsg(); -@[ if topic == 'Timesync' or topic == 'timesync']@ - if (getMsgSysID(&msg) == 0) { -@[ end if]@ - // apply timestamps offset - uint64_t timestamp = getMsgTimestamp(&msg); - uint64_t timestamp_sample = getMsgTimestampSample(&msg); - _timesync->addOffset(timestamp); - setMsgTimestamp(&msg, timestamp); - _timesync->addOffset(timestamp_sample); - setMsgTimestampSample(&msg, timestamp_sample); - msg.serialize(scdr); - ret = true; -@[ if topic == 'Timesync' or topic == 'timesync']@ - } -@[ end if]@ - _@(topic)_sub.unlockMsg(); - } - break; -@[end for]@ - default: - printf("\033[1;33m[ micrortps_agent ]\tUnexpected topic ID '%hhu' to getMsg. Please make sure the agent is capable of parsing the message associated to the topic ID '%hhu'\033[0m\n", topic_ID, topic_ID); - break; - } + bool ret = false; - return ret; + switch (topic_ID) { +@[for topic in recv_topics]@ + + case @(rtps_message_id(ids, topic)): // @(topic) + if (_@(topic)_sub.hasMsg()) { + @(topic)_msg_t msg = _@(topic)_sub.getMsg(); +@[ if topic == 'Timesync' or topic == 'timesync']@ + + if (getMsgSysID(&msg) == 0) { +@[ end if]@ + // apply timestamps offset + uint64_t timestamp = getMsgTimestamp(&msg); + uint64_t timestamp_sample = getMsgTimestampSample(&msg); + _timesync->addOffset(timestamp); + setMsgTimestamp(&msg, timestamp); + _timesync->addOffset(timestamp_sample); + setMsgTimestampSample(&msg, timestamp_sample); + msg.serialize(scdr); + ret = true; +@[ if topic == 'Timesync' or topic == 'timesync']@ + } + +@[ end if]@ + _@(topic)_sub.unlockMsg(); + } + + break; +@[end for]@ + + default: + printf("\033[1;33m[ micrortps_agent ]\tUnexpected topic ID '%hhu' to getMsg. Please make sure the agent is capable of parsing the message associated to the topic ID '%hhu'\033[0m\n", + topic_ID, topic_ID); + break; + } + + return ret; } @[end if]@ diff --git a/msg/templates/urtps/RtpsTopics.h.em b/msg/templates/urtps/RtpsTopics.h.em index 37d87c4fd2..51cacbfcf7 100644 --- a/msg/templates/urtps/RtpsTopics.h.em +++ b/msg/templates/urtps/RtpsTopics.h.em @@ -30,7 +30,7 @@ except AttributeError: /**************************************************************************** * * Copyright 2017 Proyectos y Sistemas de Mantenimiento SL (eProsima). - * Copyright (c) 2018-2019 PX4 Development Team. All rights reserved. + * Copyright (c) 2018-2021 PX4 Development Team. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: @@ -91,120 +91,122 @@ using @(topic)_msg_t = @(topic); @[ end if]@ @[end for]@ -class RtpsTopics { +class RtpsTopics +{ public: - bool init(std::condition_variable* t_send_queue_cv, std::mutex* t_send_queue_mutex, std::queue* t_send_queue, const std::string& ns); - void set_timesync(const std::shared_ptr& timesync) { _timesync = timesync; }; + bool init(std::condition_variable *t_send_queue_cv, std::mutex *t_send_queue_mutex, std::queue *t_send_queue, + const std::string &ns); + void set_timesync(const std::shared_ptr ×ync) { _timesync = timesync; }; @[if send_topics]@ - void publish(uint8_t topic_ID, char data_buffer[], size_t len); + void publish(uint8_t topic_ID, char data_buffer[], size_t len); @[end if]@ @[if recv_topics]@ - bool getMsg(const uint8_t topic_ID, eprosima::fastcdr::Cdr &scdr); + bool getMsg(const uint8_t topic_ID, eprosima::fastcdr::Cdr &scdr); @[end if]@ private: @[if send_topics]@ - /** Publishers **/ + /** Publishers **/ @[for topic in send_topics]@ - @(topic)_Publisher _@(topic)_pub; + @(topic)_Publisher _@(topic)_pub; @[end for]@ @[end if]@ @[if recv_topics]@ - /** Subscribers **/ + /** Subscribers **/ @[for topic in recv_topics]@ - @(topic)_Subscriber _@(topic)_sub; + @(topic)_Subscriber _@(topic)_sub; @[end for]@ @[end if]@ - // SFINAE - template struct hasTimestampSample{ - private: - template().timestamp_sample(int64_t()))> - static std::true_type detect(int); - template - static std::false_type detect(...); - public: - static constexpr bool value = decltype(detect(0))::value; - }; + // SFINAE + template struct hasTimestampSample{ + private: + template().timestamp_sample(int64_t()))> + static std::true_type detect(int); + template + static std::false_type detect(...); + public: + static constexpr bool value = decltype(detect(0))::value; + }; - template - inline typename std::enable_if::value, uint64_t>::type - getMsgTimestampSample_impl(const T*) { return 0; } + template + inline typename std::enable_if < !hasTimestampSample::value, uint64_t >::type + getMsgTimestampSample_impl(const T *) { return 0; } - /** Msg metada Getters **/ + /** Msg metada Getters **/ @[if version.parse(fastrtps_version) <= version.parse('1.7.2') or not ros2_distro]@ - template - inline uint64_t getMsgTimestamp(const T* msg) { return msg->timestamp_(); } + template + inline uint64_t getMsgTimestamp(const T *msg) { return msg->timestamp_(); } - template - inline typename std::enable_if::value, uint64_t>::type - getMsgTimestampSample_impl(const T* msg) { return msg->timestamp_sample_(); } + template + inline typename std::enable_if::value, uint64_t>::type + getMsgTimestampSample_impl(const T *msg) { return msg->timestamp_sample_(); } - template - inline uint8_t getMsgSysID(const T* msg) { return msg->sys_id_(); } + template + inline uint8_t getMsgSysID(const T *msg) { return msg->sys_id_(); } - template - inline uint8_t getMsgSeq(const T* msg) { return msg->seq_(); } + template + inline uint8_t getMsgSeq(const T *msg) { return msg->seq_(); } @[elif ros2_distro]@ - template - inline uint64_t getMsgTimestamp(const T* msg) { return msg->timestamp(); } + template + inline uint64_t getMsgTimestamp(const T *msg) { return msg->timestamp(); } - template - inline typename std::enable_if::value, uint64_t>::type - getMsgTimestampSample_impl(const T* msg) { return msg->timestamp_sample(); } + template + inline typename std::enable_if::value, uint64_t>::type + getMsgTimestampSample_impl(const T *msg) { return msg->timestamp_sample(); } - template - inline uint8_t getMsgSysID(const T* msg) { return msg->sys_id(); } + template + inline uint8_t getMsgSysID(const T *msg) { return msg->sys_id(); } - template - inline uint8_t getMsgSeq(const T* msg) { return msg->seq(); } + template + inline uint8_t getMsgSeq(const T *msg) { return msg->seq(); } @[end if]@ - template - inline uint64_t getMsgTimestampSample(const T* msg) { return getMsgTimestampSample_impl(msg); } + template + inline uint64_t getMsgTimestampSample(const T *msg) { return getMsgTimestampSample_impl(msg); } - template - inline typename std::enable_if::value, void>::type - setMsgTimestampSample_impl(T*, const uint64_t&) {} + template + inline typename std::enable_if ::value, void>::type + setMsgTimestampSample_impl(T *, const uint64_t &) {} - /** Msg metadata Setters **/ + /** Msg metadata Setters **/ @[if version.parse(fastrtps_version) <= version.parse('1.7.2') or not ros2_distro]@ - template - inline void setMsgTimestamp(T* msg, const uint64_t& timestamp) { msg->timestamp_() = timestamp; } + template + inline void setMsgTimestamp(T *msg, const uint64_t ×tamp) { msg->timestamp_() = timestamp; } - template - inline typename std::enable_if::value, void>::type - setMsgTimestampSample_impl(T* msg, const uint64_t& timestamp_sample) { msg->timestamp_sample_() = timestamp_sample; } + template + inline typename std::enable_if::value, void>::type + setMsgTimestampSample_impl(T *msg, const uint64_t ×tamp_sample) { msg->timestamp_sample_() = timestamp_sample; } - template - inline void setMsgSysID(T* msg, const uint8_t& sys_id) { msg->sys_id_() = sys_id; } + template + inline void setMsgSysID(T *msg, const uint8_t &sys_id) { msg->sys_id_() = sys_id; } - template - inline void setMsgSeq(T* msg, const uint8_t& seq) { msg->seq_() = seq; } + template + inline void setMsgSeq(T *msg, const uint8_t &seq) { msg->seq_() = seq; } @[elif ros2_distro]@ - template - inline void setMsgTimestamp(T* msg, const uint64_t& timestamp) { msg->timestamp() = timestamp; } + template + inline void setMsgTimestamp(T *msg, const uint64_t ×tamp) { msg->timestamp() = timestamp; } - template - inline typename std::enable_if::value, void>::type - setMsgTimestampSample_impl(T* msg, const uint64_t& timestamp_sample) { msg->timestamp_sample() = timestamp_sample; } + template + inline typename std::enable_if::value, void>::type + setMsgTimestampSample_impl(T *msg, const uint64_t ×tamp_sample) { msg->timestamp_sample() = timestamp_sample; } - template - inline void setMsgSysID(T* msg, const uint8_t& sys_id) { msg->sys_id() = sys_id; } + template + inline void setMsgSysID(T *msg, const uint8_t &sys_id) { msg->sys_id() = sys_id; } - template - inline void setMsgSeq(T* msg, const uint8_t& seq) { msg->seq() = seq; } + template + inline void setMsgSeq(T *msg, const uint8_t &seq) { msg->seq() = seq; } @[end if]@ - template - inline void setMsgTimestampSample(T* msg, const uint64_t& timestamp_sample) { setMsgTimestampSample_impl(msg, timestamp_sample); } + template + inline void setMsgTimestampSample(T *msg, const uint64_t ×tamp_sample) { setMsgTimestampSample_impl(msg, timestamp_sample); } - /** - * @@brief Timesync object ptr. - * This object is used to compuyte and apply the time offsets to the - * messages timestamps. - */ - std::shared_ptr _timesync; + /** + * @@brief Timesync object ptr. + * This object is used to compuyte and apply the time offsets to the + * messages timestamps. + */ + std::shared_ptr _timesync; }; diff --git a/msg/templates/urtps/Subscriber.cpp.em b/msg/templates/urtps/Subscriber.cpp.em index 49f3674bb8..b280774a14 100644 --- a/msg/templates/urtps/Subscriber.cpp.em +++ b/msg/templates/urtps/Subscriber.cpp.em @@ -73,172 +73,182 @@ except AttributeError: #include "@(topic)_Subscriber.h" @(topic)_Subscriber::@(topic)_Subscriber() - : mp_participant(nullptr), - mp_subscriber(nullptr) + : mp_participant(nullptr), + mp_subscriber(nullptr) { } @(topic)_Subscriber::~@(topic)_Subscriber() { - Domain::removeParticipant(mp_participant); + Domain::removeParticipant(mp_participant); } -bool @(topic)_Subscriber::init(uint8_t topic_ID, std::condition_variable* t_send_queue_cv, std::mutex* t_send_queue_mutex, std::queue* t_send_queue, const std::string& ns) +bool @(topic)_Subscriber::init(uint8_t topic_ID, std::condition_variable *t_send_queue_cv, + std::mutex *t_send_queue_mutex, std::queue *t_send_queue, const std::string &ns) { - m_listener.topic_ID = topic_ID; - m_listener.t_send_queue_cv = t_send_queue_cv; - m_listener.t_send_queue_mutex = t_send_queue_mutex; - m_listener.t_send_queue = t_send_queue; + m_listener.topic_ID = topic_ID; + m_listener.t_send_queue_cv = t_send_queue_cv; + m_listener.t_send_queue_mutex = t_send_queue_mutex; + m_listener.t_send_queue = t_send_queue; - // Create RTPSParticipant - ParticipantAttributes PParam; + // Create RTPSParticipant + ParticipantAttributes PParam; @[if version.parse(fastrtps_version) < version.parse('2.0')]@ - PParam.rtps.builtin.domainId = 0; + PParam.rtps.builtin.domainId = 0; @[else]@ - PParam.domainId = 0; + PParam.domainId = 0; @[end if]@ @[if version.parse(fastrtps_version) <= version.parse('1.8.4')]@ - PParam.rtps.builtin.leaseDuration = c_TimeInfinite; + PParam.rtps.builtin.leaseDuration = c_TimeInfinite; @[else]@ - PParam.rtps.builtin.discovery_config.leaseDuration = c_TimeInfinite; + PParam.rtps.builtin.discovery_config.leaseDuration = c_TimeInfinite; @[end if]@ - std::string nodeName = ns; - nodeName.append("@(topic)_subscriber"); - PParam.rtps.setName(nodeName.c_str()); + std::string nodeName = ns; + nodeName.append("@(topic)_subscriber"); + PParam.rtps.setName(nodeName.c_str()); - // Check if ROS_LOCALHOST_ONLY is set. This means that one wants to use - // only the localhost network for data sharing - const char* localhost_only = std::getenv("ROS_LOCALHOST_ONLY"); - if (localhost_only && strcmp(localhost_only, "1") == 0) { - // Create a custom network transport descriptor to whitelist the localhost - auto localhostDescriptor = std::make_shared(); - localhostDescriptor->interfaceWhiteList.emplace_back("127.0.0.1"); + // Check if ROS_LOCALHOST_ONLY is set. This means that one wants to use + // only the localhost network for data sharing + const char *localhost_only = std::getenv("ROS_LOCALHOST_ONLY"); - // Disable the built-in Transport Layer - PParam.rtps.useBuiltinTransports = false; + if (localhost_only && strcmp(localhost_only, "1") == 0) { + // Create a custom network transport descriptor to whitelist the localhost + auto localhostDescriptor = std::make_shared(); + localhostDescriptor->interfaceWhiteList.emplace_back("127.0.0.1"); - // Add the descriptor as a custom user transport - PParam.rtps.userTransports.push_back(localhostDescriptor); - } + // Disable the built-in Transport Layer + PParam.rtps.useBuiltinTransports = false; - mp_participant = Domain::createParticipant(PParam); - if(mp_participant == nullptr) - return false; + // Add the descriptor as a custom user transport + PParam.rtps.userTransports.push_back(localhostDescriptor); + } - //Register the type - Domain::registerType(mp_participant, static_cast(&@(topic)DataType)); + mp_participant = Domain::createParticipant(PParam); - // Create Subscriber - SubscriberAttributes Rparam; - Rparam.topic.topicKind = NO_KEY; - Rparam.topic.topicDataType = @(topic)DataType.getName(); + if (mp_participant == nullptr) { + return false; + } + + // Register the type + Domain::registerType(mp_participant, static_cast(&@(topic)DataType)); + + // Create Subscriber + SubscriberAttributes Rparam; + Rparam.topic.topicKind = NO_KEY; + Rparam.topic.topicDataType = @(topic)DataType.getName(); @[if ros2_distro]@ @[ if ros2_distro == "ardent"]@ - Rparam.qos.m_partition.push_back("rt"); - std::string topicName = ns; - topicName.append("@(topic)_PubSubTopic"); - Rparam.topic.topicName = topicName; + Rparam.qos.m_partition.push_back("rt"); + std::string topicName = ns; + topicName.append("@(topic)_PubSubTopic"); + Rparam.topic.topicName = topicName; @[ else]@ - std::string topicName = "rt/"; - topicName.append(ns); - topicName.append("@(topic)_PubSubTopic"); - Rparam.topic.topicName = topicName; + std::string topicName = "rt/"; + topicName.append(ns); + topicName.append("@(topic)_PubSubTopic"); + Rparam.topic.topicName = topicName; @[ end if]@ @[else]@ - std::string topicName = ns; - topicName.append("@(topic)PubSubTopic"); - Rparam.topic.topicName = topicName; + std::string topicName = ns; + topicName.append("@(topic)PubSubTopic"); + Rparam.topic.topicName = topicName; @[end if]@ - mp_subscriber = Domain::createSubscriber(mp_participant, Rparam, static_cast(&m_listener)); - if(mp_subscriber == nullptr) - return false; - return true; + mp_subscriber = Domain::createSubscriber(mp_participant, Rparam, static_cast(&m_listener)); + + if (mp_subscriber == nullptr) { + return false; + } + + return true; } -void @(topic)_Subscriber::SubListener::onSubscriptionMatched(Subscriber* sub, MatchingInfo& info) +void @(topic)_Subscriber::SubListener::onSubscriptionMatched(Subscriber *sub, MatchingInfo &info) { @# Since the time sync runs on the bridge itself, it is required that there is a @# match between two topics of the same entity @[if topic != 'Timesync' and topic != 'timesync']@ - // The first 6 values of the ID guidPrefix of an entity in a DDS-RTPS Domain - // are the same for all its subcomponents (publishers, subscribers) - bool is_different_endpoint = false; - for (size_t i = 0; i < 6; i++) { - if (sub->getGuid().guidPrefix.value[i] != info.remoteEndpointGuid.guidPrefix.value[i]) { - is_different_endpoint = true; - break; - } - } + // The first 6 values of the ID guidPrefix of an entity in a DDS-RTPS Domain + // are the same for all its subcomponents (publishers, subscribers) + bool is_different_endpoint = false; + + for (size_t i = 0; i < 6; i++) { + if (sub->getGuid().guidPrefix.value[i] != info.remoteEndpointGuid.guidPrefix.value[i]) { + is_different_endpoint = true; + break; + } + } + + // If the matching happens for the same entity, do not make a match + if (is_different_endpoint) { + if (info.status == MATCHED_MATCHING) { + n_matched++; + std::cout << "\033[0;37m[ micrortps_agent ]\t@(topic) subscriber matched\033[0m" << std::endl; + + } else { + n_matched--; + std::cout << "\033[0;37m[ micrortps_agent ]\t@(topic) subscriber unmatched\033[0m" << std::endl; + } + } - // If the matching happens for the same entity, do not make a match - if (is_different_endpoint) { - if (info.status == MATCHED_MATCHING) { - n_matched++; - std::cout << "\033[0;37m[ micrortps_agent ]\t@(topic) subscriber matched\033[0m" << std::endl; - } else { - n_matched--; - std::cout << "\033[0;37m[ micrortps_agent ]\t@(topic) subscriber unmatched\033[0m" << std::endl; - } - } @[else]@ - (void)sub; + (void)sub; - if (info.status == MATCHED_MATCHING) { - n_matched++; - } else { - n_matched--; - } + if (info.status == MATCHED_MATCHING) { + n_matched++; + + } else { + n_matched--; + } @[end if]@ } -void @(topic)_Subscriber::SubListener::onNewDataMessage(Subscriber* sub) +void @(topic)_Subscriber::SubListener::onNewDataMessage(Subscriber *sub) { - if (n_matched > 0) { - std::unique_lock has_msg_lock(has_msg_mutex); - if(has_msg.load() == true) // Check if msg has been fetched - { - has_msg_cv.wait(has_msg_lock); // Wait till msg has been fetched - } - has_msg_lock.unlock(); + if (n_matched > 0) { + std::unique_lock has_msg_lock(has_msg_mutex); - // Take data - if(sub->takeNextData(&msg, &m_info)) - { - if(m_info.sampleKind == ALIVE) - { - std::unique_lock lk(*t_send_queue_mutex); + if (has_msg.load() == true) { // Check if msg has been fetched + has_msg_cv.wait(has_msg_lock); // Wait till msg has been fetched + } - ++n_msg; - has_msg = true; + has_msg_lock.unlock(); - t_send_queue->push(topic_ID); - lk.unlock(); - t_send_queue_cv->notify_one(); + // Take data + if (sub->takeNextData(&msg, &m_info)) { + if (m_info.sampleKind == ALIVE) { + std::unique_lock lk(*t_send_queue_mutex); - } - } - } + ++n_msg; + has_msg = true; + + t_send_queue->push(topic_ID); + lk.unlock(); + t_send_queue_cv->notify_one(); + + } + } + } } bool @(topic)_Subscriber::hasMsg() { - if (m_listener.n_matched > 0) { - return m_listener.has_msg.load(); - } + if (m_listener.n_matched > 0) { + return m_listener.has_msg.load(); + } - return false; + return false; } @(topic)_msg_t @(topic)_Subscriber::getMsg() { - return m_listener.msg; + return m_listener.msg; } void @(topic)_Subscriber::unlockMsg() { - if (m_listener.n_matched > 0) { - std::unique_lock has_msg_lock(m_listener.has_msg_mutex); - m_listener.has_msg = false; - has_msg_lock.unlock(); - m_listener.has_msg_cv.notify_one(); - } + if (m_listener.n_matched > 0) { + std::unique_lock has_msg_lock(m_listener.has_msg_mutex); + m_listener.has_msg = false; + has_msg_lock.unlock(); + m_listener.has_msg_cv.notify_one(); + } } diff --git a/msg/templates/urtps/Subscriber.h.em b/msg/templates/urtps/Subscriber.h.em index c24e32dcca..fc4a7e558a 100644 --- a/msg/templates/urtps/Subscriber.h.em +++ b/msg/templates/urtps/Subscriber.h.em @@ -25,7 +25,7 @@ except AttributeError: /**************************************************************************** * * Copyright 2017 Proyectos y Sistemas de Mantenimiento SL (eProsima). - * Copyright (c) 2018-2019 PX4 Development Team. All rights reserved. + * Copyright (c) 2018-2021 PX4 Development Team. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: @@ -103,39 +103,40 @@ using @(topic)_msg_datatype = @(topic)PubSubType; class @(topic)_Subscriber { public: - @(topic)_Subscriber(); - virtual ~@(topic)_Subscriber(); - bool init(uint8_t topic_ID, std::condition_variable* t_send_queue_cv, std::mutex* t_send_queue_mutex, std::queue* t_send_queue, const std::string& ns); - void run(); - bool hasMsg(); - @(topic)_msg_t getMsg(); - void unlockMsg(); + @(topic)_Subscriber(); + virtual ~@(topic)_Subscriber(); + bool init(uint8_t topic_ID, std::condition_variable *t_send_queue_cv, std::mutex *t_send_queue_mutex, + std::queue *t_send_queue, const std::string &ns); + void run(); + bool hasMsg(); + @(topic)_msg_t getMsg(); + void unlockMsg(); private: - Participant *mp_participant; - Subscriber *mp_subscriber; + Participant *mp_participant; + Subscriber *mp_subscriber; - class SubListener : public SubscriberListener - { - public: - SubListener() : n_matched(0), n_msg(0), has_msg(false){}; - ~SubListener(){}; - void onSubscriptionMatched(Subscriber* sub, MatchingInfo& info); - void onNewDataMessage(Subscriber* sub); - SampleInfo_t m_info; - int n_matched; - int n_msg; - @(topic)_msg_t msg; - std::atomic_bool has_msg; - uint8_t topic_ID; - std::condition_variable* t_send_queue_cv; - std::mutex* t_send_queue_mutex; - std::queue* t_send_queue; - std::condition_variable has_msg_cv; - std::mutex has_msg_mutex; + class SubListener : public SubscriberListener + { + public: + SubListener() : n_matched(0), n_msg(0), has_msg(false) {}; + ~SubListener() {}; + void onSubscriptionMatched(Subscriber *sub, MatchingInfo &info); + void onNewDataMessage(Subscriber *sub); + SampleInfo_t m_info; + int n_matched; + int n_msg; + @(topic)_msg_t msg; + std::atomic_bool has_msg; + uint8_t topic_ID; + std::condition_variable *t_send_queue_cv; + std::mutex *t_send_queue_mutex; + std::queue *t_send_queue; + std::condition_variable has_msg_cv; + std::mutex has_msg_mutex; - } m_listener; - @(topic)_msg_datatype @(topic)DataType; + } m_listener; + @(topic)_msg_datatype @(topic)DataType; }; #endif // _@(topic)__SUBSCRIBER_H_ diff --git a/msg/templates/urtps/microRTPS_agent.cpp.em b/msg/templates/urtps/microRTPS_agent.cpp.em index 88740101a4..e8af3c2cc0 100644 --- a/msg/templates/urtps/microRTPS_agent.cpp.em +++ b/msg/templates/urtps/microRTPS_agent.cpp.em @@ -91,89 +91,98 @@ RtpsTopics topics; uint32_t total_sent = 0, sent = 0; struct options { - enum class eTransports - { - UART, - UDP - }; - eTransports transport = options::eTransports::UART; - char device[64] = DEVICE; - int sleep_us = SLEEP_US; - uint32_t baudrate = BAUDRATE; - int poll_ms = POLL_MS; - uint16_t recv_port = DEFAULT_RECV_PORT; - uint16_t send_port = DEFAULT_SEND_PORT; - char ip[16] = DEFAULT_IP; - bool sw_flow_control = false; - bool hw_flow_control = false; - bool verbose_debug = false; - std::string ns = ""; + enum class eTransports { + UART, + UDP + }; + eTransports transport = options::eTransports::UART; + char device[64] = DEVICE; + int sleep_us = SLEEP_US; + uint32_t baudrate = BAUDRATE; + int poll_ms = POLL_MS; + uint16_t recv_port = DEFAULT_RECV_PORT; + uint16_t send_port = DEFAULT_SEND_PORT; + char ip[16] = DEFAULT_IP; + bool sw_flow_control = false; + bool hw_flow_control = false; + bool verbose_debug = false; + std::string ns = ""; } _options; static void usage(const char *name) { - printf("usage: %s [options]\n\n" - " -b UART device baudrate. Default 460800\n" - " -d UART device. Default /dev/ttyACM0\n" - " -f Activates UART link SW flow control\n" - " -h Activates UART link HW flow control\n" - " -i Target IP for UDP. Default 127.0.0.1\n" - " -n ROS 2 topics namespace. Identifies the vehicle in a multi-agent network\n" - " -p Time in ms to poll over UART. Default 1ms\n" - " -r UDP port for receiving. Default 2020\n" - " -s UDP port for sending. Default 2019\n" - " -t [UART|UDP] Default UART\n" - " -v Add more verbosity\n" - " -w Time in us for which each iteration sleep. Default 1ms\n", - name); + printf("usage: %s [options]\n\n" + " -b UART device baudrate. Default 460800\n" + " -d UART device. Default /dev/ttyACM0\n" + " -f Activates UART link SW flow control\n" + " -h Activates UART link HW flow control\n" + " -i Target IP for UDP. Default 127.0.0.1\n" + " -n ROS 2 topics namespace. Identifies the vehicle in a multi-agent network\n" + " -p Time in ms to poll over UART. Default 1ms\n" + " -r UDP port for receiving. Default 2020\n" + " -s UDP port for sending. Default 2019\n" + " -t [UART|UDP] Default UART\n" + " -v Add more verbosity\n" + " -w Time in us for which each iteration sleep. Default 1ms\n", + name); } static int parse_options(int argc, char **argv) { - int ch; + int ch; - while ((ch = getopt(argc, argv, "t:d:w:b:p:r:s:i:fhvn:")) != EOF) - { - switch (ch) - { - case 't': _options.transport = strcmp(optarg, "UDP") == 0? - options::eTransports::UDP - :options::eTransports::UART; break; - case 'd': if (nullptr != optarg) strcpy(_options.device, optarg); break; - case 'w': _options.sleep_us = strtol(optarg, nullptr, 10); break; - case 'b': _options.baudrate = strtoul(optarg, nullptr, 10); break; - case 'p': _options.poll_ms = strtol(optarg, nullptr, 10); break; - case 'r': _options.recv_port = strtoul(optarg, nullptr, 10); break; - case 's': _options.send_port = strtoul(optarg, nullptr, 10); break; - case 'i': if (nullptr != optarg) strcpy(_options.ip, optarg); break; - case 'f': _options.sw_flow_control = true; break; - case 'h': _options.hw_flow_control = true; break; - case 'v': _options.verbose_debug = true; break; - case 'n': if (nullptr != optarg) _options.ns = std::string(optarg) + "/"; break; - default: - usage(argv[0]); - return -1; - } - } + while ((ch = getopt(argc, argv, "t:d:w:b:p:r:s:i:fhvn:")) != EOF) { + switch (ch) { + case 't': _options.transport = strcmp(optarg, "UDP") == 0 ? + options::eTransports::UDP + : options::eTransports::UART; break; - if (_options.poll_ms < 1) { - _options.poll_ms = 1; - printf("\033[1;33m[ micrortps_agent ]\tPoll timeout too low, using 1 ms\033[0m"); - } + case 'd': if (nullptr != optarg) strcpy(_options.device, optarg); break; - if (_options.hw_flow_control && _options.sw_flow_control) { - printf("\033[0;31m[ micrortps_agent ]\tHW and SW flow control set. Please set only one or another\033[0m"); - return -1; - } + case 'w': _options.sleep_us = strtol(optarg, nullptr, 10); break; - return 0; + case 'b': _options.baudrate = strtoul(optarg, nullptr, 10); break; + + case 'p': _options.poll_ms = strtol(optarg, nullptr, 10); break; + + case 'r': _options.recv_port = strtoul(optarg, nullptr, 10); break; + + case 's': _options.send_port = strtoul(optarg, nullptr, 10); break; + + case 'i': if (nullptr != optarg) strcpy(_options.ip, optarg); break; + + case 'f': _options.sw_flow_control = true; break; + + case 'h': _options.hw_flow_control = true; break; + + case 'v': _options.verbose_debug = true; break; + + case 'n': if (nullptr != optarg) _options.ns = std::string(optarg) + "/"; break; + + default: + usage(argv[0]); + return -1; + } + } + + if (_options.poll_ms < 1) { + _options.poll_ms = 1; + printf("\033[1;33m[ micrortps_agent ]\tPoll timeout too low, using 1 ms\033[0m"); + } + + if (_options.hw_flow_control && _options.sw_flow_control) { + printf("\033[0;31m[ micrortps_agent ]\tHW and SW flow control set. Please set only one or another\033[0m"); + return -1; + } + + return 0; } void signal_handler(int signum) { - printf("\033[1;33m[ micrortps_agent ]\tInterrupt signal (%d) received.\033[0m\n", signum); - running = 0; - transport_node->close(); + printf("\033[1;33m[ micrortps_agent ]\tInterrupt signal (%d) received.\033[0m\n", signum); + running = 0; + transport_node->close(); } @[if recv_topics]@ @@ -182,152 +191,150 @@ std::condition_variable t_send_queue_cv; std::mutex t_send_queue_mutex; std::queue t_send_queue; -void t_send(void*) +void t_send(void *) { - char data_buffer[BUFFER_SIZE] = {}; - uint32_t length = 0; + char data_buffer[BUFFER_SIZE] = {}; + uint32_t length = 0; - while (running && !exit_sender_thread.load()) - { - std::unique_lock lk(t_send_queue_mutex); - while (t_send_queue.empty() && !exit_sender_thread.load()) - { - t_send_queue_cv.wait(lk); - } - uint8_t topic_ID = t_send_queue.front(); - t_send_queue.pop(); - lk.unlock(); + while (running && !exit_sender_thread.load()) { + std::unique_lock lk(t_send_queue_mutex); - size_t header_length = transport_node->get_header_length(); - /* make room for the header to fill in later */ - eprosima::fastcdr::FastBuffer cdrbuffer(&data_buffer[header_length], sizeof(data_buffer)-header_length); - eprosima::fastcdr::Cdr scdr(cdrbuffer); + while (t_send_queue.empty() && !exit_sender_thread.load()) { + t_send_queue_cv.wait(lk); + } - if (topics.getMsg(topic_ID, scdr)) - { - length = scdr.getSerializedDataLength(); - if (0 < (length = transport_node->write(topic_ID, data_buffer, length))) - { - total_sent += length; - ++sent; - } - } - } + uint8_t topic_ID = t_send_queue.front(); + t_send_queue.pop(); + lk.unlock(); + + size_t header_length = transport_node->get_header_length(); + /* make room for the header to fill in later */ + eprosima::fastcdr::FastBuffer cdrbuffer(&data_buffer[header_length], sizeof(data_buffer) - header_length); + eprosima::fastcdr::Cdr scdr(cdrbuffer); + + if (topics.getMsg(topic_ID, scdr)) { + length = scdr.getSerializedDataLength(); + + if (0 < (length = transport_node->write(topic_ID, data_buffer, length))) { + total_sent += length; + ++sent; + } + } + } } @[end if]@ -int main(int argc, char** argv) +int main(int argc, char **argv) { - if (-1 == parse_options(argc, argv)) - { - printf("\033[1;33m[ micrortps_agent ]\tEXITING...\033[0m\n"); - return -1; - } + if (-1 == parse_options(argc, argv)) { + printf("\033[1;33m[ micrortps_agent ]\tEXITING...\033[0m\n"); + return -1; + } - // register signal SIGINT and signal handler - signal(SIGINT, signal_handler); + // register signal SIGINT and signal handler + signal(SIGINT, signal_handler); - printf("\033[0;37m--- MicroRTPS Agent ---\033[0m\n"); - printf("[ micrortps_agent ]\tStarting link...\n"); + printf("\033[0;37m--- MicroRTPS Agent ---\033[0m\n"); + printf("[ micrortps_agent ]\tStarting link...\n"); - const char* localhost_only = std::getenv("ROS_LOCALHOST_ONLY"); - if (localhost_only && strcmp(localhost_only, "1") == 0) { - printf("[ micrortps_agent ]\tUsing only the localhost network for data sharing...\n"); - } + const char *localhost_only = std::getenv("ROS_LOCALHOST_ONLY"); - switch (_options.transport) - { - case options::eTransports::UART: - { - transport_node = new UART_node(_options.device, _options.baudrate, _options.poll_ms, - _options.sw_flow_control, _options.hw_flow_control, _options.verbose_debug); - printf("[ micrortps_agent ]\tUART transport: device: %s; baudrate: %d; sleep: %dus; poll: %dms; flow_control: %s\n", - _options.device, _options.baudrate, _options.sleep_us, _options.poll_ms, - _options.sw_flow_control ? "SW enabled" : (_options.hw_flow_control ? "HW enabled" : "No")); - } - break; - case options::eTransports::UDP: - { - transport_node = new UDP_node(_options.ip, _options.recv_port, _options.send_port, _options.verbose_debug); - printf("[ micrortps_agent ]\tUDP transport: ip address: %s; recv port: %u; send port: %u; sleep: %dus\n", - _options.ip, _options.recv_port, _options.send_port, _options.sleep_us); - } - break; - default: - printf("\033[0;37m[ micrortps_agent ]\tEXITING...\033[0m\n"); - return -1; - } + if (localhost_only && strcmp(localhost_only, "1") == 0) { + printf("[ micrortps_agent ]\tUsing only the localhost network for data sharing...\n"); + } - if (0 > transport_node->init()) - { - printf("\033[0;37m[ micrortps_agent ]\tEXITING...\033[0m\n"); - return -1; - } + switch (_options.transport) { + case options::eTransports::UART: { + transport_node = new UART_node(_options.device, _options.baudrate, _options.poll_ms, + _options.sw_flow_control, _options.hw_flow_control, _options.verbose_debug); + printf("[ micrortps_agent ]\tUART transport: device: %s; baudrate: %d; sleep: %dus; poll: %dms; flow_control: %s\n", + _options.device, _options.baudrate, _options.sleep_us, _options.poll_ms, + _options.sw_flow_control ? "SW enabled" : (_options.hw_flow_control ? "HW enabled" : "No")); + } + break; - sleep(1); + case options::eTransports::UDP: { + transport_node = new UDP_node(_options.ip, _options.recv_port, _options.send_port, _options.verbose_debug); + printf("[ micrortps_agent ]\tUDP transport: ip address: %s; recv port: %u; send port: %u; sleep: %dus\n", + _options.ip, _options.recv_port, _options.send_port, _options.sleep_us); + } + break; + + default: + printf("\033[0;37m[ micrortps_agent ]\tEXITING...\033[0m\n"); + return -1; + } + + if (0 > transport_node->init()) { + printf("\033[0;37m[ micrortps_agent ]\tEXITING...\033[0m\n"); + return -1; + } + + sleep(1); @[if send_topics]@ - char data_buffer[BUFFER_SIZE] = {}; - int received = 0, loop = 0; - int length = 0, total_read = 0; - bool receiving = false; - uint8_t topic_ID = 255; - std::chrono::time_point start, end; + char data_buffer[BUFFER_SIZE] = {}; + int received = 0, loop = 0; + int length = 0, total_read = 0; + bool receiving = false; + uint8_t topic_ID = 255; + std::chrono::time_point start, end; @[end if]@ - // Init timesync - std::shared_ptr timeSync = std::make_shared(_options.verbose_debug); + // Init timesync + std::shared_ptr timeSync = std::make_shared(_options.verbose_debug); - topics.set_timesync(timeSync); + topics.set_timesync(timeSync); @[if recv_topics]@ - topics.init(&t_send_queue_cv, &t_send_queue_mutex, &t_send_queue, _options.ns); + topics.init(&t_send_queue_cv, &t_send_queue_mutex, &t_send_queue, _options.ns); @[end if]@ - running = true; + running = true; @[if recv_topics]@ - std::thread sender_thread(t_send, nullptr); + std::thread sender_thread(t_send, nullptr); @[end if]@ - while (running) - { + while (running) { @[if send_topics]@ - ++loop; - if (!receiving) start = std::chrono::steady_clock::now(); - // Publish messages received from UART - while (0 < (length = transport_node->read(&topic_ID, data_buffer, BUFFER_SIZE))) - { - topics.publish(topic_ID, data_buffer, sizeof(data_buffer)); - ++received; - total_read += length; - receiving = true; - end = std::chrono::steady_clock::now(); - } + ++loop; + + if (!receiving) { start = std::chrono::steady_clock::now(); } + + // Publish messages received from UART + while (0 < (length = transport_node->read(&topic_ID, data_buffer, BUFFER_SIZE))) { + topics.publish(topic_ID, data_buffer, sizeof(data_buffer)); + ++received; + total_read += length; + receiving = true; + end = std::chrono::steady_clock::now(); + } + + if ((receiving && std::chrono::duration(std::chrono::steady_clock::now() - end).count() > WAIT_CNST) || + (!running && loop > 1)) { + std::chrono::duration elapsed_secs = end - start; + printf("[ micrortps_agent ]\tSENT: %lumessages \t- %lubytes\n", (unsigned long)sent, (unsigned long)total_sent); + printf("[ micrortps_agent ]\tRECEIVED: %dmessages \t- %dbytes; %d LOOPS - %.03f seconds - %.02fKB/s\n", + received, total_read, loop, elapsed_secs.count(), (double)total_read / (1000 * elapsed_secs.count())); + received = sent = total_read = total_sent = 0; + receiving = false; + } - if ((receiving && std::chrono::duration(std::chrono::steady_clock::now() - end).count() > WAIT_CNST) || - (!running && loop > 1)) - { - std::chrono::duration elapsed_secs = end - start; - printf("[ micrortps_agent ]\tSENT: %lumessages \t- %lubytes\n", (unsigned long)sent, (unsigned long)total_sent); - printf("[ micrortps_agent ]\tRECEIVED: %dmessages \t- %dbytes; %d LOOPS - %.03f seconds - %.02fKB/s\n", - received, total_read, loop, elapsed_secs.count(), (double)total_read/(1000*elapsed_secs.count())); - received = sent = total_read = total_sent = 0; - receiving = false; - } @[else]@ - usleep(_options.sleep_us); + usleep(_options.sleep_us); @[end if]@ - } + } + @[if recv_topics]@ - exit_sender_thread = true; - t_send_queue_cv.notify_one(); - sender_thread.join(); + exit_sender_thread = true; + t_send_queue_cv.notify_one(); + sender_thread.join(); @[end if]@ - delete transport_node; - transport_node = nullptr; + delete transport_node; + transport_node = nullptr; - timeSync->stop(); - timeSync->reset(); + timeSync->stop(); + timeSync->reset(); - return 0; + return 0; } diff --git a/msg/templates/urtps/microRTPS_agent_CMakeLists.txt.em b/msg/templates/urtps/microRTPS_agent_CMakeLists.txt.em index 2f642734f3..e76759380a 100644 --- a/msg/templates/urtps/microRTPS_agent_CMakeLists.txt.em +++ b/msg/templates/urtps/microRTPS_agent_CMakeLists.txt.em @@ -1,7 +1,7 @@ ################################################################################ # # Copyright 2017 Proyectos y Sistemas de Mantenimiento SL (eProsima). -# Copyright (c) 2018-2019 PX4 Development Team. All rights reserved. +# Copyright (c) 2018-2021 PX4 Development Team. All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: @@ -42,12 +42,12 @@ find_package(fastcdr REQUIRED) include(CheckCXXCompilerFlag) if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_COMPILER_IS_CLANG OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") - check_cxx_compiler_flag(--std=c++14 SUPPORTS_CXX14) - if(SUPPORTS_CXX14) - add_compile_options(--std=c++14) - else() - message(FATAL_ERROR "Compiler doesn't support C++14") - endif() + check_cxx_compiler_flag(--std=c++14 SUPPORTS_CXX14) + if(SUPPORTS_CXX14) + add_compile_options(--std=c++14) + else() + message(FATAL_ERROR "Compiler doesn't support C++14") + endif() endif() file(GLOB MICRORTPS_AGENT_SOURCES src/*.cpp src/*.h) diff --git a/msg/templates/urtps/microRTPS_timesync.cpp.em b/msg/templates/urtps/microRTPS_timesync.cpp.em index ba5d9bab3a..dc4ff60fb5 100644 --- a/msg/templates/urtps/microRTPS_timesync.cpp.em +++ b/msg/templates/urtps/microRTPS_timesync.cpp.em @@ -25,7 +25,7 @@ except AttributeError: }@ /**************************************************************************** * - * Copyright (c) 2020 PX4 Development Team. All rights reserved. + * Copyright (c) 2020-2021 PX4 Development Team. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: @@ -67,18 +67,19 @@ except AttributeError: #include "microRTPS_timesync.h" TimeSync::TimeSync(bool debug) - : _offset_ns(-1), - _skew_ns_per_sync(0.0), - _num_samples(0), - _request_reset_counter(0), - _last_msg_seq(0), - _last_remote_msg_seq(0), - _debug(debug) + : _offset_ns(-1), + _skew_ns_per_sync(0.0), + _num_samples(0), + _request_reset_counter(0), + _last_msg_seq(0), + _last_remote_msg_seq(0), + _debug(debug) { } TimeSync::~TimeSync() { stop(); } -void TimeSync::start(const TimesyncPublisher* pub) { +void TimeSync::start(const TimesyncPublisher *pub) +{ stop(); _timesync_pub = (*pub); @@ -96,28 +97,35 @@ void TimeSync::start(const TimesyncPublisher* pub) { _send_timesync_thread.reset(new std::thread(run)); } -void TimeSync::stop() { +void TimeSync::stop() +{ _request_stop = true; - if (_send_timesync_thread && _send_timesync_thread->joinable()) _send_timesync_thread->join(); + + if (_send_timesync_thread && _send_timesync_thread->joinable()) { _send_timesync_thread->join(); } + _send_timesync_thread.reset(); } -void TimeSync::reset() { +void TimeSync::reset() +{ _num_samples = 0; _request_reset_counter = 0; } -int64_t TimeSync::getTimeNSec() { +int64_t TimeSync::getTimeNSec() +{ auto time = std::chrono::steady_clock::now(); return std::chrono::time_point_cast(time).time_since_epoch().count(); } -int64_t TimeSync::getTimeUSec() { +int64_t TimeSync::getTimeUSec() +{ auto time = std::chrono::steady_clock::now(); return std::chrono::time_point_cast(time).time_since_epoch().count(); } -bool TimeSync::addMeasurement(int64_t local_t1_ns, int64_t remote_t2_ns, int64_t local_t3_ns) { +bool TimeSync::addMeasurement(int64_t local_t1_ns, int64_t remote_t2_ns, int64_t local_t3_ns) +{ int64_t rtti = local_t3_ns - local_t1_ns; // assume rtti is evenly split both directions @@ -127,19 +135,23 @@ bool TimeSync::addMeasurement(int64_t local_t1_ns, int64_t remote_t2_ns, int64_t if (_request_reset_counter > REQUEST_RESET_COUNTER_THRESHOLD) { reset(); - if (_debug) std::cout << "\033[1;33m[ micrortps__timesync ]\tTimesync clock changed, resetting\033[0m" << std::endl; + + if (_debug) { std::cout << "\033[1;33m[ micrortps__timesync ]\tTimesync clock changed, resetting\033[0m" << std::endl; } } - if (_num_samples == 0) { - updateOffset(measurement_offset); - _skew_ns_per_sync = 0; - } + if (_num_samples == 0) { + updateOffset(measurement_offset); + _skew_ns_per_sync = 0; + } if (_num_samples >= WINDOW_SIZE) { if (std::abs(measurement_offset - _offset_ns.load()) > TRIGGER_RESET_THRESHOLD_NS) { _request_reset_counter++; - if (_debug) std::cout << "\033[1;33m[ micrortps__timesync ]\tTimesync offset outlier, discarding\033[0m" << std::endl; + + if (_debug) { std::cout << "\033[1;33m[ micrortps__timesync ]\tTimesync offset outlier, discarding\033[0m" << std::endl; } + return false; + } else { _request_reset_counter = 0; } @@ -147,7 +159,8 @@ bool TimeSync::addMeasurement(int64_t local_t1_ns, int64_t remote_t2_ns, int64_t // ignore if rtti > 50ms if (rtti > 50ll * 1000ll * 1000ll) { - if (_debug) std::cout << "\033[1;33m[ micrortps__timesync ]\tRTTI too high for timesync: " << rtti / (1000ll * 1000ll) << "ms\033[0m" << std::endl; + if (_debug) { std::cout << "\033[1;33m[ micrortps__timesync ]\tRTTI too high for timesync: " << rtti / (1000ll * 1000ll) << "ms\033[0m" << std::endl; } + return false; } @@ -165,20 +178,21 @@ bool TimeSync::addMeasurement(int64_t local_t1_ns, int64_t remote_t2_ns, int64_t updateOffset(static_cast((_skew_ns_per_sync + _offset_ns.load()) * (1. - alpha) + measurement_offset * alpha)); _skew_ns_per_sync = - static_cast(beta * (_offset_ns.load() - offset_prev) + (1. - beta) * _skew_ns_per_sync); + static_cast(beta * (_offset_ns.load() - offset_prev) + (1. - beta) * _skew_ns_per_sync); _num_samples++; return true; } -void TimeSync::processTimesyncMsg(timesync_msg_t * msg) { +void TimeSync::processTimesyncMsg(timesync_msg_t *msg) +{ if (getMsgSysID(msg) == 1 && getMsgSeq(msg) != _last_remote_msg_seq) { - _last_remote_msg_seq = getMsgSeq(msg); + _last_remote_msg_seq = getMsgSeq(msg); if (getMsgTC1(msg) > 0) { if (!addMeasurement(getMsgTS1(msg), getMsgTC1(msg), getTimeNSec())) { - if (_debug) std::cerr << "\033[1;33m[ micrortps__timesync ]\tOffset not updated\033[0m" << std::endl; + if (_debug) { std::cerr << "\033[1;33m[ micrortps__timesync ]\tOffset not updated\033[0m" << std::endl; } } } else if (getMsgTC1(msg) == 0) { @@ -192,7 +206,8 @@ void TimeSync::processTimesyncMsg(timesync_msg_t * msg) { } } -timesync_msg_t TimeSync::newTimesyncMsg() { +timesync_msg_t TimeSync::newTimesyncMsg() +{ timesync_msg_t msg{}; setMsgTimestamp(&msg, getTimeUSec()); diff --git a/msg/templates/urtps/microRTPS_timesync.h.em b/msg/templates/urtps/microRTPS_timesync.h.em index 2a3e12bd8d..82847b96c6 100644 --- a/msg/templates/urtps/microRTPS_timesync.h.em +++ b/msg/templates/urtps/microRTPS_timesync.h.em @@ -26,7 +26,7 @@ except AttributeError: }@ /**************************************************************************** * - * Copyright (c) 2020 PX4 Development Team. All rights reserved. + * Copyright (c) 2020-2021 PX4 Development Team. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: @@ -107,7 +107,8 @@ using TimesyncPublisher = Timesync_Publisher; using TimesyncPublisher = timesync_Publisher; @[end if]@ -class TimeSync { +class TimeSync +{ public: TimeSync(bool debug); virtual ~TimeSync(); @@ -116,7 +117,7 @@ public: * @@brief Starts the timesync publishing thread * @@param[in] pub The timesync publisher entity to use */ - void start(const TimesyncPublisher* pub); + void start(const TimesyncPublisher *pub); /** * @@brief Resets the filter @@ -153,7 +154,7 @@ public: * @@brief Processes DDS timesync message * @@param[in,out] msg The timestamp msg to be processed */ - void processTimesyncMsg(timesync_msg_t* msg); + void processTimesyncMsg(timesync_msg_t *msg); /** * @@brief Creates a new timesync DDS message to be sent from the agent to the client @@ -171,13 +172,13 @@ public: * @@brief Sums the time sync offset to the timestamp * @@param[in,out] timestamp The timestamp to add the offset to */ - inline void addOffset(uint64_t& timestamp) { timestamp = (timestamp * 1000LL + _offset_ns.load()) / 1000ULL; } + inline void addOffset(uint64_t ×tamp) { timestamp = (timestamp * 1000LL + _offset_ns.load()) / 1000ULL; } /** * @@brief Substracts the time sync offset to the timestamp * @@param[in,out] timestamp The timestamp to subtract the offset of */ - inline void subtractOffset(uint64_t& timestamp) { timestamp = (timestamp * 1000LL - _offset_ns.load()) / 1000ULL; } + inline void subtractOffset(uint64_t ×tamp) { timestamp = (timestamp * 1000LL - _offset_ns.load()) / 1000ULL; } private: std::atomic _offset_ns; @@ -205,35 +206,35 @@ private: * @@brief Updates the offset of the time sync filter * @@param[in] offset The value of the offset to update to */ - inline void updateOffset(const uint64_t& offset) { _offset_ns.store(offset, std::memory_order_relaxed); } + inline void updateOffset(const uint64_t &offset) { _offset_ns.store(offset, std::memory_order_relaxed); } /** Timesync msg Getters **/ @[if version.parse(fastrtps_version) <= version.parse('1.7.2') or not ros2_distro]@ - inline uint64_t getMsgTimestamp(const timesync_msg_t* msg) { return msg->timestamp_(); } - inline uint8_t getMsgSysID(const timesync_msg_t* msg) { return msg->sys_id_(); } - inline uint8_t getMsgSeq(const timesync_msg_t* msg) { return msg->seq_(); } - inline int64_t getMsgTC1(const timesync_msg_t* msg) { return msg->tc1_(); } - inline int64_t getMsgTS1(const timesync_msg_t* msg) { return msg->ts1_(); } + inline uint64_t getMsgTimestamp(const timesync_msg_t *msg) { return msg->timestamp_(); } + inline uint8_t getMsgSysID(const timesync_msg_t *msg) { return msg->sys_id_(); } + inline uint8_t getMsgSeq(const timesync_msg_t *msg) { return msg->seq_(); } + inline int64_t getMsgTC1(const timesync_msg_t *msg) { return msg->tc1_(); } + inline int64_t getMsgTS1(const timesync_msg_t *msg) { return msg->ts1_(); } @[elif ros2_distro]@ - inline uint64_t getMsgTimestamp(const timesync_msg_t* msg) { return msg->timestamp(); } - inline uint8_t getMsgSysID(const timesync_msg_t* msg) { return msg->sys_id(); } - inline uint8_t getMsgSeq(const timesync_msg_t* msg) { return msg->seq(); } - inline int64_t getMsgTC1(const timesync_msg_t* msg) { return msg->tc1(); } - inline int64_t getMsgTS1(const timesync_msg_t* msg) { return msg->ts1(); } -@[end if]@ + inline uint64_t getMsgTimestamp(const timesync_msg_t *msg) { return msg->timestamp(); } + inline uint8_t getMsgSysID(const timesync_msg_t *msg) { return msg->sys_id(); } + inline uint8_t getMsgSeq(const timesync_msg_t *msg) { return msg->seq(); } + inline int64_t getMsgTC1(const timesync_msg_t *msg) { return msg->tc1(); } + inline int64_t getMsgTS1(const timesync_msg_t *msg) { return msg->ts1(); } + @[end if]@ /** Timesync msg Setters **/ @[if version.parse(fastrtps_version) <= version.parse('1.7.2') or not ros2_distro]@ - inline void setMsgTimestamp(timesync_msg_t* msg, const uint64_t& timestamp) { msg->timestamp_() = timestamp; } - inline void setMsgSysID(timesync_msg_t* msg, const uint8_t& sys_id) { msg->sys_id_() = sys_id; } - inline void setMsgSeq(timesync_msg_t* msg, const uint8_t& seq) { msg->seq_() = seq; } - inline void setMsgTC1(timesync_msg_t* msg, const int64_t& tc1) { msg->tc1_() = tc1; } - inline void setMsgTS1(timesync_msg_t* msg, const int64_t& ts1) { msg->ts1_() = ts1; } + inline void setMsgTimestamp(timesync_msg_t *msg, const uint64_t ×tamp) { msg->timestamp_() = timestamp; } + inline void setMsgSysID(timesync_msg_t *msg, const uint8_t &sys_id) { msg->sys_id_() = sys_id; } + inline void setMsgSeq(timesync_msg_t *msg, const uint8_t &seq) { msg->seq_() = seq; } + inline void setMsgTC1(timesync_msg_t *msg, const int64_t &tc1) { msg->tc1_() = tc1; } + inline void setMsgTS1(timesync_msg_t *msg, const int64_t &ts1) { msg->ts1_() = ts1; } @[elif ros2_distro]@ - inline void setMsgTimestamp(timesync_msg_t* msg, const uint64_t& timestamp) { msg->timestamp() = timestamp; } - inline void setMsgSysID(timesync_msg_t* msg, const uint8_t& sys_id) { msg->sys_id() = sys_id; } - inline void setMsgSeq(timesync_msg_t* msg, const uint8_t& seq) { msg->seq() = seq; } - inline void setMsgTC1(timesync_msg_t* msg, const int64_t& tc1) { msg->tc1() = tc1; } - inline void setMsgTS1(timesync_msg_t* msg, const int64_t& ts1) { msg->ts1() = ts1; } + inline void setMsgTimestamp(timesync_msg_t *msg, const uint64_t ×tamp) { msg->timestamp() = timestamp; } + inline void setMsgSysID(timesync_msg_t *msg, const uint8_t &sys_id) { msg->sys_id() = sys_id; } + inline void setMsgSeq(timesync_msg_t *msg, const uint8_t &seq) { msg->seq() = seq; } + inline void setMsgTC1(timesync_msg_t *msg, const int64_t &tc1) { msg->tc1() = tc1; } + inline void setMsgTS1(timesync_msg_t *msg, const int64_t &ts1) { msg->ts1() = ts1; } @[end if]@ }; diff --git a/msg/templates/urtps/microRTPS_transport.cpp b/msg/templates/urtps/microRTPS_transport.cpp index c5a919a75a..7d7f7c9b88 100644 --- a/msg/templates/urtps/microRTPS_transport.cpp +++ b/msg/templates/urtps/microRTPS_transport.cpp @@ -1,7 +1,7 @@ /**************************************************************************** * * Copyright 2017 Proyectos y Sistemas de Mantenimiento SL (eProsima). - * Copyright (c) 2018-2019 PX4 Development Team. All rights reserved. + * Copyright (c) 2018-2021 PX4 Development Team. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: @@ -80,8 +80,8 @@ uint16_t const crc16_table[256] = { }; Transport_node::Transport_node(const bool _debug): - rx_buff_pos(0), - debug(_debug) + rx_buff_pos(0), + debug(_debug) { } @@ -120,9 +120,13 @@ ssize_t Transport_node::read(uint8_t *topic_ID, char out_buffer[], size_t buffer if (errsv && EAGAIN != errsv && ETIMEDOUT != errsv) { #ifndef PX4_DEBUG - if (debug) printf("\033[0;31m[ micrortps_transport ]\tRead fail %d\033[0m\n", errsv); + + if (debug) { printf("\033[0;31m[ micrortps_transport ]\tRead fail %d\033[0m\n", errsv); } + #else - if (debug) PX4_DEBUG("Read fail %d", errsv); + + if (debug) { PX4_DEBUG("Read fail %d", errsv); } + #endif /* PX4_DEBUG */ } @@ -150,9 +154,13 @@ ssize_t Transport_node::read(uint8_t *topic_ID, char out_buffer[], size_t buffer // Start not found if (msg_start_pos > (rx_buff_pos - header_size)) { #ifndef PX4_DEBUG - if (debug) printf("\033[1;33m[ micrortps_transport ]\t (↓↓ %" PRIu32 ")\033[0m\n", msg_start_pos); + + if (debug) { printf("\033[1;33m[ micrortps_transport ]\t (↓↓ %" PRIu32 ")\033[0m\n", msg_start_pos); } + #else - if (debug) PX4_DEBUG(" (↓↓ %" PRIu32 ")", msg_start_pos); + + if (debug) { PX4_DEBUG(" (↓↓ %" PRIu32 ")", msg_start_pos); } + #endif /* PX4_DEBUG */ // All we've checked so far is garbage, drop it - but save unchecked bytes @@ -178,9 +186,13 @@ ssize_t Transport_node::read(uint8_t *topic_ID, char out_buffer[], size_t buffer // If there's garbage at the beginning, drop it if (msg_start_pos > 0) { #ifndef PX4_DEBUG - if (debug) printf("\033[1;33m[ micrortps_transport ]\t (↓ %" PRIu32 ")\033[0m\n", msg_start_pos); + + if (debug) { printf("\033[1;33m[ micrortps_transport ]\t (↓ %" PRIu32 ")\033[0m\n", msg_start_pos); } + #else - if (debug) PX4_DEBUG(" (↓ %" PRIu32 ")", msg_start_pos); + + if (debug) { PX4_DEBUG(" (↓ %" PRIu32 ")", msg_start_pos); } + #endif /* PX4_DEBUG */ memmove(rx_buffer, rx_buffer + msg_start_pos, rx_buff_pos - msg_start_pos); rx_buff_pos -= msg_start_pos; @@ -194,9 +206,13 @@ ssize_t Transport_node::read(uint8_t *topic_ID, char out_buffer[], size_t buffer if (read_crc != calc_crc) { #ifndef PX4_DEBUG - if (debug) printf("\033[0;31m[ micrortps_transport ]\tBad CRC %" PRIu16 " != %" PRIu16 "\t\t(↓ %lu)\033[0m\n", read_crc, calc_crc, (unsigned long)(header_size + payload_len)); + + if (debug) { printf("\033[0;31m[ micrortps_transport ]\tBad CRC %" PRIu16 " != %" PRIu16 "\t\t(↓ %lu)\033[0m\n", read_crc, calc_crc, (unsigned long)(header_size + payload_len)); } + #else - if (debug) PX4_DEBUG("Bad CRC %u != %u\t\t(↓ %lu)", read_crc, calc_crc, (unsigned long)(header_size + payload_len)); + + if (debug) { PX4_DEBUG("Bad CRC %u != %u\t\t(↓ %lu)", read_crc, calc_crc, (unsigned long)(header_size + payload_len)); } + #endif /* PX4_DEBUG */ // Drop garbage up just beyond the start of the message @@ -223,7 +239,7 @@ ssize_t Transport_node::read(uint8_t *topic_ID, char out_buffer[], size_t buffer size_t Transport_node::get_header_length() { - return sizeof(struct Header); + return sizeof(struct Header); } ssize_t Transport_node::write(const uint8_t topic_ID, char buffer[], size_t length) @@ -248,15 +264,17 @@ ssize_t Transport_node::write(const uint8_t topic_ID, char buffer[], size_t leng /* Fill in the header in the same payload buffer to call a single node_write */ memcpy(buffer, &header, sizeof(header)); ssize_t len = node_write(buffer, length + sizeof(header)); + if (len != ssize_t(length + sizeof(header))) { return len; } + return len + sizeof(header); } UART_node::UART_node(const char *_uart_name, const uint32_t _baudrate, - const uint32_t _poll_ms, const bool _hw_flow_control, - const bool _sw_flow_control, const bool _debug): + const uint32_t _poll_ms, const bool _hw_flow_control, + const bool _sw_flow_control, const bool _debug): Transport_node(_debug), uart_fd(-1), baudrate(_baudrate), @@ -302,7 +320,8 @@ int UART_node::init() if ((termios_state = tcgetattr(uart_fd, &uart_config)) < 0) { int errno_bkp = errno; #ifndef PX4_ERR - printf("\033[0;31m[ micrortps_transport ]\tUART transport: ERR GET CONF %s: %d (%d)\n\033[0m", uart_name, termios_state, errno); + printf("\033[0;31m[ micrortps_transport ]\tUART transport: ERR GET CONF %s: %d (%d)\n\033[0m", uart_name, termios_state, + errno); #else PX4_ERR("UART transport: ERR GET CONF %s: %d (%d)", uart_name, termios_state, errno); #endif /* PX4_ERR */ @@ -326,6 +345,7 @@ int UART_node::init() if (hw_flow_control) { // HW flow control uart_config.c_lflag |= CRTSCTS; + } else if (sw_flow_control) { // SW flow control uart_config.c_lflag |= (IXON | IXOFF | IXANY); @@ -337,7 +357,7 @@ int UART_node::init() if (!baudrate_to_speed(baudrate, &speed)) { #ifndef PX4_ERR printf("\033[0;31m[ micrortps_transport ]\tUART transport: ERR SET BAUD %s: Unsupported baudrate: %d\n\tsupported examples:\n\t9600, 19200, 38400, 57600, 115200, 230400, 460800, 500000, 921600, 1000000\033[0m\n", - uart_name, baudrate); + uart_name, baudrate); #else PX4_ERR("UART transport: ERR SET BAUD %s: Unsupported baudrate: %" PRIu32 "\n\tsupported examples:\n\t9600, 19200, 38400, 57600, 115200, 230400, 460800, 500000, 921600, 1000000\n", uart_name, baudrate); @@ -349,7 +369,8 @@ int UART_node::init() if (cfsetispeed(&uart_config, speed) < 0 || cfsetospeed(&uart_config, speed) < 0) { int errno_bkp = errno; #ifndef PX4_ERR - printf("\033[0;31m[ micrortps_transport ]\tUART transport: ERR SET BAUD %s: %d (%d)\033[0m\n", uart_name, termios_state, errno); + printf("\033[0;31m[ micrortps_transport ]\tUART transport: ERR SET BAUD %s: %d (%d)\033[0m\n", uart_name, termios_state, + errno); #else PX4_ERR("ERR SET BAUD %s: %d (%d)", uart_name, termios_state, errno); #endif /* PX4_ERR */ @@ -373,10 +394,10 @@ int UART_node::init() while (0 < ::read(uart_fd, (void *)&aux, 64)) { flush = true; -/** - * According to px4_time.h, px4_usleep() is only defined when lockstep is set - * to be used - */ + /** + * According to px4_time.h, px4_usleep() is only defined when lockstep is set + * to be used + */ #ifndef px4_usleep usleep(1000); #else @@ -386,15 +407,24 @@ int UART_node::init() if (flush) { #ifndef PX4_DEBUG - if (debug) printf("[ micrortps_transport ]\tUART transport: Flush\n"); + + if (debug) { printf("[ micrortps_transport ]\tUART transport: Flush\n"); } + #else - if (debug) PX4_DEBUG("UART transport: Flush"); + + if (debug) { PX4_DEBUG("UART transport: Flush"); } + #endif /* PX4_DEBUG */ + } else { #ifndef PX4_DEBUG - if (debug) printf("[ micrortps_transport ]\tUART transport: No flush\n"); + + if (debug) { printf("[ micrortps_transport ]\tUART transport: No flush\n"); } + #else - if (debug) PX4_DEBUG("UART transport: No flush"); + + if (debug) { PX4_DEBUG("UART transport: No flush"); } + #endif /* PX4_INFO */ } @@ -549,17 +579,17 @@ bool UART_node::baudrate_to_speed(uint32_t bauds, speed_t *speed) return true; } -UDP_node::UDP_node(const char* _udp_ip, uint16_t _udp_port_recv, - uint16_t _udp_port_send, const bool _debug): +UDP_node::UDP_node(const char *_udp_ip, uint16_t _udp_port_recv, + uint16_t _udp_port_send, const bool _debug): Transport_node(_debug), sender_fd(-1), receiver_fd(-1), udp_port_recv(_udp_port_recv), udp_port_send(_udp_port_send) { - if (nullptr != _udp_ip) { - strcpy(udp_ip, _udp_ip); - } + if (nullptr != _udp_ip) { + strcpy(udp_ip, _udp_ip); + } } UDP_node::~UDP_node() @@ -598,6 +628,7 @@ int UDP_node::init_receiver(uint16_t udp_port) #endif /* PX4_ERR */ return -1; } + #ifndef PX4_INFO printf("[ micrortps_transport ]\tUDP transport: Trying to connect..."); #else @@ -612,6 +643,7 @@ int UDP_node::init_receiver(uint16_t udp_port) #endif /* PX4_ERR */ return -1; } + #ifndef PX4_INFO printf("[ micrortps_transport ]\tUDP transport: Connected to server!\n\n"); #else diff --git a/msg/templates/urtps/microRTPS_transport.h b/msg/templates/urtps/microRTPS_transport.h index 784221050d..ac6351a7c3 100644 --- a/msg/templates/urtps/microRTPS_transport.h +++ b/msg/templates/urtps/microRTPS_transport.h @@ -1,7 +1,7 @@ /**************************************************************************** * * Copyright 2017 Proyectos y Sistemas de Mantenimiento SL (eProsima). - * Copyright (c) 2018-2019 PX4 Development Team. All rights reserved. + * Copyright (c) 2018-2021 PX4 Development Team. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: @@ -97,8 +97,8 @@ class UART_node: public Transport_node { public: UART_node(const char *_uart_name, const uint32_t _baudrate, - const uint32_t _poll_ms, const bool _hw_flow_control, - const bool _sw_flow_control, const bool _debug); + const uint32_t _poll_ms, const bool _hw_flow_control, + const bool _sw_flow_control, const bool _debug); virtual ~UART_node(); int init(); @@ -122,8 +122,8 @@ protected: class UDP_node: public Transport_node { public: - UDP_node(const char* _udp_ip, uint16_t udp_port_recv, uint16_t udp_port_send, - const bool _debug); + UDP_node(const char *_udp_ip, uint16_t udp_port_recv, uint16_t udp_port_send, + const bool _debug); virtual ~UDP_node(); int init();