diff options
Diffstat (limited to 'fluent-bit/lib/monkey/mk_server/mk_stream.c')
-rw-r--r-- | fluent-bit/lib/monkey/mk_server/mk_stream.c | 338 |
1 files changed, 0 insertions, 338 deletions
diff --git a/fluent-bit/lib/monkey/mk_server/mk_stream.c b/fluent-bit/lib/monkey/mk_server/mk_stream.c deleted file mode 100644 index df0f1b0b8..000000000 --- a/fluent-bit/lib/monkey/mk_server/mk_stream.c +++ /dev/null @@ -1,338 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Monkey HTTP Server - * ================== - * Copyright 2001-2017 Eduardo Silva <eduardo@monkey.io> - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <monkey/monkey.h> -#include <monkey/mk_stream.h> -#include <assert.h> - -/* Create a new channel */ -struct mk_channel *mk_channel_new(int type, int fd) -{ - struct mk_channel *channel; - - channel = mk_mem_alloc(sizeof(struct mk_channel)); - if (!channel) { - return NULL; - } - channel->type = type; - channel->fd = fd; - channel->status = MK_CHANNEL_OK; - mk_list_init(&channel->streams); - - return channel; -} - -int mk_channel_release(struct mk_channel *channel) -{ - mk_mem_free(channel); - return 0; -} - -static inline size_t channel_write_in_file(struct mk_channel *channel, - struct mk_stream_input *in) -{ - ssize_t bytes = 0; - - MK_TRACE("[CH %i] STREAM_FILE [fd=%i], bytes=%lu", - channel->fd, in->fd, in->bytes_total); - - /* Direct write */ - bytes = mk_sched_conn_sendfile(channel, - in->fd, - &in->bytes_offset, - in->bytes_total - ); - MK_TRACE("[CH=%d] [FD=%i] WRITE STREAM FILE: %lu bytes", - channel->fd, in->fd, bytes); - - return bytes; -} - -size_t mk_stream_size(struct mk_stream *stream) -{ - return (stream->bytes_total - stream->bytes_offset); -} - -/* - * It 'intent' to write a few streams over the channel and alter the - * channel notification side if required: READ -> WRITE. - */ -int mk_channel_flush(struct mk_channel *channel) -{ - int ret = 0; - size_t count = 0; - size_t total = 0; - uint32_t stop = (MK_CHANNEL_DONE | MK_CHANNEL_ERROR | MK_CHANNEL_EMPTY); - - do { - ret = mk_channel_write(channel, &count); - total += count; - -#ifdef MK_HAVE_TRACE - MK_TRACE("Channel flush: %d bytes", count); - if (ret & MK_CHANNEL_DONE) { - MK_TRACE("Channel was empty"); - } - if (ret & MK_CHANNEL_ERROR) { - MK_TRACE("Channel error"); - } - if (ret & MK_CHANNEL_EMPTY) { - MK_TRACE("Channel empty"); - } -#endif - } while (total <= 4096 && ((ret & stop) == 0)); - - if (ret == MK_CHANNEL_DONE) { - MK_TRACE("Channel done"); - return ret; - } - else if (ret & (MK_CHANNEL_FLUSH | MK_CHANNEL_BUSY)) { - MK_TRACE("Channel FLUSH | BUSY"); - if ((channel->event->mask & MK_EVENT_WRITE) == 0) { - mk_event_add(mk_sched_loop(), - channel->fd, - MK_EVENT_CONNECTION, - MK_EVENT_WRITE, - channel->event); - } - } - - return ret; -} - -int mk_stream_in_release(struct mk_stream_input *in) -{ - if (in->cb_finished) { - in->cb_finished(in); - } - - mk_stream_input_unlink(in); - if (in->dynamic == MK_TRUE) { - mk_mem_free(in); - } - - return 0; -} - -int mk_channel_stream_write(struct mk_stream *stream, size_t *count) -{ - ssize_t bytes = 0; - struct mk_iov *iov; - struct mk_list *tmp; - struct mk_list *head; - struct mk_channel *channel; - struct mk_stream_input *input; - - channel = stream->channel; - - /* Validate channel status */ - if (channel->status != MK_CHANNEL_OK) { - return -MK_CHANNEL_ERROR; - } - - /* Iterate inputs and process stream */ - mk_list_foreach_safe(head, tmp, &stream->inputs) { - input = mk_list_entry(head, struct mk_stream_input, _head); - if (input->type == MK_STREAM_FILE) { - bytes = channel_write_in_file(channel, input); - } - else if (input->type == MK_STREAM_IOV) { - iov = input->buffer; - if (!iov) { - return MK_CHANNEL_EMPTY; - } - - bytes = mk_sched_conn_writev(channel, iov); - - MK_TRACE("[CH %i] STREAM_IOV, wrote %d bytes", - channel->fd, bytes); - if (bytes > 0) { - /* Perform the adjustment on mk_iov */ - mk_iov_consume(iov, bytes); - } - } - else if (input->type == MK_STREAM_RAW) { - bytes = mk_sched_conn_write(channel, - input->buffer, input->bytes_total); - MK_TRACE("[CH %i] STREAM_RAW, bytes=%lu/%lu\n", - channel->fd, bytes, input->bytes_total); - } - - if (bytes > 0) { - *count = bytes; - mk_stream_input_consume(input, bytes); - - /* notification callback, optional */ - if (stream->cb_bytes_consumed) { - stream->cb_bytes_consumed(stream, bytes); - } - - if (input->cb_consumed) { - input->cb_consumed(input, bytes); - } - - if (input->bytes_total == 0) { - MK_TRACE("Input done, unlinking (channel=%p)", channel); - mk_stream_in_release(input); - } - MK_TRACE("[CH %i] CHANNEL_FLUSH", channel->fd); - } - else if (bytes < 0) { - mk_stream_in_release(input); - return -MK_CHANNEL_ERROR; - } - else if (bytes == 0) { - mk_stream_in_release(input); - return -MK_CHANNEL_ERROR; - } - } - - return bytes; -} - -/* It perform a direct stream I/O write through the network layer */ -int mk_channel_write(struct mk_channel *channel, size_t *count) -{ - ssize_t bytes = -1; - struct mk_iov *iov; - struct mk_stream *stream = NULL; - struct mk_stream_input *input; - - errno = 0; - - if (mk_list_is_empty(&channel->streams) == 0) { - MK_TRACE("[CH %i] CHANNEL_EMPTY", channel->fd); - return MK_CHANNEL_EMPTY; - } - - /* Get the input source */ - stream = mk_list_entry_first(&channel->streams, struct mk_stream, _head); - if (mk_list_is_empty(&stream->inputs) == 0) { - return MK_CHANNEL_EMPTY; - } - input = mk_list_entry_first(&stream->inputs, struct mk_stream_input, _head); - - /* - * Based on the Stream Input type we consume on that way, not all inputs - * requires to read from buffer, e.g: Static File, Pipes. - */ - if (channel->type == MK_CHANNEL_SOCKET) { - if (input->type == MK_STREAM_FILE) { - bytes = channel_write_in_file(channel, input); - } - else if (input->type == MK_STREAM_IOV) { - iov = input->buffer; - if (!iov) { - return MK_CHANNEL_EMPTY; - } - - bytes = mk_sched_conn_writev(channel, iov); - - MK_TRACE("[CH %i] STREAM_IOV, wrote %d bytes", - channel->fd, bytes); - if (bytes > 0) { - /* Perform the adjustment on mk_iov */ - mk_iov_consume(iov, bytes); - } - } - else if (input->type == MK_STREAM_RAW) { - bytes = mk_sched_conn_write(channel, - input->buffer, input->bytes_total); - MK_TRACE("[CH %i] STREAM_RAW, bytes=%lu/%lu", - channel->fd, bytes, input->bytes_total); - if (bytes > 0) { - /* DEPRECATED: consume_raw(input, bytes); */ - } - } - - if (bytes > 0) { - *count = bytes; - mk_stream_input_consume(input, bytes); - - /* notification callback, optional */ - if (stream->cb_bytes_consumed) { - stream->cb_bytes_consumed(stream, bytes); - } - - if (input->cb_consumed) { - input->cb_consumed(input, bytes); - } - - if (input->bytes_total == 0) { - MK_TRACE("Input done, unlinking (channel=%p)", channel); - mk_stream_in_release(input); - } - - if (mk_list_is_empty(&stream->inputs) == 0) { - /* Everytime the stream is empty, we notify the trigger the cb */ - if (stream->cb_finished) { - stream->cb_finished(stream); - } - - if (mk_channel_is_empty(channel) == 0) { - MK_TRACE("[CH %i] CHANNEL_DONE", channel->fd); - return MK_CHANNEL_DONE; - } - else { - MK_TRACE("[CH %i] CHANNEL_FLUSH", channel->fd); - return MK_CHANNEL_FLUSH; - } - } - - MK_TRACE("[CH %i] CHANNEL_FLUSH", channel->fd); - return MK_CHANNEL_FLUSH; - } - else if (bytes < 0) { - if (errno == EAGAIN) { - return MK_CHANNEL_BUSY; - } - - mk_stream_in_release(input); - return MK_CHANNEL_ERROR; - } - else if (bytes == 0) { - mk_stream_in_release(input); - return MK_CHANNEL_ERROR; - } - } - - return MK_CHANNEL_ERROR; -} - -/* Remove any dynamic memory associated */ -int mk_channel_clean(struct mk_channel *channel) -{ - struct mk_list *tmp; - struct mk_list *tmp_in; - struct mk_list *head; - struct mk_list *head_in; - struct mk_stream *stream; - struct mk_stream_input *in; - - mk_list_foreach_safe(head, tmp, &channel->streams) { - stream = mk_list_entry(head, struct mk_stream, _head); - mk_list_foreach_safe(head_in, tmp_in, &stream->inputs) { - in = mk_list_entry(head_in, struct mk_stream_input, _head); - mk_stream_in_release(in); - } - mk_stream_release(stream); - } - - return 0; -} |