Crossbeam's bounded MPMC is genius (and here's why)


Intro

So I was writing some multithreaded C++ code for a uni project recently (well, recently when this article was drafted; it's now a year ago, I have ADHD okay), and I was really pining for some MPMC channels. My years of writing Rust have made me reach for these concurrency primitives almost instinctively, and going back to a language without them really hurt. What's worse, I couldn't find any good lightweight libraries that provided a nice interface similar to Rust's crossbeam library (which is also the basis of the standard library's mpmc module).

So I ended up writing my own, calling it transbeam, and looking at crossbeam's source code to find out what clever things it was doing. Turns out it was quite a few.

I will only be looking at the bounded MPMC channel in this article, though the unbounded one also does some clever things, like block allocation (it's a linked list of blocks).

Okay but what is an MPMC

MPMC stands for "Multi-Producer Multi-Consumer"; it is a data structure for facilitating communication between multiple threads. Multi-producer means multiple threads can put data into it simultaneously, and multi-consumer means multiple threads can pull data out simultaneously. If you're familiar with Go, its channels are MPMC channels.

Bounded MPMC structures are naturally expressed as ring buffers, with "channels" just being an abstraction on top that places restrictions on whether a given object is a "reader" or a "writer" of the channel.
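Concretely, the skeleton of such a structure might look like this (a minimal sketch with made-up names, just to fix the shape in your head):

#include <atomic>
#include <cstddef>
#include <vector>

// Bare-bones shape of a bounded channel: a fixed block of slots plus two
// atomic positions. The rest of this article is about what extra state the
// slots and heads need so many readers and writers can touch them at once.
template <typename T>
struct BoundedChannel {
    explicit BoundedChannel(std::size_t capacity)
        : slots(capacity), read(0), write(0) {}

    bool try_send(const T& value); // any thread may call these two
    bool try_recv(T& out);         // concurrently with any other

    std::vector<T> slots;            // ring storage, indexed modulo capacity
    std::atomic<std::size_t> read;   // position of the next read
    std::atomic<std::size_t> write;  // position of the next write
};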

Let’s try some implementations

Let's go over a few naive designs to see why they fail before getting into it. If you're already familiar with this kind of thing you can probably skip this section.

Two pointers

What if we just had a read pointer and a write pointer, like a traditional single threaded ring buffer?

Unfortunately this falls over quickly in an MPMC context, consider:

read: 0
write: 0
--- do a write
write: 1
--- write thread gets slept after incrementing but before writing
--- Our read sees that write is one ahead so reads out of the slot
read: 1
--- now write wakes up and kaboom, race condition (or it doesn't wake and we read uninitialised/stale memory)

You might think we can fix this by simply incrementing the write head after the write completes, but, because it is multi-producer, this would let two writers observe the same write head, claim the same slot, and race with each other.

Naive three pointer

OK, so we need a guard pointer to prevent the read head advancing while still allowing multiple writes to happen at once:

read: 0
write: 0
end: 0
--- do a write
write: 1
--- write thread is slept but read sees that end is the same so it doesn't do anything

--- now write finishes
end: 1
--- and read can continue safely

This works, but we have to be careful how we increment `end`. Consider:

read: 0
write: 0
end: 0
--- two threads start writing at once A and B
write: 1 -- A claims slot 0
write: 2 -- B claims slot 1
--- now A is slept and B completes before it, then increments `end` (or sets it to its write)
end: 1
--- uh oh read comes along, A is still slept,
--- read sees that end is incremented aaand we're back to the two-pointer version

Therefore we need to compare-and-swap `end` and ensure its value is `write - 1` before updating it (i.e. we wait for all writers up to us to finish before we do). This is not ideal: writers end up serialising on the publish step.
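In code, that publish step might look something like this (a hypothetical sketch of the naive scheme, not anything crossbeam actually does):

#include <atomic>
#include <cstddef>

std::atomic<std::size_t> end_; // guard: slots strictly before `end_` are readable

// After a writer has claimed slot `claimed` (by incrementing the write head)
// and finished storing its data, it must wait for every earlier writer to
// publish before it can move `end_` past its own slot.
void publish(std::size_t claimed) {
    std::size_t expected = claimed;
    while (!end_.compare_exchange_weak(expected, claimed + 1,
                                       std::memory_order_release,
                                       std::memory_order_relaxed)) {
        expected = claimed; // an earlier writer hasn't published yet; spin
    }
}

Every writer spins until all writers before it have published, so one slow (or descheduled) writer stalls every later publish, which is exactly what crossbeam's per-slot stamps avoid.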

Crossbeam does smart stuff

Let's take a look at how crossbeam handles this, because it's really, really cool. It uses bit packing to store a lap + flag + index in a single word, keeping one such "stamp" per slot and using the same encoding for the read and write heads.

Basics

The lap is used to determine whether two markers are on the same "lap" around the ring. The flag is used only on the write head, to mark that the channel has been disconnected, and the rest of the bits are the index. e.g. "0011" is lap 0, flag 0, index 3 ("11"). This encoding is very useful because it allows interacting with a slot through a single atomic operation, which is not only technically faster but, more importantly, has very nice ordering properties.

It also uses a rather clever system for marking when a slot is safe to read or write. If the stamp is equal to the head + 1 then the slot has been written to and can be read; otherwise, if the stamp is equal to the head, then it can be written to. Note that this + 1 can never overflow, because of the flag bit (which can never be 1 on an active head, leaving headroom for the carry).
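Here's roughly what that packing looks like in code (my own made-up names, following the simplified layout used in the walkthrough below; crossbeam's real constants are shaped slightly differently):

#include <bit>     // std::bit_ceil (C++20)
#include <cstddef>

constexpr std::size_t cap      = 4;                  // number of slots
constexpr std::size_t flag_bit = std::bit_ceil(cap); // the bit just above the index bits
constexpr std::size_t one_lap  = flag_bit * 2;       // adding this increments the lap

constexpr std::size_t index_of(std::size_t v) { return v & (flag_bit - 1); }
constexpr std::size_t lap_of(std::size_t v)   { return v & ~(one_lap - 1); }

// The two conditions described above, for a head value and the stamp of the
// slot that the head points at:
constexpr bool can_read(std::size_t stamp, std::size_t head)  { return stamp == head + 1; }
constexpr bool can_write(std::size_t stamp, std::size_t head) { return stamp == head; }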

Working through it

Let's look at a simplified version of this packing that only uses a few bits (the real one is a full usize).

Consider this initial state:

slots: [
    [0 0 00]
    [0 0 01]
    [0 0 10]
    [0 0 11]
]
read: 0 0 00
write: 0 0 00

Let's attempt a read operation. We load the read head, "0 0 00", and extract the index 00 and lap 0; call this loaded head value our "signet". Now we compare it to the stamp of the slot at index 0, which is also 0 0 00. We need the stamp to be our signet + 1 (i.e. "0 0 01") but it's not, so we check whether the stamp is equal to our signet, which it is. This means we have caught up with the writers, so there are (probably) no items left. We still check the write head however, because if it is not equal to us then we know a writer is currently in the process of writing to this slot, so we back off and wait for it to complete (this ensures that even if the write thread gets slept, reads ordered chronologically after it still succeed). If it is equal, we simply return failure, as you cannot pull from an empty channel.
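In code, this read-side check might look something like the following (reusing the made-up helpers from earlier; a paraphrase of the logic, not crossbeam's actual code):

#include <atomic>
#include <cstddef>

enum class ReadStatus { Ready, Empty, Retry };

// `head` is our signet (the read-head value we loaded), `slot_stamp` is the
// stamp of the slot that `head` indexes, and `write_` is the write head.
ReadStatus check_read(std::size_t head,
                      const std::atomic<std::size_t>& slot_stamp,
                      const std::atomic<std::size_t>& write_) {
    const std::size_t stamp = slot_stamp.load(std::memory_order_acquire);

    if (stamp == head + 1)
        return ReadStatus::Ready; // written this lap: go claim it (see below)

    if (stamp == head) {
        // Nothing written here yet, but has a write *started*? The fence
        // stops the write-head load being reordered before the stamp load.
        std::atomic_thread_fence(std::memory_order_seq_cst);
        const std::size_t wr = write_.load(std::memory_order_relaxed);
        if (wr == head)
            return ReadStatus::Empty; // the write head is with us: truly empty
        return ReadStatus::Retry;     // a writer has claimed this slot; back off
    }

    return ReadStatus::Retry; // our signet is stale; reload the read head
}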

OK, so our read at position 0 fails like it should. Now let's see what a write does:

We load the write head ("0 0 00") and compare it to the stamp on slot 0, and it matches! Now we can write to this slot, but we have to be careful not to race with any reads: if we update the stamp on this slot before we have fully written to it, a read may try to access the data before we write it. Therefore we first increment the write head (to "0 0 01") using compare-and-swap, then we write the data into the slot, and only then do we update the stamp to say we're done. This is why we back off in the read case: the read thread may observe the update to the write head before the update to the stamp (e.g. because we got slept by the OS). The read thread also uses a seq_cst fence to ensure its load of the write head cannot be reordered before its load of the stamp. Anyway, we finally update the stamp, setting it to 0 0 01.
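Sketched in code (same caveats and made-up helpers as before), the write path is:

#include <atomic>
#include <cstddef>

struct Slot {
    std::atomic<std::size_t> stamp;
    int value; // stand-in payload; a real channel holds uninitialised T storage
};

// `head` is the write-head value we loaded and matched against `slot.stamp`.
bool try_write(std::atomic<std::size_t>& write_, Slot& slot,
               std::size_t head, int value) {
    // 1. Claim the slot by advancing the write head *first*.
    std::size_t expected = head;
    const std::size_t next = index_of(head) + 1 == cap
                                 ? lap_of(head) + one_lap // wrap: index 0, next lap
                                 : head + 1;
    if (!write_.compare_exchange_strong(expected, next,
                                        std::memory_order_seq_cst,
                                        std::memory_order_relaxed))
        return false; // another writer beat us; reload the head and retry

    // 2. We now own the slot exclusively: store the data...
    slot.value = value;

    // 3. ...and only then publish by bumping the stamp. A reader that saw the
    //    new write head but the old stamp backs off until this store lands.
    slot.stamp.store(head + 1, std::memory_order_release);
    return true;
}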

Now if we try to read again we find the stamp at our signet + 1, hurray! Now we can just read the data and then update the read head... wait! We have to be careful again, because we need to make sure we don't race with other readers either. So now that we know the stamp is valid, the very first thing to do is attempt to claim this slot by moving the read head on with compare-and-swap. If the compare fails then someone beat us to it, so we try again with the new value; otherwise we are clear to read the data out of the slot and then write the stamp back as "1 0 00". Importantly, we write the stamp only after fully reading: if we write it too early we may race with a writer while we read the data out. Note how the reader, unlike the writer, doesn't increment the index, instead incrementing the lap, which blocks other readers (but allows writers).
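And the matching read path, once the stamp has been validated (same disclaimers, continuing the sketch above):

#include <atomic>
#include <cstddef>

// `head` is our signet, already checked to satisfy stamp == head + 1.
bool try_read(std::atomic<std::size_t>& read_, Slot& slot,
              std::size_t head, int& out) {
    // 1. Claim the slot by advancing the read head *first*; if the CAS fails,
    //    another reader beat us and the caller retries with the fresh value.
    std::size_t expected = head;
    const std::size_t next = index_of(head) + 1 == cap
                                 ? lap_of(head) + one_lap
                                 : head + 1;
    if (!read_.compare_exchange_strong(expected, next,
                                       std::memory_order_seq_cst,
                                       std::memory_order_relaxed))
        return false;

    // 2. We own the slot: copy the data out...
    out = slot.value;

    // 3. ...then hand it to the writers of the *next* lap: same index bits,
    //    lap incremented. This is the "1 0 00" stamp from the walkthrough.
    slot.stamp.store(head + one_lap, std::memory_order_release);
    return true;
}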

If you managed to follow all that and you're anything like me, you're probably thinking "damn this is so cool". We aren't done though, because what happens when a write head encounters a slot currently being read from?

Let’s say we’ve done a bunch of reads and writes and our structure now looks like this:

slots: [
    [1 0 00]
    [0 0 10]
    [0 0 11]
    [0 1 00]
]
read: 0 0 01
write: 1 0 00

Write head has gone through:

0 0 00
0 0 01
0 0 10
0 0 11
1 0 00

Now we try to write to slot 0; this works (the stamp matches our signet) and gives us:

slots: [
    [1 0 01]
    [0 0 10]
    [0 0 11]
    [0 1 00]
]
read: 0 0 01
write: 1 0 01

OK, now we try another write, to slot 1, and oh… our signet doesn't match. Our signet + 1 ("1 0 10") does match the stamp + one lap (0 0 10 + 1 0 00 = 1 0 10), which means this slot must have been written on the last lap and hasn't yet been read from. Now we do the mirror of the read thread's action when it encounters a write in progress: we put a fence down to prevent our load of the stamp from time-travelling, and then we load the read head:

// Make sure the stamp load above is ordered before this read-head load.
std::atomic_thread_fence(std::memory_order_seq_cst);
const auto rd = read_.load(std::memory_order_relaxed);

If it is behind by exactly one lap, we know that no reads are currently happening and we have to fail, since this is a fixed-size data structure.
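Continuing the snippet (my guess at the surrounding code, where `wr` is the write-head value we loaded at the start of the attempt):

// If the read head is exactly one lap behind us, every slot is still occupied
// and no reader has claimed one: the channel is genuinely full.
if (rd + one_lap == wr) {
    return false; // full (a blocking send would park here instead)
}
// Otherwise a reader has claimed this slot and is mid-copy; back off and retry.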

Conclusion

In conclusion: crossbeam is fucking cool, y'all!! I'm glad I finally got round to going back over this and getting it into a state to publish. Hope you enjoyed the read, and mayhaps even learned some stuff about threading.