Lock-Free Circular Queue
A common application of memory barriers/atomic operations
Single Producer Single Consumer
KFIFO
A C++ version rewritten from the Linux kernel's kfifo queue.
No atomic variables are needed: with carefully placed memory barriers, the head and tail indices are updated only after the read or write completes, so the portion currently being read or written by one thread remains invisible to the other thread.
#include <algorithm>
#include <atomic>
#include <cstring>

template<class ValueType>
class kfifo {
private:
    ValueType* buffer;   /* the buffer holding the data */
    unsigned int size;   /* the size of the allocated buffer (a power of two) */
    unsigned int in;     /* data is added at offset (in % size) */
    unsigned int out;    /* data is extracted from offset (out % size) */
public:
    explicit kfifo(unsigned int sz) : size(1u << sz), in(0), out(0) {
        buffer = new ValueType[size];
    }
    kfifo(const kfifo&) = delete;
    kfifo(kfifo&&) = delete;
    kfifo& operator=(const kfifo&) = delete;
    ~kfifo() {
        delete[] buffer;
    }
    bool push(const char *item) {
        /****** Both spin modes are correct *******/
        // MODE 1: return false when full, spin outside the function (while (!push(...)))
        std::atomic_thread_fence(std::memory_order_acquire);
        if (size - in + out == 0) {
            return false;
        }
        // MODE 2: spin inside the function
        // while (size - in + out == 0) {
        //     std::atomic_thread_fence(std::memory_order_acquire);
        // }
        std::atomic_thread_fence(std::memory_order_acq_rel);
        std::memcpy(buffer + (in & (size - 1)), item, sizeof(ValueType));
        std::atomic_thread_fence(std::memory_order_release);
        ++in;
        return true;
    }
    bool pop(char *item) {
        // MODE 1: return false when empty, spin outside the function (while (!pop(...)))
        std::atomic_thread_fence(std::memory_order_acquire);
        if (in == out) {
            return false;
        }
        // MODE 2: spin inside the function
        // while (in == out) {
        //     std::atomic_thread_fence(std::memory_order_acquire);
        // }
        std::atomic_thread_fence(std::memory_order_acquire);
        std::memcpy(item, buffer + (out & (size - 1)), sizeof(ValueType));
        std::atomic_thread_fence(std::memory_order_acq_rel);
        ++out;
        return true;
    }
};
If the copy length is large or varies, it can be written as:
// Variable-length variants (shown as they would appear inside the kfifo class body).
// Note: as in the original kernel kfifo, these treat `buffer` as a raw byte buffer and
// may copy in two segments when the write/read wraps around the end of the buffer.
unsigned int push(const char *buf, unsigned int len)
{
    unsigned int l;
    len = std::min(len, size - in + out);
    std::atomic_thread_fence(std::memory_order_acq_rel);
    /* first put the data starting from (in) up to the end of the buffer */
    l = std::min(len, size - (in & (size - 1)));
    std::memcpy(buffer + (in & (size - 1)), buf, l);
    /* then put the rest (if any) at the beginning of the buffer */
    std::memcpy(buffer, buf + l, len - l);
    std::atomic_thread_fence(std::memory_order_release);
    in += len;
    return len;
}
unsigned int pop(char *buf, unsigned int len)
{
    unsigned int l;
    len = std::min(len, in - out);
    std::atomic_thread_fence(std::memory_order_acquire);
    /* first get the data from (out) up to the end of the buffer */
    l = std::min(len, size - (out & (size - 1)));
    std::memcpy(buf, buffer + (out & (size - 1)), l);
    /* then get the rest (if any) from the beginning of the buffer */
    std::memcpy(buf + l, buffer, len - l);
    std::atomic_thread_fence(std::memory_order_acq_rel);
    out += len;
    return len;
}
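As a rough sketch of how the fixed-size version might be driven with MODE 1 (spinning outside the functions): the element type matches the tests further below, while the loop counts and thread setup are illustrative assumptions.

```c++
#include <cstdio>
#include <thread>

int main() {
    using ValueType = char[64];              // same element type as in the tests below
    kfifo<ValueType> q(10);                  // capacity 1 << 10 elements

    std::thread producer([&] {
        ValueType item = "hello";
        for (int i = 0; i < 1000; ++i)
            while (!q.push(item)) { /* spin: queue full */ }
    });
    std::thread consumer([&] {
        ValueType item;
        for (int i = 0; i < 1000; ++i)
            while (!q.pop(item)) { /* spin: queue empty */ }
        std::printf("last item: %s\n", item);
    });

    producer.join();
    consumer.join();
}
```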
SPSC
Implemented using atomic variables (`std::atomic`) for the head and tail indices; with a single producer and a single consumer, no CAS loop is needed
#include <atomic>
#include <cstddef>
#include <cstring>

template <typename T>
class spsc {
    T *data;
    std::atomic<size_t> head{0}, tail{0};
    size_t Cap;
public:
    explicit spsc(size_t siz) : Cap(1u << siz) {
        data = new T[Cap];
    }
    ~spsc() { delete[] data; }
    spsc(const spsc&) = delete;
    spsc &operator=(const spsc&) = delete;
    spsc &operator=(const spsc&) volatile = delete;
    bool push(const T &val) {
        /**** The commented-out section is an incorrect version ****/
        // size_t t;
        // do {
        //     t = tail.load(std::memory_order_acquire);
        // } while ((t + 1) & (Cap - 1) == head.load(std::memory_order_acquire));
        size_t t = tail.load(std::memory_order_relaxed);   // only this thread writes tail
        if ((t + 1) % Cap == head.load(std::memory_order_acquire)) return false;   // full
        std::memcpy(data + t, &val, sizeof(T));
        tail.store((t + 1) & (Cap - 1), std::memory_order_release);
        return true;
    }
    bool pop(T &val) {
        // size_t h;
        // do {
        //     h = head.load(std::memory_order_acquire);
        // } while (h == tail.load(std::memory_order_acquire));
        size_t h = head.load(std::memory_order_relaxed);    // only this thread writes head
        if (h == tail.load(std::memory_order_acquire)) return false;   // empty
        std::memcpy(&val, data + h, sizeof(T));
        head.store((h + 1) & (Cap - 1), std::memory_order_release);
        return true;
    }
};
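A minimal usage sketch (element type and counts are illustrative). Note that the full-queue check `(t + 1) % Cap == head` always leaves one slot unused, so the usable capacity is `Cap - 1`.

```c++
#include <cassert>
#include <thread>

int main() {
    spsc<int> q(10);                          // 1 << 10 slots

    std::thread producer([&] {
        for (int i = 0; i < 100000; ++i)
            while (!q.push(i)) { /* spin: queue full */ }
    });
    std::thread consumer([&] {
        for (int i = 0; i < 100000; ++i) {
            int v;
            while (!q.pop(v)) { /* spin: queue empty */ }
            assert(v == i);                   // FIFO order with one producer and one consumer
        }
    });

    producer.join();
    consumer.join();
}
```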
Multiple Producers and Consumers
Disruptor
Implementation Principle
Four position variables:
- `lastRead` – position of the last item read
- `lastWrote` – position of the last item written
- `lastDispatch` – slot index of the last dispatch to a consumer
- `writableSeq` – current writable slot index
`readableSeq = lastDispatch + 1` records the current readable slot index.
Ordered from smallest to largest, they are: `lastRead`, `readableSeq`, `lastWrote`, `writableSeq`.
Implementation logic:
- For a producer: first acquire a `writableSeq` slot and update `writableSeq`; after writing, wait until all preceding slots have been written before updating `lastWrote`.
- For a consumer: first acquire a `readableSeq` slot and update `readableSeq`; after reading, wait until all preceding slots have been read before updating `lastRead`.
From this we know:
- `lastRead` ~ `readableSeq` – the part currently being consumed
- `readableSeq` ~ `lastWrote` – the part already produced and visible to consumers
- `lastWrote` ~ `writableSeq` – the part currently being produced
- `writableSeq` ~ `lastRead` – the part already consumed and visible to producers
Thus only the fully updated parts are visible to the opposite threads.
Some Optimizations
- Align frequently used variables with `alignas(64)` to avoid false sharing.
- Speed up modulo operations by making the queue size N a power-of-two compile-time constant and replacing `x % N` with `x & (N - 1)`.
- Avoid explicit wrap-around handling by using unsigned integers for the four position variables; unsigned overflow wraps around automatically. (A small sketch of these two index tricks follows this list.)
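A minimal sketch of the two index tricks, with illustrative values:

```c++
#include <cassert>
#include <cstdint>

int main() {
    constexpr std::uint64_t N = 1 << 10;          // power-of-two queue size
    static_assert((N & (N - 1)) == 0, "N must be a power of two");

    std::uint64_t seq = 3 * N + 5;                // a monotonically growing sequence number
    assert((seq & (N - 1)) == seq % N);           // masking replaces the modulo

    // Unsigned overflow wraps around, so differences between sequence numbers stay correct.
    std::uint64_t lastRead = UINT64_MAX;          // i.e. "-1", nothing read yet
    std::uint64_t writableSeq = 0;
    assert(writableSeq - lastRead == 1);          // exactly one slot ahead of the reader
}
```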
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <thread>

// Default compile-time ring size; 1 << 10 matches the queue size used in the tests below.
constexpr size_t DefaultRingBufferSize = 1 << 10;

template<class ValueType, size_t N = DefaultRingBufferSize>
class Disruptor
{
public:
    Disruptor() : _stopWorking(false), _lastRead(-1L), _lastWrote(-1L), _lastDispatch(-1L), _writableSeq(0L) {}
    ~Disruptor() {}
    Disruptor(const Disruptor&) = delete;
    Disruptor(Disruptor&&) = delete;
    void operator=(const Disruptor&) = delete;

    static_assert(((N > 0) && ((N & (~N + 1)) == N)),
                  "RingBuffer's size must be a positive power of 2");

    bool push(const ValueType& val)
    {
        // Claim a writable slot.
        const uint64_t writableSeq = _writableSeq.fetch_add(1);
        // Wait while the claimed slot still holds an unconsumed element.
        while (writableSeq - _lastRead > N)
        {
            if (_stopWorking.load()) {
                // throw std::runtime_error("writing when stopped disruptor");
                return false;
            }
            std::this_thread::yield();
        }
        std::memcpy(_ringBuf + (writableSeq & (N - 1)), &val, sizeof(ValueType));
        // Publish in order: wait until all preceding slots have been written.
        while (writableSeq - 1 != _lastWrote);
        _lastWrote = writableSeq;
        return true;
    }

    bool pop(ValueType& val)
    {
        // Claim a readable slot.
        const uint64_t readableSeq = _lastDispatch.fetch_add(1) + 1;
        // Wait until the claimed slot has been published by a producer
        // (signed difference so the initial -1 sentinels and wrap-around are handled).
        while (static_cast<int64_t>(readableSeq - _lastWrote) > 0)
        {
            if (_stopWorking.load())
            {
                // throw std::runtime_error("reading when stopped disruptor");
                return false;
            }
            std::this_thread::yield();
        }
        std::memcpy(&val, _ringBuf + (readableSeq & (N - 1)), sizeof(ValueType));
        // Release in order: wait until all preceding slots have been read.
        while (readableSeq - 1 != _lastRead);
        _lastRead = readableSeq;
        return true;
    }

    bool empty() { return _writableSeq - _lastRead == 1; }
    // Notify the disruptor to stop working. After this, once the buffered items have been
    // processed, blocked producers and consumers return false instead of spinning forever.
    void stop() { _stopWorking.store(true); }

private:
    alignas(64) std::atomic_bool _stopWorking;
    alignas(64) std::atomic_uint64_t _lastRead;
    alignas(64) std::atomic_uint64_t _lastWrote;
    alignas(64) std::atomic_uint64_t _lastDispatch;
    alignas(64) std::atomic_uint64_t _writableSeq;
    ValueType _ringBuf[N];
};
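A rough usage sketch with 3 producers and 3 consumers (matching the test configuration below); the payload and loop counts are illustrative, and `stop()` is used to let consumers exit once the producers are done.

```c++
#include <cstdio>
#include <thread>
#include <vector>

int main() {
    using ValueType = char[64];
    Disruptor<ValueType> q;                       // N = DefaultRingBufferSize slots

    auto producer = [&] {
        ValueType item = "payload";
        for (int i = 0; i < 100000; ++i)
            q.push(item);                         // yields internally while the ring is full
    };
    auto consumer = [&] {
        ValueType item;
        while (q.pop(item)) { /* consume item */ }   // returns false only after stop()
    };

    std::vector<std::thread> threads;
    for (int i = 0; i < 3; ++i) threads.emplace_back(producer);
    for (int i = 0; i < 3; ++i) threads.emplace_back(consumer);

    for (int i = 0; i < 3; ++i) threads[i].join();   // wait for producers
    q.stop();                                        // then let consumers drain and exit
    for (int i = 3; i < 6; ++i) threads[i].join();
    std::printf("done\n");
}
```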
RingBuffer
Implemented using CAS (compare and swap) operations
Implementation Principle
Wrap the non-atomic section in a do-while loop that ends with a CAS check. If the CAS fails, another thread has already claimed the operation, so the whole step is retried until this thread wins the right to perform it.
Note that the index must be re-read inside the loop before it is updated, to keep the concurrency correct. Also note:
- In `push`, the data copy must be placed outside the loop; otherwise a thread that has won the slot might have its written data overwritten by a thread that has not.
- In `pop`, copying inside the loop does not cause errors, but threads that have not won the slot repeatedly copy the same data, wasting CPU cycles.
Therefore two new pointers are introduced:
- `write` – position after a successful `push`
- `read` – position after a successful `pop`
From this we know:
- `read` ~ `head` – the part currently being read
- `head` ~ `write` – the part that is readable
- `write` ~ `tail` – the part currently being written
- `tail` ~ `read` – the part that is writable
(Actually, this is essentially the same idea as the Disruptor.)

#include <atomic>
#include <cstddef>
#include <cstring>

template<typename T>
class RingBuffer {
public:
    explicit RingBuffer(size_t siz) : Cap(1ull << siz) { data = new T[Cap]; }
    ~RingBuffer() { delete[] data; }
    RingBuffer(const RingBuffer&) = delete;
    RingBuffer &operator=(const RingBuffer&) = delete;
    RingBuffer &operator=(const RingBuffer&) volatile = delete;
    bool push(const T &val) {
        size_t t, w;
        // Claim a slot: retry until this thread wins the CAS on tail.
        do {
            t = tail.load();
            if ((t + 1) % Cap == head.load())    // full
                return false;
        } while (!tail.compare_exchange_weak(t, (t + 1) % Cap));
        // Copy outside the loop so a losing thread cannot overwrite the winner's data.
        std::memcpy(data + t, &val, sizeof(T));
        // Publish in order: advance `write` only once all earlier slots have been written.
        do {
            w = t;
        } while (!write.compare_exchange_weak(w, (w + 1) % Cap));
        return true;
    }
    bool pop(T &val) {
        size_t h;
        // Copying inside the loop is safe here (losing threads just copy the same data again),
        // which is why no separate `read` index is kept in this implementation.
        do {
            h = head.load();
            if (h == write.load())               // empty (only published slots are readable)
                return false;
            std::memcpy(&val, data + h, sizeof(T));
        } while (!head.compare_exchange_strong(h, (h + 1) % Cap));
        return true;
    }
private:
    T* data;
    std::atomic<size_t> head{0}, tail{0}, write{0};
    size_t Cap;
};
## Test
`using ValueType = char[64]`, i.e. each element is a 512-bit (64-byte) character array
### Performance Test
All threads are bound to isolated cores
Excerpt of results:
```bash
*:~/fastQ/bin$ ./test_sng --op 3e9
Sng_Queue Performance Test for Queue size 1<<10, 1 producers, 1 consumers, 3000000000 operations per producer, 0 nanoseconds per production, 0 nanoseconds per consumption
Mutex CircularQueue Elapsed time: 350.593 seconds
KFIFO Elapsed time: 34.319 seconds
SPSC Elapsed time: 32.0383 seconds
*:~/fastQ/bin$ ./test_muti --pd 3 --cs 3 --op 1e9
Muti_Queue Performance Test for Queue size 1<<10, 3 producers, 3 consumers, 1000000000 operations per producer, 0 nanoseconds per production, 0 nanoseconds per consumption
Muti-ptr Disruptor Elapsed time: 321.647 seconds
Atomic RingBuffer Elapsed time: 710.012 seconds
Mutex CircularQueue Elapsed time: 650.429 seconds
*:~/fastQ/bin$ ./test_muti --pd 3 --cs 1 --op 1e9
Muti_Queue Performance Test for Queue size 1<<10, 3 producers, 1 consumers, 1000000000 operations per producer, 0 nanoseconds per production, 0 nanoseconds per consumption
Muti-ptr Disruptor Elapsed time: 234.385 seconds
Atomic RingBuffer Elapsed time: 430.647 seconds
Mutex CircularQueue Elapsed time: 732.851 seconds
```
- kfifo and spsc are an order of magnitude faster than the mutex-based queue, with spsc slightly ahead.
- The Disruptor is about 2–3 times faster than locking.
- The RingBuffer performs poorly when the number of consumers is relatively large, presumably because CAS retries cause heavy contention and waste CPU cycles.
- For the same amount of data, the single-producer/single-consumer kfifo is 7–8 times faster than the multi-producer/multi-consumer Disruptor.
- Multi-producer/multi-consumer scenarios inherently involve contention, and when contention becomes too intense, lock-free performance degrades noticeably.
Correctness Test
All tests pass with -O3 enabled, except for the erroneous version noted in the spsc comments, which has not been fixed yet (:
In the performance tests spsc beats kfifo, but with -O3 enabled, moving the spin while into the push or pop function causes a deadlock. Curiously, inserting a 1-nanosecond sleep into the spin loop makes it run correctly; the reason is still unclear.
Limitations
- When storing smart pointers, the objects they point to cannot be destructed promptly; after a dequeue, the element stays in the data array and is not destroyed until its slot is overwritten (see the sketch after this list).
- To ensure performance, the array size N must be determined at compile time and cannot be dynamically resized.
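A minimal illustration of the smart-pointer case, using a plain array slot as a stand-in for the queue's buffer (a queue holding `std::shared_ptr` would also need assignment-based push/pop rather than `memcpy`, since `shared_ptr` is not trivially copyable):

```c++
#include <cassert>
#include <memory>
#include <vector>

int main() {
    // Stand-in for the queue's backing array.
    std::vector<std::shared_ptr<int>> slots(4);
    auto obj = std::make_shared<int>(42);

    slots[0] = obj;                             // "push"
    std::shared_ptr<int> popped = slots[0];     // "pop" without clearing the slot
    popped.reset();
    assert(obj.use_count() == 2);               // the slot still keeps the object alive

    slots[0].reset();                           // explicitly clearing the slot releases it promptly
    assert(obj.use_count() == 1);
}
```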
Recently I discovered a neat application of CRTP (the Curiously Recurring Template Pattern).
Consider the following common scenario: you have a very complex class D, and you try to split it into the following modules:
- Shared information A, which is needed by the other functional modules (B and C)
- Functional module B
- Functional module C
- The interaction between modules B and C, together with the overall interface: this is D itself
To avoid dumping a huge "shit-mountain" into a single file, you decide to split these pieces into separate files. Since you're using C++, you want this split to satisfy the zero-overhead abstraction principle, i.e., introduce no extra runtime or space overhead.
We consider the following approaches:
- **.h and .cpp**
  C++ lets you put declarations in a .h and implementations in separate .cpp files; with a sensible split of the .cpp files this works in principle, but it is still ugly. To compile, you have to feed every .cpp file to the compiler (including only the .h causes link failures), and the function declarations are still lumped together in one file. Moreover, separating interface and implementation blocks some optimizations; you need `-flto` to enable cross-file optimization.
- **Split into four classes and use multiple inheritance**
  Write a common class A, let B and C inherit from A, and let D multiply inherit from B and C. The problem is the resulting diamond: A's member variables appear twice inside D, which causes ambiguity and wastes space.
- **Add virtual inheritance**
  Virtual inheritance removes the duplicated A, but unfortunately it introduces a vtable (virtual base pointer), and with it runtime overhead.
- **Final solution: CRTP**
  In the end, what each module really needs is access to the shared information x. If you put the shared information in A and use multiple inheritance, you have to compromise on either time or space, because C++ must keep B and C independently usable (even though you will never use B or C on their own). Since both B and C merely need A, it should not be B or C that owns A; rather, whoever owns B and C (namely D) should also own A. A sketch of this layout follows.
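A minimal sketch of this idea, assuming illustrative names (`A`, `B`, `C`, `D` and a shared member `x`) rather than the author's actual code:

```c++
#include <iostream>

struct A {              // the shared information
    int x = 0;
};

template<class Derived>
struct B {              // functional module B, mixed into the real owner via CRTP
    void useSharedFromB() {
        // Reach the shared information owned by D without B owning (or virtually inheriting) A.
        auto& shared = static_cast<Derived*>(this)->sharedInfo();
        std::cout << "B sees x = " << shared.x << '\n';
    }
};

template<class Derived>
struct C {              // functional module C
    void useSharedFromC() {
        auto& shared = static_cast<Derived*>(this)->sharedInfo();
        shared.x += 1;  // C can also modify the shared state
    }
};

class D : public B<D>, public C<D> {   // D owns A exactly once; no virtual bases, no vtable
public:
    A& sharedInfo() { return a_; }
private:
    A a_;
};

int main() {
    D d;
    d.useSharedFromC();
    d.useSharedFromB();   // prints "B sees x = 1"
}
```

B and C never store a pointer to A and never appear in a vtable: the shared state lives exactly once inside D, and the `static_cast` to the derived class is resolved entirely at compile time, so the split costs no extra time or space.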