Welcome back to the second in a series of posts detailing the inter-core communication components used in Mantra. In the previous post I detailed the implementation and functionality of the Seqlock. As mentioned then, it is the core building block for all inter-core synchronization in Mantra. In this second post, we will use it to construct the low-latency message Queues for event-based communication, and SeqlockVectors for communicating the "latest" information. One example of the latter are the "ideal positions" that algo models produce based on incoming market data.
The SeqlockVectors are certainly the more straightforward of the two datastructures, so let's start with those.
SeqlockVector
As the name suggests, a SeqlockVector is really not much more than a contiguous buffer of Seqlocks. Each [Version, Data, Padding] triple in the image below denotes one Seqlock:
Fig 1. SeqlockVector
Our implementation allows constructing them in the private memory of a given process as well as in a piece of shared memory created by the OS.
SeqlockVectorHeader
To maximize the SeqlockVector's flexibility we also want it to describe itself to some degree by using the following SeqlockVectorHeader structure (shown in blue in the above figure):
#[derive(Debug)]
#[repr(C)]
pub struct SeqlockVectorHeader {
elsize: usize,
bufsize: usize
}
A bit underwhelming, perhaps. Nonetheless, it contains the information needed to allow a process to read the SeqlockVectorHeader from shared memory and know the number of bytes comprising each element (elsize), the number of elements (bufsize), and thus the total byte size of the SeqlockVector:
const fn size_of(bufsize: usize) -> usize {
std::mem::size_of::<SeqlockVectorHeader>()
+ bufsize * std::mem::size_of::<Seqlock<T>>()
}
We use #[repr(C)] to stop the compiler from reordering the struct fields. This is mainly useful when accessing the SeqlockVector in shared memory from programs written in languages other than Rust. In theory, fields can also be ordered differently by different versions of the Rust compiler.
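As a quick sanity check, we can assert at compile time that the header has the layout a non-Rust reader of the shared memory would expect; this snippet is illustrative and not part of Mantra's code:

// Illustrative only: SeqlockVectorHeader should be two 8-byte fields, 16 bytes total.
const _: () = assert!(std::mem::size_of::<SeqlockVectorHeader>() == 16);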
Initialization
The SeqlockVector and its initialization are implemented as follows:
use std::alloc::Layout;
use std::mem::{forget, size_of};
use std::ptr::slice_from_raw_parts_mut;

#[repr(C)]
pub struct SeqlockVector<T: Copy> {
    header: SeqlockVectorHeader,
    buffer: [Seqlock<T>],
}

impl<T: Copy> SeqlockVector<T> {
    const fn size_of(bufsize: usize) -> usize {
        size_of::<SeqlockVectorHeader>()
            + bufsize * size_of::<Seqlock<T>>()
    }

    pub fn private(bufsize: usize) -> &'static Self {
        let size = Self::size_of(bufsize);
        unsafe {
            let ptr = std::alloc::alloc_zeroed(
                Layout::array::<u8>(size)
                    .unwrap()
                    .align_to(64)
                    .unwrap()
                    .pad_to_align(),
            );
            Self::from_uninitialized_ptr(ptr, bufsize)
        }
    }

    pub fn shared<P: AsRef<std::path::Path>>(
        shmem_flink: P,
        bufsize: usize,
    ) -> Result<&'static Self, &'static str> {
        use shared_memory::{ShmemConf, ShmemError};
        match ShmemConf::new()
            .size(Self::size_of(bufsize))
            .flink(&shmem_flink)
            .create()
        {
            Ok(shmem) => {
                let ptr = shmem.as_ptr();
                // shared_memory cleans the mapping up when `shmem` gets dropped,
                // so we explicitly leak it here.
                forget(shmem);
                Ok(Self::from_uninitialized_ptr(ptr, bufsize))
            }
            Err(ShmemError::LinkExists) => {
                let shmem = ShmemConf::new().flink(shmem_flink).open().unwrap();
                let ptr = shmem.as_ptr() as *mut SeqlockVectorHeader;
                let v = Self::from_initialized_ptr(ptr);
                if v.header.bufsize < bufsize {
                    Err("Existing shmem too small")
                } else {
                    // Same as above: explicitly leak the mapping so it stays alive.
                    forget(shmem);
                    Ok(v)
                }
            }
            Err(_) => Err("Unable to create or open shmem flink."),
        }
    }

    pub fn from_uninitialized_ptr(
        ptr: *mut u8,
        bufsize: usize,
    ) -> &'static Self {
        unsafe {
            // The slice length passed here is the number of elements in the
            // unsized `buffer` slice, not the total byte size of the allocation.
            let q = &mut *(slice_from_raw_parts_mut(ptr, bufsize) as *mut SeqlockVector<T>);
            q.header.elsize = size_of::<Seqlock<T>>();
            q.header.bufsize = bufsize;
            q
        }
    }

    fn from_initialized_ptr(ptr: *mut SeqlockVectorHeader) -> &'static Self {
        unsafe {
            let bufsize = (*ptr).bufsize;
            &*(slice_from_raw_parts_mut(ptr, bufsize) as *const SeqlockVector<T>)
        }
    }

    pub fn len(&self) -> usize {
        self.header.bufsize
    }
}
The total size of a SeqlockVector<T> is given by the size_of function, private creates a new SeqlockVector in private memory only, and shared creates or opens a SeqlockVector in shared memory. For the latter functionality we use the shared_memory crate.
There are a couple of notably tricky parts in this code. The forget(shmem) calls make sure that the opened shared memory does not get automatically cleaned up when shmem gets dropped, while from_uninitialized_ptr and from_initialized_ptr create a reference to the SeqlockVector from the raw shared memory pointer. Even though slice_from_raw_parts_mut is used to create a reference to the whole SeqlockVector, the length argument denotes the length of the unsized buffer
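To make the API concrete, here is a hypothetical usage sketch; the element type and the flink path are made up for illustration and are not part of Mantra:

// Hypothetical element type; any `Copy` struct works.
#[derive(Copy, Clone, Default)]
struct IdealPosition {
    symbol_id: u64,
    position: f64,
}

fn example() {
    // Vector living purely in this process' memory.
    let local: &SeqlockVector<IdealPosition> = SeqlockVector::private(1024);

    // Vector backed by shared memory, created or opened via a file link.
    let shared: &SeqlockVector<IdealPosition> =
        SeqlockVector::shared("/tmp/ideal_positions_flink", 1024)
            .expect("could not create or open shared memory");

    assert_eq!(local.len(), 1024);
    assert_eq!(shared.len(), 1024);
}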
Read/write
The read and write implementations of the SeqlockVector are straightforwardly based on those of the Seqlock. See the code here.
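As a rough sketch, assuming the Seqlock read/write signatures from the previous post (write takes a &T, read fills a &mut T), the vector methods simply index into the buffer and delegate; the bounds handling in the real code may differ:

// Rough sketch of the delegation, not Mantra's exact implementation.
impl<T: Copy> SeqlockVector<T> {
    pub fn write(&self, idx: usize, item: &T) {
        assert!(idx < self.len());
        self.buffer[idx].write(item);
    }

    pub fn read(&self, idx: usize, result: &mut T) {
        assert!(idx < self.len());
        self.buffer[idx].read(result);
    }
}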
SPMC/MPMC Message Queues
Without skipping a beat, we continue with the second datastructure, which plays an even bigger role in Mantra. Low-latency single/multi producer, multi consumer message Queues form the basis for the modular and robust architecture of Mantra.
The underlying structure is pretty much identical to the SeqlockVector, i.e. a buffer of Seqlocks with the actual data, and a QueueHeader with metadata about the Queue. As will become clear in due course, the buffer in this case is used as a ringbuffer.
use std::sync::atomic::AtomicUsize;

#[repr(u8)]
pub enum QueueType {
    Unknown,
    MPMC,
    SPMC,
}

#[repr(C)]
pub struct QueueHeader {
    pub queue_type: QueueType, // 1
    pub is_initialized: u8,    // 2
    _pad1: [u8; 6],            // 8
    pub elsize: usize,         // 16
    mask: usize,               // 24
    pub count: AtomicUsize,    // 32 (comments are cumulative sizes in bytes)
}

#[repr(C)]
pub struct Queue<T> {
    pub header: QueueHeader,
    buffer: [Seqlock<T>],
}
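As with the vector header, we can sanity-check at compile time that the byte offsets in the comments add up; again, this assert is purely illustrative:

// Illustrative only: QueueHeader should occupy exactly the 32 bytes annotated above.
const _: () = assert!(std::mem::size_of::<QueueHeader>() == 32);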
Before continuing with the Producer and Consumer implementations, let me reiterate that Producers are oblivious to the attached Consumers, leading to a couple of clear benefits:
- it improves performance through less shared data and thus less inter-core communication
- it greatly simplifies the implementation
- a single misbehaving Consumer does not halt the entire system
- attaching more Consumers to a Queue is trivial as it does not impact the performance or functionality of the rest of the system

One obvious negative is that Consumers can get sped past by Producers, leading to data loss through dropped messages. Luckily, this happens extremely infrequently in reality, and all observed cases were easily remedied by optimizing the code or offloading work to another Consumer on an additional core. The points outlined above mean that even the latter, more dramatic, case is easy to implement.
Nonetheless, we at least want Consumers to be able to autonomously detect when they are sped past. As we will demonstrate, this can be achieved even though the only shared data between Consumers and Producers is the buffer of Seqlocks.
One final remark before continuing with the implementation details is that all Consumers observe all messages flowing through a given Queue. This is sometimes referred to as broadcast or multicast mode.
Producer
 1 impl<T: Copy> Queue<T> {
 2
 3     pub fn len(&self) -> usize {
 4         self.header.mask + 1
 5     }
 6
 7     fn next_count(&self) -> usize {
 8         match self.header.queue_type {
 9             QueueType::Unknown => panic!("Unknown queue"),
10             QueueType::MPMC => self.header.count.fetch_add(1, Ordering::AcqRel),
11             QueueType::SPMC => {
12                 let c = self.header.count.load(Ordering::Relaxed);
13                 self.header
14                     .count
15                     .store(c.wrapping_add(1), Ordering::Relaxed);
16                 c
17             }
18         }
19     }
20     fn load(&self, pos: usize) -> &Seqlock<T> {
21         unsafe { self.buffer.get_unchecked(pos) }
22     }
23
24     fn produce(&self, item: &T) -> usize {
25         let p = self.next_count();
26         let lock = self.load(p & self.header.mask);
27         lock.write(item);
28         p
29     }
30
31 }
The count field is used to keep track of the position of the next free slot to write a message into. The mask is used to make sure that the Producer loops back to the start of the Queue when it reaches the end of the buffer: if we only allow Queue sizes that are a power of two and set mask equal to bufsize - 1, the p & self.header.mask in line 26 is the same as p % bufsize. Let's try to understand this with a minimal example:
let bufsize = 8;
let mask = 8 - 1; // 7
let p = 7 & mask; // 0x...0111 & 0x...0111 = 0x...0111 = 7
let p_next = (p + 1) & mask; // 0x...1000 & 0x...0111 = 0x...0000 = 0
Very clever. Alas, I cannot claim authorship of this rather well-known trick. It also avoids using
if self.header.count == self.len() {
    self.header.count = 0;
} else {
    self.header.count += 1; // or fetch_add
}
in next_count, which would inevitably lead to branch prediction misses.
Line 10 shows the only difference between the single and multi Producer cases. count is shared between all of the Producers, and we have to make sure no two Producers try to write to the same slot at the same time. By using fetch_add, a Producer can both reserve a slot for its message and advance the counter for the other Producers.
The only potential problem arises if another Producer manages to make it all the way around the buffer to the same slot while the first Producer is still writing its message; then they might be writing to the same data at the same time. In reality it's clear that this never happens.
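The post focuses on Queue::produce itself; a Producer handle on top of it could be as thin as the following sketch, where the type and its fields are assumptions rather than Mantra's actual API:

// Hypothetical thin wrapper: a Producer only needs a reference to the Queue,
// it never tracks any Consumer state.
pub struct Producer<'a, T> {
    queue: &'a Queue<T>,
}

impl<'a, T: Copy> Producer<'a, T> {
    pub fn produce(&self, msg: &T) -> usize {
        self.queue.produce(msg)
    }
}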
Consumer implementation
As mentioned before, Consumers are implemented such that only the Seqlock buffer is shared with other Consumers and Producers. We thus have to figure out a way for them to understand when the next message is ready, and whether they got sped past by a Producer.
The version of the Seqlocks can be used for both, given the following observations:
- the next message to be read will be in the slot whose version is lower than that of the previous slot
- the version of the slot containing the next message to read will be exactly 2 more than before once the message is ready
- the version of the Seqlocks can be fully deduced from the count of the queue: count / bufsize is how many times all Seqlocks have been written to, and count % bufsize is the position of the next Seqlock to be written to
Let's then first initialize a Consumer with the current count and the expected version of the Seqlocks holding messages to be read:
#[repr(C)]
#[derive(Debug)]
pub struct Consumer<'a, T> {
    pos: usize,              // 8
    mask: usize,             // 16
    expected_version: usize, // 24
    queue: &'a Queue<T>,     // 40: fat ptr (pointer, length)
}

impl<'a, T: Copy> From<&'a Queue<T>> for Consumer<'a, T> {
    fn from(queue: &'a Queue<T>) -> Self {
        let mask = queue.header.mask;
        let c = queue.header.count.load(std::sync::atomic::Ordering::Relaxed);
        let pos = c & queue.header.mask;
        /* e.g.
           seqlock_versions = [4, 2, 2, 2, 2, 2, 2, 2]
                                  ^ next message, ready when its version switches to 4
           len = 8
           count = 9
           count / len = 9 / 8 = 1
           1 << 1 = 2
           expected_version = 2 + 2 = 4
        */
        let expected_version = ((c / queue.len()) << 1) + 2;
        Self {
            pos,
            mask,
            expected_version,
            queue,
        }
    }
}
A Consumer knows the next message is ready to be read when the Seqlock version jumps to the expected_version, i.e. when a Producer has finished writing it.
A Consumer can also deduce when it has been sped past: it sees that the version of the next message it tries to read is higher than the expected_version. The only way this is possible is if a Producer wrote to the Seqlock, thereby incrementing the version to the expected one, then looped all the way around the buffer back to the same Seqlock and wrote another message, incrementing the version beyond the expected_version. When the slow Consumer finally comes along, it will have lost the data of the overwritten message because it was sped past. At least with this implementation we know when this happens and can choose how to deal with it.
A visual representation of the possible scenarios is shown in the following figure:
Fig. 2: The operation flow of a Producer and Consumer
We start by implementing a new read_with_version function on the Seqlock:
use std::sync::atomic::{compiler_fence, Ordering};
use thiserror::Error;

#[derive(Error, Debug, Copy, Clone, PartialEq)]
pub enum ReadError {
    #[error("Got sped past")]
    SpedPast,
    #[error("Queue empty")]
    Empty,
}

impl<T: Copy> Seqlock<T> {
    #[inline(never)]
    pub fn read_with_version(
        &self,
        result: &mut T,
        expected_version: usize,
    ) -> Result<(), ReadError> {
        loop {
            let v1 = self.version.load(Ordering::Acquire);
            if v1 != expected_version {
                if v1 < expected_version {
                    return Err(ReadError::Empty);
                } else {
                    return Err(ReadError::SpedPast);
                }
            }
            compiler_fence(Ordering::AcqRel);
            *result = unsafe { *self.data.get() };
            compiler_fence(Ordering::AcqRel);
            let v2 = self.version.load(Ordering::Acquire);
            if v1 == v2 {
                return Ok(());
            }
        }
    }
}
The main difference to the previously discussed read function is that we now know what version the Consumer expects. By returning Result<(), ReadError>, the current situation is communicated back to the caller: either a message has successfully been read (Ok(())), no message is ready yet (Err(ReadError::Empty)), or the Consumer was sped past (Err(ReadError::SpedPast)).
We implement the try_consume function for the Consumer based on this:
impl<'a, T: Copy> Consumer<'a, T> {
    fn update_pos(&mut self) {
        self.pos = (self.pos + 1) & self.mask;
        // every time we wrap back to slot 0, the expected version goes up by 2
        self.expected_version += 2 * (self.pos == 0) as usize;
    }

    /// Nonblocking consume returning either Ok(()) or a ReadError
    pub fn try_consume(&mut self, el: &mut T) -> Result<(), ReadError> {
        self.queue
            .load(self.pos)
            .read_with_version(el, self.expected_version)?;
        self.update_pos();
        Ok(())
    }
}
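Putting it together, a busy-spinning consumer loop could look roughly like the following sketch; the message type, the handle function, and the resynchronization policy are illustrative assumptions, not Mantra's actual code:

// Hypothetical event loop for a Consumer; `Msg` and `handle` are placeholders.
fn run_consumer<Msg: Copy + Default>(queue: &Queue<Msg>, handle: impl Fn(&Msg)) {
    let mut consumer = Consumer::from(queue);
    let mut msg = Msg::default();
    loop {
        match consumer.try_consume(&mut msg) {
            Ok(()) => handle(&msg),
            Err(ReadError::Empty) => continue, // nothing new yet, keep spinning
            Err(ReadError::SpedPast) => {
                // We lost messages; here we simply re-attach at the current
                // head of the queue and carry on.
                consumer = Consumer::from(queue);
            }
        }
    }
}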
Conclusion
Boom, that's it for this installment folks! We have covered the main concepts behind how the SeqlockVector and Queue synchronization datastructures are implemented in Mantra. These form the backbone that allows all systems to work together. We implemented them using the Seqlocks we discussed in the previous post, and made them as self-sufficient as possible. As highlighted before, this helps with the performance and overall robustness of the system.
Next time we will go about thoroughly testing, benchmarking, and potentially optimizing the implementation. Hope to see you there!