// SPDX-License-Identifier: GPL-3.0-or-later #include "streaming.h" #include "connlist.h" #include "h2o_utils.h" #include "streaming/common.h" static int pending_write_reqs = 0; #define H2O2STREAM_BUF_SIZE (1024 * 1024) // h2o_stream_conn_t related functions void h2o_stream_conn_t_init(h2o_stream_conn_t *conn) { memset(conn, 0, sizeof(*conn)); conn->rx = rbuf_create(H2O2STREAM_BUF_SIZE); conn->tx = rbuf_create(H2O2STREAM_BUF_SIZE); pthread_mutex_init(&conn->rx_buf_lock, NULL); pthread_mutex_init(&conn->tx_buf_lock, NULL); pthread_cond_init(&conn->rx_buf_cond, NULL); // no need to check for NULL as rbuf_create uses mallocz internally } void h2o_stream_conn_t_destroy(h2o_stream_conn_t *conn) { rbuf_free(conn->rx); rbuf_free(conn->tx); freez(conn->url); freez(conn->user_agent); pthread_mutex_destroy(&conn->rx_buf_lock); pthread_mutex_destroy(&conn->tx_buf_lock); pthread_cond_destroy(&conn->rx_buf_cond); } // streaming upgrade related functions int is_streaming_handshake(h2o_req_t *req) { /* method */ if (!h2o_memis(req->input.method.base, req->input.method.len, H2O_STRLIT("GET"))) return 1; if (!h2o_memis(req->path_normalized.base, req->path_normalized.len, H2O_STRLIT(NETDATA_STREAM_URL))) { return 1; } /* upgrade header */ if (req->upgrade.base == NULL || !h2o_lcstris(req->upgrade.base, req->upgrade.len, H2O_STRLIT(NETDATA_STREAM_PROTO_NAME))) return 1; // TODO consider adding some key in form of random number // to prevent caching on route especially if TLS is not used // e.g. client sends random number // server replies with it xored return 0; } static void stream_on_close(h2o_stream_conn_t *conn); void stream_process(h2o_stream_conn_t *conn, int initial); void stream_on_complete(void *user_data, h2o_socket_t *sock, size_t reqsize) { h2o_stream_conn_t *conn = user_data; /* close the connection on error */ if (sock == NULL) { stream_on_close(conn); return; } conn->sock = sock; sock->data = conn; conn_list_insert(&conn_list, conn); h2o_buffer_consume(&sock->input, reqsize); stream_process(conn, 1); } // handling of active streams static void stream_on_close(h2o_stream_conn_t *conn) { if (conn->sock != NULL) h2o_socket_close(conn->sock); conn_list_remove_conn(&conn_list, conn); pthread_mutex_lock(&conn->rx_buf_lock); conn->shutdown = 1; pthread_cond_broadcast(&conn->rx_buf_cond); pthread_mutex_unlock(&conn->rx_buf_lock); h2o_stream_conn_t_destroy(conn); freez(conn); } static void on_write_complete(h2o_socket_t *sock, const char *err) { h2o_stream_conn_t *conn = sock->data; if (err != NULL) { stream_on_close(conn); error_report("Streaming connection error \"%s\"", err); return; } pthread_mutex_lock(&conn->tx_buf_lock); rbuf_bump_tail(conn->tx, conn->tx_buf.len); conn->tx_buf.base = NULL; conn->tx_buf.len = 0; pthread_mutex_unlock(&conn->tx_buf_lock); stream_process(conn, 0); } static void stream_on_recv(h2o_socket_t *sock, const char *err) { h2o_stream_conn_t *conn = sock->data; if (err != NULL) { stream_on_close(conn); error_report("Streaming connection error \"%s\"", err); return; } stream_process(conn, 0); } #define PARSE_DONE 1 #define PARSE_ERROR -1 #define GIMME_MORE_OF_DEM_SWEET_BYTEZ 0 #define STREAM_METHOD "STREAM " #define USER_AGENT "User-Agent: " #define NEED_MIN_BYTES(buf, bytes) \ if (rbuf_bytes_available(buf) < bytes) \ return GIMME_MORE_OF_DEM_SWEET_BYTEZ; // TODO check in streaming code this is probably defined somewhere already #define MAX_LEN_STREAM_HELLO (1024*2) static int process_STREAM_X_HTTP_1_1(http_stream_parse_state_t *parser_state, rbuf_t buf, char **url, char **user_agent) { int idx; switch(*parser_state) { case HTTP_STREAM: NEED_MIN_BYTES(buf, strlen(STREAM_METHOD)); if (rbuf_memcmp_n(buf, H2O_STRLIT(STREAM_METHOD))) { error_report("Expected \"%s\"", STREAM_METHOD); return PARSE_ERROR; } rbuf_bump_tail(buf, strlen(STREAM_METHOD)); *parser_state = HTTP_URL; /* FALLTHROUGH */ case HTTP_URL: if (!rbuf_find_bytes(buf, " ", 1, &idx)) { if (rbuf_bytes_available(buf) >= MAX_LEN_STREAM_HELLO) { error_report("The initial \"STREAM [URL]" HTTP_1_1 "\" over max of %d", MAX_LEN_STREAM_HELLO); return PARSE_ERROR; } } *url = mallocz(idx + 1); rbuf_pop(buf, *url, idx); (*url)[idx] = 0; *parser_state = HTTP_PROTO; /* FALLTHROUGH */ case HTTP_PROTO: NEED_MIN_BYTES(buf, strlen(HTTP_1_1)); if (rbuf_memcmp_n(buf, H2O_STRLIT(HTTP_1_1))) { error_report("Expected \"%s\"", HTTP_1_1); return PARSE_ERROR; } rbuf_bump_tail(buf, strlen(HTTP_1_1)); *parser_state = HTTP_USER_AGENT_KEY; /* FALLTHROUGH */ case HTTP_USER_AGENT_KEY: // and OF COURSE EVERYTHING is passed in URL except // for user agent which we need and is passed as HTTP header // not worth writing a parser for this so we manually extract // just the single header we need and skip everything else if (!rbuf_find_bytes(buf, USER_AGENT, strlen(USER_AGENT), &idx)) { if (rbuf_bytes_available(buf) >= (size_t)(rbuf_get_capacity(buf) * 0.9)) { error_report("The initial \"STREAM [URL]" HTTP_1_1 "\" over max of %d", MAX_LEN_STREAM_HELLO); return PARSE_ERROR; } return GIMME_MORE_OF_DEM_SWEET_BYTEZ; } rbuf_bump_tail(buf, idx + strlen(USER_AGENT)); *parser_state = HTTP_USER_AGENT_VALUE; /* FALLTHROUGH */ case HTTP_USER_AGENT_VALUE: if (!rbuf_find_bytes(buf, "\r\n", 2, &idx)) { if (rbuf_bytes_available(buf) >= (size_t)(rbuf_get_capacity(buf) * 0.9)) { error_report("The initial \"STREAM [URL]" HTTP_1_1 "\" over max of %d", MAX_LEN_STREAM_HELLO); return PARSE_ERROR; } return GIMME_MORE_OF_DEM_SWEET_BYTEZ; } *user_agent = mallocz(idx + 1); rbuf_pop(buf, *user_agent, idx); (*user_agent)[idx] = 0; *parser_state = HTTP_HDR; /* FALLTHROUGH */ case HTTP_HDR: if (!rbuf_find_bytes(buf, HTTP_HDR_END, strlen(HTTP_HDR_END), &idx)) { if (rbuf_bytes_available(buf) >= (size_t)(rbuf_get_capacity(buf) * 0.9)) { error_report("The initial \"STREAM [URL]" HTTP_1_1 "\" over max of %d", MAX_LEN_STREAM_HELLO); return PARSE_ERROR; } return GIMME_MORE_OF_DEM_SWEET_BYTEZ; } rbuf_bump_tail(buf, idx + strlen(HTTP_HDR_END)); *parser_state = HTTP_DONE; return PARSE_DONE; case HTTP_DONE: error_report("Parsing is done. No need to call again."); return PARSE_DONE; default: error_report("Unknown parser state %d", (int)*parser_state); return PARSE_ERROR; } } #define SINGLE_WRITE_MAX (1024) void stream_process(h2o_stream_conn_t *conn, int initial) { int rc; struct web_client w; pthread_mutex_lock(&conn->tx_buf_lock); if (h2o_socket_is_writing(conn->sock) || rbuf_bytes_available(conn->tx)) { if (rbuf_bytes_available(conn->tx) && !conn->tx_buf.base) { conn->tx_buf.base = rbuf_get_linear_read_range(conn->tx, &conn->tx_buf.len); if (conn->tx_buf.base) { conn->tx_buf.len = MIN(conn->tx_buf.len, SINGLE_WRITE_MAX); h2o_socket_write(conn->sock, &conn->tx_buf, 1, on_write_complete); } } } pthread_mutex_unlock(&conn->tx_buf_lock); if (initial) h2o_socket_read_start(conn->sock, stream_on_recv); if (conn->sock->input->size) { size_t insert_max; pthread_mutex_lock(&conn->rx_buf_lock); char *insert_loc = rbuf_get_linear_insert_range(conn->rx, &insert_max); if (insert_loc == NULL) { pthread_cond_broadcast(&conn->rx_buf_cond); pthread_mutex_unlock(&conn->rx_buf_lock); return; } insert_max = MIN(insert_max, conn->sock->input->size); memcpy(insert_loc, conn->sock->input->bytes, insert_max); rbuf_bump_head(conn->rx, insert_max); h2o_buffer_consume(&conn->sock->input, insert_max); pthread_cond_broadcast(&conn->rx_buf_cond); pthread_mutex_unlock(&conn->rx_buf_lock); } switch (conn->state) { case STREAM_X_HTTP_1_1: // no conn->rx lock here as at this point we are still single threaded // until we call rrdpush_receiver_thread_spawn() later down rc = process_STREAM_X_HTTP_1_1(&conn->parse_state, conn->rx, &conn->url, &conn->user_agent); if (rc == PARSE_ERROR) { error_report("error parsing the STREAM hello"); break; } if (rc != PARSE_DONE) break; conn->state = STREAM_X_HTTP_1_1_DONE; /* FALLTHROUGH */ case STREAM_X_HTTP_1_1_DONE: memset(&w, 0, sizeof(w)); w.response.data = buffer_create(1024, NULL); // get client ip from the conn->sock struct sockaddr client; socklen_t len = h2o_socket_getpeername(conn->sock, &client); char peername[NI_MAXHOST]; size_t peername_len = h2o_socket_getnumerichost(&client, len, peername); size_t cpy_len = sizeof(w.client_ip) < peername_len ? sizeof(w.client_ip) : peername_len; memcpy(w.client_ip, peername, cpy_len); w.client_ip[cpy_len - 1] = 0; w.user_agent = conn->user_agent; rc = rrdpush_receiver_thread_spawn(&w, conn->url, conn); if (rc != HTTP_RESP_OK) { error_report("HTTPD Failed to spawn the receiver thread %d", rc); conn->state = STREAM_CLOSE; stream_on_close(conn); } else { conn->state = STREAM_ACTIVE; } buffer_free(w.response.data); /* FALLTHROUGH */ case STREAM_ACTIVE: break; default: error_report("Unknown conn->state"); } } // read and write functions to be used by streaming parser int h2o_stream_write(void *ctx, const char *data, size_t data_len) { h2o_stream_conn_t *conn = (h2o_stream_conn_t *)ctx; pthread_mutex_lock(&conn->tx_buf_lock); size_t avail = rbuf_bytes_free(conn->tx); avail = MIN(avail, data_len); rbuf_push(conn->tx, data, avail); pthread_mutex_unlock(&conn->tx_buf_lock); __atomic_add_fetch(&pending_write_reqs, 1, __ATOMIC_SEQ_CST); return avail; } size_t h2o_stream_read(void *ctx, char *buf, size_t read_bytes) { int ret; h2o_stream_conn_t *conn = (h2o_stream_conn_t *)ctx; pthread_mutex_lock(&conn->rx_buf_lock); size_t avail = rbuf_bytes_available(conn->rx); if (!avail) { if (conn->shutdown) { pthread_mutex_unlock(&conn->rx_buf_lock); return -1; } pthread_cond_wait(&conn->rx_buf_cond, &conn->rx_buf_lock); if (conn->shutdown) { pthread_mutex_unlock(&conn->rx_buf_lock); return -1; } avail = rbuf_bytes_available(conn->rx); if (!avail) { pthread_mutex_unlock(&conn->rx_buf_lock); return 0; } } avail = MIN(avail, read_bytes); ret = rbuf_pop(conn->rx, buf, avail); pthread_mutex_unlock(&conn->rx_buf_lock); return ret; } // periodic check for pending write requests void check_tx_buf(h2o_stream_conn_t *conn) { pthread_mutex_lock(&conn->tx_buf_lock); if (rbuf_bytes_available(conn->tx)) { pthread_mutex_unlock(&conn->tx_buf_lock); stream_process(conn, 0); } else pthread_mutex_unlock(&conn->tx_buf_lock); } void h2o_stream_check_pending_write_reqs(void) { int _write_reqs = __atomic_exchange_n(&pending_write_reqs, 0, __ATOMIC_SEQ_CST); if (_write_reqs > 0) conn_list_iter_all(&conn_list, check_tx_buf); }