diff options
Diffstat (limited to 'drivers/staging/lustre/lustre/ldlm/ldlm_request.c')
-rw-r--r-- | drivers/staging/lustre/lustre/ldlm/ldlm_request.c | 163 |
1 files changed, 96 insertions, 67 deletions
diff --git a/drivers/staging/lustre/lustre/ldlm/ldlm_request.c b/drivers/staging/lustre/lustre/ldlm/ldlm_request.c index 74e193e52cd6..107314e284a0 100644 --- a/drivers/staging/lustre/lustre/ldlm/ldlm_request.c +++ b/drivers/staging/lustre/lustre/ldlm/ldlm_request.c @@ -153,7 +153,7 @@ static int ldlm_completion_tail(struct ldlm_lock *lock) long delay; int result; - if (lock->l_flags & (LDLM_FL_DESTROYED | LDLM_FL_FAILED)) { + if (ldlm_is_destroyed(lock) || ldlm_is_failed(lock)) { LDLM_DEBUG(lock, "client-side enqueue: destroyed"); result = -EIO; } else { @@ -252,7 +252,7 @@ noreproc: lwd.lwd_lock = lock; - if (lock->l_flags & LDLM_FL_NO_TIMEOUT) { + if (ldlm_is_no_timeout(lock)) { LDLM_DEBUG(lock, "waiting indefinitely because of NO_TIMEOUT"); lwi = LWI_INTR(interrupted_completion_wait, &lwd); } else { @@ -269,7 +269,7 @@ noreproc: if (OBD_FAIL_CHECK_RESET(OBD_FAIL_LDLM_INTR_CP_AST, OBD_FAIL_LDLM_CP_BL_RACE | OBD_FAIL_ONCE)) { - lock->l_flags |= LDLM_FL_FAIL_LOC; + ldlm_set_fail_loc(lock); rc = -EINTR; } else { /* Go to sleep until the lock is granted or cancelled. */ @@ -296,7 +296,7 @@ static void failed_lock_cleanup(struct ldlm_namespace *ns, lock_res_and_lock(lock); /* Check that lock is not granted or failed, we might race. */ if ((lock->l_req_mode != lock->l_granted_mode) && - !(lock->l_flags & LDLM_FL_FAILED)) { + !ldlm_is_failed(lock)) { /* Make sure that this lock will not be found by raced * bl_ast and -EINVAL reply is sent to server anyways. * bug 17645 @@ -347,7 +347,6 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req, struct ldlm_lock *lock; struct ldlm_reply *reply; int cleanup_phase = 1; - int size = 0; lock = ldlm_handle2lock(lockh); /* ldlm_cli_enqueue is holding a reference on this lock. */ @@ -375,8 +374,8 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req, goto cleanup; } - if (lvb_len != 0) { - LASSERT(lvb); + if (lvb_len > 0) { + int size = 0; size = req_capsule_get_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER); @@ -390,12 +389,13 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req, rc = -EINVAL; goto cleanup; } + lvb_len = size; } if (rc == ELDLM_LOCK_ABORTED) { - if (lvb_len != 0) + if (lvb_len > 0 && lvb) rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_SERVER, - lvb, size); + lvb, lvb_len); if (rc == 0) rc = ELDLM_LOCK_ABORTED; goto cleanup; @@ -421,7 +421,7 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req, *flags = ldlm_flags_from_wire(reply->lock_flags); lock->l_flags |= ldlm_flags_from_wire(reply->lock_flags & - LDLM_INHERIT_FLAGS); + LDLM_FL_INHERIT_MASK); /* move NO_TIMEOUT flag to the lock to force ldlm_lock_match() * to wait with no timeout as well */ @@ -489,7 +489,7 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req, /* If the lock has already been granted by a completion AST, don't * clobber the LVB with an older one. */ - if (lvb_len != 0) { + if (lvb_len > 0) { /* We must lock or a racing completion might update lvb without * letting us know and we'll clobber the correct value. * Cannot unlock after the check either, as that still leaves @@ -498,7 +498,7 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req, lock_res_and_lock(lock); if (lock->l_req_mode != lock->l_granted_mode) rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_SERVER, - lock->l_lvb_data, size); + lock->l_lvb_data, lvb_len); unlock_res_and_lock(lock); if (rc < 0) { cleanup_phase = 1; @@ -518,7 +518,7 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req, } } - if (lvb_len && lvb) { + if (lvb_len > 0 && lvb) { /* Copy the LVB here, and not earlier, because the completion * AST (if any) can override what we got in the reply */ @@ -601,7 +601,7 @@ int ldlm_prep_elc_req(struct obd_export *exp, struct ptlrpc_request *req, avail = ldlm_capsule_handles_avail(pill, RCL_CLIENT, canceloff); flags = ns_connect_lru_resize(ns) ? - LDLM_CANCEL_LRUR : LDLM_CANCEL_AGED; + LDLM_CANCEL_LRUR_NO_WAIT : LDLM_CANCEL_AGED; to_free = !ns_connect_lru_resize(ns) && opc == LDLM_ENQUEUE ? 1 : 0; @@ -821,12 +821,11 @@ static __u64 ldlm_cli_cancel_local(struct ldlm_lock *lock) LDLM_DEBUG(lock, "client-side cancel"); /* Set this flag to prevent others from getting new references*/ lock_res_and_lock(lock); - lock->l_flags |= LDLM_FL_CBPENDING; + ldlm_set_cbpending(lock); local_only = !!(lock->l_flags & (LDLM_FL_LOCAL_ONLY|LDLM_FL_CANCEL_ON_BLOCK)); ldlm_cancel_callback(lock); - rc = (lock->l_flags & LDLM_FL_BL_AST) ? - LDLM_FL_BL_AST : LDLM_FL_CANCELING; + rc = ldlm_is_bl_ast(lock) ? LDLM_FL_BL_AST : LDLM_FL_CANCELING; unlock_res_and_lock(lock); if (local_only) { @@ -1131,31 +1130,30 @@ EXPORT_SYMBOL(ldlm_cli_cancel_list_local); * dirty data, to close a file, ...) or waiting for any RPCs in-flight (e.g. * readahead requests, ...) */ -static ldlm_policy_res_t ldlm_cancel_no_wait_policy(struct ldlm_namespace *ns, - struct ldlm_lock *lock, - int unused, int added, - int count) +static enum ldlm_policy_res +ldlm_cancel_no_wait_policy(struct ldlm_namespace *ns, struct ldlm_lock *lock, + int unused, int added, int count) { - ldlm_policy_res_t result = LDLM_POLICY_CANCEL_LOCK; - ldlm_cancel_for_recovery cb = ns->ns_cancel_for_recovery; - - lock_res_and_lock(lock); + enum ldlm_policy_res result = LDLM_POLICY_CANCEL_LOCK; /* don't check added & count since we want to process all locks - * from unused list + * from unused list. + * It's fine to not take lock to access lock->l_resource since + * the lock has already been granted so it won't change. */ switch (lock->l_resource->lr_type) { case LDLM_EXTENT: case LDLM_IBITS: - if (cb && cb(lock)) + if (ns->ns_cancel && ns->ns_cancel(lock) != 0) break; default: result = LDLM_POLICY_SKIP_LOCK; - lock->l_flags |= LDLM_FL_SKIPPED; + lock_res_and_lock(lock); + ldlm_set_skipped(lock); + unlock_res_and_lock(lock); break; } - unlock_res_and_lock(lock); return result; } @@ -1168,10 +1166,10 @@ static ldlm_policy_res_t ldlm_cancel_no_wait_policy(struct ldlm_namespace *ns, * * \retval LDLM_POLICY_CANCEL_LOCK cancel lock from LRU */ -static ldlm_policy_res_t ldlm_cancel_lrur_policy(struct ldlm_namespace *ns, - struct ldlm_lock *lock, - int unused, int added, - int count) +static enum ldlm_policy_res ldlm_cancel_lrur_policy(struct ldlm_namespace *ns, + struct ldlm_lock *lock, + int unused, int added, + int count) { unsigned long cur = cfs_time_current(); struct ldlm_pool *pl = &ns->ns_pool; @@ -1196,8 +1194,13 @@ static ldlm_policy_res_t ldlm_cancel_lrur_policy(struct ldlm_namespace *ns, /* Stop when SLV is not yet come from server or lv is smaller than * it is. */ - return (slv == 0 || lv < slv) ? - LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK; + if (slv == 0 || lv < slv) + return LDLM_POLICY_KEEP_LOCK; + + if (ns->ns_cancel && ns->ns_cancel(lock) == 0) + return LDLM_POLICY_KEEP_LOCK; + + return LDLM_POLICY_CANCEL_LOCK; } /** @@ -1209,10 +1212,10 @@ static ldlm_policy_res_t ldlm_cancel_lrur_policy(struct ldlm_namespace *ns, * * \retval LDLM_POLICY_CANCEL_LOCK cancel lock from LRU */ -static ldlm_policy_res_t ldlm_cancel_passed_policy(struct ldlm_namespace *ns, - struct ldlm_lock *lock, - int unused, int added, - int count) +static enum ldlm_policy_res ldlm_cancel_passed_policy(struct ldlm_namespace *ns, + struct ldlm_lock *lock, + int unused, int added, + int count) { /* Stop LRU processing when we reach past @count or have checked all * locks in LRU. @@ -1230,16 +1233,35 @@ static ldlm_policy_res_t ldlm_cancel_passed_policy(struct ldlm_namespace *ns, * * \retval LDLM_POLICY_CANCEL_LOCK cancel lock from LRU */ -static ldlm_policy_res_t ldlm_cancel_aged_policy(struct ldlm_namespace *ns, - struct ldlm_lock *lock, - int unused, int added, - int count) +static enum ldlm_policy_res ldlm_cancel_aged_policy(struct ldlm_namespace *ns, + struct ldlm_lock *lock, + int unused, int added, + int count) { - /* Stop LRU processing if young lock is found and we reach past count */ - return ((added >= count) && - time_before(cfs_time_current(), - cfs_time_add(lock->l_last_used, ns->ns_max_age))) ? - LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK; + if ((added >= count) && + time_before(cfs_time_current(), + cfs_time_add(lock->l_last_used, ns->ns_max_age))) + return LDLM_POLICY_KEEP_LOCK; + + if (ns->ns_cancel && ns->ns_cancel(lock) == 0) + return LDLM_POLICY_KEEP_LOCK; + + return LDLM_POLICY_CANCEL_LOCK; +} + +static enum ldlm_policy_res +ldlm_cancel_lrur_no_wait_policy(struct ldlm_namespace *ns, + struct ldlm_lock *lock, + int unused, int added, + int count) +{ + enum ldlm_policy_res result; + + result = ldlm_cancel_lrur_policy(ns, lock, unused, added, count); + if (result == LDLM_POLICY_KEEP_LOCK) + return result; + + return ldlm_cancel_no_wait_policy(ns, lock, unused, added, count); } /** @@ -1251,10 +1273,9 @@ static ldlm_policy_res_t ldlm_cancel_aged_policy(struct ldlm_namespace *ns, * * \retval LDLM_POLICY_CANCEL_LOCK cancel lock from LRU */ -static ldlm_policy_res_t ldlm_cancel_default_policy(struct ldlm_namespace *ns, - struct ldlm_lock *lock, - int unused, int added, - int count) +static enum ldlm_policy_res +ldlm_cancel_default_policy(struct ldlm_namespace *ns, struct ldlm_lock *lock, + int unused, int added, int count) { /* Stop LRU processing when we reach past count or have checked all * locks in LRU. @@ -1263,7 +1284,8 @@ static ldlm_policy_res_t ldlm_cancel_default_policy(struct ldlm_namespace *ns, LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK; } -typedef ldlm_policy_res_t (*ldlm_cancel_lru_policy_t)(struct ldlm_namespace *, +typedef enum ldlm_policy_res (*ldlm_cancel_lru_policy_t)( + struct ldlm_namespace *, struct ldlm_lock *, int, int, int); @@ -1281,6 +1303,8 @@ ldlm_cancel_lru_policy(struct ldlm_namespace *ns, int flags) return ldlm_cancel_lrur_policy; else if (flags & LDLM_CANCEL_PASSED) return ldlm_cancel_passed_policy; + else if (flags & LDLM_CANCEL_LRUR_NO_WAIT) + return ldlm_cancel_lrur_no_wait_policy; } else { if (flags & LDLM_CANCEL_AGED) return ldlm_cancel_aged_policy; @@ -1329,6 +1353,7 @@ static int ldlm_prepare_lru_list(struct ldlm_namespace *ns, ldlm_cancel_lru_policy_t pf; struct ldlm_lock *lock, *next; int added = 0, unused, remained; + int no_wait = flags & (LDLM_CANCEL_NO_WAIT | LDLM_CANCEL_LRUR_NO_WAIT); spin_lock(&ns->ns_lock); unused = ns->ns_nr_unused; @@ -1341,7 +1366,8 @@ static int ldlm_prepare_lru_list(struct ldlm_namespace *ns, LASSERT(pf); while (!list_empty(&ns->ns_unused_list)) { - ldlm_policy_res_t result; + enum ldlm_policy_res result; + time_t last_use = 0; /* all unused locks */ if (remained-- <= 0) @@ -1354,17 +1380,20 @@ static int ldlm_prepare_lru_list(struct ldlm_namespace *ns, list_for_each_entry_safe(lock, next, &ns->ns_unused_list, l_lru) { /* No locks which got blocking requests. */ - LASSERT(!(lock->l_flags & LDLM_FL_BL_AST)); + LASSERT(!ldlm_is_bl_ast(lock)); - if (flags & LDLM_CANCEL_NO_WAIT && - lock->l_flags & LDLM_FL_SKIPPED) + if (no_wait && ldlm_is_skipped(lock)) /* already processed */ continue; + last_use = lock->l_last_used; + if (last_use == cfs_time_current()) + continue; + /* Somebody is already doing CANCEL. No need for this * lock in LRU, do not traverse it again. */ - if (!(lock->l_flags & LDLM_FL_CANCELING)) + if (!ldlm_is_canceling(lock)) break; ldlm_lock_remove_from_lru_nolock(lock); @@ -1407,12 +1436,14 @@ static int ldlm_prepare_lru_list(struct ldlm_namespace *ns, lock_res_and_lock(lock); /* Check flags again under the lock. */ - if ((lock->l_flags & LDLM_FL_CANCELING) || - (ldlm_lock_remove_from_lru(lock) == 0)) { + if (ldlm_is_canceling(lock) || + (ldlm_lock_remove_from_lru_check(lock, last_use) == 0)) { /* Another thread is removing lock from LRU, or * somebody is already doing CANCEL, or there * is a blocking request which will send cancel - * by itself, or the lock is no longer unused. + * by itself, or the lock is no longer unused or + * the lock has been used since the pf() call and + * pages could be put under it. */ unlock_res_and_lock(lock); lu_ref_del(&lock->l_reference, @@ -1429,7 +1460,7 @@ static int ldlm_prepare_lru_list(struct ldlm_namespace *ns, * where while we are doing cancel here, server is also * silently cancelling this lock. */ - lock->l_flags &= ~LDLM_FL_CANCEL_ON_BLOCK; + ldlm_clear_cancel_on_block(lock); /* Setting the CBPENDING flag is a little misleading, * but prevents an important race; namely, once @@ -1526,8 +1557,7 @@ int ldlm_cancel_resource_local(struct ldlm_resource *res, /* If somebody is already doing CANCEL, or blocking AST came, * skip this lock. */ - if (lock->l_flags & LDLM_FL_BL_AST || - lock->l_flags & LDLM_FL_CANCELING) + if (ldlm_is_bl_ast(lock) || ldlm_is_canceling(lock)) continue; if (lockmode_compat(lock->l_granted_mode, mode)) @@ -1771,7 +1801,6 @@ static void ldlm_namespace_foreach(struct ldlm_namespace *ns, cfs_hash_for_each_nolock(ns->ns_rs_hash, ldlm_res_iter_helper, &helper); - } /* non-blocking function to manipulate a lock whose cb_data is being put away. @@ -1887,7 +1916,7 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock) int flags; /* Bug 11974: Do not replay a lock which is actively being canceled */ - if (lock->l_flags & LDLM_FL_CANCELING) { + if (ldlm_is_canceling(lock)) { LDLM_DEBUG(lock, "Not replaying canceled lock:"); return 0; } @@ -1896,7 +1925,7 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock) * server might have long dropped it, but notification of that event was * lost by network. (and server granted conflicting lock already) */ - if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK) { + if (ldlm_is_cancel_on_block(lock)) { LDLM_DEBUG(lock, "Not replaying reply-less lock:"); ldlm_lock_cancel(lock); return 0; |