member.c 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359
  1. /******************************************************************************
  2. *******************************************************************************
  3. **
  4. ** Copyright (C) 2005-2007 Red Hat, Inc. All rights reserved.
  5. **
  6. ** This copyrighted material is made available to anyone wishing to use,
  7. ** modify, copy, or redistribute it subject to the terms and conditions
  8. ** of the GNU General Public License v.2.
  9. **
  10. *******************************************************************************
  11. ******************************************************************************/
  12. #include "dlm_internal.h"
  13. #include "lockspace.h"
  14. #include "member.h"
  15. #include "recoverd.h"
  16. #include "recover.h"
  17. #include "rcom.h"
  18. #include "config.h"
  19. static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new)
  20. {
  21. struct dlm_member *memb = NULL;
  22. struct list_head *tmp;
  23. struct list_head *newlist = &new->list;
  24. struct list_head *head = &ls->ls_nodes;
  25. list_for_each(tmp, head) {
  26. memb = list_entry(tmp, struct dlm_member, list);
  27. if (new->nodeid < memb->nodeid)
  28. break;
  29. }
  30. if (!memb)
  31. list_add_tail(newlist, head);
  32. else {
  33. /* FIXME: can use list macro here */
  34. newlist->prev = tmp->prev;
  35. newlist->next = tmp;
  36. tmp->prev->next = newlist;
  37. tmp->prev = newlist;
  38. }
  39. }
  40. static int dlm_add_member(struct dlm_ls *ls, int nodeid)
  41. {
  42. struct dlm_member *memb;
  43. int w;
  44. memb = kzalloc(sizeof(struct dlm_member), GFP_KERNEL);
  45. if (!memb)
  46. return -ENOMEM;
  47. w = dlm_node_weight(ls->ls_name, nodeid);
  48. if (w < 0) {
  49. kfree(memb);
  50. return w;
  51. }
  52. memb->nodeid = nodeid;
  53. memb->weight = w;
  54. add_ordered_member(ls, memb);
  55. ls->ls_num_nodes++;
  56. return 0;
  57. }
  58. static void dlm_remove_member(struct dlm_ls *ls, struct dlm_member *memb)
  59. {
  60. list_move(&memb->list, &ls->ls_nodes_gone);
  61. ls->ls_num_nodes--;
  62. }
  63. static int dlm_is_member(struct dlm_ls *ls, int nodeid)
  64. {
  65. struct dlm_member *memb;
  66. list_for_each_entry(memb, &ls->ls_nodes, list) {
  67. if (memb->nodeid == nodeid)
  68. return 1;
  69. }
  70. return 0;
  71. }
  72. int dlm_is_removed(struct dlm_ls *ls, int nodeid)
  73. {
  74. struct dlm_member *memb;
  75. list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
  76. if (memb->nodeid == nodeid)
  77. return 1;
  78. }
  79. return 0;
  80. }
  81. static void clear_memb_list(struct list_head *head)
  82. {
  83. struct dlm_member *memb;
  84. while (!list_empty(head)) {
  85. memb = list_entry(head->next, struct dlm_member, list);
  86. list_del(&memb->list);
  87. kfree(memb);
  88. }
  89. }
  90. void dlm_clear_members(struct dlm_ls *ls)
  91. {
  92. clear_memb_list(&ls->ls_nodes);
  93. ls->ls_num_nodes = 0;
  94. }
  95. void dlm_clear_members_gone(struct dlm_ls *ls)
  96. {
  97. clear_memb_list(&ls->ls_nodes_gone);
  98. }
  99. static void make_member_array(struct dlm_ls *ls)
  100. {
  101. struct dlm_member *memb;
  102. int i, w, x = 0, total = 0, all_zero = 0, *array;
  103. kfree(ls->ls_node_array);
  104. ls->ls_node_array = NULL;
  105. list_for_each_entry(memb, &ls->ls_nodes, list) {
  106. if (memb->weight)
  107. total += memb->weight;
  108. }
  109. /* all nodes revert to weight of 1 if all have weight 0 */
  110. if (!total) {
  111. total = ls->ls_num_nodes;
  112. all_zero = 1;
  113. }
  114. ls->ls_total_weight = total;
  115. array = kmalloc(sizeof(int) * total, GFP_KERNEL);
  116. if (!array)
  117. return;
  118. list_for_each_entry(memb, &ls->ls_nodes, list) {
  119. if (!all_zero && !memb->weight)
  120. continue;
  121. if (all_zero)
  122. w = 1;
  123. else
  124. w = memb->weight;
  125. DLM_ASSERT(x < total, printk("total %d x %d\n", total, x););
  126. for (i = 0; i < w; i++)
  127. array[x++] = memb->nodeid;
  128. }
  129. ls->ls_node_array = array;
  130. }
  131. /* send a status request to all members just to establish comms connections */
  132. static int ping_members(struct dlm_ls *ls)
  133. {
  134. struct dlm_member *memb;
  135. int error = 0;
  136. list_for_each_entry(memb, &ls->ls_nodes, list) {
  137. error = dlm_recovery_stopped(ls);
  138. if (error)
  139. break;
  140. error = dlm_rcom_status(ls, memb->nodeid);
  141. if (error)
  142. break;
  143. }
  144. if (error)
  145. log_debug(ls, "ping_members aborted %d last nodeid %d",
  146. error, ls->ls_recover_nodeid);
  147. return error;
  148. }
  149. int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
  150. {
  151. struct dlm_member *memb, *safe;
  152. int i, error, found, pos = 0, neg = 0, low = -1;
  153. /* previously removed members that we've not finished removing need to
  154. count as a negative change so the "neg" recovery steps will happen */
  155. list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
  156. log_debug(ls, "prev removed member %d", memb->nodeid);
  157. neg++;
  158. }
  159. /* move departed members from ls_nodes to ls_nodes_gone */
  160. list_for_each_entry_safe(memb, safe, &ls->ls_nodes, list) {
  161. found = 0;
  162. for (i = 0; i < rv->node_count; i++) {
  163. if (memb->nodeid == rv->nodeids[i]) {
  164. found = 1;
  165. break;
  166. }
  167. }
  168. if (!found) {
  169. neg++;
  170. dlm_remove_member(ls, memb);
  171. log_debug(ls, "remove member %d", memb->nodeid);
  172. }
  173. }
  174. /* add new members to ls_nodes */
  175. for (i = 0; i < rv->node_count; i++) {
  176. if (dlm_is_member(ls, rv->nodeids[i]))
  177. continue;
  178. dlm_add_member(ls, rv->nodeids[i]);
  179. pos++;
  180. log_debug(ls, "add member %d", rv->nodeids[i]);
  181. }
  182. list_for_each_entry(memb, &ls->ls_nodes, list) {
  183. if (low == -1 || memb->nodeid < low)
  184. low = memb->nodeid;
  185. }
  186. ls->ls_low_nodeid = low;
  187. make_member_array(ls);
  188. dlm_set_recover_status(ls, DLM_RS_NODES);
  189. *neg_out = neg;
  190. error = ping_members(ls);
  191. if (!error || error == -EPROTO) {
  192. /* new_lockspace() may be waiting to know if the config
  193. is good or bad */
  194. ls->ls_members_result = error;
  195. complete(&ls->ls_members_done);
  196. }
  197. if (error)
  198. goto out;
  199. error = dlm_recover_members_wait(ls);
  200. out:
  201. log_debug(ls, "total members %d error %d", ls->ls_num_nodes, error);
  202. return error;
  203. }
  204. /* Userspace guarantees that dlm_ls_stop() has completed on all nodes before
  205. dlm_ls_start() is called on any of them to start the new recovery. */
  206. int dlm_ls_stop(struct dlm_ls *ls)
  207. {
  208. int new;
  209. /*
  210. * Prevent dlm_recv from being in the middle of something when we do
  211. * the stop. This includes ensuring dlm_recv isn't processing a
  212. * recovery message (rcom), while dlm_recoverd is aborting and
  213. * resetting things from an in-progress recovery. i.e. we want
  214. * dlm_recoverd to abort its recovery without worrying about dlm_recv
  215. * processing an rcom at the same time. Stopping dlm_recv also makes
  216. * it easy for dlm_receive_message() to check locking stopped and add a
  217. * message to the requestqueue without races.
  218. */
  219. down_write(&ls->ls_recv_active);
  220. /*
  221. * Abort any recovery that's in progress (see RECOVERY_STOP,
  222. * dlm_recovery_stopped()) and tell any other threads running in the
  223. * dlm to quit any processing (see RUNNING, dlm_locking_stopped()).
  224. */
  225. spin_lock(&ls->ls_recover_lock);
  226. set_bit(LSFL_RECOVERY_STOP, &ls->ls_flags);
  227. new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags);
  228. ls->ls_recover_seq++;
  229. spin_unlock(&ls->ls_recover_lock);
  230. /*
  231. * Let dlm_recv run again, now any normal messages will be saved on the
  232. * requestqueue for later.
  233. */
  234. up_write(&ls->ls_recv_active);
  235. /*
  236. * This in_recovery lock does two things:
  237. * 1) Keeps this function from returning until all threads are out
  238. * of locking routines and locking is truely stopped.
  239. * 2) Keeps any new requests from being processed until it's unlocked
  240. * when recovery is complete.
  241. */
  242. if (new)
  243. down_write(&ls->ls_in_recovery);
  244. /*
  245. * The recoverd suspend/resume makes sure that dlm_recoverd (if
  246. * running) has noticed RECOVERY_STOP above and quit processing the
  247. * previous recovery.
  248. */
  249. dlm_recoverd_suspend(ls);
  250. ls->ls_recover_status = 0;
  251. dlm_recoverd_resume(ls);
  252. if (!ls->ls_recover_begin)
  253. ls->ls_recover_begin = jiffies;
  254. return 0;
  255. }
  256. int dlm_ls_start(struct dlm_ls *ls)
  257. {
  258. struct dlm_recover *rv = NULL, *rv_old;
  259. int *ids = NULL;
  260. int error, count;
  261. rv = kzalloc(sizeof(struct dlm_recover), GFP_KERNEL);
  262. if (!rv)
  263. return -ENOMEM;
  264. error = count = dlm_nodeid_list(ls->ls_name, &ids);
  265. if (error <= 0)
  266. goto fail;
  267. spin_lock(&ls->ls_recover_lock);
  268. /* the lockspace needs to be stopped before it can be started */
  269. if (!dlm_locking_stopped(ls)) {
  270. spin_unlock(&ls->ls_recover_lock);
  271. log_error(ls, "start ignored: lockspace running");
  272. error = -EINVAL;
  273. goto fail;
  274. }
  275. rv->nodeids = ids;
  276. rv->node_count = count;
  277. rv->seq = ++ls->ls_recover_seq;
  278. rv_old = ls->ls_recover_args;
  279. ls->ls_recover_args = rv;
  280. spin_unlock(&ls->ls_recover_lock);
  281. if (rv_old) {
  282. kfree(rv_old->nodeids);
  283. kfree(rv_old);
  284. }
  285. dlm_recoverd_kick(ls);
  286. return 0;
  287. fail:
  288. kfree(rv);
  289. kfree(ids);
  290. return error;
  291. }