diff --git a/src/drivers/protocol_splitter/protocol_splitter.cpp b/src/drivers/protocol_splitter/protocol_splitter.cpp index 19164904b4b..1464606228f 100644 --- a/src/drivers/protocol_splitter/protocol_splitter.cpp +++ b/src/drivers/protocol_splitter/protocol_splitter.cpp @@ -1,6 +1,6 @@ /**************************************************************************** * - * Copyright (c) 2016 PX4 Development Team. All rights reserved. + * Copyright (c) 2016-2021 PX4 Development Team. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions @@ -33,8 +33,11 @@ /** * @file protocol_splitter.cpp - * NuttX Driver to multiplex mavlink and RTPS on a single serial port. - * Makes sure the two protocols can be read & written simultanously by 2 processes. + * + * NuttX Driver to multiplex MAVLink and RTPS on a single serial port. + * Makes sure the two protocols can be read & written simultaneously by two + * processes. + * * It will create two devices: * /dev/mavlink * /dev/rtps @@ -43,13 +46,18 @@ #include #include #include +#include #include #include #include #include +#include #include -#include +#include + + +static constexpr uint64_t reader_timeout_us = 1000000; class Mavlink2Dev; class RtpsDev; @@ -58,28 +66,27 @@ class ReadBuffer; extern "C" __EXPORT int protocol_splitter_main(int argc, char *argv[]); /* -MessageType is in MSB of header[1] - | - v - Mavlink 0000 0000b - Rtps 1000 0000b + MessageType is in MSB of header[1] + | + v + Mavlink 0000 0000b + Rtps 1000 0000b */ -enum MessageType {Mavlink = 0x00, Rtps = 0x01}; +enum class MessageType : uint8_t {Mavlink = 0x00, Rtps = 0x01}; -const char Sp2HeaderMagic = 'S'; -const int Sp2HeaderSize = 4; +constexpr char Sp2HeaderMagic = 'S'; +constexpr int Sp2HeaderSize = 4; /* -Header Structure: + Header Structure: - bits: 1 2 3 4 5 6 7 8 -header[0] - | Magic | -header[1] - |T| LenH | -header[2] - | LenL | -header[3] - | Checksum | + bits: 1 2 3 4 5 6 7 8 + header[0] - | Magic | + header[1] - |T| LenH | + header[2] - | LenL | + header[3] - | Checksum | */ -typedef union __attribute__((packed)) -{ +union __attribute__((packed)) Sp2Header { uint8_t bytes[4]; struct { char magic; // 'S' @@ -88,7 +95,7 @@ typedef union __attribute__((packed)) uint8_t len_l; // Length LSB uint8_t checksum; // XOR of the three bytes above } fields; -} Sp2Header_t; +}; struct StaticData { Mavlink2Dev *mavlink2; @@ -111,24 +118,34 @@ public: void copy(void *dest, size_t pos, size_t n); void remove(size_t pos, size_t n); - uint8_t buffer[512] = {}; + void print_stats(); + + uint8_t buffer[1024] = {}; size_t buf_size = 0; - static const size_t BUFFER_THRESHOLD = sizeof(buffer) * 0.8; + // We keep track of the first Mavlink and Rtps packet in the buffer. + // If start and end are equal there is no packet. + size_t start_mavlink = 0; + size_t end_mavlink = 0; + size_t start_rtps = 0; + size_t end_rtps = 0; + // Just for stats. + size_t mavlink_parsed = 0; + size_t rtps_parsed = 0; + size_t bytes_received = 0; + size_t header_bytes_received = 0; + + // To keep track of readers. + hrt_abstime last_mavlink_read = 0; + hrt_abstime last_rtps_read = 0; }; int ReadBuffer::read(int fd) { - /* Discard whole buffer if it's filled beyond a threshold, - * This should prevent buffer being filled by garbage that - * no reader (MAVLink or RTPS) can understand. - * - * TODO: a better approach would be checking if both reader - * start understanding messages beyond a certain buffer size, - * meaning that everything before is garbage. - */ - if (buf_size > BUFFER_THRESHOLD) { - buf_size = 0; + if (sizeof(buffer) == buf_size) { + // This happens if one consumer does not read the data, or not fast enough. + // TODO: add a mechanism to thrown away data if a user is no longer reading. + PX4_DEBUG("Buffer full: %zu %zu %zu %zu", start_mavlink, end_mavlink, start_rtps, end_rtps); } int r = ::read(fd, buffer + buf_size, sizeof(buffer) - buf_size); @@ -138,6 +155,7 @@ int ReadBuffer::read(int fd) } buf_size += r; + bytes_received += r; return r; } @@ -148,7 +166,7 @@ void ReadBuffer::copy(void *dest, size_t pos, size_t n) ASSERT(pos + n <= buf_size); if (dest) { - memmove(dest, buffer + pos, n); // send desired data + memcpy(dest, buffer + pos, n); } } @@ -157,14 +175,52 @@ void ReadBuffer::remove(size_t pos, size_t n) ASSERT(pos < buf_size); ASSERT(pos + n <= buf_size); - memmove(buffer + pos, buffer + (pos + n), sizeof(buffer) - pos - n); + memmove(buffer + pos, buffer + (pos + n), buf_size - pos - n); buf_size -= n; } +void ReadBuffer::print_stats() +{ + PX4_INFO_RAW("\tReceived:\n"); + PX4_INFO_RAW("\tTotal: %9zu bytes\n", + bytes_received); + PX4_INFO_RAW("\tHeaders: %9zu bytes (%5.1f %%)\n", + header_bytes_received, + static_cast(static_cast(header_bytes_received) + / static_cast(bytes_received) + * 100.f)); + PX4_INFO_RAW("\tMAVLink: %9zu bytes (%5.1f %%)\n", + mavlink_parsed, + static_cast(static_cast(mavlink_parsed) + / static_cast(bytes_received - header_bytes_received) + * 100.f)); + PX4_INFO_RAW("\tRTPS: %9zu bytes (%5.1f %%)\n", + rtps_parsed, + static_cast(static_cast(rtps_parsed) + / static_cast(bytes_received - header_bytes_received) + * 100.f)); + + size_t lost = bytes_received - mavlink_parsed - rtps_parsed - header_bytes_received; + + if (end_mavlink > start_mavlink) { + lost -= end_mavlink - start_mavlink; + } + + if (end_rtps > start_rtps) { + lost -= end_rtps - start_rtps; + } + + PX4_INFO_RAW("\tUnused: %9zu bytes (%5.1f %%)\n", lost, + static_cast(static_cast(lost) + / static_cast(bytes_received) + * 100.f)); +} + + class DevCommon : public cdev::CDev { public: - DevCommon(const char *device_path); + DevCommon(const char *device_path, ReadBuffer *read_buffer); virtual ~DevCommon(); virtual int ioctl(struct file *filp, int cmd, unsigned long arg); @@ -176,10 +232,12 @@ public: protected: - Sp2Header_t _header; - virtual pollevent_t poll_state(struct file *filp); + int try_to_copy_data(char *buffer, size_t buflen, MessageType message_type); + void scan_for_packets(); + void check_for_timeouts(); + void cleanup(); void lock(enum Operation op) { @@ -201,6 +259,8 @@ protected: int _fd = -1; + Sp2Header _header; + uint16_t _packet_len; enum class ParserState : uint8_t { Idle = 0, @@ -208,20 +268,19 @@ protected: }; ParserState _parser_state = ParserState::Idle; - bool _had_data = false; ///< whether poll() returned available data - -private: + ReadBuffer *_read_buffer; }; -DevCommon::DevCommon(const char *device_path) +DevCommon::DevCommon(const char *device_path, ReadBuffer *read_buffer) : CDev(device_path) + , _read_buffer(read_buffer) { } DevCommon::~DevCommon() { if (_fd >= 0) { - /* discard all pending data, as close() might block otherwise on NuttX with flow control enabled */ + // discard all pending data, as close() might block otherwise on NuttX with flow control enabled tcflush(_fd, TCIOFLUSH); ::close(_fd); } @@ -229,8 +288,8 @@ DevCommon::~DevCommon() int DevCommon::ioctl(struct file *filp, int cmd, unsigned long arg) { - //pretend we have enough space left to write, so mavlink will not drop data and throw off - //our parsing state + // pretend we have enough space left to write, so mavlink will not drop data and throw off + // our parsing state if (cmd == FIONSPACE) { *(int *)arg = 1024; return 0; @@ -242,8 +301,16 @@ int DevCommon::ioctl(struct file *filp, int cmd, unsigned long arg) int DevCommon::open(file *filp) { _fd = ::open(objects->device_name, O_RDWR | O_NOCTTY); + + if (_fd < 0) { + PX4_ERR("open failed: %s", strerror(errno)); + return -1; + } + CDev::open(filp); - return _fd >= 0 ? 0 : -1; + + + return 0; } int DevCommon::close(file *filp) @@ -269,125 +336,257 @@ pollevent_t DevCommon::poll_state(struct file *filp) * the _fd in here or by overriding some other method. */ - int ret = ::poll(fds, sizeof(fds) / sizeof(fds[0]), 100); - _had_data = ret > 0 && (fds[0].revents & POLLIN); + ::poll(fds, sizeof(fds) / sizeof(fds[0]), 100); return POLLIN; } +int DevCommon::try_to_copy_data(char *buffer, size_t buflen, MessageType message_type) +{ + if (buflen == 0) { + return -1; + } + + switch (message_type) { + case MessageType::Mavlink: + if (_read_buffer->start_mavlink < _read_buffer->end_mavlink) { + // We have Mavlink data ready to send. + const size_t len_available = _read_buffer->end_mavlink - _read_buffer->start_mavlink; + // We can only send what fits in the callers buffer. + const size_t len_to_copy = math::min(len_available, buflen); + + // Copy it to the callers buffer and remove it from our buffer. + _read_buffer->copy(buffer, _read_buffer->start_mavlink, len_to_copy); + + // Shift the markers accordingly. + _read_buffer->start_mavlink += len_to_copy; + + // Keep track for stats. + _read_buffer->mavlink_parsed += len_to_copy; + return len_to_copy; + + } else { + return -1; + } + + case MessageType::Rtps: + if (_read_buffer->start_rtps < _read_buffer->end_rtps) { + // We have Rtps data ready to send + const size_t len_available = _read_buffer->end_rtps - _read_buffer->start_rtps; + // We can only send what fits in the callers buffer. + const size_t len_to_copy = math::min(len_available, buflen); + + // Copy it to the callers buffer and remove it from our buffer. + _read_buffer->copy(buffer, _read_buffer->start_rtps, len_to_copy); + + // Shift the markers accordingly. + _read_buffer->start_rtps += len_to_copy; + + // Keep track for stats. + _read_buffer->rtps_parsed += len_to_copy; + return len_to_copy; + + } else { + return -1; + } + + break; + + + default: + return -1; + } +} + +void DevCommon::scan_for_packets() +{ + if (_read_buffer->buf_size < Sp2HeaderSize) { + // We have not even one header in the buffer, no need to scan yet. + return; + } + + bool mavlink_available = (_read_buffer->start_mavlink < _read_buffer->end_mavlink); + bool rtps_available = (_read_buffer->start_rtps < _read_buffer->end_rtps); + + if (mavlink_available && rtps_available) { + // We still have data for both, no need to scan yet. + return; + } + + const size_t begin = math::min(_read_buffer->end_mavlink, _read_buffer->end_rtps); + + for (size_t i = begin; i < _read_buffer->buf_size - Sp2HeaderSize; /* ++i */) { + + const Sp2Header *header = reinterpret_cast(&_read_buffer->buffer[i]); + + if (header->fields.magic != Sp2HeaderMagic) { + // Not the magic byte that we're looking for. + ++i; + continue; + } + + const uint8_t checksum = (_read_buffer->buffer[i] ^ _read_buffer->buffer[i + 1] ^ _read_buffer->buffer[i + 2]); + + if (header->fields.checksum != checksum) { + // Checksum failed. + ++i; + continue; + } + + if (header->fields.type != static_cast(MessageType::Mavlink) && + header->fields.type != static_cast(MessageType::Rtps)) { + // Ignore unknown protocols + ++i; + continue; + } + + const size_t payload_len = ((uint16_t)header->fields.len_h << 8) | header->fields.len_l; + + if (payload_len > sizeof(_read_buffer->buffer)) { + // This can happen if by accident data matches the header including checksum. + // Given we skip most data using the last payload_len, we should not see this too often, + // unless the link is very lossy and we often have to re-sync. + PX4_DEBUG("payload size %zu > buffer size %zu: %d, protocol: %s", + payload_len, sizeof(_read_buffer->buffer), + (header->fields.type == static_cast(MessageType::Mavlink)) ? "Mavlink" : "Rtps"); + ++i; + continue; + } + + if (i + Sp2HeaderSize + payload_len > _read_buffer->buf_size) { + // We don't have a enough data in the buffer yet, try again later. + break; + } + + if (header->fields.type == static_cast(MessageType::Mavlink) && !mavlink_available) { + _read_buffer->start_mavlink = i + Sp2HeaderSize; + _read_buffer->end_mavlink = _read_buffer->start_mavlink + payload_len; + mavlink_available = true; + + // Overwrite header magic byte, so we don't parse them again. + _read_buffer->buffer[i] = 0; + _read_buffer->header_bytes_received += Sp2HeaderSize; + + } else if (header->fields.type == static_cast(MessageType::Rtps) && !rtps_available) { + _read_buffer->start_rtps = i + Sp2HeaderSize; + _read_buffer->end_rtps = _read_buffer->start_rtps + payload_len; + rtps_available = true; + + // Overwrite header magic byte, so we don't parse them again. + _read_buffer->buffer[i] = 0; + _read_buffer->header_bytes_received += Sp2HeaderSize; + } + + if (mavlink_available && rtps_available) { + // Both have at least one message ready, we can stop now. + break; + } + + i += payload_len; + + } +} + + +void DevCommon::check_for_timeouts() +{ + // If a reader has timed out, mark its data as read. + + if (hrt_elapsed_time(&_read_buffer->last_mavlink_read) > reader_timeout_us) { + if (_read_buffer->start_mavlink < _read_buffer->end_mavlink) { + _read_buffer->start_mavlink = _read_buffer->end_mavlink; + } + } + + if (hrt_elapsed_time(&_read_buffer->last_rtps_read) > reader_timeout_us) { + if (_read_buffer->start_rtps < _read_buffer->end_rtps) { + _read_buffer->start_rtps = _read_buffer->end_rtps; + } + } +} + +void DevCommon::cleanup() +{ + const bool mavlink_available = (_read_buffer->start_mavlink < _read_buffer->end_mavlink); + const bool rtps_available = (_read_buffer->start_rtps < _read_buffer->end_rtps); + + // Clean up garbage bytes and accumulated headers + + size_t garbage_end = 0; + + if (!mavlink_available && !rtps_available) { + garbage_end = math::max(_read_buffer->start_mavlink, _read_buffer->start_rtps); + + } else { + garbage_end = math::min(_read_buffer->start_mavlink, _read_buffer->start_rtps); + } + + if (garbage_end > 0) { + _read_buffer->remove(0, garbage_end); + + _read_buffer->start_mavlink -= math::min(garbage_end, _read_buffer->start_mavlink); + _read_buffer->end_mavlink -= math::min(garbage_end, _read_buffer->end_mavlink); + _read_buffer->start_rtps -= math::min(garbage_end, _read_buffer->start_rtps); + _read_buffer->end_rtps -= math::min(garbage_end, _read_buffer->end_rtps); + } +} + + class Mavlink2Dev : public DevCommon { public: - Mavlink2Dev(ReadBuffer *_read_buffer); + Mavlink2Dev(ReadBuffer *read_buffer); virtual ~Mavlink2Dev() {} virtual ssize_t read(struct file *filp, char *buffer, size_t buflen); virtual ssize_t write(struct file *filp, const char *buffer, size_t buflen); - -protected: - ReadBuffer *_read_buffer; - size_t _remaining_partial = 0; - size_t _partial_start = 0; - uint8_t _partial_buffer[512] = {}; }; Mavlink2Dev::Mavlink2Dev(ReadBuffer *read_buffer) - : DevCommon("/dev/mavlink") - , _read_buffer{read_buffer} + : DevCommon("/dev/mavlink", read_buffer) { _header.fields.magic = Sp2HeaderMagic; _header.fields.len_h = 0; _header.fields.len_l = 0; _header.fields.checksum = 0; - _header.fields.type = MessageType::Mavlink; + _header.fields.type = static_cast(MessageType::Mavlink); } ssize_t Mavlink2Dev::read(struct file *filp, char *buffer, size_t buflen) { - int i, ret; - uint16_t packet_len, payload_len; - Sp2Header_t *header; - - /* last reading was partial (i.e., buffer didn't fit whole message), - * so now we'll just send remaining bytes */ - if (_remaining_partial > 0) { - size_t len = _remaining_partial; - - if (buflen < len) { - len = buflen; - } - - memmove(buffer, _partial_buffer + _partial_start, len); - _partial_start += len; - _remaining_partial -= len; - - if (_remaining_partial == 0) { - _partial_start = 0; - } - - return len; - } - - if (!_had_data) { - return 0; - } + _read_buffer->last_mavlink_read = hrt_absolute_time(); lock(Read); + + // The cleanup needs to be right after a scan, so we don't clean up + // something that we haven't found yet. + scan_for_packets(); + check_for_timeouts(); + cleanup(); + + // If we have already a packet ready in the current buffer, we don't have + // to read and can grab data straightaway. + int ret = try_to_copy_data(buffer, buflen, MessageType::Mavlink); + + if (ret > 0) { + unlock(Read); + return ret; + } + + // Otherwise, we have to do a read. ret = _read_buffer->read(_fd); if (ret < 0) { - goto end; + unlock(Read); + return ret; } - ret = 0; + // Now we need to check again if there is data available. + scan_for_packets(); - if (_read_buffer->buf_size < Sp2HeaderSize) { - goto end; - } + // And try to copy it out. + ret = try_to_copy_data(buffer, buflen, MessageType::Mavlink); - // Search for a mavlink packet on buffer to send it - i = 0; - - while ((unsigned)i < (_read_buffer->buf_size - Sp2HeaderSize) && - (((Sp2Header_t *) &_read_buffer->buffer[i])->fields.magic != Sp2HeaderMagic - || ((Sp2Header_t *) &_read_buffer->buffer[i])->fields.type != (uint8_t)MessageType::Mavlink - || ((Sp2Header_t *) &_read_buffer->buffer[i])->fields.checksum != - (_read_buffer->buffer[i] ^ _read_buffer->buffer[i + 1] ^ _read_buffer->buffer[i + 2]) - )) { - i++; - } - - // We need at least the first six bytes to get packet len - if ((unsigned)i >= _read_buffer->buf_size - Sp2HeaderSize) { - goto end; - } - - header = (Sp2Header_t *)&_read_buffer->buffer[i]; - payload_len = ((uint16_t)header->fields.len_h << 8) | header->fields.len_l; - packet_len = payload_len + Sp2HeaderSize; - - // packet is bigger than what we've read, better luck next time - if ((unsigned)i + packet_len > _read_buffer->buf_size) { - goto end; - } - - /* if buffer doesn't fit message, send what's possible and copy remaining - * data into a temporary buffer on this class */ - if (payload_len > buflen) { - _read_buffer->copy(buffer, i + Sp2HeaderSize, buflen); - _read_buffer->copy(_partial_buffer, i + Sp2HeaderSize + buflen, payload_len - buflen); - _read_buffer->remove(i, packet_len); - _remaining_partial = payload_len - buflen; - ret = buflen; - goto end; - } - - _read_buffer->copy(buffer, i + Sp2HeaderSize, payload_len); - _read_buffer->remove(i, packet_len); - - ret = payload_len; - -end: unlock(Read); return ret; } @@ -466,89 +665,57 @@ ssize_t Mavlink2Dev::write(struct file *filp, const char *buffer, size_t buflen) class RtpsDev : public DevCommon { public: - RtpsDev(ReadBuffer *_read_buffer); + RtpsDev(ReadBuffer *read_buffer); virtual ~RtpsDev() {} virtual ssize_t read(struct file *filp, char *buffer, size_t buflen); virtual ssize_t write(struct file *filp, const char *buffer, size_t buflen); protected: - ReadBuffer *_read_buffer; - static const uint8_t HEADER_SIZE = 9; }; RtpsDev::RtpsDev(ReadBuffer *read_buffer) - : DevCommon("/dev/rtps") - , _read_buffer{read_buffer} + : DevCommon("/dev/rtps", read_buffer) { _header.fields.magic = Sp2HeaderMagic; _header.fields.len_h = 0; _header.fields.len_l = 0; _header.fields.checksum = 0; - _header.fields.type = MessageType::Rtps; + _header.fields.type = static_cast(MessageType::Rtps); } ssize_t RtpsDev::read(struct file *filp, char *buffer, size_t buflen) { - int i, ret; - uint16_t packet_len, payload_len; - Sp2Header_t *header; - - if (!_had_data) { - return 0; - } + _read_buffer->last_rtps_read = hrt_absolute_time(); lock(Read); + + scan_for_packets(); + check_for_timeouts(); + cleanup(); + + // If we have already a packet ready in the current buffer, we don't have to read. + int ret = try_to_copy_data(buffer, buflen, MessageType::Rtps); + + if (ret > 0) { + unlock(Read); + return ret; + } + + // Otherwise, we have to do a read. ret = _read_buffer->read(_fd); if (ret < 0) { - goto end; + unlock(Read); + return ret; } - ret = 0; + scan_for_packets(); - if (_read_buffer->buf_size < Sp2HeaderSize) { - goto end; - } + // And check again. + ret = try_to_copy_data(buffer, buflen, MessageType::Rtps); - // Search for a rtps packet on buffer to send it - i = 0; - - while ((unsigned)i < (_read_buffer->buf_size - Sp2HeaderSize) && - (((Sp2Header_t *) &_read_buffer->buffer[i])->fields.magic != Sp2HeaderMagic - || ((Sp2Header_t *) &_read_buffer->buffer[i])->fields.type != (uint8_t)MessageType::Rtps - || ((Sp2Header_t *) &_read_buffer->buffer[i])->fields.checksum != - (_read_buffer->buffer[i] ^ _read_buffer->buffer[i + 1] ^ _read_buffer->buffer[i + 2]) - )) { - i++; - } - - // We need at least the first six bytes to get packet len - if ((unsigned)i >= _read_buffer->buf_size - Sp2HeaderSize) { - goto end; - } - - header = (Sp2Header_t *)&_read_buffer->buffer[i]; - payload_len = ((uint16_t)header->fields.len_h << 8) | header->fields.len_l; - packet_len = payload_len + Sp2HeaderSize; - - // packet is bigger than what we've read, better luck next time - if ((unsigned)i + packet_len > _read_buffer->buf_size) { - goto end; - } - - // buffer should be big enough to hold a rtps packet - if (packet_len > buflen) { - ret = -EMSGSIZE; - goto end; - } - - _read_buffer->copy(buffer, i + Sp2HeaderSize, payload_len); - _read_buffer->remove(i, packet_len); - ret = payload_len; - -end: unlock(Read); return ret; } @@ -623,7 +790,7 @@ int protocol_splitter_main(int argc, char *argv[]) */ if (!strcmp(argv[1], "start")) { if (objects) { - PX4_ERR("already running"); + PX4_WARN("already running"); return 1; } @@ -681,6 +848,13 @@ int protocol_splitter_main(int argc, char *argv[]) if (objects) { PX4_INFO("running"); + if (sem_wait(&objects->r_lock) != 0) { + return -1; + } + + objects->read_buffer->print_stats(); + sem_post(&objects->r_lock); + } else { PX4_INFO("not running"); }