summaryrefslogtreecommitdiff
path: root/net/sunrpc/xprtsock.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/sunrpc/xprtsock.c')
-rw-r--r--net/sunrpc/xprtsock.c313
1 files changed, 99 insertions, 214 deletions
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 3b305ab17afe..66891e32c5e3 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -63,6 +63,8 @@ static unsigned int xprt_max_tcp_slot_table_entries = RPC_MAX_SLOT_TABLE;
static unsigned int xprt_min_resvport = RPC_DEF_MIN_RESVPORT;
static unsigned int xprt_max_resvport = RPC_DEF_MAX_RESVPORT;
+#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
+
#define XS_TCP_LINGER_TO (15U * HZ)
static unsigned int xs_tcp_fin_timeout __read_mostly = XS_TCP_LINGER_TO;
@@ -75,8 +77,6 @@ static unsigned int xs_tcp_fin_timeout __read_mostly = XS_TCP_LINGER_TO;
* someone else's file names!
*/
-#ifdef RPC_DEBUG
-
static unsigned int min_slot_table_size = RPC_MIN_SLOT_TABLE;
static unsigned int max_slot_table_size = RPC_MAX_SLOT_TABLE;
static unsigned int max_tcp_slot_table_limit = RPC_MAX_SLOT_TABLE_LIMIT;
@@ -186,7 +186,7 @@ static struct ctl_table sunrpc_table[] = {
*/
#define XS_IDLE_DISC_TO (5U * 60 * HZ)
-#ifdef RPC_DEBUG
+#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# undef RPC_DEBUG_DATA
# define RPCDBG_FACILITY RPCDBG_TRANS
#endif
@@ -216,65 +216,6 @@ static inline void xs_pktdump(char *msg, u32 *packet, unsigned int count)
}
#endif
-struct sock_xprt {
- struct rpc_xprt xprt;
-
- /*
- * Network layer
- */
- struct socket * sock;
- struct sock * inet;
-
- /*
- * State of TCP reply receive
- */
- __be32 tcp_fraghdr,
- tcp_xid,
- tcp_calldir;
-
- u32 tcp_offset,
- tcp_reclen;
-
- unsigned long tcp_copied,
- tcp_flags;
-
- /*
- * Connection of transports
- */
- struct delayed_work connect_worker;
- struct sockaddr_storage srcaddr;
- unsigned short srcport;
-
- /*
- * UDP socket buffer size parameters
- */
- size_t rcvsize,
- sndsize;
-
- /*
- * Saved socket callback addresses
- */
- void (*old_data_ready)(struct sock *);
- void (*old_state_change)(struct sock *);
- void (*old_write_space)(struct sock *);
- void (*old_error_report)(struct sock *);
-};
-
-/*
- * TCP receive state flags
- */
-#define TCP_RCV_LAST_FRAG (1UL << 0)
-#define TCP_RCV_COPY_FRAGHDR (1UL << 1)
-#define TCP_RCV_COPY_XID (1UL << 2)
-#define TCP_RCV_COPY_DATA (1UL << 3)
-#define TCP_RCV_READ_CALLDIR (1UL << 4)
-#define TCP_RCV_COPY_CALLDIR (1UL << 5)
-
-/*
- * TCP RPC flags
- */
-#define TCP_RPC_REPLY (1UL << 6)
-
static inline struct rpc_xprt *xprt_from_sock(struct sock *sk)
{
return (struct rpc_xprt *) sk->sk_user_data;
@@ -686,7 +627,7 @@ process_status:
* @xprt: transport
*
* Initiates a graceful shutdown of the TCP socket by calling the
- * equivalent of shutdown(SHUT_WR);
+ * equivalent of shutdown(SHUT_RDWR);
*/
static void xs_tcp_shutdown(struct rpc_xprt *xprt)
{
@@ -694,7 +635,7 @@ static void xs_tcp_shutdown(struct rpc_xprt *xprt)
struct socket *sock = transport->sock;
if (sock != NULL) {
- kernel_sock_shutdown(sock, SHUT_WR);
+ kernel_sock_shutdown(sock, SHUT_RDWR);
trace_rpc_socket_shutdown(xprt, sock);
}
}
@@ -777,9 +718,9 @@ static int xs_tcp_send_request(struct rpc_task *task)
dprintk("RPC: sendmsg returned unrecognized error %d\n",
-status);
case -ECONNRESET:
- xs_tcp_shutdown(xprt);
case -ECONNREFUSED:
case -ENOTCONN:
+ case -EADDRINUSE:
case -EPIPE:
clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
}
@@ -832,6 +773,21 @@ static void xs_restore_old_callbacks(struct sock_xprt *transport, struct sock *s
sk->sk_error_report = transport->old_error_report;
}
+static void xs_sock_reset_connection_flags(struct rpc_xprt *xprt)
+{
+ smp_mb__before_atomic();
+ clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
+ clear_bit(XPRT_CLOSING, &xprt->state);
+ smp_mb__after_atomic();
+}
+
+static void xs_sock_mark_closed(struct rpc_xprt *xprt)
+{
+ xs_sock_reset_connection_flags(xprt);
+ /* Mark transport as closed and wake up all pending tasks */
+ xprt_disconnect_done(xprt);
+}
+
/**
* xs_error_report - callback to handle TCP socket state errors
* @sk: socket
@@ -851,11 +807,12 @@ static void xs_error_report(struct sock *sk)
err = -sk->sk_err;
if (err == 0)
goto out;
+ /* Is this a reset event? */
+ if (sk->sk_state == TCP_CLOSE)
+ xs_sock_mark_closed(xprt);
dprintk("RPC: xs_error_report client %p, error=%d...\n",
xprt, -err);
trace_rpc_socket_error(xprt, sk->sk_socket, err);
- if (test_bit(XPRT_CONNECTION_REUSE, &xprt->state))
- goto out;
xprt_wake_pending_tasks(xprt, err);
out:
read_unlock_bh(&sk->sk_callback_lock);
@@ -865,12 +822,11 @@ static void xs_reset_transport(struct sock_xprt *transport)
{
struct socket *sock = transport->sock;
struct sock *sk = transport->inet;
+ struct rpc_xprt *xprt = &transport->xprt;
if (sk == NULL)
return;
- transport->srcport = 0;
-
write_lock_bh(&sk->sk_callback_lock);
transport->inet = NULL;
transport->sock = NULL;
@@ -879,8 +835,9 @@ static void xs_reset_transport(struct sock_xprt *transport)
xs_restore_old_callbacks(transport, sk);
write_unlock_bh(&sk->sk_callback_lock);
+ xs_sock_reset_connection_flags(xprt);
- trace_rpc_socket_close(&transport->xprt, sock);
+ trace_rpc_socket_close(xprt, sock);
sock_release(sock);
}
@@ -900,27 +857,12 @@ static void xs_close(struct rpc_xprt *xprt)
dprintk("RPC: xs_close xprt %p\n", xprt);
- cancel_delayed_work_sync(&transport->connect_worker);
-
xs_reset_transport(transport);
xprt->reestablish_timeout = 0;
- smp_mb__before_atomic();
- clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
- clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
- clear_bit(XPRT_CLOSING, &xprt->state);
- smp_mb__after_atomic();
xprt_disconnect_done(xprt);
}
-static void xs_tcp_close(struct rpc_xprt *xprt)
-{
- if (test_and_clear_bit(XPRT_CONNECTION_CLOSE, &xprt->state))
- xs_close(xprt);
- else
- xs_tcp_shutdown(xprt);
-}
-
static void xs_xprt_free(struct rpc_xprt *xprt)
{
xs_free_peer_addresses(xprt);
@@ -1091,7 +1033,6 @@ static void xs_udp_data_ready(struct sock *sk)
*/
static void xs_tcp_force_close(struct rpc_xprt *xprt)
{
- set_bit(XPRT_CONNECTION_CLOSE, &xprt->state);
xprt_force_disconnect(xprt);
}
@@ -1415,6 +1356,7 @@ static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, uns
dprintk("RPC: xs_tcp_data_recv started\n");
do {
+ trace_xs_tcp_data_recv(transport);
/* Read in a new fragment marker if necessary */
/* Can we ever really expect to get completely empty fragments? */
if (transport->tcp_flags & TCP_RCV_COPY_FRAGHDR) {
@@ -1439,6 +1381,7 @@ static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, uns
/* Skip over any trailing bytes on short reads */
xs_tcp_read_discard(transport, &desc);
} while (desc.count);
+ trace_xs_tcp_data_recv(transport);
dprintk("RPC: xs_tcp_data_recv done\n");
return len - desc.count;
}
@@ -1454,12 +1397,15 @@ static void xs_tcp_data_ready(struct sock *sk)
struct rpc_xprt *xprt;
read_descriptor_t rd_desc;
int read;
+ unsigned long total = 0;
dprintk("RPC: xs_tcp_data_ready...\n");
read_lock_bh(&sk->sk_callback_lock);
- if (!(xprt = xprt_from_sock(sk)))
+ if (!(xprt = xprt_from_sock(sk))) {
+ read = 0;
goto out;
+ }
/* Any data means we had a useful conversation, so
* the we don't need to delay the next reconnect
*/
@@ -1471,59 +1417,14 @@ static void xs_tcp_data_ready(struct sock *sk)
do {
rd_desc.count = 65536;
read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
+ if (read > 0)
+ total += read;
} while (read > 0);
out:
+ trace_xs_tcp_data_ready(xprt, read, total);
read_unlock_bh(&sk->sk_callback_lock);
}
-/*
- * Do the equivalent of linger/linger2 handling for dealing with
- * broken servers that don't close the socket in a timely
- * fashion
- */
-static void xs_tcp_schedule_linger_timeout(struct rpc_xprt *xprt,
- unsigned long timeout)
-{
- struct sock_xprt *transport;
-
- if (xprt_test_and_set_connecting(xprt))
- return;
- set_bit(XPRT_CONNECTION_ABORT, &xprt->state);
- transport = container_of(xprt, struct sock_xprt, xprt);
- queue_delayed_work(rpciod_workqueue, &transport->connect_worker,
- timeout);
-}
-
-static void xs_tcp_cancel_linger_timeout(struct rpc_xprt *xprt)
-{
- struct sock_xprt *transport;
-
- transport = container_of(xprt, struct sock_xprt, xprt);
-
- if (!test_bit(XPRT_CONNECTION_ABORT, &xprt->state) ||
- !cancel_delayed_work(&transport->connect_worker))
- return;
- clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
- xprt_clear_connecting(xprt);
-}
-
-static void xs_sock_reset_connection_flags(struct rpc_xprt *xprt)
-{
- smp_mb__before_atomic();
- clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
- clear_bit(XPRT_CONNECTION_CLOSE, &xprt->state);
- clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
- clear_bit(XPRT_CLOSING, &xprt->state);
- smp_mb__after_atomic();
-}
-
-static void xs_sock_mark_closed(struct rpc_xprt *xprt)
-{
- xs_sock_reset_connection_flags(xprt);
- /* Mark transport as closed and wake up all pending tasks */
- xprt_disconnect_done(xprt);
-}
-
/**
* xs_tcp_state_change - callback to handle TCP socket state changes
* @sk: socket whose state has changed
@@ -1572,7 +1473,6 @@ static void xs_tcp_state_change(struct sock *sk)
clear_bit(XPRT_CONNECTED, &xprt->state);
clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
smp_mb__after_atomic();
- xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout);
break;
case TCP_CLOSE_WAIT:
/* The server initiated a shutdown of the socket */
@@ -1589,13 +1489,11 @@ static void xs_tcp_state_change(struct sock *sk)
break;
case TCP_LAST_ACK:
set_bit(XPRT_CLOSING, &xprt->state);
- xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout);
smp_mb__before_atomic();
clear_bit(XPRT_CONNECTED, &xprt->state);
smp_mb__after_atomic();
break;
case TCP_CLOSE:
- xs_tcp_cancel_linger_timeout(xprt);
xs_sock_mark_closed(xprt);
}
out:
@@ -1718,6 +1616,40 @@ static unsigned short xs_get_random_port(void)
}
/**
+ * xs_set_reuseaddr_port - set the socket's port and address reuse options
+ * @sock: socket
+ *
+ * Note that this function has to be called on all sockets that share the
+ * same port, and it must be called before binding.
+ */
+static void xs_sock_set_reuseport(struct socket *sock)
+{
+ int opt = 1;
+
+ kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEPORT,
+ (char *)&opt, sizeof(opt));
+}
+
+static unsigned short xs_sock_getport(struct socket *sock)
+{
+ struct sockaddr_storage buf;
+ int buflen;
+ unsigned short port = 0;
+
+ if (kernel_getsockname(sock, (struct sockaddr *)&buf, &buflen) < 0)
+ goto out;
+ switch (buf.ss_family) {
+ case AF_INET6:
+ port = ntohs(((struct sockaddr_in6 *)&buf)->sin6_port);
+ break;
+ case AF_INET:
+ port = ntohs(((struct sockaddr_in *)&buf)->sin_port);
+ }
+out:
+ return port;
+}
+
+/**
* xs_set_port - reset the port number in the remote endpoint address
* @xprt: generic transport
* @port: new port number
@@ -1731,6 +1663,12 @@ static void xs_set_port(struct rpc_xprt *xprt, unsigned short port)
xs_update_peer_port(xprt);
}
+static void xs_set_srcport(struct sock_xprt *transport, struct socket *sock)
+{
+ if (transport->srcport == 0)
+ transport->srcport = xs_sock_getport(sock);
+}
+
static unsigned short xs_get_srcport(struct sock_xprt *transport)
{
unsigned short port = transport->srcport;
@@ -1884,7 +1822,8 @@ static void xs_dummy_setup_socket(struct work_struct *work)
}
static struct socket *xs_create_sock(struct rpc_xprt *xprt,
- struct sock_xprt *transport, int family, int type, int protocol)
+ struct sock_xprt *transport, int family, int type,
+ int protocol, bool reuseport)
{
struct socket *sock;
int err;
@@ -1897,6 +1836,9 @@ static struct socket *xs_create_sock(struct rpc_xprt *xprt,
}
xs_reclassify_socket(family, sock);
+ if (reuseport)
+ xs_sock_set_reuseport(sock);
+
err = xs_bind(transport, sock);
if (err) {
sock_release(sock);
@@ -1954,7 +1896,6 @@ static int xs_local_setup_socket(struct sock_xprt *transport)
struct socket *sock;
int status = -EIO;
- clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
status = __sock_create(xprt->xprt_net, AF_LOCAL,
SOCK_STREAM, 0, &sock, 1);
if (status < 0) {
@@ -2095,10 +2036,9 @@ static void xs_udp_setup_socket(struct work_struct *work)
struct socket *sock = transport->sock;
int status = -EIO;
- /* Start by resetting any existing state */
- xs_reset_transport(transport);
sock = xs_create_sock(xprt, transport,
- xs_addr(xprt)->sa_family, SOCK_DGRAM, IPPROTO_UDP);
+ xs_addr(xprt)->sa_family, SOCK_DGRAM,
+ IPPROTO_UDP, false);
if (IS_ERR(sock))
goto out;
@@ -2112,61 +2052,11 @@ static void xs_udp_setup_socket(struct work_struct *work)
trace_rpc_socket_connect(xprt, sock, 0);
status = 0;
out:
+ xprt_unlock_connect(xprt, transport);
xprt_clear_connecting(xprt);
xprt_wake_pending_tasks(xprt, status);
}
-/*
- * We need to preserve the port number so the reply cache on the server can
- * find our cached RPC replies when we get around to reconnecting.
- */
-static void xs_abort_connection(struct sock_xprt *transport)
-{
- int result;
- struct sockaddr any;
-
- dprintk("RPC: disconnecting xprt %p to reuse port\n", transport);
-
- /*
- * Disconnect the transport socket by doing a connect operation
- * with AF_UNSPEC. This should return immediately...
- */
- memset(&any, 0, sizeof(any));
- any.sa_family = AF_UNSPEC;
- result = kernel_connect(transport->sock, &any, sizeof(any), 0);
- trace_rpc_socket_reset_connection(&transport->xprt,
- transport->sock, result);
- if (!result)
- xs_sock_reset_connection_flags(&transport->xprt);
- dprintk("RPC: AF_UNSPEC connect return code %d\n", result);
-}
-
-static void xs_tcp_reuse_connection(struct sock_xprt *transport)
-{
- unsigned int state = transport->inet->sk_state;
-
- if (state == TCP_CLOSE && transport->sock->state == SS_UNCONNECTED) {
- /* we don't need to abort the connection if the socket
- * hasn't undergone a shutdown
- */
- if (transport->inet->sk_shutdown == 0)
- return;
- dprintk("RPC: %s: TCP_CLOSEd and sk_shutdown set to %d\n",
- __func__, transport->inet->sk_shutdown);
- }
- if ((1 << state) & (TCPF_ESTABLISHED|TCPF_SYN_SENT)) {
- /* we don't need to abort the connection if the socket
- * hasn't undergone a shutdown
- */
- if (transport->inet->sk_shutdown == 0)
- return;
- dprintk("RPC: %s: ESTABLISHED/SYN_SENT "
- "sk_shutdown set to %d\n",
- __func__, transport->inet->sk_shutdown);
- }
- xs_abort_connection(transport);
-}
-
static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
{
struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
@@ -2200,9 +2090,7 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
sk->sk_allocation = GFP_ATOMIC;
/* socket options */
- sk->sk_userlocks |= SOCK_BINDPORT_LOCK;
sock_reset_flag(sk, SOCK_LINGER);
- tcp_sk(sk)->linger2 = 0;
tcp_sk(sk)->nonagle |= TCP_NAGLE_OFF;
xprt_clear_connected(xprt);
@@ -2225,6 +2113,7 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
ret = kernel_connect(sock, xs_addr(xprt), xprt->addrlen, O_NONBLOCK);
switch (ret) {
case 0:
+ xs_set_srcport(transport, sock);
case -EINPROGRESS:
/* SYN_SENT! */
if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
@@ -2251,25 +2140,13 @@ static void xs_tcp_setup_socket(struct work_struct *work)
int status = -EIO;
if (!sock) {
- clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
sock = xs_create_sock(xprt, transport,
- xs_addr(xprt)->sa_family, SOCK_STREAM, IPPROTO_TCP);
+ xs_addr(xprt)->sa_family, SOCK_STREAM,
+ IPPROTO_TCP, true);
if (IS_ERR(sock)) {
status = PTR_ERR(sock);
goto out;
}
- } else {
- int abort_and_exit;
-
- abort_and_exit = test_and_clear_bit(XPRT_CONNECTION_ABORT,
- &xprt->state);
- /* "close" the socket, preserving the local port */
- set_bit(XPRT_CONNECTION_REUSE, &xprt->state);
- xs_tcp_reuse_connection(transport);
- clear_bit(XPRT_CONNECTION_REUSE, &xprt->state);
-
- if (abort_and_exit)
- goto out_eagain;
}
dprintk("RPC: worker connecting xprt %p via %s to "
@@ -2296,6 +2173,7 @@ static void xs_tcp_setup_socket(struct work_struct *work)
case 0:
case -EINPROGRESS:
case -EALREADY:
+ xprt_unlock_connect(xprt, transport);
xprt_clear_connecting(xprt);
return;
case -EINVAL:
@@ -2305,13 +2183,15 @@ static void xs_tcp_setup_socket(struct work_struct *work)
case -ECONNREFUSED:
case -ECONNRESET:
case -ENETUNREACH:
+ case -EADDRINUSE:
case -ENOBUFS:
/* retry with existing socket, after a delay */
+ xs_tcp_force_close(xprt);
goto out;
}
-out_eagain:
status = -EAGAIN;
out:
+ xprt_unlock_connect(xprt, transport);
xprt_clear_connecting(xprt);
xprt_wake_pending_tasks(xprt, status);
}
@@ -2334,6 +2214,11 @@ static void xs_connect(struct rpc_xprt *xprt, struct rpc_task *task)
{
struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
+ WARN_ON_ONCE(!xprt_lock_connect(xprt, task, transport));
+
+ /* Start by resetting any existing state */
+ xs_reset_transport(transport);
+
if (transport->sock != NULL && !RPC_IS_SOFTCONN(task)) {
dprintk("RPC: xs_connect delayed xprt %p for %lu "
"seconds\n",
@@ -2610,7 +2495,7 @@ static struct rpc_xprt_ops xs_tcp_ops = {
.buf_free = rpc_free,
.send_request = xs_tcp_send_request,
.set_retrans_timeout = xprt_set_retrans_timeout_def,
- .close = xs_tcp_close,
+ .close = xs_tcp_shutdown,
.destroy = xs_destroy,
.print_stats = xs_tcp_print_stats,
};
@@ -3042,7 +2927,7 @@ static struct xprt_class xs_bc_tcp_transport = {
*/
int init_socket_xprt(void)
{
-#ifdef RPC_DEBUG
+#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
if (!sunrpc_table_header)
sunrpc_table_header = register_sysctl_table(sunrpc_table);
#endif
@@ -3061,7 +2946,7 @@ int init_socket_xprt(void)
*/
void cleanup_socket_xprt(void)
{
-#ifdef RPC_DEBUG
+#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
if (sunrpc_table_header) {
unregister_sysctl_table(sunrpc_table_header);
sunrpc_table_header = NULL;