xpc_channel.c 60 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243
  1. /*
  2. * This file is subject to the terms and conditions of the GNU General Public
  3. * License. See the file "COPYING" in the main directory of this archive
  4. * for more details.
  5. *
  6. * Copyright (c) 2004-2008 Silicon Graphics, Inc. All Rights Reserved.
  7. */
  8. /*
  9. * Cross Partition Communication (XPC) channel support.
  10. *
  11. * This is the part of XPC that manages the channels and
  12. * sends/receives messages across them to/from other partitions.
  13. *
  14. */
  15. #include <linux/kernel.h>
  16. #include <linux/init.h>
  17. #include <linux/sched.h>
  18. #include <linux/cache.h>
  19. #include <linux/interrupt.h>
  20. #include <linux/mutex.h>
  21. #include <linux/completion.h>
  22. #include <asm/sn/bte.h>
  23. #include <asm/sn/sn_sal.h>
  24. #include "xpc.h"
  25. /*
  26. * Guarantee that the kzalloc'd memory is cacheline aligned.
  27. */
  28. static void *
  29. xpc_kzalloc_cacheline_aligned(size_t size, gfp_t flags, void **base)
  30. {
  31. /* see if kzalloc will give us cachline aligned memory by default */
  32. *base = kzalloc(size, flags);
  33. if (*base == NULL)
  34. return NULL;
  35. if ((u64)*base == L1_CACHE_ALIGN((u64)*base))
  36. return *base;
  37. kfree(*base);
  38. /* nope, we'll have to do it ourselves */
  39. *base = kzalloc(size + L1_CACHE_BYTES, flags);
  40. if (*base == NULL)
  41. return NULL;
  42. return (void *)L1_CACHE_ALIGN((u64)*base);
  43. }
  44. /*
  45. * Set up the initial values for the XPartition Communication channels.
  46. */
  47. static void
  48. xpc_initialize_channels(struct xpc_partition *part, short partid)
  49. {
  50. int ch_number;
  51. struct xpc_channel *ch;
  52. for (ch_number = 0; ch_number < part->nchannels; ch_number++) {
  53. ch = &part->channels[ch_number];
  54. ch->partid = partid;
  55. ch->number = ch_number;
  56. ch->flags = XPC_C_DISCONNECTED;
  57. ch->local_GP = &part->local_GPs[ch_number];
  58. ch->local_openclose_args =
  59. &part->local_openclose_args[ch_number];
  60. atomic_set(&ch->kthreads_assigned, 0);
  61. atomic_set(&ch->kthreads_idle, 0);
  62. atomic_set(&ch->kthreads_active, 0);
  63. atomic_set(&ch->references, 0);
  64. atomic_set(&ch->n_to_notify, 0);
  65. spin_lock_init(&ch->lock);
  66. mutex_init(&ch->msg_to_pull_mutex);
  67. init_completion(&ch->wdisconnect_wait);
  68. atomic_set(&ch->n_on_msg_allocate_wq, 0);
  69. init_waitqueue_head(&ch->msg_allocate_wq);
  70. init_waitqueue_head(&ch->idle_wq);
  71. }
  72. }
  73. /*
  74. * Setup the infrastructure necessary to support XPartition Communication
  75. * between the specified remote partition and the local one.
  76. */
  77. enum xp_retval
  78. xpc_setup_infrastructure(struct xpc_partition *part)
  79. {
  80. int ret, cpuid;
  81. struct timer_list *timer;
  82. short partid = XPC_PARTID(part);
  83. /*
  84. * Zero out MOST of the entry for this partition. Only the fields
  85. * starting with `nchannels' will be zeroed. The preceding fields must
  86. * remain `viable' across partition ups and downs, since they may be
  87. * referenced during this memset() operation.
  88. */
  89. memset(&part->nchannels, 0, sizeof(struct xpc_partition) -
  90. offsetof(struct xpc_partition, nchannels));
  91. /*
  92. * Allocate all of the channel structures as a contiguous chunk of
  93. * memory.
  94. */
  95. part->channels = kzalloc(sizeof(struct xpc_channel) * XPC_NCHANNELS,
  96. GFP_KERNEL);
  97. if (part->channels == NULL) {
  98. dev_err(xpc_chan, "can't get memory for channels\n");
  99. return xpNoMemory;
  100. }
  101. part->nchannels = XPC_NCHANNELS;
  102. /* allocate all the required GET/PUT values */
  103. part->local_GPs = xpc_kzalloc_cacheline_aligned(XPC_GP_SIZE,
  104. GFP_KERNEL,
  105. &part->local_GPs_base);
  106. if (part->local_GPs == NULL) {
  107. kfree(part->channels);
  108. part->channels = NULL;
  109. dev_err(xpc_chan, "can't get memory for local get/put "
  110. "values\n");
  111. return xpNoMemory;
  112. }
  113. part->remote_GPs = xpc_kzalloc_cacheline_aligned(XPC_GP_SIZE,
  114. GFP_KERNEL,
  115. &part->
  116. remote_GPs_base);
  117. if (part->remote_GPs == NULL) {
  118. dev_err(xpc_chan, "can't get memory for remote get/put "
  119. "values\n");
  120. kfree(part->local_GPs_base);
  121. part->local_GPs = NULL;
  122. kfree(part->channels);
  123. part->channels = NULL;
  124. return xpNoMemory;
  125. }
  126. /* allocate all the required open and close args */
  127. part->local_openclose_args =
  128. xpc_kzalloc_cacheline_aligned(XPC_OPENCLOSE_ARGS_SIZE, GFP_KERNEL,
  129. &part->local_openclose_args_base);
  130. if (part->local_openclose_args == NULL) {
  131. dev_err(xpc_chan, "can't get memory for local connect args\n");
  132. kfree(part->remote_GPs_base);
  133. part->remote_GPs = NULL;
  134. kfree(part->local_GPs_base);
  135. part->local_GPs = NULL;
  136. kfree(part->channels);
  137. part->channels = NULL;
  138. return xpNoMemory;
  139. }
  140. part->remote_openclose_args =
  141. xpc_kzalloc_cacheline_aligned(XPC_OPENCLOSE_ARGS_SIZE, GFP_KERNEL,
  142. &part->remote_openclose_args_base);
  143. if (part->remote_openclose_args == NULL) {
  144. dev_err(xpc_chan, "can't get memory for remote connect args\n");
  145. kfree(part->local_openclose_args_base);
  146. part->local_openclose_args = NULL;
  147. kfree(part->remote_GPs_base);
  148. part->remote_GPs = NULL;
  149. kfree(part->local_GPs_base);
  150. part->local_GPs = NULL;
  151. kfree(part->channels);
  152. part->channels = NULL;
  153. return xpNoMemory;
  154. }
  155. xpc_initialize_channels(part, partid);
  156. atomic_set(&part->nchannels_active, 0);
  157. atomic_set(&part->nchannels_engaged, 0);
  158. /* local_IPI_amo were set to 0 by an earlier memset() */
  159. /* Initialize this partitions AMO_t structure */
  160. part->local_IPI_amo_va = xpc_IPI_init(partid);
  161. spin_lock_init(&part->IPI_lock);
  162. atomic_set(&part->channel_mgr_requests, 1);
  163. init_waitqueue_head(&part->channel_mgr_wq);
  164. sprintf(part->IPI_owner, "xpc%02d", partid);
  165. ret = request_irq(SGI_XPC_NOTIFY, xpc_notify_IRQ_handler, IRQF_SHARED,
  166. part->IPI_owner, (void *)(u64)partid);
  167. if (ret != 0) {
  168. dev_err(xpc_chan, "can't register NOTIFY IRQ handler, "
  169. "errno=%d\n", -ret);
  170. kfree(part->remote_openclose_args_base);
  171. part->remote_openclose_args = NULL;
  172. kfree(part->local_openclose_args_base);
  173. part->local_openclose_args = NULL;
  174. kfree(part->remote_GPs_base);
  175. part->remote_GPs = NULL;
  176. kfree(part->local_GPs_base);
  177. part->local_GPs = NULL;
  178. kfree(part->channels);
  179. part->channels = NULL;
  180. return xpLackOfResources;
  181. }
  182. /* Setup a timer to check for dropped IPIs */
  183. timer = &part->dropped_IPI_timer;
  184. init_timer(timer);
  185. timer->function = (void (*)(unsigned long))xpc_dropped_IPI_check;
  186. timer->data = (unsigned long)part;
  187. timer->expires = jiffies + XPC_P_DROPPED_IPI_WAIT;
  188. add_timer(timer);
  189. /*
  190. * With the setting of the partition setup_state to XPC_P_SETUP, we're
  191. * declaring that this partition is ready to go.
  192. */
  193. part->setup_state = XPC_P_SETUP;
  194. /*
  195. * Setup the per partition specific variables required by the
  196. * remote partition to establish channel connections with us.
  197. *
  198. * The setting of the magic # indicates that these per partition
  199. * specific variables are ready to be used.
  200. */
  201. xpc_vars_part[partid].GPs_pa = __pa(part->local_GPs);
  202. xpc_vars_part[partid].openclose_args_pa =
  203. __pa(part->local_openclose_args);
  204. xpc_vars_part[partid].IPI_amo_pa = __pa(part->local_IPI_amo_va);
  205. cpuid = raw_smp_processor_id(); /* any CPU in this partition will do */
  206. xpc_vars_part[partid].IPI_nasid = cpuid_to_nasid(cpuid);
  207. xpc_vars_part[partid].IPI_phys_cpuid = cpu_physical_id(cpuid);
  208. xpc_vars_part[partid].nchannels = part->nchannels;
  209. xpc_vars_part[partid].magic = XPC_VP_MAGIC1;
  210. return xpSuccess;
  211. }
  212. /*
  213. * Create a wrapper that hides the underlying mechanism for pulling a cacheline
  214. * (or multiple cachelines) from a remote partition.
  215. *
  216. * src must be a cacheline aligned physical address on the remote partition.
  217. * dst must be a cacheline aligned virtual address on this partition.
  218. * cnt must be an cacheline sized
  219. */
  220. static enum xp_retval
  221. xpc_pull_remote_cachelines(struct xpc_partition *part, void *dst,
  222. const void *src, size_t cnt)
  223. {
  224. bte_result_t bte_ret;
  225. DBUG_ON((u64)src != L1_CACHE_ALIGN((u64)src));
  226. DBUG_ON((u64)dst != L1_CACHE_ALIGN((u64)dst));
  227. DBUG_ON(cnt != L1_CACHE_ALIGN(cnt));
  228. if (part->act_state == XPC_P_DEACTIVATING)
  229. return part->reason;
  230. bte_ret = xp_bte_copy((u64)src, (u64)dst, (u64)cnt,
  231. (BTE_NORMAL | BTE_WACQUIRE), NULL);
  232. if (bte_ret == BTE_SUCCESS)
  233. return xpSuccess;
  234. dev_dbg(xpc_chan, "xp_bte_copy() from partition %d failed, ret=%d\n",
  235. XPC_PARTID(part), bte_ret);
  236. return xpc_map_bte_errors(bte_ret);
  237. }
  238. /*
  239. * Pull the remote per partition specific variables from the specified
  240. * partition.
  241. */
  242. enum xp_retval
  243. xpc_pull_remote_vars_part(struct xpc_partition *part)
  244. {
  245. u8 buffer[L1_CACHE_BYTES * 2];
  246. struct xpc_vars_part *pulled_entry_cacheline =
  247. (struct xpc_vars_part *)L1_CACHE_ALIGN((u64)buffer);
  248. struct xpc_vars_part *pulled_entry;
  249. u64 remote_entry_cacheline_pa, remote_entry_pa;
  250. short partid = XPC_PARTID(part);
  251. enum xp_retval ret;
  252. /* pull the cacheline that contains the variables we're interested in */
  253. DBUG_ON(part->remote_vars_part_pa !=
  254. L1_CACHE_ALIGN(part->remote_vars_part_pa));
  255. DBUG_ON(sizeof(struct xpc_vars_part) != L1_CACHE_BYTES / 2);
  256. remote_entry_pa = part->remote_vars_part_pa +
  257. sn_partition_id * sizeof(struct xpc_vars_part);
  258. remote_entry_cacheline_pa = (remote_entry_pa & ~(L1_CACHE_BYTES - 1));
  259. pulled_entry = (struct xpc_vars_part *)((u64)pulled_entry_cacheline +
  260. (remote_entry_pa &
  261. (L1_CACHE_BYTES - 1)));
  262. ret = xpc_pull_remote_cachelines(part, pulled_entry_cacheline,
  263. (void *)remote_entry_cacheline_pa,
  264. L1_CACHE_BYTES);
  265. if (ret != xpSuccess) {
  266. dev_dbg(xpc_chan, "failed to pull XPC vars_part from "
  267. "partition %d, ret=%d\n", partid, ret);
  268. return ret;
  269. }
  270. /* see if they've been set up yet */
  271. if (pulled_entry->magic != XPC_VP_MAGIC1 &&
  272. pulled_entry->magic != XPC_VP_MAGIC2) {
  273. if (pulled_entry->magic != 0) {
  274. dev_dbg(xpc_chan, "partition %d's XPC vars_part for "
  275. "partition %d has bad magic value (=0x%lx)\n",
  276. partid, sn_partition_id, pulled_entry->magic);
  277. return xpBadMagic;
  278. }
  279. /* they've not been initialized yet */
  280. return xpRetry;
  281. }
  282. if (xpc_vars_part[partid].magic == XPC_VP_MAGIC1) {
  283. /* validate the variables */
  284. if (pulled_entry->GPs_pa == 0 ||
  285. pulled_entry->openclose_args_pa == 0 ||
  286. pulled_entry->IPI_amo_pa == 0) {
  287. dev_err(xpc_chan, "partition %d's XPC vars_part for "
  288. "partition %d are not valid\n", partid,
  289. sn_partition_id);
  290. return xpInvalidAddress;
  291. }
  292. /* the variables we imported look to be valid */
  293. part->remote_GPs_pa = pulled_entry->GPs_pa;
  294. part->remote_openclose_args_pa =
  295. pulled_entry->openclose_args_pa;
  296. part->remote_IPI_amo_va =
  297. (AMO_t *)__va(pulled_entry->IPI_amo_pa);
  298. part->remote_IPI_nasid = pulled_entry->IPI_nasid;
  299. part->remote_IPI_phys_cpuid = pulled_entry->IPI_phys_cpuid;
  300. if (part->nchannels > pulled_entry->nchannels)
  301. part->nchannels = pulled_entry->nchannels;
  302. /* let the other side know that we've pulled their variables */
  303. xpc_vars_part[partid].magic = XPC_VP_MAGIC2;
  304. }
  305. if (pulled_entry->magic == XPC_VP_MAGIC1)
  306. return xpRetry;
  307. return xpSuccess;
  308. }
  309. /*
  310. * Get the IPI flags and pull the openclose args and/or remote GPs as needed.
  311. */
  312. static u64
  313. xpc_get_IPI_flags(struct xpc_partition *part)
  314. {
  315. unsigned long irq_flags;
  316. u64 IPI_amo;
  317. enum xp_retval ret;
  318. /*
  319. * See if there are any IPI flags to be handled.
  320. */
  321. spin_lock_irqsave(&part->IPI_lock, irq_flags);
  322. IPI_amo = part->local_IPI_amo;
  323. if (IPI_amo != 0)
  324. part->local_IPI_amo = 0;
  325. spin_unlock_irqrestore(&part->IPI_lock, irq_flags);
  326. if (XPC_ANY_OPENCLOSE_IPI_FLAGS_SET(IPI_amo)) {
  327. ret = xpc_pull_remote_cachelines(part,
  328. part->remote_openclose_args,
  329. (void *)part->
  330. remote_openclose_args_pa,
  331. XPC_OPENCLOSE_ARGS_SIZE);
  332. if (ret != xpSuccess) {
  333. XPC_DEACTIVATE_PARTITION(part, ret);
  334. dev_dbg(xpc_chan, "failed to pull openclose args from "
  335. "partition %d, ret=%d\n", XPC_PARTID(part),
  336. ret);
  337. /* don't bother processing IPIs anymore */
  338. IPI_amo = 0;
  339. }
  340. }
  341. if (XPC_ANY_MSG_IPI_FLAGS_SET(IPI_amo)) {
  342. ret = xpc_pull_remote_cachelines(part, part->remote_GPs,
  343. (void *)part->remote_GPs_pa,
  344. XPC_GP_SIZE);
  345. if (ret != xpSuccess) {
  346. XPC_DEACTIVATE_PARTITION(part, ret);
  347. dev_dbg(xpc_chan, "failed to pull GPs from partition "
  348. "%d, ret=%d\n", XPC_PARTID(part), ret);
  349. /* don't bother processing IPIs anymore */
  350. IPI_amo = 0;
  351. }
  352. }
  353. return IPI_amo;
  354. }
  355. /*
  356. * Allocate the local message queue and the notify queue.
  357. */
  358. static enum xp_retval
  359. xpc_allocate_local_msgqueue(struct xpc_channel *ch)
  360. {
  361. unsigned long irq_flags;
  362. int nentries;
  363. size_t nbytes;
  364. for (nentries = ch->local_nentries; nentries > 0; nentries--) {
  365. nbytes = nentries * ch->msg_size;
  366. ch->local_msgqueue = xpc_kzalloc_cacheline_aligned(nbytes,
  367. GFP_KERNEL,
  368. &ch->local_msgqueue_base);
  369. if (ch->local_msgqueue == NULL)
  370. continue;
  371. nbytes = nentries * sizeof(struct xpc_notify);
  372. ch->notify_queue = kzalloc(nbytes, GFP_KERNEL);
  373. if (ch->notify_queue == NULL) {
  374. kfree(ch->local_msgqueue_base);
  375. ch->local_msgqueue = NULL;
  376. continue;
  377. }
  378. spin_lock_irqsave(&ch->lock, irq_flags);
  379. if (nentries < ch->local_nentries) {
  380. dev_dbg(xpc_chan, "nentries=%d local_nentries=%d, "
  381. "partid=%d, channel=%d\n", nentries,
  382. ch->local_nentries, ch->partid, ch->number);
  383. ch->local_nentries = nentries;
  384. }
  385. spin_unlock_irqrestore(&ch->lock, irq_flags);
  386. return xpSuccess;
  387. }
  388. dev_dbg(xpc_chan, "can't get memory for local message queue and notify "
  389. "queue, partid=%d, channel=%d\n", ch->partid, ch->number);
  390. return xpNoMemory;
  391. }
  392. /*
  393. * Allocate the cached remote message queue.
  394. */
  395. static enum xp_retval
  396. xpc_allocate_remote_msgqueue(struct xpc_channel *ch)
  397. {
  398. unsigned long irq_flags;
  399. int nentries;
  400. size_t nbytes;
  401. DBUG_ON(ch->remote_nentries <= 0);
  402. for (nentries = ch->remote_nentries; nentries > 0; nentries--) {
  403. nbytes = nentries * ch->msg_size;
  404. ch->remote_msgqueue = xpc_kzalloc_cacheline_aligned(nbytes,
  405. GFP_KERNEL,
  406. &ch->remote_msgqueue_base);
  407. if (ch->remote_msgqueue == NULL)
  408. continue;
  409. spin_lock_irqsave(&ch->lock, irq_flags);
  410. if (nentries < ch->remote_nentries) {
  411. dev_dbg(xpc_chan, "nentries=%d remote_nentries=%d, "
  412. "partid=%d, channel=%d\n", nentries,
  413. ch->remote_nentries, ch->partid, ch->number);
  414. ch->remote_nentries = nentries;
  415. }
  416. spin_unlock_irqrestore(&ch->lock, irq_flags);
  417. return xpSuccess;
  418. }
  419. dev_dbg(xpc_chan, "can't get memory for cached remote message queue, "
  420. "partid=%d, channel=%d\n", ch->partid, ch->number);
  421. return xpNoMemory;
  422. }
  423. /*
  424. * Allocate message queues and other stuff associated with a channel.
  425. *
  426. * Note: Assumes all of the channel sizes are filled in.
  427. */
  428. static enum xp_retval
  429. xpc_allocate_msgqueues(struct xpc_channel *ch)
  430. {
  431. unsigned long irq_flags;
  432. enum xp_retval ret;
  433. DBUG_ON(ch->flags & XPC_C_SETUP);
  434. ret = xpc_allocate_local_msgqueue(ch);
  435. if (ret != xpSuccess)
  436. return ret;
  437. ret = xpc_allocate_remote_msgqueue(ch);
  438. if (ret != xpSuccess) {
  439. kfree(ch->local_msgqueue_base);
  440. ch->local_msgqueue = NULL;
  441. kfree(ch->notify_queue);
  442. ch->notify_queue = NULL;
  443. return ret;
  444. }
  445. spin_lock_irqsave(&ch->lock, irq_flags);
  446. ch->flags |= XPC_C_SETUP;
  447. spin_unlock_irqrestore(&ch->lock, irq_flags);
  448. return xpSuccess;
  449. }
  450. /*
  451. * Process a connect message from a remote partition.
  452. *
  453. * Note: xpc_process_connect() is expecting to be called with the
  454. * spin_lock_irqsave held and will leave it locked upon return.
  455. */
  456. static void
  457. xpc_process_connect(struct xpc_channel *ch, unsigned long *irq_flags)
  458. {
  459. enum xp_retval ret;
  460. DBUG_ON(!spin_is_locked(&ch->lock));
  461. if (!(ch->flags & XPC_C_OPENREQUEST) ||
  462. !(ch->flags & XPC_C_ROPENREQUEST)) {
  463. /* nothing more to do for now */
  464. return;
  465. }
  466. DBUG_ON(!(ch->flags & XPC_C_CONNECTING));
  467. if (!(ch->flags & XPC_C_SETUP)) {
  468. spin_unlock_irqrestore(&ch->lock, *irq_flags);
  469. ret = xpc_allocate_msgqueues(ch);
  470. spin_lock_irqsave(&ch->lock, *irq_flags);
  471. if (ret != xpSuccess)
  472. XPC_DISCONNECT_CHANNEL(ch, ret, irq_flags);
  473. if (ch->flags & (XPC_C_CONNECTED | XPC_C_DISCONNECTING))
  474. return;
  475. DBUG_ON(!(ch->flags & XPC_C_SETUP));
  476. DBUG_ON(ch->local_msgqueue == NULL);
  477. DBUG_ON(ch->remote_msgqueue == NULL);
  478. }
  479. if (!(ch->flags & XPC_C_OPENREPLY)) {
  480. ch->flags |= XPC_C_OPENREPLY;
  481. xpc_IPI_send_openreply(ch, irq_flags);
  482. }
  483. if (!(ch->flags & XPC_C_ROPENREPLY))
  484. return;
  485. DBUG_ON(ch->remote_msgqueue_pa == 0);
  486. ch->flags = (XPC_C_CONNECTED | XPC_C_SETUP); /* clear all else */
  487. dev_info(xpc_chan, "channel %d to partition %d connected\n",
  488. ch->number, ch->partid);
  489. spin_unlock_irqrestore(&ch->lock, *irq_flags);
  490. xpc_create_kthreads(ch, 1, 0);
  491. spin_lock_irqsave(&ch->lock, *irq_flags);
  492. }
  493. /*
  494. * Notify those who wanted to be notified upon delivery of their message.
  495. */
  496. static void
  497. xpc_notify_senders(struct xpc_channel *ch, enum xp_retval reason, s64 put)
  498. {
  499. struct xpc_notify *notify;
  500. u8 notify_type;
  501. s64 get = ch->w_remote_GP.get - 1;
  502. while (++get < put && atomic_read(&ch->n_to_notify) > 0) {
  503. notify = &ch->notify_queue[get % ch->local_nentries];
  504. /*
  505. * See if the notify entry indicates it was associated with
  506. * a message who's sender wants to be notified. It is possible
  507. * that it is, but someone else is doing or has done the
  508. * notification.
  509. */
  510. notify_type = notify->type;
  511. if (notify_type == 0 ||
  512. cmpxchg(&notify->type, notify_type, 0) != notify_type) {
  513. continue;
  514. }
  515. DBUG_ON(notify_type != XPC_N_CALL);
  516. atomic_dec(&ch->n_to_notify);
  517. if (notify->func != NULL) {
  518. dev_dbg(xpc_chan, "notify->func() called, notify=0x%p, "
  519. "msg_number=%ld, partid=%d, channel=%d\n",
  520. (void *)notify, get, ch->partid, ch->number);
  521. notify->func(reason, ch->partid, ch->number,
  522. notify->key);
  523. dev_dbg(xpc_chan, "notify->func() returned, "
  524. "notify=0x%p, msg_number=%ld, partid=%d, "
  525. "channel=%d\n", (void *)notify, get,
  526. ch->partid, ch->number);
  527. }
  528. }
  529. }
  530. /*
  531. * Free up message queues and other stuff that were allocated for the specified
  532. * channel.
  533. *
  534. * Note: ch->reason and ch->reason_line are left set for debugging purposes,
  535. * they're cleared when XPC_C_DISCONNECTED is cleared.
  536. */
  537. static void
  538. xpc_free_msgqueues(struct xpc_channel *ch)
  539. {
  540. DBUG_ON(!spin_is_locked(&ch->lock));
  541. DBUG_ON(atomic_read(&ch->n_to_notify) != 0);
  542. ch->remote_msgqueue_pa = 0;
  543. ch->func = NULL;
  544. ch->key = NULL;
  545. ch->msg_size = 0;
  546. ch->local_nentries = 0;
  547. ch->remote_nentries = 0;
  548. ch->kthreads_assigned_limit = 0;
  549. ch->kthreads_idle_limit = 0;
  550. ch->local_GP->get = 0;
  551. ch->local_GP->put = 0;
  552. ch->remote_GP.get = 0;
  553. ch->remote_GP.put = 0;
  554. ch->w_local_GP.get = 0;
  555. ch->w_local_GP.put = 0;
  556. ch->w_remote_GP.get = 0;
  557. ch->w_remote_GP.put = 0;
  558. ch->next_msg_to_pull = 0;
  559. if (ch->flags & XPC_C_SETUP) {
  560. ch->flags &= ~XPC_C_SETUP;
  561. dev_dbg(xpc_chan, "ch->flags=0x%x, partid=%d, channel=%d\n",
  562. ch->flags, ch->partid, ch->number);
  563. kfree(ch->local_msgqueue_base);
  564. ch->local_msgqueue = NULL;
  565. kfree(ch->remote_msgqueue_base);
  566. ch->remote_msgqueue = NULL;
  567. kfree(ch->notify_queue);
  568. ch->notify_queue = NULL;
  569. }
  570. }
  571. /*
  572. * spin_lock_irqsave() is expected to be held on entry.
  573. */
  574. static void
  575. xpc_process_disconnect(struct xpc_channel *ch, unsigned long *irq_flags)
  576. {
  577. struct xpc_partition *part = &xpc_partitions[ch->partid];
  578. u32 channel_was_connected = (ch->flags & XPC_C_WASCONNECTED);
  579. DBUG_ON(!spin_is_locked(&ch->lock));
  580. if (!(ch->flags & XPC_C_DISCONNECTING))
  581. return;
  582. DBUG_ON(!(ch->flags & XPC_C_CLOSEREQUEST));
  583. /* make sure all activity has settled down first */
  584. if (atomic_read(&ch->kthreads_assigned) > 0 ||
  585. atomic_read(&ch->references) > 0) {
  586. return;
  587. }
  588. DBUG_ON((ch->flags & XPC_C_CONNECTEDCALLOUT_MADE) &&
  589. !(ch->flags & XPC_C_DISCONNECTINGCALLOUT_MADE));
  590. if (part->act_state == XPC_P_DEACTIVATING) {
  591. /* can't proceed until the other side disengages from us */
  592. if (xpc_partition_engaged(1UL << ch->partid))
  593. return;
  594. } else {
  595. /* as long as the other side is up do the full protocol */
  596. if (!(ch->flags & XPC_C_RCLOSEREQUEST))
  597. return;
  598. if (!(ch->flags & XPC_C_CLOSEREPLY)) {
  599. ch->flags |= XPC_C_CLOSEREPLY;
  600. xpc_IPI_send_closereply(ch, irq_flags);
  601. }
  602. if (!(ch->flags & XPC_C_RCLOSEREPLY))
  603. return;
  604. }
  605. /* wake those waiting for notify completion */
  606. if (atomic_read(&ch->n_to_notify) > 0) {
  607. /* >>> we do callout while holding ch->lock */
  608. xpc_notify_senders(ch, ch->reason, ch->w_local_GP.put);
  609. }
  610. /* both sides are disconnected now */
  611. if (ch->flags & XPC_C_DISCONNECTINGCALLOUT_MADE) {
  612. spin_unlock_irqrestore(&ch->lock, *irq_flags);
  613. xpc_disconnect_callout(ch, xpDisconnected);
  614. spin_lock_irqsave(&ch->lock, *irq_flags);
  615. }
  616. /* it's now safe to free the channel's message queues */
  617. xpc_free_msgqueues(ch);
  618. /* mark disconnected, clear all other flags except XPC_C_WDISCONNECT */
  619. ch->flags = (XPC_C_DISCONNECTED | (ch->flags & XPC_C_WDISCONNECT));
  620. atomic_dec(&part->nchannels_active);
  621. if (channel_was_connected) {
  622. dev_info(xpc_chan, "channel %d to partition %d disconnected, "
  623. "reason=%d\n", ch->number, ch->partid, ch->reason);
  624. }
  625. if (ch->flags & XPC_C_WDISCONNECT) {
  626. /* we won't lose the CPU since we're holding ch->lock */
  627. complete(&ch->wdisconnect_wait);
  628. } else if (ch->delayed_IPI_flags) {
  629. if (part->act_state != XPC_P_DEACTIVATING) {
  630. /* time to take action on any delayed IPI flags */
  631. spin_lock(&part->IPI_lock);
  632. XPC_SET_IPI_FLAGS(part->local_IPI_amo, ch->number,
  633. ch->delayed_IPI_flags);
  634. spin_unlock(&part->IPI_lock);
  635. }
  636. ch->delayed_IPI_flags = 0;
  637. }
  638. }
  639. /*
  640. * Process a change in the channel's remote connection state.
  641. */
  642. static void
  643. xpc_process_openclose_IPI(struct xpc_partition *part, int ch_number,
  644. u8 IPI_flags)
  645. {
  646. unsigned long irq_flags;
  647. struct xpc_openclose_args *args =
  648. &part->remote_openclose_args[ch_number];
  649. struct xpc_channel *ch = &part->channels[ch_number];
  650. enum xp_retval reason;
  651. spin_lock_irqsave(&ch->lock, irq_flags);
  652. again:
  653. if ((ch->flags & XPC_C_DISCONNECTED) &&
  654. (ch->flags & XPC_C_WDISCONNECT)) {
  655. /*
  656. * Delay processing IPI flags until thread waiting disconnect
  657. * has had a chance to see that the channel is disconnected.
  658. */
  659. ch->delayed_IPI_flags |= IPI_flags;
  660. spin_unlock_irqrestore(&ch->lock, irq_flags);
  661. return;
  662. }
  663. if (IPI_flags & XPC_IPI_CLOSEREQUEST) {
  664. dev_dbg(xpc_chan, "XPC_IPI_CLOSEREQUEST (reason=%d) received "
  665. "from partid=%d, channel=%d\n", args->reason,
  666. ch->partid, ch->number);
  667. /*
  668. * If RCLOSEREQUEST is set, we're probably waiting for
  669. * RCLOSEREPLY. We should find it and a ROPENREQUEST packed
  670. * with this RCLOSEREQUEST in the IPI_flags.
  671. */
  672. if (ch->flags & XPC_C_RCLOSEREQUEST) {
  673. DBUG_ON(!(ch->flags & XPC_C_DISCONNECTING));
  674. DBUG_ON(!(ch->flags & XPC_C_CLOSEREQUEST));
  675. DBUG_ON(!(ch->flags & XPC_C_CLOSEREPLY));
  676. DBUG_ON(ch->flags & XPC_C_RCLOSEREPLY);
  677. DBUG_ON(!(IPI_flags & XPC_IPI_CLOSEREPLY));
  678. IPI_flags &= ~XPC_IPI_CLOSEREPLY;
  679. ch->flags |= XPC_C_RCLOSEREPLY;
  680. /* both sides have finished disconnecting */
  681. xpc_process_disconnect(ch, &irq_flags);
  682. DBUG_ON(!(ch->flags & XPC_C_DISCONNECTED));
  683. goto again;
  684. }
  685. if (ch->flags & XPC_C_DISCONNECTED) {
  686. if (!(IPI_flags & XPC_IPI_OPENREQUEST)) {
  687. if ((XPC_GET_IPI_FLAGS(part->local_IPI_amo,
  688. ch_number) &
  689. XPC_IPI_OPENREQUEST)) {
  690. DBUG_ON(ch->delayed_IPI_flags != 0);
  691. spin_lock(&part->IPI_lock);
  692. XPC_SET_IPI_FLAGS(part->local_IPI_amo,
  693. ch_number,
  694. XPC_IPI_CLOSEREQUEST);
  695. spin_unlock(&part->IPI_lock);
  696. }
  697. spin_unlock_irqrestore(&ch->lock, irq_flags);
  698. return;
  699. }
  700. XPC_SET_REASON(ch, 0, 0);
  701. ch->flags &= ~XPC_C_DISCONNECTED;
  702. atomic_inc(&part->nchannels_active);
  703. ch->flags |= (XPC_C_CONNECTING | XPC_C_ROPENREQUEST);
  704. }
  705. IPI_flags &= ~(XPC_IPI_OPENREQUEST | XPC_IPI_OPENREPLY);
  706. /*
  707. * The meaningful CLOSEREQUEST connection state fields are:
  708. * reason = reason connection is to be closed
  709. */
  710. ch->flags |= XPC_C_RCLOSEREQUEST;
  711. if (!(ch->flags & XPC_C_DISCONNECTING)) {
  712. reason = args->reason;
  713. if (reason <= xpSuccess || reason > xpUnknownReason)
  714. reason = xpUnknownReason;
  715. else if (reason == xpUnregistering)
  716. reason = xpOtherUnregistering;
  717. XPC_DISCONNECT_CHANNEL(ch, reason, &irq_flags);
  718. DBUG_ON(IPI_flags & XPC_IPI_CLOSEREPLY);
  719. spin_unlock_irqrestore(&ch->lock, irq_flags);
  720. return;
  721. }
  722. xpc_process_disconnect(ch, &irq_flags);
  723. }
  724. if (IPI_flags & XPC_IPI_CLOSEREPLY) {
  725. dev_dbg(xpc_chan, "XPC_IPI_CLOSEREPLY received from partid=%d,"
  726. " channel=%d\n", ch->partid, ch->number);
  727. if (ch->flags & XPC_C_DISCONNECTED) {
  728. DBUG_ON(part->act_state != XPC_P_DEACTIVATING);
  729. spin_unlock_irqrestore(&ch->lock, irq_flags);
  730. return;
  731. }
  732. DBUG_ON(!(ch->flags & XPC_C_CLOSEREQUEST));
  733. if (!(ch->flags & XPC_C_RCLOSEREQUEST)) {
  734. if ((XPC_GET_IPI_FLAGS(part->local_IPI_amo, ch_number)
  735. & XPC_IPI_CLOSEREQUEST)) {
  736. DBUG_ON(ch->delayed_IPI_flags != 0);
  737. spin_lock(&part->IPI_lock);
  738. XPC_SET_IPI_FLAGS(part->local_IPI_amo,
  739. ch_number,
  740. XPC_IPI_CLOSEREPLY);
  741. spin_unlock(&part->IPI_lock);
  742. }
  743. spin_unlock_irqrestore(&ch->lock, irq_flags);
  744. return;
  745. }
  746. ch->flags |= XPC_C_RCLOSEREPLY;
  747. if (ch->flags & XPC_C_CLOSEREPLY) {
  748. /* both sides have finished disconnecting */
  749. xpc_process_disconnect(ch, &irq_flags);
  750. }
  751. }
  752. if (IPI_flags & XPC_IPI_OPENREQUEST) {
  753. dev_dbg(xpc_chan, "XPC_IPI_OPENREQUEST (msg_size=%d, "
  754. "local_nentries=%d) received from partid=%d, "
  755. "channel=%d\n", args->msg_size, args->local_nentries,
  756. ch->partid, ch->number);
  757. if (part->act_state == XPC_P_DEACTIVATING ||
  758. (ch->flags & XPC_C_ROPENREQUEST)) {
  759. spin_unlock_irqrestore(&ch->lock, irq_flags);
  760. return;
  761. }
  762. if (ch->flags & (XPC_C_DISCONNECTING | XPC_C_WDISCONNECT)) {
  763. ch->delayed_IPI_flags |= XPC_IPI_OPENREQUEST;
  764. spin_unlock_irqrestore(&ch->lock, irq_flags);
  765. return;
  766. }
  767. DBUG_ON(!(ch->flags & (XPC_C_DISCONNECTED |
  768. XPC_C_OPENREQUEST)));
  769. DBUG_ON(ch->flags & (XPC_C_ROPENREQUEST | XPC_C_ROPENREPLY |
  770. XPC_C_OPENREPLY | XPC_C_CONNECTED));
  771. /*
  772. * The meaningful OPENREQUEST connection state fields are:
  773. * msg_size = size of channel's messages in bytes
  774. * local_nentries = remote partition's local_nentries
  775. */
  776. if (args->msg_size == 0 || args->local_nentries == 0) {
  777. /* assume OPENREQUEST was delayed by mistake */
  778. spin_unlock_irqrestore(&ch->lock, irq_flags);
  779. return;
  780. }
  781. ch->flags |= (XPC_C_ROPENREQUEST | XPC_C_CONNECTING);
  782. ch->remote_nentries = args->local_nentries;
  783. if (ch->flags & XPC_C_OPENREQUEST) {
  784. if (args->msg_size != ch->msg_size) {
  785. XPC_DISCONNECT_CHANNEL(ch, xpUnequalMsgSizes,
  786. &irq_flags);
  787. spin_unlock_irqrestore(&ch->lock, irq_flags);
  788. return;
  789. }
  790. } else {
  791. ch->msg_size = args->msg_size;
  792. XPC_SET_REASON(ch, 0, 0);
  793. ch->flags &= ~XPC_C_DISCONNECTED;
  794. atomic_inc(&part->nchannels_active);
  795. }
  796. xpc_process_connect(ch, &irq_flags);
  797. }
  798. if (IPI_flags & XPC_IPI_OPENREPLY) {
  799. dev_dbg(xpc_chan, "XPC_IPI_OPENREPLY (local_msgqueue_pa=0x%lx, "
  800. "local_nentries=%d, remote_nentries=%d) received from "
  801. "partid=%d, channel=%d\n", args->local_msgqueue_pa,
  802. args->local_nentries, args->remote_nentries,
  803. ch->partid, ch->number);
  804. if (ch->flags & (XPC_C_DISCONNECTING | XPC_C_DISCONNECTED)) {
  805. spin_unlock_irqrestore(&ch->lock, irq_flags);
  806. return;
  807. }
  808. if (!(ch->flags & XPC_C_OPENREQUEST)) {
  809. XPC_DISCONNECT_CHANNEL(ch, xpOpenCloseError,
  810. &irq_flags);
  811. spin_unlock_irqrestore(&ch->lock, irq_flags);
  812. return;
  813. }
  814. DBUG_ON(!(ch->flags & XPC_C_ROPENREQUEST));
  815. DBUG_ON(ch->flags & XPC_C_CONNECTED);
  816. /*
  817. * The meaningful OPENREPLY connection state fields are:
  818. * local_msgqueue_pa = physical address of remote
  819. * partition's local_msgqueue
  820. * local_nentries = remote partition's local_nentries
  821. * remote_nentries = remote partition's remote_nentries
  822. */
  823. DBUG_ON(args->local_msgqueue_pa == 0);
  824. DBUG_ON(args->local_nentries == 0);
  825. DBUG_ON(args->remote_nentries == 0);
  826. ch->flags |= XPC_C_ROPENREPLY;
  827. ch->remote_msgqueue_pa = args->local_msgqueue_pa;
  828. if (args->local_nentries < ch->remote_nentries) {
  829. dev_dbg(xpc_chan, "XPC_IPI_OPENREPLY: new "
  830. "remote_nentries=%d, old remote_nentries=%d, "
  831. "partid=%d, channel=%d\n",
  832. args->local_nentries, ch->remote_nentries,
  833. ch->partid, ch->number);
  834. ch->remote_nentries = args->local_nentries;
  835. }
  836. if (args->remote_nentries < ch->local_nentries) {
  837. dev_dbg(xpc_chan, "XPC_IPI_OPENREPLY: new "
  838. "local_nentries=%d, old local_nentries=%d, "
  839. "partid=%d, channel=%d\n",
  840. args->remote_nentries, ch->local_nentries,
  841. ch->partid, ch->number);
  842. ch->local_nentries = args->remote_nentries;
  843. }
  844. xpc_process_connect(ch, &irq_flags);
  845. }
  846. spin_unlock_irqrestore(&ch->lock, irq_flags);
  847. }
  848. /*
  849. * Attempt to establish a channel connection to a remote partition.
  850. */
  851. static enum xp_retval
  852. xpc_connect_channel(struct xpc_channel *ch)
  853. {
  854. unsigned long irq_flags;
  855. struct xpc_registration *registration = &xpc_registrations[ch->number];
  856. if (mutex_trylock(&registration->mutex) == 0)
  857. return xpRetry;
  858. if (!XPC_CHANNEL_REGISTERED(ch->number)) {
  859. mutex_unlock(&registration->mutex);
  860. return xpUnregistered;
  861. }
  862. spin_lock_irqsave(&ch->lock, irq_flags);
  863. DBUG_ON(ch->flags & XPC_C_CONNECTED);
  864. DBUG_ON(ch->flags & XPC_C_OPENREQUEST);
  865. if (ch->flags & XPC_C_DISCONNECTING) {
  866. spin_unlock_irqrestore(&ch->lock, irq_flags);
  867. mutex_unlock(&registration->mutex);
  868. return ch->reason;
  869. }
  870. /* add info from the channel connect registration to the channel */
  871. ch->kthreads_assigned_limit = registration->assigned_limit;
  872. ch->kthreads_idle_limit = registration->idle_limit;
  873. DBUG_ON(atomic_read(&ch->kthreads_assigned) != 0);
  874. DBUG_ON(atomic_read(&ch->kthreads_idle) != 0);
  875. DBUG_ON(atomic_read(&ch->kthreads_active) != 0);
  876. ch->func = registration->func;
  877. DBUG_ON(registration->func == NULL);
  878. ch->key = registration->key;
  879. ch->local_nentries = registration->nentries;
  880. if (ch->flags & XPC_C_ROPENREQUEST) {
  881. if (registration->msg_size != ch->msg_size) {
  882. /* the local and remote sides aren't the same */
  883. /*
  884. * Because XPC_DISCONNECT_CHANNEL() can block we're
  885. * forced to up the registration sema before we unlock
  886. * the channel lock. But that's okay here because we're
  887. * done with the part that required the registration
  888. * sema. XPC_DISCONNECT_CHANNEL() requires that the
  889. * channel lock be locked and will unlock and relock
  890. * the channel lock as needed.
  891. */
  892. mutex_unlock(&registration->mutex);
  893. XPC_DISCONNECT_CHANNEL(ch, xpUnequalMsgSizes,
  894. &irq_flags);
  895. spin_unlock_irqrestore(&ch->lock, irq_flags);
  896. return xpUnequalMsgSizes;
  897. }
  898. } else {
  899. ch->msg_size = registration->msg_size;
  900. XPC_SET_REASON(ch, 0, 0);
  901. ch->flags &= ~XPC_C_DISCONNECTED;
  902. atomic_inc(&xpc_partitions[ch->partid].nchannels_active);
  903. }
  904. mutex_unlock(&registration->mutex);
  905. /* initiate the connection */
  906. ch->flags |= (XPC_C_OPENREQUEST | XPC_C_CONNECTING);
  907. xpc_IPI_send_openrequest(ch, &irq_flags);
  908. xpc_process_connect(ch, &irq_flags);
  909. spin_unlock_irqrestore(&ch->lock, irq_flags);
  910. return xpSuccess;
  911. }
  912. /*
  913. * Clear some of the msg flags in the local message queue.
  914. */
  915. static inline void
  916. xpc_clear_local_msgqueue_flags(struct xpc_channel *ch)
  917. {
  918. struct xpc_msg *msg;
  919. s64 get;
  920. get = ch->w_remote_GP.get;
  921. do {
  922. msg = (struct xpc_msg *)((u64)ch->local_msgqueue +
  923. (get % ch->local_nentries) *
  924. ch->msg_size);
  925. msg->flags = 0;
  926. } while (++get < ch->remote_GP.get);
  927. }
  928. /*
  929. * Clear some of the msg flags in the remote message queue.
  930. */
  931. static inline void
  932. xpc_clear_remote_msgqueue_flags(struct xpc_channel *ch)
  933. {
  934. struct xpc_msg *msg;
  935. s64 put;
  936. put = ch->w_remote_GP.put;
  937. do {
  938. msg = (struct xpc_msg *)((u64)ch->remote_msgqueue +
  939. (put % ch->remote_nentries) *
  940. ch->msg_size);
  941. msg->flags = 0;
  942. } while (++put < ch->remote_GP.put);
  943. }
  944. static void
  945. xpc_process_msg_IPI(struct xpc_partition *part, int ch_number)
  946. {
  947. struct xpc_channel *ch = &part->channels[ch_number];
  948. int nmsgs_sent;
  949. ch->remote_GP = part->remote_GPs[ch_number];
  950. /* See what, if anything, has changed for each connected channel */
  951. xpc_msgqueue_ref(ch);
  952. if (ch->w_remote_GP.get == ch->remote_GP.get &&
  953. ch->w_remote_GP.put == ch->remote_GP.put) {
  954. /* nothing changed since GPs were last pulled */
  955. xpc_msgqueue_deref(ch);
  956. return;
  957. }
  958. if (!(ch->flags & XPC_C_CONNECTED)) {
  959. xpc_msgqueue_deref(ch);
  960. return;
  961. }
  962. /*
  963. * First check to see if messages recently sent by us have been
  964. * received by the other side. (The remote GET value will have
  965. * changed since we last looked at it.)
  966. */
  967. if (ch->w_remote_GP.get != ch->remote_GP.get) {
  968. /*
  969. * We need to notify any senders that want to be notified
  970. * that their sent messages have been received by their
  971. * intended recipients. We need to do this before updating
  972. * w_remote_GP.get so that we don't allocate the same message
  973. * queue entries prematurely (see xpc_allocate_msg()).
  974. */
  975. if (atomic_read(&ch->n_to_notify) > 0) {
  976. /*
  977. * Notify senders that messages sent have been
  978. * received and delivered by the other side.
  979. */
  980. xpc_notify_senders(ch, xpMsgDelivered,
  981. ch->remote_GP.get);
  982. }
  983. /*
  984. * Clear msg->flags in previously sent messages, so that
  985. * they're ready for xpc_allocate_msg().
  986. */
  987. xpc_clear_local_msgqueue_flags(ch);
  988. ch->w_remote_GP.get = ch->remote_GP.get;
  989. dev_dbg(xpc_chan, "w_remote_GP.get changed to %ld, partid=%d, "
  990. "channel=%d\n", ch->w_remote_GP.get, ch->partid,
  991. ch->number);
  992. /*
  993. * If anyone was waiting for message queue entries to become
  994. * available, wake them up.
  995. */
  996. if (atomic_read(&ch->n_on_msg_allocate_wq) > 0)
  997. wake_up(&ch->msg_allocate_wq);
  998. }
  999. /*
  1000. * Now check for newly sent messages by the other side. (The remote
  1001. * PUT value will have changed since we last looked at it.)
  1002. */
  1003. if (ch->w_remote_GP.put != ch->remote_GP.put) {
  1004. /*
  1005. * Clear msg->flags in previously received messages, so that
  1006. * they're ready for xpc_get_deliverable_msg().
  1007. */
  1008. xpc_clear_remote_msgqueue_flags(ch);
  1009. ch->w_remote_GP.put = ch->remote_GP.put;
  1010. dev_dbg(xpc_chan, "w_remote_GP.put changed to %ld, partid=%d, "
  1011. "channel=%d\n", ch->w_remote_GP.put, ch->partid,
  1012. ch->number);
  1013. nmsgs_sent = ch->w_remote_GP.put - ch->w_local_GP.get;
  1014. if (nmsgs_sent > 0) {
  1015. dev_dbg(xpc_chan, "msgs waiting to be copied and "
  1016. "delivered=%d, partid=%d, channel=%d\n",
  1017. nmsgs_sent, ch->partid, ch->number);
  1018. if (ch->flags & XPC_C_CONNECTEDCALLOUT_MADE)
  1019. xpc_activate_kthreads(ch, nmsgs_sent);
  1020. }
  1021. }
  1022. xpc_msgqueue_deref(ch);
  1023. }
  1024. void
  1025. xpc_process_channel_activity(struct xpc_partition *part)
  1026. {
  1027. unsigned long irq_flags;
  1028. u64 IPI_amo, IPI_flags;
  1029. struct xpc_channel *ch;
  1030. int ch_number;
  1031. u32 ch_flags;
  1032. IPI_amo = xpc_get_IPI_flags(part);
  1033. /*
  1034. * Initiate channel connections for registered channels.
  1035. *
  1036. * For each connected channel that has pending messages activate idle
  1037. * kthreads and/or create new kthreads as needed.
  1038. */
  1039. for (ch_number = 0; ch_number < part->nchannels; ch_number++) {
  1040. ch = &part->channels[ch_number];
  1041. /*
  1042. * Process any open or close related IPI flags, and then deal
  1043. * with connecting or disconnecting the channel as required.
  1044. */
  1045. IPI_flags = XPC_GET_IPI_FLAGS(IPI_amo, ch_number);
  1046. if (XPC_ANY_OPENCLOSE_IPI_FLAGS_SET(IPI_flags))
  1047. xpc_process_openclose_IPI(part, ch_number, IPI_flags);
  1048. ch_flags = ch->flags; /* need an atomic snapshot of flags */
  1049. if (ch_flags & XPC_C_DISCONNECTING) {
  1050. spin_lock_irqsave(&ch->lock, irq_flags);
  1051. xpc_process_disconnect(ch, &irq_flags);
  1052. spin_unlock_irqrestore(&ch->lock, irq_flags);
  1053. continue;
  1054. }
  1055. if (part->act_state == XPC_P_DEACTIVATING)
  1056. continue;
  1057. if (!(ch_flags & XPC_C_CONNECTED)) {
  1058. if (!(ch_flags & XPC_C_OPENREQUEST)) {
  1059. DBUG_ON(ch_flags & XPC_C_SETUP);
  1060. (void)xpc_connect_channel(ch);
  1061. } else {
  1062. spin_lock_irqsave(&ch->lock, irq_flags);
  1063. xpc_process_connect(ch, &irq_flags);
  1064. spin_unlock_irqrestore(&ch->lock, irq_flags);
  1065. }
  1066. continue;
  1067. }
  1068. /*
  1069. * Process any message related IPI flags, this may involve the
  1070. * activation of kthreads to deliver any pending messages sent
  1071. * from the other partition.
  1072. */
  1073. if (XPC_ANY_MSG_IPI_FLAGS_SET(IPI_flags))
  1074. xpc_process_msg_IPI(part, ch_number);
  1075. }
  1076. }
  1077. /*
  1078. * XPC's heartbeat code calls this function to inform XPC that a partition is
  1079. * going down. XPC responds by tearing down the XPartition Communication
  1080. * infrastructure used for the just downed partition.
  1081. *
  1082. * XPC's heartbeat code will never call this function and xpc_partition_up()
  1083. * at the same time. Nor will it ever make multiple calls to either function
  1084. * at the same time.
  1085. */
  1086. void
  1087. xpc_partition_going_down(struct xpc_partition *part, enum xp_retval reason)
  1088. {
  1089. unsigned long irq_flags;
  1090. int ch_number;
  1091. struct xpc_channel *ch;
  1092. dev_dbg(xpc_chan, "deactivating partition %d, reason=%d\n",
  1093. XPC_PARTID(part), reason);
  1094. if (!xpc_part_ref(part)) {
  1095. /* infrastructure for this partition isn't currently set up */
  1096. return;
  1097. }
  1098. /* disconnect channels associated with the partition going down */
  1099. for (ch_number = 0; ch_number < part->nchannels; ch_number++) {
  1100. ch = &part->channels[ch_number];
  1101. xpc_msgqueue_ref(ch);
  1102. spin_lock_irqsave(&ch->lock, irq_flags);
  1103. XPC_DISCONNECT_CHANNEL(ch, reason, &irq_flags);
  1104. spin_unlock_irqrestore(&ch->lock, irq_flags);
  1105. xpc_msgqueue_deref(ch);
  1106. }
  1107. xpc_wakeup_channel_mgr(part);
  1108. xpc_part_deref(part);
  1109. }
  1110. /*
  1111. * Teardown the infrastructure necessary to support XPartition Communication
  1112. * between the specified remote partition and the local one.
  1113. */
  1114. void
  1115. xpc_teardown_infrastructure(struct xpc_partition *part)
  1116. {
  1117. short partid = XPC_PARTID(part);
  1118. /*
  1119. * We start off by making this partition inaccessible to local
  1120. * processes by marking it as no longer setup. Then we make it
  1121. * inaccessible to remote processes by clearing the XPC per partition
  1122. * specific variable's magic # (which indicates that these variables
  1123. * are no longer valid) and by ignoring all XPC notify IPIs sent to
  1124. * this partition.
  1125. */
  1126. DBUG_ON(atomic_read(&part->nchannels_engaged) != 0);
  1127. DBUG_ON(atomic_read(&part->nchannels_active) != 0);
  1128. DBUG_ON(part->setup_state != XPC_P_SETUP);
  1129. part->setup_state = XPC_P_WTEARDOWN;
  1130. xpc_vars_part[partid].magic = 0;
  1131. free_irq(SGI_XPC_NOTIFY, (void *)(u64)partid);
  1132. /*
  1133. * Before proceeding with the teardown we have to wait until all
  1134. * existing references cease.
  1135. */
  1136. wait_event(part->teardown_wq, (atomic_read(&part->references) == 0));
  1137. /* now we can begin tearing down the infrastructure */
  1138. part->setup_state = XPC_P_TORNDOWN;
  1139. /* in case we've still got outstanding timers registered... */
  1140. del_timer_sync(&part->dropped_IPI_timer);
  1141. kfree(part->remote_openclose_args_base);
  1142. part->remote_openclose_args = NULL;
  1143. kfree(part->local_openclose_args_base);
  1144. part->local_openclose_args = NULL;
  1145. kfree(part->remote_GPs_base);
  1146. part->remote_GPs = NULL;
  1147. kfree(part->local_GPs_base);
  1148. part->local_GPs = NULL;
  1149. kfree(part->channels);
  1150. part->channels = NULL;
  1151. part->local_IPI_amo_va = NULL;
  1152. }
  1153. /*
  1154. * Called by XP at the time of channel connection registration to cause
  1155. * XPC to establish connections to all currently active partitions.
  1156. */
  1157. void
  1158. xpc_initiate_connect(int ch_number)
  1159. {
  1160. short partid;
  1161. struct xpc_partition *part;
  1162. struct xpc_channel *ch;
  1163. DBUG_ON(ch_number < 0 || ch_number >= XPC_NCHANNELS);
  1164. for (partid = 1; partid < XP_MAX_PARTITIONS; partid++) {
  1165. part = &xpc_partitions[partid];
  1166. if (xpc_part_ref(part)) {
  1167. ch = &part->channels[ch_number];
  1168. /*
  1169. * Initiate the establishment of a connection on the
  1170. * newly registered channel to the remote partition.
  1171. */
  1172. xpc_wakeup_channel_mgr(part);
  1173. xpc_part_deref(part);
  1174. }
  1175. }
  1176. }
  1177. void
  1178. xpc_connected_callout(struct xpc_channel *ch)
  1179. {
  1180. /* let the registerer know that a connection has been established */
  1181. if (ch->func != NULL) {
  1182. dev_dbg(xpc_chan, "ch->func() called, reason=xpConnected, "
  1183. "partid=%d, channel=%d\n", ch->partid, ch->number);
  1184. ch->func(xpConnected, ch->partid, ch->number,
  1185. (void *)(u64)ch->local_nentries, ch->key);
  1186. dev_dbg(xpc_chan, "ch->func() returned, reason=xpConnected, "
  1187. "partid=%d, channel=%d\n", ch->partid, ch->number);
  1188. }
  1189. }
  1190. /*
  1191. * Called by XP at the time of channel connection unregistration to cause
  1192. * XPC to teardown all current connections for the specified channel.
  1193. *
  1194. * Before returning xpc_initiate_disconnect() will wait until all connections
  1195. * on the specified channel have been closed/torndown. So the caller can be
  1196. * assured that they will not be receiving any more callouts from XPC to the
  1197. * function they registered via xpc_connect().
  1198. *
  1199. * Arguments:
  1200. *
  1201. * ch_number - channel # to unregister.
  1202. */
  1203. void
  1204. xpc_initiate_disconnect(int ch_number)
  1205. {
  1206. unsigned long irq_flags;
  1207. short partid;
  1208. struct xpc_partition *part;
  1209. struct xpc_channel *ch;
  1210. DBUG_ON(ch_number < 0 || ch_number >= XPC_NCHANNELS);
  1211. /* initiate the channel disconnect for every active partition */
  1212. for (partid = 1; partid < XP_MAX_PARTITIONS; partid++) {
  1213. part = &xpc_partitions[partid];
  1214. if (xpc_part_ref(part)) {
  1215. ch = &part->channels[ch_number];
  1216. xpc_msgqueue_ref(ch);
  1217. spin_lock_irqsave(&ch->lock, irq_flags);
  1218. if (!(ch->flags & XPC_C_DISCONNECTED)) {
  1219. ch->flags |= XPC_C_WDISCONNECT;
  1220. XPC_DISCONNECT_CHANNEL(ch, xpUnregistering,
  1221. &irq_flags);
  1222. }
  1223. spin_unlock_irqrestore(&ch->lock, irq_flags);
  1224. xpc_msgqueue_deref(ch);
  1225. xpc_part_deref(part);
  1226. }
  1227. }
  1228. xpc_disconnect_wait(ch_number);
  1229. }
  1230. /*
  1231. * To disconnect a channel, and reflect it back to all who may be waiting.
  1232. *
  1233. * An OPEN is not allowed until XPC_C_DISCONNECTING is cleared by
  1234. * xpc_process_disconnect(), and if set, XPC_C_WDISCONNECT is cleared by
  1235. * xpc_disconnect_wait().
  1236. *
  1237. * THE CHANNEL IS TO BE LOCKED BY THE CALLER AND WILL REMAIN LOCKED UPON RETURN.
  1238. */
  1239. void
  1240. xpc_disconnect_channel(const int line, struct xpc_channel *ch,
  1241. enum xp_retval reason, unsigned long *irq_flags)
  1242. {
  1243. u32 channel_was_connected = (ch->flags & XPC_C_CONNECTED);
  1244. DBUG_ON(!spin_is_locked(&ch->lock));
  1245. if (ch->flags & (XPC_C_DISCONNECTING | XPC_C_DISCONNECTED))
  1246. return;
  1247. DBUG_ON(!(ch->flags & (XPC_C_CONNECTING | XPC_C_CONNECTED)));
  1248. dev_dbg(xpc_chan, "reason=%d, line=%d, partid=%d, channel=%d\n",
  1249. reason, line, ch->partid, ch->number);
  1250. XPC_SET_REASON(ch, reason, line);
  1251. ch->flags |= (XPC_C_CLOSEREQUEST | XPC_C_DISCONNECTING);
  1252. /* some of these may not have been set */
  1253. ch->flags &= ~(XPC_C_OPENREQUEST | XPC_C_OPENREPLY |
  1254. XPC_C_ROPENREQUEST | XPC_C_ROPENREPLY |
  1255. XPC_C_CONNECTING | XPC_C_CONNECTED);
  1256. xpc_IPI_send_closerequest(ch, irq_flags);
  1257. if (channel_was_connected)
  1258. ch->flags |= XPC_C_WASCONNECTED;
  1259. spin_unlock_irqrestore(&ch->lock, *irq_flags);
  1260. /* wake all idle kthreads so they can exit */
  1261. if (atomic_read(&ch->kthreads_idle) > 0) {
  1262. wake_up_all(&ch->idle_wq);
  1263. } else if ((ch->flags & XPC_C_CONNECTEDCALLOUT_MADE) &&
  1264. !(ch->flags & XPC_C_DISCONNECTINGCALLOUT)) {
  1265. /* start a kthread that will do the xpDisconnecting callout */
  1266. xpc_create_kthreads(ch, 1, 1);
  1267. }
  1268. /* wake those waiting to allocate an entry from the local msg queue */
  1269. if (atomic_read(&ch->n_on_msg_allocate_wq) > 0)
  1270. wake_up(&ch->msg_allocate_wq);
  1271. spin_lock_irqsave(&ch->lock, *irq_flags);
  1272. }
  1273. void
  1274. xpc_disconnect_callout(struct xpc_channel *ch, enum xp_retval reason)
  1275. {
  1276. /*
  1277. * Let the channel's registerer know that the channel is being
  1278. * disconnected. We don't want to do this if the registerer was never
  1279. * informed of a connection being made.
  1280. */
  1281. if (ch->func != NULL) {
  1282. dev_dbg(xpc_chan, "ch->func() called, reason=%d, partid=%d, "
  1283. "channel=%d\n", reason, ch->partid, ch->number);
  1284. ch->func(reason, ch->partid, ch->number, NULL, ch->key);
  1285. dev_dbg(xpc_chan, "ch->func() returned, reason=%d, partid=%d, "
  1286. "channel=%d\n", reason, ch->partid, ch->number);
  1287. }
  1288. }
  1289. /*
  1290. * Wait for a message entry to become available for the specified channel,
  1291. * but don't wait any longer than 1 jiffy.
  1292. */
  1293. static enum xp_retval
  1294. xpc_allocate_msg_wait(struct xpc_channel *ch)
  1295. {
  1296. enum xp_retval ret;
  1297. if (ch->flags & XPC_C_DISCONNECTING) {
  1298. DBUG_ON(ch->reason == xpInterrupted);
  1299. return ch->reason;
  1300. }
  1301. atomic_inc(&ch->n_on_msg_allocate_wq);
  1302. ret = interruptible_sleep_on_timeout(&ch->msg_allocate_wq, 1);
  1303. atomic_dec(&ch->n_on_msg_allocate_wq);
  1304. if (ch->flags & XPC_C_DISCONNECTING) {
  1305. ret = ch->reason;
  1306. DBUG_ON(ch->reason == xpInterrupted);
  1307. } else if (ret == 0) {
  1308. ret = xpTimeout;
  1309. } else {
  1310. ret = xpInterrupted;
  1311. }
  1312. return ret;
  1313. }
  1314. /*
  1315. * Allocate an entry for a message from the message queue associated with the
  1316. * specified channel.
  1317. */
  1318. static enum xp_retval
  1319. xpc_allocate_msg(struct xpc_channel *ch, u32 flags,
  1320. struct xpc_msg **address_of_msg)
  1321. {
  1322. struct xpc_msg *msg;
  1323. enum xp_retval ret;
  1324. s64 put;
  1325. /* this reference will be dropped in xpc_send_msg() */
  1326. xpc_msgqueue_ref(ch);
  1327. if (ch->flags & XPC_C_DISCONNECTING) {
  1328. xpc_msgqueue_deref(ch);
  1329. return ch->reason;
  1330. }
  1331. if (!(ch->flags & XPC_C_CONNECTED)) {
  1332. xpc_msgqueue_deref(ch);
  1333. return xpNotConnected;
  1334. }
  1335. /*
  1336. * Get the next available message entry from the local message queue.
  1337. * If none are available, we'll make sure that we grab the latest
  1338. * GP values.
  1339. */
  1340. ret = xpTimeout;
  1341. while (1) {
  1342. put = ch->w_local_GP.put;
  1343. rmb(); /* guarantee that .put loads before .get */
  1344. if (put - ch->w_remote_GP.get < ch->local_nentries) {
  1345. /* There are available message entries. We need to try
  1346. * to secure one for ourselves. We'll do this by trying
  1347. * to increment w_local_GP.put as long as someone else
  1348. * doesn't beat us to it. If they do, we'll have to
  1349. * try again.
  1350. */
  1351. if (cmpxchg(&ch->w_local_GP.put, put, put + 1) == put) {
  1352. /* we got the entry referenced by put */
  1353. break;
  1354. }
  1355. continue; /* try again */
  1356. }
  1357. /*
  1358. * There aren't any available msg entries at this time.
  1359. *
  1360. * In waiting for a message entry to become available,
  1361. * we set a timeout in case the other side is not
  1362. * sending completion IPIs. This lets us fake an IPI
  1363. * that will cause the IPI handler to fetch the latest
  1364. * GP values as if an IPI was sent by the other side.
  1365. */
  1366. if (ret == xpTimeout)
  1367. xpc_IPI_send_local_msgrequest(ch);
  1368. if (flags & XPC_NOWAIT) {
  1369. xpc_msgqueue_deref(ch);
  1370. return xpNoWait;
  1371. }
  1372. ret = xpc_allocate_msg_wait(ch);
  1373. if (ret != xpInterrupted && ret != xpTimeout) {
  1374. xpc_msgqueue_deref(ch);
  1375. return ret;
  1376. }
  1377. }
  1378. /* get the message's address and initialize it */
  1379. msg = (struct xpc_msg *)((u64)ch->local_msgqueue +
  1380. (put % ch->local_nentries) * ch->msg_size);
  1381. DBUG_ON(msg->flags != 0);
  1382. msg->number = put;
  1383. dev_dbg(xpc_chan, "w_local_GP.put changed to %ld; msg=0x%p, "
  1384. "msg_number=%ld, partid=%d, channel=%d\n", put + 1,
  1385. (void *)msg, msg->number, ch->partid, ch->number);
  1386. *address_of_msg = msg;
  1387. return xpSuccess;
  1388. }
  1389. /*
  1390. * Allocate an entry for a message from the message queue associated with the
  1391. * specified channel. NOTE that this routine can sleep waiting for a message
  1392. * entry to become available. To not sleep, pass in the XPC_NOWAIT flag.
  1393. *
  1394. * Arguments:
  1395. *
  1396. * partid - ID of partition to which the channel is connected.
  1397. * ch_number - channel #.
  1398. * flags - see xpc.h for valid flags.
  1399. * payload - address of the allocated payload area pointer (filled in on
  1400. * return) in which the user-defined message is constructed.
  1401. */
  1402. enum xp_retval
  1403. xpc_initiate_allocate(short partid, int ch_number, u32 flags, void **payload)
  1404. {
  1405. struct xpc_partition *part = &xpc_partitions[partid];
  1406. enum xp_retval ret = xpUnknownReason;
  1407. struct xpc_msg *msg = NULL;
  1408. DBUG_ON(partid <= 0 || partid >= XP_MAX_PARTITIONS);
  1409. DBUG_ON(ch_number < 0 || ch_number >= part->nchannels);
  1410. *payload = NULL;
  1411. if (xpc_part_ref(part)) {
  1412. ret = xpc_allocate_msg(&part->channels[ch_number], flags, &msg);
  1413. xpc_part_deref(part);
  1414. if (msg != NULL)
  1415. *payload = &msg->payload;
  1416. }
  1417. return ret;
  1418. }
  1419. /*
  1420. * Now we actually send the messages that are ready to be sent by advancing
  1421. * the local message queue's Put value and then send an IPI to the recipient
  1422. * partition.
  1423. */
  1424. static void
  1425. xpc_send_msgs(struct xpc_channel *ch, s64 initial_put)
  1426. {
  1427. struct xpc_msg *msg;
  1428. s64 put = initial_put + 1;
  1429. int send_IPI = 0;
  1430. while (1) {
  1431. while (1) {
  1432. if (put == ch->w_local_GP.put)
  1433. break;
  1434. msg = (struct xpc_msg *)((u64)ch->local_msgqueue +
  1435. (put % ch->local_nentries) *
  1436. ch->msg_size);
  1437. if (!(msg->flags & XPC_M_READY))
  1438. break;
  1439. put++;
  1440. }
  1441. if (put == initial_put) {
  1442. /* nothing's changed */
  1443. break;
  1444. }
  1445. if (cmpxchg_rel(&ch->local_GP->put, initial_put, put) !=
  1446. initial_put) {
  1447. /* someone else beat us to it */
  1448. DBUG_ON(ch->local_GP->put < initial_put);
  1449. break;
  1450. }
  1451. /* we just set the new value of local_GP->put */
  1452. dev_dbg(xpc_chan, "local_GP->put changed to %ld, partid=%d, "
  1453. "channel=%d\n", put, ch->partid, ch->number);
  1454. send_IPI = 1;
  1455. /*
  1456. * We need to ensure that the message referenced by
  1457. * local_GP->put is not XPC_M_READY or that local_GP->put
  1458. * equals w_local_GP.put, so we'll go have a look.
  1459. */
  1460. initial_put = put;
  1461. }
  1462. if (send_IPI)
  1463. xpc_IPI_send_msgrequest(ch);
  1464. }
  1465. /*
  1466. * Common code that does the actual sending of the message by advancing the
  1467. * local message queue's Put value and sends an IPI to the partition the
  1468. * message is being sent to.
  1469. */
  1470. static enum xp_retval
  1471. xpc_send_msg(struct xpc_channel *ch, struct xpc_msg *msg, u8 notify_type,
  1472. xpc_notify_func func, void *key)
  1473. {
  1474. enum xp_retval ret = xpSuccess;
  1475. struct xpc_notify *notify = notify;
  1476. s64 put, msg_number = msg->number;
  1477. DBUG_ON(notify_type == XPC_N_CALL && func == NULL);
  1478. DBUG_ON((((u64)msg - (u64)ch->local_msgqueue) / ch->msg_size) !=
  1479. msg_number % ch->local_nentries);
  1480. DBUG_ON(msg->flags & XPC_M_READY);
  1481. if (ch->flags & XPC_C_DISCONNECTING) {
  1482. /* drop the reference grabbed in xpc_allocate_msg() */
  1483. xpc_msgqueue_deref(ch);
  1484. return ch->reason;
  1485. }
  1486. if (notify_type != 0) {
  1487. /*
  1488. * Tell the remote side to send an ACK interrupt when the
  1489. * message has been delivered.
  1490. */
  1491. msg->flags |= XPC_M_INTERRUPT;
  1492. atomic_inc(&ch->n_to_notify);
  1493. notify = &ch->notify_queue[msg_number % ch->local_nentries];
  1494. notify->func = func;
  1495. notify->key = key;
  1496. notify->type = notify_type;
  1497. /* >>> is a mb() needed here? */
  1498. if (ch->flags & XPC_C_DISCONNECTING) {
  1499. /*
  1500. * An error occurred between our last error check and
  1501. * this one. We will try to clear the type field from
  1502. * the notify entry. If we succeed then
  1503. * xpc_disconnect_channel() didn't already process
  1504. * the notify entry.
  1505. */
  1506. if (cmpxchg(&notify->type, notify_type, 0) ==
  1507. notify_type) {
  1508. atomic_dec(&ch->n_to_notify);
  1509. ret = ch->reason;
  1510. }
  1511. /* drop the reference grabbed in xpc_allocate_msg() */
  1512. xpc_msgqueue_deref(ch);
  1513. return ret;
  1514. }
  1515. }
  1516. msg->flags |= XPC_M_READY;
  1517. /*
  1518. * The preceding store of msg->flags must occur before the following
  1519. * load of ch->local_GP->put.
  1520. */
  1521. mb();
  1522. /* see if the message is next in line to be sent, if so send it */
  1523. put = ch->local_GP->put;
  1524. if (put == msg_number)
  1525. xpc_send_msgs(ch, put);
  1526. /* drop the reference grabbed in xpc_allocate_msg() */
  1527. xpc_msgqueue_deref(ch);
  1528. return ret;
  1529. }
  1530. /*
  1531. * Send a message previously allocated using xpc_initiate_allocate() on the
  1532. * specified channel connected to the specified partition.
  1533. *
  1534. * This routine will not wait for the message to be received, nor will
  1535. * notification be given when it does happen. Once this routine has returned
  1536. * the message entry allocated via xpc_initiate_allocate() is no longer
  1537. * accessable to the caller.
  1538. *
  1539. * This routine, although called by users, does not call xpc_part_ref() to
  1540. * ensure that the partition infrastructure is in place. It relies on the
  1541. * fact that we called xpc_msgqueue_ref() in xpc_allocate_msg().
  1542. *
  1543. * Arguments:
  1544. *
  1545. * partid - ID of partition to which the channel is connected.
  1546. * ch_number - channel # to send message on.
  1547. * payload - pointer to the payload area allocated via
  1548. * xpc_initiate_allocate().
  1549. */
  1550. enum xp_retval
  1551. xpc_initiate_send(short partid, int ch_number, void *payload)
  1552. {
  1553. struct xpc_partition *part = &xpc_partitions[partid];
  1554. struct xpc_msg *msg = XPC_MSG_ADDRESS(payload);
  1555. enum xp_retval ret;
  1556. dev_dbg(xpc_chan, "msg=0x%p, partid=%d, channel=%d\n", (void *)msg,
  1557. partid, ch_number);
  1558. DBUG_ON(partid <= 0 || partid >= XP_MAX_PARTITIONS);
  1559. DBUG_ON(ch_number < 0 || ch_number >= part->nchannels);
  1560. DBUG_ON(msg == NULL);
  1561. ret = xpc_send_msg(&part->channels[ch_number], msg, 0, NULL, NULL);
  1562. return ret;
  1563. }
  1564. /*
  1565. * Send a message previously allocated using xpc_initiate_allocate on the
  1566. * specified channel connected to the specified partition.
  1567. *
  1568. * This routine will not wait for the message to be sent. Once this routine
  1569. * has returned the message entry allocated via xpc_initiate_allocate() is no
  1570. * longer accessable to the caller.
  1571. *
  1572. * Once the remote end of the channel has received the message, the function
  1573. * passed as an argument to xpc_initiate_send_notify() will be called. This
  1574. * allows the sender to free up or re-use any buffers referenced by the
  1575. * message, but does NOT mean the message has been processed at the remote
  1576. * end by a receiver.
  1577. *
  1578. * If this routine returns an error, the caller's function will NOT be called.
  1579. *
  1580. * This routine, although called by users, does not call xpc_part_ref() to
  1581. * ensure that the partition infrastructure is in place. It relies on the
  1582. * fact that we called xpc_msgqueue_ref() in xpc_allocate_msg().
  1583. *
  1584. * Arguments:
  1585. *
  1586. * partid - ID of partition to which the channel is connected.
  1587. * ch_number - channel # to send message on.
  1588. * payload - pointer to the payload area allocated via
  1589. * xpc_initiate_allocate().
  1590. * func - function to call with asynchronous notification of message
  1591. * receipt. THIS FUNCTION MUST BE NON-BLOCKING.
  1592. * key - user-defined key to be passed to the function when it's called.
  1593. */
  1594. enum xp_retval
  1595. xpc_initiate_send_notify(short partid, int ch_number, void *payload,
  1596. xpc_notify_func func, void *key)
  1597. {
  1598. struct xpc_partition *part = &xpc_partitions[partid];
  1599. struct xpc_msg *msg = XPC_MSG_ADDRESS(payload);
  1600. enum xp_retval ret;
  1601. dev_dbg(xpc_chan, "msg=0x%p, partid=%d, channel=%d\n", (void *)msg,
  1602. partid, ch_number);
  1603. DBUG_ON(partid <= 0 || partid >= XP_MAX_PARTITIONS);
  1604. DBUG_ON(ch_number < 0 || ch_number >= part->nchannels);
  1605. DBUG_ON(msg == NULL);
  1606. DBUG_ON(func == NULL);
  1607. ret = xpc_send_msg(&part->channels[ch_number], msg, XPC_N_CALL,
  1608. func, key);
  1609. return ret;
  1610. }
  1611. static struct xpc_msg *
  1612. xpc_pull_remote_msg(struct xpc_channel *ch, s64 get)
  1613. {
  1614. struct xpc_partition *part = &xpc_partitions[ch->partid];
  1615. struct xpc_msg *remote_msg, *msg;
  1616. u32 msg_index, nmsgs;
  1617. u64 msg_offset;
  1618. enum xp_retval ret;
  1619. if (mutex_lock_interruptible(&ch->msg_to_pull_mutex) != 0) {
  1620. /* we were interrupted by a signal */
  1621. return NULL;
  1622. }
  1623. while (get >= ch->next_msg_to_pull) {
  1624. /* pull as many messages as are ready and able to be pulled */
  1625. msg_index = ch->next_msg_to_pull % ch->remote_nentries;
  1626. DBUG_ON(ch->next_msg_to_pull >= ch->w_remote_GP.put);
  1627. nmsgs = ch->w_remote_GP.put - ch->next_msg_to_pull;
  1628. if (msg_index + nmsgs > ch->remote_nentries) {
  1629. /* ignore the ones that wrap the msg queue for now */
  1630. nmsgs = ch->remote_nentries - msg_index;
  1631. }
  1632. msg_offset = msg_index * ch->msg_size;
  1633. msg = (struct xpc_msg *)((u64)ch->remote_msgqueue + msg_offset);
  1634. remote_msg = (struct xpc_msg *)(ch->remote_msgqueue_pa +
  1635. msg_offset);
  1636. ret = xpc_pull_remote_cachelines(part, msg, remote_msg,
  1637. nmsgs * ch->msg_size);
  1638. if (ret != xpSuccess) {
  1639. dev_dbg(xpc_chan, "failed to pull %d msgs starting with"
  1640. " msg %ld from partition %d, channel=%d, "
  1641. "ret=%d\n", nmsgs, ch->next_msg_to_pull,
  1642. ch->partid, ch->number, ret);
  1643. XPC_DEACTIVATE_PARTITION(part, ret);
  1644. mutex_unlock(&ch->msg_to_pull_mutex);
  1645. return NULL;
  1646. }
  1647. ch->next_msg_to_pull += nmsgs;
  1648. }
  1649. mutex_unlock(&ch->msg_to_pull_mutex);
  1650. /* return the message we were looking for */
  1651. msg_offset = (get % ch->remote_nentries) * ch->msg_size;
  1652. msg = (struct xpc_msg *)((u64)ch->remote_msgqueue + msg_offset);
  1653. return msg;
  1654. }
  1655. /*
  1656. * Get a message to be delivered.
  1657. */
  1658. static struct xpc_msg *
  1659. xpc_get_deliverable_msg(struct xpc_channel *ch)
  1660. {
  1661. struct xpc_msg *msg = NULL;
  1662. s64 get;
  1663. do {
  1664. if (ch->flags & XPC_C_DISCONNECTING)
  1665. break;
  1666. get = ch->w_local_GP.get;
  1667. rmb(); /* guarantee that .get loads before .put */
  1668. if (get == ch->w_remote_GP.put)
  1669. break;
  1670. /* There are messages waiting to be pulled and delivered.
  1671. * We need to try to secure one for ourselves. We'll do this
  1672. * by trying to increment w_local_GP.get and hope that no one
  1673. * else beats us to it. If they do, we'll we'll simply have
  1674. * to try again for the next one.
  1675. */
  1676. if (cmpxchg(&ch->w_local_GP.get, get, get + 1) == get) {
  1677. /* we got the entry referenced by get */
  1678. dev_dbg(xpc_chan, "w_local_GP.get changed to %ld, "
  1679. "partid=%d, channel=%d\n", get + 1,
  1680. ch->partid, ch->number);
  1681. /* pull the message from the remote partition */
  1682. msg = xpc_pull_remote_msg(ch, get);
  1683. DBUG_ON(msg != NULL && msg->number != get);
  1684. DBUG_ON(msg != NULL && (msg->flags & XPC_M_DONE));
  1685. DBUG_ON(msg != NULL && !(msg->flags & XPC_M_READY));
  1686. break;
  1687. }
  1688. } while (1);
  1689. return msg;
  1690. }
  1691. /*
  1692. * Deliver a message to its intended recipient.
  1693. */
  1694. void
  1695. xpc_deliver_msg(struct xpc_channel *ch)
  1696. {
  1697. struct xpc_msg *msg;
  1698. msg = xpc_get_deliverable_msg(ch);
  1699. if (msg != NULL) {
  1700. /*
  1701. * This ref is taken to protect the payload itself from being
  1702. * freed before the user is finished with it, which the user
  1703. * indicates by calling xpc_initiate_received().
  1704. */
  1705. xpc_msgqueue_ref(ch);
  1706. atomic_inc(&ch->kthreads_active);
  1707. if (ch->func != NULL) {
  1708. dev_dbg(xpc_chan, "ch->func() called, msg=0x%p, "
  1709. "msg_number=%ld, partid=%d, channel=%d\n",
  1710. (void *)msg, msg->number, ch->partid,
  1711. ch->number);
  1712. /* deliver the message to its intended recipient */
  1713. ch->func(xpMsgReceived, ch->partid, ch->number,
  1714. &msg->payload, ch->key);
  1715. dev_dbg(xpc_chan, "ch->func() returned, msg=0x%p, "
  1716. "msg_number=%ld, partid=%d, channel=%d\n",
  1717. (void *)msg, msg->number, ch->partid,
  1718. ch->number);
  1719. }
  1720. atomic_dec(&ch->kthreads_active);
  1721. }
  1722. }
  1723. /*
  1724. * Now we actually acknowledge the messages that have been delivered and ack'd
  1725. * by advancing the cached remote message queue's Get value and if requested
  1726. * send an IPI to the message sender's partition.
  1727. */
  1728. static void
  1729. xpc_acknowledge_msgs(struct xpc_channel *ch, s64 initial_get, u8 msg_flags)
  1730. {
  1731. struct xpc_msg *msg;
  1732. s64 get = initial_get + 1;
  1733. int send_IPI = 0;
  1734. while (1) {
  1735. while (1) {
  1736. if (get == ch->w_local_GP.get)
  1737. break;
  1738. msg = (struct xpc_msg *)((u64)ch->remote_msgqueue +
  1739. (get % ch->remote_nentries) *
  1740. ch->msg_size);
  1741. if (!(msg->flags & XPC_M_DONE))
  1742. break;
  1743. msg_flags |= msg->flags;
  1744. get++;
  1745. }
  1746. if (get == initial_get) {
  1747. /* nothing's changed */
  1748. break;
  1749. }
  1750. if (cmpxchg_rel(&ch->local_GP->get, initial_get, get) !=
  1751. initial_get) {
  1752. /* someone else beat us to it */
  1753. DBUG_ON(ch->local_GP->get <= initial_get);
  1754. break;
  1755. }
  1756. /* we just set the new value of local_GP->get */
  1757. dev_dbg(xpc_chan, "local_GP->get changed to %ld, partid=%d, "
  1758. "channel=%d\n", get, ch->partid, ch->number);
  1759. send_IPI = (msg_flags & XPC_M_INTERRUPT);
  1760. /*
  1761. * We need to ensure that the message referenced by
  1762. * local_GP->get is not XPC_M_DONE or that local_GP->get
  1763. * equals w_local_GP.get, so we'll go have a look.
  1764. */
  1765. initial_get = get;
  1766. }
  1767. if (send_IPI)
  1768. xpc_IPI_send_msgrequest(ch);
  1769. }
  1770. /*
  1771. * Acknowledge receipt of a delivered message.
  1772. *
  1773. * If a message has XPC_M_INTERRUPT set, send an interrupt to the partition
  1774. * that sent the message.
  1775. *
  1776. * This function, although called by users, does not call xpc_part_ref() to
  1777. * ensure that the partition infrastructure is in place. It relies on the
  1778. * fact that we called xpc_msgqueue_ref() in xpc_deliver_msg().
  1779. *
  1780. * Arguments:
  1781. *
  1782. * partid - ID of partition to which the channel is connected.
  1783. * ch_number - channel # message received on.
  1784. * payload - pointer to the payload area allocated via
  1785. * xpc_initiate_allocate().
  1786. */
  1787. void
  1788. xpc_initiate_received(short partid, int ch_number, void *payload)
  1789. {
  1790. struct xpc_partition *part = &xpc_partitions[partid];
  1791. struct xpc_channel *ch;
  1792. struct xpc_msg *msg = XPC_MSG_ADDRESS(payload);
  1793. s64 get, msg_number = msg->number;
  1794. DBUG_ON(partid <= 0 || partid >= XP_MAX_PARTITIONS);
  1795. DBUG_ON(ch_number < 0 || ch_number >= part->nchannels);
  1796. ch = &part->channels[ch_number];
  1797. dev_dbg(xpc_chan, "msg=0x%p, msg_number=%ld, partid=%d, channel=%d\n",
  1798. (void *)msg, msg_number, ch->partid, ch->number);
  1799. DBUG_ON((((u64)msg - (u64)ch->remote_msgqueue) / ch->msg_size) !=
  1800. msg_number % ch->remote_nentries);
  1801. DBUG_ON(msg->flags & XPC_M_DONE);
  1802. msg->flags |= XPC_M_DONE;
  1803. /*
  1804. * The preceding store of msg->flags must occur before the following
  1805. * load of ch->local_GP->get.
  1806. */
  1807. mb();
  1808. /*
  1809. * See if this message is next in line to be acknowledged as having
  1810. * been delivered.
  1811. */
  1812. get = ch->local_GP->get;
  1813. if (get == msg_number)
  1814. xpc_acknowledge_msgs(ch, get, msg->flags);
  1815. /* the call to xpc_msgqueue_ref() was done by xpc_deliver_msg() */
  1816. xpc_msgqueue_deref(ch);
  1817. }