Code that counts

With a title like this, you might expect this little book to be a philosophical or sociological exploration of how to write programs that are meaningful to yourself, others, or humanity as a whole.

However, while that would certainly be an interesting topic to write about, and I encourage you to do so and send me back a link if you think you can give it a good shot, that’s actually not the kind of subject that I personally feel qualified or compelled to cover. You see, I work in academia, and it is a well-known fact that in academia, our main area of expertise is writing meaningless code.

So instead, I’m going to explore optimization techniques for compute-bound code by going through the meaningless exercise of counting from zero to a certain limit N as quickly as possible.

To make this interesting, I will only allow myself to use the integer constants 0 and 1. Skipping steps in the counting, or directly setting an integer to the limit N to declare the job done, is not allowed. And if the compiler decides to produce code that does this, then optimization barriers will need to be added in order to prevent it from doing so. However, parallelization techniques that involve initializing multiple integers to 0 or 1, incrementing all of them, then summing them later on are fair game.

I’m going to go step by step from what would be my first “intuitive” answer to the problem, to what I believe to be the optimal answer on current-generation x86 hardware (as of 2022), and hopefully you’ll learn an interesting thing or two along the way.

Getting started

Let’s start simple and count with the + operator:

#![allow(unused)]
fn main() {
pub fn basic(target: u64) -> u64 {
    let mut current = 0;
    for _ in 0..target {
        current = pessimize::hide(current + 1);
    }
    current
}
}

Here, I use my pessimize crate, which is a fancier version of the recently stabilized std::hint::black_box, to ensure that the compiler doesn’t optimize out the counting by directly returning the end result. This optimization barrier pretends to the compiler that something is reading every intermediate count, which prevents it from skipping ahead.
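If you would rather not pull in an extra crate, a roughly equivalent version can be written with the standard library alone. This is only a sketch: black_box makes weaker promises than what pessimize aims for, but it illustrates the same idea.

use std::hint::black_box;

pub fn basic_std(target: u64) -> u64 {
    let mut current = 0;
    for _ in 0..target {
        // Pretend that something observes every intermediate count
        current = black_box(current + 1);
    }
    current
}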

How fast does this code count? Well, actually, that depends on how far you are counting:

  • For smaller counts, the counting time is dominated by the time it takes to set up and tear down the counting code, which is around one nanosecond on my CPU.
  • For counts of more than about a thousand elements, this overhead becomes negligible with respect to the actual counting costs, and thus we start counting once per CPU cycle, or a little more than 4 billion times per second on my CPU (4.27 billion to be precise).

But to me, the interesting question is: how much faster can we go than this simple code, and at what cost (logical shortcuts, API constraints, code complexity and maintainability…)?

In the following, I’m going to assume that we are interested in optimizing for asymptotic counting throughput when going for large counts. But I’ll still keep track of how many iterations it takes to reach baseline performance and peak performance, just so you see what tradeoffs we make when optimizing for throughput like this.

All the code that I’m going to present will be featured in the “counter” directory of this book’s source code, along with tests and microbenchmarks that allow you to assert that the count is correct and replicate the results discussed here on your own CPU.
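As a flavor of what those tests look like, a minimal correctness check could be written along these lines (a sketch, not the repository’s actual test suite):

#[cfg(test)]
mod tests {
    use super::basic;

    #[test]
    fn basic_counts_correctly() {
        for target in [0, 1, 7, 1_000, 4_096] {
            assert_eq!(basic(target), target);
        }
    }
}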

Instruction-Level Parallelism (ILP)

Reaching the end of the previous chapter, you may think that the naive for loop is optimal: it increments the counter on every clock cycle, so it obviously keeps the CPU busy all the time. However, you would be wrong.

You see, modern CPUs are superscalar, which means that they can do multiple things per clock cycle. In the case of my Zen 2 CPU, you can check over at uops.info that it can actually do four integer increments per cycle. This is called “Instruction Level Parallelism”, or ILP for short. The question, then, is: why do we not observe this form of parallelism in the simple counting loop?

The problem here is that the CPU may only execute multiple instructions at the same time if these instructions are independent from each other. But here, each counter value depends on the previous one, so the counter increments are not independent and must wait for each other. To get instruction-level parallelism, we need to maintain multiple independent counters and spread the counting work across them like this:

#![allow(unused)]
fn main() {
pub fn ilp<const WIDTH: usize>(target: u64) -> u64 {
    assert_ne!(WIDTH, 0, "No progress possible in this configuration");

    // Accumulate in parallel
    let mut counters = [0; WIDTH];
    for _ in 0..(target / WIDTH as u64) {
        for counter in &mut counters {
            *counter = pessimize::hide(*counter + 1);
        }
    }

    // Accumulate remaining elements
    for counter in counters.iter_mut().take(target as usize % WIDTH) {
        *counter = pessimize::hide(*counter + 1);
    }

    // Merge accumulators using parallel reduction
    let mut stride = WIDTH.next_power_of_two() / 2;
    while stride > 0 {
        for i in 0..stride.min(WIDTH - stride) {
            counters[i] += counters[i + stride];
        }
        stride /= 2;
    }
    counters[0]
}
}

Notice that we need extra code at the end to merge the counters and to handle the odd elements that don’t fit into an integer multiple of the number of counters. This will be a recurring theme in the remainder of this book.

Given the above assertion that the CPU can do 4 increments per cycle, you may expect optimal instruction-level performance to be achieved with 4 independent counters. But due to the extra instructions needed to manage the counting loop, and because adding counters forces the compiler to unroll the loop further, which amortizes that overhead, performance improvements are actually observed all the way up to 15 counters. That is the maximum supported by the x86_64 architecture before counters start being spilled from registers to RAM.

With those 15 counters, we observe a peak throughput of 3.6 increments per cycle, a little less than the 4 we would have hoped for but still pretty close.

In terms of scaling with the number of counting iterations, the performance is close to that of sequential counting with only one iteration of parallel counting (15 increments), and beats it with two iterations. Here, we see that instruction-level parallelism is actually a good way to get performance in large loops without sacrificing the performance of smaller loops too much.
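Since the number of counters is a const generic parameter, trying out different widths is just a matter of instantiating the function at several values, along these lines (a sketch of how one might compare them):

// All widths must produce the same count; on Zen 2, 15 streams is fastest
fn compare_ilp_widths(target: u64) {
    assert_eq!(ilp::<4>(target), target);
    assert_eq!(ilp::<15>(target), target);
}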

SIMD

Executing multiple instructions per CPU clock cycle is one thing, but modern CPUs have another trick up their sleeves, which is to execute multiple computations per instruction. The technical jargon for this is “Single Instruction Multiple Data” or SIMD for short.

While one also often speaks of “vectorization”, which is definitely easier to pronounce and remember, I will try to avoid this term here because these days it is most often used to refer to the unrelated idea of re-expressing iterative computations in terms of standardized operations on large arrays, in order to work around inefficient programming language designs and implementations.

SIMD is a broad topic, so I will approach it incrementally:

  • First I will demonstrate the basic concept by taking our baseline counting loop and translating it to simple SIMD as another way of computing multiple integer increments per cycle.
  • Next I will show how SIMD can be productively combined with instruction-level parallelism.
  • After that, I will address the hardware heterogeneity issues that pervade SIMD programming, and tools that can be used to handle this discrepancy.
  • And finally, I will demonstrate SIMD’s sensitivity to data width and ways to make the most of it in the context of this toy counting problem.

Basic SIMD

Let’s start with a simple vector loop that uses the SSE2 instruction set:

#![allow(unused)]
fn main() {
pub fn simd_basic(target: u64) -> u64 {
    use safe_arch::m128i;
    const SIMD_WIDTH: usize = 2;

    // Set up SIMD counters and increment
    let mut simd_counter = m128i::from([0u64; SIMD_WIDTH]);
    let simd_increment = m128i::from([1u64; SIMD_WIDTH]);

    // Accumulate in parallel
    for _ in 0..(target / SIMD_WIDTH as u64) {
        simd_counter = pessimize::hide(safe_arch::add_i64_m128i(simd_counter, simd_increment));
    }

    // Merge the SIMD counters into a scalar counter
    let counters: [u64; SIMD_WIDTH] = simd_counter.into();
    let mut counter = counters.iter().sum();

    // Accumulate trailing element, if any
    counter = pessimize::hide(counter + target % SIMD_WIDTH as u64);
    counter
}
}

You may notice strong similarities with the instruction-level parallelism approach that I used earlier, except this time, instead of nice arrays, I am using a weird x86-specific m128i type that comes with its own weird set of operations. And that is in spite of me using the safe_arch Rust crate, which already has friendlier API conventions than the underlying hardware intrinsics.

If you can bear with the weirdness, however, this approach works slightly better than two-way instruction level parallelism by virtue of generating a simpler instruction stream that the CPU has an easier time parsing through, and also leaving the CPU’s scalar execution resources free to handle other program work like loop counter management.

As a result, while the asymptotic throughput is the same as that of two-way instruction-level parallelism, as you would expect, small loops will execute a few CPU cycles faster.

Also, while readers with prior SIMD experience may frown at me for using conversions from integer arrays instead of cheaper hardware tricks for generating SIMD vectors of zeroes and ones, I would advise such readers to give optimizing compilers a little more credit. While this is certainly not always true, sometimes you don’t really need clever code to produce decent assembly:

    counter::simd_basic:
      mov     %rdi,%rax
      pxor    %xmm0,%xmm0
      cmp     $0x2,%rdi
    ↓ jb      29
      mov     %rax,%rcx
      shr     %rcx
      pxor    %xmm0,%xmm0
      pcmpeqd %xmm1,%xmm1
      nop
20:   psubq   %xmm1,%xmm0
      dec     %rcx
    ↑ jne     20
29:   movq    %xmm0,%rcx
      pshufd  $0xee,%xmm0,%xmm0
      movq    %xmm0,%rdx
      and     $0x1,%eax
      add     %rcx,%rax
      add     %rdx,%rax
    ← ret

Superscalar SIMD

By our powers combined…

As we have seen, both instruction-level parallelism and SIMD can be leveraged to achieve extra execution performance. Now it’s time to combine them and see how the benefits add up.

#![allow(unused)]
fn main() {
pub fn simd_ilp<const ILP_WIDTH: usize>(target: u64) -> u64 {
    use safe_arch::m128i;
    const SIMD_WIDTH: usize = 2;
    assert_ne!(ILP_WIDTH, 0, "No progress possible in this configuration");

    // Set up counters and increment
    let mut simd_counters = [m128i::from([0u64; SIMD_WIDTH]); ILP_WIDTH];
    let simd_increment = m128i::from([1u64; SIMD_WIDTH]);

    // Accumulate in parallel
    let full_width = (SIMD_WIDTH * ILP_WIDTH) as u64;
    for _ in 0..(target / full_width) {
        for simd_counter in &mut simd_counters {
            *simd_counter =
                pessimize::hide(safe_arch::add_i64_m128i(*simd_counter, simd_increment));
        }
    }

    // Accumulate remaining pairs of elements
    let mut remainder = (target % full_width) as usize;
    while remainder >= SIMD_WIDTH {
        for simd_counter in simd_counters.iter_mut().take(remainder / SIMD_WIDTH) {
            *simd_counter =
                pessimize::hide(safe_arch::add_i64_m128i(*simd_counter, simd_increment));
            remainder -= SIMD_WIDTH;
        }
    }

    // Merge SIMD accumulators using parallel reduction
    let mut stride = ILP_WIDTH.next_power_of_two() / 2;
    while stride > 0 {
        for i in 0..stride.min(ILP_WIDTH - stride) {
            simd_counters[i] =
                safe_arch::add_i64_m128i(simd_counters[i], simd_counters[i + stride]);
        }
        stride /= 2;
    }
    let simd_counter = simd_counters[0];

    // Merge the SIMD counters into a scalar counter
    let counters: [u64; SIMD_WIDTH] = simd_counter.into();
    let mut counter = counters.iter().sum();

    // Accumulate trailing element, if any, the scalar way
    for _ in 0..remainder {
        counter = pessimize::hide(counter + 1);
    }
    counter
}
}

This code… could be prettier. It certainly shows the need for layers of abstraction when combining multiple layers of hardware parallelism. But since this might be your first exposure to the marriage of SIMD and ILP, seeing it spelled out explicitly once may make it easier to understand.

How fast do we expect this to get? From uops.info, my Zen 2 CPU can do 3 SSE 64-bit integer additions per clock cycle. Each SSE addition, in turn, can sum two 64-bit integers. So our expectation would be to reach 6 integer increments per clock cycle, which indeed we do, with this implementation reaching 25.6 billion integer increments per second on my CPU.

Another thing worth noting is that we need fewer ILP streams to do this, with peak performance being reached at an ILP_WIDTH of 9. This is good for multiple reasons:

  • Narrower code uses up less instruction cache and CPU registers, thus reducing the risk of slowdown elsewhere in the codebase when this function is called.
  • Narrower code reaches top speed with a smaller number of loop iterations. This implementation beats the 15-way ILP loop at any number of iterations, but it also beats the basic for loop’s counting rate as soon as the count exceeds 8, and the basic SIMD loop as soon as it exceeds 16. So not only is it very efficient asymptotically, it also becomes efficient very quickly as problem size ramps up.

It is also very easy to explain why this is the case: with ILP based on scalar increments, our previous code was sharing resources with the outer loop management instructions. This code only relies on SIMD instructions for counting, which are not used for outer loop management, so as soon as we have enough work to keep the SIMD unit fed without outer loop management becoming a bottleneck, we should operate at maximum possible speed.

Superscalar honeymoon?

At this point, it may seem like I am claiming that SIMD is superior to scalar processing and should be used in its place whenever practical. However, for integer computations at least, it actually should not have to be one or the other.

Since my Zen 2 CPU can process at most 5 instructions per cycle, but only 3 SSE instructions per cycle, one has to wonder whether, given suitably shaped code, it shouldn’t be possible to also have it perform a pair of scalar integer increments on each CPU cycle, in addition to all the SSE work it’s already doing.

Of course, in doing so, I would need to be careful not to consume ILP resources needed to handle the outer counting loop. But given enough code contortions, that can be taken care of.

And so I did actually try out this hybrid scalar + SIMD approach…

#![allow(unused)]
fn main() {
pub fn extreme_ilp<
    // Number of SIMD operations per cycle
    const SIMD_ILP: usize,
    // Number of scalar iterations per cycle
    const SCALAR_ILP: usize,
    // Number of cycles in the inner loop
    const UNROLL_FACTOR: usize,
    // Number of scalar instructions needed for outer loop maintenance
    const LOOP_INSNS: usize,
>(
    target: u64,
) -> u64 {
    use safe_arch::m128i;
    const SIMD_WIDTH: usize = 2;
    assert_ne!(SIMD_ILP, 0, "No point in this without SIMD ops");
    assert_ne!(SCALAR_ILP, 0, "No point in this without scalar ops");

    // Set up counters and increment
    let mut simd_counters = [m128i::from([0u64; SIMD_WIDTH]); SIMD_ILP];
    let simd_increment = m128i::from([1u64; SIMD_WIDTH]);
    let mut scalar_counters = [0; SCALAR_ILP];

    // Accumulate in parallel
    let unrolled_simd_ops = SIMD_WIDTH * SIMD_ILP * UNROLL_FACTOR;
    let unrolled_scalar_ops = (SCALAR_ILP * UNROLL_FACTOR).saturating_sub(LOOP_INSNS);
    let full_width = unrolled_simd_ops + unrolled_scalar_ops;
    for _ in 0..(target / full_width as u64) {
        for unroll_iter in 0..UNROLL_FACTOR {
            for simd_counter in &mut simd_counters {
                *simd_counter =
                    pessimize::hide(safe_arch::add_i64_m128i(*simd_counter, simd_increment));
            }
            for scalar_counter in scalar_counters
                .iter_mut()
                .take(unrolled_scalar_ops - unroll_iter * SCALAR_ILP)
            {
                *scalar_counter = pessimize::hide(*scalar_counter + 1)
            }
        }
    }

    // Accumulate remaining elements using as much parallelism as possible
    let mut remainder = (target % full_width as u64) as usize;
    while remainder > SIMD_WIDTH {
        for simd_counter in simd_counters.iter_mut().take(remainder / SIMD_WIDTH) {
            *simd_counter =
                pessimize::hide(safe_arch::add_i64_m128i(*simd_counter, simd_increment));
            remainder -= SIMD_WIDTH;
        }
    }
    while remainder > 0 {
        for scalar_counter in scalar_counters.iter_mut().take(remainder) {
            *scalar_counter = pessimize::hide(*scalar_counter + 1);
            remainder -= 1;
        }
    }

    // Merge accumulators using parallel reduction
    let mut stride = (SIMD_ILP.max(SCALAR_ILP)).next_power_of_two() / 2;
    while stride > 0 {
        for i in 0..stride.min(SIMD_ILP.saturating_sub(stride)) {
            simd_counters[i] =
                safe_arch::add_i64_m128i(simd_counters[i], simd_counters[i + stride]);
        }
        for i in 0..stride.min(SCALAR_ILP.saturating_sub(stride)) {
            scalar_counters[i] += scalar_counters[i + stride];
        }
        stride /= 2;
    }
    let simd_counter = simd_counters[0];
    let mut scalar_counter = scalar_counters[0];

    // Merge the SIMD counters and scalar counter into one
    let counters: [u64; SIMD_WIDTH] = simd_counter.into();
    scalar_counter += counters.iter().sum::<u64>();
    scalar_counter
}
}

…but I observed no performance benefits with respect to the SIMD+ILP version, only a slight performance degradation that varied depending on the value of tuning parameters.

This puzzles me. Obviously I must be hitting some microarchitectural limit, but it’s not clear which. I looked up every source of microarchitectural documentation that I know of and tried to check every relevant AMD performance counter for this code and the previous one. But I found nothing that could explain this lack of ILP improvements.

However, AMD’s CPU performance counters are much less numerous than Intel’s, which suggests that they may have more blind spots. So it may be the case that if I re-tried this analysis on an Intel chip, I could find the answer to this enigma. Maybe I’ll do this someday. Or maybe one of you readers will find the answer before I do and tell me about it, so that I can update the book accordingly!

In any case, though, since this approach does not currently provide any measurable improvement and requires even more convoluted code, I am very happy to let it go and stick with the simpler SIMD + ILP approach for the remainder of the current version of this book.

Refactoring

Speaking of keeping code understandable, before we scale this code up further, we need to do something about the cognitive complexity of the SIMD + ILP solution. While less scary than the hybrid scalar + SIMD version, this code still feels too complex, and we’re far from being done with it.

To address this, let me separate concerns of SIMD computation and instruction-level parallelism a bit. First, for SIMD abstraction purposes, let’s introduce the following trait.

#![allow(unused)]
fn main() {
/// Set of integer counters with SIMD semantics
pub trait SimdAccumulator<Counter>: Copy + Eq + pessimize::Pessimize + Sized {
    /// Number of inner accumulators
    const WIDTH: usize = std::mem::size_of::<Self>() / std::mem::size_of::<Counter>();

    /// Set up accumulators, all initialized to 0 or 1
    fn identity(one: bool) -> Self;

    /// Set up accumulators initialized to 0
    #[inline]
    fn zeros() -> Self {
        Self::identity(false)
    }

    /// Set up accumulators initialized to 1
    #[inline]
    fn ones() -> Self {
        Self::identity(true)
    }

    /// Merge another set of accumulators into this one
    fn add(self, other: Self) -> Self;

    /// Add one to every accumulator in the set in a manner that cannot be
    /// optimized out by the compiler
    #[inline]
    fn increment(&mut self) {
        *self = pessimize::hide(Self::add(*self, Self::ones()));
    }

    /// Merge another accumulator into this one
    #[inline]
    fn merge(&mut self, other: Self) {
        *self = Self::add(*self, other);
    }

    /// Doubles the size of Counter if it's not u64 yet, else stays at u64
    type ReducedCounter;

    /// Goes to another SimdAccumulator type that is half as wide if Counter is
    /// u64 and WIDTH is not yet 1, else stays at Self.
    type ReducedAccumulator: SimdAccumulator<Self::ReducedCounter>;

    /// Go to the next step down the reduction pipeline
    fn reduce_step(self) -> [Self::ReducedAccumulator; 2];

    /// Reduce all the way to a 64-bit counter
    /// Must be specialized to the identity function for u64 scalars
    #[inline]
    fn reduce(self) -> u64 {
        let [mut half, half2] = self.reduce_step();
        half.merge(half2);
        half.reduce()
    }
}
}

While mostly straightforward, this trait has a number of generic knobs that we do not need yet but are going to use later on in this book. Let’s go through them briefly:

  • The trait is parametrized on an inner integer type, so that we can use all of the integer widths supported by a given SIMD vector type later on.
  • Reduction from a SIMD vector of counters to a single 64-bit integer is performed through a series of recursive steps:
    • First, we double the inner integer size until we reach 64 bits, by converting each half of the SIMD vector to an integer size twice as wide and merging the resulting SIMD vectors.
    • Then we recursively split the eventual SIMD vector of 64-bit integers in halves and merge the halves until we get to a single 64-bit integer.

The trait can be easily implemented for both our initial u64 scalar counter and our new m128i SIMD counter as follows…

#![allow(unused)]
fn main() {
impl SimdAccumulator<u64> for safe_arch::m128i {
    #[inline]
    fn identity(one: bool) -> Self {
        Self::from([one as u64; <Self as SimdAccumulator<u64>>::WIDTH])
    }

    #[inline]
    fn add(self, other: Self) -> Self {
        safe_arch::add_i64_m128i(self, other)
    }

    // SSE vectors of 64-bit integers reduce to 64-bit integers
    type ReducedCounter = u64;
    type ReducedAccumulator = u64;
    //
    #[inline]
    fn reduce_step(self) -> [Self::ReducedAccumulator; 2] {
        self.into()
    }
}

impl SimdAccumulator<u64> for u64 {
    #[inline]
    fn identity(one: bool) -> Self {
        one as u64
    }

    #[inline]
    fn add(self, other: Self) -> Self {
        self + other
    }

    // Scalar u64 is the end of the reduction recursion
    type ReducedCounter = u64;
    type ReducedAccumulator = u64;
    //
    #[inline]
    fn reduce_step(self) -> [Self::ReducedAccumulator; 2] {
        [self, 0]
    }
    //
    #[inline]
    fn reduce(self) -> u64 {
        self
    }
}
}

…and then we can write a generic SIMD + ILP implementation that can work in terms of any implementation of SimdAccumulator<u64>, superseding all of our previous _ilp implementations:

#![allow(unused)]
fn main() {
pub fn generic_ilp_u64<const ILP_WIDTH: usize, Simd: SimdAccumulator<u64>>(target: u64) -> u64 {
    assert_ne!(ILP_WIDTH, 0, "No progress possible in this configuration");

    // Set up counters
    let mut simd_accumulators = [Simd::zeros(); ILP_WIDTH];

    // Accumulate in parallel
    let full_width = (Simd::WIDTH * ILP_WIDTH) as u64;
    for _ in 0..(target / full_width) {
        for simd_accumulator in &mut simd_accumulators {
            simd_accumulator.increment();
        }
    }

    // Accumulate remaining SIMD vectors of elements
    let mut remainder = (target % full_width) as usize;
    while remainder >= Simd::WIDTH {
        for simd_accumulator in simd_accumulators.iter_mut().take(remainder / Simd::WIDTH) {
            simd_accumulator.increment();
            remainder -= Simd::WIDTH;
        }
    }

    // Merge SIMD accumulators using parallel reduction
    let mut stride = ILP_WIDTH.next_power_of_two() / 2;
    while stride > 0 {
        for i in 0..stride.min(ILP_WIDTH - stride) {
            simd_accumulators[i].merge(simd_accumulators[i + stride]);
        }
        stride /= 2;
    }
    let simd_accumulator = simd_accumulators[0];

    // Merge the SIMD counters into a scalar counter
    let mut counter = simd_accumulator.reduce();

    // Accumulate trailing element, if any, the scalar way
    for _ in 0..remainder {
        counter.increment();
    }
    counter
}
}

With that, we can stop worrying about ILP for now and offload that concern to generic_ilp_u64, keeping our focus on perfecting our usage of SIMD.
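To make the connection with the previous chapters concrete, here is how the earlier hand-written implementations boil down to instantiations of this generic function (a sketch; the next chapter uses exactly these instantiations):

// 9-way SIMD + ILP, equivalent to the earlier simd_ilp::<9>
pub fn simd_ilp_generic(target: u64) -> u64 {
    generic_ilp_u64::<9, safe_arch::m128i>(target)
}

// 15-way scalar ILP, equivalent to the earlier ilp::<15>
pub fn scalar_ilp_generic(target: u64) -> u64 {
    generic_ilp_u64::<15, u64>(target)
}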

Handling hardware heterogeneity

So far, I have been making two assumptions:

  • That our CPU supports SSE2 vectorization. For x86 hardware, that is quite reasonable, as almost every CPU released since the Intel Pentium 4 in 2000 supports it, so much so that binary releases of the Rust standard library and most Linux distributions assume availability of this instruction set on x86. But for non-x86 hardware, like say a smartphone, assuming availability of SSE2 would be very wrong.
  • That our CPU does not support any better vectorization scheme than SSE2. That is very much not reasonable, as many x86 CPUs support the wider AVX instruction set, and some support the even wider AVX-512 instruction set. And even if we were to stick with SSE, SSE4.1 did contain several interesting upgrades that I’ll want to use in later chapters of this book.

In this chapter, we’ll break free from these assumptions to produce counting code that runs everywhere and runs efficiently on most currently available x86 hardware, including of course Zen 2.

Scaling down

To correctly handle hardware diversity, we need a way to dispatch to multiple code paths, picking the best one for the hardware available.

One way to do this is at compile-time, using cfg attributes. And indeed, this is the only way supported by the safe_arch crate that we’ve used so far, so that’s the way we are going to use. Simple usage looks like this:

#![allow(unused)]
fn main() {
#[cfg(target_feature = "sse2")]
pub fn multiversion_sse2(target: u64) -> u64 {
    super::ilp::generic_ilp_u64::<9, safe_arch::m128i>(target)
}

#[cfg(not(target_feature = "sse2"))]
pub fn multiversion_sse2(target: u64) -> u64 {
    super::ilp::generic_ilp_u64::<15, u64>(target)
}
}

…and by sprinkling #[cfg(target_feature = "sse2")] in all the places where we have used SSE2-specific functionality before, we can get rid of our “supports SSE2” assumption by using our previous optimal SSE2 implementation where SSE2 is supported, and our previous optimal scalar implementation where SSE2 is not supported.

The end result may not be optimally tuned for, say, an ARM chip, but at least it will run. Tuning can be taken care of later through more cfg directives as more hardware becomes available for testing.

The biggest caveat of this approach is that users of older x86 hardware will need to adjust their compiler options to benefit from it, as we’ll demonstrate next.

Scaling up

Now that we have a way to adapt to hardware capabilities, we can get more adventurous and support AVX2 vectorization, which is a wider cousin of SSE2 available on newer x86 CPUs from 2011 onwards. Using that, we can increment four 64-bit integers per instruction instead of two.

The support code should look straightforward enough if you’ve followed what I’ve done so far. The only bit that may be mildly surprising is the binary reduction tree in the SimdAccumulator::reduce() implementation, which is just the x86 SIMD equivalent of what we’ve been doing with ILP for a while now.

#![allow(unused)]
fn main() {
#[cfg(target_feature = "avx2")]
impl SimdAccumulator<u64> for safe_arch::m256i {
    #[inline]
    fn identity(one: bool) -> Self {
        Self::from([one as u64; <Self as SimdAccumulator<u64>>::WIDTH])
    }

    #[inline]
    fn add(self, other: Self) -> Self {
        safe_arch::add_i64_m256i(self, other)
    }

    // AVX vectors of 64-bit integers reduce to SSE vectors of 64-bit integers
    type ReducedCounter = u64;
    type ReducedAccumulator = safe_arch::m128i;
    //
    #[inline]
    fn reduce_step(self) -> [Self::ReducedAccumulator; 2] {
        [
            safe_arch::extract_m128i_m256i::<0>(self),
            safe_arch::extract_m128i_m256i::<1>(self),
        ]
    }
}

#[cfg(target_feature = "avx2")]
pub fn multiversion_avx2(target: u64) -> u64 {
    super::ilp::generic_ilp_u64::<9, safe_arch::m256i>(target)
}

#[cfg(not(target_feature = "avx2"))]
pub fn multiversion_avx2(target: u64) -> u64 {
    multiversion_sse2(target)
}
}

But to actually benefit from this, users need to compile the code with the RUSTFLAGS environment variable set to -C target-cpu=native. This will produce binaries specific to the host CPU, that will leverage all of its features, including AVX2 if available. And that’s the same way one would use SSE2 on older 32-bit x86 processors.

On Zen 2 and during high-intensity microbenchmarking, the AVX2 version is never slower than SSE2, and asymptotically twice as fast, as one would expect. But it should be possible to find situations where the AVX2 version is slower than the SSE2 one, if it were called infrequently on older CPUs, due to power license switching phenomena. Here’s a discussion of those in an AVX-512 context.

And with that, we reach peak 64-bit integer increment rate on all CPUs up to but not including the AVX-512 generation (which I cannot test on, and thus will not cover here). And our new record to beat is twelve 64-bit integer increments per cycle or 51.3 billion integer increments per second.

When smaller is better

At this stage, we are computing 64-bit additions as fast as the CPU can, so you might expect that we have also reached peak counting throughput. But that is not the case yet.

On x86, as well as on many other CPU architectures, SIMD works by giving you a fixed-size bag of bits and letting you slice it in integers of any machine-supported size. So if you choose to slice your SIMD vector in smaller-sized integers, then you can do work on more of these integers per instruction. For example, given a 256-bit AVX vector, a single instruction can perform either four 64-bit additions, eight 32-bit ones, sixteen 16-bit ones, or thirty-two 8-bit ones.
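To make that concrete, here is a sketch of the same 256-bit register type sliced two different ways using safe_arch (requires AVX2; each addition below maps to a single instruction):

#[cfg(target_feature = "avx2")]
fn slicing_demo() -> (safe_arch::m256i, safe_arch::m256i) {
    use safe_arch::m256i;
    let v64 = m256i::from([1u64; 4]); // four 64-bit lanes
    let v8 = m256i::from([1u8; 32]); // thirty-two 8-bit lanes
    (
        safe_arch::add_i64_m256i(v64, v64), // 4 additions in one instruction
        safe_arch::add_i8_m256i(v8, v8),    // 32 additions in one instruction
    )
}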

The simple way to use this property would be to redesign the API to use smaller-size counters, e.g. u32 ones. But at the rate we’re already going, we would overflow a 32-bit counter in about 20ms. So clearly, for our Big Counting purposes, we want to keep using 64-bit accumulators somewhere.

However, that does not mean we cannot use smaller-sized integers as temporary fast accumulators that eventually spill into these slower 64-bit ones.

Using 32-bit counters for illustration, the general approach is going to be along these lines…

#![allow(unused)]
fn main() {
let mut accumulator = 0u64;
let mut remainder = target;
while remainder > 0 {
    let mut counter = 0u32;
    for _ in 0..remainder.min(u32::MAX as u64) {
        counter = pessimize::hide(counter + 1);
        remainder -= 1;
    }
    accumulator += counter as u64;
}
}

…but with native-width SIMD, instruction-level parallelism, and ideally a choice of counter width as well since there’s a potential tradeoff there: small counters count more quickly but merge more often, and merging might be expensive, so it’s not absolutely certain that, say, 8-bit integers will be the fastest choice.

Narrow SIMD accumulators

Since the SimdAccumulator trait was designed for it, supporting narrower integers is mostly just a matter of going through the safe_arch documentation and filling in the right methods with appropriate conditional compilation directives.

Here I will do it for AVX vectors only, since that’s all I need for optimal performance on Zen 2 and other modern x86 CPUs. But hopefully you can easily see how the general approach would extend to SSE and non-x86 vector instruction sets.

#![allow(unused)]
fn main() {
#[cfg(target_feature = "avx2")]
impl SimdAccumulator<u8> for safe_arch::m256i {
    #[inline]
    fn identity(one: bool) -> Self {
        Self::from([one as u8; <Self as SimdAccumulator<u8>>::WIDTH])
    }

    #[inline]
    fn add(self, other: Self) -> Self {
        safe_arch::add_i8_m256i(self, other)
    }

    // AVX vectors of 8-bit integers reduce to AVX vectors of 16-bit integers
    type ReducedCounter = u16;
    type ReducedAccumulator = safe_arch::m256i;
    //
    #[inline]
    fn reduce_step(self) -> [Self::ReducedAccumulator; 2] {
        use safe_arch::m256i;
        fn extract_u16_m256i_from_u8_m256i<const LANE: i32>(u8s: m256i) -> m256i {
            let half = safe_arch::extract_m128i_m256i::<LANE>(u8s);
            safe_arch::convert_to_i16_m256i_from_u8_m128i(half)
        }
        [
            extract_u16_m256i_from_u8_m256i::<0>(self),
            extract_u16_m256i_from_u8_m256i::<1>(self),
        ]
    }
}

#[cfg(target_feature = "avx2")]
impl SimdAccumulator<u16> for safe_arch::m256i {
    #[inline]
    fn identity(one: bool) -> Self {
        Self::from([one as u16; <Self as SimdAccumulator<u16>>::WIDTH])
    }

    #[inline]
    fn add(self, other: Self) -> Self {
        safe_arch::add_i16_m256i(self, other)
    }

    // AVX vectors of 16-bit integers reduce to AVX vectors of 32-bit integers
    type ReducedCounter = u32;
    type ReducedAccumulator = safe_arch::m256i;
    //
    #[inline]
    fn reduce_step(self) -> [Self::ReducedAccumulator; 2] {
        use safe_arch::m256i;
        fn extract_u32_m256i_from_u16_m256i<const LANE: i32>(u16s: m256i) -> m256i {
            let half = safe_arch::extract_m128i_m256i::<LANE>(u16s);
            safe_arch::convert_to_i32_m256i_from_u16_m128i(half)
        }
        [
            extract_u32_m256i_from_u16_m256i::<0>(self),
            extract_u32_m256i_from_u16_m256i::<1>(self),
        ]
    }
}

#[cfg(target_feature = "avx2")]
impl SimdAccumulator<u32> for safe_arch::m256i {
    #[inline]
    fn identity(one: bool) -> Self {
        Self::from([one as u32; <Self as SimdAccumulator<u32>>::WIDTH])
    }

    #[inline]
    fn add(self, other: Self) -> Self {
        safe_arch::add_i32_m256i(self, other)
    }

    // AVX vectors of 32-bit integers reduce to AVX vectors of 64-bit integers
    type ReducedCounter = u64;
    type ReducedAccumulator = safe_arch::m256i;
    //
    #[inline]
    fn reduce_step(self) -> [Self::ReducedAccumulator; 2] {
        use safe_arch::m256i;
        fn extract_u64_m256i_from_u32_m256i<const LANE: i32>(u32s: m256i) -> m256i {
            let half = safe_arch::extract_m128i_m256i::<LANE>(u32s);
            safe_arch::convert_to_i64_m256i_from_u32_m128i(half)
        }
        [
            extract_u64_m256i_from_u32_m256i::<0>(self),
            extract_u64_m256i_from_u32_m256i::<1>(self),
        ]
    }
}
}
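With all of these implementations in place, calling reduce() on a full AVX vector of 8-bit counters walks through every step of the cascade, halving the number of counters and doubling their width each time (a recap of the recursion, not extra code):

//   m256i (32 x u8)  --reduce_step + merge-->  m256i (16 x u16)
//   m256i (16 x u16) --reduce_step + merge-->  m256i ( 8 x u32)
//   m256i ( 8 x u32) --reduce_step + merge-->  m256i ( 4 x u64)
//   m256i ( 4 x u64) --reduce_step + merge-->  m128i ( 2 x u64)
//   m128i ( 2 x u64) --reduce_step + merge-->  u64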

Simple full counter implementation

As currently written, generic_ilp_u64 will only reliably work with 64-bit accumulators. Narrower accumulators would overflow for larger values of target. To avoid this, we need to regularly spill into larger accumulators as outlined at the beginning of this chapter.

A simple if imperfect way to do so is to use the existing SimdAccumulator facilities to reduce our full narrow SIMD accumulators all the way to u64 integers, then integrate these results into a set of global u64 accumulators, every time a narrow integer overflow would happen:

#![allow(unused)]
fn main() {
pub fn generic_narrow_simple<
    Counter: num_traits::PrimInt,
    Simd: crate::simd::SimdAccumulator<Counter>,
>(
    target: u64,
) -> u64 {
    const ILP_WIDTH: usize = 10;

    // Set up narrow SIMD counters and wide scalar counters
    let mut simd_accumulators = [Simd::zeros(); ILP_WIDTH];
    let mut scalar_accumulators = [0u64; ILP_WIDTH];

    // Set up overflow avoidance through spilling to scalar counters
    let mut counter_usage = Counter::zero();
    let spill = |simd_accumulators: &mut [Simd; ILP_WIDTH],
                 scalar_accumulators: &mut [u64; ILP_WIDTH]| {
        for (scalar, simd) in scalar_accumulators
            .iter_mut()
            .zip(simd_accumulators.iter_mut())
        {
            *scalar += simd.reduce();
        }
        for simd in simd_accumulators {
            *simd = Simd::zeros();
        }
    };

    // Accumulate in parallel
    let mut remainder = target;
    let full_width = (Simd::WIDTH * ILP_WIDTH) as u64;
    while remainder >= full_width {
        // Perform a round of counting
        for simd_accumulator in &mut simd_accumulators {
            simd_accumulator.increment();
        }
        remainder -= full_width;

        // When the narrow SIMD counters fill up, spill to scalar counters
        counter_usage = counter_usage + Counter::one();
        if counter_usage == Counter::max_value() {
            spill(&mut simd_accumulators, &mut scalar_accumulators);
            counter_usage = Counter::zero();
        }
    }

    // Merge SIMD accumulators into scalar counters
    spill(&mut simd_accumulators, &mut scalar_accumulators);

    // Accumulate remaining elements in scalar counters
    while remainder > 0 {
        for scalar_accumulator in scalar_accumulators.iter_mut().take(remainder as usize) {
            scalar_accumulator.increment();
            remainder -= 1;
        }
    }

    // Merge scalar accumulators using parallel reduction
    let mut stride = ILP_WIDTH.next_power_of_two() / 2;
    while stride > 0 {
        for i in 0..stride.min(ILP_WIDTH - stride) {
            scalar_accumulators[i].merge(scalar_accumulators[i + stride]);
        }
        stride /= 2;
    }
    scalar_accumulators[0]
}

#[cfg(target_feature = "avx2")]
pub fn narrow_simple<Counter: num_traits::PrimInt>(target: u64) -> u64
where
    safe_arch::m256i: SimdAccumulator<Counter>,
{
    generic_narrow_simple::<Counter, safe_arch::m256i>(target)
}
}

…and if we test this implementation, we see that each halving of the counter width doubles our throughput until we reach u8 integers. There we only improve over u16 integers by a factor of 1.7 because we merge every 255 increments and the merging overhead starts to become noticeable.

Now, improving by a factor of 6.7x overall is already nice, but if we want this technique to get closer to the 8x speedup over the 64-bit SIMD version that it aimed for, then we need to reduce the overhead of merging. And the obvious way to do that is to refrain from reducing narrow SIMD accumulators all the way to scalar u64s when spilling to wider SIMD accumulators would suffice.

Along the way, I’ll also extract some complexity out of the full counter implementation, as it starts to pack too much complexity into a single function for my taste again.

Reaching peak asymptotic throughput

As outlined above, we want to have a cascading accumulation design where we increment SIMD vectors of 8-bit integers that spill into SIMD vectors of 16-bit integers when full, and those SIMD vectors of 16-bit integers in turn spill into scalar 64-bit integers as they did before.

One can offload this cascading accumulation concern to a dedicated struct…

#![allow(unused)]
fn main() {
struct U8Accumulator<const ILP_WIDTH: usize, Simd: SimdAccumulator<u8> + SimdAccumulator<u16>> {
    /// SIMD 8-bit integer accumulators
    simd_u8s: [Simd; ILP_WIDTH],

    /// Number of increments that occurred in u8s
    u8s_usage: u8,

    /// SIMD 16-bit integer accumulators
    simd_u16s: [Simd; ILP_WIDTH],

    /// Number of u8 spills that occurred in u16s
    u16s_usage: u8,

    /// Scalar integer accumulators
    scalars: [u64; ILP_WIDTH],
}
//
impl<
        const ILP_WIDTH: usize,
        Simd: SimdAccumulator<u8, ReducedAccumulator = Simd> + SimdAccumulator<u16>,
    > U8Accumulator<ILP_WIDTH, Simd>
{
    /// Total width including ILP
    pub const WIDTH: usize = ILP_WIDTH * <Simd as SimdAccumulator<u8>>::WIDTH;

    /// Set up accumulator
    pub fn new() -> Self {
        Self {
            simd_u8s: [<Simd as SimdAccumulator<u8>>::zeros(); ILP_WIDTH],
            u8s_usage: 0,
            simd_u16s: [<Simd as SimdAccumulator<u16>>::zeros(); ILP_WIDTH],
            u16s_usage: 0,
            scalars: [0; ILP_WIDTH],
        }
    }

    /// Increment counters
    #[inline]
    pub fn increment(&mut self) {
        // Perform an 8-bit increment
        for simd_u8 in &mut self.simd_u8s {
            <Simd as SimdAccumulator<u8>>::increment(simd_u8);
        }
        self.u8s_usage += 1;

        // Spill to 16-bit counters if it's time
        if self.u8s_usage == u8::MAX {
            self.spill_u8s_to_u16s();
            self.u8s_usage = 0;
            self.u16s_usage += 1;
        }

        // Spill to scalar counters if it's time
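        // (each u8 spill merges two u8 lanes, each at most u8::MAX, into one
        // u16 lane, so a u16 lane can only absorb u16::MAX / (2 * u8::MAX)
        // spills before it could overflow)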
        if self.u16s_usage == (u16::MAX / (2 * u8::MAX as u16)) as u8 {
            self.spill_u16s_to_scalars();
            self.u16s_usage = 0;
        }
    }

    /// Spill SIMD counters and extract scalar counters
    pub fn scalarize(mut self) -> [u64; ILP_WIDTH] {
        self.spill_u8s_to_u16s();
        self.spill_u16s_to_scalars();
        self.scalars
    }

    /// Spill 8-bit SIMD accumulators into matching 16-bit ones
    #[inline]
    fn spill_u8s_to_u16s(&mut self) {
        for (simd_u8, simd_u16) in self.simd_u8s.iter_mut().zip(self.simd_u16s.iter_mut()) {
            fn spill_one<SimdU8, SimdU16>(simd_u8: &mut SimdU8, simd_u16: &mut SimdU16)
            where
                SimdU8: SimdAccumulator<u8, ReducedAccumulator = SimdU16>,
                SimdU16: SimdAccumulator<u16>,
            {
                let [mut u16_contrib, u16_contrib_2] = simd_u8.reduce_step();
                u16_contrib.merge(u16_contrib_2);
                simd_u16.merge(u16_contrib);
                *simd_u8 = SimdU8::zeros();
            }
            spill_one(simd_u8, simd_u16);
        }
    }

    /// Spill 16-bit SIMD accumulators into matching scalar ones
    #[inline]
    fn spill_u16s_to_scalars(&mut self) {
        for (simd_u16, scalar) in self.simd_u16s.iter_mut().zip(self.scalars.iter_mut()) {
            fn spill_one<SimdU16: SimdAccumulator<u16>>(simd_u16: &mut SimdU16, scalar: &mut u64) {
                scalar.merge(simd_u16.reduce());
                *simd_u16 = SimdU16::zeros();
            }
            spill_one(simd_u16, scalar);
        }
    }
}
}

…and then build a top-level counting function that is expressed in terms of using this accumulator:

#![allow(unused)]
fn main() {
pub fn generic_narrow_u8<Simd>(target: u64) -> u64
where
    Simd: SimdAccumulator<u8, ReducedAccumulator = Simd> + SimdAccumulator<u16>,
{
    const ILP_WIDTH: usize = 10;

    // Set up accumulators
    let mut simd_accumulator = U8Accumulator::<ILP_WIDTH, Simd>::new();

    // Accumulate in parallel
    let mut remainder = target;
    let full_width = U8Accumulator::<ILP_WIDTH, Simd>::WIDTH as u64;
    while remainder >= full_width {
        simd_accumulator.increment();
        remainder -= full_width;
    }

    // Merge SIMD accumulators into scalar counters
    let mut scalar_accumulators = simd_accumulator.scalarize();

    // Accumulate remaining elements in scalar counters
    while remainder > 0 {
        for scalar_accumulator in scalar_accumulators.iter_mut().take(remainder as usize) {
            scalar_accumulator.increment();
            remainder -= 1;
        }
    }

    // Merge scalar accumulators using parallel reduction
    let mut stride = ILP_WIDTH.next_power_of_two() / 2;
    while stride > 0 {
        for i in 0..stride.min(ILP_WIDTH - stride) {
            scalar_accumulators[i].merge(scalar_accumulators[i + stride]);
        }
        stride /= 2;
    }
    scalar_accumulators[0]
}

#[cfg(target_feature = "avx2")]
pub fn narrow_u8(target: u64) -> u64 {
    generic_narrow_u8::<safe_arch::m256i>(target)
}
}

And with that, I get a 1.9x speedup with respect to the version that uses 16-bit integers, or a 7.7x speedup with respect to the version that does not use narrow integers, for a final asymptotic performance of 98 integer increments per CPU cycle or 393 billion integer increments per second.

Now, it’s time to talk about non-asymptotic throughput.

All this extra counter merging we now do to achieve 64-bit counting range using narrower integers is not free. It comes at the price of a steep reduction in performance at low iteration counts.

Merely invoking a narrow counting function with no work to do costs 14ns with 32-bit integers, 23ns with 16-bit integers, and 31ns with 8-bit integers. And at low iteration counts, we’re also penalized by the fact that we’re not trying to keep using the widest SIMD format at all times, as we did in the non-narrow SIMD versions, as the code for that would get quite gruesome when more and more accumulator reduction stages start to pile up!

As a result, 32-bit hierarchical counting only becomes faster than 64-bit SIMD counting when counting to more than 2048, then 16-bit counting catches up around 8192, and finally 8-bit counting becomes king of the hill around 16384 iterations.

But hey, at least we are eventually counting as fast as a single Zen 2 CPU core can count!

Improving small-scale throughput

While the “entry cost” cannot be reduced without losing some benefits of counting with smaller integers, the problem of poor performance at low iteration counts can be worked around by simply offloading small-scale counting to our previous SIMD + ILP version, which has a very low entry cost and already performs much better than counting with scalar 64-bit integers!

#![allow(unused)]
fn main() {
pub fn generic_narrow_u8_tuned<Simd>(target: u64) -> u64
where
    Simd: SimdAccumulator<u8, ReducedAccumulator = Simd> + SimdAccumulator<u16>,
{
    const ILP_WIDTH: usize = 10;

    // Set up accumulators
    let mut simd_accumulator = U8Accumulator::<ILP_WIDTH, Simd>::new();

    // Accumulate in parallel
    let mut remainder = target;
    let full_width = U8Accumulator::<ILP_WIDTH, Simd>::WIDTH as u64;
    while remainder >= full_width {
        simd_accumulator.increment();
        remainder -= full_width;
    }

    // Merge SIMD accumulators into scalar counters
    let mut scalar_accumulators = simd_accumulator.scalarize();

    // Merge scalar accumulators using parallel reduction
    let mut stride = ILP_WIDTH.next_power_of_two() / 2;
    while stride > 0 {
        for i in 0..stride.min(ILP_WIDTH - stride) {
            scalar_accumulators[i].merge(scalar_accumulators[i + stride]);
        }
        stride /= 2;
    }
    scalar_accumulators[0] + super::multiversion::multiversion_avx2(remainder)
}

#[cfg(target_feature = "avx2")]
pub fn narrow_u8_tuned(target: u64) -> u64 {
    generic_narrow_u8_tuned::<safe_arch::m256i>(target)
}
}

And with that, we get much more satisfactory performance at low iteration counts:

  • Above 16 iterations, narrow_u8_tuned is faster than un-tuned narrow_u8
  • Above 64 iterations, it is faster than narrow_simple<u16>
  • Above 1024 iterations, it is faster than narrow_simple<u32>
  • Above 2048 iterations, it is faster than multiversion_avx2 alone

Basically, as soon as the timing of another solution gets above 35-40ns, narrow_u8_tuned beats it. And for 4096 iterations, narrow_u8_tuned is a whopping 2.6x faster than its un-tuned counterpart.

Speaking more generally, if you are trying to implement a general-purpose utility that must perform well on a very wide range of problem sizes, like memcpy() in C, it is a good idea to combine algorithms with low entry cost and suboptimal scaling with algorithms with high entry cost and better scaling like this. The price to pay being, of course, more implementation complexity.
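As a sketch of that general pattern, a size-based dispatcher could look like this, using the crossover point measured above on Zen 2 (the threshold and module paths are illustrative, not universal constants):

#[cfg(target_feature = "avx2")]
pub fn adaptive(target: u64) -> u64 {
    if target < 2048 {
        // Low entry cost, good enough throughput for small counts
        super::multiversion::multiversion_avx2(target)
    } else {
        // Higher entry cost, best asymptotic throughput
        narrow_u8_tuned(target)
    }
}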

Multithreading

Now that I have taken single-core counting as far as I can (which is a little less than 100 times the counting speed of a naive for loop), it is time to leverage all the cores of my CPU for absolute best-in-class leading counting performance.

On the surface, that sounds easy enough:

#![allow(unused)]
fn main() {
pub fn thread_basic(target: u64, sequential: impl Fn(u64) -> u64 + Sync) -> u64 {
    let num_threads = std::thread::available_parallelism()
        .map(usize::from)
        .unwrap_or(2);

    let base_share = target / num_threads as u64;
    let extra = target % num_threads as u64;
    let sequential = &sequential;

    std::thread::scope(|s| {
        let mut threads = Vec::with_capacity(num_threads);
        for thread in 0..num_threads as u64 {
            let share = base_share + (thread < extra) as u64;
            threads.push(s.spawn(move || sequential(share)));
        }
        threads.into_iter().map(|t| t.join().unwrap()).sum()
    })
}
}
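As a usage sketch (with hypothetical module paths), any sequential implementation from the previous chapters can be passed in as the per-thread worker:

pub fn thread_basic_avx2(target: u64) -> u64 {
    thread_basic(target, crate::multiversion::multiversion_avx2)
}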

But this version doesn’t perform super impressively, only achieving a 4.8x speed-up on 8 CPU cores even when counting to a very high 69 billion limit. Surely we can do better than that.

Reducing launch overhead

There are two problems with the simple approach to multithreading that I used at the start of this chapter. One that we can already see on my CPU, and another one that we would see if we were to run the code on a higher-end compute node.

The first problem is that creating and destroying threads uses up a lot of system resources, and thread_basic does that on every call. It would be better to create threads once at the start of the program, and keep them alive across benchmark runs, only submitting work to them through a lightweight synchronization mechanism.

The second problem is that we are spawning threads one by one from the main thread, which would become a bottleneck if we were to run this on a compute node with many CPU cores. To put things in perspective, one can already build rackmount servers with more than 200 physical CPU cores as of 2022, when this book is being written, and systems with thousands of CPU cores per node are on the medium-term horizon if semiconductor technology evolution progresses as expected.

Thankfully, I can offload both of these concerns to rayon, a great Rust library that provides both an automatically managed thread pool and a scalable fork-join task spawning mechanism, to get code that is simpler and performs better, at little development cost:

#![allow(unused)]
fn main() {
pub fn thread_rayon(target: u64, sequential: impl Fn(u64) -> u64 + Sync) -> u64 {
    use rayon::prelude::*;
    rayon::iter::split(target, |target| {
        let half1 = target / 2;
        let half2 = target - half1;
        (half2, (half1 > 0).then_some(half1))
    })
    .map(&sequential)
    .sum()
}
}

And with that, we get a 7.6x speedup on 8 CPU cores when counting up to 2^36. This imperfect scaling largely comes from a slight reduction in CPU clock rate when using all CPU cores, with throughput per CPU clock cycle increasing by 7.9x on its side.

Now, three thousand billion counter increments per second is as far as I’m going to get on my CPU when it comes to asymptotic throughput. But this solution can be improved in other respects.

Before getting there, though, I would like to address another matter. Given that my CPU has hyperthreading functionality, it actually exposes 16 hardware threads, not 8, so you might expect a 16x speedup. But the truth is, for a well-tuned compute-bound task, that cannot happen.

The reason is that hyperthreading works by multiplexing two instruction streams over the execution resources of one CPU core. As a result, hyperthreading only provides performance benefits for code that fails to saturate a CPU core’s execution resources on its own, like our initial versions without ILP. Now that our code is tuned to avoid this pitfall, the impact of hyperthreading on performance can only be at best neutral, and at worst negative.

To put it another way, if you encounter code that performs better with hyperthreading than without, chances are high that the code is not putting the CPU to good use to begin with and could benefit from some single-threaded optimization. The main exception would be latency-sensitive code that spends its time waiting for external events but must react to them very quickly as soon as they occur, such as low-latency servers or hardware interrupt handlers in operating systems.

Custom thread pool

Using rayon makes it a lot cheaper to spawn parallel tasks than explicitly spawning threads on every counting run, but cheaper is not free.

When using all 16 hyperthreads of my 8-core CPU, small counting loops take microseconds to execute due to scheduling and synchronization overhead. As a result, it takes more than 32 million counting iterations for this execution configuration to beat the performance of sequential counting.

This overhead is, as one would expect, less severe when using fewer threads…

  • With 2 threads, sequential counting is beaten above 4 million iterations
  • With 4 threads pinned in such a way that they share a common CPU L3 cache shard, which reduces communication latency, it takes 8 million iterations.
  • With 8 threads or 4 poorly pinned threads, it takes 16 million iterations.

…but it’s still a big entry cost compared to the optimizations we have done previously, which could be problematic in latency-sensitive applications like audio processing. Can we do better than that?

Indeed we can because rayon is a general-purpose library, which supports features we do not need here like dynamic load balancing and arbitrary task code. By forgoing these features and accepting to write the code ourselves, we can implement a lighter-weight scheduling and synchronization protocol, and thus beat rayon’s performance. However, rayon is pretty well implemented, so it actually takes a decent amount of work to outperform it.

So, what building blocks do we need to count in parallel efficiently?

  • N-1 worker threads where N is the desired amount of parallelism. The thread which requested the counting will do its share of the work.
  • A mechanism for worker threads to wait for work to come up and for the main thread to wake them up, which must be immune to lost wake-ups.
  • A mechanism for the main thread to tell worker threads what work they need to do.
  • A mechanism for the main thread to wait for worker threads to be done and for worker threads to wake it up, again immune to lost wake-ups.
  • A mechanism for worker threads and the main thread to aggregate their temporary results into shared storage, and for the main thread to collect the final results.
  • A mechanism for worker threads and the main thread to tell each other to stop when the main thread exits or a worker thread panics.

That’s actually a fair amount of building blocks, and since we are trying to outperform rayon’s tuned implementation, we also need to implement them using a synchronization protocol that is relatively clever. Hence the end result is a big chunk of code. Let’s go through it step by step.

Read-Modify-Write operations and spin locks

Our synchronization protocol requires some atomic Read-Modify-Write (RMW) operations, where a thread must read a value from memory, compute a new dependent value, and store it in place of its former self, without other threads being able to intervene in the middle.

While x86 CPUs provide native support for such operations on <=64-bit words, they are very expensive (hundreds of CPU cycles in the absence of contention, and that grows super-linearly when contention occurs), so we want as few of them per synchronization transaction as possible.

To support blocking, well implemented mutexes must perform a minimum of two of these operations during a lock/unlock cycle:

  • One at locking time to check if the mutex is unlocked and if so lock it.
    • If the mutex is locked, at least one more is needed to tell the thread holding the lock that we are waiting (or, if the lock has been released in the meantime, to try locking it again).
  • One at unlocking time to check if there are threads waiting for the unlock, which allows avoiding the overhead of calling into the OS to unblock them in the uncontended fast path.

For the purpose of this simple program, any synchronization transaction should be doable in one blocking OS transaction, so we should never need more than two hardware RMW operations in succession on the uncontended path. If we do, it means that we’re doing something inefficiently.

There are two ways to get that down to one single hardware RMW atomic operation:

  • If we have only one <=64-bit word to update, we can do it with one hardware RMW operation (see the packing sketch after this list).
  • If we have multiple variables to update but we expect updates to take a very short amount of time (<1µs), then we can use a spin lock.
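
To make the first option concrete, here is a hypothetical sketch (not part of the counter implementation, with made-up names) of packing two 32-bit counters into a single 64-bit word, so that both can be updated together with a single hardware RMW operation:

#![allow(unused)]
fn main() {
use std::sync::atomic::{AtomicU64, Ordering};

/// Two 32-bit counters packed into one 64-bit word: the low half counts
/// submitted tasks, the high half counts completed tasks
struct PackedCounters(AtomicU64);
//
impl PackedCounters {
    /// Record one submitted task with a single hardware RMW operation
    /// (ignoring overflow of the low half for the sake of the example)
    fn add_submitted(&self, order: Ordering) {
        self.0.fetch_add(1, order);
    }

    /// Record one completed task, again with a single RMW operation
    fn add_completed(&self, order: Ordering) {
        self.0.fetch_add(1u64 << 32, order);
    }

    /// Read back both counters as (submitted, completed)
    fn load(&self, order: Ordering) -> (u32, u32) {
        let packed = self.0.load(order);
        (packed as u32, (packed >> 32) as u32)
    }
}
}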

The second option warrants some explanation, since spin locks have gotten a bit of a bad reputation recently, and for good reason: they are a rather specialized tool that tends to be overused.

Well-implemented mutexes differ from well-implemented spin locks in the following ways:

  1. Mutexes need more expensive hardware RMW operations during lock acquisition to exchange more information with other threads. An RMW operation will always be needed, but on some hardware, not all RMW operations are born equal.
    • For example, fetch_add is less expensive than compare_exchange on x86 because the former is implemented using an infallible instruction, avoiding the need for costly retries in the presence of thread contention (see the sketch after this list). But the more complexity you pile up into a single variable, the more likely you are to need the full power of compare_exchange or the higher-level fetch_update abstraction that Rust builds on top of it.
  2. To detect other blocked threads, mutexes need a hardware RMW operation at unlock time, as mentioned earlier, whereas unlocking a spin lock only requires a write because the in-memory data only has two states, locked and unlocked, and only the thread holding the lock can perform the locked -> unlocked state transition. There is no information for it to read.
  3. If they fail to acquire the lock, mutexes will start by busy-waiting in userspace, but will eventually tell the OS scheduler that they are waiting for something. The OS can use this information to schedule them out and prioritize running the thread that holds the lock they depend on. This is not possible with spin locks, which are doomed to either burn CPU cycles or yield to random other OS threads for unknown amounts of time.
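
To illustrate difference #1, here is a standalone sketch (not taken from the actual implementation) contrasting an increment performed with the infallible fetch_add instruction with the equivalent compare_exchange retry loop:

#![allow(unused)]
fn main() {
use std::sync::atomic::{AtomicU64, Ordering};

/// Increment using fetch_add, which maps to an infallible instruction
/// (LOCK XADD on x86): the hardware guarantees it succeeds in one go
fn increment_fetch_add(counter: &AtomicU64) -> u64 {
    counter.fetch_add(1, Ordering::Relaxed)
}

/// The same increment expressed as a compare-exchange loop (LOCK CMPXCHG
/// on x86): under contention, the exchange can fail and must be retried
fn increment_compare_exchange(counter: &AtomicU64) -> u64 {
    let mut old = counter.load(Ordering::Relaxed);
    loop {
        match counter.compare_exchange_weak(
            old,
            old.wrapping_add(1),
            Ordering::Relaxed,
            Ordering::Relaxed,
        ) {
            Ok(prev) => return prev,
            Err(current) => old = current,
        }
    }
}
}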

Spin locks are used to avoid costs #1 and #2 (expensive RMW operations) at the cost of losing benefit #3 (efficient blocking during long waits).

The problem is that if long waits do happen, efficient blocking is actually very important, much more so than cheaping out on hardware RMW operations. Therefore, spin locks are only efficient when long waits are exceptional, and their performance degrades horribly as contention goes up. Thus, they require a very well-controlled execution environment where you can confidently assert that CPU cores are not oversubscribed for extended periods of time. And as a result, they are only relevant for specialized use cases, not general-purpose applications.

However, “specialized use case” is basically the motto of this book, so obviously I can and will provide the right environment guarantees for the sake of reaping those nice little spinlock perf profits.

And thus, when I’ll need to do batches of read-modify-write operations, I’ll allow myself to lock them through the following spin waiting mechanism. Which, conveniently enough, is also generic enough to be usable as part of a blocking synchronization strategy.

#![allow(unused)]
fn main() {
/// Spin until a condition is validated
///
/// Start with busy waiting in userspace with a cheap condition check. For longer
/// waits, burn fewer CPU cycles by yielding to the OS, and allow the waiter to
/// switch to a more expensive check along the way.
///
/// Unless INFINITE is true, give up after a while and return None to allow for
/// proper OS-controlled blocking to take place.
///
/// Otherwise, only Some(result) options may be returned.
///
fn spin_loop<const INFINITE: bool, Ready>(
    mut cheap_check: impl FnMut() -> Option<Ready>,
    mut expensive_check: impl FnMut() -> Option<Ready>,
) -> Option<Ready> {
    // Tuning parameters, empirically optimal on my machine
    use std::time::{Duration, Instant};
    const SPIN_ITERS: usize = 300;
    const MAX_BACKOFF: usize = 1 << 2;
    const OS_SPIN_DELAY: Duration = Duration::from_nanos(1);
    const OS_SPIN_BOUND: Duration = Duration::from_micros(20);

    // Start with a userspace busy loop with a bit of exponential backoff
    let mut backoff = 1;
    for _ in 0..SPIN_ITERS {
        if let Some(ready) = cheap_check() {
            return Some(ready);
        }
        for _ in 0..backoff {
            std::hint::spin_loop();
        }
        backoff = (2 * backoff).min(MAX_BACKOFF);
    }

    // Switch to yielding to the OS once it's clear it's gonna take a while, to
    // reduce our CPU consumption at the cost of higher wakeup latency
    macro_rules! yield_iter {
        () => {
            // Check if the condition is now met
            if let Some(ready) = expensive_check() {
                return Some(ready);
            }

            // yield_now() would be semantically more correct for this situation
            // but is broken on Linux as the CFS scheduler just reschedules us.
            std::thread::sleep(OS_SPIN_DELAY);
        };
    }
    //
    if INFINITE {
        loop {
            yield_iter!();
        }
    } else {
        let start = Instant::now();
        while start.elapsed() < OS_SPIN_BOUND {
            yield_iter!();
        }
        expensive_check()
    }
}
}
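
As an illustration of how this helper can be used, here is a hypothetical sketch (assuming the spin_loop above is in scope) of acquiring a boolean spin lock with it, along the lines of what a later component of this chapter will do:

#![allow(unused)]
fn main() {
use std::sync::atomic::{AtomicBool, Ordering};

/// Acquire a boolean spin lock (`false` = unlocked, `true` = locked)
fn acquire(lock: &AtomicBool) {
    loop {
        // Try to opportunistically flip the lock from unlocked to locked
        if !lock.swap(true, Ordering::Acquire) {
            return;
        }

        // Otherwise, spin until the lock looks unlocked, then try again
        let check = || (!lock.load(Ordering::Relaxed)).then_some(());
        spin_loop::<true, ()>(check, check);
    }
}
}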

Scheduling

Given read-modify-write building blocks, we can start to work on the scheduling mechanism used by the main thread to submit work to worker threads and by all parties involved to tell each other when it’s time to stop working, typically when the main thread exits or a worker thread panics.

As a minor spoiler, I’ll need to iterate on the implementation a bit later on, so let’s be generic over components implementing this logic via the following trait…

#![allow(unused)]
fn main() {
/// Mechanism to synchronize job startup and error handling
pub trait JobScheduler: Sync {
    /// Minimum accepted parallel job size, smaller jobs must be run sequentially
    const MIN_TARGET: u64;

    /// Start a job
    fn start(&self, target: u64);

    /// Request all threads to stop
    fn stop(&self);

    /// Check if the stop signal has been raised
    fn stopped(&self) -> bool;

    /// Wait for a counting job, or a stop signal
    ///
    /// This returns the full counting job, it is then up to this thread to
    /// figure out its share of work from it.
    ///
    fn wait_for_task(&self) -> Result<u64, Stopped>;

    /// Wait until all other worker threads have accepted their task
    ///
    /// Worker threads must do this before telling other threads that they are
    /// done with their current task, and `JobScheduler` implementations may
    /// rely on this to optimize `wait_for_task()`.
    ///
    fn wait_for_started(&self);

    /// Wait for a condition to be met or for the stop signal to be received
    fn faillible_spin_wait(&self, condition: impl Fn() -> bool) -> Result<Done, Stopped> {
        self.faillible_spin::<true>(condition).unwrap()
    }

    /// `spin_loop` specialization that monitors the stop signal like
    /// `faillible_spin_wait`
    fn faillible_spin<const INFINITE: bool>(
        &self,
        condition: impl Fn() -> bool,
    ) -> Option<Result<Done, Stopped>> {
        let condition = || condition().then_some(Ok(Done));
        let error = || self.stopped().then_some(Err(Stopped));
        spin_loop::<INFINITE, _>(condition, || condition().or(error()))
    }
}

/// Error type used to signal that the stop signal was raised
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct Stopped;

/// Signal emitted by `faillible_spin` to tell that the condition was satisfied
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct Done;
}

…which is easy to implement using an atomic variable and a standard Barrier.

#![allow(unused)]
fn main() {
/// Basic JobScheduler implementation, to be refined later on
pub struct BasicScheduler {
    /// Counting job (Some(target)) or request to stop (None)
    request: atomic::Atomic<Option<std::num::NonZeroU64>>,

    /// Mechanism for worker threads to await a request.
    /// This can take the form of an incoming job or a termination request.
    barrier: std::sync::Barrier,
}
//
impl BasicScheduler {
    /// Set up a new BasicScheduler
    pub fn new(num_threads: u32) -> Self {
        use atomic::Atomic;
        use std::{num::NonZeroU64, sync::Barrier};
        assert!(Atomic::<Option<NonZeroU64>>::is_lock_free());
        Self {
            request: Atomic::new(NonZeroU64::new(u64::MAX)),
            barrier: Barrier::new(num_threads.try_into().unwrap()),
        }
    }
}
//
impl JobScheduler for BasicScheduler {
    // We're reserving `target` == 0 for requests to stop
    const MIN_TARGET: u64 = 1;

    fn start(&self, target: u64) {
        use std::{num::NonZeroU64, sync::atomic::Ordering};

        // Package the counting request
        let request = NonZeroU64::new(target);
        assert!(
            request.is_some(),
            "Don't schedule jobs smaller than MIN_TARGET"
        );

        // Send it to the worker threads, making sure that worker threads have
        // not stopped on the other side of the pipeline.
        self.request
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |old| {
                old.expect("Can't schedule this new job, some workers have stopped");
                Some(request)
            })
            .unwrap();

        // Wake up worker threads
        self.barrier.wait();
    }

    fn stop(&self) {
        self.request.store(None, atomic::Ordering::Relaxed);
        self.barrier.wait();
    }

    fn stopped(&self) -> bool {
        self.request.load(atomic::Ordering::Relaxed).is_none()
    }

    fn wait_for_task(&self) -> Result<u64, Stopped> {
        self.barrier.wait();
        self.request
            .load(atomic::Ordering::Relaxed)
            .map(u64::from)
            .ok_or(Stopped)
    }

    fn wait_for_started(&self) {}
}
}

Here, I am using a standard Barrier to have worker threads wait for job and stop signals. The main interesting thing here is that I forbid zero-sized parallel jobs, which the JobScheduler API allows me to do, so that the forbidden job size of zero can be repurposed as a signal that threads should stop.

The astute reader will, however, notice that I am running afoul of my own “not more than two hardware RMW operations per transaction” rule in BasicScheduler::start(), as the blocking implementation of Barrier::wait() must use at least two such operations and I am using one more for error handling. This performance deficiency will be addressed in the next chapter.

Collecting results

After starting jobs, we need to wait for them to finish and collect results. Again, we’ll explore several ways of doing this, so let’s express what we need as a trait:

#![allow(unused)]
fn main() {
/// Synchronization primitive that aggregates per-thread contributions and can
/// tell when all threads have provided their contribution.
///
/// Logically equivalent to the combination of an atomic thread counter and
/// an atomic accumulator of results, where results are accumulated before
/// decreasing the thread counter.
///
pub trait Reducer: Sync {
    /// Clear the accumulator and prepare for `num_threads` contributions
    ///
    /// This is meant for use by the main thread in between jobs and should not
    /// be called while worker threads may observe the state of this object.
    ///
    fn reset(&self, num_threads: u32);

    /// Truth that not all threads have submitted their contribution yet
    ///
    /// If this is used for synchronization purposes, it should be followed by
    /// an `Acquire` memory barrier.
    ///
    fn has_remaining_threads(&self) -> bool;

    /// Optional data payload that threads can contribute
    ///
    /// In the special case where this is `()`, this `Reducer` just counts down
    /// until all threads have called `thread_done`, as a barrier of sorts.
    ///
    type Contribution;

    /// Sum of aggregated contributions for all threads so far
    fn current_result(&self) -> Self::Contribution;

    /// Wait for all contributions to have been received or an error to occur
    fn wait_for_end_result(
        &self,
        scheduler: &impl JobScheduler,
    ) -> Result<Self::Contribution, Stopped> {
        let res = scheduler.faillible_spin_wait(|| !self.has_remaining_threads());
        atomic::fence(atomic::Ordering::Acquire);
        res.map(|Done| self.current_result())
    }

    /// Accumulator identifier
    ///
    /// Some Reducers have an internal structure, where threads can target
    /// multiple inner accumulators. This identifies one such accumulator.
    ///
    type AccumulatorId: Copy;

    /// Process a contribution from one thread
    ///
    /// If this was the last thread, it gets the full aggregated job result.
    ///
    /// The notification that this thread has passed by and the check for job
    /// completion are performed as if done by a single atomic read-modify-write
    /// operation with the specified memory ordering.
    ///
    fn thread_done(
        &self,
        contribution: Self::Contribution,
        ordering: atomic::Ordering,
        accumulator_idx: Self::AccumulatorId,
    ) -> Option<Self::Contribution>;
}
}

To be able to simultaneously track the aggregated 64-bit job result and the 32-bit counter of threads that still have to provide their contribution, we need 96 bits of state, which is more than hardware can update in a single atomic read-modify-write transaction (technically, some hardware can do 128-bit atomics, but they are almost twice as slow as their 64-bit counterparts).

To avoid the need for multiple expensive hardware atomic transactions, we synchronize writers using a spinlock, as we hinted at earlier.

#![allow(unused)]
fn main() {
#[derive(Default)]
pub struct BasicResultReducer {
    /// Spin lock used to synchronize concurrent read-modify-write operations
    ///
    /// Initially `false`, set to `true` while the lock is held.
    ///
    spin_lock: atomic::Atomic<bool>,

    /// Number of processing threads that have work to do
    ///
    /// This is set to `num_threads` when a job is submitted. Each time
    /// a thread is done with its task, it decrements this counter. So when it
    /// reaches 0, it tells the thread that last decremented it that all
    /// worker threads are done, with Acquire/Release synchronization.
    ///
    remaining_threads: atomic::Atomic<u32>,

    /// Sum of partial computation results
    result: atomic::Atomic<u64>,
}
//
impl BasicResultReducer {
    /// Create a new result reducer
    pub fn new() -> Self {
        Self::default()
    }

    /// Acquire spin lock
    fn lock(&self) -> ReducerGuard {
        use atomic::Ordering;

        // If we are the last thread, we do not need a lock
        if self.remaining_threads.load(Ordering::Relaxed) == 1 {
            atomic::fence(Ordering::Acquire);
            return ReducerGuard(self);
        }

        loop {
            // Try to opportunistically acquire the lock
            if !self.spin_lock.swap(true, Ordering::Acquire) {
                return ReducerGuard(self);
            }

            // Otherwise, wait for it to become available before retrying
            let check = || {
                if self.spin_lock.load(Ordering::Relaxed) {
                    None
                } else {
                    Some(())
                }
            };
            spin_loop::<true, ()>(check, check);
        }
    }
}
//
impl Reducer for BasicResultReducer {
    fn reset(&self, num_threads: u32) {
        use atomic::Ordering;
        debug_assert!(!self.spin_lock.load(Ordering::Relaxed));
        debug_assert_eq!(self.remaining_threads.load(Ordering::Relaxed), 0);
        self.remaining_threads.store(num_threads, Ordering::Relaxed);
        self.result.store(0, Ordering::Relaxed);
    }

    fn has_remaining_threads(&self) -> bool {
        self.remaining_threads.load(atomic::Ordering::Relaxed) != 0
    }

    type Contribution = u64;

    fn current_result(&self) -> u64 {
        self.result.load(atomic::Ordering::Relaxed)
    }

    type AccumulatorId = ();

    fn thread_done(&self, result: u64, mut ordering: atomic::Ordering, (): ()) -> Option<u64> {
        use atomic::Ordering;

        // Enforce a Release barrier so that threads observing this
        // notification with Acquire ordering also observe the merged results
        ordering = match ordering {
            Ordering::Relaxed => Ordering::Release,
            Ordering::Acquire => Ordering::AcqRel,
            Ordering::Release | Ordering::AcqRel | Ordering::SeqCst => ordering,
            _ => unimplemented!(),
        };

        // Merge our results, expose job results if done
        let mut lock = self.lock();
        let merged_result = lock.merge_result(result, Ordering::Relaxed);
        lock.notify_done(ordering).then_some(merged_result)
    }
}
//
/// Equivalent of `MutexGuard` for the BasicResultReducer spin lock
struct ReducerGuard<'aggregator>(&'aggregator BasicResultReducer);
//
impl<'aggregator> ReducerGuard<'aggregator> {
    /// Merge partial result `result`, get the current sum of partial results
    pub fn merge_result(&mut self, result: u64, order: atomic::Ordering) -> u64 {
        self.fetch_update(
            &self.0.result,
            order,
            |old| old <= u64::MAX - result,
            |old| old + result,
        )
    }

    /// Notify that this thread is done, tell if all threads are done
    pub fn notify_done(&mut self, order: atomic::Ordering) -> bool {
        self.fetch_update(
            &self.0.remaining_threads,
            order,
            |old| old > 0,
            |old| old - 1,
        ) == 0
    }

    /// Read-Modify-Write operation that is not atomic in hardware, but
    /// logically atomic if all concurrent writes to the target atomic variable
    /// require exclusive access to the spinlock-protected ReducerGuard
    ///
    /// In debug builds, the `check` sanity check is first performed on the
    /// existing value, then a new value is computed through `change`, inserted
    /// into the target atomic variable, and returned.
    ///
    /// Note that this is unlike the fetch_xyz functions of Atomic variables,
    /// which return the _previous_ value of the variable.
    ///
    fn fetch_update<T: bytemuck::NoUninit>(
        &mut self,
        target: &atomic::Atomic<T>,
        order: atomic::Ordering,
        check: impl FnOnce(T) -> bool,
        change: impl FnOnce(T) -> T,
    ) -> T {
        assert!(atomic::Atomic::<T>::is_lock_free());
        let [load_order, store_order] = Self::rmw_order(order);
        let old = target.load(load_order);
        debug_assert!(check(old));
        let new = change(old);
        target.store(new, store_order);
        new
    }

    /// Load and store ordering to be used when emulating an atomic
    /// read-modify-write operation under lock protection
    fn rmw_order(order: atomic::Ordering) -> [atomic::Ordering; 2] {
        use atomic::Ordering;
        match order {
            Ordering::Relaxed => [Ordering::Relaxed, Ordering::Relaxed],
            Ordering::Acquire => [Ordering::Acquire, Ordering::Relaxed],
            Ordering::Release => [Ordering::Relaxed, Ordering::Release],
            Ordering::AcqRel => [Ordering::Acquire, Ordering::Release],
            _ => unimplemented!(),
        }
    }
}
//
impl<'aggregator> Drop for ReducerGuard<'aggregator> {
    fn drop(&mut self) {
        self.0.spin_lock.store(false, atomic::Ordering::Release);
    }
}
}

Besides the spinlock, the other main point of interest in this code is the choice of atomic memory orderings, which ensures that any thread that either acquires the spinlock or spins waiting for remaining_threads to reach 0 with Acquire ordering will observe a consistent value of result at the point where remaining_threads reached 0.
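
For readers less familiar with Acquire/Release semantics, here is a minimal standalone sketch of the underlying pattern (with made-up names and values, unrelated to the counter code): a writer publishes data with a Release store to a flag, and any reader that observes that flag with an Acquire load is guaranteed to also observe the data written before the Release store.

#![allow(unused)]
fn main() {
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};

static DATA: AtomicU64 = AtomicU64::new(0);
static READY: AtomicBool = AtomicBool::new(false);

fn writer() {
    // Write the payload...
    DATA.store(42, Ordering::Relaxed);
    // ...then publish it with Release ordering
    READY.store(true, Ordering::Release);
}

fn reader() -> Option<u64> {
    // This Acquire load pairs with the writer's Release store
    READY.load(Ordering::Acquire).then(|| DATA.load(Ordering::Relaxed))
}
}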

Shared facilities

Now that we have ways to schedule jobs and collect results, we are ready to define the state shared between all processing threads, as well as its basic transactions.

#![allow(unused)]
fn main() {
/// State shared between the main thread and worker threads
struct SharedState<Counter: Fn(u64) -> u64 + Sync, Scheduler, ResultReducer: Reducer> {
    /// Counter implementation
    counter: Counter,

    /// Assignment of threads to reducer slots
    thread_ids: Box<[ResultReducer::AccumulatorId]>,

    /// Mechanism to synchronize job startup and error handling
    scheduler: Scheduler,

    /// Mechanism to synchronize task and job completion
    result_reducer: ResultReducer,
}
//
impl<
        Counter: Fn(u64) -> u64 + std::panic::RefUnwindSafe + Sync,
        Scheduler: JobScheduler,
        ResultReducer: Reducer<Contribution = u64>,
    > SharedState<Counter, Scheduler, ResultReducer>
{
    /// Set up shared state
    pub fn new(
        counter: Counter,
        thread_ids: Box<[ResultReducer::AccumulatorId]>,
        scheduler: Scheduler,
        result_reducer: ResultReducer,
    ) -> Self {
        assert!(thread_ids.len() < usize::try_from(u32::MAX).unwrap());
        Self {
            counter,
            thread_ids,
            scheduler,
            result_reducer,
        }
    }

    /// Schedule counting work and wait for the end result
    pub fn count(&self, target: u64) -> u64 {
        // Handle sequential special cases
        if self.num_threads() == 1 || target < Scheduler::MIN_TARGET {
            return (self.counter)(target);
        }

        // Schedule job
        let debug_log = |action| debug_log(true, action);
        debug_log("scheduling a new job");
        self.result_reducer.reset(self.num_threads());
        self.scheduler.start(target);

        // Do our share of the work
        let result = self.process(target, Self::MAIN_THREAD).unwrap_or_else(|| {
            // If we're not finishing last, wait for workers to finish
            debug_log("waiting for the job's result");
            self.result_reducer
                .wait_for_end_result(&self.scheduler)
                .expect("This job won't end because some workers have stopped")
        });
        debug_log("done");
        result
    }

    /// Request worker threads to stop
    pub fn stop(&self, is_main: bool) {
        debug_log(is_main, "sending the stop signal");
        self.scheduler.stop();
    }

    /// Worker thread implementation
    pub fn worker(&self, thread_idx: u32) {
        use std::panic::AssertUnwindSafe;
        assert!(thread_idx != Self::MAIN_THREAD);
        let debug_log = |action| debug_log(false, action);
        if let Err(payload) = std::panic::catch_unwind(AssertUnwindSafe(|| {
            // Normal work loop
            debug_log("waiting for its first job");
            while let Ok(target) = self.scheduler.wait_for_task() {
                self.process(target, thread_idx);
                debug_log("waiting for its next job")
            }
            debug_log("shutting down normally");
        })) {
            // In case of panic, tell others to stop before unwinding
            debug_log("panicking!");
            self.stop(false);
            std::panic::resume_unwind(payload);
        }
    }

    /// Thread index of the main thread
    const MAIN_THREAD: u32 = 0;

    /// Number of threads managed by this SharedState
    fn num_threads(&self) -> u32 {
        self.thread_ids.len() as u32
    }

    /// Process this thread's share of a job, tell if the job is done
    fn process(&self, target: u64, thread_idx: u32) -> Option<u64> {
        // Discriminate main thread from worker thread
        let is_main = thread_idx == Self::MAIN_THREAD;
        let is_worker = !is_main;

        // Determine which share of the counting work we'll take
        let thread_idx = thread_idx as u64;
        let num_threads = self.num_threads() as u64;
        let base_share = target / num_threads;
        let extra = target % num_threads;
        let share = base_share + (thread_idx < extra) as u64;

        // Do the counting work
        let debug_log = |action| debug_log(is_main, action);
        debug_log("executing its task");
        let result = (self.counter)(share);

        // Wait for other threads to have accepted their task
        if is_worker {
            debug_log("waiting for other tasks to be started");
            self.scheduler.wait_for_started();
        }

        // Merge our partial result into the global result
        debug_log("merging its result contribution");
        self.result_reducer.thread_done(
            result,
            atomic::Ordering::Release,
            self.thread_ids[thread_idx as usize],
        )
    }
}

/// Logs to ease debugging
fn debug_log(is_main: bool, action: &str) {
    if cfg!(debug_assertions) {
        let header = if is_main { "Main " } else { "" };
        // While using stdout here defies Unix convention, it also ensures that
        // the test harness can capture the message, unlike unbuffered stderr.
        println!("{header}{:?} is {action}", std::thread::current().id());
    }
}
}

This is a bit large, let’s go through it step by step.

TODO: Update

The shared state combines the two synchronization primitives that we have introduced previously (the JobScheduler and the result Reducer) with a sequential counting implementation and a table assigning each thread to a reducer accumulator.

The main thread starts a job by resetting the Reducer, then waking up workers through the JobScheduler mechanism. After that, it does its share of the work by calling process(), which we will describe shortly. Finally, it either gets the final result directly, by virtue of finishing last, or waits for worker threads to provide it.

Worker threads go through a simple loop where they wait for scheduler.wait_for_task() to emit a job, process it, then go back to waiting, until they are requested to stop. If a panic occurs, the panicking thread sends the stop signal so that other worker threads and the main thread eventually stop as well, avoiding a deadlock scenario where surviving worker threads would wait for a new job from the main thread, which itself waits for worker threads to finish the current job.

The processing logic in process() starts by splitting the job into roughly equal shares (for instance, a target of 10 split across 4 threads yields shares of 3, 3, 2 and 2), counts sequentially, then makes worker threads wait for other workers to have started working (which is going to be needed later on), and finally merges the result contribution using the Reducer.

Finally, a little logger that is compiled out of release builds is provided, as it is empirically very useful when debugging incorrect logic in multi-threaded code.

Putting it all together

At this point, we have all the logic we need. The only thing left to do is to determine how many CPUs are available, spawn an appropriate number of worker threads, provide the top-level counting API, and make sure that when the main thread exits, it warns worker threads so that they exit too.

#![allow(unused)]
fn main() {
pub struct BasicThreadPool<
    Counter: Fn(u64) -> u64 + std::panic::RefUnwindSafe + Sync + 'static,
    Scheduler: JobScheduler,
> {
    /// Worker threads
    _workers: Box<[std::thread::JoinHandle<()>]>,

    /// State shared with worker threads
    state: std::sync::Arc<SharedState<Counter, Scheduler, BasicResultReducer>>,
}
//
impl<
        Counter: Fn(u64) -> u64 + std::panic::RefUnwindSafe + Send + Sync + 'static,
        Scheduler: JobScheduler + Send + 'static,
    > BasicThreadPool<Counter, Scheduler>
{
    /// Set up worker threads with a certain counter implementation
    ///
    /// `make_scheduler` takes a number of threads as a parameter and sets up a
    /// Scheduler that can work with this number of threads.
    ///
    pub fn start(counter: Counter, make_scheduler: impl FnOnce(u32) -> Scheduler) -> Self {
        use std::{sync::Arc, thread};

        let num_threads = u32::try_from(
            std::thread::available_parallelism()
                .map(usize::from)
                .unwrap_or(2),
        )
        .expect("Number of threads must fit on 32 bits");

        let state = Arc::new(SharedState::new(
            counter,
            std::iter::repeat(()).take(num_threads as usize).collect(),
            make_scheduler(num_threads),
            BasicResultReducer::new(),
        ));
        let _workers = (1..num_threads)
            .map(|thread_idx| {
                let state2 = state.clone();
                thread::spawn(move || state2.worker(thread_idx))
            })
            .collect();

        Self { _workers, state }
    }

    /// Count in parallel using the worker threads
    ///
    /// This wants &mut because the shared state is not meant for concurrent use
    ///
    pub fn count(&mut self, target: u64) -> u64 {
        self.state.count(target)
    }
}
//
impl<
        Counter: Fn(u64) -> u64 + std::panic::RefUnwindSafe + Sync + 'static,
        Scheduler: JobScheduler,
    > Drop for BasicThreadPool<Counter, Scheduler>
{
    /// Tell worker threads to exit on Drop: we won't be sending more tasks
    fn drop(&mut self) {
        self.state.stop(true)
    }
}
}
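
As a usage sketch, assuming the pieces defined above are all in scope, the thread pool could be assembled with the BasicScheduler and a stand-in sequential counter along these lines (the closure below is only a placeholder for one of the sequential implementations from earlier chapters):

#![allow(unused)]
fn main() {
fn example() -> u64 {
    // Placeholder sequential counter used by each thread for its share
    let counter = |target: u64| -> u64 {
        let mut current = 0;
        for _ in 0..target {
            current = std::hint::black_box(current + 1);
        }
        current
    };

    // Spawn the worker threads, then count in parallel
    let mut pool = BasicThreadPool::start(counter, BasicScheduler::new);
    pool.count(1 << 24)
}
}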

And with that, we have a full custom parallel counting implementation that we can compare to the Rayon one. How well does it perform?

  • 2 well-pinned threads beat sequential counting above 256 thousand iterations (16x better).
  • 4 well-pinned threads do so above 1 million iterations (8x better).
  • 8 threads do so above 2 million iterations (8x better).
  • 16 hyperthreads do so above 8 million iterations (4x better).
  • Asymptotic performance at large amounts of iterations is comparable.

So, with this specialized implementation, we’ve cut down the small-task overhead by a sizable factor that goes up as the number of threads goes down. But can we do better still by addressing my earlier comment that the naive JobScheduler above is far from optimal?

Custom synchronization

Running the custom threaded version with a small job through a profiler tells me that it spends a sizable fraction of its time in the Linux kernel’s futex() synchronization primitive, called by the Rust standard library’s internal condition variable implementation, itself triggered by our “wait for new task” Barrier.

As far as I know, futex() is indeed the fastest primitive provided by Linux for programs to await events. However, a condition variable may not be the most efficient API to use it for our purposes.

Condition variables are not designed for scalability. You need to hold a mutex just to await one, and when it is notified, every thread awaiting it will be awakened, each immediately grabbing the mutex just to make sure other threads don’t get a chance to run yet.

Even when this underlying thundering herd problem is correctly accounted for, this API design prevents condition variable overheads from scaling any better than linearly with the number of waiting threads, which matches our observations.

To its credit, this API design makes it a little harder to shoot yourself in the foot with lost wake-ups in naive usage. But it is unfortunate that it does so at the cost of making this synchronization primitive a poor pick in code where performance matters.
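
For reference, here is a minimal sketch (with made-up names, unrelated to our implementation) of the standard Condvar protocol being discussed, showing the mutex that must be held while waiting and the broadcast wake-up on the notifying side:

#![allow(unused)]
fn main() {
use std::sync::{Condvar, Mutex};

/// Waiter side: must hold the mutex, and re-acquires it on every wake-up
fn wait_for_job(job: &Mutex<Option<u64>>, condvar: &Condvar) -> u64 {
    let mut guard = job.lock().unwrap();
    // wait() atomically releases the mutex and re-acquires it upon wake-up,
    // which is what protects naive usage against lost wake-ups
    while guard.is_none() {
        guard = condvar.wait(guard).unwrap();
    }
    guard.take().unwrap()
}

/// Notifier side: every waiting thread is woken up by notify_all()
fn submit_job(job: &Mutex<Option<u64>>, condvar: &Condvar, target: u64) {
    *job.lock().unwrap() = Some(target);
    condvar.notify_all();
}
}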

So if we want to go faster, we’ll need to skip the middleman and go lower-level, down to futex() and its cousins on other OSes. Fortunately, the atomic_wait crate provides a least common denominator platform abstraction layer across all popular desktop operating systems.

#![allow(unused)]
fn main() {
pub struct FutexScheduler {
    /// Number of threads targeted by this FutexScheduler
    num_threads: u32,

    /// Number of tasks to be grabbed or request to stop
    ///
    /// In normal operation, when a job is submitted, the main thread sets this
    /// to the number of worker threads (which is `num_threads - 1`) and
    /// worker threads decrement it as they accept their tasks. Once this
    /// reaches zero, it means all tasks to be processed have been taken care of
    /// and next wait_for_task() callers should go to sleep.
    ///
    /// To request threads to stop, this is set to `u32::MAX`. In general,
    /// anything above `Self::STOP_THRESHOLD` indicates that threads should
    /// stop; having this flexibility makes cancellation cheaper to implement.
    ///
    /// Provides Acquire/Release synchronization with the publisher of the
    /// information being read.
    ///
    task_futex: std::sync::atomic::AtomicU32,

    /// Requested or ongoing computation
    request: atomic::Atomic<u64>,
}
//
impl FutexScheduler {
    /// Set up a new FutexScheduler
    pub fn new(num_threads: u32) -> Self {
        use atomic::Atomic;
        use std::sync::atomic::AtomicU32;
        assert!(
            num_threads <= Self::STOP_THRESHOLD,
            "{} threads ought to be enough for anybody",
            Self::STOP_THRESHOLD
        );
        Self {
            num_threads,
            task_futex: AtomicU32::new(0),
            request: Atomic::default(),
        }
    }
}
//
impl JobScheduler for FutexScheduler {
    const MIN_TARGET: u64 = 0;

    fn start(&self, target: u64) {
        use atomic::Ordering;

        // Publish the target, that's not synchronization-critical
        self.request.store(target, Ordering::Relaxed);

        // Publish one task per worker thread, making sure that worker threads
        // have not stopped on the other side of the pipeline.
        self.task_futex
            .fetch_update(Ordering::Release, Ordering::Relaxed, |old| {
                assert!(
                    old < Self::STOP_THRESHOLD,
                    "Can't schedule this new job, some workers have stopped"
                );
                debug_assert_eq!(old, Self::NO_TASK);
                Some(self.num_threads - 1)
            })
            .unwrap();

        // Wake up worker threads
        atomic_wait::wake_all(&self.task_futex);
    }

    fn stop(&self) {
        self.task_futex.store(u32::MAX, atomic::Ordering::Release);
        atomic_wait::wake_all(&self.task_futex);
    }

    fn stopped(&self) -> bool {
        self.task_futex.load(atomic::Ordering::Relaxed) >= Self::STOP_THRESHOLD
    }

    fn wait_for_task(&self) -> Result<u64, Stopped> {
        use atomic::Ordering;
        loop {
            // Wait for a request to come in
            if self.faillible_spin::<false>(|| !self.no_task()).is_none() {
                atomic_wait::wait(&self.task_futex, Self::NO_TASK);
                if self.no_task() {
                    continue;
                }
            }

            // Acknowledge the request, check for concurrent stop signals
            let prev_started = self.task_futex.fetch_sub(1, Ordering::Acquire);
            debug_assert!(prev_started > Self::NO_TASK);
            if prev_started < Self::STOP_THRESHOLD {
                return Ok(self.request.load(Ordering::Relaxed));
            } else {
                return Err(Stopped);
            }
        }
    }

    fn wait_for_started(&self) {
        // No need to handle the stop signal at this stage, it will be caught
        // on the next call to wait_for_task().
        self.faillible_spin_wait(|| self.no_task()).unwrap_or(Done);
    }
}
//
impl FutexScheduler {
    /// This value of `task_futex` means there is no work to be done and worker
    /// threads should spin then sleep waiting for tasks to come up
    const NO_TASK: u32 = 0;

    /// Values of `task_futex` higher than this means all threads should stop
    const STOP_THRESHOLD: u32 = u32::MAX / 2;

    /// Truth that all scheduled tasks have been started
    fn no_task(&self) -> bool {
        self.task_futex.load(atomic::Ordering::Relaxed) == Self::NO_TASK
    }
}
}
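
Since FutexScheduler implements the same JobScheduler trait, and its constructor has the same shape as BasicScheduler::new, it can presumably be plugged into the BasicThreadPool shown earlier by simply swapping the scheduler constructor, along the lines of BasicThreadPool::start(counter, FutexScheduler::new).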

How does that change in synchronization strategy affect performance?

  • 2 well-pinned threads beat sequential counting above 128 thousand iterations (2x better).
  • 4 well-pinned threads do so above 128 thousand iterations (8x better).
  • 8 threads do so above 512 thousand iterations (4x better).
  • 16 hyperthreads do so above 1 million iterations (8x better).
  • Asymptotic performance at large amounts of iterations is comparable.

Scaling up

So far, by virtue of working on a “small” 8-core CPU, I have had the luxury of not caring at all about scalability to large thread counts and still getting decent synchronization performance. But now is the right time to start caring, be it only to cross-check my assumption that 8 cores is indeed “few” cores from a synchronization perspective.

What have I been doing wrong from a scalability perspective so far?

  • In my atomic read-modify-write transactions, all threads are interacting through a fixed set of atomic variables. This does not scale well on cache-coherent CPU architectures like x86:
    • Because coherent caches handle concurrent access to cache lines through the conceptual equivalent of a reader-writer lock, only one thread at a time can be in the process of synchronizing with other threads. A scalable synchronization protocol would instead allow threads to synchronize in parallel. This is done by using N-ary reduction trees, where threads start to synchronize in groups of N, then the group results are themselves synchronized in groups of N, and so on until a fully merged result is produced.
    • Cache coherence is implemented by physically transferring data from the L1 cache of one CPU core to that of another core through an interconnect whenever a write occurs. But not all CPU cores are at equal distance from the perspective of this interconnect. For example, my 8-core Zen 2 CPU is composed of two 4-core complexes, with faster data transfers within a complex than across complexes. Scalable synchronization protocols must account for this by minimizing data transfers across slower interconnects.
  • I have not paid attention to cache locality and false sharing concerns so far. In a nutshell, data that is manipulated together by a thread should reside on the same cache line to amortize inter-cache data transfer costs, while data that is not manipulated together should reside on different cache lines to enable parallel access by different CPU cores. Where these goals conflict, parallelism should usually be favored over reduction of inter-cache data transfers (see the padding sketch after this list).
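
As a hypothetical illustration of the false sharing point (the types below are made up and not part of the implementation), per-thread accumulators could be padded so that each one lives on its own cache line, at the cost of some wasted memory:

#![allow(unused)]
fn main() {
use std::sync::atomic::AtomicU64;

/// Wrapper that pads its contents to 128 bytes, i.e. two 64-byte cache
/// lines, to also account for adjacent-line prefetching on some CPUs
#[repr(align(128))]
struct CachePadded<T>(T);

/// One padded accumulator per thread, so that a thread updating its own
/// accumulator does not invalidate the cache lines of other threads
struct PerThreadAccumulators {
    accumulators: Box<[CachePadded<AtomicU64>]>,
}
}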

Given the enormous amount of work that has been expended on non-uniform memory access in Linux, which is a related problem that affects RAM accesses, it is safe to assume that the futex() Linux system call does the right thing here and we do not need to waste time implementing a futex() reduction tree. However, any synchronization that we do ourselves using atomic shared state will need to be reworked to follow the scalability guidelines outlined above.

Result aggregation tree

TODO: Use reduction trees with const MAX_ARITY + lstopo-driven splitting.

Streaming

TODO: What if the thread was counting all the time and we were just checking its value periodically?

GPU

TODO: Do this with wgpu, explain that we cannot do streaming counting on the GPU as that would kill the graphics stack but we can do streaming counting on the CPU while waiting for the GPU to produce its own results.

Conclusion (WIP)

TODO: This is most certainly a pointless calculation, but it does provide a nice occasion to touch on several things that matter in real calculations, and it is also not that far from a real calculation (just add some form of masking to turn pointless exhaustive counting into an actually useful statistical primitive). Hope you enjoyed this thxbye.

  I should definitely also send a copy of this to Paul McKenney with some
  background info on how this is a parallel programming book joke gone too
  far.

  Also, look ma no unsafe !

  General observations:
  - Know your hardware specs, question why you're not there yet
  - Parallelize, parallelize, parallelize
  - Quite a difference between ILP & SIMD where you usually need to do all
    the work yourself but it's easy, and multithreading where you can often
    be satisfied with what a generic library gives you but when you aren't
    it's an order of magnitude more tricky.