276 const std::filesystem::path& path) {
280 if (!mmap_result)
return mmap_result.error();
282 auto mmap = std::move(*mmap_result);
284 const uint8_t* file_data =
mmap.
data();
290 "file too small to be a valid Parquet file: " +
295 uint32_t magic_start;
296 std::memcpy(&magic_start, file_data, 4);
299 "missing PAR1 magic at start of file"};
304 std::memcpy(&magic_end, file_data + sz - 4, 4);
308 "missing PAR1/PARE magic at end of file"};
313 "MmapParquetReader does not support encrypted footers; "
314 "use ParquetReader with an EncryptionConfig instead"};
319 std::memcpy(&footer_len, file_data + sz - 8, 4);
321 if (footer_len == 0 ||
static_cast<size_t>(footer_len) > sz - 12) {
323 "invalid footer length: " + std::to_string(footer_len)};
327 size_t footer_offset = sz - 8 - footer_len;
328 const uint8_t* footer_ptr = file_data + footer_offset;
332 if (
auto r = metadata.
deserialize(dec); !r.has_value()) {
337 std::string schema_name;
338 std::vector<ColumnDescriptor> columns;
340 if (!metadata.
schema.empty()) {
341 schema_name = metadata.
schema[0].name;
343 for (
size_t i = 1; i < metadata.
schema.size(); ++i) {
344 const auto& elem = metadata.
schema[i];
347 if (elem.num_children.has_value()) {
356 if (elem.type_length.has_value()) {
359 if (elem.precision.has_value()) {
362 if (elem.scale.has_value()) {
363 cd.
scale = *elem.scale;
367 if (elem.converted_type.has_value()) {
368 cd.
logical_type = converted_type_to_logical(*elem.converted_type);
371 columns.push_back(std::move(cd));
377 reader.mmap_ = std::move(
mmap);
378 reader.metadata_ = std::move(metadata);
379 reader.schema_ =
Schema(std::move(schema_name), std::move(columns));
380 reader.created_by_ = reader.metadata_.
created_by.value_or(
"");
395 return static_cast<int64_t
>(metadata_.
row_groups.size());
/// Returns the writer identification string held in created_by_.
///
/// created_by_ is filled when the reader is constructed, from the footer's
/// optional created_by field, falling back to an empty string. The result is
/// a reference to that member, so it is only valid while this reader lives.
399 [[nodiscard]]
const std::string&
created_by()
const {
return created_by_; }
404 static const std::vector<thrift::KeyValue> empty;
425 throw std::out_of_range(
"MmapParquetReader::row_group: index " +
426 std::to_string(index) +
" >= " +
432 static_cast<int64_t
>(index)};
443 size_t row_group_index,
size_t column_index)
const {
// Non-owning lookup of the statistics stored for one column chunk.
// Every failed precondition below answers nullptr rather than an error:
// row group index out of range, column index out of range, chunk without
// metadata, or metadata without statistics.
444 if (row_group_index >= metadata_.
row_groups.size())
return nullptr;
445 const auto& rg = metadata_.
row_groups[row_group_index];
446 if (column_index >= rg.columns.size())
return nullptr;
447 const auto& chunk = rg.columns[column_index];
448 if (!chunk.meta_data.has_value())
return nullptr;
449 if (!chunk.meta_data->statistics.has_value())
return nullptr;
// The pointer refers into metadata_, so it stays valid only while the
// reader (and its metadata_) is alive.
450 return &(*chunk.meta_data->statistics);
/// Reads one column chunk of one row group into values of type T.
///
/// The indices are range-checked and the chunk must carry metadata. The
/// call is then routed to an encoding-specific helper (read_column_dict,
/// read_column_delta, read_column_bss or read_column_rle_bool) or, when
/// none of those applies, decodes through the ColumnReader produced by
/// make_column_reader. Each specialised route is guarded by `if constexpr`
/// on T; a T that does not fit the encoding gets the error text that
/// follows the guard.
///
/// @tparam T               element type of the returned vector.
/// @param  column_index    index of the column within the row group.
/// @return the decoded values, or the error of whichever step failed.
466 template <
typename T>
468 size_t column_index) {
470 if (row_group_index >= metadata_.
row_groups.size()) {
472 "row group index out of range"};
476 "column index out of range"};
479 const auto& rg = metadata_.
row_groups[row_group_index];
480 if (column_index >= rg.columns.size()) {
482 "column index out of range"};
485 const auto& chunk = rg.columns[column_index];
486 if (!chunk.meta_data.has_value()) {
488 "column chunk has no metadata"};
490 const auto& col_meta = *chunk.meta_data;
// Encoding detection: has_dict starts out false and the encodings declared
// in the chunk metadata are walked next. NOTE(review): presumably the loop
// raises has_dict and sibling flags for the other encodings — confirm.
493 bool has_dict =
false;
496 for (
auto enc : col_meta.encodings) {
// Dictionary route: instantiated only for string, int32, int64, float and
// double. The row group index is forwarded narrowed to int32_t.
515 if constexpr (std::is_same_v<T, std::string> ||
516 std::is_same_v<T, int32_t> ||
517 std::is_same_v<T, int64_t> ||
518 std::is_same_v<T, float> ||
519 std::is_same_v<T, double>) {
520 return read_column_dict<T>(col_meta,
521 static_cast<int32_t
>(row_group_index));
524 "dictionary encoding not supported for this type"};
// DELTA_BINARY_PACKED route: integer element types only.
530 if constexpr (std::is_same_v<T, int32_t> ||
531 std::is_same_v<T, int64_t>) {
532 return read_column_delta<T>(col_meta);
535 "DELTA_BINARY_PACKED only supports INT32/INT64"};
// BYTE_STREAM_SPLIT route: floating-point element types only.
541 if constexpr (std::is_same_v<T, float> ||
542 std::is_same_v<T, double>) {
543 return read_column_bss<T>(col_meta);
546 "BYTE_STREAM_SPLIT only supports FLOAT/DOUBLE"};
// RLE boolean route: bool is the only accepted element type.
553 if constexpr (std::is_same_v<T, bool>) {
554 return read_column_rle_bool(col_meta);
557 "RLE boolean encoding requires bool type"};
// Generic route: build a ColumnReader over the data page and decode
// num_values entries from it.
562 auto reader_result = make_column_reader(row_group_index, column_index);
563 if (!reader_result)
return reader_result.error();
565 auto& [col_reader, num_values] = reader_result.value();
566 size_t count =
static_cast<size_t>(num_values);
// bool values are read one at a time and appended; presumably because
// std::vector<bool> offers no data() pointer to pass to read_batch.
568 if constexpr (std::is_same_v<T, bool>) {
569 std::vector<bool> values;
570 values.reserve(count);
571 for (
size_t i = 0; i < count; ++i) {
572 auto val = col_reader.template read<bool>();
573 if (!val)
return val.error();
574 values.push_back(*val);
// Every other T is decoded in a single batch straight into a vector that
// is pre-sized to count elements.
578 std::vector<T> values(count);
579 auto batch_result = col_reader.template read_batch<T>(
580 values.data(), count);
581 if (!batch_result)
return batch_result.error();
598 size_t row_group_index,
size_t column_index) {
// Reads one column chunk and renders every value as a std::string.
// Typed branches delegate to read_column<T> and stringify the result with
// to_string_vec; string columns are returned as read; the raw-byte branch
// hex-encodes each value. Anything else ends in the
// "unsupported physical type for string conversion" error text.
599 if (row_group_index >= metadata_.
row_groups.size()) {
// bool column: read typed, then convert. A failed read is returned as is.
610 auto res = read_column<bool>(row_group_index, column_index);
611 if (!res)
return res.error();
612 return to_string_vec(res.value());
// int32_t column.
615 auto res = read_column<int32_t>(row_group_index, column_index);
616 if (!res)
return res.error();
617 return to_string_vec(res.value());
// int64_t column.
620 auto res = read_column<int64_t>(row_group_index, column_index);
621 if (!res)
return res.error();
622 return to_string_vec(res.value());
// float column.
625 auto res = read_column<float>(row_group_index, column_index);
626 if (!res)
return res.error();
627 return to_string_vec(res.value());
// double column.
630 auto res = read_column<double>(row_group_index, column_index);
631 if (!res)
return res.error();
632 return to_string_vec(res.value());
// String column: already in the target representation.
635 return read_column<std::string>(row_group_index, column_index);
// Raw-byte column: pull num_values byte strings one at a time from the
// ColumnReader and store their lowercase hex rendering.
638 auto reader_result = make_column_reader(row_group_index, column_index);
639 if (!reader_result)
return reader_result.error();
640 auto& [col_reader, num_values] = reader_result.value();
642 std::vector<std::string> result;
643 result.reserve(
static_cast<size_t>(num_values));
644 for (int64_t i = 0; i < num_values; ++i) {
645 auto bytes_result = col_reader.read_bytes();
646 if (!bytes_result)
return bytes_result.error();
647 result.push_back(hex_encode(bytes_result.value()));
653 "unsupported physical type for string conversion"};
// Hard ceiling on the footer's num_rows. It is tested before rows.reserve()
// below, so a footer declaring more rows is rejected before any allocation
// sized from it.
667 static constexpr size_t MAX_READ_ALL_ROWS = 100'000'000;
670 static_cast<size_t>(metadata_.
num_rows) > MAX_READ_ALL_ROWS) {
672 "read_all: num_rows exceeds safety cap ("
673 + std::to_string(MAX_READ_ALL_ROWS) +
")"};
// Row-major result table; capacity comes from the footer's num_rows.
675 std::vector<std::vector<std::string>> rows;
676 rows.reserve(
static_cast<size_t>(metadata_.
num_rows));
// Row groups are processed in file order: first gather every column of the
// group as strings (column-major), then transpose into rows.
678 for (
size_t rg = 0; rg < metadata_.
row_groups.size(); ++rg) {
680 size_t rg_num_cols = num_cols;
681 std::vector<std::vector<std::string>> col_data(rg_num_cols);
682 for (
size_t c = 0; c < rg_num_cols; ++c) {
684 if (!res)
return res.error();
685 col_data[c] = std::move(res.value());
// A group with no columns, or whose first column is empty, adds no rows.
688 if (col_data.empty() || col_data[0].empty())
continue;
// Column 0 fixes the row count of the group. A column that turned out
// shorter leaves its cell as the default (empty) string.
690 size_t rg_rows = col_data[0].size();
691 for (
size_t r = 0; r < rg_rows; ++r) {
692 std::vector<std::string> row(num_cols);
693 for (
size_t c = 0; c < num_cols; ++c) {
694 if (r < col_data[c].size()) {
// NOTE(review): this copies each cell; col_data looks unused after the
// transpose, so moving the string may be possible — confirm.
695 row[c] = col_data[c][r];
698 rows.push_back(std::move(row));
/// Footer metadata deserialized when the reader is created. Row groups,
/// schema elements and num_rows are all read from here.
721 thrift::FileMetaData metadata_;
/// Copy of metadata_.created_by, or an empty string when the footer has none.
723 std::
string created_by_;
/// Owns the output of every decompress() call. read_page_at and
/// make_column_reader append a buffer and then hand out pointers into
/// back(), so those pointers depend on the buffer staying in this vector.
/// NOTE(review): no visible code removes entries — confirm whether this
/// grows with every decompressed page for the lifetime of the reader.
726 std::vector<std::vector<uint8_t>> decompressed_buffers_;
/// Result of make_column_reader: built there from a ColumnReader and the
/// page's value count, and unpacked by callers with a two-element
/// structured binding (reader, num_values).
729 struct ColumnReaderWithCount {
/// Result of read_page_at: built there from the payload pointer, the
/// payload byte count and the parsed page header, and read by callers
/// through the members data, size and header.
735 struct PageReadResult {
/// Parses the page header found at absolute file offset `offset` and
/// returns the page payload together with that header.
///
/// @param offset  byte offset into the mapping; rejected when negative or
///                not smaller than mmap_.size().
/// @param codec   compression codec forwarded to decompress().
/// @return PageReadResult built from {payload pointer, payload size,
///         header}, or an error.
///
/// The payload pointer initially refers into the mapping, just past the
/// header. When the decompression step runs, it is redirected to the buffer
/// appended to decompressed_buffers_.
742 expected<PageReadResult> read_page_at(int64_t offset,
Compression codec) {
743 if (offset < 0 ||
static_cast<size_t>(offset) >= mmap_.
size()) {
745 "page offset out of file bounds"};
// Everything from offset to the end of the mapping is offered to the
// header decoder; it cannot look further than `remaining` bytes.
748 size_t remaining = mmap_.
size() -
static_cast<size_t>(offset);
749 const uint8_t* page_start = mmap_.
data_at(
static_cast<size_t>(offset));
751 thrift::CompactDecoder page_dec(page_start, remaining);
752 thrift::PageHeader ph;
753 if (
auto r = ph.deserialize(page_dec); !r.has_value()) {
// The decoder position after deserialize() is taken as the encoded header
// length; the payload starts right behind it.
757 size_t hdr_size = page_dec.position();
// Negative sizes are refused before either field is cast to size_t.
760 if (ph.compressed_page_size < 0 || ph.uncompressed_page_size < 0) {
762 "mmap: negative page size in PageHeader"};
765 size_t compressed_size =
static_cast<size_t>(ph.compressed_page_size);
766 const uint8_t* pdata = page_start + hdr_size;
// Header plus payload must fit inside what is left of the mapping. This
// test precedes the CRC block and everything else that uses the payload.
768 if (hdr_size + compressed_size > remaining) {
770 "page data extends past end of file"};
773 size_t pdata_size = compressed_size;
// Optional integrity check: the header's crc field, reinterpreted as
// uint32_t, must equal computed_crc.
775 if (ph.crc.has_value()) {
776 uint32_t expected_crc =
static_cast<uint32_t
>(*ph.crc);
778 if (computed_crc != expected_crc) {
780 "mmap: page CRC-32 mismatch at offset " + std::to_string(offset)};
786 size_t uncompressed_size =
static_cast<size_t>(
787 ph.uncompressed_page_size);
// Limits applied to the declared sizes before decompress(): the
// uncompressed size must be non-zero and at most 256 * 1024 * 1024 bytes,
// and it may not exceed 1024 times the compressed size.
789 static constexpr size_t MMAP_MAX_PAGE_SIZE2 = 256ULL * 1024ULL * 1024ULL;
790 if (uncompressed_size == 0 || uncompressed_size > MMAP_MAX_PAGE_SIZE2) {
792 "mmap: uncompressed page size out of range (0 or > 256 MB)"};
796 if (compressed_size > 0 && uncompressed_size / compressed_size > 1024) {
798 "mmap: decompression ratio exceeds 1024x limit"};
800 auto dec_result =
decompress(codec, pdata, pdata_size,
804 "decompression failed: " +
805 dec_result.error().message};
// Park the decompressed bytes in the reader so the returned pointer has an
// owner, then point the result at that buffer.
807 decompressed_buffers_.push_back(std::move(dec_result.value()));
808 pdata = decompressed_buffers_.back().data();
809 pdata_size = decompressed_buffers_.back().size();
812 return PageReadResult{pdata, pdata_size, std::move(ph)};
/// Builds a ColumnReader over the page located at the chunk's
/// data_page_offset and returns it together with that page's value count.
///
/// Steps: range-check both indices, require chunk metadata, decode the page
/// header directly from the mapping, apply the size and ratio limits,
/// decompress into decompressed_buffers_, then take num_values from the v1
/// data page header, the v2 header, or col_meta.num_values.
/// Only the single page at data_page_offset is parsed here.
///
/// @param row_group_index  index into metadata_.row_groups.
/// @param column_index     index into that row group's columns.
/// @return ColumnReaderWithCount built from {reader, num_values}, or an
///         error.
///
/// NOTE(review): unlike read_page_at, no negative-size test is visible
/// before compressed_page_size / uncompressed_page_size are cast to size_t,
/// and the CRC comparison sits ahead of the "page data extends past end of
/// file" check. If the CRC is computed over page_data_size bytes, a corrupt
/// header could make it read past the mapping — confirm, and consider
/// sharing read_page_at's validation order.
816 expected<ColumnReaderWithCount> make_column_reader(
817 size_t row_group_index,
size_t column_index) {
818 if (row_group_index >= metadata_.
row_groups.size()) {
820 "row group index out of range"};
823 const auto& rg = metadata_.
row_groups[row_group_index];
824 if (column_index >= rg.columns.size()) {
826 "column index out of range"};
829 const auto& chunk = rg.columns[column_index];
830 if (!chunk.meta_data.has_value()) {
832 "column chunk has no metadata"};
835 const auto& col_meta = *chunk.meta_data;
836 int64_t offset = col_meta.data_page_offset;
838 if (offset < 0 ||
static_cast<size_t>(offset) >= mmap_.
size()) {
840 "data_page_offset out of file bounds"};
// The header decoder is limited to the bytes between offset and the end of
// the mapping.
844 size_t remaining = mmap_.
size() -
static_cast<size_t>(offset);
845 const uint8_t* page_start = mmap_.
data_at(
static_cast<size_t>(offset));
848 thrift::CompactDecoder page_dec(page_start, remaining);
849 thrift::PageHeader page_header;
850 if (
auto r = page_header.deserialize(page_dec); !r.has_value()) {
// Decoder position after deserialize() = encoded header length; the page
// payload begins immediately after it.
854 size_t header_size = page_dec.position();
855 size_t page_data_size =
static_cast<size_t>(
856 page_header.compressed_page_size);
857 const uint8_t* page_data = page_start + header_size;
// Optional CRC comparison against the header's crc field (as uint32_t).
859 if (page_header.crc.has_value()) {
860 uint32_t expected_crc =
static_cast<uint32_t
>(*page_header.crc);
862 if (computed_crc != expected_crc) {
864 "mmap: page CRC-32 mismatch"};
// Header plus payload must fit in the bytes remaining in the mapping.
868 if (header_size + page_data_size > remaining) {
870 "page data extends past end of file"};
875 size_t uncompressed_size =
static_cast<size_t>(
876 page_header.uncompressed_page_size);
// Same limits as read_page_at: non-zero, at most 256 * 1024 * 1024 bytes,
// and no more than 1024 times the compressed size.
878 static constexpr size_t MMAP_MAX_PAGE_SIZE = 256ULL * 1024ULL * 1024ULL;
879 if (uncompressed_size == 0 || uncompressed_size > MMAP_MAX_PAGE_SIZE) {
881 "mmap: uncompressed page size out of range (0 or > 256 MB)"};
885 if (page_data_size > 0 && uncompressed_size / page_data_size > 1024) {
887 "mmap: decompression ratio exceeds 1024x limit"};
889 auto decompressed =
decompress(col_meta.codec,
890 page_data, page_data_size,
894 "decompression failed: " +
895 decompressed.error().message};
// The reader keeps the decompressed bytes; page_data is re-pointed at them
// so the ColumnReader built below reads from memory the reader owns.
897 decompressed_buffers_.push_back(std::move(decompressed.value()));
898 page_data = decompressed_buffers_.back().data();
899 page_data_size = decompressed_buffers_.back().size();
// Value count: v1 data page header, else v2 header, else the chunk-level
// count from the column metadata.
903 int64_t num_values = 0;
905 page_header.data_page_header.has_value()) {
906 num_values = page_header.data_page_header->num_values;
908 page_header.data_page_header_v2.has_value()) {
909 num_values = page_header.data_page_header_v2->num_values;
911 num_values = col_meta.num_values;
// Refuse negative or oversized counts before they size any container.
914 if (num_values < 0 || num_values > MMAP_MAX_VALUES_PER_PAGE) {
916 "mmap: num_values out of range (" +
917 std::to_string(num_values) +
")"};
// -1 is the starting value. NOTE(review): presumably replaced for
// fixed-length types before the reader is built — confirm.
922 int32_t type_length = -1;
927 ColumnReader col_reader(pt, page_data, page_data_size,
928 num_values, type_length);
930 return ColumnReaderWithCount{std::move(col_reader), num_values};
/// Decodes a dictionary-encoded column chunk into a vector of T.
///
/// Reads the dictionary page, locates and reads the data page that follows
/// it, then lets DictionaryDecoder<T> expand the data page's indices
/// against the dictionary.
///
/// @tparam T         element type of the result.
/// @param col_meta   metadata of the column chunk to decode.
/// @param rg_index   row group index; NOTE(review): not referenced in the
///                   visible lines — confirm it is still needed.
/// @return decoded values, or the error of the failing step.
934 template <
typename T>
935 expected<std::vector<T>> read_column_dict(
936 const thrift::ColumnMetaData& col_meta,
937 int32_t rg_index = 0) {
// Without an explicit dictionary_page_offset, the dictionary page is
// looked for at data_page_offset.
940 int64_t dict_offset = col_meta.dictionary_page_offset.value_or(
941 col_meta.data_page_offset);
944 auto dict_page_result = read_page_at(dict_offset, col_meta.codec);
945 if (!dict_page_result)
return dict_page_result.error();
947 auto& dict_pr = dict_page_result.value();
949 !dict_pr.header.dictionary_page_header.has_value()) {
951 "expected DICTIONARY_PAGE at dictionary offset"};
// The declared dictionary entry count must lie in [0, 10'000'000] before
// it is converted to size_t.
954 int32_t raw_dict_count = dict_pr.header.dictionary_page_header->num_values;
955 if (raw_dict_count < 0 || raw_dict_count > 10'000'000) {
957 "mmap: dictionary page num_values out of valid range"};
959 size_t num_dict_entries =
static_cast<size_t>(raw_dict_count);
// When both offsets coincide, the data page is taken to sit right behind
// the dictionary page: re-parse the dictionary header to learn its encoded
// length and skip header + compressed payload. The same bytes already
// passed read_page_at above, which rejects an out-of-range offset and
// negative page sizes.
962 int64_t data_offset = col_meta.data_page_offset;
963 if (data_offset == dict_offset) {
966 size_t dict_raw_start =
static_cast<size_t>(dict_offset);
967 const uint8_t* dict_start = mmap_.
data_at(dict_raw_start);
968 size_t dict_remaining = mmap_.
size() - dict_raw_start;
970 thrift::CompactDecoder hdr_dec(dict_start, dict_remaining);
971 thrift::PageHeader tmp_hdr;
972 if (
auto r = tmp_hdr.deserialize(hdr_dec); !r.has_value()) {
975 size_t dict_hdr_size = hdr_dec.position();
976 size_t dict_compressed_size =
static_cast<size_t>(
977 tmp_hdr.compressed_page_size);
979 data_offset = dict_offset
980 +
static_cast<int64_t
>(dict_hdr_size)
981 +
static_cast<int64_t
>(dict_compressed_size);
// Second read_page_at call. dict_pr.data is still used afterwards, so it
// must survive a possible push_back into decompressed_buffers_; moving the
// inner vectors on reallocation is expected to keep their storage in place.
985 auto data_page_result = read_page_at(data_offset, col_meta.codec);
986 if (!data_page_result)
return data_page_result.error();
988 auto& data_pr = data_page_result.value();
// Value count: v1 data page header, else v2 header, else the chunk-level
// count.
990 int64_t num_values = 0;
992 data_pr.header.data_page_header.has_value()) {
993 num_values = data_pr.header.data_page_header->num_values;
995 data_pr.header.data_page_header_v2.has_value()) {
996 num_values = data_pr.header.data_page_header_v2->num_values;
998 num_values = col_meta.num_values;
// NOTE(review): this bound is a literal, whereas make_column_reader tests
// against MMAP_MAX_VALUES_PER_PAGE — confirm the two are meant to differ.
1000 if (num_values < 0 || num_values > 100'000'000)
// The decoder is given the dictionary payload and entry count, then asked
// to expand num_values indices from the data page payload.
1003 DictionaryDecoder<T> decoder(dict_pr.data, dict_pr.size,
1004 num_dict_entries, col_meta.type);
1006 return decoder.decode(data_pr.data, data_pr.size,
1007 static_cast<size_t>(num_values));
/// Decodes a DELTA_BINARY_PACKED column chunk into a vector of T.
///
/// Only int32_t and int64_t have a decoding branch; any other T ends in
/// the "DELTA_BINARY_PACKED only supports INT32/INT64" error text.
///
/// @tparam T        element type of the result.
/// @param col_meta  metadata of the column chunk; its data_page_offset
///                  locates the page that is read.
/// @return decoded values, or the error of the failing step.
1011 template <
typename T>
1012 expected<std::vector<T>> read_column_delta(
1013 const thrift::ColumnMetaData& col_meta) {
1014 auto page_result = read_page_at(col_meta.data_page_offset,
1016 if (!page_result)
return page_result.error();
1018 auto& pr = page_result.value();
// Value count: v1 data page header, else v2 header, else the chunk-level
// count.
1020 int64_t num_values = 0;
1022 pr.header.data_page_header.has_value()) {
1023 num_values = pr.header.data_page_header->num_values;
1025 pr.header.data_page_header_v2.has_value()) {
1026 num_values = pr.header.data_page_header_v2->num_values;
1028 num_values = col_meta.num_values;
// count_result carries the element count as size_t; a failure is returned
// unchanged. NOTE(review): presumably a validated conversion of num_values
// — confirm.
1031 if (!count_result)
return count_result.error();
1032 size_t count = *count_result;
// Compile-time choice of the decoder for the requested integer width.
1034 if constexpr (std::is_same_v<T, int32_t>) {
1036 }
else if constexpr (std::is_same_v<T, int64_t>) {
1040 "DELTA_BINARY_PACKED only supports INT32/INT64"};
/// Decodes a BYTE_STREAM_SPLIT column chunk into a vector of T.
///
/// Only float and double have a decoding branch; any other T ends in the
/// "BYTE_STREAM_SPLIT only supports FLOAT/DOUBLE" error text.
///
/// @tparam T        element type of the result.
/// @param col_meta  metadata of the column chunk; its data_page_offset
///                  locates the page that is read.
/// @return decoded values, or the error of the failing step.
1045 template <
typename T>
1046 expected<std::vector<T>> read_column_bss(
1047 const thrift::ColumnMetaData& col_meta) {
1048 auto page_result = read_page_at(col_meta.data_page_offset,
1050 if (!page_result)
return page_result.error();
1052 auto& pr = page_result.value();
// Value count: v1 data page header, else v2 header, else the chunk-level
// count.
1054 int64_t num_values = 0;
1056 pr.header.data_page_header.has_value()) {
1057 num_values = pr.header.data_page_header->num_values;
1059 pr.header.data_page_header_v2.has_value()) {
1060 num_values = pr.header.data_page_header_v2->num_values;
1062 num_values = col_meta.num_values;
// count_result carries the element count as size_t; a failure is returned
// unchanged. NOTE(review): presumably a validated conversion of num_values
// — confirm.
1065 if (!count_result)
return count_result.error();
1066 size_t count = *count_result;
// Compile-time choice of the decoder for the requested float width.
1068 if constexpr (std::is_same_v<T, float>) {
1070 }
else if constexpr (std::is_same_v<T, double>) {
1074 "BYTE_STREAM_SPLIT only supports FLOAT/DOUBLE"};
/// Decodes an RLE-encoded boolean column chunk.
///
/// @param col_meta  metadata of the column chunk; its data_page_offset
///                  locates the page that is read.
/// @return one bool per decoded value, or the error of the failing step.
1079 expected<std::vector<bool>> read_column_rle_bool(
1080 const thrift::ColumnMetaData& col_meta) {
1081 auto page_result = read_page_at(col_meta.data_page_offset,
1083 if (!page_result)
return page_result.error();
1085 auto& pr = page_result.value();
// Value count: v1 data page header, else v2 header, else the chunk-level
// count.
1087 int64_t num_values = 0;
1089 pr.header.data_page_header.has_value()) {
1090 num_values = pr.header.data_page_header->num_values;
1092 pr.header.data_page_header_v2.has_value()) {
1093 num_values = pr.header.data_page_header_v2->num_values;
1095 num_values = col_meta.num_values;
// count_result carries the element count as size_t; a failure is returned
// unchanged.
1098 if (!count_result)
return count_result.error();
1099 size_t count = *count_result;
// The page payload is decoded into `indices`. NOTE(review): the literal 1
// is presumably the bit width of a boolean — confirm against the decoder.
1103 pr.data, pr.size, 1, count);
// Any non-zero decoded value becomes true. The loop stops at whichever is
// smaller, count or indices.size(), so a short decode yields a short
// result instead of reading past the end.
1105 std::vector<bool> result;
1106 result.reserve(count);
1107 for (
size_t i = 0; i < count && i < indices.size(); ++i) {
1108 result.push_back(indices[i] != 0);
/// Converts a vector of bool into strings, one per element:
/// "true" for true, "false" for false. Order and length are preserved.
1118 static std::vector<std::string> to_string_vec(
const std::vector<bool>& vals) {
1119 std::vector<std::string> result;
1120 result.reserve(vals.size());
// Iterating by value is deliberate: std::vector<bool> yields proxy
// objects, and `bool v` converts each one to a plain bool.
1121 for (
bool v : vals) {
1122 result.push_back(v ?
"true" :
"false");
/// Generic overload: converts each element with std::to_string, preserving
/// order and length. T must therefore be a type std::to_string accepts;
/// the visible callers pass int32_t, int64_t, float and double vectors.
1128 template <
typename T>
1129 static std::vector<std::string> to_string_vec(
const std::vector<T>& vals) {
1130 std::vector<std::string> result;
1131 result.reserve(vals.size());
1132 for (
const auto& v : vals) {
1133 result.push_back(std::to_string(v));
1139 static std::string hex_encode(
const std::vector<uint8_t>& bytes) {
1140 static constexpr char hex_chars[] =
"0123456789abcdef";
1142 result.reserve(bytes.size() * 2);
1143 for (uint8_t b : bytes) {
1144 result.push_back(hex_chars[(b >> 4) & 0x0F]);
1145 result.push_back(hex_chars[b & 0x0F]);