summaryrefslogtreecommitdiff
path: root/net/tipc
diff options
context:
space:
mode:
Diffstat (limited to 'net/tipc')
-rw-r--r--net/tipc/bcast.c204
-rw-r--r--net/tipc/bcast.h33
-rw-r--r--net/tipc/bearer.c15
-rw-r--r--net/tipc/bearer.h8
-rw-r--r--net/tipc/link.c87
-rw-r--r--net/tipc/msg.c17
-rw-r--r--net/tipc/msg.h11
-rw-r--r--net/tipc/name_table.c128
-rw-r--r--net/tipc/name_table.h24
-rw-r--r--net/tipc/node.c42
-rw-r--r--net/tipc/node.h4
-rw-r--r--net/tipc/socket.c495
-rw-r--r--net/tipc/udp_media.c8
13 files changed, 672 insertions, 404 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index aa1babbea385..7d99029df342 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -1,7 +1,7 @@
/*
* net/tipc/bcast.c: TIPC broadcast code
*
- * Copyright (c) 2004-2006, 2014-2015, Ericsson AB
+ * Copyright (c) 2004-2006, 2014-2016, Ericsson AB
* Copyright (c) 2004, Intel Corporation.
* Copyright (c) 2005, 2010-2011, Wind River Systems
* All rights reserved.
@@ -39,9 +39,8 @@
#include "socket.h"
#include "msg.h"
#include "bcast.h"
-#include "name_distr.h"
#include "link.h"
-#include "node.h"
+#include "name_table.h"
#define BCLINK_WIN_DEFAULT 50 /* bcast link window size (default) */
#define BCLINK_WIN_MIN 32 /* bcast minimum link window size */
@@ -54,12 +53,20 @@ const char tipc_bclink_name[] = "broadcast-link";
* @inputq: data input queue; will only carry SOCK_WAKEUP messages
* @dest: array keeping number of reachable destinations per bearer
* @primary_bearer: a bearer having links to all broadcast destinations, if any
+ * @bcast_support: indicates if primary bearer, if any, supports broadcast
+ * @rcast_support: indicates if all peer nodes support replicast
+ * @rc_ratio: dest count as percentage of cluster size where send method changes
+ * @bc_threshold: calculated drom rc_ratio; if dests > threshold use broadcast
*/
struct tipc_bc_base {
struct tipc_link *link;
struct sk_buff_head inputq;
int dests[MAX_BEARERS];
int primary_bearer;
+ bool bcast_support;
+ bool rcast_support;
+ int rc_ratio;
+ int bc_threshold;
};
static struct tipc_bc_base *tipc_bc_base(struct net *net)
@@ -69,7 +76,20 @@ static struct tipc_bc_base *tipc_bc_base(struct net *net)
int tipc_bcast_get_mtu(struct net *net)
{
- return tipc_link_mtu(tipc_bc_sndlink(net));
+ return tipc_link_mtu(tipc_bc_sndlink(net)) - INT_H_SIZE;
+}
+
+void tipc_bcast_disable_rcast(struct net *net)
+{
+ tipc_bc_base(net)->rcast_support = false;
+}
+
+static void tipc_bcbase_calc_bc_threshold(struct net *net)
+{
+ struct tipc_bc_base *bb = tipc_bc_base(net);
+ int cluster_size = tipc_link_bc_peers(tipc_bc_sndlink(net));
+
+ bb->bc_threshold = 1 + (cluster_size * bb->rc_ratio / 100);
}
/* tipc_bcbase_select_primary(): find a bearer with links to all destinations,
@@ -79,9 +99,10 @@ static void tipc_bcbase_select_primary(struct net *net)
{
struct tipc_bc_base *bb = tipc_bc_base(net);
int all_dests = tipc_link_bc_peers(bb->link);
- int i, mtu;
+ int i, mtu, prim;
bb->primary_bearer = INVALID_BEARER_ID;
+ bb->bcast_support = true;
if (!all_dests)
return;
@@ -93,7 +114,7 @@ static void tipc_bcbase_select_primary(struct net *net)
mtu = tipc_bearer_mtu(net, i);
if (mtu < tipc_link_mtu(bb->link))
tipc_link_set_mtu(bb->link, mtu);
-
+ bb->bcast_support &= tipc_bearer_bcast_support(net, i);
if (bb->dests[i] < all_dests)
continue;
@@ -103,6 +124,9 @@ static void tipc_bcbase_select_primary(struct net *net)
if ((i ^ tipc_own_addr(net)) & 1)
break;
}
+ prim = bb->primary_bearer;
+ if (prim != INVALID_BEARER_ID)
+ bb->bcast_support = tipc_bearer_bcast_support(net, prim);
}
void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id)
@@ -170,45 +194,131 @@ static void tipc_bcbase_xmit(struct net *net, struct sk_buff_head *xmitq)
__skb_queue_purge(&_xmitq);
}
-/* tipc_bcast_xmit - deliver buffer chain to all nodes in cluster
- * and to identified node local sockets
+static void tipc_bcast_select_xmit_method(struct net *net, int dests,
+ struct tipc_mc_method *method)
+{
+ struct tipc_bc_base *bb = tipc_bc_base(net);
+ unsigned long exp = method->expires;
+
+ /* Broadcast supported by used bearer/bearers? */
+ if (!bb->bcast_support) {
+ method->rcast = true;
+ return;
+ }
+ /* Any destinations which don't support replicast ? */
+ if (!bb->rcast_support) {
+ method->rcast = false;
+ return;
+ }
+ /* Can current method be changed ? */
+ method->expires = jiffies + TIPC_METHOD_EXPIRE;
+ if (method->mandatory || time_before(jiffies, exp))
+ return;
+
+ /* Determine method to use now */
+ method->rcast = dests <= bb->bc_threshold;
+}
+
+/* tipc_bcast_xmit - broadcast the buffer chain to all external nodes
* @net: the applicable net namespace
- * @list: chain of buffers containing message
- * Consumes the buffer chain, except when returning -ELINKCONG
- * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
+ * @pkts: chain of buffers containing message
+ * @cong_link_cnt: set to 1 if broadcast link is congested, otherwise 0
+ * Consumes the buffer chain.
+ * Returns 0 if success, otherwise errno: -EHOSTUNREACH,-EMSGSIZE
*/
-int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list)
+static int tipc_bcast_xmit(struct net *net, struct sk_buff_head *pkts,
+ u16 *cong_link_cnt)
{
struct tipc_link *l = tipc_bc_sndlink(net);
- struct sk_buff_head xmitq, inputq, rcvq;
+ struct sk_buff_head xmitq;
int rc = 0;
- __skb_queue_head_init(&rcvq);
__skb_queue_head_init(&xmitq);
- skb_queue_head_init(&inputq);
-
- /* Prepare message clone for local node */
- if (unlikely(!tipc_msg_reassemble(list, &rcvq)))
- return -EHOSTUNREACH;
-
tipc_bcast_lock(net);
if (tipc_link_bc_peers(l))
- rc = tipc_link_xmit(l, list, &xmitq);
+ rc = tipc_link_xmit(l, pkts, &xmitq);
tipc_bcast_unlock(net);
-
- /* Don't send to local node if adding to link failed */
- if (unlikely(rc)) {
- __skb_queue_purge(&rcvq);
- return rc;
+ tipc_bcbase_xmit(net, &xmitq);
+ __skb_queue_purge(pkts);
+ if (rc == -ELINKCONG) {
+ *cong_link_cnt = 1;
+ rc = 0;
}
+ return rc;
+}
- /* Broadcast to all nodes, inluding local node */
- tipc_bcbase_xmit(net, &xmitq);
- tipc_sk_mcast_rcv(net, &rcvq, &inputq);
- __skb_queue_purge(list);
+/* tipc_rcast_xmit - replicate and send a message to given destination nodes
+ * @net: the applicable net namespace
+ * @pkts: chain of buffers containing message
+ * @dests: list of destination nodes
+ * @cong_link_cnt: returns number of congested links
+ * @cong_links: returns identities of congested links
+ * Returns 0 if success, otherwise errno
+ */
+static int tipc_rcast_xmit(struct net *net, struct sk_buff_head *pkts,
+ struct tipc_nlist *dests, u16 *cong_link_cnt)
+{
+ struct sk_buff_head _pkts;
+ struct u32_item *n, *tmp;
+ u32 dst, selector;
+
+ selector = msg_link_selector(buf_msg(skb_peek(pkts)));
+ __skb_queue_head_init(&_pkts);
+
+ list_for_each_entry_safe(n, tmp, &dests->list, list) {
+ dst = n->value;
+ if (!tipc_msg_pskb_copy(dst, pkts, &_pkts))
+ return -ENOMEM;
+
+ /* Any other return value than -ELINKCONG is ignored */
+ if (tipc_node_xmit(net, &_pkts, dst, selector) == -ELINKCONG)
+ (*cong_link_cnt)++;
+ }
return 0;
}
+/* tipc_mcast_xmit - deliver message to indicated destination nodes
+ * and to identified node local sockets
+ * @net: the applicable net namespace
+ * @pkts: chain of buffers containing message
+ * @method: send method to be used
+ * @dests: destination nodes for message.
+ * @cong_link_cnt: returns number of encountered congested destination links
+ * Consumes buffer chain.
+ * Returns 0 if success, otherwise errno
+ */
+int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
+ struct tipc_mc_method *method, struct tipc_nlist *dests,
+ u16 *cong_link_cnt)
+{
+ struct sk_buff_head inputq, localq;
+ int rc = 0;
+
+ skb_queue_head_init(&inputq);
+ skb_queue_head_init(&localq);
+
+ /* Clone packets before they are consumed by next call */
+ if (dests->local && !tipc_msg_reassemble(pkts, &localq)) {
+ rc = -ENOMEM;
+ goto exit;
+ }
+ /* Send according to determined transmit method */
+ if (dests->remote) {
+ tipc_bcast_select_xmit_method(net, dests->remote, method);
+ if (method->rcast)
+ rc = tipc_rcast_xmit(net, pkts, dests, cong_link_cnt);
+ else
+ rc = tipc_bcast_xmit(net, pkts, cong_link_cnt);
+ }
+
+ if (dests->local)
+ tipc_sk_mcast_rcv(net, &localq, &inputq);
+exit:
+ /* This queue should normally be empty by now */
+ __skb_queue_purge(pkts);
+ return rc;
+}
+
/* tipc_bcast_rcv - receive a broadcast packet, and deliver to rcv link
*
* RCU is locked, no other locks set
@@ -313,6 +423,7 @@ void tipc_bcast_add_peer(struct net *net, struct tipc_link *uc_l,
tipc_bcast_lock(net);
tipc_link_add_bc_peer(snd_l, uc_l, xmitq);
tipc_bcbase_select_primary(net);
+ tipc_bcbase_calc_bc_threshold(net);
tipc_bcast_unlock(net);
}
@@ -331,6 +442,7 @@ void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_l)
tipc_bcast_lock(net);
tipc_link_remove_bc_peer(snd_l, rcv_l, &xmitq);
tipc_bcbase_select_primary(net);
+ tipc_bcbase_calc_bc_threshold(net);
tipc_bcast_unlock(net);
tipc_bcbase_xmit(net, &xmitq);
@@ -413,6 +525,8 @@ int tipc_bcast_init(struct net *net)
goto enomem;
bb->link = l;
tn->bcl = l;
+ bb->rc_ratio = 25;
+ bb->rcast_support = true;
return 0;
enomem:
kfree(bb);
@@ -428,3 +542,33 @@ void tipc_bcast_stop(struct net *net)
kfree(tn->bcbase);
kfree(tn->bcl);
}
+
+void tipc_nlist_init(struct tipc_nlist *nl, u32 self)
+{
+ memset(nl, 0, sizeof(*nl));
+ INIT_LIST_HEAD(&nl->list);
+ nl->self = self;
+}
+
+void tipc_nlist_add(struct tipc_nlist *nl, u32 node)
+{
+ if (node == nl->self)
+ nl->local = true;
+ else if (u32_push(&nl->list, node))
+ nl->remote++;
+}
+
+void tipc_nlist_del(struct tipc_nlist *nl, u32 node)
+{
+ if (node == nl->self)
+ nl->local = false;
+ else if (u32_del(&nl->list, node))
+ nl->remote--;
+}
+
+void tipc_nlist_purge(struct tipc_nlist *nl)
+{
+ u32_list_purge(&nl->list);
+ nl->remote = 0;
+ nl->local = 0;
+}
diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
index 855d53c64ab3..751530ab0c49 100644
--- a/net/tipc/bcast.h
+++ b/net/tipc/bcast.h
@@ -42,9 +42,35 @@
struct tipc_node;
struct tipc_msg;
struct tipc_nl_msg;
-struct tipc_node_map;
+struct tipc_nlist;
+struct tipc_nitem;
extern const char tipc_bclink_name[];
+#define TIPC_METHOD_EXPIRE msecs_to_jiffies(5000)
+
+struct tipc_nlist {
+ struct list_head list;
+ u32 self;
+ u16 remote;
+ bool local;
+};
+
+void tipc_nlist_init(struct tipc_nlist *nl, u32 self);
+void tipc_nlist_purge(struct tipc_nlist *nl);
+void tipc_nlist_add(struct tipc_nlist *nl, u32 node);
+void tipc_nlist_del(struct tipc_nlist *nl, u32 node);
+
+/* Cookie to be used between socket and broadcast layer
+ * @rcast: replicast (instead of broadcast) was used at previous xmit
+ * @mandatory: broadcast/replicast indication was set by user
+ * @expires: re-evaluate non-mandatory transmit method if we are past this
+ */
+struct tipc_mc_method {
+ bool rcast;
+ bool mandatory;
+ unsigned long expires;
+};
+
int tipc_bcast_init(struct net *net);
void tipc_bcast_stop(struct net *net);
void tipc_bcast_add_peer(struct net *net, struct tipc_link *l,
@@ -53,7 +79,10 @@ void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_bcl);
void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id);
void tipc_bcast_dec_bearer_dst_cnt(struct net *net, int bearer_id);
int tipc_bcast_get_mtu(struct net *net);
-int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list);
+void tipc_bcast_disable_rcast(struct net *net);
+int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
+ struct tipc_mc_method *method, struct tipc_nlist *dests,
+ u16 *cong_link_cnt);
int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb);
void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l,
struct tipc_msg *hdr);
diff --git a/net/tipc/bearer.c b/net/tipc/bearer.c
index 52d74760fb68..33a5bdfbef76 100644
--- a/net/tipc/bearer.c
+++ b/net/tipc/bearer.c
@@ -431,7 +431,7 @@ int tipc_enable_l2_media(struct net *net, struct tipc_bearer *b,
memset(&b->bcast_addr, 0, sizeof(b->bcast_addr));
memcpy(b->bcast_addr.value, dev->broadcast, b->media->hwaddr_len);
b->bcast_addr.media_id = b->media->type_id;
- b->bcast_addr.broadcast = 1;
+ b->bcast_addr.broadcast = TIPC_BROADCAST_SUPPORT;
b->mtu = dev->mtu;
b->media->raw2addr(b, &b->addr, (char *)dev->dev_addr);
rcu_assign_pointer(dev->tipc_ptr, b);
@@ -482,6 +482,19 @@ int tipc_l2_send_msg(struct net *net, struct sk_buff *skb,
return 0;
}
+bool tipc_bearer_bcast_support(struct net *net, u32 bearer_id)
+{
+ bool supp = false;
+ struct tipc_bearer *b;
+
+ rcu_read_lock();
+ b = bearer_get(net, bearer_id);
+ if (b)
+ supp = (b->bcast_addr.broadcast == TIPC_BROADCAST_SUPPORT);
+ rcu_read_unlock();
+ return supp;
+}
+
int tipc_bearer_mtu(struct net *net, u32 bearer_id)
{
int mtu = 0;
diff --git a/net/tipc/bearer.h b/net/tipc/bearer.h
index 278ff7f616f9..635c9086e19a 100644
--- a/net/tipc/bearer.h
+++ b/net/tipc/bearer.h
@@ -60,9 +60,14 @@
#define TIPC_MEDIA_TYPE_IB 2
#define TIPC_MEDIA_TYPE_UDP 3
-/* minimum bearer MTU */
+/* Minimum bearer MTU */
#define TIPC_MIN_BEARER_MTU (MAX_H_SIZE + INT_H_SIZE)
+/* Identifiers for distinguishing between broadcast/multicast and replicast
+ */
+#define TIPC_BROADCAST_SUPPORT 1
+#define TIPC_REPLICAST_SUPPORT 2
+
/**
* struct tipc_media_addr - destination address used by TIPC bearers
* @value: address info (format defined by media)
@@ -210,6 +215,7 @@ int tipc_bearer_setup(void);
void tipc_bearer_cleanup(void);
void tipc_bearer_stop(struct net *net);
int tipc_bearer_mtu(struct net *net, u32 bearer_id);
+bool tipc_bearer_bcast_support(struct net *net, u32 bearer_id);
void tipc_bearer_xmit_skb(struct net *net, u32 bearer_id,
struct sk_buff *skb,
struct tipc_media_addr *dest);
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 4e8647aef01c..ddd2dd6f77aa 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -515,6 +515,10 @@ bool tipc_link_bc_create(struct net *net, u32 ownnode, u32 peer,
if (link_is_bc_sndlink(l))
l->state = LINK_ESTABLISHED;
+ /* Disable replicast if even a single peer doesn't support it */
+ if (link_is_bc_rcvlink(l) && !(peer_caps & TIPC_BCAST_RCAST))
+ tipc_bcast_disable_rcast(net);
+
return true;
}
@@ -776,60 +780,47 @@ int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq)
/**
* link_schedule_user - schedule a message sender for wakeup after congestion
- * @link: congested link
- * @list: message that was attempted sent
+ * @l: congested link
+ * @hdr: header of message that is being sent
* Create pseudo msg to send back to user when congestion abates
- * Does not consume buffer list
*/
-static int link_schedule_user(struct tipc_link *link, struct sk_buff_head *list)
+static int link_schedule_user(struct tipc_link *l, struct tipc_msg *hdr)
{
- struct tipc_msg *msg = buf_msg(skb_peek(list));
- int imp = msg_importance(msg);
- u32 oport = msg_origport(msg);
- u32 addr = tipc_own_addr(link->net);
+ u32 dnode = tipc_own_addr(l->net);
+ u32 dport = msg_origport(hdr);
struct sk_buff *skb;
- /* This really cannot happen... */
- if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
- pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
- return -ENOBUFS;
- }
- /* Non-blocking sender: */
- if (TIPC_SKB_CB(skb_peek(list))->wakeup_pending)
- return -ELINKCONG;
-
/* Create and schedule wakeup pseudo message */
skb = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0,
- addr, addr, oport, 0, 0);
+ dnode, l->addr, dport, 0, 0);
if (!skb)
return -ENOBUFS;
- TIPC_SKB_CB(skb)->chain_sz = skb_queue_len(list);
- TIPC_SKB_CB(skb)->chain_imp = imp;
- skb_queue_tail(&link->wakeupq, skb);
- link->stats.link_congs++;
+ msg_set_dest_droppable(buf_msg(skb), true);
+ TIPC_SKB_CB(skb)->chain_imp = msg_importance(hdr);
+ skb_queue_tail(&l->wakeupq, skb);
+ l->stats.link_congs++;
return -ELINKCONG;
}
/**
* link_prepare_wakeup - prepare users for wakeup after congestion
- * @link: congested link
- * Move a number of waiting users, as permitted by available space in
- * the send queue, from link wait queue to node wait queue for wakeup
+ * @l: congested link
+ * Wake up a number of waiting users, as permitted by available space
+ * in the send queue
*/
void link_prepare_wakeup(struct tipc_link *l)
{
- int pnd[TIPC_SYSTEM_IMPORTANCE + 1] = {0,};
- int imp, lim;
struct sk_buff *skb, *tmp;
+ int imp, i = 0;
skb_queue_walk_safe(&l->wakeupq, skb, tmp) {
imp = TIPC_SKB_CB(skb)->chain_imp;
- lim = l->backlog[imp].limit;
- pnd[imp] += TIPC_SKB_CB(skb)->chain_sz;
- if ((pnd[imp] + l->backlog[imp].len) >= lim)
+ if (l->backlog[imp].len < l->backlog[imp].limit) {
+ skb_unlink(skb, &l->wakeupq);
+ skb_queue_tail(l->inputq, skb);
+ } else if (i++ > 10) {
break;
- skb_unlink(skb, &l->wakeupq);
- skb_queue_tail(l->inputq, skb);
+ }
}
}
@@ -869,8 +860,7 @@ void tipc_link_reset(struct tipc_link *l)
* @list: chain of buffers containing message
* @xmitq: returned list of packets to be sent by caller
*
- * Consumes the buffer chain, except when returning -ELINKCONG,
- * since the caller then may want to make more send attempts.
+ * Consumes the buffer chain.
* Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS
* Messages at TIPC_SYSTEM_IMPORTANCE are always accepted
*/
@@ -879,7 +869,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
{
struct tipc_msg *hdr = buf_msg(skb_peek(list));
unsigned int maxwin = l->window;
- unsigned int i, imp = msg_importance(hdr);
+ int imp = msg_importance(hdr);
unsigned int mtu = l->mtu;
u16 ack = l->rcv_nxt - 1;
u16 seqno = l->snd_nxt;
@@ -888,19 +878,22 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
struct sk_buff_head *backlogq = &l->backlogq;
struct sk_buff *skb, *_skb, *bskb;
int pkt_cnt = skb_queue_len(list);
+ int rc = 0;
- /* Match msg importance against this and all higher backlog limits: */
- if (!skb_queue_empty(backlogq)) {
- for (i = imp; i <= TIPC_SYSTEM_IMPORTANCE; i++) {
- if (unlikely(l->backlog[i].len >= l->backlog[i].limit))
- return link_schedule_user(l, list);
- }
- }
if (unlikely(msg_size(hdr) > mtu)) {
skb_queue_purge(list);
return -EMSGSIZE;
}
+ /* Allow oversubscription of one data msg per source at congestion */
+ if (unlikely(l->backlog[imp].len >= l->backlog[imp].limit)) {
+ if (imp == TIPC_SYSTEM_IMPORTANCE) {
+ pr_warn("%s<%s>, link overflow", link_rst_msg, l->name);
+ return -ENOBUFS;
+ }
+ rc = link_schedule_user(l, hdr);
+ }
+
if (pkt_cnt > 1) {
l->stats.sent_fragmented++;
l->stats.sent_fragments += pkt_cnt;
@@ -946,7 +939,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
skb_queue_splice_tail_init(list, backlogq);
}
l->snd_nxt = seqno;
- return 0;
+ return rc;
}
void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq)
@@ -1043,11 +1036,17 @@ int tipc_link_retrans(struct tipc_link *l, u16 from, u16 to,
static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb,
struct sk_buff_head *inputq)
{
- switch (msg_user(buf_msg(skb))) {
+ struct tipc_msg *hdr = buf_msg(skb);
+
+ switch (msg_user(hdr)) {
case TIPC_LOW_IMPORTANCE:
case TIPC_MEDIUM_IMPORTANCE:
case TIPC_HIGH_IMPORTANCE:
case TIPC_CRITICAL_IMPORTANCE:
+ if (unlikely(msg_type(hdr) == TIPC_MCAST_MSG)) {
+ skb_queue_tail(l->bc_rcvlink->inputq, skb);
+ return true;
+ }
case CONN_MANAGER:
skb_queue_tail(inputq, skb);
return true;
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index ab02d0742476..312ef7de57d7 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -607,6 +607,23 @@ error:
return false;
}
+bool tipc_msg_pskb_copy(u32 dst, struct sk_buff_head *msg,
+ struct sk_buff_head *cpy)
+{
+ struct sk_buff *skb, *_skb;
+
+ skb_queue_walk(msg, skb) {
+ _skb = pskb_copy(skb, GFP_ATOMIC);
+ if (!_skb) {
+ __skb_queue_purge(cpy);
+ return false;
+ }
+ msg_set_destnode(buf_msg(_skb), dst);
+ __skb_queue_tail(cpy, _skb);
+ }
+ return true;
+}
+
/* tipc_skb_queue_sorted(); sort pkt into list according to sequence number
* @list: list to be appended to
* @seqno: sequence number of buffer to add
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 2c3dc38abf9c..c843fd2bc48d 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -98,8 +98,6 @@ struct tipc_skb_cb {
u32 bytes_read;
struct sk_buff *tail;
bool validated;
- bool wakeup_pending;
- u16 chain_sz;
u16 chain_imp;
u16 ackers;
};
@@ -633,14 +631,11 @@ static inline void msg_set_bc_netid(struct tipc_msg *m, u32 id)
static inline u32 msg_link_selector(struct tipc_msg *m)
{
+ if (msg_user(m) == MSG_FRAGMENTER)
+ m = (void *)msg_data(m);
return msg_bits(m, 4, 0, 1);
}
-static inline void msg_set_link_selector(struct tipc_msg *m, u32 n)
-{
- msg_set_bits(m, 4, 0, 1, n);
-}
-
/*
* Word 5
*/
@@ -837,6 +832,8 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
int offset, int dsz, int mtu, struct sk_buff_head *list);
bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err);
bool tipc_msg_reassemble(struct sk_buff_head *list, struct sk_buff_head *rcvq);
+bool tipc_msg_pskb_copy(u32 dst, struct sk_buff_head *msg,
+ struct sk_buff_head *cpy);
void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno,
struct sk_buff *skb);
diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c
index e190460fe0d3..9be6592e4a6f 100644
--- a/net/tipc/name_table.c
+++ b/net/tipc/name_table.c
@@ -608,7 +608,7 @@ not_found:
* Returns non-zero if any off-node ports overlap
*/
int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper,
- u32 limit, struct tipc_plist *dports)
+ u32 limit, struct list_head *dports)
{
struct name_seq *seq;
struct sub_seq *sseq;
@@ -633,7 +633,7 @@ int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper,
info = sseq->info;
list_for_each_entry(publ, &info->node_list, node_list) {
if (publ->scope <= limit)
- tipc_plist_push(dports, publ->ref);
+ u32_push(dports, publ->ref);
}
if (info->cluster_list_size != info->node_list_size)
@@ -645,6 +645,39 @@ exit:
return res;
}
+/* tipc_nametbl_lookup_dst_nodes - find broadcast destination nodes
+ * - Creates list of nodes that overlap the given multicast address
+ * - Determines if any node local ports overlap
+ */
+void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower,
+ u32 upper, u32 domain,
+ struct tipc_nlist *nodes)
+{
+ struct sub_seq *sseq, *stop;
+ struct publication *publ;
+ struct name_info *info;
+ struct name_seq *seq;
+
+ rcu_read_lock();
+ seq = nametbl_find_seq(net, type);
+ if (!seq)
+ goto exit;
+
+ spin_lock_bh(&seq->lock);
+ sseq = seq->sseqs + nameseq_locate_subseq(seq, lower);
+ stop = seq->sseqs + seq->first_free;
+ for (; sseq->lower <= upper && sseq != stop; sseq++) {
+ info = sseq->info;
+ list_for_each_entry(publ, &info->zone_list, zone_list) {
+ if (tipc_in_scope(domain, publ->node))
+ tipc_nlist_add(nodes, publ->node);
+ }
+ }
+ spin_unlock_bh(&seq->lock);
+exit:
+ rcu_read_unlock();
+}
+
/*
* tipc_nametbl_publish - add name publication to network name tables
*/
@@ -1022,40 +1055,79 @@ int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb)
return skb->len;
}
-void tipc_plist_push(struct tipc_plist *pl, u32 port)
+bool u32_find(struct list_head *l, u32 value)
{
- struct tipc_plist *nl;
+ struct u32_item *item;
- if (likely(!pl->port)) {
- pl->port = port;
- return;
+ list_for_each_entry(item, l, list) {
+ if (item->value == value)
+ return true;
}
- if (pl->port == port)
- return;
- list_for_each_entry(nl, &pl->list, list) {
- if (nl->port == port)
- return;
+ return false;
+}
+
+bool u32_push(struct list_head *l, u32 value)
+{
+ struct u32_item *item;
+
+ list_for_each_entry(item, l, list) {
+ if (item->value == value)
+ return false;
+ }
+ item = kmalloc(sizeof(*item), GFP_ATOMIC);
+ if (unlikely(!item))
+ return false;
+
+ item->value = value;
+ list_add(&item->list, l);
+ return true;
+}
+
+u32 u32_pop(struct list_head *l)
+{
+ struct u32_item *item;
+ u32 value = 0;
+
+ if (list_empty(l))
+ return 0;
+ item = list_first_entry(l, typeof(*item), list);
+ value = item->value;
+ list_del(&item->list);
+ kfree(item);
+ return value;
+}
+
+bool u32_del(struct list_head *l, u32 value)
+{
+ struct u32_item *item, *tmp;
+
+ list_for_each_entry_safe(item, tmp, l, list) {
+ if (item->value != value)
+ continue;
+ list_del(&item->list);
+ kfree(item);
+ return true;
}
- nl = kmalloc(sizeof(*nl), GFP_ATOMIC);
- if (nl) {
- nl->port = port;
- list_add(&nl->list, &pl->list);
+ return false;
+}
+
+void u32_list_purge(struct list_head *l)
+{
+ struct u32_item *item, *tmp;
+
+ list_for_each_entry_safe(item, tmp, l, list) {
+ list_del(&item->list);
+ kfree(item);
}
}
-u32 tipc_plist_pop(struct tipc_plist *pl)
+int u32_list_len(struct list_head *l)
{
- struct tipc_plist *nl;
- u32 port = 0;
+ struct u32_item *item;
+ int i = 0;
- if (likely(list_empty(&pl->list))) {
- port = pl->port;
- pl->port = 0;
- return port;
+ list_for_each_entry(item, l, list) {
+ i++;
}
- nl = list_first_entry(&pl->list, typeof(*nl), list);
- port = nl->port;
- list_del(&nl->list);
- kfree(nl);
- return port;
+ return i;
}
diff --git a/net/tipc/name_table.h b/net/tipc/name_table.h
index 1524a73830f7..6ebdeb1d84a5 100644
--- a/net/tipc/name_table.h
+++ b/net/tipc/name_table.h
@@ -39,6 +39,7 @@
struct tipc_subscription;
struct tipc_plist;
+struct tipc_nlist;
/*
* TIPC name types reserved for internal TIPC use (both current and planned)
@@ -99,7 +100,10 @@ int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb);
u32 tipc_nametbl_translate(struct net *net, u32 type, u32 instance, u32 *node);
int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper,
- u32 limit, struct tipc_plist *dports);
+ u32 limit, struct list_head *dports);
+void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower,
+ u32 upper, u32 domain,
+ struct tipc_nlist *nodes);
struct publication *tipc_nametbl_publish(struct net *net, u32 type, u32 lower,
u32 upper, u32 scope, u32 port_ref,
u32 key);
@@ -116,18 +120,16 @@ void tipc_nametbl_unsubscribe(struct tipc_subscription *s);
int tipc_nametbl_init(struct net *net);
void tipc_nametbl_stop(struct net *net);
-struct tipc_plist {
+struct u32_item {
struct list_head list;
- u32 port;
+ u32 value;
};
-static inline void tipc_plist_init(struct tipc_plist *pl)
-{
- INIT_LIST_HEAD(&pl->list);
- pl->port = 0;
-}
-
-void tipc_plist_push(struct tipc_plist *pl, u32 port);
-u32 tipc_plist_pop(struct tipc_plist *pl);
+bool u32_push(struct list_head *l, u32 value);
+u32 u32_pop(struct list_head *l);
+bool u32_find(struct list_head *l, u32 value);
+bool u32_del(struct list_head *l, u32 value);
+void u32_list_purge(struct list_head *l);
+int u32_list_len(struct list_head *l);
#endif
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 27753325e06e..e9295fa3a554 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -1172,7 +1172,7 @@ msg_full:
* @list: chain of buffers containing message
* @dnode: address of destination node
* @selector: a number used for deterministic link selection
- * Consumes the buffer chain, except when returning -ELINKCONG
+ * Consumes the buffer chain.
* Returns 0 if success, otherwise: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE,-ENOBUF
*/
int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
@@ -1211,10 +1211,10 @@ int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
spin_unlock_bh(&le->lock);
tipc_node_read_unlock(n);
- if (likely(rc == 0))
- tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);
- else if (rc == -ENOBUFS)
+ if (unlikely(rc == -ENOBUFS))
tipc_node_link_down(n, bearer_id, false);
+ else
+ tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);
tipc_node_put(n);
@@ -1226,20 +1226,15 @@ int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
* messages, which will not be rejected
* The only exception is datagram messages rerouted after secondary
* lookup, which are rare and safe to dispose of anyway.
- * TODO: Return real return value, and let callers use
- * tipc_wait_for_sendpkt() where applicable
*/
int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
u32 selector)
{
struct sk_buff_head head;
- int rc;
skb_queue_head_init(&head);
__skb_queue_tail(&head, skb);
- rc = tipc_node_xmit(net, &head, dnode, selector);
- if (rc == -ELINKCONG)
- kfree_skb(skb);
+ tipc_node_xmit(net, &head, dnode, selector);
return 0;
}
@@ -1267,6 +1262,19 @@ void tipc_node_broadcast(struct net *net, struct sk_buff *skb)
kfree_skb(skb);
}
+static void tipc_node_mcast_rcv(struct tipc_node *n)
+{
+ struct tipc_bclink_entry *be = &n->bc_entry;
+
+ /* 'arrvq' is under inputq2's lock protection */
+ spin_lock_bh(&be->inputq2.lock);
+ spin_lock_bh(&be->inputq1.lock);
+ skb_queue_splice_tail_init(&be->inputq1, &be->arrvq);
+ spin_unlock_bh(&be->inputq1.lock);
+ spin_unlock_bh(&be->inputq2.lock);
+ tipc_sk_mcast_rcv(n->net, &be->arrvq, &be->inputq2);
+}
+
static void tipc_node_bc_sync_rcv(struct tipc_node *n, struct tipc_msg *hdr,
int bearer_id, struct sk_buff_head *xmitq)
{
@@ -1340,15 +1348,8 @@ static void tipc_node_bc_rcv(struct net *net, struct sk_buff *skb, int bearer_id
if (!skb_queue_empty(&xmitq))
tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);
- /* Deliver. 'arrvq' is under inputq2's lock protection */
- if (!skb_queue_empty(&be->inputq1)) {
- spin_lock_bh(&be->inputq2.lock);
- spin_lock_bh(&be->inputq1.lock);
- skb_queue_splice_tail_init(&be->inputq1, &be->arrvq);
- spin_unlock_bh(&be->inputq1.lock);
- spin_unlock_bh(&be->inputq2.lock);
- tipc_sk_mcast_rcv(net, &be->arrvq, &be->inputq2);
- }
+ if (!skb_queue_empty(&be->inputq1))
+ tipc_node_mcast_rcv(n);
if (rc & TIPC_LINK_DOWN_EVT) {
/* Reception reassembly failure => reset all links to peer */
@@ -1575,6 +1576,9 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
if (unlikely(!skb_queue_empty(&n->bc_entry.namedq)))
tipc_named_rcv(net, &n->bc_entry.namedq);
+ if (unlikely(!skb_queue_empty(&n->bc_entry.inputq1)))
+ tipc_node_mcast_rcv(n);
+
if (!skb_queue_empty(&le->inputq))
tipc_sk_rcv(net, &le->inputq);
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 39ef54c1f2ad..898c22916984 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -47,11 +47,13 @@
enum {
TIPC_BCAST_SYNCH = (1 << 1),
TIPC_BCAST_STATE_NACK = (1 << 2),
- TIPC_BLOCK_FLOWCTL = (1 << 3)
+ TIPC_BLOCK_FLOWCTL = (1 << 3),
+ TIPC_BCAST_RCAST = (1 << 4)
};
#define TIPC_NODE_CAPABILITIES (TIPC_BCAST_SYNCH | \
TIPC_BCAST_STATE_NACK | \
+ TIPC_BCAST_RCAST | \
TIPC_BLOCK_FLOWCTL)
#define INVALID_BEARER_ID -1
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 800caaa699a1..103d1fd058c0 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -67,16 +67,19 @@ enum {
* @max_pkt: maximum packet size "hint" used when building messages sent by port
* @portid: unique port identity in TIPC socket hash table
* @phdr: preformatted message header used when sending messages
+ * #cong_links: list of congested links
* @publications: list of publications for port
+ * @blocking_link: address of the congested link we are currently sleeping on
* @pub_count: total # of publications port has made during its lifetime
* @probing_state:
* @conn_timeout: the time we can wait for an unresponded setup request
* @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue
- * @link_cong: non-zero if owner must sleep because of link congestion
+ * @cong_link_cnt: number of congested links
* @sent_unacked: # messages sent by socket, and not yet acked by peer
* @rcv_unacked: # messages read by user, but not yet acked back to peer
* @peer: 'connected' peer for dgram/rdm
* @node: hash table node
+ * @mc_method: cookie for use between socket and broadcast layer
* @rcu: rcu struct for tipc_sock
*/
struct tipc_sock {
@@ -87,13 +90,13 @@ struct tipc_sock {
u32 max_pkt;
u32 portid;
struct tipc_msg phdr;
- struct list_head sock_list;
+ struct list_head cong_links;
struct list_head publications;
u32 pub_count;
uint conn_timeout;
atomic_t dupl_rcvcnt;
bool probe_unacked;
- bool link_cong;
+ u16 cong_link_cnt;
u16 snt_unacked;
u16 snd_win;
u16 peer_caps;
@@ -101,6 +104,7 @@ struct tipc_sock {
u16 rcv_win;
struct sockaddr_tipc peer;
struct rhash_head node;
+ struct tipc_mc_method mc_method;
struct rcu_head rcu;
};
@@ -110,7 +114,6 @@ static void tipc_write_space(struct sock *sk);
static void tipc_sock_destruct(struct sock *sk);
static int tipc_release(struct socket *sock);
static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags);
-static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p);
static void tipc_sk_timeout(unsigned long data);
static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
struct tipc_name_seq const *seq);
@@ -119,8 +122,7 @@ static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid);
static int tipc_sk_insert(struct tipc_sock *tsk);
static void tipc_sk_remove(struct tipc_sock *tsk);
-static int __tipc_send_stream(struct socket *sock, struct msghdr *m,
- size_t dsz);
+static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz);
static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz);
static const struct proto_ops packet_ops;
@@ -334,6 +336,49 @@ static int tipc_set_sk_state(struct sock *sk, int state)
return res;
}
+static int tipc_sk_sock_err(struct socket *sock, long *timeout)
+{
+ struct sock *sk = sock->sk;
+ int err = sock_error(sk);
+ int typ = sock->type;
+
+ if (err)
+ return err;
+ if (typ == SOCK_STREAM || typ == SOCK_SEQPACKET) {
+ if (sk->sk_state == TIPC_DISCONNECTING)
+ return -EPIPE;
+ else if (!tipc_sk_connected(sk))
+ return -ENOTCONN;
+ }
+ if (!*timeout)
+ return -EAGAIN;
+ if (signal_pending(current))
+ return sock_intr_errno(*timeout);
+
+ return 0;
+}
+
+#define tipc_wait_for_cond(sock_, timeout_, condition_) \
+({ \
+ int rc_ = 0; \
+ int done_ = 0; \
+ \
+ while (!(condition_) && !done_) { \
+ struct sock *sk_ = sock->sk; \
+ DEFINE_WAIT_FUNC(wait_, woken_wake_function); \
+ \
+ rc_ = tipc_sk_sock_err(sock_, timeout_); \
+ if (rc_) \
+ break; \
+ prepare_to_wait(sk_sleep(sk_), &wait_, \
+ TASK_INTERRUPTIBLE); \
+ done_ = sk_wait_event(sk_, timeout_, \
+ (condition_), &wait_); \
+ remove_wait_queue(sk_sleep(sk_), &wait_); \
+ } \
+ rc_; \
+})
+
/**
* tipc_sk_create - create a TIPC socket
* @net: network namespace (must be default network)
@@ -382,6 +427,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
tsk = tipc_sk(sk);
tsk->max_pkt = MAX_PKT_DEFAULT;
INIT_LIST_HEAD(&tsk->publications);
+ INIT_LIST_HEAD(&tsk->cong_links);
msg = &tsk->phdr;
tn = net_generic(sock_net(sk), tipc_net_id);
tipc_msg_init(tn->own_addr, msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG,
@@ -432,9 +478,14 @@ static void __tipc_shutdown(struct socket *sock, int error)
struct sock *sk = sock->sk;
struct tipc_sock *tsk = tipc_sk(sk);
struct net *net = sock_net(sk);
+ long timeout = CONN_TIMEOUT_DEFAULT;
u32 dnode = tsk_peer_node(tsk);
struct sk_buff *skb;
+ /* Avoid that hi-prio shutdown msgs bypass msgs in link wakeup queue */
+ tipc_wait_for_cond(sock, &timeout, (!tsk->cong_link_cnt &&
+ !tsk_conn_cong(tsk)));
+
/* Reject all unreceived messages, except on an active connection
* (which disconnects locally & sends a 'FIN+' to peer).
*/
@@ -505,7 +556,8 @@ static int tipc_release(struct socket *sock)
/* Reject any messages that accumulated in backlog queue */
release_sock(sk);
-
+ u32_list_purge(&tsk->cong_links);
+ tsk->cong_link_cnt = 0;
call_rcu(&tsk->rcu, tipc_sk_callback);
sock->sk = NULL;
@@ -648,7 +700,7 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
switch (sk->sk_state) {
case TIPC_ESTABLISHED:
- if (!tsk->link_cong && !tsk_conn_cong(tsk))
+ if (!tsk->cong_link_cnt && !tsk_conn_cong(tsk))
mask |= POLLOUT;
/* fall thru' */
case TIPC_LISTEN:
@@ -657,7 +709,7 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
mask |= (POLLIN | POLLRDNORM);
break;
case TIPC_OPEN:
- if (!tsk->link_cong)
+ if (!tsk->cong_link_cnt)
mask |= POLLOUT;
if (tipc_sk_type_connectionless(sk) &&
(!skb_queue_empty(&sk->sk_receive_queue)))
@@ -676,63 +728,60 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
* @sock: socket structure
* @seq: destination address
* @msg: message to send
- * @dsz: total length of message data
- * @timeo: timeout to wait for wakeup
+ * @dlen: length of data to send
+ * @timeout: timeout to wait for wakeup
*
* Called from function tipc_sendmsg(), which has done all sanity checks
* Returns the number of bytes sent on success, or errno
*/
static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
- struct msghdr *msg, size_t dsz, long timeo)
+ struct msghdr *msg, size_t dlen, long timeout)
{
struct sock *sk = sock->sk;
struct tipc_sock *tsk = tipc_sk(sk);
+ struct tipc_msg *hdr = &tsk->phdr;
struct net *net = sock_net(sk);
- struct tipc_msg *mhdr = &tsk->phdr;
- struct sk_buff_head pktchain;
- struct iov_iter save = msg->msg_iter;
- uint mtu;
+ int mtu = tipc_bcast_get_mtu(net);
+ struct tipc_mc_method *method = &tsk->mc_method;
+ u32 domain = addr_domain(net, TIPC_CLUSTER_SCOPE);
+ struct sk_buff_head pkts;
+ struct tipc_nlist dsts;
int rc;
- if (!timeo && tsk->link_cong)
- return -ELINKCONG;
+ /* Block or return if any destination link is congested */
+ rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt);
+ if (unlikely(rc))
+ return rc;
- msg_set_type(mhdr, TIPC_MCAST_MSG);
- msg_set_lookup_scope(mhdr, TIPC_CLUSTER_SCOPE);
- msg_set_destport(mhdr, 0);
- msg_set_destnode(mhdr, 0);
- msg_set_nametype(mhdr, seq->type);
- msg_set_namelower(mhdr, seq->lower);
- msg_set_nameupper(mhdr, seq->upper);
- msg_set_hdr_sz(mhdr, MCAST_H_SIZE);
+ /* Lookup destination nodes */
+ tipc_nlist_init(&dsts, tipc_own_addr(net));
+ tipc_nametbl_lookup_dst_nodes(net, seq->type, seq->lower,
+ seq->upper, domain, &dsts);
+ if (!dsts.local && !dsts.remote)
+ return -EHOSTUNREACH;
- skb_queue_head_init(&pktchain);
+ /* Build message header */
+ msg_set_type(hdr, TIPC_MCAST_MSG);
+ msg_set_hdr_sz(hdr, MCAST_H_SIZE);
+ msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE);
+ msg_set_destport(hdr, 0);
+ msg_set_destnode(hdr, 0);
+ msg_set_nametype(hdr, seq->type);
+ msg_set_namelower(hdr, seq->lower);
+ msg_set_nameupper(hdr, seq->upper);
-new_mtu:
- mtu = tipc_bcast_get_mtu(net);
- rc = tipc_msg_build(mhdr, msg, 0, dsz, mtu, &pktchain);
- if (unlikely(rc < 0))
- return rc;
+ /* Build message as chain of buffers */
+ skb_queue_head_init(&pkts);
+ rc = tipc_msg_build(hdr, msg, 0, dlen, mtu, &pkts);
- do {
- rc = tipc_bcast_xmit(net, &pktchain);
- if (likely(!rc))
- return dsz;
-
- if (rc == -ELINKCONG) {
- tsk->link_cong = 1;
- rc = tipc_wait_for_sndmsg(sock, &timeo);
- if (!rc)
- continue;
- }
- __skb_queue_purge(&pktchain);
- if (rc == -EMSGSIZE) {
- msg->msg_iter = save;
- goto new_mtu;
- }
- break;
- } while (1);
- return rc;
+ /* Send message if build was successful */
+ if (unlikely(rc == dlen))
+ rc = tipc_mcast_xmit(net, &pkts, method, &dsts,
+ &tsk->cong_link_cnt);
+
+ tipc_nlist_purge(&dsts);
+
+ return rc ? rc : dlen;
}
/**
@@ -746,7 +795,7 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
struct sk_buff_head *inputq)
{
struct tipc_msg *msg;
- struct tipc_plist dports;
+ struct list_head dports;
u32 portid;
u32 scope = TIPC_CLUSTER_SCOPE;
struct sk_buff_head tmpq;
@@ -754,7 +803,7 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
struct sk_buff *skb, *_skb;
__skb_queue_head_init(&tmpq);
- tipc_plist_init(&dports);
+ INIT_LIST_HEAD(&dports);
skb = tipc_skb_peek(arrvq, &inputq->lock);
for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) {
@@ -768,8 +817,8 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
tipc_nametbl_mc_translate(net,
msg_nametype(msg), msg_namelower(msg),
msg_nameupper(msg), scope, &dports);
- portid = tipc_plist_pop(&dports);
- for (; portid; portid = tipc_plist_pop(&dports)) {
+ portid = u32_pop(&dports);
+ for (; portid; portid = u32_pop(&dports)) {
_skb = __pskb_copy(skb, hsz, GFP_ATOMIC);
if (_skb) {
msg_set_destport(buf_msg(_skb), portid);
@@ -830,31 +879,6 @@ exit:
kfree_skb(skb);
}
-static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)
-{
- DEFINE_WAIT_FUNC(wait, woken_wake_function);
- struct sock *sk = sock->sk;
- struct tipc_sock *tsk = tipc_sk(sk);
- int done;
-
- do {
- int err = sock_error(sk);
- if (err)
- return err;
- if (sk->sk_shutdown & SEND_SHUTDOWN)
- return -EPIPE;
- if (!*timeo_p)
- return -EAGAIN;
- if (signal_pending(current))
- return sock_intr_errno(*timeo_p);
-
- add_wait_queue(sk_sleep(sk), &wait);
- done = sk_wait_event(sk, timeo_p, !tsk->link_cong, &wait);
- remove_wait_queue(sk_sleep(sk), &wait);
- } while (!done);
- return 0;
-}
-
/**
* tipc_sendmsg - send message in connectionless manner
* @sock: socket structure
@@ -881,35 +905,38 @@ static int tipc_sendmsg(struct socket *sock,
return ret;
}
-static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz)
+static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
{
- DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
struct sock *sk = sock->sk;
- struct tipc_sock *tsk = tipc_sk(sk);
struct net *net = sock_net(sk);
- struct tipc_msg *mhdr = &tsk->phdr;
- u32 dnode, dport;
- struct sk_buff_head pktchain;
- bool is_connectionless = tipc_sk_type_connectionless(sk);
- struct sk_buff *skb;
+ struct tipc_sock *tsk = tipc_sk(sk);
+ DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
+ long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
+ struct list_head *clinks = &tsk->cong_links;
+ bool syn = !tipc_sk_type_connectionless(sk);
+ struct tipc_msg *hdr = &tsk->phdr;
struct tipc_name_seq *seq;
- struct iov_iter save;
- u32 mtu;
- long timeo;
- int rc;
+ struct sk_buff_head pkts;
+ u32 type, inst, domain;
+ u32 dnode, dport;
+ int mtu, rc;
- if (dsz > TIPC_MAX_USER_MSG_SIZE)
+ if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE))
return -EMSGSIZE;
+
if (unlikely(!dest)) {
- if (is_connectionless && tsk->peer.family == AF_TIPC)
- dest = &tsk->peer;
- else
+ dest = &tsk->peer;
+ if (!syn || dest->family != AF_TIPC)
return -EDESTADDRREQ;
- } else if (unlikely(m->msg_namelen < sizeof(*dest)) ||
- dest->family != AF_TIPC) {
- return -EINVAL;
}
- if (!is_connectionless) {
+
+ if (unlikely(m->msg_namelen < sizeof(*dest)))
+ return -EINVAL;
+
+ if (unlikely(dest->family != AF_TIPC))
+ return -EINVAL;
+
+ if (unlikely(syn)) {
if (sk->sk_state == TIPC_LISTEN)
return -EPIPE;
if (sk->sk_state != TIPC_OPEN)
@@ -921,102 +948,62 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz)
tsk->conn_instance = dest->addr.name.name.instance;
}
}
- seq = &dest->addr.nameseq;
- timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
- if (dest->addrtype == TIPC_ADDR_MCAST) {
- return tipc_sendmcast(sock, seq, m, dsz, timeo);
- } else if (dest->addrtype == TIPC_ADDR_NAME) {
- u32 type = dest->addr.name.name.type;
- u32 inst = dest->addr.name.name.instance;
- u32 domain = dest->addr.name.domain;
+ seq = &dest->addr.nameseq;
+ if (dest->addrtype == TIPC_ADDR_MCAST)
+ return tipc_sendmcast(sock, seq, m, dlen, timeout);
+ if (dest->addrtype == TIPC_ADDR_NAME) {
+ type = dest->addr.name.name.type;
+ inst = dest->addr.name.name.instance;
+ domain = dest->addr.name.domain;
dnode = domain;
- msg_set_type(mhdr, TIPC_NAMED_MSG);
- msg_set_hdr_sz(mhdr, NAMED_H_SIZE);
- msg_set_nametype(mhdr, type);
- msg_set_nameinst(mhdr, inst);
- msg_set_lookup_scope(mhdr, tipc_addr_scope(domain));
+ msg_set_type(hdr, TIPC_NAMED_MSG);
+ msg_set_hdr_sz(hdr, NAMED_H_SIZE);
+ msg_set_nametype(hdr, type);
+ msg_set_nameinst(hdr, inst);
+ msg_set_lookup_scope(hdr, tipc_addr_scope(domain));
dport = tipc_nametbl_translate(net, type, inst, &dnode);
- msg_set_destnode(mhdr, dnode);
- msg_set_destport(mhdr, dport);
+ msg_set_destnode(hdr, dnode);
+ msg_set_destport(hdr, dport);
if (unlikely(!dport && !dnode))
return -EHOSTUNREACH;
+
} else if (dest->addrtype == TIPC_ADDR_ID) {
dnode = dest->addr.id.node;
- msg_set_type(mhdr, TIPC_DIRECT_MSG);
- msg_set_lookup_scope(mhdr, 0);
- msg_set_destnode(mhdr, dnode);
- msg_set_destport(mhdr, dest->addr.id.ref);
- msg_set_hdr_sz(mhdr, BASIC_H_SIZE);
+ msg_set_type(hdr, TIPC_DIRECT_MSG);
+ msg_set_lookup_scope(hdr, 0);
+ msg_set_destnode(hdr, dnode);
+ msg_set_destport(hdr, dest->addr.id.ref);
+ msg_set_hdr_sz(hdr, BASIC_H_SIZE);
}
- skb_queue_head_init(&pktchain);
- save = m->msg_iter;
-new_mtu:
- mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
- rc = tipc_msg_build(mhdr, m, 0, dsz, mtu, &pktchain);
- if (rc < 0)
+ /* Block or return if destination link is congested */
+ rc = tipc_wait_for_cond(sock, &timeout, !u32_find(clinks, dnode));
+ if (unlikely(rc))
return rc;
- do {
- skb = skb_peek(&pktchain);
- TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong;
- rc = tipc_node_xmit(net, &pktchain, dnode, tsk->portid);
- if (likely(!rc)) {
- if (!is_connectionless)
- tipc_set_sk_state(sk, TIPC_CONNECTING);
- return dsz;
- }
- if (rc == -ELINKCONG) {
- tsk->link_cong = 1;
- rc = tipc_wait_for_sndmsg(sock, &timeo);
- if (!rc)
- continue;
- }
- __skb_queue_purge(&pktchain);
- if (rc == -EMSGSIZE) {
- m->msg_iter = save;
- goto new_mtu;
- }
- break;
- } while (1);
-
- return rc;
-}
+ skb_queue_head_init(&pkts);
+ mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
+ rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
+ if (unlikely(rc != dlen))
+ return rc;
-static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)
-{
- DEFINE_WAIT_FUNC(wait, woken_wake_function);
- struct sock *sk = sock->sk;
- struct tipc_sock *tsk = tipc_sk(sk);
- int done;
+ rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
+ if (unlikely(rc == -ELINKCONG)) {
+ u32_push(clinks, dnode);
+ tsk->cong_link_cnt++;
+ rc = 0;
+ }
- do {
- int err = sock_error(sk);
- if (err)
- return err;
- if (sk->sk_state == TIPC_DISCONNECTING)
- return -EPIPE;
- else if (!tipc_sk_connected(sk))
- return -ENOTCONN;
- if (!*timeo_p)
- return -EAGAIN;
- if (signal_pending(current))
- return sock_intr_errno(*timeo_p);
+ if (unlikely(syn && !rc))
+ tipc_set_sk_state(sk, TIPC_CONNECTING);
- add_wait_queue(sk_sleep(sk), &wait);
- done = sk_wait_event(sk, timeo_p,
- (!tsk->link_cong &&
- !tsk_conn_cong(tsk)) ||
- !tipc_sk_connected(sk), &wait);
- remove_wait_queue(sk_sleep(sk), &wait);
- } while (!done);
- return 0;
+ return rc ? rc : dlen;
}
/**
- * tipc_send_stream - send stream-oriented data
+ * tipc_sendstream - send stream-oriented data
* @sock: socket structure
* @m: data to send
* @dsz: total length of data to be transmitted
@@ -1026,94 +1013,69 @@ static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)
* Returns the number of bytes sent on success (or partial success),
* or errno if no data sent
*/
-static int tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz)
+static int tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz)
{
struct sock *sk = sock->sk;
int ret;
lock_sock(sk);
- ret = __tipc_send_stream(sock, m, dsz);
+ ret = __tipc_sendstream(sock, m, dsz);
release_sock(sk);
return ret;
}
-static int __tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz)
+static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen)
{
struct sock *sk = sock->sk;
- struct net *net = sock_net(sk);
- struct tipc_sock *tsk = tipc_sk(sk);
- struct tipc_msg *mhdr = &tsk->phdr;
- struct sk_buff_head pktchain;
DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
- u32 portid = tsk->portid;
- int rc = -EINVAL;
- long timeo;
- u32 dnode;
- uint mtu, send, sent = 0;
- struct iov_iter save;
- int hlen = MIN_H_SIZE;
-
- /* Handle implied connection establishment */
- if (unlikely(dest)) {
- rc = __tipc_sendmsg(sock, m, dsz);
- hlen = msg_hdr_sz(mhdr);
- if (dsz && (dsz == rc))
- tsk->snt_unacked = tsk_inc(tsk, dsz + hlen);
- return rc;
- }
- if (dsz > (uint)INT_MAX)
- return -EMSGSIZE;
-
- if (unlikely(!tipc_sk_connected(sk))) {
- if (sk->sk_state == TIPC_DISCONNECTING)
- return -EPIPE;
- else
- return -ENOTCONN;
- }
+ long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
+ struct tipc_sock *tsk = tipc_sk(sk);
+ struct tipc_msg *hdr = &tsk->phdr;
+ struct net *net = sock_net(sk);
+ struct sk_buff_head pkts;
+ u32 dnode = tsk_peer_node(tsk);
+ int send, sent = 0;
+ int rc = 0;
- timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
- if (!timeo && tsk->link_cong)
- return -ELINKCONG;
+ skb_queue_head_init(&pkts);
- dnode = tsk_peer_node(tsk);
- skb_queue_head_init(&pktchain);
+ if (unlikely(dlen > INT_MAX))
+ return -EMSGSIZE;
-next:
- save = m->msg_iter;
- mtu = tsk->max_pkt;
- send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE);
- rc = tipc_msg_build(mhdr, m, sent, send, mtu, &pktchain);
- if (unlikely(rc < 0))
+ /* Handle implicit connection setup */
+ if (unlikely(dest)) {
+ rc = __tipc_sendmsg(sock, m, dlen);
+ if (dlen && (dlen == rc))
+ tsk->snt_unacked = tsk_inc(tsk, dlen + msg_hdr_sz(hdr));
return rc;
+ }
do {
- if (likely(!tsk_conn_cong(tsk))) {
- rc = tipc_node_xmit(net, &pktchain, dnode, portid);
- if (likely(!rc)) {
- tsk->snt_unacked += tsk_inc(tsk, send + hlen);
- sent += send;
- if (sent == dsz)
- return dsz;
- goto next;
- }
- if (rc == -EMSGSIZE) {
- __skb_queue_purge(&pktchain);
- tsk->max_pkt = tipc_node_get_mtu(net, dnode,
- portid);
- m->msg_iter = save;
- goto next;
- }
- if (rc != -ELINKCONG)
- break;
+ rc = tipc_wait_for_cond(sock, &timeout,
+ (!tsk->cong_link_cnt &&
+ !tsk_conn_cong(tsk) &&
+ tipc_sk_connected(sk)));
+ if (unlikely(rc))
+ break;
+
+ send = min_t(size_t, dlen - sent, TIPC_MAX_USER_MSG_SIZE);
+ rc = tipc_msg_build(hdr, m, sent, send, tsk->max_pkt, &pkts);
+ if (unlikely(rc != send))
+ break;
- tsk->link_cong = 1;
+ rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
+ if (unlikely(rc == -ELINKCONG)) {
+ tsk->cong_link_cnt = 1;
+ rc = 0;
+ }
+ if (likely(!rc)) {
+ tsk->snt_unacked += tsk_inc(tsk, send + MIN_H_SIZE);
+ sent += send;
}
- rc = tipc_wait_for_sndpkt(sock, &timeo);
- } while (!rc);
+ } while (sent < dlen && !rc);
- __skb_queue_purge(&pktchain);
- return sent ? sent : rc;
+ return rc ? rc : sent;
}
/**
@@ -1131,7 +1093,7 @@ static int tipc_send_packet(struct socket *sock, struct msghdr *m, size_t dsz)
if (dsz > TIPC_MAX_USER_MSG_SIZE)
return -EMSGSIZE;
- return tipc_send_stream(sock, m, dsz);
+ return tipc_sendstream(sock, m, dsz);
}
/* tipc_sk_finish_conn - complete the setup of a connection
@@ -1698,6 +1660,7 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb,
unsigned int limit = rcvbuf_limit(sk, skb);
int err = TIPC_OK;
int usr = msg_user(hdr);
+ u32 onode;
if (unlikely(msg_user(hdr) == CONN_MANAGER)) {
tipc_sk_proto_rcv(tsk, skb, xmitq);
@@ -1705,8 +1668,10 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb,
}
if (unlikely(usr == SOCK_WAKEUP)) {
+ onode = msg_orignode(hdr);
kfree_skb(skb);
- tsk->link_cong = 0;
+ u32_del(&tsk->cong_links, onode);
+ tsk->cong_link_cnt--;
sk->sk_write_space(sk);
return false;
}
@@ -2114,7 +2079,7 @@ static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags)
struct msghdr m = {NULL,};
tsk_advance_rx_queue(sk);
- __tipc_send_stream(new_sock, &m, 0);
+ __tipc_sendstream(new_sock, &m, 0);
} else {
__skb_dequeue(&sk->sk_receive_queue);
__skb_queue_head(&new_sk->sk_receive_queue, buf);
@@ -2382,18 +2347,29 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
{
struct sock *sk = sock->sk;
struct tipc_sock *tsk = tipc_sk(sk);
- u32 value;
- int res;
+ u32 value = 0;
+ int res = 0;
if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
return 0;
if (lvl != SOL_TIPC)
return -ENOPROTOOPT;
- if (ol < sizeof(value))
- return -EINVAL;
- res = get_user(value, (u32 __user *)ov);
- if (res)
- return res;
+
+ switch (opt) {
+ case TIPC_IMPORTANCE:
+ case TIPC_SRC_DROPPABLE:
+ case TIPC_DEST_DROPPABLE:
+ case TIPC_CONN_TIMEOUT:
+ if (ol < sizeof(value))
+ return -EINVAL;
+ res = get_user(value, (u32 __user *)ov);
+ if (res)
+ return res;
+ break;
+ default:
+ if (ov || ol)
+ return -EINVAL;
+ }
lock_sock(sk);
@@ -2412,7 +2388,14 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
break;
case TIPC_CONN_TIMEOUT:
tipc_sk(sk)->conn_timeout = value;
- /* no need to set "res", since already 0 at this point */
+ break;
+ case TIPC_MCAST_BROADCAST:
+ tsk->mc_method.rcast = false;
+ tsk->mc_method.mandatory = true;
+ break;
+ case TIPC_MCAST_REPLICAST:
+ tsk->mc_method.rcast = true;
+ tsk->mc_method.mandatory = true;
break;
default:
res = -EINVAL;
@@ -2575,7 +2558,7 @@ static const struct proto_ops stream_ops = {
.shutdown = tipc_shutdown,
.setsockopt = tipc_setsockopt,
.getsockopt = tipc_getsockopt,
- .sendmsg = tipc_send_stream,
+ .sendmsg = tipc_sendstream,
.recvmsg = tipc_recv_stream,
.mmap = sock_no_mmap,
.sendpage = sock_no_sendpage
diff --git a/net/tipc/udp_media.c b/net/tipc/udp_media.c
index b58dc95f3d35..46061cf48cd1 100644
--- a/net/tipc/udp_media.c
+++ b/net/tipc/udp_media.c
@@ -113,7 +113,7 @@ static void tipc_udp_media_addr_set(struct tipc_media_addr *addr,
memcpy(addr->value, ua, sizeof(struct udp_media_addr));
if (tipc_udp_is_mcast_addr(ua))
- addr->broadcast = 1;
+ addr->broadcast = TIPC_BROADCAST_SUPPORT;
}
/* tipc_udp_addr2str - convert ip/udp address to string */
@@ -229,7 +229,7 @@ static int tipc_udp_send_msg(struct net *net, struct sk_buff *skb,
goto out;
}
- if (!addr->broadcast || list_empty(&ub->rcast.list))
+ if (addr->broadcast != TIPC_REPLICAST_SUPPORT)
return tipc_udp_xmit(net, skb, ub, src, dst);
/* Replicast, send an skb to each configured IP address */
@@ -296,7 +296,7 @@ static int tipc_udp_rcast_add(struct tipc_bearer *b,
else if (ntohs(addr->proto) == ETH_P_IPV6)
pr_info("New replicast peer: %pI6\n", &rcast->addr.ipv6);
#endif
-
+ b->bcast_addr.broadcast = TIPC_REPLICAST_SUPPORT;
list_add_rcu(&rcast->list, &ub->rcast.list);
return 0;
}
@@ -681,7 +681,7 @@ static int tipc_udp_enable(struct net *net, struct tipc_bearer *b,
goto err;
b->bcast_addr.media_id = TIPC_MEDIA_TYPE_UDP;
- b->bcast_addr.broadcast = 1;
+ b->bcast_addr.broadcast = TIPC_BROADCAST_SUPPORT;
rcu_assign_pointer(b->media_ptr, ub);
rcu_assign_pointer(ub->bearer, b);
tipc_udp_media_addr_set(&b->addr, &local);