From 0ffc7f0f8a43b4ce61f58bfac8295d45a425d764 Mon Sep 17 00:00:00 2001 From: Vuk-SFL Date: Fri, 1 May 2026 09:29:17 +0200 Subject: [PATCH] feat(zenoh): add support for configuring zenoh publisher options (#27157) * feat(zenoh): add support for configuring zenoh publisher options Add zenoh parameters for common default options for all publishers. Options exposed are reliability, priority, congestion control and is_express Allow override of common publisher options for specific publisher through its config: config fille supports additional row where multiple options can be specified with csv string Expose config options through zenoh config add publisher. Allow options per publisher to be specified for default config in zenoh/dds_topics.yaml * fix(zenoh): Put individual zenoh publisher config override feature under config option ZENOH_PUB_OPTION_OVERRIDE Enabled ZENOH_PUB_OPTION_OVERRIDE on targets with enough flash memory * fix(zenoh): added publication options for default publications in dds_topics.yaml Rare messages that are important to arive: set as reliable, enabled express and with blocking congesiton control Fast vehcile position messages that can impact control decision making: set as best enabled express and with drop congestion control * docs(zenoh) : added documentation for zenoh publisher options usage * docs(docs): Link to params --------- Co-authored-by: Hamish Willee --- boards/nxp/mr-canhubk3/default.px4board | 1 + boards/nxp/mr-tropic/default.px4board | 1 + boards/nxp/tropic-community/default.px4board | 1 + boards/px4/fmu-v6x/zenoh.px4board | 1 + boards/px4/fmu-v6xrt/default.px4board | 1 + boards/px4/sitl/zenoh.px4board | 1 + docs/en/middleware/zenoh.md | 41 ++- .../uxrce_dds_client/generate_dds_topics.py | 11 + src/modules/zenoh/Kconfig | 9 + src/modules/zenoh/dds_topics.yaml | 49 ++++ src/modules/zenoh/default_topics.c.em | 2 +- .../zenoh/publishers/zenoh_publisher.cpp | 5 +- .../zenoh/publishers/zenoh_publisher.hpp | 3 +- src/modules/zenoh/zenoh.cpp | 33 ++- src/modules/zenoh/zenoh.h | 6 +- src/modules/zenoh/zenoh_config.cpp | 237 +++++++++++++++++- src/modules/zenoh/zenoh_config.hpp | 30 ++- src/modules/zenoh/zenoh_params.yaml | 49 ++++ 18 files changed, 464 insertions(+), 17 deletions(-) diff --git a/boards/nxp/mr-canhubk3/default.px4board b/boards/nxp/mr-canhubk3/default.px4board index d46a7370f8..e34cc194a9 100644 --- a/boards/nxp/mr-canhubk3/default.px4board +++ b/boards/nxp/mr-canhubk3/default.px4board @@ -15,6 +15,7 @@ CONFIG_MODULES_MAVLINK=y CONFIG_MODULES_NAVIGATOR=y CONFIG_MODULES_SENSORS=y CONFIG_MODULES_ZENOH=y +CONFIG_ZENOH_PUB_OPTION_OVERRIDE=y CONFIG_SYSTEMCMDS_I2CDETECT=y CONFIG_SYSTEMCMDS_LED_CONTROL=y CONFIG_SYSTEMCMDS_MFT=y diff --git a/boards/nxp/mr-tropic/default.px4board b/boards/nxp/mr-tropic/default.px4board index c46e24f477..40c966fe67 100644 --- a/boards/nxp/mr-tropic/default.px4board +++ b/boards/nxp/mr-tropic/default.px4board @@ -81,6 +81,7 @@ CONFIG_MODULES_SENSORS=y CONFIG_MODULES_UXRCE_DDS_CLIENT=y CONFIG_MODULES_VTOL_ATT_CONTROL=y CONFIG_MODULES_ZENOH=y +CONFIG_ZENOH_PUB_OPTION_OVERRIDE=y CONFIG_SYSTEMCMDS_ACTUATOR_TEST=y CONFIG_SYSTEMCMDS_BSONDUMP=y CONFIG_SYSTEMCMDS_DMESG=y diff --git a/boards/nxp/tropic-community/default.px4board b/boards/nxp/tropic-community/default.px4board index 28144c2b8d..dd609e553a 100644 --- a/boards/nxp/tropic-community/default.px4board +++ b/boards/nxp/tropic-community/default.px4board @@ -77,6 +77,7 @@ CONFIG_MODULES_SENSORS=y CONFIG_MODULES_UXRCE_DDS_CLIENT=y CONFIG_MODULES_VTOL_ATT_CONTROL=y CONFIG_MODULES_ZENOH=y +CONFIG_ZENOH_PUB_OPTION_OVERRIDE=y CONFIG_SYSTEMCMDS_ACTUATOR_TEST=y CONFIG_SYSTEMCMDS_BSONDUMP=y CONFIG_SYSTEMCMDS_DMESG=y diff --git a/boards/px4/fmu-v6x/zenoh.px4board b/boards/px4/fmu-v6x/zenoh.px4board index 4cd6071e6d..a37900a64b 100644 --- a/boards/px4/fmu-v6x/zenoh.px4board +++ b/boards/px4/fmu-v6x/zenoh.px4board @@ -3,3 +3,4 @@ CONFIG_DRIVERS_UAVCAN=n CONFIG_MODULES_UXRCE_DDS_CLIENT=n CONFIG_BOARD_CONSTRAINED_FLASH=y CONFIG_MODULES_ZENOH=y +CONFIG_ZENOH_PUB_OPTION_OVERRIDE=y diff --git a/boards/px4/fmu-v6xrt/default.px4board b/boards/px4/fmu-v6xrt/default.px4board index be4e4183d5..67aeef7a33 100644 --- a/boards/px4/fmu-v6xrt/default.px4board +++ b/boards/px4/fmu-v6xrt/default.px4board @@ -90,6 +90,7 @@ CONFIG_MODULES_SENSORS=y CONFIG_MODULES_UXRCE_DDS_CLIENT=y CONFIG_MODULES_VTOL_ATT_CONTROL=y CONFIG_MODULES_ZENOH=y +CONFIG_ZENOH_PUB_OPTION_OVERRIDE=y CONFIG_SYSTEMCMDS_ACTUATOR_TEST=y CONFIG_SYSTEMCMDS_BSONDUMP=y CONFIG_SYSTEMCMDS_DMESG=y diff --git a/boards/px4/sitl/zenoh.px4board b/boards/px4/sitl/zenoh.px4board index 20ff39d1b0..a010a60b2a 100644 --- a/boards/px4/sitl/zenoh.px4board +++ b/boards/px4/sitl/zenoh.px4board @@ -1,2 +1,3 @@ CONFIG_MODULES_UXRCE_DDS_CLIENT=n CONFIG_MODULES_ZENOH=y +CONFIG_ZENOH_PUB_OPTION_OVERRIDE=y diff --git a/docs/en/middleware/zenoh.md b/docs/en/middleware/zenoh.md index e4ef2c9976..7d35887e82 100644 --- a/docs/en/middleware/zenoh.md +++ b/docs/en/middleware/zenoh.md @@ -59,7 +59,7 @@ For more information about key expressions, refer to the [rmw_zenoh design docum Before setting up the Zenoh communication, first make sure that your firmware contains the driver that implements the [`zenoh` driver](../modules/modules_driver.md#zenoh), which provides the implementation of the _PX4 Zenoh-Pico Node_. -You can check if the module is present on your board by searching for the key `CONFIG_MODULES_ZENOH=y` in your board's `default.px4board` KConfig file. +You can check if the module is present on your board by searching for the key `CONFIG_MODULES_ZENOH=y` in your board's `default.px4board` [KConfig file](../hardware/porting_guide_config.md). For example, you can see that the module is present in `px4_fmu-v6xrt` build targets from [/boards/px4/fmu-v6xrt/default.px4board](https://github.com/PX4/PX4-Autopilot/blob/main/boards/px4/fmu-v6xrt/default.px4board#L91). If `CONFIG_MODULES_ZENOH=y` is not preset you can add this key to your board configuration and rebuild. @@ -89,6 +89,11 @@ inclusion of the message type hash (RIHS01, as defined in REP-2016) in the Zenoh Note that this will break compatibility with ROS 2 Jazzy and later. ::: +::: tip +Per-publisher option overrides can be enabled or disabled at compile time using the `CONFIG_ZENOH_PUB_OPTION_OVERRIDE` KConfig key. +When this is disabled, PX4 uses only global publisher option parameters, and per-publisher CLI/config overrides are not available. +::: + ### Enable Zenoh on PX4 Startup Set the [ZENOH_ENABLE](../advanced_config/parameter_reference.md#ZENOH_ENABLE) parameter to `1` to enable Zenoh on PX4 startup. @@ -132,6 +137,21 @@ This folder contains three key files: - **`pub.csv`** – Maps **uORB topics to ROS2 topics** (used for publishing). - **`sub.csv`** – Maps **ROS2 topics to uORB topics** (used for subscribing). +#### Publisher Options + +Zenoh publisher behaviour can be controlled through global PX4 parameters: + +- [ZENOH_PUB_CC](../advanced_config/parameter_reference.md#ZENOH_PUB_CC): congestion control (`Drop` or `Block`) - controls what happens when the transport path is congested. `Drop` prefers freshness/latency and may drop messages, while `Block` preserves delivery by applying backpressure to the publisher. +- [ZENOH_PUB_REL](../advanced_config/parameter_reference.md#ZENOH_PUB_REL): reliability (`Reliable` or `BestEffort`) - selects the Zenoh reliability mode used by the router path and seen by the underlying ROS 2 `rmw_zenoh` transport. +- [ZENOH_PUB_EXPR](../advanced_config/parameter_reference.md#ZENOH_PUB_EXPR): express mode (`Disabled` or `Enabled`) - when enabled, Zenoh does not wait to batch this operation with others. This usually reduces latency at the cost of higher bandwidth/overhead. +- [ZENOH_PUB_PRIO](../advanced_config/parameter_reference.md#ZENOH_PUB_PRIO): priority (`RealTime`, `InteractiveHigh`, `InteractiveLow`, `DataHigh`, `Data`, `DataLow`, `Background`) - sets relative message priority for routing/scheduling under load. + +These are applied to all Zenoh publishers. + +If `CONFIG_ZENOH_PUB_OPTION_OVERRIDE=y`, individual publishers can override one or more global publisher options. +Default configuration [dds_topics.yaml](../middleware/dds_topics.md) already provides overrides for several publishers. +Individual publisher options can be overriden through the mapping configuration shown in the next section + ### 4. Modifying Topic Mappings Zenoh topic mappings define how data flows between PX4's internal uORB topics and external ROS2 topics via Zenoh. @@ -154,6 +174,25 @@ The main operations and their commands are: zenoh config add publisher [uorb_instance] ``` + When `CONFIG_ZENOH_PUB_OPTION_OVERRIDE=y`, publisher options can also be passed: + + ```sh + zenoh config add publisher [uorb_instance] [options] + ``` + + Options are comma-separated `key=value` pairs: + + - `cc=drop|block` (congestion control) + - `express=true|false` (batching behaviour) + - `prio=real_time|interactive_high|interactive_low|data_high|data|data_low|background` (priority) + - `rel=reliable|best_effort` (reliability) + + Example: + + ```sh + zenoh config add publisher /fmu/out/vehicle_status vehicle_status 0 "cc=block,express=true,rel=best_effort" + ``` + - Subscribe to a Zenoh topic and forward it to a uORB topic: ```sh diff --git a/src/modules/uxrce_dds_client/generate_dds_topics.py b/src/modules/uxrce_dds_client/generate_dds_topics.py index 73fd057d73..64cf957527 100644 --- a/src/modules/uxrce_dds_client/generate_dds_topics.py +++ b/src/modules/uxrce_dds_client/generate_dds_topics.py @@ -102,6 +102,17 @@ def process_message_type(msg_type): # topic_simple: eg vehicle_status msg_type['topic_simple'] = msg_type['topic'].split('/')[-1] + # Optional per-publisher QoS options from YAML 'options:' field. + # Converts e.g. {cc: block, express: true} -> "cc=block,express=true" + opts = msg_type.get('options', None) + if opts and isinstance(opts, dict): + # Normalize booleans to lowercase strings expected by the parser. + msg_type['pub_options_str'] = ','.join( + f"{k}={str(v).lower() if isinstance(v, bool) else v}" for k, v in opts.items() + ) + else: + msg_type['pub_options_str'] = '' + def process_message_instance(msg_type): if 'instance' in msg_type: # if instance is given, check if it is a non negative integer diff --git a/src/modules/zenoh/Kconfig b/src/modules/zenoh/Kconfig index 8408d08092..1db97e7a30 100644 --- a/src/modules/zenoh/Kconfig +++ b/src/modules/zenoh/Kconfig @@ -43,6 +43,15 @@ if MODULES_ZENOH expression (supported by ROS2 Jazzy and later). Set to false to use in ROS2 Humble and earlier. + config ZENOH_PUB_OPTION_OVERRIDE + bool "Enable per-publisher option overrides" + default n + ---help--- + Enable per-publisher overrides for Zenoh publisher options + (cc/express/prio/rel) via config file and CLI. + Disable this to reduce flash usage and use only global + publisher parameters (ZENOH_PUB_CC/REL/EXPR/PRIO). + # Choose exactly one item choice ZENOH_PUBSUB_SELECTION prompt "Publishers/Subscribers selection" diff --git a/src/modules/zenoh/dds_topics.yaml b/src/modules/zenoh/dds_topics.yaml index 6f6f3dc00a..1bde5af93a 100644 --- a/src/modules/zenoh/dds_topics.yaml +++ b/src/modules/zenoh/dds_topics.yaml @@ -2,17 +2,42 @@ # # This file maps all the topics that are to be used on the Zenoh client. # +# Per-publisher options can be overridden via an optional 'options' dict: +# +# - topic: /fmu/out/example +# type: px4_msgs::msg::Example +# options: +# cc: block # congestion_control: drop | block +# express: true # is_express: true | false +# prio: real_time # priority: real_time | interactive_high | interactive_low | +# # data_high | data | data_low | background +# rel: reliable # reliability: reliable | best_effort +# +# Omitting 'options' (or individual keys) inherits from global PX4 parameters +# ZENOH_PUB_CC / ZENOH_PUB_REL / ZENOH_PUB_EXPR / ZENOH_PUB_PRIO. ##### publications: - topic: /fmu/out/register_ext_component_reply type: px4_msgs::msg::RegisterExtComponentReply + options: + cc: block + rel: reliable + express: true - topic: /fmu/out/arming_check_request type: px4_msgs::msg::ArmingCheckRequest + options: + cc: block + rel: reliable + express: true - topic: /fmu/out/mode_completed type: px4_msgs::msg::ModeCompleted + options: + cc: block + rel: reliable + express: true - topic: /fmu/out/battery_status type: px4_msgs::msg::BatteryStatus @@ -46,24 +71,48 @@ publications: - topic: /fmu/out/vehicle_attitude type: px4_msgs::msg::VehicleAttitude + options: + cc: drop + rel: best_effort + express: true - topic: /fmu/out/vehicle_control_mode type: px4_msgs::msg::VehicleControlMode - topic: /fmu/out/vehicle_command_ack type: px4_msgs::msg::VehicleCommandAck + options: + cc: block + rel: reliable + express: true - topic: /fmu/out/vehicle_global_position type: px4_msgs::msg::VehicleGlobalPosition + options: + cc: drop + rel: best_effort + express: true - topic: /fmu/out/vehicle_gps_position type: px4_msgs::msg::SensorGps + options: + cc: drop + rel: best_effort + express: true - topic: /fmu/out/vehicle_local_position type: px4_msgs::msg::VehicleLocalPosition + options: + cc: drop + rel: best_effort + express: true - topic: /fmu/out/vehicle_odometry type: px4_msgs::msg::VehicleOdometry + options: + cc: drop + rel: best_effort + express: true - topic: /fmu/out/vehicle_status type: px4_msgs::msg::VehicleStatus diff --git a/src/modules/zenoh/default_topics.c.em b/src/modules/zenoh/default_topics.c.em index f80b18dff6..5ac9f31106 100644 --- a/src/modules/zenoh/default_topics.c.em +++ b/src/modules/zenoh/default_topics.c.em @@ -18,7 +18,7 @@ import os const char* default_pub_config = @[ for pub in publications]@ - "@(pub['topic']);@(pub['topic_simple']);0\n" + "@(pub['topic']);@(pub['topic_simple']);0@[if pub.get('pub_options_str')];@(pub['pub_options_str'])@[end if]\n" @[ end for]@ ; diff --git a/src/modules/zenoh/publishers/zenoh_publisher.cpp b/src/modules/zenoh/publishers/zenoh_publisher.cpp index c83caa266f..aef04f5823 100644 --- a/src/modules/zenoh/publishers/zenoh_publisher.cpp +++ b/src/modules/zenoh/publishers/zenoh_publisher.cpp @@ -59,7 +59,8 @@ int Zenoh_Publisher::undeclare_publisher() return 0; } -int Zenoh_Publisher::declare_publisher(z_owned_session_t s, const char *keyexpr, uint8_t *gid) +int Zenoh_Publisher::declare_publisher(z_owned_session_t s, const char *keyexpr, uint8_t *gid, + z_publisher_options_t *opts) { z_view_keyexpr_t ke; @@ -68,7 +69,7 @@ int Zenoh_Publisher::declare_publisher(z_owned_session_t s, const char *keyexpr, return -1; } - if (z_declare_publisher(z_loan(s), &_pub, z_loan(ke), NULL) < 0) { + if (z_declare_publisher(z_loan(s), &_pub, z_loan(ke), opts) < 0) { printf("Unable to declare publisher for key expression!\n"); return -1; } diff --git a/src/modules/zenoh/publishers/zenoh_publisher.hpp b/src/modules/zenoh/publishers/zenoh_publisher.hpp index b787f39737..e9dfc38922 100644 --- a/src/modules/zenoh/publishers/zenoh_publisher.hpp +++ b/src/modules/zenoh/publishers/zenoh_publisher.hpp @@ -56,7 +56,8 @@ public: Zenoh_Publisher(); virtual ~Zenoh_Publisher(); - virtual int declare_publisher(z_owned_session_t s, const char *keyexpr, uint8_t *gid); + virtual int declare_publisher(z_owned_session_t s, const char *keyexpr, uint8_t *gid, + z_publisher_options_t *opts); virtual int undeclare_publisher(); diff --git a/src/modules/zenoh/zenoh.cpp b/src/modules/zenoh/zenoh.cpp index 92aeba9d2b..d1ea6940ae 100644 --- a/src/modules/zenoh/zenoh.cpp +++ b/src/modules/zenoh/zenoh.cpp @@ -361,15 +361,38 @@ int ZENOH::setupTopics(px4_pollfd_struct_t *pfds) char topic[TOPIC_INFO_SIZE]; char type[TOPIC_INFO_SIZE]; int instance; + z_publisher_options_t global_opts; + z_publisher_options_default(&global_opts); + global_opts.congestion_control = (z_congestion_control_t)_zenoh_pub_cc.get(); + global_opts.is_express = (bool)_zenoh_pub_expr.get(); + global_opts.priority = (z_priority_t)_zenoh_pub_prio.get(); +#ifdef Z_FEATURE_UNSTABLE_API + global_opts.reliability = (z_reliability_t)_zenoh_pub_rel.get(); +#endif for (i = 0; i < _pub_count; i++) { + +#ifdef CONFIG_ZENOH_PUB_OPTION_OVERRIDE + z_publisher_options_t pub_opts = global_opts; +#endif + +#ifdef CONFIG_ZENOH_PUB_OPTION_OVERRIDE + + if (_config.getPublisherMapping(topic, type, &instance, &pub_opts)) { +#else + if (_config.getPublisherMapping(topic, type, &instance)) { +#endif _zenoh_publishers[i] = genPublisher(type, instance); const uint8_t *rihs_hash = getRIHS01_Hash(type); if (rihs_hash && _zenoh_publishers[i] != 0 && generate_rmw_zenoh_topic_keyexpr(topic, rihs_hash, type, keyexpr) > 0) { - _zenoh_publishers[i]->declare_publisher(_s, keyexpr, (uint8_t *)&_px4_guid); +#ifdef CONFIG_ZENOH_PUB_OPTION_OVERRIDE + _zenoh_publishers[i]->declare_publisher(_s, keyexpr, (uint8_t *)&_px4_guid, &pub_opts); +#else + _zenoh_publishers[i]->declare_publisher(_s, keyexpr, (uint8_t *)&_px4_guid, &global_opts); +#endif _zenoh_publishers[i]->setPollFD(&pfds[i]); #ifdef CONFIG_ZENOH_RMW_LIVELINESS @@ -536,7 +559,15 @@ Zenoh demo bridge PRINT_MODULE_USAGE_COMMAND("stop"); PRINT_MODULE_USAGE_COMMAND("status"); PRINT_MODULE_USAGE_COMMAND("config"); + +#ifdef CONFIG_ZENOH_PUB_OPTION_OVERRIDE + PX4_INFO_RAW(" add publisher [uorb_instance] [options] Publish uORB topic to Zenoh\n"); + PX4_INFO_RAW(" [options] key=value pairs: cc=drop|block, express=true|false,\n"); + PX4_INFO_RAW(" prio=real_time|interactive_high|interactive_low|data_high|data|data_low|background,\n"); + PX4_INFO_RAW(" rel=reliable|best_effort (e.g. \"cc=block,express=true\")\n"); +#else PX4_INFO_RAW(" add publisher Publish uORB topic to Zenoh\n"); +#endif PX4_INFO_RAW(" add subscriber Publish Zenoh topic to uORB\n"); PX4_INFO_RAW(" delete publisher \n"); PX4_INFO_RAW(" delete subscriber \n"); diff --git a/src/modules/zenoh/zenoh.h b/src/modules/zenoh/zenoh.h index 7e2b6d5f5d..871c2a35bd 100644 --- a/src/modules/zenoh/zenoh.h +++ b/src/modules/zenoh/zenoh.h @@ -88,7 +88,11 @@ public: private: DEFINE_PARAMETERS( - (ParamInt) _zenoh_domain_id + (ParamInt) _zenoh_domain_id, + (ParamInt) _zenoh_pub_cc, + (ParamInt) _zenoh_pub_rel, + (ParamInt) _zenoh_pub_expr, + (ParamInt) _zenoh_pub_prio ) int generate_rmw_zenoh_node_liveliness_keyexpr(const z_id_t *id, char *keyexpr); diff --git a/src/modules/zenoh/zenoh_config.cpp b/src/modules/zenoh/zenoh_config.cpp index 1a88194348..cdcf8ec682 100644 --- a/src/modules/zenoh/zenoh_config.cpp +++ b/src/modules/zenoh/zenoh_config.cpp @@ -95,8 +95,13 @@ Zenoh_Config::~Zenoh_Config() } } -int Zenoh_Config::AddPubSub(char *topic, char *datatype, int instance_no, const char *filename) +int Zenoh_Config::AddPubSub(char *topic, char *datatype, int instance_no, const char *filename, + const char *options_str) { +#ifndef CONFIG_ZENOH_PUB_OPTION_OVERRIDE + (void)options_str; +#endif + { char f_topic[TOPIC_INFO_SIZE]; char f_type[TOPIC_INFO_SIZE]; @@ -120,7 +125,18 @@ int Zenoh_Config::AddPubSub(char *topic, char *datatype, int instance_no, const FILE *fp = fopen(filename, "a"); if (fp) { +#ifdef CONFIG_ZENOH_PUB_OPTION_OVERRIDE + + if (options_str && options_str[0] != '\0') { + fprintf(fp, "%s;%s;%d;%s\n", topic, datatype, instance_no, options_str); + + } else { + fprintf(fp, "%s;%s;%d\n", topic, datatype, instance_no); + } + +#else fprintf(fp, "%s;%s;%d\n", topic, datatype, instance_no); +#endif } else { return -1; @@ -292,12 +308,60 @@ int Zenoh_Config::cli(int argc, char *argv[]) if (strcmp(argv[2], "publisher") == 0) { int instance = 0; +#ifdef CONFIG_ZENOH_PUB_OPTION_OVERRIDE + const char *options_str = nullptr; + if (argc == 6) { + int parsed_instance; + + if (sscanf(argv[5], "%d", &parsed_instance) == 1 && parsed_instance >= 0) { + instance = parsed_instance; + + } else { + options_str = argv[5]; + } + + } else if (argc == 7) { if (sscanf(argv[5], "%d", &instance) != 1 || instance < 0) { - printf("Invalid instance %s (must be an integer, 0 for the default instance or a specific instance's index)\n", - argv[5]); + printf("Invalid instance %s (must be a non-negative integer)\n", argv[5]); return 0; } + + options_str = argv[6]; + + } else if (argc > 7) { + printf("Too many arguments\n"); + return 0; + } + + if (options_str && !parsePublisherOptions(options_str, nullptr)) { + return 0; + } + + if (AddPubSub(argv[3], argv[4], instance, ZENOH_PUB_CONFIG_PATH, options_str) > 0) { + if (options_str) { + printf("Added %s %s to publishers (instance %d, options: %s)\n", argv[3], argv[4], instance, + options_str); + + } else { + printf("Added %s %s to publishers (instance %d)\n", argv[3], argv[4], instance); + } + + } else { + printf("Could not add uORB %s:%d -> %s to publishers\n", argv[3], instance, argv[4]); + } + +#else + + if (argc == 6) { + if (sscanf(argv[5], "%d", &instance) != 1 || instance < 0) { + printf("Invalid instance %s (must be a non-negative integer)\n", argv[5]); + return 0; + } + + } else if (argc > 6) { + printf("Too many arguments\n"); + return 0; } if (AddPubSub(argv[3], argv[4], instance, ZENOH_PUB_CONFIG_PATH) > 0) { @@ -307,6 +371,8 @@ int Zenoh_Config::cli(int argc, char *argv[]) printf("Could not add uORB %s:%d -> %s to publishers\n", argv[3], instance, argv[4]); } +#endif + } else if (strcmp(argv[2], "subscriber") == 0) { int instance = 0; @@ -443,9 +509,128 @@ int Zenoh_Config::closePubSubMapping() } -// Very rudamentary here but we've to wait for a more advanced param system -int Zenoh_Config::getPubSubMapping(char *topic, char *type, int *instance, const char *filename) +#ifdef CONFIG_ZENOH_PUB_OPTION_OVERRIDE +bool Zenoh_Config::parsePublisherOptions(const char *options_str, z_publisher_options_t *opts) { + if (options_str == nullptr || options_str[0] == '\0') { + return true; + } + + char buf[ZENOH_PUB_OPTIONS_SIZE]; + strncpy(buf, options_str, sizeof(buf) - 1); + buf[sizeof(buf) - 1] = '\0'; + + int len = strlen(buf); + + while (len > 0 && (buf[len - 1] == '\n' || buf[len - 1] == '\r' || buf[len - 1] == ' ')) { + buf[--len] = '\0'; + } + + bool valid = true; + char *saveptr_pair = nullptr; + char *pair = strtok_r(buf, ",", &saveptr_pair); + + while (pair != nullptr) { + char *eq = strchr(pair, '='); + + if (eq != nullptr) { + *eq = '\0'; + const char *key = pair; + const char *val = eq + 1; + + if (strcmp(key, "cc") == 0) { + if (strcmp(val, "block") == 0) { + if (opts) { opts->congestion_control = Z_CONGESTION_CONTROL_BLOCK; } + + } else if (strcmp(val, "drop") == 0) { + if (opts) { opts->congestion_control = Z_CONGESTION_CONTROL_DROP; } + + } else { + PX4_WARN("Invalid cc value '%s' (valid: block, drop)", val); + valid = false; + } + + } else if (strcmp(key, "express") == 0) { + if (strcmp(val, "true") == 0) { + if (opts) { opts->is_express = true; } + + } else if (strcmp(val, "false") == 0) { + if (opts) { opts->is_express = false; } + + } else { + PX4_WARN("Invalid express value '%s' (valid: true, false)", val); + valid = false; + } + + } else if (strcmp(key, "prio") == 0) { + if (strcmp(val, "real_time") == 0) { + if (opts) { opts->priority = Z_PRIORITY_REAL_TIME; } + + } else if (strcmp(val, "interactive_high") == 0) { + if (opts) { opts->priority = Z_PRIORITY_INTERACTIVE_HIGH; } + + } else if (strcmp(val, "interactive_low") == 0) { + if (opts) { opts->priority = Z_PRIORITY_INTERACTIVE_LOW; } + + } else if (strcmp(val, "data_high") == 0) { + if (opts) { opts->priority = Z_PRIORITY_DATA_HIGH; } + + } else if (strcmp(val, "data") == 0) { + if (opts) { opts->priority = Z_PRIORITY_DATA; } + + } else if (strcmp(val, "data_low") == 0) { + if (opts) { opts->priority = Z_PRIORITY_DATA_LOW; } + + } else if (strcmp(val, "background") == 0) { + if (opts) { opts->priority = Z_PRIORITY_BACKGROUND; } + + } else { + PX4_WARN("Invalid prio value '%s' (valid: real_time, interactive_high, interactive_low, data_high, data, data_low, background)", + val); + valid = false; + } + +#ifdef Z_FEATURE_UNSTABLE_API + + } else if (strcmp(key, "rel") == 0) { + if (strcmp(val, "reliable") == 0) { + if (opts) { opts->reliability = Z_RELIABILITY_RELIABLE; } + + } else if (strcmp(val, "best_effort") == 0) { + if (opts) { opts->reliability = Z_RELIABILITY_BEST_EFFORT; } + + } else { + PX4_WARN("Invalid rel value '%s' (valid: reliable, best_effort)", val); + valid = false; + } + +#endif + + } else { + PX4_WARN("Unknown publisher option key '%s' (valid: cc, express, prio, rel)", key); + valid = false; + } + + } else { + PX4_WARN("Malformed option '%s' (expected key=value)", pair); + valid = false; + } + + pair = strtok_r(nullptr, ",", &saveptr_pair); + } + + return valid; +} +#endif + + +// Very rudamentary here but we've to wait for a more advanced param system +int Zenoh_Config::getPubSubMapping(char *topic, char *type, int *instance, const char *filename, char *options_str) +{ +#ifndef CONFIG_ZENOH_PUB_OPTION_OVERRIDE + (void)options_str; +#endif + char buffer[MAX_LINE_SIZE]; if (fp_mapping == NULL) { @@ -456,12 +641,17 @@ int Zenoh_Config::getPubSubMapping(char *topic, char *type, int *instance, const while (fgets(buffer, MAX_LINE_SIZE, fp_mapping) != NULL) { if (buffer[0] != '\n') { +#ifdef CONFIG_ZENOH_PUB_OPTION_OVERRIDE + const char *fields[4]; + int nfields = parse_csv_line(buffer, fields, 4); +#else const char *fields[3]; int nfields = parse_csv_line(buffer, fields, 3); +#endif if (nfields >= 2) { - if (nfields == 3) { + if (nfields >= 3) { if (sscanf(fields[2], "%d", instance) != 1) { PX4_WARN("Malformed zenoh config instance %s (instance field should be an integer following the type)\n", fields[2]); return -1; @@ -471,6 +661,20 @@ int Zenoh_Config::getPubSubMapping(char *topic, char *type, int *instance, const *instance = -1; } +#ifdef CONFIG_ZENOH_PUB_OPTION_OVERRIDE + + if (options_str != nullptr) { + if (nfields >= 4 && fields[3] != nullptr) { + strncpy(options_str, fields[3], ZENOH_PUB_OPTIONS_SIZE - 1); + options_str[ZENOH_PUB_OPTIONS_SIZE - 1] = '\0'; + + } else { + options_str[0] = '\0'; + } + } + +#endif + strncpy(type, fields[1], TOPIC_INFO_SIZE); strncpy(topic, fields[0], TOPIC_INFO_SIZE); return 1; @@ -518,14 +722,35 @@ void Zenoh_Config::dump_config() char type[TOPIC_INFO_SIZE]; int instance_no; +#ifdef CONFIG_ZENOH_PUB_OPTION_OVERRIDE + char options_str[ZENOH_PUB_OPTIONS_SIZE]; +#endif + printf("Publisher config:\n"); + +#ifdef CONFIG_ZENOH_PUB_OPTION_OVERRIDE + + while (getPubSubMapping(topic, type, &instance_no, ZENOH_PUB_CONFIG_PATH, options_str) > 0) { + printf("Topic: %s\n", topic); + printf("Type: %s\n", type); + printf("Instance: %d\n", instance_no); + + if (options_str[0] != '\0') { + printf("Options: %s\n", options_str); + } + } + +#else + while (getPubSubMapping(topic, type, &instance_no, ZENOH_PUB_CONFIG_PATH) > 0) { printf("Topic: %s\n", topic); printf("Type: %s\n", type); printf("Instance: %d\n", instance_no); } +#endif + printf("\nSubscriber config:\n"); while (getPubSubMapping(topic, type, &instance_no, ZENOH_SUB_CONFIG_PATH) > 0) { diff --git a/src/modules/zenoh/zenoh_config.hpp b/src/modules/zenoh/zenoh_config.hpp index e92abdb1c7..c818f97a59 100644 --- a/src/modules/zenoh/zenoh_config.hpp +++ b/src/modules/zenoh/zenoh_config.hpp @@ -61,8 +61,13 @@ #define KEYEXPR_MSG_NAME "px4_msgs::msg::dds_::" #define KEYEXPR_MSG_NAME_SIZE sizeof(KEYEXPR_MSG_NAME) #define TOPIC_INFO_SIZE (96) +#ifdef CONFIG_ZENOH_PUB_OPTION_OVERRIDE +#define ZENOH_PUB_OPTIONS_SIZE (64) +#define MAX_LINE_SIZE (2 * TOPIC_INFO_SIZE + ZENOH_PUB_OPTIONS_SIZE + 4) +#else #define MAX_LINE_SIZE (2 * TOPIC_INFO_SIZE) -#define KEYEXPR_SIZE (MAX_LINE_SIZE + KEYEXPR_MSG_NAME_SIZE + KEYEXPR_RIHS01_SIZE + 128) +#endif +#define KEYEXPR_SIZE (2 * TOPIC_INFO_SIZE + KEYEXPR_MSG_NAME_SIZE + KEYEXPR_RIHS01_SIZE + 128) class Zenoh_Config { @@ -81,9 +86,22 @@ public: { return getLineCount(ZENOH_SUB_CONFIG_PATH); } - int getPublisherMapping(char *topic, char *type, int *instance) + int getPublisherMapping(char *topic, char *type, int *instance, z_publisher_options_t *opts = nullptr) { + (void)opts; +#ifdef CONFIG_ZENOH_PUB_OPTION_OVERRIDE + char options_str[ZENOH_PUB_OPTIONS_SIZE]; + int ret = getPubSubMapping(topic, type, instance, ZENOH_PUB_CONFIG_PATH, options_str); + + if (ret > 0 && opts != nullptr) { + parsePublisherOptions(options_str, opts); + } + + return ret; + +#else return getPubSubMapping(topic, type, instance, ZENOH_PUB_CONFIG_PATH); +#endif } // existing_instance will be either 0 (should create a new instance) or nonzero (should reuse the existing 0 instance) int getSubscriberMapping(char *topic, char *type, int *existing_instance) @@ -94,8 +112,12 @@ public: private: - int getPubSubMapping(char *topic, char *type, int *new_instance, const char *filename); - int AddPubSub(char *topic, char *datatype, int new_instance, const char *filename); +#ifdef CONFIG_ZENOH_PUB_OPTION_OVERRIDE + bool parsePublisherOptions(const char *options_str, z_publisher_options_t *opts); +#endif + int getPubSubMapping(char *topic, char *type, int *new_instance, const char *filename, char *options_str = nullptr); + int AddPubSub(char *topic, char *datatype, int new_instance, const char *filename, + const char *options_str = nullptr); int DeletePubSub(char *topic, const char *filename); int SetNetworkConfig(char *mode, char *locator); int getLineCount(const char *filename); diff --git a/src/modules/zenoh/zenoh_params.yaml b/src/modules/zenoh/zenoh_params.yaml index e7473cab6e..b7f0d39677 100644 --- a/src/modules/zenoh/zenoh_params.yaml +++ b/src/modules/zenoh/zenoh_params.yaml @@ -9,3 +9,52 @@ parameters: default: 0 min: 0 max: 232 + ZENOH_PUB_CC: + description: + short: Zenoh publisher congestion control (global default) + type: int32 + default: 0 + min: 0 + max: 1 + values: + 0: Drop + 1: Block + reboot_required: true + ZENOH_PUB_REL: + description: + short: Zenoh publisher reliability (global default) + type: int32 + default: 0 + min: 0 + max: 1 + values: + 0: Reliable + 1: BestEffort + reboot_required: true + ZENOH_PUB_EXPR: + description: + short: Zenoh publisher express mode (global default) + type: int32 + default: 0 + min: 0 + max: 1 + values: + 0: Disabled + 1: Enabled + reboot_required: true + ZENOH_PUB_PRIO: + description: + short: Zenoh publisher priority (global default) + type: int32 + default: 5 + min: 1 + max: 7 + values: + 1: RealTime + 2: InteractiveHigh + 3: InteractiveLow + 4: DataHigh + 5: Data + 6: DataLow + 7: Background + reboot_required: true