summaryrefslogtreecommitdiff
path: root/fs/ceph
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ceph')
-rw-r--r--fs/ceph/addr.c2
-rw-r--r--fs/ceph/cache.c3
-rw-r--r--fs/ceph/caps.c27
-rw-r--r--fs/ceph/dir.c11
-rw-r--r--fs/ceph/inode.c49
-rw-r--r--fs/ceph/mds_client.c61
-rw-r--r--fs/ceph/mds_client.h1
-rw-r--r--fs/ceph/super.h8
8 files changed, 121 insertions, 41 deletions
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 6df8bd481425..1e561c059539 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -216,7 +216,7 @@ static int readpage_nounlock(struct file *filp, struct page *page)
}
SetPageUptodate(page);
- if (err == 0)
+ if (err >= 0)
ceph_readpage_to_fscache(inode, page);
out:
diff --git a/fs/ceph/cache.c b/fs/ceph/cache.c
index 7db2e6ca4b8f..8c44fdd4e1c3 100644
--- a/fs/ceph/cache.c
+++ b/fs/ceph/cache.c
@@ -324,6 +324,9 @@ void ceph_invalidate_fscache_page(struct inode* inode, struct page *page)
{
struct ceph_inode_info *ci = ceph_inode(inode);
+ if (!PageFsCache(page))
+ return;
+
fscache_wait_on_page_write(ci->fscache, page);
fscache_uncache_page(ci->fscache, page);
}
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 13976c33332e..3c0a4bd74996 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -897,7 +897,7 @@ static int __ceph_is_any_caps(struct ceph_inode_info *ci)
* caller should hold i_ceph_lock.
* caller will not hold session s_mutex if called from destroy_inode.
*/
-void __ceph_remove_cap(struct ceph_cap *cap)
+void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
{
struct ceph_mds_session *session = cap->session;
struct ceph_inode_info *ci = cap->ci;
@@ -909,6 +909,16 @@ void __ceph_remove_cap(struct ceph_cap *cap)
/* remove from session list */
spin_lock(&session->s_cap_lock);
+ /*
+ * s_cap_reconnect is protected by s_cap_lock. no one changes
+ * s_cap_gen while session is in the reconnect state.
+ */
+ if (queue_release &&
+ (!session->s_cap_reconnect ||
+ cap->cap_gen == session->s_cap_gen))
+ __queue_cap_release(session, ci->i_vino.ino, cap->cap_id,
+ cap->mseq, cap->issue_seq);
+
if (session->s_cap_iterator == cap) {
/* not yet, we are iterating over this very cap */
dout("__ceph_remove_cap delaying %p removal from session %p\n",
@@ -1023,7 +1033,6 @@ void __queue_cap_release(struct ceph_mds_session *session,
struct ceph_mds_cap_release *head;
struct ceph_mds_cap_item *item;
- spin_lock(&session->s_cap_lock);
BUG_ON(!session->s_num_cap_releases);
msg = list_first_entry(&session->s_cap_releases,
struct ceph_msg, list_head);
@@ -1052,7 +1061,6 @@ void __queue_cap_release(struct ceph_mds_session *session,
(int)CEPH_CAPS_PER_RELEASE,
(int)msg->front.iov_len);
}
- spin_unlock(&session->s_cap_lock);
}
/*
@@ -1067,12 +1075,8 @@ void ceph_queue_caps_release(struct inode *inode)
p = rb_first(&ci->i_caps);
while (p) {
struct ceph_cap *cap = rb_entry(p, struct ceph_cap, ci_node);
- struct ceph_mds_session *session = cap->session;
-
- __queue_cap_release(session, ceph_ino(inode), cap->cap_id,
- cap->mseq, cap->issue_seq);
p = rb_next(p);
- __ceph_remove_cap(cap);
+ __ceph_remove_cap(cap, true);
}
}
@@ -2791,7 +2795,7 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
}
spin_unlock(&mdsc->cap_dirty_lock);
}
- __ceph_remove_cap(cap);
+ __ceph_remove_cap(cap, false);
}
/* else, we already released it */
@@ -2931,9 +2935,12 @@ void ceph_handle_caps(struct ceph_mds_session *session,
if (!inode) {
dout(" i don't have ino %llx\n", vino.ino);
- if (op == CEPH_CAP_OP_IMPORT)
+ if (op == CEPH_CAP_OP_IMPORT) {
+ spin_lock(&session->s_cap_lock);
__queue_cap_release(session, vino.ino, cap_id,
mseq, seq);
+ spin_unlock(&session->s_cap_lock);
+ }
goto flush_cap_releases;
}
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index 868b61d56cac..2a0bcaeb189a 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -352,8 +352,18 @@ more:
}
/* note next offset and last dentry name */
+ rinfo = &req->r_reply_info;
+ if (le32_to_cpu(rinfo->dir_dir->frag) != frag) {
+ frag = le32_to_cpu(rinfo->dir_dir->frag);
+ if (ceph_frag_is_leftmost(frag))
+ fi->next_offset = 2;
+ else
+ fi->next_offset = 0;
+ off = fi->next_offset;
+ }
fi->offset = fi->next_offset;
fi->last_readdir = req;
+ fi->frag = frag;
if (req->r_reply_info.dir_end) {
kfree(fi->last_name);
@@ -363,7 +373,6 @@ more:
else
fi->next_offset = 0;
} else {
- rinfo = &req->r_reply_info;
err = note_last_dentry(fi,
rinfo->dir_dname[rinfo->dir_nr-1],
rinfo->dir_dname_len[rinfo->dir_nr-1]);
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index 8549a48115f7..9a8e396aed89 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -577,6 +577,8 @@ static int fill_inode(struct inode *inode,
int issued = 0, implemented;
struct timespec mtime, atime, ctime;
u32 nsplits;
+ struct ceph_inode_frag *frag;
+ struct rb_node *rb_node;
struct ceph_buffer *xattr_blob = NULL;
int err = 0;
int queue_trunc = 0;
@@ -751,15 +753,38 @@ no_change:
/* FIXME: move me up, if/when version reflects fragtree changes */
nsplits = le32_to_cpu(info->fragtree.nsplits);
mutex_lock(&ci->i_fragtree_mutex);
+ rb_node = rb_first(&ci->i_fragtree);
for (i = 0; i < nsplits; i++) {
u32 id = le32_to_cpu(info->fragtree.splits[i].frag);
- struct ceph_inode_frag *frag = __get_or_create_frag(ci, id);
-
- if (IS_ERR(frag))
- continue;
+ frag = NULL;
+ while (rb_node) {
+ frag = rb_entry(rb_node, struct ceph_inode_frag, node);
+ if (ceph_frag_compare(frag->frag, id) >= 0) {
+ if (frag->frag != id)
+ frag = NULL;
+ else
+ rb_node = rb_next(rb_node);
+ break;
+ }
+ rb_node = rb_next(rb_node);
+ rb_erase(&frag->node, &ci->i_fragtree);
+ kfree(frag);
+ frag = NULL;
+ }
+ if (!frag) {
+ frag = __get_or_create_frag(ci, id);
+ if (IS_ERR(frag))
+ continue;
+ }
frag->split_by = le32_to_cpu(info->fragtree.splits[i].by);
dout(" frag %x split by %d\n", frag->frag, frag->split_by);
}
+ while (rb_node) {
+ frag = rb_entry(rb_node, struct ceph_inode_frag, node);
+ rb_node = rb_next(rb_node);
+ rb_erase(&frag->node, &ci->i_fragtree);
+ kfree(frag);
+ }
mutex_unlock(&ci->i_fragtree_mutex);
/* were we issued a capability? */
@@ -1250,8 +1275,20 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
int err = 0, i;
struct inode *snapdir = NULL;
struct ceph_mds_request_head *rhead = req->r_request->front.iov_base;
- u64 frag = le32_to_cpu(rhead->args.readdir.frag);
struct ceph_dentry_info *di;
+ u64 r_readdir_offset = req->r_readdir_offset;
+ u32 frag = le32_to_cpu(rhead->args.readdir.frag);
+
+ if (rinfo->dir_dir &&
+ le32_to_cpu(rinfo->dir_dir->frag) != frag) {
+ dout("readdir_prepopulate got new frag %x -> %x\n",
+ frag, le32_to_cpu(rinfo->dir_dir->frag));
+ frag = le32_to_cpu(rinfo->dir_dir->frag);
+ if (ceph_frag_is_leftmost(frag))
+ r_readdir_offset = 2;
+ else
+ r_readdir_offset = 0;
+ }
if (req->r_aborted)
return readdir_prepopulate_inodes_only(req, session);
@@ -1315,7 +1352,7 @@ retry_lookup:
}
di = dn->d_fsdata;
- di->offset = ceph_make_fpos(frag, i + req->r_readdir_offset);
+ di->offset = ceph_make_fpos(frag, i + r_readdir_offset);
/* inode */
if (dn->d_inode) {
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index b7bda5d9611d..d90861f45210 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -43,6 +43,7 @@
*/
struct ceph_reconnect_state {
+ int nr_caps;
struct ceph_pagelist *pagelist;
bool flock;
};
@@ -443,6 +444,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
INIT_LIST_HEAD(&s->s_waiting);
INIT_LIST_HEAD(&s->s_unsafe);
s->s_num_cap_releases = 0;
+ s->s_cap_reconnect = 0;
s->s_cap_iterator = NULL;
INIT_LIST_HEAD(&s->s_cap_releases);
INIT_LIST_HEAD(&s->s_cap_releases_done);
@@ -642,6 +644,8 @@ static void __unregister_request(struct ceph_mds_client *mdsc,
req->r_unsafe_dir = NULL;
}
+ complete_all(&req->r_safe_completion);
+
ceph_mdsc_put_request(req);
}
@@ -986,7 +990,7 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
dout("removing cap %p, ci is %p, inode is %p\n",
cap, ci, &ci->vfs_inode);
spin_lock(&ci->i_ceph_lock);
- __ceph_remove_cap(cap);
+ __ceph_remove_cap(cap, false);
if (!__ceph_is_any_real_caps(ci)) {
struct ceph_mds_client *mdsc =
ceph_sb_to_client(inode->i_sb)->mdsc;
@@ -1231,9 +1235,7 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
session->s_trim_caps--;
if (oissued) {
/* we aren't the only cap.. just remove us */
- __queue_cap_release(session, ceph_ino(inode), cap->cap_id,
- cap->mseq, cap->issue_seq);
- __ceph_remove_cap(cap);
+ __ceph_remove_cap(cap, true);
} else {
/* try to drop referring dentries */
spin_unlock(&ci->i_ceph_lock);
@@ -1416,7 +1418,6 @@ static void discard_cap_releases(struct ceph_mds_client *mdsc,
unsigned num;
dout("discard_cap_releases mds%d\n", session->s_mds);
- spin_lock(&session->s_cap_lock);
/* zero out the in-progress message */
msg = list_first_entry(&session->s_cap_releases,
@@ -1443,8 +1444,6 @@ static void discard_cap_releases(struct ceph_mds_client *mdsc,
msg->front.iov_len = sizeof(*head);
list_add(&msg->list_head, &session->s_cap_releases);
}
-
- spin_unlock(&session->s_cap_lock);
}
/*
@@ -1875,8 +1874,11 @@ static int __do_request(struct ceph_mds_client *mdsc,
int mds = -1;
int err = -EAGAIN;
- if (req->r_err || req->r_got_result)
+ if (req->r_err || req->r_got_result) {
+ if (req->r_aborted)
+ __unregister_request(mdsc, req);
goto out;
+ }
if (req->r_timeout &&
time_after_eq(jiffies, req->r_started + req->r_timeout)) {
@@ -2186,7 +2188,6 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
if (head->safe) {
req->r_got_safe = true;
__unregister_request(mdsc, req);
- complete_all(&req->r_safe_completion);
if (req->r_got_unsafe) {
/*
@@ -2238,8 +2239,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session);
if (err == 0) {
if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
- req->r_op == CEPH_MDS_OP_LSSNAP) &&
- rinfo->dir_nr)
+ req->r_op == CEPH_MDS_OP_LSSNAP))
ceph_readdir_prepopulate(req, req->r_session);
ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
}
@@ -2490,6 +2490,7 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
cap->seq = 0; /* reset cap seq */
cap->issue_seq = 0; /* and issue_seq */
cap->mseq = 0; /* and migrate_seq */
+ cap->cap_gen = cap->session->s_cap_gen;
if (recon_state->flock) {
rec.v2.cap_id = cpu_to_le64(cap->cap_id);
@@ -2552,6 +2553,8 @@ encode_again:
} else {
err = ceph_pagelist_append(pagelist, &rec, reclen);
}
+
+ recon_state->nr_caps++;
out_free:
kfree(path);
out_dput:
@@ -2579,6 +2582,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
struct rb_node *p;
int mds = session->s_mds;
int err = -ENOMEM;
+ int s_nr_caps;
struct ceph_pagelist *pagelist;
struct ceph_reconnect_state recon_state;
@@ -2610,20 +2614,38 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
dout("session %p state %s\n", session,
session_state_name(session->s_state));
+ spin_lock(&session->s_gen_ttl_lock);
+ session->s_cap_gen++;
+ spin_unlock(&session->s_gen_ttl_lock);
+
+ spin_lock(&session->s_cap_lock);
+ /*
+ * notify __ceph_remove_cap() that we are composing cap reconnect.
+ * If a cap get released before being added to the cap reconnect,
+ * __ceph_remove_cap() should skip queuing cap release.
+ */
+ session->s_cap_reconnect = 1;
/* drop old cap expires; we're about to reestablish that state */
discard_cap_releases(mdsc, session);
+ spin_unlock(&session->s_cap_lock);
/* traverse this session's caps */
- err = ceph_pagelist_encode_32(pagelist, session->s_nr_caps);
+ s_nr_caps = session->s_nr_caps;
+ err = ceph_pagelist_encode_32(pagelist, s_nr_caps);
if (err)
goto fail;
+ recon_state.nr_caps = 0;
recon_state.pagelist = pagelist;
recon_state.flock = session->s_con.peer_features & CEPH_FEATURE_FLOCK;
err = iterate_session_caps(session, encode_caps_cb, &recon_state);
if (err < 0)
goto fail;
+ spin_lock(&session->s_cap_lock);
+ session->s_cap_reconnect = 0;
+ spin_unlock(&session->s_cap_lock);
+
/*
* snaprealms. we provide mds with the ino, seq (version), and
* parent for all of our realms. If the mds has any newer info,
@@ -2646,11 +2668,18 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
if (recon_state.flock)
reply->hdr.version = cpu_to_le16(2);
- if (pagelist->length) {
- /* set up outbound data if we have any */
- reply->hdr.data_len = cpu_to_le32(pagelist->length);
- ceph_msg_data_add_pagelist(reply, pagelist);
+
+ /* raced with cap release? */
+ if (s_nr_caps != recon_state.nr_caps) {
+ struct page *page = list_first_entry(&pagelist->head,
+ struct page, lru);
+ __le32 *addr = kmap_atomic(page);
+ *addr = cpu_to_le32(recon_state.nr_caps);
+ kunmap_atomic(addr);
}
+
+ reply->hdr.data_len = cpu_to_le32(pagelist->length);
+ ceph_msg_data_add_pagelist(reply, pagelist);
ceph_con_send(&session->s_con, reply);
mutex_unlock(&session->s_mutex);
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index c2a19fbbe517..4c053d099ae4 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -132,6 +132,7 @@ struct ceph_mds_session {
struct list_head s_caps; /* all caps issued by this session */
int s_nr_caps, s_trim_caps;
int s_num_cap_releases;
+ int s_cap_reconnect;
struct list_head s_cap_releases; /* waiting cap_release messages */
struct list_head s_cap_releases_done; /* ready to send */
struct ceph_cap *s_cap_iterator;
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 6014b0a3c405..ef4ac38bb614 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -741,13 +741,7 @@ extern int ceph_add_cap(struct inode *inode,
int fmode, unsigned issued, unsigned wanted,
unsigned cap, unsigned seq, u64 realmino, int flags,
struct ceph_cap_reservation *caps_reservation);
-extern void __ceph_remove_cap(struct ceph_cap *cap);
-static inline void ceph_remove_cap(struct ceph_cap *cap)
-{
- spin_lock(&cap->ci->i_ceph_lock);
- __ceph_remove_cap(cap);
- spin_unlock(&cap->ci->i_ceph_lock);
-}
+extern void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release);
extern void ceph_put_cap(struct ceph_mds_client *mdsc,
struct ceph_cap *cap);