
#include <linux/ceph/ceph_debug.h>

#include <linux/module.h>
#include <linux/types.h>
#include <linux/slab.h>
#include <linux/random.h>
#include <linux/sched.h>

#include <linux/ceph/mon_client.h>
#include <linux/ceph/libceph.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/auth.h>
/*
 * Interact with Ceph monitor cluster.  Handle requests for new map
 * versions, and periodically resend as needed.  Also implement
 * statfs() and umount().
 *
 * A small cluster of Ceph "monitors" are responsible for managing critical
 * cluster configuration and state information.  An odd number (e.g., 3, 5)
 * of cmon daemons use a modified version of the Paxos part-time parliament
 * algorithm to manage the MDS map (mds cluster membership), OSD map, and
 * list of clients who have mounted the file system.
 *
 * We maintain an open, active session with a monitor at all times in order
 * to receive timely MDSMap updates.  We periodically send a keepalive byte
 * on the TCP socket to ensure we detect a failure.  If the connection does
 * break, we randomly hunt for a new monitor.  Once the connection is
 * reestablished, we resend any outstanding requests.
 */
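/*
 * Typical use, as a sketch only (assumes a ceph_client whose options
 * have already been parsed; error handling abbreviated):
 *
 *	struct ceph_client *client = ...;
 *	int err;
 *
 *	err = ceph_monc_init(&client->monc, client);
 *	if (!err)
 *		err = ceph_monc_open_session(&client->monc);
 *	...  use ceph_monc_do_statfs(), map subscriptions, etc. ...
 *	ceph_monc_stop(&client->monc);
 */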
static const struct ceph_connection_operations mon_con_ops;

static int __validate_auth(struct ceph_mon_client *monc);
/*
 * Decode a monmap blob (e.g., during mount).
 */
struct ceph_monmap *ceph_monmap_decode(void *p, void *end)
{
	struct ceph_monmap *m = NULL;
	int i, err = -EINVAL;
	struct ceph_fsid fsid;
	u32 epoch, num_mon;
	u16 version;
	u32 len;

	ceph_decode_32_safe(&p, end, len, bad);
	ceph_decode_need(&p, end, len, bad);

	dout("monmap_decode %p %p len %d\n", p, end, (int)(end-p));

	ceph_decode_16_safe(&p, end, version, bad);

	ceph_decode_need(&p, end, sizeof(fsid) + 2*sizeof(u32), bad);
	ceph_decode_copy(&p, &fsid, sizeof(fsid));
	epoch = ceph_decode_32(&p);

	num_mon = ceph_decode_32(&p);
	ceph_decode_need(&p, end, num_mon*sizeof(m->mon_inst[0]), bad);

	if (num_mon >= CEPH_MAX_MON)
		goto bad;
	m = kmalloc(sizeof(*m) + sizeof(m->mon_inst[0])*num_mon, GFP_NOFS);
	if (m == NULL)
		return ERR_PTR(-ENOMEM);
	m->fsid = fsid;
	m->epoch = epoch;
	m->num_mon = num_mon;
	ceph_decode_copy(&p, m->mon_inst, num_mon*sizeof(m->mon_inst[0]));
	for (i = 0; i < num_mon; i++)
		ceph_decode_addr(&m->mon_inst[i].addr);

	dout("monmap_decode epoch %d, num_mon %d\n", m->epoch,
	     m->num_mon);
	for (i = 0; i < m->num_mon; i++)
		dout("monmap_decode mon%d is %s\n", i,
		     ceph_pr_addr(&m->mon_inst[i].addr.in_addr));
	return m;

bad:
	dout("monmap_decode failed with %d\n", err);
	kfree(m);
	return ERR_PTR(err);
}
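/*
 * Monmap wire format consumed above, for reference (derived from the
 * decode sequence; all integers little-endian):
 *
 *	__le32 len;                    // length of the payload that follows
 *	__le16 version;
 *	struct ceph_fsid fsid;
 *	__le32 epoch;
 *	__le32 num_mon;
 *	struct ceph_entity_inst mon_inst[num_mon];
 */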
/*
 * return true if *addr is included in the monmap.
 */
int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr)
{
	int i;

	for (i = 0; i < m->num_mon; i++)
		if (memcmp(addr, &m->mon_inst[i].addr, sizeof(*addr)) == 0)
			return 1;
	return 0;
}

/*
 * Send an auth request.
 */
static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len)
{
	monc->pending_auth = 1;
	monc->m_auth->front.iov_len = len;
	monc->m_auth->hdr.front_len = cpu_to_le32(len);
	ceph_con_revoke(monc->con, monc->m_auth);
	ceph_msg_get(monc->m_auth);  /* keep our ref */
	ceph_con_send(monc->con, monc->m_auth);
}

/*
 * Close monitor session, if any.
 */
static void __close_session(struct ceph_mon_client *monc)
{
	dout("__close_session closing mon%d\n", monc->cur_mon);
	ceph_con_revoke(monc->con, monc->m_auth);
	ceph_con_close(monc->con);
	monc->cur_mon = -1;
	monc->pending_auth = 0;
	ceph_auth_reset(monc->auth);
}
/*
 * Open a session with a (new) monitor.
 */
static int __open_session(struct ceph_mon_client *monc)
{
	u8 r;	/* unsigned, so the modulo below cannot go negative */
	int ret;

	if (monc->cur_mon < 0) {
		get_random_bytes(&r, 1);
		monc->cur_mon = r % monc->monmap->num_mon;
		dout("open_session num=%d r=%d -> mon%d\n",
		     monc->monmap->num_mon, r, monc->cur_mon);
		monc->sub_sent = 0;
		monc->sub_renew_after = jiffies;  /* i.e., expired */
		monc->want_next_osdmap = !!monc->want_next_osdmap;

		dout("open_session mon%d opening\n", monc->cur_mon);
		monc->con->peer_name.type = CEPH_ENTITY_TYPE_MON;
		monc->con->peer_name.num = cpu_to_le64(monc->cur_mon);
		ceph_con_open(monc->con,
			      &monc->monmap->mon_inst[monc->cur_mon].addr);

		/* initiate authentication handshake */
		ret = ceph_auth_build_hello(monc->auth,
					    monc->m_auth->front.iov_base,
					    monc->m_auth->front_max);
		__send_prepared_auth_request(monc, ret);
	} else {
		dout("open_session mon%d already open\n", monc->cur_mon);
	}
	return 0;
}
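/*
 * Session state, in brief: cur_mon < 0 means no monitor is selected and
 * __open_session() will pick one at random.  mon_fault() sets ->hunting
 * and delayed_work() keeps closing and reopening sessions until a
 * subscribe ack arrives, at which point handle_subscribe_ack() clears
 * ->hunting.
 */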
static bool __sub_expired(struct ceph_mon_client *monc)
{
	return time_after_eq(jiffies, monc->sub_renew_after);
}

/*
 * Reschedule delayed work timer.
 */
static void __schedule_delayed(struct ceph_mon_client *monc)
{
	unsigned delay;

	if (monc->cur_mon < 0 || __sub_expired(monc))
		delay = 10 * HZ;
	else
		delay = 20 * HZ;
	dout("__schedule_delayed after %u\n", delay);
	schedule_delayed_work(&monc->delayed_work, delay);
}

/*
 * Send subscribe request for mdsmap and/or osdmap.
 */
static void __send_subscribe(struct ceph_mon_client *monc)
{
	dout("__send_subscribe sub_sent=%u exp=%u want_osd=%d\n",
	     (unsigned)monc->sub_sent, __sub_expired(monc),
	     monc->want_next_osdmap);
	if ((__sub_expired(monc) && !monc->sub_sent) ||
	    monc->want_next_osdmap == 1) {
		struct ceph_msg *msg = monc->m_subscribe;
		struct ceph_mon_subscribe_item *i;
		void *p, *end;
		int num;

		p = msg->front.iov_base;
		end = p + msg->front_max;

		num = 1 + !!monc->want_next_osdmap + !!monc->want_mdsmap;
		ceph_encode_32(&p, num);

		if (monc->want_next_osdmap) {
			dout("__send_subscribe to 'osdmap' %u\n",
			     (unsigned)monc->have_osdmap);
			ceph_encode_string(&p, end, "osdmap", 6);
			i = p;
			i->have = cpu_to_le64(monc->have_osdmap);
			i->onetime = 1;
			p += sizeof(*i);
			monc->want_next_osdmap = 2;  /* requested */
		}
		if (monc->want_mdsmap) {
			dout("__send_subscribe to 'mdsmap' %u+\n",
			     (unsigned)monc->have_mdsmap);
			ceph_encode_string(&p, end, "mdsmap", 6);
			i = p;
			i->have = cpu_to_le64(monc->have_mdsmap);
			i->onetime = 0;
			p += sizeof(*i);
		}
		ceph_encode_string(&p, end, "monmap", 6);
		i = p;
		i->have = 0;
		i->onetime = 0;
		p += sizeof(*i);

		msg->front.iov_len = p - msg->front.iov_base;
		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
		ceph_con_revoke(monc->con, msg);
		ceph_con_send(monc->con, ceph_msg_get(msg));

		monc->sub_sent = jiffies | 1;  /* never 0 */
	}
}
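/*
 * Subscribe payload built above, for reference (derived from the encode
 * sequence):
 *
 *	__le32 num;                             // number of subscriptions
 *	// then, repeated num times:
 *	__le32 name_len; char name[name_len];   // "osdmap", "mdsmap", ...
 *	struct ceph_mon_subscribe_item {
 *		__le64 have;                    // version we already have
 *		__u8 onetime;                   // one-shot vs. ongoing
 *	};
 */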
static void handle_subscribe_ack(struct ceph_mon_client *monc,
				 struct ceph_msg *msg)
{
	unsigned seconds;
	struct ceph_mon_subscribe_ack *h = msg->front.iov_base;

	if (msg->front.iov_len < sizeof(*h))
		goto bad;
	seconds = le32_to_cpu(h->duration);

	mutex_lock(&monc->mutex);
	if (monc->hunting) {
		pr_info("mon%d %s session established\n",
			monc->cur_mon,
			ceph_pr_addr(&monc->con->peer_addr.in_addr));
		monc->hunting = false;
	}
	dout("handle_subscribe_ack after %d seconds\n", seconds);
	monc->sub_renew_after = monc->sub_sent + (seconds >> 1)*HZ - 1;
	monc->sub_sent = 0;
	mutex_unlock(&monc->mutex);
	return;
bad:
	pr_err("got corrupt subscribe-ack msg\n");
	ceph_msg_dump(msg);
}

/*
 * Keep track of which maps we have
 */
int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, u32 got)
{
	mutex_lock(&monc->mutex);
	monc->have_mdsmap = got;
	mutex_unlock(&monc->mutex);
	return 0;
}
EXPORT_SYMBOL(ceph_monc_got_mdsmap);

int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 got)
{
	mutex_lock(&monc->mutex);
	monc->have_osdmap = got;
	monc->want_next_osdmap = 0;
	mutex_unlock(&monc->mutex);
	return 0;
}

/*
 * Register interest in the next osdmap
 */
void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc)
{
	dout("request_next_osdmap have %u\n", monc->have_osdmap);
	mutex_lock(&monc->mutex);
	if (!monc->want_next_osdmap)
		monc->want_next_osdmap = 1;
	if (monc->want_next_osdmap < 2)
		__send_subscribe(monc);
	mutex_unlock(&monc->mutex);
}
/*
 * Open a session with a monitor and schedule the periodic
 * keepalive/subscription work.
 */
int ceph_monc_open_session(struct ceph_mon_client *monc)
{
	mutex_lock(&monc->mutex);
	__open_session(monc);
	__schedule_delayed(monc);
	mutex_unlock(&monc->mutex);
	return 0;
}
EXPORT_SYMBOL(ceph_monc_open_session);
/*
 * Handle an updated monmap from the monitor cluster: decode it, verify
 * the fsid, and swap it in for our current copy.
 */
static void ceph_monc_handle_map(struct ceph_mon_client *monc,
				 struct ceph_msg *msg)
{
	struct ceph_client *client = monc->client;
	struct ceph_monmap *monmap = NULL, *old = monc->monmap;
	void *p, *end;

	mutex_lock(&monc->mutex);

	dout("handle_monmap\n");
	p = msg->front.iov_base;
	end = p + msg->front.iov_len;

	monmap = ceph_monmap_decode(p, end);
	if (IS_ERR(monmap)) {
		pr_err("problem decoding monmap, %d\n",
		       (int)PTR_ERR(monmap));
		goto out;
	}

	if (ceph_check_fsid(monc->client, &monmap->fsid) < 0) {
		kfree(monmap);
		goto out;
	}

	client->monc.monmap = monmap;
	kfree(old);

out:
	mutex_unlock(&monc->mutex);
	wake_up_all(&client->auth_wq);
}
/*
 * generic requests (e.g., statfs, poolop)
 */
static struct ceph_mon_generic_request *__lookup_generic_req(
	struct ceph_mon_client *monc, u64 tid)
{
	struct ceph_mon_generic_request *req;
	struct rb_node *n = monc->generic_request_tree.rb_node;

	while (n) {
		req = rb_entry(n, struct ceph_mon_generic_request, node);
		if (tid < req->tid)
			n = n->rb_left;
		else if (tid > req->tid)
			n = n->rb_right;
		else
			return req;
	}
	return NULL;
}

static void __insert_generic_request(struct ceph_mon_client *monc,
				     struct ceph_mon_generic_request *new)
{
	struct rb_node **p = &monc->generic_request_tree.rb_node;
	struct rb_node *parent = NULL;
	struct ceph_mon_generic_request *req = NULL;

	while (*p) {
		parent = *p;
		req = rb_entry(parent, struct ceph_mon_generic_request, node);
		if (new->tid < req->tid)
			p = &(*p)->rb_left;
		else if (new->tid > req->tid)
			p = &(*p)->rb_right;
		else
			BUG();
	}

	rb_link_node(&new->node, parent, p);
	rb_insert_color(&new->node, &monc->generic_request_tree);
}
static void release_generic_request(struct kref *kref)
{
	struct ceph_mon_generic_request *req =
		container_of(kref, struct ceph_mon_generic_request, kref);

	if (req->reply)
		ceph_msg_put(req->reply);
	if (req->request)
		ceph_msg_put(req->request);

	kfree(req);
}

static void put_generic_request(struct ceph_mon_generic_request *req)
{
	kref_put(&req->kref, release_generic_request);
}

static void get_generic_request(struct ceph_mon_generic_request *req)
{
	kref_get(&req->kref);
}

static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
					  struct ceph_msg_header *hdr,
					  int *skip)
{
	struct ceph_mon_client *monc = con->private;
	struct ceph_mon_generic_request *req;
	u64 tid = le64_to_cpu(hdr->tid);
	struct ceph_msg *m;

	mutex_lock(&monc->mutex);
	req = __lookup_generic_req(monc, tid);
	if (!req) {
		dout("get_generic_reply %lld dne\n", tid);
		*skip = 1;
		m = NULL;
	} else {
		dout("get_generic_reply %lld got %p\n", tid, req->reply);
		m = ceph_msg_get(req->reply);
		/*
		 * we don't need to track the connection reading into
		 * this reply because we only have one open connection
		 * at a time, ever.
		 */
	}
	mutex_unlock(&monc->mutex);
	return m;
}
static int do_generic_request(struct ceph_mon_client *monc,
			      struct ceph_mon_generic_request *req)
{
	int err;

	/* register request */
	mutex_lock(&monc->mutex);
	req->tid = ++monc->last_tid;
	req->request->hdr.tid = cpu_to_le64(req->tid);
	__insert_generic_request(monc, req);
	monc->num_generic_requests++;
	ceph_con_send(monc->con, ceph_msg_get(req->request));
	mutex_unlock(&monc->mutex);

	err = wait_for_completion_interruptible(&req->completion);

	mutex_lock(&monc->mutex);
	rb_erase(&req->node, &monc->generic_request_tree);
	monc->num_generic_requests--;
	mutex_unlock(&monc->mutex);

	if (!err)
		err = req->result;
	return err;
}
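/*
 * Lifecycle of a generic request, in brief: the caller allocates a
 * kref-counted request with preallocated request/reply messages;
 * do_generic_request() assigns a tid and files it in the rbtree; the
 * reply path (get_generic_reply() plus the type-specific handler below)
 * matches the tid, fills in req->buf and req->result, and completes
 * the waiter.
 */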
/*
 * statfs
 */
static void handle_statfs_reply(struct ceph_mon_client *monc,
				struct ceph_msg *msg)
{
	struct ceph_mon_generic_request *req;
	struct ceph_mon_statfs_reply *reply = msg->front.iov_base;
	u64 tid = le64_to_cpu(msg->hdr.tid);

	if (msg->front.iov_len != sizeof(*reply))
		goto bad;
	dout("handle_statfs_reply %p tid %llu\n", msg, tid);

	mutex_lock(&monc->mutex);
	req = __lookup_generic_req(monc, tid);
	if (req) {
		*(struct ceph_statfs *)req->buf = reply->st;
		req->result = 0;
		get_generic_request(req);
	}
	mutex_unlock(&monc->mutex);
	if (req) {
		complete_all(&req->completion);
		put_generic_request(req);
	}
	return;

bad:
	pr_err("corrupt generic reply, tid %llu\n", tid);
	ceph_msg_dump(msg);
}
/*
 * Do a synchronous statfs().
 */
int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
{
	struct ceph_mon_generic_request *req;
	struct ceph_mon_statfs *h;
	int err;

	req = kzalloc(sizeof(*req), GFP_NOFS);
	if (!req)
		return -ENOMEM;

	kref_init(&req->kref);
	req->buf = buf;
	req->buf_len = sizeof(*buf);
	init_completion(&req->completion);

	err = -ENOMEM;
	req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS,
				    true);
	if (!req->request)
		goto out;
	req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 1024, GFP_NOFS,
				  true);
	if (!req->reply)
		goto out;

	/* fill out request */
	h = req->request->front.iov_base;
	h->monhdr.have_version = 0;
	h->monhdr.session_mon = cpu_to_le16(-1);
	h->monhdr.session_mon_tid = 0;
	h->fsid = monc->monmap->fsid;

	err = do_generic_request(monc, req);

out:
	kref_put(&req->kref, release_generic_request);
	return err;
}
EXPORT_SYMBOL(ceph_monc_do_statfs);
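/*
 * Example caller, as a sketch (field names per struct ceph_statfs;
 * error handling abbreviated):
 *
 *	struct ceph_statfs st;
 *	int err = ceph_monc_do_statfs(&client->monc, &st);
 *
 *	if (!err)
 *		pr_info("cluster: %llu KB used of %llu KB\n",
 *			le64_to_cpu(st.kb_used), le64_to_cpu(st.kb));
 */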
/*
 * pool ops
 */
static int get_poolop_reply_buf(const char *src, size_t src_len,
				char *dst, size_t dst_len)
{
	u32 buf_len;

	if (src_len != sizeof(u32) + dst_len)
		return -EINVAL;

	buf_len = le32_to_cpu(*(u32 *)src);
	if (buf_len != dst_len)
		return -EINVAL;

	memcpy(dst, src + sizeof(u32), dst_len);
	return 0;
}

static void handle_poolop_reply(struct ceph_mon_client *monc,
				struct ceph_msg *msg)
{
	struct ceph_mon_generic_request *req;
	struct ceph_mon_poolop_reply *reply = msg->front.iov_base;
	u64 tid = le64_to_cpu(msg->hdr.tid);

	if (msg->front.iov_len < sizeof(*reply))
		goto bad;
	dout("handle_poolop_reply %p tid %llu\n", msg, tid);

	mutex_lock(&monc->mutex);
	req = __lookup_generic_req(monc, tid);
	if (req) {
		if (req->buf_len &&
		    get_poolop_reply_buf(msg->front.iov_base + sizeof(*reply),
				     msg->front.iov_len - sizeof(*reply),
				     req->buf, req->buf_len) < 0) {
			mutex_unlock(&monc->mutex);
			goto bad;
		}
		req->result = le32_to_cpu(reply->reply_code);
		get_generic_request(req);
	}
	mutex_unlock(&monc->mutex);
	if (req) {
		complete(&req->completion);
		put_generic_request(req);
	}
	return;

bad:
	pr_err("corrupt generic reply, tid %llu\n", tid);
	ceph_msg_dump(msg);
}
/*
 * Do a synchronous pool op.
 */
int ceph_monc_do_poolop(struct ceph_mon_client *monc, u32 op,
			u32 pool, u64 snapid,
			char *buf, int len)
{
	struct ceph_mon_generic_request *req;
	struct ceph_mon_poolop *h;
	int err;

	req = kzalloc(sizeof(*req), GFP_NOFS);
	if (!req)
		return -ENOMEM;

	kref_init(&req->kref);
	req->buf = buf;
	req->buf_len = len;
	init_completion(&req->completion);

	err = -ENOMEM;
	req->request = ceph_msg_new(CEPH_MSG_POOLOP, sizeof(*h), GFP_NOFS,
				    true);
	if (!req->request)
		goto out;
	req->reply = ceph_msg_new(CEPH_MSG_POOLOP_REPLY, 1024, GFP_NOFS,
				  true);
	if (!req->reply)
		goto out;

	/* fill out request */
	req->request->hdr.version = cpu_to_le16(2);
	h = req->request->front.iov_base;
	h->monhdr.have_version = 0;
	h->monhdr.session_mon = cpu_to_le16(-1);
	h->monhdr.session_mon_tid = 0;
	h->fsid = monc->monmap->fsid;
	h->pool = cpu_to_le32(pool);
	h->op = cpu_to_le32(op);
	h->auid = 0;
	h->snapid = cpu_to_le64(snapid);
	h->name_len = 0;

	err = do_generic_request(monc, req);

out:
	kref_put(&req->kref, release_generic_request);
	return err;
}
int ceph_monc_create_snapid(struct ceph_mon_client *monc,
			    u32 pool, u64 *snapid)
{
	return ceph_monc_do_poolop(monc, POOL_OP_CREATE_UNMANAGED_SNAP,
				   pool, 0, (char *)snapid, sizeof(*snapid));
}
EXPORT_SYMBOL(ceph_monc_create_snapid);
int ceph_monc_delete_snapid(struct ceph_mon_client *monc,
			    u32 pool, u64 snapid)
{
	return ceph_monc_do_poolop(monc, POOL_OP_DELETE_UNMANAGED_SNAP,
				   pool, snapid, NULL, 0);
}
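/*
 * Usage, as a sketch: allocate an unmanaged snapid in a pool, then
 * release it (pool id and error handling are up to the caller):
 *
 *	u64 snapid;
 *	int err = ceph_monc_create_snapid(&client->monc, pool, &snapid);
 *
 *	if (!err)
 *		err = ceph_monc_delete_snapid(&client->monc, pool, snapid);
 */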
/*
 * Resend pending generic requests.
 */
static void __resend_generic_request(struct ceph_mon_client *monc)
{
	struct ceph_mon_generic_request *req;
	struct rb_node *p;

	for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) {
		req = rb_entry(p, struct ceph_mon_generic_request, node);
		ceph_con_revoke(monc->con, req->request);
		ceph_con_send(monc->con, ceph_msg_get(req->request));
	}
}

/*
 * Delayed work.  If we haven't mounted yet, retry.  Otherwise,
 * renew/retry subscription as needed (in case it is timing out, or we
 * got an ENOMEM).  And keep the monitor connection alive.
 */
static void delayed_work(struct work_struct *work)
{
	struct ceph_mon_client *monc =
		container_of(work, struct ceph_mon_client, delayed_work.work);

	dout("monc delayed_work\n");
	mutex_lock(&monc->mutex);
	if (monc->hunting) {
		__close_session(monc);
		__open_session(monc);  /* continue hunting */
	} else {
		ceph_con_keepalive(monc->con);

		__validate_auth(monc);

		if (monc->auth->ops->is_authenticated(monc->auth))
			__send_subscribe(monc);
	}
	__schedule_delayed(monc);
	mutex_unlock(&monc->mutex);
}
/*
 * On startup, we build a temporary monmap populated with the IPs
 * provided by mount(2).
 */
static int build_initial_monmap(struct ceph_mon_client *monc)
{
	struct ceph_options *opt = monc->client->options;
	struct ceph_entity_addr *mon_addr = opt->mon_addr;
	int num_mon = opt->num_mon;
	int i;

	/* build initial monmap */
	monc->monmap = kzalloc(sizeof(*monc->monmap) +
			       num_mon*sizeof(monc->monmap->mon_inst[0]),
			       GFP_KERNEL);
	if (!monc->monmap)
		return -ENOMEM;
	for (i = 0; i < num_mon; i++) {
		monc->monmap->mon_inst[i].addr = mon_addr[i];
		monc->monmap->mon_inst[i].addr.nonce = 0;
		monc->monmap->mon_inst[i].name.type =
			CEPH_ENTITY_TYPE_MON;
		monc->monmap->mon_inst[i].name.num = cpu_to_le64(i);
	}
	monc->monmap->num_mon = num_mon;
	monc->have_fsid = false;
	return 0;
}
int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
{
	int err = 0;

	dout("init\n");
	memset(monc, 0, sizeof(*monc));
	monc->client = cl;
	monc->monmap = NULL;
	mutex_init(&monc->mutex);

	err = build_initial_monmap(monc);
	if (err)
		goto out;

	/* connection */
	err = -ENOMEM;	/* in case the kmalloc below fails */
	monc->con = kmalloc(sizeof(*monc->con), GFP_KERNEL);
	if (!monc->con)
		goto out_monmap;
	ceph_con_init(monc->client->msgr, monc->con);
	monc->con->private = monc;
	monc->con->ops = &mon_con_ops;

	/* authentication */
	monc->auth = ceph_auth_init(cl->options->name,
				    cl->options->key);
	if (IS_ERR(monc->auth)) {
		err = PTR_ERR(monc->auth);
		goto out_con;
	}
	monc->auth->want_keys =
		CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON |
		CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS;

	/* msgs */
	err = -ENOMEM;
	monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK,
				     sizeof(struct ceph_mon_subscribe_ack),
				     GFP_NOFS, true);
	if (!monc->m_subscribe_ack)
		goto out_auth;

	monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 96, GFP_NOFS,
					 true);
	if (!monc->m_subscribe)
		goto out_subscribe_ack;

	monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096, GFP_NOFS,
					  true);
	if (!monc->m_auth_reply)
		goto out_subscribe;

	monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_NOFS, true);
	monc->pending_auth = 0;
	if (!monc->m_auth)
		goto out_auth_reply;

	monc->cur_mon = -1;
	monc->hunting = true;
	monc->sub_renew_after = jiffies;
	monc->sub_sent = 0;

	INIT_DELAYED_WORK(&monc->delayed_work, delayed_work);
	monc->generic_request_tree = RB_ROOT;
	monc->num_generic_requests = 0;
	monc->last_tid = 0;

	monc->have_mdsmap = 0;
	monc->have_osdmap = 0;
	monc->want_next_osdmap = 1;
	return 0;

out_auth_reply:
	ceph_msg_put(monc->m_auth_reply);
out_subscribe:
	ceph_msg_put(monc->m_subscribe);
out_subscribe_ack:
	ceph_msg_put(monc->m_subscribe_ack);
out_auth:
	ceph_auth_destroy(monc->auth);
out_con:
	monc->con->ops->put(monc->con);
out_monmap:
	kfree(monc->monmap);
out:
	return err;
}
EXPORT_SYMBOL(ceph_monc_init);
void ceph_monc_stop(struct ceph_mon_client *monc)
{
	dout("stop\n");
	cancel_delayed_work_sync(&monc->delayed_work);

	mutex_lock(&monc->mutex);
	__close_session(monc);

	monc->con->private = NULL;
	monc->con->ops->put(monc->con);
	monc->con = NULL;

	mutex_unlock(&monc->mutex);

	ceph_auth_destroy(monc->auth);

	ceph_msg_put(monc->m_auth);
	ceph_msg_put(monc->m_auth_reply);
	ceph_msg_put(monc->m_subscribe);
	ceph_msg_put(monc->m_subscribe_ack);

	kfree(monc->monmap);
}
EXPORT_SYMBOL(ceph_monc_stop);
static void handle_auth_reply(struct ceph_mon_client *monc,
			      struct ceph_msg *msg)
{
	int ret;
	int was_auth = 0;

	mutex_lock(&monc->mutex);
	if (monc->auth->ops)
		was_auth = monc->auth->ops->is_authenticated(monc->auth);
	monc->pending_auth = 0;
	ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
				     msg->front.iov_len,
				     monc->m_auth->front.iov_base,
				     monc->m_auth->front_max);
	if (ret < 0) {
		monc->client->auth_err = ret;
		wake_up_all(&monc->client->auth_wq);
	} else if (ret > 0) {
		__send_prepared_auth_request(monc, ret);
	} else if (!was_auth && monc->auth->ops->is_authenticated(monc->auth)) {
		dout("authenticated, starting session\n");

		monc->client->msgr->inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
		monc->client->msgr->inst.name.num =
					cpu_to_le64(monc->auth->global_id);

		__send_subscribe(monc);
		__resend_generic_request(monc);
	}
	mutex_unlock(&monc->mutex);
}

static int __validate_auth(struct ceph_mon_client *monc)
{
	int ret;

	if (monc->pending_auth)
		return 0;

	ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base,
			      monc->m_auth->front_max);
	if (ret <= 0)
		return ret;  /* either an error, or no need to authenticate */
	__send_prepared_auth_request(monc, ret);
	return 0;
}

int ceph_monc_validate_auth(struct ceph_mon_client *monc)
{
	int ret;

	mutex_lock(&monc->mutex);
	ret = __validate_auth(monc);
	mutex_unlock(&monc->mutex);
	return ret;
}
EXPORT_SYMBOL(ceph_monc_validate_auth);
/*
 * handle incoming message
 */
static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
	struct ceph_mon_client *monc = con->private;
	int type = le16_to_cpu(msg->hdr.type);

	if (!monc)
		return;

	switch (type) {
	case CEPH_MSG_AUTH_REPLY:
		handle_auth_reply(monc, msg);
		break;

	case CEPH_MSG_MON_SUBSCRIBE_ACK:
		handle_subscribe_ack(monc, msg);
		break;

	case CEPH_MSG_STATFS_REPLY:
		handle_statfs_reply(monc, msg);
		break;

	case CEPH_MSG_POOLOP_REPLY:
		handle_poolop_reply(monc, msg);
		break;

	case CEPH_MSG_MON_MAP:
		ceph_monc_handle_map(monc, msg);
		break;

	case CEPH_MSG_OSD_MAP:
		ceph_osdc_handle_map(&monc->client->osdc, msg);
		break;

	default:
		/* can the chained handler handle it? */
		if (monc->client->extra_mon_dispatch &&
		    monc->client->extra_mon_dispatch(monc->client, msg) == 0)
			break;

		pr_err("received unknown message type %d %s\n", type,
		       ceph_msg_type_name(type));
	}
	ceph_msg_put(msg);
}
/*
 * Allocate memory for incoming message
 */
static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
				      struct ceph_msg_header *hdr,
				      int *skip)
{
	struct ceph_mon_client *monc = con->private;
	int type = le16_to_cpu(hdr->type);
	int front_len = le32_to_cpu(hdr->front_len);
	struct ceph_msg *m = NULL;

	*skip = 0;

	switch (type) {
	case CEPH_MSG_MON_SUBSCRIBE_ACK:
		m = ceph_msg_get(monc->m_subscribe_ack);
		break;
	case CEPH_MSG_POOLOP_REPLY:
	case CEPH_MSG_STATFS_REPLY:
		return get_generic_reply(con, hdr, skip);
	case CEPH_MSG_AUTH_REPLY:
		m = ceph_msg_get(monc->m_auth_reply);
		break;
	case CEPH_MSG_MON_MAP:
	case CEPH_MSG_MDS_MAP:
	case CEPH_MSG_OSD_MAP:
		m = ceph_msg_new(type, front_len, GFP_NOFS, false);
		break;
	}

	if (!m) {
		pr_info("alloc_msg unknown type %d\n", type);
		*skip = 1;
	}
	return m;
}
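/*
 * Note on the messages handed out above: subscribe acks and auth
 * replies reuse messages preallocated in ceph_monc_init(), so the
 * client can keep talking to the monitor even when allocations might
 * fail; only map updates are allocated on demand.
 */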
/*
 * If the monitor connection resets, pick a new monitor and resubmit
 * any pending requests.
 */
static void mon_fault(struct ceph_connection *con)
{
	struct ceph_mon_client *monc = con->private;

	if (!monc)
		return;

	dout("mon_fault\n");
	mutex_lock(&monc->mutex);
	if (!con->private)
		goto out;

	if (!monc->hunting)
		pr_info("mon%d %s session lost, "
			"hunting for new mon\n", monc->cur_mon,
			ceph_pr_addr(&monc->con->peer_addr.in_addr));

	__close_session(monc);
	if (!monc->hunting) {
		/* start hunting */
		monc->hunting = true;
		__open_session(monc);
	} else {
		/* already hunting, let's wait a bit */
		__schedule_delayed(monc);
	}
out:
	mutex_unlock(&monc->mutex);
}

static const struct ceph_connection_operations mon_con_ops = {
	.get = ceph_con_get,
	.put = ceph_con_put,
	.dispatch = dispatch,
	.fault = mon_fault,
	.alloc_msg = mon_alloc_msg,
};