summaryrefslogtreecommitdiffstats
path: root/io_uring/io-wq.c
diff options
context:
space:
mode:
Diffstat (limited to 'io_uring/io-wq.c')
-rw-r--r--io_uring/io-wq.c87
1 files changed, 46 insertions, 41 deletions
diff --git a/io_uring/io-wq.c b/io_uring/io-wq.c
index 522196dfb0..22dac58503 100644
--- a/io_uring/io-wq.c
+++ b/io_uring/io-wq.c
@@ -23,12 +23,13 @@
#include "io_uring.h"
#define WORKER_IDLE_TIMEOUT (5 * HZ)
+#define WORKER_INIT_LIMIT 3
enum {
- IO_WORKER_F_UP = 1, /* up and active */
- IO_WORKER_F_RUNNING = 2, /* account as running */
- IO_WORKER_F_FREE = 4, /* worker on free list */
- IO_WORKER_F_BOUND = 8, /* is doing bounded work */
+ IO_WORKER_F_UP = 0, /* up and active */
+ IO_WORKER_F_RUNNING = 1, /* account as running */
+ IO_WORKER_F_FREE = 2, /* worker on free list */
+ IO_WORKER_F_BOUND = 3, /* is doing bounded work */
};
enum {
@@ -44,21 +45,21 @@ enum {
*/
struct io_worker {
refcount_t ref;
- unsigned flags;
+ int create_index;
+ unsigned long flags;
struct hlist_nulls_node nulls_node;
struct list_head all_list;
struct task_struct *task;
struct io_wq *wq;
struct io_wq_work *cur_work;
- struct io_wq_work *next_work;
raw_spinlock_t lock;
struct completion ref_done;
unsigned long create_state;
struct callback_head create_work;
- int create_index;
+ int init_retries;
union {
struct rcu_head rcu;
@@ -165,7 +166,7 @@ static inline struct io_wq_acct *io_work_get_acct(struct io_wq *wq,
static inline struct io_wq_acct *io_wq_get_acct(struct io_worker *worker)
{
- return io_get_acct(worker->wq, worker->flags & IO_WORKER_F_BOUND);
+ return io_get_acct(worker->wq, test_bit(IO_WORKER_F_BOUND, &worker->flags));
}
static void io_worker_ref_put(struct io_wq *wq)
@@ -225,7 +226,7 @@ static void io_worker_exit(struct io_worker *worker)
wait_for_completion(&worker->ref_done);
raw_spin_lock(&wq->lock);
- if (worker->flags & IO_WORKER_F_FREE)
+ if (test_bit(IO_WORKER_F_FREE, &worker->flags))
hlist_nulls_del_rcu(&worker->nulls_node);
list_del_rcu(&worker->all_list);
raw_spin_unlock(&wq->lock);
@@ -410,7 +411,7 @@ static void io_wq_dec_running(struct io_worker *worker)
struct io_wq_acct *acct = io_wq_get_acct(worker);
struct io_wq *wq = worker->wq;
- if (!(worker->flags & IO_WORKER_F_UP))
+ if (!test_bit(IO_WORKER_F_UP, &worker->flags))
return;
if (!atomic_dec_and_test(&acct->nr_running))
@@ -430,8 +431,8 @@ static void io_wq_dec_running(struct io_worker *worker)
*/
static void __io_worker_busy(struct io_wq *wq, struct io_worker *worker)
{
- if (worker->flags & IO_WORKER_F_FREE) {
- worker->flags &= ~IO_WORKER_F_FREE;
+ if (test_bit(IO_WORKER_F_FREE, &worker->flags)) {
+ clear_bit(IO_WORKER_F_FREE, &worker->flags);
raw_spin_lock(&wq->lock);
hlist_nulls_del_init_rcu(&worker->nulls_node);
raw_spin_unlock(&wq->lock);
@@ -444,8 +445,8 @@ static void __io_worker_busy(struct io_wq *wq, struct io_worker *worker)
static void __io_worker_idle(struct io_wq *wq, struct io_worker *worker)
__must_hold(wq->lock)
{
- if (!(worker->flags & IO_WORKER_F_FREE)) {
- worker->flags |= IO_WORKER_F_FREE;
+ if (!test_bit(IO_WORKER_F_FREE, &worker->flags)) {
+ set_bit(IO_WORKER_F_FREE, &worker->flags);
hlist_nulls_add_head_rcu(&worker->nulls_node, &wq->free_list);
}
}
@@ -539,7 +540,6 @@ static void io_assign_current_work(struct io_worker *worker,
raw_spin_lock(&worker->lock);
worker->cur_work = work;
- worker->next_work = NULL;
raw_spin_unlock(&worker->lock);
}
@@ -564,10 +564,7 @@ static void io_worker_handle_work(struct io_wq_acct *acct,
* clear the stalled flag.
*/
work = io_get_next_work(acct, worker);
- raw_spin_unlock(&acct->lock);
if (work) {
- __io_worker_busy(wq, worker);
-
/*
* Make sure cancelation can find this, even before
* it becomes the active work. That avoids a window
@@ -576,11 +573,17 @@ static void io_worker_handle_work(struct io_wq_acct *acct,
* current work item for this worker.
*/
raw_spin_lock(&worker->lock);
- worker->next_work = work;
+ worker->cur_work = work;
raw_spin_unlock(&worker->lock);
- } else {
- break;
}
+
+ raw_spin_unlock(&acct->lock);
+
+ if (!work)
+ break;
+
+ __io_worker_busy(wq, worker);
+
io_assign_current_work(worker, work);
__set_current_state(TASK_RUNNING);
@@ -631,7 +634,8 @@ static int io_wq_worker(void *data)
bool exit_mask = false, last_timeout = false;
char buf[TASK_COMM_LEN];
- worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
+ set_mask_bits(&worker->flags, 0,
+ BIT(IO_WORKER_F_UP) | BIT(IO_WORKER_F_RUNNING));
snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid);
set_task_comm(current, buf);
@@ -695,11 +699,11 @@ void io_wq_worker_running(struct task_struct *tsk)
if (!worker)
return;
- if (!(worker->flags & IO_WORKER_F_UP))
+ if (!test_bit(IO_WORKER_F_UP, &worker->flags))
return;
- if (worker->flags & IO_WORKER_F_RUNNING)
+ if (test_bit(IO_WORKER_F_RUNNING, &worker->flags))
return;
- worker->flags |= IO_WORKER_F_RUNNING;
+ set_bit(IO_WORKER_F_RUNNING, &worker->flags);
io_wq_inc_running(worker);
}
@@ -713,12 +717,12 @@ void io_wq_worker_sleeping(struct task_struct *tsk)
if (!worker)
return;
- if (!(worker->flags & IO_WORKER_F_UP))
+ if (!test_bit(IO_WORKER_F_UP, &worker->flags))
return;
- if (!(worker->flags & IO_WORKER_F_RUNNING))
+ if (!test_bit(IO_WORKER_F_RUNNING, &worker->flags))
return;
- worker->flags &= ~IO_WORKER_F_RUNNING;
+ clear_bit(IO_WORKER_F_RUNNING, &worker->flags);
io_wq_dec_running(worker);
}
@@ -732,7 +736,7 @@ static void io_init_new_worker(struct io_wq *wq, struct io_worker *worker,
raw_spin_lock(&wq->lock);
hlist_nulls_add_head_rcu(&worker->nulls_node, &wq->free_list);
list_add_tail_rcu(&worker->all_list, &wq->all_list);
- worker->flags |= IO_WORKER_F_FREE;
+ set_bit(IO_WORKER_F_FREE, &worker->flags);
raw_spin_unlock(&wq->lock);
wake_up_new_task(tsk);
}
@@ -742,7 +746,7 @@ static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
return true;
}
-static inline bool io_should_retry_thread(long err)
+static inline bool io_should_retry_thread(struct io_worker *worker, long err)
{
/*
* Prevent perpetual task_work retry, if the task (or its group) is
@@ -750,6 +754,8 @@ static inline bool io_should_retry_thread(long err)
*/
if (fatal_signal_pending(current))
return false;
+ if (worker->init_retries++ >= WORKER_INIT_LIMIT)
+ return false;
switch (err) {
case -EAGAIN:
@@ -776,7 +782,7 @@ static void create_worker_cont(struct callback_head *cb)
io_init_new_worker(wq, worker, tsk);
io_worker_release(worker);
return;
- } else if (!io_should_retry_thread(PTR_ERR(tsk))) {
+ } else if (!io_should_retry_thread(worker, PTR_ERR(tsk))) {
struct io_wq_acct *acct = io_wq_get_acct(worker);
atomic_dec(&acct->nr_running);
@@ -838,12 +844,12 @@ fail:
init_completion(&worker->ref_done);
if (index == IO_WQ_ACCT_BOUND)
- worker->flags |= IO_WORKER_F_BOUND;
+ set_bit(IO_WORKER_F_BOUND, &worker->flags);
tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE);
if (!IS_ERR(tsk)) {
io_init_new_worker(wq, worker, tsk);
- } else if (!io_should_retry_thread(PTR_ERR(tsk))) {
+ } else if (!io_should_retry_thread(worker, PTR_ERR(tsk))) {
kfree(worker);
goto fail;
} else {
@@ -924,8 +930,12 @@ static bool io_wq_work_match_item(struct io_wq_work *work, void *data)
void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
struct io_wq_acct *acct = io_work_get_acct(wq, work);
- struct io_cb_cancel_data match;
- unsigned work_flags = work->flags;
+ unsigned long work_flags = work->flags;
+ struct io_cb_cancel_data match = {
+ .fn = io_wq_work_match_item,
+ .data = work,
+ .cancel_all = false,
+ };
bool do_create;
/*
@@ -963,10 +973,6 @@ void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
raw_spin_unlock(&wq->lock);
/* fatal condition, failed to create the first worker */
- match.fn = io_wq_work_match_item,
- match.data = work,
- match.cancel_all = false,
-
io_acct_cancel_pending_work(wq, acct, &match);
}
}
@@ -1005,8 +1011,7 @@ static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
* may dereference the passed in work.
*/
raw_spin_lock(&worker->lock);
- if (__io_wq_worker_cancel(worker, match, worker->cur_work) ||
- __io_wq_worker_cancel(worker, match, worker->next_work))
+ if (__io_wq_worker_cancel(worker, match, worker->cur_work))
match->nr_running++;
raw_spin_unlock(&worker->lock);