author     Linus Torvalds <torvalds@linux-foundation.org>   2026-04-24 14:20:03 -0700
committer  Linus Torvalds <torvalds@linux-foundation.org>   2026-04-24 14:20:03 -0700
commit     b85900e91c8402bedc1db14e6d293e26f25d30d4 (patch)
tree       22ff0ae976fab5251d72ad8f9c4552034f32a421 /net
parent     ac2dc6d57425ffa9629941d7c9d7c0e51082cb5a (diff)
parent     e6614b88d59d110ee1a80ed0826e34f24dd35c96 (diff)
Merge tag 'nfs-for-7.1-1' of git://git.linux-nfs.org/projects/trondmy/linux-nfs
Pull NFS client updates from Trond Myklebust:
 "Bugfixes:
   - Fix handling of ENOSPC so that if we have to resend writes, they
     are written synchronously
   - SUNRPC RDMA transport fixes from Chuck
   - Several fixes for delegated timestamps in NFSv4.2
   - Failure to obtain a directory delegation should not cause stat()
     to fail with NFSv4
   - Rename was failing to update timestamps when a directory
     delegation is held on NFSv4
   - Ensure we check rsize/wsize after crossing a NFSv4 filesystem
     boundary
   - NFSv4/pnfs:
      - If the server is down, retry the layout returns on reboot
      - Fallback to MDS could result in a short write being
        incorrectly logged

  Cleanups:
   - Use memcpy_and_pad in decode_fh"

* tag 'nfs-for-7.1-1' of git://git.linux-nfs.org/projects/trondmy/linux-nfs: (21 commits)
  NFS: Fix RCU dereference of cl_xprt in nfs_compare_super_address
  NFS: remove redundant __private attribute from nfs_page_class
  NFSv4.2: fix CLONE/COPY attrs in presence of delegated attributes
  NFS: fix writeback in presence of errors
  nfs: use memcpy_and_pad in decode_fh
  NFSv4.1: Apply session size limits on clone path
  NFSv4: retry GETATTR if GET_DIR_DELEGATION failed
  NFS: fix RENAME attr in presence of directory delegations
  pnfs/flexfiles: validate ds_versions_cnt is non-zero
  NFS/blocklayout: print each device used for SCSI layouts
  xprtrdma: Post receive buffers after RPC completion
  xprtrdma: Scale receive batch size with credit window
  xprtrdma: Replace rpcrdma_mr_seg with xdr_buf cursor
  xprtrdma: Decouple frwr_wp_create from frwr_map
  xprtrdma: Close lost-wakeup race in xprt_rdma_alloc_slot
  xprtrdma: Avoid 250 ms delay on backlog wakeup
  xprtrdma: Close sendctx get/put race that can block a transport
  nfs: update inode ctime after removexattr operation
  nfs: fix utimensat() for atime with delegated timestamps
  NFS: improve "Server wrote zero bytes" error
  ...
Diffstat (limited to 'net')
-rw-r--r--   net/sunrpc/xprt.c                  16
-rw-r--r--   net/sunrpc/xprtrdma/frwr_ops.c    179
-rw-r--r--   net/sunrpc/xprtrdma/rpc_rdma.c    177
-rw-r--r--   net/sunrpc/xprtrdma/transport.c    17
-rw-r--r--   net/sunrpc/xprtrdma/verbs.c        19
-rw-r--r--   net/sunrpc/xprtrdma/xprt_rdma.h    43
6 files changed, 290 insertions, 161 deletions
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 4fbb57a29704..48a3618cbb29 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -1663,6 +1663,22 @@ void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task)
}
EXPORT_SYMBOL_GPL(xprt_add_backlog);
+/**
+ * xprt_add_backlog_noncongested - queue task on backlog
+ * @xprt: transport whose backlog queue receives the task
+ * @task: task to queue
+ *
+ * Like xprt_add_backlog, but does not set XPRT_CONGESTED.
+ * For transports whose free_slot path does not synchronize
+ * with xprt_throttle_congested via reserve_lock.
+ */
+void xprt_add_backlog_noncongested(struct rpc_xprt *xprt,
+ struct rpc_task *task)
+{
+ rpc_sleep_on(&xprt->backlog, task, xprt_complete_request_init);
+}
+EXPORT_SYMBOL_GPL(xprt_add_backlog_noncongested);
+
static bool __xprt_set_rq(struct rpc_task *task, void *data)
{
struct rpc_rqst *req = data;
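As a condensed illustration of the calling convention the new helper expects — a sketch only, not part of this patch; my_alloc_slot(), my_pool_get() and my_pool_put() are hypothetical stand-ins — a transport that manages its own rqst pool queues the task first and only then re-probes the pool, mirroring the xprt_rdma_alloc_slot() change further down in this diff:

static void my_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst *req = my_pool_get(xprt);

	if (req) {
		task->tk_rqstp = req;
		task->tk_status = 0;
		return;
	}

	/* Queue without setting XPRT_CONGESTED: my_pool_put() does not
	 * take xprt->reserve_lock, so it cannot synchronize with
	 * xprt_throttle_congested().
	 */
	task->tk_status = -EAGAIN;
	xprt_add_backlog_noncongested(xprt, task);

	/* A slot freed before the task joined the backlog had no waiter
	 * to wake; look again now that we are queued.
	 */
	req = my_pool_get(xprt);
	if (req && !xprt_wake_up_backlog(xprt, req))
		my_pool_put(xprt, req);
}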
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index 31434aeb8e29..7f79a0a2601e 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -244,9 +244,10 @@ int frwr_query_device(struct rpcrdma_ep *ep, const struct ib_device *device)
}
ep->re_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
ep->re_attr.cap.max_send_wr += 1; /* for ib_drain_sq */
+ ep->re_recv_batch = ep->re_max_requests >> 2;
ep->re_attr.cap.max_recv_wr = ep->re_max_requests;
ep->re_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
- ep->re_attr.cap.max_recv_wr += RPCRDMA_MAX_RECV_BATCH;
+ ep->re_attr.cap.max_recv_wr += ep->re_recv_batch;
ep->re_attr.cap.max_recv_wr += 1; /* for ib_drain_rq */
ep->re_max_rdma_segs =
@@ -268,10 +269,9 @@ int frwr_query_device(struct rpcrdma_ep *ep, const struct ib_device *device)
}
/**
- * frwr_map - Register a memory region
+ * frwr_map - Register a memory region from an xdr_buf cursor
* @r_xprt: controlling transport
- * @seg: memory region co-ordinates
- * @nsegs: number of segments remaining
+ * @cur: cursor tracking position within the xdr_buf
* @writing: true when RDMA Write will be used
* @xid: XID of RPC using the registered memory
* @mr: MR to fill in
@@ -279,34 +279,104 @@ int frwr_query_device(struct rpcrdma_ep *ep, const struct ib_device *device)
* Prepare a REG_MR Work Request to register a memory region
* for remote access via RDMA READ or RDMA WRITE.
*
- * Returns the next segment or a negative errno pointer.
- * On success, @mr is filled in.
+ * Returns 0 on success (cursor advanced past consumed data,
+ * @mr populated) or a negative errno on failure.
*/
-struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
- struct rpcrdma_mr_seg *seg,
- int nsegs, bool writing, __be32 xid,
- struct rpcrdma_mr *mr)
+int frwr_map(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_xdr_cursor *cur,
+ bool writing, __be32 xid,
+ struct rpcrdma_mr *mr)
{
struct rpcrdma_ep *ep = r_xprt->rx_ep;
+ const struct xdr_buf *xdrbuf = cur->xc_buf;
+ bool sg_gaps = ep->re_mrtype == IB_MR_TYPE_SG_GAPS;
+ unsigned int max_depth = ep->re_max_fr_depth;
struct ib_reg_wr *reg_wr;
int i, n, dma_nents;
struct ib_mr *ibmr;
u8 key;
- if (nsegs > ep->re_max_fr_depth)
- nsegs = ep->re_max_fr_depth;
- for (i = 0; i < nsegs;) {
- sg_set_page(&mr->mr_sg[i], seg->mr_page,
- seg->mr_len, seg->mr_offset);
-
- ++seg;
- ++i;
- if (ep->re_mrtype == IB_MR_TYPE_SG_GAPS)
- continue;
- if ((i < nsegs && seg->mr_offset) ||
- offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
- break;
+ i = 0;
+
+ /* Head kvec */
+ if (!(cur->xc_flags & XC_HEAD_DONE)) {
+ const struct kvec *head = &xdrbuf->head[0];
+
+ sg_set_page(&mr->mr_sg[i],
+ virt_to_page(head->iov_base),
+ head->iov_len,
+ offset_in_page(head->iov_base));
+ cur->xc_flags |= XC_HEAD_DONE;
+ i++;
+ /* Without sg-gap support, each non-contiguous region
+ * must be registered as a separate MR. Returning
+ * here after the head kvec causes the caller to
+ * invoke frwr_map() again for the page list and
+ * tail.
+ */
+ if (!sg_gaps)
+ goto finish;
}
+
+ /* Page list */
+ if (!(cur->xc_flags & XC_PAGES_DONE) && xdrbuf->page_len) {
+ unsigned int page_base, remaining;
+ struct page **ppages;
+
+ remaining = xdrbuf->page_len - cur->xc_page_offset;
+ page_base = offset_in_page(xdrbuf->page_base +
+ cur->xc_page_offset);
+ ppages = xdrbuf->pages +
+ ((xdrbuf->page_base + cur->xc_page_offset)
+ >> PAGE_SHIFT);
+
+ while (remaining > 0 && i < max_depth) {
+ unsigned int len;
+
+ len = min_t(unsigned int,
+ PAGE_SIZE - page_base, remaining);
+ sg_set_page(&mr->mr_sg[i], *ppages,
+ len, page_base);
+ cur->xc_page_offset += len;
+ i++;
+ ppages++;
+ remaining -= len;
+
+ if (!sg_gaps && remaining > 0 &&
+ offset_in_page(page_base + len))
+ goto finish;
+ page_base = 0;
+ }
+ if (remaining == 0)
+ cur->xc_flags |= XC_PAGES_DONE;
+ } else if (!(cur->xc_flags & XC_PAGES_DONE)) {
+ cur->xc_flags |= XC_PAGES_DONE;
+ }
+
+ /* Tail kvec */
+ if (!(cur->xc_flags & XC_TAIL_DONE) && xdrbuf->tail[0].iov_len &&
+ i < max_depth) {
+ const struct kvec *tail = &xdrbuf->tail[0];
+
+ if (!sg_gaps && i > 0) {
+ struct scatterlist *prev = &mr->mr_sg[i - 1];
+
+ if (offset_in_page(prev->offset + prev->length) ||
+ offset_in_page(tail->iov_base))
+ goto finish;
+ }
+ sg_set_page(&mr->mr_sg[i],
+ virt_to_page(tail->iov_base),
+ tail->iov_len,
+ offset_in_page(tail->iov_base));
+ cur->xc_flags |= XC_TAIL_DONE;
+ i++;
+ } else if (!(cur->xc_flags & XC_TAIL_DONE) &&
+ !xdrbuf->tail[0].iov_len) {
+ cur->xc_flags |= XC_TAIL_DONE;
+ }
+
+finish:
mr->mr_dir = rpcrdma_data_dir(writing);
mr->mr_nents = i;
@@ -338,15 +408,15 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
mr->mr_offset = ibmr->iova;
trace_xprtrdma_mr_map(mr);
- return seg;
+ return 0;
out_dmamap_err:
trace_xprtrdma_frwr_sgerr(mr, i);
- return ERR_PTR(-EIO);
+ return -EIO;
out_mapmr_err:
trace_xprtrdma_frwr_maperr(mr, n);
- return ERR_PTR(-EIO);
+ return -EIO;
}
/**
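To make the new cursor contract concrete, here is an illustrative trace (sizes are assumed, not taken from the patch) of how successive frwr_map() calls consume a Reply-chunk xdr_buf on a device without IB_MR_TYPE_SG_GAPS, assuming re_max_fr_depth is large enough:

/* Assumed layout: head[0] of 120 bytes, three full pages of page data,
 * and an 8-byte tail that does not start on a page boundary.
 *
 *   call 1: maps head[0] into one MR, sets XC_HEAD_DONE, returns 0
 *           (without SG_GAPS the head is always registered alone)
 *   call 2: maps the three whole pages into a second MR, sets
 *           XC_PAGES_DONE, then stops before the tail because
 *           offset_in_page(tail->iov_base) is non-zero
 *   call 3: maps the tail into a third MR, sets XC_TAIL_DONE
 *
 * rpcrdma_xdr_cursor_done() now reports all regions complete and the
 * caller's encode loop ends. On an SG_GAPS-capable device the same
 * xdr_buf fits in a single call and a single MR, depth permitting.
 */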
@@ -669,9 +739,13 @@ void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
*/
int frwr_wp_create(struct rpcrdma_xprt *r_xprt)
{
+ struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
struct rpcrdma_ep *ep = r_xprt->rx_ep;
- struct rpcrdma_mr_seg seg;
+ struct ib_reg_wr *reg_wr;
struct rpcrdma_mr *mr;
+ struct ib_mr *ibmr;
+ int dma_nents;
+ int ret;
mr = rpcrdma_mr_get(r_xprt);
if (!mr)
@@ -679,11 +753,39 @@ int frwr_wp_create(struct rpcrdma_xprt *r_xprt)
mr->mr_req = NULL;
ep->re_write_pad_mr = mr;
- seg.mr_len = XDR_UNIT;
- seg.mr_page = virt_to_page(ep->re_write_pad);
- seg.mr_offset = offset_in_page(ep->re_write_pad);
- if (IS_ERR(frwr_map(r_xprt, &seg, 1, true, xdr_zero, mr)))
- return -EIO;
+ sg_init_table(mr->mr_sg, 1);
+ sg_set_page(mr->mr_sg, virt_to_page(ep->re_write_pad),
+ XDR_UNIT, offset_in_page(ep->re_write_pad));
+
+ mr->mr_dir = DMA_FROM_DEVICE;
+ mr->mr_nents = 1;
+ dma_nents = ib_dma_map_sg(ep->re_id->device, mr->mr_sg,
+ mr->mr_nents, mr->mr_dir);
+ if (!dma_nents) {
+ ret = -EIO;
+ goto out_mr;
+ }
+ mr->mr_device = ep->re_id->device;
+
+ ibmr = mr->mr_ibmr;
+ if (ib_map_mr_sg(ibmr, mr->mr_sg, dma_nents, NULL,
+ PAGE_SIZE) != dma_nents) {
+ ret = -EIO;
+ goto out_unmap;
+ }
+
+ /* IOVA is not tagged with an XID; the write-pad is not RPC-specific. */
+ ib_update_fast_reg_key(ibmr, ib_inc_rkey(ibmr->rkey));
+
+ reg_wr = &mr->mr_regwr;
+ reg_wr->mr = ibmr;
+ reg_wr->key = ibmr->rkey;
+ reg_wr->access = IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE;
+
+ mr->mr_handle = ibmr->rkey;
+ mr->mr_length = ibmr->length;
+ mr->mr_offset = ibmr->iova;
+
trace_xprtrdma_mr_fastreg(mr);
mr->mr_cqe.done = frwr_wc_fastreg;
@@ -693,5 +795,16 @@ int frwr_wp_create(struct rpcrdma_xprt *r_xprt)
mr->mr_regwr.wr.opcode = IB_WR_REG_MR;
mr->mr_regwr.wr.send_flags = 0;
- return ib_post_send(ep->re_id->qp, &mr->mr_regwr.wr, NULL);
+ ret = ib_post_send(ep->re_id->qp, &mr->mr_regwr.wr, NULL);
+ if (!ret)
+ return 0;
+
+out_unmap:
+ frwr_mr_unmap(mr);
+out_mr:
+ ep->re_write_pad_mr = NULL;
+ spin_lock(&buf->rb_lock);
+ rpcrdma_mr_push(mr, &buf->rb_mrs);
+ spin_unlock(&buf->rb_lock);
+ return ret;
}
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 3aac1456e23e..0e0f21974710 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -200,67 +200,30 @@ rpcrdma_alloc_sparse_pages(struct xdr_buf *buf)
return 0;
}
-/* Convert @vec to a single SGL element.
- *
- * Returns pointer to next available SGE, and bumps the total number
- * of SGEs consumed.
- */
-static struct rpcrdma_mr_seg *
-rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg,
- unsigned int *n)
+static void
+rpcrdma_xdr_cursor_init(struct rpcrdma_xdr_cursor *cur,
+ const struct xdr_buf *xdrbuf,
+ unsigned int pos, enum rpcrdma_chunktype type)
{
- seg->mr_page = virt_to_page(vec->iov_base);
- seg->mr_offset = offset_in_page(vec->iov_base);
- seg->mr_len = vec->iov_len;
- ++seg;
- ++(*n);
- return seg;
+ cur->xc_buf = xdrbuf;
+ cur->xc_page_offset = 0;
+ cur->xc_flags = 0;
+
+ if (pos != 0)
+ cur->xc_flags |= XC_HEAD_DONE;
+ if (!xdrbuf->page_len)
+ cur->xc_flags |= XC_PAGES_DONE;
+ if (type == rpcrdma_readch || type == rpcrdma_writech ||
+ !xdrbuf->tail[0].iov_len)
+ cur->xc_flags |= XC_TAIL_DONE;
}
-/* Convert @xdrbuf into SGEs no larger than a page each. As they
- * are registered, these SGEs are then coalesced into RDMA segments
- * when the selected memreg mode supports it.
- *
- * Returns positive number of SGEs consumed, or a negative errno.
- */
-
-static int
-rpcrdma_convert_iovs(struct rpcrdma_xprt *r_xprt, struct xdr_buf *xdrbuf,
- unsigned int pos, enum rpcrdma_chunktype type,
- struct rpcrdma_mr_seg *seg)
+static bool
+rpcrdma_xdr_cursor_done(const struct rpcrdma_xdr_cursor *cur)
{
- unsigned long page_base;
- unsigned int len, n;
- struct page **ppages;
-
- n = 0;
- if (pos == 0)
- seg = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, &n);
-
- len = xdrbuf->page_len;
- ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT);
- page_base = offset_in_page(xdrbuf->page_base);
- while (len) {
- seg->mr_page = *ppages;
- seg->mr_offset = page_base;
- seg->mr_len = min_t(u32, PAGE_SIZE - page_base, len);
- len -= seg->mr_len;
- ++ppages;
- ++seg;
- ++n;
- page_base = 0;
- }
-
- if (type == rpcrdma_readch || type == rpcrdma_writech)
- goto out;
-
- if (xdrbuf->tail[0].iov_len)
- rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, &n);
-
-out:
- if (unlikely(n > RPCRDMA_MAX_SEGS))
- return -EIO;
- return n;
+ return (cur->xc_flags & (XC_HEAD_DONE | XC_PAGES_DONE |
+ XC_TAIL_DONE)) ==
+ (XC_HEAD_DONE | XC_PAGES_DONE | XC_TAIL_DONE);
}
static int
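For reference — an illustrative summary only, assuming an xdr_buf with a head, page data and a non-empty tail — the flags each encode path starts out with:

/* Read list   (pos = head[0].iov_len, rpcrdma_readch):
 *	xc_flags = XC_HEAD_DONE | XC_TAIL_DONE	(only the page list is
 *						 registered via MRs)
 * Write list  (pos = head[0].iov_len, rpcrdma_writech):
 *	xc_flags = XC_HEAD_DONE | XC_TAIL_DONE
 * Reply chunk (pos = 0, rpcrdma_replych):
 *	xc_flags = 0				(head, pages and tail are
 *						 all registered)
 *
 * frwr_map() is then called repeatedly with the same cursor until
 * rpcrdma_xdr_cursor_done() reports all three regions complete.
 */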
@@ -292,11 +255,10 @@ encode_read_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr,
return 0;
}
-static struct rpcrdma_mr_seg *rpcrdma_mr_prepare(struct rpcrdma_xprt *r_xprt,
- struct rpcrdma_req *req,
- struct rpcrdma_mr_seg *seg,
- int nsegs, bool writing,
- struct rpcrdma_mr **mr)
+static int rpcrdma_mr_prepare(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_req *req,
+ struct rpcrdma_xdr_cursor *cur,
+ bool writing, struct rpcrdma_mr **mr)
{
*mr = rpcrdma_mr_pop(&req->rl_free_mrs);
if (!*mr) {
@@ -307,13 +269,13 @@ static struct rpcrdma_mr_seg *rpcrdma_mr_prepare(struct rpcrdma_xprt *r_xprt,
}
rpcrdma_mr_push(*mr, &req->rl_registered);
- return frwr_map(r_xprt, seg, nsegs, writing, req->rl_slot.rq_xid, *mr);
+ return frwr_map(r_xprt, cur, writing, req->rl_slot.rq_xid, *mr);
out_getmr_err:
trace_xprtrdma_nomrs_err(r_xprt, req);
xprt_wait_for_buffer_space(&r_xprt->rx_xprt);
rpcrdma_mrs_refresh(r_xprt);
- return ERR_PTR(-EAGAIN);
+ return -EAGAIN;
}
/* Register and XDR encode the Read list. Supports encoding a list of read
@@ -336,10 +298,10 @@ static int rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
enum rpcrdma_chunktype rtype)
{
struct xdr_stream *xdr = &req->rl_stream;
- struct rpcrdma_mr_seg *seg;
+ struct rpcrdma_xdr_cursor cur;
struct rpcrdma_mr *mr;
unsigned int pos;
- int nsegs;
+ int ret;
if (rtype == rpcrdma_noch_pullup || rtype == rpcrdma_noch_mapped)
goto done;
@@ -347,24 +309,20 @@ static int rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
pos = rqst->rq_snd_buf.head[0].iov_len;
if (rtype == rpcrdma_areadch)
pos = 0;
- seg = req->rl_segments;
- nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_snd_buf, pos,
- rtype, seg);
- if (nsegs < 0)
- return nsegs;
+ rpcrdma_xdr_cursor_init(&cur, &rqst->rq_snd_buf, pos, rtype);
do {
- seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, false, &mr);
- if (IS_ERR(seg))
- return PTR_ERR(seg);
+ ret = rpcrdma_mr_prepare(r_xprt, req, &cur, false, &mr);
+ if (ret)
+ return ret;
if (encode_read_segment(xdr, mr, pos) < 0)
return -EMSGSIZE;
- trace_xprtrdma_chunk_read(rqst->rq_task, pos, mr, nsegs);
+ trace_xprtrdma_chunk_read(rqst->rq_task, pos, mr,
+ rpcrdma_xdr_cursor_done(&cur));
r_xprt->rx_stats.read_chunk_count++;
- nsegs -= mr->mr_nents;
- } while (nsegs);
+ } while (!rpcrdma_xdr_cursor_done(&cur));
done:
if (xdr_stream_encode_item_absent(xdr) < 0)
@@ -394,20 +352,16 @@ static int rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt,
{
struct xdr_stream *xdr = &req->rl_stream;
struct rpcrdma_ep *ep = r_xprt->rx_ep;
- struct rpcrdma_mr_seg *seg;
+ struct rpcrdma_xdr_cursor cur;
struct rpcrdma_mr *mr;
- int nsegs, nchunks;
+ int nchunks, ret;
__be32 *segcount;
if (wtype != rpcrdma_writech)
goto done;
- seg = req->rl_segments;
- nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf,
- rqst->rq_rcv_buf.head[0].iov_len,
- wtype, seg);
- if (nsegs < 0)
- return nsegs;
+ rpcrdma_xdr_cursor_init(&cur, &rqst->rq_rcv_buf,
+ rqst->rq_rcv_buf.head[0].iov_len, wtype);
if (xdr_stream_encode_item_present(xdr) < 0)
return -EMSGSIZE;
@@ -418,30 +372,30 @@ static int rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt,
nchunks = 0;
do {
- seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, true, &mr);
- if (IS_ERR(seg))
- return PTR_ERR(seg);
+ ret = rpcrdma_mr_prepare(r_xprt, req, &cur, true, &mr);
+ if (ret)
+ return ret;
if (encode_rdma_segment(xdr, mr) < 0)
return -EMSGSIZE;
- trace_xprtrdma_chunk_write(rqst->rq_task, mr, nsegs);
+ trace_xprtrdma_chunk_write(rqst->rq_task, mr,
+ rpcrdma_xdr_cursor_done(&cur));
r_xprt->rx_stats.write_chunk_count++;
r_xprt->rx_stats.total_rdma_request += mr->mr_length;
nchunks++;
- nsegs -= mr->mr_nents;
- } while (nsegs);
+ } while (!rpcrdma_xdr_cursor_done(&cur));
if (xdr_pad_size(rqst->rq_rcv_buf.page_len)) {
if (encode_rdma_segment(xdr, ep->re_write_pad_mr) < 0)
return -EMSGSIZE;
trace_xprtrdma_chunk_wp(rqst->rq_task, ep->re_write_pad_mr,
- nsegs);
+ true);
r_xprt->rx_stats.write_chunk_count++;
- r_xprt->rx_stats.total_rdma_request += mr->mr_length;
+ r_xprt->rx_stats.total_rdma_request +=
+ ep->re_write_pad_mr->mr_length;
nchunks++;
- nsegs -= mr->mr_nents;
}
/* Update count of segments in this Write chunk */
@@ -471,9 +425,9 @@ static int rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
enum rpcrdma_chunktype wtype)
{
struct xdr_stream *xdr = &req->rl_stream;
- struct rpcrdma_mr_seg *seg;
+ struct rpcrdma_xdr_cursor cur;
struct rpcrdma_mr *mr;
- int nsegs, nchunks;
+ int nchunks, ret;
__be32 *segcount;
if (wtype != rpcrdma_replych) {
@@ -482,10 +436,7 @@ static int rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
return 0;
}
- seg = req->rl_segments;
- nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, 0, wtype, seg);
- if (nsegs < 0)
- return nsegs;
+ rpcrdma_xdr_cursor_init(&cur, &rqst->rq_rcv_buf, 0, wtype);
if (xdr_stream_encode_item_present(xdr) < 0)
return -EMSGSIZE;
@@ -496,19 +447,19 @@ static int rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
nchunks = 0;
do {
- seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, true, &mr);
- if (IS_ERR(seg))
- return PTR_ERR(seg);
+ ret = rpcrdma_mr_prepare(r_xprt, req, &cur, true, &mr);
+ if (ret)
+ return ret;
if (encode_rdma_segment(xdr, mr) < 0)
return -EMSGSIZE;
- trace_xprtrdma_chunk_reply(rqst->rq_task, mr, nsegs);
+ trace_xprtrdma_chunk_reply(rqst->rq_task, mr,
+ rpcrdma_xdr_cursor_done(&cur));
r_xprt->rx_stats.reply_chunk_count++;
r_xprt->rx_stats.total_rdma_request += mr->mr_length;
nchunks++;
- nsegs -= mr->mr_nents;
- } while (nsegs);
+ } while (!rpcrdma_xdr_cursor_done(&cur));
/* Update count of segments in the Reply chunk */
*segcount = cpu_to_be32(nchunks);
@@ -1471,7 +1422,6 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
credits = 1; /* don't deadlock */
else if (credits > r_xprt->rx_ep->re_max_requests)
credits = r_xprt->rx_ep->re_max_requests;
- rpcrdma_post_recvs(r_xprt, credits + (buf->rb_bc_srv_max_requests << 1));
if (buf->rb_credits != credits)
rpcrdma_update_cwnd(r_xprt, credits);
@@ -1490,15 +1440,20 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
/* LocalInv completion will complete the RPC */
else
kref_put(&req->rl_kref, rpcrdma_reply_done);
- return;
-out_badversion:
- trace_xprtrdma_reply_vers_err(rep);
- goto out;
+out_post:
+ rpcrdma_post_recvs(r_xprt,
+ credits + (buf->rb_bc_srv_max_requests << 1));
+ return;
out_norqst:
spin_unlock(&xprt->queue_lock);
trace_xprtrdma_reply_rqst_err(rep);
+ rpcrdma_rep_put(buf, rep);
+ goto out_post;
+
+out_badversion:
+ trace_xprtrdma_reply_vers_err(rep);
goto out;
out_shortreply:
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 9a8ce5df83ca..61706df5e485 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -510,8 +510,21 @@ xprt_rdma_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task)
return;
out_sleep:
- task->tk_status = -ENOMEM;
- xprt_add_backlog(xprt, task);
+ task->tk_status = -EAGAIN;
+ xprt_add_backlog_noncongested(xprt, task);
+ /* A buffer freed between buffer_get and rpc_sleep_on
+ * goes back to the pool with no waiter to wake.
+ * Re-check after joining the backlog to close that gap.
+ */
+ req = rpcrdma_buffer_get(&r_xprt->rx_buf);
+ if (req) {
+ struct rpc_rqst *rqst = &req->rl_slot;
+
+ if (!xprt_wake_up_backlog(xprt, rqst)) {
+ memset(rqst, 0, sizeof(*rqst));
+ rpcrdma_buffer_put(&r_xprt->rx_buf, req);
+ }
+ }
}
/**
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index b51a162885bb..aecf9c0a153f 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -708,6 +708,18 @@ out_emptyq:
*/
xprt_wait_for_buffer_space(&r_xprt->rx_xprt);
r_xprt->rx_stats.empty_sendctx_q++;
+
+ /* Recheck: a Send completion between the ring-empty test
+ * and the set_bit could cause its xprt_write_space() to
+ * miss, leaving XPRT_WRITE_SPACE set with a non-full ring.
+ * The smp_mb__after_atomic() pairs with smp_store_release()
+ * in rpcrdma_sendctx_put_locked().
+ */
+ smp_mb__after_atomic();
+ next_head = rpcrdma_sendctx_next(buf, buf->rb_sc_head);
+ if (next_head != READ_ONCE(buf->rb_sc_tail))
+ xprt_write_space(&r_xprt->rx_xprt);
+
return NULL;
}
@@ -739,7 +751,10 @@ static void rpcrdma_sendctx_put_locked(struct rpcrdma_xprt *r_xprt,
} while (buf->rb_sc_ctxs[next_tail] != sc);
- /* Paired with READ_ONCE */
+ /* Paired with READ_ONCE in rpcrdma_sendctx_get_locked():
+ * both the fast-path ring-full test and the post-set_bit
+ * recheck in the slow path depend on this store-release.
+ */
smp_store_release(&buf->rb_sc_tail, next_tail);
xprt_write_space(&r_xprt->rx_xprt);
@@ -1359,7 +1374,7 @@ void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, int needed)
if (likely(ep->re_receive_count > needed))
goto out;
needed -= ep->re_receive_count;
- needed += RPCRDMA_MAX_RECV_BATCH;
+ needed += ep->re_recv_batch;
if (atomic_inc_return(&ep->re_receiving) > 1)
goto out_dec;
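A worked example of the new receive batch sizing (the credit value is assumed, not from the patch):

/* With a negotiated credit limit of re_max_requests = 128,
 * frwr_query_device() now computes
 *
 *	ep->re_recv_batch           = 128 >> 2 = 32
 *	ep->re_attr.cap.max_recv_wr = 128 + RPCRDMA_BACKWARD_WRS + 32 + 1
 *
 * so Receive WRs are replenished in batches of a quarter of the credit
 * window rather than the old fixed RPCRDMA_MAX_RECV_BATCH.
 */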
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 8147d2b41494..f53a77472724 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -96,6 +96,7 @@ struct rpcrdma_ep {
struct rpcrdma_notification re_rn;
int re_receive_count;
unsigned int re_max_requests; /* depends on device */
+ unsigned int re_recv_batch;
unsigned int re_inline_send; /* negotiated */
unsigned int re_inline_recv; /* negotiated */
@@ -283,19 +284,36 @@ struct rpcrdma_mr {
* registered or invalidated. Must handle a Reply chunk:
*/
enum {
- RPCRDMA_MAX_IOV_SEGS = 3,
+ RPCRDMA_MAX_IOV_SEGS = 3, /* head, page-boundary, tail */
RPCRDMA_MAX_DATA_SEGS = ((1 * 1024 * 1024) / PAGE_SIZE) + 1,
RPCRDMA_MAX_SEGS = RPCRDMA_MAX_DATA_SEGS +
RPCRDMA_MAX_IOV_SEGS,
};
-/* Arguments for DMA mapping and registration */
-struct rpcrdma_mr_seg {
- u32 mr_len; /* length of segment */
- struct page *mr_page; /* underlying struct page */
- u64 mr_offset; /* IN: page offset, OUT: iova */
+/**
+ * struct rpcrdma_xdr_cursor - tracks position within an xdr_buf
+ * for iterative MR registration
+ * @xc_buf: the xdr_buf being iterated
+ * @xc_page_offset: byte offset into the page region consumed so far
+ * @xc_flags: combination of XC_* bits
+ *
+ * Each XC_*_DONE flag indicates that this region has no
+ * remaining MR registration work. That condition holds both when the region
+ * has already been registered by a prior frwr_map() call and
+ * when the region is excluded from this chunk type (pre-set
+ * at init time by rpcrdma_xdr_cursor_init()). frwr_map()
+ * treats the two cases identically: skip the region.
+ */
+struct rpcrdma_xdr_cursor {
+ const struct xdr_buf *xc_buf;
+ unsigned int xc_page_offset;
+ unsigned int xc_flags;
};
+#define XC_HEAD_DONE BIT(0)
+#define XC_PAGES_DONE BIT(1)
+#define XC_TAIL_DONE BIT(2)
+
/* The Send SGE array is provisioned to send a maximum size
* inline request:
* - RPC-over-RDMA header
@@ -330,7 +348,6 @@ struct rpcrdma_req {
struct list_head rl_free_mrs;
struct list_head rl_registered;
- struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS];
};
static inline struct rpcrdma_req *
@@ -450,8 +467,8 @@ rpcrdma_portstr(const struct rpcrdma_xprt *r_xprt)
}
/* Setting this to 0 ensures interoperability with early servers.
- * Setting this to 1 enhances certain unaligned read/write performance.
- * Default is 0, see sysctl entry and rpc_rdma.c rpcrdma_convert_iovs() */
+ * Setting this to 1 enhances unaligned read/write performance.
+ * Default is 0, see sysctl entry and rpc_rdma.c */
extern int xprt_rdma_pad_optimize;
/* This setting controls the hunt for a supported memory
@@ -535,10 +552,10 @@ void frwr_reset(struct rpcrdma_req *req);
int frwr_query_device(struct rpcrdma_ep *ep, const struct ib_device *device);
int frwr_mr_init(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr *mr);
void frwr_mr_release(struct rpcrdma_mr *mr);
-struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
- struct rpcrdma_mr_seg *seg,
- int nsegs, bool writing, __be32 xid,
- struct rpcrdma_mr *mr);
+int frwr_map(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_xdr_cursor *cur,
+ bool writing, __be32 xid,
+ struct rpcrdma_mr *mr);
int frwr_send(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req);
void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs);
void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req);