rxrpc: Fix the packet reception routine
authorDavid Howells <dhowells@redhat.com>
Mon, 8 Oct 2018 14:46:25 +0000 (15:46 +0100)
committerDavid Howells <dhowells@redhat.com>
Mon, 8 Oct 2018 21:42:04 +0000 (22:42 +0100)
The rxrpc_input_packet() function and its call tree was built around the
assumption that data_ready() handler called from UDP to inform a kernel
service that there is data to be had was non-reentrant.  This means that
certain locking could be dispensed with.

This, however, turns out not to be the case with a multi-queue network card
that can deliver packets to multiple cpus simultaneously.  Each of those
cpus can be in the rxrpc_input_packet() function at the same time.

Fix by adding or changing some structure members:

 (1) Add peer->rtt_input_lock to serialise access to the RTT buffer.

 (2) Make conn->service_id into a 32-bit variable so that it can be
     cmpxchg'd on all arches.

 (3) Add call->input_lock to serialise access to the Rx/Tx state.  Note
     that although the Rx and Tx states are (almost) entirely separate,
     there's no point completing the separation and having separate locks
     since it's a bi-phasal RPC protocol rather than a bi-direction
     streaming protocol.  Data transmission and data reception do not take
     place simultaneously on any particular call.

and making the following functional changes:

 (1) In rxrpc_input_data(), hold call->input_lock around the core to
     prevent simultaneous producing of packets into the Rx ring and
     updating of tracking state for a particular call.

 (2) In rxrpc_input_ping_response(), only read call->ping_serial once, and
     check it before checking RXRPC_CALL_PINGING as that's a cheaper test.
     The bit test and bit clear can then be combined.  No further locking
     is needed here.

 (3) In rxrpc_input_ack(), take call->input_lock after we've parsed much of
     the ACK packet.  The superseded ACK check is then done both before and
     after the lock is taken.

     The handing of ackinfo data is split, parsing before the lock is taken
     and processing with it held.  This is keyed on rxMTU being non-zero.

     Congestion management is also done within the locked section.

 (4) In rxrpc_input_ackall(), take call->input_lock around the Tx window
     rotation.  The ACKALL packet carries no information and is only really
     useful after all packets have been transmitted since it's imprecise.

 (5) In rxrpc_input_implicit_end_call(), we use rx->incoming_lock to
     prevent calls being simultaneously implicitly ended on two cpus and
     also to prevent any races with incoming call setup.

 (6) In rxrpc_input_packet(), use cmpxchg() to effect the service upgrade
     on a connection.  It is only permitted to happen once for a
     connection.

 (7) In rxrpc_new_incoming_call(), we have to recheck the routing inside
     rx->incoming_lock to see if someone else set up the call, connection
     or peer whilst we were getting there.  We can't trust the values from
     the earlier routing check unless we pin refs on them - which we want
     to avoid.

     Further, we need to allow for an incoming call to have its state
     changed on another CPU between us making it live and us adjusting it
     because the conn is now in the RXRPC_CONN_SERVICE state.

 (8) In rxrpc_peer_add_rtt(), take peer->rtt_input_lock around the access
     to the RTT buffer.  Don't need to lock around setting peer->rtt.

For reference, the inventory of state-accessing or state-altering functions
used by the packet input procedure is:

> rxrpc_input_packet()
  * PACKET CHECKING

  * ROUTING
    > rxrpc_post_packet_to_local()
    > rxrpc_find_connection_rcu() - uses RCU
      > rxrpc_lookup_peer_rcu() - uses RCU
      > rxrpc_find_service_conn_rcu() - uses RCU
      > idr_find() - uses RCU

  * CONNECTION-LEVEL PROCESSING
    - Service upgrade
      - Can only happen once per conn
      ! Changed to use cmpxchg
    > rxrpc_post_packet_to_conn()
    - Setting conn->hi_serial
      - Probably safe not using locks
      - Maybe use cmpxchg

  * CALL-LEVEL PROCESSING
    > Old-call checking
      > rxrpc_input_implicit_end_call()
        > rxrpc_call_completed()
> rxrpc_queue_call()
! Need to take rx->incoming_lock
> __rxrpc_disconnect_call()
> rxrpc_notify_socket()
    > rxrpc_new_incoming_call()
      - Uses rx->incoming_lock for the entire process
        - Might be able to drop this earlier in favour of the call lock
      > rxrpc_incoming_call()
       ! Conflicts with rxrpc_input_implicit_end_call()
    > rxrpc_send_ping()
      - Don't need locks to check rtt state
      > rxrpc_propose_ACK

  * PACKET DISTRIBUTION
    > rxrpc_input_call_packet()
      > rxrpc_input_data()
* QUEUE DATA PACKET ON CALL
> rxrpc_reduce_call_timer()
  - Uses timer_reduce()
! Needs call->input_lock()
> rxrpc_receiving_reply()
  ! Needs locking around ack state
  > rxrpc_rotate_tx_window()
  > rxrpc_end_tx_phase()
> rxrpc_proto_abort()
> rxrpc_input_dup_data()
- Fills the Rx buffer
- rxrpc_propose_ACK()
- rxrpc_notify_socket()

      > rxrpc_input_ack()
* APPLY ACK PACKET TO CALL AND DISCARD PACKET
> rxrpc_input_ping_response()
  - Probably doesn't need any extra locking
  ! Need READ_ONCE() on call->ping_serial
  > rxrpc_input_check_for_lost_ack()
    - Takes call->lock to consult Tx buffer
  > rxrpc_peer_add_rtt()
    ! Needs to take a lock (peer->rtt_input_lock)
    ! Could perhaps manage with cmpxchg() and xadd() instead
> rxrpc_input_requested_ack
  - Consults Tx buffer
    ! Probably needs a lock
  > rxrpc_peer_add_rtt()
> rxrpc_propose_ack()
> rxrpc_input_ackinfo()
  - Changes call->tx_winsize
    ! Use cmpxchg to handle change
    ! Should perhaps track serial number
  - Uses peer->lock to record MTU specification changes
> rxrpc_proto_abort()
! Need to take call->input_lock
> rxrpc_rotate_tx_window()
> rxrpc_end_tx_phase()
> rxrpc_input_soft_acks()
- Consults the Tx buffer
> rxrpc_congestion_management()
  - Modifies the Tx annotations
  ! Needs call->input_lock()
  > rxrpc_queue_call()

      > rxrpc_input_abort()
* APPLY ABORT PACKET TO CALL AND DISCARD PACKET
> rxrpc_set_call_completion()
> rxrpc_notify_socket()

      > rxrpc_input_ackall()
* APPLY ACKALL PACKET TO CALL AND DISCARD PACKET
! Need to take call->input_lock
> rxrpc_rotate_tx_window()
> rxrpc_end_tx_phase()

    > rxrpc_reject_packet()

There are some functions used by the above that queue the packet, after
which the procedure is terminated:

 - rxrpc_post_packet_to_local()
   - local->event_queue is an sk_buff_head
   - local->processor is a work_struct
 - rxrpc_post_packet_to_conn()
   - conn->rx_queue is an sk_buff_head
   - conn->processor is a work_struct
 - rxrpc_reject_packet()
   - local->reject_queue is an sk_buff_head
   - local->processor is a work_struct

And some that offload processing to process context:

 - rxrpc_notify_socket()
   - Uses RCU lock
   - Uses call->notify_lock to call call->notify_rx
   - Uses call->recvmsg_lock to queue recvmsg side
 - rxrpc_queue_call()
   - call->processor is a work_struct
 - rxrpc_propose_ACK()
   - Uses call->lock to wrap __rxrpc_propose_ACK()

And a bunch that complete a call, all of which use call->state_lock to
protect the call state:

 - rxrpc_call_completed()
 - rxrpc_set_call_completion()
 - rxrpc_abort_call()
 - rxrpc_proto_abort()
   - Also uses rxrpc_queue_call()

Fixes: 17926a79320a ("[AF_RXRPC]: Provide secure RxRPC sockets for use by userspace and kernel both")
Signed-off-by: David Howells <dhowells@redhat.com>
net/rxrpc/ar-internal.h
net/rxrpc/call_accept.c
net/rxrpc/call_object.c
net/rxrpc/input.c
net/rxrpc/peer_event.c
net/rxrpc/peer_object.c

index 45307463b7ddc58fe62e85b205fde3d65195499a..a6e6cae82c30503d57d1a2aa487dc64f5079f4c2 100644 (file)
@@ -302,6 +302,7 @@ struct rxrpc_peer {
 
        /* calculated RTT cache */
 #define RXRPC_RTT_CACHE_SIZE 32
+       spinlock_t              rtt_input_lock; /* RTT lock for input routine */
        ktime_t                 rtt_last_req;   /* Time of last RTT request */
        u64                     rtt;            /* Current RTT estimate (in nS) */
        u64                     rtt_sum;        /* Sum of cache contents */
@@ -447,7 +448,7 @@ struct rxrpc_connection {
        atomic_t                serial;         /* packet serial number counter */
        unsigned int            hi_serial;      /* highest serial number received */
        u32                     security_nonce; /* response re-use preventer */
-       u16                     service_id;     /* Service ID, possibly upgraded */
+       u32                     service_id;     /* Service ID, possibly upgraded */
        u8                      size_align;     /* data size alignment (for security) */
        u8                      security_size;  /* security header size */
        u8                      security_ix;    /* security type */
@@ -635,6 +636,8 @@ struct rxrpc_call {
        bool                    tx_phase;       /* T if transmission phase, F if receive phase */
        u8                      nr_jumbo_bad;   /* Number of jumbo dups/exceeds-windows */
 
+       spinlock_t              input_lock;     /* Lock for packet input to this call */
+
        /* receive-phase ACK management */
        u8                      ackr_reason;    /* reason to ACK */
        u16                     ackr_skew;      /* skew on packet being ACK'd */
@@ -720,8 +723,6 @@ int rxrpc_service_prealloc(struct rxrpc_sock *, gfp_t);
 void rxrpc_discard_prealloc(struct rxrpc_sock *);
 struct rxrpc_call *rxrpc_new_incoming_call(struct rxrpc_local *,
                                           struct rxrpc_sock *,
-                                          struct rxrpc_peer *,
-                                          struct rxrpc_connection *,
                                           struct sk_buff *);
 void rxrpc_accept_incoming_calls(struct rxrpc_local *);
 struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *, unsigned long,
index 1c4ebc0cb25b70775fd4264ed55da51119717147..652e314de38ebf16c8682234cb1150dc9bf485f5 100644 (file)
@@ -333,11 +333,11 @@ static struct rxrpc_call *rxrpc_alloc_incoming_call(struct rxrpc_sock *rx,
  */
 struct rxrpc_call *rxrpc_new_incoming_call(struct rxrpc_local *local,
                                           struct rxrpc_sock *rx,
-                                          struct rxrpc_peer *peer,
-                                          struct rxrpc_connection *conn,
                                           struct sk_buff *skb)
 {
        struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
+       struct rxrpc_connection *conn;
+       struct rxrpc_peer *peer;
        struct rxrpc_call *call;
 
        _enter("");
@@ -354,6 +354,13 @@ struct rxrpc_call *rxrpc_new_incoming_call(struct rxrpc_local *local,
                goto out;
        }
 
+       /* The peer, connection and call may all have sprung into existence due
+        * to a duplicate packet being handled on another CPU in parallel, so
+        * we have to recheck the routing.  However, we're now holding
+        * rx->incoming_lock, so the values should remain stable.
+        */
+       conn = rxrpc_find_connection_rcu(local, skb, &peer);
+
        call = rxrpc_alloc_incoming_call(rx, local, peer, conn, skb);
        if (!call) {
                skb->mark = RXRPC_SKB_MARK_REJECT_BUSY;
@@ -396,10 +403,12 @@ struct rxrpc_call *rxrpc_new_incoming_call(struct rxrpc_local *local,
 
        case RXRPC_CONN_SERVICE:
                write_lock(&call->state_lock);
-               if (rx->discard_new_call)
-                       call->state = RXRPC_CALL_SERVER_RECV_REQUEST;
-               else
-                       call->state = RXRPC_CALL_SERVER_ACCEPTING;
+               if (call->state < RXRPC_CALL_COMPLETE) {
+                       if (rx->discard_new_call)
+                               call->state = RXRPC_CALL_SERVER_RECV_REQUEST;
+                       else
+                               call->state = RXRPC_CALL_SERVER_ACCEPTING;
+               }
                write_unlock(&call->state_lock);
                break;
 
index 0ca2c2dfd1964e614c4867b04439454ca92d432d..8f1a8f85b1f99ef7ded560bf1da2ec80a08f0fcb 100644 (file)
@@ -138,6 +138,7 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
        init_waitqueue_head(&call->waitq);
        spin_lock_init(&call->lock);
        spin_lock_init(&call->notify_lock);
+       spin_lock_init(&call->input_lock);
        rwlock_init(&call->state_lock);
        atomic_set(&call->usage, 1);
        call->debug_id = debug_id;
index 04213a65c1ac2f1d8006f9dbfd820dc4636e61f6..570b49d2da427a465d7dfdb631412ebb19dc3721 100644 (file)
@@ -459,13 +459,15 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb,
                }
        }
 
+       spin_lock(&call->input_lock);
+
        /* Received data implicitly ACKs all of the request packets we sent
         * when we're acting as a client.
         */
        if ((state == RXRPC_CALL_CLIENT_SEND_REQUEST ||
             state == RXRPC_CALL_CLIENT_AWAIT_REPLY) &&
            !rxrpc_receiving_reply(call))
-               return;
+               goto unlock;
 
        call->ackr_prev_seq = seq;
 
@@ -495,12 +497,16 @@ next_subpacket:
 
        if (flags & RXRPC_LAST_PACKET) {
                if (test_bit(RXRPC_CALL_RX_LAST, &call->flags) &&
-                   seq != call->rx_top)
-                       return rxrpc_proto_abort("LSN", call, seq);
+                   seq != call->rx_top) {
+                       rxrpc_proto_abort("LSN", call, seq);
+                       goto unlock;
+               }
        } else {
                if (test_bit(RXRPC_CALL_RX_LAST, &call->flags) &&
-                   after_eq(seq, call->rx_top))
-                       return rxrpc_proto_abort("LSA", call, seq);
+                   after_eq(seq, call->rx_top)) {
+                       rxrpc_proto_abort("LSA", call, seq);
+                       goto unlock;
+               }
        }
 
        trace_rxrpc_rx_data(call->debug_id, seq, serial, flags, annotation);
@@ -567,8 +573,10 @@ next_subpacket:
 skip:
        offset += len;
        if (flags & RXRPC_JUMBO_PACKET) {
-               if (skb_copy_bits(skb, offset, &flags, 1) < 0)
-                       return rxrpc_proto_abort("XJF", call, seq);
+               if (skb_copy_bits(skb, offset, &flags, 1) < 0) {
+                       rxrpc_proto_abort("XJF", call, seq);
+                       goto unlock;
+               }
                offset += sizeof(struct rxrpc_jumbo_header);
                seq++;
                serial++;
@@ -608,6 +616,9 @@ ack:
                trace_rxrpc_notify_socket(call->debug_id, serial);
                rxrpc_notify_socket(call);
        }
+
+unlock:
+       spin_unlock(&call->input_lock);
        _leave(" [queued]");
 }
 
@@ -694,15 +705,14 @@ static void rxrpc_input_ping_response(struct rxrpc_call *call,
 
        ping_time = call->ping_time;
        smp_rmb();
-       ping_serial = call->ping_serial;
+       ping_serial = READ_ONCE(call->ping_serial);
 
        if (orig_serial == call->acks_lost_ping)
                rxrpc_input_check_for_lost_ack(call);
 
-       if (!test_bit(RXRPC_CALL_PINGING, &call->flags) ||
-           before(orig_serial, ping_serial))
+       if (before(orig_serial, ping_serial) ||
+           !test_and_clear_bit(RXRPC_CALL_PINGING, &call->flags))
                return;
-       clear_bit(RXRPC_CALL_PINGING, &call->flags);
        if (after(orig_serial, ping_serial))
                return;
 
@@ -869,24 +879,31 @@ static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb,
        }
 
        /* Discard any out-of-order or duplicate ACKs. */
-       if (before_eq(sp->hdr.serial, call->acks_latest)) {
-               _debug("discard ACK %d <= %d",
-                      sp->hdr.serial, call->acks_latest);
+       if (before_eq(sp->hdr.serial, call->acks_latest))
                return;
-       }
+
+       buf.info.rxMTU = 0;
+       ioffset = offset + nr_acks + 3;
+       if (skb->len >= ioffset + sizeof(buf.info) &&
+           skb_copy_bits(skb, ioffset, &buf.info, sizeof(buf.info)) < 0)
+               return rxrpc_proto_abort("XAI", call, 0);
+
+       spin_lock(&call->input_lock);
+
+       /* Discard any out-of-order or duplicate ACKs. */
+       if (before_eq(sp->hdr.serial, call->acks_latest))
+               goto out;
        call->acks_latest_ts = skb->tstamp;
        call->acks_latest = sp->hdr.serial;
 
        /* Parse rwind and mtu sizes if provided. */
-       ioffset = offset + nr_acks + 3;
-       if (skb->len >= ioffset + sizeof(buf.info)) {
-               if (skb_copy_bits(skb, ioffset, &buf.info, sizeof(buf.info)) < 0)
-                       return rxrpc_proto_abort("XAI", call, 0);
+       if (buf.info.rxMTU)
                rxrpc_input_ackinfo(call, skb, &buf.info);
-       }
 
-       if (first_soft_ack == 0)
-               return rxrpc_proto_abort("AK0", call, 0);
+       if (first_soft_ack == 0) {
+               rxrpc_proto_abort("AK0", call, 0);
+               goto out;
+       }
 
        /* Ignore ACKs unless we are or have just been transmitting. */
        switch (READ_ONCE(call->state)) {
@@ -896,25 +913,31 @@ static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb,
        case RXRPC_CALL_SERVER_AWAIT_ACK:
                break;
        default:
-               return;
+               goto out;
        }
 
        if (before(hard_ack, call->tx_hard_ack) ||
-           after(hard_ack, call->tx_top))
-               return rxrpc_proto_abort("AKW", call, 0);
-       if (nr_acks > call->tx_top - hard_ack)
-               return rxrpc_proto_abort("AKN", call, 0);
+           after(hard_ack, call->tx_top)) {
+               rxrpc_proto_abort("AKW", call, 0);
+               goto out;
+       }
+       if (nr_acks > call->tx_top - hard_ack) {
+               rxrpc_proto_abort("AKN", call, 0);
+               goto out;
+       }
 
        if (after(hard_ack, call->tx_hard_ack)) {
                if (rxrpc_rotate_tx_window(call, hard_ack, &summary)) {
                        rxrpc_end_tx_phase(call, false, "ETA");
-                       return;
+                       goto out;
                }
        }
 
        if (nr_acks > 0) {
-               if (skb_copy_bits(skb, offset, buf.acks, nr_acks) < 0)
-                       return rxrpc_proto_abort("XSA", call, 0);
+               if (skb_copy_bits(skb, offset, buf.acks, nr_acks) < 0) {
+                       rxrpc_proto_abort("XSA", call, 0);
+                       goto out;
+               }
                rxrpc_input_soft_acks(call, buf.acks, first_soft_ack, nr_acks,
                                      &summary);
        }
@@ -927,7 +950,9 @@ static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb,
                                  false, true,
                                  rxrpc_propose_ack_ping_for_lost_reply);
 
-       return rxrpc_congestion_management(call, skb, &summary, acked_serial);
+       rxrpc_congestion_management(call, skb, &summary, acked_serial);
+out:
+       spin_unlock(&call->input_lock);
 }
 
 /*
@@ -940,8 +965,12 @@ static void rxrpc_input_ackall(struct rxrpc_call *call, struct sk_buff *skb)
 
        _proto("Rx ACKALL %%%u", sp->hdr.serial);
 
+       spin_lock(&call->input_lock);
+
        if (rxrpc_rotate_tx_window(call, call->tx_top, &summary))
                rxrpc_end_tx_phase(call, false, "ETL");
+
+       spin_unlock(&call->input_lock);
 }
 
 /*
@@ -1024,18 +1053,19 @@ static void rxrpc_input_call_packet(struct rxrpc_call *call,
 }
 
 /*
- * Handle a new call on a channel implicitly completing the preceding call on
- * that channel.
+ * Handle a new service call on a channel implicitly completing the preceding
+ * call on that channel.  This does not apply to client conns.
  *
  * TODO: If callNumber > call_id + 1, renegotiate security.
  */
-static void rxrpc_input_implicit_end_call(struct rxrpc_connection *conn,
+static void rxrpc_input_implicit_end_call(struct rxrpc_sock *rx,
+                                         struct rxrpc_connection *conn,
                                          struct rxrpc_call *call)
 {
        switch (READ_ONCE(call->state)) {
        case RXRPC_CALL_SERVER_AWAIT_ACK:
                rxrpc_call_completed(call);
-               break;
+               /* Fall through */
        case RXRPC_CALL_COMPLETE:
                break;
        default:
@@ -1043,11 +1073,13 @@ static void rxrpc_input_implicit_end_call(struct rxrpc_connection *conn,
                        set_bit(RXRPC_CALL_EV_ABORT, &call->events);
                        rxrpc_queue_call(call);
                }
+               trace_rxrpc_improper_term(call);
                break;
        }
 
-       trace_rxrpc_improper_term(call);
+       spin_lock(&rx->incoming_lock);
        __rxrpc_disconnect_call(conn, call);
+       spin_unlock(&rx->incoming_lock);
        rxrpc_notify_socket(call);
 }
 
@@ -1244,10 +1276,16 @@ int rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
                        goto wrong_security;
 
                if (sp->hdr.serviceId != conn->service_id) {
-                       if (!test_bit(RXRPC_CONN_PROBING_FOR_UPGRADE, &conn->flags) ||
-                           conn->service_id != conn->params.service_id)
+                       int old_id;
+
+                       if (!test_bit(RXRPC_CONN_PROBING_FOR_UPGRADE, &conn->flags))
+                               goto reupgrade;
+                       old_id = cmpxchg(&conn->service_id, conn->params.service_id,
+                                        sp->hdr.serviceId);
+
+                       if (old_id != conn->params.service_id &&
+                           old_id != sp->hdr.serviceId)
                                goto reupgrade;
-                       conn->service_id = sp->hdr.serviceId;
                }
 
                if (sp->hdr.callNumber == 0) {
@@ -1305,7 +1343,7 @@ int rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
                        if (rxrpc_to_client(sp))
                                goto reject_packet;
                        if (call)
-                               rxrpc_input_implicit_end_call(conn, call);
+                               rxrpc_input_implicit_end_call(rx, conn, call);
                        call = NULL;
                }
 
@@ -1325,7 +1363,7 @@ int rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
                        goto bad_message;
                if (sp->hdr.seq != 1)
                        goto discard;
-               call = rxrpc_new_incoming_call(local, rx, peer, conn, skb);
+               call = rxrpc_new_incoming_call(local, rx, skb);
                if (!call)
                        goto reject_packet;
                rxrpc_send_ping(call, skb, skew);
index f3e6fc670da2339998992f0f0904e1f0b767ddd1..05b51bdbdd41d0248d9b9c97dbcd9348277371a4 100644 (file)
@@ -301,6 +301,8 @@ void rxrpc_peer_add_rtt(struct rxrpc_call *call, enum rxrpc_rtt_rx_trace why,
        if (rtt < 0)
                return;
 
+       spin_lock(&peer->rtt_input_lock);
+
        /* Replace the oldest datum in the RTT buffer */
        sum -= peer->rtt_cache[cursor];
        sum += rtt;
@@ -312,6 +314,8 @@ void rxrpc_peer_add_rtt(struct rxrpc_call *call, enum rxrpc_rtt_rx_trace why,
                peer->rtt_usage = usage;
        }
 
+       spin_unlock(&peer->rtt_input_lock);
+
        /* Now recalculate the average */
        if (usage == RXRPC_RTT_CACHE_SIZE) {
                avg = sum / RXRPC_RTT_CACHE_SIZE;
@@ -320,6 +324,7 @@ void rxrpc_peer_add_rtt(struct rxrpc_call *call, enum rxrpc_rtt_rx_trace why,
                do_div(avg, usage);
        }
 
+       /* Don't need to update this under lock */
        peer->rtt = avg;
        trace_rxrpc_rtt_rx(call, why, send_serial, resp_serial, rtt,
                           usage, avg);
index 2d39eaf19620ec6a36e9303d50986b41ed8fcbe1..5691b7d266ca0aaef3a5b2da30d64891e644f0f5 100644 (file)
@@ -225,6 +225,7 @@ struct rxrpc_peer *rxrpc_alloc_peer(struct rxrpc_local *local, gfp_t gfp)
                peer->service_conns = RB_ROOT;
                seqlock_init(&peer->service_conn_lock);
                spin_lock_init(&peer->lock);
+               spin_lock_init(&peer->rtt_input_lock);
                peer->debug_id = atomic_inc_return(&rxrpc_debug_id);
 
                if (RXRPC_TX_SMSS > 2190)