Signet Forge 0.1.1
C++20 Parquet library with AI-native extensions
DEMO
Loading...
Searching...
No Matches
streaming_sink.hpp
Go to the documentation of this file.
1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright 2026 Johnson Ogundeji
3// streaming_sink.hpp — Lock-free SPSC ring buffer + async Parquet compaction
4// for SignetStack Signet Forge.
5// Phase 8: Streaming WAL + Async Compaction.
6
11
12#pragma once
13
14#include "signet/error.hpp"
15#include "signet/types.hpp"
16#include "signet/schema.hpp"
17#include "signet/writer.hpp"
18#include "signet/reader.hpp"
19#include "signet/ai/wal.hpp"
20
21#include <atomic>
22#include <chrono>
23#include <condition_variable>
24#include <cstdint>
25#include <cstring>
26#include <filesystem>
27#include <functional>
28#include <limits>
29#include <memory>
30#include <mutex>
31#include <numeric>
32#include <string>
33#include <string_view>
34#include <thread>
35#include <vector>
36
37namespace signet::forge {
38
39// ============================================================================
40// detail — internal helpers (base64 encode/decode, etc.)
41// ============================================================================
42
44namespace detail {
45
/// Standard base64 alphabet, indexed by 6-bit value.
inline constexpr char kBase64Chars[] =
    "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

/// Encode `len` bytes starting at `data` as '='-padded base64 text.
/// The output always has a length that is a multiple of four.
inline std::string base64_encode(const uint8_t* data, size_t len) {
    std::string encoded;
    encoded.reserve(((len + 2) / 3) * 4);

    // Appends the symbol for the 6-bit field of `bits` that starts at `shift`.
    const auto emit = [&encoded](uint32_t bits, unsigned shift) {
        encoded.push_back(kBase64Chars[(bits >> shift) & 0x3Fu]);
    };

    size_t pos = 0;

    // Complete 3-byte groups map to exactly four symbols.
    while (len - pos >= 3) {
        const uint32_t bits = (static_cast<uint32_t>(data[pos]) << 16) |
                              (static_cast<uint32_t>(data[pos + 1]) << 8) |
                               static_cast<uint32_t>(data[pos + 2]);
        emit(bits, 18);
        emit(bits, 12);
        emit(bits, 6);
        emit(bits, 0);
        pos += 3;
    }

    // A trailing group of one or two bytes is zero-extended and padded.
    const size_t leftover = len - pos;
    if (leftover == 1) {
        const uint32_t bits = static_cast<uint32_t>(data[pos]) << 16;
        emit(bits, 18);
        emit(bits, 12);
        encoded.push_back('=');
        encoded.push_back('=');
    } else if (leftover == 2) {
        const uint32_t bits = (static_cast<uint32_t>(data[pos]) << 16) |
                              (static_cast<uint32_t>(data[pos + 1]) << 8);
        emit(bits, 18);
        emit(bits, 12);
        emit(bits, 6);
        encoded.push_back('=');
    }

    return encoded;
}
70
/// Map one base64 character to its 6-bit value.
/// Returns 64 for the padding character '=' and 255 for any character
/// outside the base64 alphabet.
inline uint8_t base64_val(char c) {
    if (c >= 'A' && c <= 'Z') return static_cast<uint8_t>(c - 'A');
    if (c >= 'a' && c <= 'z') return static_cast<uint8_t>(c - 'a' + 26);
    if (c >= '0' && c <= '9') return static_cast<uint8_t>(c - '0' + 52);
    if (c == '+') return 62u;
    if (c == '/') return 63u;
    if (c == '=') return 64u;  // padding sentinel
    return 255u;               // invalid
}

/// Decode '='-padded base64 text.
///
/// Returns the decoded bytes, or an empty vector when `encoded` is malformed:
/// length not a multiple of four, a character outside the alphabet, or
/// padding anywhere other than the last one or two positions of the final
/// quad. Empty input also decodes to an empty vector.
inline std::vector<uint8_t> base64_decode(std::string_view encoded) {
    constexpr uint8_t kPad = 64u;
    constexpr uint8_t kInvalid = 255u;

    const size_t n = encoded.size();
    if (n % 4 != 0) return {};

    std::vector<uint8_t> result;
    result.reserve((n / 4) * 3);

    for (size_t i = 0; i < n; i += 4) {
        const uint8_t v0 = base64_val(encoded[i]);
        const uint8_t v1 = base64_val(encoded[i + 1]);
        const uint8_t v2 = base64_val(encoded[i + 2]);
        const uint8_t v3 = base64_val(encoded[i + 3]);

        if (v0 == kInvalid || v1 == kInvalid || v2 == kInvalid || v3 == kInvalid) return {};

        // The first two symbols of a quad always carry data.
        if (v0 == kPad || v1 == kPad) return {};

        const bool pad2 = (v2 == kPad);
        const bool pad3 = (v3 == kPad);

        // "x=y" is not a valid quad, and padding may only end the input.
        if (pad2 && !pad3) return {};
        if (pad3 && i + 4 != n) return {};

        // Padding contributes zero bits; the sentinel value 64 must never be
        // shifted into the 24-bit group.
        const uint32_t triple = (static_cast<uint32_t>(v0) << 18) |
                                (static_cast<uint32_t>(v1) << 12) |
                                (static_cast<uint32_t>(pad2 ? 0u : v2) << 6) |
                                 static_cast<uint32_t>(pad3 ? 0u : v3);

        result.push_back(static_cast<uint8_t>((triple >> 16) & 0xFFu));
        if (!pad2) result.push_back(static_cast<uint8_t>((triple >> 8) & 0xFFu));
        if (!pad3) result.push_back(static_cast<uint8_t>(triple & 0xFFu));
    }

    return result;
}
112
113} // namespace detail
115
116// ============================================================================
117// SpscRingBuffer<T, Capacity>
118// Lock-free single-producer single-consumer ring buffer.
119// Capacity must be a power of two.
120// ============================================================================
121
/// Lock-free single-producer / single-consumer ring buffer.
///
/// Exactly one thread may call the push functions and exactly one thread may
/// call the pop functions. One slot is always left unused to distinguish
/// "full" from "empty", so at most Capacity - 1 items are stored at a time.
///
/// T must be default-constructible and move-assignable: the storage is a
/// plain array of T and items are moved in and out by assignment.
template <typename T, size_t Capacity>
class SpscRingBuffer {
    // Capacity == 0 slips through the power-of-two bit test below
    // (0 & (0 - 1) == 0), and Capacity == 1 could never hold an item because
    // one slot is always kept free.
    static_assert(Capacity >= 2,
                  "SpscRingBuffer: Capacity must be at least 2");
    static_assert((Capacity & (Capacity - 1)) == 0,
                  "SpscRingBuffer: Capacity must be a power of 2");

public:
    /// Construct an empty buffer.
    SpscRingBuffer() : head_(0), tail_(0) {}

    // Non-copyable, non-movable (atomic members).
    SpscRingBuffer(const SpscRingBuffer&) = delete;
    SpscRingBuffer& operator=(const SpscRingBuffer&) = delete;

    // -------------------------------------------------------------------------
    // Producer API (single producer thread)
    // -------------------------------------------------------------------------

    /// Push one item. Returns false (leaving the buffer untouched) when full.
    bool push(T item) {
        const size_t head = head_.load(std::memory_order_relaxed);
        const size_t next = (head + 1) & kMask;

        // Acquire pairs with the consumer's release store of tail_, so the
        // slot about to be overwritten has already been moved out.
        if (next == tail_.load(std::memory_order_acquire)) {
            return false;  // full
        }

        storage_[head] = std::move(item);
        // Release publishes the slot contents before the new head is visible.
        head_.store(next, std::memory_order_release);
        return true;
    }

    /// Push up to `count` items from `items` (copied), stopping at the first
    /// one that does not fit. Returns the number of items pushed.
    size_t push(const T* items, size_t count) {
        size_t pushed = 0;
        for (size_t i = 0; i < count; ++i) {
            if (!push(items[i])) break;
            ++pushed;
        }
        return pushed;
    }

    // -------------------------------------------------------------------------
    // Consumer API (single consumer thread)
    // -------------------------------------------------------------------------

    /// Pop the oldest item into `out`. Returns false when the buffer is empty.
    bool pop(T& out) {
        const size_t tail = tail_.load(std::memory_order_relaxed);

        // Acquire pairs with the producer's release store of head_.
        if (tail == head_.load(std::memory_order_acquire)) {
            return false;  // empty
        }

        out = std::move(storage_[tail]);
        // Release hands the slot back to the producer.
        tail_.store((tail + 1) & kMask, std::memory_order_release);
        return true;
    }

    /// Pop up to `max_count` items into `out[0..]`. Returns the number popped.
    size_t pop(T* out, size_t max_count) {
        size_t popped = 0;
        while (popped < max_count && pop(out[popped])) {
            ++popped;
        }
        return popped;
    }

    // -------------------------------------------------------------------------
    // Queries (approximate — no lock)
    // -------------------------------------------------------------------------

    /// Number of stored items, computed from two independent atomic loads;
    /// the value may be stale by the time it is returned.
    [[nodiscard]] size_t size() const {
        const size_t head = head_.load(std::memory_order_acquire);
        const size_t tail = tail_.load(std::memory_order_acquire);
        return (head - tail) & kMask;
    }

    /// True if the buffer appears empty (approximate).
    [[nodiscard]] bool empty() const { return size() == 0; }

    /// True if the buffer appears full, i.e. holds Capacity - 1 items.
    [[nodiscard]] bool full() const { return size() == Capacity - 1; }

    /// The Capacity template argument. Note that only Capacity - 1 items can
    /// be stored at once.
    static constexpr size_t capacity() { return Capacity; }

private:
    static constexpr size_t kMask = Capacity - 1;

    // Producer and consumer indices are aligned to separate 64-byte
    // boundaries so the two threads do not write to the same cache line.
    alignas(64) std::atomic<size_t> head_;
    alignas(64) std::atomic<size_t> tail_;

    // storage_ is a fixed-size array embedded in the object. For large
    // Capacity values this can exceed stack limits if the buffer itself is
    // placed on the stack; heap-allocate the buffer (e.g. std::make_unique)
    // when Capacity * sizeof(T) is significant.
    T storage_[Capacity];
};
233
234// ============================================================================
235// MpscRingBuffer<T, Capacity>
236// Multiple-Producer Single-Consumer bounded ring buffer.
237// Lock-free producers via CAS on enqueue position (Vyukov-style).
238// Single consumer requires no locking on the dequeue path.
239// Capacity must be a power of two.
240// ============================================================================
241
/// Multiple-producer / single-consumer bounded ring buffer.
///
/// Producers claim a slot with a CAS on the enqueue position and publish it
/// through a per-slot sequence number; the single consumer reads slots in
/// order and needs no CAS. T must be default-constructible and
/// move-assignable.
template <typename T, size_t Capacity>
class MpscRingBuffer {
    // With Capacity == 1 the "published" sequence (pos + 1) equals the next
    // enqueue position for the same slot, so a second push would overwrite an
    // unconsumed item. Capacity == 0 slips through the bit test below.
    static_assert(Capacity >= 2,
                  "MpscRingBuffer: Capacity must be at least 2");
    static_assert((Capacity & (Capacity - 1)) == 0,
                  "MpscRingBuffer: Capacity must be a power of 2");

    // Per-slot handshake sequence. Initialized to slot index.
    // After a producer writes: sequence = enqueue_pos + 1.
    // After the consumer reads: sequence = enqueue_pos + Capacity (ready for reuse).
    struct Slot {
        std::atomic<size_t> sequence{0};
        T data{};
    };

    static constexpr size_t kMask = Capacity - 1;

    alignas(64) std::atomic<size_t> enqueue_pos_{0};
    // Written only by the consumer, but read by size()/empty()/full() from
    // any thread, so it must be atomic to avoid a data race.
    alignas(64) std::atomic<size_t> dequeue_pos_{0};
    Slot slots_[Capacity];

public:
    /// Construct the ring buffer, initializing all slot sequences.
    MpscRingBuffer() {
        for (size_t i = 0; i < Capacity; ++i) {
            slots_[i].sequence.store(i, std::memory_order_relaxed);
        }
    }

    // Non-copyable, non-movable (atomic members).
    MpscRingBuffer(const MpscRingBuffer&) = delete;
    MpscRingBuffer& operator=(const MpscRingBuffer&) = delete;

    // -------------------------------------------------------------------------
    // Producer API — safe to call from multiple threads concurrently
    // -------------------------------------------------------------------------

    /// Push one item. Returns false immediately if the buffer is full.
    bool push(T item) {
        size_t pos = enqueue_pos_.load(std::memory_order_relaxed);
        for (;;) {
            Slot& slot = slots_[pos & kMask];
            const size_t seq = slot.sequence.load(std::memory_order_acquire);
            const auto diff = static_cast<std::ptrdiff_t>(seq)
                            - static_cast<std::ptrdiff_t>(pos);

            if (diff == 0) {
                // Slot is free: try to claim it with a CAS on enqueue_pos_.
                if (enqueue_pos_.compare_exchange_weak(
                        pos, pos + 1,
                        std::memory_order_relaxed,
                        std::memory_order_relaxed)) {
                    // We own the slot — write data and publish.
                    slot.data = std::move(item);
                    slot.sequence.store(pos + 1, std::memory_order_release);
                    return true;
                }
                // Another producer beat us; pos was updated by the CAS failure — retry.
            } else if (diff < 0) {
                // Consumer hasn't freed this slot yet — buffer is full.
                return false;
            } else {
                // Another producer is ahead of us; reload the current position.
                pos = enqueue_pos_.load(std::memory_order_relaxed);
            }
        }
    }

    /// Bulk push (items are copied). Stops at the first item that does not
    /// fit and returns the number of items actually pushed.
    size_t push(const T* items, size_t count) {
        size_t pushed = 0;
        for (size_t i = 0; i < count; ++i) {
            if (!push(items[i])) break;
            ++pushed;
        }
        return pushed;
    }

    // -------------------------------------------------------------------------
    // Consumer API — must be called from a single thread only
    // -------------------------------------------------------------------------

    /// Pop one item into `out`. Returns false if no published item is ready.
    bool pop(T& out) {
        // Only the consumer writes dequeue_pos_, so a relaxed load suffices here.
        const size_t pos = dequeue_pos_.load(std::memory_order_relaxed);
        Slot& slot = slots_[pos & kMask];
        const size_t seq = slot.sequence.load(std::memory_order_acquire);

        // The producer publishes by setting sequence = enqueue_pos + 1 = pos + 1.
        if (seq != pos + 1) {
            return false;  // nothing ready yet
        }

        out = std::move(slot.data);
        // Release the slot back to producers: next cycle starts at pos + Capacity.
        slot.sequence.store(pos + Capacity, std::memory_order_release);
        dequeue_pos_.store(pos + 1, std::memory_order_release);
        return true;
    }

    /// Bulk pop up to `max_count` items. Returns number popped.
    size_t pop(T* out, size_t max_count) {
        size_t popped = 0;
        while (popped < max_count && pop(out[popped])) {
            ++popped;
        }
        return popped;
    }

    // -------------------------------------------------------------------------
    // Queries (approximate)
    // -------------------------------------------------------------------------

    /// Approximate number of claimed-but-unconsumed slots, in [0, Capacity].
    [[nodiscard]] size_t size() const {
        // Read the dequeue position first: the enqueue position read
        // afterwards can only be equal or further ahead, so the subtraction
        // cannot underflow.
        const size_t dq = dequeue_pos_.load(std::memory_order_acquire);
        const size_t eq = enqueue_pos_.load(std::memory_order_acquire);
        const size_t pending = eq - dq;
        // dq may be stale by now; never report more than the buffer can hold.
        return pending < Capacity ? pending : Capacity;
    }

    /// True if the buffer appears empty (approximate).
    [[nodiscard]] bool empty() const { return size() == 0; }

    /// True if the buffer appears full (approximate).
    [[nodiscard]] bool full() const { return size() >= Capacity; }

    /// The compile-time capacity of this ring buffer.
    static constexpr size_t capacity() { return Capacity; }
};
374
375// ============================================================================
376// StreamRecord — unit of data flowing through the sink
377// ============================================================================
378
// Fields of one record flowing through the sink.
// Timestamp of the record (nanoseconds per the field name; the epoch is not
// visible here). Stored in the "timestamp_ns" Parquet column.
385 int64_t timestamp_ns = 0;
// Caller-defined record type. Written as an int32 column; a query type_id of
// 0 means "accept all types", so 0 cannot be filtered for on its own.
386 uint32_t type_id = 0;
// Raw payload bytes. Base64-encoded when written to Parquet and decoded
// again by HybridReader::read().
387 std::string payload;
388};
389
390// ============================================================================
391// StreamingSink
392// Background-thread Parquet compaction sink.
393// ============================================================================
394
406public:
407 // -------------------------------------------------------------------------
408 // Options (nested struct)
409 // -------------------------------------------------------------------------
410
// Configuration for StreamingSink::create().
412 struct Options {
// Directory that receives the Parquet files. Must be non-empty and must not
// contain a ".." component; create() creates it if missing.
413 std::string output_dir;
// Leading part of every output file name. create() rejects values containing
// '/', '\\', ".." or an embedded NUL.
414 std::string file_prefix = "stream";
// Soft cap on queued records; must be a power of two. Submissions beyond
// this many queued records are dropped.
415 size_t ring_buffer_capacity = 65536;
// Maximum records drained per compaction pass, also passed to the writer as
// its row group size.
416 size_t row_group_size = 10'000;
// Row count at which the current file is closed and a new one is started.
417 size_t max_file_rows = 1'000'000;
// Upper bound on how long the background thread waits for new records
// before checking the ring again.
418 std::chrono::milliseconds flush_interval{100};
// When true, create() starts the background thread before returning.
419 bool auto_start = true;
420 };
421
422 // -------------------------------------------------------------------------
423 // Factory — returns expected<StreamingSink> (movable via unique_ptr<Impl>)
424 // -------------------------------------------------------------------------
425
// Validate `opts`, create the output directory, and build a sink.
// Returns an Error when output_dir is empty or contains "..", when the
// directory cannot be created, when file_prefix contains a path separator,
// ".." or NUL, or when ring_buffer_capacity is not a power of two.
// When opts.auto_start is true the background thread is started before the
// sink is returned.
// NOTE(review): row_group_size and max_file_rows are not validated here;
// confirm that zero values are handled by the code that consumes them.
434 [[nodiscard]] static expected<StreamingSink> create(Options opts) {
435 if (opts.output_dir.empty())
436 return Error{ErrorCode::IO_ERROR, "StreamingSink: output_dir must not be empty"};
437
438 // Path traversal guard using std::filesystem::path iteration
439 // instead of manual string splitting for correctness across platforms.
440 {
441 std::filesystem::path p(opts.output_dir);
442 for (const auto& comp : p) {
443 if (comp == "..")
445 "StreamingSink: output_dir must not contain '..' path traversal"}; // CWE-22: Improper Limitation of a Pathname to a Restricted Directory
446 }
447 }
448
// Non-throwing overload: failure is reported through `ec`.
449 std::error_code ec;
450 std::filesystem::create_directories(opts.output_dir, ec);
451 if (ec)
453 "StreamingSink: cannot create output_dir '" +
454 opts.output_dir + "': " + ec.message()};
455
456 // CWE-22: file_prefix is concatenated into output filenames —
457 // reject path separators and traversal to prevent directory escape.
458 {
459 const auto& pfx = opts.file_prefix;
460 if (pfx.find('/') != std::string::npos ||
461 pfx.find('\\') != std::string::npos ||
462 pfx.find("..") != std::string::npos ||
463 pfx.find('\0') != std::string::npos)
465 "StreamingSink: file_prefix contains path separator or traversal"};
466 }
467
// Power-of-two check: a power of two has exactly one bit set.
468 const size_t cap = opts.ring_buffer_capacity;
469 if (cap == 0 || (cap & (cap - 1)) != 0)
471 "StreamingSink: ring_buffer_capacity must be a power of 2"};
472
// Read auto_start back from the Impl because `opts` has been moved from.
473 auto impl = std::make_unique<Impl>(std::move(opts));
474 const bool auto_start = impl->opts_.auto_start;
475 StreamingSink sink(std::move(impl));
476 if (auto_start) sink.impl_->start();
477 return sink; // NRVO — std::move is redundant here (GCC -Wredundant-move)
478 }
479
480 // -------------------------------------------------------------------------
481 // Movable, non-copyable
482 // -------------------------------------------------------------------------
483
// Copying is disabled; the sink owns its state through a unique_ptr<Impl>.
486 StreamingSink(const StreamingSink&) = delete;
488
// NOTE(review): the declaration line for this body is not visible here;
// presumably this is the destructor. It stops the sink when an Impl is still
// owned and deliberately discards the result of stop().
490 if (impl_) (void)impl_->stop();
491 }
492
493 // -------------------------------------------------------------------------
494 // Submit (MPSC-safe via mutex in Impl)
495 // -------------------------------------------------------------------------
496
// Queue one record for writing. Forwards to Impl::submit(), which returns an
// Error when the ring buffer is full and the record was dropped.
501 [[nodiscard]] expected<void> submit(StreamRecord rec) {
502 return impl_->submit(std::move(rec));
503 }
504
511 [[nodiscard]] expected<void> submit(int64_t timestamp_ns, uint32_t type_id,
512 const uint8_t* data, size_t size) {
513 StreamRecord rec;
514 rec.timestamp_ns = timestamp_ns;
515 rec.type_id = type_id;
516 rec.payload.assign(reinterpret_cast<const char*>(data), size);
517 return impl_->submit(std::move(rec));
518 }
519
525 [[nodiscard]] expected<void> submit(int64_t timestamp_ns, uint32_t type_id,
526 std::string_view sv) {
527 StreamRecord rec;
528 rec.timestamp_ns = timestamp_ns;
529 rec.type_id = type_id;
530 rec.payload.assign(sv.data(), sv.size());
531 return impl_->submit(std::move(rec));
532 }
533
534 // -------------------------------------------------------------------------
535 // Lifecycle
536 // -------------------------------------------------------------------------
537
// Start the background compaction thread; a no-op if it is already running.
539 void start() { impl_->start(); }
// Request shutdown and wait for the background thread to exit.
543 [[nodiscard]] expected<void> stop() { return impl_->stop(); }
// Request shutdown without waiting; the thread is joined by a later stop().
545 void stop_nowait() { impl_->stop_nowait(); }
546
547 // -------------------------------------------------------------------------
548 // Force flush (blocking)
549 // -------------------------------------------------------------------------
550
// Synchronously write every queued record and close the current file so it
// has a complete footer. Returns the first write or close error encountered.
555 [[nodiscard]] expected<void> flush() { return impl_->flush(); }
556
557 // -------------------------------------------------------------------------
558 // Stats
559 // -------------------------------------------------------------------------
560
// All counters are read with relaxed ordering and are therefore approximate
// while the sink is active.
// Records accepted into the ring buffer.
562 [[nodiscard]] uint64_t records_submitted() const { return impl_->records_submitted_.load(std::memory_order_relaxed); }
// Records written to Parquet row groups so far.
564 [[nodiscard]] int64_t records_written() const { return impl_->records_written_.load(std::memory_order_relaxed); }
// Records rejected because the ring buffer was full.
566 [[nodiscard]] uint64_t records_dropped() const { return impl_->records_dropped_.load(std::memory_order_relaxed); }
// Number of files in the registered output list.
568 [[nodiscard]] int64_t files_written() const { return impl_->files_written_.load(std::memory_order_relaxed); }
// On-disk size of the file most recently written to. This is overwritten,
// not accumulated, so it is not a total across files.
570 [[nodiscard]] int64_t bytes_written() const { return impl_->bytes_written_.load(std::memory_order_relaxed); }
571
572 // -------------------------------------------------------------------------
573 // Output file listing
574 // -------------------------------------------------------------------------
575
577 [[nodiscard]] std::vector<std::string> output_files() const {
578 std::lock_guard<std::mutex> lk(impl_->files_mutex_);
579 return impl_->output_files_;
580 }
581
582private:
583 // =========================================================================
584 // Impl — holds all non-movable state + implementation logic
585 // =========================================================================
586
587 struct Impl {
// Compile-time ring size; Options::ring_buffer_capacity is applied as a soft
// cap in ring_push().
588 static constexpr size_t kRingCap = 65536; // power of 2
590
591 // ----- Construction -----
// Takes ownership of the options and heap-allocates the ring buffer.
// The worker thread is not started here; see start().
592 explicit Impl(Options opts)
593 : opts_(std::move(opts))
594 , ring_(std::make_unique<RingBuffer>())
595 {}
596
597 // ----- Ring helpers -----
598
599 // Soft-cap: honour opts_.ring_buffer_capacity even though the physical
600 // ring (kRingCap) may be larger. Records beyond the soft cap are dropped.
601 bool ring_push(StreamRecord rec) {
602 if (ring_->size() >= opts_.ring_buffer_capacity) return false;
603 return ring_->push(std::move(rec));
604 }
605
606 // Must be called with consumer_mutex_ held.
607 void drain_ring_locked(std::vector<StreamRecord>& batch, size_t max_count) {
608 StreamRecord r;
609 while (batch.size() < max_count && ring_->pop(r))
610 batch.push_back(std::move(r));
611 }
612
613 // ----- Submit -----
614 [[nodiscard]] expected<void> submit(StreamRecord rec) {
615 std::lock_guard<std::mutex> lk(submit_mutex_);
616 if (!ring_push(std::move(rec))) {
617 records_dropped_.fetch_add(1, std::memory_order_relaxed);
618 return Error{ErrorCode::IO_ERROR, "StreamingSink: ring buffer full, record dropped"};
619 }
620 records_submitted_.fetch_add(1, std::memory_order_relaxed);
621 cv_.notify_one();
622 return {};
623 }
624
625 // ----- Lifecycle -----
// Launch the background compaction thread.
// The compare-exchange on running_ makes a second call a no-op while the
// sink is running. running_ is cleared only by stop(), so after
// stop_nowait() a stop() call is required before start() has any effect.
626 void start() {
627 bool exp = false;
628 if (!running_.compare_exchange_strong(exp, true, std::memory_order_acq_rel))
629 return;
// Clear any earlier stop request before the new thread can observe it.
630 stop_requested_.store(false, std::memory_order_release);
631 worker_ = std::thread(&Impl::compaction_loop, this);
632 }
633
// Request shutdown, join the worker, and close any writer still open.
// Returns immediately when the sink is not running.
// NOTE(review): the result of close() is discarded, so this always returns
// success; flush() reports the same kind of failure to its caller.
634 [[nodiscard]] expected<void> stop() {
635 if (!running_.load(std::memory_order_acquire)) return {};
636 stop_requested_.store(true, std::memory_order_release);
637 cv_.notify_all();
638 if (worker_.joinable()) worker_.join();
639 running_.store(false, std::memory_order_release);
640 // Close any writer not already finalized in the compaction loop
641 // (can happen if stop_nowait() was used or thread exited early).
642 if (current_writer_) {
643 (void)current_writer_->close();
// Register only files that actually received rows.
644 if (current_file_rows_ > 0) register_output_file(current_file_path_);
645 current_writer_.reset();
646 current_file_rows_ = 0;
647 }
648 return {};
649 }
650
// Signal the worker to finish without joining it. running_ stays set, so a
// later stop() still joins the thread.
651 void stop_nowait() {
652 stop_requested_.store(true, std::memory_order_release);
653 cv_.notify_all();
654 }
655
656 // ----- Flush -----
657 // Drain the ring and write all pending records, then close the current
658 // Parquet file so it has a valid footer and is immediately readable.
659 // Acquires consumer_mutex_ to serialise against the compaction thread.
// Synchronous flush executed on the caller's thread.
// Returns the first error from write_batch() or from closing the writer.
// On a write_batch() error the function returns before closing the writer.
660 [[nodiscard]] expected<void> flush() {
661 std::lock_guard<std::mutex> consumer_lk(consumer_mutex_);
662 std::vector<StreamRecord> batch;
// No count limit: take everything currently queued.
663 drain_ring_locked(batch, (std::numeric_limits<size_t>::max)());
664 if (!batch.empty()) {
665 auto r = write_batch(batch);
666 if (!r) return r;
667 }
668 // Close the current writer (if open) so the file has a valid footer.
669 if (current_writer_) {
670 auto r = current_writer_->close();
// The file is registered and the writer state reset even when close()
// failed; the error is reported afterwards.
671 if (current_file_rows_ > 0) register_output_file(current_file_path_);
672 current_writer_.reset();
673 current_file_rows_ = 0;
674 if (!r) return r.error();
675 }
676 return expected<void>{};
677 }
678
679 // ----- Background thread -----
// Body of the background thread: repeatedly waits for records (or a stop
// request), drains up to row_group_size records, and writes them. On a stop
// request it drains everything left, closes the open writer, and exits.
// NOTE(review): write_batch() and close() errors are discarded here, so
// failures on this thread are not reported to any caller.
// NOTE(review): with row_group_size == 0 the regular drain takes nothing;
// records would then only be written by flush() or at shutdown — confirm
// that zero is rejected elsewhere.
680 void compaction_loop() {
681 std::vector<StreamRecord> batch;
682 batch.reserve(opts_.row_group_size);
683
684 while (true) {
685 {
// Wake on a stop request, on queued records, or after flush_interval.
686 std::unique_lock<std::mutex> lk(cv_mutex_);
687 cv_.wait_for(lk, opts_.flush_interval, [this] {
688 return stop_requested_.load(std::memory_order_acquire) ||
689 !ring_->empty();
690 });
691 }
692
693 // Acquire consumer_mutex_ to serialise with flush() calls from
694 // outside the loop (SPSC ring has only one consumer at a time).
695 {
696 std::lock_guard<std::mutex> consumer_lk(consumer_mutex_);
697 batch.clear();
698 drain_ring_locked(batch, opts_.row_group_size);
699 if (!batch.empty()) (void)write_batch(batch);
700 }
701
702 if (stop_requested_.load(std::memory_order_acquire)) {
703 std::lock_guard<std::mutex> consumer_lk(consumer_mutex_);
704 batch.clear();
// Final drain without a count limit so nothing queued is left behind.
705 drain_ring_locked(batch, (std::numeric_limits<size_t>::max)());
706 if (!batch.empty()) (void)write_batch(batch);
707 // Close any open writer so the Parquet footer is written.
708 if (current_writer_) {
709 (void)current_writer_->close();
710 if (current_file_rows_ > 0) register_output_file(current_file_path_);
711 current_writer_.reset();
712 current_file_rows_ = 0;
713 }
714 break;
715 }
716 }
717 }
718
719 // ----- Parquet batch write -----
720 [[nodiscard]] expected<void> write_batch(std::vector<StreamRecord>& batch) {
721 if (batch.empty()) return expected<void>{};
722
723 Schema schema = Schema::builder("stream_data")
724 .column<int64_t>("timestamp_ns", LogicalType::TIMESTAMP_NS)
725 .column<int32_t>("type_id")
726 .column<std::string>("payload", LogicalType::STRING)
727 .build();
728
729 size_t written = 0;
730 while (written < batch.size()) {
731 const size_t remaining_in_file =
732 opts_.max_file_rows > current_file_rows_
733 ? opts_.max_file_rows - current_file_rows_ : 0;
734
735 if (remaining_in_file == 0) {
736 if (current_writer_) {
737 auto r = current_writer_->close();
738 current_writer_.reset();
739 if (!r) return r.error();
740 }
741 current_file_rows_ = 0;
742 }
743
744 if (!current_writer_) {
745 std::string path = next_output_path();
746 WriterOptions wo;
747 wo.row_group_size = static_cast<int64_t>(opts_.row_group_size);
748 auto wr = ParquetWriter::open(path, schema, wo);
749 if (!wr) return wr.error();
750 current_writer_ = std::make_unique<ParquetWriter>(std::move(*wr));
751 current_file_path_ = path;
752 }
753
754 const size_t chunk_size =
755 (std::min)(batch.size() - written,
756 opts_.max_file_rows - current_file_rows_);
757
758 std::vector<int64_t> ts_col;
759 std::vector<int32_t> type_col;
760 std::vector<std::string> payload_col;
761 ts_col.reserve(chunk_size);
762 type_col.reserve(chunk_size);
763 payload_col.reserve(chunk_size);
764
765 for (size_t i = written; i < written + chunk_size; ++i) {
766 const auto& rec = batch[i];
767 ts_col.push_back(rec.timestamp_ns);
768 type_col.push_back(static_cast<int32_t>(rec.type_id));
769 payload_col.push_back(
770 detail::base64_encode(
771 reinterpret_cast<const uint8_t*>(rec.payload.data()),
772 rec.payload.size()));
773 }
774
775 { auto r = current_writer_->write_column<int64_t>(0, ts_col.data(), ts_col.size()); if (!r) return r; }
776 { auto r = current_writer_->write_column<int32_t>(1, type_col.data(), type_col.size()); if (!r) return r; }
777 { auto r = current_writer_->write_column(2, payload_col.data(), payload_col.size()); if (!r) return r; }
778 { auto r = current_writer_->flush_row_group(); if (!r) return r; }
779
780 current_file_rows_ += chunk_size;
781 written += chunk_size;
782
783 {
784 std::error_code ec;
785 auto fsz = std::filesystem::file_size(current_file_path_, ec);
786 if (!ec) bytes_written_.store(static_cast<int64_t>(fsz), std::memory_order_relaxed);
787 }
788 records_written_.fetch_add(static_cast<int64_t>(chunk_size), std::memory_order_relaxed);
789
790 if (current_file_rows_ >= opts_.max_file_rows) {
791 auto r = current_writer_->close();
792 register_output_file(current_file_path_);
793 current_writer_.reset();
794 current_file_rows_ = 0;
795 if (!r) return r.error();
796 }
797 }
798
799 // Note: partial (not-yet-full) files are NOT registered here.
800 // They are registered only when closed by flush() or stop(), so
801 // that HybridReader never sees a file without a valid Parquet footer.
802
803 return expected<void>{};
804 }
805
806 // ----- File naming -----
807 std::string next_output_path() {
808 using namespace std::chrono;
809 const int64_t ts_ns = duration_cast<nanoseconds>(
810 system_clock::now().time_since_epoch()).count();
811 char buf[64];
812 std::snprintf(buf, sizeof(buf), "%08zu_%lld",
813 file_index_++, static_cast<long long>(ts_ns));
814 return opts_.output_dir + "/" + opts_.file_prefix + "_" + buf + ".parquet";
815 }
816
817 void register_output_file(const std::string& path) {
818 std::lock_guard<std::mutex> lk(files_mutex_);
819 for (const auto& p : output_files_) { if (p == path) return; }
820 output_files_.push_back(path);
821 files_written_.store(static_cast<int64_t>(output_files_.size()),
822 std::memory_order_relaxed);
823 }
824
825 // ----- Members -----
// Configuration captured at construction.
826 Options opts_;
// Heap-allocated queue between submit() and the consumer side.
827 std::unique_ptr<RingBuffer> ring_;
828 mutable std::mutex submit_mutex_; // serialises multi-producer submits
829 std::mutex consumer_mutex_; // single-consumer serialiser (flush vs loop)
// Background compaction thread and its lifecycle flags.
830 std::thread worker_;
831 std::atomic<bool> running_{false};
832 std::atomic<bool> stop_requested_{false};
// cv_mutex_ guards only the wait in compaction_loop(); submit() notifies
// without holding it.
833 std::mutex cv_mutex_;
834 std::condition_variable cv_;
// Statistics counters exposed through the StreamingSink getters.
835 std::atomic<uint64_t> records_submitted_{0};
836 std::atomic<int64_t> records_written_{0};
837 std::atomic<uint64_t> records_dropped_{0};
838 std::atomic<int64_t> files_written_{0};
839 std::atomic<int64_t> bytes_written_{0};
// State of the file currently being written; touched by write_batch(),
// flush(), stop() and compaction_loop().
840 std::unique_ptr<ParquetWriter> current_writer_;
841 std::string current_file_path_;
842 size_t current_file_rows_{0};
// Monotonic counter embedded in output file names.
843 size_t file_index_{0};
// files_mutex_ guards output_files_.
844 mutable std::mutex files_mutex_;
845 std::vector<std::string> output_files_;
846 };
847
// Owning pointer to the implementation state.
848 std::unique_ptr<Impl> impl_;
849
// Private: sinks are built through create(), which validates the options
// before constructing the Impl.
850 explicit StreamingSink(std::unique_ptr<Impl> impl) : impl_(std::move(impl)) {}
851};
852
853// ============================================================================
854// HybridReaderOptions — options for HybridReader::create()
855// (defined outside HybridReader to avoid Apple Clang default-member-init bug)
856// ============================================================================
857
// Paths of the Parquet files to read, in the order they will be scanned.
862 std::vector<std::string> parquet_files;
// Inclusive lower bound on timestamp_ns applied by read_all().
863 int64_t start_timestamp = 0;
// Inclusive upper bound on timestamp_ns applied by read_all().
864 int64_t end_timestamp = (std::numeric_limits<int64_t>::max)();
// Only records with this type_id are returned; 0 accepts every type.
865 uint32_t type_id_filter = 0;
866};
867
868// ============================================================================
869// HybridQueryOptions
870// (defined outside HybridReader to avoid Apple Clang default-member-init bug)
871// ============================================================================
872
// Inclusive lower bound on timestamp_ns.
875 int64_t start_ns = 0;
// Inclusive upper bound on timestamp_ns.
876 int64_t end_ns = (std::numeric_limits<int64_t>::max)();
// Only records with this type_id are returned; 0 accepts every type.
877 uint32_t type_id = 0;
// Maximum number of records returned by one read() call.
878 size_t max_rows = (std::numeric_limits<size_t>::max)();
879};
880
881// ============================================================================
882// HybridReader
883// Query records across historical Parquet files + live ring buffer snapshot.
884// ============================================================================
885
895public:
900
901 // -------------------------------------------------------------------------
902 // Factory — create from Options struct (preferred API)
903 // -------------------------------------------------------------------------
904
909 [[nodiscard]] static expected<HybridReader> create(Options opts) {
910 HybridReader reader(std::move(opts.parquet_files));
911 reader.filter_start_ = opts.start_timestamp;
912 reader.filter_end_ = opts.end_timestamp;
913 reader.filter_type_ = opts.type_id_filter;
914 return reader;
915 }
916
917 // -------------------------------------------------------------------------
918 // Constructors
919 // -------------------------------------------------------------------------
920
// Build a reader over the files the sink has registered so far.
// The file list is copied once, at construction; files the sink writes
// afterwards are not seen by this reader.
923 explicit HybridReader(const StreamingSink& sink)
924 : parquet_files_(sink.output_files())
925 {
// No snapshot of the sink's ring buffer is taken: the ring is private to
// the sink, so records that have not been written to a file are not visible
// to this reader and the live count is reported as 0.
//
// To include everything submitted so far:
//   1. sink.flush()
//   2. HybridReader(sink)
939 live_count_ = 0;
940 }
941
// Build a reader over an explicit list of Parquet file paths. No sink is
// involved, so the live count is 0.
943 explicit HybridReader(std::vector<std::string> parquet_files)
944 : parquet_files_(std::move(parquet_files))
945 , live_count_(0)
946 {}
947
948 // -------------------------------------------------------------------------
949 // Read
950 // -------------------------------------------------------------------------
951
953 [[nodiscard]] expected<std::vector<StreamRecord>> read(QueryOptions opts = {}) const {
954 std::vector<StreamRecord> result;
955
956 // Column indices for stream_data schema: 0=timestamp_ns, 1=type_id, 2=payload.
957 static constexpr size_t kColTs = 0;
958 static constexpr size_t kColTypeId = 1;
959 static constexpr size_t kColPayload = 2;
960
961 for (const auto& file_path : parquet_files_) {
962 if (result.size() >= opts.max_rows) break;
963
964 // Skip files that don't exist yet (may still be open for writing).
965 std::error_code ec;
966 if (!std::filesystem::exists(file_path, ec)) continue;
967
968 auto reader_result = ParquetReader::open(file_path);
969 if (!reader_result) {
970 // Skip unreadable / partially-written files.
971 continue;
972 }
973
974 auto& reader = *reader_result;
975
976 const size_t num_rg = static_cast<size_t>(reader.num_row_groups());
977
978 for (size_t rg = 0; rg < num_rg; ++rg) {
979 if (result.size() >= opts.max_rows) break;
980
981 // Read typed columns from this row group.
982 auto ts_result = reader.read_column<int64_t>(rg, kColTs);
983 if (!ts_result) continue;
984
985 auto type_result = reader.read_column<int32_t>(rg, kColTypeId);
986 if (!type_result) continue;
987
988 auto payload_result = reader.read_column<std::string>(rg, kColPayload);
989 if (!payload_result) continue;
990
991 const auto& ts_col = *ts_result;
992 const auto& type_col = *type_result;
993 const auto& payload_col = *payload_result;
994
995 const size_t nrows = (std::min)({ts_col.size(),
996 type_col.size(),
997 payload_col.size()});
998
999 for (size_t i = 0; i < nrows; ++i) {
1000 if (result.size() >= opts.max_rows) break;
1001
1002 const int64_t ts = ts_col[i];
1003 const uint32_t tid = static_cast<uint32_t>(type_col[i]);
1004
1005 // Apply time-range filter.
1006 if (ts < opts.start_ns || ts > opts.end_ns) continue;
1007
1008 // Apply type_id filter (0 = accept all types).
1009 if (opts.type_id != 0 && tid != opts.type_id) continue;
1010
1011 StreamRecord rec;
1012 rec.timestamp_ns = ts;
1013 rec.type_id = tid;
1014 auto decoded = detail::base64_decode(payload_col[i]);
1015 rec.payload.assign(decoded.begin(), decoded.end());
1016
1017 result.push_back(std::move(rec));
1018 }
1019 }
1020 }
1021
1022 return result;
1023 }
1024
1025 // -------------------------------------------------------------------------
1026 // Convenience read that uses filter fields set by create()
1027 // -------------------------------------------------------------------------
1028
// Body of read_all(): run read() with the filters stored by create().
// max_rows keeps its QueryOptions default.
1031 QueryOptions qopts;
1032 qopts.start_ns = filter_start_;
1033 qopts.end_ns = filter_end_;
1034 qopts.type_id = filter_type_;
1035 return read(qopts);
1036 }
1037
1038 // -------------------------------------------------------------------------
1039 // Queries
1040 // -------------------------------------------------------------------------
1041
// Number of Parquet file paths held by this reader. Files that are missing
// or unreadable at read() time are still counted.
1043 [[nodiscard]] size_t num_files() const { return parquet_files_.size(); }
1044
// Number of live (not yet written) records visible to this reader. Both
// constructors set it to 0.
1046 [[nodiscard]] size_t num_live() const { return live_count_; }
1047
1048private:
// Files scanned by read(), in order.
1049 std::vector<std::string> parquet_files_;
// Count of live records; set to 0 by both constructors.
1050 size_t live_count_ = 0;
1051
1052 // Filter fields populated by create(Options) — default values = no filtering.
// Used only by read_all(); read(QueryOptions) takes its filters from its argument.
1053 int64_t filter_start_ = 0;
1054 int64_t filter_end_ = (std::numeric_limits<int64_t>::max)();
1055 uint32_t filter_type_ = 0;
1056};
1057
1058} // namespace signet::forge
Reads StreamRecords across historical Parquet files and (optionally) a live StreamingSink ring buffer...
expected< std::vector< StreamRecord > > read(QueryOptions opts={}) const
Read all matching records from historical Parquet files.
HybridReader(const StreamingSink &sink)
Construct from a live StreamingSink: captures the sink's list of already-written Parquet files. Records still queued in the ring buffer are not included; call sink.flush() first to make them readable.
static expected< HybridReader > create(Options opts)
Create a HybridReader with pre-configured filters.
size_t num_live() const
Number of live records visible in ring snapshot (0 unless flush() was called first).
expected< std::vector< StreamRecord > > read_all() const
Read all records applying the filters set at construction (via Options).
size_t num_files() const
Number of historical Parquet files available.
HybridReader(std::vector< std::string > parquet_files)
Construct from an explicit list of Parquet file paths (no live ring).
Multiple-producer single-consumer (MPSC) bounded ring buffer.
bool empty() const
True if the buffer appears empty (approximate).
MpscRingBuffer(const MpscRingBuffer &)=delete
size_t pop(T *out, size_t max_count)
Bulk pop up to max_count items. Returns number popped.
bool full() const
True if the buffer appears full (approximate).
size_t push(const T *items, size_t count)
Bulk push. Returns the number of items actually pushed.
static constexpr size_t capacity()
The compile-time capacity of this ring buffer.
size_t size() const
Approximate number of items in the buffer (may transiently exceed Capacity during push).
MpscRingBuffer()
Construct the ring buffer, initializing all slot sequences.
bool push(T item)
Push one item. Returns false immediately if the buffer is full.
bool pop(T &out)
Pop one item into out. Returns false if the buffer is empty.
MpscRingBuffer & operator=(const MpscRingBuffer &)=delete
static expected< ParquetReader > open(const std::filesystem::path &path)
Open and parse a Parquet file, returning a ready-to-query reader.
Definition reader.hpp:189
static expected< ParquetWriter > open(const std::filesystem::path &path, const Schema &schema, const Options &options=Options{})
Open a new Parquet file for writing.
Definition writer.hpp:303
SchemaBuilder & column(std::string col_name, LogicalType logical_type=LogicalType::NONE)
Add a typed column, deducing PhysicalType from T.
Definition schema.hpp:107
static SchemaBuilder builder(std::string name)
Create a SchemaBuilder for fluent column construction.
Definition schema.hpp:228
Lock-free single-producer single-consumer (SPSC) bounded ring buffer.
SpscRingBuffer(const SpscRingBuffer &)=delete
bool full() const
True if the buffer appears full (approximate, no lock).
SpscRingBuffer & operator=(const SpscRingBuffer &)=delete
bool push(T item)
Push a single item. Returns false if the buffer is full.
size_t push(const T *items, size_t count)
Bulk push up to count items. Returns the number actually pushed.
size_t size() const
Approximate number of items currently in the buffer.
bool pop(T &out)
Pop a single item into out. Returns false if the buffer is empty.
size_t pop(T *out, size_t max_count)
Bulk pop up to max_count items into out. Returns number popped.
static constexpr size_t capacity()
The compile-time capacity of this ring buffer.
SpscRingBuffer()
Construct an empty ring buffer.
bool empty() const
True if the buffer appears empty (approximate, no lock).
Background-thread Parquet compaction sink fed by a lock-free ring buffer.
expected< void > submit(int64_t timestamp_ns, uint32_t type_id, const uint8_t *data, size_t size)
Submit a record from raw bytes.
std::vector< std::string > output_files() const
List of completed Parquet output file paths (thread-safe snapshot).
expected< void > flush()
Drain the ring buffer and write all pending records, then close the current Parquet file so it has a ...
uint64_t records_dropped() const
Total number of records dropped due to ring buffer overflow.
StreamingSink(const StreamingSink &)=delete
int64_t records_written() const
Total number of records written to Parquet files.
StreamingSink & operator=(const StreamingSink &)=delete
static expected< StreamingSink > create(Options opts)
Create a StreamingSink with the given options.
uint64_t records_submitted() const
Total number of records successfully submitted to the ring buffer.
expected< void > submit(int64_t timestamp_ns, uint32_t type_id, std::string_view sv)
Submit a record from a string_view payload.
expected< void > stop()
Stop the background thread, drain remaining records, and close open files.
expected< void > submit(StreamRecord rec)
Submit a fully-constructed StreamRecord to the ring buffer.
StreamingSink(StreamingSink &&)=default
void stop_nowait()
Signal the background thread to stop without waiting for it to finish.
StreamingSink & operator=(StreamingSink &&)=default
void start()
Start the background compaction thread (no-op if already running).
int64_t files_written() const
Number of completed Parquet output files.
int64_t bytes_written() const
Approximate total bytes written to the current output file.
A lightweight result type that holds either a success value of type T or an Error.
Definition error.hpp:143
@ TIMESTAMP_NS
Timestamp — INT64, nanoseconds since Unix epoch.
@ STRING
UTF-8 string (stored as BYTE_ARRAY).
@ IO_ERROR
A file-system or stream I/O operation failed (open, read, write, rename).
@ INTERNAL_ERROR
An unexpected internal error that does not fit any other category.
Schema definition types: Column<T>, SchemaBuilder, and Schema.
Lightweight error value carrying an ErrorCode and a human-readable message.
Definition error.hpp:99
Per-query filter options passed to HybridReader::read().
uint32_t type_id
Type ID filter (0 = all types)
int64_t end_ns
Maximum timestamp_ns (inclusive)
size_t max_rows
Maximum records to return.
int64_t start_ns
Minimum timestamp_ns (inclusive)
Options for constructing a HybridReader via HybridReader::create().
uint32_t type_id_filter
Type ID filter (0 = accept all types)
std::vector< std::string > parquet_files
Parquet files to query.
int64_t end_timestamp
Maximum timestamp_ns (inclusive)
int64_t start_timestamp
Minimum timestamp_ns (inclusive)
A single record flowing through the StreamingSink pipeline.
int64_t timestamp_ns
Wall-clock timestamp in nanoseconds since Unix epoch.
uint32_t type_id
User-defined record type tag (0 = untyped)
std::string payload
Serialized record bytes (UTF-8 safe or binary via base64)
Configuration options for StreamingSink::create().
std::string output_dir
Directory for output Parquet files (required)
bool auto_start
Start the background thread immediately on create()
size_t max_file_rows
Maximum rows per output Parquet file before rolling.
std::chrono::milliseconds flush_interval
Background thread wake-up interval.
size_t ring_buffer_capacity
Soft cap on ring buffer occupancy (must be power of 2)
std::string file_prefix
Filename prefix for output files.
size_t row_group_size
Records per Parquet row group.
Parquet format enumerations, type traits, and statistics structs.
Write-Ahead Log (WAL) with sub-millisecond append, CRC-32 integrity, crash recovery,...