summaryrefslogtreecommitdiff
path: root/net/sunrpc
diff options
context:
space:
mode:
authorTrond Myklebust <trond.myklebust@primarydata.com>2015-06-16 11:36:47 -0400
committerTrond Myklebust <trond.myklebust@primarydata.com>2015-06-16 11:37:50 -0400
commit3438995bc43939ed1502bb2639ff7007169574ea (patch)
tree079a2732cc9eacb98200914c1278aee264b89ba9 /net/sunrpc
parent5ba12443a132420be45d089e4c4ac8a1e26b7da8 (diff)
parent40c6ed0c8a7f2c5d67f5d6774bbce230a42176db (diff)
Merge tag 'nfs-rdma-for-4.2' of git://git.linux-nfs.org/projects/anna/nfs-rdma
NFS: NFSoRDMA Client Changes These patches continue to build up for improving the rsize and wsize that the NFS client uses when talking over RDMA. In addition, these patches also add in scalability enhancements and other bugfixes. Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com> * tag 'nfs-rdma-for-4.2' of git://git.linux-nfs.org/projects/anna/nfs-rdma: (142 commits) xprtrdma: Reduce per-transport MR allocation xprtrdma: Stack relief in fmr_op_map() xprtrdma: Split rb_lock xprtrdma: Remove rpcrdma_ia::ri_memreg_strategy xprtrdma: Remove ->ro_reset xprtrdma: Remove unused LOCAL_INV recovery logic xprtrdma: Acquire MRs in rpcrdma_register_external() xprtrdma: Introduce an FRMR recovery workqueue xprtrdma: Acquire FMRs in rpcrdma_fmr_register_external() xprtrdma: Introduce helpers for allocating MWs xprtrdma: Use ib_device pointer safely xprtrdma: Remove rr_func xprtrdma: Replace rpcrdma_rep::rr_buffer with rr_rxprt xprtrdma: Warn when there are orphaned IB objects ...
Diffstat (limited to 'net/sunrpc')
-rw-r--r--net/sunrpc/xprtrdma/fmr_ops.c120
-rw-r--r--net/sunrpc/xprtrdma/frwr_ops.c227
-rw-r--r--net/sunrpc/xprtrdma/physical_ops.c14
-rw-r--r--net/sunrpc/xprtrdma/rpc_rdma.c8
-rw-r--r--net/sunrpc/xprtrdma/transport.c17
-rw-r--r--net/sunrpc/xprtrdma/verbs.c257
-rw-r--r--net/sunrpc/xprtrdma/xprt_rdma.h38
7 files changed, 343 insertions, 338 deletions
diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c
index 302d4ebf6fbf..f1e8dafbd507 100644
--- a/net/sunrpc/xprtrdma/fmr_ops.c
+++ b/net/sunrpc/xprtrdma/fmr_ops.c
@@ -11,6 +11,21 @@
* can take tens of usecs to complete.
*/
+/* Normal operation
+ *
+ * A Memory Region is prepared for RDMA READ or WRITE using the
+ * ib_map_phys_fmr verb (fmr_op_map). When the RDMA operation is
+ * finished, the Memory Region is unmapped using the ib_unmap_fmr
+ * verb (fmr_op_unmap).
+ */
+
+/* Transport recovery
+ *
+ * After a transport reconnect, fmr_op_map re-uses the MR already
+ * allocated for the RPC, but generates a fresh rkey then maps the
+ * MR again. This process is synchronous.
+ */
+
#include "xprt_rdma.h"
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
@@ -50,19 +65,28 @@ fmr_op_init(struct rpcrdma_xprt *r_xprt)
struct rpcrdma_mw *r;
int i, rc;
+ spin_lock_init(&buf->rb_mwlock);
INIT_LIST_HEAD(&buf->rb_mws);
INIT_LIST_HEAD(&buf->rb_all);
- i = (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS;
- dprintk("RPC: %s: initializing %d FMRs\n", __func__, i);
+ i = max_t(int, RPCRDMA_MAX_DATA_SEGS / RPCRDMA_MAX_FMR_SGES, 1);
+ i += 2; /* head + tail */
+ i *= buf->rb_max_requests; /* one set for each RPC slot */
+ dprintk("RPC: %s: initalizing %d FMRs\n", __func__, i);
+ rc = -ENOMEM;
while (i--) {
r = kzalloc(sizeof(*r), GFP_KERNEL);
if (!r)
- return -ENOMEM;
+ goto out;
- r->r.fmr = ib_alloc_fmr(pd, mr_access_flags, &fmr_attr);
- if (IS_ERR(r->r.fmr))
+ r->r.fmr.physaddrs = kmalloc(RPCRDMA_MAX_FMR_SGES *
+ sizeof(u64), GFP_KERNEL);
+ if (!r->r.fmr.physaddrs)
+ goto out_free;
+
+ r->r.fmr.fmr = ib_alloc_fmr(pd, mr_access_flags, &fmr_attr);
+ if (IS_ERR(r->r.fmr.fmr))
goto out_fmr_err;
list_add(&r->mw_list, &buf->rb_mws);
@@ -71,12 +95,24 @@ fmr_op_init(struct rpcrdma_xprt *r_xprt)
return 0;
out_fmr_err:
- rc = PTR_ERR(r->r.fmr);
+ rc = PTR_ERR(r->r.fmr.fmr);
dprintk("RPC: %s: ib_alloc_fmr status %i\n", __func__, rc);
+ kfree(r->r.fmr.physaddrs);
+out_free:
kfree(r);
+out:
return rc;
}
+static int
+__fmr_unmap(struct rpcrdma_mw *r)
+{
+ LIST_HEAD(l);
+
+ list_add(&r->r.fmr.fmr->list, &l);
+ return ib_unmap_fmr(&l);
+}
+
/* Use the ib_map_phys_fmr() verb to register a memory region
* for remote access via RDMA READ or RDMA WRITE.
*/
@@ -85,12 +121,24 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
int nsegs, bool writing)
{
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
- struct ib_device *device = ia->ri_id->device;
+ struct ib_device *device = ia->ri_device;
enum dma_data_direction direction = rpcrdma_data_dir(writing);
struct rpcrdma_mr_seg *seg1 = seg;
- struct rpcrdma_mw *mw = seg1->rl_mw;
- u64 physaddrs[RPCRDMA_MAX_DATA_SEGS];
int len, pageoff, i, rc;
+ struct rpcrdma_mw *mw;
+
+ mw = seg1->rl_mw;
+ seg1->rl_mw = NULL;
+ if (!mw) {
+ mw = rpcrdma_get_mw(r_xprt);
+ if (!mw)
+ return -ENOMEM;
+ } else {
+ /* this is a retransmit; generate a fresh rkey */
+ rc = __fmr_unmap(mw);
+ if (rc)
+ return rc;
+ }
pageoff = offset_in_page(seg1->mr_offset);
seg1->mr_offset -= pageoff; /* start of page */
@@ -100,7 +148,7 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
nsegs = RPCRDMA_MAX_FMR_SGES;
for (i = 0; i < nsegs;) {
rpcrdma_map_one(device, seg, direction);
- physaddrs[i] = seg->mr_dma;
+ mw->r.fmr.physaddrs[i] = seg->mr_dma;
len += seg->mr_len;
++seg;
++i;
@@ -110,11 +158,13 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
break;
}
- rc = ib_map_phys_fmr(mw->r.fmr, physaddrs, i, seg1->mr_dma);
+ rc = ib_map_phys_fmr(mw->r.fmr.fmr, mw->r.fmr.physaddrs,
+ i, seg1->mr_dma);
if (rc)
goto out_maperr;
- seg1->mr_rkey = mw->r.fmr->rkey;
+ seg1->rl_mw = mw;
+ seg1->mr_rkey = mw->r.fmr.fmr->rkey;
seg1->mr_base = seg1->mr_dma + pageoff;
seg1->mr_nsegs = i;
seg1->mr_len = len;
@@ -137,48 +187,28 @@ fmr_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg)
{
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
struct rpcrdma_mr_seg *seg1 = seg;
- struct ib_device *device;
+ struct rpcrdma_mw *mw = seg1->rl_mw;
int rc, nsegs = seg->mr_nsegs;
- LIST_HEAD(l);
- list_add(&seg1->rl_mw->r.fmr->list, &l);
- rc = ib_unmap_fmr(&l);
- read_lock(&ia->ri_qplock);
- device = ia->ri_id->device;
+ dprintk("RPC: %s: FMR %p\n", __func__, mw);
+
+ seg1->rl_mw = NULL;
while (seg1->mr_nsegs--)
- rpcrdma_unmap_one(device, seg++);
- read_unlock(&ia->ri_qplock);
+ rpcrdma_unmap_one(ia->ri_device, seg++);
+ rc = __fmr_unmap(mw);
if (rc)
goto out_err;
+ rpcrdma_put_mw(r_xprt, mw);
return nsegs;
out_err:
+ /* The FMR is abandoned, but remains in rb_all. fmr_op_destroy
+ * will attempt to release it when the transport is destroyed.
+ */
dprintk("RPC: %s: ib_unmap_fmr status %i\n", __func__, rc);
return nsegs;
}
-/* After a disconnect, unmap all FMRs.
- *
- * This is invoked only in the transport connect worker in order
- * to serialize with rpcrdma_register_fmr_external().
- */
-static void
-fmr_op_reset(struct rpcrdma_xprt *r_xprt)
-{
- struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
- struct rpcrdma_mw *r;
- LIST_HEAD(list);
- int rc;
-
- list_for_each_entry(r, &buf->rb_all, mw_all)
- list_add(&r->r.fmr->list, &list);
-
- rc = ib_unmap_fmr(&list);
- if (rc)
- dprintk("RPC: %s: ib_unmap_fmr failed %i\n",
- __func__, rc);
-}
-
static void
fmr_op_destroy(struct rpcrdma_buffer *buf)
{
@@ -188,10 +218,13 @@ fmr_op_destroy(struct rpcrdma_buffer *buf)
while (!list_empty(&buf->rb_all)) {
r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
list_del(&r->mw_all);
- rc = ib_dealloc_fmr(r->r.fmr);
+ kfree(r->r.fmr.physaddrs);
+
+ rc = ib_dealloc_fmr(r->r.fmr.fmr);
if (rc)
dprintk("RPC: %s: ib_dealloc_fmr failed %i\n",
__func__, rc);
+
kfree(r);
}
}
@@ -202,7 +235,6 @@ const struct rpcrdma_memreg_ops rpcrdma_fmr_memreg_ops = {
.ro_open = fmr_op_open,
.ro_maxpages = fmr_op_maxpages,
.ro_init = fmr_op_init,
- .ro_reset = fmr_op_reset,
.ro_destroy = fmr_op_destroy,
.ro_displayname = "fmr",
};
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index dff0481dbcf8..661fbc1784ab 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -11,12 +11,136 @@
* but most complex memory registration mode.
*/
+/* Normal operation
+ *
+ * A Memory Region is prepared for RDMA READ or WRITE using a FAST_REG
+ * Work Request (frmr_op_map). When the RDMA operation is finished, this
+ * Memory Region is invalidated using a LOCAL_INV Work Request
+ * (frmr_op_unmap).
+ *
+ * Typically these Work Requests are not signaled, and neither are RDMA
+ * SEND Work Requests (with the exception of signaling occasionally to
+ * prevent provider work queue overflows). This greatly reduces HCA
+ * interrupt workload.
+ *
+ * As an optimization, frwr_op_unmap marks MRs INVALID before the
+ * LOCAL_INV WR is posted. If posting succeeds, the MR is placed on
+ * rb_mws immediately so that no work (like managing a linked list
+ * under a spinlock) is needed in the completion upcall.
+ *
+ * But this means that frwr_op_map() can occasionally encounter an MR
+ * that is INVALID but the LOCAL_INV WR has not completed. Work Queue
+ * ordering prevents a subsequent FAST_REG WR from executing against
+ * that MR while it is still being invalidated.
+ */
+
+/* Transport recovery
+ *
+ * ->op_map and the transport connect worker cannot run at the same
+ * time, but ->op_unmap can fire while the transport connect worker
+ * is running. Thus MR recovery is handled in ->op_map, to guarantee
+ * that recovered MRs are owned by a sending RPC, and not one where
+ * ->op_unmap could fire at the same time transport reconnect is
+ * being done.
+ *
+ * When the underlying transport disconnects, MRs are left in one of
+ * three states:
+ *
+ * INVALID: The MR was not in use before the QP entered ERROR state.
+ * (Or, the LOCAL_INV WR has not completed or flushed yet).
+ *
+ * STALE: The MR was being registered or unregistered when the QP
+ * entered ERROR state, and the pending WR was flushed.
+ *
+ * VALID: The MR was registered before the QP entered ERROR state.
+ *
+ * When frwr_op_map encounters STALE and VALID MRs, they are recovered
+ * with ib_dereg_mr and then are re-initialized. Beause MR recovery
+ * allocates fresh resources, it is deferred to a workqueue, and the
+ * recovered MRs are placed back on the rb_mws list when recovery is
+ * complete. frwr_op_map allocates another MR for the current RPC while
+ * the broken MR is reset.
+ *
+ * To ensure that frwr_op_map doesn't encounter an MR that is marked
+ * INVALID but that is about to be flushed due to a previous transport
+ * disconnect, the transport connect worker attempts to drain all
+ * pending send queue WRs before the transport is reconnected.
+ */
+
#include "xprt_rdma.h"
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY RPCDBG_TRANS
#endif
+static struct workqueue_struct *frwr_recovery_wq;
+
+#define FRWR_RECOVERY_WQ_FLAGS (WQ_UNBOUND | WQ_MEM_RECLAIM)
+
+int
+frwr_alloc_recovery_wq(void)
+{
+ frwr_recovery_wq = alloc_workqueue("frwr_recovery",
+ FRWR_RECOVERY_WQ_FLAGS, 0);
+ return !frwr_recovery_wq ? -ENOMEM : 0;
+}
+
+void
+frwr_destroy_recovery_wq(void)
+{
+ struct workqueue_struct *wq;
+
+ if (!frwr_recovery_wq)
+ return;
+
+ wq = frwr_recovery_wq;
+ frwr_recovery_wq = NULL;
+ destroy_workqueue(wq);
+}
+
+/* Deferred reset of a single FRMR. Generate a fresh rkey by
+ * replacing the MR.
+ *
+ * There's no recovery if this fails. The FRMR is abandoned, but
+ * remains in rb_all. It will be cleaned up when the transport is
+ * destroyed.
+ */
+static void
+__frwr_recovery_worker(struct work_struct *work)
+{
+ struct rpcrdma_mw *r = container_of(work, struct rpcrdma_mw,
+ r.frmr.fr_work);
+ struct rpcrdma_xprt *r_xprt = r->r.frmr.fr_xprt;
+ unsigned int depth = r_xprt->rx_ia.ri_max_frmr_depth;
+ struct ib_pd *pd = r_xprt->rx_ia.ri_pd;
+
+ if (ib_dereg_mr(r->r.frmr.fr_mr))
+ goto out_fail;
+
+ r->r.frmr.fr_mr = ib_alloc_fast_reg_mr(pd, depth);
+ if (IS_ERR(r->r.frmr.fr_mr))
+ goto out_fail;
+
+ dprintk("RPC: %s: recovered FRMR %p\n", __func__, r);
+ r->r.frmr.fr_state = FRMR_IS_INVALID;
+ rpcrdma_put_mw(r_xprt, r);
+ return;
+
+out_fail:
+ pr_warn("RPC: %s: FRMR %p unrecovered\n",
+ __func__, r);
+}
+
+/* A broken MR was discovered in a context that can't sleep.
+ * Defer recovery to the recovery worker.
+ */
+static void
+__frwr_queue_recovery(struct rpcrdma_mw *r)
+{
+ INIT_WORK(&r->r.frmr.fr_work, __frwr_recovery_worker);
+ queue_work(frwr_recovery_wq, &r->r.frmr.fr_work);
+}
+
static int
__frwr_init(struct rpcrdma_mw *r, struct ib_pd *pd, struct ib_device *device,
unsigned int depth)
@@ -128,7 +252,7 @@ frwr_sendcompletion(struct ib_wc *wc)
/* WARNING: Only wr_id and status are reliable at this point */
r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
- dprintk("RPC: %s: frmr %p (stale), status %d\n",
+ pr_warn("RPC: %s: frmr %p flushed, status %d\n",
__func__, r, wc->status);
r->r.frmr.fr_state = FRMR_IS_STALE;
}
@@ -137,16 +261,19 @@ static int
frwr_op_init(struct rpcrdma_xprt *r_xprt)
{
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
- struct ib_device *device = r_xprt->rx_ia.ri_id->device;
+ struct ib_device *device = r_xprt->rx_ia.ri_device;
unsigned int depth = r_xprt->rx_ia.ri_max_frmr_depth;
struct ib_pd *pd = r_xprt->rx_ia.ri_pd;
int i;
+ spin_lock_init(&buf->rb_mwlock);
INIT_LIST_HEAD(&buf->rb_mws);
INIT_LIST_HEAD(&buf->rb_all);
- i = (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS;
- dprintk("RPC: %s: initializing %d FRMRs\n", __func__, i);
+ i = max_t(int, RPCRDMA_MAX_DATA_SEGS / depth, 1);
+ i += 2; /* head + tail */
+ i *= buf->rb_max_requests; /* one set for each RPC slot */
+ dprintk("RPC: %s: initalizing %d FRMRs\n", __func__, i);
while (i--) {
struct rpcrdma_mw *r;
@@ -165,6 +292,7 @@ frwr_op_init(struct rpcrdma_xprt *r_xprt)
list_add(&r->mw_list, &buf->rb_mws);
list_add(&r->mw_all, &buf->rb_all);
r->mw_sendcompletion = frwr_sendcompletion;
+ r->r.frmr.fr_xprt = r_xprt;
}
return 0;
@@ -178,12 +306,12 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
int nsegs, bool writing)
{
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
- struct ib_device *device = ia->ri_id->device;
+ struct ib_device *device = ia->ri_device;
enum dma_data_direction direction = rpcrdma_data_dir(writing);
struct rpcrdma_mr_seg *seg1 = seg;
- struct rpcrdma_mw *mw = seg1->rl_mw;
- struct rpcrdma_frmr *frmr = &mw->r.frmr;
- struct ib_mr *mr = frmr->fr_mr;
+ struct rpcrdma_mw *mw;
+ struct rpcrdma_frmr *frmr;
+ struct ib_mr *mr;
struct ib_send_wr fastreg_wr, *bad_wr;
u8 key;
int len, pageoff;
@@ -192,12 +320,25 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
u64 pa;
int page_no;
+ mw = seg1->rl_mw;
+ seg1->rl_mw = NULL;
+ do {
+ if (mw)
+ __frwr_queue_recovery(mw);
+ mw = rpcrdma_get_mw(r_xprt);
+ if (!mw)
+ return -ENOMEM;
+ } while (mw->r.frmr.fr_state != FRMR_IS_INVALID);
+ frmr = &mw->r.frmr;
+ frmr->fr_state = FRMR_IS_VALID;
+
pageoff = offset_in_page(seg1->mr_offset);
seg1->mr_offset -= pageoff; /* start of page */
seg1->mr_len += pageoff;
len = -pageoff;
if (nsegs > ia->ri_max_frmr_depth)
nsegs = ia->ri_max_frmr_depth;
+
for (page_no = i = 0; i < nsegs;) {
rpcrdma_map_one(device, seg, direction);
pa = seg->mr_dma;
@@ -216,8 +357,6 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
dprintk("RPC: %s: Using frmr %p to map %d segments (%d bytes)\n",
__func__, mw, i, len);
- frmr->fr_state = FRMR_IS_VALID;
-
memset(&fastreg_wr, 0, sizeof(fastreg_wr));
fastreg_wr.wr_id = (unsigned long)(void *)mw;
fastreg_wr.opcode = IB_WR_FAST_REG_MR;
@@ -229,6 +368,7 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
fastreg_wr.wr.fast_reg.access_flags = writing ?
IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
IB_ACCESS_REMOTE_READ;
+ mr = frmr->fr_mr;
key = (u8)(mr->rkey & 0x000000FF);
ib_update_fast_reg_key(mr, ++key);
fastreg_wr.wr.fast_reg.rkey = mr->rkey;
@@ -238,6 +378,7 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
if (rc)
goto out_senderr;
+ seg1->rl_mw = mw;
seg1->mr_rkey = mr->rkey;
seg1->mr_base = seg1->mr_dma + pageoff;
seg1->mr_nsegs = i;
@@ -246,10 +387,9 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
out_senderr:
dprintk("RPC: %s: ib_post_send status %i\n", __func__, rc);
- ib_update_fast_reg_key(mr, --key);
- frmr->fr_state = FRMR_IS_INVALID;
while (i--)
rpcrdma_unmap_one(device, --seg);
+ __frwr_queue_recovery(mw);
return rc;
}
@@ -261,78 +401,46 @@ frwr_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg)
{
struct rpcrdma_mr_seg *seg1 = seg;
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+ struct rpcrdma_mw *mw = seg1->rl_mw;
struct ib_send_wr invalidate_wr, *bad_wr;
int rc, nsegs = seg->mr_nsegs;
- struct ib_device *device;
- seg1->rl_mw->r.frmr.fr_state = FRMR_IS_INVALID;
+ dprintk("RPC: %s: FRMR %p\n", __func__, mw);
+
+ seg1->rl_mw = NULL;
+ mw->r.frmr.fr_state = FRMR_IS_INVALID;
memset(&invalidate_wr, 0, sizeof(invalidate_wr));
- invalidate_wr.wr_id = (unsigned long)(void *)seg1->rl_mw;
+ invalidate_wr.wr_id = (unsigned long)(void *)mw;
invalidate_wr.opcode = IB_WR_LOCAL_INV;
- invalidate_wr.ex.invalidate_rkey = seg1->rl_mw->r.frmr.fr_mr->rkey;
+ invalidate_wr.ex.invalidate_rkey = mw->r.frmr.fr_mr->rkey;
DECR_CQCOUNT(&r_xprt->rx_ep);
- read_lock(&ia->ri_qplock);
- device = ia->ri_id->device;
while (seg1->mr_nsegs--)
- rpcrdma_unmap_one(device, seg++);
+ rpcrdma_unmap_one(ia->ri_device, seg++);
+ read_lock(&ia->ri_qplock);
rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
read_unlock(&ia->ri_qplock);
if (rc)
goto out_err;
+
+ rpcrdma_put_mw(r_xprt, mw);
return nsegs;
out_err:
- /* Force rpcrdma_buffer_get() to retry */
- seg1->rl_mw->r.frmr.fr_state = FRMR_IS_STALE;
dprintk("RPC: %s: ib_post_send status %i\n", __func__, rc);
+ __frwr_queue_recovery(mw);
return nsegs;
}
-/* After a disconnect, a flushed FAST_REG_MR can leave an FRMR in
- * an unusable state. Find FRMRs in this state and dereg / reg
- * each. FRMRs that are VALID and attached to an rpcrdma_req are
- * also torn down.
- *
- * This gives all in-use FRMRs a fresh rkey and leaves them INVALID.
- *
- * This is invoked only in the transport connect worker in order
- * to serialize with rpcrdma_register_frmr_external().
- */
-static void
-frwr_op_reset(struct rpcrdma_xprt *r_xprt)
-{
- struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
- struct ib_device *device = r_xprt->rx_ia.ri_id->device;
- unsigned int depth = r_xprt->rx_ia.ri_max_frmr_depth;
- struct ib_pd *pd = r_xprt->rx_ia.ri_pd;
- struct rpcrdma_mw *r;
- int rc;
-
- list_for_each_entry(r, &buf->rb_all, mw_all) {
- if (r->r.frmr.fr_state == FRMR_IS_INVALID)
- continue;
-
- __frwr_release(r);
- rc = __frwr_init(r, pd, device, depth);
- if (rc) {
- dprintk("RPC: %s: mw %p left %s\n",
- __func__, r,
- (r->r.frmr.fr_state == FRMR_IS_STALE ?
- "stale" : "valid"));
- continue;
- }
-
- r->r.frmr.fr_state = FRMR_IS_INVALID;
- }
-}
-
static void
frwr_op_destroy(struct rpcrdma_buffer *buf)
{
struct rpcrdma_mw *r;
+ /* Ensure stale MWs for "buf" are no longer in flight */
+ flush_workqueue(frwr_recovery_wq);
+
while (!list_empty(&buf->rb_all)) {
r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
list_del(&r->mw_all);
@@ -347,7 +455,6 @@ const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops = {
.ro_open = frwr_op_open,
.ro_maxpages = frwr_op_maxpages,
.ro_init = frwr_op_init,
- .ro_reset = frwr_op_reset,
.ro_destroy = frwr_op_destroy,
.ro_displayname = "frwr",
};
diff --git a/net/sunrpc/xprtrdma/physical_ops.c b/net/sunrpc/xprtrdma/physical_ops.c
index ba518af16787..41985d07fdb7 100644
--- a/net/sunrpc/xprtrdma/physical_ops.c
+++ b/net/sunrpc/xprtrdma/physical_ops.c
@@ -50,8 +50,7 @@ physical_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
{
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
- rpcrdma_map_one(ia->ri_id->device, seg,
- rpcrdma_data_dir(writing));
+ rpcrdma_map_one(ia->ri_device, seg, rpcrdma_data_dir(writing));
seg->mr_rkey = ia->ri_bind_mem->rkey;
seg->mr_base = seg->mr_dma;
seg->mr_nsegs = 1;
@@ -65,19 +64,11 @@ physical_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg)
{
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
- read_lock(&ia->ri_qplock);
- rpcrdma_unmap_one(ia->ri_id->device, seg);
- read_unlock(&ia->ri_qplock);
-
+ rpcrdma_unmap_one(ia->ri_device, seg);
return 1;
}
static void
-physical_op_reset(struct rpcrdma_xprt *r_xprt)
-{
-}
-
-static void
physical_op_destroy(struct rpcrdma_buffer *buf)
{
}
@@ -88,7 +79,6 @@ const struct rpcrdma_memreg_ops rpcrdma_physical_memreg_ops = {
.ro_open = physical_op_open,
.ro_maxpages = physical_op_maxpages,
.ro_init = physical_op_init,
- .ro_reset = physical_op_reset,
.ro_destroy = physical_op_destroy,
.ro_displayname = "physical",
};
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 2c53ea9e1b83..84ea37daef36 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -284,9 +284,6 @@ rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,
return (unsigned char *)iptr - (unsigned char *)headerp;
out:
- if (r_xprt->rx_ia.ri_memreg_strategy == RPCRDMA_FRMR)
- return n;
-
for (pos = 0; nchunks--;)
pos += r_xprt->rx_ia.ri_ops->ro_unmap(r_xprt,
&req->rl_segments[pos]);
@@ -732,8 +729,8 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
struct rpcrdma_msg *headerp;
struct rpcrdma_req *req;
struct rpc_rqst *rqst;
- struct rpc_xprt *xprt = rep->rr_xprt;
- struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+ struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
+ struct rpc_xprt *xprt = &r_xprt->rx_xprt;
__be32 *iptr;
int rdmalen, status;
unsigned long cwnd;
@@ -770,7 +767,6 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
rep->rr_len);
repost:
r_xprt->rx_stats.bad_reply_count++;
- rep->rr_func = rpcrdma_reply_handler;
if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
rpcrdma_recv_buffer_put(rep);
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index b8aac23b1b0c..9facc71d9d86 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -628,12 +628,6 @@ xprt_rdma_send_request(struct rpc_task *task)
if (req->rl_reply == NULL) /* e.g. reconnection */
rpcrdma_recv_buffer_get(req);
- if (req->rl_reply) {
- req->rl_reply->rr_func = rpcrdma_reply_handler;
- /* this need only be done once, but... */
- req->rl_reply->rr_xprt = xprt;
- }
-
/* Must suppress retransmit to maintain credits */
if (req->rl_connect_cookie == xprt->connect_cookie)
goto drop_connection;
@@ -750,17 +744,24 @@ static void __exit xprt_rdma_cleanup(void)
if (rc)
dprintk("RPC: %s: xprt_unregister returned %i\n",
__func__, rc);
+
+ frwr_destroy_recovery_wq();
}
static int __init xprt_rdma_init(void)
{
int rc;
- rc = xprt_register_transport(&xprt_rdma);
-
+ rc = frwr_alloc_recovery_wq();
if (rc)
return rc;
+ rc = xprt_register_transport(&xprt_rdma);
+ if (rc) {
+ frwr_destroy_recovery_wq();
+ return rc;
+ }
+
dprintk("RPCRDMA Module Init, register RPC RDMA transport\n");
dprintk("Defaults:\n");
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 4870d272e006..234083560d0e 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -80,7 +80,6 @@ static void
rpcrdma_run_tasklet(unsigned long data)
{
struct rpcrdma_rep *rep;
- void (*func)(struct rpcrdma_rep *);
unsigned long flags;
data = data;
@@ -89,14 +88,9 @@ rpcrdma_run_tasklet(unsigned long data)
rep = list_entry(rpcrdma_tasklets_g.next,
struct rpcrdma_rep, rr_list);
list_del(&rep->rr_list);
- func = rep->rr_func;
- rep->rr_func = NULL;
spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
- if (func)
- func(rep);
- else
- rpcrdma_recv_buffer_put(rep);
+ rpcrdma_reply_handler(rep);
spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
}
@@ -291,7 +285,7 @@ rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
__func__, rep, wc->byte_len);
rep->rr_len = wc->byte_len;
- ib_dma_sync_single_for_cpu(rdmab_to_ia(rep->rr_buffer)->ri_id->device,
+ ib_dma_sync_single_for_cpu(rep->rr_device,
rdmab_addr(rep->rr_rdmabuf),
rep->rr_len, DMA_FROM_DEVICE);
prefetch(rdmab_to_msg(rep->rr_rdmabuf));
@@ -487,7 +481,7 @@ connected:
pr_info("rpcrdma: connection to %pIS:%u on %s, memreg '%s', %d credits, %d responders%s\n",
sap, rpc_get_port(sap),
- ia->ri_id->device->name,
+ ia->ri_device->name,
ia->ri_ops->ro_displayname,
xprt->rx_buf.rb_max_requests,
ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
@@ -588,8 +582,9 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
rc = PTR_ERR(ia->ri_id);
goto out1;
}
+ ia->ri_device = ia->ri_id->device;
- ia->ri_pd = ib_alloc_pd(ia->ri_id->device);
+ ia->ri_pd = ib_alloc_pd(ia->ri_device);
if (IS_ERR(ia->ri_pd)) {
rc = PTR_ERR(ia->ri_pd);
dprintk("RPC: %s: ib_alloc_pd() failed %i\n",
@@ -597,7 +592,7 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
goto out2;
}
- rc = ib_query_device(ia->ri_id->device, devattr);
+ rc = ib_query_device(ia->ri_device, devattr);
if (rc) {
dprintk("RPC: %s: ib_query_device failed %d\n",
__func__, rc);
@@ -606,7 +601,7 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
if (devattr->device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
ia->ri_have_dma_lkey = 1;
- ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey;
+ ia->ri_dma_lkey = ia->ri_device->local_dma_lkey;
}
if (memreg == RPCRDMA_FRMR) {
@@ -621,7 +616,7 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
}
}
if (memreg == RPCRDMA_MTHCAFMR) {
- if (!ia->ri_id->device->alloc_fmr) {
+ if (!ia->ri_device->alloc_fmr) {
dprintk("RPC: %s: MTHCAFMR registration "
"not supported by HCA\n", __func__);
memreg = RPCRDMA_ALLPHYSICAL;
@@ -670,9 +665,6 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
dprintk("RPC: %s: memory registration strategy is '%s'\n",
__func__, ia->ri_ops->ro_displayname);
- /* Else will do memory reg/dereg for each chunk */
- ia->ri_memreg_strategy = memreg;
-
rwlock_init(&ia->ri_qplock);
return 0;
@@ -702,17 +694,17 @@ rpcrdma_ia_close(struct rpcrdma_ia *ia)
dprintk("RPC: %s: ib_dereg_mr returned %i\n",
__func__, rc);
}
+
if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
if (ia->ri_id->qp)
rdma_destroy_qp(ia->ri_id);
rdma_destroy_id(ia->ri_id);
ia->ri_id = NULL;
}
- if (ia->ri_pd != NULL && !IS_ERR(ia->ri_pd)) {
- rc = ib_dealloc_pd(ia->ri_pd);
- dprintk("RPC: %s: ib_dealloc_pd returned %i\n",
- __func__, rc);
- }
+
+ /* If the pd is still busy, xprtrdma missed freeing a resource */
+ if (ia->ri_pd && !IS_ERR(ia->ri_pd))
+ WARN_ON(ib_dealloc_pd(ia->ri_pd));
}
/*
@@ -771,9 +763,9 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
init_waitqueue_head(&ep->rep_connect_wait);
INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
- sendcq = ib_create_cq(ia->ri_id->device, rpcrdma_sendcq_upcall,
- rpcrdma_cq_async_error_upcall, ep,
- ep->rep_attr.cap.max_send_wr + 1, 0);
+ sendcq = ib_create_cq(ia->ri_device, rpcrdma_sendcq_upcall,
+ rpcrdma_cq_async_error_upcall, ep,
+ ep->rep_attr.cap.max_send_wr + 1, 0);
if (IS_ERR(sendcq)) {
rc = PTR_ERR(sendcq);
dprintk("RPC: %s: failed to create send CQ: %i\n",
@@ -788,9 +780,9 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
goto out2;
}
- recvcq = ib_create_cq(ia->ri_id->device, rpcrdma_recvcq_upcall,
- rpcrdma_cq_async_error_upcall, ep,
- ep->rep_attr.cap.max_recv_wr + 1, 0);
+ recvcq = ib_create_cq(ia->ri_device, rpcrdma_recvcq_upcall,
+ rpcrdma_cq_async_error_upcall, ep,
+ ep->rep_attr.cap.max_recv_wr + 1, 0);
if (IS_ERR(recvcq)) {
rc = PTR_ERR(recvcq);
dprintk("RPC: %s: failed to create recv CQ: %i\n",
@@ -896,8 +888,6 @@ retry:
rpcrdma_flush_cqs(ep);
xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
- ia->ri_ops->ro_reset(xprt);
-
id = rpcrdma_create_id(xprt, ia,
(struct sockaddr *)&xprt->rx_data.addr);
if (IS_ERR(id)) {
@@ -911,7 +901,7 @@ retry:
* More stuff I haven't thought of!
* Rrrgh!
*/
- if (ia->ri_id->device != id->device) {
+ if (ia->ri_device != id->device) {
printk("RPC: %s: can't reconnect on "
"different device!\n", __func__);
rdma_destroy_id(id);
@@ -1053,7 +1043,8 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
goto out_free;
}
- rep->rr_buffer = &r_xprt->rx_buf;
+ rep->rr_device = ia->ri_device;
+ rep->rr_rxprt = r_xprt;
return rep;
out_free:
@@ -1177,31 +1168,33 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
kfree(buf->rb_pool);
}
-/* "*mw" can be NULL when rpcrdma_buffer_get_mrs() fails, leaving
- * some req segments uninitialized.
- */
-static void
-rpcrdma_buffer_put_mr(struct rpcrdma_mw **mw, struct rpcrdma_buffer *buf)
+struct rpcrdma_mw *
+rpcrdma_get_mw(struct rpcrdma_xprt *r_xprt)
{
- if (*mw) {
- list_add_tail(&(*mw)->mw_list, &buf->rb_mws);
- *mw = NULL;
+ struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+ struct rpcrdma_mw *mw = NULL;
+
+ spin_lock(&buf->rb_mwlock);
+ if (!list_empty(&buf->rb_mws)) {
+ mw = list_first_entry(&buf->rb_mws,
+ struct rpcrdma_mw, mw_list);
+ list_del_init(&mw->mw_list);
}
+ spin_unlock(&buf->rb_mwlock);
+
+ if (!mw)
+ pr_err("RPC: %s: no MWs available\n", __func__);
+ return mw;
}
-/* Cycle mw's back in reverse order, and "spin" them.
- * This delays and scrambles reuse as much as possible.
- */
-static void
-rpcrdma_buffer_put_mrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
+void
+rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
{
- struct rpcrdma_mr_seg *seg = req->rl_segments;
- struct rpcrdma_mr_seg *seg1 = seg;
- int i;
+ struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
- for (i = 1, seg++; i < RPCRDMA_MAX_SEGS; seg++, i++)
- rpcrdma_buffer_put_mr(&seg->rl_mw, buf);
- rpcrdma_buffer_put_mr(&seg1->rl_mw, buf);
+ spin_lock(&buf->rb_mwlock);
+ list_add_tail(&mw->mw_list, &buf->rb_mws);
+ spin_unlock(&buf->rb_mwlock);
}
static void
@@ -1211,115 +1204,10 @@ rpcrdma_buffer_put_sendbuf(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
req->rl_niovs = 0;
if (req->rl_reply) {
buf->rb_recv_bufs[--buf->rb_recv_index] = req->rl_reply;
- req->rl_reply->rr_func = NULL;
req->rl_reply = NULL;
}
}
-/* rpcrdma_unmap_one() was already done during deregistration.
- * Redo only the ib_post_send().
- */
-static void
-rpcrdma_retry_local_inv(struct rpcrdma_mw *r, struct rpcrdma_ia *ia)
-{
- struct rpcrdma_xprt *r_xprt =
- container_of(ia, struct rpcrdma_xprt, rx_ia);
- struct ib_send_wr invalidate_wr, *bad_wr;
- int rc;
-
- dprintk("RPC: %s: FRMR %p is stale\n", __func__, r);
-
- /* When this FRMR is re-inserted into rb_mws, it is no longer stale */
- r->r.frmr.fr_state = FRMR_IS_INVALID;
-
- memset(&invalidate_wr, 0, sizeof(invalidate_wr));
- invalidate_wr.wr_id = (unsigned long)(void *)r;
- invalidate_wr.opcode = IB_WR_LOCAL_INV;
- invalidate_wr.ex.invalidate_rkey = r->r.frmr.fr_mr->rkey;
- DECR_CQCOUNT(&r_xprt->rx_ep);
-
- dprintk("RPC: %s: frmr %p invalidating rkey %08x\n",
- __func__, r, r->r.frmr.fr_mr->rkey);
-
- read_lock(&ia->ri_qplock);
- rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
- read_unlock(&ia->ri_qplock);
- if (rc) {
- /* Force rpcrdma_buffer_get() to retry */
- r->r.frmr.fr_state = FRMR_IS_STALE;
- dprintk("RPC: %s: ib_post_send failed, %i\n",
- __func__, rc);
- }
-}
-
-static void
-rpcrdma_retry_flushed_linv(struct list_head *stale,
- struct rpcrdma_buffer *buf)
-{
- struct rpcrdma_ia *ia = rdmab_to_ia(buf);
- struct list_head *pos;
- struct rpcrdma_mw *r;
- unsigned long flags;
-
- list_for_each(pos, stale) {
- r = list_entry(pos, struct rpcrdma_mw, mw_list);
- rpcrdma_retry_local_inv(r, ia);
- }
-
- spin_lock_irqsave(&buf->rb_lock, flags);
- list_splice_tail(stale, &buf->rb_mws);
- spin_unlock_irqrestore(&buf->rb_lock, flags);
-}
-
-static struct rpcrdma_req *
-rpcrdma_buffer_get_frmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf,
- struct list_head *stale)
-{
- struct rpcrdma_mw *r;
- int i;
-
- i = RPCRDMA_MAX_SEGS - 1;
- while (!list_empty(&buf->rb_mws)) {
- r = list_entry(buf->rb_mws.next,
- struct rpcrdma_mw, mw_list);
- list_del(&r->mw_list);
- if (r->r.frmr.fr_state == FRMR_IS_STALE) {
- list_add(&r->mw_list, stale);
- continue;
- }
- req->rl_segments[i].rl_mw = r;
- if (unlikely(i-- == 0))
- return req; /* Success */
- }
-
- /* Not enough entries on rb_mws for this req */
- rpcrdma_buffer_put_sendbuf(req, buf);
- rpcrdma_buffer_put_mrs(req, buf);
- return NULL;
-}
-
-static struct rpcrdma_req *
-rpcrdma_buffer_get_fmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
-{
- struct rpcrdma_mw *r;
- int i;
-
- i = RPCRDMA_MAX_SEGS - 1;
- while (!list_empty(&buf->rb_mws)) {
- r = list_entry(buf->rb_mws.next,
- struct rpcrdma_mw, mw_list);
- list_del(&r->mw_list);
- req->rl_segments[i].rl_mw = r;
- if (unlikely(i-- == 0))
- return req; /* Success */
- }
-
- /* Not enough entries on rb_mws for this req */
- rpcrdma_buffer_put_sendbuf(req, buf);
- rpcrdma_buffer_put_mrs(req, buf);
- return NULL;
-}
-
/*
* Get a set of request/reply buffers.
*
@@ -1332,12 +1220,11 @@ rpcrdma_buffer_get_fmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
struct rpcrdma_req *
rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
{
- struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
- struct list_head stale;
struct rpcrdma_req *req;
unsigned long flags;
spin_lock_irqsave(&buffers->rb_lock, flags);
+
if (buffers->rb_send_index == buffers->rb_max_requests) {
spin_unlock_irqrestore(&buffers->rb_lock, flags);
dprintk("RPC: %s: out of request buffers\n", __func__);
@@ -1356,20 +1243,7 @@ rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
}
buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
- INIT_LIST_HEAD(&stale);
- switch (ia->ri_memreg_strategy) {
- case RPCRDMA_FRMR:
- req = rpcrdma_buffer_get_frmrs(req, buffers, &stale);
- break;
- case RPCRDMA_MTHCAFMR:
- req = rpcrdma_buffer_get_fmrs(req, buffers);
- break;
- default:
- break;
- }
spin_unlock_irqrestore(&buffers->rb_lock, flags);
- if (!list_empty(&stale))
- rpcrdma_retry_flushed_linv(&stale, buffers);
return req;
}
@@ -1381,19 +1255,10 @@ void
rpcrdma_buffer_put(struct rpcrdma_req *req)
{
struct rpcrdma_buffer *buffers = req->rl_buffer;
- struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
unsigned long flags;
spin_lock_irqsave(&buffers->rb_lock, flags);
rpcrdma_buffer_put_sendbuf(req, buffers);
- switch (ia->ri_memreg_strategy) {
- case RPCRDMA_FRMR:
- case RPCRDMA_MTHCAFMR:
- rpcrdma_buffer_put_mrs(req, buffers);
- break;
- default:
- break;
- }
spin_unlock_irqrestore(&buffers->rb_lock, flags);
}
@@ -1423,10 +1288,9 @@ rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
void
rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
{
- struct rpcrdma_buffer *buffers = rep->rr_buffer;
+ struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;
unsigned long flags;
- rep->rr_func = NULL;
spin_lock_irqsave(&buffers->rb_lock, flags);
buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
spin_unlock_irqrestore(&buffers->rb_lock, flags);
@@ -1455,9 +1319,9 @@ rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
/*
* All memory passed here was kmalloc'ed, therefore phys-contiguous.
*/
- iov->addr = ib_dma_map_single(ia->ri_id->device,
+ iov->addr = ib_dma_map_single(ia->ri_device,
va, len, DMA_BIDIRECTIONAL);
- if (ib_dma_mapping_error(ia->ri_id->device, iov->addr))
+ if (ib_dma_mapping_error(ia->ri_device, iov->addr))
return -ENOMEM;
iov->length = len;
@@ -1501,8 +1365,8 @@ rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
{
int rc;
- ib_dma_unmap_single(ia->ri_id->device,
- iov->addr, iov->length, DMA_BIDIRECTIONAL);
+ ib_dma_unmap_single(ia->ri_device,
+ iov->addr, iov->length, DMA_BIDIRECTIONAL);
if (NULL == mr)
return 0;
@@ -1595,15 +1459,18 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
send_wr.num_sge = req->rl_niovs;
send_wr.opcode = IB_WR_SEND;
if (send_wr.num_sge == 4) /* no need to sync any pad (constant) */
- ib_dma_sync_single_for_device(ia->ri_id->device,
- req->rl_send_iov[3].addr, req->rl_send_iov[3].length,
- DMA_TO_DEVICE);
- ib_dma_sync_single_for_device(ia->ri_id->device,
- req->rl_send_iov[1].addr, req->rl_send_iov[1].length,
- DMA_TO_DEVICE);
- ib_dma_sync_single_for_device(ia->ri_id->device,
- req->rl_send_iov[0].addr, req->rl_send_iov[0].length,
- DMA_TO_DEVICE);
+ ib_dma_sync_single_for_device(ia->ri_device,
+ req->rl_send_iov[3].addr,
+ req->rl_send_iov[3].length,
+ DMA_TO_DEVICE);
+ ib_dma_sync_single_for_device(ia->ri_device,
+ req->rl_send_iov[1].addr,
+ req->rl_send_iov[1].length,
+ DMA_TO_DEVICE);
+ ib_dma_sync_single_for_device(ia->ri_device,
+ req->rl_send_iov[0].addr,
+ req->rl_send_iov[0].length,
+ DMA_TO_DEVICE);
if (DECR_CQCOUNT(ep) > 0)
send_wr.send_flags = 0;
@@ -1636,7 +1503,7 @@ rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
recv_wr.num_sge = 1;
- ib_dma_sync_single_for_cpu(ia->ri_id->device,
+ ib_dma_sync_single_for_cpu(ia->ri_device,
rdmab_addr(rep->rr_rdmabuf),
rdmab_length(rep->rr_rdmabuf),
DMA_BIDIRECTIONAL);
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 78e0b8beaa36..110d685b4ac5 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -62,6 +62,7 @@
struct rpcrdma_ia {
const struct rpcrdma_memreg_ops *ri_ops;
rwlock_t ri_qplock;
+ struct ib_device *ri_device;
struct rdma_cm_id *ri_id;
struct ib_pd *ri_pd;
struct ib_mr *ri_bind_mem;
@@ -69,7 +70,6 @@ struct rpcrdma_ia {
int ri_have_dma_lkey;
struct completion ri_done;
int ri_async_rc;
- enum rpcrdma_memreg ri_memreg_strategy;
unsigned int ri_max_frmr_depth;
struct ib_device_attr ri_devattr;
struct ib_qp_attr ri_qp_attr;
@@ -173,9 +173,8 @@ struct rpcrdma_buffer;
struct rpcrdma_rep {
unsigned int rr_len;
- struct rpcrdma_buffer *rr_buffer;
- struct rpc_xprt *rr_xprt;
- void (*rr_func)(struct rpcrdma_rep *);
+ struct ib_device *rr_device;
+ struct rpcrdma_xprt *rr_rxprt;
struct list_head rr_list;
struct rpcrdma_regbuf *rr_rdmabuf;
};
@@ -203,11 +202,18 @@ struct rpcrdma_frmr {
struct ib_fast_reg_page_list *fr_pgl;
struct ib_mr *fr_mr;
enum rpcrdma_frmr_state fr_state;
+ struct work_struct fr_work;
+ struct rpcrdma_xprt *fr_xprt;
+};
+
+struct rpcrdma_fmr {
+ struct ib_fmr *fmr;
+ u64 *physaddrs;
};
struct rpcrdma_mw {
union {
- struct ib_fmr *fmr;
+ struct rpcrdma_fmr fmr;
struct rpcrdma_frmr frmr;
} r;
void (*mw_sendcompletion)(struct ib_wc *);
@@ -281,15 +287,17 @@ rpcr_to_rdmar(struct rpc_rqst *rqst)
* One of these is associated with a transport instance
*/
struct rpcrdma_buffer {
- spinlock_t rb_lock; /* protects indexes */
- u32 rb_max_requests;/* client max requests */
- struct list_head rb_mws; /* optional memory windows/fmrs/frmrs */
- struct list_head rb_all;
- int rb_send_index;
+ spinlock_t rb_mwlock; /* protect rb_mws list */
+ struct list_head rb_mws;
+ struct list_head rb_all;
+ char *rb_pool;
+
+ spinlock_t rb_lock; /* protect buf arrays */
+ u32 rb_max_requests;
+ int rb_send_index;
+ int rb_recv_index;
struct rpcrdma_req **rb_send_bufs;
- int rb_recv_index;
struct rpcrdma_rep **rb_recv_bufs;
- char *rb_pool;
};
#define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia)
@@ -350,7 +358,6 @@ struct rpcrdma_memreg_ops {
struct rpcrdma_create_data_internal *);
size_t (*ro_maxpages)(struct rpcrdma_xprt *);
int (*ro_init)(struct rpcrdma_xprt *);
- void (*ro_reset)(struct rpcrdma_xprt *);
void (*ro_destroy)(struct rpcrdma_buffer *);
const char *ro_displayname;
};
@@ -413,6 +420,8 @@ int rpcrdma_ep_post_recv(struct rpcrdma_ia *, struct rpcrdma_ep *,
int rpcrdma_buffer_create(struct rpcrdma_xprt *);
void rpcrdma_buffer_destroy(struct rpcrdma_buffer *);
+struct rpcrdma_mw *rpcrdma_get_mw(struct rpcrdma_xprt *);
+void rpcrdma_put_mw(struct rpcrdma_xprt *, struct rpcrdma_mw *);
struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *);
void rpcrdma_buffer_put(struct rpcrdma_req *);
void rpcrdma_recv_buffer_get(struct rpcrdma_req *);
@@ -425,6 +434,9 @@ void rpcrdma_free_regbuf(struct rpcrdma_ia *,
unsigned int rpcrdma_max_segments(struct rpcrdma_xprt *);
+int frwr_alloc_recovery_wq(void);
+void frwr_destroy_recovery_wq(void);
+
/*
* Wrappers for chunk registration, shared by read/write chunk code.
*/