121 "FeatureWriter: output_dir must not be empty"};
132 for (
const auto& part : raw) {
135 "FeatureWriter: output_dir contains '..' path traversal"};
138 std::filesystem::path canon = std::filesystem::weakly_canonical(raw);
139 for (
const auto& part : canon) {
142 "FeatureWriter: output_dir resolves to path with '..' traversal"};
148 "FeatureWriter: group.name must not be empty"};
151 "FeatureWriter: group.feature_names must not be empty"};
154 std::filesystem::create_directories(opts.
output_dir, ec);
157 "FeatureWriter: cannot create output_dir '" +
161 fw.opts_ = std::move(opts);
178 if (current_writer_) (void)
close();
195 if (fv.
values.size() != nfeat)
197 "FeatureWriter: values.size() " +
198 std::to_string(fv.
values.size()) +
199 " != feature_names.size() " +
200 std::to_string(nfeat)};
204 ver_buf_.push_back(fv.
version);
205 for (
size_t i = 0; i < nfeat; ++i)
206 feat_bufs_[i].push_back(fv.
values[i]);
226 for (
size_t i = 0; i < count; ++i) {
227 auto r =
write(fvs[i]);
253 if (!current_writer_) {
254 auto r = open_new_file();
261 {
auto r = current_writer_->write_column(0, entity_buf_.data(), entity_buf_.size());
if (!r)
return r; }
262 {
auto r = current_writer_->write_column<int64_t>(1, ts_buf_.data(), ts_buf_.size());
if (!r)
return r; }
263 {
auto r = current_writer_->write_column<int32_t>(2, ver_buf_.data(), ver_buf_.size());
if (!r)
return r; }
264 for (
size_t i = 0; i < nfeat; ++i) {
265 auto r = current_writer_->write_column<
double>(3 + i, feat_bufs_[i].data(), feat_bufs_[i].size());
268 {
auto r = current_writer_->flush_row_group();
if (!r)
return r; }
270 current_file_rows_ += pending_;
275 for (
auto& fb : feat_bufs_) fb.clear();
279 auto r = current_writer_->close();
280 current_writer_.reset();
281 current_file_rows_ = 0;
286 std::error_code remove_ec;
287 std::filesystem::remove(current_file_path_, remove_ec);
291 register_file(current_file_path_);
308 if (current_writer_) {
309 auto cr = current_writer_->close();
310 current_writer_.reset();
311 if (!cr) { current_file_rows_ = 0;
return cr.error(); }
313 if (current_file_rows_ > 0) register_file(current_file_path_);
314 current_file_rows_ = 0;
/// Return the registered output file paths.
///
/// The vector is returned by value, so the caller receives a copy of
/// output_files_ rather than a reference into the writer's state.
328 [[nodiscard]] std::vector<std::string>
output_files()
const {
return output_files_; }
/// Report whether a Parquet writer is currently held.
///
/// @return true when current_writer_ is non-null, false otherwise.
330 [[nodiscard]]
bool is_open()
const {
return static_cast<bool>(current_writer_); }
337 .
column<std::string>(
"entity_id")
339 .
column<int32_t>(
"version");
341 for (
const auto& fname : opts_.group.feature_names)
342 sb.column<double>(fname);
347 using namespace std::chrono;
348 const int64_t ts_ns =
static_cast<int64_t
>(
349 duration_cast<nanoseconds>(
350 system_clock::now().time_since_epoch()).count());
352 std::snprintf(buf,
sizeof(buf),
"%08zu_%lld",
353 file_index_++,
static_cast<long long>(ts_ns));
356 +
"_" + opts_.
group.
name +
"_" + buf +
".parquet";
362 if (!wr)
return wr.error();
364 current_writer_ = std::make_unique<ParquetWriter>(std::move(*wr));
365 current_file_rows_ = 0;
/// Record `path` in output_files_ unless an equal path is already stored.
///
/// @param path File path to record.
370 void register_file(
const std::string& path) {
// Linear scan for an existing equal entry; return early so the list
// never holds the same path twice.
371 for (
const auto& p : output_files_) {
if (p == path)
return; }
// Not present yet: append it.
372 output_files_.push_back(path);
// Writer for the file currently being written. Null when no file is open:
// is_open() tests it, and the close paths reset() it.
380 std::unique_ptr<ParquetWriter> current_writer_;
// Path handed to register_file() and to std::filesystem::remove().
381 std::string current_file_path_;
// Row count of the current file: grows by pending_ after a row group is
// flushed, and is zeroed when a file is opened or closed.
382 size_t current_file_rows_{0};
// Post-incremented each time a file name is formatted (the "%08zu" field).
383 size_t file_index_{0};
// NOTE(review): presumably backs rows_written(); its updates are not
// visible in this excerpt — confirm.
384 int64_t total_rows_{0};
// Column buffers handed to write_column() on flush.
// entity_buf_ is written as column 0.
388 std::vector<std::string> entity_buf_;
// Written as column 1 (int64_t).
389 std::vector<int64_t> ts_buf_;
// Written as column 2 (int32_t); receives fv.version on write.
390 std::vector<int32_t> ver_buf_;
// One buffer per feature: feat_bufs_[i] receives fv.values[i], is written
// as column 3 + i (double), and is cleared after a flush.
391 std::vector<std::vector<double>> feat_bufs_;
// Paths appended by register_file(); output_files() returns a copy.
393 std::vector<std::string> output_files_;
Append-only writer for a single feature group.
FeatureWriter()=default
Default-construct an empty writer (use create() factory instead).
expected< void > write_batch(const std::vector< FeatureVector > &fvs)
Write a vector of feature vectors.
FeatureWriter(FeatureWriter &&) noexcept=default
bool is_open() const
Check whether the writer currently has an open Parquet file.
const FeatureGroupDef & group_def() const
Return the feature group definition used by this writer.
expected< void > flush()
Flush all buffered rows to the current Parquet file as a new row group.
std::vector< std::string > output_files() const
Return a copy of all finalized output file paths (excludes the active file).
static expected< FeatureWriter > create(Options opts)
Create a new FeatureWriter for the given options.
expected< void > write_batch(const FeatureVector *fvs, size_t count)
Write a contiguous array of feature vectors.
int64_t rows_written() const
Total number of rows written (including flushed and pending).
expected< void > close()
Close the writer: flush remaining data, write Parquet footer, and finalize the file.
expected< void > write(const FeatureVector &fv)
Write a single feature vector to the store.
FeatureWriterOptions Options
Alias for the options struct.
static expected< ParquetWriter > open(const std::filesystem::path &path, const Schema &schema, const Options &options=Options{})
Open a new Parquet file for writing.
SchemaBuilder & column(std::string col_name, LogicalType logical_type=LogicalType::NONE)
Add a typed column, deducing PhysicalType from T.
Immutable schema description for a Parquet file.
static Schema build(std::string name, Cols &&... cols)
Build a Schema from typed Column<T> descriptors (variadic factory).
static SchemaBuilder builder(std::string name)
Create a SchemaBuilder for fluent column construction.
A lightweight result type that holds either a success value of type T or an Error.
@ TIMESTAMP_NS
Timestamp — INT64, nanoseconds since Unix epoch.
@ IO_ERROR
A file-system or stream I/O operation failed (open, read, write, rename).
@ SCHEMA_MISMATCH
The requested column name or type does not match the file schema.
Schema definition types: Column<T>, SchemaBuilder, and Schema.
Lightweight error value carrying an ErrorCode and a human-readable message.
Schema definition for a single feature group.
std::string name
Group name (e.g. "price_features", "vol_features").
std::vector< std::string > feature_names
Ordered feature column names.
int32_t schema_version
Bumped when features are added or removed.
A single versioned observation for one entity.
int64_t timestamp_ns
Observation timestamp in nanoseconds.
std::vector< double > values
One value per FeatureGroupDef::feature_names.
std::string computation_dag
DAG description of the feature computation (EU AI Act Art. 12).
std::vector< int64_t > source_row_ids
Source row IDs used in computation.
int32_t version
Version number (for late-arriving corrections).
std::string entity_id
Entity key (e.g. "BTCUSDT").
Configuration options for FeatureWriter::create().
size_t max_file_rows
Rows per output file before rolling to a new file.
std::string file_prefix
Filename prefix for generated Parquet files.
std::string output_dir
Directory for output Parquet files (created if missing).
size_t row_group_size
Rows per Parquet row group.
FeatureGroupDef group
Feature group schema definition.
Configuration options for ParquetWriter.
int64_t row_group_size
Target number of rows per row group.
Parquet format enumerations, type traits, and statistics structs.