* net/tipc/server.c: TIPC server infrastructure
*
* Copyright (c) 2012-2013, Wind River Systems
+ * Copyright (c) 2017, Ericsson AB
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* @sock: socket handler associated with connection
* @flags: indicates connection state
* @server: pointer to connected server
+ * @sub_list: lsit to all pertaing subscriptions
+ * @sub_lock: lock protecting the subscription list
+ * @outqueue_lock: control access to the outqueue
* @rwork: receive work item
- * @usr_data: user-specified field
* @rx_action: what to do when connection socket is active
* @outqueue: pointer to first outbound message in queue
* @outqueue_lock: control access to the outqueue
- * @outqueue: list of connection objects for its server
* @swork: send work item
*/
struct tipc_conn {
struct socket *sock;
unsigned long flags;
struct tipc_server *server;
+ struct list_head sub_list;
+ spinlock_t sub_lock; /* for subscription list */
struct work_struct rwork;
int (*rx_action) (struct tipc_conn *con);
- void *usr_data;
struct list_head outqueue;
spinlock_t outqueue_lock;
struct work_struct swork;
/* An entry waiting to be sent */
struct outqueue_entry {
+ u32 evt;
struct list_head list;
struct kvec iov;
};
static void tipc_send_work(struct work_struct *work);
static void tipc_clean_outqueues(struct tipc_conn *con);
+static bool connected(struct tipc_conn *con)
+{
+ return con && test_bit(CF_CONNECTED, &con->flags);
+}
+
+/**
+ * htohl - convert value to endianness used by destination
+ * @in: value to convert
+ * @swap: non-zero if endianness must be reversed
+ *
+ * Returns converted value
+ */
+static u32 htohl(u32 in, int swap)
+{
+ return swap ? swab32(in) : in;
+}
+
static void tipc_conn_kref_release(struct kref *kref)
{
struct tipc_conn *con = container_of(kref, struct tipc_conn, kref);
struct tipc_server *s = con->server;
struct socket *sock = con->sock;
- struct sock *sk;
if (sock) {
- sk = sock->sk;
if (test_bit(CF_SERVER, &con->flags)) {
__module_get(sock->ops->owner);
- __module_get(sk->sk_prot_creator->owner);
+ __module_get(sock->sk->sk_prot_creator->owner);
}
sock_release(sock);
con->sock = NULL;
spin_lock_bh(&s->idr_lock);
con = idr_find(&s->conn_idr, conid);
- if (con) {
- if (!test_bit(CF_CONNECTED, &con->flags) ||
- !kref_get_unless_zero(&con->kref))
- con = NULL;
- }
+ if (!connected(con) || !kref_get_unless_zero(&con->kref))
+ con = NULL;
spin_unlock_bh(&s->idr_lock);
return con;
}
read_lock_bh(&sk->sk_callback_lock);
con = sock2con(sk);
- if (con && test_bit(CF_CONNECTED, &con->flags)) {
+ if (connected(con)) {
conn_get(con);
if (!queue_work(con->server->rcv_wq, &con->rwork))
conn_put(con);
read_lock_bh(&sk->sk_callback_lock);
con = sock2con(sk);
- if (con && test_bit(CF_CONNECTED, &con->flags)) {
+ if (connected(con)) {
conn_get(con);
if (!queue_work(con->server->send_wq, &con->swork))
conn_put(con);
write_unlock_bh(&sk->sk_callback_lock);
}
+/* tipc_con_delete_sub - delete a specific or all subscriptions
+ * for a given subscriber
+ */
+static void tipc_con_delete_sub(struct tipc_conn *con, struct tipc_subscr *s)
+{
+ struct list_head *sub_list = &con->sub_list;
+ struct tipc_subscription *sub, *tmp;
+
+ spin_lock_bh(&con->sub_lock);
+ list_for_each_entry_safe(sub, tmp, sub_list, subscrp_list) {
+ if (!s || !memcmp(s, &sub->evt.s, sizeof(*s)))
+ tipc_sub_delete(sub);
+ else if (s)
+ break;
+ }
+ spin_unlock_bh(&con->sub_lock);
+}
+
static void tipc_close_conn(struct tipc_conn *con)
{
struct sock *sk = con->sock->sk;
write_lock_bh(&sk->sk_callback_lock);
disconnect = test_and_clear_bit(CF_CONNECTED, &con->flags);
+
if (disconnect) {
sk->sk_user_data = NULL;
if (con->conid)
- tipc_subscrb_delete(con->usr_data);
+ tipc_con_delete_sub(con, NULL);
}
write_unlock_bh(&sk->sk_callback_lock);
kref_init(&con->kref);
INIT_LIST_HEAD(&con->outqueue);
+ INIT_LIST_HEAD(&con->sub_list);
spin_lock_init(&con->outqueue_lock);
+ spin_lock_init(&con->sub_lock);
INIT_WORK(&con->swork, tipc_send_work);
INIT_WORK(&con->rwork, tipc_recv_work);
return con;
}
+int tipc_con_rcv_sub(struct net *net, int conid, struct tipc_conn *con,
+ void *buf, size_t len)
+{
+ struct tipc_subscr *s = (struct tipc_subscr *)buf;
+ struct tipc_subscription *sub;
+ bool status;
+ int swap;
+
+ /* Determine subscriber's endianness */
+ swap = !(s->filter & (TIPC_SUB_PORTS | TIPC_SUB_SERVICE |
+ TIPC_SUB_CANCEL));
+
+ /* Detect & process a subscription cancellation request */
+ if (s->filter & htohl(TIPC_SUB_CANCEL, swap)) {
+ s->filter &= ~htohl(TIPC_SUB_CANCEL, swap);
+ tipc_con_delete_sub(con, s);
+ return 0;
+ }
+ status = !(s->filter & htohl(TIPC_SUB_NO_STATUS, swap));
+ sub = tipc_subscrp_subscribe(net, s, conid, swap, status);
+ if (!sub)
+ return -1;
+
+ spin_lock_bh(&con->sub_lock);
+ list_add(&sub->subscrp_list, &con->sub_list);
+ spin_unlock_bh(&con->sub_lock);
+ return 0;
+}
+
static int tipc_receive_from_sock(struct tipc_conn *con)
{
struct tipc_server *s = con->server;
}
read_lock_bh(&sk->sk_callback_lock);
- if (test_bit(CF_CONNECTED, &con->flags))
- ret = tipc_subscrb_rcv(sock_net(con->sock->sk), con->conid,
- con->usr_data, buf, ret);
+ ret = tipc_con_rcv_sub(s->net, con->conid, con, buf, ret);
read_unlock_bh(&sk->sk_callback_lock);
kmem_cache_free(s->rcvbuf_cache, buf);
if (ret < 0)
newcon->rx_action = tipc_receive_from_sock;
tipc_register_callbacks(newsock, newcon);
- /* Notify that new connection is incoming */
- newcon->usr_data = tipc_subscrb_create(newcon->conid);
-
- if (!newcon->usr_data) {
- sock_release(newsock);
- conn_put(newcon);
- return -ENOMEM;
- }
-
/* Wake up receive process in case of 'SYN+' message */
newsock->sk->sk_data_ready(newsock->sk);
return ret;
}
int tipc_conn_sendmsg(struct tipc_server *s, int conid,
- void *data, size_t len)
+ u32 evt, void *data, size_t len)
{
struct outqueue_entry *e;
struct tipc_conn *con;
if (!con)
return -EINVAL;
- if (!test_bit(CF_CONNECTED, &con->flags)) {
+ if (!connected(con)) {
conn_put(con);
return 0;
}
conn_put(con);
return -ENOMEM;
}
-
+ e->evt = evt;
spin_lock_bh(&con->outqueue_lock);
list_add_tail(&e->list, &con->outqueue);
spin_unlock_bh(&con->outqueue_lock);
bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower,
u32 upper, u32 filter, int *conid)
{
- struct tipc_subscriber *scbr;
struct tipc_subscr sub;
- struct tipc_server *s;
struct tipc_conn *con;
+ int rc;
sub.seq.type = type;
sub.seq.lower = lower;
return false;
*conid = con->conid;
- s = con->server;
- scbr = tipc_subscrb_create(*conid);
- if (!scbr) {
- conn_put(con);
- return false;
- }
-
- con->usr_data = scbr;
con->sock = NULL;
- tipc_subscrb_rcv(net, *conid, scbr, &sub, sizeof(sub));
- return true;
+ rc = tipc_con_rcv_sub(net, *conid, con, &sub, sizeof(sub));
+ if (rc < 0)
+ tipc_close_conn(con);
+ return !rc;
}
void tipc_topsrv_kern_unsubscr(struct net *net, int conid)
{
struct tipc_conn *con;
- struct tipc_server *srv;
con = tipc_conn_lookup(tipc_topsrv(net), conid);
if (!con)
return;
test_and_clear_bit(CF_CONNECTED, &con->flags);
- srv = con->server;
- if (con->conid)
- tipc_subscrb_delete(con->usr_data);
+ tipc_con_delete_sub(con, NULL);
conn_put(con);
conn_put(con);
}
static void tipc_send_to_sock(struct tipc_conn *con)
{
- struct tipc_server *s = con->server;
+ struct list_head *queue = &con->outqueue;
+ struct tipc_server *srv = con->server;
struct outqueue_entry *e;
struct tipc_event *evt;
struct msghdr msg;
int ret;
spin_lock_bh(&con->outqueue_lock);
- while (test_bit(CF_CONNECTED, &con->flags)) {
- e = list_entry(con->outqueue.next, struct outqueue_entry, list);
- if ((struct list_head *) e == &con->outqueue)
- break;
+
+ while (!list_empty(queue)) {
+ e = list_first_entry(queue, struct outqueue_entry, list);
spin_unlock_bh(&con->outqueue_lock);
+ if (e->evt == TIPC_SUBSCR_TIMEOUT) {
+ evt = (struct tipc_event *)e->iov.iov_base;
+ tipc_con_delete_sub(con, &evt->s);
+ }
+ memset(&msg, 0, sizeof(msg));
+ msg.msg_flags = MSG_DONTWAIT;
+
if (con->sock) {
- memset(&msg, 0, sizeof(msg));
- msg.msg_flags = MSG_DONTWAIT;
ret = kernel_sendmsg(con->sock, &msg, &e->iov, 1,
e->iov.iov_len);
if (ret == -EWOULDBLOCK || ret == 0) {
}
} else {
evt = e->iov.iov_base;
- tipc_send_kern_top_evt(s->net, evt);
+ tipc_send_kern_top_evt(srv->net, evt);
}
/* Don't starve users filling buffers */
cond_resched();
count = 0;
}
-
spin_lock_bh(&con->outqueue_lock);
list_del(&e->list);
tipc_free_entry(e);
struct tipc_conn *con = container_of(work, struct tipc_conn, rwork);
int count = 0;
- while (test_bit(CF_CONNECTED, &con->flags)) {
+ while (connected(con)) {
if (con->rx_action(con))
break;
{
struct tipc_conn *con = container_of(work, struct tipc_conn, swork);
- if (test_bit(CF_CONNECTED, &con->flags))
+ if (connected(con))
tipc_send_to_sock(con);
conn_put(con);
/*
* net/tipc/subscr.c: TIPC network topology service
*
- * Copyright (c) 2000-2006, Ericsson AB
+ * Copyright (c) 2000-2017, Ericsson AB
* Copyright (c) 2005-2007, 2010-2013, Wind River Systems
* All rights reserved.
*
#include "name_table.h"
#include "subscr.h"
-/**
- * struct tipc_subscriber - TIPC network topology subscriber
- * @kref: reference counter to tipc_subscription object
- * @conid: connection identifier to server connecting to subscriber
- * @lock: control access to subscriber
- * @subscrp_list: list of subscription objects for this subscriber
- */
-struct tipc_subscriber {
- struct kref kref;
- int conid;
- spinlock_t lock;
- struct list_head subscrp_list;
-};
-
-static void tipc_subscrb_put(struct tipc_subscriber *subscriber);
-
/**
* htohl - convert value to endianness used by destination
* @in: value to convert
u32 event, u32 port_ref, u32 node)
{
struct tipc_net *tn = net_generic(sub->net, tipc_net_id);
- struct tipc_subscriber *subscriber = sub->subscriber;
struct kvec msg_sect;
+ if (sub->inactive)
+ return;
msg_sect.iov_base = (void *)&sub->evt;
msg_sect.iov_len = sizeof(struct tipc_event);
sub->evt.event = htohl(event, sub->swap);
sub->evt.found_upper = htohl(found_upper, sub->swap);
sub->evt.port.ref = htohl(port_ref, sub->swap);
sub->evt.port.node = htohl(node, sub->swap);
- tipc_conn_sendmsg(tn->topsrv, subscriber->conid,
+ tipc_conn_sendmsg(tn->topsrv, sub->conid, event,
msg_sect.iov_base, msg_sect.iov_len);
}
return;
if (filter & TIPC_SUB_NODE_SCOPE && scope != TIPC_NODE_SCOPE)
return;
-
- tipc_subscrp_send_event(sub, found_lower, found_upper, event, port_ref,
- node);
+ spin_lock(&sub->lock);
+ tipc_subscrp_send_event(sub, found_lower, found_upper,
+ event, port_ref, node);
+ spin_unlock(&sub->lock);
}
static void tipc_subscrp_timeout(struct timer_list *t)
{
struct tipc_subscription *sub = from_timer(sub, t, timer);
- struct tipc_subscriber *subscriber = sub->subscriber;
-
- spin_lock_bh(&subscriber->lock);
- tipc_nametbl_unsubscribe(sub);
- list_del(&sub->subscrp_list);
- spin_unlock_bh(&subscriber->lock);
+ struct tipc_subscr *s = &sub->evt.s;
- /* Notify subscriber of timeout */
- tipc_subscrp_send_event(sub, sub->evt.s.seq.lower, sub->evt.s.seq.upper,
+ spin_lock(&sub->lock);
+ tipc_subscrp_send_event(sub, s->seq.lower, s->seq.upper,
TIPC_SUBSCR_TIMEOUT, 0, 0);
-
- tipc_subscrp_put(sub);
-}
-
-static void tipc_subscrb_kref_release(struct kref *kref)
-{
- kfree(container_of(kref,struct tipc_subscriber, kref));
-}
-
-static void tipc_subscrb_put(struct tipc_subscriber *subscriber)
-{
- kref_put(&subscriber->kref, tipc_subscrb_kref_release);
-}
-
-static void tipc_subscrb_get(struct tipc_subscriber *subscriber)
-{
- kref_get(&subscriber->kref);
+ sub->inactive = true;
+ spin_unlock(&sub->lock);
}
static void tipc_subscrp_kref_release(struct kref *kref)
struct tipc_subscription,
kref);
struct tipc_net *tn = net_generic(sub->net, tipc_net_id);
- struct tipc_subscriber *subscriber = sub->subscriber;
atomic_dec(&tn->subscription_count);
kfree(sub);
- tipc_subscrb_put(subscriber);
}
void tipc_subscrp_put(struct tipc_subscription *subscription)
kref_get(&subscription->kref);
}
-/* tipc_subscrb_subscrp_delete - delete a specific subscription or all
- * subscriptions for a given subscriber.
- */
-static void tipc_subscrb_subscrp_delete(struct tipc_subscriber *subscriber,
- struct tipc_subscr *s)
-{
- struct list_head *subscription_list = &subscriber->subscrp_list;
- struct tipc_subscription *sub, *temp;
- u32 timeout;
-
- spin_lock_bh(&subscriber->lock);
- list_for_each_entry_safe(sub, temp, subscription_list, subscrp_list) {
- if (s && memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr)))
- continue;
-
- timeout = htohl(sub->evt.s.timeout, sub->swap);
- if (timeout == TIPC_WAIT_FOREVER || del_timer(&sub->timer)) {
- tipc_nametbl_unsubscribe(sub);
- list_del(&sub->subscrp_list);
- tipc_subscrp_put(sub);
- }
-
- if (s)
- break;
- }
- spin_unlock_bh(&subscriber->lock);
-}
-
-struct tipc_subscriber *tipc_subscrb_create(int conid)
-{
- struct tipc_subscriber *subscriber;
-
- subscriber = kzalloc(sizeof(*subscriber), GFP_ATOMIC);
- if (!subscriber) {
- pr_warn("Subscriber rejected, no memory\n");
- return NULL;
- }
- INIT_LIST_HEAD(&subscriber->subscrp_list);
- kref_init(&subscriber->kref);
- subscriber->conid = conid;
- spin_lock_init(&subscriber->lock);
-
- return subscriber;
-}
-
-void tipc_subscrb_delete(struct tipc_subscriber *subscriber)
-{
- tipc_subscrb_subscrp_delete(subscriber, NULL);
- tipc_subscrb_put(subscriber);
-}
-
-static void tipc_subscrp_cancel(struct tipc_subscr *s,
- struct tipc_subscriber *subscriber)
-{
- tipc_subscrb_get(subscriber);
- tipc_subscrb_subscrp_delete(subscriber, s);
- tipc_subscrb_put(subscriber);
-}
-
static struct tipc_subscription *tipc_subscrp_create(struct net *net,
struct tipc_subscr *s,
- int swap)
+ int conid, bool swap)
{
struct tipc_net *tn = net_generic(net, tipc_net_id);
struct tipc_subscription *sub;
/* Initialize subscription object */
sub->net = net;
+ sub->conid = conid;
+ sub->inactive = false;
if (((filter & TIPC_SUB_PORTS) && (filter & TIPC_SUB_SERVICE)) ||
(htohl(s->seq.lower, swap) > htohl(s->seq.upper, swap))) {
pr_warn("Subscription rejected, illegal request\n");
sub->swap = swap;
memcpy(&sub->evt.s, s, sizeof(*s));
+ spin_lock_init(&sub->lock);
atomic_inc(&tn->subscription_count);
kref_init(&sub->kref);
return sub;
}
-static int tipc_subscrp_subscribe(struct net *net, struct tipc_subscr *s,
- struct tipc_subscriber *subscriber, int swap,
- bool status)
+struct tipc_subscription *tipc_subscrp_subscribe(struct net *net,
+ struct tipc_subscr *s,
+ int conid, bool swap,
+ bool status)
{
struct tipc_subscription *sub = NULL;
u32 timeout;
- sub = tipc_subscrp_create(net, s, swap);
+ sub = tipc_subscrp_create(net, s, conid, swap);
if (!sub)
- return -1;
+ return NULL;
- spin_lock_bh(&subscriber->lock);
- list_add(&sub->subscrp_list, &subscriber->subscrp_list);
- sub->subscriber = subscriber;
tipc_nametbl_subscribe(sub, status);
- tipc_subscrb_get(subscriber);
- spin_unlock_bh(&subscriber->lock);
-
timer_setup(&sub->timer, tipc_subscrp_timeout, 0);
timeout = htohl(sub->evt.s.timeout, swap);
-
if (timeout != TIPC_WAIT_FOREVER)
mod_timer(&sub->timer, jiffies + msecs_to_jiffies(timeout));
- return 0;
+ return sub;
}
-/* Handle one request to create a new subscription for the subscriber
- */
-int tipc_subscrb_rcv(struct net *net, int conid, void *usr_data,
- void *buf, size_t len)
+void tipc_sub_delete(struct tipc_subscription *sub)
{
- struct tipc_subscriber *subscriber = usr_data;
- struct tipc_subscr *s = (struct tipc_subscr *)buf;
- bool status;
- int swap;
-
- /* Determine subscriber's endianness */
- swap = !(s->filter & (TIPC_SUB_PORTS | TIPC_SUB_SERVICE |
- TIPC_SUB_CANCEL));
-
- /* Detect & process a subscription cancellation request */
- if (s->filter & htohl(TIPC_SUB_CANCEL, swap)) {
- s->filter &= ~htohl(TIPC_SUB_CANCEL, swap);
- tipc_subscrp_cancel(s, subscriber);
- return 0;
- }
- status = !(s->filter & htohl(TIPC_SUB_NO_STATUS, swap));
- return tipc_subscrp_subscribe(net, s, subscriber, swap, status);
+ tipc_nametbl_unsubscribe(sub);
+ if (sub->evt.s.timeout != TIPC_WAIT_FOREVER)
+ del_timer_sync(&sub->timer);
+ list_del(&sub->subscrp_list);
+ tipc_subscrp_put(sub);
}
int tipc_topsrv_start(struct net *net)