summaryrefslogtreecommitdiffstats
path: root/web/server/h2o/streaming.c
diff options
context:
space:
mode:
Diffstat (limited to 'web/server/h2o/streaming.c')
-rw-r--r--web/server/h2o/streaming.c382
1 files changed, 382 insertions, 0 deletions
diff --git a/web/server/h2o/streaming.c b/web/server/h2o/streaming.c
new file mode 100644
index 000000000..063e487af
--- /dev/null
+++ b/web/server/h2o/streaming.c
@@ -0,0 +1,382 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "streaming.h"
+#include "connlist.h"
+#include "h2o_utils.h"
+#include "streaming/common.h"
+
+static int pending_write_reqs = 0;
+
+#define H2O2STREAM_BUF_SIZE (1024 * 1024)
+
+// h2o_stream_conn_t related functions
+void h2o_stream_conn_t_init(h2o_stream_conn_t *conn)
+{
+ memset(conn, 0, sizeof(*conn));
+ conn->rx = rbuf_create(H2O2STREAM_BUF_SIZE);
+ conn->tx = rbuf_create(H2O2STREAM_BUF_SIZE);
+
+ pthread_mutex_init(&conn->rx_buf_lock, NULL);
+ pthread_mutex_init(&conn->tx_buf_lock, NULL);
+ pthread_cond_init(&conn->rx_buf_cond, NULL);
+ // no need to check for NULL as rbuf_create uses mallocz internally
+}
+
+void h2o_stream_conn_t_destroy(h2o_stream_conn_t *conn)
+{
+ rbuf_free(conn->rx);
+ rbuf_free(conn->tx);
+
+ freez(conn->url);
+ freez(conn->user_agent);
+
+ pthread_mutex_destroy(&conn->rx_buf_lock);
+ pthread_mutex_destroy(&conn->tx_buf_lock);
+ pthread_cond_destroy(&conn->rx_buf_cond);
+}
+
+// streaming upgrade related functions
+int is_streaming_handshake(h2o_req_t *req)
+{
+ /* method */
+ if (!h2o_memis(req->input.method.base, req->input.method.len, H2O_STRLIT("GET")))
+ return 1;
+
+ if (!h2o_memis(req->path_normalized.base, req->path_normalized.len, H2O_STRLIT(NETDATA_STREAM_URL))) {
+ return 1;
+ }
+
+ /* upgrade header */
+ if (req->upgrade.base == NULL || !h2o_lcstris(req->upgrade.base, req->upgrade.len, H2O_STRLIT(NETDATA_STREAM_PROTO_NAME)))
+ return 1;
+
+ // TODO consider adding some key in form of random number
+ // to prevent caching on route especially if TLS is not used
+ // e.g. client sends random number
+ // server replies with it xored
+
+ return 0;
+}
+
+static void stream_on_close(h2o_stream_conn_t *conn);
+void stream_process(h2o_stream_conn_t *conn, int initial);
+
+void stream_on_complete(void *user_data, h2o_socket_t *sock, size_t reqsize)
+{
+ h2o_stream_conn_t *conn = user_data;
+
+ /* close the connection on error */
+ if (sock == NULL) {
+ stream_on_close(conn);
+ return;
+ }
+
+ conn->sock = sock;
+ sock->data = conn;
+
+ conn_list_insert(&conn_list, conn);
+
+ h2o_buffer_consume(&sock->input, reqsize);
+ stream_process(conn, 1);
+}
+
+// handling of active streams
+static void stream_on_close(h2o_stream_conn_t *conn)
+{
+ if (conn->sock != NULL)
+ h2o_socket_close(conn->sock);
+
+ conn_list_remove_conn(&conn_list, conn);
+
+ pthread_mutex_lock(&conn->rx_buf_lock);
+ conn->shutdown = 1;
+ pthread_cond_broadcast(&conn->rx_buf_cond);
+ pthread_mutex_unlock(&conn->rx_buf_lock);
+
+ h2o_stream_conn_t_destroy(conn);
+ freez(conn);
+}
+
+static void on_write_complete(h2o_socket_t *sock, const char *err)
+{
+ h2o_stream_conn_t *conn = sock->data;
+
+ if (err != NULL) {
+ stream_on_close(conn);
+ error_report("Streaming connection error \"%s\"", err);
+ return;
+ }
+
+ pthread_mutex_lock(&conn->tx_buf_lock);
+
+ rbuf_bump_tail(conn->tx, conn->tx_buf.len);
+
+ conn->tx_buf.base = NULL;
+ conn->tx_buf.len = 0;
+
+ pthread_mutex_unlock(&conn->tx_buf_lock);
+
+ stream_process(conn, 0);
+}
+
+static void stream_on_recv(h2o_socket_t *sock, const char *err)
+{
+ h2o_stream_conn_t *conn = sock->data;
+
+ if (err != NULL) {
+ stream_on_close(conn);
+ error_report("Streaming connection error \"%s\"", err);
+ return;
+ }
+ stream_process(conn, 0);
+}
+
+#define PARSE_DONE 1
+#define PARSE_ERROR -1
+#define GIMME_MORE_OF_DEM_SWEET_BYTEZ 0
+
+#define STREAM_METHOD "STREAM "
+#define USER_AGENT "User-Agent: "
+
+#define NEED_MIN_BYTES(buf, bytes) \
+if (rbuf_bytes_available(buf) < bytes) \
+ return GIMME_MORE_OF_DEM_SWEET_BYTEZ;
+
+// TODO check in streaming code this is probably defined somewhere already
+#define MAX_LEN_STREAM_HELLO (1024*2)
+
+static int process_STREAM_X_HTTP_1_1(http_stream_parse_state_t *parser_state, rbuf_t buf, char **url, char **user_agent)
+{
+ int idx;
+ switch(*parser_state) {
+ case HTTP_STREAM:
+ NEED_MIN_BYTES(buf, strlen(STREAM_METHOD));
+ if (rbuf_memcmp_n(buf, H2O_STRLIT(STREAM_METHOD))) {
+ error_report("Expected \"%s\"", STREAM_METHOD);
+ return PARSE_ERROR;
+ }
+ rbuf_bump_tail(buf, strlen(STREAM_METHOD));
+ *parser_state = HTTP_URL;
+ /* FALLTHROUGH */
+ case HTTP_URL:
+ if (!rbuf_find_bytes(buf, " ", 1, &idx)) {
+ if (rbuf_bytes_available(buf) >= MAX_LEN_STREAM_HELLO) {
+ error_report("The initial \"STREAM [URL]" HTTP_1_1 "\" over max of %d", MAX_LEN_STREAM_HELLO);
+ return PARSE_ERROR;
+ }
+ }
+ *url = mallocz(idx + 1);
+ rbuf_pop(buf, *url, idx);
+ (*url)[idx] = 0;
+
+ *parser_state = HTTP_PROTO;
+ /* FALLTHROUGH */
+ case HTTP_PROTO:
+ NEED_MIN_BYTES(buf, strlen(HTTP_1_1));
+ if (rbuf_memcmp_n(buf, H2O_STRLIT(HTTP_1_1))) {
+ error_report("Expected \"%s\"", HTTP_1_1);
+ return PARSE_ERROR;
+ }
+ rbuf_bump_tail(buf, strlen(HTTP_1_1));
+ *parser_state = HTTP_USER_AGENT_KEY;
+ /* FALLTHROUGH */
+ case HTTP_USER_AGENT_KEY:
+ // and OF COURSE EVERYTHING is passed in URL except
+ // for user agent which we need and is passed as HTTP header
+ // not worth writing a parser for this so we manually extract
+ // just the single header we need and skip everything else
+ if (!rbuf_find_bytes(buf, USER_AGENT, strlen(USER_AGENT), &idx)) {
+ if (rbuf_bytes_available(buf) >= (size_t)(rbuf_get_capacity(buf) * 0.9)) {
+ error_report("The initial \"STREAM [URL]" HTTP_1_1 "\" over max of %d", MAX_LEN_STREAM_HELLO);
+ return PARSE_ERROR;
+ }
+ return GIMME_MORE_OF_DEM_SWEET_BYTEZ;
+ }
+ rbuf_bump_tail(buf, idx + strlen(USER_AGENT));
+ *parser_state = HTTP_USER_AGENT_VALUE;
+ /* FALLTHROUGH */
+ case HTTP_USER_AGENT_VALUE:
+ if (!rbuf_find_bytes(buf, "\r\n", 2, &idx)) {
+ if (rbuf_bytes_available(buf) >= (size_t)(rbuf_get_capacity(buf) * 0.9)) {
+ error_report("The initial \"STREAM [URL]" HTTP_1_1 "\" over max of %d", MAX_LEN_STREAM_HELLO);
+ return PARSE_ERROR;
+ }
+ return GIMME_MORE_OF_DEM_SWEET_BYTEZ;
+ }
+
+ *user_agent = mallocz(idx + 1);
+ rbuf_pop(buf, *user_agent, idx);
+ (*user_agent)[idx] = 0;
+
+ *parser_state = HTTP_HDR;
+ /* FALLTHROUGH */
+ case HTTP_HDR:
+ if (!rbuf_find_bytes(buf, HTTP_HDR_END, strlen(HTTP_HDR_END), &idx)) {
+ if (rbuf_bytes_available(buf) >= (size_t)(rbuf_get_capacity(buf) * 0.9)) {
+ error_report("The initial \"STREAM [URL]" HTTP_1_1 "\" over max of %d", MAX_LEN_STREAM_HELLO);
+ return PARSE_ERROR;
+ }
+ return GIMME_MORE_OF_DEM_SWEET_BYTEZ;
+ }
+ rbuf_bump_tail(buf, idx + strlen(HTTP_HDR_END));
+
+ *parser_state = HTTP_DONE;
+ return PARSE_DONE;
+ case HTTP_DONE:
+ error_report("Parsing is done. No need to call again.");
+ return PARSE_DONE;
+ default:
+ error_report("Unknown parser state %d", (int)*parser_state);
+ return PARSE_ERROR;
+ }
+}
+
+#define SINGLE_WRITE_MAX (1024)
+
+void stream_process(h2o_stream_conn_t *conn, int initial)
+{
+ int rc;
+ struct web_client w;
+
+ pthread_mutex_lock(&conn->tx_buf_lock);
+ if (h2o_socket_is_writing(conn->sock) || rbuf_bytes_available(conn->tx)) {
+ if (rbuf_bytes_available(conn->tx) && !conn->tx_buf.base) {
+ conn->tx_buf.base = rbuf_get_linear_read_range(conn->tx, &conn->tx_buf.len);
+ if (conn->tx_buf.base) {
+ conn->tx_buf.len = MIN(conn->tx_buf.len, SINGLE_WRITE_MAX);
+ h2o_socket_write(conn->sock, &conn->tx_buf, 1, on_write_complete);
+ }
+ }
+ }
+ pthread_mutex_unlock(&conn->tx_buf_lock);
+
+ if (initial)
+ h2o_socket_read_start(conn->sock, stream_on_recv);
+
+ if (conn->sock->input->size) {
+ size_t insert_max;
+ pthread_mutex_lock(&conn->rx_buf_lock);
+ char *insert_loc = rbuf_get_linear_insert_range(conn->rx, &insert_max);
+ if (insert_loc == NULL) {
+ pthread_cond_broadcast(&conn->rx_buf_cond);
+ pthread_mutex_unlock(&conn->rx_buf_lock);
+ return;
+ }
+ insert_max = MIN(insert_max, conn->sock->input->size);
+ memcpy(insert_loc, conn->sock->input->bytes, insert_max);
+ rbuf_bump_head(conn->rx, insert_max);
+
+ h2o_buffer_consume(&conn->sock->input, insert_max);
+
+ pthread_cond_broadcast(&conn->rx_buf_cond);
+ pthread_mutex_unlock(&conn->rx_buf_lock);
+ }
+
+ switch (conn->state) {
+ case STREAM_X_HTTP_1_1:
+ // no conn->rx lock here as at this point we are still single threaded
+ // until we call rrdpush_receiver_thread_spawn() later down
+ rc = process_STREAM_X_HTTP_1_1(&conn->parse_state, conn->rx, &conn->url, &conn->user_agent);
+ if (rc == PARSE_ERROR) {
+ error_report("error parsing the STREAM hello");
+ break;
+ }
+ if (rc != PARSE_DONE)
+ break;
+ conn->state = STREAM_X_HTTP_1_1_DONE;
+ /* FALLTHROUGH */
+ case STREAM_X_HTTP_1_1_DONE:
+ memset(&w, 0, sizeof(w));
+ w.response.data = buffer_create(1024, NULL);
+
+ // get client ip from the conn->sock
+ struct sockaddr client;
+ socklen_t len = h2o_socket_getpeername(conn->sock, &client);
+ char peername[NI_MAXHOST];
+ size_t peername_len = h2o_socket_getnumerichost(&client, len, peername);
+ size_t cpy_len = sizeof(w.client_ip) < peername_len ? sizeof(w.client_ip) : peername_len;
+ memcpy(w.client_ip, peername, cpy_len);
+ w.client_ip[cpy_len - 1] = 0;
+ w.user_agent = conn->user_agent;
+
+ rc = rrdpush_receiver_thread_spawn(&w, conn->url, conn);
+ if (rc != HTTP_RESP_OK) {
+ error_report("HTTPD Failed to spawn the receiver thread %d", rc);
+ conn->state = STREAM_CLOSE;
+ stream_on_close(conn);
+ } else {
+ conn->state = STREAM_ACTIVE;
+ }
+ buffer_free(w.response.data);
+ /* FALLTHROUGH */
+ case STREAM_ACTIVE:
+ break;
+ default:
+ error_report("Unknown conn->state");
+ }
+}
+
+// read and write functions to be used by streaming parser
+int h2o_stream_write(void *ctx, const char *data, size_t data_len)
+{
+ h2o_stream_conn_t *conn = (h2o_stream_conn_t *)ctx;
+
+ pthread_mutex_lock(&conn->tx_buf_lock);
+ size_t avail = rbuf_bytes_free(conn->tx);
+ avail = MIN(avail, data_len);
+ rbuf_push(conn->tx, data, avail);
+ pthread_mutex_unlock(&conn->tx_buf_lock);
+ __atomic_add_fetch(&pending_write_reqs, 1, __ATOMIC_SEQ_CST);
+ return avail;
+}
+
+size_t h2o_stream_read(void *ctx, char *buf, size_t read_bytes)
+{
+ int ret;
+ h2o_stream_conn_t *conn = (h2o_stream_conn_t *)ctx;
+
+ pthread_mutex_lock(&conn->rx_buf_lock);
+ size_t avail = rbuf_bytes_available(conn->rx);
+
+ if (!avail) {
+ if (conn->shutdown) {
+ pthread_mutex_unlock(&conn->rx_buf_lock);
+ return -1;
+ }
+ pthread_cond_wait(&conn->rx_buf_cond, &conn->rx_buf_lock);
+ if (conn->shutdown) {
+ pthread_mutex_unlock(&conn->rx_buf_lock);
+ return -1;
+ }
+ avail = rbuf_bytes_available(conn->rx);
+ if (!avail) {
+ pthread_mutex_unlock(&conn->rx_buf_lock);
+ return 0;
+ }
+ }
+
+ avail = MIN(avail, read_bytes);
+
+ ret = rbuf_pop(conn->rx, buf, avail);
+ pthread_mutex_unlock(&conn->rx_buf_lock);
+
+ return ret;
+}
+
+// periodic check for pending write requests
+void check_tx_buf(h2o_stream_conn_t *conn)
+{
+ pthread_mutex_lock(&conn->tx_buf_lock);
+ if (rbuf_bytes_available(conn->tx)) {
+ pthread_mutex_unlock(&conn->tx_buf_lock);
+ stream_process(conn, 0);
+ } else
+ pthread_mutex_unlock(&conn->tx_buf_lock);
+}
+
+void h2o_stream_check_pending_write_reqs(void)
+{
+ int _write_reqs = __atomic_exchange_n(&pending_write_reqs, 0, __ATOMIC_SEQ_CST);
+ if (_write_reqs > 0)
+ conn_list_iter_all(&conn_list, check_tx_buf);
+}