xsk: fix XDP socket ring buffer memory ordering
authorMagnus Karlsson <magnus.karlsson@intel.com>
Tue, 16 Apr 2019 12:58:08 +0000 (14:58 +0200)
committerAlexei Starovoitov <ast@kernel.org>
Wed, 17 Apr 2019 03:13:10 +0000 (20:13 -0700)
The ring buffer code of XDP sockets is missing a memory barrier on the
consumer side between the load of the data and the write that signals
that it is ok for the producer to put new data into the buffer. On
architectures that does not guarantee that stores are not reordered
with older loads, the producer might put data into the ring before the
consumer had the chance to read it. As IA does guarantee this
ordering, it would only need a compiler barrier here, but there are no
primitives in Linux for this specific case (hinder writes to be ordered
before older reads) so I had to add a smp_mb() here which will
translate into a run-time synch operation on IA.

Added a longish comment in the code explaining what each barrier in
the ring implementation accomplishes and what would happen if we
removed one of them.

Signed-off-by: Magnus Karlsson <magnus.karlsson@intel.com>
Signed-off-by: Alexei Starovoitov <ast@kernel.org>
net/xdp/xsk_queue.h

index 610c0bdc0c2b883d08e706bbd8b0ce3109b09d1e..88b9ae24658d1efd39286d9a42357efe3b25ee42 100644 (file)
@@ -43,6 +43,48 @@ struct xsk_queue {
        u64 invalid_descs;
 };
 
+/* The structure of the shared state of the rings are the same as the
+ * ring buffer in kernel/events/ring_buffer.c. For the Rx and completion
+ * ring, the kernel is the producer and user space is the consumer. For
+ * the Tx and fill rings, the kernel is the consumer and user space is
+ * the producer.
+ *
+ * producer                         consumer
+ *
+ * if (LOAD ->consumer) {           LOAD ->producer
+ *                    (A)           smp_rmb()       (C)
+ *    STORE $data                   LOAD $data
+ *    smp_wmb()       (B)           smp_mb()        (D)
+ *    STORE ->producer              STORE ->consumer
+ * }
+ *
+ * (A) pairs with (D), and (B) pairs with (C).
+ *
+ * Starting with (B), it protects the data from being written after
+ * the producer pointer. If this barrier was missing, the consumer
+ * could observe the producer pointer being set and thus load the data
+ * before the producer has written the new data. The consumer would in
+ * this case load the old data.
+ *
+ * (C) protects the consumer from speculatively loading the data before
+ * the producer pointer actually has been read. If we do not have this
+ * barrier, some architectures could load old data as speculative loads
+ * are not discarded as the CPU does not know there is a dependency
+ * between ->producer and data.
+ *
+ * (A) is a control dependency that separates the load of ->consumer
+ * from the stores of $data. In case ->consumer indicates there is no
+ * room in the buffer to store $data we do not. So no barrier is needed.
+ *
+ * (D) protects the load of the data to be observed to happen after the
+ * store of the consumer pointer. If we did not have this memory
+ * barrier, the producer could observe the consumer pointer being set
+ * and overwrite the data with a new value before the consumer got the
+ * chance to read the old value. The consumer would thus miss reading
+ * the old entry and very likely read the new entry twice, once right
+ * now and again after circling through the ring.
+ */
+
 /* Common functions operating for both RXTX and umem queues */
 
 static inline u64 xskq_nb_invalid_descs(struct xsk_queue *q)
@@ -106,6 +148,7 @@ static inline u64 *xskq_validate_addr(struct xsk_queue *q, u64 *addr)
 static inline u64 *xskq_peek_addr(struct xsk_queue *q, u64 *addr)
 {
        if (q->cons_tail == q->cons_head) {
+               smp_mb(); /* D, matches A */
                WRITE_ONCE(q->ring->consumer, q->cons_tail);
                q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE);
 
@@ -128,10 +171,11 @@ static inline int xskq_produce_addr(struct xsk_queue *q, u64 addr)
        if (xskq_nb_free(q, q->prod_tail, 1) == 0)
                return -ENOSPC;
 
+       /* A, matches D */
        ring->desc[q->prod_tail++ & q->ring_mask] = addr;
 
        /* Order producer and data */
-       smp_wmb();
+       smp_wmb(); /* B, matches C */
 
        WRITE_ONCE(q->ring->producer, q->prod_tail);
        return 0;
@@ -144,6 +188,7 @@ static inline int xskq_produce_addr_lazy(struct xsk_queue *q, u64 addr)
        if (xskq_nb_free(q, q->prod_head, LAZY_UPDATE_THRESHOLD) == 0)
                return -ENOSPC;
 
+       /* A, matches D */
        ring->desc[q->prod_head++ & q->ring_mask] = addr;
        return 0;
 }
@@ -152,7 +197,7 @@ static inline void xskq_produce_flush_addr_n(struct xsk_queue *q,
                                             u32 nb_entries)
 {
        /* Order producer and data */
-       smp_wmb();
+       smp_wmb(); /* B, matches C */
 
        q->prod_tail += nb_entries;
        WRITE_ONCE(q->ring->producer, q->prod_tail);
@@ -163,6 +208,7 @@ static inline int xskq_reserve_addr(struct xsk_queue *q)
        if (xskq_nb_free(q, q->prod_head, 1) == 0)
                return -ENOSPC;
 
+       /* A, matches D */
        q->prod_head++;
        return 0;
 }
@@ -204,11 +250,12 @@ static inline struct xdp_desc *xskq_peek_desc(struct xsk_queue *q,
                                              struct xdp_desc *desc)
 {
        if (q->cons_tail == q->cons_head) {
+               smp_mb(); /* D, matches A */
                WRITE_ONCE(q->ring->consumer, q->cons_tail);
                q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE);
 
                /* Order consumer and data */
-               smp_rmb();
+               smp_rmb(); /* C, matches B */
        }
 
        return xskq_validate_desc(q, desc);
@@ -228,6 +275,7 @@ static inline int xskq_produce_batch_desc(struct xsk_queue *q,
        if (xskq_nb_free(q, q->prod_head, 1) == 0)
                return -ENOSPC;
 
+       /* A, matches D */
        idx = (q->prod_head++) & q->ring_mask;
        ring->desc[idx].addr = addr;
        ring->desc[idx].len = len;
@@ -238,7 +286,7 @@ static inline int xskq_produce_batch_desc(struct xsk_queue *q,
 static inline void xskq_produce_flush_desc(struct xsk_queue *q)
 {
        /* Order producer and data */
-       smp_wmb();
+       smp_wmb(); /* B, matches C */
 
        q->prod_tail = q->prod_head,
        WRITE_ONCE(q->ring->producer, q->prod_tail);