diff --git a/src/modules/muorb/adsp/module.mk b/src/modules/muorb/adsp/module.mk index 65d0e8e555..d31ca360db 100644 --- a/src/modules/muorb/adsp/module.mk +++ b/src/modules/muorb/adsp/module.mk @@ -39,7 +39,7 @@ ifeq ($(PX4_TARGET_OS),qurt) SRCS = \ - muorb_fastrpc.cpp \ + px4muorb.cpp \ uORBFastRpcChannel.cpp INCLUDE_DIRS += \ diff --git a/src/modules/muorb/adsp/muorb_fastrpc.cpp b/src/modules/muorb/adsp/px4muorb.cpp similarity index 76% rename from src/modules/muorb/adsp/muorb_fastrpc.cpp rename to src/modules/muorb/adsp/px4muorb.cpp index febe578258..9e7265f2a3 100644 --- a/src/modules/muorb/adsp/muorb_fastrpc.cpp +++ b/src/modules/muorb/adsp/px4muorb.cpp @@ -30,7 +30,7 @@ * POSSIBILITY OF SUCH DAMAGE. * ****************************************************************************/ -#include "muorb_fastrpc.h" +#include "px4muorb.hpp" #include "qurt.h" #include "uORBFastRpcChannel.hpp" #include "uORBManager.hpp" @@ -44,19 +44,20 @@ #include "uORB/topics/sensor_combined.h" #include "uORB.h" +#include "HAP_power.h" + #define _ENABLE_MUORB 1 -__BEGIN_DECLS - -int dspal_main(int argc, const char *argv[]); - -__END_DECLS +extern "C" { + int dspal_main(int argc, const char *argv[]); +}; -int muorb_fastrpc_orb_initialize() +int px4muorb_orb_initialize() { int rc = 0; PX4_WARN("Before calling dspal_entry() method..."); + HAP_power_request(100, 100, 1000); // registere the fastrpc muorb with uORBManager. uORB::Manager::get_instance()->set_uorb_communicator(uORB::FastRpcChannel::GetInstance()); const char *argv[2] = { "dspal", "start" }; @@ -66,7 +67,7 @@ int muorb_fastrpc_orb_initialize() return rc; } -int muorb_fastrpc_add_subscriber(const char *name) +int px4muorb_add_subscriber(const char *name) { int rc = 0; uORB::FastRpcChannel *channel = uORB::FastRpcChannel::GetInstance(); @@ -87,7 +88,7 @@ int muorb_fastrpc_add_subscriber(const char *name) return rc; } -int muorb_fastrpc_remove_subscriber(const char *name) +int px4muorb_remove_subscriber(const char *name) { int rc = 0; uORB::FastRpcChannel *channel = uORB::FastRpcChannel::GetInstance(); @@ -105,7 +106,7 @@ int muorb_fastrpc_remove_subscriber(const char *name) } -int muorb_fastrpc_send_topic_data(const char *name, const uint8_t *data, int data_len_in_bytes) +int px4muorb_send_topic_data(const char *name, const uint8_t *data, int data_len_in_bytes) { int rc = 0; uORB::FastRpcChannel *channel = uORB::FastRpcChannel::GetInstance(); @@ -121,7 +122,7 @@ int muorb_fastrpc_send_topic_data(const char *name, const uint8_t *data, int dat return rc; } -int muorb_fastrpc_is_subscriber_present(const char *topic_name, int *status) +int px4muorb_is_subscriber_present(const char *topic_name, int *status) { int rc = 0; int32_t local_status = 0; @@ -135,20 +136,35 @@ int muorb_fastrpc_is_subscriber_present(const char *topic_name, int *status) return rc; } -int muorb_fastrpc_receive_msg(int *msg_type, char *topic_name, int topic_name_len, uint8_t *data, int data_len_in_bytes, - int *bytes_returned) +int px4muorb_receive_msg(int *msg_type, char *topic_name, int topic_name_len, uint8_t *data, int data_len_in_bytes, + int *bytes_returned) { int rc = 0; int32_t local_msg_type = 0; int32_t local_bytes_returned = 0; uORB::FastRpcChannel *channel = uORB::FastRpcChannel::GetInstance(); + //PX4_DEBUG( "topic_namePtr: [0x%p] dataPtr: [0x%p]", topic_name, data ); rc = channel->get_data(&local_msg_type, topic_name, topic_name_len, data, data_len_in_bytes, &local_bytes_returned); *msg_type = (int)local_msg_type; *bytes_returned = (int)local_bytes_returned; return rc; } -int muorb_fastrpc_unblock_recieve_msg(void) +int px4muorb_receive_bulk_data(uint8_t *bulk_transfer_buffer, int max_size_in_bytes, + int *returned_length_in_bytes, int *topic_count) +{ + int rc = 0; + int32_t local_bytes_returned = 0; + int32_t local_topic_count = 0; + uORB::FastRpcChannel *channel = uORB::FastRpcChannel::GetInstance(); + //PX4_DEBUG( "topic_namePtr: [0x%p] dataPtr: [0x%p]", topic_name, data ); + rc = channel->get_bulk_data(bulk_transfer_buffer, max_size_in_bytes, &local_bytes_returned, &local_topic_count); + *returned_length_in_bytes = (int)local_bytes_returned; + *topic_count = (int)local_topic_count; + return rc; +} + +int px4muorb_unblock_recieve_msg(void) { int rc = 0; uORB::FastRpcChannel *channel = uORB::FastRpcChannel::GetInstance(); diff --git a/src/modules/muorb/adsp/muorb_fastrpc.h b/src/modules/muorb/adsp/px4muorb.hpp similarity index 72% rename from src/modules/muorb/adsp/muorb_fastrpc.h rename to src/modules/muorb/adsp/px4muorb.hpp index 96eda20d35..d509819570 100644 --- a/src/modules/muorb/adsp/muorb_fastrpc.h +++ b/src/modules/muorb/adsp/px4muorb.hpp @@ -35,21 +35,24 @@ #include #include -__BEGIN_DECLS +extern "C" { -int muorb_fastrpc_orb_initialize() __EXPORT; +int px4muorb_orb_initialize() __EXPORT; -int muorb_fastrpc_add_subscriber(const char *name) __EXPORT; +int px4muorb_add_subscriber(const char *name) __EXPORT; -int muorb_fastrpc_remove_subscriber(const char *name) __EXPORT; +int px4muorb_remove_subscriber(const char *name) __EXPORT; -int muorb_fastrpc_send_topic_data(const char *name, const uint8_t *data, int data_len_in_bytes) __EXPORT; +int px4muorb_send_topic_data(const char *name, const uint8_t *data, int data_len_in_bytes) __EXPORT; -int muorb_fastrpc_is_subscriber_present(const char *topic_name, int *status) __EXPORT; +int px4muorb_is_subscriber_present(const char *topic_name, int *status) __EXPORT; -int muorb_fastrpc_receive_msg(int *msg_type, char *topic_name, int topic_name_len, uint8_t *data, int data_len_in_bytes, - int *bytes_returned) __EXPORT; +int px4muorb_receive_msg(int *msg_type, char *topic_name, int topic_name_len, uint8_t *data, int data_len_in_bytes, + int *bytes_returned) __EXPORT; -int muorb_fastrpc_unblock_recieve_msg(void) __EXPORT; +int px4muorb_receive_bulk_data(uint8_t *_BulkTransferBuffer, int max_size_in_bytes, + int *length_in_bytes, int *topic_count) __EXPORT; -__END_DECLS +int px4muorb_unblock_recieve_msg(void) __EXPORT; + +} diff --git a/src/modules/muorb/adsp/uORBFastRpcChannel.cpp b/src/modules/muorb/adsp/uORBFastRpcChannel.cpp index 87c2bba57d..d281c1a7a4 100644 --- a/src/modules/muorb/adsp/uORBFastRpcChannel.cpp +++ b/src/modules/muorb/adsp/uORBFastRpcChannel.cpp @@ -33,10 +33,24 @@ #include "uORBFastRpcChannel.hpp" #include "px4_log.h" #include +#include // static intialization. uORB::FastRpcChannel uORB::FastRpcChannel::_Instance; +static hrt_abstime _check_time; +static unsigned long _dropped_pkts; +static unsigned long _get_min = 0xFFFFFF; +static unsigned long _get_max = 0; +static unsigned long _min_q = 200; +static unsigned long _max_q = 0; +static unsigned long _avg_q = 0; +static unsigned long _count = 0; +static unsigned long _get_bulk_min = 0xFFFFFF; +static unsigned long _get_bulk_max = 0; +static unsigned long _bulk_topic_count_min = 0xFFFFFF; +static unsigned long _bulk_topic_count_max = 0; + //============================================================================== //============================================================================== uORB::FastRpcChannel::FastRpcChannel() @@ -81,11 +95,11 @@ int16_t uORB::FastRpcChannel::is_subscriber_present(const char *messageName, int if (std::find(_Subscribers.begin(), _Subscribers.end(), messageName) != _Subscribers.end()) { *status = 1; - PX4_DEBUG("******* Found subscriber for message[%s]....", messageName); + //PX4_DEBUG("******* Found subscriber for message[%s]....", messageName); } else { *status = 0; - PX4_WARN("@@@@@ Subscriber not found for[%s]...numSubscribers[%d]", messageName, _Subscribers.size()); + //PX4_WARN("@@@@@ Subscriber not found for[%s]...numSubscribers[%d]", messageName, _Subscribers.size()); int i = 0; for (std::list::iterator it = _Subscribers.begin(); it != _Subscribers.end(); ++it) { @@ -120,19 +134,23 @@ int16_t uORB::FastRpcChannel::register_handler(uORBCommunicator::IChannelRxHandl int16_t uORB::FastRpcChannel::send_message(const char *messageName, int32_t length, uint8_t *data) { int16_t rc = 0; + hrt_abstime t1, t2; + static hrt_abstime check_time = 0; + int32_t initial_queue_size = 0; if (_RemoteSubscribers.find(messageName) == _RemoteSubscribers.end()) { //there is no-remote subscriber. So do not queue the message. return rc; } + t1 = hrt_absolute_time(); _QueueMutex.lock(); bool overwriteData = false; if (IsDataQFull()) { // queue is full. Overwrite the oldest data. - PX4_WARN("[send_message] Queue Full Overwrite the oldest data. in[%ld] out[%ld] max[%ld]", - _DataQInIndex, _DataQOutIndex, _MAX_MSG_QUEUE_SIZE); + //PX4_WARN("[send_message] Queue Full Overwrite the oldest data. in[%ld] out[%ld] max[%ld]", + // _DataQInIndex, _DataQOutIndex, _MAX_MSG_QUEUE_SIZE); _DataQOutIndex++; if (_DataQOutIndex == _MAX_MSG_QUEUE_SIZE) { @@ -140,6 +158,7 @@ int16_t uORB::FastRpcChannel::send_message(const char *messageName, int32_t leng } overwriteData = true; + _dropped_pkts++; } // now check to see if the data queue's buffer size if large enough to memcpy the data. @@ -158,11 +177,30 @@ int16_t uORB::FastRpcChannel::send_message(const char *messageName, int32_t leng } // the assumption here is that each caller reads only one data from either control or data queue. - if (!overwriteData) { + //if (!overwriteData) { + if (DataQSize() == 1) { // post it only of the queue moves from empty to available. _DataAvailableSemaphore.post(); } + if ((unsigned long)DataQSize() < _min_q) { _min_q = (unsigned long)DataQSize(); } + if ((unsigned long)DataQSize() > _max_q) { _max_q = (unsigned long)DataQSize(); } + + _count++; + _avg_q = ((double)((_avg_q * (_count - 1)) + (unsigned long)(DataQSize()))) / (double)(_count); + _QueueMutex.unlock(); + t2 = hrt_absolute_time(); + + if ((unsigned long)(t2 - check_time) > 10000000) { + //PX4_DEBUG("MsgName: %20s, t1: %lu, t2: %lu, dt: %lu",messageName, (unsigned long) t1, (unsigned long) t2, (unsigned long) (t2-t1)); + //PX4_DEBUG("Q. Stats: min: %lu, max : %lu, avg: %lu count: %lu ", _min_q, _max_q, (unsigned long)(_avg_q * 1000.0), _count); + _min_q = _MAX_MSG_QUEUE_SIZE * 2; + _max_q = 0; + _avg_q = 0; + _count = 0; + check_time = t2; + } + return rc; } @@ -235,8 +273,13 @@ int16_t uORB::FastRpcChannel::get_data ) { int16_t rc = 0; + PX4_DEBUG("Get data should not be called..."); + return -1; // wait for data availability + static hrt_abstime check_time = 0; + hrt_abstime t1 = hrt_absolute_time(); _DataAvailableSemaphore.wait(); + hrt_abstime t2 = hrt_absolute_time(); _QueueMutex.lock(); if (DataQSize() != 0 || ControlQSize() != 0) { @@ -314,5 +357,191 @@ int16_t uORB::FastRpcChannel::get_data } _QueueMutex.unlock(); + hrt_abstime t3 = hrt_absolute_time(); + + if ((unsigned long)(t3 - t1) > _get_max) { _get_max = (unsigned long)(t3 - t1); } + + if ((unsigned long)(t3 - t1) < _get_min) { _get_min = (unsigned long)(t3 - t1); } + + if ((unsigned long)(t3 - check_time) > 1000000) { + if (rc != 0) { + topic_name[0] = '\0'; + } + /* + PX4_DEBUG("GetData: %30s: t1: %lu t2: %lu t3: %lu", topic_name, (unsigned long)t1, (unsigned long)t2, + (unsigned long)t3); + PX4_DEBUG(".... dt1: %7lu dt2: %7lu Q: %d", (unsigned long)(t2 - t1), (unsigned long)(t3 - t2), DataQSize()); + PX4_DEBUG("ADSP RPC Stats: _get_min: %lu _get_max: %lu _dropped_pkts: %lu", _get_min, _get_max, _dropped_pkts); + */ + check_time = t3; + } + + return rc; +} + + +int16_t uORB::FastRpcChannel::get_bulk_data +( + uint8_t *buffer, + int32_t max_buffer_in_bytes, + int32_t *returned_bytes, + int32_t *topic_count +) +{ + int16_t rc = 0; + // wait for data availability + static hrt_abstime check_time = 0; + hrt_abstime t1 = hrt_absolute_time(); + _DataAvailableSemaphore.wait(); + hrt_abstime t2 = hrt_absolute_time(); + + _QueueMutex.lock(); + + int32_t bytes_copied = 0; + int32_t copy_result = 0; + *returned_bytes = 0; + *topic_count = 0; + int32_t topic_count_to_return = 0; + + if (DataQSize() != 0) { + //PX4_DEBUG( "get_bulk_data: QSize: %d", DataQSize() ); + topic_count_to_return = DataQSize(); + + while (DataQSize() != 0) { + // this is a hack as we are using a counting semaphore. Should be re-implemented with cond_variable and wait. + //_DataAvailableSemaphore.wait(); + if (get_data_msg_size_at(_DataQOutIndex) < (max_buffer_in_bytes - bytes_copied)) { + // there is enough space in the buffer, copy the data. + //PX4_DEBUG( "Coping Data to buffer..." ); + copy_result = copy_data_to_buffer(_DataQOutIndex, buffer, bytes_copied, max_buffer_in_bytes); + + if (copy_result == -1) { + if (bytes_copied == 0) { + rc = -1; + } + + break; + + } else { + //PX4_DEBUG( "[%d] %02x %02x %02x %02x", *topic_count,\ + // buffer[bytes_copied], \ + // buffer[bytes_copied+1], \ + // buffer[bytes_copied+2], \ + // buffer[bytes_copied+3] ); + bytes_copied += copy_result; + (*topic_count)++; + *returned_bytes = bytes_copied; + _DataQOutIndex++; + + if (_DataQOutIndex == _MAX_MSG_QUEUE_SIZE) { + _DataQOutIndex = 0; + } + } + + } else { + if (bytes_copied == 0) { + rc = -1; + PX4_WARN("ERROR: Insufficent space in data buffer, no topics returned"); + + } else { + PX4_DEBUG("Exiting out of the while loop..."); + } + + break; + } + } + + } else { + PX4_ERR("[get_data_bulk] Error: Semaphore is up when there is no data on the control/data queues"); + rc = -1; + } + + if (topic_count_to_return != *topic_count) { + PX4_WARN("Not sending all topics: topics_to_return:[%d] topics_returning:[%d]", topic_count_to_return, *topic_count); + } + + _QueueMutex.unlock(); + hrt_abstime t3 = hrt_absolute_time(); + + if ((unsigned long)(t3 - t1) > _get_bulk_max) { _get_bulk_max = (unsigned long)(t3 - t1); } + if ((unsigned long)(t3 - t1) < _get_bulk_min) { _get_bulk_min = (unsigned long)(t3 - t1); } + if ((unsigned long)(*topic_count) > _bulk_topic_count_max) { _bulk_topic_count_max = (unsigned long)(*topic_count); } + if ((unsigned long)(*topic_count) < _bulk_topic_count_min) { _bulk_topic_count_min = (unsigned long)(*topic_count); } + if ((unsigned long)(t3 - check_time) > 10000000) { + //PX4_DEBUG("GetData: t1: %lu t2: %lu t3: %lu", (unsigned long)t1, (unsigned long)t2, (unsigned long)t3); + //PX4_DEBUG(".... dt1: %7lu dt2: %7lu Q: %d", (unsigned long)(t2 - t1), (unsigned long)(t3 - t2), DataQSize()); + //PX4_DEBUG("ADSP RPC Stats: _get_bulk_min: %lu _get_bulk_max: %lu _dropped_pkts: %lu", _get_bulk_min, _get_bulk_max, + // _dropped_pkts); + //PX4_DEBUG(" .... topic_count_min: %lu topic_count_max: %lu", _bulk_topic_count_min, _bulk_topic_count_max); + _get_bulk_max = 0; + _get_bulk_min = 0xFFFFFF; + _bulk_topic_count_min = 0xFFFFFF; + _bulk_topic_count_max = 0; + check_time = t3; + } + + //PX4_DEBUG( "Returning topics: %d bytes_returned: %d", *topic_count, *returned_bytes ); + return rc; +} + + +int32_t uORB::FastRpcChannel::get_data_msg_size_at(int32_t index) +{ + // the assumption here is that this is called within the context of semaphore, + // hence lock/unlock is not needed. + int32_t rc = 0; + rc += _DataMsgQueue[ index ]._Length; + rc += _DataMsgQueue[ index ]._MsgName.size() + 1; + rc += _PACKET_HEADER_SIZE; + return rc; +} + + +int32_t uORB::FastRpcChannel::copy_data_to_buffer(int32_t src_index, uint8_t *dst_buffer, int32_t offset, + int32_t dst_buffer_len) +{ + int32_t rc = -1; + + // before calling this method the following are assumed: + // * sem_lock is acquired for data protection + // * the dst_buffer is validated to + + // compute the different offsets to pack the packets. + int32_t field_header_offset = offset; + int32_t field_topic_name_offset = field_header_offset + sizeof(struct BulkTransferHeader); + int32_t field_data_offset = field_topic_name_offset + _DataMsgQueue[ src_index ]._MsgName.size() + 1; + + struct BulkTransferHeader header = { (uint16_t)(_DataMsgQueue[ src_index ]._MsgName.size() + 1), (uint16_t)(_DataMsgQueue[ src_index ]._Length) }; + + + //PX4_DEBUG( "Offsets: header[%d] name[%d] data[%d]", + // field_header_offset, + // field_topic_name_offset, + // field_data_offset ); + + if ((field_data_offset + _DataMsgQueue[ src_index ]._Length) < dst_buffer_len) { + memmove(&(dst_buffer[field_header_offset]), (char *)(&header), sizeof(header)); + // pack the data here. + memmove + ( + &(dst_buffer[field_topic_name_offset]), + _DataMsgQueue[ src_index ]._MsgName.c_str(), + _DataMsgQueue[ src_index ]._MsgName.size() + ); + + if (_DataMsgQueue[ src_index ]._MsgName.size() == 0) { + PX4_WARN("########## Error MsgName cannot be zero: "); + } + + dst_buffer[ field_topic_name_offset + _DataMsgQueue[ src_index ]._MsgName.size()] = '\0'; + memmove(&(dst_buffer[field_data_offset]), _DataMsgQueue[ src_index ]._Buffer, _DataMsgQueue[ src_index ]._Length); + rc = field_data_offset + _DataMsgQueue[ src_index ]._Length - offset; + + } else { + PX4_WARN("Error coping the DataMsg to dst buffer, insuffienct space. "); + PX4_WARN("... offset[%d] len[%d] data_msg_len[%d]", + offset, dst_buffer_len, (field_data_offset - offset) + _DataMsgQueue[ src_index ]._Length); + } + return rc; } diff --git a/src/modules/muorb/adsp/uORBFastRpcChannel.hpp b/src/modules/muorb/adsp/uORBFastRpcChannel.hpp index 0446fb1772..37955603fd 100644 --- a/src/modules/muorb/adsp/uORBFastRpcChannel.hpp +++ b/src/modules/muorb/adsp/uORBFastRpcChannel.hpp @@ -123,6 +123,8 @@ public: int32_t *bytes_returned ); + int16_t get_bulk_data(uint8_t *buffer, int32_t max_size_in_bytes, int32_t *returned_bytes, int32_t *topic_count); + // function to check if there are subscribers for a topic on adsp. int16_t is_subscriber_present(const char *messageName, int32_t *status); @@ -153,6 +155,11 @@ private: // data members static const int32_t _CONTROL_MSG_TYPE_REMOVE_SUBSCRIBER = 2; static const int32_t _DATA_MSG_TYPE = 3; + static const int32_t _PACKET_FIELD_TOPIC_NAME_LEN_SIZE_IN_BYTES = 2; + static const int32_t _PACKET_FIELD_DATA_LEN_IN_BYTES = 2; + static const int32_t _PACKET_HEADER_SIZE = + _PACKET_FIELD_TOPIC_NAME_LEN_SIZE_IN_BYTES + _PACKET_FIELD_DATA_LEN_IN_BYTES; + struct FastRpcDataMsg { int32_t _MaxBufferSize; int32_t _Length; @@ -165,6 +172,12 @@ private: // data members std::string _MsgName; }; + struct BulkTransferHeader { + uint16_t _MsgNameLen; + uint16_t _DataLen; + }; + + struct FastRpcDataMsg _DataMsgQueue[ _MAX_MSG_QUEUE_SIZE ]; int32_t _DataQInIndex; int32_t _DataQOutIndex; @@ -245,6 +258,9 @@ private://class members. int32_t DataQSize(); int32_t ControlQSize(); + int32_t get_data_msg_size_at(int32_t index); + int32_t copy_data_to_buffer(int32_t src_index, uint8_t *dst_buffer, int32_t offset, int32_t dst_buffer_len); + std::set _RemoteSubscribers; }; diff --git a/src/modules/muorb/krait/module.mk b/src/modules/muorb/krait/module.mk index c628b53f96..4f30ee46e1 100644 --- a/src/modules/muorb/krait/module.mk +++ b/src/modules/muorb/krait/module.mk @@ -44,6 +44,5 @@ INCLUDE_DIRS += $(EXT_MUORB_LIB_ROOT)/krait/include \ $(PX4_BASE)/src/modules/uORB \ $(PX4_BASE)/src/modules -EXTRA_LIBS += $(EXT_MUORB_LIB_ROOT)/krait/libs/libmuorb.so +EXTRA_LIBS += $(EXT_MUORB_LIB_ROOT)/krait/libs/libpx4muorb.so endif - diff --git a/src/modules/muorb/krait/uORBKraitFastRpcChannel.cpp b/src/modules/muorb/krait/uORBKraitFastRpcChannel.cpp index 3900a38c94..325fdc4b08 100644 --- a/src/modules/muorb/krait/uORBKraitFastRpcChannel.cpp +++ b/src/modules/muorb/krait/uORBKraitFastRpcChannel.cpp @@ -33,19 +33,37 @@ #include "uORBKraitFastRpcChannel.hpp" #include "px4_log.h" - +#include "px4_tasks.h" +#include +#include #define LOG_TAG "uORBKraitFastRpcChannel.cpp" +static void DumpData(uint8_t *buffer, int32_t length, int32_t num_topics); + +static std::string _log_file_name = "./hex_dump.txt"; + // static intialization. uORB::KraitFastRpcChannel uORB::KraitFastRpcChannel::_Instance; +static unsigned long _snd_msg_min = 0xFFFFFF; +static unsigned long _snd_msg_max = 0; +static double _snd_msg_avg = 0.0; +static unsigned long _snd_msg_count = 0; +static unsigned long _overall_snd_min = 0xFFFFFF; +static unsigned long _overall_snd_max = 0; +static double _overall_snd_avg = 0.0; +static unsigned long _overall_snd_count = 0; +static hrt_abstime _log_check_time = 0; +static hrt_abstime _log_check_interval = 10000000; + + //============================================================================== //============================================================================== uORB::KraitFastRpcChannel::KraitFastRpcChannel() : _RxHandler(nullptr) , _ThreadStarted(false) - , _ShouldExit(false) + , _ThreadShouldExit(false) { _KraitWrapper.Initialize(); } @@ -91,17 +109,63 @@ int16_t uORB::KraitFastRpcChannel::send_message(const char *messageName, int32_t // invoke the fast rpc call to send data defined in idl. //PX4_DEBUG( "Before calling send_data for [%s] len[%d]\n", messageName.c_str(), length ); int32_t status = 0; + hrt_abstime t1, t4; + hrt_abstime t2 = 0, t3 = 0; + t1 = hrt_absolute_time(); - if (_KraitWrapper.IsSubscriberPresent(messageName, &status) == 0) { - if (status > 0) { // there are remote subscribers - rc = _KraitWrapper.SendData(messageName, length, data); - //PX4_DEBUG( "***** SENDING[%s] topic to remote....\n", messageName.c_str() ); - - } else { - //PX4_DEBUG( "******* NO SUBSCRIBER PRESENT ON THE REMOTE FOR topic[%s] \n", messageName.c_str() ); + if (_AdspSubscriberCache.find(std::string(messageName)) == _AdspSubscriberCache.end()) { + // check the status from adsp. as it is not cached. + if (_KraitWrapper.IsSubscriberPresent(messageName, &status) == 0) { + _AdspSubscriberCache[messageName] = status; + _AdspSubscriberSampleTimestamp[messageName] = hrt_absolute_time(); } + } else { - PX4_ERR("Error returned for KraitWrapper.IsSubscriberPresent(%s)\n", messageName); + if ((hrt_absolute_time() - _AdspSubscriberSampleTimestamp[messageName]) > _SubCacheRefreshRate) { + if (_KraitWrapper.IsSubscriberPresent(messageName, &status) == 0) { + _AdspSubscriberCache[messageName] = status; + _AdspSubscriberSampleTimestamp[messageName] = hrt_absolute_time(); + } + } + } + + if (_AdspSubscriberCache[messageName] > 0) {// there are remote subscribers + t2 = hrt_absolute_time(); + rc = _KraitWrapper.SendData(messageName, length, data); + t3 = hrt_absolute_time(); + _snd_msg_count++; + //PX4_DEBUG( "***** SENDING[%s] topic to remote....\n", messageName.c_str() ); + + } else { + //PX4_DEBUG( "******* NO SUBSCRIBER PRESENT ON THE REMOTE FOR topic[%s] \n", messageName.c_str() ); + } + + t4 = hrt_absolute_time(); + _overall_snd_count++; + + if ((t4 - t1) < _overall_snd_min) { _overall_snd_min = (t4 - t1); } + + if ((t4 - t1) > _overall_snd_max) { _overall_snd_max = (t4 - t1); } + + if (_AdspSubscriberCache[messageName] > 0) { + if ((t3 - t2) < _snd_msg_min) { _snd_msg_min = (t3 - t2); } + + if ((t3 - t2) > _snd_msg_max) { _snd_msg_max = (t3 - t2); } + + _snd_msg_avg = ((double)((_snd_msg_avg * (_snd_msg_count - 1)) + (unsigned long)(t3 - t2))) / (double)(_snd_msg_count); + } + + _overall_snd_avg = ((double)((_overall_snd_avg * (_overall_snd_count - 1)) + (unsigned long)(t4 - t1))) / (double)( + _overall_snd_count); + + + + if ((t4 - _log_check_time) > _log_check_interval) { + _log_check_time = t4; + _overall_snd_min = _snd_msg_min = 0xFFFFFFF; + _overall_snd_max = _snd_msg_max = 0; + _overall_snd_count = _snd_msg_count = 0; + _overall_snd_avg = _snd_msg_avg = 0.0; } //PX4_DEBUG( "Response for SendMessage for [%s],len[%d] rc[%d]\n", messageName.c_str(), length, rc ); @@ -111,80 +175,181 @@ int16_t uORB::KraitFastRpcChannel::send_message(const char *messageName, int32_t void uORB::KraitFastRpcChannel::Start() { _ThreadStarted = true; - pthread_create(&_RecvThread, NULL, thread_start, this); + _ThreadShouldExit = false; + pthread_attr_t recv_thread_attr; + pthread_attr_init(&recv_thread_attr); + + struct sched_param param; + (void)pthread_attr_getschedparam(&recv_thread_attr, ¶m); + param.sched_priority = SCHED_PRIORITY_MAX - 80; + (void)pthread_attr_setschedparam(&recv_thread_attr, ¶m); + + pthread_attr_setstacksize(&recv_thread_attr, 4096); + + if (pthread_create(&_RecvThread, &recv_thread_attr, thread_start, (void *)this) != 0) { + PX4_ERR("Error creating the receive thread for muorb"); + + } else { + pthread_setname_np(_RecvThread, "muorb_krait_receiver"); + } + + pthread_attr_destroy(&recv_thread_attr); } void uORB::KraitFastRpcChannel::Stop() { - _ShouldExit = true; + _ThreadShouldExit = true; _KraitWrapper.UnblockReceiveData(); - PX4_DEBUG("After calling krait_wrapper_unlock_receive_Data...\n"); + PX4_DEBUG("After calling _KraitWrapper.UnblockReceiveData()...\n"); pthread_join(_RecvThread, NULL); - PX4_DEBUG("*** After calling thread wait...\n"); + PX4_DEBUG("*** After calling pthread_join...\n"); _ThreadStarted = false; - _ShouldExit = false; } -void uORB::KraitFastRpcChannel::thread_start(void *handler) +void *uORB::KraitFastRpcChannel::thread_start(void *handler) { if (handler != nullptr) { ((uORB::KraitFastRpcChannel *)handler)->fastrpc_recv_thread(); } + + return 0; } void uORB::KraitFastRpcChannel::fastrpc_recv_thread() { // sit in while loop. int32_t rc = 0; - int32_t type = 0; - char *name = nullptr; int32_t data_length = 0; uint8_t *data = nullptr; + unsigned long rpc_min, rpc_max; + unsigned long orb_min, orb_max; + double rpc_avg, orb_avg; + unsigned long count = 0; + rpc_max = orb_max = 0; + rpc_min = orb_min = 0xFFFFFFFF; + rpc_avg = orb_avg = 0.0; - while (!_ShouldExit) { + int32_t num_topics = 0; + + hrt_abstime check_time = 0; + + while (!_ThreadShouldExit) { + hrt_abstime t1, t2, t3; + t1 = hrt_absolute_time(); // call the fastrpc recv data call. //uorb_fastrpc_recieve( &type, &name_len, name, &data_length, data ); - rc = _KraitWrapper.ReceiveData(&type, &name, &data_length, &data); + //rc = _KraitWrapper.ReceiveData(&type, &name, &data_length, &data); + rc = _KraitWrapper.ReceiveBulkData(&data, &data_length, &num_topics); + + t2 = hrt_absolute_time(); if (rc == 0) { - switch (type) { - case _CONTROL_MSG_TYPE_ADD_SUBSCRIBER: - if (_RxHandler != nullptr) { - _RxHandler->process_add_subscription(name, 1); - PX4_DEBUG("Received add subscriber control message for: [%s]\n", name); + //PX4_DEBUG( "Num of topics Received: %d", num_topics ); + int32_t bytes_processed = 0; + + for (int i = 0; i < num_topics; ++i) { + uint8_t *new_pkt = &(data[bytes_processed]); + struct BulkTransferHeader *header = (struct BulkTransferHeader *)new_pkt; + char *messageName = (char *)(new_pkt + sizeof(struct BulkTransferHeader)); + uint16_t check_msg_len = strlen(messageName); + + if (header->_MsgNameLen != (check_msg_len + 1)) { + PX4_ERR("Error: Packing error. Sent Msg Len. of[%d] but strlen returned:[%d]", header->_MsgNameLen , check_msg_len); + PX4_ERR("Error: NumTopics: %d processing topic: %d msgLen[%d] dataLen[%d] data_len[%d] bytes processed: %d", + num_topics, i, header->_MsgNameLen, header->_DataLen , data_length, bytes_processed); + DumpData(data, data_length, num_topics); + break; } - break; + uint8_t *topic_data = (uint8_t *)(messageName + strlen(messageName) + 1); - case _CONTROL_MSG_TYPE_REMOVE_SUBSCRIBER: if (_RxHandler != nullptr) { - _RxHandler->process_remove_subscription(name); - PX4_DEBUG("Received remove subscriber control message for: [%s]\n", name); - } - - break; - - case _DATA_MSG_TYPE: - if (_RxHandler != nullptr) { - _RxHandler->process_received_message(name, - data_length, data); + _RxHandler->process_received_message(messageName, + header->_DataLen, topic_data); //PX4_DEBUG( "Received topic data for control message for: [%s] len[%d]\n", name, data_length ); } - break; - - default: - // error condition. - break; + bytes_processed += header->_MsgNameLen + header->_DataLen + sizeof(struct BulkTransferHeader); } } else { PX4_DEBUG("Error: Getting data over fastRPC channel\n"); break; } + + t3 = hrt_absolute_time(); + count++; + + if ((unsigned long)(t2 - t1) < rpc_min) { + rpc_min = (unsigned long)(t2 - t1); + } + if ((unsigned long)(t2 - t1) > rpc_max) { + rpc_max = (unsigned long)(t2 - t1); + } + if ((unsigned long)(t3 - t2) < orb_min) { + orb_min = (unsigned long)(t3 - t2); + } + if ((unsigned long)(t3 - t2) > orb_max) { + orb_max = (unsigned long)(t3 - t2); + } + + rpc_avg = ((double)((rpc_avg * (count - 1)) + (unsigned long)(t2 - t1))) / (double)(count); + orb_avg = ((double)((orb_avg * (count - 1)) + (unsigned long)(t3 - t2))) / (double)(count); + + if ((unsigned long)(t3 - check_time) >= 10000000) { + //PX4_DEBUG("Krait RPC Stats : rpc_min: %lu rpc_max: %lu rpc_avg: %f", rpc_min, rpc_max, rpc_avg); + //PX4_DEBUG("Krait RPC(orb) Stats: orb_min: %lu orb_max: %lu orb_avg: %f", orb_min, orb_max, orb_avg); + check_time = t3; + rpc_max = orb_max = 0; + rpc_min = orb_min = 0xFFFFFF; + orb_avg = 0; + rpc_avg = 0; + count = 0; + } + + //PX4_DEBUG("MsgName: %30s, t1: %lu, t2: %lu, t3: %lu, dt1: %lu, dt2: %lu",name, (unsigned long) t1, (unsigned long) t2, (unsigned long) t3, + // (unsigned long) (t2-t1), (unsigned long) (t3-t2)); + } PX4_DEBUG("[uORB::KraitFastRpcChannel::fastrpc_recv_thread] Exiting fastrpc_recv_thread\n"); } +void DumpData(uint8_t *buffer, int32_t length, int32_t num_topics) +{ + FILE *fp = fopen(_log_file_name.c_str(), "a+"); + + if (fp == nullptr) { + PX4_ERR("Error unable to open log file[%s]", _log_file_name.c_str()); + return; + } + + fprintf(fp, "===== Data Len[%d] num_topics[%d] ======\n", length, num_topics); + + for (int i = 0; i < length; i += 16) { + int remaining_chars = length - i; + remaining_chars = (remaining_chars >= 16) ? 16 : remaining_chars; + + fprintf(fp, "%p - ", &(buffer[i])); + + for (int j = 0; j < remaining_chars; j++) { + fprintf(fp, " %02X", buffer[i + j]); + + if (j == 7) { + fprintf(fp, " -"); + } + } + + fprintf(fp, " "); + + for (int j = 0; j < remaining_chars; j++) { + fprintf(fp, "%c", (char)buffer[i + j ]); + } + + fprintf(fp, "\n"); + } + + fclose(fp); +} + diff --git a/src/modules/muorb/krait/uORBKraitFastRpcChannel.hpp b/src/modules/muorb/krait/uORBKraitFastRpcChannel.hpp index 5c61f1e961..d729e6cd7b 100644 --- a/src/modules/muorb/krait/uORBKraitFastRpcChannel.hpp +++ b/src/modules/muorb/krait/uORBKraitFastRpcChannel.hpp @@ -38,7 +38,9 @@ #include #include #include "uORB/uORBCommunicator.hpp" -#include "muorbKraitFastRpcWrapper.hpp" +#include +#include +#include "drivers/drv_hrt.h" namespace uORB { @@ -119,23 +121,31 @@ public: private: // data members static uORB::KraitFastRpcChannel _Instance; uORBCommunicator::IChannelRxHandler *_RxHandler; - pthread_t _RecvThread; + pthread_t _RecvThread; bool _ThreadStarted; - bool _ShouldExit; + bool _ThreadShouldExit; static const int32_t _CONTROL_MSG_TYPE_ADD_SUBSCRIBER = 1; static const int32_t _CONTROL_MSG_TYPE_REMOVE_SUBSCRIBER = 2; static const int32_t _DATA_MSG_TYPE = 3; - muorb::KraitFastRpcWrapper _KraitWrapper; + struct BulkTransferHeader { + uint16_t _MsgNameLen; + uint16_t _DataLen; + }; + px4muorb::KraitRpcWrapper _KraitWrapper; + std::map _AdspSubscriberCache; + std::map _AdspSubscriberSampleTimestamp; + //hrt_abstime _SubCacheSampleTimestamp; + static const hrt_abstime _SubCacheRefreshRate = 1000000; // 1 second; private://class members. /// constructor. KraitFastRpcChannel(); - static void thread_start(void *handler); + static void *thread_start(void *handler); void fastrpc_recv_thread();