summaryrefslogtreecommitdiffstats
path: root/fs/bcachefs/thread_with_file.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/bcachefs/thread_with_file.c')
-rw-r--r--fs/bcachefs/thread_with_file.c299
1 files changed, 299 insertions, 0 deletions
diff --git a/fs/bcachefs/thread_with_file.c b/fs/bcachefs/thread_with_file.c
new file mode 100644
index 0000000000..9220d7de10
--- /dev/null
+++ b/fs/bcachefs/thread_with_file.c
@@ -0,0 +1,299 @@
+// SPDX-License-Identifier: GPL-2.0
+#ifndef NO_BCACHEFS_FS
+
+#include "bcachefs.h"
+#include "printbuf.h"
+#include "thread_with_file.h"
+
+#include <linux/anon_inodes.h>
+#include <linux/file.h>
+#include <linux/kthread.h>
+#include <linux/pagemap.h>
+#include <linux/poll.h>
+
+void bch2_thread_with_file_exit(struct thread_with_file *thr)
+{
+ if (thr->task) {
+ kthread_stop(thr->task);
+ put_task_struct(thr->task);
+ }
+}
+
+int bch2_run_thread_with_file(struct thread_with_file *thr,
+ const struct file_operations *fops,
+ int (*fn)(void *))
+{
+ struct file *file = NULL;
+ int ret, fd = -1;
+ unsigned fd_flags = O_CLOEXEC;
+
+ if (fops->read && fops->write)
+ fd_flags |= O_RDWR;
+ else if (fops->read)
+ fd_flags |= O_RDONLY;
+ else if (fops->write)
+ fd_flags |= O_WRONLY;
+
+ char name[TASK_COMM_LEN];
+ get_task_comm(name, current);
+
+ thr->ret = 0;
+ thr->task = kthread_create(fn, thr, "%s", name);
+ ret = PTR_ERR_OR_ZERO(thr->task);
+ if (ret)
+ return ret;
+
+ ret = get_unused_fd_flags(fd_flags);
+ if (ret < 0)
+ goto err;
+ fd = ret;
+
+ file = anon_inode_getfile(name, fops, thr, fd_flags);
+ ret = PTR_ERR_OR_ZERO(file);
+ if (ret)
+ goto err;
+
+ get_task_struct(thr->task);
+ wake_up_process(thr->task);
+ fd_install(fd, file);
+ return fd;
+err:
+ if (fd >= 0)
+ put_unused_fd(fd);
+ if (thr->task)
+ kthread_stop(thr->task);
+ return ret;
+}
+
+static inline bool thread_with_stdio_has_output(struct thread_with_stdio *thr)
+{
+ return thr->stdio.output_buf.pos ||
+ thr->output2.nr ||
+ thr->thr.done;
+}
+
+static ssize_t thread_with_stdio_read(struct file *file, char __user *buf,
+ size_t len, loff_t *ppos)
+{
+ struct thread_with_stdio *thr =
+ container_of(file->private_data, struct thread_with_stdio, thr);
+ size_t copied = 0, b;
+ int ret = 0;
+
+ if ((file->f_flags & O_NONBLOCK) &&
+ !thread_with_stdio_has_output(thr))
+ return -EAGAIN;
+
+ ret = wait_event_interruptible(thr->stdio.output_wait,
+ thread_with_stdio_has_output(thr));
+ if (ret)
+ return ret;
+
+ if (thr->thr.done)
+ return 0;
+
+ while (len) {
+ ret = darray_make_room(&thr->output2, thr->stdio.output_buf.pos);
+ if (ret)
+ break;
+
+ spin_lock_irq(&thr->stdio.output_lock);
+ b = min_t(size_t, darray_room(thr->output2), thr->stdio.output_buf.pos);
+
+ memcpy(&darray_top(thr->output2), thr->stdio.output_buf.buf, b);
+ memmove(thr->stdio.output_buf.buf,
+ thr->stdio.output_buf.buf + b,
+ thr->stdio.output_buf.pos - b);
+
+ thr->output2.nr += b;
+ thr->stdio.output_buf.pos -= b;
+ spin_unlock_irq(&thr->stdio.output_lock);
+
+ b = min(len, thr->output2.nr);
+ if (!b)
+ break;
+
+ b -= copy_to_user(buf, thr->output2.data, b);
+ if (!b) {
+ ret = -EFAULT;
+ break;
+ }
+
+ copied += b;
+ buf += b;
+ len -= b;
+
+ memmove(thr->output2.data,
+ thr->output2.data + b,
+ thr->output2.nr - b);
+ thr->output2.nr -= b;
+ }
+
+ return copied ?: ret;
+}
+
+static int thread_with_stdio_release(struct inode *inode, struct file *file)
+{
+ struct thread_with_stdio *thr =
+ container_of(file->private_data, struct thread_with_stdio, thr);
+
+ bch2_thread_with_file_exit(&thr->thr);
+ printbuf_exit(&thr->stdio.input_buf);
+ printbuf_exit(&thr->stdio.output_buf);
+ darray_exit(&thr->output2);
+ thr->exit(thr);
+ return 0;
+}
+
+#define WRITE_BUFFER 4096
+
+static inline bool thread_with_stdio_has_input_space(struct thread_with_stdio *thr)
+{
+ return thr->stdio.input_buf.pos < WRITE_BUFFER || thr->thr.done;
+}
+
+static ssize_t thread_with_stdio_write(struct file *file, const char __user *ubuf,
+ size_t len, loff_t *ppos)
+{
+ struct thread_with_stdio *thr =
+ container_of(file->private_data, struct thread_with_stdio, thr);
+ struct printbuf *buf = &thr->stdio.input_buf;
+ size_t copied = 0;
+ ssize_t ret = 0;
+
+ while (len) {
+ if (thr->thr.done) {
+ ret = -EPIPE;
+ break;
+ }
+
+ size_t b = len - fault_in_readable(ubuf, len);
+ if (!b) {
+ ret = -EFAULT;
+ break;
+ }
+
+ spin_lock(&thr->stdio.input_lock);
+ if (buf->pos < WRITE_BUFFER)
+ bch2_printbuf_make_room(buf, min(b, WRITE_BUFFER - buf->pos));
+ b = min(len, printbuf_remaining_size(buf));
+
+ if (b && !copy_from_user_nofault(&buf->buf[buf->pos], ubuf, b)) {
+ ubuf += b;
+ len -= b;
+ copied += b;
+ buf->pos += b;
+ }
+ spin_unlock(&thr->stdio.input_lock);
+
+ if (b) {
+ wake_up(&thr->stdio.input_wait);
+ } else {
+ if ((file->f_flags & O_NONBLOCK)) {
+ ret = -EAGAIN;
+ break;
+ }
+
+ ret = wait_event_interruptible(thr->stdio.input_wait,
+ thread_with_stdio_has_input_space(thr));
+ if (ret)
+ break;
+ }
+ }
+
+ return copied ?: ret;
+}
+
+static __poll_t thread_with_stdio_poll(struct file *file, struct poll_table_struct *wait)
+{
+ struct thread_with_stdio *thr =
+ container_of(file->private_data, struct thread_with_stdio, thr);
+
+ poll_wait(file, &thr->stdio.output_wait, wait);
+ poll_wait(file, &thr->stdio.input_wait, wait);
+
+ __poll_t mask = 0;
+
+ if (thread_with_stdio_has_output(thr))
+ mask |= EPOLLIN;
+ if (thread_with_stdio_has_input_space(thr))
+ mask |= EPOLLOUT;
+ if (thr->thr.done)
+ mask |= EPOLLHUP|EPOLLERR;
+ return mask;
+}
+
+static const struct file_operations thread_with_stdio_fops = {
+ .release = thread_with_stdio_release,
+ .read = thread_with_stdio_read,
+ .write = thread_with_stdio_write,
+ .poll = thread_with_stdio_poll,
+ .llseek = no_llseek,
+};
+
+int bch2_run_thread_with_stdio(struct thread_with_stdio *thr,
+ void (*exit)(struct thread_with_stdio *),
+ int (*fn)(void *))
+{
+ thr->stdio.input_buf = PRINTBUF;
+ thr->stdio.input_buf.atomic++;
+ spin_lock_init(&thr->stdio.input_lock);
+ init_waitqueue_head(&thr->stdio.input_wait);
+
+ thr->stdio.output_buf = PRINTBUF;
+ thr->stdio.output_buf.atomic++;
+ spin_lock_init(&thr->stdio.output_lock);
+ init_waitqueue_head(&thr->stdio.output_wait);
+
+ darray_init(&thr->output2);
+ thr->exit = exit;
+
+ return bch2_run_thread_with_file(&thr->thr, &thread_with_stdio_fops, fn);
+}
+
+int bch2_stdio_redirect_read(struct stdio_redirect *stdio, char *buf, size_t len)
+{
+ wait_event(stdio->input_wait,
+ stdio->input_buf.pos || stdio->done);
+
+ if (stdio->done)
+ return -1;
+
+ spin_lock(&stdio->input_lock);
+ int ret = min(len, stdio->input_buf.pos);
+ stdio->input_buf.pos -= ret;
+ memcpy(buf, stdio->input_buf.buf, ret);
+ memmove(stdio->input_buf.buf,
+ stdio->input_buf.buf + ret,
+ stdio->input_buf.pos);
+ spin_unlock(&stdio->input_lock);
+
+ wake_up(&stdio->input_wait);
+ return ret;
+}
+
+int bch2_stdio_redirect_readline(struct stdio_redirect *stdio, char *buf, size_t len)
+{
+ wait_event(stdio->input_wait,
+ stdio->input_buf.pos || stdio->done);
+
+ if (stdio->done)
+ return -1;
+
+ spin_lock(&stdio->input_lock);
+ int ret = min(len, stdio->input_buf.pos);
+ char *n = memchr(stdio->input_buf.buf, '\n', ret);
+ if (n)
+ ret = min(ret, n + 1 - stdio->input_buf.buf);
+ stdio->input_buf.pos -= ret;
+ memcpy(buf, stdio->input_buf.buf, ret);
+ memmove(stdio->input_buf.buf,
+ stdio->input_buf.buf + ret,
+ stdio->input_buf.pos);
+ spin_unlock(&stdio->input_lock);
+
+ wake_up(&stdio->input_wait);
+ return ret;
+}
+
+#endif /* NO_BCACHEFS_FS */