rds: use RCU to synchronize work-enqueue with connection teardown
author     Sowmini Varadhan <sowmini.varadhan@oracle.com>
           Thu, 4 Jan 2018 14:53:00 +0000 (06:53 -0800)
committer  David S. Miller <davem@davemloft.net>
           Fri, 5 Jan 2018 18:39:18 +0000 (13:39 -0500)
rds_sendmsg() can enqueue work on cp_send_w from process context, but
it should not enqueue this work if connection teardown has commenced
(else we risk enqueuing work after rds_conn_path_destroy() has assumed
that all work has been cancelled/flushed).

Similarly, some other functions such as rds_cong_queue_updates() and
rds_tcp_data_ready() are called in softirq context and may end up
enqueuing work on rds_wq after rds_conn_path_destroy() has assumed that
all work queues are quiesced.

Check the RDS_DESTROY_PENDING bit and use RCU synchronization to avoid
all these races.
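
The resulting pattern, common to all the enqueue sites patched below, pairs
an RCU read-side critical section around the DESTROY_PENDING test with a
synchronize_rcu() on the teardown side. A minimal sketch follows; it is
illustrative only, the helpers rds_try_queue_send() and rds_quiesce_path()
are invented for the example, and the exact placement of set_bit() differs
in the real code:

/* Enqueue side (process or softirq context): the DESTROY_PENDING test and
 * the queue_delayed_work() call sit inside one RCU read-side section, so
 * teardown can wait for any in-flight enqueue attempt to finish.
 */
static void rds_try_queue_send(struct rds_conn_path *cp)
{
	rcu_read_lock();
	if (!test_bit(RDS_DESTROY_PENDING, &cp->cp_flags))
		queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
	rcu_read_unlock();
}

/* Teardown side: once DESTROY_PENDING is set and synchronize_rcu() returns,
 * every reader that could still have queued work has left its read-side
 * section, so the cancel_*_sync() calls below flush the last of it.
 */
static void rds_quiesce_path(struct rds_conn_path *cp)
{
	set_bit(RDS_DESTROY_PENDING, &cp->cp_flags); /* set earlier in the real code */
	synchronize_rcu();
	cancel_delayed_work_sync(&cp->cp_send_w);
	cancel_delayed_work_sync(&cp->cp_recv_w);
}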

Signed-off-by: Sowmini Varadhan <sowmini.varadhan@oracle.com>
Acked-by: Santosh Shilimkar <santosh.shilimkar@oracle.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
net/rds/cong.c
net/rds/connection.c
net/rds/send.c
net/rds/tcp_recv.c
net/rds/tcp_send.c
net/rds/threads.c

diff --git a/net/rds/cong.c b/net/rds/cong.c
index 8398fee7c866b37257cf2fb8f542d7e7a4d71ceb..8d19fd25dce36db3eb7c5b4252da325cc0b74913 100644
--- a/net/rds/cong.c
+++ b/net/rds/cong.c
@@ -219,7 +219,11 @@ void rds_cong_queue_updates(struct rds_cong_map *map)
        spin_lock_irqsave(&rds_cong_lock, flags);
 
        list_for_each_entry(conn, &map->m_conn_list, c_map_item) {
-               if (!test_and_set_bit(0, &conn->c_map_queued)) {
+               struct rds_conn_path *cp = &conn->c_path[0];
+
+               rcu_read_lock();
+               if (!test_and_set_bit(0, &conn->c_map_queued) &&
+                   !test_bit(RDS_DESTROY_PENDING, &cp->cp_flags)) {
                        rds_stats_inc(s_cong_update_queued);
                        /* We cannot inline the call to rds_send_xmit() here
                         * for two reasons (both pertaining to a TCP transport):
@@ -235,9 +239,9 @@ void rds_cong_queue_updates(struct rds_cong_map *map)
                         *    therefore trigger warnings.
                         * Defer the xmit to rds_send_worker() instead.
                         */
-                       queue_delayed_work(rds_wq,
-                                          &conn->c_path[0].cp_send_w, 0);
+                       queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
                }
+               rcu_read_unlock();
        }
 
        spin_unlock_irqrestore(&rds_cong_lock, flags);
diff --git a/net/rds/connection.c b/net/rds/connection.c
index 1eed197e694f6ed75e9019c5a925c6e91ccd3713..b10c0ef36d8d458d808d054670dd2833b0e6a322 100644
--- a/net/rds/connection.c
+++ b/net/rds/connection.c
@@ -366,8 +366,6 @@ void rds_conn_shutdown(struct rds_conn_path *cp)
         * to the conn hash, so we never trigger a reconnect on this
         * conn - the reconnect is always triggered by the active peer. */
        cancel_delayed_work_sync(&cp->cp_conn_w);
-       if (test_bit(RDS_DESTROY_PENDING, &cp->cp_flags))
-               return;
        rcu_read_lock();
        if (!hlist_unhashed(&conn->c_hash_node)) {
                rcu_read_unlock();
@@ -390,6 +388,7 @@ static void rds_conn_path_destroy(struct rds_conn_path *cp)
                return;
 
        /* make sure lingering queued work won't try to ref the conn */
+       synchronize_rcu();
        cancel_delayed_work_sync(&cp->cp_send_w);
        cancel_delayed_work_sync(&cp->cp_recv_w);
 
@@ -407,6 +406,11 @@ static void rds_conn_path_destroy(struct rds_conn_path *cp)
        if (cp->cp_xmit_rm)
                rds_message_put(cp->cp_xmit_rm);
 
+       WARN_ON(delayed_work_pending(&cp->cp_send_w));
+       WARN_ON(delayed_work_pending(&cp->cp_recv_w));
+       WARN_ON(delayed_work_pending(&cp->cp_conn_w));
+       WARN_ON(work_pending(&cp->cp_down_w));
+
        cp->cp_conn->c_trans->conn_free(cp->cp_transport_data);
 }
 
@@ -686,10 +690,13 @@ void rds_conn_path_drop(struct rds_conn_path *cp, bool destroy)
 {
        atomic_set(&cp->cp_state, RDS_CONN_ERROR);
 
-       if (!destroy && test_bit(RDS_DESTROY_PENDING, &cp->cp_flags))
+       rcu_read_lock();
+       if (!destroy && test_bit(RDS_DESTROY_PENDING, &cp->cp_flags)) {
+               rcu_read_unlock();
                return;
-
+       }
        queue_work(rds_wq, &cp->cp_down_w);
+       rcu_read_unlock();
 }
 EXPORT_SYMBOL_GPL(rds_conn_path_drop);
 
@@ -706,9 +713,15 @@ EXPORT_SYMBOL_GPL(rds_conn_drop);
  */
 void rds_conn_path_connect_if_down(struct rds_conn_path *cp)
 {
+       rcu_read_lock();
+       if (test_bit(RDS_DESTROY_PENDING, &cp->cp_flags)) {
+               rcu_read_unlock();
+               return;
+       }
        if (rds_conn_path_state(cp) == RDS_CONN_DOWN &&
            !test_and_set_bit(RDS_RECONNECT_PENDING, &cp->cp_flags))
                queue_delayed_work(rds_wq, &cp->cp_conn_w, 0);
+       rcu_read_unlock();
 }
 EXPORT_SYMBOL_GPL(rds_conn_path_connect_if_down);
 
diff --git a/net/rds/send.c b/net/rds/send.c
index f72466c63f0c5657a2f44e11ae432dd1457991cf..d3e32d1f3c7d61fe910e2dfa59016e7456d4252c 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -162,6 +162,12 @@ restart:
                goto out;
        }
 
+       if (test_bit(RDS_DESTROY_PENDING, &cp->cp_flags)) {
+               release_in_xmit(cp);
+               ret = -ENETUNREACH; /* dont requeue send work */
+               goto out;
+       }
+
        /*
         * we record the send generation after doing the xmit acquire.
         * if someone else manages to jump in and do some work, we'll use
@@ -437,7 +443,12 @@ over_batch:
                    !list_empty(&cp->cp_send_queue)) && !raced) {
                        if (batch_count < send_batch_count)
                                goto restart;
-                       queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
+                       rcu_read_lock();
+                       if (test_bit(RDS_DESTROY_PENDING, &cp->cp_flags))
+                               ret = -ENETUNREACH;
+                       else
+                               queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
+                       rcu_read_unlock();
                } else if (raced) {
                        rds_stats_inc(s_send_lock_queue_raced);
                }
@@ -1151,6 +1162,11 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
        else
                cpath = &conn->c_path[0];
 
+       if (test_bit(RDS_DESTROY_PENDING, &cpath->cp_flags)) {
+               ret = -EAGAIN;
+               goto out;
+       }
+
        rds_conn_path_connect_if_down(cpath);
 
        ret = rds_cong_wait(conn->c_fcong, dport, nonblock, rs);
@@ -1190,9 +1206,17 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
        rds_stats_inc(s_send_queued);
 
        ret = rds_send_xmit(cpath);
-       if (ret == -ENOMEM || ret == -EAGAIN)
-               queue_delayed_work(rds_wq, &cpath->cp_send_w, 1);
-
+       if (ret == -ENOMEM || ret == -EAGAIN) {
+               ret = 0;
+               rcu_read_lock();
+               if (test_bit(RDS_DESTROY_PENDING, &cpath->cp_flags))
+                       ret = -ENETUNREACH;
+               else
+                       queue_delayed_work(rds_wq, &cpath->cp_send_w, 1);
+               rcu_read_unlock();
+       }
+       if (ret)
+               goto out;
        rds_message_put(rm);
        return payload_len;
 
@@ -1270,7 +1294,10 @@ rds_send_probe(struct rds_conn_path *cp, __be16 sport,
        rds_stats_inc(s_send_pong);
 
        /* schedule the send work on rds_wq */
-       queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
+       rcu_read_lock();
+       if (!test_bit(RDS_DESTROY_PENDING, &cp->cp_flags))
+               queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
+       rcu_read_unlock();
 
        rds_message_put(rm);
        return 0;
diff --git a/net/rds/tcp_recv.c b/net/rds/tcp_recv.c
index e006ef8e6d404195f19e5d8b9bbf6683b504a7cc..dd707b9e73e57dae52e8568b60d3e3f7000765af 100644
--- a/net/rds/tcp_recv.c
+++ b/net/rds/tcp_recv.c
@@ -321,8 +321,12 @@ void rds_tcp_data_ready(struct sock *sk)
        ready = tc->t_orig_data_ready;
        rds_tcp_stats_inc(s_tcp_data_ready_calls);
 
-       if (rds_tcp_read_sock(cp, GFP_ATOMIC) == -ENOMEM)
-               queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
+       if (rds_tcp_read_sock(cp, GFP_ATOMIC) == -ENOMEM) {
+               rcu_read_lock();
+               if (!test_bit(RDS_DESTROY_PENDING, &cp->cp_flags))
+                       queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
+               rcu_read_unlock();
+       }
 out:
        read_unlock_bh(&sk->sk_callback_lock);
        ready(sk);
diff --git a/net/rds/tcp_send.c b/net/rds/tcp_send.c
index dc860d1bb6088929591bb670ef96079de7c1819c..73c74763ca720f4684b64c1851de63c4f05b2796 100644
--- a/net/rds/tcp_send.c
+++ b/net/rds/tcp_send.c
@@ -202,8 +202,11 @@ void rds_tcp_write_space(struct sock *sk)
        tc->t_last_seen_una = rds_tcp_snd_una(tc);
        rds_send_path_drop_acked(cp, rds_tcp_snd_una(tc), rds_tcp_is_acked);
 
-       if ((refcount_read(&sk->sk_wmem_alloc) << 1) <= sk->sk_sndbuf)
+       rcu_read_lock();
+       if ((refcount_read(&sk->sk_wmem_alloc) << 1) <= sk->sk_sndbuf &&
+           !test_bit(RDS_DESTROY_PENDING, &cp->cp_flags))
                queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
+       rcu_read_unlock();
 
 out:
        read_unlock_bh(&sk->sk_callback_lock);
diff --git a/net/rds/threads.c b/net/rds/threads.c
index f121daa402c81df6ea111da3d5bee6472629025f..eb76db1360b00b3fe3e7ff96f588c3b1a9cac46d 100644
--- a/net/rds/threads.c
+++ b/net/rds/threads.c
@@ -87,8 +87,12 @@ void rds_connect_path_complete(struct rds_conn_path *cp, int curr)
 
        cp->cp_reconnect_jiffies = 0;
        set_bit(0, &cp->cp_conn->c_map_queued);
-       queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
-       queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
+       rcu_read_lock();
+       if (!test_bit(RDS_DESTROY_PENDING, &cp->cp_flags)) {
+               queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
+               queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
+       }
+       rcu_read_unlock();
 }
 EXPORT_SYMBOL_GPL(rds_connect_path_complete);
 
@@ -133,7 +137,10 @@ void rds_queue_reconnect(struct rds_conn_path *cp)
        set_bit(RDS_RECONNECT_PENDING, &cp->cp_flags);
        if (cp->cp_reconnect_jiffies == 0) {
                cp->cp_reconnect_jiffies = rds_sysctl_reconnect_min_jiffies;
-               queue_delayed_work(rds_wq, &cp->cp_conn_w, 0);
+               rcu_read_lock();
+               if (!test_bit(RDS_DESTROY_PENDING, &cp->cp_flags))
+                       queue_delayed_work(rds_wq, &cp->cp_conn_w, 0);
+               rcu_read_unlock();
                return;
        }
 
@@ -141,8 +148,11 @@ void rds_queue_reconnect(struct rds_conn_path *cp)
        rdsdebug("%lu delay %lu ceil conn %p for %pI4 -> %pI4\n",
                 rand % cp->cp_reconnect_jiffies, cp->cp_reconnect_jiffies,
                 conn, &conn->c_laddr, &conn->c_faddr);
-       queue_delayed_work(rds_wq, &cp->cp_conn_w,
-                          rand % cp->cp_reconnect_jiffies);
+       rcu_read_lock();
+       if (!test_bit(RDS_DESTROY_PENDING, &cp->cp_flags))
+               queue_delayed_work(rds_wq, &cp->cp_conn_w,
+                                  rand % cp->cp_reconnect_jiffies);
+       rcu_read_unlock();
 
        cp->cp_reconnect_jiffies = min(cp->cp_reconnect_jiffies * 2,
                                        rds_sysctl_reconnect_max_jiffies);