Fix regression on use of mosquitto_connect_async() not working.

Closes #1415 and #1422. Thanks to Karl Palsson, Till Zimmermann and Liam Fry.
This commit is contained in:
Roger A. Light
2019-09-24 11:54:05 +01:00
parent 4b6cc208e2
commit 1a8c44b84f
36 changed files with 300 additions and 160 deletions

View File

@@ -8,6 +8,8 @@ Client library:
- Don't use `/` in autogenerated client ids, to avoid confusing with topics.
- Fix `mosquitto_max_inflight_messages_set()` and `mosquitto_int_option(...,
MOSQ_OPT_*_MAX, ...)` behaviour. Closes #1417.
- Fix regression on use of `mosquitto_connect_async()` not working.
Closes #1415 and #1422.
Clients:
- mosquitto_sub: Fix `-E` incorrectly not working unless `-d` was also

View File

@@ -119,9 +119,7 @@ int mosquitto_connect_bind_v5(struct mosquitto *mosq, const char *host, int port
rc = mosquitto__connect_init(mosq, host, port, keepalive, bind_address);
if(rc) return rc;
pthread_mutex_lock(&mosq->state_mutex);
mosq->state = mosq_cs_new;
pthread_mutex_unlock(&mosq->state_mutex);
mosquitto__set_state(mosq, mosq_cs_new);
return mosquitto__reconnect(mosq, true, properties);
}
@@ -138,10 +136,6 @@ int mosquitto_connect_bind_async(struct mosquitto *mosq, const char *host, int p
int rc = mosquitto__connect_init(mosq, host, port, keepalive, bind_address);
if(rc) return rc;
pthread_mutex_lock(&mosq->state_mutex);
mosq->state = mosq_cs_connect_async;
pthread_mutex_unlock(&mosq->state_mutex);
return mosquitto__reconnect(mosq, false, NULL);
}
@@ -163,6 +157,7 @@ static int mosquitto__reconnect(struct mosquitto *mosq, bool blocking, const mos
const mosquitto_property *outgoing_properties = NULL;
mosquitto_property local_property;
int rc;
if(!mosq) return MOSQ_ERR_INVAL;
if(!mosq->host || mosq->port <= 0) return MOSQ_ERR_INVAL;
if(mosq->protocol != mosq_p_mqtt5 && properties) return MOSQ_ERR_NOT_SUPPORTED;
@@ -180,17 +175,6 @@ static int mosquitto__reconnect(struct mosquitto *mosq, bool blocking, const mos
if(rc) return rc;
}
pthread_mutex_lock(&mosq->state_mutex);
#ifdef WITH_SOCKS
if(mosq->socks5_host){
mosq->state = mosq_cs_socks5_new;
}else
#endif
{
mosq->state = mosq_cs_new;
}
pthread_mutex_unlock(&mosq->state_mutex);
pthread_mutex_lock(&mosq->msgtime_mutex);
mosq->last_msg_in = mosquitto_time();
mosq->next_msg_out = mosq->last_msg_in + mosq->keepalive;
@@ -217,19 +201,23 @@ static int mosquitto__reconnect(struct mosquitto *mosq, bool blocking, const mos
rc = net__socket_connect(mosq, mosq->host, mosq->port, mosq->bind_address, blocking);
}
if(rc>0){
mosquitto__set_state(mosq, mosq_cs_connect_pending);
return rc;
}
#ifdef WITH_SOCKS
if(mosq->socks5_host){
mosquitto__set_state(mosq, mosq_cs_socks5_new);
return socks5__send(mosq);
}else
#endif
{
mosquitto__set_state(mosq, mosq_cs_connected);
rc = send__connect(mosq, mosq->keepalive, mosq->clean_start, outgoing_properties);
if(rc){
packet__cleanup_all(mosq);
net__socket_close(mosq);
mosquitto__set_state(mosq, mosq_cs_new);
}
return rc;
}
@@ -262,21 +250,18 @@ int mosquitto_disconnect_v5(struct mosquitto *mosq, int reason_code, const mosqu
if(rc) return rc;
}
pthread_mutex_lock(&mosq->state_mutex);
mosq->state = mosq_cs_disconnecting;
pthread_mutex_unlock(&mosq->state_mutex);
if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;
return send__disconnect(mosq, reason_code, outgoing_properties);
mosquitto__set_state(mosq, mosq_cs_disconnected);
if(mosq->sock == INVALID_SOCKET){
return MOSQ_ERR_NO_CONN;
}else{
return send__disconnect(mosq, reason_code, outgoing_properties);
}
}
void do_client_disconnect(struct mosquitto *mosq, int reason_code, const mosquitto_property *properties)
{
pthread_mutex_lock(&mosq->state_mutex);
mosq->state = mosq_cs_disconnecting;
pthread_mutex_unlock(&mosq->state_mutex);
mosquitto__set_state(mosq, mosq_cs_disconnected);
net__socket_close(mosq);
/* Free data and reset values */

View File

@@ -111,7 +111,7 @@ int handle__connack(struct mosquitto *mosq)
case 0:
pthread_mutex_lock(&mosq->state_mutex);
if(mosq->state != mosq_cs_disconnecting){
mosq->state = mosq_cs_connected;
mosq->state = mosq_cs_active;
}
pthread_mutex_unlock(&mosq->state_mutex);
message__retry_check(mosq);

View File

@@ -37,9 +37,12 @@ Contributors:
int handle__pingreq(struct mosquitto *mosq)
{
int state;
assert(mosq);
if(mosq->state != mosq_cs_connected){
state = mosquitto__get_state(mosq);
if(state != mosq_cs_active){
return MOSQ_ERR_PROTOCOL;
}
@@ -57,11 +60,8 @@ int handle__pingresp(struct mosquitto *mosq)
assert(mosq);
pthread_mutex_lock(&mosq->state_mutex);
state = mosq->state;
pthread_mutex_unlock(&mosq->state_mutex);
if(state != mosq_cs_connected){
state = mosquitto__get_state(mosq);
if(state != mosq_cs_active){
return MOSQ_ERR_PROTOCOL;
}

View File

@@ -51,11 +51,8 @@ int handle__pubackcomp(struct mosquitto *mosq, const char *type)
assert(mosq);
pthread_mutex_lock(&mosq->state_mutex);
state = mosq->state;
pthread_mutex_unlock(&mosq->state_mutex);
if(state != mosq_cs_connected){
state = mosquitto__get_state(mosq);
if(state != mosq_cs_active){
return MOSQ_ERR_PROTOCOL;
}

View File

@@ -44,11 +44,8 @@ int handle__publish(struct mosquitto *mosq)
assert(mosq);
pthread_mutex_lock(&mosq->state_mutex);
state = mosq->state;
pthread_mutex_unlock(&mosq->state_mutex);
if(state != mosq_cs_connected){
state = mosquitto__get_state(mosq);
if(state != mosq_cs_active){
return MOSQ_ERR_PROTOCOL;
}

View File

@@ -41,10 +41,12 @@ int handle__pubrec(struct mosquitto_db *db, struct mosquitto *mosq)
uint16_t mid;
int rc;
mosquitto_property *properties = NULL;
int state;
assert(mosq);
if(mosq->state != mosq_cs_connected){
state = mosquitto__get_state(mosq);
if(state != mosq_cs_active){
return MOSQ_ERR_PROTOCOL;
}

View File

@@ -45,10 +45,12 @@ int handle__pubrel(struct mosquitto_db *db, struct mosquitto *mosq)
#endif
int rc;
mosquitto_property *properties = NULL;
int state;
assert(mosq);
if(mosq->state != mosq_cs_connected){
state = mosquitto__get_state(mosq);
if(state != mosq_cs_active){
return MOSQ_ERR_PROTOCOL;
}

View File

@@ -29,6 +29,7 @@ Contributors:
#include "mqtt_protocol.h"
#include "packet_mosq.h"
#include "property_mosq.h"
#include "util_mosq.h"
int handle__suback(struct mosquitto *mosq)
@@ -44,11 +45,8 @@ int handle__suback(struct mosquitto *mosq)
assert(mosq);
pthread_mutex_lock(&mosq->state_mutex);
state = mosq->state;
pthread_mutex_unlock(&mosq->state_mutex);
if(state != mosq_cs_connected){
state = mosquitto__get_state(mosq);
if(state != mosq_cs_active){
return MOSQ_ERR_PROTOCOL;
}

View File

@@ -42,10 +42,12 @@ int handle__unsuback(struct mosquitto *mosq)
uint16_t mid;
int rc;
mosquitto_property *properties = NULL;
int state;
assert(mosq);
if(mosq->state != mosq_cs_connected){
state = mosquitto__get_state(mosq);
if(state != mosq_cs_active){
return MOSQ_ERR_PROTOCOL;
}

View File

@@ -83,17 +83,15 @@ int mosquitto_loop(struct mosquitto *mosq, int timeout, int max_packets)
}else{
#ifdef WITH_SRV
if(mosq->achan){
pthread_mutex_lock(&mosq->state_mutex);
if(mosq->state == mosq_cs_connect_srv){
state = mosquitto__get_state(mosq);
if(state == mosq_cs_connect_srv){
rc = ares_fds(mosq->achan, &readfds, &writefds);
if(rc > maxfd){
maxfd = rc;
}
}else{
pthread_mutex_unlock(&mosq->state_mutex);
return MOSQ_ERR_NO_CONN;
}
pthread_mutex_unlock(&mosq->state_mutex);
}
#else
return MOSQ_ERR_NO_CONN;
@@ -198,15 +196,12 @@ int mosquitto_loop_forever(struct mosquitto *mosq, int timeout, int max_packets)
#ifndef WIN32
struct timespec req, rem;
#endif
int state;
if(!mosq) return MOSQ_ERR_INVAL;
mosq->reconnects = 0;
if(mosq->state == mosq_cs_connect_async){
mosquitto_reconnect(mosq);
}
while(run){
do{
rc = mosquitto_loop(mosq, timeout, max_packets);
@@ -234,13 +229,10 @@ int mosquitto_loop_forever(struct mosquitto *mosq, int timeout, int max_packets)
}
do{
rc = MOSQ_ERR_SUCCESS;
pthread_mutex_lock(&mosq->state_mutex);
if(mosq->state == mosq_cs_disconnecting){
state = mosquitto__get_state(mosq);
if(state == mosq_cs_disconnecting || state == mosq_cs_disconnected){
run = 0;
pthread_mutex_unlock(&mosq->state_mutex);
}else{
pthread_mutex_unlock(&mosq->state_mutex);
if(mosq->reconnect_delay_max > mosq->reconnect_delay){
if(mosq->reconnect_exponential_backoff){
reconnect_delay = mosq->reconnect_delay*(mosq->reconnects+1)*(mosq->reconnects+1);
@@ -267,12 +259,10 @@ int mosquitto_loop_forever(struct mosquitto *mosq, int timeout, int max_packets)
}
#endif
pthread_mutex_lock(&mosq->state_mutex);
if(mosq->state == mosq_cs_disconnecting){
state = mosquitto__get_state(mosq);
if(state == mosq_cs_disconnecting || state == mosq_cs_disconnected){
run = 0;
pthread_mutex_unlock(&mosq->state_mutex);
}else{
pthread_mutex_unlock(&mosq->state_mutex);
rc = mosquitto_reconnect(mosq);
}
}
@@ -293,13 +283,14 @@ int mosquitto_loop_misc(struct mosquitto *mosq)
static int mosquitto__loop_rc_handle(struct mosquitto *mosq, int rc)
{
int state;
if(rc){
net__socket_close(mosq);
pthread_mutex_lock(&mosq->state_mutex);
if(mosq->state == mosq_cs_disconnecting){
state = mosquitto__get_state(mosq);
if(state == mosq_cs_disconnecting || state == mosq_cs_disconnected){
rc = MOSQ_ERR_SUCCESS;
}
pthread_mutex_unlock(&mosq->state_mutex);
pthread_mutex_lock(&mosq->callback_mutex);
if(mosq->on_disconnect){
mosq->in_callback = true;

View File

@@ -94,7 +94,7 @@ enum mosquitto_client_state {
mosq_cs_new = 0,
mosq_cs_connected = 1,
mosq_cs_disconnecting = 2,
mosq_cs_connect_async = 3,
mosq_cs_active = 3,
mosq_cs_connect_pending = 4,
mosq_cs_connect_srv = 5,
mosq_cs_disconnect_ws = 6,
@@ -107,7 +107,6 @@ enum mosquitto_client_state {
mosq_cs_socks5_userpass_reply = 13,
mosq_cs_socks5_send_userpass = 14,
mosq_cs_expiring = 15,
mosq_cs_connecting = 16,
mosq_cs_duplicate = 17, /* client that has been taken over by another with the same id */
mosq_cs_disconnect_with_will = 18,
mosq_cs_disused = 19, /* client that has been added to the disused list to be freed */

View File

@@ -34,6 +34,7 @@ Contributors:
#include "net_mosq.h"
#include "packet_mosq.h"
#include "read_handle.h"
#include "util_mosq.h"
#ifdef WITH_BROKER
# include "sys_tree.h"
# include "send_mosq.h"
@@ -218,10 +219,7 @@ int packet__write(struct mosquitto *mosq)
}
pthread_mutex_unlock(&mosq->out_packet_mutex);
pthread_mutex_lock(&mosq->state_mutex);
state = mosq->state;
pthread_mutex_unlock(&mosq->state_mutex);
state = mosquitto__get_state(mosq);
#if defined(WITH_TLS) && !defined(WITH_BROKER)
if((state == mosq_cs_connect_pending) || mosq->want_connect){
#else
@@ -329,10 +327,8 @@ int packet__read(struct mosquitto *mosq)
if(mosq->sock == INVALID_SOCKET){
return MOSQ_ERR_NO_CONN;
}
pthread_mutex_lock(&mosq->state_mutex);
state = mosq->state;
pthread_mutex_unlock(&mosq->state_mutex);
state = mosquitto__get_state(mosq);
if(state == mosq_cs_connect_pending){
return MOSQ_ERR_SUCCESS;
}
@@ -358,7 +354,7 @@ int packet__read(struct mosquitto *mosq)
#ifdef WITH_BROKER
G_BYTES_RECEIVED_INC(1);
/* Clients must send CONNECT as their first command. */
if(!(mosq->bridge) && mosq->state == mosq_cs_new && (byte&0xF0) != CMD_CONNECT){
if(!(mosq->bridge) && mosq->state == mosq_cs_connected && (byte&0xF0) != CMD_CONNECT){
return MOSQ_ERR_PROTOCOL;
}
#endif

View File

@@ -112,8 +112,11 @@ int socks5__send(struct mosquitto *mosq)
struct in6_addr addr_ipv6;
int ipv4_pton_result;
int ipv6_pton_result;
int state;
if(mosq->state == mosq_cs_socks5_new){
state = mosquitto__get_state(mosq);
if(state == mosq_cs_socks5_new){
packet = mosquitto__calloc(1, sizeof(struct mosquitto__packet));
if(!packet) return MOSQ_ERR_NOMEM;
@@ -134,9 +137,7 @@ int socks5__send(struct mosquitto *mosq)
packet->payload[2] = SOCKS_AUTH_NONE;
}
pthread_mutex_lock(&mosq->state_mutex);
mosq->state = mosq_cs_socks5_start;
pthread_mutex_unlock(&mosq->state_mutex);
mosquitto__set_state(mosq, mosq_cs_socks5_start);
mosq->in_packet.pos = 0;
mosq->in_packet.packet_length = 2;
@@ -149,7 +150,7 @@ int socks5__send(struct mosquitto *mosq)
}
return packet__queue(mosq, packet);
}else if(mosq->state == mosq_cs_socks5_auth_ok){
}else if(state == mosq_cs_socks5_auth_ok){
packet = mosquitto__calloc(1, sizeof(struct mosquitto__packet));
if(!packet) return MOSQ_ERR_NOMEM;
@@ -201,9 +202,7 @@ int socks5__send(struct mosquitto *mosq)
packet->payload[1] = 0x01;
packet->payload[2] = 0x00;
pthread_mutex_lock(&mosq->state_mutex);
mosq->state = mosq_cs_socks5_request;
pthread_mutex_unlock(&mosq->state_mutex);
mosquitto__set_state(mosq, mosq_cs_socks5_request);
mosq->in_packet.pos = 0;
mosq->in_packet.packet_length = 5;
@@ -216,7 +215,7 @@ int socks5__send(struct mosquitto *mosq)
}
return packet__queue(mosq, packet);
}else if(mosq->state == mosq_cs_socks5_send_userpass){
}else if(state == mosq_cs_socks5_send_userpass){
packet = mosquitto__calloc(1, sizeof(struct mosquitto__packet));
if(!packet) return MOSQ_ERR_NOMEM;
@@ -232,9 +231,7 @@ int socks5__send(struct mosquitto *mosq)
packet->payload[2+ulen] = plen;
memcpy(&(packet->payload[3+ulen]), mosq->socks5_password, plen);
pthread_mutex_lock(&mosq->state_mutex);
mosq->state = mosq_cs_socks5_userpass_reply;
pthread_mutex_unlock(&mosq->state_mutex);
mosquitto__set_state(mosq, mosq_cs_socks5_userpass_reply);
mosq->in_packet.pos = 0;
mosq->in_packet.packet_length = 2;
@@ -256,8 +253,10 @@ int socks5__read(struct mosquitto *mosq)
ssize_t len;
uint8_t *payload;
uint8_t i;
int state;
if(mosq->state == mosq_cs_socks5_start){
state = mosquitto__get_state(mosq);
if(state == mosq_cs_socks5_start){
while(mosq->in_packet.to_process > 0){
len = net__read(mosq, &(mosq->in_packet.payload[mosq->in_packet.pos]), mosq->in_packet.to_process);
if(len > 0){
@@ -289,17 +288,17 @@ int socks5__read(struct mosquitto *mosq)
switch(mosq->in_packet.payload[1]){
case SOCKS_AUTH_NONE:
packet__cleanup(&mosq->in_packet);
mosq->state = mosq_cs_socks5_auth_ok;
mosquitto__set_state(mosq, mosq_cs_socks5_auth_ok);
return socks5__send(mosq);
case SOCKS_AUTH_USERPASS:
packet__cleanup(&mosq->in_packet);
mosq->state = mosq_cs_socks5_send_userpass;
mosquitto__set_state(mosq, mosq_cs_socks5_send_userpass);
return socks5__send(mosq);
default:
packet__cleanup(&mosq->in_packet);
return MOSQ_ERR_AUTH;
}
}else if(mosq->state == mosq_cs_socks5_userpass_reply){
}else if(state == mosq_cs_socks5_userpass_reply){
while(mosq->in_packet.to_process > 0){
len = net__read(mosq, &(mosq->in_packet.payload[mosq->in_packet.pos]), mosq->in_packet.to_process);
if(len > 0){
@@ -330,7 +329,7 @@ int socks5__read(struct mosquitto *mosq)
}
if(mosq->in_packet.payload[1] == 0){
packet__cleanup(&mosq->in_packet);
mosq->state = mosq_cs_socks5_auth_ok;
mosquitto__set_state(mosq, mosq_cs_socks5_auth_ok);
return socks5__send(mosq);
}else{
i = mosq->in_packet.payload[1];
@@ -355,7 +354,7 @@ int socks5__read(struct mosquitto *mosq)
}
return MOSQ_ERR_PROXY;
}
}else if(mosq->state == mosq_cs_socks5_request){
}else if(state == mosq_cs_socks5_request){
while(mosq->in_packet.to_process > 0){
len = net__read(mosq, &(mosq->in_packet.payload[mosq->in_packet.pos]), mosq->in_packet.to_process);
if(len > 0){
@@ -423,7 +422,7 @@ int socks5__read(struct mosquitto *mosq)
if(mosq->in_packet.payload[1] == 0){
/* Auth passed */
packet__cleanup(&mosq->in_packet);
mosq->state = mosq_cs_new;
mosquitto__set_state(mosq, mosq_cs_new);
if(mosq->socks5_host){
int rc = net__socket_connect_step3(mosq, mosq->host);
if(rc) return rc;
@@ -432,7 +431,7 @@ int socks5__read(struct mosquitto *mosq)
}else{
i = mosq->in_packet.payload[1];
packet__cleanup(&mosq->in_packet);
mosq->state = mosq_cs_socks5_new;
mosquitto__set_state(mosq, mosq_cs_socks5_new);
switch(i){
case SOCKS_REPLY_CONNECTION_NOT_ALLOWED:
return MOSQ_ERR_AUTH;

View File

@@ -91,9 +91,7 @@ int mosquitto_connect_srv(struct mosquitto *mosq, const char *host, int keepaliv
mosquitto__free(h);
}
pthread_mutex_lock(&mosq->state_mutex);
mosq->state = mosq_cs_connect_srv;
pthread_mutex_unlock(&mosq->state_mutex);
mosquitto__set_state(mosq_cs_connect_srv);
mosq->keepalive = keepalive;

View File

@@ -22,6 +22,7 @@ Contributors:
#include "mosquitto_internal.h"
#include "net_mosq.h"
#include "util_mosq.h"
void *mosquitto__thread_main(void *obj);
@@ -89,9 +90,7 @@ void *mosquitto__thread_main(void *obj)
if(!mosq) return NULL;
do{
pthread_mutex_lock(&mosq->state_mutex);
state = mosq->state;
pthread_mutex_unlock(&mosq->state_mutex);
state = mosquitto__get_state(mosq);
if(state == mosq_cs_new){
#ifdef WIN32
Sleep(10);
@@ -103,10 +102,6 @@ void *mosquitto__thread_main(void *obj)
}
}while(1);
if(state == mosq_cs_connect_async){
mosquitto_reconnect(mosq);
}
if(!mosq->keepalive){
/* Sleep for a day if keepalive disabled. */
mosquitto_loop_forever(mosq, 1000*86400, 1);

View File

@@ -89,11 +89,8 @@ int mosquitto__check_keepalive(struct mosquitto *mosq)
if(mosq->keepalive && mosq->sock != INVALID_SOCKET &&
(now >= next_msg_out || now - last_msg_in >= mosq->keepalive)){
pthread_mutex_lock(&mosq->state_mutex);
state = mosq->state;
pthread_mutex_unlock(&mosq->state_mutex);
if(state == mosq_cs_connected && mosq->ping_t == 0){
state = mosquitto__get_state(mosq);
if(state == mosq_cs_active && mosq->ping_t == 0){
send__pingreq(mosq);
/* Reset last msg times to give the server time to send a pingresp */
pthread_mutex_lock(&mosq->msgtime_mutex);
@@ -105,13 +102,12 @@ int mosquitto__check_keepalive(struct mosquitto *mosq)
net__socket_close(db, mosq);
#else
net__socket_close(mosq);
pthread_mutex_lock(&mosq->state_mutex);
if(mosq->state == mosq_cs_disconnecting){
state = mosquitto__get_state(mosq);
if(state == mosq_cs_disconnecting){
rc = MOSQ_ERR_SUCCESS;
}else{
rc = MOSQ_ERR_KEEPALIVE;
}
pthread_mutex_unlock(&mosq->state_mutex);
pthread_mutex_lock(&mosq->callback_mutex);
if(mosq->on_disconnect){
mosq->in_callback = true;
@@ -357,3 +353,30 @@ int util__random_bytes(void *bytes, int count)
#endif
return rc;
}
int mosquitto__set_state(struct mosquitto *mosq, enum mosquitto_client_state state)
{
pthread_mutex_lock(&mosq->state_mutex);
#ifdef WITH_BROKER
if(mosq->state != mosq_cs_disused)
#endif
{
mosq->state = state;
}
pthread_mutex_unlock(&mosq->state_mutex);
return MOSQ_ERR_SUCCESS;
}
enum mosquitto_client_state mosquitto__get_state(struct mosquitto *mosq)
{
enum mosquitto_client_state state;
pthread_mutex_lock(&mosq->state_mutex);
state = mosq->state;
pthread_mutex_unlock(&mosq->state_mutex);
return state;
}

View File

@@ -33,6 +33,9 @@ int mosquitto__check_keepalive(struct mosquitto *mosq);
uint16_t mosquitto__mid_generate(struct mosquitto *mosq);
FILE *mosquitto__fopen(const char *path, const char *mode, bool restrict_read);
int mosquitto__set_state(struct mosquitto *mosq, enum mosquitto_client_state state);
enum mosquitto_client_state mosquitto__get_state(struct mosquitto *mosq);
#ifdef WITH_TLS
int mosquitto__hex2bin_sha1(const char *hex, unsigned char **bin);
int mosquitto__hex2bin(const char *hex, unsigned char *bin, int bin_max_len);

View File

@@ -127,7 +127,7 @@ int bridge__connect_step1(struct mosquitto_db *db, struct mosquitto *context)
if(!context || !context->bridge) return MOSQ_ERR_INVAL;
context__set_state(context, mosq_cs_new);
mosquitto__set_state(context, mosq_cs_new);
context->sock = INVALID_SOCKET;
context->last_msg_in = mosquitto_time();
context->next_msg_out = mosquitto_time() + context->bridge->keepalive;
@@ -247,7 +247,7 @@ int bridge__connect_step2(struct mosquitto_db *db, struct mosquitto *context)
HASH_ADD(hh_sock, db->contexts_by_sock, sock, sizeof(context->sock), context);
if(rc == MOSQ_ERR_CONN_PENDING){
context__set_state(context, mosq_cs_connect_pending);
mosquitto__set_state(context, mosq_cs_connect_pending);
}
return rc;
}
@@ -306,7 +306,7 @@ int bridge__connect(struct mosquitto_db *db, struct mosquitto *context)
if(!context || !context->bridge) return MOSQ_ERR_INVAL;
context__set_state(context, mosq_cs_new);
mosquitto__set_state(context, mosq_cs_new);
context->sock = INVALID_SOCKET;
context->last_msg_in = mosquitto_time();
context->next_msg_out = mosquitto_time() + context->bridge->keepalive;
@@ -400,7 +400,7 @@ int bridge__connect(struct mosquitto_db *db, struct mosquitto *context)
return rc;
}else if(rc == MOSQ_ERR_CONN_PENDING){
context__set_state(context, mosq_cs_connect_pending);
mosquitto__set_state(context, mosq_cs_connect_pending);
}
HASH_ADD(hh_sock, db->contexts_by_sock, sock, sizeof(context->sock), context);

View File

@@ -38,7 +38,7 @@ struct mosquitto *context__init(struct mosquitto_db *db, mosq_sock_t sock)
if(!context) return NULL;
context->pollfd_index = -1;
context__set_state(context, mosq_cs_new);
mosquitto__set_state(context, mosq_cs_new);
context->sock = sock;
context->last_msg_in = mosquitto_time();
context->next_msg_out = mosquitto_time() + 60;
@@ -243,14 +243,14 @@ void context__disconnect(struct mosquitto_db *db, struct mosquitto *context)
}else{
session_expiry__add(db, context);
}
context__set_state(context, mosq_cs_disconnected);
mosquitto__set_state(context, mosq_cs_disconnected);
}
void context__add_to_disused(struct mosquitto_db *db, struct mosquitto *context)
{
if(context->state == mosq_cs_disused) return;
context__set_state(context, mosq_cs_disused);
mosquitto__set_state(context, mosq_cs_disused);
if(context->id){
context__remove_from_by_id(db, context);
@@ -308,10 +308,3 @@ void context__remove_from_by_id(struct mosquitto_db *db, struct mosquitto *conte
}
}
void context__set_state(struct mosquitto *context, enum mosquitto_client_state state)
{
if(context->state != mosq_cs_disused){
context->state = state;
}
}

View File

@@ -964,11 +964,11 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context)
uint32_t expiry_interval;
if(!context || context->sock == INVALID_SOCKET
|| (context->state == mosq_cs_connected && !context->id)){
|| (context->state == mosq_cs_active && !context->id)){
return MOSQ_ERR_INVAL;
}
if(context->state != mosq_cs_connected){
if(context->state != mosq_cs_active){
return MOSQ_ERR_SUCCESS;
}

View File

@@ -54,7 +54,7 @@ int handle__auth(struct mosquitto_db *db, struct mosquitto *context)
return MOSQ_ERR_PROTOCOL;
}
if((reason_code == MQTT_RC_REAUTHENTICATE && context->state != mosq_cs_connected)
if((reason_code == MQTT_RC_REAUTHENTICATE && context->state != mosq_cs_active)
|| (reason_code == MQTT_RC_CONTINUE_AUTHENTICATION
&& context->state != mosq_cs_authenticating && context->state != mosq_cs_reauthenticating)){
@@ -94,11 +94,11 @@ int handle__auth(struct mosquitto_db *db, struct mosquitto *context)
if(reason_code == MQTT_RC_REAUTHENTICATE){
/* This is a re-authentication attempt */
context__set_state(context, mosq_cs_reauthenticating);
mosquitto__set_state(context, mosq_cs_reauthenticating);
rc = mosquitto_security_auth_start(db, context, true, auth_data, auth_data_len, &auth_data_out, &auth_data_out_len);
}else{
if(context->state != mosq_cs_reauthenticating){
context__set_state(context, mosq_cs_authenticating);
mosquitto__set_state(context, mosq_cs_authenticating);
}
rc = mosquitto_security_auth_continue(db, context, auth_data, auth_data_len, &auth_data_out, &auth_data_out_len);
}
@@ -107,7 +107,7 @@ int handle__auth(struct mosquitto_db *db, struct mosquitto *context)
if(context->state == mosq_cs_authenticating){
return connect__on_authorised(db, context, auth_data_out, auth_data_out_len);
}else{
context__set_state(context, mosq_cs_connected);
mosquitto__set_state(context, mosq_cs_active);
rc = send__auth(db, context, MQTT_RC_SUCCESS, auth_data_out, auth_data_out_len);
free(auth_data_out);
return rc;

View File

@@ -108,7 +108,7 @@ int handle__connack(struct mosquitto_db *db, struct mosquitto *context)
}
}
}
context__set_state(context, mosq_cs_connected);
mosquitto__set_state(context, mosq_cs_active);
return MOSQ_ERR_SUCCESS;
case CONNACK_REFUSED_PROTOCOL_VERSION:
if(context->bridge){

View File

@@ -166,7 +166,7 @@ int connect__on_authorised(struct mosquitto_db *db, struct mosquitto *context, v
found_context->clean_start = true;
found_context->session_expiry_interval = 0;
context__set_state(found_context, mosq_cs_duplicate);
mosquitto__set_state(found_context, mosq_cs_duplicate);
do_disconnect(db, found_context, MOSQ_ERR_SUCCESS);
}
@@ -266,7 +266,7 @@ int connect__on_authorised(struct mosquitto_db *db, struct mosquitto *context, v
}
free(auth_data_out);
context__set_state(context, mosq_cs_connected);
mosquitto__set_state(context, mosq_cs_active);
rc = send__connack(db, context, connect_ack, CONNACK_ACCEPTED, connack_props);
mosquitto_property_free_all(&connack_props);
return rc;
@@ -835,7 +835,7 @@ int handle__connect(struct mosquitto_db *db, struct mosquitto *context)
if(rc == MOSQ_ERR_SUCCESS){
return connect__on_authorised(db, context, auth_data_out, auth_data_out_len);
}else if(rc == MOSQ_ERR_AUTH_CONTINUE){
context__set_state(context, mosq_cs_authenticating);
mosquitto__set_state(context, mosq_cs_authenticating);
rc = send__auth(db, context, MQTT_RC_CONTINUE_AUTHENTICATION, auth_data_out, auth_data_out_len);
free(auth_data_out);
return rc;

View File

@@ -21,6 +21,7 @@ Contributors:
#include "packet_mosq.h"
#include "property_mosq.h"
#include "send_mosq.h"
#include "util_mosq.h"
#include "will_mosq.h"
@@ -65,10 +66,10 @@ int handle__disconnect(struct mosquitto_db *db, struct mosquitto *context)
}
}
if(reason_code == MQTT_RC_DISCONNECT_WITH_WILL_MSG){
context__set_state(context, mosq_cs_disconnect_with_will);
mosquitto__set_state(context, mosq_cs_disconnect_with_will);
}else{
will__clear(context);
context__set_state(context, mosq_cs_disconnecting);
mosquitto__set_state(context, mosq_cs_disconnecting);
}
do_disconnect(db, context, MOSQ_ERR_SUCCESS);
return MOSQ_ERR_SUCCESS;

View File

@@ -61,7 +61,7 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context)
bool match;
#endif
if(context->state != mosq_cs_connected){
if(context->state != mosq_cs_active){
return MOSQ_ERR_PROTOCOL;
}

View File

@@ -46,7 +46,7 @@ int handle__subscribe(struct mosquitto_db *db, struct mosquitto *context)
if(!context) return MOSQ_ERR_INVAL;
if(context->state != mosq_cs_connected){
if(context->state != mosq_cs_active){
return MOSQ_ERR_PROTOCOL;
}

View File

@@ -39,7 +39,7 @@ int handle__unsubscribe(struct mosquitto_db *db, struct mosquitto *context)
if(!context) return MOSQ_ERR_INVAL;
if(context->state != mosq_cs_connected){
if(context->state != mosq_cs_active){
return MOSQ_ERR_PROTOCOL;
}
log__printf(NULL, MOSQ_LOG_DEBUG, "Received UNSUBSCRIBE from %s", context->id);

View File

@@ -489,7 +489,7 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li
log__printf(NULL, MOSQ_LOG_NOTICE, "Expiring persistent client %s due to timeout.", id);
G_CLIENTS_EXPIRED_INC();
context->session_expiry_interval = 0;
context__set_state(context, mosq_cs_expiring);
mosquitto__set_state(context, mosq_cs_expiring);
do_disconnect(db, context, MOSQ_ERR_SUCCESS);
}
}
@@ -660,7 +660,7 @@ void do_disconnect(struct mosquitto_db *db, struct mosquitto *context, int reaso
}
if(context->state != mosq_cs_disconnecting && context->state != mosq_cs_disconnect_with_will){
context__set_state(context, mosq_cs_disconnect_ws);
mosquitto__set_state(context, mosq_cs_disconnect_ws);
}
if(context->wsi){
libwebsocket_callback_on_writable(context->ws_context, context->wsi);
@@ -800,7 +800,7 @@ static void loop_handle_reads_writes(struct mosquitto_db *db, struct pollfd *pol
len = sizeof(int);
if(!getsockopt(context->sock, SOL_SOCKET, SO_ERROR, (char *)&err, &len)){
if(err == 0){
context__set_state(context, mosq_cs_new);
mosquitto__set_state(context, mosq_cs_new);
#if defined(WITH_ADNS) && defined(WITH_BRIDGE)
if(context->bridge){
bridge__connect_step3(db, context);

View File

@@ -653,7 +653,6 @@ void context__add_to_disused(struct mosquitto_db *db, struct mosquitto *context)
void context__free_disused(struct mosquitto_db *db);
void context__send_will(struct mosquitto_db *db, struct mosquitto *context);
void context__remove_from_by_id(struct mosquitto_db *db, struct mosquitto *context);
void context__set_state(struct mosquitto *context, enum mosquitto_client_state state);
int connect__on_authorised(struct mosquitto_db *db, struct mosquitto *context, void *auth_data_out, uint16_t auth_data_out_len);

View File

@@ -948,7 +948,7 @@ static void security__disconnect_auth(struct mosquitto_db *db, struct mosquitto
if(context->protocol == mosq_p_mqtt5){
send__disconnect(context, MQTT_RC_ADMINISTRATIVE_ACTION, NULL);
}
context__set_state(context, mosq_cs_disconnecting);
mosquitto__set_state(context, mosq_cs_disconnecting);
do_disconnect(db, context, MOSQ_ERR_AUTH);
}
#endif
@@ -1005,7 +1005,7 @@ int mosquitto_security_apply_default(struct mosquitto_db *db)
}
if(!allow_anonymous && !context->username){
context__set_state(context, mosq_cs_disconnecting);
mosquitto__set_state(context, mosq_cs_disconnecting);
do_disconnect(db, context, MOSQ_ERR_AUTH);
continue;
}
@@ -1018,7 +1018,7 @@ int mosquitto_security_apply_default(struct mosquitto_db *db)
if(context->protocol == mosq_p_mqtt5){
send__disconnect(context, MQTT_RC_ADMINISTRATIVE_ACTION, NULL);
}
context__set_state(context, mosq_cs_disconnecting);
mosquitto__set_state(context, mosq_cs_disconnecting);
do_disconnect(db, context, MOSQ_ERR_AUTH);
continue;
}
@@ -1118,7 +1118,7 @@ int mosquitto_security_apply_default(struct mosquitto_db *db)
{
/* Username/password check only if the identity/subject check not used */
if(mosquitto_unpwd_check(db, context, context->username, context->password) != MOSQ_ERR_SUCCESS){
context__set_state(context, mosq_cs_disconnecting);
mosquitto__set_state(context, mosq_cs_disconnecting);
do_disconnect(db, context, MOSQ_ERR_AUTH);
continue;
}
@@ -1130,8 +1130,8 @@ int mosquitto_security_apply_default(struct mosquitto_db *db)
if(context->listener){
security_opts = &context->listener->security_options;
}else{
if(context->state != mosq_cs_connected){
context__set_state(context, mosq_cs_disconnecting);
if(context->state != mosq_cs_active){
mosquitto__set_state(context, mosq_cs_disconnecting);
do_disconnect(db, context, MOSQ_ERR_AUTH);
continue;
}

View File

@@ -32,6 +32,8 @@ c : test-compile
./01-will-unpwd-set.py $@/01-will-unpwd-set.test
./02-subscribe-qos0.py $@/02-subscribe-qos0.test
./02-subscribe-qos1.py $@/02-subscribe-qos1.test
./02-subscribe-qos1.py $@/02-subscribe-qos1-async1.test
./02-subscribe-qos1.py $@/02-subscribe-qos1-async2.test
./02-subscribe-qos2.py $@/02-subscribe-qos2.test
./02-unsubscribe-multiple-v5.py $@/02-unsubscribe-multiple-v5.test
./02-unsubscribe-v5.py $@/02-unsubscribe-v5.test

View File

@@ -0,0 +1,78 @@
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <mosquitto.h>
/* mosquitto_connect_async() test, with mosquitto_loop_start() called before mosquitto_connect_async(). */
static int run = -1;
static bool should_run = true;
void on_connect(struct mosquitto *mosq, void *obj, int rc)
{
if(rc){
exit(1);
}else{
mosquitto_subscribe(mosq, NULL, "qos1/test", 1);
}
}
void on_disconnect(struct mosquitto *mosq, void *obj, int rc)
{
run = rc;
}
void on_subscribe(struct mosquitto *mosq, void *obj, int mid, int qos_count, const int *granted_qos)
{
//mosquitto_disconnect(mosq);
should_run = false;
}
int main(int argc, char *argv[])
{
int rc;
struct mosquitto *mosq;
int port = atoi(argv[1]);
mosquitto_lib_init();
mosq = mosquitto_new("subscribe-qos1-test", true, NULL);
mosquitto_connect_callback_set(mosq, on_connect);
mosquitto_disconnect_callback_set(mosq, on_disconnect);
mosquitto_subscribe_callback_set(mosq, on_subscribe);
printf("ok, about to call connect_async\n");
// this only works if loop_start is first. with loop_start second,
// it fails on both 1.6.4 _and_ 1.6.5
// in this order, 1.6.4 works and 1.6.5 fails.
rc = mosquitto_loop_start(mosq);
printf("loop_start returned rc: %d\n", rc);
if (rc) {
printf("which is: %s\n", mosquitto_strerror(rc));
}
// not sure which rc you want to be returned....
rc = mosquitto_connect_async(mosq, "localhost", port, 60);
printf("connect async returned rc: %d\n", rc);
if (rc) {
printf("which is: %s\n", mosquitto_strerror(rc));
}
printf("ok, so we can start just waiting now, loop_start will run in it's thread\n");
/* 10 millis to be system polite */
//struct timespec tv = { 0, 10e6 };
struct timespec tv = { 1, 0 };
while(should_run){
nanosleep(&tv, NULL);
printf("...waiting...\n");
}
printf("Already exited should_run....\n");
mosquitto_disconnect(mosq);
mosquitto_loop_stop(mosq, false);
mosquitto_lib_cleanup();
return run;
}

View File

@@ -0,0 +1,74 @@
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <mosquitto.h>
/* mosquitto_connect_async() test, with mosquitto_loop_start() called after mosquitto_connect_async(). */
static int run = -1;
static bool should_run = true;
void on_connect(struct mosquitto *mosq, void *obj, int rc)
{
if(rc){
exit(1);
}else{
mosquitto_subscribe(mosq, NULL, "qos1/test", 1);
}
}
void on_disconnect(struct mosquitto *mosq, void *obj, int rc)
{
run = rc;
}
void on_subscribe(struct mosquitto *mosq, void *obj, int mid, int qos_count, const int *granted_qos)
{
//mosquitto_disconnect(mosq);
should_run = false;
}
int main(int argc, char *argv[])
{
int rc;
struct mosquitto *mosq;
int port = atoi(argv[1]);
mosquitto_lib_init();
mosq = mosquitto_new("subscribe-qos1-test", true, NULL);
mosquitto_connect_callback_set(mosq, on_connect);
mosquitto_disconnect_callback_set(mosq, on_disconnect);
mosquitto_subscribe_callback_set(mosq, on_subscribe);
printf("ok, about to call connect_async\n");
rc = mosquitto_connect_async(mosq, "localhost", port, 60);
printf("connect async returned rc: %d\n", rc);
if (rc) {
printf("which is: %s\n", mosquitto_strerror(rc));
}
rc = mosquitto_loop_start(mosq);
printf("loop_start returned rc: %d\n", rc);
if (rc) {
printf("which is: %s\n", mosquitto_strerror(rc));
}
printf("ok, so we can start just waiting now, loop_start will run in it's thread\n");
/* 10 millis to be system polite */
//struct timespec tv = { 0, 10e6 };
struct timespec tv = { 1, 0 };
while(should_run){
nanosleep(&tv, NULL);
printf("...waiting...\n");
}
printf("Already exited should_run....\n");
mosquitto_disconnect(mosq);
mosquitto_loop_stop(mosq, false);
mosquitto_lib_cleanup();
return run;
}

View File

@@ -13,6 +13,8 @@ SRC = \
01-server-keepalive-pingreq.c \
02-subscribe-qos0.c \
02-subscribe-qos1.c \
02-subscribe-qos1-async1.c \
02-subscribe-qos1-async2.c \
02-subscribe-qos2.c \
02-unsubscribe.c \
02-unsubscribe-v5.c \

View File

@@ -14,6 +14,8 @@ tests = [
(1, ['./02-subscribe-qos0.py', 'c/02-subscribe-qos0.test']),
(1, ['./02-subscribe-qos1.py', 'c/02-subscribe-qos1.test']),
(1, ['./02-subscribe-qos1.py', 'c/02-subscribe-qos1-async1.test']),
(1, ['./02-subscribe-qos1.py', 'c/02-subscribe-qos1-async2.test']),
(1, ['./02-subscribe-qos2.py', 'c/02-subscribe-qos2.test']),
(1, ['./02-unsubscribe-multiple-v5.py', 'c/02-unsubscribe-multiple-v5.test']),
(1, ['./02-unsubscribe-v5.py', 'c/02-unsubscribe-v5.test']),