diff options
author | Linus Torvalds <torvalds@linux-foundation.org> | 2015-11-13 09:24:40 -0800 |
---|---|---|
committer | Linus Torvalds <torvalds@linux-foundation.org> | 2015-11-13 09:24:40 -0800 |
commit | ca4ba96e02e932a0c9997a40fd51253b5b2d0f9d (patch) | |
tree | ca7cc57de628ec777d0fcda3425fcbba8b53d4ca | |
parent | 4aeabc6b5ca3b9d025f287978096e138bdfbdd35 (diff) | |
parent | 583d0fef756a7615e50f0f68ea0892a497d03971 (diff) |
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Pull Ceph updates from Sage Weil:
"There are several patches from Ilya fixing RBD allocation lifecycle
issues, a series adding a nocephx_sign_messages option (and associated
bug fixes/cleanups), several patches from Zheng improving the
(directory) fsync behavior, a big improvement in IO for direct-io
requests when striping is enabled from Caifeng, and several other
small fixes and cleanups"
* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client:
libceph: clear msg->con in ceph_msg_release() only
libceph: add nocephx_sign_messages option
libceph: stop duplicating client fields in messenger
libceph: drop authorizer check from cephx msg signing routines
libceph: msg signing callouts don't need con argument
libceph: evaluate osd_req_op_data() arguments only once
ceph: make fsync() wait unsafe requests that created/modified inode
ceph: add request to i_unsafe_dirops when getting unsafe reply
libceph: introduce ceph_x_authorizer_cleanup()
ceph: don't invalidate page cache when inode is no longer used
rbd: remove duplicate calls to rbd_dev_mapping_clear()
rbd: set device_type::release instead of device::release
rbd: don't free rbd_dev outside of the release callback
rbd: return -ENOMEM instead of pool id if rbd_dev_create() fails
libceph: use local variable cursor instead of &msg->cursor
libceph: remove con argument in handle_reply()
ceph: combine as many iovec as possile into one OSD request
ceph: fix message length computation
ceph: fix a comment typo
rbd: drop null test before destroy functions
-rw-r--r-- | drivers/block/rbd.c | 109 | ||||
-rw-r--r-- | fs/ceph/cache.c | 2 | ||||
-rw-r--r-- | fs/ceph/caps.c | 76 | ||||
-rw-r--r-- | fs/ceph/file.c | 87 | ||||
-rw-r--r-- | fs/ceph/inode.c | 1 | ||||
-rw-r--r-- | fs/ceph/mds_client.c | 57 | ||||
-rw-r--r-- | fs/ceph/mds_client.h | 3 | ||||
-rw-r--r-- | fs/ceph/super.h | 1 | ||||
-rw-r--r-- | include/linux/ceph/libceph.h | 4 | ||||
-rw-r--r-- | include/linux/ceph/messenger.h | 16 | ||||
-rw-r--r-- | net/ceph/auth_x.c | 36 | ||||
-rw-r--r-- | net/ceph/ceph_common.c | 18 | ||||
-rw-r--r-- | net/ceph/crypto.h | 4 | ||||
-rw-r--r-- | net/ceph/messenger.c | 88 | ||||
-rw-r--r-- | net/ceph/osd_client.c | 34 |
15 files changed, 314 insertions, 222 deletions
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 128e7df5b807..235708c7c46e 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -418,8 +418,6 @@ MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (d static int rbd_img_request_submit(struct rbd_img_request *img_request); -static void rbd_dev_device_release(struct device *dev); - static ssize_t rbd_add(struct bus_type *bus, const char *buf, size_t count); static ssize_t rbd_remove(struct bus_type *bus, const char *buf, @@ -3991,14 +3989,12 @@ static const struct attribute_group *rbd_attr_groups[] = { NULL }; -static void rbd_sysfs_dev_release(struct device *dev) -{ -} +static void rbd_dev_release(struct device *dev); static struct device_type rbd_device_type = { .name = "rbd", .groups = rbd_attr_groups, - .release = rbd_sysfs_dev_release, + .release = rbd_dev_release, }; static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec) @@ -4041,6 +4037,25 @@ static void rbd_spec_free(struct kref *kref) kfree(spec); } +static void rbd_dev_release(struct device *dev) +{ + struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); + bool need_put = !!rbd_dev->opts; + + rbd_put_client(rbd_dev->rbd_client); + rbd_spec_put(rbd_dev->spec); + kfree(rbd_dev->opts); + kfree(rbd_dev); + + /* + * This is racy, but way better than putting module outside of + * the release callback. The race window is pretty small, so + * doing something similar to dm (dm-builtin.c) is overkill. + */ + if (need_put) + module_put(THIS_MODULE); +} + static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc, struct rbd_spec *spec, struct rbd_options *opts) @@ -4057,6 +4072,11 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc, INIT_LIST_HEAD(&rbd_dev->node); init_rwsem(&rbd_dev->header_rwsem); + rbd_dev->dev.bus = &rbd_bus_type; + rbd_dev->dev.type = &rbd_device_type; + rbd_dev->dev.parent = &rbd_root_dev; + device_initialize(&rbd_dev->dev); + rbd_dev->rbd_client = rbdc; rbd_dev->spec = spec; rbd_dev->opts = opts; @@ -4068,15 +4088,21 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc, rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER); rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id); + /* + * If this is a mapping rbd_dev (as opposed to a parent one), + * pin our module. We have a ref from do_rbd_add(), so use + * __module_get(). + */ + if (rbd_dev->opts) + __module_get(THIS_MODULE); + return rbd_dev; } static void rbd_dev_destroy(struct rbd_device *rbd_dev) { - rbd_put_client(rbd_dev->rbd_client); - rbd_spec_put(rbd_dev->spec); - kfree(rbd_dev->opts); - kfree(rbd_dev); + if (rbd_dev) + put_device(&rbd_dev->dev); } /* @@ -4702,27 +4728,6 @@ static int rbd_dev_header_info(struct rbd_device *rbd_dev) return rbd_dev_v2_header_info(rbd_dev); } -static int rbd_bus_add_dev(struct rbd_device *rbd_dev) -{ - struct device *dev; - int ret; - - dev = &rbd_dev->dev; - dev->bus = &rbd_bus_type; - dev->type = &rbd_device_type; - dev->parent = &rbd_root_dev; - dev->release = rbd_dev_device_release; - dev_set_name(dev, "%d", rbd_dev->dev_id); - ret = device_register(dev); - - return ret; -} - -static void rbd_bus_del_dev(struct rbd_device *rbd_dev) -{ - device_unregister(&rbd_dev->dev); -} - /* * Get a unique rbd identifier for the given new rbd_dev, and add * the rbd_dev to the global list. @@ -5225,7 +5230,8 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev) set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE); set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only); - ret = rbd_bus_add_dev(rbd_dev); + dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id); + ret = device_add(&rbd_dev->dev); if (ret) goto err_out_mapping; @@ -5248,8 +5254,6 @@ err_out_blkdev: unregister_blkdev(rbd_dev->major, rbd_dev->name); err_out_id: rbd_dev_id_put(rbd_dev); - rbd_dev_mapping_clear(rbd_dev); - return ret; } @@ -5397,7 +5401,7 @@ static ssize_t do_rbd_add(struct bus_type *bus, struct rbd_spec *spec = NULL; struct rbd_client *rbdc; bool read_only; - int rc = -ENOMEM; + int rc; if (!try_module_get(THIS_MODULE)) return -ENODEV; @@ -5405,7 +5409,7 @@ static ssize_t do_rbd_add(struct bus_type *bus, /* parse add command */ rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec); if (rc < 0) - goto err_out_module; + goto out; rbdc = rbd_get_client(ceph_opts); if (IS_ERR(rbdc)) { @@ -5432,8 +5436,10 @@ static ssize_t do_rbd_add(struct bus_type *bus, } rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts); - if (!rbd_dev) + if (!rbd_dev) { + rc = -ENOMEM; goto err_out_client; + } rbdc = NULL; /* rbd_dev now owns this */ spec = NULL; /* rbd_dev now owns this */ rbd_opts = NULL; /* rbd_dev now owns this */ @@ -5458,10 +5464,13 @@ static ssize_t do_rbd_add(struct bus_type *bus, */ rbd_dev_header_unwatch_sync(rbd_dev); rbd_dev_image_release(rbd_dev); - goto err_out_module; + goto out; } - return count; + rc = count; +out: + module_put(THIS_MODULE); + return rc; err_out_rbd_dev: rbd_dev_destroy(rbd_dev); @@ -5470,12 +5479,7 @@ err_out_client: err_out_args: rbd_spec_put(spec); kfree(rbd_opts); -err_out_module: - module_put(THIS_MODULE); - - dout("Error adding device %s\n", buf); - - return (ssize_t)rc; + goto out; } static ssize_t rbd_add(struct bus_type *bus, @@ -5495,17 +5499,15 @@ static ssize_t rbd_add_single_major(struct bus_type *bus, return do_rbd_add(bus, buf, count); } -static void rbd_dev_device_release(struct device *dev) +static void rbd_dev_device_release(struct rbd_device *rbd_dev) { - struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); - rbd_free_disk(rbd_dev); clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); + device_del(&rbd_dev->dev); rbd_dev_mapping_clear(rbd_dev); if (!single_major) unregister_blkdev(rbd_dev->major, rbd_dev->name); rbd_dev_id_put(rbd_dev); - rbd_dev_mapping_clear(rbd_dev); } static void rbd_dev_remove_parent(struct rbd_device *rbd_dev) @@ -5590,9 +5592,8 @@ static ssize_t do_rbd_remove(struct bus_type *bus, * rbd_bus_del_dev() will race with rbd_watch_cb(), resulting * in a potential use after free of rbd_dev->disk or rbd_dev. */ - rbd_bus_del_dev(rbd_dev); + rbd_dev_device_release(rbd_dev); rbd_dev_image_release(rbd_dev); - module_put(THIS_MODULE); return count; } @@ -5663,10 +5664,8 @@ static int rbd_slab_init(void) if (rbd_segment_name_cache) return 0; out_err: - if (rbd_obj_request_cache) { - kmem_cache_destroy(rbd_obj_request_cache); - rbd_obj_request_cache = NULL; - } + kmem_cache_destroy(rbd_obj_request_cache); + rbd_obj_request_cache = NULL; kmem_cache_destroy(rbd_img_request_cache); rbd_img_request_cache = NULL; diff --git a/fs/ceph/cache.c b/fs/ceph/cache.c index 834f9f3723fb..a4766ded1ba7 100644 --- a/fs/ceph/cache.c +++ b/fs/ceph/cache.c @@ -88,7 +88,7 @@ static uint16_t ceph_fscache_inode_get_key(const void *cookie_netfs_data, const struct ceph_inode_info* ci = cookie_netfs_data; uint16_t klen; - /* use ceph virtual inode (id + snaphot) */ + /* use ceph virtual inode (id + snapshot) */ klen = sizeof(ci->i_vino); if (klen > maxbuf) return 0; diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 27b566874bc1..c69e1253b47b 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -1655,9 +1655,8 @@ retry_locked: !S_ISDIR(inode->i_mode) && /* ignore readdir cache */ ci->i_wrbuffer_ref == 0 && /* no dirty pages... */ inode->i_data.nrpages && /* have cached pages */ - (file_wanted == 0 || /* no open files */ - (revoking & (CEPH_CAP_FILE_CACHE| - CEPH_CAP_FILE_LAZYIO))) && /* or revoking cache */ + (revoking & (CEPH_CAP_FILE_CACHE| + CEPH_CAP_FILE_LAZYIO)) && /* or revoking cache */ !tried_invalidate) { dout("check_caps trying to invalidate on %p\n", inode); if (try_nonblocking_invalidate(inode) < 0) { @@ -1971,49 +1970,46 @@ out: } /* - * wait for any uncommitted directory operations to commit. + * wait for any unsafe requests to complete. */ -static int unsafe_dirop_wait(struct inode *inode) +static int unsafe_request_wait(struct inode *inode) { struct ceph_inode_info *ci = ceph_inode(inode); - struct list_head *head = &ci->i_unsafe_dirops; - struct ceph_mds_request *req; - u64 last_tid; - int ret = 0; - - if (!S_ISDIR(inode->i_mode)) - return 0; + struct ceph_mds_request *req1 = NULL, *req2 = NULL; + int ret, err = 0; spin_lock(&ci->i_unsafe_lock); - if (list_empty(head)) - goto out; - - req = list_last_entry(head, struct ceph_mds_request, - r_unsafe_dir_item); - last_tid = req->r_tid; - - do { - ceph_mdsc_get_request(req); - spin_unlock(&ci->i_unsafe_lock); + if (S_ISDIR(inode->i_mode) && !list_empty(&ci->i_unsafe_dirops)) { + req1 = list_last_entry(&ci->i_unsafe_dirops, + struct ceph_mds_request, + r_unsafe_dir_item); + ceph_mdsc_get_request(req1); + } + if (!list_empty(&ci->i_unsafe_iops)) { + req2 = list_last_entry(&ci->i_unsafe_iops, + struct ceph_mds_request, + r_unsafe_target_item); + ceph_mdsc_get_request(req2); + } + spin_unlock(&ci->i_unsafe_lock); - dout("unsafe_dirop_wait %p wait on tid %llu (until %llu)\n", - inode, req->r_tid, last_tid); - ret = !wait_for_completion_timeout(&req->r_safe_completion, - ceph_timeout_jiffies(req->r_timeout)); + dout("unsafe_requeset_wait %p wait on tid %llu %llu\n", + inode, req1 ? req1->r_tid : 0ULL, req2 ? req2->r_tid : 0ULL); + if (req1) { + ret = !wait_for_completion_timeout(&req1->r_safe_completion, + ceph_timeout_jiffies(req1->r_timeout)); if (ret) - ret = -EIO; /* timed out */ - - ceph_mdsc_put_request(req); - - spin_lock(&ci->i_unsafe_lock); - if (ret || list_empty(head)) - break; - req = list_first_entry(head, struct ceph_mds_request, - r_unsafe_dir_item); - } while (req->r_tid < last_tid); -out: - spin_unlock(&ci->i_unsafe_lock); - return ret; + err = -EIO; + ceph_mdsc_put_request(req1); + } + if (req2) { + ret = !wait_for_completion_timeout(&req2->r_safe_completion, + ceph_timeout_jiffies(req2->r_timeout)); + if (ret) + err = -EIO; + ceph_mdsc_put_request(req2); + } + return err; } int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync) @@ -2039,7 +2035,7 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync) dirty = try_flush_caps(inode, &flush_tid); dout("fsync dirty caps are %s\n", ceph_cap_string(dirty)); - ret = unsafe_dirop_wait(inode); + ret = unsafe_request_wait(inode); /* * only wait on non-file metadata writeback (the mds diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 0c62868b5c56..3c68e6aee2f0 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -34,6 +34,74 @@ * need to wait for MDS acknowledgement. */ +/* + * Calculate the length sum of direct io vectors that can + * be combined into one page vector. + */ +static size_t dio_get_pagev_size(const struct iov_iter *it) +{ + const struct iovec *iov = it->iov; + const struct iovec *iovend = iov + it->nr_segs; + size_t size; + + size = iov->iov_len - it->iov_offset; + /* + * An iov can be page vectored when both the current tail + * and the next base are page aligned. + */ + while (PAGE_ALIGNED((iov->iov_base + iov->iov_len)) && + (++iov < iovend && PAGE_ALIGNED((iov->iov_base)))) { + size += iov->iov_len; + } + dout("dio_get_pagevlen len = %zu\n", size); + return size; +} + +/* + * Allocate a page vector based on (@it, @nbytes). + * The return value is the tuple describing a page vector, + * that is (@pages, @page_align, @num_pages). + */ +static struct page ** +dio_get_pages_alloc(const struct iov_iter *it, size_t nbytes, + size_t *page_align, int *num_pages) +{ + struct iov_iter tmp_it = *it; + size_t align; + struct page **pages; + int ret = 0, idx, npages; + + align = (unsigned long)(it->iov->iov_base + it->iov_offset) & + (PAGE_SIZE - 1); + npages = calc_pages_for(align, nbytes); + pages = kmalloc(sizeof(*pages) * npages, GFP_KERNEL); + if (!pages) { + pages = vmalloc(sizeof(*pages) * npages); + if (!pages) + return ERR_PTR(-ENOMEM); + } + + for (idx = 0; idx < npages; ) { + size_t start; + ret = iov_iter_get_pages(&tmp_it, pages + idx, nbytes, + npages - idx, &start); + if (ret < 0) + goto fail; + + iov_iter_advance(&tmp_it, ret); + nbytes -= ret; + idx += (ret + start + PAGE_SIZE - 1) / PAGE_SIZE; + } + + BUG_ON(nbytes != 0); + *num_pages = npages; + *page_align = align; + dout("dio_get_pages_alloc: got %d pages align %zu\n", npages, align); + return pages; +fail: + ceph_put_page_vector(pages, idx, false); + return ERR_PTR(ret); +} /* * Prepare an open request. Preallocate ceph_cap to avoid an @@ -458,11 +526,10 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i, size_t start; ssize_t n; - n = iov_iter_get_pages_alloc(i, &pages, INT_MAX, &start); - if (n < 0) - return n; - - num_pages = (n + start + PAGE_SIZE - 1) / PAGE_SIZE; + n = dio_get_pagev_size(i); + pages = dio_get_pages_alloc(i, n, &start, &num_pages); + if (IS_ERR(pages)) + return PTR_ERR(pages); ret = striped_read(inode, off, n, pages, num_pages, checkeof, @@ -592,7 +659,7 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos, CEPH_OSD_FLAG_WRITE; while (iov_iter_count(from) > 0) { - u64 len = iov_iter_single_seg_count(from); + u64 len = dio_get_pagev_size(from); size_t start; ssize_t n; @@ -611,14 +678,14 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos, osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0); - n = iov_iter_get_pages_alloc(from, &pages, len, &start); - if (unlikely(n < 0)) { - ret = n; + n = len; + pages = dio_get_pages_alloc(from, len, &start, &num_pages); + if (IS_ERR(pages)) { ceph_osdc_put_request(req); + ret = PTR_ERR(pages); break; } - num_pages = (n + start + PAGE_SIZE - 1) / PAGE_SIZE; /* * throw out any page cache pages in this range. this * may block. diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index 96d2bd829902..498dcfa2dcdb 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -452,6 +452,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb) INIT_LIST_HEAD(&ci->i_unsafe_writes); INIT_LIST_HEAD(&ci->i_unsafe_dirops); + INIT_LIST_HEAD(&ci->i_unsafe_iops); spin_lock_init(&ci->i_unsafe_lock); ci->i_snap_realm = NULL; diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 51cb02da75d9..e7b130a637f9 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -633,13 +633,8 @@ static void __register_request(struct ceph_mds_client *mdsc, mdsc->oldest_tid = req->r_tid; if (dir) { - struct ceph_inode_info *ci = ceph_inode(dir); - ihold(dir); - spin_lock(&ci->i_unsafe_lock); req->r_unsafe_dir = dir; - list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops); - spin_unlock(&ci->i_unsafe_lock); } } @@ -665,13 +660,20 @@ static void __unregister_request(struct ceph_mds_client *mdsc, rb_erase(&req->r_node, &mdsc->request_tree); RB_CLEAR_NODE(&req->r_node); - if (req->r_unsafe_dir) { + if (req->r_unsafe_dir && req->r_got_unsafe) { struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir); - spin_lock(&ci->i_unsafe_lock); list_del_init(&req->r_unsafe_dir_item); spin_unlock(&ci->i_unsafe_lock); + } + if (req->r_target_inode && req->r_got_unsafe) { + struct ceph_inode_info *ci = ceph_inode(req->r_target_inode); + spin_lock(&ci->i_unsafe_lock); + list_del_init(&req->r_unsafe_target_item); + spin_unlock(&ci->i_unsafe_lock); + } + if (req->r_unsafe_dir) { iput(req->r_unsafe_dir); req->r_unsafe_dir = NULL; } @@ -1430,6 +1432,13 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg) if ((used | wanted) & CEPH_CAP_ANY_WR) goto out; } + /* The inode has cached pages, but it's no longer used. + * we can safely drop it */ + if (wanted == 0 && used == CEPH_CAP_FILE_CACHE && + !(oissued & CEPH_CAP_FILE_CACHE)) { + used = 0; + oissued = 0; + } if ((used | wanted) & ~oissued & mine) goto out; /* we need these caps */ @@ -1438,7 +1447,7 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg) /* we aren't the only cap.. just remove us */ __ceph_remove_cap(cap, true); } else { - /* try to drop referring dentries */ + /* try dropping referring dentries */ spin_unlock(&ci->i_ceph_lock); d_prune_aliases(inode); dout("trim_caps_cb %p cap %p pruned, count now %d\n", @@ -1704,6 +1713,7 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode) req->r_started = jiffies; req->r_resend_mds = -1; INIT_LIST_HEAD(&req->r_unsafe_dir_item); + INIT_LIST_HEAD(&req->r_unsafe_target_item); req->r_fmode = -1; kref_init(&req->r_kref); INIT_LIST_HEAD(&req->r_wait); @@ -1935,7 +1945,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc, len = sizeof(*head) + pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) + - sizeof(struct timespec); + sizeof(struct ceph_timespec); /* calculate (max) length for cap releases */ len += sizeof(struct ceph_mds_request_release) * @@ -2477,6 +2487,14 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) } else { req->r_got_unsafe = true; list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe); + if (req->r_unsafe_dir) { + struct ceph_inode_info *ci = + ceph_inode(req->r_unsafe_dir); + spin_lock(&ci->i_unsafe_lock); + list_add_tail(&req->r_unsafe_dir_item, + &ci->i_unsafe_dirops); + spin_unlock(&ci->i_unsafe_lock); + } } dout("handle_reply tid %lld result %d\n", tid, result); @@ -2518,6 +2536,13 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) up_read(&mdsc->snap_rwsem); if (realm) ceph_put_snap_realm(mdsc, realm); + + if (err == 0 && req->r_got_unsafe && req->r_target_inode) { + struct ceph_inode_info *ci = ceph_inode(req->r_target_inode); + spin_lock(&ci->i_unsafe_lock); + list_add_tail(&req->r_unsafe_target_item, &ci->i_unsafe_iops); + spin_unlock(&ci->i_unsafe_lock); + } out_err: mutex_lock(&mdsc->mutex); if (!req->r_aborted) { @@ -3917,17 +3942,19 @@ static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con, return msg; } -static int sign_message(struct ceph_connection *con, struct ceph_msg *msg) +static int mds_sign_message(struct ceph_msg *msg) { - struct ceph_mds_session *s = con->private; + struct ceph_mds_session *s = msg->con->private; struct ceph_auth_handshake *auth = &s->s_auth; + return ceph_auth_sign_message(auth, msg); } -static int check_message_signature(struct ceph_connection *con, struct ceph_msg *msg) +static int mds_check_message_signature(struct ceph_msg *msg) { - struct ceph_mds_session *s = con->private; + struct ceph_mds_session *s = msg->con->private; struct ceph_auth_handshake *auth = &s->s_auth; + return ceph_auth_check_message_signature(auth, msg); } @@ -3940,8 +3967,8 @@ static const struct ceph_connection_operations mds_con_ops = { .invalidate_authorizer = invalidate_authorizer, .peer_reset = peer_reset, .alloc_msg = mds_alloc_msg, - .sign_message = sign_message, - .check_message_signature = check_message_signature, + .sign_message = mds_sign_message, + .check_message_signature = mds_check_message_signature, }; /* eof */ diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index f575eafe2261..ccf11ef0ca87 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -236,6 +236,9 @@ struct ceph_mds_request { struct inode *r_unsafe_dir; struct list_head r_unsafe_dir_item; + /* unsafe requests that modify the target inode */ + struct list_head r_unsafe_target_item; + struct ceph_mds_session *r_session; int r_attempts; /* resend attempts */ diff --git a/fs/ceph/super.h b/fs/ceph/super.h index 2f2460d23a06..75b7d125ce66 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -342,6 +342,7 @@ struct ceph_inode_info { struct list_head i_unsafe_writes; /* uncommitted sync writes */ struct list_head i_unsafe_dirops; /* uncommitted mds dir ops */ + struct list_head i_unsafe_iops; /* uncommitted mds inode ops */ spinlock_t i_unsafe_lock; struct ceph_snap_realm *i_snap_realm; /* snap realm (if caps) */ diff --git a/include/linux/ceph/libceph.h b/include/linux/ceph/libceph.h index 397c5cd09794..3e3799cdc6e6 100644 --- a/include/linux/ceph/libceph.h +++ b/include/linux/ceph/libceph.h @@ -29,8 +29,9 @@ #define CEPH_OPT_NOSHARE (1<<1) /* don't share client with other sbs */ #define CEPH_OPT_MYIP (1<<2) /* specified my ip */ #define CEPH_OPT_NOCRC (1<<3) /* no data crc on writes */ -#define CEPH_OPT_NOMSGAUTH (1<<4) /* not require cephx message signature */ +#define CEPH_OPT_NOMSGAUTH (1<<4) /* don't require msg signing feat */ #define CEPH_OPT_TCP_NODELAY (1<<5) /* TCP_NODELAY on TCP sockets */ +#define CEPH_OPT_NOMSGSIGN (1<<6) /* don't sign msgs */ #define CEPH_OPT_DEFAULT (CEPH_OPT_TCP_NODELAY) @@ -137,6 +138,7 @@ struct ceph_client { #endif }; +#define from_msgr(ms) container_of(ms, struct ceph_client, msgr) /* diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h index b2371d9b51fa..71b1d6cdcb5d 100644 --- a/include/linux/ceph/messenger.h +++ b/include/linux/ceph/messenger.h @@ -43,10 +43,9 @@ struct ceph_connection_operations { struct ceph_msg * (*alloc_msg) (struct ceph_connection *con, struct ceph_msg_header *hdr, int *skip); - int (*sign_message) (struct ceph_connection *con, struct ceph_msg *msg); - int (*check_message_signature) (struct ceph_connection *con, - struct ceph_msg *msg); + int (*sign_message) (struct ceph_msg *msg); + int (*check_message_signature) (struct ceph_msg *msg); }; /* use format string %s%d */ @@ -58,8 +57,6 @@ struct ceph_messenger { atomic_t stopping; possible_net_t net; - bool nocrc; - bool tcp_nodelay; /* * the global_seq counts connections i (attempt to) initiate @@ -67,9 +64,6 @@ struct ceph_messenger { */ u32 global_seq; spinlock_t global_seq_lock; - - u64 supported_features; - u64 required_features; }; enum ceph_msg_data_type { @@ -268,11 +262,7 @@ extern void ceph_msgr_exit(void); extern void ceph_msgr_flush(void); extern void ceph_messenger_init(struct ceph_messenger *msgr, - struct ceph_entity_addr *myaddr, - u64 supported_features, - u64 required_features, - bool nocrc, - bool tcp_nodelay); + struct ceph_entity_addr *myaddr); extern void ceph_messenger_fini(struct ceph_messenger *msgr); extern void ceph_con_init(struct ceph_connection *con, void *private, diff --git a/net/ceph/auth_x.c b/net/ceph/auth_x.c index ba6eb17226da..10d87753ed87 100644 --- a/net/ceph/auth_x.c +++ b/net/ceph/auth_x.c @@ -8,6 +8,7 @@ #include <linux/ceph/decode.h> #include <linux/ceph/auth.h> +#include <linux/ceph/libceph.h> #include <linux/ceph/messenger.h> #include "crypto.h" @@ -279,6 +280,15 @@ bad: return -EINVAL; } +static void ceph_x_authorizer_cleanup(struct ceph_x_authorizer *au) +{ + ceph_crypto_key_destroy(&au->session_key); + if (au->buf) { + ceph_buffer_put(au->buf); + au->buf = NULL; + } +} + static int ceph_x_build_authorizer(struct ceph_auth_client *ac, struct ceph_x_ticket_handler *th, struct ceph_x_authorizer *au) @@ -297,7 +307,7 @@ static int ceph_x_build_authorizer(struct ceph_auth_client *ac, ceph_crypto_key_destroy(&au->session_key); ret = ceph_crypto_key_clone(&au->session_key, &th->session_key); if (ret) - return ret; + goto out_au; maxlen = sizeof(*msg_a) + sizeof(msg_b) + ceph_x_encrypt_buflen(ticket_blob_len); @@ -309,8 +319,8 @@ static int ceph_x_build_authorizer(struct ceph_auth_client *ac, if (!au->buf) { au->buf = ceph_buffer_new(maxlen, GFP_NOFS); if (!au->buf) { - ceph_crypto_key_destroy(&au->session_key); - return -ENOMEM; + ret = -ENOMEM; + goto out_au; } } au->service = th->service; @@ -340,7 +350,7 @@ static int ceph_x_build_authorizer(struct ceph_auth_client *ac, ret = ceph_x_encrypt(&au->session_key, &msg_b, sizeof(msg_b), p, end - p); if (ret < 0) - goto out_buf; + goto out_au; p += ret; au->buf->vec.iov_len = p - au->buf->vec.iov_base; dout(" built authorizer nonce %llx len %d\n", au->nonce, @@ -348,9 +358,8 @@ static int ceph_x_build_authorizer(struct ceph_auth_client *ac, BUG_ON(au->buf->vec.iov_len > maxlen); return 0; -out_buf: - ceph_buffer_put(au->buf); - au->buf = NULL; +out_au: + ceph_x_authorizer_cleanup(au); return ret; } @@ -624,8 +633,7 @@ static void ceph_x_destroy_authorizer(struct ceph_auth_client *ac, { struct ceph_x_authorizer *au = (void *)a; - ceph_crypto_key_destroy(&au->session_key); - ceph_buffer_put(au->buf); + ceph_x_authorizer_cleanup(au); kfree(au); } @@ -653,8 +661,7 @@ static void ceph_x_destroy(struct ceph_auth_client *ac) remove_ticket_handler(ac, th); } - if (xi->auth_authorizer.buf) - ceph_buffer_put(xi->auth_authorizer.buf); + ceph_x_authorizer_cleanup(&xi->auth_authorizer); kfree(ac->private); ac->private = NULL; @@ -691,8 +698,10 @@ static int ceph_x_sign_message(struct ceph_auth_handshake *auth, struct ceph_msg *msg) { int ret; - if (!auth->authorizer) + + if (ceph_test_opt(from_msgr(msg->con->msgr), NOMSGSIGN)) return 0; + ret = calcu_signature((struct ceph_x_authorizer *)auth->authorizer, msg, &msg->footer.sig); if (ret < 0) @@ -707,8 +716,9 @@ static int ceph_x_check_message_signature(struct ceph_auth_handshake *auth, __le64 sig_check; int ret; - if (!auth->authorizer) + if (ceph_test_opt(from_msgr(msg->con->msgr), NOMSGSIGN)) return 0; + ret = calcu_signature((struct ceph_x_authorizer *)auth->authorizer, msg, &sig_check); if (ret < 0) diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c index 78f098a20796..bcbec33c6a14 100644 --- a/net/ceph/ceph_common.c +++ b/net/ceph/ceph_common.c @@ -245,6 +245,8 @@ enum { Opt_nocrc, Opt_cephx_require_signatures, Opt_nocephx_require_signatures, + Opt_cephx_sign_messages, + Opt_nocephx_sign_messages, Opt_tcp_nodelay, Opt_notcp_nodelay, }; @@ -267,6 +269,8 @@ static match_table_t opt_tokens = { {Opt_nocrc, "nocrc"}, {Opt_cephx_require_signatures, "cephx_require_signatures"}, {Opt_nocephx_require_signatures, "nocephx_require_signatures"}, + {Opt_cephx_sign_messages, "cephx_sign_messages"}, + {Opt_nocephx_sign_messages, "nocephx_sign_messages"}, {Opt_tcp_nodelay, "tcp_nodelay"}, {Opt_notcp_nodelay, "notcp_nodelay"}, {-1, NULL} @@ -491,6 +495,12 @@ ceph_parse_options(char *options, const char *dev_name, case Opt_nocephx_require_signatures: opt->flags |= CEPH_OPT_NOMSGAUTH; break; + case Opt_cephx_sign_messages: + opt->flags &= ~CEPH_OPT_NOMSGSIGN; + break; + case Opt_nocephx_sign_messages: + opt->flags |= CEPH_OPT_NOMSGSIGN; + break; case Opt_tcp_nodelay: opt->flags |= CEPH_OPT_TCP_NODELAY; @@ -534,6 +544,8 @@ int ceph_print_client_options(struct seq_file *m, struct ceph_client *client) seq_puts(m, "nocrc,"); if (opt->flags & CEPH_OPT_NOMSGAUTH) seq_puts(m, "nocephx_require_signatures,"); + if (opt->flags & CEPH_OPT_NOMSGSIGN) + seq_puts(m, "nocephx_sign_messages,"); if ((opt->flags & CEPH_OPT_TCP_NODELAY) == 0) seq_puts(m, "notcp_nodelay,"); @@ -596,11 +608,7 @@ struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private, if (ceph_test_opt(client, MYIP)) myaddr = &client->options->my_addr; - ceph_messenger_init(&client->msgr, myaddr, - client->supported_features, - client->required_features, - ceph_test_opt(client, NOCRC), - ceph_test_opt(client, TCP_NODELAY)); + ceph_messenger_init(&client->msgr, myaddr); /* subsystems */ err = ceph_monc_init(&client->monc, client); diff --git a/net/ceph/crypto.h b/net/ceph/crypto.h index d1498224c49d..2e9cab09f37b 100644 --- a/net/ceph/crypto.h +++ b/net/ceph/crypto.h @@ -16,8 +16,10 @@ struct ceph_crypto_key { static inline void ceph_crypto_key_destroy(struct ceph_crypto_key *key) { - if (key) + if (key) { kfree(key->key); + key->key = NULL; + } } int ceph_crypto_key_clone(struct ceph_crypto_key *dst, diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index b9b0e3b5da49..9981039ef4ff 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -509,7 +509,7 @@ static int ceph_tcp_connect(struct ceph_connection *con) return ret; } - if (con->msgr->tcp_nodelay) { + if (ceph_test_opt(from_msgr(con->msgr), TCP_NODELAY)) { int optval = 1; ret = kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, @@ -637,9 +637,6 @@ static int con_close_socket(struct ceph_connection *con) static void ceph_msg_remove(struct ceph_msg *msg) { list_del_init(&msg->list_head); - BUG_ON(msg->con == NULL); - msg->con->ops->put(msg->con); - msg->con = NULL; ceph_msg_put(msg); } @@ -662,15 +659,14 @@ static void reset_connection(struct ceph_connection *con) if (con->in_msg) { BUG_ON(con->in_msg->con != con); - con->in_msg->con = NULL; ceph_msg_put(con->in_msg); con->in_msg = NULL; - con->ops->put(con); } con->connect_seq = 0; con->out_seq = 0; if (con->out_msg) { + BUG_ON(con->out_msg->con != con); ceph_msg_put(con->out_msg); con->out_msg = NULL; } @@ -1205,7 +1201,7 @@ static void prepare_write_message_footer(struct ceph_connection *con) con->out_kvec[v].iov_base = &m->footer; if (con->peer_features & CEPH_FEATURE_MSG_AUTH) { if (con->ops->sign_message) - con->ops->sign_message(con, m); + con->ops->sign_message(m); else m->footer.sig = 0; con->out_kvec[v].iov_len = sizeof(m->footer); @@ -1432,7 +1428,8 @@ static int prepare_write_connect(struct ceph_connection *con) dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con, con->connect_seq, global_seq, proto); - con->out_connect.features = cpu_to_le64(con->msgr->supported_features); + con->out_connect.features = + cpu_to_le64(from_msgr(con->msgr)->supported_features); con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT); con->out_connect.connect_seq = cpu_to_le32(con->connect_seq); con->out_connect.global_seq = cpu_to_le32(global_seq); @@ -1527,7 +1524,7 @@ static int write_partial_message_data(struct ceph_connection *con) { struct ceph_msg *msg = con->out_msg; struct ceph_msg_data_cursor *cursor = &msg->cursor; - bool do_datacrc = !con->msgr->nocrc; + bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC); u32 crc; dout("%s %p msg %p\n", __func__, con, msg); @@ -1552,8 +1549,8 @@ static int write_partial_message_data(struct ceph_connection *con) bool need_crc; int ret; - page = ceph_msg_data_next(&msg->cursor, &page_offset, &length, - &last_piece); + page = ceph_msg_data_next(cursor, &page_offset, &length, + &last_piece); ret = ceph_tcp_sendpage(con->sock, page, page_offset, length, !last_piece); if (ret <= 0) { @@ -1564,7 +1561,7 @@ static int write_partial_message_data(struct ceph_connection *con) } if (do_datacrc && cursor->need_crc) crc = ceph_crc32c_page(crc, page, page_offset, length); - need_crc = ceph_msg_data_advance(&msg->cursor, (size_t)ret); + need_crc = ceph_msg_data_advance(cursor, (size_t)ret); } dout("%s %p msg %p done\n", __func__, con, msg); @@ -2005,8 +2002,8 @@ static int process_banner(struct ceph_connection *con) static int process_connect(struct ceph_connection *con) { - u64 sup_feat = con->msgr->supported_features; - u64 req_feat = con->msgr->required_features; + u64 sup_feat = from_msgr(con->msgr)->supported_features; + u64 req_feat = from_msgr(con->msgr)->required_features; u64 server_feat = ceph_sanitize_features( le64_to_cpu(con->in_reply.features)); int ret; @@ -2232,7 +2229,7 @@ static int read_partial_msg_data(struct ceph_connection *con) { struct ceph_msg *msg = con->in_msg; struct ceph_msg_data_cursor *cursor = &msg->cursor; - const bool do_datacrc = !con->msgr->nocrc; + bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC); struct page *page; size_t page_offset; size_t length; @@ -2246,8 +2243,7 @@ static int read_partial_msg_data(struct ceph_connection *con) if (do_datacrc) crc = con->in_data_crc; while (cursor->resid) { - page = ceph_msg_data_next(&msg->cursor, &page_offset, &length, - NULL); + page = ceph_msg_data_next(cursor, &page_offset, &length, NULL); ret = ceph_tcp_recvpage(con->sock, page, page_offset, length); if (ret <= 0) { if (do_datacrc) @@ -2258,7 +2254,7 @@ static int read_partial_msg_data(struct ceph_connection *con) if (do_datacrc) crc = ceph_crc32c_page(crc, page, page_offset, ret); - (void) ceph_msg_data_advance(&msg->cursor, (size_t)ret); + (void) ceph_msg_data_advance(cursor, (size_t)ret); } if (do_datacrc) con->in_data_crc = crc; @@ -2278,7 +2274,7 @@ static int read_partial_message(struct ceph_connection *con) int end; int ret; unsigned int front_len, middle_len, data_len; - bool do_datacrc = !con->msgr->nocrc; + bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC); bool need_sign = (con->peer_features & CEPH_FEATURE_MSG_AUTH); u64 seq; u32 crc; @@ -2423,7 +2419,7 @@ static int read_partial_message(struct ceph_connection *con) } if (need_sign && con->ops->check_message_signature && - con->ops->check_message_signature(con, m)) { + con->ops->check_message_signature(m)) { pr_err("read_partial_message %p signature check failed\n", m); return -EBADMSG; } @@ -2438,13 +2434,10 @@ static int read_partial_message(struct ceph_connection *con) */ static void process_message(struct ceph_connection *con) { - struct ceph_msg *msg; + struct ceph_msg *msg = con->in_msg; BUG_ON(con->in_msg->con != con); - con->in_msg->con = NULL; - msg = con->in_msg; con->in_msg = NULL; - con->ops->put(con); /* if first message, set peer_name */ if (con->peer_name.type == 0) @@ -2677,7 +2670,7 @@ more: if (ret <= 0) { switch (ret) { case -EBADMSG: - con->error_msg = "bad crc"; + con->error_msg = "bad crc/signature"; /* fall through */ case -EBADE: ret = -EIO; @@ -2918,10 +2911,8 @@ static void con_fault(struct ceph_connection *con) if (con->in_msg) { BUG_ON(con->in_msg->con != con); - con->in_msg->con = NULL; ceph_msg_put(con->in_msg); con->in_msg = NULL; - con->ops->put(con); } /* Requeue anything that hasn't been acked */ @@ -2952,15 +2943,8 @@ static void con_fault(struct ceph_connection *con) * initialize a new messenger instance */ void ceph_messenger_init(struct ceph_messenger *msgr, - struct ceph_entity_addr *myaddr, - u64 supported_features, - u64 required_features, - bool nocrc, - bool tcp_nodelay) + struct ceph_entity_addr *myaddr) { - msgr->supported_features = supported_features; - msgr->required_features = required_features; - spin_lock_init(&msgr->global_seq_lock); if (myaddr) @@ -2970,8 +2954,6 @@ void ceph_messenger_init(struct ceph_messenger *msgr, msgr->inst.addr.type = 0; get_random_bytes(&msgr->inst.addr.nonce, sizeof(msgr->inst.addr.nonce)); encode_my_addr(msgr); - msgr->nocrc = nocrc; - msgr->tcp_nodelay = tcp_nodelay; atomic_set(&msgr->stopping, 0); write_pnet(&msgr->net, get_net(current->nsproxy->net_ns)); @@ -2986,6 +2968,15 @@ void ceph_messenger_fini(struct ceph_messenger *msgr) } EXPORT_SYMBOL(ceph_messenger_fini); +static void msg_con_set(struct ceph_msg *msg, struct ceph_connection *con) +{ + if (msg->con) + msg->con->ops->put(msg->con); + + msg->con = con ? con->ops->get(con) : NULL; + BUG_ON(msg->con != con); +} + static void clear_standby(struct ceph_connection *con) { /* come back from STANDBY? */ @@ -3017,9 +3008,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) return; } - BUG_ON(msg->con != NULL); - msg->con = con->ops->get(con); - BUG_ON(msg->con == NULL); + msg_con_set(msg, con); BUG_ON(!list_empty(&msg->list_head)); list_add_tail(&msg->list_head, &con->out_queue); @@ -3047,16 +3036,15 @@ void ceph_msg_revoke(struct ceph_msg *msg) { struct ceph_connection *con = msg->con; - if (!con) + if (!con) { + dout("%s msg %p null con\n", __func__, msg); return; /* Message not in our possession */ + } mutex_lock(&con->mutex); if (!list_empty(&msg->list_head)) { dout("%s %p msg %p - was on queue\n", __func__, con, msg); list_del_init(&msg->list_head); - BUG_ON(msg->con == NULL); - msg->con->ops->put(msg->con); - msg->con = NULL; msg->hdr.seq = 0; ceph_msg_put(msg); @@ -3080,16 +3068,13 @@ void ceph_msg_revoke(struct ceph_msg *msg) */ void ceph_msg_revoke_incoming(struct ceph_msg *msg) { - struct ceph_connection *con; + struct ceph_connection *con = msg->con; - BUG_ON(msg == NULL); - if (!msg->con) { + if (!con) { dout("%s msg %p null con\n", __func__, msg); - return; /* Message not in our possession */ } - con = msg->con; mutex_lock(&con->mutex); if (con->in_msg == msg) { unsigned int front_len = le32_to_cpu(con->in_hdr.front_len); @@ -3335,9 +3320,8 @@ static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip) } if (msg) { BUG_ON(*skip); + msg_con_set(msg, con); con->in_msg = msg; - con->in_msg->con = con->ops->get(con); - BUG_ON(con->in_msg->con == NULL); } else { /* * Null message pointer means either we should skip @@ -3384,6 +3368,8 @@ static void ceph_msg_release(struct kref *kref) dout("%s %p\n", __func__, m); WARN_ON(!list_empty(&m->list_head)); + msg_con_set(m, NULL); + /* drop middle, data, if any */ if (m->middle) { ceph_buffer_put(m->middle); diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index f79ccac6699f..f8f235930d88 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -120,11 +120,13 @@ static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data, } #endif /* CONFIG_BLOCK */ -#define osd_req_op_data(oreq, whch, typ, fld) \ - ({ \ - BUG_ON(whch >= (oreq)->r_num_ops); \ - &(oreq)->r_ops[whch].typ.fld; \ - }) +#define osd_req_op_data(oreq, whch, typ, fld) \ +({ \ + struct ceph_osd_request *__oreq = (oreq); \ + unsigned int __whch = (whch); \ + BUG_ON(__whch >= __oreq->r_num_ops); \ + &__oreq->r_ops[__whch].typ.fld; \ +}) static struct ceph_osd_data * osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which) @@ -1750,8 +1752,7 @@ static void complete_request(struct ceph_osd_request *req) * handle osd op reply. either call the callback if it is specified, * or do the completion to wake up the waiting thread. */ -static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, - struct ceph_connection *con) +static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg) { void *p, *end; struct ceph_osd_request *req; @@ -2807,7 +2808,7 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) ceph_osdc_handle_map(osdc, msg); break; case CEPH_MSG_OSD_OPREPLY: - handle_reply(osdc, msg, con); + handle_reply(osdc, msg); break; case CEPH_MSG_WATCH_NOTIFY: handle_watch_notify(osdc, msg); @@ -2849,9 +2850,6 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, goto out; } - if (req->r_reply->con) - dout("%s revoking msg %p from old con %p\n", __func__, - req->r_reply, req->r_reply->con); ceph_msg_revoke_incoming(req->r_reply); if (front_len > req->r_reply->front_alloc_len) { @@ -2978,17 +2976,19 @@ static int invalidate_authorizer(struct ceph_connection *con) return ceph_monc_validate_auth(&osdc->client->monc); } -static int sign_message(struct ceph_connection *con, struct ceph_msg *msg) +static int osd_sign_message(struct ceph_msg *msg) { - struct ceph_osd *o = con->private; + struct ceph_osd *o = msg->con->private; struct ceph_auth_handshake *auth = &o->o_auth; + return ceph_auth_sign_message(auth, msg); } -static int check_message_signature(struct ceph_connection *con, struct ceph_msg *msg) +static int osd_check_message_signature(struct ceph_msg *msg) { - struct ceph_osd *o = con->private; + struct ceph_osd *o = msg->con->private; struct ceph_auth_handshake *auth = &o->o_auth; + return ceph_auth_check_message_signature(auth, msg); } @@ -3000,7 +3000,7 @@ static const struct ceph_connection_operations osd_con_ops = { .verify_authorizer_reply = verify_authorizer_reply, .invalidate_authorizer = invalidate_authorizer, .alloc_msg = alloc_msg, - .sign_message = sign_message, - .check_message_signature = check_message_signature, + .sign_message = osd_sign_message, + .check_message_signature = osd_check_message_signature, .fault = osd_reset, }; |