From 62a3e07423549a1c4210c790c58f5c155295d470 Mon Sep 17 00:00:00 2001 From: Mark Charlebois Date: Tue, 21 Jun 2016 10:39:30 -0700 Subject: [PATCH] orb_exists support for muorb Also added builtin command wait_for_topic Signed-off-by: Mark Charlebois --- src/firmware/qurt/px4muorb.idl | 15 +- src/modules/muorb/adsp/px4muorb.cpp | 29 ++ src/modules/muorb/adsp/px4muorb.hpp | 4 + src/modules/muorb/adsp/uORBFastRpcChannel.cpp | 257 ++++++++++++++---- src/modules/muorb/adsp/uORBFastRpcChannel.hpp | 35 ++- .../muorb/krait/px4muorb_KraitRpcWrapper.cpp | 10 + .../muorb/krait/px4muorb_KraitRpcWrapper.hpp | 2 + .../muorb/krait/uORBKraitFastRpcChannel.cpp | 32 ++- .../muorb/krait/uORBKraitFastRpcChannel.hpp | 28 ++ src/modules/sdlog2/sdlog2.c | 9 - src/modules/uORB/uORBCommunicator.hpp | 42 +++ src/modules/uORB/uORBDevices.cpp | 20 ++ src/modules/uORB/uORBDevices.hpp | 3 + src/modules/uORB/uORBManager.cpp | 28 +- src/modules/uORB/uORBManager.hpp | 15 + 15 files changed, 462 insertions(+), 67 deletions(-) diff --git a/src/firmware/qurt/px4muorb.idl b/src/firmware/qurt/px4muorb.idl index e87492fb3e..c8872c09e3 100644 --- a/src/firmware/qurt/px4muorb.idl +++ b/src/firmware/qurt/px4muorb.idl @@ -72,6 +72,18 @@ interface px4muorb{ */ AEEResult param_update_value_from_shmem( in unsigned long param, rout sequence value); + /** + * Interface called from krait to inform of a published topic. + * @param topic_name: name of the topic being advertised + */ + AEEResult topic_advertised(in string topic_name); + + /** + * Interface called from krait to inform of a published topic. + * @param topic_name: name of the topic being advertised + */ + AEEResult topic_unadvertised(in string topic_name); + /** * Interface to add a subscriber to the identified topic * @param topic_name @@ -125,6 +137,7 @@ interface px4muorb{ * 1 ==> add_subscriber * 2 ==> remove_subscriber * 3 ==> recieve_topic_data + * 4 ==> advertised_topic * @note: for message types, 1 & 2, the data pointer returned is null with length of 0. * @param topic_name * The topic being returned. @@ -141,7 +154,7 @@ interface px4muorb{ /** * Since the receive_msg is a blocking call, the client will not be able to peform a clean shutdown. Calling this * function will unblock the receive msg an return an error code. - * Ideally this should be implemented as a time for the receive msg call. + * Ideally this should be implemented as a timeout for the #receive_msg call. * @param none * @return status * 0 = success diff --git a/src/modules/muorb/adsp/px4muorb.cpp b/src/modules/muorb/adsp/px4muorb.cpp index 4e25989b17..2a0b943081 100644 --- a/src/modules/muorb/adsp/px4muorb.cpp +++ b/src/modules/muorb/adsp/px4muorb.cpp @@ -158,6 +158,35 @@ int px4muorb_param_update_value_from_shmem(uint32_t param, const uint8_t *value, release_shmem_lock(__FILE__, __LINE__); return 0; + +int px4muorb_topic_advertised(const char *topic_name) +{ + int rc = 0; + PX4_INFO("TEST px4muorb_topic_advertised of [%s] on remote side...", topic_name); + uORB::FastRpcChannel *channel = uORB::FastRpcChannel::GetInstance(); + uORBCommunicator::IChannelRxHandler *rxHandler = channel->GetRxHandler(); + + if (rxHandler != nullptr) { + rc = rxHandler->process_remote_topic(topic_name, 1); + } else { + rc = -1; + } + return rc; +} + +int px4muorb_topic_unadvertised(const char *topic_name) +{ + int rc = 0; + PX4_INFO("TEST px4muorb_topic_unadvertised of [%s] on remote side...", topic_name); + uORB::FastRpcChannel *channel = uORB::FastRpcChannel::GetInstance(); + uORBCommunicator::IChannelRxHandler *rxHandler = channel->GetRxHandler(); + + if (rxHandler != nullptr) { + rc = rxHandler->process_remote_topic(topic_name, 0); + } else { + rc = -1; + } + return rc; } int px4muorb_add_subscriber(const char *name) diff --git a/src/modules/muorb/adsp/px4muorb.hpp b/src/modules/muorb/adsp/px4muorb.hpp index 36d5f98ca5..e7c1ca7699 100644 --- a/src/modules/muorb/adsp/px4muorb.hpp +++ b/src/modules/muorb/adsp/px4muorb.hpp @@ -49,6 +49,10 @@ extern "C" { int px4muorb_param_update_value_from_shmem(uint32_t param, const uint8_t *value, int data_len_in_bytes) __EXPORT; + int px4muorb_topic_advertised(const char *name) __EXPORT; + + int px4muorb_topic_unadvertised(const char *name) __EXPORT; + int px4muorb_add_subscriber(const char *name) __EXPORT; int px4muorb_remove_subscriber(const char *name) __EXPORT; diff --git a/src/modules/muorb/adsp/uORBFastRpcChannel.cpp b/src/modules/muorb/adsp/uORBFastRpcChannel.cpp index 08266a3f07..f730316a19 100644 --- a/src/modules/muorb/adsp/uORBFastRpcChannel.cpp +++ b/src/modules/muorb/adsp/uORBFastRpcChannel.cpp @@ -68,6 +68,85 @@ uORB::FastRpcChannel::FastRpcChannel() _RemoteSubscribers.clear(); } +//============================================================================== +//============================================================================== +int16_t uORB::FastRpcChannel::topic_advertised(const char *messageName) +{ + int16_t rc = 0; + hrt_abstime t1, t2; + static hrt_abstime check_time = 0; + + PX4_DEBUG("=========publish topic[%s] to remote...", messageName); + + t1 = hrt_absolute_time(); + _QueueMutex.lock(); + bool overwriteData = false; + + if (IsControlQFull()) { + // queue is full. Overwrite the oldest data. + //PX4_WARN("[topic_advertised] Queue Full Overwrite the oldest data. in[%ld] out[%ld] max[%ld]", + // _ControlQInIndex, _ControlQOutIndex, _MAX_MSG_QUEUE_SIZE); + _ControlQOutIndex++; + + if (_ControlQOutIndex == _MAX_MSG_QUEUE_SIZE) { + _ControlQOutIndex = 0; + } + + overwriteData = true; + _dropped_pkts++; + } + + _ControlMsgQueue[ _ControlQInIndex ]._Type = _CONTROL_MSG_TYPE_ADVERTISE; + _ControlMsgQueue[ _ControlQInIndex ]._MsgName = messageName; + + _ControlQInIndex++; + + if (_ControlQInIndex == _MAX_MSG_QUEUE_SIZE) { + _ControlQInIndex = 0; + } + + // the assumption here is that each caller reads only one data from either control or data queue. + //if (!overwriteData) { + if (ControlQSize() == 1 && DataQSize() == 0) { // post it only of the queue moves from empty to available. + _DataAvailableSemaphore.post(); + } + + if ((unsigned long)ControlQSize() < _min_q) { _min_q = (unsigned long)ControlQSize(); } + + if ((unsigned long)ControlQSize() > _max_q) { _max_q = (unsigned long)ControlQSize(); } + + _count++; + _avg_q = ((double)((_avg_q * (_count - 1)) + (unsigned long)(ControlQSize()))) / (double)(_count); + + _QueueMutex.unlock(); + t2 = hrt_absolute_time(); + + if ((unsigned long)(t2 - check_time) > 10000000) { + //PX4_DEBUG("MsgName: %20s, t1: %lu, t2: %lu, dt: %lu",messageName, (unsigned long) t1, (unsigned long) t2, (unsigned long) (t2-t1)); + //PX4_DEBUG("Q. Stats: min: %lu, max : %lu, avg: %lu count: %lu ", _min_q, _max_q, (unsigned long)(_avg_q * 1000.0), _count); + _min_q = _MAX_MSG_QUEUE_SIZE * 2; + _max_q = 0; + _avg_q = 0; + _count = 0; + check_time = t2; + } + + return rc; +} + +//============================================================================== +//============================================================================== +/* +//TODO: verify if needed +int16_t uORB::FastRpcChannel::topic_unadvertised(const char *messageName) +{ + int16_t rc = 0; + + PX4_DEBUG("=========unpublish topic[%s] to remote...", messageName); + return rc; +} +*/ + //============================================================================== //============================================================================== int16_t uORB::FastRpcChannel::add_subscription(const char *messageName, int32_t msgRateInHz) @@ -176,7 +255,7 @@ int16_t uORB::FastRpcChannel::send_message(const char *messageName, int32_t leng // the assumption here is that each caller reads only one data from either control or data queue. //if (!overwriteData) { - if (DataQSize() == 1) { // post it only of the queue moves from empty to available. + if (DataQSize() == 1 && ControlQSize() == 0) { // post it only of the queue moves from empty to available. _DataAvailableSemaphore.post(); } @@ -403,54 +482,103 @@ int16_t uORB::FastRpcChannel::get_bulk_data *topic_count = 0; int32_t topic_count_to_return = 0; - if (DataQSize() != 0) { - //PX4_DEBUG( "get_bulk_data: QSize: %d", DataQSize() ); - topic_count_to_return = DataQSize(); + if (DataQSize() != 0 || ControlQSize() != 0) { + if (DataQSize() != 0) { + //PX4_DEBUG( "get_bulk_data: QSize: %d", DataQSize() ); + topic_count_to_return = DataQSize(); - while (DataQSize() != 0) { - // this is a hack as we are using a counting semaphore. Should be re-implemented with cond_variable and wait. - //_DataAvailableSemaphore.wait(); - if (get_data_msg_size_at(_DataQOutIndex) < (max_buffer_in_bytes - bytes_copied)) { - // there is enough space in the buffer, copy the data. - //PX4_DEBUG( "Coping Data to buffer..." ); - copy_result = copy_data_to_buffer(_DataQOutIndex, buffer, bytes_copied, max_buffer_in_bytes); + while (DataQSize() != 0) { + // this is a hack as we are using a counting semaphore. Should be re-implemented with cond_variable and wait. + //_DataAvailableSemaphore.wait(); + if (get_msg_size_at(true, _DataQOutIndex) < (max_buffer_in_bytes - bytes_copied)) { + // there is enough space in the buffer, copy the data. + //PX4_DEBUG( "Coping Data to buffer..." ); + copy_result = copy_msg_to_buffer(true, _DataQOutIndex, buffer, bytes_copied, max_buffer_in_bytes); - if (copy_result == -1) { + if (copy_result == -1) { + if (bytes_copied == 0) { + rc = -1; + } + + break; + + } else { + //PX4_DEBUG( "[%d] %02x %02x %02x %02x", *topic_count,\ + // buffer[bytes_copied], \ + // buffer[bytes_copied+1], \ + // buffer[bytes_copied+2], \ + // buffer[bytes_copied+3] ); + bytes_copied += copy_result; + (*topic_count)++; + *returned_bytes = bytes_copied; + _DataQOutIndex++; + + if (_DataQOutIndex == _MAX_MSG_QUEUE_SIZE) { + _DataQOutIndex = 0; + } + } + + } else { if (bytes_copied == 0) { rc = -1; + PX4_WARN("ERROR: Insufficent space in data buffer, no topics returned"); + + } else { + PX4_DEBUG("Exiting out of the while loop..."); } break; - - } else { - //PX4_DEBUG( "[%d] %02x %02x %02x %02x", *topic_count,\ - // buffer[bytes_copied], \ - // buffer[bytes_copied+1], \ - // buffer[bytes_copied+2], \ - // buffer[bytes_copied+3] ); - bytes_copied += copy_result; - (*topic_count)++; - *returned_bytes = bytes_copied; - _DataQOutIndex++; - - if (_DataQOutIndex == _MAX_MSG_QUEUE_SIZE) { - _DataQOutIndex = 0; - } } - - } else { - if (bytes_copied == 0) { - rc = -1; - PX4_WARN("ERROR: Insufficent space in data buffer, no topics returned"); - - } else { - PX4_DEBUG("Exiting out of the while loop..."); - } - - break; } } + if (ControlQSize() != 0) { + //PX4_DEBUG( "get_bulk_data: QSize: %d", ControlQSize() ); + topic_count_to_return += ControlQSize(); + while (ControlQSize() != 0) { + // this is a hack as we are using a counting semaphore. Should be re-implemented with cond_variable and wait. + //_DataAvailableSemaphore.wait(); + if (get_msg_size_at(false, _ControlQOutIndex) < (max_buffer_in_bytes - bytes_copied)) { + // there is enough space in the buffer, copy the data. + //PX4_DEBUG( "Coping Control msg to buffer..." ); + copy_result = copy_msg_to_buffer(false, _ControlQOutIndex, buffer, bytes_copied, max_buffer_in_bytes); + + if (copy_result == -1) { + if (bytes_copied == 0) { + rc = -1; + } + + break; + + } else { + //PX4_DEBUG( "[%d] %02x %02x %02x %02x", *topic_count,\ + // buffer[bytes_copied], \ + // buffer[bytes_copied+1], \ + // buffer[bytes_copied+2], \ + // buffer[bytes_copied+3] ); + bytes_copied += copy_result; + (*topic_count)++; + *returned_bytes = bytes_copied; + _ControlQOutIndex++; + + if (_ControlQOutIndex == _MAX_MSG_QUEUE_SIZE) { + _ControlQOutIndex = 0; + } + } + + } else { + if (bytes_copied == 0) { + rc = -1; + PX4_WARN("ERROR: Insufficent space in data buffer, no topics returned"); + + } else { + PX4_DEBUG("Exiting out of the while loop..."); + } + + break; + } + } + } } else { PX4_ERR("[get_data_bulk] Error: Semaphore is up when there is no data on the control/data queues"); rc = -1; @@ -488,20 +616,22 @@ int16_t uORB::FastRpcChannel::get_bulk_data return rc; } - -int32_t uORB::FastRpcChannel::get_data_msg_size_at(int32_t index) +int32_t uORB::FastRpcChannel::get_msg_size_at(bool isData, int32_t index) { // the assumption here is that this is called within the context of semaphore, // hence lock/unlock is not needed. int32_t rc = 0; - rc += _DataMsgQueue[ index ]._Length; - rc += _DataMsgQueue[ index ]._MsgName.size() + 1; + if (isData) { + rc += _DataMsgQueue[ index ]._Length; + rc += _DataMsgQueue[ index ]._MsgName.size() + 1; + } else { + rc += _ControlMsgQueue[ index ]._MsgName.size() + 1; + } rc += _PACKET_HEADER_SIZE; return rc; } - -int32_t uORB::FastRpcChannel::copy_data_to_buffer(int32_t src_index, uint8_t *dst_buffer, int32_t offset, +int32_t uORB::FastRpcChannel::copy_msg_to_buffer(bool isData, int32_t src_index, uint8_t *dst_buffer, int32_t offset, int32_t dst_buffer_len) { int32_t rc = -1; @@ -510,12 +640,19 @@ int32_t uORB::FastRpcChannel::copy_data_to_buffer(int32_t src_index, uint8_t *ds // * sem_lock is acquired for data protection // * the dst_buffer is validated to + uint16_t msg_size = (isData ? + (uint16_t)(_DataMsgQueue[ src_index ]._MsgName.size()) : + (uint16_t)(_ControlMsgQueue[ src_index ]._MsgName.size())); + // compute the different offsets to pack the packets. int32_t field_header_offset = offset; int32_t field_topic_name_offset = field_header_offset + sizeof(struct BulkTransferHeader); - int32_t field_data_offset = field_topic_name_offset + _DataMsgQueue[ src_index ]._MsgName.size() + 1; + int32_t field_data_offset = field_topic_name_offset + msg_size + 1; - struct BulkTransferHeader header = { (uint16_t)(_DataMsgQueue[ src_index ]._MsgName.size() + 1), (uint16_t)(_DataMsgQueue[ src_index ]._Length) }; + int16_t msg_type = isData ? _DATA_MSG_TYPE : _ControlMsgQueue[ src_index ]._Type; + + struct BulkTransferHeader header = { (uint16_t)msg_type,( uint16_t)(msg_size + 1), + (uint16_t)(isData ? (_DataMsgQueue[ src_index ]._Length) : 0) }; //PX4_DEBUG( "Offsets: header[%d] name[%d] data[%d]", @@ -523,7 +660,7 @@ int32_t uORB::FastRpcChannel::copy_data_to_buffer(int32_t src_index, uint8_t *ds // field_topic_name_offset, // field_data_offset ); - if ((field_data_offset + _DataMsgQueue[ src_index ]._Length) < dst_buffer_len) { + if (isData && (field_data_offset + _DataMsgQueue[ src_index ]._Length) < dst_buffer_len) { memmove(&(dst_buffer[field_header_offset]), (char *)(&header), sizeof(header)); // pack the data here. memmove @@ -541,10 +678,32 @@ int32_t uORB::FastRpcChannel::copy_data_to_buffer(int32_t src_index, uint8_t *ds memmove(&(dst_buffer[field_data_offset]), _DataMsgQueue[ src_index ]._Buffer, _DataMsgQueue[ src_index ]._Length); rc = field_data_offset + _DataMsgQueue[ src_index ]._Length - offset; + } else if (field_data_offset < dst_buffer_len) { //This is a control message + memmove(&(dst_buffer[field_header_offset]), (char *)(&header), sizeof(header)); + // pack the data here. + memmove + ( + &(dst_buffer[field_topic_name_offset]), + _ControlMsgQueue[ src_index ]._MsgName.c_str(), + _ControlMsgQueue[ src_index ]._MsgName.size() + ); + + if (_ControlMsgQueue[ src_index ]._MsgName.size() == 0) { + PX4_WARN("########## Error MsgName cannot be zero: "); + } + + dst_buffer[ field_topic_name_offset + _ControlMsgQueue[ src_index ]._MsgName.size()] = '\0'; + rc = field_data_offset - offset; + } else { - PX4_WARN("Error coping the DataMsg to dst buffer, insuffienct space. "); - PX4_WARN("... offset[%ld] len[%ld] data_msg_len[%ld]", + PX4_WARN("Error coping the Msg to dst buffer, insuffienct space. "); + if (isData) { + PX4_WARN("Data... offset[%ld] len[%ld] data_msg_len[%ld]", offset, dst_buffer_len, (field_data_offset - offset) + _DataMsgQueue[ src_index ]._Length); + } else { + PX4_WARN("ControlMsg... offset[%ld] len[%ld]", + offset, dst_buffer_len, (field_data_offset - offset)); + } } return rc; diff --git a/src/modules/muorb/adsp/uORBFastRpcChannel.hpp b/src/modules/muorb/adsp/uORBFastRpcChannel.hpp index f300b155a0..194e4936bd 100644 --- a/src/modules/muorb/adsp/uORBFastRpcChannel.hpp +++ b/src/modules/muorb/adsp/uORBFastRpcChannel.hpp @@ -57,6 +57,33 @@ public: return &(_Instance); } + /** + * @brief Interface to notify the remote entity of a topic being advertised. + * + * @param messageName + * This represents the uORB message name(aka topic); This message name should be + * globally unique. + * @return + * 0 = success; This means the messages is successfully sent to the receiver + * Note: This does not mean that the receiver as received it. + * otherwise = failure. + */ + virtual int16_t topic_advertised(const char *messageName); + + /** + * @brief Interface to notify the remote entity of a topic being unadvertised + * and is no longer publishing messages. + * + * @param messageName + * This represents the uORB message name(aka topic); This message name should be + * globally unique. + * @return + * 0 = success; This means the messages is successfully sent to the receiver + * Note: This does not mean that the receiver as received it. + * otherwise = failure. + */ + //virtual int16_t topic_unadvertised(const char *messageName); + /** * @brief Interface to notify the remote entity of interest of a * subscription for a message. @@ -155,10 +182,11 @@ private: // data members static const int32_t _CONTROL_MSG_TYPE_ADD_SUBSCRIBER = 1; static const int32_t _CONTROL_MSG_TYPE_REMOVE_SUBSCRIBER = 2; static const int32_t _DATA_MSG_TYPE = 3; + static const int32_t _CONTROL_MSG_TYPE_ADVERTISE = 4; static const int32_t _PACKET_FIELD_TOPIC_NAME_LEN_SIZE_IN_BYTES = 2; static const int32_t _PACKET_FIELD_DATA_LEN_IN_BYTES = 2; - static const int32_t _PACKET_HEADER_SIZE = + static const int32_t _PACKET_HEADER_SIZE = 1 + //first byte is the MSG Type _PACKET_FIELD_TOPIC_NAME_LEN_SIZE_IN_BYTES + _PACKET_FIELD_DATA_LEN_IN_BYTES; struct FastRpcDataMsg { @@ -174,6 +202,7 @@ private: // data members }; struct BulkTransferHeader { + uint16_t _MsgType; uint16_t _MsgNameLen; uint16_t _DataLen; }; @@ -261,8 +290,8 @@ private://class members. int32_t DataQSize(); int32_t ControlQSize(); - int32_t get_data_msg_size_at(int32_t index); - int32_t copy_data_to_buffer(int32_t src_index, uint8_t *dst_buffer, int32_t offset, int32_t dst_buffer_len); + int32_t get_msg_size_at(bool isData, int32_t index); + int32_t copy_msg_to_buffer(bool isData, int32_t src_index, uint8_t *dst_buffer, int32_t offset, int32_t dst_buffer_len); std::set _RemoteSubscribers; }; diff --git a/src/modules/muorb/krait/px4muorb_KraitRpcWrapper.cpp b/src/modules/muorb/krait/px4muorb_KraitRpcWrapper.cpp index abbf4f3f8f..b156992f94 100644 --- a/src/modules/muorb/krait/px4muorb_KraitRpcWrapper.cpp +++ b/src/modules/muorb/krait/px4muorb_KraitRpcWrapper.cpp @@ -278,6 +278,16 @@ bool px4muorb::KraitRpcWrapper::Terminate() return true; } +int32_t px4muorb::KraitRpcWrapper::TopicAdvertised(const char *topic) +{ + return ((_Initialized) ? px4muorb_topic_advertised(topic) : -1); +} + +int32_t px4muorb::KraitRpcWrapper::TopicUnadvertised(const char *topic) +{ + return ((_Initialized) ? px4muorb_topic_unadvertised(topic) : -1); +} + int32_t px4muorb::KraitRpcWrapper::AddSubscriber(const char *topic) { return ((_Initialized) ? px4muorb_add_subscriber(topic) : -1); diff --git a/src/modules/muorb/krait/px4muorb_KraitRpcWrapper.hpp b/src/modules/muorb/krait/px4muorb_KraitRpcWrapper.hpp index a22c00fbf8..9a931ec02b 100644 --- a/src/modules/muorb/krait/px4muorb_KraitRpcWrapper.hpp +++ b/src/modules/muorb/krait/px4muorb_KraitRpcWrapper.hpp @@ -65,6 +65,8 @@ public: /** * Muorb related functions to pub/sub of orb topic from krait to adsp */ + int32_t TopicAdvertised(const char *topic); + int32_t TopicUnadvertised(const char *topic); int32_t AddSubscriber(const char *topic); int32_t RemoveSubscriber(const char *topic); int32_t SendData(const char *topic, int32_t length_in_bytes, const uint8_t *data); diff --git a/src/modules/muorb/krait/uORBKraitFastRpcChannel.cpp b/src/modules/muorb/krait/uORBKraitFastRpcChannel.cpp index 534bf3bc1e..2da4c031bf 100644 --- a/src/modules/muorb/krait/uORBKraitFastRpcChannel.cpp +++ b/src/modules/muorb/krait/uORBKraitFastRpcChannel.cpp @@ -67,6 +67,27 @@ uORB::KraitFastRpcChannel::KraitFastRpcChannel() : _KraitWrapper.Initialize(); } +int16_t uORB::KraitFastRpcChannel::topic_advertised(const char *messageName) +{ + int16_t rc = 0; + PX4_DEBUG("Before calling TopicAdvertised for [%s]\n", messageName); + rc = _KraitWrapper.TopicAdvertised(messageName); + PX4_DEBUG("Response for TopicAdvertised for [%s], rc[%d]\n", messageName, rc); + return rc; +} + +/* +//TODO: verify if needed +int16_t uORB::KraitFastRpcChannel::topic_unadvertised(const char *messageName) +{ + int16_t rc = 0; + PX4_DEBUG("Before calling TopicUnadvertised for [%s]\n", messageName); + rc = _KraitWrapper.TopicUnadvertised(messageName); + PX4_DEBUG("Response for TopicUnadvertised for [%s], rc[%d]\n", messageName, rc); + return rc; +} +*/ + int16_t uORB::KraitFastRpcChannel::add_subscription(const char *messageName, int32_t msgRateInHz) { int16_t rc = 0; @@ -254,9 +275,14 @@ void uORB::KraitFastRpcChannel::fastrpc_recv_thread() uint8_t *topic_data = (uint8_t *)(messageName + strlen(messageName) + 1); if (_RxHandler != nullptr) { - _RxHandler->process_received_message(messageName, - header->_DataLen, topic_data); - //PX4_DEBUG( "Received topic data for control message for: [%s] len[%d]\n", name, data_length ); + if (header->_MsgType == _DATA_MSG_TYPE) { + //PX4_DEBUG( "Received topic data for: [%s] len[%d]\n", messageName, data_length ); + _RxHandler->process_received_message(messageName, + header->_DataLen, topic_data); + } else if (header->_MsgType == _CONTROL_MSG_TYPE_ADVERTISE) { + PX4_DEBUG( "Received topic for control message for: [%s] len[%d]\n", messageName, data_length ); + _RxHandler->process_remote_topic(messageName, true); + } } bytes_processed += header->_MsgNameLen + header->_DataLen + sizeof(struct BulkTransferHeader); diff --git a/src/modules/muorb/krait/uORBKraitFastRpcChannel.hpp b/src/modules/muorb/krait/uORBKraitFastRpcChannel.hpp index 888602ae9c..0424475909 100644 --- a/src/modules/muorb/krait/uORBKraitFastRpcChannel.hpp +++ b/src/modules/muorb/krait/uORBKraitFastRpcChannel.hpp @@ -70,6 +70,32 @@ public: return (_InstancePtr != nullptr); } + /** + * @brief Interface to notify the remote entity of a topic being advertised. + * + * @param messageName + * This represents the uORB message name(aka topic); This message name should be + * globally unique. + * @return + * 0 = success; This means the messages is successfully sent to the receiver + * Note: This does not mean that the receiver as received it. + * otherwise = failure. + */ + virtual int16_t topic_advertised(const char *messageName); + + /** + * @brief Interface to notify the remote entity of a topic being unadvertised + * and is no longer publishing messages. + * + * @param messageName + * This represents the uORB message name(aka topic); This message name should be + * globally unique. + * @return + * 0 = success; This means the messages is successfully sent to the receiver + * Note: This does not mean that the receiver as received it. + * otherwise = failure. + */ + //virtual int16_t topic_unadvertised(const char *messageName); /** * @brief Interface to notify the remote entity of interest of a @@ -141,8 +167,10 @@ private: // data members static const int32_t _CONTROL_MSG_TYPE_ADD_SUBSCRIBER = 1; static const int32_t _CONTROL_MSG_TYPE_REMOVE_SUBSCRIBER = 2; static const int32_t _DATA_MSG_TYPE = 3; + static const int32_t _CONTROL_MSG_TYPE_ADVERTISE = 4; struct BulkTransferHeader { + uint16_t _MsgType; uint16_t _MsgNameLen; uint16_t _DataLen; }; diff --git a/src/modules/sdlog2/sdlog2.c b/src/modules/sdlog2/sdlog2.c index d8687f986f..8cc8166ad5 100644 --- a/src/modules/sdlog2/sdlog2.c +++ b/src/modules/sdlog2/sdlog2.c @@ -928,16 +928,7 @@ bool copy_if_updated_multi(orb_id_t topic, int multi_instance, int *handle, void bool updated = false; if (*handle < 0) { -#if __PX4_POSIX_EAGLE - // The orb_exists call doesn't work correctly on Snapdragon yet. - // (No data gets sent from the QURT to the Linux side because there - // are no subscribers. However, there won't be any subscribers, if - // they check using orb_exists() before subscribing.) - if (true) -#else if (OK == orb_exists(topic, multi_instance)) -#endif - { *handle = orb_subscribe_multi(topic, multi_instance); /* copy first data */ diff --git a/src/modules/uORB/uORBCommunicator.hpp b/src/modules/uORB/uORBCommunicator.hpp index 971ee3ab01..cbb996b82b 100644 --- a/src/modules/uORB/uORBCommunicator.hpp +++ b/src/modules/uORB/uORBCommunicator.hpp @@ -56,6 +56,33 @@ public: // INTERFACES FOR Control messages over a channel. //========================================================================= + /** + * @brief Interface to notify the remote entity of a topic being advertised. + * + * @param messageName + * This represents the uORB message name(aka topic); This message name should be + * globally unique. + * @return + * 0 = success; This means the messages is successfully sent to the receiver + * Note: This does not mean that the receiver as received it. + * otherwise = failure. + */ + virtual int16_t topic_advertised(const char *messageName) = 0; + + /** + * @brief Interface to notify the remote entity of a topic being unadvertised + * and is no longer publishing messages. + * + * @param messageName + * This represents the uORB message name(aka topic); This message name should be + * globally unique. + * @return + * 0 = success; This means the messages is successfully sent to the receiver + * Note: This does not mean that the receiver as received it. + * otherwise = failure. + */ + //virtual int16_t topic_unadvertised(const char *messageName) = 0; + /** * @brief Interface to notify the remote entity of interest of a * subscription for a message. @@ -127,6 +154,21 @@ class uORBCommunicator::IChannelRxHandler { public: + /** + * Interface to process a received topic from remote. + * @param topic_name + * This represents the uORB message Name (topic); This message Name should be + * globally unique. + * @param isAdvertisement + * Represents if the topic has been advertised or is no longer avialable. + * @return + * 0 = success; This means the messages is successfully handled in the + * handler. + * otherwise = failure. + */ + + virtual int16_t process_remote_topic(const char *topic_name, bool isAdvertisement) = 0; + /** * Interface to process a received AddSubscription from remote. * @param messageName diff --git a/src/modules/uORB/uORBDevices.cpp b/src/modules/uORB/uORBDevices.cpp index 3d2943ee33..472dd56941 100644 --- a/src/modules/uORB/uORBDevices.cpp +++ b/src/modules/uORB/uORBDevices.cpp @@ -489,6 +489,26 @@ int uORB::DeviceNode::unadvertise(orb_advert_t handle) return PX4_OK; } +int16_t uORB::DeviceNode::topic_advertised(const orb_metadata *meta, int priority) +{ + uORBCommunicator::IChannel *ch = uORB::Manager::get_instance()->get_uorb_communicator(); + if (ch != nullptr && meta != nullptr) { + return ch->topic_advertised(meta->o_name); + } + return -1; +} +/* +//TODO: Check if we need this since we only unadvertise when things all shutdown and it doesn't actually remove the device +int16_t uORB::DeviceNode::topic_unadvertised(const orb_metadata *meta, int priority) +{ + uORBCommunicator::IChannel *ch = uORB::Manager::get_instance()->get_uorb_communicator(); + if (ch != nullptr && meta != nullptr) { + return ch->topic_unadvertised(meta->o_name); + } + return -1; +} +*/ + pollevent_t uORB::DeviceNode::poll_state(device::file_t *filp) { diff --git a/src/modules/uORB/uORBDevices.hpp b/src/modules/uORB/uORBDevices.hpp index ef3f6364b6..7eb9fa9d3a 100644 --- a/src/modules/uORB/uORBDevices.hpp +++ b/src/modules/uORB/uORBDevices.hpp @@ -125,6 +125,9 @@ public: static int unadvertise(orb_advert_t handle); + static int16_t topic_advertised(const orb_metadata *meta, int priority); + //static int16_t topic_unadvertised(const orb_metadata *meta, int priority); + /** * processes a request for add subscription from remote * @param rateInHz diff --git a/src/modules/uORB/uORBManager.cpp b/src/modules/uORB/uORBManager.cpp index b4ad57901d..b53726ec6e 100644 --- a/src/modules/uORB/uORBManager.cpp +++ b/src/modules/uORB/uORBManager.cpp @@ -131,11 +131,15 @@ int uORB::Manager::orb_exists(const struct orb_metadata *meta, int instance) return ERROR; } -#ifdef __PX4_NUTTX +#if __PX4_NUTTX struct stat buffer; return stat(path, &buffer); #else - return px4_access(path, F_OK); + ret = px4_access(path, F_OK); + if (ret == -1 && meta != nullptr && _remote_topics.size() > 0) { + ret = (_remote_topics.find(meta->o_name) != _remote_topics.end()); + } + return ret; #endif } @@ -197,6 +201,9 @@ orb_advert_t uORB::Manager::orb_advertise_multi(const struct orb_metadata *meta, return nullptr; } + //For remote systems call over and inform them + uORB::DeviceNode::topic_advertised(meta, priority); + /* the advertiser must perform an initial publish to initialise the object */ result = orb_publish(meta, advertiser, data); @@ -433,6 +440,23 @@ uORBCommunicator::IChannel *uORB::Manager::get_uorb_communicator(void) return _comm_channel; } +//----------------------------------------------------------------------------- +//----------------------------------------------------------------------------- +int16_t uORB::Manager::process_remote_topic(const char *topic_name, bool isAdvertisement) +{ + warnx("[posix-uORB::Manager::process_remote_topic(%d)] name: %s, isAdver: %d", + __LINE__, topic_name, isAdvertisement); + int16_t rc = 0; + + if (isAdvertisement) { + _remote_topics.insert(topic_name); + } else { + _remote_topics.erase(topic_name); + } + + return rc; +} + //----------------------------------------------------------------------------- //----------------------------------------------------------------------------- int16_t uORB::Manager::process_add_subscription(const char *messageName, diff --git a/src/modules/uORB/uORBManager.hpp b/src/modules/uORB/uORBManager.hpp index 18df99fa2f..5ac0c6c4cb 100644 --- a/src/modules/uORB/uORBManager.hpp +++ b/src/modules/uORB/uORBManager.hpp @@ -410,6 +410,7 @@ private: // data members // the communicator channel instance. uORBCommunicator::IChannel *_comm_channel; ORBSet _remote_subscriber_topics; + ORBSet _remote_topics; DeviceMaster *_device_masters[Flavor_count]; ///< Allow at most one DeviceMaster per Flavor @@ -417,6 +418,20 @@ private: //class methods Manager(); ~Manager(); + /** + * Interface to process a received topic from remote. + * @param topic_name + * This represents the uORB message Name (topic); This message Name should be + * globally unique. + * @param isAdvertisement + * Represents if the topic has been advertised or is no longer avialable. + * @return + * 0 = success; This means the messages is successfully handled in the + * handler. + * otherwise = failure. + */ + virtual int16_t process_remote_topic(const char *topic_name, bool isAdvertisement); + /** * Interface to process a received AddSubscription from remote. * @param messageName