Signet Forge 0.1.0
C++20 Parquet library with AI-native extensions
DEMO
Loading...
Searching...
No Matches
reader.hpp
Go to the documentation of this file.
1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright 2026 Johnson Ogundeji
3#pragma once
4
5// ---------------------------------------------------------------------------
6// ParquetReader — Parquet file reader
7//
8// Opens a Parquet file, verifies PAR1 magic bytes, deserializes the Thrift
9// footer to extract FileMetaData, builds a Schema, and provides typed access
10// to column data via ColumnReader.
11//
12// Supports:
13// - Typed column reads (read_column<T>)
14// - String conversion reads (read_column_as_strings)
15// - Full row-group and file reads
16// - Column projection by name
17// - Per-column statistics access
18// - Multiple encodings: PLAIN, RLE_DICTIONARY, DELTA_BINARY_PACKED,
19// BYTE_STREAM_SPLIT, RLE (booleans)
20// - Decompression via CodecRegistry (Snappy, ZSTD, LZ4, etc.)
21// ---------------------------------------------------------------------------
22
23#include "signet/types.hpp"
24#include "signet/error.hpp"
25#include "signet/schema.hpp"
27#include "signet/memory.hpp"
41
42#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
43#include "signet/crypto/pme.hpp"
44#endif
45
#include <algorithm>
#include <array>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <filesystem>
#include <fstream>
#include <functional>
#include <memory>
#include <mutex>
#include <optional>
#include <stdexcept>
#include <string>
#include <vector>
59
60// Windows macro pollution: undefine after all includes.
61// TIME_MS from <mmsystem.h>, OPTIONAL from <sal.h>
62#ifdef TIME_MS
63#undef TIME_MS
64#endif
65#ifdef OPTIONAL
66#undef OPTIONAL
67#endif
68
69namespace signet::forge {
70
71namespace detail_reader {
72
/// CRC-32 (reflected polynomial 0xEDB88320, as used by Parquet page
/// checksums) over `length` bytes starting at `data`.
/// The 256-entry lookup table is computed once at compile time.
inline uint32_t crc32(const void* data, size_t length) noexcept {
    // Immediately-invoked constexpr lambda: builds the byte-indexed
    // remainder table at compile time.
    static constexpr auto kTable = [] {
        std::array<uint32_t, 256> entries{};
        for (uint32_t byte = 0; byte < 256; ++byte) {
            uint32_t rem = byte;
            for (int bit = 0; bit < 8; ++bit) {
                rem = (rem & 1u) ? (0xEDB88320u ^ (rem >> 1)) : (rem >> 1);
            }
            entries[byte] = rem;
        }
        return entries;
    }();

    const auto* bytes = static_cast<const uint8_t*>(data);
    uint32_t state = 0xFFFFFFFFu;
    for (size_t idx = 0; idx < length; ++idx) {
        state = kTable[(state ^ bytes[idx]) & 0xFFu] ^ (state >> 8);
    }
    return state ^ 0xFFFFFFFFu;
}
92
93} // namespace detail_reader
94
/// Hard upper bound on a single page's byte size. NOTE(review): the
/// enforcement site is not in this chunk — presumably checked before
/// allocating page buffers; confirm against the page-read paths.
static constexpr size_t PARQUET_MAX_PAGE_SIZE = 256ULL * 1024ULL * 1024ULL; // 256 MB hard cap per page

/// Upper bound on a page's declared value count, enforced by
/// validate_page_value_count() below, so corrupt or hostile metadata
/// cannot trigger huge allocations.
static constexpr int64_t MAX_VALUES_PER_PAGE = 100'000'000; // 100M values per page — OOM guard
109
111 int64_t num_values,
112 const char* context) {
113 if (num_values < 0 || num_values > MAX_VALUES_PER_PAGE) {
115 std::string(context) + ": num_values out of valid range"};
116 }
117 return static_cast<size_t>(num_values);
118}
119
/// Magic prefix "SPH1" marking a Signet-encrypted page header.
inline constexpr uint8_t kEncryptedPageHeaderMagic[4] = {'S', 'P', 'H', '1'};

/// True when the buffer starts with the encrypted-page-header magic and is
/// large enough (>= 8 bytes) to also hold the 4-byte length field that
/// follows the magic.
[[nodiscard]] inline bool has_encrypted_page_header_prefix(
    const uint8_t* data,
    size_t size) noexcept {
    if (size < 8) {
        return false;  // too short for magic + LE32 header-size field
    }
    return std::memcmp(data, kEncryptedPageHeaderMagic,
                       sizeof(kEncryptedPageHeaderMagic)) == 0;
}
128
/// Assemble a uint32 from four little-endian bytes; byte-wise reads keep
/// the result correct regardless of host endianness or alignment.
[[nodiscard]] inline uint32_t load_le32(const uint8_t* data) noexcept {
    uint32_t value = 0;
    for (int i = 3; i >= 0; --i) {
        value = (value << 8) | static_cast<uint32_t>(data[i]);
    }
    return value;
}
135
#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
/// True when the encryption config can supply a key for `col_name`:
/// either a non-empty default column key, or an explicit per-column
/// entry for this column whose key material is non-empty.
[[nodiscard]] inline bool decryptor_has_column_key(
    const crypto::EncryptionConfig& config,
    const std::string& col_name) {
    if (!config.default_column_key.empty()) {
        return true;  // blanket key covers every column
    }
    for (const auto& entry : config.column_keys) {
        if (entry.column_name == col_name && !entry.key.empty()) {
            return true;
        }
    }
    return false;
}
#endif
149
168public:
169 // ===================================================================
170 // Open a Parquet file
171 // ===================================================================
172
190 const std::filesystem::path& path
191#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
192 , const std::optional<crypto::EncryptionConfig>& encryption = std::nullopt
193#endif
194 ) {
195
196 // Ensure built-in codecs are available for page decompression.
198
199 // --- Read entire file into memory ---
200 std::error_code ec;
201 auto file_size = std::filesystem::file_size(path, ec);
202 if (ec) {
204 "cannot determine file size: " + path.string() +
205 " (" + ec.message() + ")"};
206 }
207
208 static constexpr int64_t MAX_FILE_SIZE = INT64_C(4) * 1024 * 1024 * 1024; // 4 GB
209 if (static_cast<int64_t>(file_size) > MAX_FILE_SIZE) {
211 "File exceeds 4 GB limit; use MmapParquetReader for large files"};
212 }
213
214 if (file_size < 12) {
215 // Minimum: 4 (magic) + 4 (footer len) + 4 (magic) = 12
217 "file too small to be a valid Parquet file: " +
218 path.string()};
219 }
220
221 std::ifstream ifs(path, std::ios::binary);
222 if (!ifs) {
224 "cannot open file: " + path.string()};
225 }
226
227 std::vector<uint8_t> file_data(static_cast<size_t>(file_size));
228 ifs.read(reinterpret_cast<char*>(file_data.data()),
229 static_cast<std::streamsize>(file_size));
230 if (!ifs) {
232 "failed to read file: " + path.string()};
233 }
234 ifs.close();
235
236 // --- Verify PAR1 magic at start ---
237 const size_t sz = file_data.size();
238
239 uint32_t magic_start;
240 std::memcpy(&magic_start, file_data.data(), 4);
241 if (magic_start != PARQUET_MAGIC) {
243 "missing PAR1 magic at start of file"};
244 }
245
246 // --- Check trailing magic: PAR1 (plaintext) or PARE (encrypted footer) ---
247 uint32_t magic_end;
248 std::memcpy(&magic_end, file_data.data() + sz - 4, 4);
249
250 bool encrypted_footer = (magic_end == PARQUET_MAGIC_ENCRYPTED);
251
252 if (magic_end != PARQUET_MAGIC && magic_end != PARQUET_MAGIC_ENCRYPTED) {
254 "missing PAR1/PARE magic at end of file"};
255 }
256
257 // --- Read footer length (4-byte LE uint32 at [size-8, size-4]) ---
258 uint32_t footer_len;
259 std::memcpy(&footer_len, file_data.data() + sz - 8, 4);
260
261 if (footer_len == 0 || static_cast<size_t>(footer_len) > sz - 12) {
263 "invalid footer length: " + std::to_string(footer_len)};
264 }
265
266 // --- Prepare decryptor if needed ---
267#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
268 std::unique_ptr<crypto::FileDecryptor> decryptor;
269 if (encrypted_footer || encryption) {
270 if (!encryption) {
272 "file has encrypted footer (PARE magic) but no "
273 "encryption config was provided"};
274 }
275 decryptor = std::make_unique<crypto::FileDecryptor>(*encryption);
276 }
277#else
278 if (encrypted_footer) {
280 "encrypted footer (PARE) requires commercial build and license"};
281 }
282#endif
283
284 // --- Deserialize FileMetaData from footer ---
285 size_t footer_offset = sz - 8 - footer_len;
286 const uint8_t* footer_ptr = file_data.data() + footer_offset;
287 size_t footer_size = footer_len;
288
289 // If footer is encrypted, decrypt it first
290 std::vector<uint8_t> decrypted_footer_buf;
291 if (encrypted_footer) {
292#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
293 auto dec_result = decryptor->decrypt_footer(footer_ptr, footer_size);
294 if (!dec_result) return dec_result.error();
295 decrypted_footer_buf = std::move(*dec_result);
296 footer_ptr = decrypted_footer_buf.data();
297 footer_size = decrypted_footer_buf.size();
298#else
300 "encrypted footer (PARE) requires commercial build and license"};
301#endif
302 }
303
304 thrift::CompactDecoder dec(footer_ptr, footer_size);
305
306 thrift::FileMetaData metadata;
307 if (auto r = metadata.deserialize(dec); !r.has_value()) {
308 return r.error();
309 }
310
311 // --- Build Schema from FileMetaData.schema ---
312 // The first element is the root (group) node — skip it.
313 // Subsequent leaf elements become columns.
314 std::string schema_name;
315 std::vector<ColumnDescriptor> columns;
316
317 if (!metadata.schema.empty()) {
318 schema_name = metadata.schema[0].name;
319
320 for (size_t i = 1; i < metadata.schema.size(); ++i) {
321 const auto& elem = metadata.schema[i];
322
323 // Skip group nodes (those with num_children set)
324 if (elem.num_children.has_value()) {
325 continue;
326 }
327
329 cd.name = elem.name;
330 cd.physical_type = elem.type.value_or(PhysicalType::BYTE_ARRAY);
331 cd.repetition = elem.repetition_type.value_or(Repetition::REQUIRED);
332
333 if (elem.type_length.has_value()) {
334 cd.type_length = *elem.type_length;
335 }
336 if (elem.precision.has_value()) {
337 cd.precision = *elem.precision;
338 }
339 if (elem.scale.has_value()) {
340 cd.scale = *elem.scale;
341 }
342
343 // Map ConvertedType to LogicalType for common cases
344 if (elem.converted_type.has_value()) {
345 cd.logical_type = converted_type_to_logical(*elem.converted_type);
346 }
347
348 columns.push_back(std::move(cd));
349 }
350 }
351
352 // --- Assemble the reader ---
353 ParquetReader reader;
354 reader.file_data_ = std::move(file_data);
355 reader.metadata_ = std::move(metadata);
356 reader.schema_ = Schema(std::move(schema_name), std::move(columns));
357 reader.created_by_ = reader.metadata_.created_by.value_or("");
358#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
359 reader.decryptor_ = std::move(decryptor);
360#endif
361
362 return reader;
363 }
364
365 // ===================================================================
366 // File metadata accessors
367 // ===================================================================
368
    /// Schema built from the footer's flattened schema elements (leaf columns only).
    [[nodiscard]] const Schema& schema() const { return schema_; }

    /// Total row count as recorded in FileMetaData.num_rows.
    [[nodiscard]] int64_t num_rows() const { return metadata_.num_rows; }

    /// Number of row groups in the file.
    [[nodiscard]] int64_t num_row_groups() const {
        return static_cast<int64_t>(metadata_.row_groups.size());
    }

    /// Writer identification string from the footer ("" when absent).
    [[nodiscard]] const std::string& created_by() const { return created_by_; }

    /// Application-defined key/value pairs from the footer. Returns a
    /// reference to a shared empty vector when the footer carries none,
    /// so callers never need to null-check.
    [[nodiscard]] const std::vector<thrift::KeyValue>& key_value_metadata() const {
        static const std::vector<thrift::KeyValue> empty;
        return metadata_.key_value_metadata.has_value()
            ? *metadata_.key_value_metadata
            : empty;
    }
399
400 // ===================================================================
401 // Row group info
402 // ===================================================================
403
406 int64_t num_rows;
409 };
410
417 RowGroupInfo row_group(size_t index) const {
418 if (index >= metadata_.row_groups.size()) {
419 throw std::out_of_range("ParquetReader::row_group: index " +
420 std::to_string(index) + " >= " +
421 std::to_string(metadata_.row_groups.size()));
422 }
423 const auto& rg = metadata_.row_groups[index];
424 return {rg.num_rows,
425 rg.total_byte_size,
426 static_cast<int64_t>(index)};
427 }
428
429 // ===================================================================
430 // File statistics (aggregate metadata for the entire file)
431 // ===================================================================
432
    /// Aggregate file-level statistics: per-column byte/value totals summed
    /// across all row groups, plus overall compression ratio and bytes/row.
    /// Computed from footer metadata only — no page data is read.
    [[nodiscard]] FileStats file_stats() const {
        FileStats fs;
        fs.file_size_bytes = static_cast<int64_t>(file_data_.size());
        fs.total_rows = metadata_.num_rows;
        fs.num_row_groups = static_cast<int64_t>(metadata_.row_groups.size());
        fs.num_columns = static_cast<int64_t>(schema_.num_columns());
        fs.created_by = created_by_;

        // Aggregate per-column stats across all row groups
        fs.columns.resize(schema_.num_columns());
        for (size_t c = 0; c < schema_.num_columns(); ++c) {
            auto& col = fs.columns[c];
            col.column_name = schema_.column(c).name;
            col.physical_type = schema_.column(c).physical_type;
            col.logical_type = schema_.column(c).logical_type;
        }

        int64_t total_uncompressed = 0;
        int64_t total_compressed = 0;

        for (const auto& rg : metadata_.row_groups) {
            // Guard on both bounds: a malformed file may carry more column
            // chunks per row group than the schema has columns.
            for (size_t c = 0; c < rg.columns.size() && c < schema_.num_columns(); ++c) {
                const auto& cc = rg.columns[c];
                // Chunks without metadata contribute nothing to the totals.
                if (!cc.meta_data.has_value()) continue;
                const auto& cmd = *cc.meta_data;
                auto& col = fs.columns[c];

                col.uncompressed_bytes += cmd.total_uncompressed_size;
                col.compressed_bytes += cmd.total_compressed_size;
                col.num_values += cmd.num_values;
                // Last row group wins; codec is normally uniform per column.
                col.compression = cmd.codec;

                // null_count is optional inside optional statistics.
                if (cmd.statistics.has_value() && cmd.statistics->null_count.has_value()) {
                    col.null_count += *cmd.statistics->null_count;
                }

                // Check bloom filter presence
                if (cc.bloom_filter_offset.has_value() && *cc.bloom_filter_offset >= 0) {
                    col.has_bloom_filter = true;
                }
                // Check page index presence
                if (cc.column_index_offset.has_value() && *cc.column_index_offset >= 0) {
                    col.has_page_index = true;
                }

                total_uncompressed += cmd.total_uncompressed_size;
                total_compressed += cmd.total_compressed_size;
            }
        }

        // Ratios are left at their defaults when the denominator is zero.
        if (total_compressed > 0) {
            fs.compression_ratio = static_cast<double>(total_uncompressed)
                                 / static_cast<double>(total_compressed);
        }
        if (fs.total_rows > 0) {
            fs.bytes_per_row = static_cast<double>(fs.file_size_bytes)
                             / static_cast<double>(fs.total_rows);
        }

        return fs;
    }
504
512 static std::once_flag codec_flag;
513 std::call_once(codec_flag, [] {
515#ifdef SIGNET_HAS_ZSTD
516 register_zstd_codec();
517#endif
518#ifdef SIGNET_HAS_LZ4
519 register_lz4_codec();
520#endif
521#ifdef SIGNET_HAS_GZIP
522 register_gzip_codec();
523#endif
524 });
525 }
526
527 // ===================================================================
528 // Read a single column from a row group as a typed vector
529 // ===================================================================
530
553 template <typename T>
554 expected<std::vector<T>> read_column(size_t row_group_index,
555 size_t column_index) {
556 // --- Validate indices ---
557 if (row_group_index >= metadata_.row_groups.size()) {
559 "row group index out of range"};
560 }
561 if (column_index >= schema_.num_columns()) {
563 "column index out of range"};
564 }
565
566 const auto& rg = metadata_.row_groups[row_group_index];
567 if (column_index >= rg.columns.size()) {
569 "column index out of range"};
570 }
571
572 const auto& chunk = rg.columns[column_index];
573 if (!chunk.meta_data.has_value()) {
575 "column chunk has no metadata"};
576 }
577 const auto& col_meta = *chunk.meta_data;
578
579 // --- Detect encoding strategy from column metadata ---
580 bool has_dict = false;
581 Encoding data_encoding = Encoding::PLAIN;
582
583 for (auto enc : col_meta.encodings) {
585 has_dict = true;
586 }
588 data_encoding = Encoding::DELTA_BINARY_PACKED;
589 }
590 if (enc == Encoding::BYTE_STREAM_SPLIT) {
591 data_encoding = Encoding::BYTE_STREAM_SPLIT;
592 }
593 if (enc == Encoding::RLE && col_meta.type == PhysicalType::BOOLEAN) {
594 data_encoding = Encoding::RLE;
595 }
596 }
597
598 // Resolve column name for encryption context
599 const std::string& col_name = col_meta.path_in_schema.empty()
600 ? schema_.column(column_index).name
601 : col_meta.path_in_schema[0];
602
603 // --- Dictionary encoding path ---
604 // Dictionary encoding supports: string, int32, int64, float, double
605 if (has_dict) {
606 if constexpr (std::is_same_v<T, std::string> ||
607 std::is_same_v<T, int32_t> ||
608 std::is_same_v<T, int64_t> ||
609 std::is_same_v<T, float> ||
610 std::is_same_v<T, double>) {
611 return read_column_dict<T>(col_meta, col_name,
612 static_cast<int32_t>(row_group_index));
613 } else {
615 "dictionary encoding not supported for this type"};
616 }
617 }
618
619 // --- DELTA_BINARY_PACKED path (INT32/INT64 only) ---
620 if (data_encoding == Encoding::DELTA_BINARY_PACKED) {
621 if constexpr (std::is_same_v<T, int32_t> ||
622 std::is_same_v<T, int64_t>) {
623 return read_column_delta<T>(col_meta, col_name,
624 static_cast<int32_t>(row_group_index));
625 } else {
627 "DELTA_BINARY_PACKED only supports INT32/INT64"};
628 }
629 }
630
631 // --- BYTE_STREAM_SPLIT path (FLOAT/DOUBLE only) ---
632 if (data_encoding == Encoding::BYTE_STREAM_SPLIT) {
633 if constexpr (std::is_same_v<T, float> ||
634 std::is_same_v<T, double>) {
635 return read_column_bss<T>(col_meta, col_name,
636 static_cast<int32_t>(row_group_index));
637 } else {
639 "BYTE_STREAM_SPLIT only supports FLOAT/DOUBLE"};
640 }
641 }
642
643 // --- RLE path (boolean only) ---
644 if (data_encoding == Encoding::RLE &&
645 col_meta.type == PhysicalType::BOOLEAN) {
646 if constexpr (std::is_same_v<T, bool>) {
647 return read_column_rle_bool<T>(col_meta, col_name,
648 static_cast<int32_t>(row_group_index));
649 } else {
651 "RLE boolean encoding requires bool type"};
652 }
653 }
654
655 // --- Default: PLAIN encoding via ColumnReader ---
656 auto reader_result = make_column_reader(row_group_index, column_index);
657 if (!reader_result) return reader_result.error();
658
659 auto& [col_reader, num_values] = reader_result.value();
660 auto count_result = validate_page_value_count(num_values, "ParquetReader RLE_BOOL");
661 if (!count_result) return count_result.error();
662 size_t count = *count_result;
663
664 if constexpr (std::is_same_v<T, bool>) {
665 // std::vector<bool> has no .data() — read one by one
666 std::vector<bool> values;
667 values.reserve(count);
668 for (size_t i = 0; i < count; ++i) {
669 auto val = col_reader.template read<bool>();
670 if (!val) return val.error();
671 values.push_back(*val);
672 }
673 return values;
674 } else {
675 std::vector<T> values(count);
676 auto batch_result = col_reader.template read_batch<T>(
677 values.data(), count);
678 if (!batch_result) return batch_result.error();
679 return values;
680 }
681 }
682
683 // ===================================================================
684 // Read column as strings (converts any type to string representation)
685 // ===================================================================
686
699 size_t row_group_index, size_t column_index) {
700 if (row_group_index >= metadata_.row_groups.size()) {
701 return Error{ErrorCode::OUT_OF_RANGE, "row group index out of range"};
702 }
703 if (column_index >= schema_.num_columns()) {
704 return Error{ErrorCode::OUT_OF_RANGE, "column index out of range"};
705 }
706
707 PhysicalType pt = schema_.column(column_index).physical_type;
708
709 switch (pt) {
711 auto res = read_column<bool>(row_group_index, column_index);
712 if (!res) return res.error();
713 return to_string_vec(res.value());
714 }
715 case PhysicalType::INT32: {
716 auto res = read_column<int32_t>(row_group_index, column_index);
717 if (!res) return res.error();
718 return to_string_vec(res.value());
719 }
720 case PhysicalType::INT64: {
721 auto res = read_column<int64_t>(row_group_index, column_index);
722 if (!res) return res.error();
723 return to_string_vec(res.value());
724 }
725 case PhysicalType::FLOAT: {
726 auto res = read_column<float>(row_group_index, column_index);
727 if (!res) return res.error();
728 return to_string_vec(res.value());
729 }
731 auto res = read_column<double>(row_group_index, column_index);
732 if (!res) return res.error();
733 return to_string_vec(res.value());
734 }
736 // Strings are already strings — read directly
737 return read_column<std::string>(row_group_index, column_index);
738 }
740 // Read as raw bytes, hex-encode each value
741 auto reader_result = make_column_reader(row_group_index, column_index);
742 if (!reader_result) return reader_result.error();
743 auto& [col_reader, num_values] = reader_result.value();
744
745 std::vector<std::string> result;
746 result.reserve(static_cast<size_t>(num_values));
747 for (int64_t i = 0; i < num_values; ++i) {
748 auto bytes_result = col_reader.read_bytes();
749 if (!bytes_result) return bytes_result.error();
750 result.push_back(hex_encode(bytes_result.value()));
751 }
752 return result;
753 }
754 default:
756 "unsupported physical type for string conversion"};
757 }
758 }
759
760 // ===================================================================
761 // Read all columns from a row group as vectors of strings
762 // ===================================================================
763
775 size_t row_group_index) {
776 size_t num_cols = schema_.num_columns();
777 std::vector<std::vector<std::string>> columns(num_cols);
778
779 for (size_t c = 0; c < num_cols; ++c) {
780 auto res = read_column_as_strings(row_group_index, c);
781 if (!res) return res.error();
782 columns[c] = std::move(res.value());
783 }
784
785 return columns;
786 }
787
788 // ===================================================================
789 // Read entire file as vector of rows (each row = vector of strings)
790 // ===================================================================
791
805 size_t num_cols = schema_.num_columns();
806 int64_t safe_rows = (metadata_.num_rows < 0) ? 0 : metadata_.num_rows;
807 if (static_cast<uint64_t>(safe_rows) > 1024ULL * 1024 * 1024)
808 return Error{ErrorCode::INVALID_ARGUMENT, "num_rows exceeds 1 billion limit"};
809 std::vector<std::vector<std::string>> rows;
810 rows.reserve(static_cast<size_t>(safe_rows));
811
812 for (size_t rg = 0; rg < metadata_.row_groups.size(); ++rg) {
813 auto cols_result = read_row_group(rg);
814 if (!cols_result) return cols_result.error();
815
816 const auto& col_data = cols_result.value();
817 if (col_data.empty()) continue;
818
819 size_t rg_rows = col_data[0].size();
820 for (size_t r = 0; r < rg_rows; ++r) {
821 std::vector<std::string> row(num_cols);
822 for (size_t c = 0; c < num_cols; ++c) {
823 if (r < col_data[c].size()) {
824 row[c] = col_data[c][r];
825 }
826 }
827 rows.push_back(std::move(row));
828 }
829 }
830
831 return rows;
832 }
833
834 // ===================================================================
835 // Column projection -- read only specific columns by name
836 // ===================================================================
837
851 const std::vector<std::string>& column_names) {
852 // Resolve column indices
853 std::vector<size_t> indices;
854 indices.reserve(column_names.size());
855
856 for (const auto& name : column_names) {
857 auto idx = schema_.find_column(name);
858 if (!idx.has_value()) {
860 "column not found: " + name};
861 }
862 indices.push_back(*idx);
863 }
864
865 // Read across all row groups
866 size_t proj_cols = indices.size();
867 std::vector<std::vector<std::string>> result(proj_cols);
868
869 for (size_t rg = 0; rg < metadata_.row_groups.size(); ++rg) {
870 for (size_t p = 0; p < proj_cols; ++p) {
871 auto col_result = read_column_as_strings(rg, indices[p]);
872 if (!col_result) return col_result.error();
873
874 auto& col_vec = col_result.value();
875 result[p].insert(result[p].end(),
876 std::make_move_iterator(col_vec.begin()),
877 std::make_move_iterator(col_vec.end()));
878 }
879 }
880
881 return result;
882 }
883
884 // ===================================================================
885 // Statistics for a column in a row group
886 // ===================================================================
887
899 const thrift::Statistics* column_statistics(size_t row_group_index,
900 size_t column_index) const {
901 if (row_group_index >= metadata_.row_groups.size()) return nullptr;
902 const auto& rg = metadata_.row_groups[row_group_index];
903 if (column_index >= rg.columns.size()) return nullptr;
904 const auto& chunk = rg.columns[column_index];
905 if (!chunk.meta_data.has_value()) return nullptr;
906 if (!chunk.meta_data->statistics.has_value()) return nullptr;
907 return &(*chunk.meta_data->statistics);
908 }
909
910 // ===================================================================
911 // Bloom filter access
912 // ===================================================================
913
926 size_t row_group_index, size_t column_index) const {
927 if (row_group_index >= metadata_.row_groups.size()) {
929 "row group index out of range"};
930 }
931
932 const auto& rg = metadata_.row_groups[row_group_index];
933 if (column_index >= rg.columns.size()) {
935 "column index out of range"};
936 }
937
938 const auto& chunk = rg.columns[column_index];
939 if (!chunk.bloom_filter_offset.has_value()) {
941 "no bloom filter for this column chunk"};
942 }
943
944 int64_t bf_offset = *chunk.bloom_filter_offset;
945 if (bf_offset < 0 || static_cast<size_t>(bf_offset) + 4 > file_data_.size()) {
947 "bloom filter offset out of file bounds"};
948 }
949
950 // Read 4-byte LE size header
951 uint32_t bf_size = 0;
952 std::memcpy(&bf_size, file_data_.data() + bf_offset, 4);
953
954 size_t data_start = static_cast<size_t>(bf_offset) + 4;
955 if (data_start + bf_size > file_data_.size()) {
957 "bloom filter data extends past end of file"};
958 }
959
960 if (bf_size == 0 || (bf_size % SplitBlockBloomFilter::kBytesPerBlock) != 0) {
962 "invalid bloom filter size: " + std::to_string(bf_size)};
963 }
964
966 file_data_.data() + data_start, bf_size);
967 }
968
983 template <typename T>
984 [[nodiscard]] bool bloom_might_contain(
985 size_t row_group_index, size_t column_index,
986 const T& value) const {
987 auto bf_result = read_bloom_filter(row_group_index, column_index);
988 if (!bf_result) {
989 return true; // No bloom filter — cannot rule out the value
990 }
991 return bf_result->might_contain_value(value);
992 }
993
994 // ===================================================================
995 // Page Index access (ColumnIndex + OffsetIndex)
996 // ===================================================================
997
1011 size_t row_group_index, size_t column_index) const {
1012 if (row_group_index >= metadata_.row_groups.size())
1013 return Error{ErrorCode::OUT_OF_RANGE, "row group index out of range"};
1014 const auto& rg = metadata_.row_groups[row_group_index];
1015 if (column_index >= rg.columns.size())
1016 return Error{ErrorCode::OUT_OF_RANGE, "column index out of range"};
1017
1018 const auto& chunk = rg.columns[column_index];
1019 if (!chunk.column_index_offset.has_value() || !chunk.column_index_length.has_value())
1020 return Error{ErrorCode::INVALID_FILE, "no column index for this column chunk"};
1021
1022 int64_t ci_offset = *chunk.column_index_offset;
1023 int32_t ci_length = *chunk.column_index_length;
1024 if (ci_offset < 0 || ci_length < 0)
1025 return Error{ErrorCode::CORRUPT_PAGE, "column index offset/length negative"};
1026 auto uoff = static_cast<size_t>(ci_offset);
1027 auto ulen = static_cast<size_t>(ci_length);
1028 if (uoff > file_data_.size() || ulen > file_data_.size() - uoff)
1029 return Error{ErrorCode::CORRUPT_PAGE, "column index offset/length out of bounds"};
1030
1031 thrift::CompactDecoder dec(file_data_.data() + ci_offset,
1032 static_cast<size_t>(ci_length));
1033 ColumnIndex ci;
1034 ci.deserialize(dec);
1035 if (!dec.good())
1036 return Error{ErrorCode::CORRUPT_PAGE, "column index deserialization failed"};
1037 return ci;
1038 }
1039
1053 size_t row_group_index, size_t column_index) const {
1054 if (row_group_index >= metadata_.row_groups.size())
1055 return Error{ErrorCode::OUT_OF_RANGE, "row group index out of range"};
1056 const auto& rg = metadata_.row_groups[row_group_index];
1057 if (column_index >= rg.columns.size())
1058 return Error{ErrorCode::OUT_OF_RANGE, "column index out of range"};
1059
1060 const auto& chunk = rg.columns[column_index];
1061 if (!chunk.offset_index_offset.has_value() || !chunk.offset_index_length.has_value())
1062 return Error{ErrorCode::INVALID_FILE, "no offset index for this column chunk"};
1063
1064 int64_t oi_offset = *chunk.offset_index_offset;
1065 int32_t oi_length = *chunk.offset_index_length;
1066 if (oi_offset < 0 || oi_length < 0)
1067 return Error{ErrorCode::CORRUPT_PAGE, "offset index offset/length negative"};
1068 auto uoff2 = static_cast<size_t>(oi_offset);
1069 auto ulen2 = static_cast<size_t>(oi_length);
1070 if (uoff2 > file_data_.size() || ulen2 > file_data_.size() - uoff2)
1071 return Error{ErrorCode::CORRUPT_PAGE, "offset index offset/length out of bounds"};
1072
1073 thrift::CompactDecoder dec(file_data_.data() + oi_offset,
1074 static_cast<size_t>(oi_length));
1075 OffsetIndex oi;
1076 oi.deserialize(dec);
1077 if (!dec.good())
1078 return Error{ErrorCode::CORRUPT_PAGE, "offset index deserialization failed"};
1079 return oi;
1080 }
1081
1090 [[nodiscard]] bool has_page_index(size_t row_group_index, size_t column_index) const {
1091 if (row_group_index >= metadata_.row_groups.size()) return false;
1092 const auto& rg = metadata_.row_groups[row_group_index];
1093 if (column_index >= rg.columns.size()) return false;
1094 const auto& chunk = rg.columns[column_index];
1095 return chunk.column_index_offset.has_value() && chunk.offset_index_offset.has_value();
1096 }
1097
1098 // ===================================================================
1099 // Special members
1100 // ===================================================================
1101
1103 ~ParquetReader() = default;
1105 ParquetReader(ParquetReader&&) noexcept = default;
1107 ParquetReader& operator=(ParquetReader&&) noexcept = default;
1108
1109private:
1111 ParquetReader() = default;
1112
1113 std::vector<uint8_t> file_data_;
1114 thrift::FileMetaData metadata_;
1115 Schema schema_;
1116 std::string created_by_;
1117#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
1118 std::unique_ptr<crypto::FileDecryptor> decryptor_;
1119#endif
1120
1122 std::vector<std::vector<uint8_t>> decompressed_buffers_;
1123
1125 struct ColumnReaderWithCount {
1126 ColumnReader reader;
1127 int64_t num_values;
1128 };
1129
1131 expected<ColumnReaderWithCount> make_column_reader(
1132 size_t row_group_index, size_t column_index) {
1133 if (row_group_index >= metadata_.row_groups.size()) {
1135 "row group index out of range"};
1136 }
1137
1138 const auto& rg = metadata_.row_groups[row_group_index];
1139 if (column_index >= rg.columns.size()) {
1140 return Error{ErrorCode::OUT_OF_RANGE,
1141 "column index out of range"};
1142 }
1143
1144 const auto& chunk = rg.columns[column_index];
1145 if (!chunk.meta_data.has_value()) {
1146 return Error{ErrorCode::CORRUPT_PAGE,
1147 "column chunk has no metadata"};
1148 }
1149
1150 const auto& col_meta = *chunk.meta_data;
1151
1152 // Locate the data page in the file buffer.
1153 // For dictionary-encoded columns, the dictionary page comes first
1154 // at dictionary_page_offset, and data_page_offset points to the
1155 // data page (which contains RLE-encoded indices).
1156 int64_t offset = col_meta.data_page_offset;
1157 if (offset < 0 || static_cast<size_t>(offset) >= file_data_.size()) {
1158 return Error{ErrorCode::CORRUPT_PAGE,
1159 "data_page_offset out of file bounds"};
1160 }
1161
1162 size_t remaining = file_data_.size() - static_cast<size_t>(offset);
1163 const uint8_t* page_start = file_data_.data() + offset;
1164
1165 const std::string col_name = col_meta.path_in_schema.empty()
1166 ? schema_.column(column_index).name
1167 : col_meta.path_in_schema[0];
1168
1169 thrift::PageHeader page_header;
1170 size_t header_size = 0;
1171#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
1172 const bool column_encrypted = decryptor_ &&
1173 decryptor_has_column_key(decryptor_->config(), col_name);
1174 if (has_encrypted_page_header_prefix(page_start, remaining)) {
1175 if (!column_encrypted) {
1176 return Error{ErrorCode::ENCRYPTION_ERROR,
1177 "encrypted page header encountered without matching decryptor for column '" +
1178 col_name + "'"};
1179 }
1180 const uint32_t encrypted_header_size = load_le32(page_start + 4);
1181 if (encrypted_header_size == 0 ||
1182 static_cast<size_t>(encrypted_header_size) > remaining - 8) {
1183 return Error{ErrorCode::CORRUPT_PAGE,
1184 "encrypted page header size out of range"};
1185 }
1186 auto header_result = decryptor_->decrypt_data_page_header(
1187 page_start + 8, encrypted_header_size, col_name,
1188 static_cast<int32_t>(row_group_index), 0);
1189 if (!header_result) return header_result.error();
1190
1191 thrift::CompactDecoder page_dec(header_result->data(), header_result->size());
1192 if (auto r = page_header.deserialize(page_dec); !r.has_value()) {
1193 return r.error();
1194 }
1195 if (page_dec.position() != header_result->size()) {
1196 return Error{ErrorCode::THRIFT_DECODE_ERROR,
1197 "encrypted page header contains trailing bytes"};
1198 }
1199 header_size = 8 + static_cast<size_t>(encrypted_header_size);
1200 } else
1201#endif
1202 {
1203 // Deserialize the PageHeader to find where the data begins.
1204 // The serialized size of the PageHeader is variable (Thrift compact
1205 // encoding). We use decoder.position() to determine how many bytes
1206 // the PageHeader consumed.
1207 thrift::CompactDecoder page_dec(page_start, remaining);
1208 if (auto r = page_header.deserialize(page_dec); !r.has_value()) {
1209 return r.error();
1210 }
1211 header_size = page_dec.position();
1212 }
1213 if (page_header.compressed_page_size < 0) {
1214 return Error{ErrorCode::CORRUPT_PAGE,
1215 "negative compressed_page_size"};
1216 }
1217 size_t page_data_size = static_cast<size_t>(
1218 page_header.compressed_page_size);
1219 const uint8_t* page_data = page_start + header_size;
1220
1221 if (header_size + page_data_size > remaining) {
1222 return Error{ErrorCode::CORRUPT_PAGE,
1223 "page data extends past end of file"};
1224 }
1225
1226 // EX-1: Verify page CRC-32 if present in the page header.
1227 if (page_header.crc.has_value()) {
1228 uint32_t expected_crc = static_cast<uint32_t>(*page_header.crc);
1229 uint32_t computed_crc = detail_reader::crc32(page_data, page_data_size);
1230 if (computed_crc != expected_crc) {
1231 return Error{ErrorCode::CORRUPT_PAGE,
1232 "Page CRC-32 mismatch at offset "
1233 + std::to_string(offset) + ": expected 0x"
1234 + ([&]{
1235 char buf[9]; std::snprintf(buf, sizeof(buf), "%08X", expected_crc);
1236 return std::string(buf);
1237 })()
1238 + ", computed 0x"
1239 + ([&]{
1240 char buf[9]; std::snprintf(buf, sizeof(buf), "%08X", computed_crc);
1241 return std::string(buf);
1242 })()};
1243 }
1244 }
1245
1246 // --- Decrypt page data if PME is configured for this column ---
1247#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
1248 if (decryptor_ && decryptor_has_column_key(decryptor_->config(), col_name)) {
1249 auto dec_result = (page_header.type == PageType::DICTIONARY_PAGE)
1250 ? decryptor_->decrypt_dict_page(
1251 page_data, page_data_size, col_name,
1252 static_cast<int32_t>(row_group_index))
1253 : decryptor_->decrypt_column_page(
1254 page_data, page_data_size, col_name,
1255 static_cast<int32_t>(row_group_index), 0);
1256 if (!dec_result) return dec_result.error();
1257 decompressed_buffers_.push_back(std::move(*dec_result));
1258 page_data = decompressed_buffers_.back().data();
1259 page_data_size = decompressed_buffers_.back().size();
1260 }
1261#endif
1262
1263 // --- Decompress page data if needed ---
1264 if (col_meta.codec != Compression::UNCOMPRESSED) {
1265 size_t uncompressed_size = static_cast<size_t>(
1266 page_header.uncompressed_page_size);
1267 if (uncompressed_size == 0 || uncompressed_size > PARQUET_MAX_PAGE_SIZE) {
1268 return Error{ErrorCode::CORRUPT_PAGE,
1269 "ParquetReader: uncompressed_page_size exceeds 256 MB hard cap"};
1270 }
1271 auto decompressed = decompress(col_meta.codec,
1272 page_data, page_data_size,
1273 uncompressed_size);
1274 if (!decompressed) {
1276 "decompression failed: " +
1277 decompressed.error().message};
1278 }
1279 decompressed_buffers_.push_back(std::move(decompressed.value()));
1280 page_data = decompressed_buffers_.back().data();
1281 page_data_size = decompressed_buffers_.back().size();
1282 }
1283
1284 // Determine num_values from the PageHeader
1285 int64_t num_values = 0;
1286 if (page_header.type == PageType::DATA_PAGE &&
1287 page_header.data_page_header.has_value()) {
1288 num_values = page_header.data_page_header->num_values;
1289 } else if (page_header.type == PageType::DATA_PAGE_V2 &&
1290 page_header.data_page_header_v2.has_value()) {
1291 num_values = page_header.data_page_header_v2->num_values;
1292 } else {
1293 // Fall back to column metadata num_values
1294 num_values = col_meta.num_values;
1295 }
1296
1297 if (num_values < 0 || num_values > MAX_VALUES_PER_PAGE) {
1298 return Error{ErrorCode::CORRUPT_PAGE,
1299 "num_values out of valid range"};
1300 }
1301
1302 // Determine the physical type and type_length from the schema
1303 PhysicalType pt = col_meta.type;
1304 int32_t type_length = -1;
1305 if (column_index < schema_.num_columns()) {
1306 type_length = schema_.column(column_index).type_length;
1307 }
1308
1309 // M22: Check that decoded page won't exceed a reasonable memory budget
1310 {
1311 size_t elem_size = 0;
1312 switch (pt) {
1313 case PhysicalType::BOOLEAN: elem_size = 1; break;
1315 case PhysicalType::FLOAT: elem_size = 4; break;
1317 case PhysicalType::DOUBLE: elem_size = 8; break;
1319 elem_size = (type_length > 0) ? static_cast<size_t>(type_length) : 1;
1320 break;
1321 default: elem_size = 0; break; // variable-length: skip budget check
1322 }
1323 // CWE-400: Uncontrolled Resource Consumption — 256 MB decoded page memory budget
1324 if (elem_size > 0) {
1325 size_t decoded_size = static_cast<size_t>(num_values) * elem_size;
1326 if (decoded_size > 256ULL * 1024 * 1024) {
1327 return Error{ErrorCode::CORRUPT_DATA,
1328 "decoded page exceeds 256 MB memory limit"};
1329 }
1330 }
1331 }
1332
1333 ColumnReader col_reader(pt, page_data, page_data_size,
1334 num_values, type_length);
1335
1336 return ColumnReaderWithCount{std::move(col_reader), num_values};
1337 }
1338
    /// Result of reading one page via read_page_at(): a non-owning view of
    /// the final page payload plus its parsed Thrift header.
    struct PageReadResult {
        const uint8_t* data;        // payload start — points into file_data_ or into
                                    // a buffer owned by decompressed_buffers_, so it
                                    // stays valid for the reader's lifetime
        size_t size;                // payload size in bytes (after decrypt/decompress)
        thrift::PageHeader header;  // deserialized page header for this page
    };
1345
1347 expected<PageReadResult> read_page_at(int64_t offset, Compression codec,
1348 const std::string& col_name = "",
1349 int32_t rg_index = 0, int32_t page_ordinal = 0) {
1350 if (offset < 0 || static_cast<size_t>(offset) >= file_data_.size()) {
1351 return Error{ErrorCode::CORRUPT_PAGE,
1352 "page offset out of file bounds"};
1353 }
1354
1355 size_t remaining = file_data_.size() - static_cast<size_t>(offset);
1356 const uint8_t* page_start = file_data_.data() + offset;
1357
1358 thrift::PageHeader ph;
1359 size_t hdr_size = 0;
1360#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
1361 const bool column_encrypted = decryptor_ && !col_name.empty() &&
1362 decryptor_has_column_key(decryptor_->config(), col_name);
1363 if (has_encrypted_page_header_prefix(page_start, remaining)) {
1364 if (!column_encrypted) {
1365 return Error{ErrorCode::ENCRYPTION_ERROR,
1366 "encrypted page header encountered without matching decryptor for column '" +
1367 col_name + "'"};
1368 }
1369 const uint32_t encrypted_header_size = load_le32(page_start + 4);
1370 if (encrypted_header_size == 0 ||
1371 static_cast<size_t>(encrypted_header_size) > remaining - 8) {
1372 return Error{ErrorCode::CORRUPT_PAGE,
1373 "encrypted page header size out of range"};
1374 }
1375 auto header_result = decryptor_->decrypt_data_page_header(
1376 page_start + 8, encrypted_header_size, col_name,
1377 rg_index, page_ordinal);
1378 if (!header_result) return header_result.error();
1379
1380 thrift::CompactDecoder page_dec(header_result->data(), header_result->size());
1381 if (auto r = ph.deserialize(page_dec); !r.has_value()) {
1382 return r.error();
1383 }
1384 if (page_dec.position() != header_result->size()) {
1385 return Error{ErrorCode::THRIFT_DECODE_ERROR,
1386 "encrypted page header contains trailing bytes"};
1387 }
1388 hdr_size = 8 + static_cast<size_t>(encrypted_header_size);
1389 } else
1390#endif
1391 {
1392 thrift::CompactDecoder page_dec(page_start, remaining);
1393 if (auto r = ph.deserialize(page_dec); !r.has_value()) {
1394 return r.error();
1395 }
1396 hdr_size = page_dec.position();
1397 }
1398 if (ph.compressed_page_size < 0) {
1399 return Error{ErrorCode::CORRUPT_PAGE,
1400 "negative compressed_page_size"};
1401 }
1402 size_t compressed_size = static_cast<size_t>(ph.compressed_page_size);
1403 const uint8_t* pdata = page_start + hdr_size;
1404
1405 if (hdr_size + compressed_size > remaining) {
1406 return Error{ErrorCode::CORRUPT_PAGE,
1407 "page data extends past end of file"};
1408 }
1409
1410 size_t pdata_size = compressed_size;
1411
1412 // EX-1: Verify page CRC-32 if present in the page header.
1413 // Parquet spec field 4 is an optional CRC-32 over the compressed page data.
1414 if (ph.crc.has_value()) {
1415 uint32_t expected_crc = static_cast<uint32_t>(*ph.crc);
1416 uint32_t computed_crc = detail_reader::crc32(pdata, pdata_size);
1417 if (computed_crc != expected_crc) {
1418 return Error{ErrorCode::CORRUPT_PAGE,
1419 "Page CRC-32 mismatch at offset "
1420 + std::to_string(offset) + ": expected 0x"
1421 + ([&]{
1422 char buf[9]; std::snprintf(buf, sizeof(buf), "%08X", expected_crc);
1423 return std::string(buf);
1424 })()
1425 + ", computed 0x"
1426 + ([&]{
1427 char buf[9]; std::snprintf(buf, sizeof(buf), "%08X", computed_crc);
1428 return std::string(buf);
1429 })()};
1430 }
1431 }
1432
1433 // --- Decrypt page data if PME is configured ---
1434#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
1435 if (decryptor_ && !col_name.empty() &&
1436 decryptor_has_column_key(decryptor_->config(), col_name)) {
1437 auto dec_result = (ph.type == PageType::DICTIONARY_PAGE)
1438 ? decryptor_->decrypt_dict_page(pdata, pdata_size, col_name, rg_index)
1439 : decryptor_->decrypt_column_page(pdata, pdata_size, col_name, rg_index, page_ordinal);
1440 if (!dec_result) return dec_result.error();
1441 decompressed_buffers_.push_back(std::move(*dec_result));
1442 pdata = decompressed_buffers_.back().data();
1443 pdata_size = decompressed_buffers_.back().size();
1444 }
1445#endif
1446
1447 if (codec != Compression::UNCOMPRESSED) {
1448 size_t uncompressed_size = static_cast<size_t>(
1449 ph.uncompressed_page_size);
1450 if (uncompressed_size == 0 || uncompressed_size > PARQUET_MAX_PAGE_SIZE) {
1451 return Error{ErrorCode::CORRUPT_PAGE,
1452 "ParquetReader: uncompressed_page_size exceeds 256 MB hard cap"};
1453 }
1454 // CWE-409: Improper Handling of Highly Compressed Data (Zip Bomb)
1455 // M20: Reject suspiciously high decompression ratios (zip bomb guard)
1456 if (pdata_size > 0 && uncompressed_size / pdata_size > 1024) {
1457 return Error{ErrorCode::CORRUPT_DATA,
1458 "ParquetReader: decompression ratio exceeds 1024x limit"};
1459 }
1460 auto dec_result = decompress(codec, pdata, pdata_size,
1461 uncompressed_size);
1462 if (!dec_result) {
1464 "decompression failed: " +
1465 dec_result.error().message};
1466 }
1467 decompressed_buffers_.push_back(std::move(dec_result.value()));
1468 pdata = decompressed_buffers_.back().data();
1469 pdata_size = decompressed_buffers_.back().size();
1470 }
1471
1472 return PageReadResult{pdata, pdata_size, std::move(ph)};
1473 }
1474
    /// Read a dictionary-encoded column chunk (PLAIN_DICTIONARY /
    /// RLE_DICTIONARY): load the dictionary page, locate the data page, and
    /// decode the RLE-encoded indices through DictionaryDecoder<T>.
    ///
    /// @param col_meta Column-chunk metadata (offsets, codec, value count).
    /// @param col_name Column name, forwarded for per-column decryption keys.
    /// @param rg_index Row-group ordinal, forwarded as decryption AAD context.
    /// @return Decoded values of type T, or an Error on corruption.
    template <typename T>
    expected<std::vector<T>> read_column_dict(
        const thrift::ColumnMetaData& col_meta,
        const std::string& col_name = "",
        int32_t rg_index = 0) {
        // Step 1: determine where the dictionary page lives.
        // dictionary_page_offset is optional; when unset, the dictionary page
        // sits first at data_page_offset.
        int64_t dict_offset = col_meta.dictionary_page_offset.value_or(
            col_meta.data_page_offset);

        // Step 2: read the dictionary page (page_ordinal -1 marks the
        // dictionary page for decryption purposes).
        auto dict_page_result = read_page_at(dict_offset, col_meta.codec,
                                             col_name, rg_index, -1);
        if (!dict_page_result) return dict_page_result.error();

        auto& dict_pr = dict_page_result.value();
        if (dict_pr.header.type != PageType::DICTIONARY_PAGE ||
            !dict_pr.header.dictionary_page_header.has_value()) {
            return Error{ErrorCode::CORRUPT_PAGE,
                         "expected DICTIONARY_PAGE at dictionary offset"};
        }

        // Bound the dictionary entry count so a corrupt header cannot drive
        // a huge downstream allocation.
        int32_t raw_dict_count = dict_pr.header.dictionary_page_header->num_values;
        if (raw_dict_count < 0 || raw_dict_count > 10'000'000) {
            return Error{ErrorCode::CORRUPT_PAGE,
                         "dictionary page num_values out of valid range"};
        }
        size_t num_dict_entries = static_cast<size_t>(raw_dict_count);

        // Step 3: find the data page offset
        // If dictionary_page_offset was set and != data_page_offset, the
        // data page is at data_page_offset. Otherwise the data page
        // immediately follows the dictionary page in the file.
        int64_t data_offset = col_meta.data_page_offset;
        if (data_offset == dict_offset) {
            // Dictionary page and data page are sequential; skip past the
            // dictionary page in the file to get to the data page.
            // We need the raw page size (compressed) + header.
            // NOTE(review): this re-parse assumes a plaintext Thrift page
            // header. If an encrypted page-header prefix can occur here
            // (commercial PME builds), dict_hdr_size would be mis-computed —
            // confirm against the writer's framing.
            size_t dict_raw_start = static_cast<size_t>(dict_offset);
            const uint8_t* dict_start = file_data_.data() + dict_raw_start;
            size_t dict_remaining = file_data_.size() - dict_raw_start;

            thrift::CompactDecoder hdr_dec(dict_start, dict_remaining);
            thrift::PageHeader tmp_hdr;
            if (auto r = tmp_hdr.deserialize(hdr_dec); !r.has_value()) {
                return r.error();
            }
            size_t dict_hdr_size = hdr_dec.position();
            if (tmp_hdr.compressed_page_size < 0) {
                return Error{ErrorCode::CORRUPT_PAGE,
                             "negative compressed_page_size in dictionary page"};
            }
            size_t dict_compressed_size = static_cast<size_t>(
                tmp_hdr.compressed_page_size);

            data_offset = dict_offset
                + static_cast<int64_t>(dict_hdr_size)
                + static_cast<int64_t>(dict_compressed_size);
        }

        // Step 4: read the data page (contains RLE-encoded indices)
        auto data_page_result = read_page_at(data_offset, col_meta.codec,
                                             col_name, rg_index, 0);
        if (!data_page_result) return data_page_result.error();

        auto& data_pr = data_page_result.value();

        // Prefer the page header's value count; fall back to the chunk-level
        // count when neither data-page header variant is present.
        int64_t num_values = 0;
        if (data_pr.header.type == PageType::DATA_PAGE &&
            data_pr.header.data_page_header.has_value()) {
            num_values = data_pr.header.data_page_header->num_values;
        } else if (data_pr.header.type == PageType::DATA_PAGE_V2 &&
                   data_pr.header.data_page_header_v2.has_value()) {
            num_values = data_pr.header.data_page_header_v2->num_values;
        } else {
            num_values = col_meta.num_values;
        }

        if (num_values < 0 || num_values > MAX_VALUES_PER_PAGE) {
            return Error{ErrorCode::CORRUPT_PAGE,
                         "num_values out of valid range"};
        }

        // Step 5: decode using DictionaryDecoder
        DictionaryDecoder<T> decoder(dict_pr.data, dict_pr.size,
                                     num_dict_entries, col_meta.type);

        return decoder.decode(data_pr.data, data_pr.size,
                              static_cast<size_t>(num_values));
    }
1565
1567 template <typename T>
1568 expected<std::vector<T>> read_column_delta(
1569 const thrift::ColumnMetaData& col_meta,
1570 const std::string& col_name = "",
1571 int32_t rg_index = 0) {
1572 auto page_result = read_page_at(col_meta.data_page_offset,
1573 col_meta.codec,
1574 col_name, rg_index, 0);
1575 if (!page_result) return page_result.error();
1576
1577 auto& pr = page_result.value();
1578
1579 int64_t num_values = 0;
1580 if (pr.header.type == PageType::DATA_PAGE &&
1581 pr.header.data_page_header.has_value()) {
1582 num_values = pr.header.data_page_header->num_values;
1583 } else if (pr.header.type == PageType::DATA_PAGE_V2 &&
1584 pr.header.data_page_header_v2.has_value()) {
1585 num_values = pr.header.data_page_header_v2->num_values;
1586 } else {
1587 num_values = col_meta.num_values;
1588 }
1589
1590 auto count_result = validate_page_value_count(num_values, "ParquetReader DELTA_BINARY_PACKED");
1591 if (!count_result) return count_result.error();
1592 size_t count = *count_result;
1593
1594 if constexpr (std::is_same_v<T, int32_t>) {
1595 return delta::decode_int32(pr.data, pr.size, count);
1596 } else if constexpr (std::is_same_v<T, int64_t>) {
1597 return delta::decode_int64(pr.data, pr.size, count);
1598 } else {
1600 "DELTA_BINARY_PACKED only supports INT32/INT64"};
1601 }
1602 }
1603
1605 template <typename T>
1606 expected<std::vector<T>> read_column_bss(
1607 const thrift::ColumnMetaData& col_meta,
1608 const std::string& col_name = "",
1609 int32_t rg_index = 0) {
1610 auto page_result = read_page_at(col_meta.data_page_offset,
1611 col_meta.codec,
1612 col_name, rg_index, 0);
1613 if (!page_result) return page_result.error();
1614
1615 auto& pr = page_result.value();
1616
1617 int64_t num_values = 0;
1618 if (pr.header.type == PageType::DATA_PAGE &&
1619 pr.header.data_page_header.has_value()) {
1620 num_values = pr.header.data_page_header->num_values;
1621 } else if (pr.header.type == PageType::DATA_PAGE_V2 &&
1622 pr.header.data_page_header_v2.has_value()) {
1623 num_values = pr.header.data_page_header_v2->num_values;
1624 } else {
1625 num_values = col_meta.num_values;
1626 }
1627
1628 auto count_result = validate_page_value_count(num_values, "ParquetReader BYTE_STREAM_SPLIT");
1629 if (!count_result) return count_result.error();
1630 size_t count = *count_result;
1631
1632 if constexpr (std::is_same_v<T, float>) {
1633 return byte_stream_split::decode_float(pr.data, pr.size, count);
1634 } else if constexpr (std::is_same_v<T, double>) {
1635 return byte_stream_split::decode_double(pr.data, pr.size, count);
1636 } else {
1638 "BYTE_STREAM_SPLIT only supports FLOAT/DOUBLE"};
1639 }
1640 }
1641
1643 template <typename T>
1644 expected<std::vector<T>> read_column_rle_bool(
1645 const thrift::ColumnMetaData& col_meta,
1646 const std::string& col_name = "",
1647 int32_t rg_index = 0) {
1648 auto page_result = read_page_at(col_meta.data_page_offset,
1649 col_meta.codec,
1650 col_name, rg_index, 0);
1651 if (!page_result) return page_result.error();
1652
1653 auto& pr = page_result.value();
1654
1655 int64_t num_values = 0;
1656 if (pr.header.type == PageType::DATA_PAGE &&
1657 pr.header.data_page_header.has_value()) {
1658 num_values = pr.header.data_page_header->num_values;
1659 } else if (pr.header.type == PageType::DATA_PAGE_V2 &&
1660 pr.header.data_page_header_v2.has_value()) {
1661 num_values = pr.header.data_page_header_v2->num_values;
1662 } else {
1663 num_values = col_meta.num_values;
1664 }
1665
1666 size_t count = static_cast<size_t>(num_values);
1667
1668 if constexpr (std::is_same_v<T, bool>) {
1669 // RLE boolean: 4-byte LE length prefix + RLE payload, bit_width=1
1670 auto indices = RleDecoder::decode_with_length(
1671 pr.data, pr.size, /*bit_width=*/1, count);
1672
1673 std::vector<bool> result;
1674 result.reserve(count);
1675 for (size_t i = 0; i < count && i < indices.size(); ++i) {
1676 result.push_back(indices[i] != 0);
1677 }
1678 return result;
1679 } else {
1681 "RLE encoding for booleans requires bool type"};
1682 }
1683 }
1684
1686 static std::vector<std::string> to_string_vec(const std::vector<bool>& vals) {
1687 std::vector<std::string> result;
1688 result.reserve(vals.size());
1689 for (bool v : vals) {
1690 result.push_back(v ? "true" : "false");
1691 }
1692 return result;
1693 }
1694
1696 template <typename T>
1697 static std::vector<std::string> to_string_vec(const std::vector<T>& vals) {
1698 std::vector<std::string> result;
1699 result.reserve(vals.size());
1700 for (const auto& v : vals) {
1701 result.push_back(std::to_string(v));
1702 }
1703 return result;
1704 }
1705
1707 static std::string hex_encode(const std::vector<uint8_t>& bytes) {
1708 static constexpr char hex_chars[] = "0123456789abcdef";
1709 std::string result;
1710 result.reserve(bytes.size() * 2);
1711 for (uint8_t b : bytes) {
1712 result.push_back(hex_chars[(b >> 4) & 0x0F]);
1713 result.push_back(hex_chars[b & 0x0F]);
1714 }
1715 return result;
1716 }
1717
    /// Map a legacy ConvertedType annotation to its modern LogicalType
    /// equivalent; unknown/unmapped values fall through to LogicalType::NONE.
    ///
    /// NOTE(review): the generated listing elided this switch's case arms
    /// (original lines 1721-1730); presumably they map UTF8->STRING,
    /// DECIMAL->DECIMAL, DATE->DATE, TIME_MILLIS/MICROS->TIME_MS/US, and
    /// TIMESTAMP_MILLIS/MICROS->TIMESTAMP_MS/US — confirm against the full
    /// source before relying on this mapping.
    static LogicalType converted_type_to_logical(ConvertedType ct) {
        switch (ct) {
            default: return LogicalType::NONE;
        }
    }
1734};
1735
1736} // namespace signet::forge
BYTE_STREAM_SPLIT encoding and decoding (Parquet encoding type 9).
PLAIN-encoded Parquet column decoder.
Parquet file reader with typed column access and full encoding support.
Definition reader.hpp:167
const std::vector< thrift::KeyValue > & key_value_metadata() const
Return the file-level key-value metadata pairs.
Definition reader.hpp:393
int64_t num_rows() const
Return the total number of rows across all row groups.
Definition reader.hpp:374
const thrift::Statistics * column_statistics(size_t row_group_index, size_t column_index) const
Access Parquet column statistics for a specific column chunk.
Definition reader.hpp:899
const Schema & schema() const
Return the file's column schema.
Definition reader.hpp:371
bool has_page_index(size_t row_group_index, size_t column_index) const
Check whether a column chunk has both ColumnIndex and OffsetIndex data.
Definition reader.hpp:1090
static void ensure_default_codecs_registered()
Ensure common compression codecs are registered in the global CodecRegistry.
Definition reader.hpp:511
expected< OffsetIndex > read_offset_index(size_t row_group_index, size_t column_index) const
Read the OffsetIndex (page locations) for a column chunk.
Definition reader.hpp:1052
int64_t num_row_groups() const
Return the number of row groups in the file.
Definition reader.hpp:377
bool bloom_might_contain(size_t row_group_index, size_t column_index, const T &value) const
Check whether a value might exist in a column using its bloom filter.
Definition reader.hpp:984
expected< std::vector< std::vector< std::string > > > read_columns(const std::vector< std::string > &column_names)
Read a subset of columns (by name) across all row groups.
Definition reader.hpp:850
expected< std::vector< T > > read_column(size_t row_group_index, size_t column_index)
Read a single column from a row group as a typed vector.
Definition reader.hpp:554
expected< std::vector< std::vector< std::string > > > read_row_group(size_t row_group_index)
Read all columns from a single row group as string vectors.
Definition reader.hpp:774
expected< SplitBlockBloomFilter > read_bloom_filter(size_t row_group_index, size_t column_index) const
Read the Split Block Bloom Filter for a column chunk, if present.
Definition reader.hpp:925
expected< ColumnIndex > read_column_index(size_t row_group_index, size_t column_index) const
Read the ColumnIndex (min/max per page) for a column chunk.
Definition reader.hpp:1010
FileStats file_stats() const
Compute aggregate statistics for the entire file.
Definition reader.hpp:443
expected< std::vector< std::vector< std::string > > > read_all()
Read the entire file as a row-major vector of string vectors.
Definition reader.hpp:804
~ParquetReader()=default
Destructor. Releases the in-memory file buffer and all decode state.
ParquetReader(ParquetReader &&) noexcept=default
Move constructor.
static expected< ParquetReader > open(const std::filesystem::path &path)
Open and parse a Parquet file, returning a ready-to-query reader.
Definition reader.hpp:189
const std::string & created_by() const
Return the created_by string from the file footer metadata.
Definition reader.hpp:386
expected< std::vector< std::string > > read_column_as_strings(size_t row_group_index, size_t column_index)
Read a column and convert every value to its string representation.
Definition reader.hpp:698
RowGroupInfo row_group(size_t index) const
Return summary metadata for a specific row group.
Definition reader.hpp:417
static std::vector< uint32_t > decode_with_length(const uint8_t *data, size_t size, int bit_width, size_t num_values)
Decode from a buffer that starts with a 4-byte LE length prefix.
Definition rle.hpp:658
Immutable schema description for a Parquet file.
Definition schema.hpp:192
size_t num_columns() const
Number of columns in this schema.
Definition schema.hpp:238
std::optional< size_t > find_column(const std::string &col_name) const
Find a column index by name.
Definition schema.hpp:261
const ColumnDescriptor & column(size_t index) const
Access a column descriptor by index.
Definition schema.hpp:244
static constexpr size_t kBytesPerBlock
Block size in bytes (32 bytes = 256 bits = 8 x uint32_t words).
static SplitBlockBloomFilter from_data(const uint8_t *src, size_t size)
Reconstruct a filter from previously serialized bytes.
A lightweight result type that holds either a success value of type T or an Error.
Definition error.hpp:145
Thrift Compact Protocol reader.
Definition compact.hpp:267
bool good() const
Returns true if no errors have occurred (no bounds violations).
Definition compact.hpp:531
Compression codec interface and registry for Signet Forge.
ColumnIndex, OffsetIndex, and ColumnIndexBuilder for predicate pushdown.
PLAIN-encoded Parquet column decoder.
Thrift Compact Protocol encoder and decoder for Parquet metadata serialization.
DELTA_BINARY_PACKED encoding and decoding (Parquet encoding type 5).
Dictionary encoding and decoding for Parquet (PLAIN_DICTIONARY / RLE_DICTIONARY).
GZIP compression codec for Signet Forge (wraps zlib).
LZ4 raw-block compression codec for Signet Forge (wraps liblz4).
Arena (bump-pointer) allocator for batch Parquet reads.
std::vector< float > decode_float(const uint8_t *data, size_t size, size_t count)
Decode float values from BYTE_STREAM_SPLIT encoding.
std::vector< double > decode_double(const uint8_t *data, size_t size, size_t count)
Decode double values from BYTE_STREAM_SPLIT encoding.
std::vector< int32_t > decode_int32(const uint8_t *data, size_t size, size_t num_values)
Decode DELTA_BINARY_PACKED data back to int32 values.
Definition delta.hpp:564
std::vector< int64_t > decode_int64(const uint8_t *data, size_t size, size_t num_values)
Decode DELTA_BINARY_PACKED data back to int64 values.
Definition delta.hpp:438
uint32_t crc32(const void *data, size_t length) noexcept
EX-1: CRC-32 (polynomial 0xEDB88320) for page checksum verification.
Definition reader.hpp:74
PhysicalType
Parquet physical (storage) types as defined in parquet.thrift.
Definition types.hpp:20
@ FIXED_LEN_BYTE_ARRAY
Fixed-length byte array (UUID, vectors, decimals).
@ INT64
64-bit signed integer (little-endian).
@ INT32
32-bit signed integer (little-endian).
@ BOOLEAN
1-bit boolean, bit-packed in pages.
@ BYTE_ARRAY
Variable-length byte sequence (strings, binary).
@ FLOAT
IEEE 754 single-precision float.
@ DOUBLE
IEEE 754 double-precision float.
constexpr uint32_t PARQUET_MAGIC_ENCRYPTED
"PARE" magic bytes (little-endian uint32) — marks a Parquet file with an encrypted footer.
Definition types.hpp:207
Compression
Parquet compression codecs.
Definition types.hpp:115
@ UNCOMPRESSED
No compression.
expected< size_t > validate_page_value_count(int64_t num_values, const char *context)
Definition reader.hpp:110
ConvertedType
Legacy Parquet converted types for backward compatibility with older readers.
Definition types.hpp:67
@ TIMESTAMP_MILLIS
Timestamp in milliseconds.
@ DECIMAL
Fixed-point decimal.
@ TIMESTAMP_MICROS
Timestamp in microseconds.
@ DATE
Date (days since epoch).
@ TIME_MILLIS
Time in milliseconds.
@ TIME_MICROS
Time in microseconds.
@ UTF8
UTF-8 encoded string.
constexpr uint8_t kEncryptedPageHeaderMagic[4]
Definition reader.hpp:120
expected< std::vector< uint8_t > > decompress(Compression codec, const uint8_t *data, size_t size, size_t uncompressed_size)
Decompress data using the specified codec via the global CodecRegistry.
Definition codec.hpp:213
void register_snappy_codec()
Register the bundled Snappy codec with the global CodecRegistry.
Definition snappy.hpp:608
LogicalType
Parquet logical types (from parquet.thrift LogicalType union).
Definition types.hpp:41
@ JSON
JSON document (stored as BYTE_ARRAY).
@ DECIMAL
Fixed-point decimal (INT32/INT64/FIXED_LEN_BYTE_ARRAY).
@ DATE
Calendar date — INT32, days since 1970-01-01.
@ STRING
UTF-8 string (stored as BYTE_ARRAY).
@ ENUM
Enum string (stored as BYTE_ARRAY).
@ TIME_MS
Time of day — INT32, milliseconds since midnight.
@ NONE
No logical annotation — raw physical type.
@ TIME_US
Time of day — INT64, microseconds since midnight.
@ BSON
BSON document (stored as BYTE_ARRAY).
@ TIMESTAMP_MS
Timestamp — INT64, milliseconds since Unix epoch.
@ TIMESTAMP_US
Timestamp — INT64, microseconds since Unix epoch.
constexpr uint32_t PARQUET_MAGIC
"PAR1" magic bytes (little-endian uint32) — marks a standard Parquet file.
Definition types.hpp:205
@ IO_ERROR
A file-system or stream I/O operation failed (open, read, write, rename).
@ ENCRYPTION_ERROR
An encryption or decryption operation failed (bad key, tampered ciphertext, PME error).
@ UNSUPPORTED_COMPRESSION
The file uses a compression codec not linked into this build (ZSTD, LZ4, Gzip).
@ LICENSE_ERROR
The commercial license is missing, invalid, or the build is misconfigured.
@ UNSUPPORTED_TYPE
The file contains a Parquet physical or logical type that is not implemented.
@ OUT_OF_RANGE
An index, offset, or size value is outside the valid range.
@ CORRUPT_FOOTER
The Parquet footer (FileMetaData) is missing, truncated, or malformed.
@ SCHEMA_MISMATCH
The requested column name or type does not match the file schema.
@ INVALID_FILE
The file is not a valid Parquet file (e.g. missing or wrong magic bytes).
@ THRIFT_DECODE_ERROR
The Thrift Compact Protocol decoder encountered invalid or malicious input.
@ UNSUPPORTED_ENCODING
The file uses an encoding not supported by this build (e.g. BYTE_STREAM_SPLIT on integers).
@ INVALID_ARGUMENT
A caller-supplied argument is outside the valid range or violates a precondition.
@ CORRUPT_PAGE
A data page failed integrity checks (bad CRC, truncated, or exceeds size limits).
@ CORRUPT_DATA
Decoded data is corrupt or inconsistent (e.g. out-of-range dictionary index).
bool has_encrypted_page_header_prefix(const uint8_t *data, size_t size) noexcept
Definition reader.hpp:122
uint32_t load_le32(const uint8_t *data) noexcept
Definition reader.hpp:129
Encoding
Parquet page encoding types.
Definition types.hpp:98
@ DELTA_BINARY_PACKED
Delta encoding for INT32/INT64 (compact for sorted/sequential data).
@ RLE
Run-length / bit-packed hybrid (used for booleans and def/rep levels).
@ RLE_DICTIONARY
Modern dictionary encoding (Parquet 2.0) — dict page + RLE indices.
@ PLAIN_DICTIONARY
Legacy dictionary encoding (Parquet 1.0).
@ PLAIN
Values stored back-to-back in their native binary layout.
@ BYTE_STREAM_SPLIT
Byte-stream split for FLOAT/DOUBLE (transposes byte lanes for better compression).
@ DATA_PAGE_V2
Data page v2 (Parquet 2.0 format with separate rep/def level sections).
@ DICTIONARY_PAGE
Dictionary page — contains the value dictionary for RLE_DICTIONARY columns.
@ DATA_PAGE
Data page (Parquet 1.0 format).
@ REQUIRED
Exactly one value per row (non-nullable).
Parquet Modular Encryption (PME) orchestrator – encrypts and decrypts Parquet file components (footer...
RLE/Bit-Packing Hybrid encoding and decoding (Parquet spec).
Schema definition types: Column<T>, SchemaBuilder, and Schema.
Bundled, zero-dependency, header-only Snappy compression codec.
Split Block Bloom Filter as specified by the Apache Parquet format.
Descriptor for a single column in a Parquet schema.
Definition types.hpp:152
int32_t type_length
Byte length for FIXED_LEN_BYTE_ARRAY columns (-1 = N/A).
Definition types.hpp:157
LogicalType logical_type
Semantic annotation (STRING, TIMESTAMP_NS, etc.).
Definition types.hpp:155
Repetition repetition
Nullability / cardinality.
Definition types.hpp:156
std::string name
Column name (unique within a schema).
Definition types.hpp:153
int32_t scale
Decimal scale (-1 = N/A).
Definition types.hpp:159
PhysicalType physical_type
On-disk storage type.
Definition types.hpp:154
int32_t precision
Decimal precision (-1 = N/A).
Definition types.hpp:158
Per-page min/max statistics for predicate pushdown.
void deserialize(thrift::CompactDecoder &dec)
Deserialize this ColumnIndex from a Thrift compact decoder.
Lightweight error value carrying an ErrorCode and a human-readable message.
Definition error.hpp:101
std::string message
A human-readable description of what went wrong (may be empty for OK).
Definition error.hpp:105
Aggregate file-level statistics returned by ParquetReader::file_stats().
Definition types.hpp:259
int64_t total_rows
Total rows in the file.
Definition types.hpp:261
double compression_ratio
Overall uncompressed / compressed ratio.
Definition types.hpp:265
std::string created_by
"created_by" string from the footer.
Definition types.hpp:264
std::vector< ColumnFileStats > columns
Per-column statistics.
Definition types.hpp:268
int64_t num_columns
Number of columns.
Definition types.hpp:263
int64_t num_row_groups
Number of row groups.
Definition types.hpp:262
double bytes_per_row
Average file bytes per row.
Definition types.hpp:266
int64_t file_size_bytes
Total file size on disk (bytes).
Definition types.hpp:260
Page locations for random access within a column chunk.
void deserialize(thrift::CompactDecoder &dec)
Deserialize this OffsetIndex from a Thrift compact decoder.
Summary metadata for a single row group.
Definition reader.hpp:405
int64_t total_byte_size
Total serialized size in bytes (compressed).
Definition reader.hpp:407
int64_t num_rows
Number of rows in this row group.
Definition reader.hpp:406
int64_t row_group_index
Zero-based index of this row group in the file.
Definition reader.hpp:408
Parquet file metadata (parquet.thrift fields 1-7).
Definition types.hpp:2265
expected< void > deserialize(CompactDecoder &dec)
Definition types.hpp:2323
std::vector< RowGroup > row_groups
Definition types.hpp:2269
std::optional< std::string > created_by
Definition types.hpp:2271
std::optional< std::vector< KeyValue > > key_value_metadata
Definition types.hpp:2270
std::vector< SchemaElement > schema
Definition types.hpp:2267
Parquet column statistics (parquet.thrift fields 1-6).
Definition types.hpp:369
Parquet Thrift struct types – C++ structs matching parquet.thrift, with Compact Protocol serialize/de...
Parquet format enumerations, type traits, and statistics structs.
ZSTD compression codec for Signet Forge (wraps libzstd).