libceph: a per-osdc crush scratch buffer
author    Ilya Dryomov <ilya.dryomov@inktank.com>
Fri, 31 Jan 2014 15:54:26 +0000 (17:54 +0200)
committer Yan, Zheng <zheng.z.yan@intel.com>
Thu, 3 Apr 2014 02:33:50 +0000 (10:33 +0800)
With the addition of erasure coding support in the future, the scratch
variable-length array in crush_do_rule_ary() is going to grow to at
least 200 bytes on average, on top of another 128 bytes consumed by
the rawosd/osd arrays in the call chain.  Replace it with a buffer
inside struct osdmap and a mutex.  This shouldn't result in any
contention, because all osd requests are already serialized by
request_mutex at that point; the only unlocked caller is
ceph_ioctl_get_dataloc().
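
In userspace terms, the pattern looks roughly like this (a minimal,
self-contained sketch, not the kernel code: toy_map, toy_rule,
toy_do_rule and MAX_RESULT are made-up stand-ins for struct ceph_osdmap,
crush_do_rule(), do_crush() and CEPH_PG_MAX_SIZE, with a pthread mutex
in place of crush_scratch_mutex):

    #include <assert.h>
    #include <pthread.h>
    #include <stdio.h>

    #define MAX_RESULT 16   /* stand-in for CEPH_PG_MAX_SIZE */

    /*
     * One preallocated scratch area per map, sized for the worst
     * case.  The CRUSH mapper uses the scratch area as three
     * result_max-sized working vectors, hence the factor of 3.
     */
    struct toy_map {
            pthread_mutex_t scratch_mutex;
            int scratch[MAX_RESULT * 3];
    };

    /* Stand-in for crush_do_rule(): works on caller-owned scratch. */
    static int toy_rule(int x, int *result, int result_max,
                        int *scratch)
    {
            int i;

            /* A real rule would use all three regions; this toy
             * touches just the first working vector. */
            for (i = 0; i < result_max; i++) {
                    scratch[i] = x + i;
                    result[i] = scratch[i];
            }
            return result_max;
    }

    /*
     * Before: int scratch[result_max * 3];  -- a VLA that keeps
     * growing.  After: serialize on the mutex and reuse the
     * per-map buffer.
     */
    static int toy_do_rule(struct toy_map *map, int x,
                           int *result, int result_max)
    {
            int r;

            assert(result_max <= MAX_RESULT); /* BUG_ON() in kernel */

            pthread_mutex_lock(&map->scratch_mutex);
            r = toy_rule(x, result, result_max, map->scratch);
            pthread_mutex_unlock(&map->scratch_mutex);

            return r;
    }

    int main(void)
    {
            static struct toy_map map = {
                    .scratch_mutex = PTHREAD_MUTEX_INITIALIZER,
            };
            int osds[4];
            int i, n;

            n = toy_do_rule(&map, 42, osds, 4);
            for (i = 0; i < n; i++)
                    printf("osd %d\n", osds[i]);
            return 0;
    }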

Signed-off-by: Ilya Dryomov <ilya.dryomov@inktank.com>
Reviewed-by: Sage Weil <sage@inktank.com>
include/linux/ceph/osdmap.h
net/ceph/osdmap.c

diff --git a/include/linux/ceph/osdmap.h b/include/linux/ceph/osdmap.h
index 49ff69f0746bd6f28ee517ac6146716ff0e27fa5..8c8b3cefc28b608c02898e1a71b5decd4002d17d 100644
--- a/include/linux/ceph/osdmap.h
+++ b/include/linux/ceph/osdmap.h
@@ -84,6 +84,9 @@ struct ceph_osdmap {
        /* the CRUSH map specifies the mapping of placement groups to
         * the list of osds that store+replicate them. */
        struct crush_map *crush;
+
+       struct mutex crush_scratch_mutex;
+       int crush_scratch_ary[CEPH_PG_MAX_SIZE * 3];
 };
 
 static inline void ceph_oid_set_name(struct ceph_object_id *oid,
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index aade4a5c1c07f6ab0ca0f1ee66dfacbded110aed..9d1aaa24def6c31d90704dd6a5098d73f7422c0c 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -698,7 +698,9 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end)
        map = kzalloc(sizeof(*map), GFP_NOFS);
        if (map == NULL)
                return ERR_PTR(-ENOMEM);
+
        map->pg_temp = RB_ROOT;
+       mutex_init(&map->crush_scratch_mutex);
 
        ceph_decode_16_safe(p, end, version, bad);
        if (version > 6) {
@@ -1142,14 +1144,20 @@ int ceph_oloc_oid_to_pg(struct ceph_osdmap *osdmap,
 }
 EXPORT_SYMBOL(ceph_oloc_oid_to_pg);
 
-static int crush_do_rule_ary(const struct crush_map *map, int ruleno, int x,
-                            int *result, int result_max,
-                            const __u32 *weight, int weight_max)
+static int do_crush(struct ceph_osdmap *map, int ruleno, int x,
+                   int *result, int result_max,
+                   const __u32 *weight, int weight_max)
 {
-       int scratch[result_max * 3];
+       int r;
+
+       BUG_ON(result_max > CEPH_PG_MAX_SIZE);
+
+       mutex_lock(&map->crush_scratch_mutex);
+       r = crush_do_rule(map->crush, ruleno, x, result, result_max,
+                         weight, weight_max, map->crush_scratch_ary);
+       mutex_unlock(&map->crush_scratch_mutex);
 
-       return crush_do_rule(map, ruleno, x, result, result_max,
-                            weight, weight_max, scratch);
+       return r;
 }
 
 /*
@@ -1205,9 +1213,8 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
                                      pool->pgp_num_mask) +
                        (unsigned)pgid.pool;
        }
-       r = crush_do_rule_ary(osdmap->crush, ruleno, pps,
-                             osds, min_t(int, pool->size, *num),
-                             osdmap->osd_weight, osdmap->max_osd);
+       r = do_crush(osdmap, ruleno, pps, osds, min_t(int, pool->size, *num),
+                    osdmap->osd_weight, osdmap->max_osd);
        if (r < 0) {
                pr_err("error %d from crush rule: pool %lld ruleset %d type %d"
                       " size %d\n", r, pgid.pool, pool->crush_ruleset,