summaryrefslogtreecommitdiff
path: root/net/sunrpc/xprtrdma
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2008-02-02 14:31:28 +1100
committerLinus Torvalds <torvalds@linux-foundation.org>2008-02-02 14:31:28 +1100
commit63e9b66e29357dd12e8b1d3ebf7036e7591f81e3 (patch)
tree5aa6a70a8f4bbf306e2825a1e2fa2660c2c1c187 /net/sunrpc/xprtrdma
parent687fcdf741e4a268c2c7bac8b3734de761bb9719 (diff)
parentea339d46b93c7b16e067a29aad1812f7a389815a (diff)
Merge branch 'for-linus' of git://linux-nfs.org/~bfields/linux
* 'for-linus' of git://linux-nfs.org/~bfields/linux: (100 commits) SUNRPC: RPC program information is stored in unsigned integers SUNRPC: Move exported symbol definitions after function declaration part 2 NLM: tear down RPC clients in nlm_shutdown_hosts SUNRPC: spin svc_rqst initialization to its own function nfsd: more careful input validation in nfsctl write methods lockd: minor log message fix knfsd: don't bother mapping putrootfh enoent to eperm rdma: makefile rdma: ONCRPC RDMA protocol marshalling rdma: SVCRDMA sendto rdma: SVCRDMA recvfrom rdma: SVCRDMA Core Transport Services rdma: SVCRDMA Transport Module rdma: SVCRMDA Header File svc: Add svc_xprt_names service to replace svc_sock_names knfsd: Support adding transports by writing portlist file svc: Add svc API that queries for a transport instance svc: Add /proc/sys/sunrpc/transport files svc: Add transport hdr size for defer/revisit svc: Move the xprt independent code to the svc_xprt.c file ...
Diffstat (limited to 'net/sunrpc/xprtrdma')
-rw-r--r--net/sunrpc/xprtrdma/Makefile5
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma.c266
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_marshal.c412
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_recvfrom.c586
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_sendto.c520
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_transport.c1080
6 files changed, 2869 insertions, 0 deletions
diff --git a/net/sunrpc/xprtrdma/Makefile b/net/sunrpc/xprtrdma/Makefile
index 264f0feeb513..5a8f268bdd30 100644
--- a/net/sunrpc/xprtrdma/Makefile
+++ b/net/sunrpc/xprtrdma/Makefile
@@ -1,3 +1,8 @@
obj-$(CONFIG_SUNRPC_XPRT_RDMA) += xprtrdma.o
xprtrdma-y := transport.o rpc_rdma.o verbs.o
+
+obj-$(CONFIG_SUNRPC_XPRT_RDMA) += svcrdma.o
+
+svcrdma-y := svc_rdma.o svc_rdma_transport.o \
+ svc_rdma_marshal.o svc_rdma_sendto.o svc_rdma_recvfrom.o
diff --git a/net/sunrpc/xprtrdma/svc_rdma.c b/net/sunrpc/xprtrdma/svc_rdma.c
new file mode 100644
index 000000000000..88c0ca20bb1e
--- /dev/null
+++ b/net/sunrpc/xprtrdma/svc_rdma.c
@@ -0,0 +1,266 @@
+/*
+ * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses. You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the BSD-type
+ * license below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided
+ * with the distribution.
+ *
+ * Neither the name of the Network Appliance, Inc. nor the names of
+ * its contributors may be used to endorse or promote products
+ * derived from this software without specific prior written
+ * permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * Author: Tom Tucker <tom@opengridcomputing.com>
+ */
+#include <linux/module.h>
+#include <linux/init.h>
+#include <linux/fs.h>
+#include <linux/sysctl.h>
+#include <linux/sunrpc/clnt.h>
+#include <linux/sunrpc/sched.h>
+#include <linux/sunrpc/svc_rdma.h>
+
+#define RPCDBG_FACILITY RPCDBG_SVCXPRT
+
+/* RPC/RDMA parameters */
+unsigned int svcrdma_ord = RPCRDMA_ORD;
+static unsigned int min_ord = 1;
+static unsigned int max_ord = 4096;
+unsigned int svcrdma_max_requests = RPCRDMA_MAX_REQUESTS;
+static unsigned int min_max_requests = 4;
+static unsigned int max_max_requests = 16384;
+unsigned int svcrdma_max_req_size = RPCRDMA_MAX_REQ_SIZE;
+static unsigned int min_max_inline = 4096;
+static unsigned int max_max_inline = 65536;
+
+atomic_t rdma_stat_recv;
+atomic_t rdma_stat_read;
+atomic_t rdma_stat_write;
+atomic_t rdma_stat_sq_starve;
+atomic_t rdma_stat_rq_starve;
+atomic_t rdma_stat_rq_poll;
+atomic_t rdma_stat_rq_prod;
+atomic_t rdma_stat_sq_poll;
+atomic_t rdma_stat_sq_prod;
+
+/*
+ * This function implements reading and resetting an atomic_t stat
+ * variable through read/write to a proc file. Any write to the file
+ * resets the associated statistic to zero. Any read returns it's
+ * current value.
+ */
+static int read_reset_stat(ctl_table *table, int write,
+ struct file *filp, void __user *buffer, size_t *lenp,
+ loff_t *ppos)
+{
+ atomic_t *stat = (atomic_t *)table->data;
+
+ if (!stat)
+ return -EINVAL;
+
+ if (write)
+ atomic_set(stat, 0);
+ else {
+ char str_buf[32];
+ char *data;
+ int len = snprintf(str_buf, 32, "%d\n", atomic_read(stat));
+ if (len >= 32)
+ return -EFAULT;
+ len = strlen(str_buf);
+ if (*ppos > len) {
+ *lenp = 0;
+ return 0;
+ }
+ data = &str_buf[*ppos];
+ len -= *ppos;
+ if (len > *lenp)
+ len = *lenp;
+ if (len && copy_to_user(buffer, str_buf, len))
+ return -EFAULT;
+ *lenp = len;
+ *ppos += len;
+ }
+ return 0;
+}
+
+static struct ctl_table_header *svcrdma_table_header;
+static ctl_table svcrdma_parm_table[] = {
+ {
+ .procname = "max_requests",
+ .data = &svcrdma_max_requests,
+ .maxlen = sizeof(unsigned int),
+ .mode = 0644,
+ .proc_handler = &proc_dointvec_minmax,
+ .strategy = &sysctl_intvec,
+ .extra1 = &min_max_requests,
+ .extra2 = &max_max_requests
+ },
+ {
+ .procname = "max_req_size",
+ .data = &svcrdma_max_req_size,
+ .maxlen = sizeof(unsigned int),
+ .mode = 0644,
+ .proc_handler = &proc_dointvec_minmax,
+ .strategy = &sysctl_intvec,
+ .extra1 = &min_max_inline,
+ .extra2 = &max_max_inline
+ },
+ {
+ .procname = "max_outbound_read_requests",
+ .data = &svcrdma_ord,
+ .maxlen = sizeof(unsigned int),
+ .mode = 0644,
+ .proc_handler = &proc_dointvec_minmax,
+ .strategy = &sysctl_intvec,
+ .extra1 = &min_ord,
+ .extra2 = &max_ord,
+ },
+
+ {
+ .procname = "rdma_stat_read",
+ .data = &rdma_stat_read,
+ .maxlen = sizeof(atomic_t),
+ .mode = 0644,
+ .proc_handler = &read_reset_stat,
+ },
+ {
+ .procname = "rdma_stat_recv",
+ .data = &rdma_stat_recv,
+ .maxlen = sizeof(atomic_t),
+ .mode = 0644,
+ .proc_handler = &read_reset_stat,
+ },
+ {
+ .procname = "rdma_stat_write",
+ .data = &rdma_stat_write,
+ .maxlen = sizeof(atomic_t),
+ .mode = 0644,
+ .proc_handler = &read_reset_stat,
+ },
+ {
+ .procname = "rdma_stat_sq_starve",
+ .data = &rdma_stat_sq_starve,
+ .maxlen = sizeof(atomic_t),
+ .mode = 0644,
+ .proc_handler = &read_reset_stat,
+ },
+ {
+ .procname = "rdma_stat_rq_starve",
+ .data = &rdma_stat_rq_starve,
+ .maxlen = sizeof(atomic_t),
+ .mode = 0644,
+ .proc_handler = &read_reset_stat,
+ },
+ {
+ .procname = "rdma_stat_rq_poll",
+ .data = &rdma_stat_rq_poll,
+ .maxlen = sizeof(atomic_t),
+ .mode = 0644,
+ .proc_handler = &read_reset_stat,
+ },
+ {
+ .procname = "rdma_stat_rq_prod",
+ .data = &rdma_stat_rq_prod,
+ .maxlen = sizeof(atomic_t),
+ .mode = 0644,
+ .proc_handler = &read_reset_stat,
+ },
+ {
+ .procname = "rdma_stat_sq_poll",
+ .data = &rdma_stat_sq_poll,
+ .maxlen = sizeof(atomic_t),
+ .mode = 0644,
+ .proc_handler = &read_reset_stat,
+ },
+ {
+ .procname = "rdma_stat_sq_prod",
+ .data = &rdma_stat_sq_prod,
+ .maxlen = sizeof(atomic_t),
+ .mode = 0644,
+ .proc_handler = &read_reset_stat,
+ },
+ {
+ .ctl_name = 0,
+ },
+};
+
+static ctl_table svcrdma_table[] = {
+ {
+ .procname = "svc_rdma",
+ .mode = 0555,
+ .child = svcrdma_parm_table
+ },
+ {
+ .ctl_name = 0,
+ },
+};
+
+static ctl_table svcrdma_root_table[] = {
+ {
+ .ctl_name = CTL_SUNRPC,
+ .procname = "sunrpc",
+ .mode = 0555,
+ .child = svcrdma_table
+ },
+ {
+ .ctl_name = 0,
+ },
+};
+
+void svc_rdma_cleanup(void)
+{
+ dprintk("SVCRDMA Module Removed, deregister RPC RDMA transport\n");
+ if (svcrdma_table_header) {
+ unregister_sysctl_table(svcrdma_table_header);
+ svcrdma_table_header = NULL;
+ }
+ svc_unreg_xprt_class(&svc_rdma_class);
+}
+
+int svc_rdma_init(void)
+{
+ dprintk("SVCRDMA Module Init, register RPC RDMA transport\n");
+ dprintk("\tsvcrdma_ord : %d\n", svcrdma_ord);
+ dprintk("\tmax_requests : %d\n", svcrdma_max_requests);
+ dprintk("\tsq_depth : %d\n",
+ svcrdma_max_requests * RPCRDMA_SQ_DEPTH_MULT);
+ dprintk("\tmax_inline : %d\n", svcrdma_max_req_size);
+ if (!svcrdma_table_header)
+ svcrdma_table_header =
+ register_sysctl_table(svcrdma_root_table);
+
+ /* Register RDMA with the SVC transport switch */
+ svc_reg_xprt_class(&svc_rdma_class);
+ return 0;
+}
+MODULE_AUTHOR("Tom Tucker <tom@opengridcomputing.com>");
+MODULE_DESCRIPTION("SVC RDMA Transport");
+MODULE_LICENSE("Dual BSD/GPL");
+module_init(svc_rdma_init);
+module_exit(svc_rdma_cleanup);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_marshal.c b/net/sunrpc/xprtrdma/svc_rdma_marshal.c
new file mode 100644
index 000000000000..9530ef2d40dc
--- /dev/null
+++ b/net/sunrpc/xprtrdma/svc_rdma_marshal.c
@@ -0,0 +1,412 @@
+/*
+ * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses. You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the BSD-type
+ * license below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided
+ * with the distribution.
+ *
+ * Neither the name of the Network Appliance, Inc. nor the names of
+ * its contributors may be used to endorse or promote products
+ * derived from this software without specific prior written
+ * permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * Author: Tom Tucker <tom@opengridcomputing.com>
+ */
+
+#include <linux/sunrpc/xdr.h>
+#include <linux/sunrpc/debug.h>
+#include <asm/unaligned.h>
+#include <linux/sunrpc/rpc_rdma.h>
+#include <linux/sunrpc/svc_rdma.h>
+
+#define RPCDBG_FACILITY RPCDBG_SVCXPRT
+
+/*
+ * Decodes a read chunk list. The expected format is as follows:
+ * descrim : xdr_one
+ * position : u32 offset into XDR stream
+ * handle : u32 RKEY
+ * . . .
+ * end-of-list: xdr_zero
+ */
+static u32 *decode_read_list(u32 *va, u32 *vaend)
+{
+ struct rpcrdma_read_chunk *ch = (struct rpcrdma_read_chunk *)va;
+
+ while (ch->rc_discrim != xdr_zero) {
+ u64 ch_offset;
+
+ if (((unsigned long)ch + sizeof(struct rpcrdma_read_chunk)) >
+ (unsigned long)vaend) {
+ dprintk("svcrdma: vaend=%p, ch=%p\n", vaend, ch);
+ return NULL;
+ }
+
+ ch->rc_discrim = ntohl(ch->rc_discrim);
+ ch->rc_position = ntohl(ch->rc_position);
+ ch->rc_target.rs_handle = ntohl(ch->rc_target.rs_handle);
+ ch->rc_target.rs_length = ntohl(ch->rc_target.rs_length);
+ va = (u32 *)&ch->rc_target.rs_offset;
+ xdr_decode_hyper(va, &ch_offset);
+ put_unaligned(ch_offset, (u64 *)va);
+ ch++;
+ }
+ return (u32 *)&ch->rc_position;
+}
+
+/*
+ * Determine number of chunks and total bytes in chunk list. The chunk
+ * list has already been verified to fit within the RPCRDMA header.
+ */
+void svc_rdma_rcl_chunk_counts(struct rpcrdma_read_chunk *ch,
+ int *ch_count, int *byte_count)
+{
+ /* compute the number of bytes represented by read chunks */
+ *byte_count = 0;
+ *ch_count = 0;
+ for (; ch->rc_discrim != 0; ch++) {
+ *byte_count = *byte_count + ch->rc_target.rs_length;
+ *ch_count = *ch_count + 1;
+ }
+}
+
+/*
+ * Decodes a write chunk list. The expected format is as follows:
+ * descrim : xdr_one
+ * nchunks : <count>
+ * handle : u32 RKEY ---+
+ * length : u32 <len of segment> |
+ * offset : remove va + <count>
+ * . . . |
+ * ---+
+ */
+static u32 *decode_write_list(u32 *va, u32 *vaend)
+{
+ int ch_no;
+ struct rpcrdma_write_array *ary =
+ (struct rpcrdma_write_array *)va;
+
+ /* Check for not write-array */
+ if (ary->wc_discrim == xdr_zero)
+ return (u32 *)&ary->wc_nchunks;
+
+ if ((unsigned long)ary + sizeof(struct rpcrdma_write_array) >
+ (unsigned long)vaend) {
+ dprintk("svcrdma: ary=%p, vaend=%p\n", ary, vaend);
+ return NULL;
+ }
+ ary->wc_discrim = ntohl(ary->wc_discrim);
+ ary->wc_nchunks = ntohl(ary->wc_nchunks);
+ if (((unsigned long)&ary->wc_array[0] +
+ (sizeof(struct rpcrdma_write_chunk) * ary->wc_nchunks)) >
+ (unsigned long)vaend) {
+ dprintk("svcrdma: ary=%p, wc_nchunks=%d, vaend=%p\n",
+ ary, ary->wc_nchunks, vaend);
+ return NULL;
+ }
+ for (ch_no = 0; ch_no < ary->wc_nchunks; ch_no++) {
+ u64 ch_offset;
+
+ ary->wc_array[ch_no].wc_target.rs_handle =
+ ntohl(ary->wc_array[ch_no].wc_target.rs_handle);
+ ary->wc_array[ch_no].wc_target.rs_length =
+ ntohl(ary->wc_array[ch_no].wc_target.rs_length);
+ va = (u32 *)&ary->wc_array[ch_no].wc_target.rs_offset;
+ xdr_decode_hyper(va, &ch_offset);
+ put_unaligned(ch_offset, (u64 *)va);
+ }
+
+ /*
+ * rs_length is the 2nd 4B field in wc_target and taking its
+ * address skips the list terminator
+ */
+ return (u32 *)&ary->wc_array[ch_no].wc_target.rs_length;
+}
+
+static u32 *decode_reply_array(u32 *va, u32 *vaend)
+{
+ int ch_no;
+ struct rpcrdma_write_array *ary =
+ (struct rpcrdma_write_array *)va;
+
+ /* Check for no reply-array */
+ if (ary->wc_discrim == xdr_zero)
+ return (u32 *)&ary->wc_nchunks;
+
+ if ((unsigned long)ary + sizeof(struct rpcrdma_write_array) >
+ (unsigned long)vaend) {
+ dprintk("svcrdma: ary=%p, vaend=%p\n", ary, vaend);
+ return NULL;
+ }
+ ary->wc_discrim = ntohl(ary->wc_discrim);
+ ary->wc_nchunks = ntohl(ary->wc_nchunks);
+ if (((unsigned long)&ary->wc_array[0] +
+ (sizeof(struct rpcrdma_write_chunk) * ary->wc_nchunks)) >
+ (unsigned long)vaend) {
+ dprintk("svcrdma: ary=%p, wc_nchunks=%d, vaend=%p\n",
+ ary, ary->wc_nchunks, vaend);
+ return NULL;
+ }
+ for (ch_no = 0; ch_no < ary->wc_nchunks; ch_no++) {
+ u64 ch_offset;
+
+ ary->wc_array[ch_no].wc_target.rs_handle =
+ ntohl(ary->wc_array[ch_no].wc_target.rs_handle);
+ ary->wc_array[ch_no].wc_target.rs_length =
+ ntohl(ary->wc_array[ch_no].wc_target.rs_length);
+ va = (u32 *)&ary->wc_array[ch_no].wc_target.rs_offset;
+ xdr_decode_hyper(va, &ch_offset);
+ put_unaligned(ch_offset, (u64 *)va);
+ }
+
+ return (u32 *)&ary->wc_array[ch_no];
+}
+
+int svc_rdma_xdr_decode_req(struct rpcrdma_msg **rdma_req,
+ struct svc_rqst *rqstp)
+{
+ struct rpcrdma_msg *rmsgp = NULL;
+ u32 *va;
+ u32 *vaend;
+ u32 hdr_len;
+
+ rmsgp = (struct rpcrdma_msg *)rqstp->rq_arg.head[0].iov_base;
+
+ /* Verify that there's enough bytes for header + something */
+ if (rqstp->rq_arg.len <= RPCRDMA_HDRLEN_MIN) {
+ dprintk("svcrdma: header too short = %d\n",
+ rqstp->rq_arg.len);
+ return -EINVAL;
+ }
+
+ /* Decode the header */
+ rmsgp->rm_xid = ntohl(rmsgp->rm_xid);
+ rmsgp->rm_vers = ntohl(rmsgp->rm_vers);
+ rmsgp->rm_credit = ntohl(rmsgp->rm_credit);
+ rmsgp->rm_type = ntohl(rmsgp->rm_type);
+
+ if (rmsgp->rm_vers != RPCRDMA_VERSION)
+ return -ENOSYS;
+
+ /* Pull in the extra for the padded case and bump our pointer */
+ if (rmsgp->rm_type == RDMA_MSGP) {
+ int hdrlen;
+ rmsgp->rm_body.rm_padded.rm_align =
+ ntohl(rmsgp->rm_body.rm_padded.rm_align);
+ rmsgp->rm_body.rm_padded.rm_thresh =
+ ntohl(rmsgp->rm_body.rm_padded.rm_thresh);
+
+ va = &rmsgp->rm_body.rm_padded.rm_pempty[4];
+ rqstp->rq_arg.head[0].iov_base = va;
+ hdrlen = (u32)((unsigned long)va - (unsigned long)rmsgp);
+ rqstp->rq_arg.head[0].iov_len -= hdrlen;
+ if (hdrlen > rqstp->rq_arg.len)
+ return -EINVAL;
+ return hdrlen;
+ }
+
+ /* The chunk list may contain either a read chunk list or a write
+ * chunk list and a reply chunk list.
+ */
+ va = &rmsgp->rm_body.rm_chunks[0];
+ vaend = (u32 *)((unsigned long)rmsgp + rqstp->rq_arg.len);
+ va = decode_read_list(va, vaend);
+ if (!va)
+ return -EINVAL;
+ va = decode_write_list(va, vaend);
+ if (!va)
+ return -EINVAL;
+ va = decode_reply_array(va, vaend);
+ if (!va)
+ return -EINVAL;
+
+ rqstp->rq_arg.head[0].iov_base = va;
+ hdr_len = (unsigned long)va - (unsigned long)rmsgp;
+ rqstp->rq_arg.head[0].iov_len -= hdr_len;
+
+ *rdma_req = rmsgp;
+ return hdr_len;
+}
+
+int svc_rdma_xdr_decode_deferred_req(struct svc_rqst *rqstp)
+{
+ struct rpcrdma_msg *rmsgp = NULL;
+ struct rpcrdma_read_chunk *ch;
+ struct rpcrdma_write_array *ary;
+ u32 *va;
+ u32 hdrlen;
+
+ dprintk("svcrdma: processing deferred RDMA header on rqstp=%p\n",
+ rqstp);
+ rmsgp = (struct rpcrdma_msg *)rqstp->rq_arg.head[0].iov_base;
+
+ /* Pull in the extra for the padded case and bump our pointer */
+ if (rmsgp->rm_type == RDMA_MSGP) {
+ va = &rmsgp->rm_body.rm_padded.rm_pempty[4];
+ rqstp->rq_arg.head[0].iov_base = va;
+ hdrlen = (u32)((unsigned long)va - (unsigned long)rmsgp);
+ rqstp->rq_arg.head[0].iov_len -= hdrlen;
+ return hdrlen;
+ }
+
+ /*
+ * Skip all chunks to find RPC msg. These were previously processed
+ */
+ va = &rmsgp->rm_body.rm_chunks[0];
+
+ /* Skip read-list */
+ for (ch = (struct rpcrdma_read_chunk *)va;
+ ch->rc_discrim != xdr_zero; ch++);
+ va = (u32 *)&ch->rc_position;
+
+ /* Skip write-list */
+ ary = (struct rpcrdma_write_array *)va;
+ if (ary->wc_discrim == xdr_zero)
+ va = (u32 *)&ary->wc_nchunks;
+ else
+ /*
+ * rs_length is the 2nd 4B field in wc_target and taking its
+ * address skips the list terminator
+ */
+ va = (u32 *)&ary->wc_array[ary->wc_nchunks].wc_target.rs_length;
+
+ /* Skip reply-array */
+ ary = (struct rpcrdma_write_array *)va;
+ if (ary->wc_discrim == xdr_zero)
+ va = (u32 *)&ary->wc_nchunks;
+ else
+ va = (u32 *)&ary->wc_array[ary->wc_nchunks];
+
+ rqstp->rq_arg.head[0].iov_base = va;
+ hdrlen = (unsigned long)va - (unsigned long)rmsgp;
+ rqstp->rq_arg.head[0].iov_len -= hdrlen;
+
+ return hdrlen;
+}
+
+int svc_rdma_xdr_encode_error(struct svcxprt_rdma *xprt,
+ struct rpcrdma_msg *rmsgp,
+ enum rpcrdma_errcode err, u32 *va)
+{
+ u32 *startp = va;
+
+ *va++ = htonl(rmsgp->rm_xid);
+ *va++ = htonl(rmsgp->rm_vers);
+ *va++ = htonl(xprt->sc_max_requests);
+ *va++ = htonl(RDMA_ERROR);
+ *va++ = htonl(err);
+ if (err == ERR_VERS) {
+ *va++ = htonl(RPCRDMA_VERSION);
+ *va++ = htonl(RPCRDMA_VERSION);
+ }
+
+ return (int)((unsigned long)va - (unsigned long)startp);
+}
+
+int svc_rdma_xdr_get_reply_hdr_len(struct rpcrdma_msg *rmsgp)
+{
+ struct rpcrdma_write_array *wr_ary;
+
+ /* There is no read-list in a reply */
+
+ /* skip write list */
+ wr_ary = (struct rpcrdma_write_array *)
+ &rmsgp->rm_body.rm_chunks[1];
+ if (wr_ary->wc_discrim)
+ wr_ary = (struct rpcrdma_write_array *)
+ &wr_ary->wc_array[ntohl(wr_ary->wc_nchunks)].
+ wc_target.rs_length;
+ else
+ wr_ary = (struct rpcrdma_write_array *)
+ &wr_ary->wc_nchunks;
+
+ /* skip reply array */
+ if (wr_ary->wc_discrim)
+ wr_ary = (struct rpcrdma_write_array *)
+ &wr_ary->wc_array[ntohl(wr_ary->wc_nchunks)];
+ else
+ wr_ary = (struct rpcrdma_write_array *)
+ &wr_ary->wc_nchunks;
+
+ return (unsigned long) wr_ary - (unsigned long) rmsgp;
+}
+
+void svc_rdma_xdr_encode_write_list(struct rpcrdma_msg *rmsgp, int chunks)
+{
+ struct rpcrdma_write_array *ary;
+
+ /* no read-list */
+ rmsgp->rm_body.rm_chunks[0] = xdr_zero;
+
+ /* write-array discrim */
+ ary = (struct rpcrdma_write_array *)
+ &rmsgp->rm_body.rm_chunks[1];
+ ary->wc_discrim = xdr_one;
+ ary->wc_nchunks = htonl(chunks);
+
+ /* write-list terminator */
+ ary->wc_array[chunks].wc_target.rs_handle = xdr_zero;
+
+ /* reply-array discriminator */
+ ary->wc_array[chunks].wc_target.rs_length = xdr_zero;
+}
+
+void svc_rdma_xdr_encode_reply_array(struct rpcrdma_write_array *ary,
+ int chunks)
+{
+ ary->wc_discrim = xdr_one;
+ ary->wc_nchunks = htonl(chunks);
+}
+
+void svc_rdma_xdr_encode_array_chunk(struct rpcrdma_write_array *ary,
+ int chunk_no,
+ u32 rs_handle, u64 rs_offset,
+ u32 write_len)
+{
+ struct rpcrdma_segment *seg = &ary->wc_array[chunk_no].wc_target;
+ seg->rs_handle = htonl(rs_handle);
+ seg->rs_length = htonl(write_len);
+ xdr_encode_hyper((u32 *) &seg->rs_offset, rs_offset);
+}
+
+void svc_rdma_xdr_encode_reply_header(struct svcxprt_rdma *xprt,
+ struct rpcrdma_msg *rdma_argp,
+ struct rpcrdma_msg *rdma_resp,
+ enum rpcrdma_proc rdma_type)
+{
+ rdma_resp->rm_xid = htonl(rdma_argp->rm_xid);
+ rdma_resp->rm_vers = htonl(rdma_argp->rm_vers);
+ rdma_resp->rm_credit = htonl(xprt->sc_max_requests);
+ rdma_resp->rm_type = htonl(rdma_type);
+
+ /* Encode <nul> chunks lists */
+ rdma_resp->rm_body.rm_chunks[0] = xdr_zero;
+ rdma_resp->rm_body.rm_chunks[1] = xdr_zero;
+ rdma_resp->rm_body.rm_chunks[2] = xdr_zero;
+}
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
new file mode 100644
index 000000000000..ab54a736486e
--- /dev/null
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -0,0 +1,586 @@
+/*
+ * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses. You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the BSD-type
+ * license below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided
+ * with the distribution.
+ *
+ * Neither the name of the Network Appliance, Inc. nor the names of
+ * its contributors may be used to endorse or promote products
+ * derived from this software without specific prior written
+ * permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * Author: Tom Tucker <tom@opengridcomputing.com>
+ */
+
+#include <linux/sunrpc/debug.h>
+#include <linux/sunrpc/rpc_rdma.h>
+#include <linux/spinlock.h>
+#include <asm/unaligned.h>
+#include <rdma/ib_verbs.h>
+#include <rdma/rdma_cm.h>
+#include <linux/sunrpc/svc_rdma.h>
+
+#define RPCDBG_FACILITY RPCDBG_SVCXPRT
+
+/*
+ * Replace the pages in the rq_argpages array with the pages from the SGE in
+ * the RDMA_RECV completion. The SGL should contain full pages up until the
+ * last one.
+ */
+static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
+ struct svc_rdma_op_ctxt *ctxt,
+ u32 byte_count)
+{
+ struct page *page;
+ u32 bc;
+ int sge_no;
+
+ /* Swap the page in the SGE with the page in argpages */
+ page = ctxt->pages[0];
+ put_page(rqstp->rq_pages[0]);
+ rqstp->rq_pages[0] = page;
+
+ /* Set up the XDR head */
+ rqstp->rq_arg.head[0].iov_base = page_address(page);
+ rqstp->rq_arg.head[0].iov_len = min(byte_count, ctxt->sge[0].length);
+ rqstp->rq_arg.len = byte_count;
+ rqstp->rq_arg.buflen = byte_count;
+
+ /* Compute bytes past head in the SGL */
+ bc = byte_count - rqstp->rq_arg.head[0].iov_len;
+
+ /* If data remains, store it in the pagelist */
+ rqstp->rq_arg.page_len = bc;
+ rqstp->rq_arg.page_base = 0;
+ rqstp->rq_arg.pages = &rqstp->rq_pages[1];
+ sge_no = 1;
+ while (bc && sge_no < ctxt->count) {
+ page = ctxt->pages[sge_no];
+ put_page(rqstp->rq_pages[sge_no]);
+ rqstp->rq_pages[sge_no] = page;
+ bc -= min(bc, ctxt->sge[sge_no].length);
+ rqstp->rq_arg.buflen += ctxt->sge[sge_no].length;
+ sge_no++;
+ }
+ rqstp->rq_respages = &rqstp->rq_pages[sge_no];
+
+ /* We should never run out of SGE because the limit is defined to
+ * support the max allowed RPC data length
+ */
+ BUG_ON(bc && (sge_no == ctxt->count));
+ BUG_ON((rqstp->rq_arg.head[0].iov_len + rqstp->rq_arg.page_len)
+ != byte_count);
+ BUG_ON(rqstp->rq_arg.len != byte_count);
+
+ /* If not all pages were used from the SGL, free the remaining ones */
+ bc = sge_no;
+ while (sge_no < ctxt->count) {
+ page = ctxt->pages[sge_no++];
+ put_page(page);
+ }
+ ctxt->count = bc;
+
+ /* Set up tail */
+ rqstp->rq_arg.tail[0].iov_base = NULL;
+ rqstp->rq_arg.tail[0].iov_len = 0;
+}
+
+struct chunk_sge {
+ int start; /* sge no for this chunk */
+ int count; /* sge count for this chunk */
+};
+
+/* Encode a read-chunk-list as an array of IB SGE
+ *
+ * Assumptions:
+ * - chunk[0]->position points to pages[0] at an offset of 0
+ * - pages[] is not physically or virtually contigous and consists of
+ * PAGE_SIZE elements.
+ *
+ * Output:
+ * - sge array pointing into pages[] array.
+ * - chunk_sge array specifying sge index and count for each
+ * chunk in the read list
+ *
+ */
+static int rdma_rcl_to_sge(struct svcxprt_rdma *xprt,
+ struct svc_rqst *rqstp,
+ struct svc_rdma_op_ctxt *head,
+ struct rpcrdma_msg *rmsgp,
+ struct ib_sge *sge,
+ struct chunk_sge *ch_sge_ary,
+ int ch_count,
+ int byte_count)
+{
+ int sge_no;
+ int sge_bytes;
+ int page_off;
+ int page_no;
+ int ch_bytes;
+ int ch_no;
+ struct rpcrdma_read_chunk *ch;
+
+ sge_no = 0;
+ page_no = 0;
+ page_off = 0;
+ ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
+ ch_no = 0;
+ ch_bytes = ch->rc_target.rs_length;
+ head->arg.head[0] = rqstp->rq_arg.head[0];
+ head->arg.tail[0] = rqstp->rq_arg.tail[0];
+ head->arg.pages = &head->pages[head->count];
+ head->sge[0].length = head->count; /* save count of hdr pages */
+ head->arg.page_base = 0;
+ head->arg.page_len = ch_bytes;
+ head->arg.len = rqstp->rq_arg.len + ch_bytes;
+ head->arg.buflen = rqstp->rq_arg.buflen + ch_bytes;
+ head->count++;
+ ch_sge_ary[0].start = 0;
+ while (byte_count) {
+ sge_bytes = min_t(int, PAGE_SIZE-page_off, ch_bytes);
+ sge[sge_no].addr =
+ ib_dma_map_page(xprt->sc_cm_id->device,
+ rqstp->rq_arg.pages[page_no],
+ page_off, sge_bytes,
+ DMA_FROM_DEVICE);
+ sge[sge_no].length = sge_bytes;
+ sge[sge_no].lkey = xprt->sc_phys_mr->lkey;
+ /*
+ * Don't bump head->count here because the same page
+ * may be used by multiple SGE.
+ */
+ head->arg.pages[page_no] = rqstp->rq_arg.pages[page_no];
+ rqstp->rq_respages = &rqstp->rq_arg.pages[page_no+1];
+
+ byte_count -= sge_bytes;
+ ch_bytes -= sge_bytes;
+ sge_no++;
+ /*
+ * If all bytes for this chunk have been mapped to an
+ * SGE, move to the next SGE
+ */
+ if (ch_bytes == 0) {
+ ch_sge_ary[ch_no].count =
+ sge_no - ch_sge_ary[ch_no].start;
+ ch_no++;
+ ch++;
+ ch_sge_ary[ch_no].start = sge_no;
+ ch_bytes = ch->rc_target.rs_length;
+ /* If bytes remaining account for next chunk */
+ if (byte_count) {
+ head->arg.page_len += ch_bytes;
+ head->arg.len += ch_bytes;
+ head->arg.buflen += ch_bytes;
+ }
+ }
+ /*
+ * If this SGE consumed all of the page, move to the
+ * next page
+ */
+ if ((sge_bytes + page_off) == PAGE_SIZE) {
+ page_no++;
+ page_off = 0;
+ /*
+ * If there are still bytes left to map, bump
+ * the page count
+ */
+ if (byte_count)
+ head->count++;
+ } else
+ page_off += sge_bytes;
+ }
+ BUG_ON(byte_count != 0);
+ return sge_no;
+}
+
+static void rdma_set_ctxt_sge(struct svc_rdma_op_ctxt *ctxt,
+ struct ib_sge *sge,
+ u64 *sgl_offset,
+ int count)
+{
+ int i;
+
+ ctxt->count = count;
+ for (i = 0; i < count; i++) {
+ ctxt->sge[i].addr = sge[i].addr;
+ ctxt->sge[i].length = sge[i].length;
+ *sgl_offset = *sgl_offset + sge[i].length;
+ }
+}
+
+static int rdma_read_max_sge(struct svcxprt_rdma *xprt, int sge_count)
+{
+#ifdef RDMA_TRANSPORT_IWARP
+ if ((RDMA_TRANSPORT_IWARP ==
+ rdma_node_get_transport(xprt->sc_cm_id->
+ device->node_type))
+ && sge_count > 1)
+ return 1;
+ else
+#endif
+ return min_t(int, sge_count, xprt->sc_max_sge);
+}
+
+/*
+ * Use RDMA_READ to read data from the advertised client buffer into the
+ * XDR stream starting at rq_arg.head[0].iov_base.
+ * Each chunk in the array
+ * contains the following fields:
+ * discrim - '1', This isn't used for data placement
+ * position - The xdr stream offset (the same for every chunk)
+ * handle - RMR for client memory region
+ * length - data transfer length
+ * offset - 64 bit tagged offset in remote memory region
+ *
+ * On our side, we need to read into a pagelist. The first page immediately
+ * follows the RPC header.
+ *
+ * This function returns 1 to indicate success. The data is not yet in
+ * the pagelist and therefore the RPC request must be deferred. The
+ * I/O completion will enqueue the transport again and
+ * svc_rdma_recvfrom will complete the request.
+ *
+ * NOTE: The ctxt must not be touched after the last WR has been posted
+ * because the I/O completion processing may occur on another
+ * processor and free / modify the context. Ne touche pas!
+ */
+static int rdma_read_xdr(struct svcxprt_rdma *xprt,
+ struct rpcrdma_msg *rmsgp,
+ struct svc_rqst *rqstp,
+ struct svc_rdma_op_ctxt *hdr_ctxt)
+{
+ struct ib_send_wr read_wr;
+ int err = 0;
+ int ch_no;
+ struct ib_sge *sge;
+ int ch_count;
+ int byte_count;
+ int sge_count;
+ u64 sgl_offset;
+ struct rpcrdma_read_chunk *ch;
+ struct svc_rdma_op_ctxt *ctxt = NULL;
+ struct svc_rdma_op_ctxt *head;
+ struct svc_rdma_op_ctxt *tmp_sge_ctxt;
+ struct svc_rdma_op_ctxt *tmp_ch_ctxt;
+ struct chunk_sge *ch_sge_ary;
+
+ /* If no read list is present, return 0 */
+ ch = svc_rdma_get_read_chunk(rmsgp);
+ if (!ch)
+ return 0;
+
+ /* Allocate temporary contexts to keep SGE */
+ BUG_ON(sizeof(struct ib_sge) < sizeof(struct chunk_sge));
+ tmp_sge_ctxt = svc_rdma_get_context(xprt);
+ sge = tmp_sge_ctxt->sge;
+ tmp_ch_ctxt = svc_rdma_get_context(xprt);
+ ch_sge_ary = (struct chunk_sge *)tmp_ch_ctxt->sge;
+
+ svc_rdma_rcl_chunk_counts(ch, &ch_count, &byte_count);
+ sge_count = rdma_rcl_to_sge(xprt, rqstp, hdr_ctxt, rmsgp,
+ sge, ch_sge_ary,
+ ch_count, byte_count);
+ head = svc_rdma_get_context(xprt);
+ sgl_offset = 0;
+ ch_no = 0;
+
+ for (ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
+ ch->rc_discrim != 0; ch++, ch_no++) {
+next_sge:
+ if (!ctxt)
+ ctxt = head;
+ else {
+ ctxt->next = svc_rdma_get_context(xprt);
+ ctxt = ctxt->next;
+ }
+ ctxt->next = NULL;
+ ctxt->direction = DMA_FROM_DEVICE;
+ clear_bit(RDMACTXT_F_READ_DONE, &ctxt->flags);
+ clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
+ if ((ch+1)->rc_discrim == 0) {
+ /*
+ * Checked in sq_cq_reap to see if we need to
+ * be enqueued
+ */
+ set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
+ ctxt->next = hdr_ctxt;
+ hdr_ctxt->next = head;
+ }
+
+ /* Prepare READ WR */
+ memset(&read_wr, 0, sizeof read_wr);
+ ctxt->wr_op = IB_WR_RDMA_READ;
+ read_wr.wr_id = (unsigned long)ctxt;
+ read_wr.opcode = IB_WR_RDMA_READ;
+ read_wr.send_flags = IB_SEND_SIGNALED;
+ read_wr.wr.rdma.rkey = ch->rc_target.rs_handle;
+ read_wr.wr.rdma.remote_addr =
+ get_unaligned(&(ch->rc_target.rs_offset)) +
+ sgl_offset;
+ read_wr.sg_list = &sge[ch_sge_ary[ch_no].start];
+ read_wr.num_sge =
+ rdma_read_max_sge(xprt, ch_sge_ary[ch_no].count);
+ rdma_set_ctxt_sge(ctxt, &sge[ch_sge_ary[ch_no].start],
+ &sgl_offset,
+ read_wr.num_sge);
+
+ /* Post the read */
+ err = svc_rdma_send(xprt, &read_wr);
+ if (err) {
+ printk(KERN_ERR "svcrdma: Error posting send = %d\n",
+ err);
+ /*
+ * Break the circular list so free knows when
+ * to stop if the error happened to occur on
+ * the last read
+ */
+ ctxt->next = NULL;
+ goto out;
+ }
+ atomic_inc(&rdma_stat_read);
+
+ if (read_wr.num_sge < ch_sge_ary[ch_no].count) {
+ ch_sge_ary[ch_no].count -= read_wr.num_sge;
+ ch_sge_ary[ch_no].start += read_wr.num_sge;
+ goto next_sge;
+ }
+ sgl_offset = 0;
+ err = 0;
+ }
+
+ out:
+ svc_rdma_put_context(tmp_sge_ctxt, 0);
+ svc_rdma_put_context(tmp_ch_ctxt, 0);
+
+ /* Detach arg pages. svc_recv will replenish them */
+ for (ch_no = 0; &rqstp->rq_pages[ch_no] < rqstp->rq_respages; ch_no++)
+ rqstp->rq_pages[ch_no] = NULL;
+
+ /*
+ * Detach res pages. svc_release must see a resused count of
+ * zero or it will attempt to put them.
+ */
+ while (rqstp->rq_resused)
+ rqstp->rq_respages[--rqstp->rq_resused] = NULL;
+
+ if (err) {
+ printk(KERN_ERR "svcrdma : RDMA_READ error = %d\n", err);
+ set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
+ /* Free the linked list of read contexts */
+ while (head != NULL) {
+ ctxt = head->next;
+ svc_rdma_put_context(head, 1);
+ head = ctxt;
+ }
+ return 0;
+ }
+
+ return 1;
+}
+
+static int rdma_read_complete(struct svc_rqst *rqstp,
+ struct svc_rdma_op_ctxt *data)
+{
+ struct svc_rdma_op_ctxt *head = data->next;
+ int page_no;
+ int ret;
+
+ BUG_ON(!head);
+
+ /* Copy RPC pages */
+ for (page_no = 0; page_no < head->count; page_no++) {
+ put_page(rqstp->rq_pages[page_no]);
+ rqstp->rq_pages[page_no] = head->pages[page_no];
+ }
+ /* Point rq_arg.pages past header */
+ rqstp->rq_arg.pages = &rqstp->rq_pages[head->sge[0].length];
+ rqstp->rq_arg.page_len = head->arg.page_len;
+ rqstp->rq_arg.page_base = head->arg.page_base;
+
+ /* rq_respages starts after the last arg page */
+ rqstp->rq_respages = &rqstp->rq_arg.pages[page_no];
+ rqstp->rq_resused = 0;
+
+ /* Rebuild rq_arg head and tail. */
+ rqstp->rq_arg.head[0] = head->arg.head[0];
+ rqstp->rq_arg.tail[0] = head->arg.tail[0];
+ rqstp->rq_arg.len = head->arg.len;
+ rqstp->rq_arg.buflen = head->arg.buflen;
+
+ /* XXX: What should this be? */
+ rqstp->rq_prot = IPPROTO_MAX;
+
+ /*
+ * Free the contexts we used to build the RDMA_READ. We have
+ * to be careful here because the context list uses the same
+ * next pointer used to chain the contexts associated with the
+ * RDMA_READ
+ */
+ data->next = NULL; /* terminate circular list */
+ do {
+ data = head->next;
+ svc_rdma_put_context(head, 0);
+ head = data;
+ } while (head != NULL);
+
+ ret = rqstp->rq_arg.head[0].iov_len
+ + rqstp->rq_arg.page_len
+ + rqstp->rq_arg.tail[0].iov_len;
+ dprintk("svcrdma: deferred read ret=%d, rq_arg.len =%d, "
+ "rq_arg.head[0].iov_base=%p, rq_arg.head[0].iov_len = %zd\n",
+ ret, rqstp->rq_arg.len, rqstp->rq_arg.head[0].iov_base,
+ rqstp->rq_arg.head[0].iov_len);
+
+ /* Indicate that we've consumed an RQ credit */
+ rqstp->rq_xprt_ctxt = rqstp->rq_xprt;
+ svc_xprt_received(rqstp->rq_xprt);
+ return ret;
+}
+
+/*
+ * Set up the rqstp thread context to point to the RQ buffer. If
+ * necessary, pull additional data from the client with an RDMA_READ
+ * request.
+ */
+int svc_rdma_recvfrom(struct svc_rqst *rqstp)
+{
+ struct svc_xprt *xprt = rqstp->rq_xprt;
+ struct svcxprt_rdma *rdma_xprt =
+ container_of(xprt, struct svcxprt_rdma, sc_xprt);
+ struct svc_rdma_op_ctxt *ctxt = NULL;
+ struct rpcrdma_msg *rmsgp;
+ int ret = 0;
+ int len;
+
+ dprintk("svcrdma: rqstp=%p\n", rqstp);
+
+ /*
+ * The rq_xprt_ctxt indicates if we've consumed an RQ credit
+ * or not. It is used in the rdma xpo_release_rqst function to
+ * determine whether or not to return an RQ WQE to the RQ.
+ */
+ rqstp->rq_xprt_ctxt = NULL;
+
+ spin_lock_bh(&rdma_xprt->sc_read_complete_lock);
+ if (!list_empty(&rdma_xprt->sc_read_complete_q)) {
+ ctxt = list_entry(rdma_xprt->sc_read_complete_q.next,
+ struct svc_rdma_op_ctxt,
+ dto_q);
+ list_del_init(&ctxt->dto_q);
+ }
+ spin_unlock_bh(&rdma_xprt->sc_read_complete_lock);
+ if (ctxt)
+ return rdma_read_complete(rqstp, ctxt);
+
+ spin_lock_bh(&rdma_xprt->sc_rq_dto_lock);
+ if (!list_empty(&rdma_xprt->sc_rq_dto_q)) {
+ ctxt = list_entry(rdma_xprt->sc_rq_dto_q.next,
+ struct svc_rdma_op_ctxt,
+ dto_q);
+ list_del_init(&ctxt->dto_q);
+ } else {
+ atomic_inc(&rdma_stat_rq_starve);
+ clear_bit(XPT_DATA, &xprt->xpt_flags);
+ ctxt = NULL;
+ }
+ spin_unlock_bh(&rdma_xprt->sc_rq_dto_lock);
+ if (!ctxt) {
+ /* This is the EAGAIN path. The svc_recv routine will
+ * return -EAGAIN, the nfsd thread will go to call into
+ * svc_recv again and we shouldn't be on the active
+ * transport list
+ */
+ if (test_bit(XPT_CLOSE, &xprt->xpt_flags))
+ goto close_out;
+
+ BUG_ON(ret);
+ goto out;
+ }
+ dprintk("svcrdma: processing ctxt=%p on xprt=%p, rqstp=%p, status=%d\n",
+ ctxt, rdma_xprt, rqstp, ctxt->wc_status);
+ BUG_ON(ctxt->wc_status != IB_WC_SUCCESS);
+ atomic_inc(&rdma_stat_recv);
+
+ /* Build up the XDR from the receive buffers. */
+ rdma_build_arg_xdr(rqstp, ctxt, ctxt->byte_len);
+
+ /* Decode the RDMA header. */
+ len = svc_rdma_xdr_decode_req(&rmsgp, rqstp);
+ rqstp->rq_xprt_hlen = len;
+
+ /* If the request is invalid, reply with an error */
+ if (len < 0) {
+ if (len == -ENOSYS)
+ (void)svc_rdma_send_error(rdma_xprt, rmsgp, ERR_VERS);
+ goto close_out;
+ }
+
+ /* Read read-list data. If we would need to wait, defer
+ * it. Not that in this case, we don't return the RQ credit
+ * until after the read completes.
+ */
+ if (rdma_read_xdr(rdma_xprt, rmsgp, rqstp, ctxt)) {
+ svc_xprt_received(xprt);
+ return 0;
+ }
+
+ /* Indicate we've consumed an RQ credit */
+ rqstp->rq_xprt_ctxt = rqstp->rq_xprt;
+
+ ret = rqstp->rq_arg.head[0].iov_len
+ + rqstp->rq_arg.page_len
+ + rqstp->rq_arg.tail[0].iov_len;
+ svc_rdma_put_context(ctxt, 0);
+ out:
+ dprintk("svcrdma: ret = %d, rq_arg.len =%d, "
+ "rq_arg.head[0].iov_base=%p, rq_arg.head[0].iov_len = %zd\n",
+ ret, rqstp->rq_arg.len,
+ rqstp->rq_arg.head[0].iov_base,
+ rqstp->rq_arg.head[0].iov_len);
+ rqstp->rq_prot = IPPROTO_MAX;
+ svc_xprt_copy_addrs(rqstp, xprt);
+ svc_xprt_received(xprt);
+ return ret;
+
+ close_out:
+ if (ctxt) {
+ svc_rdma_put_context(ctxt, 1);
+ /* Indicate we've consumed an RQ credit */
+ rqstp->rq_xprt_ctxt = rqstp->rq_xprt;
+ }
+ dprintk("svcrdma: transport %p is closing\n", xprt);
+ /*
+ * Set the close bit and enqueue it. svc_recv will see the
+ * close bit and call svc_xprt_delete
+ */
+ set_bit(XPT_CLOSE, &xprt->xpt_flags);
+ svc_xprt_received(xprt);
+ return 0;
+}
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
new file mode 100644
index 000000000000..3e321949e1dc
--- /dev/null
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -0,0 +1,520 @@
+/*
+ * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses. You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the BSD-type
+ * license below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided
+ * with the distribution.
+ *
+ * Neither the name of the Network Appliance, Inc. nor the names of
+ * its contributors may be used to endorse or promote products
+ * derived from this software without specific prior written
+ * permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * Author: Tom Tucker <tom@opengridcomputing.com>
+ */
+
+#include <linux/sunrpc/debug.h>
+#include <linux/sunrpc/rpc_rdma.h>
+#include <linux/spinlock.h>
+#include <asm/unaligned.h>
+#include <rdma/ib_verbs.h>
+#include <rdma/rdma_cm.h>
+#include <linux/sunrpc/svc_rdma.h>
+
+#define RPCDBG_FACILITY RPCDBG_SVCXPRT
+
+/* Encode an XDR as an array of IB SGE
+ *
+ * Assumptions:
+ * - head[0] is physically contiguous.
+ * - tail[0] is physically contiguous.
+ * - pages[] is not physically or virtually contigous and consists of
+ * PAGE_SIZE elements.
+ *
+ * Output:
+ * SGE[0] reserved for RCPRDMA header
+ * SGE[1] data from xdr->head[]
+ * SGE[2..sge_count-2] data from xdr->pages[]
+ * SGE[sge_count-1] data from xdr->tail.
+ *
+ */
+static struct ib_sge *xdr_to_sge(struct svcxprt_rdma *xprt,
+ struct xdr_buf *xdr,
+ struct ib_sge *sge,
+ int *sge_count)
+{
+ /* Max we need is the length of the XDR / pagesize + one for
+ * head + one for tail + one for RPCRDMA header
+ */
+ int sge_max = (xdr->len+PAGE_SIZE-1) / PAGE_SIZE + 3;
+ int sge_no;
+ u32 byte_count = xdr->len;
+ u32 sge_bytes;
+ u32 page_bytes;
+ int page_off;
+ int page_no;
+
+ /* Skip the first sge, this is for the RPCRDMA header */
+ sge_no = 1;
+
+ /* Head SGE */
+ sge[sge_no].addr = ib_dma_map_single(xprt->sc_cm_id->device,
+ xdr->head[0].iov_base,
+ xdr->head[0].iov_len,
+ DMA_TO_DEVICE);
+ sge_bytes = min_t(u32, byte_count, xdr->head[0].iov_len);
+ byte_count -= sge_bytes;
+ sge[sge_no].length = sge_bytes;
+ sge[sge_no].lkey = xprt->sc_phys_mr->lkey;
+ sge_no++;
+
+ /* pages SGE */
+ page_no = 0;
+ page_bytes = xdr->page_len;
+ page_off = xdr->page_base;
+ while (byte_count && page_bytes) {
+ sge_bytes = min_t(u32, byte_count, (PAGE_SIZE-page_off));
+ sge[sge_no].addr =
+ ib_dma_map_page(xprt->sc_cm_id->device,
+ xdr->pages[page_no], page_off,
+ sge_bytes, DMA_TO_DEVICE);
+ sge_bytes = min(sge_bytes, page_bytes);
+ byte_count -= sge_bytes;
+ page_bytes -= sge_bytes;
+ sge[sge_no].length = sge_bytes;
+ sge[sge_no].lkey = xprt->sc_phys_mr->lkey;
+
+ sge_no++;
+ page_no++;
+ page_off = 0; /* reset for next time through loop */
+ }
+
+ /* Tail SGE */
+ if (byte_count && xdr->tail[0].iov_len) {
+ sge[sge_no].addr =
+ ib_dma_map_single(xprt->sc_cm_id->device,
+ xdr->tail[0].iov_base,
+ xdr->tail[0].iov_len,
+ DMA_TO_DEVICE);
+ sge_bytes = min_t(u32, byte_count, xdr->tail[0].iov_len);
+ byte_count -= sge_bytes;
+ sge[sge_no].length = sge_bytes;
+ sge[sge_no].lkey = xprt->sc_phys_mr->lkey;
+ sge_no++;
+ }
+
+ BUG_ON(sge_no > sge_max);
+ BUG_ON(byte_count != 0);
+
+ *sge_count = sge_no;
+ return sge;
+}
+
+
+/* Assumptions:
+ * - The specified write_len can be represented in sc_max_sge * PAGE_SIZE
+ */
+static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
+ u32 rmr, u64 to,
+ u32 xdr_off, int write_len,
+ struct ib_sge *xdr_sge, int sge_count)
+{
+ struct svc_rdma_op_ctxt *tmp_sge_ctxt;
+ struct ib_send_wr write_wr;
+ struct ib_sge *sge;
+ int xdr_sge_no;
+ int sge_no;
+ int sge_bytes;
+ int sge_off;
+ int bc;
+ struct svc_rdma_op_ctxt *ctxt;
+ int ret = 0;
+
+ BUG_ON(sge_count >= 32);
+ dprintk("svcrdma: RDMA_WRITE rmr=%x, to=%llx, xdr_off=%d, "
+ "write_len=%d, xdr_sge=%p, sge_count=%d\n",
+ rmr, to, xdr_off, write_len, xdr_sge, sge_count);
+
+ ctxt = svc_rdma_get_context(xprt);
+ ctxt->count = 0;
+ tmp_sge_ctxt = svc_rdma_get_context(xprt);
+ sge = tmp_sge_ctxt->sge;
+
+ /* Find the SGE associated with xdr_off */
+ for (bc = xdr_off, xdr_sge_no = 1; bc && xdr_sge_no < sge_count;
+ xdr_sge_no++) {
+ if (xdr_sge[xdr_sge_no].length > bc)
+ break;
+ bc -= xdr_sge[xdr_sge_no].length;
+ }
+
+ sge_off = bc;
+ bc = write_len;
+ sge_no = 0;
+
+ /* Copy the remaining SGE */
+ while (bc != 0 && xdr_sge_no < sge_count) {
+ sge[sge_no].addr = xdr_sge[xdr_sge_no].addr + sge_off;
+ sge[sge_no].lkey = xdr_sge[xdr_sge_no].lkey;
+ sge_bytes = min((size_t)bc,
+ (size_t)(xdr_sge[xdr_sge_no].length-sge_off));
+ sge[sge_no].length = sge_bytes;
+
+ sge_off = 0;
+ sge_no++;
+ xdr_sge_no++;
+ bc -= sge_bytes;
+ }
+
+ BUG_ON(bc != 0);
+ BUG_ON(xdr_sge_no > sge_count);
+
+ /* Prepare WRITE WR */
+ memset(&write_wr, 0, sizeof write_wr);
+ ctxt->wr_op = IB_WR_RDMA_WRITE;
+ write_wr.wr_id = (unsigned long)ctxt;
+ write_wr.sg_list = &sge[0];
+ write_wr.num_sge = sge_no;
+ write_wr.opcode = IB_WR_RDMA_WRITE;
+ write_wr.send_flags = IB_SEND_SIGNALED;
+ write_wr.wr.rdma.rkey = rmr;
+ write_wr.wr.rdma.remote_addr = to;
+
+ /* Post It */
+ atomic_inc(&rdma_stat_write);
+ if (svc_rdma_send(xprt, &write_wr)) {
+ svc_rdma_put_context(ctxt, 1);
+ /* Fatal error, close transport */
+ ret = -EIO;
+ }
+ svc_rdma_put_context(tmp_sge_ctxt, 0);
+ return ret;
+}
+
+static int send_write_chunks(struct svcxprt_rdma *xprt,
+ struct rpcrdma_msg *rdma_argp,
+ struct rpcrdma_msg *rdma_resp,
+ struct svc_rqst *rqstp,
+ struct ib_sge *sge,
+ int sge_count)
+{
+ u32 xfer_len = rqstp->rq_res.page_len + rqstp->rq_res.tail[0].iov_len;
+ int write_len;
+ int max_write;
+ u32 xdr_off;
+ int chunk_off;
+ int chunk_no;
+ struct rpcrdma_write_array *arg_ary;
+ struct rpcrdma_write_array *res_ary;
+ int ret;
+
+ arg_ary = svc_rdma_get_write_array(rdma_argp);
+ if (!arg_ary)
+ return 0;
+ res_ary = (struct rpcrdma_write_array *)
+ &rdma_resp->rm_body.rm_chunks[1];
+
+ max_write = xprt->sc_max_sge * PAGE_SIZE;
+
+ /* Write chunks start at the pagelist */
+ for (xdr_off = rqstp->rq_res.head[0].iov_len, chunk_no = 0;
+ xfer_len && chunk_no < arg_ary->wc_nchunks;
+ chunk_no++) {
+ struct rpcrdma_segment *arg_ch;
+ u64 rs_offset;
+
+ arg_ch = &arg_ary->wc_array[chunk_no].wc_target;
+ write_len = min(xfer_len, arg_ch->rs_length);
+
+ /* Prepare the response chunk given the length actually
+ * written */
+ rs_offset = get_unaligned(&(arg_ch->rs_offset));
+ svc_rdma_xdr_encode_array_chunk(res_ary, chunk_no,
+ arg_ch->rs_handle,
+ rs_offset,
+ write_len);
+ chunk_off = 0;
+ while (write_len) {
+ int this_write;
+ this_write = min(write_len, max_write);
+ ret = send_write(xprt, rqstp,
+ arg_ch->rs_handle,
+ rs_offset + chunk_off,
+ xdr_off,
+ this_write,
+ sge,
+ sge_count);
+ if (ret) {
+ dprintk("svcrdma: RDMA_WRITE failed, ret=%d\n",
+ ret);
+ return -EIO;
+ }
+ chunk_off += this_write;
+ xdr_off += this_write;
+ xfer_len -= this_write;
+ write_len -= this_write;
+ }
+ }
+ /* Update the req with the number of chunks actually used */
+ svc_rdma_xdr_encode_write_list(rdma_resp, chunk_no);
+
+ return rqstp->rq_res.page_len + rqstp->rq_res.tail[0].iov_len;
+}
+
+static int send_reply_chunks(struct svcxprt_rdma *xprt,
+ struct rpcrdma_msg *rdma_argp,
+ struct rpcrdma_msg *rdma_resp,
+ struct svc_rqst *rqstp,
+ struct ib_sge *sge,
+ int sge_count)
+{
+ u32 xfer_len = rqstp->rq_res.len;
+ int write_len;
+ int max_write;
+ u32 xdr_off;
+ int chunk_no;
+ int chunk_off;
+ struct rpcrdma_segment *ch;
+ struct rpcrdma_write_array *arg_ary;
+ struct rpcrdma_write_array *res_ary;
+ int ret;
+
+ arg_ary = svc_rdma_get_reply_array(rdma_argp);
+ if (!arg_ary)
+ return 0;
+ /* XXX: need to fix when reply lists occur with read-list and or
+ * write-list */
+ res_ary = (struct rpcrdma_write_array *)
+ &rdma_resp->rm_body.rm_chunks[2];
+
+ max_write = xprt->sc_max_sge * PAGE_SIZE;
+
+ /* xdr offset starts at RPC message */
+ for (xdr_off = 0, chunk_no = 0;
+ xfer_len && chunk_no < arg_ary->wc_nchunks;
+ chunk_no++) {
+ u64 rs_offset;
+ ch = &arg_ary->wc_array[chunk_no].wc_target;
+ write_len = min(xfer_len, ch->rs_length);
+
+
+ /* Prepare the reply chunk given the length actually
+ * written */
+ rs_offset = get_unaligned(&(ch->rs_offset));
+ svc_rdma_xdr_encode_array_chunk(res_ary, chunk_no,
+ ch->rs_handle, rs_offset,
+ write_len);
+ chunk_off = 0;
+ while (write_len) {
+ int this_write;
+
+ this_write = min(write_len, max_write);
+ ret = send_write(xprt, rqstp,
+ ch->rs_handle,
+ rs_offset + chunk_off,
+ xdr_off,
+ this_write,
+ sge,
+ sge_count);
+ if (ret) {
+ dprintk("svcrdma: RDMA_WRITE failed, ret=%d\n",
+ ret);
+ return -EIO;
+ }
+ chunk_off += this_write;
+ xdr_off += this_write;
+ xfer_len -= this_write;
+ write_len -= this_write;
+ }
+ }
+ /* Update the req with the number of chunks actually used */
+ svc_rdma_xdr_encode_reply_array(res_ary, chunk_no);
+
+ return rqstp->rq_res.len;
+}
+
+/* This function prepares the portion of the RPCRDMA message to be
+ * sent in the RDMA_SEND. This function is called after data sent via
+ * RDMA has already been transmitted. There are three cases:
+ * - The RPCRDMA header, RPC header, and payload are all sent in a
+ * single RDMA_SEND. This is the "inline" case.
+ * - The RPCRDMA header and some portion of the RPC header and data
+ * are sent via this RDMA_SEND and another portion of the data is
+ * sent via RDMA.
+ * - The RPCRDMA header [NOMSG] is sent in this RDMA_SEND and the RPC
+ * header and data are all transmitted via RDMA.
+ * In all three cases, this function prepares the RPCRDMA header in
+ * sge[0], the 'type' parameter indicates the type to place in the
+ * RPCRDMA header, and the 'byte_count' field indicates how much of
+ * the XDR to include in this RDMA_SEND.
+ */
+static int send_reply(struct svcxprt_rdma *rdma,
+ struct svc_rqst *rqstp,
+ struct page *page,
+ struct rpcrdma_msg *rdma_resp,
+ struct svc_rdma_op_ctxt *ctxt,
+ int sge_count,
+ int byte_count)
+{
+ struct ib_send_wr send_wr;
+ int sge_no;
+ int sge_bytes;
+ int page_no;
+ int ret;
+
+ /* Prepare the context */
+ ctxt->pages[0] = page;
+ ctxt->count = 1;
+
+ /* Prepare the SGE for the RPCRDMA Header */
+ ctxt->sge[0].addr =
+ ib_dma_map_page(rdma->sc_cm_id->device,
+ page, 0, PAGE_SIZE, DMA_TO_DEVICE);
+ ctxt->direction = DMA_TO_DEVICE;
+ ctxt->sge[0].length = svc_rdma_xdr_get_reply_hdr_len(rdma_resp);
+ ctxt->sge[0].lkey = rdma->sc_phys_mr->lkey;
+
+ /* Determine how many of our SGE are to be transmitted */
+ for (sge_no = 1; byte_count && sge_no < sge_count; sge_no++) {
+ sge_bytes = min((size_t)ctxt->sge[sge_no].length,
+ (size_t)byte_count);
+ byte_count -= sge_bytes;
+ }
+ BUG_ON(byte_count != 0);
+
+ /* Save all respages in the ctxt and remove them from the
+ * respages array. They are our pages until the I/O
+ * completes.
+ */
+ for (page_no = 0; page_no < rqstp->rq_resused; page_no++) {
+ ctxt->pages[page_no+1] = rqstp->rq_respages[page_no];
+ ctxt->count++;
+ rqstp->rq_respages[page_no] = NULL;
+ }
+
+ BUG_ON(sge_no > rdma->sc_max_sge);
+ memset(&send_wr, 0, sizeof send_wr);
+ ctxt->wr_op = IB_WR_SEND;
+ send_wr.wr_id = (unsigned long)ctxt;
+ send_wr.sg_list = ctxt->sge;
+ send_wr.num_sge = sge_no;
+ send_wr.opcode = IB_WR_SEND;
+ send_wr.send_flags = IB_SEND_SIGNALED;
+
+ ret = svc_rdma_send(rdma, &send_wr);
+ if (ret)
+ svc_rdma_put_context(ctxt, 1);
+
+ return ret;
+}
+
+void svc_rdma_prep_reply_hdr(struct svc_rqst *rqstp)
+{
+}
+
+/*
+ * Return the start of an xdr buffer.
+ */
+static void *xdr_start(struct xdr_buf *xdr)
+{
+ return xdr->head[0].iov_base -
+ (xdr->len -
+ xdr->page_len -
+ xdr->tail[0].iov_len -
+ xdr->head[0].iov_len);
+}
+
+int svc_rdma_sendto(struct svc_rqst *rqstp)
+{
+ struct svc_xprt *xprt = rqstp->rq_xprt;
+ struct svcxprt_rdma *rdma =
+ container_of(xprt, struct svcxprt_rdma, sc_xprt);
+ struct rpcrdma_msg *rdma_argp;
+ struct rpcrdma_msg *rdma_resp;
+ struct rpcrdma_write_array *reply_ary;
+ enum rpcrdma_proc reply_type;
+ int ret;
+ int inline_bytes;
+ struct ib_sge *sge;
+ int sge_count = 0;
+ struct page *res_page;
+ struct svc_rdma_op_ctxt *ctxt;
+
+ dprintk("svcrdma: sending response for rqstp=%p\n", rqstp);
+
+ /* Get the RDMA request header. */
+ rdma_argp = xdr_start(&rqstp->rq_arg);
+
+ /* Build an SGE for the XDR */
+ ctxt = svc_rdma_get_context(rdma);
+ ctxt->direction = DMA_TO_DEVICE;
+ sge = xdr_to_sge(rdma, &rqstp->rq_res, ctxt->sge, &sge_count);
+
+ inline_bytes = rqstp->rq_res.len;
+
+ /* Create the RDMA response header */
+ res_page = svc_rdma_get_page();
+ rdma_resp = page_address(res_page);
+ reply_ary = svc_rdma_get_reply_array(rdma_argp);
+ if (reply_ary)
+ reply_type = RDMA_NOMSG;
+ else
+ reply_type = RDMA_MSG;
+ svc_rdma_xdr_encode_reply_header(rdma, rdma_argp,
+ rdma_resp, reply_type);
+
+ /* Send any write-chunk data and build resp write-list */
+ ret = send_write_chunks(rdma, rdma_argp, rdma_resp,
+ rqstp, sge, sge_count);
+ if (ret < 0) {
+ printk(KERN_ERR "svcrdma: failed to send write chunks, rc=%d\n",
+ ret);
+ goto error;
+ }
+ inline_bytes -= ret;
+
+ /* Send any reply-list data and update resp reply-list */
+ ret = send_reply_chunks(rdma, rdma_argp, rdma_resp,
+ rqstp, sge, sge_count);
+ if (ret < 0) {
+ printk(KERN_ERR "svcrdma: failed to send reply chunks, rc=%d\n",
+ ret);
+ goto error;
+ }
+ inline_bytes -= ret;
+
+ ret = send_reply(rdma, rqstp, res_page, rdma_resp, ctxt, sge_count,
+ inline_bytes);
+ dprintk("svcrdma: send_reply returns %d\n", ret);
+ return ret;
+ error:
+ svc_rdma_put_context(ctxt, 0);
+ put_page(res_page);
+ return ret;
+}
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
new file mode 100644
index 000000000000..f09444c451bc
--- /dev/null
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -0,0 +1,1080 @@
+/*
+ * Copyright (c) 2005-2007 Network Appliance, Inc. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses. You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the BSD-type
+ * license below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided
+ * with the distribution.
+ *
+ * Neither the name of the Network Appliance, Inc. nor the names of
+ * its contributors may be used to endorse or promote products
+ * derived from this software without specific prior written
+ * permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * Author: Tom Tucker <tom@opengridcomputing.com>
+ */
+
+#include <linux/sunrpc/svc_xprt.h>
+#include <linux/sunrpc/debug.h>
+#include <linux/sunrpc/rpc_rdma.h>
+#include <linux/spinlock.h>
+#include <rdma/ib_verbs.h>
+#include <rdma/rdma_cm.h>
+#include <linux/sunrpc/svc_rdma.h>
+
+#define RPCDBG_FACILITY RPCDBG_SVCXPRT
+
+static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
+ struct sockaddr *sa, int salen,
+ int flags);
+static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt);
+static void svc_rdma_release_rqst(struct svc_rqst *);
+static void rdma_destroy_xprt(struct svcxprt_rdma *xprt);
+static void dto_tasklet_func(unsigned long data);
+static void svc_rdma_detach(struct svc_xprt *xprt);
+static void svc_rdma_free(struct svc_xprt *xprt);
+static int svc_rdma_has_wspace(struct svc_xprt *xprt);
+static void rq_cq_reap(struct svcxprt_rdma *xprt);
+static void sq_cq_reap(struct svcxprt_rdma *xprt);
+
+DECLARE_TASKLET(dto_tasklet, dto_tasklet_func, 0UL);
+static DEFINE_SPINLOCK(dto_lock);
+static LIST_HEAD(dto_xprt_q);
+
+static struct svc_xprt_ops svc_rdma_ops = {
+ .xpo_create = svc_rdma_create,
+ .xpo_recvfrom = svc_rdma_recvfrom,
+ .xpo_sendto = svc_rdma_sendto,
+ .xpo_release_rqst = svc_rdma_release_rqst,
+ .xpo_detach = svc_rdma_detach,
+ .xpo_free = svc_rdma_free,
+ .xpo_prep_reply_hdr = svc_rdma_prep_reply_hdr,
+ .xpo_has_wspace = svc_rdma_has_wspace,
+ .xpo_accept = svc_rdma_accept,
+};
+
+struct svc_xprt_class svc_rdma_class = {
+ .xcl_name = "rdma",
+ .xcl_owner = THIS_MODULE,
+ .xcl_ops = &svc_rdma_ops,
+ .xcl_max_payload = RPCSVC_MAXPAYLOAD_TCP,
+};
+
+static int rdma_bump_context_cache(struct svcxprt_rdma *xprt)
+{
+ int target;
+ int at_least_one = 0;
+ struct svc_rdma_op_ctxt *ctxt;
+
+ target = min(xprt->sc_ctxt_cnt + xprt->sc_ctxt_bump,
+ xprt->sc_ctxt_max);
+
+ spin_lock_bh(&xprt->sc_ctxt_lock);
+ while (xprt->sc_ctxt_cnt < target) {
+ xprt->sc_ctxt_cnt++;
+ spin_unlock_bh(&xprt->sc_ctxt_lock);
+
+ ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL);
+
+ spin_lock_bh(&xprt->sc_ctxt_lock);
+ if (ctxt) {
+ at_least_one = 1;
+ ctxt->next = xprt->sc_ctxt_head;
+ xprt->sc_ctxt_head = ctxt;
+ } else {
+ /* kmalloc failed...give up for now */
+ xprt->sc_ctxt_cnt--;
+ break;
+ }
+ }
+ spin_unlock_bh(&xprt->sc_ctxt_lock);
+ dprintk("svcrdma: sc_ctxt_max=%d, sc_ctxt_cnt=%d\n",
+ xprt->sc_ctxt_max, xprt->sc_ctxt_cnt);
+ return at_least_one;
+}
+
+struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
+{
+ struct svc_rdma_op_ctxt *ctxt;
+
+ while (1) {
+ spin_lock_bh(&xprt->sc_ctxt_lock);
+ if (unlikely(xprt->sc_ctxt_head == NULL)) {
+ /* Try to bump my cache. */
+ spin_unlock_bh(&xprt->sc_ctxt_lock);
+
+ if (rdma_bump_context_cache(xprt))
+ continue;
+
+ printk(KERN_INFO "svcrdma: sleeping waiting for "
+ "context memory on xprt=%p\n",
+ xprt);
+ schedule_timeout_uninterruptible(msecs_to_jiffies(500));
+ continue;
+ }
+ ctxt = xprt->sc_ctxt_head;
+ xprt->sc_ctxt_head = ctxt->next;
+ spin_unlock_bh(&xprt->sc_ctxt_lock);
+ ctxt->xprt = xprt;
+ INIT_LIST_HEAD(&ctxt->dto_q);
+ ctxt->count = 0;
+ break;
+ }
+ return ctxt;
+}
+
+void svc_rdma_put_context(struct svc_rdma_op_ctxt *ctxt, int free_pages)
+{
+ struct svcxprt_rdma *xprt;
+ int i;
+
+ BUG_ON(!ctxt);
+ xprt = ctxt->xprt;
+ if (free_pages)
+ for (i = 0; i < ctxt->count; i++)
+ put_page(ctxt->pages[i]);
+
+ for (i = 0; i < ctxt->count; i++)
+ dma_unmap_single(xprt->sc_cm_id->device->dma_device,
+ ctxt->sge[i].addr,
+ ctxt->sge[i].length,
+ ctxt->direction);
+ spin_lock_bh(&xprt->sc_ctxt_lock);
+ ctxt->next = xprt->sc_ctxt_head;
+ xprt->sc_ctxt_head = ctxt;
+ spin_unlock_bh(&xprt->sc_ctxt_lock);
+}
+
+/* ib_cq event handler */
+static void cq_event_handler(struct ib_event *event, void *context)
+{
+ struct svc_xprt *xprt = context;
+ dprintk("svcrdma: received CQ event id=%d, context=%p\n",
+ event->event, context);
+ set_bit(XPT_CLOSE, &xprt->xpt_flags);
+}
+
+/* QP event handler */
+static void qp_event_handler(struct ib_event *event, void *context)
+{
+ struct svc_xprt *xprt = context;
+
+ switch (event->event) {
+ /* These are considered benign events */
+ case IB_EVENT_PATH_MIG:
+ case IB_EVENT_COMM_EST:
+ case IB_EVENT_SQ_DRAINED:
+ case IB_EVENT_QP_LAST_WQE_REACHED:
+ dprintk("svcrdma: QP event %d received for QP=%p\n",
+ event->event, event->element.qp);
+ break;
+ /* These are considered fatal events */
+ case IB_EVENT_PATH_MIG_ERR:
+ case IB_EVENT_QP_FATAL:
+ case IB_EVENT_QP_REQ_ERR:
+ case IB_EVENT_QP_ACCESS_ERR:
+ case IB_EVENT_DEVICE_FATAL:
+ default:
+ dprintk("svcrdma: QP ERROR event %d received for QP=%p, "
+ "closing transport\n",
+ event->event, event->element.qp);
+ set_bit(XPT_CLOSE, &xprt->xpt_flags);
+ break;
+ }
+}
+
+/*
+ * Data Transfer Operation Tasklet
+ *
+ * Walks a list of transports with I/O pending, removing entries as
+ * they are added to the server's I/O pending list. Two bits indicate
+ * if SQ, RQ, or both have I/O pending. The dto_lock is an irqsave
+ * spinlock that serializes access to the transport list with the RQ
+ * and SQ interrupt handlers.
+ */
+static void dto_tasklet_func(unsigned long data)
+{
+ struct svcxprt_rdma *xprt;
+ unsigned long flags;
+
+ spin_lock_irqsave(&dto_lock, flags);
+ while (!list_empty(&dto_xprt_q)) {
+ xprt = list_entry(dto_xprt_q.next,
+ struct svcxprt_rdma, sc_dto_q);
+ list_del_init(&xprt->sc_dto_q);
+ spin_unlock_irqrestore(&dto_lock, flags);
+
+ if (test_and_clear_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags)) {
+ ib_req_notify_cq(xprt->sc_rq_cq, IB_CQ_NEXT_COMP);
+ rq_cq_reap(xprt);
+ set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
+ /*
+ * If data arrived before established event,
+ * don't enqueue. This defers RPC I/O until the
+ * RDMA connection is complete.
+ */
+ if (!test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
+ svc_xprt_enqueue(&xprt->sc_xprt);
+ }
+
+ if (test_and_clear_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags)) {
+ ib_req_notify_cq(xprt->sc_sq_cq, IB_CQ_NEXT_COMP);
+ sq_cq_reap(xprt);
+ }
+
+ spin_lock_irqsave(&dto_lock, flags);
+ }
+ spin_unlock_irqrestore(&dto_lock, flags);
+}
+
+/*
+ * Receive Queue Completion Handler
+ *
+ * Since an RQ completion handler is called on interrupt context, we
+ * need to defer the handling of the I/O to a tasklet
+ */
+static void rq_comp_handler(struct ib_cq *cq, void *cq_context)
+{
+ struct svcxprt_rdma *xprt = cq_context;
+ unsigned long flags;
+
+ /*
+ * Set the bit regardless of whether or not it's on the list
+ * because it may be on the list already due to an SQ
+ * completion.
+ */
+ set_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags);
+
+ /*
+ * If this transport is not already on the DTO transport queue,
+ * add it
+ */
+ spin_lock_irqsave(&dto_lock, flags);
+ if (list_empty(&xprt->sc_dto_q))
+ list_add_tail(&xprt->sc_dto_q, &dto_xprt_q);
+ spin_unlock_irqrestore(&dto_lock, flags);
+
+ /* Tasklet does all the work to avoid irqsave locks. */
+ tasklet_schedule(&dto_tasklet);
+}
+
+/*
+ * rq_cq_reap - Process the RQ CQ.
+ *
+ * Take all completing WC off the CQE and enqueue the associated DTO
+ * context on the dto_q for the transport.
+ */
+static void rq_cq_reap(struct svcxprt_rdma *xprt)
+{
+ int ret;
+ struct ib_wc wc;
+ struct svc_rdma_op_ctxt *ctxt = NULL;
+
+ atomic_inc(&rdma_stat_rq_poll);
+
+ spin_lock_bh(&xprt->sc_rq_dto_lock);
+ while ((ret = ib_poll_cq(xprt->sc_rq_cq, 1, &wc)) > 0) {
+ ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id;
+ ctxt->wc_status = wc.status;
+ ctxt->byte_len = wc.byte_len;
+ if (wc.status != IB_WC_SUCCESS) {
+ /* Close the transport */
+ set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
+ svc_rdma_put_context(ctxt, 1);
+ continue;
+ }
+ list_add_tail(&ctxt->dto_q, &xprt->sc_rq_dto_q);
+ }
+ spin_unlock_bh(&xprt->sc_rq_dto_lock);
+
+ if (ctxt)
+ atomic_inc(&rdma_stat_rq_prod);
+}
+
+/*
+ * Send Queue Completion Handler - potentially called on interrupt context.
+ */
+static void sq_cq_reap(struct svcxprt_rdma *xprt)
+{
+ struct svc_rdma_op_ctxt *ctxt = NULL;
+ struct ib_wc wc;
+ struct ib_cq *cq = xprt->sc_sq_cq;
+ int ret;
+
+ atomic_inc(&rdma_stat_sq_poll);
+ while ((ret = ib_poll_cq(cq, 1, &wc)) > 0) {
+ ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id;
+ xprt = ctxt->xprt;
+
+ if (wc.status != IB_WC_SUCCESS)
+ /* Close the transport */
+ set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
+
+ /* Decrement used SQ WR count */
+ atomic_dec(&xprt->sc_sq_count);
+ wake_up(&xprt->sc_send_wait);
+
+ switch (ctxt->wr_op) {
+ case IB_WR_SEND:
+ case IB_WR_RDMA_WRITE:
+ svc_rdma_put_context(ctxt, 1);
+ break;
+
+ case IB_WR_RDMA_READ:
+ if (test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags)) {
+ set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
+ set_bit(RDMACTXT_F_READ_DONE, &ctxt->flags);
+ spin_lock_bh(&xprt->sc_read_complete_lock);
+ list_add_tail(&ctxt->dto_q,
+ &xprt->sc_read_complete_q);
+ spin_unlock_bh(&xprt->sc_read_complete_lock);
+ svc_xprt_enqueue(&xprt->sc_xprt);
+ }
+ break;
+
+ default:
+ printk(KERN_ERR "svcrdma: unexpected completion type, "
+ "opcode=%d, status=%d\n",
+ wc.opcode, wc.status);
+ break;
+ }
+ }
+
+ if (ctxt)
+ atomic_inc(&rdma_stat_sq_prod);
+}
+
+static void sq_comp_handler(struct ib_cq *cq, void *cq_context)
+{
+ struct svcxprt_rdma *xprt = cq_context;
+ unsigned long flags;
+
+ /*
+ * Set the bit regardless of whether or not it's on the list
+ * because it may be on the list already due to an RQ
+ * completion.
+ */
+ set_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags);
+
+ /*
+ * If this transport is not already on the DTO transport queue,
+ * add it
+ */
+ spin_lock_irqsave(&dto_lock, flags);
+ if (list_empty(&xprt->sc_dto_q))
+ list_add_tail(&xprt->sc_dto_q, &dto_xprt_q);
+ spin_unlock_irqrestore(&dto_lock, flags);
+
+ /* Tasklet does all the work to avoid irqsave locks. */
+ tasklet_schedule(&dto_tasklet);
+}
+
+static void create_context_cache(struct svcxprt_rdma *xprt,
+ int ctxt_count, int ctxt_bump, int ctxt_max)
+{
+ struct svc_rdma_op_ctxt *ctxt;
+ int i;
+
+ xprt->sc_ctxt_max = ctxt_max;
+ xprt->sc_ctxt_bump = ctxt_bump;
+ xprt->sc_ctxt_cnt = 0;
+ xprt->sc_ctxt_head = NULL;
+ for (i = 0; i < ctxt_count; i++) {
+ ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL);
+ if (ctxt) {
+ ctxt->next = xprt->sc_ctxt_head;
+ xprt->sc_ctxt_head = ctxt;
+ xprt->sc_ctxt_cnt++;
+ }
+ }
+}
+
+static void destroy_context_cache(struct svc_rdma_op_ctxt *ctxt)
+{
+ struct svc_rdma_op_ctxt *next;
+ if (!ctxt)
+ return;
+
+ do {
+ next = ctxt->next;
+ kfree(ctxt);
+ ctxt = next;
+ } while (next);
+}
+
+static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
+ int listener)
+{
+ struct svcxprt_rdma *cma_xprt = kzalloc(sizeof *cma_xprt, GFP_KERNEL);
+
+ if (!cma_xprt)
+ return NULL;
+ svc_xprt_init(&svc_rdma_class, &cma_xprt->sc_xprt, serv);
+ INIT_LIST_HEAD(&cma_xprt->sc_accept_q);
+ INIT_LIST_HEAD(&cma_xprt->sc_dto_q);
+ INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q);
+ INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q);
+ init_waitqueue_head(&cma_xprt->sc_send_wait);
+
+ spin_lock_init(&cma_xprt->sc_lock);
+ spin_lock_init(&cma_xprt->sc_read_complete_lock);
+ spin_lock_init(&cma_xprt->sc_ctxt_lock);
+ spin_lock_init(&cma_xprt->sc_rq_dto_lock);
+
+ cma_xprt->sc_ord = svcrdma_ord;
+
+ cma_xprt->sc_max_req_size = svcrdma_max_req_size;
+ cma_xprt->sc_max_requests = svcrdma_max_requests;
+ cma_xprt->sc_sq_depth = svcrdma_max_requests * RPCRDMA_SQ_DEPTH_MULT;
+ atomic_set(&cma_xprt->sc_sq_count, 0);
+
+ if (!listener) {
+ int reqs = cma_xprt->sc_max_requests;
+ create_context_cache(cma_xprt,
+ reqs << 1, /* starting size */
+ reqs, /* bump amount */
+ reqs +
+ cma_xprt->sc_sq_depth +
+ RPCRDMA_MAX_THREADS + 1); /* max */
+ if (!cma_xprt->sc_ctxt_head) {
+ kfree(cma_xprt);
+ return NULL;
+ }
+ clear_bit(XPT_LISTENER, &cma_xprt->sc_xprt.xpt_flags);
+ } else
+ set_bit(XPT_LISTENER, &cma_xprt->sc_xprt.xpt_flags);
+
+ return cma_xprt;
+}
+
+struct page *svc_rdma_get_page(void)
+{
+ struct page *page;
+
+ while ((page = alloc_page(GFP_KERNEL)) == NULL) {
+ /* If we can't get memory, wait a bit and try again */
+ printk(KERN_INFO "svcrdma: out of memory...retrying in 1000 "
+ "jiffies.\n");
+ schedule_timeout_uninterruptible(msecs_to_jiffies(1000));
+ }
+ return page;
+}
+
+int svc_rdma_post_recv(struct svcxprt_rdma *xprt)
+{
+ struct ib_recv_wr recv_wr, *bad_recv_wr;
+ struct svc_rdma_op_ctxt *ctxt;
+ struct page *page;
+ unsigned long pa;
+ int sge_no;
+ int buflen;
+ int ret;
+
+ ctxt = svc_rdma_get_context(xprt);
+ buflen = 0;
+ ctxt->direction = DMA_FROM_DEVICE;
+ for (sge_no = 0; buflen < xprt->sc_max_req_size; sge_no++) {
+ BUG_ON(sge_no >= xprt->sc_max_sge);
+ page = svc_rdma_get_page();
+ ctxt->pages[sge_no] = page;
+ pa = ib_dma_map_page(xprt->sc_cm_id->device,
+ page, 0, PAGE_SIZE,
+ DMA_FROM_DEVICE);
+ ctxt->sge[sge_no].addr = pa;
+ ctxt->sge[sge_no].length = PAGE_SIZE;
+ ctxt->sge[sge_no].lkey = xprt->sc_phys_mr->lkey;
+ buflen += PAGE_SIZE;
+ }
+ ctxt->count = sge_no;
+ recv_wr.next = NULL;
+ recv_wr.sg_list = &ctxt->sge[0];
+ recv_wr.num_sge = ctxt->count;
+ recv_wr.wr_id = (u64)(unsigned long)ctxt;
+
+ ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr);
+ return ret;
+}
+
+/*
+ * This function handles the CONNECT_REQUEST event on a listening
+ * endpoint. It is passed the cma_id for the _new_ connection. The context in
+ * this cma_id is inherited from the listening cma_id and is the svc_xprt
+ * structure for the listening endpoint.
+ *
+ * This function creates a new xprt for the new connection and enqueues it on
+ * the accept queue for the listent xprt. When the listen thread is kicked, it
+ * will call the recvfrom method on the listen xprt which will accept the new
+ * connection.
+ */
+static void handle_connect_req(struct rdma_cm_id *new_cma_id)
+{
+ struct svcxprt_rdma *listen_xprt = new_cma_id->context;
+ struct svcxprt_rdma *newxprt;
+
+ /* Create a new transport */
+ newxprt = rdma_create_xprt(listen_xprt->sc_xprt.xpt_server, 0);
+ if (!newxprt) {
+ dprintk("svcrdma: failed to create new transport\n");
+ return;
+ }
+ newxprt->sc_cm_id = new_cma_id;
+ new_cma_id->context = newxprt;
+ dprintk("svcrdma: Creating newxprt=%p, cm_id=%p, listenxprt=%p\n",
+ newxprt, newxprt->sc_cm_id, listen_xprt);
+
+ /*
+ * Enqueue the new transport on the accept queue of the listening
+ * transport
+ */
+ spin_lock_bh(&listen_xprt->sc_lock);
+ list_add_tail(&newxprt->sc_accept_q, &listen_xprt->sc_accept_q);
+ spin_unlock_bh(&listen_xprt->sc_lock);
+
+ /*
+ * Can't use svc_xprt_received here because we are not on a
+ * rqstp thread
+ */
+ set_bit(XPT_CONN, &listen_xprt->sc_xprt.xpt_flags);
+ svc_xprt_enqueue(&listen_xprt->sc_xprt);
+}
+
+/*
+ * Handles events generated on the listening endpoint. These events will be
+ * either be incoming connect requests or adapter removal events.
+ */
+static int rdma_listen_handler(struct rdma_cm_id *cma_id,
+ struct rdma_cm_event *event)
+{
+ struct svcxprt_rdma *xprt = cma_id->context;
+ int ret = 0;
+
+ switch (event->event) {
+ case RDMA_CM_EVENT_CONNECT_REQUEST:
+ dprintk("svcrdma: Connect request on cma_id=%p, xprt = %p, "
+ "event=%d\n", cma_id, cma_id->context, event->event);
+ handle_connect_req(cma_id);
+ break;
+
+ case RDMA_CM_EVENT_ESTABLISHED:
+ /* Accept complete */
+ dprintk("svcrdma: Connection completed on LISTEN xprt=%p, "
+ "cm_id=%p\n", xprt, cma_id);
+ break;
+
+ case RDMA_CM_EVENT_DEVICE_REMOVAL:
+ dprintk("svcrdma: Device removal xprt=%p, cm_id=%p\n",
+ xprt, cma_id);
+ if (xprt)
+ set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
+ break;
+
+ default:
+ dprintk("svcrdma: Unexpected event on listening endpoint %p, "
+ "event=%d\n", cma_id, event->event);
+ break;
+ }
+
+ return ret;
+}
+
+static int rdma_cma_handler(struct rdma_cm_id *cma_id,
+ struct rdma_cm_event *event)
+{
+ struct svc_xprt *xprt = cma_id->context;
+ struct svcxprt_rdma *rdma =
+ container_of(xprt, struct svcxprt_rdma, sc_xprt);
+ switch (event->event) {
+ case RDMA_CM_EVENT_ESTABLISHED:
+ /* Accept complete */
+ dprintk("svcrdma: Connection completed on DTO xprt=%p, "
+ "cm_id=%p\n", xprt, cma_id);
+ clear_bit(RDMAXPRT_CONN_PENDING, &rdma->sc_flags);
+ svc_xprt_enqueue(xprt);
+ break;
+ case RDMA_CM_EVENT_DISCONNECTED:
+ dprintk("svcrdma: Disconnect on DTO xprt=%p, cm_id=%p\n",
+ xprt, cma_id);
+ if (xprt) {
+ set_bit(XPT_CLOSE, &xprt->xpt_flags);
+ svc_xprt_enqueue(xprt);
+ }
+ break;
+ case RDMA_CM_EVENT_DEVICE_REMOVAL:
+ dprintk("svcrdma: Device removal cma_id=%p, xprt = %p, "
+ "event=%d\n", cma_id, xprt, event->event);
+ if (xprt) {
+ set_bit(XPT_CLOSE, &xprt->xpt_flags);
+ svc_xprt_enqueue(xprt);
+ }
+ break;
+ default:
+ dprintk("svcrdma: Unexpected event on DTO endpoint %p, "
+ "event=%d\n", cma_id, event->event);
+ break;
+ }
+ return 0;
+}
+
+/*
+ * Create a listening RDMA service endpoint.
+ */
+static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
+ struct sockaddr *sa, int salen,
+ int flags)
+{
+ struct rdma_cm_id *listen_id;
+ struct svcxprt_rdma *cma_xprt;
+ struct svc_xprt *xprt;
+ int ret;
+
+ dprintk("svcrdma: Creating RDMA socket\n");
+
+ cma_xprt = rdma_create_xprt(serv, 1);
+ if (!cma_xprt)
+ return ERR_PTR(ENOMEM);
+ xprt = &cma_xprt->sc_xprt;
+
+ listen_id = rdma_create_id(rdma_listen_handler, cma_xprt, RDMA_PS_TCP);
+ if (IS_ERR(listen_id)) {
+ rdma_destroy_xprt(cma_xprt);
+ dprintk("svcrdma: rdma_create_id failed = %ld\n",
+ PTR_ERR(listen_id));
+ return (void *)listen_id;
+ }
+ ret = rdma_bind_addr(listen_id, sa);
+ if (ret) {
+ rdma_destroy_xprt(cma_xprt);
+ rdma_destroy_id(listen_id);
+ dprintk("svcrdma: rdma_bind_addr failed = %d\n", ret);
+ return ERR_PTR(ret);
+ }
+ cma_xprt->sc_cm_id = listen_id;
+
+ ret = rdma_listen(listen_id, RPCRDMA_LISTEN_BACKLOG);
+ if (ret) {
+ rdma_destroy_id(listen_id);
+ rdma_destroy_xprt(cma_xprt);
+ dprintk("svcrdma: rdma_listen failed = %d\n", ret);
+ }
+
+ /*
+ * We need to use the address from the cm_id in case the
+ * caller specified 0 for the port number.
+ */
+ sa = (struct sockaddr *)&cma_xprt->sc_cm_id->route.addr.src_addr;
+ svc_xprt_set_local(&cma_xprt->sc_xprt, sa, salen);
+
+ return &cma_xprt->sc_xprt;
+}
+
+/*
+ * This is the xpo_recvfrom function for listening endpoints. Its
+ * purpose is to accept incoming connections. The CMA callback handler
+ * has already created a new transport and attached it to the new CMA
+ * ID.
+ *
+ * There is a queue of pending connections hung on the listening
+ * transport. This queue contains the new svc_xprt structure. This
+ * function takes svc_xprt structures off the accept_q and completes
+ * the connection.
+ */
+static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
+{
+ struct svcxprt_rdma *listen_rdma;
+ struct svcxprt_rdma *newxprt = NULL;
+ struct rdma_conn_param conn_param;
+ struct ib_qp_init_attr qp_attr;
+ struct ib_device_attr devattr;
+ struct sockaddr *sa;
+ int ret;
+ int i;
+
+ listen_rdma = container_of(xprt, struct svcxprt_rdma, sc_xprt);
+ clear_bit(XPT_CONN, &xprt->xpt_flags);
+ /* Get the next entry off the accept list */
+ spin_lock_bh(&listen_rdma->sc_lock);
+ if (!list_empty(&listen_rdma->sc_accept_q)) {
+ newxprt = list_entry(listen_rdma->sc_accept_q.next,
+ struct svcxprt_rdma, sc_accept_q);
+ list_del_init(&newxprt->sc_accept_q);
+ }
+ if (!list_empty(&listen_rdma->sc_accept_q))
+ set_bit(XPT_CONN, &listen_rdma->sc_xprt.xpt_flags);
+ spin_unlock_bh(&listen_rdma->sc_lock);
+ if (!newxprt)
+ return NULL;
+
+ dprintk("svcrdma: newxprt from accept queue = %p, cm_id=%p\n",
+ newxprt, newxprt->sc_cm_id);
+
+ ret = ib_query_device(newxprt->sc_cm_id->device, &devattr);
+ if (ret) {
+ dprintk("svcrdma: could not query device attributes on "
+ "device %p, rc=%d\n", newxprt->sc_cm_id->device, ret);
+ goto errout;
+ }
+
+ /* Qualify the transport resource defaults with the
+ * capabilities of this particular device */
+ newxprt->sc_max_sge = min((size_t)devattr.max_sge,
+ (size_t)RPCSVC_MAXPAGES);
+ newxprt->sc_max_requests = min((size_t)devattr.max_qp_wr,
+ (size_t)svcrdma_max_requests);
+ newxprt->sc_sq_depth = RPCRDMA_SQ_DEPTH_MULT * newxprt->sc_max_requests;
+
+ newxprt->sc_ord = min((size_t)devattr.max_qp_rd_atom,
+ (size_t)svcrdma_ord);
+
+ newxprt->sc_pd = ib_alloc_pd(newxprt->sc_cm_id->device);
+ if (IS_ERR(newxprt->sc_pd)) {
+ dprintk("svcrdma: error creating PD for connect request\n");
+ goto errout;
+ }
+ newxprt->sc_sq_cq = ib_create_cq(newxprt->sc_cm_id->device,
+ sq_comp_handler,
+ cq_event_handler,
+ newxprt,
+ newxprt->sc_sq_depth,
+ 0);
+ if (IS_ERR(newxprt->sc_sq_cq)) {
+ dprintk("svcrdma: error creating SQ CQ for connect request\n");
+ goto errout;
+ }
+ newxprt->sc_rq_cq = ib_create_cq(newxprt->sc_cm_id->device,
+ rq_comp_handler,
+ cq_event_handler,
+ newxprt,
+ newxprt->sc_max_requests,
+ 0);
+ if (IS_ERR(newxprt->sc_rq_cq)) {
+ dprintk("svcrdma: error creating RQ CQ for connect request\n");
+ goto errout;
+ }
+
+ memset(&qp_attr, 0, sizeof qp_attr);
+ qp_attr.event_handler = qp_event_handler;
+ qp_attr.qp_context = &newxprt->sc_xprt;
+ qp_attr.cap.max_send_wr = newxprt->sc_sq_depth;
+ qp_attr.cap.max_recv_wr = newxprt->sc_max_requests;
+ qp_attr.cap.max_send_sge = newxprt->sc_max_sge;
+ qp_attr.cap.max_recv_sge = newxprt->sc_max_sge;
+ qp_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
+ qp_attr.qp_type = IB_QPT_RC;
+ qp_attr.send_cq = newxprt->sc_sq_cq;
+ qp_attr.recv_cq = newxprt->sc_rq_cq;
+ dprintk("svcrdma: newxprt->sc_cm_id=%p, newxprt->sc_pd=%p\n"
+ " cm_id->device=%p, sc_pd->device=%p\n"
+ " cap.max_send_wr = %d\n"
+ " cap.max_recv_wr = %d\n"
+ " cap.max_send_sge = %d\n"
+ " cap.max_recv_sge = %d\n",
+ newxprt->sc_cm_id, newxprt->sc_pd,
+ newxprt->sc_cm_id->device, newxprt->sc_pd->device,
+ qp_attr.cap.max_send_wr,
+ qp_attr.cap.max_recv_wr,
+ qp_attr.cap.max_send_sge,
+ qp_attr.cap.max_recv_sge);
+
+ ret = rdma_create_qp(newxprt->sc_cm_id, newxprt->sc_pd, &qp_attr);
+ if (ret) {
+ /*
+ * XXX: This is a hack. We need a xx_request_qp interface
+ * that will adjust the qp_attr's with a best-effort
+ * number
+ */
+ qp_attr.cap.max_send_sge -= 2;
+ qp_attr.cap.max_recv_sge -= 2;
+ ret = rdma_create_qp(newxprt->sc_cm_id, newxprt->sc_pd,
+ &qp_attr);
+ if (ret) {
+ dprintk("svcrdma: failed to create QP, ret=%d\n", ret);
+ goto errout;
+ }
+ newxprt->sc_max_sge = qp_attr.cap.max_send_sge;
+ newxprt->sc_max_sge = qp_attr.cap.max_recv_sge;
+ newxprt->sc_sq_depth = qp_attr.cap.max_send_wr;
+ newxprt->sc_max_requests = qp_attr.cap.max_recv_wr;
+ }
+ newxprt->sc_qp = newxprt->sc_cm_id->qp;
+
+ /* Register all of physical memory */
+ newxprt->sc_phys_mr = ib_get_dma_mr(newxprt->sc_pd,
+ IB_ACCESS_LOCAL_WRITE |
+ IB_ACCESS_REMOTE_WRITE);
+ if (IS_ERR(newxprt->sc_phys_mr)) {
+ dprintk("svcrdma: Failed to create DMA MR ret=%d\n", ret);
+ goto errout;
+ }
+
+ /* Post receive buffers */
+ for (i = 0; i < newxprt->sc_max_requests; i++) {
+ ret = svc_rdma_post_recv(newxprt);
+ if (ret) {
+ dprintk("svcrdma: failure posting receive buffers\n");
+ goto errout;
+ }
+ }
+
+ /* Swap out the handler */
+ newxprt->sc_cm_id->event_handler = rdma_cma_handler;
+
+ /* Accept Connection */
+ set_bit(RDMAXPRT_CONN_PENDING, &newxprt->sc_flags);
+ memset(&conn_param, 0, sizeof conn_param);
+ conn_param.responder_resources = 0;
+ conn_param.initiator_depth = newxprt->sc_ord;
+ ret = rdma_accept(newxprt->sc_cm_id, &conn_param);
+ if (ret) {
+ dprintk("svcrdma: failed to accept new connection, ret=%d\n",
+ ret);
+ goto errout;
+ }
+
+ dprintk("svcrdma: new connection %p accepted with the following "
+ "attributes:\n"
+ " local_ip : %d.%d.%d.%d\n"
+ " local_port : %d\n"
+ " remote_ip : %d.%d.%d.%d\n"
+ " remote_port : %d\n"
+ " max_sge : %d\n"
+ " sq_depth : %d\n"
+ " max_requests : %d\n"
+ " ord : %d\n",
+ newxprt,
+ NIPQUAD(((struct sockaddr_in *)&newxprt->sc_cm_id->
+ route.addr.src_addr)->sin_addr.s_addr),
+ ntohs(((struct sockaddr_in *)&newxprt->sc_cm_id->
+ route.addr.src_addr)->sin_port),
+ NIPQUAD(((struct sockaddr_in *)&newxprt->sc_cm_id->
+ route.addr.dst_addr)->sin_addr.s_addr),
+ ntohs(((struct sockaddr_in *)&newxprt->sc_cm_id->
+ route.addr.dst_addr)->sin_port),
+ newxprt->sc_max_sge,
+ newxprt->sc_sq_depth,
+ newxprt->sc_max_requests,
+ newxprt->sc_ord);
+
+ /* Set the local and remote addresses in the transport */
+ sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr;
+ svc_xprt_set_remote(&newxprt->sc_xprt, sa, svc_addr_len(sa));
+ sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr;
+ svc_xprt_set_local(&newxprt->sc_xprt, sa, svc_addr_len(sa));
+
+ ib_req_notify_cq(newxprt->sc_sq_cq, IB_CQ_NEXT_COMP);
+ ib_req_notify_cq(newxprt->sc_rq_cq, IB_CQ_NEXT_COMP);
+ return &newxprt->sc_xprt;
+
+ errout:
+ dprintk("svcrdma: failure accepting new connection rc=%d.\n", ret);
+ rdma_destroy_id(newxprt->sc_cm_id);
+ rdma_destroy_xprt(newxprt);
+ return NULL;
+}
+
+/*
+ * Post an RQ WQE to the RQ when the rqst is being released. This
+ * effectively returns an RQ credit to the client. The rq_xprt_ctxt
+ * will be null if the request is deferred due to an RDMA_READ or the
+ * transport had no data ready (EAGAIN). Note that an RPC deferred in
+ * svc_process will still return the credit, this is because the data
+ * is copied and no longer consume a WQE/WC.
+ */
+static void svc_rdma_release_rqst(struct svc_rqst *rqstp)
+{
+ int err;
+ struct svcxprt_rdma *rdma =
+ container_of(rqstp->rq_xprt, struct svcxprt_rdma, sc_xprt);
+ if (rqstp->rq_xprt_ctxt) {
+ BUG_ON(rqstp->rq_xprt_ctxt != rdma);
+ err = svc_rdma_post_recv(rdma);
+ if (err)
+ dprintk("svcrdma: failed to post an RQ WQE error=%d\n",
+ err);
+ }
+ rqstp->rq_xprt_ctxt = NULL;
+}
+
+/* Disable data ready events for this connection */
+static void svc_rdma_detach(struct svc_xprt *xprt)
+{
+ struct svcxprt_rdma *rdma =
+ container_of(xprt, struct svcxprt_rdma, sc_xprt);
+ unsigned long flags;
+
+ dprintk("svc: svc_rdma_detach(%p)\n", xprt);
+ /*
+ * Shutdown the connection. This will ensure we don't get any
+ * more events from the provider.
+ */
+ rdma_disconnect(rdma->sc_cm_id);
+ rdma_destroy_id(rdma->sc_cm_id);
+
+ /* We may already be on the DTO list */
+ spin_lock_irqsave(&dto_lock, flags);
+ if (!list_empty(&rdma->sc_dto_q))
+ list_del_init(&rdma->sc_dto_q);
+ spin_unlock_irqrestore(&dto_lock, flags);
+}
+
+static void svc_rdma_free(struct svc_xprt *xprt)
+{
+ struct svcxprt_rdma *rdma = (struct svcxprt_rdma *)xprt;
+ dprintk("svcrdma: svc_rdma_free(%p)\n", rdma);
+ rdma_destroy_xprt(rdma);
+ kfree(rdma);
+}
+
+static void rdma_destroy_xprt(struct svcxprt_rdma *xprt)
+{
+ if (xprt->sc_qp && !IS_ERR(xprt->sc_qp))
+ ib_destroy_qp(xprt->sc_qp);
+
+ if (xprt->sc_sq_cq && !IS_ERR(xprt->sc_sq_cq))
+ ib_destroy_cq(xprt->sc_sq_cq);
+
+ if (xprt->sc_rq_cq && !IS_ERR(xprt->sc_rq_cq))
+ ib_destroy_cq(xprt->sc_rq_cq);
+
+ if (xprt->sc_phys_mr && !IS_ERR(xprt->sc_phys_mr))
+ ib_dereg_mr(xprt->sc_phys_mr);
+
+ if (xprt->sc_pd && !IS_ERR(xprt->sc_pd))
+ ib_dealloc_pd(xprt->sc_pd);
+
+ destroy_context_cache(xprt->sc_ctxt_head);
+}
+
+static int svc_rdma_has_wspace(struct svc_xprt *xprt)
+{
+ struct svcxprt_rdma *rdma =
+ container_of(xprt, struct svcxprt_rdma, sc_xprt);
+
+ /*
+ * If there are fewer SQ WR available than required to send a
+ * simple response, return false.
+ */
+ if ((rdma->sc_sq_depth - atomic_read(&rdma->sc_sq_count) < 3))
+ return 0;
+
+ /*
+ * ...or there are already waiters on the SQ,
+ * return false.
+ */
+ if (waitqueue_active(&rdma->sc_send_wait))
+ return 0;
+
+ /* Otherwise return true. */
+ return 1;
+}
+
+int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
+{
+ struct ib_send_wr *bad_wr;
+ int ret;
+
+ if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags))
+ return 0;
+
+ BUG_ON(wr->send_flags != IB_SEND_SIGNALED);
+ BUG_ON(((struct svc_rdma_op_ctxt *)(unsigned long)wr->wr_id)->wr_op !=
+ wr->opcode);
+ /* If the SQ is full, wait until an SQ entry is available */
+ while (1) {
+ spin_lock_bh(&xprt->sc_lock);
+ if (xprt->sc_sq_depth == atomic_read(&xprt->sc_sq_count)) {
+ spin_unlock_bh(&xprt->sc_lock);
+ atomic_inc(&rdma_stat_sq_starve);
+ /* See if we can reap some SQ WR */
+ sq_cq_reap(xprt);
+
+ /* Wait until SQ WR available if SQ still full */
+ wait_event(xprt->sc_send_wait,
+ atomic_read(&xprt->sc_sq_count) <
+ xprt->sc_sq_depth);
+ continue;
+ }
+ /* Bumped used SQ WR count and post */
+ ret = ib_post_send(xprt->sc_qp, wr, &bad_wr);
+ if (!ret)
+ atomic_inc(&xprt->sc_sq_count);
+ else
+ dprintk("svcrdma: failed to post SQ WR rc=%d, "
+ "sc_sq_count=%d, sc_sq_depth=%d\n",
+ ret, atomic_read(&xprt->sc_sq_count),
+ xprt->sc_sq_depth);
+ spin_unlock_bh(&xprt->sc_lock);
+ break;
+ }
+ return ret;
+}
+
+int svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
+ enum rpcrdma_errcode err)
+{
+ struct ib_send_wr err_wr;
+ struct ib_sge sge;
+ struct page *p;
+ struct svc_rdma_op_ctxt *ctxt;
+ u32 *va;
+ int length;
+ int ret;
+
+ p = svc_rdma_get_page();
+ va = page_address(p);
+
+ /* XDR encode error */
+ length = svc_rdma_xdr_encode_error(xprt, rmsgp, err, va);
+
+ /* Prepare SGE for local address */
+ sge.addr = ib_dma_map_page(xprt->sc_cm_id->device,
+ p, 0, PAGE_SIZE, DMA_FROM_DEVICE);
+ sge.lkey = xprt->sc_phys_mr->lkey;
+ sge.length = length;
+
+ ctxt = svc_rdma_get_context(xprt);
+ ctxt->count = 1;
+ ctxt->pages[0] = p;
+
+ /* Prepare SEND WR */
+ memset(&err_wr, 0, sizeof err_wr);
+ ctxt->wr_op = IB_WR_SEND;
+ err_wr.wr_id = (unsigned long)ctxt;
+ err_wr.sg_list = &sge;
+ err_wr.num_sge = 1;
+ err_wr.opcode = IB_WR_SEND;
+ err_wr.send_flags = IB_SEND_SIGNALED;
+
+ /* Post It */
+ ret = svc_rdma_send(xprt, &err_wr);
+ if (ret) {
+ dprintk("svcrdma: Error posting send = %d\n", ret);
+ svc_rdma_put_context(ctxt, 1);
+ }
+
+ return ret;
+}