summaryrefslogtreecommitdiffstats
path: root/src/aclk/mqtt_websockets
diff options
context:
space:
mode:
Diffstat (limited to 'src/aclk/mqtt_websockets')
-rw-r--r--src/aclk/mqtt_websockets/.github/workflows/run-tests.yaml14
-rw-r--r--src/aclk/mqtt_websockets/.gitignore10
-rw-r--r--src/aclk/mqtt_websockets/README.md2
-rw-r--r--src/aclk/mqtt_websockets/c-rbuf/cringbuffer.c203
-rw-r--r--src/aclk/mqtt_websockets/c-rbuf/cringbuffer.h47
-rw-r--r--src/aclk/mqtt_websockets/c-rbuf/cringbuffer_internal.h37
-rw-r--r--src/aclk/mqtt_websockets/c-rbuf/ringbuffer_test.c485
-rw-r--r--src/aclk/mqtt_websockets/c_rhash/c_rhash.c264
-rw-r--r--src/aclk/mqtt_websockets/c_rhash/c_rhash.h61
-rw-r--r--src/aclk/mqtt_websockets/c_rhash/c_rhash_internal.h19
-rw-r--r--src/aclk/mqtt_websockets/c_rhash/tests.c273
-rw-r--r--src/aclk/mqtt_websockets/common_internal.h17
-rw-r--r--src/aclk/mqtt_websockets/common_public.h2
-rw-r--r--src/aclk/mqtt_websockets/mqtt_ng.c541
-rw-r--r--src/aclk/mqtt_websockets/mqtt_ng.h10
-rw-r--r--src/aclk/mqtt_websockets/mqtt_wss_client.c360
-rw-r--r--src/aclk/mqtt_websockets/mqtt_wss_client.h54
-rw-r--r--src/aclk/mqtt_websockets/mqtt_wss_log.c130
-rw-r--r--src/aclk/mqtt_websockets/mqtt_wss_log.h39
-rw-r--r--src/aclk/mqtt_websockets/ws_client.c262
-rw-r--r--src/aclk/mqtt_websockets/ws_client.h17
21 files changed, 476 insertions, 2371 deletions
diff --git a/src/aclk/mqtt_websockets/.github/workflows/run-tests.yaml b/src/aclk/mqtt_websockets/.github/workflows/run-tests.yaml
deleted file mode 100644
index da5dde821..000000000
--- a/src/aclk/mqtt_websockets/.github/workflows/run-tests.yaml
+++ /dev/null
@@ -1,14 +0,0 @@
-name: run-tests
-on:
- push:
- schedule:
- - cron: '5 3 * * 0'
- pull_request:
-jobs:
- run-tests:
- runs-on: ubuntu-latest
- steps:
- - name: Install ruby and deps
- run: sudo apt-get install ruby ruby-dev mosquitto
- - name: Checkout
- uses: actions/checkout@v2
diff --git a/src/aclk/mqtt_websockets/.gitignore b/src/aclk/mqtt_websockets/.gitignore
deleted file mode 100644
index 9f1a0d89a..000000000
--- a/src/aclk/mqtt_websockets/.gitignore
+++ /dev/null
@@ -1,10 +0,0 @@
-build/*
-!build/.keep
-test
-.vscode
-mqtt/mqtt.c
-mqtt/include/mqtt.h
-libmqttwebsockets.*
-*.o
-.dirstamp
-.deps
diff --git a/src/aclk/mqtt_websockets/README.md b/src/aclk/mqtt_websockets/README.md
index b159686df..9507fedb5 100644
--- a/src/aclk/mqtt_websockets/README.md
+++ b/src/aclk/mqtt_websockets/README.md
@@ -4,4 +4,4 @@ Library to connect MQTT client over Websockets Secure (WSS).
## License
-The Project is released under GPL v3 license. See [License](LICENSE)
+The Project is released under GPL v3 license. See [License](/LICENSE)
diff --git a/src/aclk/mqtt_websockets/c-rbuf/cringbuffer.c b/src/aclk/mqtt_websockets/c-rbuf/cringbuffer.c
deleted file mode 100644
index 8950c6906..000000000
--- a/src/aclk/mqtt_websockets/c-rbuf/cringbuffer.c
+++ /dev/null
@@ -1,203 +0,0 @@
-// Copyright: SPDX-License-Identifier: GPL-3.0-only
-
-#include "cringbuffer.h"
-#include "cringbuffer_internal.h"
-
-#include <stdlib.h>
-#include <assert.h>
-#include <string.h>
-
-#define MIN(a,b) (((a)<(b))?(a):(b))
-#define MAX(a,b) (((a)>(b))?(a):(b))
-
-// this allows user to use their own
-// custom memory allocation functions
-#ifdef RBUF_CUSTOM_MALLOC
-#include "../../helpers/ringbuffer_pal.h"
-#else
-#define crbuf_malloc(...) malloc(__VA_ARGS__)
-#define crbuf_free(...) free(__VA_ARGS__)
-#endif
-
-rbuf_t rbuf_create(size_t size)
-{
- rbuf_t buffer = crbuf_malloc(sizeof(struct rbuf_t) + size);
- if (!buffer)
- return NULL;
-
- memset(buffer, 0, sizeof(struct rbuf_t));
-
- buffer->data = ((char*)buffer) + sizeof(struct rbuf_t);
-
- buffer->head = buffer->data;
- buffer->tail = buffer->data;
- buffer->size = size;
- buffer->end = buffer->data + size;
-
- return buffer;
-}
-
-void rbuf_free(rbuf_t buffer)
-{
- crbuf_free(buffer);
-}
-
-void rbuf_flush(rbuf_t buffer)
-{
- buffer->head = buffer->data;
- buffer->tail = buffer->data;
- buffer->size_data = 0;
-}
-
-char *rbuf_get_linear_insert_range(rbuf_t buffer, size_t *bytes)
-{
- *bytes = 0;
- if (buffer->head == buffer->tail && buffer->size_data)
- return NULL;
-
- *bytes = ((buffer->head >= buffer->tail) ? buffer->end : buffer->tail) - buffer->head;
- return buffer->head;
-}
-
-char *rbuf_get_linear_read_range(rbuf_t buffer, size_t *bytes)
-{
- *bytes = 0;
- if(buffer->head == buffer->tail && !buffer->size_data)
- return NULL;
-
- *bytes = ((buffer->tail >= buffer->head) ? buffer->end : buffer->head) - buffer->tail;
-
- return buffer->tail;
-}
-
-int rbuf_bump_head(rbuf_t buffer, size_t bytes)
-{
- size_t free_bytes = rbuf_bytes_free(buffer);
- if (bytes > free_bytes)
- return 0;
- int i = buffer->head - buffer->data;
- buffer->head = &buffer->data[(i + bytes) % buffer->size];
- buffer->size_data += bytes;
- return 1;
-}
-
-int rbuf_bump_tail(rbuf_t buffer, size_t bytes)
-{
- if(!rbuf_bump_tail_noopt(buffer, bytes))
- return 0;
-
- // if tail catched up with head
- // start writing buffer from beggining
- // this is not necessary (rbuf must work well without it)
- // but helps to optimize big writes as rbuf_get_linear_insert_range
- // will return bigger continuous region
- if(buffer->tail == buffer->head) {
- assert(buffer->size_data == 0);
- rbuf_flush(buffer);
- }
-
- return 1;
-}
-
-size_t rbuf_get_capacity(rbuf_t buffer)
-{
- return buffer->size;
-}
-
-size_t rbuf_bytes_available(rbuf_t buffer)
-{
- return buffer->size_data;
-}
-
-size_t rbuf_bytes_free(rbuf_t buffer)
-{
- return buffer->size - buffer->size_data;
-}
-
-size_t rbuf_push(rbuf_t buffer, const char *data, size_t len)
-{
- size_t to_cpy;
- char *w_ptr = rbuf_get_linear_insert_range(buffer, &to_cpy);
- if(!to_cpy)
- return to_cpy;
-
- to_cpy = MIN(to_cpy, len);
- memcpy(w_ptr, data, to_cpy);
- rbuf_bump_head(buffer, to_cpy);
- if(to_cpy < len)
- to_cpy += rbuf_push(buffer, &data[to_cpy], len - to_cpy);
- return to_cpy;
-}
-
-size_t rbuf_pop(rbuf_t buffer, char *data, size_t len)
-{
- size_t to_cpy;
- const char *r_ptr = rbuf_get_linear_read_range(buffer, &to_cpy);
- if(!to_cpy)
- return to_cpy;
-
- to_cpy = MIN(to_cpy, len);
- memcpy(data, r_ptr, to_cpy);
- rbuf_bump_tail(buffer, to_cpy);
- if(to_cpy < len)
- to_cpy += rbuf_pop(buffer, &data[to_cpy], len - to_cpy);
- return to_cpy;
-}
-
-static inline void rbuf_ptr_inc(rbuf_t buffer, const char **ptr)
-{
- (*ptr)++;
- if(*ptr >= buffer->end)
- *ptr = buffer->data;
-}
-
-int rbuf_memcmp(rbuf_t buffer, const char *haystack, const char *needle, size_t needle_bytes)
-{
- const char *end = needle + needle_bytes;
-
- // as head==tail can mean 2 things here
- if (haystack == buffer->head && buffer->size_data) {
- if (*haystack != *needle)
- return (*haystack - *needle);
- rbuf_ptr_inc(buffer, &haystack);
- needle++;
- }
-
- while (haystack != buffer->head && needle != end) {
- if (*haystack != *needle)
- return (*haystack - *needle);
- rbuf_ptr_inc(buffer, &haystack);
- needle++;
- }
- return 0;
-}
-
-int rbuf_memcmp_n(rbuf_t buffer, const char *to_cmp, size_t to_cmp_bytes)
-{
- return rbuf_memcmp(buffer, buffer->tail, to_cmp, to_cmp_bytes);
-}
-
-char *rbuf_find_bytes(rbuf_t buffer, const char *needle, size_t needle_bytes, int *found_idx)
-{
- const char *ptr = buffer->tail;
- *found_idx = 0;
-
- if (!rbuf_bytes_available(buffer))
- return NULL;
-
- if (buffer->head == buffer->tail && buffer->size_data) {
- if(!rbuf_memcmp(buffer, ptr, needle, needle_bytes))
- return (char *)ptr;
- rbuf_ptr_inc(buffer, &ptr);
- (*found_idx)++;
- }
-
- while (ptr != buffer->head)
- {
- if(!rbuf_memcmp(buffer, ptr, needle, needle_bytes))
- return (char *)ptr;
- rbuf_ptr_inc(buffer, &ptr);
- (*found_idx)++;
- }
- return NULL;
-}
diff --git a/src/aclk/mqtt_websockets/c-rbuf/cringbuffer.h b/src/aclk/mqtt_websockets/c-rbuf/cringbuffer.h
deleted file mode 100644
index eb98035a9..000000000
--- a/src/aclk/mqtt_websockets/c-rbuf/cringbuffer.h
+++ /dev/null
@@ -1,47 +0,0 @@
-// Copyright: SPDX-License-Identifier: GPL-3.0-only
-
-#ifndef CRINGBUFFER_H
-#define CRINGBUFFER_H
-
-#include <stddef.h>
-
-typedef struct rbuf_t *rbuf_t;
-
-rbuf_t rbuf_create(size_t size);
-void rbuf_free(rbuf_t buffer);
-void rbuf_flush(rbuf_t buffer);
-
-/* /param bytes how much bytes can be copied into pointer returned
- * /return pointer where data can be copied to or NULL if buffer full
- */
-char *rbuf_get_linear_insert_range(rbuf_t buffer, size_t *bytes);
-char *rbuf_get_linear_read_range(rbuf_t buffer, size_t *bytes);
-
-int rbuf_bump_head(rbuf_t buffer, size_t bytes);
-int rbuf_bump_tail(rbuf_t buffer, size_t bytes);
-
-/* @param buffer related buffer instance
- * @returns total capacity of buffer in bytes (not free/used)
- */
-size_t rbuf_get_capacity(rbuf_t buffer);
-
-/* @param buffer related buffer instance
- * @returns count of bytes stored in the buffer
- */
-size_t rbuf_bytes_available(rbuf_t buffer);
-
-/* @param buffer related buffer instance
- * @returns count of bytes available/free in the buffer (how many more bytes you can store in this buffer)
- */
-size_t rbuf_bytes_free(rbuf_t buffer);
-
-/* writes as many bytes from `data` into the `buffer` as possible
- * but maximum `len` bytes
- */
-size_t rbuf_push(rbuf_t buffer, const char *data, size_t len);
-size_t rbuf_pop(rbuf_t buffer, char *data, size_t len);
-
-char *rbuf_find_bytes(rbuf_t buffer, const char *needle, size_t needle_bytes, int *found_idx);
-int rbuf_memcmp_n(rbuf_t buffer, const char *to_cmp, size_t to_cmp_bytes);
-
-#endif
diff --git a/src/aclk/mqtt_websockets/c-rbuf/cringbuffer_internal.h b/src/aclk/mqtt_websockets/c-rbuf/cringbuffer_internal.h
deleted file mode 100644
index d32de187c..000000000
--- a/src/aclk/mqtt_websockets/c-rbuf/cringbuffer_internal.h
+++ /dev/null
@@ -1,37 +0,0 @@
-// Copyright: SPDX-License-Identifier: GPL-3.0-only
-
-#ifndef CRINGBUFFER_INTERNAL_H
-#define CRINGBUFFER_INTERNAL_H
-
-struct rbuf_t {
- char *data;
-
- // points to next byte where we can write
- char *head;
- // points to oldest (next to be poped) readable byte
- char *tail;
-
- // to avoid calculating data + size
- // all the time
- char *end;
-
- size_t size;
- size_t size_data;
-};
-
-/* this exists so that it can be tested by unit tests
- * without optimization that resets head and tail to
- * beginning if buffer empty
- */
-inline static int rbuf_bump_tail_noopt(rbuf_t buffer, size_t bytes)
-{
- if (bytes > buffer->size_data)
- return 0;
- int i = buffer->tail - buffer->data;
- buffer->tail = &buffer->data[(i + bytes) % buffer->size];
- buffer->size_data -= bytes;
-
- return 1;
-}
-
-#endif
diff --git a/src/aclk/mqtt_websockets/c-rbuf/ringbuffer_test.c b/src/aclk/mqtt_websockets/c-rbuf/ringbuffer_test.c
deleted file mode 100644
index 6a17c9956..000000000
--- a/src/aclk/mqtt_websockets/c-rbuf/ringbuffer_test.c
+++ /dev/null
@@ -1,485 +0,0 @@
-// Copyright: SPDX-License-Identifier: GPL-3.0-only
-
-#include "ringbuffer.h"
-
-// to be able to access internals
-// never do this from app
-#include "../src/ringbuffer_internal.h"
-
-#include <stdio.h>
-#include <string.h>
-
-#define KNRM "\x1B[0m"
-#define KRED "\x1B[31m"
-#define KGRN "\x1B[32m"
-#define KYEL "\x1B[33m"
-#define KBLU "\x1B[34m"
-#define KMAG "\x1B[35m"
-#define KCYN "\x1B[36m"
-#define KWHT "\x1B[37m"
-
-#define UNUSED(x) (void)(x)
-
-int total_fails = 0;
-int total_tests = 0;
-int total_checks = 0;
-
-#define CHECK_EQ_RESULT(x, y) \
- while (s_len--) \
- putchar('.'); \
- printf("%s%s " KNRM "\n", (((x) == (y)) ? KGRN : KRED), (((x) == (y)) ? " PASS " : " FAIL ")); \
- if ((x) != (y)) \
- total_fails++; \
- total_checks++;
-
-#define CHECK_EQ_PREFIX(x, y, prefix, subtest_name, ...) \
- { \
- int s_len = \
- 100 - \
- printf(("Checking: " KWHT "%s %s%2d " subtest_name " " KNRM), __func__, prefix, subtest_no, ##__VA_ARGS__); \
- CHECK_EQ_RESULT(x, y) \
- }
-
-#define CHECK_EQ(x, y, subtest_name, ...) \
- { \
- int s_len = \
- 100 - printf(("Checking: " KWHT "%s %2d " subtest_name " " KNRM), __func__, subtest_no, ##__VA_ARGS__); \
- CHECK_EQ_RESULT(x, y) \
- }
-
-#define TEST_DECL() \
- int subtest_no = 0; \
- printf(KYEL "TEST SUITE: %s\n" KNRM, __func__); \
- total_tests++;
-
-static void test_rbuf_get_linear_insert_range()
-{
- TEST_DECL();
-
- // check empty buffer behaviour
- rbuf_t buff = rbuf_create(5);
- char *to_write;
- size_t ret;
- to_write = rbuf_get_linear_insert_range(buff, &ret);
- CHECK_EQ(ret, 5, "empty size");
- CHECK_EQ(to_write, buff->head, "empty write ptr");
- rbuf_free(buff);
-
- // check full buffer behaviour
- subtest_no++;
- buff = rbuf_create(5);
- ret = rbuf_bump_head(buff, 5);
- CHECK_EQ(ret, 1, "ret");
- to_write = rbuf_get_linear_insert_range(buff, &ret);
- CHECK_EQ(to_write, NULL, "writable NULL");
- CHECK_EQ(ret, 0, "writable count = 0");
-
- // check buffer flush
- subtest_no++;
- rbuf_flush(buff);
- CHECK_EQ(rbuf_bytes_free(buff), 5, "size_free");
- CHECK_EQ(rbuf_bytes_available(buff), 0, "size_avail");
- CHECK_EQ(buff->head, buff->data, "head_ptr");
- CHECK_EQ(buff->tail, buff->data, "tail_ptr");
-
- // check behaviour head > tail
- subtest_no++;
- rbuf_flush(buff);
- rbuf_bump_head(buff, 3);
- to_write = rbuf_get_linear_insert_range(buff, &ret);
- CHECK_EQ(to_write, buff->head, "write location");
- CHECK_EQ(ret, 2, "availible to linear write");
-
- // check behaviour tail > head
- subtest_no++;
- rbuf_flush(buff);
- rbuf_bump_head(buff, 5);
- rbuf_bump_tail(buff, 3);
- CHECK_EQ(buff->head, buff->data, "head_ptr");
- CHECK_EQ(buff->tail, buff->data + 3, "tail_ptr");
- to_write = rbuf_get_linear_insert_range(buff, &ret);
- CHECK_EQ(to_write, buff->head, "write location");
- CHECK_EQ(ret, 3, "availible to linear write");
-
-/* // check behaviour tail and head at last element
- subtest_no++;
- rbuf_flush(buff);
- rbuf_bump_head(buff, 4);
- rbuf_bump_tail(buff, 4);
- CHECK_EQ(buff->head, buff->end - 1, "head_ptr");
- CHECK_EQ(buff->tail, buff->end - 1, "tail_ptr");
- to_write = rbuf_get_linear_insert_range(buff, &ret);
- CHECK_EQ(to_write, buff->head, "write location");
- CHECK_EQ(ret, 1, "availible to linear write");*/
-
- // check behaviour tail and head at last element
- // after rbuf_bump_tail optimisation that restarts buffer
- // in case tail catches up with head
- subtest_no++;
- rbuf_flush(buff);
- rbuf_bump_head(buff, 4);
- rbuf_bump_tail(buff, 4);
- CHECK_EQ(buff->head, buff->data, "head_ptr");
- CHECK_EQ(buff->tail, buff->data, "tail_ptr");
- to_write = rbuf_get_linear_insert_range(buff, &ret);
- CHECK_EQ(to_write, buff->head, "write location");
- CHECK_EQ(ret, 5, "availible to linear write");
-}
-
-#define _CHECK_EQ(x, y, subtest_name, ...) CHECK_EQ_PREFIX(x, y, prefix, subtest_name, ##__VA_ARGS__)
-#define _PREFX "(size = %5zu) "
-static void test_rbuf_bump_head_bsize(size_t size)
-{
- char prefix[16];
- snprintf(prefix, 16, _PREFX, size);
- int subtest_no = 0;
- rbuf_t buff = rbuf_create(size);
- _CHECK_EQ(rbuf_bytes_free(buff), size, "size_free");
-
- subtest_no++;
- int ret = rbuf_bump_head(buff, size);
- _CHECK_EQ(buff->data, buff->head, "loc");
- _CHECK_EQ(ret, 1, "ret");
- _CHECK_EQ(buff->size_data, buff->size, "size");
- _CHECK_EQ(rbuf_bytes_free(buff), 0, "size_free");
-
- subtest_no++;
- ret = rbuf_bump_head(buff, 1);
- _CHECK_EQ(buff->data, buff->head, "loc no move");
- _CHECK_EQ(ret, 0, "ret error");
- _CHECK_EQ(buff->size_data, buff->size, "size");
- _CHECK_EQ(rbuf_bytes_free(buff), 0, "size_free");
- rbuf_free(buff);
-
- subtest_no++;
- buff = rbuf_create(size);
- ret = rbuf_bump_head(buff, size - 1);
- _CHECK_EQ(buff->head, buff->end-1, "loc end");
- rbuf_free(buff);
-}
-#undef _CHECK_EQ
-
-static void test_rbuf_bump_head()
-{
- TEST_DECL();
- UNUSED(subtest_no);
-
- size_t test_sizes[] = { 1, 2, 3, 5, 6, 7, 8, 100, 99999, 0 };
- for (int i = 0; test_sizes[i]; i++)
- test_rbuf_bump_head_bsize(test_sizes[i]);
-}
-
-static void test_rbuf_bump_tail_noopt(int subtest_no)
-{
- rbuf_t buff = rbuf_create(10);
- CHECK_EQ(rbuf_bytes_free(buff), 10, "size_free");
- CHECK_EQ(rbuf_bytes_available(buff), 0, "size_avail");
-
- subtest_no++;
- int ret = rbuf_bump_head(buff, 5);
- CHECK_EQ(ret, 1, "ret");
- CHECK_EQ(rbuf_bytes_free(buff), 5, "size_free");
- CHECK_EQ(rbuf_bytes_available(buff), 5, "size_avail");
- CHECK_EQ(buff->head, buff->data + 5, "head_ptr");
- CHECK_EQ(buff->tail, buff->data, "tail_ptr");
-
- subtest_no++;
- ret = rbuf_bump_tail_noopt(buff, 2);
- CHECK_EQ(ret, 1, "ret");
- CHECK_EQ(rbuf_bytes_available(buff), 3, "size_avail");
- CHECK_EQ(rbuf_bytes_free(buff), 7, "size_free");
- CHECK_EQ(buff->head, buff->data + 5, "head_ptr");
- CHECK_EQ(buff->tail, buff->data + 2, "tail_ptr");
-
- subtest_no++;
- ret = rbuf_bump_tail_noopt(buff, 3);
- CHECK_EQ(ret, 1, "ret");
- CHECK_EQ(rbuf_bytes_available(buff), 0, "size_avail");
- CHECK_EQ(rbuf_bytes_free(buff), 10, "size_free");
- CHECK_EQ(buff->head, buff->data + 5, "head_ptr");
- CHECK_EQ(buff->tail, buff->data + 5, "tail_ptr");
-
- subtest_no++;
- ret = rbuf_bump_tail_noopt(buff, 1);
- CHECK_EQ(ret, 0, "ret");
- CHECK_EQ(rbuf_bytes_available(buff), 0, "size_avail");
- CHECK_EQ(rbuf_bytes_free(buff), 10, "size_free");
- CHECK_EQ(buff->head, buff->data + 5, "head_ptr");
- CHECK_EQ(buff->tail, buff->data + 5, "tail_ptr");
-
- subtest_no++;
- ret = rbuf_bump_head(buff, 7);
- CHECK_EQ(ret, 1, "ret");
- CHECK_EQ(rbuf_bytes_available(buff), 7, "size_avail");
- CHECK_EQ(rbuf_bytes_free(buff), 3, "size_free");
- CHECK_EQ(buff->head, buff->data + 2, "head_ptr");
- CHECK_EQ(buff->tail, buff->data + 5, "tail_ptr");
-
- subtest_no++;
- ret = rbuf_bump_tail_noopt(buff, 5);
- CHECK_EQ(ret, 1, "ret");
- CHECK_EQ(rbuf_bytes_available(buff), 2, "size_avail");
- CHECK_EQ(rbuf_bytes_free(buff), 8, "size_free");
- CHECK_EQ(buff->head, buff->data + 2, "head_ptr");
- CHECK_EQ(buff->tail, buff->data, "tail_ptr");
-
- // check tail can't overrun head
- subtest_no++;
- ret = rbuf_bump_tail_noopt(buff, 3);
- CHECK_EQ(ret, 0, "ret");
- CHECK_EQ(rbuf_bytes_available(buff), 2, "size_avail");
- CHECK_EQ(rbuf_bytes_free(buff), 8, "size_free");
- CHECK_EQ(buff->head, buff->data + 2, "head_ptr");
- CHECK_EQ(buff->tail, buff->data, "tail_ptr");
-
- // check head can't overrun tail
- subtest_no++;
- ret = rbuf_bump_head(buff, 9);
- CHECK_EQ(ret, 0, "ret");
- CHECK_EQ(rbuf_bytes_available(buff), 2, "size_avail");
- CHECK_EQ(rbuf_bytes_free(buff), 8, "size_free");
- CHECK_EQ(buff->head, buff->data + 2, "head_ptr");
- CHECK_EQ(buff->tail, buff->data, "tail_ptr");
-
- // check head can fill the buffer
- subtest_no++;
- ret = rbuf_bump_head(buff, 8);
- CHECK_EQ(ret, 1, "ret");
- CHECK_EQ(rbuf_bytes_available(buff), 10, "size_avail");
- CHECK_EQ(rbuf_bytes_free(buff), 0, "size_free");
- CHECK_EQ(buff->head, buff->data, "head_ptr");
- CHECK_EQ(buff->tail, buff->data, "tail_ptr");
-
- // check can empty the buffer
- subtest_no++;
- ret = rbuf_bump_tail_noopt(buff, 10);
- CHECK_EQ(ret, 1, "ret");
- CHECK_EQ(rbuf_bytes_available(buff), 0, "size_avail");
- CHECK_EQ(rbuf_bytes_free(buff), 10, "size_free");
- CHECK_EQ(buff->head, buff->data, "head_ptr");
- CHECK_EQ(buff->tail, buff->data, "tail_ptr");
-}
-
-static void test_rbuf_bump_tail_opt(int subtest_no)
-{
- subtest_no++;
- rbuf_t buff = rbuf_create(10);
- CHECK_EQ(rbuf_bytes_free(buff), 10, "size_free");
- CHECK_EQ(rbuf_bytes_available(buff), 0, "size_avail");
-
- subtest_no++;
- int ret = rbuf_bump_head(buff, 5);
- CHECK_EQ(ret, 1, "ret");
- CHECK_EQ(rbuf_bytes_free(buff), 5, "size_free");
- CHECK_EQ(rbuf_bytes_available(buff), 5, "size_avail");
- CHECK_EQ(buff->head, buff->data + 5, "head_ptr");
- CHECK_EQ(buff->tail, buff->data, "tail_ptr");
-
- subtest_no++;
- ret = rbuf_bump_tail(buff, 2);
- CHECK_EQ(ret, 1, "ret");
- CHECK_EQ(rbuf_bytes_available(buff), 3, "size_avail");
- CHECK_EQ(rbuf_bytes_free(buff), 7, "size_free");
- CHECK_EQ(buff->head, buff->data + 5, "head_ptr");
- CHECK_EQ(buff->tail, buff->data + 2, "tail_ptr");
-
- subtest_no++;
- ret = rbuf_bump_tail(buff, 3);
- CHECK_EQ(ret, 1, "ret");
- CHECK_EQ(rbuf_bytes_available(buff), 0, "size_avail");
- CHECK_EQ(rbuf_bytes_free(buff), 10, "size_free");
- CHECK_EQ(buff->head, buff->data, "head_ptr");
- CHECK_EQ(buff->tail, buff->data, "tail_ptr");
-
- subtest_no++;
- ret = rbuf_bump_tail_noopt(buff, 1);
- CHECK_EQ(ret, 0, "ret");
- CHECK_EQ(rbuf_bytes_available(buff), 0, "size_avail");
- CHECK_EQ(rbuf_bytes_free(buff), 10, "size_free");
- CHECK_EQ(buff->head, buff->data, "head_ptr");
- CHECK_EQ(buff->tail, buff->data, "tail_ptr");
-
- subtest_no++;
- ret = rbuf_bump_head(buff, 6);
- ret = rbuf_bump_tail(buff, 5);
- ret = rbuf_bump_head(buff, 6);
- CHECK_EQ(ret, 1, "ret");
- CHECK_EQ(rbuf_bytes_available(buff), 7, "size_avail");
- CHECK_EQ(rbuf_bytes_free(buff), 3, "size_free");
- CHECK_EQ(buff->head, buff->data + 2, "head_ptr");
- CHECK_EQ(buff->tail, buff->data + 5, "tail_ptr");
-
- subtest_no++;
- ret = rbuf_bump_tail(buff, 5);
- CHECK_EQ(ret, 1, "ret");
- CHECK_EQ(rbuf_bytes_available(buff), 2, "size_avail");
- CHECK_EQ(rbuf_bytes_free(buff), 8, "size_free");
- CHECK_EQ(buff->head, buff->data + 2, "head_ptr");
- CHECK_EQ(buff->tail, buff->data, "tail_ptr");
-
- // check tail can't overrun head
- subtest_no++;
- ret = rbuf_bump_tail(buff, 3);
- CHECK_EQ(ret, 0, "ret");
- CHECK_EQ(rbuf_bytes_available(buff), 2, "size_avail");
- CHECK_EQ(rbuf_bytes_free(buff), 8, "size_free");
- CHECK_EQ(buff->head, buff->data + 2, "head_ptr");
- CHECK_EQ(buff->tail, buff->data, "tail_ptr");
-
- // check head can't overrun tail
- subtest_no++;
- ret = rbuf_bump_head(buff, 9);
- CHECK_EQ(ret, 0, "ret");
- CHECK_EQ(rbuf_bytes_available(buff), 2, "size_avail");
- CHECK_EQ(rbuf_bytes_free(buff), 8, "size_free");
- CHECK_EQ(buff->head, buff->data + 2, "head_ptr");
- CHECK_EQ(buff->tail, buff->data, "tail_ptr");
-
- // check head can fill the buffer
- subtest_no++;
- ret = rbuf_bump_head(buff, 8);
- CHECK_EQ(ret, 1, "ret");
- CHECK_EQ(rbuf_bytes_available(buff), 10, "size_avail");
- CHECK_EQ(rbuf_bytes_free(buff), 0, "size_free");
- CHECK_EQ(buff->head, buff->data, "head_ptr");
- CHECK_EQ(buff->tail, buff->data, "tail_ptr");
-
- // check can empty the buffer
- subtest_no++;
- ret = rbuf_bump_tail(buff, 10);
- CHECK_EQ(ret, 1, "ret");
- CHECK_EQ(rbuf_bytes_available(buff), 0, "size_avail");
- CHECK_EQ(rbuf_bytes_free(buff), 10, "size_free");
- CHECK_EQ(buff->head, buff->data, "head_ptr");
- CHECK_EQ(buff->tail, buff->data, "tail_ptr");
-}
-
-static void test_rbuf_bump_tail()
-{
- TEST_DECL();
- test_rbuf_bump_tail_noopt(subtest_no);
- test_rbuf_bump_tail_opt(subtest_no);
-}
-
-#define ASCII_A 0x61
-#define ASCII_Z 0x7A
-#define TEST_DATA_SIZE ASCII_Z-ASCII_A+1
-static void test_rbuf_push()
-{
- TEST_DECL();
- rbuf_t buff = rbuf_create(10);
- int i;
- char test_data[TEST_DATA_SIZE];
-
- for (int i = 0; i <= TEST_DATA_SIZE; i++)
- test_data[i] = i + ASCII_A;
-
- int ret = rbuf_push(buff, test_data, 10);
- CHECK_EQ(ret, 10, "written 10 bytes");
- CHECK_EQ(rbuf_bytes_free(buff), 0, "empty size == 0");
- for (i = 0; i < 10; i++)
- CHECK_EQ(buff->data[i], i + ASCII_A, "Check data");
-
- subtest_no++;
- rbuf_flush(buff);
- rbuf_bump_head(buff, 5);
- rbuf_bump_tail_noopt(buff, 5); //to not reset both pointers to beginning
- ret = rbuf_push(buff, test_data, 10);
- CHECK_EQ(ret, 10, "written 10 bytes");
- for (i = 0; i < 10; i++)
- CHECK_EQ(buff->data[i], ((i+5)%10) + ASCII_A, "Check Data");
-
- subtest_no++;
- rbuf_flush(buff);
- rbuf_bump_head(buff, 9);
- rbuf_bump_tail_noopt(buff, 9);
- ret = rbuf_push(buff, test_data, 10);
- CHECK_EQ(ret, 10, "written 10 bytes");
- for (i = 0; i < 10; i++)
- CHECK_EQ(buff->data[i], ((i + 1) % 10) + ASCII_A, "Check data");
-
- // let tail > head
- subtest_no++;
- rbuf_flush(buff);
- rbuf_bump_head(buff, 9);
- rbuf_bump_tail_noopt(buff, 9);
- rbuf_bump_head(buff, 1);
- ret = rbuf_push(buff, test_data, 9);
- CHECK_EQ(ret, 9, "written 9 bytes");
- CHECK_EQ(buff->head, buff->end - 1, "head_ptr");
- CHECK_EQ(buff->tail, buff->head, "tail_ptr");
- rbuf_bump_tail(buff, 1);
- //TODO push byte can be usefull optimisation
- ret = rbuf_push(buff, &test_data[9], 1);
- CHECK_EQ(ret, 1, "written 1 byte");
- CHECK_EQ(rbuf_bytes_free(buff), 0, "empty size == 0");
- for (i = 0; i < 10; i++)
- CHECK_EQ(buff->data[i], i + ASCII_A, "Check data");
-
- subtest_no++;
- rbuf_flush(buff);
- rbuf_bump_head(buff, 9);
- rbuf_bump_tail_noopt(buff, 7);
- rbuf_bump_head(buff, 1);
- ret = rbuf_push(buff, test_data, 7);
- CHECK_EQ(ret, 7, "written 7 bytes");
- CHECK_EQ(buff->head, buff->data + 7, "head_ptr");
- CHECK_EQ(buff->tail, buff->head, "tail_ptr");
- rbuf_bump_tail(buff, 3);
- CHECK_EQ(buff->tail, buff->data, "tail_ptr");
- //TODO push byte can be usefull optimisation
- ret = rbuf_push(buff, &test_data[7], 3);
- CHECK_EQ(ret, 3, "written 3 bytes");
- CHECK_EQ(rbuf_bytes_free(buff), 0, "empty size == 0");
- for (i = 0; i < 10; i++)
- CHECK_EQ(buff->data[i], i + ASCII_A, "Check data");
-
- // test can't overfill the buffer
- subtest_no++;
- rbuf_flush(buff);
- rbuf_push(buff, test_data, TEST_DATA_SIZE);
- CHECK_EQ(ret, 3, "written 10 bytes");
- for (i = 0; i < 10; i++)
- CHECK_EQ(buff->data[i], i + ASCII_A, "Check data");
-}
-
-#define TEST_RBUF_FIND_BYTES_SIZE 10
-void test_rbuf_find_bytes()
-{
- TEST_DECL();
- rbuf_t buff = rbuf_create(TEST_RBUF_FIND_BYTES_SIZE);
- char *filler_3 = " ";
- char *needle = "needle";
- int idx;
- char *ptr;
-
- // make sure needle is wrapped aroung in the buffer
- // to test we still can find it
- // target "edle ne"
- rbuf_bump_head(buff, TEST_RBUF_FIND_BYTES_SIZE / 2);
- rbuf_push(buff, filler_3, strlen(filler_3));
- rbuf_bump_tail(buff, TEST_RBUF_FIND_BYTES_SIZE / 2);
- rbuf_push(buff, needle, strlen(needle));
- ptr = rbuf_find_bytes(buff, needle, strlen(needle), &idx);
- CHECK_EQ(ptr, buff->data + (TEST_RBUF_FIND_BYTES_SIZE / 2) + strlen(filler_3), "Pointer to needle correct");
- CHECK_EQ(idx, ptr - buff->tail, "Check needle index");
-}
-
-int main()
-{
- test_rbuf_bump_head();
- test_rbuf_bump_tail();
- test_rbuf_get_linear_insert_range();
- test_rbuf_push();
- test_rbuf_find_bytes();
-
- printf(
- KNRM "Total Tests %d, Total Checks %d, Successful Checks %d, Failed Checks %d\n",
- total_tests, total_checks, total_checks - total_fails, total_fails);
- if (total_fails)
- printf(KRED "!!!Some test(s) Failed!!!\n");
- else
- printf(KGRN "ALL TESTS PASSED\n");
-
- return total_fails;
-}
diff --git a/src/aclk/mqtt_websockets/c_rhash/c_rhash.c b/src/aclk/mqtt_websockets/c_rhash/c_rhash.c
deleted file mode 100644
index a71b500e2..000000000
--- a/src/aclk/mqtt_websockets/c_rhash/c_rhash.c
+++ /dev/null
@@ -1,264 +0,0 @@
-// Copyright: SPDX-License-Identifier: GPL-3.0-only
-
-#include "c_rhash_internal.h"
-
-#include <stdlib.h>
-#include <string.h>
-
-#ifdef DEBUG_VERBOSE
-#include <stdio.h>
-#endif
-
-#define c_rmalloc(...) malloc(__VA_ARGS__)
-#define c_rcalloc(...) calloc(__VA_ARGS__)
-#define c_rfree(...) free(__VA_ARGS__)
-
-static inline uint32_t simple_hash(const char *name) {
- unsigned char *s = (unsigned char *) name;
- uint32_t hval = 0x811c9dc5;
- while (*s) {
- hval *= 16777619;
- hval ^= (uint32_t) *s++;
- }
- return hval;
-}
-
-c_rhash c_rhash_new(size_t bin_count) {
- if (!bin_count)
- bin_count = 1000;
-
- c_rhash hash = c_rcalloc(1, sizeof(struct c_rhash_s) + (bin_count * sizeof(struct bin_ll*)) );
- if (hash == NULL)
- return NULL;
-
- hash->bin_count = bin_count;
- hash->bins = (c_rhash_bin *)((char*)hash + sizeof(struct c_rhash_s));
-
- return hash;
-}
-
-static size_t get_itemtype_len(uint8_t item_type, const void* item_data) {
- switch (item_type) {
- case ITEMTYPE_STRING:
- return strlen(item_data) + 1;
- case ITEMTYPE_UINT64:
- return sizeof(uint64_t);
- case ITEMTYPE_UINT8:
- return 1;
- case ITEMTYPE_OPAQUE_PTR:
- return sizeof(void*);
- default:
- return 0;
- }
-}
-
-static int compare_bin_item(struct bin_item *item, uint8_t key_type, const void *key) {
- if (item->key_type != key_type)
- return 1;
-
- size_t key_value_len = get_itemtype_len(key_type, key);
-
- if(key_type == ITEMTYPE_STRING) {
- size_t new_key_value_len = get_itemtype_len(item->key_type, item->key);
- if (new_key_value_len != key_value_len)
- return 1;
- }
-
- if(memcmp(item->key, key, key_value_len) == 0) {
- return 0;
- }
-
- return 1;
-}
-
-static int insert_into_bin(c_rhash_bin *bin, uint8_t key_type, const void *key, uint8_t value_type, const void *value) {
- struct bin_item *prev = NULL;
- while (*bin != NULL) {
- if (!compare_bin_item(*bin, key_type, key)) {
-#ifdef DEBUG_VERBOSE
- printf("Key already present! Updating value!\n");
-#endif
-// TODO: optimize here if the new value is of different kind compared to the old one
-// in case it is not crazily bigger we can reuse the memory and avoid malloc and free
- c_rfree((*bin)->value);
- (*bin)->value_type = value_type;
- (*bin)->value = c_rmalloc(get_itemtype_len(value_type, value));
- if ((*bin)->value == NULL)
- return 1;
- memcpy((*bin)->value, value, get_itemtype_len(value_type, value));
- return 0;
- }
- prev = *bin;
- bin = &(*bin)->next;
- }
-
- if (*bin == NULL)
- *bin = c_rcalloc(1, sizeof(struct bin_item));
- if (prev != NULL)
- prev->next = *bin;
-
- (*bin)->key_type = key_type;
- size_t len = get_itemtype_len(key_type, key);
- (*bin)->key = c_rmalloc(len);
- memcpy((*bin)->key, key, len);
-
- (*bin)->value_type = value_type;
- len = get_itemtype_len(value_type, value);
- (*bin)->value = c_rmalloc(len);
- memcpy((*bin)->value, value, len);
- return 0;
-}
-
-static inline uint32_t get_bin_idx_str(c_rhash hash, const char *key) {
- uint32_t nhash = simple_hash(key);
- return nhash % hash->bin_count;
-}
-
-static inline c_rhash_bin *get_binptr_by_str(c_rhash hash, const char *key) {
- return &hash->bins[get_bin_idx_str(hash, key)];
-}
-
-int c_rhash_insert_str_ptr(c_rhash hash, const char *key, void *value) {
- c_rhash_bin *bin = get_binptr_by_str(hash, key);
-
-#ifdef DEBUG_VERBOSE
- if (bin != NULL)
- printf("COLLISION. There will be more than one item in bin idx=%d\n", nhash);
-#endif
-
- return insert_into_bin(bin, ITEMTYPE_STRING, key, ITEMTYPE_OPAQUE_PTR, &value);
-}
-
-int c_rhash_insert_str_uint8(c_rhash hash, const char *key, uint8_t value) {
- c_rhash_bin *bin = get_binptr_by_str(hash, key);
-
-#ifdef DEBUG_VERBOSE
- if (bin != NULL)
- printf("COLLISION. There will be more than one item in bin idx=%d\n", nhash);
-#endif
-
- return insert_into_bin(bin, ITEMTYPE_STRING, key, ITEMTYPE_UINT8, &value);
-}
-
-int c_rhash_insert_uint64_ptr(c_rhash hash, uint64_t key, void *value) {
- c_rhash_bin *bin = &hash->bins[key % hash->bin_count];
-
-#ifdef DEBUG_VERBOSE
- if (bin != NULL)
- printf("COLLISION. There will be more than one item in bin idx=%d\n", nhash);
-#endif
-
- return insert_into_bin(bin, ITEMTYPE_UINT64, &key, ITEMTYPE_OPAQUE_PTR, &value);
-}
-
-int c_rhash_get_uint8_by_str(c_rhash hash, const char *key, uint8_t *ret_val) {
- uint32_t nhash = get_bin_idx_str(hash, key);
-
- struct bin_item *bin = hash->bins[nhash];
-
- while (bin) {
- if (bin->key_type == ITEMTYPE_STRING) {
- if (!strcmp(bin->key, key)) {
- *ret_val = *(uint8_t*)bin->value;
- return 0;
- }
- }
- bin = bin->next;
- }
- return 1;
-}
-
-int c_rhash_get_ptr_by_str(c_rhash hash, const char *key, void **ret_val) {
- uint32_t nhash = get_bin_idx_str(hash, key);
-
- struct bin_item *bin = hash->bins[nhash];
-
- while (bin) {
- if (bin->key_type == ITEMTYPE_STRING) {
- if (!strcmp(bin->key, key)) {
- *ret_val = *((void**)bin->value);
- return 0;
- }
- }
- bin = bin->next;
- }
- *ret_val = NULL;
- return 1;
-}
-
-int c_rhash_get_ptr_by_uint64(c_rhash hash, uint64_t key, void **ret_val) {
- uint32_t nhash = key % hash->bin_count;
-
- struct bin_item *bin = hash->bins[nhash];
-
- while (bin) {
- if (bin->key_type == ITEMTYPE_UINT64) {
- if (*((uint64_t *)bin->key) == key) {
- *ret_val = *((void**)bin->value);
- return 0;
- }
- }
- bin = bin->next;
- }
- *ret_val = NULL;
- return 1;
-}
-
-static void c_rhash_destroy_bin(c_rhash_bin bin) {
- struct bin_item *next;
- do {
- next = bin->next;
- c_rfree(bin->key);
- c_rfree(bin->value);
- c_rfree(bin);
- bin = next;
- } while (bin != NULL);
-}
-
-int c_rhash_iter_uint64_keys(c_rhash hash, c_rhash_iter_t *iter, uint64_t *key) {
- while (iter->bin < hash->bin_count) {
- if (iter->item != NULL)
- iter->item = iter->item->next;
- if (iter->item == NULL) {
- if (iter->initialized)
- iter->bin++;
- else
- iter->initialized = 1;
- if (iter->bin < hash->bin_count)
- iter->item = hash->bins[iter->bin];
- }
- if (iter->item != NULL && iter->item->key_type == ITEMTYPE_UINT64) {
- *key = *(uint64_t*)iter->item->key;
- return 0;
- }
- }
- return 1;
-}
-
-int c_rhash_iter_str_keys(c_rhash hash, c_rhash_iter_t *iter, const char **key) {
- while (iter->bin < hash->bin_count) {
- if (iter->item != NULL)
- iter->item = iter->item->next;
- if (iter->item == NULL) {
- if (iter->initialized)
- iter->bin++;
- else
- iter->initialized = 1;
- if (iter->bin < hash->bin_count)
- iter->item = hash->bins[iter->bin];
- }
- if (iter->item != NULL && iter->item->key_type == ITEMTYPE_STRING) {
- *key = (const char*)iter->item->key;
- return 0;
- }
- }
- return 1;
-}
-
-void c_rhash_destroy(c_rhash hash) {
- for (size_t i = 0; i < hash->bin_count; i++) {
- if (hash->bins[i] != NULL)
- c_rhash_destroy_bin(hash->bins[i]);
- }
- c_rfree(hash);
-}
diff --git a/src/aclk/mqtt_websockets/c_rhash/c_rhash.h b/src/aclk/mqtt_websockets/c_rhash/c_rhash.h
deleted file mode 100644
index 37addd161..000000000
--- a/src/aclk/mqtt_websockets/c_rhash/c_rhash.h
+++ /dev/null
@@ -1,61 +0,0 @@
-// Copyright: SPDX-License-Identifier: GPL-3.0-only
-
-#include <sys/types.h>
-#include <stdint.h>
-#include <stddef.h>
-
-#ifndef DEFAULT_BIN_COUNT
- #define DEFAULT_BIN_COUNT 1000
-#endif
-
-#define ITEMTYPE_UNSET (0x0)
-#define ITEMTYPE_STRING (0x1)
-#define ITEMTYPE_UINT8 (0x2)
-#define ITEMTYPE_UINT64 (0x3)
-#define ITEMTYPE_OPAQUE_PTR (0x4)
-
-typedef struct c_rhash_s *c_rhash;
-
-c_rhash c_rhash_new(size_t bin_count);
-
-void c_rhash_destroy(c_rhash hash);
-
-// # Insert
-// ## Insert where key is string
-int c_rhash_insert_str_ptr(c_rhash hash, const char *key, void *value);
-int c_rhash_insert_str_uint8(c_rhash hash, const char *key, uint8_t value);
-// ## Insert where key is uint64
-int c_rhash_insert_uint64_ptr(c_rhash hash, uint64_t key, void *value);
-
-// # Get
-// ## Get where key is string
-int c_rhash_get_ptr_by_str(c_rhash hash, const char *key, void **ret_val);
-int c_rhash_get_uint8_by_str(c_rhash hash, const char *key, uint8_t *ret_val);
-// ## Get where key is uint64
-int c_rhash_get_ptr_by_uint64(c_rhash hash, uint64_t key, void **ret_val);
-
-typedef struct {
- size_t bin;
- struct bin_item *item;
- int initialized;
-} c_rhash_iter_t;
-
-#define C_RHASH_ITER_T_INITIALIZER { .bin = 0, .item = NULL, .initialized = 0 }
-
-#define c_rhash_iter_t_initialize(p_iter) memset(p_iter, 0, sizeof(c_rhash_iter_t))
-
-/*
- * goes trough whole hash map and returns every
- * type uint64 key present/stored
- *
- * it is not necessary to finish iterating and iterator can be reinitialized
- * there are no guarantees on the order in which the keys will come
- * behavior here is implementation dependent and can change any time
- *
- * returns:
- * 0 for every key and stores the key in *key
- * 1 on error or when all keys of this type has been already iterated over
- */
-int c_rhash_iter_uint64_keys(c_rhash hash, c_rhash_iter_t *iter, uint64_t *key);
-
-int c_rhash_iter_str_keys(c_rhash hash, c_rhash_iter_t *iter, const char **key);
diff --git a/src/aclk/mqtt_websockets/c_rhash/c_rhash_internal.h b/src/aclk/mqtt_websockets/c_rhash/c_rhash_internal.h
deleted file mode 100644
index 20f741076..000000000
--- a/src/aclk/mqtt_websockets/c_rhash/c_rhash_internal.h
+++ /dev/null
@@ -1,19 +0,0 @@
-// Copyright: SPDX-License-Identifier: GPL-3.0-only
-
-#include "c_rhash.h"
-
-struct bin_item {
- uint8_t key_type:4;
- void *key;
- uint8_t value_type:4;
- void *value;
-
- struct bin_item *next;
-};
-
-typedef struct bin_item *c_rhash_bin;
-
-struct c_rhash_s {
- size_t bin_count;
- c_rhash_bin *bins;
-};
diff --git a/src/aclk/mqtt_websockets/c_rhash/tests.c b/src/aclk/mqtt_websockets/c_rhash/tests.c
deleted file mode 100644
index 909c5562d..000000000
--- a/src/aclk/mqtt_websockets/c_rhash/tests.c
+++ /dev/null
@@ -1,273 +0,0 @@
-// Copyright: SPDX-License-Identifier: GPL-3.0-only
-
-#include <stdio.h>
-#include <string.h>
-
-#include "c_rhash.h"
-
-// terminal color codes
-#define KNRM "\x1B[0m"
-#define KRED "\x1B[31m"
-#define KGRN "\x1B[32m"
-#define KYEL "\x1B[33m"
-#define KBLU "\x1B[34m"
-#define KMAG "\x1B[35m"
-#define KCYN "\x1B[36m"
-#define KWHT "\x1B[37m"
-
-#define KEY_1 "key1"
-#define KEY_2 "keya"
-
-#define PRINT_ERR(str, ...) fprintf(stderr, "└─╼ ❌ " KRED str KNRM "\n" __VA_OPT__(,) __VA_ARGS__)
-
-#define ASSERT_RETVAL(fnc, comparator, expected_retval, ...) \
-{ int rval; \
-if(!((rval = fnc(__VA_ARGS__)) comparator expected_retval)) { \
- PRINT_ERR("Failed test. Value returned by \"%s\" in fnc:\"%s\",line:%d is not equal to expected value. Expected:%d, Got:%d", #fnc, __FUNCTION__, __LINE__, expected_retval, rval); \
- rc = 1; \
- goto test_cleanup; \
-} passed_subtest_count++;};
-
-#define ASSERT_VAL_UINT8(returned, expected) \
-if(returned != expected) { \
- PRINT_ERR("Failed test. Value returned (%d) doesn't match expected (%d)! fnc:\"%s\",line:%d", returned, expected, __FUNCTION__, __LINE__); \
- rc = 1; \
- goto test_cleanup; \
-} passed_subtest_count++;
-
-#define ASSERT_VAL_PTR(returned, expected) \
-if((void*)returned != (void*)expected) { \
- PRINT_ERR("Failed test. Value returned(%p) doesn't match expected(%p)! fnc:\"%s\",line:%d", (void*)returned, (void*)expected, __FUNCTION__, __LINE__); \
- rc = 1; \
- goto test_cleanup; \
-} passed_subtest_count++;
-
-#define ALL_SUBTESTS_PASS() printf("└─╼ ✅" KGRN " Test \"%s\" DONE. All of %zu subtests PASS. (line:%d)\n" KNRM, __FUNCTION__, passed_subtest_count, __LINE__);
-
-#define TEST_START() size_t passed_subtest_count = 0; int rc = 0; printf("╒═ Starting test \"%s\"\n", __FUNCTION__);
-
-int test_str_uint8() {
- c_rhash hash = c_rhash_new(100);
- uint8_t val;
-
- TEST_START();
- // function should fail on empty hash
- ASSERT_RETVAL(c_rhash_get_uint8_by_str, !=, 0, hash, KEY_1, &val);
-
- ASSERT_RETVAL(c_rhash_insert_str_uint8, ==, 0, hash, KEY_1, 5);
- ASSERT_RETVAL(c_rhash_get_uint8_by_str, ==, 0, hash, KEY_1, &val);
- ASSERT_VAL_UINT8(5, val);
-
- ASSERT_RETVAL(c_rhash_insert_str_uint8, ==, 0, hash, KEY_2, 8);
- ASSERT_RETVAL(c_rhash_get_uint8_by_str, ==, 0, hash, KEY_1, &val);
- ASSERT_VAL_UINT8(5, val);
- ASSERT_RETVAL(c_rhash_get_uint8_by_str, ==, 0, hash, KEY_2, &val);
- ASSERT_VAL_UINT8(8, val);
- ASSERT_RETVAL(c_rhash_get_uint8_by_str, !=, 0, hash, "sndnskjdf", &val);
-
- // test update of key
- ASSERT_RETVAL(c_rhash_insert_str_uint8, ==, 0, hash, KEY_1, 100);
- ASSERT_RETVAL(c_rhash_get_uint8_by_str, ==, 0, hash, KEY_1, &val);
- ASSERT_VAL_UINT8(100, val);
-
- ALL_SUBTESTS_PASS();
-test_cleanup:
- c_rhash_destroy(hash);
- return rc;
-}
-
-int test_uint64_ptr() {
- c_rhash hash = c_rhash_new(100);
- void *val;
-
- TEST_START();
-
- // function should fail on empty hash
- ASSERT_RETVAL(c_rhash_get_ptr_by_uint64, !=, 0, hash, 0, &val);
-
- ASSERT_RETVAL(c_rhash_insert_uint64_ptr, ==, 0, hash, 0, &hash);
- ASSERT_RETVAL(c_rhash_get_ptr_by_uint64, ==, 0, hash, 0, &val);
- ASSERT_VAL_PTR(&hash, val);
-
- ASSERT_RETVAL(c_rhash_insert_uint64_ptr, ==, 0, hash, 1, &val);
- ASSERT_RETVAL(c_rhash_get_ptr_by_uint64, ==, 0, hash, 0, &val);
- ASSERT_VAL_PTR(&hash, val);
- ASSERT_RETVAL(c_rhash_get_ptr_by_uint64, ==, 0, hash, 1, &val);
- ASSERT_VAL_PTR(&val, val);
- ASSERT_RETVAL(c_rhash_get_ptr_by_uint64, !=, 0, hash, 2, &val);
-
- ALL_SUBTESTS_PASS();
-test_cleanup:
- c_rhash_destroy(hash);
- return rc;
-}
-
-#define UINT64_PTR_INC_ITERATION_COUNT 5000
-int test_uint64_ptr_incremental() {
- c_rhash hash = c_rhash_new(100);
- void *val;
-
- TEST_START();
-
- char a = 0x20;
- char *ptr = &a;
- while(ptr < &a + UINT64_PTR_INC_ITERATION_COUNT) {
- ASSERT_RETVAL(c_rhash_insert_uint64_ptr, ==, 0, hash, (ptr-&a), ptr);
- ptr++;
- }
-
- ptr = &a;
- char *retptr;
- for(int i = 0; i < UINT64_PTR_INC_ITERATION_COUNT; i++) {
- ASSERT_RETVAL(c_rhash_get_ptr_by_uint64, ==, 0, hash, i, (void**)&retptr);
- ASSERT_VAL_PTR(retptr, (&a+i));
- }
-
- ALL_SUBTESTS_PASS();
-test_cleanup:
- c_rhash_destroy(hash);
- return rc;
-}
-
-struct test_string {
- const char *str;
- int counter;
-};
-
-struct test_string test_strings[] = {
- { .str = "Cillum reprehenderit eiusmod elit nisi aliquip esse exercitation commodo Lorem voluptate esse.", .counter = 0 },
- { .str = "Ullamco eiusmod tempor occaecat ad.", .counter = 0 },
- { .str = "Esse aliquip tempor sint tempor ullamco duis aute incididunt ad.", .counter = 0 },
- { .str = "Cillum Lorem labore cupidatat commodo proident adipisicing.", .counter = 0 },
- { .str = "Quis ad cillum officia exercitation.", .counter = 0 },
- { .str = "Ipsum enim dolor ullamco amet sint nisi ut occaecat sint non.", .counter = 0 },
- { .str = "Id duis officia ipsum cupidatat velit fugiat.", .counter = 0 },
- { .str = "Aliqua non occaecat voluptate reprehenderit reprehenderit veniam minim exercitation ea aliquip enim aliqua deserunt qui.", .counter = 0 },
- { .str = "Ullamco elit tempor laboris reprehenderit quis deserunt duis quis tempor reprehenderit magna dolore reprehenderit exercitation.", .counter = 0 },
- { .str = "Culpa do dolor quis incididunt et labore in ex.", .counter = 0 },
- { .str = "Aliquip velit cupidatat qui incididunt ipsum nostrud eiusmod ut proident nisi magna fugiat excepteur.", .counter = 0 },
- { .str = "Aliqua qui dolore tempor id proident ullamco sunt magna.", .counter = 0 },
- { .str = "Labore eiusmod ut fugiat dolore reprehenderit mollit magna.", .counter = 0 },
- { .str = "Veniam aliquip dolor excepteur minim nulla esse cupidatat esse.", .counter = 0 },
- { .str = "Do quis dolor irure nostrud occaecat aute proident anim.", .counter = 0 },
- { .str = "Enim veniam non nulla ad quis sit amet.", .counter = 0 },
- { .str = "Cillum reprehenderit do enim esse do ullamco consectetur ea.", .counter = 0 },
- { .str = "Sit et duis sint anim qui ad anim labore exercitation sunt cupidatat.", .counter = 0 },
- { .str = "Dolor officia adipisicing sint pariatur in dolor occaecat officia reprehenderit magna.", .counter = 0 },
- { .str = "Aliquip dolore qui occaecat eiusmod sunt incididunt reprehenderit minim et.", .counter = 0 },
- { .str = "Aute fugiat laboris cillum tempor consequat tempor do non laboris culpa officia nisi.", .counter = 0 },
- { .str = "Et excepteur do aliquip fugiat nisi velit tempor officia enim quis elit incididunt.", .counter = 0 },
- { .str = "Eu officia adipisicing incididunt occaecat officia cupidatat enim sit sit officia.", .counter = 0 },
- { .str = "Do amet cillum duis pariatur commodo nulla cillum magna nulla Lorem veniam cupidatat.", .counter = 0 },
- { .str = "Dolor adipisicing voluptate laboris occaecat culpa aliquip ipsum ut consequat aliqua aliquip commodo sunt velit.", .counter = 0 },
- { .str = "Nulla proident ipsum quis nulla.", .counter = 0 },
- { .str = "Laborum adipisicing nulla do aute aliqua est quis sint culpa pariatur laborum voluptate qui.", .counter = 0 },
- { .str = "Proident eiusmod sunt et nulla elit pariatur dolore irure ex voluptate excepteur adipisicing consectetur.", .counter = 0 },
- { .str = "Consequat ex voluptate officia excepteur aute deserunt proident commodo et.", .counter = 0 },
- { .str = "Velit sit cupidatat dolor dolore.", .counter = 0 },
- { .str = "Sunt enim do non anim nostrud exercitation ullamco ex proident commodo.", .counter = 0 },
- { .str = "Id ex officia cillum ad.", .counter = 0 },
- { .str = "Laboris in sunt eiusmod veniam laboris nostrud.", .counter = 0 },
- { .str = "Ex magna occaecat ea ea incididunt aliquip.", .counter = 0 },
- { .str = "Sunt eiusmod ex nostrud eu pariatur sit cupidatat ea adipisicing cillum culpa esse consequat aliquip.", .counter = 0 },
- { .str = "Excepteur commodo qui incididunt enim culpa sunt non excepteur Lorem adipisicing.", .counter = 0 },
- { .str = "Quis officia est ullamco reprehenderit incididunt occaecat pariatur ex reprehenderit nisi.", .counter = 0 },
- { .str = "Culpa irure proident proident et eiusmod irure aliqua ipsum cupidatat minim sit.", .counter = 0 },
- { .str = "Qui cupidatat aliquip est velit magna veniam.", .counter = 0 },
- { .str = "Pariatur ad ad mollit nostrud non irure minim veniam anim aliquip quis eu.", .counter = 0 },
- { .str = "Nisi ex minim eu adipisicing tempor Lorem nisi do ad exercitation est non eu.", .counter = 0 },
- { .str = "Cupidatat do mollit ad commodo cupidatat ut.", .counter = 0 },
- { .str = "Est non excepteur eiusmod nostrud et eu.", .counter = 0 },
- { .str = "Cupidatat mollit nisi magna officia ut elit eiusmod.", .counter = 0 },
- { .str = "Est aliqua consectetur laboris ex consequat est ut dolor.", .counter = 0 },
- { .str = "Duis eu laboris laborum ut id Lorem nostrud qui ad velit proident fugiat minim ullamco.", .counter = 0 },
- { .str = "Pariatur esse excepteur anim amet excepteur irure sint quis esse ex cupidatat ut.", .counter = 0 },
- { .str = "Esse reprehenderit amet qui excepteur aliquip amet.", .counter = 0 },
- { .str = "Ullamco laboris elit labore adipisicing aute nulla qui laborum tempor officia ut dolor aute.", .counter = 0 },
- { .str = "Commodo sunt cillum velit minim laborum Lorem aliqua tempor ad id eu.", .counter = 0 },
- { .str = NULL, .counter = 0 }
-};
-
-uint32_t test_strings_contain_element(const char *str) {
- struct test_string *str_desc = test_strings;
- while(str_desc->str) {
- if (!strcmp(str, str_desc->str))
- return str_desc - test_strings;
- str_desc++;
- }
- return -1;
-}
-
-#define TEST_INCREMENT_STR_KEYS_HASH_SIZE 20
-int test_increment_str_keys() {
- c_rhash hash;
- const char *key;
-
- TEST_START();
-
- hash = c_rhash_new(TEST_INCREMENT_STR_KEYS_HASH_SIZE); // less than element count of test_strings
-
- c_rhash_iter_t iter = C_RHASH_ITER_T_INITIALIZER;
-
- // check iter on empty hash
- ASSERT_RETVAL(c_rhash_iter_str_keys, !=, 0, hash, &iter, &key);
-
- int32_t element_count = 0;
- while (test_strings[element_count].str) {
- ASSERT_RETVAL(c_rhash_insert_str_ptr, ==, 0, hash, test_strings[element_count].str, NULL);
- test_strings[element_count].counter++; // we want to test we got each key exactly once
- element_count++;
- }
-
- if (element_count <= TEST_INCREMENT_STR_KEYS_HASH_SIZE * 2) {
- // verify we are actually test also iteration trough single bin (when 2 keys have same hash pointing them to same bin)
- PRINT_ERR("For this test to properly test all the hash size needs to be much smaller than all test key count.");
- rc = 1;
- goto test_cleanup;
- }
-
- // we insert another type of key as iterator should skip it
- // in case is another type
- ASSERT_RETVAL(c_rhash_insert_uint64_ptr, ==, 0, hash, 5, NULL);
-
- c_rhash_iter_t_initialize(&iter);
- while(!c_rhash_iter_str_keys(hash, &iter, &key)) {
- element_count--;
- int i;
- if ( (i = test_strings_contain_element(key)) < 0) {
- PRINT_ERR("Key \"%s\" is not present in test_strings array! (Fnc: %s, Line: %d)", key, __FUNCTION__, __LINE__);
- rc = 1;
- goto test_cleanup;
- }
- passed_subtest_count++;
-
- test_strings[i].counter--;
- }
- ASSERT_VAL_UINT8(element_count, 0); // we added also same non string keys
-
- // check each key was present exactly once
- struct test_string *str_desc = test_strings;
- while (str_desc->str) {
- ASSERT_VAL_UINT8(str_desc->counter, 0);
- str_desc++;
- }
-
- ALL_SUBTESTS_PASS();
-test_cleanup:
- c_rhash_destroy(hash);
- return rc;
-}
-
-#define RUN_TEST(fnc) \
-if(fnc()) \
- return 1;
-
-int main(int argc, char *argv[]) {
- RUN_TEST(test_str_uint8);
- RUN_TEST(test_uint64_ptr);
- RUN_TEST(test_uint64_ptr_incremental);
- RUN_TEST(test_increment_str_keys);
- // TODO hash with mixed key tests
- // TODO iterator test
- return 0;
-}
diff --git a/src/aclk/mqtt_websockets/common_internal.h b/src/aclk/mqtt_websockets/common_internal.h
index 2be1c45b8..d79dbb3f3 100644
--- a/src/aclk/mqtt_websockets/common_internal.h
+++ b/src/aclk/mqtt_websockets/common_internal.h
@@ -1,27 +1,12 @@
-// SPDX-License-Identifier: GPL-3.0-only
+// SPDX-License-Identifier: GPL-3.0-or-later
#ifndef COMMON_INTERNAL_H
#define COMMON_INTERNAL_H
#include "endian_compat.h"
-#ifdef MQTT_WSS_CUSTOM_ALLOC
-#include "../helpers/mqtt_wss_pal.h"
-#else
-#define mw_malloc(...) malloc(__VA_ARGS__)
-#define mw_calloc(...) calloc(__VA_ARGS__)
-#define mw_free(...) free(__VA_ARGS__)
-#define mw_strdup(...) strdup(__VA_ARGS__)
-#define mw_realloc(...) realloc(__VA_ARGS__)
-#endif
-
#ifndef MQTT_WSS_FRAG_MEMALIGN
#define MQTT_WSS_FRAG_MEMALIGN (8)
#endif
-#define OPENSSL_VERSION_095 0x00905100L
-#define OPENSSL_VERSION_097 0x00907000L
-#define OPENSSL_VERSION_110 0x10100000L
-#define OPENSSL_VERSION_111 0x10101000L
-
#endif /* COMMON_INTERNAL_H */
diff --git a/src/aclk/mqtt_websockets/common_public.h b/src/aclk/mqtt_websockets/common_public.h
index a855737f9..8f3b4f7d1 100644
--- a/src/aclk/mqtt_websockets/common_public.h
+++ b/src/aclk/mqtt_websockets/common_public.h
@@ -1,3 +1,5 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
#ifndef MQTT_WEBSOCKETS_COMMON_PUBLIC_H
#define MQTT_WEBSOCKETS_COMMON_PUBLIC_H
diff --git a/src/aclk/mqtt_websockets/mqtt_ng.c b/src/aclk/mqtt_websockets/mqtt_ng.c
index 8ad6bd5c9..9abe77b5f 100644
--- a/src/aclk/mqtt_websockets/mqtt_ng.c
+++ b/src/aclk/mqtt_websockets/mqtt_ng.c
@@ -1,35 +1,19 @@
-// Copyright: SPDX-License-Identifier: GPL-3.0-only
+// SPDX-License-Identifier: GPL-3.0-or-later
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
-#include <stdint.h>
-#include <stdlib.h>
-#include <string.h>
-#include <pthread.h>
-#include <inttypes.h>
-
-#include "c_rhash/c_rhash.h"
+#include "libnetdata/libnetdata.h"
#include "common_internal.h"
#include "mqtt_constants.h"
-#include "mqtt_wss_log.h"
#include "mqtt_ng.h"
-#define UNIT_LOG_PREFIX "mqtt_client: "
-#define FATAL(fmt, ...) mws_fatal(client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__)
-#define ERROR(fmt, ...) mws_error(client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__)
-#define WARN(fmt, ...) mws_warn (client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__)
-#define INFO(fmt, ...) mws_info (client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__)
-#define DEBUG(fmt, ...) mws_debug(client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__)
-
#define SMALL_STRING_DONT_FRAGMENT_LIMIT 128
-#define MIN(a,b) (((a)<(b))?(a):(b))
-
-#define LOCK_HDR_BUFFER(buffer) pthread_mutex_lock(&((buffer)->mutex))
-#define UNLOCK_HDR_BUFFER(buffer) pthread_mutex_unlock(&((buffer)->mutex))
+#define LOCK_HDR_BUFFER(buffer) spinlock_lock(&((buffer)->spinlock))
+#define UNLOCK_HDR_BUFFER(buffer) spinlock_unlock(&((buffer)->spinlock))
#define BUFFER_FRAG_GARBAGE_COLLECT 0x01
// some packets can be marked for garbage collection
@@ -75,17 +59,17 @@ struct transaction_buffer {
// to be able to revert state easily
// in case of error mid processing
struct header_buffer state_backup;
- pthread_mutex_t mutex;
+ SPINLOCK spinlock;
struct buffer_fragment *sending_frag;
};
enum mqtt_client_state {
- RAW = 0,
- CONNECT_PENDING,
- CONNECTING,
- CONNECTED,
- ERROR,
- DISCONNECTED
+ MQTT_STATE_RAW = 0,
+ MQTT_STATE_CONNECT_PENDING,
+ MQTT_STATE_CONNECTING,
+ MQTT_STATE_CONNECTED,
+ MQTT_STATE_ERROR,
+ MQTT_STATE_DISCONNECTED
};
enum parser_state {
@@ -224,7 +208,7 @@ struct topic_aliases_data {
c_rhash stoi_dict;
uint32_t idx_max;
uint32_t idx_assigned;
- pthread_rwlock_t rwlock;
+ SPINLOCK spinlock;
};
struct mqtt_ng_client {
@@ -234,8 +218,6 @@ struct mqtt_ng_client {
mqtt_msg_data connect_msg;
- mqtt_wss_log_ctx_t log;
-
mqtt_ng_send_fnc_t send_fnc_ptr;
void *user_ctx;
@@ -253,7 +235,7 @@ struct mqtt_ng_client {
unsigned int ping_pending:1;
struct mqtt_ng_stats stats;
- pthread_mutex_t stats_mutex;
+ SPINLOCK stats_spinlock;
struct topic_aliases_data tx_topic_aliases;
c_rhash rx_aliases;
@@ -407,7 +389,7 @@ enum memory_mode {
CALLER_RESPONSIBLE
};
-static inline enum memory_mode ptr2memory_mode(void * ptr) {
+static enum memory_mode ptr2memory_mode(void * ptr) {
if (ptr == NULL)
return MEMCPY;
if (ptr == CALLER_RESPONSIBILITY)
@@ -492,15 +474,8 @@ static void buffer_rebuild(struct header_buffer *buf)
} while(frag);
}
-static void buffer_garbage_collect(struct header_buffer *buf, mqtt_wss_log_ctx_t log_ctx)
+static void buffer_garbage_collect(struct header_buffer *buf)
{
-#if !defined(MQTT_DEBUG_VERBOSE) && !defined(ADDITIONAL_CHECKS)
- (void) log_ctx;
-#endif
-#ifdef MQTT_DEBUG_VERBOSE
- mws_debug(log_ctx, "Buffer Garbage Collection!");
-#endif
-
struct buffer_fragment *frag = BUFFER_FIRST_FRAG(buf);
while (frag) {
if (!frag_is_marked_for_gc(frag))
@@ -511,12 +486,8 @@ static void buffer_garbage_collect(struct header_buffer *buf, mqtt_wss_log_ctx_t
frag = frag->next;
}
- if (frag == BUFFER_FIRST_FRAG(buf)) {
-#ifdef MQTT_DEBUG_VERBOSE
- mws_debug(log_ctx, "Buffer Garbage Collection! No Space Reclaimed!");
-#endif
+ if (frag == BUFFER_FIRST_FRAG(buf))
return;
- }
if (!frag) {
buf->tail_frag = NULL;
@@ -535,21 +506,17 @@ static void buffer_garbage_collect(struct header_buffer *buf, mqtt_wss_log_ctx_t
buffer_rebuild(buf);
}
-static void transaction_buffer_garbage_collect(struct transaction_buffer *buf, mqtt_wss_log_ctx_t log_ctx)
+static void transaction_buffer_garbage_collect(struct transaction_buffer *buf)
{
-#ifdef MQTT_DEBUG_VERBOSE
- mws_debug(log_ctx, "Transaction Buffer Garbage Collection! %s", buf->sending_frag == NULL ? "NULL" : "in flight message");
-#endif
-
// Invalidate the cached sending fragment
// as we will move data around
if (buf->sending_frag != &ping_frag)
buf->sending_frag = NULL;
- buffer_garbage_collect(&buf->hdr_buffer, log_ctx);
+ buffer_garbage_collect(&buf->hdr_buffer);
}
-static int transaction_buffer_grow(struct transaction_buffer *buf, mqtt_wss_log_ctx_t log_ctx, float rate, size_t max)
+static int transaction_buffer_grow(struct transaction_buffer *buf, float rate, size_t max)
{
if (buf->hdr_buffer.size >= max)
return 0;
@@ -565,35 +532,30 @@ static int transaction_buffer_grow(struct transaction_buffer *buf, mqtt_wss_log_
void *ret = reallocz(buf->hdr_buffer.data, buf->hdr_buffer.size);
if (ret == NULL) {
- mws_warn(log_ctx, "Buffer growth failed (realloc)");
+ nd_log(NDLS_DAEMON, NDLP_WARNING, "Buffer growth failed (realloc)");
return 1;
}
- mws_debug(log_ctx, "Message metadata buffer was grown");
+ nd_log(NDLS_DAEMON, NDLP_DEBUG, "Message metadata buffer was grown");
buf->hdr_buffer.data = ret;
buffer_rebuild(&buf->hdr_buffer);
return 0;
}
-inline static int transaction_buffer_init(struct transaction_buffer *to_init, size_t size)
+inline static void transaction_buffer_init(struct transaction_buffer *to_init, size_t size)
{
- pthread_mutex_init(&to_init->mutex, NULL);
+ spinlock_init(&to_init->spinlock);
to_init->hdr_buffer.size = size;
to_init->hdr_buffer.data = mallocz(size);
- if (to_init->hdr_buffer.data == NULL)
- return 1;
-
to_init->hdr_buffer.tail = to_init->hdr_buffer.data;
to_init->hdr_buffer.tail_frag = NULL;
- return 0;
}
static void transaction_buffer_destroy(struct transaction_buffer *to_init)
{
buffer_purge(&to_init->hdr_buffer);
- pthread_mutex_destroy(&to_init->mutex);
freez(to_init->hdr_buffer.data);
}
@@ -629,54 +591,30 @@ void transaction_buffer_transaction_rollback(struct transaction_buffer *buf, str
struct mqtt_ng_client *mqtt_ng_init(struct mqtt_ng_init *settings)
{
struct mqtt_ng_client *client = callocz(1, sizeof(struct mqtt_ng_client));
- if (client == NULL)
- return NULL;
- if (transaction_buffer_init(&client->main_buffer, HEADER_BUFFER_SIZE))
- goto err_free_client;
+ transaction_buffer_init(&client->main_buffer, HEADER_BUFFER_SIZE);
client->rx_aliases = RX_ALIASES_INITIALIZE();
- if (client->rx_aliases == NULL)
- goto err_free_trx_buf;
- if (pthread_mutex_init(&client->stats_mutex, NULL))
- goto err_free_rx_alias;
+ spinlock_init(&client->stats_spinlock);
+ spinlock_init(&client->tx_topic_aliases.spinlock);
client->tx_topic_aliases.stoi_dict = TX_ALIASES_INITIALIZE();
- if (client->tx_topic_aliases.stoi_dict == NULL)
- goto err_free_stats_mutex;
client->tx_topic_aliases.idx_max = UINT16_MAX;
- if (pthread_rwlock_init(&client->tx_topic_aliases.rwlock, NULL))
- goto err_free_tx_alias;
-
// TODO just embed the struct into mqtt_ng_client
client->parser.received_data = settings->data_in;
client->send_fnc_ptr = settings->data_out_fnc;
client->user_ctx = settings->user_ctx;
- client->log = settings->log;
-
client->puback_callback = settings->puback_callback;
client->connack_callback = settings->connack_callback;
client->msg_callback = settings->msg_callback;
return client;
-
-err_free_tx_alias:
- c_rhash_destroy(client->tx_topic_aliases.stoi_dict);
-err_free_stats_mutex:
- pthread_mutex_destroy(&client->stats_mutex);
-err_free_rx_alias:
- c_rhash_destroy(client->rx_aliases);
-err_free_trx_buf:
- transaction_buffer_destroy(&client->main_buffer);
-err_free_client:
- freez(client);
- return NULL;
}
-static inline uint8_t get_control_packet_type(uint8_t first_hdr_byte)
+static uint8_t get_control_packet_type(uint8_t first_hdr_byte)
{
return first_hdr_byte >> 4;
}
@@ -708,33 +646,27 @@ static void mqtt_ng_destroy_tx_alias_hash(c_rhash hash)
void mqtt_ng_destroy(struct mqtt_ng_client *client)
{
transaction_buffer_destroy(&client->main_buffer);
- pthread_mutex_destroy(&client->stats_mutex);
mqtt_ng_destroy_tx_alias_hash(client->tx_topic_aliases.stoi_dict);
- pthread_rwlock_destroy(&client->tx_topic_aliases.rwlock);
mqtt_ng_destroy_rx_alias_hash(client->rx_aliases);
freez(client);
}
-int frag_set_external_data(mqtt_wss_log_ctx_t log, struct buffer_fragment *frag, void *data, size_t data_len, free_fnc_t data_free_fnc)
+int frag_set_external_data(struct buffer_fragment *frag, void *data, size_t data_len, free_fnc_t data_free_fnc)
{
if (frag->len) {
// TODO?: This could potentially be done in future if we set rule
// external data always follows in buffer data
// could help reduce fragmentation in some messages but
// currently not worth it considering time is tight
- mws_fatal(log, UNIT_LOG_PREFIX "INTERNAL ERROR: Cannot set external data to fragment already containing in buffer data!");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "INTERNAL ERROR: Cannot set external data to fragment already containing in buffer data!");
return 1;
}
switch (ptr2memory_mode(data_free_fnc)) {
case MEMCPY:
frag->data = mallocz(data_len);
- if (frag->data == NULL) {
- mws_error(log, UNIT_LOG_PREFIX "OOM while malloc @_optimized_add");
- return 1;
- }
memcpy(frag->data, data, data_len);
break;
case EXTERNAL_FREE_AFTER_USE:
@@ -816,18 +748,18 @@ static size_t mqtt_ng_connect_size(struct mqtt_auth_properties *auth,
#define PACK_2B_INT(buffer, integer, frag) { *(uint16_t *)WRITE_POS(frag) = htobe16((integer)); \
DATA_ADVANCE(buffer, sizeof(uint16_t), frag); }
-static int _optimized_add(struct header_buffer *buf, mqtt_wss_log_ctx_t log_ctx, void *data, size_t data_len, free_fnc_t data_free_fnc, struct buffer_fragment **frag)
+static int _optimized_add(struct header_buffer *buf, void *data, size_t data_len, free_fnc_t data_free_fnc, struct buffer_fragment **frag)
{
if (data_len > SMALL_STRING_DONT_FRAGMENT_LIMIT) {
buffer_frag_flag_t flags = BUFFER_FRAG_DATA_EXTERNAL;
if ((*frag)->flags & BUFFER_FRAG_GARBAGE_COLLECT_ON_SEND)
flags |= BUFFER_FRAG_GARBAGE_COLLECT_ON_SEND;
if( (*frag = buffer_new_frag(buf, flags)) == NULL ) {
- mws_error(log_ctx, "Out of buffer space while generating the message");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Out of buffer space while generating the message");
return 1;
}
- if (frag_set_external_data(log_ctx, *frag, data, data_len, data_free_fnc)) {
- mws_error(log_ctx, "Error adding external data to newly created fragment");
+ if (frag_set_external_data(*frag, data, data_len, data_free_fnc)) {
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Error adding external data to newly created fragment");
return 1;
}
// we dont want to write to this fragment anymore
@@ -842,31 +774,30 @@ static int _optimized_add(struct header_buffer *buf, mqtt_wss_log_ctx_t log_ctx,
return 0;
}
-#define TRY_GENERATE_MESSAGE(generator_function, client, ...) \
- int rc = generator_function(&client->main_buffer, client->log, ##__VA_ARGS__); \
+#define TRY_GENERATE_MESSAGE(generator_function, ...) \
+ int rc = generator_function(&client->main_buffer, ##__VA_ARGS__); \
if (rc == MQTT_NG_MSGGEN_BUFFER_OOM) { \
LOCK_HDR_BUFFER(&client->main_buffer); \
- transaction_buffer_garbage_collect((&client->main_buffer), client->log); \
+ transaction_buffer_garbage_collect((&client->main_buffer)); \
UNLOCK_HDR_BUFFER(&client->main_buffer); \
- rc = generator_function(&client->main_buffer, client->log, ##__VA_ARGS__); \
+ rc = generator_function(&client->main_buffer, ##__VA_ARGS__); \
if (rc == MQTT_NG_MSGGEN_BUFFER_OOM && client->max_mem_bytes) { \
LOCK_HDR_BUFFER(&client->main_buffer); \
- transaction_buffer_grow((&client->main_buffer), client->log, GROWTH_FACTOR, client->max_mem_bytes); \
+ transaction_buffer_grow((&client->main_buffer),GROWTH_FACTOR, client->max_mem_bytes); \
UNLOCK_HDR_BUFFER(&client->main_buffer); \
- rc = generator_function(&client->main_buffer, client->log, ##__VA_ARGS__); \
+ rc = generator_function(&client->main_buffer, ##__VA_ARGS__); \
} \
if (rc == MQTT_NG_MSGGEN_BUFFER_OOM) \
- mws_error(client->log, "%s failed to generate message due to insufficient buffer space (line %d)", __FUNCTION__, __LINE__); \
+ nd_log(NDLS_DAEMON, NDLP_ERR, "%s failed to generate message due to insufficient buffer space (line %d)", __FUNCTION__, __LINE__); \
} \
if (rc == MQTT_NG_MSGGEN_OK) { \
- pthread_mutex_lock(&client->stats_mutex); \
+ spinlock_lock(&client->stats_spinlock); \
client->stats.tx_messages_queued++; \
- pthread_mutex_unlock(&client->stats_mutex); \
+ spinlock_unlock(&client->stats_spinlock); \
} \
return rc;
mqtt_msg_data mqtt_ng_generate_connect(struct transaction_buffer *trx_buf,
- mqtt_wss_log_ctx_t log_ctx,
struct mqtt_auth_properties *auth,
struct mqtt_lwt_properties *lwt,
uint8_t clean_start,
@@ -874,7 +805,7 @@ mqtt_msg_data mqtt_ng_generate_connect(struct transaction_buffer *trx_buf,
{
// Sanity Checks First (are given parameters correct and up to MQTT spec)
if (!auth->client_id) {
- mws_error(log_ctx, "ClientID must be set. [MQTT-3.1.3-3]");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "ClientID must be set. [MQTT-3.1.3-3]");
return NULL;
}
@@ -885,29 +816,29 @@ mqtt_msg_data mqtt_ng_generate_connect(struct transaction_buffer *trx_buf,
// however server MUST allow ClientIDs between 1-23 bytes [MQTT-3.1.3-5]
// so we will warn client server might not like this and he is using it
// at his own risk!
- mws_warn(log_ctx, "client_id provided is empty string. This might not be allowed by server [MQTT-3.1.3-6]");
+ nd_log(NDLS_DAEMON, NDLP_WARNING, "client_id provided is empty string. This might not be allowed by server [MQTT-3.1.3-6]");
}
if(len > MQTT_MAX_CLIENT_ID) {
// [MQTT-3.1.3-5] server MUST allow client_id length 1-32
// server MAY allow longer client_id, if user provides longer client_id
// warn them he is doing so at his own risk!
- mws_warn(log_ctx, "client_id provided is longer than 23 bytes, server might not allow that [MQTT-3.1.3-5]");
+ nd_log(NDLS_DAEMON, NDLP_WARNING, "client_id provided is longer than 23 bytes, server might not allow that [MQTT-3.1.3-5]");
}
if (lwt) {
if (lwt->will_message && lwt->will_message_size > 65535) {
- mws_error(log_ctx, "Will message cannot be longer than 65535 bytes due to MQTT protocol limitations [MQTT-3.1.3-4] and [MQTT-1.5.6]");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Will message cannot be longer than 65535 bytes due to MQTT protocol limitations [MQTT-3.1.3-4] and [MQTT-1.5.6]");
return NULL;
}
if (!lwt->will_topic) { //TODO topic given with strlen==0 ? check specs
- mws_error(log_ctx, "If will message is given will topic must also be given [MQTT-3.1.3.3]");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "If will message is given will topic must also be given [MQTT-3.1.3.3]");
return NULL;
}
if (lwt->will_qos > MQTT_MAX_QOS) {
// refer to [MQTT-3-1.2-12]
- mws_error(log_ctx, "QOS for LWT message is bigger than max");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "QOS for LWT message is bigger than max");
return NULL;
}
}
@@ -941,8 +872,10 @@ mqtt_msg_data mqtt_ng_generate_connect(struct transaction_buffer *trx_buf,
*connect_flags = 0;
if (auth->username)
*connect_flags |= MQTT_CONNECT_FLAG_USERNAME;
+
if (auth->password)
*connect_flags |= MQTT_CONNECT_FLAG_PASSWORD;
+
if (lwt) {
*connect_flags |= MQTT_CONNECT_FLAG_LWT;
*connect_flags |= lwt->will_qos << MQTT_CONNECT_FLAG_QOS_BITSHIFT;
@@ -966,7 +899,7 @@ mqtt_msg_data mqtt_ng_generate_connect(struct transaction_buffer *trx_buf,
// [MQTT-3.1.3.1] Client identifier
CHECK_BYTES_AVAILABLE(&trx_buf->hdr_buffer, 2, goto fail_rollback);
PACK_2B_INT(&trx_buf->hdr_buffer, strlen(auth->client_id), frag);
- if (_optimized_add(&trx_buf->hdr_buffer, log_ctx, auth->client_id, strlen(auth->client_id), auth->client_id_free, &frag))
+ if (_optimized_add(&trx_buf->hdr_buffer, auth->client_id, strlen(auth->client_id), auth->client_id_free, &frag))
goto fail_rollback;
if (lwt != NULL) {
@@ -980,7 +913,7 @@ mqtt_msg_data mqtt_ng_generate_connect(struct transaction_buffer *trx_buf,
// Will Topic [MQTT-3.1.3.3]
CHECK_BYTES_AVAILABLE(&trx_buf->hdr_buffer, 2, goto fail_rollback);
PACK_2B_INT(&trx_buf->hdr_buffer, strlen(lwt->will_topic), frag);
- if (_optimized_add(&trx_buf->hdr_buffer, log_ctx, lwt->will_topic, strlen(lwt->will_topic), lwt->will_topic_free, &frag))
+ if (_optimized_add(&trx_buf->hdr_buffer, lwt->will_topic, strlen(lwt->will_topic), lwt->will_topic_free, &frag))
goto fail_rollback;
// Will Payload [MQTT-3.1.3.4]
@@ -988,7 +921,7 @@ mqtt_msg_data mqtt_ng_generate_connect(struct transaction_buffer *trx_buf,
BUFFER_TRANSACTION_NEW_FRAG(&trx_buf->hdr_buffer, 0, frag, goto fail_rollback);
CHECK_BYTES_AVAILABLE(&trx_buf->hdr_buffer, 2, goto fail_rollback);
PACK_2B_INT(&trx_buf->hdr_buffer, lwt->will_message_size, frag);
- if (_optimized_add(&trx_buf->hdr_buffer, log_ctx, lwt->will_message, lwt->will_message_size, lwt->will_topic_free, &frag))
+ if (_optimized_add(&trx_buf->hdr_buffer, lwt->will_message, lwt->will_message_size, lwt->will_topic_free, &frag))
goto fail_rollback;
}
}
@@ -998,7 +931,7 @@ mqtt_msg_data mqtt_ng_generate_connect(struct transaction_buffer *trx_buf,
BUFFER_TRANSACTION_NEW_FRAG(&trx_buf->hdr_buffer, 0, frag, goto fail_rollback);
CHECK_BYTES_AVAILABLE(&trx_buf->hdr_buffer, 2, goto fail_rollback);
PACK_2B_INT(&trx_buf->hdr_buffer, strlen(auth->username), frag);
- if (_optimized_add(&trx_buf->hdr_buffer, log_ctx, auth->username, strlen(auth->username), auth->username_free, &frag))
+ if (_optimized_add(&trx_buf->hdr_buffer, auth->username, strlen(auth->username), auth->username_free, &frag))
goto fail_rollback;
}
@@ -1007,7 +940,7 @@ mqtt_msg_data mqtt_ng_generate_connect(struct transaction_buffer *trx_buf,
BUFFER_TRANSACTION_NEW_FRAG(&trx_buf->hdr_buffer, 0, frag, goto fail_rollback);
CHECK_BYTES_AVAILABLE(&trx_buf->hdr_buffer, 2, goto fail_rollback);
PACK_2B_INT(&trx_buf->hdr_buffer, strlen(auth->password), frag);
- if (_optimized_add(&trx_buf->hdr_buffer, log_ctx, auth->password, strlen(auth->password), auth->password_free, &frag))
+ if (_optimized_add(&trx_buf->hdr_buffer, auth->password, strlen(auth->password), auth->password_free, &frag))
goto fail_rollback;
}
trx_buf->hdr_buffer.tail_frag->flags |= BUFFER_FRAG_MQTT_PACKET_TAIL;
@@ -1024,7 +957,7 @@ int mqtt_ng_connect(struct mqtt_ng_client *client,
uint8_t clean_start,
uint16_t keep_alive)
{
- client->client_state = RAW;
+ client->client_state = MQTT_STATE_RAW;
client->parser.state = MQTT_PARSE_FIXED_HEADER_PACKET_TYPE;
LOCK_HDR_BUFFER(&client->main_buffer);
@@ -1033,28 +966,23 @@ int mqtt_ng_connect(struct mqtt_ng_client *client,
buffer_purge(&client->main_buffer.hdr_buffer);
UNLOCK_HDR_BUFFER(&client->main_buffer);
- pthread_rwlock_wrlock(&client->tx_topic_aliases.rwlock);
+ spinlock_lock(&client->tx_topic_aliases.spinlock);
// according to MQTT spec topic aliases should not be persisted
// even if clean session is true
mqtt_ng_destroy_tx_alias_hash(client->tx_topic_aliases.stoi_dict);
+
client->tx_topic_aliases.stoi_dict = TX_ALIASES_INITIALIZE();
- if (client->tx_topic_aliases.stoi_dict == NULL) {
- pthread_rwlock_unlock(&client->tx_topic_aliases.rwlock);
- return 1;
- }
client->tx_topic_aliases.idx_assigned = 0;
- pthread_rwlock_unlock(&client->tx_topic_aliases.rwlock);
+ spinlock_unlock(&client->tx_topic_aliases.spinlock);
mqtt_ng_destroy_rx_alias_hash(client->rx_aliases);
client->rx_aliases = RX_ALIASES_INITIALIZE();
- if (client->rx_aliases == NULL)
- return 1;
- client->connect_msg = mqtt_ng_generate_connect(&client->main_buffer, client->log, auth, lwt, clean_start, keep_alive);
+ client->connect_msg = mqtt_ng_generate_connect(&client->main_buffer, auth, lwt, clean_start, keep_alive);
if (client->connect_msg == NULL)
return 1;
- pthread_mutex_lock(&client->stats_mutex);
+ spinlock_lock(&client->stats_spinlock);
if (clean_start)
client->stats.tx_messages_queued = 1;
else
@@ -1062,9 +990,9 @@ int mqtt_ng_connect(struct mqtt_ng_client *client,
client->stats.tx_messages_sent = 0;
client->stats.rx_messages_rcvd = 0;
- pthread_mutex_unlock(&client->stats_mutex);
+ spinlock_unlock(&client->stats_spinlock);
- client->client_state = CONNECT_PENDING;
+ client->client_state = MQTT_STATE_CONNECT_PENDING;
return 0;
}
@@ -1074,15 +1002,16 @@ uint16_t get_unused_packet_id() {
return packet_id ? packet_id : ++packet_id;
}
-static inline size_t mqtt_ng_publish_size(const char *topic,
- size_t msg_len,
- uint16_t topic_id)
+static size_t mqtt_ng_publish_size(
+ const char *topic,
+ size_t msg_len,
+ uint16_t topic_id)
{
- size_t retval = 2 /* Topic Name Length */
- + (topic == NULL ? 0 : strlen(topic))
- + 2 /* Packet identifier */
- + 1 /* Properties Length TODO for now fixed to 1 property */
- + msg_len;
+ size_t retval = 2
+ + (topic == NULL ? 0 : strlen(topic)) /* Topic Name Length */
+ + 2 /* Packet identifier */
+ + 1 /* Properties Length for now fixed to 1 property */
+ + msg_len;
if (topic_id)
retval += 3;
@@ -1091,7 +1020,6 @@ static inline size_t mqtt_ng_publish_size(const char *topic,
}
int mqtt_ng_generate_publish(struct transaction_buffer *trx_buf,
- mqtt_wss_log_ctx_t log_ctx,
char *topic,
free_fnc_t topic_free,
void *msg,
@@ -1130,7 +1058,7 @@ int mqtt_ng_generate_publish(struct transaction_buffer *trx_buf,
// [MQTT-3.3.2.1]
PACK_2B_INT(&trx_buf->hdr_buffer, topic == NULL ? 0 : strlen(topic), frag);
if (topic != NULL) {
- if (_optimized_add(&trx_buf->hdr_buffer, log_ctx, topic, strlen(topic), topic_free, &frag))
+ if (_optimized_add(&trx_buf->hdr_buffer, topic, strlen(topic), topic_free, &frag))
goto fail_rollback;
BUFFER_TRANSACTION_NEW_FRAG(&trx_buf->hdr_buffer, 0, frag, goto fail_rollback);
}
@@ -1154,7 +1082,7 @@ int mqtt_ng_generate_publish(struct transaction_buffer *trx_buf,
if( (frag = buffer_new_frag(&trx_buf->hdr_buffer, BUFFER_FRAG_DATA_EXTERNAL)) == NULL )
goto fail_rollback;
- if (frag_set_external_data(log_ctx, frag, msg, msg_len, msg_free))
+ if (frag_set_external_data(frag, msg, msg_len, msg_free))
goto fail_rollback;
trx_buf->hdr_buffer.tail_frag->flags |= BUFFER_FRAG_MQTT_PACKET_TAIL;
@@ -1178,9 +1106,9 @@ int mqtt_ng_publish(struct mqtt_ng_client *client,
uint16_t *packet_id)
{
struct topic_alias_data *alias = NULL;
- pthread_rwlock_rdlock(&client->tx_topic_aliases.rwlock);
+ spinlock_lock(&client->tx_topic_aliases.spinlock);
c_rhash_get_ptr_by_str(client->tx_topic_aliases.stoi_dict, topic, (void**)&alias);
- pthread_rwlock_unlock(&client->tx_topic_aliases.rwlock);
+ spinlock_unlock(&client->tx_topic_aliases.spinlock);
uint16_t topic_id = 0;
@@ -1194,14 +1122,14 @@ int mqtt_ng_publish(struct mqtt_ng_client *client,
}
if (client->max_msg_size && PUBLISH_SP_SIZE + mqtt_ng_publish_size(topic, msg_len, topic_id) > client->max_msg_size) {
- mws_error(client->log, "Message too big for server: %zu", msg_len);
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Message too big for server: %zu", msg_len);
return MQTT_NG_MSGGEN_MSG_TOO_BIG;
}
- TRY_GENERATE_MESSAGE(mqtt_ng_generate_publish, client, topic, topic_free, msg, msg_free, msg_len, publish_flags, packet_id, topic_id);
+ TRY_GENERATE_MESSAGE(mqtt_ng_generate_publish, topic, topic_free, msg, msg_free, msg_len, publish_flags, packet_id, topic_id);
}
-static inline size_t mqtt_ng_subscribe_size(struct mqtt_sub *subs, size_t sub_count)
+static size_t mqtt_ng_subscribe_size(struct mqtt_sub *subs, size_t sub_count)
{
size_t len = 2 /* Packet Identifier */ + 1 /* Properties Length TODO for now fixed 0 */;
len += sub_count * (2 /* topic filter string length */ + 1 /* [MQTT-3.8.3.1] Subscription Options Byte */);
@@ -1212,7 +1140,7 @@ static inline size_t mqtt_ng_subscribe_size(struct mqtt_sub *subs, size_t sub_co
return len;
}
-int mqtt_ng_generate_subscribe(struct transaction_buffer *trx_buf, mqtt_wss_log_ctx_t log_ctx, struct mqtt_sub *subs, size_t sub_count)
+int mqtt_ng_generate_subscribe(struct transaction_buffer *trx_buf, struct mqtt_sub *subs, size_t sub_count)
{
// >> START THE RODEO <<
transaction_buffer_transaction_start(trx_buf);
@@ -1247,7 +1175,7 @@ int mqtt_ng_generate_subscribe(struct transaction_buffer *trx_buf, mqtt_wss_log_
for (size_t i = 0; i < sub_count; i++) {
BUFFER_TRANSACTION_NEW_FRAG(&trx_buf->hdr_buffer, 0, frag, goto fail_rollback);
PACK_2B_INT(&trx_buf->hdr_buffer, strlen(subs[i].topic), frag);
- if (_optimized_add(&trx_buf->hdr_buffer, log_ctx, subs[i].topic, strlen(subs[i].topic), subs[i].topic_free, &frag))
+ if (_optimized_add(&trx_buf->hdr_buffer, subs[i].topic, strlen(subs[i].topic), subs[i].topic_free, &frag))
goto fail_rollback;
BUFFER_TRANSACTION_NEW_FRAG(&trx_buf->hdr_buffer, 0, frag, goto fail_rollback);
*WRITE_POS(frag) = subs[i].options;
@@ -1264,12 +1192,11 @@ fail_rollback:
int mqtt_ng_subscribe(struct mqtt_ng_client *client, struct mqtt_sub *subs, size_t sub_count)
{
- TRY_GENERATE_MESSAGE(mqtt_ng_generate_subscribe, client, subs, sub_count);
+ TRY_GENERATE_MESSAGE(mqtt_ng_generate_subscribe, subs, sub_count);
}
-int mqtt_ng_generate_disconnect(struct transaction_buffer *trx_buf, mqtt_wss_log_ctx_t log_ctx, uint8_t reason_code)
+int mqtt_ng_generate_disconnect(struct transaction_buffer *trx_buf, uint8_t reason_code)
{
- (void) log_ctx;
// >> START THE RODEO <<
transaction_buffer_transaction_start(trx_buf);
@@ -1308,12 +1235,11 @@ fail_rollback:
int mqtt_ng_disconnect(struct mqtt_ng_client *client, uint8_t reason_code)
{
- TRY_GENERATE_MESSAGE(mqtt_ng_generate_disconnect, client, reason_code);
+ TRY_GENERATE_MESSAGE(mqtt_ng_generate_disconnect, reason_code);
}
-static int mqtt_generate_puback(struct transaction_buffer *trx_buf, mqtt_wss_log_ctx_t log_ctx, uint16_t packet_id, uint8_t reason_code)
+static int mqtt_generate_puback(struct transaction_buffer *trx_buf, uint16_t packet_id, uint8_t reason_code)
{
- (void) log_ctx;
// >> START THE RODEO <<
transaction_buffer_transaction_start(trx_buf);
@@ -1353,7 +1279,7 @@ fail_rollback:
static int mqtt_ng_puback(struct mqtt_ng_client *client, uint16_t packet_id, uint8_t reason_code)
{
- TRY_GENERATE_MESSAGE(mqtt_generate_puback, client, packet_id, reason_code);
+ TRY_GENERATE_MESSAGE(mqtt_generate_puback, packet_id, reason_code);
}
int mqtt_ng_ping(struct mqtt_ng_client *client)
@@ -1370,7 +1296,6 @@ int mqtt_ng_ping(struct mqtt_ng_client *client)
#define MQTT_NG_CLIENT_PROTOCOL_ERROR -1
#define MQTT_NG_CLIENT_SERVER_RETURNED_ERROR -2
#define MQTT_NG_CLIENT_NOT_IMPL_YET -3
-#define MQTT_NG_CLIENT_OOM -4
#define MQTT_NG_CLIENT_INTERNAL_ERROR -5
#define BUF_READ_CHECK_AT_LEAST(buf, x) \
@@ -1379,10 +1304,10 @@ int mqtt_ng_ping(struct mqtt_ng_client *client)
#define vbi_parser_reset_ctx(ctx) memset(ctx, 0, sizeof(struct mqtt_vbi_parser_ctx))
-static int vbi_parser_parse(struct mqtt_vbi_parser_ctx *ctx, rbuf_t data, mqtt_wss_log_ctx_t log)
+static int vbi_parser_parse(struct mqtt_vbi_parser_ctx *ctx, rbuf_t data)
{
if (ctx->bytes > MQTT_VBI_MAXBYTES - 1) {
- mws_error(log, "MQTT Variable Byte Integer can't be longer than %d bytes", MQTT_VBI_MAXBYTES);
+ nd_log(NDLS_DAEMON, NDLP_ERR, "MQTT Variable Byte Integer can't be longer than %d bytes", MQTT_VBI_MAXBYTES);
return MQTT_NG_CLIENT_PROTOCOL_ERROR;
}
if (!ctx->bytes || ctx->data[ctx->bytes-1] & MQTT_VBI_CONTINUATION_FLAG) {
@@ -1394,7 +1319,7 @@ static int vbi_parser_parse(struct mqtt_vbi_parser_ctx *ctx, rbuf_t data, mqtt_w
}
if (mqtt_vbi_to_uint32(ctx->data, &ctx->result)) {
- mws_error(log, "MQTT Variable Byte Integer failed to be parsed.");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "MQTT Variable Byte Integer failed to be parsed.");
return MQTT_NG_CLIENT_PROTOCOL_ERROR;
}
@@ -1480,12 +1405,12 @@ struct mqtt_property *get_property_by_id(struct mqtt_property *props, uint8_t pr
}
// Parses [MQTT-2.2.2]
-static int parse_properties_array(struct mqtt_properties_parser_ctx *ctx, rbuf_t data, mqtt_wss_log_ctx_t log)
+static int parse_properties_array(struct mqtt_properties_parser_ctx *ctx, rbuf_t data)
{
int rc;
switch (ctx->state) {
case PROPERTIES_LENGTH:
- rc = vbi_parser_parse(&ctx->vbi_parser_ctx, data, log);
+ rc = vbi_parser_parse(&ctx->vbi_parser_ctx, data);
if (rc == MQTT_NG_CLIENT_PARSE_DONE) {
ctx->properties_length = ctx->vbi_parser_ctx.result;
ctx->bytes_consumed += ctx->vbi_parser_ctx.bytes;
@@ -1534,7 +1459,7 @@ static int parse_properties_array(struct mqtt_properties_parser_ctx *ctx, rbuf_t
ctx->state = PROPERTY_TYPE_STR_BIN_LEN;
break;
default:
- mws_error(log, "Unsupported property type %d for property id %d.", (int)ctx->tail->type, (int)ctx->tail->id);
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Unsupported property type %d for property id %d.", (int)ctx->tail->type, (int)ctx->tail->id);
return MQTT_NG_CLIENT_PROTOCOL_ERROR;
}
break;
@@ -1552,7 +1477,7 @@ static int parse_properties_array(struct mqtt_properties_parser_ctx *ctx, rbuf_t
ctx->state = PROPERTY_TYPE_STR;
break;
default:
- mws_error(log, "Unexpected datatype in PROPERTY_TYPE_STR_BIN_LEN %d", (int)ctx->tail->type);
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Unexpected datatype in PROPERTY_TYPE_STR_BIN_LEN %d", (int)ctx->tail->type);
return MQTT_NG_CLIENT_INTERNAL_ERROR;
}
break;
@@ -1577,7 +1502,7 @@ static int parse_properties_array(struct mqtt_properties_parser_ctx *ctx, rbuf_t
ctx->state = PROPERTY_NEXT;
break;
case PROPERTY_TYPE_VBI:
- rc = vbi_parser_parse(&ctx->vbi_parser_ctx, data, log);
+ rc = vbi_parser_parse(&ctx->vbi_parser_ctx, data);
if (rc == MQTT_NG_CLIENT_PARSE_DONE) {
ctx->tail->data.uint32 = ctx->vbi_parser_ctx.result;
ctx->bytes_consumed += ctx->vbi_parser_ctx.bytes;
@@ -1627,9 +1552,9 @@ static int parse_connack_varhdr(struct mqtt_ng_client *client)
mqtt_properties_parser_ctx_reset(&parser->properties_parser);
break;
case MQTT_PARSE_VARHDR_PROPS:
- return parse_properties_array(&parser->properties_parser, parser->received_data, client->log);
+ return parse_properties_array(&parser->properties_parser, parser->received_data);
default:
- ERROR("invalid state for connack varhdr parser");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "invalid state for connack varhdr parser");
return MQTT_NG_CLIENT_INTERNAL_ERROR;
}
return MQTT_NG_CLIENT_OK_CALL_AGAIN;
@@ -1653,9 +1578,9 @@ static int parse_disconnect_varhdr(struct mqtt_ng_client *client)
mqtt_properties_parser_ctx_reset(&parser->properties_parser);
break;
case MQTT_PARSE_VARHDR_PROPS:
- return parse_properties_array(&parser->properties_parser, parser->received_data, client->log);
+ return parse_properties_array(&parser->properties_parser, parser->received_data);
default:
- ERROR("invalid state for connack varhdr parser");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "invalid state for connack varhdr parser");
return MQTT_NG_CLIENT_INTERNAL_ERROR;
}
return MQTT_NG_CLIENT_OK_CALL_AGAIN;
@@ -1691,9 +1616,9 @@ static int parse_puback_varhdr(struct mqtt_ng_client *client)
mqtt_properties_parser_ctx_reset(&parser->properties_parser);
/* FALLTHROUGH */
case MQTT_PARSE_VARHDR_PROPS:
- return parse_properties_array(&parser->properties_parser, parser->received_data, client->log);
+ return parse_properties_array(&parser->properties_parser, parser->received_data);
default:
- ERROR("invalid state for puback varhdr parser");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "invalid state for puback varhdr parser");
return MQTT_NG_CLIENT_INTERNAL_ERROR;
}
return MQTT_NG_CLIENT_OK_CALL_AGAIN;
@@ -1716,7 +1641,7 @@ static int parse_suback_varhdr(struct mqtt_ng_client *client)
mqtt_properties_parser_ctx_reset(&parser->properties_parser);
/* FALLTHROUGH */
case MQTT_PARSE_VARHDR_PROPS:
- rc = parse_properties_array(&parser->properties_parser, parser->received_data, client->log);
+ rc = parse_properties_array(&parser->properties_parser, parser->received_data);
if (rc != MQTT_NG_CLIENT_PARSE_DONE)
return rc;
parser->mqtt_parsed_len += parser->properties_parser.bytes_consumed;
@@ -1737,7 +1662,7 @@ static int parse_suback_varhdr(struct mqtt_ng_client *client)
return MQTT_NG_CLIENT_NEED_MORE_BYTES;
default:
- ERROR("invalid state for suback varhdr parser");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "invalid state for suback varhdr parser");
return MQTT_NG_CLIENT_INTERNAL_ERROR;
}
return MQTT_NG_CLIENT_OK_CALL_AGAIN;
@@ -1761,8 +1686,6 @@ static int parse_publish_varhdr(struct mqtt_ng_client *client)
break;
}
publish->topic = callocz(1, publish->topic_len + 1 /* add 0x00 */);
- if (publish->topic == NULL)
- return MQTT_NG_CLIENT_OOM;
parser->varhdr_state = MQTT_PARSE_VARHDR_TOPICNAME;
/* FALLTHROUGH */
case MQTT_PARSE_VARHDR_TOPICNAME:
@@ -1788,7 +1711,7 @@ static int parse_publish_varhdr(struct mqtt_ng_client *client)
parser->mqtt_parsed_len += 2;
/* FALLTHROUGH */
case MQTT_PARSE_VARHDR_PROPS:
- rc = parse_properties_array(&parser->properties_parser, parser->received_data, client->log);
+ rc = parse_properties_array(&parser->properties_parser, parser->received_data);
if (rc != MQTT_NG_CLIENT_PARSE_DONE)
return rc;
parser->mqtt_parsed_len += parser->properties_parser.bytes_consumed;
@@ -1798,7 +1721,7 @@ static int parse_publish_varhdr(struct mqtt_ng_client *client)
if (parser->mqtt_fixed_hdr_remaining_length < parser->mqtt_parsed_len) {
freez(publish->topic);
publish->topic = NULL;
- ERROR("Error parsing PUBLISH message");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Error parsing PUBLISH message");
return MQTT_NG_CLIENT_PROTOCOL_ERROR;
}
publish->data_len = parser->mqtt_fixed_hdr_remaining_length - parser->mqtt_parsed_len;
@@ -1809,18 +1732,12 @@ static int parse_publish_varhdr(struct mqtt_ng_client *client)
BUF_READ_CHECK_AT_LEAST(parser->received_data, publish->data_len);
publish->data = mallocz(publish->data_len);
- if (publish->data == NULL) {
- freez(publish->topic);
- publish->topic = NULL;
- return MQTT_NG_CLIENT_OOM;
- }
-
rbuf_pop(parser->received_data, publish->data, publish->data_len);
parser->mqtt_parsed_len += publish->data_len;
return MQTT_NG_CLIENT_PARSE_DONE;
default:
- ERROR("invalid state for publish varhdr parser");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "invalid state for publish varhdr parser");
return MQTT_NG_CLIENT_INTERNAL_ERROR;
}
return MQTT_NG_CLIENT_OK_CALL_AGAIN;
@@ -1840,7 +1757,7 @@ static int parse_data(struct mqtt_ng_client *client)
parser->state = MQTT_PARSE_FIXED_HEADER_LEN;
break;
case MQTT_PARSE_FIXED_HEADER_LEN:
- rc = vbi_parser_parse(&parser->vbi_parser, parser->received_data, client->log);
+ rc = vbi_parser_parse(&parser->vbi_parser, parser->received_data);
if (rc == MQTT_NG_CLIENT_PARSE_DONE) {
parser->mqtt_fixed_hdr_remaining_length = parser->vbi_parser.result;
parser->state = MQTT_PARSE_VARIABLE_HEADER;
@@ -1883,10 +1800,11 @@ static int parse_data(struct mqtt_ng_client *client)
return rc;
case MQTT_CPT_PINGRESP:
if (parser->mqtt_fixed_hdr_remaining_length) {
- ERROR ("PINGRESP has to be 0 Remaining Length."); // [MQTT-3.13.1]
+ nd_log(NDLS_DAEMON, NDLP_ERR, "PINGRESP has to be 0 Remaining Length."); // [MQTT-3.13.1]
return MQTT_NG_CLIENT_PROTOCOL_ERROR;
}
parser->state = MQTT_PARSE_MQTT_PACKET_DONE;
+ ping_timeout = 0;
break;
case MQTT_CPT_DISCONNECT:
rc = parse_disconnect_varhdr(client);
@@ -1896,7 +1814,7 @@ static int parse_data(struct mqtt_ng_client *client)
}
return rc;
default:
- ERROR("Parsing Control Packet Type %" PRIu8 " not implemented yet.", get_control_packet_type(parser->mqtt_control_packet_type));
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Parsing Control Packet Type %" PRIu8 " not implemented yet.", get_control_packet_type(parser->mqtt_control_packet_type));
rbuf_bump_tail(parser->received_data, parser->mqtt_fixed_hdr_remaining_length);
parser->state = MQTT_PARSE_MQTT_PACKET_DONE;
return MQTT_NG_CLIENT_NOT_IMPL_YET;
@@ -1916,12 +1834,12 @@ static int parse_data(struct mqtt_ng_client *client)
// return -1 on error
// return 0 if there is fragment set
static int mqtt_ng_next_to_send(struct mqtt_ng_client *client) {
- if (client->client_state == CONNECT_PENDING) {
+ if (client->client_state == MQTT_STATE_CONNECT_PENDING) {
client->main_buffer.sending_frag = client->connect_msg;
- client->client_state = CONNECTING;
+ client->client_state = MQTT_STATE_CONNECTING;
return 0;
}
- if (client->client_state != CONNECTED)
+ if (client->client_state != MQTT_STATE_CONNECTED)
return -1;
struct buffer_fragment *frag = BUFFER_FIRST_FRAG(&client->main_buffer.hdr_buffer);
@@ -1959,7 +1877,7 @@ static int send_fragment(struct mqtt_ng_client *client) {
if (bytes)
processed = client->send_fnc_ptr(client->user_ctx, ptr, bytes);
else
- WARN("This fragment was fully sent already. This should not happen!");
+ nd_log(NDLS_DAEMON, NDLP_WARNING, "This fragment was fully sent already. This should not happen!");
frag->sent += processed;
if (frag->sent != frag->len)
@@ -1967,11 +1885,11 @@ static int send_fragment(struct mqtt_ng_client *client) {
if (frag->flags & BUFFER_FRAG_MQTT_PACKET_TAIL) {
client->time_of_last_send = time(NULL);
- pthread_mutex_lock(&client->stats_mutex);
+ spinlock_lock(&client->stats_spinlock);
if (client->main_buffer.sending_frag != &ping_frag)
client->stats.tx_messages_queued--;
client->stats.tx_messages_sent++;
- pthread_mutex_unlock(&client->stats_mutex);
+ spinlock_unlock(&client->stats_spinlock);
client->main_buffer.sending_frag = NULL;
return 1;
}
@@ -1995,7 +1913,7 @@ static void try_send_all(struct mqtt_ng_client *client) {
} while(send_all_message_fragments(client) >= 0);
}
-static inline void mark_message_for_gc(struct buffer_fragment *frag)
+static void mark_message_for_gc(struct buffer_fragment *frag)
{
while (frag) {
frag->flags |= BUFFER_FRAG_GARBAGE_COLLECT;
@@ -2013,7 +1931,7 @@ static int mark_packet_acked(struct mqtt_ng_client *client, uint16_t packet_id)
while (frag) {
if ( (frag->flags & BUFFER_FRAG_MQTT_PACKET_HEAD) && frag->packet_id == packet_id) {
if (!frag->sent) {
- ERROR("Received packet_id (%" PRIu16 ") belongs to MQTT packet which was not yet sent!", packet_id);
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Received packet_id (%" PRIu16 ") belongs to MQTT packet which was not yet sent!", packet_id);
UNLOCK_HDR_BUFFER(&client->main_buffer);
return 1;
}
@@ -2023,7 +1941,7 @@ static int mark_packet_acked(struct mqtt_ng_client *client, uint16_t packet_id)
}
frag = frag->next;
}
- ERROR("Received packet_id (%" PRIu16 ") is unknown!", packet_id);
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Received packet_id (%" PRIu16 ") is unknown!", packet_id);
UNLOCK_HDR_BUFFER(&client->main_buffer);
return 1;
}
@@ -2031,110 +1949,113 @@ static int mark_packet_acked(struct mqtt_ng_client *client, uint16_t packet_id)
int handle_incoming_traffic(struct mqtt_ng_client *client)
{
int rc;
+ while ((rc = parse_data(client)) == MQTT_NG_CLIENT_OK_CALL_AGAIN) {
+ ;
+ }
+ if (rc != MQTT_NG_CLIENT_MQTT_PACKET_DONE)
+ return rc;
+
struct mqtt_publish *pub;
- while( (rc = parse_data(client)) == MQTT_NG_CLIENT_OK_CALL_AGAIN );
- if ( rc == MQTT_NG_CLIENT_MQTT_PACKET_DONE ) {
- struct mqtt_property *prop;
-#ifdef MQTT_DEBUG_VERBOSE
- DEBUG("MQTT Packet Parsed Successfully!");
-#endif
- pthread_mutex_lock(&client->stats_mutex);
- client->stats.rx_messages_rcvd++;
- pthread_mutex_unlock(&client->stats_mutex);
-
- switch (get_control_packet_type(client->parser.mqtt_control_packet_type)) {
- case MQTT_CPT_CONNACK:
-#ifdef MQTT_DEBUG_VERBOSE
- DEBUG("Received CONNACK");
-#endif
- LOCK_HDR_BUFFER(&client->main_buffer);
- mark_message_for_gc(client->connect_msg);
- UNLOCK_HDR_BUFFER(&client->main_buffer);
- client->connect_msg = NULL;
- if (client->client_state != CONNECTING) {
- ERROR("Received unexpected CONNACK");
- client->client_state = ERROR;
- return MQTT_NG_CLIENT_PROTOCOL_ERROR;
- }
- if ((prop = get_property_by_id(client->parser.properties_parser.head, MQTT_PROP_MAX_PKT_SIZE)) != NULL) {
- INFO("MQTT server limits message size to %" PRIu32, prop->data.uint32);
- client->max_msg_size = prop->data.uint32;
- }
- if (client->connack_callback)
- client->connack_callback(client->user_ctx, client->parser.mqtt_packet.connack.reason_code);
- if (!client->parser.mqtt_packet.connack.reason_code) {
- INFO("MQTT Connection Accepted By Server");
- client->client_state = CONNECTED;
- break;
- }
- client->client_state = ERROR;
- return MQTT_NG_CLIENT_SERVER_RETURNED_ERROR;
- case MQTT_CPT_PUBACK:
-#ifdef MQTT_DEBUG_VERBOSE
- DEBUG("Received PUBACK %" PRIu16, client->parser.mqtt_packet.puback.packet_id);
-#endif
- if (mark_packet_acked(client, client->parser.mqtt_packet.puback.packet_id))
- return MQTT_NG_CLIENT_PROTOCOL_ERROR;
- if (client->puback_callback)
- client->puback_callback(client->parser.mqtt_packet.puback.packet_id);
- break;
- case MQTT_CPT_PINGRESP:
-#ifdef MQTT_DEBUG_VERBOSE
- DEBUG("Received PINGRESP");
-#endif
- break;
- case MQTT_CPT_SUBACK:
-#ifdef MQTT_DEBUG_VERBOSE
- DEBUG("Received SUBACK %" PRIu16, client->parser.mqtt_packet.suback.packet_id);
-#endif
- if (mark_packet_acked(client, client->parser.mqtt_packet.suback.packet_id))
- return MQTT_NG_CLIENT_PROTOCOL_ERROR;
+ struct mqtt_property *prop;
+ spinlock_lock(&client->stats_spinlock);
+ client->stats.rx_messages_rcvd++;
+ spinlock_unlock(&client->stats_spinlock);
+
+ uint8_t ctrl_packet_type = get_control_packet_type(client->parser.mqtt_control_packet_type);
+ switch (ctrl_packet_type) {
+ case MQTT_CPT_CONNACK:
+ LOCK_HDR_BUFFER(&client->main_buffer);
+ mark_message_for_gc(client->connect_msg);
+ UNLOCK_HDR_BUFFER(&client->main_buffer);
+
+ client->connect_msg = NULL;
+
+ if (client->client_state != MQTT_STATE_CONNECTING) {
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Received unexpected CONNACK");
+ client->client_state = MQTT_STATE_ERROR;
+ return MQTT_NG_CLIENT_PROTOCOL_ERROR;
+ }
+
+ if ((prop = get_property_by_id(client->parser.properties_parser.head, MQTT_PROP_MAX_PKT_SIZE)) != NULL) {
+ nd_log(NDLS_DAEMON, NDLP_INFO, "MQTT server limits message size to %" PRIu32, prop->data.uint32);
+ client->max_msg_size = prop->data.uint32;
+ }
+
+ if (client->connack_callback)
+ client->connack_callback(client->user_ctx, client->parser.mqtt_packet.connack.reason_code);
+ if (!client->parser.mqtt_packet.connack.reason_code) {
+ nd_log(NDLS_DAEMON, NDLP_INFO, "MQTT Connection Accepted By Server");
+ client->client_state = MQTT_STATE_CONNECTED;
break;
- case MQTT_CPT_PUBLISH:
-#ifdef MQTT_DEBUG_VERBOSE
- DEBUG("Recevied PUBLISH");
-#endif
- pub = &client->parser.mqtt_packet.publish;
- if (pub->qos > 1) {
- freez(pub->topic);
- freez(pub->data);
- return MQTT_NG_CLIENT_NOT_IMPL_YET;
- }
- if ( pub->qos == 1 && (rc = mqtt_ng_puback(client, pub->packet_id, 0)) ) {
- client->client_state = ERROR;
- ERROR("Error generating PUBACK reply for PUBLISH");
- return rc;
- }
- if ( (prop = get_property_by_id(client->parser.properties_parser.head, MQTT_PROP_TOPIC_ALIAS)) != NULL ) {
- // Topic Alias property was sent from server
- void *topic_ptr;
- if (!c_rhash_get_ptr_by_uint64(client->rx_aliases, prop->data.uint8, &topic_ptr)) {
- if (pub->topic != NULL) {
- ERROR("We do not yet support topic alias reassignment");
- return MQTT_NG_CLIENT_NOT_IMPL_YET;
- }
- pub->topic = topic_ptr;
- } else {
- if (pub->topic == NULL) {
- ERROR("Topic alias with id %d unknown and topic not set by server!", prop->data.uint8);
- return MQTT_NG_CLIENT_PROTOCOL_ERROR;
- }
- c_rhash_insert_uint64_ptr(client->rx_aliases, prop->data.uint8, pub->topic);
+ }
+ client->client_state = MQTT_STATE_ERROR;
+ return MQTT_NG_CLIENT_SERVER_RETURNED_ERROR;
+
+ case MQTT_CPT_PUBACK:
+ if (mark_packet_acked(client, client->parser.mqtt_packet.puback.packet_id))
+ return MQTT_NG_CLIENT_PROTOCOL_ERROR;
+ if (client->puback_callback)
+ client->puback_callback(client->parser.mqtt_packet.puback.packet_id);
+ break;
+
+ case MQTT_CPT_PINGRESP:
+ break;
+
+ case MQTT_CPT_SUBACK:
+ if (mark_packet_acked(client, client->parser.mqtt_packet.suback.packet_id))
+ return MQTT_NG_CLIENT_PROTOCOL_ERROR;
+ break;
+
+ case MQTT_CPT_PUBLISH:
+ pub = &client->parser.mqtt_packet.publish;
+
+ if (pub->qos > 1) {
+ freez(pub->topic);
+ freez(pub->data);
+ return MQTT_NG_CLIENT_NOT_IMPL_YET;
+ }
+
+ if ( pub->qos == 1 && ((rc = mqtt_ng_puback(client, pub->packet_id, 0))) ) {
+ client->client_state = MQTT_STATE_ERROR;
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Error generating PUBACK reply for PUBLISH");
+ return rc;
+ }
+
+ if ( (prop = get_property_by_id(client->parser.properties_parser.head, MQTT_PROP_TOPIC_ALIAS)) != NULL ) {
+ // Topic Alias property was sent from server
+ void *topic_ptr;
+ if (!c_rhash_get_ptr_by_uint64(client->rx_aliases, prop->data.uint8, &topic_ptr)) {
+ if (pub->topic != NULL) {
+ nd_log(NDLS_DAEMON, NDLP_ERR, "We do not yet support topic alias reassignment");
+ return MQTT_NG_CLIENT_NOT_IMPL_YET;
}
+ pub->topic = topic_ptr;
+ } else {
+ if (pub->topic == NULL) {
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Topic alias with id %d unknown and topic not set by server!", prop->data.uint8);
+ return MQTT_NG_CLIENT_PROTOCOL_ERROR;
+ }
+ c_rhash_insert_uint64_ptr(client->rx_aliases, prop->data.uint8, pub->topic);
}
- if (client->msg_callback)
- client->msg_callback(pub->topic, pub->data, pub->data_len, pub->qos);
- // in case we have property topic alias and we have topic we take over the string
- // and add pointer to it into topic alias list
- if (prop == NULL)
- freez(pub->topic);
- freez(pub->data);
- return MQTT_NG_CLIENT_WANT_WRITE;
- case MQTT_CPT_DISCONNECT:
- INFO ("Got MQTT DISCONNECT control packet from server. Reason code: %d", (int)client->parser.mqtt_packet.disconnect.reason_code);
- client->client_state = DISCONNECTED;
- break;
- }
+ }
+
+ if (client->msg_callback)
+ client->msg_callback(pub->topic, pub->data, pub->data_len, pub->qos);
+ // in case we have property topic alias and we have topic we take over the string
+ // and add pointer to it into topic alias list
+ if (prop == NULL)
+ freez(pub->topic);
+ freez(pub->data);
+ return MQTT_NG_CLIENT_WANT_WRITE;
+
+ case MQTT_CPT_DISCONNECT:
+ nd_log(NDLS_DAEMON, NDLP_INFO, "Got MQTT DISCONNECT control packet from server. Reason code: %d", (int)client->parser.mqtt_packet.disconnect.reason_code);
+ client->client_state = MQTT_STATE_DISCONNECTED;
+ break;
+
+ default:
+ nd_log(NDLS_DAEMON, NDLP_INFO, "Got unknown control packet %u from server", ctrl_packet_type);
+ break;
}
return rc;
@@ -2142,10 +2063,10 @@ int handle_incoming_traffic(struct mqtt_ng_client *client)
int mqtt_ng_sync(struct mqtt_ng_client *client)
{
- if (client->client_state == RAW || client->client_state == DISCONNECTED)
+ if (client->client_state == MQTT_STATE_RAW || client->client_state == MQTT_STATE_DISCONNECTED)
return 0;
- if (client->client_state == ERROR)
+ if (client->client_state == MQTT_STATE_ERROR)
return 1;
LOCK_HDR_BUFFER(&client->main_buffer);
@@ -2182,9 +2103,9 @@ void mqtt_ng_set_max_mem(struct mqtt_ng_client *client, size_t bytes)
void mqtt_ng_get_stats(struct mqtt_ng_client *client, struct mqtt_ng_stats *stats)
{
- pthread_mutex_lock(&client->stats_mutex);
+ spinlock_lock(&client->stats_spinlock);
memcpy(stats, &client->stats, sizeof(struct mqtt_ng_stats));
- pthread_mutex_unlock(&client->stats_mutex);
+ spinlock_unlock(&client->stats_spinlock);
stats->tx_bytes_queued = 0;
stats->tx_buffer_reclaimable = 0;
@@ -2207,11 +2128,11 @@ void mqtt_ng_get_stats(struct mqtt_ng_client *client, struct mqtt_ng_stats *stat
int mqtt_ng_set_topic_alias(struct mqtt_ng_client *client, const char *topic)
{
uint16_t idx;
- pthread_rwlock_wrlock(&client->tx_topic_aliases.rwlock);
+ spinlock_lock(&client->tx_topic_aliases.spinlock);
if (client->tx_topic_aliases.idx_assigned >= client->tx_topic_aliases.idx_max) {
- pthread_rwlock_unlock(&client->tx_topic_aliases.rwlock);
- mws_error(client->log, "Tx topic alias indexes were exhausted (current version of the library doesn't support reassigning yet. Feel free to contribute.");
+ spinlock_unlock(&client->tx_topic_aliases.spinlock);
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Tx topic alias indexes were exhausted (current version of the library doesn't support reassigning yet. Feel free to contribute.");
return 0; //0 is not a valid topic alias
}
@@ -2220,8 +2141,8 @@ int mqtt_ng_set_topic_alias(struct mqtt_ng_client *client, const char *topic)
// this is not a problem for library but might be helpful to warn user
// as it might indicate bug in their program (but also might be expected)
idx = alias->idx;
- pthread_rwlock_unlock(&client->tx_topic_aliases.rwlock);
- mws_debug(client->log, "%s topic \"%s\" already has alias set. Ignoring.", __FUNCTION__, topic);
+ spinlock_unlock(&client->tx_topic_aliases.spinlock);
+ nd_log(NDLS_DAEMON, NDLP_DEBUG, "%s topic \"%s\" already has alias set. Ignoring.", __FUNCTION__, topic);
return idx;
}
@@ -2232,6 +2153,6 @@ int mqtt_ng_set_topic_alias(struct mqtt_ng_client *client, const char *topic)
c_rhash_insert_str_ptr(client->tx_topic_aliases.stoi_dict, topic, (void*)alias);
- pthread_rwlock_unlock(&client->tx_topic_aliases.rwlock);
+ spinlock_unlock(&client->tx_topic_aliases.spinlock);
return idx;
}
diff --git a/src/aclk/mqtt_websockets/mqtt_ng.h b/src/aclk/mqtt_websockets/mqtt_ng.h
index 4b0584d58..c5f6d94cc 100644
--- a/src/aclk/mqtt_websockets/mqtt_ng.h
+++ b/src/aclk/mqtt_websockets/mqtt_ng.h
@@ -1,10 +1,5 @@
-// Copyright: SPDX-License-Identifier: GPL-3.0-only
+// SPDX-License-Identifier: GPL-3.0-or-later
-#include <stdint.h>
-#include <sys/types.h>
-#include <time.h>
-
-#include "c-rbuf/cringbuffer.h"
#include "common_public.h"
#define MQTT_NG_MSGGEN_OK 0
@@ -15,7 +10,7 @@
#define MQTT_NG_MSGGEN_MSG_TOO_BIG 3
struct mqtt_ng_client;
-
+extern time_t ping_timeout;
/* Converts integer to MQTT Variable Byte Integer as per 1.5.5 of MQTT 5 specs
* @param input value to be converted
* @param output pointer to memory where output will be written to. Must allow up to 4 bytes to be written.
@@ -72,7 +67,6 @@ int mqtt_ng_ping(struct mqtt_ng_client *client);
typedef ssize_t (*mqtt_ng_send_fnc_t)(void *user_ctx, const void* buf, size_t len);
struct mqtt_ng_init {
- mqtt_wss_log_ctx_t log;
rbuf_t data_in;
mqtt_ng_send_fnc_t data_out_fnc;
void *user_ctx;
diff --git a/src/aclk/mqtt_websockets/mqtt_wss_client.c b/src/aclk/mqtt_websockets/mqtt_wss_client.c
index bb0e17262..5c576ced5 100644
--- a/src/aclk/mqtt_websockets/mqtt_wss_client.c
+++ b/src/aclk/mqtt_websockets/mqtt_wss_client.c
@@ -1,32 +1,24 @@
-// SPDX-License-Identifier: GPL-3.0-only
+// SPDX-License-Identifier: GPL-3.0-or-later
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
+#include "libnetdata/libnetdata.h"
#include "mqtt_wss_client.h"
#include "mqtt_ng.h"
#include "ws_client.h"
#include "common_internal.h"
-
-#include <stdlib.h>
-#include <fcntl.h>
-#include <unistd.h>
-#include <poll.h>
-#include <string.h>
-#include <time.h>
-
-#include <sys/socket.h>
-#include <netinet/in.h>
-
-#include <openssl/err.h>
-#include <openssl/ssl.h>
+#include "../aclk.h"
#define PIPE_READ_END 0
#define PIPE_WRITE_END 1
#define POLLFD_SOCKET 0
#define POLLFD_PIPE 1
+#define PING_TIMEOUT (60) //Expect a ping response within this time (seconds)
+time_t ping_timeout = 0;
+
#if (OPENSSL_VERSION_NUMBER < OPENSSL_VERSION_110) && (SSLEAY_VERSION_NUMBER >= OPENSSL_VERSION_097)
#include <openssl/conf.h>
#endif
@@ -69,6 +61,8 @@ char *util_openssl_ret_err(int err)
return "SSL_ERROR_SYSCALL";
case SSL_ERROR_SSL:
return "SSL_ERROR_SSL";
+ default:
+ break;
}
return "UNKNOWN";
}
@@ -76,8 +70,6 @@ char *util_openssl_ret_err(int err)
struct mqtt_wss_client_struct {
ws_client *ws_client;
- mqtt_wss_log_ctx_t log;
-
// immediate connection (e.g. proxy server)
char *host;
int port;
@@ -129,69 +121,49 @@ static void mws_connack_callback_ng(void *user_ctx, int code)
switch(code) {
case 0:
client->mqtt_connected = 1;
- return;
+ break;
//TODO manual labor: all the CONNACK error codes with some nice error message
default:
- mws_error(client->log, "MQTT CONNACK returned error %d", code);
- return;
+ nd_log(NDLS_DAEMON, NDLP_ERR, "MQTT CONNACK returned error %d", code);
+ break;
}
}
static ssize_t mqtt_send_cb(void *user_ctx, const void* buf, size_t len)
{
mqtt_wss_client client = user_ctx;
-#ifdef DEBUG_ULTRA_VERBOSE
- mws_debug(client->log, "mqtt_pal_sendall(len=%d)", len);
-#endif
int ret = ws_client_send(client->ws_client, WS_OP_BINARY_FRAME, buf, len);
- if (ret >= 0 && (size_t)ret != len) {
-#ifdef DEBUG_ULTRA_VERBOSE
- mws_debug(client->log, "Not complete message sent (Msg=%d,Sent=%d). Need to arm POLLOUT!", len, ret);
-#endif
+ if (ret >= 0 && (size_t)ret != len)
client->mqtt_didnt_finish_write = 1;
- }
return ret;
}
-mqtt_wss_client mqtt_wss_new(const char *log_prefix,
- mqtt_wss_log_callback_t log_callback,
- msg_callback_fnc_t msg_callback,
- void (*puback_callback)(uint16_t packet_id))
+mqtt_wss_client mqtt_wss_new(
+ msg_callback_fnc_t msg_callback,
+ void (*puback_callback)(uint16_t packet_id))
{
- mqtt_wss_log_ctx_t log;
-
- log = mqtt_wss_log_ctx_create(log_prefix, log_callback);
- if(!log)
- return NULL;
-
SSL_library_init();
SSL_load_error_strings();
mqtt_wss_client client = callocz(1, sizeof(struct mqtt_wss_client_struct));
- if (!client) {
- mws_error(log, "OOM alocating mqtt_wss_client");
- goto fail;
- }
spinlock_init(&client->stat_lock);
client->msg_callback = msg_callback;
client->puback_callback = puback_callback;
- client->ws_client = ws_client_new(0, &client->target_host, log);
+ client->ws_client = ws_client_new(0, &client->target_host);
if (!client->ws_client) {
- mws_error(log, "Error creating ws_client");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Error creating ws_client");
goto fail_1;
}
- client->log = log;
-
#ifdef __APPLE__
if (pipe(client->write_notif_pipe)) {
#else
if (pipe2(client->write_notif_pipe, O_CLOEXEC /*| O_DIRECT*/)) {
#endif
- mws_error(log, "Couldn't create pipe");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Couldn't create pipe");
goto fail_2;
}
@@ -201,7 +173,6 @@ mqtt_wss_client mqtt_wss_new(const char *log_prefix,
client->poll_fds[POLLFD_SOCKET].events = POLLIN;
struct mqtt_ng_init settings = {
- .log = log,
.data_in = client->ws_client->buf_to_mqtt,
.data_out_fnc = &mqtt_send_cb,
.user_ctx = client,
@@ -209,22 +180,14 @@ mqtt_wss_client mqtt_wss_new(const char *log_prefix,
.puback_callback = puback_callback,
.msg_callback = msg_callback
};
- if ( (client->mqtt = mqtt_ng_init(&settings)) == NULL ) {
- mws_error(log, "Error initializing internal MQTT client");
- goto fail_3;
- }
+ client->mqtt = mqtt_ng_init(&settings);
return client;
-fail_3:
- close(client->write_notif_pipe[PIPE_WRITE_END]);
- close(client->write_notif_pipe[PIPE_READ_END]);
fail_2:
ws_client_destroy(client->ws_client);
fail_1:
freez(client);
-fail:
- mqtt_wss_log_ctx_destroy(log);
return NULL;
}
@@ -265,30 +228,25 @@ void mqtt_wss_destroy(mqtt_wss_client client)
if (client->sockfd > 0)
close(client->sockfd);
- mqtt_wss_log_ctx_destroy(client->log);
freez(client);
}
static int cert_verify_callback(int preverify_ok, X509_STORE_CTX *ctx)
{
- SSL *ssl;
- X509 *err_cert;
- mqtt_wss_client client;
- int err = 0, depth;
- char *err_str;
+ int err = 0;
- ssl = X509_STORE_CTX_get_ex_data(ctx, SSL_get_ex_data_X509_STORE_CTX_idx());
- client = SSL_get_ex_data(ssl, 0);
+ SSL* ssl = X509_STORE_CTX_get_ex_data(ctx, SSL_get_ex_data_X509_STORE_CTX_idx());
+ mqtt_wss_client client = SSL_get_ex_data(ssl, 0);
// TODO handle depth as per https://www.openssl.org/docs/man1.0.2/man3/SSL_CTX_set_verify.html
if (!preverify_ok) {
err = X509_STORE_CTX_get_error(ctx);
- depth = X509_STORE_CTX_get_error_depth(ctx);
- err_cert = X509_STORE_CTX_get_current_cert(ctx);
- err_str = X509_NAME_oneline(X509_get_subject_name(err_cert), NULL, 0);
+ int depth = X509_STORE_CTX_get_error_depth(ctx);
+ X509* err_cert = X509_STORE_CTX_get_current_cert(ctx);
+ char* err_str = X509_NAME_oneline(X509_get_subject_name(err_cert), NULL, 0);
- mws_error(client->log, "verify error:num=%d:%s:depth=%d:%s", err,
+ nd_log(NDLS_DAEMON, NDLP_ERR, "verify error:num=%d:%s:depth=%d:%s", err,
X509_verify_cert_error_string(err), depth, err_str);
freez(err_str);
@@ -298,7 +256,7 @@ static int cert_verify_callback(int preverify_ok, X509_STORE_CTX *ctx)
client->ssl_flags & MQTT_WSS_SSL_ALLOW_SELF_SIGNED)
{
preverify_ok = 1;
- mws_error(client->log, "Self Signed Certificate Accepted as the connection was "
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Self Signed Certificate Accepted as the connection was "
"requested with MQTT_WSS_SSL_ALLOW_SELF_SIGNED");
}
@@ -312,16 +270,14 @@ static int cert_verify_callback(int preverify_ok, X509_STORE_CTX *ctx)
#define HTTP_HDR_TERMINATOR "\x0D\x0A\x0D\x0A"
#define HTTP_CODE_LEN 4
#define HTTP_REASON_MAX_LEN 512
-static int http_parse_reply(mqtt_wss_client client, rbuf_t buf)
+static int http_parse_reply(rbuf_t buf)
{
- char *ptr;
char http_code_s[4];
- int http_code;
int idx;
if (rbuf_memcmp_n(buf, PROXY_HTTP, strlen(PROXY_HTTP))) {
if (rbuf_memcmp_n(buf, PROXY_HTTP10, strlen(PROXY_HTTP10))) {
- mws_error(client->log, "http_proxy expected reply with \"" PROXY_HTTP "\" or \"" PROXY_HTTP10 "\"");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "http_proxy expected reply with \"" PROXY_HTTP "\" or \"" PROXY_HTTP10 "\"");
return 1;
}
}
@@ -329,39 +285,37 @@ static int http_parse_reply(mqtt_wss_client client, rbuf_t buf)
rbuf_bump_tail(buf, strlen(PROXY_HTTP));
if (!rbuf_pop(buf, http_code_s, 1) || http_code_s[0] != 0x20) {
- mws_error(client->log, "http_proxy missing space after \"" PROXY_HTTP "\" or \"" PROXY_HTTP10 "\"");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "http_proxy missing space after \"" PROXY_HTTP "\" or \"" PROXY_HTTP10 "\"");
return 2;
}
if (!rbuf_pop(buf, http_code_s, HTTP_CODE_LEN)) {
- mws_error(client->log, "http_proxy missing HTTP code");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "http_proxy missing HTTP code");
return 3;
}
for (int i = 0; i < HTTP_CODE_LEN - 1; i++)
if (http_code_s[i] > 0x39 || http_code_s[i] < 0x30) {
- mws_error(client->log, "http_proxy HTTP code non numeric");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "http_proxy HTTP code non numeric");
return 4;
}
http_code_s[HTTP_CODE_LEN - 1] = 0;
- http_code = atoi(http_code_s);
+ int http_code = str2i(http_code_s);
// TODO check if we ever have more headers here
rbuf_find_bytes(buf, HTTP_ENDLINE, strlen(HTTP_ENDLINE), &idx);
if (idx >= HTTP_REASON_MAX_LEN) {
- mws_error(client->log, "http_proxy returned reason that is too long");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "http_proxy returned reason that is too long");
return 5;
}
if (http_code != 200) {
- ptr = mallocz(idx + 1);
- if (!ptr)
- return 6;
+ char *ptr = mallocz(idx + 1);
rbuf_pop(buf, ptr, idx);
ptr[idx] = 0;
- mws_error(client->log, "http_proxy returned error code %d \"%s\"", http_code, ptr);
+ nd_log(NDLS_DAEMON, NDLP_ERR, "http_proxy returned error code %d \"%s\"", http_code, ptr);
freez(ptr);
return 7;
}/* else
@@ -374,52 +328,11 @@ static int http_parse_reply(mqtt_wss_client client, rbuf_t buf)
rbuf_bump_tail(buf, strlen(HTTP_HDR_TERMINATOR));
if (rbuf_bytes_available(buf)) {
- mws_error(client->log, "http_proxy unexpected trailing bytes after end of HTTP hdr");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "http_proxy unexpected trailing bytes after end of HTTP hdr");
return 8;
}
- mws_debug(client->log, "http_proxy CONNECT succeeded");
- return 0;
-}
-
-#if defined(OPENSSL_VERSION_NUMBER) && OPENSSL_VERSION_NUMBER < OPENSSL_VERSION_110
-static EVP_ENCODE_CTX *EVP_ENCODE_CTX_new(void)
-{
- EVP_ENCODE_CTX *ctx = OPENSSL_malloc(sizeof(*ctx));
-
- if (ctx != NULL) {
- memset(ctx, 0, sizeof(*ctx));
- }
- return ctx;
-}
-static void EVP_ENCODE_CTX_free(EVP_ENCODE_CTX *ctx)
-{
- OPENSSL_free(ctx);
- return;
-}
-#endif
-
-inline static int base64_encode_helper(unsigned char *out, int *outl, const unsigned char *in, int in_len)
-{
- int len;
- unsigned char *str = out;
- EVP_ENCODE_CTX *ctx = EVP_ENCODE_CTX_new();
- EVP_EncodeInit(ctx);
- EVP_EncodeUpdate(ctx, str, outl, in, in_len);
- str += *outl;
- EVP_EncodeFinal(ctx, str, &len);
- *outl += len;
-
- str = out;
- while(*str) {
- if (*str != 0x0D && *str != 0x0A)
- *out++ = *str++;
- else
- str++;
- }
- *out = 0;
-
- EVP_ENCODE_CTX_free(ctx);
+ nd_log(NDLS_DAEMON, NDLP_DEBUG, "http_proxy CONNECT succeeded");
return 0;
}
@@ -430,13 +343,12 @@ static int http_proxy_connect(mqtt_wss_client client)
rbuf_t r_buf = rbuf_create(4096);
if (!r_buf)
return 1;
- char *r_buf_ptr;
size_t r_buf_linear_insert_capacity;
poll_fd.fd = client->sockfd;
poll_fd.events = POLLIN;
- r_buf_ptr = rbuf_get_linear_insert_range(r_buf, &r_buf_linear_insert_capacity);
+ char *r_buf_ptr = rbuf_get_linear_insert_range(r_buf, &r_buf_linear_insert_capacity);
snprintf(r_buf_ptr, r_buf_linear_insert_capacity,"%s %s:%d %s" HTTP_ENDLINE "Host: %s" HTTP_ENDLINE, PROXY_CONNECT,
client->target_host, client->target_port, PROXY_HTTP, client->target_host);
write(client->sockfd, r_buf_ptr, strlen(r_buf_ptr));
@@ -445,7 +357,7 @@ static int http_proxy_connect(mqtt_wss_client client)
size_t creds_plain_len = strlen(client->proxy_uname) + strlen(client->proxy_passwd) + 2;
char *creds_plain = mallocz(creds_plain_len);
if (!creds_plain) {
- mws_error(client->log, "OOM creds_plain");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "OOM creds_plain");
rc = 6;
goto cleanup;
}
@@ -456,7 +368,7 @@ static int http_proxy_connect(mqtt_wss_client client)
char *creds_base64 = mallocz(creds_base64_len + 1);
if (!creds_base64) {
freez(creds_plain);
- mws_error(client->log, "OOM creds_base64");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "OOM creds_base64");
rc = 6;
goto cleanup;
}
@@ -466,8 +378,7 @@ static int http_proxy_connect(mqtt_wss_client client)
*ptr++ = ':';
strcpy(ptr, client->proxy_passwd);
- int b64_len;
- base64_encode_helper((unsigned char*)creds_base64, &b64_len, (unsigned char*)creds_plain, strlen(creds_plain));
+ (void) netdata_base64_encode((unsigned char*)creds_base64, (unsigned char*)creds_plain, strlen(creds_plain));
freez(creds_plain);
r_buf_ptr = rbuf_get_linear_insert_range(r_buf, &r_buf_linear_insert_capacity);
@@ -482,13 +393,13 @@ static int http_proxy_connect(mqtt_wss_client client)
// or timeout
while ((rc = poll(&poll_fd, 1, 1000)) >= 0) {
if (!rc) {
- mws_error(client->log, "http_proxy timeout waiting reply from proxy server");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "http_proxy timeout waiting reply from proxy server");
rc = 2;
goto cleanup;
}
r_buf_ptr = rbuf_get_linear_insert_range(r_buf, &r_buf_linear_insert_capacity);
if (!r_buf_ptr) {
- mws_error(client->log, "http_proxy read ring buffer full");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "http_proxy read ring buffer full");
rc = 3;
goto cleanup;
}
@@ -496,20 +407,20 @@ static int http_proxy_connect(mqtt_wss_client client)
if (errno == EWOULDBLOCK || errno == EAGAIN) {
continue;
}
- mws_error(client->log, "http_proxy error reading from socket \"%s\"", strerror(errno));
+ nd_log(NDLS_DAEMON, NDLP_ERR, "http_proxy error reading from socket \"%s\"", strerror(errno));
rc = 4;
goto cleanup;
}
rbuf_bump_head(r_buf, rc);
if (rbuf_find_bytes(r_buf, HTTP_HDR_TERMINATOR, strlen(HTTP_HDR_TERMINATOR), &rc)) {
rc = 0;
- if (http_parse_reply(client, r_buf))
+ if (http_parse_reply(r_buf))
rc = 5;
goto cleanup;
}
}
- mws_error(client->log, "proxy negotiation poll error \"%s\"", strerror(errno));
+ nd_log(NDLS_DAEMON, NDLP_ERR, "proxy negotiation poll error \"%s\"", strerror(errno));
rc = 5;
cleanup:
rbuf_free(r_buf);
@@ -522,11 +433,11 @@ int mqtt_wss_connect(
int port,
struct mqtt_connect_params *mqtt_params,
int ssl_flags,
- struct mqtt_wss_proxy *proxy,
+ const struct mqtt_wss_proxy *proxy,
bool *fallback_ipv4)
{
if (!mqtt_params) {
- mws_error(client->log, "mqtt_params can't be null!");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "mqtt_params can't be null!");
return -1;
}
@@ -583,7 +494,7 @@ int mqtt_wss_connect(
struct timeval timeout = { .tv_sec = 10, .tv_usec = 0 };
int fd = connect_to_this_ip46(IPPROTO_TCP, SOCK_STREAM, client->host, 0, port_str, &timeout, fallback_ipv4);
if (fd < 0) {
- mws_error(client->log, "Could not connect to remote endpoint \"%s\", port %d.\n", client->host, port);
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Could not connect to remote endpoint \"%s\", port %d.\n", client->host, port);
return -3;
}
@@ -598,12 +509,12 @@ int mqtt_wss_connect(
int flag = 1;
int result = setsockopt(client->sockfd, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(int));
if (result < 0)
- mws_error(client->log, "Could not dissable NAGLE");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Could not dissable NAGLE");
client->poll_fds[POLLFD_SOCKET].fd = client->sockfd;
if (fcntl(client->sockfd, F_SETFL, fcntl(client->sockfd, F_GETFL, 0) | O_NONBLOCK) == -1) {
- mws_error(client->log, "Error setting O_NONBLOCK to TCP socket. \"%s\"", strerror(errno));
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Error setting O_NONBLOCK to TCP socket. \"%s\"", strerror(errno));
return -8;
}
@@ -619,7 +530,7 @@ int mqtt_wss_connect(
SSL_library_init();
#else
if (OPENSSL_init_ssl(OPENSSL_INIT_LOAD_CONFIG, NULL) != 1) {
- mws_error(client->log, "Failed to initialize SSL");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Failed to initialize SSL");
return -1;
};
#endif
@@ -636,7 +547,7 @@ int mqtt_wss_connect(
SSL_CTX_set_default_verify_paths(client->ssl_ctx);
SSL_CTX_set_verify(client->ssl_ctx, SSL_VERIFY_PEER | SSL_VERIFY_CLIENT_ONCE, cert_verify_callback);
} else
- mws_error(client->log, "SSL Certificate checking completely disabled!!!");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "SSL Certificate checking completely disabled!!!");
#ifdef MQTT_WSS_DEBUG
if(client->ssl_ctx_keylog_cb)
@@ -646,7 +557,7 @@ int mqtt_wss_connect(
client->ssl = SSL_new(client->ssl_ctx);
if (!(client->ssl_flags & MQTT_WSS_SSL_DONT_CHECK_CERTS)) {
if (!SSL_set_ex_data(client->ssl, 0, client)) {
- mws_error(client->log, "Could not SSL_set_ex_data");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Could not SSL_set_ex_data");
return -4;
}
}
@@ -654,27 +565,27 @@ int mqtt_wss_connect(
SSL_set_connect_state(client->ssl);
if (!SSL_set_tlsext_host_name(client->ssl, client->target_host)) {
- mws_error(client->log, "Error setting TLS SNI host");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Error setting TLS SNI host");
return -7;
}
result = SSL_connect(client->ssl);
if (result != -1 && result != 1) {
- mws_error(client->log, "SSL could not connect");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "SSL could not connect");
return -5;
}
if (result == -1) {
int ec = SSL_get_error(client->ssl, result);
if (ec != SSL_ERROR_WANT_READ && ec != SSL_ERROR_WANT_WRITE) {
- mws_error(client->log, "Failed to start SSL connection");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Failed to start SSL connection");
return -6;
}
}
client->mqtt_keepalive = (mqtt_params->keep_alive ? mqtt_params->keep_alive : 400);
- mws_info(client->log, "Going to connect using internal MQTT 5 implementation");
+ nd_log(NDLS_DAEMON, NDLP_INFO, "Going to connect using internal MQTT 5 implementation");
struct mqtt_auth_properties auth;
auth.client_id = (char*)mqtt_params->clientid;
auth.client_id_free = NULL;
@@ -694,7 +605,7 @@ int mqtt_wss_connect(
int ret = mqtt_ng_connect(client->mqtt, &auth, mqtt_params->will_msg ? &lwt : NULL, 1, client->mqtt_keepalive);
if (ret) {
- mws_error(client->log, "Error generating MQTT connect");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Error generating MQTT connect");
return 1;
}
@@ -703,7 +614,7 @@ int mqtt_wss_connect(
// wait till MQTT connection is established
while (!client->mqtt_connected) {
if(mqtt_wss_service(client, -1)) {
- mws_error(client->log, "Error connecting to MQTT WSS server \"%s\", port %d.", host, port);
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Error connecting to MQTT WSS server \"%s\", port %d.", host, port);
return 2;
}
}
@@ -716,14 +627,14 @@ int mqtt_wss_connect(
#define NSEC_PER_MSEC 1000000ULL
#define NSEC_PER_SEC 1000000000ULL
-static inline uint64_t boottime_usec(mqtt_wss_client client) {
+static uint64_t boottime_usec(void) {
struct timespec ts;
#if defined(__APPLE__) || defined(__FreeBSD__)
if (clock_gettime(CLOCK_MONOTONIC, &ts) == -1) {
#else
if (clock_gettime(CLOCK_BOOTTIME, &ts) == -1) {
#endif
- mws_error(client->log, "clock_gettimte failed");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "clock_gettimte failed");
return 0;
}
return (uint64_t)ts.tv_sec * USEC_PER_SEC + (ts.tv_nsec % NSEC_PER_SEC) / NSEC_PER_USEC;
@@ -732,7 +643,7 @@ static inline uint64_t boottime_usec(mqtt_wss_client client) {
#define MWS_TIMED_OUT 1
#define MWS_ERROR 2
#define MWS_OK 0
-static inline const char *mqtt_wss_error_tos(int ec)
+static const char *mqtt_wss_error_tos(int ec)
{
switch(ec) {
case MWS_TIMED_OUT:
@@ -745,13 +656,12 @@ static inline const char *mqtt_wss_error_tos(int ec)
}
-static inline int mqtt_wss_service_all(mqtt_wss_client client, int timeout_ms)
+static int mqtt_wss_service_all(mqtt_wss_client client, int timeout_ms)
{
- uint64_t exit_by = boottime_usec(client) + (timeout_ms * NSEC_PER_MSEC);
- uint64_t now;
+ uint64_t exit_by = boottime_usec() + (timeout_ms * NSEC_PER_MSEC);
client->poll_fds[POLLFD_SOCKET].events |= POLLOUT; // TODO when entering mwtt_wss_service use out buffer size to arm POLLOUT
while (rbuf_bytes_available(client->ws_client->buf_write)) {
- now = boottime_usec(client);
+ const uint64_t now = boottime_usec();
if (now >= exit_by)
return MWS_TIMED_OUT;
if (mqtt_wss_service(client, exit_by - now))
@@ -762,15 +672,13 @@ static inline int mqtt_wss_service_all(mqtt_wss_client client, int timeout_ms)
void mqtt_wss_disconnect(mqtt_wss_client client, int timeout_ms)
{
- int ret;
-
// block application from sending more MQTT messages
client->mqtt_disconnecting = 1;
// send whatever was left at the time of calling this function
- ret = mqtt_wss_service_all(client, timeout_ms / 4);
+ int ret = mqtt_wss_service_all(client, timeout_ms / 4);
if(ret)
- mws_error(client->log,
+ nd_log(NDLS_DAEMON, NDLP_ERR,
"Error while trying to send all remaining data in an attempt "
"to gracefully disconnect! EC=%d Desc:\"%s\"",
ret,
@@ -782,7 +690,7 @@ void mqtt_wss_disconnect(mqtt_wss_client client, int timeout_ms)
ret = mqtt_wss_service_all(client, timeout_ms / 4);
if(ret)
- mws_error(client->log,
+ nd_log(NDLS_DAEMON, NDLP_ERR,
"Error while trying to send MQTT disconnect message in an attempt "
"to gracefully disconnect! EC=%d Desc:\"%s\"",
ret,
@@ -795,7 +703,7 @@ void mqtt_wss_disconnect(mqtt_wss_client client, int timeout_ms)
if(ret) {
// Some MQTT/WSS servers will close socket on receipt of MQTT disconnect and
// do not wait for WebSocket to be closed properly
- mws_warn(client->log,
+ nd_log(NDLS_DAEMON, NDLP_WARNING,
"Error while trying to send WebSocket disconnect message in an attempt "
"to gracefully disconnect! EC=%d Desc:\"%s\".",
ret,
@@ -810,22 +718,19 @@ void mqtt_wss_disconnect(mqtt_wss_client client, int timeout_ms)
client->sockfd = -1;
}
-static inline void mqtt_wss_wakeup(mqtt_wss_client client)
+static void mqtt_wss_wakeup(mqtt_wss_client client)
{
-#ifdef DEBUG_ULTRA_VERBOSE
- mws_debug(client->log, "mqtt_wss_wakup - forcing wake up of main loop");
-#endif
write(client->write_notif_pipe[PIPE_WRITE_END], " ", 1);
}
#define THROWAWAY_BUF_SIZE 32
char throwaway[THROWAWAY_BUF_SIZE];
-static inline void util_clear_pipe(int fd)
+static void util_clear_pipe(int fd)
{
(void)read(fd, throwaway, THROWAWAY_BUF_SIZE);
}
-static inline void set_socket_pollfds(mqtt_wss_client client, int ssl_ret) {
+static void set_socket_pollfds(mqtt_wss_client client, int ssl_ret) {
if (ssl_ret == SSL_ERROR_WANT_WRITE)
client->poll_fds[POLLFD_SOCKET].events |= POLLOUT;
if (ssl_ret == SSL_ERROR_WANT_READ)
@@ -836,27 +741,25 @@ static int handle_mqtt_internal(mqtt_wss_client client)
{
int rc = mqtt_ng_sync(client->mqtt);
if (rc) {
- mws_error(client->log, "mqtt_ng_sync returned %d != 0", rc);
+ nd_log(NDLS_DAEMON, NDLP_ERR, "mqtt_ng_sync returned %d != 0", rc);
client->mqtt_connected = 0;
return 1;
}
return 0;
}
-#define SEC_TO_MSEC 1000
-static inline long long int t_till_next_keepalive_ms(mqtt_wss_client client)
+static int t_till_next_keepalive_ms(mqtt_wss_client client)
{
time_t last_send = mqtt_ng_last_send_time(client->mqtt);
- long long int next_mqtt_keep_alive = (last_send * SEC_TO_MSEC)
- + (client->mqtt_keepalive * (SEC_TO_MSEC * 0.75 /* SEND IN ADVANCE */));
- return(next_mqtt_keep_alive - (time(NULL) * SEC_TO_MSEC));
+ time_t next_mqtt_keep_alive = last_send + client->mqtt_keepalive * 0.75;
+ return ((next_mqtt_keep_alive - now_realtime_sec()) * MSEC_PER_SEC);
}
#ifdef MQTT_WSS_CPUSTATS
-static inline uint64_t mqtt_wss_now_usec(mqtt_wss_client client) {
+static uint64_t mqtt_wss_now_usec(void) {
struct timespec ts;
if(clock_gettime(CLOCK_MONOTONIC, &ts) == -1) {
- mws_error(client->log, "clock_gettime(CLOCK_MONOTONIC, &timespec) failed.");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "clock_gettime(CLOCK_MONOTONIC, &timespec) failed.");
return 0;
}
return (uint64_t)ts.tv_sec * USEC_PER_SEC + (ts.tv_nsec % NSEC_PER_SEC) / NSEC_PER_USEC;
@@ -871,63 +774,51 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms)
int send_keepalive = 0;
#ifdef MQTT_WSS_CPUSTATS
- uint64_t t1,t2;
- t1 = mqtt_wss_now_usec(client);
-#endif
-
-#ifdef DEBUG_ULTRA_VERBOSE
- mws_debug(client->log, ">>>>> mqtt_wss_service <<<<<");
- mws_debug(client->log, "Waiting for events: %s%s%s",
- (client->poll_fds[POLLFD_SOCKET].events & POLLIN) ? "SOCKET_POLLIN " : "",
- (client->poll_fds[POLLFD_SOCKET].events & POLLOUT) ? "SOCKET_POLLOUT " : "",
- (client->poll_fds[POLLFD_PIPE].events & POLLIN) ? "PIPE_POLLIN" : "" );
+ uint64_t t2;
+ uint64_t t1 = mqtt_wss_now_usec();
#endif
// Check user requested TO doesn't interfere with MQTT keep alives
- long long int till_next_keep_alive = t_till_next_keepalive_ms(client);
- if (client->mqtt_connected && (timeout_ms < 0 || timeout_ms >= till_next_keep_alive)) {
- #ifdef DEBUG_ULTRA_VERBOSE
- mws_debug(client->log, "Shortening Timeout requested %d to %lld to ensure keep-alive can be sent", timeout_ms, till_next_keep_alive);
- #endif
- timeout_ms = till_next_keep_alive;
- send_keepalive = 1;
+ if (!ping_timeout) {
+ int till_next_keep_alive = t_till_next_keepalive_ms(client);
+ if (till_next_keep_alive < 0)
+ till_next_keep_alive = 0;
+ if (client->mqtt_connected && (timeout_ms < 0 || timeout_ms >= till_next_keep_alive)) {
+ timeout_ms = till_next_keep_alive;
+ send_keepalive = 1;
+ }
}
#ifdef MQTT_WSS_CPUSTATS
- t2 = mqtt_wss_now_usec(client);
+ t2 = mqtt_wss_now_usec();
client->stats.time_keepalive += t2 - t1;
#endif
if ((ret = poll(client->poll_fds, 2, timeout_ms >= 0 ? timeout_ms : -1)) < 0) {
if (errno == EINTR) {
- mws_warn(client->log, "poll interrupted by EINTR");
+ nd_log(NDLS_DAEMON, NDLP_WARNING, "poll interrupted by EINTR");
return 0;
}
- mws_error(client->log, "poll error \"%s\"", strerror(errno));
+ nd_log(NDLS_DAEMON, NDLP_ERR, "poll error \"%s\"", strerror(errno));
return -2;
}
-#ifdef DEBUG_ULTRA_VERBOSE
- mws_debug(client->log, "Poll events happened: %s%s%s%s",
- (client->poll_fds[POLLFD_SOCKET].revents & POLLIN) ? "SOCKET_POLLIN " : "",
- (client->poll_fds[POLLFD_SOCKET].revents & POLLOUT) ? "SOCKET_POLLOUT " : "",
- (client->poll_fds[POLLFD_PIPE].revents & POLLIN) ? "PIPE_POLLIN " : "",
- (!ret) ? "POLL_TIMEOUT" : "");
-#endif
-
#ifdef MQTT_WSS_CPUSTATS
- t1 = mqtt_wss_now_usec(client);
+ t1 = mqtt_wss_now_usec();
#endif
if (ret == 0) {
+ time_t now = now_realtime_sec();
if (send_keepalive) {
// otherwise we shortened the timeout ourselves to take care of
// MQTT keep alives
-#ifdef DEBUG_ULTRA_VERBOSE
- mws_debug(client->log, "Forcing MQTT Ping/keep-alive");
-#endif
mqtt_ng_ping(client->mqtt);
+ ping_timeout = now + PING_TIMEOUT;
} else {
+ if (ping_timeout && ping_timeout < now) {
+ disconnect_req = ACLK_PING_TIMEOUT;
+ ping_timeout = 0;
+ }
// if poll timed out and user requested timeout was being used
// return here let user do his work and he will call us back soon
return 0;
@@ -935,7 +826,7 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms)
}
#ifdef MQTT_WSS_CPUSTATS
- t2 = mqtt_wss_now_usec(client);
+ t2 = mqtt_wss_now_usec();
client->stats.time_keepalive += t2 - t1;
#endif
@@ -943,9 +834,6 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms)
if ((ptr = rbuf_get_linear_insert_range(client->ws_client->buf_read, &size))) {
if((ret = SSL_read(client->ssl, ptr, size)) > 0) {
-#ifdef DEBUG_ULTRA_VERBOSE
- mws_debug(client->log, "SSL_Read: Read %d.", ret);
-#endif
spinlock_lock(&client->stat_lock);
client->stats.bytes_rx += ret;
spinlock_unlock(&client->stat_lock);
@@ -953,22 +841,19 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms)
} else {
int errnobkp = errno;
ret = SSL_get_error(client->ssl, ret);
-#ifdef DEBUG_ULTRA_VERBOSE
- mws_debug(client->log, "Read Err: %s", util_openssl_ret_err(ret));
-#endif
set_socket_pollfds(client, ret);
if (ret != SSL_ERROR_WANT_READ &&
ret != SSL_ERROR_WANT_WRITE) {
- mws_error(client->log, "SSL_read error: %d %s", ret, util_openssl_ret_err(ret));
+ nd_log(NDLS_DAEMON, NDLP_ERR, "SSL_read error: %d %s", ret, util_openssl_ret_err(ret));
if (ret == SSL_ERROR_SYSCALL)
- mws_error(client->log, "SSL_read SYSCALL errno: %d %s", errnobkp, strerror(errnobkp));
+ nd_log(NDLS_DAEMON, NDLP_ERR, "SSL_read SYSCALL errno: %d %s", errnobkp, strerror(errnobkp));
return MQTT_WSS_ERR_CONN_DROP;
}
}
}
#ifdef MQTT_WSS_CPUSTATS
- t1 = mqtt_wss_now_usec(client);
+ t1 = mqtt_wss_now_usec();
client->stats.time_read_socket += t1 - t2;
#endif
@@ -976,18 +861,20 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms)
switch(ret) {
case WS_CLIENT_PROTOCOL_ERROR:
return MQTT_WSS_ERR_PROTO_WS;
+
case WS_CLIENT_NEED_MORE_BYTES:
-#ifdef DEBUG_ULTRA_VERBOSE
- mws_debug(client->log, "WSCLIENT WANT READ");
-#endif
client->poll_fds[POLLFD_SOCKET].events |= POLLIN;
break;
+
case WS_CLIENT_CONNECTION_CLOSED:
return MQTT_WSS_ERR_CONN_DROP;
+
+ default:
+ return MQTT_WSS_ERR_PROTO_WS;
}
#ifdef MQTT_WSS_CPUSTATS
- t2 = mqtt_wss_now_usec(client);
+ t2 = mqtt_wss_now_usec();
client->stats.time_process_websocket += t2 - t1;
#endif
@@ -1002,18 +889,12 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms)
}
#ifdef MQTT_WSS_CPUSTATS
- t1 = mqtt_wss_now_usec(client);
+ t1 = mqtt_wss_now_usec();
client->stats.time_process_mqtt += t1 - t2;
#endif
if ((ptr = rbuf_get_linear_read_range(client->ws_client->buf_write, &size))) {
-#ifdef DEBUG_ULTRA_VERBOSE
- mws_debug(client->log, "Have data to write to SSL");
-#endif
if ((ret = SSL_write(client->ssl, ptr, size)) > 0) {
-#ifdef DEBUG_ULTRA_VERBOSE
- mws_debug(client->log, "SSL_Write: Written %d of avail %d.", ret, size);
-#endif
spinlock_lock(&client->stat_lock);
client->stats.bytes_tx += ret;
spinlock_unlock(&client->stat_lock);
@@ -1021,15 +902,12 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms)
} else {
int errnobkp = errno;
ret = SSL_get_error(client->ssl, ret);
-#ifdef DEBUG_ULTRA_VERBOSE
- mws_debug(client->log, "Write Err: %s", util_openssl_ret_err(ret));
-#endif
set_socket_pollfds(client, ret);
if (ret != SSL_ERROR_WANT_READ &&
ret != SSL_ERROR_WANT_WRITE) {
- mws_error(client->log, "SSL_write error: %d %s", ret, util_openssl_ret_err(ret));
+ nd_log(NDLS_DAEMON, NDLP_ERR, "SSL_write error: %d %s", ret, util_openssl_ret_err(ret));
if (ret == SSL_ERROR_SYSCALL)
- mws_error(client->log, "SSL_write SYSCALL errno: %d %s", errnobkp, strerror(errnobkp));
+ nd_log(NDLS_DAEMON, NDLP_ERR, "SSL_write SYSCALL errno: %d %s", errnobkp, strerror(errnobkp));
return MQTT_WSS_ERR_CONN_DROP;
}
}
@@ -1039,7 +917,7 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms)
util_clear_pipe(client->write_notif_pipe[PIPE_READ_END]);
#ifdef MQTT_WSS_CPUSTATS
- t2 = mqtt_wss_now_usec(client);
+ t2 = mqtt_wss_now_usec();
client->stats.time_write_socket += t2 - t1;
#endif
@@ -1056,12 +934,12 @@ int mqtt_wss_publish5(mqtt_wss_client client,
uint16_t *packet_id)
{
if (client->mqtt_disconnecting) {
- mws_error(client->log, "mqtt_wss is disconnecting can't publish");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "mqtt_wss is disconnecting can't publish");
return 1;
}
if (!client->mqtt_connected) {
- mws_error(client->log, "MQTT is offline. Can't send message.");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "MQTT is offline. Can't send message.");
return 1;
}
uint8_t mqtt_flags = 0;
@@ -1072,7 +950,7 @@ int mqtt_wss_publish5(mqtt_wss_client client,
int rc = mqtt_ng_publish(client->mqtt, topic, topic_free, msg, msg_free, msg_len, mqtt_flags, packet_id);
if (rc == MQTT_NG_MSGGEN_MSG_TOO_BIG)
- return MQTT_WSS_ERR_TOO_BIG_FOR_SERVER;
+ return MQTT_WSS_ERR_MSG_TOO_BIG;
mqtt_wss_wakeup(client);
@@ -1083,12 +961,12 @@ int mqtt_wss_subscribe(mqtt_wss_client client, char *topic, int max_qos_level)
{
(void)max_qos_level; //TODO now hardcoded
if (!client->mqtt_connected) {
- mws_error(client->log, "MQTT is offline. Can't subscribe.");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "MQTT is offline. Can't subscribe.");
return 1;
}
if (client->mqtt_disconnecting) {
- mws_error(client->log, "mqtt_wss is disconnecting can't subscribe");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "mqtt_wss is disconnecting can't subscribe");
return 1;
}
diff --git a/src/aclk/mqtt_websockets/mqtt_wss_client.h b/src/aclk/mqtt_websockets/mqtt_wss_client.h
index f0bdce98b..2fd94075d 100644
--- a/src/aclk/mqtt_websockets/mqtt_wss_client.h
+++ b/src/aclk/mqtt_websockets/mqtt_wss_client.h
@@ -1,56 +1,36 @@
-// SPDX-License-Identifier: GPL-3.0-only
-// Copyright (C) 2020 Timotej Šiškovič
+// SPDX-License-Identifier: GPL-3.0-or-later
#ifndef MQTT_WSS_CLIENT_H
#define MQTT_WSS_CLIENT_H
-#include <stdbool.h>
-#include <stdint.h>
-#include <stddef.h> //size_t
-
-#include "mqtt_wss_log.h"
#include "common_public.h"
-// All OK call me at your earliest convinience
-#define MQTT_WSS_OK 0
-/* All OK, poll timeout you requested when calling mqtt_wss_service expired - you might want to know if timeout
- * happened or we got some data or handle same as MQTT_WSS_OK
- */
-#define MQTT_WSS_OK_TO 1
-// Connection was closed by remote
-#define MQTT_WSS_ERR_CONN_DROP -1
-// Error in MQTT protocol (e.g. malformed packet)
-#define MQTT_WSS_ERR_PROTO_MQTT -2
-// Error in WebSocket protocol (e.g. malformed packet)
-#define MQTT_WSS_ERR_PROTO_WS -3
-
-#define MQTT_WSS_ERR_TX_BUF_TOO_SMALL -4
-#define MQTT_WSS_ERR_RX_BUF_TOO_SMALL -5
-
-#define MQTT_WSS_ERR_TOO_BIG_FOR_SERVER -6
-// if client was initialized with MQTT 3 but MQTT 5 feature
-// was requested by user of library
-#define MQTT_WSS_ERR_CANT_DO -8
+
+#define MQTT_WSS_OK 0 // All OK call me at your earliest convinience
+#define MQTT_WSS_OK_TO 1 // All OK, poll timeout you requested when calling mqtt_wss_service expired
+ //you might want to know if timeout
+ //happened or we got some data or handle same as MQTT_WSS_OK
+#define MQTT_WSS_ERR_CONN_DROP -1 // Connection was closed by remote
+#define MQTT_WSS_ERR_PROTO_MQTT -2 // Error in MQTT protocol (e.g. malformed packet)
+#define MQTT_WSS_ERR_PROTO_WS -3 // Error in WebSocket protocol (e.g. malformed packet)
+#define MQTT_WSS_ERR_MSG_TOO_BIG -6 // Message size too big for server
+#define MQTT_WSS_ERR_CANT_DO -8 // if client was initialized with MQTT 3 but MQTT 5 feature
+ // was requested by user of library
typedef struct mqtt_wss_client_struct *mqtt_wss_client;
typedef void (*msg_callback_fnc_t)(const char *topic, const void *msg, size_t msglen, int qos);
+
/* Creates new instance of MQTT over WSS. Doesn't start connection.
- * @param log_prefix this is prefix to be used when logging to discern between multiple
- * mqtt_wss instances. Can be NULL.
- * @param log_callback is function pointer to fnc to be called when mqtt_wss wants
- * to log. This allows plugging this library into your own logging system/solution.
- * If NULL STDOUT/STDERR will be used.
* @param msg_callback is function pointer to function which will be called
* when application level message arrives from broker (for subscribed topics).
* Can be NULL if you are not interested about incoming messages.
* @param puback_callback is function pointer to function to be called when QOS1 Publish
* is acknowledged by server
*/
-mqtt_wss_client mqtt_wss_new(const char *log_prefix,
- mqtt_wss_log_callback_t log_callback,
- msg_callback_fnc_t msg_callback,
- void (*puback_callback)(uint16_t packet_id));
+mqtt_wss_client mqtt_wss_new(
+ msg_callback_fnc_t msg_callback,
+ void (*puback_callback)(uint16_t packet_id));
void mqtt_wss_set_max_buf_size(mqtt_wss_client client, size_t size);
@@ -76,7 +56,7 @@ int mqtt_wss_connect(
int port,
struct mqtt_connect_params *mqtt_params,
int ssl_flags,
- struct mqtt_wss_proxy *proxy,
+ const struct mqtt_wss_proxy *proxy,
bool *fallback_ipv4);
int mqtt_wss_service(mqtt_wss_client client, int timeout_ms);
void mqtt_wss_disconnect(mqtt_wss_client client, int timeout_ms);
diff --git a/src/aclk/mqtt_websockets/mqtt_wss_log.c b/src/aclk/mqtt_websockets/mqtt_wss_log.c
deleted file mode 100644
index e5da76fcf..000000000
--- a/src/aclk/mqtt_websockets/mqtt_wss_log.c
+++ /dev/null
@@ -1,130 +0,0 @@
-// Copyright: SPDX-License-Identifier: GPL-3.0-only
-
-#include <stdlib.h>
-#include <stdarg.h>
-#include <string.h>
-#include <stdio.h>
-
-#include "mqtt_wss_log.h"
-#include "common_internal.h"
-
-struct mqtt_wss_log_ctx {
- mqtt_wss_log_callback_t extern_log_fnc;
- char *ctx_prefix;
- char *buffer;
- char *buffer_w_ptr;
- size_t buffer_bytes_avail;
-};
-
-#define LOG_BUFFER_SIZE 1024 * 4
-#define LOG_CTX_PREFIX_SEV_STR " : "
-#define LOG_CTX_PREFIX_LIMIT 15
-#define LOG_CTX_PREFIX_LIMIT_STR (LOG_CTX_PREFIX_LIMIT - (2 + strlen(LOG_CTX_PREFIX_SEV_STR))) // with [] characters and affixed ' ' it is total 15 chars
-#if (LOG_CTX_PREFIX_LIMIT * 10) > LOG_BUFFER_SIZE
-#error "LOG_BUFFER_SIZE too small"
-#endif
-mqtt_wss_log_ctx_t mqtt_wss_log_ctx_create(const char *ctx_prefix, mqtt_wss_log_callback_t log_callback)
-{
- mqtt_wss_log_ctx_t ctx = callocz(1, sizeof(struct mqtt_wss_log_ctx));
- if(!ctx)
- return NULL;
-
- if(log_callback) {
- ctx->extern_log_fnc = log_callback;
- ctx->buffer = callocz(1, LOG_BUFFER_SIZE);
- if(!ctx->buffer)
- goto cleanup;
-
- ctx->buffer_w_ptr = ctx->buffer;
- if(ctx_prefix) {
- *(ctx->buffer_w_ptr++) = '[';
- strncpy(ctx->buffer_w_ptr, ctx_prefix, LOG_CTX_PREFIX_LIMIT_STR);
- ctx->buffer_w_ptr += strnlen(ctx_prefix, LOG_CTX_PREFIX_LIMIT_STR);
- *(ctx->buffer_w_ptr++) = ']';
- }
- strcpy(ctx->buffer_w_ptr, LOG_CTX_PREFIX_SEV_STR);
- ctx->buffer_w_ptr += strlen(LOG_CTX_PREFIX_SEV_STR);
- // no term '\0' -> calloc is used
-
- ctx->buffer_bytes_avail = LOG_BUFFER_SIZE - strlen(ctx->buffer);
-
- return ctx;
- }
-
- if(ctx_prefix) {
- ctx->ctx_prefix = strndup(ctx_prefix, LOG_CTX_PREFIX_LIMIT_STR);
- if(!ctx->ctx_prefix)
- goto cleanup;
- }
-
- return ctx;
-
-cleanup:
- freez(ctx);
- return NULL;
-}
-
-void mqtt_wss_log_ctx_destroy(mqtt_wss_log_ctx_t ctx)
-{
- freez(ctx->ctx_prefix);
- freez(ctx->buffer);
- freez(ctx);
-}
-
-static inline char severity_to_c(int severity)
-{
- switch (severity) {
- case MQTT_WSS_LOG_FATAL:
- return 'F';
- case MQTT_WSS_LOG_ERROR:
- return 'E';
- case MQTT_WSS_LOG_WARN:
- return 'W';
- case MQTT_WSS_LOG_INFO:
- return 'I';
- case MQTT_WSS_LOG_DEBUG:
- return 'D';
- default:
- return '?';
- }
-}
-
-void mws_log(int severity, mqtt_wss_log_ctx_t ctx, const char *fmt, va_list args)
-{
- size_t size;
-
- if(ctx->extern_log_fnc) {
- size = vsnprintf(ctx->buffer_w_ptr, ctx->buffer_bytes_avail, fmt, args);
- *(ctx->buffer_w_ptr - 3) = severity_to_c(severity);
-
- ctx->extern_log_fnc(severity, ctx->buffer);
-
- if(size >= ctx->buffer_bytes_avail)
- mws_error(ctx, "Last message of this type was truncated! Consider what you log or increase LOG_BUFFER_SIZE if really needed.");
-
- return;
- }
-
- if(ctx->ctx_prefix)
- printf("[%s] ", ctx->ctx_prefix);
-
- printf("%c: ", severity_to_c(severity));
-
- vprintf(fmt, args);
- putchar('\n');
-}
-
-#define DEFINE_MWS_SEV_FNC(severity_fncname, severity) \
-void mws_ ## severity_fncname(mqtt_wss_log_ctx_t ctx, const char *fmt, ...) \
-{ \
- va_list args; \
- va_start(args, fmt); \
- mws_log(severity, ctx, fmt, args); \
- va_end(args); \
-}
-
-DEFINE_MWS_SEV_FNC(fatal, MQTT_WSS_LOG_FATAL)
-DEFINE_MWS_SEV_FNC(error, MQTT_WSS_LOG_ERROR)
-DEFINE_MWS_SEV_FNC(warn, MQTT_WSS_LOG_WARN )
-DEFINE_MWS_SEV_FNC(info, MQTT_WSS_LOG_INFO )
-DEFINE_MWS_SEV_FNC(debug, MQTT_WSS_LOG_DEBUG)
diff --git a/src/aclk/mqtt_websockets/mqtt_wss_log.h b/src/aclk/mqtt_websockets/mqtt_wss_log.h
deleted file mode 100644
index 6ae60d870..000000000
--- a/src/aclk/mqtt_websockets/mqtt_wss_log.h
+++ /dev/null
@@ -1,39 +0,0 @@
-// Copyright: SPDX-License-Identifier: GPL-3.0-only
-
-#ifndef MQTT_WSS_LOG_H
-#define MQTT_WSS_LOG_H
-
-typedef enum mqtt_wss_log_type {
- MQTT_WSS_LOG_DEBUG = 0x01,
- MQTT_WSS_LOG_INFO = 0x02,
- MQTT_WSS_LOG_WARN = 0x03,
- MQTT_WSS_LOG_ERROR = 0x81,
- MQTT_WSS_LOG_FATAL = 0x88
-} mqtt_wss_log_type_t;
-
-typedef void (*mqtt_wss_log_callback_t)(mqtt_wss_log_type_t, const char*);
-
-typedef struct mqtt_wss_log_ctx *mqtt_wss_log_ctx_t;
-
-/** Creates logging context with optional prefix and optional callback
- * @param ctx_prefix String to be prefixed to every log message.
- * This is useful if multiple clients are instantiated to be able to
- * know which one this message belongs to. Can be `NULL` for no prefix.
- * @param log_callback Callback to be called instead of logging to
- * `STDOUT` or `STDERR` (if debug enabled otherwise silent). Callback has to be
- * pointer to function of `void function(mqtt_wss_log_type_t, const char*)` type.
- * If `NULL` default will be used (silent or STDERR/STDOUT).
- * @return mqtt_wss_log_ctx_t or `NULL` on error */
-mqtt_wss_log_ctx_t mqtt_wss_log_ctx_create(const char *ctx_prefix, mqtt_wss_log_callback_t log_callback);
-
-/** Destroys logging context and cleans up the memory
- * @param ctx Context to destroy */
-void mqtt_wss_log_ctx_destroy(mqtt_wss_log_ctx_t ctx);
-
-void mws_fatal(mqtt_wss_log_ctx_t ctx, const char *fmt, ...);
-void mws_error(mqtt_wss_log_ctx_t ctx, const char *fmt, ...);
-void mws_warn (mqtt_wss_log_ctx_t ctx, const char *fmt, ...);
-void mws_info (mqtt_wss_log_ctx_t ctx, const char *fmt, ...);
-void mws_debug(mqtt_wss_log_ctx_t ctx, const char *fmt, ...);
-
-#endif /* MQTT_WSS_LOG_H */
diff --git a/src/aclk/mqtt_websockets/ws_client.c b/src/aclk/mqtt_websockets/ws_client.c
index a6b9b23f3..99ea266c8 100644
--- a/src/aclk/mqtt_websockets/ws_client.c
+++ b/src/aclk/mqtt_websockets/ws_client.c
@@ -1,103 +1,43 @@
-// Copyright (C) 2020 Timotej Šiškovič
-// SPDX-License-Identifier: GPL-3.0-only
-//
-// This program is free software: you can redistribute it and/or modify it
-// under the terms of the GNU General Public License as published by the Free Software Foundation, version 3.
-//
-// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
-// without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
-// See the GNU General Public License for more details.
-//
-// You should have received a copy of the GNU General Public License along with this program.
-// If not, see <https://www.gnu.org/licenses/>.
-
-#include <fcntl.h>
-#include <unistd.h>
-#include <string.h>
-#include <errno.h>
-#include <ctype.h>
-
-#include <openssl/evp.h>
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "libnetdata/libnetdata.h"
#include "ws_client.h"
#include "common_internal.h"
-#ifdef MQTT_WEBSOCKETS_DEBUG
-#include "../c-rbuf/src/ringbuffer_internal.h"
-#endif
-
-#define UNIT_LOG_PREFIX "ws_client: "
-#define FATAL(fmt, ...) mws_fatal(client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__)
-#define ERROR(fmt, ...) mws_error(client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__)
-#define WARN(fmt, ...) mws_warn (client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__)
-#define INFO(fmt, ...) mws_info (client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__)
-#define DEBUG(fmt, ...) mws_debug(client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__)
-
const char *websocket_upgrage_hdr = "GET /mqtt HTTP/1.1\x0D\x0A"
"Host: %s\x0D\x0A"
"Upgrade: websocket\x0D\x0A"
"Connection: Upgrade\x0D\x0A"
"Sec-WebSocket-Key: %s\x0D\x0A"
- "Origin: http://example.com\x0D\x0A"
+ "Origin: \x0D\x0A"
"Sec-WebSocket-Protocol: mqtt\x0D\x0A"
"Sec-WebSocket-Version: 13\x0D\x0A\x0D\x0A";
const char *mqtt_protoid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
#define DEFAULT_RINGBUFFER_SIZE (1024*128)
-#define ENTROPY_SOURCE "/dev/urandom"
-ws_client *ws_client_new(size_t buf_size, char **host, mqtt_wss_log_ctx_t log)
-{
- ws_client *client;
+ws_client *ws_client_new(size_t buf_size, char **host)
+{
if(!host)
return NULL;
- client = callocz(1, sizeof(ws_client));
- if (!client)
- return NULL;
-
+ ws_client *client = callocz(1, sizeof(ws_client));
client->host = host;
- client->log = log;
-
client->buf_read = rbuf_create(buf_size ? buf_size : DEFAULT_RINGBUFFER_SIZE);
- if (!client->buf_read)
- goto cleanup;
-
client->buf_write = rbuf_create(buf_size ? buf_size : DEFAULT_RINGBUFFER_SIZE);
- if (!client->buf_write)
- goto cleanup_1;
-
client->buf_to_mqtt = rbuf_create(buf_size ? buf_size : DEFAULT_RINGBUFFER_SIZE);
- if (!client->buf_to_mqtt)
- goto cleanup_2;
-
- client->entropy_fd = open(ENTROPY_SOURCE, O_RDONLY | O_CLOEXEC);
- if (client->entropy_fd < 1) {
- ERROR("Error opening entropy source \"" ENTROPY_SOURCE "\". Reason: \"%s\"", strerror(errno));
- goto cleanup_3;
- }
return client;
-
-cleanup_3:
- rbuf_free(client->buf_to_mqtt);
-cleanup_2:
- rbuf_free(client->buf_write);
-cleanup_1:
- rbuf_free(client->buf_read);
-cleanup:
- freez(client);
- return NULL;
}
void ws_client_free_headers(ws_client *client)
{
struct http_header *ptr = client->hs.headers;
- struct http_header *tmp;
while (ptr) {
- tmp = ptr;
+ struct http_header *tmp = ptr;
ptr = ptr->next;
freez(tmp);
}
@@ -112,7 +52,6 @@ void ws_client_destroy(ws_client *client)
ws_client_free_headers(client);
freez(client->hs.nonce_reply);
freez(client->hs.http_reply_msg);
- close(client->entropy_fd);
rbuf_free(client->buf_read);
rbuf_free(client->buf_write);
rbuf_free(client->buf_to_mqtt);
@@ -141,7 +80,7 @@ void ws_client_reset(ws_client *client)
int ws_client_add_http_header(ws_client *client, struct http_header *hdr)
{
if (client->hs.hdr_count > MAX_HTTP_HDR_COUNT) {
- ERROR("Too many HTTP response header fields");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Too many HTTP response header fields");
return -1;
}
@@ -156,7 +95,7 @@ int ws_client_add_http_header(ws_client *client, struct http_header *hdr)
return 0;
}
-int ws_client_want_write(ws_client *client)
+int ws_client_want_write(const ws_client *client)
{
return rbuf_bytes_available(client->buf_write);
}
@@ -165,78 +104,89 @@ int ws_client_want_write(ws_client *client)
#define TEMP_BUF_SIZE 4096
int ws_client_start_handshake(ws_client *client)
{
- nd_uuid_t nonce;
+ unsigned char nonce[WEBSOCKET_NONCE_SIZE];
char nonce_b64[256];
char second[TEMP_BUF_SIZE];
unsigned int md_len;
- unsigned char *digest;
+ unsigned char digest[EVP_MAX_MD_SIZE]; // EVP_MAX_MD_SIZE ensures enough space
EVP_MD_CTX *md_ctx;
const EVP_MD *md;
+ int rc = 1;
if(!client->host || !*client->host) {
- ERROR("Hostname has not been set. We should not be able to come here!");
- return 1;
- }
-
- uuid_generate_random(nonce);
- EVP_EncodeBlock((unsigned char *)nonce_b64, (const unsigned char *)nonce, WEBSOCKET_NONCE_SIZE);
- snprintf(second, TEMP_BUF_SIZE, websocket_upgrage_hdr, *client->host, nonce_b64);
-
- if(rbuf_bytes_free(client->buf_write) < strlen(second)) {
- ERROR("Write buffer capacity too low.");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Hostname has not been set. We should not be able to come here!");
return 1;
}
- rbuf_push(client->buf_write, second, strlen(second));
- client->state = WS_HANDSHAKE;
-
- //Calculating expected Sec-WebSocket-Accept reply
- snprintf(second, TEMP_BUF_SIZE, "%s%s", nonce_b64, mqtt_protoid);
+ // Generate a random 16-byte nonce
+ os_random_bytes(nonce, sizeof(nonce));
+ // Initialize the digest context
#if (OPENSSL_VERSION_NUMBER < OPENSSL_VERSION_110)
md_ctx = EVP_MD_CTX_create();
#else
md_ctx = EVP_MD_CTX_new();
#endif
if (md_ctx == NULL) {
- ERROR("Cant create EVP_MD Context");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Can't create EVP_MD context");
return 1;
}
- md = EVP_get_digestbyname("sha1");
+ md = EVP_sha1(); // Use SHA-1 for WebSocket handshake
if (!md) {
- ERROR("Unknown message digest");
- return 1;
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Unknown message digest SHA-1");
+ goto exit_with_error;
}
- if ((digest = (unsigned char *)OPENSSL_malloc(EVP_MD_size(EVP_sha256()))) == NULL) {
- ERROR("Cant alloc digest");
- return 1;
+ (void) netdata_base64_encode((unsigned char *) nonce_b64, nonce, WEBSOCKET_NONCE_SIZE);
+
+ // Format and push the upgrade header to the write buffer
+ size_t bytes = snprintf(second, TEMP_BUF_SIZE, websocket_upgrage_hdr, *client->host, nonce_b64);
+ if(rbuf_bytes_free(client->buf_write) < bytes) {
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Write buffer capacity too low.");
+ goto exit_with_error;
}
+ rbuf_push(client->buf_write, second, bytes);
+
+ client->state = WS_HANDSHAKE;
- EVP_DigestInit_ex(md_ctx, md, NULL);
- EVP_DigestUpdate(md_ctx, second, strlen(second));
- EVP_DigestFinal_ex(md_ctx, digest, &md_len);
+ // Create the expected Sec-WebSocket-Accept value
+ bytes = snprintf(second, TEMP_BUF_SIZE, "%s%s", nonce_b64, mqtt_protoid);
- EVP_EncodeBlock((unsigned char *)nonce_b64, digest, (int) md_len);
+ if (!EVP_DigestInit_ex(md_ctx, md, NULL)) {
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Failed to initialize digest context");
+ goto exit_with_error;
+ }
+
+ if (!EVP_DigestUpdate(md_ctx, second, bytes)) {
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Failed to update digest");
+ goto exit_with_error;
+ }
+
+ if (!EVP_DigestFinal_ex(md_ctx, digest, &md_len)) {
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Failed to finalize digest");
+ goto exit_with_error;
+ }
+
+ (void) netdata_base64_encode((unsigned char *) nonce_b64, digest, md_len);
freez(client->hs.nonce_reply);
client->hs.nonce_reply = strdupz(nonce_b64);
+ rc = 0;
- OPENSSL_free(digest);
-
+exit_with_error:
#if (OPENSSL_VERSION_NUMBER < OPENSSL_VERSION_110)
EVP_MD_CTX_destroy(md_ctx);
#else
EVP_MD_CTX_free(md_ctx);
#endif
- return 0;
+ return rc;
}
#define BUF_READ_MEMCMP_CONST(const, err) \
if (rbuf_memcmp_n(client->buf_read, const, strlen(const))) { \
- ERROR(err); \
+ nd_log(NDLS_DAEMON, NDLP_ERR, err); \
rbuf_flush(client->buf_read); \
return WS_CLIENT_PROTOCOL_ERROR; \
}
@@ -262,7 +212,7 @@ int ws_client_start_handshake(ws_client *client)
#define HTTP_HDR_LINE_CHECK_LIMIT(x) \
if ((x) >= MAX_HTTP_LINE_LENGTH) { \
- ERROR("HTTP line received is too long. Maximum is %d", MAX_HTTP_LINE_LENGTH); \
+ nd_log(NDLS_DAEMON, NDLP_ERR, "HTTP line received is too long. Maximum is %d", MAX_HTTP_LINE_LENGTH); \
return WS_CLIENT_PROTOCOL_ERROR; \
}
@@ -285,13 +235,13 @@ int ws_client_parse_handshake_resp(ws_client *client)
BUF_READ_CHECK_AT_LEAST(HTTP_SC_LENGTH); // "XXX " http return code
rbuf_pop(client->buf_read, buf, HTTP_SC_LENGTH);
if (buf[HTTP_SC_LENGTH - 1] != 0x20) {
- ERROR("HTTP status code received is not terminated by space (0x20)");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "HTTP status code received is not terminated by space (0x20)");
return WS_CLIENT_PROTOCOL_ERROR;
}
buf[HTTP_SC_LENGTH - 1] = 0;
client->hs.http_code = atoi(buf);
if (client->hs.http_code < 100 || client->hs.http_code >= 600) {
- ERROR("HTTP status code received not in valid range 100-600");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "HTTP status code received not in valid range 100-600");
return WS_CLIENT_PROTOCOL_ERROR;
}
client->hs.hdr_state = WS_HDR_ENDLINE;
@@ -330,16 +280,16 @@ int ws_client_parse_handshake_resp(ws_client *client)
ptr = rbuf_find_bytes(client->buf_read, HTTP_HDR_SEPARATOR, strlen(HTTP_HDR_SEPARATOR), &idx_sep);
if (!ptr || idx_sep > idx_crlf) {
- ERROR("Expected HTTP hdr field key/value separator \": \" before endline in non empty HTTP header line");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Expected HTTP hdr field key/value separator \": \" before endline in non empty HTTP header line");
return WS_CLIENT_PROTOCOL_ERROR;
}
if (idx_crlf == idx_sep + (int)strlen(HTTP_HDR_SEPARATOR)) {
- ERROR("HTTP Header value cannot be empty");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "HTTP Header value cannot be empty");
return WS_CLIENT_PROTOCOL_ERROR;
}
if (idx_sep > HTTP_HEADER_NAME_MAX_LEN) {
- ERROR("HTTP header too long (%d)", idx_sep);
+ nd_log(NDLS_DAEMON, NDLP_ERR, "HTTP header too long (%d)", idx_sep);
return WS_CLIENT_PROTOCOL_ERROR;
}
@@ -347,23 +297,21 @@ int ws_client_parse_handshake_resp(ws_client *client)
hdr->key = ((char*)hdr) + sizeof(struct http_header);
hdr->value = hdr->key + idx_sep + 1;
- bytes = rbuf_pop(client->buf_read, hdr->key, idx_sep);
+ rbuf_pop(client->buf_read, hdr->key, idx_sep);
rbuf_bump_tail(client->buf_read, strlen(HTTP_HDR_SEPARATOR));
- bytes = rbuf_pop(client->buf_read, hdr->value, idx_crlf - idx_sep - strlen(HTTP_HDR_SEPARATOR));
+ rbuf_pop(client->buf_read, hdr->value, idx_crlf - idx_sep - strlen(HTTP_HDR_SEPARATOR));
rbuf_bump_tail(client->buf_read, strlen(WS_HTTP_NEWLINE));
for (int i = 0; hdr->key[i]; i++)
hdr->key[i] = tolower(hdr->key[i]);
-// DEBUG("HTTP header \"%s\" received. Value \"%s\"", hdr->key, hdr->value);
-
if (ws_client_add_http_header(client, hdr))
return WS_CLIENT_PROTOCOL_ERROR;
if (!strcmp(hdr->key, WS_CONN_ACCEPT)) {
if (strcmp(client->hs.nonce_reply, hdr->value)) {
- ERROR("Received NONCE \"%s\" does not match expected nonce of \"%s\"", hdr->value, client->hs.nonce_reply);
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Received NONCE \"%s\" does not match expected nonce of \"%s\"", hdr->value, client->hs.nonce_reply);
return WS_CLIENT_PROTOCOL_ERROR;
}
client->hs.nonce_matched = 1;
@@ -373,21 +321,21 @@ int ws_client_parse_handshake_resp(ws_client *client)
case WS_HDR_PARSE_DONE:
if (!client->hs.nonce_matched) {
- ERROR("Missing " WS_CONN_ACCEPT " header");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Missing " WS_CONN_ACCEPT " header");
return WS_CLIENT_PROTOCOL_ERROR;
}
if (client->hs.http_code != 101) {
- ERROR("HTTP return code not 101. Received %d with msg \"%s\".", client->hs.http_code, client->hs.http_reply_msg);
+ nd_log(NDLS_DAEMON, NDLP_ERR, "HTTP return code not 101. Received %d with msg \"%s\".", client->hs.http_code, client->hs.http_reply_msg);
return WS_CLIENT_PROTOCOL_ERROR;
}
client->state = WS_ESTABLISHED;
client->hs.hdr_state = WS_HDR_ALL_DONE;
- INFO("Websocket Connection Accepted By Server");
+ nd_log(NDLS_DAEMON, NDLP_INFO, "Websocket Connection Accepted By Server");
return WS_CLIENT_PARSING_DONE;
case WS_HDR_ALL_DONE:
- FATAL("This is error we should never come here!");
+ nd_log(NDLS_DAEMON, NDLP_CRIT, "This is error we should never come here!");
return WS_CLIENT_PROTOCOL_ERROR;
}
return 0;
@@ -397,7 +345,7 @@ int ws_client_parse_handshake_resp(ws_client *client)
#define WS_FINAL_FRAG BYTE_MSB
#define WS_PAYLOAD_MASKED BYTE_MSB
-static inline size_t get_ws_hdr_size(size_t payload_size)
+static size_t get_ws_hdr_size(size_t payload_size)
{
size_t hdr_len = 2 + 4 /*mask*/;
if(payload_size > 125)
@@ -408,7 +356,7 @@ static inline size_t get_ws_hdr_size(size_t payload_size)
}
#define MAX_POSSIBLE_HDR_LEN 14
-int ws_client_send(ws_client *client, enum websocket_opcode frame_type, const char *data, size_t size)
+int ws_client_send(const ws_client *client, enum websocket_opcode frame_type, const char *data, size_t size)
{
// TODO maybe? implement fragmenting, it is not necessary though
// as both tested MQTT brokers have no reuirement of one MQTT envelope
@@ -416,24 +364,16 @@ int ws_client_send(ws_client *client, enum websocket_opcode frame_type, const ch
// one big MQTT message as single fragmented WebSocket envelope
char hdr[MAX_POSSIBLE_HDR_LEN];
char *ptr = hdr;
- char *mask;
int size_written = 0;
size_t j = 0;
size_t w_buff_free = rbuf_bytes_free(client->buf_write);
size_t hdr_len = get_ws_hdr_size(size);
- if (w_buff_free < hdr_len * 2) {
-#ifdef DEBUG_ULTRA_VERBOSE
- DEBUG("Write buffer full. Can't write requested %d size.", size);
-#endif
+ if (w_buff_free < hdr_len * 2)
return 0;
- }
if (w_buff_free < (hdr_len + size)) {
-#ifdef DEBUG_ULTRA_VERBOSE
- DEBUG("Can't write whole MQTT packet of %d bytes into the buffer. Will do partial send of %d.", size, w_buff_free - hdr_len);
-#endif
size = w_buff_free - hdr_len;
hdr_len = get_ws_hdr_size(size);
// the actual needed header size might decrease if we cut number of bytes
@@ -459,12 +399,10 @@ int ws_client_send(ws_client *client, enum websocket_opcode frame_type, const ch
ptr += sizeof(be);
} else
*ptr++ |= size;
-
- mask = ptr;
- if (read(client->entropy_fd, mask, sizeof(uint32_t)) < (ssize_t)sizeof(uint32_t)) {
- ERROR("Unable to get mask from \"" ENTROPY_SOURCE "\"");
- return -2;
- }
+
+ char *mask = ptr;
+ uint32_t mask32 = os_random32() + 1;
+ memcpy(mask, &mask32, sizeof(mask32));
rbuf_push(client->buf_write, hdr, hdr_len);
@@ -490,7 +428,7 @@ int ws_client_send(ws_client *client, enum websocket_opcode frame_type, const ch
return size_written;
}
-static int check_opcode(ws_client *client,enum websocket_opcode oc)
+static int check_opcode(enum websocket_opcode oc)
{
switch(oc) {
case WS_OP_BINARY_FRAME:
@@ -498,34 +436,34 @@ static int check_opcode(ws_client *client,enum websocket_opcode oc)
case WS_OP_PING:
return 0;
case WS_OP_CONTINUATION_FRAME:
- FATAL("WS_OP_CONTINUATION_FRAME NOT IMPLEMENTED YET!!!!");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "WS_OP_CONTINUATION_FRAME NOT IMPLEMENTED YET!!!!");
return 0;
case WS_OP_TEXT_FRAME:
- FATAL("WS_OP_TEXT_FRAME NOT IMPLEMENTED YET!!!!");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "WS_OP_TEXT_FRAME NOT IMPLEMENTED YET!!!!");
return 0;
case WS_OP_PONG:
- FATAL("WS_OP_PONG NOT IMPLEMENTED YET!!!!");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "WS_OP_PONG NOT IMPLEMENTED YET!!!!");
return 0;
default:
return WS_CLIENT_PROTOCOL_ERROR;
}
}
-static inline void ws_client_rx_post_hdr_state(ws_client *client)
+static void ws_client_rx_post_hdr_state(ws_client *client)
{
switch(client->rx.opcode) {
case WS_OP_BINARY_FRAME:
client->rx.parse_state = WS_PAYLOAD_DATA;
- return;
+ break;
case WS_OP_CONNECTION_CLOSE:
client->rx.parse_state = WS_PAYLOAD_CONNECTION_CLOSE;
- return;
+ break;
case WS_OP_PING:
client->rx.parse_state = WS_PAYLOAD_PING_REQ_PAYLOAD;
- return;
+ break;
default:
client->rx.parse_state = WS_PAYLOAD_SKIP_UNKNOWN_PAYLOAD;
- return;
+ break;
}
}
@@ -541,15 +479,15 @@ int ws_client_process_rx_ws(ws_client *client)
client->rx.opcode = buf[0] & (char)~BYTE_MSB;
if (!(buf[0] & (char)~WS_FINAL_FRAG)) {
- ERROR("Not supporting fragmented messages yet!");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Not supporting fragmented messages yet!");
return WS_CLIENT_PROTOCOL_ERROR;
}
- if (check_opcode(client, client->rx.opcode) == WS_CLIENT_PROTOCOL_ERROR)
+ if (check_opcode(client->rx.opcode) == WS_CLIENT_PROTOCOL_ERROR)
return WS_CLIENT_PROTOCOL_ERROR;
if (buf[1] & (char)WS_PAYLOAD_MASKED) {
- ERROR("Mask is not allowed in Server->Client Websocket direction.");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Mask is not allowed in Server->Client Websocket direction.");
return WS_CLIENT_PROTOCOL_ERROR;
}
@@ -584,12 +522,8 @@ int ws_client_process_rx_ws(ws_client *client)
if (!rbuf_bytes_available(client->buf_read))
return WS_CLIENT_NEED_MORE_BYTES;
char *insert = rbuf_get_linear_insert_range(client->buf_to_mqtt, &size);
- if (!insert) {
-#ifdef DEBUG_ULTRA_VERBOSE
- DEBUG("BUFFER TOO FULL. Avail %d req %d", (int)size, (int)remaining);
-#endif
+ if (!insert)
return WS_CLIENT_BUFFER_FULL;
- }
size = (size > remaining) ? remaining : size;
size = rbuf_pop(client->buf_read, insert, size);
rbuf_bump_head(client->buf_to_mqtt, size);
@@ -603,11 +537,11 @@ int ws_client_process_rx_ws(ws_client *client)
// b) 2byte reason code
// c) 2byte reason code followed by message
if (client->rx.payload_length == 1) {
- ERROR("WebScoket CONNECTION_CLOSE can't have payload of size 1");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "WebScoket CONNECTION_CLOSE can't have payload of size 1");
return WS_CLIENT_PROTOCOL_ERROR;
}
if (!client->rx.payload_length) {
- INFO("WebSocket server closed the connection without giving reason.");
+ nd_log(NDLS_DAEMON, NDLP_INFO, "WebSocket server closed the connection without giving reason.");
client->rx.parse_state = WS_PACKET_DONE;
break;
}
@@ -621,7 +555,7 @@ int ws_client_process_rx_ws(ws_client *client)
client->rx.payload_processed += sizeof(uint16_t);
if(client->rx.payload_processed == client->rx.payload_length) {
- INFO("WebSocket server closed the connection with EC=%d. Without message.",
+ nd_log(NDLS_DAEMON, NDLP_INFO, "WebSocket server closed the connection with EC=%d. Without message.",
client->rx.specific_data.op_close.ec);
client->rx.parse_state = WS_PACKET_DONE;
break;
@@ -640,7 +574,7 @@ int ws_client_process_rx_ws(ws_client *client)
client->rx.payload_length - client->rx.payload_processed);
}
client->rx.specific_data.op_close.reason[client->rx.payload_length] = 0;
- INFO("WebSocket server closed the connection with EC=%d and reason \"%s\"",
+ nd_log(NDLS_DAEMON, NDLP_INFO, "WebSocket server closed the connection with EC=%d and reason \"%s\"",
client->rx.specific_data.op_close.ec,
client->rx.specific_data.op_close.reason);
freez(client->rx.specific_data.op_close.reason);
@@ -649,14 +583,14 @@ int ws_client_process_rx_ws(ws_client *client)
break;
case WS_PAYLOAD_SKIP_UNKNOWN_PAYLOAD:
BUF_READ_CHECK_AT_LEAST(client->rx.payload_length);
- WARN("Skipping Websocket Packet of unsupported/unknown type");
+ nd_log(NDLS_DAEMON, NDLP_WARNING, "Skipping Websocket Packet of unsupported/unknown type");
if (client->rx.payload_length)
rbuf_bump_tail(client->buf_read, client->rx.payload_length);
client->rx.parse_state = WS_PACKET_DONE;
return WS_CLIENT_PARSING_DONE;
case WS_PAYLOAD_PING_REQ_PAYLOAD:
if (client->rx.payload_length > rbuf_get_capacity(client->buf_read) / 2) {
- ERROR("Ping arrived with payload which is too big!");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Ping arrived with payload which is too big!");
return WS_CLIENT_INTERNAL_ERROR;
}
BUF_READ_CHECK_AT_LEAST(client->rx.payload_length);
@@ -666,7 +600,7 @@ int ws_client_process_rx_ws(ws_client *client)
// then attempt to send as soon as buffer space clears up
size = ws_client_send(client, WS_OP_PONG, client->rx.specific_data.ping_msg, client->rx.payload_length);
if (size != client->rx.payload_length) {
- ERROR("Unable to send the PONG as one packet back. Closing connection.");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Unable to send the PONG as one packet back. Closing connection.");
return WS_CLIENT_PROTOCOL_ERROR;
}
client->rx.parse_state = WS_PACKET_DONE;
@@ -678,7 +612,7 @@ int ws_client_process_rx_ws(ws_client *client)
return WS_CLIENT_CONNECTION_CLOSED;
return WS_CLIENT_PARSING_DONE;
default:
- FATAL("Unknown parse state");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Unknown parse state");
return WS_CLIENT_INTERNAL_ERROR;
}
return 0;
@@ -711,6 +645,8 @@ int ws_client_process(ws_client *client)
case WS_CLIENT_CONNECTION_CLOSED:
client->state = WS_CONN_CLOSED_GRACEFUL;
break;
+ default:
+ break;
}
// if ret == 0 we can continue parsing
// if ret == WS_CLIENT_PARSING_DONE we processed
@@ -719,13 +655,13 @@ int ws_client_process(ws_client *client)
} while (!ret || ret == WS_CLIENT_PARSING_DONE);
break;
case WS_ERROR:
- ERROR("ws_client is in error state. Restart the connection!");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "ws_client is in error state. Restart the connection!");
return WS_CLIENT_PROTOCOL_ERROR;
case WS_CONN_CLOSED_GRACEFUL:
- ERROR("Connection has been gracefully closed. Calling this is useless (and probably bug) until you reconnect again.");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Connection has been gracefully closed. Calling this is useless (and probably bug) until you reconnect again.");
return WS_CLIENT_CONNECTION_CLOSED;
default:
- FATAL("Unknown connection state! Probably memory corruption.");
+ nd_log(NDLS_DAEMON, NDLP_CRIT, "Unknown connection state! Probably memory corruption.");
return WS_CLIENT_INTERNAL_ERROR;
}
return ret;
diff --git a/src/aclk/mqtt_websockets/ws_client.h b/src/aclk/mqtt_websockets/ws_client.h
index 0ccbd29a8..67e5835a2 100644
--- a/src/aclk/mqtt_websockets/ws_client.h
+++ b/src/aclk/mqtt_websockets/ws_client.h
@@ -1,14 +1,8 @@
-// SPDX-License-Identifier: GPL-3.0-only
-// Copyright (C) 2020 Timotej Šiškovič
+// SPDX-License-Identifier: GPL-3.0-or-later
#ifndef WS_CLIENT_H
#define WS_CLIENT_H
-#include "c-rbuf/cringbuffer.h"
-#include "mqtt_wss_log.h"
-
-#include <stdint.h>
-
#define WS_CLIENT_NEED_MORE_BYTES 0x10
#define WS_CLIENT_PARSING_DONE 0x11
#define WS_CLIENT_CONNECTION_CLOSED 0x12
@@ -98,23 +92,20 @@ typedef struct websocket_client {
// memory usage and remove one more memcpy buf_read->buf_to_mqtt
rbuf_t buf_to_mqtt; // RAW data for MQTT lib
- int entropy_fd;
-
// careful host is borrowed, don't free
char **host;
- mqtt_wss_log_ctx_t log;
} ws_client;
-ws_client *ws_client_new(size_t buf_size, char **host, mqtt_wss_log_ctx_t log);
+ws_client *ws_client_new(size_t buf_size, char **host);
void ws_client_destroy(ws_client *client);
void ws_client_reset(ws_client *client);
int ws_client_start_handshake(ws_client *client);
-int ws_client_want_write(ws_client *client);
+int ws_client_want_write(const ws_client *client);
int ws_client_process(ws_client *client);
-int ws_client_send(ws_client *client, enum websocket_opcode frame_type, const char *data, size_t size);
+int ws_client_send(const ws_client *client, enum websocket_opcode frame_type, const char *data, size_t size);
#endif /* WS_CLIENT_H */