Signet Forge 0.1.0
C++20 Parquet library with AI-native extensions
DEMO
Loading...
Searching...
No Matches
feature_writer.hpp
Go to the documentation of this file.
1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright 2026 Johnson Ogundeji
5//
6// Writes typed, versioned feature vectors keyed by entity_id and timestamp_ns.
7// Schema: entity_id (STRING) | timestamp_ns (INT64) | version (INT32) | one DOUBLE column per feature name
8// Files are append-only; rolling is controlled by max_file_rows.
9
10#pragma once
11
12#include "signet/error.hpp"
13#include "signet/types.hpp"
14#include "signet/schema.hpp"
15#include "signet/writer.hpp"
16
17#include <chrono>
18#include <cstdint>
19#include <cstdio>
20#include <filesystem>
21#include <memory>
22#include <string>
23#include <vector>
24
25namespace signet::forge {
26
27// ============================================================================
28// FeatureGroupDef — schema definition for one feature group
29// (defined at namespace scope to avoid Apple Clang default-member-init bug)
30// ============================================================================
31
// Group name: used as the Parquet schema name and embedded in every
// generated file name (<prefix>_<name>_<index>_<ns>.parquet).
39 std::string name;
// Ordered feature names: one DOUBLE column per entry, in this order,
// after the entity_id / timestamp_ns / version key columns.
40 std::vector<std::string> feature_names;
// NOTE(review): not referenced by FeatureWriter — it is neither validated
// nor written to output files.
41 int32_t schema_version = 1;
42};
43
44// ============================================================================
45// FeatureVector — one versioned observation for one entity
46// ============================================================================
47
// Written to the entity_id (STRING) column.
56 std::string entity_id;
// Written to the timestamp_ns column (INT64, TIMESTAMP_NS logical type).
57 int64_t timestamp_ns = 0;
// Written to the version (INT32) column as-is.
58 int32_t version = 1;
// One value per FeatureGroupDef::feature_names entry, in the same order;
// FeatureWriter::write() rejects a size mismatch.
59 std::vector<double> values;
// Not written to output files by FeatureWriter::write().
60 std::string computation_dag;
// Not written to output files by FeatureWriter::write().
61 std::vector<int64_t> source_row_ids;
62};
63
64// ============================================================================
65// FeatureWriterOptions
66// (defined outside FeatureWriter to avoid Apple Clang default-member-init bug)
67// ============================================================================
68
// Output directory. create() rejects an empty value or any '..' component
// and creates the directory (including parents) if it does not exist.
76 std::string output_dir;
// Rows buffered before write() triggers flush(); also forwarded to
// WriterOptions::row_group_size for each file.
78 size_t row_group_size = 10'000;
// Roll threshold, checked after each flush(): a file may exceed it by up to
// the size of the last row group flushed before a new file is started.
79 size_t max_file_rows = 1'000'000;
// First component of generated file names: <prefix>_<group>_<index>_<ns>.parquet
80 std::string file_prefix = "features";
81};
82
83// ============================================================================
84// FeatureWriter — append-only writer for a single feature group
85// ============================================================================
86
103public:
106
107 // -------------------------------------------------------------------------
108 // Factory
109 // -------------------------------------------------------------------------
110
// Factory: validates options, makes sure output_dir exists, and returns a
// writer with one value buffer per feature column. No Parquet file is opened
// here; the first file is created lazily by flush().
// @param opts  Writer options (taken by value and moved into the writer).
// @return The writer, or an Error when output_dir is empty or contains /
//         resolves to a '..' component, group.name or group.feature_names is
//         empty, or output_dir cannot be created.
118 [[nodiscard]] static expected<FeatureWriter> create(Options opts) {
119 if (opts.output_dir.empty())
121 "FeatureWriter: output_dir must not be empty"};
122 // Validate output_dir against path traversal:
123 // 1. Check raw path for '..' components (catches ../../etc, /tmp/a/../../../etc)
124 // 2. Also check canonical path in case symlinks resolve to traversal
125 // CWE-59: Improper Link Resolution Before File Access —
126 // weakly_canonical() resolves symlinks for existing path prefixes but
127 // cannot fully validate symlinks for not-yet-created directories.
128 // Deployers should ensure the output directory is not a symlink to
129 // an untrusted location.
130 {
131 std::filesystem::path raw(opts.output_dir);
132 for (const auto& part : raw) {
133 if (part == "..") {
135 "FeatureWriter: output_dir contains '..' path traversal"};
136 }
137 }
// NOTE(review): this weakly_canonical() overload reports failure by throwing
// std::filesystem::filesystem_error rather than returning an Error; the
// std::error_code overload would keep create() exception-free. Its result is
// also in normal form, so the loop below is unlikely to ever match.
138 std::filesystem::path canon = std::filesystem::weakly_canonical(raw);
139 for (const auto& part : canon) {
140 if (part == "..") {
142 "FeatureWriter: output_dir resolves to path with '..' traversal"};
143 }
144 }
145 }
// NOTE(review): group.name and file_prefix end up in generated file names but
// are not checked for path separators in this function.
146 if (opts.group.name.empty())
148 "FeatureWriter: group.name must not be empty"};
149 if (opts.group.feature_names.empty())
151 "FeatureWriter: group.feature_names must not be empty"};
152
153 std::error_code ec;
154 std::filesystem::create_directories(opts.output_dir, ec);
155 if (ec)
157 "FeatureWriter: cannot create output_dir '" +
158 opts.output_dir + "': " + ec.message()};
159
160 FeatureWriter fw;
161 fw.opts_ = std::move(opts);
// One buffer per feature column; write() and flush() index them by feature position.
162 fw.feat_bufs_.resize(fw.opts_.group.feature_names.size());
163 return fw;
164 }
165
166 // -------------------------------------------------------------------------
167 // Special members — movable, non-copyable
168 // -------------------------------------------------------------------------
169
171 FeatureWriter() = default;
172 FeatureWriter(FeatureWriter&&) noexcept = default;
173 FeatureWriter& operator=(FeatureWriter&&) noexcept = default;
174 FeatureWriter(const FeatureWriter&) = delete;
175 FeatureWriter& operator=(const FeatureWriter&) = delete;
176
178 if (current_writer_) (void)close();
179 }
180
181 // -------------------------------------------------------------------------
182 // Write a single feature vector
183 // -------------------------------------------------------------------------
184
// Buffers one feature vector and flushes a row group once row_group_size rows
// are pending. Only entity_id, timestamp_ns, version and values are stored;
// FeatureVector::computation_dag and source_row_ids are not written.
// @return success, a size-mismatch Error, or the result of the triggered flush().
193 [[nodiscard]] expected<void> write(const FeatureVector& fv) {
194 const size_t nfeat = opts_.group.feature_names.size();
// Reject vectors whose width does not match the configured feature columns.
195 if (fv.values.size() != nfeat)
197 "FeatureWriter: values.size() " +
198 std::to_string(fv.values.size()) +
199 " != feature_names.size() " +
200 std::to_string(nfeat)};
201
// Append to the column-oriented buffers (one slot per schema column).
202 entity_buf_.push_back(fv.entity_id);
203 ts_buf_.push_back(fv.timestamp_ns);
204 ver_buf_.push_back(fv.version);
205 for (size_t i = 0; i < nfeat; ++i)
206 feat_bufs_[i].push_back(fv.values[i]);
207
// total_rows_ counts the row as soon as it is buffered, before it reaches disk.
208 ++pending_;
209 ++total_rows_;
210
211 if (pending_ >= opts_.row_group_size)
212 return flush();
213
214 return expected<void>{};
215 }
216
217 // -------------------------------------------------------------------------
218 // Write a batch of feature vectors
219 // -------------------------------------------------------------------------
220
225 [[nodiscard]] expected<void> write_batch(const FeatureVector* fvs, size_t count) {
226 for (size_t i = 0; i < count; ++i) {
227 auto r = write(fvs[i]);
228 if (!r) return r;
229 }
230 return expected<void>{};
231 }
232
236 [[nodiscard]] expected<void> write_batch(const std::vector<FeatureVector>& fvs) {
237 return write_batch(fvs.data(), fvs.size());
238 }
239
240 // -------------------------------------------------------------------------
241 // Flush pending rows to the current Parquet file (explicit call)
242 // -------------------------------------------------------------------------
243
250 [[nodiscard]] expected<void> flush() {
251 if (pending_ == 0) return expected<void>{};
252
253 if (!current_writer_) {
254 auto r = open_new_file();
255 if (!r) return r;
256 }
257
258 const size_t nfeat = opts_.group.feature_names.size();
259
260 // Do NOT clear buffers on write failure — preserve data for retry (CWE-459)
261 { auto r = current_writer_->write_column(0, entity_buf_.data(), entity_buf_.size()); if (!r) return r; }
262 { auto r = current_writer_->write_column<int64_t>(1, ts_buf_.data(), ts_buf_.size()); if (!r) return r; }
263 { auto r = current_writer_->write_column<int32_t>(2, ver_buf_.data(), ver_buf_.size()); if (!r) return r; }
264 for (size_t i = 0; i < nfeat; ++i) {
265 auto r = current_writer_->write_column<double>(3 + i, feat_bufs_[i].data(), feat_bufs_[i].size());
266 if (!r) return r;
267 }
268 { auto r = current_writer_->flush_row_group(); if (!r) return r; }
269
270 current_file_rows_ += pending_;
271 pending_ = 0;
272 entity_buf_.clear();
273 ts_buf_.clear();
274 ver_buf_.clear();
275 for (auto& fb : feat_bufs_) fb.clear();
276
277 // Roll to new file if this one is full
278 if (current_file_rows_ >= opts_.max_file_rows) {
279 auto r = current_writer_->close();
280 current_writer_.reset();
281 current_file_rows_ = 0;
282 if (!r) {
283 // CWE-459: Incomplete Cleanup — remove the partial/corrupt
284 // file on roll close failure to prevent downstream readers
285 // from ingesting an incomplete Parquet file.
286 std::error_code remove_ec;
287 std::filesystem::remove(current_file_path_, remove_ec);
288 return r.error();
289 }
290 // Register file only after confirmed successful close
291 register_file(current_file_path_);
292 }
293
294 return expected<void>{};
295 }
296
297 // -------------------------------------------------------------------------
298 // Close — flush remaining data, write Parquet footer, finalize file
299 // -------------------------------------------------------------------------
300
303 [[nodiscard]] expected<void> close() {
304 if (pending_ > 0) {
305 auto r = flush();
306 if (!r) return r;
307 }
308 if (current_writer_) {
309 auto cr = current_writer_->close();
310 current_writer_.reset();
311 if (!cr) { current_file_rows_ = 0; return cr.error(); }
312 // Register file only after confirmed successful close
313 if (current_file_rows_ > 0) register_file(current_file_path_);
314 current_file_rows_ = 0;
315 }
316 return expected<void>{};
317 }
318
319 // -------------------------------------------------------------------------
320 // Accessors
321 // -------------------------------------------------------------------------
322
324 [[nodiscard]] const FeatureGroupDef& group_def() const { return opts_.group; }
326 [[nodiscard]] int64_t rows_written() const { return total_rows_; }
328 [[nodiscard]] std::vector<std::string> output_files() const { return output_files_; }
330 [[nodiscard]] bool is_open() const { return static_cast<bool>(current_writer_); }
331
332private:
334 [[nodiscard]] expected<void> open_new_file() {
335 // Build schema: entity_id | timestamp_ns | version | feat_0 .. feat_N
336 auto sb = Schema::builder(opts_.group.name)
337 .column<std::string>("entity_id")
338 .column<int64_t>("timestamp_ns", LogicalType::TIMESTAMP_NS)
339 .column<int32_t>("version");
340
341 for (const auto& fname : opts_.group.feature_names)
342 sb.column<double>(fname);
343
344 Schema schema = sb.build();
345
346 // Generate unique filename based on file_index + nanosecond timestamp
347 using namespace std::chrono;
348 const int64_t ts_ns = static_cast<int64_t>(
349 duration_cast<nanoseconds>(
350 system_clock::now().time_since_epoch()).count());
351 char buf[80];
352 std::snprintf(buf, sizeof(buf), "%08zu_%lld",
353 file_index_++, static_cast<long long>(ts_ns));
354
355 current_file_path_ = opts_.output_dir + "/" + opts_.file_prefix
356 + "_" + opts_.group.name + "_" + buf + ".parquet";
357
358 WriterOptions wo;
359 wo.row_group_size = static_cast<int64_t>(opts_.row_group_size);
360
361 auto wr = ParquetWriter::open(current_file_path_, schema, wo);
362 if (!wr) return wr.error();
363
364 current_writer_ = std::make_unique<ParquetWriter>(std::move(*wr));
365 current_file_rows_ = 0;
366 return expected<void>{};
367 }
368
370 void register_file(const std::string& path) {
371 for (const auto& p : output_files_) { if (p == path) return; }
372 output_files_.push_back(path);
373 }
374
375 // -------------------------------------------------------------------------
376 // State
377 // -------------------------------------------------------------------------
378
// Options moved in by create().
379 Options opts_;
// Writer for the file currently being filled; null until the first flush()
// opens one, and reset again after each roll or close().
380 std::unique_ptr<ParquetWriter> current_writer_;
// Path of the most recently opened file (not cleared after it is closed).
381 std::string current_file_path_;
// Rows flushed into the current file; compared against max_file_rows.
382 size_t current_file_rows_{0};
// Per-writer counter embedded in generated file names; incremented per file.
383 size_t file_index_{0};
// Rows accepted by write(), flushed or not (reported by rows_written()).
384 int64_t total_rows_{0};
// Rows buffered since the last successful row-group flush.
385 size_t pending_{0};
386
387 // Per-row-group buffers (cleared after each flush)
// Cleared only after a successful row group; kept on write failure for retry.
388 std::vector<std::string> entity_buf_;
389 std::vector<int64_t> ts_buf_;
390 std::vector<int32_t> ver_buf_;
// One buffer per feature column, sized to feature_names by create().
391 std::vector<std::vector<double>> feat_bufs_;
392
// Paths of successfully closed files, without duplicates.
393 std::vector<std::string> output_files_;
394};
395
396} // namespace signet::forge
Append-only writer for a single feature group.
FeatureWriter()=default
Default-construct an empty writer (use create() factory instead).
expected< void > write_batch(const std::vector< FeatureVector > &fvs)
Write a vector of feature vectors.
FeatureWriter(FeatureWriter &&) noexcept=default
bool is_open() const
Check whether the writer currently has an open Parquet file.
const FeatureGroupDef & group_def() const
Return the feature group definition used by this writer.
expected< void > flush()
Flush all buffered rows to the current Parquet file as a new row group.
std::vector< std::string > output_files() const
Return a copy of all finalized output file paths (excludes the active file).
static expected< FeatureWriter > create(Options opts)
Create a new FeatureWriter for the given options.
expected< void > write_batch(const FeatureVector *fvs, size_t count)
Write a contiguous array of feature vectors.
int64_t rows_written() const
Total number of rows written (including flushed and pending).
expected< void > close()
Close the writer: flush remaining data, write Parquet footer, and finalize the file.
expected< void > write(const FeatureVector &fv)
Write a single feature vector to the store.
FeatureWriterOptions Options
Alias for the options struct.
static expected< ParquetWriter > open(const std::filesystem::path &path, const Schema &schema, const Options &options=Options{})
Open a new Parquet file for writing.
Definition writer.hpp:303
SchemaBuilder & column(std::string col_name, LogicalType logical_type=LogicalType::NONE)
Add a typed column, deducing PhysicalType from T.
Definition schema.hpp:107
Immutable schema description for a Parquet file.
Definition schema.hpp:192
static Schema build(std::string name, Cols &&... cols)
Build a Schema from typed Column<T> descriptors (variadic factory).
Definition schema.hpp:217
static SchemaBuilder builder(std::string name)
Create a SchemaBuilder for fluent column construction.
Definition schema.hpp:228
A lightweight result type that holds either a success value of type T or an Error.
Definition error.hpp:145
@ TIMESTAMP_NS
Timestamp — INT64, nanoseconds since Unix epoch.
@ IO_ERROR
A file-system or stream I/O operation failed (open, read, write, rename).
@ SCHEMA_MISMATCH
The requested column name or type does not match the file schema.
Schema definition types: Column<T>, SchemaBuilder, and Schema.
Lightweight error value carrying an ErrorCode and a human-readable message.
Definition error.hpp:101
Schema definition for a single feature group.
std::string name
Group name (e.g. "price_features", "vol_features").
std::vector< std::string > feature_names
Ordered feature column names.
int32_t schema_version
Bumped when features are added or removed. FeatureWriter does not write this value to output files.
A single versioned observation for one entity.
int64_t timestamp_ns
Observation timestamp in nanoseconds.
std::vector< double > values
One value per FeatureGroupDef::feature_names.
std::string computation_dag
DAG description of feature computation (EU AI Act Art. 12). Not written to output files by FeatureWriter::write().
std::vector< int64_t > source_row_ids
Source row IDs used in computation. Not written to output files by FeatureWriter::write().
int32_t version
Version number (for late-arriving corrections).
std::string entity_id
Entity key (e.g. "BTCUSDT").
Configuration options for FeatureWriter::create().
size_t max_file_rows
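Row threshold per output file. It is checked after each row-group flush, so a file may exceed it by up to the size of the last row group flushed before the writer rolls to a new file.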
std::string file_prefix
Filename prefix for generated Parquet files.
std::string output_dir
Directory for output Parquet files (created if missing).
size_t row_group_size
Rows per Parquet row group.
FeatureGroupDef group
Feature group schema definition.
Configuration options for ParquetWriter.
Definition writer.hpp:188
int64_t row_group_size
Target number of rows per row group.
Definition writer.hpp:192
Parquet format enumerations, type traits, and statistics structs.