// Static factory (its name and return type are outside this view): maps the
// file at `path`, validates Parquet framing, decodes the Thrift FileMetaData
// footer and builds a flat Schema. NOTE(review): many source lines of this
// function are missing from this view; comments describe only what is shown.
269 const std::filesystem::path& path) {
273 if (!mmap_result)
return mmap_result.error();
275 auto mmap = std::move(*mmap_result);
277 const uint8_t* file_data =
mmap.
data();
// Reject files too small to hold the magic bytes and footer trailer.
// NOTE(review): the threshold is not visible; the `sz - 12` arithmetic below
// relies on it guaranteeing sz >= 12 — confirm.
283 "file too small to be a valid Parquet file: " +
// The first 4 bytes must be the "PAR1" magic.
288 uint32_t magic_start;
289 std::memcpy(&magic_start, file_data, 4);
292 "missing PAR1 magic at start of file"};
// The last 4 bytes must be "PAR1" (plaintext footer) or "PARE" (encrypted
// footer). Encrypted footers are rejected with a pointer to ParquetReader.
297 std::memcpy(&magic_end, file_data + sz - 4, 4);
301 "missing PAR1/PARE magic at end of file"};
306 "MmapParquetReader does not support encrypted footers; "
307 "use ParquetReader with an EncryptionConfig instead"};
// Trailer layout: [footer][4-byte footer_len][4-byte magic].
// NOTE(review): memcpy reads footer_len in host byte order, while Parquet
// stores it little-endian. This assumes a little-endian host unless a byte
// swap happens in lines not shown.
312 std::memcpy(&footer_len, file_data + sz - 8, 4);
// 12 = leading magic (4) + footer_len (4) + trailing magic (4). A longer
// footer would overlap the leading magic.
314 if (footer_len == 0 ||
static_cast<size_t>(footer_len) > sz - 12) {
316 "invalid footer length: " + std::to_string(footer_len)};
320 size_t footer_offset = sz - 8 - footer_len;
321 const uint8_t* footer_ptr = file_data + footer_offset;
// Decode the Thrift FileMetaData from the footer bytes.
325 if (
auto r = metadata.
deserialize(dec); !r.has_value()) {
// Schema element 0 is the root. Its name becomes the schema name, and the
// following elements are converted into ColumnDescriptors.
330 std::string schema_name;
331 std::vector<ColumnDescriptor> columns;
333 if (!metadata.
schema.empty()) {
334 schema_name = metadata.
schema[0].name;
336 for (
size_t i = 1; i < metadata.
schema.size(); ++i) {
337 const auto& elem = metadata.
schema[i];
// Elements carrying num_children are group nodes; their handling is not
// visible here. NOTE(review): confirm nested groups are not emitted as
// leaf columns.
340 if (elem.num_children.has_value()) {
// Optional per-column attributes are copied only when present.
349 if (elem.type_length.has_value()) {
352 if (elem.precision.has_value()) {
355 if (elem.scale.has_value()) {
356 cd.
scale = *elem.scale;
// A ConvertedType, when present, is mapped to a logical type.
360 if (elem.converted_type.has_value()) {
361 cd.
logical_type = converted_type_to_logical(*elem.converted_type);
364 columns.push_back(std::move(cd));
// Move the mapping and decoded metadata into the reader. created_by_
// defaults to "" when the footer has no created_by field.
370 reader.mmap_ = std::move(
mmap);
371 reader.metadata_ = std::move(metadata);
372 reader.schema_ =
Schema(std::move(schema_name), std::move(columns));
373 reader.created_by_ = reader.metadata_.
created_by.value_or(
"");
// Row-group count from the footer metadata, returned as int64_t.
388 return static_cast<int64_t
>(metadata_.
row_groups.size());
// Writer identification string cached when the reader was built. It is ""
// when the footer had no created_by field.
392 [[nodiscard]]
const std::string&
created_by()
const {
return created_by_; }
// Function-local empty vector. Presumably returned by reference when the
// footer has no key/value metadata; the enclosing accessor is not visible.
397 static const std::vector<thrift::KeyValue> empty;
// row_group(index): an index past the last row group throws
// std::out_of_range.
418 throw std::out_of_range(
"MmapParquetReader::row_group: index " +
419 std::to_string(index) +
" >= " +
425 static_cast<int64_t
>(index)};
436 size_t row_group_index,
size_t column_index)
const {
// Returns a pointer to the chunk's Statistics, or nullptr in any of these
// cases:
//   - either index is out of range;
//   - the chunk has no ColumnMetaData;
//   - no statistics were written.
// The pointer refers into metadata_, so it is valid only while this reader
// (and its metadata_) is alive.
437 if (row_group_index >= metadata_.
row_groups.size())
return nullptr;
438 const auto& rg = metadata_.
row_groups[row_group_index];
439 if (column_index >= rg.columns.size())
return nullptr;
440 const auto& chunk = rg.columns[column_index];
441 if (!chunk.meta_data.has_value())
return nullptr;
442 if (!chunk.meta_data->statistics.has_value())
return nullptr;
443 return &(*chunk.meta_data->statistics);
// read_column<T>: decodes one column chunk into a std::vector<T>. It
// dispatches on the chunk's encodings. Dictionary, DELTA_BINARY_PACKED,
// BYTE_STREAM_SPLIT and RLE-boolean chunks each have a dedicated path limited
// to matching C++ types; anything else uses the plain ColumnReader path.
459 template <
typename T>
461 size_t column_index) {
// Validate indices before touching metadata_. NOTE(review): the first
// "column index out of range" check uses a bound not visible here (possibly
// the schema); the second checks rg.columns.
463 if (row_group_index >= metadata_.
row_groups.size()) {
465 "row group index out of range"};
469 "column index out of range"};
472 const auto& rg = metadata_.
row_groups[row_group_index];
473 if (column_index >= rg.columns.size()) {
475 "column index out of range"};
478 const auto& chunk = rg.columns[column_index];
479 if (!chunk.meta_data.has_value()) {
481 "column chunk has no metadata"};
483 const auto& col_meta = *chunk.meta_data;
// Scan the chunk's declared encodings to choose a decoding path.
486 bool has_dict =
false;
489 for (
auto enc : col_meta.encodings) {
// Dictionary path: only string and fixed-width numeric T are supported.
508 if constexpr (std::is_same_v<T, std::string> ||
509 std::is_same_v<T, int32_t> ||
510 std::is_same_v<T, int64_t> ||
511 std::is_same_v<T, float> ||
512 std::is_same_v<T, double>) {
513 return read_column_dict<T>(col_meta,
514 static_cast<int32_t
>(row_group_index));
517 "dictionary encoding not supported for this type"};
// DELTA_BINARY_PACKED path: integers only.
523 if constexpr (std::is_same_v<T, int32_t> ||
524 std::is_same_v<T, int64_t>) {
525 return read_column_delta<T>(col_meta);
528 "DELTA_BINARY_PACKED only supports INT32/INT64"};
// BYTE_STREAM_SPLIT path: floating point only.
534 if constexpr (std::is_same_v<T, float> ||
535 std::is_same_v<T, double>) {
536 return read_column_bss<T>(col_meta);
539 "BYTE_STREAM_SPLIT only supports FLOAT/DOUBLE"};
// RLE boolean path.
546 if constexpr (std::is_same_v<T, bool>) {
547 return read_column_rle_bool(col_meta);
550 "RLE boolean encoding requires bool type"};
// Plain path: decode through a ColumnReader over the chunk's data page.
555 auto reader_result = make_column_reader(row_group_index, column_index);
556 if (!reader_result)
return reader_result.error();
558 auto& [col_reader, num_values] = reader_result.value();
559 size_t count =
static_cast<size_t>(num_values);
// std::vector<bool> is a packed bit container with no data() pointer, so
// bools are read one value at a time instead of via read_batch.
561 if constexpr (std::is_same_v<T, bool>) {
562 std::vector<bool> values;
563 values.reserve(count);
564 for (
size_t i = 0; i < count; ++i) {
565 auto val = col_reader.template read<bool>();
566 if (!val)
return val.error();
567 values.push_back(*val);
// Other types: decode directly into a preallocated contiguous buffer.
571 std::vector<T> values(count);
572 auto batch_result = col_reader.template read_batch<T>(
573 values.data(), count);
574 if (!batch_result)
return batch_result.error();
// Reads a column and renders every value as a std::string, branching on the
// physical type (branch labels are not visible):
//   - bool: "true"/"false";
//   - int32/int64/float/double: std::to_string;
//   - string: returned as-is;
//   - remaining byte-oriented branch: each value's raw bytes are hex-encoded.
// Any other type is rejected.
591 size_t row_group_index,
size_t column_index) {
592 if (row_group_index >= metadata_.
row_groups.size()) {
603 auto res = read_column<bool>(row_group_index, column_index);
604 if (!res)
return res.error();
605 return to_string_vec(res.value());
608 auto res = read_column<int32_t>(row_group_index, column_index);
609 if (!res)
return res.error();
610 return to_string_vec(res.value());
613 auto res = read_column<int64_t>(row_group_index, column_index);
614 if (!res)
return res.error();
615 return to_string_vec(res.value());
// NOTE(review): std::to_string on float/double uses fixed "%f" formatting
// (6 decimal places), so very small or high-precision values lose digits.
618 auto res = read_column<float>(row_group_index, column_index);
619 if (!res)
return res.error();
620 return to_string_vec(res.value());
623 auto res = read_column<double>(row_group_index, column_index);
624 if (!res)
return res.error();
625 return to_string_vec(res.value());
628 return read_column<std::string>(row_group_index, column_index);
// Byte-oriented branch: read raw bytes per value and hex-encode them.
// NOTE(review): this calls make_column_reader directly and so bypasses the
// encoding dispatch in read_column<T>. Confirm dictionary-encoded columns of
// this type decode correctly here.
631 auto reader_result = make_column_reader(row_group_index, column_index);
632 if (!reader_result)
return reader_result.error();
633 auto& [col_reader, num_values] = reader_result.value();
635 std::vector<std::string> result;
636 result.reserve(
static_cast<size_t>(num_values));
637 for (int64_t i = 0; i < num_values; ++i) {
638 auto bytes_result = col_reader.read_bytes();
639 if (!bytes_result)
return bytes_result.error();
640 result.push_back(hex_encode(bytes_result.value()));
646 "unsupported physical type for string conversion"};
// read_all: materializes the whole file as rows of strings.
//   1. Refuse files whose footer num_rows exceeds MAX_READ_ALL_ROWS.
//   2. Read each row group column by column.
//   3. Transpose the columns into rows.
660 static constexpr size_t MAX_READ_ALL_ROWS = 100'000'000;
663 static_cast<size_t>(metadata_.
num_rows) > MAX_READ_ALL_ROWS) {
665 "read_all: num_rows exceeds safety cap ("
666 + std::to_string(MAX_READ_ALL_ROWS) +
")"};
668 std::vector<std::vector<std::string>> rows;
669 rows.reserve(
static_cast<size_t>(metadata_.
num_rows));
671 for (
size_t rg = 0; rg < metadata_.
row_groups.size(); ++rg) {
673 size_t rg_num_cols = num_cols;
674 std::vector<std::vector<std::string>> col_data(rg_num_cols);
675 for (
size_t c = 0; c < rg_num_cols; ++c) {
// `res` comes from a per-column read on a line not visible here.
677 if (!res)
return res.error();
678 col_data[c] = std::move(res.value());
// The row count for this group is taken from column 0. An empty first
// column skips the whole group.
681 if (col_data.empty() || col_data[0].empty())
continue;
683 size_t rg_rows = col_data[0].size();
// Transpose. A column shorter than column 0 leaves "" in the missing cells;
// values past column 0's length are dropped.
// NOTE(review): col_data is destroyed after each row group, so
// std::move(col_data[c][r]) would avoid copying every string.
684 for (
size_t r = 0; r < rg_rows; ++r) {
685 std::vector<std::string> row(num_cols);
686 for (
size_t c = 0; c < num_cols; ++c) {
687 if (r < col_data[c].size()) {
688 row[c] = col_data[c][r];
691 rows.push_back(std::move(row));
// Decoded footer metadata.
714 thrift::FileMetaData metadata_;
// Writer string cached from metadata_.created_by at construction.
716 std::
string created_by_;
// Owns the page payloads copied or decompressed by read_page_at and
// make_column_reader; the data pointers they return point into these
// buffers. When the outer vector grows, it moves its inner vectors, which
// keeps their heap storage, so those pointers stay valid.
// NOTE(review): nothing visible clears this, so memory grows with every page
// read for the lifetime of the reader.
719 std::vector<std::vector<uint8_t>> decompressed_buffers_;
// Returned by make_column_reader: a ColumnReader plus its page value count.
722 struct ColumnReaderWithCount {
// Returned by read_page_at: page data pointer, size, and decoded PageHeader.
728 struct PageReadResult {
// read_page_at: decodes the PageHeader at absolute file `offset`, then:
//   - bounds-checks the page;
//   - CRC-checks it when a CRC is present;
//   - decompresses the payload with `codec` when needed.
// In the visible code, the returned data pointer refers to a buffer owned by
// decompressed_buffers_ rather than directly into the mapping.
735 expected<PageReadResult> read_page_at(int64_t offset,
Compression codec) {
736 if (offset < 0 ||
static_cast<size_t>(offset) >= mmap_.
size()) {
738 "page offset out of file bounds"};
741 size_t remaining = mmap_.
size() -
static_cast<size_t>(offset);
742 const uint8_t* mapped_start = mmap_.
data_at(
static_cast<size_t>(offset));
// Decode the header from a private copy of at most kHdrWindow bytes instead
// of from the mapping.
// NOTE(review): a PageHeader longer than 4096 bytes (e.g. one carrying large
// inline statistics) cannot be decoded by this path.
748 constexpr size_t kHdrWindow = 4096;
749 size_t hdr_window = (std::min)(remaining, kHdrWindow);
750 std::vector<uint8_t> hdr_copy(mapped_start, mapped_start + hdr_window);
752 thrift::CompactDecoder page_dec(hdr_copy.data(), hdr_copy.size());
753 thrift::PageHeader ph;
754 if (
auto r = ph.deserialize(page_dec); !r.has_value()) {
758 size_t hdr_size = page_dec.position();
761 if (ph.compressed_page_size < 0 || ph.uncompressed_page_size < 0) {
763 "mmap: negative page size in PageHeader"};
766 size_t compressed_size =
static_cast<size_t>(ph.compressed_page_size);
768 if (hdr_size + compressed_size > remaining) {
770 "page data extends past end of file"};
// Copy the payload out of the mapping; the CRC check and decoding below
// work on this private copy.
776 std::vector<uint8_t> payload_copy(
777 mapped_start + hdr_size,
778 mapped_start + hdr_size + compressed_size);
779 const uint8_t* pdata = payload_copy.data();
780 size_t pdata_size = compressed_size;
// Verify the optional page CRC (the checksum computation is not visible).
782 if (ph.crc.has_value()) {
783 uint32_t expected_crc =
static_cast<uint32_t
>(*ph.crc);
785 if (computed_crc != expected_crc) {
787 "mmap: page CRC-32 mismatch at offset " + std::to_string(offset)};
// Decompression-bomb guards, applied before decompressing: the uncompressed
// size is capped at 256 MB and the expansion ratio at 1024x.
793 size_t uncompressed_size =
static_cast<size_t>(
794 ph.uncompressed_page_size);
796 static constexpr size_t MMAP_MAX_PAGE_SIZE2 = 256ULL * 1024ULL * 1024ULL;
797 if (uncompressed_size == 0 || uncompressed_size > MMAP_MAX_PAGE_SIZE2) {
799 "mmap: uncompressed page size out of range (0 or > 256 MB)"};
803 if (compressed_size > 0 && uncompressed_size / compressed_size > 1024) {
805 "mmap: decompression ratio exceeds 1024x limit"};
808 auto dec_result =
decompress(codec, pdata, pdata_size,
812 "decompression failed: " +
813 dec_result.error().message};
815 decompressed_buffers_.push_back(std::move(dec_result.value()));
816 pdata = decompressed_buffers_.back().data();
817 pdata_size = decompressed_buffers_.back().size();
// Presumably the no-decompression branch (its condition is not visible):
// keep the payload copy alive in decompressed_buffers_. pdata_size was
// already set from compressed_size above.
821 decompressed_buffers_.push_back(std::move(payload_copy));
822 pdata = decompressed_buffers_.back().data();
825 return PageReadResult{pdata, pdata_size, std::move(ph)};
// make_column_reader: builds a ColumnReader over the page at the chunk's
// data_page_offset and returns it with that page's value count.
// NOTE(review): this differs from read_page_at in three ways:
//   - it Thrift-decodes the header directly from the mapping (no bounded
//     copy);
//   - it requires compressed_page_size > 0, whereas read_page_at accepts 0;
//   - from the visible lines, it decodes only that single page of the
//     chunk. Confirm multi-page chunks are handled elsewhere.
829 expected<ColumnReaderWithCount> make_column_reader(
830 size_t row_group_index,
size_t column_index) {
831 if (row_group_index >= metadata_.
row_groups.size()) {
833 "row group index out of range"};
836 const auto& rg = metadata_.
row_groups[row_group_index];
837 if (column_index >= rg.columns.size()) {
839 "column index out of range"};
842 const auto& chunk = rg.columns[column_index];
843 if (!chunk.meta_data.has_value()) {
845 "column chunk has no metadata"};
848 const auto& col_meta = *chunk.meta_data;
849 int64_t offset = col_meta.data_page_offset;
851 if (offset < 0 ||
static_cast<size_t>(offset) >= mmap_.
size()) {
853 "data_page_offset out of file bounds"};
857 size_t remaining = mmap_.
size() -
static_cast<size_t>(offset);
858 const uint8_t* page_start = mmap_.
data_at(
static_cast<size_t>(offset));
861 thrift::CompactDecoder page_dec(page_start, remaining);
862 thrift::PageHeader page_header;
863 if (
auto r = page_header.deserialize(page_dec); !r.has_value()) {
867 size_t header_size = page_dec.position();
871 if (page_header.compressed_page_size <= 0) {
873 "mmap: compressed_page_size must be positive"};
875 size_t page_data_size =
static_cast<size_t>(
876 page_header.compressed_page_size);
877 const uint8_t* page_data = page_start + header_size;
881 if (header_size + page_data_size > remaining) {
883 "page data extends past end of file"};
886 if (page_header.crc.has_value()) {
887 uint32_t expected_crc =
static_cast<uint32_t
>(*page_header.crc);
889 if (computed_crc != expected_crc) {
891 "mmap: page CRC-32 mismatch"};
// A negative uncompressed_page_size becomes a huge size_t here and is then
// rejected by the MMAP_MAX_PAGE_SIZE bound below.
// NOTE(review): this is the same 256 MB limit as MMAP_MAX_PAGE_SIZE2 in
// read_page_at; one shared constant would keep the two in sync.
897 size_t uncompressed_size =
static_cast<size_t>(
898 page_header.uncompressed_page_size);
900 static constexpr size_t MMAP_MAX_PAGE_SIZE = 256ULL * 1024ULL * 1024ULL;
901 if (uncompressed_size == 0 || uncompressed_size > MMAP_MAX_PAGE_SIZE) {
903 "mmap: uncompressed page size out of range (0 or > 256 MB)"};
907 if (page_data_size > 0 && uncompressed_size / page_data_size > 1024) {
909 "mmap: decompression ratio exceeds 1024x limit"};
911 auto decompressed =
decompress(col_meta.codec,
912 page_data, page_data_size,
916 "decompression failed: " +
917 decompressed.error().message};
919 decompressed_buffers_.push_back(std::move(decompressed.value()));
920 page_data = decompressed_buffers_.back().data();
921 page_data_size = decompressed_buffers_.back().size();
// The value count comes from the v1 or v2 data page header, falling back to
// the chunk-level num_values, and is bounded by MMAP_MAX_VALUES_PER_PAGE.
925 int64_t num_values = 0;
927 page_header.data_page_header.has_value()) {
928 num_values = page_header.data_page_header->num_values;
930 page_header.data_page_header_v2.has_value()) {
931 num_values = page_header.data_page_header_v2->num_values;
933 num_values = col_meta.num_values;
936 if (num_values < 0 || num_values > MMAP_MAX_VALUES_PER_PAGE) {
938 "mmap: num_values out of range (" +
939 std::to_string(num_values) +
")"};
// type_length stays -1 unless it is set in lines not visible here (presumably
// for fixed-length types). `pt` is also determined on unseen lines.
944 int32_t type_length = -1;
949 ColumnReader col_reader(pt, page_data, page_data_size,
950 num_values, type_length);
952 return ColumnReaderWithCount{std::move(col_reader), num_values};
// read_column_dict: decodes a dictionary-encoded chunk.
//   1. Read the dictionary page.
//   2. Locate and read the following data page.
//   3. Map that page's indices through the dictionary with
//      DictionaryDecoder<T>.
// NOTE(review): rg_index is not referenced in the lines shown.
956 template <
typename T>
957 expected<std::vector<T>> read_column_dict(
958 const thrift::ColumnMetaData& col_meta,
959 int32_t rg_index = 0) {
// When dictionary_page_offset is absent, the dictionary page is looked for at
// data_page_offset (presumably for writers that store it there).
962 int64_t dict_offset = col_meta.dictionary_page_offset.value_or(
963 col_meta.data_page_offset);
966 auto dict_page_result = read_page_at(dict_offset, col_meta.codec);
967 if (!dict_page_result)
return dict_page_result.error();
969 auto& dict_pr = dict_page_result.value();
971 !dict_pr.header.dictionary_page_header.has_value()) {
973 "expected DICTIONARY_PAGE at dictionary offset"};
// Bound the dictionary size (0 .. 10,000,000 entries) before trusting it.
976 int32_t raw_dict_count = dict_pr.header.dictionary_page_header->num_values;
977 if (raw_dict_count < 0 || raw_dict_count > 10'000'000) {
979 "mmap: dictionary page num_values out of valid range"};
981 size_t num_dict_entries =
static_cast<size_t>(raw_dict_count);
// If the data page offset equals the dictionary offset, the data page really
// starts right after the dictionary page. Re-parse that header to learn its
// size and skip past it; read_page_at has already bounds-checked dict_offset.
// NOTE(review): this re-decodes the header read_page_at just parsed (which
// already rejected negative sizes). Returning the header size from
// read_page_at would avoid the duplicate parse.
984 int64_t data_offset = col_meta.data_page_offset;
985 if (data_offset == dict_offset) {
988 size_t dict_raw_start =
static_cast<size_t>(dict_offset);
989 const uint8_t* dict_start = mmap_.
data_at(dict_raw_start);
990 size_t dict_remaining = mmap_.
size() - dict_raw_start;
992 thrift::CompactDecoder hdr_dec(dict_start, dict_remaining);
993 thrift::PageHeader tmp_hdr;
994 if (
auto r = tmp_hdr.deserialize(hdr_dec); !r.has_value()) {
997 size_t dict_hdr_size = hdr_dec.position();
998 size_t dict_compressed_size =
static_cast<size_t>(
999 tmp_hdr.compressed_page_size);
1001 data_offset = dict_offset
1002 +
static_cast<int64_t
>(dict_hdr_size)
1003 +
static_cast<int64_t
>(dict_compressed_size);
// Decode the data page. Its value count comes from the v1/v2 header, falling
// back to the chunk's num_values, and is capped at 100,000,000.
1007 auto data_page_result = read_page_at(data_offset, col_meta.codec);
1008 if (!data_page_result)
return data_page_result.error();
1010 auto& data_pr = data_page_result.value();
1012 int64_t num_values = 0;
1014 data_pr.header.data_page_header.has_value()) {
1015 num_values = data_pr.header.data_page_header->num_values;
1017 data_pr.header.data_page_header_v2.has_value()) {
1018 num_values = data_pr.header.data_page_header_v2->num_values;
1020 num_values = col_meta.num_values;
1022 if (num_values < 0 || num_values > 100'000'000)
// dict_pr.data remains valid after the second read_page_at: growth of
// decompressed_buffers_ moves its inner vectors without reallocating them.
// NOTE(review): data_pr.data is passed whole. Confirm repetition/definition
// levels (if present for this column) are skipped before the index stream.
1025 DictionaryDecoder<T> decoder(dict_pr.data, dict_pr.size,
1026 num_dict_entries, col_meta.type);
1028 return decoder.decode(data_pr.data, data_pr.size,
1029 static_cast<size_t>(num_values));
// read_column_delta: decodes a DELTA_BINARY_PACKED chunk (INT32/INT64 only)
// from the single page at data_page_offset. The value count comes from the
// v1/v2 data page header, falling back to the chunk's num_values; it is then
// presumably validated by the count_result computation (not visible).
1033 template <
typename T>
1034 expected<std::vector<T>> read_column_delta(
1035 const thrift::ColumnMetaData& col_meta) {
1036 auto page_result = read_page_at(col_meta.data_page_offset,
1038 if (!page_result)
return page_result.error();
1040 auto& pr = page_result.value();
1042 int64_t num_values = 0;
1044 pr.header.data_page_header.has_value()) {
1045 num_values = pr.header.data_page_header->num_values;
1047 pr.header.data_page_header_v2.has_value()) {
1048 num_values = pr.header.data_page_header_v2->num_values;
1050 num_values = col_meta.num_values;
1053 if (!count_result)
return count_result.error();
1054 size_t count = *count_result;
// Choose the decoder width at compile time; other T are rejected.
1056 if constexpr (std::is_same_v<T, int32_t>) {
1058 }
else if constexpr (std::is_same_v<T, int64_t>) {
1062 "DELTA_BINARY_PACKED only supports INT32/INT64"};
// read_column_bss: decodes a BYTE_STREAM_SPLIT chunk (FLOAT/DOUBLE only) from
// the single page at data_page_offset. The value count comes from the v1/v2
// data page header, falling back to the chunk's num_values; it is then
// presumably validated by the count_result computation (not visible).
1067 template <
typename T>
1068 expected<std::vector<T>> read_column_bss(
1069 const thrift::ColumnMetaData& col_meta) {
1070 auto page_result = read_page_at(col_meta.data_page_offset,
1072 if (!page_result)
return page_result.error();
1074 auto& pr = page_result.value();
1076 int64_t num_values = 0;
1078 pr.header.data_page_header.has_value()) {
1079 num_values = pr.header.data_page_header->num_values;
1081 pr.header.data_page_header_v2.has_value()) {
1082 num_values = pr.header.data_page_header_v2->num_values;
1084 num_values = col_meta.num_values;
1087 if (!count_result)
return count_result.error();
1088 size_t count = *count_result;
// Choose the element width at compile time; other T are rejected.
1090 if constexpr (std::is_same_v<T, float>) {
1092 }
else if constexpr (std::is_same_v<T, double>) {
1096 "BYTE_STREAM_SPLIT only supports FLOAT/DOUBLE"};
// read_column_rle_bool: decodes an RLE-encoded boolean chunk from the single
// page at data_page_offset. Each decoded integer maps to bool (non-zero means
// true).
// NOTE(review): if fewer values decode than `count`, the result is silently
// shorter than `count` instead of being reported as an error.
1101 expected<std::vector<bool>> read_column_rle_bool(
1102 const thrift::ColumnMetaData& col_meta) {
1103 auto page_result = read_page_at(col_meta.data_page_offset,
1105 if (!page_result)
return page_result.error();
1107 auto& pr = page_result.value();
1109 int64_t num_values = 0;
1111 pr.header.data_page_header.has_value()) {
1112 num_values = pr.header.data_page_header->num_values;
1114 pr.header.data_page_header_v2.has_value()) {
1115 num_values = pr.header.data_page_header_v2->num_values;
1117 num_values = col_meta.num_values;
1120 if (!count_result)
return count_result.error();
1121 size_t count = *count_result;
// Decode `count` values from the page. The `1` is presumably the bit width
// (one bit per boolean); the callee is on a line not visible here.
1125 pr.data, pr.size, 1, count);
1127 std::vector<bool> result;
1128 result.reserve(count);
1129 for (
size_t i = 0; i < count && i < indices.size(); ++i) {
1130 result.push_back(indices[i] != 0);
// Renders each bool as "true" or "false". As a non-template overload, it is
// preferred over the generic std::to_string template for std::vector<bool>.
1140 static std::vector<std::string> to_string_vec(
const std::vector<bool>& vals) {
1141 std::vector<std::string> result;
1142 result.reserve(vals.size());
1143 for (
bool v : vals) {
1144 result.push_back(v ?
"true" :
"false");
// Renders each numeric value with std::to_string.
// NOTE(review): for float/double, std::to_string uses fixed "%f" formatting
// (6 decimal places), which loses precision for very small or high-precision
// values.
1150 template <
typename T>
1151 static std::vector<std::string> to_string_vec(
const std::vector<T>& vals) {
1152 std::vector<std::string> result;
1153 result.reserve(vals.size());
1154 for (
const auto& v : vals) {
1155 result.push_back(std::to_string(v));
// Lowercase hex encoding: two characters per byte, high nibble first.
1161 static std::string hex_encode(
const std::vector<uint8_t>& bytes) {
1162 static constexpr char hex_chars[] =
"0123456789abcdef";
1164 result.reserve(bytes.size() * 2);
1165 for (uint8_t b : bytes) {
1166 result.push_back(hex_chars[(b >> 4) & 0x0F]);
1167 result.push_back(hex_chars[b & 0x0F]);