diff options
Diffstat (limited to 'fs/dlm')
-rw-r--r-- | fs/dlm/dlm_internal.h | 2 | ||||
-rw-r--r-- | fs/dlm/lock.c | 108 | ||||
-rw-r--r-- | fs/dlm/plock.c | 44 |
3 files changed, 98 insertions, 56 deletions
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h index 511d0b984f..a9137c90f3 100644 --- a/fs/dlm/dlm_internal.h +++ b/fs/dlm/dlm_internal.h @@ -247,7 +247,7 @@ struct dlm_lkb { int8_t lkb_highbast; /* highest mode bast sent for */ int8_t lkb_wait_type; /* type of reply waiting for */ - atomic_t lkb_wait_count; + int8_t lkb_wait_count; int lkb_wait_nodeid; /* for debugging */ struct list_head lkb_statequeue; /* rsb g/c/w list */ diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index 652c51fbbf..fd752dd038 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c @@ -1407,7 +1407,6 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid) { struct dlm_ls *ls = lkb->lkb_resource->res_ls; int error = 0; - int wc; mutex_lock(&ls->ls_waiters_mutex); @@ -1429,17 +1428,20 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid) error = -EBUSY; goto out; } - wc = atomic_inc_return(&lkb->lkb_wait_count); + lkb->lkb_wait_count++; hold_lkb(lkb); log_debug(ls, "addwait %x cur %d overlap %d count %d f %x", - lkb->lkb_id, lkb->lkb_wait_type, mstype, wc, - dlm_iflags_val(lkb)); + lkb->lkb_id, lkb->lkb_wait_type, mstype, + lkb->lkb_wait_count, dlm_iflags_val(lkb)); goto out; } - wc = atomic_fetch_inc(&lkb->lkb_wait_count); - DLM_ASSERT(!wc, dlm_print_lkb(lkb); printk("wait_count %d\n", wc);); + DLM_ASSERT(!lkb->lkb_wait_count, + dlm_print_lkb(lkb); + printk("wait_count %d\n", lkb->lkb_wait_count);); + + lkb->lkb_wait_count++; lkb->lkb_wait_type = mstype; lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */ hold_lkb(lkb); @@ -1502,7 +1504,7 @@ static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype, log_debug(ls, "remwait %x convert_reply zap overlap_cancel", lkb->lkb_id); lkb->lkb_wait_type = 0; - atomic_dec(&lkb->lkb_wait_count); + lkb->lkb_wait_count--; unhold_lkb(lkb); goto out_del; } @@ -1529,15 +1531,16 @@ static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype, if (overlap_done && lkb->lkb_wait_type) { log_error(ls, "remwait error %x reply %d wait_type %d overlap", lkb->lkb_id, mstype, lkb->lkb_wait_type); - atomic_dec(&lkb->lkb_wait_count); + lkb->lkb_wait_count--; unhold_lkb(lkb); lkb->lkb_wait_type = 0; } - DLM_ASSERT(atomic_read(&lkb->lkb_wait_count), dlm_print_lkb(lkb);); + DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb);); clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags); - if (atomic_dec_and_test(&lkb->lkb_wait_count)) + lkb->lkb_wait_count--; + if (!lkb->lkb_wait_count) list_del_init(&lkb->lkb_wait_reply); unhold_lkb(lkb); return 0; @@ -2666,7 +2669,7 @@ static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, goto out; /* lock not allowed if there's any op in progress */ - if (lkb->lkb_wait_type || atomic_read(&lkb->lkb_wait_count)) + if (lkb->lkb_wait_type || lkb->lkb_wait_count) goto out; if (is_overlap(lkb)) @@ -2728,7 +2731,7 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args) /* normal unlock not allowed if there's any op in progress */ if (!(args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) && - (lkb->lkb_wait_type || atomic_read(&lkb->lkb_wait_count))) + (lkb->lkb_wait_type || lkb->lkb_wait_count)) goto out; /* an lkb may be waiting for an rsb lookup to complete where the @@ -5011,21 +5014,32 @@ static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls) return lkb; } -/* Deal with lookups and lkb's marked RESEND from _pre. We may now be the - master or dir-node for r. Processing the lkb may result in it being placed - back on waiters. */ - -/* We do this after normal locking has been enabled and any saved messages - (in requestqueue) have been processed. We should be confident that at - this point we won't get or process a reply to any of these waiting - operations. But, new ops may be coming in on the rsbs/locks here from - userspace or remotely. */ - -/* there may have been an overlap unlock/cancel prior to recovery or after - recovery. if before, the lkb may still have a pos wait_count; if after, the - overlap flag would just have been set and nothing new sent. we can be - confident here than any replies to either the initial op or overlap ops - prior to recovery have been received. */ +/* + * Forced state reset for locks that were in the middle of remote operations + * when recovery happened (i.e. lkbs that were on the waiters list, waiting + * for a reply from a remote operation.) The lkbs remaining on the waiters + * list need to be reevaluated; some may need resending to a different node + * than previously, and some may now need local handling rather than remote. + * + * First, the lkb state for the voided remote operation is forcibly reset, + * equivalent to what remove_from_waiters() would normally do: + * . lkb removed from ls_waiters list + * . lkb wait_type cleared + * . lkb waiters_count cleared + * . lkb ref count decremented for each waiters_count (almost always 1, + * but possibly 2 in case of cancel/unlock overlapping, which means + * two remote replies were being expected for the lkb.) + * + * Second, the lkb is reprocessed like an original operation would be, + * by passing it to _request_lock or _convert_lock, which will either + * process the lkb operation locally, or send it to a remote node again + * and put the lkb back onto the waiters list. + * + * When reprocessing the lkb, we may find that it's flagged for an overlapping + * force-unlock or cancel, either from before recovery began, or after recovery + * finished. If this is the case, the unlock/cancel is done directly, and the + * original operation is not initiated again (no _request_lock/_convert_lock.) + */ int dlm_recover_waiters_post(struct dlm_ls *ls) { @@ -5040,6 +5054,11 @@ int dlm_recover_waiters_post(struct dlm_ls *ls) break; } + /* + * Find an lkb from the waiters list that's been affected by + * recovery node changes, and needs to be reprocessed. Does + * hold_lkb(), adding a refcount. + */ lkb = find_resend_waiter(ls); if (!lkb) break; @@ -5048,6 +5067,11 @@ int dlm_recover_waiters_post(struct dlm_ls *ls) hold_rsb(r); lock_rsb(r); + /* + * If the lkb has been flagged for a force unlock or cancel, + * then the reprocessing below will be replaced by just doing + * the unlock/cancel directly. + */ mstype = lkb->lkb_wait_type; oc = test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags); @@ -5061,22 +5085,40 @@ int dlm_recover_waiters_post(struct dlm_ls *ls) r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid, dlm_dir_nodeid(r), oc, ou); - /* At this point we assume that we won't get a reply to any - previous op or overlap op on this lock. First, do a big - remove_from_waiters() for all previous ops. */ + /* + * No reply to the pre-recovery operation will now be received, + * so a forced equivalent of remove_from_waiters() is needed to + * reset the waiters state that was in place before recovery. + */ clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags); + + /* Forcibly clear wait_type */ lkb->lkb_wait_type = 0; - /* drop all wait_count references we still - * hold a reference for this iteration. + + /* + * Forcibly reset wait_count and associated refcount. The + * wait_count will almost always be 1, but in case of an + * overlapping unlock/cancel it could be 2: see where + * add_to_waiters() finds the lkb is already on the waiters + * list and does lkb_wait_count++; hold_lkb(). */ - while (!atomic_dec_and_test(&lkb->lkb_wait_count)) + while (lkb->lkb_wait_count) { + lkb->lkb_wait_count--; unhold_lkb(lkb); + } + /* Forcibly remove from waiters list */ mutex_lock(&ls->ls_waiters_mutex); list_del_init(&lkb->lkb_wait_reply); mutex_unlock(&ls->ls_waiters_mutex); + /* + * The lkb is now clear of all prior waiters state and can be + * processed locally, or sent to remote node again, or directly + * cancelled/unlocked. + */ + if (oc || ou) { /* do an unlock or cancel instead of resending */ switch (mstype) { diff --git a/fs/dlm/plock.c b/fs/dlm/plock.c index d814c51213..9ca83ef70e 100644 --- a/fs/dlm/plock.c +++ b/fs/dlm/plock.c @@ -138,14 +138,14 @@ int dlm_posix_lock(dlm_lockspace_t *lockspace, u64 number, struct file *file, } op->info.optype = DLM_PLOCK_OP_LOCK; - op->info.pid = fl->fl_pid; - op->info.ex = (fl->fl_type == F_WRLCK); - op->info.wait = !!(fl->fl_flags & FL_SLEEP); + op->info.pid = fl->c.flc_pid; + op->info.ex = lock_is_write(fl); + op->info.wait = !!(fl->c.flc_flags & FL_SLEEP); op->info.fsid = ls->ls_global_id; op->info.number = number; op->info.start = fl->fl_start; op->info.end = fl->fl_end; - op->info.owner = (__u64)(long)fl->fl_owner; + op->info.owner = (__u64)(long) fl->c.flc_owner; /* async handling */ if (fl->fl_lmops && fl->fl_lmops->lm_grant) { op_data = kzalloc(sizeof(*op_data), GFP_NOFS); @@ -258,7 +258,7 @@ static int dlm_plock_callback(struct plock_op *op) } /* got fs lock; bookkeep locally as well: */ - flc->fl_flags &= ~FL_SLEEP; + flc->c.flc_flags &= ~FL_SLEEP; if (posix_lock_file(file, flc, NULL)) { /* * This can only happen in the case of kmalloc() failure. @@ -291,7 +291,7 @@ int dlm_posix_unlock(dlm_lockspace_t *lockspace, u64 number, struct file *file, struct dlm_ls *ls; struct plock_op *op; int rv; - unsigned char fl_flags = fl->fl_flags; + unsigned char saved_flags = fl->c.flc_flags; ls = dlm_find_lockspace_local(lockspace); if (!ls) @@ -304,7 +304,7 @@ int dlm_posix_unlock(dlm_lockspace_t *lockspace, u64 number, struct file *file, } /* cause the vfs unlock to return ENOENT if lock is not found */ - fl->fl_flags |= FL_EXISTS; + fl->c.flc_flags |= FL_EXISTS; rv = locks_lock_file_wait(file, fl); if (rv == -ENOENT) { @@ -317,14 +317,14 @@ int dlm_posix_unlock(dlm_lockspace_t *lockspace, u64 number, struct file *file, } op->info.optype = DLM_PLOCK_OP_UNLOCK; - op->info.pid = fl->fl_pid; + op->info.pid = fl->c.flc_pid; op->info.fsid = ls->ls_global_id; op->info.number = number; op->info.start = fl->fl_start; op->info.end = fl->fl_end; - op->info.owner = (__u64)(long)fl->fl_owner; + op->info.owner = (__u64)(long) fl->c.flc_owner; - if (fl->fl_flags & FL_CLOSE) { + if (fl->c.flc_flags & FL_CLOSE) { op->info.flags |= DLM_PLOCK_FL_CLOSE; send_op(op); rv = 0; @@ -345,7 +345,7 @@ out_free: dlm_release_plock_op(op); out: dlm_put_lockspace(ls); - fl->fl_flags = fl_flags; + fl->c.flc_flags = saved_flags; return rv; } EXPORT_SYMBOL_GPL(dlm_posix_unlock); @@ -375,14 +375,14 @@ int dlm_posix_cancel(dlm_lockspace_t *lockspace, u64 number, struct file *file, return -EINVAL; memset(&info, 0, sizeof(info)); - info.pid = fl->fl_pid; - info.ex = (fl->fl_type == F_WRLCK); + info.pid = fl->c.flc_pid; + info.ex = lock_is_write(fl); info.fsid = ls->ls_global_id; dlm_put_lockspace(ls); info.number = number; info.start = fl->fl_start; info.end = fl->fl_end; - info.owner = (__u64)(long)fl->fl_owner; + info.owner = (__u64)(long) fl->c.flc_owner; rv = do_lock_cancel(&info); switch (rv) { @@ -437,13 +437,13 @@ int dlm_posix_get(dlm_lockspace_t *lockspace, u64 number, struct file *file, } op->info.optype = DLM_PLOCK_OP_GET; - op->info.pid = fl->fl_pid; - op->info.ex = (fl->fl_type == F_WRLCK); + op->info.pid = fl->c.flc_pid; + op->info.ex = lock_is_write(fl); op->info.fsid = ls->ls_global_id; op->info.number = number; op->info.start = fl->fl_start; op->info.end = fl->fl_end; - op->info.owner = (__u64)(long)fl->fl_owner; + op->info.owner = (__u64)(long) fl->c.flc_owner; send_op(op); wait_event(recv_wq, (op->done != 0)); @@ -455,16 +455,16 @@ int dlm_posix_get(dlm_lockspace_t *lockspace, u64 number, struct file *file, rv = op->info.rv; - fl->fl_type = F_UNLCK; + fl->c.flc_type = F_UNLCK; if (rv == -ENOENT) rv = 0; else if (rv > 0) { locks_init_lock(fl); - fl->fl_type = (op->info.ex) ? F_WRLCK : F_RDLCK; - fl->fl_flags = FL_POSIX; - fl->fl_pid = op->info.pid; + fl->c.flc_type = (op->info.ex) ? F_WRLCK : F_RDLCK; + fl->c.flc_flags = FL_POSIX; + fl->c.flc_pid = op->info.pid; if (op->info.nodeid != dlm_our_nodeid()) - fl->fl_pid = -fl->fl_pid; + fl->c.flc_pid = -fl->c.flc_pid; fl->fl_start = op->info.start; fl->fl_end = op->info.end; rv = 0; |