iput_final() may wait for reahahead pages. The wait can cause deadlock.
For example:
Workqueue: ceph-msgr ceph_con_workfn [libceph]
Call Trace:
schedule+0x36/0x80
io_schedule+0x16/0x40
__lock_page+0x101/0x140
truncate_inode_pages_range+0x556/0x9f0
truncate_inode_pages_final+0x4d/0x60
evict+0x182/0x1a0
iput+0x1d2/0x220
iterate_session_caps+0x82/0x230 [ceph]
dispatch+0x678/0xa80 [ceph]
ceph_con_workfn+0x95b/0x1560 [libceph]
process_one_work+0x14d/0x410
worker_thread+0x4b/0x460
kthread+0x105/0x140
ret_from_fork+0x22/0x40
Workqueue: ceph-msgr ceph_con_workfn [libceph]
Call Trace:
__schedule+0x3d6/0x8b0
schedule+0x36/0x80
schedule_preempt_disabled+0xe/0x10
mutex_lock+0x2f/0x40
ceph_check_caps+0x505/0xa80 [ceph]
ceph_put_wrbuffer_cap_refs+0x1e5/0x2c0 [ceph]
writepages_finish+0x2d3/0x410 [ceph]
__complete_request+0x26/0x60 [libceph]
handle_reply+0x6c8/0xa10 [libceph]
dispatch+0x29a/0xbb0 [libceph]
ceph_con_workfn+0x95b/0x1560 [libceph]
process_one_work+0x14d/0x410
worker_thread+0x4b/0x460
kthread+0x105/0x140
ret_from_fork+0x22/0x40
In above example, truncate_inode_pages_range() waits for readahead pages
while holding s_mutex. ceph_check_caps() waits for s_mutex and blocks
OSD dispatch thread. Later OSD replies (for readahead) can't be handled.
ceph_check_caps() also may lock snap_rwsem for read. So similar deadlock
can happen if iput_final() is called while holding snap_rwsem.
In general, it's not good to call iput_final() inside MDS/OSD dispatch
threads or while holding any mutex.
The fix is introducing ceph_async_iput(), which calls iput_final() in
workqueue.
Signed-off-by: "Yan, Zheng" <zyan@redhat.com>
Reviewed-by: Jeff Layton <jlayton@redhat.com>
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
}
if (complete_capsnap)
wake_up_all(&ci->i_cap_wq);
- while (put-- > 0)
- iput(inode);
+ while (put-- > 0) {
+ /* avoid calling iput_final() in osd dispatch threads */
+ ceph_async_iput(inode);
+ }
}
/*
done:
mutex_unlock(&session->s_mutex);
done_unlocked:
- iput(inode);
ceph_put_string(extra_info.pool_ns);
+ /* avoid calling iput_final() in mds dispatch threads */
+ ceph_async_iput(inode);
return;
flush_cap_releases:
if (inode) {
dout("check_delayed_caps on %p\n", inode);
ceph_check_caps(ci, flags, NULL);
- iput(inode);
+ /* avoid calling iput_final() in tick thread */
+ ceph_async_iput(inode);
}
}
spin_unlock(&mdsc->cap_delay_lock);
pr_err("fill_inode badness on %p got %d\n", in, rc);
err = rc;
}
- iput(in);
+ /* avoid calling iput_final() in mds dispatch threads */
+ ceph_async_iput(in);
}
return err;
&req->r_caps_reservation);
if (ret < 0) {
pr_err("fill_inode badness on %p\n", in);
- if (d_really_is_negative(dn))
- iput(in);
+ if (d_really_is_negative(dn)) {
+ /* avoid calling iput_final() in mds
+ * dispatch threads */
+ ceph_async_iput(in);
+ }
d_drop(dn);
err = ret;
goto next_item;
if (ceph_security_xattr_deadlock(in)) {
dout(" skip splicing dn %p to inode %p"
" (security xattr deadlock)\n", dn, in);
- iput(in);
+ ceph_async_iput(in);
skipped++;
goto next_item;
}
return ret;
}
+/*
+ * Put reference to inode, but avoid calling iput_final() in current thread.
+ * iput_final() may wait for reahahead pages. The wait can cause deadlock in
+ * some contexts.
+ */
+void ceph_async_iput(struct inode *inode)
+{
+ if (!inode)
+ return;
+ for (;;) {
+ if (atomic_add_unless(&inode->i_count, -1, 1))
+ break;
+ if (queue_work(ceph_inode_to_client(inode)->inode_wq,
+ &ceph_inode(inode)->i_work))
+ break;
+ /* queue work failed, i_count must be at least 2 */
+ }
+}
+
/*
* Write back inode data in a worker thread. (This can't be done
* in the message handler context.)
ceph_msg_put(req->r_reply);
if (req->r_inode) {
ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
- iput(req->r_inode);
+ /* avoid calling iput_final() in mds dispatch threads */
+ ceph_async_iput(req->r_inode);
}
if (req->r_parent)
ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
- iput(req->r_target_inode);
+ ceph_async_iput(req->r_target_inode);
if (req->r_dentry)
dput(req->r_dentry);
if (req->r_old_dentry)
*/
ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
CEPH_CAP_PIN);
- iput(req->r_old_dentry_dir);
+ ceph_async_iput(req->r_old_dentry_dir);
}
kfree(req->r_path1);
kfree(req->r_path2);
}
if (req->r_unsafe_dir) {
- iput(req->r_unsafe_dir);
+ /* avoid calling iput_final() in mds dispatch threads */
+ ceph_async_iput(req->r_unsafe_dir);
req->r_unsafe_dir = NULL;
}
cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
if (!cap) {
spin_unlock(&ci->i_ceph_lock);
- iput(inode);
+ ceph_async_iput(inode);
goto random;
}
mds = cap->session->s_mds;
cap == ci->i_auth_cap ? "auth " : "", cap);
spin_unlock(&ci->i_ceph_lock);
out:
- iput(inode);
+ /* avoid calling iput_final() while holding mdsc->mutex or
+ * in mds dispatch threads */
+ ceph_async_iput(inode);
return mds;
random:
spin_unlock(&session->s_cap_lock);
if (last_inode) {
- iput(last_inode);
+ /* avoid calling iput_final() while holding
+ * s_mutex or in mds dispatch threads */
+ ceph_async_iput(last_inode);
last_inode = NULL;
}
if (old_cap) {
session->s_cap_iterator = NULL;
spin_unlock(&session->s_cap_lock);
- iput(last_inode);
+ ceph_async_iput(last_inode);
if (old_cap)
ceph_put_cap(session->s_mdsc, old_cap);
spin_unlock(&session->s_cap_lock);
inode = ceph_find_inode(sb, vino);
- iput(inode);
+ /* avoid calling iput_final() while holding s_mutex */
+ ceph_async_iput(inode);
spin_lock(&session->s_cap_lock);
}
ceph_con_send(&session->s_con, msg);
out:
- iput(inode);
mutex_unlock(&session->s_mutex);
+ /* avoid calling iput_final() in mds dispatch threads */
+ ceph_async_iput(inode);
return;
bad:
le64_to_cpu(h->max_files));
spin_unlock(&ci->i_ceph_lock);
- iput(inode);
+ /* avoid calling iput_final() in dispatch thread */
+ ceph_async_iput(inode);
}
static struct ceph_quotarealm_inode *
ci = ceph_inode(in);
has_quota = __ceph_has_any_quota(ci);
- iput(in);
+ /* avoid calling iput_final() while holding mdsc->snap_rwsem */
+ ceph_async_iput(in);
next = realm->parent;
if (has_quota || !next)
pr_warn("Invalid quota check op (%d)\n", op);
exceeded = true; /* Just break the loop */
}
- iput(in);
+ /* avoid calling iput_final() while holding mdsc->snap_rwsem */
+ ceph_async_iput(in);
next = realm->parent;
if (exceeded || !next)
if (!inode)
continue;
spin_unlock(&realm->inodes_with_caps_lock);
- iput(lastinode);
+ /* avoid calling iput_final() while holding
+ * mdsc->snap_rwsem or in mds dispatch threads */
+ ceph_async_iput(lastinode);
lastinode = inode;
ceph_queue_cap_snap(ci);
spin_lock(&realm->inodes_with_caps_lock);
}
spin_unlock(&realm->inodes_with_caps_lock);
- iput(lastinode);
+ ceph_async_iput(lastinode);
dout("queue_realm_cap_snaps %p %llx done\n", realm, realm->ino);
}
ihold(inode);
spin_unlock(&mdsc->snap_flush_lock);
ceph_flush_snaps(ci, &session);
- iput(inode);
+ /* avoid calling iput_final() while holding
+ * session->s_mutex or in mds dispatch threads */
+ ceph_async_iput(inode);
spin_lock(&mdsc->snap_flush_lock);
}
spin_unlock(&mdsc->snap_flush_lock);
ceph_get_snap_realm(mdsc, realm);
ceph_put_snap_realm(mdsc, oldrealm);
- iput(inode);
+ /* avoid calling iput_final() while holding
+ * mdsc->snap_rwsem or mds in dispatch threads */
+ ceph_async_iput(inode);
continue;
skip_inode:
spin_unlock(&ci->i_ceph_lock);
- iput(inode);
+ ceph_async_iput(inode);
}
/* we may have taken some of the old realm's children. */
extern bool ceph_inode_set_size(struct inode *inode, loff_t size);
extern void __ceph_do_pending_vmtruncate(struct inode *inode);
extern void ceph_queue_vmtruncate(struct inode *inode);
-
extern void ceph_queue_invalidate(struct inode *inode);
extern void ceph_queue_writeback(struct inode *inode);
+extern void ceph_async_iput(struct inode *inode);
extern int __ceph_do_getattr(struct inode *inode, struct page *locked_page,
int mask, bool force);