summaryrefslogtreecommitdiffstats
path: root/src/web/rtc/webrtc.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/web/rtc/webrtc.c')
-rw-r--r--src/web/rtc/webrtc.c733
1 files changed, 733 insertions, 0 deletions
diff --git a/src/web/rtc/webrtc.c b/src/web/rtc/webrtc.c
new file mode 100644
index 000000000..027b168b3
--- /dev/null
+++ b/src/web/rtc/webrtc.c
@@ -0,0 +1,733 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "webrtc.h"
+
+#include "../server/web_client.h"
+#include "../server/web_client_cache.h"
+
+#ifdef HAVE_LIBDATACHANNEL
+
+#include "rtc/rtc.h"
+
+#define WEBRTC_OUR_MAX_MESSAGE_SIZE (5 * 1024 * 1024)
+#define WEBRTC_DEFAULT_REMOTE_MAX_MESSAGE_SIZE (65536)
+#define WEBRTC_COMPRESSED_HEADER_SIZE 200
+
+static void webrtc_log(rtcLogLevel level, const char *message) {
+ switch(level) {
+ case RTC_LOG_NONE:
+ break;
+
+ case RTC_LOG_WARNING:
+ case RTC_LOG_ERROR:
+ case RTC_LOG_FATAL:
+ netdata_log_error("WEBRTC: %s", message);
+ break;
+
+ case RTC_LOG_INFO:
+ netdata_log_info("WEBRTC: %s", message);
+ break;
+
+ default:
+ case RTC_LOG_DEBUG:
+ case RTC_LOG_VERBOSE:
+ internal_error(true, "WEBRTC: %s", message);
+ break;
+
+ }
+}
+
+typedef struct webrtc_datachannel {
+ int dc;
+ char *label;
+ struct webrtc_connection *conn;
+
+ bool open; // atomic
+
+ struct {
+ struct webrtc_datachannel *prev;
+ struct webrtc_datachannel *next;
+ } link;
+} WEBRTC_DC;
+
+typedef struct webrtc_connection {
+ int pc;
+ rtcConfiguration config;
+ rtcState state;
+ rtcGatheringState gathering_state;
+
+ size_t max_message_size;
+ size_t local_max_message_size;
+ size_t remote_max_message_size;
+
+ struct {
+ SPINLOCK spinlock;
+ BUFFER *wb;
+ bool sdp;
+ bool candidates;
+ } response;
+
+ struct {
+ SPINLOCK spinlock;
+ WEBRTC_DC *head;
+ } channels;
+
+ struct {
+ struct webrtc_connection *prev;
+ struct webrtc_connection *next;
+ } link;
+} WEBRTC_CONN;
+
+#define WEBRTC_MAX_ICE_SERVERS 100
+
+static struct {
+ bool enabled;
+ char *iceServers[WEBRTC_MAX_ICE_SERVERS];
+ int iceServersCount;
+ char *proxyServer;
+ char *bindAddress;
+
+ struct {
+ SPINLOCK spinlock;
+ WEBRTC_CONN *head;
+ } unsafe;
+
+} webrtc_base = {
+#ifdef NETDATA_INTERNAL_CHECKS
+ .enabled = true,
+#else
+ .enabled = false,
+#endif
+ .iceServers = {
+ // Format:
+ // [("stun"|"turn"|"turns") (":"|"://")][username ":" password "@"]hostname[":" port]["?transport=" ("udp"|"tcp"|"tls")]
+ //
+ // Note transports TCP and TLS are only available for a TURN server with libnice as ICE backend and govern only the
+ // TURN control connection, meaning relaying is always performed over UDP.
+ //
+ // If the username or password of a URI contains reserved special characters, they must be percent-encoded.
+ // In particular, ":" must be encoded as "%3A" and "@" must by encoded as "%40".
+
+ "stun://stun.l.google.com:19302",
+ NULL, // terminator
+ },
+ .iceServersCount = 1,
+ .proxyServer = NULL, // [("http"|"socks5") (":"|"://")][username ":" password "@"]hostname[" :" port]
+ .bindAddress = NULL,
+ .unsafe = {
+ .spinlock = NETDATA_SPINLOCK_INITIALIZER,
+ .head = NULL,
+ },
+};
+
+static inline bool webrtc_dc_is_open(WEBRTC_DC *chan) {
+ return __atomic_load_n(&chan->open, __ATOMIC_RELAXED);
+}
+
+static void webrtc_config_ice_servers(void) {
+ BUFFER *wb = buffer_create(0, NULL);
+
+ int i;
+ for(i = 0; i < WEBRTC_MAX_ICE_SERVERS ;i++) {
+ if (webrtc_base.iceServers[i]) {
+ if (buffer_strlen(wb))
+ buffer_strcat(wb, " ");
+
+ internal_error(true, "WEBRTC: default ice server No %d is: '%s'", i, webrtc_base.iceServers[i]);
+ buffer_strcat(wb, webrtc_base.iceServers[i]);
+ }
+ else
+ break;
+ }
+ webrtc_base.iceServersCount = i;
+ internal_error(true, "WEBRTC: there are %d default ice servers: '%s'", webrtc_base.iceServersCount, buffer_tostring(wb));
+
+ char *servers = config_get(CONFIG_SECTION_WEBRTC, "ice servers", buffer_tostring(wb));
+
+ webrtc_base.iceServersCount = 0;
+ char *s = servers, *e;
+ while(*s) {
+ if(isspace(*s))
+ s++;
+
+ e = s;
+ while(*e && !isspace(*e))
+ e++;
+
+ if(s != e && webrtc_base.iceServersCount < WEBRTC_MAX_ICE_SERVERS) {
+ char old = *e;
+ *e = '\0';
+ internal_error(true, "WEBRTC: ice server No %d is: '%s'", webrtc_base.iceServersCount, s);
+ webrtc_base.iceServers[webrtc_base.iceServersCount++] = strdupz(s);
+ *e = old;
+ }
+
+ if(*e)
+ s = e + 1;
+ else
+ break;
+ }
+
+ buffer_free(wb);
+}
+
+void webrtc_initialize() {
+ webrtc_base.enabled = config_get_boolean(CONFIG_SECTION_WEBRTC, "enabled", webrtc_base.enabled);
+ internal_error(true, "WEBRTC: is %s", webrtc_base.enabled ? "enabled" : "disabled");
+
+ webrtc_config_ice_servers();
+
+ webrtc_base.proxyServer = config_get(CONFIG_SECTION_WEBRTC, "proxy server", webrtc_base.proxyServer ? webrtc_base.proxyServer : "");
+ if(!webrtc_base.proxyServer || !*webrtc_base.proxyServer)
+ webrtc_base.proxyServer = NULL;
+
+ internal_error(true, "WEBRTC: proxy server is: '%s'", webrtc_base.proxyServer ? webrtc_base.proxyServer : "");
+
+ webrtc_base.bindAddress = config_get(CONFIG_SECTION_WEBRTC, "bind address", webrtc_base.bindAddress ? webrtc_base.bindAddress : "");
+ if(!webrtc_base.bindAddress || !*webrtc_base.bindAddress)
+ webrtc_base.bindAddress = NULL;
+
+ internal_error(true, "WEBRTC: bind address is: '%s'", webrtc_base.bindAddress ? webrtc_base.bindAddress : "");
+
+ if(!webrtc_base.enabled)
+ return;
+
+ rtcLogLevel level;
+#ifdef NETDATA_INTERNAL_CHECKS
+ level = RTC_LOG_INFO;
+#else
+ level = RTC_LOG_WARNING;
+#endif
+
+ rtcInitLogger(level, webrtc_log);
+ rtcPreload();
+}
+
+void webrtc_close_all_connections() {
+ if(!webrtc_base.enabled)
+ return;
+
+ rtcCleanup();
+}
+
+size_t find_max_message_size_in_sdp(const char *sdp) {
+ char *s = strstr(sdp, "a=max-message-size:");
+ if(s)
+ return str2ul(&s[19]);
+
+ return WEBRTC_DEFAULT_REMOTE_MAX_MESSAGE_SIZE;
+}
+
+// ----------------------------------------------------------------------------
+// execute web API requests
+
+static bool web_client_stop_callback(struct web_client *w __maybe_unused, void *data) {
+ WEBRTC_DC *chan = data;
+ return !webrtc_dc_is_open(chan);
+}
+
+static size_t webrtc_send_in_chunks(WEBRTC_DC *chan, const char *data, size_t size, int code, const char *message_type, HTTP_CONTENT_TYPE content_type, size_t max_message_size, bool binary) {
+ size_t sent_bytes = 0;
+ size_t chunk = 0;
+ size_t total_chunks = size / max_message_size;
+ if(total_chunks * max_message_size < size)
+ total_chunks++;
+
+ char *send_buffer = mallocz(chan->conn->max_message_size);
+
+ char *s = (char *)data;
+ size_t remaining = size;
+ while(remaining > 0) {
+ chunk++;
+
+ size_t message_size = MIN(remaining, max_message_size);
+
+ int len = snprintfz(send_buffer, WEBRTC_COMPRESSED_HEADER_SIZE, "%d %s %zu %zu %zu %s\r\n",
+ code,
+ message_type,
+ message_size,
+ chunk,
+ total_chunks,
+ content_type_id2string(content_type)
+ );
+
+ internal_fatal((size_t)len != strlen(send_buffer), "WEBRTC compressed header line mismatch");
+ internal_fatal(len + message_size > chan->conn->max_message_size, "WEBRTC message exceeds max message size");
+
+ memcpy(&send_buffer[len], s, message_size);
+
+ int total_message_size = (int)(len + message_size);
+ sent_bytes += total_message_size;
+
+ if(!binary)
+ total_message_size = -total_message_size;
+
+ if(rtcSendMessage(chan->dc, send_buffer, total_message_size) != RTC_ERR_SUCCESS)
+ netdata_log_error("WEBRTC[%d],DC[%d]: failed to send LZ4 chunk %zu of %zu", chan->conn->pc, chan->dc, chunk, total_chunks);
+ else
+ internal_error(true, "WEBRTC[%d],DC[%d]: sent chunk %zu of %zu, size %zu (total %d)",
+ chan->conn->pc, chan->dc, chunk, total_chunks, message_size, total_message_size);
+
+ s = s + message_size;
+ remaining -= message_size;
+ }
+
+ internal_fatal(chunk != total_chunks, "WEBRTC number of compressed chunks mismatch");
+
+ freez(send_buffer);
+ return sent_bytes;
+}
+
+static void webrtc_execute_api_request(WEBRTC_DC *chan, const char *request, size_t size __maybe_unused, bool binary __maybe_unused) {
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_TXT(NDF_SRC_TRANSPORT, "webrtc"),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
+
+ internal_error(true, "WEBRTC[%d],DC[%d]: got request '%s' of size %zu and type %s.",
+ chan->conn->pc, chan->dc, request, size, binary?"binary":"text");
+
+ struct web_client *w = web_client_get_from_cache();
+ w->statistics.received_bytes = size;
+ w->interrupt.callback = web_client_stop_callback;
+ w->interrupt.callback_data = chan;
+ web_client_set_conn_webrtc(w);
+
+ w->port_acl = HTTP_ACL_WEBRTC | HTTP_ACL_ALL_FEATURES;
+ w->acl = w->port_acl;
+
+ char *path = (char *)request;
+ if(strncmp(request, "POST ", 5) == 0) {
+ w->mode = HTTP_REQUEST_MODE_POST;
+ path += 10;
+ }
+ else if(strncmp(request, "GET ", 4) == 0) {
+ w->mode = HTTP_REQUEST_MODE_GET;
+ path += 4;
+ }
+
+ web_client_timeout_checkpoint_set(w, 0);
+ web_client_decode_path_and_query_string(w, path);
+ path = (char *)buffer_tostring(w->url_path_decoded);
+
+ w->response.code = (short)web_client_api_request_with_node_selection(localhost, w, path);
+ web_client_timeout_checkpoint_response_ready(w, NULL);
+
+ size_t sent_bytes = 0;
+ size_t response_size = buffer_strlen(w->response.data);
+
+ bool send_plain = true;
+ int max_message_size = (int)chan->conn->max_message_size - WEBRTC_COMPRESSED_HEADER_SIZE;
+
+ if(!webrtc_dc_is_open(chan)) {
+ internal_error(true, "WEBRTC[%d],DC[%d]: ignoring API response on closed data channel.", chan->conn->pc, chan->dc);
+ goto cleanup;
+ }
+ else {
+ internal_error(true, "WEBRTC[%d],DC[%d]: prepared response with code %d, size %zu.",
+ chan->conn->pc, chan->dc, w->response.code, response_size);
+ }
+
+#if defined(ENABLE_LZ4)
+ int max_compressed_size = LZ4_compressBound((int)response_size);
+ char *compressed = mallocz(max_compressed_size);
+
+ int compressed_size = LZ4_compress_default(buffer_tostring(w->response.data), compressed,
+ (int)response_size, max_compressed_size);
+
+ if(compressed_size > 0) {
+ send_plain = false;
+ sent_bytes = webrtc_send_in_chunks(chan, compressed, compressed_size,
+ w->response.code, "LZ4", w->response.data->content_type,
+ max_message_size, true);
+ }
+ freez(compressed);
+#endif
+
+ if(send_plain)
+ sent_bytes = webrtc_send_in_chunks(chan, buffer_tostring(w->response.data), buffer_strlen(w->response.data),
+ w->response.code, "PLAIN", w->response.data->content_type,
+ max_message_size, false);
+
+ w->statistics.sent_bytes = sent_bytes;
+
+cleanup:
+ web_client_log_completed_request(w, false);
+ web_client_release_to_cache(w);
+}
+
+// ----------------------------------------------------------------------------
+// webrtc data channel
+
+static void myOpenCallback(int id __maybe_unused, void *user_ptr) {
+ webrtc_set_thread_name();
+
+ WEBRTC_DC *chan = user_ptr;
+ internal_fatal(chan->dc != id, "WEBRTC[%d],DC[%d]: dc mismatch, expected %d, got %d", chan->conn->pc, chan->dc, chan->dc, id);
+
+ nd_log(NDLS_ACCESS, NDLP_DEBUG, "WEBRTC[%d],DC[%d]: %d DATA CHANNEL '%s' OPEN", chan->conn->pc, chan->dc, gettid(), chan->label);
+ internal_error(true, "WEBRTC[%d],DC[%d]: data channel opened.", chan->conn->pc, chan->dc);
+ chan->open = true;
+}
+
+static void myClosedCallback(int id __maybe_unused, void *user_ptr) {
+ webrtc_set_thread_name();
+
+ WEBRTC_DC *chan = user_ptr;
+ internal_fatal(chan->dc != id, "WEBRTC[%d],DC[%d]: dc mismatch, expected %d, got %d", chan->conn->pc, chan->dc, chan->dc, id);
+
+ __atomic_store_n(&chan->open, false, __ATOMIC_RELAXED);
+ internal_error(true, "WEBRTC[%d],DC[%d]: data channel closed.", chan->conn->pc, chan->dc);
+
+ spinlock_lock(&chan->conn->channels.spinlock);
+ DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(chan->conn->channels.head, chan, link.prev, link.next);
+ spinlock_unlock(&chan->conn->channels.spinlock);
+
+ nd_log(NDLS_ACCESS, NDLP_DEBUG, "WEBRTC[%d],DC[%d]: %d DATA CHANNEL '%s' CLOSED", chan->conn->pc, chan->dc, gettid(), chan->label);
+
+ freez(chan->label);
+ freez(chan);
+}
+
+static void myErrorCallback(int id __maybe_unused, const char *error, void *user_ptr) {
+ webrtc_set_thread_name();
+
+ WEBRTC_DC *chan = user_ptr;
+ internal_fatal(chan->dc != id, "WEBRTC[%d],DC[%d]: dc mismatch, expected %d, got %d", chan->conn->pc, chan->dc, chan->dc, id);
+
+ netdata_log_error("WEBRTC[%d],DC[%d]: ERROR: '%s'", chan->conn->pc, chan->dc, error);
+}
+
+static void myMessageCallback(int id __maybe_unused, const char *message, int size, void *user_ptr) {
+ webrtc_set_thread_name();
+
+ WEBRTC_DC *chan = user_ptr;
+ internal_fatal(chan->dc != id, "WEBRTC[%d],DC[%d]: dc mismatch, expected %d, got %d", chan->conn->pc, chan->dc, chan->dc, id);
+ internal_fatal(!webrtc_dc_is_open(chan), "WEBRTC[%d],DC[%d]: received message on closed channel", chan->conn->pc, chan->dc);
+
+ bool binary = (size >= 0);
+ if(size < 0)
+ size = -size;
+
+ webrtc_execute_api_request(chan, message, size, binary);
+}
+
+//#define WEBRTC_MAX_REQUEST_SIZE 65536
+//
+//static void myAvailableCallback(int id, void *user_ptr) {
+// webrtc_set_thread_name();
+//
+// WEBRTC_DC *chan = user_ptr;
+// internal_fatal(chan->dc != id, "WEBRTC[%d],DC[%d]: dc mismatch, expected %d, got %d", chan->conn->pc, chan->dc, chan->dc, id);
+//
+// internal_fatal(!chan->open, "WEBRTC[%d],DC[%d]: received message on closed channel", chan->conn->pc, chan->dc);
+//
+// int size = WEBRTC_MAX_REQUEST_SIZE;
+// char buffer[WEBRTC_MAX_REQUEST_SIZE];
+// while(rtcReceiveMessage(id, buffer, &size) == RTC_ERR_SUCCESS) {
+// bool binary = (size >= 0);
+// if(size < 0)
+// size = -size;
+//
+// webrtc_execute_api_request(chan, message, size, binary);
+// }
+//}
+
+static void myDataChannelCallback(int pc __maybe_unused, int dc, void *user_ptr) {
+ webrtc_set_thread_name();
+
+ WEBRTC_CONN *conn = user_ptr;
+ internal_fatal(conn->pc != pc, "WEBRTC[%d]: pc mismatch, expected %d, got %d", conn->pc, conn->pc, pc);
+
+ WEBRTC_DC *chan = callocz(1, sizeof(WEBRTC_DC));
+ chan->dc = dc;
+ chan->conn = conn;
+
+ spinlock_lock(&conn->channels.spinlock);
+ DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(conn->channels.head, chan, link.prev, link.next);
+ spinlock_unlock(&conn->channels.spinlock);
+
+ rtcSetUserPointer(dc, chan);
+
+ char label[1024 + 1];
+ rtcGetDataChannelLabel(dc, label, 1024);
+ label[1024] = '\0';
+
+ chan->label = strdupz(label);
+
+ if(rtcSetOpenCallback(dc, myOpenCallback) != RTC_ERR_SUCCESS)
+ netdata_log_error("WEBRTC[%d],DC[%d]: rtcSetOpenCallback() failed.", conn->pc, chan->dc);
+
+ if(rtcSetClosedCallback(dc, myClosedCallback) != RTC_ERR_SUCCESS)
+ netdata_log_error("WEBRTC[%d],DC[%d]: rtcSetClosedCallback() failed.", conn->pc, chan->dc);
+
+ if(rtcSetErrorCallback(dc, myErrorCallback) != RTC_ERR_SUCCESS)
+ netdata_log_error("WEBRTC[%d],DC[%d]: rtcSetErrorCallback() failed.", conn->pc, chan->dc);
+
+ if(rtcSetMessageCallback(dc, myMessageCallback) != RTC_ERR_SUCCESS)
+ netdata_log_error("WEBRTC[%d],DC[%d]: rtcSetMessageCallback() failed.", conn->pc, chan->dc);
+
+// if(rtcSetAvailableCallback(dc, myAvailableCallback) != RTC_ERR_SUCCESS)
+// netdata_log_error("WEBRTC[%d],DC[%d]: rtcSetAvailableCallback() failed.", conn->pc, chan->dc);
+
+ internal_error(true, "WEBRTC[%d],DC[%d]: new data channel with label '%s'", chan->conn->pc, chan->dc, chan->label);
+}
+
+// ----------------------------------------------------------------------------
+// webrtc connection
+
+static inline void webrtc_destroy_connection_unsafe(WEBRTC_CONN *conn) {
+ if(conn->state == RTC_CLOSED) {
+ spinlock_lock(&conn->channels.spinlock);
+ WEBRTC_DC *chan = conn->channels.head;
+ spinlock_unlock(&conn->channels.spinlock);
+
+ if(!chan) {
+ internal_error(true, "WEBRTC[%d]: destroying connection", conn->pc);
+ DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(webrtc_base.unsafe.head, conn, link.prev, link.next);
+ freez(conn);
+ }
+ else {
+ internal_error(true, "WEBRTC[%d]: not destroying closed connection because it has data channels running", conn->pc);
+ }
+ }
+}
+
+static void cleanupConnections() {
+ spinlock_lock(&webrtc_base.unsafe.spinlock);
+ WEBRTC_CONN *conn = webrtc_base.unsafe.head;
+ while(conn) {
+ WEBRTC_CONN *conn_next = conn->link.next;
+ webrtc_destroy_connection_unsafe(conn);
+ conn = conn_next;
+ }
+ spinlock_unlock(&webrtc_base.unsafe.spinlock);
+}
+
+static WEBRTC_CONN * webrtc_create_connection(void) {
+ WEBRTC_CONN *conn = callocz(1, sizeof(WEBRTC_CONN));
+
+ spinlock_init(&conn->response.spinlock);
+ spinlock_init(&conn->channels.spinlock);
+
+ spinlock_lock(&webrtc_base.unsafe.spinlock);
+ DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(webrtc_base.unsafe.head, conn, link.prev, link.next);
+ spinlock_unlock(&webrtc_base.unsafe.spinlock);
+ return conn;
+}
+
+static void myDescriptionCallback(int pc __maybe_unused, const char *sdp, const char *type, void *user_ptr) {
+ webrtc_set_thread_name();
+
+ WEBRTC_CONN *conn = user_ptr;
+ internal_fatal(conn->pc != pc, "WEBRTC[%d]: pc mismatch, expected %d, got %d", conn->pc, conn->pc, pc);
+
+ internal_error(true, "WEBRTC[%d]: local description type '%s': %s", conn->pc, type, sdp);
+ spinlock_lock(&conn->response.spinlock);
+ if(!conn->response.candidates) {
+ buffer_json_member_add_string(conn->response.wb, "sdp", sdp);
+ buffer_json_member_add_string(conn->response.wb, "type", type);
+ conn->response.sdp = true;
+ }
+ spinlock_unlock(&conn->response.spinlock);
+
+ conn->local_max_message_size = find_max_message_size_in_sdp(sdp);
+}
+
+static void myCandidateCallback(int pc __maybe_unused, const char *cand, const char *mid __maybe_unused, void *user_ptr) {
+ webrtc_set_thread_name();
+
+ WEBRTC_CONN *conn = user_ptr;
+ internal_fatal(conn->pc != pc, "WEBRTC[%d]: pc mismatch, expected %d, got %d", conn->pc, conn->pc, pc);
+
+ spinlock_lock(&conn->response.spinlock);
+ if(!conn->response.candidates) {
+ buffer_json_member_add_array(conn->response.wb, "candidates");
+ conn->response.candidates = true;
+ }
+
+ internal_error(true, "WEBRTC[%d]: local candidate '%s', mid '%s'", conn->pc, cand, mid);
+ buffer_json_add_array_item_string(conn->response.wb, cand);
+ spinlock_unlock(&conn->response.spinlock);
+}
+
+static void myStateChangeCallback(int pc __maybe_unused, rtcState state, void *user_ptr) {
+ webrtc_set_thread_name();
+
+ WEBRTC_CONN *conn = user_ptr;
+ internal_fatal(conn->pc != pc, "WEBRTC[%d]: pc mismatch, expected %d, got %d", conn->pc, conn->pc, pc);
+
+ conn->state = state;
+
+ switch(state) {
+ case RTC_NEW:
+ internal_error(true, "WEBRTC[%d]: new connection...", conn->pc);
+ break;
+
+ case RTC_CONNECTING:
+ nd_log(NDLS_ACCESS, NDLP_DEBUG, "WEBRTC[%d]: %d CONNECTING", conn->pc, gettid());
+ internal_error(true, "WEBRTC[%d]: connecting...", conn->pc);
+ break;
+
+ case RTC_CONNECTED:
+ nd_log(NDLS_ACCESS, NDLP_DEBUG, "WEBRTC[%d]: %d CONNECTED", conn->pc, gettid());
+ internal_error(true, "WEBRTC[%d]: connected!", conn->pc);
+ break;
+
+ case RTC_DISCONNECTED:
+ nd_log(NDLS_ACCESS, NDLP_DEBUG, "WEBRTC[%d]: %d DISCONNECTED", conn->pc, gettid());
+ internal_error(true, "WEBRTC[%d]: disconnected.", conn->pc);
+ break;
+
+ case RTC_FAILED:
+ nd_log(NDLS_ACCESS, NDLP_DEBUG, "WEBRTC[%d]: %d CONNECTION FAILED", conn->pc, gettid());
+ internal_error(true, "WEBRTC[%d]: failed.", conn->pc);
+ break;
+
+ case RTC_CLOSED:
+ nd_log(NDLS_ACCESS, NDLP_DEBUG, "WEBRTC[%d]: %d CONNECTION CLOSED", conn->pc, gettid());
+ internal_error(true, "WEBRTC[%d]: closed.", conn->pc);
+ spinlock_lock(&webrtc_base.unsafe.spinlock);
+ webrtc_destroy_connection_unsafe(conn);
+ spinlock_unlock(&webrtc_base.unsafe.spinlock);
+ break;
+ }
+}
+
+static void myGatheringStateCallback(int pc __maybe_unused, rtcGatheringState state, void *user_ptr) {
+ webrtc_set_thread_name();
+
+ WEBRTC_CONN *conn = user_ptr;
+ internal_fatal(conn->pc != pc, "WEBRTC[%d]: pc mismatch, expected %d, got %d", conn->pc, conn->pc, pc);
+
+ conn->gathering_state = state;
+
+ switch(state) {
+ case RTC_GATHERING_NEW:
+ internal_error(true, "WEBRTC[%d]: gathering...", conn->pc);
+ break;
+
+ case RTC_GATHERING_INPROGRESS:
+ internal_error(true, "WEBRTC[%d]: gathering in progress...", conn->pc);
+ break;
+
+ case RTC_GATHERING_COMPLETE:
+ internal_error(true, "WEBRTC[%d]: gathering complete!", conn->pc);
+ break;
+ }
+}
+
+int webrtc_new_connection(const char *sdp, BUFFER *wb) {
+ if(unlikely(!webrtc_base.enabled)) {
+ buffer_flush(wb);
+ buffer_strcat(wb, "WebRTC is not enabled on this agent.");
+ wb->content_type = CT_TEXT_PLAIN;
+ return HTTP_RESP_BAD_REQUEST;
+ }
+
+ cleanupConnections();
+
+ if(unlikely(!sdp || !*sdp)) {
+ buffer_flush(wb);
+ buffer_strcat(wb, "No SDP message posted with the request");
+ wb->content_type = CT_TEXT_PLAIN;
+ return HTTP_RESP_BAD_REQUEST;
+ }
+
+ buffer_flush(wb);
+ buffer_json_initialize(wb, "\"", "\"", 0, true, BUFFER_JSON_OPTIONS_DEFAULT);
+ wb->content_type = CT_APPLICATION_JSON;
+
+ WEBRTC_CONN *conn = webrtc_create_connection();
+ conn->response.wb = wb;
+ conn->max_message_size = WEBRTC_DEFAULT_REMOTE_MAX_MESSAGE_SIZE;
+ conn->local_max_message_size = WEBRTC_OUR_MAX_MESSAGE_SIZE;
+ conn->remote_max_message_size = find_max_message_size_in_sdp(sdp);
+
+ conn->config.iceServers = (const char **)webrtc_base.iceServers;
+ conn->config.iceServersCount = webrtc_base.iceServersCount;
+ conn->config.proxyServer = webrtc_base.proxyServer;
+ conn->config.bindAddress = webrtc_base.bindAddress;
+ conn->config.certificateType = RTC_CERTIFICATE_DEFAULT;
+ conn->config.iceTransportPolicy = RTC_TRANSPORT_POLICY_ALL;
+ conn->config.enableIceTcp = true; // libnice only
+ conn->config.enableIceUdpMux = true; // libjuice only
+ conn->config.disableAutoNegotiation = false;
+ conn->config.forceMediaTransport = false;
+ conn->config.portRangeBegin = 0; // 0 means automatic
+ conn->config.portRangeEnd = 0; // 0 means automatic
+ conn->config.mtu = 0; // <= 0 means automatic
+ conn->config.maxMessageSize = WEBRTC_OUR_MAX_MESSAGE_SIZE; // <= 0 means default
+
+ conn->pc = rtcCreatePeerConnection(&conn->config);
+ rtcSetUserPointer(conn->pc, conn);
+
+ if(rtcSetLocalDescriptionCallback(conn->pc, myDescriptionCallback) != RTC_ERR_SUCCESS)
+ netdata_log_error("WEBRTC[%d]: rtcSetLocalDescriptionCallback() failed", conn->pc);
+
+ if(rtcSetLocalCandidateCallback(conn->pc, myCandidateCallback) != RTC_ERR_SUCCESS)
+ netdata_log_error("WEBRTC[%d]: rtcSetLocalCandidateCallback() failed", conn->pc);
+
+ if(rtcSetStateChangeCallback(conn->pc, myStateChangeCallback) != RTC_ERR_SUCCESS)
+ netdata_log_error("WEBRTC[%d]: rtcSetStateChangeCallback() failed", conn->pc);
+
+ if(rtcSetGatheringStateChangeCallback(conn->pc, myGatheringStateCallback) != RTC_ERR_SUCCESS)
+ netdata_log_error("WEBRTC[%d]: rtcSetGatheringStateChangeCallback() failed", conn->pc);
+
+ if(rtcSetDataChannelCallback(conn->pc, myDataChannelCallback) != RTC_ERR_SUCCESS)
+ netdata_log_error("WEBRTC[%d]: rtcSetDataChannelCallback() failed", conn->pc);
+
+ // initialize the handshake
+ internal_error(true, "WEBRTC[%d]: setting remote sdp: %s", conn->pc, sdp);
+ if(rtcSetRemoteDescription(conn->pc, sdp, "offer") != RTC_ERR_SUCCESS)
+ netdata_log_error("WEBRTC[%d]: rtcSetRemoteDescription() failed", conn->pc);
+
+ // initiate the handshake process
+ if(conn->config.disableAutoNegotiation) {
+ if(rtcSetLocalDescription(conn->pc, NULL) != RTC_ERR_SUCCESS)
+ netdata_log_error("WEBRTC[%d]: rtcSetLocalDescription() failed", conn->pc);
+ }
+
+ bool logged = false;
+ while(conn->gathering_state != RTC_GATHERING_COMPLETE) {
+ if(!logged) {
+ logged = true;
+ internal_error(true, "WEBRTC[%d]: Waiting for gathering to complete", conn->pc);
+ }
+ usleep(1000);
+ }
+
+ if(logged)
+ internal_error(true, "WEBRTC[%d]: Gathering finished, our answer is ready", conn->pc);
+
+ internal_fatal(!conn->response.sdp, "WEBRTC[%d]: response does not have an SDP: %s", conn->pc, buffer_tostring(conn->response.wb));
+ internal_fatal(!conn->response.candidates, "WEBRTC[%d]: response does not have candidates: %s", conn->pc, buffer_tostring(conn->response.wb));
+
+ conn->max_message_size = MIN(conn->local_max_message_size, conn->remote_max_message_size);
+ if(conn->max_message_size < WEBRTC_COMPRESSED_HEADER_SIZE)
+ conn->max_message_size = WEBRTC_COMPRESSED_HEADER_SIZE;
+
+ buffer_json_finalize(wb);
+
+ return HTTP_RESP_OK;
+}
+
+#else // ! HAVE_LIBDATACHANNEL
+
+void webrtc_initialize() {
+ ;
+}
+
+int webrtc_new_connection(const char *sdp __maybe_unused, BUFFER *wb) {
+ buffer_flush(wb);
+ buffer_strcat(wb, "WEBRTC is not available on this server");
+ wb->content_type = CT_TEXT_PLAIN;
+ return HTTP_RESP_BAD_REQUEST;
+}
+
+void webrtc_close_all_connections() {
+ ;
+}
+
+#endif // ! HAVE_LIBDATACHANNEL