diff options
Diffstat (limited to 'src/aclk/mqtt_websockets')
21 files changed, 476 insertions, 2371 deletions
diff --git a/src/aclk/mqtt_websockets/.github/workflows/run-tests.yaml b/src/aclk/mqtt_websockets/.github/workflows/run-tests.yaml deleted file mode 100644 index da5dde821..000000000 --- a/src/aclk/mqtt_websockets/.github/workflows/run-tests.yaml +++ /dev/null @@ -1,14 +0,0 @@ -name: run-tests -on: - push: - schedule: - - cron: '5 3 * * 0' - pull_request: -jobs: - run-tests: - runs-on: ubuntu-latest - steps: - - name: Install ruby and deps - run: sudo apt-get install ruby ruby-dev mosquitto - - name: Checkout - uses: actions/checkout@v2 diff --git a/src/aclk/mqtt_websockets/.gitignore b/src/aclk/mqtt_websockets/.gitignore deleted file mode 100644 index 9f1a0d89a..000000000 --- a/src/aclk/mqtt_websockets/.gitignore +++ /dev/null @@ -1,10 +0,0 @@ -build/* -!build/.keep -test -.vscode -mqtt/mqtt.c -mqtt/include/mqtt.h -libmqttwebsockets.* -*.o -.dirstamp -.deps diff --git a/src/aclk/mqtt_websockets/README.md b/src/aclk/mqtt_websockets/README.md index b159686df..9507fedb5 100644 --- a/src/aclk/mqtt_websockets/README.md +++ b/src/aclk/mqtt_websockets/README.md @@ -4,4 +4,4 @@ Library to connect MQTT client over Websockets Secure (WSS). ## License -The Project is released under GPL v3 license. See [License](LICENSE) +The Project is released under GPL v3 license. See [License](/LICENSE) diff --git a/src/aclk/mqtt_websockets/c-rbuf/cringbuffer.c b/src/aclk/mqtt_websockets/c-rbuf/cringbuffer.c deleted file mode 100644 index 8950c6906..000000000 --- a/src/aclk/mqtt_websockets/c-rbuf/cringbuffer.c +++ /dev/null @@ -1,203 +0,0 @@ -// Copyright: SPDX-License-Identifier: GPL-3.0-only - -#include "cringbuffer.h" -#include "cringbuffer_internal.h" - -#include <stdlib.h> -#include <assert.h> -#include <string.h> - -#define MIN(a,b) (((a)<(b))?(a):(b)) -#define MAX(a,b) (((a)>(b))?(a):(b)) - -// this allows user to use their own -// custom memory allocation functions -#ifdef RBUF_CUSTOM_MALLOC -#include "../../helpers/ringbuffer_pal.h" -#else -#define crbuf_malloc(...) malloc(__VA_ARGS__) -#define crbuf_free(...) free(__VA_ARGS__) -#endif - -rbuf_t rbuf_create(size_t size) -{ - rbuf_t buffer = crbuf_malloc(sizeof(struct rbuf_t) + size); - if (!buffer) - return NULL; - - memset(buffer, 0, sizeof(struct rbuf_t)); - - buffer->data = ((char*)buffer) + sizeof(struct rbuf_t); - - buffer->head = buffer->data; - buffer->tail = buffer->data; - buffer->size = size; - buffer->end = buffer->data + size; - - return buffer; -} - -void rbuf_free(rbuf_t buffer) -{ - crbuf_free(buffer); -} - -void rbuf_flush(rbuf_t buffer) -{ - buffer->head = buffer->data; - buffer->tail = buffer->data; - buffer->size_data = 0; -} - -char *rbuf_get_linear_insert_range(rbuf_t buffer, size_t *bytes) -{ - *bytes = 0; - if (buffer->head == buffer->tail && buffer->size_data) - return NULL; - - *bytes = ((buffer->head >= buffer->tail) ? buffer->end : buffer->tail) - buffer->head; - return buffer->head; -} - -char *rbuf_get_linear_read_range(rbuf_t buffer, size_t *bytes) -{ - *bytes = 0; - if(buffer->head == buffer->tail && !buffer->size_data) - return NULL; - - *bytes = ((buffer->tail >= buffer->head) ? buffer->end : buffer->head) - buffer->tail; - - return buffer->tail; -} - -int rbuf_bump_head(rbuf_t buffer, size_t bytes) -{ - size_t free_bytes = rbuf_bytes_free(buffer); - if (bytes > free_bytes) - return 0; - int i = buffer->head - buffer->data; - buffer->head = &buffer->data[(i + bytes) % buffer->size]; - buffer->size_data += bytes; - return 1; -} - -int rbuf_bump_tail(rbuf_t buffer, size_t bytes) -{ - if(!rbuf_bump_tail_noopt(buffer, bytes)) - return 0; - - // if tail catched up with head - // start writing buffer from beggining - // this is not necessary (rbuf must work well without it) - // but helps to optimize big writes as rbuf_get_linear_insert_range - // will return bigger continuous region - if(buffer->tail == buffer->head) { - assert(buffer->size_data == 0); - rbuf_flush(buffer); - } - - return 1; -} - -size_t rbuf_get_capacity(rbuf_t buffer) -{ - return buffer->size; -} - -size_t rbuf_bytes_available(rbuf_t buffer) -{ - return buffer->size_data; -} - -size_t rbuf_bytes_free(rbuf_t buffer) -{ - return buffer->size - buffer->size_data; -} - -size_t rbuf_push(rbuf_t buffer, const char *data, size_t len) -{ - size_t to_cpy; - char *w_ptr = rbuf_get_linear_insert_range(buffer, &to_cpy); - if(!to_cpy) - return to_cpy; - - to_cpy = MIN(to_cpy, len); - memcpy(w_ptr, data, to_cpy); - rbuf_bump_head(buffer, to_cpy); - if(to_cpy < len) - to_cpy += rbuf_push(buffer, &data[to_cpy], len - to_cpy); - return to_cpy; -} - -size_t rbuf_pop(rbuf_t buffer, char *data, size_t len) -{ - size_t to_cpy; - const char *r_ptr = rbuf_get_linear_read_range(buffer, &to_cpy); - if(!to_cpy) - return to_cpy; - - to_cpy = MIN(to_cpy, len); - memcpy(data, r_ptr, to_cpy); - rbuf_bump_tail(buffer, to_cpy); - if(to_cpy < len) - to_cpy += rbuf_pop(buffer, &data[to_cpy], len - to_cpy); - return to_cpy; -} - -static inline void rbuf_ptr_inc(rbuf_t buffer, const char **ptr) -{ - (*ptr)++; - if(*ptr >= buffer->end) - *ptr = buffer->data; -} - -int rbuf_memcmp(rbuf_t buffer, const char *haystack, const char *needle, size_t needle_bytes) -{ - const char *end = needle + needle_bytes; - - // as head==tail can mean 2 things here - if (haystack == buffer->head && buffer->size_data) { - if (*haystack != *needle) - return (*haystack - *needle); - rbuf_ptr_inc(buffer, &haystack); - needle++; - } - - while (haystack != buffer->head && needle != end) { - if (*haystack != *needle) - return (*haystack - *needle); - rbuf_ptr_inc(buffer, &haystack); - needle++; - } - return 0; -} - -int rbuf_memcmp_n(rbuf_t buffer, const char *to_cmp, size_t to_cmp_bytes) -{ - return rbuf_memcmp(buffer, buffer->tail, to_cmp, to_cmp_bytes); -} - -char *rbuf_find_bytes(rbuf_t buffer, const char *needle, size_t needle_bytes, int *found_idx) -{ - const char *ptr = buffer->tail; - *found_idx = 0; - - if (!rbuf_bytes_available(buffer)) - return NULL; - - if (buffer->head == buffer->tail && buffer->size_data) { - if(!rbuf_memcmp(buffer, ptr, needle, needle_bytes)) - return (char *)ptr; - rbuf_ptr_inc(buffer, &ptr); - (*found_idx)++; - } - - while (ptr != buffer->head) - { - if(!rbuf_memcmp(buffer, ptr, needle, needle_bytes)) - return (char *)ptr; - rbuf_ptr_inc(buffer, &ptr); - (*found_idx)++; - } - return NULL; -} diff --git a/src/aclk/mqtt_websockets/c-rbuf/cringbuffer.h b/src/aclk/mqtt_websockets/c-rbuf/cringbuffer.h deleted file mode 100644 index eb98035a9..000000000 --- a/src/aclk/mqtt_websockets/c-rbuf/cringbuffer.h +++ /dev/null @@ -1,47 +0,0 @@ -// Copyright: SPDX-License-Identifier: GPL-3.0-only - -#ifndef CRINGBUFFER_H -#define CRINGBUFFER_H - -#include <stddef.h> - -typedef struct rbuf_t *rbuf_t; - -rbuf_t rbuf_create(size_t size); -void rbuf_free(rbuf_t buffer); -void rbuf_flush(rbuf_t buffer); - -/* /param bytes how much bytes can be copied into pointer returned - * /return pointer where data can be copied to or NULL if buffer full - */ -char *rbuf_get_linear_insert_range(rbuf_t buffer, size_t *bytes); -char *rbuf_get_linear_read_range(rbuf_t buffer, size_t *bytes); - -int rbuf_bump_head(rbuf_t buffer, size_t bytes); -int rbuf_bump_tail(rbuf_t buffer, size_t bytes); - -/* @param buffer related buffer instance - * @returns total capacity of buffer in bytes (not free/used) - */ -size_t rbuf_get_capacity(rbuf_t buffer); - -/* @param buffer related buffer instance - * @returns count of bytes stored in the buffer - */ -size_t rbuf_bytes_available(rbuf_t buffer); - -/* @param buffer related buffer instance - * @returns count of bytes available/free in the buffer (how many more bytes you can store in this buffer) - */ -size_t rbuf_bytes_free(rbuf_t buffer); - -/* writes as many bytes from `data` into the `buffer` as possible - * but maximum `len` bytes - */ -size_t rbuf_push(rbuf_t buffer, const char *data, size_t len); -size_t rbuf_pop(rbuf_t buffer, char *data, size_t len); - -char *rbuf_find_bytes(rbuf_t buffer, const char *needle, size_t needle_bytes, int *found_idx); -int rbuf_memcmp_n(rbuf_t buffer, const char *to_cmp, size_t to_cmp_bytes); - -#endif diff --git a/src/aclk/mqtt_websockets/c-rbuf/cringbuffer_internal.h b/src/aclk/mqtt_websockets/c-rbuf/cringbuffer_internal.h deleted file mode 100644 index d32de187c..000000000 --- a/src/aclk/mqtt_websockets/c-rbuf/cringbuffer_internal.h +++ /dev/null @@ -1,37 +0,0 @@ -// Copyright: SPDX-License-Identifier: GPL-3.0-only - -#ifndef CRINGBUFFER_INTERNAL_H -#define CRINGBUFFER_INTERNAL_H - -struct rbuf_t { - char *data; - - // points to next byte where we can write - char *head; - // points to oldest (next to be poped) readable byte - char *tail; - - // to avoid calculating data + size - // all the time - char *end; - - size_t size; - size_t size_data; -}; - -/* this exists so that it can be tested by unit tests - * without optimization that resets head and tail to - * beginning if buffer empty - */ -inline static int rbuf_bump_tail_noopt(rbuf_t buffer, size_t bytes) -{ - if (bytes > buffer->size_data) - return 0; - int i = buffer->tail - buffer->data; - buffer->tail = &buffer->data[(i + bytes) % buffer->size]; - buffer->size_data -= bytes; - - return 1; -} - -#endif diff --git a/src/aclk/mqtt_websockets/c-rbuf/ringbuffer_test.c b/src/aclk/mqtt_websockets/c-rbuf/ringbuffer_test.c deleted file mode 100644 index 6a17c9956..000000000 --- a/src/aclk/mqtt_websockets/c-rbuf/ringbuffer_test.c +++ /dev/null @@ -1,485 +0,0 @@ -// Copyright: SPDX-License-Identifier: GPL-3.0-only - -#include "ringbuffer.h" - -// to be able to access internals -// never do this from app -#include "../src/ringbuffer_internal.h" - -#include <stdio.h> -#include <string.h> - -#define KNRM "\x1B[0m" -#define KRED "\x1B[31m" -#define KGRN "\x1B[32m" -#define KYEL "\x1B[33m" -#define KBLU "\x1B[34m" -#define KMAG "\x1B[35m" -#define KCYN "\x1B[36m" -#define KWHT "\x1B[37m" - -#define UNUSED(x) (void)(x) - -int total_fails = 0; -int total_tests = 0; -int total_checks = 0; - -#define CHECK_EQ_RESULT(x, y) \ - while (s_len--) \ - putchar('.'); \ - printf("%s%s " KNRM "\n", (((x) == (y)) ? KGRN : KRED), (((x) == (y)) ? " PASS " : " FAIL ")); \ - if ((x) != (y)) \ - total_fails++; \ - total_checks++; - -#define CHECK_EQ_PREFIX(x, y, prefix, subtest_name, ...) \ - { \ - int s_len = \ - 100 - \ - printf(("Checking: " KWHT "%s %s%2d " subtest_name " " KNRM), __func__, prefix, subtest_no, ##__VA_ARGS__); \ - CHECK_EQ_RESULT(x, y) \ - } - -#define CHECK_EQ(x, y, subtest_name, ...) \ - { \ - int s_len = \ - 100 - printf(("Checking: " KWHT "%s %2d " subtest_name " " KNRM), __func__, subtest_no, ##__VA_ARGS__); \ - CHECK_EQ_RESULT(x, y) \ - } - -#define TEST_DECL() \ - int subtest_no = 0; \ - printf(KYEL "TEST SUITE: %s\n" KNRM, __func__); \ - total_tests++; - -static void test_rbuf_get_linear_insert_range() -{ - TEST_DECL(); - - // check empty buffer behaviour - rbuf_t buff = rbuf_create(5); - char *to_write; - size_t ret; - to_write = rbuf_get_linear_insert_range(buff, &ret); - CHECK_EQ(ret, 5, "empty size"); - CHECK_EQ(to_write, buff->head, "empty write ptr"); - rbuf_free(buff); - - // check full buffer behaviour - subtest_no++; - buff = rbuf_create(5); - ret = rbuf_bump_head(buff, 5); - CHECK_EQ(ret, 1, "ret"); - to_write = rbuf_get_linear_insert_range(buff, &ret); - CHECK_EQ(to_write, NULL, "writable NULL"); - CHECK_EQ(ret, 0, "writable count = 0"); - - // check buffer flush - subtest_no++; - rbuf_flush(buff); - CHECK_EQ(rbuf_bytes_free(buff), 5, "size_free"); - CHECK_EQ(rbuf_bytes_available(buff), 0, "size_avail"); - CHECK_EQ(buff->head, buff->data, "head_ptr"); - CHECK_EQ(buff->tail, buff->data, "tail_ptr"); - - // check behaviour head > tail - subtest_no++; - rbuf_flush(buff); - rbuf_bump_head(buff, 3); - to_write = rbuf_get_linear_insert_range(buff, &ret); - CHECK_EQ(to_write, buff->head, "write location"); - CHECK_EQ(ret, 2, "availible to linear write"); - - // check behaviour tail > head - subtest_no++; - rbuf_flush(buff); - rbuf_bump_head(buff, 5); - rbuf_bump_tail(buff, 3); - CHECK_EQ(buff->head, buff->data, "head_ptr"); - CHECK_EQ(buff->tail, buff->data + 3, "tail_ptr"); - to_write = rbuf_get_linear_insert_range(buff, &ret); - CHECK_EQ(to_write, buff->head, "write location"); - CHECK_EQ(ret, 3, "availible to linear write"); - -/* // check behaviour tail and head at last element - subtest_no++; - rbuf_flush(buff); - rbuf_bump_head(buff, 4); - rbuf_bump_tail(buff, 4); - CHECK_EQ(buff->head, buff->end - 1, "head_ptr"); - CHECK_EQ(buff->tail, buff->end - 1, "tail_ptr"); - to_write = rbuf_get_linear_insert_range(buff, &ret); - CHECK_EQ(to_write, buff->head, "write location"); - CHECK_EQ(ret, 1, "availible to linear write");*/ - - // check behaviour tail and head at last element - // after rbuf_bump_tail optimisation that restarts buffer - // in case tail catches up with head - subtest_no++; - rbuf_flush(buff); - rbuf_bump_head(buff, 4); - rbuf_bump_tail(buff, 4); - CHECK_EQ(buff->head, buff->data, "head_ptr"); - CHECK_EQ(buff->tail, buff->data, "tail_ptr"); - to_write = rbuf_get_linear_insert_range(buff, &ret); - CHECK_EQ(to_write, buff->head, "write location"); - CHECK_EQ(ret, 5, "availible to linear write"); -} - -#define _CHECK_EQ(x, y, subtest_name, ...) CHECK_EQ_PREFIX(x, y, prefix, subtest_name, ##__VA_ARGS__) -#define _PREFX "(size = %5zu) " -static void test_rbuf_bump_head_bsize(size_t size) -{ - char prefix[16]; - snprintf(prefix, 16, _PREFX, size); - int subtest_no = 0; - rbuf_t buff = rbuf_create(size); - _CHECK_EQ(rbuf_bytes_free(buff), size, "size_free"); - - subtest_no++; - int ret = rbuf_bump_head(buff, size); - _CHECK_EQ(buff->data, buff->head, "loc"); - _CHECK_EQ(ret, 1, "ret"); - _CHECK_EQ(buff->size_data, buff->size, "size"); - _CHECK_EQ(rbuf_bytes_free(buff), 0, "size_free"); - - subtest_no++; - ret = rbuf_bump_head(buff, 1); - _CHECK_EQ(buff->data, buff->head, "loc no move"); - _CHECK_EQ(ret, 0, "ret error"); - _CHECK_EQ(buff->size_data, buff->size, "size"); - _CHECK_EQ(rbuf_bytes_free(buff), 0, "size_free"); - rbuf_free(buff); - - subtest_no++; - buff = rbuf_create(size); - ret = rbuf_bump_head(buff, size - 1); - _CHECK_EQ(buff->head, buff->end-1, "loc end"); - rbuf_free(buff); -} -#undef _CHECK_EQ - -static void test_rbuf_bump_head() -{ - TEST_DECL(); - UNUSED(subtest_no); - - size_t test_sizes[] = { 1, 2, 3, 5, 6, 7, 8, 100, 99999, 0 }; - for (int i = 0; test_sizes[i]; i++) - test_rbuf_bump_head_bsize(test_sizes[i]); -} - -static void test_rbuf_bump_tail_noopt(int subtest_no) -{ - rbuf_t buff = rbuf_create(10); - CHECK_EQ(rbuf_bytes_free(buff), 10, "size_free"); - CHECK_EQ(rbuf_bytes_available(buff), 0, "size_avail"); - - subtest_no++; - int ret = rbuf_bump_head(buff, 5); - CHECK_EQ(ret, 1, "ret"); - CHECK_EQ(rbuf_bytes_free(buff), 5, "size_free"); - CHECK_EQ(rbuf_bytes_available(buff), 5, "size_avail"); - CHECK_EQ(buff->head, buff->data + 5, "head_ptr"); - CHECK_EQ(buff->tail, buff->data, "tail_ptr"); - - subtest_no++; - ret = rbuf_bump_tail_noopt(buff, 2); - CHECK_EQ(ret, 1, "ret"); - CHECK_EQ(rbuf_bytes_available(buff), 3, "size_avail"); - CHECK_EQ(rbuf_bytes_free(buff), 7, "size_free"); - CHECK_EQ(buff->head, buff->data + 5, "head_ptr"); - CHECK_EQ(buff->tail, buff->data + 2, "tail_ptr"); - - subtest_no++; - ret = rbuf_bump_tail_noopt(buff, 3); - CHECK_EQ(ret, 1, "ret"); - CHECK_EQ(rbuf_bytes_available(buff), 0, "size_avail"); - CHECK_EQ(rbuf_bytes_free(buff), 10, "size_free"); - CHECK_EQ(buff->head, buff->data + 5, "head_ptr"); - CHECK_EQ(buff->tail, buff->data + 5, "tail_ptr"); - - subtest_no++; - ret = rbuf_bump_tail_noopt(buff, 1); - CHECK_EQ(ret, 0, "ret"); - CHECK_EQ(rbuf_bytes_available(buff), 0, "size_avail"); - CHECK_EQ(rbuf_bytes_free(buff), 10, "size_free"); - CHECK_EQ(buff->head, buff->data + 5, "head_ptr"); - CHECK_EQ(buff->tail, buff->data + 5, "tail_ptr"); - - subtest_no++; - ret = rbuf_bump_head(buff, 7); - CHECK_EQ(ret, 1, "ret"); - CHECK_EQ(rbuf_bytes_available(buff), 7, "size_avail"); - CHECK_EQ(rbuf_bytes_free(buff), 3, "size_free"); - CHECK_EQ(buff->head, buff->data + 2, "head_ptr"); - CHECK_EQ(buff->tail, buff->data + 5, "tail_ptr"); - - subtest_no++; - ret = rbuf_bump_tail_noopt(buff, 5); - CHECK_EQ(ret, 1, "ret"); - CHECK_EQ(rbuf_bytes_available(buff), 2, "size_avail"); - CHECK_EQ(rbuf_bytes_free(buff), 8, "size_free"); - CHECK_EQ(buff->head, buff->data + 2, "head_ptr"); - CHECK_EQ(buff->tail, buff->data, "tail_ptr"); - - // check tail can't overrun head - subtest_no++; - ret = rbuf_bump_tail_noopt(buff, 3); - CHECK_EQ(ret, 0, "ret"); - CHECK_EQ(rbuf_bytes_available(buff), 2, "size_avail"); - CHECK_EQ(rbuf_bytes_free(buff), 8, "size_free"); - CHECK_EQ(buff->head, buff->data + 2, "head_ptr"); - CHECK_EQ(buff->tail, buff->data, "tail_ptr"); - - // check head can't overrun tail - subtest_no++; - ret = rbuf_bump_head(buff, 9); - CHECK_EQ(ret, 0, "ret"); - CHECK_EQ(rbuf_bytes_available(buff), 2, "size_avail"); - CHECK_EQ(rbuf_bytes_free(buff), 8, "size_free"); - CHECK_EQ(buff->head, buff->data + 2, "head_ptr"); - CHECK_EQ(buff->tail, buff->data, "tail_ptr"); - - // check head can fill the buffer - subtest_no++; - ret = rbuf_bump_head(buff, 8); - CHECK_EQ(ret, 1, "ret"); - CHECK_EQ(rbuf_bytes_available(buff), 10, "size_avail"); - CHECK_EQ(rbuf_bytes_free(buff), 0, "size_free"); - CHECK_EQ(buff->head, buff->data, "head_ptr"); - CHECK_EQ(buff->tail, buff->data, "tail_ptr"); - - // check can empty the buffer - subtest_no++; - ret = rbuf_bump_tail_noopt(buff, 10); - CHECK_EQ(ret, 1, "ret"); - CHECK_EQ(rbuf_bytes_available(buff), 0, "size_avail"); - CHECK_EQ(rbuf_bytes_free(buff), 10, "size_free"); - CHECK_EQ(buff->head, buff->data, "head_ptr"); - CHECK_EQ(buff->tail, buff->data, "tail_ptr"); -} - -static void test_rbuf_bump_tail_opt(int subtest_no) -{ - subtest_no++; - rbuf_t buff = rbuf_create(10); - CHECK_EQ(rbuf_bytes_free(buff), 10, "size_free"); - CHECK_EQ(rbuf_bytes_available(buff), 0, "size_avail"); - - subtest_no++; - int ret = rbuf_bump_head(buff, 5); - CHECK_EQ(ret, 1, "ret"); - CHECK_EQ(rbuf_bytes_free(buff), 5, "size_free"); - CHECK_EQ(rbuf_bytes_available(buff), 5, "size_avail"); - CHECK_EQ(buff->head, buff->data + 5, "head_ptr"); - CHECK_EQ(buff->tail, buff->data, "tail_ptr"); - - subtest_no++; - ret = rbuf_bump_tail(buff, 2); - CHECK_EQ(ret, 1, "ret"); - CHECK_EQ(rbuf_bytes_available(buff), 3, "size_avail"); - CHECK_EQ(rbuf_bytes_free(buff), 7, "size_free"); - CHECK_EQ(buff->head, buff->data + 5, "head_ptr"); - CHECK_EQ(buff->tail, buff->data + 2, "tail_ptr"); - - subtest_no++; - ret = rbuf_bump_tail(buff, 3); - CHECK_EQ(ret, 1, "ret"); - CHECK_EQ(rbuf_bytes_available(buff), 0, "size_avail"); - CHECK_EQ(rbuf_bytes_free(buff), 10, "size_free"); - CHECK_EQ(buff->head, buff->data, "head_ptr"); - CHECK_EQ(buff->tail, buff->data, "tail_ptr"); - - subtest_no++; - ret = rbuf_bump_tail_noopt(buff, 1); - CHECK_EQ(ret, 0, "ret"); - CHECK_EQ(rbuf_bytes_available(buff), 0, "size_avail"); - CHECK_EQ(rbuf_bytes_free(buff), 10, "size_free"); - CHECK_EQ(buff->head, buff->data, "head_ptr"); - CHECK_EQ(buff->tail, buff->data, "tail_ptr"); - - subtest_no++; - ret = rbuf_bump_head(buff, 6); - ret = rbuf_bump_tail(buff, 5); - ret = rbuf_bump_head(buff, 6); - CHECK_EQ(ret, 1, "ret"); - CHECK_EQ(rbuf_bytes_available(buff), 7, "size_avail"); - CHECK_EQ(rbuf_bytes_free(buff), 3, "size_free"); - CHECK_EQ(buff->head, buff->data + 2, "head_ptr"); - CHECK_EQ(buff->tail, buff->data + 5, "tail_ptr"); - - subtest_no++; - ret = rbuf_bump_tail(buff, 5); - CHECK_EQ(ret, 1, "ret"); - CHECK_EQ(rbuf_bytes_available(buff), 2, "size_avail"); - CHECK_EQ(rbuf_bytes_free(buff), 8, "size_free"); - CHECK_EQ(buff->head, buff->data + 2, "head_ptr"); - CHECK_EQ(buff->tail, buff->data, "tail_ptr"); - - // check tail can't overrun head - subtest_no++; - ret = rbuf_bump_tail(buff, 3); - CHECK_EQ(ret, 0, "ret"); - CHECK_EQ(rbuf_bytes_available(buff), 2, "size_avail"); - CHECK_EQ(rbuf_bytes_free(buff), 8, "size_free"); - CHECK_EQ(buff->head, buff->data + 2, "head_ptr"); - CHECK_EQ(buff->tail, buff->data, "tail_ptr"); - - // check head can't overrun tail - subtest_no++; - ret = rbuf_bump_head(buff, 9); - CHECK_EQ(ret, 0, "ret"); - CHECK_EQ(rbuf_bytes_available(buff), 2, "size_avail"); - CHECK_EQ(rbuf_bytes_free(buff), 8, "size_free"); - CHECK_EQ(buff->head, buff->data + 2, "head_ptr"); - CHECK_EQ(buff->tail, buff->data, "tail_ptr"); - - // check head can fill the buffer - subtest_no++; - ret = rbuf_bump_head(buff, 8); - CHECK_EQ(ret, 1, "ret"); - CHECK_EQ(rbuf_bytes_available(buff), 10, "size_avail"); - CHECK_EQ(rbuf_bytes_free(buff), 0, "size_free"); - CHECK_EQ(buff->head, buff->data, "head_ptr"); - CHECK_EQ(buff->tail, buff->data, "tail_ptr"); - - // check can empty the buffer - subtest_no++; - ret = rbuf_bump_tail(buff, 10); - CHECK_EQ(ret, 1, "ret"); - CHECK_EQ(rbuf_bytes_available(buff), 0, "size_avail"); - CHECK_EQ(rbuf_bytes_free(buff), 10, "size_free"); - CHECK_EQ(buff->head, buff->data, "head_ptr"); - CHECK_EQ(buff->tail, buff->data, "tail_ptr"); -} - -static void test_rbuf_bump_tail() -{ - TEST_DECL(); - test_rbuf_bump_tail_noopt(subtest_no); - test_rbuf_bump_tail_opt(subtest_no); -} - -#define ASCII_A 0x61 -#define ASCII_Z 0x7A -#define TEST_DATA_SIZE ASCII_Z-ASCII_A+1 -static void test_rbuf_push() -{ - TEST_DECL(); - rbuf_t buff = rbuf_create(10); - int i; - char test_data[TEST_DATA_SIZE]; - - for (int i = 0; i <= TEST_DATA_SIZE; i++) - test_data[i] = i + ASCII_A; - - int ret = rbuf_push(buff, test_data, 10); - CHECK_EQ(ret, 10, "written 10 bytes"); - CHECK_EQ(rbuf_bytes_free(buff), 0, "empty size == 0"); - for (i = 0; i < 10; i++) - CHECK_EQ(buff->data[i], i + ASCII_A, "Check data"); - - subtest_no++; - rbuf_flush(buff); - rbuf_bump_head(buff, 5); - rbuf_bump_tail_noopt(buff, 5); //to not reset both pointers to beginning - ret = rbuf_push(buff, test_data, 10); - CHECK_EQ(ret, 10, "written 10 bytes"); - for (i = 0; i < 10; i++) - CHECK_EQ(buff->data[i], ((i+5)%10) + ASCII_A, "Check Data"); - - subtest_no++; - rbuf_flush(buff); - rbuf_bump_head(buff, 9); - rbuf_bump_tail_noopt(buff, 9); - ret = rbuf_push(buff, test_data, 10); - CHECK_EQ(ret, 10, "written 10 bytes"); - for (i = 0; i < 10; i++) - CHECK_EQ(buff->data[i], ((i + 1) % 10) + ASCII_A, "Check data"); - - // let tail > head - subtest_no++; - rbuf_flush(buff); - rbuf_bump_head(buff, 9); - rbuf_bump_tail_noopt(buff, 9); - rbuf_bump_head(buff, 1); - ret = rbuf_push(buff, test_data, 9); - CHECK_EQ(ret, 9, "written 9 bytes"); - CHECK_EQ(buff->head, buff->end - 1, "head_ptr"); - CHECK_EQ(buff->tail, buff->head, "tail_ptr"); - rbuf_bump_tail(buff, 1); - //TODO push byte can be usefull optimisation - ret = rbuf_push(buff, &test_data[9], 1); - CHECK_EQ(ret, 1, "written 1 byte"); - CHECK_EQ(rbuf_bytes_free(buff), 0, "empty size == 0"); - for (i = 0; i < 10; i++) - CHECK_EQ(buff->data[i], i + ASCII_A, "Check data"); - - subtest_no++; - rbuf_flush(buff); - rbuf_bump_head(buff, 9); - rbuf_bump_tail_noopt(buff, 7); - rbuf_bump_head(buff, 1); - ret = rbuf_push(buff, test_data, 7); - CHECK_EQ(ret, 7, "written 7 bytes"); - CHECK_EQ(buff->head, buff->data + 7, "head_ptr"); - CHECK_EQ(buff->tail, buff->head, "tail_ptr"); - rbuf_bump_tail(buff, 3); - CHECK_EQ(buff->tail, buff->data, "tail_ptr"); - //TODO push byte can be usefull optimisation - ret = rbuf_push(buff, &test_data[7], 3); - CHECK_EQ(ret, 3, "written 3 bytes"); - CHECK_EQ(rbuf_bytes_free(buff), 0, "empty size == 0"); - for (i = 0; i < 10; i++) - CHECK_EQ(buff->data[i], i + ASCII_A, "Check data"); - - // test can't overfill the buffer - subtest_no++; - rbuf_flush(buff); - rbuf_push(buff, test_data, TEST_DATA_SIZE); - CHECK_EQ(ret, 3, "written 10 bytes"); - for (i = 0; i < 10; i++) - CHECK_EQ(buff->data[i], i + ASCII_A, "Check data"); -} - -#define TEST_RBUF_FIND_BYTES_SIZE 10 -void test_rbuf_find_bytes() -{ - TEST_DECL(); - rbuf_t buff = rbuf_create(TEST_RBUF_FIND_BYTES_SIZE); - char *filler_3 = " "; - char *needle = "needle"; - int idx; - char *ptr; - - // make sure needle is wrapped aroung in the buffer - // to test we still can find it - // target "edle ne" - rbuf_bump_head(buff, TEST_RBUF_FIND_BYTES_SIZE / 2); - rbuf_push(buff, filler_3, strlen(filler_3)); - rbuf_bump_tail(buff, TEST_RBUF_FIND_BYTES_SIZE / 2); - rbuf_push(buff, needle, strlen(needle)); - ptr = rbuf_find_bytes(buff, needle, strlen(needle), &idx); - CHECK_EQ(ptr, buff->data + (TEST_RBUF_FIND_BYTES_SIZE / 2) + strlen(filler_3), "Pointer to needle correct"); - CHECK_EQ(idx, ptr - buff->tail, "Check needle index"); -} - -int main() -{ - test_rbuf_bump_head(); - test_rbuf_bump_tail(); - test_rbuf_get_linear_insert_range(); - test_rbuf_push(); - test_rbuf_find_bytes(); - - printf( - KNRM "Total Tests %d, Total Checks %d, Successful Checks %d, Failed Checks %d\n", - total_tests, total_checks, total_checks - total_fails, total_fails); - if (total_fails) - printf(KRED "!!!Some test(s) Failed!!!\n"); - else - printf(KGRN "ALL TESTS PASSED\n"); - - return total_fails; -} diff --git a/src/aclk/mqtt_websockets/c_rhash/c_rhash.c b/src/aclk/mqtt_websockets/c_rhash/c_rhash.c deleted file mode 100644 index a71b500e2..000000000 --- a/src/aclk/mqtt_websockets/c_rhash/c_rhash.c +++ /dev/null @@ -1,264 +0,0 @@ -// Copyright: SPDX-License-Identifier: GPL-3.0-only - -#include "c_rhash_internal.h" - -#include <stdlib.h> -#include <string.h> - -#ifdef DEBUG_VERBOSE -#include <stdio.h> -#endif - -#define c_rmalloc(...) malloc(__VA_ARGS__) -#define c_rcalloc(...) calloc(__VA_ARGS__) -#define c_rfree(...) free(__VA_ARGS__) - -static inline uint32_t simple_hash(const char *name) { - unsigned char *s = (unsigned char *) name; - uint32_t hval = 0x811c9dc5; - while (*s) { - hval *= 16777619; - hval ^= (uint32_t) *s++; - } - return hval; -} - -c_rhash c_rhash_new(size_t bin_count) { - if (!bin_count) - bin_count = 1000; - - c_rhash hash = c_rcalloc(1, sizeof(struct c_rhash_s) + (bin_count * sizeof(struct bin_ll*)) ); - if (hash == NULL) - return NULL; - - hash->bin_count = bin_count; - hash->bins = (c_rhash_bin *)((char*)hash + sizeof(struct c_rhash_s)); - - return hash; -} - -static size_t get_itemtype_len(uint8_t item_type, const void* item_data) { - switch (item_type) { - case ITEMTYPE_STRING: - return strlen(item_data) + 1; - case ITEMTYPE_UINT64: - return sizeof(uint64_t); - case ITEMTYPE_UINT8: - return 1; - case ITEMTYPE_OPAQUE_PTR: - return sizeof(void*); - default: - return 0; - } -} - -static int compare_bin_item(struct bin_item *item, uint8_t key_type, const void *key) { - if (item->key_type != key_type) - return 1; - - size_t key_value_len = get_itemtype_len(key_type, key); - - if(key_type == ITEMTYPE_STRING) { - size_t new_key_value_len = get_itemtype_len(item->key_type, item->key); - if (new_key_value_len != key_value_len) - return 1; - } - - if(memcmp(item->key, key, key_value_len) == 0) { - return 0; - } - - return 1; -} - -static int insert_into_bin(c_rhash_bin *bin, uint8_t key_type, const void *key, uint8_t value_type, const void *value) { - struct bin_item *prev = NULL; - while (*bin != NULL) { - if (!compare_bin_item(*bin, key_type, key)) { -#ifdef DEBUG_VERBOSE - printf("Key already present! Updating value!\n"); -#endif -// TODO: optimize here if the new value is of different kind compared to the old one -// in case it is not crazily bigger we can reuse the memory and avoid malloc and free - c_rfree((*bin)->value); - (*bin)->value_type = value_type; - (*bin)->value = c_rmalloc(get_itemtype_len(value_type, value)); - if ((*bin)->value == NULL) - return 1; - memcpy((*bin)->value, value, get_itemtype_len(value_type, value)); - return 0; - } - prev = *bin; - bin = &(*bin)->next; - } - - if (*bin == NULL) - *bin = c_rcalloc(1, sizeof(struct bin_item)); - if (prev != NULL) - prev->next = *bin; - - (*bin)->key_type = key_type; - size_t len = get_itemtype_len(key_type, key); - (*bin)->key = c_rmalloc(len); - memcpy((*bin)->key, key, len); - - (*bin)->value_type = value_type; - len = get_itemtype_len(value_type, value); - (*bin)->value = c_rmalloc(len); - memcpy((*bin)->value, value, len); - return 0; -} - -static inline uint32_t get_bin_idx_str(c_rhash hash, const char *key) { - uint32_t nhash = simple_hash(key); - return nhash % hash->bin_count; -} - -static inline c_rhash_bin *get_binptr_by_str(c_rhash hash, const char *key) { - return &hash->bins[get_bin_idx_str(hash, key)]; -} - -int c_rhash_insert_str_ptr(c_rhash hash, const char *key, void *value) { - c_rhash_bin *bin = get_binptr_by_str(hash, key); - -#ifdef DEBUG_VERBOSE - if (bin != NULL) - printf("COLLISION. There will be more than one item in bin idx=%d\n", nhash); -#endif - - return insert_into_bin(bin, ITEMTYPE_STRING, key, ITEMTYPE_OPAQUE_PTR, &value); -} - -int c_rhash_insert_str_uint8(c_rhash hash, const char *key, uint8_t value) { - c_rhash_bin *bin = get_binptr_by_str(hash, key); - -#ifdef DEBUG_VERBOSE - if (bin != NULL) - printf("COLLISION. There will be more than one item in bin idx=%d\n", nhash); -#endif - - return insert_into_bin(bin, ITEMTYPE_STRING, key, ITEMTYPE_UINT8, &value); -} - -int c_rhash_insert_uint64_ptr(c_rhash hash, uint64_t key, void *value) { - c_rhash_bin *bin = &hash->bins[key % hash->bin_count]; - -#ifdef DEBUG_VERBOSE - if (bin != NULL) - printf("COLLISION. There will be more than one item in bin idx=%d\n", nhash); -#endif - - return insert_into_bin(bin, ITEMTYPE_UINT64, &key, ITEMTYPE_OPAQUE_PTR, &value); -} - -int c_rhash_get_uint8_by_str(c_rhash hash, const char *key, uint8_t *ret_val) { - uint32_t nhash = get_bin_idx_str(hash, key); - - struct bin_item *bin = hash->bins[nhash]; - - while (bin) { - if (bin->key_type == ITEMTYPE_STRING) { - if (!strcmp(bin->key, key)) { - *ret_val = *(uint8_t*)bin->value; - return 0; - } - } - bin = bin->next; - } - return 1; -} - -int c_rhash_get_ptr_by_str(c_rhash hash, const char *key, void **ret_val) { - uint32_t nhash = get_bin_idx_str(hash, key); - - struct bin_item *bin = hash->bins[nhash]; - - while (bin) { - if (bin->key_type == ITEMTYPE_STRING) { - if (!strcmp(bin->key, key)) { - *ret_val = *((void**)bin->value); - return 0; - } - } - bin = bin->next; - } - *ret_val = NULL; - return 1; -} - -int c_rhash_get_ptr_by_uint64(c_rhash hash, uint64_t key, void **ret_val) { - uint32_t nhash = key % hash->bin_count; - - struct bin_item *bin = hash->bins[nhash]; - - while (bin) { - if (bin->key_type == ITEMTYPE_UINT64) { - if (*((uint64_t *)bin->key) == key) { - *ret_val = *((void**)bin->value); - return 0; - } - } - bin = bin->next; - } - *ret_val = NULL; - return 1; -} - -static void c_rhash_destroy_bin(c_rhash_bin bin) { - struct bin_item *next; - do { - next = bin->next; - c_rfree(bin->key); - c_rfree(bin->value); - c_rfree(bin); - bin = next; - } while (bin != NULL); -} - -int c_rhash_iter_uint64_keys(c_rhash hash, c_rhash_iter_t *iter, uint64_t *key) { - while (iter->bin < hash->bin_count) { - if (iter->item != NULL) - iter->item = iter->item->next; - if (iter->item == NULL) { - if (iter->initialized) - iter->bin++; - else - iter->initialized = 1; - if (iter->bin < hash->bin_count) - iter->item = hash->bins[iter->bin]; - } - if (iter->item != NULL && iter->item->key_type == ITEMTYPE_UINT64) { - *key = *(uint64_t*)iter->item->key; - return 0; - } - } - return 1; -} - -int c_rhash_iter_str_keys(c_rhash hash, c_rhash_iter_t *iter, const char **key) { - while (iter->bin < hash->bin_count) { - if (iter->item != NULL) - iter->item = iter->item->next; - if (iter->item == NULL) { - if (iter->initialized) - iter->bin++; - else - iter->initialized = 1; - if (iter->bin < hash->bin_count) - iter->item = hash->bins[iter->bin]; - } - if (iter->item != NULL && iter->item->key_type == ITEMTYPE_STRING) { - *key = (const char*)iter->item->key; - return 0; - } - } - return 1; -} - -void c_rhash_destroy(c_rhash hash) { - for (size_t i = 0; i < hash->bin_count; i++) { - if (hash->bins[i] != NULL) - c_rhash_destroy_bin(hash->bins[i]); - } - c_rfree(hash); -} diff --git a/src/aclk/mqtt_websockets/c_rhash/c_rhash.h b/src/aclk/mqtt_websockets/c_rhash/c_rhash.h deleted file mode 100644 index 37addd161..000000000 --- a/src/aclk/mqtt_websockets/c_rhash/c_rhash.h +++ /dev/null @@ -1,61 +0,0 @@ -// Copyright: SPDX-License-Identifier: GPL-3.0-only - -#include <sys/types.h> -#include <stdint.h> -#include <stddef.h> - -#ifndef DEFAULT_BIN_COUNT - #define DEFAULT_BIN_COUNT 1000 -#endif - -#define ITEMTYPE_UNSET (0x0) -#define ITEMTYPE_STRING (0x1) -#define ITEMTYPE_UINT8 (0x2) -#define ITEMTYPE_UINT64 (0x3) -#define ITEMTYPE_OPAQUE_PTR (0x4) - -typedef struct c_rhash_s *c_rhash; - -c_rhash c_rhash_new(size_t bin_count); - -void c_rhash_destroy(c_rhash hash); - -// # Insert -// ## Insert where key is string -int c_rhash_insert_str_ptr(c_rhash hash, const char *key, void *value); -int c_rhash_insert_str_uint8(c_rhash hash, const char *key, uint8_t value); -// ## Insert where key is uint64 -int c_rhash_insert_uint64_ptr(c_rhash hash, uint64_t key, void *value); - -// # Get -// ## Get where key is string -int c_rhash_get_ptr_by_str(c_rhash hash, const char *key, void **ret_val); -int c_rhash_get_uint8_by_str(c_rhash hash, const char *key, uint8_t *ret_val); -// ## Get where key is uint64 -int c_rhash_get_ptr_by_uint64(c_rhash hash, uint64_t key, void **ret_val); - -typedef struct { - size_t bin; - struct bin_item *item; - int initialized; -} c_rhash_iter_t; - -#define C_RHASH_ITER_T_INITIALIZER { .bin = 0, .item = NULL, .initialized = 0 } - -#define c_rhash_iter_t_initialize(p_iter) memset(p_iter, 0, sizeof(c_rhash_iter_t)) - -/* - * goes trough whole hash map and returns every - * type uint64 key present/stored - * - * it is not necessary to finish iterating and iterator can be reinitialized - * there are no guarantees on the order in which the keys will come - * behavior here is implementation dependent and can change any time - * - * returns: - * 0 for every key and stores the key in *key - * 1 on error or when all keys of this type has been already iterated over - */ -int c_rhash_iter_uint64_keys(c_rhash hash, c_rhash_iter_t *iter, uint64_t *key); - -int c_rhash_iter_str_keys(c_rhash hash, c_rhash_iter_t *iter, const char **key); diff --git a/src/aclk/mqtt_websockets/c_rhash/c_rhash_internal.h b/src/aclk/mqtt_websockets/c_rhash/c_rhash_internal.h deleted file mode 100644 index 20f741076..000000000 --- a/src/aclk/mqtt_websockets/c_rhash/c_rhash_internal.h +++ /dev/null @@ -1,19 +0,0 @@ -// Copyright: SPDX-License-Identifier: GPL-3.0-only - -#include "c_rhash.h" - -struct bin_item { - uint8_t key_type:4; - void *key; - uint8_t value_type:4; - void *value; - - struct bin_item *next; -}; - -typedef struct bin_item *c_rhash_bin; - -struct c_rhash_s { - size_t bin_count; - c_rhash_bin *bins; -}; diff --git a/src/aclk/mqtt_websockets/c_rhash/tests.c b/src/aclk/mqtt_websockets/c_rhash/tests.c deleted file mode 100644 index 909c5562d..000000000 --- a/src/aclk/mqtt_websockets/c_rhash/tests.c +++ /dev/null @@ -1,273 +0,0 @@ -// Copyright: SPDX-License-Identifier: GPL-3.0-only - -#include <stdio.h> -#include <string.h> - -#include "c_rhash.h" - -// terminal color codes -#define KNRM "\x1B[0m" -#define KRED "\x1B[31m" -#define KGRN "\x1B[32m" -#define KYEL "\x1B[33m" -#define KBLU "\x1B[34m" -#define KMAG "\x1B[35m" -#define KCYN "\x1B[36m" -#define KWHT "\x1B[37m" - -#define KEY_1 "key1" -#define KEY_2 "keya" - -#define PRINT_ERR(str, ...) fprintf(stderr, "└─╼ ❌ " KRED str KNRM "\n" __VA_OPT__(,) __VA_ARGS__) - -#define ASSERT_RETVAL(fnc, comparator, expected_retval, ...) \ -{ int rval; \ -if(!((rval = fnc(__VA_ARGS__)) comparator expected_retval)) { \ - PRINT_ERR("Failed test. Value returned by \"%s\" in fnc:\"%s\",line:%d is not equal to expected value. Expected:%d, Got:%d", #fnc, __FUNCTION__, __LINE__, expected_retval, rval); \ - rc = 1; \ - goto test_cleanup; \ -} passed_subtest_count++;}; - -#define ASSERT_VAL_UINT8(returned, expected) \ -if(returned != expected) { \ - PRINT_ERR("Failed test. Value returned (%d) doesn't match expected (%d)! fnc:\"%s\",line:%d", returned, expected, __FUNCTION__, __LINE__); \ - rc = 1; \ - goto test_cleanup; \ -} passed_subtest_count++; - -#define ASSERT_VAL_PTR(returned, expected) \ -if((void*)returned != (void*)expected) { \ - PRINT_ERR("Failed test. Value returned(%p) doesn't match expected(%p)! fnc:\"%s\",line:%d", (void*)returned, (void*)expected, __FUNCTION__, __LINE__); \ - rc = 1; \ - goto test_cleanup; \ -} passed_subtest_count++; - -#define ALL_SUBTESTS_PASS() printf("└─╼ ✅" KGRN " Test \"%s\" DONE. All of %zu subtests PASS. (line:%d)\n" KNRM, __FUNCTION__, passed_subtest_count, __LINE__); - -#define TEST_START() size_t passed_subtest_count = 0; int rc = 0; printf("╒═ Starting test \"%s\"\n", __FUNCTION__); - -int test_str_uint8() { - c_rhash hash = c_rhash_new(100); - uint8_t val; - - TEST_START(); - // function should fail on empty hash - ASSERT_RETVAL(c_rhash_get_uint8_by_str, !=, 0, hash, KEY_1, &val); - - ASSERT_RETVAL(c_rhash_insert_str_uint8, ==, 0, hash, KEY_1, 5); - ASSERT_RETVAL(c_rhash_get_uint8_by_str, ==, 0, hash, KEY_1, &val); - ASSERT_VAL_UINT8(5, val); - - ASSERT_RETVAL(c_rhash_insert_str_uint8, ==, 0, hash, KEY_2, 8); - ASSERT_RETVAL(c_rhash_get_uint8_by_str, ==, 0, hash, KEY_1, &val); - ASSERT_VAL_UINT8(5, val); - ASSERT_RETVAL(c_rhash_get_uint8_by_str, ==, 0, hash, KEY_2, &val); - ASSERT_VAL_UINT8(8, val); - ASSERT_RETVAL(c_rhash_get_uint8_by_str, !=, 0, hash, "sndnskjdf", &val); - - // test update of key - ASSERT_RETVAL(c_rhash_insert_str_uint8, ==, 0, hash, KEY_1, 100); - ASSERT_RETVAL(c_rhash_get_uint8_by_str, ==, 0, hash, KEY_1, &val); - ASSERT_VAL_UINT8(100, val); - - ALL_SUBTESTS_PASS(); -test_cleanup: - c_rhash_destroy(hash); - return rc; -} - -int test_uint64_ptr() { - c_rhash hash = c_rhash_new(100); - void *val; - - TEST_START(); - - // function should fail on empty hash - ASSERT_RETVAL(c_rhash_get_ptr_by_uint64, !=, 0, hash, 0, &val); - - ASSERT_RETVAL(c_rhash_insert_uint64_ptr, ==, 0, hash, 0, &hash); - ASSERT_RETVAL(c_rhash_get_ptr_by_uint64, ==, 0, hash, 0, &val); - ASSERT_VAL_PTR(&hash, val); - - ASSERT_RETVAL(c_rhash_insert_uint64_ptr, ==, 0, hash, 1, &val); - ASSERT_RETVAL(c_rhash_get_ptr_by_uint64, ==, 0, hash, 0, &val); - ASSERT_VAL_PTR(&hash, val); - ASSERT_RETVAL(c_rhash_get_ptr_by_uint64, ==, 0, hash, 1, &val); - ASSERT_VAL_PTR(&val, val); - ASSERT_RETVAL(c_rhash_get_ptr_by_uint64, !=, 0, hash, 2, &val); - - ALL_SUBTESTS_PASS(); -test_cleanup: - c_rhash_destroy(hash); - return rc; -} - -#define UINT64_PTR_INC_ITERATION_COUNT 5000 -int test_uint64_ptr_incremental() { - c_rhash hash = c_rhash_new(100); - void *val; - - TEST_START(); - - char a = 0x20; - char *ptr = &a; - while(ptr < &a + UINT64_PTR_INC_ITERATION_COUNT) { - ASSERT_RETVAL(c_rhash_insert_uint64_ptr, ==, 0, hash, (ptr-&a), ptr); - ptr++; - } - - ptr = &a; - char *retptr; - for(int i = 0; i < UINT64_PTR_INC_ITERATION_COUNT; i++) { - ASSERT_RETVAL(c_rhash_get_ptr_by_uint64, ==, 0, hash, i, (void**)&retptr); - ASSERT_VAL_PTR(retptr, (&a+i)); - } - - ALL_SUBTESTS_PASS(); -test_cleanup: - c_rhash_destroy(hash); - return rc; -} - -struct test_string { - const char *str; - int counter; -}; - -struct test_string test_strings[] = { - { .str = "Cillum reprehenderit eiusmod elit nisi aliquip esse exercitation commodo Lorem voluptate esse.", .counter = 0 }, - { .str = "Ullamco eiusmod tempor occaecat ad.", .counter = 0 }, - { .str = "Esse aliquip tempor sint tempor ullamco duis aute incididunt ad.", .counter = 0 }, - { .str = "Cillum Lorem labore cupidatat commodo proident adipisicing.", .counter = 0 }, - { .str = "Quis ad cillum officia exercitation.", .counter = 0 }, - { .str = "Ipsum enim dolor ullamco amet sint nisi ut occaecat sint non.", .counter = 0 }, - { .str = "Id duis officia ipsum cupidatat velit fugiat.", .counter = 0 }, - { .str = "Aliqua non occaecat voluptate reprehenderit reprehenderit veniam minim exercitation ea aliquip enim aliqua deserunt qui.", .counter = 0 }, - { .str = "Ullamco elit tempor laboris reprehenderit quis deserunt duis quis tempor reprehenderit magna dolore reprehenderit exercitation.", .counter = 0 }, - { .str = "Culpa do dolor quis incididunt et labore in ex.", .counter = 0 }, - { .str = "Aliquip velit cupidatat qui incididunt ipsum nostrud eiusmod ut proident nisi magna fugiat excepteur.", .counter = 0 }, - { .str = "Aliqua qui dolore tempor id proident ullamco sunt magna.", .counter = 0 }, - { .str = "Labore eiusmod ut fugiat dolore reprehenderit mollit magna.", .counter = 0 }, - { .str = "Veniam aliquip dolor excepteur minim nulla esse cupidatat esse.", .counter = 0 }, - { .str = "Do quis dolor irure nostrud occaecat aute proident anim.", .counter = 0 }, - { .str = "Enim veniam non nulla ad quis sit amet.", .counter = 0 }, - { .str = "Cillum reprehenderit do enim esse do ullamco consectetur ea.", .counter = 0 }, - { .str = "Sit et duis sint anim qui ad anim labore exercitation sunt cupidatat.", .counter = 0 }, - { .str = "Dolor officia adipisicing sint pariatur in dolor occaecat officia reprehenderit magna.", .counter = 0 }, - { .str = "Aliquip dolore qui occaecat eiusmod sunt incididunt reprehenderit minim et.", .counter = 0 }, - { .str = "Aute fugiat laboris cillum tempor consequat tempor do non laboris culpa officia nisi.", .counter = 0 }, - { .str = "Et excepteur do aliquip fugiat nisi velit tempor officia enim quis elit incididunt.", .counter = 0 }, - { .str = "Eu officia adipisicing incididunt occaecat officia cupidatat enim sit sit officia.", .counter = 0 }, - { .str = "Do amet cillum duis pariatur commodo nulla cillum magna nulla Lorem veniam cupidatat.", .counter = 0 }, - { .str = "Dolor adipisicing voluptate laboris occaecat culpa aliquip ipsum ut consequat aliqua aliquip commodo sunt velit.", .counter = 0 }, - { .str = "Nulla proident ipsum quis nulla.", .counter = 0 }, - { .str = "Laborum adipisicing nulla do aute aliqua est quis sint culpa pariatur laborum voluptate qui.", .counter = 0 }, - { .str = "Proident eiusmod sunt et nulla elit pariatur dolore irure ex voluptate excepteur adipisicing consectetur.", .counter = 0 }, - { .str = "Consequat ex voluptate officia excepteur aute deserunt proident commodo et.", .counter = 0 }, - { .str = "Velit sit cupidatat dolor dolore.", .counter = 0 }, - { .str = "Sunt enim do non anim nostrud exercitation ullamco ex proident commodo.", .counter = 0 }, - { .str = "Id ex officia cillum ad.", .counter = 0 }, - { .str = "Laboris in sunt eiusmod veniam laboris nostrud.", .counter = 0 }, - { .str = "Ex magna occaecat ea ea incididunt aliquip.", .counter = 0 }, - { .str = "Sunt eiusmod ex nostrud eu pariatur sit cupidatat ea adipisicing cillum culpa esse consequat aliquip.", .counter = 0 }, - { .str = "Excepteur commodo qui incididunt enim culpa sunt non excepteur Lorem adipisicing.", .counter = 0 }, - { .str = "Quis officia est ullamco reprehenderit incididunt occaecat pariatur ex reprehenderit nisi.", .counter = 0 }, - { .str = "Culpa irure proident proident et eiusmod irure aliqua ipsum cupidatat minim sit.", .counter = 0 }, - { .str = "Qui cupidatat aliquip est velit magna veniam.", .counter = 0 }, - { .str = "Pariatur ad ad mollit nostrud non irure minim veniam anim aliquip quis eu.", .counter = 0 }, - { .str = "Nisi ex minim eu adipisicing tempor Lorem nisi do ad exercitation est non eu.", .counter = 0 }, - { .str = "Cupidatat do mollit ad commodo cupidatat ut.", .counter = 0 }, - { .str = "Est non excepteur eiusmod nostrud et eu.", .counter = 0 }, - { .str = "Cupidatat mollit nisi magna officia ut elit eiusmod.", .counter = 0 }, - { .str = "Est aliqua consectetur laboris ex consequat est ut dolor.", .counter = 0 }, - { .str = "Duis eu laboris laborum ut id Lorem nostrud qui ad velit proident fugiat minim ullamco.", .counter = 0 }, - { .str = "Pariatur esse excepteur anim amet excepteur irure sint quis esse ex cupidatat ut.", .counter = 0 }, - { .str = "Esse reprehenderit amet qui excepteur aliquip amet.", .counter = 0 }, - { .str = "Ullamco laboris elit labore adipisicing aute nulla qui laborum tempor officia ut dolor aute.", .counter = 0 }, - { .str = "Commodo sunt cillum velit minim laborum Lorem aliqua tempor ad id eu.", .counter = 0 }, - { .str = NULL, .counter = 0 } -}; - -uint32_t test_strings_contain_element(const char *str) { - struct test_string *str_desc = test_strings; - while(str_desc->str) { - if (!strcmp(str, str_desc->str)) - return str_desc - test_strings; - str_desc++; - } - return -1; -} - -#define TEST_INCREMENT_STR_KEYS_HASH_SIZE 20 -int test_increment_str_keys() { - c_rhash hash; - const char *key; - - TEST_START(); - - hash = c_rhash_new(TEST_INCREMENT_STR_KEYS_HASH_SIZE); // less than element count of test_strings - - c_rhash_iter_t iter = C_RHASH_ITER_T_INITIALIZER; - - // check iter on empty hash - ASSERT_RETVAL(c_rhash_iter_str_keys, !=, 0, hash, &iter, &key); - - int32_t element_count = 0; - while (test_strings[element_count].str) { - ASSERT_RETVAL(c_rhash_insert_str_ptr, ==, 0, hash, test_strings[element_count].str, NULL); - test_strings[element_count].counter++; // we want to test we got each key exactly once - element_count++; - } - - if (element_count <= TEST_INCREMENT_STR_KEYS_HASH_SIZE * 2) { - // verify we are actually test also iteration trough single bin (when 2 keys have same hash pointing them to same bin) - PRINT_ERR("For this test to properly test all the hash size needs to be much smaller than all test key count."); - rc = 1; - goto test_cleanup; - } - - // we insert another type of key as iterator should skip it - // in case is another type - ASSERT_RETVAL(c_rhash_insert_uint64_ptr, ==, 0, hash, 5, NULL); - - c_rhash_iter_t_initialize(&iter); - while(!c_rhash_iter_str_keys(hash, &iter, &key)) { - element_count--; - int i; - if ( (i = test_strings_contain_element(key)) < 0) { - PRINT_ERR("Key \"%s\" is not present in test_strings array! (Fnc: %s, Line: %d)", key, __FUNCTION__, __LINE__); - rc = 1; - goto test_cleanup; - } - passed_subtest_count++; - - test_strings[i].counter--; - } - ASSERT_VAL_UINT8(element_count, 0); // we added also same non string keys - - // check each key was present exactly once - struct test_string *str_desc = test_strings; - while (str_desc->str) { - ASSERT_VAL_UINT8(str_desc->counter, 0); - str_desc++; - } - - ALL_SUBTESTS_PASS(); -test_cleanup: - c_rhash_destroy(hash); - return rc; -} - -#define RUN_TEST(fnc) \ -if(fnc()) \ - return 1; - -int main(int argc, char *argv[]) { - RUN_TEST(test_str_uint8); - RUN_TEST(test_uint64_ptr); - RUN_TEST(test_uint64_ptr_incremental); - RUN_TEST(test_increment_str_keys); - // TODO hash with mixed key tests - // TODO iterator test - return 0; -} diff --git a/src/aclk/mqtt_websockets/common_internal.h b/src/aclk/mqtt_websockets/common_internal.h index 2be1c45b8..d79dbb3f3 100644 --- a/src/aclk/mqtt_websockets/common_internal.h +++ b/src/aclk/mqtt_websockets/common_internal.h @@ -1,27 +1,12 @@ -// SPDX-License-Identifier: GPL-3.0-only +// SPDX-License-Identifier: GPL-3.0-or-later #ifndef COMMON_INTERNAL_H #define COMMON_INTERNAL_H #include "endian_compat.h" -#ifdef MQTT_WSS_CUSTOM_ALLOC -#include "../helpers/mqtt_wss_pal.h" -#else -#define mw_malloc(...) malloc(__VA_ARGS__) -#define mw_calloc(...) calloc(__VA_ARGS__) -#define mw_free(...) free(__VA_ARGS__) -#define mw_strdup(...) strdup(__VA_ARGS__) -#define mw_realloc(...) realloc(__VA_ARGS__) -#endif - #ifndef MQTT_WSS_FRAG_MEMALIGN #define MQTT_WSS_FRAG_MEMALIGN (8) #endif -#define OPENSSL_VERSION_095 0x00905100L -#define OPENSSL_VERSION_097 0x00907000L -#define OPENSSL_VERSION_110 0x10100000L -#define OPENSSL_VERSION_111 0x10101000L - #endif /* COMMON_INTERNAL_H */ diff --git a/src/aclk/mqtt_websockets/common_public.h b/src/aclk/mqtt_websockets/common_public.h index a855737f9..8f3b4f7d1 100644 --- a/src/aclk/mqtt_websockets/common_public.h +++ b/src/aclk/mqtt_websockets/common_public.h @@ -1,3 +1,5 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + #ifndef MQTT_WEBSOCKETS_COMMON_PUBLIC_H #define MQTT_WEBSOCKETS_COMMON_PUBLIC_H diff --git a/src/aclk/mqtt_websockets/mqtt_ng.c b/src/aclk/mqtt_websockets/mqtt_ng.c index 8ad6bd5c9..9abe77b5f 100644 --- a/src/aclk/mqtt_websockets/mqtt_ng.c +++ b/src/aclk/mqtt_websockets/mqtt_ng.c @@ -1,35 +1,19 @@ -// Copyright: SPDX-License-Identifier: GPL-3.0-only +// SPDX-License-Identifier: GPL-3.0-or-later #ifndef _GNU_SOURCE #define _GNU_SOURCE #endif -#include <stdint.h> -#include <stdlib.h> -#include <string.h> -#include <pthread.h> -#include <inttypes.h> - -#include "c_rhash/c_rhash.h" +#include "libnetdata/libnetdata.h" #include "common_internal.h" #include "mqtt_constants.h" -#include "mqtt_wss_log.h" #include "mqtt_ng.h" -#define UNIT_LOG_PREFIX "mqtt_client: " -#define FATAL(fmt, ...) mws_fatal(client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__) -#define ERROR(fmt, ...) mws_error(client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__) -#define WARN(fmt, ...) mws_warn (client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__) -#define INFO(fmt, ...) mws_info (client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__) -#define DEBUG(fmt, ...) mws_debug(client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__) - #define SMALL_STRING_DONT_FRAGMENT_LIMIT 128 -#define MIN(a,b) (((a)<(b))?(a):(b)) - -#define LOCK_HDR_BUFFER(buffer) pthread_mutex_lock(&((buffer)->mutex)) -#define UNLOCK_HDR_BUFFER(buffer) pthread_mutex_unlock(&((buffer)->mutex)) +#define LOCK_HDR_BUFFER(buffer) spinlock_lock(&((buffer)->spinlock)) +#define UNLOCK_HDR_BUFFER(buffer) spinlock_unlock(&((buffer)->spinlock)) #define BUFFER_FRAG_GARBAGE_COLLECT 0x01 // some packets can be marked for garbage collection @@ -75,17 +59,17 @@ struct transaction_buffer { // to be able to revert state easily // in case of error mid processing struct header_buffer state_backup; - pthread_mutex_t mutex; + SPINLOCK spinlock; struct buffer_fragment *sending_frag; }; enum mqtt_client_state { - RAW = 0, - CONNECT_PENDING, - CONNECTING, - CONNECTED, - ERROR, - DISCONNECTED + MQTT_STATE_RAW = 0, + MQTT_STATE_CONNECT_PENDING, + MQTT_STATE_CONNECTING, + MQTT_STATE_CONNECTED, + MQTT_STATE_ERROR, + MQTT_STATE_DISCONNECTED }; enum parser_state { @@ -224,7 +208,7 @@ struct topic_aliases_data { c_rhash stoi_dict; uint32_t idx_max; uint32_t idx_assigned; - pthread_rwlock_t rwlock; + SPINLOCK spinlock; }; struct mqtt_ng_client { @@ -234,8 +218,6 @@ struct mqtt_ng_client { mqtt_msg_data connect_msg; - mqtt_wss_log_ctx_t log; - mqtt_ng_send_fnc_t send_fnc_ptr; void *user_ctx; @@ -253,7 +235,7 @@ struct mqtt_ng_client { unsigned int ping_pending:1; struct mqtt_ng_stats stats; - pthread_mutex_t stats_mutex; + SPINLOCK stats_spinlock; struct topic_aliases_data tx_topic_aliases; c_rhash rx_aliases; @@ -407,7 +389,7 @@ enum memory_mode { CALLER_RESPONSIBLE }; -static inline enum memory_mode ptr2memory_mode(void * ptr) { +static enum memory_mode ptr2memory_mode(void * ptr) { if (ptr == NULL) return MEMCPY; if (ptr == CALLER_RESPONSIBILITY) @@ -492,15 +474,8 @@ static void buffer_rebuild(struct header_buffer *buf) } while(frag); } -static void buffer_garbage_collect(struct header_buffer *buf, mqtt_wss_log_ctx_t log_ctx) +static void buffer_garbage_collect(struct header_buffer *buf) { -#if !defined(MQTT_DEBUG_VERBOSE) && !defined(ADDITIONAL_CHECKS) - (void) log_ctx; -#endif -#ifdef MQTT_DEBUG_VERBOSE - mws_debug(log_ctx, "Buffer Garbage Collection!"); -#endif - struct buffer_fragment *frag = BUFFER_FIRST_FRAG(buf); while (frag) { if (!frag_is_marked_for_gc(frag)) @@ -511,12 +486,8 @@ static void buffer_garbage_collect(struct header_buffer *buf, mqtt_wss_log_ctx_t frag = frag->next; } - if (frag == BUFFER_FIRST_FRAG(buf)) { -#ifdef MQTT_DEBUG_VERBOSE - mws_debug(log_ctx, "Buffer Garbage Collection! No Space Reclaimed!"); -#endif + if (frag == BUFFER_FIRST_FRAG(buf)) return; - } if (!frag) { buf->tail_frag = NULL; @@ -535,21 +506,17 @@ static void buffer_garbage_collect(struct header_buffer *buf, mqtt_wss_log_ctx_t buffer_rebuild(buf); } -static void transaction_buffer_garbage_collect(struct transaction_buffer *buf, mqtt_wss_log_ctx_t log_ctx) +static void transaction_buffer_garbage_collect(struct transaction_buffer *buf) { -#ifdef MQTT_DEBUG_VERBOSE - mws_debug(log_ctx, "Transaction Buffer Garbage Collection! %s", buf->sending_frag == NULL ? "NULL" : "in flight message"); -#endif - // Invalidate the cached sending fragment // as we will move data around if (buf->sending_frag != &ping_frag) buf->sending_frag = NULL; - buffer_garbage_collect(&buf->hdr_buffer, log_ctx); + buffer_garbage_collect(&buf->hdr_buffer); } -static int transaction_buffer_grow(struct transaction_buffer *buf, mqtt_wss_log_ctx_t log_ctx, float rate, size_t max) +static int transaction_buffer_grow(struct transaction_buffer *buf, float rate, size_t max) { if (buf->hdr_buffer.size >= max) return 0; @@ -565,35 +532,30 @@ static int transaction_buffer_grow(struct transaction_buffer *buf, mqtt_wss_log_ void *ret = reallocz(buf->hdr_buffer.data, buf->hdr_buffer.size); if (ret == NULL) { - mws_warn(log_ctx, "Buffer growth failed (realloc)"); + nd_log(NDLS_DAEMON, NDLP_WARNING, "Buffer growth failed (realloc)"); return 1; } - mws_debug(log_ctx, "Message metadata buffer was grown"); + nd_log(NDLS_DAEMON, NDLP_DEBUG, "Message metadata buffer was grown"); buf->hdr_buffer.data = ret; buffer_rebuild(&buf->hdr_buffer); return 0; } -inline static int transaction_buffer_init(struct transaction_buffer *to_init, size_t size) +inline static void transaction_buffer_init(struct transaction_buffer *to_init, size_t size) { - pthread_mutex_init(&to_init->mutex, NULL); + spinlock_init(&to_init->spinlock); to_init->hdr_buffer.size = size; to_init->hdr_buffer.data = mallocz(size); - if (to_init->hdr_buffer.data == NULL) - return 1; - to_init->hdr_buffer.tail = to_init->hdr_buffer.data; to_init->hdr_buffer.tail_frag = NULL; - return 0; } static void transaction_buffer_destroy(struct transaction_buffer *to_init) { buffer_purge(&to_init->hdr_buffer); - pthread_mutex_destroy(&to_init->mutex); freez(to_init->hdr_buffer.data); } @@ -629,54 +591,30 @@ void transaction_buffer_transaction_rollback(struct transaction_buffer *buf, str struct mqtt_ng_client *mqtt_ng_init(struct mqtt_ng_init *settings) { struct mqtt_ng_client *client = callocz(1, sizeof(struct mqtt_ng_client)); - if (client == NULL) - return NULL; - if (transaction_buffer_init(&client->main_buffer, HEADER_BUFFER_SIZE)) - goto err_free_client; + transaction_buffer_init(&client->main_buffer, HEADER_BUFFER_SIZE); client->rx_aliases = RX_ALIASES_INITIALIZE(); - if (client->rx_aliases == NULL) - goto err_free_trx_buf; - if (pthread_mutex_init(&client->stats_mutex, NULL)) - goto err_free_rx_alias; + spinlock_init(&client->stats_spinlock); + spinlock_init(&client->tx_topic_aliases.spinlock); client->tx_topic_aliases.stoi_dict = TX_ALIASES_INITIALIZE(); - if (client->tx_topic_aliases.stoi_dict == NULL) - goto err_free_stats_mutex; client->tx_topic_aliases.idx_max = UINT16_MAX; - if (pthread_rwlock_init(&client->tx_topic_aliases.rwlock, NULL)) - goto err_free_tx_alias; - // TODO just embed the struct into mqtt_ng_client client->parser.received_data = settings->data_in; client->send_fnc_ptr = settings->data_out_fnc; client->user_ctx = settings->user_ctx; - client->log = settings->log; - client->puback_callback = settings->puback_callback; client->connack_callback = settings->connack_callback; client->msg_callback = settings->msg_callback; return client; - -err_free_tx_alias: - c_rhash_destroy(client->tx_topic_aliases.stoi_dict); -err_free_stats_mutex: - pthread_mutex_destroy(&client->stats_mutex); -err_free_rx_alias: - c_rhash_destroy(client->rx_aliases); -err_free_trx_buf: - transaction_buffer_destroy(&client->main_buffer); -err_free_client: - freez(client); - return NULL; } -static inline uint8_t get_control_packet_type(uint8_t first_hdr_byte) +static uint8_t get_control_packet_type(uint8_t first_hdr_byte) { return first_hdr_byte >> 4; } @@ -708,33 +646,27 @@ static void mqtt_ng_destroy_tx_alias_hash(c_rhash hash) void mqtt_ng_destroy(struct mqtt_ng_client *client) { transaction_buffer_destroy(&client->main_buffer); - pthread_mutex_destroy(&client->stats_mutex); mqtt_ng_destroy_tx_alias_hash(client->tx_topic_aliases.stoi_dict); - pthread_rwlock_destroy(&client->tx_topic_aliases.rwlock); mqtt_ng_destroy_rx_alias_hash(client->rx_aliases); freez(client); } -int frag_set_external_data(mqtt_wss_log_ctx_t log, struct buffer_fragment *frag, void *data, size_t data_len, free_fnc_t data_free_fnc) +int frag_set_external_data(struct buffer_fragment *frag, void *data, size_t data_len, free_fnc_t data_free_fnc) { if (frag->len) { // TODO?: This could potentially be done in future if we set rule // external data always follows in buffer data // could help reduce fragmentation in some messages but // currently not worth it considering time is tight - mws_fatal(log, UNIT_LOG_PREFIX "INTERNAL ERROR: Cannot set external data to fragment already containing in buffer data!"); + nd_log(NDLS_DAEMON, NDLP_ERR, "INTERNAL ERROR: Cannot set external data to fragment already containing in buffer data!"); return 1; } switch (ptr2memory_mode(data_free_fnc)) { case MEMCPY: frag->data = mallocz(data_len); - if (frag->data == NULL) { - mws_error(log, UNIT_LOG_PREFIX "OOM while malloc @_optimized_add"); - return 1; - } memcpy(frag->data, data, data_len); break; case EXTERNAL_FREE_AFTER_USE: @@ -816,18 +748,18 @@ static size_t mqtt_ng_connect_size(struct mqtt_auth_properties *auth, #define PACK_2B_INT(buffer, integer, frag) { *(uint16_t *)WRITE_POS(frag) = htobe16((integer)); \ DATA_ADVANCE(buffer, sizeof(uint16_t), frag); } -static int _optimized_add(struct header_buffer *buf, mqtt_wss_log_ctx_t log_ctx, void *data, size_t data_len, free_fnc_t data_free_fnc, struct buffer_fragment **frag) +static int _optimized_add(struct header_buffer *buf, void *data, size_t data_len, free_fnc_t data_free_fnc, struct buffer_fragment **frag) { if (data_len > SMALL_STRING_DONT_FRAGMENT_LIMIT) { buffer_frag_flag_t flags = BUFFER_FRAG_DATA_EXTERNAL; if ((*frag)->flags & BUFFER_FRAG_GARBAGE_COLLECT_ON_SEND) flags |= BUFFER_FRAG_GARBAGE_COLLECT_ON_SEND; if( (*frag = buffer_new_frag(buf, flags)) == NULL ) { - mws_error(log_ctx, "Out of buffer space while generating the message"); + nd_log(NDLS_DAEMON, NDLP_ERR, "Out of buffer space while generating the message"); return 1; } - if (frag_set_external_data(log_ctx, *frag, data, data_len, data_free_fnc)) { - mws_error(log_ctx, "Error adding external data to newly created fragment"); + if (frag_set_external_data(*frag, data, data_len, data_free_fnc)) { + nd_log(NDLS_DAEMON, NDLP_ERR, "Error adding external data to newly created fragment"); return 1; } // we dont want to write to this fragment anymore @@ -842,31 +774,30 @@ static int _optimized_add(struct header_buffer *buf, mqtt_wss_log_ctx_t log_ctx, return 0; } -#define TRY_GENERATE_MESSAGE(generator_function, client, ...) \ - int rc = generator_function(&client->main_buffer, client->log, ##__VA_ARGS__); \ +#define TRY_GENERATE_MESSAGE(generator_function, ...) \ + int rc = generator_function(&client->main_buffer, ##__VA_ARGS__); \ if (rc == MQTT_NG_MSGGEN_BUFFER_OOM) { \ LOCK_HDR_BUFFER(&client->main_buffer); \ - transaction_buffer_garbage_collect((&client->main_buffer), client->log); \ + transaction_buffer_garbage_collect((&client->main_buffer)); \ UNLOCK_HDR_BUFFER(&client->main_buffer); \ - rc = generator_function(&client->main_buffer, client->log, ##__VA_ARGS__); \ + rc = generator_function(&client->main_buffer, ##__VA_ARGS__); \ if (rc == MQTT_NG_MSGGEN_BUFFER_OOM && client->max_mem_bytes) { \ LOCK_HDR_BUFFER(&client->main_buffer); \ - transaction_buffer_grow((&client->main_buffer), client->log, GROWTH_FACTOR, client->max_mem_bytes); \ + transaction_buffer_grow((&client->main_buffer),GROWTH_FACTOR, client->max_mem_bytes); \ UNLOCK_HDR_BUFFER(&client->main_buffer); \ - rc = generator_function(&client->main_buffer, client->log, ##__VA_ARGS__); \ + rc = generator_function(&client->main_buffer, ##__VA_ARGS__); \ } \ if (rc == MQTT_NG_MSGGEN_BUFFER_OOM) \ - mws_error(client->log, "%s failed to generate message due to insufficient buffer space (line %d)", __FUNCTION__, __LINE__); \ + nd_log(NDLS_DAEMON, NDLP_ERR, "%s failed to generate message due to insufficient buffer space (line %d)", __FUNCTION__, __LINE__); \ } \ if (rc == MQTT_NG_MSGGEN_OK) { \ - pthread_mutex_lock(&client->stats_mutex); \ + spinlock_lock(&client->stats_spinlock); \ client->stats.tx_messages_queued++; \ - pthread_mutex_unlock(&client->stats_mutex); \ + spinlock_unlock(&client->stats_spinlock); \ } \ return rc; mqtt_msg_data mqtt_ng_generate_connect(struct transaction_buffer *trx_buf, - mqtt_wss_log_ctx_t log_ctx, struct mqtt_auth_properties *auth, struct mqtt_lwt_properties *lwt, uint8_t clean_start, @@ -874,7 +805,7 @@ mqtt_msg_data mqtt_ng_generate_connect(struct transaction_buffer *trx_buf, { // Sanity Checks First (are given parameters correct and up to MQTT spec) if (!auth->client_id) { - mws_error(log_ctx, "ClientID must be set. [MQTT-3.1.3-3]"); + nd_log(NDLS_DAEMON, NDLP_ERR, "ClientID must be set. [MQTT-3.1.3-3]"); return NULL; } @@ -885,29 +816,29 @@ mqtt_msg_data mqtt_ng_generate_connect(struct transaction_buffer *trx_buf, // however server MUST allow ClientIDs between 1-23 bytes [MQTT-3.1.3-5] // so we will warn client server might not like this and he is using it // at his own risk! - mws_warn(log_ctx, "client_id provided is empty string. This might not be allowed by server [MQTT-3.1.3-6]"); + nd_log(NDLS_DAEMON, NDLP_WARNING, "client_id provided is empty string. This might not be allowed by server [MQTT-3.1.3-6]"); } if(len > MQTT_MAX_CLIENT_ID) { // [MQTT-3.1.3-5] server MUST allow client_id length 1-32 // server MAY allow longer client_id, if user provides longer client_id // warn them he is doing so at his own risk! - mws_warn(log_ctx, "client_id provided is longer than 23 bytes, server might not allow that [MQTT-3.1.3-5]"); + nd_log(NDLS_DAEMON, NDLP_WARNING, "client_id provided is longer than 23 bytes, server might not allow that [MQTT-3.1.3-5]"); } if (lwt) { if (lwt->will_message && lwt->will_message_size > 65535) { - mws_error(log_ctx, "Will message cannot be longer than 65535 bytes due to MQTT protocol limitations [MQTT-3.1.3-4] and [MQTT-1.5.6]"); + nd_log(NDLS_DAEMON, NDLP_ERR, "Will message cannot be longer than 65535 bytes due to MQTT protocol limitations [MQTT-3.1.3-4] and [MQTT-1.5.6]"); return NULL; } if (!lwt->will_topic) { //TODO topic given with strlen==0 ? check specs - mws_error(log_ctx, "If will message is given will topic must also be given [MQTT-3.1.3.3]"); + nd_log(NDLS_DAEMON, NDLP_ERR, "If will message is given will topic must also be given [MQTT-3.1.3.3]"); return NULL; } if (lwt->will_qos > MQTT_MAX_QOS) { // refer to [MQTT-3-1.2-12] - mws_error(log_ctx, "QOS for LWT message is bigger than max"); + nd_log(NDLS_DAEMON, NDLP_ERR, "QOS for LWT message is bigger than max"); return NULL; } } @@ -941,8 +872,10 @@ mqtt_msg_data mqtt_ng_generate_connect(struct transaction_buffer *trx_buf, *connect_flags = 0; if (auth->username) *connect_flags |= MQTT_CONNECT_FLAG_USERNAME; + if (auth->password) *connect_flags |= MQTT_CONNECT_FLAG_PASSWORD; + if (lwt) { *connect_flags |= MQTT_CONNECT_FLAG_LWT; *connect_flags |= lwt->will_qos << MQTT_CONNECT_FLAG_QOS_BITSHIFT; @@ -966,7 +899,7 @@ mqtt_msg_data mqtt_ng_generate_connect(struct transaction_buffer *trx_buf, // [MQTT-3.1.3.1] Client identifier CHECK_BYTES_AVAILABLE(&trx_buf->hdr_buffer, 2, goto fail_rollback); PACK_2B_INT(&trx_buf->hdr_buffer, strlen(auth->client_id), frag); - if (_optimized_add(&trx_buf->hdr_buffer, log_ctx, auth->client_id, strlen(auth->client_id), auth->client_id_free, &frag)) + if (_optimized_add(&trx_buf->hdr_buffer, auth->client_id, strlen(auth->client_id), auth->client_id_free, &frag)) goto fail_rollback; if (lwt != NULL) { @@ -980,7 +913,7 @@ mqtt_msg_data mqtt_ng_generate_connect(struct transaction_buffer *trx_buf, // Will Topic [MQTT-3.1.3.3] CHECK_BYTES_AVAILABLE(&trx_buf->hdr_buffer, 2, goto fail_rollback); PACK_2B_INT(&trx_buf->hdr_buffer, strlen(lwt->will_topic), frag); - if (_optimized_add(&trx_buf->hdr_buffer, log_ctx, lwt->will_topic, strlen(lwt->will_topic), lwt->will_topic_free, &frag)) + if (_optimized_add(&trx_buf->hdr_buffer, lwt->will_topic, strlen(lwt->will_topic), lwt->will_topic_free, &frag)) goto fail_rollback; // Will Payload [MQTT-3.1.3.4] @@ -988,7 +921,7 @@ mqtt_msg_data mqtt_ng_generate_connect(struct transaction_buffer *trx_buf, BUFFER_TRANSACTION_NEW_FRAG(&trx_buf->hdr_buffer, 0, frag, goto fail_rollback); CHECK_BYTES_AVAILABLE(&trx_buf->hdr_buffer, 2, goto fail_rollback); PACK_2B_INT(&trx_buf->hdr_buffer, lwt->will_message_size, frag); - if (_optimized_add(&trx_buf->hdr_buffer, log_ctx, lwt->will_message, lwt->will_message_size, lwt->will_topic_free, &frag)) + if (_optimized_add(&trx_buf->hdr_buffer, lwt->will_message, lwt->will_message_size, lwt->will_topic_free, &frag)) goto fail_rollback; } } @@ -998,7 +931,7 @@ mqtt_msg_data mqtt_ng_generate_connect(struct transaction_buffer *trx_buf, BUFFER_TRANSACTION_NEW_FRAG(&trx_buf->hdr_buffer, 0, frag, goto fail_rollback); CHECK_BYTES_AVAILABLE(&trx_buf->hdr_buffer, 2, goto fail_rollback); PACK_2B_INT(&trx_buf->hdr_buffer, strlen(auth->username), frag); - if (_optimized_add(&trx_buf->hdr_buffer, log_ctx, auth->username, strlen(auth->username), auth->username_free, &frag)) + if (_optimized_add(&trx_buf->hdr_buffer, auth->username, strlen(auth->username), auth->username_free, &frag)) goto fail_rollback; } @@ -1007,7 +940,7 @@ mqtt_msg_data mqtt_ng_generate_connect(struct transaction_buffer *trx_buf, BUFFER_TRANSACTION_NEW_FRAG(&trx_buf->hdr_buffer, 0, frag, goto fail_rollback); CHECK_BYTES_AVAILABLE(&trx_buf->hdr_buffer, 2, goto fail_rollback); PACK_2B_INT(&trx_buf->hdr_buffer, strlen(auth->password), frag); - if (_optimized_add(&trx_buf->hdr_buffer, log_ctx, auth->password, strlen(auth->password), auth->password_free, &frag)) + if (_optimized_add(&trx_buf->hdr_buffer, auth->password, strlen(auth->password), auth->password_free, &frag)) goto fail_rollback; } trx_buf->hdr_buffer.tail_frag->flags |= BUFFER_FRAG_MQTT_PACKET_TAIL; @@ -1024,7 +957,7 @@ int mqtt_ng_connect(struct mqtt_ng_client *client, uint8_t clean_start, uint16_t keep_alive) { - client->client_state = RAW; + client->client_state = MQTT_STATE_RAW; client->parser.state = MQTT_PARSE_FIXED_HEADER_PACKET_TYPE; LOCK_HDR_BUFFER(&client->main_buffer); @@ -1033,28 +966,23 @@ int mqtt_ng_connect(struct mqtt_ng_client *client, buffer_purge(&client->main_buffer.hdr_buffer); UNLOCK_HDR_BUFFER(&client->main_buffer); - pthread_rwlock_wrlock(&client->tx_topic_aliases.rwlock); + spinlock_lock(&client->tx_topic_aliases.spinlock); // according to MQTT spec topic aliases should not be persisted // even if clean session is true mqtt_ng_destroy_tx_alias_hash(client->tx_topic_aliases.stoi_dict); + client->tx_topic_aliases.stoi_dict = TX_ALIASES_INITIALIZE(); - if (client->tx_topic_aliases.stoi_dict == NULL) { - pthread_rwlock_unlock(&client->tx_topic_aliases.rwlock); - return 1; - } client->tx_topic_aliases.idx_assigned = 0; - pthread_rwlock_unlock(&client->tx_topic_aliases.rwlock); + spinlock_unlock(&client->tx_topic_aliases.spinlock); mqtt_ng_destroy_rx_alias_hash(client->rx_aliases); client->rx_aliases = RX_ALIASES_INITIALIZE(); - if (client->rx_aliases == NULL) - return 1; - client->connect_msg = mqtt_ng_generate_connect(&client->main_buffer, client->log, auth, lwt, clean_start, keep_alive); + client->connect_msg = mqtt_ng_generate_connect(&client->main_buffer, auth, lwt, clean_start, keep_alive); if (client->connect_msg == NULL) return 1; - pthread_mutex_lock(&client->stats_mutex); + spinlock_lock(&client->stats_spinlock); if (clean_start) client->stats.tx_messages_queued = 1; else @@ -1062,9 +990,9 @@ int mqtt_ng_connect(struct mqtt_ng_client *client, client->stats.tx_messages_sent = 0; client->stats.rx_messages_rcvd = 0; - pthread_mutex_unlock(&client->stats_mutex); + spinlock_unlock(&client->stats_spinlock); - client->client_state = CONNECT_PENDING; + client->client_state = MQTT_STATE_CONNECT_PENDING; return 0; } @@ -1074,15 +1002,16 @@ uint16_t get_unused_packet_id() { return packet_id ? packet_id : ++packet_id; } -static inline size_t mqtt_ng_publish_size(const char *topic, - size_t msg_len, - uint16_t topic_id) +static size_t mqtt_ng_publish_size( + const char *topic, + size_t msg_len, + uint16_t topic_id) { - size_t retval = 2 /* Topic Name Length */ - + (topic == NULL ? 0 : strlen(topic)) - + 2 /* Packet identifier */ - + 1 /* Properties Length TODO for now fixed to 1 property */ - + msg_len; + size_t retval = 2 + + (topic == NULL ? 0 : strlen(topic)) /* Topic Name Length */ + + 2 /* Packet identifier */ + + 1 /* Properties Length for now fixed to 1 property */ + + msg_len; if (topic_id) retval += 3; @@ -1091,7 +1020,6 @@ static inline size_t mqtt_ng_publish_size(const char *topic, } int mqtt_ng_generate_publish(struct transaction_buffer *trx_buf, - mqtt_wss_log_ctx_t log_ctx, char *topic, free_fnc_t topic_free, void *msg, @@ -1130,7 +1058,7 @@ int mqtt_ng_generate_publish(struct transaction_buffer *trx_buf, // [MQTT-3.3.2.1] PACK_2B_INT(&trx_buf->hdr_buffer, topic == NULL ? 0 : strlen(topic), frag); if (topic != NULL) { - if (_optimized_add(&trx_buf->hdr_buffer, log_ctx, topic, strlen(topic), topic_free, &frag)) + if (_optimized_add(&trx_buf->hdr_buffer, topic, strlen(topic), topic_free, &frag)) goto fail_rollback; BUFFER_TRANSACTION_NEW_FRAG(&trx_buf->hdr_buffer, 0, frag, goto fail_rollback); } @@ -1154,7 +1082,7 @@ int mqtt_ng_generate_publish(struct transaction_buffer *trx_buf, if( (frag = buffer_new_frag(&trx_buf->hdr_buffer, BUFFER_FRAG_DATA_EXTERNAL)) == NULL ) goto fail_rollback; - if (frag_set_external_data(log_ctx, frag, msg, msg_len, msg_free)) + if (frag_set_external_data(frag, msg, msg_len, msg_free)) goto fail_rollback; trx_buf->hdr_buffer.tail_frag->flags |= BUFFER_FRAG_MQTT_PACKET_TAIL; @@ -1178,9 +1106,9 @@ int mqtt_ng_publish(struct mqtt_ng_client *client, uint16_t *packet_id) { struct topic_alias_data *alias = NULL; - pthread_rwlock_rdlock(&client->tx_topic_aliases.rwlock); + spinlock_lock(&client->tx_topic_aliases.spinlock); c_rhash_get_ptr_by_str(client->tx_topic_aliases.stoi_dict, topic, (void**)&alias); - pthread_rwlock_unlock(&client->tx_topic_aliases.rwlock); + spinlock_unlock(&client->tx_topic_aliases.spinlock); uint16_t topic_id = 0; @@ -1194,14 +1122,14 @@ int mqtt_ng_publish(struct mqtt_ng_client *client, } if (client->max_msg_size && PUBLISH_SP_SIZE + mqtt_ng_publish_size(topic, msg_len, topic_id) > client->max_msg_size) { - mws_error(client->log, "Message too big for server: %zu", msg_len); + nd_log(NDLS_DAEMON, NDLP_ERR, "Message too big for server: %zu", msg_len); return MQTT_NG_MSGGEN_MSG_TOO_BIG; } - TRY_GENERATE_MESSAGE(mqtt_ng_generate_publish, client, topic, topic_free, msg, msg_free, msg_len, publish_flags, packet_id, topic_id); + TRY_GENERATE_MESSAGE(mqtt_ng_generate_publish, topic, topic_free, msg, msg_free, msg_len, publish_flags, packet_id, topic_id); } -static inline size_t mqtt_ng_subscribe_size(struct mqtt_sub *subs, size_t sub_count) +static size_t mqtt_ng_subscribe_size(struct mqtt_sub *subs, size_t sub_count) { size_t len = 2 /* Packet Identifier */ + 1 /* Properties Length TODO for now fixed 0 */; len += sub_count * (2 /* topic filter string length */ + 1 /* [MQTT-3.8.3.1] Subscription Options Byte */); @@ -1212,7 +1140,7 @@ static inline size_t mqtt_ng_subscribe_size(struct mqtt_sub *subs, size_t sub_co return len; } -int mqtt_ng_generate_subscribe(struct transaction_buffer *trx_buf, mqtt_wss_log_ctx_t log_ctx, struct mqtt_sub *subs, size_t sub_count) +int mqtt_ng_generate_subscribe(struct transaction_buffer *trx_buf, struct mqtt_sub *subs, size_t sub_count) { // >> START THE RODEO << transaction_buffer_transaction_start(trx_buf); @@ -1247,7 +1175,7 @@ int mqtt_ng_generate_subscribe(struct transaction_buffer *trx_buf, mqtt_wss_log_ for (size_t i = 0; i < sub_count; i++) { BUFFER_TRANSACTION_NEW_FRAG(&trx_buf->hdr_buffer, 0, frag, goto fail_rollback); PACK_2B_INT(&trx_buf->hdr_buffer, strlen(subs[i].topic), frag); - if (_optimized_add(&trx_buf->hdr_buffer, log_ctx, subs[i].topic, strlen(subs[i].topic), subs[i].topic_free, &frag)) + if (_optimized_add(&trx_buf->hdr_buffer, subs[i].topic, strlen(subs[i].topic), subs[i].topic_free, &frag)) goto fail_rollback; BUFFER_TRANSACTION_NEW_FRAG(&trx_buf->hdr_buffer, 0, frag, goto fail_rollback); *WRITE_POS(frag) = subs[i].options; @@ -1264,12 +1192,11 @@ fail_rollback: int mqtt_ng_subscribe(struct mqtt_ng_client *client, struct mqtt_sub *subs, size_t sub_count) { - TRY_GENERATE_MESSAGE(mqtt_ng_generate_subscribe, client, subs, sub_count); + TRY_GENERATE_MESSAGE(mqtt_ng_generate_subscribe, subs, sub_count); } -int mqtt_ng_generate_disconnect(struct transaction_buffer *trx_buf, mqtt_wss_log_ctx_t log_ctx, uint8_t reason_code) +int mqtt_ng_generate_disconnect(struct transaction_buffer *trx_buf, uint8_t reason_code) { - (void) log_ctx; // >> START THE RODEO << transaction_buffer_transaction_start(trx_buf); @@ -1308,12 +1235,11 @@ fail_rollback: int mqtt_ng_disconnect(struct mqtt_ng_client *client, uint8_t reason_code) { - TRY_GENERATE_MESSAGE(mqtt_ng_generate_disconnect, client, reason_code); + TRY_GENERATE_MESSAGE(mqtt_ng_generate_disconnect, reason_code); } -static int mqtt_generate_puback(struct transaction_buffer *trx_buf, mqtt_wss_log_ctx_t log_ctx, uint16_t packet_id, uint8_t reason_code) +static int mqtt_generate_puback(struct transaction_buffer *trx_buf, uint16_t packet_id, uint8_t reason_code) { - (void) log_ctx; // >> START THE RODEO << transaction_buffer_transaction_start(trx_buf); @@ -1353,7 +1279,7 @@ fail_rollback: static int mqtt_ng_puback(struct mqtt_ng_client *client, uint16_t packet_id, uint8_t reason_code) { - TRY_GENERATE_MESSAGE(mqtt_generate_puback, client, packet_id, reason_code); + TRY_GENERATE_MESSAGE(mqtt_generate_puback, packet_id, reason_code); } int mqtt_ng_ping(struct mqtt_ng_client *client) @@ -1370,7 +1296,6 @@ int mqtt_ng_ping(struct mqtt_ng_client *client) #define MQTT_NG_CLIENT_PROTOCOL_ERROR -1 #define MQTT_NG_CLIENT_SERVER_RETURNED_ERROR -2 #define MQTT_NG_CLIENT_NOT_IMPL_YET -3 -#define MQTT_NG_CLIENT_OOM -4 #define MQTT_NG_CLIENT_INTERNAL_ERROR -5 #define BUF_READ_CHECK_AT_LEAST(buf, x) \ @@ -1379,10 +1304,10 @@ int mqtt_ng_ping(struct mqtt_ng_client *client) #define vbi_parser_reset_ctx(ctx) memset(ctx, 0, sizeof(struct mqtt_vbi_parser_ctx)) -static int vbi_parser_parse(struct mqtt_vbi_parser_ctx *ctx, rbuf_t data, mqtt_wss_log_ctx_t log) +static int vbi_parser_parse(struct mqtt_vbi_parser_ctx *ctx, rbuf_t data) { if (ctx->bytes > MQTT_VBI_MAXBYTES - 1) { - mws_error(log, "MQTT Variable Byte Integer can't be longer than %d bytes", MQTT_VBI_MAXBYTES); + nd_log(NDLS_DAEMON, NDLP_ERR, "MQTT Variable Byte Integer can't be longer than %d bytes", MQTT_VBI_MAXBYTES); return MQTT_NG_CLIENT_PROTOCOL_ERROR; } if (!ctx->bytes || ctx->data[ctx->bytes-1] & MQTT_VBI_CONTINUATION_FLAG) { @@ -1394,7 +1319,7 @@ static int vbi_parser_parse(struct mqtt_vbi_parser_ctx *ctx, rbuf_t data, mqtt_w } if (mqtt_vbi_to_uint32(ctx->data, &ctx->result)) { - mws_error(log, "MQTT Variable Byte Integer failed to be parsed."); + nd_log(NDLS_DAEMON, NDLP_ERR, "MQTT Variable Byte Integer failed to be parsed."); return MQTT_NG_CLIENT_PROTOCOL_ERROR; } @@ -1480,12 +1405,12 @@ struct mqtt_property *get_property_by_id(struct mqtt_property *props, uint8_t pr } // Parses [MQTT-2.2.2] -static int parse_properties_array(struct mqtt_properties_parser_ctx *ctx, rbuf_t data, mqtt_wss_log_ctx_t log) +static int parse_properties_array(struct mqtt_properties_parser_ctx *ctx, rbuf_t data) { int rc; switch (ctx->state) { case PROPERTIES_LENGTH: - rc = vbi_parser_parse(&ctx->vbi_parser_ctx, data, log); + rc = vbi_parser_parse(&ctx->vbi_parser_ctx, data); if (rc == MQTT_NG_CLIENT_PARSE_DONE) { ctx->properties_length = ctx->vbi_parser_ctx.result; ctx->bytes_consumed += ctx->vbi_parser_ctx.bytes; @@ -1534,7 +1459,7 @@ static int parse_properties_array(struct mqtt_properties_parser_ctx *ctx, rbuf_t ctx->state = PROPERTY_TYPE_STR_BIN_LEN; break; default: - mws_error(log, "Unsupported property type %d for property id %d.", (int)ctx->tail->type, (int)ctx->tail->id); + nd_log(NDLS_DAEMON, NDLP_ERR, "Unsupported property type %d for property id %d.", (int)ctx->tail->type, (int)ctx->tail->id); return MQTT_NG_CLIENT_PROTOCOL_ERROR; } break; @@ -1552,7 +1477,7 @@ static int parse_properties_array(struct mqtt_properties_parser_ctx *ctx, rbuf_t ctx->state = PROPERTY_TYPE_STR; break; default: - mws_error(log, "Unexpected datatype in PROPERTY_TYPE_STR_BIN_LEN %d", (int)ctx->tail->type); + nd_log(NDLS_DAEMON, NDLP_ERR, "Unexpected datatype in PROPERTY_TYPE_STR_BIN_LEN %d", (int)ctx->tail->type); return MQTT_NG_CLIENT_INTERNAL_ERROR; } break; @@ -1577,7 +1502,7 @@ static int parse_properties_array(struct mqtt_properties_parser_ctx *ctx, rbuf_t ctx->state = PROPERTY_NEXT; break; case PROPERTY_TYPE_VBI: - rc = vbi_parser_parse(&ctx->vbi_parser_ctx, data, log); + rc = vbi_parser_parse(&ctx->vbi_parser_ctx, data); if (rc == MQTT_NG_CLIENT_PARSE_DONE) { ctx->tail->data.uint32 = ctx->vbi_parser_ctx.result; ctx->bytes_consumed += ctx->vbi_parser_ctx.bytes; @@ -1627,9 +1552,9 @@ static int parse_connack_varhdr(struct mqtt_ng_client *client) mqtt_properties_parser_ctx_reset(&parser->properties_parser); break; case MQTT_PARSE_VARHDR_PROPS: - return parse_properties_array(&parser->properties_parser, parser->received_data, client->log); + return parse_properties_array(&parser->properties_parser, parser->received_data); default: - ERROR("invalid state for connack varhdr parser"); + nd_log(NDLS_DAEMON, NDLP_ERR, "invalid state for connack varhdr parser"); return MQTT_NG_CLIENT_INTERNAL_ERROR; } return MQTT_NG_CLIENT_OK_CALL_AGAIN; @@ -1653,9 +1578,9 @@ static int parse_disconnect_varhdr(struct mqtt_ng_client *client) mqtt_properties_parser_ctx_reset(&parser->properties_parser); break; case MQTT_PARSE_VARHDR_PROPS: - return parse_properties_array(&parser->properties_parser, parser->received_data, client->log); + return parse_properties_array(&parser->properties_parser, parser->received_data); default: - ERROR("invalid state for connack varhdr parser"); + nd_log(NDLS_DAEMON, NDLP_ERR, "invalid state for connack varhdr parser"); return MQTT_NG_CLIENT_INTERNAL_ERROR; } return MQTT_NG_CLIENT_OK_CALL_AGAIN; @@ -1691,9 +1616,9 @@ static int parse_puback_varhdr(struct mqtt_ng_client *client) mqtt_properties_parser_ctx_reset(&parser->properties_parser); /* FALLTHROUGH */ case MQTT_PARSE_VARHDR_PROPS: - return parse_properties_array(&parser->properties_parser, parser->received_data, client->log); + return parse_properties_array(&parser->properties_parser, parser->received_data); default: - ERROR("invalid state for puback varhdr parser"); + nd_log(NDLS_DAEMON, NDLP_ERR, "invalid state for puback varhdr parser"); return MQTT_NG_CLIENT_INTERNAL_ERROR; } return MQTT_NG_CLIENT_OK_CALL_AGAIN; @@ -1716,7 +1641,7 @@ static int parse_suback_varhdr(struct mqtt_ng_client *client) mqtt_properties_parser_ctx_reset(&parser->properties_parser); /* FALLTHROUGH */ case MQTT_PARSE_VARHDR_PROPS: - rc = parse_properties_array(&parser->properties_parser, parser->received_data, client->log); + rc = parse_properties_array(&parser->properties_parser, parser->received_data); if (rc != MQTT_NG_CLIENT_PARSE_DONE) return rc; parser->mqtt_parsed_len += parser->properties_parser.bytes_consumed; @@ -1737,7 +1662,7 @@ static int parse_suback_varhdr(struct mqtt_ng_client *client) return MQTT_NG_CLIENT_NEED_MORE_BYTES; default: - ERROR("invalid state for suback varhdr parser"); + nd_log(NDLS_DAEMON, NDLP_ERR, "invalid state for suback varhdr parser"); return MQTT_NG_CLIENT_INTERNAL_ERROR; } return MQTT_NG_CLIENT_OK_CALL_AGAIN; @@ -1761,8 +1686,6 @@ static int parse_publish_varhdr(struct mqtt_ng_client *client) break; } publish->topic = callocz(1, publish->topic_len + 1 /* add 0x00 */); - if (publish->topic == NULL) - return MQTT_NG_CLIENT_OOM; parser->varhdr_state = MQTT_PARSE_VARHDR_TOPICNAME; /* FALLTHROUGH */ case MQTT_PARSE_VARHDR_TOPICNAME: @@ -1788,7 +1711,7 @@ static int parse_publish_varhdr(struct mqtt_ng_client *client) parser->mqtt_parsed_len += 2; /* FALLTHROUGH */ case MQTT_PARSE_VARHDR_PROPS: - rc = parse_properties_array(&parser->properties_parser, parser->received_data, client->log); + rc = parse_properties_array(&parser->properties_parser, parser->received_data); if (rc != MQTT_NG_CLIENT_PARSE_DONE) return rc; parser->mqtt_parsed_len += parser->properties_parser.bytes_consumed; @@ -1798,7 +1721,7 @@ static int parse_publish_varhdr(struct mqtt_ng_client *client) if (parser->mqtt_fixed_hdr_remaining_length < parser->mqtt_parsed_len) { freez(publish->topic); publish->topic = NULL; - ERROR("Error parsing PUBLISH message"); + nd_log(NDLS_DAEMON, NDLP_ERR, "Error parsing PUBLISH message"); return MQTT_NG_CLIENT_PROTOCOL_ERROR; } publish->data_len = parser->mqtt_fixed_hdr_remaining_length - parser->mqtt_parsed_len; @@ -1809,18 +1732,12 @@ static int parse_publish_varhdr(struct mqtt_ng_client *client) BUF_READ_CHECK_AT_LEAST(parser->received_data, publish->data_len); publish->data = mallocz(publish->data_len); - if (publish->data == NULL) { - freez(publish->topic); - publish->topic = NULL; - return MQTT_NG_CLIENT_OOM; - } - rbuf_pop(parser->received_data, publish->data, publish->data_len); parser->mqtt_parsed_len += publish->data_len; return MQTT_NG_CLIENT_PARSE_DONE; default: - ERROR("invalid state for publish varhdr parser"); + nd_log(NDLS_DAEMON, NDLP_ERR, "invalid state for publish varhdr parser"); return MQTT_NG_CLIENT_INTERNAL_ERROR; } return MQTT_NG_CLIENT_OK_CALL_AGAIN; @@ -1840,7 +1757,7 @@ static int parse_data(struct mqtt_ng_client *client) parser->state = MQTT_PARSE_FIXED_HEADER_LEN; break; case MQTT_PARSE_FIXED_HEADER_LEN: - rc = vbi_parser_parse(&parser->vbi_parser, parser->received_data, client->log); + rc = vbi_parser_parse(&parser->vbi_parser, parser->received_data); if (rc == MQTT_NG_CLIENT_PARSE_DONE) { parser->mqtt_fixed_hdr_remaining_length = parser->vbi_parser.result; parser->state = MQTT_PARSE_VARIABLE_HEADER; @@ -1883,10 +1800,11 @@ static int parse_data(struct mqtt_ng_client *client) return rc; case MQTT_CPT_PINGRESP: if (parser->mqtt_fixed_hdr_remaining_length) { - ERROR ("PINGRESP has to be 0 Remaining Length."); // [MQTT-3.13.1] + nd_log(NDLS_DAEMON, NDLP_ERR, "PINGRESP has to be 0 Remaining Length."); // [MQTT-3.13.1] return MQTT_NG_CLIENT_PROTOCOL_ERROR; } parser->state = MQTT_PARSE_MQTT_PACKET_DONE; + ping_timeout = 0; break; case MQTT_CPT_DISCONNECT: rc = parse_disconnect_varhdr(client); @@ -1896,7 +1814,7 @@ static int parse_data(struct mqtt_ng_client *client) } return rc; default: - ERROR("Parsing Control Packet Type %" PRIu8 " not implemented yet.", get_control_packet_type(parser->mqtt_control_packet_type)); + nd_log(NDLS_DAEMON, NDLP_ERR, "Parsing Control Packet Type %" PRIu8 " not implemented yet.", get_control_packet_type(parser->mqtt_control_packet_type)); rbuf_bump_tail(parser->received_data, parser->mqtt_fixed_hdr_remaining_length); parser->state = MQTT_PARSE_MQTT_PACKET_DONE; return MQTT_NG_CLIENT_NOT_IMPL_YET; @@ -1916,12 +1834,12 @@ static int parse_data(struct mqtt_ng_client *client) // return -1 on error // return 0 if there is fragment set static int mqtt_ng_next_to_send(struct mqtt_ng_client *client) { - if (client->client_state == CONNECT_PENDING) { + if (client->client_state == MQTT_STATE_CONNECT_PENDING) { client->main_buffer.sending_frag = client->connect_msg; - client->client_state = CONNECTING; + client->client_state = MQTT_STATE_CONNECTING; return 0; } - if (client->client_state != CONNECTED) + if (client->client_state != MQTT_STATE_CONNECTED) return -1; struct buffer_fragment *frag = BUFFER_FIRST_FRAG(&client->main_buffer.hdr_buffer); @@ -1959,7 +1877,7 @@ static int send_fragment(struct mqtt_ng_client *client) { if (bytes) processed = client->send_fnc_ptr(client->user_ctx, ptr, bytes); else - WARN("This fragment was fully sent already. This should not happen!"); + nd_log(NDLS_DAEMON, NDLP_WARNING, "This fragment was fully sent already. This should not happen!"); frag->sent += processed; if (frag->sent != frag->len) @@ -1967,11 +1885,11 @@ static int send_fragment(struct mqtt_ng_client *client) { if (frag->flags & BUFFER_FRAG_MQTT_PACKET_TAIL) { client->time_of_last_send = time(NULL); - pthread_mutex_lock(&client->stats_mutex); + spinlock_lock(&client->stats_spinlock); if (client->main_buffer.sending_frag != &ping_frag) client->stats.tx_messages_queued--; client->stats.tx_messages_sent++; - pthread_mutex_unlock(&client->stats_mutex); + spinlock_unlock(&client->stats_spinlock); client->main_buffer.sending_frag = NULL; return 1; } @@ -1995,7 +1913,7 @@ static void try_send_all(struct mqtt_ng_client *client) { } while(send_all_message_fragments(client) >= 0); } -static inline void mark_message_for_gc(struct buffer_fragment *frag) +static void mark_message_for_gc(struct buffer_fragment *frag) { while (frag) { frag->flags |= BUFFER_FRAG_GARBAGE_COLLECT; @@ -2013,7 +1931,7 @@ static int mark_packet_acked(struct mqtt_ng_client *client, uint16_t packet_id) while (frag) { if ( (frag->flags & BUFFER_FRAG_MQTT_PACKET_HEAD) && frag->packet_id == packet_id) { if (!frag->sent) { - ERROR("Received packet_id (%" PRIu16 ") belongs to MQTT packet which was not yet sent!", packet_id); + nd_log(NDLS_DAEMON, NDLP_ERR, "Received packet_id (%" PRIu16 ") belongs to MQTT packet which was not yet sent!", packet_id); UNLOCK_HDR_BUFFER(&client->main_buffer); return 1; } @@ -2023,7 +1941,7 @@ static int mark_packet_acked(struct mqtt_ng_client *client, uint16_t packet_id) } frag = frag->next; } - ERROR("Received packet_id (%" PRIu16 ") is unknown!", packet_id); + nd_log(NDLS_DAEMON, NDLP_ERR, "Received packet_id (%" PRIu16 ") is unknown!", packet_id); UNLOCK_HDR_BUFFER(&client->main_buffer); return 1; } @@ -2031,110 +1949,113 @@ static int mark_packet_acked(struct mqtt_ng_client *client, uint16_t packet_id) int handle_incoming_traffic(struct mqtt_ng_client *client) { int rc; + while ((rc = parse_data(client)) == MQTT_NG_CLIENT_OK_CALL_AGAIN) { + ; + } + if (rc != MQTT_NG_CLIENT_MQTT_PACKET_DONE) + return rc; + struct mqtt_publish *pub; - while( (rc = parse_data(client)) == MQTT_NG_CLIENT_OK_CALL_AGAIN ); - if ( rc == MQTT_NG_CLIENT_MQTT_PACKET_DONE ) { - struct mqtt_property *prop; -#ifdef MQTT_DEBUG_VERBOSE - DEBUG("MQTT Packet Parsed Successfully!"); -#endif - pthread_mutex_lock(&client->stats_mutex); - client->stats.rx_messages_rcvd++; - pthread_mutex_unlock(&client->stats_mutex); - - switch (get_control_packet_type(client->parser.mqtt_control_packet_type)) { - case MQTT_CPT_CONNACK: -#ifdef MQTT_DEBUG_VERBOSE - DEBUG("Received CONNACK"); -#endif - LOCK_HDR_BUFFER(&client->main_buffer); - mark_message_for_gc(client->connect_msg); - UNLOCK_HDR_BUFFER(&client->main_buffer); - client->connect_msg = NULL; - if (client->client_state != CONNECTING) { - ERROR("Received unexpected CONNACK"); - client->client_state = ERROR; - return MQTT_NG_CLIENT_PROTOCOL_ERROR; - } - if ((prop = get_property_by_id(client->parser.properties_parser.head, MQTT_PROP_MAX_PKT_SIZE)) != NULL) { - INFO("MQTT server limits message size to %" PRIu32, prop->data.uint32); - client->max_msg_size = prop->data.uint32; - } - if (client->connack_callback) - client->connack_callback(client->user_ctx, client->parser.mqtt_packet.connack.reason_code); - if (!client->parser.mqtt_packet.connack.reason_code) { - INFO("MQTT Connection Accepted By Server"); - client->client_state = CONNECTED; - break; - } - client->client_state = ERROR; - return MQTT_NG_CLIENT_SERVER_RETURNED_ERROR; - case MQTT_CPT_PUBACK: -#ifdef MQTT_DEBUG_VERBOSE - DEBUG("Received PUBACK %" PRIu16, client->parser.mqtt_packet.puback.packet_id); -#endif - if (mark_packet_acked(client, client->parser.mqtt_packet.puback.packet_id)) - return MQTT_NG_CLIENT_PROTOCOL_ERROR; - if (client->puback_callback) - client->puback_callback(client->parser.mqtt_packet.puback.packet_id); - break; - case MQTT_CPT_PINGRESP: -#ifdef MQTT_DEBUG_VERBOSE - DEBUG("Received PINGRESP"); -#endif - break; - case MQTT_CPT_SUBACK: -#ifdef MQTT_DEBUG_VERBOSE - DEBUG("Received SUBACK %" PRIu16, client->parser.mqtt_packet.suback.packet_id); -#endif - if (mark_packet_acked(client, client->parser.mqtt_packet.suback.packet_id)) - return MQTT_NG_CLIENT_PROTOCOL_ERROR; + struct mqtt_property *prop; + spinlock_lock(&client->stats_spinlock); + client->stats.rx_messages_rcvd++; + spinlock_unlock(&client->stats_spinlock); + + uint8_t ctrl_packet_type = get_control_packet_type(client->parser.mqtt_control_packet_type); + switch (ctrl_packet_type) { + case MQTT_CPT_CONNACK: + LOCK_HDR_BUFFER(&client->main_buffer); + mark_message_for_gc(client->connect_msg); + UNLOCK_HDR_BUFFER(&client->main_buffer); + + client->connect_msg = NULL; + + if (client->client_state != MQTT_STATE_CONNECTING) { + nd_log(NDLS_DAEMON, NDLP_ERR, "Received unexpected CONNACK"); + client->client_state = MQTT_STATE_ERROR; + return MQTT_NG_CLIENT_PROTOCOL_ERROR; + } + + if ((prop = get_property_by_id(client->parser.properties_parser.head, MQTT_PROP_MAX_PKT_SIZE)) != NULL) { + nd_log(NDLS_DAEMON, NDLP_INFO, "MQTT server limits message size to %" PRIu32, prop->data.uint32); + client->max_msg_size = prop->data.uint32; + } + + if (client->connack_callback) + client->connack_callback(client->user_ctx, client->parser.mqtt_packet.connack.reason_code); + if (!client->parser.mqtt_packet.connack.reason_code) { + nd_log(NDLS_DAEMON, NDLP_INFO, "MQTT Connection Accepted By Server"); + client->client_state = MQTT_STATE_CONNECTED; break; - case MQTT_CPT_PUBLISH: -#ifdef MQTT_DEBUG_VERBOSE - DEBUG("Recevied PUBLISH"); -#endif - pub = &client->parser.mqtt_packet.publish; - if (pub->qos > 1) { - freez(pub->topic); - freez(pub->data); - return MQTT_NG_CLIENT_NOT_IMPL_YET; - } - if ( pub->qos == 1 && (rc = mqtt_ng_puback(client, pub->packet_id, 0)) ) { - client->client_state = ERROR; - ERROR("Error generating PUBACK reply for PUBLISH"); - return rc; - } - if ( (prop = get_property_by_id(client->parser.properties_parser.head, MQTT_PROP_TOPIC_ALIAS)) != NULL ) { - // Topic Alias property was sent from server - void *topic_ptr; - if (!c_rhash_get_ptr_by_uint64(client->rx_aliases, prop->data.uint8, &topic_ptr)) { - if (pub->topic != NULL) { - ERROR("We do not yet support topic alias reassignment"); - return MQTT_NG_CLIENT_NOT_IMPL_YET; - } - pub->topic = topic_ptr; - } else { - if (pub->topic == NULL) { - ERROR("Topic alias with id %d unknown and topic not set by server!", prop->data.uint8); - return MQTT_NG_CLIENT_PROTOCOL_ERROR; - } - c_rhash_insert_uint64_ptr(client->rx_aliases, prop->data.uint8, pub->topic); + } + client->client_state = MQTT_STATE_ERROR; + return MQTT_NG_CLIENT_SERVER_RETURNED_ERROR; + + case MQTT_CPT_PUBACK: + if (mark_packet_acked(client, client->parser.mqtt_packet.puback.packet_id)) + return MQTT_NG_CLIENT_PROTOCOL_ERROR; + if (client->puback_callback) + client->puback_callback(client->parser.mqtt_packet.puback.packet_id); + break; + + case MQTT_CPT_PINGRESP: + break; + + case MQTT_CPT_SUBACK: + if (mark_packet_acked(client, client->parser.mqtt_packet.suback.packet_id)) + return MQTT_NG_CLIENT_PROTOCOL_ERROR; + break; + + case MQTT_CPT_PUBLISH: + pub = &client->parser.mqtt_packet.publish; + + if (pub->qos > 1) { + freez(pub->topic); + freez(pub->data); + return MQTT_NG_CLIENT_NOT_IMPL_YET; + } + + if ( pub->qos == 1 && ((rc = mqtt_ng_puback(client, pub->packet_id, 0))) ) { + client->client_state = MQTT_STATE_ERROR; + nd_log(NDLS_DAEMON, NDLP_ERR, "Error generating PUBACK reply for PUBLISH"); + return rc; + } + + if ( (prop = get_property_by_id(client->parser.properties_parser.head, MQTT_PROP_TOPIC_ALIAS)) != NULL ) { + // Topic Alias property was sent from server + void *topic_ptr; + if (!c_rhash_get_ptr_by_uint64(client->rx_aliases, prop->data.uint8, &topic_ptr)) { + if (pub->topic != NULL) { + nd_log(NDLS_DAEMON, NDLP_ERR, "We do not yet support topic alias reassignment"); + return MQTT_NG_CLIENT_NOT_IMPL_YET; } + pub->topic = topic_ptr; + } else { + if (pub->topic == NULL) { + nd_log(NDLS_DAEMON, NDLP_ERR, "Topic alias with id %d unknown and topic not set by server!", prop->data.uint8); + return MQTT_NG_CLIENT_PROTOCOL_ERROR; + } + c_rhash_insert_uint64_ptr(client->rx_aliases, prop->data.uint8, pub->topic); } - if (client->msg_callback) - client->msg_callback(pub->topic, pub->data, pub->data_len, pub->qos); - // in case we have property topic alias and we have topic we take over the string - // and add pointer to it into topic alias list - if (prop == NULL) - freez(pub->topic); - freez(pub->data); - return MQTT_NG_CLIENT_WANT_WRITE; - case MQTT_CPT_DISCONNECT: - INFO ("Got MQTT DISCONNECT control packet from server. Reason code: %d", (int)client->parser.mqtt_packet.disconnect.reason_code); - client->client_state = DISCONNECTED; - break; - } + } + + if (client->msg_callback) + client->msg_callback(pub->topic, pub->data, pub->data_len, pub->qos); + // in case we have property topic alias and we have topic we take over the string + // and add pointer to it into topic alias list + if (prop == NULL) + freez(pub->topic); + freez(pub->data); + return MQTT_NG_CLIENT_WANT_WRITE; + + case MQTT_CPT_DISCONNECT: + nd_log(NDLS_DAEMON, NDLP_INFO, "Got MQTT DISCONNECT control packet from server. Reason code: %d", (int)client->parser.mqtt_packet.disconnect.reason_code); + client->client_state = MQTT_STATE_DISCONNECTED; + break; + + default: + nd_log(NDLS_DAEMON, NDLP_INFO, "Got unknown control packet %u from server", ctrl_packet_type); + break; } return rc; @@ -2142,10 +2063,10 @@ int handle_incoming_traffic(struct mqtt_ng_client *client) int mqtt_ng_sync(struct mqtt_ng_client *client) { - if (client->client_state == RAW || client->client_state == DISCONNECTED) + if (client->client_state == MQTT_STATE_RAW || client->client_state == MQTT_STATE_DISCONNECTED) return 0; - if (client->client_state == ERROR) + if (client->client_state == MQTT_STATE_ERROR) return 1; LOCK_HDR_BUFFER(&client->main_buffer); @@ -2182,9 +2103,9 @@ void mqtt_ng_set_max_mem(struct mqtt_ng_client *client, size_t bytes) void mqtt_ng_get_stats(struct mqtt_ng_client *client, struct mqtt_ng_stats *stats) { - pthread_mutex_lock(&client->stats_mutex); + spinlock_lock(&client->stats_spinlock); memcpy(stats, &client->stats, sizeof(struct mqtt_ng_stats)); - pthread_mutex_unlock(&client->stats_mutex); + spinlock_unlock(&client->stats_spinlock); stats->tx_bytes_queued = 0; stats->tx_buffer_reclaimable = 0; @@ -2207,11 +2128,11 @@ void mqtt_ng_get_stats(struct mqtt_ng_client *client, struct mqtt_ng_stats *stat int mqtt_ng_set_topic_alias(struct mqtt_ng_client *client, const char *topic) { uint16_t idx; - pthread_rwlock_wrlock(&client->tx_topic_aliases.rwlock); + spinlock_lock(&client->tx_topic_aliases.spinlock); if (client->tx_topic_aliases.idx_assigned >= client->tx_topic_aliases.idx_max) { - pthread_rwlock_unlock(&client->tx_topic_aliases.rwlock); - mws_error(client->log, "Tx topic alias indexes were exhausted (current version of the library doesn't support reassigning yet. Feel free to contribute."); + spinlock_unlock(&client->tx_topic_aliases.spinlock); + nd_log(NDLS_DAEMON, NDLP_ERR, "Tx topic alias indexes were exhausted (current version of the library doesn't support reassigning yet. Feel free to contribute."); return 0; //0 is not a valid topic alias } @@ -2220,8 +2141,8 @@ int mqtt_ng_set_topic_alias(struct mqtt_ng_client *client, const char *topic) // this is not a problem for library but might be helpful to warn user // as it might indicate bug in their program (but also might be expected) idx = alias->idx; - pthread_rwlock_unlock(&client->tx_topic_aliases.rwlock); - mws_debug(client->log, "%s topic \"%s\" already has alias set. Ignoring.", __FUNCTION__, topic); + spinlock_unlock(&client->tx_topic_aliases.spinlock); + nd_log(NDLS_DAEMON, NDLP_DEBUG, "%s topic \"%s\" already has alias set. Ignoring.", __FUNCTION__, topic); return idx; } @@ -2232,6 +2153,6 @@ int mqtt_ng_set_topic_alias(struct mqtt_ng_client *client, const char *topic) c_rhash_insert_str_ptr(client->tx_topic_aliases.stoi_dict, topic, (void*)alias); - pthread_rwlock_unlock(&client->tx_topic_aliases.rwlock); + spinlock_unlock(&client->tx_topic_aliases.spinlock); return idx; } diff --git a/src/aclk/mqtt_websockets/mqtt_ng.h b/src/aclk/mqtt_websockets/mqtt_ng.h index 4b0584d58..c5f6d94cc 100644 --- a/src/aclk/mqtt_websockets/mqtt_ng.h +++ b/src/aclk/mqtt_websockets/mqtt_ng.h @@ -1,10 +1,5 @@ -// Copyright: SPDX-License-Identifier: GPL-3.0-only +// SPDX-License-Identifier: GPL-3.0-or-later -#include <stdint.h> -#include <sys/types.h> -#include <time.h> - -#include "c-rbuf/cringbuffer.h" #include "common_public.h" #define MQTT_NG_MSGGEN_OK 0 @@ -15,7 +10,7 @@ #define MQTT_NG_MSGGEN_MSG_TOO_BIG 3 struct mqtt_ng_client; - +extern time_t ping_timeout; /* Converts integer to MQTT Variable Byte Integer as per 1.5.5 of MQTT 5 specs * @param input value to be converted * @param output pointer to memory where output will be written to. Must allow up to 4 bytes to be written. @@ -72,7 +67,6 @@ int mqtt_ng_ping(struct mqtt_ng_client *client); typedef ssize_t (*mqtt_ng_send_fnc_t)(void *user_ctx, const void* buf, size_t len); struct mqtt_ng_init { - mqtt_wss_log_ctx_t log; rbuf_t data_in; mqtt_ng_send_fnc_t data_out_fnc; void *user_ctx; diff --git a/src/aclk/mqtt_websockets/mqtt_wss_client.c b/src/aclk/mqtt_websockets/mqtt_wss_client.c index bb0e17262..5c576ced5 100644 --- a/src/aclk/mqtt_websockets/mqtt_wss_client.c +++ b/src/aclk/mqtt_websockets/mqtt_wss_client.c @@ -1,32 +1,24 @@ -// SPDX-License-Identifier: GPL-3.0-only +// SPDX-License-Identifier: GPL-3.0-or-later #ifndef _GNU_SOURCE #define _GNU_SOURCE #endif +#include "libnetdata/libnetdata.h" #include "mqtt_wss_client.h" #include "mqtt_ng.h" #include "ws_client.h" #include "common_internal.h" - -#include <stdlib.h> -#include <fcntl.h> -#include <unistd.h> -#include <poll.h> -#include <string.h> -#include <time.h> - -#include <sys/socket.h> -#include <netinet/in.h> - -#include <openssl/err.h> -#include <openssl/ssl.h> +#include "../aclk.h" #define PIPE_READ_END 0 #define PIPE_WRITE_END 1 #define POLLFD_SOCKET 0 #define POLLFD_PIPE 1 +#define PING_TIMEOUT (60) //Expect a ping response within this time (seconds) +time_t ping_timeout = 0; + #if (OPENSSL_VERSION_NUMBER < OPENSSL_VERSION_110) && (SSLEAY_VERSION_NUMBER >= OPENSSL_VERSION_097) #include <openssl/conf.h> #endif @@ -69,6 +61,8 @@ char *util_openssl_ret_err(int err) return "SSL_ERROR_SYSCALL"; case SSL_ERROR_SSL: return "SSL_ERROR_SSL"; + default: + break; } return "UNKNOWN"; } @@ -76,8 +70,6 @@ char *util_openssl_ret_err(int err) struct mqtt_wss_client_struct { ws_client *ws_client; - mqtt_wss_log_ctx_t log; - // immediate connection (e.g. proxy server) char *host; int port; @@ -129,69 +121,49 @@ static void mws_connack_callback_ng(void *user_ctx, int code) switch(code) { case 0: client->mqtt_connected = 1; - return; + break; //TODO manual labor: all the CONNACK error codes with some nice error message default: - mws_error(client->log, "MQTT CONNACK returned error %d", code); - return; + nd_log(NDLS_DAEMON, NDLP_ERR, "MQTT CONNACK returned error %d", code); + break; } } static ssize_t mqtt_send_cb(void *user_ctx, const void* buf, size_t len) { mqtt_wss_client client = user_ctx; -#ifdef DEBUG_ULTRA_VERBOSE - mws_debug(client->log, "mqtt_pal_sendall(len=%d)", len); -#endif int ret = ws_client_send(client->ws_client, WS_OP_BINARY_FRAME, buf, len); - if (ret >= 0 && (size_t)ret != len) { -#ifdef DEBUG_ULTRA_VERBOSE - mws_debug(client->log, "Not complete message sent (Msg=%d,Sent=%d). Need to arm POLLOUT!", len, ret); -#endif + if (ret >= 0 && (size_t)ret != len) client->mqtt_didnt_finish_write = 1; - } return ret; } -mqtt_wss_client mqtt_wss_new(const char *log_prefix, - mqtt_wss_log_callback_t log_callback, - msg_callback_fnc_t msg_callback, - void (*puback_callback)(uint16_t packet_id)) +mqtt_wss_client mqtt_wss_new( + msg_callback_fnc_t msg_callback, + void (*puback_callback)(uint16_t packet_id)) { - mqtt_wss_log_ctx_t log; - - log = mqtt_wss_log_ctx_create(log_prefix, log_callback); - if(!log) - return NULL; - SSL_library_init(); SSL_load_error_strings(); mqtt_wss_client client = callocz(1, sizeof(struct mqtt_wss_client_struct)); - if (!client) { - mws_error(log, "OOM alocating mqtt_wss_client"); - goto fail; - } spinlock_init(&client->stat_lock); client->msg_callback = msg_callback; client->puback_callback = puback_callback; - client->ws_client = ws_client_new(0, &client->target_host, log); + client->ws_client = ws_client_new(0, &client->target_host); if (!client->ws_client) { - mws_error(log, "Error creating ws_client"); + nd_log(NDLS_DAEMON, NDLP_ERR, "Error creating ws_client"); goto fail_1; } - client->log = log; - #ifdef __APPLE__ if (pipe(client->write_notif_pipe)) { #else if (pipe2(client->write_notif_pipe, O_CLOEXEC /*| O_DIRECT*/)) { #endif - mws_error(log, "Couldn't create pipe"); + nd_log(NDLS_DAEMON, NDLP_ERR, "Couldn't create pipe"); goto fail_2; } @@ -201,7 +173,6 @@ mqtt_wss_client mqtt_wss_new(const char *log_prefix, client->poll_fds[POLLFD_SOCKET].events = POLLIN; struct mqtt_ng_init settings = { - .log = log, .data_in = client->ws_client->buf_to_mqtt, .data_out_fnc = &mqtt_send_cb, .user_ctx = client, @@ -209,22 +180,14 @@ mqtt_wss_client mqtt_wss_new(const char *log_prefix, .puback_callback = puback_callback, .msg_callback = msg_callback }; - if ( (client->mqtt = mqtt_ng_init(&settings)) == NULL ) { - mws_error(log, "Error initializing internal MQTT client"); - goto fail_3; - } + client->mqtt = mqtt_ng_init(&settings); return client; -fail_3: - close(client->write_notif_pipe[PIPE_WRITE_END]); - close(client->write_notif_pipe[PIPE_READ_END]); fail_2: ws_client_destroy(client->ws_client); fail_1: freez(client); -fail: - mqtt_wss_log_ctx_destroy(log); return NULL; } @@ -265,30 +228,25 @@ void mqtt_wss_destroy(mqtt_wss_client client) if (client->sockfd > 0) close(client->sockfd); - mqtt_wss_log_ctx_destroy(client->log); freez(client); } static int cert_verify_callback(int preverify_ok, X509_STORE_CTX *ctx) { - SSL *ssl; - X509 *err_cert; - mqtt_wss_client client; - int err = 0, depth; - char *err_str; + int err = 0; - ssl = X509_STORE_CTX_get_ex_data(ctx, SSL_get_ex_data_X509_STORE_CTX_idx()); - client = SSL_get_ex_data(ssl, 0); + SSL* ssl = X509_STORE_CTX_get_ex_data(ctx, SSL_get_ex_data_X509_STORE_CTX_idx()); + mqtt_wss_client client = SSL_get_ex_data(ssl, 0); // TODO handle depth as per https://www.openssl.org/docs/man1.0.2/man3/SSL_CTX_set_verify.html if (!preverify_ok) { err = X509_STORE_CTX_get_error(ctx); - depth = X509_STORE_CTX_get_error_depth(ctx); - err_cert = X509_STORE_CTX_get_current_cert(ctx); - err_str = X509_NAME_oneline(X509_get_subject_name(err_cert), NULL, 0); + int depth = X509_STORE_CTX_get_error_depth(ctx); + X509* err_cert = X509_STORE_CTX_get_current_cert(ctx); + char* err_str = X509_NAME_oneline(X509_get_subject_name(err_cert), NULL, 0); - mws_error(client->log, "verify error:num=%d:%s:depth=%d:%s", err, + nd_log(NDLS_DAEMON, NDLP_ERR, "verify error:num=%d:%s:depth=%d:%s", err, X509_verify_cert_error_string(err), depth, err_str); freez(err_str); @@ -298,7 +256,7 @@ static int cert_verify_callback(int preverify_ok, X509_STORE_CTX *ctx) client->ssl_flags & MQTT_WSS_SSL_ALLOW_SELF_SIGNED) { preverify_ok = 1; - mws_error(client->log, "Self Signed Certificate Accepted as the connection was " + nd_log(NDLS_DAEMON, NDLP_ERR, "Self Signed Certificate Accepted as the connection was " "requested with MQTT_WSS_SSL_ALLOW_SELF_SIGNED"); } @@ -312,16 +270,14 @@ static int cert_verify_callback(int preverify_ok, X509_STORE_CTX *ctx) #define HTTP_HDR_TERMINATOR "\x0D\x0A\x0D\x0A" #define HTTP_CODE_LEN 4 #define HTTP_REASON_MAX_LEN 512 -static int http_parse_reply(mqtt_wss_client client, rbuf_t buf) +static int http_parse_reply(rbuf_t buf) { - char *ptr; char http_code_s[4]; - int http_code; int idx; if (rbuf_memcmp_n(buf, PROXY_HTTP, strlen(PROXY_HTTP))) { if (rbuf_memcmp_n(buf, PROXY_HTTP10, strlen(PROXY_HTTP10))) { - mws_error(client->log, "http_proxy expected reply with \"" PROXY_HTTP "\" or \"" PROXY_HTTP10 "\""); + nd_log(NDLS_DAEMON, NDLP_ERR, "http_proxy expected reply with \"" PROXY_HTTP "\" or \"" PROXY_HTTP10 "\""); return 1; } } @@ -329,39 +285,37 @@ static int http_parse_reply(mqtt_wss_client client, rbuf_t buf) rbuf_bump_tail(buf, strlen(PROXY_HTTP)); if (!rbuf_pop(buf, http_code_s, 1) || http_code_s[0] != 0x20) { - mws_error(client->log, "http_proxy missing space after \"" PROXY_HTTP "\" or \"" PROXY_HTTP10 "\""); + nd_log(NDLS_DAEMON, NDLP_ERR, "http_proxy missing space after \"" PROXY_HTTP "\" or \"" PROXY_HTTP10 "\""); return 2; } if (!rbuf_pop(buf, http_code_s, HTTP_CODE_LEN)) { - mws_error(client->log, "http_proxy missing HTTP code"); + nd_log(NDLS_DAEMON, NDLP_ERR, "http_proxy missing HTTP code"); return 3; } for (int i = 0; i < HTTP_CODE_LEN - 1; i++) if (http_code_s[i] > 0x39 || http_code_s[i] < 0x30) { - mws_error(client->log, "http_proxy HTTP code non numeric"); + nd_log(NDLS_DAEMON, NDLP_ERR, "http_proxy HTTP code non numeric"); return 4; } http_code_s[HTTP_CODE_LEN - 1] = 0; - http_code = atoi(http_code_s); + int http_code = str2i(http_code_s); // TODO check if we ever have more headers here rbuf_find_bytes(buf, HTTP_ENDLINE, strlen(HTTP_ENDLINE), &idx); if (idx >= HTTP_REASON_MAX_LEN) { - mws_error(client->log, "http_proxy returned reason that is too long"); + nd_log(NDLS_DAEMON, NDLP_ERR, "http_proxy returned reason that is too long"); return 5; } if (http_code != 200) { - ptr = mallocz(idx + 1); - if (!ptr) - return 6; + char *ptr = mallocz(idx + 1); rbuf_pop(buf, ptr, idx); ptr[idx] = 0; - mws_error(client->log, "http_proxy returned error code %d \"%s\"", http_code, ptr); + nd_log(NDLS_DAEMON, NDLP_ERR, "http_proxy returned error code %d \"%s\"", http_code, ptr); freez(ptr); return 7; }/* else @@ -374,52 +328,11 @@ static int http_parse_reply(mqtt_wss_client client, rbuf_t buf) rbuf_bump_tail(buf, strlen(HTTP_HDR_TERMINATOR)); if (rbuf_bytes_available(buf)) { - mws_error(client->log, "http_proxy unexpected trailing bytes after end of HTTP hdr"); + nd_log(NDLS_DAEMON, NDLP_ERR, "http_proxy unexpected trailing bytes after end of HTTP hdr"); return 8; } - mws_debug(client->log, "http_proxy CONNECT succeeded"); - return 0; -} - -#if defined(OPENSSL_VERSION_NUMBER) && OPENSSL_VERSION_NUMBER < OPENSSL_VERSION_110 -static EVP_ENCODE_CTX *EVP_ENCODE_CTX_new(void) -{ - EVP_ENCODE_CTX *ctx = OPENSSL_malloc(sizeof(*ctx)); - - if (ctx != NULL) { - memset(ctx, 0, sizeof(*ctx)); - } - return ctx; -} -static void EVP_ENCODE_CTX_free(EVP_ENCODE_CTX *ctx) -{ - OPENSSL_free(ctx); - return; -} -#endif - -inline static int base64_encode_helper(unsigned char *out, int *outl, const unsigned char *in, int in_len) -{ - int len; - unsigned char *str = out; - EVP_ENCODE_CTX *ctx = EVP_ENCODE_CTX_new(); - EVP_EncodeInit(ctx); - EVP_EncodeUpdate(ctx, str, outl, in, in_len); - str += *outl; - EVP_EncodeFinal(ctx, str, &len); - *outl += len; - - str = out; - while(*str) { - if (*str != 0x0D && *str != 0x0A) - *out++ = *str++; - else - str++; - } - *out = 0; - - EVP_ENCODE_CTX_free(ctx); + nd_log(NDLS_DAEMON, NDLP_DEBUG, "http_proxy CONNECT succeeded"); return 0; } @@ -430,13 +343,12 @@ static int http_proxy_connect(mqtt_wss_client client) rbuf_t r_buf = rbuf_create(4096); if (!r_buf) return 1; - char *r_buf_ptr; size_t r_buf_linear_insert_capacity; poll_fd.fd = client->sockfd; poll_fd.events = POLLIN; - r_buf_ptr = rbuf_get_linear_insert_range(r_buf, &r_buf_linear_insert_capacity); + char *r_buf_ptr = rbuf_get_linear_insert_range(r_buf, &r_buf_linear_insert_capacity); snprintf(r_buf_ptr, r_buf_linear_insert_capacity,"%s %s:%d %s" HTTP_ENDLINE "Host: %s" HTTP_ENDLINE, PROXY_CONNECT, client->target_host, client->target_port, PROXY_HTTP, client->target_host); write(client->sockfd, r_buf_ptr, strlen(r_buf_ptr)); @@ -445,7 +357,7 @@ static int http_proxy_connect(mqtt_wss_client client) size_t creds_plain_len = strlen(client->proxy_uname) + strlen(client->proxy_passwd) + 2; char *creds_plain = mallocz(creds_plain_len); if (!creds_plain) { - mws_error(client->log, "OOM creds_plain"); + nd_log(NDLS_DAEMON, NDLP_ERR, "OOM creds_plain"); rc = 6; goto cleanup; } @@ -456,7 +368,7 @@ static int http_proxy_connect(mqtt_wss_client client) char *creds_base64 = mallocz(creds_base64_len + 1); if (!creds_base64) { freez(creds_plain); - mws_error(client->log, "OOM creds_base64"); + nd_log(NDLS_DAEMON, NDLP_ERR, "OOM creds_base64"); rc = 6; goto cleanup; } @@ -466,8 +378,7 @@ static int http_proxy_connect(mqtt_wss_client client) *ptr++ = ':'; strcpy(ptr, client->proxy_passwd); - int b64_len; - base64_encode_helper((unsigned char*)creds_base64, &b64_len, (unsigned char*)creds_plain, strlen(creds_plain)); + (void) netdata_base64_encode((unsigned char*)creds_base64, (unsigned char*)creds_plain, strlen(creds_plain)); freez(creds_plain); r_buf_ptr = rbuf_get_linear_insert_range(r_buf, &r_buf_linear_insert_capacity); @@ -482,13 +393,13 @@ static int http_proxy_connect(mqtt_wss_client client) // or timeout while ((rc = poll(&poll_fd, 1, 1000)) >= 0) { if (!rc) { - mws_error(client->log, "http_proxy timeout waiting reply from proxy server"); + nd_log(NDLS_DAEMON, NDLP_ERR, "http_proxy timeout waiting reply from proxy server"); rc = 2; goto cleanup; } r_buf_ptr = rbuf_get_linear_insert_range(r_buf, &r_buf_linear_insert_capacity); if (!r_buf_ptr) { - mws_error(client->log, "http_proxy read ring buffer full"); + nd_log(NDLS_DAEMON, NDLP_ERR, "http_proxy read ring buffer full"); rc = 3; goto cleanup; } @@ -496,20 +407,20 @@ static int http_proxy_connect(mqtt_wss_client client) if (errno == EWOULDBLOCK || errno == EAGAIN) { continue; } - mws_error(client->log, "http_proxy error reading from socket \"%s\"", strerror(errno)); + nd_log(NDLS_DAEMON, NDLP_ERR, "http_proxy error reading from socket \"%s\"", strerror(errno)); rc = 4; goto cleanup; } rbuf_bump_head(r_buf, rc); if (rbuf_find_bytes(r_buf, HTTP_HDR_TERMINATOR, strlen(HTTP_HDR_TERMINATOR), &rc)) { rc = 0; - if (http_parse_reply(client, r_buf)) + if (http_parse_reply(r_buf)) rc = 5; goto cleanup; } } - mws_error(client->log, "proxy negotiation poll error \"%s\"", strerror(errno)); + nd_log(NDLS_DAEMON, NDLP_ERR, "proxy negotiation poll error \"%s\"", strerror(errno)); rc = 5; cleanup: rbuf_free(r_buf); @@ -522,11 +433,11 @@ int mqtt_wss_connect( int port, struct mqtt_connect_params *mqtt_params, int ssl_flags, - struct mqtt_wss_proxy *proxy, + const struct mqtt_wss_proxy *proxy, bool *fallback_ipv4) { if (!mqtt_params) { - mws_error(client->log, "mqtt_params can't be null!"); + nd_log(NDLS_DAEMON, NDLP_ERR, "mqtt_params can't be null!"); return -1; } @@ -583,7 +494,7 @@ int mqtt_wss_connect( struct timeval timeout = { .tv_sec = 10, .tv_usec = 0 }; int fd = connect_to_this_ip46(IPPROTO_TCP, SOCK_STREAM, client->host, 0, port_str, &timeout, fallback_ipv4); if (fd < 0) { - mws_error(client->log, "Could not connect to remote endpoint \"%s\", port %d.\n", client->host, port); + nd_log(NDLS_DAEMON, NDLP_ERR, "Could not connect to remote endpoint \"%s\", port %d.\n", client->host, port); return -3; } @@ -598,12 +509,12 @@ int mqtt_wss_connect( int flag = 1; int result = setsockopt(client->sockfd, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(int)); if (result < 0) - mws_error(client->log, "Could not dissable NAGLE"); + nd_log(NDLS_DAEMON, NDLP_ERR, "Could not dissable NAGLE"); client->poll_fds[POLLFD_SOCKET].fd = client->sockfd; if (fcntl(client->sockfd, F_SETFL, fcntl(client->sockfd, F_GETFL, 0) | O_NONBLOCK) == -1) { - mws_error(client->log, "Error setting O_NONBLOCK to TCP socket. \"%s\"", strerror(errno)); + nd_log(NDLS_DAEMON, NDLP_ERR, "Error setting O_NONBLOCK to TCP socket. \"%s\"", strerror(errno)); return -8; } @@ -619,7 +530,7 @@ int mqtt_wss_connect( SSL_library_init(); #else if (OPENSSL_init_ssl(OPENSSL_INIT_LOAD_CONFIG, NULL) != 1) { - mws_error(client->log, "Failed to initialize SSL"); + nd_log(NDLS_DAEMON, NDLP_ERR, "Failed to initialize SSL"); return -1; }; #endif @@ -636,7 +547,7 @@ int mqtt_wss_connect( SSL_CTX_set_default_verify_paths(client->ssl_ctx); SSL_CTX_set_verify(client->ssl_ctx, SSL_VERIFY_PEER | SSL_VERIFY_CLIENT_ONCE, cert_verify_callback); } else - mws_error(client->log, "SSL Certificate checking completely disabled!!!"); + nd_log(NDLS_DAEMON, NDLP_ERR, "SSL Certificate checking completely disabled!!!"); #ifdef MQTT_WSS_DEBUG if(client->ssl_ctx_keylog_cb) @@ -646,7 +557,7 @@ int mqtt_wss_connect( client->ssl = SSL_new(client->ssl_ctx); if (!(client->ssl_flags & MQTT_WSS_SSL_DONT_CHECK_CERTS)) { if (!SSL_set_ex_data(client->ssl, 0, client)) { - mws_error(client->log, "Could not SSL_set_ex_data"); + nd_log(NDLS_DAEMON, NDLP_ERR, "Could not SSL_set_ex_data"); return -4; } } @@ -654,27 +565,27 @@ int mqtt_wss_connect( SSL_set_connect_state(client->ssl); if (!SSL_set_tlsext_host_name(client->ssl, client->target_host)) { - mws_error(client->log, "Error setting TLS SNI host"); + nd_log(NDLS_DAEMON, NDLP_ERR, "Error setting TLS SNI host"); return -7; } result = SSL_connect(client->ssl); if (result != -1 && result != 1) { - mws_error(client->log, "SSL could not connect"); + nd_log(NDLS_DAEMON, NDLP_ERR, "SSL could not connect"); return -5; } if (result == -1) { int ec = SSL_get_error(client->ssl, result); if (ec != SSL_ERROR_WANT_READ && ec != SSL_ERROR_WANT_WRITE) { - mws_error(client->log, "Failed to start SSL connection"); + nd_log(NDLS_DAEMON, NDLP_ERR, "Failed to start SSL connection"); return -6; } } client->mqtt_keepalive = (mqtt_params->keep_alive ? mqtt_params->keep_alive : 400); - mws_info(client->log, "Going to connect using internal MQTT 5 implementation"); + nd_log(NDLS_DAEMON, NDLP_INFO, "Going to connect using internal MQTT 5 implementation"); struct mqtt_auth_properties auth; auth.client_id = (char*)mqtt_params->clientid; auth.client_id_free = NULL; @@ -694,7 +605,7 @@ int mqtt_wss_connect( int ret = mqtt_ng_connect(client->mqtt, &auth, mqtt_params->will_msg ? &lwt : NULL, 1, client->mqtt_keepalive); if (ret) { - mws_error(client->log, "Error generating MQTT connect"); + nd_log(NDLS_DAEMON, NDLP_ERR, "Error generating MQTT connect"); return 1; } @@ -703,7 +614,7 @@ int mqtt_wss_connect( // wait till MQTT connection is established while (!client->mqtt_connected) { if(mqtt_wss_service(client, -1)) { - mws_error(client->log, "Error connecting to MQTT WSS server \"%s\", port %d.", host, port); + nd_log(NDLS_DAEMON, NDLP_ERR, "Error connecting to MQTT WSS server \"%s\", port %d.", host, port); return 2; } } @@ -716,14 +627,14 @@ int mqtt_wss_connect( #define NSEC_PER_MSEC 1000000ULL #define NSEC_PER_SEC 1000000000ULL -static inline uint64_t boottime_usec(mqtt_wss_client client) { +static uint64_t boottime_usec(void) { struct timespec ts; #if defined(__APPLE__) || defined(__FreeBSD__) if (clock_gettime(CLOCK_MONOTONIC, &ts) == -1) { #else if (clock_gettime(CLOCK_BOOTTIME, &ts) == -1) { #endif - mws_error(client->log, "clock_gettimte failed"); + nd_log(NDLS_DAEMON, NDLP_ERR, "clock_gettimte failed"); return 0; } return (uint64_t)ts.tv_sec * USEC_PER_SEC + (ts.tv_nsec % NSEC_PER_SEC) / NSEC_PER_USEC; @@ -732,7 +643,7 @@ static inline uint64_t boottime_usec(mqtt_wss_client client) { #define MWS_TIMED_OUT 1 #define MWS_ERROR 2 #define MWS_OK 0 -static inline const char *mqtt_wss_error_tos(int ec) +static const char *mqtt_wss_error_tos(int ec) { switch(ec) { case MWS_TIMED_OUT: @@ -745,13 +656,12 @@ static inline const char *mqtt_wss_error_tos(int ec) } -static inline int mqtt_wss_service_all(mqtt_wss_client client, int timeout_ms) +static int mqtt_wss_service_all(mqtt_wss_client client, int timeout_ms) { - uint64_t exit_by = boottime_usec(client) + (timeout_ms * NSEC_PER_MSEC); - uint64_t now; + uint64_t exit_by = boottime_usec() + (timeout_ms * NSEC_PER_MSEC); client->poll_fds[POLLFD_SOCKET].events |= POLLOUT; // TODO when entering mwtt_wss_service use out buffer size to arm POLLOUT while (rbuf_bytes_available(client->ws_client->buf_write)) { - now = boottime_usec(client); + const uint64_t now = boottime_usec(); if (now >= exit_by) return MWS_TIMED_OUT; if (mqtt_wss_service(client, exit_by - now)) @@ -762,15 +672,13 @@ static inline int mqtt_wss_service_all(mqtt_wss_client client, int timeout_ms) void mqtt_wss_disconnect(mqtt_wss_client client, int timeout_ms) { - int ret; - // block application from sending more MQTT messages client->mqtt_disconnecting = 1; // send whatever was left at the time of calling this function - ret = mqtt_wss_service_all(client, timeout_ms / 4); + int ret = mqtt_wss_service_all(client, timeout_ms / 4); if(ret) - mws_error(client->log, + nd_log(NDLS_DAEMON, NDLP_ERR, "Error while trying to send all remaining data in an attempt " "to gracefully disconnect! EC=%d Desc:\"%s\"", ret, @@ -782,7 +690,7 @@ void mqtt_wss_disconnect(mqtt_wss_client client, int timeout_ms) ret = mqtt_wss_service_all(client, timeout_ms / 4); if(ret) - mws_error(client->log, + nd_log(NDLS_DAEMON, NDLP_ERR, "Error while trying to send MQTT disconnect message in an attempt " "to gracefully disconnect! EC=%d Desc:\"%s\"", ret, @@ -795,7 +703,7 @@ void mqtt_wss_disconnect(mqtt_wss_client client, int timeout_ms) if(ret) { // Some MQTT/WSS servers will close socket on receipt of MQTT disconnect and // do not wait for WebSocket to be closed properly - mws_warn(client->log, + nd_log(NDLS_DAEMON, NDLP_WARNING, "Error while trying to send WebSocket disconnect message in an attempt " "to gracefully disconnect! EC=%d Desc:\"%s\".", ret, @@ -810,22 +718,19 @@ void mqtt_wss_disconnect(mqtt_wss_client client, int timeout_ms) client->sockfd = -1; } -static inline void mqtt_wss_wakeup(mqtt_wss_client client) +static void mqtt_wss_wakeup(mqtt_wss_client client) { -#ifdef DEBUG_ULTRA_VERBOSE - mws_debug(client->log, "mqtt_wss_wakup - forcing wake up of main loop"); -#endif write(client->write_notif_pipe[PIPE_WRITE_END], " ", 1); } #define THROWAWAY_BUF_SIZE 32 char throwaway[THROWAWAY_BUF_SIZE]; -static inline void util_clear_pipe(int fd) +static void util_clear_pipe(int fd) { (void)read(fd, throwaway, THROWAWAY_BUF_SIZE); } -static inline void set_socket_pollfds(mqtt_wss_client client, int ssl_ret) { +static void set_socket_pollfds(mqtt_wss_client client, int ssl_ret) { if (ssl_ret == SSL_ERROR_WANT_WRITE) client->poll_fds[POLLFD_SOCKET].events |= POLLOUT; if (ssl_ret == SSL_ERROR_WANT_READ) @@ -836,27 +741,25 @@ static int handle_mqtt_internal(mqtt_wss_client client) { int rc = mqtt_ng_sync(client->mqtt); if (rc) { - mws_error(client->log, "mqtt_ng_sync returned %d != 0", rc); + nd_log(NDLS_DAEMON, NDLP_ERR, "mqtt_ng_sync returned %d != 0", rc); client->mqtt_connected = 0; return 1; } return 0; } -#define SEC_TO_MSEC 1000 -static inline long long int t_till_next_keepalive_ms(mqtt_wss_client client) +static int t_till_next_keepalive_ms(mqtt_wss_client client) { time_t last_send = mqtt_ng_last_send_time(client->mqtt); - long long int next_mqtt_keep_alive = (last_send * SEC_TO_MSEC) - + (client->mqtt_keepalive * (SEC_TO_MSEC * 0.75 /* SEND IN ADVANCE */)); - return(next_mqtt_keep_alive - (time(NULL) * SEC_TO_MSEC)); + time_t next_mqtt_keep_alive = last_send + client->mqtt_keepalive * 0.75; + return ((next_mqtt_keep_alive - now_realtime_sec()) * MSEC_PER_SEC); } #ifdef MQTT_WSS_CPUSTATS -static inline uint64_t mqtt_wss_now_usec(mqtt_wss_client client) { +static uint64_t mqtt_wss_now_usec(void) { struct timespec ts; if(clock_gettime(CLOCK_MONOTONIC, &ts) == -1) { - mws_error(client->log, "clock_gettime(CLOCK_MONOTONIC, ×pec) failed."); + nd_log(NDLS_DAEMON, NDLP_ERR, "clock_gettime(CLOCK_MONOTONIC, ×pec) failed."); return 0; } return (uint64_t)ts.tv_sec * USEC_PER_SEC + (ts.tv_nsec % NSEC_PER_SEC) / NSEC_PER_USEC; @@ -871,63 +774,51 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms) int send_keepalive = 0; #ifdef MQTT_WSS_CPUSTATS - uint64_t t1,t2; - t1 = mqtt_wss_now_usec(client); -#endif - -#ifdef DEBUG_ULTRA_VERBOSE - mws_debug(client->log, ">>>>> mqtt_wss_service <<<<<"); - mws_debug(client->log, "Waiting for events: %s%s%s", - (client->poll_fds[POLLFD_SOCKET].events & POLLIN) ? "SOCKET_POLLIN " : "", - (client->poll_fds[POLLFD_SOCKET].events & POLLOUT) ? "SOCKET_POLLOUT " : "", - (client->poll_fds[POLLFD_PIPE].events & POLLIN) ? "PIPE_POLLIN" : "" ); + uint64_t t2; + uint64_t t1 = mqtt_wss_now_usec(); #endif // Check user requested TO doesn't interfere with MQTT keep alives - long long int till_next_keep_alive = t_till_next_keepalive_ms(client); - if (client->mqtt_connected && (timeout_ms < 0 || timeout_ms >= till_next_keep_alive)) { - #ifdef DEBUG_ULTRA_VERBOSE - mws_debug(client->log, "Shortening Timeout requested %d to %lld to ensure keep-alive can be sent", timeout_ms, till_next_keep_alive); - #endif - timeout_ms = till_next_keep_alive; - send_keepalive = 1; + if (!ping_timeout) { + int till_next_keep_alive = t_till_next_keepalive_ms(client); + if (till_next_keep_alive < 0) + till_next_keep_alive = 0; + if (client->mqtt_connected && (timeout_ms < 0 || timeout_ms >= till_next_keep_alive)) { + timeout_ms = till_next_keep_alive; + send_keepalive = 1; + } } #ifdef MQTT_WSS_CPUSTATS - t2 = mqtt_wss_now_usec(client); + t2 = mqtt_wss_now_usec(); client->stats.time_keepalive += t2 - t1; #endif if ((ret = poll(client->poll_fds, 2, timeout_ms >= 0 ? timeout_ms : -1)) < 0) { if (errno == EINTR) { - mws_warn(client->log, "poll interrupted by EINTR"); + nd_log(NDLS_DAEMON, NDLP_WARNING, "poll interrupted by EINTR"); return 0; } - mws_error(client->log, "poll error \"%s\"", strerror(errno)); + nd_log(NDLS_DAEMON, NDLP_ERR, "poll error \"%s\"", strerror(errno)); return -2; } -#ifdef DEBUG_ULTRA_VERBOSE - mws_debug(client->log, "Poll events happened: %s%s%s%s", - (client->poll_fds[POLLFD_SOCKET].revents & POLLIN) ? "SOCKET_POLLIN " : "", - (client->poll_fds[POLLFD_SOCKET].revents & POLLOUT) ? "SOCKET_POLLOUT " : "", - (client->poll_fds[POLLFD_PIPE].revents & POLLIN) ? "PIPE_POLLIN " : "", - (!ret) ? "POLL_TIMEOUT" : ""); -#endif - #ifdef MQTT_WSS_CPUSTATS - t1 = mqtt_wss_now_usec(client); + t1 = mqtt_wss_now_usec(); #endif if (ret == 0) { + time_t now = now_realtime_sec(); if (send_keepalive) { // otherwise we shortened the timeout ourselves to take care of // MQTT keep alives -#ifdef DEBUG_ULTRA_VERBOSE - mws_debug(client->log, "Forcing MQTT Ping/keep-alive"); -#endif mqtt_ng_ping(client->mqtt); + ping_timeout = now + PING_TIMEOUT; } else { + if (ping_timeout && ping_timeout < now) { + disconnect_req = ACLK_PING_TIMEOUT; + ping_timeout = 0; + } // if poll timed out and user requested timeout was being used // return here let user do his work and he will call us back soon return 0; @@ -935,7 +826,7 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms) } #ifdef MQTT_WSS_CPUSTATS - t2 = mqtt_wss_now_usec(client); + t2 = mqtt_wss_now_usec(); client->stats.time_keepalive += t2 - t1; #endif @@ -943,9 +834,6 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms) if ((ptr = rbuf_get_linear_insert_range(client->ws_client->buf_read, &size))) { if((ret = SSL_read(client->ssl, ptr, size)) > 0) { -#ifdef DEBUG_ULTRA_VERBOSE - mws_debug(client->log, "SSL_Read: Read %d.", ret); -#endif spinlock_lock(&client->stat_lock); client->stats.bytes_rx += ret; spinlock_unlock(&client->stat_lock); @@ -953,22 +841,19 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms) } else { int errnobkp = errno; ret = SSL_get_error(client->ssl, ret); -#ifdef DEBUG_ULTRA_VERBOSE - mws_debug(client->log, "Read Err: %s", util_openssl_ret_err(ret)); -#endif set_socket_pollfds(client, ret); if (ret != SSL_ERROR_WANT_READ && ret != SSL_ERROR_WANT_WRITE) { - mws_error(client->log, "SSL_read error: %d %s", ret, util_openssl_ret_err(ret)); + nd_log(NDLS_DAEMON, NDLP_ERR, "SSL_read error: %d %s", ret, util_openssl_ret_err(ret)); if (ret == SSL_ERROR_SYSCALL) - mws_error(client->log, "SSL_read SYSCALL errno: %d %s", errnobkp, strerror(errnobkp)); + nd_log(NDLS_DAEMON, NDLP_ERR, "SSL_read SYSCALL errno: %d %s", errnobkp, strerror(errnobkp)); return MQTT_WSS_ERR_CONN_DROP; } } } #ifdef MQTT_WSS_CPUSTATS - t1 = mqtt_wss_now_usec(client); + t1 = mqtt_wss_now_usec(); client->stats.time_read_socket += t1 - t2; #endif @@ -976,18 +861,20 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms) switch(ret) { case WS_CLIENT_PROTOCOL_ERROR: return MQTT_WSS_ERR_PROTO_WS; + case WS_CLIENT_NEED_MORE_BYTES: -#ifdef DEBUG_ULTRA_VERBOSE - mws_debug(client->log, "WSCLIENT WANT READ"); -#endif client->poll_fds[POLLFD_SOCKET].events |= POLLIN; break; + case WS_CLIENT_CONNECTION_CLOSED: return MQTT_WSS_ERR_CONN_DROP; + + default: + return MQTT_WSS_ERR_PROTO_WS; } #ifdef MQTT_WSS_CPUSTATS - t2 = mqtt_wss_now_usec(client); + t2 = mqtt_wss_now_usec(); client->stats.time_process_websocket += t2 - t1; #endif @@ -1002,18 +889,12 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms) } #ifdef MQTT_WSS_CPUSTATS - t1 = mqtt_wss_now_usec(client); + t1 = mqtt_wss_now_usec(); client->stats.time_process_mqtt += t1 - t2; #endif if ((ptr = rbuf_get_linear_read_range(client->ws_client->buf_write, &size))) { -#ifdef DEBUG_ULTRA_VERBOSE - mws_debug(client->log, "Have data to write to SSL"); -#endif if ((ret = SSL_write(client->ssl, ptr, size)) > 0) { -#ifdef DEBUG_ULTRA_VERBOSE - mws_debug(client->log, "SSL_Write: Written %d of avail %d.", ret, size); -#endif spinlock_lock(&client->stat_lock); client->stats.bytes_tx += ret; spinlock_unlock(&client->stat_lock); @@ -1021,15 +902,12 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms) } else { int errnobkp = errno; ret = SSL_get_error(client->ssl, ret); -#ifdef DEBUG_ULTRA_VERBOSE - mws_debug(client->log, "Write Err: %s", util_openssl_ret_err(ret)); -#endif set_socket_pollfds(client, ret); if (ret != SSL_ERROR_WANT_READ && ret != SSL_ERROR_WANT_WRITE) { - mws_error(client->log, "SSL_write error: %d %s", ret, util_openssl_ret_err(ret)); + nd_log(NDLS_DAEMON, NDLP_ERR, "SSL_write error: %d %s", ret, util_openssl_ret_err(ret)); if (ret == SSL_ERROR_SYSCALL) - mws_error(client->log, "SSL_write SYSCALL errno: %d %s", errnobkp, strerror(errnobkp)); + nd_log(NDLS_DAEMON, NDLP_ERR, "SSL_write SYSCALL errno: %d %s", errnobkp, strerror(errnobkp)); return MQTT_WSS_ERR_CONN_DROP; } } @@ -1039,7 +917,7 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms) util_clear_pipe(client->write_notif_pipe[PIPE_READ_END]); #ifdef MQTT_WSS_CPUSTATS - t2 = mqtt_wss_now_usec(client); + t2 = mqtt_wss_now_usec(); client->stats.time_write_socket += t2 - t1; #endif @@ -1056,12 +934,12 @@ int mqtt_wss_publish5(mqtt_wss_client client, uint16_t *packet_id) { if (client->mqtt_disconnecting) { - mws_error(client->log, "mqtt_wss is disconnecting can't publish"); + nd_log(NDLS_DAEMON, NDLP_ERR, "mqtt_wss is disconnecting can't publish"); return 1; } if (!client->mqtt_connected) { - mws_error(client->log, "MQTT is offline. Can't send message."); + nd_log(NDLS_DAEMON, NDLP_ERR, "MQTT is offline. Can't send message."); return 1; } uint8_t mqtt_flags = 0; @@ -1072,7 +950,7 @@ int mqtt_wss_publish5(mqtt_wss_client client, int rc = mqtt_ng_publish(client->mqtt, topic, topic_free, msg, msg_free, msg_len, mqtt_flags, packet_id); if (rc == MQTT_NG_MSGGEN_MSG_TOO_BIG) - return MQTT_WSS_ERR_TOO_BIG_FOR_SERVER; + return MQTT_WSS_ERR_MSG_TOO_BIG; mqtt_wss_wakeup(client); @@ -1083,12 +961,12 @@ int mqtt_wss_subscribe(mqtt_wss_client client, char *topic, int max_qos_level) { (void)max_qos_level; //TODO now hardcoded if (!client->mqtt_connected) { - mws_error(client->log, "MQTT is offline. Can't subscribe."); + nd_log(NDLS_DAEMON, NDLP_ERR, "MQTT is offline. Can't subscribe."); return 1; } if (client->mqtt_disconnecting) { - mws_error(client->log, "mqtt_wss is disconnecting can't subscribe"); + nd_log(NDLS_DAEMON, NDLP_ERR, "mqtt_wss is disconnecting can't subscribe"); return 1; } diff --git a/src/aclk/mqtt_websockets/mqtt_wss_client.h b/src/aclk/mqtt_websockets/mqtt_wss_client.h index f0bdce98b..2fd94075d 100644 --- a/src/aclk/mqtt_websockets/mqtt_wss_client.h +++ b/src/aclk/mqtt_websockets/mqtt_wss_client.h @@ -1,56 +1,36 @@ -// SPDX-License-Identifier: GPL-3.0-only -// Copyright (C) 2020 Timotej Šiškovič +// SPDX-License-Identifier: GPL-3.0-or-later #ifndef MQTT_WSS_CLIENT_H #define MQTT_WSS_CLIENT_H -#include <stdbool.h> -#include <stdint.h> -#include <stddef.h> //size_t - -#include "mqtt_wss_log.h" #include "common_public.h" -// All OK call me at your earliest convinience -#define MQTT_WSS_OK 0 -/* All OK, poll timeout you requested when calling mqtt_wss_service expired - you might want to know if timeout - * happened or we got some data or handle same as MQTT_WSS_OK - */ -#define MQTT_WSS_OK_TO 1 -// Connection was closed by remote -#define MQTT_WSS_ERR_CONN_DROP -1 -// Error in MQTT protocol (e.g. malformed packet) -#define MQTT_WSS_ERR_PROTO_MQTT -2 -// Error in WebSocket protocol (e.g. malformed packet) -#define MQTT_WSS_ERR_PROTO_WS -3 - -#define MQTT_WSS_ERR_TX_BUF_TOO_SMALL -4 -#define MQTT_WSS_ERR_RX_BUF_TOO_SMALL -5 - -#define MQTT_WSS_ERR_TOO_BIG_FOR_SERVER -6 -// if client was initialized with MQTT 3 but MQTT 5 feature -// was requested by user of library -#define MQTT_WSS_ERR_CANT_DO -8 + +#define MQTT_WSS_OK 0 // All OK call me at your earliest convinience +#define MQTT_WSS_OK_TO 1 // All OK, poll timeout you requested when calling mqtt_wss_service expired + //you might want to know if timeout + //happened or we got some data or handle same as MQTT_WSS_OK +#define MQTT_WSS_ERR_CONN_DROP -1 // Connection was closed by remote +#define MQTT_WSS_ERR_PROTO_MQTT -2 // Error in MQTT protocol (e.g. malformed packet) +#define MQTT_WSS_ERR_PROTO_WS -3 // Error in WebSocket protocol (e.g. malformed packet) +#define MQTT_WSS_ERR_MSG_TOO_BIG -6 // Message size too big for server +#define MQTT_WSS_ERR_CANT_DO -8 // if client was initialized with MQTT 3 but MQTT 5 feature + // was requested by user of library typedef struct mqtt_wss_client_struct *mqtt_wss_client; typedef void (*msg_callback_fnc_t)(const char *topic, const void *msg, size_t msglen, int qos); + /* Creates new instance of MQTT over WSS. Doesn't start connection. - * @param log_prefix this is prefix to be used when logging to discern between multiple - * mqtt_wss instances. Can be NULL. - * @param log_callback is function pointer to fnc to be called when mqtt_wss wants - * to log. This allows plugging this library into your own logging system/solution. - * If NULL STDOUT/STDERR will be used. * @param msg_callback is function pointer to function which will be called * when application level message arrives from broker (for subscribed topics). * Can be NULL if you are not interested about incoming messages. * @param puback_callback is function pointer to function to be called when QOS1 Publish * is acknowledged by server */ -mqtt_wss_client mqtt_wss_new(const char *log_prefix, - mqtt_wss_log_callback_t log_callback, - msg_callback_fnc_t msg_callback, - void (*puback_callback)(uint16_t packet_id)); +mqtt_wss_client mqtt_wss_new( + msg_callback_fnc_t msg_callback, + void (*puback_callback)(uint16_t packet_id)); void mqtt_wss_set_max_buf_size(mqtt_wss_client client, size_t size); @@ -76,7 +56,7 @@ int mqtt_wss_connect( int port, struct mqtt_connect_params *mqtt_params, int ssl_flags, - struct mqtt_wss_proxy *proxy, + const struct mqtt_wss_proxy *proxy, bool *fallback_ipv4); int mqtt_wss_service(mqtt_wss_client client, int timeout_ms); void mqtt_wss_disconnect(mqtt_wss_client client, int timeout_ms); diff --git a/src/aclk/mqtt_websockets/mqtt_wss_log.c b/src/aclk/mqtt_websockets/mqtt_wss_log.c deleted file mode 100644 index e5da76fcf..000000000 --- a/src/aclk/mqtt_websockets/mqtt_wss_log.c +++ /dev/null @@ -1,130 +0,0 @@ -// Copyright: SPDX-License-Identifier: GPL-3.0-only - -#include <stdlib.h> -#include <stdarg.h> -#include <string.h> -#include <stdio.h> - -#include "mqtt_wss_log.h" -#include "common_internal.h" - -struct mqtt_wss_log_ctx { - mqtt_wss_log_callback_t extern_log_fnc; - char *ctx_prefix; - char *buffer; - char *buffer_w_ptr; - size_t buffer_bytes_avail; -}; - -#define LOG_BUFFER_SIZE 1024 * 4 -#define LOG_CTX_PREFIX_SEV_STR " : " -#define LOG_CTX_PREFIX_LIMIT 15 -#define LOG_CTX_PREFIX_LIMIT_STR (LOG_CTX_PREFIX_LIMIT - (2 + strlen(LOG_CTX_PREFIX_SEV_STR))) // with [] characters and affixed ' ' it is total 15 chars -#if (LOG_CTX_PREFIX_LIMIT * 10) > LOG_BUFFER_SIZE -#error "LOG_BUFFER_SIZE too small" -#endif -mqtt_wss_log_ctx_t mqtt_wss_log_ctx_create(const char *ctx_prefix, mqtt_wss_log_callback_t log_callback) -{ - mqtt_wss_log_ctx_t ctx = callocz(1, sizeof(struct mqtt_wss_log_ctx)); - if(!ctx) - return NULL; - - if(log_callback) { - ctx->extern_log_fnc = log_callback; - ctx->buffer = callocz(1, LOG_BUFFER_SIZE); - if(!ctx->buffer) - goto cleanup; - - ctx->buffer_w_ptr = ctx->buffer; - if(ctx_prefix) { - *(ctx->buffer_w_ptr++) = '['; - strncpy(ctx->buffer_w_ptr, ctx_prefix, LOG_CTX_PREFIX_LIMIT_STR); - ctx->buffer_w_ptr += strnlen(ctx_prefix, LOG_CTX_PREFIX_LIMIT_STR); - *(ctx->buffer_w_ptr++) = ']'; - } - strcpy(ctx->buffer_w_ptr, LOG_CTX_PREFIX_SEV_STR); - ctx->buffer_w_ptr += strlen(LOG_CTX_PREFIX_SEV_STR); - // no term '\0' -> calloc is used - - ctx->buffer_bytes_avail = LOG_BUFFER_SIZE - strlen(ctx->buffer); - - return ctx; - } - - if(ctx_prefix) { - ctx->ctx_prefix = strndup(ctx_prefix, LOG_CTX_PREFIX_LIMIT_STR); - if(!ctx->ctx_prefix) - goto cleanup; - } - - return ctx; - -cleanup: - freez(ctx); - return NULL; -} - -void mqtt_wss_log_ctx_destroy(mqtt_wss_log_ctx_t ctx) -{ - freez(ctx->ctx_prefix); - freez(ctx->buffer); - freez(ctx); -} - -static inline char severity_to_c(int severity) -{ - switch (severity) { - case MQTT_WSS_LOG_FATAL: - return 'F'; - case MQTT_WSS_LOG_ERROR: - return 'E'; - case MQTT_WSS_LOG_WARN: - return 'W'; - case MQTT_WSS_LOG_INFO: - return 'I'; - case MQTT_WSS_LOG_DEBUG: - return 'D'; - default: - return '?'; - } -} - -void mws_log(int severity, mqtt_wss_log_ctx_t ctx, const char *fmt, va_list args) -{ - size_t size; - - if(ctx->extern_log_fnc) { - size = vsnprintf(ctx->buffer_w_ptr, ctx->buffer_bytes_avail, fmt, args); - *(ctx->buffer_w_ptr - 3) = severity_to_c(severity); - - ctx->extern_log_fnc(severity, ctx->buffer); - - if(size >= ctx->buffer_bytes_avail) - mws_error(ctx, "Last message of this type was truncated! Consider what you log or increase LOG_BUFFER_SIZE if really needed."); - - return; - } - - if(ctx->ctx_prefix) - printf("[%s] ", ctx->ctx_prefix); - - printf("%c: ", severity_to_c(severity)); - - vprintf(fmt, args); - putchar('\n'); -} - -#define DEFINE_MWS_SEV_FNC(severity_fncname, severity) \ -void mws_ ## severity_fncname(mqtt_wss_log_ctx_t ctx, const char *fmt, ...) \ -{ \ - va_list args; \ - va_start(args, fmt); \ - mws_log(severity, ctx, fmt, args); \ - va_end(args); \ -} - -DEFINE_MWS_SEV_FNC(fatal, MQTT_WSS_LOG_FATAL) -DEFINE_MWS_SEV_FNC(error, MQTT_WSS_LOG_ERROR) -DEFINE_MWS_SEV_FNC(warn, MQTT_WSS_LOG_WARN ) -DEFINE_MWS_SEV_FNC(info, MQTT_WSS_LOG_INFO ) -DEFINE_MWS_SEV_FNC(debug, MQTT_WSS_LOG_DEBUG) diff --git a/src/aclk/mqtt_websockets/mqtt_wss_log.h b/src/aclk/mqtt_websockets/mqtt_wss_log.h deleted file mode 100644 index 6ae60d870..000000000 --- a/src/aclk/mqtt_websockets/mqtt_wss_log.h +++ /dev/null @@ -1,39 +0,0 @@ -// Copyright: SPDX-License-Identifier: GPL-3.0-only - -#ifndef MQTT_WSS_LOG_H -#define MQTT_WSS_LOG_H - -typedef enum mqtt_wss_log_type { - MQTT_WSS_LOG_DEBUG = 0x01, - MQTT_WSS_LOG_INFO = 0x02, - MQTT_WSS_LOG_WARN = 0x03, - MQTT_WSS_LOG_ERROR = 0x81, - MQTT_WSS_LOG_FATAL = 0x88 -} mqtt_wss_log_type_t; - -typedef void (*mqtt_wss_log_callback_t)(mqtt_wss_log_type_t, const char*); - -typedef struct mqtt_wss_log_ctx *mqtt_wss_log_ctx_t; - -/** Creates logging context with optional prefix and optional callback - * @param ctx_prefix String to be prefixed to every log message. - * This is useful if multiple clients are instantiated to be able to - * know which one this message belongs to. Can be `NULL` for no prefix. - * @param log_callback Callback to be called instead of logging to - * `STDOUT` or `STDERR` (if debug enabled otherwise silent). Callback has to be - * pointer to function of `void function(mqtt_wss_log_type_t, const char*)` type. - * If `NULL` default will be used (silent or STDERR/STDOUT). - * @return mqtt_wss_log_ctx_t or `NULL` on error */ -mqtt_wss_log_ctx_t mqtt_wss_log_ctx_create(const char *ctx_prefix, mqtt_wss_log_callback_t log_callback); - -/** Destroys logging context and cleans up the memory - * @param ctx Context to destroy */ -void mqtt_wss_log_ctx_destroy(mqtt_wss_log_ctx_t ctx); - -void mws_fatal(mqtt_wss_log_ctx_t ctx, const char *fmt, ...); -void mws_error(mqtt_wss_log_ctx_t ctx, const char *fmt, ...); -void mws_warn (mqtt_wss_log_ctx_t ctx, const char *fmt, ...); -void mws_info (mqtt_wss_log_ctx_t ctx, const char *fmt, ...); -void mws_debug(mqtt_wss_log_ctx_t ctx, const char *fmt, ...); - -#endif /* MQTT_WSS_LOG_H */ diff --git a/src/aclk/mqtt_websockets/ws_client.c b/src/aclk/mqtt_websockets/ws_client.c index a6b9b23f3..99ea266c8 100644 --- a/src/aclk/mqtt_websockets/ws_client.c +++ b/src/aclk/mqtt_websockets/ws_client.c @@ -1,103 +1,43 @@ -// Copyright (C) 2020 Timotej Šiškovič -// SPDX-License-Identifier: GPL-3.0-only -// -// This program is free software: you can redistribute it and/or modify it -// under the terms of the GNU General Public License as published by the Free Software Foundation, version 3. -// -// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; -// without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. -// See the GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License along with this program. -// If not, see <https://www.gnu.org/licenses/>. - -#include <fcntl.h> -#include <unistd.h> -#include <string.h> -#include <errno.h> -#include <ctype.h> - -#include <openssl/evp.h> +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "libnetdata/libnetdata.h" #include "ws_client.h" #include "common_internal.h" -#ifdef MQTT_WEBSOCKETS_DEBUG -#include "../c-rbuf/src/ringbuffer_internal.h" -#endif - -#define UNIT_LOG_PREFIX "ws_client: " -#define FATAL(fmt, ...) mws_fatal(client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__) -#define ERROR(fmt, ...) mws_error(client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__) -#define WARN(fmt, ...) mws_warn (client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__) -#define INFO(fmt, ...) mws_info (client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__) -#define DEBUG(fmt, ...) mws_debug(client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__) - const char *websocket_upgrage_hdr = "GET /mqtt HTTP/1.1\x0D\x0A" "Host: %s\x0D\x0A" "Upgrade: websocket\x0D\x0A" "Connection: Upgrade\x0D\x0A" "Sec-WebSocket-Key: %s\x0D\x0A" - "Origin: http://example.com\x0D\x0A" + "Origin: \x0D\x0A" "Sec-WebSocket-Protocol: mqtt\x0D\x0A" "Sec-WebSocket-Version: 13\x0D\x0A\x0D\x0A"; const char *mqtt_protoid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; #define DEFAULT_RINGBUFFER_SIZE (1024*128) -#define ENTROPY_SOURCE "/dev/urandom" -ws_client *ws_client_new(size_t buf_size, char **host, mqtt_wss_log_ctx_t log) -{ - ws_client *client; +ws_client *ws_client_new(size_t buf_size, char **host) +{ if(!host) return NULL; - client = callocz(1, sizeof(ws_client)); - if (!client) - return NULL; - + ws_client *client = callocz(1, sizeof(ws_client)); client->host = host; - client->log = log; - client->buf_read = rbuf_create(buf_size ? buf_size : DEFAULT_RINGBUFFER_SIZE); - if (!client->buf_read) - goto cleanup; - client->buf_write = rbuf_create(buf_size ? buf_size : DEFAULT_RINGBUFFER_SIZE); - if (!client->buf_write) - goto cleanup_1; - client->buf_to_mqtt = rbuf_create(buf_size ? buf_size : DEFAULT_RINGBUFFER_SIZE); - if (!client->buf_to_mqtt) - goto cleanup_2; - - client->entropy_fd = open(ENTROPY_SOURCE, O_RDONLY | O_CLOEXEC); - if (client->entropy_fd < 1) { - ERROR("Error opening entropy source \"" ENTROPY_SOURCE "\". Reason: \"%s\"", strerror(errno)); - goto cleanup_3; - } return client; - -cleanup_3: - rbuf_free(client->buf_to_mqtt); -cleanup_2: - rbuf_free(client->buf_write); -cleanup_1: - rbuf_free(client->buf_read); -cleanup: - freez(client); - return NULL; } void ws_client_free_headers(ws_client *client) { struct http_header *ptr = client->hs.headers; - struct http_header *tmp; while (ptr) { - tmp = ptr; + struct http_header *tmp = ptr; ptr = ptr->next; freez(tmp); } @@ -112,7 +52,6 @@ void ws_client_destroy(ws_client *client) ws_client_free_headers(client); freez(client->hs.nonce_reply); freez(client->hs.http_reply_msg); - close(client->entropy_fd); rbuf_free(client->buf_read); rbuf_free(client->buf_write); rbuf_free(client->buf_to_mqtt); @@ -141,7 +80,7 @@ void ws_client_reset(ws_client *client) int ws_client_add_http_header(ws_client *client, struct http_header *hdr) { if (client->hs.hdr_count > MAX_HTTP_HDR_COUNT) { - ERROR("Too many HTTP response header fields"); + nd_log(NDLS_DAEMON, NDLP_ERR, "Too many HTTP response header fields"); return -1; } @@ -156,7 +95,7 @@ int ws_client_add_http_header(ws_client *client, struct http_header *hdr) return 0; } -int ws_client_want_write(ws_client *client) +int ws_client_want_write(const ws_client *client) { return rbuf_bytes_available(client->buf_write); } @@ -165,78 +104,89 @@ int ws_client_want_write(ws_client *client) #define TEMP_BUF_SIZE 4096 int ws_client_start_handshake(ws_client *client) { - nd_uuid_t nonce; + unsigned char nonce[WEBSOCKET_NONCE_SIZE]; char nonce_b64[256]; char second[TEMP_BUF_SIZE]; unsigned int md_len; - unsigned char *digest; + unsigned char digest[EVP_MAX_MD_SIZE]; // EVP_MAX_MD_SIZE ensures enough space EVP_MD_CTX *md_ctx; const EVP_MD *md; + int rc = 1; if(!client->host || !*client->host) { - ERROR("Hostname has not been set. We should not be able to come here!"); - return 1; - } - - uuid_generate_random(nonce); - EVP_EncodeBlock((unsigned char *)nonce_b64, (const unsigned char *)nonce, WEBSOCKET_NONCE_SIZE); - snprintf(second, TEMP_BUF_SIZE, websocket_upgrage_hdr, *client->host, nonce_b64); - - if(rbuf_bytes_free(client->buf_write) < strlen(second)) { - ERROR("Write buffer capacity too low."); + nd_log(NDLS_DAEMON, NDLP_ERR, "Hostname has not been set. We should not be able to come here!"); return 1; } - rbuf_push(client->buf_write, second, strlen(second)); - client->state = WS_HANDSHAKE; - - //Calculating expected Sec-WebSocket-Accept reply - snprintf(second, TEMP_BUF_SIZE, "%s%s", nonce_b64, mqtt_protoid); + // Generate a random 16-byte nonce + os_random_bytes(nonce, sizeof(nonce)); + // Initialize the digest context #if (OPENSSL_VERSION_NUMBER < OPENSSL_VERSION_110) md_ctx = EVP_MD_CTX_create(); #else md_ctx = EVP_MD_CTX_new(); #endif if (md_ctx == NULL) { - ERROR("Cant create EVP_MD Context"); + nd_log(NDLS_DAEMON, NDLP_ERR, "Can't create EVP_MD context"); return 1; } - md = EVP_get_digestbyname("sha1"); + md = EVP_sha1(); // Use SHA-1 for WebSocket handshake if (!md) { - ERROR("Unknown message digest"); - return 1; + nd_log(NDLS_DAEMON, NDLP_ERR, "Unknown message digest SHA-1"); + goto exit_with_error; } - if ((digest = (unsigned char *)OPENSSL_malloc(EVP_MD_size(EVP_sha256()))) == NULL) { - ERROR("Cant alloc digest"); - return 1; + (void) netdata_base64_encode((unsigned char *) nonce_b64, nonce, WEBSOCKET_NONCE_SIZE); + + // Format and push the upgrade header to the write buffer + size_t bytes = snprintf(second, TEMP_BUF_SIZE, websocket_upgrage_hdr, *client->host, nonce_b64); + if(rbuf_bytes_free(client->buf_write) < bytes) { + nd_log(NDLS_DAEMON, NDLP_ERR, "Write buffer capacity too low."); + goto exit_with_error; } + rbuf_push(client->buf_write, second, bytes); + + client->state = WS_HANDSHAKE; - EVP_DigestInit_ex(md_ctx, md, NULL); - EVP_DigestUpdate(md_ctx, second, strlen(second)); - EVP_DigestFinal_ex(md_ctx, digest, &md_len); + // Create the expected Sec-WebSocket-Accept value + bytes = snprintf(second, TEMP_BUF_SIZE, "%s%s", nonce_b64, mqtt_protoid); - EVP_EncodeBlock((unsigned char *)nonce_b64, digest, (int) md_len); + if (!EVP_DigestInit_ex(md_ctx, md, NULL)) { + nd_log(NDLS_DAEMON, NDLP_ERR, "Failed to initialize digest context"); + goto exit_with_error; + } + + if (!EVP_DigestUpdate(md_ctx, second, bytes)) { + nd_log(NDLS_DAEMON, NDLP_ERR, "Failed to update digest"); + goto exit_with_error; + } + + if (!EVP_DigestFinal_ex(md_ctx, digest, &md_len)) { + nd_log(NDLS_DAEMON, NDLP_ERR, "Failed to finalize digest"); + goto exit_with_error; + } + + (void) netdata_base64_encode((unsigned char *) nonce_b64, digest, md_len); freez(client->hs.nonce_reply); client->hs.nonce_reply = strdupz(nonce_b64); + rc = 0; - OPENSSL_free(digest); - +exit_with_error: #if (OPENSSL_VERSION_NUMBER < OPENSSL_VERSION_110) EVP_MD_CTX_destroy(md_ctx); #else EVP_MD_CTX_free(md_ctx); #endif - return 0; + return rc; } #define BUF_READ_MEMCMP_CONST(const, err) \ if (rbuf_memcmp_n(client->buf_read, const, strlen(const))) { \ - ERROR(err); \ + nd_log(NDLS_DAEMON, NDLP_ERR, err); \ rbuf_flush(client->buf_read); \ return WS_CLIENT_PROTOCOL_ERROR; \ } @@ -262,7 +212,7 @@ int ws_client_start_handshake(ws_client *client) #define HTTP_HDR_LINE_CHECK_LIMIT(x) \ if ((x) >= MAX_HTTP_LINE_LENGTH) { \ - ERROR("HTTP line received is too long. Maximum is %d", MAX_HTTP_LINE_LENGTH); \ + nd_log(NDLS_DAEMON, NDLP_ERR, "HTTP line received is too long. Maximum is %d", MAX_HTTP_LINE_LENGTH); \ return WS_CLIENT_PROTOCOL_ERROR; \ } @@ -285,13 +235,13 @@ int ws_client_parse_handshake_resp(ws_client *client) BUF_READ_CHECK_AT_LEAST(HTTP_SC_LENGTH); // "XXX " http return code rbuf_pop(client->buf_read, buf, HTTP_SC_LENGTH); if (buf[HTTP_SC_LENGTH - 1] != 0x20) { - ERROR("HTTP status code received is not terminated by space (0x20)"); + nd_log(NDLS_DAEMON, NDLP_ERR, "HTTP status code received is not terminated by space (0x20)"); return WS_CLIENT_PROTOCOL_ERROR; } buf[HTTP_SC_LENGTH - 1] = 0; client->hs.http_code = atoi(buf); if (client->hs.http_code < 100 || client->hs.http_code >= 600) { - ERROR("HTTP status code received not in valid range 100-600"); + nd_log(NDLS_DAEMON, NDLP_ERR, "HTTP status code received not in valid range 100-600"); return WS_CLIENT_PROTOCOL_ERROR; } client->hs.hdr_state = WS_HDR_ENDLINE; @@ -330,16 +280,16 @@ int ws_client_parse_handshake_resp(ws_client *client) ptr = rbuf_find_bytes(client->buf_read, HTTP_HDR_SEPARATOR, strlen(HTTP_HDR_SEPARATOR), &idx_sep); if (!ptr || idx_sep > idx_crlf) { - ERROR("Expected HTTP hdr field key/value separator \": \" before endline in non empty HTTP header line"); + nd_log(NDLS_DAEMON, NDLP_ERR, "Expected HTTP hdr field key/value separator \": \" before endline in non empty HTTP header line"); return WS_CLIENT_PROTOCOL_ERROR; } if (idx_crlf == idx_sep + (int)strlen(HTTP_HDR_SEPARATOR)) { - ERROR("HTTP Header value cannot be empty"); + nd_log(NDLS_DAEMON, NDLP_ERR, "HTTP Header value cannot be empty"); return WS_CLIENT_PROTOCOL_ERROR; } if (idx_sep > HTTP_HEADER_NAME_MAX_LEN) { - ERROR("HTTP header too long (%d)", idx_sep); + nd_log(NDLS_DAEMON, NDLP_ERR, "HTTP header too long (%d)", idx_sep); return WS_CLIENT_PROTOCOL_ERROR; } @@ -347,23 +297,21 @@ int ws_client_parse_handshake_resp(ws_client *client) hdr->key = ((char*)hdr) + sizeof(struct http_header); hdr->value = hdr->key + idx_sep + 1; - bytes = rbuf_pop(client->buf_read, hdr->key, idx_sep); + rbuf_pop(client->buf_read, hdr->key, idx_sep); rbuf_bump_tail(client->buf_read, strlen(HTTP_HDR_SEPARATOR)); - bytes = rbuf_pop(client->buf_read, hdr->value, idx_crlf - idx_sep - strlen(HTTP_HDR_SEPARATOR)); + rbuf_pop(client->buf_read, hdr->value, idx_crlf - idx_sep - strlen(HTTP_HDR_SEPARATOR)); rbuf_bump_tail(client->buf_read, strlen(WS_HTTP_NEWLINE)); for (int i = 0; hdr->key[i]; i++) hdr->key[i] = tolower(hdr->key[i]); -// DEBUG("HTTP header \"%s\" received. Value \"%s\"", hdr->key, hdr->value); - if (ws_client_add_http_header(client, hdr)) return WS_CLIENT_PROTOCOL_ERROR; if (!strcmp(hdr->key, WS_CONN_ACCEPT)) { if (strcmp(client->hs.nonce_reply, hdr->value)) { - ERROR("Received NONCE \"%s\" does not match expected nonce of \"%s\"", hdr->value, client->hs.nonce_reply); + nd_log(NDLS_DAEMON, NDLP_ERR, "Received NONCE \"%s\" does not match expected nonce of \"%s\"", hdr->value, client->hs.nonce_reply); return WS_CLIENT_PROTOCOL_ERROR; } client->hs.nonce_matched = 1; @@ -373,21 +321,21 @@ int ws_client_parse_handshake_resp(ws_client *client) case WS_HDR_PARSE_DONE: if (!client->hs.nonce_matched) { - ERROR("Missing " WS_CONN_ACCEPT " header"); + nd_log(NDLS_DAEMON, NDLP_ERR, "Missing " WS_CONN_ACCEPT " header"); return WS_CLIENT_PROTOCOL_ERROR; } if (client->hs.http_code != 101) { - ERROR("HTTP return code not 101. Received %d with msg \"%s\".", client->hs.http_code, client->hs.http_reply_msg); + nd_log(NDLS_DAEMON, NDLP_ERR, "HTTP return code not 101. Received %d with msg \"%s\".", client->hs.http_code, client->hs.http_reply_msg); return WS_CLIENT_PROTOCOL_ERROR; } client->state = WS_ESTABLISHED; client->hs.hdr_state = WS_HDR_ALL_DONE; - INFO("Websocket Connection Accepted By Server"); + nd_log(NDLS_DAEMON, NDLP_INFO, "Websocket Connection Accepted By Server"); return WS_CLIENT_PARSING_DONE; case WS_HDR_ALL_DONE: - FATAL("This is error we should never come here!"); + nd_log(NDLS_DAEMON, NDLP_CRIT, "This is error we should never come here!"); return WS_CLIENT_PROTOCOL_ERROR; } return 0; @@ -397,7 +345,7 @@ int ws_client_parse_handshake_resp(ws_client *client) #define WS_FINAL_FRAG BYTE_MSB #define WS_PAYLOAD_MASKED BYTE_MSB -static inline size_t get_ws_hdr_size(size_t payload_size) +static size_t get_ws_hdr_size(size_t payload_size) { size_t hdr_len = 2 + 4 /*mask*/; if(payload_size > 125) @@ -408,7 +356,7 @@ static inline size_t get_ws_hdr_size(size_t payload_size) } #define MAX_POSSIBLE_HDR_LEN 14 -int ws_client_send(ws_client *client, enum websocket_opcode frame_type, const char *data, size_t size) +int ws_client_send(const ws_client *client, enum websocket_opcode frame_type, const char *data, size_t size) { // TODO maybe? implement fragmenting, it is not necessary though // as both tested MQTT brokers have no reuirement of one MQTT envelope @@ -416,24 +364,16 @@ int ws_client_send(ws_client *client, enum websocket_opcode frame_type, const ch // one big MQTT message as single fragmented WebSocket envelope char hdr[MAX_POSSIBLE_HDR_LEN]; char *ptr = hdr; - char *mask; int size_written = 0; size_t j = 0; size_t w_buff_free = rbuf_bytes_free(client->buf_write); size_t hdr_len = get_ws_hdr_size(size); - if (w_buff_free < hdr_len * 2) { -#ifdef DEBUG_ULTRA_VERBOSE - DEBUG("Write buffer full. Can't write requested %d size.", size); -#endif + if (w_buff_free < hdr_len * 2) return 0; - } if (w_buff_free < (hdr_len + size)) { -#ifdef DEBUG_ULTRA_VERBOSE - DEBUG("Can't write whole MQTT packet of %d bytes into the buffer. Will do partial send of %d.", size, w_buff_free - hdr_len); -#endif size = w_buff_free - hdr_len; hdr_len = get_ws_hdr_size(size); // the actual needed header size might decrease if we cut number of bytes @@ -459,12 +399,10 @@ int ws_client_send(ws_client *client, enum websocket_opcode frame_type, const ch ptr += sizeof(be); } else *ptr++ |= size; - - mask = ptr; - if (read(client->entropy_fd, mask, sizeof(uint32_t)) < (ssize_t)sizeof(uint32_t)) { - ERROR("Unable to get mask from \"" ENTROPY_SOURCE "\""); - return -2; - } + + char *mask = ptr; + uint32_t mask32 = os_random32() + 1; + memcpy(mask, &mask32, sizeof(mask32)); rbuf_push(client->buf_write, hdr, hdr_len); @@ -490,7 +428,7 @@ int ws_client_send(ws_client *client, enum websocket_opcode frame_type, const ch return size_written; } -static int check_opcode(ws_client *client,enum websocket_opcode oc) +static int check_opcode(enum websocket_opcode oc) { switch(oc) { case WS_OP_BINARY_FRAME: @@ -498,34 +436,34 @@ static int check_opcode(ws_client *client,enum websocket_opcode oc) case WS_OP_PING: return 0; case WS_OP_CONTINUATION_FRAME: - FATAL("WS_OP_CONTINUATION_FRAME NOT IMPLEMENTED YET!!!!"); + nd_log(NDLS_DAEMON, NDLP_ERR, "WS_OP_CONTINUATION_FRAME NOT IMPLEMENTED YET!!!!"); return 0; case WS_OP_TEXT_FRAME: - FATAL("WS_OP_TEXT_FRAME NOT IMPLEMENTED YET!!!!"); + nd_log(NDLS_DAEMON, NDLP_ERR, "WS_OP_TEXT_FRAME NOT IMPLEMENTED YET!!!!"); return 0; case WS_OP_PONG: - FATAL("WS_OP_PONG NOT IMPLEMENTED YET!!!!"); + nd_log(NDLS_DAEMON, NDLP_ERR, "WS_OP_PONG NOT IMPLEMENTED YET!!!!"); return 0; default: return WS_CLIENT_PROTOCOL_ERROR; } } -static inline void ws_client_rx_post_hdr_state(ws_client *client) +static void ws_client_rx_post_hdr_state(ws_client *client) { switch(client->rx.opcode) { case WS_OP_BINARY_FRAME: client->rx.parse_state = WS_PAYLOAD_DATA; - return; + break; case WS_OP_CONNECTION_CLOSE: client->rx.parse_state = WS_PAYLOAD_CONNECTION_CLOSE; - return; + break; case WS_OP_PING: client->rx.parse_state = WS_PAYLOAD_PING_REQ_PAYLOAD; - return; + break; default: client->rx.parse_state = WS_PAYLOAD_SKIP_UNKNOWN_PAYLOAD; - return; + break; } } @@ -541,15 +479,15 @@ int ws_client_process_rx_ws(ws_client *client) client->rx.opcode = buf[0] & (char)~BYTE_MSB; if (!(buf[0] & (char)~WS_FINAL_FRAG)) { - ERROR("Not supporting fragmented messages yet!"); + nd_log(NDLS_DAEMON, NDLP_ERR, "Not supporting fragmented messages yet!"); return WS_CLIENT_PROTOCOL_ERROR; } - if (check_opcode(client, client->rx.opcode) == WS_CLIENT_PROTOCOL_ERROR) + if (check_opcode(client->rx.opcode) == WS_CLIENT_PROTOCOL_ERROR) return WS_CLIENT_PROTOCOL_ERROR; if (buf[1] & (char)WS_PAYLOAD_MASKED) { - ERROR("Mask is not allowed in Server->Client Websocket direction."); + nd_log(NDLS_DAEMON, NDLP_ERR, "Mask is not allowed in Server->Client Websocket direction."); return WS_CLIENT_PROTOCOL_ERROR; } @@ -584,12 +522,8 @@ int ws_client_process_rx_ws(ws_client *client) if (!rbuf_bytes_available(client->buf_read)) return WS_CLIENT_NEED_MORE_BYTES; char *insert = rbuf_get_linear_insert_range(client->buf_to_mqtt, &size); - if (!insert) { -#ifdef DEBUG_ULTRA_VERBOSE - DEBUG("BUFFER TOO FULL. Avail %d req %d", (int)size, (int)remaining); -#endif + if (!insert) return WS_CLIENT_BUFFER_FULL; - } size = (size > remaining) ? remaining : size; size = rbuf_pop(client->buf_read, insert, size); rbuf_bump_head(client->buf_to_mqtt, size); @@ -603,11 +537,11 @@ int ws_client_process_rx_ws(ws_client *client) // b) 2byte reason code // c) 2byte reason code followed by message if (client->rx.payload_length == 1) { - ERROR("WebScoket CONNECTION_CLOSE can't have payload of size 1"); + nd_log(NDLS_DAEMON, NDLP_ERR, "WebScoket CONNECTION_CLOSE can't have payload of size 1"); return WS_CLIENT_PROTOCOL_ERROR; } if (!client->rx.payload_length) { - INFO("WebSocket server closed the connection without giving reason."); + nd_log(NDLS_DAEMON, NDLP_INFO, "WebSocket server closed the connection without giving reason."); client->rx.parse_state = WS_PACKET_DONE; break; } @@ -621,7 +555,7 @@ int ws_client_process_rx_ws(ws_client *client) client->rx.payload_processed += sizeof(uint16_t); if(client->rx.payload_processed == client->rx.payload_length) { - INFO("WebSocket server closed the connection with EC=%d. Without message.", + nd_log(NDLS_DAEMON, NDLP_INFO, "WebSocket server closed the connection with EC=%d. Without message.", client->rx.specific_data.op_close.ec); client->rx.parse_state = WS_PACKET_DONE; break; @@ -640,7 +574,7 @@ int ws_client_process_rx_ws(ws_client *client) client->rx.payload_length - client->rx.payload_processed); } client->rx.specific_data.op_close.reason[client->rx.payload_length] = 0; - INFO("WebSocket server closed the connection with EC=%d and reason \"%s\"", + nd_log(NDLS_DAEMON, NDLP_INFO, "WebSocket server closed the connection with EC=%d and reason \"%s\"", client->rx.specific_data.op_close.ec, client->rx.specific_data.op_close.reason); freez(client->rx.specific_data.op_close.reason); @@ -649,14 +583,14 @@ int ws_client_process_rx_ws(ws_client *client) break; case WS_PAYLOAD_SKIP_UNKNOWN_PAYLOAD: BUF_READ_CHECK_AT_LEAST(client->rx.payload_length); - WARN("Skipping Websocket Packet of unsupported/unknown type"); + nd_log(NDLS_DAEMON, NDLP_WARNING, "Skipping Websocket Packet of unsupported/unknown type"); if (client->rx.payload_length) rbuf_bump_tail(client->buf_read, client->rx.payload_length); client->rx.parse_state = WS_PACKET_DONE; return WS_CLIENT_PARSING_DONE; case WS_PAYLOAD_PING_REQ_PAYLOAD: if (client->rx.payload_length > rbuf_get_capacity(client->buf_read) / 2) { - ERROR("Ping arrived with payload which is too big!"); + nd_log(NDLS_DAEMON, NDLP_ERR, "Ping arrived with payload which is too big!"); return WS_CLIENT_INTERNAL_ERROR; } BUF_READ_CHECK_AT_LEAST(client->rx.payload_length); @@ -666,7 +600,7 @@ int ws_client_process_rx_ws(ws_client *client) // then attempt to send as soon as buffer space clears up size = ws_client_send(client, WS_OP_PONG, client->rx.specific_data.ping_msg, client->rx.payload_length); if (size != client->rx.payload_length) { - ERROR("Unable to send the PONG as one packet back. Closing connection."); + nd_log(NDLS_DAEMON, NDLP_ERR, "Unable to send the PONG as one packet back. Closing connection."); return WS_CLIENT_PROTOCOL_ERROR; } client->rx.parse_state = WS_PACKET_DONE; @@ -678,7 +612,7 @@ int ws_client_process_rx_ws(ws_client *client) return WS_CLIENT_CONNECTION_CLOSED; return WS_CLIENT_PARSING_DONE; default: - FATAL("Unknown parse state"); + nd_log(NDLS_DAEMON, NDLP_ERR, "Unknown parse state"); return WS_CLIENT_INTERNAL_ERROR; } return 0; @@ -711,6 +645,8 @@ int ws_client_process(ws_client *client) case WS_CLIENT_CONNECTION_CLOSED: client->state = WS_CONN_CLOSED_GRACEFUL; break; + default: + break; } // if ret == 0 we can continue parsing // if ret == WS_CLIENT_PARSING_DONE we processed @@ -719,13 +655,13 @@ int ws_client_process(ws_client *client) } while (!ret || ret == WS_CLIENT_PARSING_DONE); break; case WS_ERROR: - ERROR("ws_client is in error state. Restart the connection!"); + nd_log(NDLS_DAEMON, NDLP_ERR, "ws_client is in error state. Restart the connection!"); return WS_CLIENT_PROTOCOL_ERROR; case WS_CONN_CLOSED_GRACEFUL: - ERROR("Connection has been gracefully closed. Calling this is useless (and probably bug) until you reconnect again."); + nd_log(NDLS_DAEMON, NDLP_ERR, "Connection has been gracefully closed. Calling this is useless (and probably bug) until you reconnect again."); return WS_CLIENT_CONNECTION_CLOSED; default: - FATAL("Unknown connection state! Probably memory corruption."); + nd_log(NDLS_DAEMON, NDLP_CRIT, "Unknown connection state! Probably memory corruption."); return WS_CLIENT_INTERNAL_ERROR; } return ret; diff --git a/src/aclk/mqtt_websockets/ws_client.h b/src/aclk/mqtt_websockets/ws_client.h index 0ccbd29a8..67e5835a2 100644 --- a/src/aclk/mqtt_websockets/ws_client.h +++ b/src/aclk/mqtt_websockets/ws_client.h @@ -1,14 +1,8 @@ -// SPDX-License-Identifier: GPL-3.0-only -// Copyright (C) 2020 Timotej Šiškovič +// SPDX-License-Identifier: GPL-3.0-or-later #ifndef WS_CLIENT_H #define WS_CLIENT_H -#include "c-rbuf/cringbuffer.h" -#include "mqtt_wss_log.h" - -#include <stdint.h> - #define WS_CLIENT_NEED_MORE_BYTES 0x10 #define WS_CLIENT_PARSING_DONE 0x11 #define WS_CLIENT_CONNECTION_CLOSED 0x12 @@ -98,23 +92,20 @@ typedef struct websocket_client { // memory usage and remove one more memcpy buf_read->buf_to_mqtt rbuf_t buf_to_mqtt; // RAW data for MQTT lib - int entropy_fd; - // careful host is borrowed, don't free char **host; - mqtt_wss_log_ctx_t log; } ws_client; -ws_client *ws_client_new(size_t buf_size, char **host, mqtt_wss_log_ctx_t log); +ws_client *ws_client_new(size_t buf_size, char **host); void ws_client_destroy(ws_client *client); void ws_client_reset(ws_client *client); int ws_client_start_handshake(ws_client *client); -int ws_client_want_write(ws_client *client); +int ws_client_want_write(const ws_client *client); int ws_client_process(ws_client *client); -int ws_client_send(ws_client *client, enum websocket_opcode frame_type, const char *data, size_t size); +int ws_client_send(const ws_client *client, enum websocket_opcode frame_type, const char *data, size_t size); #endif /* WS_CLIENT_H */ |