summaryrefslogtreecommitdiffstats
path: root/src/aclk/mqtt_websockets/ws_client.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/aclk/mqtt_websockets/ws_client.c')
-rw-r--r--src/aclk/mqtt_websockets/ws_client.c262
1 files changed, 99 insertions, 163 deletions
diff --git a/src/aclk/mqtt_websockets/ws_client.c b/src/aclk/mqtt_websockets/ws_client.c
index a6b9b23f3..99ea266c8 100644
--- a/src/aclk/mqtt_websockets/ws_client.c
+++ b/src/aclk/mqtt_websockets/ws_client.c
@@ -1,103 +1,43 @@
-// Copyright (C) 2020 Timotej Šiškovič
-// SPDX-License-Identifier: GPL-3.0-only
-//
-// This program is free software: you can redistribute it and/or modify it
-// under the terms of the GNU General Public License as published by the Free Software Foundation, version 3.
-//
-// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
-// without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
-// See the GNU General Public License for more details.
-//
-// You should have received a copy of the GNU General Public License along with this program.
-// If not, see <https://www.gnu.org/licenses/>.
-
-#include <fcntl.h>
-#include <unistd.h>
-#include <string.h>
-#include <errno.h>
-#include <ctype.h>
-
-#include <openssl/evp.h>
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "libnetdata/libnetdata.h"
#include "ws_client.h"
#include "common_internal.h"
-#ifdef MQTT_WEBSOCKETS_DEBUG
-#include "../c-rbuf/src/ringbuffer_internal.h"
-#endif
-
-#define UNIT_LOG_PREFIX "ws_client: "
-#define FATAL(fmt, ...) mws_fatal(client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__)
-#define ERROR(fmt, ...) mws_error(client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__)
-#define WARN(fmt, ...) mws_warn (client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__)
-#define INFO(fmt, ...) mws_info (client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__)
-#define DEBUG(fmt, ...) mws_debug(client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__)
-
const char *websocket_upgrage_hdr = "GET /mqtt HTTP/1.1\x0D\x0A"
"Host: %s\x0D\x0A"
"Upgrade: websocket\x0D\x0A"
"Connection: Upgrade\x0D\x0A"
"Sec-WebSocket-Key: %s\x0D\x0A"
- "Origin: http://example.com\x0D\x0A"
+ "Origin: \x0D\x0A"
"Sec-WebSocket-Protocol: mqtt\x0D\x0A"
"Sec-WebSocket-Version: 13\x0D\x0A\x0D\x0A";
const char *mqtt_protoid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
#define DEFAULT_RINGBUFFER_SIZE (1024*128)
-#define ENTROPY_SOURCE "/dev/urandom"
-ws_client *ws_client_new(size_t buf_size, char **host, mqtt_wss_log_ctx_t log)
-{
- ws_client *client;
+ws_client *ws_client_new(size_t buf_size, char **host)
+{
if(!host)
return NULL;
- client = callocz(1, sizeof(ws_client));
- if (!client)
- return NULL;
-
+ ws_client *client = callocz(1, sizeof(ws_client));
client->host = host;
- client->log = log;
-
client->buf_read = rbuf_create(buf_size ? buf_size : DEFAULT_RINGBUFFER_SIZE);
- if (!client->buf_read)
- goto cleanup;
-
client->buf_write = rbuf_create(buf_size ? buf_size : DEFAULT_RINGBUFFER_SIZE);
- if (!client->buf_write)
- goto cleanup_1;
-
client->buf_to_mqtt = rbuf_create(buf_size ? buf_size : DEFAULT_RINGBUFFER_SIZE);
- if (!client->buf_to_mqtt)
- goto cleanup_2;
-
- client->entropy_fd = open(ENTROPY_SOURCE, O_RDONLY | O_CLOEXEC);
- if (client->entropy_fd < 1) {
- ERROR("Error opening entropy source \"" ENTROPY_SOURCE "\". Reason: \"%s\"", strerror(errno));
- goto cleanup_3;
- }
return client;
-
-cleanup_3:
- rbuf_free(client->buf_to_mqtt);
-cleanup_2:
- rbuf_free(client->buf_write);
-cleanup_1:
- rbuf_free(client->buf_read);
-cleanup:
- freez(client);
- return NULL;
}
void ws_client_free_headers(ws_client *client)
{
struct http_header *ptr = client->hs.headers;
- struct http_header *tmp;
while (ptr) {
- tmp = ptr;
+ struct http_header *tmp = ptr;
ptr = ptr->next;
freez(tmp);
}
@@ -112,7 +52,6 @@ void ws_client_destroy(ws_client *client)
ws_client_free_headers(client);
freez(client->hs.nonce_reply);
freez(client->hs.http_reply_msg);
- close(client->entropy_fd);
rbuf_free(client->buf_read);
rbuf_free(client->buf_write);
rbuf_free(client->buf_to_mqtt);
@@ -141,7 +80,7 @@ void ws_client_reset(ws_client *client)
int ws_client_add_http_header(ws_client *client, struct http_header *hdr)
{
if (client->hs.hdr_count > MAX_HTTP_HDR_COUNT) {
- ERROR("Too many HTTP response header fields");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Too many HTTP response header fields");
return -1;
}
@@ -156,7 +95,7 @@ int ws_client_add_http_header(ws_client *client, struct http_header *hdr)
return 0;
}
-int ws_client_want_write(ws_client *client)
+int ws_client_want_write(const ws_client *client)
{
return rbuf_bytes_available(client->buf_write);
}
@@ -165,78 +104,89 @@ int ws_client_want_write(ws_client *client)
#define TEMP_BUF_SIZE 4096
int ws_client_start_handshake(ws_client *client)
{
- nd_uuid_t nonce;
+ unsigned char nonce[WEBSOCKET_NONCE_SIZE];
char nonce_b64[256];
char second[TEMP_BUF_SIZE];
unsigned int md_len;
- unsigned char *digest;
+ unsigned char digest[EVP_MAX_MD_SIZE]; // EVP_MAX_MD_SIZE ensures enough space
EVP_MD_CTX *md_ctx;
const EVP_MD *md;
+ int rc = 1;
if(!client->host || !*client->host) {
- ERROR("Hostname has not been set. We should not be able to come here!");
- return 1;
- }
-
- uuid_generate_random(nonce);
- EVP_EncodeBlock((unsigned char *)nonce_b64, (const unsigned char *)nonce, WEBSOCKET_NONCE_SIZE);
- snprintf(second, TEMP_BUF_SIZE, websocket_upgrage_hdr, *client->host, nonce_b64);
-
- if(rbuf_bytes_free(client->buf_write) < strlen(second)) {
- ERROR("Write buffer capacity too low.");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Hostname has not been set. We should not be able to come here!");
return 1;
}
- rbuf_push(client->buf_write, second, strlen(second));
- client->state = WS_HANDSHAKE;
-
- //Calculating expected Sec-WebSocket-Accept reply
- snprintf(second, TEMP_BUF_SIZE, "%s%s", nonce_b64, mqtt_protoid);
+ // Generate a random 16-byte nonce
+ os_random_bytes(nonce, sizeof(nonce));
+ // Initialize the digest context
#if (OPENSSL_VERSION_NUMBER < OPENSSL_VERSION_110)
md_ctx = EVP_MD_CTX_create();
#else
md_ctx = EVP_MD_CTX_new();
#endif
if (md_ctx == NULL) {
- ERROR("Cant create EVP_MD Context");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Can't create EVP_MD context");
return 1;
}
- md = EVP_get_digestbyname("sha1");
+ md = EVP_sha1(); // Use SHA-1 for WebSocket handshake
if (!md) {
- ERROR("Unknown message digest");
- return 1;
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Unknown message digest SHA-1");
+ goto exit_with_error;
}
- if ((digest = (unsigned char *)OPENSSL_malloc(EVP_MD_size(EVP_sha256()))) == NULL) {
- ERROR("Cant alloc digest");
- return 1;
+ (void) netdata_base64_encode((unsigned char *) nonce_b64, nonce, WEBSOCKET_NONCE_SIZE);
+
+ // Format and push the upgrade header to the write buffer
+ size_t bytes = snprintf(second, TEMP_BUF_SIZE, websocket_upgrage_hdr, *client->host, nonce_b64);
+ if(rbuf_bytes_free(client->buf_write) < bytes) {
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Write buffer capacity too low.");
+ goto exit_with_error;
}
+ rbuf_push(client->buf_write, second, bytes);
+
+ client->state = WS_HANDSHAKE;
- EVP_DigestInit_ex(md_ctx, md, NULL);
- EVP_DigestUpdate(md_ctx, second, strlen(second));
- EVP_DigestFinal_ex(md_ctx, digest, &md_len);
+ // Create the expected Sec-WebSocket-Accept value
+ bytes = snprintf(second, TEMP_BUF_SIZE, "%s%s", nonce_b64, mqtt_protoid);
- EVP_EncodeBlock((unsigned char *)nonce_b64, digest, (int) md_len);
+ if (!EVP_DigestInit_ex(md_ctx, md, NULL)) {
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Failed to initialize digest context");
+ goto exit_with_error;
+ }
+
+ if (!EVP_DigestUpdate(md_ctx, second, bytes)) {
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Failed to update digest");
+ goto exit_with_error;
+ }
+
+ if (!EVP_DigestFinal_ex(md_ctx, digest, &md_len)) {
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Failed to finalize digest");
+ goto exit_with_error;
+ }
+
+ (void) netdata_base64_encode((unsigned char *) nonce_b64, digest, md_len);
freez(client->hs.nonce_reply);
client->hs.nonce_reply = strdupz(nonce_b64);
+ rc = 0;
- OPENSSL_free(digest);
-
+exit_with_error:
#if (OPENSSL_VERSION_NUMBER < OPENSSL_VERSION_110)
EVP_MD_CTX_destroy(md_ctx);
#else
EVP_MD_CTX_free(md_ctx);
#endif
- return 0;
+ return rc;
}
#define BUF_READ_MEMCMP_CONST(const, err) \
if (rbuf_memcmp_n(client->buf_read, const, strlen(const))) { \
- ERROR(err); \
+ nd_log(NDLS_DAEMON, NDLP_ERR, err); \
rbuf_flush(client->buf_read); \
return WS_CLIENT_PROTOCOL_ERROR; \
}
@@ -262,7 +212,7 @@ int ws_client_start_handshake(ws_client *client)
#define HTTP_HDR_LINE_CHECK_LIMIT(x) \
if ((x) >= MAX_HTTP_LINE_LENGTH) { \
- ERROR("HTTP line received is too long. Maximum is %d", MAX_HTTP_LINE_LENGTH); \
+ nd_log(NDLS_DAEMON, NDLP_ERR, "HTTP line received is too long. Maximum is %d", MAX_HTTP_LINE_LENGTH); \
return WS_CLIENT_PROTOCOL_ERROR; \
}
@@ -285,13 +235,13 @@ int ws_client_parse_handshake_resp(ws_client *client)
BUF_READ_CHECK_AT_LEAST(HTTP_SC_LENGTH); // "XXX " http return code
rbuf_pop(client->buf_read, buf, HTTP_SC_LENGTH);
if (buf[HTTP_SC_LENGTH - 1] != 0x20) {
- ERROR("HTTP status code received is not terminated by space (0x20)");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "HTTP status code received is not terminated by space (0x20)");
return WS_CLIENT_PROTOCOL_ERROR;
}
buf[HTTP_SC_LENGTH - 1] = 0;
client->hs.http_code = atoi(buf);
if (client->hs.http_code < 100 || client->hs.http_code >= 600) {
- ERROR("HTTP status code received not in valid range 100-600");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "HTTP status code received not in valid range 100-600");
return WS_CLIENT_PROTOCOL_ERROR;
}
client->hs.hdr_state = WS_HDR_ENDLINE;
@@ -330,16 +280,16 @@ int ws_client_parse_handshake_resp(ws_client *client)
ptr = rbuf_find_bytes(client->buf_read, HTTP_HDR_SEPARATOR, strlen(HTTP_HDR_SEPARATOR), &idx_sep);
if (!ptr || idx_sep > idx_crlf) {
- ERROR("Expected HTTP hdr field key/value separator \": \" before endline in non empty HTTP header line");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Expected HTTP hdr field key/value separator \": \" before endline in non empty HTTP header line");
return WS_CLIENT_PROTOCOL_ERROR;
}
if (idx_crlf == idx_sep + (int)strlen(HTTP_HDR_SEPARATOR)) {
- ERROR("HTTP Header value cannot be empty");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "HTTP Header value cannot be empty");
return WS_CLIENT_PROTOCOL_ERROR;
}
if (idx_sep > HTTP_HEADER_NAME_MAX_LEN) {
- ERROR("HTTP header too long (%d)", idx_sep);
+ nd_log(NDLS_DAEMON, NDLP_ERR, "HTTP header too long (%d)", idx_sep);
return WS_CLIENT_PROTOCOL_ERROR;
}
@@ -347,23 +297,21 @@ int ws_client_parse_handshake_resp(ws_client *client)
hdr->key = ((char*)hdr) + sizeof(struct http_header);
hdr->value = hdr->key + idx_sep + 1;
- bytes = rbuf_pop(client->buf_read, hdr->key, idx_sep);
+ rbuf_pop(client->buf_read, hdr->key, idx_sep);
rbuf_bump_tail(client->buf_read, strlen(HTTP_HDR_SEPARATOR));
- bytes = rbuf_pop(client->buf_read, hdr->value, idx_crlf - idx_sep - strlen(HTTP_HDR_SEPARATOR));
+ rbuf_pop(client->buf_read, hdr->value, idx_crlf - idx_sep - strlen(HTTP_HDR_SEPARATOR));
rbuf_bump_tail(client->buf_read, strlen(WS_HTTP_NEWLINE));
for (int i = 0; hdr->key[i]; i++)
hdr->key[i] = tolower(hdr->key[i]);
-// DEBUG("HTTP header \"%s\" received. Value \"%s\"", hdr->key, hdr->value);
-
if (ws_client_add_http_header(client, hdr))
return WS_CLIENT_PROTOCOL_ERROR;
if (!strcmp(hdr->key, WS_CONN_ACCEPT)) {
if (strcmp(client->hs.nonce_reply, hdr->value)) {
- ERROR("Received NONCE \"%s\" does not match expected nonce of \"%s\"", hdr->value, client->hs.nonce_reply);
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Received NONCE \"%s\" does not match expected nonce of \"%s\"", hdr->value, client->hs.nonce_reply);
return WS_CLIENT_PROTOCOL_ERROR;
}
client->hs.nonce_matched = 1;
@@ -373,21 +321,21 @@ int ws_client_parse_handshake_resp(ws_client *client)
case WS_HDR_PARSE_DONE:
if (!client->hs.nonce_matched) {
- ERROR("Missing " WS_CONN_ACCEPT " header");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Missing " WS_CONN_ACCEPT " header");
return WS_CLIENT_PROTOCOL_ERROR;
}
if (client->hs.http_code != 101) {
- ERROR("HTTP return code not 101. Received %d with msg \"%s\".", client->hs.http_code, client->hs.http_reply_msg);
+ nd_log(NDLS_DAEMON, NDLP_ERR, "HTTP return code not 101. Received %d with msg \"%s\".", client->hs.http_code, client->hs.http_reply_msg);
return WS_CLIENT_PROTOCOL_ERROR;
}
client->state = WS_ESTABLISHED;
client->hs.hdr_state = WS_HDR_ALL_DONE;
- INFO("Websocket Connection Accepted By Server");
+ nd_log(NDLS_DAEMON, NDLP_INFO, "Websocket Connection Accepted By Server");
return WS_CLIENT_PARSING_DONE;
case WS_HDR_ALL_DONE:
- FATAL("This is error we should never come here!");
+ nd_log(NDLS_DAEMON, NDLP_CRIT, "This is error we should never come here!");
return WS_CLIENT_PROTOCOL_ERROR;
}
return 0;
@@ -397,7 +345,7 @@ int ws_client_parse_handshake_resp(ws_client *client)
#define WS_FINAL_FRAG BYTE_MSB
#define WS_PAYLOAD_MASKED BYTE_MSB
-static inline size_t get_ws_hdr_size(size_t payload_size)
+static size_t get_ws_hdr_size(size_t payload_size)
{
size_t hdr_len = 2 + 4 /*mask*/;
if(payload_size > 125)
@@ -408,7 +356,7 @@ static inline size_t get_ws_hdr_size(size_t payload_size)
}
#define MAX_POSSIBLE_HDR_LEN 14
-int ws_client_send(ws_client *client, enum websocket_opcode frame_type, const char *data, size_t size)
+int ws_client_send(const ws_client *client, enum websocket_opcode frame_type, const char *data, size_t size)
{
// TODO maybe? implement fragmenting, it is not necessary though
// as both tested MQTT brokers have no reuirement of one MQTT envelope
@@ -416,24 +364,16 @@ int ws_client_send(ws_client *client, enum websocket_opcode frame_type, const ch
// one big MQTT message as single fragmented WebSocket envelope
char hdr[MAX_POSSIBLE_HDR_LEN];
char *ptr = hdr;
- char *mask;
int size_written = 0;
size_t j = 0;
size_t w_buff_free = rbuf_bytes_free(client->buf_write);
size_t hdr_len = get_ws_hdr_size(size);
- if (w_buff_free < hdr_len * 2) {
-#ifdef DEBUG_ULTRA_VERBOSE
- DEBUG("Write buffer full. Can't write requested %d size.", size);
-#endif
+ if (w_buff_free < hdr_len * 2)
return 0;
- }
if (w_buff_free < (hdr_len + size)) {
-#ifdef DEBUG_ULTRA_VERBOSE
- DEBUG("Can't write whole MQTT packet of %d bytes into the buffer. Will do partial send of %d.", size, w_buff_free - hdr_len);
-#endif
size = w_buff_free - hdr_len;
hdr_len = get_ws_hdr_size(size);
// the actual needed header size might decrease if we cut number of bytes
@@ -459,12 +399,10 @@ int ws_client_send(ws_client *client, enum websocket_opcode frame_type, const ch
ptr += sizeof(be);
} else
*ptr++ |= size;
-
- mask = ptr;
- if (read(client->entropy_fd, mask, sizeof(uint32_t)) < (ssize_t)sizeof(uint32_t)) {
- ERROR("Unable to get mask from \"" ENTROPY_SOURCE "\"");
- return -2;
- }
+
+ char *mask = ptr;
+ uint32_t mask32 = os_random32() + 1;
+ memcpy(mask, &mask32, sizeof(mask32));
rbuf_push(client->buf_write, hdr, hdr_len);
@@ -490,7 +428,7 @@ int ws_client_send(ws_client *client, enum websocket_opcode frame_type, const ch
return size_written;
}
-static int check_opcode(ws_client *client,enum websocket_opcode oc)
+static int check_opcode(enum websocket_opcode oc)
{
switch(oc) {
case WS_OP_BINARY_FRAME:
@@ -498,34 +436,34 @@ static int check_opcode(ws_client *client,enum websocket_opcode oc)
case WS_OP_PING:
return 0;
case WS_OP_CONTINUATION_FRAME:
- FATAL("WS_OP_CONTINUATION_FRAME NOT IMPLEMENTED YET!!!!");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "WS_OP_CONTINUATION_FRAME NOT IMPLEMENTED YET!!!!");
return 0;
case WS_OP_TEXT_FRAME:
- FATAL("WS_OP_TEXT_FRAME NOT IMPLEMENTED YET!!!!");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "WS_OP_TEXT_FRAME NOT IMPLEMENTED YET!!!!");
return 0;
case WS_OP_PONG:
- FATAL("WS_OP_PONG NOT IMPLEMENTED YET!!!!");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "WS_OP_PONG NOT IMPLEMENTED YET!!!!");
return 0;
default:
return WS_CLIENT_PROTOCOL_ERROR;
}
}
-static inline void ws_client_rx_post_hdr_state(ws_client *client)
+static void ws_client_rx_post_hdr_state(ws_client *client)
{
switch(client->rx.opcode) {
case WS_OP_BINARY_FRAME:
client->rx.parse_state = WS_PAYLOAD_DATA;
- return;
+ break;
case WS_OP_CONNECTION_CLOSE:
client->rx.parse_state = WS_PAYLOAD_CONNECTION_CLOSE;
- return;
+ break;
case WS_OP_PING:
client->rx.parse_state = WS_PAYLOAD_PING_REQ_PAYLOAD;
- return;
+ break;
default:
client->rx.parse_state = WS_PAYLOAD_SKIP_UNKNOWN_PAYLOAD;
- return;
+ break;
}
}
@@ -541,15 +479,15 @@ int ws_client_process_rx_ws(ws_client *client)
client->rx.opcode = buf[0] & (char)~BYTE_MSB;
if (!(buf[0] & (char)~WS_FINAL_FRAG)) {
- ERROR("Not supporting fragmented messages yet!");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Not supporting fragmented messages yet!");
return WS_CLIENT_PROTOCOL_ERROR;
}
- if (check_opcode(client, client->rx.opcode) == WS_CLIENT_PROTOCOL_ERROR)
+ if (check_opcode(client->rx.opcode) == WS_CLIENT_PROTOCOL_ERROR)
return WS_CLIENT_PROTOCOL_ERROR;
if (buf[1] & (char)WS_PAYLOAD_MASKED) {
- ERROR("Mask is not allowed in Server->Client Websocket direction.");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Mask is not allowed in Server->Client Websocket direction.");
return WS_CLIENT_PROTOCOL_ERROR;
}
@@ -584,12 +522,8 @@ int ws_client_process_rx_ws(ws_client *client)
if (!rbuf_bytes_available(client->buf_read))
return WS_CLIENT_NEED_MORE_BYTES;
char *insert = rbuf_get_linear_insert_range(client->buf_to_mqtt, &size);
- if (!insert) {
-#ifdef DEBUG_ULTRA_VERBOSE
- DEBUG("BUFFER TOO FULL. Avail %d req %d", (int)size, (int)remaining);
-#endif
+ if (!insert)
return WS_CLIENT_BUFFER_FULL;
- }
size = (size > remaining) ? remaining : size;
size = rbuf_pop(client->buf_read, insert, size);
rbuf_bump_head(client->buf_to_mqtt, size);
@@ -603,11 +537,11 @@ int ws_client_process_rx_ws(ws_client *client)
// b) 2byte reason code
// c) 2byte reason code followed by message
if (client->rx.payload_length == 1) {
- ERROR("WebScoket CONNECTION_CLOSE can't have payload of size 1");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "WebScoket CONNECTION_CLOSE can't have payload of size 1");
return WS_CLIENT_PROTOCOL_ERROR;
}
if (!client->rx.payload_length) {
- INFO("WebSocket server closed the connection without giving reason.");
+ nd_log(NDLS_DAEMON, NDLP_INFO, "WebSocket server closed the connection without giving reason.");
client->rx.parse_state = WS_PACKET_DONE;
break;
}
@@ -621,7 +555,7 @@ int ws_client_process_rx_ws(ws_client *client)
client->rx.payload_processed += sizeof(uint16_t);
if(client->rx.payload_processed == client->rx.payload_length) {
- INFO("WebSocket server closed the connection with EC=%d. Without message.",
+ nd_log(NDLS_DAEMON, NDLP_INFO, "WebSocket server closed the connection with EC=%d. Without message.",
client->rx.specific_data.op_close.ec);
client->rx.parse_state = WS_PACKET_DONE;
break;
@@ -640,7 +574,7 @@ int ws_client_process_rx_ws(ws_client *client)
client->rx.payload_length - client->rx.payload_processed);
}
client->rx.specific_data.op_close.reason[client->rx.payload_length] = 0;
- INFO("WebSocket server closed the connection with EC=%d and reason \"%s\"",
+ nd_log(NDLS_DAEMON, NDLP_INFO, "WebSocket server closed the connection with EC=%d and reason \"%s\"",
client->rx.specific_data.op_close.ec,
client->rx.specific_data.op_close.reason);
freez(client->rx.specific_data.op_close.reason);
@@ -649,14 +583,14 @@ int ws_client_process_rx_ws(ws_client *client)
break;
case WS_PAYLOAD_SKIP_UNKNOWN_PAYLOAD:
BUF_READ_CHECK_AT_LEAST(client->rx.payload_length);
- WARN("Skipping Websocket Packet of unsupported/unknown type");
+ nd_log(NDLS_DAEMON, NDLP_WARNING, "Skipping Websocket Packet of unsupported/unknown type");
if (client->rx.payload_length)
rbuf_bump_tail(client->buf_read, client->rx.payload_length);
client->rx.parse_state = WS_PACKET_DONE;
return WS_CLIENT_PARSING_DONE;
case WS_PAYLOAD_PING_REQ_PAYLOAD:
if (client->rx.payload_length > rbuf_get_capacity(client->buf_read) / 2) {
- ERROR("Ping arrived with payload which is too big!");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Ping arrived with payload which is too big!");
return WS_CLIENT_INTERNAL_ERROR;
}
BUF_READ_CHECK_AT_LEAST(client->rx.payload_length);
@@ -666,7 +600,7 @@ int ws_client_process_rx_ws(ws_client *client)
// then attempt to send as soon as buffer space clears up
size = ws_client_send(client, WS_OP_PONG, client->rx.specific_data.ping_msg, client->rx.payload_length);
if (size != client->rx.payload_length) {
- ERROR("Unable to send the PONG as one packet back. Closing connection.");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Unable to send the PONG as one packet back. Closing connection.");
return WS_CLIENT_PROTOCOL_ERROR;
}
client->rx.parse_state = WS_PACKET_DONE;
@@ -678,7 +612,7 @@ int ws_client_process_rx_ws(ws_client *client)
return WS_CLIENT_CONNECTION_CLOSED;
return WS_CLIENT_PARSING_DONE;
default:
- FATAL("Unknown parse state");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Unknown parse state");
return WS_CLIENT_INTERNAL_ERROR;
}
return 0;
@@ -711,6 +645,8 @@ int ws_client_process(ws_client *client)
case WS_CLIENT_CONNECTION_CLOSED:
client->state = WS_CONN_CLOSED_GRACEFUL;
break;
+ default:
+ break;
}
// if ret == 0 we can continue parsing
// if ret == WS_CLIENT_PARSING_DONE we processed
@@ -719,13 +655,13 @@ int ws_client_process(ws_client *client)
} while (!ret || ret == WS_CLIENT_PARSING_DONE);
break;
case WS_ERROR:
- ERROR("ws_client is in error state. Restart the connection!");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "ws_client is in error state. Restart the connection!");
return WS_CLIENT_PROTOCOL_ERROR;
case WS_CONN_CLOSED_GRACEFUL:
- ERROR("Connection has been gracefully closed. Calling this is useless (and probably bug) until you reconnect again.");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Connection has been gracefully closed. Calling this is useless (and probably bug) until you reconnect again.");
return WS_CLIENT_CONNECTION_CLOSED;
default:
- FATAL("Unknown connection state! Probably memory corruption.");
+ nd_log(NDLS_DAEMON, NDLP_CRIT, "Unknown connection state! Probably memory corruption.");
return WS_CLIENT_INTERNAL_ERROR;
}
return ret;