Signet Forge 0.1.1
C++20 Parquet library with AI-native extensions
DEMO
Loading...
Searching...
No Matches
writer.hpp
Go to the documentation of this file.
1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright 2026 Johnson Ogundeji
3#pragma once
4
5// ---------------------------------------------------------------------------
6// writer.hpp — Streaming Parquet file writer
7//
8// Header-only. Writes valid Parquet files with configurable encoding and
9// compression. Supports PLAIN, DELTA_BINARY_PACKED, BYTE_STREAM_SPLIT,
10// RLE_DICTIONARY, and RLE encodings, plus SNAPPY/ZSTD/LZ4 compression.
11// Supports row-based (string vector) and column-based (typed batch) APIs.
12// Includes a standalone CSV-to-Parquet converter.
13// ---------------------------------------------------------------------------
14
15#include "signet/types.hpp"
16#include "signet/error.hpp"
17#include "signet/schema.hpp"
18#include "signet/statistics.hpp"
30
31#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
32#include "signet/crypto/pme.hpp"
33#endif
34
35#include <algorithm>
36#include <array>
37#include <charconv>
38#include <cstdlib>
39#include <cstring>
40#include <filesystem>
41#include <fstream>
42#include <memory>
43#include <mutex>
44#include <optional>
45#include <string>
46#include <unordered_map>
47#include <unordered_set>
48#include <vector>
49
50// ---------------------------------------------------------------------------
51// Floating-point from_chars availability detection
52//
53// Apple Clang's libc++ (Xcode ≤ 16) does not implement std::from_chars for
54// float/double. GCC ≥ 11 (libstdc++) and MSVC ≥ 19.24 do.
55//
56// On Apple we fall back to strtod_l / strtof_l with an explicit "C" locale
57// to preserve locale-independence — critical for MiFID II financial data
58// where decimal separators must always be '.' regardless of LC_NUMERIC.
59// ---------------------------------------------------------------------------
60#if (defined(__GNUC__) && !defined(__clang__)) || defined(_MSC_VER)
61# define SIGNET_HAS_FLOAT_FROM_CHARS 1
62#else
63# define SIGNET_HAS_FLOAT_FROM_CHARS 0
64# if defined(__APPLE__)
65# include <xlocale.h> // strtod_l, strtof_l, newlocale on macOS
66# elif defined(__linux__) || defined(__FreeBSD__)
67# include <locale.h> // strtod_l, strtof_l, newlocale on Linux/BSD
68# endif
69#endif
70
71namespace signet::forge {
72
73// ---------------------------------------------------------------------------
74// detail::parse_double / detail::parse_float
75//
76// Locale-independent floating-point parsing for CSV-to-Parquet conversion.
77// Uses std::from_chars when SIGNET_HAS_FLOAT_FROM_CHARS is 1 (GCC, MSVC); every
78// other compiler (any Clang build, not only Apple Clang) falls back to
79// strtod_l/strtof_l with an explicit "C" locale on Apple, Linux and FreeBSD. Bare
80// strtod/strtof is locale-dependent and would silently corrupt financial data in EU locales (CWE-1286); it is only the last-resort path on other platforms.
81// ---------------------------------------------------------------------------
82namespace detail {
83
/// Convert the leading numeric portion of @p sv to a double.
///
/// The conversion path is fixed at preprocessing time:
///  - SIGNET_HAS_FLOAT_FROM_CHARS set: std::from_chars over the view's bytes.
///  - otherwise on Apple / Linux / FreeBSD: strtod_l with a "C" locale handle
///    that is created on first use and intentionally never freed.
///  - anywhere else: std::strtod, which follows the process locale.
///
/// The conversion status is discarded on every path, so text with no numeric
/// prefix produces 0.0 and anything after a valid prefix is ignored.
///
/// @param sv  Text to convert; does not need to be NUL-terminated.
/// @return The converted value, or 0.0 when nothing could be converted.
inline double parse_double(std::string_view sv) noexcept {
#if SIGNET_HAS_FLOAT_FROM_CHARS
    double parsed = 0.0;
    const char* const first = sv.data();
    std::from_chars(first, first + sv.size(), parsed);
    return parsed;
#elif defined(__APPLE__) || defined(__linux__) || defined(__FreeBSD__)
    // One handle for the whole process; the function-local static is
    // initialised exactly once.
    static const locale_t kCLocale =
        newlocale(LC_ALL_MASK, "C", static_cast<locale_t>(0));
    // strtod_l needs a NUL-terminated buffer, which a string_view cannot promise.
    const std::string terminated(sv);
    return strtod_l(terminated.c_str(), nullptr, kCLocale);
#else
    // Last resort for exotic platforms: this path depends on the current locale.
    const std::string terminated(sv);
    return std::strtod(terminated.c_str(), nullptr);
#endif
}
101
/// Single-precision counterpart of parse_double: convert the leading numeric
/// portion of @p sv to a float.
///
/// Path selection mirrors parse_double (std::from_chars, then strtof_l with a
/// once-created "C" locale handle, then std::strtof as the locale-dependent
/// last resort). The conversion status is discarded, so unparseable text
/// produces 0.0f and trailing characters after a valid prefix are ignored.
///
/// @param sv  Text to convert; does not need to be NUL-terminated.
/// @return The converted value, or 0.0f when nothing could be converted.
inline float parse_float(std::string_view sv) noexcept {
#if SIGNET_HAS_FLOAT_FROM_CHARS
    float parsed = 0.0f;
    const char* const first = sv.data();
    std::from_chars(first, first + sv.size(), parsed);
    return parsed;
#elif defined(__APPLE__) || defined(__linux__) || defined(__FreeBSD__)
    // Created on first call and kept for the lifetime of the process.
    static const locale_t kCLocale =
        newlocale(LC_ALL_MASK, "C", static_cast<locale_t>(0));
    // strtof_l needs a NUL-terminated buffer.
    const std::string terminated(sv);
    return strtof_l(terminated.c_str(), nullptr, kCLocale);
#else
    // Locale-dependent fallback for platforms with neither facility.
    const std::string terminated(sv);
    return std::strtof(terminated.c_str(), nullptr);
#endif
}
116
/// Strictly parse @p sv as a double: the call succeeds only when a conversion
/// took place AND it consumed the whole view.
///
/// Fixes relative to the previous version:
///  - An empty view is rejected on every path. The strtod-based branches
///    perform no conversion on "" and leave the end pointer at the start of
///    the buffer, which compared equal to "start + size" and was reported as
///    a successful parse of 0.0.
///  - @p out is written only on success. It used to be overwritten by failed
///    or partial conversions.
///
/// NOTE(review): the accepted grammar still differs between paths. strtod
/// skips leading whitespace and accepts a leading '+', while std::from_chars
/// accepts neither. Confirm whether callers need the paths aligned further.
///
/// @param sv   Text that must consist of the number and nothing else.
/// @param out  Receives the value on success; left untouched on failure.
/// @return true when the entire view was converted, false otherwise.
inline bool try_parse_double(std::string_view sv, double& out) noexcept {
    // An empty field is never a number, whichever parser is compiled in.
    if (sv.empty()) {
        return false;
    }
#if SIGNET_HAS_FLOAT_FROM_CHARS
    double parsed = 0.0;
    const char* const last = sv.data() + sv.size();
    const auto [stop, ec] = std::from_chars(sv.data(), last, parsed);
    if (ec != std::errc{} || stop != last) {
        return false;
    }
    out = parsed;
    return true;
#elif defined(__APPLE__) || defined(__linux__) || defined(__FreeBSD__)
    // "C" locale handle created once and intentionally never freed.
    static const locale_t c_locale =
        newlocale(LC_ALL_MASK, "C", static_cast<locale_t>(0));
    // strtod_l needs a NUL-terminated buffer. An embedded NUL in sv stops the
    // parse early and therefore fails the full-consumption check below.
    const std::string tmp(sv);
    char* end = nullptr;
    const double parsed = strtod_l(tmp.c_str(), &end, c_locale);
    if (end != tmp.c_str() + tmp.size()) {
        return false;
    }
    out = parsed;
    return true;
#else
    // Last resort: locale-dependent std::strtod.
    const std::string tmp(sv);
    char* end = nullptr;
    const double parsed = std::strtod(tmp.c_str(), &end);
    if (end != tmp.c_str() + tmp.size()) {
        return false;
    }
    out = parsed;
    return true;
#endif
}
136
137} // namespace detail
138
139namespace detail::writer {
140
/// Compute the CRC-32 of a byte range (reflected polynomial 0xEDB88320,
/// initial value 0xFFFFFFFF, final bitwise inversion).
///
/// @param data    First byte of the range; never dereferenced when @p length is 0.
/// @param length  Number of bytes to checksum.
/// @return The 32-bit checksum; 0 for an empty range.
inline uint32_t page_crc32(const uint8_t* data, size_t length) noexcept {
    // 256-entry lookup table built at compile time: entry b is what remains
    // after pushing byte value b through eight shift/xor rounds.
    static constexpr std::array<uint32_t, 256> kLut = [] {
        std::array<uint32_t, 256> lut{};
        for (uint32_t byte = 0; byte < 256; ++byte) {
            uint32_t rem = byte;
            for (int round = 0; round < 8; ++round) {
                const uint32_t shifted = rem >> 1;
                rem = ((rem & 1u) != 0) ? (shifted ^ 0xEDB88320u) : shifted;
            }
            lut[byte] = rem;
        }
        return lut;
    }();

    uint32_t state = 0xFFFFFFFFu;
    for (size_t remaining = length; remaining != 0; --remaining, ++data) {
        state = kLut[(state ^ *data) & 0xFFu] ^ (state >> 8);
    }
    return ~state;
}
159
/// Four-byte tag ("SPH1") written in front of every wrapped encrypted page header.
inline constexpr uint8_t kEncryptedPageHeaderMagic[4] = {'S', 'P', 'H', '1'};

/// Frame an encrypted page header as: magic (4 bytes) + payload length as a
/// little-endian 32-bit integer (4 bytes) + the payload bytes.
///
/// The length field is the payload size converted to uint32_t.
///
/// @param encrypted_header  Already-encrypted header bytes to frame.
/// @return A new buffer holding the 8-byte prefix followed by the payload.
[[nodiscard]] inline std::vector<uint8_t> wrap_encrypted_page_header(
    const std::vector<uint8_t>& encrypted_header) {
    const auto payload_len = static_cast<uint32_t>(encrypted_header.size());

    std::vector<uint8_t> record;
    record.reserve(8 + encrypted_header.size());

    for (const uint8_t tag_byte : kEncryptedPageHeaderMagic) {
        record.push_back(tag_byte);
    }
    // Least-significant byte first.
    for (unsigned shift = 0; shift < 32; shift += 8) {
        record.push_back(static_cast<uint8_t>((payload_len >> shift) & 0xFFu));
    }
    record.insert(record.end(), encrypted_header.cbegin(), encrypted_header.cend());
    return record;
}
176
177} // namespace detail::writer
178
192 int64_t row_group_size = 64 * 1024;
193
196
198 std::vector<thrift::KeyValue> file_metadata;
199
200 // -- Encoding & compression options --------------------------------------
201
205
209
212 std::unordered_map<std::string, Encoding> column_encodings;
213
218 bool auto_encoding = false;
219
222 bool auto_compression = false;
223
224 // -- Page Index (optional) ------------------------------------------------
225
228 bool enable_page_index = false;
229
230 // -- Bloom filters (optional) ---------------------------------------------
231
235
237 double bloom_filter_fpr = 0.01;
238
242 std::unordered_set<std::string> bloom_filter_columns;
243
244 // -- Encryption (AGPL-3.0+ project, commercial-enabled build gate) -------
245#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
249 std::optional<crypto::EncryptionConfig> encryption;
250#endif
251};
252
253// ---------------------------------------------------------------------------
254// ParquetWriter — streaming writer that writes one row group at a time
255// ---------------------------------------------------------------------------
256
281public:
284
285 // -- Factory: open a file for writing ------------------------------------
286
303 [[nodiscard]] static expected<ParquetWriter> open(
304 const std::filesystem::path& path,
305 const Schema& schema,
306 const Options& options = Options{}) {
307
308 ParquetWriter writer;
309 writer.schema_ = schema;
310 writer.options_ = options;
311 writer.path_ = path;
312
313 // Create parent directories if needed
314 if (path.has_parent_path()) {
315 std::error_code ec;
316 std::filesystem::create_directories(path.parent_path(), ec);
317 // Ignore error — the open() below will catch it
318 }
319
320 writer.file_.open(path, std::ios::binary | std::ios::trunc);
321 if (!writer.file_.is_open()) {
322 return Error{ErrorCode::IO_ERROR,
323 "Failed to open file for writing: " + path.string()};
324 }
325
326 // Write PAR1 magic (4 bytes)
327 writer.write_raw_le32(PARQUET_MAGIC);
328 writer.file_offset_ = 4;
329
330 // Initialize column writers — one per schema column
331 writer.init_column_writers();
332
333 // Initialize encryption if configured (commercial tier)
334#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
335 if (options.encryption) {
336 writer.encryptor_ = std::make_unique<crypto::FileEncryptor>(*options.encryption);
337 }
338#endif
339
340 // Initialize bloom filters if enabled
341 if (options.enable_bloom_filter) {
342 writer.bloom_filters_.resize(schema.num_columns());
343 }
344
345 // Initialize page index builders if enabled
346 if (options.enable_page_index) {
347 writer.col_index_builders_.resize(schema.num_columns());
348 }
349
350 writer.open_ = true;
351 return writer;
352 }
353
354 // -- Row-based API -------------------------------------------------------
355
// Buffer one row of string values and flush a row group once enough rows
// have accumulated.
//
// @param values  One string per schema column, in schema order.
// @return An empty expected on success; an IO_ERROR when the writer is not
//         open; an error when the value count differs from the schema's
//         column count; otherwise whatever flush_row_group() returns when
//         the auto-flush threshold is reached.
//
// NOTE(review): original line 373 is absent from this listing — the
// statement that opens the error value for the "Row has ..." message is not
// visible here, so its error code cannot be confirmed from this view.
368 [[nodiscard]] expected<void> write_row(const std::vector<std::string>& values) {
369 if (!open_) {
370 return Error{ErrorCode::IO_ERROR, "Writer is not open"};
371 }
// The row must supply exactly one value per schema column.
372 if (values.size() != schema_.num_columns()) {
374 "Row has " + std::to_string(values.size()) +
375 " values, schema has " + std::to_string(schema_.num_columns()) +
376 " columns"};
377 }
378
// The row is copied into the pending buffer; nothing is encoded yet.
379 pending_rows_.push_back(values);
380
381 // Auto-flush when we reach the target row group size
382 if (static_cast<int64_t>(pending_rows_.size()) >= options_.row_group_size) {
383 return flush_row_group();
384 }
385
386 return expected<void>{};
387 }
388
389 // -- Schema query --------------------------------------------------------
390
393 [[nodiscard]] size_t num_columns() const noexcept { return schema_.num_columns(); }
394
395 // -- Column-based API (typed batch) --------------------------------------
396
// Append a batch of typed values to one column of the current row group.
//
// @tparam T          Element type; mapped to a physical type through
//                    parquet_type_of_v<T> and compared against the schema.
// @param col_index   Zero-based column index; must be below the schema's
//                    column count.
// @param values      Pointer to the first of `count` values.
// @param count       Number of values to append.
// @return An empty expected on success; an IO_ERROR when the writer is not
//         open; an error for an out-of-range index or a physical-type
//         mismatch.
//
// NOTE(review): original lines 425 and 435 are absent from this listing —
// the statements that open the two error values are not visible here, so
// their error codes cannot be confirmed from this view.
418 template <typename T>
419 [[nodiscard]] expected<void> write_column(size_t col_index,
420 const T* values, size_t count) {
421 if (!open_) {
422 return Error{ErrorCode::IO_ERROR, "Writer is not open"};
423 }
424 if (col_index >= schema_.num_columns()) {
426 "Column index " + std::to_string(col_index) +
427 " out of range (schema has " +
428 std::to_string(schema_.num_columns()) + " columns)"};
429 }
430
431 // Validate that T matches the schema's physical type for this column
432 constexpr PhysicalType expected_pt = parquet_type_of_v<T>;
433 const PhysicalType actual_pt = schema_.column(col_index).physical_type;
434 if (expected_pt != actual_pt) {
// Both physical types are reported as their integer enum values.
436 "Type mismatch for column " + std::to_string(col_index) +
437 " (\"" + schema_.column(col_index).name + "\")" +
438 ": schema physical type is " +
439 std::to_string(static_cast<int>(actual_pt)) +
440 " but write_column<T> maps to " +
441 std::to_string(static_cast<int>(expected_pt))};
442 }
443
// Hand the batch to the column writer, then mirror its running value count.
444 col_writers_[col_index].write_batch(values, count);
445 col_row_counts_[col_index] = col_writers_[col_index].num_values();
446
447 // Insert values into bloom filter if enabled for this column
// (bloom_filters_ is non-empty only when bloom filters were set up.)
448 if (!bloom_filters_.empty()) {
449 bloom_insert_typed(col_index, values, count);
450 }
451
452 return expected<void>{};
453 }
454
// Append a batch of std::string values to a byte-array column of the current
// row group.
//
// @param col_index  Zero-based column index; must be below the schema's
//                   column count.
// @param values     Pointer to the first of `count` strings.
// @param count      Number of strings to append.
// @return An empty expected on success; an IO_ERROR when the writer is not
//         open; an error for an out-of-range index or a column whose
//         physical type does not accept strings.
//
// NOTE(review): original lines 474 and 482-483 are absent from this
// listing — the rest of the type condition and the statements that open the
// two error values are not visible here.
467 [[nodiscard]] expected<void> write_column(size_t col_index,
468 const std::string* values,
469 size_t count) {
470 if (!open_) {
471 return Error{ErrorCode::IO_ERROR, "Writer is not open"};
472 }
473 if (col_index >= schema_.num_columns()) {
475 "Column index " + std::to_string(col_index) +
476 " out of range"};
477 }
478
479 // Validate that column is BYTE_ARRAY or FIXED_LEN_BYTE_ARRAY
480 const PhysicalType actual_pt = schema_.column(col_index).physical_type;
481 if (actual_pt != PhysicalType::BYTE_ARRAY &&
484 "Type mismatch for column " + std::to_string(col_index) +
485 " (\"" + schema_.column(col_index).name + "\")" +
486 ": schema physical type is " +
487 std::to_string(static_cast<int>(actual_pt)) +
488 " but write_column was called with std::string (BYTE_ARRAY)"};
489 }
490
491 // CWE-130: For FIXED_LEN_BYTE_ARRAY columns, each string must be
492 // written as raw bytes (no length prefix) via write_fixed_len_byte_array
493 // to produce a valid column chunk. The ColumnWriter validates that
494 // each value's length matches the schema's type_length.
// NOTE(review): the result of write_fixed_len_byte_array is not inspected
// here — if it reports a length mismatch through its return value, that
// report is dropped; confirm against ColumnWriter.
495 if (actual_pt == PhysicalType::FIXED_LEN_BYTE_ARRAY) {
496 for (size_t i = 0; i < count; ++i) {
497 col_writers_[col_index].write_fixed_len_byte_array(
498 reinterpret_cast<const uint8_t*>(values[i].data()),
499 values[i].size());
500 }
501 } else {
502 col_writers_[col_index].write_batch(values, count);
503 }
// Mirror the column writer's running value count.
504 col_row_counts_[col_index] = col_writers_[col_index].num_values();
505
506 // Insert values into bloom filter if enabled for this column
507 if (!bloom_filters_.empty()) {
508 bloom_insert_typed(col_index, values, count);
509 }
510
511 return expected<void>{};
512 }
513
514 // -- Flush / Close -------------------------------------------------------
515
533 if (!open_) {
534 return Error{ErrorCode::IO_ERROR, "Writer is not open"};
535 }
536
537 // Ensure Snappy codec is registered (idempotent, done once)
538 ensure_snappy_registered();
539
540 // If we have pending string rows, encode them into the column writers
541 if (!pending_rows_.empty()) {
542 auto result = encode_pending_rows();
543 if (!result) return result;
544 }
545
546 // Check that we have data to flush
547 bool has_data = false;
548 for (size_t c = 0; c < col_writers_.size(); ++c) {
549 if (col_writers_[c].num_values() > 0) {
550 has_data = true;
551 break;
552 }
553 }
554 if (!has_data) {
555 return expected<void>{}; // Nothing to flush
556 }
557
558 // Verify all columns have the same number of values
559 int64_t rg_num_rows = col_writers_[0].num_values();
560 for (size_t c = 1; c < col_writers_.size(); ++c) {
561 if (col_writers_[c].num_values() != rg_num_rows) {
563 "Column " + std::to_string(c) + " has " +
564 std::to_string(col_writers_[c].num_values()) +
565 " values, expected " + std::to_string(rg_num_rows)};
566 }
567 }
568
569 // Build the row group metadata
571 rg.num_rows = rg_num_rows;
572 rg.total_byte_size = 0;
573 rg.columns.resize(col_writers_.size());
574
575 // Write each column chunk with encoding and compression
576 for (size_t c = 0; c < col_writers_.size(); ++c) {
577 const auto& cw = col_writers_[c];
578 const auto& col_desc = schema_.column(c);
579
580 // ---- Step 1: Choose encoding for this column -------------------
581 Encoding col_encoding = choose_encoding(c, col_desc, cw);
582
583 // ---- Step 2: Determine compression codec -----------------------
584 Compression col_codec = options_.compression;
585 if (options_.auto_compression) {
586 // Use the PLAIN data as a sample for auto-selection
587 col_codec = auto_select_compression(
588 cw.data().data(), cw.data().size());
589 }
590
591 // ---- Step 3: Track total uncompressed and compressed bytes -----
592 int64_t total_uncompressed = 0;
593 int64_t total_compressed = 0;
594 std::unordered_set<Encoding> used_encodings;
595
596 // Record the column chunk start offset
597 int64_t column_offset = file_offset_;
598 int64_t dict_page_offset = -1; // -1 means no dict page
599 int64_t data_page_offset = -1;
600
601 // ---- Step 4: Handle dictionary encoding specially --------------
602 if (col_encoding == Encoding::PLAIN_DICTIONARY ||
603 col_encoding == Encoding::RLE_DICTIONARY) {
604
605 // Re-encode as dictionary: extract raw values from PLAIN data
606 // and build a dictionary.
607 auto dict_result = write_dictionary_column(
608 c, col_desc, cw, col_codec);
609 if (!dict_result) return dict_result.error();
610
611 const auto& dict_info = *dict_result;
612 total_uncompressed = dict_info.total_uncompressed;
613 total_compressed = dict_info.total_compressed;
614 used_encodings = dict_info.used_encodings;
615 dict_page_offset = dict_info.dict_page_offset;
616 data_page_offset = dict_info.data_page_offset;
617
618 // Feed page index builder (dictionary path)
619 if (!col_index_builders_.empty()) {
620 auto& builder = col_index_builders_[c];
621 builder.start_page();
622 builder.set_first_row_index(0);
623 builder.set_page_location(dict_info.data_page_offset,
624 static_cast<int32_t>(dict_info.total_compressed));
625
626 const auto& cw_stats_pi = cw.statistics();
627 if (cw_stats_pi.has_min_max()) {
628 const auto& min_b = cw_stats_pi.min_bytes();
629 const auto& max_b = cw_stats_pi.max_bytes();
630 builder.set_min(std::string(min_b.begin(), min_b.end()));
631 builder.set_max(std::string(max_b.begin(), max_b.end()));
632 }
633 builder.set_null_page(cw_stats_pi.null_count() == cw.num_values());
634 builder.set_null_count(cw_stats_pi.null_count());
635 }
636
637 } else {
638 // ---- Step 5: Non-dictionary encoding path ------------------
639 // Encode (or reuse PLAIN data) based on selected encoding
640 auto encoded = encode_column_data(cw, col_encoding, col_desc.physical_type);
641
642 if (encoded.size() > static_cast<size_t>(INT32_MAX)) {
644 "encoded page size exceeds int32 limit (2 GiB)"};
645 }
646 int32_t uncompressed_size = static_cast<int32_t>(encoded.size());
647
648 // Compress the page data
649 const uint8_t* page_data = encoded.data();
650 size_t page_data_size = encoded.size();
651 std::vector<uint8_t> compressed_buf;
652 int32_t compressed_size = uncompressed_size;
653
654 if (col_codec != Compression::UNCOMPRESSED) {
655 auto comp_result = compress(col_codec, page_data, page_data_size);
656 if (!comp_result) return comp_result.error();
657 compressed_buf = std::move(*comp_result);
658 if (compressed_buf.size() > static_cast<size_t>(INT32_MAX))
659 return Error{ErrorCode::INVALID_ARGUMENT, "Compressed page exceeds INT32_MAX"};
660 compressed_size = static_cast<int32_t>(compressed_buf.size());
661 page_data = compressed_buf.data();
662 page_data_size = compressed_buf.size();
663 }
664
665 // Encrypt the page data if PME is configured for this column
666#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
667 std::vector<uint8_t> encrypted_buf;
668 if (encryptor_ && encryptor_->is_column_encrypted(col_desc.name)) {
669 auto enc_result = encryptor_->encrypt_column_page(
670 page_data, page_data_size, col_desc.name,
671 static_cast<int32_t>(row_groups_.size()), 0);
672 if (!enc_result) return enc_result.error();
673 encrypted_buf = std::move(*enc_result);
674 compressed_size = static_cast<int32_t>(encrypted_buf.size());
675 page_data = encrypted_buf.data();
676 page_data_size = encrypted_buf.size();
677 // uncompressed_page_size stays the same (pre-encryption size
678 // for the reader to know how much to decompress after decrypt)
679 }
680#endif
681
682 // Build the DataPage PageHeader
685 ph.uncompressed_page_size = uncompressed_size;
686 ph.compressed_page_size = compressed_size;
687 // CRC-32 over final page data (post-compression, post-encryption)
688 ph.crc = static_cast<int32_t>(detail::writer::page_crc32(
689 page_data, page_data_size));
690
692 dph.num_values = static_cast<int32_t>(cw.num_values());
693 dph.encoding = col_encoding;
696 ph.data_page_header = dph;
697
698 // Serialize and write the page header
699 thrift::CompactEncoder header_enc;
700 ph.serialize(header_enc);
701 const auto& raw_header_bytes = header_enc.data();
702
703 const uint8_t* header_write_data = raw_header_bytes.data();
704 size_t header_write_size = raw_header_bytes.size();
705#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
706 std::vector<uint8_t> header_record_buf;
707 if (encryptor_ && encryptor_->is_column_encrypted(col_desc.name)) {
708 auto enc_header = encryptor_->encrypt_data_page_header(
709 raw_header_bytes.data(), raw_header_bytes.size(),
710 col_desc.name, static_cast<int32_t>(row_groups_.size()), 0);
711 if (!enc_header) return enc_header.error();
712 header_record_buf = detail::writer::wrap_encrypted_page_header(*enc_header);
713 header_write_data = header_record_buf.data();
714 header_write_size = header_record_buf.size();
715 }
716#endif
717
718 data_page_offset = file_offset_;
719
720 write_raw(header_write_data, header_write_size);
721 write_raw(page_data, page_data_size);
722
723 total_uncompressed = static_cast<int64_t>(header_write_size) +
724 static_cast<int64_t>(uncompressed_size);
725 total_compressed = static_cast<int64_t>(header_write_size) +
726 static_cast<int64_t>(compressed_size);
727
728 used_encodings.insert(col_encoding);
729
730 // Feed page index builder (non-dictionary path)
731 if (!col_index_builders_.empty()) {
732 auto& builder = col_index_builders_[c];
733 builder.start_page();
734 builder.set_first_row_index(0);
735 builder.set_page_location(data_page_offset,
736 compressed_size + static_cast<int32_t>(header_write_size));
737
738 const auto& cw_stats_pi = cw.statistics();
739 if (cw_stats_pi.has_min_max()) {
740 const auto& min_b = cw_stats_pi.min_bytes();
741 const auto& max_b = cw_stats_pi.max_bytes();
742 builder.set_min(std::string(min_b.begin(), min_b.end()));
743 builder.set_max(std::string(max_b.begin(), max_b.end()));
744 }
745 builder.set_null_page(cw_stats_pi.null_count() == cw.num_values());
746 builder.set_null_count(cw_stats_pi.null_count());
747 }
748 }
749
750 // ---- Step 6: Build ColumnChunk metadata ------------------------
751 thrift::ColumnChunk& cc = rg.columns[c];
752 cc.file_offset = column_offset;
753
755 cmd.type = col_desc.physical_type;
756 cmd.path_in_schema = {col_desc.name};
757 cmd.codec = col_codec;
758 cmd.num_values = cw.num_values();
759 cmd.total_uncompressed_size = total_uncompressed;
760 cmd.total_compressed_size = total_compressed;
761 cmd.data_page_offset = data_page_offset;
762
763 // Set encodings list
764 cmd.encodings.assign(used_encodings.begin(), used_encodings.end());
765
766 // Set dictionary page offset if applicable
767 if (dict_page_offset >= 0) {
768 cmd.dictionary_page_offset = dict_page_offset;
769 }
770
771 // Populate statistics from ColumnWriter
772 const auto& cw_stats = cw.statistics();
773 if (cw_stats.has_min_max()) {
774 thrift::Statistics stats;
775 stats.null_count = cw_stats.null_count();
776
777 // Convert min/max bytes to binary strings for Thrift
778 const auto& min_b = cw_stats.min_bytes();
779 const auto& max_b = cw_stats.max_bytes();
780 stats.min_value = std::string(min_b.begin(), min_b.end());
781 stats.max_value = std::string(max_b.begin(), max_b.end());
782 // Also set legacy min/max for backward compatibility
783 stats.min = stats.min_value;
784 stats.max = stats.max_value;
785
786 if (cw_stats.distinct_count().has_value()) {
787 stats.distinct_count = *cw_stats.distinct_count();
788 }
789
790 cmd.statistics = stats;
791 }
792
793 cc.meta_data = cmd;
794
795 // ---- Step 7: Write bloom filter for this column (if enabled) ---
796 if (!bloom_filters_.empty() && bloom_filters_[c]) {
797 int64_t bf_offset = file_offset_;
798 const auto& bf_data = bloom_filters_[c]->data();
799 uint32_t bf_size = static_cast<uint32_t>(bf_data.size());
800
801 // Write: 4-byte LE header (total bloom filter size) + raw bytes
802 write_raw_le32(bf_size);
803 write_raw(bf_data.data(), bf_data.size());
804
805 cc.bloom_filter_offset = bf_offset;
806 cc.bloom_filter_length = static_cast<int32_t>(4 + bf_data.size());
807
808 rg.total_byte_size += static_cast<int64_t>(4 + bf_data.size());
809 }
810
811 // ---- Step 8: Write ColumnIndex + OffsetIndex (if enabled) ----
812 if (!col_index_builders_.empty()) {
813 auto& builder = col_index_builders_[c];
814
815 // Serialize and write ColumnIndex
816 auto col_idx = builder.build_column_index(schema_.column(c).physical_type);
818 col_idx.serialize(ci_enc);
819 int64_t ci_offset = file_offset_;
820 write_raw(ci_enc.data().data(), ci_enc.data().size());
821 cc.column_index_offset = ci_offset;
822 cc.column_index_length = static_cast<int32_t>(ci_enc.data().size());
823
824 // Serialize and write OffsetIndex
825 auto off_idx = builder.build_offset_index();
827 off_idx.serialize(oi_enc);
828 int64_t oi_offset = file_offset_;
829 write_raw(oi_enc.data().data(), oi_enc.data().size());
830 cc.offset_index_offset = oi_offset;
831 cc.offset_index_length = static_cast<int32_t>(oi_enc.data().size());
832
833 rg.total_byte_size += static_cast<int64_t>(ci_enc.data().size() + oi_enc.data().size());
834 }
835
836 rg.total_byte_size += total_compressed;
837 }
838
839 // Record the row group
840 row_groups_.push_back(std::move(rg));
841 total_rows_ += rg_num_rows;
842
843 // Reset column writers for the next row group
844 for (auto& cw : col_writers_) {
845 cw.reset();
846 }
847 for (auto& count : col_row_counts_) {
848 count = 0;
849 }
850
851 // Reset bloom filters for the next row group
852 for (auto& bf : bloom_filters_) {
853 if (bf) bf->reset();
854 }
855
856 // Reset page index builders for the next row group
857 for (auto& builder : col_index_builders_) {
858 builder.reset();
859 }
860
861 return expected<void>{};
862 }
863
// Finish the file: flush buffered data, write the footer, close the stream.
//
// Sequence: return early when already closed; reject an empty schema; flush
// the remaining row group; assemble the file metadata (row count, row
// groups, creator string, schema elements, key/value metadata); serialise
// it; write the footer in one of three forms (encrypted, signed plaintext,
// plain), followed by a 4-byte footer length and a 4-byte magic; flush and
// close the stream; return build_write_stats().
//
// @return Write statistics on success; a default WriteStats when the writer
//         was already closed; an error when the schema has no columns, the
//         final flush fails, or footer encryption/signing fails. On every
//         error path shown here the file is closed and open_ is cleared
//         before returning.
//
// NOTE(review): several original lines are absent from this listing
// (902-903, 910, 920, 978 and the odd-numbered lines 933-945), so the
// declarations of `fmd`, `root`, `elem`, `enc` and the statements inside the
// logical-type branches are not visible here.
881 [[nodiscard]] expected<WriteStats> close() {
882 if (!open_) {
883 return WriteStats{}; // Already closed — return empty stats
884 }
885
886 // Validate footer completeness before close (Parquet spec)
887 if (schema_.num_columns() == 0) {
888 file_.close();
889 open_ = false;
890 return Error{ErrorCode::INTERNAL_ERROR, "ParquetWriter: cannot close with empty schema"};
891 }
892
893 // Flush any remaining data
894 auto flush_result = flush_row_group();
895 if (!flush_result) {
896 file_.close();
897 open_ = false;
898 return flush_result.error();
899 }
900
901 // Build FileMetaData
// row_groups_ is copied (not moved) into the metadata object.
904 fmd.num_rows = total_rows_;
905 fmd.row_groups = row_groups_;
906 fmd.created_by = options_.created_by;
907
908 // Build schema elements: root + one per column
909 // Root element: group node with num_children = num_columns
911 root.name = schema_.name();
912 root.num_children = static_cast<int32_t>(schema_.num_columns());
913 // Root has no type (it's a group node)
914 fmd.schema.push_back(root);
915
916 // One element per leaf column
917 for (size_t c = 0; c < schema_.num_columns(); ++c) {
918 const auto& col_desc = schema_.column(c);
919
921 elem.type = col_desc.physical_type;
922 elem.name = col_desc.name;
923 elem.repetition_type = col_desc.repetition;
924
925 // Set type_length for FIXED_LEN_BYTE_ARRAY
926 if (col_desc.physical_type == PhysicalType::FIXED_LEN_BYTE_ARRAY &&
927 col_desc.type_length > 0) {
928 elem.type_length = col_desc.type_length;
929 }
930
931 // Set converted type for common logical types
// (The statement inside each branch is on a line missing from this listing.)
932 if (col_desc.logical_type == LogicalType::STRING) {
934 } else if (col_desc.logical_type == LogicalType::DATE) {
936 } else if (col_desc.logical_type == LogicalType::TIMESTAMP_MS) {
938 } else if (col_desc.logical_type == LogicalType::TIMESTAMP_US) {
940 } else if (col_desc.logical_type == LogicalType::JSON) {
942 } else if (col_desc.logical_type == LogicalType::ENUM) {
944 } else if (col_desc.logical_type == LogicalType::DECIMAL) {
// Precision is emitted only when positive; scale when non-negative.
946 if (col_desc.precision > 0) elem.precision = col_desc.precision;
947 if (col_desc.scale >= 0) elem.scale = col_desc.scale;
948 }
949
950 fmd.schema.push_back(elem);
951 }
952
953 // Set custom key-value metadata
954 if (!options_.file_metadata.empty()) {
955 fmd.key_value_metadata = options_.file_metadata;
956 }
957
958 // If encryption is configured, embed FileEncryptionProperties in
959 // file metadata so the reader knows this file uses PME.
960#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
961 if (encryptor_) {
962 auto file_props = encryptor_->file_properties();
963 auto props_bytes = file_props.serialize();
964 std::string props_str(props_bytes.begin(), props_bytes.end());
965
966 thrift::KeyValue enc_kv;
967 enc_kv.key = "signet.encryption.properties";
968 enc_kv.value = std::move(props_str);
969
// Create the key/value list first if no custom metadata was set above.
970 if (!fmd.key_value_metadata.has_value()) {
971 fmd.key_value_metadata = std::vector<thrift::KeyValue>{};
972 }
973 fmd.key_value_metadata->push_back(std::move(enc_kv));
974 }
975#endif
976
977 // Serialize FileMetaData to Thrift compact protocol
979 fmd.serialize(enc);
980 const auto& footer_bytes = enc.data();
981
982 // Record footer start position
983 int64_t footer_start = file_offset_;
984 (void)footer_start; // Not needed, but useful for debugging
985
986 // Footer encryption: if encryptor is set and footer encryption is
987 // enabled, encrypt the serialized FileMetaData and write "PARE" magic.
// NOTE(review): in all three footer branches below the footer size is
// narrowed to uint32_t without a range check.
988#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
989 if (encryptor_ && encryptor_->config().encrypt_footer) {
990 auto enc_footer = encryptor_->encrypt_footer(
991 footer_bytes.data(), footer_bytes.size());
992 if (!enc_footer) {
993 file_.close();
994 open_ = false;
995 return enc_footer.error();
996 }
997 const auto& encrypted_footer = *enc_footer;
998
999 // Write encrypted footer
1000 write_raw(encrypted_footer.data(), encrypted_footer.size());
1001
1002 // Write footer length (encrypted footer size)
1003 uint32_t footer_len = static_cast<uint32_t>(encrypted_footer.size());
1004 write_raw_le32(footer_len);
1005
1006 // Write "PARE" magic to indicate encrypted footer
1007 write_raw_le32(PARQUET_MAGIC_ENCRYPTED);
1008 } else if (encryptor_ && !encryptor_->config().footer_key.empty()) {
1009 // Signed plaintext footer mode (PARQUET-1178 §4.2): footer is
1010 // readable but HMAC-signed for tamper detection.
1011 auto signed_footer = encryptor_->sign_footer(
1012 footer_bytes.data(), footer_bytes.size());
1013 if (!signed_footer) {
1014 file_.close();
1015 open_ = false;
1016 return signed_footer.error();
1017 }
1018 const auto& sf = *signed_footer;
1019
1020 // Write signed footer (plaintext + 32-byte HMAC appended)
1021 write_raw(sf.data(), sf.size());
1022
1023 // Footer length includes the HMAC signature
1024 uint32_t footer_len = static_cast<uint32_t>(sf.size());
1025 write_raw_le32(footer_len);
1026
1027 // PAR1 magic — footer is plaintext (signed, not encrypted)
1028 write_raw_le32(PARQUET_MAGIC);
1029 } else {
1030#endif
// In builds without the commercial gate this is the only footer path; the
// braces of the surrounding if/else are compiled out with it.
1031 // Write footer in plaintext (standard path)
1032 write_raw(footer_bytes.data(), footer_bytes.size());
1033
1034 // Write footer length (4 bytes LE)
1035 uint32_t footer_len = static_cast<uint32_t>(footer_bytes.size());
1036 write_raw_le32(footer_len);
1037
1038 // Write closing PAR1 magic (4 bytes)
1039 write_raw_le32(PARQUET_MAGIC);
1040#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
1041 }
1042#endif
1043
// NOTE(review): the stream state is not examined after the footer writes or
// the flush below, so a failed write would not surface as an error from this
// function unless write_raw reports it elsewhere — confirm.
1044 file_.flush();
1045 file_.close();
1046 open_ = false;
1047
1048 // Build WriteStats from accumulated row group metadata
1049 return build_write_stats();
1050 }
1051
1058 if (open_) {
1059 // Best-effort close; ignore errors in destructor
1060 (void)close();
1061 }
1062 }
1063
1064 // -- Non-copyable, movable -----------------------------------------------
1065
1070
1075 : schema_(std::move(other.schema_))
1076 , options_(std::move(other.options_))
1077 , path_(std::move(other.path_))
1078 , file_(std::move(other.file_))
1079 , file_offset_(other.file_offset_)
1080 , col_writers_(std::move(other.col_writers_))
1081 , col_row_counts_(std::move(other.col_row_counts_))
1082 , pending_rows_(std::move(other.pending_rows_))
1083 , row_groups_(std::move(other.row_groups_))
1084 , total_rows_(other.total_rows_)
1085 , open_(other.open_)
1086#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
1087 , encryptor_(std::move(other.encryptor_))
1088#endif
1089 , bloom_filters_(std::move(other.bloom_filters_))
1090 , col_index_builders_(std::move(other.col_index_builders_))
1091 {
1092 other.open_ = false;
1093 other.file_offset_ = 0;
1094 other.total_rows_ = 0;
1095 }
1096
1100 if (this != &other) {
1101 // Close current file if open
1102 if (open_) {
1103 (void)close();
1104 }
1105
1106 schema_ = std::move(other.schema_);
1107 options_ = std::move(other.options_);
1108 path_ = std::move(other.path_);
1109 file_ = std::move(other.file_);
1110 file_offset_ = other.file_offset_;
1111 col_writers_ = std::move(other.col_writers_);
1112 col_row_counts_ = std::move(other.col_row_counts_);
1113 pending_rows_ = std::move(other.pending_rows_);
1114 row_groups_ = std::move(other.row_groups_);
1115 total_rows_ = other.total_rows_;
1116 open_ = other.open_;
1117#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
1118 encryptor_ = std::move(other.encryptor_);
1119#endif
1120 bloom_filters_ = std::move(other.bloom_filters_);
1121 col_index_builders_ = std::move(other.col_index_builders_);
1122
1123 other.open_ = false;
1124 other.file_offset_ = 0;
1125 other.total_rows_ = 0;
1126 }
1127 return *this;
1128 }
1129
1130 // -- Status queries ------------------------------------------------------
1131
1138 [[nodiscard]] int64_t rows_written() const {
1139 return total_rows_ + static_cast<int64_t>(pending_rows_.size());
1140 }
1141
1145 [[nodiscard]] int64_t row_groups_written() const {
1146 return static_cast<int64_t>(row_groups_.size());
1147 }
1148
1151 [[nodiscard]] bool is_open() const { return open_; }
1152
1153 // ========================================================================
1154 // Standalone CSV-to-Parquet converter
1155 // ========================================================================
1156
1177 [[nodiscard]] static expected<void> csv_to_parquet(
1178 const std::filesystem::path& csv_input,
1179 const std::filesystem::path& parquet_output,
1180 const Options& options = Options{}) {
1181
1182 // Open CSV
1183 std::ifstream csv(csv_input);
1184 if (!csv.is_open()) {
1186 "Failed to open CSV file: " + csv_input.string()};
1187 }
1188
1189 // Read header line
1190 std::string header_line;
1191 if (!std::getline(csv, header_line)) {
1192 return Error{ErrorCode::INVALID_FILE, "CSV file is empty"};
1193 }
1194
1195 auto col_names = split_csv_line(header_line);
1196 if (col_names.empty()) {
1197 return Error{ErrorCode::INVALID_FILE, "CSV header has no columns"};
1198 }
1199
1200 size_t num_cols = col_names.size();
1201
1202 // Read all data rows
1203 std::vector<std::vector<std::string>> rows;
1204 std::string line;
1205 while (std::getline(csv, line)) {
1206 if (line.empty()) continue;
1207 auto fields = split_csv_line(line);
1208 // Pad or truncate to match header width
1209 fields.resize(num_cols);
1210 rows.push_back(std::move(fields));
1211 }
1212 csv.close();
1213
1214 if (rows.empty()) {
1215 return Error{ErrorCode::INVALID_FILE, "CSV file has no data rows"};
1216 }
1217
1218 // Auto-detect column types by scanning all values
1219 // For each column, try in order: INT64 → DOUBLE → BOOLEAN → STRING
1220 std::vector<PhysicalType> detected_types(num_cols, PhysicalType::INT64);
1221 std::vector<LogicalType> detected_logical(num_cols, LogicalType::NONE);
1222
1223 for (size_t c = 0; c < num_cols; ++c) {
1224 bool all_int64 = true;
1225 bool all_double = true;
1226 bool all_bool = true;
1227
1228 for (const auto& row : rows) {
1229 const std::string& val = row[c];
1230 if (val.empty()) continue; // Skip empty values for type detection
1231
1232 // Try INT64
1233 if (all_int64) {
1234 int64_t parsed;
1235 auto [ptr, ec] = std::from_chars(val.data(),
1236 val.data() + val.size(),
1237 parsed);
1238 if (ec != std::errc{} || ptr != val.data() + val.size()) {
1239 all_int64 = false;
1240 }
1241 }
1242
1243 // Try DOUBLE (locale-independent on all platforms)
1244 if (all_double) {
1245 double tmp = 0;
1246 if (!detail::try_parse_double(val, tmp)) {
1247 all_double = false;
1248 }
1249 }
1250
1251 // Try BOOLEAN
1252 if (all_bool) {
1253 if (val != "true" && val != "false" &&
1254 val != "TRUE" && val != "FALSE" &&
1255 val != "True" && val != "False" &&
1256 val != "1" && val != "0") {
1257 all_bool = false;
1258 }
1259 }
1260 }
1261
1262 // Priority: INT64 > DOUBLE > BOOLEAN > STRING
1263 if (all_int64) {
1264 detected_types[c] = PhysicalType::INT64;
1265 detected_logical[c] = LogicalType::NONE;
1266 } else if (all_double) {
1267 detected_types[c] = PhysicalType::DOUBLE;
1268 detected_logical[c] = LogicalType::NONE;
1269 } else if (all_bool) {
1270 detected_types[c] = PhysicalType::BOOLEAN;
1271 detected_logical[c] = LogicalType::NONE;
1272 } else {
1273 detected_types[c] = PhysicalType::BYTE_ARRAY;
1274 detected_logical[c] = LogicalType::STRING;
1275 }
1276 }
1277
1278 // Build schema from detected types
1279 std::vector<ColumnDescriptor> col_descs;
1280 col_descs.reserve(num_cols);
1281 for (size_t c = 0; c < num_cols; ++c) {
1282 ColumnDescriptor cd;
1283 cd.name = col_names[c];
1284 cd.physical_type = detected_types[c];
1285 cd.logical_type = detected_logical[c];
1286 col_descs.push_back(std::move(cd));
1287 }
1288
1289 Schema schema("csv_data", std::move(col_descs));
1290
1291 // Open Parquet writer
1292 auto writer_result = ParquetWriter::open(parquet_output, schema, options);
1293 if (!writer_result) {
1294 return writer_result.error();
1295 }
1296 auto& writer = *writer_result;
1297
1298 // Write all rows
1299 for (const auto& row : rows) {
1300 auto result = writer.write_row(row);
1301 if (!result) {
1302 return result;
1303 }
1304 }
1305
1306 // Close
1307 auto close_result = writer.close();
1308 if (!close_result) return close_result.error();
1309 return expected<void>{};
1310 }
1311
1312private:
// Default constructor, declared in the private section: instances are not
// default-constructible from outside the class (the member index lists a
// static open() factory as the public entry point).
1314 ParquetWriter() = default;
1315
1316 // -- Internal state -------------------------------------------------------
1317
// Column layout; read by init_column_writers(), encode_pending_rows() and
// build_write_stats().
1318 Schema schema_;
// Writer configuration (encodings, bloom-filter columns, row-group size, ...).
1319 Options options_;
1320 std::filesystem::path path_;
// Output stream; every byte goes through write_raw().
1321 std::ofstream file_;
// Bytes written so far; advanced by write_raw() after each successful write
// and reported as WriteStats::file_size_bytes.
1322 int64_t file_offset_ = 0;
// One ColumnWriter per schema column, rebuilt by init_column_writers().
1323 std::vector<ColumnWriter> col_writers_;
// One counter per schema column, sized by init_column_writers().
1324 std::vector<int64_t> col_row_counts_;
// Rows buffered as strings until encode_pending_rows() drains them.
1325 std::vector<std::vector<std::string>> pending_rows_;
// Metadata of the row groups recorded so far (see row_groups_written()).
1326 std::vector<thrift::RowGroup> row_groups_;
// Row total excluding pending_rows_ (rows_written() adds the pending ones).
1327 int64_t total_rows_ = 0;
// True while the writer accepts data; cleared by close() and when moved from.
1328 bool open_ = false;
1329#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
1330 std::unique_ptr<crypto::FileEncryptor> encryptor_; // PME encryption (nullptr if none)
1331#endif
1332 std::vector<std::unique_ptr<SplitBlockBloomFilter>> bloom_filters_; // Per-column bloom filters
1333 std::vector<ColumnIndexBuilder> col_index_builders_; // Per-column page index builders
1334
1335 // -- Initialization -------------------------------------------------------
1336
1337 void init_column_writers() {
1338 col_writers_.clear();
1339 col_writers_.reserve(schema_.num_columns());
1340 col_row_counts_.resize(schema_.num_columns(), 0);
1341 for (size_t c = 0; c < schema_.num_columns(); ++c) {
1342 col_writers_.emplace_back(schema_.column(c).physical_type,
1343 schema_.column(c).type_length);
1344 }
1345 }
1346
1347 // -- Build WriteStats from accumulated row group metadata -----------------
1348
1349 WriteStats build_write_stats() const {
1350 WriteStats stats;
1351 stats.file_size_bytes = file_offset_;
1352 stats.total_rows = total_rows_;
1353 stats.total_row_groups = static_cast<int64_t>(row_groups_.size());
1354
1355 // Aggregate per-column stats across all row groups
1356 size_t num_cols = schema_.num_columns();
1357 stats.columns.resize(num_cols);
1358
1359 for (size_t c = 0; c < num_cols; ++c) {
1360 auto& col_stats = stats.columns[c];
1361 col_stats.column_name = schema_.column(c).name;
1362 col_stats.physical_type = schema_.column(c).physical_type;
1363 }
1364
1365 for (const auto& rg : row_groups_) {
1366 for (size_t c = 0; c < rg.columns.size() && c < num_cols; ++c) {
1367 if (!rg.columns[c].meta_data.has_value()) continue;
1368 const auto& cmd = *rg.columns[c].meta_data;
1369 auto& col_stats = stats.columns[c];
1370
1371 col_stats.uncompressed_bytes += cmd.total_uncompressed_size;
1372 col_stats.compressed_bytes += cmd.total_compressed_size;
1373 col_stats.num_values += cmd.num_values;
1374 col_stats.compression = cmd.codec;
1375
1376 // Use the first encoding in the list as the primary encoding
1377 if (!cmd.encodings.empty()) {
1378 col_stats.encoding = cmd.encodings[0];
1379 }
1380
1381 // Accumulate null count from statistics if available
1382 if (cmd.statistics.has_value() && cmd.statistics->null_count.has_value()) {
1383 col_stats.null_count += *cmd.statistics->null_count;
1384 }
1385
1386 stats.total_uncompressed_bytes += cmd.total_uncompressed_size;
1387 stats.total_compressed_bytes += cmd.total_compressed_size;
1388 }
1389 }
1390
1391 // Compute derived ratios
1392 if (stats.total_compressed_bytes > 0) {
1393 stats.compression_ratio = static_cast<double>(stats.total_uncompressed_bytes)
1394 / static_cast<double>(stats.total_compressed_bytes);
1395 }
1396 if (stats.total_rows > 0) {
1397 stats.bytes_per_row = static_cast<double>(stats.file_size_bytes)
1398 / static_cast<double>(stats.total_rows);
1399 }
1400
1401 return stats;
1402 }
1403
1404 // -- Raw I/O helpers ------------------------------------------------------
1405
1406 void write_raw(const uint8_t* data, size_t len) {
1407 file_.write(reinterpret_cast<const char*>(data), static_cast<std::streamsize>(len));
1408 if (!file_.good()) {
1409 throw std::runtime_error("ParquetWriter::write_raw: I/O error after writing "
1410 + std::to_string(len) + " bytes at offset "
1411 + std::to_string(file_offset_));
1412 }
1413 file_offset_ += static_cast<int64_t>(len);
1414 }
1415
1416 void write_raw_le32(uint32_t val) {
1417 uint8_t bytes[4];
1418 bytes[0] = static_cast<uint8_t>((val ) & 0xFF);
1419 bytes[1] = static_cast<uint8_t>((val >> 8) & 0xFF);
1420 bytes[2] = static_cast<uint8_t>((val >> 16) & 0xFF);
1421 bytes[3] = static_cast<uint8_t>((val >> 24) & 0xFF);
1422 write_raw(bytes, 4);
1423 }
1424
1425 // -- Encode pending rows into column writers ------------------------------
1426
1427 [[nodiscard]] expected<void> encode_pending_rows() {
1428 for (size_t c = 0; c < schema_.num_columns(); ++c) {
1429 const auto& col_desc = schema_.column(c);
1430 auto& cw = col_writers_[c];
1431
1432 // Lazily initialize bloom filter for this column on first data
1433 bool bf_active = bloom_ensure_filter(c);
1434
1435 for (const auto& row : pending_rows_) {
1436 const std::string& val = row[c];
1437
1438 switch (col_desc.physical_type) {
1439 case PhysicalType::BOOLEAN: {
1440 bool b = (val == "true" || val == "TRUE" || val == "True" ||
1441 val == "1");
1442 cw.write_bool(b);
1443 // Booleans: insert as int32 (0 or 1) to match Parquet convention
1444 if (bf_active) bloom_filters_[c]->insert_value(static_cast<int32_t>(b));
1445 break;
1446 }
1447 case PhysicalType::INT32: {
1448 int32_t parsed = 0;
1449 auto [ptr, ec] = std::from_chars(val.data(),
1450 val.data() + val.size(),
1451 parsed);
1452 if (ec != std::errc{}) {
1453 // Fallback: parse as double and truncate (locale-independent)
1454 parsed = static_cast<int32_t>(detail::parse_double(val));
1455 }
1456 cw.write_int32(parsed);
1457 if (bf_active) bloom_filters_[c]->insert_value(parsed);
1458 break;
1459 }
1460 case PhysicalType::INT64: {
1461 int64_t parsed = 0;
1462 auto [ptr, ec] = std::from_chars(val.data(),
1463 val.data() + val.size(),
1464 parsed);
1465 if (ec != std::errc{}) {
1466 // Fallback: parse as double and truncate (locale-independent)
1467 parsed = static_cast<int64_t>(detail::parse_double(val));
1468 }
1469 cw.write_int64(parsed);
1470 if (bf_active) bloom_filters_[c]->insert_value(parsed);
1471 break;
1472 }
1473 case PhysicalType::FLOAT: {
1474 float f = detail::parse_float(val);
1475 cw.write_float(f);
1476 if (bf_active) bloom_filters_[c]->insert_value(f);
1477 break;
1478 }
1479 case PhysicalType::DOUBLE: {
1480 double d = detail::parse_double(val);
1481 cw.write_double(d);
1482 if (bf_active) bloom_filters_[c]->insert_value(d);
1483 break;
1484 }
1486 cw.write_byte_array(val);
1487 if (bf_active) bloom_filters_[c]->insert_value(val);
1488 break;
1489 }
1491 const auto& cd = schema_.column(c);
1492 if (cd.type_length > 0 &&
1493 val.size() != static_cast<size_t>(cd.type_length)) {
1494 return Error{ErrorCode::SCHEMA_MISMATCH,
1495 "FIXED_LEN_BYTE_ARRAY column '" + cd.name +
1496 "': CSV value length " + std::to_string(val.size()) +
1497 " != type_length " + std::to_string(cd.type_length)};
1498 }
1499 cw.write_fixed_len_byte_array(
1500 reinterpret_cast<const uint8_t*>(val.data()), val.size());
1501 if (bf_active) bloom_filters_[c]->insert_value(val);
1502 break;
1503 }
1504 default: {
1505 // INT96 — write as raw bytes
1506 cw.write_byte_array(val);
1507 if (bf_active) bloom_filters_[c]->insert_value(val);
1508 break;
1509 }
1510 }
1511 }
1512 }
1513
1514 pending_rows_.clear();
1515 return expected<void>{};
1516 }
1517
1518 // -- Snappy auto-registration ---------------------------------------------
1519
1520 static void ensure_snappy_registered() {
1521 static std::once_flag flag;
1522 std::call_once(flag, [] { register_snappy_codec(); });
1523 }
1524
1525 // -- Encoding selection ---------------------------------------------------
1526
// Selects the encoding for one column, in priority order:
//   1. an explicit entry for the column name in options_.column_encodings;
//   2. when options_.auto_encoding is set, a choice based on the column's
//      physical type (and, in the branch using cw.statistics(), on the ratio
//      of distinct values to total values);
//   3. otherwise options_.default_encoding.
// col_index is not used for the decision (it is only void-cast below).
//
// NOTE(review): several lines of the auto-encoding switch are missing from
// this listing (the case labels and some return statements). The branch
// comments below mark where; the elided content must be confirmed against
// the real header before this function is modified.
1528 [[nodiscard]] Encoding choose_encoding(
1529 size_t col_index,
1530 const ColumnDescriptor& col_desc,
1531 const ColumnWriter& cw) const {
1532
1533 // 1. Per-column override takes priority
1534 auto it = options_.column_encodings.find(col_desc.name);
1535 if (it != options_.column_encodings.end()) {
1536 return it->second;
1537 }
1538
1539 // 2. Auto-encoding: pick optimal encoding based on type and data
1540 if (options_.auto_encoding) {
1541 switch (col_desc.physical_type) {
// NOTE(review): listing lines 1542-1544 and 1546-1548 are elided — presumably
// two case groups with their return statements; verify.
1545
1549
// NOTE(review): listing line 1550 (the case label opening this braced branch)
// is elided; verify which physical type it handles.
1551 // Check distinct ratio via statistics
1552 const auto& stats = cw.statistics();
// Only computed when a distinct count is available and the column is non-empty
// (avoids dividing by zero).
1553 if (stats.distinct_count().has_value() && cw.num_values() > 0) {
1554 double ratio = static_cast<double>(*stats.distinct_count()) /
1555 static_cast<double>(cw.num_values());
// Fewer than 40% distinct values takes the branch below.
// NOTE(review): its body (listing line 1557) is elided — presumably it returns
// the dictionary encoding; verify.
1556 if (ratio < 0.40) {
1558 }
1559 }
1560 // Fallback: estimate from data size vs value count
1561 // PLAIN BYTE_ARRAY has 4-byte length prefix per value, so
1562 // if many values share the same content, dict encoding wins.
1563 // Without stats, stay with PLAIN as a safe default.
1564 return Encoding::PLAIN;
1565 }
1566
// NOTE(review): listing line 1567 (the case label for this RLE return) is
// elided; verify.
1568 return Encoding::RLE;
1569
1570 default:
1571 return Encoding::PLAIN;
1572 }
1573 }
1574
1575 // 3. Use the global default encoding
1576 (void)col_index;
1577 return options_.default_encoding;
1578 }
1579
1580 // -- Column data encoding (non-dictionary) --------------------------------
1581
1585 [[nodiscard]] static std::vector<uint8_t> encode_column_data(
1586 const ColumnWriter& cw,
1587 Encoding encoding,
1588 PhysicalType type) {
1589
1590 const auto& plain_data = cw.data();
1591 int64_t num_vals = cw.num_values();
1592
1593 switch (encoding) {
1594
1596 if (type == PhysicalType::INT32) {
1597 // Extract int32 values from PLAIN buffer (4 bytes LE each)
1598 size_t count = static_cast<size_t>(num_vals);
1599 std::vector<int32_t> values(count);
1600 for (size_t i = 0; i < count; ++i) {
1601 std::memcpy(&values[i], plain_data.data() + i * 4, 4);
1602 }
1603 return delta::encode_int32(values.data(), count);
1604 }
1605 if (type == PhysicalType::INT64) {
1606 size_t count = static_cast<size_t>(num_vals);
1607 std::vector<int64_t> values(count);
1608 for (size_t i = 0; i < count; ++i) {
1609 std::memcpy(&values[i], plain_data.data() + i * 8, 8);
1610 }
1611 return delta::encode_int64(values.data(), count);
1612 }
1613 // Fallback: unsupported type for DELTA_BINARY_PACKED → return PLAIN
1614 return std::vector<uint8_t>(plain_data.begin(), plain_data.end());
1615 }
1616
1618 if (type == PhysicalType::FLOAT) {
1619 size_t count = static_cast<size_t>(num_vals);
1620 const auto* float_ptr = reinterpret_cast<const float*>(plain_data.data());
1621 return byte_stream_split::encode_float(float_ptr, count);
1622 }
1623 if (type == PhysicalType::DOUBLE) {
1624 size_t count = static_cast<size_t>(num_vals);
1625 const auto* double_ptr = reinterpret_cast<const double*>(plain_data.data());
1626 return byte_stream_split::encode_double(double_ptr, count);
1627 }
1628 // Fallback: unsupported type → return PLAIN
1629 return std::vector<uint8_t>(plain_data.begin(), plain_data.end());
1630 }
1631
1632 case Encoding::RLE: {
1633 // For BOOLEAN columns: RLE-encode the bit-packed booleans
1634 if (type == PhysicalType::BOOLEAN) {
1635 size_t count = static_cast<size_t>(num_vals);
1636 // Extract boolean values from the PLAIN bit-packed buffer
1637 std::vector<uint32_t> bool_vals(count);
1638 for (size_t i = 0; i < count; ++i) {
1639 size_t byte_idx = i / 8;
1640 size_t bit_idx = i % 8;
1641 bool_vals[i] = (plain_data[byte_idx] >> bit_idx) & 1;
1642 }
1643 // RLE-encode with bit_width=1, with 4-byte length prefix
1644 return RleEncoder::encode_with_length(bool_vals.data(), count, 1);
1645 }
1646 // Fallback: return PLAIN
1647 return std::vector<uint8_t>(plain_data.begin(), plain_data.end());
1648 }
1649
1650 case Encoding::PLAIN:
1651 default:
1652 // Return a copy of the PLAIN data
1653 return std::vector<uint8_t>(plain_data.begin(), plain_data.end());
1654 }
1655 }
1656
1657 // -- Dictionary encoding helper -------------------------------------------
1658
1660 struct DictColumnResult {
1661 int64_t total_uncompressed;
1662 int64_t total_compressed;
1663 std::unordered_set<Encoding> used_encodings;
1664 int64_t dict_page_offset;
1665 int64_t data_page_offset;
1666 };
1667
1669 [[nodiscard]] static std::vector<std::string> extract_byte_array_strings(
1670 const ColumnWriter& cw) {
1671 const auto& buf = cw.data();
1672 size_t count = static_cast<size_t>(cw.num_values());
1673 std::vector<std::string> result;
1674 result.reserve(count);
1675 const size_t buf_size = buf.size();
1676 size_t pos = 0;
1677 for (size_t i = 0; i < count; ++i) {
1678 // CWE-125: Out-of-bounds Read — ensure 4-byte length prefix is within buffer
1679 if (pos + 4 > buf_size) break;
1680 uint32_t len = 0;
1681 std::memcpy(&len, buf.data() + pos, 4);
1682 // CWE-190: Integer Overflow — subtraction avoids unsigned wrap on crafted len
1683 if (len > buf_size - pos - 4) break;
1684 pos += 4;
1685 result.emplace_back(
1686 reinterpret_cast<const char*>(buf.data() + pos), len);
1687 pos += len;
1688 }
1689 return result;
1690 }
1691
1693 [[nodiscard]] static std::vector<int32_t> extract_int32_values(
1694 const ColumnWriter& cw) {
1695 size_t count = static_cast<size_t>(cw.num_values());
1696 std::vector<int32_t> result(count);
1697 for (size_t i = 0; i < count; ++i) {
1698 std::memcpy(&result[i], cw.data().data() + i * 4, 4);
1699 }
1700 return result;
1701 }
1702
1704 [[nodiscard]] static std::vector<int64_t> extract_int64_values(
1705 const ColumnWriter& cw) {
1706 size_t count = static_cast<size_t>(cw.num_values());
1707 std::vector<int64_t> result(count);
1708 for (size_t i = 0; i < count; ++i) {
1709 std::memcpy(&result[i], cw.data().data() + i * 8, 8);
1710 }
1711 return result;
1712 }
1713
1715 [[nodiscard]] static std::vector<float> extract_float_values(
1716 const ColumnWriter& cw) {
1717 size_t count = static_cast<size_t>(cw.num_values());
1718 std::vector<float> result(count);
1719 for (size_t i = 0; i < count; ++i) {
1720 std::memcpy(&result[i], cw.data().data() + i * 4, 4);
1721 }
1722 return result;
1723 }
1724
1726 [[nodiscard]] static std::vector<double> extract_double_values(
1727 const ColumnWriter& cw) {
1728 size_t count = static_cast<size_t>(cw.num_values());
1729 std::vector<double> result(count);
1730 for (size_t i = 0; i < count; ++i) {
1731 std::memcpy(&result[i], cw.data().data() + i * 8, 8);
1732 }
1733 return result;
1734 }
1735
// Writes one column chunk in dictionary form: a dictionary page followed by a
// single data page holding the RLE_DICTIONARY indices.
//
// Steps, in file order:
//   1. Build dictionary and index payloads with a DictionaryEncoder for the
//      column's physical type.
//   2. Dictionary page: optionally compress with col_codec, optionally encrypt
//      (commercial builds), serialize the Thrift page header, write both.
//   3. Data page: same sequence for the index payload; in commercial builds
//      the page header itself may also be encrypted and wrapped.
//
// Parameters:
//   col_index  unused (name commented out in the signature)
//   col_desc   descriptor of the column (type, name)
//   cw         column writer holding the PLAIN-encoded values
//   col_codec  compression codec applied to both pages
//
// Returns a DictColumnResult with both page offsets, the encodings used and
// the uncompressed/compressed byte totals (headers included), or the Error
// reported by compress()/the encryptor, or an Error for unsupported types.
//
// NOTE(review): the actual writes go through write_raw(), which throws
// std::runtime_error on a stream failure — I/O errors therefore surface as
// exceptions, not through the expected<> return value.
1738 [[nodiscard]] expected<DictColumnResult> write_dictionary_column(
1739 size_t /*col_index*/,
1740 const ColumnDescriptor& col_desc,
1741 const ColumnWriter& cw,
1742 Compression col_codec) {
1743
1744 DictColumnResult info;
1745 info.total_uncompressed = 0;
1746 info.total_compressed = 0;
// NOTE(review): PLAIN is recorded for the dictionary page here, while the
// dictionary page header further down advertises Encoding::PLAIN_DICTIONARY —
// confirm that the difference is intended.
1747 info.used_encodings.insert(Encoding::PLAIN); // dictionary page
1748 info.used_encodings.insert(Encoding::RLE_DICTIONARY); // data page
1749
1750 // Build the dictionary based on physical type
1751 std::vector<uint8_t> dict_page_data;
1752 std::vector<uint8_t> indices_page_data;
1753 int32_t num_dict_entries = 0;
1754
1755 switch (col_desc.physical_type) {
// NOTE(review): the case label opening this branch (listing line 1756) is
// missing from this listing — presumably the BYTE_ARRAY case, given the call
// to extract_byte_array_strings(); verify.
1757 auto strings = extract_byte_array_strings(cw);
1758 DictionaryEncoder<std::string> enc;
1759 for (const auto& s : strings) enc.put(s);
1760 enc.flush();
1761 dict_page_data = enc.dictionary_page();
1762 indices_page_data = enc.indices_page();
1763 num_dict_entries = static_cast<int32_t>(enc.dictionary_size());
1764 break;
1765 }
1766 case PhysicalType::INT32: {
1767 auto vals = extract_int32_values(cw);
1768 DictionaryEncoder<int32_t> enc;
1769 for (auto v : vals) enc.put(v);
1770 enc.flush();
1771 dict_page_data = enc.dictionary_page();
1772 indices_page_data = enc.indices_page();
1773 num_dict_entries = static_cast<int32_t>(enc.dictionary_size());
1774 break;
1775 }
1776 case PhysicalType::INT64: {
1777 auto vals = extract_int64_values(cw);
1778 DictionaryEncoder<int64_t> enc;
1779 for (auto v : vals) enc.put(v);
1780 enc.flush();
1781 dict_page_data = enc.dictionary_page();
1782 indices_page_data = enc.indices_page();
1783 num_dict_entries = static_cast<int32_t>(enc.dictionary_size());
1784 break;
1785 }
1786 case PhysicalType::FLOAT: {
1787 auto vals = extract_float_values(cw);
1788 DictionaryEncoder<float> enc;
1789 for (auto v : vals) enc.put(v);
1790 enc.flush();
1791 dict_page_data = enc.dictionary_page();
1792 indices_page_data = enc.indices_page();
1793 num_dict_entries = static_cast<int32_t>(enc.dictionary_size());
1794 break;
1795 }
1796 case PhysicalType::DOUBLE: {
1797 auto vals = extract_double_values(cw);
1798 DictionaryEncoder<double> enc;
1799 for (auto v : vals) enc.put(v);
1800 enc.flush();
1801 dict_page_data = enc.dictionary_page();
1802 indices_page_data = enc.indices_page();
1803 num_dict_entries = static_cast<int32_t>(enc.dictionary_size());
1804 break;
1805 }
1806 default:
1807 // Unsupported type for dictionary encoding — fall back to PLAIN
1808 // This shouldn't normally happen.
// NOTE(review): the start of this return statement (listing line 1809, with
// the error code) is missing from this listing. Despite the comment above,
// the visible remainder returns an Error rather than falling back to PLAIN.
1810 "Dictionary encoding not supported for this type"};
1811 }
1812
1813 // ---- Write dictionary page -----------------------------------------
// NOTE(review): page sizes are narrowed to int32_t without a range check; a
// payload above INT32_MAX bytes would not be represented correctly.
1814 int32_t dict_uncompressed_size = static_cast<int32_t>(dict_page_data.size());
// dict_write_data / dict_write_size always describe the bytes that will hit
// the file: raw, then compressed, then encrypted, whichever applies last.
1815 const uint8_t* dict_write_data = dict_page_data.data();
1816 size_t dict_write_size = dict_page_data.size();
1817 std::vector<uint8_t> dict_compressed_buf;
1818 int32_t dict_compressed_size = dict_uncompressed_size;
1819
1820 if (col_codec != Compression::UNCOMPRESSED) {
1821 auto comp = compress(col_codec, dict_page_data.data(), dict_page_data.size());
1822 if (!comp) return comp.error();
1823 dict_compressed_buf = std::move(*comp);
1824 dict_compressed_size = static_cast<int32_t>(dict_compressed_buf.size());
1825 dict_write_data = dict_compressed_buf.data();
1826 dict_write_size = dict_compressed_buf.size();
1827 }
1828
1829 // Encrypt dictionary page if PME is configured for this column
1830#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
1831 std::vector<uint8_t> dict_encrypted_buf;
1832 if (encryptor_ && encryptor_->is_column_encrypted(col_desc.name)) {
// The current row_groups_.size() is passed as the row-group ordinal.
1833 auto enc_result = encryptor_->encrypt_dict_page(
1834 dict_write_data, dict_write_size, col_desc.name,
1835 static_cast<int32_t>(row_groups_.size()));
1836 if (!enc_result) return enc_result.error();
1837 dict_encrypted_buf = std::move(*enc_result);
// After encryption the "compressed" size is the size of the encrypted bytes.
1838 dict_compressed_size = static_cast<int32_t>(dict_encrypted_buf.size());
1839 dict_write_data = dict_encrypted_buf.data();
1840 dict_write_size = dict_encrypted_buf.size();
1841 }
1842#endif
1843
1844 thrift::PageHeader dict_ph;
1845 dict_ph.type = PageType::DICTIONARY_PAGE;
1846 dict_ph.uncompressed_page_size = dict_uncompressed_size;
1847 dict_ph.compressed_page_size = dict_compressed_size;
// The CRC covers the bytes exactly as written (after compression/encryption).
1848 dict_ph.crc = static_cast<int32_t>(detail::writer::page_crc32(
1849 dict_write_data, dict_write_size));
1850
1851 thrift::DictionaryPageHeader dph;
1852 dph.num_values = num_dict_entries;
1853 dph.encoding = Encoding::PLAIN_DICTIONARY;
1854 dict_ph.dictionary_page_header = dph;
1855
1856 thrift::CompactEncoder dict_header_enc;
1857 dict_ph.serialize(dict_header_enc);
1858 const auto& dict_header_bytes = dict_header_enc.data();
1859
// Offset of the dictionary page = position of its header in the file.
1860 info.dict_page_offset = file_offset_;
1861
1862 write_raw(dict_header_bytes.data(), dict_header_bytes.size());
1863 write_raw(dict_write_data, dict_write_size);
1864
// Both totals include the serialized page header.
1865 info.total_uncompressed += static_cast<int64_t>(dict_header_bytes.size()) +
1866 static_cast<int64_t>(dict_uncompressed_size);
1867 info.total_compressed += static_cast<int64_t>(dict_header_bytes.size()) +
1868 static_cast<int64_t>(dict_compressed_size);
1869
1870 // ---- Write data page (RLE_DICTIONARY indices) ----------------------
1871 int32_t idx_uncompressed_size = static_cast<int32_t>(indices_page_data.size());
1872 const uint8_t* idx_write_data = indices_page_data.data();
1873 size_t idx_write_size = indices_page_data.size();
1874 std::vector<uint8_t> idx_compressed_buf;
1875 int32_t idx_compressed_size = idx_uncompressed_size;
1876
1877 if (col_codec != Compression::UNCOMPRESSED) {
1878 auto comp = compress(col_codec, indices_page_data.data(), indices_page_data.size());
1879 if (!comp) return comp.error();
1880 idx_compressed_buf = std::move(*comp);
1881 idx_compressed_size = static_cast<int32_t>(idx_compressed_buf.size());
1882 idx_write_data = idx_compressed_buf.data();
1883 idx_write_size = idx_compressed_buf.size();
1884 }
1885
1886 // Encrypt indices page if PME is configured for this column
1887#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
1888 std::vector<uint8_t> idx_encrypted_buf;
1889 if (encryptor_ && encryptor_->is_column_encrypted(col_desc.name)) {
// The trailing 0 is passed as the page ordinal (a single data page is written).
1890 auto enc_result = encryptor_->encrypt_column_page(
1891 idx_write_data, idx_write_size, col_desc.name,
1892 static_cast<int32_t>(row_groups_.size()), 0);
1893 if (!enc_result) return enc_result.error();
1894 idx_encrypted_buf = std::move(*enc_result);
1895 idx_compressed_size = static_cast<int32_t>(idx_encrypted_buf.size());
1896 idx_write_data = idx_encrypted_buf.data();
1897 idx_write_size = idx_encrypted_buf.size();
1898 }
1899#endif
1900
1901 thrift::PageHeader idx_ph;
1902 idx_ph.type = PageType::DATA_PAGE;
1903 idx_ph.uncompressed_page_size = idx_uncompressed_size;
1904 idx_ph.compressed_page_size = idx_compressed_size;
1905 idx_ph.crc = static_cast<int32_t>(detail::writer::page_crc32(
1906 idx_write_data, idx_write_size));
1907
1908 thrift::DataPageHeader idx_dph;
1909 idx_dph.num_values = static_cast<int32_t>(cw.num_values());
1910 idx_dph.encoding = Encoding::RLE_DICTIONARY;
1911 idx_dph.definition_level_encoding = Encoding::RLE;
1912 idx_dph.repetition_level_encoding = Encoding::RLE;
1913 idx_ph.data_page_header = idx_dph;
1914
1915 thrift::CompactEncoder idx_header_enc;
1916 idx_ph.serialize(idx_header_enc);
1917 const auto& raw_idx_header_bytes = idx_header_enc.data();
1918
// Header bytes as they will be written: the plain Thrift header, or (in
// commercial builds, for encrypted columns) the wrapped encrypted header.
1919 const uint8_t* idx_header_write_data = raw_idx_header_bytes.data();
1920 size_t idx_header_write_size = raw_idx_header_bytes.size();
1921#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
1922 std::vector<uint8_t> idx_header_record_buf;
1923 if (encryptor_ && encryptor_->is_column_encrypted(col_desc.name)) {
1924 auto enc_header = encryptor_->encrypt_data_page_header(
1925 raw_idx_header_bytes.data(), raw_idx_header_bytes.size(),
1926 col_desc.name, static_cast<int32_t>(row_groups_.size()), 0);
1927 if (!enc_header) return enc_header.error();
1928 idx_header_record_buf = detail::writer::wrap_encrypted_page_header(*enc_header);
1929 idx_header_write_data = idx_header_record_buf.data();
1930 idx_header_write_size = idx_header_record_buf.size();
1931 }
1932#endif
1933
1934 info.data_page_offset = file_offset_;
1935
1936 write_raw(idx_header_write_data, idx_header_write_size);
1937 write_raw(idx_write_data, idx_write_size);
1938
// The header size counted here is the size actually written, i.e. the wrapped
// encrypted header when encryption applied.
1939 info.total_uncompressed += static_cast<int64_t>(idx_header_write_size) +
1940 static_cast<int64_t>(idx_uncompressed_size);
1941 info.total_compressed += static_cast<int64_t>(idx_header_write_size) +
1942 static_cast<int64_t>(idx_compressed_size);
1943
1944 return info;
1945 }
1946
1947 // -- Bloom filter helpers -------------------------------------------------
1948
1952 bool bloom_ensure_filter(size_t col_index) {
1953 if (bloom_filters_.empty()) return false;
1954 if (bloom_filters_[col_index]) return true; // Already initialized
1955
1956 // Check if this column should have a bloom filter
1957 const auto& col_name = schema_.column(col_index).name;
1958 if (!options_.bloom_filter_columns.empty() &&
1959 options_.bloom_filter_columns.find(col_name) ==
1960 options_.bloom_filter_columns.end()) {
1961 return false;
1962 }
1963
1964 // Initialize with row_group_size as the expected NDV estimate
1965 size_t ndv_estimate = static_cast<size_t>(
1966 (std::max)(int64_t{1}, options_.row_group_size));
1967 bloom_filters_[col_index] = std::make_unique<SplitBlockBloomFilter>(
1968 ndv_estimate, options_.bloom_filter_fpr);
1969 return true;
1970 }
1971
1973 template <typename T>
1974 void bloom_insert_typed(size_t col_index, const T* values, size_t count) {
1975 if (!bloom_ensure_filter(col_index)) return;
1976 auto& bf = bloom_filters_[col_index];
1977 for (size_t i = 0; i < count; ++i) {
1978 bf->insert_value(values[i]);
1979 }
1980 }
1981
1982 // -- CSV parsing helper ---------------------------------------------------
1983
/// Splits one CSV line into fields.
///
/// Rules:
///  - ',' separates fields, except inside double quotes.
///  - Inside quotes, "" stands for a literal quote character.
///  - '\r' outside quotes is dropped.
///  - Whitespace (space, tab, CR, LF) is trimmed from the unquoted parts at
///    both ends of a field. Text that was inside quotes is kept verbatim,
///    including its leading and trailing whitespace.
///  - An unterminated quote runs to the end of the line.
///
/// @param line  One line of CSV text, without its line terminator.
/// @return The fields in order; always at least one (possibly empty) field.
[[nodiscard]] static std::vector<std::string> split_csv_line(const std::string& line) {
    const auto is_blank = [](char ch) {
        return ch == ' ' || ch == '\t' || ch == '\r' || ch == '\n';
    };

    std::vector<std::string> fields;
    std::string field;
    bool in_quotes = false;
    // Extent of quoted text inside `field`; trimming never crosses into it.
    size_t first_quoted = std::string::npos;  // offset where quoted text starts
    size_t last_quoted = 0;                   // one past the last quoted char

    // Trims the unquoted edges of the current field and appends it.
    const auto finish_field = [&] {
        size_t end = field.size();
        while (end > last_quoted && is_blank(field[end - 1])) --end;
        field.erase(end);

        const size_t lead_limit = (std::min)(first_quoted, field.size());
        size_t begin = 0;
        while (begin < lead_limit && is_blank(field[begin])) ++begin;
        field.erase(0, begin);

        fields.push_back(std::move(field));
        field.clear();
        first_quoted = std::string::npos;
        last_quoted = 0;
    };

    size_t i = 0;
    while (i < line.size()) {
        const char ch = line[i];

        if (in_quotes) {
            if (ch != '"') {
                field += ch;
                ++i;
            } else if (i + 1 < line.size() && line[i + 1] == '"') {
                // Escaped quote ("")
                field += '"';
                i += 2;
            } else {
                // End of quoted section
                in_quotes = false;
                last_quoted = field.size();
                ++i;
            }
            continue;
        }

        if (ch == '"') {
            in_quotes = true;
            if (first_quoted == std::string::npos) first_quoted = field.size();
        } else if (ch == ',') {
            finish_field();
        } else if (ch != '\r') {  // carriage returns outside quotes are skipped
            field += ch;
        }
        ++i;
    }

    // An unterminated quote protects everything up to the end of the line.
    if (in_quotes) last_quoted = field.size();

    // Add the last field
    finish_field();

    return fields;
}
2036
/// Removes leading and trailing whitespace (space, tab, CR, LF) from @p s in
/// place; a string consisting only of whitespace becomes empty.
static void trim_string(std::string& s) {
    static constexpr char kWhitespace[] = " \t\r\n";

    const size_t last = s.find_last_not_of(kWhitespace);
    if (last == std::string::npos) {
        // Empty or whitespace-only.
        s.clear();
        return;
    }
    s.erase(last + 1);                              // drop the tail
    s.erase(0, s.find_first_not_of(kWhitespace));   // drop the head
}
2047};
2048
2049} // namespace signet::forge
BYTE_STREAM_SPLIT encoding and decoding (Parquet encoding type 9).
Streaming Parquet file writer with row-based and column-based APIs.
Definition writer.hpp:280
expected< void > write_row(const std::vector< std::string > &values)
Write a single row as a vector of string values.
Definition writer.hpp:368
static expected< void > csv_to_parquet(const std::filesystem::path &csv_input, const std::filesystem::path &parquet_output, const Options &options=Options{})
Convert a CSV file to a Parquet file.
Definition writer.hpp:1177
ParquetWriter(const ParquetWriter &)=delete
Deleted copy constructor. ParquetWriter is move-only.
expected< WriteStats > close()
Close the file and finalize the Parquet footer.
Definition writer.hpp:881
ParquetWriter & operator=(const ParquetWriter &)=delete
Deleted copy-assignment operator. ParquetWriter is move-only.
size_t num_columns() const noexcept
Returns the number of columns in the writer's schema.
Definition writer.hpp:393
int64_t rows_written() const
Returns the total number of rows written so far.
Definition writer.hpp:1138
WriterOptions Options
Alias for WriterOptions, usable as ParquetWriter::Options.
Definition writer.hpp:283
ParquetWriter(ParquetWriter &&other) noexcept
Move constructor.
Definition writer.hpp:1074
bool is_open() const
Returns whether the writer is open and accepting data.
Definition writer.hpp:1151
expected< void > write_column(size_t col_index, const std::string *values, size_t count)
Write a batch of string values to a BYTE_ARRAY column.
Definition writer.hpp:467
expected< void > flush_row_group()
Flush the current row group to disk.
Definition writer.hpp:532
ParquetWriter & operator=(ParquetWriter &&other) noexcept
Move-assignment operator.
Definition writer.hpp:1099
static expected< ParquetWriter > open(const std::filesystem::path &path, const Schema &schema, const Options &options=Options{})
Open a new Parquet file for writing.
Definition writer.hpp:303
int64_t row_groups_written() const
Returns the number of row groups that have been flushed to disk.
Definition writer.hpp:1145
expected< void > write_column(size_t col_index, const T *values, size_t count)
Write a batch of typed values to a single column.
Definition writer.hpp:419
static std::vector< uint8_t > encode_with_length(const uint32_t *values, size_t count, int bit_width)
Encode with a 4-byte little-endian length prefix.
Definition rle.hpp:371
Immutable schema description for a Parquet file.
Definition schema.hpp:192
size_t num_columns() const
Number of columns in this schema.
Definition schema.hpp:238
const std::string & name() const
Root schema name (e.g. "tick_data").
Definition schema.hpp:235
const ColumnDescriptor & column(size_t index) const
Access a column descriptor by index.
Definition schema.hpp:244
A lightweight result type that holds either a success value of type T or an Error.
Definition error.hpp:143
Thrift Compact Protocol writer.
Definition compact.hpp:72
const std::vector< uint8_t > & data() const
Returns a const reference to the underlying byte buffer.
Definition compact.hpp:200
Compression codec interface and registry for Signet Forge.
ColumnIndex, OffsetIndex, and ColumnIndexBuilder for predicate pushdown.
PLAIN encoding writer for all Parquet physical types.
Thrift Compact Protocol encoder and decoder for Parquet metadata serialization.
DELTA_BINARY_PACKED encoding and decoding (Parquet encoding type 5).
Dictionary encoding and decoding for Parquet (PLAIN_DICTIONARY / RLE_DICTIONARY).
std::vector< uint8_t > encode_float(const float *values, size_t count)
Encode float values using the BYTE_STREAM_SPLIT algorithm.
std::vector< uint8_t > encode_double(const double *values, size_t count)
Encode double values using the BYTE_STREAM_SPLIT algorithm.
std::vector< uint8_t > encode_int32(const int32_t *values, size_t count)
Encode int32 values using the DELTA_BINARY_PACKED algorithm.
Definition delta.hpp:408
std::vector< uint8_t > encode_int64(const int64_t *values, size_t count)
Encode int64 values using the DELTA_BINARY_PACKED algorithm.
Definition delta.hpp:298
constexpr uint8_t kEncryptedPageHeaderMagic[4]
Definition writer.hpp:160
uint32_t page_crc32(const uint8_t *data, size_t length) noexcept
CRC-32 (polynomial 0xEDB88320) for page integrity (Parquet PageHeader.crc).
Definition writer.hpp:142
std::vector< uint8_t > wrap_encrypted_page_header(const std::vector< uint8_t > &encrypted_header)
Definition writer.hpp:162
double parse_double(std::string_view sv) noexcept
Definition writer.hpp:84
float parse_float(std::string_view sv) noexcept
Definition writer.hpp:102
bool try_parse_double(std::string_view sv, double &out) noexcept
Try parsing a string_view as double; returns true on full parse success.
Definition writer.hpp:119
Compression auto_select_compression(const uint8_t *sample_data, size_t sample_size)
Automatically select the best available compression codec.
Definition codec.hpp:270
constexpr const char * SIGNET_CREATED_BY
Default "created_by" string embedded in every Parquet footer.
Definition types.hpp:203
PhysicalType
Parquet physical (storage) types as defined in parquet.thrift.
Definition types.hpp:20
@ FIXED_LEN_BYTE_ARRAY
Fixed-length byte array (UUID, vectors, decimals).
@ INT64
64-bit signed integer (little-endian).
@ INT32
32-bit signed integer (little-endian).
@ BOOLEAN
1-bit boolean, bit-packed in pages.
@ BYTE_ARRAY
Variable-length byte sequence (strings, binary).
@ FLOAT
IEEE 754 single-precision float.
@ DOUBLE
IEEE 754 double-precision float.
constexpr int32_t PARQUET_VERSION
Parquet format version written to the file footer.
Definition types.hpp:201
constexpr uint32_t PARQUET_MAGIC_ENCRYPTED
"PARE" magic bytes (little-endian uint32) — marks a Parquet file with an encrypted footer.
Definition types.hpp:207
Compression
Parquet compression codecs.
Definition types.hpp:115
@ UNCOMPRESSED
No compression.
@ TIMESTAMP_MILLIS
Timestamp in milliseconds.
@ DECIMAL
Fixed-point decimal.
@ TIMESTAMP_MICROS
Timestamp in microseconds.
@ DATE
Date (days since epoch).
@ UTF8
UTF-8 encoded string.
void register_snappy_codec()
Register the bundled Snappy codec with the global CodecRegistry.
Definition snappy.hpp:608
expected< std::vector< uint8_t > > compress(Compression codec, const uint8_t *data, size_t size)
Compress data using the specified codec via the global CodecRegistry.
Definition codec.hpp:183
@ JSON
JSON document (stored as BYTE_ARRAY).
@ DECIMAL
Fixed-point decimal (INT32/INT64/FIXED_LEN_BYTE_ARRAY).
@ DATE
Calendar date — INT32, days since 1970-01-01.
@ STRING
UTF-8 string (stored as BYTE_ARRAY).
@ ENUM
Enum string (stored as BYTE_ARRAY).
@ NONE
No logical annotation — raw physical type.
@ TIMESTAMP_MS
Timestamp — INT64, milliseconds since Unix epoch.
@ TIMESTAMP_US
Timestamp — INT64, microseconds since Unix epoch.
constexpr uint32_t PARQUET_MAGIC
"PAR1" magic bytes (little-endian uint32) — marks a standard Parquet file.
Definition types.hpp:205
@ IO_ERROR
A file-system or stream I/O operation failed (open, read, write, rename).
@ OUT_OF_RANGE
An index, offset, or size value is outside the valid range.
@ SCHEMA_MISMATCH
The requested column name or type does not match the file schema.
@ INVALID_FILE
The file is not a valid Parquet file (e.g. missing or wrong magic bytes).
@ UNSUPPORTED_ENCODING
The file uses an encoding not supported by this build (e.g. BYTE_STREAM_SPLIT on integers).
@ INTERNAL_ERROR
An unexpected internal error that does not fit any other category.
@ INVALID_ARGUMENT
A caller-supplied argument is outside the valid range or violates a precondition.
Encoding
Parquet page encoding types.
Definition types.hpp:98
@ DELTA_BINARY_PACKED
Delta encoding for INT32/INT64 (compact for sorted/sequential data).
@ RLE
Run-length / bit-packed hybrid (used for booleans and def/rep levels).
@ RLE_DICTIONARY
Modern dictionary encoding (Parquet 2.0) — dict page + RLE indices.
@ PLAIN_DICTIONARY
Legacy dictionary encoding (Parquet 1.0).
@ PLAIN
Values stored back-to-back in their native binary layout.
@ BYTE_STREAM_SPLIT
Byte-stream split for FLOAT/DOUBLE (transposes byte lanes for better compression).
@ DICTIONARY_PAGE
Dictionary page — contains the value dictionary for RLE_DICTIONARY columns.
@ DATA_PAGE
Data page (Parquet 1.0 format).
Parquet Modular Encryption (PME) orchestrator – encrypts and decrypts Parquet file components (footer...
RLE/Bit-Packing Hybrid encoding and decoding (Parquet spec).
Schema definition types: Column<T>, SchemaBuilder, and Schema.
Bundled, zero-dependency, header-only Snappy compression codec.
Split Block Bloom Filter as specified by the Apache Parquet format.
Per-column-chunk statistics tracker and little-endian byte helpers.
std::string name
Column name (unique within a schema).
Definition types.hpp:153
PhysicalType physical_type
On-disk storage type.
Definition types.hpp:154
Lightweight error value carrying an ErrorCode and a human-readable message.
Definition error.hpp:99
File-level write statistics returned by ParquetWriter::close().
Definition types.hpp:227
Configuration options for ParquetWriter.
Definition writer.hpp:188
int64_t row_group_size
Target number of rows per row group.
Definition writer.hpp:192
std::vector< thrift::KeyValue > file_metadata
Custom key-value metadata pairs embedded in the Parquet footer.
Definition writer.hpp:198
std::string created_by
Value written into the Parquet footer's "created_by" field.
Definition writer.hpp:195
Encoding default_encoding
Default encoding applied to every column that does not have a per-column override in column_encodings...
Definition writer.hpp:204
std::unordered_map< std::string, Encoding > column_encodings
Per-column encoding overrides keyed by column name.
Definition writer.hpp:212
bool auto_encoding
When true, the writer automatically selects the best encoding for each column based on its physical t...
Definition writer.hpp:218
bool auto_compression
When true, the writer samples page data and selects the most effective compression codec automaticall...
Definition writer.hpp:222
double bloom_filter_fpr
Target false-positive rate for bloom filters. Default: 1 %.
Definition writer.hpp:237
bool enable_page_index
When true, a ColumnIndex and OffsetIndex are written for each column chunk, enabling predicate pushdo...
Definition writer.hpp:228
Compression compression
Compression codec applied to every data and dictionary page.
Definition writer.hpp:208
std::unordered_set< std::string > bloom_filter_columns
Column names for which bloom filters should be generated.
Definition writer.hpp:242
bool enable_bloom_filter
When true, a Split Block Bloom Filter is written for each column (or for the subset named in bloom_fi...
Definition writer.hpp:234
Parquet column chunk descriptor (parquet.thrift fields 1-13).
Definition types.hpp:1884
std::optional< int64_t > column_index_offset
Column index offset (field 10).
Definition types.hpp:1892
int64_t file_offset
Byte offset in file (field 2).
Definition types.hpp:1886
std::optional< int64_t > bloom_filter_offset
Bloom filter offset (field 8).
Definition types.hpp:1890
std::optional< int64_t > offset_index_offset
Offset index offset (field 12).
Definition types.hpp:1894
std::optional< int32_t > offset_index_length
Offset index byte length (field 13).
Definition types.hpp:1895
std::optional< int32_t > column_index_length
Column index byte length (field 11).
Definition types.hpp:1893
std::optional< ColumnMetaData > meta_data
Inline column metadata (field 3).
Definition types.hpp:1887
std::optional< int32_t > bloom_filter_length
Bloom filter byte length (field 9).
Definition types.hpp:1891
Parquet column metadata (parquet.thrift fields 1-12).
Definition types.hpp:1056
std::optional< Statistics > statistics
Definition types.hpp:1068
std::optional< int64_t > dictionary_page_offset
Definition types.hpp:1067
std::vector< Encoding > encodings
Definition types.hpp:1058
std::vector< std::string > path_in_schema
Definition types.hpp:1059
Parquet data page header V1 (parquet.thrift fields 1-5).
Definition types.hpp:674
int32_t num_values
Number of values (field 1, required).
Definition types.hpp:675
Encoding repetition_level_encoding
Rep level encoding (field 4, required).
Definition types.hpp:678
Encoding definition_level_encoding
Def level encoding (field 3, required).
Definition types.hpp:677
Encoding encoding
Data encoding (field 2, required).
Definition types.hpp:676
Parquet file metadata (parquet.thrift fields 1-7).
Definition types.hpp:2265
std::vector< RowGroup > row_groups
Definition types.hpp:2269
std::optional< std::string > created_by
Definition types.hpp:2271
std::optional< std::vector< KeyValue > > key_value_metadata
Definition types.hpp:2270
void serialize(CompactEncoder &enc) const
Definition types.hpp:2276
std::vector< SchemaElement > schema
Definition types.hpp:2267
Parquet KeyValue metadata entry (parquet.thrift field IDs 1-2).
Definition types.hpp:468
std::optional< std::string > value
Metadata value (field 2, optional).
Definition types.hpp:470
std::string key
Metadata key (field 1, required).
Definition types.hpp:469
Parquet page header (parquet.thrift fields 1-8).
Definition types.hpp:933
std::optional< int32_t > crc
Definition types.hpp:937
void serialize(CompactEncoder &enc) const
Definition types.hpp:945
std::optional< DataPageHeader > data_page_header
Definition types.hpp:938
Parquet row group (parquet.thrift fields 1-4).
Definition types.hpp:2156
std::vector< ColumnChunk > columns
Column chunks (field 1).
Definition types.hpp:2157
int64_t total_byte_size
Total byte size (field 2).
Definition types.hpp:2158
int64_t num_rows
Number of rows (field 3).
Definition types.hpp:2159
Parquet schema element (parquet.thrift fields 1-10).
Definition types.hpp:530
std::optional< int32_t > type_length
Type length for FIXED_LEN_BYTE_ARRAY (field 2).
Definition types.hpp:532
std::optional< ConvertedType > converted_type
Legacy converted type (field 6).
Definition types.hpp:536
std::string name
Column or group name (field 4, required).
Definition types.hpp:534
std::optional< Repetition > repetition_type
REQUIRED/OPTIONAL/REPEATED (field 3).
Definition types.hpp:533
std::optional< int32_t > num_children
Number of children for group nodes (field 5).
Definition types.hpp:535
std::optional< int32_t > scale
Decimal scale (field 7).
Definition types.hpp:537
std::optional< int32_t > precision
Decimal precision (field 8).
Definition types.hpp:538
std::optional< PhysicalType > type
Physical type (field 1, absent for group nodes).
Definition types.hpp:531
Parquet column statistics (parquet.thrift fields 1-6).
Definition types.hpp:369
std::optional< std::string > min
Old-style min (field 2, deprecated).
Definition types.hpp:371
std::optional< std::string > max_value
New-style max value (field 5, preferred).
Definition types.hpp:374
std::optional< int64_t > null_count
Number of null values (field 3).
Definition types.hpp:372
std::optional< int64_t > distinct_count
Approximate distinct count (field 4).
Definition types.hpp:373
std::optional< std::string > min_value
New-style min value (field 6, preferred).
Definition types.hpp:375
std::optional< std::string > max
Old-style max (field 1, deprecated).
Definition types.hpp:370
Parquet Thrift struct types – C++ structs matching parquet.thrift, with Compact Protocol serialize/de...
Parquet format enumerations, type traits, and statistics structs.