mirror of
https://github.com/PX4/PX4-Autopilot.git
synced 2026-06-02 03:49:12 +08:00
replay: some API refactoring & extensions
in preparation to the updated ekf2 replay
This commit is contained in:
@@ -3,5 +3,5 @@ uorb start
|
|||||||
ekf2 start --replay
|
ekf2 start --replay
|
||||||
logger start -e -t -b 1000 -p vehicle_attitude
|
logger start -e -t -b 1000 -p vehicle_attitude
|
||||||
sleep 0.2
|
sleep 0.2
|
||||||
replay start -m ekf2
|
replay start
|
||||||
|
|
||||||
|
|||||||
@@ -93,6 +93,8 @@ protected:
|
|||||||
uint8_t multi_id;
|
uint8_t multi_id;
|
||||||
int timestamp_offset; ///< marks the field of the timestamp
|
int timestamp_offset; ///< marks the field of the timestamp
|
||||||
|
|
||||||
|
bool ignored = false; ///< if true, it will not be considered for publication in the main loop
|
||||||
|
|
||||||
std::streampos next_read_pos;
|
std::streampos next_read_pos;
|
||||||
uint64_t next_timestamp; ///< timestamp of the file
|
uint64_t next_timestamp; ///< timestamp of the file
|
||||||
};
|
};
|
||||||
@@ -114,6 +116,11 @@ protected:
|
|||||||
*/
|
*/
|
||||||
virtual void onExitMainLoop() {};
|
virtual void onExitMainLoop() {};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* called when a new subscription is added
|
||||||
|
*/
|
||||||
|
virtual void onSubscriptionAdded(Subscription &sub, uint16_t msg_id) {};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* handle delay until topic can be published.
|
* handle delay until topic can be published.
|
||||||
* @param next_file_timestamp timestamp of next message to publish
|
* @param next_file_timestamp timestamp of next message to publish
|
||||||
@@ -126,7 +133,25 @@ protected:
|
|||||||
* handle the publication of a topic update
|
* handle the publication of a topic update
|
||||||
* @return true if published, false otherwise
|
* @return true if published, false otherwise
|
||||||
*/
|
*/
|
||||||
virtual bool handleTopicUpdate(Subscription &sub, void *data);
|
virtual bool handleTopicUpdate(Subscription &sub, void *data, std::ifstream &replay_file);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* read a topic from the file (offset given by the subscription) into _read_buffer
|
||||||
|
*/
|
||||||
|
void readTopicDataToBuffer(const Subscription &sub, std::ifstream &replay_file);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Find next data message for this subscription, starting with the stored file offset.
|
||||||
|
* Skip the first message, and if found, read the timestamp and store the new file offset.
|
||||||
|
* This also takes care of new subscriptions and parameter updates. When reaching EOF,
|
||||||
|
* the subscription is set to invalid.
|
||||||
|
* File seek position is arbitrary after this call.
|
||||||
|
* @return false on file error
|
||||||
|
*/
|
||||||
|
bool nextDataMessage(std::ifstream &file, Subscription &subscription, int msg_id);
|
||||||
|
|
||||||
|
std::vector<Subscription> _subscriptions;
|
||||||
|
std::vector<uint8_t> _read_buffer;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
bool _task_should_exit = false;
|
bool _task_should_exit = false;
|
||||||
@@ -136,9 +161,6 @@ private:
|
|||||||
uint64_t _file_start_time;
|
uint64_t _file_start_time;
|
||||||
uint64_t _replay_start_time;
|
uint64_t _replay_start_time;
|
||||||
std::streampos _data_section_start; ///< first ADD_LOGGED_MSG message
|
std::streampos _data_section_start; ///< first ADD_LOGGED_MSG message
|
||||||
std::vector<uint8_t> _read_buffer;
|
|
||||||
|
|
||||||
std::vector<Subscription> _subscriptions;
|
|
||||||
|
|
||||||
/** keep track of file position to avoid adding a subscription multiple times. */
|
/** keep track of file position to avoid adding a subscription multiple times. */
|
||||||
std::streampos _subscription_file_pos = 0;
|
std::streampos _subscription_file_pos = 0;
|
||||||
@@ -173,16 +195,6 @@ private:
|
|||||||
bool readDropout(std::ifstream &file, uint16_t msg_size);
|
bool readDropout(std::ifstream &file, uint16_t msg_size);
|
||||||
bool readAndApplyParameter(std::ifstream &file, uint16_t msg_size);
|
bool readAndApplyParameter(std::ifstream &file, uint16_t msg_size);
|
||||||
|
|
||||||
/**
|
|
||||||
* Find next data message for this subscription, starting with the stored file offset.
|
|
||||||
* Skip the first message, and if found, read the timestamp and store the new file offset.
|
|
||||||
* This also takes care of new subscriptions and parameter updates. When reaching EOF,
|
|
||||||
* the subscription is set to invalid.
|
|
||||||
* File seek position is arbitrary after this call.
|
|
||||||
* @return false on file error
|
|
||||||
*/
|
|
||||||
bool nextDataMessage(std::ifstream &file, Subscription &subscription, int msg_id);
|
|
||||||
|
|
||||||
static const orb_metadata *findTopic(const std::string &name);
|
static const orb_metadata *findTopic(const std::string &name);
|
||||||
/** get the array size from a type. eg. float[3] -> return float */
|
/** get the array size from a type. eg. float[3] -> return float */
|
||||||
static std::string extractArraySize(const std::string &type_name_full, int &array_size);
|
static std::string extractArraySize(const std::string &type_name_full, int &array_size);
|
||||||
@@ -217,7 +229,8 @@ protected:
|
|||||||
* @param data
|
* @param data
|
||||||
* @return true if published, false otherwise
|
* @return true if published, false otherwise
|
||||||
*/
|
*/
|
||||||
bool handleTopicUpdate(Subscription &sub, void *data) override;
|
bool handleTopicUpdate(Subscription &sub, void *data, std::ifstream &replay_file) override;
|
||||||
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
||||||
|
|||||||
@@ -371,6 +371,8 @@ bool Replay::readAndAddSubscription(std::ifstream &file, uint16_t msg_size)
|
|||||||
|
|
||||||
_subscriptions[msg_id] = subscription;
|
_subscriptions[msg_id] = subscription;
|
||||||
|
|
||||||
|
onSubscriptionAdded(_subscriptions[msg_id], msg_id);
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -672,7 +674,7 @@ void Replay::task_main()
|
|||||||
for (size_t i = 0; i < _subscriptions.size(); ++i) {
|
for (size_t i = 0; i < _subscriptions.size(); ++i) {
|
||||||
const Subscription &subscription = _subscriptions[i];
|
const Subscription &subscription = _subscriptions[i];
|
||||||
|
|
||||||
if (subscription.orb_meta) {
|
if (subscription.orb_meta && !subscription.ignored) {
|
||||||
if (next_file_time == 0 || subscription.next_timestamp < next_file_time) {
|
if (next_file_time == 0 || subscription.next_timestamp < next_file_time) {
|
||||||
next_msg_id = (int)i;
|
next_msg_id = (int)i;
|
||||||
next_file_time = subscription.next_timestamp;
|
next_file_time = subscription.next_timestamp;
|
||||||
@@ -704,18 +706,14 @@ void Replay::task_main()
|
|||||||
|
|
||||||
|
|
||||||
//It's time to publish
|
//It's time to publish
|
||||||
const size_t msg_read_size = sub.orb_meta->o_size_no_padding;
|
readTopicDataToBuffer(sub, replay_file);
|
||||||
const size_t msg_write_size = sub.orb_meta->o_size;
|
memcpy(_read_buffer.data() + sub.timestamp_offset, &publish_timestamp, sizeof(uint64_t)); //adjust the timestamp
|
||||||
_read_buffer.reserve(msg_write_size);
|
|
||||||
replay_file.seekg(sub.next_read_pos + (streamoff)(ULOG_MSG_HEADER_LEN + 2)); //skip header & msg id
|
|
||||||
replay_file.read((char *)_read_buffer.data(), msg_read_size);
|
|
||||||
*(uint64_t *)(_read_buffer.data() + sub.timestamp_offset) = publish_timestamp;
|
|
||||||
|
|
||||||
if (handleTopicUpdate(sub, _read_buffer.data())) {
|
if (handleTopicUpdate(sub, _read_buffer.data(), replay_file)) {
|
||||||
++nr_published_messages;
|
++nr_published_messages;
|
||||||
}
|
}
|
||||||
|
|
||||||
nextDataMessage(replay_file, _subscriptions[next_msg_id], next_msg_id);
|
nextDataMessage(replay_file, sub, next_msg_id);
|
||||||
|
|
||||||
//TODO: output status (eg. every sec), including total duration...
|
//TODO: output status (eg. every sec), including total duration...
|
||||||
}
|
}
|
||||||
@@ -737,7 +735,16 @@ void Replay::task_main()
|
|||||||
onExitMainLoop();
|
onExitMainLoop();
|
||||||
}
|
}
|
||||||
|
|
||||||
bool Replay::handleTopicUpdate(Subscription &sub, void *data)
|
void Replay::readTopicDataToBuffer(const Subscription &sub, std::ifstream &replay_file)
|
||||||
|
{
|
||||||
|
const size_t msg_read_size = sub.orb_meta->o_size_no_padding;
|
||||||
|
const size_t msg_write_size = sub.orb_meta->o_size;
|
||||||
|
_read_buffer.reserve(msg_write_size);
|
||||||
|
replay_file.seekg(sub.next_read_pos + (streamoff)(ULOG_MSG_HEADER_LEN + 2)); //skip header & msg id
|
||||||
|
replay_file.read((char *)_read_buffer.data(), msg_read_size);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool Replay::handleTopicUpdate(Subscription &sub, void *data, std::ifstream &replay_file)
|
||||||
{
|
{
|
||||||
return publishTopic(sub, data);
|
return publishTopic(sub, data);
|
||||||
}
|
}
|
||||||
@@ -763,12 +770,12 @@ bool Replay::publishTopic(Subscription &sub, void *data)
|
|||||||
bool published = false;
|
bool published = false;
|
||||||
|
|
||||||
if (sub.orb_advert) {
|
if (sub.orb_advert) {
|
||||||
orb_publish(sub.orb_meta, sub.orb_advert, _read_buffer.data());
|
orb_publish(sub.orb_meta, sub.orb_advert, data);
|
||||||
published = true;
|
published = true;
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
if (sub.multi_id == 0) {
|
if (sub.multi_id == 0) {
|
||||||
sub.orb_advert = orb_advertise(sub.orb_meta, _read_buffer.data());
|
sub.orb_advert = orb_advertise(sub.orb_meta, data);
|
||||||
published = true;
|
published = true;
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
@@ -786,8 +793,7 @@ bool Replay::publishTopic(Subscription &sub, void *data)
|
|||||||
|
|
||||||
if (advertised) {
|
if (advertised) {
|
||||||
int instance;
|
int instance;
|
||||||
sub.orb_advert = orb_advertise_multi(sub.orb_meta, _read_buffer.data(),
|
sub.orb_advert = orb_advertise_multi(sub.orb_meta, data, &instance, ORB_PRIO_DEFAULT);
|
||||||
&instance, ORB_PRIO_DEFAULT);
|
|
||||||
published = true;
|
published = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -796,7 +802,7 @@ bool Replay::publishTopic(Subscription &sub, void *data)
|
|||||||
return published;
|
return published;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool ReplayEkf2::handleTopicUpdate(Subscription &sub, void *data)
|
bool ReplayEkf2::handleTopicUpdate(Subscription &sub, void *data, std::ifstream &replay_file)
|
||||||
{
|
{
|
||||||
if (sub.orb_meta == ORB_ID(ekf2_replay)) {
|
if (sub.orb_meta == ORB_ID(ekf2_replay)) {
|
||||||
struct ekf2_replay_s ekf2_replay;
|
struct ekf2_replay_s ekf2_replay;
|
||||||
|
|||||||
Reference in New Issue
Block a user