From 2e85f9325a797977eea9dfea0a925775ddd211d9 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Feb 2021 12:49:00 +0100 Subject: Merging upstream version 1.29.0. Signed-off-by: Daniel Baumann --- aclk/legacy/aclk_lws_wss_client.c | 613 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 613 insertions(+) create mode 100644 aclk/legacy/aclk_lws_wss_client.c (limited to 'aclk/legacy/aclk_lws_wss_client.c') diff --git a/aclk/legacy/aclk_lws_wss_client.c b/aclk/legacy/aclk_lws_wss_client.c new file mode 100644 index 000000000..2e6fd4ec8 --- /dev/null +++ b/aclk/legacy/aclk_lws_wss_client.c @@ -0,0 +1,613 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "aclk_lws_wss_client.h" + +#include "libnetdata/libnetdata.h" +#include "../../daemon/common.h" +#include "aclk_common.h" +#include "aclk_stats.h" + +extern int aclk_shutting_down; + +static int aclk_lws_wss_callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len); + +struct aclk_lws_wss_perconnect_data { + int todo; +}; + +static struct aclk_lws_wss_engine_instance *engine_instance = NULL; + +void lws_wss_check_queues(size_t *write_len, size_t *write_len_bytes, size_t *read_len) +{ + if (write_len != NULL && write_len_bytes != NULL) + { + *write_len = 0; + *write_len_bytes = 0; + if (engine_instance != NULL) + { + aclk_lws_mutex_lock(&engine_instance->write_buf_mutex); + + struct lws_wss_packet_buffer *write_b; + size_t w,wb; + for(w=0, wb=0, write_b = engine_instance->write_buffer_head; write_b != NULL; write_b = write_b->next) + { + w++; + wb += write_b->data_size - write_b->written; + } + *write_len = w; + *write_len_bytes = wb; + aclk_lws_mutex_unlock(&engine_instance->write_buf_mutex); + } + } + else if (write_len != NULL) + { + *write_len = 0; + if (engine_instance != NULL) + { + aclk_lws_mutex_lock(&engine_instance->write_buf_mutex); + + struct lws_wss_packet_buffer *write_b; + size_t w; + for(w=0, write_b = engine_instance->write_buffer_head; write_b != NULL; write_b = write_b->next) + w++; + *write_len = w; + aclk_lws_mutex_unlock(&engine_instance->write_buf_mutex); + } + } + if (read_len != NULL) + { + *read_len = 0; + if (engine_instance != NULL) + { + aclk_lws_mutex_lock(&engine_instance->read_buf_mutex); + *read_len = lws_ring_get_count_waiting_elements(engine_instance->read_ringbuffer, NULL); + aclk_lws_mutex_unlock(&engine_instance->read_buf_mutex); + } + } +} + +static inline struct lws_wss_packet_buffer *lws_wss_packet_buffer_new(void *data, size_t size) +{ + struct lws_wss_packet_buffer *new = callocz(1, sizeof(struct lws_wss_packet_buffer)); + if (data) { + new->data = mallocz(LWS_PRE + size); + memcpy(new->data + LWS_PRE, data, size); + new->data_size = size; + new->written = 0; + } + return new; +} + +static inline void lws_wss_packet_buffer_append(struct lws_wss_packet_buffer **list, struct lws_wss_packet_buffer *item) +{ + struct lws_wss_packet_buffer *tail = *list; + if (!*list) { + *list = item; + return; + } + while (tail->next) { + tail = tail->next; + } + tail->next = item; +} + +static inline struct lws_wss_packet_buffer *lws_wss_packet_buffer_pop(struct lws_wss_packet_buffer **list) +{ + struct lws_wss_packet_buffer *ret = *list; + if (ret != NULL) + *list = ret->next; + + return ret; +} + +static inline void lws_wss_packet_buffer_free(struct lws_wss_packet_buffer *item) +{ + freez(item->data); + freez(item); +} + +static inline void _aclk_lws_wss_read_buffer_clear(struct lws_ring *ringbuffer) +{ + size_t elems = lws_ring_get_count_waiting_elements(ringbuffer, NULL); + if (elems > 0) + lws_ring_consume(ringbuffer, NULL, NULL, elems); +} + +static inline void _aclk_lws_wss_write_buffer_clear(struct lws_wss_packet_buffer **list) +{ + struct lws_wss_packet_buffer *i; + while ((i = lws_wss_packet_buffer_pop(list)) != NULL) { + lws_wss_packet_buffer_free(i); + } + *list = NULL; +} + +static inline void aclk_lws_wss_clear_io_buffers() +{ + aclk_lws_mutex_lock(&engine_instance->read_buf_mutex); + _aclk_lws_wss_read_buffer_clear(engine_instance->read_ringbuffer); + aclk_lws_mutex_unlock(&engine_instance->read_buf_mutex); + aclk_lws_mutex_lock(&engine_instance->write_buf_mutex); + _aclk_lws_wss_write_buffer_clear(&engine_instance->write_buffer_head); + aclk_lws_mutex_unlock(&engine_instance->write_buf_mutex); +} + +static const struct lws_protocols protocols[] = { { "aclk-wss", aclk_lws_wss_callback, + sizeof(struct aclk_lws_wss_perconnect_data), 32768*4, 0, 0, 32768*4 }, + { NULL, NULL, 0, 0, 0, 0, 0 } }; + +static void aclk_lws_wss_log_divert(int level, const char *line) +{ + switch (level) { + case LLL_ERR: + error("Libwebsockets Error: %s", line); + break; + case LLL_WARN: + debug(D_ACLK, "Libwebsockets Warn: %s", line); + break; + default: + error("Libwebsockets try to log with unknown log level (%d), msg: %s", level, line); + } +} + +static int aclk_lws_wss_client_init( char *target_hostname, int target_port) +{ + static int lws_logging_initialized = 0; + + if (unlikely(!lws_logging_initialized)) { + lws_set_log_level(LLL_ERR | LLL_WARN, aclk_lws_wss_log_divert); + lws_logging_initialized = 1; + } + + if (!target_hostname) + return 1; + + engine_instance = callocz(1, sizeof(struct aclk_lws_wss_engine_instance)); + + engine_instance->host = target_hostname; + engine_instance->port = target_port; + + + aclk_lws_mutex_init(&engine_instance->write_buf_mutex); + aclk_lws_mutex_init(&engine_instance->read_buf_mutex); + + engine_instance->read_ringbuffer = lws_ring_create(1, ACLK_LWS_WSS_RECV_BUFF_SIZE_BYTES, NULL); + if (!engine_instance->read_ringbuffer) + goto failure_cleanup; + + return 0; + +failure_cleanup: + freez(engine_instance); + return 1; +} + +void aclk_lws_wss_destroy_context() +{ + if (!engine_instance) + return; + if (!engine_instance->lws_context) + return; + lws_context_destroy(engine_instance->lws_context); + engine_instance->lws_context = NULL; +} + + +void aclk_lws_wss_client_destroy() +{ + if (engine_instance == NULL) + return; + + aclk_lws_wss_destroy_context(); + engine_instance->lws_wsi = NULL; + + aclk_lws_wss_clear_io_buffers(); + +#ifdef ACLK_LWS_MOSQUITTO_IO_CALLS_MULTITHREADED + pthread_mutex_destroy(&engine_instance->write_buf_mutex); + pthread_mutex_destroy(&engine_instance->read_buf_mutex); +#endif +} + +#ifdef LWS_WITH_SOCKS5 +static int aclk_wss_set_socks(struct lws_vhost *vhost, const char *socks) +{ + char *proxy = strstr(socks, ACLK_PROXY_PROTO_ADDR_SEPARATOR); + + if (!proxy) + return -1; + + proxy += strlen(ACLK_PROXY_PROTO_ADDR_SEPARATOR); + + if (!*proxy) + return -1; + + return lws_set_socks(vhost, proxy); +} +#endif + +void aclk_wss_set_proxy(struct lws_vhost *vhost) +{ + const char *proxy; + ACLK_PROXY_TYPE proxy_type; + char *log; + + proxy = aclk_get_proxy(&proxy_type); + +#ifdef LWS_WITH_SOCKS5 + lws_set_socks(vhost, ":"); +#endif + lws_set_proxy(vhost, ":"); + + if (proxy_type == PROXY_TYPE_UNKNOWN) { + error("Unknown proxy type"); + return; + } + + if (proxy_type == PROXY_TYPE_SOCKS5 || proxy_type == PROXY_TYPE_HTTP) { + log = strdupz(proxy); + safe_log_proxy_censor(log); + info("Connecting using %s proxy:\"%s\"", aclk_proxy_type_to_s(&proxy_type), log); + freez(log); + } + if (proxy_type == PROXY_TYPE_SOCKS5) { +#ifdef LWS_WITH_SOCKS5 + if (aclk_wss_set_socks(vhost, proxy)) + error("LWS failed to accept socks proxy."); + return; +#else + fatal("We have no SOCKS5 support but we made it here. Programming error!"); +#endif + } + if (proxy_type == PROXY_TYPE_HTTP) { + if (lws_set_proxy(vhost, proxy)) + error("LWS failed to accept http proxy."); + return; + } + if (proxy_type != PROXY_DISABLED) + error("Unknown proxy type"); +} + +// Return code indicates if connection attempt has started async. +int aclk_lws_wss_connect(char *host, int port) +{ + struct lws_client_connect_info i; + struct lws_vhost *vhost; + int n; + + if (!engine_instance) { + if (aclk_lws_wss_client_init(host, port)) + return 1; // Propagate failure + } + + if (!engine_instance->lws_context) + { + // First time through (on this connection), create the context + struct lws_context_creation_info info; + memset(&info, 0, sizeof(struct lws_context_creation_info)); + info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT; + info.port = CONTEXT_PORT_NO_LISTEN; + info.protocols = protocols; + engine_instance->lws_context = lws_create_context(&info); + if (!engine_instance->lws_context) + { + error("Failed to create lws_context, ACLK will not function"); + return 1; + } + return 0; + // PROTOCOL_INIT callback will call again. + } + + for (n = 0; n < ACLK_LWS_CALLBACK_HISTORY; n++) + engine_instance->lws_callback_history[n] = 0; + + if (engine_instance->lws_wsi) { + error("Already Connected. Only one connection supported at a time."); + return 0; + } + + memset(&i, 0, sizeof(i)); + i.context = engine_instance->lws_context; + i.port = engine_instance->port; + i.address = engine_instance->host; + i.path = "/mqtt"; + i.host = engine_instance->host; + i.protocol = "mqtt"; + + // from LWS docu: + // If option LWS_SERVER_OPTION_EXPLICIT_VHOSTS is given, no vhost is + // created; you're expected to create your own vhosts afterwards using + // lws_create_vhost(). Otherwise a vhost named "default" is also created + // using the information in the vhost-related members, for compatibility. + vhost = lws_get_vhost_by_name(engine_instance->lws_context, "default"); + if(!vhost) + fatal("Could not find the default LWS vhost."); + + aclk_wss_set_proxy(vhost); + +#ifdef ACLK_SSL_ALLOW_SELF_SIGNED + i.ssl_connection = LCCSCF_USE_SSL | LCCSCF_ALLOW_SELFSIGNED | LCCSCF_SKIP_SERVER_CERT_HOSTNAME_CHECK | LCCSCF_ALLOW_INSECURE; + info("Disabling SSL certificate checks"); +#else + i.ssl_connection = LCCSCF_USE_SSL; +#endif +#if defined(HAVE_X509_VERIFY_PARAM_set1_host) && HAVE_X509_VERIFY_PARAM_set1_host == 0 +#warning DISABLING SSL HOSTNAME VALIDATION BECAUSE IT IS NOT AVAILABLE ON THIS SYSTEM. + i.ssl_connection |= LCCSCF_SKIP_SERVER_CERT_HOSTNAME_CHECK; +#endif + lws_client_connect_via_info(&i); + return 0; +} + +static inline int received_data_to_ringbuff(struct lws_ring *buffer, void *data, size_t len) +{ + if (lws_ring_insert(buffer, data, len) != len) { + error("ACLK_LWS_WSS_CLIENT: receive buffer full. Closing connection to prevent flooding."); + return 0; + } + return 1; +} + +static const char *aclk_lws_callback_name(enum lws_callback_reasons reason) +{ + switch (reason) { + case LWS_CALLBACK_CLIENT_WRITEABLE: + return "LWS_CALLBACK_CLIENT_WRITEABLE"; + case LWS_CALLBACK_CLIENT_RECEIVE: + return "LWS_CALLBACK_CLIENT_RECEIVE"; + case LWS_CALLBACK_PROTOCOL_INIT: + return "LWS_CALLBACK_PROTOCOL_INIT"; + case LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED: + return "LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED"; + case LWS_CALLBACK_USER: + return "LWS_CALLBACK_USER"; + case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: + return "LWS_CALLBACK_CLIENT_CONNECTION_ERROR"; + case LWS_CALLBACK_CLIENT_CLOSED: + return "LWS_CALLBACK_CLIENT_CLOSED"; + case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE: + return "LWS_CALLBACK_WS_PEER_INITIATED_CLOSE"; + case LWS_CALLBACK_WSI_DESTROY: + return "LWS_CALLBACK_WSI_DESTROY"; + case LWS_CALLBACK_CLIENT_ESTABLISHED: + return "LWS_CALLBACK_CLIENT_ESTABLISHED"; + case LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION: + return "LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION"; + case LWS_CALLBACK_EVENT_WAIT_CANCELLED: + return "LWS_CALLBACK_EVENT_WAIT_CANCELLED"; + default: + // Not using an internal buffer here for thread-safety with unknown calling context. + error("Unknown LWS callback %u", reason); + return "unknown"; + } +} + +void aclk_lws_wss_fail_report() +{ + int i; + int anything_to_send = 0; + BUFFER *buf; + + if (netdata_anonymous_statistics_enabled <= 0) + return; + + // guess - most of the callback will be 1-99 + ',' + \0 + buf = buffer_create((ACLK_LWS_CALLBACK_HISTORY * 2) + 10); + + for (i = 0; i < ACLK_LWS_CALLBACK_HISTORY; i++) + if (engine_instance->lws_callback_history[i]) { + buffer_sprintf(buf, "%s%d", (i ? "," : ""), engine_instance->lws_callback_history[i]); + anything_to_send = 1; + } + + if (anything_to_send) + send_statistics("ACLK_CONN_FAIL", "FAIL", buffer_tostring(buf)); + + buffer_free(buf); +} + +static int aclk_lws_wss_callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) +{ + UNUSED(user); + struct lws_wss_packet_buffer *data; + int retval = 0; + static int lws_shutting_down = 0; + int i; + + for (i = ACLK_LWS_CALLBACK_HISTORY - 1; i > 0; i--) + engine_instance->lws_callback_history[i] = engine_instance->lws_callback_history[i - 1]; + engine_instance->lws_callback_history[0] = (int)reason; + + if (unlikely(aclk_shutting_down && !lws_shutting_down)) { + lws_shutting_down = 1; + retval = -1; + engine_instance->upstream_reconnect_request = 0; + } + + // Callback servicing is forced when we are closed from above. + if (engine_instance->upstream_reconnect_request) { + error("Closing lws connectino due to libmosquitto error."); + char *upstream_connection_error = "MQTT protocol error. Closing underlying wss connection."; + lws_close_reason( + wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, (unsigned char *)upstream_connection_error, + strlen(upstream_connection_error)); + retval = -1; + engine_instance->upstream_reconnect_request = 0; + } + + // Don't log to info - volume is proportional to message flow on ACLK. + switch (reason) { + case LWS_CALLBACK_CLIENT_WRITEABLE: + aclk_lws_mutex_lock(&engine_instance->write_buf_mutex); + data = engine_instance->write_buffer_head; + if (likely(data)) { + size_t bytes_left = data->data_size - data->written; + if ( bytes_left > FRAGMENT_SIZE) + bytes_left = FRAGMENT_SIZE; + int n = lws_write(wsi, data->data + LWS_PRE + data->written, bytes_left, LWS_WRITE_BINARY); + if (n>=0) { + data->written += n; + if (aclk_stats_enabled) { + ACLK_STATS_LOCK; + aclk_metrics_per_sample.write_q_consumed += n; + ACLK_STATS_UNLOCK; + } + } + //error("lws_write(req=%u,written=%u) %zu of %zu",bytes_left, rc, data->written,data->data_size,rc); + if (data->written == data->data_size) + { + lws_wss_packet_buffer_pop(&engine_instance->write_buffer_head); + lws_wss_packet_buffer_free(data); + } + if (engine_instance->write_buffer_head) + lws_callback_on_writable(engine_instance->lws_wsi); + } + aclk_lws_mutex_unlock(&engine_instance->write_buf_mutex); + return retval; + + case LWS_CALLBACK_CLIENT_RECEIVE: + aclk_lws_mutex_lock(&engine_instance->read_buf_mutex); + if (!received_data_to_ringbuff(engine_instance->read_ringbuffer, in, len)) + retval = 1; + aclk_lws_mutex_unlock(&engine_instance->read_buf_mutex); + if (aclk_stats_enabled) { + ACLK_STATS_LOCK; + aclk_metrics_per_sample.read_q_added += len; + ACLK_STATS_UNLOCK; + } + + // to future myself -> do not call this while read lock is active as it will eventually + // want to acquire same lock later in aclk_lws_wss_client_read() function + aclk_lws_connection_data_received(); + return retval; + + case LWS_CALLBACK_WSI_CREATE: + case LWS_CALLBACK_CLIENT_FILTER_PRE_ESTABLISH: + case LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER: + case LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS: + case LWS_CALLBACK_GET_THREAD_ID: // ? + case LWS_CALLBACK_EVENT_WAIT_CANCELLED: + case LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION: + // Expected and safe to ignore. + debug(D_ACLK, "Ignoring expected callback from LWS: %s", aclk_lws_callback_name(reason)); + return retval; + + default: + // Pass to next switch, this case removes compiler warnings. + break; + } + // Log to info - volume is proportional to connection attempts. + info("Processing callback %s", aclk_lws_callback_name(reason)); + switch (reason) { + case LWS_CALLBACK_PROTOCOL_INIT: + aclk_lws_wss_connect(engine_instance->host, engine_instance->port); // Makes the outgoing connection + break; + case LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED: + if (engine_instance->lws_wsi != NULL && engine_instance->lws_wsi != wsi) + error("Multiple connections on same WSI? %p vs %p", engine_instance->lws_wsi, wsi); + engine_instance->lws_wsi = wsi; + break; + case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: + error( + "Could not connect MQTT over WSS server \"%s:%d\". LwsReason:\"%s\"", engine_instance->host, + engine_instance->port, (in ? (char *)in : "not given")); + // Fall-through + case LWS_CALLBACK_CLIENT_CLOSED: + case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE: + engine_instance->lws_wsi = NULL; // inside libwebsockets lws_close_free_wsi is called after callback + aclk_lws_connection_closed(); + return -1; // the callback response is ignored, hope the above remains true + case LWS_CALLBACK_WSI_DESTROY: + aclk_lws_wss_clear_io_buffers(); + if (!engine_instance->websocket_connection_up) + aclk_lws_wss_fail_report(); + engine_instance->lws_wsi = NULL; + engine_instance->websocket_connection_up = 0; + aclk_lws_connection_closed(); + break; + case LWS_CALLBACK_CLIENT_ESTABLISHED: + engine_instance->websocket_connection_up = 1; + aclk_lws_connection_established(engine_instance->host, engine_instance->port); + break; + + default: + error("Unexpected callback from libwebsockets %s", aclk_lws_callback_name(reason)); + break; + } + return retval; //0-OK, other connection should be closed! +} + +int aclk_lws_wss_client_write(void *buf, size_t count) +{ + if (engine_instance && engine_instance->lws_wsi && engine_instance->websocket_connection_up) { + aclk_lws_mutex_lock(&engine_instance->write_buf_mutex); + lws_wss_packet_buffer_append(&engine_instance->write_buffer_head, lws_wss_packet_buffer_new(buf, count)); + aclk_lws_mutex_unlock(&engine_instance->write_buf_mutex); + + if (aclk_stats_enabled) { + ACLK_STATS_LOCK; + aclk_metrics_per_sample.write_q_added += count; + ACLK_STATS_UNLOCK; + } + + lws_callback_on_writable(engine_instance->lws_wsi); + return count; + } + return 0; +} + +int aclk_lws_wss_client_read(void *buf, size_t count) +{ + size_t data_to_be_read = count; + + aclk_lws_mutex_lock(&engine_instance->read_buf_mutex); + size_t readable_byte_count = lws_ring_get_count_waiting_elements(engine_instance->read_ringbuffer, NULL); + if (unlikely(readable_byte_count == 0)) { + errno = EAGAIN; + data_to_be_read = -1; + goto abort; + } + + if (readable_byte_count < data_to_be_read) + data_to_be_read = readable_byte_count; + + data_to_be_read = lws_ring_consume(engine_instance->read_ringbuffer, NULL, buf, data_to_be_read); + if (data_to_be_read == readable_byte_count) + engine_instance->data_to_read = 0; + + if (aclk_stats_enabled) { + ACLK_STATS_LOCK; + aclk_metrics_per_sample.read_q_consumed += data_to_be_read; + ACLK_STATS_UNLOCK; + } + +abort: + aclk_lws_mutex_unlock(&engine_instance->read_buf_mutex); + return data_to_be_read; +} + +void aclk_lws_wss_service_loop() +{ + if (engine_instance) + { + /*if (engine_instance->lws_wsi) { + lws_cancel_service(engine_instance->lws_context); + lws_callback_on_writable(engine_instance->lws_wsi); + }*/ + lws_service(engine_instance->lws_context, 0); + } +} + +// in case the MQTT connection disconnect while lws transport is still operational +// we should drop connection and reconnect +// this function should be called when that happens to notify lws of that situation +void aclk_lws_wss_mqtt_layer_disconect_notif() +{ + if (!engine_instance) + return; + if (engine_instance->lws_wsi && engine_instance->websocket_connection_up) { + engine_instance->upstream_reconnect_request = 1; + lws_callback_on_writable( + engine_instance->lws_wsi); //here we just do it to ensure we get callback called from lws, we don't need any actual data to be written. + } +} -- cgit v1.2.3