diff options
Diffstat (limited to 'fs/dlm/recoverd.c')
-rw-r--r-- | fs/dlm/recoverd.c | 142 |
1 files changed, 116 insertions, 26 deletions
diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c index 4d17491dea..17a40d1e60 100644 --- a/fs/dlm/recoverd.c +++ b/fs/dlm/recoverd.c @@ -20,6 +20,67 @@ #include "requestqueue.h" #include "recoverd.h" +static int dlm_create_masters_list(struct dlm_ls *ls) +{ + struct dlm_rsb *r; + int error = 0; + + write_lock_bh(&ls->ls_masters_lock); + if (!list_empty(&ls->ls_masters_list)) { + log_error(ls, "root list not empty"); + error = -EINVAL; + goto out; + } + + read_lock_bh(&ls->ls_rsbtbl_lock); + list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) { + if (r->res_nodeid) + continue; + + list_add(&r->res_masters_list, &ls->ls_masters_list); + dlm_hold_rsb(r); + } + read_unlock_bh(&ls->ls_rsbtbl_lock); + out: + write_unlock_bh(&ls->ls_masters_lock); + return error; +} + +static void dlm_release_masters_list(struct dlm_ls *ls) +{ + struct dlm_rsb *r, *safe; + + write_lock_bh(&ls->ls_masters_lock); + list_for_each_entry_safe(r, safe, &ls->ls_masters_list, res_masters_list) { + list_del_init(&r->res_masters_list); + dlm_put_rsb(r); + } + write_unlock_bh(&ls->ls_masters_lock); +} + +static void dlm_create_root_list(struct dlm_ls *ls, struct list_head *root_list) +{ + struct dlm_rsb *r; + + read_lock_bh(&ls->ls_rsbtbl_lock); + list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) { + list_add(&r->res_root_list, root_list); + dlm_hold_rsb(r); + } + + WARN_ON_ONCE(!list_empty(&ls->ls_toss)); + read_unlock_bh(&ls->ls_rsbtbl_lock); +} + +static void dlm_release_root_list(struct list_head *root_list) +{ + struct dlm_rsb *r, *safe; + + list_for_each_entry_safe(r, safe, root_list, res_root_list) { + list_del_init(&r->res_root_list); + dlm_put_rsb(r); + } +} /* If the start for which we're re-enabling locking (seq) has been superseded by a newer stop (ls_recover_seq), we need to leave locking disabled. @@ -32,24 +93,35 @@ static int enable_locking(struct dlm_ls *ls, uint64_t seq) { int error = -EINTR; - down_write(&ls->ls_recv_active); + write_lock_bh(&ls->ls_recv_active); - spin_lock(&ls->ls_recover_lock); + spin_lock_bh(&ls->ls_recover_lock); if (ls->ls_recover_seq == seq) { set_bit(LSFL_RUNNING, &ls->ls_flags); + /* Schedule next timer if recovery put something on toss. + * + * The rsbs that was queued while recovery on toss hasn't + * started yet because LSFL_RUNNING was set everything + * else recovery hasn't started as well because ls_in_recovery + * is still hold. So we should not run into the case that + * dlm_timer_resume() queues a timer that can occur in + * a no op. + */ + dlm_timer_resume(ls); /* unblocks processes waiting to enter the dlm */ up_write(&ls->ls_in_recovery); clear_bit(LSFL_RECOVER_LOCK, &ls->ls_flags); error = 0; } - spin_unlock(&ls->ls_recover_lock); + spin_unlock_bh(&ls->ls_recover_lock); - up_write(&ls->ls_recv_active); + write_unlock_bh(&ls->ls_recv_active); return error; } static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) { + LIST_HEAD(root_list); unsigned long start; int error, neg = 0; @@ -66,7 +138,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) * routines. */ - dlm_create_root_list(ls); + dlm_create_root_list(ls, &root_list); /* * Add or remove nodes from the lockspace's ls_nodes list. @@ -82,10 +154,25 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) goto fail; } - dlm_recover_dir_nodeid(ls); + dlm_recover_dir_nodeid(ls, &root_list); + + /* Create a snapshot of all active rsbs were we are the master of. + * During the barrier between dlm_recover_members_wait() and + * dlm_recover_directory() other nodes can dump their necessary + * directory dlm_rsb (r->res_dir_nodeid == nodeid) in rcom + * communication dlm_copy_master_names() handling. + * + * TODO We should create a per lockspace list that contains rsbs + * that we are the master of. Instead of creating this list while + * recovery we keep track of those rsbs while locking handling and + * recovery can use it when necessary. + */ + error = dlm_create_masters_list(ls); + if (error) { + log_rinfo(ls, "dlm_create_masters_list error %d", error); + goto fail_root_list; + } - ls->ls_recover_dir_sent_res = 0; - ls->ls_recover_dir_sent_msg = 0; ls->ls_recover_locks_in = 0; dlm_set_recover_status(ls, DLM_RS_NODES); @@ -93,7 +180,8 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) error = dlm_recover_members_wait(ls, rv->seq); if (error) { log_rinfo(ls, "dlm_recover_members_wait error %d", error); - goto fail; + dlm_release_masters_list(ls); + goto fail_root_list; } start = jiffies; @@ -106,7 +194,8 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) error = dlm_recover_directory(ls, rv->seq); if (error) { log_rinfo(ls, "dlm_recover_directory error %d", error); - goto fail; + dlm_release_masters_list(ls); + goto fail_root_list; } dlm_set_recover_status(ls, DLM_RS_DIR); @@ -114,11 +203,11 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) error = dlm_recover_directory_wait(ls, rv->seq); if (error) { log_rinfo(ls, "dlm_recover_directory_wait error %d", error); - goto fail; + dlm_release_masters_list(ls); + goto fail_root_list; } - log_rinfo(ls, "dlm_recover_directory %u out %u messages", - ls->ls_recover_dir_sent_res, ls->ls_recover_dir_sent_msg); + dlm_release_masters_list(ls); /* * We may have outstanding operations that are waiting for a reply from @@ -130,7 +219,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) if (dlm_recovery_stopped(ls)) { error = -EINTR; - goto fail; + goto fail_root_list; } if (neg || dlm_no_directory(ls)) { @@ -138,27 +227,27 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) * Clear lkb's for departed nodes. */ - dlm_recover_purge(ls); + dlm_recover_purge(ls, &root_list); /* * Get new master nodeid's for rsb's that were mastered on * departed nodes. */ - error = dlm_recover_masters(ls, rv->seq); + error = dlm_recover_masters(ls, rv->seq, &root_list); if (error) { log_rinfo(ls, "dlm_recover_masters error %d", error); - goto fail; + goto fail_root_list; } /* * Send our locks on remastered rsb's to the new masters. */ - error = dlm_recover_locks(ls, rv->seq); + error = dlm_recover_locks(ls, rv->seq, &root_list); if (error) { log_rinfo(ls, "dlm_recover_locks error %d", error); - goto fail; + goto fail_root_list; } dlm_set_recover_status(ls, DLM_RS_LOCKS); @@ -166,7 +255,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) error = dlm_recover_locks_wait(ls, rv->seq); if (error) { log_rinfo(ls, "dlm_recover_locks_wait error %d", error); - goto fail; + goto fail_root_list; } log_rinfo(ls, "dlm_recover_locks %u in", @@ -178,7 +267,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) * settings. */ - dlm_recover_rsbs(ls); + dlm_recover_rsbs(ls, &root_list); } else { /* * Other lockspace members may be going through the "neg" steps @@ -190,11 +279,11 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) error = dlm_recover_locks_wait(ls, rv->seq); if (error) { log_rinfo(ls, "dlm_recover_locks_wait error %d", error); - goto fail; + goto fail_root_list; } } - dlm_release_root_list(ls); + dlm_release_root_list(&root_list); /* * Purge directory-related requests that are saved in requestqueue. @@ -243,8 +332,9 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) return 0; + fail_root_list: + dlm_release_root_list(&root_list); fail: - dlm_release_root_list(ls); mutex_unlock(&ls->ls_recoverd_active); return error; @@ -259,12 +349,12 @@ static void do_ls_recovery(struct dlm_ls *ls) struct dlm_recover *rv = NULL; int error; - spin_lock(&ls->ls_recover_lock); + spin_lock_bh(&ls->ls_recover_lock); rv = ls->ls_recover_args; ls->ls_recover_args = NULL; if (rv && ls->ls_recover_seq == rv->seq) clear_bit(LSFL_RECOVER_STOP, &ls->ls_flags); - spin_unlock(&ls->ls_recover_lock); + spin_unlock_bh(&ls->ls_recover_lock); if (rv) { error = ls_recover(ls, rv); |