diff options
Diffstat (limited to 'src/fluent-bit/lib/monkey/mk_server/mk_net.c')
-rw-r--r-- | src/fluent-bit/lib/monkey/mk_server/mk_net.c | 284 |
1 files changed, 284 insertions, 0 deletions
diff --git a/src/fluent-bit/lib/monkey/mk_server/mk_net.c b/src/fluent-bit/lib/monkey/mk_server/mk_net.c new file mode 100644 index 000000000..de183de48 --- /dev/null +++ b/src/fluent-bit/lib/monkey/mk_server/mk_net.c @@ -0,0 +1,284 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Monkey HTTP Server + * ================== + * Copyright 2001-2016 Monkey Software LLC <eduardo@monkey.io> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <monkey/mk_core.h> +#include <monkey/mk_net.h> +#include <monkey/mk_scheduler.h> +#include <monkey/mk_plugin.h> +#include <monkey/mk_thread.h> +#include <monkey/mk_tls.h> + +#ifdef _WIN32 +#include <winsock2.h> +#include <afunix.h> +#else +#include <sys/socket.h> +#include <netinet/tcp.h> +#endif + +/* Initialize the network stack*/ +int mk_net_init() +{ +#ifdef _WIN32 + int result; + WSADATA wsa_data; + static int initialized = 0; + + if(0 != initialized) { + return 0; + } + + result = WSAStartup(MAKEWORD(2, 2), &wsa_data); + + if(0 != result) { + if(WSAEINPROGRESS == result) + { + Sleep(100); /* Let the other thread finish initializing the stack */ + + return 0; + } + + return -1; + } + + initialized = 1; +#endif + + return 0; +} + +/* Connect to a TCP socket server */ +static int mk_net_fd_connect(int fd, char *host, unsigned long port) +{ + int ret; + struct addrinfo hints; + struct addrinfo *res; + char _port[6]; + + memset(&hints, 0, sizeof hints); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + + snprintf(_port, sizeof(_port), "%lu", port); + ret = getaddrinfo(host, _port, &hints, &res); + if (ret != 0) { + return -1; + } + + ret = connect(fd, res->ai_addr, res->ai_addrlen); + freeaddrinfo(res); + + return ret; +} + +struct mk_net_connection *mk_net_conn_create(char *addr, int port) +{ + int fd; + int ret; + int error = 0; + socklen_t len = sizeof(error); + struct mk_sched_worker *sched; + struct mk_net_connection *conn; + + /* Allocate connection context */ + conn = mk_mem_alloc(sizeof(struct mk_net_connection)); + if (!conn) { + return NULL; + } + + /* Create socket */ + fd = mk_socket_create(AF_INET, SOCK_STREAM, 0); + if (fd == -1) { + mk_mem_free(conn); + return NULL; + } + + /* Make socket async */ + mk_socket_set_nonblocking(fd); + conn->fd = fd; + + ret = mk_net_fd_connect(conn->fd, addr, port); + if (ret == -1) { + if (errno != EINPROGRESS) { + close(fd); + mk_mem_free(conn); + return NULL; + } + + MK_EVENT_NEW(&conn->event); + + sched = mk_sched_get_thread_conf(); + // FIXME: not including the thread + //conn->thread = mk_thread_get(); + ret = mk_event_add(sched->loop, conn->fd, MK_EVENT_THREAD, + MK_EVENT_WRITE, &conn->event); + if (ret == -1) { + close(fd); + mk_mem_free(conn); + return NULL; + } + + /* + * Return the control to the parent caller, we need to wait for + * the event loop to get back to us. + */ + mk_thread_yield(conn->thread); + + /* We got a notification, remove the event registered */ + ret = mk_event_del(sched->loop, &conn->event); + + /* Check the connection status */ + if (conn->event.mask & MK_EVENT_WRITE) { + ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &len); + if (ret == -1) { + close(fd); + mk_mem_free(conn); + return NULL; + } + + if (error != 0) { + /* Connection is broken, not much to do here */ + fprintf(stderr, "Async connection failed %s:%i\n", + conn->host, conn->port); + close(fd); + mk_mem_free(conn); + return NULL; + } + MK_EVENT_NEW(&conn->event); + return conn; + } + else { + close(fd); + mk_mem_free(conn); + return NULL; + } + } + + return NULL; +} + +int mk_net_conn_write(struct mk_channel *channel, + void *data, size_t len) +{ + int ret = 0; + int error; + ssize_t bytes; + size_t total = 0; + size_t send; + socklen_t slen = sizeof(error); + struct mk_thread *th = MK_TLS_GET(mk_thread); + struct mk_sched_worker *sched; + + sched = mk_sched_get_thread_conf(); + if (!sched) { + return -1; + } + + retry: + error = 0; + + if (len - total > 524288) { + send = 524288; + } + else { + send = (len - total); + } + + send = len - total; + bytes = channel->io->write(channel->io->plugin, channel->fd, (uint8_t *)data + total, send); + if (bytes == -1) { + if (errno == EAGAIN) { + MK_EVENT_NEW(channel->event); + channel->thread = th; + ret = mk_event_add(sched->loop, + channel->fd, + MK_EVENT_THREAD, + MK_EVENT_WRITE, channel->event); + if (ret == -1) { + /* + * If we failed here there no much that we can do, just + * let the caller we failed + */ + return -1; + } + + /* + * Return the control to the parent caller, we need to wait for + * the event loop to get back to us. + */ + mk_thread_yield(th); + + /* We got a notification, remove the event registered */ + ret = mk_event_del(sched->loop, channel->event); + if (ret == -1) { + return -1; + } + + /* Check the connection status */ + if (channel->event->mask & MK_EVENT_WRITE) { + ret = getsockopt(channel->fd, SOL_SOCKET, SO_ERROR, &error, &slen); + if (ret == -1) { + fprintf(stderr, "[io] could not validate socket status"); + return -1; + } + + if (error != 0) { + return -1; + } + + MK_EVENT_NEW(channel->event); + goto retry; + } + else { + return -1; + } + + } + else { + return -1; + } + } + + /* Update counters */ + total += bytes; + if (total < len) { + channel->thread = th; + ret = mk_event_add(sched->loop, + channel->fd, + MK_EVENT_THREAD, + MK_EVENT_WRITE, channel->event); + if (ret == -1) { + /* + * If we failed here there no much that we can do, just + * let the caller we failed + */ + return -1; + } + + mk_thread_yield(th); + goto retry; + } + + if (channel->event->status & MK_EVENT_REGISTERED) { + /* We got a notification, remove the event registered */ + ret = mk_event_del(sched->loop, channel->event); + } + + return total; +} |