
Welcome back to the second in a series of posts detailing the inter-core communication components used in Mantra.

In the previous post I detailed the implementation and functionality of the Seqlock. As mentioned then, it is the core building block for all inter-core synchronization in Mantra. In this second post, we will use it to construct the low-latency message Queues for event-based communication, and SeqlockVectors for communicating the "latest" information. One example of the latter is the "ideal positions" that algo models produce based on incoming market data.

The SeqlockVectors are certainly the more straightforward of the two data structures, so let's start with those.

SeqlockVector

As the name suggests, a SeqlockVector is really not much more than a contiguous buffer of Seqlocks. Each [Version, Data, Padding] triple in the image below denotes one Seqlock:

Fig 1. SeqlockVector

Our implementation allows constructing them in the private memory of a given process as well as in a piece of shared memory created by the OS.

SeqlockVectorHeader

To maximize the SeqlockVector's flexibility we also want it to describe itself to some degree by using the following SeqlockVectorHeader structure (shown in blue in the above figure):

#[derive(Debug)]
#[repr(C)]
pub struct SeqlockVectorHeader {
    elsize: usize,
    bufsize: usize
}

A bit underwhelming, perhaps. Nonetheless, it contains the information a process needs after reading the SeqlockVectorHeader from shared memory: the number of bytes per element (elsize), the number of elements (bufsize), and thus the total byte size of the SeqlockVector:

// Inside impl<T: Copy> SeqlockVector<T>:
const fn size_of(bufsize: usize) -> usize {
    std::mem::size_of::<SeqlockVectorHeader>()
        + bufsize * std::mem::size_of::<Seqlock<T>>()
}

We use #[repr(C)] to stop the compiler from reordering the struct fields. This is mainly useful when the SeqlockVector in shared memory is accessed by programs implemented in languages other than Rust.

Theoretically, fields could also be ordered differently by different versions of the Rust compiler.
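To guard against surprises, one could pin the layout down with a small test. This is just a sketch, assuming a 64-bit target and a toolchain with std::mem::offset_of:

#[test]
fn seqlock_vector_header_layout() {
    use std::mem::{offset_of, size_of};
    // #[repr(C)] fixes the declaration order: elsize first, then bufsize.
    assert_eq!(offset_of!(SeqlockVectorHeader, elsize), 0);
    assert_eq!(offset_of!(SeqlockVectorHeader, bufsize), 8);
    assert_eq!(size_of::<SeqlockVectorHeader>(), 16);
}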

Initialization

The SeqlockVector and its initialization are implemented as follows:

use std::alloc::Layout;
use std::mem::{size_of, forget};
use std::ptr::slice_from_raw_parts_mut;

#[repr(C)]
pub struct SeqlockVector<T: Copy> {
    header: SeqlockVectorHeader,
    buffer: [Seqlock<T>],
}

impl<T: Copy> SeqlockVector<T> {
    const fn size_of(bufsize: usize) -> usize {
        size_of::<SeqlockVectorHeader>()
            + bufsize * size_of::<Seqlock<T>>()
    }

    pub fn private(bufsize: usize) -> &'static Self {
        let size = Self::size_of(bufsize);
        unsafe {
            let ptr = std::alloc::alloc_zeroed(
                Layout::array::<u8>(size)
                    .unwrap()
                    .align_to(64)
                    .unwrap()
                    .pad_to_align(),
            );
            Self::from_uninitialized_ptr(ptr, bufsize)
        }
    }

    pub fn shared<P: AsRef<std::path::Path>>(
        shmem_flink: P,
        bufsize: usize,
    ) -> Result<&'static Self, &'static str> {
        use shared_memory::{ShmemConf, ShmemError};
        match ShmemConf::new()
            .size(Self::size_of(bufsize))
            .flink(&shmem_flink)
            .create()
        {
            Ok(shmem) => {
                let ptr = shmem.as_ptr();
                // shared_memory unmaps on Drop, so we deliberately leak it here.
                forget(shmem);
                Ok(Self::from_uninitialized_ptr(ptr, bufsize))
            }
            Err(ShmemError::LinkExists) => {
                let shmem = ShmemConf::new().flink(shmem_flink).open().unwrap();
                let ptr = shmem.as_ptr() as *mut SeqlockVectorHeader;
                unsafe {
                    if (*ptr).bufsize < bufsize {
                        return Err("Existing shmem too small");
                    }
                    // Adopt the requested (smaller or equal) size.
                    (*ptr).bufsize = bufsize;
                }
                let v = Self::from_initialized_ptr(ptr);
                // Same as above: leak to keep the mapping alive.
                forget(shmem);
                Ok(v)
            }
            Err(_) => Err("Unable to create or open shmem flink."),
        }
    }

    pub fn from_uninitialized_ptr(
        ptr: *mut u8,
        bufsize: usize,
    ) -> &'static Self {
        unsafe {
            let q = slice_from_raw_parts_mut(ptr, bufsize) as *mut SeqlockVector<T>;
            (*q).header.bufsize = bufsize;
            (*q).header.elsize = size_of::<Seqlock<T>>();
            &*q
        }
    }

    fn from_initialized_ptr(ptr: *mut SeqlockVectorHeader) -> &'static Self {
        unsafe {
            let bufsize = (*ptr).bufsize;
            &*(slice_from_raw_parts_mut(ptr as *mut u8, bufsize) as *const SeqlockVector<T>)
        }
    }

    pub fn len(&self) -> usize {
        self.header.bufsize
    }
}

The total size of a SeqlockVector<T> is given by the size_of function; private creates a new SeqlockVector in private memory only, while shared creates or opens a SeqlockVector in shared memory. For the latter functionality we use the shared_memory crate.
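As a quick usage sketch (the element type and flink path below are made up for illustration):

// A Copy type to store in the vector; purely illustrative.
#[derive(Copy, Clone, Default)]
struct IdealPosition { symbol_id: u64, qty: f64 }

// Process-private vector with 128 slots:
let local = SeqlockVector::<IdealPosition>::private(128);

// Shared-memory vector; other processes open it via the same flink path:
let shared = SeqlockVector::<IdealPosition>::shared("/tmp/mantra_positions", 128)
    .expect("could not create or open shmem");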

The notably tricky parts of the code: the two forget calls make sure that the opened shared memory does not get automatically cleaned up when shmem gets dropped, while the two slice_from_raw_parts_mut casts create a reference to the SeqlockVector from the raw shared memory pointer.

Note!

Even though slice_from_raw_parts_mut is used to create a reference to the whole SeqlockVector, the length argument denotes the number of elements in the unsized buffer slice only, not the total byte size!

Read/write

The read and write implementations of the SeqlockVector are straightforwardly based on those of the Seqlock. See the code here.
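They essentially boil down to indexing into the buffer and delegating to the Seqlock; a minimal sketch, assuming the read and write signatures from the previous post:

impl<T: Copy> SeqlockVector<T> {
    pub fn write(&self, pos: usize, item: &T) {
        assert!(pos < self.header.bufsize);
        self.buffer[pos].write(item);
    }

    pub fn read(&self, pos: usize, result: &mut T) {
        assert!(pos < self.header.bufsize);
        self.buffer[pos].read(result);
    }
}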

SPMC/MPMC Message Queues

Without skipping a beat, we continue with the second data structure, which plays an even bigger role in Mantra: low-latency single/multi producer, multi consumer message Queues form the basis of its modular and robust architecture.

The underlying structure is pretty much identical to the SeqlockVector, i.e. a buffer of Seqlocks holding the actual data, plus a QueueHeader with metadata about the Queue. As will become clear in due course, the buffer in this case is used as a ringbuffer.

use std::sync::atomic::AtomicUsize;

#[repr(u8)]
pub enum QueueType {
    Unknown,
    MPMC,
    SPMC,
}

#[repr(C)]
pub struct QueueHeader {
    pub queue_type:     QueueType,   // 1
    pub is_initialized: u8,          // 2
    _pad1:              [u8; 6],     // 8
    pub elsize:         usize,       // 16
    mask:               usize,       // 24
    pub count:          AtomicUsize, // 32
}

#[repr(C)]
pub struct Queue<T> {
    pub header: QueueHeader,
    buffer:     [Seqlock<T>],
}
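Since the offset comments above encode layout assumptions, a compile-time check can keep them honest; a sketch, assuming a 64-bit target:

// Fails to compile if the layout ever deviates from the assumed 32 bytes.
const _: () = assert!(std::mem::size_of::<QueueHeader>() == 32);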

Before continuing with the Producer and Consumer implementations, let me reiterate that Producers are oblivious to the attached Consumers, which brings a couple of clear benefits:

  • it improves performance, since less shared data means less inter-core communication
  • it greatly simplifies the implementation
  • a single misbehaving Consumer does not halt the entire system
  • attaching more Consumers to a Queue is trivial, as it does not impact the performance or functionality of the rest of the system

One obvious downside is that Consumers can get sped past by Producers, leading to data loss through dropped messages.

Luckily, this happens extremely infrequently in practice, and all observed cases were easily remedied by optimizing the code or offloading work to another Consumer on an additional core. The points outlined above mean that even the latter, more drastic, option is easy to implement.

Nonetheless, we at least want Consumers to be able to autonomously detect when they have been sped past. As we will demonstrate, this can be achieved even though the only data shared between Consumers and Producers is the buffer of Seqlocks.

One final remark before continuing with the implementation details: all Consumers observe all messages flowing through a given Queue. This is sometimes referred to as broadcast or multicast mode.

Producer

 1 impl<T: Copy> Queue<T> {
 2
 3     pub fn len(&self) -> usize {
 4         self.header.mask + 1
 5     }
 6
 7     fn next_count(&self) -> usize {
 8         match self.header.queue_type {
 9             QueueType::Unknown => panic!("Unknown queue"),
10             QueueType::MPMC => self.header.count.fetch_add(1, Ordering::AcqRel),
11             QueueType::SPMC => {
12                 let c = self.header.count.load(Ordering::Relaxed);
13                 self.header
14                     .count
15                     .store(c.wrapping_add(1), Ordering::Relaxed);
16                 c
17             }
18         }
19     }
20     fn load(&self, pos: usize) -> &Seqlock<T> {
21         unsafe { self.buffer.get_unchecked(pos) }
22     }
23
24     fn produce(&self, item: &T) -> usize {
25         let p = self.next_count();
26         let lock = self.load(p & self.header.mask);
27         lock.write(item);
28         p
29     }
30
31 }

The count field is used to keep track of the position of the next free slot to write a message into.

The mask is used to make sure that the Producer loops back to the start of the Queue when it reaches the end of the buffer: if we only allow Queue sizes that are a power of two and set mask equal to bufsize - 1, the p & self.header.mask on line 26 is the same as p % bufsize. Let's try to understand this with a minimal example:

let bufsize = 8;
let mask = 8 - 1;            // 7
let p = 7 & mask;            // 0x...0111 & 0x...0111 = 0x...0111 = 7
let p_next = (p + 1) & mask; // 0x...1000 & 0x...0111 = 0x...0000 = 0

Very clever. Alas, I cannot claim authorship of this rather well-known trick. It also avoids writing something like

if self.header.count == self.len() {
    self.header.count = 0;
} else {
    self.header.count += 1; // or fetch_add
}

in next_count, which would inevitably lead to branch prediction misses.

Line 10 shows the only difference between the single and multi Producer cases: count is shared between all Producers, and we have to make sure that no two Producers try to write to the same slot at the same time. By using fetch_add, a Producer both reserves a slot for its message and advances the counter for the other Producers.

The only potential problem arises if another Producer manages to make it all the way around the buffer to the same slot while the first Producer is still writing its message; the two would then be writing the same data at the same time. In reality this never happens, as the following estimate suggests.
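A back-of-the-envelope calculation makes this concrete; the numbers below are illustrative assumptions, not measurements:

// Assumed: ~100 ns per Seqlock write, and a buffer of 65_536 slots.
let write_ns: u64 = 100;
let bufsize: u64 = 65_536;
let lap_ns = bufsize * write_ns; // ≈ 6.5 ms to lap the entire buffer
// A collision requires a full lap to complete within a single ~100 ns write,
// i.e. the lapping Producer would have to be 65_536x faster than the first.
assert_eq!(lap_ns / write_ns, bufsize);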

Consumer implementation

As mentioned before, Consumers are implemented such that only the Seqlock buffer is shared with other Consumers and Producers. We thus have to figure out a way for them to understand when the next message is ready, and whether they got sped past by a Producer.

The version of the Seqlocks can be used for both, given the following observations:

  1. the next message to be read will be in the slot whose version is lower than that of the previous slot
  2. the version of the slot containing the next message to read will be exactly 2 higher than before, once the message is ready
  3. the version of the Seqlocks can be fully deduced from the count of the queue: count / bufsize is the number of completed laps around the buffer, and count % bufsize is the position of the next Seqlock to be written to

Let's then first initialize a Consumer with the current count and the expected version of the Seqlocks holding messages to be read:

#[repr(C)]
#[derive(Debug)]
pub struct Consumer<'a, T> {
    pos:              usize,        // 8
    mask:             usize,        // 16
    expected_version: usize,        // 24
    queue:            &'a Queue<T>, // 40 fat ptr: (pointer, len)
}

impl<'a, T: Copy> From<&'a Queue<T>> for Consumer<'a, T> {
    fn from(queue: &'a Queue<T>) -> Self {
        let mask = queue.header.mask;
        let c = queue.header.count.load(std::sync::atomic::Ordering::Relaxed);
        let pos = c & mask;
        /* e.g.
            seqlock_versions = [4, 2, 2, 2, 2, 2, 2, 2]
                                   ^ next message, ready when version switches to 4
            len = 8
            count = 9
            count / len = 9 / 8 = 1
            1 << 1 = 2
            expected_version = 2 + 2 = 4
        */
        let expected_version = ((c / queue.len()) << 1) + 2;
        Self {
            pos,
            mask,
            expected_version,
            queue,
        }
    }
}

A Consumer knows the next message is ready to be read when the Seqlock version jumps to the expected_version, i.e. when a Producer has finished writing it.

A Consumer can also deduce that it has been sped past when it sees that the version of the next message it tries to read is higher than the expected_version. The only way this is possible is if a Producer wrote to the Seqlock, incrementing the version to the expected one, then looped all the way around the buffer back to the same Seqlock and wrote another message, incrementing the version to one higher than the expected_version. For example, a Consumer expecting version 4 that reads version 6 knows it has been lapped. When the slow Consumer finally comes along, it will have lost the data of the overwritten message because it was sped past. At least with this implementation we detect that it happened and can choose how to deal with it.

A visual representation of the possible scenarios is shown in the following figure:

Fig. 2: The operation flow of a Producer and Consumer

We start by implementing a new read_with_version function on the Seqlock:

use std::sync::atomic::{compiler_fence, Ordering};
use thiserror::Error;

#[derive(Error, Debug, Copy, Clone, PartialEq)]
pub enum ReadError {
    #[error("Got sped past")]
    SpedPast,
    #[error("Queue empty")]
    Empty,
}

// Inside impl<T: Copy> Seqlock<T>:
#[inline(never)]
pub fn read_with_version(
    &self,
    result: &mut T,
    expected_version: usize,
) -> Result<(), ReadError> {
    loop {
        let v1 = self.version.load(Ordering::Acquire);
        if v1 != expected_version {
            if v1 < expected_version {
                return Err(ReadError::Empty);
            } else {
                return Err(ReadError::SpedPast);
            }
        }

        compiler_fence(Ordering::AcqRel);
        *result = unsafe { *self.data.get() };
        compiler_fence(Ordering::AcqRel);
        let v2 = self.version.load(Ordering::Acquire);
        if v1 == v2 {
            return Ok(());
        }
    }
}

The main difference to the previously discussed read function is that we now know which version the Consumer expects. By returning Result<(), ReadError>, the current situation is communicated back to the caller: either a message has successfully been read (Ok(())), no message is ready yet (Err(ReadError::Empty)), or the Consumer was sped past (Err(ReadError::SpedPast)).

We implement the try_consume function for the Consumer based on this:

fn update_pos(&mut self) {
    self.pos = (self.pos + 1) & self.mask;
    // Wrapping back to slot 0 means a new lap: expect versions 2 higher.
    self.expected_version += 2 * (self.pos == 0) as usize;
}

/// Nonblocking consume returning either Ok(()) or a ReadError
pub fn try_consume(&mut self, el: &mut T) -> Result<(), ReadError> {
    self.queue
        .load(self.pos)
        .read_with_version(el, self.expected_version)?;
    self.update_pos();
    Ok(())
}
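Putting it together, a busy-polling Consumer loop could look roughly as follows; Message, handle, and the queue setup here are hypothetical stand-ins:

// Hypothetical setup: `queue` is a &'static Queue<Message>.
let mut consumer = Consumer::from(queue);
let mut msg = Message::default();
loop {
    match consumer.try_consume(&mut msg) {
        Ok(()) => handle(&msg),
        Err(ReadError::Empty) => continue, // no new message yet, keep spinning
        Err(ReadError::SpedPast) => {
            // At least one full lap of messages was lost; resynchronize to the
            // Queue's current position and carry on.
            consumer = Consumer::from(queue);
        }
    }
}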

Conclusion

Boom, that's it for this installment, folks! We have covered the main concepts behind how the SeqlockVector and Queue synchronization data structures are implemented in Mantra. These form the backbone that allows all systems to work together. We implemented them using the Seqlocks discussed in the previous post, and made them as self-sufficient as possible. As highlighted before, this helps the performance and overall robustness of the system.

Next time we will go about thoroughly testing, benchmarking, and potentially optimizing the implementation. Hope to see you there!