From 0c0683094b6124c8478482000dd55ae5a7e4c449 Mon Sep 17 00:00:00 2001 From: "Roger A. Light" Date: Wed, 4 Jun 2025 10:25:02 +0100 Subject: [PATCH] Allow plugins to choose when their tick event is next called --- ChangeLog.txt | 3 + include/mosquitto/broker.h | 7 +- include/mosquitto/libcommon_time.h | 8 ++ libcommon/time_common.c | 9 ++ plugins/examples/CMakeLists.txt | 1 + plugins/examples/Makefile | 1 + .../delayed-auth/mosquitto_delayed_auth.c | 2 +- plugins/examples/tick-interval/CMakeLists.txt | 3 + plugins/examples/tick-interval/Makefile | 15 +++ .../tick-interval/mosquitto_tick_interval.c | 120 ++++++++++++++++++ plugins/examples/tick-interval/test.conf | 7 + plugins/examples/tick-interval/test.sh | 3 + .../wildcard-temp/mosquitto_wildcard_temp.c | 2 +- plugins/persist-sqlite/persist_sqlite.h | 1 - plugins/persist-sqlite/tick.c | 6 +- src/control.c | 11 +- src/mosquitto_broker_internal.h | 7 +- src/plugin_callbacks.c | 9 +- src/plugin_reload.c | 2 +- src/plugin_tick.c | 20 ++- 20 files changed, 214 insertions(+), 23 deletions(-) create mode 100644 plugins/examples/tick-interval/CMakeLists.txt create mode 100644 plugins/examples/tick-interval/Makefile create mode 100644 plugins/examples/tick-interval/mosquitto_tick_interval.c create mode 100644 plugins/examples/tick-interval/test.conf create mode 100755 plugins/examples/tick-interval/test.sh diff --git a/ChangeLog.txt b/ChangeLog.txt index 770a3003..5873e6b7 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -139,6 +139,9 @@ Plugins / plugin interface: subscribe/unsubscribes actually succeed. Allow modifying topic and qos. - Add `mosquitto_persistence_location()` for plugins to use to find a valid location for storing persistent data. +- Plugins can now use the next_s and next_ms members of the tick event data + struct to set a minimum interval that the broker will wait before calling the + tick callback again. Client library: - Add MOSQ_OPT_DISABLE_SOCKETPAIR to allow the disabling of the socketpair diff --git a/include/mosquitto/broker.h b/include/mosquitto/broker.h index 2fcea1ac..c213260a 100644 --- a/include/mosquitto/broker.h +++ b/include/mosquitto/broker.h @@ -431,9 +431,10 @@ mosq_EXPORT int mosquitto_plugin_set_info( * Called when a client connects with TLS-PSK and the broker needs * the PSK information. * * MOSQ_EVT_TICK - * Called periodically in the event loop. At the moment this - * occurs at a regular frequency, but this should not be relied - * upon. + * Called periodically by the broker. The next_s and next_ms + * values of the event data can be used to set a minimum interval + * that the broker will wait before calling the tick event again + * for this callback. * * MOSQ_EVT_DISCONNECT * Called when a client disconnects from the broker. * * MOSQ_EVT_CONNECT diff --git a/include/mosquitto/libcommon_time.h b/include/mosquitto/libcommon_time.h index 68f1b504..48113ec4 100644 --- a/include/mosquitto/libcommon_time.h +++ b/include/mosquitto/libcommon_time.h @@ -38,6 +38,14 @@ libmosqcommon_EXPORT time_t mosquitto_time(void); */ libmosqcommon_EXPORT void mosquitto_time_ns(time_t *s, long *ns); +/* Function: mosquitto_time_cmp + * + * Returns < 0 if the time t1 is smaller (earlier) than t2 + * Returns > 0 if the time t1 is greater (later) than t2 + * Returns == 0 if the time t1 is exactly equal to t2 + */ +libmosqcommon_EXPORT long mosquitto_time_cmp(time_t t1_s, long t1_ns, time_t t2_s, long t2_ns); + #ifdef __cplusplus } #endif diff --git a/libcommon/time_common.c b/libcommon/time_common.c index 28aa7566..34b36f72 100644 --- a/libcommon/time_common.c +++ b/libcommon/time_common.c @@ -107,3 +107,12 @@ void mosquitto_time_ns(time_t *s, long *ns) *ns = tv.tv_usec * 1000; #endif } + +long mosquitto_time_cmp(time_t t1_s, long t1_ns, time_t t2_s, long t2_ns) +{ + if(t1_s == t2_s){ + return (long)(t1_ns - t2_ns); + }else{ + return (long)(t1_s - t2_s); + } +} diff --git a/plugins/examples/CMakeLists.txt b/plugins/examples/CMakeLists.txt index 40733f62..71ca9046 100644 --- a/plugins/examples/CMakeLists.txt +++ b/plugins/examples/CMakeLists.txt @@ -16,6 +16,7 @@ add_subdirectory(payload-modification) add_subdirectory(payload-size-stats) add_subdirectory(plugin-event-stats) add_subdirectory(print-ip-on-publish) +add_subdirectory(tick-interval) add_subdirectory(topic-modification) add_subdirectory(topic-jail) add_subdirectory(wildcard-temp) diff --git a/plugins/examples/Makefile b/plugins/examples/Makefile index 36baa883..247636f6 100644 --- a/plugins/examples/Makefile +++ b/plugins/examples/Makefile @@ -15,6 +15,7 @@ DIRS= \ payload-size-stats \ plugin-event-stats \ print-ip-on-publish \ + tick-interval \ topic-modification \ topic-jail \ wildcard-temp diff --git a/plugins/examples/delayed-auth/mosquitto_delayed_auth.c b/plugins/examples/delayed-auth/mosquitto_delayed_auth.c index 61dd7572..23b806a4 100644 --- a/plugins/examples/delayed-auth/mosquitto_delayed_auth.c +++ b/plugins/examples/delayed-auth/mosquitto_delayed_auth.c @@ -138,7 +138,7 @@ static int tick_callback(int event, void *event_data, void *userdata) } last_check = now; } - /* Declare that we want another call in at most 1 second */ + /* Declare that we want another call in 1 second at the earliest */ ed->next_s = 1; return MOSQ_ERR_SUCCESS; diff --git a/plugins/examples/tick-interval/CMakeLists.txt b/plugins/examples/tick-interval/CMakeLists.txt new file mode 100644 index 00000000..c2dbf9fd --- /dev/null +++ b/plugins/examples/tick-interval/CMakeLists.txt @@ -0,0 +1,3 @@ +set (PLUGIN_NAME mosquitto_tick_interval) + +add_mosquitto_plugin_no_install("${PLUGIN_NAME}" "${PLUGIN_NAME}.c" "" "libmosquitto_common") diff --git a/plugins/examples/tick-interval/Makefile b/plugins/examples/tick-interval/Makefile new file mode 100644 index 00000000..98e29060 --- /dev/null +++ b/plugins/examples/tick-interval/Makefile @@ -0,0 +1,15 @@ +R=../../.. +include ${R}/config.mk + +PLUGIN_NAME=mosquitto_tick_interval +LOCAL_CFLAGS+= +LOCAL_CPPFLAGS+= +LOCAL_LDFLAGS+= +LOCAL_LIBADD+= + +all : binary + +OBJS:=${PLUGIN_NAME}.o + +PLUGIN_NOINST:=1 +include ${R}/plugins/plugin.mk diff --git a/plugins/examples/tick-interval/mosquitto_tick_interval.c b/plugins/examples/tick-interval/mosquitto_tick_interval.c new file mode 100644 index 00000000..0e25d3ff --- /dev/null +++ b/plugins/examples/tick-interval/mosquitto_tick_interval.c @@ -0,0 +1,120 @@ +/* +Copyright (c) 2025 Roger Light + +All rights reserved. This program and the accompanying materials +are made available under the terms of the Eclipse Public License 2.0 +and Eclipse Distribution License v1.0 which accompany this distribution. + +The Eclipse Public License is available at + https://www.eclipse.org/legal/epl-2.0/ +and the Eclipse Distribution License is available at + http://www.eclipse.org/org/documents/edl-v10.php. + +SPDX-License-Identifier: EPL-2.0 OR EDL-1.0 + +Contributors: + Roger Light - initial implementation and documentation. +*/ + +/* + * This is an example plugin showing how a plugin can choose how frequently it + * receives a tick event. Note that this request is not a guarantee that the + * tick will be called that frequently, only that it will not be called more + * frequently. + * + * Setting to 0 means that a tick event will always be triggered for this + * plugin when the broker is ready to do so. + * + * Compile with: + * gcc -I -fPIC -shared mosquitto_tick.c -o mosquitto_tick.so + * + * Use in config with the below, where the interval is in seconds: + * + * plugin /path/to/mosquitto_tick.so + * plugin_opt_interval 1 + * + * Note that this only works on Mosquitto 2.0 or later. + */ + + +#include +#include +#include +#include +#include +#include + +#include "mosquitto.h" + +#define PLUGIN_NAME "tick-interval" +#define PLUGIN_VERSION NULL + +#ifndef UNUSED +# define UNUSED(A) (void)(A) +#endif + +MOSQUITTO_PLUGIN_DECLARE_VERSION(5); + +struct plugin_data{ + mosquitto_plugin_id_t *pid; + int interval; +}; + + +static int tick_callback(int event, void *event_data, void *userdata) +{ + struct mosquitto_evt_tick *ed = event_data; + struct plugin_data *data = userdata; + + UNUSED(event); + + mosquitto_log_printf(MOSQ_LOG_INFO, "Tick event for plugin with interval %d.", data->interval); + ed->next_s = data->interval; /* We want the next tick to occur at the earliest in "interval" seconds */ + ed->next_ms = 0; /* And 0 milliseconds */ + + return MOSQ_ERR_SUCCESS; +} + + +int mosquitto_plugin_init(mosquitto_plugin_id_t *identifier, void **userdata, struct mosquitto_opt *opts, int opt_count) +{ + struct plugin_data *data; + + data = mosquitto_calloc(1, sizeof(struct plugin_data)); + if(!data){ + return MOSQ_ERR_NOMEM; + } + *userdata = data; + data->interval = -1; + data->pid = identifier; + + mosquitto_plugin_set_info(identifier, PLUGIN_NAME, PLUGIN_VERSION); + + for(int i=0; iinterval = atoi(opts[i].value); + }else{ + mosquitto_log_printf(MOSQ_LOG_ERR, "Error: Unknown option '%s'.", opts[i].key); + return MOSQ_ERR_INVAL; + } + } + if(data->interval < 0){ + mosquitto_log_printf(MOSQ_LOG_ERR, "Error: interval must be >= 0."); + return MOSQ_ERR_INVAL; + } + + return mosquitto_callback_register(data->pid, MOSQ_EVT_TICK, tick_callback, NULL, data); +} + + +int mosquitto_plugin_cleanup(void *userdata, struct mosquitto_opt *opts, int opt_count) +{ + struct plugin_data *data = userdata; + + UNUSED(opts); + UNUSED(opt_count); + + mosquitto_FREE(data); + + return MOSQ_ERR_SUCCESS; +} diff --git a/plugins/examples/tick-interval/test.conf b/plugins/examples/tick-interval/test.conf new file mode 100644 index 00000000..db45f534 --- /dev/null +++ b/plugins/examples/tick-interval/test.conf @@ -0,0 +1,7 @@ +listener 1883 + +plugin ./mosquitto_tick_interval.so +plugin_opt_interval 1 + +plugin ./mosquitto_tick_interval.so +plugin_opt_interval 4 diff --git a/plugins/examples/tick-interval/test.sh b/plugins/examples/tick-interval/test.sh new file mode 100755 index 00000000..34e75a3d --- /dev/null +++ b/plugins/examples/tick-interval/test.sh @@ -0,0 +1,3 @@ +#!/bin/sh + +valgrind --log-file=vglog ../../../src/mosquitto -c test.conf -v diff --git a/plugins/examples/wildcard-temp/mosquitto_wildcard_temp.c b/plugins/examples/wildcard-temp/mosquitto_wildcard_temp.c index 129d54e8..05c7d4b2 100644 --- a/plugins/examples/wildcard-temp/mosquitto_wildcard_temp.c +++ b/plugins/examples/wildcard-temp/mosquitto_wildcard_temp.c @@ -182,7 +182,7 @@ static int tick_callback(int event, void *event_data, void *userdata) } } - /* Declare that we want another call in at most 1 second */ + /* Declare that we want another call in 1 second at the earliest */ ed->next_s = 1; return MOSQ_ERR_SUCCESS; diff --git a/plugins/persist-sqlite/persist_sqlite.h b/plugins/persist-sqlite/persist_sqlite.h index d0c84788..263a274a 100644 --- a/plugins/persist-sqlite/persist_sqlite.h +++ b/plugins/persist-sqlite/persist_sqlite.h @@ -49,7 +49,6 @@ struct mosquitto_sqlite { sqlite3_stmt *retain_msg_remove_stmt; sqlite3_stmt *will_add_stmt; sqlite3_stmt *will_remove_stmt; - time_t last_transaction; int synchronous; unsigned int event_count; unsigned int flush_period; diff --git a/plugins/persist-sqlite/tick.c b/plugins/persist-sqlite/tick.c index 4b12df5a..5acadd3b 100644 --- a/plugins/persist-sqlite/tick.c +++ b/plugins/persist-sqlite/tick.c @@ -31,12 +31,14 @@ int persist_sqlite__tick_cb(int event, void *event_data, void *userdata) struct mosquitto_sqlite *ms = userdata; UNUSED(event); - if(ed->now_s >= ms->last_transaction + ms->flush_period && ms->event_count > 0){ - ms->last_transaction = ed->now_s; + + if(ms->event_count > 0){ ms->event_count = 0; sqlite3_exec(ms->db, "END;", NULL, NULL, NULL); sqlite3_exec(ms->db, "BEGIN;", NULL, NULL, NULL); } + ed->next_s = ms->flush_period; + return MOSQ_ERR_SUCCESS; } diff --git a/src/control.c b/src/control.c index 1a4ccef8..2adf11e9 100644 --- a/src/control.c +++ b/src/control.c @@ -110,14 +110,15 @@ int control__register_callback(mosquitto_plugin_id_t *pid, MOSQ_FUNC_generic_cal if(cb_new == NULL){ return MOSQ_ERR_NOMEM; } - cb_new->data = mosquitto_strdup(topic); - if(cb_new->data == NULL){ + cb_new->data.topic = mosquitto_strdup(topic); + if(cb_new->data.topic == NULL){ mosquitto_FREE(cb_new); return MOSQ_ERR_NOMEM; } + cb_new->identifier = pid; cb_new->cb = cb_func; cb_new->userdata = userdata; - HASH_ADD_KEYPTR(hh, opts->plugin_callbacks.control, cb_new->data, strlen(cb_new->data), cb_new); + HASH_ADD_KEYPTR(hh, opts->plugin_callbacks.control, cb_new->data.topic, strlen(cb_new->data.topic), cb_new); if(pid->plugin_name){ struct control_endpoint *ep; @@ -156,7 +157,7 @@ int control__unregister_callback(mosquitto_plugin_id_t *identifier, MOSQ_FUNC_ge HASH_FIND(hh, opts->plugin_callbacks.control, topic, topic_len, cb_found); if(cb_found && cb_found->cb == cb_func){ HASH_DELETE(hh, opts->plugin_callbacks.control, cb_found); - mosquitto_FREE(cb_found->data); + mosquitto_FREE(cb_found->data.topic); mosquitto_FREE(cb_found); DL_FOREACH(identifier->control_endpoints, ep){ @@ -188,7 +189,7 @@ void control__unregister_all_callbacks(mosquitto_plugin_id_t *identifier) HASH_FIND(hh, opts->plugin_callbacks.control, ep->topic, strlen(ep->topic), cb_found); if(cb_found){ HASH_DELETE(hh, opts->plugin_callbacks.control, cb_found); - mosquitto_FREE(cb_found->data); + mosquitto_FREE(cb_found->data.topic); mosquitto_FREE(cb_found); } diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h index d007a49f..66d249b5 100644 --- a/src/mosquitto_broker_internal.h +++ b/src/mosquitto_broker_internal.h @@ -157,7 +157,11 @@ struct mosquitto__callback{ struct mosquitto__callback *next, *prev; /* For typical callbacks */ MOSQ_FUNC_generic_callback cb; void *userdata; - char *data; /* e.g. topic for control event */ + union{ + char *topic; + struct timespec next_tick; + } data; + mosquitto_plugin_id_t *identifier; }; struct plugin__callbacks{ @@ -312,6 +316,7 @@ struct mosquitto_plugin_id_t{ char *plugin_version; struct control_endpoint *control_endpoints; struct plugin_own_callback *own_callbacks; + struct timespec next_tick; }; struct mosquitto__config { diff --git a/src/plugin_callbacks.c b/src/plugin_callbacks.c index 22367752..7298ce01 100644 --- a/src/plugin_callbacks.c +++ b/src/plugin_callbacks.c @@ -90,12 +90,12 @@ static const char *get_event_name(enum mosquitto_plugin_event event) return ""; } -static bool check_callback_exists(struct mosquitto__callback *cb_base, MOSQ_FUNC_generic_callback cb_func) +static bool check_callback_exists(struct mosquitto__callback *cb_base, mosquitto_plugin_id_t *identifier, MOSQ_FUNC_generic_callback cb_func) { struct mosquitto__callback *tail, *tmp; DL_FOREACH_SAFE(cb_base, tail, tmp){ - if(tail->cb == cb_func){ + if(tail->identifier == identifier && tail->cb == cb_func){ return true; } } @@ -186,7 +186,7 @@ static int remove_callback(mosquitto_plugin_id_t *plugin, struct plugin_own_call } DL_FOREACH_SAFE(*cb_base, tail, tmp){ - if(tail->cb == own->cb_func){ + if(tail->identifier == plugin && tail->cb == own->cb_func){ DL_DELETE(*cb_base, tail); mosquitto_FREE(tail); break; @@ -240,7 +240,7 @@ BROKER_EXPORT int mosquitto_callback_register( return MOSQ_ERR_NOT_SUPPORTED; } - if(check_callback_exists(*cb_base, cb_func)){ + if(check_callback_exists(*cb_base, identifier, cb_func)){ return MOSQ_ERR_ALREADY_EXISTS; } @@ -252,6 +252,7 @@ BROKER_EXPORT int mosquitto_callback_register( } DL_APPEND(*cb_base, cb_new); + cb_new->identifier = identifier; cb_new->cb = cb_func; cb_new->userdata = userdata; } diff --git a/src/plugin_reload.c b/src/plugin_reload.c index 79984899..4e3b913a 100644 --- a/src/plugin_reload.c +++ b/src/plugin_reload.c @@ -34,7 +34,7 @@ static void plugin__handle_reload_single(struct mosquitto__security_options *opt // Using DL_FOREACH_SAFE here, as reload callbacks might unregister themself DL_FOREACH_SAFE(opts->plugin_callbacks.reload, cb_base, cb_next){ - cb_base->cb(MOSQ_EVT_TICK, &event_data, cb_base->userdata); + cb_base->cb(MOSQ_EVT_RELOAD, &event_data, cb_base->userdata); } } diff --git a/src/plugin_tick.c b/src/plugin_tick.c index abe37e0a..6a09d445 100644 --- a/src/plugin_tick.c +++ b/src/plugin_tick.c @@ -35,10 +35,22 @@ static void plugin__handle_tick_single(struct mosquitto__security_options *opts) // Using DL_FOREACH_SAFE here, as tick callbacks might unregister themself DL_FOREACH_SAFE(opts->plugin_callbacks.tick, cb_base, cb_next){ mosquitto_time_ns(&event_data.now_s, &event_data.now_ns); - event_data.next_s = 0; - event_data.next_ms = 0; - cb_base->cb(MOSQ_EVT_TICK, &event_data, cb_base->userdata); - loop__update_next_event(event_data.next_s * 1000 + event_data.next_ms); + + if(mosquitto_time_cmp(event_data.now_s, event_data.now_ns, + cb_base->data.next_tick.tv_sec, cb_base->data.next_tick.tv_nsec) > 0){ + + event_data.next_s = 0; + event_data.next_ms = 0; + cb_base->cb(MOSQ_EVT_TICK, &event_data, cb_base->userdata); + loop__update_next_event(event_data.next_s * 1000 + event_data.next_ms); + + cb_base->data.next_tick.tv_sec = event_data.now_s + event_data.next_s; + cb_base->data.next_tick.tv_nsec = event_data.now_ns + 1000000*event_data.next_ms; + if(cb_base->data.next_tick.tv_nsec > 1000000000){ + cb_base->data.next_tick.tv_nsec -= 1000000000; + cb_base->data.next_tick.tv_sec += 1; + } + } } }