diff options
Diffstat (limited to 'src/aclk/mqtt_websockets/ws_client.c')
-rw-r--r-- | src/aclk/mqtt_websockets/ws_client.c | 262 |
1 files changed, 99 insertions, 163 deletions
diff --git a/src/aclk/mqtt_websockets/ws_client.c b/src/aclk/mqtt_websockets/ws_client.c index a6b9b23f3..99ea266c8 100644 --- a/src/aclk/mqtt_websockets/ws_client.c +++ b/src/aclk/mqtt_websockets/ws_client.c @@ -1,103 +1,43 @@ -// Copyright (C) 2020 Timotej Šiškovič -// SPDX-License-Identifier: GPL-3.0-only -// -// This program is free software: you can redistribute it and/or modify it -// under the terms of the GNU General Public License as published by the Free Software Foundation, version 3. -// -// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; -// without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. -// See the GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License along with this program. -// If not, see <https://www.gnu.org/licenses/>. - -#include <fcntl.h> -#include <unistd.h> -#include <string.h> -#include <errno.h> -#include <ctype.h> - -#include <openssl/evp.h> +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "libnetdata/libnetdata.h" #include "ws_client.h" #include "common_internal.h" -#ifdef MQTT_WEBSOCKETS_DEBUG -#include "../c-rbuf/src/ringbuffer_internal.h" -#endif - -#define UNIT_LOG_PREFIX "ws_client: " -#define FATAL(fmt, ...) mws_fatal(client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__) -#define ERROR(fmt, ...) mws_error(client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__) -#define WARN(fmt, ...) mws_warn (client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__) -#define INFO(fmt, ...) mws_info (client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__) -#define DEBUG(fmt, ...) mws_debug(client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__) - const char *websocket_upgrage_hdr = "GET /mqtt HTTP/1.1\x0D\x0A" "Host: %s\x0D\x0A" "Upgrade: websocket\x0D\x0A" "Connection: Upgrade\x0D\x0A" "Sec-WebSocket-Key: %s\x0D\x0A" - "Origin: http://example.com\x0D\x0A" + "Origin: \x0D\x0A" "Sec-WebSocket-Protocol: mqtt\x0D\x0A" "Sec-WebSocket-Version: 13\x0D\x0A\x0D\x0A"; const char *mqtt_protoid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; #define DEFAULT_RINGBUFFER_SIZE (1024*128) -#define ENTROPY_SOURCE "/dev/urandom" -ws_client *ws_client_new(size_t buf_size, char **host, mqtt_wss_log_ctx_t log) -{ - ws_client *client; +ws_client *ws_client_new(size_t buf_size, char **host) +{ if(!host) return NULL; - client = callocz(1, sizeof(ws_client)); - if (!client) - return NULL; - + ws_client *client = callocz(1, sizeof(ws_client)); client->host = host; - client->log = log; - client->buf_read = rbuf_create(buf_size ? buf_size : DEFAULT_RINGBUFFER_SIZE); - if (!client->buf_read) - goto cleanup; - client->buf_write = rbuf_create(buf_size ? buf_size : DEFAULT_RINGBUFFER_SIZE); - if (!client->buf_write) - goto cleanup_1; - client->buf_to_mqtt = rbuf_create(buf_size ? buf_size : DEFAULT_RINGBUFFER_SIZE); - if (!client->buf_to_mqtt) - goto cleanup_2; - - client->entropy_fd = open(ENTROPY_SOURCE, O_RDONLY | O_CLOEXEC); - if (client->entropy_fd < 1) { - ERROR("Error opening entropy source \"" ENTROPY_SOURCE "\". Reason: \"%s\"", strerror(errno)); - goto cleanup_3; - } return client; - -cleanup_3: - rbuf_free(client->buf_to_mqtt); -cleanup_2: - rbuf_free(client->buf_write); -cleanup_1: - rbuf_free(client->buf_read); -cleanup: - freez(client); - return NULL; } void ws_client_free_headers(ws_client *client) { struct http_header *ptr = client->hs.headers; - struct http_header *tmp; while (ptr) { - tmp = ptr; + struct http_header *tmp = ptr; ptr = ptr->next; freez(tmp); } @@ -112,7 +52,6 @@ void ws_client_destroy(ws_client *client) ws_client_free_headers(client); freez(client->hs.nonce_reply); freez(client->hs.http_reply_msg); - close(client->entropy_fd); rbuf_free(client->buf_read); rbuf_free(client->buf_write); rbuf_free(client->buf_to_mqtt); @@ -141,7 +80,7 @@ void ws_client_reset(ws_client *client) int ws_client_add_http_header(ws_client *client, struct http_header *hdr) { if (client->hs.hdr_count > MAX_HTTP_HDR_COUNT) { - ERROR("Too many HTTP response header fields"); + nd_log(NDLS_DAEMON, NDLP_ERR, "Too many HTTP response header fields"); return -1; } @@ -156,7 +95,7 @@ int ws_client_add_http_header(ws_client *client, struct http_header *hdr) return 0; } -int ws_client_want_write(ws_client *client) +int ws_client_want_write(const ws_client *client) { return rbuf_bytes_available(client->buf_write); } @@ -165,78 +104,89 @@ int ws_client_want_write(ws_client *client) #define TEMP_BUF_SIZE 4096 int ws_client_start_handshake(ws_client *client) { - nd_uuid_t nonce; + unsigned char nonce[WEBSOCKET_NONCE_SIZE]; char nonce_b64[256]; char second[TEMP_BUF_SIZE]; unsigned int md_len; - unsigned char *digest; + unsigned char digest[EVP_MAX_MD_SIZE]; // EVP_MAX_MD_SIZE ensures enough space EVP_MD_CTX *md_ctx; const EVP_MD *md; + int rc = 1; if(!client->host || !*client->host) { - ERROR("Hostname has not been set. We should not be able to come here!"); - return 1; - } - - uuid_generate_random(nonce); - EVP_EncodeBlock((unsigned char *)nonce_b64, (const unsigned char *)nonce, WEBSOCKET_NONCE_SIZE); - snprintf(second, TEMP_BUF_SIZE, websocket_upgrage_hdr, *client->host, nonce_b64); - - if(rbuf_bytes_free(client->buf_write) < strlen(second)) { - ERROR("Write buffer capacity too low."); + nd_log(NDLS_DAEMON, NDLP_ERR, "Hostname has not been set. We should not be able to come here!"); return 1; } - rbuf_push(client->buf_write, second, strlen(second)); - client->state = WS_HANDSHAKE; - - //Calculating expected Sec-WebSocket-Accept reply - snprintf(second, TEMP_BUF_SIZE, "%s%s", nonce_b64, mqtt_protoid); + // Generate a random 16-byte nonce + os_random_bytes(nonce, sizeof(nonce)); + // Initialize the digest context #if (OPENSSL_VERSION_NUMBER < OPENSSL_VERSION_110) md_ctx = EVP_MD_CTX_create(); #else md_ctx = EVP_MD_CTX_new(); #endif if (md_ctx == NULL) { - ERROR("Cant create EVP_MD Context"); + nd_log(NDLS_DAEMON, NDLP_ERR, "Can't create EVP_MD context"); return 1; } - md = EVP_get_digestbyname("sha1"); + md = EVP_sha1(); // Use SHA-1 for WebSocket handshake if (!md) { - ERROR("Unknown message digest"); - return 1; + nd_log(NDLS_DAEMON, NDLP_ERR, "Unknown message digest SHA-1"); + goto exit_with_error; } - if ((digest = (unsigned char *)OPENSSL_malloc(EVP_MD_size(EVP_sha256()))) == NULL) { - ERROR("Cant alloc digest"); - return 1; + (void) netdata_base64_encode((unsigned char *) nonce_b64, nonce, WEBSOCKET_NONCE_SIZE); + + // Format and push the upgrade header to the write buffer + size_t bytes = snprintf(second, TEMP_BUF_SIZE, websocket_upgrage_hdr, *client->host, nonce_b64); + if(rbuf_bytes_free(client->buf_write) < bytes) { + nd_log(NDLS_DAEMON, NDLP_ERR, "Write buffer capacity too low."); + goto exit_with_error; } + rbuf_push(client->buf_write, second, bytes); + + client->state = WS_HANDSHAKE; - EVP_DigestInit_ex(md_ctx, md, NULL); - EVP_DigestUpdate(md_ctx, second, strlen(second)); - EVP_DigestFinal_ex(md_ctx, digest, &md_len); + // Create the expected Sec-WebSocket-Accept value + bytes = snprintf(second, TEMP_BUF_SIZE, "%s%s", nonce_b64, mqtt_protoid); - EVP_EncodeBlock((unsigned char *)nonce_b64, digest, (int) md_len); + if (!EVP_DigestInit_ex(md_ctx, md, NULL)) { + nd_log(NDLS_DAEMON, NDLP_ERR, "Failed to initialize digest context"); + goto exit_with_error; + } + + if (!EVP_DigestUpdate(md_ctx, second, bytes)) { + nd_log(NDLS_DAEMON, NDLP_ERR, "Failed to update digest"); + goto exit_with_error; + } + + if (!EVP_DigestFinal_ex(md_ctx, digest, &md_len)) { + nd_log(NDLS_DAEMON, NDLP_ERR, "Failed to finalize digest"); + goto exit_with_error; + } + + (void) netdata_base64_encode((unsigned char *) nonce_b64, digest, md_len); freez(client->hs.nonce_reply); client->hs.nonce_reply = strdupz(nonce_b64); + rc = 0; - OPENSSL_free(digest); - +exit_with_error: #if (OPENSSL_VERSION_NUMBER < OPENSSL_VERSION_110) EVP_MD_CTX_destroy(md_ctx); #else EVP_MD_CTX_free(md_ctx); #endif - return 0; + return rc; } #define BUF_READ_MEMCMP_CONST(const, err) \ if (rbuf_memcmp_n(client->buf_read, const, strlen(const))) { \ - ERROR(err); \ + nd_log(NDLS_DAEMON, NDLP_ERR, err); \ rbuf_flush(client->buf_read); \ return WS_CLIENT_PROTOCOL_ERROR; \ } @@ -262,7 +212,7 @@ int ws_client_start_handshake(ws_client *client) #define HTTP_HDR_LINE_CHECK_LIMIT(x) \ if ((x) >= MAX_HTTP_LINE_LENGTH) { \ - ERROR("HTTP line received is too long. Maximum is %d", MAX_HTTP_LINE_LENGTH); \ + nd_log(NDLS_DAEMON, NDLP_ERR, "HTTP line received is too long. Maximum is %d", MAX_HTTP_LINE_LENGTH); \ return WS_CLIENT_PROTOCOL_ERROR; \ } @@ -285,13 +235,13 @@ int ws_client_parse_handshake_resp(ws_client *client) BUF_READ_CHECK_AT_LEAST(HTTP_SC_LENGTH); // "XXX " http return code rbuf_pop(client->buf_read, buf, HTTP_SC_LENGTH); if (buf[HTTP_SC_LENGTH - 1] != 0x20) { - ERROR("HTTP status code received is not terminated by space (0x20)"); + nd_log(NDLS_DAEMON, NDLP_ERR, "HTTP status code received is not terminated by space (0x20)"); return WS_CLIENT_PROTOCOL_ERROR; } buf[HTTP_SC_LENGTH - 1] = 0; client->hs.http_code = atoi(buf); if (client->hs.http_code < 100 || client->hs.http_code >= 600) { - ERROR("HTTP status code received not in valid range 100-600"); + nd_log(NDLS_DAEMON, NDLP_ERR, "HTTP status code received not in valid range 100-600"); return WS_CLIENT_PROTOCOL_ERROR; } client->hs.hdr_state = WS_HDR_ENDLINE; @@ -330,16 +280,16 @@ int ws_client_parse_handshake_resp(ws_client *client) ptr = rbuf_find_bytes(client->buf_read, HTTP_HDR_SEPARATOR, strlen(HTTP_HDR_SEPARATOR), &idx_sep); if (!ptr || idx_sep > idx_crlf) { - ERROR("Expected HTTP hdr field key/value separator \": \" before endline in non empty HTTP header line"); + nd_log(NDLS_DAEMON, NDLP_ERR, "Expected HTTP hdr field key/value separator \": \" before endline in non empty HTTP header line"); return WS_CLIENT_PROTOCOL_ERROR; } if (idx_crlf == idx_sep + (int)strlen(HTTP_HDR_SEPARATOR)) { - ERROR("HTTP Header value cannot be empty"); + nd_log(NDLS_DAEMON, NDLP_ERR, "HTTP Header value cannot be empty"); return WS_CLIENT_PROTOCOL_ERROR; } if (idx_sep > HTTP_HEADER_NAME_MAX_LEN) { - ERROR("HTTP header too long (%d)", idx_sep); + nd_log(NDLS_DAEMON, NDLP_ERR, "HTTP header too long (%d)", idx_sep); return WS_CLIENT_PROTOCOL_ERROR; } @@ -347,23 +297,21 @@ int ws_client_parse_handshake_resp(ws_client *client) hdr->key = ((char*)hdr) + sizeof(struct http_header); hdr->value = hdr->key + idx_sep + 1; - bytes = rbuf_pop(client->buf_read, hdr->key, idx_sep); + rbuf_pop(client->buf_read, hdr->key, idx_sep); rbuf_bump_tail(client->buf_read, strlen(HTTP_HDR_SEPARATOR)); - bytes = rbuf_pop(client->buf_read, hdr->value, idx_crlf - idx_sep - strlen(HTTP_HDR_SEPARATOR)); + rbuf_pop(client->buf_read, hdr->value, idx_crlf - idx_sep - strlen(HTTP_HDR_SEPARATOR)); rbuf_bump_tail(client->buf_read, strlen(WS_HTTP_NEWLINE)); for (int i = 0; hdr->key[i]; i++) hdr->key[i] = tolower(hdr->key[i]); -// DEBUG("HTTP header \"%s\" received. Value \"%s\"", hdr->key, hdr->value); - if (ws_client_add_http_header(client, hdr)) return WS_CLIENT_PROTOCOL_ERROR; if (!strcmp(hdr->key, WS_CONN_ACCEPT)) { if (strcmp(client->hs.nonce_reply, hdr->value)) { - ERROR("Received NONCE \"%s\" does not match expected nonce of \"%s\"", hdr->value, client->hs.nonce_reply); + nd_log(NDLS_DAEMON, NDLP_ERR, "Received NONCE \"%s\" does not match expected nonce of \"%s\"", hdr->value, client->hs.nonce_reply); return WS_CLIENT_PROTOCOL_ERROR; } client->hs.nonce_matched = 1; @@ -373,21 +321,21 @@ int ws_client_parse_handshake_resp(ws_client *client) case WS_HDR_PARSE_DONE: if (!client->hs.nonce_matched) { - ERROR("Missing " WS_CONN_ACCEPT " header"); + nd_log(NDLS_DAEMON, NDLP_ERR, "Missing " WS_CONN_ACCEPT " header"); return WS_CLIENT_PROTOCOL_ERROR; } if (client->hs.http_code != 101) { - ERROR("HTTP return code not 101. Received %d with msg \"%s\".", client->hs.http_code, client->hs.http_reply_msg); + nd_log(NDLS_DAEMON, NDLP_ERR, "HTTP return code not 101. Received %d with msg \"%s\".", client->hs.http_code, client->hs.http_reply_msg); return WS_CLIENT_PROTOCOL_ERROR; } client->state = WS_ESTABLISHED; client->hs.hdr_state = WS_HDR_ALL_DONE; - INFO("Websocket Connection Accepted By Server"); + nd_log(NDLS_DAEMON, NDLP_INFO, "Websocket Connection Accepted By Server"); return WS_CLIENT_PARSING_DONE; case WS_HDR_ALL_DONE: - FATAL("This is error we should never come here!"); + nd_log(NDLS_DAEMON, NDLP_CRIT, "This is error we should never come here!"); return WS_CLIENT_PROTOCOL_ERROR; } return 0; @@ -397,7 +345,7 @@ int ws_client_parse_handshake_resp(ws_client *client) #define WS_FINAL_FRAG BYTE_MSB #define WS_PAYLOAD_MASKED BYTE_MSB -static inline size_t get_ws_hdr_size(size_t payload_size) +static size_t get_ws_hdr_size(size_t payload_size) { size_t hdr_len = 2 + 4 /*mask*/; if(payload_size > 125) @@ -408,7 +356,7 @@ static inline size_t get_ws_hdr_size(size_t payload_size) } #define MAX_POSSIBLE_HDR_LEN 14 -int ws_client_send(ws_client *client, enum websocket_opcode frame_type, const char *data, size_t size) +int ws_client_send(const ws_client *client, enum websocket_opcode frame_type, const char *data, size_t size) { // TODO maybe? implement fragmenting, it is not necessary though // as both tested MQTT brokers have no reuirement of one MQTT envelope @@ -416,24 +364,16 @@ int ws_client_send(ws_client *client, enum websocket_opcode frame_type, const ch // one big MQTT message as single fragmented WebSocket envelope char hdr[MAX_POSSIBLE_HDR_LEN]; char *ptr = hdr; - char *mask; int size_written = 0; size_t j = 0; size_t w_buff_free = rbuf_bytes_free(client->buf_write); size_t hdr_len = get_ws_hdr_size(size); - if (w_buff_free < hdr_len * 2) { -#ifdef DEBUG_ULTRA_VERBOSE - DEBUG("Write buffer full. Can't write requested %d size.", size); -#endif + if (w_buff_free < hdr_len * 2) return 0; - } if (w_buff_free < (hdr_len + size)) { -#ifdef DEBUG_ULTRA_VERBOSE - DEBUG("Can't write whole MQTT packet of %d bytes into the buffer. Will do partial send of %d.", size, w_buff_free - hdr_len); -#endif size = w_buff_free - hdr_len; hdr_len = get_ws_hdr_size(size); // the actual needed header size might decrease if we cut number of bytes @@ -459,12 +399,10 @@ int ws_client_send(ws_client *client, enum websocket_opcode frame_type, const ch ptr += sizeof(be); } else *ptr++ |= size; - - mask = ptr; - if (read(client->entropy_fd, mask, sizeof(uint32_t)) < (ssize_t)sizeof(uint32_t)) { - ERROR("Unable to get mask from \"" ENTROPY_SOURCE "\""); - return -2; - } + + char *mask = ptr; + uint32_t mask32 = os_random32() + 1; + memcpy(mask, &mask32, sizeof(mask32)); rbuf_push(client->buf_write, hdr, hdr_len); @@ -490,7 +428,7 @@ int ws_client_send(ws_client *client, enum websocket_opcode frame_type, const ch return size_written; } -static int check_opcode(ws_client *client,enum websocket_opcode oc) +static int check_opcode(enum websocket_opcode oc) { switch(oc) { case WS_OP_BINARY_FRAME: @@ -498,34 +436,34 @@ static int check_opcode(ws_client *client,enum websocket_opcode oc) case WS_OP_PING: return 0; case WS_OP_CONTINUATION_FRAME: - FATAL("WS_OP_CONTINUATION_FRAME NOT IMPLEMENTED YET!!!!"); + nd_log(NDLS_DAEMON, NDLP_ERR, "WS_OP_CONTINUATION_FRAME NOT IMPLEMENTED YET!!!!"); return 0; case WS_OP_TEXT_FRAME: - FATAL("WS_OP_TEXT_FRAME NOT IMPLEMENTED YET!!!!"); + nd_log(NDLS_DAEMON, NDLP_ERR, "WS_OP_TEXT_FRAME NOT IMPLEMENTED YET!!!!"); return 0; case WS_OP_PONG: - FATAL("WS_OP_PONG NOT IMPLEMENTED YET!!!!"); + nd_log(NDLS_DAEMON, NDLP_ERR, "WS_OP_PONG NOT IMPLEMENTED YET!!!!"); return 0; default: return WS_CLIENT_PROTOCOL_ERROR; } } -static inline void ws_client_rx_post_hdr_state(ws_client *client) +static void ws_client_rx_post_hdr_state(ws_client *client) { switch(client->rx.opcode) { case WS_OP_BINARY_FRAME: client->rx.parse_state = WS_PAYLOAD_DATA; - return; + break; case WS_OP_CONNECTION_CLOSE: client->rx.parse_state = WS_PAYLOAD_CONNECTION_CLOSE; - return; + break; case WS_OP_PING: client->rx.parse_state = WS_PAYLOAD_PING_REQ_PAYLOAD; - return; + break; default: client->rx.parse_state = WS_PAYLOAD_SKIP_UNKNOWN_PAYLOAD; - return; + break; } } @@ -541,15 +479,15 @@ int ws_client_process_rx_ws(ws_client *client) client->rx.opcode = buf[0] & (char)~BYTE_MSB; if (!(buf[0] & (char)~WS_FINAL_FRAG)) { - ERROR("Not supporting fragmented messages yet!"); + nd_log(NDLS_DAEMON, NDLP_ERR, "Not supporting fragmented messages yet!"); return WS_CLIENT_PROTOCOL_ERROR; } - if (check_opcode(client, client->rx.opcode) == WS_CLIENT_PROTOCOL_ERROR) + if (check_opcode(client->rx.opcode) == WS_CLIENT_PROTOCOL_ERROR) return WS_CLIENT_PROTOCOL_ERROR; if (buf[1] & (char)WS_PAYLOAD_MASKED) { - ERROR("Mask is not allowed in Server->Client Websocket direction."); + nd_log(NDLS_DAEMON, NDLP_ERR, "Mask is not allowed in Server->Client Websocket direction."); return WS_CLIENT_PROTOCOL_ERROR; } @@ -584,12 +522,8 @@ int ws_client_process_rx_ws(ws_client *client) if (!rbuf_bytes_available(client->buf_read)) return WS_CLIENT_NEED_MORE_BYTES; char *insert = rbuf_get_linear_insert_range(client->buf_to_mqtt, &size); - if (!insert) { -#ifdef DEBUG_ULTRA_VERBOSE - DEBUG("BUFFER TOO FULL. Avail %d req %d", (int)size, (int)remaining); -#endif + if (!insert) return WS_CLIENT_BUFFER_FULL; - } size = (size > remaining) ? remaining : size; size = rbuf_pop(client->buf_read, insert, size); rbuf_bump_head(client->buf_to_mqtt, size); @@ -603,11 +537,11 @@ int ws_client_process_rx_ws(ws_client *client) // b) 2byte reason code // c) 2byte reason code followed by message if (client->rx.payload_length == 1) { - ERROR("WebScoket CONNECTION_CLOSE can't have payload of size 1"); + nd_log(NDLS_DAEMON, NDLP_ERR, "WebScoket CONNECTION_CLOSE can't have payload of size 1"); return WS_CLIENT_PROTOCOL_ERROR; } if (!client->rx.payload_length) { - INFO("WebSocket server closed the connection without giving reason."); + nd_log(NDLS_DAEMON, NDLP_INFO, "WebSocket server closed the connection without giving reason."); client->rx.parse_state = WS_PACKET_DONE; break; } @@ -621,7 +555,7 @@ int ws_client_process_rx_ws(ws_client *client) client->rx.payload_processed += sizeof(uint16_t); if(client->rx.payload_processed == client->rx.payload_length) { - INFO("WebSocket server closed the connection with EC=%d. Without message.", + nd_log(NDLS_DAEMON, NDLP_INFO, "WebSocket server closed the connection with EC=%d. Without message.", client->rx.specific_data.op_close.ec); client->rx.parse_state = WS_PACKET_DONE; break; @@ -640,7 +574,7 @@ int ws_client_process_rx_ws(ws_client *client) client->rx.payload_length - client->rx.payload_processed); } client->rx.specific_data.op_close.reason[client->rx.payload_length] = 0; - INFO("WebSocket server closed the connection with EC=%d and reason \"%s\"", + nd_log(NDLS_DAEMON, NDLP_INFO, "WebSocket server closed the connection with EC=%d and reason \"%s\"", client->rx.specific_data.op_close.ec, client->rx.specific_data.op_close.reason); freez(client->rx.specific_data.op_close.reason); @@ -649,14 +583,14 @@ int ws_client_process_rx_ws(ws_client *client) break; case WS_PAYLOAD_SKIP_UNKNOWN_PAYLOAD: BUF_READ_CHECK_AT_LEAST(client->rx.payload_length); - WARN("Skipping Websocket Packet of unsupported/unknown type"); + nd_log(NDLS_DAEMON, NDLP_WARNING, "Skipping Websocket Packet of unsupported/unknown type"); if (client->rx.payload_length) rbuf_bump_tail(client->buf_read, client->rx.payload_length); client->rx.parse_state = WS_PACKET_DONE; return WS_CLIENT_PARSING_DONE; case WS_PAYLOAD_PING_REQ_PAYLOAD: if (client->rx.payload_length > rbuf_get_capacity(client->buf_read) / 2) { - ERROR("Ping arrived with payload which is too big!"); + nd_log(NDLS_DAEMON, NDLP_ERR, "Ping arrived with payload which is too big!"); return WS_CLIENT_INTERNAL_ERROR; } BUF_READ_CHECK_AT_LEAST(client->rx.payload_length); @@ -666,7 +600,7 @@ int ws_client_process_rx_ws(ws_client *client) // then attempt to send as soon as buffer space clears up size = ws_client_send(client, WS_OP_PONG, client->rx.specific_data.ping_msg, client->rx.payload_length); if (size != client->rx.payload_length) { - ERROR("Unable to send the PONG as one packet back. Closing connection."); + nd_log(NDLS_DAEMON, NDLP_ERR, "Unable to send the PONG as one packet back. Closing connection."); return WS_CLIENT_PROTOCOL_ERROR; } client->rx.parse_state = WS_PACKET_DONE; @@ -678,7 +612,7 @@ int ws_client_process_rx_ws(ws_client *client) return WS_CLIENT_CONNECTION_CLOSED; return WS_CLIENT_PARSING_DONE; default: - FATAL("Unknown parse state"); + nd_log(NDLS_DAEMON, NDLP_ERR, "Unknown parse state"); return WS_CLIENT_INTERNAL_ERROR; } return 0; @@ -711,6 +645,8 @@ int ws_client_process(ws_client *client) case WS_CLIENT_CONNECTION_CLOSED: client->state = WS_CONN_CLOSED_GRACEFUL; break; + default: + break; } // if ret == 0 we can continue parsing // if ret == WS_CLIENT_PARSING_DONE we processed @@ -719,13 +655,13 @@ int ws_client_process(ws_client *client) } while (!ret || ret == WS_CLIENT_PARSING_DONE); break; case WS_ERROR: - ERROR("ws_client is in error state. Restart the connection!"); + nd_log(NDLS_DAEMON, NDLP_ERR, "ws_client is in error state. Restart the connection!"); return WS_CLIENT_PROTOCOL_ERROR; case WS_CONN_CLOSED_GRACEFUL: - ERROR("Connection has been gracefully closed. Calling this is useless (and probably bug) until you reconnect again."); + nd_log(NDLS_DAEMON, NDLP_ERR, "Connection has been gracefully closed. Calling this is useless (and probably bug) until you reconnect again."); return WS_CLIENT_CONNECTION_CLOSED; default: - FATAL("Unknown connection state! Probably memory corruption."); + nd_log(NDLS_DAEMON, NDLP_CRIT, "Unknown connection state! Probably memory corruption."); return WS_CLIENT_INTERNAL_ERROR; } return ret; |