23#include <condition_variable>
// Base64 alphabet, indexed by 6-bit value: 'A'-'Z', 'a'-'z', '0'-'9', '+', '/'.
47inline constexpr char kBase64Chars[] =
48 "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
51inline std::string base64_encode(
const uint8_t* data,
size_t len) {
53 result.reserve(((len + 2) / 3) * 4);
55 for (
size_t i = 0; i < len; i += 3) {
56 uint32_t octet_a = i < len ? data[i] : 0u;
57 uint32_t octet_b = i + 1 < len ? data[i + 1] : 0u;
58 uint32_t octet_c = i + 2 < len ? data[i + 2] : 0u;
60 uint32_t triple = (octet_a << 16) | (octet_b << 8) | octet_c;
62 result.push_back(kBase64Chars[(triple >> 18) & 0x3Fu]);
63 result.push_back(kBase64Chars[(triple >> 12) & 0x3Fu]);
64 result.push_back(i + 1 < len ? kBase64Chars[(triple >> 6) & 0x3Fu] :
'=');
65 result.push_back(i + 2 < len ? kBase64Chars[(triple ) & 0x3Fu] :
'=');
/// Map a single character of an encoded string to its Base64 value.
///
/// @param c  Character taken from a Base64-encoded string.
/// @return   0-63 for alphabet characters ('A'-'Z', 'a'-'z', '0'-'9', '+', '/'),
///           64 for the '=' padding character, 255 for any other character.
inline uint8_t base64_val(char c) {
    if (c >= 'A' && c <= 'Z') return static_cast<uint8_t>(c - 'A');
    if (c >= 'a' && c <= 'z') return static_cast<uint8_t>(c - 'a' + 26);
    if (c >= '0' && c <= '9') return static_cast<uint8_t>(c - '0' + 52);
    if (c == '+') return 62u;
    if (c == '/') return 63u;
    if (c == '=') return 64u;
    // Sentinel for "not a Base64 character"; base64_decode() tests against it.
    return 255u;
}

/// Decode a padded Base64 string into raw bytes.
///
/// The input must consist of whole 4-character groups. '=' is accepted only as
/// trailing padding of the final group ("xx==" or "xxx="); padding in the first
/// two positions of a group, a padded third position followed by a data
/// character, or padding in any group other than the last is treated as
/// malformed input.
///
/// @param encoded  Base64 text to decode.
/// @return         The decoded bytes, or an empty vector when the input is
///                 malformed (an empty input also yields an empty vector).
inline std::vector<uint8_t> base64_decode(std::string_view encoded) {
    const size_t n = encoded.size();
    if (n % 4 != 0) return {};

    std::vector<uint8_t> result;
    result.reserve((n / 4) * 3);

    for (size_t i = 0; i < n; i += 4) {
        const uint8_t v0 = base64_val(encoded[i]);
        const uint8_t v1 = base64_val(encoded[i + 1]);
        const uint8_t v2 = base64_val(encoded[i + 2]);
        const uint8_t v3 = base64_val(encoded[i + 3]);

        // Any character outside the alphabet invalidates the whole input.
        if (v0 == 255u || v1 == 255u || v2 == 255u || v3 == 255u) return {};

        const bool pad2 = (v2 == 64u);
        const bool pad3 = (v3 == 64u);

        // The first two characters of a group always carry data bits.
        if (v0 == 64u || v1 == 64u) return {};
        // "xx=y" is not a valid group: once padding starts it runs to the end.
        if (pad2 && !pad3) return {};
        // Padding may only terminate the input, never appear mid-stream.
        if (pad3 && i + 4 != n) return {};

        // Padded positions contribute zero bits (64 would spill past 6 bits).
        const uint32_t triple =
            (static_cast<uint32_t>(v0) << 18) |
            (static_cast<uint32_t>(v1) << 12) |
            (static_cast<uint32_t>(pad2 ? 0u : v2) << 6) |
            (static_cast<uint32_t>(pad3 ? 0u : v3));

        result.push_back(static_cast<uint8_t>((triple >> 16) & 0xFFu));
        if (!pad2) result.push_back(static_cast<uint8_t>((triple >> 8) & 0xFFu));
        if (!pad3) result.push_back(static_cast<uint8_t>(triple & 0xFFu));
    }

    return result;
}
131template <
typename T,
size_t Capacity>
133 static_assert((Capacity & (Capacity - 1)) == 0,
134 "SpscRingBuffer: Capacity must be a power of 2");
150 const size_t head = head_.load(std::memory_order_relaxed);
151 const size_t next = (head + 1) & kMask;
153 if (next == tail_.load(std::memory_order_acquire)) {
157 storage_[head] = std::move(item);
158 head_.store(next, std::memory_order_release);
163 size_t push(
const T* items,
size_t count) {
165 for (
size_t i = 0; i < count; ++i) {
166 if (!
push(items[i]))
break;
178 const size_t tail = tail_.load(std::memory_order_relaxed);
180 if (tail == head_.load(std::memory_order_acquire)) {
184 out = std::move(storage_[tail]);
185 tail_.store((tail + 1) & kMask, std::memory_order_release);
190 size_t pop(T* out,
size_t max_count) {
192 while (popped < max_count) {
193 const size_t tail = tail_.load(std::memory_order_relaxed);
194 if (tail == head_.load(std::memory_order_acquire))
break;
196 out[popped] = std::move(storage_[tail]);
197 tail_.store((tail + 1) & kMask, std::memory_order_release);
/// Approximate number of items currently in the buffer.
208 [[nodiscard]]
size_t size()
const {
209 const size_t head = head_.load(std::memory_order_acquire);
210 const size_t tail = tail_.load(std::memory_order_acquire);
// Masking with kMask (Capacity - 1) handles index wrap-around; Capacity is
// statically asserted to be a power of two.
211 return (head - tail) & kMask;
215 [[nodiscard]]
bool empty()
const {
return size() == 0; }
217 [[nodiscard]]
bool full()
const {
return size() == Capacity - 1; }
220 static constexpr size_t capacity() {
return Capacity; }
223 static constexpr size_t kMask = Capacity - 1;
225 alignas(64) std::atomic<size_t> head_;
226 alignas(64) std::atomic<size_t> tail_;
231 T storage_[Capacity];
250template <
typename T,
size_t Capacity>
252 static_assert((Capacity & (Capacity - 1)) == 0,
253 "MpscRingBuffer: Capacity must be a power of 2");
259 std::atomic<size_t> sequence{0};
263 static constexpr size_t kMask = Capacity - 1;
265 alignas(64) std::atomic<size_t> enqueue_pos_{0};
266 alignas(64)
size_t dequeue_pos_{0};
267 Slot slots_[Capacity];
272 for (
size_t i = 0; i < Capacity; ++i) {
273 slots_[i].sequence.store(i, std::memory_order_relaxed);
287 size_t pos = enqueue_pos_.load(std::memory_order_relaxed);
289 Slot& slot = slots_[pos & kMask];
290 const size_t seq = slot.sequence.load(std::memory_order_acquire);
291 const auto diff =
static_cast<std::ptrdiff_t
>(seq)
292 -
static_cast<std::ptrdiff_t
>(pos);
296 if (enqueue_pos_.compare_exchange_weak(
298 std::memory_order_relaxed,
299 std::memory_order_relaxed)) {
301 slot.data = std::move(item);
302 slot.sequence.store(pos + 1, std::memory_order_release);
306 }
else if (diff < 0) {
311 pos = enqueue_pos_.load(std::memory_order_relaxed);
317 size_t push(
const T* items,
size_t count) {
319 for (
size_t i = 0; i < count; ++i) {
320 if (!
push(items[i]))
break;
332 const size_t pos = dequeue_pos_;
333 Slot& slot = slots_[pos & kMask];
334 const size_t seq = slot.sequence.load(std::memory_order_acquire);
337 if (seq != pos + 1) {
341 out = std::move(slot.data);
343 slot.sequence.store(pos + Capacity, std::memory_order_release);
344 dequeue_pos_ = pos + 1;
349 size_t pop(T* out,
size_t max_count) {
351 while (popped < max_count &&
pop(out[popped])) {
/// Approximate number of items in the buffer (may transiently exceed Capacity
/// during push).
362 [[nodiscard]]
size_t size()
const {
363 const size_t eq = enqueue_pos_.load(std::memory_order_acquire);
// NOTE(review): dequeue_pos_ is declared as a plain (non-atomic) size_t and is
// written by pop(). Reading it here from any thread other than the consumer is
// unsynchronised — confirm which threads call size().
364 return eq - dequeue_pos_;
367 [[nodiscard]]
bool empty()
const {
return size() == 0; }
369 [[nodiscard]]
bool full()
const {
return size() >= Capacity; }
372 static constexpr size_t capacity() {
return Capacity; }
442 for (
const auto& comp : p) {
445 "StreamingSink: output_dir must not contain '..' path traversal"};
450 std::filesystem::create_directories(opts.
output_dir, ec);
453 "StreamingSink: cannot create output_dir '" +
457 if (cap == 0 || (cap & (cap - 1)) != 0)
459 "StreamingSink: ring_buffer_capacity must be a power of 2"};
461 auto impl = std::make_unique<Impl>(std::move(opts));
462 const bool auto_start = impl->opts_.auto_start;
464 if (auto_start) sink.impl_->start();
465 return std::move(sink);
478 if (impl_) (void)impl_->stop();
490 return impl_->submit(std::move(rec));
500 const uint8_t* data,
size_t size) {
504 rec.
payload.assign(
reinterpret_cast<const char*
>(data), size);
505 return impl_->submit(std::move(rec));
514 std::string_view sv) {
518 rec.
payload.assign(sv.data(), sv.size());
519 return impl_->submit(std::move(rec));
550 [[nodiscard]] uint64_t
records_submitted()
const {
return impl_->records_submitted_.load(std::memory_order_relaxed); }
552 [[nodiscard]] int64_t
records_written()
const {
return impl_->records_written_.load(std::memory_order_relaxed); }
554 [[nodiscard]] uint64_t
records_dropped()
const {
return impl_->records_dropped_.load(std::memory_order_relaxed); }
556 [[nodiscard]] int64_t
files_written()
const {
return impl_->files_written_.load(std::memory_order_relaxed); }
558 [[nodiscard]] int64_t
bytes_written()
const {
return impl_->bytes_written_.load(std::memory_order_relaxed); }
566 std::lock_guard<std::mutex> lk(impl_->files_mutex_);
567 return impl_->output_files_;
576 static constexpr size_t kRingCap = 65536;
580 explicit Impl(Options opts)
581 : opts_(std::move(opts))
582 , ring_(std::make_unique<RingBuffer>())
589 bool ring_push(StreamRecord rec) {
590 if (ring_->size() >= opts_.ring_buffer_capacity)
return false;
591 return ring_->push(std::move(rec));
595 void drain_ring_locked(std::vector<StreamRecord>& batch,
size_t max_count) {
597 while (batch.size() < max_count && ring_->pop(r))
598 batch.push_back(std::move(r));
602 [[nodiscard]] expected<void> submit(StreamRecord rec) {
603 std::lock_guard<std::mutex> lk(submit_mutex_);
604 if (!ring_push(std::move(rec))) {
605 records_dropped_.fetch_add(1, std::memory_order_relaxed);
608 records_submitted_.fetch_add(1, std::memory_order_relaxed);
616 if (!running_.compare_exchange_strong(exp,
true, std::memory_order_acq_rel))
618 stop_requested_.store(
false, std::memory_order_release);
619 worker_ = std::thread(&Impl::compaction_loop,
this);
622 [[nodiscard]] expected<void> stop() {
623 if (!running_.load(std::memory_order_acquire))
return {};
624 stop_requested_.store(
true, std::memory_order_release);
626 if (worker_.joinable()) worker_.join();
627 running_.store(
false, std::memory_order_release);
630 if (current_writer_) {
631 (void)current_writer_->close();
632 if (current_file_rows_ > 0) register_output_file(current_file_path_);
633 current_writer_.reset();
634 current_file_rows_ = 0;
640 stop_requested_.store(
true, std::memory_order_release);
648 [[nodiscard]] expected<void> flush() {
649 std::lock_guard<std::mutex> consumer_lk(consumer_mutex_);
650 std::vector<StreamRecord> batch;
651 drain_ring_locked(batch, (std::numeric_limits<size_t>::max)());
652 if (!batch.empty()) {
653 auto r = write_batch(batch);
657 if (current_writer_) {
658 auto r = current_writer_->close();
659 if (current_file_rows_ > 0) register_output_file(current_file_path_);
660 current_writer_.reset();
661 current_file_rows_ = 0;
662 if (!r)
return r.error();
664 return expected<void>{};
668 void compaction_loop() {
669 std::vector<StreamRecord> batch;
670 batch.reserve(opts_.row_group_size);
674 std::unique_lock<std::mutex> lk(cv_mutex_);
675 cv_.wait_for(lk, opts_.flush_interval, [
this] {
676 return stop_requested_.load(std::memory_order_acquire) ||
684 std::lock_guard<std::mutex> consumer_lk(consumer_mutex_);
686 drain_ring_locked(batch, opts_.row_group_size);
687 if (!batch.empty()) (void)write_batch(batch);
690 if (stop_requested_.load(std::memory_order_acquire)) {
691 std::lock_guard<std::mutex> consumer_lk(consumer_mutex_);
693 drain_ring_locked(batch, (std::numeric_limits<size_t>::max)());
694 if (!batch.empty()) (void)write_batch(batch);
696 if (current_writer_) {
697 (void)current_writer_->close();
698 if (current_file_rows_ > 0) register_output_file(current_file_path_);
699 current_writer_.reset();
700 current_file_rows_ = 0;
708 [[nodiscard]] expected<void> write_batch(std::vector<StreamRecord>& batch) {
709 if (batch.empty())
return expected<void>{};
713 .column<int32_t>(
"type_id")
718 while (written < batch.size()) {
719 const size_t remaining_in_file =
720 opts_.max_file_rows > current_file_rows_
721 ? opts_.max_file_rows - current_file_rows_ : 0;
723 if (remaining_in_file == 0) {
724 if (current_writer_) {
725 auto r = current_writer_->close();
726 current_writer_.reset();
727 if (!r)
return r.error();
729 current_file_rows_ = 0;
732 if (!current_writer_) {
733 std::string path = next_output_path();
735 wo.row_group_size =
static_cast<int64_t
>(opts_.row_group_size);
737 if (!wr)
return wr.error();
738 current_writer_ = std::make_unique<ParquetWriter>(std::move(*wr));
739 current_file_path_ = path;
742 const size_t chunk_size =
743 (std::min)(batch.size() - written,
744 opts_.max_file_rows - current_file_rows_);
746 std::vector<int64_t> ts_col;
747 std::vector<int32_t> type_col;
748 std::vector<std::string> payload_col;
749 ts_col.reserve(chunk_size);
750 type_col.reserve(chunk_size);
751 payload_col.reserve(chunk_size);
753 for (
size_t i = written; i < written + chunk_size; ++i) {
754 const auto& rec = batch[i];
755 ts_col.push_back(rec.timestamp_ns);
756 type_col.push_back(
static_cast<int32_t
>(rec.type_id));
757 payload_col.push_back(
758 detail::base64_encode(
759 reinterpret_cast<const uint8_t*
>(rec.payload.data()),
760 rec.payload.size()));
763 {
auto r = current_writer_->write_column<int64_t>(0, ts_col.data(), ts_col.size());
if (!r)
return r; }
764 {
auto r = current_writer_->write_column<int32_t>(1, type_col.data(), type_col.size());
if (!r)
return r; }
765 {
auto r = current_writer_->write_column(2, payload_col.data(), payload_col.size());
if (!r)
return r; }
766 {
auto r = current_writer_->flush_row_group();
if (!r)
return r; }
768 current_file_rows_ += chunk_size;
769 written += chunk_size;
773 auto fsz = std::filesystem::file_size(current_file_path_, ec);
774 if (!ec) bytes_written_.store(
static_cast<int64_t
>(fsz), std::memory_order_relaxed);
776 records_written_.fetch_add(
static_cast<int64_t
>(chunk_size), std::memory_order_relaxed);
778 if (current_file_rows_ >= opts_.max_file_rows) {
779 auto r = current_writer_->close();
780 register_output_file(current_file_path_);
781 current_writer_.reset();
782 current_file_rows_ = 0;
783 if (!r)
return r.error();
791 return expected<void>{};
795 std::string next_output_path() {
796 using namespace std::chrono;
797 const int64_t ts_ns = duration_cast<nanoseconds>(
798 system_clock::now().time_since_epoch()).count();
800 std::snprintf(buf,
sizeof(buf),
"%08zu_%lld",
801 file_index_++,
static_cast<long long>(ts_ns));
802 return opts_.output_dir +
"/" + opts_.file_prefix +
"_" + buf +
".parquet";
805 void register_output_file(
const std::string& path) {
806 std::lock_guard<std::mutex> lk(files_mutex_);
807 for (
const auto& p : output_files_) {
if (p == path)
return; }
808 output_files_.push_back(path);
809 files_written_.store(
static_cast<int64_t
>(output_files_.size()),
810 std::memory_order_relaxed);
815 std::unique_ptr<RingBuffer> ring_;
816 mutable std::mutex submit_mutex_;
817 std::mutex consumer_mutex_;
819 std::atomic<bool> running_{
false};
820 std::atomic<bool> stop_requested_{
false};
821 std::mutex cv_mutex_;
822 std::condition_variable cv_;
823 std::atomic<uint64_t> records_submitted_{0};
824 std::atomic<int64_t> records_written_{0};
825 std::atomic<uint64_t> records_dropped_{0};
826 std::atomic<int64_t> files_written_{0};
827 std::atomic<int64_t> bytes_written_{0};
828 std::unique_ptr<ParquetWriter> current_writer_;
829 std::string current_file_path_;
830 size_t current_file_rows_{0};
831 size_t file_index_{0};
832 mutable std::mutex files_mutex_;
833 std::vector<std::string> output_files_;
836 std::unique_ptr<Impl> impl_;
838 explicit StreamingSink(std::unique_ptr<Impl> impl) : impl_(std::move(impl)) {}
864 int64_t
end_ns = (std::numeric_limits<int64_t>::max)();
866 size_t max_rows = (std::numeric_limits<size_t>::max)();
912 : parquet_files_(sink.output_files())
932 : parquet_files_(std::move(parquet_files))
942 std::vector<StreamRecord> result;
945 static constexpr size_t kColTs = 0;
946 static constexpr size_t kColTypeId = 1;
947 static constexpr size_t kColPayload = 2;
949 for (
const auto& file_path : parquet_files_) {
950 if (result.size() >= opts.max_rows)
break;
954 if (!std::filesystem::exists(file_path, ec))
continue;
957 if (!reader_result) {
962 auto& reader = *reader_result;
964 const size_t num_rg =
static_cast<size_t>(reader.num_row_groups());
966 for (
size_t rg = 0; rg < num_rg; ++rg) {
967 if (result.size() >= opts.max_rows)
break;
970 auto ts_result = reader.read_column<int64_t>(rg, kColTs);
971 if (!ts_result)
continue;
973 auto type_result = reader.read_column<int32_t>(rg, kColTypeId);
974 if (!type_result)
continue;
976 auto payload_result = reader.read_column<std::string>(rg, kColPayload);
977 if (!payload_result)
continue;
979 const auto& ts_col = *ts_result;
980 const auto& type_col = *type_result;
981 const auto& payload_col = *payload_result;
983 const size_t nrows = (std::min)({ts_col.size(),
985 payload_col.size()});
987 for (
size_t i = 0; i < nrows; ++i) {
988 if (result.size() >= opts.max_rows)
break;
990 const int64_t ts = ts_col[i];
991 const uint32_t tid =
static_cast<uint32_t
>(type_col[i]);
994 if (ts < opts.start_ns || ts > opts.end_ns)
continue;
997 if (opts.type_id != 0 && tid != opts.type_id)
continue;
1000 rec.timestamp_ns = ts;
1002 auto decoded = detail::base64_decode(payload_col[i]);
1003 rec.payload.assign(decoded.begin(), decoded.end());
1005 result.push_back(std::move(rec));
1021 qopts.
end_ns = filter_end_;
1031 [[nodiscard]]
size_t num_files()
const {
return parquet_files_.size(); }
1034 [[nodiscard]]
size_t num_live()
const {
return live_count_; }
1037 std::vector<std::string> parquet_files_;
1038 size_t live_count_ = 0;
1041 int64_t filter_start_ = 0;
1042 int64_t filter_end_ = (std::numeric_limits<int64_t>::max)();
1043 uint32_t filter_type_ = 0;
Reads StreamRecords across historical Parquet files and (optionally) a live StreamingSink ring buffer...
expected< std::vector< StreamRecord > > read(QueryOptions opts={}) const
Read all matching records from historical Parquet files.
HybridReader(const StreamingSink &sink)
Construct from a live StreamingSink: captures a snapshot of the ring buffer plus the list of historical Parquet files.
static expected< HybridReader > create(Options opts)
Create a HybridReader with pre-configured filters.
size_t num_live() const
Number of live records visible in ring snapshot (0 unless flush() was called first).
expected< std::vector< StreamRecord > > read_all() const
Read all records applying the filters set at construction (via Options).
size_t num_files() const
Number of historical Parquet files available.
HybridReader(std::vector< std::string > parquet_files)
Construct from an explicit list of Parquet file paths (no live ring).
Multiple-producer single-consumer (MPSC) bounded ring buffer.
bool empty() const
True if the buffer appears empty (approximate).
MpscRingBuffer(const MpscRingBuffer &)=delete
size_t pop(T *out, size_t max_count)
Bulk pop up to max_count items. Returns number popped.
bool full() const
True if the buffer appears full (approximate).
size_t push(const T *items, size_t count)
Bulk push. Returns the number of items actually pushed.
static constexpr size_t capacity()
The compile-time capacity of this ring buffer.
size_t size() const
Approximate number of items in the buffer (may transiently exceed Capacity during push).
MpscRingBuffer()
Construct the ring buffer, initializing all slot sequences.
bool push(T item)
Push one item. Returns false immediately if the buffer is full.
bool pop(T &out)
Pop one item into out. Returns false if the buffer is empty.
MpscRingBuffer & operator=(const MpscRingBuffer &)=delete
static expected< ParquetReader > open(const std::filesystem::path &path)
Open and parse a Parquet file, returning a ready-to-query reader.
static expected< ParquetWriter > open(const std::filesystem::path &path, const Schema &schema, const Options &options=Options{})
Open a new Parquet file for writing.
SchemaBuilder & column(std::string col_name, LogicalType logical_type=LogicalType::NONE)
Add a typed column, deducing PhysicalType from T.
static SchemaBuilder builder(std::string name)
Create a SchemaBuilder for fluent column construction.
Lock-free single-producer single-consumer (SPSC) bounded ring buffer.
SpscRingBuffer(const SpscRingBuffer &)=delete
bool full() const
True if the buffer appears full (approximate, no lock).
SpscRingBuffer & operator=(const SpscRingBuffer &)=delete
bool push(T item)
Push a single item. Returns false if the buffer is full.
size_t push(const T *items, size_t count)
Bulk push up to count items. Returns the number actually pushed.
size_t size() const
Approximate number of items currently in the buffer.
bool pop(T &out)
Pop a single item into out. Returns false if the buffer is empty.
size_t pop(T *out, size_t max_count)
Bulk pop up to max_count items into out. Returns number popped.
static constexpr size_t capacity()
The compile-time capacity of this ring buffer.
SpscRingBuffer()
Construct an empty ring buffer.
bool empty() const
True if the buffer appears empty (approximate, no lock).
Background-thread Parquet compaction sink fed by a lock-free ring buffer.
expected< void > submit(int64_t timestamp_ns, uint32_t type_id, const uint8_t *data, size_t size)
Submit a record from raw bytes.
std::vector< std::string > output_files() const
List of completed Parquet output file paths (thread-safe snapshot).
expected< void > flush()
Drain the ring buffer and write all pending records, then close the current Parquet file so it has a ...
uint64_t records_dropped() const
Total number of records dropped due to ring buffer overflow.
StreamingSink(const StreamingSink &)=delete
int64_t records_written() const
Total number of records written to Parquet files.
StreamingSink & operator=(const StreamingSink &)=delete
static expected< StreamingSink > create(Options opts)
Create a StreamingSink with the given options.
uint64_t records_submitted() const
Total number of records successfully submitted to the ring buffer.
expected< void > submit(int64_t timestamp_ns, uint32_t type_id, std::string_view sv)
Submit a record from a string_view payload.
expected< void > stop()
Stop the background thread, drain remaining records, and close open files.
expected< void > submit(StreamRecord rec)
Submit a fully-constructed StreamRecord to the ring buffer.
StreamingSink(StreamingSink &&)=default
void stop_nowait()
Signal the background thread to stop without waiting for it to finish.
StreamingSink & operator=(StreamingSink &&)=default
void start()
Start the background compaction thread (no-op if already running).
int64_t files_written() const
Number of completed Parquet output files.
int64_t bytes_written() const
Approximate total bytes written to the current output file.
A lightweight result type that holds either a success value of type T or an Error.
@ TIMESTAMP_NS
Timestamp — INT64, nanoseconds since Unix epoch.
@ STRING
UTF-8 string (stored as BYTE_ARRAY).
@ IO_ERROR
A file-system or stream I/O operation failed (open, read, write, rename).
@ INTERNAL_ERROR
An unexpected internal error that does not fit any other category.
Schema definition types: Column<T>, SchemaBuilder, and Schema.
Lightweight error value carrying an ErrorCode and a human-readable message.
Per-query filter options passed to HybridReader::read().
uint32_t type_id
Type ID filter (0 = all types)
int64_t end_ns
Maximum timestamp_ns (inclusive)
size_t max_rows
Maximum records to return.
int64_t start_ns
Minimum timestamp_ns (inclusive)
Options for constructing a HybridReader via HybridReader::create().
uint32_t type_id_filter
Type ID filter (0 = accept all types)
std::vector< std::string > parquet_files
Parquet files to query.
int64_t end_timestamp
Maximum timestamp_ns (inclusive)
int64_t start_timestamp
Minimum timestamp_ns (inclusive)
A single record flowing through the StreamingSink pipeline.
int64_t timestamp_ns
Wall-clock timestamp in nanoseconds since Unix epoch.
uint32_t type_id
User-defined record type tag (0 = untyped)
std::string payload
Serialized record bytes (UTF-8 safe or binary via base64)
Configuration options for StreamingSink::create().
std::string output_dir
Directory for output Parquet files (required)
bool auto_start
Start the background thread immediately on create()
size_t max_file_rows
Maximum rows per output Parquet file before rolling.
std::chrono::milliseconds flush_interval
Background thread wake-up interval.
size_t ring_buffer_capacity
Soft cap on ring buffer occupancy (must be power of 2)
std::string file_prefix
Filename prefix for output files.
size_t row_group_size
Records per Parquet row group.
Parquet format enumerations, type traits, and statistics structs.
Write-Ahead Log (WAL) with sub-millisecond append, CRC-32 integrity, crash recovery,...