68 throw std::invalid_argument(
"MpmcRing capacity must be >= 1");
76 slots_ = std::make_unique<Slot[]>(cap);
77 for (
size_t i = 0; i < cap; ++i)
78 slots_[i].seq.store(i, std::memory_order_relaxed);
80 enqueue_.store(0, std::memory_order_relaxed);
81 dequeue_.store(0, std::memory_order_relaxed);
97 [[nodiscard]]
bool push(T val)
noexcept(std::is_nothrow_move_assignable_v<T>) {
98 size_t pos = enqueue_.load(std::memory_order_relaxed);
100 Slot& slot = slots_[pos & mask_];
101 const size_t seq = slot.seq.load(std::memory_order_acquire);
102 const auto diff =
static_cast<intptr_t
>(seq)
103 -
static_cast<intptr_t
>(pos);
107 if (enqueue_.compare_exchange_weak(
109 std::memory_order_relaxed,
110 std::memory_order_relaxed)) {
111 slot.data = std::move(val);
112 slot.seq.store(pos + 1, std::memory_order_release);
116 }
else if (diff < 0) {
119 pos = enqueue_.load(std::memory_order_relaxed);
132 [[nodiscard]]
bool pop(T& out)
noexcept(std::is_nothrow_move_assignable_v<T>) {
133 size_t pos = dequeue_.load(std::memory_order_relaxed);
135 Slot& slot = slots_[pos & mask_];
136 const size_t seq = slot.seq.load(std::memory_order_acquire);
137 const auto diff =
static_cast<intptr_t
>(seq)
138 -
static_cast<intptr_t
>(pos + 1);
142 if (dequeue_.compare_exchange_weak(
144 std::memory_order_relaxed,
145 std::memory_order_relaxed)) {
146 out = std::move(slot.data);
148 slot.seq.store(pos + mask_ + 1, std::memory_order_release);
152 }
else if (diff < 0) {
155 pos = dequeue_.load(std::memory_order_relaxed);
165 [[nodiscard]]
size_t capacity() const noexcept {
return capacity_; }
169 [[nodiscard]]
size_t size() const noexcept {
170 const size_t head = enqueue_.load(std::memory_order_relaxed);
171 const size_t tail = dequeue_.load(std::memory_order_relaxed);
172 return (head >= tail) ? (head - tail) : 0u;
176 [[nodiscard]]
bool empty() const noexcept {
return size() == 0u; }
184 struct alignas(64) Slot {
185 std::atomic<size_t> seq{0};
190 static constexpr size_t kCacheLine = 64;
192 alignas(kCacheLine) std::atomic<size_t> enqueue_{0};
193 alignas(kCacheLine) std::atomic<size_t> dequeue_{0};
195 std::unique_ptr<Slot[]> slots_;
Lock-free bounded multi-producer multi-consumer ring buffer.
size_t size() const noexcept
Approximate occupancy (not linearizable without external synchronization).
size_t capacity() const noexcept
Return the actual ring capacity (always a power of two).
MpmcRing(MpmcRing &&)=delete
MpmcRing(size_t capacity)
Construct a ring buffer with the given capacity.
bool empty() const noexcept
Check whether the ring appears empty (approximate).
MpmcRing & operator=(const MpmcRing &)=delete
MpmcRing & operator=(MpmcRing &&)=delete
MpmcRing(const MpmcRing &)=delete
bool pop(T &out) noexcept(std::is_nothrow_move_assignable_v< T >)
Pop an element from the ring (non-blocking).
bool push(T val) noexcept(std::is_nothrow_move_assignable_v< T >)
Push an element into the ring (non-blocking).