diff options
Diffstat (limited to '')
-rw-r--r-- | storage/mroonga/vendor/groonga/lib/com.c | 1202 |
1 files changed, 1202 insertions, 0 deletions
diff --git a/storage/mroonga/vendor/groonga/lib/com.c b/storage/mroonga/vendor/groonga/lib/com.c new file mode 100644 index 00000000..7761f483 --- /dev/null +++ b/storage/mroonga/vendor/groonga/lib/com.c @@ -0,0 +1,1202 @@ +/* -*- c-basic-offset: 2 -*- */ +/* Copyright(C) 2009-2012 Brazil + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License version 2.1 as published by the Free Software Foundation. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA +*/ + +#include "grn.h" + +#include <stdio.h> +#include <string.h> +#include "grn_ctx_impl.h" + +#ifdef WIN32 +# include <ws2tcpip.h> +#else +# ifdef HAVE_SYS_SOCKET_H +# include <sys/socket.h> +# endif /* HAVE_SYS_SOCKET_H */ +# include <netinet/in.h> +# include <netinet/tcp.h> +# ifdef HAVE_SIGNAL_H +# include <signal.h> +# endif /* HAVE_SIGNAL_H */ +# include <sys/uio.h> +#endif /* WIN32 */ + +#include "grn_ctx.h" +#include "grn_com.h" + +#ifndef PF_INET +#define PF_INET AF_INET +#endif /* PF_INET */ + +#ifndef SOL_TCP +# ifdef IPPROTO_TCP +# define SOL_TCP IPPROTO_TCP +# else +# define SOL_TCP 6 +# endif /* IPPROTO_TCP */ +#endif /* SOL_TCP */ + +#ifndef USE_MSG_MORE +# ifdef MSG_MORE +# undef MSG_MORE +# endif +# define MSG_MORE 0 +#endif /* USE_MSG_MORE */ + + +#ifndef USE_MSG_NOSIGNAL +# ifdef MSG_NOSIGNAL +# undef MSG_NOSIGNAL +# endif +# define MSG_NOSIGNAL 0 +#endif /* USE_MSG_NOSIGNAL */ +/******* grn_com_queue ********/ + +grn_rc +grn_com_queue_enque(grn_ctx *ctx, grn_com_queue *q, grn_com_queue_entry *e) +{ + CRITICAL_SECTION_ENTER(q->cs); + e->next = NULL; + *q->tail = e; + q->tail = &e->next; + CRITICAL_SECTION_LEAVE(q->cs); + /* + uint8_t i = q->last + 1; + e->next = NULL; + if (q->first == i || q->next) { + CRITICAL_SECTION_ENTER(q->cs); + if (q->first == i || q->next) { + *q->tail = e; + q->tail = &e->next; + } else { + q->bins[q->last] = e; + q->last = i; + } + CRITICAL_SECTION_LEAVE(q->cs); + } else { + q->bins[q->last] = e; + q->last = i; + } + */ + return GRN_SUCCESS; +} + +grn_com_queue_entry * +grn_com_queue_deque(grn_ctx *ctx, grn_com_queue *q) +{ + grn_com_queue_entry *e = NULL; + + CRITICAL_SECTION_ENTER(q->cs); + if (q->next) { + e = q->next; + if (!(q->next = e->next)) { q->tail = &q->next; } + } + CRITICAL_SECTION_LEAVE(q->cs); + + /* + if (q->first == q->last) { + if (q->next) { + CRITICAL_SECTION_ENTER(q->cs); + e = q->next; + if (!(q->next = e->next)) { q->tail = &q->next; } + CRITICAL_SECTION_LEAVE(q->cs); + } + } else { + e = q->bins[q->first++]; + } + */ + return e; +} + +/******* grn_msg ********/ + +grn_obj * +grn_msg_open(grn_ctx *ctx, grn_com *com, grn_com_queue *old) +{ + grn_msg *msg = NULL; + if (old && (msg = (grn_msg *)grn_com_queue_deque(ctx, old))) { + if (msg->ctx != ctx) { + ERR(GRN_INVALID_ARGUMENT, "ctx unmatch"); + return NULL; + } + GRN_BULK_REWIND(&msg->qe.obj); + } else if ((msg = GRN_MALLOCN(grn_msg, 1))) { + GRN_OBJ_INIT(&msg->qe.obj, GRN_MSG, 0, GRN_DB_TEXT); + msg->qe.obj.header.impl_flags |= GRN_OBJ_ALLOCATED; + msg->ctx = ctx; + } + msg->qe.next = NULL; + msg->u.peer = com; + msg->old = old; + memset(&msg->header, 0, sizeof(grn_com_header)); + return (grn_obj *)msg; +} + +grn_obj * +grn_msg_open_for_reply(grn_ctx *ctx, grn_obj *query, grn_com_queue *old) +{ + grn_msg *req = (grn_msg *)query, *msg = NULL; + if (req && (msg = (grn_msg *)grn_msg_open(ctx, req->u.peer, old))) { + msg->edge_id = req->edge_id; + msg->header.proto = req->header.proto == GRN_COM_PROTO_MBREQ + ? GRN_COM_PROTO_MBRES : req->header.proto; + } + return (grn_obj *)msg; +} + +grn_rc +grn_msg_close(grn_ctx *ctx, grn_obj *obj) +{ + grn_msg *msg = (grn_msg *)obj; + if (ctx == msg->ctx) { return grn_obj_close(ctx, obj); } + return grn_com_queue_enque(ctx, msg->old, (grn_com_queue_entry *)msg); +} + +grn_rc +grn_msg_set_property(grn_ctx *ctx, grn_obj *obj, + uint16_t status, uint32_t key_size, uint8_t extra_size) +{ + grn_com_header *header = &((grn_msg *)obj)->header; + header->status = htons(status); + header->keylen = htons(key_size); + header->level = extra_size; + return GRN_SUCCESS; +} + +grn_rc +grn_msg_send(grn_ctx *ctx, grn_obj *msg, int flags) +{ + grn_rc rc; + grn_msg *m = (grn_msg *)msg; + grn_com *peer = m->u.peer; + grn_com_header *header = &m->header; + if (GRN_COM_QUEUE_EMPTYP(&peer->new_)) { + switch (header->proto) { + case GRN_COM_PROTO_HTTP : + { + ssize_t ret; + ret = send(peer->fd, GRN_BULK_HEAD(msg), GRN_BULK_VSIZE(msg), MSG_NOSIGNAL); + if (ret == -1) { SOERR("send"); } + if (ctx->rc != GRN_OPERATION_WOULD_BLOCK) { + grn_com_queue_enque(ctx, m->old, (grn_com_queue_entry *)msg); + return ctx->rc; + } + } + break; + case GRN_COM_PROTO_GQTP : + { + if (flags & GRN_CTX_MORE) { flags |= GRN_CTX_QUIET; } + if (ctx->stat == GRN_CTX_QUIT) { flags |= GRN_CTX_QUIT; } + header->qtype = (uint8_t) ctx->impl->output.type; + header->keylen = 0; + header->level = 0; + header->flags = flags; + header->status = htons((uint16_t)ctx->rc); + header->opaque = 0; + header->cas = 0; + //todo : MSG_DONTWAIT + rc = grn_com_send(ctx, peer, header, + GRN_BULK_HEAD(msg), GRN_BULK_VSIZE(msg), 0); + if (rc != GRN_OPERATION_WOULD_BLOCK) { + grn_com_queue_enque(ctx, m->old, (grn_com_queue_entry *)msg); + return rc; + } + } + break; + case GRN_COM_PROTO_MBREQ : + return GRN_FUNCTION_NOT_IMPLEMENTED; + case GRN_COM_PROTO_MBRES : + rc = grn_com_send(ctx, peer, header, + GRN_BULK_HEAD(msg), GRN_BULK_VSIZE(msg), + (flags & GRN_CTX_MORE) ? MSG_MORE :0); + if (rc != GRN_OPERATION_WOULD_BLOCK) { + grn_com_queue_enque(ctx, m->old, (grn_com_queue_entry *)msg); + return rc; + } + break; + default : + return GRN_INVALID_ARGUMENT; + } + } + MUTEX_LOCK(peer->ev->mutex); + rc = grn_com_queue_enque(ctx, &peer->new_, (grn_com_queue_entry *)msg); + COND_SIGNAL(peer->ev->cond); + MUTEX_UNLOCK(peer->ev->mutex); + return rc; +} + +/******* grn_com ********/ + +grn_rc +grn_com_init(void) +{ +#ifdef WIN32 + WSADATA wd; + if (WSAStartup(MAKEWORD(2, 0), &wd) != 0) { + grn_ctx *ctx = &grn_gctx; + SOERR("WSAStartup"); + } +#else /* WIN32 */ +#ifndef USE_MSG_NOSIGNAL + if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) { + grn_ctx *ctx = &grn_gctx; + SERR("signal"); + } +#endif /* USE_MSG_NOSIGNAL */ +#endif /* WIN32 */ + return grn_gctx.rc; +} + +void +grn_com_fin(void) +{ +#ifdef WIN32 + WSACleanup(); +#endif /* WIN32 */ +} + +grn_rc +grn_com_event_init(grn_ctx *ctx, grn_com_event *ev, int max_nevents, int data_size) +{ + ev->max_nevents = max_nevents; + if ((ev->hash = grn_hash_create(ctx, NULL, sizeof(grn_sock), data_size, 0))) { + MUTEX_INIT(ev->mutex); + COND_INIT(ev->cond); + GRN_COM_QUEUE_INIT(&ev->recv_old); + ev->msg_handler = NULL; + memset(&(ev->curr_edge_id), 0, sizeof(grn_com_addr)); + ev->acceptor = NULL; + ev->opaque = NULL; +#ifndef USE_SELECT +# ifdef USE_EPOLL + if ((ev->events = GRN_MALLOC(sizeof(struct epoll_event) * max_nevents))) { + if ((ev->epfd = epoll_create(max_nevents)) != -1) { + goto exit; + } else { + SERR("epoll_create"); + } + GRN_FREE(ev->events); + } +# else /* USE_EPOLL */ +# ifdef USE_KQUEUE + if ((ev->events = GRN_MALLOC(sizeof(struct kevent) * max_nevents))) { + if ((ev->kqfd = kqueue()) != -1) { + goto exit; + } else { + SERR("kqueue"); + } + GRN_FREE(ev->events); + } +# else /* USE_KQUEUE */ + if ((ev->events = GRN_MALLOC(sizeof(struct pollfd) * max_nevents))) { + goto exit; + } +# endif /* USE_KQUEUE*/ +# endif /* USE_EPOLL */ + grn_hash_close(ctx, ev->hash); + ev->hash = NULL; + ev->events = NULL; +#else /* USE_SELECT */ + goto exit; +#endif /* USE_SELECT */ + } +exit : + return ctx->rc; +} + +grn_rc +grn_com_event_fin(grn_ctx *ctx, grn_com_event *ev) +{ + grn_obj *msg; + while ((msg = (grn_obj *)grn_com_queue_deque(ctx, &ev->recv_old))) { + grn_msg_close(ctx, msg); + } + if (ev->hash) { grn_hash_close(ctx, ev->hash); } +#ifndef USE_SELECT + if (ev->events) { GRN_FREE(ev->events); } +# ifdef USE_EPOLL + grn_close(ev->epfd); +# endif /* USE_EPOLL */ +# ifdef USE_KQUEUE + grn_close(ev->kqfd); +# endif /* USE_KQUEUE*/ +#endif /* USE_SELECT */ + return GRN_SUCCESS; +} + +grn_rc +grn_com_event_add(grn_ctx *ctx, grn_com_event *ev, grn_sock fd, int events, grn_com **com) +{ + grn_com *c; + /* todo : expand events */ + if (!ev || *ev->hash->n_entries == (uint32_t) ev->max_nevents) { + if (ev) { GRN_LOG(ctx, GRN_LOG_ERROR, "too many connections (%d)", ev->max_nevents); } + return GRN_INVALID_ARGUMENT; + } +#ifdef USE_EPOLL + { + struct epoll_event e; + memset(&e, 0, sizeof(struct epoll_event)); + e.data.fd = (fd); + e.events = (uint32_t) events; + if (epoll_ctl(ev->epfd, EPOLL_CTL_ADD, (fd), &e) == -1) { + SERR("epoll_ctl"); + return ctx->rc; + } + } +#endif /* USE_EPOLL*/ +#ifdef USE_KQUEUE + { + struct kevent e; + /* todo: udata should have fd */ + EV_SET(&e, (fd), events, EV_ADD, 0, 0, NULL); + if (kevent(ev->kqfd, &e, 1, NULL, 0, NULL) == -1) { + SERR("kevent"); + return ctx->rc; + } + } +#endif /* USE_KQUEUE */ + { + if (grn_hash_add(ctx, ev->hash, &fd, sizeof(grn_sock), (void **)&c, NULL)) { + c->ev = ev; + c->fd = fd; + c->events = events; + if (com) { *com = c; } + } + } + return ctx->rc; +} + +grn_rc +grn_com_event_mod(grn_ctx *ctx, grn_com_event *ev, grn_sock fd, int events, grn_com **com) +{ + grn_com *c; + if (!ev) { return GRN_INVALID_ARGUMENT; } + if (grn_hash_get(ctx, ev->hash, &fd, sizeof(grn_sock), (void **)&c)) { + if (c->fd != fd) { + GRN_LOG(ctx, GRN_LOG_ERROR, + "grn_com_event_mod fd unmatch " + "%" GRN_FMT_SOCKET " != %" GRN_FMT_SOCKET, + c->fd, fd); + return GRN_OBJECT_CORRUPT; + } + if (com) { *com = c; } + if (c->events != events) { +#ifdef USE_EPOLL + struct epoll_event e; + memset(&e, 0, sizeof(struct epoll_event)); + e.data.fd = (fd); + e.events = (uint32_t) events; + if (epoll_ctl(ev->epfd, EPOLL_CTL_MOD, (fd), &e) == -1) { + SERR("epoll_ctl"); + return ctx->rc; + } +#endif /* USE_EPOLL*/ +#ifdef USE_KQUEUE + // experimental + struct kevent e[2]; + EV_SET(&e[0], (fd), GRN_COM_POLLIN|GRN_COM_POLLOUT, EV_DELETE, 0, 0, NULL); + EV_SET(&e[1], (fd), events, EV_ADD, 0, 0, NULL); + if (kevent(ev->kqfd, e, 2, NULL, 0, NULL) == -1) { + SERR("kevent"); + return ctx->rc; + } +#endif /* USE_KQUEUE */ + c->events = events; + } + return GRN_SUCCESS; + } + return GRN_INVALID_ARGUMENT; +} + +grn_rc +grn_com_event_del(grn_ctx *ctx, grn_com_event *ev, grn_sock fd) +{ + if (!ev) { return GRN_INVALID_ARGUMENT; } + { + grn_com *c; + grn_id id = grn_hash_get(ctx, ev->hash, &fd, sizeof(grn_sock), (void **)&c); + if (id) { +#ifdef USE_EPOLL + if (!c->closed) { + struct epoll_event e; + memset(&e, 0, sizeof(struct epoll_event)); + e.data.fd = fd; + e.events = c->events; + if (epoll_ctl(ev->epfd, EPOLL_CTL_DEL, fd, &e) == -1) { + SERR("epoll_ctl"); + return ctx->rc; + } + } +#endif /* USE_EPOLL*/ +#ifdef USE_KQUEUE + struct kevent e; + EV_SET(&e, (fd), c->events, EV_DELETE, 0, 0, NULL); + if (kevent(ev->kqfd, &e, 1, NULL, 0, NULL) == -1) { + SERR("kevent"); + return ctx->rc; + } +#endif /* USE_KQUEUE */ + return grn_hash_delete_by_id(ctx, ev->hash, id, NULL); + } else { + GRN_LOG(ctx, GRN_LOG_ERROR, + "%04x| fd(%" GRN_FMT_SOCKET ") not found in ev(%p)", + grn_getpid(), fd, ev); + return GRN_INVALID_ARGUMENT; + } + } +} + +#define LISTEN_BACKLOG 0x1000 + +grn_rc +grn_com_event_start_accept(grn_ctx *ctx, grn_com_event *ev) +{ + grn_com *com = ev->acceptor; + + if (com->accepting) {return ctx->rc;} + + GRN_API_ENTER; + if (!grn_com_event_mod(ctx, ev, com->fd, GRN_COM_POLLIN, NULL)) { + if (listen(com->fd, LISTEN_BACKLOG) == 0) { + com->accepting = GRN_TRUE; + } else { + SOERR("listen - start accept"); + } + } + GRN_API_RETURN(ctx->rc); +} + +grn_rc +grn_com_event_stop_accept(grn_ctx *ctx, grn_com_event *ev) +{ + grn_com *com = ev->acceptor; + + if (!com->accepting) {return ctx->rc;} + + GRN_API_ENTER; + if (!grn_com_event_mod(ctx, ev, com->fd, 0, NULL)) { + if (listen(com->fd, 0) == 0) { + com->accepting = GRN_FALSE; + } else { + SOERR("listen - disable accept"); + } + } + GRN_API_RETURN(ctx->rc); +} + +static void +grn_com_receiver(grn_ctx *ctx, grn_com *com) +{ + grn_com_event *ev = com->ev; + ERRCLR(ctx); + if (ev->acceptor == com) { + grn_com *ncs; + grn_sock fd = accept(com->fd, NULL, NULL); + if (fd == -1) { + if (errno == EMFILE) { + grn_com_event_stop_accept(ctx, ev); + } else { + SOERR("accept"); + } + return; + } + if (grn_com_event_add(ctx, ev, fd, GRN_COM_POLLIN, (grn_com **)&ncs)) { + grn_sock_close(fd); + return; + } + ncs->has_sid = 0; + ncs->closed = 0; + ncs->opaque = NULL; + GRN_COM_QUEUE_INIT(&ncs->new_); + // GRN_LOG(ctx, GRN_LOG_NOTICE, "accepted (%d)", fd); + return; + } else { + grn_msg *msg = (grn_msg *)grn_msg_open(ctx, com, &ev->recv_old); + grn_com_recv(ctx, msg->u.peer, &msg->header, (grn_obj *)msg); + if (msg->u.peer /* is_edge_request(msg)*/) { + grn_memcpy(&msg->edge_id, &ev->curr_edge_id, sizeof(grn_com_addr)); + if (!com->has_sid) { + com->has_sid = 1; + com->sid = ev->curr_edge_id.sid++; + } + msg->edge_id.sid = com->sid; + } + msg->acceptor = ev->acceptor; + ev->msg_handler(ctx, (grn_obj *)msg); + } +} + +grn_rc +grn_com_event_poll(grn_ctx *ctx, grn_com_event *ev, int timeout) +{ + int nevents; + grn_com *com; +#ifdef USE_SELECT + uint32_t dummy; + grn_sock *pfd; + int nfds = 0; + fd_set rfds; + fd_set wfds; + struct timeval tv; + if (timeout >= 0) { + tv.tv_sec = timeout / 1000; + tv.tv_usec = (timeout % 1000) * 1000; + } + FD_ZERO(&rfds); + FD_ZERO(&wfds); + ctx->errlvl = GRN_OK; + ctx->rc = GRN_SUCCESS; + { + grn_hash_cursor *cursor; + cursor = grn_hash_cursor_open(ctx, ev->hash, NULL, 0, NULL, 0, 0, -1, 0); + if (cursor) { + grn_id id; + while ((id = grn_hash_cursor_next(ctx, cursor))) { + grn_hash_cursor_get_key_value(ctx, + cursor, + (void **)(&pfd), + &dummy, + (void **)(&com)); + if (com->events & GRN_COM_POLLIN) { FD_SET(*pfd, &rfds); } + if (com->events & GRN_COM_POLLOUT) { FD_SET(*pfd, &wfds); } +# ifndef WIN32 + if (*pfd > nfds) { nfds = *pfd; } +# endif /* WIN32 */ + } + grn_hash_cursor_close(ctx, cursor); + } + } + nevents = select(nfds + 1, &rfds, &wfds, NULL, (timeout >= 0) ? &tv : NULL); + if (nevents < 0) { + SOERR("select"); + if (ctx->rc == GRN_INTERRUPTED_FUNCTION_CALL) { ERRCLR(ctx); } + return ctx->rc; + } + if (timeout < 0 && !nevents) { GRN_LOG(ctx, GRN_LOG_NOTICE, "select returns 0 events"); } + GRN_HASH_EACH(ctx, ev->hash, eh, &pfd, &dummy, &com, { + if (FD_ISSET(*pfd, &rfds)) { grn_com_receiver(ctx, com); } + }); +#else /* USE_SELECT */ +# ifdef USE_EPOLL + struct epoll_event *ep; + ctx->errlvl = GRN_OK; + ctx->rc = GRN_SUCCESS; + nevents = epoll_wait(ev->epfd, ev->events, ev->max_nevents, timeout); + if (nevents < 0) { + SERR("epoll_wait"); + } +# else /* USE_EPOLL */ +# ifdef USE_KQUEUE + struct kevent *ep; + struct timespec tv; + if (timeout >= 0) { + tv.tv_sec = timeout / 1000; + tv.tv_nsec = (timeout % 1000) * 1000; + } + nevents = kevent(ev->kqfd, NULL, 0, ev->events, ev->max_nevents, &tv); + if (nevents < 0) { + SERR("kevent"); + } +# else /* USE_KQUEUE */ + uint32_t dummy; + int nfd = 0, *pfd; + struct pollfd *ep = ev->events; + ctx->errlvl = GRN_OK; + ctx->rc = GRN_SUCCESS; + GRN_HASH_EACH(ctx, ev->hash, eh, &pfd, &dummy, &com, { + ep->fd = *pfd; + // ep->events =(short) com->events; + ep->events = POLLIN; + ep->revents = 0; + ep++; + nfd++; + }); + nevents = poll(ev->events, nfd, timeout); + if (nevents < 0) { + SERR("poll"); + } +# endif /* USE_KQUEUE */ +# endif /* USE_EPOLL */ + if (ctx->rc != GRN_SUCCESS) { + if (ctx->rc == GRN_INTERRUPTED_FUNCTION_CALL) { + ERRCLR(ctx); + } + return ctx->rc; + } + if (timeout < 0 && !nevents) { GRN_LOG(ctx, GRN_LOG_NOTICE, "poll returns 0 events"); } + for (ep = ev->events; nevents; ep++) { + int efd; +# ifdef USE_EPOLL + efd = ep->data.fd; + nevents--; + // todo : com = ep->data.ptr; + if (!grn_hash_get(ctx, ev->hash, &efd, sizeof(grn_sock), (void *)&com)) { + struct epoll_event e; + GRN_LOG(ctx, GRN_LOG_ERROR, "fd(%d) not found in ev->hash", efd); + memset(&e, 0, sizeof(struct epoll_event)); + e.data.fd = efd; + e.events = ep->events; + if (epoll_ctl(ev->epfd, EPOLL_CTL_DEL, efd, &e) == -1) { SERR("epoll_ctl"); } + if (grn_sock_close(efd) == -1) { SOERR("close"); } + continue; + } + if (ep->events & GRN_COM_POLLIN) { grn_com_receiver(ctx, com); } +# else /* USE_EPOLL */ +# ifdef USE_KQUEUE + efd = ep->ident; + nevents--; + // todo : com = ep->udata; + if (!grn_hash_get(ctx, ev->hash, &efd, sizeof(grn_sock), (void *)&com)) { + struct kevent e; + GRN_LOG(ctx, GRN_LOG_ERROR, "fd(%d) not found in ev->set", efd); + EV_SET(&e, efd, ep->filter, EV_DELETE, 0, 0, NULL); + if (kevent(ev->kqfd, &e, 1, NULL, 0, NULL) == -1) { SERR("kevent"); } + if (grn_sock_close(efd) == -1) { SOERR("close"); } + continue; + } + if (ep->filter == GRN_COM_POLLIN) { grn_com_receiver(ctx, com); } +# else + efd = ep->fd; + if (!(ep->events & ep->revents)) { continue; } + nevents--; + if (!grn_hash_get(ctx, ev->hash, &efd, sizeof(grn_sock), (void *)&com)) { + GRN_LOG(ctx, GRN_LOG_ERROR, "fd(%d) not found in ev->hash", efd); + if (grn_sock_close(efd) == -1) { SOERR("close"); } + continue; + } + if (ep->revents & GRN_COM_POLLIN) { grn_com_receiver(ctx, com); } +# endif /* USE_KQUEUE */ +# endif /* USE_EPOLL */ + } +#endif /* USE_SELECT */ + /* todo : + while (!(msg = (grn_com_msg *)grn_com_queue_deque(&recv_old))) { + grn_msg_close(ctx, msg); + } + */ + return GRN_SUCCESS; +} + +grn_rc +grn_com_send_http(grn_ctx *ctx, grn_com *cs, const char *path, uint32_t path_len, int flags) +{ + ssize_t ret; + grn_obj buf; + GRN_TEXT_INIT(&buf, 0); + GRN_TEXT_PUTS(ctx, &buf, "GET "); + grn_bulk_write(ctx, &buf, path, path_len); + GRN_TEXT_PUTS(ctx, &buf, " HTTP/1.0\r\n\r\n"); + // todo : refine + if ((ret = send(cs->fd, GRN_BULK_HEAD(&buf), GRN_BULK_VSIZE(&buf), MSG_NOSIGNAL|flags)) == -1) { + SOERR("send"); + } + if (ret != GRN_BULK_VSIZE(&buf)) { + GRN_LOG(ctx, GRN_LOG_NOTICE, "send %d != %d", (int)ret, (int)GRN_BULK_VSIZE(&buf)); + } + grn_obj_close(ctx, &buf); + return ctx->rc; +} + +grn_rc +grn_com_send(grn_ctx *ctx, grn_com *cs, + grn_com_header *header, const char *body, uint32_t size, int flags) +{ + grn_rc rc = GRN_SUCCESS; + size_t whole_size = sizeof(grn_com_header) + size; + ssize_t ret; + header->size = htonl(size); + GRN_LOG(ctx, GRN_LOG_INFO, "send (%d,%x,%d,%02x,%02x,%04x)", size, header->flags, header->proto, header->qtype, header->level, header->status); + + if (size) { +#ifdef WIN32 + WSABUF wsabufs[2]; + DWORD n_sent; + wsabufs[0].buf = (char *)header; + wsabufs[0].len = sizeof(grn_com_header); + wsabufs[1].buf = (char *)body; + wsabufs[1].len = size; + if (WSASend(cs->fd, wsabufs, 2, &n_sent, 0, NULL, NULL) == SOCKET_ERROR) { + SOERR("WSASend"); + } + ret = n_sent; +#else /* WIN32 */ + struct iovec msg_iov[2]; + struct msghdr msg; + memset(&msg, 0, sizeof(struct msghdr)); + msg.msg_name = NULL; + msg.msg_namelen = 0; + msg.msg_iov = msg_iov; + msg.msg_iovlen = 2; + msg_iov[0].iov_base = (char*) header; + msg_iov[0].iov_len = sizeof(grn_com_header); + msg_iov[1].iov_base = (char *)body; + msg_iov[1].iov_len = size; + if ((ret = sendmsg(cs->fd, &msg, MSG_NOSIGNAL|flags)) == -1) { + SOERR("sendmsg"); + rc = ctx->rc; + } +#endif /* WIN32 */ + } else { + if ((ret = send(cs->fd, (const void *)header, whole_size, MSG_NOSIGNAL|flags)) == -1) { + SOERR("send"); + rc = ctx->rc; + } + } + if ((size_t) ret != whole_size) { + GRN_LOG(ctx, GRN_LOG_ERROR, + "sendmsg(%" GRN_FMT_SOCKET "): %" GRN_FMT_LLD " < %" GRN_FMT_LLU, + cs->fd, (long long int)ret, (unsigned long long int)whole_size); + rc = ctx->rc; + } + return rc; +} + +#define RETRY_MAX 10 + +static const char * +scan_delimiter(const char *p, const char *e) +{ + while (p + 4 <= e) { + if (p[3] == '\n') { + if (p[2] == '\r') { + if (p[1] == '\n') { + if (p[0] == '\r') { return p + 4; } else { p += 2; } + } else { p += 2; } + } else { p += 4; } + } else { p += p[3] == '\r' ? 1 : 4; } + } + return NULL; +} + +#define BUFSIZE 4096 + +static grn_rc +grn_com_recv_text(grn_ctx *ctx, grn_com *com, + grn_com_header *header, grn_obj *buf, ssize_t ret) +{ + const char *p; + int retry = 0; + grn_bulk_write(ctx, buf, (char *)header, ret); + if ((p = scan_delimiter(GRN_BULK_HEAD(buf), GRN_BULK_CURR(buf)))) { + header->qtype = *GRN_BULK_HEAD(buf); + header->proto = GRN_COM_PROTO_HTTP; + header->size = GRN_BULK_VSIZE(buf); + goto exit; + } + for (;;) { + if (grn_bulk_reserve(ctx, buf, BUFSIZE)) { return ctx->rc; } + if ((ret = recv(com->fd, GRN_BULK_CURR(buf), BUFSIZE, 0)) < 0) { + SOERR("recv text"); + if (ctx->rc == GRN_OPERATION_WOULD_BLOCK || + ctx->rc == GRN_INTERRUPTED_FUNCTION_CALL) { + ERRCLR(ctx); + continue; + } + goto exit; + } + if (ret) { + off_t o = GRN_BULK_VSIZE(buf); + p = GRN_BULK_CURR(buf); + GRN_BULK_INCR_LEN(buf, ret); + if (scan_delimiter(p - (o > 3 ? 3 : o), p + ret)) { + break; + } + } else { + if (++retry > RETRY_MAX) { + // ERR(GRN_RETRY_MAX, "retry max in recv text"); + goto exit; + } + } + } + header->qtype = *GRN_BULK_HEAD(buf); + header->proto = GRN_COM_PROTO_HTTP; + header->size = GRN_BULK_VSIZE(buf); +exit : + if (header->qtype == 'H') { + //todo : refine + /* + GRN_BULK_REWIND(buf); + grn_bulk_reserve(ctx, buf, BUFSIZE); + if ((ret = recv(com->fd, GRN_BULK_CURR(buf), BUFSIZE, 0)) < 0) { + SOERR("recv text body"); + } else { + GRN_BULK_CURR(buf) += ret; + } + */ + } + return ctx->rc; +} + +grn_rc +grn_com_recv(grn_ctx *ctx, grn_com *com, grn_com_header *header, grn_obj *buf) +{ + ssize_t ret; + int retry = 0; + byte *p = (byte *)header; + size_t rest = sizeof(grn_com_header); + do { + if ((ret = recv(com->fd, p, rest, 0)) < 0) { + SOERR("recv size"); + GRN_LOG(ctx, GRN_LOG_ERROR, "recv error (%" GRN_FMT_SOCKET ")", com->fd); + if (ctx->rc == GRN_OPERATION_WOULD_BLOCK || + ctx->rc == GRN_INTERRUPTED_FUNCTION_CALL) { + ERRCLR(ctx); + continue; + } + goto exit; + } + if (ret) { + if (header->proto < 0x80) { + return grn_com_recv_text(ctx, com, header, buf, ret); + } + rest -= ret, p += ret; + } else { + if (++retry > RETRY_MAX) { + // ERR(GRN_RETRY_MAX, "retry max in recv header (%d)", com->fd); + goto exit; + } + } + } while (rest); + GRN_LOG(ctx, GRN_LOG_INFO, + "recv (%u,%x,%d,%02x,%02x,%04x)", + (uint32_t)ntohl(header->size), + header->flags, + header->proto, + header->qtype, + header->level, + header->status); + { + uint8_t proto = header->proto; + size_t value_size = ntohl(header->size); + GRN_BULK_REWIND(buf); + switch (proto) { + case GRN_COM_PROTO_GQTP : + case GRN_COM_PROTO_MBREQ : + if (GRN_BULK_WSIZE(buf) < value_size) { + if (grn_bulk_resize(ctx, buf, value_size)) { + goto exit; + } + } + retry = 0; + for (rest = value_size; rest;) { + if ((ret = recv(com->fd, GRN_BULK_CURR(buf), rest, MSG_WAITALL)) < 0) { + SOERR("recv body"); + if (ctx->rc == GRN_OPERATION_WOULD_BLOCK || + ctx->rc == GRN_INTERRUPTED_FUNCTION_CALL) { + ERRCLR(ctx); + continue; + } + goto exit; + } + if (ret) { + rest -= ret; + GRN_BULK_INCR_LEN(buf, ret); + } else { + if (++retry > RETRY_MAX) { + // ERR(GRN_RETRY_MAX, "retry max in recv body"); + goto exit; + } + } + } + break; + default : + GRN_LOG(ctx, GRN_LOG_ERROR, "illegal header: %d", proto); + ctx->rc = GRN_INVALID_FORMAT; + goto exit; + } + } +exit : + return ctx->rc; +} + +grn_com * +grn_com_copen(grn_ctx *ctx, grn_com_event *ev, const char *dest, int port) +{ + grn_sock fd = -1; + grn_com *cs = NULL; + + struct addrinfo hints, *addrinfo_list, *addrinfo_ptr; + char port_string[16]; + int getaddrinfo_result; + + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; +#ifdef AI_NUMERICSERV + hints.ai_flags = AI_NUMERICSERV; +#endif + grn_snprintf(port_string, sizeof(port_string), sizeof(port_string), + "%d", port); + + getaddrinfo_result = getaddrinfo(dest, port_string, &hints, &addrinfo_list); + if (getaddrinfo_result != 0) { + switch (getaddrinfo_result) { +#ifdef EAI_MEMORY + case EAI_MEMORY: + ERR(GRN_NO_MEMORY_AVAILABLE, "getaddrinfo: <%s:%s>: %s", + dest, port_string, gai_strerror(getaddrinfo_result)); + break; +#endif +#ifdef EAI_SYSTEM + case EAI_SYSTEM: + SOERR("getaddrinfo"); + break; +#endif + default: + ERR(GRN_INVALID_ARGUMENT, "getaddrinfo: <%s:%s>: %s", + dest, port_string, gai_strerror(getaddrinfo_result)); + break; + } + return NULL; + } + + for (addrinfo_ptr = addrinfo_list; addrinfo_ptr; + addrinfo_ptr = addrinfo_ptr->ai_next) { + fd = socket(addrinfo_ptr->ai_family, addrinfo_ptr->ai_socktype, + addrinfo_ptr->ai_protocol); + if (fd == -1) { + SOERR("socket"); + continue; + } +#ifdef TCP_NODELAY + { + static const int value = 1; + if (setsockopt(fd, 6, TCP_NODELAY, + (const char *)&value, sizeof(value)) != 0) { + SOERR("setsockopt"); + grn_sock_close(fd); + continue; + } + } +#endif + if (connect(fd, addrinfo_ptr->ai_addr, addrinfo_ptr->ai_addrlen) != 0) { + SOERR("connect"); + grn_sock_close(fd); + continue; + } + + break; + } + + freeaddrinfo(addrinfo_list); + + if (!addrinfo_ptr) { + return NULL; + } + ctx->errlvl = GRN_OK; + ctx->rc = GRN_SUCCESS; + + if (ev) { + grn_com_event_add(ctx, ev, fd, GRN_COM_POLLIN, &cs); + } else { + cs = GRN_CALLOC(sizeof(grn_com)); + if (cs) { + cs->fd = fd; + } + } + if (!cs) { + grn_sock_close(fd); + } + return cs; +} + +void +grn_com_close_(grn_ctx *ctx, grn_com *com) +{ + grn_sock fd = com->fd; + if (shutdown(fd, SHUT_RDWR) == -1) { /* SOERR("shutdown"); */ } + if (grn_sock_close(fd) == -1) { + SOERR("close"); + } else { + com->closed = 1; + } +} + +grn_rc +grn_com_close(grn_ctx *ctx, grn_com *com) +{ + grn_sock fd = com->fd; + grn_com_event *ev = com->ev; + if (ev) { + grn_com *acceptor = ev->acceptor; + grn_com_event_del(ctx, ev, fd); + if (acceptor) { grn_com_event_start_accept(ctx, ev); } + } + if (!com->closed) { grn_com_close_(ctx, com); } + if (!ev) { GRN_FREE(com); } + return GRN_SUCCESS; +} + +grn_rc +grn_com_sopen(grn_ctx *ctx, grn_com_event *ev, + const char *bind_address, int port, grn_msg_handler *func, + struct hostent *he) +{ + grn_sock lfd = -1; + grn_com *cs = NULL; + int getaddrinfo_result; + struct addrinfo *bind_address_info = NULL; + struct addrinfo hints; + char port_string[6]; /* ceil(log10(65535)) + 1 ('\0')*/ + + GRN_API_ENTER; + if (!bind_address) { + bind_address = "0.0.0.0"; + } + grn_snprintf(port_string, sizeof(port_string), sizeof(port_string), + "%d", port); + memset(&hints, 0, sizeof(struct addrinfo)); + hints.ai_family = PF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; +#ifdef AI_NUMERICSERV + hints.ai_flags = AI_NUMERICSERV; +#endif + getaddrinfo_result = getaddrinfo(bind_address, port_string, + &hints, &bind_address_info); + if (getaddrinfo_result != 0) { + switch (getaddrinfo_result) { +#ifdef EAI_MEMORY + case EAI_MEMORY: + ERR(GRN_NO_MEMORY_AVAILABLE, + "getaddrinfo: <%s:%s>: %s", + bind_address, port_string, gai_strerror(getaddrinfo_result)); + break; +#endif +#ifdef EAI_SYSTEM + case EAI_SYSTEM: + SOERR("getaddrinfo"); + break; +#endif + default: + ERR(GRN_INVALID_ARGUMENT, + "getaddrinfo: <%s:%s>: %s", + bind_address, port_string, gai_strerror(getaddrinfo_result)); + break; + } + goto exit; + } + if ((lfd = socket(bind_address_info->ai_family, SOCK_STREAM, 0)) == -1) { + SOERR("socket"); + goto exit; + } + grn_memcpy(&ev->curr_edge_id.addr, he->h_addr, he->h_length); + ev->curr_edge_id.port = htons(port); + ev->curr_edge_id.sid = 0; + { + int v = 1; +#ifdef TCP_NODELAY + if (setsockopt(lfd, SOL_TCP, TCP_NODELAY, (void *) &v, sizeof(int)) == -1) { + SOERR("setsockopt"); + goto exit; + } +#endif + if (setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, (void *) &v, sizeof(int)) == -1) { + SOERR("setsockopt"); + goto exit; + } + } + if (bind(lfd, bind_address_info->ai_addr, bind_address_info->ai_addrlen) < 0) { + SOERR("bind"); + goto exit; + } + if (listen(lfd, LISTEN_BACKLOG) < 0) { + SOERR("listen"); + goto exit; + } + if (ev) { + if (grn_com_event_add(ctx, ev, lfd, GRN_COM_POLLIN, &cs)) { goto exit; } + ev->acceptor = cs; + ev->msg_handler = func; + cs->has_sid = 0; + cs->closed = 0; + cs->opaque = NULL; + GRN_COM_QUEUE_INIT(&cs->new_); + } else { + if (!(cs = GRN_MALLOC(sizeof(grn_com)))) { goto exit; } + cs->fd = lfd; + } + cs->accepting = GRN_TRUE; +exit : + if (!cs && lfd != 1) { grn_sock_close(lfd); } + if (bind_address_info) { freeaddrinfo(bind_address_info); } + GRN_API_RETURN(ctx->rc); +} + + +grn_hash *grn_edges = NULL; +void (*grn_dispatcher)(grn_ctx *ctx, grn_edge *edge); + +void +grn_edges_init(grn_ctx *ctx, void (*dispatcher)(grn_ctx *ctx, grn_edge *edge)) +{ + grn_edges = grn_hash_create(ctx, NULL, sizeof(grn_com_addr), sizeof(grn_edge), 0); + grn_dispatcher = dispatcher; +} + +void +grn_edges_fin(grn_ctx *ctx) +{ + grn_hash_close(ctx, grn_edges); +} + +grn_edge * +grn_edges_add(grn_ctx *ctx, grn_com_addr *addr, int *added) +{ + if (grn_io_lock(ctx, grn_edges->io, grn_lock_timeout)) { + return NULL; + } else { + grn_edge *edge; + grn_id id = grn_hash_add(ctx, grn_edges, addr, sizeof(grn_com_addr), + (void **)&edge, added); + grn_io_unlock(grn_edges->io); + if (id) { edge->id = id; } + return edge; + } +} + +void +grn_edges_delete(grn_ctx *ctx, grn_edge *edge) +{ + if (!grn_io_lock(ctx, grn_edges->io, grn_lock_timeout)) { + grn_hash_delete_by_id(ctx, grn_edges, edge->id, NULL); + grn_io_unlock(grn_edges->io); + } +} + +grn_edge * +grn_edges_add_communicator(grn_ctx *ctx, grn_com_addr *addr) +{ + int added; + grn_edge *edge = grn_edges_add(ctx, addr, &added); + if (added) { + grn_ctx_init(&edge->ctx, 0); + GRN_COM_QUEUE_INIT(&edge->recv_new); + GRN_COM_QUEUE_INIT(&edge->send_old); + edge->com = NULL; + edge->stat = 0 /*EDGE_IDLE*/; + edge->flags = GRN_EDGE_COMMUNICATOR; + } + return edge; +} + +void +grn_edge_dispatch(grn_ctx *ctx, grn_edge *edge, grn_obj *msg) +{ + grn_com_queue_enque(ctx, &edge->recv_new, (grn_com_queue_entry *)msg); + grn_dispatcher(ctx, edge); +} |