diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 02:57:58 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 02:57:58 +0000 |
commit | be1c7e50e1e8809ea56f2c9d472eccd8ffd73a97 (patch) | |
tree | 9754ff1ca740f6346cf8483ec915d4054bc5da2d /web/server/h2o/libh2o/lib/handler/mruby | |
parent | Initial commit. (diff) | |
download | netdata-be1c7e50e1e8809ea56f2c9d472eccd8ffd73a97.tar.xz netdata-be1c7e50e1e8809ea56f2c9d472eccd8ffd73a97.zip |
Adding upstream version 1.44.3.upstream/1.44.3upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'web/server/h2o/libh2o/lib/handler/mruby')
6 files changed, 1051 insertions, 0 deletions
diff --git a/web/server/h2o/libh2o/lib/handler/mruby/chunked.c b/web/server/h2o/libh2o/lib/handler/mruby/chunked.c new file mode 100644 index 00000000..28e3ae43 --- /dev/null +++ b/web/server/h2o/libh2o/lib/handler/mruby/chunked.c @@ -0,0 +1,270 @@ +/* + * Copyright (c) 2015-2016 DeNA Co., Ltd., Kazuho Oku + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ +#include <stdlib.h> +#include <mruby.h> +#include <mruby/array.h> +#include <mruby/error.h> +#include <mruby/string.h> +#include "h2o/mruby_.h" +#include "embedded.c.h" + +struct st_h2o_mruby_chunked_t { + h2o_doublebuffer_t sending; + size_t bytes_left; /* SIZE_MAX indicates that the number is undermined */ + enum { H2O_MRUBY_CHUNKED_TYPE_CALLBACK, H2O_MRUBY_CHUNKED_TYPE_SHORTCUT } type; + union { + struct { + h2o_buffer_t *receiving; + mrb_value body_obj; /* becomes nil on eos */ + } callback; + struct { + h2o_mruby_http_request_context_t *client; + h2o_buffer_t *remaining; + } shortcut; + }; +}; + +static void do_send(h2o_mruby_generator_t *generator, h2o_buffer_t **input, int is_final) +{ + h2o_mruby_chunked_t *chunked = generator->chunked; + + assert(chunked->sending.bytes_inflight == 0); + + h2o_iovec_t buf = h2o_doublebuffer_prepare(&chunked->sending, input, generator->req->preferred_chunk_size); + size_t bufcnt = 1; + + if (is_final && buf.len == chunked->sending.buf->size && (*input)->size == 0) { + if (buf.len == 0) + --bufcnt; + /* terminate the H1 connection if the length of content served did not match the value sent in content-length header */ + if (chunked->bytes_left != SIZE_MAX && chunked->bytes_left != 0) + generator->req->http1_is_persistent = 0; + } else { + if (buf.len == 0) + return; + is_final = 0; + } + + h2o_send(generator->req, &buf, bufcnt, is_final ? H2O_SEND_STATE_FINAL : H2O_SEND_STATE_IN_PROGRESS); +} + +static void do_proceed(h2o_generator_t *_generator, h2o_req_t *req) +{ + h2o_mruby_generator_t *generator = (void *)_generator; + h2o_mruby_chunked_t *chunked = generator->chunked; + h2o_buffer_t **input; + int is_final; + + h2o_doublebuffer_consume(&chunked->sending); + + switch (chunked->type) { + case H2O_MRUBY_CHUNKED_TYPE_CALLBACK: + input = &chunked->callback.receiving; + is_final = mrb_nil_p(chunked->callback.body_obj); + break; + case H2O_MRUBY_CHUNKED_TYPE_SHORTCUT: + if (chunked->shortcut.client != NULL) { + input = h2o_mruby_http_peek_content(chunked->shortcut.client, &is_final); + assert(!is_final); + } else { + input = &chunked->shortcut.remaining; + is_final = 1; + } + break; + default: + h2o_fatal("unexpected type"); + break; + } + + do_send(generator, input, is_final); +} + +static void on_shortcut_notify(h2o_mruby_generator_t *generator) +{ + h2o_mruby_chunked_t *chunked = generator->chunked; + int is_final; + h2o_buffer_t **input = h2o_mruby_http_peek_content(chunked->shortcut.client, &is_final); + + if (chunked->bytes_left != SIZE_MAX) { + if (chunked->bytes_left < (*input)->size) + (*input)->size = chunked->bytes_left; /* trim data too long */ + chunked->bytes_left -= (*input)->size; + } + + /* if final, steal socket input buffer to shortcut.remaining, and reset pointer to client */ + if (is_final) { + chunked->shortcut.remaining = *input; + h2o_buffer_init(input, &h2o_socket_buffer_prototype); + input = &chunked->shortcut.remaining; + chunked->shortcut.client = NULL; + } + + if (chunked->sending.bytes_inflight == 0) + do_send(generator, input, is_final); +} + +static void close_body_obj(h2o_mruby_generator_t *generator) +{ + h2o_mruby_chunked_t *chunked = generator->chunked; + mrb_state *mrb = generator->ctx->shared->mrb; + + if (!mrb_nil_p(chunked->callback.body_obj)) { + /* call close and throw away error */ + if (mrb_respond_to(mrb, chunked->callback.body_obj, generator->ctx->shared->symbols.sym_close)) + mrb_funcall_argv(mrb, chunked->callback.body_obj, generator->ctx->shared->symbols.sym_close, 0, NULL); + mrb->exc = NULL; + mrb_gc_unregister(mrb, chunked->callback.body_obj); + chunked->callback.body_obj = mrb_nil_value(); + } +} + +mrb_value h2o_mruby_send_chunked_init(h2o_mruby_generator_t *generator, mrb_value body) +{ + h2o_mruby_chunked_t *chunked = h2o_mem_alloc_pool(&generator->req->pool, sizeof(*chunked)); + h2o_doublebuffer_init(&chunked->sending, &h2o_socket_buffer_prototype); + chunked->bytes_left = h2o_memis(generator->req->method.base, generator->req->method.len, H2O_STRLIT("HEAD")) + ? 0 + : generator->req->res.content_length; + generator->super.proceed = do_proceed; + generator->chunked = chunked; + + if ((chunked->shortcut.client = h2o_mruby_http_set_shortcut(generator->ctx->shared->mrb, body, on_shortcut_notify)) != NULL) { + chunked->type = H2O_MRUBY_CHUNKED_TYPE_SHORTCUT; + chunked->shortcut.remaining = NULL; + on_shortcut_notify(generator); + return mrb_nil_value(); + } else { + chunked->type = H2O_MRUBY_CHUNKED_TYPE_CALLBACK; + h2o_buffer_init(&chunked->callback.receiving, &h2o_socket_buffer_prototype); + mrb_gc_register(generator->ctx->shared->mrb, body); + chunked->callback.body_obj = body; + return mrb_ary_entry(generator->ctx->shared->constants, H2O_MRUBY_CHUNKED_PROC_EACH_TO_FIBER); + } +} + +void h2o_mruby_send_chunked_dispose(h2o_mruby_generator_t *generator) +{ + h2o_mruby_chunked_t *chunked = generator->chunked; + + h2o_doublebuffer_dispose(&chunked->sending); + + switch (chunked->type) { + case H2O_MRUBY_CHUNKED_TYPE_CALLBACK: + h2o_buffer_dispose(&chunked->callback.receiving); + close_body_obj(generator); + break; + case H2O_MRUBY_CHUNKED_TYPE_SHORTCUT: + /* note: no need to free reference from chunked->client, since it is disposed at the same moment */ + if (chunked->shortcut.remaining != NULL) + h2o_buffer_dispose(&chunked->shortcut.remaining); + break; + } +} + +static mrb_value check_precond(mrb_state *mrb, h2o_mruby_generator_t *generator) +{ + if (generator == NULL || generator->req == NULL) + return mrb_exc_new_str_lit(mrb, E_RUNTIME_ERROR, "downstream HTTP closed"); + if (generator->req->_generator == NULL) + return mrb_exc_new_str_lit(mrb, E_RUNTIME_ERROR, "cannot send chunk before sending headers"); + return mrb_nil_value(); +} + +static mrb_value send_chunked_method(mrb_state *mrb, mrb_value self) +{ + h2o_mruby_generator_t *generator = h2o_mruby_current_generator; + const char *s; + mrb_int len; + + /* parse args */ + mrb_get_args(mrb, "s", &s, &len); + + { /* precond check */ + mrb_value exc = check_precond(mrb, generator); + if (!mrb_nil_p(exc)) + mrb_exc_raise(mrb, exc); + } + + /* append to send buffer, and send out immediately if necessary */ + if (len != 0) { + h2o_mruby_chunked_t *chunked = generator->chunked; + if (chunked->bytes_left != SIZE_MAX) { + if (len > chunked->bytes_left) + len = chunked->bytes_left; + chunked->bytes_left -= len; + } + if (len != 0) { + h2o_buffer_reserve(&chunked->callback.receiving, len); + memcpy(chunked->callback.receiving->bytes + chunked->callback.receiving->size, s, len); + chunked->callback.receiving->size += len; + if (chunked->sending.bytes_inflight == 0) + do_send(generator, &chunked->callback.receiving, 0); + } + } + + return mrb_nil_value(); +} + +mrb_value h2o_mruby_send_chunked_eos_callback(h2o_mruby_generator_t *generator, mrb_value receiver, mrb_value input, + int *next_action) +{ + mrb_state *mrb = generator->ctx->shared->mrb; + + { /* precond check */ + mrb_value exc = check_precond(mrb, generator); + if (!mrb_nil_p(exc)) + return exc; + } + + h2o_mruby_send_chunked_close(generator); + + *next_action = H2O_MRUBY_CALLBACK_NEXT_ACTION_STOP; + return mrb_nil_value(); +} + +void h2o_mruby_send_chunked_close(h2o_mruby_generator_t *generator) +{ + h2o_mruby_chunked_t *chunked = generator->chunked; + + /* run_fiber will never be called once we enter the fast path, and therefore this function will never get called in that case */ + assert(chunked->type == H2O_MRUBY_CHUNKED_TYPE_CALLBACK); + + close_body_obj(generator); + + if (chunked->sending.bytes_inflight == 0) + do_send(generator, &chunked->callback.receiving, 1); +} + +void h2o_mruby_send_chunked_init_context(h2o_mruby_shared_context_t *shared_ctx) +{ + mrb_state *mrb = shared_ctx->mrb; + + h2o_mruby_eval_expr(mrb, H2O_MRUBY_CODE_CHUNKED); + h2o_mruby_assert(mrb); + + mrb_define_method(mrb, mrb->kernel_module, "_h2o_send_chunk", send_chunked_method, MRB_ARGS_ARG(1, 0)); + h2o_mruby_define_callback(mrb, "_h2o_send_chunk_eos", H2O_MRUBY_CALLBACK_ID_SEND_CHUNKED_EOS); + + mrb_ary_set(mrb, shared_ctx->constants, H2O_MRUBY_CHUNKED_PROC_EACH_TO_FIBER, + mrb_funcall(mrb, mrb_top_self(mrb), "_h2o_chunked_proc_each_to_fiber", 0)); + h2o_mruby_assert(mrb); +} diff --git a/web/server/h2o/libh2o/lib/handler/mruby/embedded.c.h b/web/server/h2o/libh2o/lib/handler/mruby/embedded.c.h new file mode 100644 index 00000000..db4bb232 --- /dev/null +++ b/web/server/h2o/libh2o/lib/handler/mruby/embedded.c.h @@ -0,0 +1,111 @@ +/* + * DO NOT EDIT! generated by embed_mruby_code.pl + * Please refer to the respective source files for copyright information. + */ + +/* lib/handler/mruby/embedded/core.rb */ +#define H2O_MRUBY_CODE_CORE \ + "module Kernel\n" \ + " def _h2o_define_callback(name, id)\n" \ + " Kernel.define_method(name) do |*args|\n" \ + " ret = Fiber.yield([ id, _h2o_create_resumer(), args ])\n" \ + " if ret.kind_of? Exception\n" \ + " raise ret\n" \ + " end\n" \ + " ret\n" \ + " end\n" \ + " end\n" \ + " def _h2o_create_resumer()\n" \ + " me = Fiber.current\n" \ + " Proc.new do |v|\n" \ + " me.resume(v)\n" \ + " end\n" \ + " end\n" \ + " def _h2o_proc_each_to_array()\n" \ + " Proc.new do |o|\n" \ + " a = []\n" \ + " o.each do |x|\n" \ + " a << x\n" \ + " end\n" \ + " a\n" \ + " end\n" \ + " end\n" \ + " def _h2o_proc_app_to_fiber()\n" \ + " Proc.new do |app|\n" \ + " cached = nil\n" \ + " Proc.new do |req|\n" \ + " fiber = cached\n" \ + " cached = nil\n" \ + " if !fiber\n" \ + " fiber = Fiber.new do\n" \ + " self_fiber = Fiber.current\n" \ + " req = Fiber.yield\n" \ + " while 1\n" \ + " begin\n" \ + " while 1\n" \ + " resp = app.call(req)\n" \ + " cached = self_fiber\n" \ + " req = Fiber.yield(resp)\n" \ + " end\n" \ + " rescue => e\n" \ + " cached = self_fiber\n" \ + " req = Fiber.yield([-1, e])\n" \ + " end\n" \ + " end\n" \ + " end\n" \ + " fiber.resume\n" \ + " end\n" \ + " fiber.resume(req)\n" \ + " end\n" \ + " end\n" \ + " end\n" \ + "end\n" + +/* lib/handler/mruby/embedded/http_request.rb */ +#define H2O_MRUBY_CODE_HTTP_REQUEST \ + "module H2O\n" \ + " class HttpRequest\n" \ + " def join\n" \ + " if !@resp\n" \ + " @resp = _h2o__http_join_response(self)\n" \ + " end\n" \ + " @resp\n" \ + " end\n" \ + " def _set_response(resp)\n" \ + " @resp = resp\n" \ + " end\n" \ + " end\n" \ + " class HttpInputStream\n" \ + " def each\n" \ + " while c = _h2o__http_fetch_chunk(self)\n" \ + " yield c\n" \ + " end\n" \ + " end\n" \ + " def join\n" \ + " s = \"\"\n" \ + " each do |c|\n" \ + " s << c\n" \ + " end\n" \ + " s\n" \ + " end\n" \ + " class Empty < HttpInputStream\n" \ + " def each; end\n" \ + " end\n" \ + " end\n" \ + "end\n" + +/* lib/handler/mruby/embedded/chunked.rb */ +#define H2O_MRUBY_CODE_CHUNKED \ + "module Kernel\n" \ + " def _h2o_chunked_proc_each_to_fiber()\n" \ + " Proc.new do |src|\n" \ + " fiber = Fiber.new do\n" \ + " src.each do |chunk|\n" \ + " _h2o_send_chunk(chunk)\n" \ + " end\n" \ + " _h2o_send_chunk_eos()\n" \ + " end\n" \ + " fiber.resume\n" \ + " end\n" \ + " end\n" \ + "end\n" diff --git a/web/server/h2o/libh2o/lib/handler/mruby/embedded/chunked.rb b/web/server/h2o/libh2o/lib/handler/mruby/embedded/chunked.rb new file mode 100644 index 00000000..ff4e578f --- /dev/null +++ b/web/server/h2o/libh2o/lib/handler/mruby/embedded/chunked.rb @@ -0,0 +1,35 @@ +# Copyright (c) 2014 DeNA Co., Ltd. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. + +module Kernel + + def _h2o_chunked_proc_each_to_fiber() + Proc.new do |src| + fiber = Fiber.new do + src.each do |chunk| + _h2o_send_chunk(chunk) + end + _h2o_send_chunk_eos() + end + fiber.resume + end + end + +end diff --git a/web/server/h2o/libh2o/lib/handler/mruby/embedded/core.rb b/web/server/h2o/libh2o/lib/handler/mruby/embedded/core.rb new file mode 100644 index 00000000..e62583df --- /dev/null +++ b/web/server/h2o/libh2o/lib/handler/mruby/embedded/core.rb @@ -0,0 +1,81 @@ +# Copyright (c) 2014-2016 DeNA Co., Ltd., Kazuho Oku, Ryosuke Matsumoto, +# Masayoshi Takahashi +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. + +module Kernel + + def _h2o_define_callback(name, id) + Kernel.define_method(name) do |*args| + ret = Fiber.yield([ id, _h2o_create_resumer(), args ]) + if ret.kind_of? Exception + raise ret + end + ret + end + end + + def _h2o_create_resumer() + me = Fiber.current + Proc.new do |v| + me.resume(v) + end + end + + def _h2o_proc_each_to_array() + Proc.new do |o| + a = [] + o.each do |x| + a << x + end + a + end + end + + def _h2o_proc_app_to_fiber() + Proc.new do |app| + cached = nil + Proc.new do |req| + fiber = cached + cached = nil + if !fiber + fiber = Fiber.new do + self_fiber = Fiber.current + req = Fiber.yield + while 1 + begin + while 1 + resp = app.call(req) + cached = self_fiber + req = Fiber.yield(resp) + end + rescue => e + cached = self_fiber + req = Fiber.yield([-1, e]) + end + end + end + fiber.resume + end + fiber.resume(req) + end + end + end + +end diff --git a/web/server/h2o/libh2o/lib/handler/mruby/embedded/http_request.rb b/web/server/h2o/libh2o/lib/handler/mruby/embedded/http_request.rb new file mode 100644 index 00000000..3f3247a3 --- /dev/null +++ b/web/server/h2o/libh2o/lib/handler/mruby/embedded/http_request.rb @@ -0,0 +1,54 @@ +# Copyright (c) 2015-2016 DeNA Co., Ltd., Kazuho Oku +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. + +module H2O + + class HttpRequest + def join + if !@resp + @resp = _h2o__http_join_response(self) + end + @resp + end + def _set_response(resp) + @resp = resp + end + end + + class HttpInputStream + def each + while c = _h2o__http_fetch_chunk(self) + yield c + end + end + def join + s = "" + each do |c| + s << c + end + s + end + + class Empty < HttpInputStream + def each; end + end + end + +end diff --git a/web/server/h2o/libh2o/lib/handler/mruby/http_request.c b/web/server/h2o/libh2o/lib/handler/mruby/http_request.c new file mode 100644 index 00000000..964d03f1 --- /dev/null +++ b/web/server/h2o/libh2o/lib/handler/mruby/http_request.c @@ -0,0 +1,500 @@ +/* + * Copyright (c) 2015-2016 DeNA Co., Ltd., Kazuho Oku + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ +#include <mruby.h> +#include <mruby/array.h> +#include <mruby/error.h> +#include <mruby/hash.h> +#include <mruby/string.h> +#include <mruby_input_stream.h> +#include "h2o/mruby_.h" +#include "embedded.c.h" + +struct st_h2o_mruby_http_request_context_t { + h2o_mruby_generator_t *generator; + h2o_http1client_t *client; + mrb_value receiver; + struct { + h2o_buffer_t *buf; + h2o_iovec_t body; /* body.base != NULL indicates that post content exists (and the length MAY be zero) */ + unsigned method_is_head : 1; + unsigned has_transfer_encoding : 1; + } req; + struct { + h2o_buffer_t *after_closed; /* when client becomes NULL, rest of the data will be stored to this pointer */ + int has_content; + } resp; + struct { + mrb_value request; + mrb_value input_stream; + } refs; + void (*shortcut_notify_cb)(h2o_mruby_generator_t *generator); +}; + +static void on_gc_dispose_request(mrb_state *mrb, void *_ctx) +{ + struct st_h2o_mruby_http_request_context_t *ctx = _ctx; + if (ctx != NULL) + ctx->refs.request = mrb_nil_value(); +} + +const static struct mrb_data_type request_type = {"http_request", on_gc_dispose_request}; + +static void on_gc_dispose_input_stream(mrb_state *mrb, void *_ctx) +{ + struct st_h2o_mruby_http_request_context_t *ctx = _ctx; + if (ctx != NULL) + ctx->refs.input_stream = mrb_nil_value(); +} + +const static struct mrb_data_type input_stream_type = {"http_input_stream", on_gc_dispose_input_stream}; + +static mrb_value create_downstream_closed_exception(mrb_state *mrb) +{ + return mrb_exc_new_str_lit(mrb, E_RUNTIME_ERROR, "downstream HTTP closed"); +} + +static mrb_value detach_receiver(struct st_h2o_mruby_http_request_context_t *ctx) +{ + mrb_value ret = ctx->receiver; + assert(!mrb_nil_p(ret)); + ctx->receiver = mrb_nil_value(); + return ret; +} + +static void on_dispose(void *_ctx) +{ + struct st_h2o_mruby_http_request_context_t *ctx = _ctx; + + /* clear the refs */ + if (ctx->client != NULL) { + h2o_http1client_cancel(ctx->client); + ctx->client = NULL; + } + if (!mrb_nil_p(ctx->refs.request)) + DATA_PTR(ctx->refs.request) = NULL; + if (!mrb_nil_p(ctx->refs.input_stream)) + DATA_PTR(ctx->refs.input_stream) = NULL; + + /* clear bufs */ + h2o_buffer_dispose(&ctx->req.buf); + h2o_buffer_dispose(&ctx->resp.after_closed); + + /* notify the app, if it is waiting to hear from us */ + if (!mrb_nil_p(ctx->receiver)) { + mrb_state *mrb = ctx->generator->ctx->shared->mrb; + int gc_arena = mrb_gc_arena_save(mrb); + h2o_mruby_run_fiber(ctx->generator, detach_receiver(ctx), create_downstream_closed_exception(mrb), NULL); + mrb_gc_arena_restore(mrb, gc_arena); + } +} + +static void post_response(struct st_h2o_mruby_http_request_context_t *ctx, int status, const h2o_header_t *headers_sorted, + size_t num_headers) +{ + mrb_state *mrb = ctx->generator->ctx->shared->mrb; + int gc_arena = mrb_gc_arena_save(mrb); + size_t i; + + mrb_value resp = mrb_ary_new_capa(mrb, 3); + + /* set status */ + mrb_ary_set(mrb, resp, 0, mrb_fixnum_value(status)); + + /* set headers */ + mrb_value headers_hash = mrb_hash_new_capa(mrb, (int)num_headers); + for (i = 0; i < num_headers; ++i) { + /* skip the headers, we determine the eos! */ + if (h2o_memis(headers_sorted[i].name, headers_sorted[i].name->len, H2O_STRLIT("content-length")) || + h2o_memis(headers_sorted[i].name, headers_sorted[i].name->len, H2O_STRLIT("transfer-encoding"))) + continue; + /* build and set the hash entry */ + mrb_value k = mrb_str_new(mrb, headers_sorted[i].name->base, headers_sorted[i].name->len); + mrb_value v = mrb_str_new(mrb, headers_sorted[i].value.base, headers_sorted[i].value.len); + while (i + 1 < num_headers && h2o_memis(headers_sorted[i].name->base, headers_sorted[i].name->len, + headers_sorted[i + 1].name->base, headers_sorted[i + 1].name->len)) { + ++i; + v = mrb_str_cat_lit(mrb, v, "\n"); + v = mrb_str_cat(mrb, v, headers_sorted[i].value.base, headers_sorted[i].value.len); + } + mrb_hash_set(mrb, headers_hash, k, v); + } + mrb_ary_set(mrb, resp, 1, headers_hash); + + /* set input stream */ + assert(mrb_nil_p(ctx->refs.input_stream)); + mrb_value input_stream_class; + if (ctx->req.method_is_head || status == 101 || status == 204 || status == 304) { + input_stream_class = mrb_ary_entry(ctx->generator->ctx->shared->constants, H2O_MRUBY_HTTP_EMPTY_INPUT_STREAM_CLASS); + } else { + input_stream_class = mrb_ary_entry(ctx->generator->ctx->shared->constants, H2O_MRUBY_HTTP_INPUT_STREAM_CLASS); + } + ctx->refs.input_stream = h2o_mruby_create_data_instance(mrb, input_stream_class, ctx, &input_stream_type); + mrb_ary_set(mrb, resp, 2, ctx->refs.input_stream); + + if (mrb_nil_p(ctx->receiver)) { + /* is async */ + mrb_funcall(mrb, ctx->refs.request, "_set_response", 1, resp); + if (mrb->exc != NULL) { + fprintf(stderr, "_set_response failed\n"); + abort(); + } + } else { + /* send response to the waiting receiver */ + h2o_mruby_run_fiber(ctx->generator, detach_receiver(ctx), resp, NULL); + } + + mrb_gc_arena_restore(mrb, gc_arena); +} + +static void post_error(struct st_h2o_mruby_http_request_context_t *ctx, const char *errstr) +{ + static const h2o_header_t headers_sorted[] = { + {&H2O_TOKEN_CONTENT_TYPE->buf, NULL, {H2O_STRLIT("text/plain; charset=utf-8")}}, + }; + + ctx->client = NULL; + size_t errstr_len = strlen(errstr); + h2o_buffer_reserve(&ctx->resp.after_closed, errstr_len); + memcpy(ctx->resp.after_closed->bytes + ctx->resp.after_closed->size, errstr, errstr_len); + ctx->resp.after_closed->size += errstr_len; + ctx->resp.has_content = 1; + + post_response(ctx, 500, headers_sorted, sizeof(headers_sorted) / sizeof(headers_sorted[0])); +} + +static mrb_value build_chunk(struct st_h2o_mruby_http_request_context_t *ctx) +{ + mrb_value chunk; + + assert(ctx->resp.has_content); + + if (ctx->client != NULL) { + assert(ctx->client->sock->input->size != 0); + chunk = mrb_str_new(ctx->generator->ctx->shared->mrb, ctx->client->sock->input->bytes, ctx->client->sock->input->size); + h2o_buffer_consume(&ctx->client->sock->input, ctx->client->sock->input->size); + ctx->resp.has_content = 0; + } else { + if (ctx->resp.after_closed->size == 0) { + chunk = mrb_nil_value(); + } else { + chunk = mrb_str_new(ctx->generator->ctx->shared->mrb, ctx->resp.after_closed->bytes, ctx->resp.after_closed->size); + h2o_buffer_consume(&ctx->resp.after_closed, ctx->resp.after_closed->size); + } + /* has_content is retained as true, so that repeated calls will return nil immediately */ + } + + return chunk; +} + +static int on_body(h2o_http1client_t *client, const char *errstr) +{ + struct st_h2o_mruby_http_request_context_t *ctx = client->data; + + if (errstr != NULL) { + h2o_buffer_t *tmp = ctx->resp.after_closed; + ctx->resp.after_closed = client->sock->input; + client->sock->input = tmp; + ctx->client = NULL; + ctx->resp.has_content = 1; + } else if (client->sock->input->size != 0) { + ctx->resp.has_content = 1; + } + + if (ctx->resp.has_content) { + if (ctx->shortcut_notify_cb != NULL) { + ctx->shortcut_notify_cb(ctx->generator); + } else if (!mrb_nil_p(ctx->receiver)) { + int gc_arena = mrb_gc_arena_save(ctx->generator->ctx->shared->mrb); + mrb_value chunk = build_chunk(ctx); + h2o_mruby_run_fiber(ctx->generator, detach_receiver(ctx), chunk, NULL); + mrb_gc_arena_restore(ctx->generator->ctx->shared->mrb, gc_arena); + } + } + return 0; +} + +static int headers_sort_cb(const void *_x, const void *_y) +{ + const h2o_header_t *x = _x, *y = _y; + + if (x->name->len < y->name->len) + return -1; + if (x->name->len > y->name->len) + return 1; + return memcmp(x->name->base, y->name->base, x->name->len); +} + +static h2o_http1client_body_cb on_head(h2o_http1client_t *client, const char *errstr, int minor_version, int status, + h2o_iovec_t msg, h2o_header_t *headers, size_t num_headers, int rlen) +{ + struct st_h2o_mruby_http_request_context_t *ctx = client->data; + + if (errstr != NULL) { + if (errstr != h2o_http1client_error_is_eos) { + /* error */ + post_error(ctx, errstr); + return NULL; + } + /* closed without body */ + ctx->client = NULL; + } + + qsort(headers, num_headers, sizeof(headers[0]), headers_sort_cb); + post_response(ctx, status, headers, num_headers); + return on_body; +} + +static h2o_http1client_head_cb on_connect(h2o_http1client_t *client, const char *errstr, h2o_iovec_t **reqbufs, size_t *reqbufcnt, + int *method_is_head) +{ + struct st_h2o_mruby_http_request_context_t *ctx = client->data; + + if (errstr != NULL) { + post_error(ctx, errstr); + return NULL; + } + + *reqbufs = h2o_mem_alloc_pool(&ctx->generator->req->pool, sizeof(**reqbufs) * 2); + **reqbufs = h2o_iovec_init(ctx->req.buf->bytes, ctx->req.buf->size); + *reqbufcnt = 1; + if (ctx->req.body.base != NULL) + (*reqbufs)[(*reqbufcnt)++] = ctx->req.body; + *method_is_head = ctx->req.method_is_head; + return on_head; +} + +static inline void append_to_buffer(h2o_buffer_t **buf, const void *src, size_t len) +{ + memcpy((*buf)->bytes + (*buf)->size, src, len); + (*buf)->size += len; +} + +static int flatten_request_header(h2o_mruby_context_t *handler_ctx, h2o_iovec_t name, h2o_iovec_t value, void *_ctx) +{ + struct st_h2o_mruby_http_request_context_t *ctx = _ctx; + + /* ignore certain headers */ + if (h2o_lcstris(name.base, name.len, H2O_STRLIT("content-length")) || + h2o_lcstris(name.base, name.len, H2O_STRLIT("connection")) || h2o_lcstris(name.base, name.len, H2O_STRLIT("host"))) + return 0; + + /* mark the existence of transfer-encoding in order to prevent us from adding content-length header */ + if (h2o_lcstris(name.base, name.len, H2O_STRLIT("transfer-encoding"))) + ctx->req.has_transfer_encoding = 1; + + h2o_buffer_reserve(&ctx->req.buf, name.len + value.len + sizeof(": \r\n") - 1); + append_to_buffer(&ctx->req.buf, name.base, name.len); + append_to_buffer(&ctx->req.buf, H2O_STRLIT(": ")); + append_to_buffer(&ctx->req.buf, value.base, value.len); + append_to_buffer(&ctx->req.buf, H2O_STRLIT("\r\n")); + + return 0; +} + +static mrb_value http_request_method(mrb_state *mrb, mrb_value self) +{ + h2o_mruby_generator_t *generator; + struct st_h2o_mruby_http_request_context_t *ctx; + const char *arg_url; + mrb_int arg_url_len; + mrb_value arg_hash; + h2o_iovec_t method; + h2o_url_t url; + + /* parse args */ + arg_hash = mrb_nil_value(); + mrb_get_args(mrb, "s|H", &arg_url, &arg_url_len, &arg_hash); + + /* precond check */ + if ((generator = h2o_mruby_current_generator) == NULL || generator->req == NULL) + mrb_exc_raise(mrb, create_downstream_closed_exception(mrb)); + + /* allocate context and initialize */ + ctx = h2o_mem_alloc_shared(&generator->req->pool, sizeof(*ctx), on_dispose); + memset(ctx, 0, sizeof(*ctx)); + ctx->generator = generator; + ctx->receiver = mrb_nil_value(); + h2o_buffer_init(&ctx->req.buf, &h2o_socket_buffer_prototype); + h2o_buffer_init(&ctx->resp.after_closed, &h2o_socket_buffer_prototype); + ctx->refs.request = mrb_nil_value(); + ctx->refs.input_stream = mrb_nil_value(); + + /* uri */ + if (h2o_url_parse(arg_url, arg_url_len, &url) != 0) + mrb_raise(mrb, E_ARGUMENT_ERROR, "invaild URL"); + + /* method */ + method = h2o_iovec_init(H2O_STRLIT("GET")); + if (mrb_hash_p(arg_hash)) { + mrb_value t = mrb_hash_get(mrb, arg_hash, mrb_symbol_value(generator->ctx->shared->symbols.sym_method)); + if (!mrb_nil_p(t)) { + t = mrb_str_to_str(mrb, t); + method = h2o_iovec_init(RSTRING_PTR(t), RSTRING_LEN(t)); + if (h2o_memis(method.base, method.len, H2O_STRLIT("HEAD"))) { + ctx->req.method_is_head = 1; + } + } + } + + /* start building the request */ + h2o_buffer_reserve(&ctx->req.buf, method.len + 1); + append_to_buffer(&ctx->req.buf, method.base, method.len); + append_to_buffer(&ctx->req.buf, H2O_STRLIT(" ")); + h2o_buffer_reserve(&ctx->req.buf, + url.path.len + url.authority.len + sizeof(" HTTP/1.1\r\nConnection: close\r\nHost: \r\n") - 1); + append_to_buffer(&ctx->req.buf, url.path.base, url.path.len); + append_to_buffer(&ctx->req.buf, H2O_STRLIT(" HTTP/1.1\r\nConnection: close\r\nHost: ")); + append_to_buffer(&ctx->req.buf, url.authority.base, url.authority.len); + append_to_buffer(&ctx->req.buf, H2O_STRLIT("\r\n")); + + /* headers */ + if (mrb_hash_p(arg_hash)) { + mrb_value headers = mrb_hash_get(mrb, arg_hash, mrb_symbol_value(generator->ctx->shared->symbols.sym_headers)); + if (!mrb_nil_p(headers)) { + if (h2o_mruby_iterate_headers(generator->ctx, headers, flatten_request_header, ctx) != 0) { + mrb_value exc = mrb_obj_value(mrb->exc); + mrb->exc = NULL; + mrb_exc_raise(mrb, exc); + } + } + } + /* body */ + if (mrb_hash_p(arg_hash)) { + mrb_value body = mrb_hash_get(mrb, arg_hash, mrb_symbol_value(generator->ctx->shared->symbols.sym_body)); + if (!mrb_nil_p(body)) { + if (mrb_obj_eq(mrb, body, generator->rack_input)) { + /* fast path */ + mrb_int pos; + mrb_input_stream_get_data(mrb, body, NULL, NULL, &pos, NULL, NULL); + ctx->req.body = generator->req->entity; + ctx->req.body.base += pos; + ctx->req.body.len -= pos; + } else { + if (!mrb_string_p(body)) { + body = mrb_funcall(mrb, body, "read", 0); + if (!mrb_string_p(body)) + mrb_raise(mrb, E_ARGUMENT_ERROR, "body.read did not return string"); + } + ctx->req.body = h2o_strdup(&ctx->generator->req->pool, RSTRING_PTR(body), RSTRING_LEN(body)); + } + if (!ctx->req.has_transfer_encoding) { + char buf[64]; + size_t l = (size_t)sprintf(buf, "content-length: %zu\r\n", ctx->req.body.len); + h2o_buffer_reserve(&ctx->req.buf, l); + append_to_buffer(&ctx->req.buf, buf, l); + } + } + } + + h2o_buffer_reserve(&ctx->req.buf, 2); + append_to_buffer(&ctx->req.buf, H2O_STRLIT("\r\n")); + + /* build request and connect */ + ctx->refs.request = h2o_mruby_create_data_instance( + mrb, mrb_ary_entry(generator->ctx->shared->constants, H2O_MRUBY_HTTP_REQUEST_CLASS), ctx, &request_type); + h2o_http1client_connect(&ctx->client, ctx, &generator->req->conn->ctx->proxy.client_ctx, url.host, h2o_url_get_port(&url), + url.scheme == &H2O_URL_SCHEME_HTTPS, on_connect); + + return ctx->refs.request; +} + +mrb_value h2o_mruby_http_join_response_callback(h2o_mruby_generator_t *generator, mrb_value receiver, mrb_value args, + int *next_action) +{ + mrb_state *mrb = generator->ctx->shared->mrb; + struct st_h2o_mruby_http_request_context_t *ctx; + + if (generator->req == NULL) + return create_downstream_closed_exception(mrb); + + if ((ctx = mrb_data_check_get_ptr(mrb, mrb_ary_entry(args, 0), &request_type)) == NULL) + return mrb_exc_new_str_lit(mrb, E_ARGUMENT_ERROR, "HttpRequest#join wrong self"); + + ctx->receiver = receiver; + *next_action = H2O_MRUBY_CALLBACK_NEXT_ACTION_ASYNC; + return mrb_nil_value(); +} + +mrb_value h2o_mruby_http_fetch_chunk_callback(h2o_mruby_generator_t *generator, mrb_value receiver, mrb_value args, + int *next_action) +{ + mrb_state *mrb = generator->ctx->shared->mrb; + struct st_h2o_mruby_http_request_context_t *ctx; + mrb_value ret; + + if (generator->req == NULL) + return create_downstream_closed_exception(mrb); + + if ((ctx = mrb_data_check_get_ptr(mrb, mrb_ary_entry(args, 0), &input_stream_type)) == NULL) + return mrb_exc_new_str_lit(mrb, E_ARGUMENT_ERROR, "_HttpInputStream#each wrong self"); + + if (ctx->resp.has_content) { + ret = build_chunk(ctx); + } else { + ctx->receiver = receiver; + *next_action = H2O_MRUBY_CALLBACK_NEXT_ACTION_ASYNC; + ret = mrb_nil_value(); + } + + return ret; +} + +h2o_mruby_http_request_context_t *h2o_mruby_http_set_shortcut(mrb_state *mrb, mrb_value obj, void (*cb)(h2o_mruby_generator_t *)) +{ + struct st_h2o_mruby_http_request_context_t *ctx; + + if ((ctx = mrb_data_check_get_ptr(mrb, obj, &input_stream_type)) == NULL) + return NULL; + ctx->shortcut_notify_cb = cb; + return ctx; +} + +h2o_buffer_t **h2o_mruby_http_peek_content(h2o_mruby_http_request_context_t *ctx, int *is_final) +{ + *is_final = ctx->client == NULL; + return ctx->client != NULL && ctx->resp.has_content ? &ctx->client->sock->input : &ctx->resp.after_closed; +} + +void h2o_mruby_http_request_init_context(h2o_mruby_shared_context_t *ctx) +{ + mrb_state *mrb = ctx->mrb; + + h2o_mruby_eval_expr(mrb, H2O_MRUBY_CODE_HTTP_REQUEST); + h2o_mruby_assert(mrb); + + struct RClass *module, *klass; + module = mrb_define_module(mrb, "H2O"); + + mrb_define_method(mrb, mrb->kernel_module, "http_request", http_request_method, MRB_ARGS_ARG(1, 2)); + + klass = mrb_class_get_under(mrb, module, "HttpRequest"); + mrb_ary_set(mrb, ctx->constants, H2O_MRUBY_HTTP_REQUEST_CLASS, mrb_obj_value(klass)); + + klass = mrb_class_get_under(mrb, module, "HttpInputStream"); + mrb_ary_set(mrb, ctx->constants, H2O_MRUBY_HTTP_INPUT_STREAM_CLASS, mrb_obj_value(klass)); + + klass = mrb_class_get_under(mrb, klass, "Empty"); + mrb_ary_set(mrb, ctx->constants, H2O_MRUBY_HTTP_EMPTY_INPUT_STREAM_CLASS, mrb_obj_value(klass)); + + h2o_mruby_define_callback(mrb, "_h2o__http_join_response", H2O_MRUBY_CALLBACK_ID_HTTP_JOIN_RESPONSE); + h2o_mruby_define_callback(mrb, "_h2o__http_fetch_chunk", H2O_MRUBY_CALLBACK_ID_HTTP_FETCH_CHUNK); +} |