summaryrefslogtreecommitdiffstats
path: root/io_uring/io_uring.c
diff options
context:
space:
mode:
Diffstat (limited to 'io_uring/io_uring.c')
-rw-r--r--io_uring/io_uring.c159
1 files changed, 76 insertions, 83 deletions
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index dc0235ff47..8a216b1d6d 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -59,7 +59,6 @@
#include <linux/bvec.h>
#include <linux/net.h>
#include <net/sock.h>
-#include <net/af_unix.h>
#include <linux/anon_inodes.h>
#include <linux/sched/mm.h>
#include <linux/uaccess.h>
@@ -95,6 +94,7 @@
#include "notif.h"
#include "waitid.h"
#include "futex.h"
+#include "napi.h"
#include "timeout.h"
#include "poll.h"
@@ -122,11 +122,6 @@
#define IO_COMPL_BATCH 32
#define IO_REQ_ALLOC_BATCH 8
-enum {
- IO_CHECK_CQ_OVERFLOW_BIT,
- IO_CHECK_CQ_DROPPED_BIT,
-};
-
struct io_defer_entry {
struct list_head list;
struct io_kiocb *req;
@@ -350,6 +345,8 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
INIT_DELAYED_WORK(&ctx->fallback_work, io_fallback_req_func);
INIT_WQ_LIST(&ctx->submit_state.compl_reqs);
INIT_HLIST_HEAD(&ctx->cancelable_uring_cmd);
+ io_napi_init(ctx);
+
return ctx;
err:
kfree(ctx->cancel_table.hbs);
@@ -463,7 +460,6 @@ static void io_prep_async_work(struct io_kiocb *req)
req->work.list.next = NULL;
req->work.flags = 0;
- req->work.cancel_seq = atomic_read(&ctx->cancel_seq);
if (req->flags & REQ_F_FORCE_ASYNC)
req->work.flags |= IO_WQ_WORK_CONCURRENT;
@@ -670,7 +666,6 @@ static void io_cq_unlock_post(struct io_ring_ctx *ctx)
io_commit_cqring_flush(ctx);
}
-/* Returns true if there are no backlogged entries after the flush */
static void io_cqring_overflow_kill(struct io_ring_ctx *ctx)
{
struct io_overflow_cqe *ocqe;
@@ -1027,15 +1022,15 @@ static void __io_req_complete_post(struct io_kiocb *req, unsigned issue_flags)
void io_req_complete_post(struct io_kiocb *req, unsigned issue_flags)
{
- if (req->ctx->task_complete && req->ctx->submitter_task != current) {
+ struct io_ring_ctx *ctx = req->ctx;
+
+ if (ctx->task_complete && ctx->submitter_task != current) {
req->io_task_work.func = io_req_task_complete;
io_req_task_work_add(req);
} else if (!(issue_flags & IO_URING_F_UNLOCKED) ||
- !(req->ctx->flags & IORING_SETUP_IOPOLL)) {
+ !(ctx->flags & IORING_SETUP_IOPOLL)) {
__io_req_complete_post(req, issue_flags);
} else {
- struct io_ring_ctx *ctx = req->ctx;
-
mutex_lock(&ctx->uring_lock);
__io_req_complete_post(req, issue_flags & ~IO_URING_F_UNLOCKED);
mutex_unlock(&ctx->uring_lock);
@@ -1176,39 +1171,44 @@ static void ctx_flush_and_put(struct io_ring_ctx *ctx, struct io_tw_state *ts)
percpu_ref_put(&ctx->refs);
}
-static unsigned int handle_tw_list(struct llist_node *node,
- struct io_ring_ctx **ctx,
- struct io_tw_state *ts)
+/*
+ * Run queued task_work, returning the number of entries processed in *count.
+ * If more entries than max_entries are available, stop processing once this
+ * is reached and return the rest of the list.
+ */
+struct llist_node *io_handle_tw_list(struct llist_node *node,
+ unsigned int *count,
+ unsigned int max_entries)
{
- unsigned int count = 0;
+ struct io_ring_ctx *ctx = NULL;
+ struct io_tw_state ts = { };
do {
struct llist_node *next = node->next;
struct io_kiocb *req = container_of(node, struct io_kiocb,
io_task_work.node);
- prefetch(container_of(next, struct io_kiocb, io_task_work.node));
-
- if (req->ctx != *ctx) {
- ctx_flush_and_put(*ctx, ts);
- *ctx = req->ctx;
+ if (req->ctx != ctx) {
+ ctx_flush_and_put(ctx, &ts);
+ ctx = req->ctx;
/* if not contended, grab and improve batching */
- ts->locked = mutex_trylock(&(*ctx)->uring_lock);
- percpu_ref_get(&(*ctx)->refs);
+ ts.locked = mutex_trylock(&ctx->uring_lock);
+ percpu_ref_get(&ctx->refs);
}
INDIRECT_CALL_2(req->io_task_work.func,
io_poll_task_func, io_req_rw_complete,
- req, ts);
+ req, &ts);
node = next;
- count++;
+ (*count)++;
if (unlikely(need_resched())) {
- ctx_flush_and_put(*ctx, ts);
- *ctx = NULL;
+ ctx_flush_and_put(ctx, &ts);
+ ctx = NULL;
cond_resched();
}
- } while (node);
+ } while (node && *count < max_entries);
- return count;
+ ctx_flush_and_put(ctx, &ts);
+ return node;
}
/**
@@ -1253,31 +1253,41 @@ static __cold void io_fallback_tw(struct io_uring_task *tctx, bool sync)
}
}
-void tctx_task_work(struct callback_head *cb)
+struct llist_node *tctx_task_work_run(struct io_uring_task *tctx,
+ unsigned int max_entries,
+ unsigned int *count)
{
- struct io_tw_state ts = {};
- struct io_ring_ctx *ctx = NULL;
- struct io_uring_task *tctx = container_of(cb, struct io_uring_task,
- task_work);
struct llist_node *node;
- unsigned int count = 0;
if (unlikely(current->flags & PF_EXITING)) {
io_fallback_tw(tctx, true);
- return;
+ return NULL;
}
node = llist_del_all(&tctx->task_list);
- if (node)
- count = handle_tw_list(node, &ctx, &ts);
-
- ctx_flush_and_put(ctx, &ts);
+ if (node) {
+ node = llist_reverse_order(node);
+ node = io_handle_tw_list(node, count, max_entries);
+ }
/* relaxed read is enough as only the task itself sets ->in_cancel */
if (unlikely(atomic_read(&tctx->in_cancel)))
io_uring_drop_tctx_refs(current);
- trace_io_uring_task_work_run(tctx, count, 1);
+ trace_io_uring_task_work_run(tctx, *count);
+ return node;
+}
+
+void tctx_task_work(struct callback_head *cb)
+{
+ struct io_uring_task *tctx;
+ struct llist_node *ret;
+ unsigned int count = 0;
+
+ tctx = container_of(cb, struct io_uring_task, task_work);
+ ret = tctx_task_work_run(tctx, UINT_MAX, &count);
+ /* can't happen */
+ WARN_ON_ONCE(ret);
}
static inline void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
@@ -1360,6 +1370,15 @@ static void io_req_normal_work_add(struct io_kiocb *req)
if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
+ /* SQPOLL doesn't need the task_work added, it'll run it itself */
+ if (ctx->flags & IORING_SETUP_SQPOLL) {
+ struct io_sq_data *sqd = ctx->sq_data;
+
+ if (wq_has_sleeper(&sqd->wait))
+ wake_up(&sqd->wait);
+ return;
+ }
+
if (likely(!task_work_add(req->task, &tctx->task_work, ctx->notify_method)))
return;
@@ -1424,7 +1443,6 @@ again:
struct llist_node *next = node->next;
struct io_kiocb *req = container_of(node, struct io_kiocb,
io_task_work.node);
- prefetch(container_of(next, struct io_kiocb, io_task_work.node));
INDIRECT_CALL_2(req->io_task_work.func,
io_poll_task_func, io_req_rw_complete,
req, ts);
@@ -1754,9 +1772,9 @@ static void io_iopoll_req_issued(struct io_kiocb *req, unsigned int issue_flags)
}
}
-unsigned int io_file_get_flags(struct file *file)
+io_req_flags_t io_file_get_flags(struct file *file)
{
- unsigned int res = 0;
+ io_req_flags_t res = 0;
if (S_ISREG(file_inode(file)->i_mode))
res |= REQ_F_ISREG;
@@ -1962,7 +1980,7 @@ fail:
*/
if (req->flags & REQ_F_APOLL_MULTISHOT) {
err = -EBADFD;
- if (!file_can_poll(req->file))
+ if (!io_file_can_poll(req))
goto fail;
if (req->file->f_flags & O_NONBLOCK ||
req->file->f_mode & FMODE_NOWAIT) {
@@ -1978,7 +1996,7 @@ fail:
if (req->flags & REQ_F_FORCE_ASYNC) {
bool opcode_poll = def->pollin || def->pollout;
- if (opcode_poll && file_can_poll(req->file)) {
+ if (opcode_poll && io_file_can_poll(req)) {
needs_poll = true;
issue_flags |= IO_URING_F_NONBLOCK;
}
@@ -2187,11 +2205,13 @@ static int io_init_req(struct io_ring_ctx *ctx, struct io_kiocb *req,
/* req is partially pre-initialised, see io_preinit_req() */
req->opcode = opcode = READ_ONCE(sqe->opcode);
/* same numerical values with corresponding REQ_F_*, safe to copy */
- req->flags = sqe_flags = READ_ONCE(sqe->flags);
+ sqe_flags = READ_ONCE(sqe->flags);
+ req->flags = (io_req_flags_t) sqe_flags;
req->cqe.user_data = READ_ONCE(sqe->user_data);
req->file = NULL;
req->rsrc_node = NULL;
req->task = current;
+ req->cancel_seq_set = false;
if (unlikely(opcode >= IORING_OP_LAST)) {
req->opcode = 0;
@@ -2491,33 +2511,6 @@ int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr)
return ret;
}
-struct io_wait_queue {
- struct wait_queue_entry wq;
- struct io_ring_ctx *ctx;
- unsigned cq_tail;
- unsigned nr_timeouts;
- ktime_t timeout;
-};
-
-static inline bool io_has_work(struct io_ring_ctx *ctx)
-{
- return test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq) ||
- !llist_empty(&ctx->work_llist);
-}
-
-static inline bool io_should_wake(struct io_wait_queue *iowq)
-{
- struct io_ring_ctx *ctx = iowq->ctx;
- int dist = READ_ONCE(ctx->rings->cq.tail) - (int) iowq->cq_tail;
-
- /*
- * Wake up if we have enough events, or if a timeout occurred since we
- * started waiting. For timeouts, we always want to return to userspace,
- * regardless of event count.
- */
- return dist >= 0 || atomic_read(&ctx->cq_timeouts) != iowq->nr_timeouts;
-}
-
static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
int wake_flags, void *key)
{
@@ -2623,7 +2616,9 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
if (get_timespec64(&ts, uts))
return -EFAULT;
+
iowq.timeout = ktime_add_ns(timespec64_to_ktime(ts), ktime_get_ns());
+ io_napi_adjust_timeout(ctx, &iowq, &ts);
}
if (sig) {
@@ -2639,6 +2634,8 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
return ret;
}
+ io_napi_busy_loop(ctx, &iowq);
+
trace_io_uring_cqring_wait(ctx, min_events);
do {
int nr_wait = (int) iowq.cq_tail - READ_ONCE(ctx->rings->cq.tail);
@@ -2712,13 +2709,9 @@ void io_mem_free(void *ptr)
static void io_pages_free(struct page ***pages, int npages)
{
- struct page **page_array;
+ struct page **page_array = *pages;
int i;
- if (!pages)
- return;
-
- page_array = *pages;
if (!page_array)
return;
@@ -2936,6 +2929,7 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx)
io_req_caches_free(ctx);
if (ctx->hash_map)
io_wq_put_hash(ctx->hash_map);
+ io_napi_free(ctx);
kfree(ctx->cancel_table.hbs);
kfree(ctx->cancel_table_locked.hbs);
xa_destroy(&ctx->io_bl_xa);
@@ -4172,7 +4166,7 @@ static int __init io_uring_init(void)
BUILD_BUG_ON(SQE_COMMON_FLAGS >= (1 << 8));
BUILD_BUG_ON((SQE_VALID_FLAGS | SQE_COMMON_FLAGS) != SQE_VALID_FLAGS);
- BUILD_BUG_ON(__REQ_F_LAST_BIT > 8 * sizeof(int));
+ BUILD_BUG_ON(__REQ_F_LAST_BIT > 8 * sizeof_field(struct io_kiocb, flags));
BUILD_BUG_ON(sizeof(atomic_t) != sizeof(u32));
@@ -4194,9 +4188,8 @@ static int __init io_uring_init(void)
SLAB_ACCOUNT | SLAB_TYPESAFE_BY_RCU,
offsetof(struct io_kiocb, cmd.data),
sizeof_field(struct io_kiocb, cmd.data), NULL);
- io_buf_cachep = kmem_cache_create("io_buffer", sizeof(struct io_buffer), 0,
- SLAB_HWCACHE_ALIGN | SLAB_PANIC | SLAB_ACCOUNT,
- NULL);
+ io_buf_cachep = KMEM_CACHE(io_buffer,
+ SLAB_HWCACHE_ALIGN | SLAB_PANIC | SLAB_ACCOUNT);
iou_wq = alloc_workqueue("iou_exit", WQ_UNBOUND, 64);