304 const std::filesystem::path& path,
309 writer.schema_ = schema;
310 writer.options_ = options;
314 if (path.has_parent_path()) {
316 std::filesystem::create_directories(path.parent_path(), ec);
320 writer.file_.open(path, std::ios::binary | std::ios::trunc);
321 if (!writer.file_.is_open()) {
323 "Failed to open file for writing: " + path.string()};
328 writer.file_offset_ = 4;
331 writer.init_column_writers();
334#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
335 if (options.encryption) {
336 writer.encryptor_ = std::make_unique<crypto::FileEncryptor>(*options.encryption);
341 if (options.enable_bloom_filter) {
342 writer.bloom_filters_.resize(schema.
num_columns());
346 if (options.enable_page_index) {
347 writer.col_index_builders_.resize(schema.
num_columns());
374 "Row has " + std::to_string(values.size()) +
375 " values, schema has " + std::to_string(schema_.
num_columns()) +
379 pending_rows_.push_back(values);
382 if (
static_cast<int64_t
>(pending_rows_.size()) >= options_.
row_group_size) {
418 template <
typename T>
420 const T* values,
size_t count) {
426 "Column index " + std::to_string(col_index) +
427 " out of range (schema has " +
428 std::to_string(schema_.
num_columns()) +
" columns)"};
432 constexpr PhysicalType expected_pt = parquet_type_of_v<T>;
434 if (expected_pt != actual_pt) {
436 "Type mismatch for column " + std::to_string(col_index) +
437 " (\"" + schema_.
column(col_index).
name +
"\")" +
438 ": schema physical type is " +
439 std::to_string(
static_cast<int>(actual_pt)) +
440 " but write_column<T> maps to " +
441 std::to_string(
static_cast<int>(expected_pt))};
444 col_writers_[col_index].write_batch(values, count);
445 col_row_counts_[col_index] = col_writers_[col_index].num_values();
448 if (!bloom_filters_.empty()) {
449 bloom_insert_typed(col_index, values, count);
468 const std::string* values,
475 "Column index " + std::to_string(col_index) +
484 "Type mismatch for column " + std::to_string(col_index) +
485 " (\"" + schema_.
column(col_index).
name +
"\")" +
486 ": schema physical type is " +
487 std::to_string(
static_cast<int>(actual_pt)) +
488 " but write_column was called with std::string (BYTE_ARRAY)"};
496 for (
size_t i = 0; i < count; ++i) {
497 col_writers_[col_index].write_fixed_len_byte_array(
498 reinterpret_cast<const uint8_t*
>(values[i].data()),
502 col_writers_[col_index].write_batch(values, count);
504 col_row_counts_[col_index] = col_writers_[col_index].num_values();
507 if (!bloom_filters_.empty()) {
508 bloom_insert_typed(col_index, values, count);
538 ensure_snappy_registered();
541 if (!pending_rows_.empty()) {
542 auto result = encode_pending_rows();
543 if (!result)
return result;
547 bool has_data =
false;
548 for (
size_t c = 0; c < col_writers_.size(); ++c) {
549 if (col_writers_[c].num_values() > 0) {
559 int64_t rg_num_rows = col_writers_[0].num_values();
560 for (
size_t c = 1; c < col_writers_.size(); ++c) {
561 if (col_writers_[c].num_values() != rg_num_rows) {
563 "Column " + std::to_string(c) +
" has " +
564 std::to_string(col_writers_[c].num_values()) +
565 " values, expected " + std::to_string(rg_num_rows)};
573 rg.
columns.resize(col_writers_.size());
576 for (
size_t c = 0; c < col_writers_.size(); ++c) {
577 const auto& cw = col_writers_[c];
578 const auto& col_desc = schema_.
column(c);
581 Encoding col_encoding = choose_encoding(c, col_desc, cw);
588 cw.data().data(), cw.data().size());
592 int64_t total_uncompressed = 0;
593 int64_t total_compressed = 0;
594 std::unordered_set<Encoding> used_encodings;
597 int64_t column_offset = file_offset_;
598 int64_t dict_page_offset = -1;
599 int64_t data_page_offset = -1;
607 auto dict_result = write_dictionary_column(
608 c, col_desc, cw, col_codec);
609 if (!dict_result)
return dict_result.error();
611 const auto& dict_info = *dict_result;
612 total_uncompressed = dict_info.total_uncompressed;
613 total_compressed = dict_info.total_compressed;
614 used_encodings = dict_info.used_encodings;
615 dict_page_offset = dict_info.dict_page_offset;
616 data_page_offset = dict_info.data_page_offset;
619 if (!col_index_builders_.empty()) {
620 auto& builder = col_index_builders_[c];
621 builder.start_page();
622 builder.set_first_row_index(0);
623 builder.set_page_location(dict_info.data_page_offset,
624 static_cast<int32_t
>(dict_info.total_compressed));
626 const auto& cw_stats_pi = cw.statistics();
627 if (cw_stats_pi.has_min_max()) {
628 const auto& min_b = cw_stats_pi.min_bytes();
629 const auto& max_b = cw_stats_pi.max_bytes();
630 builder.set_min(std::string(min_b.begin(), min_b.end()));
631 builder.set_max(std::string(max_b.begin(), max_b.end()));
633 builder.set_null_page(cw_stats_pi.null_count() == cw.num_values());
634 builder.set_null_count(cw_stats_pi.null_count());
640 auto encoded = encode_column_data(cw, col_encoding, col_desc.physical_type);
642 if (encoded.size() >
static_cast<size_t>(INT32_MAX)) {
644 "encoded page size exceeds int32 limit (2 GiB)"};
646 int32_t uncompressed_size =
static_cast<int32_t
>(encoded.size());
649 const uint8_t* page_data = encoded.data();
650 size_t page_data_size = encoded.size();
651 std::vector<uint8_t> compressed_buf;
652 int32_t compressed_size = uncompressed_size;
655 auto comp_result =
compress(col_codec, page_data, page_data_size);
656 if (!comp_result)
return comp_result.error();
657 compressed_buf = std::move(*comp_result);
658 if (compressed_buf.size() >
static_cast<size_t>(INT32_MAX))
660 compressed_size =
static_cast<int32_t
>(compressed_buf.size());
661 page_data = compressed_buf.data();
662 page_data_size = compressed_buf.size();
666#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
667 std::vector<uint8_t> encrypted_buf;
668 if (encryptor_ && encryptor_->is_column_encrypted(col_desc.name)) {
669 auto enc_result = encryptor_->encrypt_column_page(
670 page_data, page_data_size, col_desc.name,
671 static_cast<int32_t
>(row_groups_.size()), 0);
672 if (!enc_result)
return enc_result.error();
673 encrypted_buf = std::move(*enc_result);
674 compressed_size =
static_cast<int32_t
>(encrypted_buf.size());
675 page_data = encrypted_buf.data();
676 page_data_size = encrypted_buf.size();
689 page_data, page_data_size));
692 dph.
num_values =
static_cast<int32_t
>(cw.num_values());
701 const auto& raw_header_bytes = header_enc.
data();
703 const uint8_t* header_write_data = raw_header_bytes.data();
704 size_t header_write_size = raw_header_bytes.size();
705#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
706 std::vector<uint8_t> header_record_buf;
707 if (encryptor_ && encryptor_->is_column_encrypted(col_desc.name)) {
708 auto enc_header = encryptor_->encrypt_data_page_header(
709 raw_header_bytes.data(), raw_header_bytes.size(),
710 col_desc.name,
static_cast<int32_t
>(row_groups_.size()), 0);
711 if (!enc_header)
return enc_header.error();
713 header_write_data = header_record_buf.data();
714 header_write_size = header_record_buf.size();
718 data_page_offset = file_offset_;
720 write_raw(header_write_data, header_write_size);
721 write_raw(page_data, page_data_size);
723 total_uncompressed =
static_cast<int64_t
>(header_write_size) +
724 static_cast<int64_t
>(uncompressed_size);
725 total_compressed =
static_cast<int64_t
>(header_write_size) +
726 static_cast<int64_t
>(compressed_size);
728 used_encodings.insert(col_encoding);
731 if (!col_index_builders_.empty()) {
732 auto& builder = col_index_builders_[c];
733 builder.start_page();
734 builder.set_first_row_index(0);
735 builder.set_page_location(data_page_offset,
736 compressed_size +
static_cast<int32_t
>(header_write_size));
738 const auto& cw_stats_pi = cw.statistics();
739 if (cw_stats_pi.has_min_max()) {
740 const auto& min_b = cw_stats_pi.min_bytes();
741 const auto& max_b = cw_stats_pi.max_bytes();
742 builder.set_min(std::string(min_b.begin(), min_b.end()));
743 builder.set_max(std::string(max_b.begin(), max_b.end()));
745 builder.set_null_page(cw_stats_pi.null_count() == cw.num_values());
746 builder.set_null_count(cw_stats_pi.null_count());
755 cmd.
type = col_desc.physical_type;
757 cmd.
codec = col_codec;
764 cmd.
encodings.assign(used_encodings.begin(), used_encodings.end());
767 if (dict_page_offset >= 0) {
772 const auto& cw_stats = cw.statistics();
773 if (cw_stats.has_min_max()) {
778 const auto& min_b = cw_stats.min_bytes();
779 const auto& max_b = cw_stats.max_bytes();
780 stats.
min_value = std::string(min_b.begin(), min_b.end());
781 stats.
max_value = std::string(max_b.begin(), max_b.end());
786 if (cw_stats.distinct_count().has_value()) {
796 if (!bloom_filters_.empty() && bloom_filters_[c]) {
797 int64_t bf_offset = file_offset_;
798 const auto& bf_data = bloom_filters_[c]->data();
799 uint32_t bf_size =
static_cast<uint32_t
>(bf_data.size());
802 write_raw_le32(bf_size);
803 write_raw(bf_data.data(), bf_data.size());
812 if (!col_index_builders_.empty()) {
813 auto& builder = col_index_builders_[c];
818 col_idx.serialize(ci_enc);
819 int64_t ci_offset = file_offset_;
820 write_raw(ci_enc.
data().data(), ci_enc.
data().size());
825 auto off_idx = builder.build_offset_index();
827 off_idx.serialize(oi_enc);
828 int64_t oi_offset = file_offset_;
829 write_raw(oi_enc.
data().data(), oi_enc.
data().size());
840 row_groups_.push_back(std::move(rg));
841 total_rows_ += rg_num_rows;
844 for (
auto& cw : col_writers_) {
847 for (
auto& count : col_row_counts_) {
852 for (
auto& bf : bloom_filters_) {
857 for (
auto& builder : col_index_builders_) {
898 return flush_result.error();
914 fmd.
schema.push_back(root);
917 for (
size_t c = 0; c < schema_.
num_columns(); ++c) {
918 const auto& col_desc = schema_.
column(c);
921 elem.
type = col_desc.physical_type;
922 elem.
name = col_desc.name;
927 col_desc.type_length > 0) {
946 if (col_desc.precision > 0) elem.
precision = col_desc.precision;
947 if (col_desc.scale >= 0) elem.
scale = col_desc.scale;
950 fmd.
schema.push_back(elem);
960#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
962 auto file_props = encryptor_->file_properties();
963 auto props_bytes = file_props.serialize();
964 std::string props_str(props_bytes.begin(), props_bytes.end());
967 enc_kv.
key =
"signet.encryption.properties";
968 enc_kv.
value = std::move(props_str);
980 const auto& footer_bytes = enc.
data();
983 int64_t footer_start = file_offset_;
988#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
989 if (encryptor_ && encryptor_->config().encrypt_footer) {
990 auto enc_footer = encryptor_->encrypt_footer(
991 footer_bytes.data(), footer_bytes.size());
995 return enc_footer.error();
997 const auto& encrypted_footer = *enc_footer;
1000 write_raw(encrypted_footer.data(), encrypted_footer.size());
1003 uint32_t footer_len =
static_cast<uint32_t
>(encrypted_footer.size());
1004 write_raw_le32(footer_len);
1008 }
else if (encryptor_ && !encryptor_->config().footer_key.empty()) {
1011 auto signed_footer = encryptor_->sign_footer(
1012 footer_bytes.data(), footer_bytes.size());
1013 if (!signed_footer) {
1016 return signed_footer.error();
1018 const auto& sf = *signed_footer;
1021 write_raw(sf.data(), sf.size());
1024 uint32_t footer_len =
static_cast<uint32_t
>(sf.size());
1025 write_raw_le32(footer_len);
1032 write_raw(footer_bytes.data(), footer_bytes.size());
1035 uint32_t footer_len =
static_cast<uint32_t
>(footer_bytes.size());
1036 write_raw_le32(footer_len);
1040#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
1049 return build_write_stats();
1075 : schema_(std::move(other.schema_))
1076 , options_(std::move(other.options_))
1077 , path_(std::move(other.path_))
1078 , file_(std::move(other.file_))
1079 , file_offset_(other.file_offset_)
1080 , col_writers_(std::move(other.col_writers_))
1081 , col_row_counts_(std::move(other.col_row_counts_))
1082 , pending_rows_(std::move(other.pending_rows_))
1083 , row_groups_(std::move(other.row_groups_))
1084 , total_rows_(other.total_rows_)
1085 , open_(other.open_)
1086#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
1087 , encryptor_(std::move(other.encryptor_))
1089 , bloom_filters_(std::move(other.bloom_filters_))
1090 , col_index_builders_(std::move(other.col_index_builders_))
1092 other.open_ =
false;
1093 other.file_offset_ = 0;
1094 other.total_rows_ = 0;
1100 if (
this != &other) {
1106 schema_ = std::move(other.schema_);
1107 options_ = std::move(other.options_);
1108 path_ = std::move(other.path_);
1109 file_ = std::move(other.file_);
1110 file_offset_ = other.file_offset_;
1111 col_writers_ = std::move(other.col_writers_);
1112 col_row_counts_ = std::move(other.col_row_counts_);
1113 pending_rows_ = std::move(other.pending_rows_);
1114 row_groups_ = std::move(other.row_groups_);
1115 total_rows_ = other.total_rows_;
1116 open_ = other.open_;
1117#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
1118 encryptor_ = std::move(other.encryptor_);
1120 bloom_filters_ = std::move(other.bloom_filters_);
1121 col_index_builders_ = std::move(other.col_index_builders_);
1123 other.open_ =
false;
1124 other.file_offset_ = 0;
1125 other.total_rows_ = 0;
1139 return total_rows_ +
static_cast<int64_t
>(pending_rows_.size());
1146 return static_cast<int64_t
>(row_groups_.size());
1151 [[nodiscard]]
bool is_open()
const {
return open_; }
1178 const std::filesystem::path& csv_input,
1179 const std::filesystem::path& parquet_output,
1183 std::ifstream csv(csv_input);
1184 if (!csv.is_open()) {
1186 "Failed to open CSV file: " + csv_input.string()};
1190 std::string header_line;
1191 if (!std::getline(csv, header_line)) {
1195 auto col_names = split_csv_line(header_line);
1196 if (col_names.empty()) {
1200 size_t num_cols = col_names.size();
1203 std::vector<std::vector<std::string>> rows;
1205 while (std::getline(csv, line)) {
1206 if (line.empty())
continue;
1207 auto fields = split_csv_line(line);
1209 fields.resize(num_cols);
1210 rows.push_back(std::move(fields));
1223 for (
size_t c = 0; c < num_cols; ++c) {
1224 bool all_int64 =
true;
1225 bool all_double =
true;
1226 bool all_bool =
true;
1228 for (
const auto& row : rows) {
1229 const std::string& val = row[c];
1230 if (val.empty())
continue;
1235 auto [ptr, ec] = std::from_chars(val.data(),
1236 val.data() + val.size(),
1238 if (ec != std::errc{} || ptr != val.data() + val.size()) {
1253 if (val !=
"true" && val !=
"false" &&
1254 val !=
"TRUE" && val !=
"FALSE" &&
1255 val !=
"True" && val !=
"False" &&
1256 val !=
"1" && val !=
"0") {
1266 }
else if (all_double) {
1269 }
else if (all_bool) {
1279 std::vector<ColumnDescriptor> col_descs;
1280 col_descs.reserve(num_cols);
1281 for (
size_t c = 0; c < num_cols; ++c) {
1282 ColumnDescriptor cd;
1283 cd.name = col_names[c];
1284 cd.physical_type = detected_types[c];
1285 cd.logical_type = detected_logical[c];
1286 col_descs.push_back(std::move(cd));
1289 Schema schema(
"csv_data", std::move(col_descs));
1293 if (!writer_result) {
1294 return writer_result.error();
1296 auto& writer = *writer_result;
1299 for (
const auto& row : rows) {
1307 auto close_result = writer.
close();
1308 if (!close_result)
return close_result.error();
1309 return expected<void>{};
1320 std::filesystem::path path_;
1321 std::ofstream file_;
1322 int64_t file_offset_ = 0;
1323 std::vector<ColumnWriter> col_writers_;
1324 std::vector<int64_t> col_row_counts_;
1325 std::vector<std::vector<std::string>> pending_rows_;
1326 std::vector<thrift::RowGroup> row_groups_;
1327 int64_t total_rows_ = 0;
1329#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
1330 std::unique_ptr<crypto::FileEncryptor> encryptor_;
1332 std::vector<std::unique_ptr<SplitBlockBloomFilter>> bloom_filters_;
1333 std::vector<ColumnIndexBuilder> col_index_builders_;
1337 void init_column_writers() {
1338 col_writers_.clear();
1339 col_writers_.reserve(schema_.num_columns());
1340 col_row_counts_.resize(schema_.num_columns(), 0);
1341 for (
size_t c = 0; c < schema_.num_columns(); ++c) {
1342 col_writers_.emplace_back(schema_.column(c).physical_type,
1343 schema_.column(c).type_length);
1349 WriteStats build_write_stats()
const {
1351 stats.file_size_bytes = file_offset_;
1352 stats.total_rows = total_rows_;
1353 stats.total_row_groups =
static_cast<int64_t
>(row_groups_.size());
1356 size_t num_cols = schema_.num_columns();
1357 stats.columns.resize(num_cols);
1359 for (
size_t c = 0; c < num_cols; ++c) {
1360 auto& col_stats = stats.columns[c];
1361 col_stats.column_name = schema_.column(c).name;
1362 col_stats.physical_type = schema_.column(c).physical_type;
1365 for (
const auto& rg : row_groups_) {
1366 for (
size_t c = 0; c < rg.columns.size() && c < num_cols; ++c) {
1367 if (!rg.columns[c].meta_data.has_value())
continue;
1368 const auto& cmd = *rg.columns[c].meta_data;
1369 auto& col_stats = stats.columns[c];
1371 col_stats.uncompressed_bytes += cmd.total_uncompressed_size;
1372 col_stats.compressed_bytes += cmd.total_compressed_size;
1373 col_stats.num_values += cmd.num_values;
1377 if (!cmd.encodings.empty()) {
1378 col_stats.encoding = cmd.encodings[0];
1382 if (cmd.statistics.has_value() && cmd.statistics->null_count.has_value()) {
1383 col_stats.null_count += *cmd.statistics->null_count;
1386 stats.total_uncompressed_bytes += cmd.total_uncompressed_size;
1387 stats.total_compressed_bytes += cmd.total_compressed_size;
1392 if (stats.total_compressed_bytes > 0) {
1393 stats.compression_ratio =
static_cast<double>(stats.total_uncompressed_bytes)
1394 /
static_cast<double>(stats.total_compressed_bytes);
1396 if (stats.total_rows > 0) {
1397 stats.bytes_per_row =
static_cast<double>(stats.file_size_bytes)
1398 /
static_cast<double>(stats.total_rows);
1406 void write_raw(
const uint8_t* data,
size_t len) {
1407 file_.write(
reinterpret_cast<const char*
>(data),
static_cast<std::streamsize
>(len));
1408 if (!file_.good()) {
1409 throw std::runtime_error(
"ParquetWriter::write_raw: I/O error after writing "
1410 + std::to_string(len) +
" bytes at offset "
1411 + std::to_string(file_offset_));
1413 file_offset_ +=
static_cast<int64_t
>(len);
1416 void write_raw_le32(uint32_t val) {
1418 bytes[0] =
static_cast<uint8_t
>((val ) & 0xFF);
1419 bytes[1] =
static_cast<uint8_t
>((val >> 8) & 0xFF);
1420 bytes[2] =
static_cast<uint8_t
>((val >> 16) & 0xFF);
1421 bytes[3] =
static_cast<uint8_t
>((val >> 24) & 0xFF);
1422 write_raw(bytes, 4);
1427 [[nodiscard]] expected<void> encode_pending_rows() {
1428 for (
size_t c = 0; c < schema_.num_columns(); ++c) {
1429 const auto& col_desc = schema_.column(c);
1430 auto& cw = col_writers_[c];
1433 bool bf_active = bloom_ensure_filter(c);
1435 for (
const auto& row : pending_rows_) {
1436 const std::string& val = row[c];
1438 switch (col_desc.physical_type) {
1440 bool b = (val ==
"true" || val ==
"TRUE" || val ==
"True" ||
1444 if (bf_active) bloom_filters_[c]->insert_value(
static_cast<int32_t
>(b));
1449 auto [ptr, ec] = std::from_chars(val.data(),
1450 val.data() + val.size(),
1452 if (ec != std::errc{}) {
1456 cw.write_int32(parsed);
1457 if (bf_active) bloom_filters_[c]->insert_value(parsed);
1462 auto [ptr, ec] = std::from_chars(val.data(),
1463 val.data() + val.size(),
1465 if (ec != std::errc{}) {
1469 cw.write_int64(parsed);
1470 if (bf_active) bloom_filters_[c]->insert_value(parsed);
1476 if (bf_active) bloom_filters_[c]->insert_value(f);
1482 if (bf_active) bloom_filters_[c]->insert_value(d);
1486 cw.write_byte_array(val);
1487 if (bf_active) bloom_filters_[c]->insert_value(val);
1491 const auto& cd = schema_.column(c);
1492 if (cd.type_length > 0 &&
1493 val.size() !=
static_cast<size_t>(cd.type_length)) {
1495 "FIXED_LEN_BYTE_ARRAY column '" + cd.name +
1496 "': CSV value length " + std::to_string(val.size()) +
1497 " != type_length " + std::to_string(cd.type_length)};
1499 cw.write_fixed_len_byte_array(
1500 reinterpret_cast<const uint8_t*
>(val.data()), val.size());
1501 if (bf_active) bloom_filters_[c]->insert_value(val);
1506 cw.write_byte_array(val);
1507 if (bf_active) bloom_filters_[c]->insert_value(val);
1514 pending_rows_.clear();
1515 return expected<void>{};
1520 static void ensure_snappy_registered() {
1521 static std::once_flag flag;
1528 [[nodiscard]]
Encoding choose_encoding(
1530 const ColumnDescriptor& col_desc,
1531 const ColumnWriter& cw)
const {
1534 auto it = options_.column_encodings.find(col_desc.name);
1535 if (it != options_.column_encodings.end()) {
1540 if (options_.auto_encoding) {
1541 switch (col_desc.physical_type) {
1552 const auto& stats = cw.statistics();
1553 if (stats.distinct_count().has_value() && cw.num_values() > 0) {
1554 double ratio =
static_cast<double>(*stats.distinct_count()) /
1555 static_cast<double>(cw.num_values());
1577 return options_.default_encoding;
1585 [[nodiscard]]
static std::vector<uint8_t> encode_column_data(
1586 const ColumnWriter& cw,
1590 const auto& plain_data = cw.data();
1591 int64_t num_vals = cw.num_values();
1598 size_t count =
static_cast<size_t>(num_vals);
1599 std::vector<int32_t> values(count);
1600 for (
size_t i = 0; i < count; ++i) {
1601 std::memcpy(&values[i], plain_data.data() + i * 4, 4);
1606 size_t count =
static_cast<size_t>(num_vals);
1607 std::vector<int64_t> values(count);
1608 for (
size_t i = 0; i < count; ++i) {
1609 std::memcpy(&values[i], plain_data.data() + i * 8, 8);
1614 return std::vector<uint8_t>(plain_data.begin(), plain_data.end());
1619 size_t count =
static_cast<size_t>(num_vals);
1620 const auto* float_ptr =
reinterpret_cast<const float*
>(plain_data.data());
1624 size_t count =
static_cast<size_t>(num_vals);
1625 const auto* double_ptr =
reinterpret_cast<const double*
>(plain_data.data());
1629 return std::vector<uint8_t>(plain_data.begin(), plain_data.end());
1635 size_t count =
static_cast<size_t>(num_vals);
1637 std::vector<uint32_t> bool_vals(count);
1638 for (
size_t i = 0; i < count; ++i) {
1639 size_t byte_idx = i / 8;
1640 size_t bit_idx = i % 8;
1641 bool_vals[i] = (plain_data[byte_idx] >> bit_idx) & 1;
1647 return std::vector<uint8_t>(plain_data.begin(), plain_data.end());
1653 return std::vector<uint8_t>(plain_data.begin(), plain_data.end());
1660 struct DictColumnResult {
1661 int64_t total_uncompressed;
1662 int64_t total_compressed;
1663 std::unordered_set<Encoding> used_encodings;
1664 int64_t dict_page_offset;
1665 int64_t data_page_offset;
1669 [[nodiscard]]
static std::vector<std::string> extract_byte_array_strings(
1670 const ColumnWriter& cw) {
1671 const auto& buf = cw.data();
1672 size_t count =
static_cast<size_t>(cw.num_values());
1673 std::vector<std::string> result;
1674 result.reserve(count);
1675 const size_t buf_size = buf.size();
1677 for (
size_t i = 0; i < count; ++i) {
1679 if (pos + 4 > buf_size)
break;
1681 std::memcpy(&len, buf.data() + pos, 4);
1683 if (len > buf_size - pos - 4)
break;
1685 result.emplace_back(
1686 reinterpret_cast<const char*
>(buf.data() + pos), len);
1693 [[nodiscard]]
static std::vector<int32_t> extract_int32_values(
1694 const ColumnWriter& cw) {
1695 size_t count =
static_cast<size_t>(cw.num_values());
1696 std::vector<int32_t> result(count);
1697 for (
size_t i = 0; i < count; ++i) {
1698 std::memcpy(&result[i], cw.data().data() + i * 4, 4);
1704 [[nodiscard]]
static std::vector<int64_t> extract_int64_values(
1705 const ColumnWriter& cw) {
1706 size_t count =
static_cast<size_t>(cw.num_values());
1707 std::vector<int64_t> result(count);
1708 for (
size_t i = 0; i < count; ++i) {
1709 std::memcpy(&result[i], cw.data().data() + i * 8, 8);
1715 [[nodiscard]]
static std::vector<float> extract_float_values(
1716 const ColumnWriter& cw) {
1717 size_t count =
static_cast<size_t>(cw.num_values());
1718 std::vector<float> result(count);
1719 for (
size_t i = 0; i < count; ++i) {
1720 std::memcpy(&result[i], cw.data().data() + i * 4, 4);
1726 [[nodiscard]]
static std::vector<double> extract_double_values(
1727 const ColumnWriter& cw) {
1728 size_t count =
static_cast<size_t>(cw.num_values());
1729 std::vector<double> result(count);
1730 for (
size_t i = 0; i < count; ++i) {
1731 std::memcpy(&result[i], cw.data().data() + i * 8, 8);
1738 [[nodiscard]] expected<DictColumnResult> write_dictionary_column(
1740 const ColumnDescriptor& col_desc,
1741 const ColumnWriter& cw,
1744 DictColumnResult info;
1745 info.total_uncompressed = 0;
1746 info.total_compressed = 0;
1751 std::vector<uint8_t> dict_page_data;
1752 std::vector<uint8_t> indices_page_data;
1753 int32_t num_dict_entries = 0;
1755 switch (col_desc.physical_type) {
1757 auto strings = extract_byte_array_strings(cw);
1758 DictionaryEncoder<std::string> enc;
1759 for (
const auto& s : strings) enc.put(s);
1761 dict_page_data = enc.dictionary_page();
1762 indices_page_data = enc.indices_page();
1763 num_dict_entries =
static_cast<int32_t
>(enc.dictionary_size());
1767 auto vals = extract_int32_values(cw);
1768 DictionaryEncoder<int32_t> enc;
1769 for (
auto v : vals) enc.put(v);
1771 dict_page_data = enc.dictionary_page();
1772 indices_page_data = enc.indices_page();
1773 num_dict_entries =
static_cast<int32_t
>(enc.dictionary_size());
1777 auto vals = extract_int64_values(cw);
1778 DictionaryEncoder<int64_t> enc;
1779 for (
auto v : vals) enc.put(v);
1781 dict_page_data = enc.dictionary_page();
1782 indices_page_data = enc.indices_page();
1783 num_dict_entries =
static_cast<int32_t
>(enc.dictionary_size());
1787 auto vals = extract_float_values(cw);
1788 DictionaryEncoder<float> enc;
1789 for (
auto v : vals) enc.put(v);
1791 dict_page_data = enc.dictionary_page();
1792 indices_page_data = enc.indices_page();
1793 num_dict_entries =
static_cast<int32_t
>(enc.dictionary_size());
1797 auto vals = extract_double_values(cw);
1798 DictionaryEncoder<double> enc;
1799 for (
auto v : vals) enc.put(v);
1801 dict_page_data = enc.dictionary_page();
1802 indices_page_data = enc.indices_page();
1803 num_dict_entries =
static_cast<int32_t
>(enc.dictionary_size());
1810 "Dictionary encoding not supported for this type"};
1814 int32_t dict_uncompressed_size =
static_cast<int32_t
>(dict_page_data.size());
1815 const uint8_t* dict_write_data = dict_page_data.data();
1816 size_t dict_write_size = dict_page_data.size();
1817 std::vector<uint8_t> dict_compressed_buf;
1818 int32_t dict_compressed_size = dict_uncompressed_size;
1821 auto comp =
compress(col_codec, dict_page_data.data(), dict_page_data.size());
1822 if (!comp)
return comp.error();
1823 dict_compressed_buf = std::move(*comp);
1824 dict_compressed_size =
static_cast<int32_t
>(dict_compressed_buf.size());
1825 dict_write_data = dict_compressed_buf.data();
1826 dict_write_size = dict_compressed_buf.size();
1830#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
1831 std::vector<uint8_t> dict_encrypted_buf;
1832 if (encryptor_ && encryptor_->is_column_encrypted(col_desc.name)) {
1833 auto enc_result = encryptor_->encrypt_dict_page(
1834 dict_write_data, dict_write_size, col_desc.name,
1835 static_cast<int32_t
>(row_groups_.size()));
1836 if (!enc_result)
return enc_result.error();
1837 dict_encrypted_buf = std::move(*enc_result);
1838 dict_compressed_size =
static_cast<int32_t
>(dict_encrypted_buf.size());
1839 dict_write_data = dict_encrypted_buf.data();
1840 dict_write_size = dict_encrypted_buf.size();
1844 thrift::PageHeader dict_ph;
1846 dict_ph.uncompressed_page_size = dict_uncompressed_size;
1847 dict_ph.compressed_page_size = dict_compressed_size;
1849 dict_write_data, dict_write_size));
1851 thrift::DictionaryPageHeader dph;
1852 dph.num_values = num_dict_entries;
1854 dict_ph.dictionary_page_header = dph;
1856 thrift::CompactEncoder dict_header_enc;
1857 dict_ph.serialize(dict_header_enc);
1858 const auto& dict_header_bytes = dict_header_enc.data();
1860 info.dict_page_offset = file_offset_;
1862 write_raw(dict_header_bytes.data(), dict_header_bytes.size());
1863 write_raw(dict_write_data, dict_write_size);
1865 info.total_uncompressed +=
static_cast<int64_t
>(dict_header_bytes.size()) +
1866 static_cast<int64_t
>(dict_uncompressed_size);
1867 info.total_compressed +=
static_cast<int64_t
>(dict_header_bytes.size()) +
1868 static_cast<int64_t
>(dict_compressed_size);
1871 int32_t idx_uncompressed_size =
static_cast<int32_t
>(indices_page_data.size());
1872 const uint8_t* idx_write_data = indices_page_data.data();
1873 size_t idx_write_size = indices_page_data.size();
1874 std::vector<uint8_t> idx_compressed_buf;
1875 int32_t idx_compressed_size = idx_uncompressed_size;
1878 auto comp =
compress(col_codec, indices_page_data.data(), indices_page_data.size());
1879 if (!comp)
return comp.error();
1880 idx_compressed_buf = std::move(*comp);
1881 idx_compressed_size =
static_cast<int32_t
>(idx_compressed_buf.size());
1882 idx_write_data = idx_compressed_buf.data();
1883 idx_write_size = idx_compressed_buf.size();
1887#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
1888 std::vector<uint8_t> idx_encrypted_buf;
1889 if (encryptor_ && encryptor_->is_column_encrypted(col_desc.name)) {
1890 auto enc_result = encryptor_->encrypt_column_page(
1891 idx_write_data, idx_write_size, col_desc.name,
1892 static_cast<int32_t
>(row_groups_.size()), 0);
1893 if (!enc_result)
return enc_result.error();
1894 idx_encrypted_buf = std::move(*enc_result);
1895 idx_compressed_size =
static_cast<int32_t
>(idx_encrypted_buf.size());
1896 idx_write_data = idx_encrypted_buf.data();
1897 idx_write_size = idx_encrypted_buf.size();
1901 thrift::PageHeader idx_ph;
1903 idx_ph.uncompressed_page_size = idx_uncompressed_size;
1904 idx_ph.compressed_page_size = idx_compressed_size;
1906 idx_write_data, idx_write_size));
1908 thrift::DataPageHeader idx_dph;
1909 idx_dph.num_values =
static_cast<int32_t
>(cw.num_values());
1913 idx_ph.data_page_header = idx_dph;
1915 thrift::CompactEncoder idx_header_enc;
1916 idx_ph.serialize(idx_header_enc);
1917 const auto& raw_idx_header_bytes = idx_header_enc.data();
1919 const uint8_t* idx_header_write_data = raw_idx_header_bytes.data();
1920 size_t idx_header_write_size = raw_idx_header_bytes.size();
1921#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
1922 std::vector<uint8_t> idx_header_record_buf;
1923 if (encryptor_ && encryptor_->is_column_encrypted(col_desc.name)) {
1924 auto enc_header = encryptor_->encrypt_data_page_header(
1925 raw_idx_header_bytes.data(), raw_idx_header_bytes.size(),
1926 col_desc.name,
static_cast<int32_t
>(row_groups_.size()), 0);
1927 if (!enc_header)
return enc_header.error();
1929 idx_header_write_data = idx_header_record_buf.data();
1930 idx_header_write_size = idx_header_record_buf.size();
1934 info.data_page_offset = file_offset_;
1936 write_raw(idx_header_write_data, idx_header_write_size);
1937 write_raw(idx_write_data, idx_write_size);
1939 info.total_uncompressed +=
static_cast<int64_t
>(idx_header_write_size) +
1940 static_cast<int64_t
>(idx_uncompressed_size);
1941 info.total_compressed +=
static_cast<int64_t
>(idx_header_write_size) +
1942 static_cast<int64_t
>(idx_compressed_size);
1952 bool bloom_ensure_filter(
size_t col_index) {
1953 if (bloom_filters_.empty())
return false;
1954 if (bloom_filters_[col_index])
return true;
1957 const auto& col_name = schema_.column(col_index).name;
1958 if (!options_.bloom_filter_columns.empty() &&
1959 options_.bloom_filter_columns.find(col_name) ==
1960 options_.bloom_filter_columns.end()) {
1965 size_t ndv_estimate =
static_cast<size_t>(
1966 (std::max)(int64_t{1}, options_.row_group_size));
1967 bloom_filters_[col_index] = std::make_unique<SplitBlockBloomFilter>(
1968 ndv_estimate, options_.bloom_filter_fpr);
1973 template <
typename T>
1974 void bloom_insert_typed(
size_t col_index,
const T* values,
size_t count) {
1975 if (!bloom_ensure_filter(col_index))
return;
1976 auto& bf = bloom_filters_[col_index];
1977 for (
size_t i = 0; i < count; ++i) {
1978 bf->insert_value(values[i]);
1986 [[nodiscard]]
static std::vector<std::string> split_csv_line(
const std::string& line) {
1987 std::vector<std::string> fields;
1989 bool in_quotes =
false;
1992 while (i < line.size()) {
1998 if (i + 1 < line.size() && line[i + 1] ==
'"') {
2014 }
else if (ch ==
',') {
2017 fields.push_back(std::move(field));
2020 }
else if (ch ==
'\r') {
2032 fields.push_back(std::move(field));
/// @brief Strips leading and trailing spaces, tabs, carriage returns and
///        newlines from a string, in place.
/// @param s String to trim; becomes empty if it holds only such characters.
static void trim_string(std::string& s) {
    static const char* const kWhitespace = " \t\r\n";

    const std::string::size_type first = s.find_first_not_of(kWhitespace);
    if (first == std::string::npos) {
        // Nothing but whitespace (or already empty).
        s.clear();
        return;
    }

    // `first` exists, so a last non-whitespace character exists too.
    const std::string::size_type last = s.find_last_not_of(kWhitespace);
    s.erase(last + 1);   // drop the trailing run
    s.erase(0, first);   // then the leading run
}