summaryrefslogtreecommitdiffstats
path: root/libc-bottom-half/sources/poll-wasip2.c
diff options
context:
space:
mode:
Diffstat (limited to 'libc-bottom-half/sources/poll-wasip2.c')
-rw-r--r--libc-bottom-half/sources/poll-wasip2.c257
1 files changed, 257 insertions, 0 deletions
diff --git a/libc-bottom-half/sources/poll-wasip2.c b/libc-bottom-half/sources/poll-wasip2.c
new file mode 100644
index 0000000..be7809c
--- /dev/null
+++ b/libc-bottom-half/sources/poll-wasip2.c
@@ -0,0 +1,257 @@
+#include <errno.h>
+#include <poll.h>
+
+#include <wasi/descriptor_table.h>
+
+typedef struct {
+ poll_own_pollable_t pollable;
+ struct pollfd *pollfd;
+ descriptor_table_entry_t *entry;
+ short events;
+} state_t;
+
+int poll_wasip2(struct pollfd *fds, size_t nfds, int timeout)
+{
+ int event_count = 0;
+ for (size_t i = 0; i < nfds; ++i) {
+ fds[i].revents = 0;
+ }
+
+ size_t max_pollables = (2 * nfds) + 1;
+ state_t states[max_pollables];
+ size_t state_index = 0;
+ for (size_t i = 0; i < nfds; ++i) {
+ struct pollfd *pollfd = fds + i;
+ descriptor_table_entry_t *entry;
+ if (descriptor_table_get_ref(pollfd->fd, &entry)) {
+ switch (entry->tag) {
+ case DESCRIPTOR_TABLE_ENTRY_TCP_SOCKET: {
+ tcp_socket_t *socket = &(entry->tcp_socket);
+ switch (socket->state.tag) {
+ case TCP_SOCKET_STATE_CONNECTING:
+ case TCP_SOCKET_STATE_LISTENING: {
+ if ((pollfd->events &
+ (POLLRDNORM | POLLWRNORM)) != 0) {
+ states[state_index++] = (state_t){
+ .pollable =
+ socket->socket_pollable,
+ .pollfd = pollfd,
+ .entry = entry,
+ .events = pollfd->events
+ };
+ }
+ break;
+ }
+
+ case TCP_SOCKET_STATE_CONNECTED: {
+ if ((pollfd->events & POLLRDNORM) !=
+ 0) {
+ states[state_index++] = (state_t){
+ .pollable =
+ socket->state
+ .connected
+ .input_pollable,
+ .pollfd = pollfd,
+ .entry = entry,
+ .events = POLLRDNORM
+ };
+ }
+ if ((pollfd->events & POLLWRNORM) !=
+ 0) {
+ states[state_index++] = (state_t){
+ .pollable =
+ socket->state
+ .connected
+ .output_pollable,
+ .pollfd = pollfd,
+ .entry = entry,
+ .events = POLLWRNORM
+ };
+ }
+ break;
+ }
+
+ case TCP_SOCKET_STATE_CONNECT_FAILED: {
+ if (pollfd->revents == 0) {
+ ++event_count;
+ }
+ pollfd->revents |= pollfd->events;
+ break;
+ }
+
+ default:
+ errno = ENOTSUP;
+ return -1;
+ }
+ break;
+ }
+
+ case DESCRIPTOR_TABLE_ENTRY_UDP_SOCKET: {
+ udp_socket_t *socket = &(entry->udp_socket);
+ switch (socket->state.tag) {
+ case UDP_SOCKET_STATE_UNBOUND:
+ case UDP_SOCKET_STATE_BOUND_NOSTREAMS: {
+ if (pollfd->revents == 0) {
+ ++event_count;
+ }
+ pollfd->revents |= pollfd->events;
+ break;
+ }
+
+ case UDP_SOCKET_STATE_BOUND_STREAMING:
+ case UDP_SOCKET_STATE_CONNECTED: {
+ udp_socket_streams_t *streams;
+ if (socket->state.tag ==
+ UDP_SOCKET_STATE_BOUND_STREAMING) {
+ streams = &(
+ socket->state
+ .bound_streaming
+ .streams);
+ } else {
+ streams = &(
+ socket->state.connected
+ .streams);
+ }
+ if ((pollfd->events & POLLRDNORM) !=
+ 0) {
+ states[state_index++] = (state_t){
+ .pollable =
+ streams->incoming_pollable,
+ .pollfd = pollfd,
+ .entry = entry,
+ .events = POLLRDNORM
+ };
+ }
+ if ((pollfd->events & POLLWRNORM) !=
+ 0) {
+ states[state_index++] = (state_t){
+ .pollable =
+ streams->outgoing_pollable,
+ .pollfd = pollfd,
+ .entry = entry,
+ .events = POLLWRNORM
+ };
+ }
+ break;
+ }
+
+ default:
+ errno = ENOTSUP;
+ return -1;
+ }
+ break;
+ }
+
+ default:
+ errno = ENOTSUP;
+ return -1;
+ }
+ } else {
+ abort();
+ }
+ }
+
+ if (event_count > 0 && timeout != 0) {
+ return event_count;
+ }
+
+ poll_borrow_pollable_t pollables[state_index + 1];
+ for (size_t i = 0; i < state_index; ++i) {
+ pollables[i] = poll_borrow_pollable(states[i].pollable);
+ }
+
+ poll_own_pollable_t timeout_pollable;
+ size_t pollable_count = state_index;
+ if (timeout >= 0) {
+ timeout_pollable = monotonic_clock_subscribe_duration(
+ ((monotonic_clock_duration_t)timeout) * 1000000);
+ pollables[pollable_count++] =
+ poll_borrow_pollable(timeout_pollable);
+ }
+
+ wasip2_list_u32_t ready;
+ poll_list_borrow_pollable_t list = {
+ .ptr = (poll_borrow_pollable_t *)&pollables,
+ .len = pollable_count
+ };
+ poll_poll(&list, &ready);
+
+ for (size_t i = 0; i < ready.len; ++i) {
+ size_t index = ready.ptr[i];
+ if (index < state_index) {
+ state_t *state = &states[index];
+ if (state->entry->tag ==
+ DESCRIPTOR_TABLE_ENTRY_TCP_SOCKET &&
+ state->entry->tcp_socket.state.tag ==
+ TCP_SOCKET_STATE_CONNECTING) {
+ tcp_socket_t *socket =
+ &(state->entry->tcp_socket);
+ tcp_borrow_tcp_socket_t borrow =
+ tcp_borrow_tcp_socket(socket->socket);
+ tcp_tuple2_own_input_stream_own_output_stream_t
+ tuple;
+ tcp_error_code_t error;
+ if (tcp_method_tcp_socket_finish_connect(
+ borrow, &tuple, &error)) {
+ streams_borrow_input_stream_t
+ input_stream_borrow =
+ streams_borrow_input_stream(
+ tuple.f0);
+ streams_own_pollable_t input_pollable =
+ streams_method_input_stream_subscribe(
+ input_stream_borrow);
+ streams_borrow_output_stream_t
+ output_stream_borrow =
+ streams_borrow_output_stream(
+ tuple.f1);
+ streams_own_pollable_t output_pollable =
+ streams_method_output_stream_subscribe(
+ output_stream_borrow);
+ socket->state =
+ (tcp_socket_state_t){ .tag = TCP_SOCKET_STATE_CONNECTED,
+ .connected = {
+ .input_pollable =
+ input_pollable,
+ .input =
+ tuple.f0,
+ .output_pollable =
+ output_pollable,
+ .output =
+ tuple.f1,
+ } };
+ if (state->pollfd->revents == 0) {
+ ++event_count;
+ }
+ state->pollfd->revents |= state->events;
+ } else if (error ==
+ NETWORK_ERROR_CODE_WOULD_BLOCK) {
+ // No events yet -- application will need to poll again
+ } else {
+ socket->state =
+ (tcp_socket_state_t){ .tag = TCP_SOCKET_STATE_CONNECT_FAILED,
+ .connect_failed = {
+ .error_code =
+ error,
+ } };
+ if (state->pollfd->revents == 0) {
+ ++event_count;
+ }
+ state->pollfd->revents |= state->events;
+ }
+ } else {
+ if (state->pollfd->revents == 0) {
+ ++event_count;
+ }
+ state->pollfd->revents |= state->events;
+ }
+ }
+ }
+
+ wasip2_list_u32_free(&ready);
+
+ if (timeout >= 0) {
+ poll_pollable_drop_own(timeout_pollable);
+ }
+
+ return event_count;
+}