From cd9d9f5df6098c50726200d4185e9e8da32785b3 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Wed, 4 Apr 2012 13:35:44 -0500 Subject: rbd: don't hold spinlock during messenger flush A recent change made changes to the rbd_client_list be protected by a spinlock. Unfortunately in rbd_put_client(), the lock is taken before possibly dropping the last reference to an rbd_client, and on the last reference that eventually calls flush_workqueue() which can sleep. The problem was flagged by a debug spinlock warning: BUG: spinlock wrong CPU on CPU#3, rbd/27814 The solution is to move the spinlock acquisition and release inside rbd_client_release(), which is the spot where it's really needed for protecting the removal of the rbd_client from the client list. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- drivers/block/rbd.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 013c7a549fb6..c1f770131654 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -450,7 +450,9 @@ static void rbd_client_release(struct kref *kref) struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref); dout("rbd_release_client %p\n", rbdc); + spin_lock(&rbd_client_list_lock); list_del(&rbdc->node); + spin_unlock(&rbd_client_list_lock); ceph_destroy_client(rbdc->client); kfree(rbdc->rbd_opts); @@ -463,9 +465,7 @@ static void rbd_client_release(struct kref *kref) */ static void rbd_put_client(struct rbd_device *rbd_dev) { - spin_lock(&rbd_client_list_lock); kref_put(&rbd_dev->rbd_client->kref, rbd_client_release); - spin_unlock(&rbd_client_list_lock); rbd_dev->rbd_client = NULL; } -- cgit v1.2.3 From 3469ac1aa3a2f1e2586a412923c414779a0af854 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 7 May 2012 15:33:36 -0700 Subject: ceph: drop support for preferred_osd pgs This was an ill-conceived feature that has been removed from Ceph. Do this gracefully: - reject attempts to specify a preferred_osd via the ioctl - stop exposing this information via virtual xattrs - always fill in -1 for requests, in case we talk to an older server - don't calculate preferred_osd placements/pgids Reviewed-by: Alex Elder Signed-off-by: Sage Weil --- drivers/block/rbd.c | 1 - fs/ceph/file.c | 1 - fs/ceph/ioctl.c | 15 +++++--------- fs/ceph/xattr.c | 9 --------- include/linux/ceph/ceph_fs.h | 4 ++-- include/linux/ceph/osdmap.h | 2 -- net/ceph/osdmap.c | 47 ++++++++++---------------------------------- 7 files changed, 17 insertions(+), 62 deletions(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index c1f770131654..a67fa63a966b 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -935,7 +935,6 @@ static int rbd_do_request(struct request *rq, layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER); layout->fl_stripe_count = cpu_to_le32(1); layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER); - layout->fl_pg_preferred = cpu_to_le32(-1); layout->fl_pg_pool = cpu_to_le32(dev->poolid); ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno, req, ops); diff --git a/fs/ceph/file.c b/fs/ceph/file.c index ed72428d9c75..988d4f302e48 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -54,7 +54,6 @@ prepare_open_request(struct super_block *sb, int flags, int create_mode) req->r_fmode = ceph_flags_to_mode(flags); req->r_args.open.flags = cpu_to_le32(flags); req->r_args.open.mode = cpu_to_le32(create_mode); - req->r_args.open.preferred = cpu_to_le32(-1); out: return req; } diff --git a/fs/ceph/ioctl.c b/fs/ceph/ioctl.c index 790914a598dd..4feab52c5bff 100644 --- a/fs/ceph/ioctl.c +++ b/fs/ceph/ioctl.c @@ -26,8 +26,7 @@ static long ceph_ioctl_get_layout(struct file *file, void __user *arg) l.stripe_count = ceph_file_layout_stripe_count(ci->i_layout); l.object_size = ceph_file_layout_object_size(ci->i_layout); l.data_pool = le32_to_cpu(ci->i_layout.fl_pg_pool); - l.preferred_osd = - (s32)le32_to_cpu(ci->i_layout.fl_pg_preferred); + l.preferred_osd = (s32)-1; if (copy_to_user(arg, &l, sizeof(l))) return -EFAULT; } @@ -49,6 +48,10 @@ static long ceph_ioctl_set_layout(struct file *file, void __user *arg) if (copy_from_user(&l, arg, sizeof(l))) return -EFAULT; + /* preferred_osd is no longer supported */ + if (l.preferred_osd != -1) + return -EINVAL; + /* validate changed params against current layout */ err = ceph_do_getattr(file->f_dentry->d_inode, CEPH_STAT_CAP_LAYOUT); if (!err) { @@ -56,8 +59,6 @@ static long ceph_ioctl_set_layout(struct file *file, void __user *arg) nl.stripe_count = ceph_file_layout_stripe_count(ci->i_layout); nl.object_size = ceph_file_layout_object_size(ci->i_layout); nl.data_pool = le32_to_cpu(ci->i_layout.fl_pg_pool); - nl.preferred_osd = - (s32)le32_to_cpu(ci->i_layout.fl_pg_preferred); } else return err; @@ -69,8 +70,6 @@ static long ceph_ioctl_set_layout(struct file *file, void __user *arg) nl.object_size = l.object_size; if (l.data_pool) nl.data_pool = l.data_pool; - if (l.preferred_osd) - nl.preferred_osd = l.preferred_osd; if ((nl.object_size & ~PAGE_MASK) || (nl.stripe_unit & ~PAGE_MASK) || @@ -106,8 +105,6 @@ static long ceph_ioctl_set_layout(struct file *file, void __user *arg) req->r_args.setlayout.layout.fl_object_size = cpu_to_le32(l.object_size); req->r_args.setlayout.layout.fl_pg_pool = cpu_to_le32(l.data_pool); - req->r_args.setlayout.layout.fl_pg_preferred = - cpu_to_le32(l.preferred_osd); parent_inode = ceph_get_dentry_parent_inode(file->f_dentry); err = ceph_mdsc_do_request(mdsc, parent_inode, req); @@ -171,8 +168,6 @@ static long ceph_ioctl_set_layout_policy (struct file *file, void __user *arg) cpu_to_le32(l.object_size); req->r_args.setlayout.layout.fl_pg_pool = cpu_to_le32(l.data_pool); - req->r_args.setlayout.layout.fl_pg_preferred = - cpu_to_le32(l.preferred_osd); err = ceph_mdsc_do_request(mdsc, inode, req); ceph_mdsc_put_request(req); diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c index 35b86331d8a5..785cb3057c95 100644 --- a/fs/ceph/xattr.c +++ b/fs/ceph/xattr.c @@ -118,15 +118,6 @@ static size_t ceph_vxattrcb_file_layout(struct ceph_inode_info *ci, char *val, (unsigned long long)ceph_file_layout_su(ci->i_layout), (unsigned long long)ceph_file_layout_stripe_count(ci->i_layout), (unsigned long long)ceph_file_layout_object_size(ci->i_layout)); - - if (ceph_file_layout_pg_preferred(ci->i_layout) >= 0) { - val += ret; - size -= ret; - ret += snprintf(val, size, "preferred_osd=%lld\n", - (unsigned long long)ceph_file_layout_pg_preferred( - ci->i_layout)); - } - return ret; } diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h index b8c60694b2b0..e81ab30d4896 100644 --- a/include/linux/ceph/ceph_fs.h +++ b/include/linux/ceph/ceph_fs.h @@ -65,7 +65,7 @@ struct ceph_file_layout { __le32 fl_object_stripe_unit; /* UNUSED. for per-object parity, if any */ /* object -> pg layout */ - __le32 fl_pg_preferred; /* preferred primary for pg (-1 for none) */ + __le32 fl_unused; /* unused; used to be preferred primary (-1) */ __le32 fl_pg_pool; /* namespace, crush ruleset, rep level */ } __attribute__ ((packed)); @@ -384,7 +384,7 @@ union ceph_mds_request_args { __le32 stripe_count; /* ... */ __le32 object_size; __le32 file_replication; - __le32 preferred; + __le32 unused; /* used to be preferred osd */ } __attribute__ ((packed)) open; struct { __le32 flags; diff --git a/include/linux/ceph/osdmap.h b/include/linux/ceph/osdmap.h index ba4c205cbb01..311ef8d6aa9e 100644 --- a/include/linux/ceph/osdmap.h +++ b/include/linux/ceph/osdmap.h @@ -65,8 +65,6 @@ struct ceph_osdmap { #define ceph_file_layout_cas_hash(l) ((__s32)le32_to_cpu((l).fl_cas_hash)) #define ceph_file_layout_object_su(l) \ ((__s32)le32_to_cpu((l).fl_object_stripe_unit)) -#define ceph_file_layout_pg_preferred(l) \ - ((__s32)le32_to_cpu((l).fl_pg_preferred)) #define ceph_file_layout_pg_pool(l) \ ((__s32)le32_to_cpu((l).fl_pg_pool)) diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index 29ad46ec9dcf..7d39f3cb4947 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -1000,7 +1000,6 @@ int ceph_calc_object_layout(struct ceph_object_layout *ol, { unsigned num, num_mask; struct ceph_pg pgid; - s32 preferred = (s32)le32_to_cpu(fl->fl_pg_preferred); int poolid = le32_to_cpu(fl->fl_pg_pool); struct ceph_pg_pool_info *pool; unsigned ps; @@ -1011,23 +1010,13 @@ int ceph_calc_object_layout(struct ceph_object_layout *ol, if (!pool) return -EIO; ps = ceph_str_hash(pool->v.object_hash, oid, strlen(oid)); - if (preferred >= 0) { - ps += preferred; - num = le32_to_cpu(pool->v.lpg_num); - num_mask = pool->lpg_num_mask; - } else { - num = le32_to_cpu(pool->v.pg_num); - num_mask = pool->pg_num_mask; - } + num = le32_to_cpu(pool->v.pg_num); + num_mask = pool->pg_num_mask; pgid.ps = cpu_to_le16(ps); - pgid.preferred = cpu_to_le16(preferred); + pgid.preferred = cpu_to_le16(-1); pgid.pool = fl->fl_pg_pool; - if (preferred >= 0) - dout("calc_object_layout '%s' pgid %d.%xp%d\n", oid, poolid, ps, - (int)preferred); - else - dout("calc_object_layout '%s' pgid %d.%x\n", oid, poolid, ps); + dout("calc_object_layout '%s' pgid %d.%x\n", oid, poolid, ps); ol->ol_pgid = pgid; ol->ol_stripe_unit = fl->fl_object_stripe_unit; @@ -1046,23 +1035,17 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid, struct ceph_pg_pool_info *pool; int ruleno; unsigned poolid, ps, pps, t; - int preferred; poolid = le32_to_cpu(pgid.pool); ps = le16_to_cpu(pgid.ps); - preferred = (s16)le16_to_cpu(pgid.preferred); pool = __lookup_pg_pool(&osdmap->pg_pools, poolid); if (!pool) return NULL; /* pg_temp? */ - if (preferred >= 0) - t = ceph_stable_mod(ps, le32_to_cpu(pool->v.lpg_num), - pool->lpgp_num_mask); - else - t = ceph_stable_mod(ps, le32_to_cpu(pool->v.pg_num), - pool->pgp_num_mask); + t = ceph_stable_mod(ps, le32_to_cpu(pool->v.pg_num), + pool->pgp_num_mask); pgid.ps = cpu_to_le16(t); pg = __lookup_pg_mapping(&osdmap->pg_temp, pgid); if (pg) { @@ -1080,23 +1063,13 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid, return NULL; } - /* don't forcefeed bad device ids to crush */ - if (preferred >= osdmap->max_osd || - preferred >= osdmap->crush->max_devices) - preferred = -1; - - if (preferred >= 0) - pps = ceph_stable_mod(ps, - le32_to_cpu(pool->v.lpgp_num), - pool->lpgp_num_mask); - else - pps = ceph_stable_mod(ps, - le32_to_cpu(pool->v.pgp_num), - pool->pgp_num_mask); + pps = ceph_stable_mod(ps, + le32_to_cpu(pool->v.pgp_num), + pool->pgp_num_mask); pps += poolid; *num = crush_do_rule(osdmap->crush, ruleno, pps, osds, min_t(int, pool->v.size, *num), - preferred, osdmap->osd_weight); + -1, osdmap->osd_weight); return osds; } -- cgit v1.2.3 From e49bf4c51cbe27439c00516d4297193d45dd4097 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 7 May 2012 15:34:35 -0700 Subject: ceph: refactor SETLAYOUT and SETDIRLAYOUT ioctl checks into common helper Both of these methods perform similar checks; move that code to a helper so that we can ensure the checks are consistent. Reviewed-by: Alex Elder Signed-off-by: Sage Weil --- fs/ceph/ioctl.c | 82 ++++++++++++++++++++++++++------------------------------- 1 file changed, 38 insertions(+), 44 deletions(-) diff --git a/fs/ceph/ioctl.c b/fs/ceph/ioctl.c index 4feab52c5bff..0e2f021109ee 100644 --- a/fs/ceph/ioctl.c +++ b/fs/ceph/ioctl.c @@ -34,6 +34,36 @@ static long ceph_ioctl_get_layout(struct file *file, void __user *arg) return err; } +static long __validate_layout(struct ceph_mds_client *mdsc, + struct ceph_ioctl_layout *l) +{ + int i, err; + + /* preferred_osd is no longer supported */ + if (l->preferred_osd != -1) + return -EINVAL; + + /* validate striping parameters */ + if ((l->object_size & ~PAGE_MASK) || + (l->stripe_unit & ~PAGE_MASK) || + ((unsigned)l->object_size % (unsigned)l->stripe_unit)) + return -EINVAL; + + /* make sure it's a valid data pool */ + mutex_lock(&mdsc->mutex); + err = -EINVAL; + for (i = 0; i < mdsc->mdsmap->m_num_data_pg_pools; i++) + if (mdsc->mdsmap->m_data_pg_pools[i] == l->data_pool) { + err = 0; + break; + } + mutex_unlock(&mdsc->mutex); + if (err) + return err; + + return 0; +} + static long ceph_ioctl_set_layout(struct file *file, void __user *arg) { struct inode *inode = file->f_dentry->d_inode; @@ -43,15 +73,11 @@ static long ceph_ioctl_set_layout(struct file *file, void __user *arg) struct ceph_ioctl_layout l; struct ceph_inode_info *ci = ceph_inode(file->f_dentry->d_inode); struct ceph_ioctl_layout nl; - int err, i; + int err; if (copy_from_user(&l, arg, sizeof(l))) return -EFAULT; - /* preferred_osd is no longer supported */ - if (l.preferred_osd != -1) - return -EINVAL; - /* validate changed params against current layout */ err = ceph_do_getattr(file->f_dentry->d_inode, CEPH_STAT_CAP_LAYOUT); if (!err) { @@ -71,24 +97,9 @@ static long ceph_ioctl_set_layout(struct file *file, void __user *arg) if (l.data_pool) nl.data_pool = l.data_pool; - if ((nl.object_size & ~PAGE_MASK) || - (nl.stripe_unit & ~PAGE_MASK) || - ((unsigned)nl.object_size % (unsigned)nl.stripe_unit)) - return -EINVAL; - - /* make sure it's a valid data pool */ - if (l.data_pool > 0) { - mutex_lock(&mdsc->mutex); - err = -EINVAL; - for (i = 0; i < mdsc->mdsmap->m_num_data_pg_pools; i++) - if (mdsc->mdsmap->m_data_pg_pools[i] == l.data_pool) { - err = 0; - break; - } - mutex_unlock(&mdsc->mutex); - if (err) - return err; - } + err = __validate_layout(mdsc, &nl); + if (err) + return err; req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SETLAYOUT, USE_AUTH_MDS); @@ -124,33 +135,16 @@ static long ceph_ioctl_set_layout_policy (struct file *file, void __user *arg) struct inode *inode = file->f_dentry->d_inode; struct ceph_mds_request *req; struct ceph_ioctl_layout l; - int err, i; + int err; struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; /* copy and validate */ if (copy_from_user(&l, arg, sizeof(l))) return -EFAULT; - if ((l.object_size & ~PAGE_MASK) || - (l.stripe_unit & ~PAGE_MASK) || - !l.stripe_unit || - (l.object_size && - (unsigned)l.object_size % (unsigned)l.stripe_unit)) - return -EINVAL; - - /* make sure it's a valid data pool */ - if (l.data_pool > 0) { - mutex_lock(&mdsc->mutex); - err = -EINVAL; - for (i = 0; i < mdsc->mdsmap->m_num_data_pg_pools; i++) - if (mdsc->mdsmap->m_data_pg_pools[i] == l.data_pool) { - err = 0; - break; - } - mutex_unlock(&mdsc->mutex); - if (err) - return err; - } + err = __validate_layout(mdsc, &l); + if (err) + return err; req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SETDIRLAYOUT, USE_AUTH_MDS); -- cgit v1.2.3 From 8b12d47b80c7a34dffdd98244d99316db490ec58 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 7 May 2012 15:38:35 -0700 Subject: crush: clean up types, const-ness Move various types from int -> __u32 (or similar), and add const as appropriate. This reflects changes that have been present in the userland implementation for some time. Reviewed-by: Alex Elder Signed-off-by: Sage Weil --- include/linux/crush/crush.h | 2 +- include/linux/crush/mapper.h | 6 +++--- net/ceph/crush/crush.c | 8 ++++---- net/ceph/crush/mapper.c | 31 ++++++++++++++++--------------- 4 files changed, 24 insertions(+), 23 deletions(-) diff --git a/include/linux/crush/crush.h b/include/linux/crush/crush.h index 97e435b191f4..3f50369a50e8 100644 --- a/include/linux/crush/crush.h +++ b/include/linux/crush/crush.h @@ -168,7 +168,7 @@ struct crush_map { /* crush.c */ -extern int crush_get_bucket_item_weight(struct crush_bucket *b, int pos); +extern int crush_get_bucket_item_weight(const struct crush_bucket *b, int pos); extern void crush_calc_parents(struct crush_map *map); extern void crush_destroy_bucket_uniform(struct crush_bucket_uniform *b); extern void crush_destroy_bucket_list(struct crush_bucket_list *b); diff --git a/include/linux/crush/mapper.h b/include/linux/crush/mapper.h index c46b99c18bb0..9322ab8bccd8 100644 --- a/include/linux/crush/mapper.h +++ b/include/linux/crush/mapper.h @@ -10,11 +10,11 @@ #include "crush.h" -extern int crush_find_rule(struct crush_map *map, int pool, int type, int size); -extern int crush_do_rule(struct crush_map *map, +extern int crush_find_rule(const struct crush_map *map, int ruleset, int type, int size); +extern int crush_do_rule(const struct crush_map *map, int ruleno, int x, int *result, int result_max, int forcefeed, /* -1 for none */ - __u32 *weights); + const __u32 *weights); #endif diff --git a/net/ceph/crush/crush.c b/net/ceph/crush/crush.c index d6ebb13a18a4..8dd19a0deedc 100644 --- a/net/ceph/crush/crush.c +++ b/net/ceph/crush/crush.c @@ -26,9 +26,9 @@ const char *crush_bucket_alg_name(int alg) * @b: bucket pointer * @p: item index in bucket */ -int crush_get_bucket_item_weight(struct crush_bucket *b, int p) +int crush_get_bucket_item_weight(const struct crush_bucket *b, int p) { - if (p >= b->size) + if ((__u32)p >= b->size) return 0; switch (b->alg) { @@ -124,10 +124,9 @@ void crush_destroy_bucket(struct crush_bucket *b) */ void crush_destroy(struct crush_map *map) { - int b; - /* buckets */ if (map->buckets) { + __s32 b; for (b = 0; b < map->max_buckets; b++) { if (map->buckets[b] == NULL) continue; @@ -138,6 +137,7 @@ void crush_destroy(struct crush_map *map) /* rules */ if (map->rules) { + __u32 b; for (b = 0; b < map->max_rules; b++) kfree(map->rules[b]); kfree(map->rules); diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c index b79747c4b645..436102a8a461 100644 --- a/net/ceph/crush/mapper.c +++ b/net/ceph/crush/mapper.c @@ -32,9 +32,9 @@ * @type: storage ruleset type (user defined) * @size: output set size */ -int crush_find_rule(struct crush_map *map, int ruleset, int type, int size) +int crush_find_rule(const struct crush_map *map, int ruleset, int type, int size) { - int i; + __u32 i; for (i = 0; i < map->max_rules; i++) { if (map->rules[i] && @@ -72,7 +72,7 @@ static int bucket_perm_choose(struct crush_bucket *bucket, unsigned i, s; /* start a new permutation if @x has changed */ - if (bucket->perm_x != x || bucket->perm_n == 0) { + if (bucket->perm_x != (__u32)x || bucket->perm_n == 0) { dprintk("bucket %d new x=%d\n", bucket->id, x); bucket->perm_x = x; @@ -219,7 +219,7 @@ static int bucket_tree_choose(struct crush_bucket_tree *bucket, static int bucket_straw_choose(struct crush_bucket_straw *bucket, int x, int r) { - int i; + __u32 i; int high = 0; __u64 high_draw = 0; __u64 draw; @@ -262,7 +262,7 @@ static int crush_bucket_choose(struct crush_bucket *in, int x, int r) * true if device is marked "out" (failed, fully offloaded) * of the cluster */ -static int is_out(struct crush_map *map, __u32 *weight, int item, int x) +static int is_out(const struct crush_map *map, const __u32 *weight, int item, int x) { if (weight[item] >= 0x10000) return 0; @@ -287,16 +287,16 @@ static int is_out(struct crush_map *map, __u32 *weight, int item, int x) * @recurse_to_leaf: true if we want one device under each item of given type * @out2: second output vector for leaf items (if @recurse_to_leaf) */ -static int crush_choose(struct crush_map *map, +static int crush_choose(const struct crush_map *map, struct crush_bucket *bucket, - __u32 *weight, + const __u32 *weight, int x, int numrep, int type, int *out, int outpos, int firstn, int recurse_to_leaf, int *out2) { int rep; - int ftotal, flocal; + unsigned int ftotal, flocal; int retry_descent, retry_bucket, skip_rep; struct crush_bucket *in = bucket; int r; @@ -304,7 +304,7 @@ static int crush_choose(struct crush_map *map, int item = 0; int itemtype; int collide, reject; - const int orig_tries = 5; /* attempts before we fall back to search */ + const unsigned int orig_tries = 5; /* attempts before we fall back to search */ dprintk("CHOOSE%s bucket %d x %d outpos %d numrep %d\n", recurse_to_leaf ? "_LEAF" : "", bucket->id, x, outpos, numrep); @@ -325,7 +325,7 @@ static int crush_choose(struct crush_map *map, r = rep; if (in->alg == CRUSH_BUCKET_UNIFORM) { /* be careful */ - if (firstn || numrep >= in->size) + if (firstn || (__u32)numrep >= in->size) /* r' = r + f_total */ r += ftotal; else if (in->size % numrep == 0) @@ -425,7 +425,7 @@ reject: /* else give up */ skip_rep = 1; dprintk(" reject %d collide %d " - "ftotal %d flocal %d\n", + "ftotal %u flocal %u\n", reject, collide, ftotal, flocal); } @@ -456,9 +456,9 @@ reject: * @result_max: maximum result size * @force: force initial replica choice; -1 for none */ -int crush_do_rule(struct crush_map *map, +int crush_do_rule(const struct crush_map *map, int ruleno, int x, int *result, int result_max, - int force, __u32 *weight) + int force, const __u32 *weight) { int result_len; int force_context[CRUSH_MAX_DEPTH]; @@ -473,7 +473,7 @@ int crush_do_rule(struct crush_map *map, int osize; int *tmp; struct crush_rule *rule; - int step; + __u32 step; int i, j; int numrep; int firstn; @@ -488,7 +488,8 @@ int crush_do_rule(struct crush_map *map, /* * determine hierarchical context of force, if any. note * that this may or may not correspond to the specific types - * referenced by the crush rule. + * referenced by the crush rule. it will also only affect + * the first descent (TAKE). */ if (force >= 0 && force < map->max_devices && -- cgit v1.2.3 From c90f95ed46393e29d843686e21947d1c6fcb1164 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 7 May 2012 15:35:09 -0700 Subject: crush: adjust local retry threshold This small adjustment reflects a change that was made in ceph.git commit af6a9f30696c900a2a8bd7ae24e8ed15fb4964bb, about 6 months ago. An N-1 search is not exhaustive. Fixed ceph.git bug #1594. Reviewed-by: Alex Elder Signed-off-by: Sage Weil --- net/ceph/crush/mapper.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c index 436102a8a461..583f644b0e28 100644 --- a/net/ceph/crush/mapper.c +++ b/net/ceph/crush/mapper.c @@ -415,7 +415,7 @@ reject: if (collide && flocal < 3) /* retry locally a few times */ retry_bucket = 1; - else if (flocal < in->size + orig_tries) + else if (flocal <= in->size + orig_tries) /* exhaustive bucket search */ retry_bucket = 1; else if (ftotal < 20) -- cgit v1.2.3 From a1f4895be8bf1ba56c2306b058f51619e9b0e8f8 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 7 May 2012 15:35:24 -0700 Subject: crush: be more tolerant of nonsensical crush maps If we get a map that doesn't make sense, error out or ignore the badness instead of BUGging out. This reflects the ceph.git commits 9895f0bff7dc68e9b49b572613d242315fb11b6c and 8ded26472058d5205803f244c2f33cb6cb10de79. Reviewed-by: Alex Elder Signed-off-by: Sage Weil --- net/ceph/crush/mapper.c | 33 ++++++++++++++++++++++++--------- 1 file changed, 24 insertions(+), 9 deletions(-) diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c index 583f644b0e28..00baad5d3bde 100644 --- a/net/ceph/crush/mapper.c +++ b/net/ceph/crush/mapper.c @@ -152,8 +152,8 @@ static int bucket_list_choose(struct crush_bucket_list *bucket, return bucket->h.items[i]; } - BUG_ON(1); - return 0; + dprintk("bad list sums for bucket %d\n", bucket->h.id); + return bucket->h.items[0]; } @@ -239,6 +239,7 @@ static int bucket_straw_choose(struct crush_bucket_straw *bucket, static int crush_bucket_choose(struct crush_bucket *in, int x, int r) { dprintk(" crush_bucket_choose %d x=%d r=%d\n", in->id, x, r); + BUG_ON(in->size == 0); switch (in->alg) { case CRUSH_BUCKET_UNIFORM: return bucket_uniform_choose((struct crush_bucket_uniform *)in, @@ -253,7 +254,7 @@ static int crush_bucket_choose(struct crush_bucket *in, int x, int r) return bucket_straw_choose((struct crush_bucket_straw *)in, x, r); default: - BUG_ON(1); + dprintk("unknown bucket %d alg %d\n", in->id, in->alg); return in->items[0]; } } @@ -354,7 +355,11 @@ static int crush_choose(const struct crush_map *map, item = bucket_perm_choose(in, x, r); else item = crush_bucket_choose(in, x, r); - BUG_ON(item >= map->max_devices); + if (item >= map->max_devices) { + dprintk(" bad item %d\n", item); + skip_rep = 1; + break; + } /* desired type? */ if (item < 0) @@ -365,8 +370,12 @@ static int crush_choose(const struct crush_map *map, /* keep going? */ if (itemtype != type) { - BUG_ON(item >= 0 || - (-1-item) >= map->max_buckets); + if (item >= 0 || + (-1-item) >= map->max_buckets) { + dprintk(" bad item type %d\n", type); + skip_rep = 1; + break; + } in = map->buckets[-1-item]; retry_bucket = 1; continue; @@ -478,7 +487,10 @@ int crush_do_rule(const struct crush_map *map, int numrep; int firstn; - BUG_ON(ruleno >= map->max_rules); + if ((__u32)ruleno >= map->max_rules) { + dprintk(" bad ruleno %d\n", ruleno); + return 0; + } rule = map->rules[ruleno]; result_len = 0; @@ -528,7 +540,8 @@ int crush_do_rule(const struct crush_map *map, firstn = 1; case CRUSH_RULE_CHOOSE_LEAF_INDEP: case CRUSH_RULE_CHOOSE_INDEP: - BUG_ON(wsize == 0); + if (wsize == 0) + break; recurse_to_leaf = rule->steps[step].op == @@ -597,7 +610,9 @@ int crush_do_rule(const struct crush_map *map, break; default: - BUG_ON(1); + dprintk(" unknown op %d at step %d\n", + curstep->op, step); + break; } } return result_len; -- cgit v1.2.3 From 0668216efe16ab1adf077e5f138775cee2af927a Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 7 May 2012 15:35:48 -0700 Subject: crush: use a temporary variable to simplify crush_do_rule Use a temporary variable here to avoid repeated array lookups and clean up the code a bit. This reflects ceph.git commit 6b5be27634ad307b471a5bf0db85c4f5c834885f. Reviewed-by: Alex Elder Signed-off-by: Sage Weil --- net/ceph/crush/mapper.c | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c index 00baad5d3bde..fba9460fe572 100644 --- a/net/ceph/crush/mapper.c +++ b/net/ceph/crush/mapper.c @@ -519,14 +519,15 @@ int crush_do_rule(const struct crush_map *map, } for (step = 0; step < rule->len; step++) { + struct crush_rule_step *curstep = &rule->steps[step]; + firstn = 0; - switch (rule->steps[step].op) { + switch (curstep->op) { case CRUSH_RULE_TAKE: - w[0] = rule->steps[step].arg1; + w[0] = curstep->arg1; /* find position in force_context/hierarchy */ - while (force_pos >= 0 && - force_context[force_pos] != w[0]) + while (force_pos >= 0 && force_context[force_pos] != w[0]) force_pos--; /* and move past it */ if (force_pos >= 0) @@ -538,15 +539,16 @@ int crush_do_rule(const struct crush_map *map, case CRUSH_RULE_CHOOSE_LEAF_FIRSTN: case CRUSH_RULE_CHOOSE_FIRSTN: firstn = 1; + /* fall through */ case CRUSH_RULE_CHOOSE_LEAF_INDEP: case CRUSH_RULE_CHOOSE_INDEP: if (wsize == 0) break; recurse_to_leaf = - rule->steps[step].op == + curstep->op == CRUSH_RULE_CHOOSE_LEAF_FIRSTN || - rule->steps[step].op == + curstep->op == CRUSH_RULE_CHOOSE_LEAF_INDEP; /* reset output */ @@ -558,7 +560,7 @@ int crush_do_rule(const struct crush_map *map, * basically, numrep <= 0 means relative to * the provided result_max */ - numrep = rule->steps[step].arg1; + numrep = curstep->arg1; if (numrep <= 0) { numrep += result_max; if (numrep <= 0) @@ -569,7 +571,7 @@ int crush_do_rule(const struct crush_map *map, /* skip any intermediate types */ while (force_pos && force_context[force_pos] < 0 && - rule->steps[step].arg2 != + curstep->arg2 != map->buckets[-1 - force_context[force_pos]]->type) force_pos--; @@ -583,7 +585,7 @@ int crush_do_rule(const struct crush_map *map, map->buckets[-1-w[i]], weight, x, numrep, - rule->steps[step].arg2, + curstep->arg2, o+osize, j, firstn, recurse_to_leaf, c+osize); -- cgit v1.2.3 From 41ebcc0907c58f75d0b25afcaf8b9c35c6b1ad14 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 7 May 2012 15:39:29 -0700 Subject: crush: remove forcefeed functionality Remove forcefeed functionality from CRUSH. This is an ugly misfeature that is mostly useless and unused. Remove it. Reflects ceph.git commit ed974b5000f2851207d860a651809af4a1867942. Reviewed-by: Alex Elder Signed-off-by: Sage Weil Conflicts: net/ceph/crush/mapper.c --- include/linux/crush/mapper.h | 1 - net/ceph/crush/mapper.c | 48 +------------------------------------------- net/ceph/osdmap.c | 2 +- 3 files changed, 2 insertions(+), 49 deletions(-) diff --git a/include/linux/crush/mapper.h b/include/linux/crush/mapper.h index 9322ab8bccd8..71d79f44a7d0 100644 --- a/include/linux/crush/mapper.h +++ b/include/linux/crush/mapper.h @@ -14,7 +14,6 @@ extern int crush_find_rule(const struct crush_map *map, int ruleset, int type, i extern int crush_do_rule(const struct crush_map *map, int ruleno, int x, int *result, int result_max, - int forcefeed, /* -1 for none */ const __u32 *weights); #endif diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c index fba9460fe572..11cf352201ba 100644 --- a/net/ceph/crush/mapper.c +++ b/net/ceph/crush/mapper.c @@ -463,15 +463,12 @@ reject: * @x: hash input * @result: pointer to result vector * @result_max: maximum result size - * @force: force initial replica choice; -1 for none */ int crush_do_rule(const struct crush_map *map, int ruleno, int x, int *result, int result_max, - int force, const __u32 *weight) + const __u32 *weight) { int result_len; - int force_context[CRUSH_MAX_DEPTH]; - int force_pos = -1; int a[CRUSH_MAX_SET]; int b[CRUSH_MAX_SET]; int c[CRUSH_MAX_SET]; @@ -497,27 +494,6 @@ int crush_do_rule(const struct crush_map *map, w = a; o = b; - /* - * determine hierarchical context of force, if any. note - * that this may or may not correspond to the specific types - * referenced by the crush rule. it will also only affect - * the first descent (TAKE). - */ - if (force >= 0 && - force < map->max_devices && - map->device_parents[force] != 0 && - !is_out(map, weight, force, x)) { - while (1) { - force_context[++force_pos] = force; - if (force >= 0) - force = map->device_parents[force]; - else - force = map->bucket_parents[-1-force]; - if (force == 0) - break; - } - } - for (step = 0; step < rule->len; step++) { struct crush_rule_step *curstep = &rule->steps[step]; @@ -525,14 +501,6 @@ int crush_do_rule(const struct crush_map *map, switch (curstep->op) { case CRUSH_RULE_TAKE: w[0] = curstep->arg1; - - /* find position in force_context/hierarchy */ - while (force_pos >= 0 && force_context[force_pos] != w[0]) - force_pos--; - /* and move past it */ - if (force_pos >= 0) - force_pos--; - wsize = 1; break; @@ -567,20 +535,6 @@ int crush_do_rule(const struct crush_map *map, continue; } j = 0; - if (osize == 0 && force_pos >= 0) { - /* skip any intermediate types */ - while (force_pos && - force_context[force_pos] < 0 && - curstep->arg2 != - map->buckets[-1 - - force_context[force_pos]]->type) - force_pos--; - o[osize] = force_context[force_pos]; - if (recurse_to_leaf) - c[osize] = force_context[0]; - j++; - force_pos--; - } osize += crush_choose(map, map->buckets[-1-w[i]], weight, diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index 7d39f3cb4947..9dda36f7aa9d 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -1069,7 +1069,7 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid, pps += poolid; *num = crush_do_rule(osdmap->crush, ruleno, pps, osds, min_t(int, pool->v.size, *num), - -1, osdmap->osd_weight); + osdmap->osd_weight); return osds; } -- cgit v1.2.3 From fc7c3ae5ab9246ad96aab4d0d57f67e9255cfb56 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 7 May 2012 15:36:35 -0700 Subject: crush: remove parent maps These were used for the ill-fated forcefeed feature. Remove them. Reflects ceph.git commit ebdf80edfecfbd5a842b71fbe5732857994380c1. Reviewed-by: Alex Elder Signed-off-by: Sage Weil --- include/linux/crush/crush.h | 11 ----------- net/ceph/crush/crush.c | 25 ------------------------- net/ceph/osdmap.c | 7 ------- 3 files changed, 43 deletions(-) diff --git a/include/linux/crush/crush.h b/include/linux/crush/crush.h index 3f50369a50e8..158a4d25ca83 100644 --- a/include/linux/crush/crush.h +++ b/include/linux/crush/crush.h @@ -151,16 +151,6 @@ struct crush_map { struct crush_bucket **buckets; struct crush_rule **rules; - /* - * Parent pointers to identify the parent bucket a device or - * bucket in the hierarchy. If an item appears more than - * once, this is the _last_ time it appeared (where buckets - * are processed in bucket id order, from -1 on down to - * -max_buckets. - */ - __u32 *bucket_parents; - __u32 *device_parents; - __s32 max_buckets; __u32 max_rules; __s32 max_devices; @@ -169,7 +159,6 @@ struct crush_map { /* crush.c */ extern int crush_get_bucket_item_weight(const struct crush_bucket *b, int pos); -extern void crush_calc_parents(struct crush_map *map); extern void crush_destroy_bucket_uniform(struct crush_bucket_uniform *b); extern void crush_destroy_bucket_list(struct crush_bucket_list *b); extern void crush_destroy_bucket_tree(struct crush_bucket_tree *b); diff --git a/net/ceph/crush/crush.c b/net/ceph/crush/crush.c index 8dd19a0deedc..2160791acf03 100644 --- a/net/ceph/crush/crush.c +++ b/net/ceph/crush/crush.c @@ -46,29 +46,6 @@ int crush_get_bucket_item_weight(const struct crush_bucket *b, int p) return 0; } -/** - * crush_calc_parents - Calculate parent vectors for the given crush map. - * @map: crush_map pointer - */ -void crush_calc_parents(struct crush_map *map) -{ - int i, b, c; - - for (b = 0; b < map->max_buckets; b++) { - if (map->buckets[b] == NULL) - continue; - for (i = 0; i < map->buckets[b]->size; i++) { - c = map->buckets[b]->items[i]; - BUG_ON(c >= map->max_devices || - c < -map->max_buckets); - if (c >= 0) - map->device_parents[c] = map->buckets[b]->id; - else - map->bucket_parents[-1-c] = map->buckets[b]->id; - } - } -} - void crush_destroy_bucket_uniform(struct crush_bucket_uniform *b) { kfree(b->h.perm); @@ -143,8 +120,6 @@ void crush_destroy(struct crush_map *map) kfree(map->rules); } - kfree(map->bucket_parents); - kfree(map->device_parents); kfree(map); } diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index 9dda36f7aa9d..dac448ba68e4 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -161,13 +161,6 @@ static struct crush_map *crush_decode(void *pbyval, void *end) c->max_rules = ceph_decode_32(p); c->max_devices = ceph_decode_32(p); - c->device_parents = kcalloc(c->max_devices, sizeof(u32), GFP_NOFS); - if (c->device_parents == NULL) - goto badmem; - c->bucket_parents = kcalloc(c->max_buckets, sizeof(u32), GFP_NOFS); - if (c->bucket_parents == NULL) - goto badmem; - c->buckets = kcalloc(c->max_buckets, sizeof(*c->buckets), GFP_NOFS); if (c->buckets == NULL) goto badmem; -- cgit v1.2.3 From f671d4cd9b36691ac4ef42cde44c1b7a84e13631 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 7 May 2012 15:36:49 -0700 Subject: crush: fix tree node weight lookup Fix the node weight lookup for tree buckets by using a correct accessor. Reflects ceph.git commit d287ade5bcbdca82a3aef145b92924cf1e856733. Reviewed-by: Alex Elder Signed-off-by: Sage Weil --- include/linux/crush/crush.h | 5 +++++ net/ceph/crush/crush.c | 4 +--- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/include/linux/crush/crush.h b/include/linux/crush/crush.h index 158a4d25ca83..7c4750811b96 100644 --- a/include/linux/crush/crush.h +++ b/include/linux/crush/crush.h @@ -166,4 +166,9 @@ extern void crush_destroy_bucket_straw(struct crush_bucket_straw *b); extern void crush_destroy_bucket(struct crush_bucket *b); extern void crush_destroy(struct crush_map *map); +static inline int crush_calc_tree_node(int i) +{ + return ((i+1) << 1)-1; +} + #endif diff --git a/net/ceph/crush/crush.c b/net/ceph/crush/crush.c index 2160791acf03..b93575f4eb13 100644 --- a/net/ceph/crush/crush.c +++ b/net/ceph/crush/crush.c @@ -37,9 +37,7 @@ int crush_get_bucket_item_weight(const struct crush_bucket *b, int p) case CRUSH_BUCKET_LIST: return ((struct crush_bucket_list *)b)->item_weights[p]; case CRUSH_BUCKET_TREE: - if (p & 1) - return ((struct crush_bucket_tree *)b)->node_weights[p]; - return 0; + return ((struct crush_bucket_tree *)b)->node_weights[crush_calc_tree_node(p)]; case CRUSH_BUCKET_STRAW: return ((struct crush_bucket_straw *)b)->item_weights[p]; } -- cgit v1.2.3 From 6eb43f4b5a2a74599b4ff17a97c03a342327ca65 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 7 May 2012 15:37:05 -0700 Subject: crush: fix memory leak when destroying tree buckets Reflects ceph.git commit 46d63d98434b3bc9dad2fc9ab23cbaedc3bcb0e4. Reported-by: Alexander Lyakas Reviewed-by: Alex Elder Signed-off-by: Sage Weil --- net/ceph/crush/crush.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/net/ceph/crush/crush.c b/net/ceph/crush/crush.c index b93575f4eb13..089613234f03 100644 --- a/net/ceph/crush/crush.c +++ b/net/ceph/crush/crush.c @@ -62,6 +62,8 @@ void crush_destroy_bucket_list(struct crush_bucket_list *b) void crush_destroy_bucket_tree(struct crush_bucket_tree *b) { + kfree(b->h.perm); + kfree(b->h.items); kfree(b->node_weights); kfree(b); } -- cgit v1.2.3 From 8b393269008411a612ca549b733b4296e819f2fb Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 7 May 2012 15:37:23 -0700 Subject: crush: warn on do_rule failure If we get an error code from crush_do_rule(), print an error to the console. Reviewed-by: Alex Elder Signed-off-by: Sage Weil --- net/ceph/osdmap.c | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index dac448ba68e4..2592f3cca987 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -1027,7 +1027,7 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid, struct ceph_pg_mapping *pg; struct ceph_pg_pool_info *pool; int ruleno; - unsigned poolid, ps, pps, t; + unsigned poolid, ps, pps, t, r; poolid = le32_to_cpu(pgid.pool); ps = le16_to_cpu(pgid.ps); @@ -1060,9 +1060,16 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid, le32_to_cpu(pool->v.pgp_num), pool->pgp_num_mask); pps += poolid; - *num = crush_do_rule(osdmap->crush, ruleno, pps, osds, - min_t(int, pool->v.size, *num), - osdmap->osd_weight); + r = crush_do_rule(osdmap->crush, ruleno, pps, osds, + min_t(int, pool->v.size, *num), + osdmap->osd_weight); + if (r < 0) { + pr_err("error %d from crush rule: pool %d ruleset %d type %d" + " size %d\n", r, poolid, pool->v.crush_ruleset, + pool->v.type, pool->v.size); + return NULL; + } + *num = r; return osds; } -- cgit v1.2.3 From 065a68f9167e20f321a62d044cb2c3024393d455 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Fri, 20 Apr 2012 15:49:43 -0500 Subject: ceph: osd_client: fix endianness bug in osd_req_encode_op() From Al Viro Al Viro noticed that we were using a non-cpu-encoded value in a switch statement in osd_req_encode_op(). The result would clearly not work correctly on a big-endian machine. Signed-off-by: Alex Elder --- net/ceph/osd_client.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 5e254055c910..daa2716a0c30 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -278,7 +278,7 @@ static void osd_req_encode_op(struct ceph_osd_request *req, { dst->op = cpu_to_le16(src->op); - switch (dst->op) { + switch (src->op) { case CEPH_OSD_OP_READ: case CEPH_OSD_OP_WRITE: dst->extent.offset = -- cgit v1.2.3 From 76aa542fb90e3e91edb1146d10ca7cf2cae8e7e9 Mon Sep 17 00:00:00 2001 From: Xi Wang Date: Fri, 20 Apr 2012 15:49:44 -0500 Subject: ceph: fix bounds check in ceph_decode_need and ceph_encode_need Given a large n, the bounds check (*p + n > end) can be bypassed due to pointer wraparound. A safer check is (n > end - *p). [elder@dreamhost.com: inverted test and renamed ceph_has_room()] Signed-off-by: Xi Wang Reviewed-by: Alex Elder --- include/linux/ceph/decode.h | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/include/linux/ceph/decode.h b/include/linux/ceph/decode.h index c5b6939fb32a..ecf324eb2c9a 100644 --- a/include/linux/ceph/decode.h +++ b/include/linux/ceph/decode.h @@ -45,9 +45,14 @@ static inline void ceph_decode_copy(void **p, void *pv, size_t n) /* * bounds check input. */ +static inline int ceph_has_room(void **p, void *end, size_t n) +{ + return end >= *p && n <= end - *p; +} + #define ceph_decode_need(p, end, n, bad) \ do { \ - if (unlikely(*(p) + (n) > (end))) \ + if (!likely(ceph_has_room(p, end, n))) \ goto bad; \ } while (0) @@ -166,7 +171,7 @@ static inline void ceph_encode_string(void **p, void *end, #define ceph_encode_need(p, end, n, bad) \ do { \ - if (unlikely(*(p) + (n) > (end))) \ + if (!likely(ceph_has_room(p, end, n))) \ goto bad; \ } while (0) -- cgit v1.2.3 From f8ad495a8a0277b88c59bf38319e5e944aaf5a4a Mon Sep 17 00:00:00 2001 From: Dan Carpenter Date: Fri, 20 Apr 2012 15:49:44 -0500 Subject: rbd: use gfp_flags parameter in rbd_header_from_disk() We should use the gfp_flags that the caller specified instead of GFP_KERNEL here. There is only one caller and it uses GFP_KERNEL, so this change is just a cleanup and doesn't change how the code works. Signed-off-by: Dan Carpenter Reviewed-by: Alex Elder --- drivers/block/rbd.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index a67fa63a966b..ca59d4d9471e 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -506,11 +506,11 @@ static int rbd_header_from_disk(struct rbd_image_header *header, header->snap_names_len = le64_to_cpu(ondisk->snap_names_len); if (snap_count) { header->snap_names = kmalloc(header->snap_names_len, - GFP_KERNEL); + gfp_flags); if (!header->snap_names) goto err_snapc; header->snap_sizes = kmalloc(snap_count * sizeof(u64), - GFP_KERNEL); + gfp_flags); if (!header->snap_sizes) goto err_names; } else { -- cgit v1.2.3 From 50f7c4c967d0b5acd8e7ba6ab654dc4a7ac869ac Mon Sep 17 00:00:00 2001 From: Xi Wang Date: Fri, 20 Apr 2012 15:49:44 -0500 Subject: rbd: fix integer overflow in rbd_header_from_disk() ondisk->snap_count is read from disk via rbd_req_sync_read() and thus needs validation. Otherwise, a bogus `snap_count' could overflow the kmalloc() size, leading to memory corruption. Also use `u32' consistently for `snap_count'. [elder@dreamhost.com: changed to use UINT_MAX rather than ULONG_MAX] Signed-off-by: Xi Wang Reviewed-by: Alex Elder --- drivers/block/rbd.c | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index ca59d4d9471e..a75fe93a25b1 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -487,16 +487,18 @@ static void rbd_coll_release(struct kref *kref) */ static int rbd_header_from_disk(struct rbd_image_header *header, struct rbd_image_header_ondisk *ondisk, - int allocated_snaps, + u32 allocated_snaps, gfp_t gfp_flags) { - int i; - u32 snap_count; + u32 i, snap_count; if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT))) return -ENXIO; snap_count = le32_to_cpu(ondisk->snap_count); + if (snap_count > (UINT_MAX - sizeof(struct ceph_snap_context)) + / sizeof (*ondisk)) + return -EINVAL; header->snapc = kmalloc(sizeof(struct ceph_snap_context) + snap_count * sizeof (*ondisk), gfp_flags); @@ -1591,7 +1593,7 @@ static int rbd_read_header(struct rbd_device *rbd_dev, { ssize_t rc; struct rbd_image_header_ondisk *dh; - int snap_count = 0; + u32 snap_count = 0; u64 ver; size_t len; -- cgit v1.2.3 From 403f24d3d51760a8b9368d595fa5f48c309f1a0f Mon Sep 17 00:00:00 2001 From: Josh Durgin Date: Mon, 5 Dec 2011 10:47:13 -0800 Subject: rbd: protect read of snapshot sequence number This is updated whenever a snapshot is added or deleted, and the snapc pointer is changed with every refresh of the header. Signed-off-by: Josh Durgin Reviewed-by: Alex Elder Reviewed-by: Yehuda Sadeh --- drivers/block/rbd.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index a75fe93a25b1..5ab9f55d3e0c 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -1684,7 +1684,9 @@ static int rbd_header_add_snap(struct rbd_device *dev, if (ret < 0) return ret; - dev->header.snapc->seq = new_snapid; + down_write(&dev->header_rwsem); + dev->header.snapc->seq = new_snapid; + up_write(&dev->header_rwsem); return 0; bad: -- cgit v1.2.3 From 77dfe99fe3cb0b2b0545e19e2d57b7a9134ee3c0 Mon Sep 17 00:00:00 2001 From: Josh Durgin Date: Mon, 21 Nov 2011 13:04:42 -0800 Subject: rbd: store snapshot id instead of index When a device was open at a snapshot, and snapshots were deleted or added, data from the wrong snapshot could be read. Instead of assuming the snap context is constant, store the actual snap id when the device is initialized, and rely on the OSDs to signal an error if we try reading from a snapshot that was deleted. Signed-off-by: Josh Durgin Reviewed-by: Alex Elder Reviewed-by: Yehuda Sadeh --- drivers/block/rbd.c | 27 +++++---------------------- 1 file changed, 5 insertions(+), 22 deletions(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 5ab9f55d3e0c..c1650bdf2f6e 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -175,8 +175,7 @@ struct rbd_device { /* protects updating the header */ struct rw_semaphore header_rwsem; char snap_name[RBD_MAX_SNAP_NAME_LEN]; - u32 cur_snap; /* index+1 of current snapshot within snap context - 0 - for the head */ + u64 snap_id; /* current snapshot id */ int read_only; struct list_head node; @@ -554,21 +553,6 @@ err_snapc: return -ENOMEM; } -static int snap_index(struct rbd_image_header *header, int snap_num) -{ - return header->total_snaps - snap_num; -} - -static u64 cur_snap_id(struct rbd_device *rbd_dev) -{ - struct rbd_image_header *header = &rbd_dev->header; - - if (!rbd_dev->cur_snap) - return 0; - - return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)]; -} - static int snap_by_name(struct rbd_image_header *header, const char *snap_name, u64 *seq, u64 *size) { @@ -607,7 +591,7 @@ static int rbd_header_set_snap(struct rbd_device *dev, u64 *size) snapc->seq = header->snap_seq; else snapc->seq = 0; - dev->cur_snap = 0; + dev->snap_id = CEPH_NOSNAP; dev->read_only = 0; if (size) *size = header->image_size; @@ -615,8 +599,7 @@ static int rbd_header_set_snap(struct rbd_device *dev, u64 *size) ret = snap_by_name(header, dev->snap_name, &snapc->seq, size); if (ret < 0) goto done; - - dev->cur_snap = header->total_snaps - ret; + dev->snap_id = snapc->seq; dev->read_only = 1; } @@ -1522,7 +1505,7 @@ static void rbd_rq_fn(struct request_queue *q) coll, cur_seg); else rbd_req_read(rq, rbd_dev, - cur_snap_id(rbd_dev), + rbd_dev->snap_id, ofs, op_size, bio, coll, cur_seg); @@ -1657,7 +1640,7 @@ static int rbd_header_add_snap(struct rbd_device *dev, struct ceph_mon_client *monc; /* we should create a snapshot only if we're pointing at the head */ - if (dev->cur_snap) + if (dev->snap_id != CEPH_NOSNAP) return -EINVAL; monc = &dev->rbd_client->client->monc; -- cgit v1.2.3 From b06e6a6be796bc365a19b0ac5176b553c13abf2f Mon Sep 17 00:00:00 2001 From: Josh Durgin Date: Mon, 21 Nov 2011 18:16:52 -0800 Subject: rbd: remove conditional snapid parameters The snapid parameters passed to rbd_do_op() and rbd_req_sync_op() are now always either a valid snapid or an explicit CEPH_NOSNAP. [elder@dreamhost.com: Rephrased the description] Signed-off-by: Josh Durgin Reviewed-by: Alex Elder Reviewed-by: Yehuda Sadeh --- drivers/block/rbd.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index c1650bdf2f6e..4a0a829f79d1 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -1152,7 +1152,7 @@ static int rbd_req_read(struct request *rq, int coll_index) { return rbd_do_op(rq, rbd_dev, NULL, - (snapid ? snapid : CEPH_NOSNAP), + snapid, CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, 2, @@ -1171,7 +1171,7 @@ static int rbd_req_sync_read(struct rbd_device *dev, u64 *ver) { return rbd_req_sync_op(dev, NULL, - (snapid ? snapid : CEPH_NOSNAP), + snapid, CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, NULL, -- cgit v1.2.3 From 3591538fb272d2432d112d47d7e0ddd0be4cded2 Mon Sep 17 00:00:00 2001 From: Josh Durgin Date: Mon, 5 Dec 2011 18:25:13 -0800 Subject: rbd: fix snapshot size type Snapshot sizes should be the same type as regular image sizes. This only affects their displayed size in sysfs, not the reported size of an actual block device sizes. Signed-off-by: Josh Durgin Reviewed-by: Alex Elder Reviewed-by: Yehuda Sadeh --- drivers/block/rbd.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 4a0a829f79d1..83597965c486 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -141,7 +141,7 @@ struct rbd_request { struct rbd_snap { struct device dev; const char *name; - size_t size; + u64 size; struct list_head node; u64 id; }; @@ -1935,7 +1935,7 @@ static ssize_t rbd_snap_size_show(struct device *dev, { struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev); - return sprintf(buf, "%zd\n", snap->size); + return sprintf(buf, "%llu\n", (unsigned long long)snap->size); } static ssize_t rbd_snap_id_show(struct device *dev, @@ -1944,7 +1944,7 @@ static ssize_t rbd_snap_id_show(struct device *dev, { struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev); - return sprintf(buf, "%llu\n", (unsigned long long) snap->id); + return sprintf(buf, "%llu\n", (unsigned long long)snap->id); } static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL); -- cgit v1.2.3 From 263c6ca007a6693fb724a24c5a55716c49d33573 Mon Sep 17 00:00:00 2001 From: Josh Durgin Date: Mon, 5 Dec 2011 10:43:42 -0800 Subject: rbd: rename __rbd_update_snaps to __rbd_refresh_header This function rereads the entire header and handles any changes in it, not just changes in snapshots. Signed-off-by: Josh Durgin Reviewed-by: Alex Elder Reviewed-by: Yehuda Sadeh --- drivers/block/rbd.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 83597965c486..65665c9c42c6 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -240,7 +240,7 @@ static void rbd_put_dev(struct rbd_device *rbd_dev) put_device(&rbd_dev->dev); } -static int __rbd_update_snaps(struct rbd_device *rbd_dev); +static int __rbd_refresh_header(struct rbd_device *rbd_dev); static int rbd_open(struct block_device *bdev, fmode_t mode) { @@ -1222,7 +1222,7 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data) dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name, notify_id, (int)opcode); mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); - rc = __rbd_update_snaps(dev); + rc = __rbd_refresh_header(dev); mutex_unlock(&ctl_mutex); if (rc) pr_warning(RBD_DRV_NAME "%d got notification but failed to " @@ -1689,7 +1689,7 @@ static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev) /* * only read the first part of the ondisk header, without the snaps info */ -static int __rbd_update_snaps(struct rbd_device *rbd_dev) +static int __rbd_refresh_header(struct rbd_device *rbd_dev) { int ret; struct rbd_image_header h; @@ -1876,7 +1876,7 @@ static ssize_t rbd_image_refresh(struct device *dev, mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); - rc = __rbd_update_snaps(rbd_dev); + rc = __rbd_refresh_header(rbd_dev); if (rc < 0) ret = rc; @@ -2159,7 +2159,7 @@ static int rbd_init_watch_dev(struct rbd_device *rbd_dev) rbd_dev->header.obj_version); if (ret == -ERANGE) { mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); - rc = __rbd_update_snaps(rbd_dev); + rc = __rbd_refresh_header(rbd_dev); mutex_unlock(&ctl_mutex); if (rc < 0) return rc; @@ -2544,7 +2544,7 @@ static ssize_t rbd_snap_add(struct device *dev, if (ret < 0) goto err_unlock; - ret = __rbd_update_snaps(rbd_dev); + ret = __rbd_refresh_header(rbd_dev); if (ret < 0) goto err_unlock; -- cgit v1.2.3 From b7f6519e6bc7a0c5a9e3eadc8a2c79c0d4556050 Mon Sep 17 00:00:00 2001 From: Josh Durgin Date: Thu, 1 Dec 2011 15:12:03 -0800 Subject: rbd: correct sysfs snap attribute documentation Each attribute is prefixed with "snap_". Signed-off-by: Josh Durgin Reviewed-by: Alex Elder Reviewed-by: Yehuda Sadeh --- Documentation/ABI/testing/sysfs-bus-rbd | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Documentation/ABI/testing/sysfs-bus-rbd b/Documentation/ABI/testing/sysfs-bus-rbd index dbedafb095e2..bcd88eb7ebcd 100644 --- a/Documentation/ABI/testing/sysfs-bus-rbd +++ b/Documentation/ABI/testing/sysfs-bus-rbd @@ -65,11 +65,11 @@ snap_* Entries under /sys/bus/rbd/devices//snap_ ------------------------------------------------------------- -id +snap_id The rados internal snapshot id assigned for this snapshot -size +snap_size The size of the image when this snapshot was taken. -- cgit v1.2.3 From 57dac9d1620942608306d8c17c98a9d1568ffdf4 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Thu, 10 May 2012 10:29:50 -0500 Subject: ceph: messenger: use read_partial() in read_partial_message() There are two blocks of code in read_partial_message()--those that read the header and footer of the message--that can be replaced by a call to read_partial(). Do that. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 30 ++++++++++-------------------- 1 file changed, 10 insertions(+), 20 deletions(-) diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index f0993af2ae4d..673133ee3181 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -1628,7 +1628,7 @@ static int read_partial_message(struct ceph_connection *con) { struct ceph_msg *m = con->in_msg; int ret; - int to, left; + int to; unsigned front_len, middle_len, data_len; bool do_datacrc = !con->msgr->nocrc; int skip; @@ -1638,15 +1638,10 @@ static int read_partial_message(struct ceph_connection *con) dout("read_partial_message con %p msg %p\n", con, m); /* header */ - while (con->in_base_pos < sizeof(con->in_hdr)) { - left = sizeof(con->in_hdr) - con->in_base_pos; - ret = ceph_tcp_recvmsg(con->sock, - (char *)&con->in_hdr + con->in_base_pos, - left); - if (ret <= 0) - return ret; - con->in_base_pos += ret; - } + to = 0; + ret = read_partial(con, &to, sizeof (con->in_hdr), &con->in_hdr); + if (ret <= 0) + return ret; crc = crc32c(0, &con->in_hdr, offsetof(struct ceph_msg_header, crc)); if (cpu_to_le32(crc) != con->in_hdr.crc) { @@ -1759,16 +1754,11 @@ static int read_partial_message(struct ceph_connection *con) } /* footer */ - to = sizeof(m->hdr) + sizeof(m->footer); - while (con->in_base_pos < to) { - left = to - con->in_base_pos; - ret = ceph_tcp_recvmsg(con->sock, (char *)&m->footer + - (con->in_base_pos - sizeof(m->hdr)), - left); - if (ret <= 0) - return ret; - con->in_base_pos += ret; - } + to = sizeof (m->hdr); + ret = read_partial(con, &to, sizeof (m->footer), &m->footer); + if (ret <= 0) + return ret; + dout("read_partial_message got msg %p %d (%u) + %d (%u) + %d (%u)\n", m, front_len, m->footer.front_crc, middle_len, m->footer.middle_crc, data_len, m->footer.data_crc); -- cgit v1.2.3 From e6cee71fac27c946a0bbad754dd076e66c4e9dbd Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Thu, 10 May 2012 10:29:50 -0500 Subject: ceph: messenger: update "to" in read_partial() caller read_partial() always increases whatever "to" value is supplied by adding the requested size to it, and that's the only thing it does with that pointed-to value. Do that pointer advance in the caller (and then only when the updated value will be subsequently used), and change the "to" parameter to be an in-only and non-pointer value. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 31 ++++++++++++++++--------------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 673133ee3181..37fd2aece232 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -992,11 +992,12 @@ static int prepare_read_message(struct ceph_connection *con) static int read_partial(struct ceph_connection *con, - int *to, int size, void *object) + int to, int size, void *object) { - *to += size; - while (con->in_base_pos < *to) { - int left = *to - con->in_base_pos; + int end = to + size; + + while (con->in_base_pos < end) { + int left = end - con->in_base_pos; int have = size - left; int ret = ceph_tcp_recvmsg(con->sock, object + have, left); if (ret <= 0) @@ -1017,14 +1018,16 @@ static int read_partial_banner(struct ceph_connection *con) dout("read_partial_banner %p at %d\n", con, con->in_base_pos); /* peer's banner */ - ret = read_partial(con, &to, strlen(CEPH_BANNER), con->in_banner); + ret = read_partial(con, to, strlen(CEPH_BANNER), con->in_banner); if (ret <= 0) goto out; - ret = read_partial(con, &to, sizeof(con->actual_peer_addr), + to += strlen(CEPH_BANNER); + ret = read_partial(con, to, sizeof(con->actual_peer_addr), &con->actual_peer_addr); if (ret <= 0) goto out; - ret = read_partial(con, &to, sizeof(con->peer_addr_for_me), + to += sizeof(con->actual_peer_addr); + ret = read_partial(con, to, sizeof(con->peer_addr_for_me), &con->peer_addr_for_me); if (ret <= 0) goto out; @@ -1038,10 +1041,11 @@ static int read_partial_connect(struct ceph_connection *con) dout("read_partial_connect %p at %d\n", con, con->in_base_pos); - ret = read_partial(con, &to, sizeof(con->in_reply), &con->in_reply); + ret = read_partial(con, to, sizeof(con->in_reply), &con->in_reply); if (ret <= 0) goto out; - ret = read_partial(con, &to, le32_to_cpu(con->in_reply.authorizer_len), + to += sizeof(con->in_reply); + ret = read_partial(con, to, le32_to_cpu(con->in_reply.authorizer_len), con->auth_reply_buf); if (ret <= 0) goto out; @@ -1491,9 +1495,7 @@ static int process_connect(struct ceph_connection *con) */ static int read_partial_ack(struct ceph_connection *con) { - int to = 0; - - return read_partial(con, &to, sizeof(con->in_temp_ack), + return read_partial(con, 0, sizeof(con->in_temp_ack), &con->in_temp_ack); } @@ -1638,8 +1640,7 @@ static int read_partial_message(struct ceph_connection *con) dout("read_partial_message con %p msg %p\n", con, m); /* header */ - to = 0; - ret = read_partial(con, &to, sizeof (con->in_hdr), &con->in_hdr); + ret = read_partial(con, 0, sizeof (con->in_hdr), &con->in_hdr); if (ret <= 0) return ret; @@ -1755,7 +1756,7 @@ static int read_partial_message(struct ceph_connection *con) /* footer */ to = sizeof (m->hdr); - ret = read_partial(con, &to, sizeof (m->footer), &m->footer); + ret = read_partial(con, to, sizeof (m->footer), &m->footer); if (ret <= 0) return ret; -- cgit v1.2.3 From fd51653f78cf40a0516e521b6de22f329c5bad8d Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Thu, 10 May 2012 10:29:50 -0500 Subject: ceph: messenger: change read_partial() to take "end" arg Make the second argument to read_partial() be the ending input byte position rather than the beginning offset it now represents. This amounts to moving the addition "to + size" into the caller. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 60 +++++++++++++++++++++++++++++++++------------------- 1 file changed, 38 insertions(+), 22 deletions(-) diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 37fd2aece232..a659b4de64aa 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -992,10 +992,8 @@ static int prepare_read_message(struct ceph_connection *con) static int read_partial(struct ceph_connection *con, - int to, int size, void *object) + int end, int size, void *object) { - int end = to + size; - while (con->in_base_pos < end) { int left = end - con->in_base_pos; int have = size - left; @@ -1013,40 +1011,52 @@ static int read_partial(struct ceph_connection *con, */ static int read_partial_banner(struct ceph_connection *con) { - int ret, to = 0; + int size; + int end; + int ret; dout("read_partial_banner %p at %d\n", con, con->in_base_pos); /* peer's banner */ - ret = read_partial(con, to, strlen(CEPH_BANNER), con->in_banner); + size = strlen(CEPH_BANNER); + end = size; + ret = read_partial(con, end, size, con->in_banner); if (ret <= 0) goto out; - to += strlen(CEPH_BANNER); - ret = read_partial(con, to, sizeof(con->actual_peer_addr), - &con->actual_peer_addr); + + size = sizeof (con->actual_peer_addr); + end += size; + ret = read_partial(con, end, size, &con->actual_peer_addr); if (ret <= 0) goto out; - to += sizeof(con->actual_peer_addr); - ret = read_partial(con, to, sizeof(con->peer_addr_for_me), - &con->peer_addr_for_me); + + size = sizeof (con->peer_addr_for_me); + end += size; + ret = read_partial(con, end, size, &con->peer_addr_for_me); if (ret <= 0) goto out; + out: return ret; } static int read_partial_connect(struct ceph_connection *con) { - int ret, to = 0; + int size; + int end; + int ret; dout("read_partial_connect %p at %d\n", con, con->in_base_pos); - ret = read_partial(con, to, sizeof(con->in_reply), &con->in_reply); + size = sizeof (con->in_reply); + end = size; + ret = read_partial(con, end, size, &con->in_reply); if (ret <= 0) goto out; - to += sizeof(con->in_reply); - ret = read_partial(con, to, le32_to_cpu(con->in_reply.authorizer_len), - con->auth_reply_buf); + + size = le32_to_cpu(con->in_reply.authorizer_len); + end += size; + ret = read_partial(con, end, size, con->auth_reply_buf); if (ret <= 0) goto out; @@ -1495,8 +1505,10 @@ static int process_connect(struct ceph_connection *con) */ static int read_partial_ack(struct ceph_connection *con) { - return read_partial(con, 0, sizeof(con->in_temp_ack), - &con->in_temp_ack); + int size = sizeof (con->in_temp_ack); + int end = size; + + return read_partial(con, end, size, &con->in_temp_ack); } @@ -1629,8 +1641,9 @@ static int read_partial_message_bio(struct ceph_connection *con, static int read_partial_message(struct ceph_connection *con) { struct ceph_msg *m = con->in_msg; + int size; + int end; int ret; - int to; unsigned front_len, middle_len, data_len; bool do_datacrc = !con->msgr->nocrc; int skip; @@ -1640,7 +1653,9 @@ static int read_partial_message(struct ceph_connection *con) dout("read_partial_message con %p msg %p\n", con, m); /* header */ - ret = read_partial(con, 0, sizeof (con->in_hdr), &con->in_hdr); + size = sizeof (con->in_hdr); + end = size; + ret = read_partial(con, end, size, &con->in_hdr); if (ret <= 0) return ret; @@ -1755,8 +1770,9 @@ static int read_partial_message(struct ceph_connection *con) } /* footer */ - to = sizeof (m->hdr); - ret = read_partial(con, to, sizeof (m->footer), &m->footer); + size = sizeof (m->footer); + end += size; + ret = read_partial(con, end, size, &m->footer); if (ret <= 0) return ret; -- cgit v1.2.3 From 702aeb1f88e707241d76e1e2a1a02dd81e6c2d77 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 14 May 2012 12:34:38 -0700 Subject: ceph: fully initialize new layout When we are setting a new layout, fully initialize the structure: - zero it out - always set preferred_osd to -1 Signed-off-by: Sage Weil Reviewed-by: Alex Elder --- fs/ceph/ioctl.c | 19 +++++++++++++------ fs/ceph/ioctl.h | 2 ++ 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/fs/ceph/ioctl.c b/fs/ceph/ioctl.c index 0e2f021109ee..c0b7314b90f9 100644 --- a/fs/ceph/ioctl.c +++ b/fs/ceph/ioctl.c @@ -80,22 +80,29 @@ static long ceph_ioctl_set_layout(struct file *file, void __user *arg) /* validate changed params against current layout */ err = ceph_do_getattr(file->f_dentry->d_inode, CEPH_STAT_CAP_LAYOUT); - if (!err) { - nl.stripe_unit = ceph_file_layout_su(ci->i_layout); - nl.stripe_count = ceph_file_layout_stripe_count(ci->i_layout); - nl.object_size = ceph_file_layout_object_size(ci->i_layout); - nl.data_pool = le32_to_cpu(ci->i_layout.fl_pg_pool); - } else + if (err) return err; + memset(&nl, 0, sizeof(nl)); if (l.stripe_count) nl.stripe_count = l.stripe_count; + else + nl.stripe_count = ceph_file_layout_stripe_count(ci->i_layout); if (l.stripe_unit) nl.stripe_unit = l.stripe_unit; + else + nl.stripe_unit = ceph_file_layout_su(ci->i_layout); if (l.object_size) nl.object_size = l.object_size; + else + nl.object_size = ceph_file_layout_object_size(ci->i_layout); if (l.data_pool) nl.data_pool = l.data_pool; + else + nl.data_pool = ceph_file_layout_pg_pool(ci->i_layout); + + /* this is obsolete, and always -1 */ + nl.preferred_osd = le64_to_cpu(-1); err = __validate_layout(mdsc, &nl); if (err) diff --git a/fs/ceph/ioctl.h b/fs/ceph/ioctl.h index be4a60487333..c77028afb1e1 100644 --- a/fs/ceph/ioctl.h +++ b/fs/ceph/ioctl.h @@ -34,6 +34,8 @@ struct ceph_ioctl_layout { __u64 stripe_unit, stripe_count, object_size; __u64 data_pool; + + /* obsolete. new values ignored, always return -1 */ __s64 preferred_osd; }; -- cgit v1.2.3 From c047be09349752b8a4dac27bc9f130bf4d592f11 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 14 May 2012 12:34:50 -0700 Subject: ceph: ignore preferred_osd field Old users may not expect EINVAL, and there is no clear user-visibile behavior change now that we ignore it. Signed-off-by: Sage Weil Reviewed-by: Alex Elder --- fs/ceph/ioctl.c | 4 ---- 1 file changed, 4 deletions(-) diff --git a/fs/ceph/ioctl.c b/fs/ceph/ioctl.c index c0b7314b90f9..8e3fb69fbe62 100644 --- a/fs/ceph/ioctl.c +++ b/fs/ceph/ioctl.c @@ -39,10 +39,6 @@ static long __validate_layout(struct ceph_mds_client *mdsc, { int i, err; - /* preferred_osd is no longer supported */ - if (l->preferred_osd != -1) - return -EINVAL; - /* validate striping parameters */ if ((l->object_size & ~PAGE_MASK) || (l->stripe_unit & ~PAGE_MASK) || -- cgit v1.2.3 From d329156f16306449c273002486c28de3ddddfd89 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Wed, 16 May 2012 15:16:38 -0500 Subject: libceph: don't reset kvec in prepare_write_banner() Move the kvec reset for a connection out of prepare_write_banner and into its only caller. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index a659b4de64aa..bcbd409b5ca7 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -686,7 +686,6 @@ static int prepare_connect_authorizer(struct ceph_connection *con) static void prepare_write_banner(struct ceph_messenger *msgr, struct ceph_connection *con) { - ceph_con_out_kvec_reset(con); ceph_con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER); ceph_con_out_kvec_add(con, sizeof (msgr->my_enc_addr), &msgr->my_enc_addr); @@ -726,10 +725,9 @@ static int prepare_write_connect(struct ceph_messenger *msgr, con->out_connect.protocol_version = cpu_to_le32(proto); con->out_connect.flags = 0; + ceph_con_out_kvec_reset(con); if (include_banner) prepare_write_banner(msgr, con); - else - ceph_con_out_kvec_reset(con); ceph_con_out_kvec_add(con, sizeof (con->out_connect), &con->out_connect); con->out_more = 0; -- cgit v1.2.3 From 84fb3adf6413862cff51d8af3fce5f0b655586a2 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Wed, 16 May 2012 15:16:38 -0500 Subject: ceph: messenger: reset connection kvec caller Reset a connection's kvec fields in the caller rather than in prepare_write_connect(). This ends up repeating a few lines of code but it's improving the separation between distinct operations on the connection, which we can take advantage of later. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index bcbd409b5ca7..cca3cf341d70 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -725,7 +725,6 @@ static int prepare_write_connect(struct ceph_messenger *msgr, con->out_connect.protocol_version = cpu_to_le32(proto); con->out_connect.flags = 0; - ceph_con_out_kvec_reset(con); if (include_banner) prepare_write_banner(msgr, con); ceph_con_out_kvec_add(con, sizeof (con->out_connect), &con->out_connect); @@ -1389,6 +1388,7 @@ static int process_connect(struct ceph_connection *con) return -1; } con->auth_retry = 1; + ceph_con_out_kvec_reset(con); ret = prepare_write_connect(con->msgr, con, 0); if (ret < 0) return ret; @@ -1409,6 +1409,7 @@ static int process_connect(struct ceph_connection *con) ENTITY_NAME(con->peer_name), ceph_pr_addr(&con->peer_addr.in_addr)); reset_connection(con); + ceph_con_out_kvec_reset(con); prepare_write_connect(con->msgr, con, 0); prepare_read_connect(con); @@ -1432,6 +1433,7 @@ static int process_connect(struct ceph_connection *con) le32_to_cpu(con->out_connect.connect_seq), le32_to_cpu(con->in_connect.connect_seq)); con->connect_seq = le32_to_cpu(con->in_connect.connect_seq); + ceph_con_out_kvec_reset(con); prepare_write_connect(con->msgr, con, 0); prepare_read_connect(con); break; @@ -1446,6 +1448,7 @@ static int process_connect(struct ceph_connection *con) le32_to_cpu(con->in_connect.global_seq)); get_global_seq(con->msgr, le32_to_cpu(con->in_connect.global_seq)); + ceph_con_out_kvec_reset(con); prepare_write_connect(con->msgr, con, 0); prepare_read_connect(con); break; @@ -1851,6 +1854,7 @@ more: /* open the socket first? */ if (con->sock == NULL) { + ceph_con_out_kvec_reset(con); prepare_write_connect(msgr, con, 1); prepare_read_banner(con); set_bit(CONNECTING, &con->state); -- cgit v1.2.3 From 41b90c00858129f52d08e6a05c9cfdb0f2bd074d Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Wed, 16 May 2012 15:16:38 -0500 Subject: ceph: messenger: send banner in process_connect() prepare_write_connect() has an argument indicating whether a banner should be sent out before sending out a connection message. It's only ever set in one of its callers, so move the code that arranges to send the banner into that caller and drop the "include_banner" argument from prepare_write_connect(). Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index cca3cf341d70..6b38b6fbb25f 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -695,8 +695,7 @@ static void prepare_write_banner(struct ceph_messenger *msgr, } static int prepare_write_connect(struct ceph_messenger *msgr, - struct ceph_connection *con, - int include_banner) + struct ceph_connection *con) { unsigned global_seq = get_global_seq(con->msgr, 0); int proto; @@ -725,8 +724,6 @@ static int prepare_write_connect(struct ceph_messenger *msgr, con->out_connect.protocol_version = cpu_to_le32(proto); con->out_connect.flags = 0; - if (include_banner) - prepare_write_banner(msgr, con); ceph_con_out_kvec_add(con, sizeof (con->out_connect), &con->out_connect); con->out_more = 0; @@ -1389,7 +1386,7 @@ static int process_connect(struct ceph_connection *con) } con->auth_retry = 1; ceph_con_out_kvec_reset(con); - ret = prepare_write_connect(con->msgr, con, 0); + ret = prepare_write_connect(con->msgr, con); if (ret < 0) return ret; prepare_read_connect(con); @@ -1410,7 +1407,7 @@ static int process_connect(struct ceph_connection *con) ceph_pr_addr(&con->peer_addr.in_addr)); reset_connection(con); ceph_con_out_kvec_reset(con); - prepare_write_connect(con->msgr, con, 0); + prepare_write_connect(con->msgr, con); prepare_read_connect(con); /* Tell ceph about it. */ @@ -1434,7 +1431,7 @@ static int process_connect(struct ceph_connection *con) le32_to_cpu(con->in_connect.connect_seq)); con->connect_seq = le32_to_cpu(con->in_connect.connect_seq); ceph_con_out_kvec_reset(con); - prepare_write_connect(con->msgr, con, 0); + prepare_write_connect(con->msgr, con); prepare_read_connect(con); break; @@ -1449,7 +1446,7 @@ static int process_connect(struct ceph_connection *con) get_global_seq(con->msgr, le32_to_cpu(con->in_connect.global_seq)); ceph_con_out_kvec_reset(con); - prepare_write_connect(con->msgr, con, 0); + prepare_write_connect(con->msgr, con); prepare_read_connect(con); break; @@ -1855,7 +1852,8 @@ more: /* open the socket first? */ if (con->sock == NULL) { ceph_con_out_kvec_reset(con); - prepare_write_connect(msgr, con, 1); + prepare_write_banner(msgr, con); + prepare_write_connect(msgr, con); prepare_read_banner(con); set_bit(CONNECTING, &con->state); clear_bit(NEGOTIATING, &con->state); -- cgit v1.2.3 From e825a66df97776d30a48a187e3a986736af43945 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Wed, 16 May 2012 15:16:38 -0500 Subject: ceph: drop msgr argument from prepare_write_connect() In all cases, the value passed as the msgr argument to prepare_write_connect() is just con->msgr. Just get the msgr value from the ceph connection and drop the unneeded argument. The only msgr passed to prepare_write_banner() is also therefore just the one from con->msgr, so change that function to drop the msgr argument as well. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 25 +++++++++++-------------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 6b38b6fbb25f..47499dc0e413 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -683,19 +683,17 @@ static int prepare_connect_authorizer(struct ceph_connection *con) /* * We connected to a peer and are saying hello. */ -static void prepare_write_banner(struct ceph_messenger *msgr, - struct ceph_connection *con) +static void prepare_write_banner(struct ceph_connection *con) { ceph_con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER); - ceph_con_out_kvec_add(con, sizeof (msgr->my_enc_addr), - &msgr->my_enc_addr); + ceph_con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr), + &con->msgr->my_enc_addr); con->out_more = 0; set_bit(WRITE_PENDING, &con->state); } -static int prepare_write_connect(struct ceph_messenger *msgr, - struct ceph_connection *con) +static int prepare_write_connect(struct ceph_connection *con) { unsigned global_seq = get_global_seq(con->msgr, 0); int proto; @@ -717,7 +715,7 @@ static int prepare_write_connect(struct ceph_messenger *msgr, dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con, con->connect_seq, global_seq, proto); - con->out_connect.features = cpu_to_le64(msgr->supported_features); + con->out_connect.features = cpu_to_le64(con->msgr->supported_features); con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT); con->out_connect.connect_seq = cpu_to_le32(con->connect_seq); con->out_connect.global_seq = cpu_to_le32(global_seq); @@ -1386,7 +1384,7 @@ static int process_connect(struct ceph_connection *con) } con->auth_retry = 1; ceph_con_out_kvec_reset(con); - ret = prepare_write_connect(con->msgr, con); + ret = prepare_write_connect(con); if (ret < 0) return ret; prepare_read_connect(con); @@ -1407,7 +1405,7 @@ static int process_connect(struct ceph_connection *con) ceph_pr_addr(&con->peer_addr.in_addr)); reset_connection(con); ceph_con_out_kvec_reset(con); - prepare_write_connect(con->msgr, con); + prepare_write_connect(con); prepare_read_connect(con); /* Tell ceph about it. */ @@ -1431,7 +1429,7 @@ static int process_connect(struct ceph_connection *con) le32_to_cpu(con->in_connect.connect_seq)); con->connect_seq = le32_to_cpu(con->in_connect.connect_seq); ceph_con_out_kvec_reset(con); - prepare_write_connect(con->msgr, con); + prepare_write_connect(con); prepare_read_connect(con); break; @@ -1446,7 +1444,7 @@ static int process_connect(struct ceph_connection *con) get_global_seq(con->msgr, le32_to_cpu(con->in_connect.global_seq)); ceph_con_out_kvec_reset(con); - prepare_write_connect(con->msgr, con); + prepare_write_connect(con); prepare_read_connect(con); break; @@ -1840,7 +1838,6 @@ static void process_message(struct ceph_connection *con) */ static int try_write(struct ceph_connection *con) { - struct ceph_messenger *msgr = con->msgr; int ret = 1; dout("try_write start %p state %lu nref %d\n", con, con->state, @@ -1852,8 +1849,8 @@ more: /* open the socket first? */ if (con->sock == NULL) { ceph_con_out_kvec_reset(con); - prepare_write_banner(msgr, con); - prepare_write_connect(msgr, con); + prepare_write_banner(con); + prepare_write_connect(con); prepare_read_banner(con); set_bit(CONNECTING, &con->state); clear_bit(NEGOTIATING, &con->state); -- cgit v1.2.3 From e10c758e4031a801ea4d2f8fb39bf14c2658d74b Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Wed, 16 May 2012 15:16:38 -0500 Subject: ceph: don't set WRITE_PENDING too early prepare_write_connect() prepares a connect message, then sets WRITE_PENDING on the connection. Then *after* this, it calls prepare_connect_authorizer(), which updates the content of the connection buffer already queued for sending. It's also possible it will result in prepare_write_connect() returning -EAGAIN despite the WRITE_PENDING big getting set. Fix this by preparing the connect authorizer first, setting the WRITE_PENDING bit only after that is done. Partially addresses http://tracker.newdream.net/issues/2424 Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 47499dc0e413..cf292939dd1e 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -697,6 +697,7 @@ static int prepare_write_connect(struct ceph_connection *con) { unsigned global_seq = get_global_seq(con->msgr, 0); int proto; + int ret; switch (con->peer_name.type) { case CEPH_ENTITY_TYPE_MON: @@ -723,11 +724,14 @@ static int prepare_write_connect(struct ceph_connection *con) con->out_connect.flags = 0; ceph_con_out_kvec_add(con, sizeof (con->out_connect), &con->out_connect); + ret = prepare_connect_authorizer(con); + if (ret) + return ret; con->out_more = 0; set_bit(WRITE_PENDING, &con->state); - return prepare_connect_authorizer(con); + return 0; } /* -- cgit v1.2.3 From 5a0f8fdd8a0ebe320952a388331dc043d7e14ced Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Wed, 16 May 2012 21:51:59 -0500 Subject: ceph: messenger: check prepare_write_connect() result prepare_write_connect() can return an error, but only one of its callers checks for it. All the rest are in functions that already return errors, so it should be fine to return the error if one gets returned. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index cf292939dd1e..8e76936a8c13 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -1409,7 +1409,9 @@ static int process_connect(struct ceph_connection *con) ceph_pr_addr(&con->peer_addr.in_addr)); reset_connection(con); ceph_con_out_kvec_reset(con); - prepare_write_connect(con); + ret = prepare_write_connect(con); + if (ret < 0) + return ret; prepare_read_connect(con); /* Tell ceph about it. */ @@ -1433,7 +1435,9 @@ static int process_connect(struct ceph_connection *con) le32_to_cpu(con->in_connect.connect_seq)); con->connect_seq = le32_to_cpu(con->in_connect.connect_seq); ceph_con_out_kvec_reset(con); - prepare_write_connect(con); + ret = prepare_write_connect(con); + if (ret < 0) + return ret; prepare_read_connect(con); break; @@ -1448,7 +1452,9 @@ static int process_connect(struct ceph_connection *con) get_global_seq(con->msgr, le32_to_cpu(con->in_connect.global_seq)); ceph_con_out_kvec_reset(con); - prepare_write_connect(con); + ret = prepare_write_connect(con); + if (ret < 0) + return ret; prepare_read_connect(con); break; @@ -1854,7 +1860,9 @@ more: if (con->sock == NULL) { ceph_con_out_kvec_reset(con); prepare_write_banner(con); - prepare_write_connect(con); + ret = prepare_write_connect(con); + if (ret < 0) + goto out; prepare_read_banner(con); set_bit(CONNECTING, &con->state); clear_bit(NEGOTIATING, &con->state); -- cgit v1.2.3 From b1c6b9803f5491e94041e6da96bc9dec3870e792 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Wed, 16 May 2012 15:16:38 -0500 Subject: ceph: messenger: rework prepare_connect_authorizer() Change prepare_connect_authorizer() so it returns without dropping the connection mutex if the connection has no get_authorizer method. Use the symbolic CEPH_AUTH_UNKNOWN instead of 0 when assigning authorization protocols. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 28 +++++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 8e76936a8c13..09409a3d9500 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -656,19 +656,29 @@ static void prepare_write_keepalive(struct ceph_connection *con) static int prepare_connect_authorizer(struct ceph_connection *con) { void *auth_buf; - int auth_len = 0; - int auth_protocol = 0; + int auth_len; + int auth_protocol; + + if (!con->ops->get_authorizer) { + con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN; + con->out_connect.authorizer_len = 0; + + return 0; + } + + /* Can't hold the mutex while getting authorizer */ mutex_unlock(&con->mutex); - if (con->ops->get_authorizer) - con->ops->get_authorizer(con, &auth_buf, &auth_len, - &auth_protocol, &con->auth_reply_buf, - &con->auth_reply_buf_len, - con->auth_retry); + + auth_buf = NULL; + auth_len = 0; + auth_protocol = CEPH_AUTH_UNKNOWN; + con->ops->get_authorizer(con, &auth_buf, &auth_len, &auth_protocol, + &con->auth_reply_buf, &con->auth_reply_buf_len, + con->auth_retry); mutex_lock(&con->mutex); - if (test_bit(CLOSED, &con->state) || - test_bit(OPENING, &con->state)) + if (test_bit(CLOSED, &con->state) || test_bit(OPENING, &con->state)) return -EAGAIN; con->out_connect.authorizer_protocol = cpu_to_le32(auth_protocol); -- cgit v1.2.3 From ed96af646011412c2bf1ffe860db170db355fae5 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Wed, 16 May 2012 15:16:38 -0500 Subject: ceph: messenger: check return from get_authorizer In prepare_connect_authorizer(), a connection's get_authorizer method is called but ignores its return value. This function can return an error, so check for it and return it if that ever occurs. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 09409a3d9500..e0532d5b22f5 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -658,6 +658,7 @@ static int prepare_connect_authorizer(struct ceph_connection *con) void *auth_buf; int auth_len; int auth_protocol; + int ret; if (!con->ops->get_authorizer) { con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN; @@ -673,11 +674,14 @@ static int prepare_connect_authorizer(struct ceph_connection *con) auth_buf = NULL; auth_len = 0; auth_protocol = CEPH_AUTH_UNKNOWN; - con->ops->get_authorizer(con, &auth_buf, &auth_len, &auth_protocol, - &con->auth_reply_buf, &con->auth_reply_buf_len, - con->auth_retry); + ret = con->ops->get_authorizer(con, &auth_buf, &auth_len, + &auth_protocol, &con->auth_reply_buf, + &con->auth_reply_buf_len, con->auth_retry); mutex_lock(&con->mutex); + if (ret) + return ret; + if (test_bit(CLOSED, &con->state) || test_bit(OPENING, &con->state)) return -EAGAIN; -- cgit v1.2.3 From 6c4a19158b96ea1fb8acbe0c1d5493d9dcd2f147 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Wed, 16 May 2012 15:16:38 -0500 Subject: ceph: define ceph_auth_handshake type The definitions for the ceph_mds_session and ceph_osd both contain five fields related only to "authorizers." Encapsulate those fields into their own struct type, allowing for better isolation in some upcoming patches. Fix the #includes in "linux/ceph/osd_client.h" to lay out their more complete canonical path. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- fs/ceph/mds_client.c | 32 ++++++++++++++++---------------- fs/ceph/mds_client.h | 5 ++--- include/linux/ceph/auth.h | 8 ++++++++ include/linux/ceph/osd_client.h | 11 +++++------ net/ceph/osd_client.c | 32 ++++++++++++++++---------------- 5 files changed, 47 insertions(+), 41 deletions(-) diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 89971e137aab..42013c620488 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -334,10 +334,10 @@ void ceph_put_mds_session(struct ceph_mds_session *s) dout("mdsc put_session %p %d -> %d\n", s, atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1); if (atomic_dec_and_test(&s->s_ref)) { - if (s->s_authorizer) + if (s->s_auth.authorizer) s->s_mdsc->fsc->client->monc.auth->ops->destroy_authorizer( s->s_mdsc->fsc->client->monc.auth, - s->s_authorizer); + s->s_auth.authorizer); kfree(s); } } @@ -3404,29 +3404,29 @@ static int get_authorizer(struct ceph_connection *con, struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; int ret = 0; - if (force_new && s->s_authorizer) { - ac->ops->destroy_authorizer(ac, s->s_authorizer); - s->s_authorizer = NULL; + if (force_new && s->s_auth.authorizer) { + ac->ops->destroy_authorizer(ac, s->s_auth.authorizer); + s->s_auth.authorizer = NULL; } - if (s->s_authorizer == NULL) { + if (s->s_auth.authorizer == NULL) { if (ac->ops->create_authorizer) { ret = ac->ops->create_authorizer( ac, CEPH_ENTITY_TYPE_MDS, - &s->s_authorizer, - &s->s_authorizer_buf, - &s->s_authorizer_buf_len, - &s->s_authorizer_reply_buf, - &s->s_authorizer_reply_buf_len); + &s->s_auth.authorizer, + &s->s_auth.authorizer_buf, + &s->s_auth.authorizer_buf_len, + &s->s_auth.authorizer_reply_buf, + &s->s_auth.authorizer_reply_buf_len); if (ret) return ret; } } *proto = ac->protocol; - *buf = s->s_authorizer_buf; - *len = s->s_authorizer_buf_len; - *reply_buf = s->s_authorizer_reply_buf; - *reply_len = s->s_authorizer_reply_buf_len; + *buf = s->s_auth.authorizer_buf; + *len = s->s_auth.authorizer_buf_len; + *reply_buf = s->s_auth.authorizer_reply_buf; + *reply_len = s->s_auth.authorizer_reply_buf_len; return 0; } @@ -3437,7 +3437,7 @@ static int verify_authorizer_reply(struct ceph_connection *con, int len) struct ceph_mds_client *mdsc = s->s_mdsc; struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; - return ac->ops->verify_authorizer_reply(ac, s->s_authorizer, len); + return ac->ops->verify_authorizer_reply(ac, s->s_auth.authorizer, len); } static int invalidate_authorizer(struct ceph_connection *con) diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index 8c7c04ebb595..dd26846dd71d 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -11,6 +11,7 @@ #include #include #include +#include /* * Some lock dependencies: @@ -113,9 +114,7 @@ struct ceph_mds_session { struct ceph_connection s_con; - struct ceph_authorizer *s_authorizer; - void *s_authorizer_buf, *s_authorizer_reply_buf; - size_t s_authorizer_buf_len, s_authorizer_reply_buf_len; + struct ceph_auth_handshake s_auth; /* protected by s_gen_ttl_lock */ spinlock_t s_gen_ttl_lock; diff --git a/include/linux/ceph/auth.h b/include/linux/ceph/auth.h index aa13392a7efb..5b774d141e09 100644 --- a/include/linux/ceph/auth.h +++ b/include/linux/ceph/auth.h @@ -14,6 +14,14 @@ struct ceph_auth_client; struct ceph_authorizer; +struct ceph_auth_handshake { + struct ceph_authorizer *authorizer; + void *authorizer_buf; + size_t authorizer_buf_len; + void *authorizer_reply_buf; + size_t authorizer_reply_buf_len; +}; + struct ceph_auth_client_ops { const char *name; diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index 7c05ac202d90..cedfb1a8434a 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -6,9 +6,10 @@ #include #include -#include "types.h" -#include "osdmap.h" -#include "messenger.h" +#include +#include +#include +#include /* * Maximum object name size @@ -40,9 +41,7 @@ struct ceph_osd { struct list_head o_requests; struct list_head o_linger_requests; struct list_head o_osd_lru; - struct ceph_authorizer *o_authorizer; - void *o_authorizer_buf, *o_authorizer_reply_buf; - size_t o_authorizer_buf_len, o_authorizer_reply_buf_len; + struct ceph_auth_handshake o_auth; unsigned long lru_ttl; int o_marked_for_keepalive; struct list_head o_keepalive_item; diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index daa2716a0c30..66b09d6a1531 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -667,8 +667,8 @@ static void put_osd(struct ceph_osd *osd) if (atomic_dec_and_test(&osd->o_ref)) { struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth; - if (osd->o_authorizer) - ac->ops->destroy_authorizer(ac, osd->o_authorizer); + if (osd->o_auth.authorizer) + ac->ops->destroy_authorizer(ac, osd->o_auth.authorizer); kfree(osd); } } @@ -2117,27 +2117,27 @@ static int get_authorizer(struct ceph_connection *con, struct ceph_auth_client *ac = osdc->client->monc.auth; int ret = 0; - if (force_new && o->o_authorizer) { - ac->ops->destroy_authorizer(ac, o->o_authorizer); - o->o_authorizer = NULL; + if (force_new && o->o_auth.authorizer) { + ac->ops->destroy_authorizer(ac, o->o_auth.authorizer); + o->o_auth.authorizer = NULL; } - if (o->o_authorizer == NULL) { + if (o->o_auth.authorizer == NULL) { ret = ac->ops->create_authorizer( ac, CEPH_ENTITY_TYPE_OSD, - &o->o_authorizer, - &o->o_authorizer_buf, - &o->o_authorizer_buf_len, - &o->o_authorizer_reply_buf, - &o->o_authorizer_reply_buf_len); + &o->o_auth.authorizer, + &o->o_auth.authorizer_buf, + &o->o_auth.authorizer_buf_len, + &o->o_auth.authorizer_reply_buf, + &o->o_auth.authorizer_reply_buf_len); if (ret) return ret; } *proto = ac->protocol; - *buf = o->o_authorizer_buf; - *len = o->o_authorizer_buf_len; - *reply_buf = o->o_authorizer_reply_buf; - *reply_len = o->o_authorizer_reply_buf_len; + *buf = o->o_auth.authorizer_buf; + *len = o->o_auth.authorizer_buf_len; + *reply_buf = o->o_auth.authorizer_reply_buf; + *reply_len = o->o_auth.authorizer_reply_buf_len; return 0; } @@ -2148,7 +2148,7 @@ static int verify_authorizer_reply(struct ceph_connection *con, int len) struct ceph_osd_client *osdc = o->o_osdc; struct ceph_auth_client *ac = osdc->client->monc.auth; - return ac->ops->verify_authorizer_reply(ac, o->o_authorizer, len); + return ac->ops->verify_authorizer_reply(ac, o->o_auth.authorizer, len); } static int invalidate_authorizer(struct ceph_connection *con) -- cgit v1.2.3 From 74f1869f76d043bad12ec03b4d5f04a8c3d1f157 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Wed, 16 May 2012 15:16:39 -0500 Subject: ceph: messenger: reduce args to create_authorizer Make use of the new ceph_auth_handshake structure in order to reduce the number of arguments passed to the create_authorizor method in ceph_auth_client_ops. Use a local variable of that type as a shorthand in the get_authorizer method definitions. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- fs/ceph/mds_client.c | 27 ++++++++++++--------------- include/linux/ceph/auth.h | 4 +--- net/ceph/auth_none.c | 15 +++++++-------- net/ceph/auth_x.c | 15 +++++++-------- net/ceph/osd_client.c | 28 ++++++++++++---------------- 5 files changed, 39 insertions(+), 50 deletions(-) diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 42013c620488..b71ffd2c8094 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -3402,31 +3402,28 @@ static int get_authorizer(struct ceph_connection *con, struct ceph_mds_session *s = con->private; struct ceph_mds_client *mdsc = s->s_mdsc; struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; + struct ceph_auth_handshake *auth = &s->s_auth; int ret = 0; - if (force_new && s->s_auth.authorizer) { - ac->ops->destroy_authorizer(ac, s->s_auth.authorizer); - s->s_auth.authorizer = NULL; + if (force_new && auth->authorizer) { + ac->ops->destroy_authorizer(ac, auth->authorizer); + auth->authorizer = NULL; } - if (s->s_auth.authorizer == NULL) { + if (auth->authorizer == NULL) { if (ac->ops->create_authorizer) { - ret = ac->ops->create_authorizer( - ac, CEPH_ENTITY_TYPE_MDS, - &s->s_auth.authorizer, - &s->s_auth.authorizer_buf, - &s->s_auth.authorizer_buf_len, - &s->s_auth.authorizer_reply_buf, - &s->s_auth.authorizer_reply_buf_len); + ret = ac->ops->create_authorizer(ac, + CEPH_ENTITY_TYPE_MDS, auth); if (ret) return ret; } } *proto = ac->protocol; - *buf = s->s_auth.authorizer_buf; - *len = s->s_auth.authorizer_buf_len; - *reply_buf = s->s_auth.authorizer_reply_buf; - *reply_len = s->s_auth.authorizer_reply_buf_len; + *buf = auth->authorizer_buf; + *len = auth->authorizer_buf_len; + *reply_buf = auth->authorizer_reply_buf; + *reply_len = auth->authorizer_reply_buf_len; + return 0; } diff --git a/include/linux/ceph/auth.h b/include/linux/ceph/auth.h index 5b774d141e09..d4080f309b56 100644 --- a/include/linux/ceph/auth.h +++ b/include/linux/ceph/auth.h @@ -51,9 +51,7 @@ struct ceph_auth_client_ops { * the response to authenticate the service. */ int (*create_authorizer)(struct ceph_auth_client *ac, int peer_type, - struct ceph_authorizer **a, - void **buf, size_t *len, - void **reply_buf, size_t *reply_len); + struct ceph_auth_handshake *auth); int (*verify_authorizer_reply)(struct ceph_auth_client *ac, struct ceph_authorizer *a, size_t len); void (*destroy_authorizer)(struct ceph_auth_client *ac, diff --git a/net/ceph/auth_none.c b/net/ceph/auth_none.c index 214c2bb43d62..925ca583c09c 100644 --- a/net/ceph/auth_none.c +++ b/net/ceph/auth_none.c @@ -59,9 +59,7 @@ static int handle_reply(struct ceph_auth_client *ac, int result, */ static int ceph_auth_none_create_authorizer( struct ceph_auth_client *ac, int peer_type, - struct ceph_authorizer **a, - void **buf, size_t *len, - void **reply_buf, size_t *reply_len) + struct ceph_auth_handshake *auth) { struct ceph_auth_none_info *ai = ac->private; struct ceph_none_authorizer *au = &ai->au; @@ -82,11 +80,12 @@ static int ceph_auth_none_create_authorizer( dout("built authorizer len %d\n", au->buf_len); } - *a = (struct ceph_authorizer *)au; - *buf = au->buf; - *len = au->buf_len; - *reply_buf = au->reply_buf; - *reply_len = sizeof(au->reply_buf); + auth->authorizer = (struct ceph_authorizer *) au; + auth->authorizer_buf = au->buf; + auth->authorizer_buf_len = au->buf_len; + auth->authorizer_reply_buf = au->reply_buf; + auth->authorizer_reply_buf_len = sizeof (au->reply_buf); + return 0; bad2: diff --git a/net/ceph/auth_x.c b/net/ceph/auth_x.c index 1587dc6010c6..a16bf14eb027 100644 --- a/net/ceph/auth_x.c +++ b/net/ceph/auth_x.c @@ -526,9 +526,7 @@ static int ceph_x_handle_reply(struct ceph_auth_client *ac, int result, static int ceph_x_create_authorizer( struct ceph_auth_client *ac, int peer_type, - struct ceph_authorizer **a, - void **buf, size_t *len, - void **reply_buf, size_t *reply_len) + struct ceph_auth_handshake *auth) { struct ceph_x_authorizer *au; struct ceph_x_ticket_handler *th; @@ -548,11 +546,12 @@ static int ceph_x_create_authorizer( return ret; } - *a = (struct ceph_authorizer *)au; - *buf = au->buf->vec.iov_base; - *len = au->buf->vec.iov_len; - *reply_buf = au->reply_buf; - *reply_len = sizeof(au->reply_buf); + auth->authorizer = (struct ceph_authorizer *) au; + auth->authorizer_buf = au->buf->vec.iov_base; + auth->authorizer_buf_len = au->buf->vec.iov_len; + auth->authorizer_reply_buf = au->reply_buf; + auth->authorizer_reply_buf_len = sizeof (au->reply_buf); + return 0; } diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 66b09d6a1531..2da4b9e97dc1 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -2115,29 +2115,25 @@ static int get_authorizer(struct ceph_connection *con, struct ceph_osd *o = con->private; struct ceph_osd_client *osdc = o->o_osdc; struct ceph_auth_client *ac = osdc->client->monc.auth; + struct ceph_auth_handshake *auth = &o->o_auth; int ret = 0; - if (force_new && o->o_auth.authorizer) { - ac->ops->destroy_authorizer(ac, o->o_auth.authorizer); - o->o_auth.authorizer = NULL; - } - if (o->o_auth.authorizer == NULL) { - ret = ac->ops->create_authorizer( - ac, CEPH_ENTITY_TYPE_OSD, - &o->o_auth.authorizer, - &o->o_auth.authorizer_buf, - &o->o_auth.authorizer_buf_len, - &o->o_auth.authorizer_reply_buf, - &o->o_auth.authorizer_reply_buf_len); + if (force_new && auth->authorizer) { + ac->ops->destroy_authorizer(ac, auth->authorizer); + auth->authorizer = NULL; + } + if (auth->authorizer == NULL) { + ret = ac->ops->create_authorizer(ac, CEPH_ENTITY_TYPE_OSD, auth); if (ret) return ret; } *proto = ac->protocol; - *buf = o->o_auth.authorizer_buf; - *len = o->o_auth.authorizer_buf_len; - *reply_buf = o->o_auth.authorizer_reply_buf; - *reply_len = o->o_auth.authorizer_reply_buf_len; + *buf = auth->authorizer_buf; + *len = auth->authorizer_buf_len; + *reply_buf = auth->authorizer_reply_buf; + *reply_len = auth->authorizer_reply_buf_len; + return 0; } -- cgit v1.2.3 From a255651d4cad89f1a606edd36135af892ada4f20 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Wed, 16 May 2012 15:16:39 -0500 Subject: ceph: ensure auth ops are defined before use In the create_authorizer method for both the mds and osd clients, the auth_client->ops pointer is blindly dereferenced. There is no obvious guarantee that this pointer has been assigned. And furthermore, even if the ops pointer is non-null there is definitely no guarantee that the create_authorizer or destroy_authorizer methods are defined. Add checks in both routines to make sure they are defined (non-null) before use. Add similar checks in a few other spots in these files while we're at it. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- fs/ceph/mds_client.c | 14 ++++++-------- net/ceph/osd_client.c | 15 ++++++++++----- 2 files changed, 16 insertions(+), 13 deletions(-) diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index b71ffd2c8094..462281742aef 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -3406,16 +3406,14 @@ static int get_authorizer(struct ceph_connection *con, int ret = 0; if (force_new && auth->authorizer) { - ac->ops->destroy_authorizer(ac, auth->authorizer); + if (ac->ops && ac->ops->destroy_authorizer) + ac->ops->destroy_authorizer(ac, auth->authorizer); auth->authorizer = NULL; } - if (auth->authorizer == NULL) { - if (ac->ops->create_authorizer) { - ret = ac->ops->create_authorizer(ac, - CEPH_ENTITY_TYPE_MDS, auth); - if (ret) - return ret; - } + if (!auth->authorizer && ac->ops && ac->ops->create_authorizer) { + ret = ac->ops->create_authorizer(ac, CEPH_ENTITY_TYPE_MDS, auth); + if (ret) + return ret; } *proto = ac->protocol; diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 2da4b9e97dc1..f640bdf027e7 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -664,10 +664,10 @@ static void put_osd(struct ceph_osd *osd) { dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref), atomic_read(&osd->o_ref) - 1); - if (atomic_dec_and_test(&osd->o_ref)) { + if (atomic_dec_and_test(&osd->o_ref) && osd->o_auth.authorizer) { struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth; - if (osd->o_auth.authorizer) + if (ac->ops && ac->ops->destroy_authorizer) ac->ops->destroy_authorizer(ac, osd->o_auth.authorizer); kfree(osd); } @@ -2119,10 +2119,11 @@ static int get_authorizer(struct ceph_connection *con, int ret = 0; if (force_new && auth->authorizer) { - ac->ops->destroy_authorizer(ac, auth->authorizer); + if (ac->ops && ac->ops->destroy_authorizer) + ac->ops->destroy_authorizer(ac, auth->authorizer); auth->authorizer = NULL; } - if (auth->authorizer == NULL) { + if (!auth->authorizer && ac->ops && ac->ops->create_authorizer) { ret = ac->ops->create_authorizer(ac, CEPH_ENTITY_TYPE_OSD, auth); if (ret) return ret; @@ -2144,6 +2145,10 @@ static int verify_authorizer_reply(struct ceph_connection *con, int len) struct ceph_osd_client *osdc = o->o_osdc; struct ceph_auth_client *ac = osdc->client->monc.auth; + /* + * XXX If ac->ops or ac->ops->verify_authorizer_reply is null, + * XXX which do we do: succeed or fail? + */ return ac->ops->verify_authorizer_reply(ac, o->o_auth.authorizer, len); } @@ -2153,7 +2158,7 @@ static int invalidate_authorizer(struct ceph_connection *con) struct ceph_osd_client *osdc = o->o_osdc; struct ceph_auth_client *ac = osdc->client->monc.auth; - if (ac->ops->invalidate_authorizer) + if (ac->ops && ac->ops->invalidate_authorizer) ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD); return ceph_monc_validate_auth(&osdc->client->monc); -- cgit v1.2.3 From a3530df33eb91d787d08c7383a0a9982690e42d0 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Wed, 16 May 2012 15:16:39 -0500 Subject: ceph: have get_authorizer methods return pointers Have the get_authorizer auth_client method return a ceph_auth pointer rather than an integer, pointer-encoding any returned error value. This is to pave the way for making use of the returned value in an upcoming patch. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- fs/ceph/mds_client.c | 20 +++++++++++++------- include/linux/ceph/messenger.h | 8 +++++--- net/ceph/messenger.c | 8 ++++---- net/ceph/osd_client.c | 19 ++++++++++++------- 4 files changed, 34 insertions(+), 21 deletions(-) diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 462281742aef..67938a9d049b 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -3395,15 +3395,20 @@ out: /* * authentication */ -static int get_authorizer(struct ceph_connection *con, - void **buf, int *len, int *proto, - void **reply_buf, int *reply_len, int force_new) + +/* + * Note: returned pointer is the address of a structure that's + * managed separately. Caller must *not* attempt to free it. + */ +static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con, + void **buf, int *len, int *proto, + void **reply_buf, int *reply_len, + int force_new) { struct ceph_mds_session *s = con->private; struct ceph_mds_client *mdsc = s->s_mdsc; struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; struct ceph_auth_handshake *auth = &s->s_auth; - int ret = 0; if (force_new && auth->authorizer) { if (ac->ops && ac->ops->destroy_authorizer) @@ -3411,9 +3416,10 @@ static int get_authorizer(struct ceph_connection *con, auth->authorizer = NULL; } if (!auth->authorizer && ac->ops && ac->ops->create_authorizer) { - ret = ac->ops->create_authorizer(ac, CEPH_ENTITY_TYPE_MDS, auth); + int ret = ac->ops->create_authorizer(ac, CEPH_ENTITY_TYPE_MDS, + auth); if (ret) - return ret; + return ERR_PTR(ret); } *proto = ac->protocol; @@ -3422,7 +3428,7 @@ static int get_authorizer(struct ceph_connection *con, *reply_buf = auth->authorizer_reply_buf; *reply_len = auth->authorizer_reply_buf_len; - return 0; + return auth; } diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h index 3bff047f6b0f..b10b55f8f301 100644 --- a/include/linux/ceph/messenger.h +++ b/include/linux/ceph/messenger.h @@ -25,9 +25,11 @@ struct ceph_connection_operations { void (*dispatch) (struct ceph_connection *con, struct ceph_msg *m); /* authorize an outgoing connection */ - int (*get_authorizer) (struct ceph_connection *con, - void **buf, int *len, int *proto, - void **reply_buf, int *reply_len, int force_new); + struct ceph_auth_handshake *(*get_authorizer) ( + struct ceph_connection *con, + void **buf, int *len, int *proto, + void **reply_buf, int *reply_len, + int force_new); int (*verify_authorizer_reply) (struct ceph_connection *con, int len); int (*invalidate_authorizer)(struct ceph_connection *con); diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index e0532d5b22f5..ac27a2c0694a 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -658,7 +658,7 @@ static int prepare_connect_authorizer(struct ceph_connection *con) void *auth_buf; int auth_len; int auth_protocol; - int ret; + struct ceph_auth_handshake *auth; if (!con->ops->get_authorizer) { con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN; @@ -674,13 +674,13 @@ static int prepare_connect_authorizer(struct ceph_connection *con) auth_buf = NULL; auth_len = 0; auth_protocol = CEPH_AUTH_UNKNOWN; - ret = con->ops->get_authorizer(con, &auth_buf, &auth_len, + auth = con->ops->get_authorizer(con, &auth_buf, &auth_len, &auth_protocol, &con->auth_reply_buf, &con->auth_reply_buf_len, con->auth_retry); mutex_lock(&con->mutex); - if (ret) - return ret; + if (IS_ERR(auth)) + return PTR_ERR(auth); if (test_bit(CLOSED, &con->state) || test_bit(OPENING, &con->state)) return -EAGAIN; diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index f640bdf027e7..fa74ae0ea910 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -2108,15 +2108,19 @@ static void put_osd_con(struct ceph_connection *con) /* * authentication */ -static int get_authorizer(struct ceph_connection *con, - void **buf, int *len, int *proto, - void **reply_buf, int *reply_len, int force_new) +/* + * Note: returned pointer is the address of a structure that's + * managed separately. Caller must *not* attempt to free it. + */ +static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con, + void **buf, int *len, int *proto, + void **reply_buf, int *reply_len, + int force_new) { struct ceph_osd *o = con->private; struct ceph_osd_client *osdc = o->o_osdc; struct ceph_auth_client *ac = osdc->client->monc.auth; struct ceph_auth_handshake *auth = &o->o_auth; - int ret = 0; if (force_new && auth->authorizer) { if (ac->ops && ac->ops->destroy_authorizer) @@ -2124,9 +2128,10 @@ static int get_authorizer(struct ceph_connection *con, auth->authorizer = NULL; } if (!auth->authorizer && ac->ops && ac->ops->create_authorizer) { - ret = ac->ops->create_authorizer(ac, CEPH_ENTITY_TYPE_OSD, auth); + int ret = ac->ops->create_authorizer(ac, CEPH_ENTITY_TYPE_OSD, + auth); if (ret) - return ret; + return ERR_PTR(ret); } *proto = ac->protocol; @@ -2135,7 +2140,7 @@ static int get_authorizer(struct ceph_connection *con, *reply_buf = auth->authorizer_reply_buf; *reply_len = auth->authorizer_reply_buf_len; - return 0; + return auth; } -- cgit v1.2.3 From 8f43fb53894079bf0caab6e348ceaffe7adc651a Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Wed, 16 May 2012 15:16:39 -0500 Subject: ceph: use info returned by get_authorizer Rather than passing a bunch of arguments to be filled in with the content of the ceph_auth_handshake buffer now returned by the get_authorizer method, just use the returned information in the caller, and drop the unnecessary arguments. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- fs/ceph/mds_client.c | 9 +-------- include/linux/ceph/messenger.h | 4 +--- net/ceph/messenger.c | 13 +++++++------ net/ceph/osd_client.c | 9 +-------- 4 files changed, 10 insertions(+), 25 deletions(-) diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 67938a9d049b..200bc87eceb1 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -3401,9 +3401,7 @@ out: * managed separately. Caller must *not* attempt to free it. */ static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con, - void **buf, int *len, int *proto, - void **reply_buf, int *reply_len, - int force_new) + int *proto, int force_new) { struct ceph_mds_session *s = con->private; struct ceph_mds_client *mdsc = s->s_mdsc; @@ -3421,12 +3419,7 @@ static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con, if (ret) return ERR_PTR(ret); } - *proto = ac->protocol; - *buf = auth->authorizer_buf; - *len = auth->authorizer_buf_len; - *reply_buf = auth->authorizer_reply_buf; - *reply_len = auth->authorizer_reply_buf_len; return auth; } diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h index b10b55f8f301..2521a95fa6d9 100644 --- a/include/linux/ceph/messenger.h +++ b/include/linux/ceph/messenger.h @@ -27,9 +27,7 @@ struct ceph_connection_operations { /* authorize an outgoing connection */ struct ceph_auth_handshake *(*get_authorizer) ( struct ceph_connection *con, - void **buf, int *len, int *proto, - void **reply_buf, int *reply_len, - int force_new); + int *proto, int force_new); int (*verify_authorizer_reply) (struct ceph_connection *con, int len); int (*invalidate_authorizer)(struct ceph_connection *con); diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index ac27a2c0694a..6d82c1a1a89b 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -671,20 +671,21 @@ static int prepare_connect_authorizer(struct ceph_connection *con) mutex_unlock(&con->mutex); - auth_buf = NULL; - auth_len = 0; auth_protocol = CEPH_AUTH_UNKNOWN; - auth = con->ops->get_authorizer(con, &auth_buf, &auth_len, - &auth_protocol, &con->auth_reply_buf, - &con->auth_reply_buf_len, con->auth_retry); + auth = con->ops->get_authorizer(con, &auth_protocol, con->auth_retry); + mutex_lock(&con->mutex); if (IS_ERR(auth)) return PTR_ERR(auth); - if (test_bit(CLOSED, &con->state) || test_bit(OPENING, &con->state)) return -EAGAIN; + auth_buf = auth->authorizer_buf; + auth_len = auth->authorizer_buf_len; + con->auth_reply_buf = auth->authorizer_reply_buf; + con->auth_reply_buf_len = auth->authorizer_reply_buf_len; + con->out_connect.authorizer_protocol = cpu_to_le32(auth_protocol); con->out_connect.authorizer_len = cpu_to_le32(auth_len); diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index fa74ae0ea910..b7d633cc96a6 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -2113,9 +2113,7 @@ static void put_osd_con(struct ceph_connection *con) * managed separately. Caller must *not* attempt to free it. */ static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con, - void **buf, int *len, int *proto, - void **reply_buf, int *reply_len, - int force_new) + int *proto, int force_new) { struct ceph_osd *o = con->private; struct ceph_osd_client *osdc = o->o_osdc; @@ -2133,12 +2131,7 @@ static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con, if (ret) return ERR_PTR(ret); } - *proto = ac->protocol; - *buf = auth->authorizer_buf; - *len = auth->authorizer_buf_len; - *reply_buf = auth->authorizer_reply_buf; - *reply_len = auth->authorizer_reply_buf_len; return auth; } -- cgit v1.2.3 From 729796be9190f57ca40ccca315e8ad34a1eb8fef Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Wed, 16 May 2012 15:16:39 -0500 Subject: ceph: return pointer from prepare_connect_authorizer() Change prepare_connect_authorizer() so it returns a pointer (or pointer-coded error). Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 6d82c1a1a89b..f92d564c1505 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -653,7 +653,7 @@ static void prepare_write_keepalive(struct ceph_connection *con) * Connection negotiation. */ -static int prepare_connect_authorizer(struct ceph_connection *con) +static struct ceph_auth_handshake *prepare_connect_authorizer(struct ceph_connection *con) { void *auth_buf; int auth_len; @@ -664,7 +664,7 @@ static int prepare_connect_authorizer(struct ceph_connection *con) con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN; con->out_connect.authorizer_len = 0; - return 0; + return NULL; } /* Can't hold the mutex while getting authorizer */ @@ -677,9 +677,9 @@ static int prepare_connect_authorizer(struct ceph_connection *con) mutex_lock(&con->mutex); if (IS_ERR(auth)) - return PTR_ERR(auth); + return auth; if (test_bit(CLOSED, &con->state) || test_bit(OPENING, &con->state)) - return -EAGAIN; + return ERR_PTR(-EAGAIN); auth_buf = auth->authorizer_buf; auth_len = auth->authorizer_buf_len; @@ -692,7 +692,7 @@ static int prepare_connect_authorizer(struct ceph_connection *con) if (auth_len) ceph_con_out_kvec_add(con, auth_len, auth_buf); - return 0; + return auth; } /* @@ -712,7 +712,7 @@ static int prepare_write_connect(struct ceph_connection *con) { unsigned global_seq = get_global_seq(con->msgr, 0); int proto; - int ret; + struct ceph_auth_handshake *auth; switch (con->peer_name.type) { case CEPH_ENTITY_TYPE_MON: @@ -739,9 +739,9 @@ static int prepare_write_connect(struct ceph_connection *con) con->out_connect.flags = 0; ceph_con_out_kvec_add(con, sizeof (con->out_connect), &con->out_connect); - ret = prepare_connect_authorizer(con); - if (ret) - return ret; + auth = prepare_connect_authorizer(con); + if (IS_ERR(auth)) + return PTR_ERR(auth); con->out_more = 0; set_bit(WRITE_PENDING, &con->state); -- cgit v1.2.3 From dac1e716c60161867a47745bca592987ca3a9cb2 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Wed, 16 May 2012 15:16:39 -0500 Subject: ceph: rename prepare_connect_authorizer() Change the name of prepare_connect_authorizer(). The next patch is going to make this function no longer add anything to the connection's out_kvec, so it will no longer fit the pattern of the rest of the prepare_connect_*() functions. In addition, pass the address of a variable that will hold the authorization protocol to use. Move the assignment of that to the connection's out_connect structure into prepare_write_connect(). Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index f92d564c1505..bfddd87db788 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -653,11 +653,11 @@ static void prepare_write_keepalive(struct ceph_connection *con) * Connection negotiation. */ -static struct ceph_auth_handshake *prepare_connect_authorizer(struct ceph_connection *con) +static struct ceph_auth_handshake *get_connect_authorizer(struct ceph_connection *con, + int *auth_proto) { void *auth_buf; int auth_len; - int auth_protocol; struct ceph_auth_handshake *auth; if (!con->ops->get_authorizer) { @@ -671,8 +671,7 @@ static struct ceph_auth_handshake *prepare_connect_authorizer(struct ceph_connec mutex_unlock(&con->mutex); - auth_protocol = CEPH_AUTH_UNKNOWN; - auth = con->ops->get_authorizer(con, &auth_protocol, con->auth_retry); + auth = con->ops->get_authorizer(con, auth_proto, con->auth_retry); mutex_lock(&con->mutex); @@ -686,7 +685,6 @@ static struct ceph_auth_handshake *prepare_connect_authorizer(struct ceph_connec con->auth_reply_buf = auth->authorizer_reply_buf; con->auth_reply_buf_len = auth->authorizer_reply_buf_len; - con->out_connect.authorizer_protocol = cpu_to_le32(auth_protocol); con->out_connect.authorizer_len = cpu_to_le32(auth_len); if (auth_len) @@ -712,6 +710,7 @@ static int prepare_write_connect(struct ceph_connection *con) { unsigned global_seq = get_global_seq(con->msgr, 0); int proto; + int auth_proto; struct ceph_auth_handshake *auth; switch (con->peer_name.type) { @@ -739,9 +738,11 @@ static int prepare_write_connect(struct ceph_connection *con) con->out_connect.flags = 0; ceph_con_out_kvec_add(con, sizeof (con->out_connect), &con->out_connect); - auth = prepare_connect_authorizer(con); + auth_proto = CEPH_AUTH_UNKNOWN; + auth = get_connect_authorizer(con, &auth_proto); if (IS_ERR(auth)) return PTR_ERR(auth); + con->out_connect.authorizer_protocol = cpu_to_le32(auth_proto); con->out_more = 0; set_bit(WRITE_PENDING, &con->state); -- cgit v1.2.3 From 3da54776e2c0385c32d143fd497a7f40a88e29dd Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Wed, 16 May 2012 15:16:39 -0500 Subject: ceph: add auth buf in prepare_write_connect() Move the addition of the authorizer buffer to a connection's out_kvec out of get_connect_authorizer() and into its caller. This way, the caller--prepare_write_connect()--can avoid adding the connect header to out_kvec before it has been fully initialized. Prior to this patch, it was possible for a connect header to be sent over the wire before the authorizer protocol or buffer length fields were initialized. An authorizer buffer associated with that header could also be queued to send only after the connection header that describes it was on the wire. Fixes http://tracker.newdream.net/issues/2424 Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index bfddd87db788..1a80907282cc 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -656,8 +656,6 @@ static void prepare_write_keepalive(struct ceph_connection *con) static struct ceph_auth_handshake *get_connect_authorizer(struct ceph_connection *con, int *auth_proto) { - void *auth_buf; - int auth_len; struct ceph_auth_handshake *auth; if (!con->ops->get_authorizer) { @@ -680,15 +678,9 @@ static struct ceph_auth_handshake *get_connect_authorizer(struct ceph_connection if (test_bit(CLOSED, &con->state) || test_bit(OPENING, &con->state)) return ERR_PTR(-EAGAIN); - auth_buf = auth->authorizer_buf; - auth_len = auth->authorizer_buf_len; con->auth_reply_buf = auth->authorizer_reply_buf; con->auth_reply_buf_len = auth->authorizer_reply_buf_len; - con->out_connect.authorizer_len = cpu_to_le32(auth_len); - - if (auth_len) - ceph_con_out_kvec_add(con, auth_len, auth_buf); return auth; } @@ -737,12 +729,20 @@ static int prepare_write_connect(struct ceph_connection *con) con->out_connect.protocol_version = cpu_to_le32(proto); con->out_connect.flags = 0; - ceph_con_out_kvec_add(con, sizeof (con->out_connect), &con->out_connect); auth_proto = CEPH_AUTH_UNKNOWN; auth = get_connect_authorizer(con, &auth_proto); if (IS_ERR(auth)) return PTR_ERR(auth); + con->out_connect.authorizer_protocol = cpu_to_le32(auth_proto); + con->out_connect.authorizer_len = auth ? + cpu_to_le32(auth->authorizer_buf_len) : 0; + + ceph_con_out_kvec_add(con, sizeof (con->out_connect), + &con->out_connect); + if (auth && auth->authorizer_buf_len) + ceph_con_out_kvec_add(con, auth->authorizer_buf_len, + auth->authorizer_buf); con->out_more = 0; set_bit(WRITE_PENDING, &con->state); -- cgit v1.2.3 From 35f9f8a09e1e88e31bd34a1e645ca0e5f070dd5c Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Wed, 16 May 2012 15:16:38 -0500 Subject: libceph: avoid unregistering osd request when not registered There is a race between two __unregister_request() callers: the reply path and the ceph_osdc_wait_request(). If we get a reply *and* the timeout expires at roughly the same time, both callers will try to unregister the request, and the second one will do bad things. Simply check if the request is still already unregistered; if so, return immediately and do nothing. Fixes http://tracker.newdream.net/issues/2420 Signed-off-by: Sage Weil Reviewed-by: Alex Elder --- net/ceph/osd_client.c | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index b7d633cc96a6..b098e7b591f0 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -841,6 +841,12 @@ static void register_request(struct ceph_osd_client *osdc, static void __unregister_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req) { + if (RB_EMPTY_NODE(&req->r_node)) { + dout("__unregister_request %p tid %lld not registered\n", + req, req->r_tid); + return; + } + dout("__unregister_request %p tid %lld\n", req, req->r_tid); rb_erase(&req->r_node, &osdc->requests); osdc->num_requests--; -- cgit v1.2.3 From 6bd9adbdf9ca6a052b0b7455ac67b925eb38cfad Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 21 May 2012 09:45:23 -0700 Subject: libceph: fix pg_temp updates Usually, we are adding pg_temp entries or removing them. Occasionally they update. In that case, osdmap_apply_incremental() was failing because the rbtree entry already exists. Fix by removing the existing entry before inserting a new one. Fixes http://tracker.newdream.net/issues/2446 Signed-off-by: Sage Weil Reviewed-by: Alex Elder --- net/ceph/osdmap.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index 2592f3cca987..1892c523c43c 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -883,8 +883,12 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, pglen = ceph_decode_32(p); if (pglen) { - /* insert */ ceph_decode_need(p, end, pglen*sizeof(u32), bad); + + /* removing existing (if any) */ + (void) __remove_pg_mapping(&map->pg_temp, pgid); + + /* insert */ pg = kmalloc(sizeof(*pg) + sizeof(u32)*pglen, GFP_NOFS); if (!pg) { err = -ENOMEM; -- cgit v1.2.3