53#include <unordered_map>
126 const std::string& name,
127 size_t capacity = 0) {
128 std::lock_guard<std::mutex> lk(channels_mutex_);
129 auto it = channels_.find(name);
130 if (it != channels_.end())
return it->second;
132 const size_t cap = (capacity == 0) ? opts_.
tier1_capacity : capacity;
133 auto ch = std::make_shared<Channel>(cap);
134 channels_.emplace(name, ch);
141 [[nodiscard]] std::shared_ptr<Channel>
channel(
142 const std::string& name)
const {
143 std::lock_guard<std::mutex> lk(channels_mutex_);
144 auto it = channels_.find(name);
145 return (it != channels_.end()) ? it->second :
nullptr;
161 auto sink = std::atomic_load_explicit(&sink_, std::memory_order_acquire);
163 auto rec = batch->to_stream_record();
164 auto r = sink->submit(std::move(rec));
165 if (!r) tier3_drops_.fetch_add(1, std::memory_order_relaxed);
169 if (!tier2_->push(std::move(batch))) {
170 dropped_.fetch_add(1, std::memory_order_relaxed);
173 published_.fetch_add(1, std::memory_order_relaxed);
181 return tier2_->pop(out);
192 std::atomic_store_explicit(&sink_, std::move(sink),
193 std::memory_order_release);
198 std::atomic_store_explicit(&sink_, std::shared_ptr<StreamingSink>{},
199 std::memory_order_release);
205 return std::atomic_load_explicit(&sink_, std::memory_order_acquire)
224 published_.load(std::memory_order_relaxed),
225 dropped_.load(std::memory_order_relaxed),
226 tier3_drops_.load(std::memory_order_relaxed)
232 published_.store(0, std::memory_order_relaxed);
233 dropped_.store(0, std::memory_order_relaxed);
234 tier3_drops_.store(0, std::memory_order_relaxed);
242 [[nodiscard]]
size_t tier2_size() const noexcept {
return tier2_->size(); }
244 [[nodiscard]]
size_t tier2_capacity() const noexcept {
return tier2_->capacity(); }
247 std::lock_guard<std::mutex> lk(channels_mutex_);
248 return channels_.size();
255 std::unique_ptr<MpmcRing<SharedColumnBatch>> tier2_;
258 mutable std::mutex channels_mutex_;
259 std::unordered_map<std::string, std::shared_ptr<Channel>> channels_;
264 std::shared_ptr<StreamingSink> sink_;
267 std::atomic<uint64_t> published_{0};
268 std::atomic<uint64_t> dropped_{0};
269 std::atomic<uint64_t> tier3_drops_{0};
Multi-tier event bus for routing SharedColumnBatch events.
EventBus(EventBus &&)=delete
Non-movable (contains mutex).
bool pop(SharedColumnBatch &out)
Pop a batch from the shared Tier-2 ring (for worker threads).
bool publish(SharedColumnBatch batch)
Publish a batch to the shared Tier-2 ring.
void attach_sink(std::shared_ptr< StreamingSink > sink)
Attach a StreamingSink for Tier-3 durable logging.
bool has_sink() const
Check whether a Tier-3 StreamingSink is currently attached.
EventBus & operator=(EventBus &&)=delete
Non-movable (contains mutex).
std::shared_ptr< Channel > make_channel(const std::string &name, size_t capacity=0)
Create (or return existing) named SPSC/MPSC channel.
EventBus(const EventBus &)=delete
size_t num_channels() const
Number of named Tier-1 channels that have been created.
EventBus & operator=(const EventBus &)=delete
EventBus(Options opts={})
Construct an EventBus with the given options.
Stats stats() const noexcept
Return a snapshot of the cumulative event bus statistics.
size_t tier2_capacity() const noexcept
Capacity of the Tier-2 MPMC ring (power-of-two, set by EventBusOptions).
void detach_sink()
Detach the currently attached Tier-3 sink (no-op if none attached).
size_t tier2_size() const noexcept
Approximate number of batches currently in the Tier-2 ring.
EventBusOptions Options
Alias for the options struct.
std::shared_ptr< Channel > channel(const std::string &name) const
Look up an existing channel by name.
void reset_stats() noexcept
Reset all counters to zero.
Lock-free bounded multi-producer multi-consumer ring buffer.
Column-major batch of feature rows for zero-copy tensor wrapping and WAL serialization in ML inferenc...
Lock-free bounded MPMC ring buffer based on Dmitry Vyukov's algorithm.
std::shared_ptr< ColumnBatch > SharedColumnBatch
Thread-safe shared pointer to a ColumnBatch – the unit transferred between producer and consumer thre...
Lock-free SPSC/MPSC ring buffers, StreamingSink for background Parquet compaction,...
Configuration options for EventBus.
bool enable_tier3
Route batches to attached StreamingSink.
size_t tier2_capacity
Tier-2 MPMC ring capacity (power-of-2)
size_t tier1_capacity
Default capacity for make_channel()
Snapshot of cumulative event bus counters.
uint64_t published
Total batches successfully enqueued to Tier-2.
uint64_t dropped
Batches dropped because the Tier-2 ring was full.
uint64_t tier3_drops
Batches dropped at the Tier-3 sink (full or not attached).