summaryrefslogtreecommitdiff
path: root/fs/direct-io.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/direct-io.c')
-rw-r--r--fs/direct-io.c323
1 files changed, 144 insertions, 179 deletions
diff --git a/fs/direct-io.c b/fs/direct-io.c
index 5981e17f46f0..d9d0833444f5 100644
--- a/fs/direct-io.c
+++ b/fs/direct-io.c
@@ -27,6 +27,7 @@
#include <linux/slab.h>
#include <linux/highmem.h>
#include <linux/pagemap.h>
+#include <linux/task_io_accounting_ops.h>
#include <linux/bio.h>
#include <linux/wait.h>
#include <linux/err.h>
@@ -121,8 +122,7 @@ struct dio {
/* BIO completion state */
spinlock_t bio_lock; /* protects BIO fields below */
- int bio_count; /* nr bios to be completed */
- int bios_in_flight; /* nr bios in flight */
+ unsigned long refcount; /* direct_io_worker() and bios */
struct bio *bio_list; /* singly linked via bi_private */
struct task_struct *waiter; /* waiting task (NULL if none) */
@@ -209,76 +209,55 @@ static struct page *dio_get_page(struct dio *dio)
return dio->pages[dio->head++];
}
-/*
- * Called when all DIO BIO I/O has been completed - let the filesystem
- * know, if it registered an interest earlier via get_block. Pass the
- * private field of the map buffer_head so that filesystems can use it
- * to hold additional state between get_block calls and dio_complete.
- */
-static void dio_complete(struct dio *dio, loff_t offset, ssize_t bytes)
-{
- if (dio->end_io && dio->result)
- dio->end_io(dio->iocb, offset, bytes, dio->map_bh.b_private);
- if (dio->lock_type == DIO_LOCKING)
- /* lockdep: non-owner release */
- up_read_non_owner(&dio->inode->i_alloc_sem);
-}
-
-/*
- * Called when a BIO has been processed. If the count goes to zero then IO is
- * complete and we can signal this to the AIO layer.
+/**
+ * dio_complete() - called when all DIO BIO I/O has been completed
+ * @offset: the byte offset in the file of the completed operation
+ *
+ * This releases locks as dictated by the locking type, lets interested parties
+ * know that a DIO operation has completed, and calculates the resulting return
+ * code for the operation.
+ *
+ * It lets the filesystem know if it registered an interest earlier via
+ * get_block. Pass the private field of the map buffer_head so that
+ * filesystems can use it to hold additional state between get_block calls and
+ * dio_complete.
*/
-static void finished_one_bio(struct dio *dio)
+static int dio_complete(struct dio *dio, loff_t offset, int ret)
{
- unsigned long flags;
+ ssize_t transferred = 0;
- spin_lock_irqsave(&dio->bio_lock, flags);
- if (dio->bio_count == 1) {
- if (dio->is_async) {
- ssize_t transferred;
- loff_t offset;
-
- /*
- * Last reference to the dio is going away.
- * Drop spinlock and complete the DIO.
- */
- spin_unlock_irqrestore(&dio->bio_lock, flags);
+ /*
+ * AIO submission can race with bio completion to get here while
+ * expecting to have the last io completed by bio completion.
+ * In that case -EIOCBQUEUED is in fact not an error we want
+ * to preserve through this call.
+ */
+ if (ret == -EIOCBQUEUED)
+ ret = 0;
- /* Check for short read case */
- transferred = dio->result;
- offset = dio->iocb->ki_pos;
+ if (dio->result) {
+ transferred = dio->result;
- if ((dio->rw == READ) &&
- ((offset + transferred) > dio->i_size))
- transferred = dio->i_size - offset;
+ /* Check for short read case */
+ if ((dio->rw == READ) && ((offset + transferred) > dio->i_size))
+ transferred = dio->i_size - offset;
+ }
- /* check for error in completion path */
- if (dio->io_error)
- transferred = dio->io_error;
+ if (dio->end_io && dio->result)
+ dio->end_io(dio->iocb, offset, transferred,
+ dio->map_bh.b_private);
+ if (dio->lock_type == DIO_LOCKING)
+ /* lockdep: non-owner release */
+ up_read_non_owner(&dio->inode->i_alloc_sem);
- dio_complete(dio, offset, transferred);
+ if (ret == 0)
+ ret = dio->page_errors;
+ if (ret == 0)
+ ret = dio->io_error;
+ if (ret == 0)
+ ret = transferred;
- /* Complete AIO later if falling back to buffered i/o */
- if (dio->result == dio->size ||
- ((dio->rw == READ) && dio->result)) {
- aio_complete(dio->iocb, transferred, 0);
- kfree(dio);
- return;
- } else {
- /*
- * Falling back to buffered
- */
- spin_lock_irqsave(&dio->bio_lock, flags);
- dio->bio_count--;
- if (dio->waiter)
- wake_up_process(dio->waiter);
- spin_unlock_irqrestore(&dio->bio_lock, flags);
- return;
- }
- }
- }
- dio->bio_count--;
- spin_unlock_irqrestore(&dio->bio_lock, flags);
+ return ret;
}
static int dio_bio_complete(struct dio *dio, struct bio *bio);
@@ -288,12 +267,27 @@ static int dio_bio_complete(struct dio *dio, struct bio *bio);
static int dio_bio_end_aio(struct bio *bio, unsigned int bytes_done, int error)
{
struct dio *dio = bio->bi_private;
+ unsigned long remaining;
+ unsigned long flags;
if (bio->bi_size)
return 1;
/* cleanup the bio */
dio_bio_complete(dio, bio);
+
+ spin_lock_irqsave(&dio->bio_lock, flags);
+ remaining = --dio->refcount;
+ if (remaining == 1 && dio->waiter)
+ wake_up_process(dio->waiter);
+ spin_unlock_irqrestore(&dio->bio_lock, flags);
+
+ if (remaining == 0) {
+ int ret = dio_complete(dio, dio->iocb->ki_pos, 0);
+ aio_complete(dio->iocb, ret, 0);
+ kfree(dio);
+ }
+
return 0;
}
@@ -315,8 +309,7 @@ static int dio_bio_end_io(struct bio *bio, unsigned int bytes_done, int error)
spin_lock_irqsave(&dio->bio_lock, flags);
bio->bi_private = dio->bio_list;
dio->bio_list = bio;
- dio->bios_in_flight--;
- if (dio->waiter && dio->bios_in_flight == 0)
+ if (--dio->refcount == 1 && dio->waiter)
wake_up_process(dio->waiter);
spin_unlock_irqrestore(&dio->bio_lock, flags);
return 0;
@@ -347,6 +340,8 @@ dio_bio_alloc(struct dio *dio, struct block_device *bdev,
* In the AIO read case we speculatively dirty the pages before starting IO.
* During IO completion, any of these pages which happen to have been written
* back will be redirtied by bio_check_pages_dirty().
+ *
+ * bios hold a dio reference between submit_bio and ->end_io.
*/
static void dio_bio_submit(struct dio *dio)
{
@@ -354,12 +349,14 @@ static void dio_bio_submit(struct dio *dio)
unsigned long flags;
bio->bi_private = dio;
+
spin_lock_irqsave(&dio->bio_lock, flags);
- dio->bio_count++;
- dio->bios_in_flight++;
+ dio->refcount++;
spin_unlock_irqrestore(&dio->bio_lock, flags);
+
if (dio->is_async && dio->rw == READ)
bio_set_pages_dirty(bio);
+
submit_bio(dio->rw, bio);
dio->bio = NULL;
@@ -376,28 +373,37 @@ static void dio_cleanup(struct dio *dio)
}
/*
- * Wait for the next BIO to complete. Remove it and return it.
+ * Wait for the next BIO to complete. Remove it and return it. NULL is
+ * returned once all BIOs have been completed. This must only be called once
+ * all bios have been issued so that dio->refcount can only decrease. This
+ * requires that that the caller hold a reference on the dio.
*/
static struct bio *dio_await_one(struct dio *dio)
{
unsigned long flags;
- struct bio *bio;
+ struct bio *bio = NULL;
spin_lock_irqsave(&dio->bio_lock, flags);
- while (dio->bio_list == NULL) {
- set_current_state(TASK_UNINTERRUPTIBLE);
- if (dio->bio_list == NULL) {
- dio->waiter = current;
- spin_unlock_irqrestore(&dio->bio_lock, flags);
- blk_run_address_space(dio->inode->i_mapping);
- io_schedule();
- spin_lock_irqsave(&dio->bio_lock, flags);
- dio->waiter = NULL;
- }
- set_current_state(TASK_RUNNING);
+
+ /*
+ * Wait as long as the list is empty and there are bios in flight. bio
+ * completion drops the count, maybe adds to the list, and wakes while
+ * holding the bio_lock so we don't need set_current_state()'s barrier
+ * and can call it after testing our condition.
+ */
+ while (dio->refcount > 1 && dio->bio_list == NULL) {
+ __set_current_state(TASK_UNINTERRUPTIBLE);
+ dio->waiter = current;
+ spin_unlock_irqrestore(&dio->bio_lock, flags);
+ io_schedule();
+ /* wake up sets us TASK_RUNNING */
+ spin_lock_irqsave(&dio->bio_lock, flags);
+ dio->waiter = NULL;
+ }
+ if (dio->bio_list) {
+ bio = dio->bio_list;
+ dio->bio_list = bio->bi_private;
}
- bio = dio->bio_list;
- dio->bio_list = bio->bi_private;
spin_unlock_irqrestore(&dio->bio_lock, flags);
return bio;
}
@@ -426,34 +432,24 @@ static int dio_bio_complete(struct dio *dio, struct bio *bio)
}
bio_put(bio);
}
- finished_one_bio(dio);
return uptodate ? 0 : -EIO;
}
/*
- * Wait on and process all in-flight BIOs.
+ * Wait on and process all in-flight BIOs. This must only be called once
+ * all bios have been issued so that the refcount can only decrease.
+ * This just waits for all bios to make it through dio_bio_complete. IO
+ * errors are propogated through dio->io_error and should be propogated via
+ * dio_complete().
*/
-static int dio_await_completion(struct dio *dio)
+static void dio_await_completion(struct dio *dio)
{
- int ret = 0;
-
- if (dio->bio)
- dio_bio_submit(dio);
-
- /*
- * The bio_lock is not held for the read of bio_count.
- * This is ok since it is the dio_bio_complete() that changes
- * bio_count.
- */
- while (dio->bio_count) {
- struct bio *bio = dio_await_one(dio);
- int ret2;
-
- ret2 = dio_bio_complete(dio, bio);
- if (ret == 0)
- ret = ret2;
- }
- return ret;
+ struct bio *bio;
+ do {
+ bio = dio_await_one(dio);
+ if (bio)
+ dio_bio_complete(dio, bio);
+ } while (bio);
}
/*
@@ -675,6 +671,13 @@ submit_page_section(struct dio *dio, struct page *page,
{
int ret = 0;
+ if (dio->rw & WRITE) {
+ /*
+ * Read accounting is performed in submit_bio()
+ */
+ task_io_account_write(len);
+ }
+
/*
* Can we just grow the current page's presence in the dio?
*/
@@ -953,6 +956,7 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode,
struct dio *dio)
{
unsigned long user_addr;
+ unsigned long flags;
int seg;
ssize_t ret = 0;
ssize_t ret2;
@@ -983,17 +987,8 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode,
dio->iocb = iocb;
dio->i_size = i_size_read(inode);
- /*
- * BIO completion state.
- *
- * ->bio_count starts out at one, and we decrement it to zero after all
- * BIOs are submitted. This to avoid the situation where a really fast
- * (or synchronous) device could take the count to zero while we're
- * still submitting BIOs.
- */
- dio->bio_count = 1;
- dio->bios_in_flight = 0;
spin_lock_init(&dio->bio_lock);
+ dio->refcount = 1;
dio->bio_list = NULL;
dio->waiter = NULL;
@@ -1069,6 +1064,9 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode,
if (dio->bio)
dio_bio_submit(dio);
+ /* All IO is now issued, send it on its way */
+ blk_run_address_space(inode->i_mapping);
+
/*
* It is possible that, we return short IO due to end of file.
* In that case, we need to release all the pages we got hold on.
@@ -1084,74 +1082,41 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode,
mutex_unlock(&dio->inode->i_mutex);
/*
- * OK, all BIOs are submitted, so we can decrement bio_count to truly
- * reflect the number of to-be-processed BIOs.
+ * The only time we want to leave bios in flight is when a successful
+ * partial aio read or full aio write have been setup. In that case
+ * bio completion will call aio_complete. The only time it's safe to
+ * call aio_complete is when we return -EIOCBQUEUED, so we key on that.
+ * This had *better* be the only place that raises -EIOCBQUEUED.
*/
- if (dio->is_async) {
- int should_wait = 0;
+ BUG_ON(ret == -EIOCBQUEUED);
+ if (dio->is_async && ret == 0 && dio->result &&
+ ((rw & READ) || (dio->result == dio->size)))
+ ret = -EIOCBQUEUED;
- if (dio->result < dio->size && (rw & WRITE)) {
- dio->waiter = current;
- should_wait = 1;
- }
- if (ret == 0)
- ret = dio->result;
- finished_one_bio(dio); /* This can free the dio */
- blk_run_address_space(inode->i_mapping);
- if (should_wait) {
- unsigned long flags;
- /*
- * Wait for already issued I/O to drain out and
- * release its references to user-space pages
- * before returning to fallback on buffered I/O
- */
-
- spin_lock_irqsave(&dio->bio_lock, flags);
- set_current_state(TASK_UNINTERRUPTIBLE);
- while (dio->bio_count) {
- spin_unlock_irqrestore(&dio->bio_lock, flags);
- io_schedule();
- spin_lock_irqsave(&dio->bio_lock, flags);
- set_current_state(TASK_UNINTERRUPTIBLE);
- }
- spin_unlock_irqrestore(&dio->bio_lock, flags);
- set_current_state(TASK_RUNNING);
- kfree(dio);
- }
- } else {
- ssize_t transferred = 0;
-
- finished_one_bio(dio);
- ret2 = dio_await_completion(dio);
- if (ret == 0)
- ret = ret2;
- if (ret == 0)
- ret = dio->page_errors;
- if (dio->result) {
- loff_t i_size = i_size_read(inode);
-
- transferred = dio->result;
- /*
- * Adjust the return value if the read crossed a
- * non-block-aligned EOF.
- */
- if (rw == READ && (offset + transferred > i_size))
- transferred = i_size - offset;
- }
- dio_complete(dio, offset, transferred);
- if (ret == 0)
- ret = transferred;
+ if (ret != -EIOCBQUEUED)
+ dio_await_completion(dio);
- /* We could have also come here on an AIO file extend */
- if (!is_sync_kiocb(iocb) && (rw & WRITE) &&
- ret >= 0 && dio->result == dio->size)
- /*
- * For AIO writes where we have completed the
- * i/o, we have to mark the the aio complete.
- */
- aio_complete(iocb, ret, 0);
+ /*
+ * Sync will always be dropping the final ref and completing the
+ * operation. AIO can if it was a broken operation described above or
+ * in fact if all the bios race to complete before we get here. In
+ * that case dio_complete() translates the EIOCBQUEUED into the proper
+ * return code that the caller will hand to aio_complete().
+ *
+ * This is managed by the bio_lock instead of being an atomic_t so that
+ * completion paths can drop their ref and use the remaining count to
+ * decide to wake the submission path atomically.
+ */
+ spin_lock_irqsave(&dio->bio_lock, flags);
+ ret2 = --dio->refcount;
+ spin_unlock_irqrestore(&dio->bio_lock, flags);
+ BUG_ON(!dio->is_async && ret2 != 0);
+ if (ret2 == 0) {
+ ret = dio_complete(dio, offset, ret);
kfree(dio);
- }
+ } else
+ BUG_ON(ret != -EIOCBQUEUED);
+
return ret;
}