diff options
Diffstat (limited to '')
-rw-r--r-- | src/master/service-process-notify.c | 101 |
1 files changed, 101 insertions, 0 deletions
diff --git a/src/master/service-process-notify.c b/src/master/service-process-notify.c new file mode 100644 index 0000000..1cda677 --- /dev/null +++ b/src/master/service-process-notify.c @@ -0,0 +1,101 @@ +/* Copyright (c) 2009-2018 Dovecot authors, see the included COPYING file */ + +#include "lib.h" +#include "array.h" +#include "aqueue.h" +#include "ioloop.h" +#include "service.h" +#include "service-process.h" +#include "service-process-notify.h" + +struct service_process_notify { + service_process_notify_callback_t *write_callback; + + int fd; + struct io *io_write; + struct aqueue *process_queue; + ARRAY(struct service_process *) processes; +}; + +struct service_process_notify * +service_process_notify_init(int fd, + service_process_notify_callback_t *write_callback) +{ + struct service_process_notify *notify; + + notify = i_new(struct service_process_notify, 1); + notify->fd = fd; + notify->write_callback = write_callback; + + i_array_init(¬ify->processes, 64); + notify->process_queue = aqueue_init(¬ify->processes.arr); + return notify; +} + +static void service_process_notify_reset(struct service_process_notify *notify) +{ + struct service_process *const *processes, *process; + unsigned int i, count; + + if (notify->io_write == NULL) + return; + + processes = array_front_modifiable(¬ify->processes); + count = aqueue_count(notify->process_queue); + for (i = 0; i < count; i++) { + process = processes[aqueue_idx(notify->process_queue, i)]; + service_process_unref(process); + } + aqueue_clear(notify->process_queue); + array_clear(¬ify->processes); + + io_remove(¬ify->io_write); +} + +static void notify_flush(struct service_process_notify *notify) +{ + struct service_process *const *processes, *process; + + while (aqueue_count(notify->process_queue) > 0) { + processes = array_front_modifiable(¬ify->processes); + process = processes[aqueue_idx(notify->process_queue, 0)]; + + if (notify->write_callback(notify->fd, process) < 0) { + if (errno != EAGAIN) + service_process_notify_reset(notify); + return; + } + service_process_unref(process); + aqueue_delete_tail(notify->process_queue); + } + io_remove(¬ify->io_write); +} + +void service_process_notify_deinit(struct service_process_notify **_notify) +{ + struct service_process_notify *notify = *_notify; + + *_notify = NULL; + + service_process_notify_reset(notify); + io_remove(¬ify->io_write); + aqueue_deinit(¬ify->process_queue); + array_free(¬ify->processes); + i_free(notify); +} + +void service_process_notify_add(struct service_process_notify *notify, + struct service_process *process) +{ + if (notify->write_callback(notify->fd, process) < 0) { + if (errno != EAGAIN) + return; + + if (notify->io_write == NULL) { + notify->io_write = io_add(notify->fd, IO_WRITE, + notify_flush, notify); + } + aqueue_append(notify->process_queue, &process); + service_process_ref(process); + } +} |