23#include <condition_variable>
/// Standard base64 alphabet ('+' and '/' for values 62 and 63).
inline constexpr char kBase64Chars[] =
    "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

/// Encode a byte range as padded base64 text.
///
/// Each group of 3 input bytes becomes 4 output characters. A trailing group
/// of 1 or 2 bytes is completed with '=' padding.
///
/// @param data Pointer to the bytes to encode (may be null when len == 0).
/// @param len  Number of bytes to encode.
/// @return The encoded text; its length is always a multiple of 4.
inline std::string base64_encode(const uint8_t* data, size_t len) {
  std::string out;
  out.reserve(((len + 2) / 3) * 4);

  size_t pos = 0;
  while (pos < len) {
    const size_t avail = len - pos;  // >= 1 inside the loop

    // Pack up to three bytes big-endian into the low 24 bits; missing bytes stay 0.
    uint32_t group = static_cast<uint32_t>(data[pos]) << 16;
    if (avail > 1) group |= static_cast<uint32_t>(data[pos + 1]) << 8;
    if (avail > 2) group |= static_cast<uint32_t>(data[pos + 2]);

    out += kBase64Chars[(group >> 18) & 0x3Fu];
    out += kBase64Chars[(group >> 12) & 0x3Fu];
    out += (avail > 1) ? kBase64Chars[(group >> 6) & 0x3Fu] : '=';
    out += (avail > 2) ? kBase64Chars[group & 0x3Fu] : '=';

    pos += 3;
  }
  return out;
}
/// Map one base64 character to its 6-bit value.
///
/// @param c Input character.
/// @return 0-63 for alphabet characters, 64 for the padding character '=',
///         and 255 for any character outside the alphabet.
inline uint8_t base64_val(char c) {
  if (c >= 'A' && c <= 'Z') return static_cast<uint8_t>(c - 'A');
  if (c >= 'a' && c <= 'z') return static_cast<uint8_t>(c - 'a' + 26);
  if (c >= '0' && c <= '9') return static_cast<uint8_t>(c - '0' + 52);
  if (c == '+') return 62u;
  if (c == '/') return 63u;
  if (c == '=') return 64u;
  return 255u;
}

/// Decode padded base64 text.
///
/// @param encoded Base64 text. Its length must be a multiple of 4.
/// @return The decoded bytes, or an empty vector if the input is malformed.
///         Malformed means: the length is not a multiple of 4, a character is
///         outside the alphabet, or '=' appears anywhere other than as the
///         last one or two characters of the whole input.
inline std::vector<uint8_t> base64_decode(std::string_view encoded) {
  constexpr uint8_t kInvalid = 255u;
  constexpr uint8_t kPad = 64u;

  const size_t n = encoded.size();
  if (n % 4 != 0) return {};

  std::vector<uint8_t> result;
  result.reserve((n / 4) * 3);

  for (size_t i = 0; i < n; i += 4) {
    const uint8_t v0 = base64_val(encoded[i]);
    const uint8_t v1 = base64_val(encoded[i + 1]);
    const uint8_t v2 = base64_val(encoded[i + 2]);
    const uint8_t v3 = base64_val(encoded[i + 3]);

    if (v0 == kInvalid || v1 == kInvalid || v2 == kInvalid || v3 == kInvalid) {
      return {};
    }

    // Padding is only legal as "xx==" or "xxx=" in the final quartet.
    // Anywhere else, the pad value (64) would leak into the decoded bits
    // and silently yield garbage bytes.
    const bool is_last_quartet = (i + 4 == n);
    if (v0 == kPad || v1 == kPad) return {};
    if (v2 == kPad && v3 != kPad) return {};
    if (v3 == kPad && !is_last_quartet) return {};

    // Reassemble the 24-bit group, treating padding positions as zero bits.
    const uint32_t triple =
        (static_cast<uint32_t>(v0) << 18) |
        (static_cast<uint32_t>(v1) << 12) |
        (static_cast<uint32_t>(v2 == kPad ? 0u : v2) << 6) |
        static_cast<uint32_t>(v3 == kPad ? 0u : v3);

    result.push_back(static_cast<uint8_t>((triple >> 16) & 0xFFu));
    if (v2 != kPad) result.push_back(static_cast<uint8_t>((triple >> 8) & 0xFFu));
    if (v3 != kPad) result.push_back(static_cast<uint8_t>(triple & 0xFFu));
  }
  return result;
}
/// Lock-free single-producer single-consumer (SPSC) bounded ring buffer.
///
/// One slot is always kept empty to tell "full" apart from "empty", so at most
/// Capacity - 1 items are stored at once. push() is meant for exactly one
/// producer thread and pop() for exactly one consumer thread.
template <typename T, size_t Capacity>
class SpscRingBuffer {
  static_assert((Capacity & (Capacity - 1)) == 0,
                "SpscRingBuffer: Capacity must be a power of 2");
  // Capacity 0 and 1 pass the power-of-two test above. With one slot always
  // reserved, Capacity 1 would leave a buffer that is permanently full.
  static_assert(Capacity >= 2,
                "SpscRingBuffer: Capacity must be at least 2");

 public:
  /// Construct an empty ring buffer.
  SpscRingBuffer() = default;
  SpscRingBuffer(const SpscRingBuffer&) = delete;
  SpscRingBuffer& operator=(const SpscRingBuffer&) = delete;

  /// Push a single item (producer thread only).
  /// @return false if the buffer is full; the item is then discarded.
  bool push(T item) {
    const size_t head = head_.load(std::memory_order_relaxed);
    const size_t next = (head + 1) & kMask;
    // Acquire pairs with the consumer's release of tail_, so the slot has been vacated.
    if (next == tail_.load(std::memory_order_acquire)) {
      return false;
    }
    storage_[head] = std::move(item);
    // Release publishes the slot contents before the consumer observes the new head.
    head_.store(next, std::memory_order_release);
    return true;
  }

  /// Bulk push up to count items (producer thread only).
  /// @return The number of items actually pushed; stops at the first full condition.
  size_t push(const T* items, size_t count) {
    size_t pushed = 0;
    for (size_t i = 0; i < count; ++i) {
      if (!push(items[i])) break;
      ++pushed;
    }
    return pushed;
  }

  /// Pop a single item into out (consumer thread only).
  /// @return false if the buffer is empty; out is left untouched.
  bool pop(T& out) {
    const size_t tail = tail_.load(std::memory_order_relaxed);
    // Acquire pairs with the producer's release of head_, so the slot is fully written.
    if (tail == head_.load(std::memory_order_acquire)) {
      return false;
    }
    out = std::move(storage_[tail]);
    tail_.store((tail + 1) & kMask, std::memory_order_release);
    return true;
  }

  /// Bulk pop up to max_count items into out (consumer thread only).
  /// @return The number of items popped.
  size_t pop(T* out, size_t max_count) {
    size_t popped = 0;
    while (popped < max_count) {
      const size_t tail = tail_.load(std::memory_order_relaxed);
      if (tail == head_.load(std::memory_order_acquire)) break;
      out[popped] = std::move(storage_[tail]);
      tail_.store((tail + 1) & kMask, std::memory_order_release);
      ++popped;
    }
    return popped;
  }

  /// Approximate number of items currently in the buffer.
  /// Exact only when called from the producer or consumer while the other side is idle.
  [[nodiscard]] size_t size() const {
    const size_t head = head_.load(std::memory_order_acquire);
    const size_t tail = tail_.load(std::memory_order_acquire);
    return (head - tail) & kMask;
  }

  /// True if the buffer appears empty (approximate, no lock).
  [[nodiscard]] bool empty() const { return size() == 0; }

  /// True if the buffer appears full (approximate, no lock).
  [[nodiscard]] bool full() const { return size() == Capacity - 1; }

  /// The compile-time capacity of this ring buffer (usable slots: Capacity - 1).
  static constexpr size_t capacity() { return Capacity; }

 private:
  static constexpr size_t kMask = Capacity - 1;

  // Producer- and consumer-owned indices live on separate cache lines.
  alignas(64) std::atomic<size_t> head_{0};  // next slot the producer writes
  alignas(64) std::atomic<size_t> tail_{0};  // next slot the consumer reads

  T storage_[Capacity];
};
/// Multiple-producer single-consumer (MPSC) bounded ring buffer.
///
/// Each slot carries a sequence number, following Dmitry Vyukov's bounded-queue
/// scheme. A slot whose sequence equals the producer position is free. A slot
/// whose sequence equals position + 1 holds data ready for the consumer. All
/// Capacity slots are usable. Any number of threads may call push(); only one
/// thread at a time may call pop().
template <typename T, size_t Capacity>
class MpscRingBuffer {
  static_assert((Capacity & (Capacity - 1)) == 0,
                "MpscRingBuffer: Capacity must be a power of 2");
  // With Capacity 1, a filled slot's sequence (pos + 1) equals the next
  // producer position. A second push would then overwrite unconsumed data.
  static_assert(Capacity >= 2,
                "MpscRingBuffer: Capacity must be at least 2");

  struct Slot {
    std::atomic<size_t> sequence{0};
    T data{};
  };

  static constexpr size_t kMask = Capacity - 1;

  alignas(64) std::atomic<size_t> enqueue_pos_{0};
  // Written only by the consumer, but read by size()/empty()/full() from any
  // thread (e.g. producers). It must therefore be atomic to avoid a data race.
  alignas(64) std::atomic<size_t> dequeue_pos_{0};
  Slot slots_[Capacity];

 public:
  /// Construct the ring buffer, initializing all slot sequences.
  MpscRingBuffer() {
    for (size_t i = 0; i < Capacity; ++i) {
      slots_[i].sequence.store(i, std::memory_order_relaxed);
    }
  }

  MpscRingBuffer(const MpscRingBuffer&) = delete;
  MpscRingBuffer& operator=(const MpscRingBuffer&) = delete;

  /// Push one item (safe from any number of producer threads).
  /// @return false immediately if the buffer is full; the item is then discarded.
  bool push(T item) {
    size_t pos = enqueue_pos_.load(std::memory_order_relaxed);
    for (;;) {
      Slot& slot = slots_[pos & kMask];
      const size_t seq = slot.sequence.load(std::memory_order_acquire);
      const auto diff =
          static_cast<std::ptrdiff_t>(seq) - static_cast<std::ptrdiff_t>(pos);

      if (diff == 0) {
        // Slot is free for this position; try to claim it.
        if (enqueue_pos_.compare_exchange_weak(pos, pos + 1,
                                               std::memory_order_relaxed,
                                               std::memory_order_relaxed)) {
          slot.data = std::move(item);
          // Publish the data to the consumer.
          slot.sequence.store(pos + 1, std::memory_order_release);
          return true;
        }
        // On failure compare_exchange_weak reloaded `pos`; retry with it.
      } else if (diff < 0) {
        // The slot still holds an item from the previous lap: buffer is full.
        return false;
      } else {
        // Another producer claimed this position; catch up.
        pos = enqueue_pos_.load(std::memory_order_relaxed);
      }
    }
  }

  /// Bulk push.
  /// @return The number of items actually pushed; stops at the first full condition.
  size_t push(const T* items, size_t count) {
    size_t pushed = 0;
    for (size_t i = 0; i < count; ++i) {
      if (!push(items[i])) break;
      ++pushed;
    }
    return pushed;
  }

  /// Pop one item into out (single consumer only).
  /// @return false if the buffer is empty, or the next item is not yet published.
  bool pop(T& out) {
    const size_t pos = dequeue_pos_.load(std::memory_order_relaxed);
    Slot& slot = slots_[pos & kMask];
    const size_t seq = slot.sequence.load(std::memory_order_acquire);

    if (seq != pos + 1) {
      return false;
    }

    out = std::move(slot.data);
    // Hand the slot back to producers for the next lap.
    slot.sequence.store(pos + Capacity, std::memory_order_release);
    dequeue_pos_.store(pos + 1, std::memory_order_release);
    return true;
  }

  /// Bulk pop up to max_count items.
  /// @return The number of items popped.
  size_t pop(T* out, size_t max_count) {
    size_t popped = 0;
    while (popped < max_count && pop(out[popped])) {
      ++popped;
    }
    return popped;
  }

  /// Approximate number of items in the buffer.
  /// May transiently exceed Capacity during push.
  [[nodiscard]] size_t size() const {
    // Load the consumer index first. dequeue_pos_ is only advanced past
    // positions whose enqueue has already happened, so a later read of
    // enqueue_pos_ is never smaller. The subtraction therefore cannot wrap.
    const size_t dq = dequeue_pos_.load(std::memory_order_acquire);
    const size_t eq = enqueue_pos_.load(std::memory_order_acquire);
    return eq - dq;
  }

  /// True if the buffer appears empty (approximate).
  [[nodiscard]] bool empty() const { return size() == 0; }

  /// True if the buffer appears full (approximate).
  [[nodiscard]] bool full() const { return size() >= Capacity; }

  /// The compile-time capacity of this ring buffer.
  static constexpr size_t capacity() { return Capacity; }
};
442 for (
const auto& comp : p) {
445 "StreamingSink: output_dir must not contain '..' path traversal"};
450 std::filesystem::create_directories(opts.
output_dir, ec);
453 "StreamingSink: cannot create output_dir '" +
460 if (pfx.find(
'/') != std::string::npos ||
461 pfx.find(
'\\') != std::string::npos ||
462 pfx.find(
"..") != std::string::npos ||
463 pfx.find(
'\0') != std::string::npos)
465 "StreamingSink: file_prefix contains path separator or traversal"};
469 if (cap == 0 || (cap & (cap - 1)) != 0)
471 "StreamingSink: ring_buffer_capacity must be a power of 2"};
473 auto impl = std::make_unique<Impl>(std::move(opts));
474 const bool auto_start = impl->opts_.auto_start;
476 if (auto_start) sink.impl_->start();
490 if (impl_) (void)impl_->stop();
502 return impl_->submit(std::move(rec));
512 const uint8_t* data,
size_t size) {
516 rec.
payload.assign(
reinterpret_cast<const char*
>(data), size);
517 return impl_->submit(std::move(rec));
526 std::string_view sv) {
530 rec.
payload.assign(sv.data(), sv.size());
531 return impl_->submit(std::move(rec));
562 [[nodiscard]] uint64_t
records_submitted()
const {
return impl_->records_submitted_.load(std::memory_order_relaxed); }
564 [[nodiscard]] int64_t
records_written()
const {
return impl_->records_written_.load(std::memory_order_relaxed); }
566 [[nodiscard]] uint64_t
records_dropped()
const {
return impl_->records_dropped_.load(std::memory_order_relaxed); }
568 [[nodiscard]] int64_t
files_written()
const {
return impl_->files_written_.load(std::memory_order_relaxed); }
570 [[nodiscard]] int64_t
bytes_written()
const {
return impl_->bytes_written_.load(std::memory_order_relaxed); }
578 std::lock_guard<std::mutex> lk(impl_->files_mutex_);
579 return impl_->output_files_;
588 static constexpr size_t kRingCap = 65536;
592 explicit Impl(Options opts)
593 : opts_(std::move(opts))
594 , ring_(std::make_unique<RingBuffer>())
601 bool ring_push(StreamRecord rec) {
602 if (ring_->size() >= opts_.ring_buffer_capacity)
return false;
603 return ring_->push(std::move(rec));
607 void drain_ring_locked(std::vector<StreamRecord>& batch,
size_t max_count) {
609 while (batch.size() < max_count && ring_->pop(r))
610 batch.push_back(std::move(r));
614 [[nodiscard]] expected<void> submit(StreamRecord rec) {
615 std::lock_guard<std::mutex> lk(submit_mutex_);
616 if (!ring_push(std::move(rec))) {
617 records_dropped_.fetch_add(1, std::memory_order_relaxed);
620 records_submitted_.fetch_add(1, std::memory_order_relaxed);
628 if (!running_.compare_exchange_strong(exp,
true, std::memory_order_acq_rel))
630 stop_requested_.store(
false, std::memory_order_release);
631 worker_ = std::thread(&Impl::compaction_loop,
this);
634 [[nodiscard]] expected<void> stop() {
635 if (!running_.load(std::memory_order_acquire))
return {};
636 stop_requested_.store(
true, std::memory_order_release);
638 if (worker_.joinable()) worker_.join();
639 running_.store(
false, std::memory_order_release);
642 if (current_writer_) {
643 (void)current_writer_->close();
644 if (current_file_rows_ > 0) register_output_file(current_file_path_);
645 current_writer_.reset();
646 current_file_rows_ = 0;
652 stop_requested_.store(
true, std::memory_order_release);
660 [[nodiscard]] expected<void> flush() {
661 std::lock_guard<std::mutex> consumer_lk(consumer_mutex_);
662 std::vector<StreamRecord> batch;
663 drain_ring_locked(batch, (std::numeric_limits<size_t>::max)());
664 if (!batch.empty()) {
665 auto r = write_batch(batch);
669 if (current_writer_) {
670 auto r = current_writer_->close();
671 if (current_file_rows_ > 0) register_output_file(current_file_path_);
672 current_writer_.reset();
673 current_file_rows_ = 0;
674 if (!r)
return r.error();
676 return expected<void>{};
680 void compaction_loop() {
681 std::vector<StreamRecord> batch;
682 batch.reserve(opts_.row_group_size);
686 std::unique_lock<std::mutex> lk(cv_mutex_);
687 cv_.wait_for(lk, opts_.flush_interval, [
this] {
688 return stop_requested_.load(std::memory_order_acquire) ||
696 std::lock_guard<std::mutex> consumer_lk(consumer_mutex_);
698 drain_ring_locked(batch, opts_.row_group_size);
699 if (!batch.empty()) (void)write_batch(batch);
702 if (stop_requested_.load(std::memory_order_acquire)) {
703 std::lock_guard<std::mutex> consumer_lk(consumer_mutex_);
705 drain_ring_locked(batch, (std::numeric_limits<size_t>::max)());
706 if (!batch.empty()) (void)write_batch(batch);
708 if (current_writer_) {
709 (void)current_writer_->close();
710 if (current_file_rows_ > 0) register_output_file(current_file_path_);
711 current_writer_.reset();
712 current_file_rows_ = 0;
720 [[nodiscard]] expected<void> write_batch(std::vector<StreamRecord>& batch) {
721 if (batch.empty())
return expected<void>{};
725 .column<int32_t>(
"type_id")
730 while (written < batch.size()) {
731 const size_t remaining_in_file =
732 opts_.max_file_rows > current_file_rows_
733 ? opts_.max_file_rows - current_file_rows_ : 0;
735 if (remaining_in_file == 0) {
736 if (current_writer_) {
737 auto r = current_writer_->close();
738 current_writer_.reset();
739 if (!r)
return r.error();
741 current_file_rows_ = 0;
744 if (!current_writer_) {
745 std::string path = next_output_path();
747 wo.row_group_size =
static_cast<int64_t
>(opts_.row_group_size);
749 if (!wr)
return wr.error();
750 current_writer_ = std::make_unique<ParquetWriter>(std::move(*wr));
751 current_file_path_ = path;
754 const size_t chunk_size =
755 (std::min)(batch.size() - written,
756 opts_.max_file_rows - current_file_rows_);
758 std::vector<int64_t> ts_col;
759 std::vector<int32_t> type_col;
760 std::vector<std::string> payload_col;
761 ts_col.reserve(chunk_size);
762 type_col.reserve(chunk_size);
763 payload_col.reserve(chunk_size);
765 for (
size_t i = written; i < written + chunk_size; ++i) {
766 const auto& rec = batch[i];
767 ts_col.push_back(rec.timestamp_ns);
768 type_col.push_back(
static_cast<int32_t
>(rec.type_id));
769 payload_col.push_back(
770 detail::base64_encode(
771 reinterpret_cast<const uint8_t*
>(rec.payload.data()),
772 rec.payload.size()));
775 {
auto r = current_writer_->write_column<int64_t>(0, ts_col.data(), ts_col.size());
if (!r)
return r; }
776 {
auto r = current_writer_->write_column<int32_t>(1, type_col.data(), type_col.size());
if (!r)
return r; }
777 {
auto r = current_writer_->write_column(2, payload_col.data(), payload_col.size());
if (!r)
return r; }
778 {
auto r = current_writer_->flush_row_group();
if (!r)
return r; }
780 current_file_rows_ += chunk_size;
781 written += chunk_size;
785 auto fsz = std::filesystem::file_size(current_file_path_, ec);
786 if (!ec) bytes_written_.store(
static_cast<int64_t
>(fsz), std::memory_order_relaxed);
788 records_written_.fetch_add(
static_cast<int64_t
>(chunk_size), std::memory_order_relaxed);
790 if (current_file_rows_ >= opts_.max_file_rows) {
791 auto r = current_writer_->close();
792 register_output_file(current_file_path_);
793 current_writer_.reset();
794 current_file_rows_ = 0;
795 if (!r)
return r.error();
803 return expected<void>{};
807 std::string next_output_path() {
808 using namespace std::chrono;
809 const int64_t ts_ns = duration_cast<nanoseconds>(
810 system_clock::now().time_since_epoch()).count();
812 std::snprintf(buf,
sizeof(buf),
"%08zu_%lld",
813 file_index_++,
static_cast<long long>(ts_ns));
814 return opts_.output_dir +
"/" + opts_.file_prefix +
"_" + buf +
".parquet";
817 void register_output_file(
const std::string& path) {
818 std::lock_guard<std::mutex> lk(files_mutex_);
819 for (
const auto& p : output_files_) {
if (p == path)
return; }
820 output_files_.push_back(path);
821 files_written_.store(
static_cast<int64_t
>(output_files_.size()),
822 std::memory_order_relaxed);
827 std::unique_ptr<RingBuffer> ring_;
828 mutable std::mutex submit_mutex_;
829 std::mutex consumer_mutex_;
831 std::atomic<bool> running_{
false};
832 std::atomic<bool> stop_requested_{
false};
833 std::mutex cv_mutex_;
834 std::condition_variable cv_;
835 std::atomic<uint64_t> records_submitted_{0};
836 std::atomic<int64_t> records_written_{0};
837 std::atomic<uint64_t> records_dropped_{0};
838 std::atomic<int64_t> files_written_{0};
839 std::atomic<int64_t> bytes_written_{0};
840 std::unique_ptr<ParquetWriter> current_writer_;
841 std::string current_file_path_;
842 size_t current_file_rows_{0};
843 size_t file_index_{0};
844 mutable std::mutex files_mutex_;
845 std::vector<std::string> output_files_;
848 std::unique_ptr<Impl> impl_;
850 explicit StreamingSink(std::unique_ptr<Impl> impl) : impl_(std::move(impl)) {}
876 int64_t
end_ns = (std::numeric_limits<int64_t>::max)();
878 size_t max_rows = (std::numeric_limits<size_t>::max)();
924 : parquet_files_(sink.output_files())
944 : parquet_files_(std::move(parquet_files))
954 std::vector<StreamRecord> result;
957 static constexpr size_t kColTs = 0;
958 static constexpr size_t kColTypeId = 1;
959 static constexpr size_t kColPayload = 2;
961 for (
const auto& file_path : parquet_files_) {
962 if (result.size() >= opts.max_rows)
break;
966 if (!std::filesystem::exists(file_path, ec))
continue;
969 if (!reader_result) {
974 auto& reader = *reader_result;
976 const size_t num_rg =
static_cast<size_t>(reader.num_row_groups());
978 for (
size_t rg = 0; rg < num_rg; ++rg) {
979 if (result.size() >= opts.max_rows)
break;
982 auto ts_result = reader.read_column<int64_t>(rg, kColTs);
983 if (!ts_result)
continue;
985 auto type_result = reader.read_column<int32_t>(rg, kColTypeId);
986 if (!type_result)
continue;
988 auto payload_result = reader.read_column<std::string>(rg, kColPayload);
989 if (!payload_result)
continue;
991 const auto& ts_col = *ts_result;
992 const auto& type_col = *type_result;
993 const auto& payload_col = *payload_result;
995 const size_t nrows = (std::min)({ts_col.size(),
997 payload_col.size()});
999 for (
size_t i = 0; i < nrows; ++i) {
1000 if (result.size() >= opts.max_rows)
break;
1002 const int64_t ts = ts_col[i];
1003 const uint32_t tid =
static_cast<uint32_t
>(type_col[i]);
1006 if (ts < opts.start_ns || ts > opts.end_ns)
continue;
1009 if (opts.type_id != 0 && tid != opts.type_id)
continue;
1012 rec.timestamp_ns = ts;
1014 auto decoded = detail::base64_decode(payload_col[i]);
1015 rec.payload.assign(decoded.begin(), decoded.end());
1017 result.push_back(std::move(rec));
1033 qopts.
end_ns = filter_end_;
1043 [[nodiscard]]
size_t num_files()
const {
return parquet_files_.size(); }
1046 [[nodiscard]]
size_t num_live()
const {
return live_count_; }
1049 std::vector<std::string> parquet_files_;
1050 size_t live_count_ = 0;
1053 int64_t filter_start_ = 0;
1054 int64_t filter_end_ = (std::numeric_limits<int64_t>::max)();
1055 uint32_t filter_type_ = 0;
Reads StreamRecords across historical Parquet files and (optionally) a live StreamingSink ring buffer...
expected< std::vector< StreamRecord > > read(QueryOptions opts={}) const
Read all matching records from historical Parquet files.
HybridReader(const StreamingSink &sink)
Construct from a live StreamingSink: captures a snapshot of the ring buffer plus the list of historic...
static expected< HybridReader > create(Options opts)
Create a HybridReader with pre-configured filters.
size_t num_live() const
Number of live records visible in ring snapshot (0 unless flush() was called first).
expected< std::vector< StreamRecord > > read_all() const
Read all records applying the filters set at construction (via Options).
size_t num_files() const
Number of historical Parquet files available.
HybridReader(std::vector< std::string > parquet_files)
Construct from an explicit list of Parquet file paths (no live ring).
Multiple-producer single-consumer (MPSC) bounded ring buffer.
bool empty() const
True if the buffer appears empty (approximate).
MpscRingBuffer(const MpscRingBuffer &)=delete
size_t pop(T *out, size_t max_count)
Bulk pop up to max_count items. Returns number popped.
bool full() const
True if the buffer appears full (approximate).
size_t push(const T *items, size_t count)
Bulk push. Returns the number of items actually pushed.
static constexpr size_t capacity()
The compile-time capacity of this ring buffer.
size_t size() const
Approximate number of items in the buffer (may transiently exceed Capacity during push).
MpscRingBuffer()
Construct the ring buffer, initializing all slot sequences.
bool push(T item)
Push one item. Returns false immediately if the buffer is full.
bool pop(T &out)
Pop one item into out. Returns false if the buffer is empty.
MpscRingBuffer & operator=(const MpscRingBuffer &)=delete
static expected< ParquetReader > open(const std::filesystem::path &path)
Open and parse a Parquet file, returning a ready-to-query reader.
static expected< ParquetWriter > open(const std::filesystem::path &path, const Schema &schema, const Options &options=Options{})
Open a new Parquet file for writing.
SchemaBuilder & column(std::string col_name, LogicalType logical_type=LogicalType::NONE)
Add a typed column, deducing PhysicalType from T.
static SchemaBuilder builder(std::string name)
Create a SchemaBuilder for fluent column construction.
Lock-free single-producer single-consumer (SPSC) bounded ring buffer.
SpscRingBuffer(const SpscRingBuffer &)=delete
bool full() const
True if the buffer appears full (approximate, no lock).
SpscRingBuffer & operator=(const SpscRingBuffer &)=delete
bool push(T item)
Push a single item. Returns false if the buffer is full.
size_t push(const T *items, size_t count)
Bulk push up to count items. Returns the number actually pushed.
size_t size() const
Approximate number of items currently in the buffer.
bool pop(T &out)
Pop a single item into out. Returns false if the buffer is empty.
size_t pop(T *out, size_t max_count)
Bulk pop up to max_count items into out. Returns number popped.
static constexpr size_t capacity()
The compile-time capacity of this ring buffer.
SpscRingBuffer()
Construct an empty ring buffer.
bool empty() const
True if the buffer appears empty (approximate, no lock).
Background-thread Parquet compaction sink fed by a lock-free ring buffer.
expected< void > submit(int64_t timestamp_ns, uint32_t type_id, const uint8_t *data, size_t size)
Submit a record from raw bytes.
std::vector< std::string > output_files() const
List of completed Parquet output file paths (thread-safe snapshot).
expected< void > flush()
Drain the ring buffer and write all pending records, then close the current Parquet file so it has a ...
uint64_t records_dropped() const
Total number of records dropped due to ring buffer overflow.
StreamingSink(const StreamingSink &)=delete
int64_t records_written() const
Total number of records written to Parquet files.
StreamingSink & operator=(const StreamingSink &)=delete
static expected< StreamingSink > create(Options opts)
Create a StreamingSink with the given options.
uint64_t records_submitted() const
Total number of records successfully submitted to the ring buffer.
expected< void > submit(int64_t timestamp_ns, uint32_t type_id, std::string_view sv)
Submit a record from a string_view payload.
expected< void > stop()
Stop the background thread, drain remaining records, and close open files.
expected< void > submit(StreamRecord rec)
Submit a fully-constructed StreamRecord to the ring buffer.
StreamingSink(StreamingSink &&)=default
void stop_nowait()
Signal the background thread to stop without waiting for it to finish.
StreamingSink & operator=(StreamingSink &&)=default
void start()
Start the background compaction thread (no-op if already running).
int64_t files_written() const
Number of completed Parquet output files.
int64_t bytes_written() const
Size in bytes of the most recently written output file, measured after its latest row-group flush (not a cumulative total).
A lightweight result type that holds either a success value of type T or an Error.
@ TIMESTAMP_NS
Timestamp — INT64, nanoseconds since Unix epoch.
@ STRING
UTF-8 string (stored as BYTE_ARRAY).
@ IO_ERROR
A file-system or stream I/O operation failed (open, read, write, rename).
@ INTERNAL_ERROR
An unexpected internal error that does not fit any other category.
Schema definition types: Column<T>, SchemaBuilder, and Schema.
Lightweight error value carrying an ErrorCode and a human-readable message.
Per-query filter options passed to HybridReader::read().
uint32_t type_id
Type ID filter (0 = all types)
int64_t end_ns
Maximum timestamp_ns (inclusive)
size_t max_rows
Maximum records to return.
int64_t start_ns
Minimum timestamp_ns (inclusive)
Options for constructing a HybridReader via HybridReader::create().
uint32_t type_id_filter
Type ID filter (0 = accept all types)
std::vector< std::string > parquet_files
Parquet files to query.
int64_t end_timestamp
Maximum timestamp_ns (inclusive)
int64_t start_timestamp
Minimum timestamp_ns (inclusive)
A single record flowing through the StreamingSink pipeline.
int64_t timestamp_ns
Wall-clock timestamp in nanoseconds since Unix epoch.
uint32_t type_id
User-defined record type tag (0 = untyped)
std::string payload
Serialized record bytes; may be arbitrary binary (base64-encoded when written to Parquet).
Configuration options for StreamingSink::create().
std::string output_dir
Directory for output Parquet files (required)
bool auto_start
Start the background thread immediately on create()
size_t max_file_rows
Maximum rows per output Parquet file before rolling.
std::chrono::milliseconds flush_interval
Background thread wake-up interval.
size_t ring_buffer_capacity
Soft cap on ring buffer occupancy (must be a power of 2); submissions beyond it are dropped and counted in records_dropped().
std::string file_prefix
Filename prefix for output files.
size_t row_group_size
Records per Parquet row group.
Parquet format enumerations, type traits, and statistics structs.
Write-Ahead Log (WAL) with sub-millisecond append, CRC-32 integrity, crash recovery,...