mirror of
https://github.com/eclipse-mosquitto/mosquitto.git
synced 2026-02-06 02:52:07 +08:00
Refactoring in database.c
Signed-off-by: Norbert Heusser <norbert.heusser@cedalo.com>
This commit is contained in:
committed by
Roger Light
parent
d68dd563db
commit
d14d6bdfc8
@@ -363,6 +363,29 @@ static void db__message_remove_queued(struct mosquitto *context, struct mosquitt
|
||||
mosquitto__FREE(item);
|
||||
}
|
||||
|
||||
static void db__fill_inflight_out_from_queue(struct mosquitto *context)
|
||||
{
|
||||
struct mosquitto__client_msg *client_msg, *tmp;
|
||||
|
||||
DL_FOREACH_SAFE(context->msgs_out.queued, client_msg, tmp){
|
||||
if(!db__ready_for_flight(context, mosq_md_out, client_msg->data.qos)){
|
||||
return;
|
||||
}
|
||||
switch(client_msg->data.qos){
|
||||
case 0:
|
||||
client_msg->data.state = mosq_ms_publish_qos0;
|
||||
break;
|
||||
case 1:
|
||||
client_msg->data.state = mosq_ms_publish_qos1;
|
||||
break;
|
||||
case 2:
|
||||
client_msg->data.state = mosq_ms_publish_qos2;
|
||||
break;
|
||||
}
|
||||
plugin_persist__handle_client_msg_update(context, client_msg);
|
||||
db__message_dequeue_first(context, &context->msgs_out);
|
||||
}
|
||||
}
|
||||
|
||||
void db__message_dequeue_first(struct mosquitto *context, struct mosquitto_msg_data *msg_data)
|
||||
{
|
||||
@@ -415,26 +438,7 @@ int db__message_delete_outgoing(struct mosquitto *context, uint16_t mid, enum mo
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
DL_FOREACH_SAFE(context->msgs_out.queued, client_msg, tmp){
|
||||
if(!db__ready_for_flight(context, mosq_md_out, client_msg->data.qos)){
|
||||
break;
|
||||
}
|
||||
|
||||
switch(client_msg->data.qos){
|
||||
case 0:
|
||||
client_msg->data.state = mosq_ms_publish_qos0;
|
||||
break;
|
||||
case 1:
|
||||
client_msg->data.state = mosq_ms_publish_qos1;
|
||||
break;
|
||||
case 2:
|
||||
client_msg->data.state = mosq_ms_publish_qos2;
|
||||
break;
|
||||
}
|
||||
plugin_persist__handle_client_msg_update(context, client_msg);
|
||||
db__message_dequeue_first(context, &context->msgs_out);
|
||||
}
|
||||
db__fill_inflight_out_from_queue(context);
|
||||
#ifdef WITH_PERSISTENCE
|
||||
db.persistence_changes++;
|
||||
#endif
|
||||
@@ -1036,22 +1040,8 @@ static int db__message_reconnect_reset_outgoing(struct mosquitto *context)
|
||||
*/
|
||||
DL_FOREACH_SAFE(context->msgs_out.queued, client_msg, tmp){
|
||||
db__msg_add_to_queued_stats(&context->msgs_out, client_msg);
|
||||
if(db__ready_for_flight(context, mosq_md_out, client_msg->data.qos)){
|
||||
switch(client_msg->data.qos){
|
||||
case 0:
|
||||
client_msg->data.state = mosq_ms_publish_qos0;
|
||||
break;
|
||||
case 1:
|
||||
client_msg->data.state = mosq_ms_publish_qos1;
|
||||
break;
|
||||
case 2:
|
||||
client_msg->data.state = mosq_ms_publish_qos2;
|
||||
break;
|
||||
}
|
||||
db__message_dequeue_first(context, &context->msgs_out);
|
||||
plugin_persist__handle_client_msg_update(context, client_msg);
|
||||
}
|
||||
}
|
||||
db__fill_inflight_out_from_queue(context);
|
||||
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
}
|
||||
@@ -1436,30 +1426,11 @@ int db__message_write_queued_in(struct mosquitto *context)
|
||||
|
||||
int db__message_write_queued_out(struct mosquitto *context)
|
||||
{
|
||||
struct mosquitto__client_msg *client_msg, *tmp;
|
||||
|
||||
if(context->state != mosq_cs_active){
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
}
|
||||
|
||||
DL_FOREACH_SAFE(context->msgs_out.queued, client_msg, tmp){
|
||||
if(!db__ready_for_flight(context, mosq_md_out, client_msg->data.qos)){
|
||||
break;
|
||||
}
|
||||
db__fill_inflight_out_from_queue(context);
|
||||
|
||||
switch(client_msg->data.qos){
|
||||
case 0:
|
||||
client_msg->data.state = mosq_ms_publish_qos0;
|
||||
break;
|
||||
case 1:
|
||||
client_msg->data.state = mosq_ms_publish_qos1;
|
||||
break;
|
||||
case 2:
|
||||
client_msg->data.state = mosq_ms_publish_qos2;
|
||||
break;
|
||||
}
|
||||
db__message_dequeue_first(context, &context->msgs_out);
|
||||
plugin_persist__handle_client_msg_update(context, client_msg);
|
||||
}
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user