Signet Forge 0.1.0
C++20 Parquet library with AI-native extensions
DEMO
Loading...
Searching...
No Matches
writer.hpp
Go to the documentation of this file.
1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright 2026 Johnson Ogundeji
3#pragma once
4
5// ---------------------------------------------------------------------------
6// writer.hpp — Streaming Parquet file writer
7//
8// Header-only. Writes valid Parquet files with configurable encoding and
9// compression. Supports PLAIN, DELTA_BINARY_PACKED, BYTE_STREAM_SPLIT,
10// RLE_DICTIONARY, and RLE encodings, plus SNAPPY/ZSTD/LZ4 compression.
11// Supports row-based (string vector) and column-based (typed batch) APIs.
12// Includes a standalone CSV-to-Parquet converter.
13// ---------------------------------------------------------------------------
14
15#include "signet/types.hpp"
16#include "signet/error.hpp"
17#include "signet/schema.hpp"
18#include "signet/statistics.hpp"
30
31#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
32#include "signet/crypto/pme.hpp"
33#endif
34
35#include <algorithm>
36#include <array>
37#include <charconv>
38#include <cstdlib>
39#include <cstring>
40#include <filesystem>
41#include <fstream>
42#include <memory>
43#include <mutex>
44#include <optional>
45#include <string>
46#include <unordered_map>
47#include <unordered_set>
48#include <vector>
49
50// ---------------------------------------------------------------------------
51// Floating-point from_chars availability detection
52//
53// Apple Clang's libc++ (Xcode ≤ 16) does not implement std::from_chars for
54// float/double. GCC ≥ 11 (libstdc++) and MSVC ≥ 19.24 do.
55//
56// On Apple we fall back to strtod_l / strtof_l with an explicit "C" locale
57// to preserve locale-independence — critical for MiFID II financial data
58// where decimal separators must always be '.' regardless of LC_NUMERIC.
59// ---------------------------------------------------------------------------
60#if (defined(__GNUC__) && !defined(__clang__)) || defined(_MSC_VER)
61# define SIGNET_HAS_FLOAT_FROM_CHARS 1
62#else
63# define SIGNET_HAS_FLOAT_FROM_CHARS 0
64# if defined(__APPLE__)
65# include <xlocale.h> // strtod_l, strtof_l, newlocale on macOS
66# elif defined(__linux__) || defined(__FreeBSD__)
67# include <locale.h> // strtod_l, strtof_l, newlocale on Linux/BSD
68# endif
69#endif
70
71namespace signet::forge {
72
73// ---------------------------------------------------------------------------
74// detail::parse_double / detail::parse_float
75//
76// Locale-independent floating-point parsing for CSV-to-Parquet conversion.
77// Uses std::from_chars where available (GCC, MSVC); falls back to
78// strtod_l/strtof_l with C locale on platforms without float from_chars
79// (Apple Clang). Never uses bare strtod — that is locale-dependent and
80// would silently corrupt financial data in EU locales (CWE-1286).
81// ---------------------------------------------------------------------------
82namespace detail {
83
/// Locale-independent best-effort parse of `sv` as a double.
/// Returns 0.0 when the input is not a number (strtod-style contract);
/// partial parses return the value of the parsed prefix.
/// Never consults LC_NUMERIC on the supported platforms, so '.' is always
/// the decimal separator (MiFID II requirement — see file header).
inline double parse_double(std::string_view sv) noexcept {
  double value = 0.0;
#if SIGNET_HAS_FLOAT_FROM_CHARS
  // from_chars is non-allocating, locale-free and never throws.
  std::from_chars(sv.data(), sv.data() + sv.size(), value);
#elif defined(__APPLE__) || defined(__linux__) || defined(__FreeBSD__)
  // Thread-safe one-time creation of a "C" locale handle (singleton,
  // intentionally never freed; C++11 static-init guard serializes it).
  static locale_t c_locale = newlocale(LC_ALL_MASK, "C", (locale_t)0);
  // NOTE: the std::string copy exists because strtod_l needs a
  // NUL-terminated buffer; allocation failure under noexcept terminates.
  std::string tmp(sv);
  if (c_locale != (locale_t)0) {
    value = strtod_l(tmp.c_str(), nullptr, c_locale);
  } else {
    // newlocale failed (e.g. out of memory): fall back to plain strtod
    // rather than hand strtod_l an invalid locale handle (undefined
    // behavior). This path is locale-dependent, but only reachable when
    // locale creation itself failed.
    value = std::strtod(tmp.c_str(), nullptr);
  }
#else
  // Last resort for exotic platforms — locale-dependent (documented risk).
  std::string tmp(sv);
  value = std::strtod(tmp.c_str(), nullptr);
#endif
  return value;
}
101
/// Locale-independent best-effort parse of `sv` as a float.
/// Returns 0.0f when the input is not a number (strtof-style contract).
/// Same platform strategy as parse_double above.
inline float parse_float(std::string_view sv) noexcept {
  float value = 0.0f;
#if SIGNET_HAS_FLOAT_FROM_CHARS
  // Non-allocating, locale-free, never throws.
  std::from_chars(sv.data(), sv.data() + sv.size(), value);
#elif defined(__APPLE__) || defined(__linux__) || defined(__FreeBSD__)
  // One-time "C" locale handle (singleton, intentionally never freed).
  static locale_t c_locale = newlocale(LC_ALL_MASK, "C", (locale_t)0);
  std::string tmp(sv);
  if (c_locale != (locale_t)0) {
    value = strtof_l(tmp.c_str(), nullptr, c_locale);
  } else {
    // newlocale failed: avoid passing an invalid locale handle to
    // strtof_l (undefined behavior) — degrade to locale-dependent strtof.
    value = std::strtof(tmp.c_str(), nullptr);
  }
#else
  // Last resort for exotic platforms — locale-dependent (documented risk).
  std::string tmp(sv);
  value = std::strtof(tmp.c_str(), nullptr);
#endif
  return value;
}
116
/// Strict parse of the ENTIRE string as a double.
/// Returns true only when the whole of `sv` is a valid number; `out` is
/// unspecified on failure. All platform paths follow std::from_chars
/// semantics: no leading whitespace, no leading '+', and the input must
/// be fully consumed — so "", " 1.5" and "1.5x" are all rejected.
inline bool try_parse_double(std::string_view sv, double& out) noexcept {
#if SIGNET_HAS_FLOAT_FROM_CHARS
  auto [p, ec] = std::from_chars(sv.data(), sv.data() + sv.size(), out);
  return ec == std::errc{} && p == sv.data() + sv.size();
#else
  // The strtod-based fallbacks are laxer than from_chars: they skip
  // leading whitespace and accept a leading '+'. Worse, an EMPTY input
  // used to be accepted here — strtod sets end == begin, and
  // begin == begin + 0 made the full-consumption check pass with
  // out == 0.0. Pre-filter so every platform agrees on validity.
  if (sv.empty() || sv.front() == '+' ||
      static_cast<unsigned char>(sv.front()) <= ' ') {
    return false;
  }
# if defined(__APPLE__) || defined(__linux__) || defined(__FreeBSD__)
  // One-time "C" locale handle; fall back to strtod if newlocale failed
  // (an invalid handle to strtod_l would be undefined behavior).
  static locale_t c_locale = newlocale(LC_ALL_MASK, "C", (locale_t)0);
  std::string tmp(sv);
  char* end = nullptr;
  out = c_locale != (locale_t)0 ? strtod_l(tmp.c_str(), &end, c_locale)
                                : std::strtod(tmp.c_str(), &end);
  return end == tmp.c_str() + tmp.size();
# else
  std::string tmp(sv);
  char* end = nullptr;
  out = std::strtod(tmp.c_str(), &end);
  return end == tmp.c_str() + tmp.size();
# endif
#endif
}
136
137} // namespace detail
138
139namespace detail::writer {
140
/// CRC-32 over a byte range (reflected polynomial 0xEDB88320, the same
/// variant used by zlib/Parquet page checksums). Table-driven; the
/// 256-entry table is built entirely at compile time.
inline uint32_t page_crc32(const uint8_t* data, size_t length) noexcept {
  static constexpr std::array<uint32_t, 256> kTable = [] {
    std::array<uint32_t, 256> table{};
    for (uint32_t byte = 0; byte < 256; ++byte) {
      uint32_t entry = byte;
      for (int bit = 0; bit < 8; ++bit) {
        const bool lsb_set = (entry & 1u) != 0;
        entry >>= 1;
        if (lsb_set) entry ^= 0xEDB88320u;
      }
      table[byte] = entry;
    }
    return table;
  }();

  uint32_t state = 0xFFFFFFFFu;
  const uint8_t* const stop = data + length;
  for (const uint8_t* p = data; p != stop; ++p)
    state = kTable[(state ^ *p) & 0xFFu] ^ (state >> 8);
  return state ^ 0xFFFFFFFFu;
}
159
inline constexpr uint8_t kEncryptedPageHeaderMagic[4] = {'S', 'P', 'H', '1'};

/// Frames an encrypted page header for on-disk storage.
/// Layout: 4-byte magic "SPH1" | 4-byte little-endian payload size |
/// payload bytes. The reader uses the magic to distinguish an encrypted
/// header record from a plain Thrift page header.
[[nodiscard]] inline std::vector<uint8_t> wrap_encrypted_page_header(
    const std::vector<uint8_t>& encrypted_header) {
  const uint32_t payload_size = static_cast<uint32_t>(encrypted_header.size());

  std::vector<uint8_t> framed;
  framed.reserve(8 + encrypted_header.size());

  // Magic first, then the size serialized LSB-first (little endian).
  for (uint8_t magic_byte : kEncryptedPageHeaderMagic)
    framed.push_back(magic_byte);
  for (int shift = 0; shift < 32; shift += 8)
    framed.push_back(static_cast<uint8_t>((payload_size >> shift) & 0xFFu));

  framed.insert(framed.end(), encrypted_header.begin(), encrypted_header.end());
  return framed;
}
176
177} // namespace detail::writer
178
192 int64_t row_group_size = 64 * 1024;
193
196
198 std::vector<thrift::KeyValue> file_metadata;
199
200 // -- Encoding & compression options --------------------------------------
201
205
209
212 std::unordered_map<std::string, Encoding> column_encodings;
213
218 bool auto_encoding = false;
219
222 bool auto_compression = false;
223
224 // -- Page Index (optional) ------------------------------------------------
225
228 bool enable_page_index = false;
229
230 // -- Bloom filters (optional) ---------------------------------------------
231
235
237 double bloom_filter_fpr = 0.01;
238
242 std::unordered_set<std::string> bloom_filter_columns;
243
244 // -- Encryption (AGPL-3.0+ project, commercial-enabled build gate) -------
245#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
249 std::optional<crypto::EncryptionConfig> encryption;
250#endif
251};
252
253// ---------------------------------------------------------------------------
254// ParquetWriter — streaming writer that writes one row group at a time
255// ---------------------------------------------------------------------------
256
281public:
284
285 // -- Factory: open a file for writing ------------------------------------
286
303 [[nodiscard]] static expected<ParquetWriter> open(
304 const std::filesystem::path& path,
305 const Schema& schema,
306 const Options& options = Options{}) {
307
308 ParquetWriter writer;
309 writer.schema_ = schema;
310 writer.options_ = options;
311 writer.path_ = path;
312
313 // Create parent directories if needed
314 if (path.has_parent_path()) {
315 std::error_code ec;
316 std::filesystem::create_directories(path.parent_path(), ec);
317 // Ignore error — the open() below will catch it
318 }
319
320 writer.file_.open(path, std::ios::binary | std::ios::trunc);
321 if (!writer.file_.is_open()) {
322 return Error{ErrorCode::IO_ERROR,
323 "Failed to open file for writing: " + path.string()};
324 }
325
326 // Write PAR1 magic (4 bytes)
327 writer.write_raw_le32(PARQUET_MAGIC);
328 writer.file_offset_ = 4;
329
330 // Initialize column writers — one per schema column
331 writer.init_column_writers();
332
333 // Initialize encryption if configured (commercial tier)
334#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
335 if (options.encryption) {
336 writer.encryptor_ = std::make_unique<crypto::FileEncryptor>(*options.encryption);
337 }
338#endif
339
340 // Initialize bloom filters if enabled
341 if (options.enable_bloom_filter) {
342 writer.bloom_filters_.resize(schema.num_columns());
343 }
344
345 // Initialize page index builders if enabled
346 if (options.enable_page_index) {
347 writer.col_index_builders_.resize(schema.num_columns());
348 }
349
350 writer.open_ = true;
351 return writer;
352 }
353
354 // -- Row-based API -------------------------------------------------------
355
  /// Appends one row given as text values, one string per schema column.
  /// Rows are buffered in pending_rows_ and converted/flushed as a row
  /// group once options_.row_group_size rows have accumulated.
  /// @param values exactly one string per schema column
  /// @return error if the writer is closed, the arity is wrong, or the
  ///         automatic flush fails
  [[nodiscard]] expected<void> write_row(const std::vector<std::string>& values) {
    if (!open_) {
      return Error{ErrorCode::IO_ERROR, "Writer is not open"};
    }
    if (values.size() != schema_.num_columns()) {
      // NOTE(review): the `return Error{ErrorCode::…,` line is not visible
      // in this rendered view; the string below is its message payload.
          "Row has " + std::to_string(values.size()) +
          " values, schema has " + std::to_string(schema_.num_columns()) +
          " columns"};
    }

    pending_rows_.push_back(values);

    // Auto-flush when we reach the target row group size; any flush
    // error propagates to the caller.
    if (static_cast<int64_t>(pending_rows_.size()) >= options_.row_group_size) {
      return flush_row_group();
    }

    return expected<void>{};
  }
388
389 // -- Schema query --------------------------------------------------------
390
393 [[nodiscard]] size_t num_columns() const noexcept { return schema_.num_columns(); }
394
395 // -- Column-based API (typed batch) --------------------------------------
396
  /// Typed column-batch write: appends `count` values of T to column
  /// `col_index` of the in-progress row group. T must map (via
  /// parquet_type_of_v<T>) to the column's schema physical type.
  /// Does not flush — all columns of a row group must reach the same
  /// length before flush_row_group() is called.
  /// @param col_index zero-based column index into the schema
  /// @param values    pointer to `count` contiguous values
  /// @param count     number of values to append
  template <typename T>
  [[nodiscard]] expected<void> write_column(size_t col_index,
                                            const T* values, size_t count) {
    if (!open_) {
      return Error{ErrorCode::IO_ERROR, "Writer is not open"};
    }
    if (col_index >= schema_.num_columns()) {
      // NOTE(review): the `return Error{ErrorCode::…,` line is not visible
      // in this rendered view; the string below is its message payload.
          "Column index " + std::to_string(col_index) +
          " out of range (schema has " +
          std::to_string(schema_.num_columns()) + " columns)"};
    }

    // Validate at compile time + run time that T matches the schema's
    // physical type for this column, so a mismatched pointer is rejected
    // before any bytes are buffered.
    constexpr PhysicalType expected_pt = parquet_type_of_v<T>;
    const PhysicalType actual_pt = schema_.column(col_index).physical_type;
    if (expected_pt != actual_pt) {
      // NOTE(review): `return Error{ErrorCode::…,` line not visible here.
          "Type mismatch for column " + std::to_string(col_index) +
          " (\"" + schema_.column(col_index).name + "\")" +
          ": schema physical type is " +
          std::to_string(static_cast<int>(actual_pt)) +
          " but write_column<T> maps to " +
          std::to_string(static_cast<int>(expected_pt))};
    }

    col_writers_[col_index].write_batch(values, count);
    col_row_counts_[col_index] = col_writers_[col_index].num_values();

    // Insert values into the bloom filter only when the feature was
    // enabled at open() (bloom_filters_ is empty otherwise).
    if (!bloom_filters_.empty()) {
      bloom_insert_typed(col_index, values, count);
    }

    return expected<void>{};
  }
454
  /// String overload of the column-batch write: appends `count`
  /// std::string values to column `col_index`. The column's physical
  /// type must be string-compatible (BYTE_ARRAY; the second accepted
  /// type in the condition below is not visible in this rendered view).
  /// @param col_index zero-based column index into the schema
  /// @param values    pointer to `count` contiguous strings
  /// @param count     number of values to append
  [[nodiscard]] expected<void> write_column(size_t col_index,
                                            const std::string* values,
                                            size_t count) {
    if (!open_) {
      return Error{ErrorCode::IO_ERROR, "Writer is not open"};
    }
    if (col_index >= schema_.num_columns()) {
      // NOTE(review): the `return Error{ErrorCode::…,` line is not visible
      // in this rendered view; the string below is its message payload.
          "Column index " + std::to_string(col_index) +
          " out of range"};
    }

    // Validate that column is BYTE_ARRAY (string-compatible).
    const PhysicalType actual_pt = schema_.column(col_index).physical_type;
    if (actual_pt != PhysicalType::BYTE_ARRAY &&
        // NOTE(review): the second operand of the condition and the
        // `return Error{ErrorCode::…,` line are not visible in this view.
        "Type mismatch for column " + std::to_string(col_index) +
        " (\"" + schema_.column(col_index).name + "\")" +
        ": schema physical type is " +
        std::to_string(static_cast<int>(actual_pt)) +
        " but write_column was called with std::string (BYTE_ARRAY)"};
    }

    col_writers_[col_index].write_batch(values, count);
    col_row_counts_[col_index] = col_writers_[col_index].num_values();

    // Insert values into the bloom filter only when the feature was
    // enabled at open().
    if (!bloom_filters_.empty()) {
      bloom_insert_typed(col_index, values, count);
    }

    return expected<void>{};
  }
501
502 // -- Flush / Close -------------------------------------------------------
503
521 if (!open_) {
522 return Error{ErrorCode::IO_ERROR, "Writer is not open"};
523 }
524
525 // Ensure Snappy codec is registered (idempotent, done once)
526 ensure_snappy_registered();
527
528 // If we have pending string rows, encode them into the column writers
529 if (!pending_rows_.empty()) {
530 auto result = encode_pending_rows();
531 if (!result) return result;
532 }
533
534 // Check that we have data to flush
535 bool has_data = false;
536 for (size_t c = 0; c < col_writers_.size(); ++c) {
537 if (col_writers_[c].num_values() > 0) {
538 has_data = true;
539 break;
540 }
541 }
542 if (!has_data) {
543 return expected<void>{}; // Nothing to flush
544 }
545
546 // Verify all columns have the same number of values
547 int64_t rg_num_rows = col_writers_[0].num_values();
548 for (size_t c = 1; c < col_writers_.size(); ++c) {
549 if (col_writers_[c].num_values() != rg_num_rows) {
551 "Column " + std::to_string(c) + " has " +
552 std::to_string(col_writers_[c].num_values()) +
553 " values, expected " + std::to_string(rg_num_rows)};
554 }
555 }
556
557 // Build the row group metadata
559 rg.num_rows = rg_num_rows;
560 rg.total_byte_size = 0;
561 rg.columns.resize(col_writers_.size());
562
563 // Write each column chunk with encoding and compression
564 for (size_t c = 0; c < col_writers_.size(); ++c) {
565 const auto& cw = col_writers_[c];
566 const auto& col_desc = schema_.column(c);
567
568 // ---- Step 1: Choose encoding for this column -------------------
569 Encoding col_encoding = choose_encoding(c, col_desc, cw);
570
571 // ---- Step 2: Determine compression codec -----------------------
572 Compression col_codec = options_.compression;
573 if (options_.auto_compression) {
574 // Use the PLAIN data as a sample for auto-selection
575 col_codec = auto_select_compression(
576 cw.data().data(), cw.data().size());
577 }
578
579 // ---- Step 3: Track total uncompressed and compressed bytes -----
580 int64_t total_uncompressed = 0;
581 int64_t total_compressed = 0;
582 std::unordered_set<Encoding> used_encodings;
583
584 // Record the column chunk start offset
585 int64_t column_offset = file_offset_;
586 int64_t dict_page_offset = -1; // -1 means no dict page
587 int64_t data_page_offset = -1;
588
589 // ---- Step 4: Handle dictionary encoding specially --------------
590 if (col_encoding == Encoding::PLAIN_DICTIONARY ||
591 col_encoding == Encoding::RLE_DICTIONARY) {
592
593 // Re-encode as dictionary: extract raw values from PLAIN data
594 // and build a dictionary.
595 auto dict_result = write_dictionary_column(
596 c, col_desc, cw, col_codec);
597 if (!dict_result) return dict_result.error();
598
599 const auto& dict_info = *dict_result;
600 total_uncompressed = dict_info.total_uncompressed;
601 total_compressed = dict_info.total_compressed;
602 used_encodings = dict_info.used_encodings;
603 dict_page_offset = dict_info.dict_page_offset;
604 data_page_offset = dict_info.data_page_offset;
605
606 // Feed page index builder (dictionary path)
607 if (!col_index_builders_.empty()) {
608 auto& builder = col_index_builders_[c];
609 builder.start_page();
610 builder.set_first_row_index(0);
611 builder.set_page_location(dict_info.data_page_offset,
612 static_cast<int32_t>(dict_info.total_compressed));
613
614 const auto& cw_stats_pi = cw.statistics();
615 if (cw_stats_pi.has_min_max()) {
616 const auto& min_b = cw_stats_pi.min_bytes();
617 const auto& max_b = cw_stats_pi.max_bytes();
618 builder.set_min(std::string(min_b.begin(), min_b.end()));
619 builder.set_max(std::string(max_b.begin(), max_b.end()));
620 }
621 builder.set_null_page(cw_stats_pi.null_count() == cw.num_values());
622 builder.set_null_count(cw_stats_pi.null_count());
623 }
624
625 } else {
626 // ---- Step 5: Non-dictionary encoding path ------------------
627 // Encode (or reuse PLAIN data) based on selected encoding
628 auto encoded = encode_column_data(cw, col_encoding, col_desc.physical_type);
629
630 if (encoded.size() > static_cast<size_t>(INT32_MAX)) {
632 "encoded page size exceeds int32 limit (2 GiB)"};
633 }
634 int32_t uncompressed_size = static_cast<int32_t>(encoded.size());
635
636 // Compress the page data
637 const uint8_t* page_data = encoded.data();
638 size_t page_data_size = encoded.size();
639 std::vector<uint8_t> compressed_buf;
640 int32_t compressed_size = uncompressed_size;
641
642 if (col_codec != Compression::UNCOMPRESSED) {
643 auto comp_result = compress(col_codec, page_data, page_data_size);
644 if (!comp_result) return comp_result.error();
645 compressed_buf = std::move(*comp_result);
646 if (compressed_buf.size() > static_cast<size_t>(INT32_MAX))
647 return Error{ErrorCode::INVALID_ARGUMENT, "Compressed page exceeds INT32_MAX"};
648 compressed_size = static_cast<int32_t>(compressed_buf.size());
649 page_data = compressed_buf.data();
650 page_data_size = compressed_buf.size();
651 }
652
653 // Encrypt the page data if PME is configured for this column
654#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
655 std::vector<uint8_t> encrypted_buf;
656 if (encryptor_ && encryptor_->is_column_encrypted(col_desc.name)) {
657 auto enc_result = encryptor_->encrypt_column_page(
658 page_data, page_data_size, col_desc.name,
659 static_cast<int32_t>(row_groups_.size()), 0);
660 if (!enc_result) return enc_result.error();
661 encrypted_buf = std::move(*enc_result);
662 compressed_size = static_cast<int32_t>(encrypted_buf.size());
663 page_data = encrypted_buf.data();
664 page_data_size = encrypted_buf.size();
665 // uncompressed_page_size stays the same (pre-encryption size
666 // for the reader to know how much to decompress after decrypt)
667 }
668#endif
669
670 // Build the DataPage PageHeader
673 ph.uncompressed_page_size = uncompressed_size;
674 ph.compressed_page_size = compressed_size;
675 // CRC-32 over final page data (post-compression, post-encryption)
676 ph.crc = static_cast<int32_t>(detail::writer::page_crc32(
677 page_data, page_data_size));
678
680 dph.num_values = static_cast<int32_t>(cw.num_values());
681 dph.encoding = col_encoding;
684 ph.data_page_header = dph;
685
686 // Serialize and write the page header
687 thrift::CompactEncoder header_enc;
688 ph.serialize(header_enc);
689 const auto& raw_header_bytes = header_enc.data();
690
691 const uint8_t* header_write_data = raw_header_bytes.data();
692 size_t header_write_size = raw_header_bytes.size();
693#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
694 std::vector<uint8_t> header_record_buf;
695 if (encryptor_ && encryptor_->is_column_encrypted(col_desc.name)) {
696 auto enc_header = encryptor_->encrypt_data_page_header(
697 raw_header_bytes.data(), raw_header_bytes.size(),
698 col_desc.name, static_cast<int32_t>(row_groups_.size()), 0);
699 if (!enc_header) return enc_header.error();
700 header_record_buf = detail::writer::wrap_encrypted_page_header(*enc_header);
701 header_write_data = header_record_buf.data();
702 header_write_size = header_record_buf.size();
703 }
704#endif
705
706 data_page_offset = file_offset_;
707
708 write_raw(header_write_data, header_write_size);
709 write_raw(page_data, page_data_size);
710
711 total_uncompressed = static_cast<int64_t>(header_write_size) +
712 static_cast<int64_t>(uncompressed_size);
713 total_compressed = static_cast<int64_t>(header_write_size) +
714 static_cast<int64_t>(compressed_size);
715
716 used_encodings.insert(col_encoding);
717
718 // Feed page index builder (non-dictionary path)
719 if (!col_index_builders_.empty()) {
720 auto& builder = col_index_builders_[c];
721 builder.start_page();
722 builder.set_first_row_index(0);
723 builder.set_page_location(data_page_offset,
724 compressed_size + static_cast<int32_t>(header_write_size));
725
726 const auto& cw_stats_pi = cw.statistics();
727 if (cw_stats_pi.has_min_max()) {
728 const auto& min_b = cw_stats_pi.min_bytes();
729 const auto& max_b = cw_stats_pi.max_bytes();
730 builder.set_min(std::string(min_b.begin(), min_b.end()));
731 builder.set_max(std::string(max_b.begin(), max_b.end()));
732 }
733 builder.set_null_page(cw_stats_pi.null_count() == cw.num_values());
734 builder.set_null_count(cw_stats_pi.null_count());
735 }
736 }
737
738 // ---- Step 6: Build ColumnChunk metadata ------------------------
739 thrift::ColumnChunk& cc = rg.columns[c];
740 cc.file_offset = column_offset;
741
743 cmd.type = col_desc.physical_type;
744 cmd.path_in_schema = {col_desc.name};
745 cmd.codec = col_codec;
746 cmd.num_values = cw.num_values();
747 cmd.total_uncompressed_size = total_uncompressed;
748 cmd.total_compressed_size = total_compressed;
749 cmd.data_page_offset = data_page_offset;
750
751 // Set encodings list
752 cmd.encodings.assign(used_encodings.begin(), used_encodings.end());
753
754 // Set dictionary page offset if applicable
755 if (dict_page_offset >= 0) {
756 cmd.dictionary_page_offset = dict_page_offset;
757 }
758
759 // Populate statistics from ColumnWriter
760 const auto& cw_stats = cw.statistics();
761 if (cw_stats.has_min_max()) {
762 thrift::Statistics stats;
763 stats.null_count = cw_stats.null_count();
764
765 // Convert min/max bytes to binary strings for Thrift
766 const auto& min_b = cw_stats.min_bytes();
767 const auto& max_b = cw_stats.max_bytes();
768 stats.min_value = std::string(min_b.begin(), min_b.end());
769 stats.max_value = std::string(max_b.begin(), max_b.end());
770 // Also set legacy min/max for backward compatibility
771 stats.min = stats.min_value;
772 stats.max = stats.max_value;
773
774 if (cw_stats.distinct_count().has_value()) {
775 stats.distinct_count = *cw_stats.distinct_count();
776 }
777
778 cmd.statistics = stats;
779 }
780
781 cc.meta_data = cmd;
782
783 // ---- Step 7: Write bloom filter for this column (if enabled) ---
784 if (!bloom_filters_.empty() && bloom_filters_[c]) {
785 int64_t bf_offset = file_offset_;
786 const auto& bf_data = bloom_filters_[c]->data();
787 uint32_t bf_size = static_cast<uint32_t>(bf_data.size());
788
789 // Write: 4-byte LE header (total bloom filter size) + raw bytes
790 write_raw_le32(bf_size);
791 write_raw(bf_data.data(), bf_data.size());
792
793 cc.bloom_filter_offset = bf_offset;
794 cc.bloom_filter_length = static_cast<int32_t>(4 + bf_data.size());
795
796 rg.total_byte_size += static_cast<int64_t>(4 + bf_data.size());
797 }
798
799 // ---- Step 8: Write ColumnIndex + OffsetIndex (if enabled) ----
800 if (!col_index_builders_.empty()) {
801 auto& builder = col_index_builders_[c];
802
803 // Serialize and write ColumnIndex
804 auto col_idx = builder.build_column_index(schema_.column(c).physical_type);
806 col_idx.serialize(ci_enc);
807 int64_t ci_offset = file_offset_;
808 write_raw(ci_enc.data().data(), ci_enc.data().size());
809 cc.column_index_offset = ci_offset;
810 cc.column_index_length = static_cast<int32_t>(ci_enc.data().size());
811
812 // Serialize and write OffsetIndex
813 auto off_idx = builder.build_offset_index();
815 off_idx.serialize(oi_enc);
816 int64_t oi_offset = file_offset_;
817 write_raw(oi_enc.data().data(), oi_enc.data().size());
818 cc.offset_index_offset = oi_offset;
819 cc.offset_index_length = static_cast<int32_t>(oi_enc.data().size());
820
821 rg.total_byte_size += static_cast<int64_t>(ci_enc.data().size() + oi_enc.data().size());
822 }
823
824 rg.total_byte_size += total_compressed;
825 }
826
827 // Record the row group
828 row_groups_.push_back(std::move(rg));
829 total_rows_ += rg_num_rows;
830
831 // Reset column writers for the next row group
832 for (auto& cw : col_writers_) {
833 cw.reset();
834 }
835 for (auto& count : col_row_counts_) {
836 count = 0;
837 }
838
839 // Reset bloom filters for the next row group
840 for (auto& bf : bloom_filters_) {
841 if (bf) bf->reset();
842 }
843
844 // Reset page index builders for the next row group
845 for (auto& builder : col_index_builders_) {
846 builder.reset();
847 }
848
849 return expected<void>{};
850 }
851
869 [[nodiscard]] expected<WriteStats> close() {
870 if (!open_) {
871 return WriteStats{}; // Already closed — return empty stats
872 }
873
874 // Validate footer completeness before close (Parquet spec)
875 if (schema_.num_columns() == 0) {
876 file_.close();
877 open_ = false;
878 return Error{ErrorCode::INTERNAL_ERROR, "ParquetWriter: cannot close with empty schema"};
879 }
880
881 // Flush any remaining data
882 auto flush_result = flush_row_group();
883 if (!flush_result) {
884 file_.close();
885 open_ = false;
886 return flush_result.error();
887 }
888
889 // Build FileMetaData
892 fmd.num_rows = total_rows_;
893 fmd.row_groups = row_groups_;
894 fmd.created_by = options_.created_by;
895
896 // Build schema elements: root + one per column
897 // Root element: group node with num_children = num_columns
899 root.name = schema_.name();
900 root.num_children = static_cast<int32_t>(schema_.num_columns());
901 // Root has no type (it's a group node)
902 fmd.schema.push_back(root);
903
904 // One element per leaf column
905 for (size_t c = 0; c < schema_.num_columns(); ++c) {
906 const auto& col_desc = schema_.column(c);
907
909 elem.type = col_desc.physical_type;
910 elem.name = col_desc.name;
911 elem.repetition_type = col_desc.repetition;
912
913 // Set type_length for FIXED_LEN_BYTE_ARRAY
914 if (col_desc.physical_type == PhysicalType::FIXED_LEN_BYTE_ARRAY &&
915 col_desc.type_length > 0) {
916 elem.type_length = col_desc.type_length;
917 }
918
919 // Set converted type for common logical types
920 if (col_desc.logical_type == LogicalType::STRING) {
922 } else if (col_desc.logical_type == LogicalType::DATE) {
924 } else if (col_desc.logical_type == LogicalType::TIMESTAMP_MS) {
926 } else if (col_desc.logical_type == LogicalType::TIMESTAMP_US) {
928 } else if (col_desc.logical_type == LogicalType::JSON) {
930 } else if (col_desc.logical_type == LogicalType::ENUM) {
932 } else if (col_desc.logical_type == LogicalType::DECIMAL) {
934 if (col_desc.precision > 0) elem.precision = col_desc.precision;
935 if (col_desc.scale >= 0) elem.scale = col_desc.scale;
936 }
937
938 fmd.schema.push_back(elem);
939 }
940
941 // Set custom key-value metadata
942 if (!options_.file_metadata.empty()) {
943 fmd.key_value_metadata = options_.file_metadata;
944 }
945
946 // If encryption is configured, embed FileEncryptionProperties in
947 // file metadata so the reader knows this file uses PME.
948#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
949 if (encryptor_) {
950 auto file_props = encryptor_->file_properties();
951 auto props_bytes = file_props.serialize();
952 std::string props_str(props_bytes.begin(), props_bytes.end());
953
954 thrift::KeyValue enc_kv;
955 enc_kv.key = "signet.encryption.properties";
956 enc_kv.value = std::move(props_str);
957
958 if (!fmd.key_value_metadata.has_value()) {
959 fmd.key_value_metadata = std::vector<thrift::KeyValue>{};
960 }
961 fmd.key_value_metadata->push_back(std::move(enc_kv));
962 }
963#endif
964
965 // Serialize FileMetaData to Thrift compact protocol
967 fmd.serialize(enc);
968 const auto& footer_bytes = enc.data();
969
970 // Record footer start position
971 int64_t footer_start = file_offset_;
972 (void)footer_start; // Not needed, but useful for debugging
973
974 // Footer encryption: if encryptor is set and footer encryption is
975 // enabled, encrypt the serialized FileMetaData and write "PARE" magic.
976#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
977 if (encryptor_ && encryptor_->config().encrypt_footer) {
978 auto enc_footer = encryptor_->encrypt_footer(
979 footer_bytes.data(), footer_bytes.size());
980 if (!enc_footer) {
981 file_.close();
982 open_ = false;
983 return enc_footer.error();
984 }
985 const auto& encrypted_footer = *enc_footer;
986
987 // Write encrypted footer
988 write_raw(encrypted_footer.data(), encrypted_footer.size());
989
990 // Write footer length (encrypted footer size)
991 uint32_t footer_len = static_cast<uint32_t>(encrypted_footer.size());
992 write_raw_le32(footer_len);
993
994 // Write "PARE" magic to indicate encrypted footer
995 write_raw_le32(PARQUET_MAGIC_ENCRYPTED);
996 } else {
997#endif
998 // Write footer in plaintext (standard path)
999 write_raw(footer_bytes.data(), footer_bytes.size());
1000
1001 // Write footer length (4 bytes LE)
1002 uint32_t footer_len = static_cast<uint32_t>(footer_bytes.size());
1003 write_raw_le32(footer_len);
1004
1005 // Write closing PAR1 magic (4 bytes)
1006 write_raw_le32(PARQUET_MAGIC);
1007#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
1008 }
1009#endif
1010
1011 file_.flush();
1012 file_.close();
1013 open_ = false;
1014
1015 // Build WriteStats from accumulated row group metadata
1016 return build_write_stats();
1017 }
1018
1025 if (open_) {
1026 // Best-effort close; ignore errors in destructor
1027 (void)close();
1028 }
1029 }
1030
1031 // -- Non-copyable, movable -----------------------------------------------
1032
1037
1042 : schema_(std::move(other.schema_))
1043 , options_(std::move(other.options_))
1044 , path_(std::move(other.path_))
1045 , file_(std::move(other.file_))
1046 , file_offset_(other.file_offset_)
1047 , col_writers_(std::move(other.col_writers_))
1048 , col_row_counts_(std::move(other.col_row_counts_))
1049 , pending_rows_(std::move(other.pending_rows_))
1050 , row_groups_(std::move(other.row_groups_))
1051 , total_rows_(other.total_rows_)
1052 , open_(other.open_)
1053#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
1054 , encryptor_(std::move(other.encryptor_))
1055#endif
1056 , bloom_filters_(std::move(other.bloom_filters_))
1057 , col_index_builders_(std::move(other.col_index_builders_))
1058 {
1059 other.open_ = false;
1060 other.file_offset_ = 0;
1061 other.total_rows_ = 0;
1062 }
1063
1067 if (this != &other) {
1068 // Close current file if open
1069 if (open_) {
1070 (void)close();
1071 }
1072
1073 schema_ = std::move(other.schema_);
1074 options_ = std::move(other.options_);
1075 path_ = std::move(other.path_);
1076 file_ = std::move(other.file_);
1077 file_offset_ = other.file_offset_;
1078 col_writers_ = std::move(other.col_writers_);
1079 col_row_counts_ = std::move(other.col_row_counts_);
1080 pending_rows_ = std::move(other.pending_rows_);
1081 row_groups_ = std::move(other.row_groups_);
1082 total_rows_ = other.total_rows_;
1083 open_ = other.open_;
1084#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
1085 encryptor_ = std::move(other.encryptor_);
1086#endif
1087 bloom_filters_ = std::move(other.bloom_filters_);
1088 col_index_builders_ = std::move(other.col_index_builders_);
1089
1090 other.open_ = false;
1091 other.file_offset_ = 0;
1092 other.total_rows_ = 0;
1093 }
1094 return *this;
1095 }
1096
1097 // -- Status queries ------------------------------------------------------
1098
1105 [[nodiscard]] int64_t rows_written() const {
1106 return total_rows_ + static_cast<int64_t>(pending_rows_.size());
1107 }
1108
1112 [[nodiscard]] int64_t row_groups_written() const {
1113 return static_cast<int64_t>(row_groups_.size());
1114 }
1115
1118 [[nodiscard]] bool is_open() const { return open_; }
1119
1120 // ========================================================================
1121 // Standalone CSV-to-Parquet converter
1122 // ========================================================================
1123
1144 [[nodiscard]] static expected<void> csv_to_parquet(
1145 const std::filesystem::path& csv_input,
1146 const std::filesystem::path& parquet_output,
1147 const Options& options = Options{}) {
1148
1149 // Open CSV
1150 std::ifstream csv(csv_input);
1151 if (!csv.is_open()) {
1153 "Failed to open CSV file: " + csv_input.string()};
1154 }
1155
1156 // Read header line
1157 std::string header_line;
1158 if (!std::getline(csv, header_line)) {
1159 return Error{ErrorCode::INVALID_FILE, "CSV file is empty"};
1160 }
1161
1162 auto col_names = split_csv_line(header_line);
1163 if (col_names.empty()) {
1164 return Error{ErrorCode::INVALID_FILE, "CSV header has no columns"};
1165 }
1166
1167 size_t num_cols = col_names.size();
1168
1169 // Read all data rows
1170 std::vector<std::vector<std::string>> rows;
1171 std::string line;
1172 while (std::getline(csv, line)) {
1173 if (line.empty()) continue;
1174 auto fields = split_csv_line(line);
1175 // Pad or truncate to match header width
1176 fields.resize(num_cols);
1177 rows.push_back(std::move(fields));
1178 }
1179 csv.close();
1180
1181 if (rows.empty()) {
1182 return Error{ErrorCode::INVALID_FILE, "CSV file has no data rows"};
1183 }
1184
1185 // Auto-detect column types by scanning all values
1186 // For each column, try in order: INT64 → DOUBLE → BOOLEAN → STRING
1187 std::vector<PhysicalType> detected_types(num_cols, PhysicalType::INT64);
1188 std::vector<LogicalType> detected_logical(num_cols, LogicalType::NONE);
1189
1190 for (size_t c = 0; c < num_cols; ++c) {
1191 bool all_int64 = true;
1192 bool all_double = true;
1193 bool all_bool = true;
1194
1195 for (const auto& row : rows) {
1196 const std::string& val = row[c];
1197 if (val.empty()) continue; // Skip empty values for type detection
1198
1199 // Try INT64
1200 if (all_int64) {
1201 int64_t parsed;
1202 auto [ptr, ec] = std::from_chars(val.data(),
1203 val.data() + val.size(),
1204 parsed);
1205 if (ec != std::errc{} || ptr != val.data() + val.size()) {
1206 all_int64 = false;
1207 }
1208 }
1209
1210 // Try DOUBLE (locale-independent on all platforms)
1211 if (all_double) {
1212 double tmp = 0;
1213 if (!detail::try_parse_double(val, tmp)) {
1214 all_double = false;
1215 }
1216 }
1217
1218 // Try BOOLEAN
1219 if (all_bool) {
1220 if (val != "true" && val != "false" &&
1221 val != "TRUE" && val != "FALSE" &&
1222 val != "True" && val != "False" &&
1223 val != "1" && val != "0") {
1224 all_bool = false;
1225 }
1226 }
1227 }
1228
1229 // Priority: INT64 > DOUBLE > BOOLEAN > STRING
1230 if (all_int64) {
1231 detected_types[c] = PhysicalType::INT64;
1232 detected_logical[c] = LogicalType::NONE;
1233 } else if (all_double) {
1234 detected_types[c] = PhysicalType::DOUBLE;
1235 detected_logical[c] = LogicalType::NONE;
1236 } else if (all_bool) {
1237 detected_types[c] = PhysicalType::BOOLEAN;
1238 detected_logical[c] = LogicalType::NONE;
1239 } else {
1240 detected_types[c] = PhysicalType::BYTE_ARRAY;
1241 detected_logical[c] = LogicalType::STRING;
1242 }
1243 }
1244
1245 // Build schema from detected types
1246 std::vector<ColumnDescriptor> col_descs;
1247 col_descs.reserve(num_cols);
1248 for (size_t c = 0; c < num_cols; ++c) {
1249 ColumnDescriptor cd;
1250 cd.name = col_names[c];
1251 cd.physical_type = detected_types[c];
1252 cd.logical_type = detected_logical[c];
1253 col_descs.push_back(std::move(cd));
1254 }
1255
1256 Schema schema("csv_data", std::move(col_descs));
1257
1258 // Open Parquet writer
1259 auto writer_result = ParquetWriter::open(parquet_output, schema, options);
1260 if (!writer_result) {
1261 return writer_result.error();
1262 }
1263 auto& writer = *writer_result;
1264
1265 // Write all rows
1266 for (const auto& row : rows) {
1267 auto result = writer.write_row(row);
1268 if (!result) {
1269 return result;
1270 }
1271 }
1272
1273 // Close
1274 auto close_result = writer.close();
1275 if (!close_result) return close_result.error();
1276 return expected<void>{};
1277 }
1278
private:
    /// Private default constructor — writers are created only through the
    /// static open() factory.
    ParquetWriter() = default;

    // -- Internal state -------------------------------------------------------

    Schema schema_;                          // Column layout being written
    Options options_;                        // Encoding/compression configuration
    std::filesystem::path path_;             // Destination file path
    std::ofstream file_;                     // Output stream for path_
    int64_t file_offset_ = 0;                // Bytes written so far (next write position)
    std::vector<ColumnWriter> col_writers_;  // One typed value buffer per column
    std::vector<int64_t> col_row_counts_;    // Per-column buffered value counts
    std::vector<std::vector<std::string>> pending_rows_;  // Rows buffered by the row-based API
    std::vector<thrift::RowGroup> row_groups_;  // Metadata of row groups already flushed
    int64_t total_rows_ = 0;                 // Rows accounted for outside pending_rows_ (see rows_written())
    bool open_ = false;                      // True while the file accepts data
#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
    std::unique_ptr<crypto::FileEncryptor> encryptor_; // PME encryption (nullptr if none)
#endif
    std::vector<std::unique_ptr<SplitBlockBloomFilter>> bloom_filters_; // Per-column bloom filters
    std::vector<ColumnIndexBuilder> col_index_builders_; // Per-column page index builders
1301
1302 // -- Initialization -------------------------------------------------------
1303
1304 void init_column_writers() {
1305 col_writers_.clear();
1306 col_writers_.reserve(schema_.num_columns());
1307 col_row_counts_.resize(schema_.num_columns(), 0);
1308 for (size_t c = 0; c < schema_.num_columns(); ++c) {
1309 col_writers_.emplace_back(schema_.column(c).physical_type);
1310 }
1311 }
1312
1313 // -- Build WriteStats from accumulated row group metadata -----------------
1314
1315 WriteStats build_write_stats() const {
1316 WriteStats stats;
1317 stats.file_size_bytes = file_offset_;
1318 stats.total_rows = total_rows_;
1319 stats.total_row_groups = static_cast<int64_t>(row_groups_.size());
1320
1321 // Aggregate per-column stats across all row groups
1322 size_t num_cols = schema_.num_columns();
1323 stats.columns.resize(num_cols);
1324
1325 for (size_t c = 0; c < num_cols; ++c) {
1326 auto& col_stats = stats.columns[c];
1327 col_stats.column_name = schema_.column(c).name;
1328 col_stats.physical_type = schema_.column(c).physical_type;
1329 }
1330
1331 for (const auto& rg : row_groups_) {
1332 for (size_t c = 0; c < rg.columns.size() && c < num_cols; ++c) {
1333 if (!rg.columns[c].meta_data.has_value()) continue;
1334 const auto& cmd = *rg.columns[c].meta_data;
1335 auto& col_stats = stats.columns[c];
1336
1337 col_stats.uncompressed_bytes += cmd.total_uncompressed_size;
1338 col_stats.compressed_bytes += cmd.total_compressed_size;
1339 col_stats.num_values += cmd.num_values;
1340 col_stats.compression = cmd.codec;
1341
1342 // Use the first encoding in the list as the primary encoding
1343 if (!cmd.encodings.empty()) {
1344 col_stats.encoding = cmd.encodings[0];
1345 }
1346
1347 // Accumulate null count from statistics if available
1348 if (cmd.statistics.has_value() && cmd.statistics->null_count.has_value()) {
1349 col_stats.null_count += *cmd.statistics->null_count;
1350 }
1351
1352 stats.total_uncompressed_bytes += cmd.total_uncompressed_size;
1353 stats.total_compressed_bytes += cmd.total_compressed_size;
1354 }
1355 }
1356
1357 // Compute derived ratios
1358 if (stats.total_compressed_bytes > 0) {
1359 stats.compression_ratio = static_cast<double>(stats.total_uncompressed_bytes)
1360 / static_cast<double>(stats.total_compressed_bytes);
1361 }
1362 if (stats.total_rows > 0) {
1363 stats.bytes_per_row = static_cast<double>(stats.file_size_bytes)
1364 / static_cast<double>(stats.total_rows);
1365 }
1366
1367 return stats;
1368 }
1369
1370 // -- Raw I/O helpers ------------------------------------------------------
1371
1372 void write_raw(const uint8_t* data, size_t len) {
1373 file_.write(reinterpret_cast<const char*>(data), static_cast<std::streamsize>(len));
1374 if (!file_.good()) {
1375 throw std::runtime_error("ParquetWriter::write_raw: I/O error after writing "
1376 + std::to_string(len) + " bytes at offset "
1377 + std::to_string(file_offset_));
1378 }
1379 file_offset_ += static_cast<int64_t>(len);
1380 }
1381
1382 void write_raw_le32(uint32_t val) {
1383 uint8_t bytes[4];
1384 bytes[0] = static_cast<uint8_t>((val ) & 0xFF);
1385 bytes[1] = static_cast<uint8_t>((val >> 8) & 0xFF);
1386 bytes[2] = static_cast<uint8_t>((val >> 16) & 0xFF);
1387 bytes[3] = static_cast<uint8_t>((val >> 24) & 0xFF);
1388 write_raw(bytes, 4);
1389 }
1390
1391 // -- Encode pending rows into column writers ------------------------------
1392
1393 [[nodiscard]] expected<void> encode_pending_rows() {
1394 for (size_t c = 0; c < schema_.num_columns(); ++c) {
1395 const auto& col_desc = schema_.column(c);
1396 auto& cw = col_writers_[c];
1397
1398 // Lazily initialize bloom filter for this column on first data
1399 bool bf_active = bloom_ensure_filter(c);
1400
1401 for (const auto& row : pending_rows_) {
1402 const std::string& val = row[c];
1403
1404 switch (col_desc.physical_type) {
1405 case PhysicalType::BOOLEAN: {
1406 bool b = (val == "true" || val == "TRUE" || val == "True" ||
1407 val == "1");
1408 cw.write_bool(b);
1409 // Booleans: insert as int32 (0 or 1) to match Parquet convention
1410 if (bf_active) bloom_filters_[c]->insert_value(static_cast<int32_t>(b));
1411 break;
1412 }
1413 case PhysicalType::INT32: {
1414 int32_t parsed = 0;
1415 auto [ptr, ec] = std::from_chars(val.data(),
1416 val.data() + val.size(),
1417 parsed);
1418 if (ec != std::errc{}) {
1419 // Fallback: parse as double and truncate (locale-independent)
1420 parsed = static_cast<int32_t>(detail::parse_double(val));
1421 }
1422 cw.write_int32(parsed);
1423 if (bf_active) bloom_filters_[c]->insert_value(parsed);
1424 break;
1425 }
1426 case PhysicalType::INT64: {
1427 int64_t parsed = 0;
1428 auto [ptr, ec] = std::from_chars(val.data(),
1429 val.data() + val.size(),
1430 parsed);
1431 if (ec != std::errc{}) {
1432 // Fallback: parse as double and truncate (locale-independent)
1433 parsed = static_cast<int64_t>(detail::parse_double(val));
1434 }
1435 cw.write_int64(parsed);
1436 if (bf_active) bloom_filters_[c]->insert_value(parsed);
1437 break;
1438 }
1439 case PhysicalType::FLOAT: {
1440 float f = detail::parse_float(val);
1441 cw.write_float(f);
1442 if (bf_active) bloom_filters_[c]->insert_value(f);
1443 break;
1444 }
1445 case PhysicalType::DOUBLE: {
1446 double d = detail::parse_double(val);
1447 cw.write_double(d);
1448 if (bf_active) bloom_filters_[c]->insert_value(d);
1449 break;
1450 }
1452 cw.write_byte_array(val);
1453 if (bf_active) bloom_filters_[c]->insert_value(val);
1454 break;
1455 }
1457 cw.write_fixed_len_byte_array(
1458 reinterpret_cast<const uint8_t*>(val.data()), val.size());
1459 if (bf_active) bloom_filters_[c]->insert_value(val);
1460 break;
1461 }
1462 default: {
1463 // INT96 — write as raw bytes
1464 cw.write_byte_array(val);
1465 if (bf_active) bloom_filters_[c]->insert_value(val);
1466 break;
1467 }
1468 }
1469 }
1470 }
1471
1472 pending_rows_.clear();
1473 return expected<void>{};
1474 }
1475
1476 // -- Snappy auto-registration ---------------------------------------------
1477
1478 static void ensure_snappy_registered() {
1479 static std::once_flag flag;
1480 std::call_once(flag, [] { register_snappy_codec(); });
1481 }
1482
1483 // -- Encoding selection ---------------------------------------------------
1484
1486 [[nodiscard]] Encoding choose_encoding(
1487 size_t col_index,
1488 const ColumnDescriptor& col_desc,
1489 const ColumnWriter& cw) const {
1490
1491 // 1. Per-column override takes priority
1492 auto it = options_.column_encodings.find(col_desc.name);
1493 if (it != options_.column_encodings.end()) {
1494 return it->second;
1495 }
1496
1497 // 2. Auto-encoding: pick optimal encoding based on type and data
1498 if (options_.auto_encoding) {
1499 switch (col_desc.physical_type) {
1503
1507
1509 // Check distinct ratio via statistics
1510 const auto& stats = cw.statistics();
1511 if (stats.distinct_count().has_value() && cw.num_values() > 0) {
1512 double ratio = static_cast<double>(*stats.distinct_count()) /
1513 static_cast<double>(cw.num_values());
1514 if (ratio < 0.40) {
1516 }
1517 }
1518 // Fallback: estimate from data size vs value count
1519 // PLAIN BYTE_ARRAY has 4-byte length prefix per value, so
1520 // if many values share the same content, dict encoding wins.
1521 // Without stats, stay with PLAIN as a safe default.
1522 return Encoding::PLAIN;
1523 }
1524
1526 return Encoding::RLE;
1527
1528 default:
1529 return Encoding::PLAIN;
1530 }
1531 }
1532
1533 // 3. Use the global default encoding
1534 (void)col_index;
1535 return options_.default_encoding;
1536 }
1537
1538 // -- Column data encoding (non-dictionary) --------------------------------
1539
1543 [[nodiscard]] static std::vector<uint8_t> encode_column_data(
1544 const ColumnWriter& cw,
1545 Encoding encoding,
1546 PhysicalType type) {
1547
1548 const auto& plain_data = cw.data();
1549 int64_t num_vals = cw.num_values();
1550
1551 switch (encoding) {
1552
1554 if (type == PhysicalType::INT32) {
1555 // Extract int32 values from PLAIN buffer (4 bytes LE each)
1556 size_t count = static_cast<size_t>(num_vals);
1557 std::vector<int32_t> values(count);
1558 for (size_t i = 0; i < count; ++i) {
1559 std::memcpy(&values[i], plain_data.data() + i * 4, 4);
1560 }
1561 return delta::encode_int32(values.data(), count);
1562 }
1563 if (type == PhysicalType::INT64) {
1564 size_t count = static_cast<size_t>(num_vals);
1565 std::vector<int64_t> values(count);
1566 for (size_t i = 0; i < count; ++i) {
1567 std::memcpy(&values[i], plain_data.data() + i * 8, 8);
1568 }
1569 return delta::encode_int64(values.data(), count);
1570 }
1571 // Fallback: unsupported type for DELTA_BINARY_PACKED → return PLAIN
1572 return std::vector<uint8_t>(plain_data.begin(), plain_data.end());
1573 }
1574
1576 if (type == PhysicalType::FLOAT) {
1577 size_t count = static_cast<size_t>(num_vals);
1578 const auto* float_ptr = reinterpret_cast<const float*>(plain_data.data());
1579 return byte_stream_split::encode_float(float_ptr, count);
1580 }
1581 if (type == PhysicalType::DOUBLE) {
1582 size_t count = static_cast<size_t>(num_vals);
1583 const auto* double_ptr = reinterpret_cast<const double*>(plain_data.data());
1584 return byte_stream_split::encode_double(double_ptr, count);
1585 }
1586 // Fallback: unsupported type → return PLAIN
1587 return std::vector<uint8_t>(plain_data.begin(), plain_data.end());
1588 }
1589
1590 case Encoding::RLE: {
1591 // For BOOLEAN columns: RLE-encode the bit-packed booleans
1592 if (type == PhysicalType::BOOLEAN) {
1593 size_t count = static_cast<size_t>(num_vals);
1594 // Extract boolean values from the PLAIN bit-packed buffer
1595 std::vector<uint32_t> bool_vals(count);
1596 for (size_t i = 0; i < count; ++i) {
1597 size_t byte_idx = i / 8;
1598 size_t bit_idx = i % 8;
1599 bool_vals[i] = (plain_data[byte_idx] >> bit_idx) & 1;
1600 }
1601 // RLE-encode with bit_width=1, with 4-byte length prefix
1602 return RleEncoder::encode_with_length(bool_vals.data(), count, 1);
1603 }
1604 // Fallback: return PLAIN
1605 return std::vector<uint8_t>(plain_data.begin(), plain_data.end());
1606 }
1607
1608 case Encoding::PLAIN:
1609 default:
1610 // Return a copy of the PLAIN data
1611 return std::vector<uint8_t>(plain_data.begin(), plain_data.end());
1612 }
1613 }
1614
1615 // -- Dictionary encoding helper -------------------------------------------
1616
1618 struct DictColumnResult {
1619 int64_t total_uncompressed;
1620 int64_t total_compressed;
1621 std::unordered_set<Encoding> used_encodings;
1622 int64_t dict_page_offset;
1623 int64_t data_page_offset;
1624 };
1625
1627 [[nodiscard]] static std::vector<std::string> extract_byte_array_strings(
1628 const ColumnWriter& cw) {
1629 const auto& buf = cw.data();
1630 size_t count = static_cast<size_t>(cw.num_values());
1631 std::vector<std::string> result;
1632 result.reserve(count);
1633 const size_t buf_size = buf.size();
1634 size_t pos = 0;
1635 for (size_t i = 0; i < count; ++i) {
1636 // CWE-125: Out-of-bounds Read — ensure 4-byte length prefix is within buffer
1637 if (pos + 4 > buf_size) break;
1638 uint32_t len = 0;
1639 std::memcpy(&len, buf.data() + pos, 4);
1640 // CWE-190: Integer Overflow — subtraction avoids unsigned wrap on crafted len
1641 if (len > buf_size - pos - 4) break;
1642 pos += 4;
1643 result.emplace_back(
1644 reinterpret_cast<const char*>(buf.data() + pos), len);
1645 pos += len;
1646 }
1647 return result;
1648 }
1649
1651 [[nodiscard]] static std::vector<int32_t> extract_int32_values(
1652 const ColumnWriter& cw) {
1653 size_t count = static_cast<size_t>(cw.num_values());
1654 std::vector<int32_t> result(count);
1655 for (size_t i = 0; i < count; ++i) {
1656 std::memcpy(&result[i], cw.data().data() + i * 4, 4);
1657 }
1658 return result;
1659 }
1660
1662 [[nodiscard]] static std::vector<int64_t> extract_int64_values(
1663 const ColumnWriter& cw) {
1664 size_t count = static_cast<size_t>(cw.num_values());
1665 std::vector<int64_t> result(count);
1666 for (size_t i = 0; i < count; ++i) {
1667 std::memcpy(&result[i], cw.data().data() + i * 8, 8);
1668 }
1669 return result;
1670 }
1671
1673 [[nodiscard]] static std::vector<float> extract_float_values(
1674 const ColumnWriter& cw) {
1675 size_t count = static_cast<size_t>(cw.num_values());
1676 std::vector<float> result(count);
1677 for (size_t i = 0; i < count; ++i) {
1678 std::memcpy(&result[i], cw.data().data() + i * 4, 4);
1679 }
1680 return result;
1681 }
1682
1684 [[nodiscard]] static std::vector<double> extract_double_values(
1685 const ColumnWriter& cw) {
1686 size_t count = static_cast<size_t>(cw.num_values());
1687 std::vector<double> result(count);
1688 for (size_t i = 0; i < count; ++i) {
1689 std::memcpy(&result[i], cw.data().data() + i * 8, 8);
1690 }
1691 return result;
1692 }
1693
    /// Write one dictionary-encoded column chunk: a PLAIN dictionary page
    /// followed by an RLE_DICTIONARY data page of indices, each optionally
    /// compressed and (commercial builds) encrypted. Returns the page file
    /// offsets and uncompressed/compressed byte totals needed for the
    /// column-chunk metadata.
    [[nodiscard]] expected<DictColumnResult> write_dictionary_column(
        size_t /*col_index*/,
        const ColumnDescriptor& col_desc,
        const ColumnWriter& cw,
        Compression col_codec) {

        DictColumnResult info;
        info.total_uncompressed = 0;
        info.total_compressed = 0;
        info.used_encodings.insert(Encoding::PLAIN); // dictionary page
        info.used_encodings.insert(Encoding::RLE_DICTIONARY); // data page

        // Build the dictionary based on physical type
        std::vector<uint8_t> dict_page_data;
        std::vector<uint8_t> indices_page_data;
        int32_t num_dict_entries = 0;

        switch (col_desc.physical_type) {
        // NOTE(review): a 'case PhysicalType::BYTE_ARRAY: {' label appears to
        // have been dropped from this rendering — confirm against the source.
            auto strings = extract_byte_array_strings(cw);
            DictionaryEncoder<std::string> enc;
            for (const auto& s : strings) enc.put(s);
            enc.flush();
            dict_page_data = enc.dictionary_page();
            indices_page_data = enc.indices_page();
            num_dict_entries = static_cast<int32_t>(enc.dictionary_size());
            break;
        }
        case PhysicalType::INT32: {
            auto vals = extract_int32_values(cw);
            DictionaryEncoder<int32_t> enc;
            for (auto v : vals) enc.put(v);
            enc.flush();
            dict_page_data = enc.dictionary_page();
            indices_page_data = enc.indices_page();
            num_dict_entries = static_cast<int32_t>(enc.dictionary_size());
            break;
        }
        case PhysicalType::INT64: {
            auto vals = extract_int64_values(cw);
            DictionaryEncoder<int64_t> enc;
            for (auto v : vals) enc.put(v);
            enc.flush();
            dict_page_data = enc.dictionary_page();
            indices_page_data = enc.indices_page();
            num_dict_entries = static_cast<int32_t>(enc.dictionary_size());
            break;
        }
        case PhysicalType::FLOAT: {
            auto vals = extract_float_values(cw);
            DictionaryEncoder<float> enc;
            for (auto v : vals) enc.put(v);
            enc.flush();
            dict_page_data = enc.dictionary_page();
            indices_page_data = enc.indices_page();
            num_dict_entries = static_cast<int32_t>(enc.dictionary_size());
            break;
        }
        case PhysicalType::DOUBLE: {
            auto vals = extract_double_values(cw);
            DictionaryEncoder<double> enc;
            for (auto v : vals) enc.put(v);
            enc.flush();
            dict_page_data = enc.dictionary_page();
            indices_page_data = enc.indices_page();
            num_dict_entries = static_cast<int32_t>(enc.dictionary_size());
            break;
        }
        default:
            // Unsupported type for dictionary encoding — fall back to PLAIN
            // This shouldn't normally happen.
            // NOTE(review): the 'return Error{ErrorCode::..., ' line was lost
            // in rendering — confirm the exact error code in the source.
                "Dictionary encoding not supported for this type"};
        }

        // ---- Write dictionary page -----------------------------------------
        int32_t dict_uncompressed_size = static_cast<int32_t>(dict_page_data.size());
        const uint8_t* dict_write_data = dict_page_data.data();
        size_t dict_write_size = dict_page_data.size();
        std::vector<uint8_t> dict_compressed_buf;
        int32_t dict_compressed_size = dict_uncompressed_size;

        if (col_codec != Compression::UNCOMPRESSED) {
            auto comp = compress(col_codec, dict_page_data.data(), dict_page_data.size());
            if (!comp) return comp.error();
            dict_compressed_buf = std::move(*comp);
            dict_compressed_size = static_cast<int32_t>(dict_compressed_buf.size());
            dict_write_data = dict_compressed_buf.data();
            dict_write_size = dict_compressed_buf.size();
        }

        // Encrypt dictionary page if PME is configured for this column
#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
        std::vector<uint8_t> dict_encrypted_buf;
        if (encryptor_ && encryptor_->is_column_encrypted(col_desc.name)) {
            auto enc_result = encryptor_->encrypt_dict_page(
                dict_write_data, dict_write_size, col_desc.name,
                static_cast<int32_t>(row_groups_.size()));
            if (!enc_result) return enc_result.error();
            dict_encrypted_buf = std::move(*enc_result);
            dict_compressed_size = static_cast<int32_t>(dict_encrypted_buf.size());
            dict_write_data = dict_encrypted_buf.data();
            dict_write_size = dict_encrypted_buf.size();
        }
#endif

        // Page header for the dictionary page (CRC over the bytes as written)
        thrift::PageHeader dict_ph;
        dict_ph.type = PageType::DICTIONARY_PAGE;
        dict_ph.uncompressed_page_size = dict_uncompressed_size;
        dict_ph.compressed_page_size = dict_compressed_size;
        dict_ph.crc = static_cast<int32_t>(detail::writer::page_crc32(
            dict_write_data, dict_write_size));

        thrift::DictionaryPageHeader dph;
        dph.num_values = num_dict_entries;
        dph.encoding = Encoding::PLAIN_DICTIONARY;
        dict_ph.dictionary_page_header = dph;

        thrift::CompactEncoder dict_header_enc;
        dict_ph.serialize(dict_header_enc);
        const auto& dict_header_bytes = dict_header_enc.data();

        info.dict_page_offset = file_offset_;

        write_raw(dict_header_bytes.data(), dict_header_bytes.size());
        write_raw(dict_write_data, dict_write_size);

        info.total_uncompressed += static_cast<int64_t>(dict_header_bytes.size()) +
                                   static_cast<int64_t>(dict_uncompressed_size);
        info.total_compressed += static_cast<int64_t>(dict_header_bytes.size()) +
                                 static_cast<int64_t>(dict_compressed_size);

        // ---- Write data page (RLE_DICTIONARY indices) ----------------------
        int32_t idx_uncompressed_size = static_cast<int32_t>(indices_page_data.size());
        const uint8_t* idx_write_data = indices_page_data.data();
        size_t idx_write_size = indices_page_data.size();
        std::vector<uint8_t> idx_compressed_buf;
        int32_t idx_compressed_size = idx_uncompressed_size;

        if (col_codec != Compression::UNCOMPRESSED) {
            auto comp = compress(col_codec, indices_page_data.data(), indices_page_data.size());
            if (!comp) return comp.error();
            idx_compressed_buf = std::move(*comp);
            idx_compressed_size = static_cast<int32_t>(idx_compressed_buf.size());
            idx_write_data = idx_compressed_buf.data();
            idx_write_size = idx_compressed_buf.size();
        }

        // Encrypt indices page if PME is configured for this column
#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
        std::vector<uint8_t> idx_encrypted_buf;
        if (encryptor_ && encryptor_->is_column_encrypted(col_desc.name)) {
            auto enc_result = encryptor_->encrypt_column_page(
                idx_write_data, idx_write_size, col_desc.name,
                static_cast<int32_t>(row_groups_.size()), 0);
            if (!enc_result) return enc_result.error();
            idx_encrypted_buf = std::move(*enc_result);
            idx_compressed_size = static_cast<int32_t>(idx_encrypted_buf.size());
            idx_write_data = idx_encrypted_buf.data();
            idx_write_size = idx_encrypted_buf.size();
        }
#endif

        // Page header for the indices data page
        thrift::PageHeader idx_ph;
        idx_ph.type = PageType::DATA_PAGE;
        idx_ph.uncompressed_page_size = idx_uncompressed_size;
        idx_ph.compressed_page_size = idx_compressed_size;
        idx_ph.crc = static_cast<int32_t>(detail::writer::page_crc32(
            idx_write_data, idx_write_size));

        thrift::DataPageHeader idx_dph;
        idx_dph.num_values = static_cast<int32_t>(cw.num_values());
        idx_dph.encoding = Encoding::RLE_DICTIONARY;
        idx_dph.definition_level_encoding = Encoding::RLE;
        idx_dph.repetition_level_encoding = Encoding::RLE;
        idx_ph.data_page_header = idx_dph;

        thrift::CompactEncoder idx_header_enc;
        idx_ph.serialize(idx_header_enc);
        const auto& raw_idx_header_bytes = idx_header_enc.data();

        // The data-page header itself may also be encrypted under PME
        const uint8_t* idx_header_write_data = raw_idx_header_bytes.data();
        size_t idx_header_write_size = raw_idx_header_bytes.size();
#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
        std::vector<uint8_t> idx_header_record_buf;
        if (encryptor_ && encryptor_->is_column_encrypted(col_desc.name)) {
            auto enc_header = encryptor_->encrypt_data_page_header(
                raw_idx_header_bytes.data(), raw_idx_header_bytes.size(),
                col_desc.name, static_cast<int32_t>(row_groups_.size()), 0);
            if (!enc_header) return enc_header.error();
            idx_header_record_buf = detail::writer::wrap_encrypted_page_header(*enc_header);
            idx_header_write_data = idx_header_record_buf.data();
            idx_header_write_size = idx_header_record_buf.size();
        }
#endif

        info.data_page_offset = file_offset_;

        write_raw(idx_header_write_data, idx_header_write_size);
        write_raw(idx_write_data, idx_write_size);

        info.total_uncompressed += static_cast<int64_t>(idx_header_write_size) +
                                   static_cast<int64_t>(idx_uncompressed_size);
        info.total_compressed += static_cast<int64_t>(idx_header_write_size) +
                                 static_cast<int64_t>(idx_compressed_size);

        return info;
    }
1904
1905 // -- Bloom filter helpers -------------------------------------------------
1906
1910 bool bloom_ensure_filter(size_t col_index) {
1911 if (bloom_filters_.empty()) return false;
1912 if (bloom_filters_[col_index]) return true; // Already initialized
1913
1914 // Check if this column should have a bloom filter
1915 const auto& col_name = schema_.column(col_index).name;
1916 if (!options_.bloom_filter_columns.empty() &&
1917 options_.bloom_filter_columns.find(col_name) ==
1918 options_.bloom_filter_columns.end()) {
1919 return false;
1920 }
1921
1922 // Initialize with row_group_size as the expected NDV estimate
1923 size_t ndv_estimate = static_cast<size_t>(
1924 (std::max)(int64_t{1}, options_.row_group_size));
1925 bloom_filters_[col_index] = std::make_unique<SplitBlockBloomFilter>(
1926 ndv_estimate, options_.bloom_filter_fpr);
1927 return true;
1928 }
1929
1931 template <typename T>
1932 void bloom_insert_typed(size_t col_index, const T* values, size_t count) {
1933 if (!bloom_ensure_filter(col_index)) return;
1934 auto& bf = bloom_filters_[col_index];
1935 for (size_t i = 0; i < count; ++i) {
1936 bf->insert_value(values[i]);
1937 }
1938 }
1939
1940 // -- CSV parsing helper ---------------------------------------------------
1941
1944 [[nodiscard]] static std::vector<std::string> split_csv_line(const std::string& line) {
1945 std::vector<std::string> fields;
1946 std::string field;
1947 bool in_quotes = false;
1948 size_t i = 0;
1949
1950 while (i < line.size()) {
1951 char ch = line[i];
1952
1953 if (in_quotes) {
1954 if (ch == '"') {
1955 // Check for escaped quote ("")
1956 if (i + 1 < line.size() && line[i + 1] == '"') {
1957 field += '"';
1958 i += 2;
1959 } else {
1960 // End of quoted field
1961 in_quotes = false;
1962 ++i;
1963 }
1964 } else {
1965 field += ch;
1966 ++i;
1967 }
1968 } else {
1969 if (ch == '"') {
1970 in_quotes = true;
1971 ++i;
1972 } else if (ch == ',') {
1973 // Trim whitespace from unquoted fields
1974 trim_string(field);
1975 fields.push_back(std::move(field));
1976 field.clear();
1977 ++i;
1978 } else if (ch == '\r') {
1979 // Skip carriage return
1980 ++i;
1981 } else {
1982 field += ch;
1983 ++i;
1984 }
1985 }
1986 }
1987
1988 // Add the last field
1989 trim_string(field);
1990 fields.push_back(std::move(field));
1991
1992 return fields;
1993 }
1994
1996 static void trim_string(std::string& s) {
1997 size_t start = s.find_first_not_of(" \t\r\n");
1998 if (start == std::string::npos) {
1999 s.clear();
2000 return;
2001 }
2002 size_t end = s.find_last_not_of(" \t\r\n");
2003 s = s.substr(start, end - start + 1);
2004 }
2005};
2006
2007} // namespace signet::forge
BYTE_STREAM_SPLIT encoding and decoding (Parquet encoding type 9).
Streaming Parquet file writer with row-based and column-based APIs.
Definition writer.hpp:280
expected< void > write_row(const std::vector< std::string > &values)
Write a single row as a vector of string values.
Definition writer.hpp:368
static expected< void > csv_to_parquet(const std::filesystem::path &csv_input, const std::filesystem::path &parquet_output, const Options &options=Options{})
Convert a CSV file to a Parquet file.
Definition writer.hpp:1144
ParquetWriter(const ParquetWriter &)=delete
Deleted copy constructor. ParquetWriter is move-only.
expected< WriteStats > close()
Close the file and finalize the Parquet footer.
Definition writer.hpp:869
ParquetWriter & operator=(const ParquetWriter &)=delete
Deleted copy-assignment operator. ParquetWriter is move-only.
size_t num_columns() const noexcept
Returns the number of columns in the writer's schema.
Definition writer.hpp:393
int64_t rows_written() const
Returns the total number of rows written so far.
Definition writer.hpp:1105
WriterOptions Options
Alias for WriterOptions, usable as ParquetWriter::Options.
Definition writer.hpp:283
ParquetWriter(ParquetWriter &&other) noexcept
Move constructor.
Definition writer.hpp:1041
bool is_open() const
Returns whether the writer is open and accepting data.
Definition writer.hpp:1118
expected< void > write_column(size_t col_index, const std::string *values, size_t count)
Write a batch of string values to a BYTE_ARRAY column.
Definition writer.hpp:467
expected< void > flush_row_group()
Flush the current row group to disk.
Definition writer.hpp:520
ParquetWriter & operator=(ParquetWriter &&other) noexcept
Move-assignment operator.
Definition writer.hpp:1066
static expected< ParquetWriter > open(const std::filesystem::path &path, const Schema &schema, const Options &options=Options{})
Open a new Parquet file for writing.
Definition writer.hpp:303
int64_t row_groups_written() const
Returns the number of row groups that have been flushed to disk.
Definition writer.hpp:1112
expected< void > write_column(size_t col_index, const T *values, size_t count)
Write a batch of typed values to a single column.
Definition writer.hpp:419
static std::vector< uint8_t > encode_with_length(const uint32_t *values, size_t count, int bit_width)
Encode with a 4-byte little-endian length prefix.
Definition rle.hpp:371
Immutable schema description for a Parquet file.
Definition schema.hpp:192
size_t num_columns() const
Number of columns in this schema.
Definition schema.hpp:238
const std::string & name() const
Root schema name (e.g. "tick_data").
Definition schema.hpp:235
const ColumnDescriptor & column(size_t index) const
Access a column descriptor by index.
Definition schema.hpp:244
A lightweight result type that holds either a success value of type T or an Error.
Definition error.hpp:145
Thrift Compact Protocol writer.
Definition compact.hpp:72
const std::vector< uint8_t > & data() const
Returns a const reference to the underlying byte buffer.
Definition compact.hpp:200
Compression codec interface and registry for Signet Forge.
ColumnIndex, OffsetIndex, and ColumnIndexBuilder for predicate pushdown.
PLAIN encoding writer for all Parquet physical types.
Thrift Compact Protocol encoder and decoder for Parquet metadata serialization.
DELTA_BINARY_PACKED encoding and decoding (Parquet encoding type 5).
Dictionary encoding and decoding for Parquet (PLAIN_DICTIONARY / RLE_DICTIONARY).
std::vector< uint8_t > encode_float(const float *values, size_t count)
Encode float values using the BYTE_STREAM_SPLIT algorithm.
std::vector< uint8_t > encode_double(const double *values, size_t count)
Encode double values using the BYTE_STREAM_SPLIT algorithm.
std::vector< uint8_t > encode_int32(const int32_t *values, size_t count)
Encode int32 values using the DELTA_BINARY_PACKED algorithm.
Definition delta.hpp:408
std::vector< uint8_t > encode_int64(const int64_t *values, size_t count)
Encode int64 values using the DELTA_BINARY_PACKED algorithm.
Definition delta.hpp:298
constexpr uint8_t kEncryptedPageHeaderMagic[4]
Definition writer.hpp:160
uint32_t page_crc32(const uint8_t *data, size_t length) noexcept
CRC-32 (polynomial 0xEDB88320) for page integrity (Parquet PageHeader.crc).
Definition writer.hpp:142
std::vector< uint8_t > wrap_encrypted_page_header(const std::vector< uint8_t > &encrypted_header)
Definition writer.hpp:162
double parse_double(std::string_view sv) noexcept
Definition writer.hpp:84
float parse_float(std::string_view sv) noexcept
Definition writer.hpp:102
bool try_parse_double(std::string_view sv, double &out) noexcept
Try parsing a string_view as double; returns true on full parse success.
Definition writer.hpp:119
Compression auto_select_compression(const uint8_t *sample_data, size_t sample_size)
Automatically select the best available compression codec.
Definition codec.hpp:270
constexpr const char * SIGNET_CREATED_BY
Default "created_by" string embedded in every Parquet footer.
Definition types.hpp:203
PhysicalType
Parquet physical (storage) types as defined in parquet.thrift.
Definition types.hpp:20
@ FIXED_LEN_BYTE_ARRAY
Fixed-length byte array (UUID, vectors, decimals).
@ INT64
64-bit signed integer (little-endian).
@ INT32
32-bit signed integer (little-endian).
@ BOOLEAN
1-bit boolean, bit-packed in pages.
@ BYTE_ARRAY
Variable-length byte sequence (strings, binary).
@ FLOAT
IEEE 754 single-precision float.
@ DOUBLE
IEEE 754 double-precision float.
constexpr int32_t PARQUET_VERSION
Parquet format version written to the file footer.
Definition types.hpp:201
constexpr uint32_t PARQUET_MAGIC_ENCRYPTED
"PARE" magic bytes (little-endian uint32) — marks a Parquet file with an encrypted footer.
Definition types.hpp:207
Compression
Parquet compression codecs.
Definition types.hpp:115
@ UNCOMPRESSED
No compression.
@ TIMESTAMP_MILLIS
Timestamp in milliseconds.
@ DECIMAL
Fixed-point decimal.
@ TIMESTAMP_MICROS
Timestamp in microseconds.
@ DATE
Date (days since epoch).
@ UTF8
UTF-8 encoded string.
void register_snappy_codec()
Register the bundled Snappy codec with the global CodecRegistry.
Definition snappy.hpp:608
expected< std::vector< uint8_t > > compress(Compression codec, const uint8_t *data, size_t size)
Compress data using the specified codec via the global CodecRegistry.
Definition codec.hpp:183
@ JSON
JSON document (stored as BYTE_ARRAY).
@ DECIMAL
Fixed-point decimal (INT32/INT64/FIXED_LEN_BYTE_ARRAY).
@ DATE
Calendar date — INT32, days since 1970-01-01.
@ STRING
UTF-8 string (stored as BYTE_ARRAY).
@ ENUM
Enum string (stored as BYTE_ARRAY).
@ NONE
No logical annotation — raw physical type.
@ TIMESTAMP_MS
Timestamp — INT64, milliseconds since Unix epoch.
@ TIMESTAMP_US
Timestamp — INT64, microseconds since Unix epoch.
constexpr uint32_t PARQUET_MAGIC
"PAR1" magic bytes (little-endian uint32) — marks a standard Parquet file.
Definition types.hpp:205
@ IO_ERROR
A file-system or stream I/O operation failed (open, read, write, rename).
@ OUT_OF_RANGE
An index, offset, or size value is outside the valid range.
@ SCHEMA_MISMATCH
The requested column name or type does not match the file schema.
@ INVALID_FILE
The file is not a valid Parquet file (e.g. missing or wrong magic bytes).
@ UNSUPPORTED_ENCODING
The file uses an encoding not supported by this build (e.g. BYTE_STREAM_SPLIT on integers).
@ INTERNAL_ERROR
An unexpected internal error that does not fit any other category.
@ INVALID_ARGUMENT
A caller-supplied argument is outside the valid range or violates a precondition.
Encoding
Parquet page encoding types.
Definition types.hpp:98
@ DELTA_BINARY_PACKED
Delta encoding for INT32/INT64 (compact for sorted/sequential data).
@ RLE
Run-length / bit-packed hybrid (used for booleans and def/rep levels).
@ RLE_DICTIONARY
Modern dictionary encoding (Parquet 2.0) — dict page + RLE indices.
@ PLAIN_DICTIONARY
Legacy dictionary encoding (Parquet 1.0).
@ PLAIN
Values stored back-to-back in their native binary layout.
@ BYTE_STREAM_SPLIT
Byte-stream split for FLOAT/DOUBLE (transposes byte lanes for better compression).
@ DICTIONARY_PAGE
Dictionary page — contains the value dictionary for RLE_DICTIONARY columns.
@ DATA_PAGE
Data page (Parquet 1.0 format).
Parquet Modular Encryption (PME) orchestrator – encrypts and decrypts Parquet file components (footer, column metadata, and data pages).
RLE/Bit-Packing Hybrid encoding and decoding (Parquet spec).
Schema definition types: Column<T>, SchemaBuilder, and Schema.
Bundled, zero-dependency, header-only Snappy compression codec.
Split Block Bloom Filter as specified by the Apache Parquet format.
Per-column-chunk statistics tracker and little-endian byte helpers.
std::string name
Column name (unique within a schema).
Definition types.hpp:153
PhysicalType physical_type
On-disk storage type.
Definition types.hpp:154
Lightweight error value carrying an ErrorCode and a human-readable message.
Definition error.hpp:101
File-level write statistics returned by ParquetWriter::close().
Definition types.hpp:227
Configuration options for ParquetWriter.
Definition writer.hpp:188
int64_t row_group_size
Target number of rows per row group.
Definition writer.hpp:192
std::vector< thrift::KeyValue > file_metadata
Custom key-value metadata pairs embedded in the Parquet footer.
Definition writer.hpp:198
std::string created_by
Value written into the Parquet footer's "created_by" field.
Definition writer.hpp:195
Encoding default_encoding
Default encoding applied to every column that does not have a per-column override in column_encodings.
Definition writer.hpp:204
std::unordered_map< std::string, Encoding > column_encodings
Per-column encoding overrides keyed by column name.
Definition writer.hpp:212
bool auto_encoding
When true, the writer automatically selects the best encoding for each column based on its physical type.
Definition writer.hpp:218
bool auto_compression
When true, the writer samples page data and selects the most effective compression codec automatically.
Definition writer.hpp:222
double bloom_filter_fpr
Target false-positive rate for bloom filters. Default: 1 %.
Definition writer.hpp:237
bool enable_page_index
When true, a ColumnIndex and OffsetIndex are written for each column chunk, enabling predicate pushdown.
Definition writer.hpp:228
Compression compression
Compression codec applied to every data and dictionary page.
Definition writer.hpp:208
std::unordered_set< std::string > bloom_filter_columns
Column names for which bloom filters should be generated.
Definition writer.hpp:242
bool enable_bloom_filter
When true, a Split Block Bloom Filter is written for each column (or for the subset named in bloom_filter_columns).
Definition writer.hpp:234
Parquet column chunk descriptor (parquet.thrift fields 1-13).
Definition types.hpp:1884
std::optional< int64_t > column_index_offset
Column index offset (field 10).
Definition types.hpp:1892
int64_t file_offset
Byte offset in file (field 2).
Definition types.hpp:1886
std::optional< int64_t > bloom_filter_offset
Bloom filter offset (field 8).
Definition types.hpp:1890
std::optional< int64_t > offset_index_offset
Offset index offset (field 12).
Definition types.hpp:1894
std::optional< int32_t > offset_index_length
Offset index byte length (field 13).
Definition types.hpp:1895
std::optional< int32_t > column_index_length
Column index byte length (field 11).
Definition types.hpp:1893
std::optional< ColumnMetaData > meta_data
Inline column metadata (field 3).
Definition types.hpp:1887
std::optional< int32_t > bloom_filter_length
Bloom filter byte length (field 9).
Definition types.hpp:1891
Parquet column metadata (parquet.thrift fields 1-12).
Definition types.hpp:1056
std::optional< Statistics > statistics
Definition types.hpp:1068
std::optional< int64_t > dictionary_page_offset
Definition types.hpp:1067
std::vector< Encoding > encodings
Definition types.hpp:1058
std::vector< std::string > path_in_schema
Definition types.hpp:1059
Parquet data page header V1 (parquet.thrift fields 1-5).
Definition types.hpp:674
int32_t num_values
Number of values (field 1, required).
Definition types.hpp:675
Encoding repetition_level_encoding
Rep level encoding (field 4, required).
Definition types.hpp:678
Encoding definition_level_encoding
Def level encoding (field 3, required).
Definition types.hpp:677
Encoding encoding
Data encoding (field 2, required).
Definition types.hpp:676
Parquet file metadata (parquet.thrift fields 1-7).
Definition types.hpp:2265
std::vector< RowGroup > row_groups
Definition types.hpp:2269
std::optional< std::string > created_by
Definition types.hpp:2271
std::optional< std::vector< KeyValue > > key_value_metadata
Definition types.hpp:2270
void serialize(CompactEncoder &enc) const
Definition types.hpp:2276
std::vector< SchemaElement > schema
Definition types.hpp:2267
Parquet KeyValue metadata entry (parquet.thrift field IDs 1-2).
Definition types.hpp:468
std::optional< std::string > value
Metadata value (field 2, optional).
Definition types.hpp:470
std::string key
Metadata key (field 1, required).
Definition types.hpp:469
Parquet page header (parquet.thrift fields 1-8).
Definition types.hpp:933
std::optional< int32_t > crc
Definition types.hpp:937
void serialize(CompactEncoder &enc) const
Definition types.hpp:945
std::optional< DataPageHeader > data_page_header
Definition types.hpp:938
Parquet row group (parquet.thrift fields 1-4).
Definition types.hpp:2156
std::vector< ColumnChunk > columns
Column chunks (field 1).
Definition types.hpp:2157
int64_t total_byte_size
Total byte size (field 2).
Definition types.hpp:2158
int64_t num_rows
Number of rows (field 3).
Definition types.hpp:2159
Parquet schema element (parquet.thrift fields 1-10).
Definition types.hpp:530
std::optional< int32_t > type_length
Type length for FIXED_LEN_BYTE_ARRAY (field 2).
Definition types.hpp:532
std::optional< ConvertedType > converted_type
Legacy converted type (field 6).
Definition types.hpp:536
std::string name
Column or group name (field 4, required).
Definition types.hpp:534
std::optional< Repetition > repetition_type
REQUIRED/OPTIONAL/REPEATED (field 3).
Definition types.hpp:533
std::optional< int32_t > num_children
Number of children for group nodes (field 5).
Definition types.hpp:535
std::optional< int32_t > scale
Decimal scale (field 7).
Definition types.hpp:537
std::optional< int32_t > precision
Decimal precision (field 8).
Definition types.hpp:538
std::optional< PhysicalType > type
Physical type (field 1, absent for group nodes).
Definition types.hpp:531
Parquet column statistics (parquet.thrift fields 1-6).
Definition types.hpp:369
std::optional< std::string > min
Old-style min (field 2, deprecated).
Definition types.hpp:371
std::optional< std::string > max_value
New-style max value (field 5, preferred).
Definition types.hpp:374
std::optional< int64_t > null_count
Number of null values (field 3).
Definition types.hpp:372
std::optional< int64_t > distinct_count
Approximate distinct count (field 4).
Definition types.hpp:373
std::optional< std::string > min_value
New-style min value (field 6, preferred).
Definition types.hpp:375
std::optional< std::string > max
Old-style max (field 1, deprecated).
Definition types.hpp:370
Parquet Thrift struct types – C++ structs matching parquet.thrift, with Compact Protocol serialize/deserialize support.
Parquet format enumerations, type traits, and statistics structs.