Zenoh: Improve out-of-memory behaviour

Abort publication immediately when not enough memory is available.
Zero _zenoh_publishers and _zenoh_subscribers to ensure we're not
dereferencing a wrong pointer
This commit is contained in:
Peter van der Perk
2025-11-17 09:33:26 +01:00
committed by Jacob Dahl
parent 042a251542
commit b8974cb77b
6 changed files with 39 additions and 11 deletions
+5 -2
View File
@@ -52,7 +52,9 @@ if(NOT DEFINED CONFIG_PTHREAD_MUTEX_TYPES AND CONFIG_PLATFORM_NUTTX)
message( SEND_ERROR "Pthread mutex is diabled, Zenoh will not function." ) message( SEND_ERROR "Pthread mutex is diabled, Zenoh will not function." )
endif() endif()
set(Z_TRANSPORT_LEASE 60000 CACHE STRING "Link lease duration in milliseconds to announce to other zenoh nodes") # Match ROS2 RMW_ZENOH # Match ROS2 rmw_zenoh settings
set(Z_TRANSPORT_LEASE 60000 CACHE STRING "Link lease duration in milliseconds to announce to other zenoh nodes")
set(Z_TRANSPORT_LEASE_EXPIRE_FACTOR 2 CACHE STRING "Link lease duration in milliseconds to announce to other zenoh nodes")
set(FRAG_MAX_SIZE 512 CACHE STRING "Use this to override the maximum size for fragmented messages") set(FRAG_MAX_SIZE 512 CACHE STRING "Use this to override the maximum size for fragmented messages")
set(BATCH_UNICAST_SIZE 256 CACHE STRING "Use this to override the maximum unicast batch size") set(BATCH_UNICAST_SIZE 256 CACHE STRING "Use this to override the maximum unicast batch size")
@@ -72,7 +74,8 @@ target_compile_options(zenohpico_static PUBLIC -Wno-cast-align
-Wno-type-limits -Wno-type-limits
-Wno-unused-variable -Wno-unused-variable
-Wno-maybe-uninitialized -Wno-maybe-uninitialized
-Wno-conversion) -Wno-conversion
-Wno-float-equal)
target_compile_options(zenohpico_static PRIVATE -Wno-missing-prototypes) target_compile_options(zenohpico_static PRIVATE -Wno-missing-prototypes)
if(CONFIG_PLATFORM_NUTTX) if(CONFIG_PLATFORM_NUTTX)
@@ -66,7 +66,7 @@ public:
~uORB_Zenoh_Publisher() override = default; ~uORB_Zenoh_Publisher() override = default;
// Update the uORB Subscription and broadcast a Zenoh ROS2 message // Update the uORB Subscription and broadcast a Zenoh ROS2 message
virtual int8_t update() override virtual z_result_t update() override
{ {
#ifdef CONFIG_ZENOH_PUB_ON_MATCHING #ifdef CONFIG_ZENOH_PUB_ON_MATCHING
z_matching_status_t status; z_matching_status_t status;
@@ -78,8 +78,10 @@ int Zenoh_Publisher::declare_publisher(z_owned_session_t s, const char *keyexpr,
return 0; return 0;
} }
int8_t Zenoh_Publisher::publish(const uint8_t *buf, int size) z_result_t Zenoh_Publisher::publish(const uint8_t *buf, int size)
{ {
z_result_t ret;
z_publisher_put_options_t options; z_publisher_put_options_t options;
z_publisher_put_options_default(&options); z_publisher_put_options_default(&options);
@@ -87,12 +89,21 @@ int8_t Zenoh_Publisher::publish(const uint8_t *buf, int size)
_attachment.time = hrt_absolute_time(); _attachment.time = hrt_absolute_time();
z_owned_bytes_t z_attachment; z_owned_bytes_t z_attachment;
z_bytes_from_static_buf(&z_attachment, (const uint8_t *)&_attachment, RMW_ATTACHEMENT_SIZE); ret = z_bytes_from_static_buf(&z_attachment, (const uint8_t *)&_attachment, RMW_ATTACHEMENT_SIZE);
if (ret != Z_OK) {
return ret;
}
options.attachment = z_move(z_attachment); options.attachment = z_move(z_attachment);
z_owned_bytes_t payload; z_owned_bytes_t payload;
z_bytes_copy_from_buf(&payload, buf, size); ret = z_bytes_copy_from_buf(&payload, buf, size);
if (ret != Z_OK) {
return ret;
}
return z_publisher_put(z_loan(_pub), z_move(payload), &options); return z_publisher_put(z_loan(_pub), z_move(payload), &options);
} }
@@ -60,7 +60,7 @@ public:
virtual int undeclare_publisher(); virtual int undeclare_publisher();
virtual int8_t update() = 0; virtual z_result_t update() = 0;
virtual void print(); virtual void print();
+15 -2
View File
@@ -270,6 +270,7 @@ int ZENOH::setupTopics(px4_pollfd_struct_t *pfds)
#ifdef Z_SUBSCRIBE #ifdef Z_SUBSCRIBE
_zenoh_subscribers = (Zenoh_Subscriber **)malloc(sizeof(Zenoh_Subscriber *)*_sub_count); _zenoh_subscribers = (Zenoh_Subscriber **)malloc(sizeof(Zenoh_Subscriber *)*_sub_count);
memset(_zenoh_subscribers, 0x0, sizeof(Zenoh_Subscriber *)*_sub_count);
if (_zenoh_subscribers) { if (_zenoh_subscribers) {
char topic[TOPIC_INFO_SIZE]; char topic[TOPIC_INFO_SIZE];
@@ -305,6 +306,7 @@ int ZENOH::setupTopics(px4_pollfd_struct_t *pfds)
#endif #endif
} else { } else {
_zenoh_subscribers[i] = NULL;
PX4_ERR("Could not create a subscriber for type %s", type); PX4_ERR("Could not create a subscriber for type %s", type);
} }
@@ -325,6 +327,7 @@ int ZENOH::setupTopics(px4_pollfd_struct_t *pfds)
#ifdef Z_PUBLISH #ifdef Z_PUBLISH
_zenoh_publishers = (uORB_Zenoh_Publisher **)malloc(_pub_count * sizeof(uORB_Zenoh_Publisher *)); _zenoh_publishers = (uORB_Zenoh_Publisher **)malloc(_pub_count * sizeof(uORB_Zenoh_Publisher *));
memset(_zenoh_publishers, 0x0, _pub_count * sizeof(uORB_Zenoh_Publisher *));
if (_zenoh_publishers) { if (_zenoh_publishers) {
char topic[TOPIC_INFO_SIZE]; char topic[TOPIC_INFO_SIZE];
@@ -361,6 +364,7 @@ int ZENOH::setupTopics(px4_pollfd_struct_t *pfds)
#endif #endif
} else { } else {
_zenoh_publishers[i] = NULL;
PX4_ERR("Could not create a publisher for type %s", type); PX4_ERR("Could not create a publisher for type %s", type);
} }
@@ -384,7 +388,7 @@ int ZENOH::setupTopics(px4_pollfd_struct_t *pfds)
void ZENOH::run() void ZENOH::run()
{ {
int8_t ret; z_result_t ret;
int i; int i;
_pub_count = _config.getPubCount(); _pub_count = _config.getPubCount();
_sub_count = _config.getSubCount(); _sub_count = _config.getSubCount();
@@ -395,6 +399,8 @@ void ZENOH::run()
return; return;
} }
connected = true;
PX4_INFO("Starting reading/writing tasks..."); PX4_INFO("Starting reading/writing tasks...");
if (setupTopics(pfds) < 0) { if (setupTopics(pfds) < 0) {
@@ -451,6 +457,8 @@ void ZENOH::run()
zp_stop_lease_task(z_session_loan_mut(&_s)); zp_stop_lease_task(z_session_loan_mut(&_s));
z_drop(z_session_move(&_s)); z_drop(z_session_move(&_s));
connected = false;
exit_and_cleanup(); exit_and_cleanup();
} }
@@ -496,7 +504,12 @@ Zenoh demo bridge
int ZENOH::print_status() int ZENOH::print_status()
{ {
PX4_INFO("running"); if (connected) {
PX4_INFO("Connected");
} else {
PX4_INFO("Connecting");
}
PX4_INFO("Publishers"); PX4_INFO("Publishers");
+3 -2
View File
@@ -97,11 +97,12 @@ private:
Zenoh_Config _config; Zenoh_Config _config;
int _pub_count; int _pub_count;
uORB_Zenoh_Publisher **_zenoh_publishers; uORB_Zenoh_Publisher **_zenoh_publishers = nullptr;
int _sub_count; int _sub_count;
Zenoh_Subscriber **_zenoh_subscribers; Zenoh_Subscriber **_zenoh_subscribers = nullptr;
z_owned_session_t _s; z_owned_session_t _s;
bool connected = false;
px4_guid_t _px4_guid{}; px4_guid_t _px4_guid{};