Signet Forge 0.1.1
C++20 Parquet library with AI-native extensions
DEMO
Loading...
Searching...
No Matches
feature_writer.hpp
Go to the documentation of this file.
1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright 2026 Johnson Ogundeji
5//
6// Writes typed, versioned feature vectors keyed by entity_id and timestamp_ns.
7// Schema: entity_id (STRING) | timestamp_ns (INT64) | version (INT32) | feat_0..N (DOUBLE)
8// Files are append-only; rolling is controlled by max_file_rows.
9
10#pragma once
11
12#include "signet/error.hpp"
13#include "signet/types.hpp"
14#include "signet/schema.hpp"
15#include "signet/writer.hpp"
16
17#include <chrono>
18#include <cstdint>
19#include <cstdio>
20#include <filesystem>
21#include <memory>
22#include <string>
23#include <vector>
24
25namespace signet::forge {
26
27// ============================================================================
28// FeatureGroupDef — schema definition for one feature group
29// (defined at namespace scope to avoid Apple Clang default-member-init bug)
30// ============================================================================
31
39 std::string name;
40 std::vector<std::string> feature_names;
41 int32_t schema_version = 1;
42};
43
44// ============================================================================
45// FeatureVector — one versioned observation for one entity
46// ============================================================================
47
56 std::string entity_id;
57 int64_t timestamp_ns = 0;
58 int32_t version = 1;
59 std::vector<double> values;
60 std::string computation_dag;
61 std::vector<int64_t> source_row_ids;
62};
63
64// ============================================================================
65// FeatureWriterOptions
66// (defined outside FeatureWriter to avoid Apple Clang default-member-init bug)
67// ============================================================================
68
76 std::string output_dir;
78 size_t row_group_size = 10'000;
79 size_t max_file_rows = 1'000'000;
80 std::string file_prefix = "features";
81};
82
83// ============================================================================
84// FeatureWriter — append-only writer for a single feature group
85// ============================================================================
86
103public:
106
107 // -------------------------------------------------------------------------
108 // Factory
109 // -------------------------------------------------------------------------
110
118 [[nodiscard]] static expected<FeatureWriter> create(Options opts) {
119 if (opts.output_dir.empty())
121 "FeatureWriter: output_dir must not be empty"};
122 // Validate output_dir against path traversal:
123 // 1. Check raw path for '..' components (catches ../../etc, /tmp/a/../../../etc)
124 // 2. Also check canonical path in case symlinks resolve to traversal
125 // CWE-59: Improper Link Resolution Before File Access —
126 // weakly_canonical() resolves symlinks for existing path prefixes but
127 // cannot fully validate symlinks for not-yet-created directories.
128 // Deployers should ensure the output directory is not a symlink to
129 // an untrusted location.
130 {
131 std::filesystem::path raw(opts.output_dir);
132 for (const auto& part : raw) {
133 if (part == "..") {
135 "FeatureWriter: output_dir contains '..' path traversal"};
136 }
137 }
138 std::filesystem::path canon = std::filesystem::weakly_canonical(raw);
139 for (const auto& part : canon) {
140 if (part == "..") {
142 "FeatureWriter: output_dir resolves to path with '..' traversal"};
143 }
144 }
145 }
146 if (opts.group.name.empty())
148 "FeatureWriter: group.name must not be empty"};
149 if (opts.group.feature_names.empty())
151 "FeatureWriter: group.feature_names must not be empty"};
152
153 // CWE-22: Improper Limitation of a Pathname to a Restricted Directory —
154 // file_prefix and group.name are concatenated into output filenames;
155 // reject path separators and traversal components to prevent writes
156 // outside the validated output_dir.
157 auto has_path_chars = [](const std::string& s) {
158 return s.find('/') != std::string::npos ||
159 s.find('\\') != std::string::npos ||
160 s.find("..") != std::string::npos ||
161 s.find('\0') != std::string::npos;
162 };
163 if (has_path_chars(opts.file_prefix))
165 "FeatureWriter: file_prefix contains path separator or traversal"};
166 if (has_path_chars(opts.group.name))
168 "FeatureWriter: group.name contains path separator or traversal"};
169
170 std::error_code ec;
171 std::filesystem::create_directories(opts.output_dir, ec);
172 if (ec)
174 "FeatureWriter: cannot create output_dir '" +
175 opts.output_dir + "': " + ec.message()};
176
177 FeatureWriter fw;
178 fw.opts_ = std::move(opts);
179 fw.feat_bufs_.resize(fw.opts_.group.feature_names.size());
180 return fw;
181 }
182
183 // -------------------------------------------------------------------------
184 // Special members — movable, non-copyable
185 // -------------------------------------------------------------------------
186
188 FeatureWriter() = default;
189 FeatureWriter(FeatureWriter&&) noexcept = default;
190 FeatureWriter& operator=(FeatureWriter&&) noexcept = default;
191 FeatureWriter(const FeatureWriter&) = delete;
192 FeatureWriter& operator=(const FeatureWriter&) = delete;
193
195 if (current_writer_) (void)close();
196 }
197
198 // -------------------------------------------------------------------------
199 // Write a single feature vector
200 // -------------------------------------------------------------------------
201
210 [[nodiscard]] expected<void> write(const FeatureVector& fv) {
211 const size_t nfeat = opts_.group.feature_names.size();
212 if (fv.values.size() != nfeat)
214 "FeatureWriter: values.size() " +
215 std::to_string(fv.values.size()) +
216 " != feature_names.size() " +
217 std::to_string(nfeat)};
218
219 entity_buf_.push_back(fv.entity_id);
220 ts_buf_.push_back(fv.timestamp_ns);
221 ver_buf_.push_back(fv.version);
222 for (size_t i = 0; i < nfeat; ++i)
223 feat_bufs_[i].push_back(fv.values[i]);
224
225 ++pending_;
226 ++total_rows_;
227
228 if (pending_ >= opts_.row_group_size)
229 return flush();
230
231 return expected<void>{};
232 }
233
234 // -------------------------------------------------------------------------
235 // Write a batch of feature vectors
236 // -------------------------------------------------------------------------
237
242 [[nodiscard]] expected<void> write_batch(const FeatureVector* fvs, size_t count) {
243 for (size_t i = 0; i < count; ++i) {
244 auto r = write(fvs[i]);
245 if (!r) return r;
246 }
247 return expected<void>{};
248 }
249
253 [[nodiscard]] expected<void> write_batch(const std::vector<FeatureVector>& fvs) {
254 return write_batch(fvs.data(), fvs.size());
255 }
256
257 // -------------------------------------------------------------------------
258 // Flush pending rows to the current Parquet file (explicit call)
259 // -------------------------------------------------------------------------
260
267 [[nodiscard]] expected<void> flush() {
268 if (pending_ == 0) return expected<void>{};
269
270 if (!current_writer_) {
271 auto r = open_new_file();
272 if (!r) return r;
273 }
274
275 const size_t nfeat = opts_.group.feature_names.size();
276
277 // Do NOT clear buffers on write failure — preserve data for retry (CWE-459)
278 { auto r = current_writer_->write_column(0, entity_buf_.data(), entity_buf_.size()); if (!r) return r; }
279 { auto r = current_writer_->write_column<int64_t>(1, ts_buf_.data(), ts_buf_.size()); if (!r) return r; }
280 { auto r = current_writer_->write_column<int32_t>(2, ver_buf_.data(), ver_buf_.size()); if (!r) return r; }
281 for (size_t i = 0; i < nfeat; ++i) {
282 auto r = current_writer_->write_column<double>(3 + i, feat_bufs_[i].data(), feat_bufs_[i].size());
283 if (!r) return r;
284 }
285 { auto r = current_writer_->flush_row_group(); if (!r) return r; }
286
287 current_file_rows_ += pending_;
288 pending_ = 0;
289 entity_buf_.clear();
290 ts_buf_.clear();
291 ver_buf_.clear();
292 for (auto& fb : feat_bufs_) fb.clear();
293
294 // Roll to new file if this one is full
295 if (current_file_rows_ >= opts_.max_file_rows) {
296 auto r = current_writer_->close();
297 current_writer_.reset();
298 current_file_rows_ = 0;
299 if (!r) {
300 // CWE-459: Incomplete Cleanup — remove the partial/corrupt
301 // file on roll close failure to prevent downstream readers
302 // from ingesting an incomplete Parquet file.
303 std::error_code remove_ec;
304 std::filesystem::remove(current_file_path_, remove_ec);
305 return r.error();
306 }
307 // Register file only after confirmed successful close
308 register_file(current_file_path_);
309 }
310
311 return expected<void>{};
312 }
313
314 // -------------------------------------------------------------------------
315 // Close — flush remaining data, write Parquet footer, finalize file
316 // -------------------------------------------------------------------------
317
320 [[nodiscard]] expected<void> close() {
321 if (pending_ > 0) {
322 auto r = flush();
323 if (!r) return r;
324 }
325 if (current_writer_) {
326 auto cr = current_writer_->close();
327 current_writer_.reset();
328 if (!cr) { current_file_rows_ = 0; return cr.error(); }
329 // Register file only after confirmed successful close
330 if (current_file_rows_ > 0) register_file(current_file_path_);
331 current_file_rows_ = 0;
332 }
333 return expected<void>{};
334 }
335
336 // -------------------------------------------------------------------------
337 // Accessors
338 // -------------------------------------------------------------------------
339
341 [[nodiscard]] const FeatureGroupDef& group_def() const { return opts_.group; }
343 [[nodiscard]] int64_t rows_written() const { return total_rows_; }
345 [[nodiscard]] std::vector<std::string> output_files() const { return output_files_; }
347 [[nodiscard]] bool is_open() const { return static_cast<bool>(current_writer_); }
348
349private:
351 [[nodiscard]] expected<void> open_new_file() {
352 // Build schema: entity_id | timestamp_ns | version | feat_0 .. feat_N
353 auto sb = Schema::builder(opts_.group.name)
354 .column<std::string>("entity_id")
355 .column<int64_t>("timestamp_ns", LogicalType::TIMESTAMP_NS)
356 .column<int32_t>("version");
357
358 for (const auto& fname : opts_.group.feature_names)
359 sb.column<double>(fname);
360
361 Schema schema = sb.build();
362
363 // Generate unique filename based on file_index + nanosecond timestamp
364 using namespace std::chrono;
365 const int64_t ts_ns = static_cast<int64_t>(
366 duration_cast<nanoseconds>(
367 system_clock::now().time_since_epoch()).count());
368 char buf[80];
369 std::snprintf(buf, sizeof(buf), "%08zu_%lld",
370 file_index_++, static_cast<long long>(ts_ns));
371
372 current_file_path_ = opts_.output_dir + "/" + opts_.file_prefix
373 + "_" + opts_.group.name + "_" + buf + ".parquet";
374
375 WriterOptions wo;
376 wo.row_group_size = static_cast<int64_t>(opts_.row_group_size);
377
378 auto wr = ParquetWriter::open(current_file_path_, schema, wo);
379 if (!wr) return wr.error();
380
381 current_writer_ = std::make_unique<ParquetWriter>(std::move(*wr));
382 current_file_rows_ = 0;
383 return expected<void>{};
384 }
385
387 void register_file(const std::string& path) {
388 for (const auto& p : output_files_) { if (p == path) return; }
389 output_files_.push_back(path);
390 }
391
392 // -------------------------------------------------------------------------
393 // State
394 // -------------------------------------------------------------------------
395
396 Options opts_;
397 std::unique_ptr<ParquetWriter> current_writer_;
398 std::string current_file_path_;
399 size_t current_file_rows_{0};
400 size_t file_index_{0};
401 int64_t total_rows_{0};
402 size_t pending_{0};
403
404 // Per-row-group buffers (cleared after each flush)
405 std::vector<std::string> entity_buf_;
406 std::vector<int64_t> ts_buf_;
407 std::vector<int32_t> ver_buf_;
408 std::vector<std::vector<double>> feat_bufs_;
409
410 std::vector<std::string> output_files_;
411};
412
413} // namespace signet::forge
Append-only writer for a single feature group.
FeatureWriter()=default
Default-construct an empty writer (use create() factory instead).
expected< void > write_batch(const std::vector< FeatureVector > &fvs)
Write a vector of feature vectors.
FeatureWriter(FeatureWriter &&) noexcept=default
bool is_open() const
Check whether the writer currently has an open Parquet file.
const FeatureGroupDef & group_def() const
Return the feature group definition used by this writer.
expected< void > flush()
Flush all buffered rows to the current Parquet file as a new row group.
std::vector< std::string > output_files() const
Return a copy of all finalized output file paths (excludes the active file).
static expected< FeatureWriter > create(Options opts)
Create a new FeatureWriter for the given options.
expected< void > write_batch(const FeatureVector *fvs, size_t count)
Write a contiguous array of feature vectors.
int64_t rows_written() const
Total number of rows accepted by write(), including both flushed and still-buffered rows.
expected< void > close()
Close the writer: flush remaining data, write Parquet footer, and finalize the file.
expected< void > write(const FeatureVector &fv)
Write a single feature vector to the store.
FeatureWriterOptions Options
Alias for the options struct.
static expected< ParquetWriter > open(const std::filesystem::path &path, const Schema &schema, const Options &options=Options{})
Open a new Parquet file for writing.
Definition writer.hpp:303
SchemaBuilder & column(std::string col_name, LogicalType logical_type=LogicalType::NONE)
Add a typed column, deducing PhysicalType from T.
Definition schema.hpp:107
Immutable schema description for a Parquet file.
Definition schema.hpp:192
static Schema build(std::string name, Cols &&... cols)
Build a Schema from typed Column<T> descriptors (variadic factory).
Definition schema.hpp:217
static SchemaBuilder builder(std::string name)
Create a SchemaBuilder for fluent column construction.
Definition schema.hpp:228
A lightweight result type that holds either a success value of type T or an Error.
Definition error.hpp:143
@ TIMESTAMP_NS
Timestamp — INT64, nanoseconds since Unix epoch.
@ IO_ERROR
A file-system or stream I/O operation failed (open, read, write, rename).
@ SCHEMA_MISMATCH
The requested column name or type does not match the file schema.
Schema definition types: Column<T>, SchemaBuilder, and Schema.
Lightweight error value carrying an ErrorCode and a human-readable message.
Definition error.hpp:99
Schema definition for a single feature group.
std::string name
Group name (e.g. "price_features", "vol_features").
std::vector< std::string > feature_names
Ordered feature column names.
int32_t schema_version
Bumped when features are added or removed.
A single versioned observation for one entity.
int64_t timestamp_ns
Observation timestamp in nanoseconds.
std::vector< double > values
One value per FeatureGroupDef::feature_names.
std::string computation_dag
DAG description of feature computation (EU AI Act Art.12)
std::vector< int64_t > source_row_ids
Source row IDs used in computation.
int32_t version
Version number (for late-arriving corrections).
std::string entity_id
Entity key (e.g. "BTCUSDT").
Configuration options for FeatureWriter::create().
size_t max_file_rows
Rows per output file before rolling to a new file.
std::string file_prefix
Filename prefix for generated Parquet files.
std::string output_dir
Directory for output Parquet files (created if missing).
size_t row_group_size
Rows per Parquet row group.
FeatureGroupDef group
Feature group schema definition.
Configuration options for ParquetWriter.
Definition writer.hpp:188
int64_t row_group_size
Target number of rows per row group.
Definition writer.hpp:192
Parquet format enumerations, type traits, and statistics structs.