diff options
Diffstat (limited to 'net/ceph/mon_client.c')
-rw-r--r-- | net/ceph/mon_client.c | 135 |
1 files changed, 97 insertions, 38 deletions
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c index 1845cde26227..89a6409b4e1d 100644 --- a/net/ceph/mon_client.c +++ b/net/ceph/mon_client.c @@ -106,9 +106,9 @@ static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len) monc->pending_auth = 1; monc->m_auth->front.iov_len = len; monc->m_auth->hdr.front_len = cpu_to_le32(len); - ceph_con_revoke(monc->con, monc->m_auth); + ceph_msg_revoke(monc->m_auth); ceph_msg_get(monc->m_auth); /* keep our ref */ - ceph_con_send(monc->con, monc->m_auth); + ceph_con_send(&monc->con, monc->m_auth); } /* @@ -117,8 +117,11 @@ static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len) static void __close_session(struct ceph_mon_client *monc) { dout("__close_session closing mon%d\n", monc->cur_mon); - ceph_con_revoke(monc->con, monc->m_auth); - ceph_con_close(monc->con); + ceph_msg_revoke(monc->m_auth); + ceph_msg_revoke_incoming(monc->m_auth_reply); + ceph_msg_revoke(monc->m_subscribe); + ceph_msg_revoke_incoming(monc->m_subscribe_ack); + ceph_con_close(&monc->con); monc->cur_mon = -1; monc->pending_auth = 0; ceph_auth_reset(monc->auth); @@ -142,9 +145,8 @@ static int __open_session(struct ceph_mon_client *monc) monc->want_next_osdmap = !!monc->want_next_osdmap; dout("open_session mon%d opening\n", monc->cur_mon); - monc->con->peer_name.type = CEPH_ENTITY_TYPE_MON; - monc->con->peer_name.num = cpu_to_le64(monc->cur_mon); - ceph_con_open(monc->con, + ceph_con_open(&monc->con, + CEPH_ENTITY_TYPE_MON, monc->cur_mon, &monc->monmap->mon_inst[monc->cur_mon].addr); /* initiatiate authentication handshake */ @@ -226,8 +228,8 @@ static void __send_subscribe(struct ceph_mon_client *monc) msg->front.iov_len = p - msg->front.iov_base; msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); - ceph_con_revoke(monc->con, msg); - ceph_con_send(monc->con, ceph_msg_get(msg)); + ceph_msg_revoke(msg); + ceph_con_send(&monc->con, ceph_msg_get(msg)); monc->sub_sent = jiffies | 1; /* never 0 */ } @@ -247,7 +249,7 @@ static void handle_subscribe_ack(struct ceph_mon_client *monc, if (monc->hunting) { pr_info("mon%d %s session established\n", monc->cur_mon, - ceph_pr_addr(&monc->con->peer_addr.in_addr)); + ceph_pr_addr(&monc->con.peer_addr.in_addr)); monc->hunting = false; } dout("handle_subscribe_ack after %d seconds\n", seconds); @@ -309,6 +311,17 @@ int ceph_monc_open_session(struct ceph_mon_client *monc) EXPORT_SYMBOL(ceph_monc_open_session); /* + * We require the fsid and global_id in order to initialize our + * debugfs dir. + */ +static bool have_debugfs_info(struct ceph_mon_client *monc) +{ + dout("have_debugfs_info fsid %d globalid %lld\n", + (int)monc->client->have_fsid, monc->auth->global_id); + return monc->client->have_fsid && monc->auth->global_id > 0; +} + +/* * The monitor responds with mount ack indicate mount success. The * included client ticket allows the client to talk to MDSs and OSDs. */ @@ -318,9 +331,12 @@ static void ceph_monc_handle_map(struct ceph_mon_client *monc, struct ceph_client *client = monc->client; struct ceph_monmap *monmap = NULL, *old = monc->monmap; void *p, *end; + int had_debugfs_info, init_debugfs = 0; mutex_lock(&monc->mutex); + had_debugfs_info = have_debugfs_info(monc); + dout("handle_monmap\n"); p = msg->front.iov_base; end = p + msg->front.iov_len; @@ -342,12 +358,22 @@ static void ceph_monc_handle_map(struct ceph_mon_client *monc, if (!client->have_fsid) { client->have_fsid = true; + if (!had_debugfs_info && have_debugfs_info(monc)) { + pr_info("client%lld fsid %pU\n", + ceph_client_id(monc->client), + &monc->client->fsid); + init_debugfs = 1; + } mutex_unlock(&monc->mutex); - /* - * do debugfs initialization without mutex to avoid - * creating a locking dependency - */ - ceph_debugfs_client_init(client); + + if (init_debugfs) { + /* + * do debugfs initialization without mutex to avoid + * creating a locking dependency + */ + ceph_debugfs_client_init(monc->client); + } + goto out_unlocked; } out: @@ -439,6 +465,7 @@ static struct ceph_msg *get_generic_reply(struct ceph_connection *con, m = NULL; } else { dout("get_generic_reply %lld got %p\n", tid, req->reply); + *skip = 0; m = ceph_msg_get(req->reply); /* * we don't need to track the connection reading into @@ -461,7 +488,7 @@ static int do_generic_request(struct ceph_mon_client *monc, req->request->hdr.tid = cpu_to_le64(req->tid); __insert_generic_request(monc, req); monc->num_generic_requests++; - ceph_con_send(monc->con, ceph_msg_get(req->request)); + ceph_con_send(&monc->con, ceph_msg_get(req->request)); mutex_unlock(&monc->mutex); err = wait_for_completion_interruptible(&req->completion); @@ -684,8 +711,9 @@ static void __resend_generic_request(struct ceph_mon_client *monc) for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) { req = rb_entry(p, struct ceph_mon_generic_request, node); - ceph_con_revoke(monc->con, req->request); - ceph_con_send(monc->con, ceph_msg_get(req->request)); + ceph_msg_revoke(req->request); + ceph_msg_revoke_incoming(req->reply); + ceph_con_send(&monc->con, ceph_msg_get(req->request)); } } @@ -705,7 +733,7 @@ static void delayed_work(struct work_struct *work) __close_session(monc); __open_session(monc); /* continue hunting */ } else { - ceph_con_keepalive(monc->con); + ceph_con_keepalive(&monc->con); __validate_auth(monc); @@ -760,19 +788,12 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl) goto out; /* connection */ - monc->con = kmalloc(sizeof(*monc->con), GFP_KERNEL); - if (!monc->con) - goto out_monmap; - ceph_con_init(monc->client->msgr, monc->con); - monc->con->private = monc; - monc->con->ops = &mon_con_ops; - /* authentication */ monc->auth = ceph_auth_init(cl->options->name, cl->options->key); if (IS_ERR(monc->auth)) { err = PTR_ERR(monc->auth); - goto out_con; + goto out_monmap; } monc->auth->want_keys = CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON | @@ -801,6 +822,9 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl) if (!monc->m_auth) goto out_auth_reply; + ceph_con_init(&monc->con, monc, &mon_con_ops, + &monc->client->msgr); + monc->cur_mon = -1; monc->hunting = true; monc->sub_renew_after = jiffies; @@ -824,8 +848,6 @@ out_subscribe_ack: ceph_msg_put(monc->m_subscribe_ack); out_auth: ceph_auth_destroy(monc->auth); -out_con: - monc->con->ops->put(monc->con); out_monmap: kfree(monc->monmap); out: @@ -841,12 +863,16 @@ void ceph_monc_stop(struct ceph_mon_client *monc) mutex_lock(&monc->mutex); __close_session(monc); - monc->con->private = NULL; - monc->con->ops->put(monc->con); - monc->con = NULL; - mutex_unlock(&monc->mutex); + /* + * flush msgr queue before we destroy ourselves to ensure that: + * - any work that references our embedded con is finished. + * - any osd_client or other work that may reference an authorizer + * finishes before we shut down the auth subsystem. + */ + ceph_msgr_flush(); + ceph_auth_destroy(monc->auth); ceph_msg_put(monc->m_auth); @@ -863,8 +889,10 @@ static void handle_auth_reply(struct ceph_mon_client *monc, { int ret; int was_auth = 0; + int had_debugfs_info, init_debugfs = 0; mutex_lock(&monc->mutex); + had_debugfs_info = have_debugfs_info(monc); if (monc->auth->ops) was_auth = monc->auth->ops->is_authenticated(monc->auth); monc->pending_auth = 0; @@ -880,14 +908,29 @@ static void handle_auth_reply(struct ceph_mon_client *monc, } else if (!was_auth && monc->auth->ops->is_authenticated(monc->auth)) { dout("authenticated, starting session\n"); - monc->client->msgr->inst.name.type = CEPH_ENTITY_TYPE_CLIENT; - monc->client->msgr->inst.name.num = + monc->client->msgr.inst.name.type = CEPH_ENTITY_TYPE_CLIENT; + monc->client->msgr.inst.name.num = cpu_to_le64(monc->auth->global_id); __send_subscribe(monc); __resend_generic_request(monc); } + + if (!had_debugfs_info && have_debugfs_info(monc)) { + pr_info("client%lld fsid %pU\n", + ceph_client_id(monc->client), + &monc->client->fsid); + init_debugfs = 1; + } mutex_unlock(&monc->mutex); + + if (init_debugfs) { + /* + * do debugfs initialization without mutex to avoid + * creating a locking dependency + */ + ceph_debugfs_client_init(monc->client); + } } static int __validate_auth(struct ceph_mon_client *monc) @@ -992,6 +1035,8 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con, case CEPH_MSG_MDS_MAP: case CEPH_MSG_OSD_MAP: m = ceph_msg_new(type, front_len, GFP_NOFS, false); + if (!m) + return NULL; /* ENOMEM--return skip == 0 */ break; } @@ -1021,7 +1066,7 @@ static void mon_fault(struct ceph_connection *con) if (!monc->hunting) pr_info("mon%d %s session lost, " "hunting for new mon\n", monc->cur_mon, - ceph_pr_addr(&monc->con->peer_addr.in_addr)); + ceph_pr_addr(&monc->con.peer_addr.in_addr)); __close_session(monc); if (!monc->hunting) { @@ -1036,9 +1081,23 @@ out: mutex_unlock(&monc->mutex); } +/* + * We can ignore refcounting on the connection struct, as all references + * will come from the messenger workqueue, which is drained prior to + * mon_client destruction. + */ +static struct ceph_connection *con_get(struct ceph_connection *con) +{ + return con; +} + +static void con_put(struct ceph_connection *con) +{ +} + static const struct ceph_connection_operations mon_con_ops = { - .get = ceph_con_get, - .put = ceph_con_put, + .get = con_get, + .put = con_put, .dispatch = dispatch, .fault = mon_fault, .alloc_msg = mon_alloc_msg, |