diff --git a/include/mosquitto_broker.h b/include/mosquitto_broker.h index 376a1293..0b2ae3b8 100644 --- a/include/mosquitto_broker.h +++ b/include/mosquitto_broker.h @@ -284,11 +284,7 @@ struct mosquitto_evt_persist_subscription { }; -/* Data for the MOSQ_EVT_PERSIST_CLIENT_MSG_ADD/_DELETE/_UPDATE event */ -/* NOTE: The persistence interface is currently marked as unstable, which means - * it may change in a future minor release. */ -struct mosquitto_evt_persist_client_msg { - void *future; +struct mosquitto_client_msg { const char *client_id; uint64_t cmsg_id; uint64_t store_id; @@ -303,6 +299,15 @@ struct mosquitto_evt_persist_client_msg { void *future2[8]; }; +/* Data for the MOSQ_EVT_PERSIST_CLIENT_MSG_ADD/_DELETE/_UPDATE event */ +/* NOTE: The persistence interface is currently marked as unstable, which means + * it may change in a future minor release. */ +struct mosquitto_evt_persist_client_msg { + void *future; + struct mosquitto_client_msg data; + void *future2[8]; +}; + struct mosquitto_base_msg { uint64_t store_id; diff --git a/plugins/persist-sqlite/client_msgs.c b/plugins/persist-sqlite/client_msgs.c index 65b1cb48..08f55d3a 100644 --- a/plugins/persist-sqlite/client_msgs.c +++ b/plugins/persist-sqlite/client_msgs.c @@ -32,16 +32,16 @@ int persist_sqlite__client_msg_add_cb(int event, void *event_data, void *userdat UNUSED(event); - if(sqlite3_bind_text(ms->client_msg_add_stmt, 1, ed->client_id, (int)strlen(ed->client_id), SQLITE_STATIC) == SQLITE_OK - && sqlite3_bind_int64(ms->client_msg_add_stmt, 2, (int64_t)ed->cmsg_id) == SQLITE_OK - && sqlite3_bind_int64(ms->client_msg_add_stmt, 3, (int64_t)ed->store_id) == SQLITE_OK - && sqlite3_bind_int(ms->client_msg_add_stmt, 4, ed->dup) == SQLITE_OK - && sqlite3_bind_int(ms->client_msg_add_stmt, 5, ed->direction) == SQLITE_OK - && sqlite3_bind_int(ms->client_msg_add_stmt, 6, ed->mid) == SQLITE_OK - && sqlite3_bind_int(ms->client_msg_add_stmt, 7, ed->qos) == SQLITE_OK - && sqlite3_bind_int(ms->client_msg_add_stmt, 8, ed->retain) == SQLITE_OK - && sqlite3_bind_int(ms->client_msg_add_stmt, 9, ed->state) == SQLITE_OK - && sqlite3_bind_int(ms->client_msg_add_stmt, 10, (int)ed->subscription_identifier) == SQLITE_OK + if(sqlite3_bind_text(ms->client_msg_add_stmt, 1, ed->data.client_id, (int)strlen(ed->data.client_id), SQLITE_STATIC) == SQLITE_OK + && sqlite3_bind_int64(ms->client_msg_add_stmt, 2, (int64_t)ed->data.cmsg_id) == SQLITE_OK + && sqlite3_bind_int64(ms->client_msg_add_stmt, 3, (int64_t)ed->data.store_id) == SQLITE_OK + && sqlite3_bind_int(ms->client_msg_add_stmt, 4, ed->data.dup) == SQLITE_OK + && sqlite3_bind_int(ms->client_msg_add_stmt, 5, ed->data.direction) == SQLITE_OK + && sqlite3_bind_int(ms->client_msg_add_stmt, 6, ed->data.mid) == SQLITE_OK + && sqlite3_bind_int(ms->client_msg_add_stmt, 7, ed->data.qos) == SQLITE_OK + && sqlite3_bind_int(ms->client_msg_add_stmt, 8, ed->data.retain) == SQLITE_OK + && sqlite3_bind_int(ms->client_msg_add_stmt, 9, ed->data.state) == SQLITE_OK + && sqlite3_bind_int(ms->client_msg_add_stmt, 10, (int)ed->data.subscription_identifier) == SQLITE_OK ){ @@ -67,9 +67,9 @@ int persist_sqlite__client_msg_remove_cb(int event, void *event_data, void *user UNUSED(event); - if(sqlite3_bind_text(ms->client_msg_remove_stmt, 1, ed->client_id, (int)strlen(ed->client_id), SQLITE_STATIC) == SQLITE_OK - && sqlite3_bind_int64(ms->client_msg_remove_stmt, 2, (int64_t)ed->store_id) == SQLITE_OK - && sqlite3_bind_int(ms->client_msg_remove_stmt, 3, ed->direction) == SQLITE_OK + if(sqlite3_bind_text(ms->client_msg_remove_stmt, 1, ed->data.client_id, (int)strlen(ed->data.client_id), SQLITE_STATIC) == SQLITE_OK + && sqlite3_bind_int64(ms->client_msg_remove_stmt, 2, (int64_t)ed->data.store_id) == SQLITE_OK + && sqlite3_bind_int(ms->client_msg_remove_stmt, 3, ed->data.direction) == SQLITE_OK ){ ms->event_count++; @@ -94,10 +94,10 @@ int persist_sqlite__client_msg_update_cb(int event, void *event_data, void *user UNUSED(event); - if(sqlite3_bind_int(ms->client_msg_update_stmt, 1, ed->state) == SQLITE_OK - && sqlite3_bind_int(ms->client_msg_update_stmt, 2, ed->dup) == SQLITE_OK - && sqlite3_bind_text(ms->client_msg_update_stmt, 3, ed->client_id, (int)strlen(ed->client_id), SQLITE_STATIC) == SQLITE_OK - && sqlite3_bind_int64(ms->client_msg_update_stmt, 4, (int64_t)ed->store_id) == SQLITE_OK + if(sqlite3_bind_int(ms->client_msg_update_stmt, 1, ed->data.state) == SQLITE_OK + && sqlite3_bind_int(ms->client_msg_update_stmt, 2, ed->data.dup) == SQLITE_OK + && sqlite3_bind_text(ms->client_msg_update_stmt, 3, ed->data.client_id, (int)strlen(ed->data.client_id), SQLITE_STATIC) == SQLITE_OK + && sqlite3_bind_int64(ms->client_msg_update_stmt, 4, (int64_t)ed->data.store_id) == SQLITE_OK ){ ms->event_count++; diff --git a/plugins/persist-sqlite/restore.c b/plugins/persist-sqlite/restore.c index bc9b135f..cf1c0b55 100644 --- a/plugins/persist-sqlite/restore.c +++ b/plugins/persist-sqlite/restore.c @@ -273,7 +273,7 @@ static int subscription_restore(struct mosquitto_sqlite *ms) static int base_msg_restore(struct mosquitto_sqlite *ms) { sqlite3_stmt *stmt; - struct mosquitto_base_msg msg; + struct mosquitto_base_msg base_msg; int rc; long count = 0, failed = 0; const char *str; @@ -290,39 +290,39 @@ static int base_msg_restore(struct mosquitto_sqlite *ms) } while(sqlite3_step(stmt) == SQLITE_ROW){ - memset(&msg, 0, sizeof(msg)); - msg.store_id = (uint64_t)sqlite3_column_int64(stmt, 0); - msg.expiry_time = (time_t)sqlite3_column_int64(stmt, 1); + memset(&base_msg, 0, sizeof(base_msg)); + base_msg.store_id = (uint64_t)sqlite3_column_int64(stmt, 0); + base_msg.expiry_time = (time_t)sqlite3_column_int64(stmt, 1); str = (const char *)sqlite3_column_text(stmt, 2); if(str){ - msg.topic = strdup(str); - if(!msg.topic){ + base_msg.topic = strdup(str); + if(!base_msg.topic){ failed++; continue; } } - msg.source_id = (char *)sqlite3_column_text(stmt, 4); - msg.source_username = (char *)sqlite3_column_text(stmt, 5); + base_msg.source_id = (char *)sqlite3_column_text(stmt, 4); + base_msg.source_username = (char *)sqlite3_column_text(stmt, 5); payload = (const void *)sqlite3_column_blob(stmt, 3); - msg.payloadlen = (uint32_t)sqlite3_column_int(stmt, 6); - if(payload && msg.payloadlen){ - msg.payload = malloc(msg.payloadlen+1); - if(!msg.payload){ - free(msg.topic); + base_msg.payloadlen = (uint32_t)sqlite3_column_int(stmt, 6); + if(payload && base_msg.payloadlen){ + base_msg.payload = malloc(base_msg.payloadlen+1); + if(!base_msg.payload){ + free(base_msg.topic); failed++; continue; } - memcpy(msg.payload, payload, msg.payloadlen); - ((uint8_t *)msg.payload)[msg.payloadlen] = 0; + memcpy(base_msg.payload, payload, base_msg.payloadlen); + ((uint8_t *)base_msg.payload)[base_msg.payloadlen] = 0; } - msg.source_mid = (uint16_t)sqlite3_column_int(stmt, 7); - msg.source_port = (uint16_t)sqlite3_column_int(stmt, 8); - msg.qos = (uint8_t)sqlite3_column_int(stmt, 9); - msg.retain = sqlite3_column_int(stmt, 10); - msg.properties = json_to_properties((const char *)sqlite3_column_text(stmt, 11)); + base_msg.source_mid = (uint16_t)sqlite3_column_int(stmt, 7); + base_msg.source_port = (uint16_t)sqlite3_column_int(stmt, 8); + base_msg.qos = (uint8_t)sqlite3_column_int(stmt, 9); + base_msg.retain = sqlite3_column_int(stmt, 10); + base_msg.properties = json_to_properties((const char *)sqlite3_column_text(stmt, 11)); - rc = mosquitto_persist_base_msg_add(&msg); + rc = mosquitto_persist_base_msg_add(&base_msg); if(rc == MOSQ_ERR_SUCCESS){ count++; }else{ @@ -339,7 +339,7 @@ static int base_msg_restore(struct mosquitto_sqlite *ms) static int client_msg_restore(struct mosquitto_sqlite *ms) { sqlite3_stmt *stmt; - struct mosquitto_evt_persist_client_msg msg; + struct mosquitto_evt_persist_client_msg client_msg; int rc; long count = 0, failed = 0; @@ -353,20 +353,20 @@ static int client_msg_restore(struct mosquitto_sqlite *ms) return MOSQ_ERR_UNKNOWN; } - memset(&msg, 0, sizeof(msg)); + memset(&client_msg, 0, sizeof(client_msg)); while(sqlite3_step(stmt) == SQLITE_ROW){ - msg.client_id = (const char *)sqlite3_column_text(stmt, 0); - msg.cmsg_id = (uint64_t)sqlite3_column_int64(stmt, 1); - msg.store_id = (uint64_t)sqlite3_column_int64(stmt, 2); - msg.dup = sqlite3_column_int(stmt, 3); - msg.direction = (uint8_t)sqlite3_column_int(stmt, 4); - msg.mid = (uint16_t)sqlite3_column_int(stmt, 5); - msg.qos = (uint8_t)sqlite3_column_int(stmt, 6); - msg.retain = sqlite3_column_int(stmt, 7); - msg.state = (uint8_t)sqlite3_column_int(stmt, 8); - msg.subscription_identifier = (uint32_t)sqlite3_column_int(stmt, 9); + client_msg.data.client_id = (const char *)sqlite3_column_text(stmt, 0); + client_msg.data.cmsg_id = (uint64_t)sqlite3_column_int64(stmt, 1); + client_msg.data.store_id = (uint64_t)sqlite3_column_int64(stmt, 2); + client_msg.data.dup = sqlite3_column_int(stmt, 3); + client_msg.data.direction = (uint8_t)sqlite3_column_int(stmt, 4); + client_msg.data.mid = (uint16_t)sqlite3_column_int(stmt, 5); + client_msg.data.qos = (uint8_t)sqlite3_column_int(stmt, 6); + client_msg.data.retain = sqlite3_column_int(stmt, 7); + client_msg.data.state = (uint8_t)sqlite3_column_int(stmt, 8); + client_msg.data.subscription_identifier = (uint32_t)sqlite3_column_int(stmt, 9); - rc = mosquitto_persist_client_msg_add(&msg); + rc = mosquitto_persist_client_msg_add(&client_msg); if(rc == MOSQ_ERR_SUCCESS){ count++; }else{ diff --git a/src/database.c b/src/database.c index 61ea333b..a260bc51 100644 --- a/src/database.c +++ b/src/database.c @@ -138,44 +138,44 @@ bool db__ready_for_queue(struct mosquitto *context, int qos, struct mosquitto_ms } -void db__msg_add_to_inflight_stats(struct mosquitto_msg_data *msg_data, struct mosquitto__client_msg *msg) +void db__msg_add_to_inflight_stats(struct mosquitto_msg_data *msg_data, struct mosquitto__client_msg *client_msg) { msg_data->inflight_count++; - msg_data->inflight_bytes += msg->base_msg->msg.payloadlen; - if(msg->qos != 0){ + msg_data->inflight_bytes += client_msg->base_msg->msg.payloadlen; + if(client_msg->data.qos != 0){ msg_data->inflight_count12++; - msg_data->inflight_bytes12 += msg->base_msg->msg.payloadlen; + msg_data->inflight_bytes12 += client_msg->base_msg->msg.payloadlen; } } -static void db__msg_remove_from_inflight_stats(struct mosquitto_msg_data *msg_data, struct mosquitto__client_msg *msg) +static void db__msg_remove_from_inflight_stats(struct mosquitto_msg_data *msg_data, struct mosquitto__client_msg *client_msg) { msg_data->inflight_count--; - msg_data->inflight_bytes -= msg->base_msg->msg.payloadlen; - if(msg->qos != 0){ + msg_data->inflight_bytes -= client_msg->base_msg->msg.payloadlen; + if(client_msg->data.qos != 0){ msg_data->inflight_count12--; - msg_data->inflight_bytes12 -= msg->base_msg->msg.payloadlen; + msg_data->inflight_bytes12 -= client_msg->base_msg->msg.payloadlen; } } -void db__msg_add_to_queued_stats(struct mosquitto_msg_data *msg_data, struct mosquitto__client_msg *msg) +void db__msg_add_to_queued_stats(struct mosquitto_msg_data *msg_data, struct mosquitto__client_msg *client_msg) { msg_data->queued_count++; - msg_data->queued_bytes += msg->base_msg->msg.payloadlen; - if(msg->qos != 0){ + msg_data->queued_bytes += client_msg->base_msg->msg.payloadlen; + if(client_msg->data.qos != 0){ msg_data->queued_count12++; - msg_data->queued_bytes12 += msg->base_msg->msg.payloadlen; + msg_data->queued_bytes12 += client_msg->base_msg->msg.payloadlen; } } -static void db__msg_remove_from_queued_stats(struct mosquitto_msg_data *msg_data, struct mosquitto__client_msg *msg) +static void db__msg_remove_from_queued_stats(struct mosquitto_msg_data *msg_data, struct mosquitto__client_msg *client_msg) { msg_data->queued_count--; - msg_data->queued_bytes -= msg->base_msg->msg.payloadlen; - if(msg->qos != 0){ + msg_data->queued_bytes -= client_msg->base_msg->msg.payloadlen; + if(client_msg->data.qos != 0){ msg_data->queued_count12--; - msg_data->queued_bytes12 -= msg->base_msg->msg.payloadlen; + msg_data->queued_bytes12 -= client_msg->base_msg->msg.payloadlen; } } @@ -366,73 +366,73 @@ static void db__message_remove_queued(struct mosquitto *context, struct mosquitt void db__message_dequeue_first(struct mosquitto *context, struct mosquitto_msg_data *msg_data) { - struct mosquitto__client_msg *msg; + struct mosquitto__client_msg *client_msg; UNUSED(context); - msg = msg_data->queued; - DL_DELETE(msg_data->queued, msg); - DL_APPEND(msg_data->inflight, msg); + client_msg = msg_data->queued; + DL_DELETE(msg_data->queued, client_msg); + DL_APPEND(msg_data->inflight, client_msg); if(msg_data->inflight_quota > 0){ msg_data->inflight_quota--; } - db__msg_remove_from_queued_stats(msg_data, msg); - db__msg_add_to_inflight_stats(msg_data, msg); + db__msg_remove_from_queued_stats(msg_data, client_msg); + db__msg_add_to_inflight_stats(msg_data, client_msg); } int db__message_delete_outgoing(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_state expect_state, int qos) { - struct mosquitto__client_msg *tail, *tmp; + struct mosquitto__client_msg *client_msg, *tmp; bool deleted = false; if(!context) return MOSQ_ERR_INVAL; - DL_FOREACH_SAFE(context->msgs_out.inflight, tail, tmp){ - if(tail->mid == mid){ - if(tail->qos != qos){ + DL_FOREACH_SAFE(context->msgs_out.inflight, client_msg, tmp){ + if(client_msg->data.mid == mid){ + if(client_msg->data.qos != qos){ return MOSQ_ERR_PROTOCOL; - }else if(qos == 2 && tail->state != expect_state){ + }else if(qos == 2 && client_msg->data.state != expect_state){ return MOSQ_ERR_PROTOCOL; } - db__message_remove_inflight(context, &context->msgs_out, tail); + db__message_remove_inflight(context, &context->msgs_out, client_msg); deleted = true; break; } } if(deleted == false){ - DL_FOREACH_SAFE(context->msgs_out.queued, tail, tmp){ - if(tail->mid == mid){ - if(tail->qos != qos){ + DL_FOREACH_SAFE(context->msgs_out.queued, client_msg, tmp){ + if(client_msg->data.mid == mid){ + if(client_msg->data.qos != qos){ return MOSQ_ERR_PROTOCOL; - }else if(qos == 2 && tail->state != expect_state){ + }else if(qos == 2 && client_msg->data.state != expect_state){ return MOSQ_ERR_PROTOCOL; } - db__message_remove_queued(context, &context->msgs_out, tail); + db__message_remove_queued(context, &context->msgs_out, client_msg); break; } } } - DL_FOREACH_SAFE(context->msgs_out.queued, tail, tmp){ - if(!db__ready_for_flight(context, mosq_md_out, tail->qos)){ + DL_FOREACH_SAFE(context->msgs_out.queued, client_msg, tmp){ + if(!db__ready_for_flight(context, mosq_md_out, client_msg->data.qos)){ break; } - switch(tail->qos){ + switch(client_msg->data.qos){ case 0: - tail->state = mosq_ms_publish_qos0; + client_msg->data.state = mosq_ms_publish_qos0; break; case 1: - tail->state = mosq_ms_publish_qos1; + client_msg->data.state = mosq_ms_publish_qos1; break; case 2: - tail->state = mosq_ms_publish_qos2; + client_msg->data.state = mosq_ms_publish_qos2; break; } - plugin_persist__handle_client_msg_update(context, tail); + plugin_persist__handle_client_msg_update(context, client_msg); db__message_dequeue_first(context, &context->msgs_out); } #ifdef WITH_PERSISTENCE @@ -446,7 +446,7 @@ int db__message_delete_outgoing(struct mosquitto *context, uint16_t mid, enum mo /* Only for QoS 2 messages */ int db__message_insert_incoming(struct mosquitto *context, uint64_t cmsg_id, struct mosquitto__base_msg *base_msg, bool persist) { - struct mosquitto__client_msg *msg; + struct mosquitto__client_msg *client_msg; struct mosquitto_msg_data *msg_data; enum mosquitto_msg_state state = mosq_ms_invalid; int rc = 0; @@ -484,43 +484,43 @@ int db__message_insert_incoming(struct mosquitto *context, uint64_t cmsg_id, str } #endif - msg = mosquitto__malloc(sizeof(struct mosquitto__client_msg)); - if(!msg) return MOSQ_ERR_NOMEM; - msg->prev = NULL; - msg->next = NULL; + client_msg = mosquitto__malloc(sizeof(struct mosquitto__client_msg)); + if(!client_msg) return MOSQ_ERR_NOMEM; + client_msg->prev = NULL; + client_msg->next = NULL; if(cmsg_id){ - msg->cmsg_id = cmsg_id; + client_msg->data.cmsg_id = cmsg_id; }else{ - msg->cmsg_id = ++context->last_cmsg_id; + client_msg->data.cmsg_id = ++context->last_cmsg_id; } - msg->base_msg = base_msg; - db__msg_store_ref_inc(msg->base_msg); - msg->mid = base_msg->msg.source_mid; - msg->direction = mosq_md_in; - msg->state = state; - msg->dup = false; + client_msg->base_msg = base_msg; + db__msg_store_ref_inc(client_msg->base_msg); + client_msg->data.mid = base_msg->msg.source_mid; + client_msg->data.direction = mosq_md_in; + client_msg->data.state = state; + client_msg->data.dup = false; if(base_msg->msg.qos > context->max_qos){ - msg->qos = context->max_qos; + client_msg->data.qos = context->max_qos; }else{ - msg->qos = base_msg->msg.qos; + client_msg->data.qos = base_msg->msg.qos; } - msg->retain = base_msg->msg.retain; - msg->subscription_identifier = 0; + client_msg->data.retain = base_msg->msg.retain; + client_msg->data.subscription_identifier = 0; if(state == mosq_ms_queued){ - DL_APPEND(msg_data->queued, msg); - db__msg_add_to_queued_stats(msg_data, msg); + DL_APPEND(msg_data->queued, client_msg); + db__msg_add_to_queued_stats(msg_data, client_msg); }else{ - DL_APPEND(msg_data->inflight, msg); - db__msg_add_to_inflight_stats(msg_data, msg); + DL_APPEND(msg_data->inflight, client_msg); + db__msg_add_to_inflight_stats(msg_data, client_msg); } if(persist && context->is_persisted){ - plugin_persist__handle_base_msg_add(msg->base_msg); - plugin_persist__handle_client_msg_add(context, msg); + plugin_persist__handle_base_msg_add(client_msg->base_msg); + plugin_persist__handle_client_msg_add(context, client_msg); } - if(msg->base_msg->msg.qos > 0){ + if(client_msg->base_msg->msg.qos > 0){ util__decrement_receive_quota(context); } return rc; @@ -528,7 +528,7 @@ int db__message_insert_incoming(struct mosquitto *context, uint64_t cmsg_id, str int db__message_insert_outgoing(struct mosquitto *context, uint64_t cmsg_id, uint16_t mid, uint8_t qos, bool retain, struct mosquitto__base_msg *base_msg, uint32_t subscription_identifier, bool update, bool persist) { - struct mosquitto__client_msg *msg; + struct mosquitto__client_msg *client_msg; struct mosquitto_msg_data *msg_data; enum mosquitto_msg_state state = mosq_ms_invalid; int rc = 0; @@ -626,40 +626,40 @@ int db__message_insert_outgoing(struct mosquitto *context, uint64_t cmsg_id, uin } #endif - msg = mosquitto__malloc(sizeof(struct mosquitto__client_msg)); - if(!msg) return MOSQ_ERR_NOMEM; - msg->prev = NULL; - msg->next = NULL; + client_msg = mosquitto__malloc(sizeof(struct mosquitto__client_msg)); + if(!client_msg) return MOSQ_ERR_NOMEM; + client_msg->prev = NULL; + client_msg->next = NULL; if(cmsg_id){ - msg->cmsg_id = cmsg_id; + client_msg->data.cmsg_id = cmsg_id; }else{ - msg->cmsg_id = ++context->last_cmsg_id; + client_msg->data.cmsg_id = ++context->last_cmsg_id; } - msg->base_msg = base_msg; - db__msg_store_ref_inc(msg->base_msg); - msg->mid = mid; - msg->direction = mosq_md_out; - msg->state = state; - msg->dup = false; + client_msg->base_msg = base_msg; + db__msg_store_ref_inc(client_msg->base_msg); + client_msg->data.mid = mid; + client_msg->data.direction = mosq_md_out; + client_msg->data.state = state; + client_msg->data.dup = false; if(qos > context->max_qos){ - msg->qos = context->max_qos; + client_msg->data.qos = context->max_qos; }else{ - msg->qos = qos; + client_msg->data.qos = qos; } - msg->retain = retain; - msg->subscription_identifier = subscription_identifier; + client_msg->data.retain = retain; + client_msg->data.subscription_identifier = subscription_identifier; if(state == mosq_ms_queued){ - DL_APPEND(msg_data->queued, msg); - db__msg_add_to_queued_stats(msg_data, msg); + DL_APPEND(msg_data->queued, client_msg); + db__msg_add_to_queued_stats(msg_data, client_msg); }else{ - DL_APPEND(msg_data->inflight, msg); - db__msg_add_to_inflight_stats(msg_data, msg); + DL_APPEND(msg_data->inflight, client_msg); + db__msg_add_to_inflight_stats(msg_data, client_msg); } if(persist && context->is_persisted){ - plugin_persist__handle_base_msg_add(msg->base_msg); - plugin_persist__handle_client_msg_add(context, msg); + plugin_persist__handle_base_msg_add(client_msg->base_msg); + plugin_persist__handle_client_msg_add(context, client_msg); } if(db.config->allow_duplicate_messages == false && retain == false){ @@ -691,7 +691,7 @@ int db__message_insert_outgoing(struct mosquitto *context, uint64_t cmsg_id, uin } #endif - if(msg->qos > 0 && state != mosq_ms_queued){ + if(client_msg->data.qos > 0 && state != mosq_ms_queued){ util__decrement_send_quota(context); } @@ -707,16 +707,16 @@ int db__message_insert_outgoing(struct mosquitto *context, uint64_t cmsg_id, uin int db__message_update_outgoing(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_state state, int qos, bool persist) { - struct mosquitto__client_msg *tail; + struct mosquitto__client_msg *client_msg; - DL_FOREACH(context->msgs_out.inflight, tail){ - if(tail->mid == mid){ - if(tail->qos != qos){ + DL_FOREACH(context->msgs_out.inflight, client_msg){ + if(client_msg->data.mid == mid){ + if(client_msg->data.qos != qos){ return MOSQ_ERR_PROTOCOL; } - tail->state = state; + client_msg->data.state = state; if(persist){ - plugin_persist__handle_client_msg_update(context, tail); + plugin_persist__handle_client_msg_update(context, client_msg); } return MOSQ_ERR_SUCCESS; } @@ -727,12 +727,12 @@ int db__message_update_outgoing(struct mosquitto *context, uint16_t mid, enum mo static void db__messages_delete_list(struct mosquitto__client_msg **head) { - struct mosquitto__client_msg *tail, *tmp; + struct mosquitto__client_msg *client_msg, *tmp; - DL_FOREACH_SAFE(*head, tail, tmp){ - DL_DELETE(*head, tail); - db__msg_store_ref_dec(&tail->base_msg); - mosquitto__FREE(tail); + DL_FOREACH_SAFE(*head, client_msg, tmp){ + DL_DELETE(*head, client_msg); + db__msg_store_ref_dec(&client_msg->base_msg); + mosquitto__FREE(client_msg); } *head = NULL; } @@ -966,21 +966,21 @@ int db__message_store(const struct mosquitto *source, struct mosquitto__base_msg int db__message_store_find(struct mosquitto *context, uint16_t mid, struct mosquitto__base_msg **base_msg) { - struct mosquitto__client_msg *tail; + struct mosquitto__client_msg *client_msg; if(!context) return MOSQ_ERR_INVAL; *base_msg = NULL; - DL_FOREACH(context->msgs_in.inflight, tail){ - if(tail->base_msg->msg.source_mid == mid){ - *base_msg = tail->base_msg; + DL_FOREACH(context->msgs_in.inflight, client_msg){ + if(client_msg->base_msg->msg.source_mid == mid){ + *base_msg = client_msg->base_msg; return MOSQ_ERR_SUCCESS; } } - DL_FOREACH(context->msgs_in.queued, tail){ - if(tail->base_msg->msg.source_mid == mid){ - *base_msg = tail->base_msg; + DL_FOREACH(context->msgs_in.queued, client_msg){ + if(client_msg->base_msg->msg.source_mid == mid){ + *base_msg = client_msg->base_msg; return MOSQ_ERR_SUCCESS; } } @@ -992,7 +992,7 @@ int db__message_store_find(struct mosquitto *context, uint16_t mid, struct mosqu * retry, and to set incoming messages to expect an appropriate retry. */ static int db__message_reconnect_reset_outgoing(struct mosquitto *context) { - struct mosquitto__client_msg *msg, *tmp; + struct mosquitto__client_msg *client_msg, *tmp; context->msgs_out.inflight_bytes = 0; context->msgs_out.inflight_bytes12 = 0; @@ -1004,28 +1004,28 @@ static int db__message_reconnect_reset_outgoing(struct mosquitto *context) context->msgs_out.queued_count12 = 0; context->msgs_out.inflight_quota = context->msgs_out.inflight_maximum; - DL_FOREACH_SAFE(context->msgs_out.inflight, msg, tmp){ - db__msg_add_to_inflight_stats(&context->msgs_out, msg); - if(msg->qos > 0){ + DL_FOREACH_SAFE(context->msgs_out.inflight, client_msg, tmp){ + db__msg_add_to_inflight_stats(&context->msgs_out, client_msg); + if(client_msg->data.qos > 0){ util__decrement_send_quota(context); } - switch(msg->qos){ + switch(client_msg->data.qos){ case 0: - msg->state = mosq_ms_publish_qos0; + client_msg->data.state = mosq_ms_publish_qos0; break; case 1: - msg->state = mosq_ms_publish_qos1; + client_msg->data.state = mosq_ms_publish_qos1; break; case 2: - if(msg->state == mosq_ms_wait_for_pubcomp){ - msg->state = mosq_ms_resend_pubrel; + if(client_msg->data.state == mosq_ms_wait_for_pubcomp){ + client_msg->data.state = mosq_ms_resend_pubrel; }else{ - msg->state = mosq_ms_publish_qos2; + client_msg->data.state = mosq_ms_publish_qos2; } break; } - plugin_persist__handle_client_msg_update(context, msg); + plugin_persist__handle_client_msg_update(context, client_msg); } /* Messages received when the client was disconnected are put * in the mosq_ms_queued state. If we don't change them to the @@ -1033,22 +1033,22 @@ static int db__message_reconnect_reset_outgoing(struct mosquitto *context) * get sent until the client next receives a message - and they * will be sent out of order. */ - DL_FOREACH_SAFE(context->msgs_out.queued, msg, tmp){ - db__msg_add_to_queued_stats(&context->msgs_out, msg); - if(db__ready_for_flight(context, mosq_md_out, msg->qos)){ - switch(msg->qos){ + DL_FOREACH_SAFE(context->msgs_out.queued, client_msg, tmp){ + db__msg_add_to_queued_stats(&context->msgs_out, client_msg); + if(db__ready_for_flight(context, mosq_md_out, client_msg->data.qos)){ + switch(client_msg->data.qos){ case 0: - msg->state = mosq_ms_publish_qos0; + client_msg->data.state = mosq_ms_publish_qos0; break; case 1: - msg->state = mosq_ms_publish_qos1; + client_msg->data.state = mosq_ms_publish_qos1; break; case 2: - msg->state = mosq_ms_publish_qos2; + client_msg->data.state = mosq_ms_publish_qos2; break; } db__message_dequeue_first(context, &context->msgs_out); - plugin_persist__handle_client_msg_update(context, msg); + plugin_persist__handle_client_msg_update(context, client_msg); } } @@ -1059,7 +1059,7 @@ static int db__message_reconnect_reset_outgoing(struct mosquitto *context) /* Called on reconnect to set incoming messages to expect an appropriate retry. */ static int db__message_reconnect_reset_incoming(struct mosquitto *context) { - struct mosquitto__client_msg *msg, *tmp; + struct mosquitto__client_msg *client_msg, *tmp; context->msgs_in.inflight_bytes = 0; context->msgs_in.inflight_bytes12 = 0; @@ -1071,16 +1071,16 @@ static int db__message_reconnect_reset_incoming(struct mosquitto *context) context->msgs_in.queued_count12 = 0; context->msgs_in.inflight_quota = context->msgs_in.inflight_maximum; - DL_FOREACH_SAFE(context->msgs_in.inflight, msg, tmp){ - db__msg_add_to_inflight_stats(&context->msgs_in, msg); - if(msg->qos > 0){ + DL_FOREACH_SAFE(context->msgs_in.inflight, client_msg, tmp){ + db__msg_add_to_inflight_stats(&context->msgs_in, client_msg); + if(client_msg->data.qos > 0){ util__decrement_receive_quota(context); } - if(msg->qos != 2){ + if(client_msg->data.qos != 2){ /* Anything msgs_in, msg); + db__message_remove_inflight(context, &context->msgs_in, client_msg); }else{ /* Message state can be preserved here because it should match * whatever the client has got. */ @@ -1093,22 +1093,22 @@ static int db__message_reconnect_reset_incoming(struct mosquitto *context) * get sent until the client next receives a message - and they * will be sent out of order. */ - DL_FOREACH_SAFE(context->msgs_in.queued, msg, tmp){ - db__msg_add_to_queued_stats(&context->msgs_in, msg); - if(db__ready_for_flight(context, mosq_md_in, msg->qos)){ - switch(msg->qos){ + DL_FOREACH_SAFE(context->msgs_in.queued, client_msg, tmp){ + db__msg_add_to_queued_stats(&context->msgs_in, client_msg); + if(db__ready_for_flight(context, mosq_md_in, client_msg->data.qos)){ + switch(client_msg->data.qos){ case 0: - msg->state = mosq_ms_publish_qos0; + client_msg->data.state = mosq_ms_publish_qos0; break; case 1: - msg->state = mosq_ms_publish_qos1; + client_msg->data.state = mosq_ms_publish_qos1; break; case 2: - msg->state = mosq_ms_publish_qos2; + client_msg->data.state = mosq_ms_publish_qos2; break; } db__message_dequeue_first(context, &context->msgs_in); - plugin_persist__handle_client_msg_update(context, msg); + plugin_persist__handle_client_msg_update(context, client_msg); } } @@ -1128,16 +1128,16 @@ int db__message_reconnect_reset(struct mosquitto *context) int db__message_remove_incoming(struct mosquitto* context, uint16_t mid) { - struct mosquitto__client_msg *tail, *tmp; + struct mosquitto__client_msg *client_msg, *tmp; if(!context) return MOSQ_ERR_INVAL; - DL_FOREACH_SAFE(context->msgs_in.inflight, tail, tmp){ - if(tail->mid == mid) { - if(tail->base_msg->msg.qos != 2){ + DL_FOREACH_SAFE(context->msgs_in.inflight, client_msg, tmp){ + if(client_msg->data.mid == mid) { + if(client_msg->base_msg->msg.qos != 2){ return MOSQ_ERR_PROTOCOL; } - db__message_remove_inflight(context, &context->msgs_in, tail); + db__message_remove_inflight(context, &context->msgs_in, client_msg); return MOSQ_ERR_SUCCESS; } } @@ -1148,7 +1148,7 @@ int db__message_remove_incoming(struct mosquitto* context, uint16_t mid) int db__message_release_incoming(struct mosquitto *context, uint16_t mid) { - struct mosquitto__client_msg *tail, *tmp; + struct mosquitto__client_msg *client_msg, *tmp; int retain; char *topic; char *source_id; @@ -1157,26 +1157,26 @@ int db__message_release_incoming(struct mosquitto *context, uint16_t mid) if(!context) return MOSQ_ERR_INVAL; - DL_FOREACH_SAFE(context->msgs_in.inflight, tail, tmp){ - if(tail->mid == mid){ - if(tail->base_msg->msg.qos != 2){ + DL_FOREACH_SAFE(context->msgs_in.inflight, client_msg, tmp){ + if(client_msg->data.mid == mid){ + if(client_msg->base_msg->msg.qos != 2){ return MOSQ_ERR_PROTOCOL; } - topic = tail->base_msg->msg.topic; - retain = tail->retain; - source_id = tail->base_msg->msg.source_id; + topic = client_msg->base_msg->msg.topic; + retain = client_msg->data.retain; + source_id = client_msg->base_msg->msg.source_id; /* topic==NULL should be a QoS 2 message that was * denied/dropped and is being processed so the client doesn't * keep resending it. That means we don't send it to other * clients. */ if(topic == NULL){ - db__message_remove_inflight(context, &context->msgs_in, tail); + db__message_remove_inflight(context, &context->msgs_in, client_msg); deleted = true; }else{ - rc = sub__messages_queue(source_id, topic, 2, retain, &tail->base_msg); + rc = sub__messages_queue(source_id, topic, 2, retain, &client_msg->base_msg); if(rc == MOSQ_ERR_SUCCESS || rc == MOSQ_ERR_NO_SUBSCRIBERS){ - db__message_remove_inflight(context, &context->msgs_in, tail); + db__message_remove_inflight(context, &context->msgs_in, client_msg); deleted = true; }else{ return 1; @@ -1185,16 +1185,16 @@ int db__message_release_incoming(struct mosquitto *context, uint16_t mid) } } - DL_FOREACH_SAFE(context->msgs_in.queued, tail, tmp){ - if(db__ready_for_flight(context, mosq_md_in, tail->qos)){ + DL_FOREACH_SAFE(context->msgs_in.queued, client_msg, tmp){ + if(db__ready_for_flight(context, mosq_md_in, client_msg->data.qos)){ break; } - if(tail->qos == 2){ - send__pubrec(context, tail->mid, 0, NULL); - tail->state = mosq_ms_wait_for_pubrel; + if(client_msg->data.qos == 2){ + send__pubrec(context, client_msg->data.mid, 0, NULL); + client_msg->data.state = mosq_ms_wait_for_pubrel; db__message_dequeue_first(context, &context->msgs_in); - plugin_persist__handle_client_msg_update(context, tail); + plugin_persist__handle_client_msg_update(context, client_msg); } } if(deleted){ @@ -1207,39 +1207,40 @@ int db__message_release_incoming(struct mosquitto *context, uint16_t mid) void db__expire_all_messages(struct mosquitto *context) { - struct mosquitto__client_msg *msg, *tmp; + struct mosquitto__client_msg *client_msg, *tmp; - DL_FOREACH_SAFE(context->msgs_out.inflight, msg, tmp){ - if(msg->base_msg->msg.expiry_time && db.now_real_s > msg->base_msg->msg.expiry_time){ - if(msg->qos > 0){ + DL_FOREACH_SAFE(context->msgs_out.inflight, client_msg, tmp){ + if(client_msg->base_msg->msg.expiry_time && db.now_real_s > client_msg->base_msg->msg.expiry_time){ + if(client_msg->data.qos > 0){ util__increment_send_quota(context); } - db__message_remove_inflight(context, &context->msgs_out, msg); + db__message_remove_inflight(context, &context->msgs_out, client_msg); } } - DL_FOREACH_SAFE(context->msgs_out.queued, msg, tmp){ - if(msg->base_msg->msg.expiry_time && db.now_real_s > msg->base_msg->msg.expiry_time){ - db__message_remove_queued(context, &context->msgs_out, msg); + DL_FOREACH_SAFE(context->msgs_out.queued, client_msg, tmp){ + if(client_msg->base_msg->msg.expiry_time && db.now_real_s > client_msg->base_msg->msg.expiry_time){ + db__message_remove_queued(context, &context->msgs_out, client_msg); } } - DL_FOREACH_SAFE(context->msgs_in.inflight, msg, tmp){ - if(msg->base_msg->msg.expiry_time && db.now_real_s > msg->base_msg->msg.expiry_time){ - if(msg->qos > 0){ + DL_FOREACH_SAFE(context->msgs_in.inflight, client_msg, tmp){ + if(client_msg->base_msg->msg.expiry_time && db.now_real_s > client_msg->base_msg->msg.expiry_time){ + if(client_msg->data.qos > 0){ util__increment_receive_quota(context); } - db__message_remove_inflight(context, &context->msgs_in, msg); + db__message_remove_inflight(context, &context->msgs_in, client_msg); } } - DL_FOREACH_SAFE(context->msgs_in.queued, msg, tmp){ - if(msg->base_msg->msg.expiry_time && db.now_real_s > msg->base_msg->msg.expiry_time){ - db__message_remove_queued(context, &context->msgs_in, msg); + DL_FOREACH_SAFE(context->msgs_in.queued, client_msg, tmp){ + if(client_msg->base_msg->msg.expiry_time && db.now_real_s > client_msg->base_msg->msg.expiry_time){ + db__message_remove_queued(context, &context->msgs_in, client_msg); } } } -static int db__message_write_inflight_out_single(struct mosquitto *context, struct mosquitto__client_msg *msg) +static int db__message_write_inflight_out_single(struct mosquitto *context, struct mosquitto__client_msg *client_msg) { + struct mosquitto__base_msg *base_msg; mosquitto_property *base_msg_props = NULL; int rc; uint16_t mid; @@ -1252,34 +1253,36 @@ static int db__message_write_inflight_out_single(struct mosquitto *context, stru uint32_t expiry_interval; uint32_t subscription_id; + base_msg = client_msg->base_msg; + expiry_interval = 0; - if(msg->base_msg->msg.expiry_time){ - if(db.now_real_s > msg->base_msg->msg.expiry_time){ + if(base_msg->msg.expiry_time){ + if(db.now_real_s > base_msg->msg.expiry_time){ /* Message is expired, must not send. */ - if(msg->direction == mosq_md_out && msg->qos > 0){ + if(client_msg->data.direction == mosq_md_out && client_msg->data.qos > 0){ util__increment_send_quota(context); } - db__message_remove_inflight(context, &context->msgs_out, msg); + db__message_remove_inflight(context, &context->msgs_out, client_msg); return MOSQ_ERR_SUCCESS; }else{ - expiry_interval = (uint32_t)(msg->base_msg->msg.expiry_time - db.now_real_s); + expiry_interval = (uint32_t)(base_msg->msg.expiry_time - db.now_real_s); } } - mid = msg->mid; - retries = msg->dup; - retain = msg->retain; - topic = msg->base_msg->msg.topic; - qos = (uint8_t)msg->qos; - payloadlen = msg->base_msg->msg.payloadlen; - payload = msg->base_msg->msg.payload; - subscription_id = msg->subscription_identifier; - base_msg_props = msg->base_msg->msg.properties; + mid = client_msg->data.mid; + retries = client_msg->data.dup; + retain = client_msg->data.retain; + topic = base_msg->msg.topic; + qos = (uint8_t)client_msg->data.qos; + payloadlen = base_msg->msg.payloadlen; + payload = base_msg->msg.payload; + subscription_id = client_msg->data.subscription_identifier; + base_msg_props = base_msg->msg.properties; - switch(msg->state){ + switch(client_msg->data.state){ case mosq_ms_publish_qos0: rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, subscription_id, base_msg_props, expiry_interval); if(rc == MOSQ_ERR_SUCCESS || rc == MOSQ_ERR_OVERSIZE_PACKET){ - db__message_remove_inflight(context, &context->msgs_out, msg); + db__message_remove_inflight(context, &context->msgs_out, client_msg); }else{ return rc; } @@ -1288,11 +1291,11 @@ static int db__message_write_inflight_out_single(struct mosquitto *context, stru case mosq_ms_publish_qos1: rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, subscription_id, base_msg_props, expiry_interval); if(rc == MOSQ_ERR_SUCCESS){ - msg->dup = 1; /* Any retry attempts are a duplicate. */ - msg->state = mosq_ms_wait_for_puback; - plugin_persist__handle_client_msg_update(context, msg); + client_msg->data.dup = 1; /* Any retry attempts are a duplicate. */ + client_msg->data.state = mosq_ms_wait_for_puback; + plugin_persist__handle_client_msg_update(context, client_msg); }else if(rc == MOSQ_ERR_OVERSIZE_PACKET){ - db__message_remove_inflight(context, &context->msgs_out, msg); + db__message_remove_inflight(context, &context->msgs_out, client_msg); }else{ return rc; } @@ -1301,11 +1304,11 @@ static int db__message_write_inflight_out_single(struct mosquitto *context, stru case mosq_ms_publish_qos2: rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, subscription_id, base_msg_props, expiry_interval); if(rc == MOSQ_ERR_SUCCESS){ - msg->dup = 1; /* Any retry attempts are a duplicate. */ - msg->state = mosq_ms_wait_for_pubrec; - plugin_persist__handle_client_msg_update(context, msg); + client_msg->data.dup = 1; /* Any retry attempts are a duplicate. */ + client_msg->data.state = mosq_ms_wait_for_pubrec; + plugin_persist__handle_client_msg_update(context, client_msg); }else if(rc == MOSQ_ERR_OVERSIZE_PACKET){ - db__message_remove_inflight(context, &context->msgs_out, msg); + db__message_remove_inflight(context, &context->msgs_out, client_msg); }else{ return rc; } @@ -1314,8 +1317,8 @@ static int db__message_write_inflight_out_single(struct mosquitto *context, stru case mosq_ms_resend_pubrel: rc = send__pubrel(context, mid, NULL); if(!rc){ - msg->state = mosq_ms_wait_for_pubcomp; - plugin_persist__handle_client_msg_update(context, msg); + client_msg->data.state = mosq_ms_wait_for_pubcomp; + plugin_persist__handle_client_msg_update(context, client_msg); }else{ return rc; } @@ -1337,15 +1340,15 @@ static int db__message_write_inflight_out_single(struct mosquitto *context, stru int db__message_write_inflight_out_all(struct mosquitto *context) { - struct mosquitto__client_msg *tail, *tmp; + struct mosquitto__client_msg *client_msg, *tmp; int rc; if(context->state != mosq_cs_active || !net__is_connected(context)){ return MOSQ_ERR_SUCCESS; } - DL_FOREACH_SAFE(context->msgs_out.inflight, tail, tmp){ - rc = db__message_write_inflight_out_single(context, tail); + DL_FOREACH_SAFE(context->msgs_out.inflight, client_msg, tmp){ + rc = db__message_write_inflight_out_single(context, client_msg); if(rc) return rc; } return MOSQ_ERR_SUCCESS; @@ -1354,7 +1357,7 @@ int db__message_write_inflight_out_all(struct mosquitto *context) int db__message_write_inflight_out_latest(struct mosquitto *context) { - struct mosquitto__client_msg *tail, *next; + struct mosquitto__client_msg *client_msg, *next; int rc; if(context->state != mosq_cs_active @@ -1371,27 +1374,27 @@ int db__message_write_inflight_out_latest(struct mosquitto *context) /* Start at the end of the list and work backwards looking for the first * message in a non-publish state */ - tail = context->msgs_out.inflight->prev; - while(tail != context->msgs_out.inflight && - (tail->state == mosq_ms_publish_qos0 - || tail->state == mosq_ms_publish_qos1 - || tail->state == mosq_ms_publish_qos2)){ + client_msg = context->msgs_out.inflight->prev; + while(client_msg != context->msgs_out.inflight && + (client_msg->data.state == mosq_ms_publish_qos0 + || client_msg->data.state == mosq_ms_publish_qos1 + || client_msg->data.state == mosq_ms_publish_qos2)){ - tail = tail->prev; + client_msg = client_msg->prev; } /* Tail is now either the head of the list, if that message is waiting for * publish, or the oldest message not waiting for a publish. In the latter * case, any pending publishes should be next after this message. */ - if(tail != context->msgs_out.inflight){ - tail = tail->next; + if(client_msg != context->msgs_out.inflight){ + client_msg = client_msg->next; } - while(tail){ - next = tail->next; - rc = db__message_write_inflight_out_single(context, tail); + while(client_msg){ + next = client_msg->next; + rc = db__message_write_inflight_out_single(context, client_msg); if(rc) return rc; - tail = next; + client_msg = next; } return MOSQ_ERR_SUCCESS; } @@ -1399,27 +1402,27 @@ int db__message_write_inflight_out_latest(struct mosquitto *context) int db__message_write_queued_in(struct mosquitto *context) { - struct mosquitto__client_msg *tail, *tmp; + struct mosquitto__client_msg *client_msg, *tmp; int rc; if(context->state != mosq_cs_active){ return MOSQ_ERR_SUCCESS; } - DL_FOREACH_SAFE(context->msgs_in.queued, tail, tmp){ + DL_FOREACH_SAFE(context->msgs_in.queued, client_msg, tmp){ if(context->msgs_in.inflight_maximum != 0 && context->msgs_in.inflight_quota == 0){ break; } - if(tail->qos == 2){ - tail->state = mosq_ms_send_pubrec; + if(client_msg->data.qos == 2){ + client_msg->data.state = mosq_ms_send_pubrec; db__message_dequeue_first(context, &context->msgs_in); - rc = send__pubrec(context, tail->mid, 0, NULL); + rc = send__pubrec(context, client_msg->data.mid, 0, NULL); if(!rc){ - tail->state = mosq_ms_wait_for_pubrel; - plugin_persist__handle_client_msg_update(context, tail); + client_msg->data.state = mosq_ms_wait_for_pubrel; + plugin_persist__handle_client_msg_update(context, client_msg); }else{ - plugin_persist__handle_client_msg_update(context, tail); + plugin_persist__handle_client_msg_update(context, client_msg); return rc; } } @@ -1430,30 +1433,30 @@ int db__message_write_queued_in(struct mosquitto *context) int db__message_write_queued_out(struct mosquitto *context) { - struct mosquitto__client_msg *tail, *tmp; + struct mosquitto__client_msg *client_msg, *tmp; if(context->state != mosq_cs_active){ return MOSQ_ERR_SUCCESS; } - DL_FOREACH_SAFE(context->msgs_out.queued, tail, tmp){ - if(!db__ready_for_flight(context, mosq_md_out, tail->qos)){ + DL_FOREACH_SAFE(context->msgs_out.queued, client_msg, tmp){ + if(!db__ready_for_flight(context, mosq_md_out, client_msg->data.qos)){ break; } - switch(tail->qos){ + switch(client_msg->data.qos){ case 0: - tail->state = mosq_ms_publish_qos0; + client_msg->data.state = mosq_ms_publish_qos0; break; case 1: - tail->state = mosq_ms_publish_qos1; + client_msg->data.state = mosq_ms_publish_qos1; break; case 2: - tail->state = mosq_ms_publish_qos2; + client_msg->data.state = mosq_ms_publish_qos2; break; } db__message_dequeue_first(context, &context->msgs_out); - plugin_persist__handle_client_msg_update(context, tail); + plugin_persist__handle_client_msg_update(context, client_msg); } return MOSQ_ERR_SUCCESS; } diff --git a/src/handle_connect.c b/src/handle_connect.c index 28c31849..5bba3613 100644 --- a/src/handle_connect.c +++ b/src/handle_connect.c @@ -85,22 +85,24 @@ static char *client_id_gen(uint16_t *idlen, const char *auto_id_prefix, uint16_t * assuming a possible change of username. */ static void connection_check_acl(struct mosquitto *context, struct mosquitto__client_msg **head) { - struct mosquitto__client_msg *msg_tail, *tmp; + struct mosquitto__client_msg *client_msg, *tmp; + struct mosquitto__base_msg *base_msg; int access; - DL_FOREACH_SAFE((*head), msg_tail, tmp){ - if(msg_tail->direction == mosq_md_out){ + DL_FOREACH_SAFE((*head), client_msg, tmp){ + base_msg = client_msg->base_msg; + if(client_msg->data.direction == mosq_md_out){ access = MOSQ_ACL_READ; }else{ access = MOSQ_ACL_WRITE; } - if(mosquitto_acl_check(context, msg_tail->base_msg->msg.topic, - msg_tail->base_msg->msg.payloadlen, msg_tail->base_msg->msg.payload, - msg_tail->base_msg->msg.qos, msg_tail->base_msg->msg.retain, access) != MOSQ_ERR_SUCCESS){ + if(mosquitto_acl_check(context, base_msg->msg.topic, + base_msg->msg.payloadlen, base_msg->msg.payload, + base_msg->msg.qos, base_msg->msg.retain, access) != MOSQ_ERR_SUCCESS){ - DL_DELETE((*head), msg_tail); - db__msg_store_ref_dec(&msg_tail->base_msg); - mosquitto__FREE(msg_tail); + DL_DELETE((*head), client_msg); + db__msg_store_ref_dec(&client_msg->base_msg); + mosquitto__FREE(client_msg); } } } diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h index b098b216..2529aebd 100644 --- a/src/mosquitto_broker_internal.h +++ b/src/mosquitto_broker_internal.h @@ -420,17 +420,10 @@ struct mosquitto__base_msg{ }; struct mosquitto__client_msg{ + struct mosquitto_client_msg data; struct mosquitto__client_msg *prev; struct mosquitto__client_msg *next; struct mosquitto__base_msg *base_msg; - uint64_t cmsg_id; - uint32_t subscription_identifier; - uint16_t mid; - uint8_t qos; - bool retain; - enum mosquitto_msg_direction direction; - enum mosquitto_msg_state state; - bool dup; }; diff --git a/src/persist_read.c b/src/persist_read.c index cbbc7d5d..7bc5d6ce 100644 --- a/src/persist_read.c +++ b/src/persist_read.c @@ -141,19 +141,19 @@ static int persist__client_msg_restore(struct P_client_msg *chunk) cmsg->next = NULL; cmsg->base_msg = NULL; - cmsg->cmsg_id = ++context->last_cmsg_id; - cmsg->mid = chunk->F.mid; - cmsg->qos = chunk->F.qos; - cmsg->retain = (chunk->F.retain_dup&0xF0)>>4; - cmsg->direction = chunk->F.direction; - cmsg->state = chunk->F.state; - cmsg->dup = chunk->F.retain_dup&0x0F; - cmsg->subscription_identifier = chunk->subscription_identifier; + cmsg->data.cmsg_id = ++context->last_cmsg_id; + cmsg->data.mid = chunk->F.mid; + cmsg->data.qos = chunk->F.qos; + cmsg->data.retain = (chunk->F.retain_dup&0xF0)>>4; + cmsg->data.direction = chunk->F.direction; + cmsg->data.state = chunk->F.state; + cmsg->data.dup = chunk->F.retain_dup&0x0F; + cmsg->data.subscription_identifier = chunk->subscription_identifier; cmsg->base_msg = msg; db__msg_store_ref_inc(cmsg->base_msg); - if(cmsg->direction == mosq_md_out){ + if(cmsg->data.direction == mosq_md_out){ msg_data = &context->msgs_out; }else{ msg_data = &context->msgs_in; diff --git a/src/persist_write.c b/src/persist_write.c index a0e168e6..265b6ee8 100644 --- a/src/persist_write.c +++ b/src/persist_write.c @@ -62,14 +62,14 @@ static int persist__client_messages_save(FILE *db_fptr, struct mosquitto *contex } chunk.F.store_id = cmsg->base_msg->msg.store_id; - chunk.F.mid = cmsg->mid; + chunk.F.mid = cmsg->data.mid; chunk.F.id_len = (uint16_t)strlen(context->id); - chunk.F.qos = cmsg->qos; - chunk.F.retain_dup = (uint8_t)((cmsg->retain&0x0F)<<4 | (cmsg->dup&0x0F)); - chunk.F.direction = (uint8_t)cmsg->direction; - chunk.F.state = (uint8_t)cmsg->state; + chunk.F.qos = cmsg->data.qos; + chunk.F.retain_dup = (uint8_t)((cmsg->data.retain&0x0F)<<4 | (cmsg->data.dup&0x0F)); + chunk.F.direction = (uint8_t)cmsg->data.direction; + chunk.F.state = (uint8_t)cmsg->data.state; chunk.client_id = context->id; - chunk.subscription_identifier = cmsg->subscription_identifier; + chunk.subscription_identifier = cmsg->data.subscription_identifier; rc = persist__chunk_client_msg_write_v6(db_fptr, &chunk); if(rc){ diff --git a/src/plugin_persist.c b/src/plugin_persist.c index d0308802..0dbee3d3 100644 --- a/src/plugin_persist.c +++ b/src/plugin_persist.c @@ -183,14 +183,14 @@ void plugin_persist__handle_subscription_delete(struct mosquitto *context, char } -void plugin_persist__handle_client_msg_add(struct mosquitto *context, const struct mosquitto__client_msg *cmsg) +void plugin_persist__handle_client_msg_add(struct mosquitto *context, const struct mosquitto__client_msg *client_msg) { struct mosquitto_evt_persist_client_msg event_data; struct mosquitto__callback *cb_base; struct mosquitto__security_options *opts; if(context->is_persisted == false - || (cmsg->qos == 0 && db.config->queue_qos0_messages == false) + || (client_msg->data.qos == 0 && db.config->queue_qos0_messages == false) || db.shutdown){ return; @@ -199,15 +199,15 @@ void plugin_persist__handle_client_msg_add(struct mosquitto *context, const stru opts = &db.config->security_options; memset(&event_data, 0, sizeof(event_data)); - event_data.client_id = context->id; - event_data.cmsg_id = cmsg->cmsg_id; - event_data.store_id = cmsg->base_msg->msg.store_id; - event_data.mid = cmsg->mid; - event_data.qos = cmsg->qos; - event_data.retain = cmsg->retain; - event_data.dup = cmsg->dup; - event_data.direction = (uint8_t)cmsg->direction; - event_data.state = (uint8_t)cmsg->state; + event_data.data.client_id = context->id; + event_data.data.cmsg_id = client_msg->data.cmsg_id; + event_data.data.store_id = client_msg->base_msg->msg.store_id; + event_data.data.mid = client_msg->data.mid; + event_data.data.qos = client_msg->data.qos; + event_data.data.retain = client_msg->data.retain; + event_data.data.dup = client_msg->data.dup; + event_data.data.direction = (uint8_t)client_msg->data.direction; + event_data.data.state = (uint8_t)client_msg->data.state; DL_FOREACH(opts->plugin_callbacks.persist_client_msg_add, cb_base){ cb_base->cb(MOSQ_EVT_PERSIST_CLIENT_MSG_ADD, &event_data, cb_base->userdata); @@ -215,14 +215,14 @@ void plugin_persist__handle_client_msg_add(struct mosquitto *context, const stru } -void plugin_persist__handle_client_msg_delete(struct mosquitto *context, const struct mosquitto__client_msg *cmsg) +void plugin_persist__handle_client_msg_delete(struct mosquitto *context, const struct mosquitto__client_msg *client_msg) { struct mosquitto_evt_persist_client_msg event_data; struct mosquitto__callback *cb_base; struct mosquitto__security_options *opts; if(context->is_persisted == false - || (cmsg->qos == 0 && db.config->queue_qos0_messages == false) + || (client_msg->data.qos == 0 && db.config->queue_qos0_messages == false) || db.shutdown){ return; @@ -231,13 +231,13 @@ void plugin_persist__handle_client_msg_delete(struct mosquitto *context, const s opts = &db.config->security_options; memset(&event_data, 0, sizeof(event_data)); - event_data.client_id = context->id; - event_data.cmsg_id = cmsg->cmsg_id; - event_data.mid = cmsg->mid; - event_data.state = (uint8_t)cmsg->state; - event_data.qos = cmsg->qos; - event_data.store_id = cmsg->base_msg->msg.store_id; - event_data.direction = (uint8_t)cmsg->direction; + event_data.data.client_id = context->id; + event_data.data.cmsg_id = client_msg->data.cmsg_id; + event_data.data.mid = client_msg->data.mid; + event_data.data.state = (uint8_t)client_msg->data.state; + event_data.data.qos = client_msg->data.qos; + event_data.data.store_id = client_msg->base_msg->msg.store_id; + event_data.data.direction = (uint8_t)client_msg->data.direction; DL_FOREACH(opts->plugin_callbacks.persist_client_msg_delete, cb_base){ cb_base->cb(MOSQ_EVT_PERSIST_CLIENT_MSG_DELETE, &event_data, cb_base->userdata); @@ -245,14 +245,14 @@ void plugin_persist__handle_client_msg_delete(struct mosquitto *context, const s } -void plugin_persist__handle_client_msg_update(struct mosquitto *context, const struct mosquitto__client_msg *cmsg) +void plugin_persist__handle_client_msg_update(struct mosquitto *context, const struct mosquitto__client_msg *client_msg) { struct mosquitto_evt_persist_client_msg event_data; struct mosquitto__callback *cb_base; struct mosquitto__security_options *opts; if(context->is_persisted == false - || (cmsg->qos == 0 && db.config->queue_qos0_messages == false) + || (client_msg->data.qos == 0 && db.config->queue_qos0_messages == false) || db.shutdown){ return; @@ -261,14 +261,14 @@ void plugin_persist__handle_client_msg_update(struct mosquitto *context, const s opts = &db.config->security_options; memset(&event_data, 0, sizeof(event_data)); - event_data.client_id = context->id; - event_data.cmsg_id = cmsg->cmsg_id; - event_data.mid = cmsg->mid; - event_data.store_id = cmsg->base_msg->msg.store_id; - event_data.state = (uint8_t)cmsg->state; - event_data.dup = cmsg->dup; - event_data.direction = (uint8_t)cmsg->direction; - event_data.qos = cmsg->qos; + event_data.data.client_id = context->id; + event_data.data.cmsg_id = client_msg->data.cmsg_id; + event_data.data.mid = client_msg->data.mid; + event_data.data.store_id = client_msg->base_msg->msg.store_id; + event_data.data.state = (uint8_t)client_msg->data.state; + event_data.data.dup = client_msg->data.dup; + event_data.data.direction = (uint8_t)client_msg->data.direction; + event_data.data.qos = client_msg->data.qos; DL_FOREACH(opts->plugin_callbacks.persist_client_msg_update, cb_base){ cb_base->cb(MOSQ_EVT_PERSIST_CLIENT_MSG_UPDATE, &event_data, cb_base->userdata); diff --git a/src/plugin_public.c b/src/plugin_public.c index ec8cceb9..0cdc7bc7 100644 --- a/src/plugin_public.c +++ b/src/plugin_public.c @@ -616,27 +616,28 @@ BROKER_EXPORT int mosquitto_persist_client_msg_add(struct mosquitto_evt_persist_ struct mosquitto *context; struct mosquitto__base_msg *base_msg; - if(client_msg == NULL || client_msg->client_id == NULL){ + if(client_msg == NULL || client_msg->data.client_id == NULL){ return MOSQ_ERR_INVAL; } - HASH_FIND(hh_id, db.contexts_by_id, client_msg->client_id, strlen(client_msg->client_id), context); + HASH_FIND(hh_id, db.contexts_by_id, client_msg->data.client_id, strlen(client_msg->data.client_id), context); if(context == NULL){ return MOSQ_ERR_NOT_FOUND; } - base_msg = find_store_msg(client_msg->store_id); + base_msg = find_store_msg(client_msg->data.store_id); if(base_msg == NULL){ return MOSQ_ERR_NOT_FOUND; } - if(client_msg->direction == mosq_md_out){ - if(client_msg->qos > 0){ - context->last_mid = client_msg->mid; + if(client_msg->data.direction == mosq_md_out){ + if(client_msg->data.qos > 0){ + context->last_mid = client_msg->data.mid; } - return db__message_insert_outgoing(context, client_msg->cmsg_id, client_msg->mid, client_msg->qos, client_msg->retain, - base_msg, client_msg->subscription_identifier, false, false); - }else if(client_msg->direction == mosq_md_in){ - return db__message_insert_incoming(context, client_msg->cmsg_id, base_msg, false); + return db__message_insert_outgoing(context, client_msg->data.cmsg_id, client_msg->data.mid, + client_msg->data.qos, client_msg->data.retain, + base_msg, client_msg->data.subscription_identifier, false, false); + }else if(client_msg->data.direction == mosq_md_in){ + return db__message_insert_incoming(context, client_msg->data.cmsg_id, base_msg, false); }else{ return MOSQ_ERR_INVAL; } @@ -648,17 +649,17 @@ BROKER_EXPORT int mosquitto_persist_client_msg_delete(struct mosquitto_evt_persi { struct mosquitto *context; - if(client_msg == NULL || client_msg->client_id == NULL) return MOSQ_ERR_INVAL; + if(client_msg == NULL || client_msg->data.client_id == NULL) return MOSQ_ERR_INVAL; - HASH_FIND(hh_id, db.contexts_by_id, client_msg->client_id, strlen(client_msg->client_id), context); + HASH_FIND(hh_id, db.contexts_by_id, client_msg->data.client_id, strlen(client_msg->data.client_id), context); if(context == NULL){ return MOSQ_ERR_NOT_FOUND; } - if(client_msg->direction == mosq_md_out){ - return db__message_delete_outgoing(context, client_msg->mid, client_msg->state, client_msg->qos); - }else if(client_msg->direction == mosq_md_in){ - return db__message_remove_incoming(context, client_msg->mid); + if(client_msg->data.direction == mosq_md_out){ + return db__message_delete_outgoing(context, client_msg->data.mid, client_msg->data.state, client_msg->data.qos); + }else if(client_msg->data.direction == mosq_md_in){ + return db__message_remove_incoming(context, client_msg->data.mid); }else{ return MOSQ_ERR_INVAL; } @@ -670,16 +671,16 @@ BROKER_EXPORT int mosquitto_persist_client_msg_update(struct mosquitto_evt_persi { struct mosquitto *context; - if(client_msg == NULL || client_msg->client_id == NULL) return MOSQ_ERR_INVAL; + if(client_msg == NULL || client_msg->data.client_id == NULL) return MOSQ_ERR_INVAL; - HASH_FIND(hh_id, db.contexts_by_id, client_msg->client_id, strlen(client_msg->client_id), context); + HASH_FIND(hh_id, db.contexts_by_id, client_msg->data.client_id, strlen(client_msg->data.client_id), context); if(context == NULL){ return MOSQ_ERR_NOT_FOUND; } - if(client_msg->direction == mosq_md_out){ - db__message_update_outgoing(context, client_msg->mid, client_msg->state, client_msg->qos, false); - }else if(client_msg->direction == mosq_md_in){ + if(client_msg->data.direction == mosq_md_out){ + db__message_update_outgoing(context, client_msg->data.mid, client_msg->data.state, client_msg->data.qos, false); + }else if(client_msg->data.direction == mosq_md_in){ // FIXME db__message_update_incoming(context, client_msg->mid, client_msg->state, client_msg->qos, false); }else{ return MOSQ_ERR_INVAL; @@ -692,16 +693,16 @@ BROKER_EXPORT int mosquitto_persist_client_msg_clear(struct mosquitto_evt_persis { struct mosquitto *context; - if(client_msg == NULL || client_msg->client_id == NULL) return MOSQ_ERR_INVAL; + if(client_msg == NULL || client_msg->data.client_id == NULL) return MOSQ_ERR_INVAL; - HASH_FIND(hh_id, db.contexts_by_id, client_msg->client_id, strlen(client_msg->client_id), context); + HASH_FIND(hh_id, db.contexts_by_id, client_msg->data.client_id, strlen(client_msg->data.client_id), context); if(context == NULL){ return MOSQ_ERR_NOT_FOUND; } - if(client_msg->direction == mosq_bmd_in || client_msg->direction == mosq_bmd_all){ + if(client_msg->data.direction == mosq_bmd_in || client_msg->data.direction == mosq_bmd_all){ db__messages_delete_incoming(context); - }else if(client_msg->direction == mosq_bmd_out || client_msg->direction == mosq_bmd_all){ + }else if(client_msg->data.direction == mosq_bmd_out || client_msg->data.direction == mosq_bmd_all){ db__messages_delete_outgoing(context); } return MOSQ_ERR_SUCCESS; diff --git a/test/unit/persist_read_test.c b/test/unit/persist_read_test.c index 1a07bbf1..c9c544eb 100644 --- a/test/unit/persist_read_test.c +++ b/test/unit/persist_read_test.c @@ -299,13 +299,13 @@ static void TEST_v3_client_message(void) CU_ASSERT_NSTRING_EQUAL(context->msgs_out.inflight->base_msg->msg.payload, "payload", 7); } } - CU_ASSERT_EQUAL(context->msgs_out.inflight->mid, 0x73); - CU_ASSERT_EQUAL(context->msgs_out.inflight->qos, 1); - CU_ASSERT_EQUAL(context->msgs_out.inflight->retain, 0); - CU_ASSERT_EQUAL(context->msgs_out.inflight->direction, mosq_md_out); - CU_ASSERT_EQUAL(context->msgs_out.inflight->state, mosq_ms_wait_for_puback); - CU_ASSERT_EQUAL(context->msgs_out.inflight->dup, 0); - CU_ASSERT_EQUAL(context->msgs_out.inflight->subscription_identifier, 0); + CU_ASSERT_EQUAL(context->msgs_out.inflight->data.mid, 0x73); + CU_ASSERT_EQUAL(context->msgs_out.inflight->data.qos, 1); + CU_ASSERT_EQUAL(context->msgs_out.inflight->data.retain, 0); + CU_ASSERT_EQUAL(context->msgs_out.inflight->data.direction, mosq_md_out); + CU_ASSERT_EQUAL(context->msgs_out.inflight->data.state, mosq_ms_wait_for_puback); + CU_ASSERT_EQUAL(context->msgs_out.inflight->data.dup, 0); + CU_ASSERT_EQUAL(context->msgs_out.inflight->data.subscription_identifier, 0); } } } @@ -674,13 +674,13 @@ static void TEST_v6_client_message(void) CU_ASSERT_NSTRING_EQUAL(context->msgs_out.inflight->base_msg->msg.payload, "payload", 7); } } - CU_ASSERT_EQUAL(context->msgs_out.inflight->mid, 0x73); - CU_ASSERT_EQUAL(context->msgs_out.inflight->qos, 1); - CU_ASSERT_EQUAL(context->msgs_out.inflight->retain, 0); - CU_ASSERT_EQUAL(context->msgs_out.inflight->direction, mosq_md_out); - CU_ASSERT_EQUAL(context->msgs_out.inflight->state, mosq_ms_wait_for_puback); - CU_ASSERT_EQUAL(context->msgs_out.inflight->dup, 0); - CU_ASSERT_EQUAL(context->msgs_out.inflight->subscription_identifier, 0); + CU_ASSERT_EQUAL(context->msgs_out.inflight->data.mid, 0x73); + CU_ASSERT_EQUAL(context->msgs_out.inflight->data.qos, 1); + CU_ASSERT_EQUAL(context->msgs_out.inflight->data.retain, 0); + CU_ASSERT_EQUAL(context->msgs_out.inflight->data.direction, mosq_md_out); + CU_ASSERT_EQUAL(context->msgs_out.inflight->data.state, mosq_ms_wait_for_puback); + CU_ASSERT_EQUAL(context->msgs_out.inflight->data.dup, 0); + CU_ASSERT_EQUAL(context->msgs_out.inflight->data.subscription_identifier, 0); } } } @@ -723,13 +723,13 @@ static void TEST_v6_client_message_props(void) CU_ASSERT_NSTRING_EQUAL(context->msgs_out.inflight->base_msg->msg.payload, "payload", 7); } } - CU_ASSERT_EQUAL(context->msgs_out.inflight->mid, 0x73); - CU_ASSERT_EQUAL(context->msgs_out.inflight->qos, 1); - CU_ASSERT_EQUAL(context->msgs_out.inflight->retain, 0); - CU_ASSERT_EQUAL(context->msgs_out.inflight->direction, mosq_md_out); - CU_ASSERT_EQUAL(context->msgs_out.inflight->state, mosq_ms_wait_for_puback); - CU_ASSERT_EQUAL(context->msgs_out.inflight->dup, 0); - CU_ASSERT_EQUAL(context->msgs_out.inflight->subscription_identifier, 1); + CU_ASSERT_EQUAL(context->msgs_out.inflight->data.mid, 0x73); + CU_ASSERT_EQUAL(context->msgs_out.inflight->data.qos, 1); + CU_ASSERT_EQUAL(context->msgs_out.inflight->data.retain, 0); + CU_ASSERT_EQUAL(context->msgs_out.inflight->data.direction, mosq_md_out); + CU_ASSERT_EQUAL(context->msgs_out.inflight->data.state, mosq_ms_wait_for_puback); + CU_ASSERT_EQUAL(context->msgs_out.inflight->data.dup, 0); + CU_ASSERT_EQUAL(context->msgs_out.inflight->data.subscription_identifier, 1); } } }