mirror of
https://github.com/PX4/PX4-Autopilot.git
synced 2026-06-07 09:13:32 +08:00
Merge pull request #2604 from mcharleb/muorb-updates
muorb: file rename and updates
This commit is contained in:
@@ -39,7 +39,7 @@
|
||||
ifeq ($(PX4_TARGET_OS),qurt)
|
||||
|
||||
SRCS = \
|
||||
muorb_fastrpc.cpp \
|
||||
px4muorb.cpp \
|
||||
uORBFastRpcChannel.cpp
|
||||
|
||||
INCLUDE_DIRS += \
|
||||
|
||||
@@ -30,7 +30,7 @@
|
||||
* POSSIBILITY OF SUCH DAMAGE.
|
||||
*
|
||||
****************************************************************************/
|
||||
#include "muorb_fastrpc.h"
|
||||
#include "px4muorb.hpp"
|
||||
#include "qurt.h"
|
||||
#include "uORBFastRpcChannel.hpp"
|
||||
#include "uORBManager.hpp"
|
||||
@@ -44,19 +44,20 @@
|
||||
#include "uORB/topics/sensor_combined.h"
|
||||
#include "uORB.h"
|
||||
|
||||
#include "HAP_power.h"
|
||||
|
||||
#define _ENABLE_MUORB 1
|
||||
|
||||
__BEGIN_DECLS
|
||||
|
||||
int dspal_main(int argc, const char *argv[]);
|
||||
|
||||
__END_DECLS
|
||||
extern "C" {
|
||||
int dspal_main(int argc, const char *argv[]);
|
||||
};
|
||||
|
||||
|
||||
int muorb_fastrpc_orb_initialize()
|
||||
int px4muorb_orb_initialize()
|
||||
{
|
||||
int rc = 0;
|
||||
PX4_WARN("Before calling dspal_entry() method...");
|
||||
HAP_power_request(100, 100, 1000);
|
||||
// registere the fastrpc muorb with uORBManager.
|
||||
uORB::Manager::get_instance()->set_uorb_communicator(uORB::FastRpcChannel::GetInstance());
|
||||
const char *argv[2] = { "dspal", "start" };
|
||||
@@ -66,7 +67,7 @@ int muorb_fastrpc_orb_initialize()
|
||||
return rc;
|
||||
}
|
||||
|
||||
int muorb_fastrpc_add_subscriber(const char *name)
|
||||
int px4muorb_add_subscriber(const char *name)
|
||||
{
|
||||
int rc = 0;
|
||||
uORB::FastRpcChannel *channel = uORB::FastRpcChannel::GetInstance();
|
||||
@@ -87,7 +88,7 @@ int muorb_fastrpc_add_subscriber(const char *name)
|
||||
return rc;
|
||||
}
|
||||
|
||||
int muorb_fastrpc_remove_subscriber(const char *name)
|
||||
int px4muorb_remove_subscriber(const char *name)
|
||||
{
|
||||
int rc = 0;
|
||||
uORB::FastRpcChannel *channel = uORB::FastRpcChannel::GetInstance();
|
||||
@@ -105,7 +106,7 @@ int muorb_fastrpc_remove_subscriber(const char *name)
|
||||
|
||||
}
|
||||
|
||||
int muorb_fastrpc_send_topic_data(const char *name, const uint8_t *data, int data_len_in_bytes)
|
||||
int px4muorb_send_topic_data(const char *name, const uint8_t *data, int data_len_in_bytes)
|
||||
{
|
||||
int rc = 0;
|
||||
uORB::FastRpcChannel *channel = uORB::FastRpcChannel::GetInstance();
|
||||
@@ -121,7 +122,7 @@ int muorb_fastrpc_send_topic_data(const char *name, const uint8_t *data, int dat
|
||||
return rc;
|
||||
}
|
||||
|
||||
int muorb_fastrpc_is_subscriber_present(const char *topic_name, int *status)
|
||||
int px4muorb_is_subscriber_present(const char *topic_name, int *status)
|
||||
{
|
||||
int rc = 0;
|
||||
int32_t local_status = 0;
|
||||
@@ -135,20 +136,35 @@ int muorb_fastrpc_is_subscriber_present(const char *topic_name, int *status)
|
||||
return rc;
|
||||
}
|
||||
|
||||
int muorb_fastrpc_receive_msg(int *msg_type, char *topic_name, int topic_name_len, uint8_t *data, int data_len_in_bytes,
|
||||
int *bytes_returned)
|
||||
int px4muorb_receive_msg(int *msg_type, char *topic_name, int topic_name_len, uint8_t *data, int data_len_in_bytes,
|
||||
int *bytes_returned)
|
||||
{
|
||||
int rc = 0;
|
||||
int32_t local_msg_type = 0;
|
||||
int32_t local_bytes_returned = 0;
|
||||
uORB::FastRpcChannel *channel = uORB::FastRpcChannel::GetInstance();
|
||||
//PX4_DEBUG( "topic_namePtr: [0x%p] dataPtr: [0x%p]", topic_name, data );
|
||||
rc = channel->get_data(&local_msg_type, topic_name, topic_name_len, data, data_len_in_bytes, &local_bytes_returned);
|
||||
*msg_type = (int)local_msg_type;
|
||||
*bytes_returned = (int)local_bytes_returned;
|
||||
return rc;
|
||||
}
|
||||
|
||||
int muorb_fastrpc_unblock_recieve_msg(void)
|
||||
int px4muorb_receive_bulk_data(uint8_t *bulk_transfer_buffer, int max_size_in_bytes,
|
||||
int *returned_length_in_bytes, int *topic_count)
|
||||
{
|
||||
int rc = 0;
|
||||
int32_t local_bytes_returned = 0;
|
||||
int32_t local_topic_count = 0;
|
||||
uORB::FastRpcChannel *channel = uORB::FastRpcChannel::GetInstance();
|
||||
//PX4_DEBUG( "topic_namePtr: [0x%p] dataPtr: [0x%p]", topic_name, data );
|
||||
rc = channel->get_bulk_data(bulk_transfer_buffer, max_size_in_bytes, &local_bytes_returned, &local_topic_count);
|
||||
*returned_length_in_bytes = (int)local_bytes_returned;
|
||||
*topic_count = (int)local_topic_count;
|
||||
return rc;
|
||||
}
|
||||
|
||||
int px4muorb_unblock_recieve_msg(void)
|
||||
{
|
||||
int rc = 0;
|
||||
uORB::FastRpcChannel *channel = uORB::FastRpcChannel::GetInstance();
|
||||
@@ -35,21 +35,24 @@
|
||||
#include <systemlib/visibility.h>
|
||||
#include <stdint.h>
|
||||
|
||||
__BEGIN_DECLS
|
||||
extern "C" {
|
||||
|
||||
int muorb_fastrpc_orb_initialize() __EXPORT;
|
||||
int px4muorb_orb_initialize() __EXPORT;
|
||||
|
||||
int muorb_fastrpc_add_subscriber(const char *name) __EXPORT;
|
||||
int px4muorb_add_subscriber(const char *name) __EXPORT;
|
||||
|
||||
int muorb_fastrpc_remove_subscriber(const char *name) __EXPORT;
|
||||
int px4muorb_remove_subscriber(const char *name) __EXPORT;
|
||||
|
||||
int muorb_fastrpc_send_topic_data(const char *name, const uint8_t *data, int data_len_in_bytes) __EXPORT;
|
||||
int px4muorb_send_topic_data(const char *name, const uint8_t *data, int data_len_in_bytes) __EXPORT;
|
||||
|
||||
int muorb_fastrpc_is_subscriber_present(const char *topic_name, int *status) __EXPORT;
|
||||
int px4muorb_is_subscriber_present(const char *topic_name, int *status) __EXPORT;
|
||||
|
||||
int muorb_fastrpc_receive_msg(int *msg_type, char *topic_name, int topic_name_len, uint8_t *data, int data_len_in_bytes,
|
||||
int *bytes_returned) __EXPORT;
|
||||
int px4muorb_receive_msg(int *msg_type, char *topic_name, int topic_name_len, uint8_t *data, int data_len_in_bytes,
|
||||
int *bytes_returned) __EXPORT;
|
||||
|
||||
int muorb_fastrpc_unblock_recieve_msg(void) __EXPORT;
|
||||
int px4muorb_receive_bulk_data(uint8_t *_BulkTransferBuffer, int max_size_in_bytes,
|
||||
int *length_in_bytes, int *topic_count) __EXPORT;
|
||||
|
||||
__END_DECLS
|
||||
int px4muorb_unblock_recieve_msg(void) __EXPORT;
|
||||
|
||||
}
|
||||
@@ -33,10 +33,24 @@
|
||||
#include "uORBFastRpcChannel.hpp"
|
||||
#include "px4_log.h"
|
||||
#include <algorithm>
|
||||
#include <drivers/drv_hrt.h>
|
||||
|
||||
// static intialization.
|
||||
uORB::FastRpcChannel uORB::FastRpcChannel::_Instance;
|
||||
|
||||
static hrt_abstime _check_time;
|
||||
static unsigned long _dropped_pkts;
|
||||
static unsigned long _get_min = 0xFFFFFF;
|
||||
static unsigned long _get_max = 0;
|
||||
static unsigned long _min_q = 200;
|
||||
static unsigned long _max_q = 0;
|
||||
static unsigned long _avg_q = 0;
|
||||
static unsigned long _count = 0;
|
||||
static unsigned long _get_bulk_min = 0xFFFFFF;
|
||||
static unsigned long _get_bulk_max = 0;
|
||||
static unsigned long _bulk_topic_count_min = 0xFFFFFF;
|
||||
static unsigned long _bulk_topic_count_max = 0;
|
||||
|
||||
//==============================================================================
|
||||
//==============================================================================
|
||||
uORB::FastRpcChannel::FastRpcChannel()
|
||||
@@ -81,11 +95,11 @@ int16_t uORB::FastRpcChannel::is_subscriber_present(const char *messageName, int
|
||||
|
||||
if (std::find(_Subscribers.begin(), _Subscribers.end(), messageName) != _Subscribers.end()) {
|
||||
*status = 1;
|
||||
PX4_DEBUG("******* Found subscriber for message[%s]....", messageName);
|
||||
//PX4_DEBUG("******* Found subscriber for message[%s]....", messageName);
|
||||
|
||||
} else {
|
||||
*status = 0;
|
||||
PX4_WARN("@@@@@ Subscriber not found for[%s]...numSubscribers[%d]", messageName, _Subscribers.size());
|
||||
//PX4_WARN("@@@@@ Subscriber not found for[%s]...numSubscribers[%d]", messageName, _Subscribers.size());
|
||||
int i = 0;
|
||||
|
||||
for (std::list<std::string>::iterator it = _Subscribers.begin(); it != _Subscribers.end(); ++it) {
|
||||
@@ -120,19 +134,23 @@ int16_t uORB::FastRpcChannel::register_handler(uORBCommunicator::IChannelRxHandl
|
||||
int16_t uORB::FastRpcChannel::send_message(const char *messageName, int32_t length, uint8_t *data)
|
||||
{
|
||||
int16_t rc = 0;
|
||||
hrt_abstime t1, t2;
|
||||
static hrt_abstime check_time = 0;
|
||||
int32_t initial_queue_size = 0;
|
||||
|
||||
if (_RemoteSubscribers.find(messageName) == _RemoteSubscribers.end()) {
|
||||
//there is no-remote subscriber. So do not queue the message.
|
||||
return rc;
|
||||
}
|
||||
|
||||
t1 = hrt_absolute_time();
|
||||
_QueueMutex.lock();
|
||||
bool overwriteData = false;
|
||||
|
||||
if (IsDataQFull()) {
|
||||
// queue is full. Overwrite the oldest data.
|
||||
PX4_WARN("[send_message] Queue Full Overwrite the oldest data. in[%ld] out[%ld] max[%ld]",
|
||||
_DataQInIndex, _DataQOutIndex, _MAX_MSG_QUEUE_SIZE);
|
||||
//PX4_WARN("[send_message] Queue Full Overwrite the oldest data. in[%ld] out[%ld] max[%ld]",
|
||||
// _DataQInIndex, _DataQOutIndex, _MAX_MSG_QUEUE_SIZE);
|
||||
_DataQOutIndex++;
|
||||
|
||||
if (_DataQOutIndex == _MAX_MSG_QUEUE_SIZE) {
|
||||
@@ -140,6 +158,7 @@ int16_t uORB::FastRpcChannel::send_message(const char *messageName, int32_t leng
|
||||
}
|
||||
|
||||
overwriteData = true;
|
||||
_dropped_pkts++;
|
||||
}
|
||||
|
||||
// now check to see if the data queue's buffer size if large enough to memcpy the data.
|
||||
@@ -158,11 +177,30 @@ int16_t uORB::FastRpcChannel::send_message(const char *messageName, int32_t leng
|
||||
}
|
||||
|
||||
// the assumption here is that each caller reads only one data from either control or data queue.
|
||||
if (!overwriteData) {
|
||||
//if (!overwriteData) {
|
||||
if (DataQSize() == 1) { // post it only of the queue moves from empty to available.
|
||||
_DataAvailableSemaphore.post();
|
||||
}
|
||||
|
||||
if ((unsigned long)DataQSize() < _min_q) { _min_q = (unsigned long)DataQSize(); }
|
||||
if ((unsigned long)DataQSize() > _max_q) { _max_q = (unsigned long)DataQSize(); }
|
||||
|
||||
_count++;
|
||||
_avg_q = ((double)((_avg_q * (_count - 1)) + (unsigned long)(DataQSize()))) / (double)(_count);
|
||||
|
||||
_QueueMutex.unlock();
|
||||
t2 = hrt_absolute_time();
|
||||
|
||||
if ((unsigned long)(t2 - check_time) > 10000000) {
|
||||
//PX4_DEBUG("MsgName: %20s, t1: %lu, t2: %lu, dt: %lu",messageName, (unsigned long) t1, (unsigned long) t2, (unsigned long) (t2-t1));
|
||||
//PX4_DEBUG("Q. Stats: min: %lu, max : %lu, avg: %lu count: %lu ", _min_q, _max_q, (unsigned long)(_avg_q * 1000.0), _count);
|
||||
_min_q = _MAX_MSG_QUEUE_SIZE * 2;
|
||||
_max_q = 0;
|
||||
_avg_q = 0;
|
||||
_count = 0;
|
||||
check_time = t2;
|
||||
}
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
@@ -235,8 +273,13 @@ int16_t uORB::FastRpcChannel::get_data
|
||||
)
|
||||
{
|
||||
int16_t rc = 0;
|
||||
PX4_DEBUG("Get data should not be called...");
|
||||
return -1;
|
||||
// wait for data availability
|
||||
static hrt_abstime check_time = 0;
|
||||
hrt_abstime t1 = hrt_absolute_time();
|
||||
_DataAvailableSemaphore.wait();
|
||||
hrt_abstime t2 = hrt_absolute_time();
|
||||
_QueueMutex.lock();
|
||||
|
||||
if (DataQSize() != 0 || ControlQSize() != 0) {
|
||||
@@ -314,5 +357,191 @@ int16_t uORB::FastRpcChannel::get_data
|
||||
}
|
||||
|
||||
_QueueMutex.unlock();
|
||||
hrt_abstime t3 = hrt_absolute_time();
|
||||
|
||||
if ((unsigned long)(t3 - t1) > _get_max) { _get_max = (unsigned long)(t3 - t1); }
|
||||
|
||||
if ((unsigned long)(t3 - t1) < _get_min) { _get_min = (unsigned long)(t3 - t1); }
|
||||
|
||||
if ((unsigned long)(t3 - check_time) > 1000000) {
|
||||
if (rc != 0) {
|
||||
topic_name[0] = '\0';
|
||||
}
|
||||
/*
|
||||
PX4_DEBUG("GetData: %30s: t1: %lu t2: %lu t3: %lu", topic_name, (unsigned long)t1, (unsigned long)t2,
|
||||
(unsigned long)t3);
|
||||
PX4_DEBUG(".... dt1: %7lu dt2: %7lu Q: %d", (unsigned long)(t2 - t1), (unsigned long)(t3 - t2), DataQSize());
|
||||
PX4_DEBUG("ADSP RPC Stats: _get_min: %lu _get_max: %lu _dropped_pkts: %lu", _get_min, _get_max, _dropped_pkts);
|
||||
*/
|
||||
check_time = t3;
|
||||
}
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
|
||||
int16_t uORB::FastRpcChannel::get_bulk_data
|
||||
(
|
||||
uint8_t *buffer,
|
||||
int32_t max_buffer_in_bytes,
|
||||
int32_t *returned_bytes,
|
||||
int32_t *topic_count
|
||||
)
|
||||
{
|
||||
int16_t rc = 0;
|
||||
// wait for data availability
|
||||
static hrt_abstime check_time = 0;
|
||||
hrt_abstime t1 = hrt_absolute_time();
|
||||
_DataAvailableSemaphore.wait();
|
||||
hrt_abstime t2 = hrt_absolute_time();
|
||||
|
||||
_QueueMutex.lock();
|
||||
|
||||
int32_t bytes_copied = 0;
|
||||
int32_t copy_result = 0;
|
||||
*returned_bytes = 0;
|
||||
*topic_count = 0;
|
||||
int32_t topic_count_to_return = 0;
|
||||
|
||||
if (DataQSize() != 0) {
|
||||
//PX4_DEBUG( "get_bulk_data: QSize: %d", DataQSize() );
|
||||
topic_count_to_return = DataQSize();
|
||||
|
||||
while (DataQSize() != 0) {
|
||||
// this is a hack as we are using a counting semaphore. Should be re-implemented with cond_variable and wait.
|
||||
//_DataAvailableSemaphore.wait();
|
||||
if (get_data_msg_size_at(_DataQOutIndex) < (max_buffer_in_bytes - bytes_copied)) {
|
||||
// there is enough space in the buffer, copy the data.
|
||||
//PX4_DEBUG( "Coping Data to buffer..." );
|
||||
copy_result = copy_data_to_buffer(_DataQOutIndex, buffer, bytes_copied, max_buffer_in_bytes);
|
||||
|
||||
if (copy_result == -1) {
|
||||
if (bytes_copied == 0) {
|
||||
rc = -1;
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
} else {
|
||||
//PX4_DEBUG( "[%d] %02x %02x %02x %02x", *topic_count,\
|
||||
// buffer[bytes_copied], \
|
||||
// buffer[bytes_copied+1], \
|
||||
// buffer[bytes_copied+2], \
|
||||
// buffer[bytes_copied+3] );
|
||||
bytes_copied += copy_result;
|
||||
(*topic_count)++;
|
||||
*returned_bytes = bytes_copied;
|
||||
_DataQOutIndex++;
|
||||
|
||||
if (_DataQOutIndex == _MAX_MSG_QUEUE_SIZE) {
|
||||
_DataQOutIndex = 0;
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
if (bytes_copied == 0) {
|
||||
rc = -1;
|
||||
PX4_WARN("ERROR: Insufficent space in data buffer, no topics returned");
|
||||
|
||||
} else {
|
||||
PX4_DEBUG("Exiting out of the while loop...");
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
PX4_ERR("[get_data_bulk] Error: Semaphore is up when there is no data on the control/data queues");
|
||||
rc = -1;
|
||||
}
|
||||
|
||||
if (topic_count_to_return != *topic_count) {
|
||||
PX4_WARN("Not sending all topics: topics_to_return:[%d] topics_returning:[%d]", topic_count_to_return, *topic_count);
|
||||
}
|
||||
|
||||
_QueueMutex.unlock();
|
||||
hrt_abstime t3 = hrt_absolute_time();
|
||||
|
||||
if ((unsigned long)(t3 - t1) > _get_bulk_max) { _get_bulk_max = (unsigned long)(t3 - t1); }
|
||||
if ((unsigned long)(t3 - t1) < _get_bulk_min) { _get_bulk_min = (unsigned long)(t3 - t1); }
|
||||
if ((unsigned long)(*topic_count) > _bulk_topic_count_max) { _bulk_topic_count_max = (unsigned long)(*topic_count); }
|
||||
if ((unsigned long)(*topic_count) < _bulk_topic_count_min) { _bulk_topic_count_min = (unsigned long)(*topic_count); }
|
||||
if ((unsigned long)(t3 - check_time) > 10000000) {
|
||||
//PX4_DEBUG("GetData: t1: %lu t2: %lu t3: %lu", (unsigned long)t1, (unsigned long)t2, (unsigned long)t3);
|
||||
//PX4_DEBUG(".... dt1: %7lu dt2: %7lu Q: %d", (unsigned long)(t2 - t1), (unsigned long)(t3 - t2), DataQSize());
|
||||
//PX4_DEBUG("ADSP RPC Stats: _get_bulk_min: %lu _get_bulk_max: %lu _dropped_pkts: %lu", _get_bulk_min, _get_bulk_max,
|
||||
// _dropped_pkts);
|
||||
//PX4_DEBUG(" .... topic_count_min: %lu topic_count_max: %lu", _bulk_topic_count_min, _bulk_topic_count_max);
|
||||
_get_bulk_max = 0;
|
||||
_get_bulk_min = 0xFFFFFF;
|
||||
_bulk_topic_count_min = 0xFFFFFF;
|
||||
_bulk_topic_count_max = 0;
|
||||
check_time = t3;
|
||||
}
|
||||
|
||||
//PX4_DEBUG( "Returning topics: %d bytes_returned: %d", *topic_count, *returned_bytes );
|
||||
return rc;
|
||||
}
|
||||
|
||||
|
||||
int32_t uORB::FastRpcChannel::get_data_msg_size_at(int32_t index)
|
||||
{
|
||||
// the assumption here is that this is called within the context of semaphore,
|
||||
// hence lock/unlock is not needed.
|
||||
int32_t rc = 0;
|
||||
rc += _DataMsgQueue[ index ]._Length;
|
||||
rc += _DataMsgQueue[ index ]._MsgName.size() + 1;
|
||||
rc += _PACKET_HEADER_SIZE;
|
||||
return rc;
|
||||
}
|
||||
|
||||
|
||||
int32_t uORB::FastRpcChannel::copy_data_to_buffer(int32_t src_index, uint8_t *dst_buffer, int32_t offset,
|
||||
int32_t dst_buffer_len)
|
||||
{
|
||||
int32_t rc = -1;
|
||||
|
||||
// before calling this method the following are assumed:
|
||||
// * sem_lock is acquired for data protection
|
||||
// * the dst_buffer is validated to
|
||||
|
||||
// compute the different offsets to pack the packets.
|
||||
int32_t field_header_offset = offset;
|
||||
int32_t field_topic_name_offset = field_header_offset + sizeof(struct BulkTransferHeader);
|
||||
int32_t field_data_offset = field_topic_name_offset + _DataMsgQueue[ src_index ]._MsgName.size() + 1;
|
||||
|
||||
struct BulkTransferHeader header = { (uint16_t)(_DataMsgQueue[ src_index ]._MsgName.size() + 1), (uint16_t)(_DataMsgQueue[ src_index ]._Length) };
|
||||
|
||||
|
||||
//PX4_DEBUG( "Offsets: header[%d] name[%d] data[%d]",
|
||||
// field_header_offset,
|
||||
// field_topic_name_offset,
|
||||
// field_data_offset );
|
||||
|
||||
if ((field_data_offset + _DataMsgQueue[ src_index ]._Length) < dst_buffer_len) {
|
||||
memmove(&(dst_buffer[field_header_offset]), (char *)(&header), sizeof(header));
|
||||
// pack the data here.
|
||||
memmove
|
||||
(
|
||||
&(dst_buffer[field_topic_name_offset]),
|
||||
_DataMsgQueue[ src_index ]._MsgName.c_str(),
|
||||
_DataMsgQueue[ src_index ]._MsgName.size()
|
||||
);
|
||||
|
||||
if (_DataMsgQueue[ src_index ]._MsgName.size() == 0) {
|
||||
PX4_WARN("########## Error MsgName cannot be zero: ");
|
||||
}
|
||||
|
||||
dst_buffer[ field_topic_name_offset + _DataMsgQueue[ src_index ]._MsgName.size()] = '\0';
|
||||
memmove(&(dst_buffer[field_data_offset]), _DataMsgQueue[ src_index ]._Buffer, _DataMsgQueue[ src_index ]._Length);
|
||||
rc = field_data_offset + _DataMsgQueue[ src_index ]._Length - offset;
|
||||
|
||||
} else {
|
||||
PX4_WARN("Error coping the DataMsg to dst buffer, insuffienct space. ");
|
||||
PX4_WARN("... offset[%d] len[%d] data_msg_len[%d]",
|
||||
offset, dst_buffer_len, (field_data_offset - offset) + _DataMsgQueue[ src_index ]._Length);
|
||||
}
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
@@ -123,6 +123,8 @@ public:
|
||||
int32_t *bytes_returned
|
||||
);
|
||||
|
||||
int16_t get_bulk_data(uint8_t *buffer, int32_t max_size_in_bytes, int32_t *returned_bytes, int32_t *topic_count);
|
||||
|
||||
// function to check if there are subscribers for a topic on adsp.
|
||||
int16_t is_subscriber_present(const char *messageName, int32_t *status);
|
||||
|
||||
@@ -153,6 +155,11 @@ private: // data members
|
||||
static const int32_t _CONTROL_MSG_TYPE_REMOVE_SUBSCRIBER = 2;
|
||||
static const int32_t _DATA_MSG_TYPE = 3;
|
||||
|
||||
static const int32_t _PACKET_FIELD_TOPIC_NAME_LEN_SIZE_IN_BYTES = 2;
|
||||
static const int32_t _PACKET_FIELD_DATA_LEN_IN_BYTES = 2;
|
||||
static const int32_t _PACKET_HEADER_SIZE =
|
||||
_PACKET_FIELD_TOPIC_NAME_LEN_SIZE_IN_BYTES + _PACKET_FIELD_DATA_LEN_IN_BYTES;
|
||||
|
||||
struct FastRpcDataMsg {
|
||||
int32_t _MaxBufferSize;
|
||||
int32_t _Length;
|
||||
@@ -165,6 +172,12 @@ private: // data members
|
||||
std::string _MsgName;
|
||||
};
|
||||
|
||||
struct BulkTransferHeader {
|
||||
uint16_t _MsgNameLen;
|
||||
uint16_t _DataLen;
|
||||
};
|
||||
|
||||
|
||||
struct FastRpcDataMsg _DataMsgQueue[ _MAX_MSG_QUEUE_SIZE ];
|
||||
int32_t _DataQInIndex;
|
||||
int32_t _DataQOutIndex;
|
||||
@@ -245,6 +258,9 @@ private://class members.
|
||||
int32_t DataQSize();
|
||||
int32_t ControlQSize();
|
||||
|
||||
int32_t get_data_msg_size_at(int32_t index);
|
||||
int32_t copy_data_to_buffer(int32_t src_index, uint8_t *dst_buffer, int32_t offset, int32_t dst_buffer_len);
|
||||
|
||||
std::set<std::string> _RemoteSubscribers;
|
||||
};
|
||||
|
||||
|
||||
@@ -44,6 +44,5 @@ INCLUDE_DIRS += $(EXT_MUORB_LIB_ROOT)/krait/include \
|
||||
$(PX4_BASE)/src/modules/uORB \
|
||||
$(PX4_BASE)/src/modules
|
||||
|
||||
EXTRA_LIBS += $(EXT_MUORB_LIB_ROOT)/krait/libs/libmuorb.so
|
||||
EXTRA_LIBS += $(EXT_MUORB_LIB_ROOT)/krait/libs/libpx4muorb.so
|
||||
endif
|
||||
|
||||
|
||||
@@ -33,19 +33,37 @@
|
||||
|
||||
#include "uORBKraitFastRpcChannel.hpp"
|
||||
#include "px4_log.h"
|
||||
|
||||
#include "px4_tasks.h"
|
||||
#include <drivers/drv_hrt.h>
|
||||
#include <cstdio>
|
||||
|
||||
#define LOG_TAG "uORBKraitFastRpcChannel.cpp"
|
||||
|
||||
static void DumpData(uint8_t *buffer, int32_t length, int32_t num_topics);
|
||||
|
||||
static std::string _log_file_name = "./hex_dump.txt";
|
||||
|
||||
// static intialization.
|
||||
uORB::KraitFastRpcChannel uORB::KraitFastRpcChannel::_Instance;
|
||||
|
||||
static unsigned long _snd_msg_min = 0xFFFFFF;
|
||||
static unsigned long _snd_msg_max = 0;
|
||||
static double _snd_msg_avg = 0.0;
|
||||
static unsigned long _snd_msg_count = 0;
|
||||
static unsigned long _overall_snd_min = 0xFFFFFF;
|
||||
static unsigned long _overall_snd_max = 0;
|
||||
static double _overall_snd_avg = 0.0;
|
||||
static unsigned long _overall_snd_count = 0;
|
||||
static hrt_abstime _log_check_time = 0;
|
||||
static hrt_abstime _log_check_interval = 10000000;
|
||||
|
||||
|
||||
//==============================================================================
|
||||
//==============================================================================
|
||||
uORB::KraitFastRpcChannel::KraitFastRpcChannel()
|
||||
: _RxHandler(nullptr)
|
||||
, _ThreadStarted(false)
|
||||
, _ShouldExit(false)
|
||||
, _ThreadShouldExit(false)
|
||||
{
|
||||
_KraitWrapper.Initialize();
|
||||
}
|
||||
@@ -91,17 +109,63 @@ int16_t uORB::KraitFastRpcChannel::send_message(const char *messageName, int32_t
|
||||
// invoke the fast rpc call to send data defined in idl.
|
||||
//PX4_DEBUG( "Before calling send_data for [%s] len[%d]\n", messageName.c_str(), length );
|
||||
int32_t status = 0;
|
||||
hrt_abstime t1, t4;
|
||||
hrt_abstime t2 = 0, t3 = 0;
|
||||
t1 = hrt_absolute_time();
|
||||
|
||||
if (_KraitWrapper.IsSubscriberPresent(messageName, &status) == 0) {
|
||||
if (status > 0) { // there are remote subscribers
|
||||
rc = _KraitWrapper.SendData(messageName, length, data);
|
||||
//PX4_DEBUG( "***** SENDING[%s] topic to remote....\n", messageName.c_str() );
|
||||
|
||||
} else {
|
||||
//PX4_DEBUG( "******* NO SUBSCRIBER PRESENT ON THE REMOTE FOR topic[%s] \n", messageName.c_str() );
|
||||
if (_AdspSubscriberCache.find(std::string(messageName)) == _AdspSubscriberCache.end()) {
|
||||
// check the status from adsp. as it is not cached.
|
||||
if (_KraitWrapper.IsSubscriberPresent(messageName, &status) == 0) {
|
||||
_AdspSubscriberCache[messageName] = status;
|
||||
_AdspSubscriberSampleTimestamp[messageName] = hrt_absolute_time();
|
||||
}
|
||||
|
||||
} else {
|
||||
PX4_ERR("Error returned for KraitWrapper.IsSubscriberPresent(%s)\n", messageName);
|
||||
if ((hrt_absolute_time() - _AdspSubscriberSampleTimestamp[messageName]) > _SubCacheRefreshRate) {
|
||||
if (_KraitWrapper.IsSubscriberPresent(messageName, &status) == 0) {
|
||||
_AdspSubscriberCache[messageName] = status;
|
||||
_AdspSubscriberSampleTimestamp[messageName] = hrt_absolute_time();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (_AdspSubscriberCache[messageName] > 0) {// there are remote subscribers
|
||||
t2 = hrt_absolute_time();
|
||||
rc = _KraitWrapper.SendData(messageName, length, data);
|
||||
t3 = hrt_absolute_time();
|
||||
_snd_msg_count++;
|
||||
//PX4_DEBUG( "***** SENDING[%s] topic to remote....\n", messageName.c_str() );
|
||||
|
||||
} else {
|
||||
//PX4_DEBUG( "******* NO SUBSCRIBER PRESENT ON THE REMOTE FOR topic[%s] \n", messageName.c_str() );
|
||||
}
|
||||
|
||||
t4 = hrt_absolute_time();
|
||||
_overall_snd_count++;
|
||||
|
||||
if ((t4 - t1) < _overall_snd_min) { _overall_snd_min = (t4 - t1); }
|
||||
|
||||
if ((t4 - t1) > _overall_snd_max) { _overall_snd_max = (t4 - t1); }
|
||||
|
||||
if (_AdspSubscriberCache[messageName] > 0) {
|
||||
if ((t3 - t2) < _snd_msg_min) { _snd_msg_min = (t3 - t2); }
|
||||
|
||||
if ((t3 - t2) > _snd_msg_max) { _snd_msg_max = (t3 - t2); }
|
||||
|
||||
_snd_msg_avg = ((double)((_snd_msg_avg * (_snd_msg_count - 1)) + (unsigned long)(t3 - t2))) / (double)(_snd_msg_count);
|
||||
}
|
||||
|
||||
_overall_snd_avg = ((double)((_overall_snd_avg * (_overall_snd_count - 1)) + (unsigned long)(t4 - t1))) / (double)(
|
||||
_overall_snd_count);
|
||||
|
||||
|
||||
|
||||
if ((t4 - _log_check_time) > _log_check_interval) {
|
||||
_log_check_time = t4;
|
||||
_overall_snd_min = _snd_msg_min = 0xFFFFFFF;
|
||||
_overall_snd_max = _snd_msg_max = 0;
|
||||
_overall_snd_count = _snd_msg_count = 0;
|
||||
_overall_snd_avg = _snd_msg_avg = 0.0;
|
||||
}
|
||||
|
||||
//PX4_DEBUG( "Response for SendMessage for [%s],len[%d] rc[%d]\n", messageName.c_str(), length, rc );
|
||||
@@ -111,80 +175,181 @@ int16_t uORB::KraitFastRpcChannel::send_message(const char *messageName, int32_t
|
||||
void uORB::KraitFastRpcChannel::Start()
|
||||
{
|
||||
_ThreadStarted = true;
|
||||
pthread_create(&_RecvThread, NULL, thread_start, this);
|
||||
_ThreadShouldExit = false;
|
||||
pthread_attr_t recv_thread_attr;
|
||||
pthread_attr_init(&recv_thread_attr);
|
||||
|
||||
struct sched_param param;
|
||||
(void)pthread_attr_getschedparam(&recv_thread_attr, ¶m);
|
||||
param.sched_priority = SCHED_PRIORITY_MAX - 80;
|
||||
(void)pthread_attr_setschedparam(&recv_thread_attr, ¶m);
|
||||
|
||||
pthread_attr_setstacksize(&recv_thread_attr, 4096);
|
||||
|
||||
if (pthread_create(&_RecvThread, &recv_thread_attr, thread_start, (void *)this) != 0) {
|
||||
PX4_ERR("Error creating the receive thread for muorb");
|
||||
|
||||
} else {
|
||||
pthread_setname_np(_RecvThread, "muorb_krait_receiver");
|
||||
}
|
||||
|
||||
pthread_attr_destroy(&recv_thread_attr);
|
||||
}
|
||||
|
||||
void uORB::KraitFastRpcChannel::Stop()
|
||||
{
|
||||
_ShouldExit = true;
|
||||
_ThreadShouldExit = true;
|
||||
_KraitWrapper.UnblockReceiveData();
|
||||
PX4_DEBUG("After calling krait_wrapper_unlock_receive_Data...\n");
|
||||
PX4_DEBUG("After calling _KraitWrapper.UnblockReceiveData()...\n");
|
||||
pthread_join(_RecvThread, NULL);
|
||||
PX4_DEBUG("*** After calling thread wait...\n");
|
||||
PX4_DEBUG("*** After calling pthread_join...\n");
|
||||
_ThreadStarted = false;
|
||||
_ShouldExit = false;
|
||||
}
|
||||
|
||||
|
||||
void uORB::KraitFastRpcChannel::thread_start(void *handler)
|
||||
void *uORB::KraitFastRpcChannel::thread_start(void *handler)
|
||||
{
|
||||
if (handler != nullptr) {
|
||||
((uORB::KraitFastRpcChannel *)handler)->fastrpc_recv_thread();
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void uORB::KraitFastRpcChannel::fastrpc_recv_thread()
|
||||
{
|
||||
// sit in while loop.
|
||||
int32_t rc = 0;
|
||||
int32_t type = 0;
|
||||
char *name = nullptr;
|
||||
int32_t data_length = 0;
|
||||
uint8_t *data = nullptr;
|
||||
unsigned long rpc_min, rpc_max;
|
||||
unsigned long orb_min, orb_max;
|
||||
double rpc_avg, orb_avg;
|
||||
unsigned long count = 0;
|
||||
rpc_max = orb_max = 0;
|
||||
rpc_min = orb_min = 0xFFFFFFFF;
|
||||
rpc_avg = orb_avg = 0.0;
|
||||
|
||||
while (!_ShouldExit) {
|
||||
int32_t num_topics = 0;
|
||||
|
||||
hrt_abstime check_time = 0;
|
||||
|
||||
while (!_ThreadShouldExit) {
|
||||
hrt_abstime t1, t2, t3;
|
||||
t1 = hrt_absolute_time();
|
||||
// call the fastrpc recv data call.
|
||||
//uorb_fastrpc_recieve( &type, &name_len, name, &data_length, data );
|
||||
rc = _KraitWrapper.ReceiveData(&type, &name, &data_length, &data);
|
||||
//rc = _KraitWrapper.ReceiveData(&type, &name, &data_length, &data);
|
||||
rc = _KraitWrapper.ReceiveBulkData(&data, &data_length, &num_topics);
|
||||
|
||||
t2 = hrt_absolute_time();
|
||||
|
||||
if (rc == 0) {
|
||||
switch (type) {
|
||||
case _CONTROL_MSG_TYPE_ADD_SUBSCRIBER:
|
||||
if (_RxHandler != nullptr) {
|
||||
_RxHandler->process_add_subscription(name, 1);
|
||||
PX4_DEBUG("Received add subscriber control message for: [%s]\n", name);
|
||||
//PX4_DEBUG( "Num of topics Received: %d", num_topics );
|
||||
int32_t bytes_processed = 0;
|
||||
|
||||
for (int i = 0; i < num_topics; ++i) {
|
||||
uint8_t *new_pkt = &(data[bytes_processed]);
|
||||
struct BulkTransferHeader *header = (struct BulkTransferHeader *)new_pkt;
|
||||
char *messageName = (char *)(new_pkt + sizeof(struct BulkTransferHeader));
|
||||
uint16_t check_msg_len = strlen(messageName);
|
||||
|
||||
if (header->_MsgNameLen != (check_msg_len + 1)) {
|
||||
PX4_ERR("Error: Packing error. Sent Msg Len. of[%d] but strlen returned:[%d]", header->_MsgNameLen , check_msg_len);
|
||||
PX4_ERR("Error: NumTopics: %d processing topic: %d msgLen[%d] dataLen[%d] data_len[%d] bytes processed: %d",
|
||||
num_topics, i, header->_MsgNameLen, header->_DataLen , data_length, bytes_processed);
|
||||
DumpData(data, data_length, num_topics);
|
||||
break;
|
||||
}
|
||||
|
||||
break;
|
||||
uint8_t *topic_data = (uint8_t *)(messageName + strlen(messageName) + 1);
|
||||
|
||||
case _CONTROL_MSG_TYPE_REMOVE_SUBSCRIBER:
|
||||
if (_RxHandler != nullptr) {
|
||||
_RxHandler->process_remove_subscription(name);
|
||||
PX4_DEBUG("Received remove subscriber control message for: [%s]\n", name);
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
case _DATA_MSG_TYPE:
|
||||
if (_RxHandler != nullptr) {
|
||||
_RxHandler->process_received_message(name,
|
||||
data_length, data);
|
||||
_RxHandler->process_received_message(messageName,
|
||||
header->_DataLen, topic_data);
|
||||
//PX4_DEBUG( "Received topic data for control message for: [%s] len[%d]\n", name, data_length );
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
default:
|
||||
// error condition.
|
||||
break;
|
||||
bytes_processed += header->_MsgNameLen + header->_DataLen + sizeof(struct BulkTransferHeader);
|
||||
}
|
||||
|
||||
} else {
|
||||
PX4_DEBUG("Error: Getting data over fastRPC channel\n");
|
||||
break;
|
||||
}
|
||||
|
||||
t3 = hrt_absolute_time();
|
||||
count++;
|
||||
|
||||
if ((unsigned long)(t2 - t1) < rpc_min) {
|
||||
rpc_min = (unsigned long)(t2 - t1);
|
||||
}
|
||||
if ((unsigned long)(t2 - t1) > rpc_max) {
|
||||
rpc_max = (unsigned long)(t2 - t1);
|
||||
}
|
||||
if ((unsigned long)(t3 - t2) < orb_min) {
|
||||
orb_min = (unsigned long)(t3 - t2);
|
||||
}
|
||||
if ((unsigned long)(t3 - t2) > orb_max) {
|
||||
orb_max = (unsigned long)(t3 - t2);
|
||||
}
|
||||
|
||||
rpc_avg = ((double)((rpc_avg * (count - 1)) + (unsigned long)(t2 - t1))) / (double)(count);
|
||||
orb_avg = ((double)((orb_avg * (count - 1)) + (unsigned long)(t3 - t2))) / (double)(count);
|
||||
|
||||
if ((unsigned long)(t3 - check_time) >= 10000000) {
|
||||
//PX4_DEBUG("Krait RPC Stats : rpc_min: %lu rpc_max: %lu rpc_avg: %f", rpc_min, rpc_max, rpc_avg);
|
||||
//PX4_DEBUG("Krait RPC(orb) Stats: orb_min: %lu orb_max: %lu orb_avg: %f", orb_min, orb_max, orb_avg);
|
||||
check_time = t3;
|
||||
rpc_max = orb_max = 0;
|
||||
rpc_min = orb_min = 0xFFFFFF;
|
||||
orb_avg = 0;
|
||||
rpc_avg = 0;
|
||||
count = 0;
|
||||
}
|
||||
|
||||
//PX4_DEBUG("MsgName: %30s, t1: %lu, t2: %lu, t3: %lu, dt1: %lu, dt2: %lu",name, (unsigned long) t1, (unsigned long) t2, (unsigned long) t3,
|
||||
// (unsigned long) (t2-t1), (unsigned long) (t3-t2));
|
||||
|
||||
}
|
||||
|
||||
PX4_DEBUG("[uORB::KraitFastRpcChannel::fastrpc_recv_thread] Exiting fastrpc_recv_thread\n");
|
||||
}
|
||||
|
||||
void DumpData(uint8_t *buffer, int32_t length, int32_t num_topics)
|
||||
{
|
||||
FILE *fp = fopen(_log_file_name.c_str(), "a+");
|
||||
|
||||
if (fp == nullptr) {
|
||||
PX4_ERR("Error unable to open log file[%s]", _log_file_name.c_str());
|
||||
return;
|
||||
}
|
||||
|
||||
fprintf(fp, "===== Data Len[%d] num_topics[%d] ======\n", length, num_topics);
|
||||
|
||||
for (int i = 0; i < length; i += 16) {
|
||||
int remaining_chars = length - i;
|
||||
remaining_chars = (remaining_chars >= 16) ? 16 : remaining_chars;
|
||||
|
||||
fprintf(fp, "%p - ", &(buffer[i]));
|
||||
|
||||
for (int j = 0; j < remaining_chars; j++) {
|
||||
fprintf(fp, " %02X", buffer[i + j]);
|
||||
|
||||
if (j == 7) {
|
||||
fprintf(fp, " -");
|
||||
}
|
||||
}
|
||||
|
||||
fprintf(fp, " ");
|
||||
|
||||
for (int j = 0; j < remaining_chars; j++) {
|
||||
fprintf(fp, "%c", (char)buffer[i + j ]);
|
||||
}
|
||||
|
||||
fprintf(fp, "\n");
|
||||
}
|
||||
|
||||
fclose(fp);
|
||||
}
|
||||
|
||||
|
||||
@@ -38,7 +38,9 @@
|
||||
#include <string>
|
||||
#include <pthread.h>
|
||||
#include "uORB/uORBCommunicator.hpp"
|
||||
#include "muorbKraitFastRpcWrapper.hpp"
|
||||
#include <px4_muorb/px4muorb_KraitRpcWrapper.hpp>
|
||||
#include <map>
|
||||
#include "drivers/drv_hrt.h"
|
||||
|
||||
namespace uORB
|
||||
{
|
||||
@@ -119,23 +121,31 @@ public:
|
||||
private: // data members
|
||||
static uORB::KraitFastRpcChannel _Instance;
|
||||
uORBCommunicator::IChannelRxHandler *_RxHandler;
|
||||
pthread_t _RecvThread;
|
||||
pthread_t _RecvThread;
|
||||
bool _ThreadStarted;
|
||||
bool _ShouldExit;
|
||||
bool _ThreadShouldExit;
|
||||
|
||||
static const int32_t _CONTROL_MSG_TYPE_ADD_SUBSCRIBER = 1;
|
||||
static const int32_t _CONTROL_MSG_TYPE_REMOVE_SUBSCRIBER = 2;
|
||||
static const int32_t _DATA_MSG_TYPE = 3;
|
||||
|
||||
muorb::KraitFastRpcWrapper _KraitWrapper;
|
||||
struct BulkTransferHeader {
|
||||
uint16_t _MsgNameLen;
|
||||
uint16_t _DataLen;
|
||||
};
|
||||
|
||||
px4muorb::KraitRpcWrapper _KraitWrapper;
|
||||
|
||||
std::map<std::string, int32_t> _AdspSubscriberCache;
|
||||
std::map<std::string, hrt_abstime> _AdspSubscriberSampleTimestamp;
|
||||
//hrt_abstime _SubCacheSampleTimestamp;
|
||||
static const hrt_abstime _SubCacheRefreshRate = 1000000; // 1 second;
|
||||
|
||||
private://class members.
|
||||
/// constructor.
|
||||
KraitFastRpcChannel();
|
||||
|
||||
static void thread_start(void *handler);
|
||||
static void *thread_start(void *handler);
|
||||
|
||||
void fastrpc_recv_thread();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user