From: Lars Ellenberg <lars.ellenberg@linbit.com>
Date: Tue, 14 Sep 2010 18:26:27 +0000 (+0200)
Subject: drbd: fix for possible deadlock on IO error during resync
X-Git-Url: http://git.cdn.openwrt.org/?a=commitdiff_plain;h=e9e6f3ec535d7b7c9e2ca64ad691e743e7d3c2f0;p=openwrt%2Fstaging%2Fblogic.git

drbd: fix for possible deadlock on IO error during resync

Scenario:

Something (say, flush-147:0) is in drbd_al_begin_io,
holding a local_cnt, waiting for the resync to make progress.

Disk fails, worker in after_state_ch does drbd_rs_cancel_all,
then waits for local_cnt to drop to zero.

flush-147:0 is woken by drbd_rs_cancel_all, needs to write an AL
transaction, and queues that on the worker.

Deadlock.

Fix: do not wait in the worker, have put_ldev() trigger the
state change D_FAILED -> D_DISKLESS when necessary.
put_ldev() cannot do the state change directly, as it may or may not
already hold various spinlocks. We queue a short work instead.

Signed-off-by: Philipp Reisner <philipp.reisner@linbit.com>
Signed-off-by: Lars Ellenberg <lars.ellenberg@linbit.com>
---

diff --git a/drivers/block/drbd/drbd_int.h b/drivers/block/drbd/drbd_int.h
index 8ab6fed39539..c07c370c4c82 100644
--- a/drivers/block/drbd/drbd_int.h
+++ b/drivers/block/drbd/drbd_int.h
@@ -852,6 +852,7 @@ enum {
 	BITMAP_IO,		/* suspend application io;
 				   once no more io in flight, start bitmap io */
 	BITMAP_IO_QUEUED,       /* Started bitmap IO */
+	GO_DISKLESS,		/* Disk failed, local_cnt reached zero, we are going diskless */
 	RESYNC_AFTER_NEG,       /* Resync after online grow after the attach&negotiate finished. */
 	NET_CONGESTED,		/* The data socket is congested */
 
@@ -976,6 +977,7 @@ struct drbd_conf {
 	unsigned int ko_count;
 	struct drbd_work  resync_work,
 			  unplug_work,
+			  go_diskless,
 			  md_sync_work;
 	struct timer_list resync_timer;
 	struct timer_list md_sync_timer;
@@ -1278,6 +1280,7 @@ extern void drbd_queue_bitmap_io(struct drbd_conf *mdev,
 extern int drbd_bmio_set_n_write(struct drbd_conf *mdev);
 extern int drbd_bmio_clear_n_write(struct drbd_conf *mdev);
 extern int drbd_bitmap_io(struct drbd_conf *mdev, int (*io_fn)(struct drbd_conf *), char *why);
+extern void drbd_go_diskless(struct drbd_conf *mdev);
 
 
 /* Meta data layout
@@ -2123,8 +2126,11 @@ static inline void put_ldev(struct drbd_conf *mdev)
 	int i = atomic_dec_return(&mdev->local_cnt);
 	__release(local);
 	D_ASSERT(i >= 0);
-	if (i == 0)
+	if (i == 0) {
+		if (mdev->state.disk == D_FAILED)
+			drbd_go_diskless(mdev);
 		wake_up(&mdev->misc_wait);
+	}
 }
 
 #ifndef __CHECKER__
diff --git a/drivers/block/drbd/drbd_main.c b/drivers/block/drbd/drbd_main.c
index 63f45d730f3f..f89b97466d07 100644
--- a/drivers/block/drbd/drbd_main.c
+++ b/drivers/block/drbd/drbd_main.c
@@ -77,6 +77,7 @@ static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
 static int w_md_sync(struct drbd_conf *mdev, struct drbd_work *w, int unused);
 static void md_sync_timer_fn(unsigned long data);
 static int w_bitmap_io(struct drbd_conf *mdev, struct drbd_work *w, int unused);
+static int w_go_diskless(struct drbd_conf *mdev, struct drbd_work *w, int unused);
 
 MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
 	      "Lars Ellenberg <lars@linbit.com>");
@@ -1363,42 +1364,46 @@ static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
 	    os.disk > D_INCONSISTENT && ns.disk == D_INCONSISTENT)
 		drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, NULL, "set_n_write from invalidate");
 
+	/* first half of local IO error */
 	if (os.disk > D_FAILED && ns.disk == D_FAILED) {
-		enum drbd_io_error_p eh;
+		enum drbd_io_error_p eh = EP_PASS_ON;
+
+		if (drbd_send_state(mdev))
+			dev_warn(DEV, "Notified peer that my disk is broken.\n");
+		else
+			dev_err(DEV, "Sending state for drbd_io_error() failed\n");
+
+		drbd_rs_cancel_all(mdev);
 
-		eh = EP_PASS_ON;
 		if (get_ldev_if_state(mdev, D_FAILED)) {
 			eh = mdev->ldev->dc.on_io_error;
 			put_ldev(mdev);
 		}
+		if (eh == EP_CALL_HELPER)
+			drbd_khelper(mdev, "local-io-error");
+	}
 
-		drbd_rs_cancel_all(mdev);
-		/* since get_ldev() only works as long as disk>=D_INCONSISTENT,
-		   and it is D_DISKLESS here, local_cnt can only go down, it can
-		   not increase... It will reach zero */
-		wait_event(mdev->misc_wait, !atomic_read(&mdev->local_cnt));
+
+	/* second half of local IO error handling,
+	 * after local_cnt references have reached zero: */
+	if (os.disk == D_FAILED && ns.disk == D_DISKLESS) {
 		mdev->rs_total = 0;
 		mdev->rs_failed = 0;
 		atomic_set(&mdev->rs_pending_cnt, 0);
-
-		spin_lock_irq(&mdev->req_lock);
-		_drbd_set_state(_NS(mdev, disk, D_DISKLESS), CS_HARD, NULL);
-		spin_unlock_irq(&mdev->req_lock);
-
-		if (eh == EP_CALL_HELPER)
-			drbd_khelper(mdev, "local-io-error");
 	}
 
 	if (os.disk > D_DISKLESS && ns.disk == D_DISKLESS) {
+		int c = atomic_read(&mdev->local_cnt);
 
-		if (os.disk == D_FAILED) /* && ns.disk == D_DISKLESS*/ {
-			if (drbd_send_state(mdev))
-				dev_warn(DEV, "Notified peer that my disk is broken.\n");
-			else
-				dev_err(DEV, "Sending state in drbd_io_error() failed\n");
-		}
+		if (drbd_send_state(mdev))
+			dev_warn(DEV, "Notified peer that I detached my disk.\n");
+		else
+			dev_err(DEV, "Sending state for detach failed\n");
 
-		wait_event(mdev->misc_wait, !atomic_read(&mdev->local_cnt));
+		if (c != 0) {
+			dev_err(DEV, "Logic bug, local_cnt=%d, but should be 0\n", c);
+			wait_event(mdev->misc_wait, !atomic_read(&mdev->local_cnt));
+		}
 		lc_destroy(mdev->resync);
 		mdev->resync = NULL;
 		lc_destroy(mdev->act_log);
@@ -2803,11 +2808,13 @@ void drbd_init_set_defaults(struct drbd_conf *mdev)
 	INIT_LIST_HEAD(&mdev->meta.work.q);
 	INIT_LIST_HEAD(&mdev->resync_work.list);
 	INIT_LIST_HEAD(&mdev->unplug_work.list);
+	INIT_LIST_HEAD(&mdev->go_diskless.list);
 	INIT_LIST_HEAD(&mdev->md_sync_work.list);
 	INIT_LIST_HEAD(&mdev->bm_io_work.w.list);
 
 	mdev->resync_work.cb  = w_resync_inactive;
 	mdev->unplug_work.cb  = w_send_write_hint;
+	mdev->go_diskless.cb  = w_go_diskless;
 	mdev->md_sync_work.cb = w_md_sync;
 	mdev->bm_io_work.w.cb = w_bitmap_io;
 	init_timer(&mdev->resync_timer);
@@ -2885,6 +2892,7 @@ void drbd_mdev_cleanup(struct drbd_conf *mdev)
 	D_ASSERT(list_empty(&mdev->meta.work.q));
 	D_ASSERT(list_empty(&mdev->resync_work.list));
 	D_ASSERT(list_empty(&mdev->unplug_work.list));
+	D_ASSERT(list_empty(&mdev->go_diskless.list));
 
 }
 
@@ -3712,6 +3720,24 @@ static int w_bitmap_io(struct drbd_conf *mdev, struct drbd_work *w, int unused)
 	return 1;
 }
 
+static int w_go_diskless(struct drbd_conf *mdev, struct drbd_work *w, int unused)
+{
+	D_ASSERT(mdev->state.disk == D_FAILED);
+	D_ASSERT(atomic_read(&mdev->local_cnt) == 0);
+
+	drbd_force_state(mdev, NS(disk, D_DISKLESS));
+
+	clear_bit(GO_DISKLESS, &mdev->flags);
+	return 1;
+}
+
+void drbd_go_diskless(struct drbd_conf *mdev)
+{
+	D_ASSERT(mdev->state.disk == D_FAILED);
+	if (!test_and_set_bit(GO_DISKLESS, &mdev->flags))
+		drbd_queue_work_front(&mdev->data.work, &mdev->go_diskless);
+}
+
 /**
  * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
  * @mdev:	DRBD device.