From 24297c7cd3f9389374bb13d1ca578c335d2866b9 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Fri, 6 Mar 2026 16:56:22 -0500 Subject: xprtrdma: Close sendctx get/put race that can block a transport rpcrdma_sendctx_get_locked() and rpcrdma_sendctx_put_locked() can race in a way that leaves XPRT_WRITE_SPACE set permanently, blocking all further sends on the transport: get_locked put_locked (Send completion) ---------- -------------------------- read rb_sc_tail -> ring full advance rb_sc_tail xprt_write_space(): test_bit(WRITE_SPACE) -> not set, return set_bit(WRITE_SPACE) return NULL (-EAGAIN) After the sender releases XPRT_LOCKED, the release path refuses to wake the next task because XPRT_WRITE_SPACE is set. The sender retries, finds XPRT_WRITE_SPACE still set, and sleeps on xprt_sending. No further Send completions arrive to clear the flag because no new Sends can be posted. With nconnect, the stalled transport's share of congestion credits are never returned, starving the remaining transports as well. Fixes: 05eb06d86685 ("xprtrdma: Fix occasional transport deadlock") Signed-off-by: Chuck Lever Signed-off-by: Trond Myklebust --- net/sunrpc/xprtrdma/verbs.c | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) (limited to 'net') diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index b51a162885bb..90fd83f2d846 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -708,6 +708,18 @@ out_emptyq: */ xprt_wait_for_buffer_space(&r_xprt->rx_xprt); r_xprt->rx_stats.empty_sendctx_q++; + + /* Recheck: a Send completion between the ring-empty test + * and the set_bit could cause its xprt_write_space() to + * miss, leaving XPRT_WRITE_SPACE set with a non-full ring. + * The smp_mb__after_atomic() pairs with smp_store_release() + * in rpcrdma_sendctx_put_locked(). + */ + smp_mb__after_atomic(); + next_head = rpcrdma_sendctx_next(buf, buf->rb_sc_head); + if (next_head != READ_ONCE(buf->rb_sc_tail)) + xprt_write_space(&r_xprt->rx_xprt); + return NULL; } @@ -739,7 +751,10 @@ static void rpcrdma_sendctx_put_locked(struct rpcrdma_xprt *r_xprt, } while (buf->rb_sc_ctxs[next_tail] != sc); - /* Paired with READ_ONCE */ + /* Paired with READ_ONCE in rpcrdma_sendctx_get_locked(): + * both the fast-path ring-full test and the post-set_bit + * recheck in the slow path depend on this store-release. + */ smp_store_release(&buf->rb_sc_tail, next_tail); xprt_write_space(&r_xprt->rx_xprt); -- cgit v1.2.3 From 100142093e22b3f7741ac88e94878bb3694e306f Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Fri, 6 Mar 2026 16:56:23 -0500 Subject: xprtrdma: Avoid 250 ms delay on backlog wakeup Commit a721035477fb ("SUNRPC/xprt: async tasks mustn't block waiting for memory") changed xprt_rdma_alloc_slot() to set tk_status to -ENOMEM so that call_reserveresult() would sleep HZ/4 before retrying. That rationale applies to xprt_dynamic_alloc_slot(), where an immediate retry under memory pressure wastes CPU, but not to the RDMA backlog path: a task woken from the backlog has a slot waiting for it, so the 250 ms rpc_delay adds latency without benefit. This also aligns the code with the existing kernel-doc for xprt_rdma_alloc_slot(), which already documented %-EAGAIN. Fixes: a721035477fb ("SUNRPC/xprt: async tasks mustn't block waiting for memory") Signed-off-by: Chuck Lever Signed-off-by: Trond Myklebust --- net/sunrpc/xprtrdma/transport.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'net') diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c index 9a8ce5df83ca..ca079439f9cc 100644 --- a/net/sunrpc/xprtrdma/transport.c +++ b/net/sunrpc/xprtrdma/transport.c @@ -510,7 +510,7 @@ xprt_rdma_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task) return; out_sleep: - task->tk_status = -ENOMEM; + task->tk_status = -EAGAIN; xprt_add_backlog(xprt, task); } -- cgit v1.2.3 From 765bde47fe7f197dabeb12da76831f40d0b20377 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Fri, 6 Mar 2026 16:56:24 -0500 Subject: xprtrdma: Close lost-wakeup race in xprt_rdma_alloc_slot xprt_rdma_alloc_slot() and xprt_rdma_free_slot() lack serialization between the buffer pool and the backlog queue. A buffer freed after rpcrdma_buffer_get() finds the pool empty but before rpc_sleep_on() places the task on the backlog is returned to the pool with no waiter to wake, leaving the task stuck on the backlog indefinitely. After joining the backlog, re-check the pool and route any recovered buffer through xprt_wake_up_backlog(), whose queue lock serializes with concurrent wakeups and avoids double-assignment of slots. Because xprt_rdma_free_slot() does not hold reserve_lock, the XPRT_CONGESTED double-check in xprt_throttle_congested() is ineffective: a task can join the backlog through that path after free_slot has already found it empty and cleared the bit. Avoid this by using xprt_add_backlog_noncongested(), which queues the task without setting XPRT_CONGESTED, so every allocation reaches xprt_rdma_alloc_slot() and its post-sleep re-check. Fixes: edb41e61a54e ("xprtrdma: Make rpc_rqst part of rpcrdma_req") Signed-off-by: Chuck Lever Signed-off-by: Trond Myklebust --- net/sunrpc/xprt.c | 16 ++++++++++++++++ net/sunrpc/xprtrdma/transport.c | 15 ++++++++++++++- 2 files changed, 30 insertions(+), 1 deletion(-) (limited to 'net') diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index 4fbb57a29704..48a3618cbb29 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -1663,6 +1663,22 @@ void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task) } EXPORT_SYMBOL_GPL(xprt_add_backlog); +/** + * xprt_add_backlog_noncongested - queue task on backlog + * @xprt: transport whose backlog queue receives the task + * @task: task to queue + * + * Like xprt_add_backlog, but does not set XPRT_CONGESTED. + * For transports whose free_slot path does not synchronize + * with xprt_throttle_congested via reserve_lock. + */ +void xprt_add_backlog_noncongested(struct rpc_xprt *xprt, + struct rpc_task *task) +{ + rpc_sleep_on(&xprt->backlog, task, xprt_complete_request_init); +} +EXPORT_SYMBOL_GPL(xprt_add_backlog_noncongested); + static bool __xprt_set_rq(struct rpc_task *task, void *data) { struct rpc_rqst *req = data; diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c index ca079439f9cc..61706df5e485 100644 --- a/net/sunrpc/xprtrdma/transport.c +++ b/net/sunrpc/xprtrdma/transport.c @@ -511,7 +511,20 @@ xprt_rdma_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task) out_sleep: task->tk_status = -EAGAIN; - xprt_add_backlog(xprt, task); + xprt_add_backlog_noncongested(xprt, task); + /* A buffer freed between buffer_get and rpc_sleep_on + * goes back to the pool with no waiter to wake. + * Re-check after joining the backlog to close that gap. + */ + req = rpcrdma_buffer_get(&r_xprt->rx_buf); + if (req) { + struct rpc_rqst *rqst = &req->rl_slot; + + if (!xprt_wake_up_backlog(xprt, rqst)) { + memset(rqst, 0, sizeof(*rqst)); + rpcrdma_buffer_put(&r_xprt->rx_buf, req); + } + } } /** -- cgit v1.2.3 From 6f2e565fb3bd68636e4920223e599d70861f8ba6 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Fri, 6 Mar 2026 16:56:25 -0500 Subject: xprtrdma: Decouple frwr_wp_create from frwr_map frwr_wp_create is the only caller of frwr_map outside the encode path. It registers a single 4-byte write-pad region from a stack- local rpcrdma_mr_seg. Inlining the registration logic directly (sg_init_table + sg_set_page + ib_dma_map_sg + ib_map_mr_sg + IOVA mangle + reg_wr setup) eliminates the coupling that would otherwise complicate the removal of rpcrdma_mr_seg from frwr_map's interface. The inlined version adds a proper error-unwind ladder: on failure, the DMA mapping (if established) is released, ep->re_write_pad_mr is cleared, and the MR is returned to the transport free list. The old frwr_map-based code relied on rpcrdma_mrs_destroy at teardown to reclaim partially-initialized MRs. This is a one-time setup path; duplicating ~20 lines is a reasonable tradeoff for decoupling the write-pad registration from the data- path MR registration. Signed-off-by: Chuck Lever Signed-off-by: Trond Myklebust --- net/sunrpc/xprtrdma/frwr_ops.c | 57 ++++++++++++++++++++++++++++++++++++------ 1 file changed, 50 insertions(+), 7 deletions(-) (limited to 'net') diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c index 31434aeb8e29..4331b0b65f4c 100644 --- a/net/sunrpc/xprtrdma/frwr_ops.c +++ b/net/sunrpc/xprtrdma/frwr_ops.c @@ -669,9 +669,13 @@ void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) */ int frwr_wp_create(struct rpcrdma_xprt *r_xprt) { + struct rpcrdma_buffer *buf = &r_xprt->rx_buf; struct rpcrdma_ep *ep = r_xprt->rx_ep; - struct rpcrdma_mr_seg seg; + struct ib_reg_wr *reg_wr; struct rpcrdma_mr *mr; + struct ib_mr *ibmr; + int dma_nents; + int ret; mr = rpcrdma_mr_get(r_xprt); if (!mr) @@ -679,11 +683,39 @@ int frwr_wp_create(struct rpcrdma_xprt *r_xprt) mr->mr_req = NULL; ep->re_write_pad_mr = mr; - seg.mr_len = XDR_UNIT; - seg.mr_page = virt_to_page(ep->re_write_pad); - seg.mr_offset = offset_in_page(ep->re_write_pad); - if (IS_ERR(frwr_map(r_xprt, &seg, 1, true, xdr_zero, mr))) - return -EIO; + sg_init_table(mr->mr_sg, 1); + sg_set_page(mr->mr_sg, virt_to_page(ep->re_write_pad), + XDR_UNIT, offset_in_page(ep->re_write_pad)); + + mr->mr_dir = DMA_FROM_DEVICE; + mr->mr_nents = 1; + dma_nents = ib_dma_map_sg(ep->re_id->device, mr->mr_sg, + mr->mr_nents, mr->mr_dir); + if (!dma_nents) { + ret = -EIO; + goto out_mr; + } + mr->mr_device = ep->re_id->device; + + ibmr = mr->mr_ibmr; + if (ib_map_mr_sg(ibmr, mr->mr_sg, dma_nents, NULL, + PAGE_SIZE) != dma_nents) { + ret = -EIO; + goto out_unmap; + } + + /* IOVA is not tagged with an XID; the write-pad is not RPC-specific. */ + ib_update_fast_reg_key(ibmr, ib_inc_rkey(ibmr->rkey)); + + reg_wr = &mr->mr_regwr; + reg_wr->mr = ibmr; + reg_wr->key = ibmr->rkey; + reg_wr->access = IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE; + + mr->mr_handle = ibmr->rkey; + mr->mr_length = ibmr->length; + mr->mr_offset = ibmr->iova; + trace_xprtrdma_mr_fastreg(mr); mr->mr_cqe.done = frwr_wc_fastreg; @@ -693,5 +725,16 @@ int frwr_wp_create(struct rpcrdma_xprt *r_xprt) mr->mr_regwr.wr.opcode = IB_WR_REG_MR; mr->mr_regwr.wr.send_flags = 0; - return ib_post_send(ep->re_id->qp, &mr->mr_regwr.wr, NULL); + ret = ib_post_send(ep->re_id->qp, &mr->mr_regwr.wr, NULL); + if (!ret) + return 0; + +out_unmap: + frwr_mr_unmap(mr); +out_mr: + ep->re_write_pad_mr = NULL; + spin_lock(&buf->rb_lock); + rpcrdma_mr_push(mr, &buf->rb_mrs); + spin_unlock(&buf->rb_lock); + return ret; } -- cgit v1.2.3 From 7a079ab57c4eeff241d9abfc1ec6477cb90a6206 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Fri, 6 Mar 2026 16:56:26 -0500 Subject: xprtrdma: Replace rpcrdma_mr_seg with xdr_buf cursor The FRWR registration path converts data through three representations: xdr_buf -> rpcrdma_mr_seg[] -> scatterlist[] -> ib_map_mr_sg(). The rpcrdma_mr_seg intermediate is a relic of when multiple registration strategies existed (FMR, physical, FRWR). Only FRWR remains, so this indirection and the 6240-byte rl_segments[260] array embedded in each rpcrdma_req serve no purpose. Introduce struct rpcrdma_xdr_cursor to track position within an xdr_buf during iterative MR registration. Rewrite frwr_map to populate scatterlist entries directly from the xdr_buf regions (head kvec, page list, tail kvec). The boundary logic for non-SG_GAPS devices is simpler because the xdr_buf structure guarantees that page-region entries after the first start at offset 0, and that head/tail kvecs are separate regions that naturally break at MR boundaries. Fix a pre-existing bug in rpcrdma_encode_write_list where the write-pad statistics accumulator added mr->mr_length from the last data MR rather than the write-pad MR. The refactored code uses ep->re_write_pad_mr->mr_length. Signed-off-by: Chuck Lever Signed-off-by: Trond Myklebust --- net/sunrpc/xprtrdma/frwr_ops.c | 119 +++++++++++++++++++++++------ net/sunrpc/xprtrdma/rpc_rdma.c | 163 ++++++++++++++-------------------------- net/sunrpc/xprtrdma/xprt_rdma.h | 42 +++++++---- 3 files changed, 180 insertions(+), 144 deletions(-) (limited to 'net') diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c index 4331b0b65f4c..229057d35fb8 100644 --- a/net/sunrpc/xprtrdma/frwr_ops.c +++ b/net/sunrpc/xprtrdma/frwr_ops.c @@ -268,10 +268,9 @@ int frwr_query_device(struct rpcrdma_ep *ep, const struct ib_device *device) } /** - * frwr_map - Register a memory region + * frwr_map - Register a memory region from an xdr_buf cursor * @r_xprt: controlling transport - * @seg: memory region co-ordinates - * @nsegs: number of segments remaining + * @cur: cursor tracking position within the xdr_buf * @writing: true when RDMA Write will be used * @xid: XID of RPC using the registered memory * @mr: MR to fill in @@ -279,34 +278,104 @@ int frwr_query_device(struct rpcrdma_ep *ep, const struct ib_device *device) * Prepare a REG_MR Work Request to register a memory region * for remote access via RDMA READ or RDMA WRITE. * - * Returns the next segment or a negative errno pointer. - * On success, @mr is filled in. + * Returns 0 on success (cursor advanced past consumed data, + * @mr populated) or a negative errno on failure. */ -struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt, - struct rpcrdma_mr_seg *seg, - int nsegs, bool writing, __be32 xid, - struct rpcrdma_mr *mr) +int frwr_map(struct rpcrdma_xprt *r_xprt, + struct rpcrdma_xdr_cursor *cur, + bool writing, __be32 xid, + struct rpcrdma_mr *mr) { struct rpcrdma_ep *ep = r_xprt->rx_ep; + const struct xdr_buf *xdrbuf = cur->xc_buf; + bool sg_gaps = ep->re_mrtype == IB_MR_TYPE_SG_GAPS; + unsigned int max_depth = ep->re_max_fr_depth; struct ib_reg_wr *reg_wr; int i, n, dma_nents; struct ib_mr *ibmr; u8 key; - if (nsegs > ep->re_max_fr_depth) - nsegs = ep->re_max_fr_depth; - for (i = 0; i < nsegs;) { - sg_set_page(&mr->mr_sg[i], seg->mr_page, - seg->mr_len, seg->mr_offset); - - ++seg; - ++i; - if (ep->re_mrtype == IB_MR_TYPE_SG_GAPS) - continue; - if ((i < nsegs && seg->mr_offset) || - offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len)) - break; + i = 0; + + /* Head kvec */ + if (!(cur->xc_flags & XC_HEAD_DONE)) { + const struct kvec *head = &xdrbuf->head[0]; + + sg_set_page(&mr->mr_sg[i], + virt_to_page(head->iov_base), + head->iov_len, + offset_in_page(head->iov_base)); + cur->xc_flags |= XC_HEAD_DONE; + i++; + /* Without sg-gap support, each non-contiguous region + * must be registered as a separate MR. Returning + * here after the head kvec causes the caller to + * invoke frwr_map() again for the page list and + * tail. + */ + if (!sg_gaps) + goto finish; } + + /* Page list */ + if (!(cur->xc_flags & XC_PAGES_DONE) && xdrbuf->page_len) { + unsigned int page_base, remaining; + struct page **ppages; + + remaining = xdrbuf->page_len - cur->xc_page_offset; + page_base = offset_in_page(xdrbuf->page_base + + cur->xc_page_offset); + ppages = xdrbuf->pages + + ((xdrbuf->page_base + cur->xc_page_offset) + >> PAGE_SHIFT); + + while (remaining > 0 && i < max_depth) { + unsigned int len; + + len = min_t(unsigned int, + PAGE_SIZE - page_base, remaining); + sg_set_page(&mr->mr_sg[i], *ppages, + len, page_base); + cur->xc_page_offset += len; + i++; + ppages++; + remaining -= len; + + if (!sg_gaps && remaining > 0 && + offset_in_page(page_base + len)) + goto finish; + page_base = 0; + } + if (remaining == 0) + cur->xc_flags |= XC_PAGES_DONE; + } else if (!(cur->xc_flags & XC_PAGES_DONE)) { + cur->xc_flags |= XC_PAGES_DONE; + } + + /* Tail kvec */ + if (!(cur->xc_flags & XC_TAIL_DONE) && xdrbuf->tail[0].iov_len && + i < max_depth) { + const struct kvec *tail = &xdrbuf->tail[0]; + + if (!sg_gaps && i > 0) { + struct scatterlist *prev = &mr->mr_sg[i - 1]; + + if (offset_in_page(prev->offset + prev->length) || + offset_in_page(tail->iov_base)) + goto finish; + } + sg_set_page(&mr->mr_sg[i], + virt_to_page(tail->iov_base), + tail->iov_len, + offset_in_page(tail->iov_base)); + cur->xc_flags |= XC_TAIL_DONE; + i++; + } else if (!(cur->xc_flags & XC_TAIL_DONE) && + !xdrbuf->tail[0].iov_len) { + cur->xc_flags |= XC_TAIL_DONE; + } + +finish: mr->mr_dir = rpcrdma_data_dir(writing); mr->mr_nents = i; @@ -338,15 +407,15 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt, mr->mr_offset = ibmr->iova; trace_xprtrdma_mr_map(mr); - return seg; + return 0; out_dmamap_err: trace_xprtrdma_frwr_sgerr(mr, i); - return ERR_PTR(-EIO); + return -EIO; out_mapmr_err: trace_xprtrdma_frwr_maperr(mr, n); - return ERR_PTR(-EIO); + return -EIO; } /** diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index 3aac1456e23e..a77e7e48aab2 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -200,67 +200,30 @@ rpcrdma_alloc_sparse_pages(struct xdr_buf *buf) return 0; } -/* Convert @vec to a single SGL element. - * - * Returns pointer to next available SGE, and bumps the total number - * of SGEs consumed. - */ -static struct rpcrdma_mr_seg * -rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg, - unsigned int *n) +static void +rpcrdma_xdr_cursor_init(struct rpcrdma_xdr_cursor *cur, + const struct xdr_buf *xdrbuf, + unsigned int pos, enum rpcrdma_chunktype type) { - seg->mr_page = virt_to_page(vec->iov_base); - seg->mr_offset = offset_in_page(vec->iov_base); - seg->mr_len = vec->iov_len; - ++seg; - ++(*n); - return seg; + cur->xc_buf = xdrbuf; + cur->xc_page_offset = 0; + cur->xc_flags = 0; + + if (pos != 0) + cur->xc_flags |= XC_HEAD_DONE; + if (!xdrbuf->page_len) + cur->xc_flags |= XC_PAGES_DONE; + if (type == rpcrdma_readch || type == rpcrdma_writech || + !xdrbuf->tail[0].iov_len) + cur->xc_flags |= XC_TAIL_DONE; } -/* Convert @xdrbuf into SGEs no larger than a page each. As they - * are registered, these SGEs are then coalesced into RDMA segments - * when the selected memreg mode supports it. - * - * Returns positive number of SGEs consumed, or a negative errno. - */ - -static int -rpcrdma_convert_iovs(struct rpcrdma_xprt *r_xprt, struct xdr_buf *xdrbuf, - unsigned int pos, enum rpcrdma_chunktype type, - struct rpcrdma_mr_seg *seg) +static bool +rpcrdma_xdr_cursor_done(const struct rpcrdma_xdr_cursor *cur) { - unsigned long page_base; - unsigned int len, n; - struct page **ppages; - - n = 0; - if (pos == 0) - seg = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, &n); - - len = xdrbuf->page_len; - ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT); - page_base = offset_in_page(xdrbuf->page_base); - while (len) { - seg->mr_page = *ppages; - seg->mr_offset = page_base; - seg->mr_len = min_t(u32, PAGE_SIZE - page_base, len); - len -= seg->mr_len; - ++ppages; - ++seg; - ++n; - page_base = 0; - } - - if (type == rpcrdma_readch || type == rpcrdma_writech) - goto out; - - if (xdrbuf->tail[0].iov_len) - rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, &n); - -out: - if (unlikely(n > RPCRDMA_MAX_SEGS)) - return -EIO; - return n; + return (cur->xc_flags & (XC_HEAD_DONE | XC_PAGES_DONE | + XC_TAIL_DONE)) == + (XC_HEAD_DONE | XC_PAGES_DONE | XC_TAIL_DONE); } static int @@ -292,11 +255,10 @@ encode_read_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr, return 0; } -static struct rpcrdma_mr_seg *rpcrdma_mr_prepare(struct rpcrdma_xprt *r_xprt, - struct rpcrdma_req *req, - struct rpcrdma_mr_seg *seg, - int nsegs, bool writing, - struct rpcrdma_mr **mr) +static int rpcrdma_mr_prepare(struct rpcrdma_xprt *r_xprt, + struct rpcrdma_req *req, + struct rpcrdma_xdr_cursor *cur, + bool writing, struct rpcrdma_mr **mr) { *mr = rpcrdma_mr_pop(&req->rl_free_mrs); if (!*mr) { @@ -307,13 +269,13 @@ static struct rpcrdma_mr_seg *rpcrdma_mr_prepare(struct rpcrdma_xprt *r_xprt, } rpcrdma_mr_push(*mr, &req->rl_registered); - return frwr_map(r_xprt, seg, nsegs, writing, req->rl_slot.rq_xid, *mr); + return frwr_map(r_xprt, cur, writing, req->rl_slot.rq_xid, *mr); out_getmr_err: trace_xprtrdma_nomrs_err(r_xprt, req); xprt_wait_for_buffer_space(&r_xprt->rx_xprt); rpcrdma_mrs_refresh(r_xprt); - return ERR_PTR(-EAGAIN); + return -EAGAIN; } /* Register and XDR encode the Read list. Supports encoding a list of read @@ -336,10 +298,10 @@ static int rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, enum rpcrdma_chunktype rtype) { struct xdr_stream *xdr = &req->rl_stream; - struct rpcrdma_mr_seg *seg; + struct rpcrdma_xdr_cursor cur; struct rpcrdma_mr *mr; unsigned int pos; - int nsegs; + int ret; if (rtype == rpcrdma_noch_pullup || rtype == rpcrdma_noch_mapped) goto done; @@ -347,24 +309,20 @@ static int rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, pos = rqst->rq_snd_buf.head[0].iov_len; if (rtype == rpcrdma_areadch) pos = 0; - seg = req->rl_segments; - nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_snd_buf, pos, - rtype, seg); - if (nsegs < 0) - return nsegs; + rpcrdma_xdr_cursor_init(&cur, &rqst->rq_snd_buf, pos, rtype); do { - seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, false, &mr); - if (IS_ERR(seg)) - return PTR_ERR(seg); + ret = rpcrdma_mr_prepare(r_xprt, req, &cur, false, &mr); + if (ret) + return ret; if (encode_read_segment(xdr, mr, pos) < 0) return -EMSGSIZE; - trace_xprtrdma_chunk_read(rqst->rq_task, pos, mr, nsegs); + trace_xprtrdma_chunk_read(rqst->rq_task, pos, mr, + rpcrdma_xdr_cursor_done(&cur)); r_xprt->rx_stats.read_chunk_count++; - nsegs -= mr->mr_nents; - } while (nsegs); + } while (!rpcrdma_xdr_cursor_done(&cur)); done: if (xdr_stream_encode_item_absent(xdr) < 0) @@ -394,20 +352,16 @@ static int rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, { struct xdr_stream *xdr = &req->rl_stream; struct rpcrdma_ep *ep = r_xprt->rx_ep; - struct rpcrdma_mr_seg *seg; + struct rpcrdma_xdr_cursor cur; struct rpcrdma_mr *mr; - int nsegs, nchunks; + int nchunks, ret; __be32 *segcount; if (wtype != rpcrdma_writech) goto done; - seg = req->rl_segments; - nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, - rqst->rq_rcv_buf.head[0].iov_len, - wtype, seg); - if (nsegs < 0) - return nsegs; + rpcrdma_xdr_cursor_init(&cur, &rqst->rq_rcv_buf, + rqst->rq_rcv_buf.head[0].iov_len, wtype); if (xdr_stream_encode_item_present(xdr) < 0) return -EMSGSIZE; @@ -418,30 +372,30 @@ static int rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, nchunks = 0; do { - seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, true, &mr); - if (IS_ERR(seg)) - return PTR_ERR(seg); + ret = rpcrdma_mr_prepare(r_xprt, req, &cur, true, &mr); + if (ret) + return ret; if (encode_rdma_segment(xdr, mr) < 0) return -EMSGSIZE; - trace_xprtrdma_chunk_write(rqst->rq_task, mr, nsegs); + trace_xprtrdma_chunk_write(rqst->rq_task, mr, + rpcrdma_xdr_cursor_done(&cur)); r_xprt->rx_stats.write_chunk_count++; r_xprt->rx_stats.total_rdma_request += mr->mr_length; nchunks++; - nsegs -= mr->mr_nents; - } while (nsegs); + } while (!rpcrdma_xdr_cursor_done(&cur)); if (xdr_pad_size(rqst->rq_rcv_buf.page_len)) { if (encode_rdma_segment(xdr, ep->re_write_pad_mr) < 0) return -EMSGSIZE; trace_xprtrdma_chunk_wp(rqst->rq_task, ep->re_write_pad_mr, - nsegs); + true); r_xprt->rx_stats.write_chunk_count++; - r_xprt->rx_stats.total_rdma_request += mr->mr_length; + r_xprt->rx_stats.total_rdma_request += + ep->re_write_pad_mr->mr_length; nchunks++; - nsegs -= mr->mr_nents; } /* Update count of segments in this Write chunk */ @@ -471,9 +425,9 @@ static int rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, enum rpcrdma_chunktype wtype) { struct xdr_stream *xdr = &req->rl_stream; - struct rpcrdma_mr_seg *seg; + struct rpcrdma_xdr_cursor cur; struct rpcrdma_mr *mr; - int nsegs, nchunks; + int nchunks, ret; __be32 *segcount; if (wtype != rpcrdma_replych) { @@ -482,10 +436,7 @@ static int rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, return 0; } - seg = req->rl_segments; - nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, 0, wtype, seg); - if (nsegs < 0) - return nsegs; + rpcrdma_xdr_cursor_init(&cur, &rqst->rq_rcv_buf, 0, wtype); if (xdr_stream_encode_item_present(xdr) < 0) return -EMSGSIZE; @@ -496,19 +447,19 @@ static int rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, nchunks = 0; do { - seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, true, &mr); - if (IS_ERR(seg)) - return PTR_ERR(seg); + ret = rpcrdma_mr_prepare(r_xprt, req, &cur, true, &mr); + if (ret) + return ret; if (encode_rdma_segment(xdr, mr) < 0) return -EMSGSIZE; - trace_xprtrdma_chunk_reply(rqst->rq_task, mr, nsegs); + trace_xprtrdma_chunk_reply(rqst->rq_task, mr, + rpcrdma_xdr_cursor_done(&cur)); r_xprt->rx_stats.reply_chunk_count++; r_xprt->rx_stats.total_rdma_request += mr->mr_length; nchunks++; - nsegs -= mr->mr_nents; - } while (nsegs); + } while (!rpcrdma_xdr_cursor_done(&cur)); /* Update count of segments in the Reply chunk */ *segcount = cpu_to_be32(nchunks); diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index 8147d2b41494..37bba72065e8 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -283,19 +283,36 @@ struct rpcrdma_mr { * registered or invalidated. Must handle a Reply chunk: */ enum { - RPCRDMA_MAX_IOV_SEGS = 3, + RPCRDMA_MAX_IOV_SEGS = 3, /* head, page-boundary, tail */ RPCRDMA_MAX_DATA_SEGS = ((1 * 1024 * 1024) / PAGE_SIZE) + 1, RPCRDMA_MAX_SEGS = RPCRDMA_MAX_DATA_SEGS + RPCRDMA_MAX_IOV_SEGS, }; -/* Arguments for DMA mapping and registration */ -struct rpcrdma_mr_seg { - u32 mr_len; /* length of segment */ - struct page *mr_page; /* underlying struct page */ - u64 mr_offset; /* IN: page offset, OUT: iova */ +/** + * struct rpcrdma_xdr_cursor - tracks position within an xdr_buf + * for iterative MR registration + * @xc_buf: the xdr_buf being iterated + * @xc_page_offset: byte offset into the page region consumed so far + * @xc_flags: combination of XC_* bits + * + * Each XC_*_DONE flag indicates that this region has no + * remaining MR registration work. That condition holds both when the region + * has already been registered by a prior frwr_map() call and + * when the region is excluded from this chunk type (pre-set + * at init time by rpcrdma_xdr_cursor_init()). frwr_map() + * treats the two cases identically: skip the region. + */ +struct rpcrdma_xdr_cursor { + const struct xdr_buf *xc_buf; + unsigned int xc_page_offset; + unsigned int xc_flags; }; +#define XC_HEAD_DONE BIT(0) +#define XC_PAGES_DONE BIT(1) +#define XC_TAIL_DONE BIT(2) + /* The Send SGE array is provisioned to send a maximum size * inline request: * - RPC-over-RDMA header @@ -330,7 +347,6 @@ struct rpcrdma_req { struct list_head rl_free_mrs; struct list_head rl_registered; - struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS]; }; static inline struct rpcrdma_req * @@ -450,8 +466,8 @@ rpcrdma_portstr(const struct rpcrdma_xprt *r_xprt) } /* Setting this to 0 ensures interoperability with early servers. - * Setting this to 1 enhances certain unaligned read/write performance. - * Default is 0, see sysctl entry and rpc_rdma.c rpcrdma_convert_iovs() */ + * Setting this to 1 enhances unaligned read/write performance. + * Default is 0, see sysctl entry and rpc_rdma.c */ extern int xprt_rdma_pad_optimize; /* This setting controls the hunt for a supported memory @@ -535,10 +551,10 @@ void frwr_reset(struct rpcrdma_req *req); int frwr_query_device(struct rpcrdma_ep *ep, const struct ib_device *device); int frwr_mr_init(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr *mr); void frwr_mr_release(struct rpcrdma_mr *mr); -struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt, - struct rpcrdma_mr_seg *seg, - int nsegs, bool writing, __be32 xid, - struct rpcrdma_mr *mr); +int frwr_map(struct rpcrdma_xprt *r_xprt, + struct rpcrdma_xdr_cursor *cur, + bool writing, __be32 xid, + struct rpcrdma_mr *mr); int frwr_send(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req); void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs); void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req); -- cgit v1.2.3 From 93b4791adb1017b2b079b4a453e7159e101a7e55 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Fri, 6 Mar 2026 16:56:27 -0500 Subject: xprtrdma: Scale receive batch size with credit window The fixed RPCRDMA_MAX_RECV_BATCH of 7 results in frequent small ib_post_recv batches during high-rate workloads. With a 128-slot credit window, receives are reposted every 7th completion, each batch incurring atomic serialization and a doorbell write. Replace the fixed batch constant with a per-endpoint value scaled to 25% of the negotiated credit window. For a typical 128-credit connection this raises the batch from 7 to 32, reducing doorbell frequency by roughly 4x and amortizing the per-batch atomic and MMIO costs over a larger group of receive WRs. Signed-off-by: Chuck Lever Signed-off-by: Trond Myklebust --- net/sunrpc/xprtrdma/frwr_ops.c | 3 ++- net/sunrpc/xprtrdma/verbs.c | 2 +- net/sunrpc/xprtrdma/xprt_rdma.h | 1 + 3 files changed, 4 insertions(+), 2 deletions(-) (limited to 'net') diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c index 229057d35fb8..7f79a0a2601e 100644 --- a/net/sunrpc/xprtrdma/frwr_ops.c +++ b/net/sunrpc/xprtrdma/frwr_ops.c @@ -244,9 +244,10 @@ int frwr_query_device(struct rpcrdma_ep *ep, const struct ib_device *device) } ep->re_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS; ep->re_attr.cap.max_send_wr += 1; /* for ib_drain_sq */ + ep->re_recv_batch = ep->re_max_requests >> 2; ep->re_attr.cap.max_recv_wr = ep->re_max_requests; ep->re_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS; - ep->re_attr.cap.max_recv_wr += RPCRDMA_MAX_RECV_BATCH; + ep->re_attr.cap.max_recv_wr += ep->re_recv_batch; ep->re_attr.cap.max_recv_wr += 1; /* for ib_drain_rq */ ep->re_max_rdma_segs = diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index 90fd83f2d846..aecf9c0a153f 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -1374,7 +1374,7 @@ void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, int needed) if (likely(ep->re_receive_count > needed)) goto out; needed -= ep->re_receive_count; - needed += RPCRDMA_MAX_RECV_BATCH; + needed += ep->re_recv_batch; if (atomic_inc_return(&ep->re_receiving) > 1) goto out_dec; diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index 37bba72065e8..f53a77472724 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -96,6 +96,7 @@ struct rpcrdma_ep { struct rpcrdma_notification re_rn; int re_receive_count; unsigned int re_max_requests; /* depends on device */ + unsigned int re_recv_batch; unsigned int re_inline_send; /* negotiated */ unsigned int re_inline_recv; /* negotiated */ -- cgit v1.2.3 From 704f3f640f72db4d44ec5ce3db8d4e150c974bc7 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Fri, 6 Mar 2026 16:56:28 -0500 Subject: xprtrdma: Post receive buffers after RPC completion rpcrdma_post_recvs() runs in CQ poll context and its cost falls on the latency-critical path between polling a Receive completion and waking the RPC consumer. Every cycle spent refilling the Receive Queue delays delivery of the reply to the NFS layer. Move the rpcrdma_post_recvs() call in rpcrdma_reply_handler() to after the RPC has been decoded and completed. The larger batch size from the preceding patch provides sufficient Receive Queue headroom to absorb the brief delay before buffers are replenished. Signed-off-by: Chuck Lever Signed-off-by: Trond Myklebust --- net/sunrpc/xprtrdma/rpc_rdma.c | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) (limited to 'net') diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index a77e7e48aab2..0e0f21974710 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -1422,7 +1422,6 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep) credits = 1; /* don't deadlock */ else if (credits > r_xprt->rx_ep->re_max_requests) credits = r_xprt->rx_ep->re_max_requests; - rpcrdma_post_recvs(r_xprt, credits + (buf->rb_bc_srv_max_requests << 1)); if (buf->rb_credits != credits) rpcrdma_update_cwnd(r_xprt, credits); @@ -1441,15 +1440,20 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep) /* LocalInv completion will complete the RPC */ else kref_put(&req->rl_kref, rpcrdma_reply_done); - return; -out_badversion: - trace_xprtrdma_reply_vers_err(rep); - goto out; +out_post: + rpcrdma_post_recvs(r_xprt, + credits + (buf->rb_bc_srv_max_requests << 1)); + return; out_norqst: spin_unlock(&xprt->queue_lock); trace_xprtrdma_reply_rqst_err(rep); + rpcrdma_rep_put(buf, rep); + goto out_post; + +out_badversion: + trace_xprtrdma_reply_vers_err(rep); goto out; out_shortreply: -- cgit v1.2.3