190 const std::filesystem::path& path
191#
if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
192 ,
const std::optional<crypto::EncryptionConfig>& encryption = std::nullopt
201 auto file_size = std::filesystem::file_size(path, ec);
204 "cannot determine file size: " + path.string() +
208 static constexpr int64_t MAX_FILE_SIZE = INT64_C(4) * 1024 * 1024 * 1024;
209 if (
static_cast<int64_t
>(file_size) > MAX_FILE_SIZE) {
211 "File exceeds 4 GB limit; use MmapParquetReader for large files"};
214 if (file_size < 12) {
217 "file too small to be a valid Parquet file: " +
221 std::ifstream ifs(path, std::ios::binary);
224 "cannot open file: " + path.string()};
227 std::vector<uint8_t> file_data(
static_cast<size_t>(file_size));
228 ifs.read(
reinterpret_cast<char*
>(file_data.data()),
229 static_cast<std::streamsize
>(file_size));
232 "failed to read file: " + path.string()};
237 const size_t sz = file_data.size();
239 uint32_t magic_start;
240 std::memcpy(&magic_start, file_data.data(), 4);
243 "missing PAR1 magic at start of file"};
248 std::memcpy(&magic_end, file_data.data() + sz - 4, 4);
254 "missing PAR1/PARE magic at end of file"};
259 std::memcpy(&footer_len, file_data.data() + sz - 8, 4);
261 if (footer_len == 0 ||
static_cast<size_t>(footer_len) > sz - 12) {
263 "invalid footer length: " + std::to_string(footer_len)};
267#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
268 std::unique_ptr<crypto::FileDecryptor> decryptor;
269 if (encrypted_footer || encryption) {
272 "file has encrypted footer (PARE magic) but no "
273 "encryption config was provided"};
275 decryptor = std::make_unique<crypto::FileDecryptor>(*encryption);
278 if (encrypted_footer) {
280 "encrypted footer (PARE) requires commercial build and license"};
285 size_t footer_offset = sz - 8 - footer_len;
286 const uint8_t* footer_ptr = file_data.data() + footer_offset;
287 size_t footer_size = footer_len;
290 std::vector<uint8_t> decrypted_footer_buf;
291 if (encrypted_footer) {
292#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
293 auto dec_result = decryptor->decrypt_footer(footer_ptr, footer_size);
294 if (!dec_result)
return dec_result.error();
295 decrypted_footer_buf = std::move(*dec_result);
296 footer_ptr = decrypted_footer_buf.data();
297 footer_size = decrypted_footer_buf.size();
300 "encrypted footer (PARE) requires commercial build and license"};
307 if (
auto r = metadata.
deserialize(dec); !r.has_value()) {
314 std::string schema_name;
315 std::vector<ColumnDescriptor> columns;
317 if (!metadata.
schema.empty()) {
318 schema_name = metadata.
schema[0].name;
320 for (
size_t i = 1; i < metadata.
schema.size(); ++i) {
321 const auto& elem = metadata.
schema[i];
324 if (elem.num_children.has_value()) {
333 if (elem.type_length.has_value()) {
336 if (elem.precision.has_value()) {
339 if (elem.scale.has_value()) {
340 cd.
scale = *elem.scale;
344 if (elem.converted_type.has_value()) {
345 cd.
logical_type = converted_type_to_logical(*elem.converted_type);
348 columns.push_back(std::move(cd));
354 reader.file_data_ = std::move(file_data);
355 reader.metadata_ = std::move(metadata);
356 reader.schema_ =
Schema(std::move(schema_name), std::move(columns));
357 reader.created_by_ = reader.metadata_.
created_by.value_or(
"");
358#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
359 reader.decryptor_ = std::move(decryptor);
378 return static_cast<int64_t
>(metadata_.
row_groups.size());
386 [[nodiscard]]
const std::string&
created_by()
const {
return created_by_; }
394 static const std::vector<thrift::KeyValue> empty;
419 throw std::out_of_range(
"ParquetReader::row_group: index " +
420 std::to_string(index) +
" >= " +
426 static_cast<int64_t
>(index)};
453 for (
size_t c = 0; c < schema_.
num_columns(); ++c) {
460 int64_t total_uncompressed = 0;
461 int64_t total_compressed = 0;
464 for (
size_t c = 0; c < rg.columns.size() && c < schema_.
num_columns(); ++c) {
465 const auto& cc = rg.columns[c];
466 if (!cc.meta_data.has_value())
continue;
467 const auto& cmd = *cc.meta_data;
470 col.uncompressed_bytes += cmd.total_uncompressed_size;
471 col.compressed_bytes += cmd.total_compressed_size;
472 col.num_values += cmd.num_values;
473 col.compression = cmd.codec;
475 if (cmd.statistics.has_value() && cmd.statistics->null_count.has_value()) {
476 col.null_count += *cmd.statistics->null_count;
480 if (cc.bloom_filter_offset.has_value() && *cc.bloom_filter_offset >= 0) {
481 col.has_bloom_filter =
true;
484 if (cc.column_index_offset.has_value() && *cc.column_index_offset >= 0) {
485 col.has_page_index =
true;
488 total_uncompressed += cmd.total_uncompressed_size;
489 total_compressed += cmd.total_compressed_size;
493 if (total_compressed > 0) {
495 /
static_cast<double>(total_compressed);
512 static std::once_flag codec_flag;
513 std::call_once(codec_flag, [] {
515#ifdef SIGNET_HAS_ZSTD
516 register_zstd_codec();
519 register_lz4_codec();
521#ifdef SIGNET_HAS_GZIP
522 register_gzip_codec();
/// @brief Typed read of a single column chunk.
///
/// Validates `row_group_index` / `column_index` against the footer metadata,
/// requires the chunk to carry column metadata, then either delegates to one
/// of the encoding-specific readers (read_column_dict, read_column_delta,
/// read_column_bss, read_column_rle_bool) or falls back to a ColumnReader
/// obtained from make_column_reader().
///
/// @tparam T  Element type requested by the caller; each encoding-specific
///            path is compiled only for the types listed in its
///            `if constexpr` condition and otherwise yields an error.
/// @return    The decoded values, or the error produced by whichever step
///            failed.
553 template <
typename T>
555 size_t column_index) {
// Row-group index is checked against the number of row groups in the footer.
557 if (row_group_index >= metadata_.
row_groups.size()) {
559 "row group index out of range"};
563 "column index out of range"};
566 const auto& rg = metadata_.
row_groups[row_group_index];
// Second column check: this one is against the chunk list actually stored
// in the selected row group (same message as the check above).
567 if (column_index >= rg.columns.size()) {
569 "column index out of range"};
572 const auto& chunk = rg.columns[column_index];
573 if (!chunk.meta_data.has_value()) {
575 "column chunk has no metadata"};
577 const auto& col_meta = *chunk.meta_data;
// Encoding detection: the declared encodings of the chunk are scanned and
// flags such as has_dict are set; presumably these flags select the
// branches below.
580 bool has_dict =
false;
583 for (
auto enc : col_meta.encodings) {
// Column name forwarded to the encoding-specific readers; taken from the
// first path_in_schema component when one exists.
599 const std::string& col_name = col_meta.path_in_schema.empty()
601 : col_meta.path_in_schema[0];
// Dictionary path: only string, 32/64-bit integer, float and double are
// accepted; any other T gets the "not supported" error.
606 if constexpr (std::is_same_v<T, std::string> ||
607 std::is_same_v<T, int32_t> ||
608 std::is_same_v<T, int64_t> ||
609 std::is_same_v<T, float> ||
610 std::is_same_v<T, double>) {
611 return read_column_dict<T>(col_meta, col_name,
612 static_cast<int32_t
>(row_group_index));
615 "dictionary encoding not supported for this type"};
// Delta path: integer types only.
621 if constexpr (std::is_same_v<T, int32_t> ||
622 std::is_same_v<T, int64_t>) {
623 return read_column_delta<T>(col_meta, col_name,
624 static_cast<int32_t
>(row_group_index));
627 "DELTA_BINARY_PACKED only supports INT32/INT64"};
// Byte-stream-split path: floating-point types only.
633 if constexpr (std::is_same_v<T, float> ||
634 std::is_same_v<T, double>) {
635 return read_column_bss<T>(col_meta, col_name,
636 static_cast<int32_t
>(row_group_index));
639 "BYTE_STREAM_SPLIT only supports FLOAT/DOUBLE"};
// RLE path: bool only.
646 if constexpr (std::is_same_v<T, bool>) {
647 return read_column_rle_bool<T>(col_meta, col_name,
648 static_cast<int32_t
>(row_group_index));
651 "RLE boolean encoding requires bool type"};
// Fallback: build a ColumnReader over the chunk's first data page and read
// the values through it.
656 auto reader_result = make_column_reader(row_group_index, column_index);
657 if (!reader_result)
return reader_result.error();
659 auto& [col_reader, num_values] = reader_result.value();
661 if (!count_result)
return count_result.error();
662 size_t count = *count_result;
// Booleans are read one value at a time into std::vector<bool>; every other
// type is read in one batch straight into the vector's storage via data().
664 if constexpr (std::is_same_v<T, bool>) {
666 std::vector<bool> values;
667 values.reserve(count);
668 for (
size_t i = 0; i < count; ++i) {
669 auto val = col_reader.template read<bool>();
670 if (!val)
return val.error();
671 values.push_back(*val);
675 std::vector<T> values(count);
676 auto batch_result = col_reader.template read_batch<T>(
677 values.data(), count);
678 if (!batch_result)
return batch_result.error();
699 size_t row_group_index,
size_t column_index) {
700 if (row_group_index >= metadata_.
row_groups.size()) {
711 auto res = read_column<bool>(row_group_index, column_index);
712 if (!res)
return res.error();
713 return to_string_vec(res.value());
716 auto res = read_column<int32_t>(row_group_index, column_index);
717 if (!res)
return res.error();
718 return to_string_vec(res.value());
721 auto res = read_column<int64_t>(row_group_index, column_index);
722 if (!res)
return res.error();
723 return to_string_vec(res.value());
726 auto res = read_column<float>(row_group_index, column_index);
727 if (!res)
return res.error();
728 return to_string_vec(res.value());
731 auto res = read_column<double>(row_group_index, column_index);
732 if (!res)
return res.error();
733 return to_string_vec(res.value());
737 return read_column<std::string>(row_group_index, column_index);
741 auto reader_result = make_column_reader(row_group_index, column_index);
742 if (!reader_result)
return reader_result.error();
743 auto& [col_reader, num_values] = reader_result.value();
745 std::vector<std::string> result;
746 result.reserve(
static_cast<size_t>(num_values));
747 for (int64_t i = 0; i < num_values; ++i) {
748 auto bytes_result = col_reader.read_bytes();
749 if (!bytes_result)
return bytes_result.error();
750 result.push_back(hex_encode(bytes_result.value()));
756 "unsupported physical type for string conversion"};
775 size_t row_group_index) {
777 std::vector<std::vector<std::string>> columns(num_cols);
779 for (
size_t c = 0; c < num_cols; ++c) {
781 if (!res)
return res.error();
782 columns[c] = std::move(res.value());
807 if (
static_cast<uint64_t
>(safe_rows) > 1024ULL * 1024 * 1024)
809 std::vector<std::vector<std::string>> rows;
810 rows.reserve(
static_cast<size_t>(safe_rows));
812 for (
size_t rg = 0; rg < metadata_.
row_groups.size(); ++rg) {
814 if (!cols_result)
return cols_result.error();
816 const auto& col_data = cols_result.value();
817 if (col_data.empty())
continue;
819 size_t rg_rows = col_data[0].size();
820 for (
size_t r = 0; r < rg_rows; ++r) {
821 std::vector<std::string> row(num_cols);
822 for (
size_t c = 0; c < num_cols; ++c) {
823 if (r < col_data[c].size()) {
824 row[c] = col_data[c][r];
827 rows.push_back(std::move(row));
851 const std::vector<std::string>& column_names) {
853 std::vector<size_t> indices;
854 indices.reserve(column_names.size());
856 for (
const auto& name : column_names) {
858 if (!idx.has_value()) {
860 "column not found: " + name};
862 indices.push_back(*idx);
866 size_t proj_cols = indices.size();
867 std::vector<std::vector<std::string>> result(proj_cols);
869 for (
size_t rg = 0; rg < metadata_.
row_groups.size(); ++rg) {
870 for (
size_t p = 0; p < proj_cols; ++p) {
872 if (!col_result)
return col_result.error();
874 auto& col_vec = col_result.value();
875 result[p].insert(result[p].end(),
876 std::make_move_iterator(col_vec.begin()),
877 std::make_move_iterator(col_vec.end()));
900 size_t column_index)
const {
901 if (row_group_index >= metadata_.
row_groups.size())
return nullptr;
902 const auto& rg = metadata_.
row_groups[row_group_index];
903 if (column_index >= rg.columns.size())
return nullptr;
904 const auto& chunk = rg.columns[column_index];
905 if (!chunk.meta_data.has_value())
return nullptr;
906 if (!chunk.meta_data->statistics.has_value())
return nullptr;
907 return &(*chunk.meta_data->statistics);
926 size_t row_group_index,
size_t column_index)
const {
927 if (row_group_index >= metadata_.
row_groups.size()) {
929 "row group index out of range"};
932 const auto& rg = metadata_.
row_groups[row_group_index];
933 if (column_index >= rg.columns.size()) {
935 "column index out of range"};
938 const auto& chunk = rg.columns[column_index];
939 if (!chunk.bloom_filter_offset.has_value()) {
941 "no bloom filter for this column chunk"};
944 int64_t bf_offset = *chunk.bloom_filter_offset;
945 if (bf_offset < 0 ||
static_cast<size_t>(bf_offset) + 4 > file_data_.size()) {
947 "bloom filter offset out of file bounds"};
951 uint32_t bf_size = 0;
952 std::memcpy(&bf_size, file_data_.data() + bf_offset, 4);
954 size_t data_start =
static_cast<size_t>(bf_offset) + 4;
955 if (data_start + bf_size > file_data_.size()) {
957 "bloom filter data extends past end of file"};
962 "invalid bloom filter size: " + std::to_string(bf_size)};
966 file_data_.data() + data_start, bf_size);
983 template <
typename T>
985 size_t row_group_index,
size_t column_index,
986 const T& value)
const {
991 return bf_result->might_contain_value(value);
1011 size_t row_group_index,
size_t column_index)
const {
1012 if (row_group_index >= metadata_.
row_groups.size())
1014 const auto& rg = metadata_.
row_groups[row_group_index];
1015 if (column_index >= rg.columns.size())
1018 const auto& chunk = rg.columns[column_index];
1019 if (!chunk.column_index_offset.has_value() || !chunk.column_index_length.has_value())
1022 int64_t ci_offset = *chunk.column_index_offset;
1023 int32_t ci_length = *chunk.column_index_length;
1024 if (ci_offset < 0 || ci_length < 0)
1026 auto uoff =
static_cast<size_t>(ci_offset);
1027 auto ulen =
static_cast<size_t>(ci_length);
1028 if (uoff > file_data_.size() || ulen > file_data_.size() - uoff)
1032 static_cast<size_t>(ci_length));
1053 size_t row_group_index,
size_t column_index)
const {
1054 if (row_group_index >= metadata_.
row_groups.size())
1056 const auto& rg = metadata_.
row_groups[row_group_index];
1057 if (column_index >= rg.columns.size())
1060 const auto& chunk = rg.columns[column_index];
1061 if (!chunk.offset_index_offset.has_value() || !chunk.offset_index_length.has_value())
1064 int64_t oi_offset = *chunk.offset_index_offset;
1065 int32_t oi_length = *chunk.offset_index_length;
1066 if (oi_offset < 0 || oi_length < 0)
1068 auto uoff2 =
static_cast<size_t>(oi_offset);
1069 auto ulen2 =
static_cast<size_t>(oi_length);
1070 if (uoff2 > file_data_.size() || ulen2 > file_data_.size() - uoff2)
1074 static_cast<size_t>(oi_length));
1090 [[nodiscard]]
bool has_page_index(
size_t row_group_index,
size_t column_index)
const {
1091 if (row_group_index >= metadata_.
row_groups.size())
return false;
1092 const auto& rg = metadata_.
row_groups[row_group_index];
1093 if (column_index >= rg.columns.size())
return false;
1094 const auto& chunk = rg.columns[column_index];
1095 return chunk.column_index_offset.has_value() && chunk.offset_index_offset.has_value();
1113 std::vector<uint8_t> file_data_;
1114 thrift::FileMetaData metadata_;
1116 std::
string created_by_;
1117#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
1118 std::unique_ptr<crypto::FileDecryptor> decryptor_;
1122 std::vector<std::vector<uint8_t>> decompressed_buffers_;
1125 struct ColumnReaderWithCount {
/// Builds a ColumnReader positioned on the page found at the chunk's
/// `data_page_offset`.
///
/// Steps, in order: validate the indices and the presence of chunk metadata;
/// bounds-check the page offset against the in-memory file buffer; parse the
/// thrift page header (decrypting it first in commercial builds when the
/// column has a key); validate `compressed_page_size` against the bytes that
/// remain; verify the page CRC when the header carries one; decrypt and/or
/// decompress the payload into `decompressed_buffers_`; derive and
/// range-check `num_values`; cap the decoded size; construct the reader.
///
/// @param row_group_index  Index into `metadata_.row_groups`.
/// @param column_index     Index into the row group's `columns`.
/// @return A ColumnReaderWithCount holding the reader and `num_values`, or
///         the error of the first failing step.
///
/// NOTE(review): only the single page at `data_page_offset` is opened here
/// (page ordinal 0 is hard-coded in the decrypt calls). Whether chunks with
/// several data pages are handled by a caller is not visible — confirm.
1132 size_t row_group_index,
size_t column_index) {
1133 if (row_group_index >= metadata_.row_groups.size()) {
1135 "row group index out of range"};
1138 const auto& rg = metadata_.row_groups[row_group_index];
1139 if (column_index >= rg.columns.size()) {
1141 "column index out of range"};
1144 const auto& chunk = rg.columns[column_index];
1145 if (!chunk.meta_data.has_value()) {
1147 "column chunk has no metadata"};
1150 const auto& col_meta = *chunk.meta_data;
// The offset comes from file metadata, so it is validated before it is used
// for pointer arithmetic; `remaining` is therefore at least 1.
1156 int64_t offset = col_meta.data_page_offset;
1157 if (offset < 0 ||
static_cast<size_t>(offset) >= file_data_.size()) {
1159 "data_page_offset out of file bounds"};
1162 size_t remaining = file_data_.size() -
static_cast<size_t>(offset);
1163 const uint8_t* page_start = file_data_.data() + offset;
// Column name used to look up a per-column key; first path component when
// the schema path is non-empty.
1165 const std::string col_name = col_meta.path_in_schema.empty()
1167 : col_meta.path_in_schema[0];
1169 thrift::PageHeader page_header;
1170 size_t header_size = 0;
// Commercial builds: an encrypted header is laid out as 4 bytes, then a
// little-endian 32-bit length at +4, then the ciphertext at +8 (as read by
// load_le32(page_start + 4) and the `page_start + 8` argument below).
1171#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
1172 const bool column_encrypted = decryptor_ &&
1173 decryptor_has_column_key(decryptor_->config(), col_name);
1175 if (!column_encrypted) {
1177 "encrypted page header encountered without matching decryptor for column '" +
// NOTE(review): `remaining - 8` is unsigned arithmetic and wraps when
// remaining < 8; reading page_start + 4 also needs 8 available bytes.
// Confirm that a `remaining >= 8` guard precedes this point.
1180 const uint32_t encrypted_header_size =
load_le32(page_start + 4);
1181 if (encrypted_header_size == 0 ||
1182 static_cast<size_t>(encrypted_header_size) > remaining - 8) {
1184 "encrypted page header size out of range"};
1186 auto header_result = decryptor_->decrypt_data_page_header(
1187 page_start + 8, encrypted_header_size, col_name,
1188 static_cast<int32_t
>(row_group_index), 0);
1189 if (!header_result)
return header_result.error();
// The decrypted plaintext must be exactly one thrift PageHeader: parse it
// and reject any bytes left over.
1191 thrift::CompactDecoder page_dec(header_result->data(), header_result->size());
1192 if (
auto r = page_header.deserialize(page_dec); !r.has_value()) {
1195 if (page_dec.position() != header_result->size()) {
1197 "encrypted page header contains trailing bytes"};
// On-disk header footprint = 8-byte prefix + ciphertext length.
1199 header_size = 8 +
static_cast<size_t>(encrypted_header_size);
// Plain header: decode directly from the file buffer; the decoder position
// after parsing is the header's size in bytes.
1207 thrift::CompactDecoder page_dec(page_start, remaining);
1208 if (
auto r = page_header.deserialize(page_dec); !r.has_value()) {
1211 header_size = page_dec.position();
// A negative size would turn into a huge size_t in the cast below.
1213 if (page_header.compressed_page_size < 0) {
1215 "negative compressed_page_size"};
1217 size_t page_data_size =
static_cast<size_t>(
1218 page_header.compressed_page_size);
1219 const uint8_t* page_data = page_start + header_size;
// Header plus payload must fit in what is left of the file buffer.
1221 if (header_size + page_data_size > remaining) {
1223 "page data extends past end of file"};
// Optional integrity check: the stored CRC is reinterpreted as unsigned and
// compared with a computed value; both are rendered as 8 upper-case hex
// digits in the mismatch message.
1227 if (page_header.crc.has_value()) {
1228 uint32_t expected_crc =
static_cast<uint32_t
>(*page_header.crc);
1230 if (computed_crc != expected_crc) {
1232 "Page CRC-32 mismatch at offset "
1233 + std::to_string(offset) +
": expected 0x"
1235 char buf[9]; std::snprintf(buf,
sizeof(buf),
"%08X", expected_crc);
1236 return std::string(buf);
1240 char buf[9]; std::snprintf(buf,
sizeof(buf),
"%08X", computed_crc);
1241 return std::string(buf);
// Commercial builds: decrypt the payload when the column has a key. The
// plaintext is stored in decompressed_buffers_ and page_data is re-pointed
// at it, so the buffer must outlive the returned reader.
1247#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
1248 if (decryptor_ && decryptor_has_column_key(decryptor_->config(), col_name)) {
1250 ? decryptor_->decrypt_dict_page(
1251 page_data, page_data_size, col_name,
1252 static_cast<int32_t
>(row_group_index))
1253 : decryptor_->decrypt_column_page(
1254 page_data, page_data_size, col_name,
1255 static_cast<int32_t>(row_group_index), 0);
1256 if (!dec_result)
return dec_result.error();
1257 decompressed_buffers_.push_back(std::move(*dec_result));
1258 page_data = decompressed_buffers_.back().data();
1259 page_data_size = decompressed_buffers_.back().size();
// Decompression. A negative uncompressed_page_size becomes a very large
// size_t in the cast and is rejected by the upper-bound test.
// NOTE(review): the same error text is used when the size is 0, although
// the message only mentions exceeding the cap.
// NOTE(review): read_page_at() additionally rejects pages whose
// uncompressed/compressed ratio exceeds 1024; no such check is present on
// this path — confirm whether that is intentional.
1265 size_t uncompressed_size =
static_cast<size_t>(
1266 page_header.uncompressed_page_size);
1267 if (uncompressed_size == 0 || uncompressed_size > PARQUET_MAX_PAGE_SIZE) {
1269 "ParquetReader: uncompressed_page_size exceeds 256 MB hard cap"};
1271 auto decompressed =
decompress(col_meta.codec,
1272 page_data, page_data_size,
1274 if (!decompressed) {
1276 "decompression failed: " +
1277 decompressed.error().message};
1279 decompressed_buffers_.push_back(std::move(decompressed.value()));
1280 page_data = decompressed_buffers_.back().data();
1281 page_data_size = decompressed_buffers_.back().size();
// Value count: taken from the v1 or v2 data-page header when present,
// otherwise from the chunk metadata, then range-checked.
1285 int64_t num_values = 0;
1287 page_header.data_page_header.has_value()) {
1288 num_values = page_header.data_page_header->num_values;
1290 page_header.data_page_header_v2.has_value()) {
1291 num_values = page_header.data_page_header_v2->num_values;
1294 num_values = col_meta.num_values;
1297 if (num_values < 0 || num_values > MAX_VALUES_PER_PAGE) {
1299 "num_values out of valid range"};
// type_length stays -1 unless a fixed length is found for the column.
1304 int32_t type_length = -1;
// Per-element size used only for the memory cap below; 0 means "no
// fixed-size estimate" and skips the cap.
1311 size_t elem_size = 0;
1319 elem_size = (type_length > 0) ?
static_cast<size_t>(type_length) : 1;
1321 default: elem_size = 0;
break;
// Refuse pages whose decoded form would exceed 256 MiB.
1324 if (elem_size > 0) {
1325 size_t decoded_size =
static_cast<size_t>(num_values) * elem_size;
1326 if (decoded_size > 256ULL * 1024 * 1024) {
1328 "decoded page exceeds 256 MB memory limit"};
1333 ColumnReader col_reader(pt, page_data, page_data_size,
1334 num_values, type_length);
1336 return ColumnReaderWithCount{std::move(col_reader), num_values};
1340 struct PageReadResult {
1341 const uint8_t* data;
1343 thrift::PageHeader header;
/// Reads one page starting at byte `offset` of the in-memory file buffer.
///
/// The page header is parsed (after decryption in commercial builds when
/// `col_name` has a key), the payload extent is validated against the bytes
/// remaining in the buffer, an optional CRC is verified, and the payload is
/// decrypted and/or decompressed into `decompressed_buffers_`.
///
/// @param offset        Absolute byte offset of the page header in the file.
/// @param codec         Compression codec passed to decompress().
/// @param col_name      Column name used for key lookup; an empty name
///                      disables the encrypted paths in this function.
/// @param rg_index      Row-group index forwarded to the decryptor.
/// @param page_ordinal  Page ordinal forwarded to the decryptor.
/// @return PageReadResult{data, size, header}. `data` points either into
///         `file_data_` (when the payload was used as stored) or into the
///         last element pushed to `decompressed_buffers_`, so it is only
///         valid while those containers keep that storage alive.
1347 expected<PageReadResult> read_page_at(int64_t offset,
Compression codec,
1348 const std::string& col_name =
"",
1349 int32_t rg_index = 0, int32_t page_ordinal = 0) {
// Offsets originate from file metadata; validate before pointer arithmetic.
1350 if (offset < 0 ||
static_cast<size_t>(offset) >= file_data_.size()) {
1352 "page offset out of file bounds"};
1355 size_t remaining = file_data_.size() -
static_cast<size_t>(offset);
1356 const uint8_t* page_start = file_data_.data() + offset;
1358 thrift::PageHeader ph;
1359 size_t hdr_size = 0;
// Commercial builds: encrypted header = 8-byte prefix (length is the
// little-endian 32-bit word at +4) followed by the ciphertext at +8.
1360#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
1361 const bool column_encrypted = decryptor_ && !col_name.empty() &&
1362 decryptor_has_column_key(decryptor_->config(), col_name);
1364 if (!column_encrypted) {
1366 "encrypted page header encountered without matching decryptor for column '" +
// NOTE(review): `remaining - 8` wraps when remaining < 8, and the load at
// page_start + 4 needs 8 readable bytes — confirm a `remaining >= 8` guard
// precedes this point.
1369 const uint32_t encrypted_header_size =
load_le32(page_start + 4);
1370 if (encrypted_header_size == 0 ||
1371 static_cast<size_t>(encrypted_header_size) > remaining - 8) {
1373 "encrypted page header size out of range"};
1375 auto header_result = decryptor_->decrypt_data_page_header(
1376 page_start + 8, encrypted_header_size, col_name,
1377 rg_index, page_ordinal);
1378 if (!header_result)
return header_result.error();
// The plaintext must decode as exactly one PageHeader with nothing left.
1380 thrift::CompactDecoder page_dec(header_result->data(), header_result->size());
1381 if (
auto r = ph.deserialize(page_dec); !r.has_value()) {
1384 if (page_dec.position() != header_result->size()) {
1386 "encrypted page header contains trailing bytes"};
1388 hdr_size = 8 +
static_cast<size_t>(encrypted_header_size);
// Plain header: decode in place; decoder position == header length.
1392 thrift::CompactDecoder page_dec(page_start, remaining);
1393 if (
auto r = ph.deserialize(page_dec); !r.has_value()) {
1396 hdr_size = page_dec.position();
// Reject negative sizes before the unsigned cast.
1398 if (ph.compressed_page_size < 0) {
1400 "negative compressed_page_size"};
1402 size_t compressed_size =
static_cast<size_t>(ph.compressed_page_size);
1403 const uint8_t* pdata = page_start + hdr_size;
// Header + payload must lie inside the remaining file bytes.
1405 if (hdr_size + compressed_size > remaining) {
1407 "page data extends past end of file"};
1410 size_t pdata_size = compressed_size;
// Optional CRC check, performed before any decryption/decompression below;
// both values are printed as 8 upper-case hex digits on mismatch.
1414 if (ph.crc.has_value()) {
1415 uint32_t expected_crc =
static_cast<uint32_t
>(*ph.crc);
1417 if (computed_crc != expected_crc) {
1419 "Page CRC-32 mismatch at offset "
1420 + std::to_string(offset) +
": expected 0x"
1422 char buf[9]; std::snprintf(buf,
sizeof(buf),
"%08X", expected_crc);
1423 return std::string(buf);
1427 char buf[9]; std::snprintf(buf,
sizeof(buf),
"%08X", computed_crc);
1428 return std::string(buf);
// Commercial builds: decrypt the payload when the named column has a key;
// the plaintext is retained in decompressed_buffers_ and pdata re-pointed.
1434#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
1435 if (decryptor_ && !col_name.empty() &&
1436 decryptor_has_column_key(decryptor_->config(), col_name)) {
1438 ? decryptor_->decrypt_dict_page(pdata, pdata_size, col_name, rg_index)
1439 : decryptor_->decrypt_column_page(pdata, pdata_size, col_name, rg_index, page_ordinal);
1440 if (!dec_result)
return dec_result.error();
1441 decompressed_buffers_.push_back(std::move(*dec_result));
1442 pdata = decompressed_buffers_.back().data();
1443 pdata_size = decompressed_buffers_.back().size();
// Decompression guards. A negative uncompressed_page_size becomes a huge
// size_t and is caught by the upper bound.
// NOTE(review): a size of 0 is rejected with the same "exceeds ... cap"
// message, which does not describe that case.
1448 size_t uncompressed_size =
static_cast<size_t>(
1449 ph.uncompressed_page_size);
1450 if (uncompressed_size == 0 || uncompressed_size > PARQUET_MAX_PAGE_SIZE) {
1452 "ParquetReader: uncompressed_page_size exceeds 256 MB hard cap"};
// Expansion-ratio guard (integer division): declared output may be at most
// 1024 times the input size.
1456 if (pdata_size > 0 && uncompressed_size / pdata_size > 1024) {
1458 "ParquetReader: decompression ratio exceeds 1024x limit"};
1460 auto dec_result =
decompress(codec, pdata, pdata_size,
1464 "decompression failed: " +
1465 dec_result.error().message};
// Keep the decompressed bytes alive in the reader and hand out a view.
1467 decompressed_buffers_.push_back(std::move(dec_result.value()));
1468 pdata = decompressed_buffers_.back().data();
1469 pdata_size = decompressed_buffers_.back().size();
1472 return PageReadResult{pdata, pdata_size, std::move(ph)};
/// Decodes a dictionary-encoded column chunk into a vector of T.
///
/// Reads the dictionary page (at `dictionary_page_offset`, or at
/// `data_page_offset` when the former is absent), validates its entry count,
/// locates and reads the data page, validates the value count, and runs
/// DictionaryDecoder<T> over the two payloads.
///
/// @param col_meta  Column-chunk metadata (offsets, codec, type, num_values).
/// @param col_name  Column name forwarded to read_page_at().
/// @param rg_index  Row-group index forwarded to read_page_at().
/// @return Decoded values, or the first error encountered.
1476 template <
typename T>
1477 expected<std::vector<T>> read_column_dict(
1478 const thrift::ColumnMetaData& col_meta,
1479 const std::string& col_name =
"",
1480 int32_t rg_index = 0) {
// Fall back to data_page_offset when no explicit dictionary offset is set.
1482 int64_t dict_offset = col_meta.dictionary_page_offset.value_or(
1483 col_meta.data_page_offset);
// The dictionary page is requested with page ordinal -1.
1486 auto dict_page_result = read_page_at(dict_offset, col_meta.codec,
1487 col_name, rg_index, -1);
1488 if (!dict_page_result)
return dict_page_result.error();
1490 auto& dict_pr = dict_page_result.value();
1492 !dict_pr.header.dictionary_page_header.has_value()) {
1494 "expected DICTIONARY_PAGE at dictionary offset"};
// Entry count is bounded to [0, 10,000,000] before the unsigned cast.
1497 int32_t raw_dict_count = dict_pr.header.dictionary_page_header->num_values;
1498 if (raw_dict_count < 0 || raw_dict_count > 10'000'000) {
1500 "dictionary page num_values out of valid range"};
1502 size_t num_dict_entries =
static_cast<size_t>(raw_dict_count);
// When both offsets coincide, the data page is taken to follow the
// dictionary page directly: its position is the dictionary offset plus the
// dictionary header length plus the dictionary's compressed payload length.
// NOTE(review): the header is re-parsed here as plain thrift from the raw
// file bytes. read_page_at() has a separate path for encrypted headers
// (8-byte prefix + ciphertext), so this size computation may not hold for
// encrypted columns — confirm.
1508 int64_t data_offset = col_meta.data_page_offset;
1509 if (data_offset == dict_offset) {
1513 size_t dict_raw_start =
static_cast<size_t>(dict_offset);
1514 const uint8_t* dict_start = file_data_.data() + dict_raw_start;
1515 size_t dict_remaining = file_data_.size() - dict_raw_start;
1517 thrift::CompactDecoder hdr_dec(dict_start, dict_remaining);
1518 thrift::PageHeader tmp_hdr;
1519 if (
auto r = tmp_hdr.deserialize(hdr_dec); !r.has_value()) {
1522 size_t dict_hdr_size = hdr_dec.position();
1523 if (tmp_hdr.compressed_page_size < 0) {
1525 "negative compressed_page_size in dictionary page"};
1527 size_t dict_compressed_size =
static_cast<size_t>(
1528 tmp_hdr.compressed_page_size);
1530 data_offset = dict_offset
1531 +
static_cast<int64_t
>(dict_hdr_size)
1532 +
static_cast<int64_t
>(dict_compressed_size);
// The data page is requested with page ordinal 0; read_page_at() performs
// the bounds check on data_offset.
1536 auto data_page_result = read_page_at(data_offset, col_meta.codec,
1537 col_name, rg_index, 0);
1538 if (!data_page_result)
return data_page_result.error();
1540 auto& data_pr = data_page_result.value();
// Value count: v1 header, else v2 header, else the chunk-level count.
1542 int64_t num_values = 0;
1544 data_pr.header.data_page_header.has_value()) {
1545 num_values = data_pr.header.data_page_header->num_values;
1547 data_pr.header.data_page_header_v2.has_value()) {
1548 num_values = data_pr.header.data_page_header_v2->num_values;
1550 num_values = col_meta.num_values;
1553 if (num_values < 0 || num_values > MAX_VALUES_PER_PAGE) {
1555 "num_values out of valid range"};
// Dictionary payload + entry count build the decoder; the data payload
// supplies the indices to resolve.
1559 DictionaryDecoder<T> decoder(dict_pr.data, dict_pr.size,
1560 num_dict_entries, col_meta.type);
1562 return decoder.decode(data_pr.data, data_pr.size,
1563 static_cast<size_t>(num_values));
/// Decodes a DELTA_BINARY_PACKED column chunk.
///
/// Reads the page at `col_meta.data_page_offset` through read_page_at()
/// (page ordinal 0), determines the value count from the page header or the
/// chunk metadata, and decodes according to T.
///
/// @tparam T  Must be int32_t or int64_t; any other type yields the
///            "only supports INT32/INT64" error.
/// @param col_meta  Column-chunk metadata.
/// @param col_name  Column name forwarded to read_page_at().
/// @param rg_index  Row-group index forwarded to read_page_at().
1567 template <
typename T>
1568 expected<std::vector<T>> read_column_delta(
1569 const thrift::ColumnMetaData& col_meta,
1570 const std::string& col_name =
"",
1571 int32_t rg_index = 0) {
1572 auto page_result = read_page_at(col_meta.data_page_offset,
1574 col_name, rg_index, 0);
1575 if (!page_result)
return page_result.error();
1577 auto& pr = page_result.value();
// Value count: v1 header, else v2 header, else the chunk-level count.
1579 int64_t num_values = 0;
1581 pr.header.data_page_header.has_value()) {
1582 num_values = pr.header.data_page_header->num_values;
1584 pr.header.data_page_header_v2.has_value()) {
1585 num_values = pr.header.data_page_header_v2->num_values;
1587 num_values = col_meta.num_values;
// The count is obtained through an expected-returning step, so an invalid
// num_values surfaces as an error instead of being cast blindly.
1591 if (!count_result)
return count_result.error();
1592 size_t count = *count_result;
// Compile-time dispatch on the integer width.
1594 if constexpr (std::is_same_v<T, int32_t>) {
1596 }
else if constexpr (std::is_same_v<T, int64_t>) {
1600 "DELTA_BINARY_PACKED only supports INT32/INT64"};
/// Decodes a BYTE_STREAM_SPLIT column chunk.
///
/// Reads the page at `col_meta.data_page_offset` through read_page_at()
/// (page ordinal 0), determines the value count from the page header or the
/// chunk metadata, and decodes according to T.
///
/// @tparam T  Must be float or double; any other type yields the
///            "only supports FLOAT/DOUBLE" error.
/// @param col_meta  Column-chunk metadata.
/// @param col_name  Column name forwarded to read_page_at().
/// @param rg_index  Row-group index forwarded to read_page_at().
1605 template <
typename T>
1606 expected<std::vector<T>> read_column_bss(
1607 const thrift::ColumnMetaData& col_meta,
1608 const std::string& col_name =
"",
1609 int32_t rg_index = 0) {
1610 auto page_result = read_page_at(col_meta.data_page_offset,
1612 col_name, rg_index, 0);
1613 if (!page_result)
return page_result.error();
1615 auto& pr = page_result.value();
// Value count: v1 header, else v2 header, else the chunk-level count.
1617 int64_t num_values = 0;
1619 pr.header.data_page_header.has_value()) {
1620 num_values = pr.header.data_page_header->num_values;
1622 pr.header.data_page_header_v2.has_value()) {
1623 num_values = pr.header.data_page_header_v2->num_values;
1625 num_values = col_meta.num_values;
// The count is obtained through an expected-returning step, so an invalid
// num_values surfaces as an error instead of being cast blindly.
1629 if (!count_result)
return count_result.error();
1630 size_t count = *count_result;
// Compile-time dispatch on the floating-point width.
1632 if constexpr (std::is_same_v<T, float>) {
1634 }
else if constexpr (std::is_same_v<T, double>) {
1638 "BYTE_STREAM_SPLIT only supports FLOAT/DOUBLE"};
/// Decodes an RLE-encoded boolean column chunk.
///
/// Reads the page at `col_meta.data_page_offset` through read_page_at()
/// (page ordinal 0), decodes the payload with a bit width of 1, and maps
/// every non-zero decoded value to `true`.
///
/// @tparam T  Must be bool; any other type yields the
///            "requires bool type" error.
/// @param col_meta  Column-chunk metadata.
/// @param col_name  Column name forwarded to read_page_at().
/// @param rg_index  Row-group index forwarded to read_page_at().
1643 template <
typename T>
1644 expected<std::vector<T>> read_column_rle_bool(
1645 const thrift::ColumnMetaData& col_meta,
1646 const std::string& col_name =
"",
1647 int32_t rg_index = 0) {
1648 auto page_result = read_page_at(col_meta.data_page_offset,
1650 col_name, rg_index, 0);
1651 if (!page_result)
return page_result.error();
1653 auto& pr = page_result.value();
// Value count: v1 header, else v2 header, else the chunk-level count.
1655 int64_t num_values = 0;
1657 pr.header.data_page_header.has_value()) {
1658 num_values = pr.header.data_page_header->num_values;
1660 pr.header.data_page_header_v2.has_value()) {
1661 num_values = pr.header.data_page_header_v2->num_values;
1663 num_values = col_meta.num_values;
// NOTE(review): num_values comes from file metadata and is cast to size_t
// here without a visible range check (the sibling delta/BSS readers go
// through a checked count). A negative or oversized value would reach
// reserve() below as an enormous request — confirm a guard exists or add
// the same validation.
1666 size_t count =
static_cast<size_t>(num_values);
1668 if constexpr (std::is_same_v<T, bool>) {
1671 pr.data, pr.size, 1, count);
// Emit at most `count` values, and never more than were actually decoded.
1673 std::vector<bool> result;
1674 result.reserve(count);
1675 for (
size_t i = 0; i < count && i < indices.size(); ++i) {
1676 result.push_back(indices[i] != 0);
1681 "RLE encoding for booleans requires bool type"};
/// @brief Renders decoded boolean values as text.
///
/// Each element becomes the literal "true" or "false"; order and length of
/// the input are preserved.
///
/// @param vals  Boolean values to convert (may be empty).
/// @return One string per input element, in the same order.
static std::vector<std::string> to_string_vec(const std::vector<bool>& vals) {
    std::vector<std::string> result;
    result.reserve(vals.size());  // exactly one output string per input value
    for (const bool v : vals) {
        // emplace_back builds the std::string in place from the literal.
        result.emplace_back(v ? "true" : "false");
    }
    return result;
}
/// @brief Renders decoded numeric values as text via std::to_string.
///
/// Order and length of the input are preserved.
///
/// @tparam T    Any type accepted by std::to_string.
/// @param vals  Values to convert (may be empty).
/// @return One string per input element, in the same order.
///
/// @note For floating-point T, std::to_string produces fixed notation with
///       six digits after the decimal point, so very small magnitudes print
///       as "0.000000" and the text does not round-trip. The formatting is
///       kept unchanged here because callers may depend on it.
template <typename T>
static std::vector<std::string> to_string_vec(const std::vector<T>& vals) {
    std::vector<std::string> result;
    result.reserve(vals.size());  // exactly one output string per input value
    for (const auto& v : vals) {
        result.push_back(std::to_string(v));
    }
    return result;
}
1707 static std::string hex_encode(
const std::vector<uint8_t>& bytes) {
1708 static constexpr char hex_chars[] =
"0123456789abcdef";
1710 result.reserve(bytes.size() * 2);
1711 for (uint8_t b : bytes) {
1712 result.push_back(hex_chars[(b >> 4) & 0x0F]);
1713 result.push_back(hex_chars[b & 0x0F]);