uxrce_dds: support multi-instance uORB topics (#26305)

uxrce_dds: improve DDS to uORB multi-instance routing and docs

* Route single DDS topic to multiple uORB instances via message field
* Document route_field → instance mapping
* Allow subscriptions_demux without route_field
* Rename subscriptions_demux → subscriptions_multi
* Update docs (ROS 2 wording, version tip, minor fixes)
This commit is contained in:
Marco Hauswirth
2026-03-03 15:57:24 +01:00
committed by GitHub
parent 43aa8de22b
commit 7f5de5d141
5 changed files with 104 additions and 37 deletions
+10 -8
View File
@@ -833,9 +833,7 @@ def generate_dds_yaml_doc(allMessageFiles, output_file = 'dds_topics.md'):
for message in data["subscriptions"]:
all_message_types.add(message['type'].split("::")[-1])
all_topics.add(message['topic'].split('/')[-1])
if data["subscriptions_multi"]: # There is none now
dds_markdown += "None\n"
for message in data["subscriptions_multi"]:
for message in (data.get("subscriptions_multi") or []):
all_message_types.add(message['type'].split("::")[-1])
all_topics.add(message['topic'].split('/')[-1])
for message in allMessageFiles:
@@ -874,13 +872,17 @@ Topic | Type| Rate Limit
dds_markdown += "\n## Subscriptions Multi\n\n"
if not data["subscriptions_multi"]: # There is none now
subscriptions_multi = data.get("subscriptions_multi") or []
if not subscriptions_multi:
dds_markdown += "None\n"
else:
print("Warning - we now have subscription_multi data - check format")
dds_markdown += "Topic | Type\n--- | ---\n"
for message in data["subscriptions_multi"]:
dds_markdown += f"{message['topic']} | {message['type']}\n"
dds_markdown += "Topic | Type | Route Field | Max Instances\n--- | --- | --- | ---\n"
for message in subscriptions_multi:
type = message['type']
px4Type = type.split("::")[-1]
route_field = f"`{message['route_field']}`" if 'route_field' in message else "-"
max_instances = message.get('max_instances', '-')
dds_markdown += f"{message['topic']} | [{type}](../msg_docs/{px4Type}.md) | {route_field} | {max_instances}\n"
if messagesNotExported:
# Print the topics that are not exported to DDS
+17 -6
View File
@@ -522,10 +522,10 @@ subscriptions:
subscriptions_multi:
- topic: /fmu/in/vehicle_optical_flow_vel
type: px4_msgs::msg::VehicleOpticalFlowVel
...
- topic: /fmu/in/aux_global_position
type: px4_msgs::msg::AuxGlobalPosition
route_field: id # OPTIONAL: field used to demux into instances
max_instances: 4 # Required when route_field is set
```
@@ -563,10 +563,21 @@ Add a topic to the `subscriptions_multi` section to:
For example, if `vehicle_odometry` has already `2` instances, it creates a ROS 2 subscriber of `/fmu/in/vehicle_odometry` and a uORB publisher on instance `3` of `vehicle_odometry`.
- This ensures that no other internal PX4 module will publish on the same instance used by uXRCE-DDS.
The subscribers will be able to subscribe to the desired instance and distinguish between publishers.
- Note, however, that this guarantees separation between PX4 and ROS2 publishers, not among multiple ROS2 publishers.
In that scenario, their messages will still be routed to the same instance.
- Without `route_field`, this guarantees separation between PX4 and ROS 2 publishers, but not among multiple ROS 2 publishers. In that scenario, their messages will still be routed to the same instance.
- This is the desired behavior, for example, when you want PX4 to log the readings of two equal sensors; they will both publish on the same topic, but one will use instance 0 and the other will use instance 1.
<Badge type="tip" text="main (planned for: PX4 v1.18)" /> Optionally, add `route_field` and `max_instances` to demultiplex a single ROS 2 topic into multiple uORB instances based on a message field value:
- Each unique value of `route_field` is dynamically assigned to a separate uORB instance on first arrival, up to `max_instances`.
For example, a single `/fmu/in/aux_global_position` ROS 2 topic can be demultiplexed to up to 4 separate uORB instances of `aux_global_position`, with each unique `id` value mapped to its own instance.
- This allows multiple ROS 2 publishers to share a single DDS topic while PX4 subscribers can distinguish between them by subscribing to different uORB instances.
- `route_field` must be a field present in the message definition. `max_instances` is required when `route_field` is set and limits how many distinct sources can be demultiplexed simultaneously.
::: warning
The `subscriptions_multi` feature with `route_field` is currently only implemented in the uXRCE-DDS client.
The Zenoh bridge module does not yet support demux routing — topics listed under `subscriptions_multi` in `dds_topics.yaml` will be ignored by the Zenoh bridge.
:::
You can arbitrarily change the configuration.
For example, you could use different default namespaces or use a custom package to store the message definitions.
+54 -2
View File
@@ -163,7 +163,20 @@ struct RcvTopicsPubs {
@[ end for]@
@[ for sub in subscriptions_multi]@
@[ if sub.get('route_field')]@
uORB::PublicationMulti<@(sub['simple_base_type'])_s> @(sub['topic_simple'])_pubs[@(sub['max_instances'])] {
@[ for idx in range(sub['max_instances'])]@
{ORB_ID(@(sub['topic_simple']))}@('' if idx == sub['max_instances']-1 else ',')
@[ end for]@
};
// Maps route_field values (arbitrary, not bounded to [0, max_instances)) to uORB instance indices
struct {
uint32_t assigned_ids[@(sub['max_instances'])] {};
uint8_t num_assigned {0};
} @(sub['topic_simple'])_demux;
@[ else]@
uORB::PublicationMulti<@(sub['simple_base_type'])_s> @(sub['topic_simple'])_pub{ORB_ID(@(sub['topic_simple']))};
@[ end if]@
@[ end for]@
uint32_t num_payload_received{};
@@ -179,7 +192,7 @@ static void on_topic_update(uxrSession *session, uxrObjectId object_id, uint16_t
pubs->num_payload_received += length;
switch (object_id.id) {
@[ for idx, sub in enumerate(subscriptions + subscriptions_multi)]@
@[ for idx, sub in enumerate(subscriptions)]@
case @(idx)+ (65535U / 32U) + 1: {
@(sub['simple_base_type'])_s data;
@@ -190,6 +203,38 @@ static void on_topic_update(uxrSession *session, uxrObjectId object_id, uint16_t
}
break;
@[ end for]@
@[ for idx, sub in enumerate(subscriptions_multi)]@
case @(idx + len(subscriptions))+ (65535U / 32U) + 1: {
@(sub['simple_base_type'])_s data;
if (ucdr_deserialize_@(sub['simple_base_type'])(*ub, data, time_offset_us)) {
//print_message(ORB_ID(@(sub['simple_base_type'])), data);
@[ if sub.get('route_field')]@
int instance = -1;
for (uint8_t i = 0; i < pubs->@(sub['topic_simple'])_demux.num_assigned; i++) {
if (pubs->@(sub['topic_simple'])_demux.assigned_ids[i] == data.@(sub['route_field'])) {
instance = i;
break;
}
}
if (instance < 0 && pubs->@(sub['topic_simple'])_demux.num_assigned < @(sub['max_instances'])) {
instance = pubs->@(sub['topic_simple'])_demux.num_assigned++;
pubs->@(sub['topic_simple'])_demux.assigned_ids[instance] = data.@(sub['route_field']);
}
if (instance >= 0) {
pubs->@(sub['topic_simple'])_pubs[instance].publish(data);
}
@[ else]@
pubs->@(sub['topic_simple'])_pub.publish(data);
@[ end if]@
}
}
break;
@[ end for]@
default:
@@ -200,13 +245,20 @@ static void on_topic_update(uxrSession *session, uxrObjectId object_id, uint16_t
bool RcvTopicsPubs::init(uxrSession *session, uxrStreamId reliable_out_stream_id, uxrStreamId reliable_in_stream_id, uxrStreamId best_effort_in_stream_id, uxrObjectId participant_id, const char *client_namespace)
{
@[ for idx, sub in enumerate(subscriptions + subscriptions_multi)]@
@[ for idx, sub in enumerate(subscriptions)]@
{
uint16_t queue_depth = orb_get_queue_size(ORB_ID(@(sub['simple_base_type']))) * 2; // use a bit larger queue size than internal
uint32_t message_version = get_message_version<@(sub['simple_base_type'])_s>();
create_data_reader(session, reliable_out_stream_id, best_effort_in_stream_id, participant_id, @(idx), client_namespace, "@(sub['topic'])", message_version, "@(sub['dds_type'])", queue_depth);
}
@[ end for]@
@[ for idx, sub in enumerate(subscriptions_multi)]@
{
uint16_t queue_depth = orb_get_queue_size(ORB_ID(@(sub['topic_simple']))) * @(sub.get('max_instances', 2)); // scale queue for multiple sources
uint32_t message_version = get_message_version<@(sub['simple_base_type'])_s>();
create_data_reader(session, reliable_out_stream_id, best_effort_in_stream_id, participant_id, @(idx + len(subscriptions)), client_namespace, "@(sub['topic'])", message_version, "@(sub['dds_type'])", queue_depth);
}
@[ end for]@
uxr_set_topic_callback(session, on_topic_update, this);
+9 -6
View File
@@ -133,8 +133,8 @@ subscriptions:
- topic: /fmu/in/config_control_setpoints
type: px4_msgs::msg::VehicleControlMode
- topic: /fmu/in/distance_sensor
type: px4_msgs::msg::DistanceSensor
# - topic: /fmu/in/distance_sensor
# type: px4_msgs::msg::DistanceSensor
- topic: /fmu/in/manual_control_input
type: px4_msgs::msg::ManualControlSetpoint
@@ -190,9 +190,6 @@ subscriptions:
- topic: /fmu/in/actuator_servos
type: px4_msgs::msg::ActuatorServos
- topic: /fmu/in/aux_global_position
type: px4_msgs::msg::AuxGlobalPosition
- topic: /fmu/in/fixed_wing_longitudinal_setpoint
type: px4_msgs::msg::FixedWingLongitudinalSetpoint
@@ -226,5 +223,11 @@ subscriptions:
- topic: /fmu/in/landing_gear
type: px4_msgs::msg::LandingGear
# Create uORB::PublicationMulti
# Subscriptions multiplexed to uORB multi-instances (optionally demultiplexed based on a message field)
subscriptions_multi:
- topic: /fmu/in/aux_global_position
type: px4_msgs::msg::AuxGlobalPosition
route_field: id
max_instances: 4
- topic: /fmu/in/distance_sensor
type: px4_msgs::msg::DistanceSensor
@@ -130,12 +130,11 @@ if subs_not_empty:
merged_em_globals['subscriptions'] = msg_map['subscriptions'] if subs_not_empty else []
subs_multi_not_empty = msg_map['subscriptions_multi'] is not None
if subs_multi_not_empty:
for sm in msg_map['subscriptions_multi']:
process_message_type(sm)
subs_multi = msg_map.get('subscriptions_multi') or []
for sd in subs_multi:
process_message_type(sd)
merged_em_globals['subscriptions_multi'] = msg_map['subscriptions_multi'] if subs_multi_not_empty else []
merged_em_globals['subscriptions_multi'] = subs_multi
merged_em_globals['type_includes'] = sorted(set(all_type_includes))