diff --git a/src/modules/uxrce_dds_client/dds_topics.h.em b/src/modules/uxrce_dds_client/dds_topics.h.em index 3c4311e6c3..69c8fc58cc 100644 --- a/src/modules/uxrce_dds_client/dds_topics.h.em +++ b/src/modules/uxrce_dds_client/dds_topics.h.em @@ -83,6 +83,8 @@ void SendTopicsSubs::reset() { num_payload_sent = 0; for (unsigned idx = 0; idx < sizeof(send_subscriptions)/sizeof(send_subscriptions[0]); ++idx) { send_subscriptions[idx].data_writer = uxr_object_id(0, UXR_INVALID_ID); + orb_unsubscribe(fds[idx].fd); + fds[idx].fd = -1; } }; diff --git a/src/modules/uxrce_dds_client/uxrce_dds_client.cpp b/src/modules/uxrce_dds_client/uxrce_dds_client.cpp index 20428c691e..690ae53af1 100644 --- a/src/modules/uxrce_dds_client/uxrce_dds_client.cpp +++ b/src/modules/uxrce_dds_client/uxrce_dds_client.cpp @@ -49,10 +49,8 @@ #include #include -#define STREAM_HISTORY 4 -#define BUFFER_SIZE (UXR_CONFIG_SERIAL_TRANSPORT_MTU * STREAM_HISTORY) // MTU==512 by default - #define PARTICIPANT_XML_SIZE 512 +static constexpr uint8_t TIMESYNC_MAX_TIMEOUTS = 10; using namespace time_literals; @@ -211,6 +209,182 @@ void UxrceddsClient::deinit() _comm = nullptr; } +bool UxrceddsClient::setup_session(uxrSession *session) +{ + _participant_config = static_cast(_param_uxrce_dds_ptcfg.get()); + _synchronize_timestamps = (_param_uxrce_dds_synct.get() > 0); + + bool got_response = false; + + while (!should_exit() && !got_response) { + // Sending ping without initing a XRCE session + got_response = uxr_ping_agent_attempts(_comm, 1000, 1); + } + + if (!got_response) { + return false; + } + + // Session + // The key identifier of the Client. All Clients connected to an Agent must have a different key. + const uint32_t key = (uint32_t)_param_uxrce_key.get(); + + if (key == 0) { + PX4_ERR("session key must be different from zero"); + return false; + } + + uxr_init_session(session, _comm, key); + + if (!uxr_create_session(session)) { + PX4_ERR("uxr_create_session failed"); + return false; + } + + // Streams + // Reliable for setup, afterwards best-effort to send the data (important: need to create all 4 streams) + _reliable_out = uxr_create_output_reliable_stream(session, _output_reliable_stream_buffer, + sizeof(_output_reliable_stream_buffer), STREAM_HISTORY); + + _best_effort_out = uxr_create_output_best_effort_stream(session, _output_data_stream_buffer, + sizeof(_output_data_stream_buffer)); + + uxrStreamId reliable_in = uxr_create_input_reliable_stream(session, _input_reliable_stream_buffer, + sizeof(_input_reliable_stream_buffer), + STREAM_HISTORY); + + uxrStreamId best_effort_in = uxr_create_input_best_effort_stream(session); + + // Create entities + _participant_id = uxr_object_id(0x01, UXR_PARTICIPANT_ID); + + uint16_t domain_id = _param_uxrce_dds_dom_id.get(); + + uint16_t participant_req{}; + + if (_participant_config == ParticipantConfig::Custom) { + // Create participant by reference (XML not required) + participant_req = uxr_buffer_create_participant_ref(session, _reliable_out, _participant_id, domain_id, + "px4_participant", UXR_REPLACE); + + } else { + // Construct participant XML and create participant by XML + char participant_xml[PARTICIPANT_XML_SIZE]; + int ret = snprintf(participant_xml, PARTICIPANT_XML_SIZE, "%s%s/px4_micro_xrce_dds%s", + (_participant_config == ParticipantConfig::LocalHostOnly) ? + "" + "" + "" + "" + "udp_localhost" + "UDPv4" + "
127.0.0.1
" + "
" + "
" + "
" + "" + "" + : + "" + "" + "", + _client_namespace != nullptr ? + _client_namespace + : + "", + (_participant_config == ParticipantConfig::LocalHostOnly) ? + "false" + "udp_localhost" + "" + "" + "" + : + "" + "" + "
" + ); + + if (ret < 0 || ret >= PARTICIPANT_XML_SIZE) { + PX4_ERR("create entities failed: namespace too long"); + return false; + } + + participant_req = uxr_buffer_create_participant_xml(session, _reliable_out, _participant_id, domain_id, + participant_xml, UXR_REPLACE); + } + + uint8_t request_status; + + if (!uxr_run_session_until_all_status(session, 1000, &participant_req, &request_status, 1)) { + PX4_ERR("create entities failed: participant: %i", request_status); + return false; + } + + // Set time-callback. + if (_synchronize_timestamps) { + uxr_set_time_callback(session, on_time, &_timesync); + + } else { + uxr_set_time_callback(session, on_time_no_sync, nullptr); + } + + uxr_set_request_callback(session, on_request, this); + uint8_t sync_timeouts = 0; + + // Spin until in sync with the Agent or the session time sync has multiple timeouts + while (_synchronize_timestamps) { + if (uxr_sync_session(session, 1000)) { + if (_timesync.sync_converged()) { + PX4_INFO("synchronized with time offset %-5" PRId64 "us", session->time_offset / 1000); + + if (_param_uxrce_dds_syncc.get() > 0) { + syncSystemClock(session); + } + + break; + } + + sync_timeouts = 0; + + } else { + sync_timeouts++; + } + + if (sync_timeouts > TIMESYNC_MAX_TIMEOUTS) { + return false; + } + + px4_usleep(10'000); + } + + if (!_pubs->init(session, _reliable_out, reliable_in, best_effort_in, _participant_id, _client_namespace)) { + PX4_ERR("pubs init failed"); + return false; + } + + // create VehicleCommand replier + if (_num_of_repliers < MAX_NUM_REPLIERS) { + if (add_replier(new VehicleCommandSrv(session, _reliable_out, reliable_in, _participant_id, _client_namespace, + _num_of_repliers))) { + PX4_ERR("replier init failed"); + return false; + } + } + + _connected = true; + return true; +} + +void UxrceddsClient::delete_session(uxrSession *session) +{ + delete_repliers(); + uxr_delete_session_retries(session, _connected ? 1 : 0); + + _last_payload_tx_rate = 0; + _subs->reset(); + _timesync.reset_filter(); +} + UxrceddsClient::~UxrceddsClient() { delete _subs; @@ -334,6 +508,7 @@ void UxrceddsClient::run() { _subs = new SendTopicsSubs(); _pubs = new RcvTopicsPubs(); + uxrSession session; if (!_subs || !_pubs) { PX4_ERR("alloc failed"); @@ -341,169 +516,27 @@ void UxrceddsClient::run() } while (!should_exit()) { - - while (!should_exit() && !_comm) { + while (!should_exit()) { if (!init()) { - // sleep before trying again px4_usleep(1'000'000); - } - } - - _participant_config = static_cast(_param_uxrce_dds_ptcfg.get()); - _synchronize_timestamps = (_param_uxrce_dds_synct.get() > 0); - - bool got_response = false; - - while (!should_exit() && !got_response) { - // Sending ping without initing a XRCE session - got_response = uxr_ping_agent_attempts(_comm, 1000, 1); - } - - if (!got_response) { - break; - } - - // Session - // The key identifier of the Client. All Clients connected to an Agent must have a different key. - const uint32_t key = (uint32_t)_param_uxrce_key.get(); - - if (key == 0) { - PX4_ERR("session key must be different from zero"); - return; - } - - uxrSession session; - uxr_init_session(&session, _comm, key); - - // void uxr_create_session_retries(uxrSession* session, size_t retries); - if (!uxr_create_session(&session)) { - PX4_ERR("uxr_create_session failed"); - return; - } - - // TODO: uxr_set_status_callback - - // Streams - // Reliable for setup, afterwards best-effort to send the data (important: need to create all 4 streams) - uint8_t output_reliable_stream_buffer[BUFFER_SIZE] {}; - uxrStreamId reliable_out = uxr_create_output_reliable_stream(&session, output_reliable_stream_buffer, - sizeof(output_reliable_stream_buffer), STREAM_HISTORY); - - uint8_t output_data_stream_buffer[2048] {}; - uxrStreamId best_effort_out = uxr_create_output_best_effort_stream(&session, output_data_stream_buffer, - sizeof(output_data_stream_buffer)); - - uint8_t input_reliable_stream_buffer[BUFFER_SIZE] {}; - uxrStreamId reliable_in = uxr_create_input_reliable_stream(&session, input_reliable_stream_buffer, - sizeof(input_reliable_stream_buffer), - STREAM_HISTORY); - - uxrStreamId best_effort_in = uxr_create_input_best_effort_stream(&session); - - // Create entities - uxrObjectId participant_id = uxr_object_id(0x01, UXR_PARTICIPANT_ID); - - uint16_t domain_id = _param_uxrce_dds_dom_id.get(); - - uint16_t participant_req{}; - - if (_participant_config == ParticipantConfig::Custom) { - // Create participant by reference (XML not required) - participant_req = uxr_buffer_create_participant_ref(&session, reliable_out, participant_id, domain_id, - "px4_participant", UXR_REPLACE); - - } else { - // Construct participant XML and create participant by XML - char participant_xml[PARTICIPANT_XML_SIZE]; - int ret = snprintf(participant_xml, PARTICIPANT_XML_SIZE, "%s%s/px4_micro_xrce_dds%s", - (_participant_config == ParticipantConfig::LocalHostOnly) ? - "" - "" - "" - "" - "udp_localhost" - "UDPv4" - "
127.0.0.1
" - "
" - "
" - "
" - "" - "" - : - "" - "" - "", - _client_namespace != nullptr ? - _client_namespace - : - "", - (_participant_config == ParticipantConfig::LocalHostOnly) ? - "false" - "udp_localhost" - "" - "" - "" - : - "" - "" - "
" - ); - - if (ret < 0 || ret >= PARTICIPANT_XML_SIZE) { - PX4_ERR("create entities failed: namespace too long"); - return; + PX4_ERR("init failed, will retry now"); + continue; } - participant_req = uxr_buffer_create_participant_xml(&session, reliable_out, participant_id, domain_id, - participant_xml, UXR_REPLACE); - } - - uint8_t request_status; - - if (!uxr_run_session_until_all_status(&session, 1000, &participant_req, &request_status, 1)) { - PX4_ERR("create entities failed: participant: %i", request_status); - return; - } - - if (!_pubs->init(&session, reliable_out, reliable_in, best_effort_in, participant_id, _client_namespace)) { - PX4_ERR("pubs init failed"); - return; - } - - // create VehicleCommand replier - if (_num_of_repliers < MAX_NUM_REPLIERS) { - if (add_replier(new VehicleCommandSrv(&session, reliable_out, reliable_in, participant_id, _client_namespace, - _num_of_repliers))) { - PX4_ERR("replier init failed"); - return; + if (!setup_session(&session)) { + delete_session(&session); + px4_usleep(1'000'000); + PX4_ERR("session setup failed, will retry now"); + continue; } - } - - _connected = true; - - // Set time-callback. - if (_synchronize_timestamps) { - uxr_set_time_callback(&session, on_time, &_timesync); - - } else { - uxr_set_time_callback(&session, on_time_no_sync, nullptr); - } - - uxr_set_request_callback(&session, on_request, this); - - // Spin until sync with the Agent - while (_synchronize_timestamps) { - if (uxr_sync_session(&session, 1000) && _timesync.sync_converged()) { - PX4_INFO("synchronized with time offset %-5" PRId64 "us", session.time_offset / 1000); - - if (_param_uxrce_dds_syncc.get() > 0) { - syncSystemClock(&session); - } + if (_comm && _connected) { break; } + } - px4_usleep(10'000); + if (should_exit()) { + return; } hrt_abstime last_sync_session = 0; @@ -518,7 +551,6 @@ void UxrceddsClient::run() _subs->init(); while (!should_exit() && _connected) { - perf_begin(_loop_perf); perf_count(_loop_interval_perf); @@ -537,7 +569,7 @@ void UxrceddsClient::run() /* Handle the poll results */ if (poll > 0) { - _subs->update(&session, reliable_out, best_effort_out, participant_id, _client_namespace); + _subs->update(&session, _reliable_out, _best_effort_out, _participant_id, _client_namespace); } else { if (poll < 0) { @@ -632,13 +664,7 @@ void UxrceddsClient::run() } - delete_repliers(); - - uxr_delete_session_retries(&session, _connected ? 1 : 0); - _last_payload_tx_rate = 0; - _last_payload_tx_rate = 0; - _subs->reset(); - _timesync.reset_filter(); + delete_session(&session); } } @@ -831,7 +857,7 @@ int UxrceddsClient::task_spawn(int argc, char *argv[]) _task_id = px4_task_spawn_cmd("uxrce_dds_client", SCHED_DEFAULT, SCHED_PRIORITY_DEFAULT, - PX4_STACK_ADJUSTED(12000), + PX4_STACK_ADJUSTED(8000), (px4_main_t)&run_trampoline, (char *const *)argv); diff --git a/src/modules/uxrce_dds_client/uxrce_dds_client.h b/src/modules/uxrce_dds_client/uxrce_dds_client.h index a0c563b279..d00571d3f9 100644 --- a/src/modules/uxrce_dds_client/uxrce_dds_client.h +++ b/src/modules/uxrce_dds_client/uxrce_dds_client.h @@ -53,6 +53,8 @@ #include "srv_base.h" #define MAX_NUM_REPLIERS 5 +#define STREAM_HISTORY 4 +#define BUFFER_SIZE (UXR_CONFIG_SERIAL_TRANSPORT_MTU * STREAM_HISTORY) // MTU==512 by default class UxrceddsClient : public ModuleBase, public ModuleParams { @@ -118,6 +120,9 @@ private: bool init(); void deinit(); + bool setup_session(uxrSession *session); + void delete_session(uxrSession *session); + bool setBaudrate(int fd, unsigned baud); void handleMessageFormatRequest(); @@ -159,6 +164,15 @@ private: SendTopicsSubs *_subs{nullptr}; RcvTopicsPubs *_pubs{nullptr}; + uxrObjectId _participant_id; + + uint8_t _output_reliable_stream_buffer[BUFFER_SIZE] {}; + uint8_t _output_data_stream_buffer[2048] {}; + uint8_t _input_reliable_stream_buffer[BUFFER_SIZE] {}; + + uxrStreamId _reliable_out; + uxrStreamId _best_effort_out; + SrvBase *_repliers[MAX_NUM_REPLIERS]; uint8_t _num_of_repliers{0};