diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 02:57:58 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 02:57:58 +0000 |
commit | be1c7e50e1e8809ea56f2c9d472eccd8ffd73a97 (patch) | |
tree | 9754ff1ca740f6346cf8483ec915d4054bc5da2d /fluent-bit/lib/monkey/mk_server/mk_stream.c | |
parent | Initial commit. (diff) | |
download | netdata-be1c7e50e1e8809ea56f2c9d472eccd8ffd73a97.tar.xz netdata-be1c7e50e1e8809ea56f2c9d472eccd8ffd73a97.zip |
Adding upstream version 1.44.3.upstream/1.44.3upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/lib/monkey/mk_server/mk_stream.c')
-rw-r--r-- | fluent-bit/lib/monkey/mk_server/mk_stream.c | 338 |
1 files changed, 338 insertions, 0 deletions
diff --git a/fluent-bit/lib/monkey/mk_server/mk_stream.c b/fluent-bit/lib/monkey/mk_server/mk_stream.c new file mode 100644 index 00000000..df0f1b0b --- /dev/null +++ b/fluent-bit/lib/monkey/mk_server/mk_stream.c @@ -0,0 +1,338 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Monkey HTTP Server + * ================== + * Copyright 2001-2017 Eduardo Silva <eduardo@monkey.io> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <monkey/monkey.h> +#include <monkey/mk_stream.h> +#include <assert.h> + +/* Create a new channel */ +struct mk_channel *mk_channel_new(int type, int fd) +{ + struct mk_channel *channel; + + channel = mk_mem_alloc(sizeof(struct mk_channel)); + if (!channel) { + return NULL; + } + channel->type = type; + channel->fd = fd; + channel->status = MK_CHANNEL_OK; + mk_list_init(&channel->streams); + + return channel; +} + +int mk_channel_release(struct mk_channel *channel) +{ + mk_mem_free(channel); + return 0; +} + +static inline size_t channel_write_in_file(struct mk_channel *channel, + struct mk_stream_input *in) +{ + ssize_t bytes = 0; + + MK_TRACE("[CH %i] STREAM_FILE [fd=%i], bytes=%lu", + channel->fd, in->fd, in->bytes_total); + + /* Direct write */ + bytes = mk_sched_conn_sendfile(channel, + in->fd, + &in->bytes_offset, + in->bytes_total + ); + MK_TRACE("[CH=%d] [FD=%i] WRITE STREAM FILE: %lu bytes", + channel->fd, in->fd, bytes); + + return bytes; +} + +size_t mk_stream_size(struct mk_stream *stream) +{ + return (stream->bytes_total - stream->bytes_offset); +} + +/* + * It 'intent' to write a few streams over the channel and alter the + * channel notification side if required: READ -> WRITE. + */ +int mk_channel_flush(struct mk_channel *channel) +{ + int ret = 0; + size_t count = 0; + size_t total = 0; + uint32_t stop = (MK_CHANNEL_DONE | MK_CHANNEL_ERROR | MK_CHANNEL_EMPTY); + + do { + ret = mk_channel_write(channel, &count); + total += count; + +#ifdef MK_HAVE_TRACE + MK_TRACE("Channel flush: %d bytes", count); + if (ret & MK_CHANNEL_DONE) { + MK_TRACE("Channel was empty"); + } + if (ret & MK_CHANNEL_ERROR) { + MK_TRACE("Channel error"); + } + if (ret & MK_CHANNEL_EMPTY) { + MK_TRACE("Channel empty"); + } +#endif + } while (total <= 4096 && ((ret & stop) == 0)); + + if (ret == MK_CHANNEL_DONE) { + MK_TRACE("Channel done"); + return ret; + } + else if (ret & (MK_CHANNEL_FLUSH | MK_CHANNEL_BUSY)) { + MK_TRACE("Channel FLUSH | BUSY"); + if ((channel->event->mask & MK_EVENT_WRITE) == 0) { + mk_event_add(mk_sched_loop(), + channel->fd, + MK_EVENT_CONNECTION, + MK_EVENT_WRITE, + channel->event); + } + } + + return ret; +} + +int mk_stream_in_release(struct mk_stream_input *in) +{ + if (in->cb_finished) { + in->cb_finished(in); + } + + mk_stream_input_unlink(in); + if (in->dynamic == MK_TRUE) { + mk_mem_free(in); + } + + return 0; +} + +int mk_channel_stream_write(struct mk_stream *stream, size_t *count) +{ + ssize_t bytes = 0; + struct mk_iov *iov; + struct mk_list *tmp; + struct mk_list *head; + struct mk_channel *channel; + struct mk_stream_input *input; + + channel = stream->channel; + + /* Validate channel status */ + if (channel->status != MK_CHANNEL_OK) { + return -MK_CHANNEL_ERROR; + } + + /* Iterate inputs and process stream */ + mk_list_foreach_safe(head, tmp, &stream->inputs) { + input = mk_list_entry(head, struct mk_stream_input, _head); + if (input->type == MK_STREAM_FILE) { + bytes = channel_write_in_file(channel, input); + } + else if (input->type == MK_STREAM_IOV) { + iov = input->buffer; + if (!iov) { + return MK_CHANNEL_EMPTY; + } + + bytes = mk_sched_conn_writev(channel, iov); + + MK_TRACE("[CH %i] STREAM_IOV, wrote %d bytes", + channel->fd, bytes); + if (bytes > 0) { + /* Perform the adjustment on mk_iov */ + mk_iov_consume(iov, bytes); + } + } + else if (input->type == MK_STREAM_RAW) { + bytes = mk_sched_conn_write(channel, + input->buffer, input->bytes_total); + MK_TRACE("[CH %i] STREAM_RAW, bytes=%lu/%lu\n", + channel->fd, bytes, input->bytes_total); + } + + if (bytes > 0) { + *count = bytes; + mk_stream_input_consume(input, bytes); + + /* notification callback, optional */ + if (stream->cb_bytes_consumed) { + stream->cb_bytes_consumed(stream, bytes); + } + + if (input->cb_consumed) { + input->cb_consumed(input, bytes); + } + + if (input->bytes_total == 0) { + MK_TRACE("Input done, unlinking (channel=%p)", channel); + mk_stream_in_release(input); + } + MK_TRACE("[CH %i] CHANNEL_FLUSH", channel->fd); + } + else if (bytes < 0) { + mk_stream_in_release(input); + return -MK_CHANNEL_ERROR; + } + else if (bytes == 0) { + mk_stream_in_release(input); + return -MK_CHANNEL_ERROR; + } + } + + return bytes; +} + +/* It perform a direct stream I/O write through the network layer */ +int mk_channel_write(struct mk_channel *channel, size_t *count) +{ + ssize_t bytes = -1; + struct mk_iov *iov; + struct mk_stream *stream = NULL; + struct mk_stream_input *input; + + errno = 0; + + if (mk_list_is_empty(&channel->streams) == 0) { + MK_TRACE("[CH %i] CHANNEL_EMPTY", channel->fd); + return MK_CHANNEL_EMPTY; + } + + /* Get the input source */ + stream = mk_list_entry_first(&channel->streams, struct mk_stream, _head); + if (mk_list_is_empty(&stream->inputs) == 0) { + return MK_CHANNEL_EMPTY; + } + input = mk_list_entry_first(&stream->inputs, struct mk_stream_input, _head); + + /* + * Based on the Stream Input type we consume on that way, not all inputs + * requires to read from buffer, e.g: Static File, Pipes. + */ + if (channel->type == MK_CHANNEL_SOCKET) { + if (input->type == MK_STREAM_FILE) { + bytes = channel_write_in_file(channel, input); + } + else if (input->type == MK_STREAM_IOV) { + iov = input->buffer; + if (!iov) { + return MK_CHANNEL_EMPTY; + } + + bytes = mk_sched_conn_writev(channel, iov); + + MK_TRACE("[CH %i] STREAM_IOV, wrote %d bytes", + channel->fd, bytes); + if (bytes > 0) { + /* Perform the adjustment on mk_iov */ + mk_iov_consume(iov, bytes); + } + } + else if (input->type == MK_STREAM_RAW) { + bytes = mk_sched_conn_write(channel, + input->buffer, input->bytes_total); + MK_TRACE("[CH %i] STREAM_RAW, bytes=%lu/%lu", + channel->fd, bytes, input->bytes_total); + if (bytes > 0) { + /* DEPRECATED: consume_raw(input, bytes); */ + } + } + + if (bytes > 0) { + *count = bytes; + mk_stream_input_consume(input, bytes); + + /* notification callback, optional */ + if (stream->cb_bytes_consumed) { + stream->cb_bytes_consumed(stream, bytes); + } + + if (input->cb_consumed) { + input->cb_consumed(input, bytes); + } + + if (input->bytes_total == 0) { + MK_TRACE("Input done, unlinking (channel=%p)", channel); + mk_stream_in_release(input); + } + + if (mk_list_is_empty(&stream->inputs) == 0) { + /* Everytime the stream is empty, we notify the trigger the cb */ + if (stream->cb_finished) { + stream->cb_finished(stream); + } + + if (mk_channel_is_empty(channel) == 0) { + MK_TRACE("[CH %i] CHANNEL_DONE", channel->fd); + return MK_CHANNEL_DONE; + } + else { + MK_TRACE("[CH %i] CHANNEL_FLUSH", channel->fd); + return MK_CHANNEL_FLUSH; + } + } + + MK_TRACE("[CH %i] CHANNEL_FLUSH", channel->fd); + return MK_CHANNEL_FLUSH; + } + else if (bytes < 0) { + if (errno == EAGAIN) { + return MK_CHANNEL_BUSY; + } + + mk_stream_in_release(input); + return MK_CHANNEL_ERROR; + } + else if (bytes == 0) { + mk_stream_in_release(input); + return MK_CHANNEL_ERROR; + } + } + + return MK_CHANNEL_ERROR; +} + +/* Remove any dynamic memory associated */ +int mk_channel_clean(struct mk_channel *channel) +{ + struct mk_list *tmp; + struct mk_list *tmp_in; + struct mk_list *head; + struct mk_list *head_in; + struct mk_stream *stream; + struct mk_stream_input *in; + + mk_list_foreach_safe(head, tmp, &channel->streams) { + stream = mk_list_entry(head, struct mk_stream, _head); + mk_list_foreach_safe(head_in, tmp_in, &stream->inputs) { + in = mk_list_entry(head_in, struct mk_stream_input, _head); + mk_stream_in_release(in); + } + mk_stream_release(stream); + } + + return 0; +} |