diff options
Diffstat (limited to 'net/tipc')
-rw-r--r-- | net/tipc/bcast.c | 204 | ||||
-rw-r--r-- | net/tipc/bcast.h | 33 | ||||
-rw-r--r-- | net/tipc/bearer.c | 15 | ||||
-rw-r--r-- | net/tipc/bearer.h | 8 | ||||
-rw-r--r-- | net/tipc/link.c | 87 | ||||
-rw-r--r-- | net/tipc/msg.c | 17 | ||||
-rw-r--r-- | net/tipc/msg.h | 11 | ||||
-rw-r--r-- | net/tipc/name_table.c | 128 | ||||
-rw-r--r-- | net/tipc/name_table.h | 24 | ||||
-rw-r--r-- | net/tipc/node.c | 42 | ||||
-rw-r--r-- | net/tipc/node.h | 4 | ||||
-rw-r--r-- | net/tipc/socket.c | 495 | ||||
-rw-r--r-- | net/tipc/udp_media.c | 8 |
13 files changed, 672 insertions, 404 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index aa1babbea385..7d99029df342 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -1,7 +1,7 @@ /* * net/tipc/bcast.c: TIPC broadcast code * - * Copyright (c) 2004-2006, 2014-2015, Ericsson AB + * Copyright (c) 2004-2006, 2014-2016, Ericsson AB * Copyright (c) 2004, Intel Corporation. * Copyright (c) 2005, 2010-2011, Wind River Systems * All rights reserved. @@ -39,9 +39,8 @@ #include "socket.h" #include "msg.h" #include "bcast.h" -#include "name_distr.h" #include "link.h" -#include "node.h" +#include "name_table.h" #define BCLINK_WIN_DEFAULT 50 /* bcast link window size (default) */ #define BCLINK_WIN_MIN 32 /* bcast minimum link window size */ @@ -54,12 +53,20 @@ const char tipc_bclink_name[] = "broadcast-link"; * @inputq: data input queue; will only carry SOCK_WAKEUP messages * @dest: array keeping number of reachable destinations per bearer * @primary_bearer: a bearer having links to all broadcast destinations, if any + * @bcast_support: indicates if primary bearer, if any, supports broadcast + * @rcast_support: indicates if all peer nodes support replicast + * @rc_ratio: dest count as percentage of cluster size where send method changes + * @bc_threshold: calculated drom rc_ratio; if dests > threshold use broadcast */ struct tipc_bc_base { struct tipc_link *link; struct sk_buff_head inputq; int dests[MAX_BEARERS]; int primary_bearer; + bool bcast_support; + bool rcast_support; + int rc_ratio; + int bc_threshold; }; static struct tipc_bc_base *tipc_bc_base(struct net *net) @@ -69,7 +76,20 @@ static struct tipc_bc_base *tipc_bc_base(struct net *net) int tipc_bcast_get_mtu(struct net *net) { - return tipc_link_mtu(tipc_bc_sndlink(net)); + return tipc_link_mtu(tipc_bc_sndlink(net)) - INT_H_SIZE; +} + +void tipc_bcast_disable_rcast(struct net *net) +{ + tipc_bc_base(net)->rcast_support = false; +} + +static void tipc_bcbase_calc_bc_threshold(struct net *net) +{ + struct tipc_bc_base *bb = tipc_bc_base(net); + int cluster_size = tipc_link_bc_peers(tipc_bc_sndlink(net)); + + bb->bc_threshold = 1 + (cluster_size * bb->rc_ratio / 100); } /* tipc_bcbase_select_primary(): find a bearer with links to all destinations, @@ -79,9 +99,10 @@ static void tipc_bcbase_select_primary(struct net *net) { struct tipc_bc_base *bb = tipc_bc_base(net); int all_dests = tipc_link_bc_peers(bb->link); - int i, mtu; + int i, mtu, prim; bb->primary_bearer = INVALID_BEARER_ID; + bb->bcast_support = true; if (!all_dests) return; @@ -93,7 +114,7 @@ static void tipc_bcbase_select_primary(struct net *net) mtu = tipc_bearer_mtu(net, i); if (mtu < tipc_link_mtu(bb->link)) tipc_link_set_mtu(bb->link, mtu); - + bb->bcast_support &= tipc_bearer_bcast_support(net, i); if (bb->dests[i] < all_dests) continue; @@ -103,6 +124,9 @@ static void tipc_bcbase_select_primary(struct net *net) if ((i ^ tipc_own_addr(net)) & 1) break; } + prim = bb->primary_bearer; + if (prim != INVALID_BEARER_ID) + bb->bcast_support = tipc_bearer_bcast_support(net, prim); } void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id) @@ -170,45 +194,131 @@ static void tipc_bcbase_xmit(struct net *net, struct sk_buff_head *xmitq) __skb_queue_purge(&_xmitq); } -/* tipc_bcast_xmit - deliver buffer chain to all nodes in cluster - * and to identified node local sockets +static void tipc_bcast_select_xmit_method(struct net *net, int dests, + struct tipc_mc_method *method) +{ + struct tipc_bc_base *bb = tipc_bc_base(net); + unsigned long exp = method->expires; + + /* Broadcast supported by used bearer/bearers? */ + if (!bb->bcast_support) { + method->rcast = true; + return; + } + /* Any destinations which don't support replicast ? */ + if (!bb->rcast_support) { + method->rcast = false; + return; + } + /* Can current method be changed ? */ + method->expires = jiffies + TIPC_METHOD_EXPIRE; + if (method->mandatory || time_before(jiffies, exp)) + return; + + /* Determine method to use now */ + method->rcast = dests <= bb->bc_threshold; +} + +/* tipc_bcast_xmit - broadcast the buffer chain to all external nodes * @net: the applicable net namespace - * @list: chain of buffers containing message - * Consumes the buffer chain, except when returning -ELINKCONG - * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE + * @pkts: chain of buffers containing message + * @cong_link_cnt: set to 1 if broadcast link is congested, otherwise 0 + * Consumes the buffer chain. + * Returns 0 if success, otherwise errno: -EHOSTUNREACH,-EMSGSIZE */ -int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list) +static int tipc_bcast_xmit(struct net *net, struct sk_buff_head *pkts, + u16 *cong_link_cnt) { struct tipc_link *l = tipc_bc_sndlink(net); - struct sk_buff_head xmitq, inputq, rcvq; + struct sk_buff_head xmitq; int rc = 0; - __skb_queue_head_init(&rcvq); __skb_queue_head_init(&xmitq); - skb_queue_head_init(&inputq); - - /* Prepare message clone for local node */ - if (unlikely(!tipc_msg_reassemble(list, &rcvq))) - return -EHOSTUNREACH; - tipc_bcast_lock(net); if (tipc_link_bc_peers(l)) - rc = tipc_link_xmit(l, list, &xmitq); + rc = tipc_link_xmit(l, pkts, &xmitq); tipc_bcast_unlock(net); - - /* Don't send to local node if adding to link failed */ - if (unlikely(rc)) { - __skb_queue_purge(&rcvq); - return rc; + tipc_bcbase_xmit(net, &xmitq); + __skb_queue_purge(pkts); + if (rc == -ELINKCONG) { + *cong_link_cnt = 1; + rc = 0; } + return rc; +} - /* Broadcast to all nodes, inluding local node */ - tipc_bcbase_xmit(net, &xmitq); - tipc_sk_mcast_rcv(net, &rcvq, &inputq); - __skb_queue_purge(list); +/* tipc_rcast_xmit - replicate and send a message to given destination nodes + * @net: the applicable net namespace + * @pkts: chain of buffers containing message + * @dests: list of destination nodes + * @cong_link_cnt: returns number of congested links + * @cong_links: returns identities of congested links + * Returns 0 if success, otherwise errno + */ +static int tipc_rcast_xmit(struct net *net, struct sk_buff_head *pkts, + struct tipc_nlist *dests, u16 *cong_link_cnt) +{ + struct sk_buff_head _pkts; + struct u32_item *n, *tmp; + u32 dst, selector; + + selector = msg_link_selector(buf_msg(skb_peek(pkts))); + __skb_queue_head_init(&_pkts); + + list_for_each_entry_safe(n, tmp, &dests->list, list) { + dst = n->value; + if (!tipc_msg_pskb_copy(dst, pkts, &_pkts)) + return -ENOMEM; + + /* Any other return value than -ELINKCONG is ignored */ + if (tipc_node_xmit(net, &_pkts, dst, selector) == -ELINKCONG) + (*cong_link_cnt)++; + } return 0; } +/* tipc_mcast_xmit - deliver message to indicated destination nodes + * and to identified node local sockets + * @net: the applicable net namespace + * @pkts: chain of buffers containing message + * @method: send method to be used + * @dests: destination nodes for message. + * @cong_link_cnt: returns number of encountered congested destination links + * Consumes buffer chain. + * Returns 0 if success, otherwise errno + */ +int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts, + struct tipc_mc_method *method, struct tipc_nlist *dests, + u16 *cong_link_cnt) +{ + struct sk_buff_head inputq, localq; + int rc = 0; + + skb_queue_head_init(&inputq); + skb_queue_head_init(&localq); + + /* Clone packets before they are consumed by next call */ + if (dests->local && !tipc_msg_reassemble(pkts, &localq)) { + rc = -ENOMEM; + goto exit; + } + /* Send according to determined transmit method */ + if (dests->remote) { + tipc_bcast_select_xmit_method(net, dests->remote, method); + if (method->rcast) + rc = tipc_rcast_xmit(net, pkts, dests, cong_link_cnt); + else + rc = tipc_bcast_xmit(net, pkts, cong_link_cnt); + } + + if (dests->local) + tipc_sk_mcast_rcv(net, &localq, &inputq); +exit: + /* This queue should normally be empty by now */ + __skb_queue_purge(pkts); + return rc; +} + /* tipc_bcast_rcv - receive a broadcast packet, and deliver to rcv link * * RCU is locked, no other locks set @@ -313,6 +423,7 @@ void tipc_bcast_add_peer(struct net *net, struct tipc_link *uc_l, tipc_bcast_lock(net); tipc_link_add_bc_peer(snd_l, uc_l, xmitq); tipc_bcbase_select_primary(net); + tipc_bcbase_calc_bc_threshold(net); tipc_bcast_unlock(net); } @@ -331,6 +442,7 @@ void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_l) tipc_bcast_lock(net); tipc_link_remove_bc_peer(snd_l, rcv_l, &xmitq); tipc_bcbase_select_primary(net); + tipc_bcbase_calc_bc_threshold(net); tipc_bcast_unlock(net); tipc_bcbase_xmit(net, &xmitq); @@ -413,6 +525,8 @@ int tipc_bcast_init(struct net *net) goto enomem; bb->link = l; tn->bcl = l; + bb->rc_ratio = 25; + bb->rcast_support = true; return 0; enomem: kfree(bb); @@ -428,3 +542,33 @@ void tipc_bcast_stop(struct net *net) kfree(tn->bcbase); kfree(tn->bcl); } + +void tipc_nlist_init(struct tipc_nlist *nl, u32 self) +{ + memset(nl, 0, sizeof(*nl)); + INIT_LIST_HEAD(&nl->list); + nl->self = self; +} + +void tipc_nlist_add(struct tipc_nlist *nl, u32 node) +{ + if (node == nl->self) + nl->local = true; + else if (u32_push(&nl->list, node)) + nl->remote++; +} + +void tipc_nlist_del(struct tipc_nlist *nl, u32 node) +{ + if (node == nl->self) + nl->local = false; + else if (u32_del(&nl->list, node)) + nl->remote--; +} + +void tipc_nlist_purge(struct tipc_nlist *nl) +{ + u32_list_purge(&nl->list); + nl->remote = 0; + nl->local = 0; +} diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h index 855d53c64ab3..751530ab0c49 100644 --- a/net/tipc/bcast.h +++ b/net/tipc/bcast.h @@ -42,9 +42,35 @@ struct tipc_node; struct tipc_msg; struct tipc_nl_msg; -struct tipc_node_map; +struct tipc_nlist; +struct tipc_nitem; extern const char tipc_bclink_name[]; +#define TIPC_METHOD_EXPIRE msecs_to_jiffies(5000) + +struct tipc_nlist { + struct list_head list; + u32 self; + u16 remote; + bool local; +}; + +void tipc_nlist_init(struct tipc_nlist *nl, u32 self); +void tipc_nlist_purge(struct tipc_nlist *nl); +void tipc_nlist_add(struct tipc_nlist *nl, u32 node); +void tipc_nlist_del(struct tipc_nlist *nl, u32 node); + +/* Cookie to be used between socket and broadcast layer + * @rcast: replicast (instead of broadcast) was used at previous xmit + * @mandatory: broadcast/replicast indication was set by user + * @expires: re-evaluate non-mandatory transmit method if we are past this + */ +struct tipc_mc_method { + bool rcast; + bool mandatory; + unsigned long expires; +}; + int tipc_bcast_init(struct net *net); void tipc_bcast_stop(struct net *net); void tipc_bcast_add_peer(struct net *net, struct tipc_link *l, @@ -53,7 +79,10 @@ void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_bcl); void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id); void tipc_bcast_dec_bearer_dst_cnt(struct net *net, int bearer_id); int tipc_bcast_get_mtu(struct net *net); -int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list); +void tipc_bcast_disable_rcast(struct net *net); +int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts, + struct tipc_mc_method *method, struct tipc_nlist *dests, + u16 *cong_link_cnt); int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb); void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l, struct tipc_msg *hdr); diff --git a/net/tipc/bearer.c b/net/tipc/bearer.c index 52d74760fb68..33a5bdfbef76 100644 --- a/net/tipc/bearer.c +++ b/net/tipc/bearer.c @@ -431,7 +431,7 @@ int tipc_enable_l2_media(struct net *net, struct tipc_bearer *b, memset(&b->bcast_addr, 0, sizeof(b->bcast_addr)); memcpy(b->bcast_addr.value, dev->broadcast, b->media->hwaddr_len); b->bcast_addr.media_id = b->media->type_id; - b->bcast_addr.broadcast = 1; + b->bcast_addr.broadcast = TIPC_BROADCAST_SUPPORT; b->mtu = dev->mtu; b->media->raw2addr(b, &b->addr, (char *)dev->dev_addr); rcu_assign_pointer(dev->tipc_ptr, b); @@ -482,6 +482,19 @@ int tipc_l2_send_msg(struct net *net, struct sk_buff *skb, return 0; } +bool tipc_bearer_bcast_support(struct net *net, u32 bearer_id) +{ + bool supp = false; + struct tipc_bearer *b; + + rcu_read_lock(); + b = bearer_get(net, bearer_id); + if (b) + supp = (b->bcast_addr.broadcast == TIPC_BROADCAST_SUPPORT); + rcu_read_unlock(); + return supp; +} + int tipc_bearer_mtu(struct net *net, u32 bearer_id) { int mtu = 0; diff --git a/net/tipc/bearer.h b/net/tipc/bearer.h index 278ff7f616f9..635c9086e19a 100644 --- a/net/tipc/bearer.h +++ b/net/tipc/bearer.h @@ -60,9 +60,14 @@ #define TIPC_MEDIA_TYPE_IB 2 #define TIPC_MEDIA_TYPE_UDP 3 -/* minimum bearer MTU */ +/* Minimum bearer MTU */ #define TIPC_MIN_BEARER_MTU (MAX_H_SIZE + INT_H_SIZE) +/* Identifiers for distinguishing between broadcast/multicast and replicast + */ +#define TIPC_BROADCAST_SUPPORT 1 +#define TIPC_REPLICAST_SUPPORT 2 + /** * struct tipc_media_addr - destination address used by TIPC bearers * @value: address info (format defined by media) @@ -210,6 +215,7 @@ int tipc_bearer_setup(void); void tipc_bearer_cleanup(void); void tipc_bearer_stop(struct net *net); int tipc_bearer_mtu(struct net *net, u32 bearer_id); +bool tipc_bearer_bcast_support(struct net *net, u32 bearer_id); void tipc_bearer_xmit_skb(struct net *net, u32 bearer_id, struct sk_buff *skb, struct tipc_media_addr *dest); diff --git a/net/tipc/link.c b/net/tipc/link.c index 4e8647aef01c..ddd2dd6f77aa 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -515,6 +515,10 @@ bool tipc_link_bc_create(struct net *net, u32 ownnode, u32 peer, if (link_is_bc_sndlink(l)) l->state = LINK_ESTABLISHED; + /* Disable replicast if even a single peer doesn't support it */ + if (link_is_bc_rcvlink(l) && !(peer_caps & TIPC_BCAST_RCAST)) + tipc_bcast_disable_rcast(net); + return true; } @@ -776,60 +780,47 @@ int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq) /** * link_schedule_user - schedule a message sender for wakeup after congestion - * @link: congested link - * @list: message that was attempted sent + * @l: congested link + * @hdr: header of message that is being sent * Create pseudo msg to send back to user when congestion abates - * Does not consume buffer list */ -static int link_schedule_user(struct tipc_link *link, struct sk_buff_head *list) +static int link_schedule_user(struct tipc_link *l, struct tipc_msg *hdr) { - struct tipc_msg *msg = buf_msg(skb_peek(list)); - int imp = msg_importance(msg); - u32 oport = msg_origport(msg); - u32 addr = tipc_own_addr(link->net); + u32 dnode = tipc_own_addr(l->net); + u32 dport = msg_origport(hdr); struct sk_buff *skb; - /* This really cannot happen... */ - if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) { - pr_warn("%s<%s>, send queue full", link_rst_msg, link->name); - return -ENOBUFS; - } - /* Non-blocking sender: */ - if (TIPC_SKB_CB(skb_peek(list))->wakeup_pending) - return -ELINKCONG; - /* Create and schedule wakeup pseudo message */ skb = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0, - addr, addr, oport, 0, 0); + dnode, l->addr, dport, 0, 0); if (!skb) return -ENOBUFS; - TIPC_SKB_CB(skb)->chain_sz = skb_queue_len(list); - TIPC_SKB_CB(skb)->chain_imp = imp; - skb_queue_tail(&link->wakeupq, skb); - link->stats.link_congs++; + msg_set_dest_droppable(buf_msg(skb), true); + TIPC_SKB_CB(skb)->chain_imp = msg_importance(hdr); + skb_queue_tail(&l->wakeupq, skb); + l->stats.link_congs++; return -ELINKCONG; } /** * link_prepare_wakeup - prepare users for wakeup after congestion - * @link: congested link - * Move a number of waiting users, as permitted by available space in - * the send queue, from link wait queue to node wait queue for wakeup + * @l: congested link + * Wake up a number of waiting users, as permitted by available space + * in the send queue */ void link_prepare_wakeup(struct tipc_link *l) { - int pnd[TIPC_SYSTEM_IMPORTANCE + 1] = {0,}; - int imp, lim; struct sk_buff *skb, *tmp; + int imp, i = 0; skb_queue_walk_safe(&l->wakeupq, skb, tmp) { imp = TIPC_SKB_CB(skb)->chain_imp; - lim = l->backlog[imp].limit; - pnd[imp] += TIPC_SKB_CB(skb)->chain_sz; - if ((pnd[imp] + l->backlog[imp].len) >= lim) + if (l->backlog[imp].len < l->backlog[imp].limit) { + skb_unlink(skb, &l->wakeupq); + skb_queue_tail(l->inputq, skb); + } else if (i++ > 10) { break; - skb_unlink(skb, &l->wakeupq); - skb_queue_tail(l->inputq, skb); + } } } @@ -869,8 +860,7 @@ void tipc_link_reset(struct tipc_link *l) * @list: chain of buffers containing message * @xmitq: returned list of packets to be sent by caller * - * Consumes the buffer chain, except when returning -ELINKCONG, - * since the caller then may want to make more send attempts. + * Consumes the buffer chain. * Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS * Messages at TIPC_SYSTEM_IMPORTANCE are always accepted */ @@ -879,7 +869,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list, { struct tipc_msg *hdr = buf_msg(skb_peek(list)); unsigned int maxwin = l->window; - unsigned int i, imp = msg_importance(hdr); + int imp = msg_importance(hdr); unsigned int mtu = l->mtu; u16 ack = l->rcv_nxt - 1; u16 seqno = l->snd_nxt; @@ -888,19 +878,22 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list, struct sk_buff_head *backlogq = &l->backlogq; struct sk_buff *skb, *_skb, *bskb; int pkt_cnt = skb_queue_len(list); + int rc = 0; - /* Match msg importance against this and all higher backlog limits: */ - if (!skb_queue_empty(backlogq)) { - for (i = imp; i <= TIPC_SYSTEM_IMPORTANCE; i++) { - if (unlikely(l->backlog[i].len >= l->backlog[i].limit)) - return link_schedule_user(l, list); - } - } if (unlikely(msg_size(hdr) > mtu)) { skb_queue_purge(list); return -EMSGSIZE; } + /* Allow oversubscription of one data msg per source at congestion */ + if (unlikely(l->backlog[imp].len >= l->backlog[imp].limit)) { + if (imp == TIPC_SYSTEM_IMPORTANCE) { + pr_warn("%s<%s>, link overflow", link_rst_msg, l->name); + return -ENOBUFS; + } + rc = link_schedule_user(l, hdr); + } + if (pkt_cnt > 1) { l->stats.sent_fragmented++; l->stats.sent_fragments += pkt_cnt; @@ -946,7 +939,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list, skb_queue_splice_tail_init(list, backlogq); } l->snd_nxt = seqno; - return 0; + return rc; } void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq) @@ -1043,11 +1036,17 @@ int tipc_link_retrans(struct tipc_link *l, u16 from, u16 to, static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb, struct sk_buff_head *inputq) { - switch (msg_user(buf_msg(skb))) { + struct tipc_msg *hdr = buf_msg(skb); + + switch (msg_user(hdr)) { case TIPC_LOW_IMPORTANCE: case TIPC_MEDIUM_IMPORTANCE: case TIPC_HIGH_IMPORTANCE: case TIPC_CRITICAL_IMPORTANCE: + if (unlikely(msg_type(hdr) == TIPC_MCAST_MSG)) { + skb_queue_tail(l->bc_rcvlink->inputq, skb); + return true; + } case CONN_MANAGER: skb_queue_tail(inputq, skb); return true; diff --git a/net/tipc/msg.c b/net/tipc/msg.c index ab02d0742476..312ef7de57d7 100644 --- a/net/tipc/msg.c +++ b/net/tipc/msg.c @@ -607,6 +607,23 @@ error: return false; } +bool tipc_msg_pskb_copy(u32 dst, struct sk_buff_head *msg, + struct sk_buff_head *cpy) +{ + struct sk_buff *skb, *_skb; + + skb_queue_walk(msg, skb) { + _skb = pskb_copy(skb, GFP_ATOMIC); + if (!_skb) { + __skb_queue_purge(cpy); + return false; + } + msg_set_destnode(buf_msg(_skb), dst); + __skb_queue_tail(cpy, _skb); + } + return true; +} + /* tipc_skb_queue_sorted(); sort pkt into list according to sequence number * @list: list to be appended to * @seqno: sequence number of buffer to add diff --git a/net/tipc/msg.h b/net/tipc/msg.h index 2c3dc38abf9c..c843fd2bc48d 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -98,8 +98,6 @@ struct tipc_skb_cb { u32 bytes_read; struct sk_buff *tail; bool validated; - bool wakeup_pending; - u16 chain_sz; u16 chain_imp; u16 ackers; }; @@ -633,14 +631,11 @@ static inline void msg_set_bc_netid(struct tipc_msg *m, u32 id) static inline u32 msg_link_selector(struct tipc_msg *m) { + if (msg_user(m) == MSG_FRAGMENTER) + m = (void *)msg_data(m); return msg_bits(m, 4, 0, 1); } -static inline void msg_set_link_selector(struct tipc_msg *m, u32 n) -{ - msg_set_bits(m, 4, 0, 1, n); -} - /* * Word 5 */ @@ -837,6 +832,8 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, int offset, int dsz, int mtu, struct sk_buff_head *list); bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err); bool tipc_msg_reassemble(struct sk_buff_head *list, struct sk_buff_head *rcvq); +bool tipc_msg_pskb_copy(u32 dst, struct sk_buff_head *msg, + struct sk_buff_head *cpy); void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno, struct sk_buff *skb); diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c index e190460fe0d3..9be6592e4a6f 100644 --- a/net/tipc/name_table.c +++ b/net/tipc/name_table.c @@ -608,7 +608,7 @@ not_found: * Returns non-zero if any off-node ports overlap */ int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper, - u32 limit, struct tipc_plist *dports) + u32 limit, struct list_head *dports) { struct name_seq *seq; struct sub_seq *sseq; @@ -633,7 +633,7 @@ int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper, info = sseq->info; list_for_each_entry(publ, &info->node_list, node_list) { if (publ->scope <= limit) - tipc_plist_push(dports, publ->ref); + u32_push(dports, publ->ref); } if (info->cluster_list_size != info->node_list_size) @@ -645,6 +645,39 @@ exit: return res; } +/* tipc_nametbl_lookup_dst_nodes - find broadcast destination nodes + * - Creates list of nodes that overlap the given multicast address + * - Determines if any node local ports overlap + */ +void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower, + u32 upper, u32 domain, + struct tipc_nlist *nodes) +{ + struct sub_seq *sseq, *stop; + struct publication *publ; + struct name_info *info; + struct name_seq *seq; + + rcu_read_lock(); + seq = nametbl_find_seq(net, type); + if (!seq) + goto exit; + + spin_lock_bh(&seq->lock); + sseq = seq->sseqs + nameseq_locate_subseq(seq, lower); + stop = seq->sseqs + seq->first_free; + for (; sseq->lower <= upper && sseq != stop; sseq++) { + info = sseq->info; + list_for_each_entry(publ, &info->zone_list, zone_list) { + if (tipc_in_scope(domain, publ->node)) + tipc_nlist_add(nodes, publ->node); + } + } + spin_unlock_bh(&seq->lock); +exit: + rcu_read_unlock(); +} + /* * tipc_nametbl_publish - add name publication to network name tables */ @@ -1022,40 +1055,79 @@ int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb) return skb->len; } -void tipc_plist_push(struct tipc_plist *pl, u32 port) +bool u32_find(struct list_head *l, u32 value) { - struct tipc_plist *nl; + struct u32_item *item; - if (likely(!pl->port)) { - pl->port = port; - return; + list_for_each_entry(item, l, list) { + if (item->value == value) + return true; } - if (pl->port == port) - return; - list_for_each_entry(nl, &pl->list, list) { - if (nl->port == port) - return; + return false; +} + +bool u32_push(struct list_head *l, u32 value) +{ + struct u32_item *item; + + list_for_each_entry(item, l, list) { + if (item->value == value) + return false; + } + item = kmalloc(sizeof(*item), GFP_ATOMIC); + if (unlikely(!item)) + return false; + + item->value = value; + list_add(&item->list, l); + return true; +} + +u32 u32_pop(struct list_head *l) +{ + struct u32_item *item; + u32 value = 0; + + if (list_empty(l)) + return 0; + item = list_first_entry(l, typeof(*item), list); + value = item->value; + list_del(&item->list); + kfree(item); + return value; +} + +bool u32_del(struct list_head *l, u32 value) +{ + struct u32_item *item, *tmp; + + list_for_each_entry_safe(item, tmp, l, list) { + if (item->value != value) + continue; + list_del(&item->list); + kfree(item); + return true; } - nl = kmalloc(sizeof(*nl), GFP_ATOMIC); - if (nl) { - nl->port = port; - list_add(&nl->list, &pl->list); + return false; +} + +void u32_list_purge(struct list_head *l) +{ + struct u32_item *item, *tmp; + + list_for_each_entry_safe(item, tmp, l, list) { + list_del(&item->list); + kfree(item); } } -u32 tipc_plist_pop(struct tipc_plist *pl) +int u32_list_len(struct list_head *l) { - struct tipc_plist *nl; - u32 port = 0; + struct u32_item *item; + int i = 0; - if (likely(list_empty(&pl->list))) { - port = pl->port; - pl->port = 0; - return port; + list_for_each_entry(item, l, list) { + i++; } - nl = list_first_entry(&pl->list, typeof(*nl), list); - port = nl->port; - list_del(&nl->list); - kfree(nl); - return port; + return i; } diff --git a/net/tipc/name_table.h b/net/tipc/name_table.h index 1524a73830f7..6ebdeb1d84a5 100644 --- a/net/tipc/name_table.h +++ b/net/tipc/name_table.h @@ -39,6 +39,7 @@ struct tipc_subscription; struct tipc_plist; +struct tipc_nlist; /* * TIPC name types reserved for internal TIPC use (both current and planned) @@ -99,7 +100,10 @@ int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb); u32 tipc_nametbl_translate(struct net *net, u32 type, u32 instance, u32 *node); int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper, - u32 limit, struct tipc_plist *dports); + u32 limit, struct list_head *dports); +void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower, + u32 upper, u32 domain, + struct tipc_nlist *nodes); struct publication *tipc_nametbl_publish(struct net *net, u32 type, u32 lower, u32 upper, u32 scope, u32 port_ref, u32 key); @@ -116,18 +120,16 @@ void tipc_nametbl_unsubscribe(struct tipc_subscription *s); int tipc_nametbl_init(struct net *net); void tipc_nametbl_stop(struct net *net); -struct tipc_plist { +struct u32_item { struct list_head list; - u32 port; + u32 value; }; -static inline void tipc_plist_init(struct tipc_plist *pl) -{ - INIT_LIST_HEAD(&pl->list); - pl->port = 0; -} - -void tipc_plist_push(struct tipc_plist *pl, u32 port); -u32 tipc_plist_pop(struct tipc_plist *pl); +bool u32_push(struct list_head *l, u32 value); +u32 u32_pop(struct list_head *l); +bool u32_find(struct list_head *l, u32 value); +bool u32_del(struct list_head *l, u32 value); +void u32_list_purge(struct list_head *l); +int u32_list_len(struct list_head *l); #endif diff --git a/net/tipc/node.c b/net/tipc/node.c index 27753325e06e..e9295fa3a554 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -1172,7 +1172,7 @@ msg_full: * @list: chain of buffers containing message * @dnode: address of destination node * @selector: a number used for deterministic link selection - * Consumes the buffer chain, except when returning -ELINKCONG + * Consumes the buffer chain. * Returns 0 if success, otherwise: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE,-ENOBUF */ int tipc_node_xmit(struct net *net, struct sk_buff_head *list, @@ -1211,10 +1211,10 @@ int tipc_node_xmit(struct net *net, struct sk_buff_head *list, spin_unlock_bh(&le->lock); tipc_node_read_unlock(n); - if (likely(rc == 0)) - tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr); - else if (rc == -ENOBUFS) + if (unlikely(rc == -ENOBUFS)) tipc_node_link_down(n, bearer_id, false); + else + tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr); tipc_node_put(n); @@ -1226,20 +1226,15 @@ int tipc_node_xmit(struct net *net, struct sk_buff_head *list, * messages, which will not be rejected * The only exception is datagram messages rerouted after secondary * lookup, which are rare and safe to dispose of anyway. - * TODO: Return real return value, and let callers use - * tipc_wait_for_sendpkt() where applicable */ int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode, u32 selector) { struct sk_buff_head head; - int rc; skb_queue_head_init(&head); __skb_queue_tail(&head, skb); - rc = tipc_node_xmit(net, &head, dnode, selector); - if (rc == -ELINKCONG) - kfree_skb(skb); + tipc_node_xmit(net, &head, dnode, selector); return 0; } @@ -1267,6 +1262,19 @@ void tipc_node_broadcast(struct net *net, struct sk_buff *skb) kfree_skb(skb); } +static void tipc_node_mcast_rcv(struct tipc_node *n) +{ + struct tipc_bclink_entry *be = &n->bc_entry; + + /* 'arrvq' is under inputq2's lock protection */ + spin_lock_bh(&be->inputq2.lock); + spin_lock_bh(&be->inputq1.lock); + skb_queue_splice_tail_init(&be->inputq1, &be->arrvq); + spin_unlock_bh(&be->inputq1.lock); + spin_unlock_bh(&be->inputq2.lock); + tipc_sk_mcast_rcv(n->net, &be->arrvq, &be->inputq2); +} + static void tipc_node_bc_sync_rcv(struct tipc_node *n, struct tipc_msg *hdr, int bearer_id, struct sk_buff_head *xmitq) { @@ -1340,15 +1348,8 @@ static void tipc_node_bc_rcv(struct net *net, struct sk_buff *skb, int bearer_id if (!skb_queue_empty(&xmitq)) tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr); - /* Deliver. 'arrvq' is under inputq2's lock protection */ - if (!skb_queue_empty(&be->inputq1)) { - spin_lock_bh(&be->inputq2.lock); - spin_lock_bh(&be->inputq1.lock); - skb_queue_splice_tail_init(&be->inputq1, &be->arrvq); - spin_unlock_bh(&be->inputq1.lock); - spin_unlock_bh(&be->inputq2.lock); - tipc_sk_mcast_rcv(net, &be->arrvq, &be->inputq2); - } + if (!skb_queue_empty(&be->inputq1)) + tipc_node_mcast_rcv(n); if (rc & TIPC_LINK_DOWN_EVT) { /* Reception reassembly failure => reset all links to peer */ @@ -1575,6 +1576,9 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b) if (unlikely(!skb_queue_empty(&n->bc_entry.namedq))) tipc_named_rcv(net, &n->bc_entry.namedq); + if (unlikely(!skb_queue_empty(&n->bc_entry.inputq1))) + tipc_node_mcast_rcv(n); + if (!skb_queue_empty(&le->inputq)) tipc_sk_rcv(net, &le->inputq); diff --git a/net/tipc/node.h b/net/tipc/node.h index 39ef54c1f2ad..898c22916984 100644 --- a/net/tipc/node.h +++ b/net/tipc/node.h @@ -47,11 +47,13 @@ enum { TIPC_BCAST_SYNCH = (1 << 1), TIPC_BCAST_STATE_NACK = (1 << 2), - TIPC_BLOCK_FLOWCTL = (1 << 3) + TIPC_BLOCK_FLOWCTL = (1 << 3), + TIPC_BCAST_RCAST = (1 << 4) }; #define TIPC_NODE_CAPABILITIES (TIPC_BCAST_SYNCH | \ TIPC_BCAST_STATE_NACK | \ + TIPC_BCAST_RCAST | \ TIPC_BLOCK_FLOWCTL) #define INVALID_BEARER_ID -1 diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 800caaa699a1..103d1fd058c0 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -67,16 +67,19 @@ enum { * @max_pkt: maximum packet size "hint" used when building messages sent by port * @portid: unique port identity in TIPC socket hash table * @phdr: preformatted message header used when sending messages + * #cong_links: list of congested links * @publications: list of publications for port + * @blocking_link: address of the congested link we are currently sleeping on * @pub_count: total # of publications port has made during its lifetime * @probing_state: * @conn_timeout: the time we can wait for an unresponded setup request * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue - * @link_cong: non-zero if owner must sleep because of link congestion + * @cong_link_cnt: number of congested links * @sent_unacked: # messages sent by socket, and not yet acked by peer * @rcv_unacked: # messages read by user, but not yet acked back to peer * @peer: 'connected' peer for dgram/rdm * @node: hash table node + * @mc_method: cookie for use between socket and broadcast layer * @rcu: rcu struct for tipc_sock */ struct tipc_sock { @@ -87,13 +90,13 @@ struct tipc_sock { u32 max_pkt; u32 portid; struct tipc_msg phdr; - struct list_head sock_list; + struct list_head cong_links; struct list_head publications; u32 pub_count; uint conn_timeout; atomic_t dupl_rcvcnt; bool probe_unacked; - bool link_cong; + u16 cong_link_cnt; u16 snt_unacked; u16 snd_win; u16 peer_caps; @@ -101,6 +104,7 @@ struct tipc_sock { u16 rcv_win; struct sockaddr_tipc peer; struct rhash_head node; + struct tipc_mc_method mc_method; struct rcu_head rcu; }; @@ -110,7 +114,6 @@ static void tipc_write_space(struct sock *sk); static void tipc_sock_destruct(struct sock *sk); static int tipc_release(struct socket *sock); static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags); -static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p); static void tipc_sk_timeout(unsigned long data); static int tipc_sk_publish(struct tipc_sock *tsk, uint scope, struct tipc_name_seq const *seq); @@ -119,8 +122,7 @@ static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope, static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid); static int tipc_sk_insert(struct tipc_sock *tsk); static void tipc_sk_remove(struct tipc_sock *tsk); -static int __tipc_send_stream(struct socket *sock, struct msghdr *m, - size_t dsz); +static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz); static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz); static const struct proto_ops packet_ops; @@ -334,6 +336,49 @@ static int tipc_set_sk_state(struct sock *sk, int state) return res; } +static int tipc_sk_sock_err(struct socket *sock, long *timeout) +{ + struct sock *sk = sock->sk; + int err = sock_error(sk); + int typ = sock->type; + + if (err) + return err; + if (typ == SOCK_STREAM || typ == SOCK_SEQPACKET) { + if (sk->sk_state == TIPC_DISCONNECTING) + return -EPIPE; + else if (!tipc_sk_connected(sk)) + return -ENOTCONN; + } + if (!*timeout) + return -EAGAIN; + if (signal_pending(current)) + return sock_intr_errno(*timeout); + + return 0; +} + +#define tipc_wait_for_cond(sock_, timeout_, condition_) \ +({ \ + int rc_ = 0; \ + int done_ = 0; \ + \ + while (!(condition_) && !done_) { \ + struct sock *sk_ = sock->sk; \ + DEFINE_WAIT_FUNC(wait_, woken_wake_function); \ + \ + rc_ = tipc_sk_sock_err(sock_, timeout_); \ + if (rc_) \ + break; \ + prepare_to_wait(sk_sleep(sk_), &wait_, \ + TASK_INTERRUPTIBLE); \ + done_ = sk_wait_event(sk_, timeout_, \ + (condition_), &wait_); \ + remove_wait_queue(sk_sleep(sk_), &wait_); \ + } \ + rc_; \ +}) + /** * tipc_sk_create - create a TIPC socket * @net: network namespace (must be default network) @@ -382,6 +427,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock, tsk = tipc_sk(sk); tsk->max_pkt = MAX_PKT_DEFAULT; INIT_LIST_HEAD(&tsk->publications); + INIT_LIST_HEAD(&tsk->cong_links); msg = &tsk->phdr; tn = net_generic(sock_net(sk), tipc_net_id); tipc_msg_init(tn->own_addr, msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG, @@ -432,9 +478,14 @@ static void __tipc_shutdown(struct socket *sock, int error) struct sock *sk = sock->sk; struct tipc_sock *tsk = tipc_sk(sk); struct net *net = sock_net(sk); + long timeout = CONN_TIMEOUT_DEFAULT; u32 dnode = tsk_peer_node(tsk); struct sk_buff *skb; + /* Avoid that hi-prio shutdown msgs bypass msgs in link wakeup queue */ + tipc_wait_for_cond(sock, &timeout, (!tsk->cong_link_cnt && + !tsk_conn_cong(tsk))); + /* Reject all unreceived messages, except on an active connection * (which disconnects locally & sends a 'FIN+' to peer). */ @@ -505,7 +556,8 @@ static int tipc_release(struct socket *sock) /* Reject any messages that accumulated in backlog queue */ release_sock(sk); - + u32_list_purge(&tsk->cong_links); + tsk->cong_link_cnt = 0; call_rcu(&tsk->rcu, tipc_sk_callback); sock->sk = NULL; @@ -648,7 +700,7 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock, switch (sk->sk_state) { case TIPC_ESTABLISHED: - if (!tsk->link_cong && !tsk_conn_cong(tsk)) + if (!tsk->cong_link_cnt && !tsk_conn_cong(tsk)) mask |= POLLOUT; /* fall thru' */ case TIPC_LISTEN: @@ -657,7 +709,7 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock, mask |= (POLLIN | POLLRDNORM); break; case TIPC_OPEN: - if (!tsk->link_cong) + if (!tsk->cong_link_cnt) mask |= POLLOUT; if (tipc_sk_type_connectionless(sk) && (!skb_queue_empty(&sk->sk_receive_queue))) @@ -676,63 +728,60 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock, * @sock: socket structure * @seq: destination address * @msg: message to send - * @dsz: total length of message data - * @timeo: timeout to wait for wakeup + * @dlen: length of data to send + * @timeout: timeout to wait for wakeup * * Called from function tipc_sendmsg(), which has done all sanity checks * Returns the number of bytes sent on success, or errno */ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq, - struct msghdr *msg, size_t dsz, long timeo) + struct msghdr *msg, size_t dlen, long timeout) { struct sock *sk = sock->sk; struct tipc_sock *tsk = tipc_sk(sk); + struct tipc_msg *hdr = &tsk->phdr; struct net *net = sock_net(sk); - struct tipc_msg *mhdr = &tsk->phdr; - struct sk_buff_head pktchain; - struct iov_iter save = msg->msg_iter; - uint mtu; + int mtu = tipc_bcast_get_mtu(net); + struct tipc_mc_method *method = &tsk->mc_method; + u32 domain = addr_domain(net, TIPC_CLUSTER_SCOPE); + struct sk_buff_head pkts; + struct tipc_nlist dsts; int rc; - if (!timeo && tsk->link_cong) - return -ELINKCONG; + /* Block or return if any destination link is congested */ + rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt); + if (unlikely(rc)) + return rc; - msg_set_type(mhdr, TIPC_MCAST_MSG); - msg_set_lookup_scope(mhdr, TIPC_CLUSTER_SCOPE); - msg_set_destport(mhdr, 0); - msg_set_destnode(mhdr, 0); - msg_set_nametype(mhdr, seq->type); - msg_set_namelower(mhdr, seq->lower); - msg_set_nameupper(mhdr, seq->upper); - msg_set_hdr_sz(mhdr, MCAST_H_SIZE); + /* Lookup destination nodes */ + tipc_nlist_init(&dsts, tipc_own_addr(net)); + tipc_nametbl_lookup_dst_nodes(net, seq->type, seq->lower, + seq->upper, domain, &dsts); + if (!dsts.local && !dsts.remote) + return -EHOSTUNREACH; - skb_queue_head_init(&pktchain); + /* Build message header */ + msg_set_type(hdr, TIPC_MCAST_MSG); + msg_set_hdr_sz(hdr, MCAST_H_SIZE); + msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE); + msg_set_destport(hdr, 0); + msg_set_destnode(hdr, 0); + msg_set_nametype(hdr, seq->type); + msg_set_namelower(hdr, seq->lower); + msg_set_nameupper(hdr, seq->upper); -new_mtu: - mtu = tipc_bcast_get_mtu(net); - rc = tipc_msg_build(mhdr, msg, 0, dsz, mtu, &pktchain); - if (unlikely(rc < 0)) - return rc; + /* Build message as chain of buffers */ + skb_queue_head_init(&pkts); + rc = tipc_msg_build(hdr, msg, 0, dlen, mtu, &pkts); - do { - rc = tipc_bcast_xmit(net, &pktchain); - if (likely(!rc)) - return dsz; - - if (rc == -ELINKCONG) { - tsk->link_cong = 1; - rc = tipc_wait_for_sndmsg(sock, &timeo); - if (!rc) - continue; - } - __skb_queue_purge(&pktchain); - if (rc == -EMSGSIZE) { - msg->msg_iter = save; - goto new_mtu; - } - break; - } while (1); - return rc; + /* Send message if build was successful */ + if (unlikely(rc == dlen)) + rc = tipc_mcast_xmit(net, &pkts, method, &dsts, + &tsk->cong_link_cnt); + + tipc_nlist_purge(&dsts); + + return rc ? rc : dlen; } /** @@ -746,7 +795,7 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq, struct sk_buff_head *inputq) { struct tipc_msg *msg; - struct tipc_plist dports; + struct list_head dports; u32 portid; u32 scope = TIPC_CLUSTER_SCOPE; struct sk_buff_head tmpq; @@ -754,7 +803,7 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq, struct sk_buff *skb, *_skb; __skb_queue_head_init(&tmpq); - tipc_plist_init(&dports); + INIT_LIST_HEAD(&dports); skb = tipc_skb_peek(arrvq, &inputq->lock); for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) { @@ -768,8 +817,8 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq, tipc_nametbl_mc_translate(net, msg_nametype(msg), msg_namelower(msg), msg_nameupper(msg), scope, &dports); - portid = tipc_plist_pop(&dports); - for (; portid; portid = tipc_plist_pop(&dports)) { + portid = u32_pop(&dports); + for (; portid; portid = u32_pop(&dports)) { _skb = __pskb_copy(skb, hsz, GFP_ATOMIC); if (_skb) { msg_set_destport(buf_msg(_skb), portid); @@ -830,31 +879,6 @@ exit: kfree_skb(skb); } -static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p) -{ - DEFINE_WAIT_FUNC(wait, woken_wake_function); - struct sock *sk = sock->sk; - struct tipc_sock *tsk = tipc_sk(sk); - int done; - - do { - int err = sock_error(sk); - if (err) - return err; - if (sk->sk_shutdown & SEND_SHUTDOWN) - return -EPIPE; - if (!*timeo_p) - return -EAGAIN; - if (signal_pending(current)) - return sock_intr_errno(*timeo_p); - - add_wait_queue(sk_sleep(sk), &wait); - done = sk_wait_event(sk, timeo_p, !tsk->link_cong, &wait); - remove_wait_queue(sk_sleep(sk), &wait); - } while (!done); - return 0; -} - /** * tipc_sendmsg - send message in connectionless manner * @sock: socket structure @@ -881,35 +905,38 @@ static int tipc_sendmsg(struct socket *sock, return ret; } -static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz) +static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen) { - DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); struct sock *sk = sock->sk; - struct tipc_sock *tsk = tipc_sk(sk); struct net *net = sock_net(sk); - struct tipc_msg *mhdr = &tsk->phdr; - u32 dnode, dport; - struct sk_buff_head pktchain; - bool is_connectionless = tipc_sk_type_connectionless(sk); - struct sk_buff *skb; + struct tipc_sock *tsk = tipc_sk(sk); + DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); + long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); + struct list_head *clinks = &tsk->cong_links; + bool syn = !tipc_sk_type_connectionless(sk); + struct tipc_msg *hdr = &tsk->phdr; struct tipc_name_seq *seq; - struct iov_iter save; - u32 mtu; - long timeo; - int rc; + struct sk_buff_head pkts; + u32 type, inst, domain; + u32 dnode, dport; + int mtu, rc; - if (dsz > TIPC_MAX_USER_MSG_SIZE) + if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE)) return -EMSGSIZE; + if (unlikely(!dest)) { - if (is_connectionless && tsk->peer.family == AF_TIPC) - dest = &tsk->peer; - else + dest = &tsk->peer; + if (!syn || dest->family != AF_TIPC) return -EDESTADDRREQ; - } else if (unlikely(m->msg_namelen < sizeof(*dest)) || - dest->family != AF_TIPC) { - return -EINVAL; } - if (!is_connectionless) { + + if (unlikely(m->msg_namelen < sizeof(*dest))) + return -EINVAL; + + if (unlikely(dest->family != AF_TIPC)) + return -EINVAL; + + if (unlikely(syn)) { if (sk->sk_state == TIPC_LISTEN) return -EPIPE; if (sk->sk_state != TIPC_OPEN) @@ -921,102 +948,62 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz) tsk->conn_instance = dest->addr.name.name.instance; } } - seq = &dest->addr.nameseq; - timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); - if (dest->addrtype == TIPC_ADDR_MCAST) { - return tipc_sendmcast(sock, seq, m, dsz, timeo); - } else if (dest->addrtype == TIPC_ADDR_NAME) { - u32 type = dest->addr.name.name.type; - u32 inst = dest->addr.name.name.instance; - u32 domain = dest->addr.name.domain; + seq = &dest->addr.nameseq; + if (dest->addrtype == TIPC_ADDR_MCAST) + return tipc_sendmcast(sock, seq, m, dlen, timeout); + if (dest->addrtype == TIPC_ADDR_NAME) { + type = dest->addr.name.name.type; + inst = dest->addr.name.name.instance; + domain = dest->addr.name.domain; dnode = domain; - msg_set_type(mhdr, TIPC_NAMED_MSG); - msg_set_hdr_sz(mhdr, NAMED_H_SIZE); - msg_set_nametype(mhdr, type); - msg_set_nameinst(mhdr, inst); - msg_set_lookup_scope(mhdr, tipc_addr_scope(domain)); + msg_set_type(hdr, TIPC_NAMED_MSG); + msg_set_hdr_sz(hdr, NAMED_H_SIZE); + msg_set_nametype(hdr, type); + msg_set_nameinst(hdr, inst); + msg_set_lookup_scope(hdr, tipc_addr_scope(domain)); dport = tipc_nametbl_translate(net, type, inst, &dnode); - msg_set_destnode(mhdr, dnode); - msg_set_destport(mhdr, dport); + msg_set_destnode(hdr, dnode); + msg_set_destport(hdr, dport); if (unlikely(!dport && !dnode)) return -EHOSTUNREACH; + } else if (dest->addrtype == TIPC_ADDR_ID) { dnode = dest->addr.id.node; - msg_set_type(mhdr, TIPC_DIRECT_MSG); - msg_set_lookup_scope(mhdr, 0); - msg_set_destnode(mhdr, dnode); - msg_set_destport(mhdr, dest->addr.id.ref); - msg_set_hdr_sz(mhdr, BASIC_H_SIZE); + msg_set_type(hdr, TIPC_DIRECT_MSG); + msg_set_lookup_scope(hdr, 0); + msg_set_destnode(hdr, dnode); + msg_set_destport(hdr, dest->addr.id.ref); + msg_set_hdr_sz(hdr, BASIC_H_SIZE); } - skb_queue_head_init(&pktchain); - save = m->msg_iter; -new_mtu: - mtu = tipc_node_get_mtu(net, dnode, tsk->portid); - rc = tipc_msg_build(mhdr, m, 0, dsz, mtu, &pktchain); - if (rc < 0) + /* Block or return if destination link is congested */ + rc = tipc_wait_for_cond(sock, &timeout, !u32_find(clinks, dnode)); + if (unlikely(rc)) return rc; - do { - skb = skb_peek(&pktchain); - TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong; - rc = tipc_node_xmit(net, &pktchain, dnode, tsk->portid); - if (likely(!rc)) { - if (!is_connectionless) - tipc_set_sk_state(sk, TIPC_CONNECTING); - return dsz; - } - if (rc == -ELINKCONG) { - tsk->link_cong = 1; - rc = tipc_wait_for_sndmsg(sock, &timeo); - if (!rc) - continue; - } - __skb_queue_purge(&pktchain); - if (rc == -EMSGSIZE) { - m->msg_iter = save; - goto new_mtu; - } - break; - } while (1); - - return rc; -} + skb_queue_head_init(&pkts); + mtu = tipc_node_get_mtu(net, dnode, tsk->portid); + rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts); + if (unlikely(rc != dlen)) + return rc; -static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p) -{ - DEFINE_WAIT_FUNC(wait, woken_wake_function); - struct sock *sk = sock->sk; - struct tipc_sock *tsk = tipc_sk(sk); - int done; + rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid); + if (unlikely(rc == -ELINKCONG)) { + u32_push(clinks, dnode); + tsk->cong_link_cnt++; + rc = 0; + } - do { - int err = sock_error(sk); - if (err) - return err; - if (sk->sk_state == TIPC_DISCONNECTING) - return -EPIPE; - else if (!tipc_sk_connected(sk)) - return -ENOTCONN; - if (!*timeo_p) - return -EAGAIN; - if (signal_pending(current)) - return sock_intr_errno(*timeo_p); + if (unlikely(syn && !rc)) + tipc_set_sk_state(sk, TIPC_CONNECTING); - add_wait_queue(sk_sleep(sk), &wait); - done = sk_wait_event(sk, timeo_p, - (!tsk->link_cong && - !tsk_conn_cong(tsk)) || - !tipc_sk_connected(sk), &wait); - remove_wait_queue(sk_sleep(sk), &wait); - } while (!done); - return 0; + return rc ? rc : dlen; } /** - * tipc_send_stream - send stream-oriented data + * tipc_sendstream - send stream-oriented data * @sock: socket structure * @m: data to send * @dsz: total length of data to be transmitted @@ -1026,94 +1013,69 @@ static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p) * Returns the number of bytes sent on success (or partial success), * or errno if no data sent */ -static int tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz) +static int tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz) { struct sock *sk = sock->sk; int ret; lock_sock(sk); - ret = __tipc_send_stream(sock, m, dsz); + ret = __tipc_sendstream(sock, m, dsz); release_sock(sk); return ret; } -static int __tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz) +static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen) { struct sock *sk = sock->sk; - struct net *net = sock_net(sk); - struct tipc_sock *tsk = tipc_sk(sk); - struct tipc_msg *mhdr = &tsk->phdr; - struct sk_buff_head pktchain; DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); - u32 portid = tsk->portid; - int rc = -EINVAL; - long timeo; - u32 dnode; - uint mtu, send, sent = 0; - struct iov_iter save; - int hlen = MIN_H_SIZE; - - /* Handle implied connection establishment */ - if (unlikely(dest)) { - rc = __tipc_sendmsg(sock, m, dsz); - hlen = msg_hdr_sz(mhdr); - if (dsz && (dsz == rc)) - tsk->snt_unacked = tsk_inc(tsk, dsz + hlen); - return rc; - } - if (dsz > (uint)INT_MAX) - return -EMSGSIZE; - - if (unlikely(!tipc_sk_connected(sk))) { - if (sk->sk_state == TIPC_DISCONNECTING) - return -EPIPE; - else - return -ENOTCONN; - } + long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); + struct tipc_sock *tsk = tipc_sk(sk); + struct tipc_msg *hdr = &tsk->phdr; + struct net *net = sock_net(sk); + struct sk_buff_head pkts; + u32 dnode = tsk_peer_node(tsk); + int send, sent = 0; + int rc = 0; - timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); - if (!timeo && tsk->link_cong) - return -ELINKCONG; + skb_queue_head_init(&pkts); - dnode = tsk_peer_node(tsk); - skb_queue_head_init(&pktchain); + if (unlikely(dlen > INT_MAX)) + return -EMSGSIZE; -next: - save = m->msg_iter; - mtu = tsk->max_pkt; - send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE); - rc = tipc_msg_build(mhdr, m, sent, send, mtu, &pktchain); - if (unlikely(rc < 0)) + /* Handle implicit connection setup */ + if (unlikely(dest)) { + rc = __tipc_sendmsg(sock, m, dlen); + if (dlen && (dlen == rc)) + tsk->snt_unacked = tsk_inc(tsk, dlen + msg_hdr_sz(hdr)); return rc; + } do { - if (likely(!tsk_conn_cong(tsk))) { - rc = tipc_node_xmit(net, &pktchain, dnode, portid); - if (likely(!rc)) { - tsk->snt_unacked += tsk_inc(tsk, send + hlen); - sent += send; - if (sent == dsz) - return dsz; - goto next; - } - if (rc == -EMSGSIZE) { - __skb_queue_purge(&pktchain); - tsk->max_pkt = tipc_node_get_mtu(net, dnode, - portid); - m->msg_iter = save; - goto next; - } - if (rc != -ELINKCONG) - break; + rc = tipc_wait_for_cond(sock, &timeout, + (!tsk->cong_link_cnt && + !tsk_conn_cong(tsk) && + tipc_sk_connected(sk))); + if (unlikely(rc)) + break; + + send = min_t(size_t, dlen - sent, TIPC_MAX_USER_MSG_SIZE); + rc = tipc_msg_build(hdr, m, sent, send, tsk->max_pkt, &pkts); + if (unlikely(rc != send)) + break; - tsk->link_cong = 1; + rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid); + if (unlikely(rc == -ELINKCONG)) { + tsk->cong_link_cnt = 1; + rc = 0; + } + if (likely(!rc)) { + tsk->snt_unacked += tsk_inc(tsk, send + MIN_H_SIZE); + sent += send; } - rc = tipc_wait_for_sndpkt(sock, &timeo); - } while (!rc); + } while (sent < dlen && !rc); - __skb_queue_purge(&pktchain); - return sent ? sent : rc; + return rc ? rc : sent; } /** @@ -1131,7 +1093,7 @@ static int tipc_send_packet(struct socket *sock, struct msghdr *m, size_t dsz) if (dsz > TIPC_MAX_USER_MSG_SIZE) return -EMSGSIZE; - return tipc_send_stream(sock, m, dsz); + return tipc_sendstream(sock, m, dsz); } /* tipc_sk_finish_conn - complete the setup of a connection @@ -1698,6 +1660,7 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb, unsigned int limit = rcvbuf_limit(sk, skb); int err = TIPC_OK; int usr = msg_user(hdr); + u32 onode; if (unlikely(msg_user(hdr) == CONN_MANAGER)) { tipc_sk_proto_rcv(tsk, skb, xmitq); @@ -1705,8 +1668,10 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb, } if (unlikely(usr == SOCK_WAKEUP)) { + onode = msg_orignode(hdr); kfree_skb(skb); - tsk->link_cong = 0; + u32_del(&tsk->cong_links, onode); + tsk->cong_link_cnt--; sk->sk_write_space(sk); return false; } @@ -2114,7 +2079,7 @@ static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags) struct msghdr m = {NULL,}; tsk_advance_rx_queue(sk); - __tipc_send_stream(new_sock, &m, 0); + __tipc_sendstream(new_sock, &m, 0); } else { __skb_dequeue(&sk->sk_receive_queue); __skb_queue_head(&new_sk->sk_receive_queue, buf); @@ -2382,18 +2347,29 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt, { struct sock *sk = sock->sk; struct tipc_sock *tsk = tipc_sk(sk); - u32 value; - int res; + u32 value = 0; + int res = 0; if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM)) return 0; if (lvl != SOL_TIPC) return -ENOPROTOOPT; - if (ol < sizeof(value)) - return -EINVAL; - res = get_user(value, (u32 __user *)ov); - if (res) - return res; + + switch (opt) { + case TIPC_IMPORTANCE: + case TIPC_SRC_DROPPABLE: + case TIPC_DEST_DROPPABLE: + case TIPC_CONN_TIMEOUT: + if (ol < sizeof(value)) + return -EINVAL; + res = get_user(value, (u32 __user *)ov); + if (res) + return res; + break; + default: + if (ov || ol) + return -EINVAL; + } lock_sock(sk); @@ -2412,7 +2388,14 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt, break; case TIPC_CONN_TIMEOUT: tipc_sk(sk)->conn_timeout = value; - /* no need to set "res", since already 0 at this point */ + break; + case TIPC_MCAST_BROADCAST: + tsk->mc_method.rcast = false; + tsk->mc_method.mandatory = true; + break; + case TIPC_MCAST_REPLICAST: + tsk->mc_method.rcast = true; + tsk->mc_method.mandatory = true; break; default: res = -EINVAL; @@ -2575,7 +2558,7 @@ static const struct proto_ops stream_ops = { .shutdown = tipc_shutdown, .setsockopt = tipc_setsockopt, .getsockopt = tipc_getsockopt, - .sendmsg = tipc_send_stream, + .sendmsg = tipc_sendstream, .recvmsg = tipc_recv_stream, .mmap = sock_no_mmap, .sendpage = sock_no_sendpage diff --git a/net/tipc/udp_media.c b/net/tipc/udp_media.c index b58dc95f3d35..46061cf48cd1 100644 --- a/net/tipc/udp_media.c +++ b/net/tipc/udp_media.c @@ -113,7 +113,7 @@ static void tipc_udp_media_addr_set(struct tipc_media_addr *addr, memcpy(addr->value, ua, sizeof(struct udp_media_addr)); if (tipc_udp_is_mcast_addr(ua)) - addr->broadcast = 1; + addr->broadcast = TIPC_BROADCAST_SUPPORT; } /* tipc_udp_addr2str - convert ip/udp address to string */ @@ -229,7 +229,7 @@ static int tipc_udp_send_msg(struct net *net, struct sk_buff *skb, goto out; } - if (!addr->broadcast || list_empty(&ub->rcast.list)) + if (addr->broadcast != TIPC_REPLICAST_SUPPORT) return tipc_udp_xmit(net, skb, ub, src, dst); /* Replicast, send an skb to each configured IP address */ @@ -296,7 +296,7 @@ static int tipc_udp_rcast_add(struct tipc_bearer *b, else if (ntohs(addr->proto) == ETH_P_IPV6) pr_info("New replicast peer: %pI6\n", &rcast->addr.ipv6); #endif - + b->bcast_addr.broadcast = TIPC_REPLICAST_SUPPORT; list_add_rcu(&rcast->list, &ub->rcast.list); return 0; } @@ -681,7 +681,7 @@ static int tipc_udp_enable(struct net *net, struct tipc_bearer *b, goto err; b->bcast_addr.media_id = TIPC_MEDIA_TYPE_UDP; - b->bcast_addr.broadcast = 1; + b->bcast_addr.broadcast = TIPC_BROADCAST_SUPPORT; rcu_assign_pointer(b->media_ptr, ub); rcu_assign_pointer(ub->bearer, b); tipc_udp_media_addr_set(&b->addr, &local); |