304 const std::filesystem::path& path,
309 writer.schema_ = schema;
310 writer.options_ = options;
314 if (path.has_parent_path()) {
316 std::filesystem::create_directories(path.parent_path(), ec);
320 writer.file_.open(path, std::ios::binary | std::ios::trunc);
321 if (!writer.file_.is_open()) {
323 "Failed to open file for writing: " + path.string()};
328 writer.file_offset_ = 4;
331 writer.init_column_writers();
334#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
335 if (options.encryption) {
336 writer.encryptor_ = std::make_unique<crypto::FileEncryptor>(*options.encryption);
341 if (options.enable_bloom_filter) {
342 writer.bloom_filters_.resize(schema.
num_columns());
346 if (options.enable_page_index) {
347 writer.col_index_builders_.resize(schema.
num_columns());
374 "Row has " + std::to_string(values.size()) +
375 " values, schema has " + std::to_string(schema_.
num_columns()) +
379 pending_rows_.push_back(values);
382 if (
static_cast<int64_t
>(pending_rows_.size()) >= options_.
row_group_size) {
418 template <
typename T>
420 const T* values,
size_t count) {
426 "Column index " + std::to_string(col_index) +
427 " out of range (schema has " +
428 std::to_string(schema_.
num_columns()) +
" columns)"};
432 constexpr PhysicalType expected_pt = parquet_type_of_v<T>;
434 if (expected_pt != actual_pt) {
436 "Type mismatch for column " + std::to_string(col_index) +
437 " (\"" + schema_.
column(col_index).
name +
"\")" +
438 ": schema physical type is " +
439 std::to_string(
static_cast<int>(actual_pt)) +
440 " but write_column<T> maps to " +
441 std::to_string(
static_cast<int>(expected_pt))};
444 col_writers_[col_index].write_batch(values, count);
445 col_row_counts_[col_index] = col_writers_[col_index].num_values();
448 if (!bloom_filters_.empty()) {
449 bloom_insert_typed(col_index, values, count);
468 const std::string* values,
475 "Column index " + std::to_string(col_index) +
484 "Type mismatch for column " + std::to_string(col_index) +
485 " (\"" + schema_.
column(col_index).
name +
"\")" +
486 ": schema physical type is " +
487 std::to_string(
static_cast<int>(actual_pt)) +
488 " but write_column was called with std::string (BYTE_ARRAY)"};
491 col_writers_[col_index].write_batch(values, count);
492 col_row_counts_[col_index] = col_writers_[col_index].num_values();
495 if (!bloom_filters_.empty()) {
496 bloom_insert_typed(col_index, values, count);
526 ensure_snappy_registered();
529 if (!pending_rows_.empty()) {
530 auto result = encode_pending_rows();
531 if (!result)
return result;
535 bool has_data =
false;
536 for (
size_t c = 0; c < col_writers_.size(); ++c) {
537 if (col_writers_[c].num_values() > 0) {
547 int64_t rg_num_rows = col_writers_[0].num_values();
548 for (
size_t c = 1; c < col_writers_.size(); ++c) {
549 if (col_writers_[c].num_values() != rg_num_rows) {
551 "Column " + std::to_string(c) +
" has " +
552 std::to_string(col_writers_[c].num_values()) +
553 " values, expected " + std::to_string(rg_num_rows)};
561 rg.
columns.resize(col_writers_.size());
564 for (
size_t c = 0; c < col_writers_.size(); ++c) {
565 const auto& cw = col_writers_[c];
566 const auto& col_desc = schema_.
column(c);
569 Encoding col_encoding = choose_encoding(c, col_desc, cw);
576 cw.data().data(), cw.data().size());
580 int64_t total_uncompressed = 0;
581 int64_t total_compressed = 0;
582 std::unordered_set<Encoding> used_encodings;
585 int64_t column_offset = file_offset_;
586 int64_t dict_page_offset = -1;
587 int64_t data_page_offset = -1;
595 auto dict_result = write_dictionary_column(
596 c, col_desc, cw, col_codec);
597 if (!dict_result)
return dict_result.error();
599 const auto& dict_info = *dict_result;
600 total_uncompressed = dict_info.total_uncompressed;
601 total_compressed = dict_info.total_compressed;
602 used_encodings = dict_info.used_encodings;
603 dict_page_offset = dict_info.dict_page_offset;
604 data_page_offset = dict_info.data_page_offset;
607 if (!col_index_builders_.empty()) {
608 auto& builder = col_index_builders_[c];
609 builder.start_page();
610 builder.set_first_row_index(0);
611 builder.set_page_location(dict_info.data_page_offset,
612 static_cast<int32_t
>(dict_info.total_compressed));
614 const auto& cw_stats_pi = cw.statistics();
615 if (cw_stats_pi.has_min_max()) {
616 const auto& min_b = cw_stats_pi.min_bytes();
617 const auto& max_b = cw_stats_pi.max_bytes();
618 builder.set_min(std::string(min_b.begin(), min_b.end()));
619 builder.set_max(std::string(max_b.begin(), max_b.end()));
621 builder.set_null_page(cw_stats_pi.null_count() == cw.num_values());
622 builder.set_null_count(cw_stats_pi.null_count());
628 auto encoded = encode_column_data(cw, col_encoding, col_desc.physical_type);
630 if (encoded.size() >
static_cast<size_t>(INT32_MAX)) {
632 "encoded page size exceeds int32 limit (2 GiB)"};
634 int32_t uncompressed_size =
static_cast<int32_t
>(encoded.size());
637 const uint8_t* page_data = encoded.data();
638 size_t page_data_size = encoded.size();
639 std::vector<uint8_t> compressed_buf;
640 int32_t compressed_size = uncompressed_size;
643 auto comp_result =
compress(col_codec, page_data, page_data_size);
644 if (!comp_result)
return comp_result.error();
645 compressed_buf = std::move(*comp_result);
646 if (compressed_buf.size() >
static_cast<size_t>(INT32_MAX))
648 compressed_size =
static_cast<int32_t
>(compressed_buf.size());
649 page_data = compressed_buf.data();
650 page_data_size = compressed_buf.size();
654#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
655 std::vector<uint8_t> encrypted_buf;
656 if (encryptor_ && encryptor_->is_column_encrypted(col_desc.name)) {
657 auto enc_result = encryptor_->encrypt_column_page(
658 page_data, page_data_size, col_desc.name,
659 static_cast<int32_t
>(row_groups_.size()), 0);
660 if (!enc_result)
return enc_result.error();
661 encrypted_buf = std::move(*enc_result);
662 compressed_size =
static_cast<int32_t
>(encrypted_buf.size());
663 page_data = encrypted_buf.data();
664 page_data_size = encrypted_buf.size();
677 page_data, page_data_size));
680 dph.
num_values =
static_cast<int32_t
>(cw.num_values());
689 const auto& raw_header_bytes = header_enc.
data();
691 const uint8_t* header_write_data = raw_header_bytes.data();
692 size_t header_write_size = raw_header_bytes.size();
693#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
694 std::vector<uint8_t> header_record_buf;
695 if (encryptor_ && encryptor_->is_column_encrypted(col_desc.name)) {
696 auto enc_header = encryptor_->encrypt_data_page_header(
697 raw_header_bytes.data(), raw_header_bytes.size(),
698 col_desc.name,
static_cast<int32_t
>(row_groups_.size()), 0);
699 if (!enc_header)
return enc_header.error();
701 header_write_data = header_record_buf.data();
702 header_write_size = header_record_buf.size();
706 data_page_offset = file_offset_;
708 write_raw(header_write_data, header_write_size);
709 write_raw(page_data, page_data_size);
711 total_uncompressed =
static_cast<int64_t
>(header_write_size) +
712 static_cast<int64_t
>(uncompressed_size);
713 total_compressed =
static_cast<int64_t
>(header_write_size) +
714 static_cast<int64_t
>(compressed_size);
716 used_encodings.insert(col_encoding);
719 if (!col_index_builders_.empty()) {
720 auto& builder = col_index_builders_[c];
721 builder.start_page();
722 builder.set_first_row_index(0);
723 builder.set_page_location(data_page_offset,
724 compressed_size +
static_cast<int32_t
>(header_write_size));
726 const auto& cw_stats_pi = cw.statistics();
727 if (cw_stats_pi.has_min_max()) {
728 const auto& min_b = cw_stats_pi.min_bytes();
729 const auto& max_b = cw_stats_pi.max_bytes();
730 builder.set_min(std::string(min_b.begin(), min_b.end()));
731 builder.set_max(std::string(max_b.begin(), max_b.end()));
733 builder.set_null_page(cw_stats_pi.null_count() == cw.num_values());
734 builder.set_null_count(cw_stats_pi.null_count());
743 cmd.
type = col_desc.physical_type;
745 cmd.
codec = col_codec;
752 cmd.
encodings.assign(used_encodings.begin(), used_encodings.end());
755 if (dict_page_offset >= 0) {
760 const auto& cw_stats = cw.statistics();
761 if (cw_stats.has_min_max()) {
766 const auto& min_b = cw_stats.min_bytes();
767 const auto& max_b = cw_stats.max_bytes();
768 stats.
min_value = std::string(min_b.begin(), min_b.end());
769 stats.
max_value = std::string(max_b.begin(), max_b.end());
774 if (cw_stats.distinct_count().has_value()) {
784 if (!bloom_filters_.empty() && bloom_filters_[c]) {
785 int64_t bf_offset = file_offset_;
786 const auto& bf_data = bloom_filters_[c]->data();
787 uint32_t bf_size =
static_cast<uint32_t
>(bf_data.size());
790 write_raw_le32(bf_size);
791 write_raw(bf_data.data(), bf_data.size());
800 if (!col_index_builders_.empty()) {
801 auto& builder = col_index_builders_[c];
806 col_idx.serialize(ci_enc);
807 int64_t ci_offset = file_offset_;
808 write_raw(ci_enc.
data().data(), ci_enc.
data().size());
813 auto off_idx = builder.build_offset_index();
815 off_idx.serialize(oi_enc);
816 int64_t oi_offset = file_offset_;
817 write_raw(oi_enc.
data().data(), oi_enc.
data().size());
828 row_groups_.push_back(std::move(rg));
829 total_rows_ += rg_num_rows;
832 for (
auto& cw : col_writers_) {
835 for (
auto& count : col_row_counts_) {
840 for (
auto& bf : bloom_filters_) {
845 for (
auto& builder : col_index_builders_) {
886 return flush_result.error();
902 fmd.
schema.push_back(root);
905 for (
size_t c = 0; c < schema_.
num_columns(); ++c) {
906 const auto& col_desc = schema_.
column(c);
909 elem.
type = col_desc.physical_type;
910 elem.
name = col_desc.name;
915 col_desc.type_length > 0) {
934 if (col_desc.precision > 0) elem.
precision = col_desc.precision;
935 if (col_desc.scale >= 0) elem.
scale = col_desc.scale;
938 fmd.
schema.push_back(elem);
948#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
950 auto file_props = encryptor_->file_properties();
951 auto props_bytes = file_props.serialize();
952 std::string props_str(props_bytes.begin(), props_bytes.end());
955 enc_kv.
key =
"signet.encryption.properties";
956 enc_kv.
value = std::move(props_str);
968 const auto& footer_bytes = enc.
data();
971 int64_t footer_start = file_offset_;
976#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
977 if (encryptor_ && encryptor_->config().encrypt_footer) {
978 auto enc_footer = encryptor_->encrypt_footer(
979 footer_bytes.data(), footer_bytes.size());
983 return enc_footer.error();
985 const auto& encrypted_footer = *enc_footer;
988 write_raw(encrypted_footer.data(), encrypted_footer.size());
991 uint32_t footer_len =
static_cast<uint32_t
>(encrypted_footer.size());
992 write_raw_le32(footer_len);
999 write_raw(footer_bytes.data(), footer_bytes.size());
1002 uint32_t footer_len =
static_cast<uint32_t
>(footer_bytes.size());
1003 write_raw_le32(footer_len);
1007#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
1016 return build_write_stats();
1042 : schema_(std::move(other.schema_))
1043 , options_(std::move(other.options_))
1044 , path_(std::move(other.path_))
1045 , file_(std::move(other.file_))
1046 , file_offset_(other.file_offset_)
1047 , col_writers_(std::move(other.col_writers_))
1048 , col_row_counts_(std::move(other.col_row_counts_))
1049 , pending_rows_(std::move(other.pending_rows_))
1050 , row_groups_(std::move(other.row_groups_))
1051 , total_rows_(other.total_rows_)
1052 , open_(other.open_)
1053#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
1054 , encryptor_(std::move(other.encryptor_))
1056 , bloom_filters_(std::move(other.bloom_filters_))
1057 , col_index_builders_(std::move(other.col_index_builders_))
1059 other.open_ =
false;
1060 other.file_offset_ = 0;
1061 other.total_rows_ = 0;
1067 if (
this != &other) {
1073 schema_ = std::move(other.schema_);
1074 options_ = std::move(other.options_);
1075 path_ = std::move(other.path_);
1076 file_ = std::move(other.file_);
1077 file_offset_ = other.file_offset_;
1078 col_writers_ = std::move(other.col_writers_);
1079 col_row_counts_ = std::move(other.col_row_counts_);
1080 pending_rows_ = std::move(other.pending_rows_);
1081 row_groups_ = std::move(other.row_groups_);
1082 total_rows_ = other.total_rows_;
1083 open_ = other.open_;
1084#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
1085 encryptor_ = std::move(other.encryptor_);
1087 bloom_filters_ = std::move(other.bloom_filters_);
1088 col_index_builders_ = std::move(other.col_index_builders_);
1090 other.open_ =
false;
1091 other.file_offset_ = 0;
1092 other.total_rows_ = 0;
1106 return total_rows_ +
static_cast<int64_t
>(pending_rows_.size());
1113 return static_cast<int64_t
>(row_groups_.size());
1118 [[nodiscard]]
bool is_open()
const {
return open_; }
1145 const std::filesystem::path& csv_input,
1146 const std::filesystem::path& parquet_output,
1150 std::ifstream csv(csv_input);
1151 if (!csv.is_open()) {
1153 "Failed to open CSV file: " + csv_input.string()};
1157 std::string header_line;
1158 if (!std::getline(csv, header_line)) {
1162 auto col_names = split_csv_line(header_line);
1163 if (col_names.empty()) {
1167 size_t num_cols = col_names.size();
1170 std::vector<std::vector<std::string>> rows;
1172 while (std::getline(csv, line)) {
1173 if (line.empty())
continue;
1174 auto fields = split_csv_line(line);
1176 fields.resize(num_cols);
1177 rows.push_back(std::move(fields));
1190 for (
size_t c = 0; c < num_cols; ++c) {
1191 bool all_int64 =
true;
1192 bool all_double =
true;
1193 bool all_bool =
true;
1195 for (
const auto& row : rows) {
1196 const std::string& val = row[c];
1197 if (val.empty())
continue;
1202 auto [ptr, ec] = std::from_chars(val.data(),
1203 val.data() + val.size(),
1205 if (ec != std::errc{} || ptr != val.data() + val.size()) {
1220 if (val !=
"true" && val !=
"false" &&
1221 val !=
"TRUE" && val !=
"FALSE" &&
1222 val !=
"True" && val !=
"False" &&
1223 val !=
"1" && val !=
"0") {
1233 }
else if (all_double) {
1236 }
else if (all_bool) {
1246 std::vector<ColumnDescriptor> col_descs;
1247 col_descs.reserve(num_cols);
1248 for (
size_t c = 0; c < num_cols; ++c) {
1249 ColumnDescriptor cd;
1250 cd.name = col_names[c];
1251 cd.physical_type = detected_types[c];
1252 cd.logical_type = detected_logical[c];
1253 col_descs.push_back(std::move(cd));
1256 Schema schema(
"csv_data", std::move(col_descs));
1260 if (!writer_result) {
1261 return writer_result.error();
1263 auto& writer = *writer_result;
1266 for (
const auto& row : rows) {
1274 auto close_result = writer.
close();
1275 if (!close_result)
return close_result.error();
1276 return expected<void>{};
1287 std::filesystem::path path_;
1288 std::ofstream file_;
1289 int64_t file_offset_ = 0;
1290 std::vector<ColumnWriter> col_writers_;
1291 std::vector<int64_t> col_row_counts_;
1292 std::vector<std::vector<std::string>> pending_rows_;
1293 std::vector<thrift::RowGroup> row_groups_;
1294 int64_t total_rows_ = 0;
1296#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
1297 std::unique_ptr<crypto::FileEncryptor> encryptor_;
1299 std::vector<std::unique_ptr<SplitBlockBloomFilter>> bloom_filters_;
1300 std::vector<ColumnIndexBuilder> col_index_builders_;
/// Rebuilds the per-column writers from schema_: empties col_writers_,
/// reserves one slot per schema column, and constructs each writer from the
/// corresponding column's physical type.
1304 void init_column_writers() {
1305 col_writers_.clear();
1306 col_writers_.reserve(schema_.num_columns());
// resize() only fills newly added slots with 0; counts already present are kept.
1307 col_row_counts_.resize(schema_.num_columns(), 0);
1308 for (
size_t c = 0; c < schema_.num_columns(); ++c) {
1309 col_writers_.emplace_back(schema_.column(c).physical_type);
/// Assembles a WriteStats summary from the writer's running counters
/// (file_offset_, total_rows_), the schema, and the column-chunk metadata
/// recorded in row_groups_. Does not modify the writer (const).
1315 WriteStats build_write_stats()
const {
// File-level totals are taken directly from the writer's counters.
1317 stats.file_size_bytes = file_offset_;
1318 stats.total_rows = total_rows_;
1319 stats.total_row_groups =
static_cast<int64_t
>(row_groups_.size());
// One stats entry per schema column, labelled with its name and physical type.
1322 size_t num_cols = schema_.num_columns();
1323 stats.columns.resize(num_cols);
1325 for (
size_t c = 0; c < num_cols; ++c) {
1326 auto& col_stats = stats.columns[c];
1327 col_stats.column_name = schema_.column(c).name;
1328 col_stats.physical_type = schema_.column(c).physical_type;
// Fold each row group's column-chunk metadata into per-column and file-wide totals.
1331 for (
const auto& rg : row_groups_) {
// The index is bounded by both the row group's column count and the schema's.
1332 for (
size_t c = 0; c < rg.columns.size() && c < num_cols; ++c) {
// Chunks without metadata contribute nothing.
1333 if (!rg.columns[c].meta_data.has_value())
continue;
1334 const auto& cmd = *rg.columns[c].meta_data;
1335 auto& col_stats = stats.columns[c];
1337 col_stats.uncompressed_bytes += cmd.total_uncompressed_size;
1338 col_stats.compressed_bytes += cmd.total_compressed_size;
1339 col_stats.num_values += cmd.num_values;
// Only the first listed encoding is reported, and it is overwritten on every
// row group, so the last row group with a non-empty list determines the value.
1343 if (!cmd.encodings.empty()) {
1344 col_stats.encoding = cmd.encodings[0];
// Null counts are summed only where the chunk carries statistics with a null_count.
1348 if (cmd.statistics.has_value() && cmd.statistics->null_count.has_value()) {
1349 col_stats.null_count += *cmd.statistics->null_count;
1352 stats.total_uncompressed_bytes += cmd.total_uncompressed_size;
1353 stats.total_compressed_bytes += cmd.total_compressed_size;
// Derived ratios are computed only when the denominator is positive.
1358 if (stats.total_compressed_bytes > 0) {
1359 stats.compression_ratio =
static_cast<double>(stats.total_uncompressed_bytes)
1360 /
static_cast<double>(stats.total_compressed_bytes);
1362 if (stats.total_rows > 0) {
1363 stats.bytes_per_row =
static_cast<double>(stats.file_size_bytes)
1364 /
static_cast<double>(stats.total_rows);
/// Writes `len` bytes starting at `data` to file_ and advances file_offset_
/// by `len`.
/// @throws std::runtime_error if the stream is not good() after the write;
///         the message includes `len` and the offset at which the write began.
1372 void write_raw(
const uint8_t* data,
size_t len) {
1373 file_.write(
reinterpret_cast<const char*
>(data),
static_cast<std::streamsize
>(len));
1374 if (!file_.good()) {
1375 throw std::runtime_error(
"ParquetWriter::write_raw: I/O error after writing "
1376 + std::to_string(len) +
" bytes at offset "
1377 + std::to_string(file_offset_));
// Reached only when the write succeeded, so file_offset_ tracks bytes actually accepted by the stream.
1379 file_offset_ +=
static_cast<int64_t
>(len);
/// Writes `val` as four bytes, least-significant byte first (little-endian),
/// through write_raw. The `bytes` buffer is declared on a line not shown here.
1382 void write_raw_le32(uint32_t val) {
1384 bytes[0] =
static_cast<uint8_t
>((val ) & 0xFF);
1385 bytes[1] =
static_cast<uint8_t
>((val >> 8) & 0xFF);
1386 bytes[2] =
static_cast<uint8_t
>((val >> 16) & 0xFF);
1387 bytes[3] =
static_cast<uint8_t
>((val >> 24) & 0xFF);
1388 write_raw(bytes, 4);
/// Converts the string rows buffered in pending_rows_ into typed values on
/// each column's writer, optionally feeding the same values to that column's
/// bloom filter, then clears pending_rows_.
/// Iteration is column-major: for each schema column, every pending row is
/// visited. The `case` labels and the error-return paths are on lines not
/// shown in this view.
1393 [[nodiscard]] expected<void> encode_pending_rows() {
1394 for (
size_t c = 0; c < schema_.num_columns(); ++c) {
1395 const auto& col_desc = schema_.column(c);
1396 auto& cw = col_writers_[c];
// False when no bloom filter is available for this column; inserts below are then skipped.
1399 bool bf_active = bloom_ensure_filter(c);
1401 for (
const auto& row : pending_rows_) {
1402 const std::string& val = row[c];
// Dispatch on the column's physical type to pick the parse + write routine.
1404 switch (col_desc.physical_type) {
1406 bool b = (val ==
"true" || val ==
"TRUE" || val ==
"True" ||
// The boolean is widened to int32_t before being inserted into the bloom filter.
1410 if (bf_active) bloom_filters_[c]->insert_value(
static_cast<int32_t
>(b));
1415 auto [ptr, ec] = std::from_chars(val.data(),
1416 val.data() + val.size(),
// NOTE(review): only `ec` is tested here, so a value with trailing characters
// after a valid numeric prefix appears to be accepted — confirm whether that is intended.
1418 if (ec != std::errc{}) {
1422 cw.write_int32(parsed);
1423 if (bf_active) bloom_filters_[c]->insert_value(parsed);
1428 auto [ptr, ec] = std::from_chars(val.data(),
1429 val.data() + val.size(),
1431 if (ec != std::errc{}) {
1435 cw.write_int64(parsed);
1436 if (bf_active) bloom_filters_[c]->insert_value(parsed);
1442 if (bf_active) bloom_filters_[c]->insert_value(f);
1448 if (bf_active) bloom_filters_[c]->insert_value(d);
1452 cw.write_byte_array(val);
1453 if (bf_active) bloom_filters_[c]->insert_value(val);
// The string's bytes are passed through unchanged, with val.size() as the length.
1457 cw.write_fixed_len_byte_array(
1458 reinterpret_cast<const uint8_t*
>(val.data()), val.size());
1459 if (bf_active) bloom_filters_[c]->insert_value(val);
1464 cw.write_byte_array(val);
1465 if (bf_active) bloom_filters_[c]->insert_value(val);
// All pending rows have been handed to the column writers; drop the string copies.
1472 pending_rows_.clear();
1473 return expected<void>{};
/// Uses a function-local std::once_flag; the guarded action is on lines not
/// shown here. Presumably it registers the Snappy codec once per process — TODO confirm.
1478 static void ensure_snappy_registered() {
1479 static std::once_flag flag;
/// Selects the encoding to use for one column.
/// Visible decision order: (1) look for an entry keyed by the column name in
/// options_.column_encodings; (2) if options_.auto_encoding is set, branch on
/// the physical type and, where a distinct count is available, on the
/// distinct/total value ratio; (3) otherwise fall back to
/// options_.default_encoding. The values returned by (1) and (2) are on lines
/// not shown here.
1486 [[nodiscard]]
Encoding choose_encoding(
1488 const ColumnDescriptor& col_desc,
1489 const ColumnWriter& cw)
const {
1492 auto it = options_.column_encodings.find(col_desc.name);
1493 if (it != options_.column_encodings.end()) {
1498 if (options_.auto_encoding) {
1499 switch (col_desc.physical_type) {
1510 const auto& stats = cw.statistics();
// The ratio is computed only when a distinct count exists and the column is non-empty (avoids dividing by zero).
1511 if (stats.distinct_count().has_value() && cw.num_values() > 0) {
1512 double ratio =
static_cast<double>(*stats.distinct_count()) /
1513 static_cast<double>(cw.num_values());
1535 return options_.default_encoding;
/// Produces the page payload bytes for the values buffered in `cw`.
/// The visible branches either rebuild typed values from the writer's plain
/// buffer (for an encoder whose call is on lines not shown) or return a copy
/// of the plain bytes unchanged. The branch conditions themselves are not
/// visible in this view.
1543 [[nodiscard]]
static std::vector<uint8_t> encode_column_data(
1544 const ColumnWriter& cw,
1548 const auto& plain_data = cw.data();
1549 int64_t num_vals = cw.num_values();
// 32-bit values are copied out with memcpy from consecutive 4-byte slots.
1556 size_t count =
static_cast<size_t>(num_vals);
1557 std::vector<int32_t> values(count);
1558 for (
size_t i = 0; i < count; ++i) {
1559 std::memcpy(&values[i], plain_data.data() + i * 4, 4);
// 64-bit values are copied out with memcpy from consecutive 8-byte slots.
1564 size_t count =
static_cast<size_t>(num_vals);
1565 std::vector<int64_t> values(count);
1566 for (
size_t i = 0; i < count; ++i) {
1567 std::memcpy(&values[i], plain_data.data() + i * 8, 8);
1572 return std::vector<uint8_t>(plain_data.begin(), plain_data.end());
// NOTE(review): the buffer is reinterpreted as float*/double* here, unlike the
// memcpy used for the integer cases — confirm the buffer's alignment makes this safe.
1577 size_t count =
static_cast<size_t>(num_vals);
1578 const auto* float_ptr =
reinterpret_cast<const float*
>(plain_data.data());
1582 size_t count =
static_cast<size_t>(num_vals);
1583 const auto* double_ptr =
reinterpret_cast<const double*
>(plain_data.data());
1587 return std::vector<uint8_t>(plain_data.begin(), plain_data.end());
1593 size_t count =
static_cast<size_t>(num_vals);
// Booleans are unpacked one per element: value i is bit (i % 8) of byte (i / 8), lowest bit first.
1595 std::vector<uint32_t> bool_vals(count);
1596 for (
size_t i = 0; i < count; ++i) {
1597 size_t byte_idx = i / 8;
1598 size_t bit_idx = i % 8;
1599 bool_vals[i] = (plain_data[byte_idx] >> bit_idx) & 1;
1605 return std::vector<uint8_t>(plain_data.begin(), plain_data.end());
1611 return std::vector<uint8_t>(plain_data.begin(), plain_data.end());
/// Result of writing one dictionary-encoded column chunk
/// (returned by write_dictionary_column).
1618 struct DictColumnResult {
// Header bytes plus uncompressed page bytes, summed over the dictionary page and the index page.
1619 int64_t total_uncompressed;
// Header bytes plus on-disk page bytes, summed over the dictionary page and the index page.
1620 int64_t total_compressed;
// Encodings used by the chunk's pages.
1621 std::unordered_set<Encoding> used_encodings;
// File offset at which the dictionary page header starts.
1622 int64_t dict_page_offset;
// File offset at which the index (data) page header starts.
1623 int64_t data_page_offset;
/// Decodes the writer's plain buffer into strings, reading a 4-byte length
/// (copied with memcpy, so in native byte order) followed by that many bytes
/// for each value. Stops early if the buffer ends before a length field or a
/// value's bytes are fully available, so the result may hold fewer than
/// cw.num_values() entries. The `pos`/`len` declarations and the position
/// updates are on lines not shown here.
1627 [[nodiscard]]
static std::vector<std::string> extract_byte_array_strings(
1628 const ColumnWriter& cw) {
1629 const auto& buf = cw.data();
1630 size_t count =
static_cast<size_t>(cw.num_values());
1631 std::vector<std::string> result;
1632 result.reserve(count);
1633 const size_t buf_size = buf.size();
1635 for (
size_t i = 0; i < count; ++i) {
// Not enough bytes left for a length prefix.
1637 if (pos + 4 > buf_size)
break;
1639 std::memcpy(&len, buf.data() + pos, 4);
// Declared length runs past the end of the buffer.
1641 if (len > buf_size - pos - 4)
break;
1643 result.emplace_back(
1644 reinterpret_cast<const char*
>(buf.data() + pos), len);
/// Copies cw.num_values() consecutive 4-byte values out of cw.data() into a
/// vector<int32_t> using memcpy.
/// NOTE(review): no check is visible that data() holds at least count * 4 bytes — confirm the writer guarantees it.
1651 [[nodiscard]]
static std::vector<int32_t> extract_int32_values(
1652 const ColumnWriter& cw) {
1653 size_t count =
static_cast<size_t>(cw.num_values());
1654 std::vector<int32_t> result(count);
1655 for (
size_t i = 0; i < count; ++i) {
1656 std::memcpy(&result[i], cw.data().data() + i * 4, 4);
/// Copies cw.num_values() consecutive 8-byte values out of cw.data() into a
/// vector<int64_t> using memcpy.
/// NOTE(review): no check is visible that data() holds at least count * 8 bytes — confirm the writer guarantees it.
1662 [[nodiscard]]
static std::vector<int64_t> extract_int64_values(
1663 const ColumnWriter& cw) {
1664 size_t count =
static_cast<size_t>(cw.num_values());
1665 std::vector<int64_t> result(count);
1666 for (
size_t i = 0; i < count; ++i) {
1667 std::memcpy(&result[i], cw.data().data() + i * 8, 8);
/// Copies cw.num_values() consecutive 4-byte values out of cw.data() into a
/// vector<float> using memcpy.
/// NOTE(review): no check is visible that data() holds at least count * 4 bytes — confirm the writer guarantees it.
1673 [[nodiscard]]
static std::vector<float> extract_float_values(
1674 const ColumnWriter& cw) {
1675 size_t count =
static_cast<size_t>(cw.num_values());
1676 std::vector<float> result(count);
1677 for (
size_t i = 0; i < count; ++i) {
1678 std::memcpy(&result[i], cw.data().data() + i * 4, 4);
/// Copies cw.num_values() consecutive 8-byte values out of cw.data() into a
/// vector<double> using memcpy.
/// NOTE(review): no check is visible that data() holds at least count * 8 bytes — confirm the writer guarantees it.
1684 [[nodiscard]]
static std::vector<double> extract_double_values(
1685 const ColumnWriter& cw) {
1686 size_t count =
static_cast<size_t>(cw.num_values());
1687 std::vector<double> result(count);
1688 for (
size_t i = 0; i < count; ++i) {
1689 std::memcpy(&result[i], cw.data().data() + i * 8, 8);
/// Writes one column chunk in dictionary form: a dictionary page followed by
/// a single data page holding the dictionary indices. For each page the
/// visible steps are: build the payload, optionally compress it, optionally
/// encrypt it (commercial builds only), serialize a thrift::PageHeader, then
/// write header and payload via write_raw.
/// Returns a DictColumnResult with the accumulated sizes and the file offsets
/// of both pages, or the error from a failed compress/encrypt step. Several
/// lines (some parameters, case labels, conditions around compression, the
/// final return) are not shown in this view.
1696 [[nodiscard]] expected<DictColumnResult> write_dictionary_column(
1698 const ColumnDescriptor& col_desc,
1699 const ColumnWriter& cw,
1702 DictColumnResult info;
1703 info.total_uncompressed = 0;
1704 info.total_compressed = 0;
// Filled by whichever typed branch of the switch below matches the column.
1709 std::vector<uint8_t> dict_page_data;
1710 std::vector<uint8_t> indices_page_data;
1711 int32_t num_dict_entries = 0;
// Each branch extracts the typed values, feeds them to a DictionaryEncoder<T>,
// and takes the dictionary page, the indices page and the dictionary size from it.
1713 switch (col_desc.physical_type) {
1715 auto strings = extract_byte_array_strings(cw);
1716 DictionaryEncoder<std::string> enc;
1717 for (
const auto& s : strings) enc.put(s);
1719 dict_page_data = enc.dictionary_page();
1720 indices_page_data = enc.indices_page();
1721 num_dict_entries =
static_cast<int32_t
>(enc.dictionary_size());
1725 auto vals = extract_int32_values(cw);
1726 DictionaryEncoder<int32_t> enc;
1727 for (
auto v : vals) enc.put(v);
1729 dict_page_data = enc.dictionary_page();
1730 indices_page_data = enc.indices_page();
1731 num_dict_entries =
static_cast<int32_t
>(enc.dictionary_size());
1735 auto vals = extract_int64_values(cw);
1736 DictionaryEncoder<int64_t> enc;
1737 for (
auto v : vals) enc.put(v);
1739 dict_page_data = enc.dictionary_page();
1740 indices_page_data = enc.indices_page();
1741 num_dict_entries =
static_cast<int32_t
>(enc.dictionary_size());
1745 auto vals = extract_float_values(cw);
1746 DictionaryEncoder<float> enc;
1747 for (
auto v : vals) enc.put(v);
1749 dict_page_data = enc.dictionary_page();
1750 indices_page_data = enc.indices_page();
1751 num_dict_entries =
static_cast<int32_t
>(enc.dictionary_size());
1755 auto vals = extract_double_values(cw);
1756 DictionaryEncoder<double> enc;
1757 for (
auto v : vals) enc.put(v);
1759 dict_page_data = enc.dictionary_page();
1760 indices_page_data = enc.indices_page();
1761 num_dict_entries =
static_cast<int32_t
>(enc.dictionary_size());
1768 "Dictionary encoding not supported for this type"};
// ---- Dictionary page ----
// Start with the uncompressed payload; the write pointer/size are redirected
// below if compression or encryption produces a new buffer.
// NOTE(review): sizes are narrowed to int32_t with no INT32_MAX guard visible
// in this function — confirm a check exists on lines not shown or that sizes are bounded.
1772 int32_t dict_uncompressed_size =
static_cast<int32_t
>(dict_page_data.size());
1773 const uint8_t* dict_write_data = dict_page_data.data();
1774 size_t dict_write_size = dict_page_data.size();
1775 std::vector<uint8_t> dict_compressed_buf;
1776 int32_t dict_compressed_size = dict_uncompressed_size;
1779 auto comp =
compress(col_codec, dict_page_data.data(), dict_page_data.size());
1780 if (!comp)
return comp.error();
1781 dict_compressed_buf = std::move(*comp);
1782 dict_compressed_size =
static_cast<int32_t
>(dict_compressed_buf.size());
1783 dict_write_data = dict_compressed_buf.data();
1784 dict_write_size = dict_compressed_buf.size();
// Commercial builds: encrypt the (possibly compressed) dictionary payload when
// this column is configured as encrypted; the recorded compressed size then
// becomes the encrypted size.
1788#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
1789 std::vector<uint8_t> dict_encrypted_buf;
1790 if (encryptor_ && encryptor_->is_column_encrypted(col_desc.name)) {
1791 auto enc_result = encryptor_->encrypt_dict_page(
1792 dict_write_data, dict_write_size, col_desc.name,
1793 static_cast<int32_t
>(row_groups_.size()));
1794 if (!enc_result)
return enc_result.error();
1795 dict_encrypted_buf = std::move(*enc_result);
1796 dict_compressed_size =
static_cast<int32_t
>(dict_encrypted_buf.size());
1797 dict_write_data = dict_encrypted_buf.data();
1798 dict_write_size = dict_encrypted_buf.size();
// Page header for the dictionary page, carrying both sizes and the entry count.
1802 thrift::PageHeader dict_ph;
1804 dict_ph.uncompressed_page_size = dict_uncompressed_size;
1805 dict_ph.compressed_page_size = dict_compressed_size;
1807 dict_write_data, dict_write_size));
1809 thrift::DictionaryPageHeader dph;
1810 dph.num_values = num_dict_entries;
1812 dict_ph.dictionary_page_header = dph;
1814 thrift::CompactEncoder dict_header_enc;
1815 dict_ph.serialize(dict_header_enc);
1816 const auto& dict_header_bytes = dict_header_enc.data();
// Record where the dictionary page starts before anything is written.
1818 info.dict_page_offset = file_offset_;
1820 write_raw(dict_header_bytes.data(), dict_header_bytes.size());
1821 write_raw(dict_write_data, dict_write_size);
// Both totals include the serialized header bytes.
1823 info.total_uncompressed +=
static_cast<int64_t
>(dict_header_bytes.size()) +
1824 static_cast<int64_t
>(dict_uncompressed_size);
1825 info.total_compressed +=
static_cast<int64_t
>(dict_header_bytes.size()) +
1826 static_cast<int64_t
>(dict_compressed_size);
// ---- Index (data) page ----
// Same pipeline as the dictionary page, applied to the encoded indices.
1829 int32_t idx_uncompressed_size =
static_cast<int32_t
>(indices_page_data.size());
1830 const uint8_t* idx_write_data = indices_page_data.data();
1831 size_t idx_write_size = indices_page_data.size();
1832 std::vector<uint8_t> idx_compressed_buf;
1833 int32_t idx_compressed_size = idx_uncompressed_size;
1836 auto comp =
compress(col_codec, indices_page_data.data(), indices_page_data.size());
1837 if (!comp)
return comp.error();
1838 idx_compressed_buf = std::move(*comp);
1839 idx_compressed_size =
static_cast<int32_t
>(idx_compressed_buf.size());
1840 idx_write_data = idx_compressed_buf.data();
1841 idx_write_size = idx_compressed_buf.size();
// Commercial builds: encrypt the index payload; the trailing 0 is presumably
// the page ordinal within the column chunk — TODO confirm against FileEncryptor.
1845#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
1846 std::vector<uint8_t> idx_encrypted_buf;
1847 if (encryptor_ && encryptor_->is_column_encrypted(col_desc.name)) {
1848 auto enc_result = encryptor_->encrypt_column_page(
1849 idx_write_data, idx_write_size, col_desc.name,
1850 static_cast<int32_t
>(row_groups_.size()), 0);
1851 if (!enc_result)
return enc_result.error();
1852 idx_encrypted_buf = std::move(*enc_result);
1853 idx_compressed_size =
static_cast<int32_t
>(idx_encrypted_buf.size());
1854 idx_write_data = idx_encrypted_buf.data();
1855 idx_write_size = idx_encrypted_buf.size();
// Page header for the index page; num_values is the column's value count, not the dictionary size.
1859 thrift::PageHeader idx_ph;
1861 idx_ph.uncompressed_page_size = idx_uncompressed_size;
1862 idx_ph.compressed_page_size = idx_compressed_size;
1864 idx_write_data, idx_write_size));
1866 thrift::DataPageHeader idx_dph;
1867 idx_dph.num_values =
static_cast<int32_t
>(cw.num_values());
1871 idx_ph.data_page_header = idx_dph;
1873 thrift::CompactEncoder idx_header_enc;
1874 idx_ph.serialize(idx_header_enc);
1875 const auto& raw_idx_header_bytes = idx_header_enc.data();
// By default the plain serialized header is written; commercial builds swap in
// an encrypted header record for encrypted columns.
1877 const uint8_t* idx_header_write_data = raw_idx_header_bytes.data();
1878 size_t idx_header_write_size = raw_idx_header_bytes.size();
1879#if defined(SIGNET_ENABLE_COMMERCIAL) && SIGNET_ENABLE_COMMERCIAL
1880 std::vector<uint8_t> idx_header_record_buf;
1881 if (encryptor_ && encryptor_->is_column_encrypted(col_desc.name)) {
1882 auto enc_header = encryptor_->encrypt_data_page_header(
1883 raw_idx_header_bytes.data(), raw_idx_header_bytes.size(),
1884 col_desc.name,
static_cast<int32_t
>(row_groups_.size()), 0);
1885 if (!enc_header)
return enc_header.error();
1887 idx_header_write_data = idx_header_record_buf.data();
1888 idx_header_write_size = idx_header_record_buf.size();
// Record where the index page starts before anything is written.
1892 info.data_page_offset = file_offset_;
1894 write_raw(idx_header_write_data, idx_header_write_size);
1895 write_raw(idx_write_data, idx_write_size);
// Totals use the header size actually written (which may be the encrypted record).
1897 info.total_uncompressed +=
static_cast<int64_t
>(idx_header_write_size) +
1898 static_cast<int64_t
>(idx_uncompressed_size);
1899 info.total_compressed +=
static_cast<int64_t
>(idx_header_write_size) +
1900 static_cast<int64_t
>(idx_compressed_size);
/// Ensures a bloom filter exists for `col_index` when one is wanted.
/// Returns false when bloom_filters_ is empty, and true when a filter for
/// the column already exists. Otherwise the column name is checked against
/// options_.bloom_filter_columns (an empty set does not restrict anything)
/// and a new SplitBlockBloomFilter is created. The outcome of a failed
/// name check and the final return are on lines not shown here.
1910 bool bloom_ensure_filter(
size_t col_index) {
1911 if (bloom_filters_.empty())
return false;
1912 if (bloom_filters_[col_index])
return true;
1915 const auto& col_name = schema_.column(col_index).name;
// A non-empty bloom_filter_columns set acts as a filter on column names.
1916 if (!options_.bloom_filter_columns.empty() &&
1917 options_.bloom_filter_columns.find(col_name) ==
1918 options_.bloom_filter_columns.end()) {
// The filter is sized from row_group_size, floored at 1, as the distinct-value estimate.
// (std::max) is parenthesized so a function-like `max` macro cannot expand here.
1923 size_t ndv_estimate =
static_cast<size_t>(
1924 (std::max)(int64_t{1}, options_.row_group_size));
1925 bloom_filters_[col_index] = std::make_unique<SplitBlockBloomFilter>(
1926 ndv_estimate, options_.bloom_filter_fpr);
/// Inserts `count` values starting at `values` into the bloom filter for
/// `col_index`. Does nothing when bloom_ensure_filter reports that no filter
/// is available for that column.
1931 template <
typename T>
1932 void bloom_insert_typed(
size_t col_index,
const T* values,
size_t count) {
1933 if (!bloom_ensure_filter(col_index))
return;
1934 auto& bf = bloom_filters_[col_index];
1935 for (
size_t i = 0; i < count; ++i) {
1936 bf->insert_value(values[i]);
/// Splits one CSV line into its fields and returns them in order.
/// Only part of the scanner is visible here: it walks the line by index,
/// tracks whether it is inside a quoted section, looks ahead for a second
/// '"', treats ',' as the field separator, has a dedicated branch for '\r',
/// and appends the final field after the loop.
/// NOTE(review): the look-ahead for a second '"' presumably handles the
/// doubled-quote escape inside quoted fields — confirm against the full body.
1944 [[nodiscard]]
static std::vector<std::string> split_csv_line(
const std::string& line) {
1945 std::vector<std::string> fields;
1947 bool in_quotes =
false;
1950 while (i < line.size()) {
// Bounds-checked look-ahead at the next character.
1956 if (i + 1 < line.size() && line[i + 1] ==
'"') {
1972 }
else if (ch ==
',') {
1975 fields.push_back(std::move(field));
1978 }
else if (ch ==
'\r') {
// The last field has no trailing comma, so it is appended after the loop.
1990 fields.push_back(std::move(field));
1996 static void trim_string(std::string& s) {
1997 size_t start = s.find_first_not_of(
" \t\r\n");
1998 if (start == std::string::npos) {
2002 size_t end = s.find_last_not_of(
" \t\r\n");
2003 s = s.substr(start, end - start + 1);