summaryrefslogtreecommitdiffstats
path: root/poll/unix/z_asio.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 09:11:38 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 09:11:38 +0000
commitbc7e963c37d9c8d1c854ac960df241cfa34e3dc5 (patch)
treeaa35d7414ce9f1326abf6f723f6dfa5b0aa08b1d /poll/unix/z_asio.c
parentInitial commit. (diff)
downloadapr-bc7e963c37d9c8d1c854ac960df241cfa34e3dc5.tar.xz
apr-bc7e963c37d9c8d1c854ac960df241cfa34e3dc5.zip
Adding upstream version 1.7.2.upstream/1.7.2upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r--poll/unix/z_asio.c782
1 files changed, 782 insertions, 0 deletions
diff --git a/poll/unix/z_asio.c b/poll/unix/z_asio.c
new file mode 100644
index 0000000..48b531c
--- /dev/null
+++ b/poll/unix/z_asio.c
@@ -0,0 +1,782 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ ******************************************************************************
+ *
+ * This implementation is based on a design by John Brooks (IBM Pok) which uses
+ * the z/OS sockets async i/o facility. When a
+ * socket is added to the pollset, an async poll is issued for that individual
+ * socket. It specifies that the kernel should send an IPC message when the
+ * socket becomes ready. The IPC messages are sent to a single message queue
+ * that is part of the pollset. apr_pollset_poll waits on the arrival of IPC
+ * messages or the specified timeout.
+ *
+ * Since z/OS does not support async i/o for pipes or files at present, this
+ * implementation falls back to using ordinary poll() when
+ * APR_POLLSET_THREADSAFE is unset.
+ *
+ * Greg Ames
+ * April 2012
+ */
+
+#include "apr.h"
+#include "apr_hash.h"
+#include "apr_poll.h"
+#include "apr_time.h"
+#include "apr_portable.h"
+#include "apr_arch_inherit.h"
+#include "apr_arch_file_io.h"
+#include "apr_arch_networkio.h"
+#include "apr_arch_poll_private.h"
+
+#ifdef HAVE_AIO_MSGQ
+
+#include <sys/msg.h> /* msgget etc */
+#include <time.h> /* timestruct */
+#include <poll.h> /* pollfd */
+#include <limits.h> /* MAX_INT */
+
+struct apr_pollset_private_t
+{
+ int msg_q; /* IPC message queue. The z/OS kernel sends messages
+ * to this queue when our async polls on individual
+ * file descriptors complete
+ */
+ apr_pollfd_t *result_set;
+ apr_uint32_t size;
+
+#if APR_HAS_THREADS
+ /* A thread mutex to protect operations on the rings and the hash */
+ apr_thread_mutex_t *ring_lock;
+#endif
+
+ /* A hash of all active elements used for O(1) _remove operations */
+ apr_hash_t *elems;
+
+ APR_RING_HEAD(ready_ring_t, asio_elem_t) ready_ring;
+ APR_RING_HEAD(prior_ready_ring_t, asio_elem_t) prior_ready_ring;
+ APR_RING_HEAD(free_ring_t, asio_elem_t) free_ring;
+
+ /* for pipes etc with no asio */
+ struct pollfd *pollset;
+ apr_pollfd_t *query_set;
+};
+
+typedef enum {
+ ASIO_INIT = 0,
+ ASIO_REMOVED,
+ ASIO_COMPLETE
+} asio_state_e;
+
+typedef struct asio_elem_t asio_elem_t;
+
+struct asio_msgbuf_t {
+ long msg_type; /* must be > 0 */
+ asio_elem_t *msg_elem;
+};
+
+struct asio_elem_t
+{
+ APR_RING_ENTRY(asio_elem_t) link;
+ apr_pollfd_t pfd;
+ struct pollfd os_pfd;
+ struct aiocb a;
+ asio_state_e state;
+ struct asio_msgbuf_t msg;
+};
+
+#define DEBUG 0
+
+/* DEBUG settings: 0 - no debug messages at all,
+ * 1 - should not occur messages,
+ * 2 - apr_pollset_* entry and exit messages,
+ * 3 - state changes, memory usage,
+ * 4 - z/OS, APR, and internal calls,
+ * 5 - everything else except the timer pop path,
+ * 6 - everything, including the Event 1 sec timer pop path
+ *
+ * each DEBUG level includes all messages produced by lower numbered levels
+ */
+
+#if DEBUG
+
+#include <assert.h>
+#include <unistd.h> /* getpid */
+
+#define DBG_BUFF char dbg_msg_buff[256];
+
+#define DBG_TEST(lvl) if (lvl <= DEBUG) {
+
+#define DBG_CORE(msg) sprintf(dbg_msg_buff, "% 8d " __FUNCTION__ \
+ " " msg, getpid()), \
+ fprintf(stderr, "%s", dbg_msg_buff);
+#define DBG_CORE1(msg, var1) sprintf(dbg_msg_buff, "% 8d " __FUNCTION__ \
+ " " msg, getpid(), var1), \
+ fprintf(stderr, "%s", dbg_msg_buff);
+#define DBG_CORE2(msg, var1, var2) sprintf(dbg_msg_buff, "% 8d " __FUNCTION__ \
+ " " msg, getpid(), var1, var2), \
+ fprintf(stderr, "%s", dbg_msg_buff);
+#define DBG_CORE3(msg, var1, var2, var3) \
+ sprintf(dbg_msg_buff, "% 8d " __FUNCTION__ \
+ " " msg, getpid(), var1, var2, var3), \
+ fprintf(stderr, "%s", dbg_msg_buff);
+#define DBG_CORE4(msg, var1, var2, var3, var4) \
+ sprintf(dbg_msg_buff, "% 8d " __FUNCTION__ \
+ " " msg, getpid(), var1, var2, var3, var4),\
+ fprintf(stderr, "%s", dbg_msg_buff);
+
+#define DBG_END }
+
+#define DBG(lvl, msg) DBG_TEST(lvl) \
+ DBG_CORE(msg) \
+ DBG_END
+
+#define DBG1(lvl, msg, var1) DBG_TEST(lvl) \
+ DBG_CORE1(msg, var1) \
+ DBG_END
+
+#define DBG2(lvl, msg, var1, var2) DBG_TEST(lvl) \
+ DBG_CORE2(msg, var1, var2) \
+ DBG_END
+
+#define DBG3(lvl, msg, var1, var2, var3) \
+ DBG_TEST(lvl) \
+ DBG_CORE3(msg, var1, var2, var3) \
+ DBG_END
+
+#define DBG4(lvl, msg, var1, var2, var3, var4) \
+ DBG_TEST(lvl) \
+ DBG_CORE4(msg, var1, var2, var3, var4) \
+ DBG_END
+
+#else /* DEBUG is 0 */
+#define DBG_BUFF
+#define DBG(lvl, msg) ((void)0)
+#define DBG1(lvl, msg, var1) ((void)0)
+#define DBG2(lvl, msg, var1, var2) ((void)0)
+#define DBG3(lvl, msg, var1, var2, var3) ((void)0)
+#define DBG4(lvl, msg, var1, var2, var3, var4) ((void)0)
+
+#endif /* DEBUG */
+
+static int asyncio(struct aiocb *a)
+{
+ DBG_BUFF
+ int rv;
+
+#ifdef _LP64
+#define AIO BPX4AIO
+#else
+#define AIO BPX1AIO
+#endif
+
+ AIO(sizeof(struct aiocb), a, &rv, &errno, __err2ad());
+ DBG2(4, "BPX4AIO aiocb %p rv %d\n",
+ a, rv);
+#ifdef DEBUG
+ if (rv < 0) {
+ DBG2(4, "errno %d errnojr %08x\n",
+ errno, *__err2ad());
+ }
+#endif
+ return rv;
+}
+
+static apr_int16_t get_event(apr_int16_t event)
+{
+ DBG_BUFF
+ apr_int16_t rv = 0;
+ DBG(4, "entered\n");
+
+ if (event & APR_POLLIN)
+ rv |= POLLIN;
+ if (event & APR_POLLPRI)
+ rv |= POLLPRI;
+ if (event & APR_POLLOUT)
+ rv |= POLLOUT;
+ if (event & APR_POLLERR)
+ rv |= POLLERR;
+ if (event & APR_POLLHUP)
+ rv |= POLLHUP;
+ if (event & APR_POLLNVAL)
+ rv |= POLLNVAL;
+
+ DBG(4, "exiting\n");
+ return rv;
+}
+
+static apr_int16_t get_revent(apr_int16_t event)
+{
+ DBG_BUFF
+ apr_int16_t rv = 0;
+ DBG(4, "entered\n");
+
+ if (event & POLLIN)
+ rv |= APR_POLLIN;
+ if (event & POLLPRI)
+ rv |= APR_POLLPRI;
+ if (event & POLLOUT)
+ rv |= APR_POLLOUT;
+ if (event & POLLERR)
+ rv |= APR_POLLERR;
+ if (event & POLLHUP)
+ rv |= APR_POLLHUP;
+ if (event & POLLNVAL)
+ rv |= APR_POLLNVAL;
+
+ DBG(4, "exiting\n");
+ return rv;
+}
+
+static apr_status_t asio_pollset_cleanup(apr_pollset_t *pollset)
+{
+ DBG_BUFF
+ int rv;
+
+ DBG(4, "entered\n");
+ if (pollset->flags & APR_POLLSET_THREADSAFE) {
+ rv = msgctl(pollset->p->msg_q, IPC_RMID, NULL);
+ DBG1(4, "asio_pollset_cleanup: msgctl(IPC_RMID) returned %d\n", rv);
+ }
+
+ return rv;
+}
+
+static apr_status_t asio_pollset_create(apr_pollset_t *pollset,
+ apr_uint32_t size,
+ apr_pool_t *p,
+ apr_uint32_t flags)
+{
+ DBG_BUFF
+ apr_status_t rv;
+ apr_pollset_private_t *priv;
+
+ DBG1(2, "entered, flags: %x\n", flags);
+
+ priv = pollset->p = apr_pcalloc(p, sizeof(*priv));
+
+ if (flags & APR_POLLSET_THREADSAFE) {
+#if APR_HAS_THREADS
+ if ((rv = apr_thread_mutex_create(&(priv->ring_lock),
+ APR_THREAD_MUTEX_DEFAULT,
+ p)) != APR_SUCCESS) {
+ DBG1(1, "apr_thread_mutex_create returned %d\n", rv);
+ pollset->p = NULL;
+ return rv;
+ }
+ rv = msgget(IPC_PRIVATE, S_IWUSR+S_IRUSR); /* user r/w perms */
+ if (rv < 0) {
+#if DEBUG
+ perror(__FUNCTION__ " msgget returned < 0 ");
+#endif
+ pollset->p = NULL;
+ return rv;
+ }
+
+ DBG2(4, "pollset %p msgget was OK, rv=%d\n", pollset, rv);
+ priv->msg_q = rv;
+ priv->elems = apr_hash_make(p);
+
+ APR_RING_INIT(&priv->free_ring, asio_elem_t, link);
+ APR_RING_INIT(&priv->prior_ready_ring, asio_elem_t, link);
+
+#else /* APR doesn't have threads but caller wants a threadsafe pollset */
+ pollset->p = NULL;
+ return APR_ENOTIMPL;
+#endif
+
+ } else { /* APR_POLLSET_THREADSAFE not set, i.e. no async i/o,
+ * init fields only needed in old style pollset
+ */
+
+ priv->pollset = apr_palloc(p, size * sizeof(struct pollfd));
+ priv->query_set = apr_palloc(p, size * sizeof(apr_pollfd_t));
+
+ if ((!priv->pollset) || (!priv->query_set)) {
+ pollset->p = NULL;
+ return APR_ENOMEM;
+ }
+ }
+
+ pollset->nelts = 0;
+ pollset->flags = flags;
+ pollset->pool = p;
+ priv->size = size;
+ priv->result_set = apr_palloc(p, size * sizeof(apr_pollfd_t));
+ if (!priv->result_set) {
+ if (flags & APR_POLLSET_THREADSAFE) {
+ msgctl(priv->msg_q, IPC_RMID, NULL);
+ }
+ pollset->p = NULL;
+ return APR_ENOMEM;
+ }
+
+ DBG2(2, "exiting, pollset: %p, type: %s\n",
+ pollset,
+ flags & APR_POLLSET_THREADSAFE ? "async" : "POSIX");
+
+
+ return APR_SUCCESS;
+
+} /* end of asio_pollset_create */
+
+static apr_status_t posix_add(apr_pollset_t *pollset,
+ const apr_pollfd_t *descriptor)
+{
+ DBG_BUFF
+ int fd;
+ apr_pool_t *p = pollset->pool;
+ apr_pollset_private_t *priv = pollset->p;
+
+ DBG(4, "entered\n");
+
+ if (pollset->nelts == priv->size) {
+ return APR_ENOMEM;
+ }
+
+ priv->query_set[pollset->nelts] = *descriptor;
+ if (descriptor->desc_type == APR_POLL_SOCKET) {
+ fd = descriptor->desc.s->socketdes;
+ }
+ else {
+ fd = descriptor->desc.f->filedes;
+ }
+
+ priv->pollset[pollset->nelts].fd = fd;
+
+ priv->pollset[pollset->nelts].events =
+ get_event(descriptor->reqevents);
+
+ pollset->nelts++;
+
+ DBG2(4, "exiting, fd %d added to pollset %p\n", fd, pollset);
+
+ return APR_SUCCESS;
+} /* end of posix_add */
+
+
+static apr_status_t asio_pollset_add(apr_pollset_t *pollset,
+ const apr_pollfd_t *descriptor)
+{
+ DBG_BUFF
+ asio_elem_t *elem;
+ apr_status_t rv = APR_SUCCESS;
+ apr_pollset_private_t *priv = pollset->p;
+
+ pollset_lock_rings();
+ DBG(2, "entered\n");
+
+ if (pollset->flags & APR_POLLSET_THREADSAFE) {
+
+ if (!APR_RING_EMPTY(&(priv->free_ring), asio_elem_t, link)) {
+ elem = APR_RING_FIRST(&(priv->free_ring));
+ APR_RING_REMOVE(elem, link);
+ DBG1(3, "used recycled memory at %08p\n", elem);
+ elem->state = ASIO_INIT;
+ elem->a.aio_cflags = 0;
+ }
+ else {
+ elem = (asio_elem_t *) apr_pcalloc(pollset->pool, sizeof(asio_elem_t));
+ DBG1(3, "alloced new memory at %08p\n", elem);
+
+ elem->a.aio_notifytype = AIO_MSGQ;
+ elem->a.aio_msgev_qid = priv->msg_q;
+ DBG1(5, "aio_msgev_quid = %d \n", elem->a.aio_msgev_qid);
+ elem->a.aio_msgev_size = sizeof(asio_elem_t *);
+ elem->a.aio_msgev_flag = 0; /* wait if queue is full */
+ elem->a.aio_msgev_addr = &(elem->msg);
+ elem->a.aio_buf = &(elem->os_pfd);
+ elem->a.aio_nbytes = 1; /* number of pfds to poll */
+ elem->msg.msg_type = 1;
+ elem->msg.msg_elem = elem;
+ }
+
+ /* z/OS only supports async I/O for sockets for now */
+ elem->os_pfd.fd = descriptor->desc.s->socketdes;
+
+ APR_RING_ELEM_INIT(elem, link);
+ elem->a.aio_cmd = AIO_SELPOLL;
+ elem->a.aio_cflags &= ~AIO_OK2COMPIMD; /* not OK to complete inline*/
+ elem->pfd = *descriptor;
+ elem->os_pfd.events = get_event(descriptor->reqevents);
+
+ if (0 != asyncio(&elem->a)) {
+ rv = errno;
+ DBG3(4, "pollset %p asio failed fd %d, errno %p\n",
+ pollset, elem->os_pfd.fd, rv);
+#if DEBUG
+ perror(__FUNCTION__ " asio failure");
+#endif
+ }
+ else {
+ DBG2(4, "good asio call, adding fd %d to pollset %p\n",
+ elem->os_pfd.fd, pollset);
+
+ pollset->nelts++;
+ apr_hash_set(priv->elems, &(elem->os_pfd.fd), sizeof(int), elem);
+ }
+ }
+ else {
+ /* APR_POLLSET_THREADSAFE isn't set. use POSIX poll in case
+ * pipes or files are used with this pollset
+ */
+
+ rv = posix_add(pollset, descriptor);
+ }
+
+ DBG1(2, "exiting, rv = %d\n", rv);
+
+ pollset_unlock_rings();
+ return rv;
+} /* end of asio_pollset_add */
+
+static posix_remove(apr_pollset_t *pollset, const apr_pollfd_t *descriptor)
+{
+ DBG_BUFF
+ apr_uint32_t i;
+ apr_pollset_private_t *priv = pollset->p;
+
+ DBG(4, "entered\n");
+ for (i = 0; i < pollset->nelts; i++) {
+ if (descriptor->desc.s == priv->query_set[i].desc.s) {
+ /* Found an instance of the fd: remove this and any other copies */
+ apr_uint32_t dst = i;
+ apr_uint32_t old_nelts = pollset->nelts;
+ pollset->nelts--;
+ for (i++; i < old_nelts; i++) {
+ if (descriptor->desc.s == priv->query_set[i].desc.s) {
+ pollset->nelts--;
+ }
+ else {
+ priv->pollset[dst] = priv->pollset[i];
+ priv->query_set[dst] = priv->query_set[i];
+ dst++;
+ }
+ }
+ DBG(4, "returning OK\n");
+ return APR_SUCCESS;
+ }
+ }
+
+ DBG(1, "returning APR_NOTFOUND\n");
+ return APR_NOTFOUND;
+
+} /* end of posix_remove */
+
+static apr_status_t asio_pollset_remove(apr_pollset_t *pollset,
+ const apr_pollfd_t *descriptor)
+{
+ DBG_BUFF
+ asio_elem_t *elem;
+ apr_status_t rv = APR_SUCCESS;
+ apr_pollset_private_t *priv = pollset->p;
+ /* AIO_CANCEL is synchronous, so autodata works fine. */
+ struct aiocb cancel_a = {0};
+
+ int fd;
+
+ DBG(2, "entered\n");
+
+ if (!(pollset->flags & APR_POLLSET_THREADSAFE)) {
+ return posix_remove(pollset, descriptor);
+ }
+
+ pollset_lock_rings();
+
+#if DEBUG
+ assert(descriptor->desc_type == APR_POLL_SOCKET);
+#endif
+ /* zOS 1.12 doesn't support files for async i/o */
+ fd = descriptor->desc.s->socketdes;
+
+ elem = apr_hash_get(priv->elems, &(fd), sizeof(int));
+ if (elem == NULL) {
+ DBG1(1, "couldn't find fd %d\n", fd);
+ rv = APR_NOTFOUND;
+ } else {
+ DBG1(5, "hash found fd %d\n", fd);
+ /* delete this fd from the hash */
+ apr_hash_set(priv->elems, &(fd), sizeof(int), NULL);
+
+ if (elem->state == ASIO_INIT) {
+ /* asyncio call to cancel */
+ cancel_a.aio_cmd = AIO_CANCEL;
+ cancel_a.aio_buf = &elem->a; /* point to original aiocb */
+
+ cancel_a.aio_cflags = 0;
+ cancel_a.aio_cflags2 = 0;
+
+ /* we want the original aiocb to show up on the pollset message queue
+ * before recycling its memory to eliminate race conditions
+ */
+
+ rv = asyncio(&cancel_a);
+ DBG1(4, "asyncio returned %d\n", rv);
+
+#if DEBUG
+ assert(rv == 1);
+#endif
+ }
+ elem->state = ASIO_REMOVED;
+ rv = APR_SUCCESS;
+ }
+
+ DBG1(2, "exiting, rv: %d\n", rv);
+
+ pollset_unlock_rings();
+
+ return rv;
+} /* end of asio_pollset_remove */
+
+static posix_poll(apr_pollset_t *pollset,
+ apr_interval_time_t timeout,
+ apr_int32_t *num,
+ const apr_pollfd_t **descriptors)
+{
+ DBG_BUFF
+ int rv;
+ apr_uint32_t i, j;
+ apr_pollset_private_t *priv = pollset->p;
+
+ DBG(4, "entered\n");
+
+ if (timeout > 0) {
+ timeout /= 1000;
+ }
+ rv = poll(priv->pollset, pollset->nelts, timeout);
+ (*num) = rv;
+ if (rv < 0) {
+ return apr_get_netos_error();
+ }
+ if (rv == 0) {
+ return APR_TIMEUP;
+ }
+ j = 0;
+ for (i = 0; i < pollset->nelts; i++) {
+ if (priv->pollset[i].revents != 0) {
+ priv->result_set[j] = priv->query_set[i];
+ priv->result_set[j].rtnevents =
+ get_revent(priv->pollset[i].revents);
+ j++;
+ }
+ }
+ if (descriptors)
+ *descriptors = priv->result_set;
+
+ DBG(4, "exiting ok\n");
+ return APR_SUCCESS;
+
+} /* end of posix_poll */
+
+static process_msg(apr_pollset_t *pollset, struct asio_msgbuf_t *msg)
+{
+ DBG_BUFF
+ asio_elem_t *elem = msg->msg_elem;
+
+ switch(elem->state) {
+ case ASIO_REMOVED:
+ DBG2(5, "for cancelled elem, recycling memory - elem %08p, fd %d\n",
+ elem, elem->os_pfd.fd);
+ APR_RING_INSERT_TAIL(&(pollset->p->free_ring), elem,
+ asio_elem_t, link);
+ break;
+ case ASIO_INIT:
+ DBG2(4, "adding to ready ring: elem %08p, fd %d\n",
+ elem, elem->os_pfd.fd);
+ elem->state = ASIO_COMPLETE;
+ APR_RING_INSERT_TAIL(&(pollset->p->ready_ring), elem,
+ asio_elem_t, link);
+ break;
+ default:
+ DBG3(1, "unexpected state: elem %08p, fd %d, state %d\n",
+ elem, elem->os_pfd.fd, elem->state);
+#if DEBUG
+ assert(0);
+#endif
+ }
+}
+
+static apr_status_t asio_pollset_poll(apr_pollset_t *pollset,
+ apr_interval_time_t timeout,
+ apr_int32_t *num,
+ const apr_pollfd_t **descriptors)
+{
+ DBG_BUFF
+ int i, ret;
+ asio_elem_t *elem, *next_elem;
+ struct asio_msgbuf_t msg_buff;
+ struct timespec tv;
+ apr_status_t rv = APR_SUCCESS;
+ apr_pollset_private_t *priv = pollset->p;
+
+ DBG(6, "entered\n"); /* chatty - traces every second w/Event */
+
+ if ((pollset->flags & APR_POLLSET_THREADSAFE) == 0 ) {
+ return posix_poll(pollset, timeout, num, descriptors);
+ }
+
+ pollset_lock_rings();
+ APR_RING_INIT(&(priv->ready_ring), asio_elem_t, link);
+
+ while (!APR_RING_EMPTY(&(priv->prior_ready_ring), asio_elem_t, link)) {
+ elem = APR_RING_FIRST(&(priv->prior_ready_ring));
+ DBG3(5, "pollset %p elem %p fd %d on prior ready ring\n",
+ pollset,
+ elem,
+ elem->os_pfd.fd);
+
+ APR_RING_REMOVE(elem, link);
+
+ /*
+ * since USS does not remember what's in our pollset, we have
+ * to re-add fds which have not been apr_pollset_remove'd
+ *
+ * there may have been too many ready fd's to return in the
+ * result set last time. re-poll inline for both cases
+ */
+
+ if (elem->state == ASIO_REMOVED) {
+
+ /*
+ * async i/o is done since it was found on prior_ready
+ * the state says the caller is done with it too
+ * so recycle the elem
+ */
+
+ APR_RING_INSERT_TAIL(&(priv->free_ring), elem,
+ asio_elem_t, link);
+ continue; /* do not re-add if it has been _removed */
+ }
+
+ elem->state = ASIO_INIT;
+ elem->a.aio_cflags = AIO_OK2COMPIMD;
+
+ if (0 != (ret = asyncio(&elem->a))) {
+ if (ret == 1) {
+ DBG(4, "asyncio() completed inline\n");
+ /* it's ready now */
+ elem->state = ASIO_COMPLETE;
+ APR_RING_INSERT_TAIL(&(priv->ready_ring), elem, asio_elem_t,
+ link);
+ }
+ else {
+ DBG2(1, "asyncio() failed, ret: %d, errno: %d\n",
+ ret, errno);
+ pollset_unlock_rings();
+ return errno;
+ }
+ }
+ DBG1(4, "asyncio() completed rc %d\n", ret);
+ }
+
+ DBG(6, "after prior ready loop\n"); /* chatty w/timeouts, hence 6 */
+
+ /* Gather async poll completions that have occurred since the last call */
+ while (0 < msgrcv(priv->msg_q, &msg_buff, sizeof(asio_elem_t *), 0,
+ IPC_NOWAIT)) {
+ process_msg(pollset, &msg_buff);
+ }
+
+ /* Suspend if nothing is ready yet. */
+ if (APR_RING_EMPTY(&(priv->ready_ring), asio_elem_t, link)) {
+
+ if (timeout >= 0) {
+ tv.tv_sec = apr_time_sec(timeout);
+ tv.tv_nsec = apr_time_usec(timeout) * 1000;
+ } else {
+ tv.tv_sec = INT_MAX; /* block until something is ready */
+ }
+
+ DBG2(6, "nothing on the ready ring "
+ "- blocking for %d seconds %d ns\n",
+ tv.tv_sec, tv.tv_nsec);
+
+ pollset_unlock_rings(); /* allow other apr_pollset_* calls while blocked */
+
+ if (0 >= (ret = __msgrcv_timed(priv->msg_q, &msg_buff,
+ sizeof(asio_elem_t *), 0, NULL, &tv))) {
+#if DEBUG
+ if (errno == EAGAIN) {
+ DBG(6, "__msgrcv_timed timed out\n"); /* timeout path, so 6 */
+ }
+ else {
+ DBG(1, "__msgrcv_timed failed!\n");
+ }
+#endif
+ return (errno == EAGAIN) ? APR_TIMEUP : errno;
+ }
+
+ pollset_lock_rings();
+
+ process_msg(pollset, &msg_buff);
+ }
+
+ APR_RING_INIT(&priv->prior_ready_ring, asio_elem_t, link);
+
+ (*num) = 0;
+ elem = APR_RING_FIRST(&(priv->ready_ring));
+
+ for (i = 0;
+
+ i < priv->size
+ && elem != APR_RING_SENTINEL(&(priv->ready_ring), asio_elem_t, link);
+ i++) {
+ DBG2(5, "ready ring: elem %08p, fd %d\n", elem, elem->os_pfd.fd);
+
+ priv->result_set[i] = elem->pfd;
+ priv->result_set[i].rtnevents
+ = get_revent(elem->os_pfd.revents);
+ (*num)++;
+
+ elem = APR_RING_NEXT(elem, link);
+
+#if DEBUG
+ if (elem == APR_RING_SENTINEL(&(priv->ready_ring), asio_elem_t, link)) {
+ DBG(5, "end of ready ring reached\n");
+ }
+#endif
+ }
+
+ if (descriptors) {
+ *descriptors = priv->result_set;
+ }
+
+ /* if the result size is too small, remember which descriptors
+ * haven't had results reported yet. we will look
+ * at these descriptors on the next apr_pollset_poll call
+ */
+
+ APR_RING_CONCAT(&priv->prior_ready_ring, &(priv->ready_ring), asio_elem_t, link);
+
+ DBG1(2, "exiting, rv = %d\n", rv);
+
+ pollset_unlock_rings();
+
+ return rv;
+} /* end of asio_pollset_poll */
+
+static const apr_pollset_provider_t impl = {
+ asio_pollset_create,
+ asio_pollset_add,
+ asio_pollset_remove,
+ asio_pollset_poll,
+ asio_pollset_cleanup,
+ "asio"
+};
+
+const apr_pollset_provider_t *apr_pollset_provider_aio_msgq = &impl;
+
+#endif /* HAVE_AIO_MSGQ */