From 1a8c44b84f23ea819eca72df0ca6ac869cacb461 Mon Sep 17 00:00:00 2001 From: "Roger A. Light" Date: Tue, 24 Sep 2019 11:54:05 +0100 Subject: [PATCH] Fix regression on use of `mosquitto_connect_async()` not working. Closes #1415 and #1422. Thanks to Karl Palsson, Till Zimmermann and Liam Fry. --- ChangeLog.txt | 2 + lib/connect.c | 41 +++++--------- lib/handle_connack.c | 2 +- lib/handle_ping.c | 12 ++--- lib/handle_pubackcomp.c | 7 +-- lib/handle_publish.c | 7 +-- lib/handle_pubrec.c | 4 +- lib/handle_pubrel.c | 4 +- lib/handle_suback.c | 8 ++- lib/handle_unsuback.c | 4 +- lib/loop.c | 31 ++++------- lib/mosquitto_internal.h | 3 +- lib/packet_mosq.c | 12 ++--- lib/socks_mosq.c | 39 +++++++------- lib/srv_mosq.c | 4 +- lib/thread_mosq.c | 9 +--- lib/util_mosq.c | 39 +++++++++++--- lib/util_mosq.h | 3 ++ src/bridge.c | 8 +-- src/context.c | 13 ++--- src/database.c | 4 +- src/handle_auth.c | 8 +-- src/handle_connack.c | 2 +- src/handle_connect.c | 6 +-- src/handle_disconnect.c | 5 +- src/handle_publish.c | 2 +- src/handle_subscribe.c | 2 +- src/handle_unsubscribe.c | 2 +- src/loop.c | 6 +-- src/mosquitto_broker_internal.h | 1 - src/security_default.c | 12 ++--- test/lib/Makefile | 2 + test/lib/c/02-subscribe-qos1-async1.c | 78 +++++++++++++++++++++++++++ test/lib/c/02-subscribe-qos1-async2.c | 74 +++++++++++++++++++++++++ test/lib/c/Makefile | 2 + test/lib/test.py | 2 + 36 files changed, 300 insertions(+), 160 deletions(-) create mode 100644 test/lib/c/02-subscribe-qos1-async1.c create mode 100644 test/lib/c/02-subscribe-qos1-async2.c diff --git a/ChangeLog.txt b/ChangeLog.txt index c1b4a4f6..1855980b 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -8,6 +8,8 @@ Client library: - Don't use `/` in autogenerated client ids, to avoid confusing with topics. - Fix `mosquitto_max_inflight_messages_set()` and `mosquitto_int_option(..., MOSQ_OPT_*_MAX, ...)` behaviour. Closes #1417. +- Fix regression on use of `mosquitto_connect_async()` not working. + Closes #1415 and #1422. Clients: - mosquitto_sub: Fix `-E` incorrectly not working unless `-d` was also diff --git a/lib/connect.c b/lib/connect.c index 78b655fb..fd5c4979 100644 --- a/lib/connect.c +++ b/lib/connect.c @@ -119,9 +119,7 @@ int mosquitto_connect_bind_v5(struct mosquitto *mosq, const char *host, int port rc = mosquitto__connect_init(mosq, host, port, keepalive, bind_address); if(rc) return rc; - pthread_mutex_lock(&mosq->state_mutex); - mosq->state = mosq_cs_new; - pthread_mutex_unlock(&mosq->state_mutex); + mosquitto__set_state(mosq, mosq_cs_new); return mosquitto__reconnect(mosq, true, properties); } @@ -138,10 +136,6 @@ int mosquitto_connect_bind_async(struct mosquitto *mosq, const char *host, int p int rc = mosquitto__connect_init(mosq, host, port, keepalive, bind_address); if(rc) return rc; - pthread_mutex_lock(&mosq->state_mutex); - mosq->state = mosq_cs_connect_async; - pthread_mutex_unlock(&mosq->state_mutex); - return mosquitto__reconnect(mosq, false, NULL); } @@ -163,6 +157,7 @@ static int mosquitto__reconnect(struct mosquitto *mosq, bool blocking, const mos const mosquitto_property *outgoing_properties = NULL; mosquitto_property local_property; int rc; + if(!mosq) return MOSQ_ERR_INVAL; if(!mosq->host || mosq->port <= 0) return MOSQ_ERR_INVAL; if(mosq->protocol != mosq_p_mqtt5 && properties) return MOSQ_ERR_NOT_SUPPORTED; @@ -180,17 +175,6 @@ static int mosquitto__reconnect(struct mosquitto *mosq, bool blocking, const mos if(rc) return rc; } - pthread_mutex_lock(&mosq->state_mutex); -#ifdef WITH_SOCKS - if(mosq->socks5_host){ - mosq->state = mosq_cs_socks5_new; - }else -#endif - { - mosq->state = mosq_cs_new; - } - pthread_mutex_unlock(&mosq->state_mutex); - pthread_mutex_lock(&mosq->msgtime_mutex); mosq->last_msg_in = mosquitto_time(); mosq->next_msg_out = mosq->last_msg_in + mosq->keepalive; @@ -217,19 +201,23 @@ static int mosquitto__reconnect(struct mosquitto *mosq, bool blocking, const mos rc = net__socket_connect(mosq, mosq->host, mosq->port, mosq->bind_address, blocking); } if(rc>0){ + mosquitto__set_state(mosq, mosq_cs_connect_pending); return rc; } #ifdef WITH_SOCKS if(mosq->socks5_host){ + mosquitto__set_state(mosq, mosq_cs_socks5_new); return socks5__send(mosq); }else #endif { + mosquitto__set_state(mosq, mosq_cs_connected); rc = send__connect(mosq, mosq->keepalive, mosq->clean_start, outgoing_properties); if(rc){ packet__cleanup_all(mosq); net__socket_close(mosq); + mosquitto__set_state(mosq, mosq_cs_new); } return rc; } @@ -262,21 +250,18 @@ int mosquitto_disconnect_v5(struct mosquitto *mosq, int reason_code, const mosqu if(rc) return rc; } - pthread_mutex_lock(&mosq->state_mutex); - mosq->state = mosq_cs_disconnecting; - pthread_mutex_unlock(&mosq->state_mutex); - - if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN; - return send__disconnect(mosq, reason_code, outgoing_properties); + mosquitto__set_state(mosq, mosq_cs_disconnected); + if(mosq->sock == INVALID_SOCKET){ + return MOSQ_ERR_NO_CONN; + }else{ + return send__disconnect(mosq, reason_code, outgoing_properties); + } } void do_client_disconnect(struct mosquitto *mosq, int reason_code, const mosquitto_property *properties) { - pthread_mutex_lock(&mosq->state_mutex); - mosq->state = mosq_cs_disconnecting; - pthread_mutex_unlock(&mosq->state_mutex); - + mosquitto__set_state(mosq, mosq_cs_disconnected); net__socket_close(mosq); /* Free data and reset values */ diff --git a/lib/handle_connack.c b/lib/handle_connack.c index ab66ff0f..379da156 100644 --- a/lib/handle_connack.c +++ b/lib/handle_connack.c @@ -111,7 +111,7 @@ int handle__connack(struct mosquitto *mosq) case 0: pthread_mutex_lock(&mosq->state_mutex); if(mosq->state != mosq_cs_disconnecting){ - mosq->state = mosq_cs_connected; + mosq->state = mosq_cs_active; } pthread_mutex_unlock(&mosq->state_mutex); message__retry_check(mosq); diff --git a/lib/handle_ping.c b/lib/handle_ping.c index 69355be8..8482336c 100644 --- a/lib/handle_ping.c +++ b/lib/handle_ping.c @@ -37,9 +37,12 @@ Contributors: int handle__pingreq(struct mosquitto *mosq) { + int state; + assert(mosq); - if(mosq->state != mosq_cs_connected){ + state = mosquitto__get_state(mosq); + if(state != mosq_cs_active){ return MOSQ_ERR_PROTOCOL; } @@ -57,11 +60,8 @@ int handle__pingresp(struct mosquitto *mosq) assert(mosq); - pthread_mutex_lock(&mosq->state_mutex); - state = mosq->state; - pthread_mutex_unlock(&mosq->state_mutex); - - if(state != mosq_cs_connected){ + state = mosquitto__get_state(mosq); + if(state != mosq_cs_active){ return MOSQ_ERR_PROTOCOL; } diff --git a/lib/handle_pubackcomp.c b/lib/handle_pubackcomp.c index 774e948c..5bf3d004 100644 --- a/lib/handle_pubackcomp.c +++ b/lib/handle_pubackcomp.c @@ -51,11 +51,8 @@ int handle__pubackcomp(struct mosquitto *mosq, const char *type) assert(mosq); - pthread_mutex_lock(&mosq->state_mutex); - state = mosq->state; - pthread_mutex_unlock(&mosq->state_mutex); - - if(state != mosq_cs_connected){ + state = mosquitto__get_state(mosq); + if(state != mosq_cs_active){ return MOSQ_ERR_PROTOCOL; } diff --git a/lib/handle_publish.c b/lib/handle_publish.c index 485742fa..909fbaf5 100644 --- a/lib/handle_publish.c +++ b/lib/handle_publish.c @@ -44,11 +44,8 @@ int handle__publish(struct mosquitto *mosq) assert(mosq); - pthread_mutex_lock(&mosq->state_mutex); - state = mosq->state; - pthread_mutex_unlock(&mosq->state_mutex); - - if(state != mosq_cs_connected){ + state = mosquitto__get_state(mosq); + if(state != mosq_cs_active){ return MOSQ_ERR_PROTOCOL; } diff --git a/lib/handle_pubrec.c b/lib/handle_pubrec.c index 81f1b496..fda76f04 100644 --- a/lib/handle_pubrec.c +++ b/lib/handle_pubrec.c @@ -41,10 +41,12 @@ int handle__pubrec(struct mosquitto_db *db, struct mosquitto *mosq) uint16_t mid; int rc; mosquitto_property *properties = NULL; + int state; assert(mosq); - if(mosq->state != mosq_cs_connected){ + state = mosquitto__get_state(mosq); + if(state != mosq_cs_active){ return MOSQ_ERR_PROTOCOL; } diff --git a/lib/handle_pubrel.c b/lib/handle_pubrel.c index 209a3e0d..c33d4cbd 100644 --- a/lib/handle_pubrel.c +++ b/lib/handle_pubrel.c @@ -45,10 +45,12 @@ int handle__pubrel(struct mosquitto_db *db, struct mosquitto *mosq) #endif int rc; mosquitto_property *properties = NULL; + int state; assert(mosq); - if(mosq->state != mosq_cs_connected){ + state = mosquitto__get_state(mosq); + if(state != mosq_cs_active){ return MOSQ_ERR_PROTOCOL; } diff --git a/lib/handle_suback.c b/lib/handle_suback.c index 8dfdab63..0973e7f2 100644 --- a/lib/handle_suback.c +++ b/lib/handle_suback.c @@ -29,6 +29,7 @@ Contributors: #include "mqtt_protocol.h" #include "packet_mosq.h" #include "property_mosq.h" +#include "util_mosq.h" int handle__suback(struct mosquitto *mosq) @@ -44,11 +45,8 @@ int handle__suback(struct mosquitto *mosq) assert(mosq); - pthread_mutex_lock(&mosq->state_mutex); - state = mosq->state; - pthread_mutex_unlock(&mosq->state_mutex); - - if(state != mosq_cs_connected){ + state = mosquitto__get_state(mosq); + if(state != mosq_cs_active){ return MOSQ_ERR_PROTOCOL; } diff --git a/lib/handle_unsuback.c b/lib/handle_unsuback.c index 3ced0e52..06f9c16d 100644 --- a/lib/handle_unsuback.c +++ b/lib/handle_unsuback.c @@ -42,10 +42,12 @@ int handle__unsuback(struct mosquitto *mosq) uint16_t mid; int rc; mosquitto_property *properties = NULL; + int state; assert(mosq); - if(mosq->state != mosq_cs_connected){ + state = mosquitto__get_state(mosq); + if(state != mosq_cs_active){ return MOSQ_ERR_PROTOCOL; } diff --git a/lib/loop.c b/lib/loop.c index 0f751a9c..1903feda 100644 --- a/lib/loop.c +++ b/lib/loop.c @@ -83,17 +83,15 @@ int mosquitto_loop(struct mosquitto *mosq, int timeout, int max_packets) }else{ #ifdef WITH_SRV if(mosq->achan){ - pthread_mutex_lock(&mosq->state_mutex); - if(mosq->state == mosq_cs_connect_srv){ + state = mosquitto__get_state(mosq); + if(state == mosq_cs_connect_srv){ rc = ares_fds(mosq->achan, &readfds, &writefds); if(rc > maxfd){ maxfd = rc; } }else{ - pthread_mutex_unlock(&mosq->state_mutex); return MOSQ_ERR_NO_CONN; } - pthread_mutex_unlock(&mosq->state_mutex); } #else return MOSQ_ERR_NO_CONN; @@ -198,15 +196,12 @@ int mosquitto_loop_forever(struct mosquitto *mosq, int timeout, int max_packets) #ifndef WIN32 struct timespec req, rem; #endif + int state; if(!mosq) return MOSQ_ERR_INVAL; mosq->reconnects = 0; - if(mosq->state == mosq_cs_connect_async){ - mosquitto_reconnect(mosq); - } - while(run){ do{ rc = mosquitto_loop(mosq, timeout, max_packets); @@ -234,13 +229,10 @@ int mosquitto_loop_forever(struct mosquitto *mosq, int timeout, int max_packets) } do{ rc = MOSQ_ERR_SUCCESS; - pthread_mutex_lock(&mosq->state_mutex); - if(mosq->state == mosq_cs_disconnecting){ + state = mosquitto__get_state(mosq); + if(state == mosq_cs_disconnecting || state == mosq_cs_disconnected){ run = 0; - pthread_mutex_unlock(&mosq->state_mutex); }else{ - pthread_mutex_unlock(&mosq->state_mutex); - if(mosq->reconnect_delay_max > mosq->reconnect_delay){ if(mosq->reconnect_exponential_backoff){ reconnect_delay = mosq->reconnect_delay*(mosq->reconnects+1)*(mosq->reconnects+1); @@ -267,12 +259,10 @@ int mosquitto_loop_forever(struct mosquitto *mosq, int timeout, int max_packets) } #endif - pthread_mutex_lock(&mosq->state_mutex); - if(mosq->state == mosq_cs_disconnecting){ + state = mosquitto__get_state(mosq); + if(state == mosq_cs_disconnecting || state == mosq_cs_disconnected){ run = 0; - pthread_mutex_unlock(&mosq->state_mutex); }else{ - pthread_mutex_unlock(&mosq->state_mutex); rc = mosquitto_reconnect(mosq); } } @@ -293,13 +283,14 @@ int mosquitto_loop_misc(struct mosquitto *mosq) static int mosquitto__loop_rc_handle(struct mosquitto *mosq, int rc) { + int state; + if(rc){ net__socket_close(mosq); - pthread_mutex_lock(&mosq->state_mutex); - if(mosq->state == mosq_cs_disconnecting){ + state = mosquitto__get_state(mosq); + if(state == mosq_cs_disconnecting || state == mosq_cs_disconnected){ rc = MOSQ_ERR_SUCCESS; } - pthread_mutex_unlock(&mosq->state_mutex); pthread_mutex_lock(&mosq->callback_mutex); if(mosq->on_disconnect){ mosq->in_callback = true; diff --git a/lib/mosquitto_internal.h b/lib/mosquitto_internal.h index 5280a77f..de640144 100644 --- a/lib/mosquitto_internal.h +++ b/lib/mosquitto_internal.h @@ -94,7 +94,7 @@ enum mosquitto_client_state { mosq_cs_new = 0, mosq_cs_connected = 1, mosq_cs_disconnecting = 2, - mosq_cs_connect_async = 3, + mosq_cs_active = 3, mosq_cs_connect_pending = 4, mosq_cs_connect_srv = 5, mosq_cs_disconnect_ws = 6, @@ -107,7 +107,6 @@ enum mosquitto_client_state { mosq_cs_socks5_userpass_reply = 13, mosq_cs_socks5_send_userpass = 14, mosq_cs_expiring = 15, - mosq_cs_connecting = 16, mosq_cs_duplicate = 17, /* client that has been taken over by another with the same id */ mosq_cs_disconnect_with_will = 18, mosq_cs_disused = 19, /* client that has been added to the disused list to be freed */ diff --git a/lib/packet_mosq.c b/lib/packet_mosq.c index 101e2f5a..f9c46b6b 100644 --- a/lib/packet_mosq.c +++ b/lib/packet_mosq.c @@ -34,6 +34,7 @@ Contributors: #include "net_mosq.h" #include "packet_mosq.h" #include "read_handle.h" +#include "util_mosq.h" #ifdef WITH_BROKER # include "sys_tree.h" # include "send_mosq.h" @@ -218,10 +219,7 @@ int packet__write(struct mosquitto *mosq) } pthread_mutex_unlock(&mosq->out_packet_mutex); - pthread_mutex_lock(&mosq->state_mutex); - state = mosq->state; - pthread_mutex_unlock(&mosq->state_mutex); - + state = mosquitto__get_state(mosq); #if defined(WITH_TLS) && !defined(WITH_BROKER) if((state == mosq_cs_connect_pending) || mosq->want_connect){ #else @@ -329,10 +327,8 @@ int packet__read(struct mosquitto *mosq) if(mosq->sock == INVALID_SOCKET){ return MOSQ_ERR_NO_CONN; } - pthread_mutex_lock(&mosq->state_mutex); - state = mosq->state; - pthread_mutex_unlock(&mosq->state_mutex); + state = mosquitto__get_state(mosq); if(state == mosq_cs_connect_pending){ return MOSQ_ERR_SUCCESS; } @@ -358,7 +354,7 @@ int packet__read(struct mosquitto *mosq) #ifdef WITH_BROKER G_BYTES_RECEIVED_INC(1); /* Clients must send CONNECT as their first command. */ - if(!(mosq->bridge) && mosq->state == mosq_cs_new && (byte&0xF0) != CMD_CONNECT){ + if(!(mosq->bridge) && mosq->state == mosq_cs_connected && (byte&0xF0) != CMD_CONNECT){ return MOSQ_ERR_PROTOCOL; } #endif diff --git a/lib/socks_mosq.c b/lib/socks_mosq.c index 0df58a4b..78959ecf 100644 --- a/lib/socks_mosq.c +++ b/lib/socks_mosq.c @@ -112,8 +112,11 @@ int socks5__send(struct mosquitto *mosq) struct in6_addr addr_ipv6; int ipv4_pton_result; int ipv6_pton_result; + int state; - if(mosq->state == mosq_cs_socks5_new){ + state = mosquitto__get_state(mosq); + + if(state == mosq_cs_socks5_new){ packet = mosquitto__calloc(1, sizeof(struct mosquitto__packet)); if(!packet) return MOSQ_ERR_NOMEM; @@ -134,9 +137,7 @@ int socks5__send(struct mosquitto *mosq) packet->payload[2] = SOCKS_AUTH_NONE; } - pthread_mutex_lock(&mosq->state_mutex); - mosq->state = mosq_cs_socks5_start; - pthread_mutex_unlock(&mosq->state_mutex); + mosquitto__set_state(mosq, mosq_cs_socks5_start); mosq->in_packet.pos = 0; mosq->in_packet.packet_length = 2; @@ -149,7 +150,7 @@ int socks5__send(struct mosquitto *mosq) } return packet__queue(mosq, packet); - }else if(mosq->state == mosq_cs_socks5_auth_ok){ + }else if(state == mosq_cs_socks5_auth_ok){ packet = mosquitto__calloc(1, sizeof(struct mosquitto__packet)); if(!packet) return MOSQ_ERR_NOMEM; @@ -201,9 +202,7 @@ int socks5__send(struct mosquitto *mosq) packet->payload[1] = 0x01; packet->payload[2] = 0x00; - pthread_mutex_lock(&mosq->state_mutex); - mosq->state = mosq_cs_socks5_request; - pthread_mutex_unlock(&mosq->state_mutex); + mosquitto__set_state(mosq, mosq_cs_socks5_request); mosq->in_packet.pos = 0; mosq->in_packet.packet_length = 5; @@ -216,7 +215,7 @@ int socks5__send(struct mosquitto *mosq) } return packet__queue(mosq, packet); - }else if(mosq->state == mosq_cs_socks5_send_userpass){ + }else if(state == mosq_cs_socks5_send_userpass){ packet = mosquitto__calloc(1, sizeof(struct mosquitto__packet)); if(!packet) return MOSQ_ERR_NOMEM; @@ -232,9 +231,7 @@ int socks5__send(struct mosquitto *mosq) packet->payload[2+ulen] = plen; memcpy(&(packet->payload[3+ulen]), mosq->socks5_password, plen); - pthread_mutex_lock(&mosq->state_mutex); - mosq->state = mosq_cs_socks5_userpass_reply; - pthread_mutex_unlock(&mosq->state_mutex); + mosquitto__set_state(mosq, mosq_cs_socks5_userpass_reply); mosq->in_packet.pos = 0; mosq->in_packet.packet_length = 2; @@ -256,8 +253,10 @@ int socks5__read(struct mosquitto *mosq) ssize_t len; uint8_t *payload; uint8_t i; + int state; - if(mosq->state == mosq_cs_socks5_start){ + state = mosquitto__get_state(mosq); + if(state == mosq_cs_socks5_start){ while(mosq->in_packet.to_process > 0){ len = net__read(mosq, &(mosq->in_packet.payload[mosq->in_packet.pos]), mosq->in_packet.to_process); if(len > 0){ @@ -289,17 +288,17 @@ int socks5__read(struct mosquitto *mosq) switch(mosq->in_packet.payload[1]){ case SOCKS_AUTH_NONE: packet__cleanup(&mosq->in_packet); - mosq->state = mosq_cs_socks5_auth_ok; + mosquitto__set_state(mosq, mosq_cs_socks5_auth_ok); return socks5__send(mosq); case SOCKS_AUTH_USERPASS: packet__cleanup(&mosq->in_packet); - mosq->state = mosq_cs_socks5_send_userpass; + mosquitto__set_state(mosq, mosq_cs_socks5_send_userpass); return socks5__send(mosq); default: packet__cleanup(&mosq->in_packet); return MOSQ_ERR_AUTH; } - }else if(mosq->state == mosq_cs_socks5_userpass_reply){ + }else if(state == mosq_cs_socks5_userpass_reply){ while(mosq->in_packet.to_process > 0){ len = net__read(mosq, &(mosq->in_packet.payload[mosq->in_packet.pos]), mosq->in_packet.to_process); if(len > 0){ @@ -330,7 +329,7 @@ int socks5__read(struct mosquitto *mosq) } if(mosq->in_packet.payload[1] == 0){ packet__cleanup(&mosq->in_packet); - mosq->state = mosq_cs_socks5_auth_ok; + mosquitto__set_state(mosq, mosq_cs_socks5_auth_ok); return socks5__send(mosq); }else{ i = mosq->in_packet.payload[1]; @@ -355,7 +354,7 @@ int socks5__read(struct mosquitto *mosq) } return MOSQ_ERR_PROXY; } - }else if(mosq->state == mosq_cs_socks5_request){ + }else if(state == mosq_cs_socks5_request){ while(mosq->in_packet.to_process > 0){ len = net__read(mosq, &(mosq->in_packet.payload[mosq->in_packet.pos]), mosq->in_packet.to_process); if(len > 0){ @@ -423,7 +422,7 @@ int socks5__read(struct mosquitto *mosq) if(mosq->in_packet.payload[1] == 0){ /* Auth passed */ packet__cleanup(&mosq->in_packet); - mosq->state = mosq_cs_new; + mosquitto__set_state(mosq, mosq_cs_new); if(mosq->socks5_host){ int rc = net__socket_connect_step3(mosq, mosq->host); if(rc) return rc; @@ -432,7 +431,7 @@ int socks5__read(struct mosquitto *mosq) }else{ i = mosq->in_packet.payload[1]; packet__cleanup(&mosq->in_packet); - mosq->state = mosq_cs_socks5_new; + mosquitto__set_state(mosq, mosq_cs_socks5_new); switch(i){ case SOCKS_REPLY_CONNECTION_NOT_ALLOWED: return MOSQ_ERR_AUTH; diff --git a/lib/srv_mosq.c b/lib/srv_mosq.c index d08c2bd0..50f30118 100644 --- a/lib/srv_mosq.c +++ b/lib/srv_mosq.c @@ -91,9 +91,7 @@ int mosquitto_connect_srv(struct mosquitto *mosq, const char *host, int keepaliv mosquitto__free(h); } - pthread_mutex_lock(&mosq->state_mutex); - mosq->state = mosq_cs_connect_srv; - pthread_mutex_unlock(&mosq->state_mutex); + mosquitto__set_state(mosq_cs_connect_srv); mosq->keepalive = keepalive; diff --git a/lib/thread_mosq.c b/lib/thread_mosq.c index 7bde453d..d92526b3 100644 --- a/lib/thread_mosq.c +++ b/lib/thread_mosq.c @@ -22,6 +22,7 @@ Contributors: #include "mosquitto_internal.h" #include "net_mosq.h" +#include "util_mosq.h" void *mosquitto__thread_main(void *obj); @@ -89,9 +90,7 @@ void *mosquitto__thread_main(void *obj) if(!mosq) return NULL; do{ - pthread_mutex_lock(&mosq->state_mutex); - state = mosq->state; - pthread_mutex_unlock(&mosq->state_mutex); + state = mosquitto__get_state(mosq); if(state == mosq_cs_new){ #ifdef WIN32 Sleep(10); @@ -103,10 +102,6 @@ void *mosquitto__thread_main(void *obj) } }while(1); - if(state == mosq_cs_connect_async){ - mosquitto_reconnect(mosq); - } - if(!mosq->keepalive){ /* Sleep for a day if keepalive disabled. */ mosquitto_loop_forever(mosq, 1000*86400, 1); diff --git a/lib/util_mosq.c b/lib/util_mosq.c index 928d93cb..23cc910a 100644 --- a/lib/util_mosq.c +++ b/lib/util_mosq.c @@ -89,11 +89,8 @@ int mosquitto__check_keepalive(struct mosquitto *mosq) if(mosq->keepalive && mosq->sock != INVALID_SOCKET && (now >= next_msg_out || now - last_msg_in >= mosq->keepalive)){ - pthread_mutex_lock(&mosq->state_mutex); - state = mosq->state; - pthread_mutex_unlock(&mosq->state_mutex); - - if(state == mosq_cs_connected && mosq->ping_t == 0){ + state = mosquitto__get_state(mosq); + if(state == mosq_cs_active && mosq->ping_t == 0){ send__pingreq(mosq); /* Reset last msg times to give the server time to send a pingresp */ pthread_mutex_lock(&mosq->msgtime_mutex); @@ -105,13 +102,12 @@ int mosquitto__check_keepalive(struct mosquitto *mosq) net__socket_close(db, mosq); #else net__socket_close(mosq); - pthread_mutex_lock(&mosq->state_mutex); - if(mosq->state == mosq_cs_disconnecting){ + state = mosquitto__get_state(mosq); + if(state == mosq_cs_disconnecting){ rc = MOSQ_ERR_SUCCESS; }else{ rc = MOSQ_ERR_KEEPALIVE; } - pthread_mutex_unlock(&mosq->state_mutex); pthread_mutex_lock(&mosq->callback_mutex); if(mosq->on_disconnect){ mosq->in_callback = true; @@ -357,3 +353,30 @@ int util__random_bytes(void *bytes, int count) #endif return rc; } + + +int mosquitto__set_state(struct mosquitto *mosq, enum mosquitto_client_state state) +{ + pthread_mutex_lock(&mosq->state_mutex); +#ifdef WITH_BROKER + if(mosq->state != mosq_cs_disused) +#endif + { + mosq->state = state; + } + pthread_mutex_unlock(&mosq->state_mutex); + + return MOSQ_ERR_SUCCESS; +} + +enum mosquitto_client_state mosquitto__get_state(struct mosquitto *mosq) +{ + enum mosquitto_client_state state; + + pthread_mutex_lock(&mosq->state_mutex); + state = mosq->state; + pthread_mutex_unlock(&mosq->state_mutex); + + return state; +} + diff --git a/lib/util_mosq.h b/lib/util_mosq.h index 9ee570f3..a527fd62 100644 --- a/lib/util_mosq.h +++ b/lib/util_mosq.h @@ -33,6 +33,9 @@ int mosquitto__check_keepalive(struct mosquitto *mosq); uint16_t mosquitto__mid_generate(struct mosquitto *mosq); FILE *mosquitto__fopen(const char *path, const char *mode, bool restrict_read); +int mosquitto__set_state(struct mosquitto *mosq, enum mosquitto_client_state state); +enum mosquitto_client_state mosquitto__get_state(struct mosquitto *mosq); + #ifdef WITH_TLS int mosquitto__hex2bin_sha1(const char *hex, unsigned char **bin); int mosquitto__hex2bin(const char *hex, unsigned char *bin, int bin_max_len); diff --git a/src/bridge.c b/src/bridge.c index 640fb407..b18e4135 100644 --- a/src/bridge.c +++ b/src/bridge.c @@ -127,7 +127,7 @@ int bridge__connect_step1(struct mosquitto_db *db, struct mosquitto *context) if(!context || !context->bridge) return MOSQ_ERR_INVAL; - context__set_state(context, mosq_cs_new); + mosquitto__set_state(context, mosq_cs_new); context->sock = INVALID_SOCKET; context->last_msg_in = mosquitto_time(); context->next_msg_out = mosquitto_time() + context->bridge->keepalive; @@ -247,7 +247,7 @@ int bridge__connect_step2(struct mosquitto_db *db, struct mosquitto *context) HASH_ADD(hh_sock, db->contexts_by_sock, sock, sizeof(context->sock), context); if(rc == MOSQ_ERR_CONN_PENDING){ - context__set_state(context, mosq_cs_connect_pending); + mosquitto__set_state(context, mosq_cs_connect_pending); } return rc; } @@ -306,7 +306,7 @@ int bridge__connect(struct mosquitto_db *db, struct mosquitto *context) if(!context || !context->bridge) return MOSQ_ERR_INVAL; - context__set_state(context, mosq_cs_new); + mosquitto__set_state(context, mosq_cs_new); context->sock = INVALID_SOCKET; context->last_msg_in = mosquitto_time(); context->next_msg_out = mosquitto_time() + context->bridge->keepalive; @@ -400,7 +400,7 @@ int bridge__connect(struct mosquitto_db *db, struct mosquitto *context) return rc; }else if(rc == MOSQ_ERR_CONN_PENDING){ - context__set_state(context, mosq_cs_connect_pending); + mosquitto__set_state(context, mosq_cs_connect_pending); } HASH_ADD(hh_sock, db->contexts_by_sock, sock, sizeof(context->sock), context); diff --git a/src/context.c b/src/context.c index f024d171..9996f8b8 100644 --- a/src/context.c +++ b/src/context.c @@ -38,7 +38,7 @@ struct mosquitto *context__init(struct mosquitto_db *db, mosq_sock_t sock) if(!context) return NULL; context->pollfd_index = -1; - context__set_state(context, mosq_cs_new); + mosquitto__set_state(context, mosq_cs_new); context->sock = sock; context->last_msg_in = mosquitto_time(); context->next_msg_out = mosquitto_time() + 60; @@ -243,14 +243,14 @@ void context__disconnect(struct mosquitto_db *db, struct mosquitto *context) }else{ session_expiry__add(db, context); } - context__set_state(context, mosq_cs_disconnected); + mosquitto__set_state(context, mosq_cs_disconnected); } void context__add_to_disused(struct mosquitto_db *db, struct mosquitto *context) { if(context->state == mosq_cs_disused) return; - context__set_state(context, mosq_cs_disused); + mosquitto__set_state(context, mosq_cs_disused); if(context->id){ context__remove_from_by_id(db, context); @@ -308,10 +308,3 @@ void context__remove_from_by_id(struct mosquitto_db *db, struct mosquitto *conte } } - -void context__set_state(struct mosquitto *context, enum mosquitto_client_state state) -{ - if(context->state != mosq_cs_disused){ - context->state = state; - } -} diff --git a/src/database.c b/src/database.c index 715c1690..dbfc0b0f 100644 --- a/src/database.c +++ b/src/database.c @@ -964,11 +964,11 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context) uint32_t expiry_interval; if(!context || context->sock == INVALID_SOCKET - || (context->state == mosq_cs_connected && !context->id)){ + || (context->state == mosq_cs_active && !context->id)){ return MOSQ_ERR_INVAL; } - if(context->state != mosq_cs_connected){ + if(context->state != mosq_cs_active){ return MOSQ_ERR_SUCCESS; } diff --git a/src/handle_auth.c b/src/handle_auth.c index 03bf4771..254a832e 100644 --- a/src/handle_auth.c +++ b/src/handle_auth.c @@ -54,7 +54,7 @@ int handle__auth(struct mosquitto_db *db, struct mosquitto *context) return MOSQ_ERR_PROTOCOL; } - if((reason_code == MQTT_RC_REAUTHENTICATE && context->state != mosq_cs_connected) + if((reason_code == MQTT_RC_REAUTHENTICATE && context->state != mosq_cs_active) || (reason_code == MQTT_RC_CONTINUE_AUTHENTICATION && context->state != mosq_cs_authenticating && context->state != mosq_cs_reauthenticating)){ @@ -94,11 +94,11 @@ int handle__auth(struct mosquitto_db *db, struct mosquitto *context) if(reason_code == MQTT_RC_REAUTHENTICATE){ /* This is a re-authentication attempt */ - context__set_state(context, mosq_cs_reauthenticating); + mosquitto__set_state(context, mosq_cs_reauthenticating); rc = mosquitto_security_auth_start(db, context, true, auth_data, auth_data_len, &auth_data_out, &auth_data_out_len); }else{ if(context->state != mosq_cs_reauthenticating){ - context__set_state(context, mosq_cs_authenticating); + mosquitto__set_state(context, mosq_cs_authenticating); } rc = mosquitto_security_auth_continue(db, context, auth_data, auth_data_len, &auth_data_out, &auth_data_out_len); } @@ -107,7 +107,7 @@ int handle__auth(struct mosquitto_db *db, struct mosquitto *context) if(context->state == mosq_cs_authenticating){ return connect__on_authorised(db, context, auth_data_out, auth_data_out_len); }else{ - context__set_state(context, mosq_cs_connected); + mosquitto__set_state(context, mosq_cs_active); rc = send__auth(db, context, MQTT_RC_SUCCESS, auth_data_out, auth_data_out_len); free(auth_data_out); return rc; diff --git a/src/handle_connack.c b/src/handle_connack.c index ac08d880..77d98934 100644 --- a/src/handle_connack.c +++ b/src/handle_connack.c @@ -108,7 +108,7 @@ int handle__connack(struct mosquitto_db *db, struct mosquitto *context) } } } - context__set_state(context, mosq_cs_connected); + mosquitto__set_state(context, mosq_cs_active); return MOSQ_ERR_SUCCESS; case CONNACK_REFUSED_PROTOCOL_VERSION: if(context->bridge){ diff --git a/src/handle_connect.c b/src/handle_connect.c index 9de98dd9..17434b2b 100644 --- a/src/handle_connect.c +++ b/src/handle_connect.c @@ -166,7 +166,7 @@ int connect__on_authorised(struct mosquitto_db *db, struct mosquitto *context, v found_context->clean_start = true; found_context->session_expiry_interval = 0; - context__set_state(found_context, mosq_cs_duplicate); + mosquitto__set_state(found_context, mosq_cs_duplicate); do_disconnect(db, found_context, MOSQ_ERR_SUCCESS); } @@ -266,7 +266,7 @@ int connect__on_authorised(struct mosquitto_db *db, struct mosquitto *context, v } free(auth_data_out); - context__set_state(context, mosq_cs_connected); + mosquitto__set_state(context, mosq_cs_active); rc = send__connack(db, context, connect_ack, CONNACK_ACCEPTED, connack_props); mosquitto_property_free_all(&connack_props); return rc; @@ -835,7 +835,7 @@ int handle__connect(struct mosquitto_db *db, struct mosquitto *context) if(rc == MOSQ_ERR_SUCCESS){ return connect__on_authorised(db, context, auth_data_out, auth_data_out_len); }else if(rc == MOSQ_ERR_AUTH_CONTINUE){ - context__set_state(context, mosq_cs_authenticating); + mosquitto__set_state(context, mosq_cs_authenticating); rc = send__auth(db, context, MQTT_RC_CONTINUE_AUTHENTICATION, auth_data_out, auth_data_out_len); free(auth_data_out); return rc; diff --git a/src/handle_disconnect.c b/src/handle_disconnect.c index 77671aca..eab7b689 100644 --- a/src/handle_disconnect.c +++ b/src/handle_disconnect.c @@ -21,6 +21,7 @@ Contributors: #include "packet_mosq.h" #include "property_mosq.h" #include "send_mosq.h" +#include "util_mosq.h" #include "will_mosq.h" @@ -65,10 +66,10 @@ int handle__disconnect(struct mosquitto_db *db, struct mosquitto *context) } } if(reason_code == MQTT_RC_DISCONNECT_WITH_WILL_MSG){ - context__set_state(context, mosq_cs_disconnect_with_will); + mosquitto__set_state(context, mosq_cs_disconnect_with_will); }else{ will__clear(context); - context__set_state(context, mosq_cs_disconnecting); + mosquitto__set_state(context, mosq_cs_disconnecting); } do_disconnect(db, context, MOSQ_ERR_SUCCESS); return MOSQ_ERR_SUCCESS; diff --git a/src/handle_publish.c b/src/handle_publish.c index a13160fa..49aaf8b3 100644 --- a/src/handle_publish.c +++ b/src/handle_publish.c @@ -61,7 +61,7 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context) bool match; #endif - if(context->state != mosq_cs_connected){ + if(context->state != mosq_cs_active){ return MOSQ_ERR_PROTOCOL; } diff --git a/src/handle_subscribe.c b/src/handle_subscribe.c index 2f37ca98..2f3eb305 100644 --- a/src/handle_subscribe.c +++ b/src/handle_subscribe.c @@ -46,7 +46,7 @@ int handle__subscribe(struct mosquitto_db *db, struct mosquitto *context) if(!context) return MOSQ_ERR_INVAL; - if(context->state != mosq_cs_connected){ + if(context->state != mosq_cs_active){ return MOSQ_ERR_PROTOCOL; } diff --git a/src/handle_unsubscribe.c b/src/handle_unsubscribe.c index bbf6dc80..e9f89f8c 100644 --- a/src/handle_unsubscribe.c +++ b/src/handle_unsubscribe.c @@ -39,7 +39,7 @@ int handle__unsubscribe(struct mosquitto_db *db, struct mosquitto *context) if(!context) return MOSQ_ERR_INVAL; - if(context->state != mosq_cs_connected){ + if(context->state != mosq_cs_active){ return MOSQ_ERR_PROTOCOL; } log__printf(NULL, MOSQ_LOG_DEBUG, "Received UNSUBSCRIBE from %s", context->id); diff --git a/src/loop.c b/src/loop.c index c63e9968..92673db2 100644 --- a/src/loop.c +++ b/src/loop.c @@ -489,7 +489,7 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li log__printf(NULL, MOSQ_LOG_NOTICE, "Expiring persistent client %s due to timeout.", id); G_CLIENTS_EXPIRED_INC(); context->session_expiry_interval = 0; - context__set_state(context, mosq_cs_expiring); + mosquitto__set_state(context, mosq_cs_expiring); do_disconnect(db, context, MOSQ_ERR_SUCCESS); } } @@ -660,7 +660,7 @@ void do_disconnect(struct mosquitto_db *db, struct mosquitto *context, int reaso } if(context->state != mosq_cs_disconnecting && context->state != mosq_cs_disconnect_with_will){ - context__set_state(context, mosq_cs_disconnect_ws); + mosquitto__set_state(context, mosq_cs_disconnect_ws); } if(context->wsi){ libwebsocket_callback_on_writable(context->ws_context, context->wsi); @@ -800,7 +800,7 @@ static void loop_handle_reads_writes(struct mosquitto_db *db, struct pollfd *pol len = sizeof(int); if(!getsockopt(context->sock, SOL_SOCKET, SO_ERROR, (char *)&err, &len)){ if(err == 0){ - context__set_state(context, mosq_cs_new); + mosquitto__set_state(context, mosq_cs_new); #if defined(WITH_ADNS) && defined(WITH_BRIDGE) if(context->bridge){ bridge__connect_step3(db, context); diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h index a0ab5a07..3c87b9aa 100644 --- a/src/mosquitto_broker_internal.h +++ b/src/mosquitto_broker_internal.h @@ -653,7 +653,6 @@ void context__add_to_disused(struct mosquitto_db *db, struct mosquitto *context) void context__free_disused(struct mosquitto_db *db); void context__send_will(struct mosquitto_db *db, struct mosquitto *context); void context__remove_from_by_id(struct mosquitto_db *db, struct mosquitto *context); -void context__set_state(struct mosquitto *context, enum mosquitto_client_state state); int connect__on_authorised(struct mosquitto_db *db, struct mosquitto *context, void *auth_data_out, uint16_t auth_data_out_len); diff --git a/src/security_default.c b/src/security_default.c index 15a47fce..07b34d40 100644 --- a/src/security_default.c +++ b/src/security_default.c @@ -948,7 +948,7 @@ static void security__disconnect_auth(struct mosquitto_db *db, struct mosquitto if(context->protocol == mosq_p_mqtt5){ send__disconnect(context, MQTT_RC_ADMINISTRATIVE_ACTION, NULL); } - context__set_state(context, mosq_cs_disconnecting); + mosquitto__set_state(context, mosq_cs_disconnecting); do_disconnect(db, context, MOSQ_ERR_AUTH); } #endif @@ -1005,7 +1005,7 @@ int mosquitto_security_apply_default(struct mosquitto_db *db) } if(!allow_anonymous && !context->username){ - context__set_state(context, mosq_cs_disconnecting); + mosquitto__set_state(context, mosq_cs_disconnecting); do_disconnect(db, context, MOSQ_ERR_AUTH); continue; } @@ -1018,7 +1018,7 @@ int mosquitto_security_apply_default(struct mosquitto_db *db) if(context->protocol == mosq_p_mqtt5){ send__disconnect(context, MQTT_RC_ADMINISTRATIVE_ACTION, NULL); } - context__set_state(context, mosq_cs_disconnecting); + mosquitto__set_state(context, mosq_cs_disconnecting); do_disconnect(db, context, MOSQ_ERR_AUTH); continue; } @@ -1118,7 +1118,7 @@ int mosquitto_security_apply_default(struct mosquitto_db *db) { /* Username/password check only if the identity/subject check not used */ if(mosquitto_unpwd_check(db, context, context->username, context->password) != MOSQ_ERR_SUCCESS){ - context__set_state(context, mosq_cs_disconnecting); + mosquitto__set_state(context, mosq_cs_disconnecting); do_disconnect(db, context, MOSQ_ERR_AUTH); continue; } @@ -1130,8 +1130,8 @@ int mosquitto_security_apply_default(struct mosquitto_db *db) if(context->listener){ security_opts = &context->listener->security_options; }else{ - if(context->state != mosq_cs_connected){ - context__set_state(context, mosq_cs_disconnecting); + if(context->state != mosq_cs_active){ + mosquitto__set_state(context, mosq_cs_disconnecting); do_disconnect(db, context, MOSQ_ERR_AUTH); continue; } diff --git a/test/lib/Makefile b/test/lib/Makefile index 180e660d..e02cc5c0 100644 --- a/test/lib/Makefile +++ b/test/lib/Makefile @@ -32,6 +32,8 @@ c : test-compile ./01-will-unpwd-set.py $@/01-will-unpwd-set.test ./02-subscribe-qos0.py $@/02-subscribe-qos0.test ./02-subscribe-qos1.py $@/02-subscribe-qos1.test + ./02-subscribe-qos1.py $@/02-subscribe-qos1-async1.test + ./02-subscribe-qos1.py $@/02-subscribe-qos1-async2.test ./02-subscribe-qos2.py $@/02-subscribe-qos2.test ./02-unsubscribe-multiple-v5.py $@/02-unsubscribe-multiple-v5.test ./02-unsubscribe-v5.py $@/02-unsubscribe-v5.test diff --git a/test/lib/c/02-subscribe-qos1-async1.c b/test/lib/c/02-subscribe-qos1-async1.c new file mode 100644 index 00000000..381dbd12 --- /dev/null +++ b/test/lib/c/02-subscribe-qos1-async1.c @@ -0,0 +1,78 @@ +#include +#include +#include +#include +#include + +/* mosquitto_connect_async() test, with mosquitto_loop_start() called before mosquitto_connect_async(). */ + +static int run = -1; +static bool should_run = true; + +void on_connect(struct mosquitto *mosq, void *obj, int rc) +{ + if(rc){ + exit(1); + }else{ + mosquitto_subscribe(mosq, NULL, "qos1/test", 1); + } +} + +void on_disconnect(struct mosquitto *mosq, void *obj, int rc) +{ + run = rc; +} + +void on_subscribe(struct mosquitto *mosq, void *obj, int mid, int qos_count, const int *granted_qos) +{ + //mosquitto_disconnect(mosq); + should_run = false; +} + +int main(int argc, char *argv[]) +{ + int rc; + struct mosquitto *mosq; + + int port = atoi(argv[1]); + + mosquitto_lib_init(); + + mosq = mosquitto_new("subscribe-qos1-test", true, NULL); + mosquitto_connect_callback_set(mosq, on_connect); + mosquitto_disconnect_callback_set(mosq, on_disconnect); + mosquitto_subscribe_callback_set(mosq, on_subscribe); + printf("ok, about to call connect_async\n"); + + // this only works if loop_start is first. with loop_start second, + // it fails on both 1.6.4 _and_ 1.6.5 + // in this order, 1.6.4 works and 1.6.5 fails. + rc = mosquitto_loop_start(mosq); + printf("loop_start returned rc: %d\n", rc); + if (rc) { + printf("which is: %s\n", mosquitto_strerror(rc)); + } + + // not sure which rc you want to be returned.... + rc = mosquitto_connect_async(mosq, "localhost", port, 60); + printf("connect async returned rc: %d\n", rc); + if (rc) { + printf("which is: %s\n", mosquitto_strerror(rc)); + } + + printf("ok, so we can start just waiting now, loop_start will run in it's thread\n"); + /* 10 millis to be system polite */ + //struct timespec tv = { 0, 10e6 }; + struct timespec tv = { 1, 0 }; + while(should_run){ + nanosleep(&tv, NULL); + printf("...waiting...\n"); + } + printf("Already exited should_run....\n"); + + mosquitto_disconnect(mosq); + mosquitto_loop_stop(mosq, false); + + mosquitto_lib_cleanup(); + return run; +} diff --git a/test/lib/c/02-subscribe-qos1-async2.c b/test/lib/c/02-subscribe-qos1-async2.c new file mode 100644 index 00000000..ffcfaff9 --- /dev/null +++ b/test/lib/c/02-subscribe-qos1-async2.c @@ -0,0 +1,74 @@ +#include +#include +#include +#include +#include + +/* mosquitto_connect_async() test, with mosquitto_loop_start() called after mosquitto_connect_async(). */ + +static int run = -1; +static bool should_run = true; + +void on_connect(struct mosquitto *mosq, void *obj, int rc) +{ + if(rc){ + exit(1); + }else{ + mosquitto_subscribe(mosq, NULL, "qos1/test", 1); + } +} + +void on_disconnect(struct mosquitto *mosq, void *obj, int rc) +{ + run = rc; +} + +void on_subscribe(struct mosquitto *mosq, void *obj, int mid, int qos_count, const int *granted_qos) +{ + //mosquitto_disconnect(mosq); + should_run = false; +} + +int main(int argc, char *argv[]) +{ + int rc; + struct mosquitto *mosq; + + int port = atoi(argv[1]); + + mosquitto_lib_init(); + + mosq = mosquitto_new("subscribe-qos1-test", true, NULL); + mosquitto_connect_callback_set(mosq, on_connect); + mosquitto_disconnect_callback_set(mosq, on_disconnect); + mosquitto_subscribe_callback_set(mosq, on_subscribe); + printf("ok, about to call connect_async\n"); + + rc = mosquitto_connect_async(mosq, "localhost", port, 60); + printf("connect async returned rc: %d\n", rc); + if (rc) { + printf("which is: %s\n", mosquitto_strerror(rc)); + } + + rc = mosquitto_loop_start(mosq); + printf("loop_start returned rc: %d\n", rc); + if (rc) { + printf("which is: %s\n", mosquitto_strerror(rc)); + } + + printf("ok, so we can start just waiting now, loop_start will run in it's thread\n"); + /* 10 millis to be system polite */ + //struct timespec tv = { 0, 10e6 }; + struct timespec tv = { 1, 0 }; + while(should_run){ + nanosleep(&tv, NULL); + printf("...waiting...\n"); + } + printf("Already exited should_run....\n"); + + mosquitto_disconnect(mosq); + mosquitto_loop_stop(mosq, false); + + mosquitto_lib_cleanup(); + return run; +} diff --git a/test/lib/c/Makefile b/test/lib/c/Makefile index 4eed7da0..b42a768f 100644 --- a/test/lib/c/Makefile +++ b/test/lib/c/Makefile @@ -13,6 +13,8 @@ SRC = \ 01-server-keepalive-pingreq.c \ 02-subscribe-qos0.c \ 02-subscribe-qos1.c \ + 02-subscribe-qos1-async1.c \ + 02-subscribe-qos1-async2.c \ 02-subscribe-qos2.c \ 02-unsubscribe.c \ 02-unsubscribe-v5.c \ diff --git a/test/lib/test.py b/test/lib/test.py index 2bc84643..d61b01c0 100755 --- a/test/lib/test.py +++ b/test/lib/test.py @@ -14,6 +14,8 @@ tests = [ (1, ['./02-subscribe-qos0.py', 'c/02-subscribe-qos0.test']), (1, ['./02-subscribe-qos1.py', 'c/02-subscribe-qos1.test']), + (1, ['./02-subscribe-qos1.py', 'c/02-subscribe-qos1-async1.test']), + (1, ['./02-subscribe-qos1.py', 'c/02-subscribe-qos1-async2.test']), (1, ['./02-subscribe-qos2.py', 'c/02-subscribe-qos2.test']), (1, ['./02-unsubscribe-multiple-v5.py', 'c/02-unsubscribe-multiple-v5.test']), (1, ['./02-unsubscribe-v5.py', 'c/02-unsubscribe-v5.test']),