// Copyright (C) 2020 Timotej Šiškovič // SPDX-License-Identifier: GPL-3.0-only // // This program is free software: you can redistribute it and/or modify it // under the terms of the GNU General Public License as published by the Free Software Foundation, version 3. // // This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; // without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. // See the GNU General Public License for more details. // // You should have received a copy of the GNU General Public License along with this program. // If not, see . #include #include #include #include #include #include #include "ws_client.h" #include "common_internal.h" #ifdef MQTT_WEBSOCKETS_DEBUG #include "../c-rbuf/src/ringbuffer_internal.h" #endif #define UNIT_LOG_PREFIX "ws_client: " #define FATAL(fmt, ...) mws_fatal(client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__) #define ERROR(fmt, ...) mws_error(client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__) #define WARN(fmt, ...) mws_warn (client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__) #define INFO(fmt, ...) mws_info (client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__) #define DEBUG(fmt, ...) mws_debug(client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__) const char *websocket_upgrage_hdr = "GET /mqtt HTTP/1.1\x0D\x0A" "Host: %s\x0D\x0A" "Upgrade: websocket\x0D\x0A" "Connection: Upgrade\x0D\x0A" "Sec-WebSocket-Key: %s\x0D\x0A" "Origin: http://example.com\x0D\x0A" "Sec-WebSocket-Protocol: mqtt\x0D\x0A" "Sec-WebSocket-Version: 13\x0D\x0A\x0D\x0A"; const char *mqtt_protoid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; #define DEFAULT_RINGBUFFER_SIZE (1024*128) #define ENTROPY_SOURCE "/dev/urandom" ws_client *ws_client_new(size_t buf_size, char **host, mqtt_wss_log_ctx_t log) { ws_client *client; if(!host) return NULL; client = mw_calloc(1, sizeof(ws_client)); if (!client) return NULL; client->host = host; client->log = log; client->buf_read = rbuf_create(buf_size ? buf_size : DEFAULT_RINGBUFFER_SIZE); if (!client->buf_read) goto cleanup; client->buf_write = rbuf_create(buf_size ? buf_size : DEFAULT_RINGBUFFER_SIZE); if (!client->buf_write) goto cleanup_1; client->buf_to_mqtt = rbuf_create(buf_size ? buf_size : DEFAULT_RINGBUFFER_SIZE); if (!client->buf_to_mqtt) goto cleanup_2; client->entropy_fd = open(ENTROPY_SOURCE, O_RDONLY); if (client->entropy_fd < 1) { ERROR("Error opening entropy source \"" ENTROPY_SOURCE "\". Reason: \"%s\"", strerror(errno)); goto cleanup_3; } return client; cleanup_3: rbuf_free(client->buf_to_mqtt); cleanup_2: rbuf_free(client->buf_write); cleanup_1: rbuf_free(client->buf_read); cleanup: mw_free(client); return NULL; } void ws_client_free_headers(ws_client *client) { struct http_header *ptr = client->hs.headers; struct http_header *tmp; while (ptr) { tmp = ptr; ptr = ptr->next; mw_free(tmp); } client->hs.headers = NULL; client->hs.headers_tail = NULL; client->hs.hdr_count = 0; } void ws_client_destroy(ws_client *client) { ws_client_free_headers(client); mw_free(client->hs.nonce_reply); mw_free(client->hs.http_reply_msg); close(client->entropy_fd); rbuf_free(client->buf_read); rbuf_free(client->buf_write); rbuf_free(client->buf_to_mqtt); mw_free(client); } void ws_client_reset(ws_client *client) { ws_client_free_headers(client); mw_free(client->hs.nonce_reply); client->hs.nonce_reply = NULL; mw_free(client->hs.http_reply_msg); client->hs.http_reply_msg = NULL; rbuf_flush(client->buf_read); rbuf_flush(client->buf_write); rbuf_flush(client->buf_to_mqtt); client->state = WS_RAW; client->hs.hdr_state = WS_HDR_HTTP; client->rx.parse_state = WS_FIRST_2BYTES; } #define MAX_HTTP_HDR_COUNT 128 int ws_client_add_http_header(ws_client *client, struct http_header *hdr) { if (client->hs.hdr_count > MAX_HTTP_HDR_COUNT) { ERROR("Too many HTTP response header fields"); return -1; } if (client->hs.headers) client->hs.headers_tail->next = hdr; else client->hs.headers = hdr; client->hs.headers_tail = hdr; client->hs.hdr_count++; return 0; } int ws_client_want_write(ws_client *client) { return rbuf_bytes_available(client->buf_write); } #define RAND_SRC "/dev/urandom" static int ws_client_get_nonce(ws_client *client, char *dest, unsigned int size) { // we do not need crypto secure random here // it's just used for protocol negotiation int rd; int f = open(RAND_SRC, O_RDONLY); if (f < 0) { ERROR("Error opening \"%s\". Err: \"%s\"", RAND_SRC, strerror(errno)); return -2; } if ((rd = read(f, dest, size)) > 0) { close(f); return rd; } close(f); return -1; } #define WEBSOCKET_NONCE_SIZE 16 #define TEMP_BUF_SIZE 4096 int ws_client_start_handshake(ws_client *client) { char nonce[WEBSOCKET_NONCE_SIZE]; char nonce_b64[256]; char second[TEMP_BUF_SIZE]; unsigned int md_len; unsigned char *digest; EVP_MD_CTX *md_ctx; const EVP_MD *md; if(!*client->host) { ERROR("Hostname has not been set. We should not be able to come here!"); return 1; } ws_client_get_nonce(client, nonce, WEBSOCKET_NONCE_SIZE); EVP_EncodeBlock((unsigned char *)nonce_b64, (const unsigned char *)nonce, WEBSOCKET_NONCE_SIZE); snprintf(second, TEMP_BUF_SIZE, websocket_upgrage_hdr, *client->host, nonce_b64); if(rbuf_bytes_free(client->buf_write) < strlen(second)) { ERROR("Write buffer capacity too low."); return 1; } rbuf_push(client->buf_write, second, strlen(second)); client->state = WS_HANDSHAKE; //Calculating expected Sec-WebSocket-Accept reply snprintf(second, TEMP_BUF_SIZE, "%s%s", nonce_b64, mqtt_protoid); #if (OPENSSL_VERSION_NUMBER < OPENSSL_VERSION_110) md_ctx = EVP_MD_CTX_create(); #else md_ctx = EVP_MD_CTX_new(); #endif if (md_ctx == NULL) { ERROR("Cant create EVP_MD Context"); return 1; } md = EVP_get_digestbyname("sha1"); if (!md) { ERROR("Unknown message digest"); return 1; } if ((digest = (unsigned char *)OPENSSL_malloc(EVP_MD_size(EVP_sha256()))) == NULL) { ERROR("Cant alloc digest"); return 1; } EVP_DigestInit_ex(md_ctx, md, NULL); EVP_DigestUpdate(md_ctx, second, strlen(second)); EVP_DigestFinal_ex(md_ctx, digest, &md_len); EVP_EncodeBlock((unsigned char *)nonce_b64, digest, md_len); mw_free(client->hs.nonce_reply); client->hs.nonce_reply = mw_strdup(nonce_b64); OPENSSL_free(digest); #if (OPENSSL_VERSION_NUMBER < OPENSSL_VERSION_110) EVP_MD_CTX_destroy(md_ctx); #else EVP_MD_CTX_free(md_ctx); #endif return 0; } #define BUF_READ_MEMCMP_CONST(const, err) \ if (rbuf_memcmp_n(client->buf_read, const, strlen(const))) { \ ERROR(err); \ rbuf_flush(client->buf_read); \ return WS_CLIENT_PROTOCOL_ERROR; \ } #define BUF_READ_CHECK_AT_LEAST(x) \ if (rbuf_bytes_available(client->buf_read) < x) \ return WS_CLIENT_NEED_MORE_BYTES; #define MAX_HTTP_LINE_LENGTH 1024*4 #define HTTP_SC_LENGTH 4 // "XXX " http status code as C string #define WS_CLIENT_HTTP_HDR "HTTP/1.1 " #define WS_CONN_ACCEPT "sec-websocket-accept" #define HTTP_HDR_SEPARATOR ": " #define WS_NONCE_STRLEN_B64 28 #define WS_HTTP_NEWLINE "\r\n" #define HTTP_HEADER_NAME_MAX_LEN 256 #if HTTP_HEADER_NAME_MAX_LEN > MAX_HTTP_LINE_LENGTH #error "Buffer too small" #endif #if WS_NONCE_STRLEN_B64 > MAX_HTTP_LINE_LENGTH #error "Buffer too small" #endif #define HTTP_HDR_LINE_CHECK_LIMIT(x) if ((x) >= MAX_HTTP_LINE_LENGTH) \ { \ ERROR("HTTP line received is too long. Maximum is %d", MAX_HTTP_LINE_LENGTH); \ return WS_CLIENT_PROTOCOL_ERROR; \ } int ws_client_parse_handshake_resp(ws_client *client) { char buf[HTTP_SC_LENGTH]; int idx_crlf, idx_sep; char *ptr; size_t bytes; switch (client->hs.hdr_state) { case WS_HDR_HTTP: BUF_READ_CHECK_AT_LEAST(strlen(WS_CLIENT_HTTP_HDR)) BUF_READ_MEMCMP_CONST(WS_CLIENT_HTTP_HDR, "Expected \"HTTP1.1\" header"); rbuf_bump_tail(client->buf_read, strlen(WS_CLIENT_HTTP_HDR)); client->hs.hdr_state = WS_HDR_RC; break; case WS_HDR_RC: BUF_READ_CHECK_AT_LEAST(HTTP_SC_LENGTH); // "XXX " http return code rbuf_pop(client->buf_read, buf, HTTP_SC_LENGTH); if (buf[HTTP_SC_LENGTH - 1] != 0x20) { ERROR("HTTP status code received is not terminated by space (0x20)"); return WS_CLIENT_PROTOCOL_ERROR; } buf[HTTP_SC_LENGTH - 1] = 0; client->hs.http_code = atoi(buf); if (client->hs.http_code < 100 || client->hs.http_code >= 600) { ERROR("HTTP status code received not in valid range 100-600"); return WS_CLIENT_PROTOCOL_ERROR; } client->hs.hdr_state = WS_HDR_ENDLINE; break; case WS_HDR_ENDLINE: ptr = rbuf_find_bytes(client->buf_read, WS_HTTP_NEWLINE, strlen(WS_HTTP_NEWLINE), &idx_crlf); if (!ptr) { bytes = rbuf_bytes_available(client->buf_read); HTTP_HDR_LINE_CHECK_LIMIT(bytes); return WS_CLIENT_NEED_MORE_BYTES; } HTTP_HDR_LINE_CHECK_LIMIT(idx_crlf); client->hs.http_reply_msg = mw_malloc(idx_crlf+1); rbuf_pop(client->buf_read, client->hs.http_reply_msg, idx_crlf); client->hs.http_reply_msg[idx_crlf] = 0; rbuf_bump_tail(client->buf_read, strlen(WS_HTTP_NEWLINE)); client->hs.hdr_state = WS_HDR_PARSE_HEADERS; break; case WS_HDR_PARSE_HEADERS: ptr = rbuf_find_bytes(client->buf_read, WS_HTTP_NEWLINE, strlen(WS_HTTP_NEWLINE), &idx_crlf); if (!ptr) { bytes = rbuf_bytes_available(client->buf_read); HTTP_HDR_LINE_CHECK_LIMIT(bytes); return WS_CLIENT_NEED_MORE_BYTES; } HTTP_HDR_LINE_CHECK_LIMIT(idx_crlf); if (!idx_crlf) { // empty line, header end rbuf_bump_tail(client->buf_read, strlen(WS_HTTP_NEWLINE)); client->hs.hdr_state = WS_HDR_PARSE_DONE; return 0; } ptr = rbuf_find_bytes(client->buf_read, HTTP_HDR_SEPARATOR, strlen(HTTP_HDR_SEPARATOR), &idx_sep); if (!ptr || idx_sep > idx_crlf) { ERROR("Expected HTTP hdr field key/value separator \": \" before endline in non empty HTTP header line"); return WS_CLIENT_PROTOCOL_ERROR; } if (idx_crlf == idx_sep + (int)strlen(HTTP_HDR_SEPARATOR)) { ERROR("HTTP Header value cannot be empty"); return WS_CLIENT_PROTOCOL_ERROR; } if (idx_sep > HTTP_HEADER_NAME_MAX_LEN) { ERROR("HTTP header too long (%d)", idx_sep); return WS_CLIENT_PROTOCOL_ERROR; } struct http_header *hdr = mw_calloc(1, sizeof(struct http_header) + idx_crlf); //idx_crlf includes ": " that will be used as 2 \0 bytes hdr->key = ((char*)hdr) + sizeof(struct http_header); hdr->value = hdr->key + idx_sep + 1; bytes = rbuf_pop(client->buf_read, hdr->key, idx_sep); rbuf_bump_tail(client->buf_read, strlen(HTTP_HDR_SEPARATOR)); bytes = rbuf_pop(client->buf_read, hdr->value, idx_crlf - idx_sep - strlen(HTTP_HDR_SEPARATOR)); rbuf_bump_tail(client->buf_read, strlen(WS_HTTP_NEWLINE)); for (int i = 0; hdr->key[i]; i++) hdr->key[i] = tolower(hdr->key[i]); // DEBUG("HTTP header \"%s\" received. Value \"%s\"", hdr->key, hdr->value); if (ws_client_add_http_header(client, hdr)) return WS_CLIENT_PROTOCOL_ERROR; if (!strcmp(hdr->key, WS_CONN_ACCEPT)) { if (strcmp(client->hs.nonce_reply, hdr->value)) { ERROR("Received NONCE \"%s\" does not match expected nonce of \"%s\"", hdr->value, client->hs.nonce_reply); return WS_CLIENT_PROTOCOL_ERROR; } client->hs.nonce_matched = 1; } break; case WS_HDR_PARSE_DONE: if (!client->hs.nonce_matched) { ERROR("Missing " WS_CONN_ACCEPT " header"); return WS_CLIENT_PROTOCOL_ERROR; } if (client->hs.http_code != 101) { ERROR("HTTP return code not 101. Received %d with msg \"%s\".", client->hs.http_code, client->hs.http_reply_msg); return WS_CLIENT_PROTOCOL_ERROR; } client->state = WS_ESTABLISHED; client->hs.hdr_state = WS_HDR_ALL_DONE; INFO("Websocket Connection Accepted By Server"); return WS_CLIENT_PARSING_DONE; case WS_HDR_ALL_DONE: FATAL("This is error we should never come here!"); return WS_CLIENT_PROTOCOL_ERROR; } return 0; } #define BYTE_MSB 0x80 #define WS_FINAL_FRAG BYTE_MSB #define WS_PAYLOAD_MASKED BYTE_MSB static inline size_t get_ws_hdr_size(size_t payload_size) { size_t hdr_len = 2 + 4 /*mask*/; if(payload_size > 125) hdr_len += 2; if(payload_size > 65535) hdr_len += 6; return hdr_len; } #define MAX_POSSIBLE_HDR_LEN 14 int ws_client_send(ws_client *client, enum websocket_opcode frame_type, const char *data, size_t size) { // TODO maybe? implement fragmenting, it is not necessary though // as both tested MQTT brokers have no reuirement of one MQTT envelope // be equal to one WebSockets envelope. Therefore there is no need to send // one big MQTT message as single fragmented WebSocket envelope char hdr[MAX_POSSIBLE_HDR_LEN]; char *ptr = hdr; char *mask; int size_written = 0; size_t j = 0; size_t w_buff_free = rbuf_bytes_free(client->buf_write); size_t hdr_len = get_ws_hdr_size(size); if (w_buff_free < hdr_len * 2) { #ifdef DEBUG_ULTRA_VERBOSE DEBUG("Write buffer full. Can't write requested %d size.", size); #endif return 0; } if (w_buff_free < (hdr_len + size)) { #ifdef DEBUG_ULTRA_VERBOSE DEBUG("Can't write whole MQTT packet of %d bytes into the buffer. Will do partial send of %d.", size, w_buff_free - hdr_len); #endif size = w_buff_free - hdr_len; hdr_len = get_ws_hdr_size(size); // the actual needed header size might decrease if we cut number of bytes // if decrease of size crosses 65535 or 125 boundary // but I can live with that at least for now // worst case is we have 6 more bytes we could have written // no bigus dealus } *ptr++ = frame_type | WS_FINAL_FRAG; //generate length *ptr = WS_PAYLOAD_MASKED; if (size > 65535) { *ptr++ |= 0x7f; uint64_t be = htobe64(size); memcpy(ptr, (void *)&be, sizeof(be)); ptr += sizeof(be); } else if (size > 125) { *ptr++ |= 0x7e; uint16_t be = htobe16(size); memcpy(ptr, (void *)&be, sizeof(be)); ptr += sizeof(be); } else *ptr++ |= size; mask = ptr; if (read(client->entropy_fd, mask, sizeof(uint32_t)) < (ssize_t)sizeof(uint32_t)) { ERROR("Unable to get mask from \"" ENTROPY_SOURCE "\""); return -2; } rbuf_push(client->buf_write, hdr, hdr_len); if (!size) return 0; // copy and mask data in the write ringbuffer while (size - size_written) { size_t writable_bytes; char *w_ptr = rbuf_get_linear_insert_range(client->buf_write, &writable_bytes); if(!writable_bytes) break; writable_bytes = (writable_bytes > size) ? (size - size_written) : writable_bytes; memcpy(w_ptr, &data[size_written], writable_bytes); rbuf_bump_head(client->buf_write, writable_bytes); for (size_t i = 0; i < writable_bytes; i++, j++) w_ptr[i] ^= mask[j % 4]; size_written += writable_bytes; } return size_written; } static int check_opcode(ws_client *client,enum websocket_opcode oc) { switch(oc) { case WS_OP_BINARY_FRAME: case WS_OP_CONNECTION_CLOSE: case WS_OP_PING: return 0; case WS_OP_CONTINUATION_FRAME: FATAL("WS_OP_CONTINUATION_FRAME NOT IMPLEMENTED YET!!!!"); return 0; case WS_OP_TEXT_FRAME: FATAL("WS_OP_TEXT_FRAME NOT IMPLEMENTED YET!!!!"); return 0; case WS_OP_PONG: FATAL("WS_OP_PONG NOT IMPLEMENTED YET!!!!"); return 0; default: return WS_CLIENT_PROTOCOL_ERROR; } } static inline void ws_client_rx_post_hdr_state(ws_client *client) { switch(client->rx.opcode) { case WS_OP_BINARY_FRAME: client->rx.parse_state = WS_PAYLOAD_DATA; return; case WS_OP_CONNECTION_CLOSE: client->rx.parse_state = WS_PAYLOAD_CONNECTION_CLOSE; return; case WS_OP_PING: client->rx.parse_state = WS_PAYLOAD_PING_REQ_PAYLOAD; return; default: client->rx.parse_state = WS_PAYLOAD_SKIP_UNKNOWN_PAYLOAD; return; } } #define LONGEST_POSSIBLE_HDR_PART 8 int ws_client_process_rx_ws(ws_client *client) { char buf[LONGEST_POSSIBLE_HDR_PART]; size_t size; switch (client->rx.parse_state) { case WS_FIRST_2BYTES: BUF_READ_CHECK_AT_LEAST(2); rbuf_pop(client->buf_read, buf, 2); client->rx.opcode = buf[0] & (char)~BYTE_MSB; if (!(buf[0] & (char)~WS_FINAL_FRAG)) { ERROR("Not supporting fragmented messages yet!"); return WS_CLIENT_PROTOCOL_ERROR; } if (check_opcode(client, client->rx.opcode) == WS_CLIENT_PROTOCOL_ERROR) return WS_CLIENT_PROTOCOL_ERROR; if (buf[1] & (char)WS_PAYLOAD_MASKED) { ERROR("Mask is not allowed in Server->Client Websocket direction."); return WS_CLIENT_PROTOCOL_ERROR; } switch (buf[1]) { case 127: client->rx.parse_state = WS_PAYLOAD_EXTENDED_64; break; case 126: client->rx.parse_state = WS_PAYLOAD_EXTENDED_16; break; default: client->rx.payload_length = buf[1]; ws_client_rx_post_hdr_state(client); } break; case WS_PAYLOAD_EXTENDED_16: BUF_READ_CHECK_AT_LEAST(2); rbuf_pop(client->buf_read, buf, 2); client->rx.payload_length = be16toh(*((uint16_t *)buf)); ws_client_rx_post_hdr_state(client); break; case WS_PAYLOAD_EXTENDED_64: BUF_READ_CHECK_AT_LEAST(LONGEST_POSSIBLE_HDR_PART); rbuf_pop(client->buf_read, buf, LONGEST_POSSIBLE_HDR_PART); client->rx.payload_length = be64toh(*((uint64_t *)buf)); ws_client_rx_post_hdr_state(client); break; case WS_PAYLOAD_DATA: // TODO not pretty? while (client->rx.payload_processed < client->rx.payload_length) { size_t remaining = client->rx.payload_length - client->rx.payload_processed; if (!rbuf_bytes_available(client->buf_read)) return WS_CLIENT_NEED_MORE_BYTES; char *insert = rbuf_get_linear_insert_range(client->buf_to_mqtt, &size); if (!insert) { #ifdef DEBUG_ULTRA_VERBOSE DEBUG("BUFFER TOO FULL. Avail %d req %d", (int)size, (int)remaining); #endif return WS_CLIENT_BUFFER_FULL; } size = (size > remaining) ? remaining : size; size = rbuf_pop(client->buf_read, insert, size); rbuf_bump_head(client->buf_to_mqtt, size); client->rx.payload_processed += size; } client->rx.parse_state = WS_PACKET_DONE; break; case WS_PAYLOAD_CONNECTION_CLOSE: // for WS_OP_CONNECTION_CLOSE allowed is // a) empty payload // b) 2byte reason code // c) 2byte reason code followed by message if (client->rx.payload_length == 1) { ERROR("WebScoket CONNECTION_CLOSE can't have payload of size 1"); return WS_CLIENT_PROTOCOL_ERROR; } if (!client->rx.payload_length) { INFO("WebSocket server closed the connection without giving reason."); client->rx.parse_state = WS_PACKET_DONE; break; } client->rx.parse_state = WS_PAYLOAD_CONNECTION_CLOSE_EC; break; case WS_PAYLOAD_CONNECTION_CLOSE_EC: BUF_READ_CHECK_AT_LEAST(sizeof(uint16_t)); rbuf_pop(client->buf_read, buf, sizeof(uint16_t)); client->rx.specific_data.op_close.ec = be16toh(*((uint16_t *)buf)); client->rx.payload_processed += sizeof(uint16_t); if(client->rx.payload_processed == client->rx.payload_length) { INFO("WebSocket server closed the connection with EC=%d. Without message.", client->rx.specific_data.op_close.ec); client->rx.parse_state = WS_PACKET_DONE; break; } client->rx.parse_state = WS_PAYLOAD_CONNECTION_CLOSE_MSG; break; case WS_PAYLOAD_CONNECTION_CLOSE_MSG: if (!client->rx.specific_data.op_close.reason) client->rx.specific_data.op_close.reason = mw_malloc(client->rx.payload_length + 1); while (client->rx.payload_processed < client->rx.payload_length) { if (!rbuf_bytes_available(client->buf_read)) return WS_CLIENT_NEED_MORE_BYTES; client->rx.payload_processed += rbuf_pop(client->buf_read, &client->rx.specific_data.op_close.reason[client->rx.payload_processed - sizeof(uint16_t)], client->rx.payload_length - client->rx.payload_processed); } client->rx.specific_data.op_close.reason[client->rx.payload_length] = 0; INFO("WebSocket server closed the connection with EC=%d and reason \"%s\"", client->rx.specific_data.op_close.ec, client->rx.specific_data.op_close.reason); mw_free(client->rx.specific_data.op_close.reason); client->rx.specific_data.op_close.reason = NULL; client->rx.parse_state = WS_PACKET_DONE; break; case WS_PAYLOAD_SKIP_UNKNOWN_PAYLOAD: BUF_READ_CHECK_AT_LEAST(client->rx.payload_length); WARN("Skipping Websocket Packet of unsupported/unknown type"); if (client->rx.payload_length) rbuf_bump_tail(client->buf_read, client->rx.payload_length); client->rx.parse_state = WS_PACKET_DONE; return WS_CLIENT_PARSING_DONE; case WS_PAYLOAD_PING_REQ_PAYLOAD: if (client->rx.payload_length > rbuf_get_capacity(client->buf_read) / 2) { ERROR("Ping arrived with payload which is too big!"); return WS_CLIENT_INTERNAL_ERROR; } BUF_READ_CHECK_AT_LEAST(client->rx.payload_length); client->rx.specific_data.ping_msg = mw_malloc(client->rx.payload_length); rbuf_pop(client->buf_read, client->rx.specific_data.ping_msg, client->rx.payload_length); // TODO schedule this instead of sending right away // then attempt to send as soon as buffer space clears up size = ws_client_send(client, WS_OP_PONG, client->rx.specific_data.ping_msg, client->rx.payload_length); if (size != client->rx.payload_length) { ERROR("Unable to send the PONG as one packet back. Closing connection."); return WS_CLIENT_PROTOCOL_ERROR; } client->rx.parse_state = WS_PACKET_DONE; return WS_CLIENT_PARSING_DONE; case WS_PACKET_DONE: client->rx.parse_state = WS_FIRST_2BYTES; client->rx.payload_processed = 0; if (client->rx.opcode == WS_OP_CONNECTION_CLOSE) return WS_CLIENT_CONNECTION_CLOSED; return WS_CLIENT_PARSING_DONE; default: FATAL("Unknown parse state"); return WS_CLIENT_INTERNAL_ERROR; } return 0; } int ws_client_process(ws_client *client) { int ret; switch(client->state) { case WS_RAW: if (ws_client_start_handshake(client)) return WS_CLIENT_INTERNAL_ERROR; return WS_CLIENT_NEED_MORE_BYTES; case WS_HANDSHAKE: do { ret = ws_client_parse_handshake_resp(client); if (ret == WS_CLIENT_PROTOCOL_ERROR) client->state = WS_ERROR; if (ret == WS_CLIENT_PARSING_DONE && client->state == WS_ESTABLISHED) ret = WS_CLIENT_NEED_MORE_BYTES; } while (!ret); break; case WS_ESTABLISHED: do { ret = ws_client_process_rx_ws(client); switch(ret) { case WS_CLIENT_PROTOCOL_ERROR: client->state = WS_ERROR; break; case WS_CLIENT_CONNECTION_CLOSED: client->state = WS_CONN_CLOSED_GRACEFUL; break; } // if ret == 0 we can continue parsing // if ret == WS_CLIENT_PARSING_DONE we processed // one websocket packet and attempt processing // next one if data available in the buffer } while (!ret || ret == WS_CLIENT_PARSING_DONE); break; case WS_ERROR: ERROR("ws_client is in error state. Restart the connection!"); return WS_CLIENT_PROTOCOL_ERROR; case WS_CONN_CLOSED_GRACEFUL: ERROR("Connection has been gracefully closed. Calling this is useless (and probably bug) until you reconnect again."); return WS_CLIENT_CONNECTION_CLOSED; default: FATAL("Unknown connection state! Probably memory corruption."); return WS_CLIENT_INTERNAL_ERROR; } return ret; }