diff options
Diffstat (limited to '')
-rw-r--r-- | test/clients/.gitignore | 1 | ||||
-rw-r--r-- | test/clients/Makefile.in | 20 | ||||
-rw-r--r-- | test/clients/h2ws.c | 1110 |
3 files changed, 1131 insertions, 0 deletions
diff --git a/test/clients/.gitignore b/test/clients/.gitignore new file mode 100644 index 0000000..18b1263 --- /dev/null +++ b/test/clients/.gitignore @@ -0,0 +1 @@ +h2ws
\ No newline at end of file diff --git a/test/clients/Makefile.in b/test/clients/Makefile.in new file mode 100644 index 0000000..a322a58 --- /dev/null +++ b/test/clients/Makefile.in @@ -0,0 +1,20 @@ +DISTCLEAN_TARGETS = h2ws + +CLEAN_TARGETS = h2ws + +bin_PROGRAMS = h2ws +TARGETS = $(bin_PROGRAMS) + +PROGRAM_LDADD = $(UTIL_LDFLAGS) $(PROGRAM_DEPENDENCIES) $(EXTRA_LIBS) $(AP_LIBS) +PROGRAM_DEPENDENCIES = + +include $(top_builddir)/build/rules.mk + +h2ws.lo: h2ws.c + $(LIBTOOL) --mode=compile $(CC) $(ab_CFLAGS) $(ALL_CFLAGS) $(ALL_CPPFLAGS) \ + $(ALL_INCLUDES) $(PICFLAGS) $(LTCFLAGS) -c $< && touch $@ +h2ws_OBJECTS = h2ws.lo +h2ws_LDADD = -lnghttp2 +h2ws: $(h2ws_OBJECTS) + $(LIBTOOL) --mode=link $(CC) $(ALL_CFLAGS) $(PILDFLAGS) \ + $(LT_LDFLAGS) $(ALL_LDFLAGS) -o $@ $(h2ws_LTFLAGS) $(h2ws_OBJECTS) $(h2ws_LDADD) diff --git a/test/clients/h2ws.c b/test/clients/h2ws.c new file mode 100644 index 0000000..1de3876 --- /dev/null +++ b/test/clients/h2ws.c @@ -0,0 +1,1110 @@ +/* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <apr.h> + +#include <assert.h> +#include <inttypes.h> +#include <stdlib.h> +#ifdef APR_HAVE_UNISTD_H +# include <unistd.h> +#endif /* HAVE_UNISTD_H */ +#ifdef APR_HAVE_FCNTL_H +# include <fcntl.h> +#endif /* HAVE_FCNTL_H */ +#include <sys/types.h> +#include <sys/time.h> +#ifdef APR_HAVE_SYS_SOCKET_H +# include <sys/socket.h> +#endif /* HAVE_SYS_SOCKET_H */ +#ifdef APR_HAVE_NETDB_H +# include <netdb.h> +#endif /* HAVE_NETDB_H */ +#ifdef APR_HAVE_NETINET_IN_H +# include <netinet/in.h> +#endif /* HAVE_NETINET_IN_H */ +#include <netinet/tcp.h> +#include <poll.h> +#include <signal.h> +#include <stdio.h> +#include <string.h> +#include <time.h> +#include <errno.h> + +#include <nghttp2/nghttp2.h> + +#define MAKE_NV(NAME, VALUE) \ + { \ + (uint8_t *)NAME, (uint8_t *)VALUE, sizeof(NAME) - 1, sizeof(VALUE) - 1, \ + NGHTTP2_NV_FLAG_NONE \ + } + +#define MAKE_NV_CS(NAME, VALUE) \ + { \ + (uint8_t *)NAME, (uint8_t *)VALUE, sizeof(NAME) - 1, strlen(VALUE), \ + NGHTTP2_NV_FLAG_NONE \ + } + + +static int verbose; +static const char *cmd; + +static void log_out(const char *level, const char *where, const char *msg) +{ + struct timespec tp; + struct tm tm; + char timebuf[128]; + + clock_gettime(CLOCK_REALTIME, &tp); + localtime_r(&tp.tv_sec, &tm); + strftime(timebuf, sizeof(timebuf)-1, "%H:%M:%S", &tm); + fprintf(stderr, "[%s.%09lu][%s][%s] %s\n", timebuf, tp.tv_nsec, level, where, msg); +} + +static void log_err(const char *where, const char *msg) +{ + log_out("ERROR", where, msg); +} + +static void log_info(const char *where, const char *msg) +{ + if (verbose) + log_out("INFO", where, msg); +} + +static void log_debug(const char *where, const char *msg) +{ + if (verbose > 1) + log_out("DEBUG", where, msg); +} + +#if defined(__GNUC__) + __attribute__((format(printf, 2, 3))) +#endif +static void log_errf(const char *where, const char *msg, ...) +{ + char buffer[8*1024]; + va_list ap; + + va_start(ap, msg); + vsnprintf(buffer, sizeof(buffer), msg, ap); + va_end(ap); + log_err(where, buffer); +} + +#if defined(__GNUC__) + __attribute__((format(printf, 2, 3))) +#endif +static void log_infof(const char *where, const char *msg, ...) +{ + if (verbose) { + char buffer[8*1024]; + va_list ap; + + va_start(ap, msg); + vsnprintf(buffer, sizeof(buffer), msg, ap); + va_end(ap); + log_info(where, buffer); + } +} + +#if defined(__GNUC__) + __attribute__((format(printf, 2, 3))) +#endif +static void log_debugf(const char *where, const char *msg, ...) +{ + if (verbose > 1) { + char buffer[8*1024]; + va_list ap; + + va_start(ap, msg); + vsnprintf(buffer, sizeof(buffer), msg, ap); + va_end(ap); + log_debug(where, buffer); + } +} + +static int parse_host_port(const char **phost, uint16_t *pport, + int *pipv6, size_t *pconsumed, + const char *s, size_t len, uint16_t def_port) +{ + size_t i, offset; + char *host = NULL; + int port = 0; + int rv = 1, ipv6 = 0; + + if (!len) + goto leave; + offset = 0; + if (s[offset] == '[') { + ipv6 = 1; + for (i = offset++; i < len; ++i) { + if (s[i] == ']') + break; + } + if (i >= len || i == offset) + goto leave; + host = strndup(s + offset, i - offset); + offset = i + 1; + } + else { + for (i = offset; i < len; ++i) { + if (strchr(":/?#", s[i])) + break; + } + if (i == offset) { + log_debugf("parse_uri", "empty host name in '%.*s", (int)len, s); + goto leave; + } + host = strndup(s + offset, i - offset); + offset = i; + } + if (offset < len && s[offset] == ':') { + port = 0; + ++offset; + for (i = offset; i < len; ++i) { + if (strchr("/?#", s[i])) + break; + if (s[i] < '0' || s[i] > '9') { + log_debugf("parse_uri", "invalid port char '%c'", s[i]); + goto leave; + } + port *= 10; + port += s[i] - '0'; + if (port > 65535) { + log_debugf("parse_uri", "invalid port number '%d'", port); + goto leave; + } + } + offset = i; + } + rv = 0; + +leave: + *phost = rv? NULL : host; + *pport = rv? 0 : (port? (uint16_t)port : def_port); + if (pipv6) + *pipv6 = ipv6; + if (pconsumed) + *pconsumed = offset; + return rv; +} + +struct uri { + const char *scheme; + const char *host; + const char *authority; + const char *path; + uint16_t port; + int ipv6; +}; + +static int parse_uri(struct uri *uri, const char *s, size_t len) +{ + char tmp[8192]; + size_t n, offset = 0; + uint16_t def_port = 0; + int rv = 1; + + /* NOT A REAL URI PARSER */ + memset(uri, 0, sizeof(*uri)); + if (len > 5 && !memcmp("ws://", s, 5)) { + uri->scheme = "ws"; + def_port = 80; + offset = 5; + } + else if (len > 6 && !memcmp("wss://", s, 6)) { + uri->scheme = "wss"; + def_port = 443; + offset = 6; + } + else { + /* not a scheme we process */ + goto leave; + } + + if (parse_host_port(&uri->host, &uri->port, &uri->ipv6, &n, s + offset, + len - offset, def_port)) + goto leave; + offset += n; + + if (uri->port == def_port) + uri->authority = uri->host; + else if (uri->ipv6) { + snprintf(tmp, sizeof(tmp), "[%s]:%u", uri->host, uri->port); + uri->authority = strdup(tmp); + } + else { + snprintf(tmp, sizeof(tmp), "%s:%u", uri->host, uri->port); + uri->authority = strdup(tmp); + } + + if (offset < len) { + uri->path = strndup(s + offset, len - offset); + } + rv = 0; + +leave: + return rv; +} + +static int sock_nonblock_nodelay(int fd) { + int flags, rv; + int val = 1; + + while ((flags = fcntl(fd, F_GETFL, 0)) == -1 && errno == EINTR) + ; + if (flags == -1) { + log_errf("sock_nonblock_nodelay", "fcntl get error %d (%s)", + errno, strerror(errno)); + return -1; + } + while ((rv = fcntl(fd, F_SETFL, flags | O_NONBLOCK)) == -1 && errno == EINTR) + ; + if (rv == -1) { + log_errf("sock_nonblock_nodelay", "fcntl set error %d (%s)", + errno, strerror(errno)); + return -1; + } + rv = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, (socklen_t)sizeof(val)); + if (rv == -1) { + log_errf("sock_nonblock_nodelay", "set nodelay error %d (%s)", + errno, strerror(errno)); + return -1; + } + return 0; +} + +static int open_connection(const char *host, uint16_t port) +{ + char service[NI_MAXSERV]; + struct addrinfo hints; + struct addrinfo *res = NULL, *rp; + int rv, fd = -1; + + memset(&hints, 0, sizeof(hints)); + snprintf(service, sizeof(service), "%u", port); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + rv = getaddrinfo(host, service, &hints, &res); + if (rv) { + log_err("getaddrinfo", gai_strerror(rv)); + goto leave; + } + + for (rp = res; rp; rp = rp->ai_next) { + fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); + if (fd == -1) { + continue; + } + while ((rv = connect(fd, rp->ai_addr, rp->ai_addrlen)) == -1 && + errno == EINTR) + ; + if (!rv) /* connected */ + break; + close(fd); + fd = -1; + } + +leave: + if (res) + freeaddrinfo(res); + return fd; +} + +struct h2_stream; + +#define IO_WANT_NONE 0 +#define IO_WANT_READ 1 +#define IO_WANT_WRITE 2 + +struct h2_session { + const char *server_name; + const char *connect_host; + uint16_t connect_port; + int fd; + nghttp2_session *ngh2; + struct h2_stream *streams; + int aborted; + int want_io; +}; + +typedef void h2_stream_closed_cb(struct h2_stream *stream); +typedef void h2_stream_recv_data(struct h2_stream *stream, + const uint8_t *data, size_t len); + +struct h2_stream { + struct h2_stream *next; + struct uri *uri; + int32_t id; + int fdin; + int http_status; + uint32_t error_code; + unsigned input_closed : 1; + unsigned closed : 1; + unsigned reset : 1; + h2_stream_closed_cb *on_close; + h2_stream_recv_data *on_recv_data; +}; + +static void h2_session_stream_add(struct h2_session *session, + struct h2_stream *stream) +{ + struct h2_stream *s; + for (s = session->streams; s; s = s->next) { + if (s == stream) /* already there? */ + return; + } + stream->next = session->streams; + session->streams = stream; +} + +static void h2_session_stream_remove(struct h2_session *session, + struct h2_stream *stream) +{ + struct h2_stream *s, **pnext; + pnext = &session->streams; + s = session->streams; + while (s) { + if (s == stream) { + *pnext = s->next; + s->next = NULL; + break; + } + pnext = &s->next; + s = s->next; + } +} + +static struct h2_stream *h2_session_stream_get(struct h2_session *session, + int32_t id) +{ + struct h2_stream *s; + for (s = session->streams; s; s = s->next) { + if (s->id == id) + return s; + } + return NULL; +} + +static ssize_t h2_session_send(nghttp2_session *ngh2, const uint8_t *data, + size_t length, int flags, void *user_data) +{ + struct h2_session *session = user_data; + ssize_t nwritten; + (void)ngh2; + (void)flags; + + session->want_io = IO_WANT_NONE; + nwritten = send(session->fd, data, length, 0); + if (nwritten < 0) { + int err = errno; + if ((EWOULDBLOCK == err) || (EAGAIN == err) || + (EINTR == err) || (EINPROGRESS == err)) { + return NGHTTP2_ERR_WOULDBLOCK; + } + log_errf("h2_session_send", "error sending %ld bytes: %d (%s)", + (long)length, err, strerror(err)); + return NGHTTP2_ERR_CALLBACK_FAILURE; + } + return nwritten; +} + +static ssize_t h2_session_recv(nghttp2_session *ngh2, uint8_t *buf, + size_t length, int flags, void *user_data) +{ + struct h2_session *session = user_data; + ssize_t nread; + (void)ngh2; + (void)flags; + + session->want_io = IO_WANT_NONE; + nread = recv(session->fd, buf, length, 0); + if (nread < 0) { + int err = errno; + if ((EWOULDBLOCK == err) || (EAGAIN == err) || (EINTR == err)) { + return NGHTTP2_ERR_WOULDBLOCK; + } + log_errf("h2_session_recv", "error reading %ld bytes: %d (%s)", + (long)length, err, strerror(err)); + return NGHTTP2_ERR_CALLBACK_FAILURE; + } + return nread; +} + +static int h2_session_on_frame_send(nghttp2_session *session, + const nghttp2_frame *frame, + void *user_data) +{ + size_t i; + (void)user_data; + + switch (frame->hd.type) { + case NGHTTP2_HEADERS: + if (nghttp2_session_get_stream_user_data(session, frame->hd.stream_id)) { + const nghttp2_nv *nva = frame->headers.nva; + log_infof("frame send", "FRAME[HEADERS, stream=%d", + frame->hd.stream_id); + for (i = 0; i < frame->headers.nvlen; ++i) { + log_infof("frame send", " %.*s: %.*s", + (int)nva[i].namelen, nva[i].name, + (int)nva[i].valuelen, nva[i].value); + } + log_infof("frame send", "]"); + } + break; + case NGHTTP2_DATA: + log_infof("frame send", "FRAME[DATA, stream=%d, length=%d, flags=%d]", + frame->hd.stream_id, (int)frame->hd.length, + (int)frame->hd.flags); + break; + case NGHTTP2_RST_STREAM: + log_infof("frame send", "FRAME[RST, stream=%d]", + frame->hd.stream_id); + break; + case NGHTTP2_WINDOW_UPDATE: + log_infof("frame send", "FRAME[WINDOW_UPDATE, stream=%d]", + frame->hd.stream_id); + break; + case NGHTTP2_GOAWAY: + log_infof("frame send", "FRAME[GOAWAY]"); + break; + } + return 0; +} + +static int h2_session_on_frame_recv(nghttp2_session *ngh2, + const nghttp2_frame *frame, + void *user_data) +{ + (void)user_data; + + switch (frame->hd.type) { + case NGHTTP2_HEADERS: + if (frame->headers.cat == NGHTTP2_HCAT_RESPONSE) { + log_infof("frame recv", "FRAME[HEADERS, stream=%d]", + frame->hd.stream_id); + } + break; + case NGHTTP2_DATA: + log_infof("frame recv", "FRAME[DATA, stream=%d, len=%lu, eof=%d]", + frame->hd.stream_id, frame->hd.length, + (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) != 0); + break; + case NGHTTP2_RST_STREAM: + log_infof("frame recv", "FRAME[RST, stream=%d]", + frame->hd.stream_id); + fprintf(stdout, "[%d] RST\n", frame->hd.stream_id); + break; + case NGHTTP2_GOAWAY: + log_infof("frame recv", "FRAME[GOAWAY]"); + break; + } + return 0; +} + +static int h2_session_on_header(nghttp2_session *ngh2, + const nghttp2_frame *frame, + const uint8_t *name, size_t namelen, + const uint8_t *value, size_t valuelen, + uint8_t flags, void *user_data) +{ + struct h2_session *session = user_data; + struct h2_stream *stream; + (void)flags; + (void)user_data; + log_infof("frame recv", "stream=%d, HEADER %.*s: %.*s", + frame->hd.stream_id, (int)namelen, name, + (int)valuelen, value); + stream = h2_session_stream_get(session, frame->hd.stream_id); + if (stream) { + if (namelen == 7 && !strncmp(":status", (const char *)name, namelen)) { + stream->http_status = 0; + if (valuelen < 10) { + char tmp[10], *endp; + memcpy(tmp, value, valuelen); + tmp[valuelen] = 0; + stream->http_status = (int)strtol(tmp, &endp, 10); + } + if (stream->http_status < 100 || stream->http_status >= 600) { + log_errf("on header recv", "stream=%d, invalid :status: %.*s", + frame->hd.stream_id, (int)valuelen, value); + return NGHTTP2_ERR_CALLBACK_FAILURE; + } + else { + fprintf(stdout, "[%d] :status: %d\n", stream->id, + stream->http_status); + } + } + } + return 0; +} + +static int h2_session_on_stream_close(nghttp2_session *ngh2, int32_t stream_id, + uint32_t error_code, void *user_data) +{ + struct h2_session *session = user_data; + struct h2_stream *stream; + + stream = h2_session_stream_get(session, stream_id); + if (stream) { + /* closed known stream */ + stream->error_code = error_code; + stream->closed = 1; + if (error_code) + stream->reset = 1; + if (error_code) { + log_errf("stream close", "stream %d closed with error %d", + stream_id, error_code); + } + + h2_session_stream_remove(session, stream); + if (stream->on_close) + stream->on_close(stream); + /* last one? */ + if (!session->streams) { + int rv; + rv = nghttp2_session_terminate_session(ngh2, NGHTTP2_NO_ERROR); + if (rv) { + log_errf("terminate session", "error %d (%s)", + rv, nghttp2_strerror(rv)); + session->aborted = 1; + } + } + } + return 0; +} + +static int h2_session_on_data_chunk_recv(nghttp2_session *ngh2, uint8_t flags, + int32_t stream_id, const uint8_t *data, + size_t len, void *user_data) { + struct h2_session *session = user_data; + struct h2_stream *stream; + + stream = h2_session_stream_get(session, stream_id); + if (stream && stream->on_recv_data) { + stream->on_recv_data(stream, data, len); + } + return 0; +} + +static int h2_session_open(struct h2_session *session, const char *server_name, + const char *host, uint16_t port) +{ + nghttp2_session_callbacks *cbs = NULL; + nghttp2_settings_entry settings[2]; + int rv = -1; + + memset(session, 0, sizeof(*session)); + session->server_name = server_name; + session->connect_host = host; + session->connect_port = port; + /* establish socket */ + session->fd = open_connection(session->connect_host, session->connect_port); + if (session->fd < 0) { + log_errf(cmd, "could not connect to %s:%u", + session->connect_host, session->connect_port); + goto leave; + } + if (sock_nonblock_nodelay(session->fd)) + goto leave; + session->want_io = IO_WANT_NONE; + + log_infof(cmd, "connected to %s via %s:%u", session->server_name, + session->connect_host, session->connect_port); + + rv = nghttp2_session_callbacks_new(&cbs); + if (rv) { + log_errf("setup callbacks", "error_code=%d, msg=%s\n", rv, + nghttp2_strerror(rv)); + rv = -1; + goto leave; + } + /* setup session callbacks */ + nghttp2_session_callbacks_set_send_callback(cbs, h2_session_send); + nghttp2_session_callbacks_set_recv_callback(cbs, h2_session_recv); + nghttp2_session_callbacks_set_on_frame_send_callback( + cbs, h2_session_on_frame_send); + nghttp2_session_callbacks_set_on_frame_recv_callback( + cbs, h2_session_on_frame_recv); + nghttp2_session_callbacks_set_on_header_callback( + cbs, h2_session_on_header); + nghttp2_session_callbacks_set_on_stream_close_callback( + cbs, h2_session_on_stream_close); + nghttp2_session_callbacks_set_on_data_chunk_recv_callback( + cbs, h2_session_on_data_chunk_recv); + /* create the ngh2 session */ + rv = nghttp2_session_client_new(&session->ngh2, cbs, session); + if (rv) { + log_errf("client new", "error_code=%d, msg=%s\n", rv, + nghttp2_strerror(rv)); + rv = -1; + goto leave; + } + /* submit initial settings */ + settings[0].settings_id = NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS; + settings[0].value = 100; + settings[1].settings_id = NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE; + settings[1].value = 10 * 1024 * 1024; + + rv = nghttp2_submit_settings(session->ngh2, NGHTTP2_FLAG_NONE, settings, 2); + if (rv) { + log_errf("submit settings", "error_code=%d, msg=%s\n", rv, + nghttp2_strerror(rv)); + rv = -1; + goto leave; + } + rv = nghttp2_session_set_local_window_size(session->ngh2, NGHTTP2_FLAG_NONE, + 0, 10 * 1024 * 1024); + if (rv) { + log_errf("set connection window size", "error_code=%d, msg=%s\n", rv, + nghttp2_strerror(rv)); + rv = -1; + goto leave; + } + rv = 0; + +leave: + if (cbs) + nghttp2_session_callbacks_del(cbs); + return rv; +} + +static int h2_session_io(struct h2_session *session) { + int rv; + rv = nghttp2_session_recv(session->ngh2); + if (rv) { + log_errf("session recv", "error_code=%d, msg=%s\n", rv, + nghttp2_strerror(rv)); + return 1; + } + rv = nghttp2_session_send(session->ngh2); + if (rv) { + log_errf("session send", "error_code=%d, msg=%s\n", rv, + nghttp2_strerror(rv)); + } + return 0; +} + +struct h2_poll_ctx; +typedef int h2_poll_ev_cb(struct h2_poll_ctx *pctx, struct pollfd *pfd); + +struct h2_poll_ctx { + struct h2_session *session; + struct h2_stream *stream; + h2_poll_ev_cb *on_ev; +}; + +static int h2_session_ev(struct h2_poll_ctx *pctx, struct pollfd *pfd) +{ + if (pfd->revents & (POLLIN | POLLOUT)) { + h2_session_io(pctx->session); + } + else if (pfd->revents & POLLHUP) { + log_errf("session run", "connection closed"); + return -1; + } + else if (pfd->revents & POLLERR) { + log_errf("session run", "connection error"); + return -1; + } + return 0; +} + +static int h2_stream_ev(struct h2_poll_ctx *pctx, struct pollfd *pfd) +{ + if (pfd->revents & (POLLIN | POLLHUP)) { + nghttp2_session_resume_data(pctx->session->ngh2, pctx->stream->id); + } + else if (pfd->revents & (POLLERR)) { + nghttp2_submit_rst_stream(pctx->session->ngh2, NGHTTP2_FLAG_NONE, + pctx->stream->id, NGHTTP2_STREAM_CLOSED); + } + return 0; +} + +static nfds_t h2_session_set_poll(struct h2_session *session, + struct h2_poll_ctx *pollctxs, + struct pollfd *pfds) +{ + nfds_t n = 0; + int want_read, want_write; + struct h2_stream *stream; + + want_read = (nghttp2_session_want_read(session->ngh2) || + session->want_io == IO_WANT_READ); + want_write = (nghttp2_session_want_write(session->ngh2) || + session->want_io == IO_WANT_WRITE); + if (want_read || want_write) { + pollctxs[n].session = session; + pollctxs[n].stream = NULL; + pollctxs[n].on_ev = h2_session_ev; + pfds[n].fd = session->fd; + pfds[n].events = pfds[n].revents = 0; + if (want_read) + pfds[n].events |= (POLLIN | POLLHUP); + if (want_write) + pfds[n].events |= (POLLOUT | POLLERR); + ++n; + } + + for (stream = session->streams; stream; stream = stream->next) { + if (stream->fdin >= 0 && !stream->input_closed && !stream->closed) { + pollctxs[n].session = session; + pollctxs[n].stream = stream; + pollctxs[n].on_ev = h2_stream_ev; + pfds[n].fd = stream->fdin; + pfds[n].revents = 0; + pfds[n].events = (POLLIN | POLLHUP); + ++n; + } + } + return n; +} + +static void h2_session_run(struct h2_session *session) +{ + struct h2_poll_ctx pollctxs[5]; + struct pollfd pfds[5]; + nfds_t npollfds, i; + + npollfds = h2_session_set_poll(session, pollctxs, pfds); + while (npollfds) { + if (poll(pfds, npollfds, -1) == -1) { + log_errf("session run", "poll error %d (%s)", errno, strerror(errno)); + break; + } + for (i = 0; i < npollfds; ++i) { + if (pfds[i].revents) { + if (pollctxs[i].on_ev(&pollctxs[i], &pfds[i])) { + break; + } + } + } + npollfds = h2_session_set_poll(session, pollctxs, pfds); + if (!session->streams) + break; + } +} + +static void h2_session_close(struct h2_session *session) +{ + log_infof(cmd, "closed session to %s:%u", + session->connect_host, session->connect_port); +} + +/* websocket stream */ + +struct ws_stream { + struct h2_stream s; +}; + +static void ws_stream_on_close(struct h2_stream *stream) +{ + log_infof("ws stream", "stream %d closed", stream->id); + if (!stream->reset) + fprintf(stdout, "[%d] EOF\n", stream->id); +} + +static void ws_stream_on_recv_data(struct h2_stream *stream, + const uint8_t *data, size_t len) +{ + size_t i; + + log_infof("ws stream", "stream %d recv %lu data bytes", + stream->id, (unsigned long)len); + for (i = 0; i < len; ++i) { + fprintf(stdout, "%s%02x", (i&0xf)? " " : (i? "\n" : ""), data[i]); + } + fprintf(stdout, "\n"); +} + +static int ws_stream_create(struct ws_stream **pstream, struct uri *uri) +{ + struct ws_stream *stream; + + stream = calloc(1, sizeof(*stream)); + if (!stream) { + log_errf("ws stream create", "out of memory"); + *pstream = NULL; + return -1; + } + stream->s.uri = uri; + stream->s.id = -1; + stream->s.on_close = ws_stream_on_close; + stream->s.on_recv_data = ws_stream_on_recv_data; + *pstream = stream; + return 0; +} + +static ssize_t ws_stream_read_req_body(nghttp2_session *ngh2, + int32_t stream_id, + uint8_t *buf, size_t buflen, + uint32_t *pflags, + nghttp2_data_source *source, + void *user_data) +{ + struct h2_session *session = user_data; + struct ws_stream *stream; + ssize_t nread = 0; + int eof = 0; + + stream = (struct ws_stream *)h2_session_stream_get(session, stream_id); + if (!stream) { + log_errf("stream req body", "stream not known"); + return NGHTTP2_ERR_CALLBACK_FAILURE; + } + + (void)source; + assert(stream->s.fdin >= 0); + nread = read(stream->s.fdin, buf, buflen); + log_debugf("stream req body", "fread(len=%lu) -> %ld", + (unsigned long)buflen, (long)nread); + + if (nread < 0) { + if (errno == EAGAIN) { + nread = 0; + } + else { + log_errf("stream req body", "error on input"); + return NGHTTP2_ERR_CALLBACK_FAILURE; + } + } + else if (nread == 0) { + eof = 1; + stream->s.input_closed = 1; + } + + *pflags = stream->s.input_closed? NGHTTP2_DATA_FLAG_EOF : 0; + if (nread == 0 && !eof) { + return NGHTTP2_ERR_DEFERRED; + } + return nread; +} + +static int ws_stream_submit(struct ws_stream *stream, + struct h2_session *session, + const nghttp2_nv *nva, size_t nvalen, + int fdin) +{ + nghttp2_data_provider provider, *req_body = NULL; + + if (fdin >= 0) { + sock_nonblock_nodelay(fdin); + stream->s.fdin = fdin; + provider.read_callback = ws_stream_read_req_body; + provider.source.ptr = NULL; + req_body = &provider; + } + else { + stream->s.input_closed = 1; + } + + stream->s.id = nghttp2_submit_request(session->ngh2, NULL, nva, nvalen, + req_body, stream); + if (stream->s.id < 0) { + log_errf("ws stream submit", "nghttp2_submit_request: error %d", + stream->s.id); + return -1; + } + + h2_session_stream_add(session, &stream->s); + log_infof("ws stream submit", "stream %d opened for %s%s", + stream->s.id, stream->s.uri->authority, stream->s.uri->path); + return 0; +} + +static void usage(const char *msg) +{ + if(msg) + fprintf(stderr, "%s\n", msg); + fprintf(stderr, + "usage: [options] ws-uri scenario\n" + " run a websocket scenario to the ws-uri, options:\n" + " -c host:port connect to host:port\n" + " -v increase verbosity\n" + "scenarios are:\n" + " * fail-proto: CONNECT using wrong :protocol\n" + " * miss-authority: CONNECT without :authority header\n" + " * miss-path: CONNECT without :path header\n" + " * miss-scheme: CONNECT without :scheme header\n" + " * miss-version: CONNECT without sec-webSocket-version header\n" + " * ws-empty: open valid websocket, do not send anything\n" + ); +} + +int main(int argc, char *argv[]) +{ + const char *host = NULL, *scenario; + uint16_t port = 80; + struct uri uri; + struct h2_session session; + struct ws_stream *stream; + char ch; + + cmd = argv[0]; + while((ch = getopt(argc, argv, "c:vh")) != -1) { + switch(ch) { + case 'c': + if (parse_host_port(&host, &port, NULL, NULL, + optarg, strlen(optarg), 80)) { + log_errf(cmd, "could not parse connect '%s'", optarg); + return 1; + } + break; + case 'h': + usage(NULL); + return 2; + break; + case 'v': + ++verbose; + break; + default: + usage("invalid option"); + return 1; + } + } + argc -= optind; + argv += optind; + + if (argc < 1) { + usage("need URL"); + return 1; + } + if (argc < 2) { + usage("need scenario"); + return 1; + } + if (parse_uri(&uri, argv[0], strlen(argv[0]))) { + log_errf(cmd, "could not parse uri '%s'", argv[0]); + return 1; + } + log_debugf(cmd, "normalized uri: %s://%s:%u%s", uri.scheme, uri.host, + uri.port, uri.path? uri.path : ""); + scenario = argv[1]; + + if (!host) { + host = uri.host; + port = uri.port; + } + + if (h2_session_open(&session, uri.host, host, port)) + return 1; + + if (ws_stream_create(&stream, &uri)) + return 1; + + if (!strcmp(scenario, "ws-stdin")) { + const nghttp2_nv nva[] = { + MAKE_NV(":method", "CONNECT"), + MAKE_NV_CS(":path", stream->s.uri->path), + MAKE_NV_CS(":scheme", "http"), + MAKE_NV_CS(":authority", stream->s.uri->authority), + MAKE_NV_CS(":protocol", "websocket"), + MAKE_NV("accept", "*/*"), + MAKE_NV("user-agent", "mod_h2/h2ws-test"), + MAKE_NV("sec-webSocket-version", "13"), + MAKE_NV("sec-webSocket-protocol", "chat"), + }; + if (ws_stream_submit(stream, &session, + nva, sizeof(nva) / sizeof(nva[0]), 0)) + return 1; + } + else if (!strcmp(scenario, "fail-proto")) { + const nghttp2_nv nva[] = { + MAKE_NV(":method", "CONNECT"), + MAKE_NV_CS(":path", stream->s.uri->path), + MAKE_NV_CS(":scheme", "http"), + MAKE_NV_CS(":authority", stream->s.uri->authority), + MAKE_NV_CS(":protocol", "websockets"), + MAKE_NV("accept", "*/*"), + MAKE_NV("user-agent", "mod_h2/h2ws-test"), + MAKE_NV("sec-webSocket-version", "13"), + MAKE_NV("sec-webSocket-protocol", "chat"), + }; + if (ws_stream_submit(stream, &session, + nva, sizeof(nva) / sizeof(nva[0]), -1)) + return 1; + } + else if (!strcmp(scenario, "miss-version")) { + const nghttp2_nv nva[] = { + MAKE_NV(":method", "CONNECT"), + MAKE_NV_CS(":path", stream->s.uri->path), + MAKE_NV_CS(":scheme", "http"), + MAKE_NV_CS(":authority", stream->s.uri->authority), + MAKE_NV_CS(":protocol", "websocket"), + MAKE_NV("accept", "*/*"), + MAKE_NV("user-agent", "mod_h2/h2ws-test"), + MAKE_NV("sec-webSocket-protocol", "chat"), + }; + if (ws_stream_submit(stream, &session, + nva, sizeof(nva) / sizeof(nva[0]), -1)) + return 1; + } + else if (!strcmp(scenario, "miss-path")) { + const nghttp2_nv nva[] = { + MAKE_NV(":method", "CONNECT"), + MAKE_NV_CS(":scheme", "http"), + MAKE_NV_CS(":authority", stream->s.uri->authority), + MAKE_NV_CS(":protocol", "websocket"), + MAKE_NV("accept", "*/*"), + MAKE_NV("user-agent", "mod_h2/h2ws-test"), + MAKE_NV("sec-webSocket-version", "13"), + MAKE_NV("sec-webSocket-protocol", "chat"), + }; + if (ws_stream_submit(stream, &session, + nva, sizeof(nva) / sizeof(nva[0]), -1)) + return 1; + } + else if (!strcmp(scenario, "miss-scheme")) { + const nghttp2_nv nva[] = { + MAKE_NV(":method", "CONNECT"), + MAKE_NV_CS(":path", stream->s.uri->path), + MAKE_NV_CS(":authority", stream->s.uri->authority), + MAKE_NV_CS(":protocol", "websocket"), + MAKE_NV("accept", "*/*"), + MAKE_NV("user-agent", "mod_h2/h2ws-test"), + MAKE_NV("sec-webSocket-version", "13"), + MAKE_NV("sec-webSocket-protocol", "chat"), + }; + if (ws_stream_submit(stream, &session, + nva, sizeof(nva) / sizeof(nva[0]), -1)) + return 1; + } + else if (!strcmp(scenario, "miss-authority")) { + const nghttp2_nv nva[] = { + MAKE_NV(":method", "CONNECT"), + MAKE_NV_CS(":path", stream->s.uri->path), + MAKE_NV_CS(":scheme", "http"), + MAKE_NV_CS(":protocol", "websocket"), + MAKE_NV("accept", "*/*"), + MAKE_NV("user-agent", "mod_h2/h2ws-test"), + MAKE_NV("sec-webSocket-version", "13"), + MAKE_NV("sec-webSocket-protocol", "chat"), + }; + if (ws_stream_submit(stream, &session, + nva, sizeof(nva) / sizeof(nva[0]), -1)) + return 1; + } + else { + log_errf(cmd, "unknown scenario: %s", scenario); + return 1; + } + + h2_session_run(&session); + h2_session_close(&session); + return 0; +} |