dlmmaster.c

/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * dlmmaster.c
 *
 * standalone DLM module
 *
 * Copyright (C) 2004 Oracle.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 *
 */

#include <linux/module.h>
#include <linux/fs.h>
#include <linux/types.h>
#include <linux/slab.h>
#include <linux/highmem.h>
#include <linux/utsname.h>
#include <linux/init.h>
#include <linux/sysctl.h>
#include <linux/random.h>
#include <linux/blkdev.h>
#include <linux/socket.h>
#include <linux/inet.h>
#include <linux/spinlock.h>
#include <linux/delay.h>

#include "cluster/heartbeat.h"
#include "cluster/nodemanager.h"
#include "cluster/tcp.h"

#include "dlmapi.h"
#include "dlmcommon.h"
#include "dlmdebug.h"
#include "dlmdomain.h"

#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_MASTER)
#include "cluster/masklog.h"

enum dlm_mle_type {
	DLM_MLE_BLOCK,		/* waiting for another node to assert mastery */
	DLM_MLE_MASTER,		/* this node is (or is trying to become) master */
	DLM_MLE_MIGRATION	/* mastery is migrating to a new node */
};
struct dlm_lock_name
{
	u8 len;
	u8 name[DLM_LOCKID_NAME_MAX];
};

struct dlm_master_list_entry
{
	struct list_head list;
	struct list_head hb_events;
	struct dlm_ctxt *dlm;
	spinlock_t spinlock;
	wait_queue_head_t wq;
	atomic_t woken;
	struct kref mle_refs;
	unsigned long maybe_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
	unsigned long vote_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
	unsigned long response_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
	unsigned long node_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
	u8 master;
	u8 new_master;
	enum dlm_mle_type type;
	struct o2hb_callback_func mle_hb_up;
	struct o2hb_callback_func mle_hb_down;
	union {
		struct dlm_lock_resource *res;
		struct dlm_lock_name name;
	} u;
};

static void dlm_mle_node_down(struct dlm_ctxt *dlm,
			      struct dlm_master_list_entry *mle,
			      struct o2nm_node *node,
			      int idx);
static void dlm_mle_node_up(struct dlm_ctxt *dlm,
			    struct dlm_master_list_entry *mle,
			    struct o2nm_node *node,
			    int idx);

static void dlm_assert_master_worker(struct dlm_work_item *item, void *data);
static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname,
				unsigned int namelen, void *nodemap,
				u32 flags);

static inline int dlm_mle_equal(struct dlm_ctxt *dlm,
				struct dlm_master_list_entry *mle,
				const char *name,
				unsigned int namelen)
{
	struct dlm_lock_resource *res;

	if (dlm != mle->dlm)
		return 0;

	if (mle->type == DLM_MLE_BLOCK ||
	    mle->type == DLM_MLE_MIGRATION) {
		if (namelen != mle->u.name.len ||
		    memcmp(name, mle->u.name.name, namelen) != 0)
			return 0;
	} else {
		res = mle->u.res;
		if (namelen != res->lockname.len ||
		    memcmp(res->lockname.name, name, namelen) != 0)
			return 0;
	}
	return 1;
}

#if 0
/* Code here is included but defined out as it aids debugging */

void dlm_print_one_mle(struct dlm_master_list_entry *mle)
{
	int i = 0, refs;
	char *type;
	char attached;
	u8 master;
	unsigned int namelen;
	const char *name;
	struct kref *k;

	k = &mle->mle_refs;
	if (mle->type == DLM_MLE_BLOCK)
		type = "BLK";
	else if (mle->type == DLM_MLE_MASTER)
		type = "MAS";
	else
		type = "MIG";
	refs = atomic_read(&k->refcount);
	master = mle->master;
	attached = (list_empty(&mle->hb_events) ? 'N' : 'Y');

	if (mle->type != DLM_MLE_MASTER) {
		namelen = mle->u.name.len;
		name = mle->u.name.name;
	} else {
		namelen = mle->u.res->lockname.len;
		name = mle->u.res->lockname.name;
	}

	mlog(ML_NOTICE, " #%3d: %3s %3d %3u %3u %c (%d)%.*s\n",
	     i, type, refs, master, mle->new_master, attached,
	     namelen, namelen, name);
}

static void dlm_dump_mles(struct dlm_ctxt *dlm)
{
	struct dlm_master_list_entry *mle;
	struct list_head *iter;

	mlog(ML_NOTICE, "dumping all mles for domain %s:\n", dlm->name);
	mlog(ML_NOTICE, " ####: type refs owner new events? lockname nodemap votemap respmap maybemap\n");
	spin_lock(&dlm->master_lock);

	list_for_each(iter, &dlm->master_list) {
		mle = list_entry(iter, struct dlm_master_list_entry, list);
		dlm_print_one_mle(mle);
	}

	spin_unlock(&dlm->master_lock);
}

int dlm_dump_all_mles(const char __user *data, unsigned int len)
{
	struct list_head *iter;
	struct dlm_ctxt *dlm;

	spin_lock(&dlm_domain_lock);
	list_for_each(iter, &dlm_domains) {
		dlm = list_entry (iter, struct dlm_ctxt, list);
		mlog(ML_NOTICE, "found dlm: %p, name=%s\n", dlm, dlm->name);
		dlm_dump_mles(dlm);
	}
	spin_unlock(&dlm_domain_lock);
	return len;
}
EXPORT_SYMBOL_GPL(dlm_dump_all_mles);
#endif  /*  0  */


static kmem_cache_t *dlm_mle_cache = NULL;


static void dlm_mle_release(struct kref *kref);
static void dlm_init_mle(struct dlm_master_list_entry *mle,
			enum dlm_mle_type type,
			struct dlm_ctxt *dlm,
			struct dlm_lock_resource *res,
			const char *name,
			unsigned int namelen);
static void dlm_put_mle(struct dlm_master_list_entry *mle);
static void __dlm_put_mle(struct dlm_master_list_entry *mle);
static int dlm_find_mle(struct dlm_ctxt *dlm,
			struct dlm_master_list_entry **mle,
			char *name, unsigned int namelen);

static int dlm_do_master_request(struct dlm_master_list_entry *mle, int to);

static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
				     struct dlm_lock_resource *res,
				     struct dlm_master_list_entry *mle,
				     int *blocked);
static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
				    struct dlm_lock_resource *res,
				    struct dlm_master_list_entry *mle,
				    int blocked);
static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
				 struct dlm_lock_resource *res,
				 struct dlm_master_list_entry *mle,
				 struct dlm_master_list_entry **oldmle,
				 const char *name, unsigned int namelen,
				 u8 new_master, u8 master);

static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
				    struct dlm_lock_resource *res);
static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
				      struct dlm_lock_resource *res);
static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
				      struct dlm_lock_resource *res,
				      u8 target);


int dlm_is_host_down(int errno)
{
	switch (errno) {
		case -EBADF:
		case -ECONNREFUSED:
		case -ENOTCONN:
		case -ECONNRESET:
		case -EPIPE:
		case -EHOSTDOWN:
		case -EHOSTUNREACH:
		case -ETIMEDOUT:
		case -ECONNABORTED:
		case -ENETDOWN:
		case -ENETUNREACH:
		case -ENETRESET:
		case -ESHUTDOWN:
		case -ENOPROTOOPT:
		case -EINVAL:	/* if returned from our tcp code,
				   this means there is no socket */
			return 1;
	}
	return 0;
}
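
/* Callers in this file treat a nonzero return from dlm_is_host_down()
 * as "the remote node is dead": dlm_do_master_request() and
 * dlm_do_assert_master() below continue (or retry) on host-down
 * errors, but BUG() on any other unexpected errno from o2net. */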
/*
 * MASTER LIST FUNCTIONS
 */


/*
 * regarding master list entries and heartbeat callbacks:
 *
 * in order to avoid sleeping and allocation that occurs in
 * heartbeat, master list entries are simply attached to the
 * dlm's established heartbeat callbacks.  the mle is attached
 * when it is created, and since the dlm->spinlock is held at
 * that time, any heartbeat event will be properly discovered
 * by the mle.  the mle needs to be detached from the
 * dlm->mle_hb_events list as soon as heartbeat events are no
 * longer useful to the mle, and before the mle is freed.
 *
 * as a general rule, heartbeat events are no longer needed by
 * the mle once an "answer" regarding the lock master has been
 * received.
 */
static inline void __dlm_mle_attach_hb_events(struct dlm_ctxt *dlm,
					      struct dlm_master_list_entry *mle)
{
	assert_spin_locked(&dlm->spinlock);

	list_add_tail(&mle->hb_events, &dlm->mle_hb_events);
}


static inline void __dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
					      struct dlm_master_list_entry *mle)
{
	if (!list_empty(&mle->hb_events))
		list_del_init(&mle->hb_events);
}


static inline void dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
					    struct dlm_master_list_entry *mle)
{
	spin_lock(&dlm->spinlock);
	__dlm_mle_detach_hb_events(dlm, mle);
	spin_unlock(&dlm->spinlock);
}

/* remove from list and free */
static void __dlm_put_mle(struct dlm_master_list_entry *mle)
{
	struct dlm_ctxt *dlm;
	dlm = mle->dlm;

	assert_spin_locked(&dlm->spinlock);
	assert_spin_locked(&dlm->master_lock);
	BUG_ON(!atomic_read(&mle->mle_refs.refcount));

	kref_put(&mle->mle_refs, dlm_mle_release);
}


/* must not have any spinlocks coming in */
static void dlm_put_mle(struct dlm_master_list_entry *mle)
{
	struct dlm_ctxt *dlm;
	dlm = mle->dlm;

	spin_lock(&dlm->spinlock);
	spin_lock(&dlm->master_lock);
	__dlm_put_mle(mle);
	spin_unlock(&dlm->master_lock);
	spin_unlock(&dlm->spinlock);
}

static inline void dlm_get_mle(struct dlm_master_list_entry *mle)
{
	kref_get(&mle->mle_refs);
}

static void dlm_init_mle(struct dlm_master_list_entry *mle,
			enum dlm_mle_type type,
			struct dlm_ctxt *dlm,
			struct dlm_lock_resource *res,
			const char *name,
			unsigned int namelen)
{
	assert_spin_locked(&dlm->spinlock);

	mle->dlm = dlm;
	mle->type = type;
	INIT_LIST_HEAD(&mle->list);
	INIT_LIST_HEAD(&mle->hb_events);
	memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
	spin_lock_init(&mle->spinlock);
	init_waitqueue_head(&mle->wq);
	atomic_set(&mle->woken, 0);
	kref_init(&mle->mle_refs);
	memset(mle->response_map, 0, sizeof(mle->response_map));
	mle->master = O2NM_MAX_NODES;
	mle->new_master = O2NM_MAX_NODES;

	if (mle->type == DLM_MLE_MASTER) {
		BUG_ON(!res);
		mle->u.res = res;
	} else if (mle->type == DLM_MLE_BLOCK) {
		BUG_ON(!name);
		memcpy(mle->u.name.name, name, namelen);
		mle->u.name.len = namelen;
	} else /* DLM_MLE_MIGRATION */ {
		BUG_ON(!name);
		memcpy(mle->u.name.name, name, namelen);
		mle->u.name.len = namelen;
	}

	/* copy off the node_map and register hb callbacks on our copy */
	memcpy(mle->node_map, dlm->domain_map, sizeof(mle->node_map));
	memcpy(mle->vote_map, dlm->domain_map, sizeof(mle->vote_map));
	clear_bit(dlm->node_num, mle->vote_map);
	clear_bit(dlm->node_num, mle->node_map);

	/* attach the mle to the domain node up/down events */
	__dlm_mle_attach_hb_events(dlm, mle);
}
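
/* dlm_init_mle() must be called with dlm->spinlock held (asserted above).
 * Typical use for a MASTER-type mle, taken from dlm_get_lock_resource()
 * below, where dlm->master_lock is also held while linking the mle:
 *
 *	dlm_init_mle(mle, DLM_MLE_MASTER, dlm, res, NULL, 0);
 *	set_bit(dlm->node_num, mle->maybe_map);
 *	list_add(&mle->list, &dlm->master_list);
 *
 * holding dlm->spinlock across the init guarantees the mle is attached
 * to heartbeat events before any node up/down can be missed. */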
/* returns 1 if found, 0 if not */
static int dlm_find_mle(struct dlm_ctxt *dlm,
			struct dlm_master_list_entry **mle,
			char *name, unsigned int namelen)
{
	struct dlm_master_list_entry *tmpmle;
	struct list_head *iter;

	assert_spin_locked(&dlm->master_lock);

	list_for_each(iter, &dlm->master_list) {
		tmpmle = list_entry(iter, struct dlm_master_list_entry, list);
		if (!dlm_mle_equal(dlm, tmpmle, name, namelen))
			continue;
		dlm_get_mle(tmpmle);
		*mle = tmpmle;
		return 1;
	}
	return 0;
}

void dlm_hb_event_notify_attached(struct dlm_ctxt *dlm, int idx, int node_up)
{
	struct dlm_master_list_entry *mle;
	struct list_head *iter;

	assert_spin_locked(&dlm->spinlock);

	list_for_each(iter, &dlm->mle_hb_events) {
		mle = list_entry(iter, struct dlm_master_list_entry,
				 hb_events);
		if (node_up)
			dlm_mle_node_up(dlm, mle, NULL, idx);
		else
			dlm_mle_node_down(dlm, mle, NULL, idx);
	}
}

static void dlm_mle_node_down(struct dlm_ctxt *dlm,
			      struct dlm_master_list_entry *mle,
			      struct o2nm_node *node, int idx)
{
	spin_lock(&mle->spinlock);

	if (!test_bit(idx, mle->node_map))
		mlog(0, "node %u already removed from nodemap!\n", idx);
	else
		clear_bit(idx, mle->node_map);

	spin_unlock(&mle->spinlock);
}

static void dlm_mle_node_up(struct dlm_ctxt *dlm,
			    struct dlm_master_list_entry *mle,
			    struct o2nm_node *node, int idx)
{
	spin_lock(&mle->spinlock);

	if (test_bit(idx, mle->node_map))
		mlog(0, "node %u already in node map!\n", idx);
	else
		set_bit(idx, mle->node_map);

	spin_unlock(&mle->spinlock);
}


int dlm_init_mle_cache(void)
{
	dlm_mle_cache = kmem_cache_create("dlm_mle_cache",
					  sizeof(struct dlm_master_list_entry),
					  0, SLAB_HWCACHE_ALIGN,
					  NULL, NULL);
	if (dlm_mle_cache == NULL)
		return -ENOMEM;
	return 0;
}

void dlm_destroy_mle_cache(void)
{
	if (dlm_mle_cache)
		kmem_cache_destroy(dlm_mle_cache);
}

static void dlm_mle_release(struct kref *kref)
{
	struct dlm_master_list_entry *mle;
	struct dlm_ctxt *dlm;

	mlog_entry_void();

	mle = container_of(kref, struct dlm_master_list_entry, mle_refs);
	dlm = mle->dlm;

	if (mle->type != DLM_MLE_MASTER) {
		mlog(0, "calling mle_release for %.*s, type %d\n",
		     mle->u.name.len, mle->u.name.name, mle->type);
	} else {
		mlog(0, "calling mle_release for %.*s, type %d\n",
		     mle->u.res->lockname.len,
		     mle->u.res->lockname.name, mle->type);
	}
	assert_spin_locked(&dlm->spinlock);
	assert_spin_locked(&dlm->master_lock);

	/* remove from list if not already */
	if (!list_empty(&mle->list))
		list_del_init(&mle->list);

	/* detach the mle from the domain node up/down events */
	__dlm_mle_detach_hb_events(dlm, mle);

	/* NOTE: kfree under spinlock here.
	 * if this is bad, we can move this to a freelist. */
	kmem_cache_free(dlm_mle_cache, mle);
}


/*
 * LOCK RESOURCE FUNCTIONS
 */

static void dlm_set_lockres_owner(struct dlm_ctxt *dlm,
				  struct dlm_lock_resource *res,
				  u8 owner)
{
	assert_spin_locked(&res->spinlock);

	mlog_entry("%.*s, %u\n", res->lockname.len, res->lockname.name, owner);

	if (owner == dlm->node_num)
		atomic_inc(&dlm->local_resources);
	else if (owner == DLM_LOCK_RES_OWNER_UNKNOWN)
		atomic_inc(&dlm->unknown_resources);
	else
		atomic_inc(&dlm->remote_resources);

	res->owner = owner;
}

void dlm_change_lockres_owner(struct dlm_ctxt *dlm,
			      struct dlm_lock_resource *res, u8 owner)
{
	assert_spin_locked(&res->spinlock);

	if (owner == res->owner)
		return;

	if (res->owner == dlm->node_num)
		atomic_dec(&dlm->local_resources);
	else if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN)
		atomic_dec(&dlm->unknown_resources);
	else
		atomic_dec(&dlm->remote_resources);

	dlm_set_lockres_owner(dlm, res, owner);
}


static void dlm_lockres_release(struct kref *kref)
{
	struct dlm_lock_resource *res;

	res = container_of(kref, struct dlm_lock_resource, refs);

	/* This should not happen -- all lockres' have a name
	 * associated with them at init time. */
	BUG_ON(!res->lockname.name);

	mlog(0, "destroying lockres %.*s\n", res->lockname.len,
	     res->lockname.name);

	/* By the time we're ready to blow this guy away, we shouldn't
	 * be on any lists. */
	BUG_ON(!list_empty(&res->list));
	BUG_ON(!list_empty(&res->granted));
	BUG_ON(!list_empty(&res->converting));
	BUG_ON(!list_empty(&res->blocked));
	BUG_ON(!list_empty(&res->dirty));
	BUG_ON(!list_empty(&res->recovering));
	BUG_ON(!list_empty(&res->purge));

	kfree(res->lockname.name);

	kfree(res);
}

void dlm_lockres_get(struct dlm_lock_resource *res)
{
	kref_get(&res->refs);
}

void dlm_lockres_put(struct dlm_lock_resource *res)
{
	kref_put(&res->refs, dlm_lockres_release);
}

static void dlm_init_lockres(struct dlm_ctxt *dlm,
			     struct dlm_lock_resource *res,
			     const char *name, unsigned int namelen)
{
	char *qname;

	/* If we memset here, we lose our reference to the kmalloc'd
	 * res->lockname.name, so be sure to init every field
	 * correctly! */

	qname = (char *) res->lockname.name;
	memcpy(qname, name, namelen);

	res->lockname.len = namelen;
	res->lockname.hash = full_name_hash(name, namelen);

	init_waitqueue_head(&res->wq);
	spin_lock_init(&res->spinlock);
	INIT_LIST_HEAD(&res->list);
	INIT_LIST_HEAD(&res->granted);
	INIT_LIST_HEAD(&res->converting);
	INIT_LIST_HEAD(&res->blocked);
	INIT_LIST_HEAD(&res->dirty);
	INIT_LIST_HEAD(&res->recovering);
	INIT_LIST_HEAD(&res->purge);

	atomic_set(&res->asts_reserved, 0);
	res->migration_pending = 0;

	kref_init(&res->refs);

	/* just for consistency */
	spin_lock(&res->spinlock);
	dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
	spin_unlock(&res->spinlock);

	res->state = DLM_LOCK_RES_IN_PROGRESS;

	res->last_used = 0;

	memset(res->lvb, 0, DLM_LVB_LEN);
}

struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
					  const char *name,
					  unsigned int namelen)
{
	struct dlm_lock_resource *res;

	res = kmalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL);
	if (!res)
		return NULL;

	res->lockname.name = kmalloc(namelen, GFP_KERNEL);
	if (!res->lockname.name) {
		kfree(res);
		return NULL;
	}

	dlm_init_lockres(dlm, res, name, namelen);
	return res;
}

/*
 * lookup a lock resource by name.
 * may already exist in the hashtable.
 * lockid is null terminated
 *
 * if not, allocate enough for the lockres and for
 * the temporary structure used in doing the mastering.
 *
 * also, do a lookup in the dlm->master_list to see
 * if another node has begun mastering the same lock.
 * if so, there should be a block entry in there
 * for this name, and we should *not* attempt to master
 * the lock here.  need to wait around for that node
 * to assert_master (or die).
 *
 */
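/* In outline, the function below: (1) looks the name up in the hash and
 * returns the existing lockres if found; (2) otherwise allocates a new
 * lockres plus an mle; (3) if a BLOCK or MIGRATION mle already exists,
 * waits for the other node to assert mastery; (4) otherwise adds a
 * MASTER mle, sends DLM_MASTER_REQUEST_MSG to every node in the vote
 * map, and waits in dlm_wait_for_lock_mastery() until an owner is set. */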
struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
						 const char *lockid,
						 int flags)
{
	struct dlm_lock_resource *tmpres = NULL, *res = NULL;
	struct dlm_master_list_entry *mle = NULL;
	struct dlm_master_list_entry *alloc_mle = NULL;
	int blocked = 0;
	int ret, nodenum;
	struct dlm_node_iter iter;
	unsigned int namelen;
	int tries = 0;

	BUG_ON(!lockid);

	namelen = strlen(lockid);

	mlog(0, "get lockres %s (len %d)\n", lockid, namelen);

lookup:
	spin_lock(&dlm->spinlock);
	tmpres = __dlm_lookup_lockres(dlm, lockid, namelen);
	if (tmpres) {
		spin_unlock(&dlm->spinlock);
		mlog(0, "found in hash!\n");
		if (res)
			dlm_lockres_put(res);
		res = tmpres;
		goto leave;
	}

	if (!res) {
		spin_unlock(&dlm->spinlock);
		mlog(0, "allocating a new resource\n");
		/* nothing found and we need to allocate one. */
		alloc_mle = (struct dlm_master_list_entry *)
			kmem_cache_alloc(dlm_mle_cache, GFP_KERNEL);
		if (!alloc_mle)
			goto leave;
		res = dlm_new_lockres(dlm, lockid, namelen);
		if (!res)
			goto leave;
		goto lookup;
	}

	mlog(0, "no lockres found, allocated our own: %p\n", res);

	if (flags & LKM_LOCAL) {
		/* caller knows it's safe to assume it's not mastered elsewhere
		 * DONE!  return right away */
		spin_lock(&res->spinlock);
		dlm_change_lockres_owner(dlm, res, dlm->node_num);
		__dlm_insert_lockres(dlm, res);
		spin_unlock(&res->spinlock);
		spin_unlock(&dlm->spinlock);
		/* lockres still marked IN_PROGRESS */
		goto wake_waiters;
	}

	/* check master list to see if another node has started mastering it */
	spin_lock(&dlm->master_lock);

	/* if we found a block, wait for lock to be mastered by another node */
	blocked = dlm_find_mle(dlm, &mle, (char *)lockid, namelen);
	if (blocked) {
		if (mle->type == DLM_MLE_MASTER) {
			mlog(ML_ERROR, "master entry for nonexistent lock!\n");
			BUG();
		} else if (mle->type == DLM_MLE_MIGRATION) {
			/* migration is in progress! */
			/* the good news is that we now know the
			 * "current" master (mle->master). */
			spin_unlock(&dlm->master_lock);
			assert_spin_locked(&dlm->spinlock);

			/* set the lockres owner and hash it */
			spin_lock(&res->spinlock);
			dlm_set_lockres_owner(dlm, res, mle->master);
			__dlm_insert_lockres(dlm, res);
			spin_unlock(&res->spinlock);
			spin_unlock(&dlm->spinlock);

			/* master is known, detach */
			dlm_mle_detach_hb_events(dlm, mle);
			dlm_put_mle(mle);
			mle = NULL;
			goto wake_waiters;
		}
	} else {
		/* go ahead and try to master lock on this node */
		mle = alloc_mle;
		/* make sure this does not get freed below */
		alloc_mle = NULL;
		dlm_init_mle(mle, DLM_MLE_MASTER, dlm, res, NULL, 0);
		set_bit(dlm->node_num, mle->maybe_map);
		list_add(&mle->list, &dlm->master_list);
	}

	/* at this point there is either a DLM_MLE_BLOCK or a
	 * DLM_MLE_MASTER on the master list, so it's safe to add the
	 * lockres to the hashtable.  anyone who finds the lock will
	 * still have to wait on the IN_PROGRESS. */

	/* finally add the lockres to its hash bucket */
	__dlm_insert_lockres(dlm, res);
	/* get an extra ref on the mle in case this is a BLOCK
	 * if so, the creator of the BLOCK may try to put the last
	 * ref at this time in the assert master handler, so we
	 * need an extra one to keep from a bad ptr deref. */
	dlm_get_mle(mle);
	spin_unlock(&dlm->master_lock);
	spin_unlock(&dlm->spinlock);

	/* must wait for lock to be mastered elsewhere */
	if (blocked)
		goto wait;

redo_request:
	ret = -EINVAL;
	dlm_node_iter_init(mle->vote_map, &iter);
	while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
		ret = dlm_do_master_request(mle, nodenum);
		if (ret < 0)
			mlog_errno(ret);
		if (mle->master != O2NM_MAX_NODES) {
			/* found a master ! */
			break;
		}
	}

wait:
	/* keep going until the response map includes all nodes */
	ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked);
	if (ret < 0) {
		mlog(0, "%s:%.*s: node map changed, redo the "
		     "master request now, blocked=%d\n",
		     dlm->name, res->lockname.len,
		     res->lockname.name, blocked);
		if (++tries > 20) {
			mlog(ML_ERROR, "%s:%.*s: spinning on "
			     "dlm_wait_for_lock_mastery, blocked=%d\n",
			     dlm->name, res->lockname.len,
			     res->lockname.name, blocked);
			dlm_print_one_lock_resource(res);
			/* dlm_print_one_mle(mle); */
			tries = 0;
		}
		goto redo_request;
	}

	mlog(0, "lockres mastered by %u\n", res->owner);
	/* make sure we never continue without this */
	BUG_ON(res->owner == O2NM_MAX_NODES);

	/* master is known, detach if not already detached */
	dlm_mle_detach_hb_events(dlm, mle);
	dlm_put_mle(mle);
	/* put the extra ref */
	dlm_put_mle(mle);

wake_waiters:
	spin_lock(&res->spinlock);
	res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
	spin_unlock(&res->spinlock);
	wake_up(&res->wq);

leave:
	/* need to free the unused mle */
	if (alloc_mle)
		kmem_cache_free(dlm_mle_cache, alloc_mle);

	return res;
}


#define DLM_MASTERY_TIMEOUT_MS   5000
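
/* dlm_wait_for_lock_mastery() loops, sleeping in DLM_MASTERY_TIMEOUT_MS
 * increments, until either another node asserts mastery or every node
 * in the vote map has responded and this node (as the lowest-numbered
 * candidate in the maybe_map) sends the assert itself.  A negative
 * return means the node map changed underneath us and the caller above
 * must redo the master requests. */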
static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
				     struct dlm_lock_resource *res,
				     struct dlm_master_list_entry *mle,
				     int *blocked)
{
	u8 m;
	int ret, bit;
	int map_changed, voting_done;
	int assert, sleep;

recheck:
	ret = 0;
	assert = 0;

	/* check if another node has already become the owner */
	spin_lock(&res->spinlock);
	if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
		spin_unlock(&res->spinlock);
		goto leave;
	}
	spin_unlock(&res->spinlock);

	spin_lock(&mle->spinlock);
	m = mle->master;
	map_changed = (memcmp(mle->vote_map, mle->node_map,
			      sizeof(mle->vote_map)) != 0);
	voting_done = (memcmp(mle->vote_map, mle->response_map,
			      sizeof(mle->vote_map)) == 0);

	/* restart if we hit any errors */
	if (map_changed) {
		int b;
		mlog(0, "%s: %.*s: node map changed, restarting\n",
		     dlm->name, res->lockname.len, res->lockname.name);
		ret = dlm_restart_lock_mastery(dlm, res, mle, *blocked);
		b = (mle->type == DLM_MLE_BLOCK);
		if ((*blocked && !b) || (!*blocked && b)) {
			mlog(0, "%s:%.*s: status change: old=%d new=%d\n",
			     dlm->name, res->lockname.len, res->lockname.name,
			     *blocked, b);
			*blocked = b;
		}
		spin_unlock(&mle->spinlock);
		if (ret < 0) {
			mlog_errno(ret);
			goto leave;
		}
		mlog(0, "%s:%.*s: restart lock mastery succeeded, "
		     "rechecking now\n", dlm->name, res->lockname.len,
		     res->lockname.name);
		goto recheck;
	}

	if (m != O2NM_MAX_NODES) {
		/* another node has done an assert!
		 * all done! */
		sleep = 0;
	} else {
		sleep = 1;
		/* have all nodes responded? */
		if (voting_done && !*blocked) {
			bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
			if (dlm->node_num <= bit) {
				/* my node number is lowest.
				 * now tell other nodes that I am
				 * mastering this. */
				mle->master = dlm->node_num;
				assert = 1;
				sleep = 0;
			}
			/* if voting is done, but we have not received
			 * an assert master yet, we must sleep */
		}
	}

	spin_unlock(&mle->spinlock);

	/* sleep if we haven't finished voting yet */
	if (sleep) {
		unsigned long timeo = msecs_to_jiffies(DLM_MASTERY_TIMEOUT_MS);

		/*
		if (atomic_read(&mle->mle_refs.refcount) < 2)
			mlog(ML_ERROR, "mle (%p) refs=%d, name=%.*s\n", mle,
			atomic_read(&mle->mle_refs.refcount),
			res->lockname.len, res->lockname.name);
		*/
		atomic_set(&mle->woken, 0);
		(void)wait_event_timeout(mle->wq,
					 (atomic_read(&mle->woken) == 1),
					 timeo);
		if (res->owner == O2NM_MAX_NODES) {
			mlog(0, "waiting again\n");
			goto recheck;
		}
		mlog(0, "done waiting, master is %u\n", res->owner);
		ret = 0;
		goto leave;
	}

	ret = 0;	/* done */
	if (assert) {
		m = dlm->node_num;
		mlog(0, "about to master %.*s here, this=%u\n",
		     res->lockname.len, res->lockname.name, m);
		ret = dlm_do_assert_master(dlm, res->lockname.name,
					   res->lockname.len, mle->vote_map, 0);
		if (ret) {
			/* This is a failure in the network path,
			 * not in the response to the assert_master
			 * (any nonzero response is a BUG on this node).
			 * Most likely a socket just got disconnected
			 * due to node death. */
			mlog_errno(ret);
		}
		/* no longer need to restart lock mastery.
		 * all living nodes have been contacted. */
		ret = 0;
	}

	/* set the lockres owner */
	spin_lock(&res->spinlock);
	dlm_change_lockres_owner(dlm, res, m);
	spin_unlock(&res->spinlock);

leave:
	return ret;
}

struct dlm_bitmap_diff_iter
{
	int curnode;
	unsigned long *orig_bm;
	unsigned long *cur_bm;
	unsigned long diff_bm[BITS_TO_LONGS(O2NM_MAX_NODES)];
};

enum dlm_node_state_change
{
	NODE_DOWN = -1,
	NODE_NO_CHANGE = 0,
	NODE_UP
};

static void dlm_bitmap_diff_iter_init(struct dlm_bitmap_diff_iter *iter,
				      unsigned long *orig_bm,
				      unsigned long *cur_bm)
{
	unsigned long p1, p2;
	int i;

	iter->curnode = -1;
	iter->orig_bm = orig_bm;
	iter->cur_bm = cur_bm;

	for (i = 0; i < BITS_TO_LONGS(O2NM_MAX_NODES); i++) {
		p1 = *(iter->orig_bm + i);
		p2 = *(iter->cur_bm + i);
		iter->diff_bm[i] = (p1 & ~p2) | (p2 & ~p1);
	}
}

static int dlm_bitmap_diff_iter_next(struct dlm_bitmap_diff_iter *iter,
				     enum dlm_node_state_change *state)
{
	int bit;

	if (iter->curnode >= O2NM_MAX_NODES)
		return -ENOENT;

	bit = find_next_bit(iter->diff_bm, O2NM_MAX_NODES,
			    iter->curnode+1);
	if (bit >= O2NM_MAX_NODES) {
		iter->curnode = O2NM_MAX_NODES;
		return -ENOENT;
	}

	/* if it was there in the original then this node died */
	if (test_bit(bit, iter->orig_bm))
		*state = NODE_DOWN;
	else
		*state = NODE_UP;

	iter->curnode = bit;
	return bit;
}
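
/* Only bits that differ between the two maps are visited.  Typical
 * driving loop, as used by dlm_restart_lock_mastery() below:
 *
 *	dlm_bitmap_diff_iter_init(&bdi, mle->vote_map, mle->node_map);
 *	node = dlm_bitmap_diff_iter_next(&bdi, &sc);
 *	while (node >= 0) {
 *		... handle NODE_UP or NODE_DOWN for 'node' ...
 *		node = dlm_bitmap_diff_iter_next(&bdi, &sc);
 *	}
 */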
static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
				    struct dlm_lock_resource *res,
				    struct dlm_master_list_entry *mle,
				    int blocked)
{
	struct dlm_bitmap_diff_iter bdi;
	enum dlm_node_state_change sc;
	int node;
	int ret = 0;

	mlog(0, "something happened such that the "
	     "master process may need to be restarted!\n");

	assert_spin_locked(&mle->spinlock);

	dlm_bitmap_diff_iter_init(&bdi, mle->vote_map, mle->node_map);
	node = dlm_bitmap_diff_iter_next(&bdi, &sc);
	while (node >= 0) {
		if (sc == NODE_UP) {
			/* a node came up.  easy.  might not even need
			 * to talk to it if its node number is higher
			 * or if we are already blocked. */
			mlog(0, "node up! %d\n", node);
			if (blocked)
				goto next;

			if (node > dlm->node_num) {
				mlog(0, "node > this node. skipping.\n");
				goto next;
			}

			/* redo the master request, but only for the new node */
			mlog(0, "sending request to new node\n");
			clear_bit(node, mle->response_map);
			set_bit(node, mle->vote_map);
		} else {
			mlog(ML_ERROR, "node down! %d\n", node);

			/* if the node wasn't involved in mastery skip it,
			 * but clear it out from the maps so that it will
			 * not affect mastery of this lockres */
			clear_bit(node, mle->response_map);
			clear_bit(node, mle->vote_map);
			if (!test_bit(node, mle->maybe_map))
				goto next;

			/* if we're already blocked on lock mastery, and the
			 * dead node wasn't the expected master, or there is
			 * another node in the maybe_map, keep waiting */
			if (blocked) {
				int lowest = find_next_bit(mle->maybe_map,
							   O2NM_MAX_NODES, 0);

				/* act like it was never there */
				clear_bit(node, mle->maybe_map);

				if (node != lowest)
					goto next;

				mlog(ML_ERROR, "expected master %u died while "
				     "this node was blocked waiting on it!\n",
				     node);
				lowest = find_next_bit(mle->maybe_map,
						       O2NM_MAX_NODES,
						       lowest+1);
				if (lowest < O2NM_MAX_NODES) {
					mlog(0, "still blocked. waiting "
					     "on %u now\n", lowest);
					goto next;
				}

				/* mle is an MLE_BLOCK, but there is now
				 * nothing left to block on.  we need to return
				 * all the way back out and try again with
				 * an MLE_MASTER. dlm_do_local_recovery_cleanup
				 * has already run, so the mle refcount is ok */
				mlog(0, "no longer blocking. we can "
				     "try to master this here\n");
				mle->type = DLM_MLE_MASTER;
				memset(mle->maybe_map, 0,
				       sizeof(mle->maybe_map));
				memset(mle->response_map, 0,
				       sizeof(mle->maybe_map));
				memcpy(mle->vote_map, mle->node_map,
				       sizeof(mle->node_map));
				mle->u.res = res;
				set_bit(dlm->node_num, mle->maybe_map);

				ret = -EAGAIN;
				goto next;
			}
			clear_bit(node, mle->maybe_map);
			if (node > dlm->node_num)
				goto next;

			mlog(0, "dead node in map!\n");
			/* yuck. go back and re-contact all nodes
			 * in the vote_map, removing this node. */
			memset(mle->response_map, 0,
			       sizeof(mle->response_map));
		}
		ret = -EAGAIN;
next:
		node = dlm_bitmap_diff_iter_next(&bdi, &sc);
	}
	return ret;
}


/*
 * DLM_MASTER_REQUEST_MSG
 *
 * returns: 0 on success,
 *          -errno on a network error
 *
 * on error, the caller should assume the target node is "dead"
 *
 */
static int dlm_do_master_request(struct dlm_master_list_entry *mle, int to)
{
	struct dlm_ctxt *dlm = mle->dlm;
	struct dlm_master_request request;
	int ret, response=0, resend;

	memset(&request, 0, sizeof(request));
	request.node_idx = dlm->node_num;

	BUG_ON(mle->type == DLM_MLE_MIGRATION);

	if (mle->type != DLM_MLE_MASTER) {
		request.namelen = mle->u.name.len;
		memcpy(request.name, mle->u.name.name, request.namelen);
	} else {
		request.namelen = mle->u.res->lockname.len;
		memcpy(request.name, mle->u.res->lockname.name,
		       request.namelen);
	}

again:
	ret = o2net_send_message(DLM_MASTER_REQUEST_MSG, dlm->key, &request,
				 sizeof(request), to, &response);
	if (ret < 0) {
		if (ret == -ESRCH) {
			/* should never happen */
			mlog(ML_ERROR, "TCP stack not ready!\n");
			BUG();
		} else if (ret == -EINVAL) {
			mlog(ML_ERROR, "bad args passed to o2net!\n");
			BUG();
		} else if (ret == -ENOMEM) {
			mlog(ML_ERROR, "out of memory while trying to send "
			     "network message!  retrying\n");
			/* this is totally crude */
			msleep(50);
			goto again;
		} else if (!dlm_is_host_down(ret)) {
			/* not a network error. bad. */
			mlog_errno(ret);
			mlog(ML_ERROR, "unhandled error!");
			BUG();
		}
		/* all other errors should be network errors,
		 * and likely indicate node death */
		mlog(ML_ERROR, "link to %d went down!\n", to);
		goto out;
	}

	ret = 0;
	resend = 0;
	spin_lock(&mle->spinlock);
	switch (response) {
		case DLM_MASTER_RESP_YES:
			set_bit(to, mle->response_map);
			mlog(0, "node %u is the master, response=YES\n", to);
			mle->master = to;
			break;
		case DLM_MASTER_RESP_NO:
			mlog(0, "node %u not master, response=NO\n", to);
			set_bit(to, mle->response_map);
			break;
		case DLM_MASTER_RESP_MAYBE:
			mlog(0, "node %u not master, response=MAYBE\n", to);
			set_bit(to, mle->response_map);
			set_bit(to, mle->maybe_map);
			break;
		case DLM_MASTER_RESP_ERROR:
			mlog(0, "node %u hit an error, resending\n", to);
			resend = 1;
			response = 0;
			break;
		default:
			mlog(ML_ERROR, "bad response! %u\n", response);
			BUG();
	}
	spin_unlock(&mle->spinlock);
	if (resend) {
		/* this is also totally crude */
		msleep(50);
		goto again;
	}

out:
	return ret;
}

/*
 * locks that can be taken here:
 * dlm->spinlock
 * res->spinlock
 * mle->spinlock
 * dlm->master_list
 *
 * if possible, TRIM THIS DOWN!!!
 */
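/* Nesting used in the handler below: dlm->spinlock is taken first and
 * dropped before res->spinlock when a lockres is found in the hash;
 * dlm->master_lock always nests inside whichever of those is held, and
 * an mle's spinlock is always the innermost lock. */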
  1065. int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data)
  1066. {
  1067. u8 response = DLM_MASTER_RESP_MAYBE;
  1068. struct dlm_ctxt *dlm = data;
  1069. struct dlm_lock_resource *res;
  1070. struct dlm_master_request *request = (struct dlm_master_request *) msg->buf;
  1071. struct dlm_master_list_entry *mle = NULL, *tmpmle = NULL;
  1072. char *name;
  1073. unsigned int namelen;
  1074. int found, ret;
  1075. int set_maybe;
  1076. if (!dlm_grab(dlm))
  1077. return DLM_MASTER_RESP_NO;
  1078. if (!dlm_domain_fully_joined(dlm)) {
  1079. response = DLM_MASTER_RESP_NO;
  1080. goto send_response;
  1081. }
  1082. name = request->name;
  1083. namelen = request->namelen;
  1084. if (namelen > DLM_LOCKID_NAME_MAX) {
  1085. response = DLM_IVBUFLEN;
  1086. goto send_response;
  1087. }
  1088. way_up_top:
  1089. spin_lock(&dlm->spinlock);
  1090. res = __dlm_lookup_lockres(dlm, name, namelen);
  1091. if (res) {
  1092. spin_unlock(&dlm->spinlock);
  1093. /* take care of the easy cases up front */
  1094. spin_lock(&res->spinlock);
  1095. if (res->state & DLM_LOCK_RES_RECOVERING) {
  1096. spin_unlock(&res->spinlock);
  1097. mlog(0, "returning DLM_MASTER_RESP_ERROR since res is "
  1098. "being recovered\n");
  1099. response = DLM_MASTER_RESP_ERROR;
  1100. if (mle)
  1101. kmem_cache_free(dlm_mle_cache, mle);
  1102. goto send_response;
  1103. }
  1104. if (res->owner == dlm->node_num) {
  1105. u32 flags = DLM_ASSERT_MASTER_MLE_CLEANUP;
  1106. spin_unlock(&res->spinlock);
  1107. // mlog(0, "this node is the master\n");
  1108. response = DLM_MASTER_RESP_YES;
  1109. if (mle)
  1110. kmem_cache_free(dlm_mle_cache, mle);
  1111. /* this node is the owner.
  1112. * there is some extra work that needs to
  1113. * happen now. the requesting node has
  1114. * caused all nodes up to this one to
  1115. * create mles. this node now needs to
  1116. * go back and clean those up. */
  1117. mlog(0, "%u is the owner of %.*s, cleaning everyone else\n",
  1118. dlm->node_num, res->lockname.len, res->lockname.name);
  1119. ret = dlm_dispatch_assert_master(dlm, res, 1,
  1120. request->node_idx,
  1121. flags);
  1122. if (ret < 0) {
  1123. mlog(ML_ERROR, "failed to dispatch assert "
  1124. "master work\n");
  1125. response = DLM_MASTER_RESP_ERROR;
  1126. }
  1127. goto send_response;
  1128. } else if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
  1129. spin_unlock(&res->spinlock);
  1130. // mlog(0, "node %u is the master\n", res->owner);
  1131. response = DLM_MASTER_RESP_NO;
  1132. if (mle)
  1133. kmem_cache_free(dlm_mle_cache, mle);
  1134. goto send_response;
  1135. }
  1136. /* ok, there is no owner. either this node is
  1137. * being blocked, or it is actively trying to
  1138. * master this lock. */
  1139. if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
  1140. mlog(ML_ERROR, "lock with no owner should be "
  1141. "in-progress!\n");
  1142. BUG();
  1143. }
  1144. // mlog(0, "lockres is in progress...\n");
  1145. spin_lock(&dlm->master_lock);
  1146. found = dlm_find_mle(dlm, &tmpmle, name, namelen);
  1147. if (!found) {
  1148. mlog(ML_ERROR, "no mle found for this lock!\n");
  1149. BUG();
  1150. }
  1151. set_maybe = 1;
  1152. spin_lock(&tmpmle->spinlock);
  1153. if (tmpmle->type == DLM_MLE_BLOCK) {
  1154. // mlog(0, "this node is waiting for "
  1155. // "lockres to be mastered\n");
  1156. response = DLM_MASTER_RESP_NO;
  1157. } else if (tmpmle->type == DLM_MLE_MIGRATION) {
  1158. mlog(0, "node %u is master, but trying to migrate to "
  1159. "node %u.\n", tmpmle->master, tmpmle->new_master);
  1160. if (tmpmle->master == dlm->node_num) {
  1161. response = DLM_MASTER_RESP_YES;
  1162. mlog(ML_ERROR, "no owner on lockres, but this "
  1163. "node is trying to migrate it to %u?!\n",
  1164. tmpmle->new_master);
  1165. BUG();
  1166. } else {
  1167. /* the real master can respond on its own */
  1168. response = DLM_MASTER_RESP_NO;
  1169. }
  1170. } else if (tmpmle->master != DLM_LOCK_RES_OWNER_UNKNOWN) {
  1171. set_maybe = 0;
  1172. if (tmpmle->master == dlm->node_num)
  1173. response = DLM_MASTER_RESP_YES;
  1174. else
  1175. response = DLM_MASTER_RESP_NO;
  1176. } else {
  1177. // mlog(0, "this node is attempting to "
  1178. // "master lockres\n");
  1179. response = DLM_MASTER_RESP_MAYBE;
  1180. }
  1181. if (set_maybe)
  1182. set_bit(request->node_idx, tmpmle->maybe_map);
  1183. spin_unlock(&tmpmle->spinlock);
  1184. spin_unlock(&dlm->master_lock);
  1185. spin_unlock(&res->spinlock);
  1186. /* keep the mle attached to heartbeat events */
  1187. dlm_put_mle(tmpmle);
  1188. if (mle)
  1189. kmem_cache_free(dlm_mle_cache, mle);
  1190. goto send_response;
  1191. }
  1192. /*
  1193. * lockres doesn't exist on this node
  1194. * if there is an MLE_BLOCK, return NO
  1195. * if there is an MLE_MASTER, return MAYBE
  1196. * otherwise, add an MLE_BLOCK, return NO
  1197. */
  1198. spin_lock(&dlm->master_lock);
  1199. found = dlm_find_mle(dlm, &tmpmle, name, namelen);
  1200. if (!found) {
  1201. /* this lockid has never been seen on this node yet */
  1202. // mlog(0, "no mle found\n");
  1203. if (!mle) {
  1204. spin_unlock(&dlm->master_lock);
  1205. spin_unlock(&dlm->spinlock);
  1206. mle = (struct dlm_master_list_entry *)
  1207. kmem_cache_alloc(dlm_mle_cache, GFP_KERNEL);
  1208. if (!mle) {
  1209. // bad bad bad... this sucks.
  1210. response = DLM_MASTER_RESP_ERROR;
  1211. goto send_response;
  1212. }
  1213. spin_lock(&dlm->spinlock);
  1214. dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL,
  1215. name, namelen);
  1216. spin_unlock(&dlm->spinlock);
  1217. goto way_up_top;
  1218. }
  1219. // mlog(0, "this is second time thru, already allocated, "
  1220. // "add the block.\n");
  1221. set_bit(request->node_idx, mle->maybe_map);
  1222. list_add(&mle->list, &dlm->master_list);
  1223. response = DLM_MASTER_RESP_NO;
  1224. } else {
  1225. // mlog(0, "mle was found\n");
  1226. set_maybe = 1;
  1227. spin_lock(&tmpmle->spinlock);
  1228. if (tmpmle->type == DLM_MLE_BLOCK)
  1229. response = DLM_MASTER_RESP_NO;
  1230. else if (tmpmle->type == DLM_MLE_MIGRATION) {
  1231. mlog(0, "migration mle was found (%u->%u)\n",
  1232. tmpmle->master, tmpmle->new_master);
  1233. if (tmpmle->master == dlm->node_num) {
  1234. mlog(ML_ERROR, "no lockres, but migration mle "
  1235. "says that this node is master!\n");
  1236. BUG();
  1237. }
  1238. /* real master can respond on its own */
  1239. response = DLM_MASTER_RESP_NO;
  1240. } else {
  1241. if (tmpmle->master == dlm->node_num) {
  1242. response = DLM_MASTER_RESP_YES;
  1243. set_maybe = 0;
  1244. } else
  1245. response = DLM_MASTER_RESP_MAYBE;
  1246. }
  1247. if (set_maybe)
  1248. set_bit(request->node_idx, tmpmle->maybe_map);
  1249. spin_unlock(&tmpmle->spinlock);
  1250. }
  1251. spin_unlock(&dlm->master_lock);
  1252. spin_unlock(&dlm->spinlock);
  1253. if (found) {
  1254. /* keep the mle attached to heartbeat events */
  1255. dlm_put_mle(tmpmle);
  1256. }
  1257. send_response:
  1258. dlm_put(dlm);
  1259. return response;
  1260. }
  1261. /*
  1262. * DLM_ASSERT_MASTER_MSG
  1263. */
  1264. /*
  1265. * NOTE: this can be used for debugging
  1266. * can periodically run all locks owned by this node
  1267. * and re-assert across the cluster...
  1268. */
  1269. static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname,
  1270. unsigned int namelen, void *nodemap,
  1271. u32 flags)
  1272. {
  1273. struct dlm_assert_master assert;
  1274. int to, tmpret;
  1275. struct dlm_node_iter iter;
  1276. int ret = 0;
  1277. BUG_ON(namelen > O2NM_MAX_NAME_LEN);
  1278. /* note that if this nodemap is empty, it returns 0 */
  1279. dlm_node_iter_init(nodemap, &iter);
  1280. while ((to = dlm_node_iter_next(&iter)) >= 0) {
  1281. int r = 0;
  1282. mlog(0, "sending assert master to %d (%.*s)\n", to,
  1283. namelen, lockname);
  1284. memset(&assert, 0, sizeof(assert));
  1285. assert.node_idx = dlm->node_num;
  1286. assert.namelen = namelen;
  1287. memcpy(assert.name, lockname, namelen);
  1288. assert.flags = cpu_to_be32(flags);
  1289. tmpret = o2net_send_message(DLM_ASSERT_MASTER_MSG, dlm->key,
  1290. &assert, sizeof(assert), to, &r);
  1291. if (tmpret < 0) {
  1292. mlog(ML_ERROR, "assert_master returned %d!\n", tmpret);
  1293. if (!dlm_is_host_down(tmpret)) {
  1294. mlog(ML_ERROR, "unhandled error!\n");
  1295. BUG();
  1296. }
  1297. /* a node died. finish out the rest of the nodes. */
  1298. mlog(ML_ERROR, "link to %d went down!\n", to);
  1299. /* any nonzero status return will do */
  1300. ret = tmpret;
  1301. } else if (r < 0) {
  1302. /* ok, something horribly messed. kill thyself. */
  1303. mlog(ML_ERROR,"during assert master of %.*s to %u, "
  1304. "got %d.\n", namelen, lockname, to, r);
  1305. dlm_dump_lock_resources(dlm);
  1306. BUG();
  1307. }
  1308. }
  1309. return ret;
  1310. }
/*
 * locks that can be taken here:
 * dlm->spinlock
 * res->spinlock
 * mle->spinlock
 * dlm->master_list
 *
 * if possible, TRIM THIS DOWN!!!
 */
int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
{
	struct dlm_ctxt *dlm = data;
	struct dlm_master_list_entry *mle = NULL;
	struct dlm_assert_master *assert = (struct dlm_assert_master *)msg->buf;
	struct dlm_lock_resource *res = NULL;
	char *name;
	unsigned int namelen;
	u32 flags;

	if (!dlm_grab(dlm))
		return 0;

	name = assert->name;
	namelen = assert->namelen;
	flags = be32_to_cpu(assert->flags);

	if (namelen > DLM_LOCKID_NAME_MAX) {
		mlog(ML_ERROR, "Invalid name length!");
		goto done;
	}

	spin_lock(&dlm->spinlock);

	if (flags)
		mlog(0, "assert_master with flags: %u\n", flags);

	/* find the MLE */
	spin_lock(&dlm->master_lock);
	if (!dlm_find_mle(dlm, &mle, name, namelen)) {
		/* not an error, could be master just re-asserting */
		mlog(0, "just got an assert_master from %u, but no "
		     "MLE for it! (%.*s)\n", assert->node_idx,
		     namelen, name);
	} else {
		int bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
		if (bit >= O2NM_MAX_NODES) {
			/* not necessarily an error, though less likely.
			 * could be master just re-asserting. */
			mlog(ML_ERROR, "no bits set in the maybe_map, but %u "
			     "is asserting! (%.*s)\n", assert->node_idx,
			     namelen, name);
		} else if (bit != assert->node_idx) {
			if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
				mlog(0, "master %u was found, %u should "
				     "back off\n", assert->node_idx, bit);
			} else {
				/* with the fix for bug 569, a higher node
				 * number winning the mastery will respond
				 * YES to mastery requests, but this node
				 * had no way of knowing.  let it pass. */
				mlog(ML_ERROR, "%u is the lowest node, "
				     "%u is asserting. (%.*s)  %u must "
				     "have begun after %u won.\n", bit,
				     assert->node_idx, namelen, name, bit,
				     assert->node_idx);
			}
		}
	}
	spin_unlock(&dlm->master_lock);

	/* ok everything checks out with the MLE
	 * now check to see if there is a lockres */
	res = __dlm_lookup_lockres(dlm, name, namelen);
	if (res) {
		spin_lock(&res->spinlock);
		if (res->state & DLM_LOCK_RES_RECOVERING) {
			mlog(ML_ERROR, "%u asserting but %.*s is "
			     "RECOVERING!\n", assert->node_idx, namelen, name);
			goto kill;
		}
		if (!mle) {
			if (res->owner != assert->node_idx) {
				mlog(ML_ERROR, "assert_master from "
				     "%u, but current owner is "
				     "%u! (%.*s)\n",
				     assert->node_idx, res->owner,
				     namelen, name);
				goto kill;
			}
		} else if (mle->type != DLM_MLE_MIGRATION) {
			if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
				/* owner is just re-asserting */
				if (res->owner == assert->node_idx) {
					mlog(0, "owner %u re-asserting on "
					     "lock %.*s\n", assert->node_idx,
					     namelen, name);
					goto ok;
				}
				mlog(ML_ERROR, "got assert_master from "
				     "node %u, but %u is the owner! "
				     "(%.*s)\n", assert->node_idx,
				     res->owner, namelen, name);
				goto kill;
			}
			if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
				mlog(ML_ERROR, "got assert from %u, but lock "
				     "with no owner should be "
				     "in-progress! (%.*s)\n",
				     assert->node_idx,
				     namelen, name);
				goto kill;
			}
		} else /* mle->type == DLM_MLE_MIGRATION */ {
			/* should only be getting an assert from new master */
			if (assert->node_idx != mle->new_master) {
				mlog(ML_ERROR, "got assert from %u, but "
				     "new master is %u, and old master "
				     "was %u (%.*s)\n",
				     assert->node_idx, mle->new_master,
				     mle->master, namelen, name);
				goto kill;
			}
		}
ok:
		spin_unlock(&res->spinlock);
	}
	spin_unlock(&dlm->spinlock);

	// mlog(0, "woo!  got an assert_master from node %u!\n",
	//	     assert->node_idx);
	if (mle) {
		int extra_ref;

		spin_lock(&mle->spinlock);
		extra_ref = !!(mle->type == DLM_MLE_BLOCK
			       || mle->type == DLM_MLE_MIGRATION);
		mle->master = assert->node_idx;
		atomic_set(&mle->woken, 1);
		wake_up(&mle->wq);
		spin_unlock(&mle->spinlock);

		if (mle->type == DLM_MLE_MIGRATION && res) {
			mlog(0, "finishing off migration of lockres %.*s, "
			     "from %u to %u\n",
			     res->lockname.len, res->lockname.name,
			     dlm->node_num, mle->new_master);
			spin_lock(&res->spinlock);
			res->state &= ~DLM_LOCK_RES_MIGRATING;
			dlm_change_lockres_owner(dlm, res, mle->new_master);
			BUG_ON(res->state & DLM_LOCK_RES_DIRTY);
			spin_unlock(&res->spinlock);
		}
		/* master is known, detach if not already detached */
		dlm_mle_detach_hb_events(dlm, mle);
		dlm_put_mle(mle);

		if (extra_ref) {
			/* the assert master message now balances the extra
			 * ref given by the master / migration request message.
			 * if this is the last put, it will be removed
			 * from the list. */
			dlm_put_mle(mle);
		}
	}

done:
	if (res)
		dlm_lockres_put(res);
	dlm_put(dlm);
	return 0;

kill:
	/* kill the caller! */
	spin_unlock(&res->spinlock);
	spin_unlock(&dlm->spinlock);
	dlm_lockres_put(res);
	mlog(ML_ERROR, "Bad message received from another node.  Dumping state "
	     "and killing the other node now!  This node is OK and can continue.\n");
	dlm_dump_lock_resources(dlm);
	dlm_put(dlm);
	return -EINVAL;
}
int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
			       struct dlm_lock_resource *res,
			       int ignore_higher, u8 request_from, u32 flags)
{
	struct dlm_work_item *item;
	item = kcalloc(1, sizeof(*item), GFP_KERNEL);
	if (!item)
		return -ENOMEM;

	/* queue up work for dlm_assert_master_worker */
	dlm_grab(dlm);	/* get an extra ref for the work item */
	dlm_init_work_item(dlm, item, dlm_assert_master_worker, NULL);
	item->u.am.lockres = res; /* already have a ref */
	/* can optionally ignore node numbers higher than this node */
	item->u.am.ignore_higher = ignore_higher;
	item->u.am.request_from = request_from;
	item->u.am.flags = flags;

	spin_lock(&dlm->work_lock);
	list_add_tail(&item->list, &dlm->work_list);
	spin_unlock(&dlm->work_lock);

	schedule_work(&dlm->dispatched_work);
	return 0;
}
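
/*
 * Illustrative sketch only (not part of the original file): how a caller
 * might defer an assert_master broadcast to the work queue.  The work item
 * consumes one lockres reference (dlm_assert_master_worker() does the
 * dlm_lockres_put()), so the caller takes one first.  dlm_lockres_get() is
 * assumed here as the counterpart of the dlm_lockres_put() used above.
 */
static int dlm_queue_assert_example(struct dlm_ctxt *dlm,
				    struct dlm_lock_resource *res,
				    u8 request_from)
{
	int ret;

	dlm_lockres_get(res);	/* assumed counterpart of dlm_lockres_put() */
	ret = dlm_dispatch_assert_master(dlm, res, 1, request_from,
					 DLM_ASSERT_MASTER_MLE_CLEANUP);
	if (ret < 0) {
		/* nothing was queued, so drop the reference ourselves */
		dlm_lockres_put(res);
		mlog_errno(ret);
	}
	return ret;
}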
static void dlm_assert_master_worker(struct dlm_work_item *item, void *data)
{
	struct dlm_ctxt *dlm = data;
	int ret = 0;
	struct dlm_lock_resource *res;
	unsigned long nodemap[BITS_TO_LONGS(O2NM_MAX_NODES)];
	int ignore_higher;
	int bit;
	u8 request_from;
	u32 flags;

	dlm = item->dlm;
	res = item->u.am.lockres;
	ignore_higher = item->u.am.ignore_higher;
	request_from = item->u.am.request_from;
	flags = item->u.am.flags;

	spin_lock(&dlm->spinlock);
	memcpy(nodemap, dlm->domain_map, sizeof(nodemap));
	spin_unlock(&dlm->spinlock);

	clear_bit(dlm->node_num, nodemap);
	if (ignore_higher) {
		/* if this is just to clear up mles for nodes below
		 * this node, do not send the message to the original
		 * caller or any node number higher than this */
		clear_bit(request_from, nodemap);
		bit = dlm->node_num;
		while (1) {
			bit = find_next_bit(nodemap, O2NM_MAX_NODES,
					    bit+1);
			if (bit >= O2NM_MAX_NODES)
				break;
			clear_bit(bit, nodemap);
		}
	}

	/* this call now finishes out the nodemap
	 * even if one or more nodes die */
	mlog(0, "worker about to master %.*s here, this=%u\n",
	     res->lockname.len, res->lockname.name, dlm->node_num);
	ret = dlm_do_assert_master(dlm, res->lockname.name,
				   res->lockname.len,
				   nodemap, flags);
	if (ret < 0) {
		/* no need to restart, we are done */
		mlog_errno(ret);
	}

	dlm_lockres_put(res);

	mlog(0, "finished with dlm_assert_master_worker\n");
}
/*
 * DLM_MIGRATE_LOCKRES
 */

int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
			u8 target)
{
	struct dlm_master_list_entry *mle = NULL;
	struct dlm_master_list_entry *oldmle = NULL;
	struct dlm_migratable_lockres *mres = NULL;
	int ret = -EINVAL;
	const char *name;
	unsigned int namelen;
	int mle_added = 0;
	struct list_head *queue, *iter;
	int i;
	struct dlm_lock *lock;
	int empty = 1;

	if (!dlm_grab(dlm))
		return -EINVAL;

	name = res->lockname.name;
	namelen = res->lockname.len;

	mlog(0, "migrating %.*s to %u\n", namelen, name, target);

	/*
	 * ensure this lockres is a proper candidate for migration
	 */
	spin_lock(&res->spinlock);
	if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
		mlog(0, "cannot migrate lockres with unknown owner!\n");
		spin_unlock(&res->spinlock);
		goto leave;
	}
	if (res->owner != dlm->node_num) {
		mlog(0, "cannot migrate lockres this node doesn't own!\n");
		spin_unlock(&res->spinlock);
		goto leave;
	}
	mlog(0, "checking queues...\n");
	queue = &res->granted;
	for (i=0; i<3; i++) {
		list_for_each(iter, queue) {
			lock = list_entry(iter, struct dlm_lock, list);
			empty = 0;
			if (lock->ml.node == dlm->node_num) {
				mlog(0, "found a lock owned by this node "
				     "still on the %s queue!  will not "
				     "migrate this lockres\n",
				     i==0 ? "granted" :
				     (i==1 ? "converting" : "blocked"));
				spin_unlock(&res->spinlock);
				ret = -ENOTEMPTY;
				goto leave;
			}
		}
		queue++;
	}
	mlog(0, "all locks on this lockres are nonlocal.  continuing\n");
	spin_unlock(&res->spinlock);

	/* no work to do */
	if (empty) {
		mlog(0, "no locks were found on this lockres! done!\n");
		ret = 0;
		goto leave;
	}

	/*
	 * preallocate up front
	 * if this fails, abort
	 */
	ret = -ENOMEM;
	mres = (struct dlm_migratable_lockres *) __get_free_page(GFP_KERNEL);
	if (!mres) {
		mlog_errno(ret);
		goto leave;
	}

	mle = (struct dlm_master_list_entry *) kmem_cache_alloc(dlm_mle_cache,
								GFP_KERNEL);
	if (!mle) {
		mlog_errno(ret);
		goto leave;
	}
	ret = 0;

	/*
	 * find a node to migrate the lockres to
	 */
	mlog(0, "picking a migration node\n");
	spin_lock(&dlm->spinlock);
	/* pick a new node */
	if (!test_bit(target, dlm->domain_map) ||
	    target >= O2NM_MAX_NODES) {
		target = dlm_pick_migration_target(dlm, res);
	}
	mlog(0, "node %u chosen for migration\n", target);

	if (target >= O2NM_MAX_NODES ||
	    !test_bit(target, dlm->domain_map)) {
		/* target chosen is not alive */
		ret = -EINVAL;
	}

	if (ret) {
		spin_unlock(&dlm->spinlock);
		goto fail;
	}

	mlog(0, "continuing with target = %u\n", target);

	/*
	 * clear any existing master requests and
	 * add the migration mle to the list
	 */
	spin_lock(&dlm->master_lock);
	ret = dlm_add_migration_mle(dlm, res, mle, &oldmle, name,
				    namelen, target, dlm->node_num);
	spin_unlock(&dlm->master_lock);
	spin_unlock(&dlm->spinlock);

	if (ret == -EEXIST) {
		mlog(0, "another process is already migrating it\n");
		goto fail;
	}
	mle_added = 1;

	/*
	 * set the MIGRATING flag and flush asts
	 * if we fail after this we need to re-dirty the lockres
	 */
	if (dlm_mark_lockres_migrating(dlm, res, target) < 0) {
		mlog(ML_ERROR, "tried to migrate %.*s to %u, but "
		     "the target went down.\n", res->lockname.len,
		     res->lockname.name, target);
		spin_lock(&res->spinlock);
		res->state &= ~DLM_LOCK_RES_MIGRATING;
		spin_unlock(&res->spinlock);
		ret = -EINVAL;
	}

fail:
	if (oldmle) {
		/* master is known, detach if not already detached */
		dlm_mle_detach_hb_events(dlm, oldmle);
		dlm_put_mle(oldmle);
	}

	if (ret < 0) {
		if (mle_added) {
			dlm_mle_detach_hb_events(dlm, mle);
			dlm_put_mle(mle);
		} else if (mle) {
			kmem_cache_free(dlm_mle_cache, mle);
		}
		goto leave;
	}

	/*
	 * at this point, we have a migration target, an mle
	 * in the master list, and the MIGRATING flag set on
	 * the lockres
	 */

	/* get an extra reference on the mle.
	 * otherwise the assert_master from the new
	 * master will destroy this.
	 * also, make sure that all callers of dlm_get_mle
	 * take both dlm->spinlock and dlm->master_lock */
	spin_lock(&dlm->spinlock);
	spin_lock(&dlm->master_lock);
	dlm_get_mle(mle);
	spin_unlock(&dlm->master_lock);
	spin_unlock(&dlm->spinlock);

	/* notify new node and send all lock state */
	/* call send_one_lockres with migration flag.
	 * this serves as notice to the target node that a
	 * migration is starting. */
	ret = dlm_send_one_lockres(dlm, res, mres, target,
				   DLM_MRES_MIGRATION);

	if (ret < 0) {
		mlog(0, "migration to node %u failed with %d\n",
		     target, ret);
		/* migration failed, detach and clean up mle */
		dlm_mle_detach_hb_events(dlm, mle);
		dlm_put_mle(mle);
		dlm_put_mle(mle);
		goto leave;
	}
	/* at this point, the target sends a message to all nodes,
	 * (using dlm_do_migrate_request).  this node is skipped since
	 * we had to put an mle in the list to begin the process.  this
	 * node now waits for target to do an assert master.  this node
	 * will be the last one notified, ensuring that the migration
	 * is complete everywhere.  if the target dies while this is
	 * going on, some nodes could potentially see the target as the
	 * master, so it is important that my recovery finds the migration
	 * mle and sets the master to UNKNOWN. */

	/* wait for new node to assert master */
	while (1) {
		ret = wait_event_interruptible_timeout(mle->wq,
					(atomic_read(&mle->woken) == 1),
					msecs_to_jiffies(5000));

		if (ret >= 0) {
			if (atomic_read(&mle->woken) == 1 ||
			    res->owner == target)
				break;

			mlog(0, "timed out during migration\n");
		}
		if (ret == -ERESTARTSYS) {
			/* migration failed, detach and clean up mle */
			dlm_mle_detach_hb_events(dlm, mle);
			dlm_put_mle(mle);
			dlm_put_mle(mle);
			goto leave;
		}
		/* TODO: if node died: stop, clean up, return error */
	}

	/* all done, set the owner, clear the flag */
	spin_lock(&res->spinlock);
	dlm_set_lockres_owner(dlm, res, target);
	res->state &= ~DLM_LOCK_RES_MIGRATING;
	dlm_remove_nonlocal_locks(dlm, res);
	spin_unlock(&res->spinlock);
	wake_up(&res->wq);

	/* master is known, detach if not already detached */
	dlm_mle_detach_hb_events(dlm, mle);
	dlm_put_mle(mle);
	ret = 0;

	dlm_lockres_calc_usage(dlm, res);

leave:
	/* re-dirty the lockres if we failed */
	if (ret < 0)
		dlm_kick_thread(dlm, res);

	/* TODO: cleanup */
	if (mres)
		free_page((unsigned long)mres);

	dlm_put(dlm);

	mlog(0, "returning %d\n", ret);
	return ret;
}
EXPORT_SYMBOL_GPL(dlm_migrate_lockres);
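
/*
 * Illustrative sketch only (not part of the original file): a caller can
 * either name a migration target explicitly or pass an out-of-range value
 * such as O2NM_MAX_NODES and let dlm_migrate_lockres() fall back to
 * dlm_pick_migration_target(), as the target-validation block near the top
 * of that function shows.  -ENOTEMPTY means this node still holds locks on
 * the resource.  The helper name below is hypothetical.
 */
static int dlm_migrate_away_example(struct dlm_ctxt *dlm,
				    struct dlm_lock_resource *res)
{
	int ret;

	/* out-of-range target => "pick any live node for me" */
	ret = dlm_migrate_lockres(dlm, res, O2NM_MAX_NODES);
	if (ret == -ENOTEMPTY)
		mlog(0, "lockres %.*s still has local locks, not migrated\n",
		     res->lockname.len, res->lockname.name);
	else if (ret < 0)
		mlog_errno(ret);
	return ret;
}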
int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock)
{
	int ret;
	spin_lock(&dlm->ast_lock);
	spin_lock(&lock->spinlock);
	ret = (list_empty(&lock->bast_list) && !lock->bast_pending);
	spin_unlock(&lock->spinlock);
	spin_unlock(&dlm->ast_lock);
	return ret;
}
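
/*
 * Illustrative sketch only (not part of the original file):
 * dlm_lock_basts_flushed() is meant to serve as a wait_event() condition --
 * kick the dlm thread so it delivers any queued basts for the lock, then
 * sleep on dlm->ast_wq until the bast list is empty and nothing is pending.
 * The helper name below is hypothetical.
 */
static void dlm_wait_for_basts_example(struct dlm_ctxt *dlm,
				       struct dlm_lock_resource *res,
				       struct dlm_lock *lock)
{
	dlm_kick_thread(dlm, res);
	wait_event(dlm->ast_wq, dlm_lock_basts_flushed(dlm, lock));
}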
static int dlm_migration_can_proceed(struct dlm_ctxt *dlm,
				     struct dlm_lock_resource *res,
				     u8 mig_target)
{
	int can_proceed;
	spin_lock(&res->spinlock);
	can_proceed = !!(res->state & DLM_LOCK_RES_MIGRATING);
	spin_unlock(&res->spinlock);

	/* target has died, so make the caller break out of the
	 * wait_event, but caller must recheck the domain_map */
	spin_lock(&dlm->spinlock);
	if (!test_bit(mig_target, dlm->domain_map))
		can_proceed = 1;
	spin_unlock(&dlm->spinlock);
	return can_proceed;
}
int dlm_lockres_is_dirty(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
{
	int ret;
	spin_lock(&res->spinlock);
	ret = !!(res->state & DLM_LOCK_RES_DIRTY);
	spin_unlock(&res->spinlock);
	return ret;
}
static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
				      struct dlm_lock_resource *res,
				      u8 target)
{
	int ret = 0;
	mlog(0, "dlm_mark_lockres_migrating: %.*s, from %u to %u\n",
	     res->lockname.len, res->lockname.name, dlm->node_num,
	     target);
	/* need to set MIGRATING flag on lockres.  this is done by
	 * ensuring that all asts have been flushed for this lockres. */
	spin_lock(&res->spinlock);
	BUG_ON(res->migration_pending);
	res->migration_pending = 1;
	/* strategy is to reserve an extra ast then release
	 * it below, letting the release do all of the work */
	__dlm_lockres_reserve_ast(res);
	spin_unlock(&res->spinlock);

	/* now flush all the pending asts.. hang out for a bit */
	dlm_kick_thread(dlm, res);
	wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));
	dlm_lockres_release_ast(dlm, res);

	mlog(0, "about to wait on migration_wq, dirty=%s\n",
	     res->state & DLM_LOCK_RES_DIRTY ? "yes" : "no");
	/* if the extra ref we just put was the final one, this
	 * will pass thru immediately.  otherwise, we need to wait
	 * for the last ast to finish. */
again:
	ret = wait_event_interruptible_timeout(dlm->migration_wq,
		   dlm_migration_can_proceed(dlm, res, target),
		   msecs_to_jiffies(1000));
	if (ret < 0) {
		mlog(0, "woken again: migrating? %s, dead? %s\n",
		     res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
		     test_bit(target, dlm->domain_map) ? "no":"yes");
	} else {
		mlog(0, "all is well: migrating? %s, dead? %s\n",
		     res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
		     test_bit(target, dlm->domain_map) ? "no":"yes");
	}
	if (!dlm_migration_can_proceed(dlm, res, target)) {
		mlog(0, "trying again...\n");
		goto again;
	}

	/* did the target go down or die? */
	spin_lock(&dlm->spinlock);
	if (!test_bit(target, dlm->domain_map)) {
		mlog(ML_ERROR, "aha. migration target %u just went down\n",
		     target);
		ret = -EHOSTDOWN;
	}
	spin_unlock(&dlm->spinlock);

	/*
	 * at this point:
	 *
	 *   o the DLM_LOCK_RES_MIGRATING flag is set
	 *   o there are no pending asts on this lockres
	 *   o all processes trying to reserve an ast on this
	 *     lockres must wait for the MIGRATING flag to clear
	 */
	return ret;
}
/* last step in the migration process.
 * original master calls this to free all of the dlm_lock
 * structures that used to be for other nodes. */
static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
				      struct dlm_lock_resource *res)
{
	struct list_head *iter, *iter2;
	struct list_head *queue = &res->granted;
	int i;
	struct dlm_lock *lock;

	assert_spin_locked(&res->spinlock);

	BUG_ON(res->owner == dlm->node_num);

	for (i=0; i<3; i++) {
		list_for_each_safe(iter, iter2, queue) {
			lock = list_entry(iter, struct dlm_lock, list);
			if (lock->ml.node != dlm->node_num) {
				mlog(0, "putting lock for node %u\n",
				     lock->ml.node);
				/* be extra careful */
				BUG_ON(!list_empty(&lock->ast_list));
				BUG_ON(!list_empty(&lock->bast_list));
				BUG_ON(lock->ast_pending);
				BUG_ON(lock->bast_pending);
				list_del_init(&lock->list);
				dlm_lock_put(lock);
			}
		}
		queue++;
	}
}
/* for now this is not too intelligent.  we will
 * need stats to make this do the right thing.
 * this just finds the first lock on one of the
 * queues and uses that node as the target. */
static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
				    struct dlm_lock_resource *res)
{
	int i;
	struct list_head *queue = &res->granted;
	struct list_head *iter;
	struct dlm_lock *lock;
	int nodenum;

	assert_spin_locked(&dlm->spinlock);

	spin_lock(&res->spinlock);
	for (i=0; i<3; i++) {
		list_for_each(iter, queue) {
			/* up to the caller to make sure this node
			 * is alive */
			lock = list_entry(iter, struct dlm_lock, list);
			if (lock->ml.node != dlm->node_num) {
				spin_unlock(&res->spinlock);
				return lock->ml.node;
			}
		}
		queue++;
	}
	spin_unlock(&res->spinlock);
	mlog(0, "have not found a suitable target yet! checking domain map\n");

	/* ok now we're getting desperate.  pick anyone alive. */
	nodenum = -1;
	while (1) {
		nodenum = find_next_bit(dlm->domain_map,
					O2NM_MAX_NODES, nodenum+1);
		mlog(0, "found %d in domain map\n", nodenum);
		if (nodenum >= O2NM_MAX_NODES)
			break;
		if (nodenum != dlm->node_num) {
			mlog(0, "picking %d\n", nodenum);
			return nodenum;
		}
	}

	mlog(0, "giving up.  no master to migrate to\n");
	return DLM_LOCK_RES_OWNER_UNKNOWN;
}
/* this is called by the new master once all lockres
 * data has been received */
static int dlm_do_migrate_request(struct dlm_ctxt *dlm,
				  struct dlm_lock_resource *res,
				  u8 master, u8 new_master,
				  struct dlm_node_iter *iter)
{
	struct dlm_migrate_request migrate;
	int ret, status = 0;
	int nodenum;

	memset(&migrate, 0, sizeof(migrate));
	migrate.namelen = res->lockname.len;
	memcpy(migrate.name, res->lockname.name, migrate.namelen);
	migrate.new_master = new_master;
	migrate.master = master;

	ret = 0;

	/* send message to all nodes, except the master and myself */
	while ((nodenum = dlm_node_iter_next(iter)) >= 0) {
		if (nodenum == master ||
		    nodenum == new_master)
			continue;

		ret = o2net_send_message(DLM_MIGRATE_REQUEST_MSG, dlm->key,
					 &migrate, sizeof(migrate), nodenum,
					 &status);
		if (ret < 0)
			mlog_errno(ret);
		else if (status < 0) {
			mlog(0, "migrate request (node %u) returned %d!\n",
			     nodenum, status);
			ret = status;
		}
	}

	if (ret < 0)
		mlog_errno(ret);

	mlog(0, "returning ret=%d\n", ret);
	return ret;
}
/* if there is an existing mle for this lockres, we now know who the master is.
 * (the one who sent us *this* message) we can clear it up right away.
 * since the process that put the mle on the list still has a reference to it,
 * we can unhash it now, set the master and wake the process.  as a result,
 * we will have no mle in the list to start with.  now we can add an mle for
 * the migration and this should be the only one found for those scanning the
 * list. */
int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data)
{
	struct dlm_ctxt *dlm = data;
	struct dlm_lock_resource *res = NULL;
	struct dlm_migrate_request *migrate = (struct dlm_migrate_request *) msg->buf;
	struct dlm_master_list_entry *mle = NULL, *oldmle = NULL;
	const char *name;
	unsigned int namelen;
	int ret = 0;

	if (!dlm_grab(dlm))
		return -EINVAL;

	name = migrate->name;
	namelen = migrate->namelen;

	/* preallocate.. if this fails, abort */
	mle = (struct dlm_master_list_entry *) kmem_cache_alloc(dlm_mle_cache,
								 GFP_KERNEL);
	if (!mle) {
		ret = -ENOMEM;
		goto leave;
	}

	/* check for pre-existing lock */
	spin_lock(&dlm->spinlock);
	res = __dlm_lookup_lockres(dlm, name, namelen);
	spin_lock(&dlm->master_lock);

	if (res) {
		spin_lock(&res->spinlock);
		if (res->state & DLM_LOCK_RES_RECOVERING) {
			/* if all is working ok, this can only mean that we got
			 * a migrate request from a node that we now see as
			 * dead.  what can we do here?  drop it to the floor? */
			spin_unlock(&res->spinlock);
			mlog(ML_ERROR, "Got a migrate request, but the "
			     "lockres is marked as recovering!");
			kmem_cache_free(dlm_mle_cache, mle);
			ret = -EINVAL; /* need a better solution */
			goto unlock;
		}
		res->state |= DLM_LOCK_RES_MIGRATING;
		spin_unlock(&res->spinlock);
	}

	/* ignore status.  only nonzero status would BUG. */
	ret = dlm_add_migration_mle(dlm, res, mle, &oldmle,
				    name, namelen,
				    migrate->new_master,
				    migrate->master);

unlock:
	spin_unlock(&dlm->master_lock);
	spin_unlock(&dlm->spinlock);

	if (oldmle) {
		/* master is known, detach if not already detached */
		dlm_mle_detach_hb_events(dlm, oldmle);
		dlm_put_mle(oldmle);
	}

	if (res)
		dlm_lockres_put(res);
leave:
	dlm_put(dlm);
	return ret;
}
/* must be holding dlm->spinlock and dlm->master_lock
 * when adding a migration mle, we can clear any other mles
 * in the master list because we know with certainty that
 * the master is "master".  so we remove any old mle from
 * the list after setting its master field, and then add
 * the new migration mle.  this way we can hold with the rule
 * of having only one mle for a given lock name at all times. */
static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
				 struct dlm_lock_resource *res,
				 struct dlm_master_list_entry *mle,
				 struct dlm_master_list_entry **oldmle,
				 const char *name, unsigned int namelen,
				 u8 new_master, u8 master)
{
	int found;
	int ret = 0;

	*oldmle = NULL;
	mlog_entry_void();

	assert_spin_locked(&dlm->spinlock);
	assert_spin_locked(&dlm->master_lock);

	/* caller is responsible for any ref taken here on oldmle */
	found = dlm_find_mle(dlm, oldmle, (char *)name, namelen);
	if (found) {
		struct dlm_master_list_entry *tmp = *oldmle;
		spin_lock(&tmp->spinlock);
		if (tmp->type == DLM_MLE_MIGRATION) {
			if (master == dlm->node_num) {
				/* ah another process raced me to it */
				mlog(0, "tried to migrate %.*s, but some "
				     "process beat me to it\n",
				     namelen, name);
				ret = -EEXIST;
			} else {
				/* bad.  2 NODES are trying to migrate! */
				mlog(ML_ERROR, "migration error mle: "
				     "master=%u new_master=%u // request: "
				     "master=%u new_master=%u // "
				     "lockres=%.*s\n",
				     tmp->master, tmp->new_master,
				     master, new_master,
				     namelen, name);
				BUG();
			}
		} else {
			/* this is essentially what assert_master does */
			tmp->master = master;
			atomic_set(&tmp->woken, 1);
			wake_up(&tmp->wq);
			/* remove it from the list so that only one
			 * mle will be found */
			list_del_init(&tmp->list);
		}
		spin_unlock(&tmp->spinlock);
	}

	/* now add a migration mle to the tail of the list */
	dlm_init_mle(mle, DLM_MLE_MIGRATION, dlm, res, name, namelen);
	mle->new_master = new_master;
	mle->master = master;
	/* do this for consistency with other mle types */
	set_bit(new_master, mle->maybe_map);
	list_add(&mle->list, &dlm->master_list);

	return ret;
}
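
/*
 * Illustrative sketch only (not part of the original file): the locking
 * convention dlm_add_migration_mle() relies on -- both dlm->spinlock and
 * dlm->master_lock are held while the master list is searched, and any
 * reference dlm_find_mle() takes must be dropped (here via __dlm_put_mle(),
 * since master_lock is still held).  The helper name below is hypothetical.
 */
static int dlm_mle_exists_example(struct dlm_ctxt *dlm,
				  const char *name, unsigned int namelen)
{
	struct dlm_master_list_entry *tmp = NULL;
	int found;

	spin_lock(&dlm->spinlock);
	spin_lock(&dlm->master_lock);
	found = dlm_find_mle(dlm, &tmp, (char *)name, namelen);
	if (found)
		__dlm_put_mle(tmp);	/* drop the ref dlm_find_mle took */
	spin_unlock(&dlm->master_lock);
	spin_unlock(&dlm->spinlock);
	return found;
}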
void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node)
{
	struct list_head *iter, *iter2;
	struct dlm_master_list_entry *mle;
	struct dlm_lock_resource *res;

	mlog_entry("dlm=%s, dead node=%u\n", dlm->name, dead_node);
top:
	assert_spin_locked(&dlm->spinlock);

	/* clean the master list */
	spin_lock(&dlm->master_lock);
	list_for_each_safe(iter, iter2, &dlm->master_list) {
		mle = list_entry(iter, struct dlm_master_list_entry, list);

		BUG_ON(mle->type != DLM_MLE_BLOCK &&
		       mle->type != DLM_MLE_MASTER &&
		       mle->type != DLM_MLE_MIGRATION);

		/* MASTER mles are initiated locally.  the waiting
		 * process will notice the node map change
		 * shortly.  let that happen as normal. */
		if (mle->type == DLM_MLE_MASTER)
			continue;

		/* BLOCK mles are initiated by other nodes.
		 * need to clean up if the dead node would have
		 * been the master. */
		if (mle->type == DLM_MLE_BLOCK) {
			int bit;

			spin_lock(&mle->spinlock);
			bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
			if (bit != dead_node) {
				mlog(0, "mle found, but dead node %u would "
				     "not have been master\n", dead_node);
				spin_unlock(&mle->spinlock);
			} else {
				/* must drop the refcount by one since the
				 * assert_master will never arrive.  this
				 * may result in the mle being unlinked and
				 * freed, but there may still be a process
				 * waiting in the dlmlock path which is fine. */
				mlog(ML_ERROR, "node %u was expected master\n",
				     dead_node);
				atomic_set(&mle->woken, 1);
				spin_unlock(&mle->spinlock);
				wake_up(&mle->wq);
				/* final put will take care of list removal */
				__dlm_put_mle(mle);
			}
			continue;
		}

		/* everything else is a MIGRATION mle */

		/* the rule for MIGRATION mles is that the master
		 * becomes UNKNOWN if *either* the original or
		 * the new master dies.  all UNKNOWN lockreses
		 * are sent to whichever node becomes the recovery
		 * master.  the new master is responsible for
		 * determining if there is still a master for
		 * this lockres, or if it needs to take over
		 * mastery.  either way, this node should expect
		 * another message to resolve this. */
		if (mle->master != dead_node &&
		    mle->new_master != dead_node)
			continue;

		/* if we have reached this point, this mle needs to
		 * be removed from the list and freed. */

		/* remove from the list early.  NOTE: unlinking
		 * list_head while in list_for_each_safe */
		spin_lock(&mle->spinlock);
		list_del_init(&mle->list);
		atomic_set(&mle->woken, 1);
		spin_unlock(&mle->spinlock);
		wake_up(&mle->wq);

		mlog(0, "node %u died during migration from "
		     "%u to %u!\n", dead_node,
		     mle->master, mle->new_master);
		/* if there is a lockres associated with this
		 * mle, find it and set its owner to UNKNOWN */
		res = __dlm_lookup_lockres(dlm, mle->u.name.name,
					   mle->u.name.len);
		if (res) {
			/* unfortunately if we hit this rare case, our
			 * lock ordering is messed.  we need to drop
			 * the master lock so that we can take the
			 * lockres lock, meaning that we will have to
			 * restart from the head of list. */
			spin_unlock(&dlm->master_lock);

			/* move lockres onto recovery list */
			spin_lock(&res->spinlock);
			dlm_set_lockres_owner(dlm, res,
					      DLM_LOCK_RES_OWNER_UNKNOWN);
			dlm_move_lockres_to_recovery_list(dlm, res);
			spin_unlock(&res->spinlock);
			dlm_lockres_put(res);

			/* dump the mle */
			spin_lock(&dlm->master_lock);
			__dlm_put_mle(mle);
			spin_unlock(&dlm->master_lock);

			/* restart */
			goto top;
		}

		/* this may be the last reference */
		__dlm_put_mle(mle);
	}
	spin_unlock(&dlm->master_lock);
}
int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
			 u8 old_master)
{
	struct dlm_node_iter iter;
	int ret = 0;

	spin_lock(&dlm->spinlock);
	dlm_node_iter_init(dlm->domain_map, &iter);
	clear_bit(old_master, iter.node_map);
	clear_bit(dlm->node_num, iter.node_map);
	spin_unlock(&dlm->spinlock);

	mlog(0, "now time to do a migrate request to other nodes\n");
	ret = dlm_do_migrate_request(dlm, res, old_master,
				     dlm->node_num, &iter);
	if (ret < 0) {
		mlog_errno(ret);
		goto leave;
	}

	mlog(0, "doing assert master of %.*s to all except the original node\n",
	     res->lockname.len, res->lockname.name);
	/* this call now finishes out the nodemap
	 * even if one or more nodes die */
	ret = dlm_do_assert_master(dlm, res->lockname.name,
				   res->lockname.len, iter.node_map,
				   DLM_ASSERT_MASTER_FINISH_MIGRATION);
	if (ret < 0) {
		/* no longer need to retry.  all living nodes contacted. */
		mlog_errno(ret);
		ret = 0;
	}

	memset(iter.node_map, 0, sizeof(iter.node_map));
	set_bit(old_master, iter.node_map);
	mlog(0, "doing assert master of %.*s back to %u\n",
	     res->lockname.len, res->lockname.name, old_master);
	ret = dlm_do_assert_master(dlm, res->lockname.name,
				   res->lockname.len, iter.node_map,
				   DLM_ASSERT_MASTER_FINISH_MIGRATION);
	if (ret < 0) {
		mlog(0, "assert master to original master failed "
		     "with %d.\n", ret);
		/* the only nonzero status here would be because of
		 * a dead original node.  we're done. */
		ret = 0;
	}

	/* all done, set the owner, clear the flag */
	spin_lock(&res->spinlock);
	dlm_set_lockres_owner(dlm, res, dlm->node_num);
	res->state &= ~DLM_LOCK_RES_MIGRATING;
	spin_unlock(&res->spinlock);
	/* re-dirty it on the new master */
	dlm_kick_thread(dlm, res);
	wake_up(&res->wq);
leave:
	return ret;
}
/*
 * LOCKRES AST REFCOUNT
 * this is integral to migration
 */

/* for future intent to call an ast, reserve one ahead of time.
 * this should be called only after waiting on the lockres
 * with dlm_wait_on_lockres, and while still holding the
 * spinlock after the call. */
void __dlm_lockres_reserve_ast(struct dlm_lock_resource *res)
{
	assert_spin_locked(&res->spinlock);
	if (res->state & DLM_LOCK_RES_MIGRATING) {
		__dlm_print_one_lock_resource(res);
	}
	BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);

	atomic_inc(&res->asts_reserved);
}

/*
 * used to drop the reserved ast, either because it went unused,
 * or because the ast/bast was actually called.
 *
 * also, if there is a pending migration on this lockres,
 * and this was the last pending ast on the lockres,
 * atomically set the MIGRATING flag before we drop the lock.
 * this is how we ensure that migration can proceed with no
 * asts in progress.  note that it is ok if the state of the
 * queues is such that a lock should be granted in the future
 * or that a bast should be fired, because the new master will
 * shuffle the lists on this lockres as soon as it is migrated.
 */
void dlm_lockres_release_ast(struct dlm_ctxt *dlm,
			     struct dlm_lock_resource *res)
{
	if (!atomic_dec_and_lock(&res->asts_reserved, &res->spinlock))
		return;

	if (!res->migration_pending) {
		spin_unlock(&res->spinlock);
		return;
	}

	BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
	res->migration_pending = 0;
	res->state |= DLM_LOCK_RES_MIGRATING;
	spin_unlock(&res->spinlock);
	wake_up(&res->wq);
	wake_up(&dlm->migration_wq);
}
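
/*
 * Illustrative sketch only (not part of the original file): the
 * reserve/release pairing described above.  An ast is reserved under
 * res->spinlock (after any wait on the lockres), the lock is dropped while
 * the work that may fire the ast is done, and dlm_lockres_release_ast()
 * drops the reservation -- possibly flipping migration_pending into the
 * MIGRATING state if this was the last outstanding reservation.  The
 * helper name below is hypothetical.
 */
static void dlm_ast_reservation_example(struct dlm_ctxt *dlm,
					struct dlm_lock_resource *res)
{
	spin_lock(&res->spinlock);
	__dlm_lockres_reserve_ast(res);
	spin_unlock(&res->spinlock);

	/* ... queue or deliver the ast/bast here ... */

	dlm_lockres_release_ast(dlm, res);
}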