member.c 8.9 KB


  1. /******************************************************************************
  2. *******************************************************************************
  3. **
  4. ** Copyright (C) 2005-2008 Red Hat, Inc. All rights reserved.
  5. **
  6. ** This copyrighted material is made available to anyone wishing to use,
  7. ** modify, copy, or redistribute it subject to the terms and conditions
  8. ** of the GNU General Public License v.2.
  9. **
  10. *******************************************************************************
  11. ******************************************************************************/
  12. #include "dlm_internal.h"
  13. #include "lockspace.h"
  14. #include "member.h"
  15. #include "recoverd.h"
  16. #include "recover.h"
  17. #include "rcom.h"
  18. #include "config.h"
  19. static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new)
  20. {
  21. struct dlm_member *memb = NULL;
  22. struct list_head *tmp;
  23. struct list_head *newlist = &new->list;
  24. struct list_head *head = &ls->ls_nodes;
  25. list_for_each(tmp, head) {
  26. memb = list_entry(tmp, struct dlm_member, list);
  27. if (new->nodeid < memb->nodeid)
  28. break;
  29. }
  30. if (!memb)
  31. list_add_tail(newlist, head);
  32. else {
  33. /* FIXME: can use list macro here */
  34. newlist->prev = tmp->prev;
  35. newlist->next = tmp;
  36. tmp->prev->next = newlist;
  37. tmp->prev = newlist;
  38. }
  39. }
  40. static int dlm_add_member(struct dlm_ls *ls, int nodeid)
  41. {
  42. struct dlm_member *memb;
  43. int w;
  44. memb = kzalloc(sizeof(struct dlm_member), GFP_KERNEL);
  45. if (!memb)
  46. return -ENOMEM;
  47. w = dlm_node_weight(ls->ls_name, nodeid);
  48. if (w < 0) {
  49. kfree(memb);
  50. return w;
  51. }
  52. memb->nodeid = nodeid;
  53. memb->weight = w;
  54. add_ordered_member(ls, memb);
  55. ls->ls_num_nodes++;
  56. return 0;
  57. }
  58. static void dlm_remove_member(struct dlm_ls *ls, struct dlm_member *memb)
  59. {
  60. list_move(&memb->list, &ls->ls_nodes_gone);
  61. ls->ls_num_nodes--;
  62. }
  63. int dlm_is_member(struct dlm_ls *ls, int nodeid)
  64. {
  65. struct dlm_member *memb;
  66. list_for_each_entry(memb, &ls->ls_nodes, list) {
  67. if (memb->nodeid == nodeid)
  68. return 1;
  69. }
  70. return 0;
  71. }
  72. int dlm_is_removed(struct dlm_ls *ls, int nodeid)
  73. {
  74. struct dlm_member *memb;
  75. list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
  76. if (memb->nodeid == nodeid)
  77. return 1;
  78. }
  79. return 0;
  80. }
  81. static void clear_memb_list(struct list_head *head)
  82. {
  83. struct dlm_member *memb;
  84. while (!list_empty(head)) {
  85. memb = list_entry(head->next, struct dlm_member, list);
  86. list_del(&memb->list);
  87. kfree(memb);
  88. }
  89. }
  90. void dlm_clear_members(struct dlm_ls *ls)
  91. {
  92. clear_memb_list(&ls->ls_nodes);
  93. ls->ls_num_nodes = 0;
  94. }
  95. void dlm_clear_members_gone(struct dlm_ls *ls)
  96. {
  97. clear_memb_list(&ls->ls_nodes_gone);
  98. }
  99. static void make_member_array(struct dlm_ls *ls)
  100. {
  101. struct dlm_member *memb;
  102. int i, w, x = 0, total = 0, all_zero = 0, *array;
  103. kfree(ls->ls_node_array);
  104. ls->ls_node_array = NULL;
  105. list_for_each_entry(memb, &ls->ls_nodes, list) {
  106. if (memb->weight)
  107. total += memb->weight;
  108. }
  109. /* all nodes revert to weight of 1 if all have weight 0 */
  110. if (!total) {
  111. total = ls->ls_num_nodes;
  112. all_zero = 1;
  113. }
  114. ls->ls_total_weight = total;
  115. array = kmalloc(sizeof(int) * total, GFP_KERNEL);
  116. if (!array)
  117. return;
  118. list_for_each_entry(memb, &ls->ls_nodes, list) {
  119. if (!all_zero && !memb->weight)
  120. continue;
  121. if (all_zero)
  122. w = 1;
  123. else
  124. w = memb->weight;
  125. DLM_ASSERT(x < total, printk("total %d x %d\n", total, x););
  126. for (i = 0; i < w; i++)
  127. array[x++] = memb->nodeid;
  128. }
  129. ls->ls_node_array = array;
  130. }
  131. /* send a status request to all members just to establish comms connections */
  132. static int ping_members(struct dlm_ls *ls)
  133. {
  134. struct dlm_member *memb;
  135. int error = 0;
  136. list_for_each_entry(memb, &ls->ls_nodes, list) {
  137. error = dlm_recovery_stopped(ls);
  138. if (error)
  139. break;
  140. error = dlm_rcom_status(ls, memb->nodeid);
  141. if (error)
  142. break;
  143. }
  144. if (error)
  145. log_debug(ls, "ping_members aborted %d last nodeid %d",
  146. error, ls->ls_recover_nodeid);
  147. return error;
  148. }
  149. int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
  150. {
  151. struct dlm_member *memb, *safe;
  152. int i, error, found, pos = 0, neg = 0, low = -1;
  153. /* previously removed members that we've not finished removing need to
  154. count as a negative change so the "neg" recovery steps will happen */
  155. list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
  156. log_debug(ls, "prev removed member %d", memb->nodeid);
  157. neg++;
  158. }
  159. /* move departed members from ls_nodes to ls_nodes_gone */
  160. list_for_each_entry_safe(memb, safe, &ls->ls_nodes, list) {
  161. found = 0;
  162. for (i = 0; i < rv->node_count; i++) {
  163. if (memb->nodeid == rv->nodeids[i]) {
  164. found = 1;
  165. break;
  166. }
  167. }
  168. if (!found) {
  169. neg++;
  170. dlm_remove_member(ls, memb);
  171. log_debug(ls, "remove member %d", memb->nodeid);
  172. }
  173. }
  174. /* Add an entry to ls_nodes_gone for members that were removed and
  175. then added again, so that previous state for these nodes will be
  176. cleared during recovery. */
  177. for (i = 0; i < rv->new_count; i++) {
  178. if (!dlm_is_member(ls, rv->new[i]))
  179. continue;
  180. log_debug(ls, "new nodeid %d is a re-added member", rv->new[i]);
  181. memb = kzalloc(sizeof(struct dlm_member), GFP_KERNEL);
  182. if (!memb)
  183. return -ENOMEM;
  184. memb->nodeid = rv->new[i];
  185. list_add_tail(&memb->list, &ls->ls_nodes_gone);
  186. neg++;
  187. }
  188. /* add new members to ls_nodes */
  189. for (i = 0; i < rv->node_count; i++) {
  190. if (dlm_is_member(ls, rv->nodeids[i]))
  191. continue;
  192. dlm_add_member(ls, rv->nodeids[i]);
  193. pos++;
  194. log_debug(ls, "add member %d", rv->nodeids[i]);
  195. }
  196. list_for_each_entry(memb, &ls->ls_nodes, list) {
  197. if (low == -1 || memb->nodeid < low)
  198. low = memb->nodeid;
  199. }
  200. ls->ls_low_nodeid = low;
  201. make_member_array(ls);
  202. dlm_set_recover_status(ls, DLM_RS_NODES);
  203. *neg_out = neg;
  204. error = ping_members(ls);
  205. if (!error || error == -EPROTO) {
  206. /* new_lockspace() may be waiting to know if the config
  207. is good or bad */
  208. ls->ls_members_result = error;
  209. complete(&ls->ls_members_done);
  210. }
  211. if (error)
  212. goto out;
  213. error = dlm_recover_members_wait(ls);
  214. out:
  215. log_debug(ls, "total members %d error %d", ls->ls_num_nodes, error);
  216. return error;
  217. }
  218. /* Userspace guarantees that dlm_ls_stop() has completed on all nodes before
  219. dlm_ls_start() is called on any of them to start the new recovery. */
  220. int dlm_ls_stop(struct dlm_ls *ls)
  221. {
  222. int new;
  223. /*
  224. * Prevent dlm_recv from being in the middle of something when we do
  225. * the stop. This includes ensuring dlm_recv isn't processing a
  226. * recovery message (rcom), while dlm_recoverd is aborting and
  227. * resetting things from an in-progress recovery. i.e. we want
  228. * dlm_recoverd to abort its recovery without worrying about dlm_recv
  229. * processing an rcom at the same time. Stopping dlm_recv also makes
  230. * it easy for dlm_receive_message() to check locking stopped and add a
  231. * message to the requestqueue without races.
  232. */
  233. down_write(&ls->ls_recv_active);
  234. /*
  235. * Abort any recovery that's in progress (see RECOVERY_STOP,
  236. * dlm_recovery_stopped()) and tell any other threads running in the
  237. * dlm to quit any processing (see RUNNING, dlm_locking_stopped()).
  238. */
  239. spin_lock(&ls->ls_recover_lock);
  240. set_bit(LSFL_RECOVERY_STOP, &ls->ls_flags);
  241. new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags);
  242. ls->ls_recover_seq++;
  243. spin_unlock(&ls->ls_recover_lock);
  244. /*
  245. * Let dlm_recv run again, now any normal messages will be saved on the
  246. * requestqueue for later.
  247. */
  248. up_write(&ls->ls_recv_active);
  249. /*
  250. * This in_recovery lock does two things:
  251. * 1) Keeps this function from returning until all threads are out
  252. * of locking routines and locking is truely stopped.
  253. * 2) Keeps any new requests from being processed until it's unlocked
  254. * when recovery is complete.
  255. */
  256. if (new)
  257. down_write(&ls->ls_in_recovery);
  258. /*
  259. * The recoverd suspend/resume makes sure that dlm_recoverd (if
  260. * running) has noticed RECOVERY_STOP above and quit processing the
  261. * previous recovery.
  262. */
  263. dlm_recoverd_suspend(ls);
  264. ls->ls_recover_status = 0;
  265. dlm_recoverd_resume(ls);
  266. if (!ls->ls_recover_begin)
  267. ls->ls_recover_begin = jiffies;
  268. return 0;
  269. }
  270. int dlm_ls_start(struct dlm_ls *ls)
  271. {
  272. struct dlm_recover *rv = NULL, *rv_old;
  273. int *ids = NULL, *new = NULL;
  274. int error, ids_count = 0, new_count = 0;
  275. rv = kzalloc(sizeof(struct dlm_recover), GFP_KERNEL);
  276. if (!rv)
  277. return -ENOMEM;
  278. error = dlm_nodeid_list(ls->ls_name, &ids, &ids_count,
  279. &new, &new_count);
  280. if (error < 0)
  281. goto fail;
  282. spin_lock(&ls->ls_recover_lock);
  283. /* the lockspace needs to be stopped before it can be started */
  284. if (!dlm_locking_stopped(ls)) {
  285. spin_unlock(&ls->ls_recover_lock);
  286. log_error(ls, "start ignored: lockspace running");
  287. error = -EINVAL;
  288. goto fail;
  289. }
  290. rv->nodeids = ids;
  291. rv->node_count = ids_count;
  292. rv->new = new;
  293. rv->new_count = new_count;
  294. rv->seq = ++ls->ls_recover_seq;
  295. rv_old = ls->ls_recover_args;
  296. ls->ls_recover_args = rv;
  297. spin_unlock(&ls->ls_recover_lock);
  298. if (rv_old) {
  299. log_error(ls, "unused recovery %llx %d",
  300. (unsigned long long)rv_old->seq, rv_old->node_count);
  301. kfree(rv_old->nodeids);
  302. kfree(rv_old->new);
  303. kfree(rv_old);
  304. }
  305. dlm_recoverd_kick(ls);
  306. return 0;
  307. fail:
  308. kfree(rv);
  309. kfree(ids);
  310. kfree(new);
  311. return error;
  312. }