diff --git a/src/database.c b/src/database.c index d4286246..94375128 100644 --- a/src/database.c +++ b/src/database.c @@ -281,7 +281,7 @@ void db__msg_store_remove(struct mosquitto_msg_store *store, bool notify) HASH_DELETE(hh, db.msg_store, store); db.msg_store_count--; db.msg_store_bytes -= store->payloadlen; - if(db.shutdown == false || notify == false){ + if(db.shutdown == false || notify == true){ plugin_persist__handle_msg_remove(store); } db__msg_store_free(store); diff --git a/src/handle_subscribe.c b/src/handle_subscribe.c index ea4e56db..1e1a528f 100644 --- a/src/handle_subscribe.c +++ b/src/handle_subscribe.c @@ -214,7 +214,7 @@ int handle__subscribe(struct mosquitto *context) log__printf(NULL, MOSQ_LOG_SUBSCRIBE, "%s %d %s", context->id, qos, sub); if(context->session_expiry_interval > 0){ - plugin_persist__handle_subscription_add(context, sub, subscription_options, subscription_identifier); + plugin_persist__handle_subscription_add(context, sub, qos | subscription_options, subscription_identifier); } } mosquitto__free(sub); diff --git a/src/handle_unsubscribe.c b/src/handle_unsubscribe.c index 3726d9cf..f4030d88 100644 --- a/src/handle_unsubscribe.c +++ b/src/handle_unsubscribe.c @@ -130,9 +130,7 @@ int handle__unsubscribe(struct mosquitto *context) log__printf(NULL, MOSQ_LOG_DEBUG, "\t%s", sub); if(allowed){ rc = sub__remove(context, sub, db.subs, &reason); - if(context->session_expiry_interval > 0){ - plugin_persist__handle_subscription_remove(context, sub); - } + plugin_persist__handle_subscription_remove(context, sub); }else{ rc = MOSQ_ERR_SUCCESS; } diff --git a/src/plugin_persist.c b/src/plugin_persist.c index e3b8e608..f7f56f69 100644 --- a/src/plugin_persist.c +++ b/src/plugin_persist.c @@ -201,8 +201,15 @@ void plugin_persist__handle_client_msg_remove(struct mosquitto *context, const s struct mosquitto__callback *cb_base; struct mosquitto__security_options *opts; + if(context->session_expiry_interval == 0 + || (cmsg->qos == 0 && db.config->queue_qos0_messages == false)){ + + return; + } + opts = &db.config->security_options; memset(&event_data, 0, sizeof(event_data)); + event_data.client_id = context->id; event_data.cmsg_id = cmsg->cmsg_id; event_data.mid = cmsg->mid; diff --git a/src/retain.c b/src/retain.c index 3ef4df6a..5ddfefb6 100644 --- a/src/retain.c +++ b/src/retain.c @@ -157,7 +157,7 @@ int retain__store(const char *topic, struct mosquitto_msg_store *stored, char ** #endif if(retainhier->retained){ - if(persist){ + if(persist && retainhier->retained->topic[0] != '$'){ plugin_persist__handle_retain_remove(retainhier->retained); } db__msg_store_ref_dec(&retainhier->retained); @@ -171,11 +171,9 @@ int retain__store(const char *topic, struct mosquitto_msg_store *stored, char ** if(stored->payloadlen){ retainhier->retained = stored; db__msg_store_ref_inc(retainhier->retained); - if(retainhier->retained->topic[0] != '$'){ - if(persist){ - plugin_persist__handle_msg_add(retainhier->retained); - plugin_persist__handle_retain_add(retainhier->retained); - } + if(persist && retainhier->retained->topic[0] != '$'){ + plugin_persist__handle_msg_add(retainhier->retained); + plugin_persist__handle_retain_add(retainhier->retained); } #ifdef WITH_SYS_TREE db.retained_count++; diff --git a/src/subs.c b/src/subs.c index 90489dc3..e67b5503 100644 --- a/src/subs.c +++ b/src/subs.c @@ -719,6 +719,7 @@ int sub__clean_session(struct mosquitto *context) hier = context->subs[i]->hier; + plugin_persist__handle_subscription_remove(context, context->subs[i]->topic_filter); if(context->subs[i]->shared){ leaf = context->subs[i]->shared->subs; while(leaf){