ceph: search cache postion for dcache readdir
authorYan, Zheng <zyan@redhat.com>
Thu, 28 Apr 2016 09:43:35 +0000 (17:43 +0800)
committerIlya Dryomov <idryomov@gmail.com>
Wed, 25 May 2016 23:15:33 +0000 (01:15 +0200)
use binary search to find cache index that corresponds to readdir
postion.

Signed-off-by: Yan, Zheng <zyan@redhat.com>
fs/ceph/dir.c

index 4fb2bbc2a2722af6e9ccf84cf9fc80ce7f3edbeb..cdea450c59026bb9fb209f3218ce8503148e8ad1 100644 (file)
@@ -110,6 +110,50 @@ static int note_last_dentry(struct ceph_file_info *fi, const char *name,
        return 0;
 }
 
+
+static struct dentry *
+__dcache_find_get_entry(struct dentry *parent, u64 idx,
+                       struct ceph_readdir_cache_control *cache_ctl)
+{
+       struct inode *dir = d_inode(parent);
+       struct dentry *dentry;
+       unsigned idx_mask = (PAGE_SIZE / sizeof(struct dentry *)) - 1;
+       loff_t ptr_pos = idx * sizeof(struct dentry *);
+       pgoff_t ptr_pgoff = ptr_pos >> PAGE_SHIFT;
+
+       if (ptr_pos >= i_size_read(dir))
+               return NULL;
+
+       if (!cache_ctl->page || ptr_pgoff != page_index(cache_ctl->page)) {
+               ceph_readdir_cache_release(cache_ctl);
+               cache_ctl->page = find_lock_page(&dir->i_data, ptr_pgoff);
+               if (!cache_ctl->page) {
+                       dout(" page %lu not found\n", ptr_pgoff);
+                       return ERR_PTR(-EAGAIN);
+               }
+               /* reading/filling the cache are serialized by
+                  i_mutex, no need to use page lock */
+               unlock_page(cache_ctl->page);
+               cache_ctl->dentries = kmap(cache_ctl->page);
+       }
+
+       cache_ctl->index = idx & idx_mask;
+
+       rcu_read_lock();
+       spin_lock(&parent->d_lock);
+       /* check i_size again here, because empty directory can be
+        * marked as complete while not holding the i_mutex. */
+       if (ceph_dir_is_complete_ordered(dir) && ptr_pos < i_size_read(dir))
+               dentry = cache_ctl->dentries[cache_ctl->index];
+       else
+               dentry = NULL;
+       spin_unlock(&parent->d_lock);
+       if (dentry && !lockref_get_not_dead(&dentry->d_lockref))
+               dentry = NULL;
+       rcu_read_unlock();
+       return dentry ? : ERR_PTR(-EAGAIN);
+}
+
 /*
  * When possible, we try to satisfy a readdir by peeking at the
  * dcache.  We make this work by carefully ordering dentries on
@@ -129,62 +173,57 @@ static int __dcache_readdir(struct file *file,  struct dir_context *ctx,
        struct inode *dir = d_inode(parent);
        struct dentry *dentry, *last = NULL;
        struct ceph_dentry_info *di;
-       unsigned nsize = PAGE_SIZE / sizeof(struct dentry *);
-       int err = 0;
-       loff_t ptr_pos = 0;
        struct ceph_readdir_cache_control cache_ctl = {};
+       u64 idx = 0;
+       int err = 0;
 
        dout("__dcache_readdir %p v%u at %llu\n", dir, shared_gen, ctx->pos);
 
-       /* we can calculate cache index for the first dirfrag */
-       if (ceph_frag_is_leftmost(fpos_frag(ctx->pos))) {
-               cache_ctl.index = fpos_off(ctx->pos) - 2;
-               BUG_ON(cache_ctl.index < 0);
-               ptr_pos = cache_ctl.index * sizeof(struct dentry *);
+       /* search start position */
+       if (ctx->pos > 2) {
+               u64 count = div_u64(i_size_read(dir), sizeof(struct dentry *));
+               while (count > 0) {
+                       u64 step = count >> 1;
+                       dentry = __dcache_find_get_entry(parent, idx + step,
+                                                        &cache_ctl);
+                       if (!dentry) {
+                               /* use linar search */
+                               idx = 0;
+                               break;
+                       }
+                       if (IS_ERR(dentry)) {
+                               err = PTR_ERR(dentry);
+                               goto out;
+                       }
+                       di = ceph_dentry(dentry);
+                       spin_lock(&dentry->d_lock);
+                       if (fpos_cmp(di->offset, ctx->pos) < 0) {
+                               idx += step + 1;
+                               count -= step + 1;
+                       } else {
+                               count = step;
+                       }
+                       spin_unlock(&dentry->d_lock);
+                       dput(dentry);
+               }
+
+               dout("__dcache_readdir %p cache idx %llu\n", dir, idx);
        }
 
-       while (true) {
-               pgoff_t pgoff;
-               bool emit_dentry;
 
-               if (ptr_pos >= i_size_read(dir)) {
+       for (;;) {
+               bool emit_dentry = false;
+               dentry = __dcache_find_get_entry(parent, idx++, &cache_ctl);
+               if (!dentry) {
                        fi->flags |= CEPH_F_ATEND;
                        err = 0;
                        break;
                }
-
-               err = -EAGAIN;
-               pgoff = ptr_pos >> PAGE_SHIFT;
-               if (!cache_ctl.page || pgoff != page_index(cache_ctl.page)) {
-                       ceph_readdir_cache_release(&cache_ctl);
-                       cache_ctl.page = find_lock_page(&dir->i_data, pgoff);
-                       if (!cache_ctl.page) {
-                               dout(" page %lu not found\n", pgoff);
-                               break;
-                       }
-                       /* reading/filling the cache are serialized by
-                        * i_mutex, no need to use page lock */
-                       unlock_page(cache_ctl.page);
-                       cache_ctl.dentries = kmap(cache_ctl.page);
+               if (IS_ERR(dentry)) {
+                       err = PTR_ERR(dentry);
+                       goto out;
                }
 
-               rcu_read_lock();
-               spin_lock(&parent->d_lock);
-               /* check i_size again here, because empty directory can be
-                * marked as complete while not holding the i_mutex. */
-               if (ceph_dir_is_complete_ordered(dir) &&
-                   ptr_pos < i_size_read(dir))
-                       dentry = cache_ctl.dentries[cache_ctl.index % nsize];
-               else
-                       dentry = NULL;
-               spin_unlock(&parent->d_lock);
-               if (dentry && !lockref_get_not_dead(&dentry->d_lockref))
-                       dentry = NULL;
-               rcu_read_unlock();
-               if (!dentry)
-                       break;
-
-               emit_dentry = false;
                di = ceph_dentry(dentry);
                spin_lock(&dentry->d_lock);
                if (di->lease_shared_gen == shared_gen &&
@@ -217,10 +256,8 @@ static int __dcache_readdir(struct file *file,  struct dir_context *ctx,
                } else {
                        dput(dentry);
                }
-
-               cache_ctl.index++;
-               ptr_pos += sizeof(struct dentry *);
        }
+out:
        ceph_readdir_cache_release(&cache_ctl);
        if (last) {
                int ret;