mirror of
https://github.com/PX4/PX4-Autopilot.git
synced 2026-05-09 22:08:56 +08:00
feat(zenoh): add support for configuring zenoh publisher options (#27157)
* feat(zenoh): add support for configuring zenoh publisher options Add zenoh parameters for common default options for all publishers. Options exposed are reliability, priority, congestion control and is_express Allow override of common publisher options for specific publisher through its config: config fille supports additional row where multiple options can be specified with csv string Expose config options through zenoh config add publisher. Allow options per publisher to be specified for default config in zenoh/dds_topics.yaml * fix(zenoh): Put individual zenoh publisher config override feature under config option ZENOH_PUB_OPTION_OVERRIDE Enabled ZENOH_PUB_OPTION_OVERRIDE on targets with enough flash memory * fix(zenoh): added publication options for default publications in dds_topics.yaml Rare messages that are important to arive: set as reliable, enabled express and with blocking congesiton control Fast vehcile position messages that can impact control decision making: set as best enabled express and with drop congestion control * docs(zenoh) : added documentation for zenoh publisher options usage * docs(docs): Link to params --------- Co-authored-by: Hamish Willee <hamishwillee@gmail.com>
This commit is contained in:
@@ -15,6 +15,7 @@ CONFIG_MODULES_MAVLINK=y
|
||||
CONFIG_MODULES_NAVIGATOR=y
|
||||
CONFIG_MODULES_SENSORS=y
|
||||
CONFIG_MODULES_ZENOH=y
|
||||
CONFIG_ZENOH_PUB_OPTION_OVERRIDE=y
|
||||
CONFIG_SYSTEMCMDS_I2CDETECT=y
|
||||
CONFIG_SYSTEMCMDS_LED_CONTROL=y
|
||||
CONFIG_SYSTEMCMDS_MFT=y
|
||||
|
||||
@@ -81,6 +81,7 @@ CONFIG_MODULES_SENSORS=y
|
||||
CONFIG_MODULES_UXRCE_DDS_CLIENT=y
|
||||
CONFIG_MODULES_VTOL_ATT_CONTROL=y
|
||||
CONFIG_MODULES_ZENOH=y
|
||||
CONFIG_ZENOH_PUB_OPTION_OVERRIDE=y
|
||||
CONFIG_SYSTEMCMDS_ACTUATOR_TEST=y
|
||||
CONFIG_SYSTEMCMDS_BSONDUMP=y
|
||||
CONFIG_SYSTEMCMDS_DMESG=y
|
||||
|
||||
@@ -77,6 +77,7 @@ CONFIG_MODULES_SENSORS=y
|
||||
CONFIG_MODULES_UXRCE_DDS_CLIENT=y
|
||||
CONFIG_MODULES_VTOL_ATT_CONTROL=y
|
||||
CONFIG_MODULES_ZENOH=y
|
||||
CONFIG_ZENOH_PUB_OPTION_OVERRIDE=y
|
||||
CONFIG_SYSTEMCMDS_ACTUATOR_TEST=y
|
||||
CONFIG_SYSTEMCMDS_BSONDUMP=y
|
||||
CONFIG_SYSTEMCMDS_DMESG=y
|
||||
|
||||
@@ -3,3 +3,4 @@ CONFIG_DRIVERS_UAVCAN=n
|
||||
CONFIG_MODULES_UXRCE_DDS_CLIENT=n
|
||||
CONFIG_BOARD_CONSTRAINED_FLASH=y
|
||||
CONFIG_MODULES_ZENOH=y
|
||||
CONFIG_ZENOH_PUB_OPTION_OVERRIDE=y
|
||||
|
||||
@@ -90,6 +90,7 @@ CONFIG_MODULES_SENSORS=y
|
||||
CONFIG_MODULES_UXRCE_DDS_CLIENT=y
|
||||
CONFIG_MODULES_VTOL_ATT_CONTROL=y
|
||||
CONFIG_MODULES_ZENOH=y
|
||||
CONFIG_ZENOH_PUB_OPTION_OVERRIDE=y
|
||||
CONFIG_SYSTEMCMDS_ACTUATOR_TEST=y
|
||||
CONFIG_SYSTEMCMDS_BSONDUMP=y
|
||||
CONFIG_SYSTEMCMDS_DMESG=y
|
||||
|
||||
@@ -1,2 +1,3 @@
|
||||
CONFIG_MODULES_UXRCE_DDS_CLIENT=n
|
||||
CONFIG_MODULES_ZENOH=y
|
||||
CONFIG_ZENOH_PUB_OPTION_OVERRIDE=y
|
||||
|
||||
@@ -59,7 +59,7 @@ For more information about key expressions, refer to the [rmw_zenoh design docum
|
||||
|
||||
Before setting up the Zenoh communication, first make sure that your firmware contains the driver that implements the [`zenoh` driver](../modules/modules_driver.md#zenoh), which provides the implementation of the _PX4 Zenoh-Pico Node_.
|
||||
|
||||
You can check if the module is present on your board by searching for the key `CONFIG_MODULES_ZENOH=y` in your board's `default.px4board` KConfig file.
|
||||
You can check if the module is present on your board by searching for the key `CONFIG_MODULES_ZENOH=y` in your board's `default.px4board` [KConfig file](../hardware/porting_guide_config.md).
|
||||
For example, you can see that the module is present in `px4_fmu-v6xrt` build targets from [/boards/px4/fmu-v6xrt/default.px4board](https://github.com/PX4/PX4-Autopilot/blob/main/boards/px4/fmu-v6xrt/default.px4board#L91).
|
||||
|
||||
If `CONFIG_MODULES_ZENOH=y` is not preset you can add this key to your board configuration and rebuild.
|
||||
@@ -89,6 +89,11 @@ inclusion of the message type hash (RIHS01, as defined in REP-2016) in the Zenoh
|
||||
Note that this will break compatibility with ROS 2 Jazzy and later.
|
||||
:::
|
||||
|
||||
::: tip
|
||||
Per-publisher option overrides can be enabled or disabled at compile time using the `CONFIG_ZENOH_PUB_OPTION_OVERRIDE` KConfig key.
|
||||
When this is disabled, PX4 uses only global publisher option parameters, and per-publisher CLI/config overrides are not available.
|
||||
:::
|
||||
|
||||
### Enable Zenoh on PX4 Startup
|
||||
|
||||
Set the [ZENOH_ENABLE](../advanced_config/parameter_reference.md#ZENOH_ENABLE) parameter to `1` to enable Zenoh on PX4 startup.
|
||||
@@ -132,6 +137,21 @@ This folder contains three key files:
|
||||
- **`pub.csv`** – Maps **uORB topics to ROS2 topics** (used for publishing).
|
||||
- **`sub.csv`** – Maps **ROS2 topics to uORB topics** (used for subscribing).
|
||||
|
||||
#### Publisher Options
|
||||
|
||||
Zenoh publisher behaviour can be controlled through global PX4 parameters:
|
||||
|
||||
- [ZENOH_PUB_CC](../advanced_config/parameter_reference.md#ZENOH_PUB_CC): congestion control (`Drop` or `Block`) - controls what happens when the transport path is congested. `Drop` prefers freshness/latency and may drop messages, while `Block` preserves delivery by applying backpressure to the publisher.
|
||||
- [ZENOH_PUB_REL](../advanced_config/parameter_reference.md#ZENOH_PUB_REL): reliability (`Reliable` or `BestEffort`) - selects the Zenoh reliability mode used by the router path and seen by the underlying ROS 2 `rmw_zenoh` transport.
|
||||
- [ZENOH_PUB_EXPR](../advanced_config/parameter_reference.md#ZENOH_PUB_EXPR): express mode (`Disabled` or `Enabled`) - when enabled, Zenoh does not wait to batch this operation with others. This usually reduces latency at the cost of higher bandwidth/overhead.
|
||||
- [ZENOH_PUB_PRIO](../advanced_config/parameter_reference.md#ZENOH_PUB_PRIO): priority (`RealTime`, `InteractiveHigh`, `InteractiveLow`, `DataHigh`, `Data`, `DataLow`, `Background`) - sets relative message priority for routing/scheduling under load.
|
||||
|
||||
These are applied to all Zenoh publishers.
|
||||
|
||||
If `CONFIG_ZENOH_PUB_OPTION_OVERRIDE=y`, individual publishers can override one or more global publisher options.
|
||||
Default configuration [dds_topics.yaml](../middleware/dds_topics.md) already provides overrides for several publishers.
|
||||
Individual publisher options can be overriden through the mapping configuration shown in the next section
|
||||
|
||||
### 4. Modifying Topic Mappings
|
||||
|
||||
Zenoh topic mappings define how data flows between PX4's internal uORB topics and external ROS2 topics via Zenoh.
|
||||
@@ -154,6 +174,25 @@ The main operations and their commands are:
|
||||
zenoh config add publisher <zenoh_topic> <uorb_topic> [uorb_instance]
|
||||
```
|
||||
|
||||
When `CONFIG_ZENOH_PUB_OPTION_OVERRIDE=y`, publisher options can also be passed:
|
||||
|
||||
```sh
|
||||
zenoh config add publisher <zenoh_topic> <uorb_topic> [uorb_instance] [options]
|
||||
```
|
||||
|
||||
Options are comma-separated `key=value` pairs:
|
||||
|
||||
- `cc=drop|block` (congestion control)
|
||||
- `express=true|false` (batching behaviour)
|
||||
- `prio=real_time|interactive_high|interactive_low|data_high|data|data_low|background` (priority)
|
||||
- `rel=reliable|best_effort` (reliability)
|
||||
|
||||
Example:
|
||||
|
||||
```sh
|
||||
zenoh config add publisher /fmu/out/vehicle_status vehicle_status 0 "cc=block,express=true,rel=best_effort"
|
||||
```
|
||||
|
||||
- Subscribe to a Zenoh topic and forward it to a uORB topic:
|
||||
|
||||
```sh
|
||||
|
||||
@@ -102,6 +102,17 @@ def process_message_type(msg_type):
|
||||
# topic_simple: eg vehicle_status
|
||||
msg_type['topic_simple'] = msg_type['topic'].split('/')[-1]
|
||||
|
||||
# Optional per-publisher QoS options from YAML 'options:' field.
|
||||
# Converts e.g. {cc: block, express: true} -> "cc=block,express=true"
|
||||
opts = msg_type.get('options', None)
|
||||
if opts and isinstance(opts, dict):
|
||||
# Normalize booleans to lowercase strings expected by the parser.
|
||||
msg_type['pub_options_str'] = ','.join(
|
||||
f"{k}={str(v).lower() if isinstance(v, bool) else v}" for k, v in opts.items()
|
||||
)
|
||||
else:
|
||||
msg_type['pub_options_str'] = ''
|
||||
|
||||
def process_message_instance(msg_type):
|
||||
if 'instance' in msg_type:
|
||||
# if instance is given, check if it is a non negative integer
|
||||
|
||||
@@ -43,6 +43,15 @@ if MODULES_ZENOH
|
||||
expression (supported by ROS2 Jazzy and later).
|
||||
Set to false to use in ROS2 Humble and earlier.
|
||||
|
||||
config ZENOH_PUB_OPTION_OVERRIDE
|
||||
bool "Enable per-publisher option overrides"
|
||||
default n
|
||||
---help---
|
||||
Enable per-publisher overrides for Zenoh publisher options
|
||||
(cc/express/prio/rel) via config file and CLI.
|
||||
Disable this to reduce flash usage and use only global
|
||||
publisher parameters (ZENOH_PUB_CC/REL/EXPR/PRIO).
|
||||
|
||||
# Choose exactly one item
|
||||
choice ZENOH_PUBSUB_SELECTION
|
||||
prompt "Publishers/Subscribers selection"
|
||||
|
||||
@@ -2,17 +2,42 @@
|
||||
#
|
||||
# This file maps all the topics that are to be used on the Zenoh client.
|
||||
#
|
||||
# Per-publisher options can be overridden via an optional 'options' dict:
|
||||
#
|
||||
# - topic: /fmu/out/example
|
||||
# type: px4_msgs::msg::Example
|
||||
# options:
|
||||
# cc: block # congestion_control: drop | block
|
||||
# express: true # is_express: true | false
|
||||
# prio: real_time # priority: real_time | interactive_high | interactive_low |
|
||||
# # data_high | data | data_low | background
|
||||
# rel: reliable # reliability: reliable | best_effort
|
||||
#
|
||||
# Omitting 'options' (or individual keys) inherits from global PX4 parameters
|
||||
# ZENOH_PUB_CC / ZENOH_PUB_REL / ZENOH_PUB_EXPR / ZENOH_PUB_PRIO.
|
||||
#####
|
||||
publications:
|
||||
|
||||
- topic: /fmu/out/register_ext_component_reply
|
||||
type: px4_msgs::msg::RegisterExtComponentReply
|
||||
options:
|
||||
cc: block
|
||||
rel: reliable
|
||||
express: true
|
||||
|
||||
- topic: /fmu/out/arming_check_request
|
||||
type: px4_msgs::msg::ArmingCheckRequest
|
||||
options:
|
||||
cc: block
|
||||
rel: reliable
|
||||
express: true
|
||||
|
||||
- topic: /fmu/out/mode_completed
|
||||
type: px4_msgs::msg::ModeCompleted
|
||||
options:
|
||||
cc: block
|
||||
rel: reliable
|
||||
express: true
|
||||
|
||||
- topic: /fmu/out/battery_status
|
||||
type: px4_msgs::msg::BatteryStatus
|
||||
@@ -46,24 +71,48 @@ publications:
|
||||
|
||||
- topic: /fmu/out/vehicle_attitude
|
||||
type: px4_msgs::msg::VehicleAttitude
|
||||
options:
|
||||
cc: drop
|
||||
rel: best_effort
|
||||
express: true
|
||||
|
||||
- topic: /fmu/out/vehicle_control_mode
|
||||
type: px4_msgs::msg::VehicleControlMode
|
||||
|
||||
- topic: /fmu/out/vehicle_command_ack
|
||||
type: px4_msgs::msg::VehicleCommandAck
|
||||
options:
|
||||
cc: block
|
||||
rel: reliable
|
||||
express: true
|
||||
|
||||
- topic: /fmu/out/vehicle_global_position
|
||||
type: px4_msgs::msg::VehicleGlobalPosition
|
||||
options:
|
||||
cc: drop
|
||||
rel: best_effort
|
||||
express: true
|
||||
|
||||
- topic: /fmu/out/vehicle_gps_position
|
||||
type: px4_msgs::msg::SensorGps
|
||||
options:
|
||||
cc: drop
|
||||
rel: best_effort
|
||||
express: true
|
||||
|
||||
- topic: /fmu/out/vehicle_local_position
|
||||
type: px4_msgs::msg::VehicleLocalPosition
|
||||
options:
|
||||
cc: drop
|
||||
rel: best_effort
|
||||
express: true
|
||||
|
||||
- topic: /fmu/out/vehicle_odometry
|
||||
type: px4_msgs::msg::VehicleOdometry
|
||||
options:
|
||||
cc: drop
|
||||
rel: best_effort
|
||||
express: true
|
||||
|
||||
- topic: /fmu/out/vehicle_status
|
||||
type: px4_msgs::msg::VehicleStatus
|
||||
|
||||
@@ -18,7 +18,7 @@ import os
|
||||
|
||||
const char* default_pub_config =
|
||||
@[ for pub in publications]@
|
||||
"@(pub['topic']);@(pub['topic_simple']);0\n"
|
||||
"@(pub['topic']);@(pub['topic_simple']);0@[if pub.get('pub_options_str')];@(pub['pub_options_str'])@[end if]\n"
|
||||
@[ end for]@
|
||||
;
|
||||
|
||||
|
||||
@@ -59,7 +59,8 @@ int Zenoh_Publisher::undeclare_publisher()
|
||||
return 0;
|
||||
}
|
||||
|
||||
int Zenoh_Publisher::declare_publisher(z_owned_session_t s, const char *keyexpr, uint8_t *gid)
|
||||
int Zenoh_Publisher::declare_publisher(z_owned_session_t s, const char *keyexpr, uint8_t *gid,
|
||||
z_publisher_options_t *opts)
|
||||
{
|
||||
z_view_keyexpr_t ke;
|
||||
|
||||
@@ -68,7 +69,7 @@ int Zenoh_Publisher::declare_publisher(z_owned_session_t s, const char *keyexpr,
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (z_declare_publisher(z_loan(s), &_pub, z_loan(ke), NULL) < 0) {
|
||||
if (z_declare_publisher(z_loan(s), &_pub, z_loan(ke), opts) < 0) {
|
||||
printf("Unable to declare publisher for key expression!\n");
|
||||
return -1;
|
||||
}
|
||||
|
||||
@@ -56,7 +56,8 @@ public:
|
||||
Zenoh_Publisher();
|
||||
virtual ~Zenoh_Publisher();
|
||||
|
||||
virtual int declare_publisher(z_owned_session_t s, const char *keyexpr, uint8_t *gid);
|
||||
virtual int declare_publisher(z_owned_session_t s, const char *keyexpr, uint8_t *gid,
|
||||
z_publisher_options_t *opts);
|
||||
|
||||
virtual int undeclare_publisher();
|
||||
|
||||
|
||||
@@ -361,15 +361,38 @@ int ZENOH::setupTopics(px4_pollfd_struct_t *pfds)
|
||||
char topic[TOPIC_INFO_SIZE];
|
||||
char type[TOPIC_INFO_SIZE];
|
||||
int instance;
|
||||
z_publisher_options_t global_opts;
|
||||
z_publisher_options_default(&global_opts);
|
||||
global_opts.congestion_control = (z_congestion_control_t)_zenoh_pub_cc.get();
|
||||
global_opts.is_express = (bool)_zenoh_pub_expr.get();
|
||||
global_opts.priority = (z_priority_t)_zenoh_pub_prio.get();
|
||||
#ifdef Z_FEATURE_UNSTABLE_API
|
||||
global_opts.reliability = (z_reliability_t)_zenoh_pub_rel.get();
|
||||
#endif
|
||||
|
||||
for (i = 0; i < _pub_count; i++) {
|
||||
|
||||
#ifdef CONFIG_ZENOH_PUB_OPTION_OVERRIDE
|
||||
z_publisher_options_t pub_opts = global_opts;
|
||||
#endif
|
||||
|
||||
#ifdef CONFIG_ZENOH_PUB_OPTION_OVERRIDE
|
||||
|
||||
if (_config.getPublisherMapping(topic, type, &instance, &pub_opts)) {
|
||||
#else
|
||||
|
||||
if (_config.getPublisherMapping(topic, type, &instance)) {
|
||||
#endif
|
||||
_zenoh_publishers[i] = genPublisher(type, instance);
|
||||
const uint8_t *rihs_hash = getRIHS01_Hash(type);
|
||||
|
||||
if (rihs_hash && _zenoh_publishers[i] != 0 &&
|
||||
generate_rmw_zenoh_topic_keyexpr(topic, rihs_hash, type, keyexpr) > 0) {
|
||||
_zenoh_publishers[i]->declare_publisher(_s, keyexpr, (uint8_t *)&_px4_guid);
|
||||
#ifdef CONFIG_ZENOH_PUB_OPTION_OVERRIDE
|
||||
_zenoh_publishers[i]->declare_publisher(_s, keyexpr, (uint8_t *)&_px4_guid, &pub_opts);
|
||||
#else
|
||||
_zenoh_publishers[i]->declare_publisher(_s, keyexpr, (uint8_t *)&_px4_guid, &global_opts);
|
||||
#endif
|
||||
_zenoh_publishers[i]->setPollFD(&pfds[i]);
|
||||
#ifdef CONFIG_ZENOH_RMW_LIVELINESS
|
||||
|
||||
@@ -536,7 +559,15 @@ Zenoh demo bridge
|
||||
PRINT_MODULE_USAGE_COMMAND("stop");
|
||||
PRINT_MODULE_USAGE_COMMAND("status");
|
||||
PRINT_MODULE_USAGE_COMMAND("config");
|
||||
|
||||
#ifdef CONFIG_ZENOH_PUB_OPTION_OVERRIDE
|
||||
PX4_INFO_RAW(" add publisher <zenoh_topic> <uorb_topic> [uorb_instance] [options] Publish uORB topic to Zenoh\n");
|
||||
PX4_INFO_RAW(" [options] key=value pairs: cc=drop|block, express=true|false,\n");
|
||||
PX4_INFO_RAW(" prio=real_time|interactive_high|interactive_low|data_high|data|data_low|background,\n");
|
||||
PX4_INFO_RAW(" rel=reliable|best_effort (e.g. \"cc=block,express=true\")\n");
|
||||
#else
|
||||
PX4_INFO_RAW(" add publisher <zenoh_topic> <uorb_topic> <optional uorb_instance> Publish uORB topic to Zenoh\n");
|
||||
#endif
|
||||
PX4_INFO_RAW(" add subscriber <zenoh_topic> <uorb_topic> <optional uorb_instance> Publish Zenoh topic to uORB\n");
|
||||
PX4_INFO_RAW(" delete publisher <zenoh_topic>\n");
|
||||
PX4_INFO_RAW(" delete subscriber <zenoh_topic>\n");
|
||||
|
||||
@@ -88,7 +88,11 @@ public:
|
||||
|
||||
private:
|
||||
DEFINE_PARAMETERS(
|
||||
(ParamInt<px4::params::ZENOH_DOMAIN_ID>) _zenoh_domain_id
|
||||
(ParamInt<px4::params::ZENOH_DOMAIN_ID>) _zenoh_domain_id,
|
||||
(ParamInt<px4::params::ZENOH_PUB_CC>) _zenoh_pub_cc,
|
||||
(ParamInt<px4::params::ZENOH_PUB_REL>) _zenoh_pub_rel,
|
||||
(ParamInt<px4::params::ZENOH_PUB_EXPR>) _zenoh_pub_expr,
|
||||
(ParamInt<px4::params::ZENOH_PUB_PRIO>) _zenoh_pub_prio
|
||||
)
|
||||
|
||||
int generate_rmw_zenoh_node_liveliness_keyexpr(const z_id_t *id, char *keyexpr);
|
||||
|
||||
@@ -95,8 +95,13 @@ Zenoh_Config::~Zenoh_Config()
|
||||
}
|
||||
}
|
||||
|
||||
int Zenoh_Config::AddPubSub(char *topic, char *datatype, int instance_no, const char *filename)
|
||||
int Zenoh_Config::AddPubSub(char *topic, char *datatype, int instance_no, const char *filename,
|
||||
const char *options_str)
|
||||
{
|
||||
#ifndef CONFIG_ZENOH_PUB_OPTION_OVERRIDE
|
||||
(void)options_str;
|
||||
#endif
|
||||
|
||||
{
|
||||
char f_topic[TOPIC_INFO_SIZE];
|
||||
char f_type[TOPIC_INFO_SIZE];
|
||||
@@ -120,7 +125,18 @@ int Zenoh_Config::AddPubSub(char *topic, char *datatype, int instance_no, const
|
||||
FILE *fp = fopen(filename, "a");
|
||||
|
||||
if (fp) {
|
||||
#ifdef CONFIG_ZENOH_PUB_OPTION_OVERRIDE
|
||||
|
||||
if (options_str && options_str[0] != '\0') {
|
||||
fprintf(fp, "%s;%s;%d;%s\n", topic, datatype, instance_no, options_str);
|
||||
|
||||
} else {
|
||||
fprintf(fp, "%s;%s;%d\n", topic, datatype, instance_no);
|
||||
}
|
||||
|
||||
#else
|
||||
fprintf(fp, "%s;%s;%d\n", topic, datatype, instance_no);
|
||||
#endif
|
||||
|
||||
} else {
|
||||
return -1;
|
||||
@@ -292,12 +308,60 @@ int Zenoh_Config::cli(int argc, char *argv[])
|
||||
if (strcmp(argv[2], "publisher") == 0) {
|
||||
int instance = 0;
|
||||
|
||||
#ifdef CONFIG_ZENOH_PUB_OPTION_OVERRIDE
|
||||
const char *options_str = nullptr;
|
||||
|
||||
if (argc == 6) {
|
||||
int parsed_instance;
|
||||
|
||||
if (sscanf(argv[5], "%d", &parsed_instance) == 1 && parsed_instance >= 0) {
|
||||
instance = parsed_instance;
|
||||
|
||||
} else {
|
||||
options_str = argv[5];
|
||||
}
|
||||
|
||||
} else if (argc == 7) {
|
||||
if (sscanf(argv[5], "%d", &instance) != 1 || instance < 0) {
|
||||
printf("Invalid instance %s (must be an integer, 0 for the default instance or a specific instance's index)\n",
|
||||
argv[5]);
|
||||
printf("Invalid instance %s (must be a non-negative integer)\n", argv[5]);
|
||||
return 0;
|
||||
}
|
||||
|
||||
options_str = argv[6];
|
||||
|
||||
} else if (argc > 7) {
|
||||
printf("Too many arguments\n");
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (options_str && !parsePublisherOptions(options_str, nullptr)) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (AddPubSub(argv[3], argv[4], instance, ZENOH_PUB_CONFIG_PATH, options_str) > 0) {
|
||||
if (options_str) {
|
||||
printf("Added %s %s to publishers (instance %d, options: %s)\n", argv[3], argv[4], instance,
|
||||
options_str);
|
||||
|
||||
} else {
|
||||
printf("Added %s %s to publishers (instance %d)\n", argv[3], argv[4], instance);
|
||||
}
|
||||
|
||||
} else {
|
||||
printf("Could not add uORB %s:%d -> %s to publishers\n", argv[3], instance, argv[4]);
|
||||
}
|
||||
|
||||
#else
|
||||
|
||||
if (argc == 6) {
|
||||
if (sscanf(argv[5], "%d", &instance) != 1 || instance < 0) {
|
||||
printf("Invalid instance %s (must be a non-negative integer)\n", argv[5]);
|
||||
return 0;
|
||||
}
|
||||
|
||||
} else if (argc > 6) {
|
||||
printf("Too many arguments\n");
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (AddPubSub(argv[3], argv[4], instance, ZENOH_PUB_CONFIG_PATH) > 0) {
|
||||
@@ -307,6 +371,8 @@ int Zenoh_Config::cli(int argc, char *argv[])
|
||||
printf("Could not add uORB %s:%d -> %s to publishers\n", argv[3], instance, argv[4]);
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
} else if (strcmp(argv[2], "subscriber") == 0) {
|
||||
int instance = 0;
|
||||
|
||||
@@ -443,9 +509,128 @@ int Zenoh_Config::closePubSubMapping()
|
||||
}
|
||||
|
||||
|
||||
// Very rudamentary here but we've to wait for a more advanced param system
|
||||
int Zenoh_Config::getPubSubMapping(char *topic, char *type, int *instance, const char *filename)
|
||||
#ifdef CONFIG_ZENOH_PUB_OPTION_OVERRIDE
|
||||
bool Zenoh_Config::parsePublisherOptions(const char *options_str, z_publisher_options_t *opts)
|
||||
{
|
||||
if (options_str == nullptr || options_str[0] == '\0') {
|
||||
return true;
|
||||
}
|
||||
|
||||
char buf[ZENOH_PUB_OPTIONS_SIZE];
|
||||
strncpy(buf, options_str, sizeof(buf) - 1);
|
||||
buf[sizeof(buf) - 1] = '\0';
|
||||
|
||||
int len = strlen(buf);
|
||||
|
||||
while (len > 0 && (buf[len - 1] == '\n' || buf[len - 1] == '\r' || buf[len - 1] == ' ')) {
|
||||
buf[--len] = '\0';
|
||||
}
|
||||
|
||||
bool valid = true;
|
||||
char *saveptr_pair = nullptr;
|
||||
char *pair = strtok_r(buf, ",", &saveptr_pair);
|
||||
|
||||
while (pair != nullptr) {
|
||||
char *eq = strchr(pair, '=');
|
||||
|
||||
if (eq != nullptr) {
|
||||
*eq = '\0';
|
||||
const char *key = pair;
|
||||
const char *val = eq + 1;
|
||||
|
||||
if (strcmp(key, "cc") == 0) {
|
||||
if (strcmp(val, "block") == 0) {
|
||||
if (opts) { opts->congestion_control = Z_CONGESTION_CONTROL_BLOCK; }
|
||||
|
||||
} else if (strcmp(val, "drop") == 0) {
|
||||
if (opts) { opts->congestion_control = Z_CONGESTION_CONTROL_DROP; }
|
||||
|
||||
} else {
|
||||
PX4_WARN("Invalid cc value '%s' (valid: block, drop)", val);
|
||||
valid = false;
|
||||
}
|
||||
|
||||
} else if (strcmp(key, "express") == 0) {
|
||||
if (strcmp(val, "true") == 0) {
|
||||
if (opts) { opts->is_express = true; }
|
||||
|
||||
} else if (strcmp(val, "false") == 0) {
|
||||
if (opts) { opts->is_express = false; }
|
||||
|
||||
} else {
|
||||
PX4_WARN("Invalid express value '%s' (valid: true, false)", val);
|
||||
valid = false;
|
||||
}
|
||||
|
||||
} else if (strcmp(key, "prio") == 0) {
|
||||
if (strcmp(val, "real_time") == 0) {
|
||||
if (opts) { opts->priority = Z_PRIORITY_REAL_TIME; }
|
||||
|
||||
} else if (strcmp(val, "interactive_high") == 0) {
|
||||
if (opts) { opts->priority = Z_PRIORITY_INTERACTIVE_HIGH; }
|
||||
|
||||
} else if (strcmp(val, "interactive_low") == 0) {
|
||||
if (opts) { opts->priority = Z_PRIORITY_INTERACTIVE_LOW; }
|
||||
|
||||
} else if (strcmp(val, "data_high") == 0) {
|
||||
if (opts) { opts->priority = Z_PRIORITY_DATA_HIGH; }
|
||||
|
||||
} else if (strcmp(val, "data") == 0) {
|
||||
if (opts) { opts->priority = Z_PRIORITY_DATA; }
|
||||
|
||||
} else if (strcmp(val, "data_low") == 0) {
|
||||
if (opts) { opts->priority = Z_PRIORITY_DATA_LOW; }
|
||||
|
||||
} else if (strcmp(val, "background") == 0) {
|
||||
if (opts) { opts->priority = Z_PRIORITY_BACKGROUND; }
|
||||
|
||||
} else {
|
||||
PX4_WARN("Invalid prio value '%s' (valid: real_time, interactive_high, interactive_low, data_high, data, data_low, background)",
|
||||
val);
|
||||
valid = false;
|
||||
}
|
||||
|
||||
#ifdef Z_FEATURE_UNSTABLE_API
|
||||
|
||||
} else if (strcmp(key, "rel") == 0) {
|
||||
if (strcmp(val, "reliable") == 0) {
|
||||
if (opts) { opts->reliability = Z_RELIABILITY_RELIABLE; }
|
||||
|
||||
} else if (strcmp(val, "best_effort") == 0) {
|
||||
if (opts) { opts->reliability = Z_RELIABILITY_BEST_EFFORT; }
|
||||
|
||||
} else {
|
||||
PX4_WARN("Invalid rel value '%s' (valid: reliable, best_effort)", val);
|
||||
valid = false;
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
} else {
|
||||
PX4_WARN("Unknown publisher option key '%s' (valid: cc, express, prio, rel)", key);
|
||||
valid = false;
|
||||
}
|
||||
|
||||
} else {
|
||||
PX4_WARN("Malformed option '%s' (expected key=value)", pair);
|
||||
valid = false;
|
||||
}
|
||||
|
||||
pair = strtok_r(nullptr, ",", &saveptr_pair);
|
||||
}
|
||||
|
||||
return valid;
|
||||
}
|
||||
#endif
|
||||
|
||||
|
||||
// Very rudamentary here but we've to wait for a more advanced param system
|
||||
int Zenoh_Config::getPubSubMapping(char *topic, char *type, int *instance, const char *filename, char *options_str)
|
||||
{
|
||||
#ifndef CONFIG_ZENOH_PUB_OPTION_OVERRIDE
|
||||
(void)options_str;
|
||||
#endif
|
||||
|
||||
char buffer[MAX_LINE_SIZE];
|
||||
|
||||
if (fp_mapping == NULL) {
|
||||
@@ -456,12 +641,17 @@ int Zenoh_Config::getPubSubMapping(char *topic, char *type, int *instance, const
|
||||
while (fgets(buffer, MAX_LINE_SIZE, fp_mapping) != NULL) {
|
||||
|
||||
if (buffer[0] != '\n') {
|
||||
#ifdef CONFIG_ZENOH_PUB_OPTION_OVERRIDE
|
||||
const char *fields[4];
|
||||
int nfields = parse_csv_line(buffer, fields, 4);
|
||||
#else
|
||||
const char *fields[3];
|
||||
int nfields = parse_csv_line(buffer, fields, 3);
|
||||
#endif
|
||||
|
||||
|
||||
if (nfields >= 2) {
|
||||
if (nfields == 3) {
|
||||
if (nfields >= 3) {
|
||||
if (sscanf(fields[2], "%d", instance) != 1) {
|
||||
PX4_WARN("Malformed zenoh config instance %s (instance field should be an integer following the type)\n", fields[2]);
|
||||
return -1;
|
||||
@@ -471,6 +661,20 @@ int Zenoh_Config::getPubSubMapping(char *topic, char *type, int *instance, const
|
||||
*instance = -1;
|
||||
}
|
||||
|
||||
#ifdef CONFIG_ZENOH_PUB_OPTION_OVERRIDE
|
||||
|
||||
if (options_str != nullptr) {
|
||||
if (nfields >= 4 && fields[3] != nullptr) {
|
||||
strncpy(options_str, fields[3], ZENOH_PUB_OPTIONS_SIZE - 1);
|
||||
options_str[ZENOH_PUB_OPTIONS_SIZE - 1] = '\0';
|
||||
|
||||
} else {
|
||||
options_str[0] = '\0';
|
||||
}
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
strncpy(type, fields[1], TOPIC_INFO_SIZE);
|
||||
strncpy(topic, fields[0], TOPIC_INFO_SIZE);
|
||||
return 1;
|
||||
@@ -518,14 +722,35 @@ void Zenoh_Config::dump_config()
|
||||
char type[TOPIC_INFO_SIZE];
|
||||
int instance_no;
|
||||
|
||||
#ifdef CONFIG_ZENOH_PUB_OPTION_OVERRIDE
|
||||
char options_str[ZENOH_PUB_OPTIONS_SIZE];
|
||||
#endif
|
||||
|
||||
printf("Publisher config:\n");
|
||||
|
||||
|
||||
#ifdef CONFIG_ZENOH_PUB_OPTION_OVERRIDE
|
||||
|
||||
while (getPubSubMapping(topic, type, &instance_no, ZENOH_PUB_CONFIG_PATH, options_str) > 0) {
|
||||
printf("Topic: %s\n", topic);
|
||||
printf("Type: %s\n", type);
|
||||
printf("Instance: %d\n", instance_no);
|
||||
|
||||
if (options_str[0] != '\0') {
|
||||
printf("Options: %s\n", options_str);
|
||||
}
|
||||
}
|
||||
|
||||
#else
|
||||
|
||||
while (getPubSubMapping(topic, type, &instance_no, ZENOH_PUB_CONFIG_PATH) > 0) {
|
||||
printf("Topic: %s\n", topic);
|
||||
printf("Type: %s\n", type);
|
||||
printf("Instance: %d\n", instance_no);
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
printf("\nSubscriber config:\n");
|
||||
|
||||
while (getPubSubMapping(topic, type, &instance_no, ZENOH_SUB_CONFIG_PATH) > 0) {
|
||||
|
||||
@@ -61,8 +61,13 @@
|
||||
#define KEYEXPR_MSG_NAME "px4_msgs::msg::dds_::"
|
||||
#define KEYEXPR_MSG_NAME_SIZE sizeof(KEYEXPR_MSG_NAME)
|
||||
#define TOPIC_INFO_SIZE (96)
|
||||
#ifdef CONFIG_ZENOH_PUB_OPTION_OVERRIDE
|
||||
#define ZENOH_PUB_OPTIONS_SIZE (64)
|
||||
#define MAX_LINE_SIZE (2 * TOPIC_INFO_SIZE + ZENOH_PUB_OPTIONS_SIZE + 4)
|
||||
#else
|
||||
#define MAX_LINE_SIZE (2 * TOPIC_INFO_SIZE)
|
||||
#define KEYEXPR_SIZE (MAX_LINE_SIZE + KEYEXPR_MSG_NAME_SIZE + KEYEXPR_RIHS01_SIZE + 128)
|
||||
#endif
|
||||
#define KEYEXPR_SIZE (2 * TOPIC_INFO_SIZE + KEYEXPR_MSG_NAME_SIZE + KEYEXPR_RIHS01_SIZE + 128)
|
||||
|
||||
class Zenoh_Config
|
||||
{
|
||||
@@ -81,9 +86,22 @@ public:
|
||||
{
|
||||
return getLineCount(ZENOH_SUB_CONFIG_PATH);
|
||||
}
|
||||
int getPublisherMapping(char *topic, char *type, int *instance)
|
||||
int getPublisherMapping(char *topic, char *type, int *instance, z_publisher_options_t *opts = nullptr)
|
||||
{
|
||||
(void)opts;
|
||||
#ifdef CONFIG_ZENOH_PUB_OPTION_OVERRIDE
|
||||
char options_str[ZENOH_PUB_OPTIONS_SIZE];
|
||||
int ret = getPubSubMapping(topic, type, instance, ZENOH_PUB_CONFIG_PATH, options_str);
|
||||
|
||||
if (ret > 0 && opts != nullptr) {
|
||||
parsePublisherOptions(options_str, opts);
|
||||
}
|
||||
|
||||
return ret;
|
||||
|
||||
#else
|
||||
return getPubSubMapping(topic, type, instance, ZENOH_PUB_CONFIG_PATH);
|
||||
#endif
|
||||
}
|
||||
// existing_instance will be either 0 (should create a new instance) or nonzero (should reuse the existing 0 instance)
|
||||
int getSubscriberMapping(char *topic, char *type, int *existing_instance)
|
||||
@@ -94,8 +112,12 @@ public:
|
||||
|
||||
|
||||
private:
|
||||
int getPubSubMapping(char *topic, char *type, int *new_instance, const char *filename);
|
||||
int AddPubSub(char *topic, char *datatype, int new_instance, const char *filename);
|
||||
#ifdef CONFIG_ZENOH_PUB_OPTION_OVERRIDE
|
||||
bool parsePublisherOptions(const char *options_str, z_publisher_options_t *opts);
|
||||
#endif
|
||||
int getPubSubMapping(char *topic, char *type, int *new_instance, const char *filename, char *options_str = nullptr);
|
||||
int AddPubSub(char *topic, char *datatype, int new_instance, const char *filename,
|
||||
const char *options_str = nullptr);
|
||||
int DeletePubSub(char *topic, const char *filename);
|
||||
int SetNetworkConfig(char *mode, char *locator);
|
||||
int getLineCount(const char *filename);
|
||||
|
||||
@@ -9,3 +9,52 @@ parameters:
|
||||
default: 0
|
||||
min: 0
|
||||
max: 232
|
||||
ZENOH_PUB_CC:
|
||||
description:
|
||||
short: Zenoh publisher congestion control (global default)
|
||||
type: int32
|
||||
default: 0
|
||||
min: 0
|
||||
max: 1
|
||||
values:
|
||||
0: Drop
|
||||
1: Block
|
||||
reboot_required: true
|
||||
ZENOH_PUB_REL:
|
||||
description:
|
||||
short: Zenoh publisher reliability (global default)
|
||||
type: int32
|
||||
default: 0
|
||||
min: 0
|
||||
max: 1
|
||||
values:
|
||||
0: Reliable
|
||||
1: BestEffort
|
||||
reboot_required: true
|
||||
ZENOH_PUB_EXPR:
|
||||
description:
|
||||
short: Zenoh publisher express mode (global default)
|
||||
type: int32
|
||||
default: 0
|
||||
min: 0
|
||||
max: 1
|
||||
values:
|
||||
0: Disabled
|
||||
1: Enabled
|
||||
reboot_required: true
|
||||
ZENOH_PUB_PRIO:
|
||||
description:
|
||||
short: Zenoh publisher priority (global default)
|
||||
type: int32
|
||||
default: 5
|
||||
min: 1
|
||||
max: 7
|
||||
values:
|
||||
1: RealTime
|
||||
2: InteractiveHigh
|
||||
3: InteractiveLow
|
||||
4: DataHigh
|
||||
5: Data
|
||||
6: DataLow
|
||||
7: Background
|
||||
reboot_required: true
|
||||
|
||||
Reference in New Issue
Block a user