Retry logic in uxrce_dds_client startup (#23723)

* Added retries if uxrce_dds_client fails during setup

* Break timesync endless loop in case of agent reset

* ORB unsubscribe, to avoid too many open fails with multiple reconnects & format

* Added review feedback
This commit is contained in:
Alexander Lerach
2024-09-26 17:43:49 +02:00
committed by GitHub
parent ba75b9c35e
commit 224aef142c
3 changed files with 209 additions and 167 deletions
@@ -83,6 +83,8 @@ void SendTopicsSubs::reset() {
num_payload_sent = 0;
for (unsigned idx = 0; idx < sizeof(send_subscriptions)/sizeof(send_subscriptions[0]); ++idx) {
send_subscriptions[idx].data_writer = uxr_object_id(0, UXR_INVALID_ID);
orb_unsubscribe(fds[idx].fd);
fds[idx].fd = -1;
}
};
+193 -167
View File
@@ -49,10 +49,8 @@
#include <stdlib.h>
#include <unistd.h>
#define STREAM_HISTORY 4
#define BUFFER_SIZE (UXR_CONFIG_SERIAL_TRANSPORT_MTU * STREAM_HISTORY) // MTU==512 by default
#define PARTICIPANT_XML_SIZE 512
static constexpr uint8_t TIMESYNC_MAX_TIMEOUTS = 10;
using namespace time_literals;
@@ -211,6 +209,182 @@ void UxrceddsClient::deinit()
_comm = nullptr;
}
bool UxrceddsClient::setup_session(uxrSession *session)
{
_participant_config = static_cast<ParticipantConfig>(_param_uxrce_dds_ptcfg.get());
_synchronize_timestamps = (_param_uxrce_dds_synct.get() > 0);
bool got_response = false;
while (!should_exit() && !got_response) {
// Sending ping without initing a XRCE session
got_response = uxr_ping_agent_attempts(_comm, 1000, 1);
}
if (!got_response) {
return false;
}
// Session
// The key identifier of the Client. All Clients connected to an Agent must have a different key.
const uint32_t key = (uint32_t)_param_uxrce_key.get();
if (key == 0) {
PX4_ERR("session key must be different from zero");
return false;
}
uxr_init_session(session, _comm, key);
if (!uxr_create_session(session)) {
PX4_ERR("uxr_create_session failed");
return false;
}
// Streams
// Reliable for setup, afterwards best-effort to send the data (important: need to create all 4 streams)
_reliable_out = uxr_create_output_reliable_stream(session, _output_reliable_stream_buffer,
sizeof(_output_reliable_stream_buffer), STREAM_HISTORY);
_best_effort_out = uxr_create_output_best_effort_stream(session, _output_data_stream_buffer,
sizeof(_output_data_stream_buffer));
uxrStreamId reliable_in = uxr_create_input_reliable_stream(session, _input_reliable_stream_buffer,
sizeof(_input_reliable_stream_buffer),
STREAM_HISTORY);
uxrStreamId best_effort_in = uxr_create_input_best_effort_stream(session);
// Create entities
_participant_id = uxr_object_id(0x01, UXR_PARTICIPANT_ID);
uint16_t domain_id = _param_uxrce_dds_dom_id.get();
uint16_t participant_req{};
if (_participant_config == ParticipantConfig::Custom) {
// Create participant by reference (XML not required)
participant_req = uxr_buffer_create_participant_ref(session, _reliable_out, _participant_id, domain_id,
"px4_participant", UXR_REPLACE);
} else {
// Construct participant XML and create participant by XML
char participant_xml[PARTICIPANT_XML_SIZE];
int ret = snprintf(participant_xml, PARTICIPANT_XML_SIZE, "%s<name>%s/px4_micro_xrce_dds</name>%s",
(_participant_config == ParticipantConfig::LocalHostOnly) ?
"<dds>"
"<profiles>"
"<transport_descriptors>"
"<transport_descriptor>"
"<transport_id>udp_localhost</transport_id>"
"<type>UDPv4</type>"
"<interfaceWhiteList><address>127.0.0.1</address></interfaceWhiteList>"
"</transport_descriptor>"
"</transport_descriptors>"
"</profiles>"
"<participant>"
"<rtps>"
:
"<dds>"
"<participant>"
"<rtps>",
_client_namespace != nullptr ?
_client_namespace
:
"",
(_participant_config == ParticipantConfig::LocalHostOnly) ?
"<useBuiltinTransports>false</useBuiltinTransports>"
"<userTransports><transport_id>udp_localhost</transport_id></userTransports>"
"</rtps>"
"</participant>"
"</dds>"
:
"</rtps>"
"</participant>"
"</dds>"
);
if (ret < 0 || ret >= PARTICIPANT_XML_SIZE) {
PX4_ERR("create entities failed: namespace too long");
return false;
}
participant_req = uxr_buffer_create_participant_xml(session, _reliable_out, _participant_id, domain_id,
participant_xml, UXR_REPLACE);
}
uint8_t request_status;
if (!uxr_run_session_until_all_status(session, 1000, &participant_req, &request_status, 1)) {
PX4_ERR("create entities failed: participant: %i", request_status);
return false;
}
// Set time-callback.
if (_synchronize_timestamps) {
uxr_set_time_callback(session, on_time, &_timesync);
} else {
uxr_set_time_callback(session, on_time_no_sync, nullptr);
}
uxr_set_request_callback(session, on_request, this);
uint8_t sync_timeouts = 0;
// Spin until in sync with the Agent or the session time sync has multiple timeouts
while (_synchronize_timestamps) {
if (uxr_sync_session(session, 1000)) {
if (_timesync.sync_converged()) {
PX4_INFO("synchronized with time offset %-5" PRId64 "us", session->time_offset / 1000);
if (_param_uxrce_dds_syncc.get() > 0) {
syncSystemClock(session);
}
break;
}
sync_timeouts = 0;
} else {
sync_timeouts++;
}
if (sync_timeouts > TIMESYNC_MAX_TIMEOUTS) {
return false;
}
px4_usleep(10'000);
}
if (!_pubs->init(session, _reliable_out, reliable_in, best_effort_in, _participant_id, _client_namespace)) {
PX4_ERR("pubs init failed");
return false;
}
// create VehicleCommand replier
if (_num_of_repliers < MAX_NUM_REPLIERS) {
if (add_replier(new VehicleCommandSrv(session, _reliable_out, reliable_in, _participant_id, _client_namespace,
_num_of_repliers))) {
PX4_ERR("replier init failed");
return false;
}
}
_connected = true;
return true;
}
void UxrceddsClient::delete_session(uxrSession *session)
{
delete_repliers();
uxr_delete_session_retries(session, _connected ? 1 : 0);
_last_payload_tx_rate = 0;
_subs->reset();
_timesync.reset_filter();
}
UxrceddsClient::~UxrceddsClient()
{
delete _subs;
@@ -334,6 +508,7 @@ void UxrceddsClient::run()
{
_subs = new SendTopicsSubs();
_pubs = new RcvTopicsPubs();
uxrSession session;
if (!_subs || !_pubs) {
PX4_ERR("alloc failed");
@@ -341,169 +516,27 @@ void UxrceddsClient::run()
}
while (!should_exit()) {
while (!should_exit() && !_comm) {
while (!should_exit()) {
if (!init()) {
// sleep before trying again
px4_usleep(1'000'000);
}
}
_participant_config = static_cast<ParticipantConfig>(_param_uxrce_dds_ptcfg.get());
_synchronize_timestamps = (_param_uxrce_dds_synct.get() > 0);
bool got_response = false;
while (!should_exit() && !got_response) {
// Sending ping without initing a XRCE session
got_response = uxr_ping_agent_attempts(_comm, 1000, 1);
}
if (!got_response) {
break;
}
// Session
// The key identifier of the Client. All Clients connected to an Agent must have a different key.
const uint32_t key = (uint32_t)_param_uxrce_key.get();
if (key == 0) {
PX4_ERR("session key must be different from zero");
return;
}
uxrSession session;
uxr_init_session(&session, _comm, key);
// void uxr_create_session_retries(uxrSession* session, size_t retries);
if (!uxr_create_session(&session)) {
PX4_ERR("uxr_create_session failed");
return;
}
// TODO: uxr_set_status_callback
// Streams
// Reliable for setup, afterwards best-effort to send the data (important: need to create all 4 streams)
uint8_t output_reliable_stream_buffer[BUFFER_SIZE] {};
uxrStreamId reliable_out = uxr_create_output_reliable_stream(&session, output_reliable_stream_buffer,
sizeof(output_reliable_stream_buffer), STREAM_HISTORY);
uint8_t output_data_stream_buffer[2048] {};
uxrStreamId best_effort_out = uxr_create_output_best_effort_stream(&session, output_data_stream_buffer,
sizeof(output_data_stream_buffer));
uint8_t input_reliable_stream_buffer[BUFFER_SIZE] {};
uxrStreamId reliable_in = uxr_create_input_reliable_stream(&session, input_reliable_stream_buffer,
sizeof(input_reliable_stream_buffer),
STREAM_HISTORY);
uxrStreamId best_effort_in = uxr_create_input_best_effort_stream(&session);
// Create entities
uxrObjectId participant_id = uxr_object_id(0x01, UXR_PARTICIPANT_ID);
uint16_t domain_id = _param_uxrce_dds_dom_id.get();
uint16_t participant_req{};
if (_participant_config == ParticipantConfig::Custom) {
// Create participant by reference (XML not required)
participant_req = uxr_buffer_create_participant_ref(&session, reliable_out, participant_id, domain_id,
"px4_participant", UXR_REPLACE);
} else {
// Construct participant XML and create participant by XML
char participant_xml[PARTICIPANT_XML_SIZE];
int ret = snprintf(participant_xml, PARTICIPANT_XML_SIZE, "%s<name>%s/px4_micro_xrce_dds</name>%s",
(_participant_config == ParticipantConfig::LocalHostOnly) ?
"<dds>"
"<profiles>"
"<transport_descriptors>"
"<transport_descriptor>"
"<transport_id>udp_localhost</transport_id>"
"<type>UDPv4</type>"
"<interfaceWhiteList><address>127.0.0.1</address></interfaceWhiteList>"
"</transport_descriptor>"
"</transport_descriptors>"
"</profiles>"
"<participant>"
"<rtps>"
:
"<dds>"
"<participant>"
"<rtps>",
_client_namespace != nullptr ?
_client_namespace
:
"",
(_participant_config == ParticipantConfig::LocalHostOnly) ?
"<useBuiltinTransports>false</useBuiltinTransports>"
"<userTransports><transport_id>udp_localhost</transport_id></userTransports>"
"</rtps>"
"</participant>"
"</dds>"
:
"</rtps>"
"</participant>"
"</dds>"
);
if (ret < 0 || ret >= PARTICIPANT_XML_SIZE) {
PX4_ERR("create entities failed: namespace too long");
return;
PX4_ERR("init failed, will retry now");
continue;
}
participant_req = uxr_buffer_create_participant_xml(&session, reliable_out, participant_id, domain_id,
participant_xml, UXR_REPLACE);
}
uint8_t request_status;
if (!uxr_run_session_until_all_status(&session, 1000, &participant_req, &request_status, 1)) {
PX4_ERR("create entities failed: participant: %i", request_status);
return;
}
if (!_pubs->init(&session, reliable_out, reliable_in, best_effort_in, participant_id, _client_namespace)) {
PX4_ERR("pubs init failed");
return;
}
// create VehicleCommand replier
if (_num_of_repliers < MAX_NUM_REPLIERS) {
if (add_replier(new VehicleCommandSrv(&session, reliable_out, reliable_in, participant_id, _client_namespace,
_num_of_repliers))) {
PX4_ERR("replier init failed");
return;
if (!setup_session(&session)) {
delete_session(&session);
px4_usleep(1'000'000);
PX4_ERR("session setup failed, will retry now");
continue;
}
}
_connected = true;
// Set time-callback.
if (_synchronize_timestamps) {
uxr_set_time_callback(&session, on_time, &_timesync);
} else {
uxr_set_time_callback(&session, on_time_no_sync, nullptr);
}
uxr_set_request_callback(&session, on_request, this);
// Spin until sync with the Agent
while (_synchronize_timestamps) {
if (uxr_sync_session(&session, 1000) && _timesync.sync_converged()) {
PX4_INFO("synchronized with time offset %-5" PRId64 "us", session.time_offset / 1000);
if (_param_uxrce_dds_syncc.get() > 0) {
syncSystemClock(&session);
}
if (_comm && _connected) {
break;
}
}
px4_usleep(10'000);
if (should_exit()) {
return;
}
hrt_abstime last_sync_session = 0;
@@ -518,7 +551,6 @@ void UxrceddsClient::run()
_subs->init();
while (!should_exit() && _connected) {
perf_begin(_loop_perf);
perf_count(_loop_interval_perf);
@@ -537,7 +569,7 @@ void UxrceddsClient::run()
/* Handle the poll results */
if (poll > 0) {
_subs->update(&session, reliable_out, best_effort_out, participant_id, _client_namespace);
_subs->update(&session, _reliable_out, _best_effort_out, _participant_id, _client_namespace);
} else {
if (poll < 0) {
@@ -632,13 +664,7 @@ void UxrceddsClient::run()
}
delete_repliers();
uxr_delete_session_retries(&session, _connected ? 1 : 0);
_last_payload_tx_rate = 0;
_last_payload_tx_rate = 0;
_subs->reset();
_timesync.reset_filter();
delete_session(&session);
}
}
@@ -831,7 +857,7 @@ int UxrceddsClient::task_spawn(int argc, char *argv[])
_task_id = px4_task_spawn_cmd("uxrce_dds_client",
SCHED_DEFAULT,
SCHED_PRIORITY_DEFAULT,
PX4_STACK_ADJUSTED(12000),
PX4_STACK_ADJUSTED(8000),
(px4_main_t)&run_trampoline,
(char *const *)argv);
@@ -53,6 +53,8 @@
#include "srv_base.h"
#define MAX_NUM_REPLIERS 5
#define STREAM_HISTORY 4
#define BUFFER_SIZE (UXR_CONFIG_SERIAL_TRANSPORT_MTU * STREAM_HISTORY) // MTU==512 by default
class UxrceddsClient : public ModuleBase<UxrceddsClient>, public ModuleParams
{
@@ -118,6 +120,9 @@ private:
bool init();
void deinit();
bool setup_session(uxrSession *session);
void delete_session(uxrSession *session);
bool setBaudrate(int fd, unsigned baud);
void handleMessageFormatRequest();
@@ -159,6 +164,15 @@ private:
SendTopicsSubs *_subs{nullptr};
RcvTopicsPubs *_pubs{nullptr};
uxrObjectId _participant_id;
uint8_t _output_reliable_stream_buffer[BUFFER_SIZE] {};
uint8_t _output_data_stream_buffer[2048] {};
uint8_t _input_reliable_stream_buffer[BUFFER_SIZE] {};
uxrStreamId _reliable_out;
uxrStreamId _best_effort_out;
SrvBase *_repliers[MAX_NUM_REPLIERS];
uint8_t _num_of_repliers{0};