diff options
Diffstat (limited to 'io_uring/io_uring.c')
-rw-r--r-- | io_uring/io_uring.c | 159 |
1 files changed, 76 insertions, 83 deletions
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c index dc0235ff47..8a216b1d6d 100644 --- a/io_uring/io_uring.c +++ b/io_uring/io_uring.c @@ -59,7 +59,6 @@ #include <linux/bvec.h> #include <linux/net.h> #include <net/sock.h> -#include <net/af_unix.h> #include <linux/anon_inodes.h> #include <linux/sched/mm.h> #include <linux/uaccess.h> @@ -95,6 +94,7 @@ #include "notif.h" #include "waitid.h" #include "futex.h" +#include "napi.h" #include "timeout.h" #include "poll.h" @@ -122,11 +122,6 @@ #define IO_COMPL_BATCH 32 #define IO_REQ_ALLOC_BATCH 8 -enum { - IO_CHECK_CQ_OVERFLOW_BIT, - IO_CHECK_CQ_DROPPED_BIT, -}; - struct io_defer_entry { struct list_head list; struct io_kiocb *req; @@ -350,6 +345,8 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p) INIT_DELAYED_WORK(&ctx->fallback_work, io_fallback_req_func); INIT_WQ_LIST(&ctx->submit_state.compl_reqs); INIT_HLIST_HEAD(&ctx->cancelable_uring_cmd); + io_napi_init(ctx); + return ctx; err: kfree(ctx->cancel_table.hbs); @@ -463,7 +460,6 @@ static void io_prep_async_work(struct io_kiocb *req) req->work.list.next = NULL; req->work.flags = 0; - req->work.cancel_seq = atomic_read(&ctx->cancel_seq); if (req->flags & REQ_F_FORCE_ASYNC) req->work.flags |= IO_WQ_WORK_CONCURRENT; @@ -670,7 +666,6 @@ static void io_cq_unlock_post(struct io_ring_ctx *ctx) io_commit_cqring_flush(ctx); } -/* Returns true if there are no backlogged entries after the flush */ static void io_cqring_overflow_kill(struct io_ring_ctx *ctx) { struct io_overflow_cqe *ocqe; @@ -1027,15 +1022,15 @@ static void __io_req_complete_post(struct io_kiocb *req, unsigned issue_flags) void io_req_complete_post(struct io_kiocb *req, unsigned issue_flags) { - if (req->ctx->task_complete && req->ctx->submitter_task != current) { + struct io_ring_ctx *ctx = req->ctx; + + if (ctx->task_complete && ctx->submitter_task != current) { req->io_task_work.func = io_req_task_complete; io_req_task_work_add(req); } else if (!(issue_flags & IO_URING_F_UNLOCKED) || - !(req->ctx->flags & IORING_SETUP_IOPOLL)) { + !(ctx->flags & IORING_SETUP_IOPOLL)) { __io_req_complete_post(req, issue_flags); } else { - struct io_ring_ctx *ctx = req->ctx; - mutex_lock(&ctx->uring_lock); __io_req_complete_post(req, issue_flags & ~IO_URING_F_UNLOCKED); mutex_unlock(&ctx->uring_lock); @@ -1176,39 +1171,44 @@ static void ctx_flush_and_put(struct io_ring_ctx *ctx, struct io_tw_state *ts) percpu_ref_put(&ctx->refs); } -static unsigned int handle_tw_list(struct llist_node *node, - struct io_ring_ctx **ctx, - struct io_tw_state *ts) +/* + * Run queued task_work, returning the number of entries processed in *count. + * If more entries than max_entries are available, stop processing once this + * is reached and return the rest of the list. + */ +struct llist_node *io_handle_tw_list(struct llist_node *node, + unsigned int *count, + unsigned int max_entries) { - unsigned int count = 0; + struct io_ring_ctx *ctx = NULL; + struct io_tw_state ts = { }; do { struct llist_node *next = node->next; struct io_kiocb *req = container_of(node, struct io_kiocb, io_task_work.node); - prefetch(container_of(next, struct io_kiocb, io_task_work.node)); - - if (req->ctx != *ctx) { - ctx_flush_and_put(*ctx, ts); - *ctx = req->ctx; + if (req->ctx != ctx) { + ctx_flush_and_put(ctx, &ts); + ctx = req->ctx; /* if not contended, grab and improve batching */ - ts->locked = mutex_trylock(&(*ctx)->uring_lock); - percpu_ref_get(&(*ctx)->refs); + ts.locked = mutex_trylock(&ctx->uring_lock); + percpu_ref_get(&ctx->refs); } INDIRECT_CALL_2(req->io_task_work.func, io_poll_task_func, io_req_rw_complete, - req, ts); + req, &ts); node = next; - count++; + (*count)++; if (unlikely(need_resched())) { - ctx_flush_and_put(*ctx, ts); - *ctx = NULL; + ctx_flush_and_put(ctx, &ts); + ctx = NULL; cond_resched(); } - } while (node); + } while (node && *count < max_entries); - return count; + ctx_flush_and_put(ctx, &ts); + return node; } /** @@ -1253,31 +1253,41 @@ static __cold void io_fallback_tw(struct io_uring_task *tctx, bool sync) } } -void tctx_task_work(struct callback_head *cb) +struct llist_node *tctx_task_work_run(struct io_uring_task *tctx, + unsigned int max_entries, + unsigned int *count) { - struct io_tw_state ts = {}; - struct io_ring_ctx *ctx = NULL; - struct io_uring_task *tctx = container_of(cb, struct io_uring_task, - task_work); struct llist_node *node; - unsigned int count = 0; if (unlikely(current->flags & PF_EXITING)) { io_fallback_tw(tctx, true); - return; + return NULL; } node = llist_del_all(&tctx->task_list); - if (node) - count = handle_tw_list(node, &ctx, &ts); - - ctx_flush_and_put(ctx, &ts); + if (node) { + node = llist_reverse_order(node); + node = io_handle_tw_list(node, count, max_entries); + } /* relaxed read is enough as only the task itself sets ->in_cancel */ if (unlikely(atomic_read(&tctx->in_cancel))) io_uring_drop_tctx_refs(current); - trace_io_uring_task_work_run(tctx, count, 1); + trace_io_uring_task_work_run(tctx, *count); + return node; +} + +void tctx_task_work(struct callback_head *cb) +{ + struct io_uring_task *tctx; + struct llist_node *ret; + unsigned int count = 0; + + tctx = container_of(cb, struct io_uring_task, task_work); + ret = tctx_task_work_run(tctx, UINT_MAX, &count); + /* can't happen */ + WARN_ON_ONCE(ret); } static inline void io_req_local_work_add(struct io_kiocb *req, unsigned flags) @@ -1360,6 +1370,15 @@ static void io_req_normal_work_add(struct io_kiocb *req) if (ctx->flags & IORING_SETUP_TASKRUN_FLAG) atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags); + /* SQPOLL doesn't need the task_work added, it'll run it itself */ + if (ctx->flags & IORING_SETUP_SQPOLL) { + struct io_sq_data *sqd = ctx->sq_data; + + if (wq_has_sleeper(&sqd->wait)) + wake_up(&sqd->wait); + return; + } + if (likely(!task_work_add(req->task, &tctx->task_work, ctx->notify_method))) return; @@ -1424,7 +1443,6 @@ again: struct llist_node *next = node->next; struct io_kiocb *req = container_of(node, struct io_kiocb, io_task_work.node); - prefetch(container_of(next, struct io_kiocb, io_task_work.node)); INDIRECT_CALL_2(req->io_task_work.func, io_poll_task_func, io_req_rw_complete, req, ts); @@ -1754,9 +1772,9 @@ static void io_iopoll_req_issued(struct io_kiocb *req, unsigned int issue_flags) } } -unsigned int io_file_get_flags(struct file *file) +io_req_flags_t io_file_get_flags(struct file *file) { - unsigned int res = 0; + io_req_flags_t res = 0; if (S_ISREG(file_inode(file)->i_mode)) res |= REQ_F_ISREG; @@ -1962,7 +1980,7 @@ fail: */ if (req->flags & REQ_F_APOLL_MULTISHOT) { err = -EBADFD; - if (!file_can_poll(req->file)) + if (!io_file_can_poll(req)) goto fail; if (req->file->f_flags & O_NONBLOCK || req->file->f_mode & FMODE_NOWAIT) { @@ -1978,7 +1996,7 @@ fail: if (req->flags & REQ_F_FORCE_ASYNC) { bool opcode_poll = def->pollin || def->pollout; - if (opcode_poll && file_can_poll(req->file)) { + if (opcode_poll && io_file_can_poll(req)) { needs_poll = true; issue_flags |= IO_URING_F_NONBLOCK; } @@ -2187,11 +2205,13 @@ static int io_init_req(struct io_ring_ctx *ctx, struct io_kiocb *req, /* req is partially pre-initialised, see io_preinit_req() */ req->opcode = opcode = READ_ONCE(sqe->opcode); /* same numerical values with corresponding REQ_F_*, safe to copy */ - req->flags = sqe_flags = READ_ONCE(sqe->flags); + sqe_flags = READ_ONCE(sqe->flags); + req->flags = (io_req_flags_t) sqe_flags; req->cqe.user_data = READ_ONCE(sqe->user_data); req->file = NULL; req->rsrc_node = NULL; req->task = current; + req->cancel_seq_set = false; if (unlikely(opcode >= IORING_OP_LAST)) { req->opcode = 0; @@ -2491,33 +2511,6 @@ int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr) return ret; } -struct io_wait_queue { - struct wait_queue_entry wq; - struct io_ring_ctx *ctx; - unsigned cq_tail; - unsigned nr_timeouts; - ktime_t timeout; -}; - -static inline bool io_has_work(struct io_ring_ctx *ctx) -{ - return test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq) || - !llist_empty(&ctx->work_llist); -} - -static inline bool io_should_wake(struct io_wait_queue *iowq) -{ - struct io_ring_ctx *ctx = iowq->ctx; - int dist = READ_ONCE(ctx->rings->cq.tail) - (int) iowq->cq_tail; - - /* - * Wake up if we have enough events, or if a timeout occurred since we - * started waiting. For timeouts, we always want to return to userspace, - * regardless of event count. - */ - return dist >= 0 || atomic_read(&ctx->cq_timeouts) != iowq->nr_timeouts; -} - static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode, int wake_flags, void *key) { @@ -2623,7 +2616,9 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, if (get_timespec64(&ts, uts)) return -EFAULT; + iowq.timeout = ktime_add_ns(timespec64_to_ktime(ts), ktime_get_ns()); + io_napi_adjust_timeout(ctx, &iowq, &ts); } if (sig) { @@ -2639,6 +2634,8 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, return ret; } + io_napi_busy_loop(ctx, &iowq); + trace_io_uring_cqring_wait(ctx, min_events); do { int nr_wait = (int) iowq.cq_tail - READ_ONCE(ctx->rings->cq.tail); @@ -2712,13 +2709,9 @@ void io_mem_free(void *ptr) static void io_pages_free(struct page ***pages, int npages) { - struct page **page_array; + struct page **page_array = *pages; int i; - if (!pages) - return; - - page_array = *pages; if (!page_array) return; @@ -2936,6 +2929,7 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx) io_req_caches_free(ctx); if (ctx->hash_map) io_wq_put_hash(ctx->hash_map); + io_napi_free(ctx); kfree(ctx->cancel_table.hbs); kfree(ctx->cancel_table_locked.hbs); xa_destroy(&ctx->io_bl_xa); @@ -4172,7 +4166,7 @@ static int __init io_uring_init(void) BUILD_BUG_ON(SQE_COMMON_FLAGS >= (1 << 8)); BUILD_BUG_ON((SQE_VALID_FLAGS | SQE_COMMON_FLAGS) != SQE_VALID_FLAGS); - BUILD_BUG_ON(__REQ_F_LAST_BIT > 8 * sizeof(int)); + BUILD_BUG_ON(__REQ_F_LAST_BIT > 8 * sizeof_field(struct io_kiocb, flags)); BUILD_BUG_ON(sizeof(atomic_t) != sizeof(u32)); @@ -4194,9 +4188,8 @@ static int __init io_uring_init(void) SLAB_ACCOUNT | SLAB_TYPESAFE_BY_RCU, offsetof(struct io_kiocb, cmd.data), sizeof_field(struct io_kiocb, cmd.data), NULL); - io_buf_cachep = kmem_cache_create("io_buffer", sizeof(struct io_buffer), 0, - SLAB_HWCACHE_ALIGN | SLAB_PANIC | SLAB_ACCOUNT, - NULL); + io_buf_cachep = KMEM_CACHE(io_buffer, + SLAB_HWCACHE_ALIGN | SLAB_PANIC | SLAB_ACCOUNT); iou_wq = alloc_workqueue("iou_exit", WQ_UNBOUND, 64); |