diff --git a/src/modules/zenoh/CMakeLists.txt b/src/modules/zenoh/CMakeLists.txt index e3a8166dff..ce2dce2e50 100644 --- a/src/modules/zenoh/CMakeLists.txt +++ b/src/modules/zenoh/CMakeLists.txt @@ -43,27 +43,33 @@ set(BUILD_TESTING OFF) set(CHECK_THREADS NO) set(MESSAGE_QUIET ON) set(ZENOH_DEBUG ${CONFIG_ZENOH_DEBUG}) +set(PICO_STATIC ON) +set(Z_FEATURE_TCP_NODELAY 0) +set(Z_FEATURE_MATCHING 0) +#set(Z_FEATURE_BATCHING 0) +#set(Z_FEATURE_INTEREST 0) px4_add_git_submodule(TARGET git_zenoh-pico PATH "zenoh-pico") add_subdirectory(zenoh-pico) unset(MESSAGE_QUIET) -add_dependencies(zenohpico git_zenoh-pico px4_platform) -target_compile_options(zenohpico PUBLIC -Wno-cast-align +add_dependencies(zenohpico_static git_zenoh-pico px4_platform) +target_compile_options(zenohpico_static PUBLIC -Wno-cast-align -Wno-narrowing -Wno-stringop-overflow -Wno-stringop-truncation -Wno-unused-result + -Wno-type-limits -DZ_BATCH_SIZE_RX=512 - -DZ_BATCH_SIZE_TX=512 - -DZ_FRAG_MAX_SIZE=1024) + -DZ_BATCH_SIZE_TX=512) -target_compile_options(zenohpico PRIVATE -Wno-missing-prototypes) + +target_compile_options(zenohpico_static PRIVATE -Wno-missing-prototypes) if(CONFIG_PLATFORM_NUTTX) - target_compile_options(zenohpico PRIVATE -DUNIX_NO_MULTICAST_IF) + target_compile_options(zenohpico_static PRIVATE -DUNIX_NO_MULTICAST_IF) endif() if(CONFIG_ZENOH_SERIAL) - target_compile_options(zenohpico PRIVATE -DZ_LINK_SERIAL) + target_compile_options(zenohpico_static PRIVATE -DZ_LINK_SERIAL) endif() @@ -81,7 +87,7 @@ px4_add_module( cdr uorb_msgs px4_work_queue - zenohpico + zenohpico_static zenoh_topics git_zenoh-pico INCLUDES diff --git a/src/modules/zenoh/publishers/zenoh_publisher.cpp b/src/modules/zenoh/publishers/zenoh_publisher.cpp index 8ce688d2b7..9ca45f573d 100644 --- a/src/modules/zenoh/publishers/zenoh_publisher.cpp +++ b/src/modules/zenoh/publishers/zenoh_publisher.cpp @@ -44,7 +44,6 @@ Zenoh_Publisher::Zenoh_Publisher() { - this->_topic[0] = 0x0; } Zenoh_Publisher::~Zenoh_Publisher() @@ -60,20 +59,19 @@ int Zenoh_Publisher::undeclare_publisher() int Zenoh_Publisher::declare_publisher(z_owned_session_t s, const char *keyexpr) { - strncpy(this->_topic, keyexpr, sizeof(this->_topic)); - z_view_keyexpr_t ke; - z_view_keyexpr_from_str(&ke, this->_topic); - if (z_declare_publisher(&_pub, z_loan(s), z_loan(ke), NULL) < 0) { + if (z_view_keyexpr_from_str(&ke, keyexpr) < 0) { + printf("%s is not a valid key expression\n", keyexpr); + return -1; + } + + if (z_declare_publisher(z_loan(s), &_pub, z_loan(ke), NULL) < 0) { printf("Unable to declare publisher for key expression!\n"); return -1; } - if (!z_publisher_check(&_pub)) { - printf("Unable to declare publisher for key expression!\n"); - return -1; - } + z_timestamp_new(&ts, z_loan(s)); return 0; } @@ -82,14 +80,39 @@ int8_t Zenoh_Publisher::publish(const uint8_t *buf, int size) { z_publisher_put_options_t options; z_publisher_put_options_default(&options); - options.encoding = NULL; + + z_owned_bytes_t attachment; + z_bytes_empty(&attachment); + + ze_owned_serializer_t serializer; + ze_serializer_empty(&serializer); + + ze_serializer_serialize_str(z_loan_mut(serializer), "sequence_number"); + ze_serializer_serialize_int64(z_loan_mut(serializer), this->sequence_number++); + + ze_serializer_serialize_str(z_loan_mut(serializer), "source_timestamp"); + ze_serializer_serialize_int64(z_loan_mut(serializer), hrt_absolute_time()); + + px4_guid_t px4_guid; + board_get_px4_guid(px4_guid); + + ze_serializer_serialize_str(z_loan_mut(serializer), "source_gid"); + ze_serializer_serialize_buf(z_loan_mut(serializer), px4_guid, 16); + + ze_serializer_finish(z_move(serializer), &attachment); + options.attachment = z_move(attachment); + + // Add timestamp + options.timestamp = &ts; z_owned_bytes_t payload; - z_bytes_serialize_from_slice(&payload, buf, size); + z_bytes_copy_from_buf(&payload, buf, size); return z_publisher_put(z_loan(_pub), z_move(payload), &options); } void Zenoh_Publisher::print() { - printf("Topic: %s\n", this->_topic); + z_view_string_t keystr; + z_keyexpr_as_view_string(z_publisher_keyexpr(z_loan(_pub)), &keystr); + printf("Topic: %s\n", z_string_data(z_loan(keystr))); } diff --git a/src/modules/zenoh/publishers/zenoh_publisher.hpp b/src/modules/zenoh/publishers/zenoh_publisher.hpp index a12849c786..920a623e09 100644 --- a/src/modules/zenoh/publishers/zenoh_publisher.hpp +++ b/src/modules/zenoh/publishers/zenoh_publisher.hpp @@ -66,5 +66,6 @@ protected: int8_t publish(const uint8_t *, int size); z_owned_publisher_t _pub; - char _topic[60]; // The Topic name is somewhere is the Zenoh stack as well but no good api to fetch it. + int64_t sequence_number; + z_timestamp_t ts; }; diff --git a/src/modules/zenoh/subscribers/zenoh_subscriber.cpp b/src/modules/zenoh/subscribers/zenoh_subscriber.cpp index 79f37d8981..56001f8421 100644 --- a/src/modules/zenoh/subscribers/zenoh_subscriber.cpp +++ b/src/modules/zenoh/subscribers/zenoh_subscriber.cpp @@ -41,7 +41,7 @@ #include "zenoh_subscriber.hpp" -static void data_handler_cb(const z_loaned_sample_t *sample, void *arg) +static void data_handler_cb(z_loaned_sample_t *sample, void *arg) { static_cast(arg)->data_handler(sample); } @@ -50,15 +50,13 @@ void Zenoh_Subscriber::data_handler(const z_loaned_sample_t *sample) { z_view_string_t keystr; z_keyexpr_as_view_string(z_sample_keyexpr(sample), &keystr); - z_owned_slice_t value; - z_bytes_deserialize_into_slice(z_sample_payload(sample), &value); - printf(">> [Subscriber] Received ('%s' size '%d')\n", z_string_data(z_loan(keystr)), (int)z_slice_len(z_loan(value))); + printf(">> [Subscriber] Received ('%s' size '%d')\n", z_string_data(z_loan(keystr)), + (int)z_bytes_len(z_sample_payload(sample))); } Zenoh_Subscriber::Zenoh_Subscriber() { - this->_topic[0] = 0x0; } Zenoh_Subscriber::~Zenoh_Subscriber() @@ -75,32 +73,29 @@ int Zenoh_Subscriber::undeclare_subscriber() int Zenoh_Subscriber::declare_subscriber(z_owned_session_t s, const char *keyexpr) { z_owned_closure_sample_t callback; - z_closure_sample(&callback, data_handler_cb, NULL, this); - - strncpy(this->_topic, keyexpr, sizeof(this->_topic)); + z_closure(&callback, data_handler_cb, NULL, this); z_view_keyexpr_t ke; - z_view_keyexpr_from_str(&ke, this->_topic); + z_view_keyexpr_from_str(&ke, keyexpr); - if (z_declare_subscriber(&_sub, z_loan(s), z_loan(ke), z_closure_sample_move(&callback), NULL) < 0) { + if (z_declare_subscriber(z_loan(s), &_sub, z_loan(ke), z_closure_sample_move(&callback), NULL) < 0) { printf("Unable to declare subscriber.\n"); exit(-1); } - if (!z_subscriber_check(&_sub)) { - printf("Unable to declare subscriber for key expression!\n %s\n", keyexpr); - return -1; - } - return 0; } void Zenoh_Subscriber::print() { - printf("Topic: %s\n", this->_topic); + z_view_string_t keystr; + z_keyexpr_as_view_string(z_subscriber_keyexpr(z_loan(_sub)), &keystr); + printf("Topic: %s\n", z_string_data(z_loan(keystr))); } void Zenoh_Subscriber::print(const char *type_string, const char *topic_string) { - printf("Topic: %s -> %s %s \n", this->_topic, type_string, topic_string); + z_view_string_t keystr; + z_keyexpr_as_view_string(z_subscriber_keyexpr(z_loan(_sub)), &keystr); + printf("Topic: %s -> %s %s \n", z_string_data(z_loan(keystr)), type_string, topic_string); } diff --git a/src/modules/zenoh/subscribers/zenoh_subscriber.hpp b/src/modules/zenoh/subscribers/zenoh_subscriber.hpp index 1b6d109220..5359fec291 100644 --- a/src/modules/zenoh/subscribers/zenoh_subscriber.hpp +++ b/src/modules/zenoh/subscribers/zenoh_subscriber.hpp @@ -70,5 +70,4 @@ protected: virtual void print(const char *type_string, const char *topic_string); z_owned_subscriber_t _sub; - char _topic[60]; // The Topic name is somewhere is the Zenoh stack as well but no good api to fetch it. }; diff --git a/src/modules/zenoh/zenoh.cpp b/src/modules/zenoh/zenoh.cpp index 6411734a29..38dc9529cb 100644 --- a/src/modules/zenoh/zenoh.cpp +++ b/src/modules/zenoh/zenoh.cpp @@ -52,12 +52,37 @@ // Auto-generated header to all uORB <-> CDR conversions #include +// FIXME make this a parameter +const uint8_t ros_domain_id = 0; #define Z_PUBLISH #define Z_SUBSCRIBE extern "C" __EXPORT int zenoh_main(int argc, char *argv[]); +void toCamelCase(char *input) +{ + int capitalizeNext = 1; // Capitalize the first letter + int j = 0; + + for (int i = 0; input[i] != '\0'; i++) { + if (input[i] == '_') { + capitalizeNext = 1; // Next letter should be capitalized + + } else { + if (capitalizeNext && isalpha(input[i])) { + input[j++] = toupper(input[i]); + capitalizeNext = 0; + + } else { + input[j++] = input[i]; + } + } + } + + input[j] = '\0'; // Null-terminate the input string +} + ZENOH::ZENOH(): ModuleParams(nullptr) { @@ -93,26 +118,19 @@ void ZENOH::run() PX4_INFO("Opening session..."); z_owned_session_t s; - ret = z_open(&s, z_move(config)); + ret = z_open(&s, z_move(config), NULL); - if (ret < 0) { + if (ret < 0) { PX4_ERR("Unable to open session, ret: %d", ret); return; } - PX4_INFO("Checking session..."); - - if (!z_session_check(&s)) { - PX4_ERR("Unable to check session!"); - return; - } - PX4_INFO("Starting reading/writing tasks..."); // Start read and lease tasks for zenoh-pico if (zp_start_read_task(z_loan_mut(s), NULL) < 0 || zp_start_lease_task(z_loan_mut(s), NULL) < 0) { PX4_ERR("Unable to start read and lease tasks"); - z_close(z_move(s)); + z_drop(z_move(s)); return; } @@ -122,16 +140,34 @@ void ZENOH::run() { char topic[TOPIC_INFO_SIZE]; char type[TOPIC_INFO_SIZE]; + char keyexpr[KEYEXPR_SIZE]; for (i = 0; i < _sub_count; i++) { z_config.getSubscriberMapping(topic, type); _zenoh_subscribers[i] = genSubscriber(type); if (_zenoh_subscribers[i] != 0) { - _zenoh_subscribers[i]->declare_subscriber(s, topic); + const uint8_t *rihs_hash = getRIHS01_Hash(type); + toCamelCase(type); // Convert uORB type to camel case + snprintf(keyexpr, KEYEXPR_SIZE, "%i/%s/" + KEYEXPR_MSG_NAME "%s_/RIHS01_" + "%02x%02x%02x%02x%02x%02x%02x%02x" + "%02x%02x%02x%02x%02x%02x%02x%02x" + "%02x%02x%02x%02x%02x%02x%02x%02x" + "%02x%02x%02x%02x%02x%02x%02x%02x", + ros_domain_id, topic, type, + rihs_hash[0], rihs_hash[1], rihs_hash[2], rihs_hash[3], + rihs_hash[4], rihs_hash[5], rihs_hash[6], rihs_hash[7], + rihs_hash[8], rihs_hash[9], rihs_hash[10], rihs_hash[11], + rihs_hash[12], rihs_hash[13], rihs_hash[14], rihs_hash[15], + rihs_hash[16], rihs_hash[17], rihs_hash[18], rihs_hash[19], + rihs_hash[20], rihs_hash[21], rihs_hash[22], rihs_hash[23], + rihs_hash[24], rihs_hash[25], rihs_hash[26], rihs_hash[27], + rihs_hash[28], rihs_hash[29], rihs_hash[30], rihs_hash[31] + ); + _zenoh_subscribers[i]->declare_subscriber(s, keyexpr); } - } if (z_config.getSubscriberMapping(topic, type) < 0) { @@ -149,13 +185,32 @@ void ZENOH::run() { char topic[TOPIC_INFO_SIZE]; char type[TOPIC_INFO_SIZE]; + char keyexpr[KEYEXPR_SIZE]; for (i = 0; i < _pub_count; i++) { z_config.getPublisherMapping(topic, type); _zenoh_publishers[i] = genPublisher(type); if (_zenoh_publishers[i] != 0) { - _zenoh_publishers[i]->declare_publisher(s, topic); + const uint8_t *rihs_hash = getRIHS01_Hash(type); + toCamelCase(type); // Convert uORB type to camel case + snprintf(keyexpr, KEYEXPR_SIZE, "%i/%s/" + KEYEXPR_MSG_NAME "%s_/RIHS01_" + "%02x%02x%02x%02x%02x%02x%02x%02x" + "%02x%02x%02x%02x%02x%02x%02x%02x" + "%02x%02x%02x%02x%02x%02x%02x%02x" + "%02x%02x%02x%02x%02x%02x%02x%02x", + ros_domain_id, topic, type, + rihs_hash[0], rihs_hash[1], rihs_hash[2], rihs_hash[3], + rihs_hash[4], rihs_hash[5], rihs_hash[6], rihs_hash[7], + rihs_hash[8], rihs_hash[9], rihs_hash[10], rihs_hash[11], + rihs_hash[12], rihs_hash[13], rihs_hash[14], rihs_hash[15], + rihs_hash[16], rihs_hash[17], rihs_hash[18], rihs_hash[19], + rihs_hash[20], rihs_hash[21], rihs_hash[22], rihs_hash[23], + rihs_hash[24], rihs_hash[25], rihs_hash[26], rihs_hash[27], + rihs_hash[28], rihs_hash[29], rihs_hash[30], rihs_hash[31] + ); + _zenoh_publishers[i]->declare_publisher(s, keyexpr); _zenoh_publishers[i]->setPollFD(&pfds[i]); } } @@ -211,7 +266,7 @@ void ZENOH::run() zp_stop_read_task(z_session_loan_mut(&s)); zp_stop_lease_task(z_session_loan_mut(&s)); - z_close(z_session_move(&s)); + z_drop(z_session_move(&s)); exit_and_cleanup(); } diff --git a/src/modules/zenoh/zenoh_config.hpp b/src/modules/zenoh/zenoh_config.hpp index 68a1f7121f..d8c1f97fb8 100644 --- a/src/modules/zenoh/zenoh_config.hpp +++ b/src/modules/zenoh/zenoh_config.hpp @@ -56,9 +56,13 @@ #define NET_MODE_SIZE sizeof("client") #define NET_LOCATOR_SIZE 64 -#define NET_CONFIG_LINE_SIZE NET_MODE_SIZE + NET_LOCATOR_SIZE -#define TOPIC_INFO_SIZE 64 -#define MAX_LINE_SIZE 2*TOPIC_INFO_SIZE +#define NET_CONFIG_LINE_SIZE (NET_MODE_SIZE + NET_LOCATOR_SIZE) +#define KEYEXPR_RIHS01_SIZE sizeof("RIHS01_XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX") +#define KEYEXPR_MSG_NAME "px4_msgs::msg::dds_::" +#define KEYEXPR_MSG_NAME_SIZE sizeof(KEYEXPR_MSG_NAME) +#define TOPIC_INFO_SIZE (64) +#define MAX_LINE_SIZE (2 * TOPIC_INFO_SIZE) +#define KEYEXPR_SIZE (MAX_LINE_SIZE + KEYEXPR_MSG_NAME_SIZE + KEYEXPR_RIHS01_SIZE) class Zenoh_Config {