summaryrefslogtreecommitdiffstats
path: root/src/aclk
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/aclk/README.md133
-rw-r--r--src/aclk/aclk.c (renamed from aclk/aclk.c)20
-rw-r--r--src/aclk/aclk.h (renamed from aclk/aclk.h)0
-rw-r--r--src/aclk/aclk_alarm_api.c (renamed from aclk/aclk_alarm_api.c)0
-rw-r--r--src/aclk/aclk_alarm_api.h (renamed from aclk/aclk_alarm_api.h)0
-rw-r--r--src/aclk/aclk_capas.c (renamed from aclk/aclk_capas.c)3
-rw-r--r--src/aclk/aclk_capas.h (renamed from aclk/aclk_capas.h)0
-rw-r--r--src/aclk/aclk_contexts_api.c (renamed from aclk/aclk_contexts_api.c)0
-rw-r--r--src/aclk/aclk_contexts_api.h (renamed from aclk/aclk_contexts_api.h)0
-rw-r--r--src/aclk/aclk_otp.c (renamed from aclk/aclk_otp.c)4
-rw-r--r--src/aclk/aclk_otp.h (renamed from aclk/aclk_otp.h)0
-rw-r--r--src/aclk/aclk_proxy.c (renamed from aclk/aclk_proxy.c)21
-rw-r--r--src/aclk/aclk_proxy.h (renamed from aclk/aclk_proxy.h)1
-rw-r--r--src/aclk/aclk_query.c (renamed from aclk/aclk_query.c)73
-rw-r--r--src/aclk/aclk_query.h (renamed from aclk/aclk_query.h)4
-rw-r--r--src/aclk/aclk_query_queue.c (renamed from aclk/aclk_query_queue.c)18
-rw-r--r--src/aclk/aclk_query_queue.h (renamed from aclk/aclk_query_queue.h)9
-rw-r--r--src/aclk/aclk_rrdhost_state.h (renamed from aclk/aclk_rrdhost_state.h)0
-rw-r--r--src/aclk/aclk_rx_msgs.c (renamed from aclk/aclk_rx_msgs.c)17
-rw-r--r--src/aclk/aclk_rx_msgs.h (renamed from aclk/aclk_rx_msgs.h)0
-rw-r--r--src/aclk/aclk_stats.c (renamed from aclk/aclk_stats.c)5
-rw-r--r--src/aclk/aclk_stats.h (renamed from aclk/aclk_stats.h)4
-rw-r--r--src/aclk/aclk_tx_msgs.c (renamed from aclk/aclk_tx_msgs.c)2
-rw-r--r--src/aclk/aclk_tx_msgs.h (renamed from aclk/aclk_tx_msgs.h)2
-rw-r--r--src/aclk/aclk_util.c (renamed from aclk/aclk_util.c)0
-rw-r--r--src/aclk/aclk_util.h (renamed from aclk/aclk_util.h)2
-rw-r--r--src/aclk/helpers/mqtt_wss_pal.h (renamed from aclk/helpers/mqtt_wss_pal.h)0
-rw-r--r--src/aclk/helpers/ringbuffer_pal.h (renamed from aclk/helpers/ringbuffer_pal.h)0
-rw-r--r--src/aclk/https_client.c (renamed from aclk/https_client.c)81
-rw-r--r--src/aclk/https_client.h (renamed from aclk/https_client.h)13
-rw-r--r--src/aclk/mqtt_websockets/.github/workflows/run-tests.yaml (renamed from mqtt_websockets/.github/workflows/run-tests.yaml)0
-rw-r--r--src/aclk/mqtt_websockets/.gitignore (renamed from mqtt_websockets/.gitignore)0
-rw-r--r--src/aclk/mqtt_websockets/README.md7
-rw-r--r--src/aclk/mqtt_websockets/c-rbuf/cringbuffer.c203
-rw-r--r--src/aclk/mqtt_websockets/c-rbuf/cringbuffer.h47
-rw-r--r--src/aclk/mqtt_websockets/c-rbuf/cringbuffer_internal.h37
-rw-r--r--src/aclk/mqtt_websockets/c-rbuf/ringbuffer_test.c (renamed from mqtt_websockets/c-rbuf/tests/ringbuffer_test.c)8
-rw-r--r--src/aclk/mqtt_websockets/c_rhash/c_rhash.c (renamed from mqtt_websockets/c_rhash/src/c_rhash.c)4
-rw-r--r--src/aclk/mqtt_websockets/c_rhash/c_rhash.h (renamed from mqtt_websockets/c_rhash/include/c_rhash.h)2
-rw-r--r--src/aclk/mqtt_websockets/c_rhash/c_rhash_internal.h (renamed from mqtt_websockets/c_rhash/src/c_rhash_internal.h)2
-rw-r--r--src/aclk/mqtt_websockets/c_rhash/tests.c (renamed from mqtt_websockets/c_rhash/src/tests.c)2
-rw-r--r--src/aclk/mqtt_websockets/common_internal.h27
-rw-r--r--src/aclk/mqtt_websockets/common_public.c (renamed from mqtt_websockets/src/common_public.c)2
-rw-r--r--src/aclk/mqtt_websockets/common_public.h (renamed from mqtt_websockets/src/include/common_public.h)0
-rw-r--r--src/aclk/mqtt_websockets/endian_compat.h31
-rw-r--r--src/aclk/mqtt_websockets/mqtt_constants.h (renamed from mqtt_websockets/src/include/mqtt_constants.h)2
-rw-r--r--src/aclk/mqtt_websockets/mqtt_ng.c (renamed from mqtt_websockets/src/mqtt_ng.c)24
-rw-r--r--src/aclk/mqtt_websockets/mqtt_ng.h (renamed from mqtt_websockets/src/include/mqtt_ng.h)6
-rw-r--r--src/aclk/mqtt_websockets/mqtt_wss_client.c (renamed from mqtt_websockets/src/mqtt_wss_client.c)50
-rw-r--r--src/aclk/mqtt_websockets/mqtt_wss_client.h (renamed from mqtt_websockets/src/include/mqtt_wss_client.h)12
-rw-r--r--src/aclk/mqtt_websockets/mqtt_wss_log.c (renamed from mqtt_websockets/src/mqtt_wss_log.c)5
-rw-r--r--src/aclk/mqtt_websockets/mqtt_wss_log.h (renamed from mqtt_websockets/src/include/mqtt_wss_log.h)2
-rw-r--r--src/aclk/mqtt_websockets/test.c90
-rw-r--r--src/aclk/mqtt_websockets/ws_client.c (renamed from mqtt_websockets/src/ws_client.c)4
-rw-r--r--src/aclk/mqtt_websockets/ws_client.h (renamed from mqtt_websockets/src/include/ws_client.h)14
-rw-r--r--src/aclk/schema-wrappers/agent_cmds.cc (renamed from aclk/schema-wrappers/agent_cmds.cc)0
-rw-r--r--src/aclk/schema-wrappers/agent_cmds.h (renamed from aclk/schema-wrappers/agent_cmds.h)0
-rw-r--r--src/aclk/schema-wrappers/alarm_config.cc (renamed from aclk/schema-wrappers/alarm_config.cc)0
-rw-r--r--src/aclk/schema-wrappers/alarm_config.h (renamed from aclk/schema-wrappers/alarm_config.h)0
-rw-r--r--src/aclk/schema-wrappers/alarm_stream.cc (renamed from aclk/schema-wrappers/alarm_stream.cc)0
-rw-r--r--src/aclk/schema-wrappers/alarm_stream.h (renamed from aclk/schema-wrappers/alarm_stream.h)0
-rw-r--r--src/aclk/schema-wrappers/capability.cc (renamed from aclk/schema-wrappers/capability.cc)0
-rw-r--r--src/aclk/schema-wrappers/capability.h (renamed from aclk/schema-wrappers/capability.h)0
-rw-r--r--src/aclk/schema-wrappers/connection.cc (renamed from aclk/schema-wrappers/connection.cc)0
-rw-r--r--src/aclk/schema-wrappers/connection.h (renamed from aclk/schema-wrappers/connection.h)0
-rw-r--r--src/aclk/schema-wrappers/context.cc (renamed from aclk/schema-wrappers/context.cc)0
-rw-r--r--src/aclk/schema-wrappers/context.h (renamed from aclk/schema-wrappers/context.h)0
-rw-r--r--src/aclk/schema-wrappers/context_stream.cc (renamed from aclk/schema-wrappers/context_stream.cc)0
-rw-r--r--src/aclk/schema-wrappers/context_stream.h (renamed from aclk/schema-wrappers/context_stream.h)0
-rw-r--r--src/aclk/schema-wrappers/node_connection.cc (renamed from aclk/schema-wrappers/node_connection.cc)0
-rw-r--r--src/aclk/schema-wrappers/node_connection.h (renamed from aclk/schema-wrappers/node_connection.h)0
-rw-r--r--src/aclk/schema-wrappers/node_creation.cc (renamed from aclk/schema-wrappers/node_creation.cc)0
-rw-r--r--src/aclk/schema-wrappers/node_creation.h (renamed from aclk/schema-wrappers/node_creation.h)0
-rw-r--r--src/aclk/schema-wrappers/node_info.cc (renamed from aclk/schema-wrappers/node_info.cc)0
-rw-r--r--src/aclk/schema-wrappers/node_info.h (renamed from aclk/schema-wrappers/node_info.h)0
-rw-r--r--src/aclk/schema-wrappers/proto_2_json.cc (renamed from aclk/schema-wrappers/proto_2_json.cc)0
-rw-r--r--src/aclk/schema-wrappers/proto_2_json.h (renamed from aclk/schema-wrappers/proto_2_json.h)0
-rw-r--r--src/aclk/schema-wrappers/schema_wrapper_utils.cc (renamed from aclk/schema-wrappers/schema_wrapper_utils.cc)0
-rw-r--r--src/aclk/schema-wrappers/schema_wrapper_utils.h24
-rw-r--r--src/aclk/schema-wrappers/schema_wrappers.h (renamed from aclk/schema-wrappers/schema_wrappers.h)0
80 files changed, 798 insertions, 219 deletions
diff --git a/src/aclk/README.md b/src/aclk/README.md
new file mode 100644
index 000000000..0a260868c
--- /dev/null
+++ b/src/aclk/README.md
@@ -0,0 +1,133 @@
+# Agent-Cloud link (ACLK)
+
+The Agent-Cloud link (ACLK) is the mechanism responsible for securely connecting a Netdata Agent to your web browser
+through Netdata Cloud. The ACLK establishes an outgoing secure WebSocket (WSS) connection to Netdata Cloud on port
+`443`. The ACLK is encrypted, safe, and _is only established if you connect your node_.
+
+The Cloud App lives at app.netdata.cloud which currently resolves to the following list of IPs:
+
+- 54.198.178.11
+- 44.207.131.212
+- 44.196.50.41
+
+> ### Caution
+>
+>This list of IPs can change without notice, we strongly advise you to whitelist following domains `app.netdata.cloud`, `mqtt.netdata.cloud`, if this is not an option in your case always verify the current domain resolution (e.g via the `host` command).
+
+For a guide to connecting a node using the ACLK, plus additional troubleshooting and reference information, read our [connect to Cloud
+documentation](/src/claim/README.md).
+
+## Data privacy
+
+[Data privacy](https://netdata.cloud/privacy/) is very important to us. We firmly believe that your data belongs to
+you. This is why **we don't store any metric data in Netdata Cloud**.
+
+All the data that you see in the web browser when using Netdata Cloud, is actually streamed directly from the Netdata Agent to the Netdata Cloud dashboard. The data passes through our systems, but it isn't stored.
+
+However, to be able to offer the stunning visualizations and advanced functionality of Netdata Cloud, it does store a limited number of _metadata_. Read more about our [security and privacy design](/docs/security-and-privacy-design/README.md).
+
+## Enable and configure the ACLK
+
+The ACLK is enabled by default, with its settings automatically configured and stored in the Agent's memory. No file is
+created at `/var/lib/netdata/cloud.d/cloud.conf` until you either connect a node or create it yourself. The default
+configuration uses two settings:
+
+```conf
+[global]
+ enabled = yes
+ cloud base url = https://app.netdata.cloud
+```
+
+If your Agent needs to use a proxy to access the internet, you must [set up a proxy for
+connecting to cloud](/src/claim/README.md#connect-through-a-proxy).
+
+You can configure following keys in the `netdata.conf` section `[cloud]`:
+```
+[cloud]
+ statistics = yes
+ query thread count = 2
+```
+
+- `statistics` enables/disables ACLK related statistics and their charts. You can disable this to save some space in the database and slightly reduce memory usage of Netdata Agent.
+- `query thread count` specifies the number of threads to process cloud queries. Increasing this setting is useful for nodes with many children (streaming), which can expect to handle more queries (and/or more complicated queries).
+
+## Disable the ACLK
+
+You have two options if you prefer to disable the ACLK and not use Netdata Cloud.
+
+### Disable at installation
+
+You can pass the `--disable-cloud` parameter to the Agent installation when using a kickstart script
+([kickstart.sh](/packaging/installer/methods/kickstart.md), or a [manual installation from
+Git](/packaging/installer/methods/manual.md).
+
+When you pass this parameter, the installer does not download or compile any extra libraries. Once running, the Agent
+kills the thread responsible for the ACLK and connecting behavior, and behaves as though the ACLK, and thus Netdata Cloud,
+does not exist.
+
+### Disable at runtime
+
+You can change a runtime setting in your `cloud.conf` file to disable the ACLK. This setting only stops the Agent from
+attempting any connection via the ACLK, but does not prevent the installer from downloading and compiling the ACLK's
+dependencies.
+
+The file typically exists at `/var/lib/netdata/cloud.d/cloud.conf`, but can change if you set a prefix during
+installation. To disable the ACLK, open that file and change the `enabled` setting to `no`:
+
+```conf
+[global]
+ enabled = no
+```
+
+If the file at `/var/lib/netdata/cloud.d/cloud.conf` doesn't exist, you need to create it.
+
+Copy and paste the first two lines from below, which will change your prompt to `cat`.
+
+```bash
+cd /var/lib/netdata/cloud.d
+cat > cloud.conf << EOF
+```
+
+Copy and paste in lines 3-6, and after the final `EOF`, hit **Enter**. The final line must contain only `EOF`. Hit **Enter** again to return to your normal prompt with the newly-created file.
+
+To get your normal prompt back, the final line
+must contain only `EOF`.
+
+```bash
+[global]
+ enabled = no
+ cloud base url = https://app.netdata.cloud
+EOF
+```
+
+You also need to change the file's permissions. Use `grep "run as user" /etc/netdata/netdata.conf` to figure out which
+user your Agent runs as (typically `netdata`), and replace `netdata:netdata` as shown below if necessary:
+
+```bash
+sudo chmod 0770 cloud.conf
+sudo chown netdata:netdata cloud.conf
+```
+
+Restart your Agent to disable the ACLK.
+
+### Re-enable the ACLK
+
+If you first disable the ACLK and any Cloud functionality and then decide you would like to use Cloud, you must either
+[reinstall Netdata](/packaging/installer/REINSTALL.md) with Cloud enabled or change the runtime setting in your
+`cloud.conf` file.
+
+If you passed `--disable-cloud` to `netdata-installer.sh` during installation, you must
+[reinstall](/packaging/installer/REINSTALL.md) your Agent. Use the same method as before, but pass `--require-cloud` to
+the installer. When installation finishes you can [connect your node](/src/claim/README.md#how-to-connect-a-node).
+
+If you changed the runtime setting in your `var/lib/netdata/cloud.d/cloud.conf` file, edit the file again and change
+`enabled` to `yes`:
+
+```conf
+[global]
+ enabled = yes
+```
+
+Restart your Agent and [connect your node](/src/claim/README.md#how-to-connect-a-node).
+
+
diff --git a/aclk/aclk.c b/src/aclk/aclk.c
index e95d7d6ab..991745491 100644
--- a/aclk/aclk.c
+++ b/src/aclk/aclk.c
@@ -4,7 +4,7 @@
#ifdef ENABLE_ACLK
#include "aclk_stats.h"
-#include "mqtt_wss_client.h"
+#include "mqtt_websockets/mqtt_wss_client.h"
#include "aclk_otp.h"
#include "aclk_tx_msgs.h"
#include "aclk_query.h"
@@ -817,10 +817,6 @@ void *aclk_main(void *ptr)
unsigned int proto_hdl_cnt = aclk_init_rx_msg_handlers();
- // This thread is unusual in that it cannot be cancelled by cancel_main_threads()
- // as it must notify the far end that it shutdown gracefully and avoid the LWT.
- netdata_thread_disable_cancelability();
-
#if defined( DISABLE_CLOUD ) || !defined( ENABLE_ACLK )
nd_log(NDLS_DAEMON, NDLP_INFO,
"Killing ACLK thread -> cloud functionality has been disabled");
@@ -861,13 +857,10 @@ void *aclk_main(void *ptr)
aclk_stats_enabled = config_get_boolean(CONFIG_SECTION_CLOUD, "statistics", global_statistics_enabled);
if (aclk_stats_enabled) {
stats_thread = callocz(1, sizeof(struct aclk_stats_thread));
- stats_thread->thread = mallocz(sizeof(netdata_thread_t));
stats_thread->query_thread_count = query_threads.count;
stats_thread->client = mqttwss_client;
aclk_stats_thread_prepare(query_threads.count, proto_hdl_cnt);
- netdata_thread_create(
- stats_thread->thread, "ACLK_STATS", NETDATA_THREAD_OPTION_JOINABLE, aclk_stats_main_thread,
- stats_thread);
+ stats_thread->thread = nd_thread_create("ACLK_STATS", NETDATA_THREAD_OPTION_JOINABLE, aclk_stats_main_thread, stats_thread);
}
// Keep reconnecting and talking until our time has come
@@ -901,9 +894,8 @@ exit_full:
aclk_query_threads_cleanup(&query_threads);
if (aclk_stats_enabled) {
- netdata_thread_join(*stats_thread->thread, NULL);
+ nd_thread_join(stats_thread->thread);
aclk_stats_thread_cleanup();
- freez(stats_thread->thread);
freez(stats_thread);
}
free_topic_cache();
@@ -919,7 +911,7 @@ exit:
void aclk_host_state_update(RRDHOST *host, int cmd, int queryable)
{
- uuid_t node_id;
+ nd_uuid_t node_id;
int ret = 0;
if (!aclk_connected)
@@ -1165,7 +1157,7 @@ char *aclk_state(void)
buffer_strcat(wb, "\n\tAlert Streaming Status:");
fill_alert_status_for_host(wb, host);
}
- rrd_unlock();
+ rrd_rdunlock();
}
ret = strdupz(buffer_tostring(wb));
@@ -1315,7 +1307,7 @@ char *aclk_state_json(void)
json_object_array_add(grp, nodeinstance);
}
- rrd_unlock();
+ rrd_rdunlock();
json_object_object_add(msg, "node-instances", grp);
char *str = strdupz(json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN));
diff --git a/aclk/aclk.h b/src/aclk/aclk.h
index 72d1a2e11..72d1a2e11 100644
--- a/aclk/aclk.h
+++ b/src/aclk/aclk.h
diff --git a/aclk/aclk_alarm_api.c b/src/aclk/aclk_alarm_api.c
index 664671f70..664671f70 100644
--- a/aclk/aclk_alarm_api.c
+++ b/src/aclk/aclk_alarm_api.c
diff --git a/aclk/aclk_alarm_api.h b/src/aclk/aclk_alarm_api.h
index 4d9d9447a..4d9d9447a 100644
--- a/aclk/aclk_alarm_api.h
+++ b/src/aclk/aclk_alarm_api.h
diff --git a/aclk/aclk_capas.c b/src/aclk/aclk_capas.c
index e8d85a2b9..00102ad4a 100644
--- a/aclk/aclk_capas.c
+++ b/src/aclk/aclk_capas.c
@@ -18,6 +18,7 @@ const struct capability *aclk_get_agent_capas()
{ .name = "http_api_v2", .version = HTTP_API_V2_VERSION, .enabled = 1 },
{ .name = "health", .version = 1, .enabled = 0 }, // index 7, below
{ .name = "req_cancel", .version = 1, .enabled = 1 },
+ { .name = "dyncfg", .version = 2, .enabled = 1 },
{ .name = NULL, .version = 0, .enabled = 0 }
};
agent_capabilities[2].version = ml_capable() ? 1 : 0;
@@ -34,6 +35,7 @@ const struct capability *aclk_get_agent_capas()
struct capability *aclk_get_node_instance_capas(RRDHOST *host)
{
bool functions = (host == localhost || (host->receiver && stream_has_capability(host->receiver, STREAM_CAP_FUNCTIONS)));
+ bool dyncfg = (host == localhost || dyncfg_available_for_rrdhost(host));
struct capability ni_caps[] = {
{ .name = "proto", .version = 1, .enabled = 1 },
@@ -46,6 +48,7 @@ struct capability *aclk_get_node_instance_capas(RRDHOST *host)
{ .name = "http_api_v2", .version = HTTP_API_V2_VERSION, .enabled = 1 },
{ .name = "health", .version = 1, .enabled = host->health.health_enabled },
{ .name = "req_cancel", .version = 1, .enabled = 1 },
+ { .name = "dyncfg", .version = 2, .enabled = dyncfg },
{ .name = NULL, .version = 0, .enabled = 0 }
};
diff --git a/aclk/aclk_capas.h b/src/aclk/aclk_capas.h
index c39a197b8..c39a197b8 100644
--- a/aclk/aclk_capas.h
+++ b/src/aclk/aclk_capas.h
diff --git a/aclk/aclk_contexts_api.c b/src/aclk/aclk_contexts_api.c
index f3344935e..f3344935e 100644
--- a/aclk/aclk_contexts_api.c
+++ b/src/aclk/aclk_contexts_api.c
diff --git a/aclk/aclk_contexts_api.h b/src/aclk/aclk_contexts_api.h
index f0b5ec77e..f0b5ec77e 100644
--- a/aclk/aclk_contexts_api.h
+++ b/src/aclk/aclk_contexts_api.h
diff --git a/aclk/aclk_otp.c b/src/aclk/aclk_otp.c
index 207ca08cf..c9c75dd38 100644
--- a/aclk/aclk_otp.c
+++ b/src/aclk/aclk_otp.c
@@ -7,7 +7,7 @@
#include "daemon/common.h"
-#include "mqtt_websockets/c-rbuf/include/ringbuffer.h"
+#include "mqtt_websockets/c-rbuf/cringbuffer.h"
static int aclk_https_request(https_req_t *request, https_req_response_t *response) {
int rc;
@@ -839,7 +839,7 @@ int aclk_get_env(aclk_env_t *env, const char* aclk_hostname, int aclk_port) {
return 1;
}
- buffer_sprintf(buf, "/api/v1/env?v=%s&cap=proto,ctx&claim_id=%s", &(VERSION[1]) /* skip 'v' at beginning */, agent_id);
+ buffer_sprintf(buf, "/api/v1/env?v=%s&cap=proto,ctx&claim_id=%s", &(NETDATA_VERSION[1]) /* skip 'v' at beginning */, agent_id);
freez(agent_id);
diff --git a/aclk/aclk_otp.h b/src/aclk/aclk_otp.h
index 2d660e5a4..2d660e5a4 100644
--- a/aclk/aclk_otp.h
+++ b/src/aclk/aclk_otp.h
diff --git a/aclk/aclk_proxy.c b/src/aclk/aclk_proxy.c
index 4af46208f..8d0e2d657 100644
--- a/aclk/aclk_proxy.c
+++ b/src/aclk/aclk_proxy.c
@@ -15,20 +15,6 @@ struct {
{ .type = PROXY_TYPE_UNKNOWN, .url_str = NULL },
};
-const char *aclk_proxy_type_to_s(ACLK_PROXY_TYPE *type)
-{
- switch (*type) {
- case PROXY_DISABLED:
- return "disabled";
- case PROXY_TYPE_HTTP:
- return "HTTP";
- case PROXY_TYPE_SOCKS5:
- return "SOCKS";
- default:
- return "Unknown";
- }
-}
-
static inline ACLK_PROXY_TYPE aclk_find_proxy(const char *string)
{
int i = 0;
@@ -127,7 +113,12 @@ static inline int check_http_enviroment(const char **proxy)
const char *aclk_lws_wss_get_proxy_setting(ACLK_PROXY_TYPE *type)
{
- const char *proxy = config_get(CONFIG_SECTION_CLOUD, ACLK_PROXY_CONFIG_VAR, ACLK_PROXY_ENV);
+ const char *proxy = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, ACLK_PROXY_CONFIG_VAR, ACLK_PROXY_ENV);
+
+ // backward compatibility: "proxy" was in "netdata.conf"
+ if (config_exists(CONFIG_SECTION_CLOUD, ACLK_PROXY_CONFIG_VAR))
+ proxy = config_get(CONFIG_SECTION_CLOUD, ACLK_PROXY_CONFIG_VAR, ACLK_PROXY_ENV);
+
*type = PROXY_DISABLED;
if (strcmp(proxy, "none") == 0)
diff --git a/aclk/aclk_proxy.h b/src/aclk/aclk_proxy.h
index b4ceb7df8..6877b526b 100644
--- a/aclk/aclk_proxy.h
+++ b/src/aclk/aclk_proxy.h
@@ -13,7 +13,6 @@ typedef enum aclk_proxy_type {
PROXY_NOT_SET,
} ACLK_PROXY_TYPE;
-const char *aclk_proxy_type_to_s(ACLK_PROXY_TYPE *type);
ACLK_PROXY_TYPE aclk_verify_proxy(const char *string);
const char *aclk_lws_wss_get_proxy_setting(ACLK_PROXY_TYPE *type);
void safe_log_proxy_censor(char *proxy);
diff --git a/aclk/aclk_query.c b/src/aclk/aclk_query.c
index 5e3574b97..08bc2acf3 100644
--- a/aclk/aclk_query.c
+++ b/src/aclk/aclk_query.c
@@ -99,30 +99,48 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
BUFFER *local_buffer = NULL;
size_t size = 0;
size_t sent = 0;
+ usec_t dt_ut = 0;
int z_ret;
BUFFER *z_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE, &netdata_buffers_statistics.buffers_aclk);
- char *start, *end;
struct web_client *w = web_client_get_from_cache();
- w->acl = WEB_CLIENT_ACL_ACLK;
- w->mode = WEB_CLIENT_MODE_GET;
+ web_client_set_conn_cloud(w);
+ w->port_acl = HTTP_ACL_ACLK | HTTP_ACL_ALL_FEATURES;
+ w->acl = w->port_acl;
+ web_client_set_permissions(w, HTTP_ACCESS_MAP_OLD_MEMBER, HTTP_USER_ROLE_MEMBER, WEB_CLIENT_FLAG_AUTH_CLOUD);
+
+ w->mode = HTTP_REQUEST_MODE_GET;
w->timings.tv_in = query->created_tv;
w->interrupt.callback = aclk_web_client_interrupt_cb;
w->interrupt.callback_data = pending_req_list_add(query->msg_id);
- usec_t t;
+ buffer_flush(w->response.data);
+ buffer_strcat(w->response.data, query->data.http_api_v2.payload);
+
+ HTTP_VALIDATION validation = http_request_validate(w);
+ if(validation != HTTP_VALIDATION_OK) {
+ nd_log(NDLS_ACCESS, NDLP_ERR, "ACLK received request is not valid, code %d", validation);
+ retval = 1;
+ w->response.code = HTTP_RESP_BAD_REQUEST;
+ w->response.code = (short)aclk_http_msg_v2(query_thr->client, query->callback_topic, query->msg_id,
+ dt_ut, query->created, w->response.code,
+ NULL, 0);
+ goto cleanup;
+ }
+
web_client_timeout_checkpoint_set(w, query->timeout);
- if(web_client_timeout_checkpoint_and_check(w, &t)) {
- nd_log(NDLS_ACCESS, NDLP_ERR, "QUERY CANCELED: QUEUE TIME EXCEEDED %llu ms (LIMIT %d ms)", t / USEC_PER_MS, query->timeout);
+ if(web_client_timeout_checkpoint_and_check(w, &dt_ut)) {
+ nd_log(NDLS_ACCESS, NDLP_ERR,
+ "QUERY CANCELED: QUEUE TIME EXCEEDED %llu ms (LIMIT %d ms)",
+ dt_ut / USEC_PER_MS, query->timeout);
retval = 1;
w->response.code = HTTP_RESP_SERVICE_UNAVAILABLE;
aclk_http_msg_v2_err(query_thr->client, query->callback_topic, query->msg_id, w->response.code, CLOUD_EC_SND_TIMEOUT, CLOUD_EMSG_SND_TIMEOUT, NULL, 0);
goto cleanup;
}
- web_client_decode_path_and_query_string(w, query->data.http_api_v2.query);
char *path = (char *)buffer_tostring(w->url_path_decoded);
if (aclk_stats_enabled) {
@@ -134,41 +152,24 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
}
w->response.code = (short)web_client_api_request_with_node_selection(localhost, w, path);
- web_client_timeout_checkpoint_response_ready(w, &t);
+ web_client_timeout_checkpoint_response_ready(w, &dt_ut);
if (aclk_stats_enabled) {
ACLK_STATS_LOCK;
- aclk_metrics_per_sample.cloud_q_process_total += t;
+ aclk_metrics_per_sample.cloud_q_process_total += dt_ut;
aclk_metrics_per_sample.cloud_q_process_count++;
- if (aclk_metrics_per_sample.cloud_q_process_max < t)
- aclk_metrics_per_sample.cloud_q_process_max = t;
+ if (aclk_metrics_per_sample.cloud_q_process_max < dt_ut)
+ aclk_metrics_per_sample.cloud_q_process_max = dt_ut;
ACLK_STATS_UNLOCK;
}
size = w->response.data->len;
sent = size;
- // check if gzip encoding can and should be used
- if ((start = strstr((char *)query->data.http_api_v2.payload, WEB_HDR_ACCEPT_ENC))) {
- start += strlen(WEB_HDR_ACCEPT_ENC);
- end = strstr(start, "\x0D\x0A");
- start = strstr(start, "gzip");
-
- if (start && start < end) {
- w->response.zstream.zalloc = Z_NULL;
- w->response.zstream.zfree = Z_NULL;
- w->response.zstream.opaque = Z_NULL;
- if(deflateInit2(&w->response.zstream, web_gzip_level, Z_DEFLATED, 15 + 16, 8, web_gzip_strategy) == Z_OK) {
- w->response.zinitialized = true;
- w->response.zoutput = true;
- } else
- netdata_log_error("Failed to initialize zlib. Proceeding without compression.");
- }
- }
-
if (w->response.data->len && w->response.zinitialized) {
w->response.zstream.next_in = (Bytef *)w->response.data->buffer;
w->response.zstream.avail_in = w->response.data->len;
+
do {
w->response.zstream.avail_out = NETDATA_WEB_RESPONSE_ZLIB_CHUNK_SIZE;
w->response.zstream.next_out = w->response.zbuffer;
@@ -188,6 +189,7 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
memcpy(&z_buffer->buffer[z_buffer->len], w->response.zbuffer, bytes_to_cpy);
z_buffer->len += bytes_to_cpy;
} while(z_ret != Z_STREAM_END);
+
// so that web_client_build_http_header
// puts correct content length into header
buffer_free(w->response.data);
@@ -213,7 +215,9 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
}
// send msg.
- w->response.code = aclk_http_msg_v2(query_thr->client, query->callback_topic, query->msg_id, t, query->created, w->response.code, local_buffer->buffer, local_buffer->len);
+ w->response.code = (short)aclk_http_msg_v2(query_thr->client, query->callback_topic, query->msg_id,
+ dt_ut, query->created, w->response.code,
+ local_buffer->buffer, local_buffer->len);
cleanup:
web_client_log_completed_request(w, false);
@@ -347,8 +351,11 @@ void aclk_query_threads_start(struct aclk_query_threads *query_threads, mqtt_wss
if(unlikely(snprintfz(thread_name, TASK_LEN_MAX, "ACLK_QRY[%d]", i) < 0))
netdata_log_error("snprintf encoding error");
- netdata_thread_create(
- &query_threads->thread_list[i].thread, thread_name, NETDATA_THREAD_OPTION_JOINABLE, aclk_query_main_thread,
+
+ query_threads->thread_list[i].thread = nd_thread_create(
+ thread_name,
+ NETDATA_THREAD_OPTION_JOINABLE,
+ aclk_query_main_thread,
&query_threads->thread_list[i]);
}
}
@@ -357,7 +364,7 @@ void aclk_query_threads_cleanup(struct aclk_query_threads *query_threads)
{
if (query_threads && query_threads->thread_list) {
for (int i = 0; i < query_threads->count; i++) {
- netdata_thread_join(query_threads->thread_list[i].thread, NULL);
+ nd_thread_join(query_threads->thread_list[i].thread);
}
freez(query_threads->thread_list);
}
diff --git a/aclk/aclk_query.h b/src/aclk/aclk_query.h
index dbe6f9e5e..900583237 100644
--- a/aclk/aclk_query.h
+++ b/src/aclk/aclk_query.h
@@ -5,7 +5,7 @@
#include "libnetdata/libnetdata.h"
-#include "mqtt_wss_client.h"
+#include "mqtt_websockets/mqtt_wss_client.h"
#include "aclk_query_queue.h"
@@ -18,7 +18,7 @@ extern pthread_mutex_t query_lock_wait;
//extern volatile int aclk_connected;
struct aclk_query_thread {
- netdata_thread_t thread;
+ ND_THREAD *thread;
int idx;
mqtt_wss_client client;
};
diff --git a/aclk/aclk_query_queue.c b/src/aclk/aclk_query_queue.c
index 8ca21d456..3edadc002 100644
--- a/aclk/aclk_query_queue.c
+++ b/src/aclk/aclk_query_queue.c
@@ -10,11 +10,9 @@ static netdata_mutex_t aclk_query_queue_mutex = NETDATA_MUTEX_INITIALIZER;
static struct aclk_query_queue {
aclk_query_t head;
- aclk_query_t tail;
int block_push;
} aclk_query_queue = {
.head = NULL,
- .tail = NULL,
.block_push = 0
};
@@ -31,15 +29,7 @@ static inline int _aclk_queue_query(aclk_query_t query)
aclk_query_free(query);
return 1;
}
- if (!aclk_query_queue.head) {
- aclk_query_queue.head = query;
- aclk_query_queue.tail = query;
- ACLK_QUEUE_UNLOCK;
- return 0;
- }
- // TODO deduplication
- aclk_query_queue.tail->next = query;
- aclk_query_queue.tail = query;
+ DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(aclk_query_queue.head, query, prev, next);
ACLK_QUEUE_UNLOCK;
return 0;
@@ -77,9 +67,7 @@ aclk_query_t aclk_queue_pop(void)
return ret;
}
- aclk_query_queue.head = ret->next;
- if (unlikely(!aclk_query_queue.head))
- aclk_query_queue.tail = aclk_query_queue.head;
+ DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(aclk_query_queue.head, ret, prev, next);
ACLK_QUEUE_UNLOCK;
ret->next = NULL;
@@ -92,7 +80,7 @@ void aclk_queue_flush(void)
while (query) {
aclk_query_free(query);
query = aclk_queue_pop();
- };
+ }
}
aclk_query_t aclk_query_new(aclk_query_type_t type)
diff --git a/aclk/aclk_query_queue.h b/src/aclk/aclk_query_queue.h
index 5983561a6..4a4a36a3f 100644
--- a/aclk/aclk_query_queue.h
+++ b/src/aclk/aclk_query_queue.h
@@ -55,7 +55,7 @@ struct aclk_query {
struct timeval created_tv;
usec_t created;
int timeout;
- aclk_query_t next;
+ aclk_query_t prev, next;
// TODO maybe remove?
int version;
@@ -75,12 +75,13 @@ void aclk_queue_flush(void);
void aclk_queue_lock(void);
void aclk_queue_unlock(void);
-#define QUEUE_IF_PAYLOAD_PRESENT(query) \
+#define QUEUE_IF_PAYLOAD_PRESENT(query) do { \
if (likely(query->data.bin_payload.payload)) { \
aclk_queue_query(query); \
} else { \
- netdata_log_error("Failed to generate payload (%s)", __FUNCTION__); \
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Failed to generate payload"); \
aclk_query_free(query); \
- }
+ } \
+} while(0)
#endif /* NETDATA_ACLK_QUERY_QUEUE_H */
diff --git a/aclk/aclk_rrdhost_state.h b/src/aclk/aclk_rrdhost_state.h
index 5c8a2ddc9..5c8a2ddc9 100644
--- a/aclk/aclk_rrdhost_state.h
+++ b/src/aclk/aclk_rrdhost_state.h
diff --git a/aclk/aclk_rx_msgs.c b/src/aclk/aclk_rx_msgs.c
index 0e91e28c0..60e421928 100644
--- a/aclk/aclk_rx_msgs.c
+++ b/src/aclk/aclk_rx_msgs.c
@@ -11,7 +11,6 @@
#include "schema-wrappers/proto_2_json.h"
#define ACLK_V2_PAYLOAD_SEPARATOR "\x0D\x0A\x0D\x0A"
-#define ACLK_CLOUD_REQ_V2_PREFIX "GET /"
#define ACLK_V_COMPRESSION 2
@@ -100,13 +99,17 @@ static inline int aclk_v2_payload_get_query(const char *payload, char **query_ur
{
const char *start, *end;
- // TODO better check of URL
- if(strncmp(payload, ACLK_CLOUD_REQ_V2_PREFIX, strlen(ACLK_CLOUD_REQ_V2_PREFIX))) {
+ if(strncmp(payload, "GET /", 5) == 0 || strncmp(payload, "PUT /", 5) == 0)
+ start = payload + 4;
+ else if(strncmp(payload, "POST /", 6) == 0)
+ start = payload + 5;
+ else if(strncmp(payload, "DELETE /", 8) == 0)
+ start = payload + 7;
+ else {
errno = 0;
- netdata_log_error("Only accepting requests that start with \"%s\" from CLOUD.", ACLK_CLOUD_REQ_V2_PREFIX);
+ netdata_log_error("Only accepting requests that start with GET, POST, PUT, DELETE from CLOUD.");
return 1;
}
- start = payload + 4;
if(!(end = strstr(payload, HTTP_1_1 HTTP_ENDL))) {
errno = 0;
@@ -196,7 +199,7 @@ int aclk_handle_cloud_cmd_message(char *payload)
// Originally we were expecting to have multiple types of 'cmd' message,
// but after the new protocol was designed we will ever only have 'http'
- if (strcmp(cloud_to_agent.type_id, "http")) {
+ if (strcmp(cloud_to_agent.type_id, "http") != 0) {
error_report("Only 'http' cmd message is supported");
goto err_cleanup;
}
@@ -252,7 +255,7 @@ int create_node_instance_result(const char *msg, size_t msg_len)
netdata_log_debug(D_ACLK, "CreateNodeInstanceResult: guid:%s nodeid:%s", res.machine_guid, res.node_id);
- uuid_t host_id, node_id;
+ nd_uuid_t host_id, node_id;
if (uuid_parse(res.machine_guid, host_id)) {
netdata_log_error("Error parsing machine_guid provided by CreateNodeInstanceResult");
freez(res.machine_guid);
diff --git a/aclk/aclk_rx_msgs.h b/src/aclk/aclk_rx_msgs.h
index 61921faec..61921faec 100644
--- a/aclk/aclk_rx_msgs.h
+++ b/src/aclk/aclk_rx_msgs.h
diff --git a/aclk/aclk_stats.c b/src/aclk/aclk_stats.c
index f4672882b..47a48c366 100644
--- a/aclk/aclk_stats.c
+++ b/src/aclk/aclk_stats.c
@@ -1,6 +1,8 @@
// SPDX-License-Identifier: GPL-3.0-or-later
+#ifndef MQTT_WSS_CPUSTATS
#define MQTT_WSS_CPUSTATS
+#endif
#include "aclk_stats.h"
@@ -385,11 +387,12 @@ void *aclk_stats_main_thread(void *ptr)
struct aclk_metrics permanent;
while (service_running(SERVICE_ACLK | SERVICE_COLLECTORS)) {
- netdata_thread_testcancel();
+
// ------------------------------------------------------------------------
// Wait for the next iteration point.
heartbeat_next(&hb, step_ut);
+
if (!service_running(SERVICE_ACLK | SERVICE_COLLECTORS)) break;
ACLK_STATS_LOCK;
diff --git a/aclk/aclk_stats.h b/src/aclk/aclk_stats.h
index 002ebcfa6..e13269557 100644
--- a/aclk/aclk_stats.h
+++ b/src/aclk/aclk_stats.h
@@ -6,7 +6,7 @@
#include "daemon/common.h"
#include "libnetdata/libnetdata.h"
#include "aclk_query_queue.h"
-#include "mqtt_wss_client.h"
+#include "mqtt_websockets/mqtt_wss_client.h"
extern netdata_mutex_t aclk_stats_mutex;
@@ -19,7 +19,7 @@ extern netdata_mutex_t aclk_stats_mutex;
int aclk_cloud_req_http_type_to_idx(const char *name);
struct aclk_stats_thread {
- netdata_thread_t *thread;
+ ND_THREAD *thread;
int query_thread_count;
mqtt_wss_client client;
};
diff --git a/aclk/aclk_tx_msgs.c b/src/aclk/aclk_tx_msgs.c
index 0e4182a72..c1ed68052 100644
--- a/aclk/aclk_tx_msgs.c
+++ b/src/aclk/aclk_tx_msgs.c
@@ -101,7 +101,7 @@ static int aclk_send_message_with_bin_payload(mqtt_wss_client client, json_objec
*/
static struct json_object *create_hdr(const char *type, const char *msg_id, time_t ts_secs, usec_t ts_us, int version)
{
- uuid_t uuid;
+ nd_uuid_t uuid;
char uuid_str[36 + 1];
json_object *tmp;
json_object *obj = json_object_new_object();
diff --git a/aclk/aclk_tx_msgs.h b/src/aclk/aclk_tx_msgs.h
index 9e7d89077..86ed20c38 100644
--- a/aclk/aclk_tx_msgs.h
+++ b/src/aclk/aclk_tx_msgs.h
@@ -5,7 +5,7 @@
#include <json-c/json.h>
#include "libnetdata/libnetdata.h"
#include "daemon/common.h"
-#include "mqtt_wss_client.h"
+#include "mqtt_websockets/mqtt_wss_client.h"
#include "schema-wrappers/schema_wrappers.h"
#include "aclk_util.h"
diff --git a/aclk/aclk_util.c b/src/aclk/aclk_util.c
index 3bf2e3f18..3bf2e3f18 100644
--- a/aclk/aclk_util.c
+++ b/src/aclk/aclk_util.c
diff --git a/aclk/aclk_util.h b/src/aclk/aclk_util.h
index 38ef5b0bc..6c0239cc3 100644
--- a/aclk/aclk_util.h
+++ b/src/aclk/aclk_util.h
@@ -5,7 +5,7 @@
#include "libnetdata/libnetdata.h"
#ifdef ENABLE_ACLK
-#include "mqtt_wss_client.h"
+#include "mqtt_websockets/mqtt_wss_client.h"
#define CLOUD_EC_MALFORMED_NODE_ID 1
#define CLOUD_EMSG_MALFORMED_NODE_ID "URL requests node_id but there is not enough chars following (for it to be valid uuid)."
diff --git a/aclk/helpers/mqtt_wss_pal.h b/src/aclk/helpers/mqtt_wss_pal.h
index 5c89f8bb7..5c89f8bb7 100644
--- a/aclk/helpers/mqtt_wss_pal.h
+++ b/src/aclk/helpers/mqtt_wss_pal.h
diff --git a/aclk/helpers/ringbuffer_pal.h b/src/aclk/helpers/ringbuffer_pal.h
index 2f7e1cb93..2f7e1cb93 100644
--- a/aclk/helpers/ringbuffer_pal.h
+++ b/src/aclk/helpers/ringbuffer_pal.h
diff --git a/aclk/https_client.c b/src/aclk/https_client.c
index 5385786b8..2bc768f24 100644
--- a/aclk/https_client.c
+++ b/src/aclk/https_client.c
@@ -8,8 +8,6 @@
#include "daemon/global_statistics.h"
-#define DEFAULT_CHUNKED_RESPONSE_BUFFER_SIZE (4096)
-
static const char *http_req_type_to_str(http_req_type_t req) {
switch (req) {
case HTTP_REQ_GET:
@@ -25,10 +23,9 @@ static const char *http_req_type_to_str(http_req_type_t req) {
#define TRANSFER_ENCODING_CHUNKED (-2)
-#define HTTP_PARSE_CTX_INITIALIZER { .state = HTTP_PARSE_INITIAL, .content_length = -1, .http_code = 0 }
-void http_parse_ctx_create(http_parse_ctx *ctx)
+void http_parse_ctx_create(http_parse_ctx *ctx, enum http_parse_state parse_state)
{
- ctx->state = HTTP_PARSE_INITIAL;
+ ctx->state = parse_state;
ctx->content_length = -1;
ctx->http_code = 0;
ctx->headers = c_rhash_new(0);
@@ -54,6 +51,7 @@ void http_parse_ctx_destroy(http_parse_ctx *ctx)
#define HTTP_LINE_TERM "\x0D\x0A"
#define RESP_PROTO "HTTP/1.1 "
+#define RESP_PROTO10 "HTTP/1.0 "
#define HTTP_KEYVAL_SEPARATOR ": "
#define HTTP_HDR_BUFFER_SIZE 1024
#define PORT_STR_MAX_BYTES 12
@@ -247,10 +245,20 @@ http_parse_rc parse_http_response(rbuf_t buf, http_parse_ctx *parse_ctx)
if (parse_ctx->state != HTTP_PARSE_CONTENT && !rbuf_find_bytes(buf, HTTP_LINE_TERM, strlen(HTTP_LINE_TERM), &idx))
return HTTP_PARSE_NEED_MORE_DATA;
switch (parse_ctx->state) {
+ case HTTP_PARSE_PROXY_CONNECT:
case HTTP_PARSE_INITIAL:
if (rbuf_memcmp_n(buf, RESP_PROTO, strlen(RESP_PROTO))) {
- netdata_log_error("Expected response to start with \"%s\"", RESP_PROTO);
- return HTTP_PARSE_ERROR;
+ if (parse_ctx->state == HTTP_PARSE_PROXY_CONNECT) {
+ if (rbuf_memcmp_n(buf, RESP_PROTO10, strlen(RESP_PROTO10))) {
+ netdata_log_error(
+ "Expected response to start with \"%s\" or \"%s\"", RESP_PROTO, RESP_PROTO10);
+ return HTTP_PARSE_ERROR;
+ }
+ }
+ else {
+ netdata_log_error("Expected response to start with \"%s\"", RESP_PROTO);
+ return HTTP_PARSE_ERROR;
+ }
}
rbuf_bump_tail(buf, strlen(RESP_PROTO));
if (rbuf_pop(buf, rc, 4) != 4) {
@@ -316,8 +324,6 @@ typedef struct https_req_ctx {
size_t written;
- int self_signed_allowed;
-
http_parse_ctx parse_ctx;
time_t req_start_time;
@@ -494,47 +500,45 @@ static int read_parse_response(https_req_ctx_t *ctx) {
return 0;
}
+static const char *http_methods[] = {
+ [HTTP_REQ_GET] = "GET ",
+ [HTTP_REQ_POST] = "POST ",
+ [HTTP_REQ_CONNECT] = "CONNECT ",
+};
+
+
#define TX_BUFFER_SIZE 8192
#define RX_BUFFER_SIZE (TX_BUFFER_SIZE*2)
static int handle_http_request(https_req_ctx_t *ctx) {
BUFFER *hdr = buffer_create(TX_BUFFER_SIZE, &netdata_buffers_statistics.buffers_aclk);
int rc = 0;
- http_parse_ctx_create(&ctx->parse_ctx);
+ http_req_type_t req_type = ctx->request->request_type;
- // Prepare data to send
- switch (ctx->request->request_type) {
- case HTTP_REQ_CONNECT:
- buffer_strcat(hdr, "CONNECT ");
- break;
- case HTTP_REQ_GET:
- buffer_strcat(hdr, "GET ");
- break;
- case HTTP_REQ_POST:
- buffer_strcat(hdr, "POST ");
- break;
- default:
- netdata_log_error("Unknown HTTPS request type!");
- rc = 1;
- goto err_exit;
+ if (req_type >= HTTP_REQ_INVALID) {
+ netdata_log_error("Unknown HTTPS request type!");
+ rc = 1;
+ goto err_exit;
}
+ buffer_strcat(hdr, http_methods[req_type]);
- if (ctx->request->request_type == HTTP_REQ_CONNECT) {
+ if (req_type == HTTP_REQ_CONNECT) {
buffer_strcat(hdr, ctx->request->host);
buffer_sprintf(hdr, ":%d", ctx->request->port);
- } else {
+ http_parse_ctx_create(&ctx->parse_ctx, HTTP_PARSE_PROXY_CONNECT);
+ }
+ else {
buffer_strcat(hdr, ctx->request->url);
+ http_parse_ctx_create(&ctx->parse_ctx, HTTP_PARSE_INITIAL);
}
buffer_strcat(hdr, HTTP_1_1 HTTP_ENDL);
//TODO Headers!
- if (ctx->request->request_type != HTTP_REQ_CONNECT) {
- buffer_sprintf(hdr, "Host: %s\x0D\x0A", ctx->request->host);
- }
+ buffer_sprintf(hdr, "Host: %s\x0D\x0A", ctx->request->host);
buffer_strcat(hdr, "User-Agent: Netdata/rocks newhttpclient\x0D\x0A");
- if (ctx->request->request_type == HTTP_REQ_POST && ctx->request->payload && ctx->request->payload_size) {
+ if (req_type == HTTP_REQ_POST && ctx->request->payload && ctx->request->payload_size) {
buffer_sprintf(hdr, "Content-Length: %zu\x0D\x0A", ctx->request->payload_size);
}
if (ctx->request->proxy_username) {
@@ -565,7 +569,7 @@ static int handle_http_request(https_req_ctx_t *ctx) {
goto err_exit;
}
- if (ctx->request->request_type == HTTP_REQ_POST && ctx->request->payload && ctx->request->payload_size) {
+ if (req_type == HTTP_REQ_POST && ctx->request->payload && ctx->request->payload_size) {
if (https_client_write_all(ctx, ctx->request->payload, ctx->request->payload_size)) {
netdata_log_error("Couldn't write payload into SSL connection");
rc = 3;
@@ -762,12 +766,6 @@ void https_req_response_free(https_req_response_t *res) {
freez(res->payload);
}
-void https_req_response_init(https_req_response_t *res) {
- res->http_code = 0;
- res->payload = NULL;
- res->payload_size = 0;
-}
-
static inline char *UNUSED_FUNCTION(min_non_null)(char *a, char *b) {
if (!a)
return b;
@@ -816,12 +814,11 @@ static inline void port_by_proto(url_t *url) {
}
}
-#define STRDUPZ_2PTR(dest, start, end) \
- { \
+#define STRDUPZ_2PTR(dest, start, end) do { \
dest = mallocz(1 + end - start); \
memcpy(dest, start, end - start); \
dest[end - start] = 0; \
- }
+ } while(0)
int url_parse(const char *url, url_t *parsed) {
const char *start = url;
@@ -833,7 +830,7 @@ int url_parse(const char *url, url_t *parsed) {
return 1;
}
- STRDUPZ_2PTR(parsed->proto, start, end)
+ STRDUPZ_2PTR(parsed->proto, start, end);
start = end + strlen(URI_PROTO_SEPARATOR);
}
diff --git a/aclk/https_client.h b/src/aclk/https_client.h
index 0b97fbb02..bc5ca30b8 100644
--- a/aclk/https_client.h
+++ b/src/aclk/https_client.h
@@ -5,13 +5,14 @@
#include "libnetdata/libnetdata.h"
-#include "mqtt_websockets/c-rbuf/include/ringbuffer.h"
-#include "mqtt_websockets/c_rhash/include/c_rhash.h"
+#include "mqtt_websockets/c-rbuf/cringbuffer.h"
+#include "mqtt_websockets/c_rhash/c_rhash.h"
typedef enum http_req_type {
HTTP_REQ_GET = 0,
HTTP_REQ_POST,
- HTTP_REQ_CONNECT
+ HTTP_REQ_CONNECT,
+ HTTP_REQ_INVALID
} http_req_type_t;
typedef struct {
@@ -56,7 +57,6 @@ int url_parse(const char *url, url_t *parsed);
void url_t_destroy(url_t *url);
void https_req_response_free(https_req_response_t *res);
-void https_req_response_init(https_req_response_t *res);
#define HTTPS_REQ_RESPONSE_T_INITIALIZER \
{ \
@@ -83,7 +83,8 @@ int https_request(https_req_t *request, https_req_response_t *response);
// we expose previously internal parser as this is usefull also from
// other parts of the code
enum http_parse_state {
- HTTP_PARSE_INITIAL = 0,
+ HTTP_PARSE_PROXY_CONNECT = 0,
+ HTTP_PARSE_INITIAL,
HTTP_PARSE_HEADERS,
HTTP_PARSE_CONTENT
};
@@ -119,7 +120,7 @@ typedef struct {
size_t chunk_got;
} http_parse_ctx;
-void http_parse_ctx_create(http_parse_ctx *ctx);
+void http_parse_ctx_create(http_parse_ctx *ctx, enum http_parse_state parse_state);
void http_parse_ctx_destroy(http_parse_ctx *ctx);
typedef enum {
diff --git a/mqtt_websockets/.github/workflows/run-tests.yaml b/src/aclk/mqtt_websockets/.github/workflows/run-tests.yaml
index da5dde821..da5dde821 100644
--- a/mqtt_websockets/.github/workflows/run-tests.yaml
+++ b/src/aclk/mqtt_websockets/.github/workflows/run-tests.yaml
diff --git a/mqtt_websockets/.gitignore b/src/aclk/mqtt_websockets/.gitignore
index 9f1a0d89a..9f1a0d89a 100644
--- a/mqtt_websockets/.gitignore
+++ b/src/aclk/mqtt_websockets/.gitignore
diff --git a/src/aclk/mqtt_websockets/README.md b/src/aclk/mqtt_websockets/README.md
new file mode 100644
index 000000000..b159686df
--- /dev/null
+++ b/src/aclk/mqtt_websockets/README.md
@@ -0,0 +1,7 @@
+# mqtt_websockets
+
+Library to connect MQTT client over Websockets Secure (WSS).
+
+## License
+
+The Project is released under GPL v3 license. See [License](LICENSE)
diff --git a/src/aclk/mqtt_websockets/c-rbuf/cringbuffer.c b/src/aclk/mqtt_websockets/c-rbuf/cringbuffer.c
new file mode 100644
index 000000000..8950c6906
--- /dev/null
+++ b/src/aclk/mqtt_websockets/c-rbuf/cringbuffer.c
@@ -0,0 +1,203 @@
+// Copyright: SPDX-License-Identifier: GPL-3.0-only
+
+#include "cringbuffer.h"
+#include "cringbuffer_internal.h"
+
+#include <stdlib.h>
+#include <assert.h>
+#include <string.h>
+
+#define MIN(a,b) (((a)<(b))?(a):(b))
+#define MAX(a,b) (((a)>(b))?(a):(b))
+
+// this allows user to use their own
+// custom memory allocation functions
+#ifdef RBUF_CUSTOM_MALLOC
+#include "../../helpers/ringbuffer_pal.h"
+#else
+#define crbuf_malloc(...) malloc(__VA_ARGS__)
+#define crbuf_free(...) free(__VA_ARGS__)
+#endif
+
+rbuf_t rbuf_create(size_t size)
+{
+ rbuf_t buffer = crbuf_malloc(sizeof(struct rbuf_t) + size);
+ if (!buffer)
+ return NULL;
+
+ memset(buffer, 0, sizeof(struct rbuf_t));
+
+ buffer->data = ((char*)buffer) + sizeof(struct rbuf_t);
+
+ buffer->head = buffer->data;
+ buffer->tail = buffer->data;
+ buffer->size = size;
+ buffer->end = buffer->data + size;
+
+ return buffer;
+}
+
+void rbuf_free(rbuf_t buffer)
+{
+ crbuf_free(buffer);
+}
+
+void rbuf_flush(rbuf_t buffer)
+{
+ buffer->head = buffer->data;
+ buffer->tail = buffer->data;
+ buffer->size_data = 0;
+}
+
+char *rbuf_get_linear_insert_range(rbuf_t buffer, size_t *bytes)
+{
+ *bytes = 0;
+ if (buffer->head == buffer->tail && buffer->size_data)
+ return NULL;
+
+ *bytes = ((buffer->head >= buffer->tail) ? buffer->end : buffer->tail) - buffer->head;
+ return buffer->head;
+}
+
+char *rbuf_get_linear_read_range(rbuf_t buffer, size_t *bytes)
+{
+ *bytes = 0;
+ if(buffer->head == buffer->tail && !buffer->size_data)
+ return NULL;
+
+ *bytes = ((buffer->tail >= buffer->head) ? buffer->end : buffer->head) - buffer->tail;
+
+ return buffer->tail;
+}
+
+int rbuf_bump_head(rbuf_t buffer, size_t bytes)
+{
+ size_t free_bytes = rbuf_bytes_free(buffer);
+ if (bytes > free_bytes)
+ return 0;
+ int i = buffer->head - buffer->data;
+ buffer->head = &buffer->data[(i + bytes) % buffer->size];
+ buffer->size_data += bytes;
+ return 1;
+}
+
+int rbuf_bump_tail(rbuf_t buffer, size_t bytes)
+{
+ if(!rbuf_bump_tail_noopt(buffer, bytes))
+ return 0;
+
+ // if tail catched up with head
+ // start writing buffer from beggining
+ // this is not necessary (rbuf must work well without it)
+ // but helps to optimize big writes as rbuf_get_linear_insert_range
+ // will return bigger continuous region
+ if(buffer->tail == buffer->head) {
+ assert(buffer->size_data == 0);
+ rbuf_flush(buffer);
+ }
+
+ return 1;
+}
+
+size_t rbuf_get_capacity(rbuf_t buffer)
+{
+ return buffer->size;
+}
+
+size_t rbuf_bytes_available(rbuf_t buffer)
+{
+ return buffer->size_data;
+}
+
+size_t rbuf_bytes_free(rbuf_t buffer)
+{
+ return buffer->size - buffer->size_data;
+}
+
+size_t rbuf_push(rbuf_t buffer, const char *data, size_t len)
+{
+ size_t to_cpy;
+ char *w_ptr = rbuf_get_linear_insert_range(buffer, &to_cpy);
+ if(!to_cpy)
+ return to_cpy;
+
+ to_cpy = MIN(to_cpy, len);
+ memcpy(w_ptr, data, to_cpy);
+ rbuf_bump_head(buffer, to_cpy);
+ if(to_cpy < len)
+ to_cpy += rbuf_push(buffer, &data[to_cpy], len - to_cpy);
+ return to_cpy;
+}
+
+size_t rbuf_pop(rbuf_t buffer, char *data, size_t len)
+{
+ size_t to_cpy;
+ const char *r_ptr = rbuf_get_linear_read_range(buffer, &to_cpy);
+ if(!to_cpy)
+ return to_cpy;
+
+ to_cpy = MIN(to_cpy, len);
+ memcpy(data, r_ptr, to_cpy);
+ rbuf_bump_tail(buffer, to_cpy);
+ if(to_cpy < len)
+ to_cpy += rbuf_pop(buffer, &data[to_cpy], len - to_cpy);
+ return to_cpy;
+}
+
+static inline void rbuf_ptr_inc(rbuf_t buffer, const char **ptr)
+{
+ (*ptr)++;
+ if(*ptr >= buffer->end)
+ *ptr = buffer->data;
+}
+
+int rbuf_memcmp(rbuf_t buffer, const char *haystack, const char *needle, size_t needle_bytes)
+{
+ const char *end = needle + needle_bytes;
+
+ // as head==tail can mean 2 things here
+ if (haystack == buffer->head && buffer->size_data) {
+ if (*haystack != *needle)
+ return (*haystack - *needle);
+ rbuf_ptr_inc(buffer, &haystack);
+ needle++;
+ }
+
+ while (haystack != buffer->head && needle != end) {
+ if (*haystack != *needle)
+ return (*haystack - *needle);
+ rbuf_ptr_inc(buffer, &haystack);
+ needle++;
+ }
+ return 0;
+}
+
+int rbuf_memcmp_n(rbuf_t buffer, const char *to_cmp, size_t to_cmp_bytes)
+{
+ return rbuf_memcmp(buffer, buffer->tail, to_cmp, to_cmp_bytes);
+}
+
+char *rbuf_find_bytes(rbuf_t buffer, const char *needle, size_t needle_bytes, int *found_idx)
+{
+ const char *ptr = buffer->tail;
+ *found_idx = 0;
+
+ if (!rbuf_bytes_available(buffer))
+ return NULL;
+
+ if (buffer->head == buffer->tail && buffer->size_data) {
+ if(!rbuf_memcmp(buffer, ptr, needle, needle_bytes))
+ return (char *)ptr;
+ rbuf_ptr_inc(buffer, &ptr);
+ (*found_idx)++;
+ }
+
+ while (ptr != buffer->head)
+ {
+ if(!rbuf_memcmp(buffer, ptr, needle, needle_bytes))
+ return (char *)ptr;
+ rbuf_ptr_inc(buffer, &ptr);
+ (*found_idx)++;
+ }
+ return NULL;
+}
diff --git a/src/aclk/mqtt_websockets/c-rbuf/cringbuffer.h b/src/aclk/mqtt_websockets/c-rbuf/cringbuffer.h
new file mode 100644
index 000000000..eb98035a9
--- /dev/null
+++ b/src/aclk/mqtt_websockets/c-rbuf/cringbuffer.h
@@ -0,0 +1,47 @@
+// Copyright: SPDX-License-Identifier: GPL-3.0-only
+
+#ifndef CRINGBUFFER_H
+#define CRINGBUFFER_H
+
+#include <stddef.h>
+
+typedef struct rbuf_t *rbuf_t;
+
+rbuf_t rbuf_create(size_t size);
+void rbuf_free(rbuf_t buffer);
+void rbuf_flush(rbuf_t buffer);
+
+/* /param bytes how much bytes can be copied into pointer returned
+ * /return pointer where data can be copied to or NULL if buffer full
+ */
+char *rbuf_get_linear_insert_range(rbuf_t buffer, size_t *bytes);
+char *rbuf_get_linear_read_range(rbuf_t buffer, size_t *bytes);
+
+int rbuf_bump_head(rbuf_t buffer, size_t bytes);
+int rbuf_bump_tail(rbuf_t buffer, size_t bytes);
+
+/* @param buffer related buffer instance
+ * @returns total capacity of buffer in bytes (not free/used)
+ */
+size_t rbuf_get_capacity(rbuf_t buffer);
+
+/* @param buffer related buffer instance
+ * @returns count of bytes stored in the buffer
+ */
+size_t rbuf_bytes_available(rbuf_t buffer);
+
+/* @param buffer related buffer instance
+ * @returns count of bytes available/free in the buffer (how many more bytes you can store in this buffer)
+ */
+size_t rbuf_bytes_free(rbuf_t buffer);
+
+/* writes as many bytes from `data` into the `buffer` as possible
+ * but maximum `len` bytes
+ */
+size_t rbuf_push(rbuf_t buffer, const char *data, size_t len);
+size_t rbuf_pop(rbuf_t buffer, char *data, size_t len);
+
+char *rbuf_find_bytes(rbuf_t buffer, const char *needle, size_t needle_bytes, int *found_idx);
+int rbuf_memcmp_n(rbuf_t buffer, const char *to_cmp, size_t to_cmp_bytes);
+
+#endif
diff --git a/src/aclk/mqtt_websockets/c-rbuf/cringbuffer_internal.h b/src/aclk/mqtt_websockets/c-rbuf/cringbuffer_internal.h
new file mode 100644
index 000000000..d32de187c
--- /dev/null
+++ b/src/aclk/mqtt_websockets/c-rbuf/cringbuffer_internal.h
@@ -0,0 +1,37 @@
+// Copyright: SPDX-License-Identifier: GPL-3.0-only
+
+#ifndef CRINGBUFFER_INTERNAL_H
+#define CRINGBUFFER_INTERNAL_H
+
+struct rbuf_t {
+ char *data;
+
+ // points to next byte where we can write
+ char *head;
+ // points to oldest (next to be poped) readable byte
+ char *tail;
+
+ // to avoid calculating data + size
+ // all the time
+ char *end;
+
+ size_t size;
+ size_t size_data;
+};
+
+/* this exists so that it can be tested by unit tests
+ * without optimization that resets head and tail to
+ * beginning if buffer empty
+ */
+inline static int rbuf_bump_tail_noopt(rbuf_t buffer, size_t bytes)
+{
+ if (bytes > buffer->size_data)
+ return 0;
+ int i = buffer->tail - buffer->data;
+ buffer->tail = &buffer->data[(i + bytes) % buffer->size];
+ buffer->size_data -= bytes;
+
+ return 1;
+}
+
+#endif
diff --git a/mqtt_websockets/c-rbuf/tests/ringbuffer_test.c b/src/aclk/mqtt_websockets/c-rbuf/ringbuffer_test.c
index d810ea5a1..6a17c9956 100644
--- a/mqtt_websockets/c-rbuf/tests/ringbuffer_test.c
+++ b/src/aclk/mqtt_websockets/c-rbuf/ringbuffer_test.c
@@ -1,10 +1,4 @@
-/*
- *
- * Copyright: SPDX-License-Identifier: LGPL-3.0-only
- *
- * Author: Timotej Šiškovič <timotejs@gmail.com>
- *
- */
+// Copyright: SPDX-License-Identifier: GPL-3.0-only
#include "ringbuffer.h"
diff --git a/mqtt_websockets/c_rhash/src/c_rhash.c b/src/aclk/mqtt_websockets/c_rhash/c_rhash.c
index fd130a442..a71b500e2 100644
--- a/mqtt_websockets/c_rhash/src/c_rhash.c
+++ b/src/aclk/mqtt_websockets/c_rhash/c_rhash.c
@@ -1,3 +1,5 @@
+// Copyright: SPDX-License-Identifier: GPL-3.0-only
+
#include "c_rhash_internal.h"
#include <stdlib.h>
@@ -259,4 +261,4 @@ void c_rhash_destroy(c_rhash hash) {
c_rhash_destroy_bin(hash->bins[i]);
}
c_rfree(hash);
-} \ No newline at end of file
+}
diff --git a/mqtt_websockets/c_rhash/include/c_rhash.h b/src/aclk/mqtt_websockets/c_rhash/c_rhash.h
index e14fea5de..37addd161 100644
--- a/mqtt_websockets/c_rhash/include/c_rhash.h
+++ b/src/aclk/mqtt_websockets/c_rhash/c_rhash.h
@@ -1,3 +1,5 @@
+// Copyright: SPDX-License-Identifier: GPL-3.0-only
+
#include <sys/types.h>
#include <stdint.h>
#include <stddef.h>
diff --git a/mqtt_websockets/c_rhash/src/c_rhash_internal.h b/src/aclk/mqtt_websockets/c_rhash/c_rhash_internal.h
index aefa9453c..20f741076 100644
--- a/mqtt_websockets/c_rhash/src/c_rhash_internal.h
+++ b/src/aclk/mqtt_websockets/c_rhash/c_rhash_internal.h
@@ -1,3 +1,5 @@
+// Copyright: SPDX-License-Identifier: GPL-3.0-only
+
#include "c_rhash.h"
struct bin_item {
diff --git a/mqtt_websockets/c_rhash/src/tests.c b/src/aclk/mqtt_websockets/c_rhash/tests.c
index 652aad69b..909c5562d 100644
--- a/mqtt_websockets/c_rhash/src/tests.c
+++ b/src/aclk/mqtt_websockets/c_rhash/tests.c
@@ -1,3 +1,5 @@
+// Copyright: SPDX-License-Identifier: GPL-3.0-only
+
#include <stdio.h>
#include <string.h>
diff --git a/src/aclk/mqtt_websockets/common_internal.h b/src/aclk/mqtt_websockets/common_internal.h
new file mode 100644
index 000000000..2be1c45b8
--- /dev/null
+++ b/src/aclk/mqtt_websockets/common_internal.h
@@ -0,0 +1,27 @@
+// SPDX-License-Identifier: GPL-3.0-only
+
+#ifndef COMMON_INTERNAL_H
+#define COMMON_INTERNAL_H
+
+#include "endian_compat.h"
+
+#ifdef MQTT_WSS_CUSTOM_ALLOC
+#include "../helpers/mqtt_wss_pal.h"
+#else
+#define mw_malloc(...) malloc(__VA_ARGS__)
+#define mw_calloc(...) calloc(__VA_ARGS__)
+#define mw_free(...) free(__VA_ARGS__)
+#define mw_strdup(...) strdup(__VA_ARGS__)
+#define mw_realloc(...) realloc(__VA_ARGS__)
+#endif
+
+#ifndef MQTT_WSS_FRAG_MEMALIGN
+#define MQTT_WSS_FRAG_MEMALIGN (8)
+#endif
+
+#define OPENSSL_VERSION_095 0x00905100L
+#define OPENSSL_VERSION_097 0x00907000L
+#define OPENSSL_VERSION_110 0x10100000L
+#define OPENSSL_VERSION_111 0x10101000L
+
+#endif /* COMMON_INTERNAL_H */
diff --git a/mqtt_websockets/src/common_public.c b/src/aclk/mqtt_websockets/common_public.c
index 7f74fa511..7991b0c23 100644
--- a/mqtt_websockets/src/common_public.c
+++ b/src/aclk/mqtt_websockets/common_public.c
@@ -1,3 +1,5 @@
+// Copyright: SPDX-License-Identifier: GPL-3.0-only
+
#include "common_public.h"
// this dummy exists to have a special pointer with special meaning
diff --git a/mqtt_websockets/src/include/common_public.h b/src/aclk/mqtt_websockets/common_public.h
index a855737f9..a855737f9 100644
--- a/mqtt_websockets/src/include/common_public.h
+++ b/src/aclk/mqtt_websockets/common_public.h
diff --git a/src/aclk/mqtt_websockets/endian_compat.h b/src/aclk/mqtt_websockets/endian_compat.h
new file mode 100644
index 000000000..b36d2c858
--- /dev/null
+++ b/src/aclk/mqtt_websockets/endian_compat.h
@@ -0,0 +1,31 @@
+// SPDX-License-Identifier: GPL-3.0-only
+
+#ifndef MQTT_WSS_ENDIAN_COMPAT_H
+#define MQTT_WSS_ENDIAN_COMPAT_H
+
+#ifdef __APPLE__
+ #include <libkern/OSByteOrder.h>
+
+ #define htobe16(x) OSSwapHostToBigInt16(x)
+ #define htole16(x) OSSwapHostToLittleInt16(x)
+ #define be16toh(x) OSSwapBigToHostInt16(x)
+ #define le16toh(x) OSSwapLittleToHostInt16(x)
+
+ #define htobe32(x) OSSwapHostToBigInt32(x)
+ #define htole32(x) OSSwapHostToLittleInt32(x)
+ #define be32toh(x) OSSwapBigToHostInt32(x)
+ #define le32toh(x) OSSwapLittleToHostInt32(x)
+
+ #define htobe64(x) OSSwapHostToBigInt64(x)
+ #define htole64(x) OSSwapHostToLittleInt64(x)
+ #define be64toh(x) OSSwapBigToHostInt64(x)
+ #define le64toh(x) OSSwapLittleToHostInt64(x)
+#else
+#ifdef __FreeBSD__
+ #include <sys/endian.h>
+#else
+ #include <endian.h>
+#endif
+#endif
+
+#endif /* MQTT_WSS_ENDIAN_COMPAT_H */
diff --git a/mqtt_websockets/src/include/mqtt_constants.h b/src/aclk/mqtt_websockets/mqtt_constants.h
index 1db498976..3d6a2aa4a 100644
--- a/mqtt_websockets/src/include/mqtt_constants.h
+++ b/src/aclk/mqtt_websockets/mqtt_constants.h
@@ -1,3 +1,5 @@
+// Copyright: SPDX-License-Identifier: GPL-3.0-only
+
#ifndef MQTT_CONSTANTS_H
#define MQTT_CONSTANTS_H
diff --git a/mqtt_websockets/src/mqtt_ng.c b/src/aclk/mqtt_websockets/mqtt_ng.c
index 81cffccf0..f570fde71 100644
--- a/mqtt_websockets/src/mqtt_ng.c
+++ b/src/aclk/mqtt_websockets/mqtt_ng.c
@@ -1,4 +1,8 @@
+// Copyright: SPDX-License-Identifier: GPL-3.0-only
+
+#ifndef _GNU_SOURCE
#define _GNU_SOURCE
+#endif
#include <stdint.h>
#include <stdlib.h>
@@ -6,7 +10,7 @@
#include <pthread.h>
#include <inttypes.h>
-#include "c_rhash.h"
+#include "c_rhash/c_rhash.h"
#include "common_internal.h"
#include "mqtt_constants.h"
@@ -47,7 +51,7 @@ struct buffer_fragment {
size_t sent;
buffer_frag_flag_t flags;
void (*free_fnc)(void *ptr);
- char *data;
+ unsigned char *data;
uint16_t packet_id;
@@ -60,8 +64,8 @@ typedef struct buffer_fragment *mqtt_msg_data;
// not for actual data sent
struct header_buffer {
size_t size;
- char *data;
- char *tail;
+ unsigned char *data;
+ unsigned char *tail;
struct buffer_fragment *tail_frag;
};
@@ -257,7 +261,7 @@ struct mqtt_ng_client {
size_t max_msg_size;
};
-char pingreq[] = { MQTT_CPT_PINGREQ << 4, 0x00 };
+unsigned char pingreq[] = { MQTT_CPT_PINGREQ << 4, 0x00 };
struct buffer_fragment ping_frag = {
.data = pingreq,
@@ -269,7 +273,7 @@ struct buffer_fragment ping_frag = {
.packet_id = 0
};
-int uint32_to_mqtt_vbi(uint32_t input, char *output) {
+int uint32_to_mqtt_vbi(uint32_t input, unsigned char *output) {
int i = 1;
*output = 0;
@@ -476,7 +480,7 @@ static void buffer_rebuild(struct header_buffer *buf)
{
struct buffer_fragment *frag = (struct buffer_fragment*)buf->data;
do {
- buf->tail = (char*)frag + sizeof(struct buffer_fragment);
+ buf->tail = (unsigned char *) frag + sizeof(struct buffer_fragment);
buf->tail_frag = frag;
if (!(frag->flags & BUFFER_FRAG_DATA_EXTERNAL)) {
buf->tail_frag->data = buf->tail;
@@ -527,7 +531,7 @@ static void buffer_garbage_collect(struct header_buffer *buf, mqtt_wss_log_ctx_t
}
#endif
- memmove(buf->data, frag, buf->tail - (char*)frag);
+ memmove(buf->data, frag, buf->tail - (unsigned char *) frag);
buffer_rebuild(buf);
}
@@ -933,7 +937,7 @@ mqtt_msg_data mqtt_ng_generate_connect(struct transaction_buffer *trx_buf,
DATA_ADVANCE(&trx_buf->hdr_buffer, sizeof(mqtt_protocol_name_frag), frag);
// [MQTT-3.1.2.3] Connect flags
- char *connect_flags = WRITE_POS(frag);
+ unsigned char *connect_flags = WRITE_POS(frag);
*connect_flags = 0;
if (auth->username)
*connect_flags |= MQTT_CONNECT_FLAG_USERNAME;
@@ -1947,7 +1951,7 @@ static int send_fragment(struct mqtt_ng_client *client) {
struct buffer_fragment *frag = client->main_buffer.sending_frag;
// for readability
- char *ptr = frag->data + frag->sent;
+ unsigned char *ptr = frag->data + frag->sent;
size_t bytes = frag->len - frag->sent;
size_t processed = 0;
diff --git a/mqtt_websockets/src/include/mqtt_ng.h b/src/aclk/mqtt_websockets/mqtt_ng.h
index 09668d09b..4b0584d58 100644
--- a/mqtt_websockets/src/include/mqtt_ng.h
+++ b/src/aclk/mqtt_websockets/mqtt_ng.h
@@ -1,8 +1,10 @@
+// Copyright: SPDX-License-Identifier: GPL-3.0-only
+
#include <stdint.h>
#include <sys/types.h>
#include <time.h>
-#include "ringbuffer.h"
+#include "c-rbuf/cringbuffer.h"
#include "common_public.h"
#define MQTT_NG_MSGGEN_OK 0
@@ -19,7 +21,7 @@ struct mqtt_ng_client;
* @param output pointer to memory where output will be written to. Must allow up to 4 bytes to be written.
* @return number of bytes written to output or <= 0 if error in which case contents of output are undefined
*/
-int uint32_to_mqtt_vbi(uint32_t input, char *output);
+int uint32_to_mqtt_vbi(uint32_t input, unsigned char *output);
struct mqtt_lwt_properties {
char *will_topic;
diff --git a/mqtt_websockets/src/mqtt_wss_client.c b/src/aclk/mqtt_websockets/mqtt_wss_client.c
index 01e2ffce7..f5b4025d7 100644
--- a/mqtt_websockets/src/mqtt_wss_client.c
+++ b/src/aclk/mqtt_websockets/mqtt_wss_client.c
@@ -1,16 +1,9 @@
-// Copyright (C) 2020 Timotej Šiškovič
// SPDX-License-Identifier: GPL-3.0-only
-//
-// This program is free software: you can redistribute it and/or modify it
-// under the terms of the GNU General Public License as published by the Free Software Foundation, version 3.
-//
-// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
-// without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
-// See the GNU General Public License for more details.
-//
-// You should have received a copy of the GNU General Public License along with this program.
-// If not, see <https://www.gnu.org/licenses/>.
+// Copyright (C) 2020 Timotej Šiškovič
+
+#ifndef _GNU_SOURCE
#define _GNU_SOURCE
+#endif
#include "mqtt_wss_client.h"
#include "mqtt_ng.h"
@@ -152,16 +145,16 @@ static void mws_connack_callback_ng(void *user_ctx, int code)
static ssize_t mqtt_send_cb(void *user_ctx, const void* buf, size_t len)
{
- mqtt_wss_client mqtt_wss_client = user_ctx;
+ mqtt_wss_client client = user_ctx;
#ifdef DEBUG_ULTRA_VERBOSE
- mws_debug(mqtt_wss_client->log, "mqtt_pal_sendall(len=%d)", len);
+ mws_debug(client->log, "mqtt_pal_sendall(len=%d)", len);
#endif
- int ret = ws_client_send(mqtt_wss_client->ws_client, WS_OP_BINARY_FRAME, buf, len);
+ int ret = ws_client_send(client->ws_client, WS_OP_BINARY_FRAME, buf, len);
if (ret >= 0 && (size_t)ret != len) {
#ifdef DEBUG_ULTRA_VERBOSE
- mws_debug(mqtt_wss_client->log, "Not complete message sent (Msg=%d,Sent=%d). Need to arm POLLOUT!", len, ret);
+ mws_debug(client->log, "Not complete message sent (Msg=%d,Sent=%d). Need to arm POLLOUT!", len, ret);
#endif
- mqtt_wss_client->mqtt_didnt_finish_write = 1;
+ client->mqtt_didnt_finish_write = 1;
}
return ret;
}
@@ -321,6 +314,7 @@ static int cert_verify_callback(int preverify_ok, X509_STORE_CTX *ctx)
#define PROXY_CONNECT "CONNECT"
#define PROXY_HTTP "HTTP/1.1"
+#define PROXY_HTTP10 "HTTP/1.0"
#define HTTP_ENDLINE "\x0D\x0A"
#define HTTP_HDR_TERMINATOR "\x0D\x0A\x0D\x0A"
#define HTTP_CODE_LEN 4
@@ -333,14 +327,16 @@ static int http_parse_reply(mqtt_wss_client client, rbuf_t buf)
int idx;
if (rbuf_memcmp_n(buf, PROXY_HTTP, strlen(PROXY_HTTP))) {
- mws_error(client->log, "http_proxy expected reply with \"" PROXY_HTTP "\"");
- return 1;
+ if (rbuf_memcmp_n(buf, PROXY_HTTP10, strlen(PROXY_HTTP10))) {
+ mws_error(client->log, "http_proxy expected reply with \"" PROXY_HTTP "\" or \"" PROXY_HTTP10 "\"");
+ return 1;
+ }
}
rbuf_bump_tail(buf, strlen(PROXY_HTTP));
if (!rbuf_pop(buf, http_code_s, 1) || http_code_s[0] != 0x20) {
- mws_error(client->log, "http_proxy missing space after \"" PROXY_HTTP "\"");
+ mws_error(client->log, "http_proxy missing space after \"" PROXY_HTTP "\" or \"" PROXY_HTTP10 "\"");
return 2;
}
@@ -448,7 +444,8 @@ static int http_proxy_connect(mqtt_wss_client client)
poll_fd.events = POLLIN;
r_buf_ptr = rbuf_get_linear_insert_range(r_buf, &r_buf_linear_insert_capacity);
- snprintf(r_buf_ptr, r_buf_linear_insert_capacity,"%s %s:%d %s" HTTP_ENDLINE, PROXY_CONNECT, client->target_host, client->target_port, PROXY_HTTP);
+ snprintf(r_buf_ptr, r_buf_linear_insert_capacity,"%s %s:%d %s" HTTP_ENDLINE "Host: %s" HTTP_ENDLINE, PROXY_CONNECT,
+ client->target_host, client->target_port, PROXY_HTTP, client->target_host);
write(client->sockfd, r_buf_ptr, strlen(r_buf_ptr));
if (client->proxy_uname) {
@@ -590,12 +587,18 @@ int mqtt_wss_connect(mqtt_wss_client client, char *host, int port, struct mqtt_c
if (client->sockfd > 0)
close(client->sockfd);
- client->sockfd = socket(AF_INET, SOCK_STREAM, 0);
+ client->sockfd = socket(AF_INET, SOCK_STREAM | DEFAULT_SOCKET_FLAGS, 0);
if (client->sockfd < 0) {
mws_error(client->log, "Couldn't create socket()");
return -1;
}
+#ifndef SOCK_CLOEXEC
+ int flags = fcntl(client->sockfd, F_GETFD);
+ if (flags != -1)
+ (void) fcntl(client->sockfd, F_SETFD, flags| FD_CLOEXEC);
+#endif
+
int flag = 1;
int result = setsockopt(client->sockfd,
IPPROTO_TCP,
@@ -680,11 +683,6 @@ int mqtt_wss_connect(mqtt_wss_client client, char *host, int port, struct mqtt_c
}
}
- uint8_t mqtt_flags = (mqtt_params->will_flags & MQTT_WSS_PUB_QOSMASK) << 3;
- if (mqtt_params->will_flags & MQTT_WSS_PUB_RETAIN)
- mqtt_flags |= MQTT_CONNECT_WILL_RETAIN;
- mqtt_flags |= MQTT_CONNECT_CLEAN_SESSION;
-
client->mqtt_keepalive = (mqtt_params->keep_alive ? mqtt_params->keep_alive : 400);
mws_info(client->log, "Going to connect using internal MQTT 5 implementation");
diff --git a/mqtt_websockets/src/include/mqtt_wss_client.h b/src/aclk/mqtt_websockets/mqtt_wss_client.h
index e325961b7..4bdea4db9 100644
--- a/mqtt_websockets/src/include/mqtt_wss_client.h
+++ b/src/aclk/mqtt_websockets/mqtt_wss_client.h
@@ -1,15 +1,5 @@
-// Copyright (C) 2020 Timotej Šiškovič
// SPDX-License-Identifier: GPL-3.0-only
-//
-// This program is free software: you can redistribute it and/or modify it
-// under the terms of the GNU General Public License as published by the Free Software Foundation, version 3.
-//
-// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
-// without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
-// See the GNU General Public License for more details.
-//
-// You should have received a copy of the GNU General Public License along with this program.
-// If not, see <https://www.gnu.org/licenses/>.
+// Copyright (C) 2020 Timotej Šiškovič
#ifndef MQTT_WSS_CLIENT_H
#define MQTT_WSS_CLIENT_H
diff --git a/mqtt_websockets/src/mqtt_wss_log.c b/src/aclk/mqtt_websockets/mqtt_wss_log.c
index 2c8cf32e5..5e606c12b 100644
--- a/mqtt_websockets/src/mqtt_wss_log.c
+++ b/src/aclk/mqtt_websockets/mqtt_wss_log.c
@@ -1,8 +1,11 @@
-#include "mqtt_wss_log.h"
+// Copyright: SPDX-License-Identifier: GPL-3.0-only
+
#include <stdlib.h>
#include <stdarg.h>
#include <string.h>
#include <stdio.h>
+
+#include "mqtt_wss_log.h"
#include "common_internal.h"
struct mqtt_wss_log_ctx {
diff --git a/mqtt_websockets/src/include/mqtt_wss_log.h b/src/aclk/mqtt_websockets/mqtt_wss_log.h
index a33c460c9..6ae60d870 100644
--- a/mqtt_websockets/src/include/mqtt_wss_log.h
+++ b/src/aclk/mqtt_websockets/mqtt_wss_log.h
@@ -1,3 +1,5 @@
+// Copyright: SPDX-License-Identifier: GPL-3.0-only
+
#ifndef MQTT_WSS_LOG_H
#define MQTT_WSS_LOG_H
diff --git a/src/aclk/mqtt_websockets/test.c b/src/aclk/mqtt_websockets/test.c
new file mode 100644
index 000000000..59a9f3474
--- /dev/null
+++ b/src/aclk/mqtt_websockets/test.c
@@ -0,0 +1,90 @@
+// SPDX-License-Identifier: GPL-3.0-only
+// Copyright (C) 2020 Timotej Šiškovič
+
+#include <unistd.h>
+#include <stdio.h>
+#include <string.h>
+#include <stdlib.h>
+
+#include "mqtt_wss_client.h"
+
+int test_exit = 0;
+int port = 0;
+
+void mqtt_wss_log_cb(mqtt_wss_log_type_t log_type, const char* str)
+{
+ (void)log_type;
+ puts(str);
+}
+
+#define TEST_MSGLEN_MAX 512
+void msg_callback(const char *topic, const void *msg, size_t msglen, int qos)
+{
+ char cmsg[TEST_MSGLEN_MAX];
+ size_t len = (msglen < TEST_MSGLEN_MAX - 1) ? msglen : (TEST_MSGLEN_MAX - 1);
+ memcpy(cmsg,
+ msg,
+ len);
+ cmsg[len] = 0;
+
+ if (!strcmp(cmsg, "shutdown"))
+ test_exit = 1;
+
+ printf("Got Message From Broker Topic \"%s\" QOS %d MSG: \"%s\"\n", topic, qos, cmsg);
+}
+
+#define TESTMSG "Hello World!"
+int client_handle(mqtt_wss_client client)
+{
+ struct mqtt_connect_params params = {
+ .clientid = "test",
+ .username = "anon",
+ .password = "anon",
+ .keep_alive = 10
+ };
+
+/* struct mqtt_wss_proxy proxy = {
+ .host = "127.0.0.1",
+ .port = 3128,
+ .type = MQTT_WSS_PROXY_HTTP
+ };*/
+
+ while (mqtt_wss_connect(client, "127.0.0.1", port, &params, MQTT_WSS_SSL_ALLOW_SELF_SIGNED, NULL /*&proxy*/)) {
+ printf("Connect failed\n");
+ sleep(1);
+ printf("Attempting Reconnect\n");
+ }
+ printf("Connection succeeded\n");
+
+ mqtt_wss_subscribe(client, "test", 1);
+
+ while (!test_exit) {
+ if(mqtt_wss_service(client, -1) < 0)
+ return 1;
+ }
+ return 0;
+}
+
+int main(int argc, char **argv)
+{
+ if (argc >= 2)
+ port = atoi(argv[1]);
+ if (!port)
+ port = 9002;
+ printf("Using port %d\n", port);
+
+ mqtt_wss_client client = mqtt_wss_new("main", mqtt_wss_log_cb, msg_callback, NULL);
+ if (!client) {
+ printf("Couldn't initialize mqtt_wss\n");
+ return 1;
+ }
+ while (!test_exit) {
+ printf("client_handle = %d\n", client_handle(client));
+ }
+ if (test_exit) {
+ mqtt_wss_disconnect(client, 2000);
+ }
+
+ mqtt_wss_destroy(client);
+ return 0;
+}
diff --git a/mqtt_websockets/src/ws_client.c b/src/aclk/mqtt_websockets/ws_client.c
index 47f633e74..240e889ca 100644
--- a/mqtt_websockets/src/ws_client.c
+++ b/src/aclk/mqtt_websockets/ws_client.c
@@ -72,7 +72,7 @@ ws_client *ws_client_new(size_t buf_size, char **host, mqtt_wss_log_ctx_t log)
if (!client->buf_to_mqtt)
goto cleanup_2;
- client->entropy_fd = open(ENTROPY_SOURCE, O_RDONLY);
+ client->entropy_fd = open(ENTROPY_SOURCE, O_RDONLY | O_CLOEXEC);
if (client->entropy_fd < 1) {
ERROR("Error opening entropy source \"" ENTROPY_SOURCE "\". Reason: \"%s\"", strerror(errno));
goto cleanup_3;
@@ -164,7 +164,7 @@ static int ws_client_get_nonce(ws_client *client, char *dest, unsigned int size)
// we do not need crypto secure random here
// it's just used for protocol negotiation
int rd;
- int f = open(RAND_SRC, O_RDONLY);
+ int f = open(RAND_SRC, O_RDONLY | O_CLOEXEC);
if (f < 0) {
ERROR("Error opening \"%s\". Err: \"%s\"", RAND_SRC, strerror(errno));
return -2;
diff --git a/mqtt_websockets/src/include/ws_client.h b/src/aclk/mqtt_websockets/ws_client.h
index de4fac40b..0ccbd29a8 100644
--- a/mqtt_websockets/src/include/ws_client.h
+++ b/src/aclk/mqtt_websockets/ws_client.h
@@ -1,20 +1,10 @@
-// Copyright (C) 2020 Timotej Šiškovič
// SPDX-License-Identifier: GPL-3.0-only
-//
-// This program is free software: you can redistribute it and/or modify it
-// under the terms of the GNU General Public License as published by the Free Software Foundation, version 3.
-//
-// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
-// without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
-// See the GNU General Public License for more details.
-//
-// You should have received a copy of the GNU General Public License along with this program.
-// If not, see <https://www.gnu.org/licenses/>.
+// Copyright (C) 2020 Timotej Šiškovič
#ifndef WS_CLIENT_H
#define WS_CLIENT_H
-#include "ringbuffer.h"
+#include "c-rbuf/cringbuffer.h"
#include "mqtt_wss_log.h"
#include <stdint.h>
diff --git a/aclk/schema-wrappers/agent_cmds.cc b/src/aclk/schema-wrappers/agent_cmds.cc
index 6950f4025..6950f4025 100644
--- a/aclk/schema-wrappers/agent_cmds.cc
+++ b/src/aclk/schema-wrappers/agent_cmds.cc
diff --git a/aclk/schema-wrappers/agent_cmds.h b/src/aclk/schema-wrappers/agent_cmds.h
index 7e01f86c5..7e01f86c5 100644
--- a/aclk/schema-wrappers/agent_cmds.h
+++ b/src/aclk/schema-wrappers/agent_cmds.h
diff --git a/aclk/schema-wrappers/alarm_config.cc b/src/aclk/schema-wrappers/alarm_config.cc
index 64d28f324..64d28f324 100644
--- a/aclk/schema-wrappers/alarm_config.cc
+++ b/src/aclk/schema-wrappers/alarm_config.cc
diff --git a/aclk/schema-wrappers/alarm_config.h b/src/aclk/schema-wrappers/alarm_config.h
index 3c9a5d9a8..3c9a5d9a8 100644
--- a/aclk/schema-wrappers/alarm_config.h
+++ b/src/aclk/schema-wrappers/alarm_config.h
diff --git a/aclk/schema-wrappers/alarm_stream.cc b/src/aclk/schema-wrappers/alarm_stream.cc
index 29d80e39e..29d80e39e 100644
--- a/aclk/schema-wrappers/alarm_stream.cc
+++ b/src/aclk/schema-wrappers/alarm_stream.cc
diff --git a/aclk/schema-wrappers/alarm_stream.h b/src/aclk/schema-wrappers/alarm_stream.h
index 3c81ff445..3c81ff445 100644
--- a/aclk/schema-wrappers/alarm_stream.h
+++ b/src/aclk/schema-wrappers/alarm_stream.h
diff --git a/aclk/schema-wrappers/capability.cc b/src/aclk/schema-wrappers/capability.cc
index af45740a9..af45740a9 100644
--- a/aclk/schema-wrappers/capability.cc
+++ b/src/aclk/schema-wrappers/capability.cc
diff --git a/aclk/schema-wrappers/capability.h b/src/aclk/schema-wrappers/capability.h
index c6085a44b..c6085a44b 100644
--- a/aclk/schema-wrappers/capability.h
+++ b/src/aclk/schema-wrappers/capability.h
diff --git a/aclk/schema-wrappers/connection.cc b/src/aclk/schema-wrappers/connection.cc
index 20b40ece2..20b40ece2 100644
--- a/aclk/schema-wrappers/connection.cc
+++ b/src/aclk/schema-wrappers/connection.cc
diff --git a/aclk/schema-wrappers/connection.h b/src/aclk/schema-wrappers/connection.h
index 0356c7d78..0356c7d78 100644
--- a/aclk/schema-wrappers/connection.h
+++ b/src/aclk/schema-wrappers/connection.h
diff --git a/aclk/schema-wrappers/context.cc b/src/aclk/schema-wrappers/context.cc
index b04c9d20c..b04c9d20c 100644
--- a/aclk/schema-wrappers/context.cc
+++ b/src/aclk/schema-wrappers/context.cc
diff --git a/aclk/schema-wrappers/context.h b/src/aclk/schema-wrappers/context.h
index cbb7701a8..cbb7701a8 100644
--- a/aclk/schema-wrappers/context.h
+++ b/src/aclk/schema-wrappers/context.h
diff --git a/aclk/schema-wrappers/context_stream.cc b/src/aclk/schema-wrappers/context_stream.cc
index 3bb1956cb..3bb1956cb 100644
--- a/aclk/schema-wrappers/context_stream.cc
+++ b/src/aclk/schema-wrappers/context_stream.cc
diff --git a/aclk/schema-wrappers/context_stream.h b/src/aclk/schema-wrappers/context_stream.h
index 8c691d2cc..8c691d2cc 100644
--- a/aclk/schema-wrappers/context_stream.h
+++ b/src/aclk/schema-wrappers/context_stream.h
diff --git a/aclk/schema-wrappers/node_connection.cc b/src/aclk/schema-wrappers/node_connection.cc
index db1fa6449..db1fa6449 100644
--- a/aclk/schema-wrappers/node_connection.cc
+++ b/src/aclk/schema-wrappers/node_connection.cc
diff --git a/aclk/schema-wrappers/node_connection.h b/src/aclk/schema-wrappers/node_connection.h
index dac0d8fe0..dac0d8fe0 100644
--- a/aclk/schema-wrappers/node_connection.h
+++ b/src/aclk/schema-wrappers/node_connection.h
diff --git a/aclk/schema-wrappers/node_creation.cc b/src/aclk/schema-wrappers/node_creation.cc
index 5ad25b7e5..5ad25b7e5 100644
--- a/aclk/schema-wrappers/node_creation.cc
+++ b/src/aclk/schema-wrappers/node_creation.cc
diff --git a/aclk/schema-wrappers/node_creation.h b/src/aclk/schema-wrappers/node_creation.h
index 7a8c7f7c7..7a8c7f7c7 100644
--- a/aclk/schema-wrappers/node_creation.h
+++ b/src/aclk/schema-wrappers/node_creation.h
diff --git a/aclk/schema-wrappers/node_info.cc b/src/aclk/schema-wrappers/node_info.cc
index 5e321f688..5e321f688 100644
--- a/aclk/schema-wrappers/node_info.cc
+++ b/src/aclk/schema-wrappers/node_info.cc
diff --git a/aclk/schema-wrappers/node_info.h b/src/aclk/schema-wrappers/node_info.h
index 4f57601df..4f57601df 100644
--- a/aclk/schema-wrappers/node_info.h
+++ b/src/aclk/schema-wrappers/node_info.h
diff --git a/aclk/schema-wrappers/proto_2_json.cc b/src/aclk/schema-wrappers/proto_2_json.cc
index 854396510..854396510 100644
--- a/aclk/schema-wrappers/proto_2_json.cc
+++ b/src/aclk/schema-wrappers/proto_2_json.cc
diff --git a/aclk/schema-wrappers/proto_2_json.h b/src/aclk/schema-wrappers/proto_2_json.h
index 3bd98478c..3bd98478c 100644
--- a/aclk/schema-wrappers/proto_2_json.h
+++ b/src/aclk/schema-wrappers/proto_2_json.h
diff --git a/aclk/schema-wrappers/schema_wrapper_utils.cc b/src/aclk/schema-wrappers/schema_wrapper_utils.cc
index 96a4b9bf1..96a4b9bf1 100644
--- a/aclk/schema-wrappers/schema_wrapper_utils.cc
+++ b/src/aclk/schema-wrappers/schema_wrapper_utils.cc
diff --git a/src/aclk/schema-wrappers/schema_wrapper_utils.h b/src/aclk/schema-wrappers/schema_wrapper_utils.h
new file mode 100644
index 000000000..693a4ce5f
--- /dev/null
+++ b/src/aclk/schema-wrappers/schema_wrapper_utils.h
@@ -0,0 +1,24 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef SCHEMA_WRAPPER_UTILS_H
+#define SCHEMA_WRAPPER_UTILS_H
+
+#include "database/rrd.h"
+
+#include <sys/time.h>
+#include <google/protobuf/timestamp.pb.h>
+#include <google/protobuf/map.h>
+
+#if GOOGLE_PROTOBUF_VERSION < 3001000
+#define PROTO_COMPAT_MSG_SIZE(msg) (size_t)msg.ByteSize()
+#define PROTO_COMPAT_MSG_SIZE_PTR(msg) (size_t)msg->ByteSize()
+#else
+#define PROTO_COMPAT_MSG_SIZE(msg) msg.ByteSizeLong()
+#define PROTO_COMPAT_MSG_SIZE_PTR(msg) msg->ByteSizeLong()
+#endif
+
+void set_google_timestamp_from_timeval(struct timeval tv, google::protobuf::Timestamp *ts);
+void set_timeval_from_google_timestamp(const google::protobuf::Timestamp &ts, struct timeval *tv);
+int label_add_to_map_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data);
+
+#endif /* SCHEMA_WRAPPER_UTILS_H */
diff --git a/aclk/schema-wrappers/schema_wrappers.h b/src/aclk/schema-wrappers/schema_wrappers.h
index b651b8845..b651b8845 100644
--- a/aclk/schema-wrappers/schema_wrappers.h
+++ b/src/aclk/schema-wrappers/schema_wrappers.h