diff options
-rw-r--r-- | fs/dlm/lowcomms-sctp.c | 145 | ||||
-rw-r--r-- | fs/dlm/lowcomms-tcp.c | 254 |
2 files changed, 115 insertions, 284 deletions
diff --git a/fs/dlm/lowcomms-sctp.c b/fs/dlm/lowcomms-sctp.c index 5aeadadd8afd..dc83a9d979b5 100644 --- a/fs/dlm/lowcomms-sctp.c +++ b/fs/dlm/lowcomms-sctp.c @@ -72,6 +72,8 @@ struct nodeinfo { struct list_head writequeue; /* outgoing writequeue_entries */ spinlock_t writequeue_lock; int nodeid; + struct work_struct swork; /* Send workqueue */ + struct work_struct lwork; /* Locking workqueue */ }; static DEFINE_IDR(nodeinfo_idr); @@ -96,6 +98,7 @@ struct connection { atomic_t waiting_requests; struct cbuf cb; int eagain_flag; + struct work_struct work; /* Send workqueue */ }; /* An entry waiting to be sent */ @@ -137,19 +140,23 @@ static void cbuf_eat(struct cbuf *cb, int n) static LIST_HEAD(write_nodes); static DEFINE_SPINLOCK(write_nodes_lock); + /* Maximum number of incoming messages to process before * doing a schedule() */ #define MAX_RX_MSG_COUNT 25 -/* Manage daemons */ -static struct task_struct *recv_task; -static struct task_struct *send_task; -static DECLARE_WAIT_QUEUE_HEAD(lowcomms_recv_wait); +/* Work queues */ +static struct workqueue_struct *recv_workqueue; +static struct workqueue_struct *send_workqueue; +static struct workqueue_struct *lock_workqueue; /* The SCTP connection */ static struct connection sctp_con; +static void process_send_sockets(struct work_struct *work); +static void process_recv_sockets(struct work_struct *work); +static void process_lock_request(struct work_struct *work); static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr) { @@ -222,6 +229,8 @@ static struct nodeinfo *nodeid2nodeinfo(int nodeid, gfp_t alloc) spin_lock_init(&ni->lock); INIT_LIST_HEAD(&ni->writequeue); spin_lock_init(&ni->writequeue_lock); + INIT_WORK(&ni->lwork, process_lock_request); + INIT_WORK(&ni->swork, process_send_sockets); ni->nodeid = nodeid; if (nodeid > max_nodeid) @@ -249,11 +258,8 @@ static struct nodeinfo *assoc2nodeinfo(sctp_assoc_t assoc) /* Data or notification available on socket */ static void lowcomms_data_ready(struct sock *sk, int count_unused) { - atomic_inc(&sctp_con.waiting_requests); if (test_and_set_bit(CF_READ_PENDING, &sctp_con.flags)) - return; - - wake_up_interruptible(&lowcomms_recv_wait); + queue_work(recv_workqueue, &sctp_con.work); } @@ -361,10 +367,10 @@ static void init_failed(void) spin_lock_bh(&write_nodes_lock); list_add_tail(&ni->write_list, &write_nodes); spin_unlock_bh(&write_nodes_lock); + queue_work(send_workqueue, &ni->swork); } } } - wake_up_process(send_task); } /* Something happened to an association */ @@ -446,8 +452,8 @@ static void process_sctp_notification(struct msghdr *msg, char *buf) spin_lock_bh(&write_nodes_lock); list_add_tail(&ni->write_list, &write_nodes); spin_unlock_bh(&write_nodes_lock); + queue_work(send_workqueue, &ni->swork); } - wake_up_process(send_task); } break; @@ -580,8 +586,8 @@ static int receive_from_sock(void) spin_lock_bh(&write_nodes_lock); list_add_tail(&ni->write_list, &write_nodes); spin_unlock_bh(&write_nodes_lock); + queue_work(send_workqueue, &ni->swork); } - wake_up_process(send_task); } } @@ -590,6 +596,7 @@ static int receive_from_sock(void) return 0; cbuf_add(&sctp_con.cb, ret); + // PJC: TODO: Add to node's workqueue....can we ?? ret = dlm_process_incoming_buffer(cpu_to_le32(sinfo->sinfo_ppid), page_address(sctp_con.rx_page), sctp_con.cb.base, sctp_con.cb.len, @@ -820,7 +827,8 @@ void dlm_lowcomms_commit_buffer(void *arg) spin_lock_bh(&write_nodes_lock); list_add_tail(&ni->write_list, &write_nodes); spin_unlock_bh(&write_nodes_lock); - wake_up_process(send_task); + + queue_work(send_workqueue, &ni->swork); } return; @@ -1088,101 +1096,75 @@ int dlm_lowcomms_close(int nodeid) return 0; } -static int write_list_empty(void) +// PJC: The work queue function for receiving. +static void process_recv_sockets(struct work_struct *work) { - int status; + if (test_and_clear_bit(CF_READ_PENDING, &sctp_con.flags)) { + int ret; + int count = 0; - spin_lock_bh(&write_nodes_lock); - status = list_empty(&write_nodes); - spin_unlock_bh(&write_nodes_lock); + do { + ret = receive_from_sock(); - return status; + /* Don't starve out everyone else */ + if (++count >= MAX_RX_MSG_COUNT) { + cond_resched(); + count = 0; + } + } while (!kthread_should_stop() && ret >=0); + } + cond_resched(); } -static int dlm_recvd(void *data) +// PJC: the work queue function for sending +static void process_send_sockets(struct work_struct *work) { - DECLARE_WAITQUEUE(wait, current); - - while (!kthread_should_stop()) { - int count = 0; - - set_current_state(TASK_INTERRUPTIBLE); - add_wait_queue(&lowcomms_recv_wait, &wait); - if (!test_bit(CF_READ_PENDING, &sctp_con.flags)) - schedule(); - remove_wait_queue(&lowcomms_recv_wait, &wait); - set_current_state(TASK_RUNNING); - - if (test_and_clear_bit(CF_READ_PENDING, &sctp_con.flags)) { - int ret; - - do { - ret = receive_from_sock(); - - /* Don't starve out everyone else */ - if (++count >= MAX_RX_MSG_COUNT) { - cond_resched(); - count = 0; - } - } while (!kthread_should_stop() && ret >=0); - } - cond_resched(); + if (sctp_con.eagain_flag) { + sctp_con.eagain_flag = 0; + refill_write_queue(); } - - return 0; + process_output_queue(); } -static int dlm_sendd(void *data) +// PJC: Process lock requests from a particular node. +// TODO: can we optimise this out on UP ?? +static void process_lock_request(struct work_struct *work) { - DECLARE_WAITQUEUE(wait, current); - - add_wait_queue(sctp_con.sock->sk->sk_sleep, &wait); - - while (!kthread_should_stop()) { - set_current_state(TASK_INTERRUPTIBLE); - if (write_list_empty()) - schedule(); - set_current_state(TASK_RUNNING); - - if (sctp_con.eagain_flag) { - sctp_con.eagain_flag = 0; - refill_write_queue(); - } - process_output_queue(); - } - - remove_wait_queue(sctp_con.sock->sk->sk_sleep, &wait); - - return 0; } static void daemons_stop(void) { - kthread_stop(recv_task); - kthread_stop(send_task); + destroy_workqueue(recv_workqueue); + destroy_workqueue(send_workqueue); + destroy_workqueue(lock_workqueue); } static int daemons_start(void) { - struct task_struct *p; int error; + recv_workqueue = create_workqueue("dlm_recv"); + error = IS_ERR(recv_workqueue); + if (error) { + log_print("can't start dlm_recv %d", error); + return error; + } - p = kthread_run(dlm_recvd, NULL, "dlm_recvd"); - error = IS_ERR(p); + send_workqueue = create_singlethread_workqueue("dlm_send"); + error = IS_ERR(send_workqueue); if (error) { - log_print("can't start dlm_recvd %d", error); + log_print("can't start dlm_send %d", error); + destroy_workqueue(recv_workqueue); return error; } - recv_task = p; - p = kthread_run(dlm_sendd, NULL, "dlm_sendd"); - error = IS_ERR(p); + lock_workqueue = create_workqueue("dlm_rlock"); + error = IS_ERR(lock_workqueue); if (error) { - log_print("can't start dlm_sendd %d", error); - kthread_stop(recv_task); + log_print("can't start dlm_rlock %d", error); + destroy_workqueue(send_workqueue); + destroy_workqueue(recv_workqueue); return error; } - send_task = p; return 0; } @@ -1194,6 +1176,8 @@ int dlm_lowcomms_start(void) { int error; + INIT_WORK(&sctp_con.work, process_recv_sockets); + error = init_sock(); if (error) goto fail_sock; @@ -1224,4 +1208,3 @@ void dlm_lowcomms_stop(void) for (i = 0; i < dlm_local_count; i++) kfree(dlm_local_addr[i]); } - diff --git a/fs/dlm/lowcomms-tcp.c b/fs/dlm/lowcomms-tcp.c index b4fb5783ef8a..86e5f81da7cb 100644 --- a/fs/dlm/lowcomms-tcp.c +++ b/fs/dlm/lowcomms-tcp.c @@ -115,6 +115,8 @@ struct connection { atomic_t waiting_requests; #define MAX_CONNECT_RETRIES 3 struct connection *othercon; + struct work_struct rwork; /* Receive workqueue */ + struct work_struct swork; /* Send workqueue */ }; #define sock2con(x) ((struct connection *)(x)->sk_user_data) @@ -131,14 +133,9 @@ struct writequeue_entry { static struct sockaddr_storage dlm_local_addr; -/* Manage daemons */ -static struct task_struct *recv_task; -static struct task_struct *send_task; - -static wait_queue_t lowcomms_send_waitq_head; -static DECLARE_WAIT_QUEUE_HEAD(lowcomms_send_waitq); -static wait_queue_t lowcomms_recv_waitq_head; -static DECLARE_WAIT_QUEUE_HEAD(lowcomms_recv_waitq); +/* Work queues */ +static struct workqueue_struct *recv_workqueue; +static struct workqueue_struct *send_workqueue; /* An array of pointers to connections, indexed by NODEID */ static struct connection **connections; @@ -146,17 +143,8 @@ static DECLARE_MUTEX(connections_lock); static struct kmem_cache *con_cache; static int conn_array_size; -/* List of sockets that have reads pending */ -static LIST_HEAD(read_sockets); -static DEFINE_SPINLOCK(read_sockets_lock); - -/* List of sockets which have writes pending */ -static LIST_HEAD(write_sockets); -static DEFINE_SPINLOCK(write_sockets_lock); - -/* List of sockets which have connects pending */ -static LIST_HEAD(state_sockets); -static DEFINE_SPINLOCK(state_sockets_lock); +static void process_recv_sockets(struct work_struct *work); +static void process_send_sockets(struct work_struct *work); static struct connection *nodeid2con(int nodeid, gfp_t allocation) { @@ -189,6 +177,8 @@ static struct connection *nodeid2con(int nodeid, gfp_t allocation) init_rwsem(&con->sock_sem); INIT_LIST_HEAD(&con->writequeue); spin_lock_init(&con->writequeue_lock); + INIT_WORK(&con->swork, process_send_sockets); + INIT_WORK(&con->rwork, process_recv_sockets); connections[nodeid] = con; } @@ -203,41 +193,22 @@ static void lowcomms_data_ready(struct sock *sk, int count_unused) { struct connection *con = sock2con(sk); - atomic_inc(&con->waiting_requests); - if (test_and_set_bit(CF_READ_PENDING, &con->flags)) - return; - - spin_lock_bh(&read_sockets_lock); - list_add_tail(&con->read_list, &read_sockets); - spin_unlock_bh(&read_sockets_lock); - - wake_up_interruptible(&lowcomms_recv_waitq); + if (!test_and_set_bit(CF_READ_PENDING, &con->flags)) + queue_work(recv_workqueue, &con->rwork); } static void lowcomms_write_space(struct sock *sk) { struct connection *con = sock2con(sk); - if (test_and_set_bit(CF_WRITE_PENDING, &con->flags)) - return; - - spin_lock_bh(&write_sockets_lock); - list_add_tail(&con->write_list, &write_sockets); - spin_unlock_bh(&write_sockets_lock); - - wake_up_interruptible(&lowcomms_send_waitq); + if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) + queue_work(send_workqueue, &con->swork); } static inline void lowcomms_connect_sock(struct connection *con) { - if (test_and_set_bit(CF_CONNECT_PENDING, &con->flags)) - return; - - spin_lock_bh(&state_sockets_lock); - list_add_tail(&con->state_list, &state_sockets); - spin_unlock_bh(&state_sockets_lock); - - wake_up_interruptible(&lowcomms_send_waitq); + if (!test_and_set_bit(CF_CONNECT_PENDING, &con->flags)) + queue_work(send_workqueue, &con->swork); } static void lowcomms_state_change(struct sock *sk) @@ -388,7 +359,8 @@ out: return 0; out_resched: - lowcomms_data_ready(con->sock->sk, 0); + if (!test_and_set_bit(CF_READ_PENDING, &con->flags)) + queue_work(recv_workqueue, &con->rwork); up_read(&con->sock_sem); cond_resched(); return 0; @@ -477,6 +449,8 @@ static int accept_from_sock(struct connection *con) othercon->nodeid = nodeid; othercon->rx_action = receive_from_sock; init_rwsem(&othercon->sock_sem); + INIT_WORK(&othercon->swork, process_send_sockets); + INIT_WORK(&othercon->rwork, process_recv_sockets); set_bit(CF_IS_OTHERCON, &othercon->flags); newcon->othercon = othercon; } @@ -498,7 +472,8 @@ static int accept_from_sock(struct connection *con) * beween processing the accept adding the socket * to the read_sockets list */ - lowcomms_data_ready(newsock->sk, 0); + if (!test_and_set_bit(CF_READ_PENDING, &newcon->flags)) + queue_work(recv_workqueue, &newcon->rwork); up_read(&con->sock_sem); return 0; @@ -757,12 +732,8 @@ void dlm_lowcomms_commit_buffer(void *mh) kunmap(e->page); spin_unlock(&con->writequeue_lock); - if (test_and_set_bit(CF_WRITE_PENDING, &con->flags) == 0) { - spin_lock_bh(&write_sockets_lock); - list_add_tail(&con->write_list, &write_sockets); - spin_unlock_bh(&write_sockets_lock); - - wake_up_interruptible(&lowcomms_send_waitq); + if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) { + queue_work(send_workqueue, &con->swork); } return; @@ -803,6 +774,7 @@ static void send_to_sock(struct connection *con) offset = e->offset; BUG_ON(len == 0 && e->users == 0); spin_unlock(&con->writequeue_lock); + kmap(e->page); ret = 0; if (len) { @@ -884,85 +856,29 @@ out: } /* Look for activity on active sockets */ -static void process_sockets(void) +static void process_recv_sockets(struct work_struct *work) { - struct list_head *list; - struct list_head *temp; - int count = 0; - - spin_lock_bh(&read_sockets_lock); - list_for_each_safe(list, temp, &read_sockets) { - - struct connection *con = - list_entry(list, struct connection, read_list); - list_del(&con->read_list); - clear_bit(CF_READ_PENDING, &con->flags); - - spin_unlock_bh(&read_sockets_lock); + struct connection *con = container_of(work, struct connection, rwork); + int err; - /* This can reach zero if we are processing requests - * as they come in. - */ - if (atomic_read(&con->waiting_requests) == 0) { - spin_lock_bh(&read_sockets_lock); - continue; - } - - do { - con->rx_action(con); - - /* Don't starve out everyone else */ - if (++count >= MAX_RX_MSG_COUNT) { - cond_resched(); - count = 0; - } - - } while (!atomic_dec_and_test(&con->waiting_requests) && - !kthread_should_stop()); - - spin_lock_bh(&read_sockets_lock); - } - spin_unlock_bh(&read_sockets_lock); + clear_bit(CF_READ_PENDING, &con->flags); + do { + err = con->rx_action(con); + } while (!err); } -/* Try to send any messages that are pending - */ -static void process_output_queue(void) -{ - struct list_head *list; - struct list_head *temp; - - spin_lock_bh(&write_sockets_lock); - list_for_each_safe(list, temp, &write_sockets) { - struct connection *con = - list_entry(list, struct connection, write_list); - clear_bit(CF_WRITE_PENDING, &con->flags); - list_del(&con->write_list); - - spin_unlock_bh(&write_sockets_lock); - send_to_sock(con); - spin_lock_bh(&write_sockets_lock); - } - spin_unlock_bh(&write_sockets_lock); -} -static void process_state_queue(void) +static void process_send_sockets(struct work_struct *work) { - struct list_head *list; - struct list_head *temp; - - spin_lock_bh(&state_sockets_lock); - list_for_each_safe(list, temp, &state_sockets) { - struct connection *con = - list_entry(list, struct connection, state_list); - list_del(&con->state_list); - clear_bit(CF_CONNECT_PENDING, &con->flags); - spin_unlock_bh(&state_sockets_lock); + struct connection *con = container_of(work, struct connection, swork); + if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) { connect_to_sock(con); - spin_lock_bh(&state_sockets_lock); } - spin_unlock_bh(&state_sockets_lock); + + if (test_and_clear_bit(CF_WRITE_PENDING, &con->flags)) { + send_to_sock(con); + } } @@ -979,97 +895,29 @@ static void clean_writequeues(void) } } -static int read_list_empty(void) -{ - int status; - - spin_lock_bh(&read_sockets_lock); - status = list_empty(&read_sockets); - spin_unlock_bh(&read_sockets_lock); - - return status; -} - -/* DLM Transport comms receive daemon */ -static int dlm_recvd(void *data) +static void work_stop(void) { - init_waitqueue_entry(&lowcomms_recv_waitq_head, current); - add_wait_queue(&lowcomms_recv_waitq, &lowcomms_recv_waitq_head); - - while (!kthread_should_stop()) { - set_current_state(TASK_INTERRUPTIBLE); - if (read_list_empty()) - schedule(); - set_current_state(TASK_RUNNING); - - process_sockets(); - } - - return 0; + destroy_workqueue(recv_workqueue); + destroy_workqueue(send_workqueue); } -static int write_and_state_lists_empty(void) +static int work_start(void) { - int status; - - spin_lock_bh(&write_sockets_lock); - status = list_empty(&write_sockets); - spin_unlock_bh(&write_sockets_lock); - - spin_lock_bh(&state_sockets_lock); - if (list_empty(&state_sockets) == 0) - status = 0; - spin_unlock_bh(&state_sockets_lock); - - return status; -} - -/* DLM Transport send daemon */ -static int dlm_sendd(void *data) -{ - init_waitqueue_entry(&lowcomms_send_waitq_head, current); - add_wait_queue(&lowcomms_send_waitq, &lowcomms_send_waitq_head); - - while (!kthread_should_stop()) { - set_current_state(TASK_INTERRUPTIBLE); - if (write_and_state_lists_empty()) - schedule(); - set_current_state(TASK_RUNNING); - - process_state_queue(); - process_output_queue(); - } - - return 0; -} - -static void daemons_stop(void) -{ - kthread_stop(recv_task); - kthread_stop(send_task); -} - -static int daemons_start(void) -{ - struct task_struct *p; int error; - - p = kthread_run(dlm_recvd, NULL, "dlm_recvd"); - error = IS_ERR(p); + recv_workqueue = create_workqueue("dlm_recv"); + error = IS_ERR(recv_workqueue); if (error) { - log_print("can't start dlm_recvd %d", error); + log_print("can't start dlm_recv %d", error); return error; } - recv_task = p; - p = kthread_run(dlm_sendd, NULL, "dlm_sendd"); - error = IS_ERR(p); + send_workqueue = create_singlethread_workqueue("dlm_send"); + error = IS_ERR(send_workqueue); if (error) { - log_print("can't start dlm_sendd %d", error); - kthread_stop(recv_task); + log_print("can't start dlm_send %d", error); + destroy_workqueue(recv_workqueue); return error; } - send_task = p; return 0; } @@ -1086,7 +934,7 @@ void dlm_lowcomms_stop(void) connections[i]->flags |= 0xFF; } - daemons_stop(); + work_stop(); clean_writequeues(); for (i = 0; i < conn_array_size; i++) { @@ -1138,7 +986,7 @@ int dlm_lowcomms_start(void) if (error) goto fail_unlisten; - error = daemons_start(); + error = work_start(); if (error) goto fail_unlisten; |