mirror of
https://github.com/eclipse-mosquitto/mosquitto.git
synced 2026-02-05 18:50:06 +08:00
Fixed memory issue with property handling in persistence SQLite will restore
Signed-off-by: Norbert Heusser <norbert.heusser@cedalo.com>
This commit is contained in:
committed by
Roger Light
parent
a447e9e9ea
commit
7a22685866
@@ -485,12 +485,16 @@ static int will_restore(struct mosquitto_sqlite *ms)
|
||||
properties = json_to_properties((const char *)sqlite3_column_text(stmt, 6));
|
||||
|
||||
rc = mosquitto_client_will_set(clientid, topic, payloadlen, payload, qos, retain, properties);
|
||||
if (rc == MOSQ_ERR_NOT_FOUND || (sqlite3_column_int64(stmt, 7) == 0 && sqlite3_column_int64(stmt, 8) == 0)){
|
||||
/* If the client does not exist this is the will message of a non-persistent client.
|
||||
If the client is a persistent client and was connected at the moment of a crash
|
||||
and has no will delay we publish it's will message.*/
|
||||
if (rc == MOSQ_ERR_NOT_FOUND){
|
||||
/* If the client does not exist this is the will message of a non-persistent client. */
|
||||
rc = publish_will_msg(topic, payloadlen, payload, qos, retain, properties);
|
||||
} else if (rc == MOSQ_ERR_SUCCESS && (sqlite3_column_int64(stmt, 7) == 0 && sqlite3_column_int64(stmt, 8) == 0)) {
|
||||
/* If the client is a persistent client and was connected at the moment of a crash
|
||||
and has no will delay we publish it's will message now, but need a new copy of the properties. */
|
||||
properties = json_to_properties((const char *)sqlite3_column_text(stmt, 6));
|
||||
rc = publish_will_msg(topic, payloadlen, payload, qos, retain, properties);
|
||||
}
|
||||
|
||||
if(rc == MOSQ_ERR_SUCCESS){
|
||||
count++;
|
||||
}else{
|
||||
|
||||
@@ -136,15 +136,13 @@ void plugin_persist__handle_client_delete(struct mosquitto *context)
|
||||
return;
|
||||
}
|
||||
|
||||
log__printf(NULL, MOSQ_LOG_INFO, "plugin_persist__handle_client_delete for client %s will %p", context->id, context->will);
|
||||
|
||||
plugin_persist__handle_will_delete(context);
|
||||
|
||||
if (context->is_persisted == false
|
||||
|| context->session_expiry_interval > 0){
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
opts = &db.config->security_options;
|
||||
memset(&event_data, 0, sizeof(event_data));
|
||||
event_data.data.clientid = context->id;
|
||||
@@ -423,8 +421,6 @@ void plugin_persist__handle_will_delete(struct mosquitto *context)
|
||||
return;
|
||||
}
|
||||
|
||||
log__printf(NULL, MOSQ_LOG_INFO, "plugin_persist__handle_will_delete for client %s", context->id);
|
||||
|
||||
opts = &db.config->security_options;
|
||||
DL_FOREACH_SAFE(opts->plugin_callbacks.persist_will_delete, cb_base, cb_next){
|
||||
cb_base->cb(MOSQ_EVT_PERSIST_WILL_ADD, &event_data, cb_base->userdata);
|
||||
|
||||
@@ -875,5 +875,11 @@ BROKER_EXPORT int mosquitto_client_will_set(const char* clientid, const char *to
|
||||
if (!mosq){
|
||||
return MOSQ_ERR_NOT_FOUND;
|
||||
}
|
||||
if (properties && mosq->protocol != mosq_p_mqtt5){
|
||||
if (net__is_connected(mosq)) {
|
||||
return MOSQ_ERR_NOT_SUPPORTED;
|
||||
}
|
||||
mosq->protocol = mosq_p_mqtt5;
|
||||
}
|
||||
return will__set(mosq, topic, payloadlen, payload, qos, retain, properties);
|
||||
}
|
||||
|
||||
@@ -8,6 +8,7 @@ from persist_module_helper import *
|
||||
from typing import Optional
|
||||
|
||||
import mqtt5_rc
|
||||
import mqtt5_props
|
||||
|
||||
persist_help = persist_module()
|
||||
|
||||
@@ -22,15 +23,47 @@ subscriber_id = "test-will-subscriber"
|
||||
publisher_id = "test-will-publisher"
|
||||
helper_id = "test-helper"
|
||||
|
||||
will_properties = mqtt5_props.gen_properties(
|
||||
[
|
||||
{"identifier": mqtt5_props.PROP_PAYLOAD_FORMAT_INDICATOR, "value": 1},
|
||||
{"identifier": mqtt5_props.PROP_CONTENT_TYPE, "value": "text"},
|
||||
{
|
||||
"identifier": mqtt5_props.PROP_USER_PROPERTY,
|
||||
"name": "test-user-property",
|
||||
"value": "nothing important",
|
||||
},
|
||||
]
|
||||
)
|
||||
|
||||
will_properties_in_db = (
|
||||
"["
|
||||
'{"identifier":"payload-format-indicator","value":1}'
|
||||
+ ',{"identifier":"content-type","value":"text"}'
|
||||
+ ',{"identifier":"user-property","name":"test-user-property","value":"nothing important"}'
|
||||
+ "]"
|
||||
)
|
||||
|
||||
send_will_disconnect_rc = mqtt5_rc.MQTT_RC_DISCONNECT_WITH_WILL_MSG
|
||||
normal_disconnect_rc = mqtt5_rc.MQTT_RC_NORMAL_DISCONNECTION
|
||||
|
||||
|
||||
def do_test(
|
||||
session_expiry: int,
|
||||
will_qos: int,
|
||||
will_retain: bool,
|
||||
will_delay: int = 0,
|
||||
disconnect_rc: Optional[int] = None,
|
||||
):
|
||||
mid = 1
|
||||
|
||||
print(
|
||||
f" {'persistent' if session_expiry > 0 else 'non persistent'} client"
|
||||
f" with will qos = {will_qos}"
|
||||
f", will_retain = {will_retain}"
|
||||
f" and will delay = {will_delay}"
|
||||
+ (f" and disconnect rc = {disconnect_rc}" if disconnect_rc is not None else "")
|
||||
)
|
||||
|
||||
def check_will_received(do_subscribe: bool, expect_will_publish=bool):
|
||||
nonlocal mid
|
||||
# Reconnect client, it should have a session
|
||||
@@ -64,12 +97,6 @@ def do_test(
|
||||
mosq_test.do_ping(subscriber_sock)
|
||||
subscriber_sock.close()
|
||||
|
||||
print(
|
||||
f" {'persistent' if session_expiry > 0 else 'non persistent'} client"
|
||||
f" with will qos = {will_qos}"
|
||||
f" and will_retain = {will_retain}"
|
||||
+ (f" and disconnect rc = {disconnect_rc}" if disconnect_rc is not None else "")
|
||||
)
|
||||
expect_will_received = will_qos > 0
|
||||
|
||||
conf_file = os.path.basename(__file__).replace(".py", f"_{port}.conf")
|
||||
@@ -83,7 +110,13 @@ def do_test(
|
||||
rc = 1
|
||||
|
||||
will_payload = b"My simple will message"
|
||||
will_properties = b""
|
||||
if will_delay:
|
||||
will_delay_property = mqtt5_props.gen_uint32_prop(
|
||||
mqtt5_props.PROP_WILL_DELAY_INTERVAL, will_delay
|
||||
)
|
||||
else:
|
||||
will_delay_property = b""
|
||||
will_delayed = will_delay > 0 and session_expiry > 0
|
||||
|
||||
broker = mosq_test.start_broker(filename=conf_file, use_conf=True, port=port)
|
||||
stde = None
|
||||
@@ -108,6 +141,7 @@ def do_test(
|
||||
will_qos=will_qos,
|
||||
will_retain=will_retain,
|
||||
will_payload=will_payload,
|
||||
will_properties=will_properties + will_delay_property,
|
||||
)
|
||||
if disconnect_rc:
|
||||
if disconnect_rc == mqtt5_rc.MQTT_RC_SESSION_TAKEN_OVER:
|
||||
@@ -128,7 +162,7 @@ def do_test(
|
||||
reason_code=disconnect_rc, proto_ver=proto_ver
|
||||
),
|
||||
)
|
||||
will_sent = disconnect_rc != mqtt5_rc.MQTT_RC_NORMAL_DISCONNECTION
|
||||
will_sent = disconnect_rc != normal_disconnect_rc and not will_delayed
|
||||
else:
|
||||
will_sent = False
|
||||
|
||||
@@ -168,14 +202,18 @@ def do_test(
|
||||
topic,
|
||||
will_qos,
|
||||
will_retain,
|
||||
properties=None,
|
||||
properties=will_properties_in_db,
|
||||
)
|
||||
|
||||
# Restart broker
|
||||
broker = mosq_test.start_broker(filename=conf_file, use_conf=True, port=port)
|
||||
|
||||
check_will_received(do_subscribe=False, expect_will_publish=will_qos > 0)
|
||||
check_will_received(do_subscribe=True, expect_will_publish=will_retain)
|
||||
check_will_received(
|
||||
do_subscribe=False, expect_will_publish=will_qos > 0 and not will_delayed
|
||||
)
|
||||
check_will_received(
|
||||
do_subscribe=True, expect_will_publish=will_retain and not will_delayed
|
||||
)
|
||||
|
||||
(broker_terminate_rc, stde2) = mosq_test.terminate_broker(broker)
|
||||
broker = None
|
||||
@@ -208,23 +246,31 @@ def do_test(
|
||||
# If disconnect_rc is not set the client will not disconnect
|
||||
# session_expiry, will qos, will retain, disconnect_rc
|
||||
|
||||
send_will_disconnect_rc = mqtt5_rc.MQTT_RC_DISCONNECT_WITH_WILL_MSG
|
||||
|
||||
# non persistent client connected during crash
|
||||
do_test(0, 0, 0)
|
||||
do_test(0, 1, 0)
|
||||
do_test(0, 0, 1)
|
||||
do_test(0, 1, 1)
|
||||
|
||||
# non persistent client connected during crash, will delay does not matter
|
||||
do_test(0, 0, 0, will_delay=30)
|
||||
do_test(0, 1, 0, will_delay=30)
|
||||
do_test(0, 0, 1, will_delay=30)
|
||||
do_test(0, 1, 1, will_delay=30)
|
||||
|
||||
# non persistent client disconnecting with will sent before crash
|
||||
do_test(0, 0, 0, mqtt5_rc.MQTT_RC_DISCONNECT_WITH_WILL_MSG)
|
||||
do_test(0, 1, 0, mqtt5_rc.MQTT_RC_DISCONNECT_WITH_WILL_MSG)
|
||||
do_test(0, 0, 1, mqtt5_rc.MQTT_RC_DISCONNECT_WITH_WILL_MSG)
|
||||
do_test(0, 1, 1, mqtt5_rc.MQTT_RC_DISCONNECT_WITH_WILL_MSG)
|
||||
do_test(0, 0, 0, disconnect_rc=send_will_disconnect_rc)
|
||||
do_test(0, 1, 0, disconnect_rc=send_will_disconnect_rc)
|
||||
do_test(0, 0, 1, disconnect_rc=send_will_disconnect_rc)
|
||||
do_test(0, 1, 1, disconnect_rc=send_will_disconnect_rc)
|
||||
|
||||
# non persistent client disconnecting without will sent before crash
|
||||
do_test(0, 0, 0, mqtt5_rc.MQTT_RC_NORMAL_DISCONNECTION)
|
||||
do_test(0, 1, 0, mqtt5_rc.MQTT_RC_NORMAL_DISCONNECTION)
|
||||
do_test(0, 0, 1, mqtt5_rc.MQTT_RC_NORMAL_DISCONNECTION)
|
||||
do_test(0, 1, 1, mqtt5_rc.MQTT_RC_NORMAL_DISCONNECTION)
|
||||
do_test(0, 0, 0, disconnect_rc=normal_disconnect_rc)
|
||||
do_test(0, 1, 0, disconnect_rc=normal_disconnect_rc)
|
||||
do_test(0, 0, 1, disconnect_rc=normal_disconnect_rc)
|
||||
do_test(0, 1, 1, disconnect_rc=normal_disconnect_rc)
|
||||
|
||||
# persistent client connected during crash
|
||||
do_test(60, 0, 0)
|
||||
@@ -232,18 +278,30 @@ do_test(60, 1, 0)
|
||||
do_test(60, 0, 1)
|
||||
do_test(60, 1, 1)
|
||||
|
||||
# persistent client connected during crash with a will delay of 30 seconds
|
||||
do_test(60, 0, 0, will_delay=30)
|
||||
do_test(60, 1, 0, will_delay=30)
|
||||
do_test(60, 0, 1, will_delay=30)
|
||||
do_test(60, 1, 1, will_delay=30)
|
||||
|
||||
# persistent client disconnecting with will sent before crash
|
||||
do_test(60, 0, 0, mqtt5_rc.MQTT_RC_DISCONNECT_WITH_WILL_MSG)
|
||||
do_test(60, 1, 0, mqtt5_rc.MQTT_RC_DISCONNECT_WITH_WILL_MSG)
|
||||
do_test(60, 0, 1, mqtt5_rc.MQTT_RC_DISCONNECT_WITH_WILL_MSG)
|
||||
do_test(60, 1, 1, mqtt5_rc.MQTT_RC_DISCONNECT_WITH_WILL_MSG)
|
||||
do_test(60, 0, 0, disconnect_rc=send_will_disconnect_rc)
|
||||
do_test(60, 1, 0, disconnect_rc=send_will_disconnect_rc)
|
||||
do_test(60, 0, 1, disconnect_rc=send_will_disconnect_rc)
|
||||
do_test(60, 1, 1, disconnect_rc=send_will_disconnect_rc)
|
||||
|
||||
# persistent client disconnecting without will sent before crash
|
||||
do_test(60, 0, 0, mqtt5_rc.MQTT_RC_NORMAL_DISCONNECTION)
|
||||
do_test(60, 1, 0, mqtt5_rc.MQTT_RC_NORMAL_DISCONNECTION)
|
||||
do_test(60, 0, 1, mqtt5_rc.MQTT_RC_NORMAL_DISCONNECTION)
|
||||
do_test(60, 1, 1, mqtt5_rc.MQTT_RC_NORMAL_DISCONNECTION)
|
||||
do_test(60, 0, 0, disconnect_rc=normal_disconnect_rc)
|
||||
do_test(60, 1, 0, disconnect_rc=normal_disconnect_rc)
|
||||
do_test(60, 0, 1, disconnect_rc=normal_disconnect_rc)
|
||||
do_test(60, 1, 1, disconnect_rc=normal_disconnect_rc)
|
||||
|
||||
# persistent client disconnecting with will sent, but will delay
|
||||
do_test(60, 0, 0, will_delay=30, disconnect_rc=send_will_disconnect_rc)
|
||||
do_test(60, 1, 0, will_delay=30, disconnect_rc=send_will_disconnect_rc)
|
||||
do_test(60, 0, 1, will_delay=30, disconnect_rc=send_will_disconnect_rc)
|
||||
do_test(60, 1, 1, will_delay=30, disconnect_rc=send_will_disconnect_rc)
|
||||
|
||||
# Remove will msg by session takeover through reconnect
|
||||
do_test(0, 1, 1, mqtt5_rc.MQTT_RC_SESSION_TAKEN_OVER)
|
||||
# do_test(60, 1, 1, mqtt5_rc.MQTT_RC_SESSION_TAKEN_OVER)
|
||||
do_test(0, 1, 1, disconnect_rc=mqtt5_rc.MQTT_RC_SESSION_TAKEN_OVER)
|
||||
# do_test(60, 1, 1, disconnect_rc=mqtt5_rc.MQTT_RC_SESSION_TAKEN_OVER)
|
||||
|
||||
Reference in New Issue
Block a user