mirror of
https://github.com/eclipse-mosquitto/mosquitto.git
synced 2026-02-05 18:50:06 +08:00
Plugin interface interface and test improvements.
This commit is contained in:
@@ -1045,10 +1045,8 @@ int mosquitto_subscription_delete(const char *client_id, const char *topic);
|
||||
* message for an existing topic first.
|
||||
*
|
||||
* Parameters:
|
||||
* msg->plugin_topic - the topic that the message references
|
||||
* The broker will *not* modify this string and it remains the
|
||||
* property of the plugin.
|
||||
* msg->store_id - the store id of the stored message that is to be retained
|
||||
* topic - the topic that the message references
|
||||
* store_id - the store id of the stored message that is to be retained
|
||||
*
|
||||
* Returns:
|
||||
* MOSQ_ERR_SUCCESS - on success
|
||||
@@ -1056,7 +1054,7 @@ int mosquitto_subscription_delete(const char *client_id, const char *topic);
|
||||
* MOSQ_ERR_NOT_FOUND - the referenced stored message was not found
|
||||
* MOSQ_ERR_NOMEM - on out of memory
|
||||
*/
|
||||
int mosquitto_persist_retain_add(struct mosquitto_evt_persist_retain *retain);
|
||||
int mosquitto_persist_retain_add(const char *topic, uint64_t store_id);
|
||||
|
||||
|
||||
/* Function: mosquitto_persist_retain_delete
|
||||
@@ -1064,9 +1062,7 @@ int mosquitto_persist_retain_add(struct mosquitto_evt_persist_retain *retain);
|
||||
* Use to delete a retained message.
|
||||
*
|
||||
* Parameters:
|
||||
* msg->plugin_topic - the topic that the message references
|
||||
* The broker will *not* modify this string and it remains the
|
||||
* property of the plugin.
|
||||
* topic - the topic that the message references
|
||||
*
|
||||
* Returns:
|
||||
* MOSQ_ERR_SUCCESS - on success
|
||||
|
||||
@@ -248,7 +248,7 @@ int send__real_publish(struct mosquitto *mosq, uint16_t mid, const char *topic,
|
||||
}
|
||||
|
||||
/* Payload */
|
||||
if(payloadlen){
|
||||
if(payloadlen && payload){
|
||||
packet__write_bytes(packet, payload, payloadlen);
|
||||
}
|
||||
|
||||
|
||||
@@ -281,8 +281,8 @@ void db__msg_store_remove(struct mosquitto_msg_store *store, bool notify)
|
||||
HASH_DELETE(hh, db.msg_store, store);
|
||||
db.msg_store_count--;
|
||||
db.msg_store_bytes -= store->payloadlen;
|
||||
if(db.shutdown == false || notify == true){
|
||||
plugin_persist__handle_msg_delete(store);
|
||||
if(notify == true){
|
||||
plugin_persist__handle_msg_remove(store);
|
||||
}
|
||||
db__msg_store_free(store);
|
||||
}
|
||||
|
||||
@@ -21,10 +21,10 @@ _mosquitto_kick_client_by_username
|
||||
_mosquitto_log_printf
|
||||
_mosquitto_malloc
|
||||
_mosquitto_persist_client_add
|
||||
_mosquitto_persist_client_delete
|
||||
_mosquitto_persist_client_msg_add
|
||||
_mosquitto_persist_client_msg_delete
|
||||
_mosquitto_persist_client_msg_update
|
||||
_mosquitto_persist_client_delete
|
||||
_mosquitto_persist_client_update
|
||||
_mosquitto_persist_msg_add
|
||||
_mosquitto_persist_msg_delete
|
||||
@@ -53,6 +53,7 @@ _mosquitto_pub_topic_check
|
||||
_mosquitto_realloc
|
||||
_mosquitto_set_username
|
||||
_mosquitto_strdup
|
||||
_mosquitto_string_to_property_info
|
||||
_mosquitto_sub_matches_acl
|
||||
_mosquitto_sub_matches_acl_with_pattern
|
||||
_mosquitto_sub_topic_check
|
||||
|
||||
@@ -54,6 +54,7 @@
|
||||
mosquitto_realloc;
|
||||
mosquitto_set_username;
|
||||
mosquitto_strdup;
|
||||
mosquitto_string_to_property_info;
|
||||
mosquitto_sub_matches_acl;
|
||||
mosquitto_sub_matches_acl_with_pattern;
|
||||
mosquitto_sub_topic_check;
|
||||
|
||||
@@ -90,6 +90,9 @@ void plugin_persist__handle_client_update(struct mosquitto *context)
|
||||
if(db.shutdown) return;
|
||||
|
||||
UNUSED(will); /* FIXME */
|
||||
|
||||
if(db.shutdown) return;
|
||||
|
||||
opts = &db.config->security_options;
|
||||
memset(&event_data, 0, sizeof(event_data));
|
||||
event_data.client_id = context->id;
|
||||
|
||||
@@ -539,9 +539,13 @@ int mosquitto_persist_client_msg_add(struct mosquitto_evt_persist_client_msg *cl
|
||||
struct mosquitto *context;
|
||||
struct mosquitto_msg_store *stored;
|
||||
|
||||
if(client_msg == NULL || client_msg->plugin_client_id == NULL) return MOSQ_ERR_INVAL;
|
||||
if(client_msg == NULL || client_msg->plugin_client_id == NULL){
|
||||
free(client_msg->plugin_client_id);
|
||||
return MOSQ_ERR_INVAL;
|
||||
}
|
||||
|
||||
HASH_FIND(hh_id, db.contexts_by_id, client_msg->plugin_client_id, strlen(client_msg->plugin_client_id), context);
|
||||
free(client_msg->plugin_client_id);
|
||||
if(context == NULL){
|
||||
return MOSQ_ERR_NOT_FOUND;
|
||||
}
|
||||
|
||||
14
src/retain.c
14
src/retain.c
@@ -71,22 +71,22 @@ int retain__init(void)
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
}
|
||||
|
||||
int mosquitto_persist_retain_add(struct mosquitto_evt_persist_retain *msg)
|
||||
int mosquitto_persist_retain_add(const char *topic, uint64_t store_id)
|
||||
{
|
||||
struct mosquitto_msg_store *stored;
|
||||
int rc = MOSQ_ERR_UNKNOWN;
|
||||
char **split_topics = NULL;
|
||||
char *local_topic = NULL;
|
||||
|
||||
if(msg == NULL || msg->plugin_topic == NULL) return MOSQ_ERR_INVAL;
|
||||
if(topic == NULL) return MOSQ_ERR_INVAL;
|
||||
|
||||
HASH_FIND(hh, db.msg_store, &msg->store_id, sizeof(msg->store_id), stored);
|
||||
HASH_FIND(hh, db.msg_store, &store_id, sizeof(store_id), stored);
|
||||
if(stored){
|
||||
if(sub__topic_tokenise(msg->plugin_topic, &local_topic, &split_topics, NULL)) return MOSQ_ERR_NOMEM;
|
||||
if(sub__topic_tokenise(topic, &local_topic, &split_topics, NULL)) return MOSQ_ERR_NOMEM;
|
||||
|
||||
rc = retain__store(msg->plugin_topic, stored, split_topics, false);
|
||||
mosquitto__FREE(split_topics);
|
||||
mosquitto__FREE(local_topic);
|
||||
rc = retain__store(topic, stored, split_topics, false);
|
||||
mosquitto__free(split_topics);
|
||||
mosquitto__free(local_topic);
|
||||
}
|
||||
|
||||
return rc;
|
||||
|
||||
@@ -24,11 +24,18 @@ qos = 2
|
||||
connect_packet = mosq_test.gen_connect(client_id, proto_ver=proto_ver, clean_session=False)
|
||||
connack_packet1 = mosq_test.gen_connack(rc=0, proto_ver=proto_ver)
|
||||
connack_packet2 = mosq_test.gen_connack(rc=0, flags=1, proto_ver=proto_ver)
|
||||
|
||||
mid = 1
|
||||
publish_packet = mosq_test.gen_publish(topic=topic, qos=qos, payload="message", mid=mid, proto_ver=proto_ver)
|
||||
pubrec_packet = mosq_test.gen_pubrec(mid=mid, proto_ver=proto_ver)
|
||||
pubrel_packet = mosq_test.gen_pubrel(mid=mid, proto_ver=proto_ver)
|
||||
pubcomp_packet = mosq_test.gen_pubcomp(mid=mid, proto_ver=proto_ver)
|
||||
publish1_packet = mosq_test.gen_publish(topic=topic, qos=qos, payload="message1", mid=mid, proto_ver=proto_ver)
|
||||
pubrec1_packet = mosq_test.gen_pubrec(mid=mid, proto_ver=proto_ver)
|
||||
pubrel1_packet = mosq_test.gen_pubrel(mid=mid, proto_ver=proto_ver)
|
||||
pubcomp1_packet = mosq_test.gen_pubcomp(mid=mid, proto_ver=proto_ver)
|
||||
|
||||
mid = 2
|
||||
publish2_packet = mosq_test.gen_publish(topic=topic, qos=qos, payload="message2", mid=mid, proto_ver=proto_ver)
|
||||
pubrec2_packet = mosq_test.gen_pubrec(mid=mid, proto_ver=proto_ver)
|
||||
pubrel2_packet = mosq_test.gen_pubrel(mid=mid, proto_ver=proto_ver)
|
||||
pubcomp2_packet = mosq_test.gen_pubcomp(mid=mid, proto_ver=proto_ver)
|
||||
|
||||
connect_packet_helper = mosq_test.gen_connect(helper_id, proto_ver=proto_ver, clean_session=True)
|
||||
subscribe_packet = mosq_test.gen_subscribe(mid, topic, qos=qos, proto_ver=proto_ver)
|
||||
@@ -39,8 +46,9 @@ broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=Tr
|
||||
con = None
|
||||
try:
|
||||
# Connect client, start flow, disconnect
|
||||
sock = mosq_test.do_client_connect(connect_packet, connack_packet1, timeout=5, port=port)
|
||||
mosq_test.do_send_receive(sock, publish_packet, pubrec_packet, "pubrec send")
|
||||
sock = mosq_test.do_client_connect(connect_packet, connack_packet1, timeout=5, port=port, connack_error="connack 1")
|
||||
mosq_test.do_send_receive(sock, publish1_packet, pubrec1_packet, "pubrec1 send")
|
||||
mosq_test.do_send_receive(sock, publish2_packet, pubrec2_packet, "pubrec2 send")
|
||||
sock.close()
|
||||
|
||||
# Kill broker
|
||||
@@ -51,15 +59,21 @@ try:
|
||||
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
|
||||
|
||||
# Connect helper and subscribe
|
||||
helper = mosq_test.do_client_connect(connect_packet_helper, connack_packet1, timeout=5, port=port)
|
||||
helper = mosq_test.do_client_connect(connect_packet_helper, connack_packet1, timeout=5, port=port, connack_error="helper connack")
|
||||
mosq_test.do_send_receive(helper, subscribe_packet, suback_packet, "suback helper")
|
||||
|
||||
# Complete the flow
|
||||
sock = mosq_test.do_client_connect(connect_packet, connack_packet2, timeout=5, port=port)
|
||||
mosq_test.do_send_receive(sock, pubrel_packet, pubcomp_packet, "pubrel send")
|
||||
sock = mosq_test.do_client_connect(connect_packet, connack_packet2, timeout=5, port=port, connack_error="connack 2")
|
||||
mosq_test.do_send_receive(sock, pubrel1_packet, pubcomp1_packet, "pubrel1 send")
|
||||
mosq_test.do_send_receive(sock, pubrel2_packet, pubcomp2_packet, "pubrel2 send")
|
||||
|
||||
mosq_test.do_receive_send(helper, publish_packet, pubrec_packet, "pubrec receive")
|
||||
mosq_test.do_receive_send(helper, pubrel_packet, pubcomp_packet, "pubcomp receive")
|
||||
mosq_test.expect_packet(helper, "publish1 receive", publish1_packet)
|
||||
mosq_test.expect_packet(helper, "publish2 receive", publish2_packet)
|
||||
helper.send(pubrec1_packet)
|
||||
mosq_test.do_receive_send(helper, pubrel1_packet, pubcomp1_packet, "pubcomp1 receive")
|
||||
|
||||
helper.send(pubrec2_packet)
|
||||
mosq_test.do_receive_send(helper, pubrel2_packet, pubcomp2_packet, "pubcomp2 receive")
|
||||
|
||||
rc = 0
|
||||
finally:
|
||||
|
||||
92
test/broker/15-persist-client-msg-in-v5-0.py
Executable file
92
test/broker/15-persist-client-msg-in-v5-0.py
Executable file
@@ -0,0 +1,92 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# Connect a client, start a QoS 2 flow, disconnect, restore, carry on with the
|
||||
# QoS 2 flow. Is it received?
|
||||
|
||||
from mosq_test_helper import *
|
||||
import persist_help
|
||||
|
||||
port = mosq_test.get_port()
|
||||
conf_file = os.path.basename(__file__).replace('.py', '.conf')
|
||||
persist_help.write_config(conf_file, port)
|
||||
|
||||
rc = 1
|
||||
|
||||
persist_help.init(port)
|
||||
|
||||
client_id = "persist-client-msg-in-v5-0"
|
||||
proto_ver = 5
|
||||
|
||||
helper_id = "persist-client-msg-in-v5-0-helper"
|
||||
topic = "client-msg-in/2"
|
||||
qos = 2
|
||||
|
||||
connect_props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_SESSION_EXPIRY_INTERVAL, 60)
|
||||
connect_packet = mosq_test.gen_connect(client_id, proto_ver=proto_ver, clean_session=False, properties=connect_props)
|
||||
connack_packet1 = mosq_test.gen_connack(rc=0, proto_ver=proto_ver)
|
||||
connack_packet2 = mosq_test.gen_connack(rc=0, flags=1, proto_ver=proto_ver)
|
||||
|
||||
mid = 1
|
||||
publish1_packet = mosq_test.gen_publish(topic=topic, qos=qos, payload="message1", mid=mid, proto_ver=proto_ver)
|
||||
pubrec1_packet = mosq_test.gen_pubrec(mid=mid, proto_ver=proto_ver)
|
||||
pubrel1_packet = mosq_test.gen_pubrel(mid=mid, proto_ver=proto_ver)
|
||||
pubcomp1_packet = mosq_test.gen_pubcomp(mid=mid, proto_ver=proto_ver)
|
||||
|
||||
mid = 2
|
||||
publish2_packet = mosq_test.gen_publish(topic=topic, qos=qos, payload="message2", mid=mid, proto_ver=proto_ver)
|
||||
pubrec2_packet = mosq_test.gen_pubrec(mid=mid, proto_ver=proto_ver)
|
||||
pubrel2_packet = mosq_test.gen_pubrel(mid=mid, proto_ver=proto_ver)
|
||||
pubcomp2_packet = mosq_test.gen_pubcomp(mid=mid, proto_ver=proto_ver)
|
||||
|
||||
connect_packet_helper = mosq_test.gen_connect(helper_id, proto_ver=proto_ver, clean_session=True)
|
||||
subscribe_packet = mosq_test.gen_subscribe(mid, topic, qos=qos, proto_ver=proto_ver)
|
||||
suback_packet = mosq_test.gen_suback(mid=mid, qos=qos, proto_ver=proto_ver)
|
||||
|
||||
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
|
||||
|
||||
con = None
|
||||
try:
|
||||
# Connect client, start flow, disconnect
|
||||
sock = mosq_test.do_client_connect(connect_packet, connack_packet1, timeout=5, port=port, connack_error="connack 1")
|
||||
mosq_test.do_send_receive(sock, publish1_packet, pubrec1_packet, "pubrec1 send")
|
||||
mosq_test.do_send_receive(sock, publish2_packet, pubrec2_packet, "pubrec2 send")
|
||||
sock.close()
|
||||
|
||||
# Kill broker
|
||||
broker.terminate()
|
||||
broker.wait()
|
||||
|
||||
# Restart broker
|
||||
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
|
||||
|
||||
# Connect helper and subscribe
|
||||
helper = mosq_test.do_client_connect(connect_packet_helper, connack_packet1, timeout=5, port=port, connack_error="helper connack")
|
||||
mosq_test.do_send_receive(helper, subscribe_packet, suback_packet, "suback helper")
|
||||
|
||||
# Complete the flow
|
||||
sock = mosq_test.do_client_connect(connect_packet, connack_packet2, timeout=5, port=port, connack_error="connack 2")
|
||||
mosq_test.do_send_receive(sock, pubrel1_packet, pubcomp1_packet, "pubrel1 send")
|
||||
mosq_test.do_send_receive(sock, pubrel2_packet, pubcomp2_packet, "pubrel2 send")
|
||||
|
||||
mosq_test.expect_packet(helper, "publish1 receive", publish1_packet)
|
||||
mosq_test.expect_packet(helper, "publish2 receive", publish2_packet)
|
||||
helper.send(pubrec1_packet)
|
||||
mosq_test.do_receive_send(helper, pubrel1_packet, pubcomp1_packet, "pubcomp1 receive")
|
||||
|
||||
helper.send(pubrec2_packet)
|
||||
mosq_test.do_receive_send(helper, pubrel2_packet, pubcomp2_packet, "pubcomp2 receive")
|
||||
|
||||
rc = 0
|
||||
finally:
|
||||
if broker is not None:
|
||||
broker.terminate()
|
||||
broker.wait()
|
||||
(stdo, stde) = broker.communicate()
|
||||
os.remove(conf_file)
|
||||
rc += persist_help.cleanup(port)
|
||||
|
||||
if rc:
|
||||
print(stde.decode('utf-8'))
|
||||
|
||||
|
||||
exit(rc)
|
||||
102
test/broker/15-persist-client-msg-out-queue-v3-1-1.py
Executable file
102
test/broker/15-persist-client-msg-out-queue-v3-1-1.py
Executable file
@@ -0,0 +1,102 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# Connect a client, add a subscription, disconnect, send a message with a
|
||||
# different client, restore, reconnect, check it is received.
|
||||
|
||||
from mosq_test_helper import *
|
||||
import persist_help
|
||||
|
||||
port = mosq_test.get_port()
|
||||
conf_file = os.path.basename(__file__).replace('.py', '.conf')
|
||||
persist_help.write_config(conf_file, port)
|
||||
|
||||
rc = 1
|
||||
|
||||
persist_help.init(port)
|
||||
|
||||
keepalive = 10
|
||||
client_id = "persist-client-msg-v3-1-1"
|
||||
proto_ver = 4
|
||||
|
||||
helper_id = "persist-client-msg-v3-1-1-helper"
|
||||
topic0 = "client-msg/0"
|
||||
topic1 = "client-msg/1"
|
||||
topic2 = "client-msg/2"
|
||||
|
||||
connect_packet = mosq_test.gen_connect(client_id, keepalive=keepalive, proto_ver=proto_ver, clean_session=False)
|
||||
connack_packet1 = mosq_test.gen_connack(rc=0, proto_ver=proto_ver)
|
||||
connack_packet2 = mosq_test.gen_connack(rc=0, flags=1, proto_ver=proto_ver)
|
||||
mid = 1
|
||||
subscribe_packet0 = mosq_test.gen_subscribe(mid, topic0, qos=0, proto_ver=proto_ver)
|
||||
suback_packet0 = mosq_test.gen_suback(mid=mid, qos=0, proto_ver=proto_ver)
|
||||
subscribe_packet1 = mosq_test.gen_subscribe(mid, topic1, qos=1, proto_ver=proto_ver)
|
||||
suback_packet1 = mosq_test.gen_suback(mid=mid, qos=1, proto_ver=proto_ver)
|
||||
subscribe_packet2 = mosq_test.gen_subscribe(mid, topic2, qos=2, proto_ver=proto_ver)
|
||||
suback_packet2 = mosq_test.gen_suback(mid=mid, qos=2, proto_ver=proto_ver)
|
||||
|
||||
connect_packet_helper = mosq_test.gen_connect(helper_id, keepalive=keepalive, proto_ver=proto_ver, clean_session=True)
|
||||
publish_packet0 = mosq_test.gen_publish(topic=topic0, qos=0, payload="message", proto_ver=proto_ver)
|
||||
mid = 1
|
||||
publish_packet1 = mosq_test.gen_publish(topic=topic1, qos=1, payload="message", mid=mid, proto_ver=proto_ver)
|
||||
puback_packet = mosq_test.gen_puback(mid=mid, proto_ver=proto_ver)
|
||||
mid = 2
|
||||
publish_packet2 = mosq_test.gen_publish(topic=topic2, qos=2, payload="message", mid=mid, proto_ver=proto_ver)
|
||||
pubrec_packet = mosq_test.gen_pubrec(mid=mid, proto_ver=proto_ver)
|
||||
pubrel_packet = mosq_test.gen_pubrel(mid=mid, proto_ver=proto_ver)
|
||||
pubcomp_packet = mosq_test.gen_pubcomp(mid=mid, proto_ver=proto_ver)
|
||||
|
||||
|
||||
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
|
||||
|
||||
con = None
|
||||
try:
|
||||
# Connect client, subscribe, disconnect
|
||||
sock = mosq_test.do_client_connect(connect_packet, connack_packet1, timeout=5, port=port)
|
||||
mosq_test.do_send_receive(sock, subscribe_packet0, suback_packet0, "suback 0")
|
||||
mosq_test.do_send_receive(sock, subscribe_packet1, suback_packet1, "suback 1")
|
||||
mosq_test.do_send_receive(sock, subscribe_packet2, suback_packet2, "suback 2")
|
||||
sock.close()
|
||||
|
||||
# Connect helper and publish
|
||||
helper = mosq_test.do_client_connect(connect_packet_helper, connack_packet1, timeout=5, port=port)
|
||||
helper.send(publish_packet0)
|
||||
mosq_test.do_send_receive(helper, publish_packet1, puback_packet, "puback helper")
|
||||
mosq_test.do_send_receive(helper, publish_packet2, pubrec_packet, "pubrec helper")
|
||||
mosq_test.do_send_receive(helper, pubrel_packet, pubcomp_packet, "pubcomp helper")
|
||||
helper.close()
|
||||
|
||||
# Kill broker
|
||||
broker.terminate()
|
||||
broker.wait()
|
||||
|
||||
# Restart broker
|
||||
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
|
||||
|
||||
# Connect client again, it should have a session
|
||||
sock = mosq_test.do_client_connect(connect_packet, connack_packet2, timeout=5, port=port)
|
||||
|
||||
# Does the client get the messages
|
||||
mosq_test.do_receive_send(sock, publish_packet1, puback_packet, "publish 1")
|
||||
mosq_test.do_receive_send(sock, publish_packet2, pubrec_packet, "publish 2")
|
||||
mosq_test.do_receive_send(sock, pubrel_packet, pubcomp_packet, "pubrel 2")
|
||||
sock.close()
|
||||
|
||||
# Connect client again, it should have a session
|
||||
sock = mosq_test.do_client_connect(connect_packet, connack_packet2, timeout=5, port=port)
|
||||
# If there are messages, the ping will fail
|
||||
mosq_test.do_ping(sock)
|
||||
|
||||
rc = 0
|
||||
finally:
|
||||
if broker is not None:
|
||||
broker.terminate()
|
||||
broker.wait()
|
||||
(stdo, stde) = broker.communicate()
|
||||
os.remove(conf_file)
|
||||
rc += persist_help.cleanup(port)
|
||||
|
||||
if rc:
|
||||
print(stde.decode('utf-8'))
|
||||
|
||||
|
||||
exit(rc)
|
||||
102
test/broker/15-persist-client-msg-out-v5-0.py
Executable file
102
test/broker/15-persist-client-msg-out-v5-0.py
Executable file
@@ -0,0 +1,102 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# Connect a client, add a subscription, disconnect, send a message with a
|
||||
# different client, restore, reconnect, check it is received.
|
||||
|
||||
from mosq_test_helper import *
|
||||
import persist_help
|
||||
|
||||
port = mosq_test.get_port()
|
||||
conf_file = os.path.basename(__file__).replace('.py', '.conf')
|
||||
persist_help.write_config(conf_file, port)
|
||||
|
||||
rc = 1
|
||||
|
||||
persist_help.init(port)
|
||||
|
||||
client_id = "persist-client-msg-v5-0"
|
||||
proto_ver = 5
|
||||
|
||||
helper_id = "persist-client-msg-v5-0-helper"
|
||||
topic0 = "client-msg/0"
|
||||
topic1 = "client-msg/1"
|
||||
topic2 = "client-msg/2"
|
||||
|
||||
connect_props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_SESSION_EXPIRY_INTERVAL, 60)
|
||||
connect_packet = mosq_test.gen_connect(client_id, proto_ver=proto_ver, clean_session=False, properties=connect_props)
|
||||
connack_packet1 = mosq_test.gen_connack(rc=0, proto_ver=proto_ver)
|
||||
connack_packet2 = mosq_test.gen_connack(rc=0, flags=1, proto_ver=proto_ver)
|
||||
mid = 1
|
||||
subscribe_packet0 = mosq_test.gen_subscribe(mid, topic0, qos=0, proto_ver=proto_ver)
|
||||
suback_packet0 = mosq_test.gen_suback(mid=mid, qos=0, proto_ver=proto_ver)
|
||||
subscribe_packet1 = mosq_test.gen_subscribe(mid, topic1, qos=1, proto_ver=proto_ver)
|
||||
suback_packet1 = mosq_test.gen_suback(mid=mid, qos=1, proto_ver=proto_ver)
|
||||
subscribe_packet2 = mosq_test.gen_subscribe(mid, topic2, qos=2, proto_ver=proto_ver)
|
||||
suback_packet2 = mosq_test.gen_suback(mid=mid, qos=2, proto_ver=proto_ver)
|
||||
|
||||
connect_packet_helper = mosq_test.gen_connect(helper_id, proto_ver=proto_ver, clean_session=True)
|
||||
publish_packet0 = mosq_test.gen_publish(topic=topic0, qos=0, payload="message", proto_ver=proto_ver)
|
||||
mid = 1
|
||||
publish_packet1 = mosq_test.gen_publish(topic=topic1, qos=1, payload="message", mid=mid, proto_ver=proto_ver)
|
||||
puback_packet = mosq_test.gen_puback(mid=mid, proto_ver=proto_ver)
|
||||
mid = 2
|
||||
publish_packet2 = mosq_test.gen_publish(topic=topic2, qos=2, payload="message", mid=mid, proto_ver=proto_ver)
|
||||
pubrec_packet = mosq_test.gen_pubrec(mid=mid, proto_ver=proto_ver)
|
||||
pubrel_packet = mosq_test.gen_pubrel(mid=mid, proto_ver=proto_ver)
|
||||
pubcomp_packet = mosq_test.gen_pubcomp(mid=mid, proto_ver=proto_ver)
|
||||
|
||||
|
||||
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
|
||||
|
||||
con = None
|
||||
try:
|
||||
# Connect client, subscribe, disconnect
|
||||
sock = mosq_test.do_client_connect(connect_packet, connack_packet1, timeout=5, port=port, connack_error="connack 1")
|
||||
mosq_test.do_send_receive(sock, subscribe_packet0, suback_packet0, "suback 0")
|
||||
mosq_test.do_send_receive(sock, subscribe_packet1, suback_packet1, "suback 1")
|
||||
mosq_test.do_send_receive(sock, subscribe_packet2, suback_packet2, "suback 2")
|
||||
sock.close()
|
||||
|
||||
# Connect helper and publish
|
||||
helper = mosq_test.do_client_connect(connect_packet_helper, connack_packet1, timeout=5, port=port, connack_error="helper connack")
|
||||
helper.send(publish_packet0)
|
||||
mosq_test.do_send_receive(helper, publish_packet1, puback_packet, "puback helper")
|
||||
mosq_test.do_send_receive(helper, publish_packet2, pubrec_packet, "pubrec helper")
|
||||
mosq_test.do_send_receive(helper, pubrel_packet, pubcomp_packet, "pubcomp helper")
|
||||
helper.close()
|
||||
|
||||
# Kill broker
|
||||
broker.terminate()
|
||||
broker.wait()
|
||||
|
||||
# Restart broker
|
||||
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
|
||||
|
||||
# Connect client again, it should have a session
|
||||
sock = mosq_test.do_client_connect(connect_packet, connack_packet2, timeout=5, port=port, connack_error="connack 2")
|
||||
|
||||
# Does the client get the messages
|
||||
mosq_test.do_receive_send(sock, publish_packet1, puback_packet, "publish 1")
|
||||
mosq_test.do_receive_send(sock, publish_packet2, pubrec_packet, "publish 2")
|
||||
mosq_test.do_receive_send(sock, pubrel_packet, pubcomp_packet, "pubrel 2")
|
||||
sock.close()
|
||||
|
||||
# Connect client again, it should have a session
|
||||
sock = mosq_test.do_client_connect(connect_packet, connack_packet2, timeout=5, port=port, connack_error="connack 3")
|
||||
# If there are messages, the ping will fail
|
||||
mosq_test.do_ping(sock)
|
||||
|
||||
rc = 0
|
||||
finally:
|
||||
if broker is not None:
|
||||
broker.terminate()
|
||||
broker.wait()
|
||||
(stdo, stde) = broker.communicate()
|
||||
os.remove(conf_file)
|
||||
rc += persist_help.cleanup(port)
|
||||
|
||||
if rc:
|
||||
print(stde.decode('utf-8'))
|
||||
|
||||
|
||||
exit(rc)
|
||||
0
test/broker/15-persist-client-v3-1-1.conf
Normal file
0
test/broker/15-persist-client-v3-1-1.conf
Normal file
83
test/broker/15-persist-client-v5.0.py
Executable file
83
test/broker/15-persist-client-v5.0.py
Executable file
@@ -0,0 +1,83 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# Connect a client, check it is restored, clear the client, check it is not there.
|
||||
|
||||
from mosq_test_helper import *
|
||||
import persist_help
|
||||
|
||||
port = mosq_test.get_port()
|
||||
conf_file = os.path.basename(__file__).replace('.py', '.conf')
|
||||
persist_help.write_config(conf_file, port)
|
||||
|
||||
rc = 1
|
||||
|
||||
persist_help.init(port)
|
||||
|
||||
keepalive = 10
|
||||
client_id = "persist-client-v5-0"
|
||||
proto_ver = 5
|
||||
|
||||
connect_props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_SESSION_EXPIRY_INTERVAL, 60)
|
||||
connect_packet = mosq_test.gen_connect(client_id, keepalive=keepalive, proto_ver=proto_ver, clean_session=False, properties=connect_props)
|
||||
connack_packet1 = mosq_test.gen_connack(rc=0, proto_ver=proto_ver)
|
||||
connack_packet2 = mosq_test.gen_connack(rc=0, flags=1, proto_ver=proto_ver)
|
||||
|
||||
connect_packet_clean = mosq_test.gen_connect(client_id, keepalive=keepalive, proto_ver=proto_ver, clean_session=True)
|
||||
|
||||
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
|
||||
|
||||
con = None
|
||||
try:
|
||||
# Connect client
|
||||
sock = mosq_test.do_client_connect(connect_packet, connack_packet1, timeout=5, port=port, connack_error="connack 1")
|
||||
mosq_test.do_ping(sock)
|
||||
sock.close()
|
||||
|
||||
# Kill broker
|
||||
broker.terminate()
|
||||
broker.wait()
|
||||
|
||||
# Restart broker
|
||||
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
|
||||
|
||||
# Connect client again, it should have a session
|
||||
sock = mosq_test.do_client_connect(connect_packet, connack_packet2, timeout=5, port=port, connack_error="connack 2")
|
||||
mosq_test.do_ping(sock)
|
||||
sock.close()
|
||||
|
||||
# Clear the client
|
||||
sock = mosq_test.do_client_connect(connect_packet_clean, connack_packet1, timeout=5, port=port, connack_error="connack 3")
|
||||
mosq_test.do_ping(sock)
|
||||
sock.close()
|
||||
|
||||
# Connect client, it should not have a session
|
||||
sock = mosq_test.do_client_connect(connect_packet_clean, connack_packet1, timeout=5, port=port, connack_error="connack 4")
|
||||
mosq_test.do_ping(sock)
|
||||
sock.close()
|
||||
|
||||
# Kill broker
|
||||
broker.terminate()
|
||||
broker.wait()
|
||||
|
||||
# Restart broker
|
||||
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
|
||||
|
||||
# Connect client, it should not have a session
|
||||
sock = mosq_test.do_client_connect(connect_packet_clean, connack_packet1, timeout=5, port=port, connack_error="connack 5")
|
||||
mosq_test.do_ping(sock)
|
||||
sock.close()
|
||||
|
||||
rc = 0
|
||||
finally:
|
||||
if broker is not None:
|
||||
broker.terminate()
|
||||
broker.wait()
|
||||
(stdo, stde) = broker.communicate()
|
||||
os.remove(conf_file)
|
||||
rc += persist_help.cleanup(port)
|
||||
|
||||
if rc:
|
||||
print(stde.decode('utf-8'))
|
||||
|
||||
|
||||
exit(rc)
|
||||
86
test/broker/15-persist-publish-properties-v5-0.py
Executable file
86
test/broker/15-persist-publish-properties-v5-0.py
Executable file
@@ -0,0 +1,86 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# Publish a retained messages, check they are restored, with properties attached
|
||||
|
||||
from mosq_test_helper import *
|
||||
import persist_help
|
||||
|
||||
port = mosq_test.get_port()
|
||||
conf_file = os.path.basename(__file__).replace('.py', '.conf')
|
||||
persist_help.write_config(conf_file, port)
|
||||
|
||||
rc = 1
|
||||
|
||||
persist_help.init(port)
|
||||
|
||||
topic = "test/retainprop"
|
||||
source_id = "persist-retain-properties-v5-0"
|
||||
qos = 0
|
||||
proto_ver = 5
|
||||
connect_packet = mosq_test.gen_connect(source_id, proto_ver=proto_ver, clean_session=True)
|
||||
connack_packet = mosq_test.gen_connack(rc=0, proto_ver=proto_ver)
|
||||
|
||||
props = mqtt5_props.gen_byte_prop(mqtt5_props.PROP_PAYLOAD_FORMAT_INDICATOR, 1)
|
||||
props += mqtt5_props.gen_string_prop(mqtt5_props.PROP_CONTENT_TYPE, "plain/text")
|
||||
props += mqtt5_props.gen_string_prop(mqtt5_props.PROP_RESPONSE_TOPIC, "/dev/null")
|
||||
#props += mqtt5_props.gen_string_prop(mqtt5_props.PROP_CORRELATION_DATA, "2357289375902345")
|
||||
props += mqtt5_props.gen_string_pair_prop(mqtt5_props.PROP_USER_PROPERTY, "name", "value")
|
||||
props += mqtt5_props.gen_string_pair_prop(mqtt5_props.PROP_USER_PROPERTY, "name", "value")
|
||||
props += mqtt5_props.gen_string_pair_prop(mqtt5_props.PROP_USER_PROPERTY, "name", "value")
|
||||
props += mqtt5_props.gen_string_pair_prop(mqtt5_props.PROP_USER_PROPERTY, "name", "value")
|
||||
publish_packet = mosq_test.gen_publish(topic, qos=qos, payload="retained message 1", retain=True, proto_ver=proto_ver, properties=props)
|
||||
|
||||
mid = 1
|
||||
subscribe_packet = mosq_test.gen_subscribe(mid, "test/retainprop", 0, proto_ver=proto_ver)
|
||||
suback_packet = mosq_test.gen_suback(mid, qos=0, proto_ver=proto_ver)
|
||||
|
||||
mid = 2
|
||||
unsubscribe_packet = mosq_test.gen_unsubscribe(mid, "test/retainprop", proto_ver=proto_ver)
|
||||
unsuback_packet = mosq_test.gen_unsuback(mid, proto_ver=proto_ver)
|
||||
|
||||
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
|
||||
|
||||
try:
|
||||
# Connect client
|
||||
sock = mosq_test.do_client_connect(connect_packet, connack_packet, timeout=5, port=port)
|
||||
# Check no retained messages exist
|
||||
mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback")
|
||||
# Ping will fail if a PUBLISH is received
|
||||
mosq_test.do_ping(sock)
|
||||
# Unsubscribe, so we don't receive the messages
|
||||
mosq_test.do_send_receive(sock, unsubscribe_packet, unsuback_packet, "unsuback")
|
||||
|
||||
# Send some retained messages
|
||||
sock.send(publish_packet)
|
||||
mosq_test.do_ping(sock)
|
||||
sock.close()
|
||||
|
||||
# Kill broker
|
||||
broker.terminate()
|
||||
broker.wait()
|
||||
|
||||
# Restart broker
|
||||
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
|
||||
|
||||
# Connect client
|
||||
sock = mosq_test.do_client_connect(connect_packet, connack_packet, timeout=5, port=port)
|
||||
# Subscribe
|
||||
mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback")
|
||||
# Check retained messages exist
|
||||
mosq_test.expect_packet(sock, "publish", publish_packet)
|
||||
mosq_test.do_ping(sock)
|
||||
|
||||
rc = 0
|
||||
finally:
|
||||
if broker is not None:
|
||||
broker.terminate()
|
||||
broker.wait()
|
||||
(stdo, stde) = broker.communicate()
|
||||
os.remove(conf_file)
|
||||
rc += persist_help.cleanup(port)
|
||||
|
||||
if rc:
|
||||
print(stde.decode('utf-8'))
|
||||
|
||||
|
||||
exit(rc)
|
||||
88
test/broker/15-persist-retain-v5-0.py
Executable file
88
test/broker/15-persist-retain-v5-0.py
Executable file
@@ -0,0 +1,88 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# Publish a retained messages, check they are restored
|
||||
|
||||
from mosq_test_helper import *
|
||||
import persist_help
|
||||
|
||||
port = mosq_test.get_port()
|
||||
conf_file = os.path.basename(__file__).replace('.py', '.conf')
|
||||
persist_help.write_config(conf_file, port)
|
||||
|
||||
rc = 1
|
||||
|
||||
persist_help.init(port)
|
||||
|
||||
topic1 = "test/retain1"
|
||||
topic2 = "test/retain2"
|
||||
topic3 = "test/retain3"
|
||||
source_id = "persist-retain-v5-0"
|
||||
qos = 0
|
||||
payload2 = "retained message 2"
|
||||
payload3 = "retained message 3"
|
||||
proto_ver = 5
|
||||
connect_packet = mosq_test.gen_connect(source_id, proto_ver=proto_ver, clean_session=True)
|
||||
connack_packet = mosq_test.gen_connack(rc=0, proto_ver=proto_ver)
|
||||
|
||||
publish1_packet = mosq_test.gen_publish(topic1, qos=qos, payload="retained message 1", retain=True, proto_ver=proto_ver)
|
||||
publish2_packet = mosq_test.gen_publish(topic2, qos=qos, payload=payload2, retain=False, proto_ver=proto_ver)
|
||||
publish3_packet = mosq_test.gen_publish(topic3, qos=qos, payload=payload3, retain=True, proto_ver=proto_ver)
|
||||
|
||||
mid = 1
|
||||
subscribe_packet = mosq_test.gen_subscribe(mid, "#", 0, proto_ver=proto_ver)
|
||||
suback_packet = mosq_test.gen_suback(mid, qos=0, proto_ver=proto_ver)
|
||||
|
||||
mid = 2
|
||||
unsubscribe_packet = mosq_test.gen_unsubscribe(mid, "#", proto_ver=proto_ver)
|
||||
unsuback_packet = mosq_test.gen_unsuback(mid, proto_ver=proto_ver)
|
||||
|
||||
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
|
||||
|
||||
try:
|
||||
# Connect client
|
||||
sock = mosq_test.do_client_connect(connect_packet, connack_packet, timeout=5, port=port)
|
||||
# Check no retained messages exist
|
||||
mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback")
|
||||
# Ping will fail if a PUBLISH is received
|
||||
mosq_test.do_ping(sock)
|
||||
# Unsubscribe, so we don't receive the messages
|
||||
mosq_test.do_send_receive(sock, unsubscribe_packet, unsuback_packet, "unsuback")
|
||||
|
||||
# Send some retained messages
|
||||
sock.send(publish1_packet)
|
||||
mosq_test.do_ping(sock)
|
||||
sock.send(publish2_packet) # Not retained
|
||||
mosq_test.do_ping(sock)
|
||||
sock.send(publish3_packet)
|
||||
mosq_test.do_ping(sock)
|
||||
sock.close()
|
||||
|
||||
# Kill broker
|
||||
broker.terminate()
|
||||
broker.wait()
|
||||
|
||||
# Restart broker
|
||||
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
|
||||
|
||||
# Connect client
|
||||
sock = mosq_test.do_client_connect(connect_packet, connack_packet, timeout=5, port=port)
|
||||
# Subscribe
|
||||
mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback")
|
||||
# Check retained messages exist
|
||||
mosq_test.receive_unordered(sock, publish1_packet, publish3_packet, "publish 1 / 3")
|
||||
mosq_test.do_ping(sock)
|
||||
|
||||
rc = 0
|
||||
finally:
|
||||
if broker is not None:
|
||||
broker.terminate()
|
||||
broker.wait()
|
||||
(stdo, stde) = broker.communicate()
|
||||
os.remove(conf_file)
|
||||
rc += persist_help.cleanup(port)
|
||||
|
||||
if rc:
|
||||
print(stde.decode('utf-8'))
|
||||
|
||||
|
||||
exit(rc)
|
||||
@@ -6,6 +6,18 @@
|
||||
from mosq_test_helper import *
|
||||
import persist_help
|
||||
|
||||
def helper(port, packets):
|
||||
helper_id = "persist-subscription-v3-1-1-helper"
|
||||
connect_packet_helper = mosq_test.gen_connect(helper_id, proto_ver=4, clean_session=True)
|
||||
|
||||
# Connect helper and publish
|
||||
helper = mosq_test.do_client_connect(connect_packet_helper, packets["connack1"], timeout=5, port=port)
|
||||
helper.send(packets["publish0"])
|
||||
mosq_test.do_send_receive(helper, packets["publish1"], packets["puback1"], "puback helper")
|
||||
mosq_test.do_send_receive(helper, packets["publish2"], packets["pubrec2"], "pubrec helper")
|
||||
mosq_test.do_send_receive(helper, packets["pubrel2"], packets["pubcomp2"], "pubcomp helper")
|
||||
helper.close()
|
||||
|
||||
port = mosq_test.get_port()
|
||||
conf_file = os.path.basename(__file__).replace('.py', '.conf')
|
||||
persist_help.write_config(conf_file, port)
|
||||
@@ -17,35 +29,34 @@ persist_help.init(port)
|
||||
client_id = "persist-subscription-v3-1-1"
|
||||
proto_ver = 4
|
||||
|
||||
helper_id = "persist-subscription-v3-1-1-helper"
|
||||
topic0 = "subscription/0"
|
||||
topic1 = "subscription/1"
|
||||
topic2 = "subscription/2"
|
||||
|
||||
connect_packet = mosq_test.gen_connect(client_id, proto_ver=proto_ver, clean_session=False)
|
||||
connack_packet1 = mosq_test.gen_connack(rc=0, proto_ver=proto_ver)
|
||||
connack_packet2 = mosq_test.gen_connack(rc=0, flags=1, proto_ver=proto_ver)
|
||||
packets = {}
|
||||
packets["connect"] = mosq_test.gen_connect(client_id, proto_ver=proto_ver, clean_session=False)
|
||||
packets["connack1"] = mosq_test.gen_connack(rc=0, proto_ver=proto_ver)
|
||||
packets["connack2"] = mosq_test.gen_connack(rc=0, flags=1, proto_ver=proto_ver)
|
||||
mid = 1
|
||||
subscribe_packet0 = mosq_test.gen_subscribe(mid, topic0, qos=0, proto_ver=proto_ver)
|
||||
suback_packet0 = mosq_test.gen_suback(mid=mid, qos=0, proto_ver=proto_ver)
|
||||
subscribe_packet1 = mosq_test.gen_subscribe(mid, topic1, qos=1, proto_ver=proto_ver)
|
||||
suback_packet1 = mosq_test.gen_suback(mid=mid, qos=1, proto_ver=proto_ver)
|
||||
subscribe_packet2 = mosq_test.gen_subscribe(mid, topic2, qos=2, proto_ver=proto_ver)
|
||||
suback_packet2 = mosq_test.gen_suback(mid=mid, qos=2, proto_ver=proto_ver)
|
||||
packets["subscribe0"] = mosq_test.gen_subscribe(mid, topic0, qos=0, proto_ver=proto_ver)
|
||||
packets["suback0"] = mosq_test.gen_suback(mid=mid, qos=0, proto_ver=proto_ver)
|
||||
packets["subscribe1"] = mosq_test.gen_subscribe(mid, topic1, qos=1, proto_ver=proto_ver)
|
||||
packets["suback1"] = mosq_test.gen_suback(mid=mid, qos=1, proto_ver=proto_ver)
|
||||
packets["subscribe2"] = mosq_test.gen_subscribe(mid, topic2, qos=2, proto_ver=proto_ver)
|
||||
packets["suback2"] = mosq_test.gen_suback(mid=mid, qos=2, proto_ver=proto_ver)
|
||||
|
||||
unsubscribe_packet2 = mosq_test.gen_unsubscribe(mid, topic2, proto_ver=proto_ver)
|
||||
unsuback_packet2 = mosq_test.gen_unsuback(mid=mid, proto_ver=proto_ver)
|
||||
packets["unsubscribe2"] = mosq_test.gen_unsubscribe(mid, topic2, proto_ver=proto_ver)
|
||||
packets["unsuback2"] = mosq_test.gen_unsuback(mid=mid, proto_ver=proto_ver)
|
||||
|
||||
connect_packet_helper = mosq_test.gen_connect(helper_id, proto_ver=proto_ver, clean_session=True)
|
||||
publish_packet0 = mosq_test.gen_publish(topic=topic0, qos=0, payload="message", proto_ver=proto_ver)
|
||||
packets["publish0"] = mosq_test.gen_publish(topic=topic0, qos=0, payload="message", proto_ver=proto_ver)
|
||||
mid = 1
|
||||
publish_packet1 = mosq_test.gen_publish(topic=topic1, qos=1, payload="message", mid=mid, proto_ver=proto_ver)
|
||||
puback_packet = mosq_test.gen_puback(mid=mid, proto_ver=proto_ver)
|
||||
packets["publish1"] = mosq_test.gen_publish(topic=topic1, qos=1, payload="message", mid=mid, proto_ver=proto_ver)
|
||||
packets["puback1"] = mosq_test.gen_puback(mid=mid, proto_ver=proto_ver)
|
||||
mid = 2
|
||||
publish_packet2 = mosq_test.gen_publish(topic=topic2, qos=2, payload="message", mid=mid, proto_ver=proto_ver)
|
||||
pubrec_packet = mosq_test.gen_pubrec(mid=mid, proto_ver=proto_ver)
|
||||
pubrel_packet = mosq_test.gen_pubrel(mid=mid, proto_ver=proto_ver)
|
||||
pubcomp_packet = mosq_test.gen_pubcomp(mid=mid, proto_ver=proto_ver)
|
||||
packets["publish2"] = mosq_test.gen_publish(topic=topic2, qos=2, payload="message", mid=mid, proto_ver=proto_ver)
|
||||
packets["pubrec2"] = mosq_test.gen_pubrec(mid=mid, proto_ver=proto_ver)
|
||||
packets["pubrel2"] = mosq_test.gen_pubrel(mid=mid, proto_ver=proto_ver)
|
||||
packets["pubcomp2"] = mosq_test.gen_pubcomp(mid=mid, proto_ver=proto_ver)
|
||||
|
||||
|
||||
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
|
||||
@@ -53,10 +64,10 @@ broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=Tr
|
||||
con = None
|
||||
try:
|
||||
# Connect client
|
||||
sock = mosq_test.do_client_connect(connect_packet, connack_packet1, timeout=5, port=port)
|
||||
mosq_test.do_send_receive(sock, subscribe_packet0, suback_packet0, "suback 0")
|
||||
mosq_test.do_send_receive(sock, subscribe_packet1, suback_packet1, "suback 1")
|
||||
mosq_test.do_send_receive(sock, subscribe_packet2, suback_packet2, "suback 2")
|
||||
sock = mosq_test.do_client_connect(packets["connect"], packets["connack1"], timeout=5, port=port)
|
||||
mosq_test.do_send_receive(sock, packets["subscribe0"], packets["suback0"], "suback 0")
|
||||
mosq_test.do_send_receive(sock, packets["subscribe1"], packets["suback1"], "suback 1")
|
||||
mosq_test.do_send_receive(sock, packets["subscribe2"], packets["suback2"], "suback 2")
|
||||
sock.close()
|
||||
|
||||
# Kill broker
|
||||
@@ -67,25 +78,19 @@ try:
|
||||
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
|
||||
|
||||
# Connect client again, it should have a session
|
||||
sock = mosq_test.do_client_connect(connect_packet, connack_packet2, timeout=5, port=port)
|
||||
sock = mosq_test.do_client_connect(packets["connect"], packets["connack2"], timeout=5, port=port)
|
||||
mosq_test.do_ping(sock)
|
||||
|
||||
# Connect helper and publish
|
||||
helper = mosq_test.do_client_connect(connect_packet_helper, connack_packet1, timeout=5, port=port)
|
||||
helper.send(publish_packet0)
|
||||
mosq_test.do_send_receive(helper, publish_packet1, puback_packet, "puback helper")
|
||||
mosq_test.do_send_receive(helper, publish_packet2, pubrec_packet, "pubrec helper")
|
||||
mosq_test.do_send_receive(helper, pubrel_packet, pubcomp_packet, "pubcomp helper")
|
||||
helper.close()
|
||||
helper(port, packets)
|
||||
|
||||
# Does the client get the messages
|
||||
mosq_test.expect_packet(sock, "publish 0", publish_packet0)
|
||||
mosq_test.do_receive_send(sock, publish_packet1, puback_packet, "publish 1")
|
||||
mosq_test.do_receive_send(sock, publish_packet2, pubrec_packet, "publish 2")
|
||||
mosq_test.do_receive_send(sock, pubrel_packet, pubcomp_packet, "pubrel 2")
|
||||
mosq_test.expect_packet(sock, "publish 0", packets["publish0"])
|
||||
mosq_test.do_receive_send(sock, packets["publish1"], packets["puback1"], "publish 1")
|
||||
mosq_test.do_receive_send(sock, packets["publish2"], packets["pubrec2"], "publish 2")
|
||||
mosq_test.do_receive_send(sock, packets["pubrel2"], packets["pubcomp2"], "pubrel 2")
|
||||
|
||||
# Unsubscribe
|
||||
mosq_test.do_send_receive(sock, unsubscribe_packet2, unsuback_packet2, "unsuback 2")
|
||||
mosq_test.do_send_receive(sock, packets["unsubscribe2"], packets["unsuback2"], "unsuback 2")
|
||||
sock.close()
|
||||
|
||||
# Kill broker
|
||||
@@ -96,20 +101,15 @@ try:
|
||||
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
|
||||
|
||||
# Connect client again, it should have a session
|
||||
sock = mosq_test.do_client_connect(connect_packet, connack_packet2, timeout=5, port=port)
|
||||
sock = mosq_test.do_client_connect(packets["connect"], packets["connack2"], timeout=5, port=port)
|
||||
mosq_test.do_ping(sock)
|
||||
|
||||
# Connect helper and publish
|
||||
helper = mosq_test.do_client_connect(connect_packet_helper, connack_packet1, timeout=5, port=port)
|
||||
helper.send(publish_packet0)
|
||||
mosq_test.do_send_receive(helper, publish_packet1, puback_packet, "puback helper")
|
||||
mosq_test.do_send_receive(helper, publish_packet2, pubrec_packet, "pubrec helper")
|
||||
mosq_test.do_send_receive(helper, pubrel_packet, pubcomp_packet, "pubcomp helper")
|
||||
helper.close()
|
||||
helper(port, packets)
|
||||
|
||||
# Does the client get the messages
|
||||
mosq_test.expect_packet(sock, "publish 0", publish_packet0)
|
||||
mosq_test.do_receive_send(sock, publish_packet1, puback_packet, "publish 1")
|
||||
mosq_test.expect_packet(sock, "publish 0", packets["publish0"])
|
||||
mosq_test.do_receive_send(sock, packets["publish1"], packets["puback1"], "publish 1")
|
||||
mosq_test.do_ping(sock)
|
||||
|
||||
rc = 0
|
||||
|
||||
139
test/broker/15-persist-subscription-v5-0.py
Executable file
139
test/broker/15-persist-subscription-v5-0.py
Executable file
@@ -0,0 +1,139 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# Connect a client, add a subscription, disconnect, restore, reconnect, send a
|
||||
# message with a different client, check it is received.
|
||||
|
||||
from mosq_test_helper import *
|
||||
import persist_help
|
||||
|
||||
def helper(port, packets):
|
||||
helper_id = "persist-subscription-v5-0-helper"
|
||||
connect_packet_helper = mosq_test.gen_connect(helper_id, proto_ver=5, clean_session=True)
|
||||
connack_packet_helper = mosq_test.gen_connack(rc=0, proto_ver=5)
|
||||
|
||||
# Connect helper and publish
|
||||
helper = mosq_test.do_client_connect(connect_packet_helper, connack_packet_helper, timeout=5, port=port, connack_error="helper connack")
|
||||
helper.send(packets["publish0-helper"])
|
||||
mosq_test.do_send_receive(helper, packets["publish1-helper"], packets["puback1"], "puback helper")
|
||||
mosq_test.do_send_receive(helper, packets["publish2-helper"], packets["pubrec2"], "pubrec helper")
|
||||
mosq_test.do_send_receive(helper, packets["pubrel2"], packets["pubcomp2"], "pubcomp helper")
|
||||
helper.close()
|
||||
|
||||
port = mosq_test.get_port()
|
||||
conf_file = os.path.basename(__file__).replace('.py', '.conf')
|
||||
persist_help.write_config(conf_file, port)
|
||||
|
||||
rc = 1
|
||||
|
||||
persist_help.init(port)
|
||||
|
||||
client_id = "persist-subscription-v5-0"
|
||||
proto_ver = 5
|
||||
|
||||
topic0 = "subscription/0"
|
||||
topic1 = "subscription/1"
|
||||
topic2 = "subscription/2"
|
||||
|
||||
packets = {}
|
||||
connect_props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_SESSION_EXPIRY_INTERVAL, 60)
|
||||
packets["connect"] = mosq_test.gen_connect(client_id, proto_ver=proto_ver, clean_session=False, properties=connect_props)
|
||||
packets["connack1"] = mosq_test.gen_connack(rc=0, proto_ver=proto_ver)
|
||||
packets["connack2"] = mosq_test.gen_connack(rc=0, flags=1, proto_ver=proto_ver)
|
||||
mid = 1
|
||||
|
||||
publish_props0 = mqtt5_props.gen_varint_prop(mqtt5_props.PROP_SUBSCRIPTION_IDENTIFIER, 100)
|
||||
packets["subscribe0"] = mosq_test.gen_subscribe(mid, topic0, qos=0, proto_ver=proto_ver, properties=publish_props0)
|
||||
packets["suback0"] = mosq_test.gen_suback(mid=mid, qos=0, proto_ver=proto_ver)
|
||||
|
||||
publish_props1 = mqtt5_props.gen_varint_prop(mqtt5_props.PROP_SUBSCRIPTION_IDENTIFIER, 101)
|
||||
packets["subscribe1"] = mosq_test.gen_subscribe(mid, topic1, qos=1, proto_ver=proto_ver, properties=publish_props1)
|
||||
packets["suback1"] = mosq_test.gen_suback(mid=mid, qos=1, proto_ver=proto_ver)
|
||||
|
||||
publish_props2 = mqtt5_props.gen_varint_prop(mqtt5_props.PROP_SUBSCRIPTION_IDENTIFIER, 102)
|
||||
packets["subscribe2"] = mosq_test.gen_subscribe(mid, topic2, qos=2, proto_ver=proto_ver, properties=publish_props2)
|
||||
packets["suback2"] = mosq_test.gen_suback(mid=mid, qos=2, proto_ver=proto_ver)
|
||||
|
||||
packets["unsubscribe2"] = mosq_test.gen_unsubscribe(mid, topic2, proto_ver=proto_ver)
|
||||
packets["unsuback2"] = mosq_test.gen_unsuback(mid=mid, proto_ver=proto_ver)
|
||||
|
||||
packets["publish0-helper"] = mosq_test.gen_publish(topic=topic0, qos=0, payload="message", proto_ver=proto_ver)
|
||||
packets["publish0"] = mosq_test.gen_publish(topic=topic0, qos=0, payload="message", proto_ver=proto_ver, properties=publish_props0)
|
||||
mid = 1
|
||||
packets["publish1-helper"] = mosq_test.gen_publish(topic=topic1, qos=1, payload="message", mid=mid, proto_ver=proto_ver)
|
||||
packets["publish1"] = mosq_test.gen_publish(topic=topic1, qos=1, payload="message", mid=mid, proto_ver=proto_ver, properties=publish_props1)
|
||||
packets["puback1"] = mosq_test.gen_puback(mid=mid, proto_ver=proto_ver)
|
||||
mid = 2
|
||||
packets["publish2-helper"] = mosq_test.gen_publish(topic=topic2, qos=2, payload="message", mid=mid, proto_ver=proto_ver)
|
||||
packets["publish2"] = mosq_test.gen_publish(topic=topic2, qos=2, payload="message", mid=mid, proto_ver=proto_ver, properties=publish_props2)
|
||||
packets["pubrec2"] = mosq_test.gen_pubrec(mid=mid, proto_ver=proto_ver)
|
||||
packets["pubrel2"] = mosq_test.gen_pubrel(mid=mid, proto_ver=proto_ver)
|
||||
packets["pubcomp2"] = mosq_test.gen_pubcomp(mid=mid, proto_ver=proto_ver)
|
||||
|
||||
|
||||
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
|
||||
|
||||
con = None
|
||||
try:
|
||||
# Connect client
|
||||
sock = mosq_test.do_client_connect(packets["connect"], packets["connack1"], timeout=5, port=port, connack_error="connack1")
|
||||
mosq_test.do_send_receive(sock, packets["subscribe0"], packets["suback0"], "suback 0")
|
||||
mosq_test.do_send_receive(sock, packets["subscribe1"], packets["suback1"], "suback 1")
|
||||
mosq_test.do_send_receive(sock, packets["subscribe2"], packets["suback2"], "suback 2")
|
||||
sock.close()
|
||||
|
||||
# Kill broker
|
||||
broker.terminate()
|
||||
broker.wait()
|
||||
|
||||
# Restart broker
|
||||
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
|
||||
|
||||
# Connect client again, it should have a session
|
||||
sock = mosq_test.do_client_connect(packets["connect"], packets["connack2"], timeout=5, port=port)
|
||||
mosq_test.do_ping(sock)
|
||||
|
||||
helper(port, packets)
|
||||
|
||||
# Does the client get the messages
|
||||
mosq_test.expect_packet(sock, "publish 0", packets["publish0"])
|
||||
mosq_test.do_receive_send(sock, packets["publish1"], packets["puback1"], "publish 1")
|
||||
mosq_test.do_receive_send(sock, packets["publish2"], packets["pubrec2"], "publish 2")
|
||||
mosq_test.do_receive_send(sock, packets["pubrel2"], packets["pubcomp2"], "pubrel 2")
|
||||
|
||||
# Unsubscribe
|
||||
mosq_test.do_send_receive(sock, packets["unsubscribe2"], packets["unsuback2"], "unsuback 2")
|
||||
sock.close()
|
||||
|
||||
# Kill broker
|
||||
broker.terminate()
|
||||
broker.wait()
|
||||
|
||||
# Restart broker
|
||||
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
|
||||
|
||||
# Connect client again, it should have a session
|
||||
sock = mosq_test.do_client_connect(packets["connect"], packets["connack2"], timeout=5, port=port, connack_error="connack2")
|
||||
mosq_test.do_ping(sock)
|
||||
|
||||
# Connect helper and publish
|
||||
helper(port, packets)
|
||||
|
||||
# Does the client get the messages
|
||||
mosq_test.expect_packet(sock, "publish 0", packets["publish0"])
|
||||
mosq_test.do_receive_send(sock, packets["publish1"], packets["puback1"], "publish 1")
|
||||
mosq_test.do_ping(sock)
|
||||
|
||||
rc = 0
|
||||
finally:
|
||||
if broker is not None:
|
||||
broker.terminate()
|
||||
broker.wait()
|
||||
(stdo, stde) = broker.communicate()
|
||||
os.remove(conf_file)
|
||||
rc += persist_help.cleanup(port)
|
||||
|
||||
if rc:
|
||||
print(stde.decode('utf-8'))
|
||||
|
||||
|
||||
exit(rc)
|
||||
@@ -253,11 +253,17 @@ endif
|
||||
endif
|
||||
|
||||
15 :
|
||||
./15-persist-client-v3-1-1.py
|
||||
./15-persist-subscription-v3-1-1.py
|
||||
./15-persist-retain-v3-1-1.py
|
||||
./15-persist-client-msg-in-v3-1-1.py
|
||||
./15-persist-client-msg-out-v3-1-1.py
|
||||
#./15-persist-client-msg-in-v5-0.py
|
||||
#./15-persist-client-msg-out-queue-v3-1-1.py
|
||||
#./15-persist-client-msg-out-v3-1-1.py
|
||||
#./15-persist-client-msg-out-v5-0.py
|
||||
#./15-persist-client-v3-1-1.py
|
||||
#./15-persist-client-v5.0.py
|
||||
#./15-persist-publish-properties-v5-0.py
|
||||
#./15-persist-retain-v3-1-1.py
|
||||
#./15-persist-retain-v5-0.py
|
||||
#./15-persist-subscription-v3-1-1.py
|
||||
#./15-persist-subscription-v5-0.py
|
||||
|
||||
16 :
|
||||
./16-cmd-args.py
|
||||
|
||||
@@ -213,11 +213,17 @@ tests = [
|
||||
(1, './14-dynsec-role.py'),
|
||||
(1, './14-dynsec-role-invalid.py'),
|
||||
|
||||
#(1, './15-persist-client-msg-in-v3-1-1.py'),
|
||||
#(1, './15-persist-client-msg-in-v5-0.py'),
|
||||
#(1, './15-persist-client-msg-out-queue-v3-1-1.py'),
|
||||
#(1, './15-persist-client-msg-out-v3-1-1.py'),
|
||||
#(1, './15-persist-client-msg-out-v5-0.py'),
|
||||
#(1, './15-persist-client-v3-1-1.py'),
|
||||
#(1, './15-persist-client-v5.0.py'),
|
||||
#(1, './15-persist-publish-properties-v5-0.py'),
|
||||
#(1, './15-persist-retain-v3-1-1.py'),
|
||||
#(1, './15-persist-retain-v5-0.py'),
|
||||
#(1, './15-persist-subscription-v3-1-1.py'),
|
||||
#(1, './15-persist-subscription-v5-0.py'),
|
||||
|
||||
(1, './16-cmd-args.py'),
|
||||
(1, './16-config-includedir.py'),
|
||||
|
||||
Reference in New Issue
Block a user