Update Zenoh for library and implement rmw_zenoh features

New zenoh-pico library
Keyexpr instrospection
RIHS01 Types
This commit is contained in:
Peter van der Perk
2025-08-14 13:36:28 +02:00
committed by Beat Küng
parent e1a7fbce71
commit 9d02698987
7 changed files with 139 additions and 56 deletions
+14 -8
View File
@@ -43,27 +43,33 @@ set(BUILD_TESTING OFF)
set(CHECK_THREADS NO)
set(MESSAGE_QUIET ON)
set(ZENOH_DEBUG ${CONFIG_ZENOH_DEBUG})
set(PICO_STATIC ON)
set(Z_FEATURE_TCP_NODELAY 0)
set(Z_FEATURE_MATCHING 0)
#set(Z_FEATURE_BATCHING 0)
#set(Z_FEATURE_INTEREST 0)
px4_add_git_submodule(TARGET git_zenoh-pico PATH "zenoh-pico")
add_subdirectory(zenoh-pico)
unset(MESSAGE_QUIET)
add_dependencies(zenohpico git_zenoh-pico px4_platform)
target_compile_options(zenohpico PUBLIC -Wno-cast-align
add_dependencies(zenohpico_static git_zenoh-pico px4_platform)
target_compile_options(zenohpico_static PUBLIC -Wno-cast-align
-Wno-narrowing
-Wno-stringop-overflow
-Wno-stringop-truncation
-Wno-unused-result
-Wno-type-limits
-DZ_BATCH_SIZE_RX=512
-DZ_BATCH_SIZE_TX=512
-DZ_FRAG_MAX_SIZE=1024)
-DZ_BATCH_SIZE_TX=512)
target_compile_options(zenohpico PRIVATE -Wno-missing-prototypes)
target_compile_options(zenohpico_static PRIVATE -Wno-missing-prototypes)
if(CONFIG_PLATFORM_NUTTX)
target_compile_options(zenohpico PRIVATE -DUNIX_NO_MULTICAST_IF)
target_compile_options(zenohpico_static PRIVATE -DUNIX_NO_MULTICAST_IF)
endif()
if(CONFIG_ZENOH_SERIAL)
target_compile_options(zenohpico PRIVATE -DZ_LINK_SERIAL)
target_compile_options(zenohpico_static PRIVATE -DZ_LINK_SERIAL)
endif()
@@ -81,7 +87,7 @@ px4_add_module(
cdr
uorb_msgs
px4_work_queue
zenohpico
zenohpico_static
zenoh_topics
git_zenoh-pico
INCLUDES
@@ -44,7 +44,6 @@
Zenoh_Publisher::Zenoh_Publisher()
{
this->_topic[0] = 0x0;
}
Zenoh_Publisher::~Zenoh_Publisher()
@@ -60,20 +59,19 @@ int Zenoh_Publisher::undeclare_publisher()
int Zenoh_Publisher::declare_publisher(z_owned_session_t s, const char *keyexpr)
{
strncpy(this->_topic, keyexpr, sizeof(this->_topic));
z_view_keyexpr_t ke;
z_view_keyexpr_from_str(&ke, this->_topic);
if (z_declare_publisher(&_pub, z_loan(s), z_loan(ke), NULL) < 0) {
if (z_view_keyexpr_from_str(&ke, keyexpr) < 0) {
printf("%s is not a valid key expression\n", keyexpr);
return -1;
}
if (z_declare_publisher(z_loan(s), &_pub, z_loan(ke), NULL) < 0) {
printf("Unable to declare publisher for key expression!\n");
return -1;
}
if (!z_publisher_check(&_pub)) {
printf("Unable to declare publisher for key expression!\n");
return -1;
}
z_timestamp_new(&ts, z_loan(s));
return 0;
}
@@ -82,14 +80,39 @@ int8_t Zenoh_Publisher::publish(const uint8_t *buf, int size)
{
z_publisher_put_options_t options;
z_publisher_put_options_default(&options);
options.encoding = NULL;
z_owned_bytes_t attachment;
z_bytes_empty(&attachment);
ze_owned_serializer_t serializer;
ze_serializer_empty(&serializer);
ze_serializer_serialize_str(z_loan_mut(serializer), "sequence_number");
ze_serializer_serialize_int64(z_loan_mut(serializer), this->sequence_number++);
ze_serializer_serialize_str(z_loan_mut(serializer), "source_timestamp");
ze_serializer_serialize_int64(z_loan_mut(serializer), hrt_absolute_time());
px4_guid_t px4_guid;
board_get_px4_guid(px4_guid);
ze_serializer_serialize_str(z_loan_mut(serializer), "source_gid");
ze_serializer_serialize_buf(z_loan_mut(serializer), px4_guid, 16);
ze_serializer_finish(z_move(serializer), &attachment);
options.attachment = z_move(attachment);
// Add timestamp
options.timestamp = &ts;
z_owned_bytes_t payload;
z_bytes_serialize_from_slice(&payload, buf, size);
z_bytes_copy_from_buf(&payload, buf, size);
return z_publisher_put(z_loan(_pub), z_move(payload), &options);
}
void Zenoh_Publisher::print()
{
printf("Topic: %s\n", this->_topic);
z_view_string_t keystr;
z_keyexpr_as_view_string(z_publisher_keyexpr(z_loan(_pub)), &keystr);
printf("Topic: %s\n", z_string_data(z_loan(keystr)));
}
@@ -66,5 +66,6 @@ protected:
int8_t publish(const uint8_t *, int size);
z_owned_publisher_t _pub;
char _topic[60]; // The Topic name is somewhere is the Zenoh stack as well but no good api to fetch it.
int64_t sequence_number;
z_timestamp_t ts;
};
@@ -41,7 +41,7 @@
#include "zenoh_subscriber.hpp"
static void data_handler_cb(const z_loaned_sample_t *sample, void *arg)
static void data_handler_cb(z_loaned_sample_t *sample, void *arg)
{
static_cast<Zenoh_Subscriber *>(arg)->data_handler(sample);
}
@@ -50,15 +50,13 @@ void Zenoh_Subscriber::data_handler(const z_loaned_sample_t *sample)
{
z_view_string_t keystr;
z_keyexpr_as_view_string(z_sample_keyexpr(sample), &keystr);
z_owned_slice_t value;
z_bytes_deserialize_into_slice(z_sample_payload(sample), &value);
printf(">> [Subscriber] Received ('%s' size '%d')\n", z_string_data(z_loan(keystr)), (int)z_slice_len(z_loan(value)));
printf(">> [Subscriber] Received ('%s' size '%d')\n", z_string_data(z_loan(keystr)),
(int)z_bytes_len(z_sample_payload(sample)));
}
Zenoh_Subscriber::Zenoh_Subscriber()
{
this->_topic[0] = 0x0;
}
Zenoh_Subscriber::~Zenoh_Subscriber()
@@ -75,32 +73,29 @@ int Zenoh_Subscriber::undeclare_subscriber()
int Zenoh_Subscriber::declare_subscriber(z_owned_session_t s, const char *keyexpr)
{
z_owned_closure_sample_t callback;
z_closure_sample(&callback, data_handler_cb, NULL, this);
strncpy(this->_topic, keyexpr, sizeof(this->_topic));
z_closure(&callback, data_handler_cb, NULL, this);
z_view_keyexpr_t ke;
z_view_keyexpr_from_str(&ke, this->_topic);
z_view_keyexpr_from_str(&ke, keyexpr);
if (z_declare_subscriber(&_sub, z_loan(s), z_loan(ke), z_closure_sample_move(&callback), NULL) < 0) {
if (z_declare_subscriber(z_loan(s), &_sub, z_loan(ke), z_closure_sample_move(&callback), NULL) < 0) {
printf("Unable to declare subscriber.\n");
exit(-1);
}
if (!z_subscriber_check(&_sub)) {
printf("Unable to declare subscriber for key expression!\n %s\n", keyexpr);
return -1;
}
return 0;
}
void Zenoh_Subscriber::print()
{
printf("Topic: %s\n", this->_topic);
z_view_string_t keystr;
z_keyexpr_as_view_string(z_subscriber_keyexpr(z_loan(_sub)), &keystr);
printf("Topic: %s\n", z_string_data(z_loan(keystr)));
}
void Zenoh_Subscriber::print(const char *type_string, const char *topic_string)
{
printf("Topic: %s -> %s %s \n", this->_topic, type_string, topic_string);
z_view_string_t keystr;
z_keyexpr_as_view_string(z_subscriber_keyexpr(z_loan(_sub)), &keystr);
printf("Topic: %s -> %s %s \n", z_string_data(z_loan(keystr)), type_string, topic_string);
}
@@ -70,5 +70,4 @@ protected:
virtual void print(const char *type_string, const char *topic_string);
z_owned_subscriber_t _sub;
char _topic[60]; // The Topic name is somewhere is the Zenoh stack as well but no good api to fetch it.
};
+69 -14
View File
@@ -52,12 +52,37 @@
// Auto-generated header to all uORB <-> CDR conversions
#include <uorb_pubsub_factory.hpp>
// FIXME make this a parameter
const uint8_t ros_domain_id = 0;
#define Z_PUBLISH
#define Z_SUBSCRIBE
extern "C" __EXPORT int zenoh_main(int argc, char *argv[]);
void toCamelCase(char *input)
{
int capitalizeNext = 1; // Capitalize the first letter
int j = 0;
for (int i = 0; input[i] != '\0'; i++) {
if (input[i] == '_') {
capitalizeNext = 1; // Next letter should be capitalized
} else {
if (capitalizeNext && isalpha(input[i])) {
input[j++] = toupper(input[i]);
capitalizeNext = 0;
} else {
input[j++] = input[i];
}
}
}
input[j] = '\0'; // Null-terminate the input string
}
ZENOH::ZENOH():
ModuleParams(nullptr)
{
@@ -93,26 +118,19 @@ void ZENOH::run()
PX4_INFO("Opening session...");
z_owned_session_t s;
ret = z_open(&s, z_move(config));
ret = z_open(&s, z_move(config), NULL);
if (ret < 0) {
if (ret < 0) {
PX4_ERR("Unable to open session, ret: %d", ret);
return;
}
PX4_INFO("Checking session...");
if (!z_session_check(&s)) {
PX4_ERR("Unable to check session!");
return;
}
PX4_INFO("Starting reading/writing tasks...");
// Start read and lease tasks for zenoh-pico
if (zp_start_read_task(z_loan_mut(s), NULL) < 0 || zp_start_lease_task(z_loan_mut(s), NULL) < 0) {
PX4_ERR("Unable to start read and lease tasks");
z_close(z_move(s));
z_drop(z_move(s));
return;
}
@@ -122,16 +140,34 @@ void ZENOH::run()
{
char topic[TOPIC_INFO_SIZE];
char type[TOPIC_INFO_SIZE];
char keyexpr[KEYEXPR_SIZE];
for (i = 0; i < _sub_count; i++) {
z_config.getSubscriberMapping(topic, type);
_zenoh_subscribers[i] = genSubscriber(type);
if (_zenoh_subscribers[i] != 0) {
_zenoh_subscribers[i]->declare_subscriber(s, topic);
const uint8_t *rihs_hash = getRIHS01_Hash(type);
toCamelCase(type); // Convert uORB type to camel case
snprintf(keyexpr, KEYEXPR_SIZE, "%i/%s/"
KEYEXPR_MSG_NAME "%s_/RIHS01_"
"%02x%02x%02x%02x%02x%02x%02x%02x"
"%02x%02x%02x%02x%02x%02x%02x%02x"
"%02x%02x%02x%02x%02x%02x%02x%02x"
"%02x%02x%02x%02x%02x%02x%02x%02x",
ros_domain_id, topic, type,
rihs_hash[0], rihs_hash[1], rihs_hash[2], rihs_hash[3],
rihs_hash[4], rihs_hash[5], rihs_hash[6], rihs_hash[7],
rihs_hash[8], rihs_hash[9], rihs_hash[10], rihs_hash[11],
rihs_hash[12], rihs_hash[13], rihs_hash[14], rihs_hash[15],
rihs_hash[16], rihs_hash[17], rihs_hash[18], rihs_hash[19],
rihs_hash[20], rihs_hash[21], rihs_hash[22], rihs_hash[23],
rihs_hash[24], rihs_hash[25], rihs_hash[26], rihs_hash[27],
rihs_hash[28], rihs_hash[29], rihs_hash[30], rihs_hash[31]
);
_zenoh_subscribers[i]->declare_subscriber(s, keyexpr);
}
}
if (z_config.getSubscriberMapping(topic, type) < 0) {
@@ -149,13 +185,32 @@ void ZENOH::run()
{
char topic[TOPIC_INFO_SIZE];
char type[TOPIC_INFO_SIZE];
char keyexpr[KEYEXPR_SIZE];
for (i = 0; i < _pub_count; i++) {
z_config.getPublisherMapping(topic, type);
_zenoh_publishers[i] = genPublisher(type);
if (_zenoh_publishers[i] != 0) {
_zenoh_publishers[i]->declare_publisher(s, topic);
const uint8_t *rihs_hash = getRIHS01_Hash(type);
toCamelCase(type); // Convert uORB type to camel case
snprintf(keyexpr, KEYEXPR_SIZE, "%i/%s/"
KEYEXPR_MSG_NAME "%s_/RIHS01_"
"%02x%02x%02x%02x%02x%02x%02x%02x"
"%02x%02x%02x%02x%02x%02x%02x%02x"
"%02x%02x%02x%02x%02x%02x%02x%02x"
"%02x%02x%02x%02x%02x%02x%02x%02x",
ros_domain_id, topic, type,
rihs_hash[0], rihs_hash[1], rihs_hash[2], rihs_hash[3],
rihs_hash[4], rihs_hash[5], rihs_hash[6], rihs_hash[7],
rihs_hash[8], rihs_hash[9], rihs_hash[10], rihs_hash[11],
rihs_hash[12], rihs_hash[13], rihs_hash[14], rihs_hash[15],
rihs_hash[16], rihs_hash[17], rihs_hash[18], rihs_hash[19],
rihs_hash[20], rihs_hash[21], rihs_hash[22], rihs_hash[23],
rihs_hash[24], rihs_hash[25], rihs_hash[26], rihs_hash[27],
rihs_hash[28], rihs_hash[29], rihs_hash[30], rihs_hash[31]
);
_zenoh_publishers[i]->declare_publisher(s, keyexpr);
_zenoh_publishers[i]->setPollFD(&pfds[i]);
}
}
@@ -211,7 +266,7 @@ void ZENOH::run()
zp_stop_read_task(z_session_loan_mut(&s));
zp_stop_lease_task(z_session_loan_mut(&s));
z_close(z_session_move(&s));
z_drop(z_session_move(&s));
exit_and_cleanup();
}
+7 -3
View File
@@ -56,9 +56,13 @@
#define NET_MODE_SIZE sizeof("client")
#define NET_LOCATOR_SIZE 64
#define NET_CONFIG_LINE_SIZE NET_MODE_SIZE + NET_LOCATOR_SIZE
#define TOPIC_INFO_SIZE 64
#define MAX_LINE_SIZE 2*TOPIC_INFO_SIZE
#define NET_CONFIG_LINE_SIZE (NET_MODE_SIZE + NET_LOCATOR_SIZE)
#define KEYEXPR_RIHS01_SIZE sizeof("RIHS01_XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX")
#define KEYEXPR_MSG_NAME "px4_msgs::msg::dds_::"
#define KEYEXPR_MSG_NAME_SIZE sizeof(KEYEXPR_MSG_NAME)
#define TOPIC_INFO_SIZE (64)
#define MAX_LINE_SIZE (2 * TOPIC_INFO_SIZE)
#define KEYEXPR_SIZE (MAX_LINE_SIZE + KEYEXPR_MSG_NAME_SIZE + KEYEXPR_RIHS01_SIZE)
class Zenoh_Config
{