summaryrefslogtreecommitdiff
path: root/drivers/block/drbd
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2014-08-14 09:10:21 -0600
committerLinus Torvalds <torvalds@linux-foundation.org>2014-08-14 09:10:21 -0600
commitd429a3639ca967ce2f35e3e8d4e70caec7149ded (patch)
treecad1e5602551b6a744f63ef062de2c2e21cfe39a /drivers/block/drbd
parent4a319a490ca59a746b3d36768c0e29ee19832366 (diff)
parent99d540018caa920b7a54e2d3048f1dff530b294b (diff)
Merge branch 'for-3.17/drivers' of git://git.kernel.dk/linux-block
Pull block driver changes from Jens Axboe: "Nothing out of the ordinary here, this pull request contains: - A big round of fixes for bcache from Kent Overstreet, Slava Pestov, and Surbhi Palande. No new features, just a lot of fixes. - The usual round of drbd updates from Andreas Gruenbacher, Lars Ellenberg, and Philipp Reisner. - virtio_blk was converted to blk-mq back in 3.13, but now Ming Lei has taken it one step further and added support for actually using more than one queue. - Addition of an explicit SG_FLAG_Q_AT_HEAD for block/bsg, to compliment the the default behavior of adding to the tail of the queue. From Douglas Gilbert" * 'for-3.17/drivers' of git://git.kernel.dk/linux-block: (86 commits) bcache: Drop unneeded blk_sync_queue() calls bcache: add mutex lock for bch_is_open bcache: Correct printing of btree_gc_max_duration_ms bcache: try to set b->parent properly bcache: fix memory corruption in init error path bcache: fix crash with incomplete cache set bcache: Fix more early shutdown bugs bcache: fix use-after-free in btree_gc_coalesce() bcache: Fix an infinite loop in journal replay bcache: fix crash in bcache_btree_node_alloc_fail tracepoint bcache: bcache_write tracepoint was crashing bcache: fix typo in bch_bkey_equal_header bcache: Allocate bounce buffers with GFP_NOWAIT bcache: Make sure to pass GFP_WAIT to mempool_alloc() bcache: fix uninterruptible sleep in writeback thread bcache: wait for buckets when allocating new btree root bcache: fix crash on shutdown in passthrough mode bcache: fix lockdep warnings on shutdown bcache allocator: send discards with correct size bcache: Fix to remove the rcu_sched stalls. ...
Diffstat (limited to 'drivers/block/drbd')
-rw-r--r--drivers/block/drbd/Makefile1
-rw-r--r--drivers/block/drbd/drbd_actlog.c518
-rw-r--r--drivers/block/drbd/drbd_bitmap.c150
-rw-r--r--drivers/block/drbd/drbd_debugfs.c958
-rw-r--r--drivers/block/drbd/drbd_debugfs.h39
-rw-r--r--drivers/block/drbd/drbd_int.h383
-rw-r--r--drivers/block/drbd/drbd_interval.h4
-rw-r--r--drivers/block/drbd/drbd_main.c302
-rw-r--r--drivers/block/drbd/drbd_nl.c110
-rw-r--r--drivers/block/drbd/drbd_proc.c125
-rw-r--r--drivers/block/drbd/drbd_receiver.c316
-rw-r--r--drivers/block/drbd/drbd_req.c527
-rw-r--r--drivers/block/drbd/drbd_req.h1
-rw-r--r--drivers/block/drbd/drbd_state.c90
-rw-r--r--drivers/block/drbd/drbd_worker.c348
15 files changed, 2674 insertions, 1198 deletions
diff --git a/drivers/block/drbd/Makefile b/drivers/block/drbd/Makefile
index 8b450338075e..4464e353c1e8 100644
--- a/drivers/block/drbd/Makefile
+++ b/drivers/block/drbd/Makefile
@@ -3,5 +3,6 @@ drbd-y += drbd_worker.o drbd_receiver.o drbd_req.o drbd_actlog.o
drbd-y += drbd_main.o drbd_strings.o drbd_nl.o
drbd-y += drbd_interval.o drbd_state.o
drbd-y += drbd_nla.o
+drbd-$(CONFIG_DEBUG_FS) += drbd_debugfs.o
obj-$(CONFIG_BLK_DEV_DRBD) += drbd.o
diff --git a/drivers/block/drbd/drbd_actlog.c b/drivers/block/drbd/drbd_actlog.c
index 05a1780ffa85..d26a3fa63688 100644
--- a/drivers/block/drbd/drbd_actlog.c
+++ b/drivers/block/drbd/drbd_actlog.c
@@ -92,34 +92,26 @@ struct __packed al_transaction_on_disk {
__be32 context[AL_CONTEXT_PER_TRANSACTION];
};
-struct update_odbm_work {
- struct drbd_work w;
- struct drbd_device *device;
- unsigned int enr;
-};
-
-struct update_al_work {
- struct drbd_work w;
- struct drbd_device *device;
- struct completion event;
- int err;
-};
-
-
-void *drbd_md_get_buffer(struct drbd_device *device)
+void *drbd_md_get_buffer(struct drbd_device *device, const char *intent)
{
int r;
wait_event(device->misc_wait,
- (r = atomic_cmpxchg(&device->md_io_in_use, 0, 1)) == 0 ||
+ (r = atomic_cmpxchg(&device->md_io.in_use, 0, 1)) == 0 ||
device->state.disk <= D_FAILED);
- return r ? NULL : page_address(device->md_io_page);
+ if (r)
+ return NULL;
+
+ device->md_io.current_use = intent;
+ device->md_io.start_jif = jiffies;
+ device->md_io.submit_jif = device->md_io.start_jif - 1;
+ return page_address(device->md_io.page);
}
void drbd_md_put_buffer(struct drbd_device *device)
{
- if (atomic_dec_and_test(&device->md_io_in_use))
+ if (atomic_dec_and_test(&device->md_io.in_use))
wake_up(&device->misc_wait);
}
@@ -145,10 +137,11 @@ void wait_until_done_or_force_detached(struct drbd_device *device, struct drbd_b
static int _drbd_md_sync_page_io(struct drbd_device *device,
struct drbd_backing_dev *bdev,
- struct page *page, sector_t sector,
- int rw, int size)
+ sector_t sector, int rw)
{
struct bio *bio;
+ /* we do all our meta data IO in aligned 4k blocks. */
+ const int size = 4096;
int err;
device->md_io.done = 0;
@@ -156,15 +149,15 @@ static int _drbd_md_sync_page_io(struct drbd_device *device,
if ((rw & WRITE) && !test_bit(MD_NO_FUA, &device->flags))
rw |= REQ_FUA | REQ_FLUSH;
- rw |= REQ_SYNC;
+ rw |= REQ_SYNC | REQ_NOIDLE;
bio = bio_alloc_drbd(GFP_NOIO);
bio->bi_bdev = bdev->md_bdev;
bio->bi_iter.bi_sector = sector;
err = -EIO;
- if (bio_add_page(bio, page, size, 0) != size)
+ if (bio_add_page(bio, device->md_io.page, size, 0) != size)
goto out;
- bio->bi_private = &device->md_io;
+ bio->bi_private = device;
bio->bi_end_io = drbd_md_io_complete;
bio->bi_rw = rw;
@@ -179,7 +172,8 @@ static int _drbd_md_sync_page_io(struct drbd_device *device,
}
bio_get(bio); /* one bio_put() is in the completion handler */
- atomic_inc(&device->md_io_in_use); /* drbd_md_put_buffer() is in the completion handler */
+ atomic_inc(&device->md_io.in_use); /* drbd_md_put_buffer() is in the completion handler */
+ device->md_io.submit_jif = jiffies;
if (drbd_insert_fault(device, (rw & WRITE) ? DRBD_FAULT_MD_WR : DRBD_FAULT_MD_RD))
bio_endio(bio, -EIO);
else
@@ -197,9 +191,7 @@ int drbd_md_sync_page_io(struct drbd_device *device, struct drbd_backing_dev *bd
sector_t sector, int rw)
{
int err;
- struct page *iop = device->md_io_page;
-
- D_ASSERT(device, atomic_read(&device->md_io_in_use) == 1);
+ D_ASSERT(device, atomic_read(&device->md_io.in_use) == 1);
BUG_ON(!bdev->md_bdev);
@@ -214,8 +206,7 @@ int drbd_md_sync_page_io(struct drbd_device *device, struct drbd_backing_dev *bd
current->comm, current->pid, __func__,
(unsigned long long)sector, (rw & WRITE) ? "WRITE" : "READ");
- /* we do all our meta data IO in aligned 4k blocks. */
- err = _drbd_md_sync_page_io(device, bdev, iop, sector, rw, 4096);
+ err = _drbd_md_sync_page_io(device, bdev, sector, rw);
if (err) {
drbd_err(device, "drbd_md_sync_page_io(,%llus,%s) failed with error %d\n",
(unsigned long long)sector, (rw & WRITE) ? "WRITE" : "READ", err);
@@ -297,26 +288,12 @@ bool drbd_al_begin_io_prepare(struct drbd_device *device, struct drbd_interval *
return need_transaction;
}
-static int al_write_transaction(struct drbd_device *device, bool delegate);
-
-/* When called through generic_make_request(), we must delegate
- * activity log I/O to the worker thread: a further request
- * submitted via generic_make_request() within the same task
- * would be queued on current->bio_list, and would only start
- * after this function returns (see generic_make_request()).
- *
- * However, if we *are* the worker, we must not delegate to ourselves.
- */
+static int al_write_transaction(struct drbd_device *device);
-/*
- * @delegate: delegate activity log I/O to the worker thread
- */
-void drbd_al_begin_io_commit(struct drbd_device *device, bool delegate)
+void drbd_al_begin_io_commit(struct drbd_device *device)
{
bool locked = false;
- BUG_ON(delegate && current == first_peer_device(device)->connection->worker.task);
-
/* Serialize multiple transactions.
* This uses test_and_set_bit, memory barrier is implicit.
*/
@@ -335,7 +312,7 @@ void drbd_al_begin_io_commit(struct drbd_device *device, bool delegate)
rcu_read_unlock();
if (write_al_updates)
- al_write_transaction(device, delegate);
+ al_write_transaction(device);
spin_lock_irq(&device->al_lock);
/* FIXME
if (err)
@@ -352,12 +329,10 @@ void drbd_al_begin_io_commit(struct drbd_device *device, bool delegate)
/*
* @delegate: delegate activity log I/O to the worker thread
*/
-void drbd_al_begin_io(struct drbd_device *device, struct drbd_interval *i, bool delegate)
+void drbd_al_begin_io(struct drbd_device *device, struct drbd_interval *i)
{
- BUG_ON(delegate && current == first_peer_device(device)->connection->worker.task);
-
if (drbd_al_begin_io_prepare(device, i))
- drbd_al_begin_io_commit(device, delegate);
+ drbd_al_begin_io_commit(device);
}
int drbd_al_begin_io_nonblock(struct drbd_device *device, struct drbd_interval *i)
@@ -380,8 +355,19 @@ int drbd_al_begin_io_nonblock(struct drbd_device *device, struct drbd_interval *
/* We want all necessary updates for a given request within the same transaction
* We could first check how many updates are *actually* needed,
* and use that instead of the worst-case nr_al_extents */
- if (available_update_slots < nr_al_extents)
- return -EWOULDBLOCK;
+ if (available_update_slots < nr_al_extents) {
+ /* Too many activity log extents are currently "hot".
+ *
+ * If we have accumulated pending changes already,
+ * we made progress.
+ *
+ * If we cannot get even a single pending change through,
+ * stop the fast path until we made some progress,
+ * or requests to "cold" extents could be starved. */
+ if (!al->pending_changes)
+ __set_bit(__LC_STARVING, &device->act_log->flags);
+ return -ENOBUFS;
+ }
/* Is resync active in this area? */
for (enr = first; enr <= last; enr++) {
@@ -452,15 +438,6 @@ static unsigned int al_extent_to_bm_page(unsigned int al_enr)
(AL_EXTENT_SHIFT - BM_BLOCK_SHIFT));
}
-static unsigned int rs_extent_to_bm_page(unsigned int rs_enr)
-{
- return rs_enr >>
- /* bit to page */
- ((PAGE_SHIFT + 3) -
- /* resync extent number to bit */
- (BM_EXT_SHIFT - BM_BLOCK_SHIFT));
-}
-
static sector_t al_tr_number_to_on_disk_sector(struct drbd_device *device)
{
const unsigned int stripes = device->ldev->md.al_stripes;
@@ -479,8 +456,7 @@ static sector_t al_tr_number_to_on_disk_sector(struct drbd_device *device)
return device->ldev->md.md_offset + device->ldev->md.al_offset + t;
}
-static int
-_al_write_transaction(struct drbd_device *device)
+int al_write_transaction(struct drbd_device *device)
{
struct al_transaction_on_disk *buffer;
struct lc_element *e;
@@ -505,7 +481,8 @@ _al_write_transaction(struct drbd_device *device)
return -EIO;
}
- buffer = drbd_md_get_buffer(device); /* protects md_io_buffer, al_tr_cycle, ... */
+ /* protects md_io_buffer, al_tr_cycle, ... */
+ buffer = drbd_md_get_buffer(device, __func__);
if (!buffer) {
drbd_err(device, "disk failed while waiting for md_io buffer\n");
put_ldev(device);
@@ -590,38 +567,6 @@ _al_write_transaction(struct drbd_device *device)
return err;
}
-
-static int w_al_write_transaction(struct drbd_work *w, int unused)
-{
- struct update_al_work *aw = container_of(w, struct update_al_work, w);
- struct drbd_device *device = aw->device;
- int err;
-
- err = _al_write_transaction(device);
- aw->err = err;
- complete(&aw->event);
-
- return err != -EIO ? err : 0;
-}
-
-/* Calls from worker context (see w_restart_disk_io()) need to write the
- transaction directly. Others came through generic_make_request(),
- those need to delegate it to the worker. */
-static int al_write_transaction(struct drbd_device *device, bool delegate)
-{
- if (delegate) {
- struct update_al_work al_work;
- init_completion(&al_work.event);
- al_work.w.cb = w_al_write_transaction;
- al_work.device = device;
- drbd_queue_work_front(&first_peer_device(device)->connection->sender_work,
- &al_work.w);
- wait_for_completion(&al_work.event);
- return al_work.err;
- } else
- return _al_write_transaction(device);
-}
-
static int _try_lc_del(struct drbd_device *device, struct lc_element *al_ext)
{
int rv;
@@ -682,72 +627,56 @@ int drbd_initialize_al(struct drbd_device *device, void *buffer)
return 0;
}
-static int w_update_odbm(struct drbd_work *w, int unused)
-{
- struct update_odbm_work *udw = container_of(w, struct update_odbm_work, w);
- struct drbd_device *device = udw->device;
- struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
-
- if (!get_ldev(device)) {
- if (__ratelimit(&drbd_ratelimit_state))
- drbd_warn(device, "Can not update on disk bitmap, local IO disabled.\n");
- kfree(udw);
- return 0;
- }
-
- drbd_bm_write_page(device, rs_extent_to_bm_page(udw->enr));
- put_ldev(device);
-
- kfree(udw);
-
- if (drbd_bm_total_weight(device) <= device->rs_failed) {
- switch (device->state.conn) {
- case C_SYNC_SOURCE: case C_SYNC_TARGET:
- case C_PAUSED_SYNC_S: case C_PAUSED_SYNC_T:
- drbd_resync_finished(device);
- default:
- /* nothing to do */
- break;
- }
- }
- drbd_bcast_event(device, &sib);
-
- return 0;
-}
-
+static const char *drbd_change_sync_fname[] = {
+ [RECORD_RS_FAILED] = "drbd_rs_failed_io",
+ [SET_IN_SYNC] = "drbd_set_in_sync",
+ [SET_OUT_OF_SYNC] = "drbd_set_out_of_sync"
+};
/* ATTENTION. The AL's extents are 4MB each, while the extents in the
* resync LRU-cache are 16MB each.
* The caller of this function has to hold an get_ldev() reference.
*
+ * Adjusts the caching members ->rs_left (success) or ->rs_failed (!success),
+ * potentially pulling in (and recounting the corresponding bits)
+ * this resync extent into the resync extent lru cache.
+ *
+ * Returns whether all bits have been cleared for this resync extent,
+ * precisely: (rs_left <= rs_failed)
+ *
* TODO will be obsoleted once we have a caching lru of the on disk bitmap
*/
-static void drbd_try_clear_on_disk_bm(struct drbd_device *device, sector_t sector,
- int count, int success)
+static bool update_rs_extent(struct drbd_device *device,
+ unsigned int enr, int count,
+ enum update_sync_bits_mode mode)
{
struct lc_element *e;
- struct update_odbm_work *udw;
-
- unsigned int enr;
D_ASSERT(device, atomic_read(&device->local_cnt));
- /* I simply assume that a sector/size pair never crosses
- * a 16 MB extent border. (Currently this is true...) */
- enr = BM_SECT_TO_EXT(sector);
-
- e = lc_get(device->resync, enr);
+ /* When setting out-of-sync bits,
+ * we don't need it cached (lc_find).
+ * But if it is present in the cache,
+ * we should update the cached bit count.
+ * Otherwise, that extent should be in the resync extent lru cache
+ * already -- or we want to pull it in if necessary -- (lc_get),
+ * then update and check rs_left and rs_failed. */
+ if (mode == SET_OUT_OF_SYNC)
+ e = lc_find(device->resync, enr);
+ else
+ e = lc_get(device->resync, enr);
if (e) {
struct bm_extent *ext = lc_entry(e, struct bm_extent, lce);
if (ext->lce.lc_number == enr) {
- if (success)
+ if (mode == SET_IN_SYNC)
ext->rs_left -= count;
+ else if (mode == SET_OUT_OF_SYNC)
+ ext->rs_left += count;
else
ext->rs_failed += count;
if (ext->rs_left < ext->rs_failed) {
- drbd_warn(device, "BAD! sector=%llus enr=%u rs_left=%d "
+ drbd_warn(device, "BAD! enr=%u rs_left=%d "
"rs_failed=%d count=%d cstate=%s\n",
- (unsigned long long)sector,
ext->lce.lc_number, ext->rs_left,
ext->rs_failed, count,
drbd_conn_str(device->state.conn));
@@ -781,34 +710,27 @@ static void drbd_try_clear_on_disk_bm(struct drbd_device *device, sector_t secto
ext->lce.lc_number, ext->rs_failed);
}
ext->rs_left = rs_left;
- ext->rs_failed = success ? 0 : count;
+ ext->rs_failed = (mode == RECORD_RS_FAILED) ? count : 0;
/* we don't keep a persistent log of the resync lru,
* we can commit any change right away. */
lc_committed(device->resync);
}
- lc_put(device->resync, &ext->lce);
+ if (mode != SET_OUT_OF_SYNC)
+ lc_put(device->resync, &ext->lce);
/* no race, we are within the al_lock! */
- if (ext->rs_left == ext->rs_failed) {
+ if (ext->rs_left <= ext->rs_failed) {
ext->rs_failed = 0;
-
- udw = kmalloc(sizeof(*udw), GFP_ATOMIC);
- if (udw) {
- udw->enr = ext->lce.lc_number;
- udw->w.cb = w_update_odbm;
- udw->device = device;
- drbd_queue_work_front(&first_peer_device(device)->connection->sender_work,
- &udw->w);
- } else {
- drbd_warn(device, "Could not kmalloc an udw\n");
- }
+ return true;
}
- } else {
+ } else if (mode != SET_OUT_OF_SYNC) {
+ /* be quiet if lc_find() did not find it. */
drbd_err(device, "lc_get() failed! locked=%d/%d flags=%lu\n",
device->resync_locked,
device->resync->nr_elements,
device->resync->flags);
}
+ return false;
}
void drbd_advance_rs_marks(struct drbd_device *device, unsigned long still_to_go)
@@ -827,105 +749,105 @@ void drbd_advance_rs_marks(struct drbd_device *device, unsigned long still_to_go
}
}
-/* clear the bit corresponding to the piece of storage in question:
- * size byte of data starting from sector. Only clear a bits of the affected
- * one ore more _aligned_ BM_BLOCK_SIZE blocks.
- *
- * called by worker on C_SYNC_TARGET and receiver on SyncSource.
- *
- */
-void __drbd_set_in_sync(struct drbd_device *device, sector_t sector, int size,
- const char *file, const unsigned int line)
+/* It is called lazy update, so don't do write-out too often. */
+static bool lazy_bitmap_update_due(struct drbd_device *device)
{
- /* Is called from worker and receiver context _only_ */
- unsigned long sbnr, ebnr, lbnr;
- unsigned long count = 0;
- sector_t esector, nr_sectors;
- int wake_up = 0;
- unsigned long flags;
+ return time_after(jiffies, device->rs_last_bcast + 2*HZ);
+}
- if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_DISCARD_SIZE) {
- drbd_err(device, "drbd_set_in_sync: sector=%llus size=%d nonsense!\n",
- (unsigned long long)sector, size);
+static void maybe_schedule_on_disk_bitmap_update(struct drbd_device *device, bool rs_done)
+{
+ if (rs_done)
+ set_bit(RS_DONE, &device->flags);
+ /* and also set RS_PROGRESS below */
+ else if (!lazy_bitmap_update_due(device))
return;
- }
-
- if (!get_ldev(device))
- return; /* no disk, no metadata, no bitmap to clear bits in */
-
- nr_sectors = drbd_get_capacity(device->this_bdev);
- esector = sector + (size >> 9) - 1;
-
- if (!expect(sector < nr_sectors))
- goto out;
- if (!expect(esector < nr_sectors))
- esector = nr_sectors - 1;
-
- lbnr = BM_SECT_TO_BIT(nr_sectors-1);
-
- /* we clear it (in sync).
- * round up start sector, round down end sector. we make sure we only
- * clear full, aligned, BM_BLOCK_SIZE (4K) blocks */
- if (unlikely(esector < BM_SECT_PER_BIT-1))
- goto out;
- if (unlikely(esector == (nr_sectors-1)))
- ebnr = lbnr;
- else
- ebnr = BM_SECT_TO_BIT(esector - (BM_SECT_PER_BIT-1));
- sbnr = BM_SECT_TO_BIT(sector + BM_SECT_PER_BIT-1);
- if (sbnr > ebnr)
- goto out;
+ drbd_device_post_work(device, RS_PROGRESS);
+}
+static int update_sync_bits(struct drbd_device *device,
+ unsigned long sbnr, unsigned long ebnr,
+ enum update_sync_bits_mode mode)
+{
/*
- * ok, (capacity & 7) != 0 sometimes, but who cares...
- * we count rs_{total,left} in bits, not sectors.
+ * We keep a count of set bits per resync-extent in the ->rs_left
+ * caching member, so we need to loop and work within the resync extent
+ * alignment. Typically this loop will execute exactly once.
*/
- count = drbd_bm_clear_bits(device, sbnr, ebnr);
- if (count) {
- drbd_advance_rs_marks(device, drbd_bm_total_weight(device));
- spin_lock_irqsave(&device->al_lock, flags);
- drbd_try_clear_on_disk_bm(device, sector, count, true);
- spin_unlock_irqrestore(&device->al_lock, flags);
-
- /* just wake_up unconditional now, various lc_chaged(),
- * lc_put() in drbd_try_clear_on_disk_bm(). */
- wake_up = 1;
+ unsigned long flags;
+ unsigned long count = 0;
+ unsigned int cleared = 0;
+ while (sbnr <= ebnr) {
+ /* set temporary boundary bit number to last bit number within
+ * the resync extent of the current start bit number,
+ * but cap at provided end bit number */
+ unsigned long tbnr = min(ebnr, sbnr | BM_BLOCKS_PER_BM_EXT_MASK);
+ unsigned long c;
+
+ if (mode == RECORD_RS_FAILED)
+ /* Only called from drbd_rs_failed_io(), bits
+ * supposedly still set. Recount, maybe some
+ * of the bits have been successfully cleared
+ * by application IO meanwhile.
+ */
+ c = drbd_bm_count_bits(device, sbnr, tbnr);
+ else if (mode == SET_IN_SYNC)
+ c = drbd_bm_clear_bits(device, sbnr, tbnr);
+ else /* if (mode == SET_OUT_OF_SYNC) */
+ c = drbd_bm_set_bits(device, sbnr, tbnr);
+
+ if (c) {
+ spin_lock_irqsave(&device->al_lock, flags);
+ cleared += update_rs_extent(device, BM_BIT_TO_EXT(sbnr), c, mode);
+ spin_unlock_irqrestore(&device->al_lock, flags);
+ count += c;
+ }
+ sbnr = tbnr + 1;
}
-out:
- put_ldev(device);
- if (wake_up)
+ if (count) {
+ if (mode == SET_IN_SYNC) {
+ unsigned long still_to_go = drbd_bm_total_weight(device);
+ bool rs_is_done = (still_to_go <= device->rs_failed);
+ drbd_advance_rs_marks(device, still_to_go);
+ if (cleared || rs_is_done)
+ maybe_schedule_on_disk_bitmap_update(device, rs_is_done);
+ } else if (mode == RECORD_RS_FAILED)
+ device->rs_failed += count;
wake_up(&device->al_wait);
+ }
+ return count;
}
-/*
- * this is intended to set one request worth of data out of sync.
- * affects at least 1 bit,
- * and at most 1+DRBD_MAX_BIO_SIZE/BM_BLOCK_SIZE bits.
+/* clear the bit corresponding to the piece of storage in question:
+ * size byte of data starting from sector. Only clear a bits of the affected
+ * one ore more _aligned_ BM_BLOCK_SIZE blocks.
+ *
+ * called by worker on C_SYNC_TARGET and receiver on SyncSource.
*
- * called by tl_clear and drbd_send_dblock (==drbd_make_request).
- * so this can be _any_ process.
*/
-int __drbd_set_out_of_sync(struct drbd_device *device, sector_t sector, int size,
- const char *file, const unsigned int line)
+int __drbd_change_sync(struct drbd_device *device, sector_t sector, int size,
+ enum update_sync_bits_mode mode,
+ const char *file, const unsigned int line)
{
- unsigned long sbnr, ebnr, flags;
+ /* Is called from worker and receiver context _only_ */
+ unsigned long sbnr, ebnr, lbnr;
+ unsigned long count = 0;
sector_t esector, nr_sectors;
- unsigned int enr, count = 0;
- struct lc_element *e;
- /* this should be an empty REQ_FLUSH */
- if (size == 0)
+ /* This would be an empty REQ_FLUSH, be silent. */
+ if ((mode == SET_OUT_OF_SYNC) && size == 0)
return 0;
- if (size < 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_DISCARD_SIZE) {
- drbd_err(device, "sector: %llus, size: %d\n",
- (unsigned long long)sector, size);
+ if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_DISCARD_SIZE) {
+ drbd_err(device, "%s: sector=%llus size=%d nonsense!\n",
+ drbd_change_sync_fname[mode],
+ (unsigned long long)sector, size);
return 0;
}
if (!get_ldev(device))
- return 0; /* no disk, no metadata, no bitmap to set bits in */
+ return 0; /* no disk, no metadata, no bitmap to manipulate bits in */
nr_sectors = drbd_get_capacity(device->this_bdev);
esector = sector + (size >> 9) - 1;
@@ -935,25 +857,28 @@ int __drbd_set_out_of_sync(struct drbd_device *device, sector_t sector, int size
if (!expect(esector < nr_sectors))
esector = nr_sectors - 1;
- /* we set it out of sync,
- * we do not need to round anything here */
- sbnr = BM_SECT_TO_BIT(sector);
- ebnr = BM_SECT_TO_BIT(esector);
-
- /* ok, (capacity & 7) != 0 sometimes, but who cares...
- * we count rs_{total,left} in bits, not sectors. */
- spin_lock_irqsave(&device->al_lock, flags);
- count = drbd_bm_set_bits(device, sbnr, ebnr);
+ lbnr = BM_SECT_TO_BIT(nr_sectors-1);
- enr = BM_SECT_TO_EXT(sector);
- e = lc_find(device->resync, enr);
- if (e)
- lc_entry(e, struct bm_extent, lce)->rs_left += count;
- spin_unlock_irqrestore(&device->al_lock, flags);
+ if (mode == SET_IN_SYNC) {
+ /* Round up start sector, round down end sector. We make sure
+ * we only clear full, aligned, BM_BLOCK_SIZE blocks. */
+ if (unlikely(esector < BM_SECT_PER_BIT-1))
+ goto out;
+ if (unlikely(esector == (nr_sectors-1)))
+ ebnr = lbnr;
+ else
+ ebnr = BM_SECT_TO_BIT(esector - (BM_SECT_PER_BIT-1));
+ sbnr = BM_SECT_TO_BIT(sector + BM_SECT_PER_BIT-1);
+ } else {
+ /* We set it out of sync, or record resync failure.
+ * Should not round anything here. */
+ sbnr = BM_SECT_TO_BIT(sector);
+ ebnr = BM_SECT_TO_BIT(esector);
+ }
+ count = update_sync_bits(device, sbnr, ebnr, mode);
out:
put_ldev(device);
-
return count;
}
@@ -1075,6 +1000,15 @@ int drbd_try_rs_begin_io(struct drbd_device *device, sector_t sector)
struct lc_element *e;
struct bm_extent *bm_ext;
int i;
+ bool throttle = drbd_rs_should_slow_down(device, sector, true);
+
+ /* If we need to throttle, a half-locked (only marked BME_NO_WRITES,
+ * not yet BME_LOCKED) extent needs to be kicked out explicitly if we
+ * need to throttle. There is at most one such half-locked extent,
+ * which is remembered in resync_wenr. */
+
+ if (throttle && device->resync_wenr != enr)
+ return -EAGAIN;
spin_lock_irq(&device->al_lock);
if (device->resync_wenr != LC_FREE && device->resync_wenr != enr) {
@@ -1098,8 +1032,10 @@ int drbd_try_rs_begin_io(struct drbd_device *device, sector_t sector)
D_ASSERT(device, test_bit(BME_NO_WRITES, &bm_ext->flags));
clear_bit(BME_NO_WRITES, &bm_ext->flags);
device->resync_wenr = LC_FREE;
- if (lc_put(device->resync, &bm_ext->lce) == 0)
+ if (lc_put(device->resync, &bm_ext->lce) == 0) {
+ bm_ext->flags = 0;
device->resync_locked--;
+ }
wake_up(&device->al_wait);
} else {
drbd_alert(device, "LOGIC BUG\n");
@@ -1161,8 +1097,20 @@ proceed:
return 0;
try_again:
- if (bm_ext)
- device->resync_wenr = enr;
+ if (bm_ext) {
+ if (throttle) {
+ D_ASSERT(device, !test_bit(BME_LOCKED, &bm_ext->flags));
+ D_ASSERT(device, test_bit(BME_NO_WRITES, &bm_ext->flags));
+ clear_bit(BME_NO_WRITES, &bm_ext->flags);
+ device->resync_wenr = LC_FREE;
+ if (lc_put(device->resync, &bm_ext->lce) == 0) {
+ bm_ext->flags = 0;
+ device->resync_locked--;
+ }
+ wake_up(&device->al_wait);
+ } else
+ device->resync_wenr = enr;
+ }
spin_unlock_irq(&device->al_lock);
return -EAGAIN;
}
@@ -1270,69 +1218,3 @@ int drbd_rs_del_all(struct drbd_device *device)
return 0;
}
-
-/**
- * drbd_rs_failed_io() - Record information on a failure to resync the specified blocks
- * @device: DRBD device.
- * @sector: The sector number.
- * @size: Size of failed IO operation, in byte.
- */
-void drbd_rs_failed_io(struct drbd_device *device, sector_t sector, int size)
-{
- /* Is called from worker and receiver context _only_ */
- unsigned long sbnr, ebnr, lbnr;
- unsigned long count;
- sector_t esector, nr_sectors;
- int wake_up = 0;
-
- if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_DISCARD_SIZE) {
- drbd_err(device, "drbd_rs_failed_io: sector=%llus size=%d nonsense!\n",
- (unsigned long long)sector, size);
- return;
- }
- nr_sectors = drbd_get_capacity(device->this_bdev);
- esector = sector + (size >> 9) - 1;
-
- if (!expect(sector < nr_sectors))
- return;
- if (!expect(esector < nr_sectors))
- esector = nr_sectors - 1;
-
- lbnr = BM_SECT_TO_BIT(nr_sectors-1);
-
- /*
- * round up start sector, round down end sector. we make sure we only
- * handle full, aligned, BM_BLOCK_SIZE (4K) blocks */
- if (unlikely(esector < BM_SECT_PER_BIT-1))
- return;
- if (unlikely(esector == (nr_sectors-1)))
- ebnr = lbnr;
- else
- ebnr = BM_SECT_TO_BIT(esector - (BM_SECT_PER_BIT-1));
- sbnr = BM_SECT_TO_BIT(sector + BM_SECT_PER_BIT-1);
-
- if (sbnr > ebnr)
- return;
-
- /*
- * ok, (capacity & 7) != 0 sometimes, but who cares...
- * we count rs_{total,left} in bits, not sectors.
- */
- spin_lock_irq(&device->al_lock);
- count = drbd_bm_count_bits(device, sbnr, ebnr);
- if (count) {
- device->rs_failed += count;
-
- if (get_ldev(device)) {
- drbd_try_clear_on_disk_bm(device, sector, count, false);
- put_ldev(device);
- }
-
- /* just wake_up unconditional now, various lc_chaged(),
- * lc_put() in drbd_try_clear_on_disk_bm(). */
- wake_up = 1;
- }
- spin_unlock_irq(&device->al_lock);
- if (wake_up)
- wake_up(&device->al_wait);
-}
diff --git a/drivers/block/drbd/drbd_bitmap.c b/drivers/block/drbd/drbd_bitmap.c
index 1aa29f8fdfe1..426c97aef900 100644
--- a/drivers/block/drbd/drbd_bitmap.c
+++ b/drivers/block/drbd/drbd_bitmap.c
@@ -22,6 +22,8 @@
the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
*/
+#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
+
#include <linux/bitops.h>
#include <linux/vmalloc.h>
#include <linux/string.h>
@@ -353,9 +355,8 @@ static void bm_free_pages(struct page **pages, unsigned long number)
for (i = 0; i < number; i++) {
if (!pages[i]) {
- printk(KERN_ALERT "drbd: bm_free_pages tried to free "
- "a NULL pointer; i=%lu n=%lu\n",
- i, number);
+ pr_alert("bm_free_pages tried to free a NULL pointer; i=%lu n=%lu\n",
+ i, number);
continue;
}
__free_page(pages[i]);
@@ -592,7 +593,7 @@ static void bm_memset(struct drbd_bitmap *b, size_t offset, int c, size_t len)
end = offset + len;
if (end > b->bm_words) {
- printk(KERN_ALERT "drbd: bm_memset end > bm_words\n");
+ pr_alert("bm_memset end > bm_words\n");
return;
}
@@ -602,7 +603,7 @@ static void bm_memset(struct drbd_bitmap *b, size_t offset, int c, size_t len)
p_addr = bm_map_pidx(b, idx);
bm = p_addr + MLPP(offset);
if (bm+do_now > p_addr + LWPP) {
- printk(KERN_ALERT "drbd: BUG BUG BUG! p_addr:%p bm:%p do_now:%d\n",
+ pr_alert("BUG BUG BUG! p_addr:%p bm:%p do_now:%d\n",
p_addr, bm, (int)do_now);
} else
memset(bm, c, do_now * sizeof(long));
@@ -927,22 +928,14 @@ void drbd_bm_clear_all(struct drbd_device *device)
spin_unlock_irq(&b->bm_lock);
}
-struct bm_aio_ctx {
- struct drbd_device *device;
- atomic_t in_flight;
- unsigned int done;
- unsigned flags;
-#define BM_AIO_COPY_PAGES 1
-#define BM_AIO_WRITE_HINTED 2
-#define BM_WRITE_ALL_PAGES 4
- int error;
- struct kref kref;
-};
-
-static void bm_aio_ctx_destroy(struct kref *kref)
+static void drbd_bm_aio_ctx_destroy(struct kref *kref)
{
- struct bm_aio_ctx *ctx = container_of(kref, struct bm_aio_ctx, kref);
+ struct drbd_bm_aio_ctx *ctx = container_of(kref, struct drbd_bm_aio_ctx, kref);
+ unsigned long flags;
+ spin_lock_irqsave(&ctx->device->resource->req_lock, flags);
+ list_del(&ctx->list);
+ spin_unlock_irqrestore(&ctx->device->resource->req_lock, flags);
put_ldev(ctx->device);
kfree(ctx);
}
@@ -950,7 +943,7 @@ static void bm_aio_ctx_destroy(struct kref *kref)
/* bv_page may be a copy, or may be the original */
static void bm_async_io_complete(struct bio *bio, int error)
{
- struct bm_aio_ctx *ctx = bio->bi_private;
+ struct drbd_bm_aio_ctx *ctx = bio->bi_private;
struct drbd_device *device = ctx->device;
struct drbd_bitmap *b = device->bitmap;
unsigned int idx = bm_page_to_idx(bio->bi_io_vec[0].bv_page);
@@ -993,17 +986,18 @@ static void bm_async_io_complete(struct bio *bio, int error)
if (atomic_dec_and_test(&ctx->in_flight)) {
ctx->done = 1;
wake_up(&device->misc_wait);
- kref_put(&ctx->kref, &bm_aio_ctx_destroy);
+ kref_put(&ctx->kref, &drbd_bm_aio_ctx_destroy);
}
}
-static void bm_page_io_async(struct bm_aio_ctx *ctx, int page_nr, int rw) __must_hold(local)
+static void bm_page_io_async(struct drbd_bm_aio_ctx *ctx, int page_nr) __must_hold(local)
{
struct bio *bio = bio_alloc_drbd(GFP_NOIO);
struct drbd_device *device = ctx->device;
struct drbd_bitmap *b = device->bitmap;
struct page *page;
unsigned int len;
+ unsigned int rw = (ctx->flags & BM_AIO_READ) ? READ : WRITE;
sector_t on_disk_sector =
device->ldev->md.md_offset + device->ldev->md.bm_offset;
@@ -1049,9 +1043,9 @@ static void bm_page_io_async(struct bm_aio_ctx *ctx, int page_nr, int rw) __must
/*
* bm_rw: read/write the whole bitmap from/to its on disk location.
*/
-static int bm_rw(struct drbd_device *device, int rw, unsigned flags, unsigned lazy_writeout_upper_idx) __must_hold(local)
+static int bm_rw(struct drbd_device *device, const unsigned int flags, unsigned lazy_writeout_upper_idx) __must_hold(local)
{
- struct bm_aio_ctx *ctx;
+ struct drbd_bm_aio_ctx *ctx;
struct drbd_bitmap *b = device->bitmap;
int num_pages, i, count = 0;
unsigned long now;
@@ -1067,12 +1061,13 @@ static int bm_rw(struct drbd_device *device, int rw, unsigned flags, unsigned la
* as we submit copies of pages anyways.
*/
- ctx = kmalloc(sizeof(struct bm_aio_ctx), GFP_NOIO);
+ ctx = kmalloc(sizeof(struct drbd_bm_aio_ctx), GFP_NOIO);
if (!ctx)
return -ENOMEM;
- *ctx = (struct bm_aio_ctx) {
+ *ctx = (struct drbd_bm_aio_ctx) {
.device = device,
+ .start_jif = jiffies,
.in_flight = ATOMIC_INIT(1),
.done = 0,
.flags = flags,
@@ -1080,15 +1075,21 @@ static int bm_rw(struct drbd_device *device, int rw, unsigned flags, unsigned la
.kref = { ATOMIC_INIT(2) },
};
- if (!get_ldev_if_state(device, D_ATTACHING)) { /* put is in bm_aio_ctx_destroy() */
+ if (!get_ldev_if_state(device, D_ATTACHING)) { /* put is in drbd_bm_aio_ctx_destroy() */
drbd_err(device, "ASSERT FAILED: get_ldev_if_state() == 1 in bm_rw()\n");
kfree(ctx);
return -ENODEV;
}
+ /* Here D_ATTACHING is sufficient since drbd_bm_read() is called only from
+ drbd_adm_attach(), after device->ldev was assigned. */
- if (!ctx->flags)
+ if (0 == (ctx->flags & ~BM_AIO_READ))
WARN_ON(!(BM_LOCKED_MASK & b->bm_flags));
+ spin_lock_irq(&device->resource->req_lock);
+ list_add_tail(&ctx->list, &device->pending_bitmap_io);
+ spin_unlock_irq(&device->resource->req_lock);
+
num_pages = b->bm_number_of_pages;
now = jiffies;
@@ -1098,13 +1099,13 @@ static int bm_rw(struct drbd_device *device, int rw, unsigned flags, unsigned la
/* ignore completely unchanged pages */
if (lazy_writeout_upper_idx && i == lazy_writeout_upper_idx)
break;
- if (rw & WRITE) {
+ if (!(flags & BM_AIO_READ)) {
if ((flags & BM_AIO_WRITE_HINTED) &&
!test_and_clear_bit(BM_PAGE_HINT_WRITEOUT,
&page_private(b->bm_pages[i])))
continue;
- if (!(flags & BM_WRITE_ALL_PAGES) &&
+ if (!(flags & BM_AIO_WRITE_ALL_PAGES) &&
bm_test_page_unchanged(b->bm_pages[i])) {
dynamic_drbd_dbg(device, "skipped bm write for idx %u\n", i);
continue;
@@ -1118,7 +1119,7 @@ static int bm_rw(struct drbd_device *device, int rw, unsigned flags, unsigned la
}
}
atomic_inc(&ctx->in_flight);
- bm_page_io_async(ctx, i, rw);
+ bm_page_io_async(ctx, i);
++count;
cond_resched();
}
@@ -1134,12 +1135,12 @@ static int bm_rw(struct drbd_device *device, int rw, unsigned flags, unsigned la
if (!atomic_dec_and_test(&ctx->in_flight))
wait_until_done_or_force_detached(device, device->ldev, &ctx->done);
else
- kref_put(&ctx->kref, &bm_aio_ctx_destroy);
+ kref_put(&ctx->kref, &drbd_bm_aio_ctx_destroy);
/* summary for global bitmap IO */
if (flags == 0)
drbd_info(device, "bitmap %s of %u pages took %lu jiffies\n",
- rw == WRITE ? "WRITE" : "READ",
+ (flags & BM_AIO_READ) ? "READ" : "WRITE",
count, jiffies - now);
if (ctx->error) {
@@ -1152,20 +1153,18 @@ static int bm_rw(struct drbd_device *device, int rw, unsigned flags, unsigned la
err = -EIO; /* Disk timeout/force-detach during IO... */
now = jiffies;
- if (rw == WRITE) {
- drbd_md_flush(device);
- } else /* rw == READ */ {
+ if (flags & BM_AIO_READ) {
b->bm_set = bm_count_bits(b);
drbd_info(device, "recounting of set bits took additional %lu jiffies\n",
jiffies - now);
}
now = b->bm_set;
- if (flags == 0)
+ if ((flags & ~BM_AIO_READ) == 0)
drbd_info(device, "%s (%lu bits) marked out-of-sync by on disk bit-map.\n",
ppsize(ppb, now << (BM_BLOCK_SHIFT-10)), now);
- kref_put(&ctx->kref, &bm_aio_ctx_destroy);
+ kref_put(&ctx->kref, &drbd_bm_aio_ctx_destroy);
return err;
}
@@ -1175,7 +1174,7 @@ static int bm_rw(struct drbd_device *device, int rw, unsigned flags, unsigned la
*/
int drbd_bm_read(struct drbd_device *device) __must_hold(local)
{
- return bm_rw(device, READ, 0, 0);
+ return bm_rw(device, BM_AIO_READ, 0);
}
/**
@@ -1186,7 +1185,7 @@ int drbd_bm_read(struct drbd_device *device) __must_hold(local)
*/
int drbd_bm_write(struct drbd_device *device) __must_hold(local)
{
- return bm_rw(device, WRITE, 0, 0);
+ return bm_rw(device, 0, 0);
}
/**
@@ -1197,7 +1196,17 @@ int drbd_bm_write(struct drbd_device *device) __must_hold(local)
*/
int drbd_bm_write_all(struct drbd_device *device) __must_hold(local)
{
- return bm_rw(device, WRITE, BM_WRITE_ALL_PAGES, 0);
+ return bm_rw(device, BM_AIO_WRITE_ALL_PAGES, 0);
+}
+
+/**
+ * drbd_bm_write_lazy() - Write bitmap pages 0 to @upper_idx-1, if they have changed.
+ * @device: DRBD device.
+ * @upper_idx: 0: write all changed pages; +ve: page index to stop scanning for changed pages
+ */
+int drbd_bm_write_lazy(struct drbd_device *device, unsigned upper_idx) __must_hold(local)
+{
+ return bm_rw(device, BM_AIO_COPY_PAGES, upper_idx);
}
/**
@@ -1213,7 +1222,7 @@ int drbd_bm_write_all(struct drbd_device *device) __must_hold(local)
*/
int drbd_bm_write_copy_pages(struct drbd_device *device) __must_hold(local)
{
- return bm_rw(device, WRITE, BM_AIO_COPY_PAGES, 0);
+ return bm_rw(device, BM_AIO_COPY_PAGES, 0);
}
/**
@@ -1222,62 +1231,7 @@ int drbd_bm_write_copy_pages(struct drbd_device *device) __must_hold(local)
*/
int drbd_bm_write_hinted(struct drbd_device *device) __must_hold(local)
{
- return bm_rw(device, WRITE, BM_AIO_WRITE_HINTED | BM_AIO_COPY_PAGES, 0);
-}
-
-/**
- * drbd_bm_write_page() - Writes a PAGE_SIZE aligned piece of bitmap
- * @device: DRBD device.
- * @idx: bitmap page index
- *
- * We don't want to special case on logical_block_size of the backend device,
- * so we submit PAGE_SIZE aligned pieces.
- * Note that on "most" systems, PAGE_SIZE is 4k.
- *
- * In case this becomes an issue on systems with larger PAGE_SIZE,
- * we may want to change this again to write 4k aligned 4k pieces.
- */
-int drbd_bm_write_page(struct drbd_device *device, unsigned int idx) __must_hold(local)
-{
- struct bm_aio_ctx *ctx;
- int err;
-
- if (bm_test_page_unchanged(device->bitmap->bm_pages[idx])) {
- dynamic_drbd_dbg(device, "skipped bm page write for idx %u\n", idx);
- return 0;
- }
-
- ctx = kmalloc(sizeof(struct bm_aio_ctx), GFP_NOIO);
- if (!ctx)
- return -ENOMEM;
-
- *ctx = (struct bm_aio_ctx) {
- .device = device,
- .in_flight = ATOMIC_INIT(1),
- .done = 0,
- .flags = BM_AIO_COPY_PAGES,
- .error = 0,
- .kref = { ATOMIC_INIT(2) },
- };
-
- if (!get_ldev_if_state(device, D_ATTACHING)) { /* put is in bm_aio_ctx_destroy() */
- drbd_err(device, "ASSERT FAILED: get_ldev_if_state() == 1 in drbd_bm_write_page()\n");
- kfree(ctx);
- return -ENODEV;
- }
-
- bm_page_io_async(ctx, idx, WRITE_SYNC);
- wait_until_done_or_force_detached(device, device->ldev, &ctx->done);
-
- if (ctx->error)
- drbd_chk_io_error(device, 1, DRBD_META_IO_ERROR);
- /* that causes us to detach, so the in memory bitmap will be
- * gone in a moment as well. */
-
- device->bm_writ_cnt++;
- err = atomic_read(&ctx->in_flight) ? -EIO : ctx->error;
- kref_put(&ctx->kref, &bm_aio_ctx_destroy);
- return err;
+ return bm_rw(device, BM_AIO_WRITE_HINTED | BM_AIO_COPY_PAGES, 0);
}
/* NOTE
diff --git a/drivers/block/drbd/drbd_debugfs.c b/drivers/block/drbd/drbd_debugfs.c
new file mode 100644
index 000000000000..5c20b18540b8
--- /dev/null
+++ b/drivers/block/drbd/drbd_debugfs.c
@@ -0,0 +1,958 @@
+#define pr_fmt(fmt) "drbd debugfs: " fmt
+#include <linux/kernel.h>
+#include <linux/module.h>
+#include <linux/debugfs.h>
+#include <linux/seq_file.h>
+#include <linux/stat.h>
+#include <linux/jiffies.h>
+#include <linux/list.h>
+
+#include "drbd_int.h"
+#include "drbd_req.h"
+#include "drbd_debugfs.h"
+
+
+/**********************************************************************
+ * Whenever you change the file format, remember to bump the version. *
+ **********************************************************************/
+
+static struct dentry *drbd_debugfs_root;
+static struct dentry *drbd_debugfs_version;
+static struct dentry *drbd_debugfs_resources;
+static struct dentry *drbd_debugfs_minors;
+
+static void seq_print_age_or_dash(struct seq_file *m, bool valid, unsigned long dt)
+{
+ if (valid)
+ seq_printf(m, "\t%d", jiffies_to_msecs(dt));
+ else
+ seq_printf(m, "\t-");
+}
+
+static void __seq_print_rq_state_bit(struct seq_file *m,
+ bool is_set, char *sep, const char *set_name, const char *unset_name)
+{
+ if (is_set && set_name) {
+ seq_putc(m, *sep);
+ seq_puts(m, set_name);
+ *sep = '|';
+ } else if (!is_set && unset_name) {
+ seq_putc(m, *sep);
+ seq_puts(m, unset_name);
+ *sep = '|';
+ }
+}
+
+static void seq_print_rq_state_bit(struct seq_file *m,
+ bool is_set, char *sep, const char *set_name)
+{
+ __seq_print_rq_state_bit(m, is_set, sep, set_name, NULL);
+}
+
+/* pretty print enum drbd_req_state_bits req->rq_state */
+static void seq_print_request_state(struct seq_file *m, struct drbd_request *req)
+{
+ unsigned int s = req->rq_state;
+ char sep = ' ';
+ seq_printf(m, "\t0x%08x", s);
+ seq_printf(m, "\tmaster: %s", req->master_bio ? "pending" : "completed");
+
+ /* RQ_WRITE ignored, already reported */
+ seq_puts(m, "\tlocal:");
+ seq_print_rq_state_bit(m, s & RQ_IN_ACT_LOG, &sep, "in-AL");
+ seq_print_rq_state_bit(m, s & RQ_POSTPONED, &sep, "postponed");
+ seq_print_rq_state_bit(m, s & RQ_COMPLETION_SUSP, &sep, "suspended");
+ sep = ' ';
+ seq_print_rq_state_bit(m, s & RQ_LOCAL_PENDING, &sep, "pending");
+ seq_print_rq_state_bit(m, s & RQ_LOCAL_COMPLETED, &sep, "completed");
+ seq_print_rq_state_bit(m, s & RQ_LOCAL_ABORTED, &sep, "aborted");
+ seq_print_rq_state_bit(m, s & RQ_LOCAL_OK, &sep, "ok");
+ if (sep == ' ')
+ seq_puts(m, " -");
+
+ /* for_each_connection ... */
+ seq_printf(m, "\tnet:");
+ sep = ' ';
+ seq_print_rq_state_bit(m, s & RQ_NET_PENDING, &sep, "pending");
+ seq_print_rq_state_bit(m, s & RQ_NET_QUEUED, &sep, "queued");
+ seq_print_rq_state_bit(m, s & RQ_NET_SENT, &sep, "sent");
+ seq_print_rq_state_bit(m, s & RQ_NET_DONE, &sep, "done");
+ seq_print_rq_state_bit(m, s & RQ_NET_SIS, &sep, "sis");
+ seq_print_rq_state_bit(m, s & RQ_NET_OK, &sep, "ok");
+ if (sep == ' ')
+ seq_puts(m, " -");
+
+ seq_printf(m, " :");
+ sep = ' ';
+ seq_print_rq_state_bit(m, s & RQ_EXP_RECEIVE_ACK, &sep, "B");
+ seq_print_rq_state_bit(m, s & RQ_EXP_WRITE_ACK, &sep, "C");
+ seq_print_rq_state_bit(m, s & RQ_EXP_BARR_ACK, &sep, "barr");
+ if (sep == ' ')
+ seq_puts(m, " -");
+ seq_printf(m, "\n");
+}
+
+static void seq_print_one_request(struct seq_file *m, struct drbd_request *req, unsigned long now)
+{
+ /* change anything here, fixup header below! */
+ unsigned int s = req->rq_state;
+
+#define RQ_HDR_1 "epoch\tsector\tsize\trw"
+ seq_printf(m, "0x%x\t%llu\t%u\t%s",
+ req->epoch,
+ (unsigned long long)req->i.sector, req->i.size >> 9,
+ (s & RQ_WRITE) ? "W" : "R");
+
+#define RQ_HDR_2 "\tstart\tin AL\tsubmit"
+ seq_printf(m, "\t%d", jiffies_to_msecs(now - req->start_jif));
+ seq_print_age_or_dash(m, s & RQ_IN_ACT_LOG, now - req->in_actlog_jif);
+ seq_print_age_or_dash(m, s & RQ_LOCAL_PENDING, now - req->pre_submit_jif);
+
+#define RQ_HDR_3 "\tsent\tacked\tdone"
+ seq_print_age_or_dash(m, s & RQ_NET_SENT, now - req->pre_send_jif);
+ seq_print_age_or_dash(m, (s & RQ_NET_SENT) && !(s & RQ_NET_PENDING), now - req->acked_jif);
+ seq_print_age_or_dash(m, s & RQ_NET_DONE, now - req->net_done_jif);
+
+#define RQ_HDR_4 "\tstate\n"
+ seq_print_request_state(m, req);
+}
+#define RQ_HDR RQ_HDR_1 RQ_HDR_2 RQ_HDR_3 RQ_HDR_4
+
+static void seq_print_minor_vnr_req(struct seq_file *m, struct drbd_request *req, unsigned long now)
+{
+ seq_printf(m, "%u\t%u\t", req->device->minor, req->device->vnr);
+ seq_print_one_request(m, req, now);
+}
+
+static void seq_print_resource_pending_meta_io(struct seq_file *m, struct drbd_resource *resource, unsigned long now)
+{
+ struct drbd_device *device;
+ unsigned int i;
+
+ seq_puts(m, "minor\tvnr\tstart\tsubmit\tintent\n");
+ rcu_read_lock();
+ idr_for_each_entry(&resource->devices, device, i) {
+ struct drbd_md_io tmp;
+ /* In theory this is racy,
+ * in the sense that there could have been a
+ * drbd_md_put_buffer(); drbd_md_get_buffer();
+ * between accessing these members here. */
+ tmp = device->md_io;
+ if (atomic_read(&tmp.in_use)) {
+ seq_printf(m, "%u\t%u\t%d\t",
+ device->minor, device->vnr,
+ jiffies_to_msecs(now - tmp.start_jif));
+ if (time_before(tmp.submit_jif, tmp.start_jif))
+ seq_puts(m, "-\t");
+ else
+ seq_printf(m, "%d\t", jiffies_to_msecs(now - tmp.submit_jif));
+ seq_printf(m, "%s\n", tmp.current_use);
+ }
+ }
+ rcu_read_unlock();
+}
+
+static void seq_print_waiting_for_AL(struct seq_file *m, struct drbd_resource *resource, unsigned long now)
+{
+ struct drbd_device *device;
+ unsigned int i;
+
+ seq_puts(m, "minor\tvnr\tage\t#waiting\n");
+ rcu_read_lock();
+ idr_for_each_entry(&resource->devices, device, i) {
+ unsigned long jif;
+ struct drbd_request *req;
+ int n = atomic_read(&device->ap_actlog_cnt);
+ if (n) {
+ spin_lock_irq(&device->resource->req_lock);
+ req = list_first_entry_or_null(&device->pending_master_completion[1],
+ struct drbd_request, req_pending_master_completion);
+ /* if the oldest request does not wait for the activity log
+ * it is not interesting for us here */
+ if (req && !(req->rq_state & RQ_IN_ACT_LOG))
+ jif = req->start_jif;
+ else
+ req = NULL;
+ spin_unlock_irq(&device->resource->req_lock);
+ }
+ if (n) {
+ seq_printf(m, "%u\t%u\t", device->minor, device->vnr);
+ if (req)
+ seq_printf(m, "%u\t", jiffies_to_msecs(now - jif));
+ else
+ seq_puts(m, "-\t");
+ seq_printf(m, "%u\n", n);
+ }
+ }
+ rcu_read_unlock();
+}
+
+static void seq_print_device_bitmap_io(struct seq_file *m, struct drbd_device *device, unsigned long now)
+{
+ struct drbd_bm_aio_ctx *ctx;
+ unsigned long start_jif;
+ unsigned int in_flight;
+ unsigned int flags;
+ spin_lock_irq(&device->resource->req_lock);
+ ctx = list_first_entry_or_null(&device->pending_bitmap_io, struct drbd_bm_aio_ctx, list);
+ if (ctx && ctx->done)
+ ctx = NULL;
+ if (ctx) {
+ start_jif = ctx->start_jif;
+ in_flight = atomic_read(&ctx->in_flight);
+ flags = ctx->flags;
+ }
+ spin_unlock_irq(&device->resource->req_lock);
+ if (ctx) {
+ seq_printf(m, "%u\t%u\t%c\t%u\t%u\n",
+ device->minor, device->vnr,
+ (flags & BM_AIO_READ) ? 'R' : 'W',
+ jiffies_to_msecs(now - start_jif),
+ in_flight);
+ }
+}
+
+static void seq_print_resource_pending_bitmap_io(struct seq_file *m, struct drbd_resource *resource, unsigned long now)
+{
+ struct drbd_device *device;
+ unsigned int i;
+
+ seq_puts(m, "minor\tvnr\trw\tage\t#in-flight\n");
+ rcu_read_lock();
+ idr_for_each_entry(&resource->devices, device, i) {
+ seq_print_device_bitmap_io(m, device, now);
+ }
+ rcu_read_unlock();
+}
+
+/* pretty print enum peer_req->flags */
+static void seq_print_peer_request_flags(struct seq_file *m, struct drbd_peer_request *peer_req)
+{
+ unsigned long f = peer_req->flags;
+ char sep = ' ';
+
+ __seq_print_rq_state_bit(m, f & EE_SUBMITTED, &sep, "submitted", "preparing");
+ __seq_print_rq_state_bit(m, f & EE_APPLICATION, &sep, "application", "internal");
+ seq_print_rq_state_bit(m, f & EE_CALL_AL_COMPLETE_IO, &sep, "in-AL");
+ seq_print_rq_state_bit(m, f & EE_SEND_WRITE_ACK, &sep, "C");
+ seq_print_rq_state_bit(m, f & EE_MAY_SET_IN_SYNC, &sep, "set-in-sync");
+
+ if (f & EE_IS_TRIM) {
+ seq_putc(m, sep);
+ sep = '|';
+ if (f & EE_IS_TRIM_USE_ZEROOUT)
+ seq_puts(m, "zero-out");
+ else
+ seq_puts(m, "trim");
+ }
+ seq_putc(m, '\n');
+}
+
+static void seq_print_peer_request(struct seq_file *m,
+ struct drbd_device *device, struct list_head *lh,
+ unsigned long now)
+{
+ bool reported_preparing = false;
+ struct drbd_peer_request *peer_req;
+ list_for_each_entry(peer_req, lh, w.list) {
+ if (reported_preparing && !(peer_req->flags & EE_SUBMITTED))
+ continue;
+
+ if (device)
+ seq_printf(m, "%u\t%u\t", device->minor, device->vnr);
+
+ seq_printf(m, "%llu\t%u\t%c\t%u\t",
+ (unsigned long long)peer_req->i.sector, peer_req->i.size >> 9,
+ (peer_req->flags & EE_WRITE) ? 'W' : 'R',
+ jiffies_to_msecs(now - peer_req->submit_jif));
+ seq_print_peer_request_flags(m, peer_req);
+ if (peer_req->flags & EE_SUBMITTED)
+ break;
+ else
+ reported_preparing = true;
+ }
+}
+
+static void seq_print_device_peer_requests(struct seq_file *m,
+ struct drbd_device *device, unsigned long now)
+{
+ seq_puts(m, "minor\tvnr\tsector\tsize\trw\tage\tflags\n");
+ spin_lock_irq(&device->resource->req_lock);
+ seq_print_peer_request(m, device, &device->active_ee, now);
+ seq_print_peer_request(m, device, &device->read_ee, now);
+ seq_print_peer_request(m, device, &device->sync_ee, now);
+ spin_unlock_irq(&device->resource->req_lock);
+ if (test_bit(FLUSH_PENDING, &device->flags)) {
+ seq_printf(m, "%u\t%u\t-\t-\tF\t%u\tflush\n",
+ device->minor, device->vnr,
+ jiffies_to_msecs(now - device->flush_jif));
+ }
+}
+
+static void seq_print_resource_pending_peer_requests(struct seq_file *m,
+ struct drbd_resource *resource, unsigned long now)
+{
+ struct drbd_device *device;
+ unsigned int i;
+
+ rcu_read_lock();
+ idr_for_each_entry(&resource->devices, device, i) {
+ seq_print_device_peer_requests(m, device, now);
+ }
+ rcu_read_unlock();
+}
+
+static void seq_print_resource_transfer_log_summary(struct seq_file *m,
+ struct drbd_resource *resource,
+ struct drbd_connection *connection,
+ unsigned long now)
+{
+ struct drbd_request *req;
+ unsigned int count = 0;
+ unsigned int show_state = 0;
+
+ seq_puts(m, "n\tdevice\tvnr\t" RQ_HDR);
+ spin_lock_irq(&resource->req_lock);
+ list_for_each_entry(req, &connection->transfer_log, tl_requests) {
+ unsigned int tmp = 0;
+ unsigned int s;
+ ++count;
+
+ /* don't disable irq "forever" */
+ if (!(count & 0x1ff)) {
+ struct drbd_request *req_next;
+ kref_get(&req->kref);
+ spin_unlock_irq(&resource->req_lock);
+ cond_resched();
+ spin_lock_irq(&resource->req_lock);
+ req_next = list_next_entry(req, tl_requests);
+ if (kref_put(&req->kref, drbd_req_destroy))
+ req = req_next;
+ if (&req->tl_requests == &connection->transfer_log)
+ break;
+ }
+
+ s = req->rq_state;
+
+ /* This is meant to summarize timing issues, to be able to tell
+ * local disk problems from network problems.
+ * Skip requests, if we have shown an even older request with
+ * similar aspects already. */
+ if (req->master_bio == NULL)
+ tmp |= 1;
+ if ((s & RQ_LOCAL_MASK) && (s & RQ_LOCAL_PENDING))
+ tmp |= 2;
+ if (s & RQ_NET_MASK) {
+ if (!(s & RQ_NET_SENT))
+ tmp |= 4;
+ if (s & RQ_NET_PENDING)
+ tmp |= 8;
+ if (!(s & RQ_NET_DONE))
+ tmp |= 16;
+ }
+ if ((tmp & show_state) == tmp)
+ continue;
+ show_state |= tmp;
+ seq_printf(m, "%u\t", count);
+ seq_print_minor_vnr_req(m, req, now);
+ if (show_state == 0x1f)
+ break;
+ }
+ spin_unlock_irq(&resource->req_lock);
+}
+
+/* TODO: transfer_log and friends should be moved to resource */
+static int in_flight_summary_show(struct seq_file *m, void *pos)
+{
+ struct drbd_resource *resource = m->private;
+ struct drbd_connection *connection;
+ unsigned long jif = jiffies;
+
+ connection = first_connection(resource);
+ /* This does not happen, actually.
+ * But be robust and prepare for future code changes. */
+ if (!connection || !kref_get_unless_zero(&connection->kref))
+ return -ESTALE;
+
+ /* BUMP me if you change the file format/content/presentation */
+ seq_printf(m, "v: %u\n\n", 0);
+
+ seq_puts(m, "oldest bitmap IO\n");
+ seq_print_resource_pending_bitmap_io(m, resource, jif);
+ seq_putc(m, '\n');
+
+ seq_puts(m, "meta data IO\n");
+ seq_print_resource_pending_meta_io(m, resource, jif);
+ seq_putc(m, '\n');
+
+ seq_puts(m, "socket buffer stats\n");
+ /* for each connection ... once we have more than one */
+ rcu_read_lock();
+ if (connection->data.socket) {
+ /* open coded SIOCINQ, the "relevant" part */
+ struct tcp_sock *tp = tcp_sk(connection->data.socket->sk);
+ int answ = tp->rcv_nxt - tp->copied_seq;
+ seq_printf(m, "unread receive buffer: %u Byte\n", answ);
+ /* open coded SIOCOUTQ, the "relevant" part */
+ answ = tp->write_seq - tp->snd_una;
+ seq_printf(m, "unacked send buffer: %u Byte\n", answ);
+ }
+ rcu_read_unlock();
+ seq_putc(m, '\n');
+
+ seq_puts(m, "oldest peer requests\n");
+ seq_print_resource_pending_peer_requests(m, resource, jif);
+ seq_putc(m, '\n');
+
+ seq_puts(m, "application requests waiting for activity log\n");
+ seq_print_waiting_for_AL(m, resource, jif);
+ seq_putc(m, '\n');
+
+ seq_puts(m, "oldest application requests\n");
+ seq_print_resource_transfer_log_summary(m, resource, connection, jif);
+ seq_putc(m, '\n');
+
+ jif = jiffies - jif;
+ if (jif)
+ seq_printf(m, "generated in %d ms\n", jiffies_to_msecs(jif));
+ kref_put(&connection->kref, drbd_destroy_connection);
+ return 0;
+}
+
+/* simple_positive(file->f_dentry) respectively debugfs_positive(),
+ * but neither is "reachable" from here.
+ * So we have our own inline version of it above. :-( */
+static inline int debugfs_positive(struct dentry *dentry)
+{
+ return dentry->d_inode && !d_unhashed(dentry);
+}
+
+/* make sure at *open* time that the respective object won't go away. */
+static int drbd_single_open(struct file *file, int (*show)(struct seq_file *, void *),
+ void *data, struct kref *kref,
+ void (*release)(struct kref *))
+{
+ struct dentry *parent;
+ int ret = -ESTALE;
+
+ /* Are we still linked,
+ * or has debugfs_remove() already been called? */
+ parent = file->f_dentry->d_parent;
+ /* not sure if this can happen: */
+ if (!parent || !parent->d_inode)
+ goto out;
+ /* serialize with d_delete() */
+ mutex_lock(&parent->d_inode->i_mutex);
+ /* Make sure the object is still alive */
+ if (debugfs_positive(file->f_dentry)
+ && kref_get_unless_zero(kref))
+ ret = 0;
+ mutex_unlock(&parent->d_inode->i_mutex);
+ if (!ret) {
+ ret = single_open(file, show, data);
+ if (ret)
+ kref_put(kref, release);
+ }
+out:
+ return ret;
+}
+
+static int in_flight_summary_open(struct inode *inode, struct file *file)
+{
+ struct drbd_resource *resource = inode->i_private;
+ return drbd_single_open(file, in_flight_summary_show, resource,
+ &resource->kref, drbd_destroy_resource);
+}
+
+static int in_flight_summary_release(struct inode *inode, struct file *file)
+{
+ struct drbd_resource *resource = inode->i_private;
+ kref_put(&resource->kref, drbd_destroy_resource);
+ return single_release(inode, file);
+}
+
+static const struct file_operations in_flight_summary_fops = {
+ .owner = THIS_MODULE,
+ .open = in_flight_summary_open,
+ .read = seq_read,
+ .llseek = seq_lseek,
+ .release = in_flight_summary_release,
+};
+
+void drbd_debugfs_resource_add(struct drbd_resource *resource)
+{
+ struct dentry *dentry;
+ if (!drbd_debugfs_resources)
+ return;
+
+ dentry = debugfs_create_dir(resource->name, drbd_debugfs_resources);
+ if (IS_ERR_OR_NULL(dentry))
+ goto fail;
+ resource->debugfs_res = dentry;
+
+ dentry = debugfs_create_dir("volumes", resource->debugfs_res);
+ if (IS_ERR_OR_NULL(dentry))
+ goto fail;
+ resource->debugfs_res_volumes = dentry;
+
+ dentry = debugfs_create_dir("connections", resource->debugfs_res);
+ if (IS_ERR_OR_NULL(dentry))
+ goto fail;
+ resource->debugfs_res_connections = dentry;
+
+ dentry = debugfs_create_file("in_flight_summary", S_IRUSR|S_IRGRP,
+ resource->debugfs_res, resource,
+ &in_flight_summary_fops);
+ if (IS_ERR_OR_NULL(dentry))
+ goto fail;
+ resource->debugfs_res_in_flight_summary = dentry;
+ return;
+
+fail:
+ drbd_debugfs_resource_cleanup(resource);
+ drbd_err(resource, "failed to create debugfs dentry\n");
+}
+
+static void drbd_debugfs_remove(struct dentry **dp)
+{
+ debugfs_remove(*dp);
+ *dp = NULL;
+}
+
+void drbd_debugfs_resource_cleanup(struct drbd_resource *resource)
+{
+ /* it is ok to call debugfs_remove(NULL) */
+ drbd_debugfs_remove(&resource->debugfs_res_in_flight_summary);
+ drbd_debugfs_remove(&resource->debugfs_res_connections);
+ drbd_debugfs_remove(&resource->debugfs_res_volumes);
+ drbd_debugfs_remove(&resource->debugfs_res);
+}
+
+static void seq_print_one_timing_detail(struct seq_file *m,
+ const struct drbd_thread_timing_details *tdp,
+ unsigned long now)
+{
+ struct drbd_thread_timing_details td;
+ /* No locking...
+ * use temporary assignment to get at consistent data. */
+ do {
+ td = *tdp;
+ } while (td.cb_nr != tdp->cb_nr);
+ if (!td.cb_addr)
+ return;
+ seq_printf(m, "%u\t%d\t%s:%u\t%ps\n",
+ td.cb_nr,
+ jiffies_to_msecs(now - td.start_jif),
+ td.caller_fn, td.line,
+ td.cb_addr);
+}
+
+static void seq_print_timing_details(struct seq_file *m,
+ const char *title,
+ unsigned int cb_nr, struct drbd_thread_timing_details *tdp, unsigned long now)
+{
+ unsigned int start_idx;
+ unsigned int i;
+
+ seq_printf(m, "%s\n", title);
+ /* If not much is going on, this will result in natural ordering.
+ * If it is very busy, we will possibly skip events, or even see wrap
+ * arounds, which could only be avoided with locking.
+ */
+ start_idx = cb_nr % DRBD_THREAD_DETAILS_HIST;
+ for (i = start_idx; i < DRBD_THREAD_DETAILS_HIST; i++)
+ seq_print_one_timing_detail(m, tdp+i, now);
+ for (i = 0; i < start_idx; i++)
+ seq_print_one_timing_detail(m, tdp+i, now);
+}
+
+static int callback_history_show(struct seq_file *m, void *ignored)
+{
+ struct drbd_connection *connection = m->private;
+ unsigned long jif = jiffies;
+
+ /* BUMP me if you change the file format/content/presentation */
+ seq_printf(m, "v: %u\n\n", 0);
+
+ seq_puts(m, "n\tage\tcallsite\tfn\n");
+ seq_print_timing_details(m, "worker", connection->w_cb_nr, connection->w_timing_details, jif);
+ seq_print_timing_details(m, "receiver", connection->r_cb_nr, connection->r_timing_details, jif);
+ return 0;
+}
+
+static int callback_history_open(struct inode *inode, struct file *file)
+{
+ struct drbd_connection *connection = inode->i_private;
+ return drbd_single_open(file, callback_history_show, connection,
+ &connection->kref, drbd_destroy_connection);
+}
+
+static int callback_history_release(struct inode *inode, struct file *file)
+{
+ struct drbd_connection *connection = inode->i_private;
+ kref_put(&connection->kref, drbd_destroy_connection);
+ return single_release(inode, file);
+}
+
+static const struct file_operations connection_callback_history_fops = {
+ .owner = THIS_MODULE,
+ .open = callback_history_open,
+ .read = seq_read,
+ .llseek = seq_lseek,
+ .release = callback_history_release,
+};
+
+static int connection_oldest_requests_show(struct seq_file *m, void *ignored)
+{
+ struct drbd_connection *connection = m->private;
+ unsigned long now = jiffies;
+ struct drbd_request *r1, *r2;
+
+ /* BUMP me if you change the file format/content/presentation */
+ seq_printf(m, "v: %u\n\n", 0);
+
+ spin_lock_irq(&connection->resource->req_lock);
+ r1 = connection->req_next;
+ if (r1)
+ seq_print_minor_vnr_req(m, r1, now);
+ r2 = connection->req_ack_pending;
+ if (r2 && r2 != r1) {
+ r1 = r2;
+ seq_print_minor_vnr_req(m, r1, now);
+ }
+ r2 = connection->req_not_net_done;
+ if (r2 && r2 != r1)
+ seq_print_minor_vnr_req(m, r2, now);
+ spin_unlock_irq(&connection->resource->req_lock);
+ return 0;
+}
+
+static int connection_oldest_requests_open(struct inode *inode, struct file *file)
+{
+ struct drbd_connection *connection = inode->i_private;
+ return drbd_single_open(file, connection_oldest_requests_show, connection,
+ &connection->kref, drbd_destroy_connection);
+}
+
+static int connection_oldest_requests_release(struct inode *inode, struct file *file)
+{
+ struct drbd_connection *connection = inode->i_private;
+ kref_put(&connection->kref, drbd_destroy_connection);
+ return single_release(inode, file);
+}
+
+static const struct file_operations connection_oldest_requests_fops = {
+ .owner = THIS_MODULE,
+ .open = connection_oldest_requests_open,
+ .read = seq_read,
+ .llseek = seq_lseek,
+ .release = connection_oldest_requests_release,
+};
+
+void drbd_debugfs_connection_add(struct drbd_connection *connection)
+{
+ struct dentry *conns_dir = connection->resource->debugfs_res_connections;
+ struct dentry *dentry;
+ if (!conns_dir)
+ return;
+
+ /* Once we enable mutliple peers,
+ * these connections will have descriptive names.
+ * For now, it is just the one connection to the (only) "peer". */
+ dentry = debugfs_create_dir("peer", conns_dir);
+ if (IS_ERR_OR_NULL(dentry))
+ goto fail;
+ connection->debugfs_conn = dentry;
+
+ dentry = debugfs_create_file("callback_history", S_IRUSR|S_IRGRP,
+ connection->debugfs_conn, connection,
+ &connection_callback_history_fops);
+ if (IS_ERR_OR_NULL(dentry))
+ goto fail;
+ connection->debugfs_conn_callback_history = dentry;
+
+ dentry = debugfs_create_file("oldest_requests", S_IRUSR|S_IRGRP,
+ connection->debugfs_conn, connection,
+ &connection_oldest_requests_fops);
+ if (IS_ERR_OR_NULL(dentry))
+ goto fail;
+ connection->debugfs_conn_oldest_requests = dentry;
+ return;
+
+fail:
+ drbd_debugfs_connection_cleanup(connection);
+ drbd_err(connection, "failed to create debugfs dentry\n");
+}
+
+void drbd_debugfs_connection_cleanup(struct drbd_connection *connection)
+{
+ drbd_debugfs_remove(&connection->debugfs_conn_callback_history);
+ drbd_debugfs_remove(&connection->debugfs_conn_oldest_requests);
+ drbd_debugfs_remove(&connection->debugfs_conn);
+}
+
+static void resync_dump_detail(struct seq_file *m, struct lc_element *e)
+{
+ struct bm_extent *bme = lc_entry(e, struct bm_extent, lce);
+
+ seq_printf(m, "%5d %s %s %s\n", bme->rs_left,
+ test_bit(BME_NO_WRITES, &bme->flags) ? "NO_WRITES" : "---------",
+ test_bit(BME_LOCKED, &bme->flags) ? "LOCKED" : "------",
+ test_bit(BME_PRIORITY, &bme->flags) ? "PRIORITY" : "--------"
+ );
+}
+
+static int device_resync_extents_show(struct seq_file *m, void *ignored)
+{
+ struct drbd_device *device = m->private;
+
+ /* BUMP me if you change the file format/content/presentation */
+ seq_printf(m, "v: %u\n\n", 0);
+
+ if (get_ldev_if_state(device, D_FAILED)) {
+ lc_seq_printf_stats(m, device->resync);
+ lc_seq_dump_details(m, device->resync, "rs_left flags", resync_dump_detail);
+ put_ldev(device);
+ }
+ return 0;
+}
+
+static int device_act_log_extents_show(struct seq_file *m, void *ignored)
+{
+ struct drbd_device *device = m->private;
+
+ /* BUMP me if you change the file format/content/presentation */
+ seq_printf(m, "v: %u\n\n", 0);
+
+ if (get_ldev_if_state(device, D_FAILED)) {
+ lc_seq_printf_stats(m, device->act_log);
+ lc_seq_dump_details(m, device->act_log, "", NULL);
+ put_ldev(device);
+ }
+ return 0;
+}
+
+static int device_oldest_requests_show(struct seq_file *m, void *ignored)
+{
+ struct drbd_device *device = m->private;
+ struct drbd_resource *resource = device->resource;
+ unsigned long now = jiffies;
+ struct drbd_request *r1, *r2;
+ int i;
+
+ /* BUMP me if you change the file format/content/presentation */
+ seq_printf(m, "v: %u\n\n", 0);
+
+ seq_puts(m, RQ_HDR);
+ spin_lock_irq(&resource->req_lock);
+ /* WRITE, then READ */
+ for (i = 1; i >= 0; --i) {
+ r1 = list_first_entry_or_null(&device->pending_master_completion[i],
+ struct drbd_request, req_pending_master_completion);
+ r2 = list_first_entry_or_null(&device->pending_completion[i],
+ struct drbd_request, req_pending_local);
+ if (r1)
+ seq_print_one_request(m, r1, now);
+ if (r2 && r2 != r1)
+ seq_print_one_request(m, r2, now);
+ }
+ spin_unlock_irq(&resource->req_lock);
+ return 0;
+}
+
+static int device_data_gen_id_show(struct seq_file *m, void *ignored)
+{
+ struct drbd_device *device = m->private;
+ struct drbd_md *md;
+ enum drbd_uuid_index idx;
+
+ if (!get_ldev_if_state(device, D_FAILED))
+ return -ENODEV;
+
+ md = &device->ldev->md;
+ spin_lock_irq(&md->uuid_lock);
+ for (idx = UI_CURRENT; idx <= UI_HISTORY_END; idx++) {
+ seq_printf(m, "0x%016llX\n", md->uuid[idx]);
+ }
+ spin_unlock_irq(&md->uuid_lock);
+ put_ldev(device);
+ return 0;
+}
+
+#define drbd_debugfs_device_attr(name) \
+static int device_ ## name ## _open(struct inode *inode, struct file *file) \
+{ \
+ struct drbd_device *device = inode->i_private; \
+ return drbd_single_open(file, device_ ## name ## _show, device, \
+ &device->kref, drbd_destroy_device); \
+} \
+static int device_ ## name ## _release(struct inode *inode, struct file *file) \
+{ \
+ struct drbd_device *device = inode->i_private; \
+ kref_put(&device->kref, drbd_destroy_device); \
+ return single_release(inode, file); \
+} \
+static const struct file_operations device_ ## name ## _fops = { \
+ .owner = THIS_MODULE, \
+ .open = device_ ## name ## _open, \
+ .read = seq_read, \
+ .llseek = seq_lseek, \
+ .release = device_ ## name ## _release, \
+};
+
+drbd_debugfs_device_attr(oldest_requests)
+drbd_debugfs_device_attr(act_log_extents)
+drbd_debugfs_device_attr(resync_extents)
+drbd_debugfs_device_attr(data_gen_id)
+
+void drbd_debugfs_device_add(struct drbd_device *device)
+{
+ struct dentry *vols_dir = device->resource->debugfs_res_volumes;
+ char minor_buf[8]; /* MINORMASK, MINORBITS == 20; */
+ char vnr_buf[8]; /* volume number vnr is even 16 bit only; */
+ char *slink_name = NULL;
+
+ struct dentry *dentry;
+ if (!vols_dir || !drbd_debugfs_minors)
+ return;
+
+ snprintf(vnr_buf, sizeof(vnr_buf), "%u", device->vnr);
+ dentry = debugfs_create_dir(vnr_buf, vols_dir);
+ if (IS_ERR_OR_NULL(dentry))
+ goto fail;
+ device->debugfs_vol = dentry;
+
+ snprintf(minor_buf, sizeof(minor_buf), "%u", device->minor);
+ slink_name = kasprintf(GFP_KERNEL, "../resources/%s/volumes/%u",
+ device->resource->name, device->vnr);
+ if (!slink_name)
+ goto fail;
+ dentry = debugfs_create_symlink(minor_buf, drbd_debugfs_minors, slink_name);
+ kfree(slink_name);
+ slink_name = NULL;
+ if (IS_ERR_OR_NULL(dentry))
+ goto fail;
+ device->debugfs_minor = dentry;
+
+#define DCF(name) do { \
+ dentry = debugfs_create_file(#name, S_IRUSR|S_IRGRP, \
+ device->debugfs_vol, device, \
+ &device_ ## name ## _fops); \
+ if (IS_ERR_OR_NULL(dentry)) \
+ goto fail; \
+ device->debugfs_vol_ ## name = dentry; \
+ } while (0)
+
+ DCF(oldest_requests);
+ DCF(act_log_extents);
+ DCF(resync_extents);
+ DCF(data_gen_id);
+#undef DCF
+ return;
+
+fail:
+ drbd_debugfs_device_cleanup(device);
+ drbd_err(device, "failed to create debugfs entries\n");
+}
+
+void drbd_debugfs_device_cleanup(struct drbd_device *device)
+{
+ drbd_debugfs_remove(&device->debugfs_minor);
+ drbd_debugfs_remove(&device->debugfs_vol_oldest_requests);
+ drbd_debugfs_remove(&device->debugfs_vol_act_log_extents);
+ drbd_debugfs_remove(&device->debugfs_vol_resync_extents);
+ drbd_debugfs_remove(&device->debugfs_vol_data_gen_id);
+ drbd_debugfs_remove(&device->debugfs_vol);
+}
+
+void drbd_debugfs_peer_device_add(struct drbd_peer_device *peer_device)
+{
+ struct dentry *conn_dir = peer_device->connection->debugfs_conn;
+ struct dentry *dentry;
+ char vnr_buf[8];
+
+ if (!conn_dir)
+ return;
+
+ snprintf(vnr_buf, sizeof(vnr_buf), "%u", peer_device->device->vnr);
+ dentry = debugfs_create_dir(vnr_buf, conn_dir);
+ if (IS_ERR_OR_NULL(dentry))
+ goto fail;
+ peer_device->debugfs_peer_dev = dentry;
+ return;
+
+fail:
+ drbd_debugfs_peer_device_cleanup(peer_device);
+ drbd_err(peer_device, "failed to create debugfs entries\n");
+}
+
+void drbd_debugfs_peer_device_cleanup(struct drbd_peer_device *peer_device)
+{
+ drbd_debugfs_remove(&peer_device->debugfs_peer_dev);
+}
+
+static int drbd_version_show(struct seq_file *m, void *ignored)
+{
+ seq_printf(m, "# %s\n", drbd_buildtag());
+ seq_printf(m, "VERSION=%s\n", REL_VERSION);
+ seq_printf(m, "API_VERSION=%u\n", API_VERSION);
+ seq_printf(m, "PRO_VERSION_MIN=%u\n", PRO_VERSION_MIN);
+ seq_printf(m, "PRO_VERSION_MAX=%u\n", PRO_VERSION_MAX);
+ return 0;
+}
+
+static int drbd_version_open(struct inode *inode, struct file *file)
+{
+ return single_open(file, drbd_version_show, NULL);
+}
+
+static struct file_operations drbd_version_fops = {
+ .owner = THIS_MODULE,
+ .open = drbd_version_open,
+ .llseek = seq_lseek,
+ .read = seq_read,
+ .release = single_release,
+};
+
+/* not __exit, may be indirectly called
+ * from the module-load-failure path as well. */
+void drbd_debugfs_cleanup(void)
+{
+ drbd_debugfs_remove(&drbd_debugfs_resources);
+ drbd_debugfs_remove(&drbd_debugfs_minors);
+ drbd_debugfs_remove(&drbd_debugfs_version);
+ drbd_debugfs_remove(&drbd_debugfs_root);
+}
+
+int __init drbd_debugfs_init(void)
+{
+ struct dentry *dentry;
+
+ dentry = debugfs_create_dir("drbd", NULL);
+ if (IS_ERR_OR_NULL(dentry))
+ goto fail;
+ drbd_debugfs_root = dentry;
+
+ dentry = debugfs_create_file("version", 0444, drbd_debugfs_root, NULL, &drbd_version_fops);
+ if (IS_ERR_OR_NULL(dentry))
+ goto fail;
+ drbd_debugfs_version = dentry;
+
+ dentry = debugfs_create_dir("resources", drbd_debugfs_root);
+ if (IS_ERR_OR_NULL(dentry))
+ goto fail;
+ drbd_debugfs_resources = dentry;
+
+ dentry = debugfs_create_dir("minors", drbd_debugfs_root);
+ if (IS_ERR_OR_NULL(dentry))
+ goto fail;
+ drbd_debugfs_minors = dentry;
+ return 0;
+
+fail:
+ drbd_debugfs_cleanup();
+ if (dentry)
+ return PTR_ERR(dentry);
+ else
+ return -EINVAL;
+}
diff --git a/drivers/block/drbd/drbd_debugfs.h b/drivers/block/drbd/drbd_debugfs.h
new file mode 100644
index 000000000000..8bee21340dce
--- /dev/null
+++ b/drivers/block/drbd/drbd_debugfs.h
@@ -0,0 +1,39 @@
+#include <linux/kernel.h>
+#include <linux/module.h>
+#include <linux/debugfs.h>
+
+#include "drbd_int.h"
+
+#ifdef CONFIG_DEBUG_FS
+int __init drbd_debugfs_init(void);
+void drbd_debugfs_cleanup(void);
+
+void drbd_debugfs_resource_add(struct drbd_resource *resource);
+void drbd_debugfs_resource_cleanup(struct drbd_resource *resource);
+
+void drbd_debugfs_connection_add(struct drbd_connection *connection);
+void drbd_debugfs_connection_cleanup(struct drbd_connection *connection);
+
+void drbd_debugfs_device_add(struct drbd_device *device);
+void drbd_debugfs_device_cleanup(struct drbd_device *device);
+
+void drbd_debugfs_peer_device_add(struct drbd_peer_device *peer_device);
+void drbd_debugfs_peer_device_cleanup(struct drbd_peer_device *peer_device);
+#else
+
+static inline int __init drbd_debugfs_init(void) { return -ENODEV; }
+static inline void drbd_debugfs_cleanup(void) { }
+
+static inline void drbd_debugfs_resource_add(struct drbd_resource *resource) { }
+static inline void drbd_debugfs_resource_cleanup(struct drbd_resource *resource) { }
+
+static inline void drbd_debugfs_connection_add(struct drbd_connection *connection) { }
+static inline void drbd_debugfs_connection_cleanup(struct drbd_connection *connection) { }
+
+static inline void drbd_debugfs_device_add(struct drbd_device *device) { }
+static inline void drbd_debugfs_device_cleanup(struct drbd_device *device) { }
+
+static inline void drbd_debugfs_peer_device_add(struct drbd_peer_device *peer_device) { }
+static inline void drbd_debugfs_peer_device_cleanup(struct drbd_peer_device *peer_device) { }
+
+#endif
diff --git a/drivers/block/drbd/drbd_int.h b/drivers/block/drbd/drbd_int.h
index a76ceb344d64..1a000016ccdf 100644
--- a/drivers/block/drbd/drbd_int.h
+++ b/drivers/block/drbd/drbd_int.h
@@ -317,7 +317,63 @@ struct drbd_request {
struct list_head tl_requests; /* ring list in the transfer log */
struct bio *master_bio; /* master bio pointer */
- unsigned long start_time;
+
+ /* see struct drbd_device */
+ struct list_head req_pending_master_completion;
+ struct list_head req_pending_local;
+
+ /* for generic IO accounting */
+ unsigned long start_jif;
+
+ /* for DRBD internal statistics */
+
+ /* Minimal set of time stamps to determine if we wait for activity log
+ * transactions, local disk or peer. 32 bit "jiffies" are good enough,
+ * we don't expect a DRBD request to be stalled for several month.
+ */
+
+ /* before actual request processing */
+ unsigned long in_actlog_jif;
+
+ /* local disk */
+ unsigned long pre_submit_jif;
+
+ /* per connection */
+ unsigned long pre_send_jif;
+ unsigned long acked_jif;
+ unsigned long net_done_jif;
+
+ /* Possibly even more detail to track each phase:
+ * master_completion_jif
+ * how long did it take to complete the master bio
+ * (application visible latency)
+ * allocated_jif
+ * how long the master bio was blocked until we finally allocated
+ * a tracking struct
+ * in_actlog_jif
+ * how long did we wait for activity log transactions
+ *
+ * net_queued_jif
+ * when did we finally queue it for sending
+ * pre_send_jif
+ * when did we start sending it
+ * post_send_jif
+ * how long did we block in the network stack trying to send it
+ * acked_jif
+ * when did we receive (or fake, in protocol A) a remote ACK
+ * net_done_jif
+ * when did we receive final acknowledgement (P_BARRIER_ACK),
+ * or decide, e.g. on connection loss, that we do no longer expect
+ * anything from this peer for this request.
+ *
+ * pre_submit_jif
+ * post_sub_jif
+ * when did we start submiting to the lower level device,
+ * and how long did we block in that submit function
+ * local_completion_jif
+ * how long did it take the lower level device to complete this request
+ */
+
/* once it hits 0, we may complete the master_bio */
atomic_t completion_ref;
@@ -366,6 +422,7 @@ struct drbd_peer_request {
struct drbd_interval i;
/* see comments on ee flag bits below */
unsigned long flags;
+ unsigned long submit_jif;
union {
u64 block_id;
struct digest_info *digest;
@@ -408,6 +465,17 @@ enum {
/* Is set when net_conf had two_primaries set while creating this peer_req */
__EE_IN_INTERVAL_TREE,
+
+ /* for debugfs: */
+ /* has this been submitted, or does it still wait for something else? */
+ __EE_SUBMITTED,
+
+ /* this is/was a write request */
+ __EE_WRITE,
+
+ /* this originates from application on peer
+ * (not some resync or verify or other DRBD internal request) */
+ __EE_APPLICATION,
};
#define EE_CALL_AL_COMPLETE_IO (1<<__EE_CALL_AL_COMPLETE_IO)
#define EE_MAY_SET_IN_SYNC (1<<__EE_MAY_SET_IN_SYNC)
@@ -419,6 +487,9 @@ enum {
#define EE_RESTART_REQUESTS (1<<__EE_RESTART_REQUESTS)
#define EE_SEND_WRITE_ACK (1<<__EE_SEND_WRITE_ACK)
#define EE_IN_INTERVAL_TREE (1<<__EE_IN_INTERVAL_TREE)
+#define EE_SUBMITTED (1<<__EE_SUBMITTED)
+#define EE_WRITE (1<<__EE_WRITE)
+#define EE_APPLICATION (1<<__EE_APPLICATION)
/* flag bits per device */
enum {
@@ -433,11 +504,11 @@ enum {
CONSIDER_RESYNC,
MD_NO_FUA, /* Users wants us to not use FUA/FLUSH on meta data dev */
+
SUSPEND_IO, /* suspend application io */
BITMAP_IO, /* suspend application io;
once no more io in flight, start bitmap io */
BITMAP_IO_QUEUED, /* Started bitmap IO */
- GO_DISKLESS, /* Disk is being detached, on io-error or admin request. */
WAS_IO_ERROR, /* Local disk failed, returned IO error */
WAS_READ_ERROR, /* Local disk READ failed (set additionally to the above) */
FORCE_DETACH, /* Force-detach from local disk, aborting any pending local IO */
@@ -450,6 +521,20 @@ enum {
B_RS_H_DONE, /* Before resync handler done (already executed) */
DISCARD_MY_DATA, /* discard_my_data flag per volume */
READ_BALANCE_RR,
+
+ FLUSH_PENDING, /* if set, device->flush_jif is when we submitted that flush
+ * from drbd_flush_after_epoch() */
+
+ /* cleared only after backing device related structures have been destroyed. */
+ GOING_DISKLESS, /* Disk is being detached, because of io-error, or admin request. */
+
+ /* to be used in drbd_device_post_work() */
+ GO_DISKLESS, /* tell worker to schedule cleanup before detach */
+ DESTROY_DISK, /* tell worker to close backing devices and destroy related structures. */
+ MD_SYNC, /* tell worker to call drbd_md_sync() */
+ RS_START, /* tell worker to start resync/OV */
+ RS_PROGRESS, /* tell worker that resync made significant progress */
+ RS_DONE, /* tell worker that resync is done */
};
struct drbd_bitmap; /* opaque for drbd_device */
@@ -531,6 +616,11 @@ struct drbd_backing_dev {
};
struct drbd_md_io {
+ struct page *page;
+ unsigned long start_jif; /* last call to drbd_md_get_buffer */
+ unsigned long submit_jif; /* last _drbd_md_sync_page_io() submit */
+ const char *current_use;
+ atomic_t in_use;
unsigned int done;
int error;
};
@@ -577,10 +667,18 @@ enum {
* and potentially deadlock on, this drbd worker.
*/
DISCONNECT_SENT,
+
+ DEVICE_WORK_PENDING, /* tell worker that some device has pending work */
};
struct drbd_resource {
char *name;
+#ifdef CONFIG_DEBUG_FS
+ struct dentry *debugfs_res;
+ struct dentry *debugfs_res_volumes;
+ struct dentry *debugfs_res_connections;
+ struct dentry *debugfs_res_in_flight_summary;
+#endif
struct kref kref;
struct idr devices; /* volume number to device mapping */
struct list_head connections;
@@ -594,12 +692,28 @@ struct drbd_resource {
unsigned susp_nod:1; /* IO suspended because no data */
unsigned susp_fen:1; /* IO suspended because fence peer handler runs */
+ enum write_ordering_e write_ordering;
+
cpumask_var_t cpu_mask;
};
+struct drbd_thread_timing_details
+{
+ unsigned long start_jif;
+ void *cb_addr;
+ const char *caller_fn;
+ unsigned int line;
+ unsigned int cb_nr;
+};
+
struct drbd_connection {
struct list_head connections;
struct drbd_resource *resource;
+#ifdef CONFIG_DEBUG_FS
+ struct dentry *debugfs_conn;
+ struct dentry *debugfs_conn_callback_history;
+ struct dentry *debugfs_conn_oldest_requests;
+#endif
struct kref kref;
struct idr peer_devices; /* volume number to peer device mapping */
enum drbd_conns cstate; /* Only C_STANDALONE to C_WF_REPORT_PARAMS */
@@ -636,7 +750,6 @@ struct drbd_connection {
struct drbd_epoch *current_epoch;
spinlock_t epoch_lock;
unsigned int epochs;
- enum write_ordering_e write_ordering;
atomic_t current_tle_nr; /* transfer log epoch number */
unsigned current_tle_writes; /* writes seen within this tl epoch */
@@ -645,9 +758,22 @@ struct drbd_connection {
struct drbd_thread worker;
struct drbd_thread asender;
+ /* cached pointers,
+ * so we can look up the oldest pending requests more quickly.
+ * protected by resource->req_lock */
+ struct drbd_request *req_next; /* DRBD 9: todo.req_next */
+ struct drbd_request *req_ack_pending;
+ struct drbd_request *req_not_net_done;
+
/* sender side */
struct drbd_work_queue sender_work;
+#define DRBD_THREAD_DETAILS_HIST 16
+ unsigned int w_cb_nr; /* keeps counting up */
+ unsigned int r_cb_nr; /* keeps counting up */
+ struct drbd_thread_timing_details w_timing_details[DRBD_THREAD_DETAILS_HIST];
+ struct drbd_thread_timing_details r_timing_details[DRBD_THREAD_DETAILS_HIST];
+
struct {
/* whether this sender thread
* has processed a single write yet. */
@@ -663,11 +789,22 @@ struct drbd_connection {
} send;
};
+void __update_timing_details(
+ struct drbd_thread_timing_details *tdp,
+ unsigned int *cb_nr,
+ void *cb,
+ const char *fn, const unsigned int line);
+
+#define update_worker_timing_details(c, cb) \
+ __update_timing_details(c->w_timing_details, &c->w_cb_nr, cb, __func__ , __LINE__ )
+#define update_receiver_timing_details(c, cb) \
+ __update_timing_details(c->r_timing_details, &c->r_cb_nr, cb, __func__ , __LINE__ )
+
struct submit_worker {
struct workqueue_struct *wq;
struct work_struct worker;
- spinlock_t lock;
+ /* protected by ..->resource->req_lock */
struct list_head writes;
};
@@ -675,12 +812,29 @@ struct drbd_peer_device {
struct list_head peer_devices;
struct drbd_device *device;
struct drbd_connection *connection;
+#ifdef CONFIG_DEBUG_FS
+ struct dentry *debugfs_peer_dev;
+#endif
};
struct drbd_device {
struct drbd_resource *resource;
struct list_head peer_devices;
- int vnr; /* volume number within the connection */
+ struct list_head pending_bitmap_io;
+
+ unsigned long flush_jif;
+#ifdef CONFIG_DEBUG_FS
+ struct dentry *debugfs_minor;
+ struct dentry *debugfs_vol;
+ struct dentry *debugfs_vol_oldest_requests;
+ struct dentry *debugfs_vol_act_log_extents;
+ struct dentry *debugfs_vol_resync_extents;
+ struct dentry *debugfs_vol_data_gen_id;
+#endif
+
+ unsigned int vnr; /* volume number within the connection */
+ unsigned int minor; /* device minor number */
+
struct kref kref;
/* things that are stored as / read from meta data on disk */
@@ -697,19 +851,10 @@ struct drbd_device {
unsigned long last_reattach_jif;
struct drbd_work resync_work;
struct drbd_work unplug_work;
- struct drbd_work go_diskless;
- struct drbd_work md_sync_work;
- struct drbd_work start_resync_work;
struct timer_list resync_timer;
struct timer_list md_sync_timer;
struct timer_list start_resync_timer;
struct timer_list request_timer;
-#ifdef DRBD_DEBUG_MD_SYNC
- struct {
- unsigned int line;
- const char* func;
- } last_md_mark_dirty;
-#endif
/* Used after attach while negotiating new disk state. */
union drbd_state new_state_tmp;
@@ -724,6 +869,7 @@ struct drbd_device {
unsigned int al_writ_cnt;
unsigned int bm_writ_cnt;
atomic_t ap_bio_cnt; /* Requests we need to complete */
+ atomic_t ap_actlog_cnt; /* Requests waiting for activity log */
atomic_t ap_pending_cnt; /* AP data packets on the wire, ack expected */
atomic_t rs_pending_cnt; /* RS request/data packets on the wire */
atomic_t unacked_cnt; /* Need to send replies for */
@@ -733,6 +879,13 @@ struct drbd_device {
struct rb_root read_requests;
struct rb_root write_requests;
+ /* for statistics and timeouts */
+ /* [0] read, [1] write */
+ struct list_head pending_master_completion[2];
+ struct list_head pending_completion[2];
+
+ /* use checksums for *this* resync */
+ bool use_csums;
/* blocks to resync in this run [unit BM_BLOCK_SIZE] */
unsigned long rs_total;
/* number of resync blocks that failed in this run */
@@ -788,9 +941,7 @@ struct drbd_device {
atomic_t pp_in_use; /* allocated from page pool */
atomic_t pp_in_use_by_net; /* sendpage()d, still referenced by tcp */
wait_queue_head_t ee_wait;
- struct page *md_io_page; /* one page buffer for md_io */
struct drbd_md_io md_io;
- atomic_t md_io_in_use; /* protects the md_io, md_io_page and md_io_tmpp */
spinlock_t al_lock;
wait_queue_head_t al_wait;
struct lru_cache *act_log; /* activity log */
@@ -800,7 +951,6 @@ struct drbd_device {
atomic_t packet_seq;
unsigned int peer_seq;
spinlock_t peer_seq_lock;
- unsigned int minor;
unsigned long comm_bm_set; /* communicated number of set bits. */
struct bm_io_work bm_io_work;
u64 ed_uuid; /* UUID of the exposed data */
@@ -824,6 +974,21 @@ struct drbd_device {
struct submit_worker submit;
};
+struct drbd_bm_aio_ctx {
+ struct drbd_device *device;
+ struct list_head list; /* on device->pending_bitmap_io */;
+ unsigned long start_jif;
+ atomic_t in_flight;
+ unsigned int done;
+ unsigned flags;
+#define BM_AIO_COPY_PAGES 1
+#define BM_AIO_WRITE_HINTED 2
+#define BM_AIO_WRITE_ALL_PAGES 4
+#define BM_AIO_READ 8
+ int error;
+ struct kref kref;
+};
+
struct drbd_config_context {
/* assigned from drbd_genlmsghdr */
unsigned int minor;
@@ -949,7 +1114,7 @@ extern int drbd_send_ov_request(struct drbd_peer_device *, sector_t sector, int
extern int drbd_send_bitmap(struct drbd_device *device);
extern void drbd_send_sr_reply(struct drbd_peer_device *, enum drbd_state_rv retcode);
extern void conn_send_sr_reply(struct drbd_connection *connection, enum drbd_state_rv retcode);
-extern void drbd_free_bc(struct drbd_backing_dev *ldev);
+extern void drbd_free_ldev(struct drbd_backing_dev *ldev);
extern void drbd_device_cleanup(struct drbd_device *device);
void drbd_print_uuids(struct drbd_device *device, const char *text);
@@ -966,13 +1131,7 @@ extern void __drbd_uuid_set(struct drbd_device *device, int idx, u64 val) __must
extern void drbd_md_set_flag(struct drbd_device *device, int flags) __must_hold(local);
extern void drbd_md_clear_flag(struct drbd_device *device, int flags)__must_hold(local);
extern int drbd_md_test_flag(struct drbd_backing_dev *, int);
-#ifndef DRBD_DEBUG_MD_SYNC
extern void drbd_md_mark_dirty(struct drbd_device *device);
-#else
-#define drbd_md_mark_dirty(m) drbd_md_mark_dirty_(m, __LINE__ , __func__ )
-extern void drbd_md_mark_dirty_(struct drbd_device *device,
- unsigned int line, const char *func);
-#endif
extern void drbd_queue_bitmap_io(struct drbd_device *device,
int (*io_fn)(struct drbd_device *),
void (*done)(struct drbd_device *, int),
@@ -983,9 +1142,8 @@ extern int drbd_bitmap_io(struct drbd_device *device,
extern int drbd_bitmap_io_from_worker(struct drbd_device *device,
int (*io_fn)(struct drbd_device *),
char *why, enum bm_flag flags);
-extern int drbd_bmio_set_n_write(struct drbd_device *device);
-extern int drbd_bmio_clear_n_write(struct drbd_device *device);
-extern void drbd_ldev_destroy(struct drbd_device *device);
+extern int drbd_bmio_set_n_write(struct drbd_device *device) __must_hold(local);
+extern int drbd_bmio_clear_n_write(struct drbd_device *device) __must_hold(local);
/* Meta data layout
*
@@ -1105,17 +1263,21 @@ struct bm_extent {
/* in which _bitmap_ extent (resp. sector) the bit for a certain
* _storage_ sector is located in */
#define BM_SECT_TO_EXT(x) ((x)>>(BM_EXT_SHIFT-9))
+#define BM_BIT_TO_EXT(x) ((x) >> (BM_EXT_SHIFT - BM_BLOCK_SHIFT))
-/* how much _storage_ sectors we have per bitmap sector */
+/* first storage sector a bitmap extent corresponds to */
#define BM_EXT_TO_SECT(x) ((sector_t)(x) << (BM_EXT_SHIFT-9))
+/* how much _storage_ sectors we have per bitmap extent */
#define BM_SECT_PER_EXT BM_EXT_TO_SECT(1)
+/* how many bits are covered by one bitmap extent (resync extent) */
+#define BM_BITS_PER_EXT (1UL << (BM_EXT_SHIFT - BM_BLOCK_SHIFT))
+
+#define BM_BLOCKS_PER_BM_EXT_MASK (BM_BITS_PER_EXT - 1)
+
/* in one sector of the bitmap, we have this many activity_log extents. */
#define AL_EXT_PER_BM_SECT (1 << (BM_EXT_SHIFT - AL_EXTENT_SHIFT))
-#define BM_BLOCKS_PER_BM_EXT_B (BM_EXT_SHIFT - BM_BLOCK_SHIFT)
-#define BM_BLOCKS_PER_BM_EXT_MASK ((1<<BM_BLOCKS_PER_BM_EXT_B) - 1)
-
/* the extent in "PER_EXTENT" below is an activity log extent
* we need that many (long words/bytes) to store the bitmap
* of one AL_EXTENT_SIZE chunk of storage.
@@ -1195,11 +1357,11 @@ extern void _drbd_bm_set_bits(struct drbd_device *device,
const unsigned long s, const unsigned long e);
extern int drbd_bm_test_bit(struct drbd_device *device, unsigned long bitnr);
extern int drbd_bm_e_weight(struct drbd_device *device, unsigned long enr);
-extern int drbd_bm_write_page(struct drbd_device *device, unsigned int idx) __must_hold(local);
extern int drbd_bm_read(struct drbd_device *device) __must_hold(local);
extern void drbd_bm_mark_for_writeout(struct drbd_device *device, int page_nr);
extern int drbd_bm_write(struct drbd_device *device) __must_hold(local);
extern int drbd_bm_write_hinted(struct drbd_device *device) __must_hold(local);
+extern int drbd_bm_write_lazy(struct drbd_device *device, unsigned upper_idx) __must_hold(local);
extern int drbd_bm_write_all(struct drbd_device *device) __must_hold(local);
extern int drbd_bm_write_copy_pages(struct drbd_device *device) __must_hold(local);
extern size_t drbd_bm_words(struct drbd_device *device);
@@ -1213,7 +1375,6 @@ extern unsigned long _drbd_bm_find_next(struct drbd_device *device, unsigned lon
extern unsigned long _drbd_bm_find_next_zero(struct drbd_device *device, unsigned long bm_fo);
extern unsigned long _drbd_bm_total_weight(struct drbd_device *device);
extern unsigned long drbd_bm_total_weight(struct drbd_device *device);
-extern int drbd_bm_rs_done(struct drbd_device *device);
/* for receive_bitmap */
extern void drbd_bm_merge_lel(struct drbd_device *device, size_t offset,
size_t number, unsigned long *buffer);
@@ -1312,7 +1473,7 @@ enum determine_dev_size {
extern enum determine_dev_size
drbd_determine_dev_size(struct drbd_device *, enum dds_flags, struct resize_parms *) __must_hold(local);
extern void resync_after_online_grow(struct drbd_device *);
-extern void drbd_reconsider_max_bio_size(struct drbd_device *device);
+extern void drbd_reconsider_max_bio_size(struct drbd_device *device, struct drbd_backing_dev *bdev);
extern enum drbd_state_rv drbd_set_role(struct drbd_device *device,
enum drbd_role new_role,
int force);
@@ -1333,7 +1494,7 @@ extern void resume_next_sg(struct drbd_device *device);
extern void suspend_other_sg(struct drbd_device *device);
extern int drbd_resync_finished(struct drbd_device *device);
/* maybe rather drbd_main.c ? */
-extern void *drbd_md_get_buffer(struct drbd_device *device);
+extern void *drbd_md_get_buffer(struct drbd_device *device, const char *intent);
extern void drbd_md_put_buffer(struct drbd_device *device);
extern int drbd_md_sync_page_io(struct drbd_device *device,
struct drbd_backing_dev *bdev, sector_t sector, int rw);
@@ -1380,7 +1541,8 @@ extern void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req);
extern int drbd_receiver(struct drbd_thread *thi);
extern int drbd_asender(struct drbd_thread *thi);
extern bool drbd_rs_c_min_rate_throttle(struct drbd_device *device);
-extern bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector);
+extern bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector,
+ bool throttle_if_app_is_waiting);
extern int drbd_submit_peer_request(struct drbd_device *,
struct drbd_peer_request *, const unsigned,
const int);
@@ -1464,10 +1626,7 @@ static inline void drbd_generic_make_request(struct drbd_device *device,
{
__release(local);
if (!bio->bi_bdev) {
- printk(KERN_ERR "drbd%d: drbd_generic_make_request: "
- "bio->bi_bdev == NULL\n",
- device_to_minor(device));
- dump_stack();
+ drbd_err(device, "drbd_generic_make_request: bio->bi_bdev == NULL\n");
bio_endio(bio, -ENODEV);
return;
}
@@ -1478,7 +1637,8 @@ static inline void drbd_generic_make_request(struct drbd_device *device,
generic_make_request(bio);
}
-void drbd_bump_write_ordering(struct drbd_connection *connection, enum write_ordering_e wo);
+void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
+ enum write_ordering_e wo);
/* drbd_proc.c */
extern struct proc_dir_entry *drbd_proc;
@@ -1489,9 +1649,9 @@ extern const char *drbd_role_str(enum drbd_role s);
/* drbd_actlog.c */
extern bool drbd_al_begin_io_prepare(struct drbd_device *device, struct drbd_interval *i);
extern int drbd_al_begin_io_nonblock(struct drbd_device *device, struct drbd_interval *i);
-extern void drbd_al_begin_io_commit(struct drbd_device *device, bool delegate);
+extern void drbd_al_begin_io_commit(struct drbd_device *device);
extern bool drbd_al_begin_io_fastpath(struct drbd_device *device, struct drbd_interval *i);
-extern void drbd_al_begin_io(struct drbd_device *device, struct drbd_interval *i, bool delegate);
+extern void drbd_al_begin_io(struct drbd_device *device, struct drbd_interval *i);
extern void drbd_al_complete_io(struct drbd_device *device, struct drbd_interval *i);
extern void drbd_rs_complete_io(struct drbd_device *device, sector_t sector);
extern int drbd_rs_begin_io(struct drbd_device *device, sector_t sector);
@@ -1501,14 +1661,17 @@ extern int drbd_rs_del_all(struct drbd_device *device);
extern void drbd_rs_failed_io(struct drbd_device *device,
sector_t sector, int size);
extern void drbd_advance_rs_marks(struct drbd_device *device, unsigned long still_to_go);
-extern void __drbd_set_in_sync(struct drbd_device *device, sector_t sector,
- int size, const char *file, const unsigned int line);
+
+enum update_sync_bits_mode { RECORD_RS_FAILED, SET_OUT_OF_SYNC, SET_IN_SYNC };
+extern int __drbd_change_sync(struct drbd_device *device, sector_t sector, int size,
+ enum update_sync_bits_mode mode,
+ const char *file, const unsigned int line);
#define drbd_set_in_sync(device, sector, size) \
- __drbd_set_in_sync(device, sector, size, __FILE__, __LINE__)
-extern int __drbd_set_out_of_sync(struct drbd_device *device, sector_t sector,
- int size, const char *file, const unsigned int line);
+ __drbd_change_sync(device, sector, size, SET_IN_SYNC, __FILE__, __LINE__)
#define drbd_set_out_of_sync(device, sector, size) \
- __drbd_set_out_of_sync(device, sector, size, __FILE__, __LINE__)
+ __drbd_change_sync(device, sector, size, SET_OUT_OF_SYNC, __FILE__, __LINE__)
+#define drbd_rs_failed_io(device, sector, size) \
+ __drbd_change_sync(device, sector, size, RECORD_RS_FAILED, __FILE__, __LINE__)
extern void drbd_al_shrink(struct drbd_device *device);
extern int drbd_initialize_al(struct drbd_device *, void *);
@@ -1764,25 +1927,38 @@ static inline sector_t drbd_md_ss(struct drbd_backing_dev *bdev)
}
static inline void
-drbd_queue_work_front(struct drbd_work_queue *q, struct drbd_work *w)
+drbd_queue_work(struct drbd_work_queue *q, struct drbd_work *w)
{
unsigned long flags;
spin_lock_irqsave(&q->q_lock, flags);
- list_add(&w->list, &q->q);
+ list_add_tail(&w->list, &q->q);
spin_unlock_irqrestore(&q->q_lock, flags);
wake_up(&q->q_wait);
}
static inline void
-drbd_queue_work(struct drbd_work_queue *q, struct drbd_work *w)
+drbd_queue_work_if_unqueued(struct drbd_work_queue *q, struct drbd_work *w)
{
unsigned long flags;
spin_lock_irqsave(&q->q_lock, flags);
- list_add_tail(&w->list, &q->q);
+ if (list_empty_careful(&w->list))
+ list_add_tail(&w->list, &q->q);
spin_unlock_irqrestore(&q->q_lock, flags);
wake_up(&q->q_wait);
}
+static inline void
+drbd_device_post_work(struct drbd_device *device, int work_bit)
+{
+ if (!test_and_set_bit(work_bit, &device->flags)) {
+ struct drbd_connection *connection =
+ first_peer_device(device)->connection;
+ struct drbd_work_queue *q = &connection->sender_work;
+ if (!test_and_set_bit(DEVICE_WORK_PENDING, &connection->flags))
+ wake_up(&q->q_wait);
+ }
+}
+
extern void drbd_flush_workqueue(struct drbd_work_queue *work_queue);
static inline void wake_asender(struct drbd_connection *connection)
@@ -1859,7 +2035,7 @@ static inline void inc_ap_pending(struct drbd_device *device)
func, line, \
atomic_read(&device->which))
-#define dec_ap_pending(device) _dec_ap_pending(device, __FUNCTION__, __LINE__)
+#define dec_ap_pending(device) _dec_ap_pending(device, __func__, __LINE__)
static inline void _dec_ap_pending(struct drbd_device *device, const char *func, int line)
{
if (atomic_dec_and_test(&device->ap_pending_cnt))
@@ -1878,7 +2054,7 @@ static inline void inc_rs_pending(struct drbd_device *device)
atomic_inc(&device->rs_pending_cnt);
}
-#define dec_rs_pending(device) _dec_rs_pending(device, __FUNCTION__, __LINE__)
+#define dec_rs_pending(device) _dec_rs_pending(device, __func__, __LINE__)
static inline void _dec_rs_pending(struct drbd_device *device, const char *func, int line)
{
atomic_dec(&device->rs_pending_cnt);
@@ -1899,20 +2075,29 @@ static inline void inc_unacked(struct drbd_device *device)
atomic_inc(&device->unacked_cnt);
}
-#define dec_unacked(device) _dec_unacked(device, __FUNCTION__, __LINE__)
+#define dec_unacked(device) _dec_unacked(device, __func__, __LINE__)
static inline void _dec_unacked(struct drbd_device *device, const char *func, int line)
{
atomic_dec(&device->unacked_cnt);
ERR_IF_CNT_IS_NEGATIVE(unacked_cnt, func, line);
}
-#define sub_unacked(device, n) _sub_unacked(device, n, __FUNCTION__, __LINE__)
+#define sub_unacked(device, n) _sub_unacked(device, n, __func__, __LINE__)
static inline void _sub_unacked(struct drbd_device *device, int n, const char *func, int line)
{
atomic_sub(n, &device->unacked_cnt);
ERR_IF_CNT_IS_NEGATIVE(unacked_cnt, func, line);
}
+static inline bool is_sync_state(enum drbd_conns connection_state)
+{
+ return
+ (connection_state == C_SYNC_SOURCE
+ || connection_state == C_SYNC_TARGET
+ || connection_state == C_PAUSED_SYNC_S
+ || connection_state == C_PAUSED_SYNC_T);
+}
+
/**
* get_ldev() - Increase the ref count on device->ldev. Returns 0 if there is no ldev
* @M: DRBD device.
@@ -1924,6 +2109,11 @@ static inline void _sub_unacked(struct drbd_device *device, int n, const char *f
static inline void put_ldev(struct drbd_device *device)
{
+ enum drbd_disk_state ds = device->state.disk;
+ /* We must check the state *before* the atomic_dec becomes visible,
+ * or we have a theoretical race where someone hitting zero,
+ * while state still D_FAILED, will then see D_DISKLESS in the
+ * condition below and calling into destroy, where he must not, yet. */
int i = atomic_dec_return(&device->local_cnt);
/* This may be called from some endio handler,
@@ -1932,15 +2122,13 @@ static inline void put_ldev(struct drbd_device *device)
__release(local);
D_ASSERT(device, i >= 0);
if (i == 0) {
- if (device->state.disk == D_DISKLESS)
+ if (ds == D_DISKLESS)
/* even internal references gone, safe to destroy */
- drbd_ldev_destroy(device);
- if (device->state.disk == D_FAILED) {
+ drbd_device_post_work(device, DESTROY_DISK);
+ if (ds == D_FAILED)
/* all application IO references gone. */
- if (!test_and_set_bit(GO_DISKLESS, &device->flags))
- drbd_queue_work(&first_peer_device(device)->connection->sender_work,
- &device->go_diskless);
- }
+ if (!test_and_set_bit(GOING_DISKLESS, &device->flags))
+ drbd_device_post_work(device, GO_DISKLESS);
wake_up(&device->misc_wait);
}
}
@@ -1964,54 +2152,6 @@ static inline int _get_ldev_if_state(struct drbd_device *device, enum drbd_disk_
extern int _get_ldev_if_state(struct drbd_device *device, enum drbd_disk_state mins);
#endif
-/* you must have an "get_ldev" reference */
-static inline void drbd_get_syncer_progress(struct drbd_device *device,
- unsigned long *bits_left, unsigned int *per_mil_done)
-{
- /* this is to break it at compile time when we change that, in case we
- * want to support more than (1<<32) bits on a 32bit arch. */
- typecheck(unsigned long, device->rs_total);
-
- /* note: both rs_total and rs_left are in bits, i.e. in
- * units of BM_BLOCK_SIZE.
- * for the percentage, we don't care. */
-
- if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
- *bits_left = device->ov_left;
- else
- *bits_left = drbd_bm_total_weight(device) - device->rs_failed;
- /* >> 10 to prevent overflow,
- * +1 to prevent division by zero */
- if (*bits_left > device->rs_total) {
- /* doh. maybe a logic bug somewhere.
- * may also be just a race condition
- * between this and a disconnect during sync.
- * for now, just prevent in-kernel buffer overflow.
- */
- smp_rmb();
- drbd_warn(device, "cs:%s rs_left=%lu > rs_total=%lu (rs_failed %lu)\n",
- drbd_conn_str(device->state.conn),
- *bits_left, device->rs_total, device->rs_failed);
- *per_mil_done = 0;
- } else {
- /* Make sure the division happens in long context.
- * We allow up to one petabyte storage right now,
- * at a granularity of 4k per bit that is 2**38 bits.
- * After shift right and multiplication by 1000,
- * this should still fit easily into a 32bit long,
- * so we don't need a 64bit division on 32bit arch.
- * Note: currently we don't support such large bitmaps on 32bit
- * arch anyways, but no harm done to be prepared for it here.
- */
- unsigned int shift = device->rs_total > UINT_MAX ? 16 : 10;
- unsigned long left = *bits_left >> shift;
- unsigned long total = 1UL + (device->rs_total >> shift);
- unsigned long tmp = 1000UL - left * 1000UL/total;
- *per_mil_done = tmp;
- }
-}
-
-
/* this throttles on-the-fly application requests
* according to max_buffers settings;
* maybe re-implement using semaphores? */
@@ -2201,25 +2341,6 @@ static inline int drbd_queue_order_type(struct drbd_device *device)
return QUEUE_ORDERED_NONE;
}
-static inline void drbd_md_flush(struct drbd_device *device)
-{
- int r;
-
- if (device->ldev == NULL) {
- drbd_warn(device, "device->ldev == NULL in drbd_md_flush\n");
- return;
- }
-
- if (test_bit(MD_NO_FUA, &device->flags))
- return;
-
- r = blkdev_issue_flush(device->ldev->md_bdev, GFP_NOIO, NULL);
- if (r) {
- set_bit(MD_NO_FUA, &device->flags);
- drbd_err(device, "meta data flush failed with status %d, disabling md-flushes\n", r);
- }
-}
-
static inline struct drbd_connection *first_connection(struct drbd_resource *resource)
{
return list_first_entry_or_null(&resource->connections,
diff --git a/drivers/block/drbd/drbd_interval.h b/drivers/block/drbd/drbd_interval.h
index f38fcb00c10d..f210543f05f4 100644
--- a/drivers/block/drbd/drbd_interval.h
+++ b/drivers/block/drbd/drbd_interval.h
@@ -10,7 +10,9 @@ struct drbd_interval {
unsigned int size; /* size in bytes */
sector_t end; /* highest interval end in subtree */
int local:1 /* local or remote request? */;
- int waiting:1;
+ int waiting:1; /* someone is waiting for this to complete */
+ int completed:1; /* this has been completed already;
+ * ignore for conflict detection */
};
static inline void drbd_clear_interval(struct drbd_interval *i)
diff --git a/drivers/block/drbd/drbd_main.c b/drivers/block/drbd/drbd_main.c
index 960645c26e6f..9b465bb68487 100644
--- a/drivers/block/drbd/drbd_main.c
+++ b/drivers/block/drbd/drbd_main.c
@@ -26,7 +26,10 @@
*/
+#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
+
#include <linux/module.h>
+#include <linux/jiffies.h>
#include <linux/drbd.h>
#include <asm/uaccess.h>
#include <asm/types.h>
@@ -54,16 +57,14 @@
#include "drbd_int.h"
#include "drbd_protocol.h"
#include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */
-
#include "drbd_vli.h"
+#include "drbd_debugfs.h"
static DEFINE_MUTEX(drbd_main_mutex);
static int drbd_open(struct block_device *bdev, fmode_t mode);
static void drbd_release(struct gendisk *gd, fmode_t mode);
-static int w_md_sync(struct drbd_work *w, int unused);
static void md_sync_timer_fn(unsigned long data);
static int w_bitmap_io(struct drbd_work *w, int unused);
-static int w_go_diskless(struct drbd_work *w, int unused);
MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
"Lars Ellenberg <lars@linbit.com>");
@@ -264,7 +265,7 @@ bail:
/**
* _tl_restart() - Walks the transfer log, and applies an action to all requests
- * @device: DRBD device.
+ * @connection: DRBD connection to operate on.
* @what: The action/event to perform with all request objects
*
* @what might be one of CONNECTION_LOST_WHILE_PENDING, RESEND, FAIL_FROZEN_DISK_IO,
@@ -662,6 +663,11 @@ static int __send_command(struct drbd_connection *connection, int vnr,
msg_flags);
if (data && !err)
err = drbd_send_all(connection, sock->socket, data, size, 0);
+ /* DRBD protocol "pings" are latency critical.
+ * This is supposed to trigger tcp_push_pending_frames() */
+ if (!err && (cmd == P_PING || cmd == P_PING_ACK))
+ drbd_tcp_nodelay(sock->socket);
+
return err;
}
@@ -1636,7 +1642,10 @@ int drbd_send_dblock(struct drbd_peer_device *peer_device, struct drbd_request *
if (peer_device->connection->agreed_pro_version >= 100) {
if (req->rq_state & RQ_EXP_RECEIVE_ACK)
dp_flags |= DP_SEND_RECEIVE_ACK;
- if (req->rq_state & RQ_EXP_WRITE_ACK)
+ /* During resync, request an explicit write ack,
+ * even in protocol != C */
+ if (req->rq_state & RQ_EXP_WRITE_ACK
+ || (dp_flags & DP_MAY_SET_IN_SYNC))
dp_flags |= DP_SEND_WRITE_ACK;
}
p->dp_flags = cpu_to_be32(dp_flags);
@@ -1900,6 +1909,7 @@ void drbd_init_set_defaults(struct drbd_device *device)
drbd_set_defaults(device);
atomic_set(&device->ap_bio_cnt, 0);
+ atomic_set(&device->ap_actlog_cnt, 0);
atomic_set(&device->ap_pending_cnt, 0);
atomic_set(&device->rs_pending_cnt, 0);
atomic_set(&device->unacked_cnt, 0);
@@ -1908,7 +1918,7 @@ void drbd_init_set_defaults(struct drbd_device *device)
atomic_set(&device->rs_sect_in, 0);
atomic_set(&device->rs_sect_ev, 0);
atomic_set(&device->ap_in_flight, 0);
- atomic_set(&device->md_io_in_use, 0);
+ atomic_set(&device->md_io.in_use, 0);
mutex_init(&device->own_state_mutex);
device->state_mutex = &device->own_state_mutex;
@@ -1924,17 +1934,15 @@ void drbd_init_set_defaults(struct drbd_device *device)
INIT_LIST_HEAD(&device->resync_reads);
INIT_LIST_HEAD(&device->resync_work.list);
INIT_LIST_HEAD(&device->unplug_work.list);
- INIT_LIST_HEAD(&device->go_diskless.list);
- INIT_LIST_HEAD(&device->md_sync_work.list);
- INIT_LIST_HEAD(&device->start_resync_work.list);
INIT_LIST_HEAD(&device->bm_io_work.w.list);
+ INIT_LIST_HEAD(&device->pending_master_completion[0]);
+ INIT_LIST_HEAD(&device->pending_master_completion[1]);
+ INIT_LIST_HEAD(&device->pending_completion[0]);
+ INIT_LIST_HEAD(&device->pending_completion[1]);
device->resync_work.cb = w_resync_timer;
device->unplug_work.cb = w_send_write_hint;
- device->go_diskless.cb = w_go_diskless;
- device->md_sync_work.cb = w_md_sync;
device->bm_io_work.w.cb = w_bitmap_io;
- device->start_resync_work.cb = w_start_resync;
init_timer(&device->resync_timer);
init_timer(&device->md_sync_timer);
@@ -1992,7 +2000,7 @@ void drbd_device_cleanup(struct drbd_device *device)
drbd_bm_cleanup(device);
}
- drbd_free_bc(device->ldev);
+ drbd_free_ldev(device->ldev);
device->ldev = NULL;
clear_bit(AL_SUSPENDED, &device->flags);
@@ -2006,7 +2014,6 @@ void drbd_device_cleanup(struct drbd_device *device)
D_ASSERT(device, list_empty(&first_peer_device(device)->connection->sender_work.q));
D_ASSERT(device, list_empty(&device->resync_work.list));
D_ASSERT(device, list_empty(&device->unplug_work.list));
- D_ASSERT(device, list_empty(&device->go_diskless.list));
drbd_set_defaults(device);
}
@@ -2129,20 +2136,6 @@ Enomem:
return -ENOMEM;
}
-static int drbd_notify_sys(struct notifier_block *this, unsigned long code,
- void *unused)
-{
- /* just so we have it. you never know what interesting things we
- * might want to do here some day...
- */
-
- return NOTIFY_DONE;
-}
-
-static struct notifier_block drbd_notifier = {
- .notifier_call = drbd_notify_sys,
-};
-
static void drbd_release_all_peer_reqs(struct drbd_device *device)
{
int rr;
@@ -2173,7 +2166,7 @@ void drbd_destroy_device(struct kref *kref)
{
struct drbd_device *device = container_of(kref, struct drbd_device, kref);
struct drbd_resource *resource = device->resource;
- struct drbd_connection *connection;
+ struct drbd_peer_device *peer_device, *tmp_peer_device;
del_timer_sync(&device->request_timer);
@@ -2187,7 +2180,7 @@ void drbd_destroy_device(struct kref *kref)
if (device->this_bdev)
bdput(device->this_bdev);
- drbd_free_bc(device->ldev);
+ drbd_free_ldev(device->ldev);
device->ldev = NULL;
drbd_release_all_peer_reqs(device);
@@ -2200,15 +2193,20 @@ void drbd_destroy_device(struct kref *kref)
if (device->bitmap) /* should no longer be there. */
drbd_bm_cleanup(device);
- __free_page(device->md_io_page);
+ __free_page(device->md_io.page);
put_disk(device->vdisk);
blk_cleanup_queue(device->rq_queue);
kfree(device->rs_plan_s);
- kfree(first_peer_device(device));
- kfree(device);
- for_each_connection(connection, resource)
- kref_put(&connection->kref, drbd_destroy_connection);
+ /* not for_each_connection(connection, resource):
+ * those may have been cleaned up and disassociated already.
+ */
+ for_each_peer_device_safe(peer_device, tmp_peer_device, device) {
+ kref_put(&peer_device->connection->kref, drbd_destroy_connection);
+ kfree(peer_device);
+ }
+ memset(device, 0xfd, sizeof(*device));
+ kfree(device);
kref_put(&resource->kref, drbd_destroy_resource);
}
@@ -2236,7 +2234,7 @@ static void do_retry(struct work_struct *ws)
list_for_each_entry_safe(req, tmp, &writes, tl_requests) {
struct drbd_device *device = req->device;
struct bio *bio = req->master_bio;
- unsigned long start_time = req->start_time;
+ unsigned long start_jif = req->start_jif;
bool expected;
expected =
@@ -2271,10 +2269,12 @@ static void do_retry(struct work_struct *ws)
/* We are not just doing generic_make_request(),
* as we want to keep the start_time information. */
inc_ap_bio(device);
- __drbd_make_request(device, bio, start_time);
+ __drbd_make_request(device, bio, start_jif);
}
}
+/* called via drbd_req_put_completion_ref(),
+ * holds resource->req_lock */
void drbd_restart_request(struct drbd_request *req)
{
unsigned long flags;
@@ -2298,6 +2298,7 @@ void drbd_destroy_resource(struct kref *kref)
idr_destroy(&resource->devices);
free_cpumask_var(resource->cpu_mask);
kfree(resource->name);
+ memset(resource, 0xf2, sizeof(*resource));
kfree(resource);
}
@@ -2307,8 +2308,10 @@ void drbd_free_resource(struct drbd_resource *resource)
for_each_connection_safe(connection, tmp, resource) {
list_del(&connection->connections);
+ drbd_debugfs_connection_cleanup(connection);
kref_put(&connection->kref, drbd_destroy_connection);
}
+ drbd_debugfs_resource_cleanup(resource);
kref_put(&resource->kref, drbd_destroy_resource);
}
@@ -2318,8 +2321,6 @@ static void drbd_cleanup(void)
struct drbd_device *device;
struct drbd_resource *resource, *tmp;
- unregister_reboot_notifier(&drbd_notifier);
-
/* first remove proc,
* drbdsetup uses it's presence to detect
* whether DRBD is loaded.
@@ -2335,6 +2336,7 @@ static void drbd_cleanup(void)
destroy_workqueue(retry.wq);
drbd_genl_unregister();
+ drbd_debugfs_cleanup();
idr_for_each_entry(&drbd_devices, device, i)
drbd_delete_device(device);
@@ -2350,7 +2352,7 @@ static void drbd_cleanup(void)
idr_destroy(&drbd_devices);
- printk(KERN_INFO "drbd: module cleanup done.\n");
+ pr_info("module cleanup done.\n");
}
/**
@@ -2539,6 +2541,20 @@ int set_resource_options(struct drbd_resource *resource, struct res_opts *res_op
if (nr_cpu_ids > 1 && res_opts->cpu_mask[0] != 0) {
err = bitmap_parse(res_opts->cpu_mask, DRBD_CPU_MASK_SIZE,
cpumask_bits(new_cpu_mask), nr_cpu_ids);
+ if (err == -EOVERFLOW) {
+ /* So what. mask it out. */
+ cpumask_var_t tmp_cpu_mask;
+ if (zalloc_cpumask_var(&tmp_cpu_mask, GFP_KERNEL)) {
+ cpumask_setall(tmp_cpu_mask);
+ cpumask_and(new_cpu_mask, new_cpu_mask, tmp_cpu_mask);
+ drbd_warn(resource, "Overflow in bitmap_parse(%.12s%s), truncating to %u bits\n",
+ res_opts->cpu_mask,
+ strlen(res_opts->cpu_mask) > 12 ? "..." : "",
+ nr_cpu_ids);
+ free_cpumask_var(tmp_cpu_mask);
+ err = 0;
+ }
+ }
if (err) {
drbd_warn(resource, "bitmap_parse() failed with %d\n", err);
/* retcode = ERR_CPU_MASK_PARSE; */
@@ -2579,10 +2595,12 @@ struct drbd_resource *drbd_create_resource(const char *name)
kref_init(&resource->kref);
idr_init(&resource->devices);
INIT_LIST_HEAD(&resource->connections);
+ resource->write_ordering = WO_bdev_flush;
list_add_tail_rcu(&resource->resources, &drbd_resources);
mutex_init(&resource->conf_update);
mutex_init(&resource->adm_mutex);
spin_lock_init(&resource->req_lock);
+ drbd_debugfs_resource_add(resource);
return resource;
fail_free_name:
@@ -2593,7 +2611,7 @@ fail:
return NULL;
}
-/* caller must be under genl_lock() */
+/* caller must be under adm_mutex */
struct drbd_connection *conn_create(const char *name, struct res_opts *res_opts)
{
struct drbd_resource *resource;
@@ -2617,7 +2635,6 @@ struct drbd_connection *conn_create(const char *name, struct res_opts *res_opts)
INIT_LIST_HEAD(&connection->current_epoch->list);
connection->epochs = 1;
spin_lock_init(&connection->epoch_lock);
- connection->write_ordering = WO_bdev_flush;
connection->send.seen_any_write_yet = false;
connection->send.current_epoch_nr = 0;
@@ -2652,6 +2669,7 @@ struct drbd_connection *conn_create(const char *name, struct res_opts *res_opts)
kref_get(&resource->kref);
list_add_tail_rcu(&connection->connections, &resource->connections);
+ drbd_debugfs_connection_add(connection);
return connection;
fail_resource:
@@ -2680,6 +2698,7 @@ void drbd_destroy_connection(struct kref *kref)
drbd_free_socket(&connection->data);
kfree(connection->int_dig_in);
kfree(connection->int_dig_vv);
+ memset(connection, 0xfc, sizeof(*connection));
kfree(connection);
kref_put(&resource->kref, drbd_destroy_resource);
}
@@ -2694,7 +2713,6 @@ static int init_submitter(struct drbd_device *device)
return -ENOMEM;
INIT_WORK(&device->submit.worker, do_submit);
- spin_lock_init(&device->submit.lock);
INIT_LIST_HEAD(&device->submit.writes);
return 0;
}
@@ -2764,8 +2782,8 @@ enum drbd_ret_code drbd_create_device(struct drbd_config_context *adm_ctx, unsig
blk_queue_merge_bvec(q, drbd_merge_bvec);
q->queue_lock = &resource->req_lock;
- device->md_io_page = alloc_page(GFP_KERNEL);
- if (!device->md_io_page)
+ device->md_io.page = alloc_page(GFP_KERNEL);
+ if (!device->md_io.page)
goto out_no_io_page;
if (drbd_bm_init(device))
@@ -2794,6 +2812,7 @@ enum drbd_ret_code drbd_create_device(struct drbd_config_context *adm_ctx, unsig
kref_get(&device->kref);
INIT_LIST_HEAD(&device->peer_devices);
+ INIT_LIST_HEAD(&device->pending_bitmap_io);
for_each_connection(connection, resource) {
peer_device = kzalloc(sizeof(struct drbd_peer_device), GFP_KERNEL);
if (!peer_device)
@@ -2829,7 +2848,10 @@ enum drbd_ret_code drbd_create_device(struct drbd_config_context *adm_ctx, unsig
for_each_peer_device(peer_device, device)
drbd_connected(peer_device);
}
-
+ /* move to create_peer_device() */
+ for_each_peer_device(peer_device, device)
+ drbd_debugfs_peer_device_add(peer_device);
+ drbd_debugfs_device_add(device);
return NO_ERROR;
out_idr_remove_vol:
@@ -2853,7 +2875,7 @@ out_idr_remove_minor:
out_no_minor_idr:
drbd_bm_cleanup(device);
out_no_bitmap:
- __free_page(device->md_io_page);
+ __free_page(device->md_io.page);
out_no_io_page:
put_disk(disk);
out_no_disk:
@@ -2868,8 +2890,13 @@ void drbd_delete_device(struct drbd_device *device)
{
struct drbd_resource *resource = device->resource;
struct drbd_connection *connection;
+ struct drbd_peer_device *peer_device;
int refs = 3;
+ /* move to free_peer_device() */
+ for_each_peer_device(peer_device, device)
+ drbd_debugfs_peer_device_cleanup(peer_device);
+ drbd_debugfs_device_cleanup(device);
for_each_connection(connection, resource) {
idr_remove(&connection->peer_devices, device->vnr);
refs++;
@@ -2881,13 +2908,12 @@ void drbd_delete_device(struct drbd_device *device)
kref_sub(&device->kref, refs, drbd_destroy_device);
}
-int __init drbd_init(void)
+static int __init drbd_init(void)
{
int err;
if (minor_count < DRBD_MINOR_COUNT_MIN || minor_count > DRBD_MINOR_COUNT_MAX) {
- printk(KERN_ERR
- "drbd: invalid minor_count (%d)\n", minor_count);
+ pr_err("invalid minor_count (%d)\n", minor_count);
#ifdef MODULE
return -EINVAL;
#else
@@ -2897,14 +2923,11 @@ int __init drbd_init(void)
err = register_blkdev(DRBD_MAJOR, "drbd");
if (err) {
- printk(KERN_ERR
- "drbd: unable to register block device major %d\n",
+ pr_err("unable to register block device major %d\n",
DRBD_MAJOR);
return err;
}
- register_reboot_notifier(&drbd_notifier);
-
/*
* allocate all necessary structs
*/
@@ -2918,7 +2941,7 @@ int __init drbd_init(void)
err = drbd_genl_register();
if (err) {
- printk(KERN_ERR "drbd: unable to register generic netlink family\n");
+ pr_err("unable to register generic netlink family\n");
goto fail;
}
@@ -2929,38 +2952,39 @@ int __init drbd_init(void)
err = -ENOMEM;
drbd_proc = proc_create_data("drbd", S_IFREG | S_IRUGO , NULL, &drbd_proc_fops, NULL);
if (!drbd_proc) {
- printk(KERN_ERR "drbd: unable to register proc file\n");
+ pr_err("unable to register proc file\n");
goto fail;
}
retry.wq = create_singlethread_workqueue("drbd-reissue");
if (!retry.wq) {
- printk(KERN_ERR "drbd: unable to create retry workqueue\n");
+ pr_err("unable to create retry workqueue\n");
goto fail;
}
INIT_WORK(&retry.worker, do_retry);
spin_lock_init(&retry.lock);
INIT_LIST_HEAD(&retry.writes);
- printk(KERN_INFO "drbd: initialized. "
+ if (drbd_debugfs_init())
+ pr_notice("failed to initialize debugfs -- will not be available\n");
+
+ pr_info("initialized. "
"Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
- printk(KERN_INFO "drbd: %s\n", drbd_buildtag());
- printk(KERN_INFO "drbd: registered as block device major %d\n",
- DRBD_MAJOR);
-
+ pr_info("%s\n", drbd_buildtag());
+ pr_info("registered as block device major %d\n", DRBD_MAJOR);
return 0; /* Success! */
fail:
drbd_cleanup();
if (err == -ENOMEM)
- printk(KERN_ERR "drbd: ran out of memory\n");
+ pr_err("ran out of memory\n");
else
- printk(KERN_ERR "drbd: initialization failure\n");
+ pr_err("initialization failure\n");
return err;
}
-void drbd_free_bc(struct drbd_backing_dev *ldev)
+void drbd_free_ldev(struct drbd_backing_dev *ldev)
{
if (ldev == NULL)
return;
@@ -2972,24 +2996,29 @@ void drbd_free_bc(struct drbd_backing_dev *ldev)
kfree(ldev);
}
-void drbd_free_sock(struct drbd_connection *connection)
+static void drbd_free_one_sock(struct drbd_socket *ds)
{
- if (connection->data.socket) {
- mutex_lock(&connection->data.mutex);
- kernel_sock_shutdown(connection->data.socket, SHUT_RDWR);
- sock_release(connection->data.socket);
- connection->data.socket = NULL;
- mutex_unlock(&connection->data.mutex);
- }
- if (connection->meta.socket) {
- mutex_lock(&connection->meta.mutex);
- kernel_sock_shutdown(connection->meta.socket, SHUT_RDWR);
- sock_release(connection->meta.socket);
- connection->meta.socket = NULL;
- mutex_unlock(&connection->meta.mutex);
+ struct socket *s;
+ mutex_lock(&ds->mutex);
+ s = ds->socket;
+ ds->socket = NULL;
+ mutex_unlock(&ds->mutex);
+ if (s) {
+ /* so debugfs does not need to mutex_lock() */
+ synchronize_rcu();
+ kernel_sock_shutdown(s, SHUT_RDWR);
+ sock_release(s);
}
}
+void drbd_free_sock(struct drbd_connection *connection)
+{
+ if (connection->data.socket)
+ drbd_free_one_sock(&connection->data);
+ if (connection->meta.socket)
+ drbd_free_one_sock(&connection->meta);
+}
+
/* meta data management */
void conn_md_sync(struct drbd_connection *connection)
@@ -3093,7 +3122,7 @@ void drbd_md_sync(struct drbd_device *device)
if (!get_ldev_if_state(device, D_FAILED))
return;
- buffer = drbd_md_get_buffer(device);
+ buffer = drbd_md_get_buffer(device, __func__);
if (!buffer)
goto out;
@@ -3253,7 +3282,7 @@ int drbd_md_read(struct drbd_device *device, struct drbd_backing_dev *bdev)
if (device->state.disk != D_DISKLESS)
return ERR_DISK_CONFIGURED;
- buffer = drbd_md_get_buffer(device);
+ buffer = drbd_md_get_buffer(device, __func__);
if (!buffer)
return ERR_NOMEM;
@@ -3466,23 +3495,19 @@ void drbd_uuid_set_bm(struct drbd_device *device, u64 val) __must_hold(local)
*
* Sets all bits in the bitmap and writes the whole bitmap to stable storage.
*/
-int drbd_bmio_set_n_write(struct drbd_device *device)
+int drbd_bmio_set_n_write(struct drbd_device *device) __must_hold(local)
{
int rv = -EIO;
- if (get_ldev_if_state(device, D_ATTACHING)) {
- drbd_md_set_flag(device, MDF_FULL_SYNC);
- drbd_md_sync(device);
- drbd_bm_set_all(device);
-
- rv = drbd_bm_write(device);
+ drbd_md_set_flag(device, MDF_FULL_SYNC);
+ drbd_md_sync(device);
+ drbd_bm_set_all(device);
- if (!rv) {
- drbd_md_clear_flag(device, MDF_FULL_SYNC);
- drbd_md_sync(device);
- }
+ rv = drbd_bm_write(device);
- put_ldev(device);
+ if (!rv) {
+ drbd_md_clear_flag(device, MDF_FULL_SYNC);
+ drbd_md_sync(device);
}
return rv;
@@ -3494,18 +3519,11 @@ int drbd_bmio_set_n_write(struct drbd_device *device)
*
* Clears all bits in the bitmap and writes the whole bitmap to stable storage.
*/
-int drbd_bmio_clear_n_write(struct drbd_device *device)
+int drbd_bmio_clear_n_write(struct drbd_device *device) __must_hold(local)
{
- int rv = -EIO;
-
drbd_resume_al(device);
- if (get_ldev_if_state(device, D_ATTACHING)) {
- drbd_bm_clear_all(device);
- rv = drbd_bm_write(device);
- put_ldev(device);
- }
-
- return rv;
+ drbd_bm_clear_all(device);
+ return drbd_bm_write(device);
}
static int w_bitmap_io(struct drbd_work *w, int unused)
@@ -3537,61 +3555,6 @@ static int w_bitmap_io(struct drbd_work *w, int unused)
return 0;
}
-void drbd_ldev_destroy(struct drbd_device *device)
-{
- lc_destroy(device->resync);
- device->resync = NULL;
- lc_destroy(device->act_log);
- device->act_log = NULL;
- __no_warn(local,
- drbd_free_bc(device->ldev);
- device->ldev = NULL;);
-
- clear_bit(GO_DISKLESS, &device->flags);
-}
-
-static int w_go_diskless(struct drbd_work *w, int unused)
-{
- struct drbd_device *device =
- container_of(w, struct drbd_device, go_diskless);
-
- D_ASSERT(device, device->state.disk == D_FAILED);
- /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
- * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
- * the protected members anymore, though, so once put_ldev reaches zero
- * again, it will be safe to free them. */
-
- /* Try to write changed bitmap pages, read errors may have just
- * set some bits outside the area covered by the activity log.
- *
- * If we have an IO error during the bitmap writeout,
- * we will want a full sync next time, just in case.
- * (Do we want a specific meta data flag for this?)
- *
- * If that does not make it to stable storage either,
- * we cannot do anything about that anymore.
- *
- * We still need to check if both bitmap and ldev are present, we may
- * end up here after a failed attach, before ldev was even assigned.
- */
- if (device->bitmap && device->ldev) {
- /* An interrupted resync or similar is allowed to recounts bits
- * while we detach.
- * Any modifications would not be expected anymore, though.
- */
- if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
- "detach", BM_LOCKED_TEST_ALLOWED)) {
- if (test_bit(WAS_READ_ERROR, &device->flags)) {
- drbd_md_set_flag(device, MDF_FULL_SYNC);
- drbd_md_sync(device);
- }
- }
- }
-
- drbd_force_state(device, NS(disk, D_DISKLESS));
- return 0;
-}
-
/**
* drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
* @device: DRBD device.
@@ -3603,6 +3566,9 @@ static int w_go_diskless(struct drbd_work *w, int unused)
* that drbd_set_out_of_sync() can not be called. This function MAY ONLY be
* called from worker context. It MUST NOT be used while a previous such
* work is still pending!
+ *
+ * Its worker function encloses the call of io_fn() by get_ldev() and
+ * put_ldev().
*/
void drbd_queue_bitmap_io(struct drbd_device *device,
int (*io_fn)(struct drbd_device *),
@@ -3685,25 +3651,7 @@ int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag)
static void md_sync_timer_fn(unsigned long data)
{
struct drbd_device *device = (struct drbd_device *) data;
-
- /* must not double-queue! */
- if (list_empty(&device->md_sync_work.list))
- drbd_queue_work_front(&first_peer_device(device)->connection->sender_work,
- &device->md_sync_work);
-}
-
-static int w_md_sync(struct drbd_work *w, int unused)
-{
- struct drbd_device *device =
- container_of(w, struct drbd_device, md_sync_work);
-
- drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
-#ifdef DEBUG
- drbd_warn(device, "last md_mark_dirty: %s:%u\n",
- device->last_md_mark_dirty.func, device->last_md_mark_dirty.line);
-#endif
- drbd_md_sync(device);
- return 0;
+ drbd_device_post_work(device, MD_SYNC);
}
const char *cmdname(enum drbd_packet cmd)
diff --git a/drivers/block/drbd/drbd_nl.c b/drivers/block/drbd/drbd_nl.c
index 3f2e16738080..1cd47df44bda 100644
--- a/drivers/block/drbd/drbd_nl.c
+++ b/drivers/block/drbd/drbd_nl.c
@@ -23,6 +23,8 @@
*/
+#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
+
#include <linux/module.h>
#include <linux/drbd.h>
#include <linux/in.h>
@@ -85,7 +87,7 @@ static void drbd_adm_send_reply(struct sk_buff *skb, struct genl_info *info)
{
genlmsg_end(skb, genlmsg_data(nlmsg_data(nlmsg_hdr(skb))));
if (genlmsg_reply(skb, info))
- printk(KERN_ERR "drbd: error sending genl reply\n");
+ pr_err("error sending genl reply\n");
}
/* Used on a fresh "drbd_adm_prepare"d reply_skb, this cannot fail: The only
@@ -558,8 +560,10 @@ void conn_try_outdate_peer_async(struct drbd_connection *connection)
}
enum drbd_state_rv
-drbd_set_role(struct drbd_device *device, enum drbd_role new_role, int force)
+drbd_set_role(struct drbd_device *const device, enum drbd_role new_role, int force)
{
+ struct drbd_peer_device *const peer_device = first_peer_device(device);
+ struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
const int max_tries = 4;
enum drbd_state_rv rv = SS_UNKNOWN_ERROR;
struct net_conf *nc;
@@ -607,7 +611,7 @@ drbd_set_role(struct drbd_device *device, enum drbd_role new_role, int force)
device->state.disk == D_CONSISTENT && mask.pdsk == 0) {
D_ASSERT(device, device->state.pdsk == D_UNKNOWN);
- if (conn_try_outdate_peer(first_peer_device(device)->connection)) {
+ if (conn_try_outdate_peer(connection)) {
val.disk = D_UP_TO_DATE;
mask.disk = D_MASK;
}
@@ -617,7 +621,7 @@ drbd_set_role(struct drbd_device *device, enum drbd_role new_role, int force)
if (rv == SS_NOTHING_TO_DO)
goto out;
if (rv == SS_PRIMARY_NOP && mask.pdsk == 0) {
- if (!conn_try_outdate_peer(first_peer_device(device)->connection) && force) {
+ if (!conn_try_outdate_peer(connection) && force) {
drbd_warn(device, "Forced into split brain situation!\n");
mask.pdsk = D_MASK;
val.pdsk = D_OUTDATED;
@@ -630,7 +634,7 @@ drbd_set_role(struct drbd_device *device, enum drbd_role new_role, int force)
retry at most once more in this case. */
int timeo;
rcu_read_lock();
- nc = rcu_dereference(first_peer_device(device)->connection->net_conf);
+ nc = rcu_dereference(connection->net_conf);
timeo = nc ? (nc->ping_timeo + 1) * HZ / 10 : 1;
rcu_read_unlock();
schedule_timeout_interruptible(timeo);
@@ -659,19 +663,17 @@ drbd_set_role(struct drbd_device *device, enum drbd_role new_role, int force)
/* FIXME also wait for all pending P_BARRIER_ACK? */
if (new_role == R_SECONDARY) {
- set_disk_ro(device->vdisk, true);
if (get_ldev(device)) {
device->ldev->md.uuid[UI_CURRENT] &= ~(u64)1;
put_ldev(device);
}
} else {
- /* Called from drbd_adm_set_role only.
- * We are still holding the conf_update mutex. */
- nc = first_peer_device(device)->connection->net_conf;
+ mutex_lock(&device->resource->conf_update);
+ nc = connection->net_conf;
if (nc)
nc->discard_my_data = 0; /* without copy; single bit op is atomic */
+ mutex_unlock(&device->resource->conf_update);
- set_disk_ro(device->vdisk, false);
if (get_ldev(device)) {
if (((device->state.conn < C_CONNECTED ||
device->state.pdsk <= D_FAILED)
@@ -689,12 +691,12 @@ drbd_set_role(struct drbd_device *device, enum drbd_role new_role, int force)
if (device->state.conn >= C_WF_REPORT_PARAMS) {
/* if this was forced, we should consider sync */
if (forced)
- drbd_send_uuids(first_peer_device(device));
- drbd_send_current_state(first_peer_device(device));
+ drbd_send_uuids(peer_device);
+ drbd_send_current_state(peer_device);
}
drbd_md_sync(device);
-
+ set_disk_ro(device->vdisk, new_role == R_SECONDARY);
kobject_uevent(&disk_to_dev(device->vdisk)->kobj, KOBJ_CHANGE);
out:
mutex_unlock(device->state_mutex);
@@ -891,7 +893,7 @@ drbd_determine_dev_size(struct drbd_device *device, enum dds_flags flags, struct
* still lock the act_log to not trigger ASSERTs there.
*/
drbd_suspend_io(device);
- buffer = drbd_md_get_buffer(device); /* Lock meta-data IO */
+ buffer = drbd_md_get_buffer(device, __func__); /* Lock meta-data IO */
if (!buffer) {
drbd_resume_io(device);
return DS_ERROR;
@@ -971,6 +973,10 @@ drbd_determine_dev_size(struct drbd_device *device, enum dds_flags flags, struct
if (la_size_changed || md_moved || rs) {
u32 prev_flags;
+ /* We do some synchronous IO below, which may take some time.
+ * Clear the timer, to avoid scary "timer expired!" messages,
+ * "Superblock" is written out at least twice below, anyways. */
+ del_timer(&device->md_sync_timer);
drbd_al_shrink(device); /* All extents inactive. */
prev_flags = md->flags;
@@ -1116,15 +1122,16 @@ static int drbd_check_al_size(struct drbd_device *device, struct disk_conf *dc)
return 0;
}
-static void drbd_setup_queue_param(struct drbd_device *device, unsigned int max_bio_size)
+static void drbd_setup_queue_param(struct drbd_device *device, struct drbd_backing_dev *bdev,
+ unsigned int max_bio_size)
{
struct request_queue * const q = device->rq_queue;
unsigned int max_hw_sectors = max_bio_size >> 9;
unsigned int max_segments = 0;
struct request_queue *b = NULL;
- if (get_ldev_if_state(device, D_ATTACHING)) {
- b = device->ldev->backing_bdev->bd_disk->queue;
+ if (bdev) {
+ b = bdev->backing_bdev->bd_disk->queue;
max_hw_sectors = min(queue_max_hw_sectors(b), max_bio_size >> 9);
rcu_read_lock();
@@ -1169,11 +1176,10 @@ static void drbd_setup_queue_param(struct drbd_device *device, unsigned int max_
b->backing_dev_info.ra_pages);
q->backing_dev_info.ra_pages = b->backing_dev_info.ra_pages;
}
- put_ldev(device);
}
}
-void drbd_reconsider_max_bio_size(struct drbd_device *device)
+void drbd_reconsider_max_bio_size(struct drbd_device *device, struct drbd_backing_dev *bdev)
{
unsigned int now, new, local, peer;
@@ -1181,10 +1187,9 @@ void drbd_reconsider_max_bio_size(struct drbd_device *device)
local = device->local_max_bio_size; /* Eventually last known value, from volatile memory */
peer = device->peer_max_bio_size; /* Eventually last known value, from meta data */
- if (get_ldev_if_state(device, D_ATTACHING)) {
- local = queue_max_hw_sectors(device->ldev->backing_bdev->bd_disk->queue) << 9;
+ if (bdev) {
+ local = queue_max_hw_sectors(bdev->backing_bdev->bd_disk->queue) << 9;
device->local_max_bio_size = local;
- put_ldev(device);
}
local = min(local, DRBD_MAX_BIO_SIZE);
@@ -1217,7 +1222,7 @@ void drbd_reconsider_max_bio_size(struct drbd_device *device)
if (new != now)
drbd_info(device, "max BIO size = %u\n", new);
- drbd_setup_queue_param(device, new);
+ drbd_setup_queue_param(device, bdev, new);
}
/* Starts the worker thread */
@@ -1299,6 +1304,13 @@ static unsigned int drbd_al_extents_max(struct drbd_backing_dev *bdev)
return (al_size_4k - 1) * AL_CONTEXT_PER_TRANSACTION;
}
+static bool write_ordering_changed(struct disk_conf *a, struct disk_conf *b)
+{
+ return a->disk_barrier != b->disk_barrier ||
+ a->disk_flushes != b->disk_flushes ||
+ a->disk_drain != b->disk_drain;
+}
+
int drbd_adm_disk_opts(struct sk_buff *skb, struct genl_info *info)
{
struct drbd_config_context adm_ctx;
@@ -1405,7 +1417,8 @@ int drbd_adm_disk_opts(struct sk_buff *skb, struct genl_info *info)
else
set_bit(MD_NO_FUA, &device->flags);
- drbd_bump_write_ordering(first_peer_device(device)->connection, WO_bdev_flush);
+ if (write_ordering_changed(old_disk_conf, new_disk_conf))
+ drbd_bump_write_ordering(device->resource, NULL, WO_bdev_flush);
drbd_md_sync(device);
@@ -1440,6 +1453,8 @@ int drbd_adm_attach(struct sk_buff *skb, struct genl_info *info)
{
struct drbd_config_context adm_ctx;
struct drbd_device *device;
+ struct drbd_peer_device *peer_device;
+ struct drbd_connection *connection;
int err;
enum drbd_ret_code retcode;
enum determine_dev_size dd;
@@ -1462,7 +1477,9 @@ int drbd_adm_attach(struct sk_buff *skb, struct genl_info *info)
device = adm_ctx.device;
mutex_lock(&adm_ctx.resource->adm_mutex);
- conn_reconfig_start(first_peer_device(device)->connection);
+ peer_device = first_peer_device(device);
+ connection = peer_device ? peer_device->connection : NULL;
+ conn_reconfig_start(connection);
/* if you want to reconfigure, please tear down first */
if (device->state.disk > D_DISKLESS) {
@@ -1473,7 +1490,7 @@ int drbd_adm_attach(struct sk_buff *skb, struct genl_info *info)
* drbd_ldev_destroy is done already, we may end up here very fast,
* e.g. if someone calls attach from the on-io-error handler,
* to realize a "hot spare" feature (not that I'd recommend that) */
- wait_event(device->misc_wait, !atomic_read(&device->local_cnt));
+ wait_event(device->misc_wait, !test_bit(GOING_DISKLESS, &device->flags));
/* make sure there is no leftover from previous force-detach attempts */
clear_bit(FORCE_DETACH, &device->flags);
@@ -1529,7 +1546,7 @@ int drbd_adm_attach(struct sk_buff *skb, struct genl_info *info)
goto fail;
rcu_read_lock();
- nc = rcu_dereference(first_peer_device(device)->connection->net_conf);
+ nc = rcu_dereference(connection->net_conf);
if (nc) {
if (new_disk_conf->fencing == FP_STONITH && nc->wire_protocol == DRBD_PROT_A) {
rcu_read_unlock();
@@ -1649,7 +1666,7 @@ int drbd_adm_attach(struct sk_buff *skb, struct genl_info *info)
*/
wait_event(device->misc_wait, !atomic_read(&device->ap_pending_cnt) || drbd_suspended(device));
/* and for any other previously queued work */
- drbd_flush_workqueue(&first_peer_device(device)->connection->sender_work);
+ drbd_flush_workqueue(&connection->sender_work);
rv = _drbd_request_state(device, NS(disk, D_ATTACHING), CS_VERBOSE);
retcode = rv; /* FIXME: Type mismatch. */
@@ -1710,7 +1727,7 @@ int drbd_adm_attach(struct sk_buff *skb, struct genl_info *info)
new_disk_conf = NULL;
new_plan = NULL;
- drbd_bump_write_ordering(first_peer_device(device)->connection, WO_bdev_flush);
+ drbd_bump_write_ordering(device->resource, device->ldev, WO_bdev_flush);
if (drbd_md_test_flag(device->ldev, MDF_CRASHED_PRIMARY))
set_bit(CRASHED_PRIMARY, &device->flags);
@@ -1726,7 +1743,7 @@ int drbd_adm_attach(struct sk_buff *skb, struct genl_info *info)
device->read_cnt = 0;
device->writ_cnt = 0;
- drbd_reconsider_max_bio_size(device);
+ drbd_reconsider_max_bio_size(device, device->ldev);
/* If I am currently not R_PRIMARY,
* but meta data primary indicator is set,
@@ -1845,7 +1862,7 @@ int drbd_adm_attach(struct sk_buff *skb, struct genl_info *info)
kobject_uevent(&disk_to_dev(device->vdisk)->kobj, KOBJ_CHANGE);
put_ldev(device);
- conn_reconfig_done(first_peer_device(device)->connection);
+ conn_reconfig_done(connection);
mutex_unlock(&adm_ctx.resource->adm_mutex);
drbd_adm_finish(&adm_ctx, info, retcode);
return 0;
@@ -1856,7 +1873,7 @@ int drbd_adm_attach(struct sk_buff *skb, struct genl_info *info)
drbd_force_state(device, NS(disk, D_DISKLESS));
drbd_md_sync(device);
fail:
- conn_reconfig_done(first_peer_device(device)->connection);
+ conn_reconfig_done(connection);
if (nbc) {
if (nbc->backing_bdev)
blkdev_put(nbc->backing_bdev,
@@ -1888,7 +1905,7 @@ static int adm_detach(struct drbd_device *device, int force)
}
drbd_suspend_io(device); /* so no-one is stuck in drbd_al_begin_io */
- drbd_md_get_buffer(device); /* make sure there is no in-flight meta-data IO */
+ drbd_md_get_buffer(device, __func__); /* make sure there is no in-flight meta-data IO */
retcode = drbd_request_state(device, NS(disk, D_FAILED));
drbd_md_put_buffer(device);
/* D_FAILED will transition to DISKLESS. */
@@ -2654,8 +2671,13 @@ int drbd_adm_invalidate(struct sk_buff *skb, struct genl_info *info)
if (retcode != NO_ERROR)
goto out;
- mutex_lock(&adm_ctx.resource->adm_mutex);
device = adm_ctx.device;
+ if (!get_ldev(device)) {
+ retcode = ERR_NO_DISK;
+ goto out;
+ }
+
+ mutex_lock(&adm_ctx.resource->adm_mutex);
/* If there is still bitmap IO pending, probably because of a previous
* resync just being finished, wait for it before requesting a new resync.
@@ -2679,6 +2701,7 @@ int drbd_adm_invalidate(struct sk_buff *skb, struct genl_info *info)
retcode = drbd_request_state(device, NS(conn, C_STARTING_SYNC_T));
drbd_resume_io(device);
mutex_unlock(&adm_ctx.resource->adm_mutex);
+ put_ldev(device);
out:
drbd_adm_finish(&adm_ctx, info, retcode);
return 0;
@@ -2704,7 +2727,7 @@ out:
return 0;
}
-static int drbd_bmio_set_susp_al(struct drbd_device *device)
+static int drbd_bmio_set_susp_al(struct drbd_device *device) __must_hold(local)
{
int rv;
@@ -2725,8 +2748,13 @@ int drbd_adm_invalidate_peer(struct sk_buff *skb, struct genl_info *info)
if (retcode != NO_ERROR)
goto out;
- mutex_lock(&adm_ctx.resource->adm_mutex);
device = adm_ctx.device;
+ if (!get_ldev(device)) {
+ retcode = ERR_NO_DISK;
+ goto out;
+ }
+
+ mutex_lock(&adm_ctx.resource->adm_mutex);
/* If there is still bitmap IO pending, probably because of a previous
* resync just being finished, wait for it before requesting a new resync.
@@ -2753,6 +2781,7 @@ int drbd_adm_invalidate_peer(struct sk_buff *skb, struct genl_info *info)
retcode = drbd_request_state(device, NS(conn, C_STARTING_SYNC_S));
drbd_resume_io(device);
mutex_unlock(&adm_ctx.resource->adm_mutex);
+ put_ldev(device);
out:
drbd_adm_finish(&adm_ctx, info, retcode);
return 0;
@@ -2892,7 +2921,7 @@ static struct drbd_connection *the_only_connection(struct drbd_resource *resourc
return list_first_entry(&resource->connections, struct drbd_connection, connections);
}
-int nla_put_status_info(struct sk_buff *skb, struct drbd_device *device,
+static int nla_put_status_info(struct sk_buff *skb, struct drbd_device *device,
const struct sib_info *sib)
{
struct drbd_resource *resource = device->resource;
@@ -3622,13 +3651,6 @@ void drbd_bcast_event(struct drbd_device *device, const struct sib_info *sib)
unsigned seq;
int err = -ENOMEM;
- if (sib->sib_reason == SIB_SYNC_PROGRESS) {
- if (time_after(jiffies, device->rs_last_bcast + HZ))
- device->rs_last_bcast = jiffies;
- else
- return;
- }
-
seq = atomic_inc_return(&drbd_genl_seq);
msg = genlmsg_new(NLMSG_GOODSIZE, GFP_NOIO);
if (!msg)
diff --git a/drivers/block/drbd/drbd_proc.c b/drivers/block/drbd/drbd_proc.c
index 89736bdbbc70..06e6147c7601 100644
--- a/drivers/block/drbd/drbd_proc.c
+++ b/drivers/block/drbd/drbd_proc.c
@@ -60,20 +60,65 @@ static void seq_printf_with_thousands_grouping(struct seq_file *seq, long v)
seq_printf(seq, "%ld", v);
}
+static void drbd_get_syncer_progress(struct drbd_device *device,
+ union drbd_dev_state state, unsigned long *rs_total,
+ unsigned long *bits_left, unsigned int *per_mil_done)
+{
+ /* this is to break it at compile time when we change that, in case we
+ * want to support more than (1<<32) bits on a 32bit arch. */
+ typecheck(unsigned long, device->rs_total);
+ *rs_total = device->rs_total;
+
+ /* note: both rs_total and rs_left are in bits, i.e. in
+ * units of BM_BLOCK_SIZE.
+ * for the percentage, we don't care. */
+
+ if (state.conn == C_VERIFY_S || state.conn == C_VERIFY_T)
+ *bits_left = device->ov_left;
+ else
+ *bits_left = drbd_bm_total_weight(device) - device->rs_failed;
+ /* >> 10 to prevent overflow,
+ * +1 to prevent division by zero */
+ if (*bits_left > *rs_total) {
+ /* D'oh. Maybe a logic bug somewhere. More likely just a race
+ * between state change and reset of rs_total.
+ */
+ *bits_left = *rs_total;
+ *per_mil_done = *rs_total ? 0 : 1000;
+ } else {
+ /* Make sure the division happens in long context.
+ * We allow up to one petabyte storage right now,
+ * at a granularity of 4k per bit that is 2**38 bits.
+ * After shift right and multiplication by 1000,
+ * this should still fit easily into a 32bit long,
+ * so we don't need a 64bit division on 32bit arch.
+ * Note: currently we don't support such large bitmaps on 32bit
+ * arch anyways, but no harm done to be prepared for it here.
+ */
+ unsigned int shift = *rs_total > UINT_MAX ? 16 : 10;
+ unsigned long left = *bits_left >> shift;
+ unsigned long total = 1UL + (*rs_total >> shift);
+ unsigned long tmp = 1000UL - left * 1000UL/total;
+ *per_mil_done = tmp;
+ }
+}
+
+
/*lge
* progress bars shamelessly adapted from driver/md/md.c
* output looks like
* [=====>..............] 33.5% (23456/123456)
* finish: 2:20:20 speed: 6,345 (6,456) K/sec
*/
-static void drbd_syncer_progress(struct drbd_device *device, struct seq_file *seq)
+static void drbd_syncer_progress(struct drbd_device *device, struct seq_file *seq,
+ union drbd_dev_state state)
{
- unsigned long db, dt, dbdt, rt, rs_left;
+ unsigned long db, dt, dbdt, rt, rs_total, rs_left;
unsigned int res;
int i, x, y;
int stalled = 0;
- drbd_get_syncer_progress(device, &rs_left, &res);
+ drbd_get_syncer_progress(device, state, &rs_total, &rs_left, &res);
x = res/50;
y = 20-x;
@@ -85,21 +130,21 @@ static void drbd_syncer_progress(struct drbd_device *device, struct seq_file *se
seq_printf(seq, ".");
seq_printf(seq, "] ");
- if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
+ if (state.conn == C_VERIFY_S || state.conn == C_VERIFY_T)
seq_printf(seq, "verified:");
else
seq_printf(seq, "sync'ed:");
seq_printf(seq, "%3u.%u%% ", res / 10, res % 10);
/* if more than a few GB, display in MB */
- if (device->rs_total > (4UL << (30 - BM_BLOCK_SHIFT)))
+ if (rs_total > (4UL << (30 - BM_BLOCK_SHIFT)))
seq_printf(seq, "(%lu/%lu)M",
(unsigned long) Bit2KB(rs_left >> 10),
- (unsigned long) Bit2KB(device->rs_total >> 10));
+ (unsigned long) Bit2KB(rs_total >> 10));
else
seq_printf(seq, "(%lu/%lu)K\n\t",
(unsigned long) Bit2KB(rs_left),
- (unsigned long) Bit2KB(device->rs_total));
+ (unsigned long) Bit2KB(rs_total));
/* see drivers/md/md.c
* We do not want to overflow, so the order of operands and
@@ -150,13 +195,13 @@ static void drbd_syncer_progress(struct drbd_device *device, struct seq_file *se
dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
if (dt == 0)
dt = 1;
- db = device->rs_total - rs_left;
+ db = rs_total - rs_left;
dbdt = Bit2KB(db/dt);
seq_printf_with_thousands_grouping(seq, dbdt);
seq_printf(seq, ")");
- if (device->state.conn == C_SYNC_TARGET ||
- device->state.conn == C_VERIFY_S) {
+ if (state.conn == C_SYNC_TARGET ||
+ state.conn == C_VERIFY_S) {
seq_printf(seq, " want: ");
seq_printf_with_thousands_grouping(seq, device->c_sync_rate);
}
@@ -168,8 +213,8 @@ static void drbd_syncer_progress(struct drbd_device *device, struct seq_file *se
unsigned long bm_bits = drbd_bm_bits(device);
unsigned long bit_pos;
unsigned long long stop_sector = 0;
- if (device->state.conn == C_VERIFY_S ||
- device->state.conn == C_VERIFY_T) {
+ if (state.conn == C_VERIFY_S ||
+ state.conn == C_VERIFY_T) {
bit_pos = bm_bits - device->ov_left;
if (verify_can_do_stop_sector(device))
stop_sector = device->ov_stop_sector;
@@ -188,22 +233,13 @@ static void drbd_syncer_progress(struct drbd_device *device, struct seq_file *se
}
}
-static void resync_dump_detail(struct seq_file *seq, struct lc_element *e)
-{
- struct bm_extent *bme = lc_entry(e, struct bm_extent, lce);
-
- seq_printf(seq, "%5d %s %s\n", bme->rs_left,
- bme->flags & BME_NO_WRITES ? "NO_WRITES" : "---------",
- bme->flags & BME_LOCKED ? "LOCKED" : "------"
- );
-}
-
static int drbd_seq_show(struct seq_file *seq, void *v)
{
int i, prev_i = -1;
const char *sn;
struct drbd_device *device;
struct net_conf *nc;
+ union drbd_dev_state state;
char wp;
static char write_ordering_chars[] = {
@@ -241,11 +277,12 @@ static int drbd_seq_show(struct seq_file *seq, void *v)
seq_printf(seq, "\n");
prev_i = i;
- sn = drbd_conn_str(device->state.conn);
+ state = device->state;
+ sn = drbd_conn_str(state.conn);
- if (device->state.conn == C_STANDALONE &&
- device->state.disk == D_DISKLESS &&
- device->state.role == R_SECONDARY) {
+ if (state.conn == C_STANDALONE &&
+ state.disk == D_DISKLESS &&
+ state.role == R_SECONDARY) {
seq_printf(seq, "%2d: cs:Unconfigured\n", i);
} else {
/* reset device->congestion_reason */
@@ -258,15 +295,15 @@ static int drbd_seq_show(struct seq_file *seq, void *v)
" ns:%u nr:%u dw:%u dr:%u al:%u bm:%u "
"lo:%d pe:%d ua:%d ap:%d ep:%d wo:%c",
i, sn,
- drbd_role_str(device->state.role),
- drbd_role_str(device->state.peer),
- drbd_disk_str(device->state.disk),
- drbd_disk_str(device->state.pdsk),
+ drbd_role_str(state.role),
+ drbd_role_str(state.peer),
+ drbd_disk_str(state.disk),
+ drbd_disk_str(state.pdsk),
wp,
drbd_suspended(device) ? 's' : 'r',
- device->state.aftr_isp ? 'a' : '-',
- device->state.peer_isp ? 'p' : '-',
- device->state.user_isp ? 'u' : '-',
+ state.aftr_isp ? 'a' : '-',
+ state.peer_isp ? 'p' : '-',
+ state.user_isp ? 'u' : '-',
device->congestion_reason ?: '-',
test_bit(AL_SUSPENDED, &device->flags) ? 's' : '-',
device->send_cnt/2,
@@ -281,17 +318,17 @@ static int drbd_seq_show(struct seq_file *seq, void *v)
atomic_read(&device->unacked_cnt),
atomic_read(&device->ap_bio_cnt),
first_peer_device(device)->connection->epochs,
- write_ordering_chars[first_peer_device(device)->connection->write_ordering]
+ write_ordering_chars[device->resource->write_ordering]
);
seq_printf(seq, " oos:%llu\n",
Bit2KB((unsigned long long)
drbd_bm_total_weight(device)));
}
- if (device->state.conn == C_SYNC_SOURCE ||
- device->state.conn == C_SYNC_TARGET ||
- device->state.conn == C_VERIFY_S ||
- device->state.conn == C_VERIFY_T)
- drbd_syncer_progress(device, seq);
+ if (state.conn == C_SYNC_SOURCE ||
+ state.conn == C_SYNC_TARGET ||
+ state.conn == C_VERIFY_S ||
+ state.conn == C_VERIFY_T)
+ drbd_syncer_progress(device, seq, state);
if (proc_details >= 1 && get_ldev_if_state(device, D_FAILED)) {
lc_seq_printf_stats(seq, device->resync);
@@ -299,12 +336,8 @@ static int drbd_seq_show(struct seq_file *seq, void *v)
put_ldev(device);
}
- if (proc_details >= 2) {
- if (device->resync) {
- lc_seq_dump_details(seq, device->resync, "rs_left",
- resync_dump_detail);
- }
- }
+ if (proc_details >= 2)
+ seq_printf(seq, "\tblocked on activity log: %d\n", atomic_read(&device->ap_actlog_cnt));
}
rcu_read_unlock();
@@ -316,7 +349,7 @@ static int drbd_proc_open(struct inode *inode, struct file *file)
int err;
if (try_module_get(THIS_MODULE)) {
- err = single_open(file, drbd_seq_show, PDE_DATA(inode));
+ err = single_open(file, drbd_seq_show, NULL);
if (err)
module_put(THIS_MODULE);
return err;
diff --git a/drivers/block/drbd/drbd_receiver.c b/drivers/block/drbd/drbd_receiver.c
index 5b17ec88ea05..9342b8da73ab 100644
--- a/drivers/block/drbd/drbd_receiver.c
+++ b/drivers/block/drbd/drbd_receiver.c
@@ -362,17 +362,14 @@ drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t secto
goto fail;
}
+ memset(peer_req, 0, sizeof(*peer_req));
+ INIT_LIST_HEAD(&peer_req->w.list);
drbd_clear_interval(&peer_req->i);
peer_req->i.size = data_size;
peer_req->i.sector = sector;
- peer_req->i.local = false;
- peer_req->i.waiting = false;
-
- peer_req->epoch = NULL;
+ peer_req->submit_jif = jiffies;
peer_req->peer_device = peer_device;
peer_req->pages = page;
- atomic_set(&peer_req->pending_bios, 0);
- peer_req->flags = 0;
/*
* The block_id is opaque to the receiver. It is not endianness
* converted, and sent back to the sender unchanged.
@@ -389,11 +386,16 @@ drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t secto
void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
int is_net)
{
+ might_sleep();
if (peer_req->flags & EE_HAS_DIGEST)
kfree(peer_req->digest);
drbd_free_pages(device, peer_req->pages, is_net);
D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
D_ASSERT(device, drbd_interval_empty(&peer_req->i));
+ if (!expect(!(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) {
+ peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
+ drbd_al_complete_io(device, &peer_req->i);
+ }
mempool_free(peer_req, drbd_ee_mempool);
}
@@ -791,8 +793,18 @@ static int receive_first_packet(struct drbd_connection *connection, struct socke
{
unsigned int header_size = drbd_header_size(connection);
struct packet_info pi;
+ struct net_conf *nc;
int err;
+ rcu_read_lock();
+ nc = rcu_dereference(connection->net_conf);
+ if (!nc) {
+ rcu_read_unlock();
+ return -EIO;
+ }
+ sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10;
+ rcu_read_unlock();
+
err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
if (err != header_size) {
if (err >= 0)
@@ -809,7 +821,7 @@ static int receive_first_packet(struct drbd_connection *connection, struct socke
* drbd_socket_okay() - Free the socket if its connection is not okay
* @sock: pointer to the pointer to the socket.
*/
-static int drbd_socket_okay(struct socket **sock)
+static bool drbd_socket_okay(struct socket **sock)
{
int rr;
char tb[4];
@@ -827,6 +839,30 @@ static int drbd_socket_okay(struct socket **sock)
return false;
}
}
+
+static bool connection_established(struct drbd_connection *connection,
+ struct socket **sock1,
+ struct socket **sock2)
+{
+ struct net_conf *nc;
+ int timeout;
+ bool ok;
+
+ if (!*sock1 || !*sock2)
+ return false;
+
+ rcu_read_lock();
+ nc = rcu_dereference(connection->net_conf);
+ timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10;
+ rcu_read_unlock();
+ schedule_timeout_interruptible(timeout);
+
+ ok = drbd_socket_okay(sock1);
+ ok = drbd_socket_okay(sock2) && ok;
+
+ return ok;
+}
+
/* Gets called if a connection is established, or if a new minor gets created
in a connection */
int drbd_connected(struct drbd_peer_device *peer_device)
@@ -868,8 +904,8 @@ static int conn_connect(struct drbd_connection *connection)
struct drbd_socket sock, msock;
struct drbd_peer_device *peer_device;
struct net_conf *nc;
- int vnr, timeout, h, ok;
- bool discard_my_data;
+ int vnr, timeout, h;
+ bool discard_my_data, ok;
enum drbd_state_rv rv;
struct accept_wait_data ad = {
.connection = connection,
@@ -913,17 +949,8 @@ static int conn_connect(struct drbd_connection *connection)
}
}
- if (sock.socket && msock.socket) {
- rcu_read_lock();
- nc = rcu_dereference(connection->net_conf);
- timeout = nc->ping_timeo * HZ / 10;
- rcu_read_unlock();
- schedule_timeout_interruptible(timeout);
- ok = drbd_socket_okay(&sock.socket);
- ok = drbd_socket_okay(&msock.socket) && ok;
- if (ok)
- break;
- }
+ if (connection_established(connection, &sock.socket, &msock.socket))
+ break;
retry:
s = drbd_wait_for_connect(connection, &ad);
@@ -969,8 +996,7 @@ randomize:
goto out_release_sockets;
}
- ok = drbd_socket_okay(&sock.socket);
- ok = drbd_socket_okay(&msock.socket) && ok;
+ ok = connection_established(connection, &sock.socket, &msock.socket);
} while (!ok);
if (ad.s_listen)
@@ -1151,7 +1177,7 @@ static void drbd_flush(struct drbd_connection *connection)
struct drbd_peer_device *peer_device;
int vnr;
- if (connection->write_ordering >= WO_bdev_flush) {
+ if (connection->resource->write_ordering >= WO_bdev_flush) {
rcu_read_lock();
idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
struct drbd_device *device = peer_device->device;
@@ -1161,14 +1187,22 @@ static void drbd_flush(struct drbd_connection *connection)
kref_get(&device->kref);
rcu_read_unlock();
+ /* Right now, we have only this one synchronous code path
+ * for flushes between request epochs.
+ * We may want to make those asynchronous,
+ * or at least parallelize the flushes to the volume devices.
+ */
+ device->flush_jif = jiffies;
+ set_bit(FLUSH_PENDING, &device->flags);
rv = blkdev_issue_flush(device->ldev->backing_bdev,
GFP_NOIO, NULL);
+ clear_bit(FLUSH_PENDING, &device->flags);
if (rv) {
drbd_info(device, "local disk flush failed with status %d\n", rv);
/* would rather check on EOPNOTSUPP, but that is not reliable.
* don't try again for ANY return value != 0
* if (rv == -EOPNOTSUPP) */
- drbd_bump_write_ordering(connection, WO_drain_io);
+ drbd_bump_write_ordering(connection->resource, NULL, WO_drain_io);
}
put_ldev(device);
kref_put(&device->kref, drbd_destroy_device);
@@ -1257,15 +1291,30 @@ static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connectio
return rv;
}
+static enum write_ordering_e
+max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo)
+{
+ struct disk_conf *dc;
+
+ dc = rcu_dereference(bdev->disk_conf);
+
+ if (wo == WO_bdev_flush && !dc->disk_flushes)
+ wo = WO_drain_io;
+ if (wo == WO_drain_io && !dc->disk_drain)
+ wo = WO_none;
+
+ return wo;
+}
+
/**
* drbd_bump_write_ordering() - Fall back to an other write ordering method
* @connection: DRBD connection.
* @wo: Write ordering method to try.
*/
-void drbd_bump_write_ordering(struct drbd_connection *connection, enum write_ordering_e wo)
+void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
+ enum write_ordering_e wo)
{
- struct disk_conf *dc;
- struct drbd_peer_device *peer_device;
+ struct drbd_device *device;
enum write_ordering_e pwo;
int vnr;
static char *write_ordering_str[] = {
@@ -1274,26 +1323,27 @@ void drbd_bump_write_ordering(struct drbd_connection *connection, enum write_ord
[WO_bdev_flush] = "flush",
};
- pwo = connection->write_ordering;
- wo = min(pwo, wo);
+ pwo = resource->write_ordering;
+ if (wo != WO_bdev_flush)
+ wo = min(pwo, wo);
rcu_read_lock();
- idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
- struct drbd_device *device = peer_device->device;
+ idr_for_each_entry(&resource->devices, device, vnr) {
+ if (get_ldev(device)) {
+ wo = max_allowed_wo(device->ldev, wo);
+ if (device->ldev == bdev)
+ bdev = NULL;
+ put_ldev(device);
+ }
+ }
- if (!get_ldev_if_state(device, D_ATTACHING))
- continue;
- dc = rcu_dereference(device->ldev->disk_conf);
+ if (bdev)
+ wo = max_allowed_wo(bdev, wo);
- if (wo == WO_bdev_flush && !dc->disk_flushes)
- wo = WO_drain_io;
- if (wo == WO_drain_io && !dc->disk_drain)
- wo = WO_none;
- put_ldev(device);
- }
rcu_read_unlock();
- connection->write_ordering = wo;
- if (pwo != connection->write_ordering || wo == WO_bdev_flush)
- drbd_info(connection, "Method to ensure write ordering: %s\n", write_ordering_str[connection->write_ordering]);
+
+ resource->write_ordering = wo;
+ if (pwo != resource->write_ordering || wo == WO_bdev_flush)
+ drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
}
/**
@@ -1330,6 +1380,13 @@ int drbd_submit_peer_request(struct drbd_device *device,
/* wait for all pending IO completions, before we start
* zeroing things out. */
conn_wait_active_ee_empty(first_peer_device(device)->connection);
+ /* add it to the active list now,
+ * so we can find it to present it in debugfs */
+ peer_req->submit_jif = jiffies;
+ peer_req->flags |= EE_SUBMITTED;
+ spin_lock_irq(&device->resource->req_lock);
+ list_add_tail(&peer_req->w.list, &device->active_ee);
+ spin_unlock_irq(&device->resource->req_lock);
if (blkdev_issue_zeroout(device->ldev->backing_bdev,
sector, ds >> 9, GFP_NOIO))
peer_req->flags |= EE_WAS_ERROR;
@@ -1398,6 +1455,9 @@ submit:
D_ASSERT(device, page == NULL);
atomic_set(&peer_req->pending_bios, n_bios);
+ /* for debugfs: update timestamp, mark as submitted */
+ peer_req->submit_jif = jiffies;
+ peer_req->flags |= EE_SUBMITTED;
do {
bio = bios;
bios = bios->bi_next;
@@ -1471,7 +1531,7 @@ static int receive_Barrier(struct drbd_connection *connection, struct packet_inf
* R_PRIMARY crashes now.
* Therefore we must send the barrier_ack after the barrier request was
* completed. */
- switch (connection->write_ordering) {
+ switch (connection->resource->write_ordering) {
case WO_none:
if (rv == FE_RECYCLED)
return 0;
@@ -1498,7 +1558,8 @@ static int receive_Barrier(struct drbd_connection *connection, struct packet_inf
return 0;
default:
- drbd_err(connection, "Strangeness in connection->write_ordering %d\n", connection->write_ordering);
+ drbd_err(connection, "Strangeness in connection->write_ordering %d\n",
+ connection->resource->write_ordering);
return -EIO;
}
@@ -1531,7 +1592,7 @@ read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
struct drbd_peer_request *peer_req;
struct page *page;
int dgs, ds, err;
- int data_size = pi->size;
+ unsigned int data_size = pi->size;
void *dig_in = peer_device->connection->int_dig_in;
void *dig_vv = peer_device->connection->int_dig_vv;
unsigned long *data;
@@ -1578,6 +1639,7 @@ read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
if (!peer_req)
return NULL;
+ peer_req->flags |= EE_WRITE;
if (trim)
return peer_req;
@@ -1734,9 +1796,10 @@ static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t secto
* respective _drbd_clear_done_ee */
peer_req->w.cb = e_end_resync_block;
+ peer_req->submit_jif = jiffies;
spin_lock_irq(&device->resource->req_lock);
- list_add(&peer_req->w.list, &device->sync_ee);
+ list_add_tail(&peer_req->w.list, &device->sync_ee);
spin_unlock_irq(&device->resource->req_lock);
atomic_add(pi->size >> 9, &device->rs_sect_ev);
@@ -1889,6 +1952,7 @@ static int e_end_block(struct drbd_work *w, int cancel)
}
dec_unacked(device);
}
+
/* we delete from the conflict detection hash _after_ we sent out the
* P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
if (peer_req->flags & EE_IN_INTERVAL_TREE) {
@@ -2115,6 +2179,8 @@ static int handle_write_conflicts(struct drbd_device *device,
drbd_for_each_overlap(i, &device->write_requests, sector, size) {
if (i == &peer_req->i)
continue;
+ if (i->completed)
+ continue;
if (!i->local) {
/*
@@ -2147,7 +2213,6 @@ static int handle_write_conflicts(struct drbd_device *device,
(unsigned long long)sector, size,
superseded ? "local" : "remote");
- inc_unacked(device);
peer_req->w.cb = superseded ? e_send_superseded :
e_send_retry_write;
list_add_tail(&peer_req->w.list, &device->done_ee);
@@ -2206,6 +2271,7 @@ static int receive_Data(struct drbd_connection *connection, struct packet_info *
{
struct drbd_peer_device *peer_device;
struct drbd_device *device;
+ struct net_conf *nc;
sector_t sector;
struct drbd_peer_request *peer_req;
struct p_data *p = pi->data;
@@ -2245,6 +2311,8 @@ static int receive_Data(struct drbd_connection *connection, struct packet_info *
}
peer_req->w.cb = e_end_block;
+ peer_req->submit_jif = jiffies;
+ peer_req->flags |= EE_APPLICATION;
dp_flags = be32_to_cpu(p->dp_flags);
rw |= wire_flags_to_bio(dp_flags);
@@ -2271,9 +2339,36 @@ static int receive_Data(struct drbd_connection *connection, struct packet_info *
spin_unlock(&connection->epoch_lock);
rcu_read_lock();
- tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
+ nc = rcu_dereference(peer_device->connection->net_conf);
+ tp = nc->two_primaries;
+ if (peer_device->connection->agreed_pro_version < 100) {
+ switch (nc->wire_protocol) {
+ case DRBD_PROT_C:
+ dp_flags |= DP_SEND_WRITE_ACK;
+ break;
+ case DRBD_PROT_B:
+ dp_flags |= DP_SEND_RECEIVE_ACK;
+ break;
+ }
+ }
rcu_read_unlock();
+
+ if (dp_flags & DP_SEND_WRITE_ACK) {
+ peer_req->flags |= EE_SEND_WRITE_ACK;
+ inc_unacked(device);
+ /* corresponding dec_unacked() in e_end_block()
+ * respective _drbd_clear_done_ee */
+ }
+
+ if (dp_flags & DP_SEND_RECEIVE_ACK) {
+ /* I really don't like it that the receiver thread
+ * sends on the msock, but anyways */
+ drbd_send_ack(first_peer_device(device), P_RECV_ACK, peer_req);
+ }
+
if (tp) {
+ /* two primaries implies protocol C */
+ D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK);
peer_req->flags |= EE_IN_INTERVAL_TREE;
err = wait_for_and_update_peer_seq(peer_device, peer_seq);
if (err)
@@ -2297,44 +2392,18 @@ static int receive_Data(struct drbd_connection *connection, struct packet_info *
* active_ee to become empty in drbd_submit_peer_request();
* better not add ourselves here. */
if ((peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) == 0)
- list_add(&peer_req->w.list, &device->active_ee);
+ list_add_tail(&peer_req->w.list, &device->active_ee);
spin_unlock_irq(&device->resource->req_lock);
if (device->state.conn == C_SYNC_TARGET)
wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
- if (peer_device->connection->agreed_pro_version < 100) {
- rcu_read_lock();
- switch (rcu_dereference(peer_device->connection->net_conf)->wire_protocol) {
- case DRBD_PROT_C:
- dp_flags |= DP_SEND_WRITE_ACK;
- break;
- case DRBD_PROT_B:
- dp_flags |= DP_SEND_RECEIVE_ACK;
- break;
- }
- rcu_read_unlock();
- }
-
- if (dp_flags & DP_SEND_WRITE_ACK) {
- peer_req->flags |= EE_SEND_WRITE_ACK;
- inc_unacked(device);
- /* corresponding dec_unacked() in e_end_block()
- * respective _drbd_clear_done_ee */
- }
-
- if (dp_flags & DP_SEND_RECEIVE_ACK) {
- /* I really don't like it that the receiver thread
- * sends on the msock, but anyways */
- drbd_send_ack(first_peer_device(device), P_RECV_ACK, peer_req);
- }
-
if (device->state.pdsk < D_INCONSISTENT) {
/* In case we have the only disk of the cluster, */
drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
- peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
- drbd_al_begin_io(device, &peer_req->i, true);
+ drbd_al_begin_io(device, &peer_req->i);
+ peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
}
err = drbd_submit_peer_request(device, peer_req, rw, DRBD_FAULT_DT_WR);
@@ -2347,8 +2416,10 @@ static int receive_Data(struct drbd_connection *connection, struct packet_info *
list_del(&peer_req->w.list);
drbd_remove_epoch_entry_interval(device, peer_req);
spin_unlock_irq(&device->resource->req_lock);
- if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
+ if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) {
+ peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
drbd_al_complete_io(device, &peer_req->i);
+ }
out_interrupted:
drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT + EV_CLEANUP);
@@ -2368,13 +2439,14 @@ out_interrupted:
* The current sync rate used here uses only the most recent two step marks,
* to have a short time average so we can react faster.
*/
-bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector)
+bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector,
+ bool throttle_if_app_is_waiting)
{
struct lc_element *tmp;
- bool throttle = true;
+ bool throttle = drbd_rs_c_min_rate_throttle(device);
- if (!drbd_rs_c_min_rate_throttle(device))
- return false;
+ if (!throttle || throttle_if_app_is_waiting)
+ return throttle;
spin_lock_irq(&device->al_lock);
tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
@@ -2382,7 +2454,8 @@ bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector)
struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
if (test_bit(BME_PRIORITY, &bm_ext->flags))
throttle = false;
- /* Do not slow down if app IO is already waiting for this extent */
+ /* Do not slow down if app IO is already waiting for this extent,
+ * and our progress is necessary for application IO to complete. */
}
spin_unlock_irq(&device->al_lock);
@@ -2407,7 +2480,9 @@ bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
(int)part_stat_read(&disk->part0, sectors[1]) -
atomic_read(&device->rs_sect_ev);
- if (!device->rs_last_events || curr_events - device->rs_last_events > 64) {
+
+ if (atomic_read(&device->ap_actlog_cnt)
+ || !device->rs_last_events || curr_events - device->rs_last_events > 64) {
unsigned long rs_left;
int i;
@@ -2508,6 +2583,7 @@ static int receive_DataRequest(struct drbd_connection *connection, struct packet
peer_req->w.cb = w_e_end_data_req;
fault_type = DRBD_FAULT_DT_RD;
/* application IO, don't drbd_rs_begin_io */
+ peer_req->flags |= EE_APPLICATION;
goto submit;
case P_RS_DATA_REQUEST:
@@ -2538,6 +2614,8 @@ static int receive_DataRequest(struct drbd_connection *connection, struct packet
peer_req->w.cb = w_e_end_csum_rs_req;
/* used in the sector offset progress display */
device->bm_resync_fo = BM_SECT_TO_BIT(sector);
+ /* remember to report stats in drbd_resync_finished */
+ device->use_csums = true;
} else if (pi->cmd == P_OV_REPLY) {
/* track progress, we may need to throttle */
atomic_add(size >> 9, &device->rs_sect_in);
@@ -2595,8 +2673,20 @@ static int receive_DataRequest(struct drbd_connection *connection, struct packet
* we would also throttle its application reads.
* In that case, throttling is done on the SyncTarget only.
*/
- if (device->state.peer != R_PRIMARY && drbd_rs_should_slow_down(device, sector))
+
+ /* Even though this may be a resync request, we do add to "read_ee";
+ * "sync_ee" is only used for resync WRITEs.
+ * Add to list early, so debugfs can find this request
+ * even if we have to sleep below. */
+ spin_lock_irq(&device->resource->req_lock);
+ list_add_tail(&peer_req->w.list, &device->read_ee);
+ spin_unlock_irq(&device->resource->req_lock);
+
+ update_receiver_timing_details(connection, drbd_rs_should_slow_down);
+ if (device->state.peer != R_PRIMARY
+ && drbd_rs_should_slow_down(device, sector, false))
schedule_timeout_uninterruptible(HZ/10);
+ update_receiver_timing_details(connection, drbd_rs_begin_io);
if (drbd_rs_begin_io(device, sector))
goto out_free_e;
@@ -2604,22 +2694,20 @@ submit_for_resync:
atomic_add(size >> 9, &device->rs_sect_ev);
submit:
+ update_receiver_timing_details(connection, drbd_submit_peer_request);
inc_unacked(device);
- spin_lock_irq(&device->resource->req_lock);
- list_add_tail(&peer_req->w.list, &device->read_ee);
- spin_unlock_irq(&device->resource->req_lock);
-
if (drbd_submit_peer_request(device, peer_req, READ, fault_type) == 0)
return 0;
/* don't care for the reason here */
drbd_err(device, "submit failed, triggering re-connect\n");
+
+out_free_e:
spin_lock_irq(&device->resource->req_lock);
list_del(&peer_req->w.list);
spin_unlock_irq(&device->resource->req_lock);
/* no drbd_rs_complete_io(), we are dropping the connection anyways */
-out_free_e:
put_ldev(device);
drbd_free_peer_req(device, peer_req);
return -EIO;
@@ -2842,8 +2930,10 @@ static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
-1091 requires proto 91
-1096 requires proto 96
*/
-static int drbd_uuid_compare(struct drbd_device *device, int *rule_nr) __must_hold(local)
+static int drbd_uuid_compare(struct drbd_device *const device, int *rule_nr) __must_hold(local)
{
+ struct drbd_peer_device *const peer_device = first_peer_device(device);
+ struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
u64 self, peer;
int i, j;
@@ -2869,7 +2959,7 @@ static int drbd_uuid_compare(struct drbd_device *device, int *rule_nr) __must_ho
if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
- if (first_peer_device(device)->connection->agreed_pro_version < 91)
+ if (connection->agreed_pro_version < 91)
return -1091;
if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
@@ -2892,7 +2982,7 @@ static int drbd_uuid_compare(struct drbd_device *device, int *rule_nr) __must_ho
if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
- if (first_peer_device(device)->connection->agreed_pro_version < 91)
+ if (connection->agreed_pro_version < 91)
return -1091;
if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
@@ -2925,7 +3015,7 @@ static int drbd_uuid_compare(struct drbd_device *device, int *rule_nr) __must_ho
case 1: /* self_pri && !peer_pri */ return 1;
case 2: /* !self_pri && peer_pri */ return -1;
case 3: /* self_pri && peer_pri */
- dc = test_bit(RESOLVE_CONFLICTS, &first_peer_device(device)->connection->flags);
+ dc = test_bit(RESOLVE_CONFLICTS, &connection->flags);
return dc ? -1 : 1;
}
}
@@ -2938,14 +3028,14 @@ static int drbd_uuid_compare(struct drbd_device *device, int *rule_nr) __must_ho
*rule_nr = 51;
peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
if (self == peer) {
- if (first_peer_device(device)->connection->agreed_pro_version < 96 ?
+ if (connection->agreed_pro_version < 96 ?
(device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
(device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
/* The last P_SYNC_UUID did not get though. Undo the last start of
resync as sync source modifications of the peer's UUIDs. */
- if (first_peer_device(device)->connection->agreed_pro_version < 91)
+ if (connection->agreed_pro_version < 91)
return -1091;
device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
@@ -2975,14 +3065,14 @@ static int drbd_uuid_compare(struct drbd_device *device, int *rule_nr) __must_ho
*rule_nr = 71;
self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
if (self == peer) {
- if (first_peer_device(device)->connection->agreed_pro_version < 96 ?
+ if (connection->agreed_pro_version < 96 ?
(device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
(device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
/* The last P_SYNC_UUID did not get though. Undo the last start of
resync as sync source modifications of our UUIDs. */
- if (first_peer_device(device)->connection->agreed_pro_version < 91)
+ if (connection->agreed_pro_version < 91)
return -1091;
__drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
@@ -3352,8 +3442,7 @@ disconnect:
* return: NULL (alg name was "")
* ERR_PTR(error) if something goes wrong
* or the crypto hash ptr, if it worked out ok. */
-static
-struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device,
+static struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device,
const char *alg, const char *name)
{
struct crypto_hash *tfm;
@@ -3639,7 +3728,7 @@ static int receive_sizes(struct drbd_connection *connection, struct packet_info
struct drbd_device *device;
struct p_sizes *p = pi->data;
enum determine_dev_size dd = DS_UNCHANGED;
- sector_t p_size, p_usize, my_usize;
+ sector_t p_size, p_usize, p_csize, my_usize;
int ldsc = 0; /* local disk size changed */
enum dds_flags ddsf;
@@ -3650,6 +3739,7 @@ static int receive_sizes(struct drbd_connection *connection, struct packet_info
p_size = be64_to_cpu(p->d_size);
p_usize = be64_to_cpu(p->u_size);
+ p_csize = be64_to_cpu(p->c_size);
/* just store the peer's disk size for now.
* we still need to figure out whether we accept that. */
@@ -3710,7 +3800,6 @@ static int receive_sizes(struct drbd_connection *connection, struct packet_info
}
device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
- drbd_reconsider_max_bio_size(device);
/* Leave drbd_reconsider_max_bio_size() before drbd_determine_dev_size().
In case we cleared the QUEUE_FLAG_DISCARD from our queue in
drbd_reconsider_max_bio_size(), we can be sure that after
@@ -3718,14 +3807,28 @@ static int receive_sizes(struct drbd_connection *connection, struct packet_info
ddsf = be16_to_cpu(p->dds_flags);
if (get_ldev(device)) {
+ drbd_reconsider_max_bio_size(device, device->ldev);
dd = drbd_determine_dev_size(device, ddsf, NULL);
put_ldev(device);
if (dd == DS_ERROR)
return -EIO;
drbd_md_sync(device);
} else {
- /* I am diskless, need to accept the peer's size. */
- drbd_set_my_capacity(device, p_size);
+ /*
+ * I am diskless, need to accept the peer's *current* size.
+ * I must NOT accept the peers backing disk size,
+ * it may have been larger than mine all along...
+ *
+ * At this point, the peer knows more about my disk, or at
+ * least about what we last agreed upon, than myself.
+ * So if his c_size is less than his d_size, the most likely
+ * reason is that *my* d_size was smaller last time we checked.
+ *
+ * However, if he sends a zero current size,
+ * take his (user-capped or) backing disk size anyways.
+ */
+ drbd_reconsider_max_bio_size(device, NULL);
+ drbd_set_my_capacity(device, p_csize ?: p_usize ?: p_size);
}
if (get_ldev(device)) {
@@ -4501,6 +4604,7 @@ static void drbdd(struct drbd_connection *connection)
struct data_cmd *cmd;
drbd_thread_current_set_cpu(&connection->receiver);
+ update_receiver_timing_details(connection, drbd_recv_header);
if (drbd_recv_header(connection, &pi))
goto err_out;
@@ -4519,12 +4623,14 @@ static void drbdd(struct drbd_connection *connection)
}
if (shs) {
+ update_receiver_timing_details(connection, drbd_recv_all_warn);
err = drbd_recv_all_warn(connection, pi.data, shs);
if (err)
goto err_out;
pi.size -= shs;
}
+ update_receiver_timing_details(connection, cmd->fn);
err = cmd->fn(connection, &pi);
if (err) {
drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
diff --git a/drivers/block/drbd/drbd_req.c b/drivers/block/drbd/drbd_req.c
index 09803d0d5207..c67717d572d1 100644
--- a/drivers/block/drbd/drbd_req.c
+++ b/drivers/block/drbd/drbd_req.c
@@ -52,7 +52,7 @@ static void _drbd_start_io_acct(struct drbd_device *device, struct drbd_request
static void _drbd_end_io_acct(struct drbd_device *device, struct drbd_request *req)
{
int rw = bio_data_dir(req->master_bio);
- unsigned long duration = jiffies - req->start_time;
+ unsigned long duration = jiffies - req->start_jif;
int cpu;
cpu = part_stat_lock();
part_stat_add(cpu, &device->vdisk->part0, ticks[rw], duration);
@@ -66,7 +66,7 @@ static struct drbd_request *drbd_req_new(struct drbd_device *device,
{
struct drbd_request *req;
- req = mempool_alloc(drbd_request_mempool, GFP_NOIO);
+ req = mempool_alloc(drbd_request_mempool, GFP_NOIO | __GFP_ZERO);
if (!req)
return NULL;
@@ -84,6 +84,8 @@ static struct drbd_request *drbd_req_new(struct drbd_device *device,
INIT_LIST_HEAD(&req->tl_requests);
INIT_LIST_HEAD(&req->w.list);
+ INIT_LIST_HEAD(&req->req_pending_master_completion);
+ INIT_LIST_HEAD(&req->req_pending_local);
/* one reference to be put by __drbd_make_request */
atomic_set(&req->completion_ref, 1);
@@ -92,6 +94,19 @@ static struct drbd_request *drbd_req_new(struct drbd_device *device,
return req;
}
+static void drbd_remove_request_interval(struct rb_root *root,
+ struct drbd_request *req)
+{
+ struct drbd_device *device = req->device;
+ struct drbd_interval *i = &req->i;
+
+ drbd_remove_interval(root, i);
+
+ /* Wake up any processes waiting for this request to complete. */
+ if (i->waiting)
+ wake_up(&device->misc_wait);
+}
+
void drbd_req_destroy(struct kref *kref)
{
struct drbd_request *req = container_of(kref, struct drbd_request, kref);
@@ -107,14 +122,30 @@ void drbd_req_destroy(struct kref *kref)
return;
}
- /* remove it from the transfer log.
- * well, only if it had been there in the first
- * place... if it had not (local only or conflicting
- * and never sent), it should still be "empty" as
- * initialized in drbd_req_new(), so we can list_del() it
- * here unconditionally */
+ /* If called from mod_rq_state (expected normal case) or
+ * drbd_send_and_submit (the less likely normal path), this holds the
+ * req_lock, and req->tl_requests will typicaly be on ->transfer_log,
+ * though it may be still empty (never added to the transfer log).
+ *
+ * If called from do_retry(), we do NOT hold the req_lock, but we are
+ * still allowed to unconditionally list_del(&req->tl_requests),
+ * because it will be on a local on-stack list only. */
list_del_init(&req->tl_requests);
+ /* finally remove the request from the conflict detection
+ * respective block_id verification interval tree. */
+ if (!drbd_interval_empty(&req->i)) {
+ struct rb_root *root;
+
+ if (s & RQ_WRITE)
+ root = &device->write_requests;
+ else
+ root = &device->read_requests;
+ drbd_remove_request_interval(root, req);
+ } else if (s & (RQ_NET_MASK & ~RQ_NET_DONE) && req->i.size != 0)
+ drbd_err(device, "drbd_req_destroy: Logic BUG: interval empty, but: rq_state=0x%x, sect=%llu, size=%u\n",
+ s, (unsigned long long)req->i.sector, req->i.size);
+
/* if it was a write, we may have to set the corresponding
* bit(s) out-of-sync first. If it had a local part, we need to
* release the reference to the activity log. */
@@ -188,19 +219,6 @@ void complete_master_bio(struct drbd_device *device,
}
-static void drbd_remove_request_interval(struct rb_root *root,
- struct drbd_request *req)
-{
- struct drbd_device *device = req->device;
- struct drbd_interval *i = &req->i;
-
- drbd_remove_interval(root, i);
-
- /* Wake up any processes waiting for this request to complete. */
- if (i->waiting)
- wake_up(&device->misc_wait);
-}
-
/* Helper for __req_mod().
* Set m->bio to the master bio, if it is fit to be completed,
* or leave it alone (it is initialized to NULL in __req_mod),
@@ -254,18 +272,6 @@ void drbd_req_complete(struct drbd_request *req, struct bio_and_error *m)
ok = (s & RQ_LOCAL_OK) || (s & RQ_NET_OK);
error = PTR_ERR(req->private_bio);
- /* remove the request from the conflict detection
- * respective block_id verification hash */
- if (!drbd_interval_empty(&req->i)) {
- struct rb_root *root;
-
- if (rw == WRITE)
- root = &device->write_requests;
- else
- root = &device->read_requests;
- drbd_remove_request_interval(root, req);
- }
-
/* Before we can signal completion to the upper layers,
* we may need to close the current transfer log epoch.
* We are within the request lock, so we can simply compare
@@ -301,9 +307,24 @@ void drbd_req_complete(struct drbd_request *req, struct bio_and_error *m)
m->error = ok ? 0 : (error ?: -EIO);
m->bio = req->master_bio;
req->master_bio = NULL;
+ /* We leave it in the tree, to be able to verify later
+ * write-acks in protocol != C during resync.
+ * But we mark it as "complete", so it won't be counted as
+ * conflict in a multi-primary setup. */
+ req->i.completed = true;
}
+
+ if (req->i.waiting)
+ wake_up(&device->misc_wait);
+
+ /* Either we are about to complete to upper layers,
+ * or we will restart this request.
+ * In either case, the request object will be destroyed soon,
+ * so better remove it from all lists. */
+ list_del_init(&req->req_pending_master_completion);
}
+/* still holds resource->req_lock */
static int drbd_req_put_completion_ref(struct drbd_request *req, struct bio_and_error *m, int put)
{
struct drbd_device *device = req->device;
@@ -324,12 +345,91 @@ static int drbd_req_put_completion_ref(struct drbd_request *req, struct bio_and_
return 1;
}
+static void set_if_null_req_next(struct drbd_peer_device *peer_device, struct drbd_request *req)
+{
+ struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
+ if (!connection)
+ return;
+ if (connection->req_next == NULL)
+ connection->req_next = req;
+}
+
+static void advance_conn_req_next(struct drbd_peer_device *peer_device, struct drbd_request *req)
+{
+ struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
+ if (!connection)
+ return;
+ if (connection->req_next != req)
+ return;
+ list_for_each_entry_continue(req, &connection->transfer_log, tl_requests) {
+ const unsigned s = req->rq_state;
+ if (s & RQ_NET_QUEUED)
+ break;
+ }
+ if (&req->tl_requests == &connection->transfer_log)
+ req = NULL;
+ connection->req_next = req;
+}
+
+static void set_if_null_req_ack_pending(struct drbd_peer_device *peer_device, struct drbd_request *req)
+{
+ struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
+ if (!connection)
+ return;
+ if (connection->req_ack_pending == NULL)
+ connection->req_ack_pending = req;
+}
+
+static void advance_conn_req_ack_pending(struct drbd_peer_device *peer_device, struct drbd_request *req)
+{
+ struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
+ if (!connection)
+ return;
+ if (connection->req_ack_pending != req)
+ return;
+ list_for_each_entry_continue(req, &connection->transfer_log, tl_requests) {
+ const unsigned s = req->rq_state;
+ if ((s & RQ_NET_SENT) && (s & RQ_NET_PENDING))
+ break;
+ }
+ if (&req->tl_requests == &connection->transfer_log)
+ req = NULL;
+ connection->req_ack_pending = req;
+}
+
+static void set_if_null_req_not_net_done(struct drbd_peer_device *peer_device, struct drbd_request *req)
+{
+ struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
+ if (!connection)
+ return;
+ if (connection->req_not_net_done == NULL)
+ connection->req_not_net_done = req;
+}
+
+static void advance_conn_req_not_net_done(struct drbd_peer_device *peer_device, struct drbd_request *req)
+{
+ struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
+ if (!connection)
+ return;
+ if (connection->req_not_net_done != req)
+ return;
+ list_for_each_entry_continue(req, &connection->transfer_log, tl_requests) {
+ const unsigned s = req->rq_state;
+ if ((s & RQ_NET_SENT) && !(s & RQ_NET_DONE))
+ break;
+ }
+ if (&req->tl_requests == &connection->transfer_log)
+ req = NULL;
+ connection->req_not_net_done = req;
+}
+
/* I'd like this to be the only place that manipulates
* req->completion_ref and req->kref. */
static void mod_rq_state(struct drbd_request *req, struct bio_and_error *m,
int clear, int set)
{
struct drbd_device *device = req->device;
+ struct drbd_peer_device *peer_device = first_peer_device(device);
unsigned s = req->rq_state;
int c_put = 0;
int k_put = 0;
@@ -356,14 +456,23 @@ static void mod_rq_state(struct drbd_request *req, struct bio_and_error *m,
atomic_inc(&req->completion_ref);
}
- if (!(s & RQ_NET_QUEUED) && (set & RQ_NET_QUEUED))
+ if (!(s & RQ_NET_QUEUED) && (set & RQ_NET_QUEUED)) {
atomic_inc(&req->completion_ref);
+ set_if_null_req_next(peer_device, req);
+ }
if (!(s & RQ_EXP_BARR_ACK) && (set & RQ_EXP_BARR_ACK))
kref_get(&req->kref); /* wait for the DONE */
- if (!(s & RQ_NET_SENT) && (set & RQ_NET_SENT))
- atomic_add(req->i.size >> 9, &device->ap_in_flight);
+ if (!(s & RQ_NET_SENT) && (set & RQ_NET_SENT)) {
+ /* potentially already completed in the asender thread */
+ if (!(s & RQ_NET_DONE)) {
+ atomic_add(req->i.size >> 9, &device->ap_in_flight);
+ set_if_null_req_not_net_done(peer_device, req);
+ }
+ if (s & RQ_NET_PENDING)
+ set_if_null_req_ack_pending(peer_device, req);
+ }
if (!(s & RQ_COMPLETION_SUSP) && (set & RQ_COMPLETION_SUSP))
atomic_inc(&req->completion_ref);
@@ -386,20 +495,34 @@ static void mod_rq_state(struct drbd_request *req, struct bio_and_error *m,
++k_put;
else
++c_put;
+ list_del_init(&req->req_pending_local);
}
if ((s & RQ_NET_PENDING) && (clear & RQ_NET_PENDING)) {
dec_ap_pending(device);
++c_put;
+ req->acked_jif = jiffies;
+ advance_conn_req_ack_pending(peer_device, req);
}
- if ((s & RQ_NET_QUEUED) && (clear & RQ_NET_QUEUED))
+ if ((s & RQ_NET_QUEUED) && (clear & RQ_NET_QUEUED)) {
++c_put;
+ advance_conn_req_next(peer_device, req);
+ }
- if ((s & RQ_EXP_BARR_ACK) && !(s & RQ_NET_DONE) && (set & RQ_NET_DONE)) {
- if (req->rq_state & RQ_NET_SENT)
+ if (!(s & RQ_NET_DONE) && (set & RQ_NET_DONE)) {
+ if (s & RQ_NET_SENT)
atomic_sub(req->i.size >> 9, &device->ap_in_flight);
- ++k_put;
+ if (s & RQ_EXP_BARR_ACK)
+ ++k_put;
+ req->net_done_jif = jiffies;
+
+ /* in ahead/behind mode, or just in case,
+ * before we finally destroy this request,
+ * the caching pointers must not reference it anymore */
+ advance_conn_req_next(peer_device, req);
+ advance_conn_req_ack_pending(peer_device, req);
+ advance_conn_req_not_net_done(peer_device, req);
}
/* potentially complete and destroy */
@@ -439,6 +562,19 @@ static void drbd_report_io_error(struct drbd_device *device, struct drbd_request
bdevname(device->ldev->backing_bdev, b));
}
+/* Helper for HANDED_OVER_TO_NETWORK.
+ * Is this a protocol A write (neither WRITE_ACK nor RECEIVE_ACK expected)?
+ * Is it also still "PENDING"?
+ * --> If so, clear PENDING and set NET_OK below.
+ * If it is a protocol A write, but not RQ_PENDING anymore, neg-ack was faster
+ * (and we must not set RQ_NET_OK) */
+static inline bool is_pending_write_protocol_A(struct drbd_request *req)
+{
+ return (req->rq_state &
+ (RQ_WRITE|RQ_NET_PENDING|RQ_EXP_WRITE_ACK|RQ_EXP_RECEIVE_ACK))
+ == (RQ_WRITE|RQ_NET_PENDING);
+}
+
/* obviously this could be coded as many single functions
* instead of one huge switch,
* or by putting the code directly in the respective locations
@@ -454,7 +590,9 @@ static void drbd_report_io_error(struct drbd_device *device, struct drbd_request
int __req_mod(struct drbd_request *req, enum drbd_req_event what,
struct bio_and_error *m)
{
- struct drbd_device *device = req->device;
+ struct drbd_device *const device = req->device;
+ struct drbd_peer_device *const peer_device = first_peer_device(device);
+ struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
struct net_conf *nc;
int p, rv = 0;
@@ -477,7 +615,7 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
* and from w_read_retry_remote */
D_ASSERT(device, !(req->rq_state & RQ_NET_MASK));
rcu_read_lock();
- nc = rcu_dereference(first_peer_device(device)->connection->net_conf);
+ nc = rcu_dereference(connection->net_conf);
p = nc->wire_protocol;
rcu_read_unlock();
req->rq_state |=
@@ -549,7 +687,7 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
D_ASSERT(device, (req->rq_state & RQ_LOCAL_MASK) == 0);
mod_rq_state(req, m, 0, RQ_NET_QUEUED);
req->w.cb = w_send_read_req;
- drbd_queue_work(&first_peer_device(device)->connection->sender_work,
+ drbd_queue_work(&connection->sender_work,
&req->w);
break;
@@ -585,23 +723,23 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
D_ASSERT(device, req->rq_state & RQ_NET_PENDING);
mod_rq_state(req, m, 0, RQ_NET_QUEUED|RQ_EXP_BARR_ACK);
req->w.cb = w_send_dblock;
- drbd_queue_work(&first_peer_device(device)->connection->sender_work,
+ drbd_queue_work(&connection->sender_work,
&req->w);
/* close the epoch, in case it outgrew the limit */
rcu_read_lock();
- nc = rcu_dereference(first_peer_device(device)->connection->net_conf);
+ nc = rcu_dereference(connection->net_conf);
p = nc->max_epoch_size;
rcu_read_unlock();
- if (first_peer_device(device)->connection->current_tle_writes >= p)
- start_new_tl_epoch(first_peer_device(device)->connection);
+ if (connection->current_tle_writes >= p)
+ start_new_tl_epoch(connection);
break;
case QUEUE_FOR_SEND_OOS:
mod_rq_state(req, m, 0, RQ_NET_QUEUED);
req->w.cb = w_send_out_of_sync;
- drbd_queue_work(&first_peer_device(device)->connection->sender_work,
+ drbd_queue_work(&connection->sender_work,
&req->w);
break;
@@ -615,18 +753,16 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
case HANDED_OVER_TO_NETWORK:
/* assert something? */
- if (bio_data_dir(req->master_bio) == WRITE &&
- !(req->rq_state & (RQ_EXP_RECEIVE_ACK | RQ_EXP_WRITE_ACK))) {
+ if (is_pending_write_protocol_A(req))
/* this is what is dangerous about protocol A:
* pretend it was successfully written on the peer. */
- if (req->rq_state & RQ_NET_PENDING)
- mod_rq_state(req, m, RQ_NET_PENDING, RQ_NET_OK);
- /* else: neg-ack was faster... */
- /* it is still not yet RQ_NET_DONE until the
- * corresponding epoch barrier got acked as well,
- * so we know what to dirty on connection loss */
- }
- mod_rq_state(req, m, RQ_NET_QUEUED, RQ_NET_SENT);
+ mod_rq_state(req, m, RQ_NET_QUEUED|RQ_NET_PENDING,
+ RQ_NET_SENT|RQ_NET_OK);
+ else
+ mod_rq_state(req, m, RQ_NET_QUEUED, RQ_NET_SENT);
+ /* It is still not yet RQ_NET_DONE until the
+ * corresponding epoch barrier got acked as well,
+ * so we know what to dirty on connection loss. */
break;
case OOS_HANDED_TO_NETWORK:
@@ -658,12 +794,13 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
case WRITE_ACKED_BY_PEER_AND_SIS:
req->rq_state |= RQ_NET_SIS;
case WRITE_ACKED_BY_PEER:
- D_ASSERT(device, req->rq_state & RQ_EXP_WRITE_ACK);
- /* protocol C; successfully written on peer.
+ /* Normal operation protocol C: successfully written on peer.
+ * During resync, even in protocol != C,
+ * we requested an explicit write ack anyways.
+ * Which means we cannot even assert anything here.
* Nothing more to do here.
* We want to keep the tl in place for all protocols, to cater
* for volatile write-back caches on lower level devices. */
-
goto ack_common;
case RECV_ACKED_BY_PEER:
D_ASSERT(device, req->rq_state & RQ_EXP_RECEIVE_ACK);
@@ -671,7 +808,6 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
* see also notes above in HANDED_OVER_TO_NETWORK about
* protocol != C */
ack_common:
- D_ASSERT(device, req->rq_state & RQ_NET_PENDING);
mod_rq_state(req, m, RQ_NET_PENDING, RQ_NET_OK);
break;
@@ -714,7 +850,7 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
get_ldev(device); /* always succeeds in this call path */
req->w.cb = w_restart_disk_io;
- drbd_queue_work(&first_peer_device(device)->connection->sender_work,
+ drbd_queue_work(&connection->sender_work,
&req->w);
break;
@@ -736,7 +872,8 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
mod_rq_state(req, m, RQ_COMPLETION_SUSP, RQ_NET_QUEUED|RQ_NET_PENDING);
if (req->w.cb) {
- drbd_queue_work(&first_peer_device(device)->connection->sender_work,
+ /* w.cb expected to be w_send_dblock, or w_send_read_req */
+ drbd_queue_work(&connection->sender_work,
&req->w);
rv = req->rq_state & RQ_WRITE ? MR_WRITE : MR_READ;
} /* else: FIXME can this happen? */
@@ -769,7 +906,7 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
break;
case QUEUE_AS_DRBD_BARRIER:
- start_new_tl_epoch(first_peer_device(device)->connection);
+ start_new_tl_epoch(connection);
mod_rq_state(req, m, 0, RQ_NET_OK|RQ_NET_DONE);
break;
};
@@ -886,6 +1023,9 @@ static void maybe_pull_ahead(struct drbd_device *device)
connection->agreed_pro_version < 96)
return;
+ if (on_congestion == OC_PULL_AHEAD && device->state.conn == C_AHEAD)
+ return; /* nothing to do ... */
+
/* If I don't even have good local storage, we can not reasonably try
* to pull ahead of the peer. We also need the local reference to make
* sure device->act_log is there.
@@ -1021,6 +1161,7 @@ drbd_submit_req_private_bio(struct drbd_request *req)
* stable storage, and this is a WRITE, we may not even submit
* this bio. */
if (get_ldev(device)) {
+ req->pre_submit_jif = jiffies;
if (drbd_insert_fault(device,
rw == WRITE ? DRBD_FAULT_DT_WR
: rw == READ ? DRBD_FAULT_DT_RD
@@ -1035,10 +1176,14 @@ drbd_submit_req_private_bio(struct drbd_request *req)
static void drbd_queue_write(struct drbd_device *device, struct drbd_request *req)
{
- spin_lock(&device->submit.lock);
+ spin_lock_irq(&device->resource->req_lock);
list_add_tail(&req->tl_requests, &device->submit.writes);
- spin_unlock(&device->submit.lock);
+ list_add_tail(&req->req_pending_master_completion,
+ &device->pending_master_completion[1 /* WRITE */]);
+ spin_unlock_irq(&device->resource->req_lock);
queue_work(device->submit.wq, &device->submit.worker);
+ /* do_submit() may sleep internally on al_wait, too */
+ wake_up(&device->al_wait);
}
/* returns the new drbd_request pointer, if the caller is expected to
@@ -1047,7 +1192,7 @@ static void drbd_queue_write(struct drbd_device *device, struct drbd_request *re
* Returns ERR_PTR(-ENOMEM) if we cannot allocate a drbd_request.
*/
static struct drbd_request *
-drbd_request_prepare(struct drbd_device *device, struct bio *bio, unsigned long start_time)
+drbd_request_prepare(struct drbd_device *device, struct bio *bio, unsigned long start_jif)
{
const int rw = bio_data_dir(bio);
struct drbd_request *req;
@@ -1062,7 +1207,7 @@ drbd_request_prepare(struct drbd_device *device, struct bio *bio, unsigned long
bio_endio(bio, -ENOMEM);
return ERR_PTR(-ENOMEM);
}
- req->start_time = start_time;
+ req->start_jif = start_jif;
if (!get_ldev(device)) {
bio_put(req->private_bio);
@@ -1075,10 +1220,12 @@ drbd_request_prepare(struct drbd_device *device, struct bio *bio, unsigned long
if (rw == WRITE && req->private_bio && req->i.size
&& !test_bit(AL_SUSPENDED, &device->flags)) {
if (!drbd_al_begin_io_fastpath(device, &req->i)) {
+ atomic_inc(&device->ap_actlog_cnt);
drbd_queue_write(device, req);
return NULL;
}
req->rq_state |= RQ_IN_ACT_LOG;
+ req->in_actlog_jif = jiffies;
}
return req;
@@ -1086,11 +1233,13 @@ drbd_request_prepare(struct drbd_device *device, struct bio *bio, unsigned long
static void drbd_send_and_submit(struct drbd_device *device, struct drbd_request *req)
{
+ struct drbd_resource *resource = device->resource;
const int rw = bio_rw(req->master_bio);
struct bio_and_error m = { NULL, };
bool no_remote = false;
+ bool submit_private_bio = false;
- spin_lock_irq(&device->resource->req_lock);
+ spin_lock_irq(&resource->req_lock);
if (rw == WRITE) {
/* This may temporarily give up the req_lock,
* but will re-aquire it before it returns here.
@@ -1148,13 +1297,18 @@ static void drbd_send_and_submit(struct drbd_device *device, struct drbd_request
no_remote = true;
}
+ /* If it took the fast path in drbd_request_prepare, add it here.
+ * The slow path has added it already. */
+ if (list_empty(&req->req_pending_master_completion))
+ list_add_tail(&req->req_pending_master_completion,
+ &device->pending_master_completion[rw == WRITE]);
if (req->private_bio) {
/* needs to be marked within the same spinlock */
+ list_add_tail(&req->req_pending_local,
+ &device->pending_completion[rw == WRITE]);
_req_mod(req, TO_BE_SUBMITTED);
/* but we need to give up the spinlock to submit */
- spin_unlock_irq(&device->resource->req_lock);
- drbd_submit_req_private_bio(req);
- spin_lock_irq(&device->resource->req_lock);
+ submit_private_bio = true;
} else if (no_remote) {
nodata:
if (__ratelimit(&drbd_ratelimit_state))
@@ -1167,15 +1321,23 @@ nodata:
out:
if (drbd_req_put_completion_ref(req, &m, 1))
kref_put(&req->kref, drbd_req_destroy);
- spin_unlock_irq(&device->resource->req_lock);
-
+ spin_unlock_irq(&resource->req_lock);
+
+ /* Even though above is a kref_put(), this is safe.
+ * As long as we still need to submit our private bio,
+ * we hold a completion ref, and the request cannot disappear.
+ * If however this request did not even have a private bio to submit
+ * (e.g. remote read), req may already be invalid now.
+ * That's why we cannot check on req->private_bio. */
+ if (submit_private_bio)
+ drbd_submit_req_private_bio(req);
if (m.bio)
complete_master_bio(device, &m);
}
-void __drbd_make_request(struct drbd_device *device, struct bio *bio, unsigned long start_time)
+void __drbd_make_request(struct drbd_device *device, struct bio *bio, unsigned long start_jif)
{
- struct drbd_request *req = drbd_request_prepare(device, bio, start_time);
+ struct drbd_request *req = drbd_request_prepare(device, bio, start_jif);
if (IS_ERR_OR_NULL(req))
return;
drbd_send_and_submit(device, req);
@@ -1194,6 +1356,8 @@ static void submit_fast_path(struct drbd_device *device, struct list_head *incom
continue;
req->rq_state |= RQ_IN_ACT_LOG;
+ req->in_actlog_jif = jiffies;
+ atomic_dec(&device->ap_actlog_cnt);
}
list_del_init(&req->tl_requests);
@@ -1203,7 +1367,8 @@ static void submit_fast_path(struct drbd_device *device, struct list_head *incom
static bool prepare_al_transaction_nonblock(struct drbd_device *device,
struct list_head *incoming,
- struct list_head *pending)
+ struct list_head *pending,
+ struct list_head *later)
{
struct drbd_request *req, *tmp;
int wake = 0;
@@ -1212,45 +1377,105 @@ static bool prepare_al_transaction_nonblock(struct drbd_device *device,
spin_lock_irq(&device->al_lock);
list_for_each_entry_safe(req, tmp, incoming, tl_requests) {
err = drbd_al_begin_io_nonblock(device, &req->i);
+ if (err == -ENOBUFS)
+ break;
if (err == -EBUSY)
wake = 1;
if (err)
- continue;
- req->rq_state |= RQ_IN_ACT_LOG;
- list_move_tail(&req->tl_requests, pending);
+ list_move_tail(&req->tl_requests, later);
+ else
+ list_move_tail(&req->tl_requests, pending);
}
spin_unlock_irq(&device->al_lock);
if (wake)
wake_up(&device->al_wait);
-
return !list_empty(pending);
}
+void send_and_submit_pending(struct drbd_device *device, struct list_head *pending)
+{
+ struct drbd_request *req, *tmp;
+
+ list_for_each_entry_safe(req, tmp, pending, tl_requests) {
+ req->rq_state |= RQ_IN_ACT_LOG;
+ req->in_actlog_jif = jiffies;
+ atomic_dec(&device->ap_actlog_cnt);
+ list_del_init(&req->tl_requests);
+ drbd_send_and_submit(device, req);
+ }
+}
+
void do_submit(struct work_struct *ws)
{
struct drbd_device *device = container_of(ws, struct drbd_device, submit.worker);
- LIST_HEAD(incoming);
- LIST_HEAD(pending);
- struct drbd_request *req, *tmp;
+ LIST_HEAD(incoming); /* from drbd_make_request() */
+ LIST_HEAD(pending); /* to be submitted after next AL-transaction commit */
+ LIST_HEAD(busy); /* blocked by resync requests */
+
+ /* grab new incoming requests */
+ spin_lock_irq(&device->resource->req_lock);
+ list_splice_tail_init(&device->submit.writes, &incoming);
+ spin_unlock_irq(&device->resource->req_lock);
for (;;) {
- spin_lock(&device->submit.lock);
- list_splice_tail_init(&device->submit.writes, &incoming);
- spin_unlock(&device->submit.lock);
+ DEFINE_WAIT(wait);
+ /* move used-to-be-busy back to front of incoming */
+ list_splice_init(&busy, &incoming);
submit_fast_path(device, &incoming);
if (list_empty(&incoming))
break;
-skip_fast_path:
- wait_event(device->al_wait, prepare_al_transaction_nonblock(device, &incoming, &pending));
- /* Maybe more was queued, while we prepared the transaction?
- * Try to stuff them into this transaction as well.
- * Be strictly non-blocking here, no wait_event, we already
- * have something to commit.
- * Stop if we don't make any more progres.
- */
for (;;) {
+ prepare_to_wait(&device->al_wait, &wait, TASK_UNINTERRUPTIBLE);
+
+ list_splice_init(&busy, &incoming);
+ prepare_al_transaction_nonblock(device, &incoming, &pending, &busy);
+ if (!list_empty(&pending))
+ break;
+
+ schedule();
+
+ /* If all currently "hot" activity log extents are kept busy by
+ * incoming requests, we still must not totally starve new
+ * requests to "cold" extents.
+ * Something left on &incoming means there had not been
+ * enough update slots available, and the activity log
+ * has been marked as "starving".
+ *
+ * Try again now, without looking for new requests,
+ * effectively blocking all new requests until we made
+ * at least _some_ progress with what we currently have.
+ */
+ if (!list_empty(&incoming))
+ continue;
+
+ /* Nothing moved to pending, but nothing left
+ * on incoming: all moved to busy!
+ * Grab new and iterate. */
+ spin_lock_irq(&device->resource->req_lock);
+ list_splice_tail_init(&device->submit.writes, &incoming);
+ spin_unlock_irq(&device->resource->req_lock);
+ }
+ finish_wait(&device->al_wait, &wait);
+
+ /* If the transaction was full, before all incoming requests
+ * had been processed, skip ahead to commit, and iterate
+ * without splicing in more incoming requests from upper layers.
+ *
+ * Else, if all incoming have been processed,
+ * they have become either "pending" (to be submitted after
+ * next transaction commit) or "busy" (blocked by resync).
+ *
+ * Maybe more was queued, while we prepared the transaction?
+ * Try to stuff those into this transaction as well.
+ * Be strictly non-blocking here,
+ * we already have something to commit.
+ *
+ * Commit if we don't make any more progres.
+ */
+
+ while (list_empty(&incoming)) {
LIST_HEAD(more_pending);
LIST_HEAD(more_incoming);
bool made_progress;
@@ -1260,55 +1485,32 @@ skip_fast_path:
if (list_empty(&device->submit.writes))
break;
- spin_lock(&device->submit.lock);
+ spin_lock_irq(&device->resource->req_lock);
list_splice_tail_init(&device->submit.writes, &more_incoming);
- spin_unlock(&device->submit.lock);
+ spin_unlock_irq(&device->resource->req_lock);
if (list_empty(&more_incoming))
break;
- made_progress = prepare_al_transaction_nonblock(device, &more_incoming, &more_pending);
+ made_progress = prepare_al_transaction_nonblock(device, &more_incoming, &more_pending, &busy);
list_splice_tail_init(&more_pending, &pending);
list_splice_tail_init(&more_incoming, &incoming);
-
if (!made_progress)
break;
}
- drbd_al_begin_io_commit(device, false);
-
- list_for_each_entry_safe(req, tmp, &pending, tl_requests) {
- list_del_init(&req->tl_requests);
- drbd_send_and_submit(device, req);
- }
- /* If all currently hot activity log extents are kept busy by
- * incoming requests, we still must not totally starve new
- * requests to cold extents. In that case, prepare one request
- * in blocking mode. */
- list_for_each_entry_safe(req, tmp, &incoming, tl_requests) {
- list_del_init(&req->tl_requests);
- req->rq_state |= RQ_IN_ACT_LOG;
- if (!drbd_al_begin_io_prepare(device, &req->i)) {
- /* Corresponding extent was hot after all? */
- drbd_send_and_submit(device, req);
- } else {
- /* Found a request to a cold extent.
- * Put on "pending" list,
- * and try to cumulate with more. */
- list_add(&req->tl_requests, &pending);
- goto skip_fast_path;
- }
- }
+ drbd_al_begin_io_commit(device);
+ send_and_submit_pending(device, &pending);
}
}
void drbd_make_request(struct request_queue *q, struct bio *bio)
{
struct drbd_device *device = (struct drbd_device *) q->queuedata;
- unsigned long start_time;
+ unsigned long start_jif;
- start_time = jiffies;
+ start_jif = jiffies;
/*
* what we "blindly" assume:
@@ -1316,7 +1518,7 @@ void drbd_make_request(struct request_queue *q, struct bio *bio)
D_ASSERT(device, IS_ALIGNED(bio->bi_iter.bi_size, 512));
inc_ap_bio(device);
- __drbd_make_request(device, bio, start_time);
+ __drbd_make_request(device, bio, start_jif);
}
/* This is called by bio_add_page().
@@ -1353,36 +1555,13 @@ int drbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bvm, struct
return limit;
}
-static void find_oldest_requests(
- struct drbd_connection *connection,
- struct drbd_device *device,
- struct drbd_request **oldest_req_waiting_for_peer,
- struct drbd_request **oldest_req_waiting_for_disk)
-{
- struct drbd_request *r;
- *oldest_req_waiting_for_peer = NULL;
- *oldest_req_waiting_for_disk = NULL;
- list_for_each_entry(r, &connection->transfer_log, tl_requests) {
- const unsigned s = r->rq_state;
- if (!*oldest_req_waiting_for_peer
- && ((s & RQ_NET_MASK) && !(s & RQ_NET_DONE)))
- *oldest_req_waiting_for_peer = r;
-
- if (!*oldest_req_waiting_for_disk
- && (s & RQ_LOCAL_PENDING) && r->device == device)
- *oldest_req_waiting_for_disk = r;
-
- if (*oldest_req_waiting_for_peer && *oldest_req_waiting_for_disk)
- break;
- }
-}
-
void request_timer_fn(unsigned long data)
{
struct drbd_device *device = (struct drbd_device *) data;
struct drbd_connection *connection = first_peer_device(device)->connection;
- struct drbd_request *req_disk, *req_peer; /* oldest request */
+ struct drbd_request *req_read, *req_write, *req_peer; /* oldest request */
struct net_conf *nc;
+ unsigned long oldest_submit_jif;
unsigned long ent = 0, dt = 0, et, nt; /* effective timeout = ko_count * timeout */
unsigned long now;
@@ -1403,14 +1582,31 @@ void request_timer_fn(unsigned long data)
return; /* Recurring timer stopped */
now = jiffies;
+ nt = now + et;
spin_lock_irq(&device->resource->req_lock);
- find_oldest_requests(connection, device, &req_peer, &req_disk);
- if (req_peer == NULL && req_disk == NULL) {
- spin_unlock_irq(&device->resource->req_lock);
- mod_timer(&device->request_timer, now + et);
- return;
- }
+ req_read = list_first_entry_or_null(&device->pending_completion[0], struct drbd_request, req_pending_local);
+ req_write = list_first_entry_or_null(&device->pending_completion[1], struct drbd_request, req_pending_local);
+ req_peer = connection->req_not_net_done;
+ /* maybe the oldest request waiting for the peer is in fact still
+ * blocking in tcp sendmsg */
+ if (!req_peer && connection->req_next && connection->req_next->pre_send_jif)
+ req_peer = connection->req_next;
+
+ /* evaluate the oldest peer request only in one timer! */
+ if (req_peer && req_peer->device != device)
+ req_peer = NULL;
+
+ /* do we have something to evaluate? */
+ if (req_peer == NULL && req_write == NULL && req_read == NULL)
+ goto out;
+
+ oldest_submit_jif =
+ (req_write && req_read)
+ ? ( time_before(req_write->pre_submit_jif, req_read->pre_submit_jif)
+ ? req_write->pre_submit_jif : req_read->pre_submit_jif )
+ : req_write ? req_write->pre_submit_jif
+ : req_read ? req_read->pre_submit_jif : now;
/* The request is considered timed out, if
* - we have some effective timeout from the configuration,
@@ -1429,13 +1625,13 @@ void request_timer_fn(unsigned long data)
* to expire twice (worst case) to become effective. Good enough.
*/
if (ent && req_peer &&
- time_after(now, req_peer->start_time + ent) &&
+ time_after(now, req_peer->pre_send_jif + ent) &&
!time_in_range(now, connection->last_reconnect_jif, connection->last_reconnect_jif + ent)) {
drbd_warn(device, "Remote failed to finish a request within ko-count * timeout\n");
_drbd_set_state(_NS(device, conn, C_TIMEOUT), CS_VERBOSE | CS_HARD, NULL);
}
- if (dt && req_disk &&
- time_after(now, req_disk->start_time + dt) &&
+ if (dt && oldest_submit_jif != now &&
+ time_after(now, oldest_submit_jif + dt) &&
!time_in_range(now, device->last_reattach_jif, device->last_reattach_jif + dt)) {
drbd_warn(device, "Local backing device failed to meet the disk-timeout\n");
__drbd_chk_io_error(device, DRBD_FORCE_DETACH);
@@ -1443,11 +1639,12 @@ void request_timer_fn(unsigned long data)
/* Reschedule timer for the nearest not already expired timeout.
* Fallback to now + min(effective network timeout, disk timeout). */
- ent = (ent && req_peer && time_before(now, req_peer->start_time + ent))
- ? req_peer->start_time + ent : now + et;
- dt = (dt && req_disk && time_before(now, req_disk->start_time + dt))
- ? req_disk->start_time + dt : now + et;
+ ent = (ent && req_peer && time_before(now, req_peer->pre_send_jif + ent))
+ ? req_peer->pre_send_jif + ent : now + et;
+ dt = (dt && oldest_submit_jif != now && time_before(now, oldest_submit_jif + dt))
+ ? oldest_submit_jif + dt : now + et;
nt = time_before(ent, dt) ? ent : dt;
+out:
spin_unlock_irq(&connection->resource->req_lock);
mod_timer(&device->request_timer, nt);
}
diff --git a/drivers/block/drbd/drbd_req.h b/drivers/block/drbd/drbd_req.h
index 8566cd5866b4..9f6a04080e9f 100644
--- a/drivers/block/drbd/drbd_req.h
+++ b/drivers/block/drbd/drbd_req.h
@@ -288,6 +288,7 @@ extern void complete_master_bio(struct drbd_device *device,
extern void request_timer_fn(unsigned long data);
extern void tl_restart(struct drbd_connection *connection, enum drbd_req_event what);
extern void _tl_restart(struct drbd_connection *connection, enum drbd_req_event what);
+extern void tl_abort_disk_io(struct drbd_device *device);
/* this is in drbd_main.c */
extern void drbd_restart_request(struct drbd_request *req);
diff --git a/drivers/block/drbd/drbd_state.c b/drivers/block/drbd/drbd_state.c
index a5d8aae00e04..c35c0f001bb7 100644
--- a/drivers/block/drbd/drbd_state.c
+++ b/drivers/block/drbd/drbd_state.c
@@ -410,7 +410,7 @@ _drbd_request_state(struct drbd_device *device, union drbd_state mask,
return rv;
}
-static void print_st(struct drbd_device *device, char *name, union drbd_state ns)
+static void print_st(struct drbd_device *device, const char *name, union drbd_state ns)
{
drbd_err(device, " %s = { cs:%s ro:%s/%s ds:%s/%s %c%c%c%c%c%c }\n",
name,
@@ -952,11 +952,12 @@ enum drbd_state_rv
__drbd_set_state(struct drbd_device *device, union drbd_state ns,
enum chg_state_flags flags, struct completion *done)
{
+ struct drbd_peer_device *peer_device = first_peer_device(device);
+ struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
union drbd_state os;
enum drbd_state_rv rv = SS_SUCCESS;
enum sanitize_state_warnings ssw;
struct after_state_chg_work *ascw;
- bool did_remote, should_do_remote;
os = drbd_read_state(device);
@@ -978,9 +979,9 @@ __drbd_set_state(struct drbd_device *device, union drbd_state ns,
this happen...*/
if (is_valid_state(device, os) == rv)
- rv = is_valid_soft_transition(os, ns, first_peer_device(device)->connection);
+ rv = is_valid_soft_transition(os, ns, connection);
} else
- rv = is_valid_soft_transition(os, ns, first_peer_device(device)->connection);
+ rv = is_valid_soft_transition(os, ns, connection);
}
if (rv < SS_SUCCESS) {
@@ -997,7 +998,7 @@ __drbd_set_state(struct drbd_device *device, union drbd_state ns,
sanitize_state(). Only display it here if we where not called from
_conn_request_state() */
if (!(flags & CS_DC_SUSP))
- conn_pr_state_change(first_peer_device(device)->connection, os, ns,
+ conn_pr_state_change(connection, os, ns,
(flags & ~CS_DC_MASK) | CS_DC_SUSP);
/* if we are going -> D_FAILED or D_DISKLESS, grab one extra reference
@@ -1008,28 +1009,35 @@ __drbd_set_state(struct drbd_device *device, union drbd_state ns,
(os.disk != D_DISKLESS && ns.disk == D_DISKLESS))
atomic_inc(&device->local_cnt);
- did_remote = drbd_should_do_remote(device->state);
+ if (!is_sync_state(os.conn) && is_sync_state(ns.conn))
+ clear_bit(RS_DONE, &device->flags);
+
+ /* changes to local_cnt and device flags should be visible before
+ * changes to state, which again should be visible before anything else
+ * depending on that change happens. */
+ smp_wmb();
device->state.i = ns.i;
- should_do_remote = drbd_should_do_remote(device->state);
device->resource->susp = ns.susp;
device->resource->susp_nod = ns.susp_nod;
device->resource->susp_fen = ns.susp_fen;
+ smp_wmb();
/* put replicated vs not-replicated requests in seperate epochs */
- if (did_remote != should_do_remote)
- start_new_tl_epoch(first_peer_device(device)->connection);
+ if (drbd_should_do_remote((union drbd_dev_state)os.i) !=
+ drbd_should_do_remote((union drbd_dev_state)ns.i))
+ start_new_tl_epoch(connection);
if (os.disk == D_ATTACHING && ns.disk >= D_NEGOTIATING)
drbd_print_uuids(device, "attached to UUIDs");
/* Wake up role changes, that were delayed because of connection establishing */
if (os.conn == C_WF_REPORT_PARAMS && ns.conn != C_WF_REPORT_PARAMS &&
- no_peer_wf_report_params(first_peer_device(device)->connection))
- clear_bit(STATE_SENT, &first_peer_device(device)->connection->flags);
+ no_peer_wf_report_params(connection))
+ clear_bit(STATE_SENT, &connection->flags);
wake_up(&device->misc_wait);
wake_up(&device->state_wait);
- wake_up(&first_peer_device(device)->connection->ping_wait);
+ wake_up(&connection->ping_wait);
/* Aborted verify run, or we reached the stop sector.
* Log the last position, unless end-of-device. */
@@ -1118,21 +1126,21 @@ __drbd_set_state(struct drbd_device *device, union drbd_state ns,
/* Receiver should clean up itself */
if (os.conn != C_DISCONNECTING && ns.conn == C_DISCONNECTING)
- drbd_thread_stop_nowait(&first_peer_device(device)->connection->receiver);
+ drbd_thread_stop_nowait(&connection->receiver);
/* Now the receiver finished cleaning up itself, it should die */
if (os.conn != C_STANDALONE && ns.conn == C_STANDALONE)
- drbd_thread_stop_nowait(&first_peer_device(device)->connection->receiver);
+ drbd_thread_stop_nowait(&connection->receiver);
/* Upon network failure, we need to restart the receiver. */
if (os.conn > C_WF_CONNECTION &&
ns.conn <= C_TEAR_DOWN && ns.conn >= C_TIMEOUT)
- drbd_thread_restart_nowait(&first_peer_device(device)->connection->receiver);
+ drbd_thread_restart_nowait(&connection->receiver);
/* Resume AL writing if we get a connection */
if (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED) {
drbd_resume_al(device);
- first_peer_device(device)->connection->connect_cnt++;
+ connection->connect_cnt++;
}
/* remember last attach time so request_timer_fn() won't
@@ -1150,7 +1158,7 @@ __drbd_set_state(struct drbd_device *device, union drbd_state ns,
ascw->w.cb = w_after_state_ch;
ascw->device = device;
ascw->done = done;
- drbd_queue_work(&first_peer_device(device)->connection->sender_work,
+ drbd_queue_work(&connection->sender_work,
&ascw->w);
} else {
drbd_err(device, "Could not kmalloc an ascw\n");
@@ -1222,13 +1230,16 @@ static void after_state_ch(struct drbd_device *device, union drbd_state os,
union drbd_state ns, enum chg_state_flags flags)
{
struct drbd_resource *resource = device->resource;
+ struct drbd_peer_device *peer_device = first_peer_device(device);
+ struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
struct sib_info sib;
sib.sib_reason = SIB_STATE_CHANGE;
sib.os = os;
sib.ns = ns;
- if (os.conn != C_CONNECTED && ns.conn == C_CONNECTED) {
+ if ((os.disk != D_UP_TO_DATE || os.pdsk != D_UP_TO_DATE)
+ && (ns.disk == D_UP_TO_DATE && ns.pdsk == D_UP_TO_DATE)) {
clear_bit(CRASHED_PRIMARY, &device->flags);
if (device->p_uuid)
device->p_uuid[UI_FLAGS] &= ~((u64)2);
@@ -1245,7 +1256,6 @@ static void after_state_ch(struct drbd_device *device, union drbd_state os,
state change. This function might sleep */
if (ns.susp_nod) {
- struct drbd_connection *connection = first_peer_device(device)->connection;
enum drbd_req_event what = NOTHING;
spin_lock_irq(&device->resource->req_lock);
@@ -1267,8 +1277,6 @@ static void after_state_ch(struct drbd_device *device, union drbd_state os,
}
if (ns.susp_fen) {
- struct drbd_connection *connection = first_peer_device(device)->connection;
-
spin_lock_irq(&device->resource->req_lock);
if (resource->susp_fen && conn_lowest_conn(connection) >= C_CONNECTED) {
/* case2: The connection was established again: */
@@ -1294,8 +1302,8 @@ static void after_state_ch(struct drbd_device *device, union drbd_state os,
* which is unexpected. */
if ((os.conn != C_SYNC_SOURCE && os.conn != C_PAUSED_SYNC_S) &&
(ns.conn == C_SYNC_SOURCE || ns.conn == C_PAUSED_SYNC_S) &&
- first_peer_device(device)->connection->agreed_pro_version >= 96 && get_ldev(device)) {
- drbd_gen_and_send_sync_uuid(first_peer_device(device));
+ connection->agreed_pro_version >= 96 && get_ldev(device)) {
+ drbd_gen_and_send_sync_uuid(peer_device);
put_ldev(device);
}
@@ -1309,8 +1317,8 @@ static void after_state_ch(struct drbd_device *device, union drbd_state os,
atomic_set(&device->rs_pending_cnt, 0);
drbd_rs_cancel_all(device);
- drbd_send_uuids(first_peer_device(device));
- drbd_send_state(first_peer_device(device), ns);
+ drbd_send_uuids(peer_device);
+ drbd_send_state(peer_device, ns);
}
/* No point in queuing send_bitmap if we don't have a connection
* anymore, so check also the _current_ state, not only the new state
@@ -1335,7 +1343,7 @@ static void after_state_ch(struct drbd_device *device, union drbd_state os,
set_bit(NEW_CUR_UUID, &device->flags);
} else {
drbd_uuid_new_current(device);
- drbd_send_uuids(first_peer_device(device));
+ drbd_send_uuids(peer_device);
}
}
put_ldev(device);
@@ -1346,7 +1354,7 @@ static void after_state_ch(struct drbd_device *device, union drbd_state os,
if (os.peer == R_SECONDARY && ns.peer == R_PRIMARY &&
device->ldev->md.uuid[UI_BITMAP] == 0 && ns.disk >= D_UP_TO_DATE) {
drbd_uuid_new_current(device);
- drbd_send_uuids(first_peer_device(device));
+ drbd_send_uuids(peer_device);
}
/* D_DISKLESS Peer becomes secondary */
if (os.peer == R_PRIMARY && ns.peer == R_SECONDARY)
@@ -1373,16 +1381,16 @@ static void after_state_ch(struct drbd_device *device, union drbd_state os,
/* Last part of the attaching process ... */
if (ns.conn >= C_CONNECTED &&
os.disk == D_ATTACHING && ns.disk == D_NEGOTIATING) {
- drbd_send_sizes(first_peer_device(device), 0, 0); /* to start sync... */
- drbd_send_uuids(first_peer_device(device));
- drbd_send_state(first_peer_device(device), ns);
+ drbd_send_sizes(peer_device, 0, 0); /* to start sync... */
+ drbd_send_uuids(peer_device);
+ drbd_send_state(peer_device, ns);
}
/* We want to pause/continue resync, tell peer. */
if (ns.conn >= C_CONNECTED &&
((os.aftr_isp != ns.aftr_isp) ||
(os.user_isp != ns.user_isp)))
- drbd_send_state(first_peer_device(device), ns);
+ drbd_send_state(peer_device, ns);
/* In case one of the isp bits got set, suspend other devices. */
if ((!os.aftr_isp && !os.peer_isp && !os.user_isp) &&
@@ -1392,10 +1400,10 @@ static void after_state_ch(struct drbd_device *device, union drbd_state os,
/* Make sure the peer gets informed about eventual state
changes (ISP bits) while we were in WFReportParams. */
if (os.conn == C_WF_REPORT_PARAMS && ns.conn >= C_CONNECTED)
- drbd_send_state(first_peer_device(device), ns);
+ drbd_send_state(peer_device, ns);
if (os.conn != C_AHEAD && ns.conn == C_AHEAD)
- drbd_send_state(first_peer_device(device), ns);
+ drbd_send_state(peer_device, ns);
/* We are in the progress to start a full sync... */
if ((os.conn != C_STARTING_SYNC_T && ns.conn == C_STARTING_SYNC_T) ||
@@ -1449,7 +1457,7 @@ static void after_state_ch(struct drbd_device *device, union drbd_state os,
drbd_disk_str(device->state.disk));
if (ns.conn >= C_CONNECTED)
- drbd_send_state(first_peer_device(device), ns);
+ drbd_send_state(peer_device, ns);
drbd_rs_cancel_all(device);
@@ -1473,7 +1481,7 @@ static void after_state_ch(struct drbd_device *device, union drbd_state os,
drbd_disk_str(device->state.disk));
if (ns.conn >= C_CONNECTED)
- drbd_send_state(first_peer_device(device), ns);
+ drbd_send_state(peer_device, ns);
/* corresponding get_ldev in __drbd_set_state
* this may finally trigger drbd_ldev_destroy. */
put_ldev(device);
@@ -1481,7 +1489,7 @@ static void after_state_ch(struct drbd_device *device, union drbd_state os,
/* Notify peer that I had a local IO error, and did not detached.. */
if (os.disk == D_UP_TO_DATE && ns.disk == D_INCONSISTENT && ns.conn >= C_CONNECTED)
- drbd_send_state(first_peer_device(device), ns);
+ drbd_send_state(peer_device, ns);
/* Disks got bigger while they were detached */
if (ns.disk > D_NEGOTIATING && ns.pdsk > D_NEGOTIATING &&
@@ -1499,14 +1507,14 @@ static void after_state_ch(struct drbd_device *device, union drbd_state os,
/* sync target done with resync. Explicitly notify peer, even though
* it should (at least for non-empty resyncs) already know itself. */
if (os.disk < D_UP_TO_DATE && os.conn >= C_SYNC_SOURCE && ns.conn == C_CONNECTED)
- drbd_send_state(first_peer_device(device), ns);
+ drbd_send_state(peer_device, ns);
/* Verify finished, or reached stop sector. Peer did not know about
* the stop sector, and we may even have changed the stop sector during
* verify to interrupt/stop early. Send the new state. */
if (os.conn == C_VERIFY_S && ns.conn == C_CONNECTED
&& verify_can_do_stop_sector(device))
- drbd_send_state(first_peer_device(device), ns);
+ drbd_send_state(peer_device, ns);
/* This triggers bitmap writeout of potentially still unwritten pages
* if the resync finished cleanly, or aborted because of peer disk
@@ -1563,7 +1571,7 @@ static int w_after_conn_state_ch(struct drbd_work *w, int unused)
old_conf = connection->net_conf;
connection->my_addr_len = 0;
connection->peer_addr_len = 0;
- rcu_assign_pointer(connection->net_conf, NULL);
+ RCU_INIT_POINTER(connection->net_conf, NULL);
conn_free_crypto(connection);
mutex_unlock(&connection->resource->conf_update);
@@ -1599,7 +1607,7 @@ static int w_after_conn_state_ch(struct drbd_work *w, int unused)
return 0;
}
-void conn_old_common_state(struct drbd_connection *connection, union drbd_state *pcs, enum chg_state_flags *pf)
+static void conn_old_common_state(struct drbd_connection *connection, union drbd_state *pcs, enum chg_state_flags *pf)
{
enum chg_state_flags flags = ~0;
struct drbd_peer_device *peer_device;
@@ -1688,7 +1696,7 @@ conn_is_valid_transition(struct drbd_connection *connection, union drbd_state ma
return rv;
}
-void
+static void
conn_set_state(struct drbd_connection *connection, union drbd_state mask, union drbd_state val,
union drbd_state *pns_min, union drbd_state *pns_max, enum chg_state_flags flags)
{
diff --git a/drivers/block/drbd/drbd_worker.c b/drivers/block/drbd/drbd_worker.c
index d8f57b6305cd..50776b362828 100644
--- a/drivers/block/drbd/drbd_worker.c
+++ b/drivers/block/drbd/drbd_worker.c
@@ -67,13 +67,10 @@ rwlock_t global_state_lock;
*/
void drbd_md_io_complete(struct bio *bio, int error)
{
- struct drbd_md_io *md_io;
struct drbd_device *device;
- md_io = (struct drbd_md_io *)bio->bi_private;
- device = container_of(md_io, struct drbd_device, md_io);
-
- md_io->error = error;
+ device = bio->bi_private;
+ device->md_io.error = error;
/* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
* to timeout on the lower level device, and eventually detach from it.
@@ -87,7 +84,7 @@ void drbd_md_io_complete(struct bio *bio, int error)
* ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
*/
drbd_md_put_buffer(device);
- md_io->done = 1;
+ device->md_io.done = 1;
wake_up(&device->misc_wait);
bio_put(bio);
if (device->ldev) /* special case: drbd_md_read() during drbd_adm_attach() */
@@ -135,6 +132,7 @@ void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(l
i = peer_req->i;
do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
block_id = peer_req->block_id;
+ peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
spin_lock_irqsave(&device->resource->req_lock, flags);
device->writ_cnt += peer_req->i.size >> 9;
@@ -398,9 +396,6 @@ static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector,
if (!get_ldev(device))
return -EIO;
- if (drbd_rs_should_slow_down(device, sector))
- goto defer;
-
/* GFP_TRY, because if there is no memory available right now, this may
* be rescheduled for later. It is "only" background resync, after all. */
peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
@@ -410,7 +405,7 @@ static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector,
peer_req->w.cb = w_e_send_csum;
spin_lock_irq(&device->resource->req_lock);
- list_add(&peer_req->w.list, &device->read_ee);
+ list_add_tail(&peer_req->w.list, &device->read_ee);
spin_unlock_irq(&device->resource->req_lock);
atomic_add(size >> 9, &device->rs_sect_ev);
@@ -452,9 +447,9 @@ void resync_timer_fn(unsigned long data)
{
struct drbd_device *device = (struct drbd_device *) data;
- if (list_empty(&device->resync_work.list))
- drbd_queue_work(&first_peer_device(device)->connection->sender_work,
- &device->resync_work);
+ drbd_queue_work_if_unqueued(
+ &first_peer_device(device)->connection->sender_work,
+ &device->resync_work);
}
static void fifo_set(struct fifo_buffer *fb, int value)
@@ -504,9 +499,9 @@ struct fifo_buffer *fifo_alloc(int fifo_size)
static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
{
struct disk_conf *dc;
- unsigned int want; /* The number of sectors we want in the proxy */
+ unsigned int want; /* The number of sectors we want in-flight */
int req_sect; /* Number of sectors to request in this turn */
- int correction; /* Number of sectors more we need in the proxy*/
+ int correction; /* Number of sectors more we need in-flight */
int cps; /* correction per invocation of drbd_rs_controller() */
int steps; /* Number of time steps to plan ahead */
int curr_corr;
@@ -577,20 +572,27 @@ static int drbd_rs_number_requests(struct drbd_device *device)
* potentially causing a distributed deadlock on congestion during
* online-verify or (checksum-based) resync, if max-buffers,
* socket buffer sizes and resync rate settings are mis-configured. */
- if (mxb - device->rs_in_flight < number)
- number = mxb - device->rs_in_flight;
+
+ /* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
+ * mxb (as used here, and in drbd_alloc_pages on the peer) is
+ * "number of pages" (typically also 4k),
+ * but "rs_in_flight" is in "sectors" (512 Byte). */
+ if (mxb - device->rs_in_flight/8 < number)
+ number = mxb - device->rs_in_flight/8;
return number;
}
-static int make_resync_request(struct drbd_device *device, int cancel)
+static int make_resync_request(struct drbd_device *const device, int cancel)
{
+ struct drbd_peer_device *const peer_device = first_peer_device(device);
+ struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
unsigned long bit;
sector_t sector;
const sector_t capacity = drbd_get_capacity(device->this_bdev);
int max_bio_size;
int number, rollback_i, size;
- int align, queued, sndbuf;
+ int align, requeue = 0;
int i = 0;
if (unlikely(cancel))
@@ -617,17 +619,22 @@ static int make_resync_request(struct drbd_device *device, int cancel)
goto requeue;
for (i = 0; i < number; i++) {
- /* Stop generating RS requests, when half of the send buffer is filled */
- mutex_lock(&first_peer_device(device)->connection->data.mutex);
- if (first_peer_device(device)->connection->data.socket) {
- queued = first_peer_device(device)->connection->data.socket->sk->sk_wmem_queued;
- sndbuf = first_peer_device(device)->connection->data.socket->sk->sk_sndbuf;
- } else {
- queued = 1;
- sndbuf = 0;
- }
- mutex_unlock(&first_peer_device(device)->connection->data.mutex);
- if (queued > sndbuf / 2)
+ /* Stop generating RS requests when half of the send buffer is filled,
+ * but notify TCP that we'd like to have more space. */
+ mutex_lock(&connection->data.mutex);
+ if (connection->data.socket) {
+ struct sock *sk = connection->data.socket->sk;
+ int queued = sk->sk_wmem_queued;
+ int sndbuf = sk->sk_sndbuf;
+ if (queued > sndbuf / 2) {
+ requeue = 1;
+ if (sk->sk_socket)
+ set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
+ }
+ } else
+ requeue = 1;
+ mutex_unlock(&connection->data.mutex);
+ if (requeue)
goto requeue;
next_sector:
@@ -642,8 +649,7 @@ next_sector:
sector = BM_BIT_TO_SECT(bit);
- if (drbd_rs_should_slow_down(device, sector) ||
- drbd_try_rs_begin_io(device, sector)) {
+ if (drbd_try_rs_begin_io(device, sector)) {
device->bm_resync_fo = bit;
goto requeue;
}
@@ -696,9 +702,9 @@ next_sector:
/* adjust very last sectors, in case we are oddly sized */
if (sector + (size>>9) > capacity)
size = (capacity-sector)<<9;
- if (first_peer_device(device)->connection->agreed_pro_version >= 89 &&
- first_peer_device(device)->connection->csums_tfm) {
- switch (read_for_csum(first_peer_device(device), sector, size)) {
+
+ if (device->use_csums) {
+ switch (read_for_csum(peer_device, sector, size)) {
case -EIO: /* Disk failure */
put_ldev(device);
return -EIO;
@@ -717,7 +723,7 @@ next_sector:
int err;
inc_rs_pending(device);
- err = drbd_send_drequest(first_peer_device(device), P_RS_DATA_REQUEST,
+ err = drbd_send_drequest(peer_device, P_RS_DATA_REQUEST,
sector, size, ID_SYNCER);
if (err) {
drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
@@ -774,8 +780,7 @@ static int make_ov_request(struct drbd_device *device, int cancel)
size = BM_BLOCK_SIZE;
- if (drbd_rs_should_slow_down(device, sector) ||
- drbd_try_rs_begin_io(device, sector)) {
+ if (drbd_try_rs_begin_io(device, sector)) {
device->ov_position = sector;
goto requeue;
}
@@ -911,7 +916,7 @@ int drbd_resync_finished(struct drbd_device *device)
if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
khelper_cmd = "after-resync-target";
- if (first_peer_device(device)->connection->csums_tfm && device->rs_total) {
+ if (device->use_csums && device->rs_total) {
const unsigned long s = device->rs_same_csum;
const unsigned long t = device->rs_total;
const int ratio =
@@ -1351,13 +1356,15 @@ int w_send_out_of_sync(struct drbd_work *w, int cancel)
{
struct drbd_request *req = container_of(w, struct drbd_request, w);
struct drbd_device *device = req->device;
- struct drbd_connection *connection = first_peer_device(device)->connection;
+ struct drbd_peer_device *const peer_device = first_peer_device(device);
+ struct drbd_connection *const connection = peer_device->connection;
int err;
if (unlikely(cancel)) {
req_mod(req, SEND_CANCELED);
return 0;
}
+ req->pre_send_jif = jiffies;
/* this time, no connection->send.current_epoch_writes++;
* If it was sent, it was the closing barrier for the last
@@ -1365,7 +1372,7 @@ int w_send_out_of_sync(struct drbd_work *w, int cancel)
* No more barriers will be sent, until we leave AHEAD mode again. */
maybe_send_barrier(connection, req->epoch);
- err = drbd_send_out_of_sync(first_peer_device(device), req);
+ err = drbd_send_out_of_sync(peer_device, req);
req_mod(req, OOS_HANDED_TO_NETWORK);
return err;
@@ -1380,19 +1387,21 @@ int w_send_dblock(struct drbd_work *w, int cancel)
{
struct drbd_request *req = container_of(w, struct drbd_request, w);
struct drbd_device *device = req->device;
- struct drbd_connection *connection = first_peer_device(device)->connection;
+ struct drbd_peer_device *const peer_device = first_peer_device(device);
+ struct drbd_connection *connection = peer_device->connection;
int err;
if (unlikely(cancel)) {
req_mod(req, SEND_CANCELED);
return 0;
}
+ req->pre_send_jif = jiffies;
re_init_if_first_write(connection, req->epoch);
maybe_send_barrier(connection, req->epoch);
connection->send.current_epoch_writes++;
- err = drbd_send_dblock(first_peer_device(device), req);
+ err = drbd_send_dblock(peer_device, req);
req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
return err;
@@ -1407,19 +1416,21 @@ int w_send_read_req(struct drbd_work *w, int cancel)
{
struct drbd_request *req = container_of(w, struct drbd_request, w);
struct drbd_device *device = req->device;
- struct drbd_connection *connection = first_peer_device(device)->connection;
+ struct drbd_peer_device *const peer_device = first_peer_device(device);
+ struct drbd_connection *connection = peer_device->connection;
int err;
if (unlikely(cancel)) {
req_mod(req, SEND_CANCELED);
return 0;
}
+ req->pre_send_jif = jiffies;
/* Even read requests may close a write epoch,
* if there was any yet. */
maybe_send_barrier(connection, req->epoch);
- err = drbd_send_drequest(first_peer_device(device), P_DATA_REQUEST, req->i.sector, req->i.size,
+ err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
(unsigned long)req);
req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
@@ -1433,7 +1444,7 @@ int w_restart_disk_io(struct drbd_work *w, int cancel)
struct drbd_device *device = req->device;
if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
- drbd_al_begin_io(device, &req->i, false);
+ drbd_al_begin_io(device, &req->i);
drbd_req_make_private_bio(req, req->master_bio);
req->private_bio->bi_bdev = device->ldev->backing_bdev;
@@ -1601,26 +1612,32 @@ void drbd_rs_controller_reset(struct drbd_device *device)
void start_resync_timer_fn(unsigned long data)
{
struct drbd_device *device = (struct drbd_device *) data;
-
- drbd_queue_work(&first_peer_device(device)->connection->sender_work,
- &device->start_resync_work);
+ drbd_device_post_work(device, RS_START);
}
-int w_start_resync(struct drbd_work *w, int cancel)
+static void do_start_resync(struct drbd_device *device)
{
- struct drbd_device *device =
- container_of(w, struct drbd_device, start_resync_work);
-
if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
- drbd_warn(device, "w_start_resync later...\n");
+ drbd_warn(device, "postponing start_resync ...\n");
device->start_resync_timer.expires = jiffies + HZ/10;
add_timer(&device->start_resync_timer);
- return 0;
+ return;
}
drbd_start_resync(device, C_SYNC_SOURCE);
clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
- return 0;
+}
+
+static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
+{
+ bool csums_after_crash_only;
+ rcu_read_lock();
+ csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
+ rcu_read_unlock();
+ return connection->agreed_pro_version >= 89 && /* supported? */
+ connection->csums_tfm && /* configured? */
+ (csums_after_crash_only == 0 /* use for each resync? */
+ || test_bit(CRASHED_PRIMARY, &device->flags)); /* or only after Primary crash? */
}
/**
@@ -1633,6 +1650,8 @@ int w_start_resync(struct drbd_work *w, int cancel)
*/
void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
{
+ struct drbd_peer_device *peer_device = first_peer_device(device);
+ struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
union drbd_state ns;
int r;
@@ -1651,7 +1670,7 @@ void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
if (r > 0) {
drbd_info(device, "before-resync-target handler returned %d, "
"dropping connection.\n", r);
- conn_request_state(first_peer_device(device)->connection, NS(conn, C_DISCONNECTING), CS_HARD);
+ conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
return;
}
} else /* C_SYNC_SOURCE */ {
@@ -1664,7 +1683,7 @@ void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
} else {
drbd_info(device, "before-resync-source handler returned %d, "
"dropping connection.\n", r);
- conn_request_state(first_peer_device(device)->connection,
+ conn_request_state(connection,
NS(conn, C_DISCONNECTING), CS_HARD);
return;
}
@@ -1672,7 +1691,7 @@ void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
}
}
- if (current == first_peer_device(device)->connection->worker.task) {
+ if (current == connection->worker.task) {
/* The worker should not sleep waiting for state_mutex,
that can take long */
if (!mutex_trylock(device->state_mutex)) {
@@ -1733,11 +1752,20 @@ void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
device->rs_mark_time[i] = now;
}
_drbd_pause_after(device);
+ /* Forget potentially stale cached per resync extent bit-counts.
+ * Open coded drbd_rs_cancel_all(device), we already have IRQs
+ * disabled, and know the disk state is ok. */
+ spin_lock(&device->al_lock);
+ lc_reset(device->resync);
+ device->resync_locked = 0;
+ device->resync_wenr = LC_FREE;
+ spin_unlock(&device->al_lock);
}
write_unlock(&global_state_lock);
spin_unlock_irq(&device->resource->req_lock);
if (r == SS_SUCCESS) {
+ wake_up(&device->al_wait); /* for lc_reset() above */
/* reset rs_last_bcast when a resync or verify is started,
* to deal with potential jiffies wrap. */
device->rs_last_bcast = jiffies - HZ;
@@ -1746,8 +1774,12 @@ void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
drbd_conn_str(ns.conn),
(unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
(unsigned long) device->rs_total);
- if (side == C_SYNC_TARGET)
+ if (side == C_SYNC_TARGET) {
device->bm_resync_fo = 0;
+ device->use_csums = use_checksum_based_resync(connection, device);
+ } else {
+ device->use_csums = 0;
+ }
/* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
* with w_send_oos, or the sync target will get confused as to
@@ -1756,12 +1788,10 @@ void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
* drbd_resync_finished from here in that case.
* We drbd_gen_and_send_sync_uuid here for protocol < 96,
* and from after_state_ch otherwise. */
- if (side == C_SYNC_SOURCE &&
- first_peer_device(device)->connection->agreed_pro_version < 96)
- drbd_gen_and_send_sync_uuid(first_peer_device(device));
+ if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
+ drbd_gen_and_send_sync_uuid(peer_device);
- if (first_peer_device(device)->connection->agreed_pro_version < 95 &&
- device->rs_total == 0) {
+ if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
/* This still has a race (about when exactly the peers
* detect connection loss) that can lead to a full sync
* on next handshake. In 8.3.9 we fixed this with explicit
@@ -1777,7 +1807,7 @@ void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
int timeo;
rcu_read_lock();
- nc = rcu_dereference(first_peer_device(device)->connection->net_conf);
+ nc = rcu_dereference(connection->net_conf);
timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
rcu_read_unlock();
schedule_timeout_interruptible(timeo);
@@ -1799,10 +1829,165 @@ void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
mutex_unlock(device->state_mutex);
}
+static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
+{
+ struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
+ device->rs_last_bcast = jiffies;
+
+ if (!get_ldev(device))
+ return;
+
+ drbd_bm_write_lazy(device, 0);
+ if (resync_done && is_sync_state(device->state.conn))
+ drbd_resync_finished(device);
+
+ drbd_bcast_event(device, &sib);
+ /* update timestamp, in case it took a while to write out stuff */
+ device->rs_last_bcast = jiffies;
+ put_ldev(device);
+}
+
+static void drbd_ldev_destroy(struct drbd_device *device)
+{
+ lc_destroy(device->resync);
+ device->resync = NULL;
+ lc_destroy(device->act_log);
+ device->act_log = NULL;
+ __no_warn(local,
+ drbd_free_ldev(device->ldev);
+ device->ldev = NULL;);
+ clear_bit(GOING_DISKLESS, &device->flags);
+ wake_up(&device->misc_wait);
+}
+
+static void go_diskless(struct drbd_device *device)
+{
+ D_ASSERT(device, device->state.disk == D_FAILED);
+ /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
+ * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
+ * the protected members anymore, though, so once put_ldev reaches zero
+ * again, it will be safe to free them. */
+
+ /* Try to write changed bitmap pages, read errors may have just
+ * set some bits outside the area covered by the activity log.
+ *
+ * If we have an IO error during the bitmap writeout,
+ * we will want a full sync next time, just in case.
+ * (Do we want a specific meta data flag for this?)
+ *
+ * If that does not make it to stable storage either,
+ * we cannot do anything about that anymore.
+ *
+ * We still need to check if both bitmap and ldev are present, we may
+ * end up here after a failed attach, before ldev was even assigned.
+ */
+ if (device->bitmap && device->ldev) {
+ /* An interrupted resync or similar is allowed to recounts bits
+ * while we detach.
+ * Any modifications would not be expected anymore, though.
+ */
+ if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
+ "detach", BM_LOCKED_TEST_ALLOWED)) {
+ if (test_bit(WAS_READ_ERROR, &device->flags)) {
+ drbd_md_set_flag(device, MDF_FULL_SYNC);
+ drbd_md_sync(device);
+ }
+ }
+ }
+
+ drbd_force_state(device, NS(disk, D_DISKLESS));
+}
+
+static int do_md_sync(struct drbd_device *device)
+{
+ drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
+ drbd_md_sync(device);
+ return 0;
+}
+
+/* only called from drbd_worker thread, no locking */
+void __update_timing_details(
+ struct drbd_thread_timing_details *tdp,
+ unsigned int *cb_nr,
+ void *cb,
+ const char *fn, const unsigned int line)
+{
+ unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
+ struct drbd_thread_timing_details *td = tdp + i;
+
+ td->start_jif = jiffies;
+ td->cb_addr = cb;
+ td->caller_fn = fn;
+ td->line = line;
+ td->cb_nr = *cb_nr;
+
+ i = (i+1) % DRBD_THREAD_DETAILS_HIST;
+ td = tdp + i;
+ memset(td, 0, sizeof(*td));
+
+ ++(*cb_nr);
+}
+
+#define WORK_PENDING(work_bit, todo) (todo & (1UL << work_bit))
+static void do_device_work(struct drbd_device *device, const unsigned long todo)
+{
+ if (WORK_PENDING(MD_SYNC, todo))
+ do_md_sync(device);
+ if (WORK_PENDING(RS_DONE, todo) ||
+ WORK_PENDING(RS_PROGRESS, todo))
+ update_on_disk_bitmap(device, WORK_PENDING(RS_DONE, todo));
+ if (WORK_PENDING(GO_DISKLESS, todo))
+ go_diskless(device);
+ if (WORK_PENDING(DESTROY_DISK, todo))
+ drbd_ldev_destroy(device);
+ if (WORK_PENDING(RS_START, todo))
+ do_start_resync(device);
+}
+
+#define DRBD_DEVICE_WORK_MASK \
+ ((1UL << GO_DISKLESS) \
+ |(1UL << DESTROY_DISK) \
+ |(1UL << MD_SYNC) \
+ |(1UL << RS_START) \
+ |(1UL << RS_PROGRESS) \
+ |(1UL << RS_DONE) \
+ )
+
+static unsigned long get_work_bits(unsigned long *flags)
+{
+ unsigned long old, new;
+ do {
+ old = *flags;
+ new = old & ~DRBD_DEVICE_WORK_MASK;
+ } while (cmpxchg(flags, old, new) != old);
+ return old & DRBD_DEVICE_WORK_MASK;
+}
+
+static void do_unqueued_work(struct drbd_connection *connection)
+{
+ struct drbd_peer_device *peer_device;
+ int vnr;
+
+ rcu_read_lock();
+ idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
+ struct drbd_device *device = peer_device->device;
+ unsigned long todo = get_work_bits(&device->flags);
+ if (!todo)
+ continue;
+
+ kref_get(&device->kref);
+ rcu_read_unlock();
+ do_device_work(device, todo);
+ kref_put(&device->kref, drbd_destroy_device);
+ rcu_read_lock();
+ }
+ rcu_read_unlock();
+}
+
static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
{
spin_lock_irq(&queue->q_lock);
- list_splice_init(&queue->q, work_list);
+ list_splice_tail_init(&queue->q, work_list);
spin_unlock_irq(&queue->q_lock);
return !list_empty(work_list);
}
@@ -1851,7 +2036,7 @@ static void wait_for_work(struct drbd_connection *connection, struct list_head *
/* dequeue single item only,
* we still use drbd_queue_work_front() in some places */
if (!list_empty(&connection->sender_work.q))
- list_move(connection->sender_work.q.next, work_list);
+ list_splice_tail_init(&connection->sender_work.q, work_list);
spin_unlock(&connection->sender_work.q_lock); /* FIXME get rid of this one? */
if (!list_empty(work_list) || signal_pending(current)) {
spin_unlock_irq(&connection->resource->req_lock);
@@ -1873,6 +2058,14 @@ static void wait_for_work(struct drbd_connection *connection, struct list_head *
if (send_barrier)
maybe_send_barrier(connection,
connection->send.current_epoch_nr + 1);
+
+ if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
+ break;
+
+ /* drbd_send() may have called flush_signals() */
+ if (get_t_state(&connection->worker) != RUNNING)
+ break;
+
schedule();
/* may be woken up for other things but new work, too,
* e.g. if the current epoch got closed.
@@ -1906,10 +2099,15 @@ int drbd_worker(struct drbd_thread *thi)
while (get_t_state(thi) == RUNNING) {
drbd_thread_current_set_cpu(thi);
- /* as long as we use drbd_queue_work_front(),
- * we may only dequeue single work items here, not batches. */
- if (list_empty(&work_list))
+ if (list_empty(&work_list)) {
+ update_worker_timing_details(connection, wait_for_work);
wait_for_work(connection, &work_list);
+ }
+
+ if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
+ update_worker_timing_details(connection, do_unqueued_work);
+ do_unqueued_work(connection);
+ }
if (signal_pending(current)) {
flush_signals(current);
@@ -1926,6 +2124,7 @@ int drbd_worker(struct drbd_thread *thi)
while (!list_empty(&work_list)) {
w = list_first_entry(&work_list, struct drbd_work, list);
list_del_init(&w->list);
+ update_worker_timing_details(connection, w->cb);
if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
continue;
if (connection->cstate >= C_WF_REPORT_PARAMS)
@@ -1934,13 +2133,18 @@ int drbd_worker(struct drbd_thread *thi)
}
do {
+ if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
+ update_worker_timing_details(connection, do_unqueued_work);
+ do_unqueued_work(connection);
+ }
while (!list_empty(&work_list)) {
w = list_first_entry(&work_list, struct drbd_work, list);
list_del_init(&w->list);
+ update_worker_timing_details(connection, w->cb);
w->cb(w, 1);
}
dequeue_work_batch(&connection->sender_work, &work_list);
- } while (!list_empty(&work_list));
+ } while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));
rcu_read_lock();
idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {