
As the first technical topic on this blog, I will discuss the main method used to synchronize inter-core communication in Mantra: the Seqlock. It forms the fundamental building block for the "real" communication data structures, Queues and SeqlockVectors, which will be the topic of the next blog post.

I have chosen the Seqlock because it is a simple lock-free synchronization primitive that should be capable of achieving almost the ideal core-to-core latency. Moreover, reading from the Seqlock does not require taking a lock at all; in the worst case a Consumer has to retry its read until the Producer is done writing. This favors Producers over Consumers and also means that Consumers do not impact each other. That is ideal for a low-latency trading system, since we do not want a single slow or failing Consumer or Producer to bring the whole system to a halt.

I will start by getting straight to the final implementation for those in a hurry.

We then continue with the whys behind that implementation. First, we discuss how to verify the correctness of a Seqlock implementation, which will demonstrate why memory barriers are necessary to make it reliable. We investigate this by designing tests, observing the potential pitfalls of function inlining, looking at some assembly code (funky), and strong-arming the compiler to do our bidding.

Finally, we go through a quick 101 on low-latency timing followed by an investigation into the performance of our implementation.

Before continuing, I would like to give major credit to everyone involved with creating the following inspirational material

Design Goals and Considerations

The design goals for the inter core communication layer are:

  • close to minimum inter core latency
  • Producers are never blocked
  • Consumers don't impact the Producers or each other, and adding more Consumers doesn't dramatically decrease performance

Seqlock

The embodiment of the above goals in terms of synchronization techniques is the Seqlock (see Wikipedia, seqlock in the linux kernel, and Erik Rigtorp's C++11 implementation).

The key points are:

  • A Producer (or writer) is never blocked by Consumers (readers)
  • The Producer atomically increments a counter (the Seq in Seqlock) once before and once after writing the data
  • counter & 1 == 0 (even) communicates to Consumers that they can read data
  • counter_before_read == counter_after_read: data remained consistent while reading
  • Compare-and-swap could be used on the counter to allow multiple Producers to write to the same Seqlock (see the sketch after this list)
  • Compilers and cpus in general can't be trusted, making it crucial to verify that the execution sequence indeed follows the steps we specified. Memory barriers and fences are required to guarantee this in general
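
As a rough sketch of that multi-Producer variant (purely illustrative; it is not part of the final implementation below), a compare-and-swap on the version counter could look something like this:

pub fn write_multi(&self, val: &T) {
    loop {
        let v = self.version.load(Ordering::Relaxed);
        // Another writer currently holds the lock (odd version): try again.
        if v & 1 == 1 {
            continue;
        }
        // Claim exclusive write access by atomically bumping the version to odd.
        if self
            .version
            .compare_exchange(v, v.wrapping_add(1), Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            compiler_fence(Ordering::AcqRel);
            unsafe { *self.data.get() = *val };
            compiler_fence(Ordering::AcqRel);
            // Release the lock by making the version even again.
            self.version.store(v.wrapping_add(2), Ordering::Release);
            return;
        }
    }
}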

TL;DR

Out of solidarity with your scroll wheel and without further ado:

use std::cell::UnsafeCell;
use std::sync::atomic::{compiler_fence, AtomicUsize, Ordering};

#[derive(Default)]
#[repr(align(64))]
pub struct Seqlock<T> {
    version: AtomicUsize,
    data: UnsafeCell<T>,
}
unsafe impl<T: Send> Send for Seqlock<T> {}
unsafe impl<T: Sync> Sync for Seqlock<T> {}

impl<T: Copy> Seqlock<T> {
    pub fn new(data: T) -> Self {
        Self {version: AtomicUsize::new(0), data: UnsafeCell::new(data)}
    }
    #[inline(never)]
    pub fn read(&self, result: &mut T) {
        loop {
            let v1 = self.version.load(Ordering::Acquire);
            compiler_fence(Ordering::AcqRel);
            *result = unsafe { *self.data.get() };
            compiler_fence(Ordering::AcqRel);
            let v2 = self.version.load(Ordering::Acquire);
            if v1 == v2 && v1 & 1 == 0 {
                return;
            }
        }
    }

    #[inline(never)]
    pub fn write(&self, val: &T) {
        let v = self.version.fetch_add(1, Ordering::Release);
        compiler_fence(Ordering::AcqRel);
        unsafe { *self.data.get() = *val };
        compiler_fence(Ordering::AcqRel);
        self.version.store(v.wrapping_add(2), Ordering::Release);
    }
}
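
For completeness, a minimal usage sketch (illustrative only) sharing one Seqlock between a writer and a reader thread:

fn usage_example() {
    let lock = Seqlock::new([0u64; 4]);
    std::thread::scope(|s| {
        // Writer thread: publish a new value.
        s.spawn(|| lock.write(&[1, 2, 3, 4]));
        // Reader thread: copy out a consistent snapshot of the data.
        s.spawn(|| {
            let mut out = [0u64; 4];
            lock.read(&mut out);
        });
    });
}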

That's it, till next time folks!

Are memory barriers necessary?

Most literature on Seqlocks focuses (rightly so) on guaranteeing correctness.

The first potential problem is that the stored data has no compiler-visible dependency on the version of the Seqlock. This allows the compiler to merge or reorder the two increments of version in the write function. The same goes for the loads and checks of v1 and v2 on the read side of things.

It potentially gets worse, though: depending on the architecture of the cpu, read and write memory operations to version and data could be reordered on the hardware level.

Given that the Seqlock's correctness depends entirely on the sequence of version increments and checks around data writes and reads, these issues are big no-nos.

As we will investigate further below, memory barriers are the main solution to these issues. They keep the compiler in line, forcing it to adhere to the sequence of memory operations that we specified in the code, and the same applies to the cpu itself.

For x86 cpus, these barriers luckily do not require any additional cpu instructions; they merely ensure that no instructions are reordered or omitted. x86 cpus are strongly memory ordered, meaning that writes to some memory (i.e. version) cannot be reordered with writes to other memory (i.e. data), and similarly for reads. Other cpu architectures might require additional cpu instructions to enforce these guarantees, but as long as we include the barriers, the rust compiler can figure out the rest.
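
To make that concrete, the following (illustrative) snippet can be thrown at godbolt or cargo asm: on x86 the compiler-only barrier produces no instruction at all, while a full fence does.

use std::sync::atomic::{compiler_fence, fence, Ordering};

// A compile-time-only barrier: it restricts what the compiler may reorder,
// but emits no cpu instruction on x86.
pub fn compiler_barrier() {
    compiler_fence(Ordering::AcqRel);
}

// A full memory fence: it also orders the cpu itself; on x86 this lowers to
// an actual instruction (an mfence or a locked operation).
pub fn hardware_barrier() {
    fence(Ordering::SeqCst);
}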

See the Release-Acquire ordering section in the c++ reference for further information on the specific barrier construction that is used in the Seqlock.

Torn data testing

The first concern that we can relatively easily verify is data consistency. In the test below we verify that when a Consumer supposedly successfully reads data, the Producer was indeed not simultaneously writing to it. We do this by making a Producer fill and write an array with an increasing counter, while a Consumer reads and verifies that all entries in the array are identical (the assert_eq on line (12) below). If reading and writing were to happen at the same time, the Consumer would at some point see partially new and partially old data with differing counter values, making the test fail.

 1 #[cfg(test)]
 2 mod tests {
 3     use super::*;
 4     use std::{sync::atomic::AtomicBool, time::{Duration, Instant}};
 5
 6     fn consumer_loop<const N: usize>(lock: &Seqlock<[usize; N]>, done: &AtomicBool) {
 7         let mut msg = [0usize; N];
 8         while !done.load(Ordering::Relaxed) {
 9             lock.read(&mut msg);
10             let first = msg[0];
11             for i in msg {
12                 assert_eq!(first, i);
13             }
14         }
15     }
16
17     fn producer_loop<const N: usize>(lock: &Seqlock<[usize; N]>, done: &AtomicBool) {
18         let curt = Instant::now();
19         let mut count = 0;
20         let mut msg = [0usize; N];
21         while curt.elapsed() < Duration::from_secs(1) {
22             msg.fill(count);
23             lock.write(&msg);
24             count = count.wrapping_add(1);
25         }
26         done.store(true, Ordering::Relaxed);
27     }
28
29     fn read_test<const N: usize>()
30     {
31         let lock = Seqlock::new([0usize; N]);
32         let done = AtomicBool::new(false);
33         std::thread::scope(|s| {
34             s.spawn(|| {
35                 consumer_loop(&lock, &done);
36             });
37             s.spawn(|| {
38                 producer_loop(&lock, &done);
39             });
40         });
41     }
42
43     #[test]
44     fn read_16() {
45         read_test::<16>()
46     }
47     #[test]
48     fn read_32() {
49         read_test::<32>()
50     }
51     #[test]
52     fn read_64() {
53         read_test::<64>()
54     }
55     #[test]
56     fn read_128() {
57         read_test::<128>()
58     }
59     #[test]
60     fn read_large() {
61         read_test::<65536>()
62     }
63 }

If I run these tests on an intel i9 14900k, using the following simplified read and write implementations without memory barriers

pub fn read(&self, result: &mut T) {
    loop {
        let v1 = self.version.load(Ordering::Relaxed);
        *result = unsafe { *self.data.get() };
        let v2 = self.version.load(Ordering::Relaxed);
        if v1 == v2 && v1 & 1 == 0 {
            return;
        }
    }
}

pub fn write(&self, val: &T) {
    let v = self.version.load(Ordering::Relaxed).wrapping_add(1);
    self.version.store(v, Ordering::Relaxed);
    unsafe { *self.data.get() = *val };
    self.version.store(v.wrapping_add(1), Ordering::Relaxed);
}

I find that they fail for array sizes of 64 (512 bytes) and up. This signals that the compiler did some reordering of operations.

Inline, and Compiler Cleverness

Funnily enough, barriers are not strictly needed to make these tests pass. Yeah, I also was not happy that my illustrative example does not, in fact, illustrate what I was trying to illustrate.

Nonetheless, I chose to mention it because it highlights just how much the compiler will mangle your code if you let it. I will not paste the resulting assembly here as it is rather lengthy (see assembly lines (23, 50-303) on godbolt). The crux is that the compiler chose to inline the read function and then decided to move the let first = msg[0] statement of line (10) entirely before the while loop...

Strange? Maybe not. The compiler's reasoning here is actually similar to the one that requires us to use memory barriers. The essential point is, again, that the data field inside the Seqlock is not an atomic variable like version. This allows the compiler to assume that only the current thread touches it. Meanwhile, the Consumer thread never writes to data, so it never changes, right? Ha, might as well just set first = data[0] once and for all before starting with the actual read & verify loop. Of course, the reality is that the Producer is actually changing data. Thus, as soon as the Consumer thread reads it into msg, first != msg[i] causing our test to fail.

Interestingly, adding assert_ne!(msg[0], 0) after line (14) seems to make the compiler less sure about this code transformation because suddenly all tests pass. Looking at the resulting assembly confirms this observation as now line (10) is correctly executed each loop after first reading the Seqlock.

The first step towards provable correctness of the Seqlock is thus to add #[inline(never)] to the read and write functions.

Deeper dive using cargo asm

I kind of jumped the gun above with respect to reading compiler-produced assembly. The tool I use by far the most for this is cargo asm. It can be easily installed using cargo and has a very user-friendly terminal-based interface. godbolt is another great choice, but it can become tedious to copy-paste all the supporting code when working on a larger codebase. In either case, I recommend adding #[inline(never)] to the function of interest so that its assembly can be more easily filtered out.

Let's see what the compiler generates for the read function of a couple different array sizes.

Seqlock::<[usize; 1024]>::read

When using a large array with 1024 elements, the assembly reads

 1 code::Seqlock<T>::read:
 2         .cfi_startproc
 3         push r15
 4         .cfi_def_cfa_offset 16
 5         push r14
 6         .cfi_def_cfa_offset 24
 7         push r12
 8         .cfi_def_cfa_offset 32
 9         push rbx
10         .cfi_def_cfa_offset 40
11         push rax
12         .cfi_def_cfa_offset 48
13         .cfi_offset rbx, -40
14         .cfi_offset r12, -32
15         .cfi_offset r14, -24
16         .cfi_offset r15, -16
17         mov rbx, rsi
18         mov r14, rdi
19         mov r15, qword ptr [rip + memcpy@GOTPCREL]
20         .p2align 4, 0x90
21 .LBB6_1:
22         mov r12, qword ptr [r14 + 8192]
23         mov edx, 8192
24         mov rdi, rbx
25         mov rsi, r14
26         call r15
27         mov rax, qword ptr [r14 + 8192]
28         test r12b, 1
29         jne .LBB6_1
30         cmp r12, rax
31         jne .LBB6_1
32         add rsp, 8
33         .cfi_def_cfa_offset 40
34         pop rbx
35         .cfi_def_cfa_offset 32
36         pop r12
37         .cfi_def_cfa_offset 24
38         pop r14
39         .cfi_def_cfa_offset 16
40         pop r15
41         .cfi_def_cfa_offset 8
42         ret

The first thing we observe in lines (19, 22, 27) is that the compiler chose not to adhere to the ordering of fields in our definition of the Seqlock, moving version behind data. If needed, the order of fields can be preserved by adding #[repr(C)].

The operational part of the read function is otherwise translated almost one-to-one into assembly:

  1. assign function pointer to memcpy to r15 for faster future calling
  2. move version at Seqlock start (r14) + 8192 bytes into r12
  3. perform the memcpy
  4. move version at Seqlock start (r14) + 8192 bytes into rax
  5. check r12 & 1 == 0
  6. check r12 == rax
  7. Profit...

Seqlock::<[usize; 1]>::read

For smaller array sizes we get

 1 code::Seqlock<T>::read:
 2         .cfi_startproc
 3         mov rax, qword ptr [rdi + 8]
 4         .p2align 4, 0x90
 5 .LBB6_1:
 6         mov rcx, qword ptr [rdi]
 7         mov rdx, qword ptr [rdi]
 8         test cl, 1
 9         jne .LBB6_1
10         cmp rcx, rdx
11         jne .LBB6_1
12         mov qword ptr [rsi], rax
13         ret

Well, at least it looks clean... I'm pretty sure I don't have to underline the issue with these steps:

  1. Do the copy of data into rax
  2. move version into rcx... and rdx?
  3. test version & 1 != 1
  4. test rcx == rdx... hol' on, what?
  5. copy from rax into the input
  6. wait a minute...

This is a good demonstration of why tests should not be blindly trusted and why double-checking the produced assembly is good practice. In fact, I never got the tests to fail after adding the #[inline(never)] discussed earlier, even though the assembly clearly shows that nothing stops a read while a write is happening. The reason is that for small enough data the memcpy is done inline, using moves between memory and registers (rax in this case). When a single mov suffices, the data can never be partially overwritten while reading, and even when multiple instructions are required a torn read remains highly unlikely.

Adding Memory Barriers

Here we go:

 1 #[inline(never)]
 2 pub fn read(&self, result: &mut T) {
 3     loop {
 4         let v1 = self.version.load(Ordering::Acquire);
 5         compiler_fence(Ordering::AcqRel);
 6         *result = unsafe { *self.data.get() };
 7         compiler_fence(Ordering::AcqRel);
 8         let v2 = self.version.load(Ordering::Acquire);
 9         if v1 == v2 && v1 & 1 == 0 {
10             return;
11         }
12     }
13 }

 1 code::Seqlock<T>::read:
 2         .cfi_startproc
 3         .p2align 4, 0x90
 4 .LBB6_1:
 5         mov rax, qword ptr [rdi]
 6         #MEMBARRIER
 7         mov rcx, qword ptr [rdi + 8]
 8         mov qword ptr [rsi], rcx
 9         #MEMBARRIER
10         mov rcx, qword ptr [rdi]
11         test al, 1
12         jne .LBB6_1
13         cmp rax, rcx
14         jne .LBB6_1
15         ret

It is interesting to see that the compiler chooses to reuse rcx both for the data copy in lines (7) and (8), as well as for the second version load in line (10).

With the current rust compiler (1.78.0), I found that only adding Ordering::Acquire in lines (4) or (8) of the rust code already does the trick. However, combined with an Ordering::Release store in the write function these only guarantee the ordering of the loads of the atomic version, not the ordering of the actual data copy relative to them. That is where the compiler_fence comes in, guaranteeing also this ordering. As discussed before, these extra barriers require no additional instructions on x86, so adding them did not change the performance.

The corresponding write function becomes:

#[inline(never)]
pub fn write(&self, val: &T) {
    let v = self.version.load(Ordering::Relaxed).wrapping_add(1);
    self.version.store(v, Ordering::Release);
    compiler_fence(Ordering::AcqRel);
    unsafe { *self.data.get() = *val };
    compiler_fence(Ordering::AcqRel);
    self.version.store(v.wrapping_add(1), Ordering::Release);
}

Our Seqlock implementation should now be correct, and is pretty much identical to others that can be found in the wild.

Having now understood a thing or two about memory barriers while solidifying our Seqlock, we turn to an aspect that is covered much less frequently: timing and potentially optimizing the implementation. Granted, there is not much room to play with here given the size of the functions. Nevertheless, some of the key concepts that I will discuss in the process will be used in many future posts.

P.S.: if memory models and barriers are really your schtick, live a little and marvel your way through The Linux Kernel Docs on Memory Barriers.

Performance

The main question we will answer is: does the fetch_add indeed make the write function of the final implementation faster?

Timing 101

The full details regarding the suite of timing and performance measurements tools I have developed to track the performance of Mantra will be divulged in a later post.

For now, the key points are:

Use rdtscp to take timestamps: the rdtscp hardware counter is a monotonically increasing cpu cycle counter (at base frequency) which is reset upon startup. What's even better is that on recent cpus it is shared between all cores (look for constant_tsc in /proc/cpuinfo). It is the cheapest, at ~5ns overhead, and most precise way to take timestamps. Another benefit for our use case is that it also partially orders operations (see the discussion above): it will not execute until "all previous instructions have executed and all previous loads are globally visible", see this. Using an _mm_lfence after the initial rdtscp will also force the timed code to not start executing before the timestamp is taken. This is the only reasonable way to time at really low latency scales.

use core_affinity and isolcpus: The combination of the isolcpus kernel parameter with binding a thread in rust to a specific core allows us to minimize jitter coming from whatever else is running on the computer. The p-cores on my cpu have been isolated for our testing purposes below. See Erik Rigtorp's low latency tuning guide for even more info.

Offload the actual timing: To minimize the timing overhead, we take the two rdtscp stamps and offload them to a Queue in shared memory (more on what a Queue is later). Another process can then read these messages, collect statistics and convert rdtscp stamp deltas to nanoseconds (for the i9 14900k, dividing by its base frequency of 3.2 GHz). For this last step we can actually reuse the nanos_from_raw_delta function in the quanta library.
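
As a back-of-the-envelope illustration (a hypothetical helper, not quanta's actual API), that conversion boils down to:

// Convert a raw rdtscp delta to nanoseconds, assuming a constant TSC ticking
// at the base frequency (~3.2 GHz on the i9 14900k).
fn tsc_delta_to_nanos(delta_ticks: u64, tsc_ghz: f64) -> f64 {
    delta_ticks as f64 / tsc_ghz
}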

Putting it all together, a block of code can be timed like:

use std::arch::x86_64::{__rdtscp, _mm_lfence};

let t1 = unsafe { __rdtscp(&mut 0u32 as *mut _) };
unsafe { _mm_lfence() };
// code to be timed
let t2 = unsafe { __rdtscp(&mut 0u32 as *mut _) };
timer_queue.produce((t1, t2));

or, using my timing library

let mut timer = Timer::new("my_cool_timer");
timer.start();
//code to be timed
timer.stop();

with an added functionality where you can use a previously taken rdtscp timestamp to measure a latency:

timer.latency(prev_rdtscp);

The former will be called Business timing (for business logic), and the latter, you guessed it, Latency timing.

Throughout the following discussion we'll use a small tui tool I've created called timekeeper that ingests and displays these timing results:

Fig 1. Timekeeper example

Baseline Inter Core Latency

After isolating cpus, turning off hyperthreading, doing some more of the low latency tuning steps and switching to the performance governor, I've run the following basic ping/pong code to provide us with some baseline latency timings:

#[repr(align(64))]
struct Test(AtomicI32);

fn one_way_2_lines(n_samples:usize) {
    let seq1 = Test(AtomicI32::new(-1));
    let seq2 = Test(AtomicI32::new(-1));
    std::thread::scope(|s| {
        s.spawn(|| {
            core_affinity::set_for_current(CoreId { id: 2});
            for _ in 0..n_samples {
                for n in 0..10000 {
                    while seq1.0.load(Ordering::Acquire) != n {}
                    seq2.0.store(n, Ordering::Release);
                }
            }
        });
        s.spawn(|| {
            let mut timer = Timer::new("one_way_2_lines");
            core_affinity::set_for_current(CoreId { id: 3 });
            for _ in 0..n_samples {
                for n in 0..10000 {
                    timer.start();
                    seq1.0.store(n, Ordering::Release);
                    while seq2.0.load(Ordering::Acquire) != n {}
                    timer.stop();
                }
            }
        });
    });
}

The "2_lines" stands for the fact that we are communicating through atomics seq1 and seq2 with each their own cacheline: i.e. #[repr(align(64))]. Running the code multiple times leads to:

Fig 2. Base line core-core latency

Bear in mind that the real latency is half of what is measured since these are round trip times.

The steps with different but constant average timings showcase the main difficulty with timing low level/low latency constructs: the cpu is essentially a black box and does a lot of memory and cache related wizardry behind the scenes to implement the MESI protocol. Combined with branch prediction, this makes the final result quite dependent on the exact execution start times of the threads, leading to different but stable averages each run.

Anyway, the lower end of these measurements serves as a sanity check and target for our Seqlock latency.

Seqlock performance

When using Seqlocks in the Queues, the scenario is that Producers 99% of the time do not produce anything. This causes the Consumers to essentially busy spin on the Seqlock associated with the next message to read.

We reflect this in the timing code's setup:

  • a Producer writes an rdtscp timestamp into the Seqlock every 2 microseconds
  • a Consumer busy spins reading this timestamp
  • if it changes, the Consumer publishes a timing and latency measurement using the rdtscp value of the message as the starting point
  • 0 or more "contender" Consumers do the same to see how increasing Consumer contention impacts the main Producer and Consumer

#[derive(Clone, Copy)]
struct TimingMessage {
    rdtscp: Instant,
    data:   [u8; 1],
}

fn contender(lock: &Seqlock<TimingMessage>)
{
    let mut m = TimingMessage { rdtscp: Instant::now(), data: [0]};
    while m.data[0] == 0 {
        lock.read(&mut m);
    }
}

fn timed_consumer(lock: &Seqlock<TimingMessage>)
{
    let mut timer = Timer::new("read");
    core_affinity::set_for_current(CoreId { id: 1 });
    let mut m = TimingMessage { rdtscp: Instant::now(), data: [0]};
    let mut last = m.rdtscp;
    while m.data[0] == 0 {
        timer.start();
        lock.read(&mut m);
        if m.rdtscp != last {
            timer.stop();
            timer.latency_till_stop(m.rdtscp);
        }
        last = m.rdtscp;
    }
}

fn producer(lock: &Seqlock<TimingMessage>)
{
    let mut timer = Timer::new("write");
    core_affinity::set_for_current(CoreId { id: 2 });
    let mut m = TimingMessage { rdtscp: Instant::now(), data: [0]};
    let curt = Instant::now();
    while curt.elapsed() < Nanos::from_secs(5) {
        timer.start();
        m.rdtscp = Instant::now();
        lock.write(&m);
        timer.stop();
        let curt = Instant::now();
        while Instant::now() - curt < Nanos::from_micros(2) {}
    }
    m.data[0] = 1;
    lock.write(&m);
}

fn consumer_latency(n_contenders: usize) {
    let lock = Seqlock::default();
    std::thread::scope(|s| {
        for i in 1..(n_contenders + 1) {
            let lck = &lock;
            s.spawn(move || {
                core_affinity::set_for_current(CoreId { id: i + 2 });
                contender(lck);
            });
        }
        s.spawn(|| timed_consumer(&lock));
        s.spawn(|| producer(&lock));
    })
}

Starting Point

We use the Seqlock code we implemented above as the initial point, leading to the following latency timings for a single consumer (left) and 5 consumers (right):

Fig 3. Initial Consumer Latency

Would you look at that: timings are very stable without much jitter (tuning works!), the latency increase with increasing Consumer count is extremely minimal while the Producer gets... even faster(?), somehow. I triple checked and it is consistently reproducible.

Optimization

We are, however, still quite far off the ~30-40ns latency target. Looking closer at the write function, we realize that fetch_add is a single instruction version of lines (1) and (2):

1 let v = self.version.load(Ordering::Relaxed).wrapping_add(1);
2 self.version.store(v, Ordering::Release);
3 compiler_fence(Ordering::AcqRel);
4 unsafe { *self.data.get() = *val };
5 compiler_fence(Ordering::AcqRel);
6 self.version.store(v.wrapping_add(1), Ordering::Release);

which we thus change to:

    let v = self.version.fetch_add(1, Ordering::Release);
    compiler_fence(Ordering::AcqRel);
    unsafe { *self.data.get() = *val };
    compiler_fence(Ordering::AcqRel);
    self.version.store(v.wrapping_add(2), Ordering::Release);

Measuring again, we find that this leads to a serious improvement, almost halving the latency in the 1 Consumer case (left), while also slightly improving the 5 Consumer case (right):

Fig 4. Optimized Consumer Latency

Unfortunately, there is nothing that can be optimized on the read side of things.

One final optimization we'll proactively do is to add #[repr(align(64))] to the Seqlocks:

#[repr(align(64))]
pub struct Seqlock<T> {
    version: AtomicUsize,
    data: UnsafeCell<T>,
}

This fixes potential false sharing issues by never having two or more Seqlocks on a single cache line. While it is not very important when using a single Seqlock, it becomes crucial when using them inside Queues and SeqlockVectors.
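
As a quick (illustrative) sanity check of that layout guarantee: with #[repr(align(64))], Rust rounds the struct's size up to a multiple of its alignment, so two Seqlocks can never end up on the same cache line.

fn alignment_check() {
    // Both the alignment and the size are multiples of a 64-byte cache line.
    assert_eq!(std::mem::align_of::<Seqlock<[u8; 16]>>(), 64);
    assert_eq!(std::mem::size_of::<Seqlock<[u8; 16]>>() % 64, 0);
}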

Looking back at our original design goals:

  • close to minimum inter core latency
  • Producers are never blocked
  • Consumers don't impact the Producers or each other, and adding more Consumers doesn't dramatically decrease performance

our implementation seems to be as good as it can be!

We thus conclude our deep dive into Seqlocks here. It is the main building block for the Queues and SeqlockVectors we will discuss in Pt 2 on inter core communication.

See you then!

Possible future investigations/improvements