Merge branch 'fixes'

This commit is contained in:
Roger A. Light
2025-03-06 15:04:13 +00:00
114 changed files with 1953 additions and 1314 deletions

View File

@@ -13,7 +13,7 @@ name: "CodeQL"
on:
push:
branches: [ master, fixes, develop, 1.6.x ]
branches: [ master, fixes, develop ]
pull_request:
# The branches below must be a subset of the branches above
branches: [ master ]
@@ -35,11 +35,11 @@ jobs:
steps:
- name: Checkout repository
uses: actions/checkout@v2
uses: actions/checkout@v4
# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
uses: github/codeql-action/init@v1
uses: github/codeql-action/init@v3
with:
languages: ${{ matrix.language }}
# If you wish to specify custom queries, you can do so here or in a config file.
@@ -64,4 +64,4 @@ jobs:
- run: make binary
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v1
uses: github/codeql-action/analyze@v3

View File

@@ -9,7 +9,7 @@ jobs:
coverity:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
with:
ref: develop

View File

@@ -9,7 +9,7 @@ jobs:
coverity:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
with:
ref: fixes

View File

@@ -10,12 +10,12 @@ permissions:
pull-requests: write
concurrency:
group: lock
group: lock-threads
jobs:
action:
runs-on: ubuntu-latest
steps:
- uses: dessant/lock-threads@v3
- uses: dessant/lock-threads@v5
with:
issue-inactive-days: '90'

53
.github/workflows/windows-x86.yml vendored Normal file
View File

@@ -0,0 +1,53 @@
name: Windows x86 build
on:
workflow_dispatch:
push:
branches: [ "master", "fixes", "develop" ]
tags: [ "v[0-9]+.*" ]
pull_request:
branches: [ "master", "fixes", "develop" ]
env:
# Customize the CMake build type here (Release, Debug, RelWithDebInfo, etc.)
BUILD_TYPE: Release
jobs:
mosquitto:
runs-on: windows-2022
steps:
- uses: actions/checkout@v4
- name: vcpkg build
uses: johnwason/vcpkg-action@v6
id: vcpkg
with:
manifest-dir: ${{ github.workspace }}
triplet: x86-windows
token: ${{ github.token }}
github-binarycache: true
- name: Configure CMake
run: cmake -B ${{github.workspace}}/build -DCMAKE_BUILD_TYPE=${{env.BUILD_TYPE}} -DWITH_WEBSOCKETS=ON -DWITH_TESTS=OFF -DCMAKE_GENERATOR_PLATFORM=WIN32 -DCMAKE_TOOLCHAIN_FILE=${{ github.workspace }}/vcpkg/scripts/buildsystems/vcpkg.cmake -DVCPKG_TARGET_TRIPLET=x86-windows -DVCPKG_MANIFEST_MODE=ON
- name: Build
run: cmake --build ${{github.workspace}}/build --config ${{env.BUILD_TYPE}}
- uses: suisei-cn/actions-download-file@v1.6.0
id: vcredist
name: Download VC redistributable
with:
url: https://aka.ms/vs/17/release/vc_redist.x86.exe
target: ${{github.workspace}}/installer/
- name: Installer
uses: joncloud/makensis-action@v4
with:
script-file: ${{github.workspace}}/installer/mosquitto.nsi
- name: Upload installer to artifacts
uses: actions/upload-artifact@v4
with:
name: installer
path: ${{ github.workspace }}/installer/mosquitto*.exe

View File

@@ -30,7 +30,7 @@ jobs:
github-binarycache: true
- name: Configure CMake
run: cmake -B ${{github.workspace}}/build64 -DCMAKE_BUILD_TYPE=${{env.BUILD_TYPE}} -DWITH_LIBWEBSOCKETS=ON -DWITH_TESTS=OFF -DCMAKE_GENERATOR_PLATFORM=x64 -DCMAKE_TOOLCHAIN_FILE=${{ github.workspace }}/vcpkg/scripts/buildsystems/vcpkg.cmake -DVCPKG_TARGET_TRIPLET=x64-windows-release -DVCPKG_MANIFEST_MODE=ON
run: cmake -B ${{github.workspace}}/build64 -DCMAKE_BUILD_TYPE=${{env.BUILD_TYPE}} -DWITH_WEBSOCKETS=ON -DWITH_TESTS=OFF -DCMAKE_GENERATOR_PLATFORM=x64 -DCMAKE_TOOLCHAIN_FILE=${{ github.workspace }}/vcpkg/scripts/buildsystems/vcpkg.cmake -DVCPKG_TARGET_TRIPLET=x64-windows-release -DVCPKG_MANIFEST_MODE=ON
- name: Build
run: cmake --build ${{github.workspace}}/build64 --config ${{env.BUILD_TYPE}}

View File

@@ -4,11 +4,10 @@
# To configure the build options either use the CMake gui, or run the command
# line utility including the "-i" option.
cmake_minimum_required(VERSION 3.5)
cmake_policy(SET CMP0042 NEW)
cmake_minimum_required(VERSION 3.18)
project(mosquitto)
set (VERSION 2.0.20)
set (VERSION 2.0.21)
list(APPEND CMAKE_MODULE_PATH "${PROJECT_SOURCE_DIR}/cmake/")

View File

@@ -1,3 +1,39 @@
2.0.21 - 2024-xx-xx
===================
Broker:
- Fix clients sending a RESERVED packet not being quickly disconnected.
Closes #2325.
- Fix `bind_interface` producing an error when used with an interface that has
an IPv6 link-local address and no other IPv6 addresses. Closes #2696.
- Fix mismatched wrapped/unwrapped memory alloc/free in properties. Closes #3192.
- Fix `allow_anonymous false` not being applied in local only mode. Closes #3198.
- Add `retain_expiry_interval` option to fix expired retained message not
being removed from memory if they are not subscribed to. Closes #3221.
- Produce an error if invalid combinations of cafile/capath/certfile/keyfile
are used. Closes #1836. Closes #3130.
- Backport keepalive checking from develop to fix problems in current
implementation. Closes #3138.
Client library:
- Fix potential deadlock in mosquitto_sub if `-W` is used. Closes #3175.
Apps:
- mosquitto_ctrl dynsec now also allows `-i` to specify a clientid as well as
`-c`. This matches the documentation which states `-i`. Closes #3219.
Client library:
- Fix threads linking on Windows for static libmosquitto library
Closes #3143
Build:
- Fix Windows builds not having websockets enabled.
- Add tzdata to docker images
Tests:
- Fix 08-ssl-connect-cert-auth-expired and 08-ssl-connect-cert-auth-revoked
tests when under load. Closes #3208.
2.0.20 - 2024-10-16
===================

View File

@@ -21,6 +21,10 @@ OBJS = \
util_topic.o \
utf8_mosq.o
ifeq ($(UNAME),Linux)
LIBS:=$(LIBS) -lrt
endif
.PHONY: all clean reallyclean
all : mosquitto_db_dump

View File

@@ -127,7 +127,11 @@ int client_request_response(struct mosq_ctrl *ctrl)
int rc;
time_t start;
if(ctrl->cfg.cafile == NULL && ctrl->cfg.capath == NULL){
if(ctrl->cfg.cafile == NULL && ctrl->cfg.capath == NULL && !ctrl->cfg.tls_use_os_certs && ctrl->cfg.port != 8883
# ifdef FINAL_WITH_TLS_PSK
&& !ctrl->cfg.psk
# endif
){
fprintf(stderr, "Warning: You are running mosquitto_ctrl without encryption.\nThis means all of the configuration changes you are making are visible on the network, including passwords.\n\n");
}

View File

@@ -49,7 +49,7 @@ void dynsec__print_usage(void)
printf("Set group for anonymous clients: setAnonymousGroup <groupname>\n");
printf("\nClients\n-------\n");
printf("Create a new client: createClient <username> [-c clientid] [-p password]\n");
printf("Create a new client: createClient <username> [-i clientid] [-p password]\n");
printf("Delete a client: deleteClient <username>\n");
printf("Set a client password: setClientPassword <username> [password]\n");
printf("Set a client id: setClientId <username> [clientid]\n");

View File

@@ -40,9 +40,9 @@ int dynsec_client__create(int argc, char *argv[], cJSON *j_command)
username = argv[0];
for(i=1; i<argc; i++){
if(!strcmp(argv[i], "-c")){
if(!strcmp(argv[i], "-c") || !strcmp(argv[i], "-i")){
if(i+1 == argc){
fprintf(stderr, "Error: -c argument given, but no clientid provided.\n");
fprintf(stderr, "Error: -i argument given, but no clientid provided.\n");
return MOSQ_ERR_INVAL;
}
clientid = argv[i+1];

View File

@@ -51,6 +51,7 @@ struct mosq_config {
char *tls_engine;
char *tls_engine_kpass_sha1;
char *keyform;
bool tls_use_os_certs;
# ifdef FINAL_WITH_TLS_PSK
char *psk;
char *psk_identity;

View File

@@ -247,6 +247,7 @@ static int client_config_line_proc(struct mosq_config *cfg, int *argc, char **ar
} else if(!strncasecmp(url, "mqtts://", 8)) {
url += 8;
cfg->port = 8883;
cfg->tls_use_os_certs = true;
} else {
fprintf(stderr, "Error: unsupported URL scheme.\n\n");
return 1;
@@ -388,6 +389,8 @@ static int client_config_line_proc(struct mosq_config *cfg, int *argc, char **ar
}
argv++;
(*argc)--;
}else if(!strcmp(argv[0], "--tls-use-os-certs")){
cfg->tls_use_os_certs = true;
}else if(!strcmp(argv[0], "--tls-version")){
if((*argc) == 1){
fprintf(stderr, "Error: --tls-version argument given but no version specified.\n\n");
@@ -609,7 +612,21 @@ int client_opts_set(struct mosquitto *mosq, struct mosq_config *cfg)
mosquitto_lib_cleanup();
return 1;
}
}
# ifdef FINAL_WITH_TLS_PSK
}else if (cfg->psk){
if(mosquitto_tls_psk_set(mosq, cfg->psk, cfg->psk_identity, NULL)){
fprintf(stderr, "Error: Problem setting TLS-PSK options.\n");
mosquitto_lib_cleanup();
return 1;
}
# endif
}else if(cfg->port == 8883){
mosquitto_int_option(mosq, MOSQ_OPT_TLS_USE_OS_CERTS, 1);
}
if(cfg->tls_use_os_certs){
mosquitto_int_option(mosq, MOSQ_OPT_TLS_USE_OS_CERTS, 1);
}
if(cfg->insecure && mosquitto_tls_insecure_set(mosq, true)){
fprintf(stderr, "Error: Problem setting TLS insecure option.\n");
mosquitto_lib_cleanup();
@@ -630,13 +647,6 @@ int client_opts_set(struct mosquitto *mosq, struct mosq_config *cfg)
mosquitto_lib_cleanup();
return 1;
}
# ifdef FINAL_WITH_TLS_PSK
if(cfg->psk && mosquitto_tls_psk_set(mosq, cfg->psk, cfg->psk_identity, NULL)){
fprintf(stderr, "Error: Problem setting TLS-PSK options.\n");
mosquitto_lib_cleanup();
return 1;
}
# endif
if((cfg->tls_version || cfg->ciphers) && mosquitto_tls_opts_set(mosq, 1, cfg->tls_version, cfg->ciphers)){
fprintf(stderr, "Error: Problem setting TLS options, check the options are valid.\n");
mosquitto_lib_cleanup();

View File

@@ -120,6 +120,10 @@ WITH_JEMALLOC:=no
# probably of no particular interest to end users.
WITH_XTREPORT=no
# Use the old O(n) keepalive check routine, instead of the new O(1) keepalive
# check routine. See src/keepalive.c for notes on this.
WITH_OLD_KEEPALIVE=no
# Build using clang and with address sanitiser enabled
WITH_ASAN=no
@@ -130,7 +134,7 @@ WITH_ASAN=no
# Also bump lib/mosquitto.h, CMakeLists.txt,
# installer/mosquitto.nsi, installer/mosquitto64.nsi
VERSION=2.0.20
VERSION=2.0.21
# Client library SO version. Bump if incompatible API/ABI changes are made.
SOVERSION=1
@@ -388,6 +392,10 @@ ifeq ($(WITH_XTREPORT),yes)
BROKER_CFLAGS:=$(BROKER_CFLAGS) -DWITH_XTREPORT
endif
ifeq ($(WITH_OLD_KEEPALIVE),yes)
BROKER_CPPFLAGS:=$(BROKER_CPPFLAGS) -DWITH_OLD_KEEPALIVE
endif
BROKER_LDADD:=${BROKER_LDADD} ${LDADD}
CLIENT_LDADD:=${CLIENT_LDADD} ${LDADD}
PASSWD_LDADD:=${PASSWD_LDADD} ${LDADD}

View File

@@ -82,7 +82,9 @@ RUN set -x && \
install -m644 /build/mosq/mosquitto.conf /mosquitto/config/mosquitto.conf && \
chown -R mosquitto:mosquitto /mosquitto && \
apk --no-cache add \
ca-certificates libuuid && \
ca-certificates \
libuuid \
tzdata && \
apk del build-deps && \
rm -rf /build

View File

@@ -82,7 +82,8 @@ RUN set -x && \
apk --no-cache add \
ca-certificates \
libressl \
libuuid && \
libuuid \
tzdata && \
apk del build-deps && \
rm -rf /build

View File

@@ -86,7 +86,8 @@ RUN set -x && \
install -Dm644 /build/mosq/edl-v10 /usr/share/licenses/mosquitto/edl-v10 && \
chown -R mosquitto:mosquitto /mosquitto && \
apk --no-cache add \
ca-certificates && \
ca-certificates \
tzdata && \
apk del build-deps && \
rm -rf /build

View File

@@ -88,7 +88,8 @@ RUN set -x && \
chown -R mosquitto:mosquitto /mosquitto && \
apk --no-cache add \
ca-certificates \
libressl && \
libressl \
tzdata && \
apk del build-deps && \
rm -rf /build

View File

@@ -90,7 +90,8 @@ RUN set -x && \
chown -R mosquitto:mosquitto /mosquitto && \
apk --no-cache add \
ca-certificates \
cjson && \
cjson \
tzdata && \
apk del build-deps && \
rm -rf /build

View File

@@ -71,6 +71,19 @@ to expose the ports that have been configured, for example:
docker run -it -p 1883:1883 -p 8080:8080 -v <absolute-path-to-configuration-file>:/mosquitto/config/mosquitto.conf eclipse-mosquitto:<version>
```
**Important**: The default configuration only listens on the loopback
interface. This means that there is no way to access Mosquitto in the docker
container without using a custom configuration containing at least a listener.
You also need to make a decision to allow anonymous connections or to set up a
different method of client authentication.
i.e. to configure a Mosquitto docker container as if it was running locally,
add the following to `mosquitto.conf`:
```
listener 1883
allow_anonymous true
```
Configuration can be changed to:
* persist data to `/mosquitto/data`

View File

@@ -92,7 +92,8 @@ RUN set -x && \
apk --no-cache add \
ca-certificates \
cjson \
libressl && \
libressl \
tzdata && \
apk del build-deps && \
rm -rf /build

View File

@@ -71,6 +71,20 @@ to expose the ports that have been configured, for example:
docker run -it -p 1883:1883 -p 8080:8080 -v <absolute-path-to-configuration-file>:/mosquitto/config/mosquitto.conf eclipse-mosquitto:<version>
```
**Important**: The default configuration only listens on the loopback
interface. This means that there is no way to access Mosquitto in the docker
container without using a custom configuration containing at least a listener.
You also need to make a decision to allow anonymous connections or to set up a
different method of client authentication.
i.e. to configure a Mosquitto docker container as if it was running locally,
add the following to `mosquitto.conf`:
```
listener 1883
allow_anonymous true
```
Configuration can be changed to:
* persist data to `/mosquitto/data`

View File

@@ -118,7 +118,8 @@ RUN set -x && \
install -Dm644 /build/mosq/edl-v10 /usr/share/licenses/mosquitto/edl-v10 && \
chown -R mosquitto:mosquitto /mosquitto && \
apk --no-cache add \
ca-certificates && \
ca-certificates \
tzdata && \
apk del build-deps && \
rm -rf /build

View File

@@ -58,3 +58,16 @@ docker run -it -p 1883:1883 -v <path-to-configuration-file>:/mosquitto/config/mo
:boom: if the mosquitto configuration (mosquitto.conf) was modified
to use non-default ports, the docker run command will need to be updated
to expose the ports that have been configured.
**Important**: The default configuration only listens on the
loopback interface. This means that there is no way to access Mosquitto in the
docker container without using a custom configuration containing at least
a listener. You also need to make a decision to allow anonymous connections or
to set up a different method of client authentication.
i.e. to configure a Mosquitto docker container as if it was running locally,
add the following to `mosquitto.conf`:
```
listener 1883
allow_anonymous true
```

View File

@@ -77,7 +77,8 @@ RUN set -x && \
install -Dm644 /build/mosq/edl-v10 /usr/share/licenses/mosquitto/edl-v10 && \
chown -R mosquitto:mosquitto /mosquitto && \
apk --no-cache add \
ca-certificates && \
ca-certificates \
tzdata && \
apk del build-deps && \
rm -rf /build

View File

@@ -66,7 +66,7 @@ extern "C" {
#define LIBMOSQUITTO_MAJOR 2
#define LIBMOSQUITTO_MINOR 0
#define LIBMOSQUITTO_REVISION 20
#define LIBMOSQUITTO_REVISION 21
/* LIBMOSQUITTO_VERSION_NUMBER looks like 1002001 for e.g. version 1.2.1. */
#define LIBMOSQUITTO_VERSION_NUMBER (LIBMOSQUITTO_MAJOR*1000000+LIBMOSQUITTO_MINOR*1000+LIBMOSQUITTO_REVISION)
@@ -399,8 +399,7 @@ libmosq_EXPORT int mosquitto_will_set(struct mosquitto *mosq, const char *topic,
* before calling <mosquitto_connect>.
*
* If the mosquitto instance `mosq` is using MQTT v5, the `properties` argument
* will be applied to the Will. For MQTT v3.1.1 and below, the `properties`
* argument will be ignored.
* will be applied to the Will.
*
* Set your client to use MQTT v5 immediately after it is created:
*
@@ -431,6 +430,8 @@ libmosq_EXPORT int mosquitto_will_set(struct mosquitto *mosq, const char *topic,
* using MQTT v5
* MOSQ_ERR_PROTOCOL - if a property is invalid for use with wills.
* MOSQ_ERR_DUPLICATE_PROPERTY - if a property is duplicated where it is forbidden.
* MOSQ_ERR_NOT_SUPPORTED - if properties is not NULL and the client is not
* using MQTT v5.
*/
libmosq_EXPORT int mosquitto_will_set_v5(struct mosquitto *mosq, const char *topic, int payloadlen, const void *payload, int qos, bool retain, mosquitto_property *properties);
@@ -562,8 +563,7 @@ libmosq_EXPORT int mosquitto_connect_bind(struct mosquitto *mosq, const char *ho
* <mosquitto_property_free_all>.
*
* If the mosquitto instance `mosq` is using MQTT v5, the `properties` argument
* will be applied to the CONNECT message. For MQTT v3.1.1 and below, the
* `properties` argument will be ignored.
* will be applied to the CONNECT message.
*
* Set your client to use MQTT v5 immediately after it is created:
*
@@ -594,6 +594,8 @@ libmosq_EXPORT int mosquitto_connect_bind(struct mosquitto *mosq, const char *ho
* Windows.
* MOSQ_ERR_DUPLICATE_PROPERTY - if a property is duplicated where it is forbidden.
* MOSQ_ERR_PROTOCOL - if any property is invalid for use with CONNECT.
* MOSQ_ERR_NOT_SUPPORTED - if properties is not NULL and the client is not
* using MQTT v5.
*
* See Also:
* <mosquitto_connect>, <mosquitto_connect_async>, <mosquitto_connect_bind_async>
@@ -795,8 +797,7 @@ libmosq_EXPORT int mosquitto_disconnect(struct mosquitto *mosq);
* <mosquitto_property_free_all>.
*
* If the mosquitto instance `mosq` is using MQTT v5, the `properties` argument
* will be applied to the DISCONNECT message. For MQTT v3.1.1 and below, the
* `properties` argument will be ignored.
* will be applied to the DISCONNECT message.
*
* Set your client to use MQTT v5 immediately after it is created:
*
@@ -813,6 +814,8 @@ libmosq_EXPORT int mosquitto_disconnect(struct mosquitto *mosq);
* MOSQ_ERR_NO_CONN - if the client isn't connected to a broker.
* MOSQ_ERR_DUPLICATE_PROPERTY - if a property is duplicated where it is forbidden.
* MOSQ_ERR_PROTOCOL - if any property is invalid for use with DISCONNECT.
* MOSQ_ERR_NOT_SUPPORTED - if properties is not NULL and the client is not
* using MQTT v5.
*/
libmosq_EXPORT int mosquitto_disconnect_v5(struct mosquitto *mosq, int reason_code, const mosquitto_property *properties);
@@ -879,8 +882,7 @@ libmosq_EXPORT int mosquitto_publish(struct mosquitto *mosq, int *mid, const cha
* <mosquitto_property_free_all>.
*
* If the mosquitto instance `mosq` is using MQTT v5, the `properties` argument
* will be applied to the PUBLISH message. For MQTT v3.1.1 and below, the
* `properties` argument will be ignored.
* will be applied to the PUBLISH message.
*
* Set your client to use MQTT v5 immediately after it is created:
*
@@ -920,6 +922,8 @@ libmosq_EXPORT int mosquitto_publish(struct mosquitto *mosq, int *mid, const cha
* the broker.
* MOSQ_ERR_OVERSIZE_PACKET - if the resulting packet would be larger than
* supported by the broker.
* MOSQ_ERR_NOT_SUPPORTED - if properties is not NULL and the client is not
* using MQTT v5.
*/
libmosq_EXPORT int mosquitto_publish_v5(
struct mosquitto *mosq,
@@ -971,8 +975,7 @@ libmosq_EXPORT int mosquitto_subscribe(struct mosquitto *mosq, int *mid, const c
* <mosquitto_property_free_all>.
*
* If the mosquitto instance `mosq` is using MQTT v5, the `properties` argument
* will be applied to the PUBLISH message. For MQTT v3.1.1 and below, the
* `properties` argument will be ignored.
* will be applied to the PUBLISH message.
*
* Set your client to use MQTT v5 immediately after it is created:
*
@@ -1000,6 +1003,8 @@ libmosq_EXPORT int mosquitto_subscribe(struct mosquitto *mosq, int *mid, const c
* MOSQ_ERR_PROTOCOL - if any property is invalid for use with SUBSCRIBE.
* MOSQ_ERR_OVERSIZE_PACKET - if the resulting packet would be larger than
* supported by the broker.
* MOSQ_ERR_NOT_SUPPORTED - if properties is not NULL and the client is not
* using MQTT v5.
*/
libmosq_EXPORT int mosquitto_subscribe_v5(struct mosquitto *mosq, int *mid, const char *sub, int qos, int options, const mosquitto_property *properties);
@@ -1076,8 +1081,7 @@ libmosq_EXPORT int mosquitto_unsubscribe(struct mosquitto *mosq, int *mid, const
* <mosquitto_property_free_all>.
*
* If the mosquitto instance `mosq` is using MQTT v5, the `properties` argument
* will be applied to the PUBLISH message. For MQTT v3.1.1 and below, the
* `properties` argument will be ignored.
* will be applied to the PUBLISH message.
*
* Set your client to use MQTT v5 immediately after it is created:
*
@@ -1103,6 +1107,8 @@ libmosq_EXPORT int mosquitto_unsubscribe(struct mosquitto *mosq, int *mid, const
* MOSQ_ERR_PROTOCOL - if any property is invalid for use with UNSUBSCRIBE.
* MOSQ_ERR_OVERSIZE_PACKET - if the resulting packet would be larger than
* supported by the broker.
* MOSQ_ERR_NOT_SUPPORTED - if properties is not NULL and the client is not
* using MQTT v5.
*/
libmosq_EXPORT int mosquitto_unsubscribe_v5(struct mosquitto *mosq, int *mid, const char *sub, const mosquitto_property *properties);

View File

@@ -34,6 +34,7 @@ Contributors:
/* Message types */
#define CMD_RESERVED 0x00U
#define CMD_CONNECT 0x10U
#define CMD_CONNACK 0x20U
#define CMD_PUBLISH 0x30U

View File

@@ -9,7 +9,7 @@
!define env_hklm 'HKLM "SYSTEM\CurrentControlSet\Control\Session Manager\Environment"'
Name "Eclipse Mosquitto"
!define VERSION 2.0.20
!define VERSION 2.0.21
OutFile "mosquitto-${VERSION}-install-windows-x86.exe"
InstallDir "$PROGRAMFILES\mosquitto"
@@ -47,8 +47,8 @@ Section "Files" SecInstall
SetOutPath "$INSTDIR"
File "..\logo\mosquitto.ico"
File "..\build\src\Release\mosquitto.exe"
File "..\build\apps\mosquitto_passwd\Release\mosquitto_passwd.exe"
File "..\build\apps\mosquitto_ctrl\Release\mosquitto_ctrl.exe"
File "..\build\apps\mosquitto_passwd\Release\mosquitto_passwd.exe"
File "..\build\client\Release\mosquitto_pub.exe"
File "..\build\client\Release\mosquitto_sub.exe"
File "..\build\client\Release\mosquitto_rr.exe"
@@ -63,12 +63,17 @@ Section "Files" SecInstall
File "..\README.md"
File "..\README-windows.txt"
File "..\README-letsencrypt.md"
;File "C:\pthreads\Pre-built.2\dll\x86\pthreadVC2.dll"
File "C:\OpenSSL-Win32\bin\libssl-1_1.dll"
File "C:\OpenSSL-Win32\bin\libcrypto-1_1.dll"
File "..\SECURITY.md"
File "..\edl-v10"
File "..\epl-v20"
File "..\build\vcpkg_installed\x86-windows\bin\cjson.dll"
File "..\build\vcpkg_installed\x86-windows\bin\libcrypto-3.dll"
File "..\build\vcpkg_installed\x86-windows\bin\libssl-3.dll"
File "..\build\vcpkg_installed\x86-windows\bin\pthreadVC3.dll"
File "..\build\vcpkg_installed\x86-windows\bin\uv.dll"
File "..\build\vcpkg_installed\x86-windows\bin\websockets.dll"
SetOutPath "$INSTDIR\devel"
File "..\build\lib\Release\mosquitto.lib"
File "..\build\lib\cpp\Release\mosquittopp.lib"
@@ -93,6 +98,13 @@ Section "Files" SecInstall
SendMessage ${HWND_BROADCAST} ${WM_WININICHANGE} 0 "STR:Environment" /TIMEOUT=5000
SectionEnd
Section "Visual Studio Runtime"
SetOutPath "$INSTDIR"
File "VC_redist.x86.exe"
ExecWait '"$INSTDIR\VC_redist.x86.exe" /quiet /norestart'
Delete "$INSTDIR\VC_redist.x86.exe"
SectionEnd
Section "Service" SecService
ExecWait '"$INSTDIR\mosquitto.exe" install'
ExecWait 'sc start mosquitto'
@@ -104,36 +116,41 @@ Section "Uninstall"
ExecWait '"$INSTDIR\mosquitto.exe" uninstall'
Sleep 1000
Delete "$INSTDIR\mosquitto.dll"
Delete "$INSTDIR\mosquitto.exe"
Delete "$INSTDIR\mosquitto_ctrl.exe"
Delete "$INSTDIR\mosquitto_passwd.exe"
Delete "$INSTDIR\mosquitto_pub.exe"
Delete "$INSTDIR\mosquitto_sub.exe"
Delete "$INSTDIR\mosquitto_rr.exe"
Delete "$INSTDIR\mosquitto.dll"
Delete "$INSTDIR\mosquitto_sub.exe"
Delete "$INSTDIR\mosquittopp.dll"
Delete "$INSTDIR\mosquitto_dynamic_security.dll"
Delete "$INSTDIR\aclfile.example"
Delete "$INSTDIR\ChangeLog.txt"
Delete "$INSTDIR\mosquitto.conf"
Delete "$INSTDIR\pwfile.example"
Delete "$INSTDIR\NOTICE.md"
Delete "$INSTDIR\README.md"
Delete "$INSTDIR\README-windows.txt"
Delete "$INSTDIR\README-letsencrypt.md"
;Delete "$INSTDIR\pthreadVC2.dll"
Delete "$INSTDIR\libssl-1_1.dll"
Delete "$INSTDIR\libcrypto-1_1.dll"
Delete "$INSTDIR\SECURITY.md"
Delete "$INSTDIR\edl-v10"
Delete "$INSTDIR\epl-v20"
Delete "$INSTDIR\mosquitto.ico"
Delete "$INSTDIR\cjson.dll"
Delete "$INSTDIR\libcrypto-3.dll"
Delete "$INSTDIR\libssl-3.dll"
Delete "$INSTDIR\pthreadVC3.dll"
Delete "$INSTDIR\uv.dll"
Delete "$INSTDIR\websockets.dll"
Delete "$INSTDIR\devel\mosquitto.h"
Delete "$INSTDIR\devel\mosquitto.lib"
Delete "$INSTDIR\devel\mosquitto_broker.h"
Delete "$INSTDIR\devel\mosquitto_plugin.h"
Delete "$INSTDIR\devel\mosquittopp.h"
Delete "$INSTDIR\devel\mosquittopp.lib"
Delete "$INSTDIR\devel\mqtt_protocol.h"
RMDir "$INSTDIR\devel\mosquitto"
RMDir "$INSTDIR\devel"
Delete "$INSTDIR\Uninstall.exe"
@@ -151,4 +168,3 @@ LangString DESC_SecService ${LANG_ENGLISH} "Install mosquitto as a Windows servi
!insertmacro MUI_DESCRIPTION_TEXT ${SecInstall} $(DESC_SecInstall)
!insertmacro MUI_DESCRIPTION_TEXT ${SecService} $(DESC_SecService)
!insertmacro MUI_FUNCTION_DESCRIPTION_END

View File

@@ -9,7 +9,7 @@
!define env_hklm 'HKLM "SYSTEM\CurrentControlSet\Control\Session Manager\Environment"'
Name "Eclipse Mosquitto"
!define VERSION 2.0.20
!define VERSION 2.0.21
OutFile "mosquitto-${VERSION}-install-windows-x64.exe"
!include "x64.nsh"

View File

@@ -89,19 +89,19 @@ set_target_properties(libmosquitto PROPERTIES
POSITION_INDEPENDENT_CODE 1
)
target_link_libraries(libmosquitto PRIVATE ${LIBRARIES})
if (WITH_THREADING)
if(WIN32)
target_link_libraries(libmosquitto PRIVATE PThreads4W::PThreads4W)
set (LIBRARIES ${LIBRARIES} PThreads4W::PThreads4W)
else()
set(THREADS_PREFER_PTHREAD_FLAG ON)
find_package(Threads REQUIRED)
target_link_libraries(libmosquitto PRIVATE Threads::Threads)
set (LIBRARIES ${LIBRARIES} Threads::Threads)
endif()
endif()
target_link_libraries(libmosquitto PRIVATE ${LIBRARIES})
set_target_properties(libmosquitto PROPERTIES
OUTPUT_NAME mosquitto
VERSION ${VERSION}

View File

@@ -155,10 +155,10 @@ int mosquitto_publish_v5(struct mosquitto *mosq, int *mid, const char *topic, in
message->dup = false;
message->properties = properties_copy;
pthread_mutex_lock(&mosq->msgs_out.mutex);
COMPAT_pthread_mutex_lock(&mosq->msgs_out.mutex);
message->state = mosq_ms_invalid;
rc = message__queue(mosq, message, mosq_md_out);
pthread_mutex_unlock(&mosq->msgs_out.mutex);
COMPAT_pthread_mutex_unlock(&mosq->msgs_out.mutex);
return rc;
}
}

View File

@@ -24,99 +24,99 @@ Contributors:
void mosquitto_connect_callback_set(struct mosquitto *mosq, void (*on_connect)(struct mosquitto *, void *, int))
{
pthread_mutex_lock(&mosq->callback_mutex);
COMPAT_pthread_mutex_lock(&mosq->callback_mutex);
mosq->on_connect = on_connect;
pthread_mutex_unlock(&mosq->callback_mutex);
COMPAT_pthread_mutex_unlock(&mosq->callback_mutex);
}
void mosquitto_connect_with_flags_callback_set(struct mosquitto *mosq, void (*on_connect)(struct mosquitto *, void *, int, int))
{
pthread_mutex_lock(&mosq->callback_mutex);
COMPAT_pthread_mutex_lock(&mosq->callback_mutex);
mosq->on_connect_with_flags = on_connect;
pthread_mutex_unlock(&mosq->callback_mutex);
COMPAT_pthread_mutex_unlock(&mosq->callback_mutex);
}
void mosquitto_connect_v5_callback_set(struct mosquitto *mosq, void (*on_connect)(struct mosquitto *, void *, int, int, const mosquitto_property *))
{
pthread_mutex_lock(&mosq->callback_mutex);
COMPAT_pthread_mutex_lock(&mosq->callback_mutex);
mosq->on_connect_v5 = on_connect;
pthread_mutex_unlock(&mosq->callback_mutex);
COMPAT_pthread_mutex_unlock(&mosq->callback_mutex);
}
void mosquitto_disconnect_callback_set(struct mosquitto *mosq, void (*on_disconnect)(struct mosquitto *, void *, int))
{
pthread_mutex_lock(&mosq->callback_mutex);
COMPAT_pthread_mutex_lock(&mosq->callback_mutex);
mosq->on_disconnect = on_disconnect;
pthread_mutex_unlock(&mosq->callback_mutex);
COMPAT_pthread_mutex_unlock(&mosq->callback_mutex);
}
void mosquitto_disconnect_v5_callback_set(struct mosquitto *mosq, void (*on_disconnect)(struct mosquitto *, void *, int, const mosquitto_property *))
{
pthread_mutex_lock(&mosq->callback_mutex);
COMPAT_pthread_mutex_lock(&mosq->callback_mutex);
mosq->on_disconnect_v5 = on_disconnect;
pthread_mutex_unlock(&mosq->callback_mutex);
COMPAT_pthread_mutex_unlock(&mosq->callback_mutex);
}
void mosquitto_publish_callback_set(struct mosquitto *mosq, void (*on_publish)(struct mosquitto *, void *, int))
{
pthread_mutex_lock(&mosq->callback_mutex);
COMPAT_pthread_mutex_lock(&mosq->callback_mutex);
mosq->on_publish = on_publish;
pthread_mutex_unlock(&mosq->callback_mutex);
COMPAT_pthread_mutex_unlock(&mosq->callback_mutex);
}
void mosquitto_publish_v5_callback_set(struct mosquitto *mosq, void (*on_publish)(struct mosquitto *, void *, int, int, const mosquitto_property *props))
{
pthread_mutex_lock(&mosq->callback_mutex);
COMPAT_pthread_mutex_lock(&mosq->callback_mutex);
mosq->on_publish_v5 = on_publish;
pthread_mutex_unlock(&mosq->callback_mutex);
COMPAT_pthread_mutex_unlock(&mosq->callback_mutex);
}
void mosquitto_message_callback_set(struct mosquitto *mosq, void (*on_message)(struct mosquitto *, void *, const struct mosquitto_message *))
{
pthread_mutex_lock(&mosq->callback_mutex);
COMPAT_pthread_mutex_lock(&mosq->callback_mutex);
mosq->on_message = on_message;
pthread_mutex_unlock(&mosq->callback_mutex);
COMPAT_pthread_mutex_unlock(&mosq->callback_mutex);
}
void mosquitto_message_v5_callback_set(struct mosquitto *mosq, void (*on_message)(struct mosquitto *, void *, const struct mosquitto_message *, const mosquitto_property *props))
{
pthread_mutex_lock(&mosq->callback_mutex);
COMPAT_pthread_mutex_lock(&mosq->callback_mutex);
mosq->on_message_v5 = on_message;
pthread_mutex_unlock(&mosq->callback_mutex);
COMPAT_pthread_mutex_unlock(&mosq->callback_mutex);
}
void mosquitto_subscribe_callback_set(struct mosquitto *mosq, void (*on_subscribe)(struct mosquitto *, void *, int, int, const int *))
{
pthread_mutex_lock(&mosq->callback_mutex);
COMPAT_pthread_mutex_lock(&mosq->callback_mutex);
mosq->on_subscribe = on_subscribe;
pthread_mutex_unlock(&mosq->callback_mutex);
COMPAT_pthread_mutex_unlock(&mosq->callback_mutex);
}
void mosquitto_subscribe_v5_callback_set(struct mosquitto *mosq, void (*on_subscribe)(struct mosquitto *, void *, int, int, const int *, const mosquitto_property *props))
{
pthread_mutex_lock(&mosq->callback_mutex);
COMPAT_pthread_mutex_lock(&mosq->callback_mutex);
mosq->on_subscribe_v5 = on_subscribe;
pthread_mutex_unlock(&mosq->callback_mutex);
COMPAT_pthread_mutex_unlock(&mosq->callback_mutex);
}
void mosquitto_unsubscribe_callback_set(struct mosquitto *mosq, void (*on_unsubscribe)(struct mosquitto *, void *, int))
{
pthread_mutex_lock(&mosq->callback_mutex);
COMPAT_pthread_mutex_lock(&mosq->callback_mutex);
mosq->on_unsubscribe = on_unsubscribe;
pthread_mutex_unlock(&mosq->callback_mutex);
COMPAT_pthread_mutex_unlock(&mosq->callback_mutex);
}
void mosquitto_unsubscribe_v5_callback_set(struct mosquitto *mosq, void (*on_unsubscribe)(struct mosquitto *, void *, int, const mosquitto_property *props))
{
pthread_mutex_lock(&mosq->callback_mutex);
COMPAT_pthread_mutex_lock(&mosq->callback_mutex);
mosq->on_unsubscribe_v5 = on_unsubscribe;
pthread_mutex_unlock(&mosq->callback_mutex);
COMPAT_pthread_mutex_unlock(&mosq->callback_mutex);
}
void mosquitto_log_callback_set(struct mosquitto *mosq, void (*on_log)(struct mosquitto *, void *, int, const char *))
{
pthread_mutex_lock(&mosq->log_callback_mutex);
COMPAT_pthread_mutex_lock(&mosq->log_callback_mutex);
mosq->on_log = on_log;
pthread_mutex_unlock(&mosq->log_callback_mutex);
COMPAT_pthread_mutex_unlock(&mosq->log_callback_mutex);
}

View File

@@ -179,10 +179,10 @@ static int mosquitto__reconnect(struct mosquitto *mosq, bool blocking)
if(rc) return rc;
}
pthread_mutex_lock(&mosq->msgtime_mutex);
COMPAT_pthread_mutex_lock(&mosq->msgtime_mutex);
mosq->last_msg_in = mosquitto_time();
mosq->next_msg_out = mosq->last_msg_in + mosq->keepalive;
pthread_mutex_unlock(&mosq->msgtime_mutex);
COMPAT_pthread_mutex_unlock(&mosq->msgtime_mutex);
mosq->ping_t = 0;
@@ -267,11 +267,14 @@ int mosquitto_disconnect_v5(struct mosquitto *mosq, int reason_code, const mosqu
void do_client_disconnect(struct mosquitto *mosq, int reason_code, const mosquitto_property *properties)
{
void (*on_disconnect)(struct mosquitto *, void *userdata, int rc);
void (*on_disconnect_v5)(struct mosquitto *, void *userdata, int rc, const mosquitto_property *props);
mosquitto__set_state(mosq, mosq_cs_disconnected);
net__socket_close(mosq);
/* Free data and reset values */
pthread_mutex_lock(&mosq->out_packet_mutex);
COMPAT_pthread_mutex_lock(&mosq->out_packet_mutex);
mosq->current_out_packet = mosq->out_packet;
if(mosq->out_packet){
mosq->out_packet = mosq->out_packet->next;
@@ -280,24 +283,27 @@ void do_client_disconnect(struct mosquitto *mosq, int reason_code, const mosquit
}
mosq->out_packet_count--;
}
pthread_mutex_unlock(&mosq->out_packet_mutex);
COMPAT_pthread_mutex_unlock(&mosq->out_packet_mutex);
pthread_mutex_lock(&mosq->msgtime_mutex);
COMPAT_pthread_mutex_lock(&mosq->msgtime_mutex);
mosq->next_msg_out = mosquitto_time() + mosq->keepalive;
pthread_mutex_unlock(&mosq->msgtime_mutex);
COMPAT_pthread_mutex_unlock(&mosq->msgtime_mutex);
pthread_mutex_lock(&mosq->callback_mutex);
if(mosq->on_disconnect){
COMPAT_pthread_mutex_lock(&mosq->callback_mutex);
on_disconnect = mosq->on_disconnect;
on_disconnect_v5 = mosq->on_disconnect_v5;
COMPAT_pthread_mutex_unlock(&mosq->callback_mutex);
if(on_disconnect){
mosq->in_callback = true;
mosq->on_disconnect(mosq, mosq->userdata, reason_code);
on_disconnect(mosq, mosq->userdata, reason_code);
mosq->in_callback = false;
}
if(mosq->on_disconnect_v5){
if(on_disconnect_v5){
mosq->in_callback = true;
mosq->on_disconnect_v5(mosq, mosq->userdata, reason_code, properties);
on_disconnect_v5(mosq, mosq->userdata, reason_code, properties);
mosq->in_callback = false;
}
pthread_mutex_unlock(&mosq->callback_mutex);
pthread_mutex_unlock(&mosq->current_out_packet_mutex);
COMPAT_pthread_mutex_unlock(&mosq->current_out_packet_mutex);
}

View File

@@ -32,27 +32,35 @@ Contributors:
static void connack_callback(struct mosquitto *mosq, uint8_t reason_code, uint8_t connect_flags, const mosquitto_property *properties)
{
void (*on_connect)(struct mosquitto *, void *userdata, int rc);
void (*on_connect_with_flags)(struct mosquitto *, void *userdata, int rc, int flags);
void (*on_connect_v5)(struct mosquitto *, void *userdata, int rc, int flags, const mosquitto_property *props);
log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s received CONNACK (%d)", SAFE_PRINT(mosq->id), reason_code);
if(reason_code == MQTT_RC_SUCCESS){
mosq->reconnects = 0;
}
pthread_mutex_lock(&mosq->callback_mutex);
if(mosq->on_connect){
COMPAT_pthread_mutex_lock(&mosq->callback_mutex);
on_connect = mosq->on_connect;
on_connect_with_flags = mosq->on_connect_with_flags;
on_connect_v5 = mosq->on_connect_v5;
COMPAT_pthread_mutex_unlock(&mosq->callback_mutex);
if(on_connect){
mosq->in_callback = true;
mosq->on_connect(mosq, mosq->userdata, reason_code);
on_connect(mosq, mosq->userdata, reason_code);
mosq->in_callback = false;
}
if(mosq->on_connect_with_flags){
if(on_connect_with_flags){
mosq->in_callback = true;
mosq->on_connect_with_flags(mosq, mosq->userdata, reason_code, connect_flags);
on_connect_with_flags(mosq, mosq->userdata, reason_code, connect_flags);
mosq->in_callback = false;
}
if(mosq->on_connect_v5){
if(on_connect_v5){
mosq->in_callback = true;
mosq->on_connect_v5(mosq, mosq->userdata, reason_code, connect_flags, properties);
on_connect_v5(mosq, mosq->userdata, reason_code, connect_flags, properties);
mosq->in_callback = false;
}
pthread_mutex_unlock(&mosq->callback_mutex);
}
@@ -117,11 +125,11 @@ int handle__connack(struct mosquitto *mosq)
switch(reason_code){
case 0:
pthread_mutex_lock(&mosq->state_mutex);
COMPAT_pthread_mutex_lock(&mosq->state_mutex);
if(mosq->state != mosq_cs_disconnecting){
mosq->state = mosq_cs_active;
}
pthread_mutex_unlock(&mosq->state_mutex);
COMPAT_pthread_mutex_unlock(&mosq->state_mutex);
message__retry_check(mosq);
return MOSQ_ERR_SUCCESS;
case 1:

View File

@@ -136,26 +136,32 @@ int handle__pubackcomp(struct mosquitto *mosq, const char *type)
rc = message__delete(mosq, mid, mosq_md_out, qos);
if(rc == MOSQ_ERR_SUCCESS){
void (*on_publish)(struct mosquitto *, void *userdata, int mid);
void (*on_publish_v5)(struct mosquitto *, void *userdata, int mid, int reason_code, const mosquitto_property *props);
/* Only inform the client the message has been sent once. */
pthread_mutex_lock(&mosq->callback_mutex);
if(mosq->on_publish){
COMPAT_pthread_mutex_lock(&mosq->callback_mutex);
on_publish = mosq->on_publish;
on_publish_v5 = mosq->on_publish_v5;
COMPAT_pthread_mutex_unlock(&mosq->callback_mutex);
if(on_publish){
mosq->in_callback = true;
mosq->on_publish(mosq, mosq->userdata, mid);
on_publish(mosq, mosq->userdata, mid);
mosq->in_callback = false;
}
if(mosq->on_publish_v5){
if(on_publish_v5){
mosq->in_callback = true;
mosq->on_publish_v5(mosq, mosq->userdata, mid, reason_code, properties);
on_publish_v5(mosq, mosq->userdata, mid, reason_code, properties);
mosq->in_callback = false;
}
pthread_mutex_unlock(&mosq->callback_mutex);
mosquitto_property_free_all(&properties);
}else if(rc != MOSQ_ERR_NOT_FOUND){
return rc;
}
pthread_mutex_lock(&mosq->msgs_out.mutex);
COMPAT_pthread_mutex_lock(&mosq->msgs_out.mutex);
message__release_to_inflight(mosq, mosq_md_out);
pthread_mutex_unlock(&mosq->msgs_out.mutex);
COMPAT_pthread_mutex_unlock(&mosq->msgs_out.mutex);
return MOSQ_ERR_SUCCESS;
#endif

View File

@@ -120,38 +120,44 @@ int handle__publish(struct mosquitto *mosq)
(long)message->msg.payloadlen);
message->timestamp = mosquitto_time();
void (*on_message)(struct mosquitto *, void *userdata, const struct mosquitto_message *message);
void (*on_message_v5)(struct mosquitto *, void *userdata, const struct mosquitto_message *message, const mosquitto_property *props);
switch(message->msg.qos){
case 0:
pthread_mutex_lock(&mosq->callback_mutex);
if(mosq->on_message){
COMPAT_pthread_mutex_lock(&mosq->callback_mutex);
on_message = mosq->on_message;
on_message_v5 = mosq->on_message_v5;
COMPAT_pthread_mutex_unlock(&mosq->callback_mutex);
if(on_message){
mosq->in_callback = true;
mosq->on_message(mosq, mosq->userdata, &message->msg);
on_message(mosq, mosq->userdata, &message->msg);
mosq->in_callback = false;
}
if(mosq->on_message_v5){
mosq->in_callback = true;
mosq->on_message_v5(mosq, mosq->userdata, &message->msg, properties);
on_message_v5(mosq, mosq->userdata, &message->msg, properties);
mosq->in_callback = false;
}
pthread_mutex_unlock(&mosq->callback_mutex);
message__cleanup(&message);
mosquitto_property_free_all(&properties);
return MOSQ_ERR_SUCCESS;
case 1:
util__decrement_receive_quota(mosq);
rc = send__puback(mosq, mid, 0, NULL);
pthread_mutex_lock(&mosq->callback_mutex);
if(mosq->on_message){
COMPAT_pthread_mutex_lock(&mosq->callback_mutex);
on_message = mosq->on_message;
on_message_v5 = mosq->on_message_v5;
COMPAT_pthread_mutex_unlock(&mosq->callback_mutex);
if(on_message){
mosq->in_callback = true;
mosq->on_message(mosq, mosq->userdata, &message->msg);
on_message(mosq, mosq->userdata, &message->msg);
mosq->in_callback = false;
}
if(mosq->on_message_v5){
if(on_message_v5){
mosq->in_callback = true;
mosq->on_message_v5(mosq, mosq->userdata, &message->msg, properties);
on_message_v5(mosq, mosq->userdata, &message->msg, properties);
mosq->in_callback = false;
}
pthread_mutex_unlock(&mosq->callback_mutex);
message__cleanup(&message);
mosquitto_property_free_all(&properties);
return rc;
@@ -159,10 +165,10 @@ int handle__publish(struct mosquitto *mosq)
message->properties = properties;
util__decrement_receive_quota(mosq);
rc = send__pubrec(mosq, mid, 0, NULL);
pthread_mutex_lock(&mosq->msgs_in.mutex);
COMPAT_pthread_mutex_lock(&mosq->msgs_in.mutex);
message->state = mosq_ms_wait_for_pubrel;
message__queue(mosq, message, mosq_md_in);
pthread_mutex_unlock(&mosq->msgs_in.mutex);
COMPAT_pthread_mutex_unlock(&mosq->msgs_in.mutex);
return rc;
default:
message__cleanup(&message);

View File

@@ -107,18 +107,21 @@ int handle__pubrec(struct mosquitto *mosq)
}else{
if(!message__delete(mosq, mid, mosq_md_out, 2)){
/* Only inform the client the message has been sent once. */
pthread_mutex_lock(&mosq->callback_mutex);
if(mosq->on_publish_v5){
void (*on_publish_v5)(struct mosquitto *, void *userdata, int mid, int reason_code, const mosquitto_property *props);
COMPAT_pthread_mutex_lock(&mosq->callback_mutex);
on_publish_v5 = mosq->on_publish_v5;
COMPAT_pthread_mutex_unlock(&mosq->callback_mutex);
if(on_publish_v5){
mosq->in_callback = true;
mosq->on_publish_v5(mosq, mosq->userdata, mid, reason_code, properties);
on_publish_v5(mosq, mosq->userdata, mid, reason_code, properties);
mosq->in_callback = false;
}
pthread_mutex_unlock(&mosq->callback_mutex);
}
util__increment_send_quota(mosq);
pthread_mutex_lock(&mosq->msgs_out.mutex);
COMPAT_pthread_mutex_lock(&mosq->msgs_out.mutex);
message__release_to_inflight(mosq, mosq_md_out);
pthread_mutex_unlock(&mosq->msgs_out.mutex);
COMPAT_pthread_mutex_unlock(&mosq->msgs_out.mutex);
return MOSQ_ERR_SUCCESS;
}
#endif

View File

@@ -116,18 +116,22 @@ int handle__pubrel(struct mosquitto *mosq)
if(rc == MOSQ_ERR_SUCCESS){
/* Only pass the message on if we have removed it from the queue - this
* prevents multiple callbacks for the same message. */
pthread_mutex_lock(&mosq->callback_mutex);
if(mosq->on_message){
void (*on_message)(struct mosquitto *, void *userdata, const struct mosquitto_message *message);
void (*on_message_v5)(struct mosquitto *, void *userdata, const struct mosquitto_message *message, const mosquitto_property *props);
COMPAT_pthread_mutex_lock(&mosq->callback_mutex);
on_message = mosq->on_message;
on_message_v5 = mosq->on_message_v5;
COMPAT_pthread_mutex_unlock(&mosq->callback_mutex);
if(on_message){
mosq->in_callback = true;
mosq->on_message(mosq, mosq->userdata, &message->msg);
on_message(mosq, mosq->userdata, &message->msg);
mosq->in_callback = false;
}
if(mosq->on_message_v5){
if(on_message_v5){
mosq->in_callback = true;
mosq->on_message_v5(mosq, mosq->userdata, &message->msg, message->properties);
on_message_v5(mosq, mosq->userdata, &message->msg, message->properties);
mosq->in_callback = false;
}
pthread_mutex_unlock(&mosq->callback_mutex);
mosquitto_property_free_all(&properties);
message__cleanup(&message);
}else if(rc == MOSQ_ERR_NOT_FOUND){

View File

@@ -97,18 +97,22 @@ int handle__suback(struct mosquitto *mosq)
/* Immediately free, we don't do anything with Reason String or User Property at the moment */
mosquitto_property_free_all(&properties);
#else
pthread_mutex_lock(&mosq->callback_mutex);
if(mosq->on_subscribe){
void (*on_subscribe)(struct mosquitto *, void *userdata, int mid, int qos_count, const int *granted_qos);
void (*on_subscribe_v5)(struct mosquitto *, void *userdata, int mid, int qos_count, const int *granted_qos, const mosquitto_property *props);
COMPAT_pthread_mutex_lock(&mosq->callback_mutex);
on_subscribe = mosq->on_subscribe;
on_subscribe_v5 = mosq->on_subscribe_v5;
COMPAT_pthread_mutex_unlock(&mosq->callback_mutex);
if(on_subscribe){
mosq->in_callback = true;
mosq->on_subscribe(mosq, mosq->userdata, mid, qos_count, granted_qos);
on_subscribe(mosq, mosq->userdata, mid, qos_count, granted_qos);
mosq->in_callback = false;
}
if(mosq->on_subscribe_v5){
if(on_subscribe_v5){
mosq->in_callback = true;
mosq->on_subscribe_v5(mosq, mosq->userdata, mid, qos_count, granted_qos, properties);
on_subscribe_v5(mosq, mosq->userdata, mid, qos_count, granted_qos, properties);
mosq->in_callback = false;
}
pthread_mutex_unlock(&mosq->callback_mutex);
mosquitto_property_free_all(&properties);
#endif
mosquitto__free(granted_qos);

View File

@@ -76,18 +76,22 @@ int handle__unsuback(struct mosquitto *mosq)
/* Immediately free, we don't do anything with Reason String or User Property at the moment */
mosquitto_property_free_all(&properties);
#else
pthread_mutex_lock(&mosq->callback_mutex);
if(mosq->on_unsubscribe){
void (*on_unsubscribe)(struct mosquitto *, void *userdata, int mid);
void (*on_unsubscribe_v5)(struct mosquitto *, void *userdata, int mid, const mosquitto_property *props);
COMPAT_pthread_mutex_lock(&mosq->callback_mutex);
on_unsubscribe = mosq->on_unsubscribe;
on_unsubscribe_v5 = mosq->on_unsubscribe_v5;
COMPAT_pthread_mutex_unlock(&mosq->callback_mutex);
if(on_unsubscribe){
mosq->in_callback = true;
mosq->on_unsubscribe(mosq, mosq->userdata, mid);
on_unsubscribe(mosq, mosq->userdata, mid);
mosq->in_callback = false;
}
if(mosq->on_unsubscribe_v5){
if(on_unsubscribe_v5){
mosq->in_callback = true;
mosq->on_unsubscribe_v5(mosq, mosq->userdata, mid, properties);
on_unsubscribe_v5(mosq, mosq->userdata, mid, properties);
mosq->in_callback = false;
}
pthread_mutex_unlock(&mosq->callback_mutex);
mosquitto_property_free_all(&properties);
#endif

View File

@@ -33,16 +33,19 @@ int log__printf(struct mosquitto *mosq, unsigned int priority, const char *fmt,
va_list va;
char *s;
size_t len;
void (*on_log)(struct mosquitto *, void *userdata, int level, const char *str);
assert(mosq);
assert(fmt);
pthread_mutex_lock(&mosq->log_callback_mutex);
if(mosq->on_log){
COMPAT_pthread_mutex_lock(&mosq->log_callback_mutex);
on_log = mosq->on_log;
COMPAT_pthread_mutex_unlock(&mosq->log_callback_mutex);
if(on_log){
len = strlen(fmt) + 500;
s = mosquitto__malloc(len*sizeof(char));
if(!s){
pthread_mutex_unlock(&mosq->log_callback_mutex);
return MOSQ_ERR_NOMEM;
}
@@ -51,11 +54,10 @@ int log__printf(struct mosquitto *mosq, unsigned int priority, const char *fmt,
va_end(va);
s[len-1] = '\0'; /* Ensure string is null terminated. */
mosq->on_log(mosq, mosq->userdata, (int)priority, s);
on_log(mosq, mosq->userdata, (int)priority, s);
mosquitto__free(s);
}
pthread_mutex_unlock(&mosq->log_callback_mutex);
return MOSQ_ERR_SUCCESS;
}

View File

@@ -70,13 +70,13 @@ int mosquitto_loop(struct mosquitto *mosq, int timeout, int max_packets)
if(mosq->ssl == NULL || SSL_is_init_finished(mosq->ssl))
#endif
{
pthread_mutex_lock(&mosq->current_out_packet_mutex);
pthread_mutex_lock(&mosq->out_packet_mutex);
COMPAT_pthread_mutex_lock(&mosq->current_out_packet_mutex);
COMPAT_pthread_mutex_lock(&mosq->out_packet_mutex);
if(mosq->out_packet || mosq->current_out_packet){
FD_SET(mosq->sock, &writefds);
}
pthread_mutex_unlock(&mosq->out_packet_mutex);
pthread_mutex_unlock(&mosq->current_out_packet_mutex);
COMPAT_pthread_mutex_unlock(&mosq->out_packet_mutex);
COMPAT_pthread_mutex_unlock(&mosq->current_out_packet_mutex);
}
}
}else{
@@ -110,11 +110,11 @@ int mosquitto_loop(struct mosquitto *mosq, int timeout, int max_packets)
}
now = mosquitto_time();
pthread_mutex_lock(&mosq->msgtime_mutex);
COMPAT_pthread_mutex_lock(&mosq->msgtime_mutex);
if(mosq->next_msg_out && now + timeout_ms/1000 > mosq->next_msg_out){
timeout_ms = (mosq->next_msg_out - now)*1000;
}
pthread_mutex_unlock(&mosq->msgtime_mutex);
COMPAT_pthread_mutex_unlock(&mosq->msgtime_mutex);
if(timeout_ms < 0){
/* There has been a delay somewhere which means we should have already
@@ -252,7 +252,7 @@ int mosquitto_loop_forever(struct mosquitto *mosq, int timeout, int max_packets)
while(run){
do{
#ifdef HAVE_PTHREAD_CANCEL
pthread_testcancel();
COMPAT_pthread_testcancel();
#endif
rc = mosquitto_loop(mosq, timeout, max_packets);
}while(run && rc == MOSQ_ERR_SUCCESS);
@@ -279,7 +279,7 @@ int mosquitto_loop_forever(struct mosquitto *mosq, int timeout, int max_packets)
}
do{
#ifdef HAVE_PTHREAD_CANCEL
pthread_testcancel();
COMPAT_pthread_testcancel();
#endif
rc = MOSQ_ERR_SUCCESS;
if(mosquitto__get_request_disconnect(mosq)){
@@ -335,18 +335,23 @@ static int mosquitto__loop_rc_handle(struct mosquitto *mosq, int rc)
if(state == mosq_cs_disconnecting || state == mosq_cs_disconnected){
rc = MOSQ_ERR_SUCCESS;
}
pthread_mutex_lock(&mosq->callback_mutex);
if(mosq->on_disconnect){
void (*on_disconnect)(struct mosquitto *, void *userdata, int rc);
void (*on_disconnect_v5)(struct mosquitto *, void *userdata, int rc, const mosquitto_property *props);
COMPAT_pthread_mutex_lock(&mosq->callback_mutex);
on_disconnect = mosq->on_disconnect;
on_disconnect_v5 = mosq->on_disconnect_v5;
COMPAT_pthread_mutex_unlock(&mosq->callback_mutex);
if(on_disconnect){
mosq->in_callback = true;
mosq->on_disconnect(mosq, mosq->userdata, rc);
on_disconnect(mosq, mosq->userdata, rc);
mosq->in_callback = false;
}
if(mosq->on_disconnect_v5){
if(on_disconnect_v5){
mosq->in_callback = true;
mosq->on_disconnect_v5(mosq, mosq->userdata, rc, NULL);
on_disconnect_v5(mosq, mosq->userdata, rc, NULL);
mosq->in_callback = false;
}
pthread_mutex_unlock(&mosq->callback_mutex);
}
return rc;
}
@@ -358,13 +363,13 @@ int mosquitto_loop_read(struct mosquitto *mosq, int max_packets)
int i;
if(max_packets < 1) return MOSQ_ERR_INVAL;
pthread_mutex_lock(&mosq->msgs_out.mutex);
COMPAT_pthread_mutex_lock(&mosq->msgs_out.mutex);
max_packets = mosq->msgs_out.queue_len;
pthread_mutex_unlock(&mosq->msgs_out.mutex);
COMPAT_pthread_mutex_unlock(&mosq->msgs_out.mutex);
pthread_mutex_lock(&mosq->msgs_in.mutex);
COMPAT_pthread_mutex_lock(&mosq->msgs_in.mutex);
max_packets += mosq->msgs_in.queue_len;
pthread_mutex_unlock(&mosq->msgs_in.mutex);
COMPAT_pthread_mutex_unlock(&mosq->msgs_in.mutex);
if(max_packets < 1) max_packets = 1;
/* Queue len here tells us how many messages are awaiting processing and

View File

@@ -142,7 +142,7 @@ void message__reconnect_reset(struct mosquitto *mosq, bool update_quota_only)
struct mosquitto_message_all *message, *tmp;
assert(mosq);
pthread_mutex_lock(&mosq->msgs_in.mutex);
COMPAT_pthread_mutex_lock(&mosq->msgs_in.mutex);
mosq->msgs_in.inflight_quota = mosq->msgs_in.inflight_maximum;
mosq->msgs_in.queue_len = 0;
DL_FOREACH_SAFE(mosq->msgs_in.inflight, message, tmp){
@@ -157,10 +157,10 @@ void message__reconnect_reset(struct mosquitto *mosq, bool update_quota_only)
util__decrement_receive_quota(mosq);
}
}
pthread_mutex_unlock(&mosq->msgs_in.mutex);
COMPAT_pthread_mutex_unlock(&mosq->msgs_in.mutex);
pthread_mutex_lock(&mosq->msgs_out.mutex);
COMPAT_pthread_mutex_lock(&mosq->msgs_out.mutex);
mosq->msgs_out.inflight_quota = mosq->msgs_out.inflight_maximum;
mosq->msgs_out.queue_len = 0;
DL_FOREACH_SAFE(mosq->msgs_out.inflight, message, tmp){
@@ -185,7 +185,7 @@ void message__reconnect_reset(struct mosquitto *mosq, bool update_quota_only)
message->state = mosq_ms_invalid;
}
}
pthread_mutex_unlock(&mosq->msgs_out.mutex);
COMPAT_pthread_mutex_unlock(&mosq->msgs_out.mutex);
}
@@ -228,12 +228,12 @@ int message__remove(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_dir
assert(message);
if(dir == mosq_md_out){
pthread_mutex_lock(&mosq->msgs_out.mutex);
COMPAT_pthread_mutex_lock(&mosq->msgs_out.mutex);
DL_FOREACH_SAFE(mosq->msgs_out.inflight, cur, tmp){
if(found == false && cur->msg.mid == mid){
if(cur->msg.qos != qos){
pthread_mutex_unlock(&mosq->msgs_out.mutex);
COMPAT_pthread_mutex_unlock(&mosq->msgs_out.mutex);
return MOSQ_ERR_PROTOCOL;
}
DL_DELETE(mosq->msgs_out.inflight, cur);
@@ -244,18 +244,18 @@ int message__remove(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_dir
break;
}
}
pthread_mutex_unlock(&mosq->msgs_out.mutex);
COMPAT_pthread_mutex_unlock(&mosq->msgs_out.mutex);
if(found){
return MOSQ_ERR_SUCCESS;
}else{
return MOSQ_ERR_NOT_FOUND;
}
}else{
pthread_mutex_lock(&mosq->msgs_in.mutex);
COMPAT_pthread_mutex_lock(&mosq->msgs_in.mutex);
DL_FOREACH_SAFE(mosq->msgs_in.inflight, cur, tmp){
if(cur->msg.mid == mid){
if(cur->msg.qos != qos){
pthread_mutex_unlock(&mosq->msgs_in.mutex);
COMPAT_pthread_mutex_unlock(&mosq->msgs_in.mutex);
return MOSQ_ERR_PROTOCOL;
}
DL_DELETE(mosq->msgs_in.inflight, cur);
@@ -266,7 +266,7 @@ int message__remove(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_dir
}
}
pthread_mutex_unlock(&mosq->msgs_in.mutex);
COMPAT_pthread_mutex_unlock(&mosq->msgs_in.mutex);
if(found){
return MOSQ_ERR_SUCCESS;
}else{
@@ -282,7 +282,7 @@ void message__retry_check(struct mosquitto *mosq)
assert(mosq);
#ifdef WITH_THREADING
pthread_mutex_lock(&mosq->msgs_out.mutex);
COMPAT_pthread_mutex_lock(&mosq->msgs_out.mutex);
#endif
DL_FOREACH(mosq->msgs_out.inflight, msg){
@@ -309,7 +309,7 @@ void message__retry_check(struct mosquitto *mosq)
}
}
#ifdef WITH_THREADING
pthread_mutex_unlock(&mosq->msgs_out.mutex);
COMPAT_pthread_mutex_unlock(&mosq->msgs_out.mutex);
#endif
}
@@ -325,20 +325,20 @@ int message__out_update(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg
struct mosquitto_message_all *message, *tmp;
assert(mosq);
pthread_mutex_lock(&mosq->msgs_out.mutex);
COMPAT_pthread_mutex_lock(&mosq->msgs_out.mutex);
DL_FOREACH_SAFE(mosq->msgs_out.inflight, message, tmp){
if(message->msg.mid == mid){
if(message->msg.qos != qos){
pthread_mutex_unlock(&mosq->msgs_out.mutex);
COMPAT_pthread_mutex_unlock(&mosq->msgs_out.mutex);
return MOSQ_ERR_PROTOCOL;
}
message->state = state;
message->timestamp = mosquitto_time();
pthread_mutex_unlock(&mosq->msgs_out.mutex);
COMPAT_pthread_mutex_unlock(&mosq->msgs_out.mutex);
return MOSQ_ERR_SUCCESS;
}
}
pthread_mutex_unlock(&mosq->msgs_out.mutex);
COMPAT_pthread_mutex_unlock(&mosq->msgs_out.mutex);
return MOSQ_ERR_NOT_FOUND;
}

View File

@@ -203,24 +203,27 @@ int mosquitto_reinitialise(struct mosquitto *mosq, const char *id, bool clean_st
mosq->ssl = NULL;
mosq->ssl_ctx = NULL;
mosq->ssl_ctx_defaults = true;
#ifndef WITH_BROKER
mosq->user_ssl_ctx = NULL;
#endif
mosq->tls_cert_reqs = SSL_VERIFY_PEER;
mosq->tls_insecure = false;
mosq->want_write = false;
mosq->tls_ocsp_required = false;
#endif
#ifdef WITH_THREADING
pthread_mutex_init(&mosq->callback_mutex, NULL);
pthread_mutex_init(&mosq->log_callback_mutex, NULL);
pthread_mutex_init(&mosq->state_mutex, NULL);
pthread_mutex_init(&mosq->out_packet_mutex, NULL);
pthread_mutex_init(&mosq->current_out_packet_mutex, NULL);
pthread_mutex_init(&mosq->msgtime_mutex, NULL);
pthread_mutex_init(&mosq->msgs_in.mutex, NULL);
pthread_mutex_init(&mosq->msgs_out.mutex, NULL);
pthread_mutex_init(&mosq->mid_mutex, NULL);
COMPAT_pthread_mutex_init(&mosq->callback_mutex, NULL);
COMPAT_pthread_mutex_init(&mosq->log_callback_mutex, NULL);
COMPAT_pthread_mutex_init(&mosq->state_mutex, NULL);
COMPAT_pthread_mutex_init(&mosq->out_packet_mutex, NULL);
COMPAT_pthread_mutex_init(&mosq->current_out_packet_mutex, NULL);
COMPAT_pthread_mutex_init(&mosq->msgtime_mutex, NULL);
COMPAT_pthread_mutex_init(&mosq->msgs_in.mutex, NULL);
COMPAT_pthread_mutex_init(&mosq->msgs_out.mutex, NULL);
COMPAT_pthread_mutex_init(&mosq->mid_mutex, NULL);
mosq->thread_id = pthread_self();
#endif
/* This must be after pthread_mutex_init(), otherwise the log mutex may be
/* This must be after COMPAT_pthread_mutex_init(), otherwise the log mutex may be
* used before being initialised. */
if(net__socketpair(&mosq->sockpairR, &mosq->sockpairW)){
log__printf(mosq, MOSQ_LOG_WARNING,
@@ -238,8 +241,8 @@ void mosquitto__destroy(struct mosquitto *mosq)
#ifdef WITH_THREADING
# ifdef HAVE_PTHREAD_CANCEL
if(mosq->threaded == mosq_ts_self && !pthread_equal(mosq->thread_id, pthread_self())){
pthread_cancel(mosq->thread_id);
pthread_join(mosq->thread_id, NULL);
COMPAT_pthread_cancel(mosq->thread_id);
COMPAT_pthread_join(mosq->thread_id, NULL);
mosq->threaded = mosq_ts_none;
}
# endif
@@ -248,15 +251,15 @@ void mosquitto__destroy(struct mosquitto *mosq)
/* If mosq->id is not NULL then the client has already been initialised
* and so the mutexes need destroying. If mosq->id is NULL, the mutexes
* haven't been initialised. */
pthread_mutex_destroy(&mosq->callback_mutex);
pthread_mutex_destroy(&mosq->log_callback_mutex);
pthread_mutex_destroy(&mosq->state_mutex);
pthread_mutex_destroy(&mosq->out_packet_mutex);
pthread_mutex_destroy(&mosq->current_out_packet_mutex);
pthread_mutex_destroy(&mosq->msgtime_mutex);
pthread_mutex_destroy(&mosq->msgs_in.mutex);
pthread_mutex_destroy(&mosq->msgs_out.mutex);
pthread_mutex_destroy(&mosq->mid_mutex);
COMPAT_pthread_mutex_destroy(&mosq->callback_mutex);
COMPAT_pthread_mutex_destroy(&mosq->log_callback_mutex);
COMPAT_pthread_mutex_destroy(&mosq->state_mutex);
COMPAT_pthread_mutex_destroy(&mosq->out_packet_mutex);
COMPAT_pthread_mutex_destroy(&mosq->current_out_packet_mutex);
COMPAT_pthread_mutex_destroy(&mosq->msgtime_mutex);
COMPAT_pthread_mutex_destroy(&mosq->msgs_in.mutex);
COMPAT_pthread_mutex_destroy(&mosq->msgs_out.mutex);
COMPAT_pthread_mutex_destroy(&mosq->mid_mutex);
}
#endif
if(mosq->sock != INVALID_SOCKET){
@@ -268,9 +271,17 @@ void mosquitto__destroy(struct mosquitto *mosq)
if(mosq->ssl){
SSL_free(mosq->ssl);
}
if(mosq->ssl_ctx){
SSL_CTX_free(mosq->ssl_ctx);
}
#ifndef WITH_BROKER
if(mosq->user_ssl_ctx){
SSL_CTX_free(mosq->user_ssl_ctx);
}else if(mosq->ssl_ctx){
SSL_CTX_free(mosq->ssl_ctx);
}
#else
if(mosq->ssl_ctx){
SSL_CTX_free(mosq->ssl_ctx);
}
#endif
mosquitto__free(mosq->tls_cafile);
mosquitto__free(mosq->tls_capath);
mosquitto__free(mosq->tls_certfile);
@@ -281,6 +292,10 @@ void mosquitto__destroy(struct mosquitto *mosq)
mosquitto__free(mosq->tls_psk);
mosquitto__free(mosq->tls_psk_identity);
mosquitto__free(mosq->tls_alpn);
#ifndef OPENSSL_NO_ENGINE
mosquitto__free(mosq->tls_engine);
mosq->tls_engine = NULL;
#endif
#endif
mosquitto__free(mosq->address);

View File

@@ -353,6 +353,10 @@ struct mosquitto {
struct mosquitto *for_free_next;
struct session_expiry_list *expiry_list_item;
uint16_t remote_port;
# ifndef WITH_OLD_KEEPALIVE
struct mosquitto *keepalive_next;
struct mosquitto *keepalive_prev;
# endif
#endif
uint32_t events;
};

View File

@@ -198,6 +198,15 @@ void net__init_tls(void)
}
#endif
bool net__is_connected(struct mosquitto *mosq)
{
#if defined(WITH_BROKER) && defined(WITH_WEBSOCKETS)
return mosq->sock != INVALID_SOCKET || mosq->wsi != NULL;
#else
return mosq->sock != INVALID_SOCKET;
#endif
}
/* Close a socket associated with a context and set it to -1.
* Returns 1 on failure (context is NULL)
* Returns 0 on success.

View File

@@ -75,6 +75,7 @@ int net__try_connect_step2(struct mosquitto *mosq, uint16_t port, mosq_sock_t *s
int net__socket_connect_step3(struct mosquitto *mosq, const char *host);
int net__socket_nonblock(mosq_sock_t *sock);
int net__socketpair(mosq_sock_t *sp1, mosq_sock_t *sp2);
bool net__is_connected(struct mosquitto *mosq);
ssize_t net__read(struct mosquitto *mosq, void *buf, size_t count);
ssize_t net__write(struct mosquitto *mosq, const void *buf, size_t count);

View File

@@ -296,20 +296,24 @@ int packet__write(struct mosquitto *mosq)
if(((packet->command)&0xF6) == CMD_PUBLISH){
G_PUB_MSGS_SENT_INC(1);
#ifndef WITH_BROKER
void (*on_publish)(struct mosquitto *, void *userdata, int mid);
void (*on_publish_v5)(struct mosquitto *, void *userdata, int mid, int reason_code, const mosquitto_property *props);
COMPAT_pthread_mutex_lock(&mosq->callback_mutex);
if(mosq->on_publish){
/* This is a QoS=0 message */
mosq->in_callback = true;
mosq->on_publish(mosq, mosq->userdata, packet->mid);
mosq->in_callback = false;
}
if(mosq->on_publish_v5){
/* This is a QoS=0 message */
mosq->in_callback = true;
mosq->on_publish_v5(mosq, mosq->userdata, packet->mid, 0, NULL);
mosq->in_callback = false;
}
on_publish = mosq->on_publish;
on_publish_v5 = mosq->on_publish_v5;
COMPAT_pthread_mutex_unlock(&mosq->callback_mutex);
if(on_publish){
/* This is a QoS=0 message */
mosq->in_callback = true;
on_publish(mosq, mosq->userdata, packet->mid);
mosq->in_callback = false;
}
if(on_publish_v5){
/* This is a QoS=0 message */
mosq->in_callback = true;
on_publish_v5(mosq, mosq->userdata, packet->mid, 0, NULL);
mosq->in_callback = false;
}
}else if(((packet->command)&0xF0) == CMD_DISCONNECT){
do_client_disconnect(mosq, MOSQ_ERR_SUCCESS, NULL);
packet__cleanup(packet);
@@ -395,6 +399,11 @@ int packet__read(struct mosquitto *mosq)
/* Clients must send CONNECT as their first command. */
if(!(mosq->bridge) && state == mosq_cs_new && (byte&0xF0) != CMD_CONNECT){
return MOSQ_ERR_PROTOCOL;
}else if((byte&0xF0) == CMD_RESERVED){
if(mosq->protocol == mosq_p_mqtt5){
send__disconnect(mosq, MQTT_RC_PROTOCOL_ERROR, NULL);
}
return MOSQ_ERR_PROTOCOL;
}
#endif
}else{

View File

@@ -241,7 +241,7 @@ void property__free(mosquitto_property **property)
break;
}
free(*property);
mosquitto__free(*property);
*property = NULL;
}
@@ -1120,7 +1120,7 @@ const mosquitto_property *mosquitto_property_read_binary(const mosquitto_propert
if(value){
*len = p->value.bin.len;
*value = calloc(1, *len + 1U);
*value = mosquitto__calloc(1, *len + 1U);
if(!(*value)) return NULL;
memcpy(*value, p->value.bin.v, *len);
@@ -1149,7 +1149,7 @@ const mosquitto_property *mosquitto_property_read_string(const mosquitto_propert
}
if(value){
*value = calloc(1, (size_t)p->value.s.len+1);
*value = mosquitto__calloc(1, (size_t)p->value.s.len+1);
if(!(*value)) return NULL;
memcpy(*value, p->value.s.v, p->value.s.len);
@@ -1172,16 +1172,16 @@ const mosquitto_property *mosquitto_property_read_string_pair(const mosquitto_pr
if(p->identifier != MQTT_PROP_USER_PROPERTY) return NULL;
if(name){
*name = calloc(1, (size_t)p->name.len+1);
*name = mosquitto__calloc(1, (size_t)p->name.len+1);
if(!(*name)) return NULL;
memcpy(*name, p->name.v, p->name.len);
}
if(value){
*value = calloc(1, (size_t)p->value.s.len+1);
*value = mosquitto__calloc(1, (size_t)p->value.s.len+1);
if(!(*value)){
if(name){
free(*name);
mosquitto__free(*name);
*name = NULL;
}
return NULL;
@@ -1203,7 +1203,7 @@ int mosquitto_property_copy_all(mosquitto_property **dest, const mosquitto_prope
*dest = NULL;
while(src){
pnew = calloc(1, sizeof(mosquitto_property));
pnew = mosquitto__calloc(1, sizeof(mosquitto_property));
if(!pnew){
mosquitto_property_free_all(dest);
return MOSQ_ERR_NOMEM;
@@ -1255,7 +1255,7 @@ int mosquitto_property_copy_all(mosquitto_property **dest, const mosquitto_prope
case MQTT_PROP_SERVER_REFERENCE:
case MQTT_PROP_REASON_STRING:
pnew->value.s.len = src->value.s.len;
pnew->value.s.v = src->value.s.v ? strdup(src->value.s.v) : (char*)calloc(1,1);
pnew->value.s.v = src->value.s.v ? mosquitto__strdup(src->value.s.v) : (char*)mosquitto__calloc(1,1);
if(!pnew->value.s.v){
mosquitto_property_free_all(dest);
return MOSQ_ERR_NOMEM;
@@ -1265,7 +1265,7 @@ int mosquitto_property_copy_all(mosquitto_property **dest, const mosquitto_prope
case MQTT_PROP_AUTHENTICATION_DATA:
case MQTT_PROP_CORRELATION_DATA:
pnew->value.bin.len = src->value.bin.len;
pnew->value.bin.v = malloc(pnew->value.bin.len);
pnew->value.bin.v = mosquitto__malloc(pnew->value.bin.len);
if(!pnew->value.bin.v){
mosquitto_property_free_all(dest);
return MOSQ_ERR_NOMEM;
@@ -1275,14 +1275,14 @@ int mosquitto_property_copy_all(mosquitto_property **dest, const mosquitto_prope
case MQTT_PROP_USER_PROPERTY:
pnew->value.s.len = src->value.s.len;
pnew->value.s.v = src->value.s.v ? strdup(src->value.s.v) : (char*)calloc(1,1);
pnew->value.s.v = src->value.s.v ? mosquitto__strdup(src->value.s.v) : (char*)mosquitto__calloc(1,1);
if(!pnew->value.s.v){
mosquitto_property_free_all(dest);
return MOSQ_ERR_NOMEM;
}
pnew->name.len = src->name.len;
pnew->name.v = src->name.v ? strdup(src->name.v) : (char*)calloc(1,1);
pnew->name.v = src->name.v ? mosquitto__strdup(src->name.v) : (char*)mosquitto__calloc(1,1);
if(!pnew->name.v){
mosquitto_property_free_all(dest);
return MOSQ_ERR_NOMEM;

Some files were not shown because too many files have changed in this diff Show More