Signet Forge 0.1.0
C++20 Parquet library with AI-native extensions
DEMO
Loading...
Searching...
No Matches
mpmc_ring.hpp
Go to the documentation of this file.
1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright 2026 Johnson Ogundeji
5//
6// O(1) amortized push/pop, no allocation after construction, ABA-safe via
7// per-slot sequence numbers. Runtime capacity (always rounded up to next
8// power of two, minimum 2).
9//
10// Phase 9b: MPMC ColumnBatch Event Bus.
11
12#pragma once
13
14#include <atomic>
15#include <cstddef>
16#include <cstdint>
17#include <memory>
18#include <stdexcept>
19#include <type_traits>
20
21namespace signet::forge {
22
23// ============================================================================
24// MpmcRing<T> — Vyukov bounded MPMC queue
25// ============================================================================
26
45template<typename T>
46class MpmcRing {
47public:
48 // -------------------------------------------------------------------------
49 // Construction
50 // -------------------------------------------------------------------------
51
66 explicit MpmcRing(size_t capacity) {
67 if (capacity == 0) {
68 throw std::invalid_argument("MpmcRing capacity must be >= 1");
69 }
70 size_t cap = 2;
71 while (cap < capacity) cap <<= 1;
72 // Invariant: cap is a power of two (enforced by construction above)
73 capacity_ = cap;
74 mask_ = cap - 1;
75
76 slots_ = std::make_unique<Slot[]>(cap);
77 for (size_t i = 0; i < cap; ++i)
78 slots_[i].seq.store(i, std::memory_order_relaxed);
79
80 enqueue_.store(0, std::memory_order_relaxed);
81 dequeue_.store(0, std::memory_order_relaxed);
82 }
83
84 MpmcRing(const MpmcRing&) = delete;
85 MpmcRing& operator=(const MpmcRing&) = delete;
86 MpmcRing(MpmcRing&&) = delete;
88
89 // -------------------------------------------------------------------------
90 // push — non-blocking; returns false if ring is full
91 // -------------------------------------------------------------------------
92
97 [[nodiscard]] bool push(T val) noexcept(std::is_nothrow_move_assignable_v<T>) {
98 size_t pos = enqueue_.load(std::memory_order_relaxed);
99 for (;;) {
100 Slot& slot = slots_[pos & mask_];
101 const size_t seq = slot.seq.load(std::memory_order_acquire);
102 const auto diff = static_cast<intptr_t>(seq)
103 - static_cast<intptr_t>(pos);
104
105 if (diff == 0) {
106 // Slot free — try to claim it.
107 if (enqueue_.compare_exchange_weak(
108 pos, pos + 1,
109 std::memory_order_relaxed,
110 std::memory_order_relaxed)) {
111 slot.data = std::move(val);
112 slot.seq.store(pos + 1, std::memory_order_release);
113 return true;
114 }
115 // Another producer won the CAS; reload and retry.
116 } else if (diff < 0) {
117 return false; // ring is full
118 } else {
119 pos = enqueue_.load(std::memory_order_relaxed);
120 }
121 }
122 }
123
124 // -------------------------------------------------------------------------
125 // pop — non-blocking; returns false if ring is empty
126 // -------------------------------------------------------------------------
127
132 [[nodiscard]] bool pop(T& out) noexcept(std::is_nothrow_move_assignable_v<T>) {
133 size_t pos = dequeue_.load(std::memory_order_relaxed);
134 for (;;) {
135 Slot& slot = slots_[pos & mask_];
136 const size_t seq = slot.seq.load(std::memory_order_acquire);
137 const auto diff = static_cast<intptr_t>(seq)
138 - static_cast<intptr_t>(pos + 1);
139
140 if (diff == 0) {
141 // Slot full — try to claim it.
142 if (dequeue_.compare_exchange_weak(
143 pos, pos + 1,
144 std::memory_order_relaxed,
145 std::memory_order_relaxed)) {
146 out = std::move(slot.data);
147 // Mark slot free for the NEXT full cycle.
148 slot.seq.store(pos + mask_ + 1, std::memory_order_release);
149 return true;
150 }
151 // Another consumer won the CAS; reload and retry.
152 } else if (diff < 0) {
153 return false; // ring is empty
154 } else {
155 pos = dequeue_.load(std::memory_order_relaxed);
156 }
157 }
158 }
159
160 // -------------------------------------------------------------------------
161 // Metadata
162 // -------------------------------------------------------------------------
163
165 [[nodiscard]] size_t capacity() const noexcept { return capacity_; }
166
169 [[nodiscard]] size_t size() const noexcept {
170 const size_t head = enqueue_.load(std::memory_order_relaxed);
171 const size_t tail = dequeue_.load(std::memory_order_relaxed);
172 return (head >= tail) ? (head - tail) : 0u;
173 }
174
176 [[nodiscard]] bool empty() const noexcept { return size() == 0u; }
177
178private:
184 struct alignas(64) Slot {
185 std::atomic<size_t> seq{0};
186 T data{};
187 };
188
189 // Separate cache lines for producer and consumer cursors.
190 static constexpr size_t kCacheLine = 64;
191
192 alignas(kCacheLine) std::atomic<size_t> enqueue_{0};
193 alignas(kCacheLine) std::atomic<size_t> dequeue_{0};
194
195 std::unique_ptr<Slot[]> slots_;
196 size_t capacity_{0};
197 size_t mask_{0};
198};
199
200} // namespace signet::forge
Lock-free bounded multi-producer multi-consumer ring buffer.
Definition mpmc_ring.hpp:46
size_t size() const noexcept
Approximate occupancy (not linearizable without external synchronization).
size_t capacity() const noexcept
Return the actual ring capacity (always a power of two).
MpmcRing(MpmcRing &&)=delete
MpmcRing(size_t capacity)
Construct a ring buffer with the given capacity.
Definition mpmc_ring.hpp:66
bool empty() const noexcept
Check whether the ring appears empty (approximate).
MpmcRing & operator=(const MpmcRing &)=delete
MpmcRing & operator=(MpmcRing &&)=delete
MpmcRing(const MpmcRing &)=delete
bool pop(T &out) noexcept(std::is_nothrow_move_assignable_v< T >)
Pop an element from the ring (non-blocking).
bool push(T val) noexcept(std::is_nothrow_move_assignable_v< T >)
Push an element into the ring (non-blocking).
Definition mpmc_ring.hpp:97