author		Jakub Kicinski <kuba@kernel.org>	2026-02-04 20:46:40 -0800
committer	Jakub Kicinski <kuba@kernel.org>	2026-02-04 20:46:40 -0800
commit		677bd4bf5c685d98d9a8ab74cc02bd17c996a4ef (patch)
tree		a087b1c14c56179d7141d332e56d54f83560bbbb
parent		acd21dd2da198875e0cf73f08b18c9bbf8b87d8c (diff)
parent		9d27a0fb122f19b6d01d02f4b4f429ca28811ace (diff)
Merge branch 'net-rds-rds-tcp-protocol-and-extension-improvements'
Allison Henderson says:

====================
net/rds: RDS-TCP protocol and extension improvements

This is subset 3 of the larger RDS-TCP patch series I posted last Oct.
The greater series aims to correct multiple rds-tcp issues that can cause
dropped or out-of-sequence messages. I've broken it down into smaller
sets to make reviews more manageable.

In this set, we introduce extension headers for byte accounting and fix
several RDS/TCP protocol issues, including message preservation during
connection transitions and multipath lane handling.

The entire set can be viewed in the rfc here:
https://lore.kernel.org/netdev/20251022191715.157755-1-achender@kernel.org/
====================

Link: https://patch.msgid.link/20260203055723.1085751-1-achender@kernel.org
Signed-off-by: Jakub Kicinski <kuba@kernel.org>
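For context, the extension-header layout that the message.c changes below rely on can be sketched in plain userspace C. The type values and sizes mirror the patch (RDS_EXTHDR_NPATHS is 2 bytes, RDS_EXTHDR_GEN_NUM 4, the new RDS_EXTHDR_SPORT_IDX 1, RDS_EXTHDR_RDMA_BYTES 8); the ext_add() helper and the printf() harness are illustrative only, not kernel code:

#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Values below mirror the patch; the walker itself is a userspace
 * illustration, not the kernel implementation.
 */
#define RDS_HEADER_EXT_SPACE	16
#define RDS_EXTHDR_NONE		0
#define RDS_EXTHDR_RDMA_BYTES	4
#define RDS_EXTHDR_NPATHS	5
#define RDS_EXTHDR_GEN_NUM	6
#define RDS_EXTHDR_SPORT_IDX	8

static const unsigned int exthdr_size[] = {
	[RDS_EXTHDR_RDMA_BYTES]	= 8,	/* __be32 bytes + flags + 3 pad */
	[RDS_EXTHDR_NPATHS]	= 2,	/* __be16 */
	[RDS_EXTHDR_GEN_NUM]	= 4,	/* __be32 */
	[RDS_EXTHDR_SPORT_IDX]	= 1,	/* presence-only marker */
};

/* Append one "type:1 data:len" element, mirroring the walk that the new
 * rds_find_next_ext_space() performs: skip over known headers until we
 * reach RDS_EXTHDR_NONE or run out of the 16-byte area.
 */
static int ext_add(uint8_t *ext, unsigned int type, const void *data,
		   unsigned int len)
{
	unsigned int ind = 0;

	while (ind + 1 + len <= RDS_HEADER_EXT_SPACE) {
		if (ext[ind] == RDS_EXTHDR_NONE) {
			ext[ind] = type;
			memcpy(ext + ind + 1, data, len);
			return 1;
		}
		if (ext[ind] >= sizeof(exthdr_size) / sizeof(exthdr_size[0]) ||
		    !exthdr_size[ext[ind]])
			return 0;	/* unknown header type, give up */
		ind += 1 + exthdr_size[ext[ind]];
	}
	return 0;	/* no room: caller logs and drops the extension */
}

int main(void)
{
	uint8_t ext[RDS_HEADER_EXT_SPACE];
	uint16_t npaths = 8;	/* cpu_to_be16() in the kernel */
	uint32_t gen_num = 42;	/* cpu_to_be32() in the kernel */
	uint8_t dummy = 0;

	/* pre-filled with RDS_EXTHDR_NONE, as rds_message_populate_header()
	 * now does for the whole extension area
	 */
	memset(ext, RDS_EXTHDR_NONE, sizeof(ext));

	printf("npaths added:    %d\n", ext_add(ext, RDS_EXTHDR_NPATHS, &npaths, 2));
	printf("gen_num added:   %d\n", ext_add(ext, RDS_EXTHDR_GEN_NUM, &gen_num, 4));
	printf("sport_idx added: %d\n", ext_add(ext, RDS_EXTHDR_SPORT_IDX, &dummy, 1));
	return 0;
}

Because the extension area is now pre-filled with RDS_EXTHDR_NONE, several extensions can be stacked back to back, which is what lets a single probe in rds_send_probe() carry NPATHS, GEN_NUM and the new SPORT_IDX marker at once.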
-rw-r--r--	net/rds/connection.c	  7
-rw-r--r--	net/rds/ib_send.c	 40
-rw-r--r--	net/rds/message.c	 66
-rw-r--r--	net/rds/rds.h		105
-rw-r--r--	net/rds/recv.c		 37
-rw-r--r--	net/rds/send.c		130
-rw-r--r--	net/rds/stats.c		  1
-rw-r--r--	net/rds/tcp.c		  1
-rw-r--r--	net/rds/tcp.h		  7
-rw-r--r--	net/rds/tcp_connect.c	 79
-rw-r--r--	net/rds/tcp_listen.c	 92
-rw-r--r--	net/rds/tcp_recv.c	  4
-rw-r--r--	net/rds/tcp_send.c	  2
13 files changed, 450 insertions, 121 deletions
diff --git a/net/rds/connection.c b/net/rds/connection.c
index 3f26a67f3180..185f73b01694 100644
--- a/net/rds/connection.c
+++ b/net/rds/connection.c
@@ -442,16 +442,21 @@ void rds_conn_shutdown(struct rds_conn_path *cp)
* to the conn hash, so we never trigger a reconnect on this
* conn - the reconnect is always triggered by the active peer. */
cancel_delayed_work_sync(&cp->cp_conn_w);
+
+ clear_bit(RDS_RECONNECT_PENDING, &cp->cp_flags);
rcu_read_lock();
if (!hlist_unhashed(&conn->c_hash_node)) {
rcu_read_unlock();
+ if (conn->c_trans->t_mp_capable &&
+ cp->cp_index == 0)
+ rds_send_ping(conn, 0);
rds_queue_reconnect(cp);
} else {
rcu_read_unlock();
}
if (conn->c_trans->conn_slots_available)
- conn->c_trans->conn_slots_available(conn);
+ conn->c_trans->conn_slots_available(conn, false);
}
/* destroy a single rds_conn_path. rds_conn_destroy() iterates over
diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c
index f9d28ddd168d..fcd04c29f543 100644
--- a/net/rds/ib_send.c
+++ b/net/rds/ib_send.c
@@ -577,16 +577,42 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
/* If it has a RDMA op, tell the peer we did it. This is
* used by the peer to release use-once RDMA MRs. */
if (rm->rdma.op_active) {
- struct rds_ext_header_rdma ext_hdr;
+ struct rds_ext_header_rdma ext_hdr = {};
+ struct rds_ext_header_rdma_bytes
+ rdma_bytes_ext_hdr = {};
ext_hdr.h_rdma_rkey = cpu_to_be32(rm->rdma.op_rkey);
- rds_message_add_extension(&rm->m_inc.i_hdr,
- RDS_EXTHDR_RDMA, &ext_hdr, sizeof(ext_hdr));
+ if (rds_message_add_extension(&rm->m_inc.i_hdr,
+ RDS_EXTHDR_RDMA,
+ &ext_hdr)) {
+ /* prepare the rdma bytes ext header */
+ rdma_bytes_ext_hdr.h_rflags =
+ rm->rdma.op_write ?
+ RDS_FLAG_RDMA_WR_BYTES :
+ RDS_FLAG_RDMA_RD_BYTES;
+ rdma_bytes_ext_hdr.h_rdma_bytes =
+ cpu_to_be32(rm->rdma.op_bytes);
+ } else {
+ rdsdebug("RDS_EXTHDR_RDMA dropped");
+ }
+
+ if (rds_message_add_extension(&rm->m_inc.i_hdr,
+ RDS_EXTHDR_RDMA_BYTES,
+ &rdma_bytes_ext_hdr)) {
+ /* rdma bytes ext header was added successfully,
+ * notify the remote side via flag in header
+ */
+ rm->m_inc.i_hdr.h_flags |=
+ RDS_FLAG_EXTHDR_EXTENSION;
+ } else {
+ rdsdebug("RDS_EXTHDR_RDMA_BYTES dropped");
+ }
}
- if (rm->m_rdma_cookie) {
- rds_message_add_rdma_dest_extension(&rm->m_inc.i_hdr,
- rds_rdma_cookie_key(rm->m_rdma_cookie),
- rds_rdma_cookie_offset(rm->m_rdma_cookie));
+ if (rm->m_rdma_cookie &&
+ !rds_message_add_rdma_dest_extension(&rm->m_inc.i_hdr,
+ rds_rdma_cookie_key(rm->m_rdma_cookie),
+ rds_rdma_cookie_offset(rm->m_rdma_cookie))) {
+ rdsdebug("RDS_EXTHDR_RDMA_DEST dropped\n");
}
/* Note - rds_ib_piggyb_ack clears the ACK_REQUIRED bit, so
diff --git a/net/rds/message.c b/net/rds/message.c
index 199a899a43e9..54fd000806ea 100644
--- a/net/rds/message.c
+++ b/net/rds/message.c
@@ -44,8 +44,10 @@ static unsigned int rds_exthdr_size[__RDS_EXTHDR_MAX] = {
[RDS_EXTHDR_VERSION] = sizeof(struct rds_ext_header_version),
[RDS_EXTHDR_RDMA] = sizeof(struct rds_ext_header_rdma),
[RDS_EXTHDR_RDMA_DEST] = sizeof(struct rds_ext_header_rdma_dest),
+[RDS_EXTHDR_RDMA_BYTES] = sizeof(struct rds_ext_header_rdma_bytes),
[RDS_EXTHDR_NPATHS] = sizeof(__be16),
[RDS_EXTHDR_GEN_NUM] = sizeof(__be32),
+[RDS_EXTHDR_SPORT_IDX] = 1,
};
void rds_message_addref(struct rds_message *rm)
@@ -191,31 +193,69 @@ void rds_message_populate_header(struct rds_header *hdr, __be16 sport,
hdr->h_sport = sport;
hdr->h_dport = dport;
hdr->h_sequence = cpu_to_be64(seq);
- hdr->h_exthdr[0] = RDS_EXTHDR_NONE;
+ /* see rds_find_next_ext_space for reason why we memset the
+ * ext header
+ */
+ memset(hdr->h_exthdr, RDS_EXTHDR_NONE, RDS_HEADER_EXT_SPACE);
}
EXPORT_SYMBOL_GPL(rds_message_populate_header);
-int rds_message_add_extension(struct rds_header *hdr, unsigned int type,
- const void *data, unsigned int len)
+/*
+ * Find the next place we can add an RDS header extension with
+ * specific length. Extension headers are pushed one after the
+ * other. In the following, the number after the colon is the number
+ * of bytes:
+ *
+ * [ type1:1 dta1:len1 [ type2:1 dta2:len2 ] ... ] RDS_EXTHDR_NONE
+ *
+ * If the extension headers fill the complete extension header space
+ * (16 bytes), the trailing RDS_EXTHDR_NONE is omitted.
+ */
+static int rds_find_next_ext_space(struct rds_header *hdr, unsigned int len,
+ u8 **ext_start)
{
- unsigned int ext_len = sizeof(u8) + len;
- unsigned char *dst;
+ unsigned int ext_len;
+ unsigned int type;
+ int ind = 0;
+
+ while ((ind + 1 + len) <= RDS_HEADER_EXT_SPACE) {
+ if (hdr->h_exthdr[ind] == RDS_EXTHDR_NONE) {
+ *ext_start = hdr->h_exthdr + ind;
+ return 0;
+ }
- /* For now, refuse to add more than one extension header */
- if (hdr->h_exthdr[0] != RDS_EXTHDR_NONE)
- return 0;
+ type = hdr->h_exthdr[ind];
+
+ ext_len = (type < __RDS_EXTHDR_MAX) ? rds_exthdr_size[type] : 0;
+ WARN_ONCE(!ext_len, "Unknown ext hdr type %d\n", type);
+ if (!ext_len)
+ return -EINVAL;
+
+ /* ind points to a valid ext hdr with known length */
+ ind += 1 + ext_len;
+ }
+
+ /* no room for extension */
+ return -ENOSPC;
+}
+
+/* The ext hdr space is prefilled with zero from the kzalloc() */
+int rds_message_add_extension(struct rds_header *hdr,
+ unsigned int type, const void *data)
+{
+ unsigned char *dst;
+ unsigned int len;
- if (type >= __RDS_EXTHDR_MAX || len != rds_exthdr_size[type])
+ len = (type < __RDS_EXTHDR_MAX) ? rds_exthdr_size[type] : 0;
+ if (!len)
return 0;
- if (ext_len >= RDS_HEADER_EXT_SPACE)
+ if (rds_find_next_ext_space(hdr, len, &dst))
return 0;
- dst = hdr->h_exthdr;
*dst++ = type;
memcpy(dst, data, len);
- dst[len] = RDS_EXTHDR_NONE;
return 1;
}
EXPORT_SYMBOL_GPL(rds_message_add_extension);
@@ -272,7 +312,7 @@ int rds_message_add_rdma_dest_extension(struct rds_header *hdr, u32 r_key, u32 o
ext_hdr.h_rdma_rkey = cpu_to_be32(r_key);
ext_hdr.h_rdma_offset = cpu_to_be32(offset);
- return rds_message_add_extension(hdr, RDS_EXTHDR_RDMA_DEST, &ext_hdr, sizeof(ext_hdr));
+ return rds_message_add_extension(hdr, RDS_EXTHDR_RDMA_DEST, &ext_hdr);
}
EXPORT_SYMBOL_GPL(rds_message_add_rdma_dest_extension);
diff --git a/net/rds/rds.h b/net/rds/rds.h
index 8a549fe687ac..6e0790e4b570 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -147,6 +147,7 @@ struct rds_connection {
c_ping_triggered:1,
c_pad_to_32:29;
int c_npaths;
+ bool c_with_sport_idx;
struct rds_connection *c_passive;
struct rds_transport *c_trans;
@@ -169,6 +170,8 @@ struct rds_connection {
u32 c_my_gen_num;
u32 c_peer_gen_num;
+
+ u64 c_cp0_mprds_catchup_tx_seq;
};
static inline
@@ -183,10 +186,11 @@ void rds_conn_net_set(struct rds_connection *conn, struct net *net)
write_pnet(&conn->c_net, net);
}
-#define RDS_FLAG_CONG_BITMAP 0x01
-#define RDS_FLAG_ACK_REQUIRED 0x02
-#define RDS_FLAG_RETRANSMITTED 0x04
-#define RDS_MAX_ADV_CREDIT 255
+#define RDS_FLAG_CONG_BITMAP 0x01
+#define RDS_FLAG_ACK_REQUIRED 0x02
+#define RDS_FLAG_RETRANSMITTED 0x04
+#define RDS_FLAG_EXTHDR_EXTENSION 0x20
+#define RDS_MAX_ADV_CREDIT 255
/* RDS_FLAG_PROBE_PORT is the reserved sport used for sending a ping
* probe to exchange control information before establishing a connection.
@@ -258,13 +262,29 @@ struct rds_ext_header_rdma_dest {
__be32 h_rdma_offset;
};
+/*
+ * This extension header tells the peer about delivered RDMA byte count.
+ */
+#define RDS_EXTHDR_RDMA_BYTES 4
+
+struct rds_ext_header_rdma_bytes {
+ __be32 h_rdma_bytes; /* byte count */
+ u8 h_rflags; /* direction of RDMA, write or read */
+ u8 h_pad[3];
+};
+
+#define RDS_FLAG_RDMA_WR_BYTES 0x01
+#define RDS_FLAG_RDMA_RD_BYTES 0x02
+
/* Extension header announcing number of paths.
* Implicit length = 2 bytes.
*/
#define RDS_EXTHDR_NPATHS 5
#define RDS_EXTHDR_GEN_NUM 6
+#define RDS_EXTHDR_SPORT_IDX 8
#define __RDS_EXTHDR_MAX 16 /* for now */
+
#define RDS_RX_MAX_TRACES (RDS_MSG_RX_DGRAM_TRACE_MAX + 1)
#define RDS_MSG_RX_HDR 0
#define RDS_MSG_RX_START 1
@@ -529,7 +549,7 @@ struct rds_transport {
* messages received on the new socket are not discarded when no
* connection path was available at the time.
*/
- void (*conn_slots_available)(struct rds_connection *conn);
+ void (*conn_slots_available)(struct rds_connection *conn, bool fan_out);
int (*conn_path_connect)(struct rds_conn_path *cp);
/*
@@ -695,42 +715,43 @@ static inline int rds_sk_rcvbuf(struct rds_sock *rs)
}
struct rds_statistics {
- uint64_t s_conn_reset;
- uint64_t s_recv_drop_bad_checksum;
- uint64_t s_recv_drop_old_seq;
- uint64_t s_recv_drop_no_sock;
- uint64_t s_recv_drop_dead_sock;
- uint64_t s_recv_deliver_raced;
- uint64_t s_recv_delivered;
- uint64_t s_recv_queued;
- uint64_t s_recv_immediate_retry;
- uint64_t s_recv_delayed_retry;
- uint64_t s_recv_ack_required;
- uint64_t s_recv_rdma_bytes;
- uint64_t s_recv_ping;
- uint64_t s_send_queue_empty;
- uint64_t s_send_queue_full;
- uint64_t s_send_lock_contention;
- uint64_t s_send_lock_queue_raced;
- uint64_t s_send_immediate_retry;
- uint64_t s_send_delayed_retry;
- uint64_t s_send_drop_acked;
- uint64_t s_send_ack_required;
- uint64_t s_send_queued;
- uint64_t s_send_rdma;
- uint64_t s_send_rdma_bytes;
- uint64_t s_send_pong;
- uint64_t s_page_remainder_hit;
- uint64_t s_page_remainder_miss;
- uint64_t s_copy_to_user;
- uint64_t s_copy_from_user;
- uint64_t s_cong_update_queued;
- uint64_t s_cong_update_received;
- uint64_t s_cong_send_error;
- uint64_t s_cong_send_blocked;
- uint64_t s_recv_bytes_added_to_socket;
- uint64_t s_recv_bytes_removed_from_socket;
- uint64_t s_send_stuck_rm;
+ u64 s_conn_reset;
+ u64 s_recv_drop_bad_checksum;
+ u64 s_recv_drop_old_seq;
+ u64 s_recv_drop_no_sock;
+ u64 s_recv_drop_dead_sock;
+ u64 s_recv_deliver_raced;
+ u64 s_recv_delivered;
+ u64 s_recv_queued;
+ u64 s_recv_immediate_retry;
+ u64 s_recv_delayed_retry;
+ u64 s_recv_ack_required;
+ u64 s_recv_rdma_bytes;
+ u64 s_recv_ping;
+ u64 s_send_queue_empty;
+ u64 s_send_queue_full;
+ u64 s_send_lock_contention;
+ u64 s_send_lock_queue_raced;
+ u64 s_send_immediate_retry;
+ u64 s_send_delayed_retry;
+ u64 s_send_drop_acked;
+ u64 s_send_ack_required;
+ u64 s_send_queued;
+ u64 s_send_rdma;
+ u64 s_send_rdma_bytes;
+ u64 s_send_pong;
+ u64 s_page_remainder_hit;
+ u64 s_page_remainder_miss;
+ u64 s_copy_to_user;
+ u64 s_copy_from_user;
+ u64 s_cong_update_queued;
+ u64 s_cong_update_received;
+ u64 s_cong_send_error;
+ u64 s_cong_send_blocked;
+ u64 s_recv_bytes_added_to_socket;
+ u64 s_recv_bytes_removed_from_socket;
+ u64 s_send_stuck_rm;
+ u64 s_mprds_catchup_tx0_retries;
};
/* af_rds.c */
@@ -871,7 +892,7 @@ struct rds_message *rds_message_map_pages(unsigned long *page_addrs, unsigned in
void rds_message_populate_header(struct rds_header *hdr, __be16 sport,
__be16 dport, u64 seq);
int rds_message_add_extension(struct rds_header *hdr,
- unsigned int type, const void *data, unsigned int len);
+ unsigned int type, const void *data);
int rds_message_next_extension(struct rds_header *hdr,
unsigned int *pos, void *buf, unsigned int *buflen);
int rds_message_add_rdma_dest_extension(struct rds_header *hdr, u32 r_key, u32 offset);
diff --git a/net/rds/recv.c b/net/rds/recv.c
index 66680f652e74..4b3f9e4a8bfd 100644
--- a/net/rds/recv.c
+++ b/net/rds/recv.c
@@ -204,8 +204,14 @@ static void rds_recv_hs_exthdrs(struct rds_header *hdr,
struct rds_ext_header_version version;
__be16 rds_npaths;
__be32 rds_gen_num;
+ u8 dummy;
} buffer;
+ bool new_with_sport_idx = false;
u32 new_peer_gen_num = 0;
+ int new_npaths;
+ bool fan_out;
+
+ new_npaths = conn->c_npaths;
while (1) {
len = sizeof(buffer);
@@ -215,25 +221,48 @@ static void rds_recv_hs_exthdrs(struct rds_header *hdr,
/* Process extension header here */
switch (type) {
case RDS_EXTHDR_NPATHS:
- conn->c_npaths = min_t(int, RDS_MPATH_WORKERS,
- be16_to_cpu(buffer.rds_npaths));
+ new_npaths = min_t(int, RDS_MPATH_WORKERS,
+ be16_to_cpu(buffer.rds_npaths));
break;
case RDS_EXTHDR_GEN_NUM:
new_peer_gen_num = be32_to_cpu(buffer.rds_gen_num);
break;
+ case RDS_EXTHDR_SPORT_IDX:
+ new_with_sport_idx = true;
+ break;
default:
pr_warn_ratelimited("ignoring unknown exthdr type "
"0x%x\n", type);
}
}
+
+ conn->c_with_sport_idx = new_with_sport_idx;
+
+ if (new_npaths > 1 && new_npaths != conn->c_npaths) {
+ /* We're about to fan-out.
+ * Make sure that messages from cp_index#0
+ * are sent prior to handling other lanes.
+ */
+ struct rds_conn_path *cp0 = conn->c_path;
+ unsigned long flags;
+
+ spin_lock_irqsave(&cp0->cp_lock, flags);
+ conn->c_cp0_mprds_catchup_tx_seq = cp0->cp_next_tx_seq;
+ spin_unlock_irqrestore(&cp0->cp_lock, flags);
+ fan_out = true;
+ } else {
+ fan_out = false;
+ }
+
/* if RDS_EXTHDR_NPATHS was not found, default to a single-path */
- conn->c_npaths = max_t(int, conn->c_npaths, 1);
+ conn->c_npaths = max_t(int, new_npaths, 1);
+
conn->c_ping_triggered = 0;
rds_conn_peer_gen_update(conn, new_peer_gen_num);
if (conn->c_npaths > 1 &&
conn->c_trans->conn_slots_available)
- conn->c_trans->conn_slots_available(conn);
+ conn->c_trans->conn_slots_available(conn, fan_out);
}
/* rds_start_mprds() will synchronously start multiple paths when appropriate.
diff --git a/net/rds/send.c b/net/rds/send.c
index 3e3d028bc21e..6e96f108473e 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -120,6 +120,57 @@ static void release_in_xmit(struct rds_conn_path *cp)
}
/*
+ * Helper function for multipath fanout to ensure lane 0 transmits queued
+ * messages before other lanes to prevent out-of-order delivery.
+ *
+ * Returns true if lane 0 still has messages or false otherwise
+ */
+static bool rds_mprds_cp0_catchup(struct rds_connection *conn)
+{
+ struct rds_conn_path *cp0 = conn->c_path;
+ struct rds_message *rm0;
+ unsigned long flags;
+ bool ret = false;
+
+ spin_lock_irqsave(&cp0->cp_lock, flags);
+
+ /* the oldest / first message in the retransmit queue
+ * has to be at or beyond c_cp0_mprds_catchup_tx_seq
+ */
+ if (!list_empty(&cp0->cp_retrans)) {
+ rm0 = list_entry(cp0->cp_retrans.next, struct rds_message,
+ m_conn_item);
+ if (be64_to_cpu(rm0->m_inc.i_hdr.h_sequence) <
+ conn->c_cp0_mprds_catchup_tx_seq) {
+ /* the retransmit queue of cp_index#0 has not
+ * quite caught up yet
+ */
+ ret = true;
+ goto unlock;
+ }
+ }
+
+ /* the oldest / first message of the send queue
+ * has to be at or beyond c_cp0_mprds_catchup_tx_seq
+ */
+ rm0 = cp0->cp_xmit_rm;
+ if (!rm0 && !list_empty(&cp0->cp_send_queue))
+ rm0 = list_entry(cp0->cp_send_queue.next, struct rds_message,
+ m_conn_item);
+ if (rm0 && be64_to_cpu(rm0->m_inc.i_hdr.h_sequence) <
+ conn->c_cp0_mprds_catchup_tx_seq) {
+ /* the send queue of cp_index#0 has not quite
+ * caught up yet
+ */
+ ret = true;
+ }
+
+unlock:
+ spin_unlock_irqrestore(&cp0->cp_lock, flags);
+ return ret;
+}
+
+/*
* We're making the conscious trade-off here to only send one message
* down the connection at a time.
* Pro:
@@ -248,6 +299,14 @@ restart:
if (batch_count >= send_batch_count)
goto over_batch;
+ /* make sure cp_index#0 caught up during fan-out in
+ * order to avoid lane races
+ */
+ if (cp->cp_index > 0 && rds_mprds_cp0_catchup(conn)) {
+ rds_stats_inc(s_mprds_catchup_tx0_retries);
+ goto over_batch;
+ }
+
spin_lock_irqsave(&cp->cp_lock, flags);
if (!list_empty(&cp->cp_send_queue)) {
@@ -1042,39 +1101,6 @@ static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm,
return ret;
}
-static int rds_send_mprds_hash(struct rds_sock *rs,
- struct rds_connection *conn, int nonblock)
-{
- int hash;
-
- if (conn->c_npaths == 0)
- hash = RDS_MPATH_HASH(rs, RDS_MPATH_WORKERS);
- else
- hash = RDS_MPATH_HASH(rs, conn->c_npaths);
- if (conn->c_npaths == 0 && hash != 0) {
- rds_send_ping(conn, 0);
-
- /* The underlying connection is not up yet. Need to wait
- * until it is up to be sure that the non-zero c_path can be
- * used. But if we are interrupted, we have to use the zero
- * c_path in case the connection ends up being non-MP capable.
- */
- if (conn->c_npaths == 0) {
- /* Cannot wait for the connection be made, so just use
- * the base c_path.
- */
- if (nonblock)
- return 0;
- if (wait_event_interruptible(conn->c_hs_waitq,
- conn->c_npaths != 0))
- hash = 0;
- }
- if (conn->c_npaths == 1)
- hash = 0;
- }
- return hash;
-}
-
static int rds_rdma_bytes(struct msghdr *msg, size_t *rdma_bytes)
{
struct rds_rdma_args *args;
@@ -1304,10 +1330,32 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
rs->rs_conn = conn;
}
- if (conn->c_trans->t_mp_capable)
- cpath = &conn->c_path[rds_send_mprds_hash(rs, conn, nonblock)];
- else
+ if (conn->c_trans->t_mp_capable) {
+ /* Use c_path[0] until we learn that
+ * the peer supports more (c_npaths > 1)
+ */
+ cpath = &conn->c_path[RDS_MPATH_HASH(rs, conn->c_npaths ? : 1)];
+ } else {
cpath = &conn->c_path[0];
+ }
+
+ /* If we're multipath capable and path 0 is down, queue reconnect
+ * and send a ping. This initiates the multipath handshake through
+ * rds_send_probe(), which sends RDS_EXTHDR_NPATHS to the peer,
+ * starting multipath capability negotiation.
+ */
+ if (conn->c_trans->t_mp_capable &&
+ !rds_conn_path_up(&conn->c_path[0])) {
+ /* Ensures that only one request is queued. And
+ * rds_send_ping() ensures that only one ping is
+ * outstanding.
+ */
+ if (!test_and_set_bit(RDS_RECONNECT_PENDING,
+ &conn->c_path[0].cp_flags))
+ queue_delayed_work(conn->c_path[0].cp_wq,
+ &conn->c_path[0].cp_conn_w, 0);
+ rds_send_ping(conn, 0);
+ }
rm->m_conn_path = cpath;
@@ -1457,14 +1505,16 @@ rds_send_probe(struct rds_conn_path *cp, __be16 sport,
cp->cp_conn->c_trans->t_mp_capable) {
__be16 npaths = cpu_to_be16(RDS_MPATH_WORKERS);
__be32 my_gen_num = cpu_to_be32(cp->cp_conn->c_my_gen_num);
+ u8 dummy = 0;
rds_message_add_extension(&rm->m_inc.i_hdr,
- RDS_EXTHDR_NPATHS, &npaths,
- sizeof(npaths));
+ RDS_EXTHDR_NPATHS, &npaths);
rds_message_add_extension(&rm->m_inc.i_hdr,
RDS_EXTHDR_GEN_NUM,
- &my_gen_num,
- sizeof(u32));
+ &my_gen_num);
+ rds_message_add_extension(&rm->m_inc.i_hdr,
+ RDS_EXTHDR_SPORT_IDX,
+ &dummy);
}
spin_unlock_irqrestore(&cp->cp_lock, flags);
diff --git a/net/rds/stats.c b/net/rds/stats.c
index cb2e3d2cdf73..24ee22d09e8c 100644
--- a/net/rds/stats.c
+++ b/net/rds/stats.c
@@ -79,6 +79,7 @@ static const char *const rds_stat_names[] = {
"recv_bytes_added_to_sock",
"recv_bytes_freed_fromsock",
"send_stuck_rm",
+ "mprds_catchup_tx0_retries",
};
void rds_stats_info_copy(struct rds_info_iterator *iter,
diff --git a/net/rds/tcp.c b/net/rds/tcp.c
index 31e7425e2da9..45484a93d75f 100644
--- a/net/rds/tcp.c
+++ b/net/rds/tcp.c
@@ -384,6 +384,7 @@ static int rds_tcp_conn_alloc(struct rds_connection *conn, gfp_t gfp)
tc->t_tinc = NULL;
tc->t_tinc_hdr_rem = sizeof(struct rds_header);
tc->t_tinc_data_rem = 0;
+ init_waitqueue_head(&tc->t_recv_done_waitq);
conn->c_path[i].cp_transport_data = tc;
tc->t_cpath = &conn->c_path[i];
diff --git a/net/rds/tcp.h b/net/rds/tcp.h
index 7d07128593b7..39c86347188c 100644
--- a/net/rds/tcp.h
+++ b/net/rds/tcp.h
@@ -34,6 +34,7 @@ struct rds_tcp_connection {
*/
struct mutex t_conn_path_lock;
struct socket *t_sock;
+ u32 t_client_port_group;
struct rds_tcp_net *t_rtn;
void *t_orig_write_space;
void *t_orig_data_ready;
@@ -54,6 +55,9 @@ struct rds_tcp_connection {
u32 t_last_sent_nxt;
u32 t_last_expected_una;
u32 t_last_seen_una;
+
+ /* for rds_tcp_conn_path_shutdown */
+ wait_queue_head_t t_recv_done_waitq;
};
struct rds_tcp_statistics {
@@ -86,7 +90,7 @@ void rds_tcp_state_change(struct sock *sk);
struct socket *rds_tcp_listen_init(struct net *net, bool isv6);
void rds_tcp_listen_stop(struct socket *sock, struct work_struct *acceptor);
void rds_tcp_listen_data_ready(struct sock *sk);
-void rds_tcp_conn_slots_available(struct rds_connection *conn);
+void rds_tcp_conn_slots_available(struct rds_connection *conn, bool fan_out);
int rds_tcp_accept_one(struct rds_tcp_net *rtn);
void rds_tcp_keepalive(struct socket *sock);
void *rds_tcp_listen_sock_def_readable(struct net *net);
@@ -104,6 +108,7 @@ void rds_tcp_xmit_path_prepare(struct rds_conn_path *cp);
void rds_tcp_xmit_path_complete(struct rds_conn_path *cp);
int rds_tcp_xmit(struct rds_connection *conn, struct rds_message *rm,
unsigned int hdr_off, unsigned int sg, unsigned int off);
+int rds_tcp_is_acked(struct rds_message *rm, uint64_t ack);
void rds_tcp_write_space(struct sock *sk);
/* tcp_stats.c */
diff --git a/net/rds/tcp_connect.c b/net/rds/tcp_connect.c
index 92891b0d224d..b77c88ffb199 100644
--- a/net/rds/tcp_connect.c
+++ b/net/rds/tcp_connect.c
@@ -75,8 +75,16 @@ void rds_tcp_state_change(struct sock *sk)
rds_connect_path_complete(cp, RDS_CONN_CONNECTING);
}
break;
+ case TCP_CLOSING:
+ case TCP_TIME_WAIT:
+ if (wq_has_sleeper(&tc->t_recv_done_waitq))
+ wake_up(&tc->t_recv_done_waitq);
+ break;
case TCP_CLOSE_WAIT:
+ case TCP_LAST_ACK:
case TCP_CLOSE:
+ if (wq_has_sleeper(&tc->t_recv_done_waitq))
+ wake_up(&tc->t_recv_done_waitq);
rds_conn_path_drop(cp, false);
break;
default:
@@ -93,6 +101,8 @@ int rds_tcp_conn_path_connect(struct rds_conn_path *cp)
struct sockaddr_in6 sin6;
struct sockaddr_in sin;
struct sockaddr *addr;
+ int port_low, port_high, port;
+ int port_groups, groups_left;
int addrlen;
bool isv6;
int ret;
@@ -145,7 +155,26 @@ int rds_tcp_conn_path_connect(struct rds_conn_path *cp)
addrlen = sizeof(sin);
}
- ret = kernel_bind(sock, (struct sockaddr_unsized *)addr, addrlen);
+ /* encode cp->cp_index in lowest bits of source-port */
+ inet_get_local_port_range(rds_conn_net(conn), &port_low, &port_high);
+ port_low = ALIGN(port_low, RDS_MPATH_WORKERS);
+ port_groups = (port_high - port_low + 1) / RDS_MPATH_WORKERS;
+ ret = -EADDRINUSE;
+ groups_left = port_groups;
+ while (groups_left-- > 0 && ret) {
+ if (++tc->t_client_port_group >= port_groups)
+ tc->t_client_port_group = 0;
+ port = port_low +
+ tc->t_client_port_group * RDS_MPATH_WORKERS +
+ cp->cp_index;
+
+ if (isv6)
+ sin6.sin6_port = htons(port);
+ else
+ sin.sin_port = htons(port);
+ ret = kernel_bind(sock, (struct sockaddr_unsized *)addr,
+ addrlen);
+ }
if (ret) {
rdsdebug("bind failed with %d at address %pI6c\n",
ret, &conn->c_laddr);
@@ -205,18 +234,58 @@ void rds_tcp_conn_path_shutdown(struct rds_conn_path *cp)
{
struct rds_tcp_connection *tc = cp->cp_transport_data;
struct socket *sock = tc->t_sock;
+ struct sock *sk;
+ unsigned int rounds;
rdsdebug("shutting down conn %p tc %p sock %p\n",
cp->cp_conn, tc, sock);
if (sock) {
+ sk = sock->sk;
if (rds_destroy_pending(cp->cp_conn))
- sock_no_linger(sock->sk);
- sock->ops->shutdown(sock, RCV_SHUTDOWN | SEND_SHUTDOWN);
- lock_sock(sock->sk);
+ sock_no_linger(sk);
+
+ sock->ops->shutdown(sock, SHUT_WR);
+
+ /* after sending FIN,
+ * wait until we processed all incoming messages
+ * and we're sure that there won't be any more:
+ * i.e. state CLOSING, TIME_WAIT, CLOSE_WAIT,
+ * LAST_ACK, or CLOSE (RFC 793).
+ *
+ * Give up waiting after 5 seconds and allow messages
+ * to theoretically get dropped, if the TCP transition
+ * didn't happen.
+ */
+ rounds = 0;
+ do {
+ /* we need to ensure messages are dequeued here
+ * since "rds_recv_worker" only dispatches messages
+ * while the connection is still in RDS_CONN_UP
+ * and there is no guarantee that "rds_tcp_data_ready"
+ * was called nor that "sk_data_ready" still points to
+ * it.
+ */
+ rds_tcp_recv_path(cp);
+ } while (!wait_event_timeout(tc->t_recv_done_waitq,
+ (sk->sk_state == TCP_CLOSING ||
+ sk->sk_state == TCP_TIME_WAIT ||
+ sk->sk_state == TCP_CLOSE_WAIT ||
+ sk->sk_state == TCP_LAST_ACK ||
+ sk->sk_state == TCP_CLOSE) &&
+ skb_queue_empty_lockless(&sk->sk_receive_queue),
+ msecs_to_jiffies(100)) &&
+ ++rounds < 50);
+ lock_sock(sk);
+
+ /* discard messages that the peer received already */
+ tc->t_last_seen_una = rds_tcp_snd_una(tc);
+ rds_send_path_drop_acked(cp, rds_tcp_snd_una(tc),
+ rds_tcp_is_acked);
+
rds_tcp_restore_callbacks(sock, tc); /* tc->tc_sock = NULL */
- release_sock(sock->sk);
+ release_sock(sk);
sock_release(sock);
}
diff --git a/net/rds/tcp_listen.c b/net/rds/tcp_listen.c
index 551c847f2890..6fb5c928b8fd 100644
--- a/net/rds/tcp_listen.c
+++ b/net/rds/tcp_listen.c
@@ -56,32 +56,79 @@ void rds_tcp_keepalive(struct socket *sock)
tcp_sock_set_keepintvl(sock->sk, keepidle);
}
+static int
+rds_tcp_get_peer_sport(struct socket *sock)
+{
+ union {
+ struct sockaddr_storage storage;
+ struct sockaddr addr;
+ struct sockaddr_in sin;
+ struct sockaddr_in6 sin6;
+ } saddr;
+ int sport;
+
+ if (kernel_getpeername(sock, &saddr.addr) >= 0) {
+ switch (saddr.addr.sa_family) {
+ case AF_INET:
+ sport = ntohs(saddr.sin.sin_port);
+ break;
+ case AF_INET6:
+ sport = ntohs(saddr.sin6.sin6_port);
+ break;
+ default:
+ sport = -1;
+ }
+ } else {
+ sport = -1;
+ }
+
+ return sport;
+}
+
/* rds_tcp_accept_one_path(): if accepting on cp_index > 0, make sure the
* client's ipaddr < server's ipaddr. Otherwise, close the accepted
* socket and force a reconneect from smaller -> larger ip addr. The reason
* we special case cp_index 0 is to allow the rds probe ping itself to itself
* get through efficiently.
*/
-static
-struct rds_tcp_connection *rds_tcp_accept_one_path(struct rds_connection *conn)
+static struct rds_tcp_connection *
+rds_tcp_accept_one_path(struct rds_connection *conn, struct socket *sock)
{
- int i;
- int npaths = max_t(int, 1, conn->c_npaths);
+ int sport, npaths, i_min, i_max, i;
+
+ if (conn->c_with_sport_idx)
+ /* cp->cp_index is encoded in lowest bits of source-port */
+ sport = rds_tcp_get_peer_sport(sock);
+ else
+ sport = -1;
+
+ npaths = max_t(int, 1, conn->c_npaths);
+
+ if (sport >= 0) {
+ i_min = sport % npaths;
+ i_max = i_min;
+ } else {
+ i_min = 0;
+ i_max = npaths - 1;
+ }
- for (i = 0; i < npaths; i++) {
+ for (i = i_min; i <= i_max; i++) {
struct rds_conn_path *cp = &conn->c_path[i];
if (rds_conn_path_transition(cp, RDS_CONN_DOWN,
RDS_CONN_CONNECTING))
return cp->cp_transport_data;
}
+
return NULL;
}
-void rds_tcp_conn_slots_available(struct rds_connection *conn)
+void rds_tcp_conn_slots_available(struct rds_connection *conn, bool fan_out)
{
struct rds_tcp_connection *tc;
struct rds_tcp_net *rtn;
+ struct socket *sock;
+ int sport, npaths;
if (rds_destroy_pending(conn))
return;
@@ -91,6 +138,21 @@ void rds_tcp_conn_slots_available(struct rds_connection *conn)
if (!rtn)
return;
+ sock = tc->t_sock;
+
+ /* During fan-out, check that the connection we already
+ * accepted in slot#0 carried the proper source port modulo.
+ */
+ if (fan_out && conn->c_with_sport_idx && sock &&
+ rds_addr_cmp(&conn->c_laddr, &conn->c_faddr) > 0) {
+ /* cp->cp_index is encoded in lowest bits of source-port */
+ sport = rds_tcp_get_peer_sport(sock);
+ npaths = max_t(int, 1, conn->c_npaths);
+ if (sport >= 0 && sport % npaths != 0)
+ /* peer initiated with a non-#0 lane first */
+ rds_conn_path_drop(conn->c_path, 0);
+ }
+
/* As soon as a connection went down,
* it is safe to schedule a "rds_tcp_accept_one"
* attempt even if there are no connections pending:
@@ -199,7 +261,7 @@ int rds_tcp_accept_one(struct rds_tcp_net *rtn)
* to and discarded by the sender.
* We must not throw those away!
*/
- rs_tcp = rds_tcp_accept_one_path(conn);
+ rs_tcp = rds_tcp_accept_one_path(conn, new_sock);
if (!rs_tcp) {
/* It's okay to stash "new_sock", since
* "rds_tcp_conn_slots_available" triggers
@@ -245,6 +307,22 @@ int rds_tcp_accept_one(struct rds_tcp_net *rtn)
rds_tcp_set_callbacks(new_sock, cp);
rds_connect_path_complete(cp, RDS_CONN_CONNECTING);
}
+
+ /* Since "rds_tcp_set_callbacks" happens this late
+ * the connection may already have been closed without
+ * "rds_tcp_state_change" doing its due diligence.
+ *
+ * If that's the case, we simply drop the path,
+ * knowing that "rds_tcp_conn_path_shutdown" will
+ * dequeue pending messages.
+ */
+ if (new_sock->sk->sk_state == TCP_CLOSE_WAIT ||
+ new_sock->sk->sk_state == TCP_LAST_ACK ||
+ new_sock->sk->sk_state == TCP_CLOSE)
+ rds_conn_path_drop(cp, 0);
+ else
+ queue_delayed_work(cp->cp_wq, &cp->cp_recv_w, 0);
+
new_sock = NULL;
ret = 0;
if (conn->c_npaths == 0)
diff --git a/net/rds/tcp_recv.c b/net/rds/tcp_recv.c
index b7cf7f451430..49f96ee0c40f 100644
--- a/net/rds/tcp_recv.c
+++ b/net/rds/tcp_recv.c
@@ -278,6 +278,10 @@ static int rds_tcp_read_sock(struct rds_conn_path *cp, gfp_t gfp)
rdsdebug("tcp_read_sock for tc %p gfp 0x%x returned %d\n", tc, gfp,
desc.error);
+ if (skb_queue_empty_lockless(&sock->sk->sk_receive_queue) &&
+ wq_has_sleeper(&tc->t_recv_done_waitq))
+ wake_up(&tc->t_recv_done_waitq);
+
return desc.error;
}
diff --git a/net/rds/tcp_send.c b/net/rds/tcp_send.c
index 4e82c9644aa6..7c52acc749cf 100644
--- a/net/rds/tcp_send.c
+++ b/net/rds/tcp_send.c
@@ -169,7 +169,7 @@ out:
* unacked byte of the TCP sequence space. We have to do very careful
* wrapping 32bit comparisons here.
*/
-static int rds_tcp_is_acked(struct rds_message *rm, uint64_t ack)
+int rds_tcp_is_acked(struct rds_message *rm, uint64_t ack)
{
if (!test_bit(RDS_MSG_HAS_ACK_SEQ, &rm->m_flags))
return 0;