Signet Forge 0.1.0
C++20 Parquet library with AI-native extensions
DEMO
Loading...
Searching...
No Matches
streaming_sink.hpp
Go to the documentation of this file.
1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright 2026 Johnson Ogundeji
3// streaming_sink.hpp — Lock-free SPSC ring buffer + async Parquet compaction
4// for SignetStack Signet Forge.
5// Phase 8: Streaming WAL + Async Compaction.
6
11
12#pragma once
13
14#include "signet/error.hpp"
15#include "signet/types.hpp"
16#include "signet/schema.hpp"
17#include "signet/writer.hpp"
18#include "signet/reader.hpp"
19#include "signet/ai/wal.hpp"
20
21#include <atomic>
22#include <chrono>
23#include <condition_variable>
24#include <cstdint>
25#include <cstring>
26#include <filesystem>
27#include <functional>
28#include <limits>
29#include <memory>
30#include <mutex>
31#include <numeric>
32#include <string>
33#include <string_view>
34#include <thread>
35#include <vector>
36
37namespace signet::forge {
38
39// ============================================================================
40// detail — internal helpers (base64 encode/decode, etc.)
41// ============================================================================
42
44namespace detail {
45
// Characters of the standard (RFC 4648) base64 alphabet, indexed 0-63.
inline constexpr char kBase64Chars[] =
    "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

/// Encode `len` bytes starting at `data` as standard base64 with '='
/// padding. An empty input yields an empty string.
inline std::string base64_encode(const uint8_t* data, size_t len) {
  std::string out;
  out.reserve(4 * ((len + 2) / 3));  // exact encoded size, padding included

  size_t pos = 0;
  while (pos < len) {
    // Gather up to three input bytes into one 24-bit group (absent bytes = 0).
    const size_t have = len - pos;
    uint32_t group = static_cast<uint32_t>(data[pos]) << 16;
    if (have > 1) group |= static_cast<uint32_t>(data[pos + 1]) << 8;
    if (have > 2) group |= static_cast<uint32_t>(data[pos + 2]);

    // Emit four 6-bit symbols; positions with no input byte become '='.
    out.push_back(kBase64Chars[(group >> 18) & 0x3Fu]);
    out.push_back(kBase64Chars[(group >> 12) & 0x3Fu]);
    out.push_back(have > 1 ? kBase64Chars[(group >> 6) & 0x3Fu] : '=');
    out.push_back(have > 2 ? kBase64Chars[group & 0x3Fu] : '=');

    pos += 3;
  }

  return out;
}
70
/// Map one base64 character to its 6-bit value.
/// Returns 64 for the '=' padding character and 255 for any character
/// outside the base64 alphabet.
inline uint8_t base64_val(char c) {
  if (c >= 'A' && c <= 'Z') return static_cast<uint8_t>(c - 'A');
  if (c >= 'a' && c <= 'z') return static_cast<uint8_t>(c - 'a' + 26);
  if (c >= '0' && c <= '9') return static_cast<uint8_t>(c - '0' + 52);
  if (c == '+') return 62u;
  if (c == '/') return 63u;
  if (c == '=') return 64u; // padding sentinel
  return 255u;              // invalid
}

/// Decode standard base64 text into raw bytes.
/// Returns an empty vector for empty input OR any malformed input:
///   - length not a multiple of 4,
///   - characters outside the base64 alphabet,
///   - '=' padding anywhere except the last one or two symbols of the
///     FINAL quad.
/// (Previously, misplaced padding — e.g. "====" or "T=Wn" — was accepted
/// and silently decoded into garbage bytes.)
inline std::vector<uint8_t> base64_decode(std::string_view encoded) {
  const size_t n = encoded.size();
  if (n % 4 != 0) return {};

  std::vector<uint8_t> result;
  result.reserve((n / 4) * 3);

  for (size_t i = 0; i < n; i += 4) {
    uint8_t v0 = base64_val(encoded[i]);
    uint8_t v1 = base64_val(encoded[i + 1]);
    uint8_t v2 = base64_val(encoded[i + 2]);
    uint8_t v3 = base64_val(encoded[i + 3]);

    if (v0 == 255u || v1 == 255u || v2 == 255u || v3 == 255u) return {};

    // Padding validation: only "xx==" or "xxx=" are legal, and only in the
    // final quad. '=' in the first two symbols or "x=y" is malformed.
    if (v0 == 64u || v1 == 64u) return {};
    if (v2 == 64u && v3 != 64u) return {};
    if ((v2 == 64u || v3 == 64u) && i + 4 != n) return {};

    // Mask the padding sentinel (64) to keep the 24-bit group clean; the
    // padded output bytes are skipped by the guards below anyway.
    uint32_t triple = (static_cast<uint32_t>(v0) << 18) |
                      (static_cast<uint32_t>(v1) << 12) |
                      (static_cast<uint32_t>(v2 & 0x3Fu) << 6) |
                      (static_cast<uint32_t>(v3 & 0x3Fu));

    result.push_back(static_cast<uint8_t>((triple >> 16) & 0xFFu));
    if (v2 != 64u) result.push_back(static_cast<uint8_t>((triple >> 8) & 0xFFu));
    if (v3 != 64u) result.push_back(static_cast<uint8_t>( triple        & 0xFFu));
  }

  return result;
}
112
113} // namespace detail
115
116// ============================================================================
117// SpscRingBuffer<T, Capacity>
118// Lock-free single-producer single-consumer ring buffer.
119// Capacity must be a power of two.
120// ============================================================================
121
/// Lock-free single-producer single-consumer ring buffer.
/// Capacity must be a power of two; one slot is kept unused so that
/// head_ == tail_ unambiguously means "empty" (usable capacity is
/// Capacity - 1).
///
/// NOTE(review): the class declaration line and the deleted copy
/// operations were lost in the doc rendering this was recovered from;
/// they are reconstructed here per the "Non-copyable, non-movable
/// (atomic members)" comment — confirm against the canonical header.
template <typename T, size_t Capacity>
class SpscRingBuffer {
  static_assert((Capacity & (Capacity - 1)) == 0,
                "SpscRingBuffer: Capacity must be a power of 2");

public:
  SpscRingBuffer() : head_(0), tail_(0) {}

  // Non-copyable, non-movable (atomic members).
  SpscRingBuffer(const SpscRingBuffer&) = delete;
  SpscRingBuffer& operator=(const SpscRingBuffer&) = delete;

  // -------------------------------------------------------------------------
  // Producer API (single producer thread)
  // -------------------------------------------------------------------------

  /// Push one item. Returns false immediately if the buffer is full.
  bool push(T item) {
    const size_t head = head_.load(std::memory_order_relaxed);
    const size_t next = (head + 1) & kMask;

    if (next == tail_.load(std::memory_order_acquire)) {
      return false; // full
    }

    storage_[head] = std::move(item);
    // Release publishes the slot write to the consumer's acquire load.
    head_.store(next, std::memory_order_release);
    return true;
  }

  /// Bulk push. Returns the number of items actually pushed.
  size_t push(const T* items, size_t count) {
    size_t pushed = 0;
    for (size_t i = 0; i < count; ++i) {
      if (!push(items[i])) break;
      ++pushed;
    }
    return pushed;
  }

  // -------------------------------------------------------------------------
  // Consumer API (single consumer thread)
  // -------------------------------------------------------------------------

  /// Pop one item into `out`. Returns false if the buffer is empty.
  bool pop(T& out) {
    const size_t tail = tail_.load(std::memory_order_relaxed);

    if (tail == head_.load(std::memory_order_acquire)) {
      return false; // empty
    }

    out = std::move(storage_[tail]);
    tail_.store((tail + 1) & kMask, std::memory_order_release);
    return true;
  }

  /// Bulk pop up to `max_count` items. Returns the number popped.
  size_t pop(T* out, size_t max_count) {
    size_t popped = 0;
    while (popped < max_count) {
      const size_t tail = tail_.load(std::memory_order_relaxed);
      if (tail == head_.load(std::memory_order_acquire)) break;

      out[popped] = std::move(storage_[tail]);
      tail_.store((tail + 1) & kMask, std::memory_order_release);
      ++popped;
    }
    return popped;
  }

  // -------------------------------------------------------------------------
  // Queries (approximate — no lock)
  // -------------------------------------------------------------------------

  /// Approximate number of items; racy by design when called concurrently.
  [[nodiscard]] size_t size() const {
    const size_t head = head_.load(std::memory_order_acquire);
    const size_t tail = tail_.load(std::memory_order_acquire);
    return (head - tail) & kMask;
  }

  [[nodiscard]] bool empty() const { return size() == 0; }
  /// True when no further push can succeed (usable capacity is Capacity - 1).
  [[nodiscard]] bool full() const { return size() == Capacity - 1; }

  /// The compile-time capacity (one slot of which stays unused).
  static constexpr size_t capacity() { return Capacity; }

private:
  static constexpr size_t kMask = Capacity - 1;

  alignas(64) std::atomic<size_t> head_;
  alignas(64) std::atomic<size_t> tail_;
  // storage_ is a fixed-size C array embedded in the object. For large
  // Capacity values, this can exceed stack limits if allocated on the stack.
  // Always heap-allocate SpscRingBuffer (e.g., via std::make_unique) when
  // Capacity * sizeof(T) is significant.
  T storage_[Capacity];
};
233
234// ============================================================================
235// MpscRingBuffer<T, Capacity>
236// Multiple-Producer Single-Consumer bounded ring buffer.
237// Lock-free producers via CAS on enqueue position (Vyukov-style).
238// Single consumer requires no locking on the dequeue path.
239// Capacity must be a power of two.
240// ============================================================================
241
/// Multiple-Producer Single-Consumer bounded ring buffer.
/// Lock-free producers via CAS on the enqueue position (Vyukov-style
/// per-slot sequence handshake); the single consumer needs no locking.
/// Capacity must be a power of two; all Capacity slots are usable.
///
/// NOTE(review): the class declaration, constructor signature, and the
/// deleted copy operations were lost in the doc rendering this was
/// recovered from; they are reconstructed here from the generated member
/// index ('MpscRingBuffer()', deleted copy ctor/assign) — confirm against
/// the canonical header.
template <typename T, size_t Capacity>
class MpscRingBuffer {
  static_assert((Capacity & (Capacity - 1)) == 0,
                "MpscRingBuffer: Capacity must be a power of 2");

  // Per-slot handshake sequence. Initialized to slot index.
  // After a producer writes: sequence = enqueue_pos + 1.
  // After the consumer reads: sequence = enqueue_pos + Capacity (ready for reuse).
  struct Slot {
    std::atomic<size_t> sequence{0};
    T data{};
  };

  static constexpr size_t kMask = Capacity - 1;

  alignas(64) std::atomic<size_t> enqueue_pos_{0};
  alignas(64) size_t dequeue_pos_{0}; // single consumer — plain size_t
  Slot slots_[Capacity];

public:
  /// Construct the ring buffer, initializing all slot sequences.
  MpscRingBuffer() {
    for (size_t i = 0; i < Capacity; ++i) {
      slots_[i].sequence.store(i, std::memory_order_relaxed);
    }
  }

  // Non-copyable, non-movable (atomic members).
  MpscRingBuffer(const MpscRingBuffer&) = delete;
  MpscRingBuffer& operator=(const MpscRingBuffer&) = delete;

  // -------------------------------------------------------------------------
  // Producer API — safe to call from multiple threads concurrently
  // -------------------------------------------------------------------------

  /// Push one item. Returns false immediately if the buffer is full.
  bool push(T item) {
    size_t pos = enqueue_pos_.load(std::memory_order_relaxed);
    for (;;) {
      Slot& slot = slots_[pos & kMask];
      const size_t seq = slot.sequence.load(std::memory_order_acquire);
      const auto diff = static_cast<std::ptrdiff_t>(seq)
                      - static_cast<std::ptrdiff_t>(pos);

      if (diff == 0) {
        // Slot is free: try to claim it with a CAS on enqueue_pos_.
        if (enqueue_pos_.compare_exchange_weak(
                pos, pos + 1,
                std::memory_order_relaxed,
                std::memory_order_relaxed)) {
          // We own the slot — write data and publish.
          slot.data = std::move(item);
          slot.sequence.store(pos + 1, std::memory_order_release);
          return true;
        }
        // Another producer beat us; pos was updated by the CAS failure — retry.
      } else if (diff < 0) {
        // Consumer hasn't freed this slot yet — buffer is full.
        return false;
      } else {
        // Another producer is ahead of us; reload the current position.
        pos = enqueue_pos_.load(std::memory_order_relaxed);
      }
    }
  }

  /// Bulk push. Returns the number of items actually pushed.
  size_t push(const T* items, size_t count) {
    size_t pushed = 0;
    for (size_t i = 0; i < count; ++i) {
      if (!push(items[i])) break;
      ++pushed;
    }
    return pushed;
  }

  // -------------------------------------------------------------------------
  // Consumer API — must be called from a single thread only
  // -------------------------------------------------------------------------

  /// Pop one item into `out`. Returns false if nothing is ready.
  bool pop(T& out) {
    const size_t pos = dequeue_pos_;
    Slot& slot = slots_[pos & kMask];
    const size_t seq = slot.sequence.load(std::memory_order_acquire);

    // The producer publishes by setting sequence = enqueue_pos + 1 = pos + 1.
    if (seq != pos + 1) {
      return false; // nothing ready yet
    }

    out = std::move(slot.data);
    // Release the slot back to producers: next cycle starts at pos + Capacity.
    slot.sequence.store(pos + Capacity, std::memory_order_release);
    dequeue_pos_ = pos + 1;
    return true;
  }

  /// Bulk pop up to `max_count` items. Returns the number popped.
  size_t pop(T* out, size_t max_count) {
    size_t popped = 0;
    while (popped < max_count && pop(out[popped])) {
      ++popped;
    }
    return popped;
  }

  // -------------------------------------------------------------------------
  // Queries (approximate)
  // -------------------------------------------------------------------------

  /// Approximate item count (may transiently exceed Capacity during push).
  [[nodiscard]] size_t size() const {
    const size_t eq = enqueue_pos_.load(std::memory_order_acquire);
    return eq - dequeue_pos_; // may exceed Capacity briefly during push; clamp if needed
  }
  [[nodiscard]] bool empty() const { return size() == 0; }
  [[nodiscard]] bool full() const { return size() >= Capacity; }

  /// The compile-time capacity of this ring buffer.
  static constexpr size_t capacity() { return Capacity; }
};
374
375// ============================================================================
376// StreamRecord — unit of data flowing through the sink
377// ============================================================================
378
/// StreamRecord — unit of data flowing through the sink: an event
/// timestamp, a caller-defined type tag, and an opaque byte payload.
/// (The payload is base64-encoded when persisted to Parquet.)
///
/// NOTE(review): the 'struct StreamRecord {' line was lost in the doc
/// rendering this was recovered from; the name is reconstructed from the
/// section header and every use site — confirm against the canonical header.
struct StreamRecord {
  int64_t timestamp_ns = 0;  // event time in nanoseconds (producer-defined epoch — confirm)
  uint32_t type_id = 0;      // caller-defined record type; 0 is treated as "untyped" by filters
  std::string payload;       // opaque bytes carried through unmodified
};
389
390// ============================================================================
391// StreamingSink
392// Background-thread Parquet compaction sink.
393// ============================================================================
394
public:
  // NOTE(review): the 'class StreamingSink' declaration line is not visible
  // in this rendering (inner lines ~394-405 were dropped); the tokens below
  // are reproduced as shown — confirm against the canonical header.

  // -------------------------------------------------------------------------
  // Options (nested struct)
  // -------------------------------------------------------------------------

  /// Tunables for a StreamingSink instance; validated by create().
  struct Options {
    std::string output_dir;                        // destination directory (required, non-empty, no "..")
    std::string file_prefix = "stream";            // output file name prefix
    size_t ring_buffer_capacity = 65536;           // soft cap on queued records; must be a power of 2
    size_t row_group_size = 10'000;                // rows per Parquet row group
    size_t max_file_rows = 1'000'000;              // rows before rotating to a new file
    std::chrono::milliseconds flush_interval{100}; // background drain period
    bool auto_start = true;                        // start the worker thread inside create()
  };

  // -------------------------------------------------------------------------
  // Factory — returns expected<StreamingSink> (movable via unique_ptr<Impl>)
  // -------------------------------------------------------------------------

  /// Validate options, create the output directory, construct the sink, and
  /// (when opts.auto_start) start the background compaction thread.
  /// Fails with IO_ERROR on an empty/traversing output_dir, a directory
  /// that cannot be created, or a non-power-of-2 ring capacity.
  [[nodiscard]] static expected<StreamingSink> create(Options opts) {
    if (opts.output_dir.empty())
      return Error{ErrorCode::IO_ERROR, "StreamingSink: output_dir must not be empty"};

    // L29: Path traversal guard using std::filesystem::path iteration
    // instead of manual string splitting for correctness across platforms.
    {
      std::filesystem::path p(opts.output_dir);
      for (const auto& comp : p) {
        if (comp == "..")
          // NOTE(review): the leading 'return Error{ErrorCode::...,' of this
          // statement appears truncated in this rendering (only the message
          // and closing brace survive) — confirm against the canonical source.
              "StreamingSink: output_dir must not contain '..' path traversal"}; // CWE-22: Improper Limitation of a Pathname to a Restricted Directory
      }
    }

    std::error_code ec;
    std::filesystem::create_directories(opts.output_dir, ec);
    if (ec)
          // NOTE(review): truncated 'return Error{...' line — same rendering
          // loss as above; confirm against the canonical source.
          "StreamingSink: cannot create output_dir '" +
          opts.output_dir + "': " + ec.message()};

    const size_t cap = opts.ring_buffer_capacity;
    if (cap == 0 || (cap & (cap - 1)) != 0)
          // NOTE(review): truncated 'return Error{...' line — same rendering
          // loss as above; confirm against the canonical source.
          "StreamingSink: ring_buffer_capacity must be a power of 2"};

    auto impl = std::make_unique<Impl>(std::move(opts));
    const bool auto_start = impl->opts_.auto_start; // read before impl is moved below
    StreamingSink sink(std::move(impl));
    if (auto_start) sink.impl_->start();
    return std::move(sink); // StreamingSink is movable (unique_ptr<Impl>)
  }

  // -------------------------------------------------------------------------
  // Movable, non-copyable
  // -------------------------------------------------------------------------

  // NOTE(review): the defaulted move constructor/assignment declarations
  // (inner lines 472-475) are not visible in this rendering.
  StreamingSink(const StreamingSink&) = delete;

  // NOTE(review): the destructor signature line (inner 477) is not visible
  // in this rendering; the body below is its best-effort cleanup — stop()
  // errors are deliberately discarded (destructors must not fail).
    if (impl_) (void)impl_->stop();
  }

  // -------------------------------------------------------------------------
  // Submit (MPSC-safe via mutex in Impl)
  // -------------------------------------------------------------------------

  /// Enqueue a record. Fails with IO_ERROR (and counts a drop) when the
  /// ring buffer is at its configured capacity.
  [[nodiscard]] expected<void> submit(StreamRecord rec) {
    return impl_->submit(std::move(rec));
  }

  /// Convenience overload: build a record from raw bytes, then enqueue it.
  /// The bytes are copied into the record's payload.
  [[nodiscard]] expected<void> submit(int64_t timestamp_ns, uint32_t type_id,
                                      const uint8_t* data, size_t size) {
    StreamRecord rec;
    rec.timestamp_ns = timestamp_ns;
    rec.type_id = type_id;
    rec.payload.assign(reinterpret_cast<const char*>(data), size);
    return impl_->submit(std::move(rec));
  }

  /// Convenience overload: build a record from a string_view, then enqueue
  /// it. The viewed bytes are copied into the record's payload.
  [[nodiscard]] expected<void> submit(int64_t timestamp_ns, uint32_t type_id,
                                      std::string_view sv) {
    StreamRecord rec;
    rec.timestamp_ns = timestamp_ns;
    rec.type_id = type_id;
    rec.payload.assign(sv.data(), sv.size());
    return impl_->submit(std::move(rec));
  }

  // -------------------------------------------------------------------------
  // Lifecycle
  // -------------------------------------------------------------------------

  /// Start the background compaction thread (no-op if already running).
  void start() { impl_->start(); }
  /// Blocking stop: drains pending records and finalizes the open file.
  [[nodiscard]] expected<void> stop() { return impl_->stop(); }
  /// Request stop without waiting; the worker drains and exits on its own.
  void stop_nowait() { impl_->stop_nowait(); }

  // -------------------------------------------------------------------------
  // Force flush (blocking)
  // -------------------------------------------------------------------------

  /// Drain the ring, write everything pending, and close the current file
  /// so it has a valid Parquet footer and is immediately readable.
  [[nodiscard]] expected<void> flush() { return impl_->flush(); }

  // -------------------------------------------------------------------------
  // Stats
  // -------------------------------------------------------------------------

  // NOTE(review): submitted/dropped are uint64_t while written/files/bytes
  // are int64_t — inconsistent counter signedness; consider unifying.
  [[nodiscard]] uint64_t records_submitted() const { return impl_->records_submitted_.load(std::memory_order_relaxed); }
  [[nodiscard]] int64_t records_written() const { return impl_->records_written_.load(std::memory_order_relaxed); }
  [[nodiscard]] uint64_t records_dropped() const { return impl_->records_dropped_.load(std::memory_order_relaxed); }
  [[nodiscard]] int64_t files_written() const { return impl_->files_written_.load(std::memory_order_relaxed); }
  [[nodiscard]] int64_t bytes_written() const { return impl_->bytes_written_.load(std::memory_order_relaxed); }

  // -------------------------------------------------------------------------
  // Output file listing
  // -------------------------------------------------------------------------

  /// Thread-safe snapshot (copy) of the finalized output file paths.
  [[nodiscard]] std::vector<std::string> output_files() const {
    std::lock_guard<std::mutex> lk(impl_->files_mutex_);
    return impl_->output_files_;
  }

private:
  // =========================================================================
  // Impl — holds all non-movable state + implementation logic
  // =========================================================================
574
  /// All non-movable state (atomics, mutexes, the worker thread) lives in
  /// Impl so StreamingSink itself stays movable via unique_ptr<Impl>.
  struct Impl {
    static constexpr size_t kRingCap = 65536; // power of 2
    // NOTE(review): the RingBuffer alias (inner line 577) is not visible in
    // this rendering; usage below suggests
    // SpscRingBuffer<StreamRecord, kRingCap> — confirm against the source.

    // ----- Construction -----
    explicit Impl(Options opts)
      : opts_(std::move(opts))
      , ring_(std::make_unique<RingBuffer>()) // heap-allocated: the ring embeds its storage array
    {}

    // ----- Ring helpers -----

    // Soft-cap: honour opts_.ring_buffer_capacity even though the physical
    // ring (kRingCap) may be larger. Records beyond the soft cap are dropped.
    bool ring_push(StreamRecord rec) {
      if (ring_->size() >= opts_.ring_buffer_capacity) return false;
      return ring_->push(std::move(rec));
    }

    // Must be called with consumer_mutex_ held.
    void drain_ring_locked(std::vector<StreamRecord>& batch, size_t max_count) {
      StreamRecord r;
      while (batch.size() < max_count && ring_->pop(r))
        batch.push_back(std::move(r));
    }

    // ----- Submit -----
    // submit_mutex_ serialises producers so many threads can share the
    // single-producer ring. Returns IO_ERROR (and counts a drop) when full.
    [[nodiscard]] expected<void> submit(StreamRecord rec) {
      std::lock_guard<std::mutex> lk(submit_mutex_);
      if (!ring_push(std::move(rec))) {
        records_dropped_.fetch_add(1, std::memory_order_relaxed);
        return Error{ErrorCode::IO_ERROR, "StreamingSink: ring buffer full, record dropped"};
      }
      records_submitted_.fetch_add(1, std::memory_order_relaxed);
      cv_.notify_one(); // wake the compaction thread early
      return {};
    }

    // ----- Lifecycle -----
    // Idempotent: the CAS ensures only the first caller spawns the worker.
    void start() {
      bool exp = false;
      if (!running_.compare_exchange_strong(exp, true, std::memory_order_acq_rel))
        return;
      stop_requested_.store(false, std::memory_order_release);
      worker_ = std::thread(&Impl::compaction_loop, this);
    }

    // Blocking stop: joins the worker, then finalizes any writer the loop
    // did not close itself.
    [[nodiscard]] expected<void> stop() {
      if (!running_.load(std::memory_order_acquire)) return {};
      stop_requested_.store(true, std::memory_order_release);
      cv_.notify_all();
      if (worker_.joinable()) worker_.join();
      running_.store(false, std::memory_order_release);
      // Close any writer not already finalized in the compaction loop
      // (can happen if stop_nowait() was used or thread exited early).
      if (current_writer_) {
        (void)current_writer_->close();
        if (current_file_rows_ > 0) register_output_file(current_file_path_);
        current_writer_.reset();
        current_file_rows_ = 0;
      }
      return {};
    }

    // Non-blocking stop request; the worker drains and exits on its own.
    void stop_nowait() {
      stop_requested_.store(true, std::memory_order_release);
      cv_.notify_all();
    }

    // ----- Flush -----
    // Drain the ring and write all pending records, then close the current
    // Parquet file so it has a valid footer and is immediately readable.
    // Acquires consumer_mutex_ to serialise against the compaction thread.
    [[nodiscard]] expected<void> flush() {
      std::lock_guard<std::mutex> consumer_lk(consumer_mutex_);
      std::vector<StreamRecord> batch;
      drain_ring_locked(batch, (std::numeric_limits<size_t>::max)());
      if (!batch.empty()) {
        auto r = write_batch(batch);
        if (!r) return r;
      }
      // Close the current writer (if open) so the file has a valid footer.
      if (current_writer_) {
        auto r = current_writer_->close();
        if (current_file_rows_ > 0) register_output_file(current_file_path_);
        current_writer_.reset();
        current_file_rows_ = 0;
        if (!r) return r.error();
      }
      return expected<void>{};
    }

    // ----- Background thread -----
    // Wakes every flush_interval (or on notify) and drains the ring into
    // Parquet row groups; on stop it drains everything and closes the file.
    void compaction_loop() {
      std::vector<StreamRecord> batch;
      batch.reserve(opts_.row_group_size);

      while (true) {
        {
          std::unique_lock<std::mutex> lk(cv_mutex_);
          // Producers notify without holding cv_mutex_, so a wakeup can be
          // missed — but the wait is bounded by flush_interval, which caps
          // the added latency at one period.
          cv_.wait_for(lk, opts_.flush_interval, [this] {
            return stop_requested_.load(std::memory_order_acquire) ||
                   !ring_->empty();
          });
        }

        // Acquire consumer_mutex_ to serialise with flush() calls from
        // outside the loop (SPSC ring has only one consumer at a time).
        {
          std::lock_guard<std::mutex> consumer_lk(consumer_mutex_);
          batch.clear();
          drain_ring_locked(batch, opts_.row_group_size);
          // NOTE(review): write_batch errors are discarded on this path;
          // the stats counters are the only visibility into failures.
          if (!batch.empty()) (void)write_batch(batch);
        }

        if (stop_requested_.load(std::memory_order_acquire)) {
          std::lock_guard<std::mutex> consumer_lk(consumer_mutex_);
          batch.clear();
          drain_ring_locked(batch, (std::numeric_limits<size_t>::max)());
          if (!batch.empty()) (void)write_batch(batch);
          // Close any open writer so the Parquet footer is written.
          if (current_writer_) {
            (void)current_writer_->close();
            if (current_file_rows_ > 0) register_output_file(current_file_path_);
            current_writer_.reset();
            current_file_rows_ = 0;
          }
          break;
        }
      }
    }

    // ----- Parquet batch write -----
    // Converts records into three columns (timestamp_ns, type_id, base64
    // payload) and appends them as row groups, rotating files once
    // max_file_rows is reached. Caller holds consumer_mutex_.
    [[nodiscard]] expected<void> write_batch(std::vector<StreamRecord>& batch) {
      if (batch.empty()) return expected<void>{};

      Schema schema = Schema::builder("stream_data")
          .column<int64_t>("timestamp_ns", LogicalType::TIMESTAMP_NS)
          .column<int32_t>("type_id")
          .column<std::string>("payload", LogicalType::STRING)
          .build();

      size_t written = 0;
      while (written < batch.size()) {
        const size_t remaining_in_file =
            opts_.max_file_rows > current_file_rows_
                ? opts_.max_file_rows - current_file_rows_ : 0;

        // Rotate: close the full file before opening a fresh one.
        if (remaining_in_file == 0) {
          if (current_writer_) {
            auto r = current_writer_->close();
            current_writer_.reset();
            if (!r) return r.error();
          }
          current_file_rows_ = 0;
        }

        if (!current_writer_) {
          std::string path = next_output_path();
          WriterOptions wo;
          wo.row_group_size = static_cast<int64_t>(opts_.row_group_size);
          auto wr = ParquetWriter::open(path, schema, wo);
          if (!wr) return wr.error();
          current_writer_ = std::make_unique<ParquetWriter>(std::move(*wr));
          current_file_path_ = path;
        }

        const size_t chunk_size =
            (std::min)(batch.size() - written,
                       opts_.max_file_rows - current_file_rows_);

        // Columnar staging buffers for this chunk.
        std::vector<int64_t> ts_col;
        std::vector<int32_t> type_col;
        std::vector<std::string> payload_col;
        ts_col.reserve(chunk_size);
        type_col.reserve(chunk_size);
        payload_col.reserve(chunk_size);

        for (size_t i = written; i < written + chunk_size; ++i) {
          const auto& rec = batch[i];
          ts_col.push_back(rec.timestamp_ns);
          type_col.push_back(static_cast<int32_t>(rec.type_id));
          // Payload bytes are stored base64-encoded in a STRING column.
          payload_col.push_back(
              detail::base64_encode(
                  reinterpret_cast<const uint8_t*>(rec.payload.data()),
                  rec.payload.size()));
        }

        { auto r = current_writer_->write_column<int64_t>(0, ts_col.data(), ts_col.size()); if (!r) return r; }
        { auto r = current_writer_->write_column<int32_t>(1, type_col.data(), type_col.size()); if (!r) return r; }
        { auto r = current_writer_->write_column(2, payload_col.data(), payload_col.size()); if (!r) return r; }
        { auto r = current_writer_->flush_row_group(); if (!r) return r; }

        current_file_rows_ += chunk_size;
        written += chunk_size;

        {
          // NOTE(review): this stores the size of the CURRENT file only, so
          // bytes_written_ is not cumulative across rotated files — confirm
          // whether that is the intended semantics of bytes_written().
          std::error_code ec;
          auto fsz = std::filesystem::file_size(current_file_path_, ec);
          if (!ec) bytes_written_.store(static_cast<int64_t>(fsz), std::memory_order_relaxed);
        }
        records_written_.fetch_add(static_cast<int64_t>(chunk_size), std::memory_order_relaxed);

        if (current_file_rows_ >= opts_.max_file_rows) {
          auto r = current_writer_->close();
          register_output_file(current_file_path_);
          current_writer_.reset();
          current_file_rows_ = 0;
          if (!r) return r.error();
        }
      }

      // Note: partial (not-yet-full) files are NOT registered here.
      // They are registered only when closed by flush() or stop(), so
      // that HybridReader never sees a file without a valid Parquet footer.

      return expected<void>{};
    }

    // ----- File naming -----
    // Produces "<output_dir>/<prefix>_<zero-padded index>_<epoch ns>.parquet";
    // the index makes names sortable in creation order.
    std::string next_output_path() {
      using namespace std::chrono;
      const int64_t ts_ns = duration_cast<nanoseconds>(
          system_clock::now().time_since_epoch()).count();
      char buf[64];
      std::snprintf(buf, sizeof(buf), "%08zu_%lld",
                    file_index_++, static_cast<long long>(ts_ns));
      return opts_.output_dir + "/" + opts_.file_prefix + "_" + buf + ".parquet";
    }

    // Append path to the finalized-file list (deduplicated) and refresh the
    // files_written_ counter under files_mutex_.
    void register_output_file(const std::string& path) {
      std::lock_guard<std::mutex> lk(files_mutex_);
      for (const auto& p : output_files_) { if (p == path) return; }
      output_files_.push_back(path);
      files_written_.store(static_cast<int64_t>(output_files_.size()),
                           std::memory_order_relaxed);
    }

    // ----- Members -----
    Options opts_;
    std::unique_ptr<RingBuffer> ring_;
    mutable std::mutex submit_mutex_; // serialises multi-producer submits
    std::mutex consumer_mutex_;       // single-consumer serialiser (flush vs loop)
    std::thread worker_;
    std::atomic<bool> running_{false};
    std::atomic<bool> stop_requested_{false};
    std::mutex cv_mutex_;             // protects only the cv_ wait itself
    std::condition_variable cv_;
    std::atomic<uint64_t> records_submitted_{0};
    std::atomic<int64_t> records_written_{0};
    std::atomic<uint64_t> records_dropped_{0};
    std::atomic<int64_t> files_written_{0};
    std::atomic<int64_t> bytes_written_{0};
    std::unique_ptr<ParquetWriter> current_writer_; // open (footer-less) file, if any
    std::string current_file_path_;
    size_t current_file_rows_{0};     // rows written into the open file
    size_t file_index_{0};            // monotonically increasing name counter
    mutable std::mutex files_mutex_;
    std::vector<std::string> output_files_; // finalized (footer-complete) files only
  };

  std::unique_ptr<Impl> impl_;

  explicit StreamingSink(std::unique_ptr<Impl> impl) : impl_(std::move(impl)) {}
};
840
841// ============================================================================
842// HybridReaderOptions — options for HybridReader::create()
843// (defined outside HybridReader to avoid Apple Clang default-member-init bug)
844// ============================================================================
845
/// Options for HybridReader::create(); the filter fields pre-populate the
/// query used by read_all().
///
/// NOTE(review): the 'struct HybridReaderOptions {' line was lost in the
/// doc rendering this was recovered from; the name comes from the section
/// header above — confirm against the canonical header.
struct HybridReaderOptions {
  std::vector<std::string> parquet_files;  // historical files to scan
  int64_t start_timestamp = 0;             // inclusive lower bound (ns)
  int64_t end_timestamp = (std::numeric_limits<int64_t>::max)();  // inclusive upper bound (ns)
  uint32_t type_id_filter = 0;             // 0 = accept all types
};
855
856// ============================================================================
857// HybridQueryOptions
858// (defined outside HybridReader to avoid Apple Clang default-member-init bug)
859// ============================================================================
860
/// Per-call query options for HybridReader::read(); defaults select
/// everything.
///
/// NOTE(review): the 'struct HybridQueryOptions {' line was lost in the
/// doc rendering this was recovered from; the name comes from the section
/// header above — confirm against the canonical header.
struct HybridQueryOptions {
  int64_t start_ns = 0;                                     // inclusive lower time bound
  int64_t end_ns = (std::numeric_limits<int64_t>::max)();   // inclusive upper time bound
  uint32_t type_id = 0;                                     // 0 = accept all types
  size_t max_rows = (std::numeric_limits<size_t>::max)();   // stop after this many results
};
868
869// ============================================================================
870// HybridReader
871// Query records across historical Parquet files + live ring buffer snapshot.
872// ============================================================================
873
public:
  // NOTE(review): the class declaration and the nested Options/QueryOptions
  // aliases (inner lines ~874-888) are not visible in this rendering; usage
  // below implies Options = HybridReaderOptions and
  // QueryOptions = HybridQueryOptions — confirm against the canonical header.

  // -------------------------------------------------------------------------
  // Factory — create from Options struct (preferred API)
  // -------------------------------------------------------------------------

  /// Build a reader over opts.parquet_files with the time-range and
  /// type-id filters stored for later use by read_all().
  [[nodiscard]] static expected<HybridReader> create(Options opts) {
    HybridReader reader(std::move(opts.parquet_files));
    reader.filter_start_ = opts.start_timestamp;
    reader.filter_end_ = opts.end_timestamp;
    reader.filter_type_ = opts.type_id_filter;
    return reader;
  }

  // -------------------------------------------------------------------------
  // Constructors
  // -------------------------------------------------------------------------

  /// Construct from a live sink: adopts the sink's finalized-file list.
  explicit HybridReader(const StreamingSink& sink)
    : parquet_files_(sink.output_files())
  {
    // Snapshot live records from the ring buffer by re-reading submitted
    // records that haven't been flushed yet. Because we cannot directly
    // access the SPSC ring from outside (it is private), we consume the
    // list of output files (already flushed) and note the gap. The live
    // snapshot is only the count of records that the background thread
    // has not yet written — unavailable without internal access. We
    // expose this as "live records = 0" from the public perspective, which
    // is the safe conservative answer. Users who need live records should
    // call sink.flush() first.
    //
    // If callers need a precise live snapshot they can:
    //   1. sink.flush()
    //   2. HybridReader(sink)
    live_count_ = 0;
  }

  /// Construct from an explicit list of Parquet file paths (no live ring).
  explicit HybridReader(std::vector<std::string> parquet_files)
    : parquet_files_(std::move(parquet_files))
    , live_count_(0)
  {}

  // -------------------------------------------------------------------------
  // Read
  // -------------------------------------------------------------------------

  /// Scan every known Parquet file, apply the time-range / type-id filters,
  /// and return up to opts.max_rows matching records. Missing or unreadable
  /// files (e.g. still being written) are silently skipped.
  [[nodiscard]] expected<std::vector<StreamRecord>> read(QueryOptions opts = {}) const {
    std::vector<StreamRecord> result;

    // Column indices for stream_data schema: 0=timestamp_ns, 1=type_id, 2=payload.
    static constexpr size_t kColTs = 0;
    static constexpr size_t kColTypeId = 1;
    static constexpr size_t kColPayload = 2;

    for (const auto& file_path : parquet_files_) {
      if (result.size() >= opts.max_rows) break;

      // Skip files that don't exist yet (may still be open for writing).
      std::error_code ec;
      if (!std::filesystem::exists(file_path, ec)) continue;

      auto reader_result = ParquetReader::open(file_path);
      if (!reader_result) {
        // Skip unreadable / partially-written files.
        continue;
      }

      auto& reader = *reader_result;

      const size_t num_rg = static_cast<size_t>(reader.num_row_groups());

      for (size_t rg = 0; rg < num_rg; ++rg) {
        if (result.size() >= opts.max_rows) break;

        // Read typed columns from this row group.
        // NOTE(review): a failed column read skips the whole row group
        // silently — confirm that lossy behaviour is intended.
        auto ts_result = reader.read_column<int64_t>(rg, kColTs);
        if (!ts_result) continue;

        auto type_result = reader.read_column<int32_t>(rg, kColTypeId);
        if (!type_result) continue;

        auto payload_result = reader.read_column<std::string>(rg, kColPayload);
        if (!payload_result) continue;

        const auto& ts_col = *ts_result;
        const auto& type_col = *type_result;
        const auto& payload_col = *payload_result;

        // Guard against ragged columns: iterate only the common row count.
        const size_t nrows = (std::min)({ts_col.size(),
                                         type_col.size(),
                                         payload_col.size()});

        for (size_t i = 0; i < nrows; ++i) {
          if (result.size() >= opts.max_rows) break;

          const int64_t ts = ts_col[i];
          const uint32_t tid = static_cast<uint32_t>(type_col[i]);

          // Apply time-range filter.
          if (ts < opts.start_ns || ts > opts.end_ns) continue;

          // Apply type_id filter (0 = accept all types).
          if (opts.type_id != 0 && tid != opts.type_id) continue;

          StreamRecord rec;
          rec.timestamp_ns = ts;
          rec.type_id = tid;
          // Payload is stored base64-encoded; decode back to raw bytes.
          // NOTE(review): a malformed payload decodes to an empty vector,
          // so the record is kept with an empty payload — confirm intended.
          auto decoded = detail::base64_decode(payload_col[i]);
          rec.payload.assign(decoded.begin(), decoded.end());

          result.push_back(std::move(rec));
        }
      }
    }

    return result;
  }

  // -------------------------------------------------------------------------
  // Convenience read that uses filter fields set by create()
  // -------------------------------------------------------------------------

    // NOTE(review): the read_all() signature line (inner 1017-1018) is not
    // visible in this rendering; per the generated member index it is
    // 'expected<std::vector<StreamRecord>> read_all() const'.
    QueryOptions qopts;
    qopts.start_ns = filter_start_;
    qopts.end_ns = filter_end_;
    qopts.type_id = filter_type_;
    return read(qopts);
  }

  // -------------------------------------------------------------------------
  // Queries
  // -------------------------------------------------------------------------

  /// Number of historical Parquet files this reader knows about.
  [[nodiscard]] size_t num_files() const { return parquet_files_.size(); }

  /// Live-ring record count in the snapshot — always 0 in the current
  /// implementation (see the StreamingSink constructor comment above).
  [[nodiscard]] size_t num_live() const { return live_count_; }

private:
  std::vector<std::string> parquet_files_;
  size_t live_count_ = 0;

  // Filter fields populated by create(Options) — default values = no filtering.
  int64_t filter_start_ = 0;
  int64_t filter_end_ = (std::numeric_limits<int64_t>::max)();
  uint32_t filter_type_ = 0;
};
1045
1046} // namespace signet::forge
Reads StreamRecords across historical Parquet files and (optionally) a snapshot of a live StreamingSink ring buffer.
expected< std::vector< StreamRecord > > read(QueryOptions opts={}) const
Read all matching records from historical Parquet files.
HybridReader(const StreamingSink &sink)
Construct from a live StreamingSink: captures a snapshot of the ring buffer plus the list of historical Parquet files.
static expected< HybridReader > create(Options opts)
Create a HybridReader with pre-configured filters.
size_t num_live() const
Number of live records visible in ring snapshot (0 unless flush() was called first).
expected< std::vector< StreamRecord > > read_all() const
Read all records applying the filters set at construction (via Options).
size_t num_files() const
Number of historical Parquet files available.
HybridReader(std::vector< std::string > parquet_files)
Construct from an explicit list of Parquet file paths (no live ring).
Multiple-producer single-consumer (MPSC) bounded ring buffer.
bool empty() const
True if the buffer appears empty (approximate).
MpscRingBuffer(const MpscRingBuffer &)=delete
size_t pop(T *out, size_t max_count)
Bulk pop up to max_count items. Returns number popped.
bool full() const
True if the buffer appears full (approximate).
size_t push(const T *items, size_t count)
Bulk push. Returns the number of items actually pushed.
static constexpr size_t capacity()
The compile-time capacity of this ring buffer.
size_t size() const
Approximate number of items in the buffer (may transiently exceed Capacity during push).
MpscRingBuffer()
Construct the ring buffer, initializing all slot sequences.
bool push(T item)
Push one item. Returns false immediately if the buffer is full.
bool pop(T &out)
Pop one item into out. Returns false if the buffer is empty.
MpscRingBuffer & operator=(const MpscRingBuffer &)=delete
static expected< ParquetReader > open(const std::filesystem::path &path)
Open and parse a Parquet file, returning a ready-to-query reader.
Definition reader.hpp:189
static expected< ParquetWriter > open(const std::filesystem::path &path, const Schema &schema, const Options &options=Options{})
Open a new Parquet file for writing.
Definition writer.hpp:303
SchemaBuilder & column(std::string col_name, LogicalType logical_type=LogicalType::NONE)
Add a typed column, deducing PhysicalType from T.
Definition schema.hpp:107
static SchemaBuilder builder(std::string name)
Create a SchemaBuilder for fluent column construction.
Definition schema.hpp:228
Lock-free single-producer single-consumer (SPSC) bounded ring buffer.
SpscRingBuffer(const SpscRingBuffer &)=delete
bool full() const
True if the buffer appears full (approximate, no lock).
SpscRingBuffer & operator=(const SpscRingBuffer &)=delete
bool push(T item)
Push a single item. Returns false if the buffer is full.
size_t push(const T *items, size_t count)
Bulk push up to count items. Returns the number actually pushed.
size_t size() const
Approximate number of items currently in the buffer.
bool pop(T &out)
Pop a single item into out. Returns false if the buffer is empty.
size_t pop(T *out, size_t max_count)
Bulk pop up to max_count items into out. Returns number popped.
static constexpr size_t capacity()
The compile-time capacity of this ring buffer.
SpscRingBuffer()
Construct an empty ring buffer.
bool empty() const
True if the buffer appears empty (approximate, no lock).
Background-thread Parquet compaction sink fed by a lock-free ring buffer.
expected< void > submit(int64_t timestamp_ns, uint32_t type_id, const uint8_t *data, size_t size)
Submit a record from raw bytes.
std::vector< std::string > output_files() const
List of completed Parquet output file paths (thread-safe snapshot).
expected< void > flush()
Drain the ring buffer and write all pending records, then close the current Parquet file so it has a valid footer and can be read immediately.
uint64_t records_dropped() const
Total number of records dropped due to ring buffer overflow.
StreamingSink(const StreamingSink &)=delete
int64_t records_written() const
Total number of records written to Parquet files.
StreamingSink & operator=(const StreamingSink &)=delete
static expected< StreamingSink > create(Options opts)
Create a StreamingSink with the given options.
uint64_t records_submitted() const
Total number of records successfully submitted to the ring buffer.
expected< void > submit(int64_t timestamp_ns, uint32_t type_id, std::string_view sv)
Submit a record from a string_view payload.
expected< void > stop()
Stop the background thread, drain remaining records, and close open files.
expected< void > submit(StreamRecord rec)
Submit a fully-constructed StreamRecord to the ring buffer.
StreamingSink(StreamingSink &&)=default
void stop_nowait()
Signal the background thread to stop without waiting for it to finish.
StreamingSink & operator=(StreamingSink &&)=default
void start()
Start the background compaction thread (no-op if already running).
int64_t files_written() const
Number of completed Parquet output files.
int64_t bytes_written() const
Approximate total bytes written to the current output file.
A lightweight result type that holds either a success value of type T or an Error.
Definition error.hpp:145
@ TIMESTAMP_NS
Timestamp — INT64, nanoseconds since Unix epoch.
@ STRING
UTF-8 string (stored as BYTE_ARRAY).
@ IO_ERROR
A file-system or stream I/O operation failed (open, read, write, rename).
@ INTERNAL_ERROR
An unexpected internal error that does not fit any other category.
Schema definition types: Column<T>, SchemaBuilder, and Schema.
Lightweight error value carrying an ErrorCode and a human-readable message.
Definition error.hpp:101
Per-query filter options passed to HybridReader::read().
uint32_t type_id
Type ID filter (0 = all types)
int64_t end_ns
Maximum timestamp_ns (inclusive)
size_t max_rows
Maximum records to return.
int64_t start_ns
Minimum timestamp_ns (inclusive)
Options for constructing a HybridReader via HybridReader::create().
uint32_t type_id_filter
Type ID filter (0 = accept all types)
std::vector< std::string > parquet_files
Parquet files to query.
int64_t end_timestamp
Maximum timestamp_ns (inclusive)
int64_t start_timestamp
Minimum timestamp_ns (inclusive)
A single record flowing through the StreamingSink pipeline.
int64_t timestamp_ns
Wall-clock timestamp in nanoseconds since Unix epoch.
uint32_t type_id
User-defined record type tag (0 = untyped)
std::string payload
Serialized record bytes (UTF-8 safe or binary via base64)
Configuration options for StreamingSink::create().
std::string output_dir
Directory for output Parquet files (required)
bool auto_start
Start the background thread immediately on create()
size_t max_file_rows
Maximum rows per output Parquet file before rolling.
std::chrono::milliseconds flush_interval
Background thread wake-up interval.
size_t ring_buffer_capacity
Soft cap on ring buffer occupancy (must be power of 2)
std::string file_prefix
Filename prefix for output files.
size_t row_group_size
Records per Parquet row group.
Parquet format enumerations, type traits, and statistics structs.
Write-Ahead Log (WAL) with sub-millisecond append, CRC-32 integrity, crash recovery,...