Signet Forge 0.1.0
C++20 Parquet library with AI-native extensions
DEMO
Loading...
Searching...
No Matches
feature_reader.hpp
Go to the documentation of this file.
1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright 2026 Johnson Ogundeji
5//
6// Point-in-time correct reads, time-travel history, batch lookups, and column
7// projection over Parquet files written by FeatureWriter.
8//
9// Index is built at open() time by scanning entity_id + timestamp_ns columns
10// across all files. All subsequent queries are O(log N) with no disk I/O
11// (file data is held in memory by ParquetReader).
12
13#pragma once
14
15#include "signet/error.hpp"
16#include "signet/types.hpp"
17#include "signet/reader.hpp"
18#include "signet/ai/feature_writer.hpp" // FeatureGroupDef, FeatureVector
19
20#include <algorithm>
21#include <cmath>
22#include <cstdint>
23#include <filesystem>
24#include <limits>
25#include <memory>
26#include <string>
27#include <tuple>
28#include <mutex>
29#include <unordered_map>
30#include <vector>
31
32namespace signet::forge {
33
34// ============================================================================
35// FeatureReaderOptions
36// (defined outside FeatureReader to avoid Apple Clang default-member-init bug)
37// ============================================================================
38
46 std::vector<std::string> parquet_files;
47 std::string feature_group;
48};
49
50// ============================================================================
51// FeatureReader — point-in-time correct ML feature store reader
52// ============================================================================
53
72public:
75
76 // -------------------------------------------------------------------------
77 // Factory — opens all files and builds in-memory index
78 // -------------------------------------------------------------------------
79
// Validates options, then eagerly scans every file to build the
// entity_id → RowLocation index before returning the reader.
// NOTE(review): original line 86 (first line of the Error construction,
// including its ErrorCode) is not visible in this listing — confirm
// against the real header.
84 [[nodiscard]] static expected<FeatureReader> open(Options opts) {
85 if (opts.parquet_files.empty())
87 "FeatureReader: parquet_files must not be empty"};
88
89 FeatureReader reader;
90 reader.opts_ = std::move(opts);
91
// build_index() opens each file and scans the fixed columns; its Error
// is propagated unchanged on failure.
92 auto r = reader.build_index();
93 if (!r) return r.error();
94
95 return reader;
96 }
97
98 // -------------------------------------------------------------------------
99 // Special members — movable, non-copyable
100 // -------------------------------------------------------------------------
101
// Default-constructed reader is empty; use open() to get a usable one.
103 FeatureReader() = default;
// Move constructor — transfers all state. The source's cache mutex is
// held while moving rg_cache_ so a concurrent ensure_cached() on the
// source cannot race the move.
// NOTE(review): original line 104 (the move-ctor signature, presumably
// `FeatureReader(FeatureReader&& o) noexcept`) is not visible here.
105 : opts_(std::move(o.opts_)),
106 index_(std::move(o.index_)),
107 feature_names_(std::move(o.feature_names_)),
108 total_rows_(o.total_rows_),
109 failed_file_count_(o.failed_file_count_),
110 readers_(std::move(o.readers_)) {
111 // Lock source cache mutex to prevent data race with concurrent ensure_cached()
112 std::lock_guard<std::mutex> lk(o.rg_cache_mutex_);
113 rg_cache_ = std::move(o.rg_cache_);
114 }
// Move assignment — self-assignment-safe. Both cache mutexes are locked
// together via std::scoped_lock (deadlock-free ordering) before moving
// rg_cache_.
// NOTE(review): original line 115 (the move-assignment signature) is not
// visible in this listing.
116 if (this != &o) {
117 opts_ = std::move(o.opts_);
118 index_ = std::move(o.index_);
119 feature_names_ = std::move(o.feature_names_);
120 total_rows_ = o.total_rows_;
121 failed_file_count_= o.failed_file_count_;
122 readers_ = std::move(o.readers_);
// scoped_lock acquires both mutexes atomically with respect to deadlock.
123 std::scoped_lock lk(rg_cache_mutex_, o.rg_cache_mutex_);
124 rg_cache_ = std::move(o.rg_cache_);
125 }
126 return *this;
127 }
// Non-copyable: readers_ holds unique_ptr<ParquetReader>.
128 FeatureReader(const FeatureReader&) = delete;
130
131 // =========================================================================
132 // Query API
133 // =========================================================================
134
135 // -------------------------------------------------------------------------
136 // as_of — latest version of entity at or before timestamp_ns
137 // -------------------------------------------------------------------------
138
// Point-in-time lookup: returns nullopt (not an error) when the entity is
// unknown or has no row at/before timestamp_ns; returns an Error only when
// fetching the located row fails.
// NOTE(review): original lines 139-148 (doc comment + signature,
// `expected<std::optional<FeatureVector>> as_of(...)` per the member list)
// are not visible in this listing.
149 const std::string& entity_id,
150 int64_t timestamp_ns,
151 const std::vector<std::string>& project = {}) const {
152
153 auto it = index_.find(entity_id);
154 if (it == index_.end())
155 return std::optional<FeatureVector>{std::nullopt};
156
// locs is sorted ascending by (timestamp_ns, version, ...) — see
// build_index() — which makes upper_bound valid here.
157 const auto& locs = it->second;
158
159 // Binary search: find the last entry with timestamp_ns <= query time.
160 auto pos = std::upper_bound(
161 locs.begin(), locs.end(), timestamp_ns,
162 [](int64_t ts, const RowLocation& loc) {
163 return ts < loc.timestamp_ns;
164 });
165
166 if (pos == locs.begin())
167 return std::optional<FeatureVector>{std::nullopt}; // all > timestamp
168
169 --pos; // largest timestamp_ns <= query
170
171 auto fv = fetch_row(*pos, project);
172 if (!fv) return fv.error();
173 return std::optional<FeatureVector>{std::move(*fv)};
174 }
175
176 // -------------------------------------------------------------------------
177 // get — latest version of entity (no time constraint)
178 // -------------------------------------------------------------------------
179
// Convenience wrapper: as_of at int64 max returns the newest row.
// The extra parentheses around std::numeric_limits<int64_t>::max defeat
// any max() function-style macro (e.g. from <windows.h>).
// NOTE(review): original line 187 (the signature) is not visible here.
188 const std::string& entity_id,
189 const std::vector<std::string>& project = {}) const {
190 return as_of(entity_id,
191 (std::numeric_limits<int64_t>::max)(),
192 project);
193 }
194
195 // -------------------------------------------------------------------------
196 // history — all versions of entity in the inclusive range [start_ns, end_ns]
197 // -------------------------------------------------------------------------
198
// Returns rows in ascending timestamp order (index entries are pre-sorted
// by build_index()). Unknown entity yields an empty vector, not an error.
// NOTE(review): original lines 199-207 (doc comment + signature) are not
// visible in this listing.
208 const std::string& entity_id,
209 int64_t start_ns,
210 int64_t end_ns,
211 const std::vector<std::string>& project = {}) const {
212
213 auto it = index_.find(entity_id);
214 if (it == index_.end())
215 return std::vector<FeatureVector>{};
216
217 const auto& locs = it->second;
218 std::vector<FeatureVector> result;
219
// Linear scan relies on ascending sort: skip entries before start_ns,
// stop at the first entry past end_ns.
220 for (const auto& loc : locs) {
221 if (loc.timestamp_ns < start_ns) continue;
222 if (loc.timestamp_ns > end_ns) break;
223
224 auto fv = fetch_row(loc, project);
225 if (!fv) return fv.error();
226 result.push_back(std::move(*fv));
227 }
228
229 return result;
230 }
231
232 // -------------------------------------------------------------------------
233 // as_of_batch — as_of for N entities at the same timestamp
234 // -------------------------------------------------------------------------
235
// Batch lookup: one as_of per entity at a shared timestamp. Entities with
// no row at/before the timestamp are silently omitted from the result, so
// result.size() may be < entity_ids.size(). Any fetch error aborts the
// whole batch.
// NOTE(review): original lines 236-244 (doc comment + signature) are not
// visible in this listing.
245 const std::vector<std::string>& entity_ids,
246 int64_t timestamp_ns,
247 const std::vector<std::string>& project = {}) const {
248
249 std::vector<FeatureVector> result;
250 result.reserve(entity_ids.size());
251
252 for (const auto& eid : entity_ids) {
253 auto r = as_of(eid, timestamp_ns, project);
254 if (!r) return r.error();
255 if (r->has_value())
256 result.push_back(std::move(r->value()));
257 }
258
259 return result;
260 }
261
262 // =========================================================================
263 // Schema / metadata accessors
264 // =========================================================================
265
// Ordered feature column names discovered from the first readable file
// (schema columns 3..N — see build_index()).
267 [[nodiscard]] const std::vector<std::string>& feature_names() const {
268 return feature_names_;
269 }
// Number of feature columns (excludes entity_id/timestamp_ns/version).
271 [[nodiscard]] size_t num_features() const { return feature_names_.size(); }
// Number of distinct entity_ids in the in-memory index.
273 [[nodiscard]] size_t num_entities() const { return index_.size(); }
// Total rows indexed across all files and row groups.
275 [[nodiscard]] size_t total_rows() const { return total_rows_; }
// Files skipped during build_index() (missing on disk or failed to open).
279 [[nodiscard]] size_t failed_file_count() const { return failed_file_count_; }
280
281private:
282 // =========================================================================
283 // Internal: row location in the index
284 // =========================================================================
285
// Compact index entry: enough to sort point-in-time correctly
// (timestamp_ns, version) and to re-locate the row on disk
// (file_idx, row_group, row_offset).
287 struct RowLocation {
288 int64_t timestamp_ns = 0;
289 int32_t version = 1;
290 size_t file_idx = 0;
291 size_t row_group = 0;
292 size_t row_offset = 0;
293 };
294
295 // entity_id → entries sorted by timestamp_ns (ascending)
296 using EntityIndex = std::unordered_map<std::string, std::vector<RowLocation>>;
297
// Open every configured file, scan the three fixed columns of every row
// group, and populate index_ / feature_names_ / total_rows_. Missing or
// unreadable files are tolerated (nullptr reader + failed_file_count_);
// corrupt fixed columns in a readable file are a hard error.
299 [[nodiscard]] expected<void> build_index() {
300 const size_t num_files = opts_.parquet_files.size();
301 readers_.reserve(num_files);
302
303 for (size_t fi = 0; fi < num_files; ++fi) {
304 const auto& path = opts_.parquet_files[fi];
305
306 // Check existence before opening (ParquetReader::open gives a
307 // poor error message for missing files).
// Non-throwing exists() overload: filesystem errors land in ec and the
// file is treated as unreadable rather than throwing.
308 std::error_code ec;
309 if (!std::filesystem::exists(path, ec)) {
// nullptr placeholder keeps readers_ index-aligned with parquet_files.
310 readers_.push_back(nullptr);
311 ++failed_file_count_;
312 continue;
313 }
314
315 auto rdr_result = ParquetReader::open(path);
316 if (!rdr_result) {
317 readers_.push_back(nullptr);
318 ++failed_file_count_;
319 continue;
320 }
321
322 auto& rdr = *rdr_result;
323 const auto& schema = rdr.schema();
324
325 // Discover feature names from the first readable file.
326 // Schema layout: col0=entity_id, col1=timestamp_ns, col2=version,
327 // col3..N = feature columns (DOUBLE).
328 if (feature_names_.empty() && schema.num_columns() > 3) {
329 for (size_t ci = 3; ci < schema.num_columns(); ++ci)
330 feature_names_.push_back(schema.column(ci).name);
331 }
332
333 const size_t num_rg = static_cast<size_t>(rdr.num_row_groups());
334
335 for (size_t rg = 0; rg < num_rg; ++rg) {
336 // Read only entity_id (col 0) and timestamp_ns (col 1)
337 // for the index — feature values are fetched on demand.
338 auto eid_result = rdr.read_column<std::string>(rg, 0);
339 if (!eid_result) {
340 return Error{ErrorCode::CORRUPT_DATA,
341 "FeatureReader: failed to read entity_id from '" + path +
342 "' row group " + std::to_string(rg) +
343 ": " + eid_result.error().message};
344 }
345
346 auto ts_result = rdr.read_column<int64_t>(rg, 1);
347 if (!ts_result) {
348 return Error{ErrorCode::CORRUPT_DATA,
349 "FeatureReader: failed to read timestamp_ns from '" + path +
350 "' row group " + std::to_string(rg) +
351 ": " + ts_result.error().message};
352 }
353
354 auto ver_result = rdr.read_column<int32_t>(rg, 2);
355 if (!ver_result) {
356 return Error{ErrorCode::CORRUPT_DATA,
357 "FeatureReader: failed to read version from '" + path +
358 "' row group " + std::to_string(rg) +
359 ": " + ver_result.error().message};
360 }
361
362 const auto& eids = *eid_result;
363 const auto& tss = *ts_result;
364 const auto& vers = *ver_result;
// NOTE(review): original line 367 (the ErrorCode of this Error, likely
// CORRUPT_DATA to match the reads above) is not visible in this listing.
365 if (eids.size() != tss.size() || eids.size() != vers.size()) {
366 return Error{
368 "FeatureReader: fixed column length mismatch in '" + path +
369 "' row group " + std::to_string(rg) + " (entity_id=" +
370 std::to_string(eids.size()) + ", timestamp_ns=" +
371 std::to_string(tss.size()) + ", version=" +
372 std::to_string(vers.size()) + ")"};
373 }
374 const size_t nrows = eids.size();
375
376 for (size_t row = 0; row < nrows; ++row) {
377 RowLocation loc;
378 loc.timestamp_ns = tss[row];
379 loc.version = vers[row];
380 loc.file_idx = fi;
381 loc.row_group = rg;
382 loc.row_offset = row;
383 index_[eids[row]].push_back(loc);
384 }
385
386 total_rows_ += nrows;
387 }
388
389 // Move reader into storage so fetch_row can use it later.
390 readers_.push_back(
391 std::make_unique<ParquetReader>(std::move(*rdr_result)));
392 }
393
394 // Sort each entity's entries by (timestamp_ns, version) composite key.
// Trailing tie-breakers (file_idx, row_group, row_offset) make the order
// fully deterministic across runs.
395 for (auto& [eid, locs] : index_) {
396 std::sort(locs.begin(), locs.end(),
397 [](const RowLocation& a, const RowLocation& b) {
398 if (a.timestamp_ns != b.timestamp_ns)
399 return a.timestamp_ns < b.timestamp_ns;
400 if (a.version != b.version)
401 return a.version < b.version;
402 if (a.file_idx != b.file_idx)
403 return a.file_idx < b.file_idx;
404 if (a.row_group != b.row_group)
405 return a.row_group < b.row_group;
406 return a.row_offset < b.row_offset;
407 });
408 }
409
410 return expected<void>{};
411 }
412
413 // =========================================================================
414 // Row group cache — avoids re-decoding entire columns per point query
415 // =========================================================================
416
// Holds all decoded columns of exactly one (file_idx, row_group).
// SIZE_MAX sentinels mean "empty" and can never match a real location,
// so the first ensure_cached() call always misses.
418 struct RowGroupCache {
419 size_t file_idx = SIZE_MAX;
420 size_t row_group = SIZE_MAX;
421
422 std::vector<std::string> eids;
423 std::vector<int64_t> tss;
424 std::vector<int32_t> vers;
425 std::vector<std::vector<double>> feat_cols;
426 };
427
// Ensure rg_cache_ holds the row group named by loc, decoding it if the
// single-entry cache currently holds a different (file_idx, row_group).
// The cache mutex is held for the whole call, including the decode, so
// the returned pointer stays valid only until the next query (callers
// consume it immediately under the same logical operation).
429 [[nodiscard]] expected<const RowGroupCache*> ensure_cached(
430 const RowLocation& loc) const {
431 std::lock_guard<std::mutex> cache_lk(rg_cache_mutex_);
432
433 // Cache hit — same (file_idx, row_group) as last query
434 if (rg_cache_.file_idx == loc.file_idx &&
435 rg_cache_.row_group == loc.row_group) {
436 return &rg_cache_;
437 }
438
// Files that failed to open during build_index() have nullptr readers.
439 if (loc.file_idx >= readers_.size() || !readers_[loc.file_idx])
440 return Error{ErrorCode::IO_ERROR,
441 "FeatureReader: reader for file index " +
442 std::to_string(loc.file_idx) + " is not available"};
443
444 auto& rdr = *readers_[loc.file_idx];
445 const auto& schema = rdr.schema();
446
447 // Decode the three fixed columns
448 auto eid_result = rdr.read_column<std::string>(loc.row_group, 0);
449 if (!eid_result) return eid_result.error();
450
451 auto ts_result = rdr.read_column<int64_t>(loc.row_group, 1);
452 if (!ts_result) return ts_result.error();
453
454 auto ver_result = rdr.read_column<int32_t>(loc.row_group, 2);
455 if (!ver_result) return ver_result.error();
456
457 // Decode all feature columns (col 3..N)
458 std::vector<std::vector<double>> feat_cols;
459 for (size_t ci = 3; ci < schema.num_columns(); ++ci) {
460 auto feat_result = rdr.read_column<double>(loc.row_group, ci);
461 if (!feat_result) return feat_result.error();
462 feat_cols.push_back(std::move(*feat_result));
463 }
464
// Validate before publishing anything into rg_cache_, so a corrupt row
// group never leaves the cache half-populated.
// NOTE(review): original lines 468 and 476 (the ErrorCodes of the next two
// Error constructions, presumably CORRUPT_DATA) are not visible here.
465 const size_t row_count = eid_result->size();
466 if (ts_result->size() != row_count || ver_result->size() != row_count) {
467 return Error{
469 "FeatureReader: fixed column length mismatch for file index " +
470 std::to_string(loc.file_idx) + ", row group " +
471 std::to_string(loc.row_group)};
472 }
473 for (size_t ci = 0; ci < feat_cols.size(); ++ci) {
474 if (feat_cols[ci].size() != row_count) {
475 return Error{
477 "FeatureReader: feature column length mismatch for file index " +
478 std::to_string(loc.file_idx) + ", row group " +
479 std::to_string(loc.row_group) + ", column " +
480 std::to_string(ci + 3)};
481 }
482 }
483
484 // Store in cache
485 rg_cache_.file_idx = loc.file_idx;
486 rg_cache_.row_group = loc.row_group;
487 rg_cache_.eids = std::move(*eid_result);
488 rg_cache_.tss = std::move(*ts_result);
489 rg_cache_.vers = std::move(*ver_result);
490 rg_cache_.feat_cols = std::move(feat_cols);
491
492 return &rg_cache_;
493 }
494
496 [[nodiscard]] expected<FeatureVector> fetch_row(
497 const RowLocation& loc,
498 const std::vector<std::string>& project) const {
499
500 auto cache_result = ensure_cached(loc);
501 if (!cache_result) return cache_result.error();
502 const auto* cache = *cache_result;
503
504 if (loc.row_offset >= cache->tss.size())
505 return Error{ErrorCode::OUT_OF_RANGE,
506 "FeatureReader: row_offset " +
507 std::to_string(loc.row_offset) +
508 " out of range for row group with " +
509 std::to_string(cache->tss.size()) + " rows"};
510
511 FeatureVector fv;
512 if (loc.row_offset >= cache->eids.size() ||
513 loc.row_offset >= cache->vers.size()) {
514 return Error{ErrorCode::CORRUPT_DATA,
515 "FeatureReader: cached row layout mismatch for file index " +
516 std::to_string(loc.file_idx) + ", row group " +
517 std::to_string(loc.row_group)};
518 }
519 fv.entity_id = cache->eids[loc.row_offset];
520 fv.timestamp_ns = cache->tss[loc.row_offset];
521 fv.version = cache->vers[loc.row_offset];
522
523 // --- Determine which feature columns to extract ---
524 if (project.empty()) {
525 // All features — direct index into cached columns
526 fv.values.reserve(cache->feat_cols.size());
527 for (const auto& col : cache->feat_cols) {
528 if (loc.row_offset >= col.size()) {
529 return Error{ErrorCode::CORRUPT_DATA,
530 "FeatureReader: feature column shorter than cached row count"};
531 }
532 fv.values.push_back(col[loc.row_offset]);
533 }
534 } else {
535 // Projected subset — look up column indices by name
536 if (loc.file_idx < readers_.size() && readers_[loc.file_idx]) {
537 const auto& schema = readers_[loc.file_idx]->schema();
538 fv.values.reserve(project.size());
539 for (const auto& fname : project) {
540 auto idx = schema.find_column(fname);
541 if (!idx.has_value() || *idx < 3) {
542 return Error{ErrorCode::SCHEMA_MISMATCH,
543 "FeatureReader: requested feature column '" +
544 fname + "' is not present"};
545 }
546
547 size_t feat_idx = *idx - 3;
548 if (feat_idx >= cache->feat_cols.size()) {
549 return Error{ErrorCode::CORRUPT_DATA,
550 "FeatureReader: cached projected feature column index out of range"};
551 }
552
553 const auto& col = cache->feat_cols[feat_idx];
554 if (loc.row_offset >= col.size()) {
555 return Error{ErrorCode::CORRUPT_DATA,
556 "FeatureReader: projected feature column shorter than cached row count"};
557 }
558 fv.values.push_back(col[loc.row_offset]);
559 }
560 }
561 }
562
563 return fv;
564 }
565
566 // =========================================================================
567 // State
568 // =========================================================================
569
// Options captured at open(); index and schema metadata built once by
// build_index() and immutable afterwards.
570 Options opts_;
571 EntityIndex index_;
572 std::vector<std::string> feature_names_;
573 size_t total_rows_{0};
574 size_t failed_file_count_{0};
575
576 // One entry per file in opts_.parquet_files (nullptr for unreadable files).
577 // Mutable because ParquetReader::read_column() updates an internal
578 // decompression cache even though it is logically a read operation.
579 mutable std::vector<std::unique_ptr<ParquetReader>> readers_;
580
581 // Single-entry row group cache — consecutive point queries hitting the
582 // same (file_idx, row_group) reuse decoded columns instead of re-decoding.
583 mutable std::mutex rg_cache_mutex_;
584 mutable RowGroupCache rg_cache_;
585};
586
587} // namespace signet::forge
Point-in-time correct ML feature store reader over Parquet files.
FeatureReader()=default
Default-construct an empty reader (use open() factory instead).
const std::vector< std::string > & feature_names() const
Return the ordered feature column names discovered from the first readable file.
FeatureReaderOptions Options
Alias for the options struct.
size_t total_rows() const
Total number of rows indexed across all files and row groups.
FeatureReader & operator=(const FeatureReader &)=delete
size_t num_entities() const
Number of distinct entities in the index.
FeatureReader & operator=(FeatureReader &&o) noexcept
FeatureReader(FeatureReader &&o) noexcept
expected< std::optional< FeatureVector > > get(const std::string &entity_id, const std::vector< std::string > &project={}) const
Retrieve the latest version of an entity regardless of timestamp.
expected< std::vector< FeatureVector > > history(const std::string &entity_id, int64_t start_ns, int64_t end_ns, const std::vector< std::string > &project={}) const
Retrieve all versions of an entity in the inclusive timestamp range.
FeatureReader(const FeatureReader &)=delete
static expected< FeatureReader > open(Options opts)
Open all Parquet files and build the in-memory entity/timestamp index.
size_t failed_file_count() const
Number of files that failed to open during build_index().
expected< std::vector< FeatureVector > > as_of_batch(const std::vector< std::string > &entity_ids, int64_t timestamp_ns, const std::vector< std::string > &project={}) const
Batch as_of query: retrieve the latest version of multiple entities at one timestamp.
expected< std::optional< FeatureVector > > as_of(const std::string &entity_id, int64_t timestamp_ns, const std::vector< std::string > &project={}) const
Retrieve the latest version of an entity at or before the given timestamp.
size_t num_features() const
Number of feature columns in the schema.
static expected< ParquetReader > open(const std::filesystem::path &path)
Open and parse a Parquet file, returning a ready-to-query reader.
Definition reader.hpp:189
const Error & error() const
Access the error payload (valid for both success and failure; check ok() on the returned Error).
Definition error.hpp:261
A lightweight result type that holds either a success value of type T or an Error.
Definition error.hpp:145
Versioned ML feature store writer for appending typed feature vectors to Parquet.
@ IO_ERROR
A file-system or stream I/O operation failed (open, read, write, rename).
@ OUT_OF_RANGE
An index, offset, or size value is outside the valid range.
@ SCHEMA_MISMATCH
The requested column name or type does not match the file schema.
@ CORRUPT_DATA
Decoded data is corrupt or inconsistent (e.g. out-of-range dictionary index).
Lightweight error value carrying an ErrorCode and a human-readable message.
Definition error.hpp:101
Configuration options for FeatureReader::open().
std::string feature_group
Expected group name (optional validation).
std::vector< std::string > parquet_files
Paths to feature group Parquet files.
Parquet format enumerations, type traits, and statistics structs.