Signet Forge 0.1.0
C++20 Parquet library with AI-native extensions
DEMO
Loading...
Searching...
No Matches
event_bus.hpp
Go to the documentation of this file.
1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright 2026 Johnson Ogundeji
5//
6// Routes SharedColumnBatch events through three tiers:
7//
8// Tier 1 — SPSC dedicated channels (sub-μs, one per producer-consumer pair)
9// MpmcRing used as SPSC when only one writer; no locking overhead.
10// Use: exchange adapter → specific strategy engine.
11//
12// Tier 2 — MPMC shared pool (μs, N producers → M worker threads)
13// All publish() calls land in a single MpmcRing shared by all
14// consumers. Workers call pop() in a tight loop.
15// Use: all exchanges → ML inference pool (load-balanced).
16//
17// Tier 3 — WAL / StreamingSink (ms, async durable logging)
18// Every publish() optionally serialises the batch and submits a
19// StreamRecord to an attached StreamingSink. Non-blocking: drops
20// if the sink ring is full (increments tier3_drops counter).
21// Use: compliance log, Parquet compaction.
22//
23// Usage:
24// EventBus bus;
25//
26// // Tier 1: dedicated channel
27// auto ch = bus.make_channel("binance→risk", 256);
28// ch->push(batch); // producer (Binance adapter thread)
29// ch->pop(batch); // consumer (risk gate thread)
30//
31// // Tier 2: shared pool
32// bus.publish(batch); // any producer
33// bus.pop(batch); // any worker thread
34//
35// // Tier 3: attach sink for compliance logging
36// bus.attach_sink(&my_streaming_sink);
37//
38// Phase 9b: MPMC ColumnBatch Event Bus.
39
40#pragma once
41
42#include "signet/error.hpp"
46
47#include <atomic>
48#include <cstddef>
49#include <cstdint>
50#include <memory>
51#include <mutex>
52#include <string>
53#include <unordered_map>
54
55namespace signet::forge {
56
57// ============================================================================
58// EventBusOptions — defined outside EventBus for Apple Clang compat
59// ============================================================================
60
68 size_t tier2_capacity = 4096;
69 size_t tier1_capacity = 256;
70 bool enable_tier3 = false;
71};
72
73// ============================================================================
74// EventBus — multi-tier SharedColumnBatch router
75// ============================================================================
76
88class EventBus {
89public:
92
93 // -------------------------------------------------------------------------
94 // Channel — Tier-1 dedicated MpmcRing used as SPSC
95 // -------------------------------------------------------------------------
96
99
100 // -------------------------------------------------------------------------
101 // Construction
102 // -------------------------------------------------------------------------
103
106 explicit EventBus(Options opts = {})
107 : opts_(opts)
108 , tier2_(std::make_unique<MpmcRing<SharedColumnBatch>>(
109 opts.tier2_capacity)) {}
110
111 EventBus(EventBus&&) = delete;
113 EventBus(const EventBus&) = delete;
114 EventBus& operator=(const EventBus&) = delete;
115 ~EventBus() = default;
116
117 // =========================================================================
118 // Tier 1: dedicated per-channel rings
119 // =========================================================================
120
125 [[nodiscard]] std::shared_ptr<Channel> make_channel(
126 const std::string& name,
127 size_t capacity = 0) {
128 std::lock_guard<std::mutex> lk(channels_mutex_);
129 auto it = channels_.find(name);
130 if (it != channels_.end()) return it->second;
131
132 const size_t cap = (capacity == 0) ? opts_.tier1_capacity : capacity;
133 auto ch = std::make_shared<Channel>(cap);
134 channels_.emplace(name, ch);
135 return ch;
136 }
137
141 [[nodiscard]] std::shared_ptr<Channel> channel(
142 const std::string& name) const {
143 std::lock_guard<std::mutex> lk(channels_mutex_);
144 auto it = channels_.find(name);
145 return (it != channels_.end()) ? it->second : nullptr;
146 }
147
148 // =========================================================================
149 // Tier 2: shared MPMC pool
150 // =========================================================================
151
159 // Tier 3 first — atomic load ensures sink stays alive for the call
160 // (lock-free: no mutex on the publish() hot path)
161 auto sink = std::atomic_load_explicit(&sink_, std::memory_order_acquire);
162 if (opts_.enable_tier3 && sink) {
163 auto rec = batch->to_stream_record();
164 auto r = sink->submit(std::move(rec));
165 if (!r) tier3_drops_.fetch_add(1, std::memory_order_relaxed);
166 }
167
168 // Tier 2
169 if (!tier2_->push(std::move(batch))) {
170 dropped_.fetch_add(1, std::memory_order_relaxed);
171 return false;
172 }
173 published_.fetch_add(1, std::memory_order_relaxed);
174 return true;
175 }
176
181 return tier2_->pop(out);
182 }
183
184 // =========================================================================
185 // Tier 3: WAL / StreamingSink
186 // =========================================================================
187
191 void attach_sink(std::shared_ptr<StreamingSink> sink) {
192 std::atomic_store_explicit(&sink_, std::move(sink),
193 std::memory_order_release);
194 }
195
197 void detach_sink() {
198 std::atomic_store_explicit(&sink_, std::shared_ptr<StreamingSink>{},
199 std::memory_order_release);
200 }
201
204 [[nodiscard]] bool has_sink() const {
205 return std::atomic_load_explicit(&sink_, std::memory_order_acquire)
206 != nullptr;
207 }
208
209 // =========================================================================
210 // Stats
211 // =========================================================================
212
214 struct Stats {
215 uint64_t published = 0;
216 uint64_t dropped = 0;
217 uint64_t tier3_drops = 0;
218 };
219
222 [[nodiscard]] Stats stats() const noexcept {
223 return Stats{
224 published_.load(std::memory_order_relaxed),
225 dropped_.load(std::memory_order_relaxed),
226 tier3_drops_.load(std::memory_order_relaxed)
227 };
228 }
229
231 void reset_stats() noexcept {
232 published_.store(0, std::memory_order_relaxed);
233 dropped_.store(0, std::memory_order_relaxed);
234 tier3_drops_.store(0, std::memory_order_relaxed);
235 }
236
237 // =========================================================================
238 // Introspection
239 // =========================================================================
240
242 [[nodiscard]] size_t tier2_size() const noexcept { return tier2_->size(); }
244 [[nodiscard]] size_t tier2_capacity() const noexcept { return tier2_->capacity(); }
246 [[nodiscard]] size_t num_channels() const {
247 std::lock_guard<std::mutex> lk(channels_mutex_);
248 return channels_.size();
249 }
250
251private:
252 Options opts_;
253
254 // Tier 2 — shared MPMC ring
255 std::unique_ptr<MpmcRing<SharedColumnBatch>> tier2_;
256
257 // Tier 1 — named dedicated channels
258 mutable std::mutex channels_mutex_;
259 std::unordered_map<std::string, std::shared_ptr<Channel>> channels_;
260
261 // Tier 3 — WAL sink (shared ownership; lock-free via atomic shared_ptr).
262 // CWE-362: H-16 fix — atomic_load/store on shared_ptr prevents
263 // use-after-detach race without mutex overhead on the publish() hot path.
264 std::shared_ptr<StreamingSink> sink_;
265
266 // Stats
267 std::atomic<uint64_t> published_{0};
268 std::atomic<uint64_t> dropped_{0};
269 std::atomic<uint64_t> tier3_drops_{0};
270};
271
272} // namespace signet::forge
Multi-tier event bus for routing SharedColumnBatch events.
Definition event_bus.hpp:88
EventBus(EventBus &&)=delete
Non-movable (contains mutex).
bool pop(SharedColumnBatch &out)
Pop a batch from the shared Tier-2 ring (for worker threads).
bool publish(SharedColumnBatch batch)
Publish a batch to the shared Tier-2 ring.
void attach_sink(std::shared_ptr< StreamingSink > sink)
Attach a StreamingSink for Tier-3 durable logging.
bool has_sink() const
Check whether a Tier-3 StreamingSink is currently attached.
EventBus & operator=(EventBus &&)=delete
Non-movable (contains mutex).
std::shared_ptr< Channel > make_channel(const std::string &name, size_t capacity=0)
Create (or return existing) named SPSC/MPSC channel.
EventBus(const EventBus &)=delete
size_t num_channels() const
Number of named Tier-1 channels that have been created.
EventBus & operator=(const EventBus &)=delete
EventBus(Options opts={})
Construct an EventBus with the given options.
Stats stats() const noexcept
Return a snapshot of the cumulative event bus statistics.
size_t tier2_capacity() const noexcept
Capacity of the Tier-2 MPMC ring (power-of-two, set by EventBusOptions).
void detach_sink()
Detach the currently attached Tier-3 sink (no-op if none attached).
size_t tier2_size() const noexcept
Approximate number of batches currently in the Tier-2 ring.
EventBusOptions Options
Alias for the options struct.
Definition event_bus.hpp:91
std::shared_ptr< Channel > channel(const std::string &name) const
Look up an existing channel by name.
void reset_stats() noexcept
Reset all counters to zero.
Lock-free bounded multi-producer multi-consumer ring buffer.
Definition mpmc_ring.hpp:46
Column-major batch of feature rows for zero-copy tensor wrapping and WAL serialization in ML inferenc...
Lock-free bounded MPMC ring buffer based on Dmitry Vyukov's algorithm.
std::shared_ptr< ColumnBatch > SharedColumnBatch
Thread-safe shared pointer to a ColumnBatch – the unit transferred between producer and consumer thre...
Lock-free SPSC/MPSC ring buffers, StreamingSink for background Parquet compaction,...
Configuration options for EventBus.
Definition event_bus.hpp:67
bool enable_tier3
Route batches to attached StreamingSink.
Definition event_bus.hpp:70
size_t tier2_capacity
Tier-2 MPMC ring capacity (power-of-2)
Definition event_bus.hpp:68
size_t tier1_capacity
Default capacity for make_channel()
Definition event_bus.hpp:69
Snapshot of cumulative event bus counters.
uint64_t published
Total batches successfully enqueued to Tier-2.
uint64_t dropped
Batches dropped because the Tier-2 ring was full.
uint64_t tier3_drops
Batches dropped at the Tier-3 sink (full or not attached).