diff options
Diffstat (limited to 'fs/ceph/caps.c')
-rw-r--r-- | fs/ceph/caps.c | 873 |
1 files changed, 466 insertions, 407 deletions
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 6f60d0a3d0f9..99115cae1652 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -40,6 +40,11 @@ * cluster to release server state. */ +static u64 __get_oldest_flush_tid(struct ceph_mds_client *mdsc); +static void __kick_flushing_caps(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session, + struct ceph_inode_info *ci, + u64 oldest_flush_tid); /* * Generate readable cap strings for debugging output. @@ -849,12 +854,14 @@ int __ceph_caps_used(struct ceph_inode_info *ci) */ int __ceph_caps_file_wanted(struct ceph_inode_info *ci) { - int want = 0; - int mode; - for (mode = 0; mode < CEPH_FILE_MODE_NUM; mode++) - if (ci->i_nr_by_mode[mode]) - want |= ceph_caps_for_mode(mode); - return want; + int i, bits = 0; + for (i = 0; i < CEPH_FILE_MODE_BITS; i++) { + if (ci->i_nr_by_mode[i]) + bits |= 1 << i; + } + if (bits == 0) + return 0; + return ceph_caps_for_mode(bits >> 1); } /* @@ -991,7 +998,7 @@ static int send_cap_msg(struct ceph_mds_session *session, u32 seq, u64 flush_tid, u64 oldest_flush_tid, u32 issue_seq, u32 mseq, u64 size, u64 max_size, struct timespec *mtime, struct timespec *atime, - struct timespec *ctime, u64 time_warp_seq, + struct timespec *ctime, u32 time_warp_seq, kuid_t uid, kgid_t gid, umode_t mode, u64 xattr_version, struct ceph_buffer *xattrs_buf, @@ -1116,8 +1123,8 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, struct inode *inode = &ci->vfs_inode; u64 cap_id = cap->cap_id; int held, revoking, dropping, keep; - u64 seq, issue_seq, mseq, time_warp_seq, follows; - u64 size, max_size; + u64 follows, size, max_size; + u32 seq, issue_seq, mseq, time_warp_seq; struct timespec mtime, atime, ctime; int wake = 0; umode_t mode; @@ -1215,6 +1222,22 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, return delayed; } +static inline int __send_flush_snap(struct inode *inode, + struct ceph_mds_session *session, + struct ceph_cap_snap *capsnap, + u32 mseq, u64 oldest_flush_tid) +{ + return send_cap_msg(session, ceph_vino(inode).ino, 0, + CEPH_CAP_OP_FLUSHSNAP, capsnap->issued, 0, + capsnap->dirty, 0, capsnap->cap_flush.tid, + oldest_flush_tid, 0, mseq, capsnap->size, 0, + &capsnap->mtime, &capsnap->atime, + &capsnap->ctime, capsnap->time_warp_seq, + capsnap->uid, capsnap->gid, capsnap->mode, + capsnap->xattr_version, capsnap->xattr_blob, + capsnap->follows, capsnap->inline_data); +} + /* * When a snapshot is taken, clients accumulate dirty metadata on * inodes with capabilities in ceph_cap_snaps to describe the file @@ -1222,37 +1245,22 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, * asynchronously back to the MDS once sync writes complete and dirty * data is written out. * - * Unless @kick is true, skip cap_snaps that were already sent to - * the MDS (i.e., during this session). - * * Called under i_ceph_lock. Takes s_mutex as needed. */ -void __ceph_flush_snaps(struct ceph_inode_info *ci, - struct ceph_mds_session **psession, - int kick) +static void __ceph_flush_snaps(struct ceph_inode_info *ci, + struct ceph_mds_session *session) __releases(ci->i_ceph_lock) __acquires(ci->i_ceph_lock) { struct inode *inode = &ci->vfs_inode; - int mds; + struct ceph_mds_client *mdsc = session->s_mdsc; struct ceph_cap_snap *capsnap; - u32 mseq; - struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; - struct ceph_mds_session *session = NULL; /* if session != NULL, we hold - session->s_mutex */ - u64 next_follows = 0; /* keep track of how far we've gotten through the - i_cap_snaps list, and skip these entries next time - around to avoid an infinite loop */ + u64 oldest_flush_tid = 0; + u64 first_tid = 1, last_tid = 0; - if (psession) - session = *psession; + dout("__flush_snaps %p session %p\n", inode, session); - dout("__flush_snaps %p\n", inode); -retry: list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) { - /* avoid an infiniute loop after retry */ - if (capsnap->follows < next_follows) - continue; /* * we need to wait for sync writes to complete and for dirty * pages to be written out. @@ -1263,97 +1271,129 @@ retry: /* should be removed by ceph_try_drop_cap_snap() */ BUG_ON(!capsnap->need_flush); - /* pick mds, take s_mutex */ - if (ci->i_auth_cap == NULL) { - dout("no auth cap (migrating?), doing nothing\n"); - goto out; - } - /* only flush each capsnap once */ - if (!kick && !list_empty(&capsnap->flushing_item)) { - dout("already flushed %p, skipping\n", capsnap); + if (capsnap->cap_flush.tid > 0) { + dout(" already flushed %p, skipping\n", capsnap); continue; } - mds = ci->i_auth_cap->session->s_mds; - mseq = ci->i_auth_cap->mseq; + spin_lock(&mdsc->cap_dirty_lock); + capsnap->cap_flush.tid = ++mdsc->last_cap_flush_tid; + list_add_tail(&capsnap->cap_flush.g_list, + &mdsc->cap_flush_list); + if (oldest_flush_tid == 0) + oldest_flush_tid = __get_oldest_flush_tid(mdsc); + if (list_empty(&ci->i_flushing_item)) { + list_add_tail(&ci->i_flushing_item, + &session->s_cap_flushing); + } + spin_unlock(&mdsc->cap_dirty_lock); + + list_add_tail(&capsnap->cap_flush.i_list, + &ci->i_cap_flush_list); - if (session && session->s_mds != mds) { - dout("oops, wrong session %p mutex\n", session); - if (kick) - goto out; + if (first_tid == 1) + first_tid = capsnap->cap_flush.tid; + last_tid = capsnap->cap_flush.tid; + } - mutex_unlock(&session->s_mutex); - ceph_put_mds_session(session); - session = NULL; + ci->i_ceph_flags &= ~CEPH_I_FLUSH_SNAPS; + + while (first_tid <= last_tid) { + struct ceph_cap *cap = ci->i_auth_cap; + struct ceph_cap_flush *cf; + int ret; + + if (!(cap && cap->session == session)) { + dout("__flush_snaps %p auth cap %p not mds%d, " + "stop\n", inode, cap, session->s_mds); + break; } - if (!session) { - spin_unlock(&ci->i_ceph_lock); - mutex_lock(&mdsc->mutex); - session = __ceph_lookup_mds_session(mdsc, mds); - mutex_unlock(&mdsc->mutex); - if (session) { - dout("inverting session/ino locks on %p\n", - session); - mutex_lock(&session->s_mutex); + + ret = -ENOENT; + list_for_each_entry(cf, &ci->i_cap_flush_list, i_list) { + if (cf->tid >= first_tid) { + ret = 0; + break; } - /* - * if session == NULL, we raced against a cap - * deletion or migration. retry, and we'll - * get a better @mds value next time. - */ - spin_lock(&ci->i_ceph_lock); - goto retry; } + if (ret < 0) + break; - spin_lock(&mdsc->cap_dirty_lock); - capsnap->flush_tid = ++mdsc->last_cap_flush_tid; - spin_unlock(&mdsc->cap_dirty_lock); + first_tid = cf->tid + 1; + capsnap = container_of(cf, struct ceph_cap_snap, cap_flush); atomic_inc(&capsnap->nref); - if (list_empty(&capsnap->flushing_item)) - list_add_tail(&capsnap->flushing_item, - &session->s_cap_snaps_flushing); spin_unlock(&ci->i_ceph_lock); - dout("flush_snaps %p cap_snap %p follows %lld tid %llu\n", - inode, capsnap, capsnap->follows, capsnap->flush_tid); - send_cap_msg(session, ceph_vino(inode).ino, 0, - CEPH_CAP_OP_FLUSHSNAP, capsnap->issued, 0, - capsnap->dirty, 0, capsnap->flush_tid, 0, - 0, mseq, capsnap->size, 0, - &capsnap->mtime, &capsnap->atime, - &capsnap->ctime, capsnap->time_warp_seq, - capsnap->uid, capsnap->gid, capsnap->mode, - capsnap->xattr_version, capsnap->xattr_blob, - capsnap->follows, capsnap->inline_data); - - next_follows = capsnap->follows + 1; - ceph_put_cap_snap(capsnap); + dout("__flush_snaps %p capsnap %p tid %llu %s\n", + inode, capsnap, cf->tid, ceph_cap_string(capsnap->dirty)); + + ret = __send_flush_snap(inode, session, capsnap, cap->mseq, + oldest_flush_tid); + if (ret < 0) { + pr_err("__flush_snaps: error sending cap flushsnap, " + "ino (%llx.%llx) tid %llu follows %llu\n", + ceph_vinop(inode), cf->tid, capsnap->follows); + } + ceph_put_cap_snap(capsnap); spin_lock(&ci->i_ceph_lock); - goto retry; } +} - /* we flushed them all; remove this inode from the queue */ - spin_lock(&mdsc->snap_flush_lock); - list_del_init(&ci->i_snap_flush_item); - spin_unlock(&mdsc->snap_flush_lock); +void ceph_flush_snaps(struct ceph_inode_info *ci, + struct ceph_mds_session **psession) +{ + struct inode *inode = &ci->vfs_inode; + struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; + struct ceph_mds_session *session = *psession; + int mds; + dout("ceph_flush_snaps %p\n", inode); +retry: + spin_lock(&ci->i_ceph_lock); + if (!(ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS)) { + dout(" no capsnap needs flush, doing nothing\n"); + goto out; + } + if (!ci->i_auth_cap) { + dout(" no auth cap (migrating?), doing nothing\n"); + goto out; + } -out: - if (psession) - *psession = session; - else if (session) { + mds = ci->i_auth_cap->session->s_mds; + if (session && session->s_mds != mds) { + dout(" oops, wrong session %p mutex\n", session); mutex_unlock(&session->s_mutex); ceph_put_mds_session(session); + session = NULL; + } + if (!session) { + spin_unlock(&ci->i_ceph_lock); + mutex_lock(&mdsc->mutex); + session = __ceph_lookup_mds_session(mdsc, mds); + mutex_unlock(&mdsc->mutex); + if (session) { + dout(" inverting session/ino locks on %p\n", session); + mutex_lock(&session->s_mutex); + } + goto retry; } -} -static void ceph_flush_snaps(struct ceph_inode_info *ci) -{ - spin_lock(&ci->i_ceph_lock); - __ceph_flush_snaps(ci, NULL, 0); + __ceph_flush_snaps(ci, session); +out: spin_unlock(&ci->i_ceph_lock); + + if (psession) { + *psession = session; + } else { + mutex_unlock(&session->s_mutex); + ceph_put_mds_session(session); + } + /* we flushed them all; remove this inode from the queue */ + spin_lock(&mdsc->snap_flush_lock); + list_del_init(&ci->i_snap_flush_item); + spin_unlock(&mdsc->snap_flush_lock); } /* @@ -1411,52 +1451,6 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask, return dirty; } -static void __add_cap_flushing_to_inode(struct ceph_inode_info *ci, - struct ceph_cap_flush *cf) -{ - struct rb_node **p = &ci->i_cap_flush_tree.rb_node; - struct rb_node *parent = NULL; - struct ceph_cap_flush *other = NULL; - - while (*p) { - parent = *p; - other = rb_entry(parent, struct ceph_cap_flush, i_node); - - if (cf->tid < other->tid) - p = &(*p)->rb_left; - else if (cf->tid > other->tid) - p = &(*p)->rb_right; - else - BUG(); - } - - rb_link_node(&cf->i_node, parent, p); - rb_insert_color(&cf->i_node, &ci->i_cap_flush_tree); -} - -static void __add_cap_flushing_to_mdsc(struct ceph_mds_client *mdsc, - struct ceph_cap_flush *cf) -{ - struct rb_node **p = &mdsc->cap_flush_tree.rb_node; - struct rb_node *parent = NULL; - struct ceph_cap_flush *other = NULL; - - while (*p) { - parent = *p; - other = rb_entry(parent, struct ceph_cap_flush, g_node); - - if (cf->tid < other->tid) - p = &(*p)->rb_left; - else if (cf->tid > other->tid) - p = &(*p)->rb_right; - else - BUG(); - } - - rb_link_node(&cf->g_node, parent, p); - rb_insert_color(&cf->g_node, &mdsc->cap_flush_tree); -} - struct ceph_cap_flush *ceph_alloc_cap_flush(void) { return kmem_cache_alloc(ceph_cap_flush_cachep, GFP_KERNEL); @@ -1470,23 +1464,54 @@ void ceph_free_cap_flush(struct ceph_cap_flush *cf) static u64 __get_oldest_flush_tid(struct ceph_mds_client *mdsc) { - struct rb_node *n = rb_first(&mdsc->cap_flush_tree); - if (n) { + if (!list_empty(&mdsc->cap_flush_list)) { struct ceph_cap_flush *cf = - rb_entry(n, struct ceph_cap_flush, g_node); + list_first_entry(&mdsc->cap_flush_list, + struct ceph_cap_flush, g_list); return cf->tid; } return 0; } /* + * Remove cap_flush from the mdsc's or inode's flushing cap list. + * Return true if caller needs to wake up flush waiters. + */ +static bool __finish_cap_flush(struct ceph_mds_client *mdsc, + struct ceph_inode_info *ci, + struct ceph_cap_flush *cf) +{ + struct ceph_cap_flush *prev; + bool wake = cf->wake; + if (mdsc) { + /* are there older pending cap flushes? */ + if (wake && cf->g_list.prev != &mdsc->cap_flush_list) { + prev = list_prev_entry(cf, g_list); + prev->wake = true; + wake = false; + } + list_del(&cf->g_list); + } else if (ci) { + if (wake && cf->i_list.prev != &ci->i_cap_flush_list) { + prev = list_prev_entry(cf, i_list); + prev->wake = true; + wake = false; + } + list_del(&cf->i_list); + } else { + BUG_ON(1); + } + return wake; +} + +/* * Add dirty inode to the flushing list. Assigned a seq number so we * can wait for caps to flush without starving. * * Called under i_ceph_lock. */ static int __mark_caps_flushing(struct inode *inode, - struct ceph_mds_session *session, + struct ceph_mds_session *session, bool wake, u64 *flush_tid, u64 *oldest_flush_tid) { struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; @@ -1509,26 +1534,22 @@ static int __mark_caps_flushing(struct inode *inode, swap(cf, ci->i_prealloc_cap_flush); cf->caps = flushing; + cf->wake = wake; spin_lock(&mdsc->cap_dirty_lock); list_del_init(&ci->i_dirty_item); cf->tid = ++mdsc->last_cap_flush_tid; - __add_cap_flushing_to_mdsc(mdsc, cf); + list_add_tail(&cf->g_list, &mdsc->cap_flush_list); *oldest_flush_tid = __get_oldest_flush_tid(mdsc); if (list_empty(&ci->i_flushing_item)) { list_add_tail(&ci->i_flushing_item, &session->s_cap_flushing); mdsc->num_cap_flushing++; - dout(" inode %p now flushing tid %llu\n", inode, cf->tid); - } else { - list_move_tail(&ci->i_flushing_item, &session->s_cap_flushing); - dout(" inode %p now flushing (more) tid %llu\n", - inode, cf->tid); } spin_unlock(&mdsc->cap_dirty_lock); - __add_cap_flushing_to_inode(ci, cf); + list_add_tail(&cf->i_list, &ci->i_cap_flush_list); *flush_tid = cf->tid; return flushing; @@ -1583,10 +1604,11 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags, int mds = -1; /* keep track of how far we've gone through i_caps list to avoid an infinite loop on retry */ struct rb_node *p; - int tried_invalidate = 0; - int delayed = 0, sent = 0, force_requeue = 0, num; - int queue_invalidate = 0; - int is_delayed = flags & CHECK_CAPS_NODELAY; + int delayed = 0, sent = 0, num; + bool is_delayed = flags & CHECK_CAPS_NODELAY; + bool queue_invalidate = false; + bool force_requeue = false; + bool tried_invalidate = false; /* if we are unmounting, flush any unused caps immediately. */ if (mdsc->stopping) @@ -1597,9 +1619,6 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags, if (ci->i_ceph_flags & CEPH_I_FLUSH) flags |= CHECK_CAPS_FLUSH; - /* flush snaps first time around only */ - if (!list_empty(&ci->i_cap_snaps)) - __ceph_flush_snaps(ci, &session, 0); goto retry_locked; retry: spin_lock(&ci->i_ceph_lock); @@ -1666,17 +1685,17 @@ retry_locked: if (revoking & (CEPH_CAP_FILE_CACHE| CEPH_CAP_FILE_LAZYIO)) { dout("check_caps queuing invalidate\n"); - queue_invalidate = 1; + queue_invalidate = true; ci->i_rdcache_revoking = ci->i_rdcache_gen; } else { dout("check_caps failed to invalidate pages\n"); /* we failed to invalidate pages. check these caps again later. */ - force_requeue = 1; + force_requeue = true; __cap_set_timeouts(mdsc, ci); } } - tried_invalidate = 1; + tried_invalidate = true; goto retry_locked; } @@ -1720,10 +1739,15 @@ retry_locked: } } /* flush anything dirty? */ - if (cap == ci->i_auth_cap && (flags & CHECK_CAPS_FLUSH) && - ci->i_dirty_caps) { - dout("flushing dirty caps\n"); - goto ack; + if (cap == ci->i_auth_cap) { + if ((flags & CHECK_CAPS_FLUSH) && ci->i_dirty_caps) { + dout("flushing dirty caps\n"); + goto ack; + } + if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS) { + dout("flushing snap caps\n"); + goto ack; + } } /* completed revocation? going down and there are no caps? */ @@ -1782,6 +1806,26 @@ ack: goto retry; } } + + /* kick flushing and flush snaps before sending normal + * cap message */ + if (cap == ci->i_auth_cap && + (ci->i_ceph_flags & + (CEPH_I_KICK_FLUSH | CEPH_I_FLUSH_SNAPS))) { + if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH) { + spin_lock(&mdsc->cap_dirty_lock); + oldest_flush_tid = __get_oldest_flush_tid(mdsc); + spin_unlock(&mdsc->cap_dirty_lock); + __kick_flushing_caps(mdsc, session, ci, + oldest_flush_tid); + ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH; + } + if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS) + __ceph_flush_snaps(ci, session); + + goto retry_locked; + } + /* take snap_rwsem after session mutex */ if (!took_snap_rwsem) { if (down_read_trylock(&mdsc->snap_rwsem) == 0) { @@ -1796,7 +1840,7 @@ ack: } if (cap == ci->i_auth_cap && ci->i_dirty_caps) { - flushing = __mark_caps_flushing(inode, session, + flushing = __mark_caps_flushing(inode, session, false, &flush_tid, &oldest_flush_tid); } else { @@ -1822,7 +1866,7 @@ ack: * otherwise cancel. */ if (delayed && is_delayed) - force_requeue = 1; /* __send_cap delayed release; requeue */ + force_requeue = true; /* __send_cap delayed release; requeue */ if (!delayed && !is_delayed) __cap_delay_cancel(mdsc, ci); else if (!is_delayed || force_requeue) @@ -1873,8 +1917,8 @@ retry: if (cap->session->s_state < CEPH_MDS_SESSION_OPEN) goto out; - flushing = __mark_caps_flushing(inode, session, &flush_tid, - &oldest_flush_tid); + flushing = __mark_caps_flushing(inode, session, true, + &flush_tid, &oldest_flush_tid); /* __send_cap drops i_ceph_lock */ delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, used, want, @@ -1887,10 +1931,11 @@ retry: spin_unlock(&ci->i_ceph_lock); } } else { - struct rb_node *n = rb_last(&ci->i_cap_flush_tree); - if (n) { + if (!list_empty(&ci->i_cap_flush_list)) { struct ceph_cap_flush *cf = - rb_entry(n, struct ceph_cap_flush, i_node); + list_last_entry(&ci->i_cap_flush_list, + struct ceph_cap_flush, i_list); + cf->wake = true; flush_tid = cf->tid; } flushing = ci->i_flushing_caps; @@ -1910,14 +1955,13 @@ out: static int caps_are_flushed(struct inode *inode, u64 flush_tid) { struct ceph_inode_info *ci = ceph_inode(inode); - struct ceph_cap_flush *cf; - struct rb_node *n; int ret = 1; spin_lock(&ci->i_ceph_lock); - n = rb_first(&ci->i_cap_flush_tree); - if (n) { - cf = rb_entry(n, struct ceph_cap_flush, i_node); + if (!list_empty(&ci->i_cap_flush_list)) { + struct ceph_cap_flush * cf = + list_first_entry(&ci->i_cap_flush_list, + struct ceph_cap_flush, i_list); if (cf->tid <= flush_tid) ret = 0; } @@ -1926,53 +1970,6 @@ static int caps_are_flushed(struct inode *inode, u64 flush_tid) } /* - * Wait on any unsafe replies for the given inode. First wait on the - * newest request, and make that the upper bound. Then, if there are - * more requests, keep waiting on the oldest as long as it is still older - * than the original request. - */ -static void sync_write_wait(struct inode *inode) -{ - struct ceph_inode_info *ci = ceph_inode(inode); - struct list_head *head = &ci->i_unsafe_writes; - struct ceph_osd_request *req; - u64 last_tid; - - if (!S_ISREG(inode->i_mode)) - return; - - spin_lock(&ci->i_unsafe_lock); - if (list_empty(head)) - goto out; - - /* set upper bound as _last_ entry in chain */ - req = list_last_entry(head, struct ceph_osd_request, - r_unsafe_item); - last_tid = req->r_tid; - - do { - ceph_osdc_get_request(req); - spin_unlock(&ci->i_unsafe_lock); - dout("sync_write_wait on tid %llu (until %llu)\n", - req->r_tid, last_tid); - wait_for_completion(&req->r_safe_completion); - spin_lock(&ci->i_unsafe_lock); - ceph_osdc_put_request(req); - - /* - * from here on look at first entry in chain, since we - * only want to wait for anything older than last_tid - */ - if (list_empty(head)) - break; - req = list_first_entry(head, struct ceph_osd_request, - r_unsafe_item); - } while (req->r_tid < last_tid); -out: - spin_unlock(&ci->i_unsafe_lock); -} - -/* * wait for any unsafe requests to complete. */ static int unsafe_request_wait(struct inode *inode) @@ -2024,7 +2021,8 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync) int dirty; dout("fsync %p%s\n", inode, datasync ? " datasync" : ""); - sync_write_wait(inode); + + ceph_sync_write_wait(inode); ret = filemap_write_and_wait_range(inode->i_mapping, start, end); if (ret < 0) @@ -2087,87 +2085,74 @@ int ceph_write_inode(struct inode *inode, struct writeback_control *wbc) return err; } -/* - * After a recovering MDS goes active, we need to resend any caps - * we were flushing. - * - * Caller holds session->s_mutex. - */ -static void kick_flushing_capsnaps(struct ceph_mds_client *mdsc, - struct ceph_mds_session *session) -{ - struct ceph_cap_snap *capsnap; - - dout("kick_flushing_capsnaps mds%d\n", session->s_mds); - list_for_each_entry(capsnap, &session->s_cap_snaps_flushing, - flushing_item) { - struct ceph_inode_info *ci = capsnap->ci; - struct inode *inode = &ci->vfs_inode; - struct ceph_cap *cap; - - spin_lock(&ci->i_ceph_lock); - cap = ci->i_auth_cap; - if (cap && cap->session == session) { - dout("kick_flushing_caps %p cap %p capsnap %p\n", inode, - cap, capsnap); - __ceph_flush_snaps(ci, &session, 1); - } else { - pr_err("%p auth cap %p not mds%d ???\n", inode, - cap, session->s_mds); - } - spin_unlock(&ci->i_ceph_lock); - } -} - -static int __kick_flushing_caps(struct ceph_mds_client *mdsc, - struct ceph_mds_session *session, - struct ceph_inode_info *ci) +static void __kick_flushing_caps(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session, + struct ceph_inode_info *ci, + u64 oldest_flush_tid) + __releases(ci->i_ceph_lock) + __acquires(ci->i_ceph_lock) { struct inode *inode = &ci->vfs_inode; struct ceph_cap *cap; struct ceph_cap_flush *cf; - struct rb_node *n; - int delayed = 0; + int ret; u64 first_tid = 0; - u64 oldest_flush_tid; - spin_lock(&mdsc->cap_dirty_lock); - oldest_flush_tid = __get_oldest_flush_tid(mdsc); - spin_unlock(&mdsc->cap_dirty_lock); + list_for_each_entry(cf, &ci->i_cap_flush_list, i_list) { + if (cf->tid < first_tid) + continue; - while (true) { - spin_lock(&ci->i_ceph_lock); cap = ci->i_auth_cap; if (!(cap && cap->session == session)) { - pr_err("%p auth cap %p not mds%d ???\n", inode, - cap, session->s_mds); - spin_unlock(&ci->i_ceph_lock); + pr_err("%p auth cap %p not mds%d ???\n", + inode, cap, session->s_mds); break; } - for (n = rb_first(&ci->i_cap_flush_tree); n; n = rb_next(n)) { - cf = rb_entry(n, struct ceph_cap_flush, i_node); - if (cf->tid >= first_tid) - break; - } - if (!n) { + first_tid = cf->tid + 1; + + if (cf->caps) { + dout("kick_flushing_caps %p cap %p tid %llu %s\n", + inode, cap, cf->tid, ceph_cap_string(cf->caps)); + ci->i_ceph_flags |= CEPH_I_NODELAY; + ret = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, + __ceph_caps_used(ci), + __ceph_caps_wanted(ci), + cap->issued | cap->implemented, + cf->caps, cf->tid, oldest_flush_tid); + if (ret) { + pr_err("kick_flushing_caps: error sending " + "cap flush, ino (%llx.%llx) " + "tid %llu flushing %s\n", + ceph_vinop(inode), cf->tid, + ceph_cap_string(cf->caps)); + } + } else { + struct ceph_cap_snap *capsnap = + container_of(cf, struct ceph_cap_snap, + cap_flush); + dout("kick_flushing_caps %p capsnap %p tid %llu %s\n", + inode, capsnap, cf->tid, + ceph_cap_string(capsnap->dirty)); + + atomic_inc(&capsnap->nref); spin_unlock(&ci->i_ceph_lock); - break; - } - cf = rb_entry(n, struct ceph_cap_flush, i_node); + ret = __send_flush_snap(inode, session, capsnap, cap->mseq, + oldest_flush_tid); + if (ret < 0) { + pr_err("kick_flushing_caps: error sending " + "cap flushsnap, ino (%llx.%llx) " + "tid %llu follows %llu\n", + ceph_vinop(inode), cf->tid, + capsnap->follows); + } - first_tid = cf->tid + 1; + ceph_put_cap_snap(capsnap); + } - dout("kick_flushing_caps %p cap %p tid %llu %s\n", inode, - cap, cf->tid, ceph_cap_string(cf->caps)); - delayed |= __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, - __ceph_caps_used(ci), - __ceph_caps_wanted(ci), - cap->issued | cap->implemented, - cf->caps, cf->tid, oldest_flush_tid); + spin_lock(&ci->i_ceph_lock); } - return delayed; } void ceph_early_kick_flushing_caps(struct ceph_mds_client *mdsc, @@ -2175,8 +2160,14 @@ void ceph_early_kick_flushing_caps(struct ceph_mds_client *mdsc, { struct ceph_inode_info *ci; struct ceph_cap *cap; + u64 oldest_flush_tid; dout("early_kick_flushing_caps mds%d\n", session->s_mds); + + spin_lock(&mdsc->cap_dirty_lock); + oldest_flush_tid = __get_oldest_flush_tid(mdsc); + spin_unlock(&mdsc->cap_dirty_lock); + list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) { spin_lock(&ci->i_ceph_lock); cap = ci->i_auth_cap; @@ -2196,10 +2187,11 @@ void ceph_early_kick_flushing_caps(struct ceph_mds_client *mdsc, */ if ((cap->issued & ci->i_flushing_caps) != ci->i_flushing_caps) { - spin_unlock(&ci->i_ceph_lock); - if (!__kick_flushing_caps(mdsc, session, ci)) - continue; - spin_lock(&ci->i_ceph_lock); + ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH; + __kick_flushing_caps(mdsc, session, ci, + oldest_flush_tid); + } else { + ci->i_ceph_flags |= CEPH_I_KICK_FLUSH; } spin_unlock(&ci->i_ceph_lock); @@ -2210,50 +2202,56 @@ void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc, struct ceph_mds_session *session) { struct ceph_inode_info *ci; - - kick_flushing_capsnaps(mdsc, session); + struct ceph_cap *cap; + u64 oldest_flush_tid; dout("kick_flushing_caps mds%d\n", session->s_mds); + + spin_lock(&mdsc->cap_dirty_lock); + oldest_flush_tid = __get_oldest_flush_tid(mdsc); + spin_unlock(&mdsc->cap_dirty_lock); + list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) { - int delayed = __kick_flushing_caps(mdsc, session, ci); - if (delayed) { - spin_lock(&ci->i_ceph_lock); - __cap_delay_requeue(mdsc, ci); + spin_lock(&ci->i_ceph_lock); + cap = ci->i_auth_cap; + if (!(cap && cap->session == session)) { + pr_err("%p auth cap %p not mds%d ???\n", + &ci->vfs_inode, cap, session->s_mds); spin_unlock(&ci->i_ceph_lock); + continue; + } + if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH) { + ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH; + __kick_flushing_caps(mdsc, session, ci, + oldest_flush_tid); } + spin_unlock(&ci->i_ceph_lock); } } static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc, struct ceph_mds_session *session, struct inode *inode) + __releases(ci->i_ceph_lock) { struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_cap *cap; - spin_lock(&ci->i_ceph_lock); cap = ci->i_auth_cap; dout("kick_flushing_inode_caps %p flushing %s\n", inode, ceph_cap_string(ci->i_flushing_caps)); - __ceph_flush_snaps(ci, &session, 1); - - if (ci->i_flushing_caps) { - int delayed; - + if (!list_empty(&ci->i_cap_flush_list)) { + u64 oldest_flush_tid; spin_lock(&mdsc->cap_dirty_lock); list_move_tail(&ci->i_flushing_item, &cap->session->s_cap_flushing); + oldest_flush_tid = __get_oldest_flush_tid(mdsc); spin_unlock(&mdsc->cap_dirty_lock); + ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH; + __kick_flushing_caps(mdsc, session, ci, oldest_flush_tid); spin_unlock(&ci->i_ceph_lock); - - delayed = __kick_flushing_caps(mdsc, session, ci); - if (delayed) { - spin_lock(&ci->i_ceph_lock); - __cap_delay_requeue(mdsc, ci); - spin_unlock(&ci->i_ceph_lock); - } } else { spin_unlock(&ci->i_ceph_lock); } @@ -2580,16 +2578,19 @@ void ceph_get_cap_refs(struct ceph_inode_info *ci, int caps) * drop cap_snap that is not associated with any snapshot. * we don't need to send FLUSHSNAP message for it. */ -static int ceph_try_drop_cap_snap(struct ceph_cap_snap *capsnap) +static int ceph_try_drop_cap_snap(struct ceph_inode_info *ci, + struct ceph_cap_snap *capsnap) { if (!capsnap->need_flush && !capsnap->writing && !capsnap->dirty_pages) { - dout("dropping cap_snap %p follows %llu\n", capsnap, capsnap->follows); + BUG_ON(capsnap->cap_flush.tid > 0); ceph_put_snap_context(capsnap->context); + if (!list_is_last(&capsnap->ci_item, &ci->i_cap_snaps)) + ci->i_ceph_flags |= CEPH_I_FLUSH_SNAPS; + list_del(&capsnap->ci_item); - list_del(&capsnap->flushing_item); ceph_put_cap_snap(capsnap); return 1; } @@ -2636,7 +2637,7 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had) struct ceph_cap_snap, ci_item); capsnap->writing = 0; - if (ceph_try_drop_cap_snap(capsnap)) + if (ceph_try_drop_cap_snap(ci, capsnap)) put++; else if (__ceph_finish_cap_snap(ci, capsnap)) flushsnaps = 1; @@ -2661,7 +2662,7 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had) if (last && !flushsnaps) ceph_check_caps(ci, 0, NULL); else if (flushsnaps) - ceph_flush_snaps(ci); + ceph_flush_snaps(ci, NULL); if (wake) wake_up_all(&ci->i_cap_wq); while (put-- > 0) @@ -2679,15 +2680,19 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr, struct ceph_snap_context *snapc) { struct inode *inode = &ci->vfs_inode; - int last = 0; - int complete_capsnap = 0; - int drop_capsnap = 0; - int found = 0; struct ceph_cap_snap *capsnap = NULL; + int put = 0; + bool last = false; + bool found = false; + bool flush_snaps = false; + bool complete_capsnap = false; spin_lock(&ci->i_ceph_lock); ci->i_wrbuffer_ref -= nr; - last = !ci->i_wrbuffer_ref; + if (ci->i_wrbuffer_ref == 0) { + last = true; + put++; + } if (ci->i_head_snapc == snapc) { ci->i_wrbuffer_ref_head -= nr; @@ -2707,15 +2712,22 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr, } else { list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) { if (capsnap->context == snapc) { - found = 1; + found = true; break; } } BUG_ON(!found); capsnap->dirty_pages -= nr; if (capsnap->dirty_pages == 0) { - complete_capsnap = 1; - drop_capsnap = ceph_try_drop_cap_snap(capsnap); + complete_capsnap = true; + if (!capsnap->writing) { + if (ceph_try_drop_cap_snap(ci, capsnap)) { + put++; + } else { + ci->i_ceph_flags |= CEPH_I_FLUSH_SNAPS; + flush_snaps = true; + } + } } dout("put_wrbuffer_cap_refs on %p cap_snap %p " " snap %lld %d/%d -> %d/%d %s%s\n", @@ -2730,12 +2742,12 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr, if (last) { ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL); - iput(inode); - } else if (complete_capsnap) { - ceph_flush_snaps(ci); - wake_up_all(&ci->i_cap_wq); + } else if (flush_snaps) { + ceph_flush_snaps(ci, NULL); } - if (drop_capsnap) + if (complete_capsnap) + wake_up_all(&ci->i_cap_wq); + while (put-- > 0) iput(inode); } @@ -2779,12 +2791,11 @@ static void invalidate_aliases(struct inode *inode) */ static void handle_cap_grant(struct ceph_mds_client *mdsc, struct inode *inode, struct ceph_mds_caps *grant, - u64 inline_version, - void *inline_data, int inline_len, + struct ceph_string **pns, u64 inline_version, + void *inline_data, u32 inline_len, struct ceph_buffer *xattr_buf, struct ceph_mds_session *session, - struct ceph_cap *cap, int issued, - u32 pool_ns_len) + struct ceph_cap *cap, int issued) __releases(ci->i_ceph_lock) __releases(mdsc->snap_rwsem) { @@ -2895,8 +2906,18 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc, if (newcaps & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR)) { /* file layout may have changed */ - ci->i_layout = grant->layout; - ci->i_pool_ns_len = pool_ns_len; + s64 old_pool = ci->i_layout.pool_id; + struct ceph_string *old_ns; + + ceph_file_layout_from_legacy(&ci->i_layout, &grant->layout); + old_ns = rcu_dereference_protected(ci->i_layout.pool_ns, + lockdep_is_held(&ci->i_ceph_lock)); + rcu_assign_pointer(ci->i_layout.pool_ns, *pns); + + if (ci->i_layout.pool_id != old_pool || *pns != old_ns) + ci->i_ceph_flags &= ~CEPH_I_POOL_PERM; + + *pns = old_ns; /* size/truncate_seq? */ queue_trunc = ceph_fill_file_size(inode, issued, @@ -2979,13 +3000,13 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc, fill_inline = true; } - spin_unlock(&ci->i_ceph_lock); - if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) { - kick_flushing_inode_caps(mdsc, session, inode); - up_read(&mdsc->snap_rwsem); if (newcaps & ~issued) wake = true; + kick_flushing_inode_caps(mdsc, session, inode); + up_read(&mdsc->snap_rwsem); + } else { + spin_unlock(&ci->i_ceph_lock); } if (fill_inline) @@ -3029,23 +3050,24 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid, { struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; - struct ceph_cap_flush *cf; - struct rb_node *n; + struct ceph_cap_flush *cf, *tmp_cf; LIST_HEAD(to_remove); unsigned seq = le32_to_cpu(m->seq); int dirty = le32_to_cpu(m->dirty); int cleaned = 0; - int drop = 0; + bool drop = false; + bool wake_ci = 0; + bool wake_mdsc = 0; - n = rb_first(&ci->i_cap_flush_tree); - while (n) { - cf = rb_entry(n, struct ceph_cap_flush, i_node); - n = rb_next(&cf->i_node); + list_for_each_entry_safe(cf, tmp_cf, &ci->i_cap_flush_list, i_list) { if (cf->tid == flush_tid) cleaned = cf->caps; + if (cf->caps == 0) /* capsnap */ + continue; if (cf->tid <= flush_tid) { - rb_erase(&cf->i_node, &ci->i_cap_flush_tree); - list_add_tail(&cf->list, &to_remove); + if (__finish_cap_flush(NULL, ci, cf)) + wake_ci = true; + list_add_tail(&cf->i_list, &to_remove); } else { cleaned &= ~cf->caps; if (!cleaned) @@ -3066,31 +3088,29 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid, spin_lock(&mdsc->cap_dirty_lock); - if (!list_empty(&to_remove)) { - list_for_each_entry(cf, &to_remove, list) - rb_erase(&cf->g_node, &mdsc->cap_flush_tree); - - n = rb_first(&mdsc->cap_flush_tree); - cf = n ? rb_entry(n, struct ceph_cap_flush, g_node) : NULL; - if (!cf || cf->tid > flush_tid) - wake_up_all(&mdsc->cap_flushing_wq); + list_for_each_entry(cf, &to_remove, i_list) { + if (__finish_cap_flush(mdsc, NULL, cf)) + wake_mdsc = true; } if (ci->i_flushing_caps == 0) { - list_del_init(&ci->i_flushing_item); - if (!list_empty(&session->s_cap_flushing)) - dout(" mds%d still flushing cap on %p\n", - session->s_mds, - &list_entry(session->s_cap_flushing.next, - struct ceph_inode_info, - i_flushing_item)->vfs_inode); + if (list_empty(&ci->i_cap_flush_list)) { + list_del_init(&ci->i_flushing_item); + if (!list_empty(&session->s_cap_flushing)) { + dout(" mds%d still flushing cap on %p\n", + session->s_mds, + &list_first_entry(&session->s_cap_flushing, + struct ceph_inode_info, + i_flushing_item)->vfs_inode); + } + } mdsc->num_cap_flushing--; dout(" inode %p now !flushing\n", inode); if (ci->i_dirty_caps == 0) { dout(" inode %p now clean\n", inode); BUG_ON(!list_empty(&ci->i_dirty_item)); - drop = 1; + drop = true; if (ci->i_wr_ref == 0 && ci->i_wrbuffer_ref_head == 0) { BUG_ON(!ci->i_head_snapc); @@ -3102,17 +3122,21 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid, } } spin_unlock(&mdsc->cap_dirty_lock); - wake_up_all(&ci->i_cap_wq); out: spin_unlock(&ci->i_ceph_lock); while (!list_empty(&to_remove)) { cf = list_first_entry(&to_remove, - struct ceph_cap_flush, list); - list_del(&cf->list); + struct ceph_cap_flush, i_list); + list_del(&cf->i_list); ceph_free_cap_flush(cf); } + + if (wake_ci) + wake_up_all(&ci->i_cap_wq); + if (wake_mdsc) + wake_up_all(&mdsc->cap_flushing_wq); if (drop) iput(inode); } @@ -3131,7 +3155,9 @@ static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid, struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; u64 follows = le64_to_cpu(m->snap_follows); struct ceph_cap_snap *capsnap; - int drop = 0; + bool flushed = false; + bool wake_ci = false; + bool wake_mdsc = false; dout("handle_cap_flushsnap_ack inode %p ci %p mds%d follows %lld\n", inode, ci, session->s_mds, follows); @@ -3139,30 +3165,47 @@ static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid, spin_lock(&ci->i_ceph_lock); list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) { if (capsnap->follows == follows) { - if (capsnap->flush_tid != flush_tid) { + if (capsnap->cap_flush.tid != flush_tid) { dout(" cap_snap %p follows %lld tid %lld !=" " %lld\n", capsnap, follows, - flush_tid, capsnap->flush_tid); + flush_tid, capsnap->cap_flush.tid); break; } - WARN_ON(capsnap->dirty_pages || capsnap->writing); - dout(" removing %p cap_snap %p follows %lld\n", - inode, capsnap, follows); - ceph_put_snap_context(capsnap->context); - list_del(&capsnap->ci_item); - list_del(&capsnap->flushing_item); - ceph_put_cap_snap(capsnap); - wake_up_all(&mdsc->cap_flushing_wq); - drop = 1; + flushed = true; break; } else { dout(" skipping cap_snap %p follows %lld\n", capsnap, capsnap->follows); } } + if (flushed) { + WARN_ON(capsnap->dirty_pages || capsnap->writing); + dout(" removing %p cap_snap %p follows %lld\n", + inode, capsnap, follows); + list_del(&capsnap->ci_item); + if (__finish_cap_flush(NULL, ci, &capsnap->cap_flush)) + wake_ci = true; + + spin_lock(&mdsc->cap_dirty_lock); + + if (list_empty(&ci->i_cap_flush_list)) + list_del_init(&ci->i_flushing_item); + + if (__finish_cap_flush(mdsc, NULL, &capsnap->cap_flush)) + wake_mdsc = true; + + spin_unlock(&mdsc->cap_dirty_lock); + } spin_unlock(&ci->i_ceph_lock); - if (drop) + if (flushed) { + ceph_put_snap_context(capsnap->context); + ceph_put_cap_snap(capsnap); + if (wake_ci) + wake_up_all(&ci->i_cap_wq); + if (wake_mdsc) + wake_up_all(&mdsc->cap_flushing_wq); iput(inode); + } } /* @@ -3267,7 +3310,8 @@ retry: tcap->implemented |= issued; if (cap == ci->i_auth_cap) ci->i_auth_cap = tcap; - if (ci->i_flushing_caps && ci->i_auth_cap == tcap) { + if (!list_empty(&ci->i_cap_flush_list) && + ci->i_auth_cap == tcap) { spin_lock(&mdsc->cap_dirty_lock); list_move_tail(&ci->i_flushing_item, &tcap->session->s_cap_flushing); @@ -3420,20 +3464,18 @@ void ceph_handle_caps(struct ceph_mds_session *session, struct ceph_cap *cap; struct ceph_mds_caps *h; struct ceph_mds_cap_peer *peer = NULL; - struct ceph_snap_realm *realm; + struct ceph_snap_realm *realm = NULL; + struct ceph_string *pool_ns = NULL; int mds = session->s_mds; int op, issued; u32 seq, mseq; struct ceph_vino vino; - u64 cap_id; - u64 size, max_size; u64 tid; u64 inline_version = 0; void *inline_data = NULL; u32 inline_len = 0; void *snaptrace; size_t snaptrace_len; - u32 pool_ns_len = 0; void *p, *end; dout("handle_caps from mds%d\n", mds); @@ -3447,11 +3489,8 @@ void ceph_handle_caps(struct ceph_mds_session *session, op = le32_to_cpu(h->op); vino.ino = le64_to_cpu(h->ino); vino.snap = CEPH_NOSNAP; - cap_id = le64_to_cpu(h->cap_id); seq = le32_to_cpu(h->seq); mseq = le32_to_cpu(h->migrate_seq); - size = le64_to_cpu(h->size); - max_size = le64_to_cpu(h->max_size); snaptrace = h + 1; snaptrace_len = le32_to_cpu(h->snap_trace_len); @@ -3490,6 +3529,7 @@ void ceph_handle_caps(struct ceph_mds_session *session, u64 flush_tid; u32 caller_uid, caller_gid; u32 osd_epoch_barrier; + u32 pool_ns_len; /* version >= 5 */ ceph_decode_32_safe(&p, end, osd_epoch_barrier, bad); /* version >= 6 */ @@ -3499,6 +3539,11 @@ void ceph_handle_caps(struct ceph_mds_session *session, ceph_decode_32_safe(&p, end, caller_gid, bad); /* version >= 8 */ ceph_decode_32_safe(&p, end, pool_ns_len, bad); + if (pool_ns_len > 0) { + ceph_decode_need(&p, end, pool_ns_len, bad); + pool_ns = ceph_find_or_create_string(p, pool_ns_len); + p += pool_ns_len; + } } /* lookup ino */ @@ -3519,7 +3564,7 @@ void ceph_handle_caps(struct ceph_mds_session *session, cap = ceph_get_cap(mdsc, NULL); cap->cap_ino = vino.ino; cap->queue_release = 1; - cap->cap_id = cap_id; + cap->cap_id = le64_to_cpu(h->cap_id); cap->mseq = mseq; cap->seq = seq; spin_lock(&session->s_cap_lock); @@ -3554,10 +3599,9 @@ void ceph_handle_caps(struct ceph_mds_session *session, } handle_cap_import(mdsc, inode, h, peer, session, &cap, &issued); - handle_cap_grant(mdsc, inode, h, + handle_cap_grant(mdsc, inode, h, &pool_ns, inline_version, inline_data, inline_len, - msg->middle, session, cap, issued, - pool_ns_len); + msg->middle, session, cap, issued); if (realm) ceph_put_snap_realm(mdsc, realm); goto done_unlocked; @@ -3579,10 +3623,9 @@ void ceph_handle_caps(struct ceph_mds_session *session, case CEPH_CAP_OP_GRANT: __ceph_caps_issued(ci, &issued); issued |= __ceph_caps_dirty(ci); - handle_cap_grant(mdsc, inode, h, + handle_cap_grant(mdsc, inode, h, &pool_ns, inline_version, inline_data, inline_len, - msg->middle, session, cap, issued, - pool_ns_len); + msg->middle, session, cap, issued); goto done_unlocked; case CEPH_CAP_OP_FLUSH_ACK: @@ -3613,6 +3656,7 @@ done: mutex_unlock(&session->s_mutex); done_unlocked: iput(inode); + ceph_put_string(pool_ns); return; bad: @@ -3673,6 +3717,16 @@ void ceph_flush_dirty_caps(struct ceph_mds_client *mdsc) dout("flush_dirty_caps done\n"); } +void __ceph_get_fmode(struct ceph_inode_info *ci, int fmode) +{ + int i; + int bits = (fmode << 1) | 1; + for (i = 0; i < CEPH_FILE_MODE_BITS; i++) { + if (bits & (1 << i)) + ci->i_nr_by_mode[i]++; + } +} + /* * Drop open file reference. If we were the last open file, * we may need to release capabilities to the MDS (or schedule @@ -3680,15 +3734,20 @@ void ceph_flush_dirty_caps(struct ceph_mds_client *mdsc) */ void ceph_put_fmode(struct ceph_inode_info *ci, int fmode) { - struct inode *inode = &ci->vfs_inode; - int last = 0; - + int i, last = 0; + int bits = (fmode << 1) | 1; spin_lock(&ci->i_ceph_lock); - dout("put_fmode %p fmode %d %d -> %d\n", inode, fmode, - ci->i_nr_by_mode[fmode], ci->i_nr_by_mode[fmode]-1); - BUG_ON(ci->i_nr_by_mode[fmode] == 0); - if (--ci->i_nr_by_mode[fmode] == 0) - last++; + for (i = 0; i < CEPH_FILE_MODE_BITS; i++) { + if (bits & (1 << i)) { + BUG_ON(ci->i_nr_by_mode[i] == 0); + if (--ci->i_nr_by_mode[i] == 0) + last++; + } + } + dout("put_fmode %p fmode %d {%d,%d,%d,%d}\n", + &ci->vfs_inode, fmode, + ci->i_nr_by_mode[0], ci->i_nr_by_mode[1], + ci->i_nr_by_mode[2], ci->i_nr_by_mode[3]); spin_unlock(&ci->i_ceph_lock); if (last && ci->i_vino.snap == CEPH_NOSNAP) |