afs: Fix race in async call refcounting
authorDavid Howells <dhowells@redhat.com>
Thu, 10 Jan 2019 15:40:50 +0000 (15:40 +0000)
committerDavid Howells <dhowells@redhat.com>
Thu, 17 Jan 2019 15:17:28 +0000 (15:17 +0000)
There's a race between afs_make_call() and afs_wake_up_async_call() in the
case that an error is returned from rxrpc_kernel_send_data() after it has
queued the final packet.

afs_make_call() will try and clean up the mess, but the call state may have
been moved on thereby causing afs_process_async_call() to also try and to
delete the call.

Fix this by:

 (1) Getting an extra ref for an asynchronous call for the call itself to
     hold.  This makes sure the call doesn't evaporate on us accidentally
     and will allow the call to be retained by the caller in a future
     patch.  The ref is released on leaving afs_make_call() or
     afs_wait_for_call_to_complete().

 (2) In the event of an error from rxrpc_kernel_send_data():

     (a) Don't set the call state to AFS_CALL_COMPLETE until *after* the
       call has been aborted and ended.  This prevents
       afs_deliver_to_call() from doing anything with any notifications
       it gets.

     (b) Explicitly end the call immediately to prevent further callbacks.

     (c) Cancel any queued async_work and wait for the work if it's
       executing.  This allows us to be sure the race won't recur when we
       change the state.  We put the work queue's ref on the call if we
       managed to cancel it.

     (d) Put the call's ref that we got in (1).  This belongs to us as long
       as the call is in state AFS_CALL_CL_REQUESTING.

Fixes: 341f741f04be ("afs: Refcount the afs_call struct")
Signed-off-by: David Howells <dhowells@redhat.com>
fs/afs/rxrpc.c
include/trace/events/afs.h

index 4830e0a6bf1d1ebdf7f036ac13b7de16e96dea43..2c588f9bbbda226ec64fa0670e9c92c700f259e6 100644 (file)
@@ -23,6 +23,7 @@ struct workqueue_struct *afs_async_calls;
 static void afs_wake_up_call_waiter(struct sock *, struct rxrpc_call *, unsigned long);
 static long afs_wait_for_call_to_complete(struct afs_call *, struct afs_addr_cursor *);
 static void afs_wake_up_async_call(struct sock *, struct rxrpc_call *, unsigned long);
+static void afs_delete_async_call(struct work_struct *);
 static void afs_process_async_call(struct work_struct *);
 static void afs_rx_new_call(struct sock *, struct rxrpc_call *, unsigned long);
 static void afs_rx_discard_new_call(struct rxrpc_call *, unsigned long);
@@ -404,6 +405,12 @@ long afs_make_call(struct afs_addr_cursor *ac, struct afs_call *call,
                }
        }
 
+       /* If the call is going to be asynchronous, we need an extra ref for
+        * the call to hold itself so the caller need not hang on to its ref.
+        */
+       if (call->async)
+               afs_get_call(call, afs_call_trace_get);
+
        /* create a call */
        rxcall = rxrpc_kernel_begin_call(call->net->socket, srx, call->key,
                                         (unsigned long)call,
@@ -444,15 +451,17 @@ long afs_make_call(struct afs_addr_cursor *ac, struct afs_call *call,
                        goto error_do_abort;
        }
 
-       /* at this point, an async call may no longer exist as it may have
-        * already completed */
-       if (call->async)
+       /* Note that at this point, we may have received the reply or an abort
+        * - and an asynchronous call may already have completed.
+        */
+       if (call->async) {
+               afs_put_call(call);
                return -EINPROGRESS;
+       }
 
        return afs_wait_for_call_to_complete(call, ac);
 
 error_do_abort:
-       call->state = AFS_CALL_COMPLETE;
        if (ret != -ECONNABORTED) {
                rxrpc_kernel_abort_call(call->net->socket, rxcall,
                                        RX_USER_ABORT, ret, "KSD");
@@ -469,8 +478,24 @@ error_do_abort:
 error_kill_call:
        if (call->type->done)
                call->type->done(call);
-       afs_put_call(call);
+
+       /* We need to dispose of the extra ref we grabbed for an async call.
+        * The call, however, might be queued on afs_async_calls and we need to
+        * make sure we don't get any more notifications that might requeue it.
+        */
+       if (call->rxcall) {
+               rxrpc_kernel_end_call(call->net->socket, call->rxcall);
+               call->rxcall = NULL;
+       }
+       if (call->async) {
+               if (cancel_work_sync(&call->async_work))
+                       afs_put_call(call);
+               afs_put_call(call);
+       }
+
        ac->error = ret;
+       call->state = AFS_CALL_COMPLETE;
+       afs_put_call(call);
        _leave(" = %d", ret);
        return ret;
 }
index 33d291888ba9c1ceb6edc3cbcd9db4467633939e..e3f005eae1f7679f9b30dcb8b5d0e5d7032c7ebf 100644 (file)
@@ -25,6 +25,7 @@
 enum afs_call_trace {
        afs_call_trace_alloc,
        afs_call_trace_free,
+       afs_call_trace_get,
        afs_call_trace_put,
        afs_call_trace_wake,
        afs_call_trace_work,
@@ -159,6 +160,7 @@ enum afs_file_error {
 #define afs_call_traces \
        EM(afs_call_trace_alloc,                "ALLOC") \
        EM(afs_call_trace_free,                 "FREE ") \
+       EM(afs_call_trace_get,                  "GET  ") \
        EM(afs_call_trace_put,                  "PUT  ") \
        EM(afs_call_trace_wake,                 "WAKE ") \
        E_(afs_call_trace_work,                 "WORK ")