diff options
author | Davidlohr Bueso <dave@stgolabs.net> | 2015-06-30 14:58:39 -0700 |
---|---|---|
committer | Linus Torvalds <torvalds@linux-foundation.org> | 2015-06-30 19:44:58 -0700 |
commit | ff35e5ef86fea1fa84eb7fdc939d0b1e3f1222bf (patch) | |
tree | dcbd89deb61c178925663ce5ad0457f232715a23 /ipc | |
parent | c5c8975b2eb4eb7604e8ce4f762987f56d2a96a2 (diff) |
ipc,msg: provide barrier pairings for lockless receive
We currently use a full barrier on the sender side to to avoid receiver
tasks disappearing on us while still performing on the sender side wakeup.
We lack however, the proper CPU-CPU interactions pairing on the receiver
side which busy-waits for the message. Similarly, we do not need a full
smp_mb, and can relax the semantics for the writer and reader sides of the
message. This is safe as we are only ordering loads and stores to r_msg.
And in both smp_wmb and smp_rmb, there are no stores after the calls
_anyway_.
This obviously applies for pipelined_send and expunge_all, for EIRDM when
destroying a queue.
Signed-off-by: Davidlohr Bueso <dbueso@suse.de>
Cc: Manfred Spraul <manfred@colorfullife.com>
Signed-off-by: Andrew Morton <akpm@linux-foundation.org>
Signed-off-by: Linus Torvalds <torvalds@linux-foundation.org>
Diffstat (limited to 'ipc')
-rw-r--r-- | ipc/msg.c | 48 |
1 files changed, 38 insertions, 10 deletions
diff --git a/ipc/msg.c b/ipc/msg.c index 2b6fdbb9e0e9..a9c3c519490a 100644 --- a/ipc/msg.c +++ b/ipc/msg.c @@ -196,7 +196,7 @@ static void expunge_all(struct msg_queue *msq, int res) * or dealing with -EAGAIN cases. See lockless receive part 1 * and 2 in do_msgrcv(). */ - smp_mb(); + smp_wmb(); /* barrier (B) */ msr->r_msg = ERR_PTR(res); } } @@ -580,7 +580,8 @@ static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg) /* initialize pipelined send ordering */ msr->r_msg = NULL; wake_up_process(msr->r_tsk); - smp_mb(); /* see barrier comment below */ + /* barrier (B) see barrier comment below */ + smp_wmb(); msr->r_msg = ERR_PTR(-E2BIG); } else { msr->r_msg = NULL; @@ -589,11 +590,12 @@ static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg) wake_up_process(msr->r_tsk); /* * Ensure that the wakeup is visible before - * setting r_msg, as the receiving end depends - * on it. See lockless receive part 1 and 2 in - * do_msgrcv(). + * setting r_msg, as the receiving can otherwise + * exit - once r_msg is set, the receiver can + * continue. See lockless receive part 1 and 2 + * in do_msgrcv(). Barrier (B). */ - smp_mb(); + smp_wmb(); msr->r_msg = msg; return 1; @@ -932,12 +934,38 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgfl /* Lockless receive, part 2: * Wait until pipelined_send or expunge_all are outside of * wake_up_process(). There is a race with exit(), see - * ipc/mqueue.c for the details. + * ipc/mqueue.c for the details. The correct serialization + * ensures that a receiver cannot continue without the wakeup + * being visibible _before_ setting r_msg: + * + * CPU 0 CPU 1 + * <loop receiver> + * smp_rmb(); (A) <-- pair -. <waker thread> + * <load ->r_msg> | msr->r_msg = NULL; + * | wake_up_process(); + * <continue> `------> smp_wmb(); (B) + * msr->r_msg = msg; + * + * Where (A) orders the message value read and where (B) orders + * the write to the r_msg -- done in both pipelined_send and + * expunge_all. */ - msg = (struct msg_msg *)msr_d.r_msg; - while (msg == NULL) { - cpu_relax(); + for (;;) { + /* + * Pairs with writer barrier in pipelined_send + * or expunge_all. + */ + smp_rmb(); /* barrier (A) */ msg = (struct msg_msg *)msr_d.r_msg; + if (msg) + break; + + /* + * The cpu_relax() call is a compiler barrier + * which forces everything in this loop to be + * re-loaded. + */ + cpu_relax(); } /* Lockless receive, part 3: |