diff options
author | Andy Grover <andy.grover@oracle.com> | 2010-01-12 14:33:38 -0800 |
---|---|---|
committer | Andy Grover <andy.grover@oracle.com> | 2010-09-08 18:11:41 -0700 |
commit | 15133f6e67d8d646d0744336b4daa3135452cb0d (patch) | |
tree | e5675d5a3ab240edc9a66af6b891dd75fa9eabae /net | |
parent | a63273d4992603979ddb181b6a8f07082839b39f (diff) |
RDS: Implement atomic operations
Implement a CMSG-based interface to do FADD and CSWP ops.
Alter send routines to handle atomic ops.
Add atomic counters to stats.
Add xmit_atomic() to struct rds_transport
Inline rds_ib_send_unmap_rdma into unmap_rm
Signed-off-by: Andy Grover <andy.grover@oracle.com>
Diffstat (limited to 'net')
-rw-r--r-- | net/rds/ib.c | 1 | ||||
-rw-r--r-- | net/rds/ib.h | 1 | ||||
-rw-r--r-- | net/rds/ib_rdma.c | 4 | ||||
-rw-r--r-- | net/rds/ib_send.c | 140 | ||||
-rw-r--r-- | net/rds/rdma.c | 73 | ||||
-rw-r--r-- | net/rds/rds.h | 33 | ||||
-rw-r--r-- | net/rds/send.c | 71 | ||||
-rw-r--r-- | net/rds/stats.c | 2 |
8 files changed, 302 insertions, 23 deletions
diff --git a/net/rds/ib.c b/net/rds/ib.c index 8f2d6dd7700a..f0d29656baff 100644 --- a/net/rds/ib.c +++ b/net/rds/ib.c @@ -264,6 +264,7 @@ struct rds_transport rds_ib_transport = { .xmit = rds_ib_xmit, .xmit_cong_map = NULL, .xmit_rdma = rds_ib_xmit_rdma, + .xmit_atomic = rds_ib_xmit_atomic, .recv = rds_ib_recv, .conn_alloc = rds_ib_conn_alloc, .conn_free = rds_ib_conn_free, diff --git a/net/rds/ib.h b/net/rds/ib.h index 64df4e79b29f..d2fd0aa4fde7 100644 --- a/net/rds/ib.h +++ b/net/rds/ib.h @@ -336,6 +336,7 @@ void rds_ib_send_add_credits(struct rds_connection *conn, unsigned int credits); void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int posted); int rds_ib_send_grab_credits(struct rds_ib_connection *ic, u32 wanted, u32 *adv_credits, int need_posted, int max_posted); +int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op); /* ib_stats.c */ DECLARE_PER_CPU(struct rds_ib_statistics, rds_ib_stats); diff --git a/net/rds/ib_rdma.c b/net/rds/ib_rdma.c index 0f3b5a2f3fe0..242231f09464 100644 --- a/net/rds/ib_rdma.c +++ b/net/rds/ib_rdma.c @@ -298,7 +298,9 @@ static struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev) ibmr->fmr = ib_alloc_fmr(rds_ibdev->pd, (IB_ACCESS_LOCAL_WRITE | IB_ACCESS_REMOTE_READ | - IB_ACCESS_REMOTE_WRITE), + IB_ACCESS_REMOTE_WRITE| + IB_ACCESS_REMOTE_ATOMIC), + &pool->fmr_attr); if (IS_ERR(ibmr->fmr)) { err = PTR_ERR(ibmr->fmr); diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c index f0edfdb2866c..b2bd164434ad 100644 --- a/net/rds/ib_send.c +++ b/net/rds/ib_send.c @@ -62,15 +62,17 @@ static void rds_ib_send_rdma_complete(struct rds_message *rm, rds_rdma_send_complete(rm, notify_status); } -static void rds_ib_send_unmap_rdma(struct rds_ib_connection *ic, - struct rds_rdma_op *op) +static void rds_ib_send_atomic_complete(struct rds_message *rm, + int wc_status) { - if (op->r_mapped) { - ib_dma_unmap_sg(ic->i_cm_id->device, - op->r_sg, op->r_nents, - op->r_write ? DMA_TO_DEVICE : DMA_FROM_DEVICE); - op->r_mapped = 0; - } + int notify_status; + + if (wc_status != IB_WC_SUCCESS) + notify_status = RDS_RDMA_OTHER_ERROR; + else + notify_status = RDS_RDMA_SUCCESS; + + rds_atomic_send_complete(rm, notify_status); } static void rds_ib_send_unmap_rm(struct rds_ib_connection *ic, @@ -86,7 +88,14 @@ static void rds_ib_send_unmap_rm(struct rds_ib_connection *ic, DMA_TO_DEVICE); if (rm->rdma.m_rdma_op.r_active) { - rds_ib_send_unmap_rdma(ic, &rm->rdma.m_rdma_op); + struct rds_rdma_op *op = &rm->rdma.m_rdma_op; + + if (op->r_mapped) { + ib_dma_unmap_sg(ic->i_cm_id->device, + op->r_sg, op->r_nents, + op->r_write ? DMA_TO_DEVICE : DMA_FROM_DEVICE); + op->r_mapped = 0; + } /* If the user asked for a completion notification on this * message, we can implement three different semantics: @@ -116,6 +125,24 @@ static void rds_ib_send_unmap_rm(struct rds_ib_connection *ic, rds_stats_add(s_recv_rdma_bytes, rm->rdma.m_rdma_op.r_bytes); } + if (rm->atomic.op_active) { + struct rm_atomic_op *op = &rm->atomic; + + /* unmap atomic recvbuf */ + if (op->op_mapped) { + ib_dma_unmap_sg(ic->i_cm_id->device, op->op_sg, 1, + DMA_FROM_DEVICE); + op->op_mapped = 0; + } + + rds_ib_send_atomic_complete(rm, wc_status); + + if (rm->atomic.op_type == RDS_ATOMIC_TYPE_CSWP) + rds_stats_inc(s_atomic_cswp); + else + rds_stats_inc(s_atomic_fadd); + } + /* If anyone waited for this message to get flushed out, wake * them up now */ rds_message_unmapped(rm); @@ -158,12 +185,9 @@ void rds_ib_send_clear_ring(struct rds_ib_connection *ic) u32 i; for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) { - if (send->s_wr.opcode == 0xdead) + if (!send->s_rm || send->s_wr.opcode == 0xdead) continue; - if (send->s_rm) - rds_ib_send_unmap_rm(ic, send, IB_WC_WR_FLUSH_ERR); - if (send->s_op) - rds_ib_send_unmap_rdma(ic, send->s_op); + rds_ib_send_unmap_rm(ic, send, IB_WC_WR_FLUSH_ERR); } } @@ -218,6 +242,8 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context) break; case IB_WR_RDMA_WRITE: case IB_WR_RDMA_READ: + case IB_WR_ATOMIC_FETCH_AND_ADD: + case IB_WR_ATOMIC_CMP_AND_SWP: /* Nothing to be done - the SG list will be unmapped * when the SEND completes. */ break; @@ -243,8 +269,7 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context) rm = rds_send_get_message(conn, send->s_op); if (rm) { - if (rm->rdma.m_rdma_op.r_active) - rds_ib_send_unmap_rdma(ic, &rm->rdma.m_rdma_op); + rds_ib_send_unmap_rm(ic, send, wc.status); rds_ib_send_rdma_complete(rm, wc.status); rds_message_put(rm); } @@ -736,6 +761,89 @@ out: return ret; } +/* + * Issue atomic operation. + * A simplified version of the rdma case, we always map 1 SG, and + * only 8 bytes, for the return value from the atomic operation. + */ +int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op) +{ + struct rds_ib_connection *ic = conn->c_transport_data; + struct rds_ib_send_work *send = NULL; + struct ib_send_wr *failed_wr; + struct rds_ib_device *rds_ibdev; + u32 pos; + u32 work_alloc; + int ret; + + rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client); + + work_alloc = rds_ib_ring_alloc(&ic->i_send_ring, 1, &pos); + if (work_alloc != 1) { + rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc); + rds_ib_stats_inc(s_ib_tx_ring_full); + ret = -ENOMEM; + goto out; + } + + /* address of send request in ring */ + send = &ic->i_sends[pos]; + send->s_queued = jiffies; + + if (op->op_type == RDS_ATOMIC_TYPE_CSWP) { + send->s_wr.opcode = IB_WR_ATOMIC_CMP_AND_SWP; + send->s_wr.wr.atomic.compare_add = op->op_compare; + send->s_wr.wr.atomic.swap = op->op_swap_add; + } else { /* FADD */ + send->s_wr.opcode = IB_WR_ATOMIC_FETCH_AND_ADD; + send->s_wr.wr.atomic.compare_add = op->op_swap_add; + send->s_wr.wr.atomic.swap = 0; + } + send->s_wr.send_flags = IB_SEND_SIGNALED; + send->s_wr.num_sge = 1; + send->s_wr.next = NULL; + send->s_wr.wr.atomic.remote_addr = op->op_remote_addr; + send->s_wr.wr.atomic.rkey = op->op_rkey; + + /* map 8 byte retval buffer to the device */ + ret = ib_dma_map_sg(ic->i_cm_id->device, op->op_sg, 1, DMA_FROM_DEVICE); + rdsdebug("ic %p mapping atomic op %p. mapped %d pg\n", ic, op, ret); + if (ret != 1) { + rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc); + rds_ib_stats_inc(s_ib_tx_sg_mapping_failure); + ret = -ENOMEM; /* XXX ? */ + goto out; + } + + /* Convert our struct scatterlist to struct ib_sge */ + send->s_sge[0].addr = ib_sg_dma_address(ic->i_cm_id->device, op->op_sg); + send->s_sge[0].length = ib_sg_dma_len(ic->i_cm_id->device, op->op_sg); + send->s_sge[0].lkey = ic->i_mr->lkey; + + rdsdebug("rva %Lx rpa %Lx len %u\n", op->op_remote_addr, + send->s_sge[0].addr, send->s_sge[0].length); + + failed_wr = &send->s_wr; + ret = ib_post_send(ic->i_cm_id->qp, &send->s_wr, &failed_wr); + rdsdebug("ic %p send %p (wr %p) ret %d wr %p\n", ic, + send, &send->s_wr, ret, failed_wr); + BUG_ON(failed_wr != &send->s_wr); + if (ret) { + printk(KERN_WARNING "RDS/IB: atomic ib_post_send to %pI4 " + "returned %d\n", &conn->c_faddr, ret); + rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc); + goto out; + } + + if (unlikely(failed_wr != &send->s_wr)) { + printk(KERN_WARNING "RDS/IB: atomic ib_post_send() rc=%d, but failed_wqe updated!\n", ret); + BUG_ON(failed_wr != &send->s_wr); + } + +out: + return ret; +} + int rds_ib_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op) { struct rds_ib_connection *ic = conn->c_transport_data; diff --git a/net/rds/rdma.c b/net/rds/rdma.c index 4fda33045598..a7019df38c70 100644 --- a/net/rds/rdma.c +++ b/net/rds/rdma.c @@ -719,3 +719,76 @@ int rds_cmsg_rdma_map(struct rds_sock *rs, struct rds_message *rm, return __rds_rdma_map(rs, CMSG_DATA(cmsg), &rm->m_rdma_cookie, &rm->rdma.m_rdma_mr); } + +/* + * Fill in rds_message for an atomic request. + */ +int rds_cmsg_atomic(struct rds_sock *rs, struct rds_message *rm, + struct cmsghdr *cmsg) +{ + struct page *page = NULL; + struct rds_atomic_args *args; + int ret = 0; + + if (cmsg->cmsg_len < CMSG_LEN(sizeof(struct rds_atomic_args)) + || rm->atomic.op_active) + return -EINVAL; + + args = CMSG_DATA(cmsg); + + if (cmsg->cmsg_type == RDS_CMSG_ATOMIC_CSWP) { + rm->atomic.op_type = RDS_ATOMIC_TYPE_CSWP; + rm->atomic.op_swap_add = args->cswp.swap; + rm->atomic.op_compare = args->cswp.compare; + } else { + rm->atomic.op_type = RDS_ATOMIC_TYPE_FADD; + rm->atomic.op_swap_add = args->fadd.add; + } + + rm->m_rdma_cookie = args->cookie; + rm->atomic.op_notify = !!(args->flags & RDS_RDMA_NOTIFY_ME); + rm->atomic.op_recverr = rs->rs_recverr; + rm->atomic.op_sg = rds_message_alloc_sgs(rm, 1); + + /* verify 8 byte-aligned */ + if (args->local_addr & 0x7) { + ret = -EFAULT; + goto err; + } + + ret = rds_pin_pages(args->local_addr, 1, &page, 1); + if (ret != 1) + goto err; + ret = 0; + + sg_set_page(rm->atomic.op_sg, page, 8, offset_in_page(args->local_addr)); + + if (rm->atomic.op_notify || rm->atomic.op_recverr) { + /* We allocate an uninitialized notifier here, because + * we don't want to do that in the completion handler. We + * would have to use GFP_ATOMIC there, and don't want to deal + * with failed allocations. + */ + rm->atomic.op_notifier = kmalloc(sizeof(*rm->atomic.op_notifier), GFP_KERNEL); + if (!rm->atomic.op_notifier) { + ret = -ENOMEM; + goto err; + } + + rm->atomic.op_notifier->n_user_token = args->user_token; + rm->atomic.op_notifier->n_status = RDS_RDMA_SUCCESS; + } + + rm->atomic.op_rkey = rds_rdma_cookie_key(rm->m_rdma_cookie); + rm->atomic.op_remote_addr = args->remote_addr + rds_rdma_cookie_offset(args->cookie); + + rm->atomic.op_active = 1; + + return ret; +err: + if (page) + put_page(page); + kfree(rm->atomic.op_notifier); + + return ret; +} diff --git a/net/rds/rds.h b/net/rds/rds.h index 0bb4957e0cfc..830e2bbb3332 100644 --- a/net/rds/rds.h +++ b/net/rds/rds.h @@ -97,6 +97,7 @@ struct rds_connection { unsigned int c_xmit_hdr_off; unsigned int c_xmit_data_off; unsigned int c_xmit_rdma_sent; + unsigned int c_xmit_atomic_sent; spinlock_t c_lock; /* protect msg queues */ u64 c_next_tx_seq; @@ -260,6 +261,10 @@ static inline u32 rds_rdma_cookie_offset(rds_rdma_cookie_t cookie) return cookie >> 32; } +/* atomic operation types */ +#define RDS_ATOMIC_TYPE_CSWP 0 +#define RDS_ATOMIC_TYPE_FADD 1 + /* * m_sock_item and m_conn_item are on lists that are serialized under * conn->c_lock. m_sock_item has additional meaning in that once it is empty @@ -315,11 +320,27 @@ struct rds_message { struct rds_sock *m_rs; rds_rdma_cookie_t m_rdma_cookie; struct { - struct { + struct rm_atomic_op { + int op_type; + uint64_t op_swap_add; + uint64_t op_compare; + + u32 op_rkey; + u64 op_remote_addr; + unsigned int op_notify:1; + unsigned int op_recverr:1; + unsigned int op_mapped:1; + unsigned int op_active:1; + struct rds_notifier *op_notifier; + struct scatterlist *op_sg; + + struct rds_mr *op_rdma_mr; + } atomic; + struct rm_rdma_op { struct rds_rdma_op m_rdma_op; struct rds_mr *m_rdma_mr; } rdma; - struct { + struct rm_data_op { unsigned int m_nents; unsigned int m_count; struct scatterlist *m_sg; @@ -397,6 +418,7 @@ struct rds_transport { int (*xmit_cong_map)(struct rds_connection *conn, struct rds_cong_map *map, unsigned long offset); int (*xmit_rdma)(struct rds_connection *conn, struct rds_rdma_op *op); + int (*xmit_atomic)(struct rds_connection *conn, struct rm_atomic_op *op); int (*recv)(struct rds_connection *conn); int (*inc_copy_to_user)(struct rds_incoming *inc, struct iovec *iov, size_t size); @@ -546,6 +568,8 @@ struct rds_statistics { uint64_t s_cong_update_received; uint64_t s_cong_send_error; uint64_t s_cong_send_blocked; + uint64_t s_atomic_cswp; + uint64_t s_atomic_fadd; }; /* af_rds.c */ @@ -722,7 +746,10 @@ int rds_cmsg_rdma_args(struct rds_sock *rs, struct rds_message *rm, int rds_cmsg_rdma_map(struct rds_sock *rs, struct rds_message *rm, struct cmsghdr *cmsg); void rds_rdma_free_op(struct rds_rdma_op *ro); -void rds_rdma_send_complete(struct rds_message *rm, int); +void rds_rdma_send_complete(struct rds_message *rm, int wc_status); +void rds_atomic_send_complete(struct rds_message *rm, int wc_status); +int rds_cmsg_atomic(struct rds_sock *rs, struct rds_message *rm, + struct cmsghdr *cmsg); extern void __rds_put_mr_final(struct rds_mr *mr); static inline void rds_mr_put(struct rds_mr *mr) diff --git a/net/rds/send.c b/net/rds/send.c index b751a8e77c41..f3f4e79274bf 100644 --- a/net/rds/send.c +++ b/net/rds/send.c @@ -73,6 +73,7 @@ void rds_send_reset(struct rds_connection *conn) conn->c_xmit_hdr_off = 0; conn->c_xmit_data_off = 0; conn->c_xmit_rdma_sent = 0; + conn->c_xmit_atomic_sent = 0; conn->c_map_queued = 0; @@ -171,6 +172,7 @@ int rds_send_xmit(struct rds_connection *conn) conn->c_xmit_hdr_off = 0; conn->c_xmit_data_off = 0; conn->c_xmit_rdma_sent = 0; + conn->c_xmit_atomic_sent = 0; /* Release the reference to the previous message. */ rds_message_put(rm); @@ -262,6 +264,17 @@ int rds_send_xmit(struct rds_connection *conn) conn->c_xmit_rm = rm; } + + if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) { + ret = conn->c_trans->xmit_atomic(conn, &rm->atomic); + if (ret) + break; + conn->c_xmit_atomic_sent = 1; + /* The transport owns the mapped memory for now. + * You can't unmap it while it's on the send queue */ + set_bit(RDS_MSG_MAPPED, &rm->m_flags); + } + /* * Try and send an rdma message. Let's see if we can * keep this simple and require that the transport either @@ -443,6 +456,41 @@ void rds_rdma_send_complete(struct rds_message *rm, int status) EXPORT_SYMBOL_GPL(rds_rdma_send_complete); /* + * Just like above, except looks at atomic op + */ +void rds_atomic_send_complete(struct rds_message *rm, int status) +{ + struct rds_sock *rs = NULL; + struct rm_atomic_op *ao; + struct rds_notifier *notifier; + + spin_lock(&rm->m_rs_lock); + + ao = &rm->atomic; + if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags) + && ao->op_active && ao->op_notify && ao->op_notifier) { + notifier = ao->op_notifier; + rs = rm->m_rs; + sock_hold(rds_rs_to_sk(rs)); + + notifier->n_status = status; + spin_lock(&rs->rs_lock); + list_add_tail(¬ifier->n_list, &rs->rs_notify_queue); + spin_unlock(&rs->rs_lock); + + ao->op_notifier = NULL; + } + + spin_unlock(&rm->m_rs_lock); + + if (rs) { + rds_wake_sk_sleep(rs); + sock_put(rds_rs_to_sk(rs)); + } +} +EXPORT_SYMBOL_GPL(rds_atomic_send_complete); + +/* * This is the same as rds_rdma_send_complete except we * don't do any locking - we have all the ingredients (message, * socket, socket lock) and can just move the notifier. @@ -788,6 +836,11 @@ static int rds_rm_size(struct msghdr *msg, int data_len) /* these are valid but do no add any size */ break; + case RDS_CMSG_ATOMIC_CSWP: + case RDS_CMSG_ATOMIC_FADD: + size += sizeof(struct scatterlist); + break; + default: return -EINVAL; } @@ -813,7 +866,7 @@ static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm, continue; /* As a side effect, RDMA_DEST and RDMA_MAP will set - * rm->m_rdma_cookie and rm->m_rdma_mr. + * rm->rdma.m_rdma_cookie and rm->rdma.m_rdma_mr. */ switch (cmsg->cmsg_type) { case RDS_CMSG_RDMA_ARGS: @@ -829,6 +882,10 @@ static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm, if (!ret) *allocated_mr = 1; break; + case RDS_CMSG_ATOMIC_CSWP: + case RDS_CMSG_ATOMIC_FADD: + ret = rds_cmsg_atomic(rs, rm, cmsg); + break; default: return -EINVAL; @@ -926,10 +983,18 @@ int rds_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg, goto out; if ((rm->m_rdma_cookie || rm->rdma.m_rdma_op.r_active) && - !conn->c_trans->xmit_rdma) { + !conn->c_trans->xmit_rdma) { if (printk_ratelimit()) printk(KERN_NOTICE "rdma_op %p conn xmit_rdma %p\n", - &rm->rdma.m_rdma_op, conn->c_trans->xmit_rdma); + &rm->rdma.m_rdma_op, conn->c_trans->xmit_rdma); + ret = -EOPNOTSUPP; + goto out; + } + + if (rm->atomic.op_active && !conn->c_trans->xmit_atomic) { + if (printk_ratelimit()) + printk(KERN_NOTICE "atomic_op %p conn xmit_atomic %p\n", + &rm->atomic, conn->c_trans->xmit_atomic); ret = -EOPNOTSUPP; goto out; } diff --git a/net/rds/stats.c b/net/rds/stats.c index 7598eb07cfb1..c66d95d9c262 100644 --- a/net/rds/stats.c +++ b/net/rds/stats.c @@ -75,6 +75,8 @@ static const char *const rds_stat_names[] = { "cong_update_received", "cong_send_error", "cong_send_blocked", + "s_atomic_cswp", + "s_atomic_fadd", }; void rds_stats_info_copy(struct rds_info_iterator *iter, |