diff options
Diffstat (limited to 'fs/bcachefs/journal.c')
-rw-r--r-- | fs/bcachefs/journal.c | 286 |
1 files changed, 172 insertions, 114 deletions
diff --git a/fs/bcachefs/journal.c b/fs/bcachefs/journal.c index bc890776eb..a8b08e76d0 100644 --- a/fs/bcachefs/journal.c +++ b/fs/bcachefs/journal.c @@ -27,33 +27,71 @@ static const char * const bch2_journal_errors[] = { NULL }; +static inline bool journal_seq_unwritten(struct journal *j, u64 seq) +{ + return seq > j->seq_ondisk; +} + +static bool __journal_entry_is_open(union journal_res_state state) +{ + return state.cur_entry_offset < JOURNAL_ENTRY_CLOSED_VAL; +} + +static inline unsigned nr_unwritten_journal_entries(struct journal *j) +{ + return atomic64_read(&j->seq) - j->seq_ondisk; +} + +static bool journal_entry_is_open(struct journal *j) +{ + return __journal_entry_is_open(j->reservations); +} + static void bch2_journal_buf_to_text(struct printbuf *out, struct journal *j, u64 seq) { union journal_res_state s = READ_ONCE(j->reservations); unsigned i = seq & JOURNAL_BUF_MASK; struct journal_buf *buf = j->buf + i; - prt_printf(out, "seq:"); + prt_str(out, "seq:"); prt_tab(out); prt_printf(out, "%llu", seq); prt_newline(out); printbuf_indent_add(out, 2); - prt_printf(out, "refcount:"); + prt_str(out, "refcount:"); prt_tab(out); prt_printf(out, "%u", journal_state_count(s, i)); prt_newline(out); - prt_printf(out, "size:"); + prt_str(out, "size:"); prt_tab(out); prt_human_readable_u64(out, vstruct_bytes(buf->data)); prt_newline(out); - prt_printf(out, "expires"); + prt_str(out, "expires:"); prt_tab(out); prt_printf(out, "%li jiffies", buf->expires - jiffies); prt_newline(out); + prt_str(out, "flags:"); + prt_tab(out); + if (buf->noflush) + prt_str(out, "noflush "); + if (buf->must_flush) + prt_str(out, "must_flush "); + if (buf->separate_flush) + prt_str(out, "separate_flush "); + if (buf->need_flush_to_write_buffer) + prt_str(out, "need_flush_to_write_buffer "); + if (buf->write_started) + prt_str(out, "write_started "); + if (buf->write_allocated) + prt_str(out, "write allocated "); + if (buf->write_done) + prt_str(out, "write done"); + prt_newline(out); + printbuf_indent_sub(out, 2); } @@ -66,26 +104,7 @@ static void bch2_journal_bufs_to_text(struct printbuf *out, struct journal *j) seq <= journal_cur_seq(j); seq++) bch2_journal_buf_to_text(out, j, seq); -} - -static inline bool journal_seq_unwritten(struct journal *j, u64 seq) -{ - return seq > j->seq_ondisk; -} - -static bool __journal_entry_is_open(union journal_res_state state) -{ - return state.cur_entry_offset < JOURNAL_ENTRY_CLOSED_VAL; -} - -static inline unsigned nr_unwritten_journal_entries(struct journal *j) -{ - return atomic64_read(&j->seq) - j->seq_ondisk; -} - -static bool journal_entry_is_open(struct journal *j) -{ - return __journal_entry_is_open(j->reservations); + prt_printf(out, "last buf %s\n", journal_entry_is_open(j) ? "open" : "closed"); } static inline struct journal_buf * @@ -174,21 +193,40 @@ journal_error_check_stuck(struct journal *j, int error, unsigned flags) return stuck; } +void bch2_journal_do_writes(struct journal *j) +{ + for (u64 seq = journal_last_unwritten_seq(j); + seq <= journal_cur_seq(j); + seq++) { + unsigned idx = seq & JOURNAL_BUF_MASK; + struct journal_buf *w = j->buf + idx; + + if (w->write_started && !w->write_allocated) + break; + if (w->write_started) + continue; + + if (!journal_state_count(j->reservations, idx)) { + w->write_started = true; + closure_call(&w->io, bch2_journal_write, j->wq, NULL); + } + + break; + } +} + /* * Final processing when the last reference of a journal buffer has been * dropped. Drop the pin list reference acquired at journal entry open and write * the buffer, if requested. */ -void bch2_journal_buf_put_final(struct journal *j, u64 seq, bool write) +void bch2_journal_buf_put_final(struct journal *j, u64 seq) { - struct bch_fs *c = container_of(j, struct bch_fs, journal); - lockdep_assert_held(&j->lock); if (__bch2_journal_pin_put(j, seq)) bch2_journal_reclaim_fast(j); - if (write) - closure_call(&j->io, bch2_journal_write, c->io_complete_wq, NULL); + bch2_journal_do_writes(j); } /* @@ -380,11 +418,14 @@ static int journal_entry_open(struct journal *j) BUG_ON(j->buf + (journal_cur_seq(j) & JOURNAL_BUF_MASK) != buf); bkey_extent_init(&buf->key); - buf->noflush = false; - buf->must_flush = false; - buf->separate_flush = false; - buf->flush_time = 0; + buf->noflush = false; + buf->must_flush = false; + buf->separate_flush = false; + buf->flush_time = 0; buf->need_flush_to_write_buffer = true; + buf->write_started = false; + buf->write_allocated = false; + buf->write_done = false; memset(buf->data, 0, sizeof(*buf->data)); buf->data->seq = cpu_to_le64(journal_cur_seq(j)); @@ -418,9 +459,10 @@ static int journal_entry_open(struct journal *j) } while ((v = atomic64_cmpxchg(&j->reservations.counter, old.v, new.v)) != old.v); - mod_delayed_work(c->io_complete_wq, - &j->write_work, - msecs_to_jiffies(c->opts.journal_flush_delay)); + if (nr_unwritten_journal_entries(j) == 1) + mod_delayed_work(j->wq, + &j->write_work, + msecs_to_jiffies(c->opts.journal_flush_delay)); journal_wake(j); if (j->early_journal_entries.nr) @@ -445,20 +487,16 @@ static void journal_quiesce(struct journal *j) static void journal_write_work(struct work_struct *work) { struct journal *j = container_of(work, struct journal, write_work.work); - struct bch_fs *c = container_of(j, struct bch_fs, journal); - long delta; spin_lock(&j->lock); - if (!__journal_entry_is_open(j->reservations)) - goto unlock; - - delta = journal_cur_buf(j)->expires - jiffies; + if (__journal_entry_is_open(j->reservations)) { + long delta = journal_cur_buf(j)->expires - jiffies; - if (delta > 0) - mod_delayed_work(c->io_complete_wq, &j->write_work, delta); - else - __journal_entry_close(j, JOURNAL_ENTRY_CLOSED_VAL, true); -unlock: + if (delta > 0) + mod_delayed_work(j->wq, &j->write_work, delta); + else + __journal_entry_close(j, JOURNAL_ENTRY_CLOSED_VAL, true); + } spin_unlock(&j->lock); } @@ -476,30 +514,29 @@ retry: if (bch2_journal_error(j)) return -BCH_ERR_erofs_journal_err; - spin_lock(&j->lock); + if (j->blocked) + return -BCH_ERR_journal_res_get_blocked; - /* check once more in case somebody else shut things down... */ - if (bch2_journal_error(j)) { - spin_unlock(&j->lock); - return -BCH_ERR_erofs_journal_err; + if ((flags & BCH_WATERMARK_MASK) < j->watermark) { + ret = JOURNAL_ERR_journal_full; + can_discard = j->can_discard; + goto out; + } + + if (nr_unwritten_journal_entries(j) == ARRAY_SIZE(j->buf) && !journal_entry_is_open(j)) { + ret = JOURNAL_ERR_max_in_flight; + goto out; } + spin_lock(&j->lock); + /* * Recheck after taking the lock, so we don't race with another thread * that just did journal_entry_open() and call bch2_journal_entry_close() * unnecessarily */ if (journal_res_get_fast(j, res, flags)) { - spin_unlock(&j->lock); - return 0; - } - - if ((flags & BCH_WATERMARK_MASK) < j->watermark) { - /* - * Don't want to close current journal entry, just need to - * invoke reclaim: - */ - ret = JOURNAL_ERR_journal_full; + ret = 0; goto unlock; } @@ -515,30 +552,30 @@ retry: j->buf_size_want = max(j->buf_size_want, buf->buf_size << 1); __journal_entry_close(j, JOURNAL_ENTRY_CLOSED_VAL, false); - ret = journal_entry_open(j); - - if (ret == JOURNAL_ERR_max_in_flight) { - track_event_change(&c->times[BCH_TIME_blocked_journal_max_in_flight], - &j->max_in_flight_start, true); - if (trace_journal_entry_full_enabled()) { - struct printbuf buf = PRINTBUF; - buf.atomic++; - - bch2_journal_bufs_to_text(&buf, j); - trace_journal_entry_full(c, buf.buf); - printbuf_exit(&buf); - } - count_event(c, journal_entry_full); - } + ret = journal_entry_open(j) ?: JOURNAL_ERR_retry; unlock: can_discard = j->can_discard; spin_unlock(&j->lock); - - if (!ret) +out: + if (ret == JOURNAL_ERR_retry) goto retry; + if (!ret) + return 0; + if (journal_error_check_stuck(j, ret, flags)) ret = -BCH_ERR_journal_res_get_blocked; + if (ret == JOURNAL_ERR_max_in_flight && + track_event_change(&c->times[BCH_TIME_blocked_journal_max_in_flight], true)) { + + struct printbuf buf = PRINTBUF; + prt_printf(&buf, "seq %llu\n", journal_cur_seq(j)); + bch2_journal_bufs_to_text(&buf, j); + trace_journal_entry_full(c, buf.buf); + printbuf_exit(&buf); + count_event(c, journal_entry_full); + } + /* * Journal is full - can't rely on reclaim from work item due to * freezing: @@ -669,12 +706,18 @@ recheck_need_open: spin_unlock(&j->lock); + /* + * We're called from bch2_journal_flush_seq() -> wait_event(); + * but this might block. We won't usually block, so we won't + * livelock: + */ + sched_annotate_sleep(); ret = bch2_journal_res_get(j, &res, jset_u64s(0), 0); if (ret) return ret; seq = res.seq; - buf = j->buf + (seq & JOURNAL_BUF_MASK); + buf = journal_seq_to_buf(j, seq); buf->must_flush = true; if (!buf->flush_time) { @@ -692,8 +735,8 @@ recheck_need_open: } /* - * if write was kicked off without a flush, flush the next sequence - * number instead + * if write was kicked off without a flush, or if we promised it + * wouldn't be a flush, flush the next sequence number instead */ buf = journal_seq_to_buf(j, seq); if (buf->noflush) { @@ -771,8 +814,8 @@ bool bch2_journal_noflush_seq(struct journal *j, u64 seq) unwritten_seq++) { struct journal_buf *buf = journal_seq_to_buf(j, unwritten_seq); - /* journal write is already in flight, and was a flush write: */ - if (unwritten_seq == journal_last_unwritten_seq(j) && !buf->noflush) + /* journal flush already in flight, or flush requseted */ + if (buf->must_flush) goto out; buf->noflush = true; @@ -833,6 +876,8 @@ static struct journal_buf *__bch2_next_write_buffer_flush_journal_buf(struct jou { struct journal_buf *ret = NULL; + /* We're inside wait_event(), but using mutex_lock(: */ + sched_annotate_sleep(); mutex_lock(&j->buf_lock); spin_lock(&j->lock); max_seq = min(max_seq, journal_cur_seq(j)); @@ -1157,13 +1202,12 @@ int bch2_fs_journal_start(struct journal *j, u64 cur_seq) struct journal_replay *i, **_i; struct genradix_iter iter; bool had_entries = false; - unsigned ptr; u64 last_seq = cur_seq, nr, seq; genradix_for_each_reverse(&c->journal_entries, iter, _i) { i = *_i; - if (!i || i->ignore) + if (journal_replay_ignore(i)) continue; last_seq = le64_to_cpu(i->j.last_seq); @@ -1196,7 +1240,7 @@ int bch2_fs_journal_start(struct journal *j, u64 cur_seq) genradix_for_each(&c->journal_entries, iter, _i) { i = *_i; - if (!i || i->ignore) + if (journal_replay_ignore(i)) continue; seq = le64_to_cpu(i->j.seq); @@ -1211,8 +1255,8 @@ int bch2_fs_journal_start(struct journal *j, u64 cur_seq) p = journal_seq_pin(j, seq); p->devs.nr = 0; - for (ptr = 0; ptr < i->nr_ptrs; ptr++) - bch2_dev_list_add_dev(&p->devs, i->ptrs[ptr].dev); + darray_for_each(i->ptrs, ptr) + bch2_dev_list_add_dev(&p->devs, ptr->dev); had_entries = true; } @@ -1240,13 +1284,17 @@ int bch2_fs_journal_start(struct journal *j, u64 cur_seq) void bch2_dev_journal_exit(struct bch_dev *ca) { - kfree(ca->journal.bio); - kfree(ca->journal.buckets); - kfree(ca->journal.bucket_seq); + struct journal_device *ja = &ca->journal; + + for (unsigned i = 0; i < ARRAY_SIZE(ja->bio); i++) { + kfree(ja->bio[i]); + ja->bio[i] = NULL; + } - ca->journal.bio = NULL; - ca->journal.buckets = NULL; - ca->journal.bucket_seq = NULL; + kfree(ja->buckets); + kfree(ja->bucket_seq); + ja->buckets = NULL; + ja->bucket_seq = NULL; } int bch2_dev_journal_init(struct bch_dev *ca, struct bch_sb *sb) @@ -1256,14 +1304,13 @@ int bch2_dev_journal_init(struct bch_dev *ca, struct bch_sb *sb) bch2_sb_field_get(sb, journal); struct bch_sb_field_journal_v2 *journal_buckets_v2 = bch2_sb_field_get(sb, journal_v2); - unsigned i, nr_bvecs; ja->nr = 0; if (journal_buckets_v2) { unsigned nr = bch2_sb_field_journal_v2_nr_entries(journal_buckets_v2); - for (i = 0; i < nr; i++) + for (unsigned i = 0; i < nr; i++) ja->nr += le64_to_cpu(journal_buckets_v2->d[i].nr); } else if (journal_buckets) { ja->nr = bch2_nr_journal_buckets(journal_buckets); @@ -1273,13 +1320,18 @@ int bch2_dev_journal_init(struct bch_dev *ca, struct bch_sb *sb) if (!ja->bucket_seq) return -BCH_ERR_ENOMEM_dev_journal_init; - nr_bvecs = DIV_ROUND_UP(JOURNAL_ENTRY_SIZE_MAX, PAGE_SIZE); + unsigned nr_bvecs = DIV_ROUND_UP(JOURNAL_ENTRY_SIZE_MAX, PAGE_SIZE); - ca->journal.bio = bio_kmalloc(nr_bvecs, GFP_KERNEL); - if (!ca->journal.bio) - return -BCH_ERR_ENOMEM_dev_journal_init; + for (unsigned i = 0; i < ARRAY_SIZE(ja->bio); i++) { + ja->bio[i] = kmalloc(struct_size(ja->bio[i], bio.bi_inline_vecs, + nr_bvecs), GFP_KERNEL); + if (!ja->bio[i]) + return -BCH_ERR_ENOMEM_dev_journal_init; - bio_init(ca->journal.bio, NULL, ca->journal.bio->bi_inline_vecs, nr_bvecs, 0); + ja->bio[i]->ca = ca; + ja->bio[i]->buf_idx = i; + bio_init(&ja->bio[i]->bio, NULL, ja->bio[i]->bio.bi_inline_vecs, nr_bvecs, 0); + } ja->buckets = kcalloc(ja->nr, sizeof(u64), GFP_KERNEL); if (!ja->buckets) @@ -1287,14 +1339,14 @@ int bch2_dev_journal_init(struct bch_dev *ca, struct bch_sb *sb) if (journal_buckets_v2) { unsigned nr = bch2_sb_field_journal_v2_nr_entries(journal_buckets_v2); - unsigned j, dst = 0; + unsigned dst = 0; - for (i = 0; i < nr; i++) - for (j = 0; j < le64_to_cpu(journal_buckets_v2->d[i].nr); j++) + for (unsigned i = 0; i < nr; i++) + for (unsigned j = 0; j < le64_to_cpu(journal_buckets_v2->d[i].nr); j++) ja->buckets[dst++] = le64_to_cpu(journal_buckets_v2->d[i].start) + j; } else if (journal_buckets) { - for (i = 0; i < ja->nr; i++) + for (unsigned i = 0; i < ja->nr; i++) ja->buckets[i] = le64_to_cpu(journal_buckets->buckets[i]); } @@ -1303,19 +1355,19 @@ int bch2_dev_journal_init(struct bch_dev *ca, struct bch_sb *sb) void bch2_fs_journal_exit(struct journal *j) { - unsigned i; + if (j->wq) + destroy_workqueue(j->wq); darray_exit(&j->early_journal_entries); - for (i = 0; i < ARRAY_SIZE(j->buf); i++) - kvpfree(j->buf[i].data, j->buf[i].buf_size); + for (unsigned i = 0; i < ARRAY_SIZE(j->buf); i++) + kvfree(j->buf[i].data); free_fifo(&j->pin); } int bch2_fs_journal_init(struct journal *j) { static struct lock_class_key res_key; - unsigned i; mutex_init(&j->buf_lock); spin_lock_init(&j->lock); @@ -1336,14 +1388,20 @@ int bch2_fs_journal_init(struct journal *j) if (!(init_fifo(&j->pin, JOURNAL_PIN, GFP_KERNEL))) return -BCH_ERR_ENOMEM_journal_pin_fifo; - for (i = 0; i < ARRAY_SIZE(j->buf); i++) { + for (unsigned i = 0; i < ARRAY_SIZE(j->buf); i++) { j->buf[i].buf_size = JOURNAL_ENTRY_SIZE_MIN; - j->buf[i].data = kvpmalloc(j->buf[i].buf_size, GFP_KERNEL); + j->buf[i].data = kvmalloc(j->buf[i].buf_size, GFP_KERNEL); if (!j->buf[i].data) return -BCH_ERR_ENOMEM_journal_buf; + j->buf[i].idx = i; } j->pin.front = j->pin.back = 1; + + j->wq = alloc_workqueue("bcachefs_journal", + WQ_HIGHPRI|WQ_FREEZABLE|WQ_UNBOUND|WQ_MEM_RECLAIM, 512); + if (!j->wq) + return -BCH_ERR_ENOMEM_fs_other_alloc; return 0; } @@ -1381,6 +1439,7 @@ void __bch2_journal_debug_to_text(struct printbuf *out, struct journal *j) prt_printf(out, "reclaim kicked:\t\t%u\n", j->reclaim_kicked); prt_printf(out, "reclaim runs in:\t%u ms\n", time_after(j->next_reclaim, now) ? jiffies_to_msecs(j->next_reclaim - jiffies) : 0); + prt_printf(out, "blocked:\t\t%u\n", j->blocked); prt_printf(out, "current entry sectors:\t%u\n", j->cur_entry_sectors); prt_printf(out, "current entry error:\t%s\n", bch2_journal_errors[j->cur_entry_error]); prt_printf(out, "current entry:\t\t"); @@ -1455,7 +1514,6 @@ bool bch2_journal_seq_pins_to_text(struct printbuf *out, struct journal *j, u64 { struct journal_entry_pin_list *pin_list; struct journal_entry_pin *pin; - unsigned i; spin_lock(&j->lock); *seq = max(*seq, j->pin.front); @@ -1473,7 +1531,7 @@ bool bch2_journal_seq_pins_to_text(struct printbuf *out, struct journal *j, u64 prt_newline(out); printbuf_indent_add(out, 2); - for (i = 0; i < ARRAY_SIZE(pin_list->list); i++) + for (unsigned i = 0; i < ARRAY_SIZE(pin_list->list); i++) list_for_each_entry(pin, &pin_list->list[i], list) { prt_printf(out, "\t%px %ps", pin, pin->flush); prt_newline(out); |