When smaller is better

At this stage, we are computing 64-bit additions as fast as the CPU can, so you might expect that we have also reached peak counting throughput. But that is not the case yet.

On x86 as well as on many other CPU architectures, SIMD works by giving you a fixed-size bag of bits and letting you slice it in integers of any machine-supported size. So if you choose to slice your SIMD vector in smaller-sized integers, then you can do work on more of these integers per instruction. For example, given a 256-bit AVX vector, you can do either four 64-bit additions per cycle, eight 32-bit ones, sixteen 16-bit ones, or thirty-two 8-bit ones.

The simple way to use this property would be to redesign the API to use smaller-size counters, e.g. u32 ones. But at the rate we’re already going, we would overflow a 32-bit counter in about 20ms. So clearly, for our Big Counting purposes, we want to keep using 64-bit accumulators somewhere.

However, that does not mean we cannot use smaller-sized integers as temporary fast accumulators that eventually spill into these slower 64-bit ones.

Using 32-bit counters for illustration, the general approach is going to be along these lines…

#![allow(unused)]
fn main() {
let mut accumulator = 0u64;
let mut remainder = target;
while remainder > 0 {
    let mut counter = 0u32;
    for _ in 0..remainder.min(u32::MAX as u64) {
        counter = pessimize::hide(counter + 1);
        remainder -= 1;
    }
    accumulator += counter as u64;
}
}

…but with native-width SIMD, instruction-level parallelism, and ideally a choice of counter width as well since there’s a potential tradeoff there: small counters count more quickly but merge more often, and merging might be expensive, so it’s not absolutely certain that, say, 8-bit integers will be the fastest choice.

Narrow SIMD accumulators

Since the SimdAccumulator trait was designed for it, supporting narrower integers is, on its side, just a matter of going through the safe_arch documentation and filling in the right methods with appropriate conditional compilation directives.

Here I will do it for AVX vectors only, since that’s all I need for optimal performance on Zen 2 and other modern x86 CPUs. But hopefully you can easily see how the general approach would extend to SSE and non-x86 vector instruction sets.

#![allow(unused)]
fn main() {
#[cfg(target_feature = "avx2")]
impl SimdAccumulator<u8> for safe_arch::m256i {
    #[inline]
    fn identity(one: bool) -> Self {
        Self::from([one as u8; <Self as SimdAccumulator<u8>>::WIDTH])
    }

    #[inline]
    fn add(self, other: Self) -> Self {
        safe_arch::add_i8_m256i(self, other)
    }

    // AVX vectors of 8-bit integers reduce to AVX vectors of 16-bit integers
    type ReducedCounter = u16;
    type ReducedAccumulator = safe_arch::m256i;
    //
    #[inline]
    fn reduce_step(self) -> [Self::ReducedAccumulator; 2] {
        use safe_arch::m256i;
        fn extract_u16_m256i_from_u8_m256i<const LANE: i32>(u8s: m256i) -> m256i {
            let half = safe_arch::extract_m128i_m256i::<LANE>(u8s);
            safe_arch::convert_to_i16_m256i_from_u8_m128i(half)
        }
        [
            extract_u16_m256i_from_u8_m256i::<0>(self),
            extract_u16_m256i_from_u8_m256i::<1>(self),
        ]
    }
}

#[cfg(target_feature = "avx2")]
impl SimdAccumulator<u16> for safe_arch::m256i {
    #[inline]
    fn identity(one: bool) -> Self {
        Self::from([one as u16; <Self as SimdAccumulator<u16>>::WIDTH])
    }

    #[inline]
    fn add(self, other: Self) -> Self {
        safe_arch::add_i16_m256i(self, other)
    }

    // AVX vectors of 16-bit integers reduce to AVX vectors of 32-bit integers
    type ReducedCounter = u32;
    type ReducedAccumulator = safe_arch::m256i;
    //
    #[inline]
    fn reduce_step(self) -> [Self::ReducedAccumulator; 2] {
        use safe_arch::m256i;
        fn extract_u32_m256i_from_u16_m256i<const LANE: i32>(u16s: m256i) -> m256i {
            let half = safe_arch::extract_m128i_m256i::<LANE>(u16s);
            safe_arch::convert_to_i32_m256i_from_u16_m128i(half)
        }
        [
            extract_u32_m256i_from_u16_m256i::<0>(self),
            extract_u32_m256i_from_u16_m256i::<1>(self),
        ]
    }
}

#[cfg(target_feature = "avx2")]
impl SimdAccumulator<u32> for safe_arch::m256i {
    #[inline]
    fn identity(one: bool) -> Self {
        Self::from([one as u32; <Self as SimdAccumulator<u32>>::WIDTH])
    }

    #[inline]
    fn add(self, other: Self) -> Self {
        safe_arch::add_i32_m256i(self, other)
    }

    // AVX vectors of 32-bit integers reduce to AVX vectors of 64-bit integers
    type ReducedCounter = u64;
    type ReducedAccumulator = safe_arch::m256i;
    //
    #[inline]
    fn reduce_step(self) -> [Self::ReducedAccumulator; 2] {
        use safe_arch::m256i;
        fn extract_u64_m256i_from_u32_m256i<const LANE: i32>(u16s: m256i) -> m256i {
            let half = safe_arch::extract_m128i_m256i::<LANE>(u16s);
            safe_arch::convert_to_i64_m256i_from_u32_m128i(half)
        }
        [
            extract_u64_m256i_from_u32_m256i::<0>(self),
            extract_u64_m256i_from_u32_m256i::<1>(self),
        ]
    }
}
}

Simple full counter implementation

As currently written, generic_ilp_u64 will only reliably work with 64-bit accumulators. Narrower accumulators would overflow for larger values of target. To avoid this, we need to regularly spill into larger accumulators as outlined at the beginning of this chapter.

A simple if imperfect way to do so is to use the existing SimdAccumulator facilities to reduce our full narrow SIMD accumulators all the way to u64 integers, then integrate these results into a set of global u64 accumulators, every time a narrow integer overflow would happen:

#![allow(unused)]
fn main() {
pub fn generic_narrow_simple<
    Counter: num_traits::PrimInt,
    Simd: crate::simd::SimdAccumulator<Counter>,
>(
    target: u64,
) -> u64 {
    const ILP_WIDTH: usize = 10;

    // Set up narrow SIMD counters and wide scalar counters
    let mut simd_accumulators = [Simd::zeros(); ILP_WIDTH];
    let mut scalar_accumulators = [0u64; ILP_WIDTH];

    // Set up overflow avoidance through spilling to scalar counters
    let mut counter_usage = Counter::zero();
    let spill = |simd_accumulators: &mut [Simd; ILP_WIDTH],
                 scalar_accumulators: &mut [u64; ILP_WIDTH]| {
        for (scalar, simd) in scalar_accumulators
            .iter_mut()
            .zip(simd_accumulators.iter_mut())
        {
            *scalar += simd.reduce();
        }
        for simd in simd_accumulators {
            *simd = Simd::zeros();
        }
    };

    // Accumulate in parallel
    let mut remainder = target;
    let full_width = (Simd::WIDTH * ILP_WIDTH) as u64;
    while remainder >= full_width {
        // Perform a round of counting
        for simd_accumulator in &mut simd_accumulators {
            simd_accumulator.increment();
        }
        remainder -= full_width;

        // When the narrow SIMD counters fill up, spill to scalar counters
        counter_usage = counter_usage + Counter::one();
        if counter_usage == Counter::max_value() {
            spill(&mut simd_accumulators, &mut scalar_accumulators);
            counter_usage = Counter::zero();
        }
    }

    // Merge SIMD accumulators into scalar counters
    spill(&mut simd_accumulators, &mut scalar_accumulators);

    // Accumulate remaining elements in scalar counters
    while remainder > 0 {
        for scalar_accumulator in scalar_accumulators.iter_mut().take(remainder as usize) {
            scalar_accumulator.increment();
            remainder -= 1;
        }
    }

    // Merge scalar accumulators using parallel reduction
    let mut stride = ILP_WIDTH.next_power_of_two() / 2;
    while stride > 0 {
        for i in 0..stride.min(ILP_WIDTH - stride) {
            scalar_accumulators[i].merge(scalar_accumulators[i + stride]);
        }
        stride /= 2;
    }
    scalar_accumulators[0]
}

#[cfg(target_feature = "avx2")]
pub fn narrow_simple<Counter: num_traits::PrimInt>(target: u64) -> u64
where
    safe_arch::m256i: SimdAccumulator<Counter>,
{
    generic_narrow_simple::<Counter, safe_arch::m256i>(target)
}
}

…and if we test this implementation, we see that each halving of the counter width doubles our throughput until we reach u8 integers. There we only improve over u16 integers by a factor of 1.7 because we merge every 255 increments and the merging overhead starts to become noticeable.

Now, improving by a factor of 6.7x overall is already nice, but if we want this technique to get closer to the 8x speedup over the 64-bit SIMD version that it aimed for, then we need to reduce the overhead of merging. And the obvious way to do that is to refrain from reducing narrow SIMD accumulators all the way to scalar u64s when spilling to wider SIMD accumulators would suffice.

Along the way, I’ll also extract some complexity out of the full counter implementation, as it starts to pack too much complexity into a single function for my taste again.

Reaching peak asymptotic throughput

As outlined above, we want to have a cascading accumulation design where we increment SIMD vectors of 8-bit integers that spill into SIMD vectors of 16-bit integers when full, and those SIMD vectors of 16-bit integers in turn spill into scalar 64-bit integers as they did before.

One can offload this cascading accumulation concern to a dedicated struct…

#![allow(unused)]
fn main() {
struct U8Accumulator<const ILP_WIDTH: usize, Simd: SimdAccumulator<u8> + SimdAccumulator<u16>> {
    /// SIMD 8-bit integer accumulators
    simd_u8s: [Simd; ILP_WIDTH],

    /// Number of increments that occured in u8s
    u8s_usage: u8,

    /// SIMD 16-bit integer accumulators
    simd_u16s: [Simd; ILP_WIDTH],

    /// Number of u8s spills that occured in u16s
    u16s_usage: u8,

    /// Scalar integer accumulators
    scalars: [u64; ILP_WIDTH],
}
//
impl<
        const ILP_WIDTH: usize,
        Simd: SimdAccumulator<u8, ReducedAccumulator = Simd> + SimdAccumulator<u16>,
    > U8Accumulator<ILP_WIDTH, Simd>
{
    /// Total width including ILP
    pub const WIDTH: usize = ILP_WIDTH * <Simd as SimdAccumulator<u8>>::WIDTH;

    /// Set up accumulator
    pub fn new() -> Self {
        Self {
            simd_u8s: [<Simd as SimdAccumulator<u8>>::zeros(); ILP_WIDTH],
            u8s_usage: 0,
            simd_u16s: [<Simd as SimdAccumulator<u16>>::zeros(); ILP_WIDTH],
            u16s_usage: 0,
            scalars: [0; ILP_WIDTH],
        }
    }

    /// Increment counters
    #[inline]
    pub fn increment(&mut self) {
        // Perfom a 8-bit increment
        for simd_u8 in &mut self.simd_u8s {
            <Simd as SimdAccumulator<u8>>::increment(simd_u8);
        }
        self.u8s_usage += 1;

        // Spill to 16-bit counters if it's time
        if self.u8s_usage == u8::MAX {
            self.spill_u8s_to_u16s();
            self.u8s_usage = 0;
            self.u16s_usage += 1;
        }

        // Spill to scalar counters if it's time
        if self.u16s_usage == (u16::MAX / (2 * u8::MAX as u16)) as u8 {
            self.spill_u16s_to_scalars();
            self.u16s_usage = 0;
        }
    }

    /// Spill SIMD counters and extract scalar counters
    pub fn scalarize(mut self) -> [u64; ILP_WIDTH] {
        self.spill_u8s_to_u16s();
        self.spill_u16s_to_scalars();
        self.scalars
    }

    /// Spill 8-bit SIMD accumulators into matching 16-bit ones
    #[inline]
    fn spill_u8s_to_u16s(&mut self) {
        for (simd_u8, simd_u16) in self.simd_u8s.iter_mut().zip(self.simd_u16s.iter_mut()) {
            fn spill_one<SimdU8, SimdU16>(simd_u8: &mut SimdU8, simd_u16: &mut SimdU16)
            where
                SimdU8: SimdAccumulator<u8, ReducedAccumulator = SimdU16>,
                SimdU16: SimdAccumulator<u16>,
            {
                let [mut u16_contrib, u16_contrib_2] = simd_u8.reduce_step();
                u16_contrib.merge(u16_contrib_2);
                simd_u16.merge(u16_contrib);
                *simd_u8 = SimdU8::zeros();
            }
            spill_one(simd_u8, simd_u16);
        }
    }

    /// Spill 16-bit SIMD accumulators into matching scalar ones
    #[inline]
    fn spill_u16s_to_scalars(&mut self) {
        for (simd_u16, scalar) in self.simd_u16s.iter_mut().zip(self.scalars.iter_mut()) {
            fn spill_one<SimdU16: SimdAccumulator<u16>>(simd_u16: &mut SimdU16, scalar: &mut u64) {
                scalar.merge(simd_u16.reduce());
                *simd_u16 = SimdU16::zeros();
            }
            spill_one(simd_u16, scalar);
        }
    }
}
}

…and then build a top-level counting function that is expressed in terms of using this accumulator:

#![allow(unused)]
fn main() {
pub fn generic_narrow_u8<Simd>(target: u64) -> u64
where
    Simd: SimdAccumulator<u8, ReducedAccumulator = Simd> + SimdAccumulator<u16>,
{
    const ILP_WIDTH: usize = 10;

    // Set up accumulators
    let mut simd_accumulator = U8Accumulator::<ILP_WIDTH, Simd>::new();

    // Accumulate in parallel
    let mut remainder = target;
    let full_width = U8Accumulator::<ILP_WIDTH, Simd>::WIDTH as u64;
    while remainder >= full_width {
        simd_accumulator.increment();
        remainder -= full_width;
    }

    // Merge SIMD accumulators into scalar counters
    let mut scalar_accumulators = simd_accumulator.scalarize();

    // Accumulate remaining elements in scalar counters
    while remainder > 0 {
        for scalar_accumulator in scalar_accumulators.iter_mut().take(remainder as usize) {
            scalar_accumulator.increment();
            remainder -= 1;
        }
    }

    // Merge scalar accumulators using parallel reduction
    let mut stride = ILP_WIDTH.next_power_of_two() / 2;
    while stride > 0 {
        for i in 0..stride.min(ILP_WIDTH - stride) {
            scalar_accumulators[i].merge(scalar_accumulators[i + stride]);
        }
        stride /= 2;
    }
    scalar_accumulators[0]
}

#[cfg(target_feature = "avx2")]
pub fn narrow_u8(target: u64) -> u64 {
    generic_narrow_u8::<safe_arch::m256i>(target)
}
}

And with that, I get a 1.9x speedup with respect to the version that uses 16-bit integers, or a 7.7x speedup with respect to the version that does not use narrow integers, for a final asymptotic performance of 98 integer increments per CPU cycle or 393 billion integer increments per second.

Now, it’s time to talk about non-asymptotic throughput.

All this extra counter merging we now do to achieve 64-bit counting range using narrower integers is not free. It comes at the price of a steep reduction in performance at low iteration counts.

Merely invoking a narrow counting function with no work to do costs 14ns with 32-bit integers, 23ns with 16-bit integers, and 31ns with 8-bit integers. And at low iteration counts, we’re also penalized by the fact that we’re not trying to keep using the widest SIMD format at all times, as we did in the non-narrow SIMD versions, as the code for that would get quite gruesome when more and more accumulator reduction stages start to pile up!

As a result, 32-bit hierarchical counting only becomes faster than 64-bit SIMD counting when counting to more than 2048, then 16-bit counting catches up around 8192, and finally 8-bit counting becomes king of the hill around 16384 iterations.

But hey, at least we are eventually counting as fast as a single Zen 2 CPU core can count!

Improving small-scale throughput

While the “entry cost” cannot be reduced without losing some benefits of counting with smaller integers, the problem of poor performance at low iteration counts can be worked around by simply offloading the work of small-scale countings to our previous SIMD + ILP version, which has a very low entry cost and much better performance than counting with scalar 64-bit integers already!

#![allow(unused)]
fn main() {
pub fn generic_narrow_u8_tuned<Simd>(target: u64) -> u64
where
    Simd: SimdAccumulator<u8, ReducedAccumulator = Simd> + SimdAccumulator<u16>,
{
    const ILP_WIDTH: usize = 10;

    // Set up accumulators
    let mut simd_accumulator = U8Accumulator::<ILP_WIDTH, Simd>::new();

    // Accumulate in parallel
    let mut remainder = target;
    let full_width = U8Accumulator::<ILP_WIDTH, Simd>::WIDTH as u64;
    while remainder >= full_width {
        simd_accumulator.increment();
        remainder -= full_width;
    }

    // Merge SIMD accumulators into scalar counters
    let mut scalar_accumulators = simd_accumulator.scalarize();

    // Merge scalar accumulators using parallel reduction
    let mut stride = ILP_WIDTH.next_power_of_two() / 2;
    while stride > 0 {
        for i in 0..stride.min(ILP_WIDTH - stride) {
            scalar_accumulators[i].merge(scalar_accumulators[i + stride]);
        }
        stride /= 2;
    }
    scalar_accumulators[0] + super::multiversion::multiversion_avx2(remainder)
}

#[cfg(target_feature = "avx2")]
pub fn narrow_u8_tuned(target: u64) -> u64 {
    generic_narrow_u8_tuned::<safe_arch::m256i>(target)
}
}

And with that, we get much more satisfactory performance at low iteration counts:

  • Above 16 iterations, narrow_u8_tuned is faster than un-tuned narrow_u8
  • Above 64 iterations, it is faster than narrow_simple<u16>
  • Above 1024 iterations, it is faster than narrow_simple<u32>
  • Above 2048 iterations, it is faster than multiversion_avx2 alone

Basically, as soon as the timing of another solution gets above 35-40ns, narrow_u8_tuned beats it. And for 4096 iterations, narrow_u8_tuned is a whopping 2.6x faster than its un-tuned counterpart.

Speaking more generally, if you are trying to implement a general-purpose utility that must perform well on a very wide range of problem sizes, like memcpy() in C, it is a good idea to combine algorithms with low entry cost and suboptimal scaling with algorithms with high entry cost and better scaling like this. The price to pay being, of course, more implementation complexity.