From 550aebfe1c573518c35ae85d6ffbdc2d44c92703 Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Fri, 14 Sep 2018 14:32:45 -0400 Subject: [PATCH] SUNRPC: Allow AF_LOCAL sockets to use the generic stream receive Signed-off-by: Trond Myklebust --- include/linux/sunrpc/xdr.h | 1 - net/sunrpc/socklib.c | 4 +- net/sunrpc/xprtsock.c | 137 +++++-------------------------------- 3 files changed, 18 insertions(+), 124 deletions(-) diff --git a/include/linux/sunrpc/xdr.h b/include/linux/sunrpc/xdr.h index 745587132a87..8815be7cae72 100644 --- a/include/linux/sunrpc/xdr.h +++ b/include/linux/sunrpc/xdr.h @@ -185,7 +185,6 @@ struct xdr_skb_reader { typedef size_t (*xdr_skb_read_actor)(struct xdr_skb_reader *desc, void *to, size_t len); -size_t xdr_skb_read_bits(struct xdr_skb_reader *desc, void *to, size_t len); extern int csum_partial_copy_to_xdr(struct xdr_buf *, struct sk_buff *); extern ssize_t xdr_partial_copy_from_skb(struct xdr_buf *, unsigned int, struct xdr_skb_reader *, xdr_skb_read_actor); diff --git a/net/sunrpc/socklib.c b/net/sunrpc/socklib.c index 08f00a98151f..0e7c0dee7578 100644 --- a/net/sunrpc/socklib.c +++ b/net/sunrpc/socklib.c @@ -26,7 +26,8 @@ * Possibly called several times to iterate over an sk_buff and copy * data out of it. */ -size_t xdr_skb_read_bits(struct xdr_skb_reader *desc, void *to, size_t len) +static size_t +xdr_skb_read_bits(struct xdr_skb_reader *desc, void *to, size_t len) { if (len > desc->count) len = desc->count; @@ -36,7 +37,6 @@ size_t xdr_skb_read_bits(struct xdr_skb_reader *desc, void *to, size_t len) desc->offset += len; return len; } -EXPORT_SYMBOL_GPL(xdr_skb_read_bits); /** * xdr_skb_read_and_csum_bits - copy and checksum from skb to buffer diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 55df1fadab27..90d4c92177b7 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -670,6 +670,17 @@ static void xs_stream_data_receive_workfn(struct work_struct *work) xs_stream_data_receive(transport); } +static void +xs_stream_reset_connect(struct sock_xprt *transport) +{ + transport->recv.offset = 0; + transport->recv.len = 0; + transport->recv.copied = 0; + transport->xmit.offset = 0; + transport->xprt.stat.connect_count++; + transport->xprt.stat.connect_start = jiffies; +} + #define XS_SENDMSG_FLAGS (MSG_DONTWAIT | MSG_NOSIGNAL) static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen, struct kvec *vec, unsigned int base, int more) @@ -1266,114 +1277,6 @@ static void xs_destroy(struct rpc_xprt *xprt) module_put(THIS_MODULE); } -static int xs_local_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb) -{ - struct xdr_skb_reader desc = { - .skb = skb, - .offset = sizeof(rpc_fraghdr), - .count = skb->len - sizeof(rpc_fraghdr), - }; - - if (xdr_partial_copy_from_skb(xdr, 0, &desc, xdr_skb_read_bits) < 0) - return -1; - if (desc.count) - return -1; - return 0; -} - -/** - * xs_local_data_read_skb - * @xprt: transport - * @sk: socket - * @skb: skbuff - * - * Currently this assumes we can read the whole reply in a single gulp. - */ -static void xs_local_data_read_skb(struct rpc_xprt *xprt, - struct sock *sk, - struct sk_buff *skb) -{ - struct rpc_task *task; - struct rpc_rqst *rovr; - int repsize, copied; - u32 _xid; - __be32 *xp; - - repsize = skb->len - sizeof(rpc_fraghdr); - if (repsize < 4) { - dprintk("RPC: impossible RPC reply size %d\n", repsize); - return; - } - - /* Copy the XID from the skb... */ - xp = skb_header_pointer(skb, sizeof(rpc_fraghdr), sizeof(_xid), &_xid); - if (xp == NULL) - return; - - /* Look up and lock the request corresponding to the given XID */ - spin_lock(&xprt->queue_lock); - rovr = xprt_lookup_rqst(xprt, *xp); - if (!rovr) - goto out_unlock; - xprt_pin_rqst(rovr); - spin_unlock(&xprt->queue_lock); - task = rovr->rq_task; - - copied = rovr->rq_private_buf.buflen; - if (copied > repsize) - copied = repsize; - - if (xs_local_copy_to_xdr(&rovr->rq_private_buf, skb)) { - dprintk("RPC: sk_buff copy failed\n"); - spin_lock(&xprt->queue_lock); - goto out_unpin; - } - - spin_lock(&xprt->queue_lock); - xprt_complete_rqst(task, copied); -out_unpin: - xprt_unpin_rqst(rovr); - out_unlock: - spin_unlock(&xprt->queue_lock); -} - -static void xs_local_data_receive(struct sock_xprt *transport) -{ - struct sk_buff *skb; - struct sock *sk; - int err; - -restart: - mutex_lock(&transport->recv_mutex); - sk = transport->inet; - if (sk == NULL) - goto out; - for (;;) { - skb = skb_recv_datagram(sk, 0, 1, &err); - if (skb != NULL) { - xs_local_data_read_skb(&transport->xprt, sk, skb); - skb_free_datagram(sk, skb); - continue; - } - if (!test_and_clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state)) - break; - if (need_resched()) { - mutex_unlock(&transport->recv_mutex); - cond_resched(); - goto restart; - } - } -out: - mutex_unlock(&transport->recv_mutex); -} - -static void xs_local_data_receive_workfn(struct work_struct *work) -{ - struct sock_xprt *transport = - container_of(work, struct sock_xprt, recv_worker); - xs_local_data_receive(transport); -} - /** * xs_udp_data_read_skb - receive callback for UDP sockets * @xprt: transport @@ -1974,11 +1877,8 @@ static int xs_local_finish_connecting(struct rpc_xprt *xprt, write_unlock_bh(&sk->sk_callback_lock); } - transport->xmit.offset = 0; + xs_stream_reset_connect(transport); - /* Tell the socket layer to start connecting... */ - xprt->stat.connect_count++; - xprt->stat.connect_start = jiffies; return kernel_connect(sock, xs_addr(xprt), xprt->addrlen, 0); } @@ -2335,14 +2235,9 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) xs_set_memalloc(xprt); /* Reset TCP record info */ - transport->recv.offset = 0; - transport->recv.len = 0; - transport->recv.copied = 0; - transport->xmit.offset = 0; + xs_stream_reset_connect(transport); /* Tell the socket layer to start connecting... */ - xprt->stat.connect_count++; - xprt->stat.connect_start = jiffies; set_bit(XPRT_SOCK_CONNECTING, &transport->sock_state); ret = kernel_connect(sock, xs_addr(xprt), xprt->addrlen, O_NONBLOCK); switch (ret) { @@ -2717,6 +2612,7 @@ static const struct rpc_xprt_ops xs_local_ops = { .connect = xs_local_connect, .buf_alloc = rpc_malloc, .buf_free = rpc_free, + .prepare_request = xs_stream_prepare_request, .send_request = xs_local_send_request, .set_retrans_timeout = xprt_set_retrans_timeout_def, .close = xs_close, @@ -2901,9 +2797,8 @@ static struct rpc_xprt *xs_setup_local(struct xprt_create *args) xprt->ops = &xs_local_ops; xprt->timeout = &xs_local_default_timeout; - INIT_WORK(&transport->recv_worker, xs_local_data_receive_workfn); - INIT_DELAYED_WORK(&transport->connect_worker, - xs_dummy_setup_socket); + INIT_WORK(&transport->recv_worker, xs_stream_data_receive_workfn); + INIT_DELAYED_WORK(&transport->connect_worker, xs_dummy_setup_socket); switch (sun->sun_family) { case AF_LOCAL: -- 2.30.2