Signet Forge 0.1.0
C++20 Parquet library with AI-native extensions
DEMO
Loading...
Searching...
No Matches
mmap_reader.hpp
Go to the documentation of this file.
1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright 2026 Johnson Ogundeji
3#pragma once
4
20
21#include "signet/types.hpp"
22#include "signet/error.hpp"
23#include "signet/schema.hpp"
33
34#include <cstdint>
35#include <cstring>
36#include <filesystem>
37#include <stdexcept>
38#include <string>
39#include <vector>
40
41#ifndef _WIN32
42#include <fcntl.h>
43#include <sys/mman.h>
44#include <sys/stat.h>
45#include <unistd.h>
46#else
47#error "MmapReader: Windows support not yet implemented. Use ParquetReader instead."
48#endif
49
50namespace signet::forge {
51
namespace detail_mmap_reader {

/// Standard reflected CRC-32 (polynomial 0xEDB88320, initial value and final
/// XOR 0xFFFFFFFF), computed one byte at a time through a 256-entry lookup
/// table that is built at compile time.
///
/// @param data    start of the byte range (may be nullptr when length == 0)
/// @param length  number of bytes to checksum
/// @return        the CRC-32 of the range
// NOTE(review): uses std::array; <array> is not in the visible include list —
// confirm it is included (directly or transitively) by this header.
inline uint32_t crc32(const void* data, size_t length) noexcept {
    static constexpr std::array<uint32_t, 256> kTable = [] {
        std::array<uint32_t, 256> tbl{};
        for (uint32_t n = 0; n < tbl.size(); ++n) {
            uint32_t r = n;
            for (int bit = 0; bit < 8; ++bit) {
                // Shift out one bit; fold in the polynomial when it was set.
                r = (r >> 1) ^ ((r & 1u) ? 0xEDB88320u : 0u);
            }
            tbl[n] = r;
        }
        return tbl;
    }();

    const auto* cursor = static_cast<const uint8_t*>(data);
    const uint8_t* const end = cursor + length;
    uint32_t state = 0xFFFFFFFFu;
    while (cursor != end) {
        state = (state >> 8) ^ kTable[(state ^ *cursor) & 0xFFu];
        ++cursor;
    }
    return state ^ 0xFFFFFFFFu;
}

} // namespace detail_mmap_reader
74
83public:
91 [[nodiscard]] static expected<MmapReader> open(
92 const std::filesystem::path& path) {
93
94 // Open the file descriptor
95 int fd = ::open(path.c_str(), O_RDONLY);
96 if (fd < 0) {
98 "cannot open file: " + path.string()};
99 }
100
101 // Determine file size via fstat
102 struct stat st;
103 if (::fstat(fd, &st) != 0) {
104 ::close(fd);
106 "cannot stat file: " + path.string()};
107 }
108
109 size_t file_size = static_cast<size_t>(st.st_size);
110 if (file_size == 0) {
111 ::close(fd);
113 "file is empty: " + path.string()};
114 }
115
116 // Memory-map the file (read-only, private copy-on-write)
117 void* mapped = ::mmap(nullptr, file_size, PROT_READ, MAP_PRIVATE, fd, 0);
118 if (mapped == MAP_FAILED) {
119 ::close(fd);
121 "mmap failed: " + path.string()};
122 }
123
124 // Advise the kernel that we will read sequentially (improves readahead)
125#ifdef MADV_SEQUENTIAL
126 ::madvise(mapped, file_size, MADV_SEQUENTIAL);
127#endif
128
129#ifdef __linux__
130 // POSIX mmap(2): pre-fault pages to detect truncated files early (SIGBUS risk)
131 // CWE-252: Unchecked Return Value — madvise failure is non-fatal, but
132 // the volatile reads below detect truncation before any real parsing.
133 ::madvise(mapped, file_size, MADV_WILLNEED);
134#endif
135
136 // POSIX mmap(2) + CWE-252: volatile byte reads force page-in; a SIGBUS
137 // here means the file was truncated after stat() — fail fast.
138 if (file_size >= 4) {
139 volatile uint8_t check = static_cast<const uint8_t*>(mapped)[0]; (void)check;
140 check = static_cast<const uint8_t*>(mapped)[file_size - 1]; (void)check;
141 }
142
143 MmapReader reader;
144 reader.mapped_ = mapped;
145 reader.size_ = file_size;
146 reader.fd_ = fd;
147 return reader;
148 }
149
150 // -- Access the mapped memory --------------------------------------------
151
153 [[nodiscard]] const uint8_t* data() const {
154 return static_cast<const uint8_t*>(mapped_);
155 }
156
158 [[nodiscard]] size_t size() const { return size_; }
159
162 // CWE-125: Out-of-bounds Read — reject offset beyond mapped region
163 [[nodiscard]] const uint8_t* data_at(size_t offset) const {
164 if (offset >= size_) {
165 return nullptr;
166 }
167 return data() + offset;
168 }
169
170 // -- Close / unmap -------------------------------------------------------
171
175 void close() {
176 if (mapped_ != nullptr && mapped_ != MAP_FAILED) {
177 ::munmap(mapped_, size_);
178 mapped_ = nullptr;
179 }
180 if (fd_ >= 0) {
181 ::close(fd_);
182 fd_ = -1;
183 }
184 size_ = 0;
185 }
186
189 close();
190 }
191
192 // -- Non-copyable, movable -----------------------------------------------
193
194 MmapReader(const MmapReader&) = delete;
195 MmapReader& operator=(const MmapReader&) = delete;
196
198 MmapReader(MmapReader&& other) noexcept
199 : mapped_(other.mapped_)
200 , size_(other.size_)
201 , fd_(other.fd_)
202 {
203 other.mapped_ = nullptr;
204 other.size_ = 0;
205 other.fd_ = -1;
206 }
207
209 MmapReader& operator=(MmapReader&& other) noexcept {
210 if (this != &other) {
211 close();
212 mapped_ = other.mapped_;
213 size_ = other.size_;
214 fd_ = other.fd_;
215 other.mapped_ = nullptr;
216 other.size_ = 0;
217 other.fd_ = -1;
218 }
219 return *this;
220 }
221
223 [[nodiscard]] bool is_open() const {
224 return mapped_ != nullptr && mapped_ != MAP_FAILED;
225 }
226
228 MmapReader() = default;
229
230private:
231 void* mapped_ = nullptr;
232 size_t size_ = 0;
233 int fd_ = -1;
234};
235
/// Upper bound on the value count accepted for a single page (taken from the
/// page header, or from ColumnMetaData::num_values when the header has none).
/// Larger counts are rejected as corrupt before they are used to size buffers.
///
/// Declared `inline constexpr` (C++17) so this header provides a single
/// ODR-safe definition shared by all translation units, rather than a separate
/// internal-linkage copy per TU as `static` would.
inline constexpr int64_t MMAP_MAX_VALUES_PER_PAGE = 100'000'000;
255
257 int64_t num_values,
258 const char* context) {
259 if (num_values < 0 || num_values > MMAP_MAX_VALUES_PER_PAGE) {
261 std::string(context) + ": num_values out of valid range"};
262 }
263 return static_cast<size_t>(num_values);
264}
265
267public:
275 [[nodiscard]] static expected<MmapParquetReader> open(
276 const std::filesystem::path& path) {
277
278 // Memory-map the file
279 auto mmap_result = MmapReader::open(path);
280 if (!mmap_result) return mmap_result.error();
281
282 auto mmap = std::move(*mmap_result);
283
284 const uint8_t* file_data = mmap.data();
285 const size_t sz = mmap.size();
286
287 // --- Validate minimum size: 4 (magic) + 4 (footer len) + 4 (magic) = 12 ---
288 if (sz < 12) {
290 "file too small to be a valid Parquet file: " +
291 path.string()};
292 }
293
294 // --- Verify PAR1 magic at start ---
295 uint32_t magic_start;
296 std::memcpy(&magic_start, file_data, 4);
297 if (magic_start != PARQUET_MAGIC) {
299 "missing PAR1 magic at start of file"};
300 }
301
302 // --- Verify PAR1 or PARE magic at end ---
303 uint32_t magic_end;
304 std::memcpy(&magic_end, file_data + sz - 4, 4);
305
306 if (magic_end != PARQUET_MAGIC && magic_end != PARQUET_MAGIC_ENCRYPTED) {
308 "missing PAR1/PARE magic at end of file"};
309 }
310
311 if (magic_end == PARQUET_MAGIC_ENCRYPTED) {
313 "MmapParquetReader does not support encrypted footers; "
314 "use ParquetReader with an EncryptionConfig instead"};
315 }
316
317 // --- Read footer length (4-byte LE uint32 at [size-8, size-4]) ---
318 uint32_t footer_len;
319 std::memcpy(&footer_len, file_data + sz - 8, 4);
320
321 if (footer_len == 0 || static_cast<size_t>(footer_len) > sz - 12) {
323 "invalid footer length: " + std::to_string(footer_len)};
324 }
325
326 // --- Deserialize FileMetaData from footer (directly from mmap) ---
327 size_t footer_offset = sz - 8 - footer_len;
328 const uint8_t* footer_ptr = file_data + footer_offset;
329
330 thrift::CompactDecoder dec(footer_ptr, footer_len);
331 thrift::FileMetaData metadata;
332 if (auto r = metadata.deserialize(dec); !r.has_value()) {
333 return r.error();
334 }
335
336 // --- Build Schema from FileMetaData.schema ---
337 std::string schema_name;
338 std::vector<ColumnDescriptor> columns;
339
340 if (!metadata.schema.empty()) {
341 schema_name = metadata.schema[0].name;
342
343 for (size_t i = 1; i < metadata.schema.size(); ++i) {
344 const auto& elem = metadata.schema[i];
345
346 // Skip group nodes (those with num_children set)
347 if (elem.num_children.has_value()) {
348 continue;
349 }
350
352 cd.name = elem.name;
353 cd.physical_type = elem.type.value_or(PhysicalType::BYTE_ARRAY);
354 cd.repetition = elem.repetition_type.value_or(Repetition::REQUIRED);
355
356 if (elem.type_length.has_value()) {
357 cd.type_length = *elem.type_length;
358 }
359 if (elem.precision.has_value()) {
360 cd.precision = *elem.precision;
361 }
362 if (elem.scale.has_value()) {
363 cd.scale = *elem.scale;
364 }
365
366 // Map ConvertedType to LogicalType for common cases
367 if (elem.converted_type.has_value()) {
368 cd.logical_type = converted_type_to_logical(*elem.converted_type);
369 }
370
371 columns.push_back(std::move(cd));
372 }
373 }
374
375 // --- Assemble the reader ---
376 MmapParquetReader reader;
377 reader.mmap_ = std::move(mmap);
378 reader.metadata_ = std::move(metadata);
379 reader.schema_ = Schema(std::move(schema_name), std::move(columns));
380 reader.created_by_ = reader.metadata_.created_by.value_or("");
381
382 return reader;
383 }
384
385 // -- File metadata accessors ---------------------------------------------
386
388 [[nodiscard]] const Schema& schema() const { return schema_; }
389
391 [[nodiscard]] int64_t num_rows() const { return metadata_.num_rows; }
392
394 [[nodiscard]] int64_t num_row_groups() const {
395 return static_cast<int64_t>(metadata_.row_groups.size());
396 }
397
399 [[nodiscard]] const std::string& created_by() const { return created_by_; }
400
403 [[nodiscard]] const std::vector<thrift::KeyValue>& key_value_metadata() const {
404 static const std::vector<thrift::KeyValue> empty;
405 return metadata_.key_value_metadata.has_value()
406 ? *metadata_.key_value_metadata
407 : empty;
408 }
409
410 // -- Row group info ------------------------------------------------------
411
414 int64_t num_rows;
417 };
418
423 [[nodiscard]] RowGroupInfo row_group(size_t index) const {
424 if (index >= metadata_.row_groups.size()) {
425 throw std::out_of_range("MmapParquetReader::row_group: index " +
426 std::to_string(index) + " >= " +
427 std::to_string(metadata_.row_groups.size()));
428 }
429 const auto& rg = metadata_.row_groups[index];
430 return {rg.num_rows,
431 rg.total_byte_size,
432 static_cast<int64_t>(index)};
433 }
434
435 // -- Statistics for a column in a row group ------------------------------
436
443 size_t row_group_index, size_t column_index) const {
444 if (row_group_index >= metadata_.row_groups.size()) return nullptr;
445 const auto& rg = metadata_.row_groups[row_group_index];
446 if (column_index >= rg.columns.size()) return nullptr;
447 const auto& chunk = rg.columns[column_index];
448 if (!chunk.meta_data.has_value()) return nullptr;
449 if (!chunk.meta_data->statistics.has_value()) return nullptr;
450 return &(*chunk.meta_data->statistics);
451 }
452
453 // -- Typed column reads --------------------------------------------------
454
466 template <typename T>
467 expected<std::vector<T>> read_column(size_t row_group_index,
468 size_t column_index) {
469 // --- Validate indices ---
470 if (row_group_index >= metadata_.row_groups.size()) {
472 "row group index out of range"};
473 }
474 if (column_index >= schema_.num_columns()) {
476 "column index out of range"};
477 }
478
479 const auto& rg = metadata_.row_groups[row_group_index];
480 if (column_index >= rg.columns.size()) {
482 "column index out of range"};
483 }
484
485 const auto& chunk = rg.columns[column_index];
486 if (!chunk.meta_data.has_value()) {
488 "column chunk has no metadata"};
489 }
490 const auto& col_meta = *chunk.meta_data;
491
492 // --- Detect encoding strategy ---
493 bool has_dict = false;
494 Encoding data_encoding = Encoding::PLAIN;
495
496 for (auto enc : col_meta.encodings) {
497 if (enc == Encoding::PLAIN_DICTIONARY ||
499 has_dict = true;
500 }
502 data_encoding = Encoding::DELTA_BINARY_PACKED;
503 }
504 if (enc == Encoding::BYTE_STREAM_SPLIT) {
505 data_encoding = Encoding::BYTE_STREAM_SPLIT;
506 }
507 if (enc == Encoding::RLE &&
508 col_meta.type == PhysicalType::BOOLEAN) {
509 data_encoding = Encoding::RLE;
510 }
511 }
512
513 // --- Dictionary encoding path ---
514 if (has_dict) {
515 if constexpr (std::is_same_v<T, std::string> ||
516 std::is_same_v<T, int32_t> ||
517 std::is_same_v<T, int64_t> ||
518 std::is_same_v<T, float> ||
519 std::is_same_v<T, double>) {
520 return read_column_dict<T>(col_meta,
521 static_cast<int32_t>(row_group_index));
522 } else {
524 "dictionary encoding not supported for this type"};
525 }
526 }
527
528 // --- DELTA_BINARY_PACKED path ---
529 if (data_encoding == Encoding::DELTA_BINARY_PACKED) {
530 if constexpr (std::is_same_v<T, int32_t> ||
531 std::is_same_v<T, int64_t>) {
532 return read_column_delta<T>(col_meta);
533 } else {
535 "DELTA_BINARY_PACKED only supports INT32/INT64"};
536 }
537 }
538
539 // --- BYTE_STREAM_SPLIT path ---
540 if (data_encoding == Encoding::BYTE_STREAM_SPLIT) {
541 if constexpr (std::is_same_v<T, float> ||
542 std::is_same_v<T, double>) {
543 return read_column_bss<T>(col_meta);
544 } else {
546 "BYTE_STREAM_SPLIT only supports FLOAT/DOUBLE"};
547 }
548 }
549
550 // --- RLE boolean path ---
551 if (data_encoding == Encoding::RLE &&
552 col_meta.type == PhysicalType::BOOLEAN) {
553 if constexpr (std::is_same_v<T, bool>) {
554 return read_column_rle_bool(col_meta);
555 } else {
557 "RLE boolean encoding requires bool type"};
558 }
559 }
560
561 // --- Default: PLAIN encoding via ColumnReader ---
562 auto reader_result = make_column_reader(row_group_index, column_index);
563 if (!reader_result) return reader_result.error();
564
565 auto& [col_reader, num_values] = reader_result.value();
566 size_t count = static_cast<size_t>(num_values);
567
568 if constexpr (std::is_same_v<T, bool>) {
569 std::vector<bool> values;
570 values.reserve(count);
571 for (size_t i = 0; i < count; ++i) {
572 auto val = col_reader.template read<bool>();
573 if (!val) return val.error();
574 values.push_back(*val);
575 }
576 return values;
577 } else {
578 std::vector<T> values(count);
579 auto batch_result = col_reader.template read_batch<T>(
580 values.data(), count);
581 if (!batch_result) return batch_result.error();
582 return values;
583 }
584 }
585
586 // -- String reads --------------------------------------------------------
587
598 size_t row_group_index, size_t column_index) {
599 if (row_group_index >= metadata_.row_groups.size()) {
600 return Error{ErrorCode::OUT_OF_RANGE, "row group index out of range"};
601 }
602 if (column_index >= schema_.num_columns()) {
603 return Error{ErrorCode::OUT_OF_RANGE, "column index out of range"};
604 }
605
606 PhysicalType pt = schema_.column(column_index).physical_type;
607
608 switch (pt) {
610 auto res = read_column<bool>(row_group_index, column_index);
611 if (!res) return res.error();
612 return to_string_vec(res.value());
613 }
614 case PhysicalType::INT32: {
615 auto res = read_column<int32_t>(row_group_index, column_index);
616 if (!res) return res.error();
617 return to_string_vec(res.value());
618 }
619 case PhysicalType::INT64: {
620 auto res = read_column<int64_t>(row_group_index, column_index);
621 if (!res) return res.error();
622 return to_string_vec(res.value());
623 }
624 case PhysicalType::FLOAT: {
625 auto res = read_column<float>(row_group_index, column_index);
626 if (!res) return res.error();
627 return to_string_vec(res.value());
628 }
630 auto res = read_column<double>(row_group_index, column_index);
631 if (!res) return res.error();
632 return to_string_vec(res.value());
633 }
635 return read_column<std::string>(row_group_index, column_index);
636 }
638 auto reader_result = make_column_reader(row_group_index, column_index);
639 if (!reader_result) return reader_result.error();
640 auto& [col_reader, num_values] = reader_result.value();
641
642 std::vector<std::string> result;
643 result.reserve(static_cast<size_t>(num_values));
644 for (int64_t i = 0; i < num_values; ++i) {
645 auto bytes_result = col_reader.read_bytes();
646 if (!bytes_result) return bytes_result.error();
647 result.push_back(hex_encode(bytes_result.value()));
648 }
649 return result;
650 }
651 default:
653 "unsupported physical type for string conversion"};
654 }
655 }
656
657 // -- Read all rows -------------------------------------------------------
658
667 static constexpr size_t MAX_READ_ALL_ROWS = 100'000'000; // 100M row safety cap
668 size_t num_cols = schema_.num_columns();
669 if (metadata_.num_rows < 0 ||
670 static_cast<size_t>(metadata_.num_rows) > MAX_READ_ALL_ROWS) {
672 "read_all: num_rows exceeds safety cap ("
673 + std::to_string(MAX_READ_ALL_ROWS) + ")"};
674 }
675 std::vector<std::vector<std::string>> rows;
676 rows.reserve(static_cast<size_t>(metadata_.num_rows));
677
678 for (size_t rg = 0; rg < metadata_.row_groups.size(); ++rg) {
679 // Read all columns for this row group
680 size_t rg_num_cols = num_cols;
681 std::vector<std::vector<std::string>> col_data(rg_num_cols);
682 for (size_t c = 0; c < rg_num_cols; ++c) {
683 auto res = read_column_as_strings(rg, c);
684 if (!res) return res.error();
685 col_data[c] = std::move(res.value());
686 }
687
688 if (col_data.empty() || col_data[0].empty()) continue;
689
690 size_t rg_rows = col_data[0].size();
691 for (size_t r = 0; r < rg_rows; ++r) {
692 std::vector<std::string> row(num_cols);
693 for (size_t c = 0; c < num_cols; ++c) {
694 if (r < col_data[c].size()) {
695 row[c] = col_data[c][r];
696 }
697 }
698 rows.push_back(std::move(row));
699 }
700 }
701
702 return rows;
703 }
704
705 // -- Access the underlying mmap ------------------------------------------
706
708 [[nodiscard]] const MmapReader& mmap() const { return mmap_; }
709
710 // -- Special members -----------------------------------------------------
711
712 ~MmapParquetReader() = default;
713 MmapParquetReader(MmapParquetReader&&) noexcept = default;
714 MmapParquetReader& operator=(MmapParquetReader&&) noexcept = default;
715
716private:
718 MmapParquetReader() = default;
719
720 MmapReader mmap_;
721 thrift::FileMetaData metadata_;
722 Schema schema_;
723 std::string created_by_;
724
725 // Holds decompressed page data so ColumnReader pointers remain valid
726 std::vector<std::vector<uint8_t>> decompressed_buffers_;
727
729 struct ColumnReaderWithCount {
730 ColumnReader reader;
731 int64_t num_values;
732 };
733
735 struct PageReadResult {
736 const uint8_t* data;
737 size_t size;
738 thrift::PageHeader header;
739 };
740
742 expected<PageReadResult> read_page_at(int64_t offset, Compression codec) {
743 if (offset < 0 || static_cast<size_t>(offset) >= mmap_.size()) {
744 return Error{ErrorCode::CORRUPT_PAGE,
745 "page offset out of file bounds"};
746 }
747
748 size_t remaining = mmap_.size() - static_cast<size_t>(offset);
749 const uint8_t* page_start = mmap_.data_at(static_cast<size_t>(offset));
750
751 thrift::CompactDecoder page_dec(page_start, remaining);
752 thrift::PageHeader ph;
753 if (auto r = ph.deserialize(page_dec); !r.has_value()) {
754 return r.error();
755 }
756
757 size_t hdr_size = page_dec.position();
758
759 // Reject negative page sizes from crafted files (CWE-191)
760 if (ph.compressed_page_size < 0 || ph.uncompressed_page_size < 0) {
761 return Error{ErrorCode::CORRUPT_PAGE,
762 "mmap: negative page size in PageHeader"};
763 }
764
765 size_t compressed_size = static_cast<size_t>(ph.compressed_page_size);
766 const uint8_t* pdata = page_start + hdr_size;
767
768 if (hdr_size + compressed_size > remaining) {
769 return Error{ErrorCode::CORRUPT_PAGE,
770 "page data extends past end of file"};
771 }
772
773 size_t pdata_size = compressed_size;
774
775 if (ph.crc.has_value()) {
776 uint32_t expected_crc = static_cast<uint32_t>(*ph.crc);
777 uint32_t computed_crc = detail_mmap_reader::crc32(pdata, pdata_size);
778 if (computed_crc != expected_crc) {
779 return Error{ErrorCode::CORRUPT_PAGE,
780 "mmap: page CRC-32 mismatch at offset " + std::to_string(offset)};
781 }
782 }
783
784 // Decompress if needed
785 if (codec != Compression::UNCOMPRESSED) {
786 size_t uncompressed_size = static_cast<size_t>(
787 ph.uncompressed_page_size);
788 // L-9: Pre-validate decompressed size to prevent allocation bombs (CWE-770)
789 static constexpr size_t MMAP_MAX_PAGE_SIZE2 = 256ULL * 1024ULL * 1024ULL;
790 if (uncompressed_size == 0 || uncompressed_size > MMAP_MAX_PAGE_SIZE2) {
791 return Error{ErrorCode::CORRUPT_PAGE,
792 "mmap: uncompressed page size out of range (0 or > 256 MB)"};
793 }
794 // CWE-409: Improper Handling of Highly Compressed Data (Zip Bomb)
795 // M21: Reject suspiciously high decompression ratios (zip bomb guard)
796 if (compressed_size > 0 && uncompressed_size / compressed_size > 1024) {
797 return Error{ErrorCode::CORRUPT_DATA,
798 "mmap: decompression ratio exceeds 1024x limit"};
799 }
800 auto dec_result = decompress(codec, pdata, pdata_size,
801 uncompressed_size);
802 if (!dec_result) {
804 "decompression failed: " +
805 dec_result.error().message};
806 }
807 decompressed_buffers_.push_back(std::move(dec_result.value()));
808 pdata = decompressed_buffers_.back().data();
809 pdata_size = decompressed_buffers_.back().size();
810 }
811
812 return PageReadResult{pdata, pdata_size, std::move(ph)};
813 }
814
816 expected<ColumnReaderWithCount> make_column_reader(
817 size_t row_group_index, size_t column_index) {
818 if (row_group_index >= metadata_.row_groups.size()) {
819 return Error{ErrorCode::OUT_OF_RANGE,
820 "row group index out of range"};
821 }
822
823 const auto& rg = metadata_.row_groups[row_group_index];
824 if (column_index >= rg.columns.size()) {
825 return Error{ErrorCode::OUT_OF_RANGE,
826 "column index out of range"};
827 }
828
829 const auto& chunk = rg.columns[column_index];
830 if (!chunk.meta_data.has_value()) {
831 return Error{ErrorCode::CORRUPT_PAGE,
832 "column chunk has no metadata"};
833 }
834
835 const auto& col_meta = *chunk.meta_data;
836 int64_t offset = col_meta.data_page_offset;
837
838 if (offset < 0 || static_cast<size_t>(offset) >= mmap_.size()) {
839 return Error{ErrorCode::CORRUPT_PAGE,
840 "data_page_offset out of file bounds"};
841 }
842
843 // Read from mmap directly -- no file I/O
844 size_t remaining = mmap_.size() - static_cast<size_t>(offset);
845 const uint8_t* page_start = mmap_.data_at(static_cast<size_t>(offset));
846
847 // Deserialize the PageHeader
848 thrift::CompactDecoder page_dec(page_start, remaining);
849 thrift::PageHeader page_header;
850 if (auto r = page_header.deserialize(page_dec); !r.has_value()) {
851 return r.error();
852 }
853
854 size_t header_size = page_dec.position();
855 size_t page_data_size = static_cast<size_t>(
856 page_header.compressed_page_size);
857 const uint8_t* page_data = page_start + header_size;
858
859 if (page_header.crc.has_value()) {
860 uint32_t expected_crc = static_cast<uint32_t>(*page_header.crc);
861 uint32_t computed_crc = detail_mmap_reader::crc32(page_data, page_data_size);
862 if (computed_crc != expected_crc) {
863 return Error{ErrorCode::CORRUPT_PAGE,
864 "mmap: page CRC-32 mismatch"};
865 }
866 }
867
868 if (header_size + page_data_size > remaining) {
869 return Error{ErrorCode::CORRUPT_PAGE,
870 "page data extends past end of file"};
871 }
872
873 // Decompress if needed
874 if (col_meta.codec != Compression::UNCOMPRESSED) {
875 size_t uncompressed_size = static_cast<size_t>(
876 page_header.uncompressed_page_size);
877 // L-9: Pre-validate decompressed size to prevent allocation bombs (CWE-770)
878 static constexpr size_t MMAP_MAX_PAGE_SIZE = 256ULL * 1024ULL * 1024ULL;
879 if (uncompressed_size == 0 || uncompressed_size > MMAP_MAX_PAGE_SIZE) {
880 return Error{ErrorCode::CORRUPT_PAGE,
881 "mmap: uncompressed page size out of range (0 or > 256 MB)"};
882 }
883 // CWE-409: Improper Handling of Highly Compressed Data (Zip Bomb)
884 // M21: Reject suspiciously high decompression ratios (zip bomb guard)
885 if (page_data_size > 0 && uncompressed_size / page_data_size > 1024) {
886 return Error{ErrorCode::CORRUPT_DATA,
887 "mmap: decompression ratio exceeds 1024x limit"};
888 }
889 auto decompressed = decompress(col_meta.codec,
890 page_data, page_data_size,
891 uncompressed_size);
892 if (!decompressed) {
894 "decompression failed: " +
895 decompressed.error().message};
896 }
897 decompressed_buffers_.push_back(std::move(decompressed.value()));
898 page_data = decompressed_buffers_.back().data();
899 page_data_size = decompressed_buffers_.back().size();
900 }
901
902 // Determine num_values
903 int64_t num_values = 0;
904 if (page_header.type == PageType::DATA_PAGE &&
905 page_header.data_page_header.has_value()) {
906 num_values = page_header.data_page_header->num_values;
907 } else if (page_header.type == PageType::DATA_PAGE_V2 &&
908 page_header.data_page_header_v2.has_value()) {
909 num_values = page_header.data_page_header_v2->num_values;
910 } else {
911 num_values = col_meta.num_values;
912 }
913
914 if (num_values < 0 || num_values > MMAP_MAX_VALUES_PER_PAGE) {
915 return Error{ErrorCode::CORRUPT_PAGE,
916 "mmap: num_values out of range (" +
917 std::to_string(num_values) + ")"};
918 }
919
920 // Determine physical type and type_length
921 PhysicalType pt = col_meta.type;
922 int32_t type_length = -1;
923 if (column_index < schema_.num_columns()) {
924 type_length = schema_.column(column_index).type_length;
925 }
926
927 ColumnReader col_reader(pt, page_data, page_data_size,
928 num_values, type_length);
929
930 return ColumnReaderWithCount{std::move(col_reader), num_values};
931 }
932
934 template <typename T>
935 expected<std::vector<T>> read_column_dict(
936 const thrift::ColumnMetaData& col_meta,
937 int32_t rg_index = 0) {
938 (void)rg_index;
939
940 int64_t dict_offset = col_meta.dictionary_page_offset.value_or(
941 col_meta.data_page_offset);
942
943 // Read the dictionary page
944 auto dict_page_result = read_page_at(dict_offset, col_meta.codec);
945 if (!dict_page_result) return dict_page_result.error();
946
947 auto& dict_pr = dict_page_result.value();
948 if (dict_pr.header.type != PageType::DICTIONARY_PAGE ||
949 !dict_pr.header.dictionary_page_header.has_value()) {
950 return Error{ErrorCode::CORRUPT_PAGE,
951 "expected DICTIONARY_PAGE at dictionary offset"};
952 }
953
954 int32_t raw_dict_count = dict_pr.header.dictionary_page_header->num_values;
955 if (raw_dict_count < 0 || raw_dict_count > 10'000'000) {
956 return Error{ErrorCode::CORRUPT_PAGE,
957 "mmap: dictionary page num_values out of valid range"};
958 }
959 size_t num_dict_entries = static_cast<size_t>(raw_dict_count);
960
961 // Find the data page offset
962 int64_t data_offset = col_meta.data_page_offset;
963 if (data_offset == dict_offset) {
964 // Dictionary and data pages are sequential -- skip past the
965 // dictionary page to reach the data page.
966 size_t dict_raw_start = static_cast<size_t>(dict_offset);
967 const uint8_t* dict_start = mmap_.data_at(dict_raw_start);
968 size_t dict_remaining = mmap_.size() - dict_raw_start;
969
970 thrift::CompactDecoder hdr_dec(dict_start, dict_remaining);
971 thrift::PageHeader tmp_hdr;
972 if (auto r = tmp_hdr.deserialize(hdr_dec); !r.has_value()) {
973 return r.error();
974 }
975 size_t dict_hdr_size = hdr_dec.position();
976 size_t dict_compressed_size = static_cast<size_t>(
977 tmp_hdr.compressed_page_size);
978
979 data_offset = dict_offset
980 + static_cast<int64_t>(dict_hdr_size)
981 + static_cast<int64_t>(dict_compressed_size);
982 }
983
984 // Read the data page (RLE-encoded indices)
985 auto data_page_result = read_page_at(data_offset, col_meta.codec);
986 if (!data_page_result) return data_page_result.error();
987
988 auto& data_pr = data_page_result.value();
989
990 int64_t num_values = 0;
991 if (data_pr.header.type == PageType::DATA_PAGE &&
992 data_pr.header.data_page_header.has_value()) {
993 num_values = data_pr.header.data_page_header->num_values;
994 } else if (data_pr.header.type == PageType::DATA_PAGE_V2 &&
995 data_pr.header.data_page_header_v2.has_value()) {
996 num_values = data_pr.header.data_page_header_v2->num_values;
997 } else {
998 num_values = col_meta.num_values;
999 }
1000 if (num_values < 0 || num_values > 100'000'000)
1001 return Error{ErrorCode::CORRUPT_DATA, "Invalid num_values in page header"};
1002
1003 DictionaryDecoder<T> decoder(dict_pr.data, dict_pr.size,
1004 num_dict_entries, col_meta.type);
1005
1006 return decoder.decode(data_pr.data, data_pr.size,
1007 static_cast<size_t>(num_values));
1008 }
1009
1011 template <typename T>
1012 expected<std::vector<T>> read_column_delta(
1013 const thrift::ColumnMetaData& col_meta) {
1014 auto page_result = read_page_at(col_meta.data_page_offset,
1015 col_meta.codec);
1016 if (!page_result) return page_result.error();
1017
1018 auto& pr = page_result.value();
1019
1020 int64_t num_values = 0;
1021 if (pr.header.type == PageType::DATA_PAGE &&
1022 pr.header.data_page_header.has_value()) {
1023 num_values = pr.header.data_page_header->num_values;
1024 } else if (pr.header.type == PageType::DATA_PAGE_V2 &&
1025 pr.header.data_page_header_v2.has_value()) {
1026 num_values = pr.header.data_page_header_v2->num_values;
1027 } else {
1028 num_values = col_meta.num_values;
1029 }
1030 auto count_result = validate_mmap_page_value_count(num_values, "MmapParquetReader DELTA_BINARY_PACKED");
1031 if (!count_result) return count_result.error();
1032 size_t count = *count_result;
1033
1034 if constexpr (std::is_same_v<T, int32_t>) {
1035 return delta::decode_int32(pr.data, pr.size, count);
1036 } else if constexpr (std::is_same_v<T, int64_t>) {
1037 return delta::decode_int64(pr.data, pr.size, count);
1038 } else {
1040 "DELTA_BINARY_PACKED only supports INT32/INT64"};
1041 }
1042 }
1043
1045 template <typename T>
1046 expected<std::vector<T>> read_column_bss(
1047 const thrift::ColumnMetaData& col_meta) {
1048 auto page_result = read_page_at(col_meta.data_page_offset,
1049 col_meta.codec);
1050 if (!page_result) return page_result.error();
1051
1052 auto& pr = page_result.value();
1053
1054 int64_t num_values = 0;
1055 if (pr.header.type == PageType::DATA_PAGE &&
1056 pr.header.data_page_header.has_value()) {
1057 num_values = pr.header.data_page_header->num_values;
1058 } else if (pr.header.type == PageType::DATA_PAGE_V2 &&
1059 pr.header.data_page_header_v2.has_value()) {
1060 num_values = pr.header.data_page_header_v2->num_values;
1061 } else {
1062 num_values = col_meta.num_values;
1063 }
1064 auto count_result = validate_mmap_page_value_count(num_values, "MmapParquetReader BYTE_STREAM_SPLIT");
1065 if (!count_result) return count_result.error();
1066 size_t count = *count_result;
1067
1068 if constexpr (std::is_same_v<T, float>) {
1069 return byte_stream_split::decode_float(pr.data, pr.size, count);
1070 } else if constexpr (std::is_same_v<T, double>) {
1071 return byte_stream_split::decode_double(pr.data, pr.size, count);
1072 } else {
1074 "BYTE_STREAM_SPLIT only supports FLOAT/DOUBLE"};
1075 }
1076 }
1077
1079 expected<std::vector<bool>> read_column_rle_bool(
1080 const thrift::ColumnMetaData& col_meta) {
1081 auto page_result = read_page_at(col_meta.data_page_offset,
1082 col_meta.codec);
1083 if (!page_result) return page_result.error();
1084
1085 auto& pr = page_result.value();
1086
1087 int64_t num_values = 0;
1088 if (pr.header.type == PageType::DATA_PAGE &&
1089 pr.header.data_page_header.has_value()) {
1090 num_values = pr.header.data_page_header->num_values;
1091 } else if (pr.header.type == PageType::DATA_PAGE_V2 &&
1092 pr.header.data_page_header_v2.has_value()) {
1093 num_values = pr.header.data_page_header_v2->num_values;
1094 } else {
1095 num_values = col_meta.num_values;
1096 }
1097 auto count_result = validate_mmap_page_value_count(num_values, "MmapParquetReader RLE_BOOL");
1098 if (!count_result) return count_result.error();
1099 size_t count = *count_result;
1100
1101 // RLE boolean: 4-byte LE length prefix + RLE payload, bit_width=1
1102 auto indices = RleDecoder::decode_with_length(
1103 pr.data, pr.size, /*bit_width=*/1, count);
1104
1105 std::vector<bool> result;
1106 result.reserve(count);
1107 for (size_t i = 0; i < count && i < indices.size(); ++i) {
1108 result.push_back(indices[i] != 0);
1109 }
1110 return result;
1111 }
1112
1113 // -------------------------------------------------------------------
1114 // String conversion helpers
1115 // -------------------------------------------------------------------
1116
/// Render booleans as "true" / "false", one string per input element.
static std::vector<std::string> to_string_vec(const std::vector<bool>& vals) {
    std::vector<std::string> out;
    out.reserve(vals.size());
    for (std::size_t idx = 0; idx != vals.size(); ++idx) {
        if (vals[idx]) {
            out.emplace_back("true");
        } else {
            out.emplace_back("false");
        }
    }
    return out;
}
1126
/// Render each numeric value with std::to_string (floating-point values get
/// std::to_string's fixed six-decimal format).
template <typename T>
static std::vector<std::string> to_string_vec(const std::vector<T>& vals) {
    std::vector<std::string> out;
    out.reserve(vals.size());
    for (auto it = vals.begin(); it != vals.end(); ++it) {
        out.emplace_back(std::to_string(*it));
    }
    return out;
}
1137
/// Lowercase hexadecimal rendering of a byte string, two digits per byte
/// (empty input gives an empty string).
static std::string hex_encode(const std::vector<uint8_t>& bytes) {
    static constexpr char kDigits[] = "0123456789abcdef";
    std::string out(bytes.size() * 2, '\0');
    for (std::size_t i = 0; i < bytes.size(); ++i) {
        const unsigned value = bytes[i];
        out[2 * i]     = kDigits[(value >> 4) & 0x0Fu];
        out[2 * i + 1] = kDigits[value & 0x0Fu];
    }
    return out;
}
1149
1151 static LogicalType converted_type_to_logical(ConvertedType ct) {
1152 switch (ct) {
1163 default: return LogicalType::NONE;
1164 }
1165 }
1166};
1167
1168} // namespace signet::forge
BYTE_STREAM_SPLIT encoding and decoding (Parquet encoding type 9).
PLAIN-encoded Parquet column decoder.
MmapParquetReader(MmapParquetReader &&) noexcept=default
Move-constructible.
const Schema & schema() const
The file's column schema.
const std::string & created_by() const
The "created by" string from the file footer (may be empty).
const std::vector< thrift::KeyValue > & key_value_metadata() const
User-defined key-value metadata from the file footer.
int64_t num_row_groups() const
Number of row groups in the file.
const MmapReader & mmap() const
Direct access to the memory-mapped file data.
expected< std::vector< std::string > > read_column_as_strings(size_t row_group_index, size_t column_index)
Read a column and convert all values to their string representations.
expected< std::vector< std::vector< std::string > > > read_all()
Read all rows from all row groups as a vector of string rows.
int64_t num_rows() const
Total number of rows across all row groups.
expected< std::vector< T > > read_column(size_t row_group_index, size_t column_index)
Read an entire column from a row group as a typed vector.
~MmapParquetReader()=default
Default destructor.
RowGroupInfo row_group(size_t index) const
Retrieve summary information for a specific row group.
const thrift::Statistics * column_statistics(size_t row_group_index, size_t column_index) const
Retrieve the Thrift Statistics for a column chunk.
static expected< MmapParquetReader > open(const std::filesystem::path &path)
Open a Parquet file with memory-mapped I/O.
Low-level memory-mapped file handle.
MmapReader & operator=(MmapReader &&other) noexcept
Move assignment – transfers ownership of the mapping.
size_t size() const
Total file size in bytes.
const uint8_t * data() const
Pointer to the start of the mapped file.
static expected< MmapReader > open(const std::filesystem::path &path)
Open a file and memory-map it read-only.
bool is_open() const
Returns true if the mapping is currently active.
MmapReader & operator=(const MmapReader &)=delete
Non-copyable.
~MmapReader()
Destructor – unmaps file and closes fd.
void close()
Unmap the file and close the file descriptor.
MmapReader()=default
Default constructor — creates a closed/unmapped reader.
MmapReader(const MmapReader &)=delete
Non-copyable.
const uint8_t * data_at(size_t offset) const
Pointer to mapped memory at a given offset.
MmapReader(MmapReader &&other) noexcept
Move constructor – transfers ownership of the mapping.
static std::vector< uint32_t > decode_with_length(const uint8_t *data, size_t size, int bit_width, size_t num_values)
Decode from a buffer that starts with a 4-byte LE length prefix.
Definition rle.hpp:658
Immutable schema description for a Parquet file.
Definition schema.hpp:192
size_t num_columns() const
Number of columns in this schema.
Definition schema.hpp:238
const ColumnDescriptor & column(size_t index) const
Access a column descriptor by index.
Definition schema.hpp:244
A lightweight result type that holds either a success value of type T or an Error.
Definition error.hpp:145
Thrift Compact Protocol reader.
Definition compact.hpp:267
Compression codec interface and registry for Signet Forge.
PLAIN-encoded Parquet column decoder.
Thrift Compact Protocol encoder and decoder for Parquet metadata serialization.
DELTA_BINARY_PACKED encoding and decoding (Parquet encoding type 5).
Dictionary encoding and decoding for Parquet (PLAIN_DICTIONARY / RLE_DICTIONARY).
std::vector< float > decode_float(const uint8_t *data, size_t size, size_t count)
Decode float values from BYTE_STREAM_SPLIT encoding.
std::vector< double > decode_double(const uint8_t *data, size_t size, size_t count)
Decode double values from BYTE_STREAM_SPLIT encoding.
std::vector< int32_t > decode_int32(const uint8_t *data, size_t size, size_t num_values)
Decode DELTA_BINARY_PACKED data back to int32 values.
Definition delta.hpp:564
std::vector< int64_t > decode_int64(const uint8_t *data, size_t size, size_t num_values)
Decode DELTA_BINARY_PACKED data back to int64 values.
Definition delta.hpp:438
uint32_t crc32(const void *data, size_t length) noexcept
PhysicalType
Parquet physical (storage) types as defined in parquet.thrift.
Definition types.hpp:20
@ FIXED_LEN_BYTE_ARRAY
Fixed-length byte array (UUID, vectors, decimals).
@ INT64
64-bit signed integer (little-endian).
@ INT32
32-bit signed integer (little-endian).
@ BOOLEAN
1-bit boolean, bit-packed in pages.
@ BYTE_ARRAY
Variable-length byte sequence (strings, binary).
@ FLOAT
IEEE 754 single-precision float.
@ DOUBLE
IEEE 754 double-precision float.
constexpr uint32_t PARQUET_MAGIC_ENCRYPTED
"PARE" magic bytes (little-endian uint32) — marks a Parquet file with an encrypted footer.
Definition types.hpp:207
Compression
Parquet compression codecs.
Definition types.hpp:115
@ UNCOMPRESSED
No compression.
ConvertedType
Legacy Parquet converted types for backward compatibility with older readers.
Definition types.hpp:67
@ TIMESTAMP_MILLIS
Timestamp in milliseconds.
@ DECIMAL
Fixed-point decimal.
@ TIMESTAMP_MICROS
Timestamp in microseconds.
@ DATE
Date (days since epoch).
@ TIME_MILLIS
Time in milliseconds.
@ TIME_MICROS
Time in microseconds.
@ UTF8
UTF-8 encoded string.
expected< std::vector< uint8_t > > decompress(Compression codec, const uint8_t *data, size_t size, size_t uncompressed_size)
Decompress data using the specified codec via the global CodecRegistry.
Definition codec.hpp:213
expected< size_t > validate_mmap_page_value_count(int64_t num_values, const char *context)
LogicalType
Parquet logical types (from parquet.thrift LogicalType union).
Definition types.hpp:41
@ JSON
JSON document (stored as BYTE_ARRAY).
@ DECIMAL
Fixed-point decimal (INT32/INT64/FIXED_LEN_BYTE_ARRAY).
@ DATE
Calendar date — INT32, days since 1970-01-01.
@ STRING
UTF-8 string (stored as BYTE_ARRAY).
@ ENUM
Enum string (stored as BYTE_ARRAY).
@ TIME_MS
Time of day — INT32, milliseconds since midnight.
@ NONE
No logical annotation — raw physical type.
@ TIME_US
Time of day — INT64, microseconds since midnight.
@ BSON
BSON document (stored as BYTE_ARRAY).
@ TIMESTAMP_MS
Timestamp — INT64, milliseconds since Unix epoch.
@ TIMESTAMP_US
Timestamp — INT64, microseconds since Unix epoch.
constexpr uint32_t PARQUET_MAGIC
"PAR1" magic bytes (little-endian uint32) — marks a standard Parquet file.
Definition types.hpp:205
@ IO_ERROR
A file-system or stream I/O operation failed (open, read, write, rename).
@ ENCRYPTION_ERROR
An encryption or decryption operation failed (bad key, tampered ciphertext, PME error).
@ UNSUPPORTED_COMPRESSION
The file uses a compression codec not linked into this build (ZSTD, LZ4, Gzip).
@ UNSUPPORTED_TYPE
The file contains a Parquet physical or logical type that is not implemented.
@ OUT_OF_RANGE
An index, offset, or size value is outside the valid range.
@ CORRUPT_FOOTER
The Parquet footer (FileMetaData) is missing, truncated, or malformed.
@ INVALID_FILE
The file is not a valid Parquet file (e.g. missing or wrong magic bytes).
@ UNSUPPORTED_ENCODING
The file uses an encoding not supported by this build (e.g. BYTE_STREAM_SPLIT on integers).
@ INVALID_ARGUMENT
A caller-supplied argument is outside the valid range or violates a precondition.
@ CORRUPT_PAGE
A data page failed integrity checks (bad CRC, truncated, or exceeds size limits).
@ CORRUPT_DATA
Decoded data is corrupt or inconsistent (e.g. out-of-range dictionary index).
Encoding
Parquet page encoding types.
Definition types.hpp:98
@ DELTA_BINARY_PACKED
Delta encoding for INT32/INT64 (compact for sorted/sequential data).
@ RLE
Run-length / bit-packed hybrid (used for booleans and def/rep levels).
@ RLE_DICTIONARY
Modern dictionary encoding (Parquet 2.0) — dict page + RLE indices.
@ PLAIN_DICTIONARY
Legacy dictionary encoding (Parquet 1.0).
@ PLAIN
Values stored back-to-back in their native binary layout.
@ BYTE_STREAM_SPLIT
Byte-stream split for FLOAT/DOUBLE (transposes byte lanes for better compression).
@ DATA_PAGE_V2
Data page v2 (Parquet 2.0 format with separate rep/def level sections).
@ DICTIONARY_PAGE
Dictionary page — contains the value dictionary for RLE_DICTIONARY columns.
@ DATA_PAGE
Data page (Parquet 1.0 format).
@ REQUIRED
Exactly one value per row (non-nullable).
RLE/Bit-Packing Hybrid encoding and decoding (Parquet spec).
Schema definition types: Column<T>, SchemaBuilder, and Schema.
Bundled, zero-dependency, header-only Snappy compression codec.
Descriptor for a single column in a Parquet schema.
Definition types.hpp:152
int32_t type_length
Byte length for FIXED_LEN_BYTE_ARRAY columns (-1 = N/A).
Definition types.hpp:157
LogicalType logical_type
Semantic annotation (STRING, TIMESTAMP_NS, etc.).
Definition types.hpp:155
Repetition repetition
Nullability / cardinality.
Definition types.hpp:156
std::string name
Column name (unique within a schema).
Definition types.hpp:153
int32_t scale
Decimal scale (-1 = N/A).
Definition types.hpp:159
PhysicalType physical_type
On-disk storage type.
Definition types.hpp:154
int32_t precision
Decimal precision (-1 = N/A).
Definition types.hpp:158
Lightweight error value carrying an ErrorCode and a human-readable message.
Definition error.hpp:101
Summary information for a single row group.
int64_t num_rows
Number of rows in this row group.
int64_t total_byte_size
Total uncompressed byte size of the row group.
int64_t row_group_index
Zero-based index of this row group.
Parquet file metadata (parquet.thrift fields 1-7).
Definition types.hpp:2265
expected< void > deserialize(CompactDecoder &dec)
Definition types.hpp:2323
std::vector< RowGroup > row_groups
Definition types.hpp:2269
std::optional< std::string > created_by
Definition types.hpp:2271
std::optional< std::vector< KeyValue > > key_value_metadata
Definition types.hpp:2270
std::vector< SchemaElement > schema
Definition types.hpp:2267
Parquet page header (parquet.thrift fields 1-8).
Definition types.hpp:933
Parquet column statistics (parquet.thrift fields 1-6).
Definition types.hpp:369
Parquet Thrift struct types – C++ structs matching parquet.thrift, with Compact Protocol serialize/deserialize.
Parquet format enumerations, type traits, and statistics structs.