mirror of
https://github.com/PX4/PX4-Autopilot.git
synced 2026-06-01 11:06:04 +08:00
Send RTPS header and payload in one stream
This avoids assembling the header and payload on the receiver side
This commit is contained in:
@@ -79,6 +79,7 @@ void* send(void* /*unused*/)
|
|||||||
uint64_t sent = 0, total_sent = 0;
|
uint64_t sent = 0, total_sent = 0;
|
||||||
int loop = 0, read = 0;
|
int loop = 0, read = 0;
|
||||||
uint32_t length = 0;
|
uint32_t length = 0;
|
||||||
|
uint16_t header_length = 0;
|
||||||
|
|
||||||
/* subscribe to topics */
|
/* subscribe to topics */
|
||||||
int fds[@(len(send_topics))] = {};
|
int fds[@(len(send_topics))] = {};
|
||||||
@@ -91,7 +92,8 @@ void* send(void* /*unused*/)
|
|||||||
|
|
||||||
// microBuffer to serialized using the user defined buffer
|
// microBuffer to serialized using the user defined buffer
|
||||||
struct microBuffer microBufferWriter;
|
struct microBuffer microBufferWriter;
|
||||||
initStaticAlignedBuffer(data_buffer, BUFFER_SIZE, µBufferWriter);
|
header_length=transport_node->get_header_length();
|
||||||
|
initStaticAlignedBuffer(&data_buffer[header_length], BUFFER_SIZE-header_length, µBufferWriter);
|
||||||
// microCDR structs for managing the microBuffer
|
// microCDR structs for managing the microBuffer
|
||||||
struct microCDR microCDRWriter;
|
struct microCDR microCDRWriter;
|
||||||
initMicroCDR(µCDRWriter, µBufferWriter);
|
initMicroCDR(µCDRWriter, µBufferWriter);
|
||||||
@@ -110,7 +112,9 @@ void* send(void* /*unused*/)
|
|||||||
struct @(topic)_s data;
|
struct @(topic)_s data;
|
||||||
// copy raw data into local buffer
|
// copy raw data into local buffer
|
||||||
if (orb_copy(ORB_ID(@(topic)), fds[@(idx)], &data) == 0) {
|
if (orb_copy(ORB_ID(@(topic)), fds[@(idx)], &data) == 0) {
|
||||||
serialize_@(topic)(&data, data_buffer, &length, µCDRWriter);
|
/* payload is shifted by header length to make room for header*/
|
||||||
|
serialize_@(topic)(&data, &data_buffer[header_length], &length, µCDRWriter);
|
||||||
|
|
||||||
if (0 < (read = transport_node->write((char)@(message_id(topic)), data_buffer, length)))
|
if (0 < (read = transport_node->write((char)@(message_id(topic)), data_buffer, length)))
|
||||||
{
|
{
|
||||||
total_sent += read;
|
total_sent += read;
|
||||||
|
|||||||
@@ -193,12 +193,14 @@ void t_send(void *data)
|
|||||||
// Send subscribed topics over UART
|
// Send subscribed topics over UART
|
||||||
while (topics.hasMsg(&topic_ID))
|
while (topics.hasMsg(&topic_ID))
|
||||||
{
|
{
|
||||||
eprosima::fastcdr::FastBuffer cdrbuffer(data_buffer, sizeof(data_buffer));
|
uint16_t header_length = get_header_length();
|
||||||
|
/* make room for the header to fill in later */
|
||||||
|
eprosima::fastcdr::FastBuffer cdrbuffer(&data_buffer[header_length], sizeof(data_buffer)-header_length);
|
||||||
eprosima::fastcdr::Cdr scdr(cdrbuffer);
|
eprosima::fastcdr::Cdr scdr(cdrbuffer);
|
||||||
if (topics.getMsg(topic_ID, scdr))
|
if (topics.getMsg(topic_ID, scdr))
|
||||||
{
|
{
|
||||||
length = scdr.getSerializedDataLength();
|
length = scdr.getSerializedDataLength();
|
||||||
if (0 < (length = transport_node->write(topic_ID, scdr.getBufferPointer(), length)))
|
if (0 < (length = transport_node->write(topic_ID, data_buffer, length)))
|
||||||
{
|
{
|
||||||
total_sent += length;
|
total_sent += length;
|
||||||
++sent;
|
++sent;
|
||||||
|
|||||||
@@ -35,6 +35,7 @@
|
|||||||
#include <stdio.h>
|
#include <stdio.h>
|
||||||
#include <errno.h>
|
#include <errno.h>
|
||||||
#include <sys/socket.h>
|
#include <sys/socket.h>
|
||||||
|
#include <cstdlib>
|
||||||
|
|
||||||
#include "microRTPS_transport.h"
|
#include "microRTPS_transport.h"
|
||||||
|
|
||||||
@@ -193,6 +194,11 @@ ssize_t Transport_node::read(uint8_t *topic_ID, char out_buffer[], size_t buffer
|
|||||||
return len;
|
return len;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ssize_t Transport_node::get_header_length()
|
||||||
|
{
|
||||||
|
return sizeof(struct Header);
|
||||||
|
}
|
||||||
|
|
||||||
ssize_t Transport_node::write(const uint8_t topic_ID, char buffer[], size_t length)
|
ssize_t Transport_node::write(const uint8_t topic_ID, char buffer[], size_t length)
|
||||||
{
|
{
|
||||||
if (!fds_OK()) {
|
if (!fds_OK()) {
|
||||||
@@ -214,7 +220,7 @@ ssize_t Transport_node::write(const uint8_t topic_ID, char buffer[], size_t leng
|
|||||||
|
|
||||||
// [>,>,>,topic_ID,seq,payload_length,CRCHigh,CRCLow,payload_start, ... ,payload_end]
|
// [>,>,>,topic_ID,seq,payload_length,CRCHigh,CRCLow,payload_start, ... ,payload_end]
|
||||||
|
|
||||||
uint16_t crc = crc16((uint8_t *)buffer, length);
|
uint16_t crc = crc16((uint8_t *)&buffer[sizeof(header)], length);
|
||||||
|
|
||||||
header.topic_ID = topic_ID;
|
header.topic_ID = topic_ID;
|
||||||
header.seq = seq++;
|
header.seq = seq++;
|
||||||
@@ -223,18 +229,13 @@ ssize_t Transport_node::write(const uint8_t topic_ID, char buffer[], size_t leng
|
|||||||
header.crc_h = (crc >> 8) & 0xff;
|
header.crc_h = (crc >> 8) & 0xff;
|
||||||
header.crc_l = crc & 0xff;
|
header.crc_l = crc & 0xff;
|
||||||
|
|
||||||
ssize_t len = node_write(&header, sizeof(header));
|
/* Headroom for header is created in client */
|
||||||
|
/*Fill in the header in the same payload buffer to call a single node_write */
|
||||||
if (len != sizeof(header)) {
|
memcpy(buffer, &header, sizeof(header));
|
||||||
|
ssize_t len = node_write(buffer, length + sizeof(header));
|
||||||
|
if (len != ssize_t(length + sizeof(header))) {
|
||||||
goto err;
|
goto err;
|
||||||
}
|
}
|
||||||
|
|
||||||
len = node_write(buffer, length);
|
|
||||||
|
|
||||||
if (len != ssize_t(length)) {
|
|
||||||
goto err;
|
|
||||||
}
|
|
||||||
|
|
||||||
return len + sizeof(header);
|
return len + sizeof(header);
|
||||||
|
|
||||||
err:
|
err:
|
||||||
|
|||||||
@@ -46,6 +46,8 @@ public:
|
|||||||
virtual uint8_t close() {return 0;}
|
virtual uint8_t close() {return 0;}
|
||||||
ssize_t read(uint8_t *topic_ID, char out_buffer[], size_t buffer_len);
|
ssize_t read(uint8_t *topic_ID, char out_buffer[], size_t buffer_len);
|
||||||
ssize_t write(const uint8_t topic_ID, char buffer[], size_t length);
|
ssize_t write(const uint8_t topic_ID, char buffer[], size_t length);
|
||||||
|
/* Get the Length of struct Header to make headroom for the size of struct Header alongwith payload*/
|
||||||
|
ssize_t get_header_length();
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
virtual ssize_t node_read(void *buffer, size_t len) = 0;
|
virtual ssize_t node_read(void *buffer, size_t len) = 0;
|
||||||
|
|||||||
Reference in New Issue
Block a user