Signet Forge 0.1.1
C++20 Parquet library with AI-native extensions
DEMO
Loading...
Searching...
No Matches
mmap_reader.hpp
Go to the documentation of this file.
1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright 2026 Johnson Ogundeji
3#pragma once
4
20
21#include "signet/types.hpp"
22#include "signet/error.hpp"
23#include "signet/schema.hpp"
33
34#include <cstdint>
35#include <cstring>
36#include <filesystem>
37#include <stdexcept>
38#include <string>
39#include <vector>
40
41#ifndef _WIN32
42#include <fcntl.h>
43#include <sys/mman.h>
44#include <sys/stat.h>
45#include <unistd.h>
46#else
47#error "MmapReader: Windows support not yet implemented. Use ParquetReader instead."
48#endif
49
50namespace signet::forge {
51
52namespace detail_mmap_reader {
53
/// Standard CRC-32 (reflected polynomial 0xEDB88320, initial value and final
/// XOR of 0xFFFFFFFF) over `length` bytes starting at `data`.
/// Used to verify the optional `crc` field of Parquet page headers.
inline uint32_t crc32(const void* data, size_t length) noexcept {
    // One table entry per possible byte value, computed at compile time.
    static constexpr std::array<uint32_t, 256> kTable = [] {
        std::array<uint32_t, 256> tbl{};
        for (uint32_t n = 0; n < tbl.size(); ++n) {
            uint32_t r = n;
            for (int bit = 0; bit < 8; ++bit) {
                r = ((r & 1u) != 0u) ? ((r >> 1) ^ 0xEDB88320u) : (r >> 1);
            }
            tbl[n] = r;
        }
        return tbl;
    }();

    const auto* cursor = static_cast<const uint8_t*>(data);
    const uint8_t* const end = cursor + length;
    uint32_t state = 0xFFFFFFFFu;
    for (; cursor != end; ++cursor) {
        state = kTable[(state ^ *cursor) & 0xFFu] ^ (state >> 8);
    }
    return static_cast<uint32_t>(~state);
}
72
73} // namespace detail_mmap_reader
74
83public:
91 [[nodiscard]] static expected<MmapReader> open(
92 const std::filesystem::path& path) {
93
94 // Open the file descriptor
95 int fd = ::open(path.c_str(), O_RDONLY);
96 if (fd < 0) {
98 "cannot open file: " + path.string()};
99 }
100
101 // Determine file size via fstat
102 struct stat st;
103 if (::fstat(fd, &st) != 0) {
104 ::close(fd);
106 "cannot stat file: " + path.string()};
107 }
108
109 size_t file_size = static_cast<size_t>(st.st_size);
110 if (file_size == 0) {
111 ::close(fd);
113 "file is empty: " + path.string()};
114 }
115
116 // Memory-map the file (read-only, private copy-on-write)
117 void* mapped = ::mmap(nullptr, file_size, PROT_READ, MAP_PRIVATE, fd, 0);
118 if (mapped == MAP_FAILED) {
119 ::close(fd);
121 "mmap failed: " + path.string()};
122 }
123
124 // Advise the kernel that we will read sequentially (improves readahead)
125#ifdef MADV_SEQUENTIAL
126 ::madvise(mapped, file_size, MADV_SEQUENTIAL);
127#endif
128
129#ifdef __linux__
130 // Readahead hint only (CWE-367). Deliberate volatile page faults are
131 // removed: a concurrent truncation can raise SIGBUS after any pre-fault
132 // succeeds. Pages are instead copied into owned memory in read_page_at().
133 ::madvise(mapped, file_size, MADV_WILLNEED);
134#endif
135
136 MmapReader reader;
137 reader.mapped_ = mapped;
138 reader.size_ = file_size;
139 reader.fd_ = fd;
140 return reader;
141 }
142
143 // -- Access the mapped memory --------------------------------------------
144
146 [[nodiscard]] const uint8_t* data() const {
147 return static_cast<const uint8_t*>(mapped_);
148 }
149
151 [[nodiscard]] size_t size() const { return size_; }
152
155 // CWE-125: Out-of-bounds Read — reject offset beyond mapped region
156 [[nodiscard]] const uint8_t* data_at(size_t offset) const {
157 if (offset >= size_) {
158 return nullptr;
159 }
160 return data() + offset;
161 }
162
163 // -- Close / unmap -------------------------------------------------------
164
168 void close() {
169 if (mapped_ != nullptr && mapped_ != MAP_FAILED) {
170 ::munmap(mapped_, size_);
171 mapped_ = nullptr;
172 }
173 if (fd_ >= 0) {
174 ::close(fd_);
175 fd_ = -1;
176 }
177 size_ = 0;
178 }
179
182 close();
183 }
184
185 // -- Non-copyable, movable -----------------------------------------------
186
187 MmapReader(const MmapReader&) = delete;
188 MmapReader& operator=(const MmapReader&) = delete;
189
191 MmapReader(MmapReader&& other) noexcept
192 : mapped_(other.mapped_)
193 , size_(other.size_)
194 , fd_(other.fd_)
195 {
196 other.mapped_ = nullptr;
197 other.size_ = 0;
198 other.fd_ = -1;
199 }
200
202 MmapReader& operator=(MmapReader&& other) noexcept {
203 if (this != &other) {
204 close();
205 mapped_ = other.mapped_;
206 size_ = other.size_;
207 fd_ = other.fd_;
208 other.mapped_ = nullptr;
209 other.size_ = 0;
210 other.fd_ = -1;
211 }
212 return *this;
213 }
214
216 [[nodiscard]] bool is_open() const {
217 return mapped_ != nullptr && mapped_ != MAP_FAILED;
218 }
219
221 MmapReader() = default;
222
223private:
224 void* mapped_ = nullptr;
225 size_t size_ = 0;
226 int fd_ = -1;
227};
228
247static constexpr int64_t MMAP_MAX_VALUES_PER_PAGE = 100'000'000;
248
250 int64_t num_values,
251 const char* context) {
252 if (num_values < 0 || num_values > MMAP_MAX_VALUES_PER_PAGE) {
254 std::string(context) + ": num_values out of valid range"};
255 }
256 return static_cast<size_t>(num_values);
257}
258
260public:
268 [[nodiscard]] static expected<MmapParquetReader> open(
269 const std::filesystem::path& path) {
270
271 // Memory-map the file
272 auto mmap_result = MmapReader::open(path);
273 if (!mmap_result) return mmap_result.error();
274
275 auto mmap = std::move(*mmap_result);
276
277 const uint8_t* file_data = mmap.data();
278 const size_t sz = mmap.size();
279
280 // --- Validate minimum size: 4 (magic) + 4 (footer len) + 4 (magic) = 12 ---
281 if (sz < 12) {
283 "file too small to be a valid Parquet file: " +
284 path.string()};
285 }
286
287 // --- Verify PAR1 magic at start ---
288 uint32_t magic_start;
289 std::memcpy(&magic_start, file_data, 4);
290 if (magic_start != PARQUET_MAGIC) {
292 "missing PAR1 magic at start of file"};
293 }
294
295 // --- Verify PAR1 or PARE magic at end ---
296 uint32_t magic_end;
297 std::memcpy(&magic_end, file_data + sz - 4, 4);
298
299 if (magic_end != PARQUET_MAGIC && magic_end != PARQUET_MAGIC_ENCRYPTED) {
301 "missing PAR1/PARE magic at end of file"};
302 }
303
304 if (magic_end == PARQUET_MAGIC_ENCRYPTED) {
306 "MmapParquetReader does not support encrypted footers; "
307 "use ParquetReader with an EncryptionConfig instead"};
308 }
309
310 // --- Read footer length (4-byte LE uint32 at [size-8, size-4]) ---
311 uint32_t footer_len;
312 std::memcpy(&footer_len, file_data + sz - 8, 4);
313
314 if (footer_len == 0 || static_cast<size_t>(footer_len) > sz - 12) {
316 "invalid footer length: " + std::to_string(footer_len)};
317 }
318
319 // --- Deserialize FileMetaData from footer (directly from mmap) ---
320 size_t footer_offset = sz - 8 - footer_len;
321 const uint8_t* footer_ptr = file_data + footer_offset;
322
323 thrift::CompactDecoder dec(footer_ptr, footer_len);
324 thrift::FileMetaData metadata;
325 if (auto r = metadata.deserialize(dec); !r.has_value()) {
326 return r.error();
327 }
328
329 // --- Build Schema from FileMetaData.schema ---
330 std::string schema_name;
331 std::vector<ColumnDescriptor> columns;
332
333 if (!metadata.schema.empty()) {
334 schema_name = metadata.schema[0].name;
335
336 for (size_t i = 1; i < metadata.schema.size(); ++i) {
337 const auto& elem = metadata.schema[i];
338
339 // Skip group nodes (those with num_children set)
340 if (elem.num_children.has_value()) {
341 continue;
342 }
343
345 cd.name = elem.name;
346 cd.physical_type = elem.type.value_or(PhysicalType::BYTE_ARRAY);
347 cd.repetition = elem.repetition_type.value_or(Repetition::REQUIRED);
348
349 if (elem.type_length.has_value()) {
350 cd.type_length = *elem.type_length;
351 }
352 if (elem.precision.has_value()) {
353 cd.precision = *elem.precision;
354 }
355 if (elem.scale.has_value()) {
356 cd.scale = *elem.scale;
357 }
358
359 // Map ConvertedType to LogicalType for common cases
360 if (elem.converted_type.has_value()) {
361 cd.logical_type = converted_type_to_logical(*elem.converted_type);
362 }
363
364 columns.push_back(std::move(cd));
365 }
366 }
367
368 // --- Assemble the reader ---
369 MmapParquetReader reader;
370 reader.mmap_ = std::move(mmap);
371 reader.metadata_ = std::move(metadata);
372 reader.schema_ = Schema(std::move(schema_name), std::move(columns));
373 reader.created_by_ = reader.metadata_.created_by.value_or("");
374
375 return reader;
376 }
377
378 // -- File metadata accessors ---------------------------------------------
379
381 [[nodiscard]] const Schema& schema() const { return schema_; }
382
384 [[nodiscard]] int64_t num_rows() const { return metadata_.num_rows; }
385
387 [[nodiscard]] int64_t num_row_groups() const {
388 return static_cast<int64_t>(metadata_.row_groups.size());
389 }
390
392 [[nodiscard]] const std::string& created_by() const { return created_by_; }
393
396 [[nodiscard]] const std::vector<thrift::KeyValue>& key_value_metadata() const {
397 static const std::vector<thrift::KeyValue> empty;
398 return metadata_.key_value_metadata.has_value()
399 ? *metadata_.key_value_metadata
400 : empty;
401 }
402
403 // -- Row group info ------------------------------------------------------
404
407 int64_t num_rows;
410 };
411
416 [[nodiscard]] RowGroupInfo row_group(size_t index) const {
417 if (index >= metadata_.row_groups.size()) {
418 throw std::out_of_range("MmapParquetReader::row_group: index " +
419 std::to_string(index) + " >= " +
420 std::to_string(metadata_.row_groups.size()));
421 }
422 const auto& rg = metadata_.row_groups[index];
423 return {rg.num_rows,
424 rg.total_byte_size,
425 static_cast<int64_t>(index)};
426 }
427
428 // -- Statistics for a column in a row group ------------------------------
429
436 size_t row_group_index, size_t column_index) const {
437 if (row_group_index >= metadata_.row_groups.size()) return nullptr;
438 const auto& rg = metadata_.row_groups[row_group_index];
439 if (column_index >= rg.columns.size()) return nullptr;
440 const auto& chunk = rg.columns[column_index];
441 if (!chunk.meta_data.has_value()) return nullptr;
442 if (!chunk.meta_data->statistics.has_value()) return nullptr;
443 return &(*chunk.meta_data->statistics);
444 }
445
446 // -- Typed column reads --------------------------------------------------
447
459 template <typename T>
460 expected<std::vector<T>> read_column(size_t row_group_index,
461 size_t column_index) {
462 // --- Validate indices ---
463 if (row_group_index >= metadata_.row_groups.size()) {
465 "row group index out of range"};
466 }
467 if (column_index >= schema_.num_columns()) {
469 "column index out of range"};
470 }
471
472 const auto& rg = metadata_.row_groups[row_group_index];
473 if (column_index >= rg.columns.size()) {
475 "column index out of range"};
476 }
477
478 const auto& chunk = rg.columns[column_index];
479 if (!chunk.meta_data.has_value()) {
481 "column chunk has no metadata"};
482 }
483 const auto& col_meta = *chunk.meta_data;
484
485 // --- Detect encoding strategy ---
486 bool has_dict = false;
487 Encoding data_encoding = Encoding::PLAIN;
488
489 for (auto enc : col_meta.encodings) {
490 if (enc == Encoding::PLAIN_DICTIONARY ||
492 has_dict = true;
493 }
495 data_encoding = Encoding::DELTA_BINARY_PACKED;
496 }
497 if (enc == Encoding::BYTE_STREAM_SPLIT) {
498 data_encoding = Encoding::BYTE_STREAM_SPLIT;
499 }
500 if (enc == Encoding::RLE &&
501 col_meta.type == PhysicalType::BOOLEAN) {
502 data_encoding = Encoding::RLE;
503 }
504 }
505
506 // --- Dictionary encoding path ---
507 if (has_dict) {
508 if constexpr (std::is_same_v<T, std::string> ||
509 std::is_same_v<T, int32_t> ||
510 std::is_same_v<T, int64_t> ||
511 std::is_same_v<T, float> ||
512 std::is_same_v<T, double>) {
513 return read_column_dict<T>(col_meta,
514 static_cast<int32_t>(row_group_index));
515 } else {
517 "dictionary encoding not supported for this type"};
518 }
519 }
520
521 // --- DELTA_BINARY_PACKED path ---
522 if (data_encoding == Encoding::DELTA_BINARY_PACKED) {
523 if constexpr (std::is_same_v<T, int32_t> ||
524 std::is_same_v<T, int64_t>) {
525 return read_column_delta<T>(col_meta);
526 } else {
528 "DELTA_BINARY_PACKED only supports INT32/INT64"};
529 }
530 }
531
532 // --- BYTE_STREAM_SPLIT path ---
533 if (data_encoding == Encoding::BYTE_STREAM_SPLIT) {
534 if constexpr (std::is_same_v<T, float> ||
535 std::is_same_v<T, double>) {
536 return read_column_bss<T>(col_meta);
537 } else {
539 "BYTE_STREAM_SPLIT only supports FLOAT/DOUBLE"};
540 }
541 }
542
543 // --- RLE boolean path ---
544 if (data_encoding == Encoding::RLE &&
545 col_meta.type == PhysicalType::BOOLEAN) {
546 if constexpr (std::is_same_v<T, bool>) {
547 return read_column_rle_bool(col_meta);
548 } else {
550 "RLE boolean encoding requires bool type"};
551 }
552 }
553
554 // --- Default: PLAIN encoding via ColumnReader ---
555 auto reader_result = make_column_reader(row_group_index, column_index);
556 if (!reader_result) return reader_result.error();
557
558 auto& [col_reader, num_values] = reader_result.value();
559 size_t count = static_cast<size_t>(num_values);
560
561 if constexpr (std::is_same_v<T, bool>) {
562 std::vector<bool> values;
563 values.reserve(count);
564 for (size_t i = 0; i < count; ++i) {
565 auto val = col_reader.template read<bool>();
566 if (!val) return val.error();
567 values.push_back(*val);
568 }
569 return values;
570 } else {
571 std::vector<T> values(count);
572 auto batch_result = col_reader.template read_batch<T>(
573 values.data(), count);
574 if (!batch_result) return batch_result.error();
575 return values;
576 }
577 }
578
579 // -- String reads --------------------------------------------------------
580
591 size_t row_group_index, size_t column_index) {
592 if (row_group_index >= metadata_.row_groups.size()) {
593 return Error{ErrorCode::OUT_OF_RANGE, "row group index out of range"};
594 }
595 if (column_index >= schema_.num_columns()) {
596 return Error{ErrorCode::OUT_OF_RANGE, "column index out of range"};
597 }
598
599 PhysicalType pt = schema_.column(column_index).physical_type;
600
601 switch (pt) {
603 auto res = read_column<bool>(row_group_index, column_index);
604 if (!res) return res.error();
605 return to_string_vec(res.value());
606 }
607 case PhysicalType::INT32: {
608 auto res = read_column<int32_t>(row_group_index, column_index);
609 if (!res) return res.error();
610 return to_string_vec(res.value());
611 }
612 case PhysicalType::INT64: {
613 auto res = read_column<int64_t>(row_group_index, column_index);
614 if (!res) return res.error();
615 return to_string_vec(res.value());
616 }
617 case PhysicalType::FLOAT: {
618 auto res = read_column<float>(row_group_index, column_index);
619 if (!res) return res.error();
620 return to_string_vec(res.value());
621 }
623 auto res = read_column<double>(row_group_index, column_index);
624 if (!res) return res.error();
625 return to_string_vec(res.value());
626 }
628 return read_column<std::string>(row_group_index, column_index);
629 }
631 auto reader_result = make_column_reader(row_group_index, column_index);
632 if (!reader_result) return reader_result.error();
633 auto& [col_reader, num_values] = reader_result.value();
634
635 std::vector<std::string> result;
636 result.reserve(static_cast<size_t>(num_values));
637 for (int64_t i = 0; i < num_values; ++i) {
638 auto bytes_result = col_reader.read_bytes();
639 if (!bytes_result) return bytes_result.error();
640 result.push_back(hex_encode(bytes_result.value()));
641 }
642 return result;
643 }
644 default:
646 "unsupported physical type for string conversion"};
647 }
648 }
649
650 // -- Read all rows -------------------------------------------------------
651
660 static constexpr size_t MAX_READ_ALL_ROWS = 100'000'000; // 100M row safety cap
661 size_t num_cols = schema_.num_columns();
662 if (metadata_.num_rows < 0 ||
663 static_cast<size_t>(metadata_.num_rows) > MAX_READ_ALL_ROWS) {
665 "read_all: num_rows exceeds safety cap ("
666 + std::to_string(MAX_READ_ALL_ROWS) + ")"};
667 }
668 std::vector<std::vector<std::string>> rows;
669 rows.reserve(static_cast<size_t>(metadata_.num_rows));
670
671 for (size_t rg = 0; rg < metadata_.row_groups.size(); ++rg) {
672 // Read all columns for this row group
673 size_t rg_num_cols = num_cols;
674 std::vector<std::vector<std::string>> col_data(rg_num_cols);
675 for (size_t c = 0; c < rg_num_cols; ++c) {
676 auto res = read_column_as_strings(rg, c);
677 if (!res) return res.error();
678 col_data[c] = std::move(res.value());
679 }
680
681 if (col_data.empty() || col_data[0].empty()) continue;
682
683 size_t rg_rows = col_data[0].size();
684 for (size_t r = 0; r < rg_rows; ++r) {
685 std::vector<std::string> row(num_cols);
686 for (size_t c = 0; c < num_cols; ++c) {
687 if (r < col_data[c].size()) {
688 row[c] = col_data[c][r];
689 }
690 }
691 rows.push_back(std::move(row));
692 }
693 }
694
695 return rows;
696 }
697
698 // -- Access the underlying mmap ------------------------------------------
699
701 [[nodiscard]] const MmapReader& mmap() const { return mmap_; }
702
703 // -- Special members -----------------------------------------------------
704
705 ~MmapParquetReader() = default;
706 MmapParquetReader(MmapParquetReader&&) noexcept = default;
707 MmapParquetReader& operator=(MmapParquetReader&&) noexcept = default;
708
709private:
711 MmapParquetReader() = default;
712
713 MmapReader mmap_;
714 thrift::FileMetaData metadata_;
715 Schema schema_;
716 std::string created_by_;
717
718 // Holds decompressed page data so ColumnReader pointers remain valid
719 std::vector<std::vector<uint8_t>> decompressed_buffers_;
720
722 struct ColumnReaderWithCount {
723 ColumnReader reader;
724 int64_t num_values;
725 };
726
728 struct PageReadResult {
729 const uint8_t* data;
730 size_t size;
731 thrift::PageHeader header;
732 };
733
735 expected<PageReadResult> read_page_at(int64_t offset, Compression codec) {
736 if (offset < 0 || static_cast<size_t>(offset) >= mmap_.size()) {
737 return Error{ErrorCode::CORRUPT_PAGE,
738 "page offset out of file bounds"};
739 }
740
741 size_t remaining = mmap_.size() - static_cast<size_t>(offset);
742 const uint8_t* mapped_start = mmap_.data_at(static_cast<size_t>(offset));
743
744 // CWE-367: Copy the Thrift page header bytes into owned memory before
745 // parsing. This prevents SIGBUS if another process truncates the backing
746 // file after open() but before we finish reading. A 4 KB window is far
747 // larger than any real Parquet page header.
748 constexpr size_t kHdrWindow = 4096;
749 size_t hdr_window = (std::min)(remaining, kHdrWindow);
750 std::vector<uint8_t> hdr_copy(mapped_start, mapped_start + hdr_window);
751
752 thrift::CompactDecoder page_dec(hdr_copy.data(), hdr_copy.size());
753 thrift::PageHeader ph;
754 if (auto r = ph.deserialize(page_dec); !r.has_value()) {
755 return r.error();
756 }
757
758 size_t hdr_size = page_dec.position();
759
760 // Reject negative page sizes from crafted files (CWE-191)
761 if (ph.compressed_page_size < 0 || ph.uncompressed_page_size < 0) {
762 return Error{ErrorCode::CORRUPT_PAGE,
763 "mmap: negative page size in PageHeader"};
764 }
765
766 size_t compressed_size = static_cast<size_t>(ph.compressed_page_size);
767
768 if (hdr_size + compressed_size > remaining) {
769 return Error{ErrorCode::CORRUPT_PAGE,
770 "page data extends past end of file"};
771 }
772
773 // Copy the page payload into owned memory before CRC validation and
774 // decompression. This completes the CWE-367 guard: neither the header
775 // copy above nor this copy reads from mapped memory after this point.
776 std::vector<uint8_t> payload_copy(
777 mapped_start + hdr_size,
778 mapped_start + hdr_size + compressed_size);
779 const uint8_t* pdata = payload_copy.data();
780 size_t pdata_size = compressed_size;
781
782 if (ph.crc.has_value()) {
783 uint32_t expected_crc = static_cast<uint32_t>(*ph.crc);
784 uint32_t computed_crc = detail_mmap_reader::crc32(pdata, pdata_size);
785 if (computed_crc != expected_crc) {
786 return Error{ErrorCode::CORRUPT_PAGE,
787 "mmap: page CRC-32 mismatch at offset " + std::to_string(offset)};
788 }
789 }
790
791 // Decompress if needed
792 if (codec != Compression::UNCOMPRESSED) {
793 size_t uncompressed_size = static_cast<size_t>(
794 ph.uncompressed_page_size);
795 // L-9: Pre-validate decompressed size to prevent allocation bombs (CWE-770)
796 static constexpr size_t MMAP_MAX_PAGE_SIZE2 = 256ULL * 1024ULL * 1024ULL;
797 if (uncompressed_size == 0 || uncompressed_size > MMAP_MAX_PAGE_SIZE2) {
798 return Error{ErrorCode::CORRUPT_PAGE,
799 "mmap: uncompressed page size out of range (0 or > 256 MB)"};
800 }
801 // CWE-409: Improper Handling of Highly Compressed Data (Zip Bomb)
802 // M21: Reject suspiciously high decompression ratios (zip bomb guard)
803 if (compressed_size > 0 && uncompressed_size / compressed_size > 1024) {
804 return Error{ErrorCode::CORRUPT_DATA,
805 "mmap: decompression ratio exceeds 1024x limit"};
806 }
807 // payload_copy is the input; decompressed result is kept alive in member.
808 auto dec_result = decompress(codec, pdata, pdata_size,
809 uncompressed_size);
810 if (!dec_result) {
812 "decompression failed: " +
813 dec_result.error().message};
814 }
815 decompressed_buffers_.push_back(std::move(dec_result.value()));
816 pdata = decompressed_buffers_.back().data();
817 pdata_size = decompressed_buffers_.back().size();
818 } else {
819 // For uncompressed pages, move the payload copy into the member
820 // buffer so that PageReadResult::data remains valid after return.
821 decompressed_buffers_.push_back(std::move(payload_copy));
822 pdata = decompressed_buffers_.back().data();
823 }
824
825 return PageReadResult{pdata, pdata_size, std::move(ph)};
826 }
827
829 expected<ColumnReaderWithCount> make_column_reader(
830 size_t row_group_index, size_t column_index) {
831 if (row_group_index >= metadata_.row_groups.size()) {
832 return Error{ErrorCode::OUT_OF_RANGE,
833 "row group index out of range"};
834 }
835
836 const auto& rg = metadata_.row_groups[row_group_index];
837 if (column_index >= rg.columns.size()) {
838 return Error{ErrorCode::OUT_OF_RANGE,
839 "column index out of range"};
840 }
841
842 const auto& chunk = rg.columns[column_index];
843 if (!chunk.meta_data.has_value()) {
844 return Error{ErrorCode::CORRUPT_PAGE,
845 "column chunk has no metadata"};
846 }
847
848 const auto& col_meta = *chunk.meta_data;
849 int64_t offset = col_meta.data_page_offset;
850
851 if (offset < 0 || static_cast<size_t>(offset) >= mmap_.size()) {
852 return Error{ErrorCode::CORRUPT_PAGE,
853 "data_page_offset out of file bounds"};
854 }
855
856 // Read from mmap directly -- no file I/O
857 size_t remaining = mmap_.size() - static_cast<size_t>(offset);
858 const uint8_t* page_start = mmap_.data_at(static_cast<size_t>(offset));
859
860 // Deserialize the PageHeader
861 thrift::CompactDecoder page_dec(page_start, remaining);
862 thrift::PageHeader page_header;
863 if (auto r = page_header.deserialize(page_dec); !r.has_value()) {
864 return r.error();
865 }
866
867 size_t header_size = page_dec.position();
868 // F1: Validate sign before cast — a negative compressed_page_size wraps
869 // to a huge size_t, enabling OOB reads in the CRC and decompress paths
870 // before the downstream bounds gate fires (CWE-190, CWE-125).
871 if (page_header.compressed_page_size <= 0) {
872 return Error{ErrorCode::CORRUPT_PAGE,
873 "mmap: compressed_page_size must be positive"};
874 }
875 size_t page_data_size = static_cast<size_t>(
876 page_header.compressed_page_size);
877 const uint8_t* page_data = page_start + header_size;
878
879 // Bounds-check BEFORE any memory access (CRC or decompress).
880 // Moving this gate ahead of the CRC closes the OOB-read window.
881 if (header_size + page_data_size > remaining) {
882 return Error{ErrorCode::CORRUPT_PAGE,
883 "page data extends past end of file"};
884 }
885
886 if (page_header.crc.has_value()) {
887 uint32_t expected_crc = static_cast<uint32_t>(*page_header.crc);
888 uint32_t computed_crc = detail_mmap_reader::crc32(page_data, page_data_size);
889 if (computed_crc != expected_crc) {
890 return Error{ErrorCode::CORRUPT_PAGE,
891 "mmap: page CRC-32 mismatch"};
892 }
893 }
894
895 // Decompress if needed
896 if (col_meta.codec != Compression::UNCOMPRESSED) {
897 size_t uncompressed_size = static_cast<size_t>(
898 page_header.uncompressed_page_size);
899 // L-9: Pre-validate decompressed size to prevent allocation bombs (CWE-770)
900 static constexpr size_t MMAP_MAX_PAGE_SIZE = 256ULL * 1024ULL * 1024ULL;
901 if (uncompressed_size == 0 || uncompressed_size > MMAP_MAX_PAGE_SIZE) {
902 return Error{ErrorCode::CORRUPT_PAGE,
903 "mmap: uncompressed page size out of range (0 or > 256 MB)"};
904 }
905 // CWE-409: Improper Handling of Highly Compressed Data (Zip Bomb)
906 // M21: Reject suspiciously high decompression ratios (zip bomb guard)
907 if (page_data_size > 0 && uncompressed_size / page_data_size > 1024) {
908 return Error{ErrorCode::CORRUPT_DATA,
909 "mmap: decompression ratio exceeds 1024x limit"};
910 }
911 auto decompressed = decompress(col_meta.codec,
912 page_data, page_data_size,
913 uncompressed_size);
914 if (!decompressed) {
916 "decompression failed: " +
917 decompressed.error().message};
918 }
919 decompressed_buffers_.push_back(std::move(decompressed.value()));
920 page_data = decompressed_buffers_.back().data();
921 page_data_size = decompressed_buffers_.back().size();
922 }
923
924 // Determine num_values
925 int64_t num_values = 0;
926 if (page_header.type == PageType::DATA_PAGE &&
927 page_header.data_page_header.has_value()) {
928 num_values = page_header.data_page_header->num_values;
929 } else if (page_header.type == PageType::DATA_PAGE_V2 &&
930 page_header.data_page_header_v2.has_value()) {
931 num_values = page_header.data_page_header_v2->num_values;
932 } else {
933 num_values = col_meta.num_values;
934 }
935
936 if (num_values < 0 || num_values > MMAP_MAX_VALUES_PER_PAGE) {
937 return Error{ErrorCode::CORRUPT_PAGE,
938 "mmap: num_values out of range (" +
939 std::to_string(num_values) + ")"};
940 }
941
942 // Determine physical type and type_length
943 PhysicalType pt = col_meta.type;
944 int32_t type_length = -1;
945 if (column_index < schema_.num_columns()) {
946 type_length = schema_.column(column_index).type_length;
947 }
948
949 ColumnReader col_reader(pt, page_data, page_data_size,
950 num_values, type_length);
951
952 return ColumnReaderWithCount{std::move(col_reader), num_values};
953 }
954
956 template <typename T>
957 expected<std::vector<T>> read_column_dict(
958 const thrift::ColumnMetaData& col_meta,
959 int32_t rg_index = 0) {
960 (void)rg_index;
961
962 int64_t dict_offset = col_meta.dictionary_page_offset.value_or(
963 col_meta.data_page_offset);
964
965 // Read the dictionary page
966 auto dict_page_result = read_page_at(dict_offset, col_meta.codec);
967 if (!dict_page_result) return dict_page_result.error();
968
969 auto& dict_pr = dict_page_result.value();
970 if (dict_pr.header.type != PageType::DICTIONARY_PAGE ||
971 !dict_pr.header.dictionary_page_header.has_value()) {
972 return Error{ErrorCode::CORRUPT_PAGE,
973 "expected DICTIONARY_PAGE at dictionary offset"};
974 }
975
976 int32_t raw_dict_count = dict_pr.header.dictionary_page_header->num_values;
977 if (raw_dict_count < 0 || raw_dict_count > 10'000'000) {
978 return Error{ErrorCode::CORRUPT_PAGE,
979 "mmap: dictionary page num_values out of valid range"};
980 }
981 size_t num_dict_entries = static_cast<size_t>(raw_dict_count);
982
983 // Find the data page offset
984 int64_t data_offset = col_meta.data_page_offset;
985 if (data_offset == dict_offset) {
986 // Dictionary and data pages are sequential -- skip past the
987 // dictionary page to reach the data page.
988 size_t dict_raw_start = static_cast<size_t>(dict_offset);
989 const uint8_t* dict_start = mmap_.data_at(dict_raw_start);
990 size_t dict_remaining = mmap_.size() - dict_raw_start;
991
992 thrift::CompactDecoder hdr_dec(dict_start, dict_remaining);
993 thrift::PageHeader tmp_hdr;
994 if (auto r = tmp_hdr.deserialize(hdr_dec); !r.has_value()) {
995 return r.error();
996 }
997 size_t dict_hdr_size = hdr_dec.position();
998 size_t dict_compressed_size = static_cast<size_t>(
999 tmp_hdr.compressed_page_size);
1000
1001 data_offset = dict_offset
1002 + static_cast<int64_t>(dict_hdr_size)
1003 + static_cast<int64_t>(dict_compressed_size);
1004 }
1005
1006 // Read the data page (RLE-encoded indices)
1007 auto data_page_result = read_page_at(data_offset, col_meta.codec);
1008 if (!data_page_result) return data_page_result.error();
1009
1010 auto& data_pr = data_page_result.value();
1011
1012 int64_t num_values = 0;
1013 if (data_pr.header.type == PageType::DATA_PAGE &&
1014 data_pr.header.data_page_header.has_value()) {
1015 num_values = data_pr.header.data_page_header->num_values;
1016 } else if (data_pr.header.type == PageType::DATA_PAGE_V2 &&
1017 data_pr.header.data_page_header_v2.has_value()) {
1018 num_values = data_pr.header.data_page_header_v2->num_values;
1019 } else {
1020 num_values = col_meta.num_values;
1021 }
1022 if (num_values < 0 || num_values > 100'000'000)
1023 return Error{ErrorCode::CORRUPT_DATA, "Invalid num_values in page header"};
1024
1025 DictionaryDecoder<T> decoder(dict_pr.data, dict_pr.size,
1026 num_dict_entries, col_meta.type);
1027
1028 return decoder.decode(data_pr.data, data_pr.size,
1029 static_cast<size_t>(num_values));
1030 }
1031
1033 template <typename T>
1034 expected<std::vector<T>> read_column_delta(
1035 const thrift::ColumnMetaData& col_meta) {
1036 auto page_result = read_page_at(col_meta.data_page_offset,
1037 col_meta.codec);
1038 if (!page_result) return page_result.error();
1039
1040 auto& pr = page_result.value();
1041
1042 int64_t num_values = 0;
1043 if (pr.header.type == PageType::DATA_PAGE &&
1044 pr.header.data_page_header.has_value()) {
1045 num_values = pr.header.data_page_header->num_values;
1046 } else if (pr.header.type == PageType::DATA_PAGE_V2 &&
1047 pr.header.data_page_header_v2.has_value()) {
1048 num_values = pr.header.data_page_header_v2->num_values;
1049 } else {
1050 num_values = col_meta.num_values;
1051 }
1052 auto count_result = validate_mmap_page_value_count(num_values, "MmapParquetReader DELTA_BINARY_PACKED");
1053 if (!count_result) return count_result.error();
1054 size_t count = *count_result;
1055
1056 if constexpr (std::is_same_v<T, int32_t>) {
1057 return delta::decode_int32(pr.data, pr.size, count);
1058 } else if constexpr (std::is_same_v<T, int64_t>) {
1059 return delta::decode_int64(pr.data, pr.size, count);
1060 } else {
1062 "DELTA_BINARY_PACKED only supports INT32/INT64"};
1063 }
1064 }
1065
1067 template <typename T>
1068 expected<std::vector<T>> read_column_bss(
1069 const thrift::ColumnMetaData& col_meta) {
1070 auto page_result = read_page_at(col_meta.data_page_offset,
1071 col_meta.codec);
1072 if (!page_result) return page_result.error();
1073
1074 auto& pr = page_result.value();
1075
1076 int64_t num_values = 0;
1077 if (pr.header.type == PageType::DATA_PAGE &&
1078 pr.header.data_page_header.has_value()) {
1079 num_values = pr.header.data_page_header->num_values;
1080 } else if (pr.header.type == PageType::DATA_PAGE_V2 &&
1081 pr.header.data_page_header_v2.has_value()) {
1082 num_values = pr.header.data_page_header_v2->num_values;
1083 } else {
1084 num_values = col_meta.num_values;
1085 }
1086 auto count_result = validate_mmap_page_value_count(num_values, "MmapParquetReader BYTE_STREAM_SPLIT");
1087 if (!count_result) return count_result.error();
1088 size_t count = *count_result;
1089
1090 if constexpr (std::is_same_v<T, float>) {
1091 return byte_stream_split::decode_float(pr.data, pr.size, count);
1092 } else if constexpr (std::is_same_v<T, double>) {
1093 return byte_stream_split::decode_double(pr.data, pr.size, count);
1094 } else {
1096 "BYTE_STREAM_SPLIT only supports FLOAT/DOUBLE"};
1097 }
1098 }
1099
1101 expected<std::vector<bool>> read_column_rle_bool(
1102 const thrift::ColumnMetaData& col_meta) {
1103 auto page_result = read_page_at(col_meta.data_page_offset,
1104 col_meta.codec);
1105 if (!page_result) return page_result.error();
1106
1107 auto& pr = page_result.value();
1108
1109 int64_t num_values = 0;
1110 if (pr.header.type == PageType::DATA_PAGE &&
1111 pr.header.data_page_header.has_value()) {
1112 num_values = pr.header.data_page_header->num_values;
1113 } else if (pr.header.type == PageType::DATA_PAGE_V2 &&
1114 pr.header.data_page_header_v2.has_value()) {
1115 num_values = pr.header.data_page_header_v2->num_values;
1116 } else {
1117 num_values = col_meta.num_values;
1118 }
1119 auto count_result = validate_mmap_page_value_count(num_values, "MmapParquetReader RLE_BOOL");
1120 if (!count_result) return count_result.error();
1121 size_t count = *count_result;
1122
1123 // RLE boolean: 4-byte LE length prefix + RLE payload, bit_width=1
1124 auto indices = RleDecoder::decode_with_length(
1125 pr.data, pr.size, /*bit_width=*/1, count);
1126
1127 std::vector<bool> result;
1128 result.reserve(count);
1129 for (size_t i = 0; i < count && i < indices.size(); ++i) {
1130 result.push_back(indices[i] != 0);
1131 }
1132 return result;
1133 }
1134
1135 // -------------------------------------------------------------------
1136 // String conversion helpers
1137 // -------------------------------------------------------------------
1138
/// Render each boolean as the literal text "true" or "false".
static std::vector<std::string> to_string_vec(const std::vector<bool>& vals) {
    std::vector<std::string> out(vals.size());
    for (size_t i = 0; i < vals.size(); ++i) {
        out[i] = vals[i] ? "true" : "false";
    }
    return out;
}
1148
/// Render each numeric value with std::to_string.
/// NOTE(review): std::to_string formats float/double like printf "%f"
/// (six fractional digits), so very small magnitudes print as "0.000000"
/// and precision is not round-trippable. Confirm this is acceptable.
template <typename T>
static std::vector<std::string> to_string_vec(const std::vector<T>& vals) {
    std::vector<std::string> out(vals.size());
    for (size_t i = 0; i < vals.size(); ++i) {
        out[i] = std::to_string(vals[i]);
    }
    return out;
}
1159
/// Lowercase hexadecimal rendering of a byte buffer, two characters per byte
/// with the high nibble first (used for FIXED_LEN_BYTE_ARRAY string output).
static std::string hex_encode(const std::vector<uint8_t>& bytes) {
    static constexpr char kDigits[] = "0123456789abcdef";
    std::string out(bytes.size() * 2, '\0');
    size_t pos = 0;
    for (const uint8_t byte : bytes) {
        out[pos++] = kDigits[(byte >> 4) & 0x0F];
        out[pos++] = kDigits[byte & 0x0F];
    }
    return out;
}
1171
1173 static LogicalType converted_type_to_logical(ConvertedType ct) {
1174 switch (ct) {
1185 default: return LogicalType::NONE;
1186 }
1187 }
1188};
1189
1190} // namespace signet::forge
BYTE_STREAM_SPLIT encoding and decoding (Parquet encoding type 9).
PLAIN-encoded Parquet column decoder.
MmapParquetReader(MmapParquetReader &&) noexcept=default
Move-constructible.
const Schema & schema() const
The file's column schema.
const std::string & created_by() const
The "created by" string from the file footer (may be empty).
const std::vector< thrift::KeyValue > & key_value_metadata() const
User-defined key-value metadata from the file footer.
int64_t num_row_groups() const
Number of row groups in the file.
const MmapReader & mmap() const
Direct access to the memory-mapped file data.
expected< std::vector< std::string > > read_column_as_strings(size_t row_group_index, size_t column_index)
Read a column and convert all values to their string representations.
expected< std::vector< std::vector< std::string > > > read_all()
Read all rows from all row groups as a vector of string rows.
int64_t num_rows() const
Total number of rows across all row groups.
expected< std::vector< T > > read_column(size_t row_group_index, size_t column_index)
Read an entire column from a row group as a typed vector.
~MmapParquetReader()=default
Default destructor.
RowGroupInfo row_group(size_t index) const
Retrieve summary information for a specific row group.
const thrift::Statistics * column_statistics(size_t row_group_index, size_t column_index) const
Retrieve the Thrift Statistics for a column chunk.
static expected< MmapParquetReader > open(const std::filesystem::path &path)
Open a Parquet file with memory-mapped I/O.
Low-level memory-mapped file handle.
MmapReader & operator=(MmapReader &&other) noexcept
Move assignment – transfers ownership of the mapping.
size_t size() const
Total file size in bytes.
const uint8_t * data() const
Pointer to the start of the mapped file.
static expected< MmapReader > open(const std::filesystem::path &path)
Open a file and memory-map it read-only.
bool is_open() const
Returns true if the mapping is currently active.
MmapReader & operator=(const MmapReader &)=delete
Non-copyable.
~MmapReader()
Destructor – unmaps file and closes fd.
void close()
Unmap the file and close the file descriptor.
MmapReader()=default
Default constructor — creates a closed/unmapped reader.
MmapReader(const MmapReader &)=delete
Non-copyable.
const uint8_t * data_at(size_t offset) const
Pointer to mapped memory at a given offset.
MmapReader(MmapReader &&other) noexcept
Move constructor – transfers ownership of the mapping.
static std::vector< uint32_t > decode_with_length(const uint8_t *data, size_t size, int bit_width, size_t num_values)
Decode from a buffer that starts with a 4-byte LE length prefix.
Definition rle.hpp:658
Immutable schema description for a Parquet file.
Definition schema.hpp:192
size_t num_columns() const
Number of columns in this schema.
Definition schema.hpp:238
const ColumnDescriptor & column(size_t index) const
Access a column descriptor by index.
Definition schema.hpp:244
A lightweight result type that holds either a success value of type T or an Error.
Definition error.hpp:143
Thrift Compact Protocol reader.
Definition compact.hpp:267
Compression codec interface and registry for Signet Forge.
PLAIN-encoded Parquet column decoder.
Thrift Compact Protocol encoder and decoder for Parquet metadata serialization.
DELTA_BINARY_PACKED encoding and decoding (Parquet encoding type 5).
Dictionary encoding and decoding for Parquet (PLAIN_DICTIONARY / RLE_DICTIONARY).
std::vector< float > decode_float(const uint8_t *data, size_t size, size_t count)
Decode float values from BYTE_STREAM_SPLIT encoding.
std::vector< double > decode_double(const uint8_t *data, size_t size, size_t count)
Decode double values from BYTE_STREAM_SPLIT encoding.
std::vector< int32_t > decode_int32(const uint8_t *data, size_t size, size_t num_values)
Decode DELTA_BINARY_PACKED data back to int32 values.
Definition delta.hpp:564
std::vector< int64_t > decode_int64(const uint8_t *data, size_t size, size_t num_values)
Decode DELTA_BINARY_PACKED data back to int64 values.
Definition delta.hpp:438
uint32_t crc32(const void *data, size_t length) noexcept
PhysicalType
Parquet physical (storage) types as defined in parquet.thrift.
Definition types.hpp:20
@ FIXED_LEN_BYTE_ARRAY
Fixed-length byte array (UUID, vectors, decimals).
@ INT64
64-bit signed integer (little-endian).
@ INT32
32-bit signed integer (little-endian).
@ BOOLEAN
1-bit boolean, bit-packed in pages.
@ BYTE_ARRAY
Variable-length byte sequence (strings, binary).
@ FLOAT
IEEE 754 single-precision float.
@ DOUBLE
IEEE 754 double-precision float.
constexpr uint32_t PARQUET_MAGIC_ENCRYPTED
"PARE" magic bytes (little-endian uint32) — marks a Parquet file with an encrypted footer.
Definition types.hpp:207
Compression
Parquet compression codecs.
Definition types.hpp:115
@ UNCOMPRESSED
No compression.
ConvertedType
Legacy Parquet converted types for backward compatibility with older readers.
Definition types.hpp:67
@ TIMESTAMP_MILLIS
Timestamp in milliseconds.
@ DECIMAL
Fixed-point decimal.
@ TIMESTAMP_MICROS
Timestamp in microseconds.
@ DATE
Date (days since epoch).
@ TIME_MILLIS
Time in milliseconds.
@ TIME_MICROS
Time in microseconds.
@ UTF8
UTF-8 encoded string.
expected< std::vector< uint8_t > > decompress(Compression codec, const uint8_t *data, size_t size, size_t uncompressed_size)
Decompress data using the specified codec via the global CodecRegistry.
Definition codec.hpp:213
expected< size_t > validate_mmap_page_value_count(int64_t num_values, const char *context)
LogicalType
Parquet logical types (from parquet.thrift LogicalType union).
Definition types.hpp:41
@ JSON
JSON document (stored as BYTE_ARRAY).
@ DECIMAL
Fixed-point decimal (INT32/INT64/FIXED_LEN_BYTE_ARRAY).
@ DATE
Calendar date — INT32, days since 1970-01-01.
@ STRING
UTF-8 string (stored as BYTE_ARRAY).
@ ENUM
Enum string (stored as BYTE_ARRAY).
@ TIME_MS
Time of day — INT32, milliseconds since midnight.
@ NONE
No logical annotation — raw physical type.
@ TIME_US
Time of day — INT64, microseconds since midnight.
@ BSON
BSON document (stored as BYTE_ARRAY).
@ TIMESTAMP_MS
Timestamp — INT64, milliseconds since Unix epoch.
@ TIMESTAMP_US
Timestamp — INT64, microseconds since Unix epoch.
constexpr uint32_t PARQUET_MAGIC
"PAR1" magic bytes (little-endian uint32) — marks a standard Parquet file.
Definition types.hpp:205
@ IO_ERROR
A file-system or stream I/O operation failed (open, read, write, rename).
@ ENCRYPTION_ERROR
An encryption or decryption operation failed (bad key, tampered ciphertext, PME error).
@ UNSUPPORTED_COMPRESSION
The file uses a compression codec not linked into this build (ZSTD, LZ4, Gzip).
@ UNSUPPORTED_TYPE
The file contains a Parquet physical or logical type that is not implemented.
@ OUT_OF_RANGE
An index, offset, or size value is outside the valid range.
@ CORRUPT_FOOTER
The Parquet footer (FileMetaData) is missing, truncated, or malformed.
@ INVALID_FILE
The file is not a valid Parquet file (e.g. missing or wrong magic bytes).
@ UNSUPPORTED_ENCODING
The file uses an encoding not supported by this build (e.g. BYTE_STREAM_SPLIT on integers).
@ INVALID_ARGUMENT
A caller-supplied argument is outside the valid range or violates a precondition.
@ CORRUPT_PAGE
A data page failed integrity checks (bad CRC, truncated, or exceeds size limits).
@ CORRUPT_DATA
Decoded data is corrupt or inconsistent (e.g. out-of-range dictionary index).
Encoding
Parquet page encoding types.
Definition types.hpp:98
@ DELTA_BINARY_PACKED
Delta encoding for INT32/INT64 (compact for sorted/sequential data).
@ RLE
Run-length / bit-packed hybrid (used for booleans and def/rep levels).
@ RLE_DICTIONARY
Modern dictionary encoding (Parquet 2.0) — dict page + RLE indices.
@ PLAIN_DICTIONARY
Legacy dictionary encoding (Parquet 1.0).
@ PLAIN
Values stored back-to-back in their native binary layout.
@ BYTE_STREAM_SPLIT
Byte-stream split for FLOAT/DOUBLE (transposes byte lanes for better compression).
@ DATA_PAGE_V2
Data page v2 (Parquet 2.0 format with separate rep/def level sections).
@ DICTIONARY_PAGE
Dictionary page — contains the value dictionary for RLE_DICTIONARY columns.
@ DATA_PAGE
Data page (Parquet 1.0 format).
@ REQUIRED
Exactly one value per row (non-nullable).
RLE/Bit-Packing Hybrid encoding and decoding (Parquet spec).
Schema definition types: Column<T>, SchemaBuilder, and Schema.
Bundled, zero-dependency, header-only Snappy compression codec.
Descriptor for a single column in a Parquet schema.
Definition types.hpp:152
int32_t type_length
Byte length for FIXED_LEN_BYTE_ARRAY columns (-1 = N/A).
Definition types.hpp:157
LogicalType logical_type
Semantic annotation (STRING, TIMESTAMP_NS, etc.).
Definition types.hpp:155
Repetition repetition
Nullability / cardinality.
Definition types.hpp:156
std::string name
Column name (unique within a schema).
Definition types.hpp:153
int32_t scale
Decimal scale (-1 = N/A).
Definition types.hpp:159
PhysicalType physical_type
On-disk storage type.
Definition types.hpp:154
int32_t precision
Decimal precision (-1 = N/A).
Definition types.hpp:158
Lightweight error value carrying an ErrorCode and a human-readable message.
Definition error.hpp:99
Summary information for a single row group.
int64_t num_rows
Number of rows in this row group.
int64_t total_byte_size
Total uncompressed byte size of the row group.
int64_t row_group_index
Zero-based index of this row group.
Parquet file metadata (parquet.thrift fields 1-7).
Definition types.hpp:2265
expected< void > deserialize(CompactDecoder &dec)
Definition types.hpp:2323
std::vector< RowGroup > row_groups
Definition types.hpp:2269
std::optional< std::string > created_by
Definition types.hpp:2271
std::optional< std::vector< KeyValue > > key_value_metadata
Definition types.hpp:2270
std::vector< SchemaElement > schema
Definition types.hpp:2267
Parquet page header (parquet.thrift fields 1-8).
Definition types.hpp:933
Parquet column statistics (parquet.thrift fields 1-6).
Definition types.hpp:369
Parquet Thrift struct types – C++ structs matching parquet.thrift, with Compact Protocol serialization/deserialization.
Parquet format enumerations, type traits, and statistics structs.