mirror of
https://github.com/eclipse-mosquitto/mosquitto.git
synced 2026-02-06 19:32:00 +08:00
Add mosquitto_rr, the "request-response" client.
This commit is contained in:
2
.gitignore
vendored
2
.gitignore
vendored
@@ -16,6 +16,7 @@ cpp/*.test
|
||||
build/
|
||||
|
||||
client/mosquitto_pub
|
||||
client/mosquitto_rr
|
||||
client/mosquitto_sub
|
||||
|
||||
cov-int/
|
||||
@@ -36,6 +37,7 @@ man/mosquitto.conf.5
|
||||
man/libmosquitto.3
|
||||
man/mosquitto_passwd.1
|
||||
man/mosquitto_pub.1
|
||||
man/mosquitto_rr.1
|
||||
man/mosquitto_sub.1
|
||||
man/mqtt.7
|
||||
|
||||
|
||||
@@ -30,6 +30,8 @@ Client library features:
|
||||
- Add support for OCSP stapling to bridges.
|
||||
|
||||
Client features:
|
||||
- Add mosquitto_rr client, which can be used for "request-response" messaging,
|
||||
by sending a request message and awaiting a response.
|
||||
- Add -E to mosquitto_sub, which causes it to exit immediately after having
|
||||
its subscriptions acknowledged. Use with -c to create a durable client
|
||||
session without requiring a message to be received.
|
||||
|
||||
@@ -11,15 +11,19 @@ endif (WITH_SRV)
|
||||
|
||||
add_executable(mosquitto_pub pub_client.c pub_shared.c ${shared_src})
|
||||
add_executable(mosquitto_sub sub_client.c sub_client_output.c ${shared_src})
|
||||
add_executable(mosquitto_pub rr_client.c pub_shared.c ${shared_src})
|
||||
|
||||
|
||||
target_link_libraries(mosquitto_pub libmosquitto)
|
||||
target_link_libraries(mosquitto_sub libmosquitto)
|
||||
target_link_libraries(mosquitto_rr libmosquitto)
|
||||
|
||||
if (QNX)
|
||||
target_link_libraries(mosquitto_pub socket)
|
||||
target_link_libraries(mosquitto_sub socket)
|
||||
target_link_libraries(mosquitto_rr socket)
|
||||
endif()
|
||||
|
||||
install(TARGETS mosquitto_pub RUNTIME DESTINATION "${BINDIR}" LIBRARY DESTINATION "${LIBDIR}")
|
||||
install(TARGETS mosquitto_sub RUNTIME DESTINATION "${BINDIR}" LIBRARY DESTINATION "${LIBDIR}")
|
||||
install(TARGETS mosquitto_rr RUNTIME DESTINATION "${BINDIR}" LIBRARY DESTINATION "${LIBDIR}")
|
||||
|
||||
@@ -1,23 +1,23 @@
|
||||
include ../config.mk
|
||||
|
||||
.PHONY: all install uninstall reallyclean clean static static_pub static_sub
|
||||
.PHONY: all install uninstall reallyclean clean static static_pub static_sub static_rr
|
||||
|
||||
ifeq ($(WITH_SHARED_LIBRARIES),yes)
|
||||
SHARED_DEP:=../lib/libmosquitto.so.${SOVERSION}
|
||||
endif
|
||||
|
||||
ifeq ($(WITH_SHARED_LIBRARIES),yes)
|
||||
ALL_DEPS:= mosquitto_pub mosquitto_sub
|
||||
ALL_DEPS:= mosquitto_pub mosquitto_sub mosquitto_rr
|
||||
else
|
||||
ifeq ($(WITH_STATIC_LIBRARIES),yes)
|
||||
ALL_DEPS:= static_pub static_sub
|
||||
ALL_DEPS:= static_pub static_sub static_rr
|
||||
endif
|
||||
endif
|
||||
|
||||
all : ${ALL_DEPS}
|
||||
|
||||
static : static_pub static_sub
|
||||
# This makes mosquitto_pub/sub versions that are statically linked with
|
||||
static : static_pub static_sub static_rr
|
||||
# This makes mosquitto_pub/sub/rr versions that are statically linked with
|
||||
# libmosquitto only.
|
||||
|
||||
static_pub : pub_client.o pub_shared.o client_props.o client_shared.o ../lib/libmosquitto.a
|
||||
@@ -26,12 +26,18 @@ static_pub : pub_client.o pub_shared.o client_props.o client_shared.o ../lib/lib
|
||||
static_sub : sub_client.o sub_client_output.o client_props.o client_shared.o ../lib/libmosquitto.a
|
||||
${CROSS_COMPILE}${CC} $^ -o mosquitto_sub ${CLIENT_LDFLAGS} ${STATIC_LIB_DEPS}
|
||||
|
||||
static_rr : rr_client.o client_props.o client_shared.o pub_shared.o ../lib/libmosquitto.a
|
||||
${CROSS_COMPILE}${CC} $^ -o mosquitto_rr ${CLIENT_LDFLAGS} ${STATIC_LIB_DEPS}
|
||||
|
||||
mosquitto_pub : pub_client.o pub_shared.o client_shared.o client_props.o
|
||||
${CROSS_COMPILE}${CC} $^ -o $@ ${CLIENT_LDFLAGS}
|
||||
|
||||
mosquitto_sub : sub_client.o sub_client_output.o client_shared.o client_props.o
|
||||
${CROSS_COMPILE}${CC} $^ -o $@ ${CLIENT_LDFLAGS}
|
||||
|
||||
mosquitto_rr : rr_client.o client_shared.o client_props.o pub_shared.o sub_client_output.o
|
||||
${CROSS_COMPILE}${CC} $^ -o $@ ${CLIENT_LDFLAGS}
|
||||
|
||||
pub_client.o : pub_client.c ${SHARED_DEP}
|
||||
${CROSS_COMPILE}${CC} -c $< -o $@ ${CLIENT_CFLAGS}
|
||||
|
||||
@@ -44,6 +50,9 @@ sub_client.o : sub_client.c ${SHARED_DEP}
|
||||
sub_client_output.o : sub_client_output.c ${SHARED_DEP}
|
||||
${CROSS_COMPILE}${CC} -c $< -o $@ ${CLIENT_CFLAGS}
|
||||
|
||||
rr_client.o : rr_client.c ${SHARED_DEP}
|
||||
${CROSS_COMPILE}${CC} -c $< -o $@ ${CLIENT_CFLAGS}
|
||||
|
||||
client_shared.o : client_shared.c client_shared.h
|
||||
${CROSS_COMPILE}${CC} -c $< -o $@ ${CLIENT_CFLAGS}
|
||||
|
||||
@@ -60,12 +69,14 @@ install : all
|
||||
$(INSTALL) -d "${DESTDIR}$(prefix)/bin"
|
||||
$(INSTALL) ${STRIP_OPTS} mosquitto_pub "${DESTDIR}${prefix}/bin/mosquitto_pub"
|
||||
$(INSTALL) ${STRIP_OPTS} mosquitto_sub "${DESTDIR}${prefix}/bin/mosquitto_sub"
|
||||
$(INSTALL) ${STRIP_OPTS} mosquitto_rr "${DESTDIR}${prefix}/bin/mosquitto_rr"
|
||||
|
||||
uninstall :
|
||||
-rm -f "${DESTDIR}${prefix}/bin/mosquitto_pub"
|
||||
-rm -f "${DESTDIR}${prefix}/bin/mosquitto_sub"
|
||||
-rm -f "${DESTDIR}${prefix}/bin/mosquitto_rr"
|
||||
|
||||
reallyclean : clean
|
||||
|
||||
clean :
|
||||
-rm -f *.o mosquitto_pub mosquitto_sub *.gcda *.gcno
|
||||
-rm -f *.o mosquitto_pub mosquitto_sub mosquitto_rr *.gcda *.gcno
|
||||
|
||||
@@ -121,7 +121,7 @@ static int check_format(const char *str)
|
||||
}
|
||||
|
||||
|
||||
void init_config(struct mosq_config *cfg)
|
||||
void init_config(struct mosq_config *cfg, int pub_or_sub)
|
||||
{
|
||||
memset(cfg, 0, sizeof(*cfg));
|
||||
cfg->port = -1;
|
||||
@@ -129,7 +129,12 @@ void init_config(struct mosq_config *cfg)
|
||||
cfg->keepalive = 60;
|
||||
cfg->clean_session = true;
|
||||
cfg->eol = true;
|
||||
cfg->protocol_version = MQTT_PROTOCOL_V311;
|
||||
if(pub_or_sub == CLIENT_RR){
|
||||
cfg->protocol_version = MQTT_PROTOCOL_V5;
|
||||
cfg->msg_count = 1;
|
||||
}else{
|
||||
cfg->protocol_version = MQTT_PROTOCOL_V311;
|
||||
}
|
||||
}
|
||||
|
||||
void client_config_cleanup(struct mosq_config *cfg)
|
||||
@@ -147,6 +152,7 @@ void client_config_cleanup(struct mosq_config *cfg)
|
||||
free(cfg->will_topic);
|
||||
free(cfg->will_payload);
|
||||
free(cfg->format);
|
||||
free(cfg->response_topic);
|
||||
#ifdef WITH_TLS
|
||||
free(cfg->cafile);
|
||||
free(cfg->capath);
|
||||
@@ -210,7 +216,7 @@ int client_config_load(struct mosq_config *cfg, int pub_or_sub, int argc, char *
|
||||
#endif
|
||||
args[0] = NULL;
|
||||
|
||||
init_config(cfg);
|
||||
init_config(cfg, pub_or_sub);
|
||||
|
||||
/* Default config file */
|
||||
#ifndef WIN32
|
||||
@@ -224,8 +230,10 @@ int client_config_load(struct mosq_config *cfg, int pub_or_sub, int argc, char *
|
||||
}
|
||||
if(pub_or_sub == CLIENT_PUB){
|
||||
snprintf(loc, len, "%s/mosquitto_pub", env);
|
||||
}else{
|
||||
}else if(pub_or_sub == CLIENT_SUB){
|
||||
snprintf(loc, len, "%s/mosquitto_sub", env);
|
||||
}else{
|
||||
snprintf(loc, len, "%s/mosquitto_rr", env);
|
||||
}
|
||||
loc[len-1] = '\0';
|
||||
}else{
|
||||
@@ -239,8 +247,10 @@ int client_config_load(struct mosq_config *cfg, int pub_or_sub, int argc, char *
|
||||
}
|
||||
if(pub_or_sub == CLIENT_PUB){
|
||||
snprintf(loc, len, "%s/.config/mosquitto_pub", env);
|
||||
}else{
|
||||
}else if(pub_or_sub == CLIENT_SUB){
|
||||
snprintf(loc, len, "%s/.config/mosquitto_sub", env);
|
||||
}else{
|
||||
snprintf(loc, len, "%s/.config/mosquitto_rr", env);
|
||||
}
|
||||
loc[len-1] = '\0';
|
||||
}else{
|
||||
@@ -259,8 +269,10 @@ int client_config_load(struct mosq_config *cfg, int pub_or_sub, int argc, char *
|
||||
}
|
||||
if(pub_or_sub == CLIENT_PUB){
|
||||
snprintf(loc, len, "%s\\mosquitto_pub.conf", env);
|
||||
}else{
|
||||
}else if(pub_or_sub == CLIENT_SUB){
|
||||
snprintf(loc, len, "%s\\mosquitto_sub.conf", env);
|
||||
}else{
|
||||
snprintf(loc, len, "%s\\mosquitto_rr.conf", env);
|
||||
}
|
||||
loc[len-1] = '\0';
|
||||
}else{
|
||||
@@ -394,19 +406,25 @@ int client_config_load(struct mosq_config *cfg, int pub_or_sub, int argc, char *
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
}
|
||||
|
||||
int cfg_add_topic(struct mosq_config *cfg, int pub_or_sub, char *topic, const char *arg)
|
||||
int cfg_add_topic(struct mosq_config *cfg, int type, char *topic, const char *arg)
|
||||
{
|
||||
if(mosquitto_validate_utf8(topic, strlen(topic))){
|
||||
fprintf(stderr, "Error: Malformed UTF-8 in %s argument.\n\n", arg);
|
||||
return 1;
|
||||
}
|
||||
if(pub_or_sub == CLIENT_PUB){
|
||||
if(type == CLIENT_PUB || type == CLIENT_RR){
|
||||
if(mosquitto_pub_topic_check(topic) == MOSQ_ERR_INVAL){
|
||||
fprintf(stderr, "Error: Invalid publish topic '%s', does it contain '+' or '#'?\n", topic);
|
||||
return 1;
|
||||
}
|
||||
cfg->topic = strdup(topic);
|
||||
} else {
|
||||
}else if(type == CLIENT_RESPONSE_TOPIC){
|
||||
if(mosquitto_pub_topic_check(topic) == MOSQ_ERR_INVAL){
|
||||
fprintf(stderr, "Error: Invalid response topic '%s', does it contain '+' or '#'?\n", topic);
|
||||
return 1;
|
||||
}
|
||||
cfg->response_topic = strdup(topic);
|
||||
}else{
|
||||
if(mosquitto_sub_topic_check(topic) == MOSQ_ERR_INVAL){
|
||||
fprintf(stderr, "Error: Invalid subscription topic '%s', are all '+' and '#' wildcards correct?\n", topic);
|
||||
return 1;
|
||||
@@ -471,7 +489,7 @@ int client_config_line_proc(struct mosq_config *cfg, int pub_or_sub, int argc, c
|
||||
i++;
|
||||
#endif
|
||||
}else if(!strcmp(argv[i], "-C")){
|
||||
if(pub_or_sub == CLIENT_PUB){
|
||||
if(pub_or_sub != CLIENT_SUB){
|
||||
goto unknown_option;
|
||||
}else{
|
||||
if(i==argc-1){
|
||||
@@ -496,8 +514,21 @@ int client_config_line_proc(struct mosq_config *cfg, int pub_or_sub, int argc, c
|
||||
return 1;
|
||||
}
|
||||
cfg->protocol_version = MQTT_PROTOCOL_V5;
|
||||
}else if(!strcmp(argv[i], "-e")){
|
||||
if(pub_or_sub != CLIENT_RR){
|
||||
goto unknown_option;
|
||||
}
|
||||
if(i==argc-1){
|
||||
fprintf(stderr, "Error: -e argument given but no response topic specified.\n\n");
|
||||
return 1;
|
||||
}else{
|
||||
if(cfg_add_topic(cfg, CLIENT_RESPONSE_TOPIC, argv[i+1], "-e")){
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
i++;
|
||||
}else if(!strcmp(argv[i], "-E")){
|
||||
if(pub_or_sub == CLIENT_PUB){
|
||||
if(pub_or_sub != CLIENT_SUB){
|
||||
goto unknown_option;
|
||||
}
|
||||
cfg->exit_after_sub = true;
|
||||
@@ -652,7 +683,7 @@ int client_config_line_proc(struct mosq_config *cfg, int pub_or_sub, int argc, c
|
||||
}
|
||||
i++;
|
||||
}else if(!strcmp(argv[i], "-l") || !strcmp(argv[i], "--stdin-line")){
|
||||
if(pub_or_sub == CLIENT_SUB){
|
||||
if(pub_or_sub != CLIENT_PUB){
|
||||
goto unknown_option;
|
||||
}
|
||||
if(cfg->pub_mode != MSGMODE_NONE){
|
||||
@@ -765,7 +796,7 @@ int client_config_line_proc(struct mosq_config *cfg, int pub_or_sub, int argc, c
|
||||
}else if(!strcmp(argv[i], "--quiet")){
|
||||
cfg->quiet = true;
|
||||
}else if(!strcmp(argv[i], "-r") || !strcmp(argv[i], "--retain")){
|
||||
if(pub_or_sub == CLIENT_SUB){
|
||||
if(pub_or_sub != CLIENT_PUB){
|
||||
goto unknown_option;
|
||||
}
|
||||
cfg->retain = 1;
|
||||
@@ -774,8 +805,9 @@ int client_config_line_proc(struct mosq_config *cfg, int pub_or_sub, int argc, c
|
||||
goto unknown_option;
|
||||
}
|
||||
cfg->no_retain = true;
|
||||
cfg->sub_opts |= MQTT_SUB_OPT_SEND_RETAIN_NEVER;
|
||||
}else if(!strcmp(argv[i], "--remove-retained")){
|
||||
if(pub_or_sub == CLIENT_PUB){
|
||||
if(pub_or_sub != CLIENT_SUB){
|
||||
goto unknown_option;
|
||||
}
|
||||
cfg->remove_retained = true;
|
||||
@@ -785,7 +817,7 @@ int client_config_line_proc(struct mosq_config *cfg, int pub_or_sub, int argc, c
|
||||
}
|
||||
cfg->sub_opts |= MQTT_SUB_OPT_RETAIN_AS_PUBLISHED;
|
||||
}else if(!strcmp(argv[i], "--retained-only")){
|
||||
if(pub_or_sub == CLIENT_PUB){
|
||||
if(pub_or_sub != CLIENT_SUB){
|
||||
goto unknown_option;
|
||||
}
|
||||
cfg->retained_only = true;
|
||||
@@ -813,7 +845,7 @@ int client_config_line_proc(struct mosq_config *cfg, int pub_or_sub, int argc, c
|
||||
i++;
|
||||
}
|
||||
}else if(!strcmp(argv[i], "-T") || !strcmp(argv[i], "--filter-out")){
|
||||
if(pub_or_sub == CLIENT_PUB){
|
||||
if(pub_or_sub != CLIENT_SUB){
|
||||
goto unknown_option;
|
||||
}
|
||||
if(i==argc-1){
|
||||
@@ -864,7 +896,7 @@ int client_config_line_proc(struct mosq_config *cfg, int pub_or_sub, int argc, c
|
||||
i++;
|
||||
#endif
|
||||
}else if(!strcmp(argv[i], "-U") || !strcmp(argv[i], "--unsubscribe")){
|
||||
if(pub_or_sub == CLIENT_PUB){
|
||||
if(pub_or_sub != CLIENT_SUB){
|
||||
goto unknown_option;
|
||||
}
|
||||
if(i==argc-1){
|
||||
|
||||
@@ -29,6 +29,8 @@ Contributors:
|
||||
|
||||
#define CLIENT_PUB 1
|
||||
#define CLIENT_SUB 2
|
||||
#define CLIENT_RR 3
|
||||
#define CLIENT_RESPONSE_TOPIC 4
|
||||
|
||||
struct mosq_config {
|
||||
char *id;
|
||||
@@ -39,11 +41,11 @@ struct mosq_config {
|
||||
int port;
|
||||
int qos;
|
||||
bool retain;
|
||||
int pub_mode; /* pub */
|
||||
char *file_input; /* pub */
|
||||
char *message; /* pub */
|
||||
long msglen; /* pub */
|
||||
char *topic; /* pub */
|
||||
int pub_mode; /* pub, rr */
|
||||
char *file_input; /* pub, rr */
|
||||
char *message; /* pub, rr */
|
||||
long msglen; /* pub, rr */
|
||||
char *topic; /* pub, rr */
|
||||
char *bind_address;
|
||||
#ifdef WITH_SRV
|
||||
bool use_srv;
|
||||
@@ -104,6 +106,7 @@ struct mosq_config {
|
||||
mosquitto_property *disconnect_props;
|
||||
mosquitto_property *will_props;
|
||||
bool have_topic_alias; /* pub */
|
||||
char *response_topic; /* rr */
|
||||
};
|
||||
|
||||
int client_config_load(struct mosq_config *config, int pub_or_sub, int argc, char *argv[]);
|
||||
|
||||
@@ -29,6 +29,7 @@ Contributors:
|
||||
#define snprintf sprintf_s
|
||||
#endif
|
||||
|
||||
#include <mqtt_protocol.h>
|
||||
#include <mosquitto.h>
|
||||
#include "client_shared.h"
|
||||
#include "pub_shared.h"
|
||||
@@ -36,6 +37,23 @@ Contributors:
|
||||
/* Global variables for use in callbacks. See sub_client.c for an example of
|
||||
* using a struct to hold variables for use in callbacks. */
|
||||
static bool first_publish = true;
|
||||
static int last_mid = -1;
|
||||
static int last_mid_sent = -1;
|
||||
static char *line_buf = NULL;
|
||||
static int line_buf_len = 1024;
|
||||
static bool connected = true;
|
||||
static bool disconnect_sent = false;
|
||||
|
||||
|
||||
void my_disconnect_callback(struct mosquitto *mosq, void *obj, int rc, const mosquitto_property *properties)
|
||||
{
|
||||
UNUSED(mosq);
|
||||
UNUSED(obj);
|
||||
UNUSED(rc);
|
||||
UNUSED(properties);
|
||||
|
||||
connected = false;
|
||||
}
|
||||
|
||||
int my_publish(struct mosquitto *mosq, int *mid, const char *topic, int payloadlen, void *payload, int qos, bool retain)
|
||||
{
|
||||
@@ -107,6 +125,124 @@ void my_connect_callback(struct mosquitto *mosq, void *obj, int result, int flag
|
||||
}
|
||||
|
||||
|
||||
void my_publish_callback(struct mosquitto *mosq, void *obj, int mid, int reason_code, const mosquitto_property *properties)
|
||||
{
|
||||
UNUSED(obj);
|
||||
UNUSED(properties);
|
||||
|
||||
last_mid_sent = mid;
|
||||
if(reason_code > 127){
|
||||
if(!cfg.quiet) fprintf(stderr, "Warning: Publish %d failed: %s.\n", mid, mosquitto_reason_string(reason_code));
|
||||
}
|
||||
if(cfg.pub_mode == MSGMODE_STDIN_LINE){
|
||||
if(mid == last_mid){
|
||||
mosquitto_disconnect_v5(mosq, 0, cfg.disconnect_props);
|
||||
disconnect_sent = true;
|
||||
}
|
||||
}else if(disconnect_sent == false){
|
||||
mosquitto_disconnect_v5(mosq, 0, cfg.disconnect_props);
|
||||
disconnect_sent = true;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
int pub_shared_init(void)
|
||||
{
|
||||
line_buf = malloc(line_buf_len);
|
||||
if(!line_buf){
|
||||
fprintf(stderr, "Error: Out of memory.\n");
|
||||
return 1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int pub_shared_loop(struct mosquitto *mosq)
|
||||
{
|
||||
int read_len;
|
||||
int pos;
|
||||
int rc, rc2;
|
||||
char *buf2;
|
||||
int buf_len_actual;
|
||||
int mode;
|
||||
|
||||
mode = cfg.pub_mode;
|
||||
|
||||
if(mode == MSGMODE_STDIN_LINE){
|
||||
mosquitto_loop_start(mosq);
|
||||
}
|
||||
|
||||
do{
|
||||
if(mode == MSGMODE_STDIN_LINE){
|
||||
if(status == STATUS_CONNACK_RECVD){
|
||||
pos = 0;
|
||||
read_len = line_buf_len;
|
||||
while(connected && fgets(&line_buf[pos], read_len, stdin)){
|
||||
buf_len_actual = strlen(line_buf);
|
||||
if(line_buf[buf_len_actual-1] == '\n'){
|
||||
line_buf[buf_len_actual-1] = '\0';
|
||||
rc2 = my_publish(mosq, &mid_sent, cfg.topic, buf_len_actual-1, line_buf, cfg.qos, cfg.retain);
|
||||
if(rc2){
|
||||
if(!cfg.quiet) fprintf(stderr, "Error: Publish returned %d, disconnecting.\n", rc2);
|
||||
mosquitto_disconnect_v5(mosq, MQTT_RC_DISCONNECT_WITH_WILL_MSG, cfg.disconnect_props);
|
||||
}
|
||||
break;
|
||||
}else{
|
||||
line_buf_len += 1024;
|
||||
pos += 1023;
|
||||
read_len = 1024;
|
||||
buf2 = realloc(line_buf, line_buf_len);
|
||||
if(!buf2){
|
||||
fprintf(stderr, "Error: Out of memory.\n");
|
||||
return MOSQ_ERR_NOMEM;
|
||||
}
|
||||
line_buf = buf2;
|
||||
}
|
||||
}
|
||||
if(feof(stdin)){
|
||||
if(mid_sent == -1){
|
||||
/* Empty file */
|
||||
mosquitto_disconnect_v5(mosq, 0, cfg.disconnect_props);
|
||||
disconnect_sent = true;
|
||||
status = STATUS_DISCONNECTING;
|
||||
}else{
|
||||
last_mid = mid_sent;
|
||||
status = STATUS_WAITING;
|
||||
}
|
||||
}
|
||||
}else if(status == STATUS_WAITING){
|
||||
if(last_mid_sent == last_mid && disconnect_sent == false){
|
||||
mosquitto_disconnect_v5(mosq, 0, cfg.disconnect_props);
|
||||
disconnect_sent = true;
|
||||
}
|
||||
#ifdef WIN32
|
||||
Sleep(100);
|
||||
#else
|
||||
struct timespec ts;
|
||||
ts.tv_sec = 0;
|
||||
ts.tv_nsec = 100000000;
|
||||
nanosleep(&ts, NULL);
|
||||
#endif
|
||||
}
|
||||
rc = MOSQ_ERR_SUCCESS;
|
||||
}else{
|
||||
rc = mosquitto_loop(mosq, -1, 1);
|
||||
}
|
||||
}while(rc == MOSQ_ERR_SUCCESS && connected);
|
||||
|
||||
if(mode == MSGMODE_STDIN_LINE){
|
||||
mosquitto_loop_stop(mosq, false);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
void pub_shared_cleanup(void)
|
||||
{
|
||||
free(line_buf);
|
||||
}
|
||||
|
||||
|
||||
void print_usage(void)
|
||||
{
|
||||
int major, minor, revision;
|
||||
|
||||
@@ -40,43 +40,6 @@ int mid_sent = 0;
|
||||
int status = STATUS_CONNECTING;
|
||||
struct mosq_config cfg;
|
||||
|
||||
static int last_mid = -1;
|
||||
static int last_mid_sent = -1;
|
||||
static bool connected = true;
|
||||
static bool disconnect_sent = false;
|
||||
static char *buf = NULL;
|
||||
static int buf_len = 1024;
|
||||
|
||||
void my_disconnect_callback(struct mosquitto *mosq, void *obj, int rc, const mosquitto_property *properties)
|
||||
{
|
||||
UNUSED(mosq);
|
||||
UNUSED(obj);
|
||||
UNUSED(rc);
|
||||
UNUSED(properties);
|
||||
|
||||
connected = false;
|
||||
}
|
||||
|
||||
void my_publish_callback(struct mosquitto *mosq, void *obj, int mid, int reason_code, const mosquitto_property *properties)
|
||||
{
|
||||
UNUSED(obj);
|
||||
UNUSED(properties);
|
||||
|
||||
last_mid_sent = mid;
|
||||
if(reason_code > 127){
|
||||
if(!cfg.quiet) fprintf(stderr, "Warning: Publish %d failed: %s.\n", mid, mosquitto_reason_string(reason_code));
|
||||
}
|
||||
if(cfg.pub_mode == MSGMODE_STDIN_LINE){
|
||||
if(mid == last_mid){
|
||||
mosquitto_disconnect_v5(mosq, 0, cfg.disconnect_props);
|
||||
disconnect_sent = true;
|
||||
}
|
||||
}else if(disconnect_sent == false){
|
||||
mosquitto_disconnect_v5(mosq, 0, cfg.disconnect_props);
|
||||
disconnect_sent = true;
|
||||
}
|
||||
}
|
||||
|
||||
void my_log_callback(struct mosquitto *mosq, void *obj, int level, const char *str)
|
||||
{
|
||||
UNUSED(mosq);
|
||||
@@ -161,98 +124,3 @@ int load_file(const char *filename)
|
||||
}
|
||||
|
||||
|
||||
int pub_shared_init(void)
|
||||
{
|
||||
buf = malloc(buf_len);
|
||||
if(!buf){
|
||||
fprintf(stderr, "Error: Out of memory.\n");
|
||||
return 1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int pub_shared_loop(struct mosquitto *mosq)
|
||||
{
|
||||
int read_len;
|
||||
int pos;
|
||||
int rc, rc2;
|
||||
char *buf2;
|
||||
int buf_len_actual;
|
||||
int mode;
|
||||
|
||||
mode = cfg.pub_mode;
|
||||
|
||||
if(mode == MSGMODE_STDIN_LINE){
|
||||
mosquitto_loop_start(mosq);
|
||||
}
|
||||
|
||||
do{
|
||||
if(mode == MSGMODE_STDIN_LINE){
|
||||
if(status == STATUS_CONNACK_RECVD){
|
||||
pos = 0;
|
||||
read_len = buf_len;
|
||||
while(connected && fgets(&buf[pos], read_len, stdin)){
|
||||
buf_len_actual = strlen(buf);
|
||||
if(buf[buf_len_actual-1] == '\n'){
|
||||
buf[buf_len_actual-1] = '\0';
|
||||
rc2 = my_publish(mosq, &mid_sent, cfg.topic, buf_len_actual-1, buf, cfg.qos, cfg.retain);
|
||||
if(rc2){
|
||||
if(!cfg.quiet) fprintf(stderr, "Error: Publish returned %d, disconnecting.\n", rc2);
|
||||
mosquitto_disconnect_v5(mosq, MQTT_RC_DISCONNECT_WITH_WILL_MSG, cfg.disconnect_props);
|
||||
}
|
||||
break;
|
||||
}else{
|
||||
buf_len += 1024;
|
||||
pos += 1023;
|
||||
read_len = 1024;
|
||||
buf2 = realloc(buf, buf_len);
|
||||
if(!buf2){
|
||||
fprintf(stderr, "Error: Out of memory.\n");
|
||||
return MOSQ_ERR_NOMEM;
|
||||
}
|
||||
buf = buf2;
|
||||
}
|
||||
}
|
||||
if(feof(stdin)){
|
||||
if(mid_sent == -1){
|
||||
/* Empty file */
|
||||
mosquitto_disconnect_v5(mosq, 0, cfg.disconnect_props);
|
||||
disconnect_sent = true;
|
||||
status = STATUS_DISCONNECTING;
|
||||
}else{
|
||||
last_mid = mid_sent;
|
||||
status = STATUS_WAITING;
|
||||
}
|
||||
}
|
||||
}else if(status == STATUS_WAITING){
|
||||
if(last_mid_sent == last_mid && disconnect_sent == false){
|
||||
mosquitto_disconnect_v5(mosq, 0, cfg.disconnect_props);
|
||||
disconnect_sent = true;
|
||||
}
|
||||
#ifdef WIN32
|
||||
Sleep(100);
|
||||
#else
|
||||
struct timespec ts;
|
||||
ts.tv_sec = 0;
|
||||
ts.tv_nsec = 100000000;
|
||||
nanosleep(&ts, NULL);
|
||||
#endif
|
||||
}
|
||||
rc = MOSQ_ERR_SUCCESS;
|
||||
}else{
|
||||
rc = mosquitto_loop(mosq, -1, 1);
|
||||
}
|
||||
}while(rc == MOSQ_ERR_SUCCESS && connected);
|
||||
|
||||
if(mode == MSGMODE_STDIN_LINE){
|
||||
mosquitto_loop_stop(mosq, false);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
void pub_shared_cleanup(void)
|
||||
{
|
||||
free(buf);
|
||||
}
|
||||
|
||||
396
client/rr_client.c
Normal file
396
client/rr_client.c
Normal file
@@ -0,0 +1,396 @@
|
||||
/*
|
||||
Copyright (c) 2009-2018 Roger Light <roger@atchoo.org>
|
||||
|
||||
All rights reserved. This program and the accompanying materials
|
||||
are made available under the terms of the Eclipse Public License v1.0
|
||||
and Eclipse Distribution License v1.0 which accompany this distribution.
|
||||
|
||||
The Eclipse Public License is available at
|
||||
http://www.eclipse.org/legal/epl-v10.html
|
||||
and the Eclipse Distribution License is available at
|
||||
http://www.eclipse.org/org/documents/edl-v10.php.
|
||||
|
||||
Contributors:
|
||||
Roger Light - initial implementation and documentation.
|
||||
*/
|
||||
|
||||
#include "config.h"
|
||||
|
||||
#include <assert.h>
|
||||
#include <errno.h>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <time.h>
|
||||
#ifndef WIN32
|
||||
#include <unistd.h>
|
||||
#include <signal.h>
|
||||
#else
|
||||
#include <process.h>
|
||||
#include <winsock2.h>
|
||||
#define snprintf sprintf_s
|
||||
#endif
|
||||
|
||||
#include <mosquitto.h>
|
||||
#include <mqtt_protocol.h>
|
||||
#include "client_shared.h"
|
||||
#include "pub_shared.h"
|
||||
|
||||
enum rr__state {
|
||||
rr_s_new,
|
||||
rr_s_connected,
|
||||
rr_s_subscribed,
|
||||
rr_s_ready_to_publish,
|
||||
rr_s_wait_for_response,
|
||||
rr_s_disconnect
|
||||
};
|
||||
|
||||
static enum rr__state client_state = rr_s_new;
|
||||
|
||||
struct mosq_config cfg;
|
||||
bool process_messages = true;
|
||||
int msg_count = 0;
|
||||
struct mosquitto *mosq = NULL;
|
||||
|
||||
#ifndef WIN32
|
||||
void my_signal_handler(int signum)
|
||||
{
|
||||
if(signum == SIGALRM){
|
||||
process_messages = false;
|
||||
mosquitto_disconnect_v5(mosq, MQTT_RC_DISCONNECT_WITH_WILL_MSG, cfg.disconnect_props);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
void print_message(struct mosq_config *cfg, const struct mosquitto_message *message);
|
||||
|
||||
|
||||
int my_publish(struct mosquitto *mosq, int *mid, const char *topic, int payloadlen, void *payload, int qos, bool retain)
|
||||
{
|
||||
return mosquitto_publish_v5(mosq, mid, topic, payloadlen, payload, qos, retain, cfg.publish_props);
|
||||
}
|
||||
|
||||
|
||||
void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message, const mosquitto_property *properties)
|
||||
{
|
||||
print_message(&cfg, message);
|
||||
switch(cfg.pub_mode){
|
||||
case MSGMODE_CMD:
|
||||
case MSGMODE_FILE:
|
||||
case MSGMODE_STDIN_FILE:
|
||||
case MSGMODE_NULL:
|
||||
client_state = rr_s_disconnect;
|
||||
break;
|
||||
case MSGMODE_STDIN_LINE:
|
||||
client_state = rr_s_ready_to_publish;
|
||||
break;
|
||||
}
|
||||
/* FIXME - check all below
|
||||
if(process_messages == false) return;
|
||||
|
||||
if(cfg.retained_only && !message->retain && process_messages){
|
||||
process_messages = false;
|
||||
mosquitto_disconnect_v5(mosq, 0, cfg.disconnect_props);
|
||||
return;
|
||||
}
|
||||
|
||||
if(message->retain && cfg.no_retain) return;
|
||||
if(cfg.filter_outs){
|
||||
for(i=0; i<cfg.filter_out_count; i++){
|
||||
mosquitto_topic_matches_sub(cfg.filter_outs[i], message->topic, &res);
|
||||
if(res) return;
|
||||
}
|
||||
}
|
||||
|
||||
//print_message(&cfg, message);
|
||||
|
||||
if(cfg.msg_count>0){
|
||||
msg_count++;
|
||||
if(cfg.msg_count == msg_count){
|
||||
process_messages = false;
|
||||
mosquitto_disconnect_v5(mosq, 0, cfg.disconnect_props);
|
||||
}
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
void my_connect_callback(struct mosquitto *mosq, void *obj, int result, int flags, const mosquitto_property *properties)
|
||||
{
|
||||
if(!result){
|
||||
client_state = rr_s_connected;
|
||||
mosquitto_subscribe_v5(mosq, NULL, cfg.response_topic, cfg.qos, 0, cfg.subscribe_props);
|
||||
}else{
|
||||
client_state = rr_s_disconnect;
|
||||
if(result && !cfg.quiet){
|
||||
fprintf(stderr, "%s\n", mosquitto_connack_string(result));
|
||||
}
|
||||
mosquitto_disconnect_v5(mosq, 0, cfg.disconnect_props);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void my_subscribe_callback(struct mosquitto *mosq, void *obj, int mid, int qos_count, const int *granted_qos)
|
||||
{
|
||||
int rc = MOSQ_ERR_SUCCESS;
|
||||
|
||||
if(granted_qos[0] < 128){
|
||||
client_state = rr_s_ready_to_publish;
|
||||
|
||||
if(rc){
|
||||
if(!cfg.quiet){
|
||||
switch(rc){
|
||||
case MOSQ_ERR_INVAL:
|
||||
fprintf(stderr, "Error: Invalid input. Does your topic contain '+' or '#'?\n");
|
||||
break;
|
||||
case MOSQ_ERR_NOMEM:
|
||||
fprintf(stderr, "Error: Out of memory when trying to publish message.\n");
|
||||
break;
|
||||
case MOSQ_ERR_NO_CONN:
|
||||
fprintf(stderr, "Error: Client not connected when trying to publish.\n");
|
||||
break;
|
||||
case MOSQ_ERR_PROTOCOL:
|
||||
fprintf(stderr, "Error: Protocol error when communicating with broker.\n");
|
||||
break;
|
||||
case MOSQ_ERR_PAYLOAD_SIZE:
|
||||
fprintf(stderr, "Error: Message payload is too large.\n");
|
||||
break;
|
||||
}
|
||||
}
|
||||
client_state = rr_s_disconnect;
|
||||
mosquitto_disconnect_v5(mosq, 0, cfg.disconnect_props);
|
||||
}
|
||||
}else{
|
||||
client_state = rr_s_disconnect;
|
||||
if(!cfg.quiet){
|
||||
fprintf(stderr, "%s\n", mosquitto_reason_string(granted_qos[0]));
|
||||
mosquitto_disconnect_v5(mosq, 0, cfg.disconnect_props);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void my_publish_callback(struct mosquitto *mosq, void *obj, int mid, int reason_code, const mosquitto_property *properties)
|
||||
{
|
||||
client_state = rr_s_wait_for_response;
|
||||
}
|
||||
|
||||
|
||||
void print_usage(void)
|
||||
{
|
||||
int major, minor, revision;
|
||||
|
||||
mosquitto_lib_version(&major, &minor, &revision);
|
||||
printf("mosquitto_rr is an mqtt client that can be used to publish a request message and wait for a response.\n");
|
||||
printf(" Defaults to MQTT v5, where the Request-Response feature will be used, but v3.1.1 can also be used\n");
|
||||
printf(" with v3.1.1 brokers.\n");
|
||||
printf("mosquitto_rr version %s running on libmosquitto %d.%d.%d.\n\n", VERSION, major, minor, revision);
|
||||
printf("Usage: mosquitto_rr {[-h host] [-p port] [-u username [-P password]] -t topic | -L URL} -e response-topic\n");
|
||||
printf(" [-c] [-k keepalive] [-q qos] [-R]\n");
|
||||
printf(" [-F format]\n");
|
||||
#ifndef WIN32
|
||||
printf(" [-W timeout_secs]\n");
|
||||
#endif
|
||||
#ifdef WITH_SRV
|
||||
printf(" [-A bind_address] [-S]\n");
|
||||
#else
|
||||
printf(" [-A bind_address]\n");
|
||||
#endif
|
||||
printf(" [-i id] [-I id_prefix]\n");
|
||||
printf(" [-d] [-N] [--quiet] [-v]\n");
|
||||
printf(" [--will-topic [--will-payload payload] [--will-qos qos] [--will-retain]]\n");
|
||||
#ifdef WITH_TLS
|
||||
printf(" [{--cafile file | --capath dir} [--cert file] [--key file]\n");
|
||||
printf(" [--ciphers ciphers] [--insecure]]\n");
|
||||
#ifdef WITH_TLS_PSK
|
||||
printf(" [--psk hex-key --psk-identity identity [--ciphers ciphers]]\n");
|
||||
#endif
|
||||
#endif
|
||||
#ifdef WITH_SOCKS
|
||||
printf(" [--proxy socks-url]\n");
|
||||
#endif
|
||||
printf(" [-D command identifier value]\n");
|
||||
printf(" mosquitto_rr --help\n\n");
|
||||
printf(" -A : bind the outgoing socket to this host/ip address. Use to control which interface\n");
|
||||
printf(" the client communicates over.\n");
|
||||
printf(" -c : disable 'clean session' (store subscription and pending messages when client disconnects).\n");
|
||||
printf(" -d : enable debug messages.\n");
|
||||
printf(" -D : Define MQTT v5 properties. See the documentation for more details.\n");
|
||||
printf(" -F : output format.\n");
|
||||
printf(" -h : mqtt host to connect to. Defaults to localhost.\n");
|
||||
printf(" -i : id to use for this client. Defaults to mosquitto_rr_ appended with the process id.\n");
|
||||
printf(" -k : keep alive in seconds for this client. Defaults to 60.\n");
|
||||
printf(" -L : specify user, password, hostname, port and topic as a URL in the form:\n");
|
||||
printf(" mqtt(s)://[username[:password]@]host[:port]/topic\n");
|
||||
printf(" -N : do not add an end of line character when printing the payload.\n");
|
||||
printf(" -p : network port to connect to. Defaults to 1883 for plain MQTT and 8883 for MQTT over TLS.\n");
|
||||
printf(" -P : provide a password\n");
|
||||
printf(" -q : quality of service level to use for communications. Defaults to 0.\n");
|
||||
printf(" -R : do not print stale messages (those with retain set).\n");
|
||||
#ifdef WITH_SRV
|
||||
printf(" -S : use SRV lookups to determine which host to connect to.\n");
|
||||
#endif
|
||||
printf(" -t : mqtt response topic to subscribe to. May be repeated multiple times.\n");
|
||||
printf(" -u : provide a username\n");
|
||||
printf(" -v : print received messages verbosely.\n");
|
||||
printf(" -V : specify the version of the MQTT protocol to use when connecting.\n");
|
||||
printf(" Defaults to 5.\n");
|
||||
#ifndef WIN32
|
||||
printf(" -W : Specifies a timeout in seconds how long to wait for a response.\n");
|
||||
#endif
|
||||
printf(" --help : display this message.\n");
|
||||
printf(" --quiet : don't print error messages.\n");
|
||||
printf(" --will-payload : payload for the client Will, which is sent by the broker in case of\n");
|
||||
printf(" unexpected disconnection. If not given and will-topic is set, a zero\n");
|
||||
printf(" length message will be sent.\n");
|
||||
printf(" --will-qos : QoS level for the client Will.\n");
|
||||
printf(" --will-retain : if given, make the client Will retained.\n");
|
||||
printf(" --will-topic : the topic on which to publish the client Will.\n");
|
||||
#ifdef WITH_TLS
|
||||
printf(" --cafile : path to a file containing trusted CA certificates to enable encrypted\n");
|
||||
printf(" certificate based communication.\n");
|
||||
printf(" --capath : path to a directory containing trusted CA certificates to enable encrypted\n");
|
||||
printf(" communication.\n");
|
||||
printf(" --cert : client certificate for authentication, if required by server.\n");
|
||||
printf(" --key : client private key for authentication, if required by server.\n");
|
||||
printf(" --ciphers : openssl compatible list of TLS ciphers to support.\n");
|
||||
printf(" --tls-version : TLS protocol version, can be one of tlsv1.2 tlsv1.1 or tlsv1.\n");
|
||||
printf(" Defaults to tlsv1.2 if available.\n");
|
||||
printf(" --insecure : do not check that the server certificate hostname matches the remote\n");
|
||||
printf(" hostname. Using this option means that you cannot be sure that the\n");
|
||||
printf(" remote host is the server you wish to connect to and so is insecure.\n");
|
||||
printf(" Do not use this option in a production environment.\n");
|
||||
#ifdef WITH_TLS_PSK
|
||||
printf(" --psk : pre-shared-key in hexadecimal (no leading 0x) to enable TLS-PSK mode.\n");
|
||||
printf(" --psk-identity : client identity string for TLS-PSK mode.\n");
|
||||
#endif
|
||||
#endif
|
||||
#ifdef WITH_SOCKS
|
||||
printf(" --proxy : SOCKS5 proxy URL of the form:\n");
|
||||
printf(" socks5h://[username[:password]@]hostname[:port]\n");
|
||||
printf(" Only \"none\" and \"username\" authentication is supported.\n");
|
||||
#endif
|
||||
printf("\nSee https://mosquitto.org/ for more information.\n\n");
|
||||
}
|
||||
|
||||
int main(int argc, char *argv[])
|
||||
{
|
||||
int rc;
|
||||
#ifndef WIN32
|
||||
struct sigaction sigact;
|
||||
#endif
|
||||
|
||||
memset(&cfg, 0, sizeof(struct mosq_config));
|
||||
|
||||
mosquitto_lib_init();
|
||||
|
||||
rc = client_config_load(&cfg, CLIENT_RR, argc, argv);
|
||||
if(rc){
|
||||
if(rc == 2){
|
||||
/* --help */
|
||||
print_usage();
|
||||
}else{
|
||||
fprintf(stderr, "\nUse 'mosquitto_rr --help' to see usage.\n");
|
||||
}
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if(!cfg.topic || cfg.pub_mode == MSGMODE_NONE || !cfg.response_topic){
|
||||
fprintf(stderr, "Error: All of topic, message, and response topic must be supplied.\n");
|
||||
fprintf(stderr, "\nUse 'mosquitto_rr --help' to see usage.\n");
|
||||
goto cleanup;
|
||||
}
|
||||
rc = mosquitto_property_add_string(&cfg.publish_props, MQTT_PROP_RESPONSE_TOPIC, cfg.response_topic);
|
||||
if(rc){
|
||||
fprintf(stderr, "Error adding property RESPONSE_TOPIC.\n");
|
||||
goto cleanup;
|
||||
}
|
||||
rc = mosquitto_property_check_all(CMD_PUBLISH, cfg.publish_props);
|
||||
if(rc){
|
||||
if(!cfg.quiet) fprintf(stderr, "Error in PUBLISH properties: Duplicate response topic.\n");
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if(client_id_generate(&cfg)){
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
mosq = mosquitto_new(cfg.id, cfg.clean_session, &cfg);
|
||||
if(!mosq){
|
||||
switch(errno){
|
||||
case ENOMEM:
|
||||
if(!cfg.quiet) fprintf(stderr, "Error: Out of memory.\n");
|
||||
break;
|
||||
case EINVAL:
|
||||
if(!cfg.quiet) fprintf(stderr, "Error: Invalid id and/or clean_session.\n");
|
||||
break;
|
||||
}
|
||||
goto cleanup;
|
||||
}
|
||||
if(client_opts_set(mosq, &cfg)){
|
||||
goto cleanup;
|
||||
}
|
||||
if(cfg.debug){
|
||||
mosquitto_log_callback_set(mosq, my_log_callback);
|
||||
}
|
||||
mosquitto_connect_v5_callback_set(mosq, my_connect_callback);
|
||||
mosquitto_subscribe_callback_set(mosq, my_subscribe_callback);
|
||||
mosquitto_message_v5_callback_set(mosq, my_message_callback);
|
||||
|
||||
rc = client_connect(mosq, &cfg);
|
||||
if(rc){
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
#ifndef WIN32
|
||||
sigact.sa_handler = my_signal_handler;
|
||||
sigemptyset(&sigact.sa_mask);
|
||||
sigact.sa_flags = 0;
|
||||
|
||||
if(sigaction(SIGALRM, &sigact, NULL) == -1){
|
||||
perror("sigaction");
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if(cfg.timeout){
|
||||
alarm(cfg.timeout);
|
||||
}
|
||||
#endif
|
||||
|
||||
do{
|
||||
rc = mosquitto_loop(mosq, -1, 1);
|
||||
if(client_state == rr_s_ready_to_publish){
|
||||
client_state = rr_s_wait_for_response;
|
||||
switch(cfg.pub_mode){
|
||||
case MSGMODE_CMD:
|
||||
case MSGMODE_FILE:
|
||||
case MSGMODE_STDIN_FILE:
|
||||
rc = my_publish(mosq, &mid_sent, cfg.topic, cfg.msglen, cfg.message, cfg.qos, cfg.retain);
|
||||
break;
|
||||
case MSGMODE_NULL:
|
||||
rc = my_publish(mosq, &mid_sent, cfg.topic, 0, NULL, cfg.qos, cfg.retain);
|
||||
break;
|
||||
case MSGMODE_STDIN_LINE:
|
||||
/* FIXME */
|
||||
break;
|
||||
}
|
||||
}
|
||||
}while(rc == MOSQ_ERR_SUCCESS && client_state != rr_s_disconnect);
|
||||
|
||||
mosquitto_destroy(mosq);
|
||||
mosquitto_lib_cleanup();
|
||||
|
||||
if(cfg.msg_count>0 && rc == MOSQ_ERR_NO_CONN){
|
||||
rc = 0;
|
||||
}
|
||||
client_config_cleanup(&cfg);
|
||||
if(rc){
|
||||
fprintf(stderr, "Error: %s\n", mosquitto_strerror(rc));
|
||||
}
|
||||
return rc;
|
||||
|
||||
cleanup:
|
||||
mosquitto_lib_cleanup();
|
||||
client_config_cleanup(&cfg);
|
||||
return 1;
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
install(FILES mosquitto_passwd.1 mosquitto_pub.1 mosquitto_sub.1 DESTINATION ${MANDIR}/man1)
|
||||
install(FILES mosquitto_passwd.1 mosquitto_pub.1 mosquitto_sub.1 mosquitto_rr.1 DESTINATION ${MANDIR}/man1)
|
||||
install(FILES libmosquitto.3 DESTINATION ${MANDIR}/man3)
|
||||
install(FILES mosquitto.conf.5 DESTINATION ${MANDIR}/man5)
|
||||
install(FILES mosquitto-tls.7 mqtt.7 DESTINATION ${MANDIR}/man7)
|
||||
|
||||
30
man/Makefile
30
man/Makefile
@@ -2,22 +2,26 @@ include ../config.mk
|
||||
|
||||
.PHONY : all clean install uninstall dist
|
||||
|
||||
all : mosquitto.8 mosquitto-tls.7 mosquitto.conf.5 mosquitto_passwd.1 mosquitto_pub.1 mosquitto_sub.1 mqtt.7 libmosquitto.3
|
||||
MANPAGES = \
|
||||
libmosquitto.3 \
|
||||
mosquitto-tls.7 \
|
||||
mosquitto.8 \
|
||||
mosquitto.conf.5 \
|
||||
mosquitto_passwd.1 \
|
||||
mosquitto_pub.1 \
|
||||
mosquitto_rr.1 \
|
||||
mosquitto_sub.1 \
|
||||
mqtt.7
|
||||
|
||||
all : ${MANPAGES}
|
||||
|
||||
clean :
|
||||
|
||||
reallyclean : clean
|
||||
-rm -f *.orig
|
||||
-rm -f libmosquitto.3
|
||||
-rm -f mosquitto.8
|
||||
-rm -f mosquitto.conf.5
|
||||
-rm -f mosquitto_passwd.1
|
||||
-rm -f mosquitto_pub.1
|
||||
-rm -f mosquitto_sub.1
|
||||
-rm -f mosquitto-tls.7
|
||||
-rm -f mqtt.7
|
||||
-rm -f ${MANPAGES}
|
||||
|
||||
dist : mosquitto.8 mosquitto-tls.7 mosquitto.conf.5 mosquitto_passwd.1 mosquitto_pub.1 mosquitto_sub.1 mqtt.7 libmosquitto.3
|
||||
dist : ${MANPAGES}
|
||||
|
||||
install :
|
||||
$(INSTALL) -d "${DESTDIR}$(mandir)/man8"
|
||||
@@ -28,6 +32,7 @@ install :
|
||||
$(INSTALL) -m 644 mosquitto_passwd.1 "${DESTDIR}${mandir}/man1/mosquitto_passwd.1"
|
||||
$(INSTALL) -m 644 mosquitto_pub.1 "${DESTDIR}${mandir}/man1/mosquitto_pub.1"
|
||||
$(INSTALL) -m 644 mosquitto_sub.1 "${DESTDIR}${mandir}/man1/mosquitto_sub.1"
|
||||
$(INSTALL) -m 644 mosquitto_rr.1 "${DESTDIR}${mandir}/man1/mosquitto_rr.1"
|
||||
$(INSTALL) -d "${DESTDIR}$(mandir)/man7"
|
||||
$(INSTALL) -m 644 mqtt.7 "${DESTDIR}${mandir}/man7/mqtt.7"
|
||||
$(INSTALL) -m 644 mosquitto-tls.7 "${DESTDIR}${mandir}/man7/mosquitto-tls.7"
|
||||
@@ -40,6 +45,7 @@ uninstall :
|
||||
-rm -f "${DESTDIR}${mandir}/man1/mosquitto_passwd.1"
|
||||
-rm -f "${DESTDIR}${mandir}/man1/mosquitto_pub.1"
|
||||
-rm -f "${DESTDIR}${mandir}/man1/mosquitto_sub.1"
|
||||
-rm -f "${DESTDIR}${mandir}/man1/mosquitto_rr.1"
|
||||
-rm -f "${DESTDIR}${mandir}/man7/mqtt.7"
|
||||
-rm -f "${DESTDIR}${mandir}/man7/mosquitto-tls.7"
|
||||
-rm -f "${DESTDIR}${mandir}/man3/libmosquitto.3"
|
||||
@@ -59,6 +65,9 @@ mosquitto_pub.1 : mosquitto_pub.1.xml
|
||||
mosquitto_sub.1 : mosquitto_sub.1.xml
|
||||
$(XSLTPROC) $^
|
||||
|
||||
mosquitto_rr.1 : mosquitto_rr.1.xml
|
||||
$(XSLTPROC) $^
|
||||
|
||||
mqtt.7 : mqtt.7.xml
|
||||
$(XSLTPROC) $^
|
||||
|
||||
@@ -81,6 +90,7 @@ potgen :
|
||||
xml2po -o po/mosquitto_passwd/mosquitto_passwd.1.pot mosquitto_passwd.1.xml
|
||||
xml2po -o po/mosquitto_pub/mosquitto_pub.1.pot mosquitto_pub.1.xml
|
||||
xml2po -o po/mosquitto_sub/mosquitto_sub.1.pot mosquitto_sub.1.xml
|
||||
xml2po -o po/mosquitto_sub/mosquitto_rr.1.pot mosquitto_rr.1.xml
|
||||
xml2po -o po/mqtt/mqtt.7.pot mqtt.7.xml
|
||||
xml2po -o po/mosquitto-tls/mosquitto-tls.7.pot mosquitto-tls.7.xml
|
||||
xml2po -o po/libmosquitto/libmosquitto.3.pot libmosquitto.3.xml
|
||||
|
||||
@@ -513,6 +513,12 @@
|
||||
<manvolnum>1</manvolnum>
|
||||
</citerefentry>
|
||||
</member>
|
||||
<member>
|
||||
<citerefentry>
|
||||
<refentrytitle><link xlink:href="mosquitto_rr-1.html">mosquitto_rr</link></refentrytitle>
|
||||
<manvolnum>1</manvolnum>
|
||||
</citerefentry>
|
||||
</member>
|
||||
<member>
|
||||
<citerefentry>
|
||||
<refentrytitle><link xlink:href="mosquitto_sub-1.html">mosquitto_sub</link></refentrytitle>
|
||||
|
||||
@@ -668,6 +668,12 @@
|
||||
<manvolnum>7</manvolnum>
|
||||
</citerefentry>
|
||||
</member>
|
||||
<member>
|
||||
<citerefentry>
|
||||
<refentrytitle><link xlink:href="mosquitto_rr-1.html">mosquitto_rr</link></refentrytitle>
|
||||
<manvolnum>1</manvolnum>
|
||||
</citerefentry>
|
||||
</member>
|
||||
<member>
|
||||
<citerefentry>
|
||||
<refentrytitle><link xlink:href="mosquitto_sub-1.html">mosquitto_sub</link></refentrytitle>
|
||||
|
||||
5
man/mosquitto_rr.1.meta
Normal file
5
man/mosquitto_rr.1.meta
Normal file
@@ -0,0 +1,5 @@
|
||||
.. title: mosquitto_rr man page
|
||||
.. slug: mosquitto_rr-1
|
||||
.. category: man
|
||||
.. type: man
|
||||
.. pretty_url: False
|
||||
813
man/mosquitto_rr.1.xml
Normal file
813
man/mosquitto_rr.1.xml
Normal file
File diff suppressed because it is too large
Load Diff
@@ -832,7 +832,7 @@ mosquitto_sub -t 'bbc/#' -T bbc/bbc1 --remove-retained</programlisting>
|
||||
<refsect2>
|
||||
<title>Connect</title>
|
||||
<itemizedlist>
|
||||
<listitem><para><option>authentication-data</option> (binary data - note treated as a string in mosquitto_pub)</para></listitem>
|
||||
<listitem><para><option>authentication-data</option> (binary data - note treated as a string in mosquitto_sub)</para></listitem>
|
||||
<listitem><para><option>authentication-method</option> (UTF-8 string pair)</para></listitem>
|
||||
<listitem><para><option>maximum-packet-size</option> (32-bit unsigned integer)</para></listitem>
|
||||
<listitem><para><option>receive-maximum</option> (16-bit unsigned integer)</para></listitem>
|
||||
@@ -870,7 +870,7 @@ mosquitto_sub -t 'bbc/#' -T bbc/bbc1 --remove-retained</programlisting>
|
||||
<title>Will properties</title>
|
||||
<itemizedlist>
|
||||
<listitem><para><option>content-type</option> (UTF-8 string)</para></listitem>
|
||||
<listitem><para><option>correlation-data</option> (binary data - note treated as a string in mosquitto_pub)</para></listitem>
|
||||
<listitem><para><option>correlation-data</option> (binary data - note treated as a string in mosquitto_sub)</para></listitem>
|
||||
<listitem><para><option>message-expiry-interval</option> (32-bit unsigned integer)</para></listitem>
|
||||
<listitem><para><option>payload-format-indicator</option> (8-bit unsigned integer)</para></listitem>
|
||||
<listitem><para><option>response-topic</option> (UTF-8 string)</para></listitem>
|
||||
@@ -951,6 +951,12 @@ mosquitto_sub -t 'bbc/#' -T bbc/bbc1 --remove-retained</programlisting>
|
||||
<manvolnum>1</manvolnum>
|
||||
</citerefentry>
|
||||
</member>
|
||||
<member>
|
||||
<citerefentry>
|
||||
<refentrytitle><link xlink:href="mosquitto_rr-1.html">mosquitto_rr</link></refentrytitle>
|
||||
<manvolnum>1</manvolnum>
|
||||
</citerefentry>
|
||||
</member>
|
||||
<member>
|
||||
<citerefentry>
|
||||
<refentrytitle><link xlink:href="mosquitto-8.html">mosquitto</link></refentrytitle>
|
||||
|
||||
Reference in New Issue
Block a user