diff options
Diffstat (limited to 'net/tipc/link.c')
-rw-r--r-- | net/tipc/link.c | 482 |
1 files changed, 132 insertions, 350 deletions
diff --git a/net/tipc/link.c b/net/tipc/link.c index 2ccdb6ffd5c8..d5f4005f388f 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -48,7 +48,7 @@ /* * Error message prefixes */ -static const char *link_co_err = "Link changeover error, "; +static const char *link_co_err = "Link tunneling error, "; static const char *link_rst_msg = "Resetting link "; static const char *link_unk_evt = "Unknown link event "; @@ -139,24 +139,6 @@ static void tipc_link_build_bcast_sync_msg(struct tipc_link *l, static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf); static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb); static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb); -static int tipc_link_failover_rcv(struct tipc_link *l, struct sk_buff **skb); - -/* - * Simple link routines - */ -static unsigned int align(unsigned int i) -{ - return (i + 3) & ~3u; -} - -static struct tipc_link *tipc_parallel_link(struct tipc_link *l) -{ - struct tipc_node *n = l->owner; - - if (node_active_link(n, 0) != l) - return node_active_link(n, 0); - return node_active_link(n, 1); -} /* * Simple non-static link routines (i.e. referenced outside this file) @@ -394,12 +376,10 @@ static int tipc_link_fsm_evt(struct tipc_link *l, int evt, /* Perform actions as decided by FSM */ if (actions & LINK_RESET) { l->exec_mode = TIPC_LINK_BLOCKED; - rc |= TIPC_LINK_DOWN_EVT; - } - if (actions & LINK_ACTIVATE) { - l->exec_mode = TIPC_LINK_OPEN; - rc |= TIPC_LINK_UP_EVT; + rc = TIPC_LINK_DOWN_EVT; } + if (actions & LINK_ACTIVATE) + rc = TIPC_LINK_UP_EVT; if (actions & (SND_STATE | SND_PROBE)) mtyp = STATE_MSG; if (actions & SND_RESET) @@ -461,6 +441,9 @@ int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq) { int rc = 0; + if (l->exec_mode == TIPC_LINK_BLOCKED) + return rc; + link_profile_stats(l); if (l->silent_intv_cnt) rc = tipc_link_fsm_evt(l, SILENCE_EVT, xmitq); @@ -563,52 +546,42 @@ void tipc_link_purge_queues(struct tipc_link *l_ptr) tipc_link_reset_fragments(l_ptr); } -void tipc_link_reset(struct tipc_link *l_ptr) +void tipc_link_reset(struct tipc_link *l) { - u32 prev_state = l_ptr->state; - struct tipc_node *owner = l_ptr->owner; - struct tipc_link *pl = tipc_parallel_link(l_ptr); + struct tipc_node *owner = l->owner; - msg_set_session(l_ptr->pmsg, ((msg_session(l_ptr->pmsg) + 1) & 0xffff)); + l->state = TIPC_LINK_RESETTING; /* Link is down, accept any session */ - l_ptr->peer_session = WILDCARD_SESSION; + l->peer_session = WILDCARD_SESSION; - /* Prepare for renewed mtu size negotiation */ - l_ptr->mtu = l_ptr->advertised_mtu; - - l_ptr->state = TIPC_LINK_RESETTING; + /* If peer is up, it only accepts an incremented session number */ + msg_set_session(l->pmsg, msg_session(l->pmsg) + 1); - if ((prev_state == TIPC_LINK_RESETTING) || - (prev_state == TIPC_LINK_ESTABLISHING)) - return; + /* Prepare for renewed mtu size negotiation */ + l->mtu = l->advertised_mtu; - if (tipc_node_is_up(l_ptr->owner) && (pl != l_ptr)) { - l_ptr->exec_mode = TIPC_LINK_BLOCKED; - l_ptr->failover_checkpt = l_ptr->rcv_nxt; - pl->failover_pkts = FIRST_FAILOVER; - pl->failover_checkpt = l_ptr->rcv_nxt; - pl->failover_skb = l_ptr->reasm_buf; - } else { - kfree_skb(l_ptr->reasm_buf); - } /* Clean up all queues, except inputq: */ - __skb_queue_purge(&l_ptr->transmq); - __skb_queue_purge(&l_ptr->deferdq); + __skb_queue_purge(&l->transmq); + __skb_queue_purge(&l->deferdq); if (!owner->inputq) - owner->inputq = l_ptr->inputq; - skb_queue_splice_init(&l_ptr->wakeupq, owner->inputq); + owner->inputq = l->inputq; + skb_queue_splice_init(&l->wakeupq, owner->inputq); if (!skb_queue_empty(owner->inputq)) owner->action_flags |= TIPC_MSG_EVT; - tipc_link_purge_backlog(l_ptr); - l_ptr->reasm_buf = NULL; - l_ptr->rcv_unacked = 0; - l_ptr->snd_nxt = 1; - l_ptr->rcv_nxt = 1; - l_ptr->silent_intv_cnt = 0; - l_ptr->stats.recv_info = 0; - l_ptr->stale_count = 0; - link_reset_statistics(l_ptr); + + tipc_link_purge_backlog(l); + kfree_skb(l->reasm_buf); + kfree_skb(l->failover_reasm_skb); + l->reasm_buf = NULL; + l->failover_reasm_skb = NULL; + l->rcv_unacked = 0; + l->snd_nxt = 1; + l->rcv_nxt = 1; + l->silent_intv_cnt = 0; + l->stats.recv_info = 0; + l->stale_count = 0; + link_reset_statistics(l); } /** @@ -751,20 +724,6 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list, return 0; } -static void skb2list(struct sk_buff *skb, struct sk_buff_head *list) -{ - skb_queue_head_init(list); - __skb_queue_tail(list, skb); -} - -static int __tipc_link_xmit_skb(struct tipc_link *link, struct sk_buff *skb) -{ - struct sk_buff_head head; - - skb2list(skb, &head); - return __tipc_link_xmit(link->owner->net, link, &head); -} - /* * tipc_link_sync_rcv - synchronize broadcast link endpoints. * Receive the sequence number where we should start receiving and @@ -955,32 +914,6 @@ static int tipc_link_retransm(struct tipc_link *l, int retransm, return 0; } -/* link_synch(): check if all packets arrived before the synch - * point have been consumed - * Returns true if the parallel links are synched, otherwise false - */ -static bool link_synch(struct tipc_link *l) -{ - unsigned int post_synch; - struct tipc_link *pl; - - pl = tipc_parallel_link(l); - if (pl == l) - goto synched; - - /* Was last pre-synch packet added to input queue ? */ - if (less_eq(pl->rcv_nxt, l->synch_point)) - return false; - - /* Is it still in the input queue ? */ - post_synch = mod(pl->rcv_nxt - l->synch_point) - 1; - if (skb_queue_len(pl->inputq) > post_synch) - return false; -synched: - l->exec_mode = TIPC_LINK_OPEN; - return true; -} - /* tipc_data_input - deliver data and name distr msgs to upper layer * * Consumes buffer if message is of right type @@ -1025,54 +958,59 @@ static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb) /* tipc_link_input - process packet that has passed link protocol check * * Consumes buffer - * Node lock must be held */ -static int tipc_link_input(struct tipc_link *link, struct sk_buff *skb) +static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb) { - struct tipc_node *node = link->owner; - struct tipc_msg *msg = buf_msg(skb); + struct tipc_node *node = l->owner; + struct tipc_msg *hdr = buf_msg(skb); + struct sk_buff **reasm_skb = &l->reasm_buf; struct sk_buff *iskb; - int pos = 0; + int usr = msg_user(hdr); int rc = 0; + int pos = 0; + int ipos = 0; - switch (msg_user(msg)) { - case TUNNEL_PROTOCOL: - if (msg_dup(msg)) { - link->exec_mode = TIPC_LINK_TUNNEL; - link->synch_point = msg_seqno(msg_get_wrapped(msg)); - kfree_skb(skb); - break; + if (unlikely(usr == TUNNEL_PROTOCOL)) { + if (msg_type(hdr) == SYNCH_MSG) { + __skb_queue_purge(&l->deferdq); + goto drop; } - rc |= tipc_link_failover_rcv(link, &skb); - if (!skb) - break; - if (msg_user(buf_msg(skb)) != MSG_BUNDLER) { - tipc_data_input(link, skb); - break; - } - case MSG_BUNDLER: - link->stats.recv_bundles++; - link->stats.recv_bundled += msg_msgcnt(msg); + if (!tipc_msg_extract(skb, &iskb, &ipos)) + return rc; + kfree_skb(skb); + skb = iskb; + hdr = buf_msg(skb); + if (less(msg_seqno(hdr), l->drop_point)) + goto drop; + if (tipc_data_input(l, skb)) + return rc; + usr = msg_user(hdr); + reasm_skb = &l->failover_reasm_skb; + } + if (usr == MSG_BUNDLER) { + l->stats.recv_bundles++; + l->stats.recv_bundled += msg_msgcnt(hdr); while (tipc_msg_extract(skb, &iskb, &pos)) - tipc_data_input(link, iskb); - break; - case MSG_FRAGMENTER: - link->stats.recv_fragments++; - if (tipc_buf_append(&link->reasm_buf, &skb)) { - link->stats.recv_fragmented++; - tipc_data_input(link, skb); - } else if (!link->reasm_buf) { - link->exec_mode = TIPC_LINK_BLOCKED; - rc |= TIPC_LINK_DOWN_EVT; + tipc_data_input(l, iskb); + return rc; + } else if (usr == MSG_FRAGMENTER) { + l->stats.recv_fragments++; + if (tipc_buf_append(reasm_skb, &skb)) { + l->stats.recv_fragmented++; + tipc_data_input(l, skb); + } else if (!*reasm_skb) { + l->exec_mode = TIPC_LINK_BLOCKED; + l->state = TIPC_LINK_RESETTING; + rc = TIPC_LINK_DOWN_EVT; } - break; - case BCAST_PROTOCOL: + return rc; + } else if (usr == BCAST_PROTOCOL) { tipc_link_sync_rcv(node, skb); - break; - default: - break; - }; + return rc; + } +drop: + kfree_skb(skb); return rc; } @@ -1100,7 +1038,6 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb, struct sk_buff_head *xmitq) { struct sk_buff_head *arrvq = &l->deferdq; - struct sk_buff *tmp; struct tipc_msg *hdr; u16 seqno, rcv_nxt; int rc = 0; @@ -1112,18 +1049,18 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb, return rc; } - skb_queue_walk_safe(arrvq, skb, tmp) { + while ((skb = skb_peek(arrvq))) { hdr = buf_msg(skb); /* Verify and update link state */ if (unlikely(msg_user(hdr) == LINK_PROTOCOL)) { __skb_dequeue(arrvq); - rc |= tipc_link_proto_rcv(l, skb, xmitq); + rc = tipc_link_proto_rcv(l, skb, xmitq); continue; } if (unlikely(!link_working(l))) { - rc |= tipc_link_fsm_evt(l, TRAFFIC_EVT, xmitq); + rc = tipc_link_fsm_evt(l, TRAFFIC_EVT, xmitq); if (!link_working(l)) { kfree_skb(__skb_dequeue(arrvq)); return rc; @@ -1156,18 +1093,11 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb, return rc; } - /* Synchronize with parallel link if applicable */ - if (unlikely(l->exec_mode == TIPC_LINK_TUNNEL)) - if (!msg_dup(hdr) && !link_synch(l)) { - kfree_skb(skb); - return rc; - } - /* Packet can be delivered */ l->rcv_nxt++; l->stats.recv_info++; if (unlikely(!tipc_data_input(l, skb))) - rc |= tipc_link_input(l, skb); + rc = tipc_link_input(l, skb); /* Ack at regular intervals */ if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN)) { @@ -1288,7 +1218,7 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe, } else { /* RESET_MSG or ACTIVATE_MSG */ msg_set_max_pkt(hdr, l->advertised_mtu); - msg_set_ack(hdr, l->failover_checkpt - 1); + msg_set_ack(hdr, l->rcv_nxt - 1); msg_set_next_sent(hdr, 1); } skb = tipc_buf_acquire(msg_size(hdr)); @@ -1296,223 +1226,75 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe, return; skb_copy_to_linear_data(skb, hdr, msg_size(hdr)); skb->priority = TC_PRIO_CONTROL; - __skb_queue_head(xmitq, skb); -} - -/* tipc_link_tunnel_xmit(): Tunnel one packet via a link belonging to - * a different bearer. Owner node is locked. - */ -static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr, - struct tipc_msg *tunnel_hdr, - struct tipc_msg *msg, - u32 selector) -{ - struct tipc_link *tunnel; - struct sk_buff *skb; - u32 length = msg_size(msg); - - tunnel = node_active_link(l_ptr->owner, selector & 1); - if (!tipc_link_is_up(tunnel)) { - pr_warn("%stunnel link no longer available\n", link_co_err); - return; - } - msg_set_size(tunnel_hdr, length + INT_H_SIZE); - skb = tipc_buf_acquire(length + INT_H_SIZE); - if (!skb) { - pr_warn("%sunable to send tunnel msg\n", link_co_err); - return; - } - skb_copy_to_linear_data(skb, tunnel_hdr, INT_H_SIZE); - skb_copy_to_linear_data_offset(skb, INT_H_SIZE, msg, length); - __tipc_link_xmit_skb(tunnel, skb); + __skb_queue_tail(xmitq, skb); } - -/* tipc_link_failover_send_queue(): A link has gone down, but a second - * link is still active. We can do failover. Tunnel the failing link's - * whole send queue via the remaining link. This way, we don't lose - * any packets, and sequence order is preserved for subsequent traffic - * sent over the remaining link. Owner node is locked. +/* tipc_link_tnl_prepare(): prepare and return a list of tunnel packets + * with contents of the link's tranmsit and backlog queues. */ -void tipc_link_failover_send_queue(struct tipc_link *l_ptr) +void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl, + int mtyp, struct sk_buff_head *xmitq) { - int msgcount; - struct tipc_link *tunnel = node_active_link(l_ptr->owner, 0); - struct tipc_msg tunnel_hdr; - struct sk_buff *skb; - int split_bundles; + struct sk_buff *skb, *tnlskb; + struct tipc_msg *hdr, tnlhdr; + struct sk_buff_head *queue = &l->transmq; + struct sk_buff_head tmpxq, tnlq; + u16 pktlen, pktcnt, seqno = l->snd_nxt; - if (!tunnel) + if (!tnl) return; - tipc_msg_init(link_own_addr(l_ptr), &tunnel_hdr, TUNNEL_PROTOCOL, - FAILOVER_MSG, INT_H_SIZE, l_ptr->addr); + skb_queue_head_init(&tnlq); + skb_queue_head_init(&tmpxq); - skb_queue_walk(&l_ptr->backlogq, skb) { - msg_set_seqno(buf_msg(skb), l_ptr->snd_nxt); - l_ptr->snd_nxt = mod(l_ptr->snd_nxt + 1); - } - skb_queue_splice_tail_init(&l_ptr->backlogq, &l_ptr->transmq); - tipc_link_purge_backlog(l_ptr); - msgcount = skb_queue_len(&l_ptr->transmq); - msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id); - msg_set_msgcnt(&tunnel_hdr, msgcount); - - if (skb_queue_empty(&l_ptr->transmq)) { - skb = tipc_buf_acquire(INT_H_SIZE); - if (skb) { - skb_copy_to_linear_data(skb, &tunnel_hdr, INT_H_SIZE); - msg_set_size(&tunnel_hdr, INT_H_SIZE); - __tipc_link_xmit_skb(tunnel, skb); - } else { - pr_warn("%sunable to send changeover msg\n", - link_co_err); - } + /* At least one packet required for safe algorithm => add dummy */ + skb = tipc_msg_create(TIPC_LOW_IMPORTANCE, TIPC_DIRECT_MSG, + BASIC_H_SIZE, 0, l->addr, link_own_addr(l), + 0, 0, TIPC_ERR_NO_PORT); + if (!skb) { + pr_warn("%sunable to create tunnel packet\n", link_co_err); return; } - - split_bundles = (node_active_link(l_ptr->owner, 0) != - node_active_link(l_ptr->owner, 0)); - - skb_queue_walk(&l_ptr->transmq, skb) { - struct tipc_msg *msg = buf_msg(skb); - - if ((msg_user(msg) == MSG_BUNDLER) && split_bundles) { - struct tipc_msg *m = msg_get_wrapped(msg); - unchar *pos = (unchar *)m; - - msgcount = msg_msgcnt(msg); - while (msgcount--) { - msg_set_seqno(m, msg_seqno(msg)); - tipc_link_tunnel_xmit(l_ptr, &tunnel_hdr, m, - msg_link_selector(m)); - pos += align(msg_size(m)); - m = (struct tipc_msg *)pos; - } - } else { - tipc_link_tunnel_xmit(l_ptr, &tunnel_hdr, msg, - msg_link_selector(msg)); - } - } -} - -/* tipc_link_dup_queue_xmit(): A second link has become active. Tunnel a - * duplicate of the first link's send queue via the new link. This way, we - * are guaranteed that currently queued packets from a socket are delivered - * before future traffic from the same socket, even if this is using the - * new link. The last arriving copy of each duplicate packet is dropped at - * the receiving end by the regular protocol check, so packet cardinality - * and sequence order is preserved per sender/receiver socket pair. - * Owner node is locked. - */ -void tipc_link_dup_queue_xmit(struct tipc_link *link, - struct tipc_link *tnl) -{ - struct sk_buff *skb; - struct tipc_msg tnl_hdr; - struct sk_buff_head *queue = &link->transmq; - int mcnt; - u16 seqno; - - tipc_msg_init(link_own_addr(link), &tnl_hdr, TUNNEL_PROTOCOL, - SYNCH_MSG, INT_H_SIZE, link->addr); - mcnt = skb_queue_len(&link->transmq) + skb_queue_len(&link->backlogq); - msg_set_msgcnt(&tnl_hdr, mcnt); - msg_set_bearer_id(&tnl_hdr, link->peer_bearer_id); - -tunnel_queue: + skb_queue_tail(&tnlq, skb); + tipc_link_xmit(l, &tnlq, &tmpxq); + __skb_queue_purge(&tmpxq); + + /* Initialize reusable tunnel packet header */ + tipc_msg_init(link_own_addr(l), &tnlhdr, TUNNEL_PROTOCOL, + mtyp, INT_H_SIZE, l->addr); + pktcnt = skb_queue_len(&l->transmq) + skb_queue_len(&l->backlogq); + msg_set_msgcnt(&tnlhdr, pktcnt); + msg_set_bearer_id(&tnlhdr, l->peer_bearer_id); +tnl: + /* Wrap each packet into a tunnel packet */ skb_queue_walk(queue, skb) { - struct sk_buff *outskb; - struct tipc_msg *msg = buf_msg(skb); - u32 len = msg_size(msg); - - msg_set_ack(msg, mod(link->rcv_nxt - 1)); - msg_set_bcast_ack(msg, link->owner->bclink.last_in); - msg_set_size(&tnl_hdr, len + INT_H_SIZE); - outskb = tipc_buf_acquire(len + INT_H_SIZE); - if (outskb == NULL) { - pr_warn("%sunable to send duplicate msg\n", - link_co_err); + hdr = buf_msg(skb); + if (queue == &l->backlogq) + msg_set_seqno(hdr, seqno++); + pktlen = msg_size(hdr); + msg_set_size(&tnlhdr, pktlen + INT_H_SIZE); + tnlskb = tipc_buf_acquire(pktlen + INT_H_SIZE); + if (!tnlskb) { + pr_warn("%sunable to send packet\n", link_co_err); return; } - skb_copy_to_linear_data(outskb, &tnl_hdr, INT_H_SIZE); - skb_copy_to_linear_data_offset(outskb, INT_H_SIZE, - skb->data, len); - __tipc_link_xmit_skb(tnl, outskb); - if (!tipc_link_is_up(link)) - return; + skb_copy_to_linear_data(tnlskb, &tnlhdr, INT_H_SIZE); + skb_copy_to_linear_data_offset(tnlskb, INT_H_SIZE, hdr, pktlen); + __skb_queue_tail(&tnlq, tnlskb); } - if (queue == &link->backlogq) - return; - seqno = link->snd_nxt; - skb_queue_walk(&link->backlogq, skb) { - msg_set_seqno(buf_msg(skb), seqno); - seqno = mod(seqno + 1); - } - queue = &link->backlogq; - goto tunnel_queue; -} - -/* tipc_link_failover_rcv(): Receive a tunnelled FAILOVER_MSG packet - * Owner node is locked. - */ -static int tipc_link_failover_rcv(struct tipc_link *link, - struct sk_buff **skb) -{ - struct tipc_msg *msg = buf_msg(*skb); - struct sk_buff *iskb = NULL; - struct tipc_link *pl = NULL; - int bearer_id = msg_bearer_id(msg); - int pos = 0; - int rc = 0; - - if (msg_type(msg) != FAILOVER_MSG) { - pr_warn("%sunknown tunnel pkt received\n", link_co_err); - goto exit; + if (queue != &l->backlogq) { + queue = &l->backlogq; + goto tnl; } - if (bearer_id >= MAX_BEARERS) - goto exit; - - if (bearer_id == link->bearer_id) - goto exit; - - pl = link->owner->links[bearer_id].link; - - if (link->failover_pkts == FIRST_FAILOVER) - link->failover_pkts = msg_msgcnt(msg); - - /* Should we expect an inner packet? */ - if (!link->failover_pkts) - goto exit; - if (!tipc_msg_extract(*skb, &iskb, &pos)) { - pr_warn("%sno inner failover pkt\n", link_co_err); - *skb = NULL; - goto exit; - } - link->failover_pkts--; - *skb = NULL; + tipc_link_xmit(tnl, &tnlq, xmitq); - /* Was this packet already delivered? */ - if (less(buf_seqno(iskb), link->failover_checkpt)) { - kfree_skb(iskb); - iskb = NULL; - goto exit; - } - if (msg_user(buf_msg(iskb)) == MSG_FRAGMENTER) { - link->stats.recv_fragments++; - if (!tipc_buf_append(&link->failover_skb, &iskb) && - !link->failover_skb) { - link->exec_mode = TIPC_LINK_BLOCKED; - rc |= TIPC_LINK_DOWN_EVT; - } + if (mtyp == FAILOVER_MSG) { + tnl->drop_point = l->rcv_nxt; + tnl->failover_reasm_skb = l->reasm_buf; + l->reasm_buf = NULL; + l->exec_mode = TIPC_LINK_BLOCKED; } -exit: - if (!link->failover_pkts && pl) - pl->exec_mode = TIPC_LINK_OPEN; - kfree_skb(*skb); - *skb = iskb; - return rc; } /* tipc_link_proto_rcv(): receive link level protocol message : @@ -1593,7 +1375,7 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb, /* If NACK, retransmit will now start at right position */ if (nacked_gap) { - rc |= tipc_link_retransm(l, nacked_gap, xmitq); + rc = tipc_link_retransm(l, nacked_gap, xmitq); l->stats.recv_nacks++; } tipc_link_advance_backlog(l, xmitq); |