summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--net/tipc/bcast.c57
-rw-r--r--net/tipc/bcast.h3
-rw-r--r--net/tipc/msg.h17
-rw-r--r--net/tipc/name_table.h2
-rw-r--r--net/tipc/node.c11
-rw-r--r--net/tipc/node.h7
-rw-r--r--net/tipc/socket.c72
-rw-r--r--net/tipc/socket.h4
8 files changed, 114 insertions, 59 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 3eaa931e2e8c..81b1fef1f5e0 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -79,6 +79,13 @@ static void tipc_bclink_unlock(struct net *net)
tipc_link_reset_all(node);
}
+void tipc_bclink_input(struct net *net)
+{
+ struct tipc_net *tn = net_generic(net, tipc_net_id);
+
+ tipc_sk_mcast_rcv(net, &tn->bclink->arrvq, &tn->bclink->inputq);
+}
+
uint tipc_bclink_get_mtu(void)
{
return MAX_PKT_DEFAULT_MCAST;
@@ -356,7 +363,7 @@ static void bclink_peek_nack(struct net *net, struct tipc_msg *msg)
tipc_node_unlock(n_ptr);
}
-/* tipc_bclink_xmit - broadcast buffer chain to all nodes in cluster
+/* tipc_bclink_xmit - deliver buffer chain to all nodes in cluster
* and to identified node local sockets
* @net: the applicable net namespace
* @list: chain of buffers containing message
@@ -371,6 +378,8 @@ int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list)
int rc = 0;
int bc = 0;
struct sk_buff *skb;
+ struct sk_buff_head arrvq;
+ struct sk_buff_head inputq;
/* Prepare clone of message for local node */
skb = tipc_msg_reassemble(list);
@@ -379,7 +388,7 @@ int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list)
return -EHOSTUNREACH;
}
- /* Broadcast to all other nodes */
+ /* Broadcast to all nodes */
if (likely(bclink)) {
tipc_bclink_lock(net);
if (likely(bclink->bcast_nodes.count)) {
@@ -399,12 +408,15 @@ int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list)
if (unlikely(!bc))
__skb_queue_purge(list);
- /* Deliver message clone */
- if (likely(!rc))
- tipc_sk_mcast_rcv(net, skb);
- else
+ if (unlikely(rc)) {
kfree_skb(skb);
-
+ return rc;
+ }
+ /* Deliver message clone */
+ __skb_queue_head_init(&arrvq);
+ skb_queue_head_init(&inputq);
+ __skb_queue_tail(&arrvq, skb);
+ tipc_sk_mcast_rcv(net, &arrvq, &inputq);
return rc;
}
@@ -449,7 +461,7 @@ void tipc_bclink_rcv(struct net *net, struct sk_buff *buf)
int deferred = 0;
int pos = 0;
struct sk_buff *iskb;
- struct sk_buff_head msgs;
+ struct sk_buff_head *arrvq, *inputq;
/* Screen out unwanted broadcast messages */
if (msg_mc_netid(msg) != tn->net_id)
@@ -486,6 +498,8 @@ void tipc_bclink_rcv(struct net *net, struct sk_buff *buf)
/* Handle in-sequence broadcast message */
seqno = msg_seqno(msg);
next_in = mod(node->bclink.last_in + 1);
+ arrvq = &tn->bclink->arrvq;
+ inputq = &tn->bclink->inputq;
if (likely(seqno == next_in)) {
receive:
@@ -493,21 +507,26 @@ receive:
if (likely(msg_isdata(msg))) {
tipc_bclink_lock(net);
bclink_accept_pkt(node, seqno);
+ spin_lock_bh(&inputq->lock);
+ __skb_queue_tail(arrvq, buf);
+ spin_unlock_bh(&inputq->lock);
+ node->action_flags |= TIPC_BCAST_MSG_EVT;
tipc_bclink_unlock(net);
tipc_node_unlock(node);
- if (likely(msg_mcast(msg)))
- tipc_sk_mcast_rcv(net, buf);
- else
- kfree_skb(buf);
} else if (msg_user(msg) == MSG_BUNDLER) {
tipc_bclink_lock(net);
bclink_accept_pkt(node, seqno);
bcl->stats.recv_bundles++;
bcl->stats.recv_bundled += msg_msgcnt(msg);
+ pos = 0;
+ while (tipc_msg_extract(buf, &iskb, &pos)) {
+ spin_lock_bh(&inputq->lock);
+ __skb_queue_tail(arrvq, iskb);
+ spin_unlock_bh(&inputq->lock);
+ }
+ node->action_flags |= TIPC_BCAST_MSG_EVT;
tipc_bclink_unlock(net);
tipc_node_unlock(node);
- while (tipc_msg_extract(buf, &iskb, &pos))
- tipc_sk_mcast_rcv(net, iskb);
} else if (msg_user(msg) == MSG_FRAGMENTER) {
tipc_buf_append(&node->bclink.reasm_buf, &buf);
if (unlikely(!buf && !node->bclink.reasm_buf))
@@ -523,14 +542,6 @@ receive:
}
tipc_bclink_unlock(net);
tipc_node_unlock(node);
- } else if (msg_user(msg) == NAME_DISTRIBUTOR) {
- tipc_bclink_lock(net);
- bclink_accept_pkt(node, seqno);
- tipc_bclink_unlock(net);
- tipc_node_unlock(node);
- skb_queue_head_init(&msgs);
- skb_queue_tail(&msgs, buf);
- tipc_named_rcv(net, &msgs);
} else {
tipc_bclink_lock(net);
bclink_accept_pkt(node, seqno);
@@ -950,6 +961,8 @@ int tipc_bclink_init(struct net *net)
skb_queue_head_init(&bcl->wakeupq);
bcl->next_out_no = 1;
spin_lock_init(&bclink->node.lock);
+ __skb_queue_head_init(&bclink->arrvq);
+ skb_queue_head_init(&bclink->inputq);
bcl->owner = &bclink->node;
bcl->owner->net = net;
bcl->max_pkt = MAX_PKT_DEFAULT_MCAST;
diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
index 8f4d4dc38e11..a910c0b9f249 100644
--- a/net/tipc/bcast.h
+++ b/net/tipc/bcast.h
@@ -97,6 +97,8 @@ struct tipc_bclink {
struct tipc_link link;
struct tipc_node node;
unsigned int flags;
+ struct sk_buff_head arrvq;
+ struct sk_buff_head inputq;
struct tipc_node_map bcast_nodes;
struct tipc_node *retransmit_to;
};
@@ -134,5 +136,6 @@ uint tipc_bclink_get_mtu(void);
int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list);
void tipc_bclink_wakeup_users(struct net *net);
int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg);
+void tipc_bclink_input(struct net *net);
#endif
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index ab467261bd9d..9ace47f44a69 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -767,6 +767,23 @@ bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, u32 *dnode,
int *err);
struct sk_buff *tipc_msg_reassemble(struct sk_buff_head *list);
+/* tipc_skb_peek(): peek and reserve first buffer in list
+ * @list: list to be peeked in
+ * Returns pointer to first buffer in list, if any
+ */
+static inline struct sk_buff *tipc_skb_peek(struct sk_buff_head *list,
+ spinlock_t *lock)
+{
+ struct sk_buff *skb;
+
+ spin_lock_bh(lock);
+ skb = skb_peek(list);
+ if (skb)
+ skb_get(skb);
+ spin_unlock_bh(lock);
+ return skb;
+}
+
/* tipc_skb_peek_port(): find a destination port, ignoring all destinations
* up to and including 'filter'.
* Note: ignoring previously tried destinations minimizes the risk of
diff --git a/net/tipc/name_table.h b/net/tipc/name_table.h
index 52501fdaafa5..0304ddc6b101 100644
--- a/net/tipc/name_table.h
+++ b/net/tipc/name_table.h
@@ -1,7 +1,7 @@
/*
* net/tipc/name_table.h: Include file for TIPC name table code
*
- * Copyright (c) 2000-2006, 2014, Ericsson AB
+ * Copyright (c) 2000-2006, 2014-2015, Ericsson AB
* Copyright (c) 2004-2005, 2010-2011, Wind River Systems
* All rights reserved.
*
diff --git a/net/tipc/node.c b/net/tipc/node.c
index c7fdf3dec92c..52308498f208 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -582,10 +582,10 @@ void tipc_node_unlock(struct tipc_node *node)
namedq = node->namedq;
publ_list = &node->publ_list;
- node->action_flags &= ~(TIPC_MSG_EVT | TIPC_NOTIFY_NODE_DOWN |
- TIPC_NOTIFY_NODE_UP | TIPC_NOTIFY_LINK_UP |
- TIPC_NOTIFY_LINK_DOWN |
- TIPC_WAKEUP_BCAST_USERS |
+ node->action_flags &= ~(TIPC_MSG_EVT |
+ TIPC_NOTIFY_NODE_DOWN | TIPC_NOTIFY_NODE_UP |
+ TIPC_NOTIFY_LINK_DOWN | TIPC_NOTIFY_LINK_UP |
+ TIPC_WAKEUP_BCAST_USERS | TIPC_BCAST_MSG_EVT |
TIPC_NAMED_MSG_EVT);
spin_unlock_bh(&node->lock);
@@ -612,6 +612,9 @@ void tipc_node_unlock(struct tipc_node *node)
if (flags & TIPC_NAMED_MSG_EVT)
tipc_named_rcv(net, namedq);
+
+ if (flags & TIPC_BCAST_MSG_EVT)
+ tipc_bclink_input(net);
}
/* Caller should hold node lock for the passed node */
diff --git a/net/tipc/node.h b/net/tipc/node.h
index c2b0fcf4042b..20ec13f9bede 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -1,7 +1,7 @@
/*
* net/tipc/node.h: Include file for TIPC node management routines
*
- * Copyright (c) 2000-2006, 2014, Ericsson AB
+ * Copyright (c) 2000-2006, 2014-2015, Ericsson AB
* Copyright (c) 2005, 2010-2014, Wind River Systems
* All rights reserved.
*
@@ -63,7 +63,8 @@ enum {
TIPC_WAKEUP_BCAST_USERS = (1 << 5),
TIPC_NOTIFY_LINK_UP = (1 << 6),
TIPC_NOTIFY_LINK_DOWN = (1 << 7),
- TIPC_NAMED_MSG_EVT = (1 << 8)
+ TIPC_NAMED_MSG_EVT = (1 << 8),
+ TIPC_BCAST_MSG_EVT = (1 << 9)
};
/**
@@ -74,6 +75,7 @@ enum {
* @oos_state: state tracker for handling OOS b'cast messages
* @deferred_queue: deferred queue saved OOS b'cast message received from node
* @reasm_buf: broadcast reassembly queue head from node
+ * @inputq_map: bitmap indicating which inqueues should be kicked
* @recv_permitted: true if node is allowed to receive b'cast messages
*/
struct tipc_node_bclink {
@@ -84,6 +86,7 @@ struct tipc_node_bclink {
u32 deferred_size;
struct sk_buff_head deferred_queue;
struct sk_buff *reasm_buf;
+ int inputq_map;
bool recv_permitted;
};
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 26aec8414ac1..66666805b53c 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -776,44 +776,60 @@ new_mtu:
return rc;
}
-/* tipc_sk_mcast_rcv - Deliver multicast message to all destination sockets
+/**
+ * tipc_sk_mcast_rcv - Deliver multicast messages to all destination sockets
+ * @arrvq: queue with arriving messages, to be cloned after destination lookup
+ * @inputq: queue with cloned messages, delivered to socket after dest lookup
+ *
+ * Multi-threaded: parallel calls with reference to same queues may occur
*/
-void tipc_sk_mcast_rcv(struct net *net, struct sk_buff *skb)
+void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
+ struct sk_buff_head *inputq)
{
- struct tipc_msg *msg = buf_msg(skb);
+ struct tipc_msg *msg;
struct tipc_plist dports;
- struct sk_buff *cskb;
u32 portid;
u32 scope = TIPC_CLUSTER_SCOPE;
- struct sk_buff_head msgq;
- uint hsz = skb_headroom(skb) + msg_hdr_sz(msg);
+ struct sk_buff_head tmpq;
+ uint hsz;
+ struct sk_buff *skb, *_skb;
- skb_queue_head_init(&msgq);
+ __skb_queue_head_init(&tmpq);
tipc_plist_init(&dports);
- if (in_own_node(net, msg_orignode(msg)))
- scope = TIPC_NODE_SCOPE;
-
- if (unlikely(!msg_mcast(msg))) {
- pr_warn("Received non-multicast msg in multicast\n");
- goto exit;
- }
- /* Create destination port list: */
- tipc_nametbl_mc_translate(net, msg_nametype(msg), msg_namelower(msg),
- msg_nameupper(msg), scope, &dports);
- portid = tipc_plist_pop(&dports);
- for (; portid; portid = tipc_plist_pop(&dports)) {
- cskb = __pskb_copy(skb, hsz, GFP_ATOMIC);
- if (!cskb) {
- pr_warn("Failed do clone mcast rcv buffer\n");
- continue;
+ skb = tipc_skb_peek(arrvq, &inputq->lock);
+ for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) {
+ msg = buf_msg(skb);
+ hsz = skb_headroom(skb) + msg_hdr_sz(msg);
+
+ if (in_own_node(net, msg_orignode(msg)))
+ scope = TIPC_NODE_SCOPE;
+
+ /* Create destination port list and message clones: */
+ tipc_nametbl_mc_translate(net,
+ msg_nametype(msg), msg_namelower(msg),
+ msg_nameupper(msg), scope, &dports);
+ portid = tipc_plist_pop(&dports);
+ for (; portid; portid = tipc_plist_pop(&dports)) {
+ _skb = __pskb_copy(skb, hsz, GFP_ATOMIC);
+ if (_skb) {
+ msg_set_destport(buf_msg(_skb), portid);
+ __skb_queue_tail(&tmpq, _skb);
+ continue;
+ }
+ pr_warn("Failed to clone mcast rcv buffer\n");
}
- msg_set_destport(buf_msg(cskb), portid);
- skb_queue_tail(&msgq, cskb);
+ /* Append to inputq if not already done by other thread */
+ spin_lock_bh(&inputq->lock);
+ if (skb_peek(arrvq) == skb) {
+ skb_queue_splice_tail_init(&tmpq, inputq);
+ kfree_skb(__skb_dequeue(arrvq));
+ }
+ spin_unlock_bh(&inputq->lock);
+ __skb_queue_purge(&tmpq);
+ kfree_skb(skb);
}
- tipc_sk_rcv(net, &msgq);
-exit:
- kfree_skb(skb);
+ tipc_sk_rcv(net, inputq);
}
/**
diff --git a/net/tipc/socket.h b/net/tipc/socket.h
index 95b015909ac1..8be0da7df8fc 100644
--- a/net/tipc/socket.h
+++ b/net/tipc/socket.h
@@ -42,7 +42,6 @@
#define TIPC_FLOWCTRL_WIN (TIPC_CONNACK_INTV * 2)
#define TIPC_CONN_OVERLOAD_LIMIT ((TIPC_FLOWCTRL_WIN * 2 + 1) * \
SKB_TRUESIZE(TIPC_MAX_USER_MSG_SIZE))
-
int tipc_socket_init(void);
void tipc_socket_stop(void);
int tipc_sock_create_local(struct net *net, int type, struct socket **res);
@@ -51,7 +50,8 @@ int tipc_sock_accept_local(struct socket *sock, struct socket **newsock,
int flags);
int tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq);
struct sk_buff *tipc_sk_socks_show(struct net *net);
-void tipc_sk_mcast_rcv(struct net *net, struct sk_buff *buf);
+void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
+ struct sk_buff_head *inputq);
void tipc_sk_reinit(struct net *net);
int tipc_sk_rht_init(struct net *net);
void tipc_sk_rht_destroy(struct net *net);