diff --git a/src/database.c b/src/database.c index 89f386e1..28b7aa82 100644 --- a/src/database.c +++ b/src/database.c @@ -1201,6 +1201,12 @@ int db__message_release_incoming(struct mosquitto *context, uint16_t mid) } +void db__retain_expiry_check() +{ + retain__expiry_check(&db.retains); +} + + void db__expire_all_messages(struct mosquitto *context) { struct mosquitto__client_msg *client_msg, *tmp; diff --git a/src/mosquitto.c b/src/mosquitto.c index 210e72ef..4a3789bc 100644 --- a/src/mosquitto.c +++ b/src/mosquitto.c @@ -427,6 +427,7 @@ int main(int argc, char *argv[]) plugin_persist__handle_restore(); session_expiry__check(); + db__retain_expiry_check(); db__msg_store_compact(); /* After loading persisted clients and ACLs, try to associate them, diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h index 64d17dfa..5078236b 100644 --- a/src/mosquitto_broker_internal.h +++ b/src/mosquitto_broker_internal.h @@ -732,6 +732,7 @@ int db__message_write_queued_in(struct mosquitto *context); void db__msg_add_to_inflight_stats(struct mosquitto_msg_data *msg_data, struct mosquitto__client_msg *msg); void db__msg_add_to_queued_stats(struct mosquitto_msg_data *msg_data, struct mosquitto__client_msg *msg); uint64_t db__new_msg_id(void); +void db__retain_expiry_check(void); void db__expire_all_messages(struct mosquitto *context); void db__check_acl_of_all_messages(struct mosquitto *context); @@ -889,7 +890,7 @@ int retain__init(void); void retain__clean(struct mosquitto__retainhier **retainhier); int retain__queue(struct mosquitto *context, const struct mosquitto_subscription *sub); int retain__store(const char *topic, struct mosquitto__base_msg *base_msg, char **split_topics, bool persist); - +void retain__expiry_check(struct mosquitto__retainhier **retainhier); /* ============================================================ * Security related functions * ============================================================ */ diff --git a/src/plugin_public.c b/src/plugin_public.c index 3d9da0db..9531f074 100644 --- a/src/plugin_public.c +++ b/src/plugin_public.c @@ -751,7 +751,6 @@ BROKER_EXPORT int mosquitto_persist_base_msg_add(struct mosquitto_base_msg *msg_ { struct mosquitto context; struct mosquitto__base_msg *base_msg; - int i; int rc; memset(&context, 0, sizeof(context)); diff --git a/src/retain.c b/src/retain.c index 66bb37d6..e3718f20 100644 --- a/src/retain.c +++ b/src/retain.c @@ -192,6 +192,19 @@ int retain__store(const char *topic, struct mosquitto__base_msg *base_msg, char return MOSQ_ERR_SUCCESS; } +static bool retain__delete_expired_msg(struct mosquitto__retainhier *branch) +{ + if(branch->retained && branch->retained->data.expiry_time > 0 && db.now_real_s >= branch->retained->data.expiry_time){ + plugin_persist__handle_retain_msg_delete(branch->retained); + db__msg_store_ref_dec(&branch->retained); + branch->retained = NULL; +#ifdef WITH_SYS_TREE + db.retained_count--; +#endif + return true; + } + return false; +} static int retain__process(struct mosquitto__retainhier *branch, struct mosquitto *context, const struct mosquitto_subscription *sub) { @@ -200,13 +213,7 @@ static int retain__process(struct mosquitto__retainhier *branch, struct mosquitt uint16_t mid; struct mosquitto__base_msg *retained; - if(branch->retained->data.expiry_time > 0 && db.now_real_s >= branch->retained->data.expiry_time){ - plugin_persist__handle_retain_msg_delete(branch->retained); - db__msg_store_ref_dec(&branch->retained); - branch->retained = NULL; -#ifdef WITH_SYS_TREE - db.retained_count--; -#endif + if(retain__delete_expired_msg(branch)){ return MOSQ_ERR_SUCCESS; } @@ -344,6 +351,17 @@ int retain__queue(struct mosquitto *context, const struct mosquitto_subscription return MOSQ_ERR_SUCCESS; } +void retain__expiry_check(struct mosquitto__retainhier **retainhier) +{ + struct mosquitto__retainhier *peer, *retainhier_tmp; + + HASH_ITER(hh, *retainhier, peer, retainhier_tmp){ + retain__expiry_check(&peer->children); + if (retain__delete_expired_msg(peer)){ + retain__clean_empty_hierarchy(peer); + } + } +} void retain__clean(struct mosquitto__retainhier **retainhier) { @@ -360,3 +378,4 @@ void retain__clean(struct mosquitto__retainhier **retainhier) } } + diff --git a/test/broker/15-persist-client-drop-expired-messages.py b/test/broker/15-persist-client-drop-expired-messages.py index 84b0d671..7e5231e5 100755 --- a/test/broker/15-persist-client-drop-expired-messages.py +++ b/test/broker/15-persist-client-drop-expired-messages.py @@ -27,9 +27,10 @@ def do_test( additional_config_entries: dict, resubscribe: bool, num_messages_two_subscribers: int = 0, + num_retain_messages : int = 0, ): print( - f"{test_case_name}, resubscribe = {resubscribe}, two_subscribers = {'True' if num_messages_two_subscribers > 0 else 'False'}" + f"{test_case_name}, resubscribe = {resubscribe}, two_subscribers = {'True' if num_messages_two_subscribers > 0 else 'False'}, num_retain_messages = {num_retain_messages} " ) conf_file = os.path.basename(__file__).replace(".py", f"_{port}.conf") @@ -71,6 +72,7 @@ def do_test( 0, num_messages - num_messages_two_subscribers, message_expiry=60, + retain_end=num_retain_messages, ) if num_messages_two_subscribers > 0: @@ -90,6 +92,7 @@ def do_test( num_messages - num_messages_two_subscribers, num_messages, message_expiry=60, + retain_end=num_retain_messages, ) publisher_sock.close() @@ -105,6 +108,7 @@ def do_test( client_msg_counts=msg_counts, publisher_id=publisher_id, num_published_msgs=num_messages, + retain_end = num_retain_messages, message_expiry=60, ) @@ -143,6 +147,7 @@ def do_test( client_msg_counts=msg_counts, publisher_id=publisher_id, num_published_msgs=num_messages, + retain_end = 0, ) rc = broker_terminate_rc @@ -182,3 +187,9 @@ do_test( resubscribe=False, num_messages_two_subscribers=30, ) +do_test( + "memory queue", + additional_config_entries=memory_queue_config, + resubscribe=False, + num_retain_messages=40, +) diff --git a/test/broker/15-persist-client-expired-session.py b/test/broker/15-persist-client-expired-session.py index a28a6574..c8826e09 100755 --- a/test/broker/15-persist-client-expired-session.py +++ b/test/broker/15-persist-client-expired-session.py @@ -26,9 +26,10 @@ def do_test( additional_config_entries: dict, resubscribe: bool, num_messages_two_subscribers: int = 0, + num_retain_messages : int = 0, ): print( - f"{test_case_name}, resubscribe = {resubscribe}, two_subscribers = {'True' if num_messages_two_subscribers > 0 else 'False'}" + f"{test_case_name}, resubscribe = {resubscribe}, two_subscribers = {'True' if num_messages_two_subscribers > 0 else 'False'}, num_retain_messages = {num_retain_messages} " ) conf_file = os.path.basename(__file__).replace(".py", f"_{port}.conf") @@ -69,6 +70,7 @@ def do_test( topic, 0, num_messages - num_messages_two_subscribers, + retain_end=num_retain_messages, ) if num_messages_two_subscribers > 0: @@ -87,6 +89,7 @@ def do_test( topic, num_messages - num_messages_two_subscribers, num_messages, + retain_end=num_retain_messages, ) publisher_sock.close() @@ -102,6 +105,7 @@ def do_test( client_msg_counts=msg_counts, publisher_id=publisher_id, num_published_msgs=num_messages, + retain_end = num_retain_messages, ) # Put session expiry_time into the past @@ -136,6 +140,7 @@ def do_test( client_msg_counts=msg_counts, publisher_id=publisher_id, num_published_msgs=num_messages, + retain_end=num_retain_messages, ) if num_messages_two_subscribers > 0: @@ -175,6 +180,7 @@ def do_test( client_msg_counts=msg_counts, publisher_id=publisher_id, num_published_msgs=num_messages, + retain_end=num_retain_messages, ) rc = broker_terminate_rc @@ -221,3 +227,27 @@ do_test( resubscribe=True, num_messages_two_subscribers=20, ) +do_test( + "memory queue", + additional_config_entries=memory_queue_config, + resubscribe=False, + num_retain_messages=30, +) +# The following test case is open for discussion as adapting +# the check routines will be hard and some observations +# are unclear right now +# do_test( +# "memory queue", +# additional_config_entries=memory_queue_config, +# resubscribe=False, +# num_messages_two_subscribers=40, +# num_retain_messages=30, +# ) +# do_test( +# "memory queue", +# additional_config_entries=memory_queue_config, +# resubscribe=False, +# num_messages_two_subscribers=50, +# num_retain_messages=60, +# ) + diff --git a/test/broker/persist_module_helper.py b/test/broker/persist_module_helper.py index 15b9594a..e6a83e36 100644 --- a/test/broker/persist_module_helper.py +++ b/test/broker/persist_module_helper.py @@ -47,6 +47,7 @@ def publish_messages( topic: str, start: int, end: int, + retain_end=0, message_expiry: int = 0, qos: int = 1, ): @@ -65,6 +66,7 @@ def publish_messages( mid=mid, qos=qos, payload=payload.encode("UTF-8"), + retain = True if i < retain_end else False, proto_ver=proto_ver, properties=props, ) @@ -80,10 +82,11 @@ def check_db( client_msg_counts: dict[str, int], publisher_id: str, num_published_msgs: int, + retain_end: int, message_expiry: int = 0, qos: int = 1, ): - count_list = [v for v in client_msg_counts.values() if v] + [0] + count_list = [v for v in client_msg_counts.values() if v is not None] + [0] num_base_msgs = max(count_list) num_subscriptions = sum(1 for c in client_msg_counts.values() if c is not None) num_client_msgs_out = sum(count_list) @@ -91,7 +94,8 @@ def check_db( port, clients=len(client_msg_counts), client_msgs_out=num_client_msgs_out, - base_msgs=num_base_msgs, + base_msgs=num_base_msgs if num_base_msgs > 0 or retain_end == 0 else 1, + retain_msgs=1 if retain_end > 0 else 0, subscriptions=num_subscriptions, ) @@ -131,7 +135,7 @@ def check_db( mid, port, qos, - retain=0, + retain=1 if i < retain_end else 0, idx=i, ) # Check client msg diff --git a/test/broker/persist_sqlite.py b/test/broker/persist_sqlite.py index c42ea2ec..e7888761 100755 --- a/test/broker/persist_sqlite.py +++ b/test/broker/persist_sqlite.py @@ -360,10 +360,6 @@ def check_client_msg( raise ValueError( "Invalid state %d / %d for message %s" % (row[8], state, msg_id) ) - except ValueError as err: - raise ValueError( - str(err) + f" in client message client_id = {client_id} cmsg_id = {idx}" - ) from err finally: con.close() diff --git a/test/unit/broker/subs_stubs.c b/test/unit/broker/subs_stubs.c index a3280cc5..913e84a6 100644 --- a/test/unit/broker/subs_stubs.c +++ b/test/unit/broker/subs_stubs.c @@ -115,6 +115,11 @@ int retain__init(void) return MOSQ_ERR_SUCCESS; } +void retain__expiry_check(struct mosquitto__retainhier **retainhier) +{ + UNUSED(retainhier); +} + void retain__clean(struct mosquitto__retainhier **retainhier) { UNUSED(retainhier);