diff options
Diffstat (limited to 'net/rxrpc/call.c')
-rw-r--r-- | net/rxrpc/call.c | 2278 |
1 files changed, 2278 insertions, 0 deletions
diff --git a/net/rxrpc/call.c b/net/rxrpc/call.c new file mode 100644 index 000000000000..5cfd4cadee42 --- /dev/null +++ b/net/rxrpc/call.c @@ -0,0 +1,2278 @@ +/* call.c: Rx call routines + * + * Copyright (C) 2002 Red Hat, Inc. All Rights Reserved. + * Written by David Howells (dhowells@redhat.com) + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version + * 2 of the License, or (at your option) any later version. + */ + +#include <linux/sched.h> +#include <linux/slab.h> +#include <linux/module.h> +#include <rxrpc/rxrpc.h> +#include <rxrpc/transport.h> +#include <rxrpc/peer.h> +#include <rxrpc/connection.h> +#include <rxrpc/call.h> +#include <rxrpc/message.h> +#include "internal.h" + +__RXACCT_DECL(atomic_t rxrpc_call_count); +__RXACCT_DECL(atomic_t rxrpc_message_count); + +LIST_HEAD(rxrpc_calls); +DECLARE_RWSEM(rxrpc_calls_sem); + +unsigned rxrpc_call_rcv_timeout = HZ/3; +static unsigned rxrpc_call_acks_timeout = HZ/3; +static unsigned rxrpc_call_dfr_ack_timeout = HZ/20; +static unsigned short rxrpc_call_max_resend = HZ/10; + +const char *rxrpc_call_states[] = { + "COMPLETE", + "ERROR", + "SRVR_RCV_OPID", + "SRVR_RCV_ARGS", + "SRVR_GOT_ARGS", + "SRVR_SND_REPLY", + "SRVR_RCV_FINAL_ACK", + "CLNT_SND_ARGS", + "CLNT_RCV_REPLY", + "CLNT_GOT_REPLY" +}; + +const char *rxrpc_call_error_states[] = { + "NO_ERROR", + "LOCAL_ABORT", + "PEER_ABORT", + "LOCAL_ERROR", + "REMOTE_ERROR" +}; + +const char *rxrpc_pkts[] = { + "?00", + "data", "ack", "busy", "abort", "ackall", "chall", "resp", "debug", + "?09", "?10", "?11", "?12", "?13", "?14", "?15" +}; + +static const char *rxrpc_acks[] = { + "---", "REQ", "DUP", "SEQ", "WIN", "MEM", "PNG", "PNR", "DLY", "IDL", + "-?-" +}; + +static const char _acktype[] = "NA-"; + +static void rxrpc_call_receive_packet(struct rxrpc_call *call); +static void rxrpc_call_receive_data_packet(struct rxrpc_call *call, + struct rxrpc_message *msg); +static void rxrpc_call_receive_ack_packet(struct rxrpc_call *call, + struct rxrpc_message *msg); +static void rxrpc_call_definitively_ACK(struct rxrpc_call *call, + rxrpc_seq_t higest); +static void rxrpc_call_resend(struct rxrpc_call *call, rxrpc_seq_t highest); +static int __rxrpc_call_read_data(struct rxrpc_call *call); + +static int rxrpc_call_record_ACK(struct rxrpc_call *call, + struct rxrpc_message *msg, + rxrpc_seq_t seq, + size_t count); + +static int rxrpc_call_flush(struct rxrpc_call *call); + +#define _state(call) \ + _debug("[[[ state %s ]]]", rxrpc_call_states[call->app_call_state]); + +static void rxrpc_call_default_attn_func(struct rxrpc_call *call) +{ + wake_up(&call->waitq); +} + +static void rxrpc_call_default_error_func(struct rxrpc_call *call) +{ + wake_up(&call->waitq); +} + +static void rxrpc_call_default_aemap_func(struct rxrpc_call *call) +{ + switch (call->app_err_state) { + case RXRPC_ESTATE_LOCAL_ABORT: + call->app_abort_code = -call->app_errno; + case RXRPC_ESTATE_PEER_ABORT: + call->app_errno = -ECONNABORTED; + default: + break; + } +} + +static void __rxrpc_call_acks_timeout(unsigned long _call) +{ + struct rxrpc_call *call = (struct rxrpc_call *) _call; + + _debug("ACKS TIMEOUT %05lu", jiffies - call->cjif); + + call->flags |= RXRPC_CALL_ACKS_TIMO; + rxrpc_krxiod_queue_call(call); +} + +static void __rxrpc_call_rcv_timeout(unsigned long _call) +{ + struct rxrpc_call *call = (struct rxrpc_call *) _call; + + _debug("RCV TIMEOUT %05lu", jiffies - call->cjif); + + call->flags |= RXRPC_CALL_RCV_TIMO; + rxrpc_krxiod_queue_call(call); +} + +static void __rxrpc_call_ackr_timeout(unsigned long _call) +{ + struct rxrpc_call *call = (struct rxrpc_call *) _call; + + _debug("ACKR TIMEOUT %05lu",jiffies - call->cjif); + + call->flags |= RXRPC_CALL_ACKR_TIMO; + rxrpc_krxiod_queue_call(call); +} + +/*****************************************************************************/ +/* + * calculate a timeout based on an RTT value + */ +static inline unsigned long __rxrpc_rtt_based_timeout(struct rxrpc_call *call, + unsigned long val) +{ + unsigned long expiry = call->conn->peer->rtt / (1000000 / HZ); + + expiry += 10; + if (expiry < HZ / 25) + expiry = HZ / 25; + if (expiry > HZ) + expiry = HZ; + + _leave(" = %lu jiffies", expiry); + return jiffies + expiry; +} /* end __rxrpc_rtt_based_timeout() */ + +/*****************************************************************************/ +/* + * create a new call record + */ +static inline int __rxrpc_create_call(struct rxrpc_connection *conn, + struct rxrpc_call **_call) +{ + struct rxrpc_call *call; + + _enter("%p", conn); + + /* allocate and initialise a call record */ + call = (struct rxrpc_call *) get_zeroed_page(GFP_KERNEL); + if (!call) { + _leave(" ENOMEM"); + return -ENOMEM; + } + + atomic_set(&call->usage, 1); + + init_waitqueue_head(&call->waitq); + spin_lock_init(&call->lock); + INIT_LIST_HEAD(&call->link); + INIT_LIST_HEAD(&call->acks_pendq); + INIT_LIST_HEAD(&call->rcv_receiveq); + INIT_LIST_HEAD(&call->rcv_krxiodq_lk); + INIT_LIST_HEAD(&call->app_readyq); + INIT_LIST_HEAD(&call->app_unreadyq); + INIT_LIST_HEAD(&call->app_link); + INIT_LIST_HEAD(&call->app_attn_link); + + init_timer(&call->acks_timeout); + call->acks_timeout.data = (unsigned long) call; + call->acks_timeout.function = __rxrpc_call_acks_timeout; + + init_timer(&call->rcv_timeout); + call->rcv_timeout.data = (unsigned long) call; + call->rcv_timeout.function = __rxrpc_call_rcv_timeout; + + init_timer(&call->ackr_dfr_timo); + call->ackr_dfr_timo.data = (unsigned long) call; + call->ackr_dfr_timo.function = __rxrpc_call_ackr_timeout; + + call->conn = conn; + call->ackr_win_bot = 1; + call->ackr_win_top = call->ackr_win_bot + RXRPC_CALL_ACK_WINDOW_SIZE - 1; + call->ackr_prev_seq = 0; + call->app_mark = RXRPC_APP_MARK_EOF; + call->app_attn_func = rxrpc_call_default_attn_func; + call->app_error_func = rxrpc_call_default_error_func; + call->app_aemap_func = rxrpc_call_default_aemap_func; + call->app_scr_alloc = call->app_scratch; + + call->cjif = jiffies; + + _leave(" = 0 (%p)", call); + + *_call = call; + + return 0; +} /* end __rxrpc_create_call() */ + +/*****************************************************************************/ +/* + * create a new call record for outgoing calls + */ +int rxrpc_create_call(struct rxrpc_connection *conn, + rxrpc_call_attn_func_t attn, + rxrpc_call_error_func_t error, + rxrpc_call_aemap_func_t aemap, + struct rxrpc_call **_call) +{ + DECLARE_WAITQUEUE(myself, current); + + struct rxrpc_call *call; + int ret, cix, loop; + + _enter("%p", conn); + + /* allocate and initialise a call record */ + ret = __rxrpc_create_call(conn, &call); + if (ret < 0) { + _leave(" = %d", ret); + return ret; + } + + call->app_call_state = RXRPC_CSTATE_CLNT_SND_ARGS; + if (attn) + call->app_attn_func = attn; + if (error) + call->app_error_func = error; + if (aemap) + call->app_aemap_func = aemap; + + _state(call); + + spin_lock(&conn->lock); + set_current_state(TASK_INTERRUPTIBLE); + add_wait_queue(&conn->chanwait, &myself); + + try_again: + /* try to find an unused channel */ + for (cix = 0; cix < 4; cix++) + if (!conn->channels[cix]) + goto obtained_chan; + + /* no free channels - wait for one to become available */ + ret = -EINTR; + if (signal_pending(current)) + goto error_unwait; + + spin_unlock(&conn->lock); + + schedule(); + set_current_state(TASK_INTERRUPTIBLE); + + spin_lock(&conn->lock); + goto try_again; + + /* got a channel - now attach to the connection */ + obtained_chan: + remove_wait_queue(&conn->chanwait, &myself); + set_current_state(TASK_RUNNING); + + /* concoct a unique call number */ + next_callid: + call->call_id = htonl(++conn->call_counter); + for (loop = 0; loop < 4; loop++) + if (conn->channels[loop] && + conn->channels[loop]->call_id == call->call_id) + goto next_callid; + + rxrpc_get_connection(conn); + conn->channels[cix] = call; /* assign _after_ done callid check loop */ + do_gettimeofday(&conn->atime); + call->chan_ix = htonl(cix); + + spin_unlock(&conn->lock); + + down_write(&rxrpc_calls_sem); + list_add_tail(&call->call_link, &rxrpc_calls); + up_write(&rxrpc_calls_sem); + + __RXACCT(atomic_inc(&rxrpc_call_count)); + *_call = call; + + _leave(" = 0 (call=%p cix=%u)", call, cix); + return 0; + + error_unwait: + remove_wait_queue(&conn->chanwait, &myself); + set_current_state(TASK_RUNNING); + spin_unlock(&conn->lock); + + free_page((unsigned long) call); + _leave(" = %d", ret); + return ret; +} /* end rxrpc_create_call() */ + +/*****************************************************************************/ +/* + * create a new call record for incoming calls + */ +int rxrpc_incoming_call(struct rxrpc_connection *conn, + struct rxrpc_message *msg, + struct rxrpc_call **_call) +{ + struct rxrpc_call *call; + unsigned cix; + int ret; + + cix = ntohl(msg->hdr.cid) & RXRPC_CHANNELMASK; + + _enter("%p,%u,%u", conn, ntohl(msg->hdr.callNumber), cix); + + /* allocate and initialise a call record */ + ret = __rxrpc_create_call(conn, &call); + if (ret < 0) { + _leave(" = %d", ret); + return ret; + } + + call->pkt_rcv_count = 1; + call->app_call_state = RXRPC_CSTATE_SRVR_RCV_OPID; + call->app_mark = sizeof(uint32_t); + + _state(call); + + /* attach to the connection */ + ret = -EBUSY; + call->chan_ix = htonl(cix); + call->call_id = msg->hdr.callNumber; + + spin_lock(&conn->lock); + + if (!conn->channels[cix] || + conn->channels[cix]->app_call_state == RXRPC_CSTATE_COMPLETE || + conn->channels[cix]->app_call_state == RXRPC_CSTATE_ERROR + ) { + conn->channels[cix] = call; + rxrpc_get_connection(conn); + ret = 0; + } + + spin_unlock(&conn->lock); + + if (ret < 0) { + free_page((unsigned long) call); + call = NULL; + } + + if (ret == 0) { + down_write(&rxrpc_calls_sem); + list_add_tail(&call->call_link, &rxrpc_calls); + up_write(&rxrpc_calls_sem); + __RXACCT(atomic_inc(&rxrpc_call_count)); + *_call = call; + } + + _leave(" = %d [%p]", ret, call); + return ret; +} /* end rxrpc_incoming_call() */ + +/*****************************************************************************/ +/* + * free a call record + */ +void rxrpc_put_call(struct rxrpc_call *call) +{ + struct rxrpc_connection *conn = call->conn; + struct rxrpc_message *msg; + + _enter("%p{u=%d}",call,atomic_read(&call->usage)); + + /* sanity check */ + if (atomic_read(&call->usage) <= 0) + BUG(); + + /* to prevent a race, the decrement and the de-list must be effectively + * atomic */ + spin_lock(&conn->lock); + if (likely(!atomic_dec_and_test(&call->usage))) { + spin_unlock(&conn->lock); + _leave(""); + return; + } + + if (conn->channels[ntohl(call->chan_ix)] == call) + conn->channels[ntohl(call->chan_ix)] = NULL; + + spin_unlock(&conn->lock); + + wake_up(&conn->chanwait); + + rxrpc_put_connection(conn); + + /* clear the timers and dequeue from krxiod */ + del_timer_sync(&call->acks_timeout); + del_timer_sync(&call->rcv_timeout); + del_timer_sync(&call->ackr_dfr_timo); + + rxrpc_krxiod_dequeue_call(call); + + /* clean up the contents of the struct */ + if (call->snd_nextmsg) + rxrpc_put_message(call->snd_nextmsg); + + if (call->snd_ping) + rxrpc_put_message(call->snd_ping); + + while (!list_empty(&call->acks_pendq)) { + msg = list_entry(call->acks_pendq.next, + struct rxrpc_message, link); + list_del(&msg->link); + rxrpc_put_message(msg); + } + + while (!list_empty(&call->rcv_receiveq)) { + msg = list_entry(call->rcv_receiveq.next, + struct rxrpc_message, link); + list_del(&msg->link); + rxrpc_put_message(msg); + } + + while (!list_empty(&call->app_readyq)) { + msg = list_entry(call->app_readyq.next, + struct rxrpc_message, link); + list_del(&msg->link); + rxrpc_put_message(msg); + } + + while (!list_empty(&call->app_unreadyq)) { + msg = list_entry(call->app_unreadyq.next, + struct rxrpc_message, link); + list_del(&msg->link); + rxrpc_put_message(msg); + } + + module_put(call->owner); + + down_write(&rxrpc_calls_sem); + list_del(&call->call_link); + up_write(&rxrpc_calls_sem); + + __RXACCT(atomic_dec(&rxrpc_call_count)); + free_page((unsigned long) call); + + _leave(" [destroyed]"); +} /* end rxrpc_put_call() */ + +/*****************************************************************************/ +/* + * actually generate a normal ACK + */ +static inline int __rxrpc_call_gen_normal_ACK(struct rxrpc_call *call, + rxrpc_seq_t seq) +{ + struct rxrpc_message *msg; + struct kvec diov[3]; + __be32 aux[4]; + int delta, ret; + + /* ACKs default to DELAY */ + if (!call->ackr.reason) + call->ackr.reason = RXRPC_ACK_DELAY; + + _proto("Rx %05lu Sending ACK { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }", + jiffies - call->cjif, + ntohs(call->ackr.maxSkew), + ntohl(call->ackr.firstPacket), + ntohl(call->ackr.previousPacket), + ntohl(call->ackr.serial), + rxrpc_acks[call->ackr.reason], + call->ackr.nAcks); + + aux[0] = htonl(call->conn->peer->if_mtu); /* interface MTU */ + aux[1] = htonl(1444); /* max MTU */ + aux[2] = htonl(16); /* rwind */ + aux[3] = htonl(4); /* max packets */ + + diov[0].iov_len = sizeof(struct rxrpc_ackpacket); + diov[0].iov_base = &call->ackr; + diov[1].iov_len = call->ackr_pend_cnt + 3; + diov[1].iov_base = call->ackr_array; + diov[2].iov_len = sizeof(aux); + diov[2].iov_base = &aux; + + /* build and send the message */ + ret = rxrpc_conn_newmsg(call->conn,call, RXRPC_PACKET_TYPE_ACK, + 3, diov, GFP_KERNEL, &msg); + if (ret < 0) + goto out; + + msg->seq = seq; + msg->hdr.seq = htonl(seq); + msg->hdr.flags |= RXRPC_SLOW_START_OK; + + ret = rxrpc_conn_sendmsg(call->conn, msg); + rxrpc_put_message(msg); + if (ret < 0) + goto out; + call->pkt_snd_count++; + + /* count how many actual ACKs there were at the front */ + for (delta = 0; delta < call->ackr_pend_cnt; delta++) + if (call->ackr_array[delta] != RXRPC_ACK_TYPE_ACK) + break; + + call->ackr_pend_cnt -= delta; /* all ACK'd to this point */ + + /* crank the ACK window around */ + if (delta == 0) { + /* un-ACK'd window */ + } + else if (delta < RXRPC_CALL_ACK_WINDOW_SIZE) { + /* partially ACK'd window + * - shuffle down to avoid losing out-of-sequence packets + */ + call->ackr_win_bot += delta; + call->ackr_win_top += delta; + + memmove(&call->ackr_array[0], + &call->ackr_array[delta], + call->ackr_pend_cnt); + + memset(&call->ackr_array[call->ackr_pend_cnt], + RXRPC_ACK_TYPE_NACK, + sizeof(call->ackr_array) - call->ackr_pend_cnt); + } + else { + /* fully ACK'd window + * - just clear the whole thing + */ + memset(&call->ackr_array, + RXRPC_ACK_TYPE_NACK, + sizeof(call->ackr_array)); + } + + /* clear this ACK */ + memset(&call->ackr, 0, sizeof(call->ackr)); + + out: + if (!call->app_call_state) + printk("___ STATE 0 ___\n"); + return ret; +} /* end __rxrpc_call_gen_normal_ACK() */ + +/*****************************************************************************/ +/* + * note the reception of a packet in the call's ACK records and generate an + * appropriate ACK packet if necessary + * - returns 0 if packet should be processed, 1 if packet should be ignored + * and -ve on an error + */ +static int rxrpc_call_generate_ACK(struct rxrpc_call *call, + struct rxrpc_header *hdr, + struct rxrpc_ackpacket *ack) +{ + struct rxrpc_message *msg; + rxrpc_seq_t seq; + unsigned offset; + int ret = 0, err; + u8 special_ACK, do_ACK, force; + + _enter("%p,%p { seq=%d tp=%d fl=%02x }", + call, hdr, ntohl(hdr->seq), hdr->type, hdr->flags); + + seq = ntohl(hdr->seq); + offset = seq - call->ackr_win_bot; + do_ACK = RXRPC_ACK_DELAY; + special_ACK = 0; + force = (seq == 1); + + if (call->ackr_high_seq < seq) + call->ackr_high_seq = seq; + + /* deal with generation of obvious special ACKs first */ + if (ack && ack->reason == RXRPC_ACK_PING) { + special_ACK = RXRPC_ACK_PING_RESPONSE; + ret = 1; + goto gen_ACK; + } + + if (seq < call->ackr_win_bot) { + special_ACK = RXRPC_ACK_DUPLICATE; + ret = 1; + goto gen_ACK; + } + + if (seq >= call->ackr_win_top) { + special_ACK = RXRPC_ACK_EXCEEDS_WINDOW; + ret = 1; + goto gen_ACK; + } + + if (call->ackr_array[offset] != RXRPC_ACK_TYPE_NACK) { + special_ACK = RXRPC_ACK_DUPLICATE; + ret = 1; + goto gen_ACK; + } + + /* okay... it's a normal data packet inside the ACK window */ + call->ackr_array[offset] = RXRPC_ACK_TYPE_ACK; + + if (offset < call->ackr_pend_cnt) { + } + else if (offset > call->ackr_pend_cnt) { + do_ACK = RXRPC_ACK_OUT_OF_SEQUENCE; + call->ackr_pend_cnt = offset; + goto gen_ACK; + } + + if (hdr->flags & RXRPC_REQUEST_ACK) { + do_ACK = RXRPC_ACK_REQUESTED; + } + + /* generate an ACK on the final packet of a reply just received */ + if (hdr->flags & RXRPC_LAST_PACKET) { + if (call->conn->out_clientflag) + force = 1; + } + else if (!(hdr->flags & RXRPC_MORE_PACKETS)) { + do_ACK = RXRPC_ACK_REQUESTED; + } + + /* re-ACK packets previously received out-of-order */ + for (offset++; offset < RXRPC_CALL_ACK_WINDOW_SIZE; offset++) + if (call->ackr_array[offset] != RXRPC_ACK_TYPE_ACK) + break; + + call->ackr_pend_cnt = offset; + + /* generate an ACK if we fill up the window */ + if (call->ackr_pend_cnt >= RXRPC_CALL_ACK_WINDOW_SIZE) + force = 1; + + gen_ACK: + _debug("%05lu ACKs pend=%u norm=%s special=%s%s", + jiffies - call->cjif, + call->ackr_pend_cnt, + rxrpc_acks[do_ACK], + rxrpc_acks[special_ACK], + force ? " immediate" : + do_ACK == RXRPC_ACK_REQUESTED ? " merge-req" : + hdr->flags & RXRPC_LAST_PACKET ? " finalise" : + " defer" + ); + + /* send any pending normal ACKs if need be */ + if (call->ackr_pend_cnt > 0) { + /* fill out the appropriate form */ + call->ackr.bufferSpace = htons(RXRPC_CALL_ACK_WINDOW_SIZE); + call->ackr.maxSkew = htons(min(call->ackr_high_seq - seq, + 65535U)); + call->ackr.firstPacket = htonl(call->ackr_win_bot); + call->ackr.previousPacket = call->ackr_prev_seq; + call->ackr.serial = hdr->serial; + call->ackr.nAcks = call->ackr_pend_cnt; + + if (do_ACK == RXRPC_ACK_REQUESTED) + call->ackr.reason = do_ACK; + + /* generate the ACK immediately if necessary */ + if (special_ACK || force) { + err = __rxrpc_call_gen_normal_ACK( + call, do_ACK == RXRPC_ACK_DELAY ? 0 : seq); + if (err < 0) { + ret = err; + goto out; + } + } + } + + if (call->ackr.reason == RXRPC_ACK_REQUESTED) + call->ackr_dfr_seq = seq; + + /* start the ACK timer if not running if there are any pending deferred + * ACKs */ + if (call->ackr_pend_cnt > 0 && + call->ackr.reason != RXRPC_ACK_REQUESTED && + !timer_pending(&call->ackr_dfr_timo) + ) { + unsigned long timo; + + timo = rxrpc_call_dfr_ack_timeout + jiffies; + + _debug("START ACKR TIMER for cj=%lu", timo - call->cjif); + + spin_lock(&call->lock); + mod_timer(&call->ackr_dfr_timo, timo); + spin_unlock(&call->lock); + } + else if ((call->ackr_pend_cnt == 0 || + call->ackr.reason == RXRPC_ACK_REQUESTED) && + timer_pending(&call->ackr_dfr_timo) + ) { + /* stop timer if no pending ACKs */ + _debug("CLEAR ACKR TIMER"); + del_timer_sync(&call->ackr_dfr_timo); + } + + /* send a special ACK if one is required */ + if (special_ACK) { + struct rxrpc_ackpacket ack; + struct kvec diov[2]; + uint8_t acks[1] = { RXRPC_ACK_TYPE_ACK }; + + /* fill out the appropriate form */ + ack.bufferSpace = htons(RXRPC_CALL_ACK_WINDOW_SIZE); + ack.maxSkew = htons(min(call->ackr_high_seq - seq, + 65535U)); + ack.firstPacket = htonl(call->ackr_win_bot); + ack.previousPacket = call->ackr_prev_seq; + ack.serial = hdr->serial; + ack.reason = special_ACK; + ack.nAcks = 0; + + _proto("Rx Sending s-ACK" + " { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }", + ntohs(ack.maxSkew), + ntohl(ack.firstPacket), + ntohl(ack.previousPacket), + ntohl(ack.serial), + rxrpc_acks[ack.reason], + ack.nAcks); + + diov[0].iov_len = sizeof(struct rxrpc_ackpacket); + diov[0].iov_base = &ack; + diov[1].iov_len = sizeof(acks); + diov[1].iov_base = acks; + + /* build and send the message */ + err = rxrpc_conn_newmsg(call->conn,call, RXRPC_PACKET_TYPE_ACK, + hdr->seq ? 2 : 1, diov, + GFP_KERNEL, + &msg); + if (err < 0) { + ret = err; + goto out; + } + + msg->seq = seq; + msg->hdr.seq = htonl(seq); + msg->hdr.flags |= RXRPC_SLOW_START_OK; + + err = rxrpc_conn_sendmsg(call->conn, msg); + rxrpc_put_message(msg); + if (err < 0) { + ret = err; + goto out; + } + call->pkt_snd_count++; + } + + out: + if (hdr->seq) + call->ackr_prev_seq = hdr->seq; + + _leave(" = %d", ret); + return ret; +} /* end rxrpc_call_generate_ACK() */ + +/*****************************************************************************/ +/* + * handle work to be done on a call + * - includes packet reception and timeout processing + */ +void rxrpc_call_do_stuff(struct rxrpc_call *call) +{ + _enter("%p{flags=%lx}", call, call->flags); + + /* handle packet reception */ + if (call->flags & RXRPC_CALL_RCV_PKT) { + _debug("- receive packet"); + call->flags &= ~RXRPC_CALL_RCV_PKT; + rxrpc_call_receive_packet(call); + } + + /* handle overdue ACKs */ + if (call->flags & RXRPC_CALL_ACKS_TIMO) { + _debug("- overdue ACK timeout"); + call->flags &= ~RXRPC_CALL_ACKS_TIMO; + rxrpc_call_resend(call, call->snd_seq_count); + } + + /* handle lack of reception */ + if (call->flags & RXRPC_CALL_RCV_TIMO) { + _debug("- reception timeout"); + call->flags &= ~RXRPC_CALL_RCV_TIMO; + rxrpc_call_abort(call, -EIO); + } + + /* handle deferred ACKs */ + if (call->flags & RXRPC_CALL_ACKR_TIMO || + (call->ackr.nAcks > 0 && call->ackr.reason == RXRPC_ACK_REQUESTED) + ) { + _debug("- deferred ACK timeout: cj=%05lu r=%s n=%u", + jiffies - call->cjif, + rxrpc_acks[call->ackr.reason], + call->ackr.nAcks); + + call->flags &= ~RXRPC_CALL_ACKR_TIMO; + + if (call->ackr.nAcks > 0 && + call->app_call_state != RXRPC_CSTATE_ERROR) { + /* generate ACK */ + __rxrpc_call_gen_normal_ACK(call, call->ackr_dfr_seq); + call->ackr_dfr_seq = 0; + } + } + + _leave(""); + +} /* end rxrpc_call_do_stuff() */ + +/*****************************************************************************/ +/* + * send an abort message at call or connection level + * - must be called with call->lock held + * - the supplied error code is sent as the packet data + */ +static int __rxrpc_call_abort(struct rxrpc_call *call, int errno) +{ + struct rxrpc_connection *conn = call->conn; + struct rxrpc_message *msg; + struct kvec diov[1]; + int ret; + __be32 _error; + + _enter("%p{%08x},%p{%d},%d", + conn, ntohl(conn->conn_id), call, ntohl(call->call_id), errno); + + /* if this call is already aborted, then just wake up any waiters */ + if (call->app_call_state == RXRPC_CSTATE_ERROR) { + spin_unlock(&call->lock); + call->app_error_func(call); + _leave(" = 0"); + return 0; + } + + rxrpc_get_call(call); + + /* change the state _with_ the lock still held */ + call->app_call_state = RXRPC_CSTATE_ERROR; + call->app_err_state = RXRPC_ESTATE_LOCAL_ABORT; + call->app_errno = errno; + call->app_mark = RXRPC_APP_MARK_EOF; + call->app_read_buf = NULL; + call->app_async_read = 0; + + _state(call); + + /* ask the app to translate the error code */ + call->app_aemap_func(call); + + spin_unlock(&call->lock); + + /* flush any outstanding ACKs */ + del_timer_sync(&call->acks_timeout); + del_timer_sync(&call->rcv_timeout); + del_timer_sync(&call->ackr_dfr_timo); + + if (rxrpc_call_is_ack_pending(call)) + __rxrpc_call_gen_normal_ACK(call, 0); + + /* send the abort packet only if we actually traded some other + * packets */ + ret = 0; + if (call->pkt_snd_count || call->pkt_rcv_count) { + /* actually send the abort */ + _proto("Rx Sending Call ABORT { data=%d }", + call->app_abort_code); + + _error = htonl(call->app_abort_code); + + diov[0].iov_len = sizeof(_error); + diov[0].iov_base = &_error; + + ret = rxrpc_conn_newmsg(conn, call, RXRPC_PACKET_TYPE_ABORT, + 1, diov, GFP_KERNEL, &msg); + if (ret == 0) { + ret = rxrpc_conn_sendmsg(conn, msg); + rxrpc_put_message(msg); + } + } + + /* tell the app layer to let go */ + call->app_error_func(call); + + rxrpc_put_call(call); + + _leave(" = %d", ret); + return ret; +} /* end __rxrpc_call_abort() */ + +/*****************************************************************************/ +/* + * send an abort message at call or connection level + * - the supplied error code is sent as the packet data + */ +int rxrpc_call_abort(struct rxrpc_call *call, int error) +{ + spin_lock(&call->lock); + + return __rxrpc_call_abort(call, error); + +} /* end rxrpc_call_abort() */ + +/*****************************************************************************/ +/* + * process packets waiting for this call + */ +static void rxrpc_call_receive_packet(struct rxrpc_call *call) +{ + struct rxrpc_message *msg; + struct list_head *_p; + + _enter("%p", call); + + rxrpc_get_call(call); /* must not go away too soon if aborted by + * app-layer */ + + while (!list_empty(&call->rcv_receiveq)) { + /* try to get next packet */ + _p = NULL; + spin_lock(&call->lock); + if (!list_empty(&call->rcv_receiveq)) { + _p = call->rcv_receiveq.next; + list_del_init(_p); + } + spin_unlock(&call->lock); + + if (!_p) + break; + + msg = list_entry(_p, struct rxrpc_message, link); + + _proto("Rx %05lu Received %s packet (%%%u,#%u,%c%c%c%c%c)", + jiffies - call->cjif, + rxrpc_pkts[msg->hdr.type], + ntohl(msg->hdr.serial), + msg->seq, + msg->hdr.flags & RXRPC_JUMBO_PACKET ? 'j' : '-', + msg->hdr.flags & RXRPC_MORE_PACKETS ? 'm' : '-', + msg->hdr.flags & RXRPC_LAST_PACKET ? 'l' : '-', + msg->hdr.flags & RXRPC_REQUEST_ACK ? 'r' : '-', + msg->hdr.flags & RXRPC_CLIENT_INITIATED ? 'C' : 'S' + ); + + switch (msg->hdr.type) { + /* deal with data packets */ + case RXRPC_PACKET_TYPE_DATA: + /* ACK the packet if necessary */ + switch (rxrpc_call_generate_ACK(call, &msg->hdr, + NULL)) { + case 0: /* useful packet */ + rxrpc_call_receive_data_packet(call, msg); + break; + case 1: /* duplicate or out-of-window packet */ + break; + default: + rxrpc_put_message(msg); + goto out; + } + break; + + /* deal with ACK packets */ + case RXRPC_PACKET_TYPE_ACK: + rxrpc_call_receive_ack_packet(call, msg); + break; + + /* deal with abort packets */ + case RXRPC_PACKET_TYPE_ABORT: { + __be32 _dbuf, *dp; + + dp = skb_header_pointer(msg->pkt, msg->offset, + sizeof(_dbuf), &_dbuf); + if (dp == NULL) + printk("Rx Received short ABORT packet\n"); + + _proto("Rx Received Call ABORT { data=%d }", + (dp ? ntohl(*dp) : 0)); + + spin_lock(&call->lock); + call->app_call_state = RXRPC_CSTATE_ERROR; + call->app_err_state = RXRPC_ESTATE_PEER_ABORT; + call->app_abort_code = (dp ? ntohl(*dp) : 0); + call->app_errno = -ECONNABORTED; + call->app_mark = RXRPC_APP_MARK_EOF; + call->app_read_buf = NULL; + call->app_async_read = 0; + + /* ask the app to translate the error code */ + call->app_aemap_func(call); + _state(call); + spin_unlock(&call->lock); + call->app_error_func(call); + break; + } + default: + /* deal with other packet types */ + _proto("Rx Unsupported packet type %u (#%u)", + msg->hdr.type, msg->seq); + break; + } + + rxrpc_put_message(msg); + } + + out: + rxrpc_put_call(call); + _leave(""); +} /* end rxrpc_call_receive_packet() */ + +/*****************************************************************************/ +/* + * process next data packet + * - as the next data packet arrives: + * - it is queued on app_readyq _if_ it is the next one expected + * (app_ready_seq+1) + * - it is queued on app_unreadyq _if_ it is not the next one expected + * - if a packet placed on app_readyq completely fills a hole leading up to + * the first packet on app_unreadyq, then packets now in sequence are + * tranferred to app_readyq + * - the application layer can only see packets on app_readyq + * (app_ready_qty bytes) + * - the application layer is prodded every time a new packet arrives + */ +static void rxrpc_call_receive_data_packet(struct rxrpc_call *call, + struct rxrpc_message *msg) +{ + const struct rxrpc_operation *optbl, *op; + struct rxrpc_message *pmsg; + struct list_head *_p; + int ret, lo, hi, rmtimo; + __be32 opid; + + _enter("%p{%u},%p{%u}", call, ntohl(call->call_id), msg, msg->seq); + + rxrpc_get_message(msg); + + /* add to the unready queue if we'd have to create a hole in the ready + * queue otherwise */ + if (msg->seq != call->app_ready_seq + 1) { + _debug("Call add packet %d to unreadyq", msg->seq); + + /* insert in seq order */ + list_for_each(_p, &call->app_unreadyq) { + pmsg = list_entry(_p, struct rxrpc_message, link); + if (pmsg->seq > msg->seq) + break; + } + + list_add_tail(&msg->link, _p); + + _leave(" [unreadyq]"); + return; + } + + /* next in sequence - simply append into the call's ready queue */ + _debug("Call add packet %d to readyq (+%Zd => %Zd bytes)", + msg->seq, msg->dsize, call->app_ready_qty); + + spin_lock(&call->lock); + call->app_ready_seq = msg->seq; + call->app_ready_qty += msg->dsize; + list_add_tail(&msg->link, &call->app_readyq); + + /* move unready packets to the readyq if we got rid of a hole */ + while (!list_empty(&call->app_unreadyq)) { + pmsg = list_entry(call->app_unreadyq.next, + struct rxrpc_message, link); + + if (pmsg->seq != call->app_ready_seq + 1) + break; + + /* next in sequence - just move list-to-list */ + _debug("Call transfer packet %d to readyq (+%Zd => %Zd bytes)", + pmsg->seq, pmsg->dsize, call->app_ready_qty); + + call->app_ready_seq = pmsg->seq; + call->app_ready_qty += pmsg->dsize; + list_del_init(&pmsg->link); + list_add_tail(&pmsg->link, &call->app_readyq); + } + + /* see if we've got the last packet yet */ + if (!list_empty(&call->app_readyq)) { + pmsg = list_entry(call->app_readyq.prev, + struct rxrpc_message, link); + if (pmsg->hdr.flags & RXRPC_LAST_PACKET) { + call->app_last_rcv = 1; + _debug("Last packet on readyq"); + } + } + + switch (call->app_call_state) { + /* do nothing if call already aborted */ + case RXRPC_CSTATE_ERROR: + spin_unlock(&call->lock); + _leave(" [error]"); + return; + + /* extract the operation ID from an incoming call if that's not + * yet been done */ + case RXRPC_CSTATE_SRVR_RCV_OPID: + spin_unlock(&call->lock); + + /* handle as yet insufficient data for the operation ID */ + if (call->app_ready_qty < 4) { + if (call->app_last_rcv) + /* trouble - last packet seen */ + rxrpc_call_abort(call, -EINVAL); + + _leave(""); + return; + } + + /* pull the operation ID out of the buffer */ + ret = rxrpc_call_read_data(call, &opid, sizeof(opid), 0); + if (ret < 0) { + printk("Unexpected error from read-data: %d\n", ret); + if (call->app_call_state != RXRPC_CSTATE_ERROR) + rxrpc_call_abort(call, ret); + _leave(""); + return; + } + call->app_opcode = ntohl(opid); + + /* locate the operation in the available ops table */ + optbl = call->conn->service->ops_begin; + lo = 0; + hi = call->conn->service->ops_end - optbl; + + while (lo < hi) { + int mid = (hi + lo) / 2; + op = &optbl[mid]; + if (call->app_opcode == op->id) + goto found_op; + if (call->app_opcode > op->id) + lo = mid + 1; + else + hi = mid; + } + + /* search failed */ + kproto("Rx Client requested operation %d from %s service", + call->app_opcode, call->conn->service->name); + rxrpc_call_abort(call, -EINVAL); + _leave(" [inval]"); + return; + + found_op: + _proto("Rx Client requested operation %s from %s service", + op->name, call->conn->service->name); + + /* we're now waiting for the argument block (unless the call + * was aborted) */ + spin_lock(&call->lock); + if (call->app_call_state == RXRPC_CSTATE_SRVR_RCV_OPID || + call->app_call_state == RXRPC_CSTATE_SRVR_SND_REPLY) { + if (!call->app_last_rcv) + call->app_call_state = + RXRPC_CSTATE_SRVR_RCV_ARGS; + else if (call->app_ready_qty > 0) + call->app_call_state = + RXRPC_CSTATE_SRVR_GOT_ARGS; + else + call->app_call_state = + RXRPC_CSTATE_SRVR_SND_REPLY; + call->app_mark = op->asize; + call->app_user = op->user; + } + spin_unlock(&call->lock); + + _state(call); + break; + + case RXRPC_CSTATE_SRVR_RCV_ARGS: + /* change state if just received last packet of arg block */ + if (call->app_last_rcv) + call->app_call_state = RXRPC_CSTATE_SRVR_GOT_ARGS; + spin_unlock(&call->lock); + + _state(call); + break; + + case RXRPC_CSTATE_CLNT_RCV_REPLY: + /* change state if just received last packet of reply block */ + rmtimo = 0; + if (call->app_last_rcv) { + call->app_call_state = RXRPC_CSTATE_CLNT_GOT_REPLY; + rmtimo = 1; + } + spin_unlock(&call->lock); + + if (rmtimo) { + del_timer_sync(&call->acks_timeout); + del_timer_sync(&call->rcv_timeout); + del_timer_sync(&call->ackr_dfr_timo); + } + + _state(call); + break; + + default: + /* deal with data reception in an unexpected state */ + printk("Unexpected state [[[ %u ]]]\n", call->app_call_state); + __rxrpc_call_abort(call, -EBADMSG); + _leave(""); + return; + } + + if (call->app_call_state == RXRPC_CSTATE_CLNT_RCV_REPLY && + call->app_last_rcv) + BUG(); + + /* otherwise just invoke the data function whenever we can satisfy its desire for more + * data + */ + _proto("Rx Received Op Data: st=%u qty=%Zu mk=%Zu%s", + call->app_call_state, call->app_ready_qty, call->app_mark, + call->app_last_rcv ? " last-rcvd" : ""); + + spin_lock(&call->lock); + + ret = __rxrpc_call_read_data(call); + switch (ret) { + case 0: + spin_unlock(&call->lock); + call->app_attn_func(call); + break; + case -EAGAIN: + spin_unlock(&call->lock); + break; + case -ECONNABORTED: + spin_unlock(&call->lock); + break; + default: + __rxrpc_call_abort(call, ret); + break; + } + + _state(call); + + _leave(""); + +} /* end rxrpc_call_receive_data_packet() */ + +/*****************************************************************************/ +/* + * received an ACK packet + */ +static void rxrpc_call_receive_ack_packet(struct rxrpc_call *call, + struct rxrpc_message *msg) +{ + struct rxrpc_ackpacket _ack, *ap; + rxrpc_serial_net_t serial; + rxrpc_seq_t seq; + int ret; + + _enter("%p{%u},%p{%u}", call, ntohl(call->call_id), msg, msg->seq); + + /* extract the basic ACK record */ + ap = skb_header_pointer(msg->pkt, msg->offset, sizeof(_ack), &_ack); + if (ap == NULL) { + printk("Rx Received short ACK packet\n"); + return; + } + msg->offset += sizeof(_ack); + + serial = ap->serial; + seq = ntohl(ap->firstPacket); + + _proto("Rx Received ACK %%%d { b=%hu m=%hu f=%u p=%u s=%u r=%s n=%u }", + ntohl(msg->hdr.serial), + ntohs(ap->bufferSpace), + ntohs(ap->maxSkew), + seq, + ntohl(ap->previousPacket), + ntohl(serial), + rxrpc_acks[ap->reason], + call->ackr.nAcks + ); + + /* check the other side isn't ACK'ing a sequence number I haven't sent + * yet */ + if (ap->nAcks > 0 && + (seq > call->snd_seq_count || + seq + ap->nAcks - 1 > call->snd_seq_count)) { + printk("Received ACK (#%u-#%u) for unsent packet\n", + seq, seq + ap->nAcks - 1); + rxrpc_call_abort(call, -EINVAL); + _leave(""); + return; + } + + /* deal with RTT calculation */ + if (serial) { + struct rxrpc_message *rttmsg; + + /* find the prompting packet */ + spin_lock(&call->lock); + if (call->snd_ping && call->snd_ping->hdr.serial == serial) { + /* it was a ping packet */ + rttmsg = call->snd_ping; + call->snd_ping = NULL; + spin_unlock(&call->lock); + + if (rttmsg) { + rttmsg->rttdone = 1; + rxrpc_peer_calculate_rtt(call->conn->peer, + rttmsg, msg); + rxrpc_put_message(rttmsg); + } + } + else { + struct list_head *_p; + + /* it ought to be a data packet - look in the pending + * ACK list */ + list_for_each(_p, &call->acks_pendq) { + rttmsg = list_entry(_p, struct rxrpc_message, + link); + if (rttmsg->hdr.serial == serial) { + if (rttmsg->rttdone) + /* never do RTT twice without + * resending */ + break; + + rttmsg->rttdone = 1; + rxrpc_peer_calculate_rtt( + call->conn->peer, rttmsg, msg); + break; + } + } + spin_unlock(&call->lock); + } + } + + switch (ap->reason) { + /* deal with negative/positive acknowledgement of data + * packets */ + case RXRPC_ACK_REQUESTED: + case RXRPC_ACK_DELAY: + case RXRPC_ACK_IDLE: + rxrpc_call_definitively_ACK(call, seq - 1); + + case RXRPC_ACK_DUPLICATE: + case RXRPC_ACK_OUT_OF_SEQUENCE: + case RXRPC_ACK_EXCEEDS_WINDOW: + call->snd_resend_cnt = 0; + ret = rxrpc_call_record_ACK(call, msg, seq, ap->nAcks); + if (ret < 0) + rxrpc_call_abort(call, ret); + break; + + /* respond to ping packets immediately */ + case RXRPC_ACK_PING: + rxrpc_call_generate_ACK(call, &msg->hdr, ap); + break; + + /* only record RTT on ping response packets */ + case RXRPC_ACK_PING_RESPONSE: + if (call->snd_ping) { + struct rxrpc_message *rttmsg; + + /* only do RTT stuff if the response matches the + * retained ping */ + rttmsg = NULL; + spin_lock(&call->lock); + if (call->snd_ping && + call->snd_ping->hdr.serial == ap->serial) { + rttmsg = call->snd_ping; + call->snd_ping = NULL; + } + spin_unlock(&call->lock); + + if (rttmsg) { + rttmsg->rttdone = 1; + rxrpc_peer_calculate_rtt(call->conn->peer, + rttmsg, msg); + rxrpc_put_message(rttmsg); + } + } + break; + + default: + printk("Unsupported ACK reason %u\n", ap->reason); + break; + } + + _leave(""); +} /* end rxrpc_call_receive_ack_packet() */ + +/*****************************************************************************/ +/* + * record definitive ACKs for all messages up to and including the one with the + * 'highest' seq + */ +static void rxrpc_call_definitively_ACK(struct rxrpc_call *call, + rxrpc_seq_t highest) +{ + struct rxrpc_message *msg; + int now_complete; + + _enter("%p{ads=%u},%u", call, call->acks_dftv_seq, highest); + + while (call->acks_dftv_seq < highest) { + call->acks_dftv_seq++; + + _proto("Definitive ACK on packet #%u", call->acks_dftv_seq); + + /* discard those at front of queue until message with highest + * ACK is found */ + spin_lock(&call->lock); + msg = NULL; + if (!list_empty(&call->acks_pendq)) { + msg = list_entry(call->acks_pendq.next, + struct rxrpc_message, link); + list_del_init(&msg->link); /* dequeue */ + if (msg->state == RXRPC_MSG_SENT) + call->acks_pend_cnt--; + } + spin_unlock(&call->lock); + + /* insanity check */ + if (!msg) + panic("%s(): acks_pendq unexpectedly empty\n", + __FUNCTION__); + + if (msg->seq != call->acks_dftv_seq) + panic("%s(): Packet #%u expected at front of acks_pendq" + " (#%u found)\n", + __FUNCTION__, call->acks_dftv_seq, msg->seq); + + /* discard the message */ + msg->state = RXRPC_MSG_DONE; + rxrpc_put_message(msg); + } + + /* if all sent packets are definitively ACK'd then prod any sleepers just in case */ + now_complete = 0; + spin_lock(&call->lock); + if (call->acks_dftv_seq == call->snd_seq_count) { + if (call->app_call_state != RXRPC_CSTATE_COMPLETE) { + call->app_call_state = RXRPC_CSTATE_COMPLETE; + _state(call); + now_complete = 1; + } + } + spin_unlock(&call->lock); + + if (now_complete) { + del_timer_sync(&call->acks_timeout); + del_timer_sync(&call->rcv_timeout); + del_timer_sync(&call->ackr_dfr_timo); + call->app_attn_func(call); + } + + _leave(""); +} /* end rxrpc_call_definitively_ACK() */ + +/*****************************************************************************/ +/* + * record the specified amount of ACKs/NAKs + */ +static int rxrpc_call_record_ACK(struct rxrpc_call *call, + struct rxrpc_message *msg, + rxrpc_seq_t seq, + size_t count) +{ + struct rxrpc_message *dmsg; + struct list_head *_p; + rxrpc_seq_t highest; + unsigned ix; + size_t chunk; + char resend, now_complete; + u8 acks[16]; + + _enter("%p{apc=%u ads=%u},%p,%u,%Zu", + call, call->acks_pend_cnt, call->acks_dftv_seq, + msg, seq, count); + + /* handle re-ACK'ing of definitively ACK'd packets (may be out-of-order + * ACKs) */ + if (seq <= call->acks_dftv_seq) { + unsigned delta = call->acks_dftv_seq - seq; + + if (count <= delta) { + _leave(" = 0 [all definitively ACK'd]"); + return 0; + } + + seq += delta; + count -= delta; + msg->offset += delta; + } + + highest = seq + count - 1; + resend = 0; + while (count > 0) { + /* extract up to 16 ACK slots at a time */ + chunk = min(count, sizeof(acks)); + count -= chunk; + + memset(acks, 2, sizeof(acks)); + + if (skb_copy_bits(msg->pkt, msg->offset, &acks, chunk) < 0) { + printk("Rx Received short ACK packet\n"); + _leave(" = -EINVAL"); + return -EINVAL; + } + msg->offset += chunk; + + /* check that the ACK set is valid */ + for (ix = 0; ix < chunk; ix++) { + switch (acks[ix]) { + case RXRPC_ACK_TYPE_ACK: + break; + case RXRPC_ACK_TYPE_NACK: + resend = 1; + break; + default: + printk("Rx Received unsupported ACK state" + " %u\n", acks[ix]); + _leave(" = -EINVAL"); + return -EINVAL; + } + } + + _proto("Rx ACK of packets #%u-#%u " + "[%c%c%c%c%c%c%c%c%c%c%c%c%c%c%c%c] (pend=%u)", + seq, (unsigned) (seq + chunk - 1), + _acktype[acks[0x0]], + _acktype[acks[0x1]], + _acktype[acks[0x2]], + _acktype[acks[0x3]], + _acktype[acks[0x4]], + _acktype[acks[0x5]], + _acktype[acks[0x6]], + _acktype[acks[0x7]], + _acktype[acks[0x8]], + _acktype[acks[0x9]], + _acktype[acks[0xA]], + _acktype[acks[0xB]], + _acktype[acks[0xC]], + _acktype[acks[0xD]], + _acktype[acks[0xE]], + _acktype[acks[0xF]], + call->acks_pend_cnt + ); + + /* mark the packets in the ACK queue as being provisionally + * ACK'd */ + ix = 0; + spin_lock(&call->lock); + + /* find the first packet ACK'd/NAK'd here */ + list_for_each(_p, &call->acks_pendq) { + dmsg = list_entry(_p, struct rxrpc_message, link); + if (dmsg->seq == seq) + goto found_first; + _debug("- %u: skipping #%u", ix, dmsg->seq); + } + goto bad_queue; + + found_first: + do { + _debug("- %u: processing #%u (%c) apc=%u", + ix, dmsg->seq, _acktype[acks[ix]], + call->acks_pend_cnt); + + if (acks[ix] == RXRPC_ACK_TYPE_ACK) { + if (dmsg->state == RXRPC_MSG_SENT) + call->acks_pend_cnt--; + dmsg->state = RXRPC_MSG_ACKED; + } + else { + if (dmsg->state == RXRPC_MSG_ACKED) + call->acks_pend_cnt++; + dmsg->state = RXRPC_MSG_SENT; + } + ix++; + seq++; + + _p = dmsg->link.next; + dmsg = list_entry(_p, struct rxrpc_message, link); + } while(ix < chunk && + _p != &call->acks_pendq && + dmsg->seq == seq); + + if (ix < chunk) + goto bad_queue; + + spin_unlock(&call->lock); + } + + if (resend) + rxrpc_call_resend(call, highest); + + /* if all packets are provisionally ACK'd, then wake up anyone who's + * waiting for that */ + now_complete = 0; + spin_lock(&call->lock); + if (call->acks_pend_cnt == 0) { + if (call->app_call_state == RXRPC_CSTATE_SRVR_RCV_FINAL_ACK) { + call->app_call_state = RXRPC_CSTATE_COMPLETE; + _state(call); + } + now_complete = 1; + } + spin_unlock(&call->lock); + + if (now_complete) { + _debug("- wake up waiters"); + del_timer_sync(&call->acks_timeout); + del_timer_sync(&call->rcv_timeout); + del_timer_sync(&call->ackr_dfr_timo); + call->app_attn_func(call); + } + + _leave(" = 0 (apc=%u)", call->acks_pend_cnt); + return 0; + + bad_queue: + panic("%s(): acks_pendq in bad state (packet #%u absent)\n", + __FUNCTION__, seq); + +} /* end rxrpc_call_record_ACK() */ + +/*****************************************************************************/ +/* + * transfer data from the ready packet queue to the asynchronous read buffer + * - since this func is the only one going to look at packets queued on + * app_readyq, we don't need a lock to modify or access them, only to modify + * the queue pointers + * - called with call->lock held + * - the buffer must be in kernel space + * - returns: + * 0 if buffer filled + * -EAGAIN if buffer not filled and more data to come + * -EBADMSG if last packet received and insufficient data left + * -ECONNABORTED if the call has in an error state + */ +static int __rxrpc_call_read_data(struct rxrpc_call *call) +{ + struct rxrpc_message *msg; + size_t qty; + int ret; + + _enter("%p{as=%d buf=%p qty=%Zu/%Zu}", + call, + call->app_async_read, call->app_read_buf, + call->app_ready_qty, call->app_mark); + + /* check the state */ + switch (call->app_call_state) { + case RXRPC_CSTATE_SRVR_RCV_ARGS: + case RXRPC_CSTATE_CLNT_RCV_REPLY: + if (call->app_last_rcv) { + printk("%s(%p,%p,%Zd):" + " Inconsistent call state (%s, last pkt)", + __FUNCTION__, + call, call->app_read_buf, call->app_mark, + rxrpc_call_states[call->app_call_state]); + BUG(); + } + break; + + case RXRPC_CSTATE_SRVR_RCV_OPID: + case RXRPC_CSTATE_SRVR_GOT_ARGS: + case RXRPC_CSTATE_CLNT_GOT_REPLY: + break; + + case RXRPC_CSTATE_SRVR_SND_REPLY: + if (!call->app_last_rcv) { + printk("%s(%p,%p,%Zd):" + " Inconsistent call state (%s, not last pkt)", + __FUNCTION__, + call, call->app_read_buf, call->app_mark, + rxrpc_call_states[call->app_call_state]); + BUG(); + } + _debug("Trying to read data from call in SND_REPLY state"); + break; + + case RXRPC_CSTATE_ERROR: + _leave(" = -ECONNABORTED"); + return -ECONNABORTED; + + default: + printk("reading in unexpected state [[[ %u ]]]\n", + call->app_call_state); + BUG(); + } + + /* handle the case of not having an async buffer */ + if (!call->app_async_read) { + if (call->app_mark == RXRPC_APP_MARK_EOF) { + ret = call->app_last_rcv ? 0 : -EAGAIN; + } + else { + if (call->app_mark >= call->app_ready_qty) { + call->app_mark = RXRPC_APP_MARK_EOF; + ret = 0; + } + else { + ret = call->app_last_rcv ? -EBADMSG : -EAGAIN; + } + } + + _leave(" = %d [no buf]", ret); + return 0; + } + + while (!list_empty(&call->app_readyq) && call->app_mark > 0) { + msg = list_entry(call->app_readyq.next, + struct rxrpc_message, link); + + /* drag as much data as we need out of this packet */ + qty = min(call->app_mark, msg->dsize); + + _debug("reading %Zu from skb=%p off=%lu", + qty, msg->pkt, msg->offset); + + if (call->app_read_buf) + if (skb_copy_bits(msg->pkt, msg->offset, + call->app_read_buf, qty) < 0) + panic("%s: Failed to copy data from packet:" + " (%p,%p,%Zd)", + __FUNCTION__, + call, call->app_read_buf, qty); + + /* if that packet is now empty, discard it */ + call->app_ready_qty -= qty; + msg->dsize -= qty; + + if (msg->dsize == 0) { + list_del_init(&msg->link); + rxrpc_put_message(msg); + } + else { + msg->offset += qty; + } + + call->app_mark -= qty; + if (call->app_read_buf) + call->app_read_buf += qty; + } + + if (call->app_mark == 0) { + call->app_async_read = 0; + call->app_mark = RXRPC_APP_MARK_EOF; + call->app_read_buf = NULL; + + /* adjust the state if used up all packets */ + if (list_empty(&call->app_readyq) && call->app_last_rcv) { + switch (call->app_call_state) { + case RXRPC_CSTATE_SRVR_RCV_OPID: + call->app_call_state = RXRPC_CSTATE_SRVR_SND_REPLY; + call->app_mark = RXRPC_APP_MARK_EOF; + _state(call); + del_timer_sync(&call->rcv_timeout); + break; + case RXRPC_CSTATE_SRVR_GOT_ARGS: + call->app_call_state = RXRPC_CSTATE_SRVR_SND_REPLY; + _state(call); + del_timer_sync(&call->rcv_timeout); + break; + default: + call->app_call_state = RXRPC_CSTATE_COMPLETE; + _state(call); + del_timer_sync(&call->acks_timeout); + del_timer_sync(&call->ackr_dfr_timo); + del_timer_sync(&call->rcv_timeout); + break; + } + } + + _leave(" = 0"); + return 0; + } + + if (call->app_last_rcv) { + _debug("Insufficient data (%Zu/%Zu)", + call->app_ready_qty, call->app_mark); + call->app_async_read = 0; + call->app_mark = RXRPC_APP_MARK_EOF; + call->app_read_buf = NULL; + + _leave(" = -EBADMSG"); + return -EBADMSG; + } + + _leave(" = -EAGAIN"); + return -EAGAIN; +} /* end __rxrpc_call_read_data() */ + +/*****************************************************************************/ +/* + * attempt to read the specified amount of data from the call's ready queue + * into the buffer provided + * - since this func is the only one going to look at packets queued on + * app_readyq, we don't need a lock to modify or access them, only to modify + * the queue pointers + * - if the buffer pointer is NULL, then data is merely drained, not copied + * - if flags&RXRPC_CALL_READ_BLOCK, then the function will wait until there is + * enough data or an error will be generated + * - note that the caller must have added the calling task to the call's wait + * queue beforehand + * - if flags&RXRPC_CALL_READ_ALL, then an error will be generated if this + * function doesn't read all available data + */ +int rxrpc_call_read_data(struct rxrpc_call *call, + void *buffer, size_t size, int flags) +{ + int ret; + + _enter("%p{arq=%Zu},%p,%Zd,%x", + call, call->app_ready_qty, buffer, size, flags); + + spin_lock(&call->lock); + + if (unlikely(!!call->app_read_buf)) { + spin_unlock(&call->lock); + _leave(" = -EBUSY"); + return -EBUSY; + } + + call->app_mark = size; + call->app_read_buf = buffer; + call->app_async_read = 1; + call->app_read_count++; + + /* read as much data as possible */ + ret = __rxrpc_call_read_data(call); + switch (ret) { + case 0: + if (flags & RXRPC_CALL_READ_ALL && + (!call->app_last_rcv || call->app_ready_qty > 0)) { + _leave(" = -EBADMSG"); + __rxrpc_call_abort(call, -EBADMSG); + return -EBADMSG; + } + + spin_unlock(&call->lock); + call->app_attn_func(call); + _leave(" = 0"); + return ret; + + case -ECONNABORTED: + spin_unlock(&call->lock); + _leave(" = %d [aborted]", ret); + return ret; + + default: + __rxrpc_call_abort(call, ret); + _leave(" = %d", ret); + return ret; + + case -EAGAIN: + spin_unlock(&call->lock); + + if (!(flags & RXRPC_CALL_READ_BLOCK)) { + _leave(" = -EAGAIN"); + return -EAGAIN; + } + + /* wait for the data to arrive */ + _debug("blocking for data arrival"); + + for (;;) { + set_current_state(TASK_INTERRUPTIBLE); + if (!call->app_async_read || signal_pending(current)) + break; + schedule(); + } + set_current_state(TASK_RUNNING); + + if (signal_pending(current)) { + _leave(" = -EINTR"); + return -EINTR; + } + + if (call->app_call_state == RXRPC_CSTATE_ERROR) { + _leave(" = -ECONNABORTED"); + return -ECONNABORTED; + } + + _leave(" = 0"); + return 0; + } + +} /* end rxrpc_call_read_data() */ + +/*****************************************************************************/ +/* + * write data to a call + * - the data may not be sent immediately if it doesn't fill a buffer + * - if we can't queue all the data for buffering now, siov[] will have been + * adjusted to take account of what has been sent + */ +int rxrpc_call_write_data(struct rxrpc_call *call, + size_t sioc, + struct kvec *siov, + u8 rxhdr_flags, + int alloc_flags, + int dup_data, + size_t *size_sent) +{ + struct rxrpc_message *msg; + struct kvec *sptr; + size_t space, size, chunk, tmp; + char *buf; + int ret; + + _enter("%p,%Zu,%p,%02x,%x,%d,%p", + call, sioc, siov, rxhdr_flags, alloc_flags, dup_data, + size_sent); + + *size_sent = 0; + size = 0; + ret = -EINVAL; + + /* can't send more if we've sent last packet from this end */ + switch (call->app_call_state) { + case RXRPC_CSTATE_SRVR_SND_REPLY: + case RXRPC_CSTATE_CLNT_SND_ARGS: + break; + case RXRPC_CSTATE_ERROR: + ret = call->app_errno; + default: + goto out; + } + + /* calculate how much data we've been given */ + sptr = siov; + for (; sioc > 0; sptr++, sioc--) { + if (!sptr->iov_len) + continue; + + if (!sptr->iov_base) + goto out; + + size += sptr->iov_len; + } + + _debug("- size=%Zu mtu=%Zu", size, call->conn->mtu_size); + + do { + /* make sure there's a message under construction */ + if (!call->snd_nextmsg) { + /* no - allocate a message with no data yet attached */ + ret = rxrpc_conn_newmsg(call->conn, call, + RXRPC_PACKET_TYPE_DATA, + 0, NULL, alloc_flags, + &call->snd_nextmsg); + if (ret < 0) + goto out; + _debug("- allocated new message [ds=%Zu]", + call->snd_nextmsg->dsize); + } + + msg = call->snd_nextmsg; + msg->hdr.flags |= rxhdr_flags; + + /* deal with zero-length terminal packet */ + if (size == 0) { + if (rxhdr_flags & RXRPC_LAST_PACKET) { + ret = rxrpc_call_flush(call); + if (ret < 0) + goto out; + } + break; + } + + /* work out how much space current packet has available */ + space = call->conn->mtu_size - msg->dsize; + chunk = min(space, size); + + _debug("- [before] space=%Zu chunk=%Zu", space, chunk); + + while (!siov->iov_len) + siov++; + + /* if we are going to have to duplicate the data then coalesce + * it too */ + if (dup_data) { + /* don't allocate more that 1 page at a time */ + if (chunk > PAGE_SIZE) + chunk = PAGE_SIZE; + + /* allocate a data buffer and attach to the message */ + buf = kmalloc(chunk, alloc_flags); + if (unlikely(!buf)) { + if (msg->dsize == + sizeof(struct rxrpc_header)) { + /* discard an empty msg and wind back + * the seq counter */ + rxrpc_put_message(msg); + call->snd_nextmsg = NULL; + call->snd_seq_count--; + } + + ret = -ENOMEM; + goto out; + } + + tmp = msg->dcount++; + set_bit(tmp, &msg->dfree); + msg->data[tmp].iov_base = buf; + msg->data[tmp].iov_len = chunk; + msg->dsize += chunk; + *size_sent += chunk; + size -= chunk; + + /* load the buffer with data */ + while (chunk > 0) { + tmp = min(chunk, siov->iov_len); + memcpy(buf, siov->iov_base, tmp); + buf += tmp; + siov->iov_base += tmp; + siov->iov_len -= tmp; + if (!siov->iov_len) + siov++; + chunk -= tmp; + } + } + else { + /* we want to attach the supplied buffers directly */ + while (chunk > 0 && + msg->dcount < RXRPC_MSG_MAX_IOCS) { + tmp = msg->dcount++; + msg->data[tmp].iov_base = siov->iov_base; + msg->data[tmp].iov_len = siov->iov_len; + msg->dsize += siov->iov_len; + *size_sent += siov->iov_len; + size -= siov->iov_len; + chunk -= siov->iov_len; + siov++; + } + } + + _debug("- [loaded] chunk=%Zu size=%Zu", chunk, size); + + /* dispatch the message when full, final or requesting ACK */ + if (msg->dsize >= call->conn->mtu_size || rxhdr_flags) { + ret = rxrpc_call_flush(call); + if (ret < 0) + goto out; + } + + } while(size > 0); + + ret = 0; + out: + _leave(" = %d (%Zd queued, %Zd rem)", ret, *size_sent, size); + return ret; + +} /* end rxrpc_call_write_data() */ + +/*****************************************************************************/ +/* + * flush outstanding packets to the network + */ +static int rxrpc_call_flush(struct rxrpc_call *call) +{ + struct rxrpc_message *msg; + int ret = 0; + + _enter("%p", call); + + rxrpc_get_call(call); + + /* if there's a packet under construction, then dispatch it now */ + if (call->snd_nextmsg) { + msg = call->snd_nextmsg; + call->snd_nextmsg = NULL; + + if (msg->hdr.flags & RXRPC_LAST_PACKET) { + msg->hdr.flags &= ~RXRPC_MORE_PACKETS; + if (call->app_call_state != RXRPC_CSTATE_CLNT_SND_ARGS) + msg->hdr.flags |= RXRPC_REQUEST_ACK; + } + else { + msg->hdr.flags |= RXRPC_MORE_PACKETS; + } + + _proto("Sending DATA message { ds=%Zu dc=%u df=%02lu }", + msg->dsize, msg->dcount, msg->dfree); + + /* queue and adjust call state */ + spin_lock(&call->lock); + list_add_tail(&msg->link, &call->acks_pendq); + + /* decide what to do depending on current state and if this is + * the last packet */ + ret = -EINVAL; + switch (call->app_call_state) { + case RXRPC_CSTATE_SRVR_SND_REPLY: + if (msg->hdr.flags & RXRPC_LAST_PACKET) { + call->app_call_state = + RXRPC_CSTATE_SRVR_RCV_FINAL_ACK; + _state(call); + } + break; + + case RXRPC_CSTATE_CLNT_SND_ARGS: + if (msg->hdr.flags & RXRPC_LAST_PACKET) { + call->app_call_state = + RXRPC_CSTATE_CLNT_RCV_REPLY; + _state(call); + } + break; + + case RXRPC_CSTATE_ERROR: + ret = call->app_errno; + default: + spin_unlock(&call->lock); + goto out; + } + + call->acks_pend_cnt++; + + mod_timer(&call->acks_timeout, + __rxrpc_rtt_based_timeout(call, + rxrpc_call_acks_timeout)); + + spin_unlock(&call->lock); + + ret = rxrpc_conn_sendmsg(call->conn, msg); + if (ret == 0) + call->pkt_snd_count++; + } + + out: + rxrpc_put_call(call); + + _leave(" = %d", ret); + return ret; + +} /* end rxrpc_call_flush() */ + +/*****************************************************************************/ +/* + * resend NAK'd or unacknowledged packets up to the highest one specified + */ +static void rxrpc_call_resend(struct rxrpc_call *call, rxrpc_seq_t highest) +{ + struct rxrpc_message *msg; + struct list_head *_p; + rxrpc_seq_t seq = 0; + + _enter("%p,%u", call, highest); + + _proto("Rx Resend required"); + + /* handle too many resends */ + if (call->snd_resend_cnt >= rxrpc_call_max_resend) { + _debug("Aborting due to too many resends (rcv=%d)", + call->pkt_rcv_count); + rxrpc_call_abort(call, + call->pkt_rcv_count > 0 ? -EIO : -ETIMEDOUT); + _leave(""); + return; + } + + spin_lock(&call->lock); + call->snd_resend_cnt++; + for (;;) { + /* determine which the next packet we might need to ACK is */ + if (seq <= call->acks_dftv_seq) + seq = call->acks_dftv_seq; + seq++; + + if (seq > highest) + break; + + /* look for the packet in the pending-ACK queue */ + list_for_each(_p, &call->acks_pendq) { + msg = list_entry(_p, struct rxrpc_message, link); + if (msg->seq == seq) + goto found_msg; + } + + panic("%s(%p,%d):" + " Inconsistent pending-ACK queue (ds=%u sc=%u sq=%u)\n", + __FUNCTION__, call, highest, + call->acks_dftv_seq, call->snd_seq_count, seq); + + found_msg: + if (msg->state != RXRPC_MSG_SENT) + continue; /* only un-ACK'd packets */ + + rxrpc_get_message(msg); + spin_unlock(&call->lock); + + /* send each message again (and ignore any errors we might + * incur) */ + _proto("Resending DATA message { ds=%Zu dc=%u df=%02lu }", + msg->dsize, msg->dcount, msg->dfree); + + if (rxrpc_conn_sendmsg(call->conn, msg) == 0) + call->pkt_snd_count++; + + rxrpc_put_message(msg); + + spin_lock(&call->lock); + } + + /* reset the timeout */ + mod_timer(&call->acks_timeout, + __rxrpc_rtt_based_timeout(call, rxrpc_call_acks_timeout)); + + spin_unlock(&call->lock); + + _leave(""); +} /* end rxrpc_call_resend() */ + +/*****************************************************************************/ +/* + * handle an ICMP error being applied to a call + */ +void rxrpc_call_handle_error(struct rxrpc_call *call, int local, int errno) +{ + _enter("%p{%u},%d", call, ntohl(call->call_id), errno); + + /* if this call is already aborted, then just wake up any waiters */ + if (call->app_call_state == RXRPC_CSTATE_ERROR) { + call->app_error_func(call); + } + else { + /* tell the app layer what happened */ + spin_lock(&call->lock); + call->app_call_state = RXRPC_CSTATE_ERROR; + _state(call); + if (local) + call->app_err_state = RXRPC_ESTATE_LOCAL_ERROR; + else + call->app_err_state = RXRPC_ESTATE_REMOTE_ERROR; + call->app_errno = errno; + call->app_mark = RXRPC_APP_MARK_EOF; + call->app_read_buf = NULL; + call->app_async_read = 0; + + /* map the error */ + call->app_aemap_func(call); + + del_timer_sync(&call->acks_timeout); + del_timer_sync(&call->rcv_timeout); + del_timer_sync(&call->ackr_dfr_timo); + + spin_unlock(&call->lock); + + call->app_error_func(call); + } + + _leave(""); +} /* end rxrpc_call_handle_error() */ |