mirror of
https://github.com/eclipse-mosquitto/mosquitto.git
synced 2026-02-06 02:52:07 +08:00
Restrict persistence events to persisted clients only.
This commit is contained in:
@@ -352,6 +352,7 @@ struct mosquitto {
|
||||
bool in_by_id;
|
||||
bool is_dropping;
|
||||
bool is_bridge;
|
||||
bool is_persisted;
|
||||
struct mosquitto__bridge *bridge;
|
||||
struct mosquitto_msg_data msgs_in;
|
||||
struct mosquitto_msg_data msgs_out;
|
||||
|
||||
@@ -128,6 +128,14 @@ int connect__on_authorised(struct mosquitto *context, void *auth_data_out, uint1
|
||||
* done in context__cleanup() below. */
|
||||
}
|
||||
|
||||
if(context->clean_start == true){
|
||||
sub__clean_session(found_context);
|
||||
found_context->session_expiry_interval = 0;
|
||||
plugin_persist__handle_client_delete(found_context);
|
||||
}
|
||||
context->is_persisted = found_context->is_persisted;
|
||||
found_context->is_persisted = false; /* stops persistence for context being removed */
|
||||
|
||||
if(context->clean_start == false && found_context->session_expiry_interval > 0){
|
||||
if(context->protocol == mosq_p_mqtt311 || context->protocol == mosq_p_mqtt5){
|
||||
connect_ack |= 0x01;
|
||||
@@ -183,11 +191,6 @@ int connect__on_authorised(struct mosquitto *context, void *auth_data_out, uint1
|
||||
}
|
||||
}
|
||||
|
||||
if(context->clean_start == true){
|
||||
sub__clean_session(found_context);
|
||||
found_context->session_expiry_interval = 0;
|
||||
plugin_persist__handle_client_delete(found_context);
|
||||
}
|
||||
if((found_context->protocol == mosq_p_mqtt5 && found_context->session_expiry_interval == 0)
|
||||
|| (found_context->protocol != mosq_p_mqtt5 && found_context->clean_start == true)
|
||||
|| (context->clean_start == true)
|
||||
|
||||
@@ -213,9 +213,7 @@ int handle__subscribe(struct mosquitto *context)
|
||||
|
||||
log__printf(NULL, MOSQ_LOG_SUBSCRIBE, "%s %d %s", context->id, qos, sub);
|
||||
|
||||
if(context->session_expiry_interval > 0){
|
||||
plugin_persist__handle_subscription_add(context, sub, qos | subscription_options, subscription_identifier);
|
||||
}
|
||||
plugin_persist__handle_subscription_add(context, sub, qos | subscription_options, subscription_identifier);
|
||||
}
|
||||
mosquitto__FREE(sub);
|
||||
|
||||
|
||||
@@ -374,10 +374,12 @@ int main(int argc, char *argv[])
|
||||
if(!ctxt->wsi)
|
||||
#endif
|
||||
{
|
||||
ctxt->is_persisted = false; /* prevent persistence removal */
|
||||
context__cleanup(ctxt, true);
|
||||
}
|
||||
}
|
||||
HASH_ITER(hh_sock, db.contexts_by_sock, ctxt, ctxt_tmp){
|
||||
ctxt->is_persisted = false; /* prevent persistence removal */
|
||||
context__cleanup(ctxt, true);
|
||||
}
|
||||
#ifdef WITH_BRIDGE
|
||||
|
||||
@@ -54,7 +54,7 @@ void plugin_persist__handle_client_add(struct mosquitto *context)
|
||||
|
||||
UNUSED(will); /* FIXME */
|
||||
|
||||
if(db.shutdown) return;
|
||||
if(db.shutdown || context->is_persisted) return;
|
||||
|
||||
opts = &db.config->security_options;
|
||||
memset(&event_data, 0, sizeof(event_data));
|
||||
@@ -77,6 +77,7 @@ void plugin_persist__handle_client_add(struct mosquitto *context)
|
||||
DL_FOREACH(opts->plugin_callbacks.persist_client_add, cb_base){
|
||||
cb_base->cb(MOSQ_EVT_PERSIST_CLIENT_ADD, &event_data, cb_base->userdata);
|
||||
}
|
||||
context->is_persisted = true;
|
||||
}
|
||||
|
||||
|
||||
@@ -123,7 +124,8 @@ void plugin_persist__handle_client_delete(struct mosquitto *context)
|
||||
struct mosquitto__callback *cb_base;
|
||||
struct mosquitto__security_options *opts;
|
||||
|
||||
if(context->session_expiry_interval > 0
|
||||
if(context->is_persisted == false
|
||||
|| context->session_expiry_interval > 0
|
||||
|| context->id == NULL
|
||||
|| context->state == mosq_cs_duplicate
|
||||
|| db.shutdown){
|
||||
@@ -137,6 +139,7 @@ void plugin_persist__handle_client_delete(struct mosquitto *context)
|
||||
DL_FOREACH(opts->plugin_callbacks.persist_client_delete, cb_base){
|
||||
cb_base->cb(MOSQ_EVT_PERSIST_CLIENT_DELETE, &event_data, cb_base->userdata);
|
||||
}
|
||||
context->is_persisted = false;
|
||||
}
|
||||
|
||||
|
||||
@@ -146,7 +149,7 @@ void plugin_persist__handle_subscription_add(struct mosquitto *context, const ch
|
||||
struct mosquitto__callback *cb_base;
|
||||
struct mosquitto__security_options *opts;
|
||||
|
||||
if(db.shutdown) return;
|
||||
if(db.shutdown || context->is_persisted == false) return;
|
||||
|
||||
opts = &db.config->security_options;
|
||||
memset(&event_data, 0, sizeof(event_data));
|
||||
@@ -167,7 +170,7 @@ void plugin_persist__handle_subscription_delete(struct mosquitto *context, const
|
||||
struct mosquitto__callback *cb_base;
|
||||
struct mosquitto__security_options *opts;
|
||||
|
||||
if(db.shutdown) return;
|
||||
if(db.shutdown || context->is_persisted == false) return;
|
||||
|
||||
opts = &db.config->security_options;
|
||||
memset(&event_data, 0, sizeof(event_data));
|
||||
@@ -186,7 +189,7 @@ void plugin_persist__handle_client_msg_add(struct mosquitto *context, const stru
|
||||
struct mosquitto__callback *cb_base;
|
||||
struct mosquitto__security_options *opts;
|
||||
|
||||
if(context->session_expiry_interval == 0
|
||||
if(context->is_persisted == false
|
||||
|| (cmsg->qos == 0 && db.config->queue_qos0_messages == false)
|
||||
|| db.shutdown){
|
||||
|
||||
@@ -218,7 +221,7 @@ void plugin_persist__handle_client_msg_delete(struct mosquitto *context, const s
|
||||
struct mosquitto__callback *cb_base;
|
||||
struct mosquitto__security_options *opts;
|
||||
|
||||
if(context->session_expiry_interval == 0
|
||||
if(context->is_persisted == false
|
||||
|| (cmsg->qos == 0 && db.config->queue_qos0_messages == false)
|
||||
|| db.shutdown){
|
||||
|
||||
@@ -248,7 +251,12 @@ void plugin_persist__handle_client_msg_update(struct mosquitto *context, const s
|
||||
struct mosquitto__callback *cb_base;
|
||||
struct mosquitto__security_options *opts;
|
||||
|
||||
if(db.shutdown) return;
|
||||
if(context->is_persisted == false
|
||||
|| (cmsg->qos == 0 && db.config->queue_qos0_messages == false)
|
||||
|| db.shutdown){
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
opts = &db.config->security_options;
|
||||
memset(&event_data, 0, sizeof(event_data));
|
||||
@@ -272,7 +280,7 @@ void plugin_persist__handle_client_msg_clear(struct mosquitto *context, uint8_t
|
||||
struct mosquitto__callback *cb_base;
|
||||
struct mosquitto__security_options *opts;
|
||||
|
||||
if(context->session_expiry_interval == 0 || db.shutdown) return;
|
||||
if(db.shutdown || context->is_persisted == false) return;
|
||||
|
||||
opts = &db.config->security_options;
|
||||
memset(&event_data, 0, sizeof(event_data));
|
||||
|
||||
@@ -457,6 +457,7 @@ int mosquitto_persist_client_add(struct mosquitto_evt_persist_client *client)
|
||||
context->max_qos = client->max_qos;
|
||||
context->maximum_packet_size = client->max_packet_size;
|
||||
context->retain_available = client->retain_available;
|
||||
context->is_persisted = true;
|
||||
|
||||
/* in per_listener_settings mode, try to find the listener by persisted port */
|
||||
if(db.config->per_listener_settings && client->listener_port > 0){
|
||||
@@ -548,6 +549,7 @@ int mosquitto_persist_client_delete(const char *client_id)
|
||||
|
||||
context->clean_start = true;
|
||||
context->session_expiry_interval = 0;
|
||||
context->is_persisted = false;
|
||||
mosquitto__set_state(context, mosq_cs_duplicate);
|
||||
do_disconnect(context, MOSQ_ERR_SUCCESS);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user