|
@@ -38,23 +38,22 @@
|
|
|
#include "dbg.h"
|
|
|
#include "subscr.h"
|
|
|
#include "name_table.h"
|
|
|
+#include "port.h"
|
|
|
#include "ref.h"
|
|
|
|
|
|
/**
|
|
|
* struct subscriber - TIPC network topology subscriber
|
|
|
- * @ref: object reference to subscriber object itself
|
|
|
- * @lock: pointer to spinlock controlling access to subscriber object
|
|
|
+ * @port_ref: object reference to server port connecting to subscriber
|
|
|
+ * @lock: pointer to spinlock controlling access to subscriber's server port
|
|
|
* @subscriber_list: adjacent subscribers in top. server's list of subscribers
|
|
|
* @subscription_list: list of subscription objects for this subscriber
|
|
|
- * @port_ref: object reference to port used to communicate with subscriber
|
|
|
*/
|
|
|
|
|
|
struct subscriber {
|
|
|
- u32 ref;
|
|
|
+ u32 port_ref;
|
|
|
spinlock_t *lock;
|
|
|
struct list_head subscriber_list;
|
|
|
struct list_head subscription_list;
|
|
|
- u32 port_ref;
|
|
|
};
|
|
|
|
|
|
/**
|
|
@@ -91,6 +90,9 @@ static u32 htohl(u32 in, int swap)
|
|
|
|
|
|
/**
|
|
|
* subscr_send_event - send a message containing a tipc_event to the subscriber
|
|
|
+ *
|
|
|
+ * Note: Must not hold subscriber's server port lock, since tipc_send() will
|
|
|
+ * try to take the lock if the message is rejected and returned!
|
|
|
*/
|
|
|
|
|
|
static void subscr_send_event(struct subscription *sub,
|
|
@@ -110,7 +112,7 @@ static void subscr_send_event(struct subscription *sub,
|
|
|
sub->evt.found_upper = htohl(found_upper, sub->swap);
|
|
|
sub->evt.port.ref = htohl(port_ref, sub->swap);
|
|
|
sub->evt.port.node = htohl(node, sub->swap);
|
|
|
- tipc_send(sub->owner->port_ref, 1, &msg_sect);
|
|
|
+ tipc_send(sub->server_ref, 1, &msg_sect);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -163,20 +165,18 @@ void tipc_subscr_report_overlap(struct subscription *sub,
|
|
|
|
|
|
static void subscr_timeout(struct subscription *sub)
|
|
|
{
|
|
|
- struct subscriber *subscriber;
|
|
|
- u32 subscriber_ref;
|
|
|
+ struct port *server_port;
|
|
|
|
|
|
- /* Validate subscriber reference (in case subscriber is terminating) */
|
|
|
+ /* Validate server port reference (in case subscriber is terminating) */
|
|
|
|
|
|
- subscriber_ref = sub->owner->ref;
|
|
|
- subscriber = (struct subscriber *)tipc_ref_lock(subscriber_ref);
|
|
|
- if (subscriber == NULL)
|
|
|
+ server_port = tipc_port_lock(sub->server_ref);
|
|
|
+ if (server_port == NULL)
|
|
|
return;
|
|
|
|
|
|
/* Validate timeout (in case subscription is being cancelled) */
|
|
|
|
|
|
if (sub->timeout == TIPC_WAIT_FOREVER) {
|
|
|
- tipc_ref_unlock(subscriber_ref);
|
|
|
+ tipc_port_unlock(server_port);
|
|
|
return;
|
|
|
}
|
|
|
|
|
@@ -184,19 +184,21 @@ static void subscr_timeout(struct subscription *sub)
|
|
|
|
|
|
tipc_nametbl_unsubscribe(sub);
|
|
|
|
|
|
- /* Notify subscriber of timeout, then unlink subscription */
|
|
|
+ /* Unlink subscription from subscriber */
|
|
|
|
|
|
- subscr_send_event(sub,
|
|
|
- sub->evt.s.seq.lower,
|
|
|
- sub->evt.s.seq.upper,
|
|
|
- TIPC_SUBSCR_TIMEOUT,
|
|
|
- 0,
|
|
|
- 0);
|
|
|
list_del(&sub->subscription_list);
|
|
|
|
|
|
+ /* Release subscriber's server port */
|
|
|
+
|
|
|
+ tipc_port_unlock(server_port);
|
|
|
+
|
|
|
+ /* Notify subscriber of timeout */
|
|
|
+
|
|
|
+ subscr_send_event(sub, sub->evt.s.seq.lower, sub->evt.s.seq.upper,
|
|
|
+ TIPC_SUBSCR_TIMEOUT, 0, 0);
|
|
|
+
|
|
|
/* Now destroy subscription */
|
|
|
|
|
|
- tipc_ref_unlock(subscriber_ref);
|
|
|
k_term_timer(&sub->timer);
|
|
|
kfree(sub);
|
|
|
atomic_dec(&topsrv.subscription_count);
|
|
@@ -205,7 +207,7 @@ static void subscr_timeout(struct subscription *sub)
|
|
|
/**
|
|
|
* subscr_del - delete a subscription within a subscription list
|
|
|
*
|
|
|
- * Called with subscriber locked.
|
|
|
+ * Called with subscriber port locked.
|
|
|
*/
|
|
|
|
|
|
static void subscr_del(struct subscription *sub)
|
|
@@ -219,7 +221,7 @@ static void subscr_del(struct subscription *sub)
|
|
|
/**
|
|
|
* subscr_terminate - terminate communication with a subscriber
|
|
|
*
|
|
|
- * Called with subscriber locked. Routine must temporarily release this lock
|
|
|
+ * Called with subscriber port locked. Routine must temporarily release lock
|
|
|
* to enable subscription timeout routine(s) to finish without deadlocking;
|
|
|
* the lock is then reclaimed to allow caller to release it upon return.
|
|
|
* (This should work even in the unlikely event some other thread creates
|
|
@@ -229,14 +231,21 @@ static void subscr_del(struct subscription *sub)
|
|
|
|
|
|
static void subscr_terminate(struct subscriber *subscriber)
|
|
|
{
|
|
|
+ u32 port_ref;
|
|
|
struct subscription *sub;
|
|
|
struct subscription *sub_temp;
|
|
|
|
|
|
/* Invalidate subscriber reference */
|
|
|
|
|
|
- tipc_ref_discard(subscriber->ref);
|
|
|
+ port_ref = subscriber->port_ref;
|
|
|
+ subscriber->port_ref = 0;
|
|
|
spin_unlock_bh(subscriber->lock);
|
|
|
|
|
|
+ /* Sever connection to subscriber */
|
|
|
+
|
|
|
+ tipc_shutdown(port_ref);
|
|
|
+ tipc_deleteport(port_ref);
|
|
|
+
|
|
|
/* Destroy any existing subscriptions for subscriber */
|
|
|
|
|
|
list_for_each_entry_safe(sub, sub_temp, &subscriber->subscription_list,
|
|
@@ -250,27 +259,25 @@ static void subscr_terminate(struct subscriber *subscriber)
|
|
|
subscr_del(sub);
|
|
|
}
|
|
|
|
|
|
- /* Sever connection to subscriber */
|
|
|
-
|
|
|
- tipc_shutdown(subscriber->port_ref);
|
|
|
- tipc_deleteport(subscriber->port_ref);
|
|
|
-
|
|
|
/* Remove subscriber from topology server's subscriber list */
|
|
|
|
|
|
spin_lock_bh(&topsrv.lock);
|
|
|
list_del(&subscriber->subscriber_list);
|
|
|
spin_unlock_bh(&topsrv.lock);
|
|
|
|
|
|
- /* Now destroy subscriber */
|
|
|
+ /* Reclaim subscriber lock */
|
|
|
|
|
|
spin_lock_bh(subscriber->lock);
|
|
|
+
|
|
|
+ /* Now destroy subscriber */
|
|
|
+
|
|
|
kfree(subscriber);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* subscr_cancel - handle subscription cancellation request
|
|
|
*
|
|
|
- * Called with subscriber locked. Routine must temporarily release this lock
|
|
|
+ * Called with subscriber port locked. Routine must temporarily release lock
|
|
|
* to enable the subscription timeout routine to finish without deadlocking;
|
|
|
* the lock is then reclaimed to allow caller to release it upon return.
|
|
|
*
|
|
@@ -313,11 +320,11 @@ static void subscr_cancel(struct tipc_subscr *s,
|
|
|
/**
|
|
|
* subscr_subscribe - create subscription for subscriber
|
|
|
*
|
|
|
- * Called with subscriber locked
|
|
|
+ * Called with subscriber port locked.
|
|
|
*/
|
|
|
|
|
|
-static void subscr_subscribe(struct tipc_subscr *s,
|
|
|
- struct subscriber *subscriber)
|
|
|
+static struct subscription *subscr_subscribe(struct tipc_subscr *s,
|
|
|
+ struct subscriber *subscriber)
|
|
|
{
|
|
|
struct subscription *sub;
|
|
|
int swap;
|
|
@@ -331,7 +338,7 @@ static void subscr_subscribe(struct tipc_subscr *s,
|
|
|
if (s->filter & htohl(TIPC_SUB_CANCEL, swap)) {
|
|
|
s->filter &= ~htohl(TIPC_SUB_CANCEL, swap);
|
|
|
subscr_cancel(s, subscriber);
|
|
|
- return;
|
|
|
+ return NULL;
|
|
|
}
|
|
|
|
|
|
/* Refuse subscription if global limit exceeded */
|
|
@@ -340,16 +347,16 @@ static void subscr_subscribe(struct tipc_subscr *s,
|
|
|
warn("Subscription rejected, subscription limit reached (%u)\n",
|
|
|
tipc_max_subscriptions);
|
|
|
subscr_terminate(subscriber);
|
|
|
- return;
|
|
|
+ return NULL;
|
|
|
}
|
|
|
|
|
|
/* Allocate subscription object */
|
|
|
|
|
|
- sub = kzalloc(sizeof(*sub), GFP_ATOMIC);
|
|
|
+ sub = kmalloc(sizeof(*sub), GFP_ATOMIC);
|
|
|
if (!sub) {
|
|
|
warn("Subscription rejected, no memory\n");
|
|
|
subscr_terminate(subscriber);
|
|
|
- return;
|
|
|
+ return NULL;
|
|
|
}
|
|
|
|
|
|
/* Initialize subscription object */
|
|
@@ -365,40 +372,41 @@ static void subscr_subscribe(struct tipc_subscr *s,
|
|
|
warn("Subscription rejected, illegal request\n");
|
|
|
kfree(sub);
|
|
|
subscr_terminate(subscriber);
|
|
|
- return;
|
|
|
+ return NULL;
|
|
|
}
|
|
|
sub->event_cb = subscr_send_event;
|
|
|
- memcpy(&sub->evt.s, s, sizeof(struct tipc_subscr));
|
|
|
- INIT_LIST_HEAD(&sub->subscription_list);
|
|
|
INIT_LIST_HEAD(&sub->nameseq_list);
|
|
|
list_add(&sub->subscription_list, &subscriber->subscription_list);
|
|
|
+ sub->server_ref = subscriber->port_ref;
|
|
|
sub->swap = swap;
|
|
|
+ memcpy(&sub->evt.s, s, sizeof(struct tipc_subscr));
|
|
|
atomic_inc(&topsrv.subscription_count);
|
|
|
if (sub->timeout != TIPC_WAIT_FOREVER) {
|
|
|
k_init_timer(&sub->timer,
|
|
|
(Handler)subscr_timeout, (unsigned long)sub);
|
|
|
k_start_timer(&sub->timer, sub->timeout);
|
|
|
}
|
|
|
- sub->owner = subscriber;
|
|
|
- tipc_nametbl_subscribe(sub);
|
|
|
+
|
|
|
+ return sub;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* subscr_conn_shutdown_event - handle termination request from subscriber
|
|
|
+ *
|
|
|
+ * Called with subscriber's server port unlocked.
|
|
|
*/
|
|
|
|
|
|
static void subscr_conn_shutdown_event(void *usr_handle,
|
|
|
- u32 portref,
|
|
|
+ u32 port_ref,
|
|
|
struct sk_buff **buf,
|
|
|
unsigned char const *data,
|
|
|
unsigned int size,
|
|
|
int reason)
|
|
|
{
|
|
|
- struct subscriber *subscriber;
|
|
|
+ struct subscriber *subscriber = usr_handle;
|
|
|
spinlock_t *subscriber_lock;
|
|
|
|
|
|
- subscriber = tipc_ref_lock((u32)(unsigned long)usr_handle);
|
|
|
- if (subscriber == NULL)
|
|
|
+ if (tipc_port_lock(port_ref) == NULL)
|
|
|
return;
|
|
|
|
|
|
subscriber_lock = subscriber->lock;
|
|
@@ -408,6 +416,8 @@ static void subscr_conn_shutdown_event(void *usr_handle,
|
|
|
|
|
|
/**
|
|
|
* subscr_conn_msg_event - handle new subscription request from subscriber
|
|
|
+ *
|
|
|
+ * Called with subscriber's server port unlocked.
|
|
|
*/
|
|
|
|
|
|
static void subscr_conn_msg_event(void *usr_handle,
|
|
@@ -416,20 +426,46 @@ static void subscr_conn_msg_event(void *usr_handle,
|
|
|
const unchar *data,
|
|
|
u32 size)
|
|
|
{
|
|
|
- struct subscriber *subscriber;
|
|
|
+ struct subscriber *subscriber = usr_handle;
|
|
|
spinlock_t *subscriber_lock;
|
|
|
+ struct subscription *sub;
|
|
|
+
|
|
|
+ /*
|
|
|
+ * Lock subscriber's server port (& make a local copy of lock pointer,
|
|
|
+ * in case subscriber is deleted while processing subscription request)
|
|
|
+ */
|
|
|
|
|
|
- subscriber = tipc_ref_lock((u32)(unsigned long)usr_handle);
|
|
|
- if (subscriber == NULL)
|
|
|
+ if (tipc_port_lock(port_ref) == NULL)
|
|
|
return;
|
|
|
|
|
|
subscriber_lock = subscriber->lock;
|
|
|
- if (size != sizeof(struct tipc_subscr))
|
|
|
- subscr_terminate(subscriber);
|
|
|
- else
|
|
|
- subscr_subscribe((struct tipc_subscr *)data, subscriber);
|
|
|
|
|
|
- spin_unlock_bh(subscriber_lock);
|
|
|
+ if (size != sizeof(struct tipc_subscr)) {
|
|
|
+ subscr_terminate(subscriber);
|
|
|
+ spin_unlock_bh(subscriber_lock);
|
|
|
+ } else {
|
|
|
+ sub = subscr_subscribe((struct tipc_subscr *)data, subscriber);
|
|
|
+ spin_unlock_bh(subscriber_lock);
|
|
|
+ if (sub != NULL) {
|
|
|
+
|
|
|
+ /*
|
|
|
+ * We must release the server port lock before adding a
|
|
|
+ * subscription to the name table since TIPC needs to be
|
|
|
+ * able to (re)acquire the port lock if an event message
|
|
|
+ * issued by the subscription process is rejected and
|
|
|
+ * returned. The subscription cannot be deleted while
|
|
|
+ * it is being added to the name table because:
|
|
|
+ * a) the single-threading of the native API port code
|
|
|
+ * ensures the subscription cannot be cancelled and
|
|
|
+ * the subscriber connection cannot be broken, and
|
|
|
+ * b) the name table lock ensures the subscription
|
|
|
+ * timeout code cannot delete the subscription,
|
|
|
+ * so the subscription object is still protected.
|
|
|
+ */
|
|
|
+
|
|
|
+ tipc_nametbl_subscribe(sub);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -445,16 +481,10 @@ static void subscr_named_msg_event(void *usr_handle,
|
|
|
struct tipc_portid const *orig,
|
|
|
struct tipc_name_seq const *dest)
|
|
|
{
|
|
|
- struct subscriber *subscriber;
|
|
|
- struct iovec msg_sect = {NULL, 0};
|
|
|
- spinlock_t *subscriber_lock;
|
|
|
+ static struct iovec msg_sect = {NULL, 0};
|
|
|
|
|
|
- dbg("subscr_named_msg_event: orig = %x own = %x,\n",
|
|
|
- orig->node, tipc_own_addr);
|
|
|
- if (size && (size != sizeof(struct tipc_subscr))) {
|
|
|
- warn("Subscriber rejected, invalid subscription size\n");
|
|
|
- return;
|
|
|
- }
|
|
|
+ struct subscriber *subscriber;
|
|
|
+ u32 server_port_ref;
|
|
|
|
|
|
/* Create subscriber object */
|
|
|
|
|
@@ -465,18 +495,11 @@ static void subscr_named_msg_event(void *usr_handle,
|
|
|
}
|
|
|
INIT_LIST_HEAD(&subscriber->subscription_list);
|
|
|
INIT_LIST_HEAD(&subscriber->subscriber_list);
|
|
|
- subscriber->ref = tipc_ref_acquire(subscriber, &subscriber->lock);
|
|
|
- if (subscriber->ref == 0) {
|
|
|
- warn("Subscriber rejected, reference table exhausted\n");
|
|
|
- kfree(subscriber);
|
|
|
- return;
|
|
|
- }
|
|
|
- spin_unlock_bh(subscriber->lock);
|
|
|
|
|
|
- /* Establish a connection to subscriber */
|
|
|
+ /* Create server port & establish connection to subscriber */
|
|
|
|
|
|
tipc_createport(topsrv.user_ref,
|
|
|
- (void *)(unsigned long)subscriber->ref,
|
|
|
+ subscriber,
|
|
|
importance,
|
|
|
NULL,
|
|
|
NULL,
|
|
@@ -488,32 +511,36 @@ static void subscr_named_msg_event(void *usr_handle,
|
|
|
&subscriber->port_ref);
|
|
|
if (subscriber->port_ref == 0) {
|
|
|
warn("Subscriber rejected, unable to create port\n");
|
|
|
- tipc_ref_discard(subscriber->ref);
|
|
|
kfree(subscriber);
|
|
|
return;
|
|
|
}
|
|
|
tipc_connect2port(subscriber->port_ref, orig);
|
|
|
|
|
|
+ /* Lock server port (& save lock address for future use) */
|
|
|
+
|
|
|
+ subscriber->lock = tipc_port_lock(subscriber->port_ref)->publ.lock;
|
|
|
|
|
|
/* Add subscriber to topology server's subscriber list */
|
|
|
|
|
|
- tipc_ref_lock(subscriber->ref);
|
|
|
spin_lock_bh(&topsrv.lock);
|
|
|
list_add(&subscriber->subscriber_list, &topsrv.subscriber_list);
|
|
|
spin_unlock_bh(&topsrv.lock);
|
|
|
|
|
|
- /*
|
|
|
- * Subscribe now if message contains a subscription,
|
|
|
- * otherwise send an empty response to complete connection handshaking
|
|
|
- */
|
|
|
+ /* Unlock server port */
|
|
|
|
|
|
- subscriber_lock = subscriber->lock;
|
|
|
- if (size)
|
|
|
- subscr_subscribe((struct tipc_subscr *)data, subscriber);
|
|
|
- else
|
|
|
- tipc_send(subscriber->port_ref, 1, &msg_sect);
|
|
|
+ server_port_ref = subscriber->port_ref;
|
|
|
+ spin_unlock_bh(subscriber->lock);
|
|
|
|
|
|
- spin_unlock_bh(subscriber_lock);
|
|
|
+ /* Send an ACK- to complete connection handshaking */
|
|
|
+
|
|
|
+ tipc_send(server_port_ref, 1, &msg_sect);
|
|
|
+
|
|
|
+ /* Handle optional subscription request */
|
|
|
+
|
|
|
+ if (size != 0) {
|
|
|
+ subscr_conn_msg_event(subscriber, server_port_ref,
|
|
|
+ buf, data, size);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
int tipc_subscr_start(void)
|
|
@@ -572,8 +599,8 @@ void tipc_subscr_stop(void)
|
|
|
list_for_each_entry_safe(subscriber, subscriber_temp,
|
|
|
&topsrv.subscriber_list,
|
|
|
subscriber_list) {
|
|
|
- tipc_ref_lock(subscriber->ref);
|
|
|
subscriber_lock = subscriber->lock;
|
|
|
+ spin_lock_bh(subscriber_lock);
|
|
|
subscr_terminate(subscriber);
|
|
|
spin_unlock_bh(subscriber_lock);
|
|
|
}
|