From f9c935db8086231a35b7f5c2a53e3f1e10f388ee Mon Sep 17 00:00:00 2001 From: Jon Maloy Date: Fri, 29 Dec 2017 19:48:02 +0100 Subject: [PATCH] tipc: fix problems with multipoint-to-point flow control In commit 04d7b574b245 ("tipc: add multipoint-to-point flow control") we introduced a protocol for preventing buffer overflow when many group members try to simultaneously send messages to the same receiving member. Stress test of this mechanism has revealed a couple of related bugs: - When the receiving member receives an advertisement REMIT message from one of the senders, it will sometimes prematurely activate a pending member and send it the remitted advertisement, although the upper limit for active senders has been reached. This leads to accumulation of illegal advertisements, and eventually to messages being dropped because of receive buffer overflow. - When the receiving member leaves REMITTED state while a received message is being read, we miss to look at the pending queue, to activate the oldest pending peer. This leads to some pending senders being starved out, and never getting the opportunity to profit from the remitted advertisement. We fix the former in the function tipc_group_proto_rcv() by returning directly from the function once it becomes clear that the remitting peer cannot leave REMITTED state at that point. We fix the latter in the function tipc_group_update_rcv_win() by looking up and activate the longest pending peer when it becomes clear that the remitting peer now can leave REMITTED state. Signed-off-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/group.c | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/net/tipc/group.c b/net/tipc/group.c index 8e12ab55346b..5f4ffae807ee 100644 --- a/net/tipc/group.c +++ b/net/tipc/group.c @@ -109,7 +109,8 @@ static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, static void tipc_group_decr_active(struct tipc_group *grp, struct tipc_member *m) { - if (m->state == MBR_ACTIVE || m->state == MBR_RECLAIMING) + if (m->state == MBR_ACTIVE || m->state == MBR_RECLAIMING || + m->state == MBR_REMITTED) grp->active_cnt--; } @@ -562,7 +563,7 @@ void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node, int max_active = grp->max_active; int reclaim_limit = max_active * 3 / 4; int active_cnt = grp->active_cnt; - struct tipc_member *m, *rm; + struct tipc_member *m, *rm, *pm; m = tipc_group_find_member(grp, node, port); if (!m) @@ -605,6 +606,17 @@ void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node, pr_warn_ratelimited("Rcv unexpected msg after REMIT\n"); tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); } + grp->active_cnt--; + list_del_init(&m->list); + if (list_empty(&grp->pending)) + return; + + /* Set oldest pending member to active and advertise */ + pm = list_first_entry(&grp->pending, struct tipc_member, list); + pm->state = MBR_ACTIVE; + list_move_tail(&pm->list, &grp->active); + grp->active_cnt++; + tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq); break; case MBR_RECLAIMING: case MBR_DISCOVERED: @@ -742,14 +754,14 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup, if (!m || m->state != MBR_RECLAIMING) return; - list_del_init(&m->list); - grp->active_cnt--; remitted = msg_grp_remitted(hdr); /* Messages preceding the REMIT still in receive queue */ if (m->advertised > remitted) { m->state = MBR_REMITTED; in_flight = m->advertised - remitted; + m->advertised = ADV_IDLE + in_flight; + return; } /* All messages preceding the REMIT have been read */ if (m->advertised <= remitted) { @@ -761,6 +773,8 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup, tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); m->advertised = ADV_IDLE + in_flight; + grp->active_cnt--; + list_del_init(&m->list); /* Set oldest pending member to active and advertise */ if (list_empty(&grp->pending)) -- 2.30.2