diff options
Diffstat (limited to 'web/server/h2o/streaming.c')
-rw-r--r-- | web/server/h2o/streaming.c | 382 |
1 files changed, 0 insertions, 382 deletions
diff --git a/web/server/h2o/streaming.c b/web/server/h2o/streaming.c deleted file mode 100644 index 063e487af..000000000 --- a/web/server/h2o/streaming.c +++ /dev/null @@ -1,382 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#include "streaming.h" -#include "connlist.h" -#include "h2o_utils.h" -#include "streaming/common.h" - -static int pending_write_reqs = 0; - -#define H2O2STREAM_BUF_SIZE (1024 * 1024) - -// h2o_stream_conn_t related functions -void h2o_stream_conn_t_init(h2o_stream_conn_t *conn) -{ - memset(conn, 0, sizeof(*conn)); - conn->rx = rbuf_create(H2O2STREAM_BUF_SIZE); - conn->tx = rbuf_create(H2O2STREAM_BUF_SIZE); - - pthread_mutex_init(&conn->rx_buf_lock, NULL); - pthread_mutex_init(&conn->tx_buf_lock, NULL); - pthread_cond_init(&conn->rx_buf_cond, NULL); - // no need to check for NULL as rbuf_create uses mallocz internally -} - -void h2o_stream_conn_t_destroy(h2o_stream_conn_t *conn) -{ - rbuf_free(conn->rx); - rbuf_free(conn->tx); - - freez(conn->url); - freez(conn->user_agent); - - pthread_mutex_destroy(&conn->rx_buf_lock); - pthread_mutex_destroy(&conn->tx_buf_lock); - pthread_cond_destroy(&conn->rx_buf_cond); -} - -// streaming upgrade related functions -int is_streaming_handshake(h2o_req_t *req) -{ - /* method */ - if (!h2o_memis(req->input.method.base, req->input.method.len, H2O_STRLIT("GET"))) - return 1; - - if (!h2o_memis(req->path_normalized.base, req->path_normalized.len, H2O_STRLIT(NETDATA_STREAM_URL))) { - return 1; - } - - /* upgrade header */ - if (req->upgrade.base == NULL || !h2o_lcstris(req->upgrade.base, req->upgrade.len, H2O_STRLIT(NETDATA_STREAM_PROTO_NAME))) - return 1; - - // TODO consider adding some key in form of random number - // to prevent caching on route especially if TLS is not used - // e.g. client sends random number - // server replies with it xored - - return 0; -} - -static void stream_on_close(h2o_stream_conn_t *conn); -void stream_process(h2o_stream_conn_t *conn, int initial); - -void stream_on_complete(void *user_data, h2o_socket_t *sock, size_t reqsize) -{ - h2o_stream_conn_t *conn = user_data; - - /* close the connection on error */ - if (sock == NULL) { - stream_on_close(conn); - return; - } - - conn->sock = sock; - sock->data = conn; - - conn_list_insert(&conn_list, conn); - - h2o_buffer_consume(&sock->input, reqsize); - stream_process(conn, 1); -} - -// handling of active streams -static void stream_on_close(h2o_stream_conn_t *conn) -{ - if (conn->sock != NULL) - h2o_socket_close(conn->sock); - - conn_list_remove_conn(&conn_list, conn); - - pthread_mutex_lock(&conn->rx_buf_lock); - conn->shutdown = 1; - pthread_cond_broadcast(&conn->rx_buf_cond); - pthread_mutex_unlock(&conn->rx_buf_lock); - - h2o_stream_conn_t_destroy(conn); - freez(conn); -} - -static void on_write_complete(h2o_socket_t *sock, const char *err) -{ - h2o_stream_conn_t *conn = sock->data; - - if (err != NULL) { - stream_on_close(conn); - error_report("Streaming connection error \"%s\"", err); - return; - } - - pthread_mutex_lock(&conn->tx_buf_lock); - - rbuf_bump_tail(conn->tx, conn->tx_buf.len); - - conn->tx_buf.base = NULL; - conn->tx_buf.len = 0; - - pthread_mutex_unlock(&conn->tx_buf_lock); - - stream_process(conn, 0); -} - -static void stream_on_recv(h2o_socket_t *sock, const char *err) -{ - h2o_stream_conn_t *conn = sock->data; - - if (err != NULL) { - stream_on_close(conn); - error_report("Streaming connection error \"%s\"", err); - return; - } - stream_process(conn, 0); -} - -#define PARSE_DONE 1 -#define PARSE_ERROR -1 -#define GIMME_MORE_OF_DEM_SWEET_BYTEZ 0 - -#define STREAM_METHOD "STREAM " -#define USER_AGENT "User-Agent: " - -#define NEED_MIN_BYTES(buf, bytes) \ -if (rbuf_bytes_available(buf) < bytes) \ - return GIMME_MORE_OF_DEM_SWEET_BYTEZ; - -// TODO check in streaming code this is probably defined somewhere already -#define MAX_LEN_STREAM_HELLO (1024*2) - -static int process_STREAM_X_HTTP_1_1(http_stream_parse_state_t *parser_state, rbuf_t buf, char **url, char **user_agent) -{ - int idx; - switch(*parser_state) { - case HTTP_STREAM: - NEED_MIN_BYTES(buf, strlen(STREAM_METHOD)); - if (rbuf_memcmp_n(buf, H2O_STRLIT(STREAM_METHOD))) { - error_report("Expected \"%s\"", STREAM_METHOD); - return PARSE_ERROR; - } - rbuf_bump_tail(buf, strlen(STREAM_METHOD)); - *parser_state = HTTP_URL; - /* FALLTHROUGH */ - case HTTP_URL: - if (!rbuf_find_bytes(buf, " ", 1, &idx)) { - if (rbuf_bytes_available(buf) >= MAX_LEN_STREAM_HELLO) { - error_report("The initial \"STREAM [URL]" HTTP_1_1 "\" over max of %d", MAX_LEN_STREAM_HELLO); - return PARSE_ERROR; - } - } - *url = mallocz(idx + 1); - rbuf_pop(buf, *url, idx); - (*url)[idx] = 0; - - *parser_state = HTTP_PROTO; - /* FALLTHROUGH */ - case HTTP_PROTO: - NEED_MIN_BYTES(buf, strlen(HTTP_1_1)); - if (rbuf_memcmp_n(buf, H2O_STRLIT(HTTP_1_1))) { - error_report("Expected \"%s\"", HTTP_1_1); - return PARSE_ERROR; - } - rbuf_bump_tail(buf, strlen(HTTP_1_1)); - *parser_state = HTTP_USER_AGENT_KEY; - /* FALLTHROUGH */ - case HTTP_USER_AGENT_KEY: - // and OF COURSE EVERYTHING is passed in URL except - // for user agent which we need and is passed as HTTP header - // not worth writing a parser for this so we manually extract - // just the single header we need and skip everything else - if (!rbuf_find_bytes(buf, USER_AGENT, strlen(USER_AGENT), &idx)) { - if (rbuf_bytes_available(buf) >= (size_t)(rbuf_get_capacity(buf) * 0.9)) { - error_report("The initial \"STREAM [URL]" HTTP_1_1 "\" over max of %d", MAX_LEN_STREAM_HELLO); - return PARSE_ERROR; - } - return GIMME_MORE_OF_DEM_SWEET_BYTEZ; - } - rbuf_bump_tail(buf, idx + strlen(USER_AGENT)); - *parser_state = HTTP_USER_AGENT_VALUE; - /* FALLTHROUGH */ - case HTTP_USER_AGENT_VALUE: - if (!rbuf_find_bytes(buf, "\r\n", 2, &idx)) { - if (rbuf_bytes_available(buf) >= (size_t)(rbuf_get_capacity(buf) * 0.9)) { - error_report("The initial \"STREAM [URL]" HTTP_1_1 "\" over max of %d", MAX_LEN_STREAM_HELLO); - return PARSE_ERROR; - } - return GIMME_MORE_OF_DEM_SWEET_BYTEZ; - } - - *user_agent = mallocz(idx + 1); - rbuf_pop(buf, *user_agent, idx); - (*user_agent)[idx] = 0; - - *parser_state = HTTP_HDR; - /* FALLTHROUGH */ - case HTTP_HDR: - if (!rbuf_find_bytes(buf, HTTP_HDR_END, strlen(HTTP_HDR_END), &idx)) { - if (rbuf_bytes_available(buf) >= (size_t)(rbuf_get_capacity(buf) * 0.9)) { - error_report("The initial \"STREAM [URL]" HTTP_1_1 "\" over max of %d", MAX_LEN_STREAM_HELLO); - return PARSE_ERROR; - } - return GIMME_MORE_OF_DEM_SWEET_BYTEZ; - } - rbuf_bump_tail(buf, idx + strlen(HTTP_HDR_END)); - - *parser_state = HTTP_DONE; - return PARSE_DONE; - case HTTP_DONE: - error_report("Parsing is done. No need to call again."); - return PARSE_DONE; - default: - error_report("Unknown parser state %d", (int)*parser_state); - return PARSE_ERROR; - } -} - -#define SINGLE_WRITE_MAX (1024) - -void stream_process(h2o_stream_conn_t *conn, int initial) -{ - int rc; - struct web_client w; - - pthread_mutex_lock(&conn->tx_buf_lock); - if (h2o_socket_is_writing(conn->sock) || rbuf_bytes_available(conn->tx)) { - if (rbuf_bytes_available(conn->tx) && !conn->tx_buf.base) { - conn->tx_buf.base = rbuf_get_linear_read_range(conn->tx, &conn->tx_buf.len); - if (conn->tx_buf.base) { - conn->tx_buf.len = MIN(conn->tx_buf.len, SINGLE_WRITE_MAX); - h2o_socket_write(conn->sock, &conn->tx_buf, 1, on_write_complete); - } - } - } - pthread_mutex_unlock(&conn->tx_buf_lock); - - if (initial) - h2o_socket_read_start(conn->sock, stream_on_recv); - - if (conn->sock->input->size) { - size_t insert_max; - pthread_mutex_lock(&conn->rx_buf_lock); - char *insert_loc = rbuf_get_linear_insert_range(conn->rx, &insert_max); - if (insert_loc == NULL) { - pthread_cond_broadcast(&conn->rx_buf_cond); - pthread_mutex_unlock(&conn->rx_buf_lock); - return; - } - insert_max = MIN(insert_max, conn->sock->input->size); - memcpy(insert_loc, conn->sock->input->bytes, insert_max); - rbuf_bump_head(conn->rx, insert_max); - - h2o_buffer_consume(&conn->sock->input, insert_max); - - pthread_cond_broadcast(&conn->rx_buf_cond); - pthread_mutex_unlock(&conn->rx_buf_lock); - } - - switch (conn->state) { - case STREAM_X_HTTP_1_1: - // no conn->rx lock here as at this point we are still single threaded - // until we call rrdpush_receiver_thread_spawn() later down - rc = process_STREAM_X_HTTP_1_1(&conn->parse_state, conn->rx, &conn->url, &conn->user_agent); - if (rc == PARSE_ERROR) { - error_report("error parsing the STREAM hello"); - break; - } - if (rc != PARSE_DONE) - break; - conn->state = STREAM_X_HTTP_1_1_DONE; - /* FALLTHROUGH */ - case STREAM_X_HTTP_1_1_DONE: - memset(&w, 0, sizeof(w)); - w.response.data = buffer_create(1024, NULL); - - // get client ip from the conn->sock - struct sockaddr client; - socklen_t len = h2o_socket_getpeername(conn->sock, &client); - char peername[NI_MAXHOST]; - size_t peername_len = h2o_socket_getnumerichost(&client, len, peername); - size_t cpy_len = sizeof(w.client_ip) < peername_len ? sizeof(w.client_ip) : peername_len; - memcpy(w.client_ip, peername, cpy_len); - w.client_ip[cpy_len - 1] = 0; - w.user_agent = conn->user_agent; - - rc = rrdpush_receiver_thread_spawn(&w, conn->url, conn); - if (rc != HTTP_RESP_OK) { - error_report("HTTPD Failed to spawn the receiver thread %d", rc); - conn->state = STREAM_CLOSE; - stream_on_close(conn); - } else { - conn->state = STREAM_ACTIVE; - } - buffer_free(w.response.data); - /* FALLTHROUGH */ - case STREAM_ACTIVE: - break; - default: - error_report("Unknown conn->state"); - } -} - -// read and write functions to be used by streaming parser -int h2o_stream_write(void *ctx, const char *data, size_t data_len) -{ - h2o_stream_conn_t *conn = (h2o_stream_conn_t *)ctx; - - pthread_mutex_lock(&conn->tx_buf_lock); - size_t avail = rbuf_bytes_free(conn->tx); - avail = MIN(avail, data_len); - rbuf_push(conn->tx, data, avail); - pthread_mutex_unlock(&conn->tx_buf_lock); - __atomic_add_fetch(&pending_write_reqs, 1, __ATOMIC_SEQ_CST); - return avail; -} - -size_t h2o_stream_read(void *ctx, char *buf, size_t read_bytes) -{ - int ret; - h2o_stream_conn_t *conn = (h2o_stream_conn_t *)ctx; - - pthread_mutex_lock(&conn->rx_buf_lock); - size_t avail = rbuf_bytes_available(conn->rx); - - if (!avail) { - if (conn->shutdown) { - pthread_mutex_unlock(&conn->rx_buf_lock); - return -1; - } - pthread_cond_wait(&conn->rx_buf_cond, &conn->rx_buf_lock); - if (conn->shutdown) { - pthread_mutex_unlock(&conn->rx_buf_lock); - return -1; - } - avail = rbuf_bytes_available(conn->rx); - if (!avail) { - pthread_mutex_unlock(&conn->rx_buf_lock); - return 0; - } - } - - avail = MIN(avail, read_bytes); - - ret = rbuf_pop(conn->rx, buf, avail); - pthread_mutex_unlock(&conn->rx_buf_lock); - - return ret; -} - -// periodic check for pending write requests -void check_tx_buf(h2o_stream_conn_t *conn) -{ - pthread_mutex_lock(&conn->tx_buf_lock); - if (rbuf_bytes_available(conn->tx)) { - pthread_mutex_unlock(&conn->tx_buf_lock); - stream_process(conn, 0); - } else - pthread_mutex_unlock(&conn->tx_buf_lock); -} - -void h2o_stream_check_pending_write_reqs(void) -{ - int _write_reqs = __atomic_exchange_n(&pending_write_reqs, 0, __ATOMIC_SEQ_CST); - if (_write_reqs > 0) - conn_list_iter_all(&conn_list, check_tx_buf); -} |