diff options
author | Linus Torvalds <torvalds@linux-foundation.org> | 2016-07-30 16:33:25 -0700 |
---|---|---|
committer | Linus Torvalds <torvalds@linux-foundation.org> | 2016-07-30 16:33:25 -0700 |
commit | 7f155c702677d057d03b192ce652311de5434697 (patch) | |
tree | dcee0fbb463ec3e55cb50181180c7d175d5895c3 /net/sunrpc | |
parent | d761f3ed6e71bcca724a6e9e39efcac65b7b4ac1 (diff) | |
parent | 944171cbf499d3445c749f7c13c46de0a564a905 (diff) |
Merge tag 'nfs-for-4.8-1' of git://git.linux-nfs.org/projects/trondmy/linux-nfs
Pull NFS client updates from Trond Myklebust:
"Highlights include:
Stable bugfixes:
- nfs: don't create zero-length requests
- several LAYOUTGET bugfixes
Features:
- several performance related features
- more aggressive caching when we can rely on close-to-open
cache consistency
- remove serialisation of O_DIRECT reads and writes
- optimise several code paths to not flush to disk unnecessarily.
However allow for the idiosyncracies of pNFS for those layout
types that need to issue a LAYOUTCOMMIT before the metadata can
be updated on the server.
- SUNRPC updates to the client data receive path
- pNFS/SCSI support RH/Fedora dm-mpath device nodes
- pNFS files/flexfiles can now use unprivileged ports when
the generic NFS mount options allow it.
Bugfixes:
- Don't use RDMA direct data placement together with data
integrity or privacy security flavours
- Remove the RDMA ALLPHYSICAL memory registration mode as
it has potential security holes.
- Several layout recall fixes to improve NFSv4.1 protocol
compliance.
- Fix an Oops in the pNFS files and flexfiles connection
setup to the DS
- Allow retry of operations that used a returned delegation
stateid
- Don't mark the inode as revalidated if a LAYOUTCOMMIT is
outstanding
- Fix writeback races in nfs4_copy_range() and
nfs42_proc_deallocate()"
* tag 'nfs-for-4.8-1' of git://git.linux-nfs.org/projects/trondmy/linux-nfs: (104 commits)
pNFS: Actively set attributes as invalid if LAYOUTCOMMIT is outstanding
NFSv4: Clean up lookup of SECINFO_NO_NAME
NFSv4.2: Fix warning "variable ‘stateids’ set but not used"
NFSv4: Fix warning "no previous prototype for ‘nfs4_listxattr’"
SUNRPC: Fix a compiler warning in fs/nfs/clnt.c
pNFS: Remove redundant smp_mb() from pnfs_init_lseg()
pNFS: Cleanup - do layout segment initialisation in one place
pNFS: Remove redundant stateid invalidation
pNFS: Remove redundant pnfs_mark_layout_returned_if_empty()
pNFS: Clear the layout metadata if the server changed the layout stateid
pNFS: Cleanup - don't open code pnfs_mark_layout_stateid_invalid()
NFS: pnfs_mark_matching_lsegs_return() should match the layout sequence id
pNFS: Do not set plh_return_seq for non-callback related layoutreturns
pNFS: Ensure layoutreturn acts as a completion for layout callbacks
pNFS: Fix CB_LAYOUTRECALL stateid verification
pNFS: Always update the layout barrier seqid on LAYOUTGET
pNFS: Always update the layout stateid if NFS_LAYOUT_INVALID_STID is set
pNFS: Clear the layout return tracking on layout reinitialisation
pNFS: LAYOUTRETURN should only update the stateid if the layout is valid
nfs: don't create zero-length requests
...
Diffstat (limited to 'net/sunrpc')
-rw-r--r-- | net/sunrpc/auth.c | 8 | ||||
-rw-r--r-- | net/sunrpc/auth_generic.c | 9 | ||||
-rw-r--r-- | net/sunrpc/auth_gss/auth_gss.c | 3 | ||||
-rw-r--r-- | net/sunrpc/auth_gss/gss_krb5_mech.c | 2 | ||||
-rw-r--r-- | net/sunrpc/auth_gss/gss_mech_switch.c | 12 | ||||
-rw-r--r-- | net/sunrpc/auth_null.c | 1 | ||||
-rw-r--r-- | net/sunrpc/auth_unix.c | 1 | ||||
-rw-r--r-- | net/sunrpc/clnt.c | 2 | ||||
-rw-r--r-- | net/sunrpc/sched.c | 67 | ||||
-rw-r--r-- | net/sunrpc/svc.c | 8 | ||||
-rw-r--r-- | net/sunrpc/xprt.c | 14 | ||||
-rw-r--r-- | net/sunrpc/xprtmultipath.c | 8 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/Makefile | 2 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/fmr_ops.c | 378 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/frwr_ops.c | 369 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/physical_ops.c | 122 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/rpc_rdma.c | 274 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/transport.c | 40 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/verbs.c | 242 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/xprt_rdma.h | 118 | ||||
-rw-r--r-- | net/sunrpc/xprtsock.c | 125 |
21 files changed, 864 insertions, 941 deletions
diff --git a/net/sunrpc/auth.c b/net/sunrpc/auth.c index 040ff627c18a..a7e42f9a405c 100644 --- a/net/sunrpc/auth.c +++ b/net/sunrpc/auth.c @@ -51,9 +51,7 @@ static int param_set_hashtbl_sz(const char *val, const struct kernel_param *kp) ret = kstrtoul(val, 0, &num); if (ret == -EINVAL) goto out_inval; - nbits = fls(num); - if (num > (1U << nbits)) - nbits++; + nbits = fls(num - 1); if (nbits > MAX_HASHTABLE_BITS || nbits < 2) goto out_inval; *(unsigned int *)kp->arg = nbits; @@ -359,8 +357,10 @@ rpcauth_key_timeout_notify(struct rpc_auth *auth, struct rpc_cred *cred) EXPORT_SYMBOL_GPL(rpcauth_key_timeout_notify); bool -rpcauth_cred_key_to_expire(struct rpc_cred *cred) +rpcauth_cred_key_to_expire(struct rpc_auth *auth, struct rpc_cred *cred) { + if (auth->au_flags & RPCAUTH_AUTH_NO_CRKEY_TIMEOUT) + return false; if (!cred->cr_ops->crkey_to_expire) return false; return cred->cr_ops->crkey_to_expire(cred); diff --git a/net/sunrpc/auth_generic.c b/net/sunrpc/auth_generic.c index 54dd3fdead54..168219535a34 100644 --- a/net/sunrpc/auth_generic.c +++ b/net/sunrpc/auth_generic.c @@ -224,7 +224,7 @@ generic_key_timeout(struct rpc_auth *auth, struct rpc_cred *cred) /* Fast track for non crkey_timeout (no key) underlying credentials */ - if (test_bit(RPC_CRED_NO_CRKEY_TIMEOUT, &acred->ac_flags)) + if (auth->au_flags & RPCAUTH_AUTH_NO_CRKEY_TIMEOUT) return 0; /* Fast track for the normal case */ @@ -236,12 +236,6 @@ generic_key_timeout(struct rpc_auth *auth, struct rpc_cred *cred) if (IS_ERR(tcred)) return -EACCES; - if (!tcred->cr_ops->crkey_timeout) { - set_bit(RPC_CRED_NO_CRKEY_TIMEOUT, &acred->ac_flags); - ret = 0; - goto out_put; - } - /* Test for the almost error case */ ret = tcred->cr_ops->crkey_timeout(tcred); if (ret != 0) { @@ -257,7 +251,6 @@ generic_key_timeout(struct rpc_auth *auth, struct rpc_cred *cred) set_bit(RPC_CRED_NOTIFY_TIMEOUT, &acred->ac_flags); } -out_put: put_rpccred(tcred); return ret; } diff --git a/net/sunrpc/auth_gss/auth_gss.c b/net/sunrpc/auth_gss/auth_gss.c index e64ae93d5b4f..23c8e7c39656 100644 --- a/net/sunrpc/auth_gss/auth_gss.c +++ b/net/sunrpc/auth_gss/auth_gss.c @@ -1015,8 +1015,11 @@ gss_create_new(struct rpc_auth_create_args *args, struct rpc_clnt *clnt) auth = &gss_auth->rpc_auth; auth->au_cslack = GSS_CRED_SLACK >> 2; auth->au_rslack = GSS_VERF_SLACK >> 2; + auth->au_flags = 0; auth->au_ops = &authgss_ops; auth->au_flavor = flavor; + if (gss_pseudoflavor_to_datatouch(gss_auth->mech, flavor)) + auth->au_flags |= RPCAUTH_AUTH_DATATOUCH; atomic_set(&auth->au_count, 1); kref_init(&gss_auth->kref); diff --git a/net/sunrpc/auth_gss/gss_krb5_mech.c b/net/sunrpc/auth_gss/gss_krb5_mech.c index 65427492b1c9..60595835317a 100644 --- a/net/sunrpc/auth_gss/gss_krb5_mech.c +++ b/net/sunrpc/auth_gss/gss_krb5_mech.c @@ -745,12 +745,14 @@ static struct pf_desc gss_kerberos_pfs[] = { .qop = GSS_C_QOP_DEFAULT, .service = RPC_GSS_SVC_INTEGRITY, .name = "krb5i", + .datatouch = true, }, [2] = { .pseudoflavor = RPC_AUTH_GSS_KRB5P, .qop = GSS_C_QOP_DEFAULT, .service = RPC_GSS_SVC_PRIVACY, .name = "krb5p", + .datatouch = true, }, }; diff --git a/net/sunrpc/auth_gss/gss_mech_switch.c b/net/sunrpc/auth_gss/gss_mech_switch.c index 7063d856a598..5fec3abbe19b 100644 --- a/net/sunrpc/auth_gss/gss_mech_switch.c +++ b/net/sunrpc/auth_gss/gss_mech_switch.c @@ -361,6 +361,18 @@ gss_pseudoflavor_to_service(struct gss_api_mech *gm, u32 pseudoflavor) } EXPORT_SYMBOL(gss_pseudoflavor_to_service); +bool +gss_pseudoflavor_to_datatouch(struct gss_api_mech *gm, u32 pseudoflavor) +{ + int i; + + for (i = 0; i < gm->gm_pf_num; i++) { + if (gm->gm_pfs[i].pseudoflavor == pseudoflavor) + return gm->gm_pfs[i].datatouch; + } + return false; +} + char * gss_service_to_auth_domain_name(struct gss_api_mech *gm, u32 service) { diff --git a/net/sunrpc/auth_null.c b/net/sunrpc/auth_null.c index 8d9eb4d5ddd8..4d17376b2acb 100644 --- a/net/sunrpc/auth_null.c +++ b/net/sunrpc/auth_null.c @@ -115,6 +115,7 @@ static struct rpc_auth null_auth = { .au_cslack = NUL_CALLSLACK, .au_rslack = NUL_REPLYSLACK, + .au_flags = RPCAUTH_AUTH_NO_CRKEY_TIMEOUT, .au_ops = &authnull_ops, .au_flavor = RPC_AUTH_NULL, .au_count = ATOMIC_INIT(0), diff --git a/net/sunrpc/auth_unix.c b/net/sunrpc/auth_unix.c index 9f65452b7cbc..a99278c984e8 100644 --- a/net/sunrpc/auth_unix.c +++ b/net/sunrpc/auth_unix.c @@ -228,6 +228,7 @@ static struct rpc_auth unix_auth = { .au_cslack = UNX_CALLSLACK, .au_rslack = NUL_REPLYSLACK, + .au_flags = RPCAUTH_AUTH_NO_CRKEY_TIMEOUT, .au_ops = &authunix_ops, .au_flavor = RPC_AUTH_UNIX, .au_count = ATOMIC_INIT(0), diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c index 2808d550d273..cb49898a5a58 100644 --- a/net/sunrpc/clnt.c +++ b/net/sunrpc/clnt.c @@ -2577,7 +2577,7 @@ static void rpc_cb_add_xprt_release(void *calldata) kfree(data); } -const static struct rpc_call_ops rpc_cb_add_xprt_call_ops = { +static const struct rpc_call_ops rpc_cb_add_xprt_call_ops = { .rpc_call_done = rpc_cb_add_xprt_done, .rpc_release = rpc_cb_add_xprt_release, }; diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c index fcfd48d263f6..9ae588511aaf 100644 --- a/net/sunrpc/sched.c +++ b/net/sunrpc/sched.c @@ -54,7 +54,8 @@ static struct rpc_wait_queue delay_queue; /* * rpciod-related stuff */ -struct workqueue_struct *rpciod_workqueue; +struct workqueue_struct *rpciod_workqueue __read_mostly; +struct workqueue_struct *xprtiod_workqueue __read_mostly; /* * Disable the timer for a given RPC task. Should be called with @@ -329,7 +330,8 @@ EXPORT_SYMBOL_GPL(__rpc_wait_for_completion_task); * lockless RPC_IS_QUEUED() test) before we've had a chance to test * the RPC_TASK_RUNNING flag. */ -static void rpc_make_runnable(struct rpc_task *task) +static void rpc_make_runnable(struct workqueue_struct *wq, + struct rpc_task *task) { bool need_wakeup = !rpc_test_and_set_running(task); @@ -338,7 +340,7 @@ static void rpc_make_runnable(struct rpc_task *task) return; if (RPC_IS_ASYNC(task)) { INIT_WORK(&task->u.tk_work, rpc_async_schedule); - queue_work(rpciod_workqueue, &task->u.tk_work); + queue_work(wq, &task->u.tk_work); } else wake_up_bit(&task->tk_runstate, RPC_TASK_QUEUED); } @@ -407,13 +409,16 @@ void rpc_sleep_on_priority(struct rpc_wait_queue *q, struct rpc_task *task, EXPORT_SYMBOL_GPL(rpc_sleep_on_priority); /** - * __rpc_do_wake_up_task - wake up a single rpc_task + * __rpc_do_wake_up_task_on_wq - wake up a single rpc_task + * @wq: workqueue on which to run task * @queue: wait queue * @task: task to be woken up * * Caller must hold queue->lock, and have cleared the task queued flag. */ -static void __rpc_do_wake_up_task(struct rpc_wait_queue *queue, struct rpc_task *task) +static void __rpc_do_wake_up_task_on_wq(struct workqueue_struct *wq, + struct rpc_wait_queue *queue, + struct rpc_task *task) { dprintk("RPC: %5u __rpc_wake_up_task (now %lu)\n", task->tk_pid, jiffies); @@ -428,7 +433,7 @@ static void __rpc_do_wake_up_task(struct rpc_wait_queue *queue, struct rpc_task __rpc_remove_wait_queue(queue, task); - rpc_make_runnable(task); + rpc_make_runnable(wq, task); dprintk("RPC: __rpc_wake_up_task done\n"); } @@ -436,16 +441,25 @@ static void __rpc_do_wake_up_task(struct rpc_wait_queue *queue, struct rpc_task /* * Wake up a queued task while the queue lock is being held */ -static void rpc_wake_up_task_queue_locked(struct rpc_wait_queue *queue, struct rpc_task *task) +static void rpc_wake_up_task_on_wq_queue_locked(struct workqueue_struct *wq, + struct rpc_wait_queue *queue, struct rpc_task *task) { if (RPC_IS_QUEUED(task)) { smp_rmb(); if (task->tk_waitqueue == queue) - __rpc_do_wake_up_task(queue, task); + __rpc_do_wake_up_task_on_wq(wq, queue, task); } } /* + * Wake up a queued task while the queue lock is being held + */ +static void rpc_wake_up_task_queue_locked(struct rpc_wait_queue *queue, struct rpc_task *task) +{ + rpc_wake_up_task_on_wq_queue_locked(rpciod_workqueue, queue, task); +} + +/* * Wake up a task on a specific queue */ void rpc_wake_up_queued_task(struct rpc_wait_queue *queue, struct rpc_task *task) @@ -518,7 +532,8 @@ static struct rpc_task *__rpc_find_next_queued(struct rpc_wait_queue *queue) /* * Wake up the first task on the wait queue. */ -struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *queue, +struct rpc_task *rpc_wake_up_first_on_wq(struct workqueue_struct *wq, + struct rpc_wait_queue *queue, bool (*func)(struct rpc_task *, void *), void *data) { struct rpc_task *task = NULL; @@ -529,7 +544,7 @@ struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *queue, task = __rpc_find_next_queued(queue); if (task != NULL) { if (func(task, data)) - rpc_wake_up_task_queue_locked(queue, task); + rpc_wake_up_task_on_wq_queue_locked(wq, queue, task); else task = NULL; } @@ -537,6 +552,15 @@ struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *queue, return task; } + +/* + * Wake up the first task on the wait queue. + */ +struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *queue, + bool (*func)(struct rpc_task *, void *), void *data) +{ + return rpc_wake_up_first_on_wq(rpciod_workqueue, queue, func, data); +} EXPORT_SYMBOL_GPL(rpc_wake_up_first); static bool rpc_wake_up_next_func(struct rpc_task *task, void *data) @@ -814,7 +838,7 @@ void rpc_execute(struct rpc_task *task) bool is_async = RPC_IS_ASYNC(task); rpc_set_active(task); - rpc_make_runnable(task); + rpc_make_runnable(rpciod_workqueue, task); if (!is_async) __rpc_execute(task); } @@ -1071,10 +1095,22 @@ static int rpciod_start(void) * Create the rpciod thread and wait for it to start. */ dprintk("RPC: creating workqueue rpciod\n"); - /* Note: highpri because network receive is latency sensitive */ - wq = alloc_workqueue("rpciod", WQ_MEM_RECLAIM | WQ_HIGHPRI, 0); + wq = alloc_workqueue("rpciod", WQ_MEM_RECLAIM, 0); + if (!wq) + goto out_failed; rpciod_workqueue = wq; - return rpciod_workqueue != NULL; + /* Note: highpri because network receive is latency sensitive */ + wq = alloc_workqueue("xprtiod", WQ_MEM_RECLAIM | WQ_HIGHPRI, 0); + if (!wq) + goto free_rpciod; + xprtiod_workqueue = wq; + return 1; +free_rpciod: + wq = rpciod_workqueue; + rpciod_workqueue = NULL; + destroy_workqueue(wq); +out_failed: + return 0; } static void rpciod_stop(void) @@ -1088,6 +1124,9 @@ static void rpciod_stop(void) wq = rpciod_workqueue; rpciod_workqueue = NULL; destroy_workqueue(wq); + wq = xprtiod_workqueue; + xprtiod_workqueue = NULL; + destroy_workqueue(wq); } void diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c index cc9852897395..c5b0cb4f4056 100644 --- a/net/sunrpc/svc.c +++ b/net/sunrpc/svc.c @@ -1188,11 +1188,17 @@ svc_process_common(struct svc_rqst *rqstp, struct kvec *argv, struct kvec *resv) *statp = procp->pc_func(rqstp, rqstp->rq_argp, rqstp->rq_resp); /* Encode reply */ - if (test_bit(RQ_DROPME, &rqstp->rq_flags)) { + if (*statp == rpc_drop_reply || + test_bit(RQ_DROPME, &rqstp->rq_flags)) { if (procp->pc_release) procp->pc_release(rqstp, NULL, rqstp->rq_resp); goto dropit; } + if (*statp == rpc_autherr_badcred) { + if (procp->pc_release) + procp->pc_release(rqstp, NULL, rqstp->rq_resp); + goto err_bad_auth; + } if (*statp == rpc_success && (xdr = procp->pc_encode) && !xdr(rqstp, resv->iov_base+resv->iov_len, rqstp->rq_resp)) { diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index 216a1385718a..8313960cac52 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -220,7 +220,7 @@ static void xprt_clear_locked(struct rpc_xprt *xprt) clear_bit(XPRT_LOCKED, &xprt->state); smp_mb__after_atomic(); } else - queue_work(rpciod_workqueue, &xprt->task_cleanup); + queue_work(xprtiod_workqueue, &xprt->task_cleanup); } /* @@ -295,7 +295,8 @@ static void __xprt_lock_write_next(struct rpc_xprt *xprt) if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) return; - if (rpc_wake_up_first(&xprt->sending, __xprt_lock_write_func, xprt)) + if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending, + __xprt_lock_write_func, xprt)) return; xprt_clear_locked(xprt); } @@ -324,7 +325,8 @@ static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt) return; if (RPCXPRT_CONGESTED(xprt)) goto out_unlock; - if (rpc_wake_up_first(&xprt->sending, __xprt_lock_write_cong_func, xprt)) + if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending, + __xprt_lock_write_cong_func, xprt)) return; out_unlock: xprt_clear_locked(xprt); @@ -645,7 +647,7 @@ void xprt_force_disconnect(struct rpc_xprt *xprt) set_bit(XPRT_CLOSE_WAIT, &xprt->state); /* Try to schedule an autoclose RPC call */ if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0) - queue_work(rpciod_workqueue, &xprt->task_cleanup); + queue_work(xprtiod_workqueue, &xprt->task_cleanup); xprt_wake_pending_tasks(xprt, -EAGAIN); spin_unlock_bh(&xprt->transport_lock); } @@ -672,7 +674,7 @@ void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie) set_bit(XPRT_CLOSE_WAIT, &xprt->state); /* Try to schedule an autoclose RPC call */ if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0) - queue_work(rpciod_workqueue, &xprt->task_cleanup); + queue_work(xprtiod_workqueue, &xprt->task_cleanup); xprt_wake_pending_tasks(xprt, -EAGAIN); out: spin_unlock_bh(&xprt->transport_lock); @@ -689,7 +691,7 @@ xprt_init_autodisconnect(unsigned long data) if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) goto out_abort; spin_unlock(&xprt->transport_lock); - queue_work(rpciod_workqueue, &xprt->task_cleanup); + queue_work(xprtiod_workqueue, &xprt->task_cleanup); return; out_abort: spin_unlock(&xprt->transport_lock); diff --git a/net/sunrpc/xprtmultipath.c b/net/sunrpc/xprtmultipath.c index e7fd76975d86..66c9d63f4797 100644 --- a/net/sunrpc/xprtmultipath.c +++ b/net/sunrpc/xprtmultipath.c @@ -271,14 +271,12 @@ struct rpc_xprt *xprt_iter_next_entry_multiple(struct rpc_xprt_iter *xpi, xprt_switch_find_xprt_t find_next) { struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch); - struct list_head *head; if (xps == NULL) return NULL; - head = &xps->xps_xprt_list; - if (xps->xps_nxprts < 2) - return xprt_switch_find_first_entry(head); - return xprt_switch_set_next_cursor(head, &xpi->xpi_cursor, find_next); + return xprt_switch_set_next_cursor(&xps->xps_xprt_list, + &xpi->xpi_cursor, + find_next); } static diff --git a/net/sunrpc/xprtrdma/Makefile b/net/sunrpc/xprtrdma/Makefile index dc9f3b513a05..ef19fa42c50f 100644 --- a/net/sunrpc/xprtrdma/Makefile +++ b/net/sunrpc/xprtrdma/Makefile @@ -1,7 +1,7 @@ obj-$(CONFIG_SUNRPC_XPRT_RDMA) += rpcrdma.o rpcrdma-y := transport.o rpc_rdma.o verbs.o \ - fmr_ops.o frwr_ops.o physical_ops.o \ + fmr_ops.o frwr_ops.o \ svc_rdma.o svc_rdma_backchannel.o svc_rdma_transport.o \ svc_rdma_marshal.o svc_rdma_sendto.o svc_rdma_recvfrom.o \ module.o diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c index 6326ebe8b595..21cb3b150b37 100644 --- a/net/sunrpc/xprtrdma/fmr_ops.c +++ b/net/sunrpc/xprtrdma/fmr_ops.c @@ -19,13 +19,6 @@ * verb (fmr_op_unmap). */ -/* Transport recovery - * - * After a transport reconnect, fmr_op_map re-uses the MR already - * allocated for the RPC, but generates a fresh rkey then maps the - * MR again. This process is synchronous. - */ - #include "xprt_rdma.h" #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) @@ -35,62 +28,132 @@ /* Maximum scatter/gather per FMR */ #define RPCRDMA_MAX_FMR_SGES (64) -static struct workqueue_struct *fmr_recovery_wq; - -#define FMR_RECOVERY_WQ_FLAGS (WQ_UNBOUND) +/* Access mode of externally registered pages */ +enum { + RPCRDMA_FMR_ACCESS_FLAGS = IB_ACCESS_REMOTE_WRITE | + IB_ACCESS_REMOTE_READ, +}; -int -fmr_alloc_recovery_wq(void) +bool +fmr_is_supported(struct rpcrdma_ia *ia) { - fmr_recovery_wq = alloc_workqueue("fmr_recovery", WQ_UNBOUND, 0); - return !fmr_recovery_wq ? -ENOMEM : 0; + if (!ia->ri_device->alloc_fmr) { + pr_info("rpcrdma: 'fmr' mode is not supported by device %s\n", + ia->ri_device->name); + return false; + } + return true; } -void -fmr_destroy_recovery_wq(void) +static int +fmr_op_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mw *mw) { - struct workqueue_struct *wq; + static struct ib_fmr_attr fmr_attr = { + .max_pages = RPCRDMA_MAX_FMR_SGES, + .max_maps = 1, + .page_shift = PAGE_SHIFT + }; - if (!fmr_recovery_wq) - return; + mw->fmr.fm_physaddrs = kcalloc(RPCRDMA_MAX_FMR_SGES, + sizeof(u64), GFP_KERNEL); + if (!mw->fmr.fm_physaddrs) + goto out_free; - wq = fmr_recovery_wq; - fmr_recovery_wq = NULL; - destroy_workqueue(wq); + mw->mw_sg = kcalloc(RPCRDMA_MAX_FMR_SGES, + sizeof(*mw->mw_sg), GFP_KERNEL); + if (!mw->mw_sg) + goto out_free; + + sg_init_table(mw->mw_sg, RPCRDMA_MAX_FMR_SGES); + + mw->fmr.fm_mr = ib_alloc_fmr(ia->ri_pd, RPCRDMA_FMR_ACCESS_FLAGS, + &fmr_attr); + if (IS_ERR(mw->fmr.fm_mr)) + goto out_fmr_err; + + return 0; + +out_fmr_err: + dprintk("RPC: %s: ib_alloc_fmr returned %ld\n", __func__, + PTR_ERR(mw->fmr.fm_mr)); + +out_free: + kfree(mw->mw_sg); + kfree(mw->fmr.fm_physaddrs); + return -ENOMEM; } static int __fmr_unmap(struct rpcrdma_mw *mw) { LIST_HEAD(l); + int rc; - list_add(&mw->fmr.fmr->list, &l); - return ib_unmap_fmr(&l); + list_add(&mw->fmr.fm_mr->list, &l); + rc = ib_unmap_fmr(&l); + list_del_init(&mw->fmr.fm_mr->list); + return rc; } -/* Deferred reset of a single FMR. Generate a fresh rkey by - * replacing the MR. There's no recovery if this fails. - */ static void -__fmr_recovery_worker(struct work_struct *work) +fmr_op_release_mr(struct rpcrdma_mw *r) { - struct rpcrdma_mw *mw = container_of(work, struct rpcrdma_mw, - mw_work); - struct rpcrdma_xprt *r_xprt = mw->mw_xprt; + LIST_HEAD(unmap_list); + int rc; - __fmr_unmap(mw); - rpcrdma_put_mw(r_xprt, mw); - return; + /* Ensure MW is not on any rl_registered list */ + if (!list_empty(&r->mw_list)) + list_del(&r->mw_list); + + kfree(r->fmr.fm_physaddrs); + kfree(r->mw_sg); + + /* In case this one was left mapped, try to unmap it + * to prevent dealloc_fmr from failing with EBUSY + */ + rc = __fmr_unmap(r); + if (rc) + pr_err("rpcrdma: final ib_unmap_fmr for %p failed %i\n", + r, rc); + + rc = ib_dealloc_fmr(r->fmr.fm_mr); + if (rc) + pr_err("rpcrdma: final ib_dealloc_fmr for %p returned %i\n", + r, rc); + + kfree(r); } -/* A broken MR was discovered in a context that can't sleep. - * Defer recovery to the recovery worker. +/* Reset of a single FMR. */ static void -__fmr_queue_recovery(struct rpcrdma_mw *mw) +fmr_op_recover_mr(struct rpcrdma_mw *mw) { - INIT_WORK(&mw->mw_work, __fmr_recovery_worker); - queue_work(fmr_recovery_wq, &mw->mw_work); + struct rpcrdma_xprt *r_xprt = mw->mw_xprt; + int rc; + + /* ORDER: invalidate first */ + rc = __fmr_unmap(mw); + + /* ORDER: then DMA unmap */ + ib_dma_unmap_sg(r_xprt->rx_ia.ri_device, + mw->mw_sg, mw->mw_nents, mw->mw_dir); + if (rc) + goto out_release; + + rpcrdma_put_mw(r_xprt, mw); + r_xprt->rx_stats.mrs_recovered++; + return; + +out_release: + pr_err("rpcrdma: FMR reset failed (%d), %p released\n", rc, mw); + r_xprt->rx_stats.mrs_orphaned++; + + spin_lock(&r_xprt->rx_buf.rb_mwlock); + list_del(&mw->mw_all); + spin_unlock(&r_xprt->rx_buf.rb_mwlock); + + fmr_op_release_mr(mw); } static int @@ -112,86 +175,21 @@ fmr_op_maxpages(struct rpcrdma_xprt *r_xprt) RPCRDMA_MAX_HDR_SEGS * RPCRDMA_MAX_FMR_SGES); } -static int -fmr_op_init(struct rpcrdma_xprt *r_xprt) -{ - struct rpcrdma_buffer *buf = &r_xprt->rx_buf; - int mr_access_flags = IB_ACCESS_REMOTE_WRITE | IB_ACCESS_REMOTE_READ; - struct ib_fmr_attr fmr_attr = { - .max_pages = RPCRDMA_MAX_FMR_SGES, - .max_maps = 1, - .page_shift = PAGE_SHIFT - }; - struct ib_pd *pd = r_xprt->rx_ia.ri_pd; - struct rpcrdma_mw *r; - int i, rc; - - spin_lock_init(&buf->rb_mwlock); - INIT_LIST_HEAD(&buf->rb_mws); - INIT_LIST_HEAD(&buf->rb_all); - - i = max_t(int, RPCRDMA_MAX_DATA_SEGS / RPCRDMA_MAX_FMR_SGES, 1); - i += 2; /* head + tail */ - i *= buf->rb_max_requests; /* one set for each RPC slot */ - dprintk("RPC: %s: initalizing %d FMRs\n", __func__, i); - - rc = -ENOMEM; - while (i--) { - r = kzalloc(sizeof(*r), GFP_KERNEL); - if (!r) - goto out; - - r->fmr.physaddrs = kmalloc(RPCRDMA_MAX_FMR_SGES * - sizeof(u64), GFP_KERNEL); - if (!r->fmr.physaddrs) - goto out_free; - - r->fmr.fmr = ib_alloc_fmr(pd, mr_access_flags, &fmr_attr); - if (IS_ERR(r->fmr.fmr)) - goto out_fmr_err; - - r->mw_xprt = r_xprt; - list_add(&r->mw_list, &buf->rb_mws); - list_add(&r->mw_all, &buf->rb_all); - } - return 0; - -out_fmr_err: - rc = PTR_ERR(r->fmr.fmr); - dprintk("RPC: %s: ib_alloc_fmr status %i\n", __func__, rc); - kfree(r->fmr.physaddrs); -out_free: - kfree(r); -out: - return rc; -} - /* Use the ib_map_phys_fmr() verb to register a memory region * for remote access via RDMA READ or RDMA WRITE. */ static int fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, - int nsegs, bool writing) + int nsegs, bool writing, struct rpcrdma_mw **out) { - struct rpcrdma_ia *ia = &r_xprt->rx_ia; - struct ib_device *device = ia->ri_device; - enum dma_data_direction direction = rpcrdma_data_dir(writing); struct rpcrdma_mr_seg *seg1 = seg; int len, pageoff, i, rc; struct rpcrdma_mw *mw; + u64 *dma_pages; - mw = seg1->rl_mw; - seg1->rl_mw = NULL; - if (!mw) { - mw = rpcrdma_get_mw(r_xprt); - if (!mw) - return -ENOMEM; - } else { - /* this is a retransmit; generate a fresh rkey */ - rc = __fmr_unmap(mw); - if (rc) - return rc; - } + mw = rpcrdma_get_mw(r_xprt); + if (!mw) + return -ENOBUFS; pageoff = offset_in_page(seg1->mr_offset); seg1->mr_offset -= pageoff; /* start of page */ @@ -200,8 +198,14 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, if (nsegs > RPCRDMA_MAX_FMR_SGES) nsegs = RPCRDMA_MAX_FMR_SGES; for (i = 0; i < nsegs;) { - rpcrdma_map_one(device, seg, direction); - mw->fmr.physaddrs[i] = seg->mr_dma; + if (seg->mr_page) + sg_set_page(&mw->mw_sg[i], + seg->mr_page, + seg->mr_len, + offset_in_page(seg->mr_offset)); + else + sg_set_buf(&mw->mw_sg[i], seg->mr_offset, + seg->mr_len); len += seg->mr_len; ++seg; ++i; @@ -210,49 +214,54 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len)) break; } - - rc = ib_map_phys_fmr(mw->fmr.fmr, mw->fmr.physaddrs, - i, seg1->mr_dma); + mw->mw_nents = i; + mw->mw_dir = rpcrdma_data_dir(writing); + if (i == 0) + goto out_dmamap_err; + + if (!ib_dma_map_sg(r_xprt->rx_ia.ri_device, + mw->mw_sg, mw->mw_nents, mw->mw_dir)) + goto out_dmamap_err; + + for (i = 0, dma_pages = mw->fmr.fm_physaddrs; i < mw->mw_nents; i++) + dma_pages[i] = sg_dma_address(&mw->mw_sg[i]); + rc = ib_map_phys_fmr(mw->fmr.fm_mr, dma_pages, mw->mw_nents, + dma_pages[0]); if (rc) goto out_maperr; - seg1->rl_mw = mw; - seg1->mr_rkey = mw->fmr.fmr->rkey; - seg1->mr_base = seg1->mr_dma + pageoff; - seg1->mr_nsegs = i; - seg1->mr_len = len; - return i; + mw->mw_handle = mw->fmr.fm_mr->rkey; + mw->mw_length = len; + mw->mw_offset = dma_pages[0] + pageoff; -out_maperr: - dprintk("RPC: %s: ib_map_phys_fmr %u@0x%llx+%i (%d) status %i\n", - __func__, len, (unsigned long long)seg1->mr_dma, - pageoff, i, rc); - while (i--) - rpcrdma_unmap_one(device, --seg); - return rc; -} + *out = mw; + return mw->mw_nents; -static void -__fmr_dma_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg) -{ - struct ib_device *device = r_xprt->rx_ia.ri_device; - int nsegs = seg->mr_nsegs; +out_dmamap_err: + pr_err("rpcrdma: failed to dma map sg %p sg_nents %u\n", + mw->mw_sg, mw->mw_nents); + rpcrdma_defer_mr_recovery(mw); + return -EIO; - while (nsegs--) - rpcrdma_unmap_one(device, seg++); +out_maperr: + pr_err("rpcrdma: ib_map_phys_fmr %u@0x%llx+%i (%d) status %i\n", + len, (unsigned long long)dma_pages[0], + pageoff, mw->mw_nents, rc); + rpcrdma_defer_mr_recovery(mw); + return -EIO; } /* Invalidate all memory regions that were registered for "req". * * Sleeps until it is safe for the host CPU to access the * previously mapped memory regions. + * + * Caller ensures that req->rl_registered is not empty. */ static void fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) { - struct rpcrdma_mr_seg *seg; - unsigned int i, nchunks; - struct rpcrdma_mw *mw; + struct rpcrdma_mw *mw, *tmp; LIST_HEAD(unmap_list); int rc; @@ -261,90 +270,54 @@ fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) /* ORDER: Invalidate all of the req's MRs first * * ib_unmap_fmr() is slow, so use a single call instead - * of one call per mapped MR. + * of one call per mapped FMR. */ - for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) { - seg = &req->rl_segments[i]; - mw = seg->rl_mw; - - list_add(&mw->fmr.fmr->list, &unmap_list); - - i += seg->mr_nsegs; - } + list_for_each_entry(mw, &req->rl_registered, mw_list) + list_add_tail(&mw->fmr.fm_mr->list, &unmap_list); rc = ib_unmap_fmr(&unmap_list); if (rc) - pr_warn("%s: ib_unmap_fmr failed (%i)\n", __func__, rc); + goto out_reset; /* ORDER: Now DMA unmap all of the req's MRs, and return * them to the free MW list. */ - for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) { - seg = &req->rl_segments[i]; + list_for_each_entry_safe(mw, tmp, &req->rl_registered, mw_list) { + list_del_init(&mw->mw_list); + list_del_init(&mw->fmr.fm_mr->list); + ib_dma_unmap_sg(r_xprt->rx_ia.ri_device, + mw->mw_sg, mw->mw_nents, mw->mw_dir); + rpcrdma_put_mw(r_xprt, mw); + } - __fmr_dma_unmap(r_xprt, seg); - rpcrdma_put_mw(r_xprt, seg->rl_mw); + return; - i += seg->mr_nsegs; - seg->mr_nsegs = 0; - seg->rl_mw = NULL; - } +out_reset: + pr_err("rpcrdma: ib_unmap_fmr failed (%i)\n", rc); - req->rl_nchunks = 0; + list_for_each_entry_safe(mw, tmp, &req->rl_registered, mw_list) { + list_del_init(&mw->fmr.fm_mr->list); + fmr_op_recover_mr(mw); + } } /* Use a slow, safe mechanism to invalidate all memory regions * that were registered for "req". - * - * In the asynchronous case, DMA unmapping occurs first here - * because the rpcrdma_mr_seg is released immediately after this - * call. It's contents won't be available in __fmr_dma_unmap later. - * FIXME. */ static void fmr_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, bool sync) { - struct rpcrdma_mr_seg *seg; struct rpcrdma_mw *mw; - unsigned int i; - - for (i = 0; req->rl_nchunks; req->rl_nchunks--) { - seg = &req->rl_segments[i]; - mw = seg->rl_mw; - - if (sync) { - /* ORDER */ - __fmr_unmap(mw); - __fmr_dma_unmap(r_xprt, seg); - rpcrdma_put_mw(r_xprt, mw); - } else { - __fmr_dma_unmap(r_xprt, seg); - __fmr_queue_recovery(mw); - } - - i += seg->mr_nsegs; - seg->mr_nsegs = 0; - seg->rl_mw = NULL; - } -} - -static void -fmr_op_destroy(struct rpcrdma_buffer *buf) -{ - struct rpcrdma_mw *r; - int rc; - - while (!list_empty(&buf->rb_all)) { - r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all); - list_del(&r->mw_all); - kfree(r->fmr.physaddrs); - rc = ib_dealloc_fmr(r->fmr.fmr); - if (rc) - dprintk("RPC: %s: ib_dealloc_fmr failed %i\n", - __func__, rc); + while (!list_empty(&req->rl_registered)) { + mw = list_first_entry(&req->rl_registered, + struct rpcrdma_mw, mw_list); + list_del_init(&mw->mw_list); - kfree(r); + if (sync) + fmr_op_recover_mr(mw); + else + rpcrdma_defer_mr_recovery(mw); } } @@ -352,9 +325,10 @@ const struct rpcrdma_memreg_ops rpcrdma_fmr_memreg_ops = { .ro_map = fmr_op_map, .ro_unmap_sync = fmr_op_unmap_sync, .ro_unmap_safe = fmr_op_unmap_safe, + .ro_recover_mr = fmr_op_recover_mr, .ro_open = fmr_op_open, .ro_maxpages = fmr_op_maxpages, - .ro_init = fmr_op_init, - .ro_destroy = fmr_op_destroy, + .ro_init_mr = fmr_op_init_mr, + .ro_release_mr = fmr_op_release_mr, .ro_displayname = "fmr", }; diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c index c0947544babe..892b5e1d9b09 100644 --- a/net/sunrpc/xprtrdma/frwr_ops.c +++ b/net/sunrpc/xprtrdma/frwr_ops.c @@ -73,29 +73,71 @@ # define RPCDBG_FACILITY RPCDBG_TRANS #endif -static struct workqueue_struct *frwr_recovery_wq; - -#define FRWR_RECOVERY_WQ_FLAGS (WQ_UNBOUND | WQ_MEM_RECLAIM) +bool +frwr_is_supported(struct rpcrdma_ia *ia) +{ + struct ib_device_attr *attrs = &ia->ri_device->attrs; + + if (!(attrs->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS)) + goto out_not_supported; + if (attrs->max_fast_reg_page_list_len == 0) + goto out_not_supported; + return true; + +out_not_supported: + pr_info("rpcrdma: 'frwr' mode is not supported by device %s\n", + ia->ri_device->name); + return false; +} -int -frwr_alloc_recovery_wq(void) +static int +frwr_op_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mw *r) { - frwr_recovery_wq = alloc_workqueue("frwr_recovery", - FRWR_RECOVERY_WQ_FLAGS, 0); - return !frwr_recovery_wq ? -ENOMEM : 0; + unsigned int depth = ia->ri_max_frmr_depth; + struct rpcrdma_frmr *f = &r->frmr; + int rc; + + f->fr_mr = ib_alloc_mr(ia->ri_pd, IB_MR_TYPE_MEM_REG, depth); + if (IS_ERR(f->fr_mr)) + goto out_mr_err; + + r->mw_sg = kcalloc(depth, sizeof(*r->mw_sg), GFP_KERNEL); + if (!r->mw_sg) + goto out_list_err; + + sg_init_table(r->mw_sg, depth); + init_completion(&f->fr_linv_done); + return 0; + +out_mr_err: + rc = PTR_ERR(f->fr_mr); + dprintk("RPC: %s: ib_alloc_mr status %i\n", + __func__, rc); + return rc; + +out_list_err: + rc = -ENOMEM; + dprintk("RPC: %s: sg allocation failure\n", + __func__); + ib_dereg_mr(f->fr_mr); + return rc; } -void -frwr_destroy_recovery_wq(void) +static void +frwr_op_release_mr(struct rpcrdma_mw *r) { - struct workqueue_struct *wq; + int rc; - if (!frwr_recovery_wq) - return; + /* Ensure MW is not on any rl_registered list */ + if (!list_empty(&r->mw_list)) + list_del(&r->mw_list); - wq = frwr_recovery_wq; - frwr_recovery_wq = NULL; - destroy_workqueue(wq); + rc = ib_dereg_mr(r->frmr.fr_mr); + if (rc) + pr_err("rpcrdma: final ib_dereg_mr for %p returned %i\n", + r, rc); + kfree(r->mw_sg); + kfree(r); } static int @@ -124,93 +166,37 @@ __frwr_reset_mr(struct rpcrdma_ia *ia, struct rpcrdma_mw *r) return 0; } -static void -__frwr_reset_and_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw) -{ - struct rpcrdma_ia *ia = &r_xprt->rx_ia; - struct rpcrdma_frmr *f = &mw->frmr; - int rc; - - rc = __frwr_reset_mr(ia, mw); - ib_dma_unmap_sg(ia->ri_device, f->fr_sg, f->fr_nents, f->fr_dir); - if (rc) - return; - - rpcrdma_put_mw(r_xprt, mw); -} - -/* Deferred reset of a single FRMR. Generate a fresh rkey by - * replacing the MR. +/* Reset of a single FRMR. Generate a fresh rkey by replacing the MR. * * There's no recovery if this fails. The FRMR is abandoned, but * remains in rb_all. It will be cleaned up when the transport is * destroyed. */ static void -__frwr_recovery_worker(struct work_struct *work) -{ - struct rpcrdma_mw *r = container_of(work, struct rpcrdma_mw, - mw_work); - - __frwr_reset_and_unmap(r->mw_xprt, r); - return; -} - -/* A broken MR was discovered in a context that can't sleep. - * Defer recovery to the recovery worker. - */ -static void -__frwr_queue_recovery(struct rpcrdma_mw *r) -{ - INIT_WORK(&r->mw_work, __frwr_recovery_worker); - queue_work(frwr_recovery_wq, &r->mw_work); -} - -static int -__frwr_init(struct rpcrdma_mw *r, struct ib_pd *pd, struct ib_device *device, - unsigned int depth) +frwr_op_recover_mr(struct rpcrdma_mw *mw) { - struct rpcrdma_frmr *f = &r->frmr; + struct rpcrdma_xprt *r_xprt = mw->mw_xprt; + struct rpcrdma_ia *ia = &r_xprt->rx_ia; int rc; - f->fr_mr = ib_alloc_mr(pd, IB_MR_TYPE_MEM_REG, depth); - if (IS_ERR(f->fr_mr)) - goto out_mr_err; - - f->fr_sg = kcalloc(depth, sizeof(*f->fr_sg), GFP_KERNEL); - if (!f->fr_sg) - goto out_list_err; - - sg_init_table(f->fr_sg, depth); - - init_completion(&f->fr_linv_done); - - return 0; + rc = __frwr_reset_mr(ia, mw); + ib_dma_unmap_sg(ia->ri_device, mw->mw_sg, mw->mw_nents, mw->mw_dir); + if (rc) + goto out_release; -out_mr_err: - rc = PTR_ERR(f->fr_mr); - dprintk("RPC: %s: ib_alloc_mr status %i\n", - __func__, rc); - return rc; + rpcrdma_put_mw(r_xprt, mw); + r_xprt->rx_stats.mrs_recovered++; + return; -out_list_err: - rc = -ENOMEM; - dprintk("RPC: %s: sg allocation failure\n", - __func__); - ib_dereg_mr(f->fr_mr); - return rc; -} +out_release: + pr_err("rpcrdma: FRMR reset failed %d, %p release\n", rc, mw); + r_xprt->rx_stats.mrs_orphaned++; -static void -__frwr_release(struct rpcrdma_mw *r) -{ - int rc; + spin_lock(&r_xprt->rx_buf.rb_mwlock); + list_del(&mw->mw_all); + spin_unlock(&r_xprt->rx_buf.rb_mwlock); - rc = ib_dereg_mr(r->frmr.fr_mr); - if (rc) - dprintk("RPC: %s: ib_dereg_mr status %i\n", - __func__, rc); - kfree(r->frmr.fr_sg); + frwr_op_release_mr(mw); } static int @@ -346,57 +332,14 @@ frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc) complete_all(&frmr->fr_linv_done); } -static int -frwr_op_init(struct rpcrdma_xprt *r_xprt) -{ - struct rpcrdma_buffer *buf = &r_xprt->rx_buf; - struct ib_device *device = r_xprt->rx_ia.ri_device; - unsigned int depth = r_xprt->rx_ia.ri_max_frmr_depth; - struct ib_pd *pd = r_xprt->rx_ia.ri_pd; - int i; - - spin_lock_init(&buf->rb_mwlock); - INIT_LIST_HEAD(&buf->rb_mws); - INIT_LIST_HEAD(&buf->rb_all); - - i = max_t(int, RPCRDMA_MAX_DATA_SEGS / depth, 1); - i += 2; /* head + tail */ - i *= buf->rb_max_requests; /* one set for each RPC slot */ - dprintk("RPC: %s: initalizing %d FRMRs\n", __func__, i); - - while (i--) { - struct rpcrdma_mw *r; - int rc; - - r = kzalloc(sizeof(*r), GFP_KERNEL); - if (!r) - return -ENOMEM; - - rc = __frwr_init(r, pd, device, depth); - if (rc) { - kfree(r); - return rc; - } - - r->mw_xprt = r_xprt; - list_add(&r->mw_list, &buf->rb_mws); - list_add(&r->mw_all, &buf->rb_all); - } - - return 0; -} - -/* Post a FAST_REG Work Request to register a memory region +/* Post a REG_MR Work Request to register a memory region * for remote access via RDMA READ or RDMA WRITE. */ static int frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, - int nsegs, bool writing) + int nsegs, bool writing, struct rpcrdma_mw **out) { struct rpcrdma_ia *ia = &r_xprt->rx_ia; - struct ib_device *device = ia->ri_device; - enum dma_data_direction direction = rpcrdma_data_dir(writing); - struct rpcrdma_mr_seg *seg1 = seg; struct rpcrdma_mw *mw; struct rpcrdma_frmr *frmr; struct ib_mr *mr; @@ -405,14 +348,13 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, int rc, i, n, dma_nents; u8 key; - mw = seg1->rl_mw; - seg1->rl_mw = NULL; + mw = NULL; do { if (mw) - __frwr_queue_recovery(mw); + rpcrdma_defer_mr_recovery(mw); mw = rpcrdma_get_mw(r_xprt); if (!mw) - return -ENOMEM; + return -ENOBUFS; } while (mw->frmr.fr_state != FRMR_IS_INVALID); frmr = &mw->frmr; frmr->fr_state = FRMR_IS_VALID; @@ -421,15 +363,14 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, if (nsegs > ia->ri_max_frmr_depth) nsegs = ia->ri_max_frmr_depth; - for (i = 0; i < nsegs;) { if (seg->mr_page) - sg_set_page(&frmr->fr_sg[i], + sg_set_page(&mw->mw_sg[i], seg->mr_page, seg->mr_len, offset_in_page(seg->mr_offset)); else - sg_set_buf(&frmr->fr_sg[i], seg->mr_offset, + sg_set_buf(&mw->mw_sg[i], seg->mr_offset, seg->mr_len); ++seg; @@ -440,26 +381,22 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len)) break; } - frmr->fr_nents = i; - frmr->fr_dir = direction; - - dma_nents = ib_dma_map_sg(device, frmr->fr_sg, frmr->fr_nents, direction); - if (!dma_nents) { - pr_err("RPC: %s: failed to dma map sg %p sg_nents %u\n", - __func__, frmr->fr_sg, frmr->fr_nents); - return -ENOMEM; - } + mw->mw_nents = i; + mw->mw_dir = rpcrdma_data_dir(writing); + if (i == 0) + goto out_dmamap_err; - n = ib_map_mr_sg(mr, frmr->fr_sg, frmr->fr_nents, NULL, PAGE_SIZE); - if (unlikely(n != frmr->fr_nents)) { - pr_err("RPC: %s: failed to map mr %p (%u/%u)\n", - __func__, frmr->fr_mr, n, frmr->fr_nents); - rc = n < 0 ? n : -EINVAL; - goto out_senderr; - } + dma_nents = ib_dma_map_sg(ia->ri_device, + mw->mw_sg, mw->mw_nents, mw->mw_dir); + if (!dma_nents) + goto out_dmamap_err; + + n = ib_map_mr_sg(mr, mw->mw_sg, mw->mw_nents, NULL, PAGE_SIZE); + if (unlikely(n != mw->mw_nents)) + goto out_mapmr_err; dprintk("RPC: %s: Using frmr %p to map %u segments (%u bytes)\n", - __func__, mw, frmr->fr_nents, mr->length); + __func__, mw, mw->mw_nents, mr->length); key = (u8)(mr->rkey & 0x000000FF); ib_update_fast_reg_key(mr, ++key); @@ -481,24 +418,34 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, if (rc) goto out_senderr; - seg1->rl_mw = mw; - seg1->mr_rkey = mr->rkey; - seg1->mr_base = mr->iova; - seg1->mr_nsegs = frmr->fr_nents; - seg1->mr_len = mr->length; + mw->mw_handle = mr->rkey; + mw->mw_length = mr->length; + mw->mw_offset = mr->iova; + + *out = mw; + return mw->mw_nents; - return frmr->fr_nents; +out_dmamap_err: + pr_err("rpcrdma: failed to dma map sg %p sg_nents %u\n", + mw->mw_sg, mw->mw_nents); + rpcrdma_defer_mr_recovery(mw); + return -EIO; + +out_mapmr_err: + pr_err("rpcrdma: failed to map mr %p (%u/%u)\n", + frmr->fr_mr, n, mw->mw_nents); + rpcrdma_defer_mr_recovery(mw); + return -EIO; out_senderr: - dprintk("RPC: %s: ib_post_send status %i\n", __func__, rc); - __frwr_queue_recovery(mw); - return rc; + pr_err("rpcrdma: FRMR registration ib_post_send returned %i\n", rc); + rpcrdma_defer_mr_recovery(mw); + return -ENOTCONN; } static struct ib_send_wr * -__frwr_prepare_linv_wr(struct rpcrdma_mr_seg *seg) +__frwr_prepare_linv_wr(struct rpcrdma_mw *mw) { - struct rpcrdma_mw *mw = seg->rl_mw; struct rpcrdma_frmr *f = &mw->frmr; struct ib_send_wr *invalidate_wr; @@ -518,16 +465,16 @@ __frwr_prepare_linv_wr(struct rpcrdma_mr_seg *seg) * * Sleeps until it is safe for the host CPU to access the * previously mapped memory regions. + * + * Caller ensures that req->rl_registered is not empty. */ static void frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) { struct ib_send_wr *invalidate_wrs, *pos, *prev, *bad_wr; struct rpcrdma_ia *ia = &r_xprt->rx_ia; - struct rpcrdma_mr_seg *seg; - unsigned int i, nchunks; + struct rpcrdma_mw *mw, *tmp; struct rpcrdma_frmr *f; - struct rpcrdma_mw *mw; int rc; dprintk("RPC: %s: req %p\n", __func__, req); @@ -537,22 +484,18 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) * Chain the LOCAL_INV Work Requests and post them with * a single ib_post_send() call. */ + f = NULL; invalidate_wrs = pos = prev = NULL; - seg = NULL; - for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) { - seg = &req->rl_segments[i]; - - pos = __frwr_prepare_linv_wr(seg); + list_for_each_entry(mw, &req->rl_registered, mw_list) { + pos = __frwr_prepare_linv_wr(mw); if (!invalidate_wrs) invalidate_wrs = pos; else prev->next = pos; prev = pos; - - i += seg->mr_nsegs; + f = &mw->frmr; } - f = &seg->rl_mw->frmr; /* Strong send queue ordering guarantees that when the * last WR in the chain completes, all WRs in the chain @@ -577,39 +520,27 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) * them to the free MW list. */ unmap: - for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) { - seg = &req->rl_segments[i]; - mw = seg->rl_mw; - seg->rl_mw = NULL; - - ib_dma_unmap_sg(ia->ri_device, f->fr_sg, f->fr_nents, - f->fr_dir); + list_for_each_entry_safe(mw, tmp, &req->rl_registered, mw_list) { + list_del_init(&mw->mw_list); + ib_dma_unmap_sg(ia->ri_device, + mw->mw_sg, mw->mw_nents, mw->mw_dir); rpcrdma_put_mw(r_xprt, mw); - - i += seg->mr_nsegs; - seg->mr_nsegs = 0; } - - req->rl_nchunks = 0; return; reset_mrs: - pr_warn("%s: ib_post_send failed %i\n", __func__, rc); + pr_err("rpcrdma: FRMR invalidate ib_post_send returned %i\n", rc); + rdma_disconnect(ia->ri_id); /* Find and reset the MRs in the LOCAL_INV WRs that did not * get posted. This is synchronous, and slow. */ - for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) { - seg = &req->rl_segments[i]; - mw = seg->rl_mw; + list_for_each_entry(mw, &req->rl_registered, mw_list) { f = &mw->frmr; - if (mw->frmr.fr_mr->rkey == bad_wr->ex.invalidate_rkey) { __frwr_reset_mr(ia, mw); bad_wr = bad_wr->next; } - - i += seg->mr_nsegs; } goto unmap; } @@ -621,38 +552,17 @@ static void frwr_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, bool sync) { - struct rpcrdma_mr_seg *seg; struct rpcrdma_mw *mw; - unsigned int i; - for (i = 0; req->rl_nchunks; req->rl_nchunks--) { - seg = &req->rl_segments[i]; - mw = seg->rl_mw; + while (!list_empty(&req->rl_registered)) { + mw = list_first_entry(&req->rl_registered, + struct rpcrdma_mw, mw_list); + list_del_init(&mw->mw_list); if (sync) - __frwr_reset_and_unmap(r_xprt, mw); + frwr_op_recover_mr(mw); else - __frwr_queue_recovery(mw); - - i += seg->mr_nsegs; - seg->mr_nsegs = 0; - seg->rl_mw = NULL; - } -} - -static void -frwr_op_destroy(struct rpcrdma_buffer *buf) -{ - struct rpcrdma_mw *r; - - /* Ensure stale MWs for "buf" are no longer in flight */ - flush_workqueue(frwr_recovery_wq); - - while (!list_empty(&buf->rb_all)) { - r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all); - list_del(&r->mw_all); - __frwr_release(r); - kfree(r); + rpcrdma_defer_mr_recovery(mw); } } @@ -660,9 +570,10 @@ const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops = { .ro_map = frwr_op_map, .ro_unmap_sync = frwr_op_unmap_sync, .ro_unmap_safe = frwr_op_unmap_safe, + .ro_recover_mr = frwr_op_recover_mr, .ro_open = frwr_op_open, .ro_maxpages = frwr_op_maxpages, - .ro_init = frwr_op_init, - .ro_destroy = frwr_op_destroy, + .ro_init_mr = frwr_op_init_mr, + .ro_release_mr = frwr_op_release_mr, .ro_displayname = "frwr", }; diff --git a/net/sunrpc/xprtrdma/physical_ops.c b/net/sunrpc/xprtrdma/physical_ops.c deleted file mode 100644 index 3750596cc432..000000000000 --- a/net/sunrpc/xprtrdma/physical_ops.c +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Copyright (c) 2015 Oracle. All rights reserved. - * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved. - */ - -/* No-op chunk preparation. All client memory is pre-registered. - * Sometimes referred to as ALLPHYSICAL mode. - * - * Physical registration is simple because all client memory is - * pre-registered and never deregistered. This mode is good for - * adapter bring up, but is considered not safe: the server is - * trusted not to abuse its access to client memory not involved - * in RDMA I/O. - */ - -#include "xprt_rdma.h" - -#if IS_ENABLED(CONFIG_SUNRPC_DEBUG) -# define RPCDBG_FACILITY RPCDBG_TRANS -#endif - -static int -physical_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep, - struct rpcrdma_create_data_internal *cdata) -{ - struct ib_mr *mr; - - /* Obtain an rkey to use for RPC data payloads. - */ - mr = ib_get_dma_mr(ia->ri_pd, - IB_ACCESS_LOCAL_WRITE | - IB_ACCESS_REMOTE_WRITE | - IB_ACCESS_REMOTE_READ); - if (IS_ERR(mr)) { - pr_err("%s: ib_get_dma_mr for failed with %lX\n", - __func__, PTR_ERR(mr)); - return -ENOMEM; - } - ia->ri_dma_mr = mr; - - rpcrdma_set_max_header_sizes(ia, cdata, min_t(unsigned int, - RPCRDMA_MAX_DATA_SEGS, - RPCRDMA_MAX_HDR_SEGS)); - return 0; -} - -/* PHYSICAL memory registration conveys one page per chunk segment. - */ -static size_t -physical_op_maxpages(struct rpcrdma_xprt *r_xprt) -{ - return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS, - RPCRDMA_MAX_HDR_SEGS); -} - -static int -physical_op_init(struct rpcrdma_xprt *r_xprt) -{ - return 0; -} - -/* The client's physical memory is already exposed for - * remote access via RDMA READ or RDMA WRITE. - */ -static int -physical_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, - int nsegs, bool writing) -{ - struct rpcrdma_ia *ia = &r_xprt->rx_ia; - - rpcrdma_map_one(ia->ri_device, seg, rpcrdma_data_dir(writing)); - seg->mr_rkey = ia->ri_dma_mr->rkey; - seg->mr_base = seg->mr_dma; - return 1; -} - -/* DMA unmap all memory regions that were mapped for "req". - */ -static void -physical_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) -{ - struct ib_device *device = r_xprt->rx_ia.ri_device; - unsigned int i; - - for (i = 0; req->rl_nchunks; --req->rl_nchunks) - rpcrdma_unmap_one(device, &req->rl_segments[i++]); -} - -/* Use a slow, safe mechanism to invalidate all memory regions - * that were registered for "req". - * - * For physical memory registration, there is no good way to - * fence a single MR that has been advertised to the server. The - * client has already handed the server an R_key that cannot be - * invalidated and is shared by all MRs on this connection. - * Tearing down the PD might be the only safe choice, but it's - * not clear that a freshly acquired DMA R_key would be different - * than the one used by the PD that was just destroyed. - * FIXME. - */ -static void -physical_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, - bool sync) -{ - physical_op_unmap_sync(r_xprt, req); -} - -static void -physical_op_destroy(struct rpcrdma_buffer *buf) -{ -} - -const struct rpcrdma_memreg_ops rpcrdma_physical_memreg_ops = { - .ro_map = physical_op_map, - .ro_unmap_sync = physical_op_unmap_sync, - .ro_unmap_safe = physical_op_unmap_safe, - .ro_open = physical_op_open, - .ro_maxpages = physical_op_maxpages, - .ro_init = physical_op_init, - .ro_destroy = physical_op_destroy, - .ro_displayname = "physical", -}; diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index 35a81096e83d..a47f170b20ef 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -196,8 +196,7 @@ rpcrdma_tail_pullup(struct xdr_buf *buf) * MR when they can. */ static int -rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg, - int n, int nsegs) +rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg, int n) { size_t page_offset; u32 remaining; @@ -206,7 +205,7 @@ rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg, base = vec->iov_base; page_offset = offset_in_page(base); remaining = vec->iov_len; - while (remaining && n < nsegs) { + while (remaining && n < RPCRDMA_MAX_SEGS) { seg[n].mr_page = NULL; seg[n].mr_offset = base; seg[n].mr_len = min_t(u32, PAGE_SIZE - page_offset, remaining); @@ -230,34 +229,34 @@ rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg, static int rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos, - enum rpcrdma_chunktype type, struct rpcrdma_mr_seg *seg, int nsegs) + enum rpcrdma_chunktype type, struct rpcrdma_mr_seg *seg) { - int len, n = 0, p; - int page_base; + int len, n, p, page_base; struct page **ppages; + n = 0; if (pos == 0) { - n = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, n, nsegs); - if (n == nsegs) - return -EIO; + n = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, n); + if (n == RPCRDMA_MAX_SEGS) + goto out_overflow; } len = xdrbuf->page_len; ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT); page_base = xdrbuf->page_base & ~PAGE_MASK; p = 0; - while (len && n < nsegs) { + while (len && n < RPCRDMA_MAX_SEGS) { if (!ppages[p]) { /* alloc the pagelist for receiving buffer */ ppages[p] = alloc_page(GFP_ATOMIC); if (!ppages[p]) - return -ENOMEM; + return -EAGAIN; } seg[n].mr_page = ppages[p]; seg[n].mr_offset = (void *)(unsigned long) page_base; seg[n].mr_len = min_t(u32, PAGE_SIZE - page_base, len); if (seg[n].mr_len > PAGE_SIZE) - return -EIO; + goto out_overflow; len -= seg[n].mr_len; ++n; ++p; @@ -265,8 +264,8 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos, } /* Message overflows the seg array */ - if (len && n == nsegs) - return -EIO; + if (len && n == RPCRDMA_MAX_SEGS) + goto out_overflow; /* When encoding the read list, the tail is always sent inline */ if (type == rpcrdma_readch) @@ -277,20 +276,24 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos, * xdr pad bytes, saving the server an RDMA operation. */ if (xdrbuf->tail[0].iov_len < 4 && xprt_rdma_pad_optimize) return n; - n = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, n, nsegs); - if (n == nsegs) - return -EIO; + n = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, n); + if (n == RPCRDMA_MAX_SEGS) + goto out_overflow; } return n; + +out_overflow: + pr_err("rpcrdma: segment array overflow\n"); + return -EIO; } static inline __be32 * -xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mr_seg *seg) +xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mw *mw) { - *iptr++ = cpu_to_be32(seg->mr_rkey); - *iptr++ = cpu_to_be32(seg->mr_len); - return xdr_encode_hyper(iptr, seg->mr_base); + *iptr++ = cpu_to_be32(mw->mw_handle); + *iptr++ = cpu_to_be32(mw->mw_length); + return xdr_encode_hyper(iptr, mw->mw_offset); } /* XDR-encode the Read list. Supports encoding a list of read @@ -310,7 +313,8 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, struct rpc_rqst *rqst, __be32 *iptr, enum rpcrdma_chunktype rtype) { - struct rpcrdma_mr_seg *seg = req->rl_nextseg; + struct rpcrdma_mr_seg *seg; + struct rpcrdma_mw *mw; unsigned int pos; int n, nsegs; @@ -322,15 +326,17 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, pos = rqst->rq_snd_buf.head[0].iov_len; if (rtype == rpcrdma_areadch) pos = 0; - nsegs = rpcrdma_convert_iovs(&rqst->rq_snd_buf, pos, rtype, seg, - RPCRDMA_MAX_SEGS - req->rl_nchunks); + seg = req->rl_segments; + nsegs = rpcrdma_convert_iovs(&rqst->rq_snd_buf, pos, rtype, seg); if (nsegs < 0) return ERR_PTR(nsegs); do { - n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, false); - if (n <= 0) + n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, + false, &mw); + if (n < 0) return ERR_PTR(n); + list_add(&mw->mw_list, &req->rl_registered); *iptr++ = xdr_one; /* item present */ @@ -338,20 +344,17 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, * have the same "position". */ *iptr++ = cpu_to_be32(pos); - iptr = xdr_encode_rdma_segment(iptr, seg); + iptr = xdr_encode_rdma_segment(iptr, mw); - dprintk("RPC: %5u %s: read segment pos %u " - "%d@0x%016llx:0x%08x (%s)\n", + dprintk("RPC: %5u %s: pos %u %u@0x%016llx:0x%08x (%s)\n", rqst->rq_task->tk_pid, __func__, pos, - seg->mr_len, (unsigned long long)seg->mr_base, - seg->mr_rkey, n < nsegs ? "more" : "last"); + mw->mw_length, (unsigned long long)mw->mw_offset, + mw->mw_handle, n < nsegs ? "more" : "last"); r_xprt->rx_stats.read_chunk_count++; - req->rl_nchunks++; seg += n; nsegs -= n; } while (nsegs); - req->rl_nextseg = seg; /* Finish Read list */ *iptr++ = xdr_zero; /* Next item not present */ @@ -375,7 +378,8 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, struct rpc_rqst *rqst, __be32 *iptr, enum rpcrdma_chunktype wtype) { - struct rpcrdma_mr_seg *seg = req->rl_nextseg; + struct rpcrdma_mr_seg *seg; + struct rpcrdma_mw *mw; int n, nsegs, nchunks; __be32 *segcount; @@ -384,10 +388,10 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, return iptr; } + seg = req->rl_segments; nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf, rqst->rq_rcv_buf.head[0].iov_len, - wtype, seg, - RPCRDMA_MAX_SEGS - req->rl_nchunks); + wtype, seg); if (nsegs < 0) return ERR_PTR(nsegs); @@ -396,26 +400,25 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, nchunks = 0; do { - n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, true); - if (n <= 0) + n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, + true, &mw); + if (n < 0) return ERR_PTR(n); + list_add(&mw->mw_list, &req->rl_registered); - iptr = xdr_encode_rdma_segment(iptr, seg); + iptr = xdr_encode_rdma_segment(iptr, mw); - dprintk("RPC: %5u %s: write segment " - "%d@0x016%llx:0x%08x (%s)\n", + dprintk("RPC: %5u %s: %u@0x016%llx:0x%08x (%s)\n", rqst->rq_task->tk_pid, __func__, - seg->mr_len, (unsigned long long)seg->mr_base, - seg->mr_rkey, n < nsegs ? "more" : "last"); + mw->mw_length, (unsigned long long)mw->mw_offset, + mw->mw_handle, n < nsegs ? "more" : "last"); r_xprt->rx_stats.write_chunk_count++; r_xprt->rx_stats.total_rdma_request += seg->mr_len; - req->rl_nchunks++; nchunks++; seg += n; nsegs -= n; } while (nsegs); - req->rl_nextseg = seg; /* Update count of segments in this Write chunk */ *segcount = cpu_to_be32(nchunks); @@ -442,7 +445,8 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, struct rpc_rqst *rqst, __be32 *iptr, enum rpcrdma_chunktype wtype) { - struct rpcrdma_mr_seg *seg = req->rl_nextseg; + struct rpcrdma_mr_seg *seg; + struct rpcrdma_mw *mw; int n, nsegs, nchunks; __be32 *segcount; @@ -451,8 +455,8 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, return iptr; } - nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf, 0, wtype, seg, - RPCRDMA_MAX_SEGS - req->rl_nchunks); + seg = req->rl_segments; + nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf, 0, wtype, seg); if (nsegs < 0) return ERR_PTR(nsegs); @@ -461,26 +465,25 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, nchunks = 0; do { - n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, true); - if (n <= 0) + n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, + true, &mw); + if (n < 0) return ERR_PTR(n); + list_add(&mw->mw_list, &req->rl_registered); - iptr = xdr_encode_rdma_segment(iptr, seg); + iptr = xdr_encode_rdma_segment(iptr, mw); - dprintk("RPC: %5u %s: reply segment " - "%d@0x%016llx:0x%08x (%s)\n", + dprintk("RPC: %5u %s: %u@0x%016llx:0x%08x (%s)\n", rqst->rq_task->tk_pid, __func__, - seg->mr_len, (unsigned long long)seg->mr_base, - seg->mr_rkey, n < nsegs ? "more" : "last"); + mw->mw_length, (unsigned long long)mw->mw_offset, + mw->mw_handle, n < nsegs ? "more" : "last"); r_xprt->rx_stats.reply_chunk_count++; r_xprt->rx_stats.total_rdma_request += seg->mr_len; - req->rl_nchunks++; nchunks++; seg += n; nsegs -= n; } while (nsegs); - req->rl_nextseg = seg; /* Update count of segments in the Reply chunk */ *segcount = cpu_to_be32(nchunks); @@ -567,6 +570,7 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) struct rpcrdma_req *req = rpcr_to_rdmar(rqst); enum rpcrdma_chunktype rtype, wtype; struct rpcrdma_msg *headerp; + bool ddp_allowed; ssize_t hdrlen; size_t rpclen; __be32 *iptr; @@ -583,6 +587,13 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) headerp->rm_credit = cpu_to_be32(r_xprt->rx_buf.rb_max_requests); headerp->rm_type = rdma_msg; + /* When the ULP employs a GSS flavor that guarantees integrity + * or privacy, direct data placement of individual data items + * is not allowed. + */ + ddp_allowed = !(rqst->rq_cred->cr_auth->au_flags & + RPCAUTH_AUTH_DATATOUCH); + /* * Chunks needed for results? * @@ -594,7 +605,7 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) */ if (rpcrdma_results_inline(r_xprt, rqst)) wtype = rpcrdma_noch; - else if (rqst->rq_rcv_buf.flags & XDRBUF_READ) + else if (ddp_allowed && rqst->rq_rcv_buf.flags & XDRBUF_READ) wtype = rpcrdma_writech; else wtype = rpcrdma_replych; @@ -617,7 +628,7 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) rtype = rpcrdma_noch; rpcrdma_inline_pullup(rqst); rpclen = rqst->rq_svec[0].iov_len; - } else if (rqst->rq_snd_buf.flags & XDRBUF_WRITE) { + } else if (ddp_allowed && rqst->rq_snd_buf.flags & XDRBUF_WRITE) { rtype = rpcrdma_readch; rpclen = rqst->rq_svec[0].iov_len; rpclen += rpcrdma_tail_pullup(&rqst->rq_snd_buf); @@ -650,8 +661,6 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) * send a Call message with a Position Zero Read chunk and a * regular Read chunk at the same time. */ - req->rl_nchunks = 0; - req->rl_nextseg = req->rl_segments; iptr = headerp->rm_body.rm_chunks; iptr = rpcrdma_encode_read_list(r_xprt, req, rqst, iptr, rtype); if (IS_ERR(iptr)) @@ -690,10 +699,7 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) out_overflow: pr_err("rpcrdma: send overflow: hdrlen %zd rpclen %zu %s/%s\n", hdrlen, rpclen, transfertypes[rtype], transfertypes[wtype]); - /* Terminate this RPC. Chunks registered above will be - * released by xprt_release -> xprt_rmda_free . - */ - return -EIO; + iptr = ERR_PTR(-EIO); out_unmap: r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req, false); @@ -705,15 +711,13 @@ out_unmap: * RDMA'd by server. See map at rpcrdma_create_chunks()! :-) */ static int -rpcrdma_count_chunks(struct rpcrdma_rep *rep, unsigned int max, int wrchunk, __be32 **iptrp) +rpcrdma_count_chunks(struct rpcrdma_rep *rep, int wrchunk, __be32 **iptrp) { unsigned int i, total_len; struct rpcrdma_write_chunk *cur_wchunk; char *base = (char *)rdmab_to_msg(rep->rr_rdmabuf); i = be32_to_cpu(**iptrp); - if (i > max) - return -1; cur_wchunk = (struct rpcrdma_write_chunk *) (*iptrp + 1); total_len = 0; while (i--) { @@ -744,45 +748,66 @@ rpcrdma_count_chunks(struct rpcrdma_rep *rep, unsigned int max, int wrchunk, __b return total_len; } -/* - * Scatter inline received data back into provided iov's. +/** + * rpcrdma_inline_fixup - Scatter inline received data into rqst's iovecs + * @rqst: controlling RPC request + * @srcp: points to RPC message payload in receive buffer + * @copy_len: remaining length of receive buffer content + * @pad: Write chunk pad bytes needed (zero for pure inline) + * + * The upper layer has set the maximum number of bytes it can + * receive in each component of rq_rcv_buf. These values are set in + * the head.iov_len, page_len, tail.iov_len, and buflen fields. + * + * Unlike the TCP equivalent (xdr_partial_copy_from_skb), in + * many cases this function simply updates iov_base pointers in + * rq_rcv_buf to point directly to the received reply data, to + * avoid copying reply data. + * + * Returns the count of bytes which had to be memcopied. */ -static void +static unsigned long rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad) { - int i, npages, curlen, olen; + unsigned long fixup_copy_count; + int i, npages, curlen; char *destp; struct page **ppages; int page_base; + /* The head iovec is redirected to the RPC reply message + * in the receive buffer, to avoid a memcopy. + */ + rqst->rq_rcv_buf.head[0].iov_base = srcp; + rqst->rq_private_buf.head[0].iov_base = srcp; + + /* The contents of the receive buffer that follow + * head.iov_len bytes are copied into the page list. + */ curlen = rqst->rq_rcv_buf.head[0].iov_len; - if (curlen > copy_len) { /* write chunk header fixup */ + if (curlen > copy_len) curlen = copy_len; - rqst->rq_rcv_buf.head[0].iov_len = curlen; - } - dprintk("RPC: %s: srcp 0x%p len %d hdrlen %d\n", __func__, srcp, copy_len, curlen); - - /* Shift pointer for first receive segment only */ - rqst->rq_rcv_buf.head[0].iov_base = srcp; srcp += curlen; copy_len -= curlen; - olen = copy_len; - i = 0; - rpcx_to_rdmax(rqst->rq_xprt)->rx_stats.fixup_copy_count += olen; page_base = rqst->rq_rcv_buf.page_base; ppages = rqst->rq_rcv_buf.pages + (page_base >> PAGE_SHIFT); page_base &= ~PAGE_MASK; - + fixup_copy_count = 0; if (copy_len && rqst->rq_rcv_buf.page_len) { - npages = PAGE_ALIGN(page_base + - rqst->rq_rcv_buf.page_len) >> PAGE_SHIFT; - for (; i < npages; i++) { + int pagelist_len; + + pagelist_len = rqst->rq_rcv_buf.page_len; + if (pagelist_len > copy_len) + pagelist_len = copy_len; + npages = PAGE_ALIGN(page_base + pagelist_len) >> PAGE_SHIFT; + for (i = 0; i < npages; i++) { curlen = PAGE_SIZE - page_base; - if (curlen > copy_len) - curlen = copy_len; + if (curlen > pagelist_len) + curlen = pagelist_len; + dprintk("RPC: %s: page %d" " srcp 0x%p len %d curlen %d\n", __func__, i, srcp, copy_len, curlen); @@ -792,39 +817,32 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad) kunmap_atomic(destp); srcp += curlen; copy_len -= curlen; - if (copy_len == 0) + fixup_copy_count += curlen; + pagelist_len -= curlen; + if (!pagelist_len) break; page_base = 0; } - } - if (copy_len && rqst->rq_rcv_buf.tail[0].iov_len) { - curlen = copy_len; - if (curlen > rqst->rq_rcv_buf.tail[0].iov_len) - curlen = rqst->rq_rcv_buf.tail[0].iov_len; - if (rqst->rq_rcv_buf.tail[0].iov_base != srcp) - memmove(rqst->rq_rcv_buf.tail[0].iov_base, srcp, curlen); - dprintk("RPC: %s: tail srcp 0x%p len %d curlen %d\n", - __func__, srcp, copy_len, curlen); - rqst->rq_rcv_buf.tail[0].iov_len = curlen; - copy_len -= curlen; ++i; - } else - rqst->rq_rcv_buf.tail[0].iov_len = 0; - - if (pad) { - /* implicit padding on terminal chunk */ - unsigned char *p = rqst->rq_rcv_buf.tail[0].iov_base; - while (pad--) - p[rqst->rq_rcv_buf.tail[0].iov_len++] = 0; + /* Implicit padding for the last segment in a Write + * chunk is inserted inline at the front of the tail + * iovec. The upper layer ignores the content of + * the pad. Simply ensure inline content in the tail + * that follows the Write chunk is properly aligned. + */ + if (pad) + srcp -= pad; } - if (copy_len) - dprintk("RPC: %s: %d bytes in" - " %d extra segments (%d lost)\n", - __func__, olen, i, copy_len); + /* The tail iovec is redirected to the remaining data + * in the receive buffer, to avoid a memcopy. + */ + if (copy_len || pad) { + rqst->rq_rcv_buf.tail[0].iov_base = srcp; + rqst->rq_private_buf.tail[0].iov_base = srcp; + } - /* TBD avoid a warning from call_decode() */ - rqst->rq_private_buf = rqst->rq_rcv_buf; + return fixup_copy_count; } void @@ -960,14 +978,13 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep) (headerp->rm_body.rm_chunks[1] == xdr_zero && headerp->rm_body.rm_chunks[2] != xdr_zero) || (headerp->rm_body.rm_chunks[1] != xdr_zero && - req->rl_nchunks == 0)) + list_empty(&req->rl_registered))) goto badheader; if (headerp->rm_body.rm_chunks[1] != xdr_zero) { /* count any expected write chunks in read reply */ /* start at write chunk array count */ iptr = &headerp->rm_body.rm_chunks[2]; - rdmalen = rpcrdma_count_chunks(rep, - req->rl_nchunks, 1, &iptr); + rdmalen = rpcrdma_count_chunks(rep, 1, &iptr); /* check for validity, and no reply chunk after */ if (rdmalen < 0 || *iptr++ != xdr_zero) goto badheader; @@ -988,8 +1005,10 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep) rep->rr_len -= RPCRDMA_HDRLEN_MIN; status = rep->rr_len; } - /* Fix up the rpc results for upper layer */ - rpcrdma_inline_fixup(rqst, (char *)iptr, rep->rr_len, rdmalen); + + r_xprt->rx_stats.fixup_copy_count += + rpcrdma_inline_fixup(rqst, (char *)iptr, rep->rr_len, + rdmalen); break; case rdma_nomsg: @@ -997,11 +1016,11 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep) if (headerp->rm_body.rm_chunks[0] != xdr_zero || headerp->rm_body.rm_chunks[1] != xdr_zero || headerp->rm_body.rm_chunks[2] != xdr_one || - req->rl_nchunks == 0) + list_empty(&req->rl_registered)) goto badheader; iptr = (__be32 *)((unsigned char *)headerp + RPCRDMA_HDRLEN_MIN); - rdmalen = rpcrdma_count_chunks(rep, req->rl_nchunks, 0, &iptr); + rdmalen = rpcrdma_count_chunks(rep, 0, &iptr); if (rdmalen < 0) goto badheader; r_xprt->rx_stats.total_rdma_reply += rdmalen; @@ -1014,14 +1033,9 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep) badheader: default: - dprintk("%s: invalid rpcrdma reply header (type %d):" - " chunks[012] == %d %d %d" - " expected chunks <= %d\n", - __func__, be32_to_cpu(headerp->rm_type), - headerp->rm_body.rm_chunks[0], - headerp->rm_body.rm_chunks[1], - headerp->rm_body.rm_chunks[2], - req->rl_nchunks); + dprintk("RPC: %5u %s: invalid rpcrdma reply (type %u)\n", + rqst->rq_task->tk_pid, __func__, + be32_to_cpu(headerp->rm_type)); status = -EIO; r_xprt->rx_stats.bad_reply_count++; break; @@ -1035,7 +1049,7 @@ out: * control: waking the next RPC waits until this RPC has * relinquished all its Send Queue entries. */ - if (req->rl_nchunks) + if (!list_empty(&req->rl_registered)) r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, req); spin_lock_bh(&xprt->transport_lock); diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c index 99d2e5b72726..81f0e879f019 100644 --- a/net/sunrpc/xprtrdma/transport.c +++ b/net/sunrpc/xprtrdma/transport.c @@ -558,7 +558,6 @@ out_sendbuf: out_fail: rpcrdma_buffer_put(req); - r_xprt->rx_stats.failed_marshal_count++; return NULL; } @@ -590,8 +589,19 @@ xprt_rdma_free(void *buffer) rpcrdma_buffer_put(req); } -/* +/** + * xprt_rdma_send_request - marshal and send an RPC request + * @task: RPC task with an RPC message in rq_snd_buf + * + * Return values: + * 0: The request has been sent + * ENOTCONN: Caller needs to invoke connect logic then call again + * ENOBUFS: Call again later to send the request + * EIO: A permanent error occurred. The request was not sent, + * and don't try it again + * * send_request invokes the meat of RPC RDMA. It must do the following: + * * 1. Marshal the RPC request into an RPC RDMA request, which means * putting a header in front of data, and creating IOVs for RDMA * from those in the request. @@ -600,7 +610,6 @@ xprt_rdma_free(void *buffer) * the request (rpcrdma_ep_post). * 4. No partial sends are possible in the RPC-RDMA protocol (as in UDP). */ - static int xprt_rdma_send_request(struct rpc_task *task) { @@ -610,6 +619,9 @@ xprt_rdma_send_request(struct rpc_task *task) struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); int rc = 0; + /* On retransmit, remove any previously registered chunks */ + r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req, false); + rc = rpcrdma_marshal_req(rqst); if (rc < 0) goto failed_marshal; @@ -630,11 +642,12 @@ xprt_rdma_send_request(struct rpc_task *task) return 0; failed_marshal: - r_xprt->rx_stats.failed_marshal_count++; dprintk("RPC: %s: rpcrdma_marshal_req failed, status %i\n", __func__, rc); if (rc == -EIO) - return -EIO; + r_xprt->rx_stats.failed_marshal_count++; + if (rc != -ENOTCONN) + return rc; drop_connection: xprt_disconnect_done(xprt); return -ENOTCONN; /* implies disconnect */ @@ -660,7 +673,7 @@ void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq) xprt->stat.bad_xids, xprt->stat.req_u, xprt->stat.bklog_u); - seq_printf(seq, "%lu %lu %lu %llu %llu %llu %llu %lu %lu %lu %lu\n", + seq_printf(seq, "%lu %lu %lu %llu %llu %llu %llu %lu %lu %lu %lu ", r_xprt->rx_stats.read_chunk_count, r_xprt->rx_stats.write_chunk_count, r_xprt->rx_stats.reply_chunk_count, @@ -672,6 +685,10 @@ void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq) r_xprt->rx_stats.failed_marshal_count, r_xprt->rx_stats.bad_reply_count, r_xprt->rx_stats.nomsg_call_count); + seq_printf(seq, "%lu %lu %lu\n", + r_xprt->rx_stats.mrs_recovered, + r_xprt->rx_stats.mrs_orphaned, + r_xprt->rx_stats.mrs_allocated); } static int @@ -741,7 +758,6 @@ void xprt_rdma_cleanup(void) __func__, rc); rpcrdma_destroy_wq(); - frwr_destroy_recovery_wq(); rc = xprt_unregister_transport(&xprt_rdma_bc); if (rc) @@ -753,20 +769,13 @@ int xprt_rdma_init(void) { int rc; - rc = frwr_alloc_recovery_wq(); - if (rc) - return rc; - rc = rpcrdma_alloc_wq(); - if (rc) { - frwr_destroy_recovery_wq(); + if (rc) return rc; - } rc = xprt_register_transport(&xprt_rdma); if (rc) { rpcrdma_destroy_wq(); - frwr_destroy_recovery_wq(); return rc; } @@ -774,7 +783,6 @@ int xprt_rdma_init(void) if (rc) { xprt_unregister_transport(&xprt_rdma); rpcrdma_destroy_wq(); - frwr_destroy_recovery_wq(); return rc; } diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index b044d98a1370..536d0be3f61b 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -379,8 +379,6 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg) struct rpcrdma_ia *ia = &xprt->rx_ia; int rc; - ia->ri_dma_mr = NULL; - ia->ri_id = rpcrdma_create_id(xprt, ia, addr); if (IS_ERR(ia->ri_id)) { rc = PTR_ERR(ia->ri_id); @@ -391,47 +389,29 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg) ia->ri_pd = ib_alloc_pd(ia->ri_device); if (IS_ERR(ia->ri_pd)) { rc = PTR_ERR(ia->ri_pd); - dprintk("RPC: %s: ib_alloc_pd() failed %i\n", - __func__, rc); + pr_err("rpcrdma: ib_alloc_pd() returned %d\n", rc); goto out2; } - if (memreg == RPCRDMA_FRMR) { - if (!(ia->ri_device->attrs.device_cap_flags & - IB_DEVICE_MEM_MGT_EXTENSIONS) || - (ia->ri_device->attrs.max_fast_reg_page_list_len == 0)) { - dprintk("RPC: %s: FRMR registration " - "not supported by HCA\n", __func__); - memreg = RPCRDMA_MTHCAFMR; - } - } - if (memreg == RPCRDMA_MTHCAFMR) { - if (!ia->ri_device->alloc_fmr) { - dprintk("RPC: %s: MTHCAFMR registration " - "not supported by HCA\n", __func__); - rc = -EINVAL; - goto out3; - } - } - switch (memreg) { case RPCRDMA_FRMR: - ia->ri_ops = &rpcrdma_frwr_memreg_ops; - break; - case RPCRDMA_ALLPHYSICAL: - ia->ri_ops = &rpcrdma_physical_memreg_ops; - break; + if (frwr_is_supported(ia)) { + ia->ri_ops = &rpcrdma_frwr_memreg_ops; + break; + } + /*FALLTHROUGH*/ case RPCRDMA_MTHCAFMR: - ia->ri_ops = &rpcrdma_fmr_memreg_ops; - break; + if (fmr_is_supported(ia)) { + ia->ri_ops = &rpcrdma_fmr_memreg_ops; + break; + } + /*FALLTHROUGH*/ default: - printk(KERN_ERR "RPC: Unsupported memory " - "registration mode: %d\n", memreg); - rc = -ENOMEM; + pr_err("rpcrdma: Unsupported memory registration mode: %d\n", + memreg); + rc = -EINVAL; goto out3; } - dprintk("RPC: %s: memory registration strategy is '%s'\n", - __func__, ia->ri_ops->ro_displayname); return 0; @@ -585,8 +565,6 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, out2: ib_free_cq(sendcq); out1: - if (ia->ri_dma_mr) - ib_dereg_mr(ia->ri_dma_mr); return rc; } @@ -600,8 +578,6 @@ out1: void rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) { - int rc; - dprintk("RPC: %s: entering, connected is %d\n", __func__, ep->rep_connected); @@ -615,12 +591,6 @@ rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) ib_free_cq(ep->rep_attr.recv_cq); ib_free_cq(ep->rep_attr.send_cq); - - if (ia->ri_dma_mr) { - rc = ib_dereg_mr(ia->ri_dma_mr); - dprintk("RPC: %s: ib_dereg_mr returned %i\n", - __func__, rc); - } } /* @@ -777,6 +747,90 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) ib_drain_qp(ia->ri_id->qp); } +static void +rpcrdma_mr_recovery_worker(struct work_struct *work) +{ + struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer, + rb_recovery_worker.work); + struct rpcrdma_mw *mw; + + spin_lock(&buf->rb_recovery_lock); + while (!list_empty(&buf->rb_stale_mrs)) { + mw = list_first_entry(&buf->rb_stale_mrs, + struct rpcrdma_mw, mw_list); + list_del_init(&mw->mw_list); + spin_unlock(&buf->rb_recovery_lock); + + dprintk("RPC: %s: recovering MR %p\n", __func__, mw); + mw->mw_xprt->rx_ia.ri_ops->ro_recover_mr(mw); + + spin_lock(&buf->rb_recovery_lock); + } + spin_unlock(&buf->rb_recovery_lock); +} + +void +rpcrdma_defer_mr_recovery(struct rpcrdma_mw *mw) +{ + struct rpcrdma_xprt *r_xprt = mw->mw_xprt; + struct rpcrdma_buffer *buf = &r_xprt->rx_buf; + + spin_lock(&buf->rb_recovery_lock); + list_add(&mw->mw_list, &buf->rb_stale_mrs); + spin_unlock(&buf->rb_recovery_lock); + + schedule_delayed_work(&buf->rb_recovery_worker, 0); +} + +static void +rpcrdma_create_mrs(struct rpcrdma_xprt *r_xprt) +{ + struct rpcrdma_buffer *buf = &r_xprt->rx_buf; + struct rpcrdma_ia *ia = &r_xprt->rx_ia; + unsigned int count; + LIST_HEAD(free); + LIST_HEAD(all); + + for (count = 0; count < 32; count++) { + struct rpcrdma_mw *mw; + int rc; + + mw = kzalloc(sizeof(*mw), GFP_KERNEL); + if (!mw) + break; + + rc = ia->ri_ops->ro_init_mr(ia, mw); + if (rc) { + kfree(mw); + break; + } + + mw->mw_xprt = r_xprt; + + list_add(&mw->mw_list, &free); + list_add(&mw->mw_all, &all); + } + + spin_lock(&buf->rb_mwlock); + list_splice(&free, &buf->rb_mws); + list_splice(&all, &buf->rb_all); + r_xprt->rx_stats.mrs_allocated += count; + spin_unlock(&buf->rb_mwlock); + + dprintk("RPC: %s: created %u MRs\n", __func__, count); +} + +static void +rpcrdma_mr_refresh_worker(struct work_struct *work) +{ + struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer, + rb_refresh_worker.work); + struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt, + rx_buf); + + rpcrdma_create_mrs(r_xprt); +} + struct rpcrdma_req * rpcrdma_create_req(struct rpcrdma_xprt *r_xprt) { @@ -793,6 +847,7 @@ rpcrdma_create_req(struct rpcrdma_xprt *r_xprt) spin_unlock(&buffer->rb_reqslock); req->rl_cqe.done = rpcrdma_wc_send; req->rl_buffer = &r_xprt->rx_buf; + INIT_LIST_HEAD(&req->rl_registered); return req; } @@ -832,17 +887,23 @@ int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) { struct rpcrdma_buffer *buf = &r_xprt->rx_buf; - struct rpcrdma_ia *ia = &r_xprt->rx_ia; int i, rc; buf->rb_max_requests = r_xprt->rx_data.max_requests; buf->rb_bc_srv_max_requests = 0; - spin_lock_init(&buf->rb_lock); atomic_set(&buf->rb_credits, 1); + spin_lock_init(&buf->rb_mwlock); + spin_lock_init(&buf->rb_lock); + spin_lock_init(&buf->rb_recovery_lock); + INIT_LIST_HEAD(&buf->rb_mws); + INIT_LIST_HEAD(&buf->rb_all); + INIT_LIST_HEAD(&buf->rb_stale_mrs); + INIT_DELAYED_WORK(&buf->rb_refresh_worker, + rpcrdma_mr_refresh_worker); + INIT_DELAYED_WORK(&buf->rb_recovery_worker, + rpcrdma_mr_recovery_worker); - rc = ia->ri_ops->ro_init(r_xprt); - if (rc) - goto out; + rpcrdma_create_mrs(r_xprt); INIT_LIST_HEAD(&buf->rb_send_bufs); INIT_LIST_HEAD(&buf->rb_allreqs); @@ -862,7 +923,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) } INIT_LIST_HEAD(&buf->rb_recv_bufs); - for (i = 0; i < buf->rb_max_requests + 2; i++) { + for (i = 0; i < buf->rb_max_requests; i++) { struct rpcrdma_rep *rep; rep = rpcrdma_create_rep(r_xprt); @@ -918,11 +979,39 @@ rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req) kfree(req); } +static void +rpcrdma_destroy_mrs(struct rpcrdma_buffer *buf) +{ + struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt, + rx_buf); + struct rpcrdma_ia *ia = rdmab_to_ia(buf); + struct rpcrdma_mw *mw; + unsigned int count; + + count = 0; + spin_lock(&buf->rb_mwlock); + while (!list_empty(&buf->rb_all)) { + mw = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all); + list_del(&mw->mw_all); + + spin_unlock(&buf->rb_mwlock); + ia->ri_ops->ro_release_mr(mw); + count++; + spin_lock(&buf->rb_mwlock); + } + spin_unlock(&buf->rb_mwlock); + r_xprt->rx_stats.mrs_allocated = 0; + + dprintk("RPC: %s: released %u MRs\n", __func__, count); +} + void rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf) { struct rpcrdma_ia *ia = rdmab_to_ia(buf); + cancel_delayed_work_sync(&buf->rb_recovery_worker); + while (!list_empty(&buf->rb_recv_bufs)) { struct rpcrdma_rep *rep; @@ -944,7 +1033,7 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf) } spin_unlock(&buf->rb_reqslock); - ia->ri_ops->ro_destroy(buf); + rpcrdma_destroy_mrs(buf); } struct rpcrdma_mw * @@ -962,8 +1051,17 @@ rpcrdma_get_mw(struct rpcrdma_xprt *r_xprt) spin_unlock(&buf->rb_mwlock); if (!mw) - pr_err("RPC: %s: no MWs available\n", __func__); + goto out_nomws; return mw; + +out_nomws: + dprintk("RPC: %s: no MWs available\n", __func__); + schedule_delayed_work(&buf->rb_refresh_worker, 0); + + /* Allow the reply handler and refresh worker to run */ + cond_resched(); + + return NULL; } void @@ -978,8 +1076,6 @@ rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw) /* * Get a set of request/reply buffers. - * - * Reply buffer (if available) is attached to send buffer upon return. */ struct rpcrdma_req * rpcrdma_buffer_get(struct rpcrdma_buffer *buffers) @@ -998,13 +1094,13 @@ rpcrdma_buffer_get(struct rpcrdma_buffer *buffers) out_reqbuf: spin_unlock(&buffers->rb_lock); - pr_warn("RPC: %s: out of request buffers\n", __func__); + pr_warn("rpcrdma: out of request buffers (%p)\n", buffers); return NULL; out_repbuf: + list_add(&req->rl_free, &buffers->rb_send_bufs); spin_unlock(&buffers->rb_lock); - pr_warn("RPC: %s: out of reply buffers\n", __func__); - req->rl_reply = NULL; - return req; + pr_warn("rpcrdma: out of reply buffers (%p)\n", buffers); + return NULL; } /* @@ -1060,14 +1156,6 @@ rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep) * Wrappers for internal-use kmalloc memory registration, used by buffer code. */ -void -rpcrdma_mapping_error(struct rpcrdma_mr_seg *seg) -{ - dprintk("RPC: map_one: offset %p iova %llx len %zu\n", - seg->mr_offset, - (unsigned long long)seg->mr_dma, seg->mr_dmalen); -} - /** * rpcrdma_alloc_regbuf - kmalloc and register memory for SEND/RECV buffers * @ia: controlling rpcrdma_ia @@ -1150,7 +1238,7 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia, if (rep) { rc = rpcrdma_ep_post_recv(ia, ep, rep); if (rc) - goto out; + return rc; req->rl_reply = NULL; } @@ -1175,10 +1263,12 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia, rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail); if (rc) - dprintk("RPC: %s: ib_post_send returned %i\n", __func__, - rc); -out: - return rc; + goto out_postsend_err; + return 0; + +out_postsend_err: + pr_err("rpcrdma: RDMA Send ib_post_send returned %i\n", rc); + return -ENOTCONN; } /* @@ -1203,11 +1293,13 @@ rpcrdma_ep_post_recv(struct rpcrdma_ia *ia, DMA_BIDIRECTIONAL); rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail); - if (rc) - dprintk("RPC: %s: ib_post_recv returned %i\n", __func__, - rc); - return rc; + goto out_postrecv; + return 0; + +out_postrecv: + pr_err("rpcrdma: ib_post_recv returned %i\n", rc); + return -ENOTCONN; } /** diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index 95cdc66225ee..670fad57153a 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -68,7 +68,6 @@ struct rpcrdma_ia { struct ib_device *ri_device; struct rdma_cm_id *ri_id; struct ib_pd *ri_pd; - struct ib_mr *ri_dma_mr; struct completion ri_done; int ri_async_rc; unsigned int ri_max_frmr_depth; @@ -172,23 +171,14 @@ rdmab_to_msg(struct rpcrdma_regbuf *rb) * o recv buffer (posted to provider) * o ib_sge (also donated to provider) * o status of reply (length, success or not) - * o bookkeeping state to get run by tasklet (list, etc) + * o bookkeeping state to get run by reply handler (list, etc) * - * These are allocated during initialization, per-transport instance; - * however, the tasklet execution list itself is global, as it should - * always be pretty short. + * These are allocated during initialization, per-transport instance. * * N of these are associated with a transport instance, and stored in * struct rpcrdma_buffer. N is the max number of outstanding requests. */ -#define RPCRDMA_MAX_DATA_SEGS ((1 * 1024 * 1024) / PAGE_SIZE) - -/* data segments + head/tail for Call + head/tail for Reply */ -#define RPCRDMA_MAX_SEGS (RPCRDMA_MAX_DATA_SEGS + 4) - -struct rpcrdma_buffer; - struct rpcrdma_rep { struct ib_cqe rr_cqe; unsigned int rr_len; @@ -221,9 +211,6 @@ enum rpcrdma_frmr_state { }; struct rpcrdma_frmr { - struct scatterlist *fr_sg; - int fr_nents; - enum dma_data_direction fr_dir; struct ib_mr *fr_mr; struct ib_cqe fr_cqe; enum rpcrdma_frmr_state fr_state; @@ -235,18 +222,23 @@ struct rpcrdma_frmr { }; struct rpcrdma_fmr { - struct ib_fmr *fmr; - u64 *physaddrs; + struct ib_fmr *fm_mr; + u64 *fm_physaddrs; }; struct rpcrdma_mw { + struct list_head mw_list; + struct scatterlist *mw_sg; + int mw_nents; + enum dma_data_direction mw_dir; union { struct rpcrdma_fmr fmr; struct rpcrdma_frmr frmr; }; - struct work_struct mw_work; struct rpcrdma_xprt *mw_xprt; - struct list_head mw_list; + u32 mw_handle; + u32 mw_length; + u64 mw_offset; struct list_head mw_all; }; @@ -266,33 +258,30 @@ struct rpcrdma_mw { * of iovs for send operations. The reason is that the iovs passed to * ib_post_{send,recv} must not be modified until the work request * completes. - * - * NOTES: - * o RPCRDMA_MAX_SEGS is the max number of addressible chunk elements we - * marshal. The number needed varies depending on the iov lists that - * are passed to us, the memory registration mode we are in, and if - * physical addressing is used, the layout. */ +/* Maximum number of page-sized "segments" per chunk list to be + * registered or invalidated. Must handle a Reply chunk: + */ +enum { + RPCRDMA_MAX_IOV_SEGS = 3, + RPCRDMA_MAX_DATA_SEGS = ((1 * 1024 * 1024) / PAGE_SIZE) + 1, + RPCRDMA_MAX_SEGS = RPCRDMA_MAX_DATA_SEGS + + RPCRDMA_MAX_IOV_SEGS, +}; + struct rpcrdma_mr_seg { /* chunk descriptors */ - struct rpcrdma_mw *rl_mw; /* registered MR */ - u64 mr_base; /* registration result */ - u32 mr_rkey; /* registration result */ u32 mr_len; /* length of chunk or segment */ - int mr_nsegs; /* number of segments in chunk or 0 */ - enum dma_data_direction mr_dir; /* segment mapping direction */ - dma_addr_t mr_dma; /* segment mapping address */ - size_t mr_dmalen; /* segment mapping length */ struct page *mr_page; /* owning page, if any */ char *mr_offset; /* kva if no page, else offset */ }; #define RPCRDMA_MAX_IOVS (2) +struct rpcrdma_buffer; struct rpcrdma_req { struct list_head rl_free; unsigned int rl_niovs; - unsigned int rl_nchunks; unsigned int rl_connect_cookie; struct rpc_task *rl_task; struct rpcrdma_buffer *rl_buffer; @@ -300,12 +289,13 @@ struct rpcrdma_req { struct ib_sge rl_send_iov[RPCRDMA_MAX_IOVS]; struct rpcrdma_regbuf *rl_rdmabuf; struct rpcrdma_regbuf *rl_sendbuf; - struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS]; - struct rpcrdma_mr_seg *rl_nextseg; struct ib_cqe rl_cqe; struct list_head rl_all; bool rl_backchannel; + + struct list_head rl_registered; /* registered segments */ + struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS]; }; static inline struct rpcrdma_req * @@ -341,6 +331,11 @@ struct rpcrdma_buffer { struct list_head rb_allreqs; u32 rb_bc_max_requests; + + spinlock_t rb_recovery_lock; /* protect rb_stale_mrs */ + struct list_head rb_stale_mrs; + struct delayed_work rb_recovery_worker; + struct delayed_work rb_refresh_worker; }; #define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia) @@ -387,6 +382,9 @@ struct rpcrdma_stats { unsigned long bad_reply_count; unsigned long nomsg_call_count; unsigned long bcall_count; + unsigned long mrs_recovered; + unsigned long mrs_orphaned; + unsigned long mrs_allocated; }; /* @@ -395,23 +393,25 @@ struct rpcrdma_stats { struct rpcrdma_xprt; struct rpcrdma_memreg_ops { int (*ro_map)(struct rpcrdma_xprt *, - struct rpcrdma_mr_seg *, int, bool); + struct rpcrdma_mr_seg *, int, bool, + struct rpcrdma_mw **); void (*ro_unmap_sync)(struct rpcrdma_xprt *, struct rpcrdma_req *); void (*ro_unmap_safe)(struct rpcrdma_xprt *, struct rpcrdma_req *, bool); + void (*ro_recover_mr)(struct rpcrdma_mw *); int (*ro_open)(struct rpcrdma_ia *, struct rpcrdma_ep *, struct rpcrdma_create_data_internal *); size_t (*ro_maxpages)(struct rpcrdma_xprt *); - int (*ro_init)(struct rpcrdma_xprt *); - void (*ro_destroy)(struct rpcrdma_buffer *); + int (*ro_init_mr)(struct rpcrdma_ia *, + struct rpcrdma_mw *); + void (*ro_release_mr)(struct rpcrdma_mw *); const char *ro_displayname; }; extern const struct rpcrdma_memreg_ops rpcrdma_fmr_memreg_ops; extern const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops; -extern const struct rpcrdma_memreg_ops rpcrdma_physical_memreg_ops; /* * RPCRDMA transport -- encapsulates the structures above for @@ -446,6 +446,8 @@ extern int xprt_rdma_pad_optimize; */ int rpcrdma_ia_open(struct rpcrdma_xprt *, struct sockaddr *, int); void rpcrdma_ia_close(struct rpcrdma_ia *); +bool frwr_is_supported(struct rpcrdma_ia *); +bool fmr_is_supported(struct rpcrdma_ia *); /* * Endpoint calls - xprtrdma/verbs.c @@ -477,6 +479,8 @@ void rpcrdma_buffer_put(struct rpcrdma_req *); void rpcrdma_recv_buffer_get(struct rpcrdma_req *); void rpcrdma_recv_buffer_put(struct rpcrdma_rep *); +void rpcrdma_defer_mr_recovery(struct rpcrdma_mw *); + struct rpcrdma_regbuf *rpcrdma_alloc_regbuf(struct rpcrdma_ia *, size_t, gfp_t); void rpcrdma_free_regbuf(struct rpcrdma_ia *, @@ -484,9 +488,6 @@ void rpcrdma_free_regbuf(struct rpcrdma_ia *, int rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *, unsigned int); -int frwr_alloc_recovery_wq(void); -void frwr_destroy_recovery_wq(void); - int rpcrdma_alloc_wq(void); void rpcrdma_destroy_wq(void); @@ -494,45 +495,12 @@ void rpcrdma_destroy_wq(void); * Wrappers for chunk registration, shared by read/write chunk code. */ -void rpcrdma_mapping_error(struct rpcrdma_mr_seg *); - static inline enum dma_data_direction rpcrdma_data_dir(bool writing) { return writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE; } -static inline void -rpcrdma_map_one(struct ib_device *device, struct rpcrdma_mr_seg *seg, - enum dma_data_direction direction) -{ - seg->mr_dir = direction; - seg->mr_dmalen = seg->mr_len; - - if (seg->mr_page) - seg->mr_dma = ib_dma_map_page(device, - seg->mr_page, offset_in_page(seg->mr_offset), - seg->mr_dmalen, seg->mr_dir); - else - seg->mr_dma = ib_dma_map_single(device, - seg->mr_offset, - seg->mr_dmalen, seg->mr_dir); - - if (ib_dma_mapping_error(device, seg->mr_dma)) - rpcrdma_mapping_error(seg); -} - -static inline void -rpcrdma_unmap_one(struct ib_device *device, struct rpcrdma_mr_seg *seg) -{ - if (seg->mr_page) - ib_dma_unmap_page(device, - seg->mr_dma, seg->mr_dmalen, seg->mr_dir); - else - ib_dma_unmap_single(device, - seg->mr_dma, seg->mr_dmalen, seg->mr_dir); -} - /* * RPC/RDMA connection management calls - xprtrdma/rpc_rdma.c */ diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 7e2b2fa189c3..111767ab124a 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -124,7 +124,7 @@ static struct ctl_table xs_tunables_table[] = { .mode = 0644, .proc_handler = proc_dointvec_minmax, .extra1 = &xprt_min_resvport_limit, - .extra2 = &xprt_max_resvport_limit + .extra2 = &xprt_max_resvport }, { .procname = "max_resvport", @@ -132,7 +132,7 @@ static struct ctl_table xs_tunables_table[] = { .maxlen = sizeof(unsigned int), .mode = 0644, .proc_handler = proc_dointvec_minmax, - .extra1 = &xprt_min_resvport_limit, + .extra1 = &xprt_min_resvport, .extra2 = &xprt_max_resvport_limit }, { @@ -642,6 +642,7 @@ static int xs_tcp_send_request(struct rpc_task *task) struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); struct xdr_buf *xdr = &req->rq_snd_buf; bool zerocopy = true; + bool vm_wait = false; int status; int sent; @@ -677,15 +678,33 @@ static int xs_tcp_send_request(struct rpc_task *task) return 0; } + WARN_ON_ONCE(sent == 0 && status == 0); + + if (status == -EAGAIN ) { + /* + * Return EAGAIN if we're sure we're hitting the + * socket send buffer limits. + */ + if (test_bit(SOCK_NOSPACE, &transport->sock->flags)) + break; + /* + * Did we hit a memory allocation failure? + */ + if (sent == 0) { + status = -ENOBUFS; + if (vm_wait) + break; + /* Retry, knowing now that we're below the + * socket send buffer limit + */ + vm_wait = true; + } + continue; + } if (status < 0) break; - if (sent == 0) { - status = -EAGAIN; - break; - } + vm_wait = false; } - if (status == -EAGAIN && sk_stream_is_writeable(transport->inet)) - status = -ENOBUFS; switch (status) { case -ENOTSOCK: @@ -755,11 +774,19 @@ static void xs_restore_old_callbacks(struct sock_xprt *transport, struct sock *s sk->sk_error_report = transport->old_error_report; } +static void xs_sock_reset_state_flags(struct rpc_xprt *xprt) +{ + struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); + + clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state); +} + static void xs_sock_reset_connection_flags(struct rpc_xprt *xprt) { smp_mb__before_atomic(); clear_bit(XPRT_CLOSE_WAIT, &xprt->state); clear_bit(XPRT_CLOSING, &xprt->state); + xs_sock_reset_state_flags(xprt); smp_mb__after_atomic(); } @@ -962,10 +989,13 @@ static void xs_local_data_receive(struct sock_xprt *transport) goto out; for (;;) { skb = skb_recv_datagram(sk, 0, 1, &err); - if (skb == NULL) + if (skb != NULL) { + xs_local_data_read_skb(&transport->xprt, sk, skb); + skb_free_datagram(sk, skb); + continue; + } + if (!test_and_clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state)) break; - xs_local_data_read_skb(&transport->xprt, sk, skb); - skb_free_datagram(sk, skb); } out: mutex_unlock(&transport->recv_mutex); @@ -1043,10 +1073,13 @@ static void xs_udp_data_receive(struct sock_xprt *transport) goto out; for (;;) { skb = skb_recv_datagram(sk, 0, 1, &err); - if (skb == NULL) + if (skb != NULL) { + xs_udp_data_read_skb(&transport->xprt, sk, skb); + skb_free_datagram(sk, skb); + continue; + } + if (!test_and_clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state)) break; - xs_udp_data_read_skb(&transport->xprt, sk, skb); - skb_free_datagram(sk, skb); } out: mutex_unlock(&transport->recv_mutex); @@ -1074,7 +1107,14 @@ static void xs_data_ready(struct sock *sk) if (xprt != NULL) { struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); - queue_work(rpciod_workqueue, &transport->recv_worker); + transport->old_data_ready(sk); + /* Any data means we had a useful conversation, so + * then we don't need to delay the next reconnect + */ + if (xprt->reestablish_timeout) + xprt->reestablish_timeout = 0; + if (!test_and_set_bit(XPRT_SOCK_DATA_READY, &transport->sock_state)) + queue_work(xprtiod_workqueue, &transport->recv_worker); } read_unlock_bh(&sk->sk_callback_lock); } @@ -1474,10 +1514,15 @@ static void xs_tcp_data_receive(struct sock_xprt *transport) for (;;) { lock_sock(sk); read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv); - release_sock(sk); - if (read <= 0) - break; - total += read; + if (read <= 0) { + clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state); + release_sock(sk); + if (!test_bit(XPRT_SOCK_DATA_READY, &transport->sock_state)) + break; + } else { + release_sock(sk); + total += read; + } rd_desc.count = 65536; } out: @@ -1493,34 +1538,6 @@ static void xs_tcp_data_receive_workfn(struct work_struct *work) } /** - * xs_tcp_data_ready - "data ready" callback for TCP sockets - * @sk: socket with data to read - * - */ -static void xs_tcp_data_ready(struct sock *sk) -{ - struct sock_xprt *transport; - struct rpc_xprt *xprt; - - dprintk("RPC: xs_tcp_data_ready...\n"); - - read_lock_bh(&sk->sk_callback_lock); - if (!(xprt = xprt_from_sock(sk))) - goto out; - transport = container_of(xprt, struct sock_xprt, xprt); - - /* Any data means we had a useful conversation, so - * the we don't need to delay the next reconnect - */ - if (xprt->reestablish_timeout) - xprt->reestablish_timeout = 0; - queue_work(rpciod_workqueue, &transport->recv_worker); - -out: - read_unlock_bh(&sk->sk_callback_lock); -} - -/** * xs_tcp_state_change - callback to handle TCP socket state changes * @sk: socket whose state has changed * @@ -1714,7 +1731,7 @@ static void xs_udp_timer(struct rpc_xprt *xprt, struct rpc_task *task) static unsigned short xs_get_random_port(void) { - unsigned short range = xprt_max_resvport - xprt_min_resvport; + unsigned short range = xprt_max_resvport - xprt_min_resvport + 1; unsigned short rand = (unsigned short) prandom_u32() % range; return rand + xprt_min_resvport; } @@ -2241,7 +2258,7 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) xs_save_old_callbacks(transport, sk); sk->sk_user_data = xprt; - sk->sk_data_ready = xs_tcp_data_ready; + sk->sk_data_ready = xs_data_ready; sk->sk_state_change = xs_tcp_state_change; sk->sk_write_space = xs_tcp_write_space; sock_set_flag(sk, SOCK_FASYNC); @@ -2380,7 +2397,7 @@ static void xs_connect(struct rpc_xprt *xprt, struct rpc_task *task) /* Start by resetting any existing state */ xs_reset_transport(transport); - queue_delayed_work(rpciod_workqueue, + queue_delayed_work(xprtiod_workqueue, &transport->connect_worker, xprt->reestablish_timeout); xprt->reestablish_timeout <<= 1; @@ -2390,7 +2407,7 @@ static void xs_connect(struct rpc_xprt *xprt, struct rpc_task *task) xprt->reestablish_timeout = XS_TCP_MAX_REEST_TO; } else { dprintk("RPC: xs_connect scheduled xprt %p\n", xprt); - queue_delayed_work(rpciod_workqueue, + queue_delayed_work(xprtiod_workqueue, &transport->connect_worker, 0); } } @@ -3153,8 +3170,12 @@ static int param_set_uint_minmax(const char *val, static int param_set_portnr(const char *val, const struct kernel_param *kp) { - return param_set_uint_minmax(val, kp, + if (kp->arg == &xprt_min_resvport) + return param_set_uint_minmax(val, kp, RPC_MIN_RESVPORT, + xprt_max_resvport); + return param_set_uint_minmax(val, kp, + xprt_min_resvport, RPC_MAX_RESVPORT); } |