diff options
Diffstat (limited to 'net/tipc/bcast.c')
-rw-r--r-- | net/tipc/bcast.c | 988 |
1 files changed, 237 insertions, 751 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index eadba62afa85..9dc239dfe192 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -35,742 +35,301 @@ * POSSIBILITY OF SUCH DAMAGE. */ +#include <linux/tipc_config.h> #include "socket.h" #include "msg.h" #include "bcast.h" #include "name_distr.h" -#include "core.h" +#include "link.h" +#include "node.h" -#define MAX_PKT_DEFAULT_MCAST 1500 /* bcast link max packet size (fixed) */ #define BCLINK_WIN_DEFAULT 50 /* bcast link window size (default) */ #define BCLINK_WIN_MIN 32 /* bcast minimum link window size */ const char tipc_bclink_name[] = "broadcast-link"; -static void tipc_nmap_diff(struct tipc_node_map *nm_a, - struct tipc_node_map *nm_b, - struct tipc_node_map *nm_diff); -static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node); -static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node); - -static void tipc_bclink_lock(struct net *net) -{ - struct tipc_net *tn = net_generic(net, tipc_net_id); - - spin_lock_bh(&tn->bclink->lock); -} - -static void tipc_bclink_unlock(struct net *net) -{ - struct tipc_net *tn = net_generic(net, tipc_net_id); - - spin_unlock_bh(&tn->bclink->lock); -} - -void tipc_bclink_input(struct net *net) -{ - struct tipc_net *tn = net_generic(net, tipc_net_id); - - tipc_sk_mcast_rcv(net, &tn->bclink->arrvq, &tn->bclink->inputq); -} - -uint tipc_bclink_get_mtu(void) -{ - return MAX_PKT_DEFAULT_MCAST; -} - -static u32 bcbuf_acks(struct sk_buff *buf) -{ - return (u32)(unsigned long)TIPC_SKB_CB(buf)->handle; -} - -static void bcbuf_set_acks(struct sk_buff *buf, u32 acks) -{ - TIPC_SKB_CB(buf)->handle = (void *)(unsigned long)acks; -} - -static void bcbuf_decr_acks(struct sk_buff *buf) -{ - bcbuf_set_acks(buf, bcbuf_acks(buf) - 1); -} +/** + * struct tipc_bc_base - base structure for keeping broadcast send state + * @link: broadcast send link structure + * @inputq: data input queue; will only carry SOCK_WAKEUP messages + * @dest: array keeping number of reachable destinations per bearer + * @primary_bearer: a bearer having links to all broadcast destinations, if any + */ +struct tipc_bc_base { + struct tipc_link *link; + struct sk_buff_head inputq; + int dests[MAX_BEARERS]; + int primary_bearer; +}; -void tipc_bclink_add_node(struct net *net, u32 addr) +static struct tipc_bc_base *tipc_bc_base(struct net *net) { - struct tipc_net *tn = net_generic(net, tipc_net_id); - - tipc_bclink_lock(net); - tipc_nmap_add(&tn->bclink->bcast_nodes, addr); - tipc_bclink_unlock(net); + return tipc_net(net)->bcbase; } -void tipc_bclink_remove_node(struct net *net, u32 addr) +int tipc_bcast_get_mtu(struct net *net) { - struct tipc_net *tn = net_generic(net, tipc_net_id); - - tipc_bclink_lock(net); - tipc_nmap_remove(&tn->bclink->bcast_nodes, addr); - - /* Last node? => reset backlog queue */ - if (!tn->bclink->bcast_nodes.count) - tipc_link_purge_backlog(&tn->bclink->link); - - tipc_bclink_unlock(net); + return tipc_link_mtu(tipc_bc_sndlink(net)); } -static void bclink_set_last_sent(struct net *net) +/* tipc_bcbase_select_primary(): find a bearer with links to all destinations, + * if any, and make it primary bearer + */ +static void tipc_bcbase_select_primary(struct net *net) { - struct tipc_net *tn = net_generic(net, tipc_net_id); - struct tipc_link *bcl = tn->bcl; + struct tipc_bc_base *bb = tipc_bc_base(net); + int all_dests = tipc_link_bc_peers(bb->link); + int i, mtu; - bcl->silent_intv_cnt = mod(bcl->snd_nxt - 1); -} + bb->primary_bearer = INVALID_BEARER_ID; -u32 tipc_bclink_get_last_sent(struct net *net) -{ - struct tipc_net *tn = net_generic(net, tipc_net_id); + if (!all_dests) + return; - return tn->bcl->silent_intv_cnt; -} + for (i = 0; i < MAX_BEARERS; i++) { + if (!bb->dests[i]) + continue; -static void bclink_update_last_sent(struct tipc_node *node, u32 seqno) -{ - node->bclink.last_sent = less_eq(node->bclink.last_sent, seqno) ? - seqno : node->bclink.last_sent; -} + mtu = tipc_bearer_mtu(net, i); + if (mtu < tipc_link_mtu(bb->link)) + tipc_link_set_mtu(bb->link, mtu); -/** - * tipc_bclink_retransmit_to - get most recent node to request retransmission - * - * Called with bclink_lock locked - */ -struct tipc_node *tipc_bclink_retransmit_to(struct net *net) -{ - struct tipc_net *tn = net_generic(net, tipc_net_id); - - return tn->bclink->retransmit_to; -} + if (bb->dests[i] < all_dests) + continue; -/** - * bclink_retransmit_pkt - retransmit broadcast packets - * @after: sequence number of last packet to *not* retransmit - * @to: sequence number of last packet to retransmit - * - * Called with bclink_lock locked - */ -static void bclink_retransmit_pkt(struct tipc_net *tn, u32 after, u32 to) -{ - struct sk_buff *skb; - struct tipc_link *bcl = tn->bcl; + bb->primary_bearer = i; - skb_queue_walk(&bcl->transmq, skb) { - if (more(buf_seqno(skb), after)) { - tipc_link_retransmit(bcl, skb, mod(to - after)); + /* Reduce risk that all nodes select same primary */ + if ((i ^ tipc_own_addr(net)) & 1) break; - } } } -/** - * bclink_prepare_wakeup - prepare users for wakeup after congestion - * @bcl: broadcast link - * @resultq: queue for users which can be woken up - * Move a number of waiting users, as permitted by available space in - * the send queue, from link wait queue to specified queue for wakeup - */ -static void bclink_prepare_wakeup(struct tipc_link *bcl, struct sk_buff_head *resultq) +void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id) { - int pnd[TIPC_SYSTEM_IMPORTANCE + 1] = {0,}; - int imp, lim; - struct sk_buff *skb, *tmp; - - skb_queue_walk_safe(&bcl->wakeupq, skb, tmp) { - imp = TIPC_SKB_CB(skb)->chain_imp; - lim = bcl->window + bcl->backlog[imp].limit; - pnd[imp] += TIPC_SKB_CB(skb)->chain_sz; - if ((pnd[imp] + bcl->backlog[imp].len) >= lim) - continue; - skb_unlink(skb, &bcl->wakeupq); - skb_queue_tail(resultq, skb); - } -} + struct tipc_bc_base *bb = tipc_bc_base(net); -/** - * tipc_bclink_wakeup_users - wake up pending users - * - * Called with no locks taken - */ -void tipc_bclink_wakeup_users(struct net *net) -{ - struct tipc_net *tn = net_generic(net, tipc_net_id); - struct tipc_link *bcl = tn->bcl; - struct sk_buff_head resultq; - - skb_queue_head_init(&resultq); - bclink_prepare_wakeup(bcl, &resultq); - tipc_sk_rcv(net, &resultq); + tipc_bcast_lock(net); + bb->dests[bearer_id]++; + tipc_bcbase_select_primary(net); + tipc_bcast_unlock(net); } -/** - * tipc_bclink_acknowledge - handle acknowledgement of broadcast packets - * @n_ptr: node that sent acknowledgement info - * @acked: broadcast sequence # that has been acknowledged - * - * Node is locked, bclink_lock unlocked. - */ -void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked) +void tipc_bcast_dec_bearer_dst_cnt(struct net *net, int bearer_id) { - struct sk_buff *skb, *tmp; - unsigned int released = 0; - struct net *net = n_ptr->net; - struct tipc_net *tn = net_generic(net, tipc_net_id); - - if (unlikely(!n_ptr->bclink.recv_permitted)) - return; + struct tipc_bc_base *bb = tipc_bc_base(net); - tipc_bclink_lock(net); - - /* Bail out if tx queue is empty (no clean up is required) */ - skb = skb_peek(&tn->bcl->transmq); - if (!skb) - goto exit; - - /* Determine which messages need to be acknowledged */ - if (acked == INVALID_LINK_SEQ) { - /* - * Contact with specified node has been lost, so need to - * acknowledge sent messages only (if other nodes still exist) - * or both sent and unsent messages (otherwise) - */ - if (tn->bclink->bcast_nodes.count) - acked = tn->bcl->silent_intv_cnt; - else - acked = tn->bcl->snd_nxt; - } else { - /* - * Bail out if specified sequence number does not correspond - * to a message that has been sent and not yet acknowledged - */ - if (less(acked, buf_seqno(skb)) || - less(tn->bcl->silent_intv_cnt, acked) || - less_eq(acked, n_ptr->bclink.acked)) - goto exit; - } - - /* Skip over packets that node has previously acknowledged */ - skb_queue_walk(&tn->bcl->transmq, skb) { - if (more(buf_seqno(skb), n_ptr->bclink.acked)) - break; - } - - /* Update packets that node is now acknowledging */ - skb_queue_walk_from_safe(&tn->bcl->transmq, skb, tmp) { - if (more(buf_seqno(skb), acked)) - break; - bcbuf_decr_acks(skb); - bclink_set_last_sent(net); - if (bcbuf_acks(skb) == 0) { - __skb_unlink(skb, &tn->bcl->transmq); - kfree_skb(skb); - released = 1; - } - } - n_ptr->bclink.acked = acked; - - /* Try resolving broadcast link congestion, if necessary */ - if (unlikely(skb_peek(&tn->bcl->backlogq))) { - tipc_link_push_packets(tn->bcl); - bclink_set_last_sent(net); - } - if (unlikely(released && !skb_queue_empty(&tn->bcl->wakeupq))) - n_ptr->action_flags |= TIPC_WAKEUP_BCAST_USERS; -exit: - tipc_bclink_unlock(net); + tipc_bcast_lock(net); + bb->dests[bearer_id]--; + tipc_bcbase_select_primary(net); + tipc_bcast_unlock(net); } -/** - * tipc_bclink_update_link_state - update broadcast link state +/* tipc_bcbase_xmit - broadcast a packet queue across one or more bearers * - * RCU and node lock set + * Note that number of reachable destinations, as indicated in the dests[] + * array, may transitionally differ from the number of destinations indicated + * in each sent buffer. We can sustain this. Excess destination nodes will + * drop and never acknowledge the unexpected packets, and missing destinations + * will either require retransmission (if they are just about to be added to + * the bearer), or be removed from the buffer's 'ackers' counter (if they + * just went down) */ -void tipc_bclink_update_link_state(struct tipc_node *n_ptr, - u32 last_sent) +static void tipc_bcbase_xmit(struct net *net, struct sk_buff_head *xmitq) { - struct sk_buff *buf; - struct net *net = n_ptr->net; - struct tipc_net *tn = net_generic(net, tipc_net_id); - - /* Ignore "stale" link state info */ - if (less_eq(last_sent, n_ptr->bclink.last_in)) - return; + int bearer_id; + struct tipc_bc_base *bb = tipc_bc_base(net); + struct sk_buff *skb, *_skb; + struct sk_buff_head _xmitq; - /* Update link synchronization state; quit if in sync */ - bclink_update_last_sent(n_ptr, last_sent); - - if (n_ptr->bclink.last_sent == n_ptr->bclink.last_in) + if (skb_queue_empty(xmitq)) return; - /* Update out-of-sync state; quit if loss is still unconfirmed */ - if ((++n_ptr->bclink.oos_state) == 1) { - if (n_ptr->bclink.deferred_size < (TIPC_MIN_LINK_WIN / 2)) - return; - n_ptr->bclink.oos_state++; - } - - /* Don't NACK if one has been recently sent (or seen) */ - if (n_ptr->bclink.oos_state & 0x1) + /* The typical case: at least one bearer has links to all nodes */ + bearer_id = bb->primary_bearer; + if (bearer_id >= 0) { + tipc_bearer_bc_xmit(net, bearer_id, xmitq); return; - - /* Send NACK */ - buf = tipc_buf_acquire(INT_H_SIZE); - if (buf) { - struct tipc_msg *msg = buf_msg(buf); - struct sk_buff *skb = skb_peek(&n_ptr->bclink.deferdq); - u32 to = skb ? buf_seqno(skb) - 1 : n_ptr->bclink.last_sent; - - tipc_msg_init(tn->own_addr, msg, BCAST_PROTOCOL, STATE_MSG, - INT_H_SIZE, n_ptr->addr); - msg_set_non_seq(msg, 1); - msg_set_mc_netid(msg, tn->net_id); - msg_set_bcast_ack(msg, n_ptr->bclink.last_in); - msg_set_bcgap_after(msg, n_ptr->bclink.last_in); - msg_set_bcgap_to(msg, to); - - tipc_bclink_lock(net); - tipc_bearer_send(net, MAX_BEARERS, buf, NULL); - tn->bcl->stats.sent_nacks++; - tipc_bclink_unlock(net); - kfree_skb(buf); - - n_ptr->bclink.oos_state++; } -} -void tipc_bclink_sync_state(struct tipc_node *n, struct tipc_msg *hdr) -{ - u16 last = msg_last_bcast(hdr); - int mtyp = msg_type(hdr); + /* We have to transmit across all bearers */ + skb_queue_head_init(&_xmitq); + for (bearer_id = 0; bearer_id < MAX_BEARERS; bearer_id++) { + if (!bb->dests[bearer_id]) + continue; - if (unlikely(msg_user(hdr) != LINK_PROTOCOL)) - return; - if (mtyp == STATE_MSG) { - tipc_bclink_update_link_state(n, last); - return; + skb_queue_walk(xmitq, skb) { + _skb = pskb_copy_for_clone(skb, GFP_ATOMIC); + if (!_skb) + break; + __skb_queue_tail(&_xmitq, _skb); + } + tipc_bearer_bc_xmit(net, bearer_id, &_xmitq); } - /* Compatibility: older nodes don't know BCAST_PROTOCOL synchronization, - * and transfer synch info in LINK_PROTOCOL messages. - */ - if (tipc_node_is_up(n)) - return; - if ((mtyp != RESET_MSG) && (mtyp != ACTIVATE_MSG)) - return; - n->bclink.last_sent = last; - n->bclink.last_in = last; - n->bclink.oos_state = 0; + __skb_queue_purge(xmitq); + __skb_queue_purge(&_xmitq); } -/** - * bclink_peek_nack - monitor retransmission requests sent by other nodes - * - * Delay any upcoming NACK by this node if another node has already - * requested the first message this node is going to ask for. - */ -static void bclink_peek_nack(struct net *net, struct tipc_msg *msg) -{ - struct tipc_node *n_ptr = tipc_node_find(net, msg_destnode(msg)); - - if (unlikely(!n_ptr)) - return; - - tipc_node_lock(n_ptr); - if (n_ptr->bclink.recv_permitted && - (n_ptr->bclink.last_in != n_ptr->bclink.last_sent) && - (n_ptr->bclink.last_in == msg_bcgap_after(msg))) - n_ptr->bclink.oos_state = 2; - tipc_node_unlock(n_ptr); - tipc_node_put(n_ptr); -} - -/* tipc_bclink_xmit - deliver buffer chain to all nodes in cluster +/* tipc_bcast_xmit - deliver buffer chain to all nodes in cluster * and to identified node local sockets * @net: the applicable net namespace * @list: chain of buffers containing message * Consumes the buffer chain, except when returning -ELINKCONG * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE */ -int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list) +int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list) { - struct tipc_net *tn = net_generic(net, tipc_net_id); - struct tipc_link *bcl = tn->bcl; - struct tipc_bclink *bclink = tn->bclink; + struct tipc_link *l = tipc_bc_sndlink(net); + struct sk_buff_head xmitq, inputq, rcvq; int rc = 0; - int bc = 0; - struct sk_buff *skb; - struct sk_buff_head arrvq; - struct sk_buff_head inputq; - /* Prepare clone of message for local node */ - skb = tipc_msg_reassemble(list); - if (unlikely(!skb)) - return -EHOSTUNREACH; + __skb_queue_head_init(&rcvq); + __skb_queue_head_init(&xmitq); + skb_queue_head_init(&inputq); - /* Broadcast to all nodes */ - if (likely(bclink)) { - tipc_bclink_lock(net); - if (likely(bclink->bcast_nodes.count)) { - rc = __tipc_link_xmit(net, bcl, list); - if (likely(!rc)) { - u32 len = skb_queue_len(&bcl->transmq); - - bclink_set_last_sent(net); - bcl->stats.queue_sz_counts++; - bcl->stats.accu_queue_sz += len; - } - bc = 1; - } - tipc_bclink_unlock(net); - } + /* Prepare message clone for local node */ + if (unlikely(!tipc_msg_reassemble(list, &rcvq))) + return -EHOSTUNREACH; - if (unlikely(!bc)) - __skb_queue_purge(list); + tipc_bcast_lock(net); + if (tipc_link_bc_peers(l)) + rc = tipc_link_xmit(l, list, &xmitq); + tipc_bcast_unlock(net); + /* Don't send to local node if adding to link failed */ if (unlikely(rc)) { - kfree_skb(skb); + __skb_queue_purge(&rcvq); return rc; } - /* Deliver message clone */ - __skb_queue_head_init(&arrvq); - skb_queue_head_init(&inputq); - __skb_queue_tail(&arrvq, skb); - tipc_sk_mcast_rcv(net, &arrvq, &inputq); - return rc; -} -/** - * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet - * - * Called with both sending node's lock and bclink_lock taken. - */ -static void bclink_accept_pkt(struct tipc_node *node, u32 seqno) -{ - struct tipc_net *tn = net_generic(node->net, tipc_net_id); - - bclink_update_last_sent(node, seqno); - node->bclink.last_in = seqno; - node->bclink.oos_state = 0; - tn->bcl->stats.recv_info++; - - /* - * Unicast an ACK periodically, ensuring that - * all nodes in the cluster don't ACK at the same time - */ - if (((seqno - tn->own_addr) % TIPC_MIN_LINK_WIN) == 0) { - tipc_link_proto_xmit(node_active_link(node, node->addr), - STATE_MSG, 0, 0, 0, 0); - tn->bcl->stats.sent_acks++; - } + /* Broadcast to all nodes, inluding local node */ + tipc_bcbase_xmit(net, &xmitq); + tipc_sk_mcast_rcv(net, &rcvq, &inputq); + __skb_queue_purge(list); + return 0; } -/** - * tipc_bclink_rcv - receive a broadcast packet, and deliver upwards +/* tipc_bcast_rcv - receive a broadcast packet, and deliver to rcv link * * RCU is locked, no other locks set */ -void tipc_bclink_rcv(struct net *net, struct sk_buff *buf) +int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb) { - struct tipc_net *tn = net_generic(net, tipc_net_id); - struct tipc_link *bcl = tn->bcl; - struct tipc_msg *msg = buf_msg(buf); - struct tipc_node *node; - u32 next_in; - u32 seqno; - int deferred = 0; - int pos = 0; - struct sk_buff *iskb; - struct sk_buff_head *arrvq, *inputq; - - /* Screen out unwanted broadcast messages */ - if (msg_mc_netid(msg) != tn->net_id) - goto exit; - - node = tipc_node_find(net, msg_prevnode(msg)); - if (unlikely(!node)) - goto exit; - - tipc_node_lock(node); - if (unlikely(!node->bclink.recv_permitted)) - goto unlock; - - /* Handle broadcast protocol message */ - if (unlikely(msg_user(msg) == BCAST_PROTOCOL)) { - if (msg_type(msg) != STATE_MSG) - goto unlock; - if (msg_destnode(msg) == tn->own_addr) { - tipc_bclink_acknowledge(node, msg_bcast_ack(msg)); - tipc_bclink_lock(net); - bcl->stats.recv_nacks++; - tn->bclink->retransmit_to = node; - bclink_retransmit_pkt(tn, msg_bcgap_after(msg), - msg_bcgap_to(msg)); - tipc_bclink_unlock(net); - tipc_node_unlock(node); - } else { - tipc_node_unlock(node); - bclink_peek_nack(net, msg); - } - tipc_node_put(node); - goto exit; - } - - /* Handle in-sequence broadcast message */ - seqno = msg_seqno(msg); - next_in = mod(node->bclink.last_in + 1); - arrvq = &tn->bclink->arrvq; - inputq = &tn->bclink->inputq; - - if (likely(seqno == next_in)) { -receive: - /* Deliver message to destination */ - if (likely(msg_isdata(msg))) { - tipc_bclink_lock(net); - bclink_accept_pkt(node, seqno); - spin_lock_bh(&inputq->lock); - __skb_queue_tail(arrvq, buf); - spin_unlock_bh(&inputq->lock); - node->action_flags |= TIPC_BCAST_MSG_EVT; - tipc_bclink_unlock(net); - tipc_node_unlock(node); - } else if (msg_user(msg) == MSG_BUNDLER) { - tipc_bclink_lock(net); - bclink_accept_pkt(node, seqno); - bcl->stats.recv_bundles++; - bcl->stats.recv_bundled += msg_msgcnt(msg); - pos = 0; - while (tipc_msg_extract(buf, &iskb, &pos)) { - spin_lock_bh(&inputq->lock); - __skb_queue_tail(arrvq, iskb); - spin_unlock_bh(&inputq->lock); - } - node->action_flags |= TIPC_BCAST_MSG_EVT; - tipc_bclink_unlock(net); - tipc_node_unlock(node); - } else if (msg_user(msg) == MSG_FRAGMENTER) { - tipc_bclink_lock(net); - bclink_accept_pkt(node, seqno); - tipc_buf_append(&node->bclink.reasm_buf, &buf); - if (unlikely(!buf && !node->bclink.reasm_buf)) { - tipc_bclink_unlock(net); - goto unlock; - } - bcl->stats.recv_fragments++; - if (buf) { - bcl->stats.recv_fragmented++; - msg = buf_msg(buf); - tipc_bclink_unlock(net); - goto receive; - } - tipc_bclink_unlock(net); - tipc_node_unlock(node); - } else { - tipc_bclink_lock(net); - bclink_accept_pkt(node, seqno); - tipc_bclink_unlock(net); - tipc_node_unlock(node); - kfree_skb(buf); - } - buf = NULL; + struct tipc_msg *hdr = buf_msg(skb); + struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq; + struct sk_buff_head xmitq; + int rc; - /* Determine new synchronization state */ - tipc_node_lock(node); - if (unlikely(!tipc_node_is_up(node))) - goto unlock; + __skb_queue_head_init(&xmitq); - if (node->bclink.last_in == node->bclink.last_sent) - goto unlock; - - if (skb_queue_empty(&node->bclink.deferdq)) { - node->bclink.oos_state = 1; - goto unlock; - } - - msg = buf_msg(skb_peek(&node->bclink.deferdq)); - seqno = msg_seqno(msg); - next_in = mod(next_in + 1); - if (seqno != next_in) - goto unlock; - - /* Take in-sequence message from deferred queue & deliver it */ - buf = __skb_dequeue(&node->bclink.deferdq); - goto receive; - } - - /* Handle out-of-sequence broadcast message */ - if (less(next_in, seqno)) { - deferred = tipc_link_defer_pkt(&node->bclink.deferdq, - buf); - bclink_update_last_sent(node, seqno); - buf = NULL; + if (msg_mc_netid(hdr) != tipc_netid(net) || !tipc_link_is_up(l)) { + kfree_skb(skb); + return 0; } - tipc_bclink_lock(net); - - if (deferred) - bcl->stats.deferred_recv++; + tipc_bcast_lock(net); + if (msg_user(hdr) == BCAST_PROTOCOL) + rc = tipc_link_bc_nack_rcv(l, skb, &xmitq); else - bcl->stats.duplicates++; + rc = tipc_link_rcv(l, skb, NULL); + tipc_bcast_unlock(net); - tipc_bclink_unlock(net); + tipc_bcbase_xmit(net, &xmitq); -unlock: - tipc_node_unlock(node); - tipc_node_put(node); -exit: - kfree_skb(buf); -} + /* Any socket wakeup messages ? */ + if (!skb_queue_empty(inputq)) + tipc_sk_rcv(net, inputq); -u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr) -{ - return (n_ptr->bclink.recv_permitted && - (tipc_bclink_get_last_sent(n_ptr->net) != n_ptr->bclink.acked)); + return rc; } - -/** - * tipc_bcbearer_send - send a packet through the broadcast pseudo-bearer +/* tipc_bcast_ack_rcv - receive and handle a broadcast acknowledge * - * Send packet over as many bearers as necessary to reach all nodes - * that have joined the broadcast link. - * - * Returns 0 (packet sent successfully) under all circumstances, - * since the broadcast link's pseudo-bearer never blocks + * RCU is locked, no other locks set */ -static int tipc_bcbearer_send(struct net *net, struct sk_buff *buf, - struct tipc_bearer *unused1, - struct tipc_media_addr *unused2) +void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l, u32 acked) { - int bp_index; - struct tipc_msg *msg = buf_msg(buf); - struct tipc_net *tn = net_generic(net, tipc_net_id); - struct tipc_bcbearer *bcbearer = tn->bcbearer; - struct tipc_bclink *bclink = tn->bclink; - - /* Prepare broadcast link message for reliable transmission, - * if first time trying to send it; - * preparation is skipped for broadcast link protocol messages - * since they are sent in an unreliable manner and don't need it - */ - if (likely(!msg_non_seq(buf_msg(buf)))) { - bcbuf_set_acks(buf, bclink->bcast_nodes.count); - msg_set_non_seq(msg, 1); - msg_set_mc_netid(msg, tn->net_id); - tn->bcl->stats.sent_info++; - if (WARN_ON(!bclink->bcast_nodes.count)) { - dump_stack(); - return 0; - } - } + struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq; + struct sk_buff_head xmitq; - /* Send buffer over bearers until all targets reached */ - bcbearer->remains = bclink->bcast_nodes; - - for (bp_index = 0; bp_index < MAX_BEARERS; bp_index++) { - struct tipc_bearer *p = bcbearer->bpairs[bp_index].primary; - struct tipc_bearer *s = bcbearer->bpairs[bp_index].secondary; - struct tipc_bearer *bp[2] = {p, s}; - struct tipc_bearer *b = bp[msg_link_selector(msg)]; - struct sk_buff *tbuf; - - if (!p) - break; /* No more bearers to try */ - if (!b) - b = p; - tipc_nmap_diff(&bcbearer->remains, &b->nodes, - &bcbearer->remains_new); - if (bcbearer->remains_new.count == bcbearer->remains.count) - continue; /* Nothing added by bearer pair */ - - if (bp_index == 0) { - /* Use original buffer for first bearer */ - tipc_bearer_send(net, b->identity, buf, &b->bcast_addr); - } else { - /* Avoid concurrent buffer access */ - tbuf = pskb_copy_for_clone(buf, GFP_ATOMIC); - if (!tbuf) - break; - tipc_bearer_send(net, b->identity, tbuf, - &b->bcast_addr); - kfree_skb(tbuf); /* Bearer keeps a clone */ - } - if (bcbearer->remains_new.count == 0) - break; /* All targets reached */ + __skb_queue_head_init(&xmitq); - bcbearer->remains = bcbearer->remains_new; - } + tipc_bcast_lock(net); + tipc_link_bc_ack_rcv(l, acked, &xmitq); + tipc_bcast_unlock(net); - return 0; + tipc_bcbase_xmit(net, &xmitq); + + /* Any socket wakeup messages ? */ + if (!skb_queue_empty(inputq)) + tipc_sk_rcv(net, inputq); } -/** - * tipc_bcbearer_sort - create sets of bearer pairs used by broadcast bearer +/* tipc_bcast_synch_rcv - check and update rcv link with peer's send state + * + * RCU is locked, no other locks set */ -void tipc_bcbearer_sort(struct net *net, struct tipc_node_map *nm_ptr, - u32 node, bool action) +void tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l, + struct tipc_msg *hdr) { - struct tipc_net *tn = net_generic(net, tipc_net_id); - struct tipc_bcbearer *bcbearer = tn->bcbearer; - struct tipc_bcbearer_pair *bp_temp = bcbearer->bpairs_temp; - struct tipc_bcbearer_pair *bp_curr; - struct tipc_bearer *b; - int b_index; - int pri; - - tipc_bclink_lock(net); + struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq; + struct sk_buff_head xmitq; - if (action) - tipc_nmap_add(nm_ptr, node); - else - tipc_nmap_remove(nm_ptr, node); + __skb_queue_head_init(&xmitq); - /* Group bearers by priority (can assume max of two per priority) */ - memset(bp_temp, 0, sizeof(bcbearer->bpairs_temp)); + tipc_bcast_lock(net); + if (msg_type(hdr) == STATE_MSG) { + tipc_link_bc_ack_rcv(l, msg_bcast_ack(hdr), &xmitq); + tipc_link_bc_sync_rcv(l, hdr, &xmitq); + } else { + tipc_link_bc_init_rcv(l, hdr); + } + tipc_bcast_unlock(net); - rcu_read_lock(); - for (b_index = 0; b_index < MAX_BEARERS; b_index++) { - b = rcu_dereference_rtnl(tn->bearer_list[b_index]); - if (!b || !b->nodes.count) - continue; + tipc_bcbase_xmit(net, &xmitq); - if (!bp_temp[b->priority].primary) - bp_temp[b->priority].primary = b; - else - bp_temp[b->priority].secondary = b; - } - rcu_read_unlock(); + /* Any socket wakeup messages ? */ + if (!skb_queue_empty(inputq)) + tipc_sk_rcv(net, inputq); +} - /* Create array of bearer pairs for broadcasting */ - bp_curr = bcbearer->bpairs; - memset(bcbearer->bpairs, 0, sizeof(bcbearer->bpairs)); +/* tipc_bcast_add_peer - add a peer node to broadcast link and bearer + * + * RCU is locked, node lock is set + */ +void tipc_bcast_add_peer(struct net *net, struct tipc_link *uc_l, + struct sk_buff_head *xmitq) +{ + struct tipc_link *snd_l = tipc_bc_sndlink(net); - for (pri = TIPC_MAX_LINK_PRI; pri >= 0; pri--) { + tipc_bcast_lock(net); + tipc_link_add_bc_peer(snd_l, uc_l, xmitq); + tipc_bcbase_select_primary(net); + tipc_bcast_unlock(net); +} - if (!bp_temp[pri].primary) - continue; +/* tipc_bcast_remove_peer - remove a peer node from broadcast link and bearer + * + * RCU is locked, node lock is set + */ +void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_l) +{ + struct tipc_link *snd_l = tipc_bc_sndlink(net); + struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq; + struct sk_buff_head xmitq; - bp_curr->primary = bp_temp[pri].primary; + __skb_queue_head_init(&xmitq); - if (bp_temp[pri].secondary) { - if (tipc_nmap_equal(&bp_temp[pri].primary->nodes, - &bp_temp[pri].secondary->nodes)) { - bp_curr->secondary = bp_temp[pri].secondary; - } else { - bp_curr++; - bp_curr->primary = bp_temp[pri].secondary; - } - } + tipc_bcast_lock(net); + tipc_link_remove_bc_peer(snd_l, rcv_l, &xmitq); + tipc_bcbase_select_primary(net); + tipc_bcast_unlock(net); - bp_curr++; - } + tipc_bcbase_xmit(net, &xmitq); - tipc_bclink_unlock(net); + /* Any socket wakeup messages ? */ + if (!skb_queue_empty(inputq)) + tipc_sk_rcv(net, inputq); } static int __tipc_nl_add_bc_link_stat(struct sk_buff *skb, @@ -836,7 +395,7 @@ int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg) if (!bcl) return 0; - tipc_bclink_lock(net); + tipc_bcast_lock(net); hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, &tipc_genl_family, NLM_F_MULTI, TIPC_NL_LINK_GET); @@ -871,7 +430,7 @@ int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg) if (err) goto attr_msg_full; - tipc_bclink_unlock(net); + tipc_bcast_unlock(net); nla_nest_end(msg->skb, attrs); genlmsg_end(msg->skb, hdr); @@ -882,7 +441,7 @@ prop_msg_full: attr_msg_full: nla_nest_cancel(msg->skb, attrs); msg_full: - tipc_bclink_unlock(net); + tipc_bcast_unlock(net); genlmsg_cancel(msg->skb, hdr); return -EMSGSIZE; @@ -896,26 +455,25 @@ int tipc_bclink_reset_stats(struct net *net) if (!bcl) return -ENOPROTOOPT; - tipc_bclink_lock(net); + tipc_bcast_lock(net); memset(&bcl->stats, 0, sizeof(bcl->stats)); - tipc_bclink_unlock(net); + tipc_bcast_unlock(net); return 0; } -int tipc_bclink_set_queue_limits(struct net *net, u32 limit) +static int tipc_bc_link_set_queue_limits(struct net *net, u32 limit) { - struct tipc_net *tn = net_generic(net, tipc_net_id); - struct tipc_link *bcl = tn->bcl; + struct tipc_link *l = tipc_bc_sndlink(net); - if (!bcl) + if (!l) return -ENOPROTOOPT; if (limit < BCLINK_WIN_MIN) limit = BCLINK_WIN_MIN; if (limit > TIPC_MAX_LINK_WIN) return -EINVAL; - tipc_bclink_lock(net); - tipc_link_set_queue_limits(bcl, limit); - tipc_bclink_unlock(net); + tipc_bcast_lock(net); + tipc_link_set_queue_limits(l, limit); + tipc_bcast_unlock(net); return 0; } @@ -937,123 +495,51 @@ int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[]) win = nla_get_u32(props[TIPC_NLA_PROP_WIN]); - return tipc_bclink_set_queue_limits(net, win); + return tipc_bc_link_set_queue_limits(net, win); } -int tipc_bclink_init(struct net *net) +int tipc_bcast_init(struct net *net) { - struct tipc_net *tn = net_generic(net, tipc_net_id); - struct tipc_bcbearer *bcbearer; - struct tipc_bclink *bclink; - struct tipc_link *bcl; - - bcbearer = kzalloc(sizeof(*bcbearer), GFP_ATOMIC); - if (!bcbearer) - return -ENOMEM; - - bclink = kzalloc(sizeof(*bclink), GFP_ATOMIC); - if (!bclink) { - kfree(bcbearer); - return -ENOMEM; - } + struct tipc_net *tn = tipc_net(net); + struct tipc_bc_base *bb = NULL; + struct tipc_link *l = NULL; - bcl = &bclink->link; - bcbearer->bearer.media = &bcbearer->media; - bcbearer->media.send_msg = tipc_bcbearer_send; - sprintf(bcbearer->media.name, "tipc-broadcast"); - - spin_lock_init(&bclink->lock); - __skb_queue_head_init(&bcl->transmq); - __skb_queue_head_init(&bcl->backlogq); - __skb_queue_head_init(&bcl->deferdq); - skb_queue_head_init(&bcl->wakeupq); - bcl->snd_nxt = 1; - spin_lock_init(&bclink->node.lock); - __skb_queue_head_init(&bclink->arrvq); - skb_queue_head_init(&bclink->inputq); - bcl->owner = &bclink->node; - bcl->owner->net = net; - bcl->mtu = MAX_PKT_DEFAULT_MCAST; - tipc_link_set_queue_limits(bcl, BCLINK_WIN_DEFAULT); - bcl->bearer_id = MAX_BEARERS; - rcu_assign_pointer(tn->bearer_list[MAX_BEARERS], &bcbearer->bearer); - bcl->pmsg = (struct tipc_msg *)&bcl->proto_msg; - msg_set_prevnode(bcl->pmsg, tn->own_addr); - strlcpy(bcl->name, tipc_bclink_name, TIPC_MAX_LINK_NAME); - tn->bcbearer = bcbearer; - tn->bclink = bclink; - tn->bcl = bcl; - return 0; -} + bb = kzalloc(sizeof(*bb), GFP_ATOMIC); + if (!bb) + goto enomem; + tn->bcbase = bb; + spin_lock_init(&tipc_net(net)->bclock); -void tipc_bclink_stop(struct net *net) -{ - struct tipc_net *tn = net_generic(net, tipc_net_id); - - tipc_bclink_lock(net); - tipc_link_purge_queues(tn->bcl); - tipc_bclink_unlock(net); - - RCU_INIT_POINTER(tn->bearer_list[BCBEARER], NULL); - synchronize_net(); - kfree(tn->bcbearer); - kfree(tn->bclink); + if (!tipc_link_bc_create(net, 0, 0, + U16_MAX, + BCLINK_WIN_DEFAULT, + 0, + &bb->inputq, + NULL, + NULL, + &l)) + goto enomem; + bb->link = l; + tn->bcl = l; + return 0; +enomem: + kfree(bb); + kfree(l); + return -ENOMEM; } -/** - * tipc_nmap_add - add a node to a node map - */ -static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node) +void tipc_bcast_reinit(struct net *net) { - int n = tipc_node(node); - int w = n / WSIZE; - u32 mask = (1 << (n % WSIZE)); + struct tipc_bc_base *b = tipc_bc_base(net); - if ((nm_ptr->map[w] & mask) == 0) { - nm_ptr->count++; - nm_ptr->map[w] |= mask; - } + msg_set_prevnode(b->link->pmsg, tipc_own_addr(net)); } -/** - * tipc_nmap_remove - remove a node from a node map - */ -static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node) +void tipc_bcast_stop(struct net *net) { - int n = tipc_node(node); - int w = n / WSIZE; - u32 mask = (1 << (n % WSIZE)); - - if ((nm_ptr->map[w] & mask) != 0) { - nm_ptr->map[w] &= ~mask; - nm_ptr->count--; - } -} + struct tipc_net *tn = net_generic(net, tipc_net_id); -/** - * tipc_nmap_diff - find differences between node maps - * @nm_a: input node map A - * @nm_b: input node map B - * @nm_diff: output node map A-B (i.e. nodes of A that are not in B) - */ -static void tipc_nmap_diff(struct tipc_node_map *nm_a, - struct tipc_node_map *nm_b, - struct tipc_node_map *nm_diff) -{ - int stop = ARRAY_SIZE(nm_a->map); - int w; - int b; - u32 map; - - memset(nm_diff, 0, sizeof(*nm_diff)); - for (w = 0; w < stop; w++) { - map = nm_a->map[w] ^ (nm_a->map[w] & nm_b->map[w]); - nm_diff->map[w] = map; - if (map != 0) { - for (b = 0 ; b < WSIZE; b++) { - if (map & (1 << b)) - nm_diff->count++; - } - } - } + synchronize_net(); + kfree(tn->bcbase); + kfree(tn->bcl); } |