diff --git a/Tools/check_code_style_all.sh b/Tools/check_code_style_all.sh index f66a039e15d..c8224c0a7bc 100755 --- a/Tools/check_code_style_all.sh +++ b/Tools/check_code_style_all.sh @@ -31,6 +31,7 @@ find \ src/modules/muorb \ src/modules/param \ src/modules/px4iofirmware \ + src/modules/replay \ src/modules/segway \ src/modules/sensors \ src/modules/simulator \ diff --git a/cmake/configs/posix_sitl_default.cmake b/cmake/configs/posix_sitl_default.cmake index 6457ea5a2b1..5721e30d457 100644 --- a/cmake/configs/posix_sitl_default.cmake +++ b/cmake/configs/posix_sitl_default.cmake @@ -52,6 +52,7 @@ set(config_module_list modules/param modules/position_estimator_inav modules/local_position_estimator + modules/replay modules/sdlog2 modules/sensors modules/simulator diff --git a/src/modules/logger/messages.h b/src/modules/logger/messages.h index 10e5d018388..bcbe0157305 100644 --- a/src/modules/logger/messages.h +++ b/src/modules/logger/messages.h @@ -49,14 +49,18 @@ enum class ULogMessageType : uint8_t { /* declare message data structs with byte alignment (no padding) */ #pragma pack(push, 1) -#define ULOG_MSG_HEADER_LEN 3 //accounts for msg_size and msg_type - /** first bytes of the file */ struct ulog_file_header_s { uint8_t magic[8]; uint64_t timestamp; }; +#define ULOG_MSG_HEADER_LEN 3 //accounts for msg_size and msg_type +struct ulog_message_header_s { + uint16_t msg_size; + uint8_t msg_type; +}; + struct ulog_message_format_s { uint16_t msg_size; //size of message - ULOG_MSG_HEADER_LEN uint8_t msg_type = static_cast(ULogMessageType::FORMAT); diff --git a/src/modules/replay/CMakeLists.txt b/src/modules/replay/CMakeLists.txt new file mode 100644 index 00000000000..557cb070f5b --- /dev/null +++ b/src/modules/replay/CMakeLists.txt @@ -0,0 +1,44 @@ +############################################################################ +# +# Copyright (c) 2016 PX4 Development Team. All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# 1. Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# 2. Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in +# the documentation and/or other materials provided with the +# distribution. +# 3. Neither the name PX4 nor the names of its contributors may be +# used to endorse or promote products derived from this software +# without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS +# OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED +# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. +# +############################################################################# +px4_add_module( + MODULE modules__replay + MAIN replay + COMPILE_FLAGS + STACK_MAIN 1000 + STACK_MAX 4000 + SRCS + replay_main.cpp + DEPENDS + platforms__common + ) +# vim: set noet ft=cmake fenc=utf-8 ff=unix : diff --git a/src/modules/replay/definitions.hpp b/src/modules/replay/definitions.hpp new file mode 100644 index 00000000000..1b728aa9946 --- /dev/null +++ b/src/modules/replay/definitions.hpp @@ -0,0 +1,45 @@ +/**************************************************************************** + * + * Copyright (c) 2016 PX4 Development Team. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * 3. Neither the name PX4 nor the names of its contributors may be + * used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS + * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE + * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS + * OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED + * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN + * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * + ****************************************************************************/ + +#pragma once + +namespace px4 +{ +namespace replay +{ + +static const char *ENV_FILENAME = "replay"; ///< name for getenv() + + +} //namespace replay +} //namespace px4 diff --git a/src/modules/replay/replay.hpp b/src/modules/replay/replay.hpp new file mode 100644 index 00000000000..78315dde6a9 --- /dev/null +++ b/src/modules/replay/replay.hpp @@ -0,0 +1,152 @@ +/**************************************************************************** + * + * Copyright (c) 2016 PX4 Development Team. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * 3. Neither the name PX4 nor the names of its contributors may be + * used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS + * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE + * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS + * OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED + * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN + * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * + ****************************************************************************/ + +#pragma once + +#include +#include +#include +#include +#include + +#include "definitions.hpp" + +#include + +namespace px4 +{ + +/** + * @class Replay + * Parses an ULog file and replays it in 'real-time'. The timestamp of each replayed message is offset + * to match the starting time of replay. It keeps a stream for each subscription to find the next message + * to replay. This is necessary because data messages from different subscriptions don't need to be in + * monotonic increasing order. + */ +class Replay +{ +public: + Replay(); + + /// Destructor, also waits for task exit + ~Replay(); + + /// Start task. + /// @param quiet silently fail if no log file found + /// @return OK on success. + static int start(bool quiet); + + static void task_main_trampoline(int argc, char *argv[]); + + void task_main(); + + /** + * Tell the replay module that we want to use replay mode. + * After that, only 'replay start' must be executed (typically the last step after startup). + * @param file_name file name of the used log replay file. Will be copied. + */ + static void setupReplayFile(const char *file_name); + + static bool isSetup() { return _replay_file; } +private: + bool _task_should_exit = false; + std::set _overridden_params; + std::map _file_formats; ///< all formats we read from the file + + uint64_t _file_start_time; + uint64_t _replay_start_time; + std::streampos _data_section_start; ///< first ADD_LOGGED_MSG message + std::vector _read_buffer; + + struct Subscription { + + const orb_metadata *orb_meta = nullptr; ///< if nullptr, this subscription is invalid + orb_advert_t orb_advert = nullptr; + uint8_t multi_id; + int timestamp_offset; ///< marks the field of the timestamp + + std::streampos next_read_pos; + uint64_t next_timestamp; ///< timestamp of the file + }; + std::vector _subscriptions; + + /** keep track of file position to avoid adding a subscription multiple times. */ + std::streampos _subscription_file_pos = 0; + + bool readFileHeader(std::ifstream &file); + + /** + * Read definitions section: check formats, apply parameters and store + * the start of the data section. + * @return true on success + */ + bool readFileDefinitions(std::ifstream &file); + + ///file parsing methods. They return false, when further parsing should be aborted. + bool readFormat(std::ifstream &file, uint16_t msg_size); + bool readAndAddSubscription(std::ifstream &file, uint16_t msg_size); + + /** + * Read and handle additional messages starting at current file position, while position < end_position. + * This handles dropout and parameter update messages. + * We need to handle these separately, because they have no timestamp. We look at the file position instead. + * @return false on file error + */ + bool readAndHandleAdditionalMessages(std::ifstream &file, std::streampos end_position); + bool readDropout(std::ifstream &file, uint16_t msg_size); + bool readAndApplyParameter(std::ifstream &file, uint16_t msg_size); + + /** + * Find next data message for this subscription, starting with the stored file offset. + * Skip the first message, and if found, read the timestamp and store the new file offset. + * This also takes care of new subscriptions and parameter updates. When reaching EOF, + * the subscription is set to invalid. + * File seek position is arbitrary after this call. + * @return false on file error + */ + bool nextDataMessage(std::ifstream &file, Subscription &subscription, int msg_id); + + static const orb_metadata *findTopic(const std::string &name); + /** get the array size from a type. eg. float[3] -> return float */ + static std::string extractArraySize(const std::string &type_name_full, int &array_size); + /** get the size of a type that is not an array */ + static size_t sizeOfType(const std::string &type_name); + /** get the size of a type that can be an array */ + static size_t sizeOfFullType(const std::string &type_name_full); + + void setUserParams(const char *filename); + + static char *_replay_file; +}; + +} //namespace px4 diff --git a/src/modules/replay/replay_main.cpp b/src/modules/replay/replay_main.cpp new file mode 100644 index 00000000000..cf91f5a3aaa --- /dev/null +++ b/src/modules/replay/replay_main.cpp @@ -0,0 +1,861 @@ +/**************************************************************************** + * + * Copyright (c) 2016 PX4 Development Team. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * 3. Neither the name PX4 nor the names of its contributors may be + * used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS + * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE + * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS + * OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED + * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN + * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * + ****************************************************************************/ + +/** + * @file replay_main.cpp + * This module reads messages from an ULog file and publishes them. + * It sets the parameters from the log file and handles user-defined + * parameter overrides. + * + * @author Beat Kueng +*/ + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "replay.hpp" + +#define PARAMS_OVERRIDE_FILE PX4_ROOTFSDIR "/replay_params.txt" + + +extern "C" __EXPORT int replay_main(int argc, char *argv[]); + +using namespace std; + +namespace px4 +{ +class Replay; + +namespace replay +{ +Replay *instance = nullptr; +static int control_task = -1; //task handle for task + +} //namespace replay + + +char *Replay::_replay_file = nullptr; + +Replay::Replay() +{ +} + +Replay::~Replay() +{ + if (replay::control_task != -1) { + _task_should_exit = true; + + /* wait for a second for the task to quit at our request */ + unsigned int i = 0; + + do { + usleep(20000); + + /* if we have given up, kill it */ + if (++i > 200) { + px4_task_delete(replay::control_task); + replay::control_task = -1; + break; + } + } while (replay::control_task != -1); + } +} + +void Replay::setupReplayFile(const char *file_name) +{ + if (_replay_file) { + free(_replay_file); + } + + _replay_file = strdup(file_name); +} + + + +void Replay::setUserParams(const char *filename) +{ + string line, param_name, value_string; + ifstream myfile(filename); + + if (!myfile.is_open()) { + return; + } + + PX4_INFO("Applying override params from %s...", filename); + + while (!myfile.eof()) { + getline(myfile, line); + + if (line.empty() || line[0] == '#') { + continue; + } + + istringstream mystrstream(line); + mystrstream >> param_name; + mystrstream >> value_string; + + double param_value_double = stod(value_string); + + param_t handle = param_find(param_name.c_str()); + param_type_t param_format = param_type(handle); + _overridden_params.insert(param_name); + + if (param_format == PARAM_TYPE_INT32) { + int32_t value = 0; + value = (int32_t)param_value_double; + param_set(handle, (const void *)&value); + + } else if (param_format == PARAM_TYPE_FLOAT) { + float value = 0; + value = (float)param_value_double; + param_set(handle, (const void *)&value); + } + } +} + +bool Replay::readFileHeader(std::ifstream &file) +{ + file.seekg(0); + ulog_file_header_s msg_header; + file.read((char *)&msg_header, sizeof(msg_header)); + + if (!file) { + return false; + } + + _file_start_time = msg_header.timestamp; + //verify it's an ULog file + char magic[8]; + magic[0] = 'U'; + magic[1] = 'L'; + magic[2] = 'o'; + magic[3] = 'g'; + magic[4] = 0x01; + magic[5] = 0x12; + magic[6] = 0x35; + return memcmp(magic, msg_header.magic, 7) == 0; +} + +bool Replay::readFileDefinitions(std::ifstream &file) +{ + PX4_INFO("Applying params from ULog file..."); + + ulog_message_header_s message_header; + file.seekg(sizeof(ulog_file_header_s)); + + while (true) { + file.read((char *)&message_header, ULOG_MSG_HEADER_LEN); + + if (!file) { + return false; + } + + switch (message_header.msg_type) { + case (int)ULogMessageType::FORMAT: + if (!readFormat(file, message_header.msg_size)) { + return false; + } + + break; + + case (int)ULogMessageType::PARAMETER: + if (!readAndApplyParameter(file, message_header.msg_size)) { + return false; + } + + break; + + case (int)ULogMessageType::ADD_LOGGED_MSG: + _data_section_start = file.tellg() - (streamoff)ULOG_MSG_HEADER_LEN; + return true; + + case (int)ULogMessageType::INFO: //skip + file.seekg(message_header.msg_size, ios::cur); + break; + + default: + PX4_ERR("unknown log definition type %i, size %i (offset %i)", + (int)message_header.msg_type, (int)message_header.msg_size, (int)file.tellg()); + file.seekg(message_header.msg_size, ios::cur); + break; + } + } + + return true; +} + +bool Replay::readFormat(std::ifstream &file, uint16_t msg_size) +{ + _read_buffer.reserve(msg_size + 1); + char *format = (char *)_read_buffer.data(); + file.read(format, msg_size); + format[msg_size] = 0; + + if (!file) { + return false; + } + + string str_format(format); + size_t pos = str_format.find(':'); + + if (pos == string::npos) { + return false; + } + + string name = str_format.substr(0, pos); + string fields = str_format.substr(pos + 1); + _file_formats[name] = fields; + + return true; +} + +bool Replay::readAndAddSubscription(std::ifstream &file, uint16_t msg_size) +{ + _read_buffer.reserve(msg_size + 1); + char *message = (char *)_read_buffer.data(); + streampos this_message_pos = file.tellg() - (streamoff)ULOG_MSG_HEADER_LEN; + file.read(message, msg_size); + message[msg_size] = 0; + + if (!file) { + return false; + } + + if (file.tellg() <= _subscription_file_pos) { //already read this subscription + return true; + } + + _subscription_file_pos = file.tellg(); + + uint8_t multi_id = *(uint8_t *)message; + uint16_t msg_id = ((uint16_t) message[1]) | (((uint16_t) message[2]) << 8); + string topic_name(message + 3); + const orb_metadata *orb_meta = findTopic(topic_name); + + if (!orb_meta) { + PX4_WARN("Topic %s not found internally. Will ignore it", topic_name.c_str()); + return true; + } + + //check the format: the field definitions must match + //FIXME: this should check recursively, all used nested types + string file_format = _file_formats[topic_name]; + + if (file_format != orb_meta->o_fields) { + PX4_WARN("Formats for %s don't match. Will ignore it.", topic_name.c_str()); + PX4_WARN(" Internal format: %s", orb_meta->o_fields); + PX4_WARN(" File format : %s", file_format.c_str()); + } + + Subscription subscription; + subscription.orb_meta = orb_meta; + subscription.multi_id = multi_id; + + + //find the timestamp offset (not necessarily the first field) + string fields = orb_meta->o_fields; + size_t prev_field_end = 0; + size_t field_end = fields.find(';'); + bool timestamp_found = false; + subscription.timestamp_offset = 0; + + while (field_end != string::npos && !timestamp_found) { + size_t space_pos = fields.find(' ', prev_field_end); + + if (space_pos != string::npos) { + string type_name_full = fields.substr(prev_field_end, space_pos - prev_field_end); + string field_name = fields.substr(space_pos + 1, field_end - space_pos - 1); + + if (field_name == "timestamp") { + timestamp_found = true; + + if (type_name_full != "uint64_t") { + PX4_ERR("Unsupported timestamp type %s, ignoring the topic %s", type_name_full, + orb_meta->o_name); + return true; + } + + } else { + subscription.timestamp_offset += sizeOfFullType(type_name_full); + } + } + + prev_field_end = field_end + 1; + field_end = fields.find(';', prev_field_end); + } + + if (!timestamp_found) { + return true; + } + + + //find first data message (and the timestamp) + streampos cur_pos = file.tellg(); + subscription.next_read_pos = this_message_pos; //this will be skipped + + if (!nextDataMessage(file, subscription, msg_id)) { + return false; + } + + file.seekg(cur_pos); + + if (!subscription.orb_meta) { + //no message found. This is not a fatal error + return true; + } + + PX4_DEBUG("adding subscription for %s (msg_id %i)", subscription.orb_meta->o_name, msg_id); + + //add subscription + if (_subscriptions.size() <= msg_id) { + _subscriptions.resize(msg_id + 1); + } + + _subscriptions[msg_id] = subscription; + + return true; +} + +bool Replay::readAndHandleAdditionalMessages(std::ifstream &file, std::streampos end_position) +{ + ulog_message_header_s message_header; + + while (file.tellg() < end_position) { + file.read((char *)&message_header, ULOG_MSG_HEADER_LEN); + + if (!file) { + return false; + } + + switch (message_header.msg_type) { + case (int)ULogMessageType::PARAMETER: + if (!readAndApplyParameter(file, message_header.msg_size)) { + return false; + } + + break; + + case (int)ULogMessageType::DROPOUT: + readDropout(file, message_header.msg_size); + break; + + default: //skip all others + file.seekg(message_header.msg_size, ios::cur); + break; + } + } + + return true; +} + +bool Replay::readAndApplyParameter(std::ifstream &file, uint16_t msg_size) +{ + _read_buffer.reserve(msg_size); + uint8_t *message = (uint8_t *)_read_buffer.data(); + file.read((char *)message, msg_size); + + if (!file) { + return false; + } + + uint8_t key_len = message[0]; + string key((char *)message + 1, key_len); + + size_t pos = key.find(' '); + + if (pos == string::npos) { + return false; + } + + string type = key.substr(0, pos); + string param_name = key.substr(pos + 1); + + if (_overridden_params.find(param_name) != _overridden_params.end()) { + //this parameter is overridden, so don't apply it + return true; + } + + if (type != "int32_t" && type != "float") { + PX4_WARN("unknown parameter type %s, name %s (ignoring it)", type.c_str(), param_name.c_str()); + return true; + } + + param_t handle = param_find(param_name.c_str()); + + if (handle != PARAM_INVALID) { + param_set(handle, (const void *)(message + 1 + key_len)); + } + + return true; +} + +bool Replay::readDropout(std::ifstream &file, uint16_t msg_size) +{ + uint16_t duration; + file.read((char *)&duration, sizeof(duration)); + + PX4_INFO("Dropout in replayed log, %i ms", (int)duration); + return file.good(); +} + +bool Replay::nextDataMessage(std::ifstream &file, Subscription &subscription, int msg_id) +{ + ulog_message_header_s message_header; + file.seekg(subscription.next_read_pos); + //ignore the first message (it's data we already read) + file.read((char *)&message_header, ULOG_MSG_HEADER_LEN); + + if (file) { + file.seekg(message_header.msg_size, ios::cur); + } + + uint16_t file_msg_id; + bool done = false; + + while (file && !done) { + streampos cur_pos = file.tellg(); + file.read((char *)&message_header, ULOG_MSG_HEADER_LEN); + + if (!file) { + break; + } + + switch (message_header.msg_type) { + case (int)ULogMessageType::ADD_LOGGED_MSG: + readAndAddSubscription(file, message_header.msg_size); + break; + + case (int)ULogMessageType::DATA: + file.read((char *)&file_msg_id, sizeof(file_msg_id)); + + if (file) { + if (msg_id == file_msg_id) { + if (message_header.msg_size == subscription.orb_meta->o_size_no_padding + 2) { + subscription.next_read_pos = cur_pos; + file.seekg(subscription.timestamp_offset, ios::cur); + file.read((char *)&subscription.next_timestamp, sizeof(subscription.next_timestamp)); + done = true; + + } else { //sanity check failed! + PX4_ERR("data message %s has wrong size %i (expected %i). Skipping", + subscription.orb_meta->o_name, message_header.msg_size, + subscription.orb_meta->o_size_no_padding + 2); + file.seekg(message_header.msg_size - sizeof(file_msg_id), ios::cur); + } + + } else { //not the one we are looking for + file.seekg(message_header.msg_size - sizeof(file_msg_id), ios::cur); + } + } + + break; + + case (int)ULogMessageType::REMOVE_LOGGED_MSG: //skip these + case (int)ULogMessageType::PARAMETER: + case (int)ULogMessageType::DROPOUT: + case (int)ULogMessageType::SYNC: + case (int)ULogMessageType::LOGGING: + file.seekg(message_header.msg_size, ios::cur); + break; + + default: + //this really should not happen + PX4_ERR("unknown log message type %i, size %i (offset %i)", + (int)message_header.msg_type, (int)message_header.msg_size, (int)file.tellg()); + file.seekg(message_header.msg_size, ios::cur); + break; + } + } + + if (file.eof()) { //no more data messages for this subscription + subscription.orb_meta = nullptr; + file.clear(); + } + + return file.good(); +} + +const orb_metadata *Replay::findTopic(const std::string &name) +{ + const orb_metadata **topics = orb_get_topics(); + + for (size_t i = 0; i < orb_topics_count(); i++) { + if (name == topics[i]->o_name) { + return topics[i]; + } + } + + return nullptr; +} + +std::string Replay::extractArraySize(const std::string &type_name_full, int &array_size) +{ + size_t start_pos = type_name_full.find('['); + size_t end_pos = type_name_full.find(']'); + + if (start_pos == string::npos || end_pos == string::npos) { + array_size = 1; + return type_name_full; + } + + array_size = atoi(type_name_full.substr(start_pos + 1, end_pos - start_pos - 1).c_str()); + return type_name_full.substr(0, start_pos); +} + +size_t Replay::sizeOfType(const std::string &type_name) +{ + if (type_name == "int8_t" || type_name == "uint8_t") { + return 1; + + } else if (type_name == "int16_t" || type_name == "uint16_t") { + return 2; + + } else if (type_name == "int32_t" || type_name == "uint32_t") { + return 4; + + } else if (type_name == "int64_t" || type_name == "uint64_t") { + return 8; + + } else if (type_name == "float") { + return 4; + + } else if (type_name == "double") { + return 8; + + } else if (type_name == "char" || type_name == "bool") { + return 1; + } + + const orb_metadata *orb_meta = findTopic(type_name); + + if (orb_meta) { + return orb_meta->o_size; + } + + PX4_ERR("unknown type: %s", type_name.c_str()); + return 0; +} +size_t Replay::sizeOfFullType(const std::string &type_name_full) +{ + int array_size; + string type_name = extractArraySize(type_name_full, array_size); + return sizeOfType(type_name) * array_size; +} + +void Replay::task_main() +{ + // log reader currently assumes little endian + int num = 1; + + if (*(char *)&num != 1) { + PX4_ERR("Replay only works on little endian!"); + return; + } + + ifstream replay_file(_replay_file, ios::in | ios::binary); + + if (!replay_file.is_open()) { + PX4_ERR("Failed to open replay file"); + return; + } + + if (!readFileHeader(replay_file)) { + PX4_ERR("Failed to read file header. Not a valid ULog file"); + return; + } + + //initialize the formats and apply the parameters from the log file + if (!readFileDefinitions(replay_file)) { + PX4_ERR("Failed to read ULog definitions section. Broken file?"); + return; + } + + setUserParams(PARAMS_OVERRIDE_FILE); + + _replay_start_time = hrt_absolute_time(); + + PX4_INFO("Replay in progress..."); + + ulog_message_header_s message_header; + replay_file.seekg(_data_section_start); + + //we know the next message must be an ADD_LOGGED_MSG + replay_file.read((char *)&message_header, ULOG_MSG_HEADER_LEN); + + if (!readAndAddSubscription(replay_file, message_header.msg_size)) { + PX4_ERR("Failed to read subscription"); + return; + } + + + //we update the timestamps from the file by a constant offset to match + //the current replay time + const uint64_t timestamp_offset = _replay_start_time - _file_start_time; + uint32_t nr_published_messages = 0; + streampos last_additional_message_pos = _data_section_start; + + while (!_task_should_exit && replay_file) { + + //Find the next message to publish. Messages from different subscriptions don't need + //to be in chronological order, so we need to check all subscriptions + uint64_t next_file_time = 0; + int next_msg_id = -1; + + for (size_t i = 0; i < _subscriptions.size(); ++i) { + const Subscription &subscription = _subscriptions[i]; + + if (subscription.orb_meta) { + if (next_file_time == 0 || subscription.next_timestamp < next_file_time) { + next_msg_id = (int)i; + next_file_time = subscription.next_timestamp; + } + } + } + + if (next_msg_id == -1) { + break; //no active subscription anymore. We're done. + } + + Subscription &sub = _subscriptions[next_msg_id]; + + if (next_file_time == 0) { + //someone didn't set the timestamp properly. Consider the message invalid + nextDataMessage(replay_file, sub, next_msg_id); + continue; + } + + + //handle additional messages between last and next published data + replay_file.seekg(last_additional_message_pos); + streampos next_additional_message_pos = sub.next_read_pos; + readAndHandleAdditionalMessages(replay_file, next_additional_message_pos); + last_additional_message_pos = next_additional_message_pos; + + + //wait if necessary + const uint64_t publish_timestamp = next_file_time + timestamp_offset; + uint64_t cur_time = hrt_absolute_time(); + + if (cur_time < publish_timestamp) { + usleep(publish_timestamp - cur_time); + } + + //It's time to publish + const size_t msg_read_size = sub.orb_meta->o_size_no_padding; + const size_t msg_write_size = sub.orb_meta->o_size; + _read_buffer.reserve(msg_write_size); + replay_file.seekg(sub.next_read_pos + (streamoff)(ULOG_MSG_HEADER_LEN + 2)); //skip header & msg id + replay_file.read((char *)_read_buffer.data(), msg_read_size); + *(uint64_t *)(_read_buffer.data() + sub.timestamp_offset) = publish_timestamp; + + if (sub.orb_advert) { + orb_publish(sub.orb_meta, sub.orb_advert, _read_buffer.data()); + ++nr_published_messages; + + } else { + if (sub.multi_id == 0) { + sub.orb_advert = orb_advertise(sub.orb_meta, _read_buffer.data()); + ++nr_published_messages; + + } else { + // make sure the other instances are advertised already so that we get the correct instance + bool advertised = false; + + for (const auto &subscription : _subscriptions) { + if (subscription.orb_meta) { + if (strcmp(sub.orb_meta->o_name, subscription.orb_meta->o_name) == 0 && + subscription.orb_advert && subscription.multi_id == sub.multi_id - 1) { + advertised = true; + } + } + } + + if (advertised) { + int instance; + sub.orb_advert = orb_advertise_multi(sub.orb_meta, _read_buffer.data(), + &instance, ORB_PRIO_DEFAULT); + ++nr_published_messages; + } + } + } + + + nextDataMessage(replay_file, _subscriptions[next_msg_id], next_msg_id); + + //TODO: output status (eg. every sec), including total duration... + } + + for (auto &subscription : _subscriptions) { + if (subscription.orb_advert) { + orb_unadvertise(subscription.orb_advert); + subscription.orb_advert = nullptr; + } + } + + if (!_task_should_exit) { + PX4_INFO("Replay done (published %u msgs, %.3lf s)", nr_published_messages, + (double)hrt_elapsed_time(&_replay_start_time) / 1.e6); + + //TODO: should we close the log file & exit (optionally, by adding a parameter -q) ? + } +} + +void Replay::task_main_trampoline(int argc, char *argv[]) +{ + replay::instance = new Replay(); + + if (replay::instance == nullptr) { + PX4_ERR("alloc failed"); + return; + } + + replay::instance->task_main(); + replay::control_task = -1; +} + +int Replay::start(bool quiet) +{ + ASSERT(replay::control_task == -1); + + //check for logfile env variable + const char *logfile = getenv(replay::ENV_FILENAME); + + if (logfile) { + PX4_INFO("using replay log file: %s", logfile); + setupReplayFile(logfile); + + } else { + if (quiet) { + return OK; + + } else { + PX4_ERR("no log file given (via env variable %s)", replay::ENV_FILENAME); + return -1; + } + } + + /* start the task */ + replay::control_task = px4_task_spawn_cmd("replay", + SCHED_DEFAULT, + SCHED_PRIORITY_MAX - 5, + 4000, + (px4_main_t)&Replay::task_main_trampoline, + nullptr); + + if (replay::control_task < 0) { + replay::control_task = -1; + PX4_ERR("task start failed"); + return -errno; + } + + return OK; +} + +} //namespace px4 + +using namespace px4; + +int replay_main(int argc, char *argv[]) +{ + if (argc < 1) { + PX4_WARN("usage: replay {trystart|start|stop|status}"); + return 1; + } + + bool do_start = false; + bool quiet = false; + + if (!strcmp(argv[1], "start")) { + do_start = true; + + } else if (!strcmp(argv[1], "trystart")) { + do_start = true; + quiet = true; + } + + if (do_start) { + if (replay::instance != nullptr) { + PX4_WARN("already running"); + return 1; + } + + if (OK != replay::instance->start(quiet)) { + PX4_ERR("start failed"); + return 1; + } + + return 0; + } + + if (!strcmp(argv[1], "stop")) { + if (replay::instance == nullptr) { + PX4_WARN("not running"); + return 1; + } + + delete replay::instance; + replay::instance = nullptr; + + return 0; + } + + if (!strcmp(argv[1], "status")) { + if (replay::instance) { + PX4_WARN("running"); + return 0; + + } else { + PX4_WARN("not running"); + return 1; + } + } + + PX4_ERR("unrecognized command"); + return 1; +}