87 "FeatureReader: parquet_files must not be empty"};
90 reader.opts_ = std::move(opts);
92 auto r = reader.build_index();
93 if (!r)
return r.
error();
// Member-initializer list and body of what appears to be the move constructor
// taking source object `o` (the signature line is outside this view).
// Options, index, feature names and readers are moved out of `o`; the two
// counters are copied, so `o` keeps its previous counter values.
105 : opts_(std::move(o.opts_)),
106 index_(std::move(o.index_)),
107 feature_names_(std::move(o.feature_names_)),
108 total_rows_(o.total_rows_),
109 failed_file_count_(o.failed_file_count_),
110 readers_(std::move(o.readers_)) {
// The row-group cache is taken over while holding the source's cache mutex.
112 std::lock_guard<std::mutex> lk(o.rg_cache_mutex_);
113 rg_cache_ = std::move(o.rg_cache_);
// Body of what appears to be move assignment from `o` (signature and any
// self-assignment guard are outside this view).
// These members are transferred without holding either cache mutex.
117 opts_ = std::move(o.opts_);
118 index_ = std::move(o.index_);
119 feature_names_ = std::move(o.feature_names_);
120 total_rows_ = o.total_rows_;
121 failed_file_count_= o.failed_file_count_;
122 readers_ = std::move(o.readers_);
// Both objects' cache mutexes are acquired together through a single
// std::scoped_lock before the cached row group is moved across.
123 std::scoped_lock lk(rg_cache_mutex_, o.rg_cache_mutex_);
124 rg_cache_ = std::move(o.rg_cache_);
// Parameter list and body of a point-in-time lookup. The line carrying the
// function name is outside this view; other visible code calls `as_of` with
// this argument shape, so this is presumably `as_of`.
//
// entity_id     key looked up in index_.
// timestamp_ns  query time; entries with a later timestamp are excluded.
// project       feature names forwarded to fetch_row (empty by default).
//
// Yields an empty optional when the entity is unknown or when no indexed entry
// is at or before timestamp_ns; forwards fetch_row's error on failure.
149 const std::string& entity_id,
150 int64_t timestamp_ns,
151 const std::vector<std::string>& project = {})
const {
153 auto it = index_.find(entity_id);
154 if (it == index_.end())
155 return std::optional<FeatureVector>{std::nullopt};
157 const auto& locs = it->second;
// build_index sorts each entity's locations by timestamp_ns first, which is
// what makes this binary search valid. upper_bound yields the first entry
// whose timestamp is strictly greater than timestamp_ns.
160 auto pos = std::upper_bound(
161 locs.begin(), locs.end(), timestamp_ns,
162 [](int64_t ts,
const RowLocation& loc) {
163 return ts < loc.timestamp_ns;
// pos == begin() means every entry is later than the query time.
166 if (pos == locs.begin())
167 return std::optional<FeatureVector>{std::nullopt};
// NOTE(review): `pos` must be stepped back by one before it is dereferenced
// (as returned by upper_bound it may equal locs.end()). No decrement is
// visible in this view — confirm it exists in the full file.
171 auto fv = fetch_row(*pos, project);
172 if (!fv)
return fv.error();
173 return std::optional<FeatureVector>{std::move(*fv)};
// Parameter list and start of the body of a convenience wrapper (the function
// name line and the tail of the call are outside this view).
// It delegates to as_of using the largest representable int64_t as the
// timestamp, so no indexed entry is excluded for being too late.
// The parenthesised `(…::max)` is presumably there to sidestep a function-like
// `max` macro — TODO confirm.
188 const std::string& entity_id,
189 const std::vector<std::string>& project = {})
const {
190 return as_of(entity_id,
191 (std::numeric_limits<int64_t>::max)(),
// Parameter list and body of a time-range query (the function name line and
// the declarations of `start_ns` / `end_ns` are outside this view).
// Collects the feature vectors of one entity whose timestamp_ns lies in the
// inclusive range [start_ns, end_ns]; an unknown entity yields an empty vector
// and a fetch_row failure is forwarded as the error.
208 const std::string& entity_id,
211 const std::vector<std::string>& project = {})
const {
213 auto it = index_.find(entity_id);
214 if (it == index_.end())
215 return std::vector<FeatureVector>{};
217 const auto& locs = it->second;
218 std::vector<FeatureVector> result;
// locs is ordered by timestamp_ns (see the sort in build_index), so entries
// before the range are skipped and the first entry past it ends the scan.
220 for (
const auto& loc : locs) {
221 if (loc.timestamp_ns < start_ns)
continue;
222 if (loc.timestamp_ns > end_ns)
break;
224 auto fv = fetch_row(loc, project);
225 if (!fv)
return fv.error();
226 result.push_back(std::move(*fv));
// Parameter list and body of a batch point-in-time lookup (the function name
// line is outside this view). Runs as_of for every id in entity_ids with the
// same timestamp_ns and projection; the first as_of error aborts the batch.
245 const std::vector<std::string>& entity_ids,
246 int64_t timestamp_ns,
247 const std::vector<std::string>& project = {})
const {
249 std::vector<FeatureVector> result;
// Upper bound on the result size: at most one vector per requested entity.
250 result.reserve(entity_ids.size());
252 for (
const auto& eid : entity_ids) {
253 auto r =
as_of(eid, timestamp_ns, project);
254 if (!r)
return r.error();
// NOTE(review): as_of can succeed with an empty optional (unknown entity or no
// entry early enough); value() on an empty std::optional throws
// std::bad_optional_access. A guard is presumably on a line not visible
// here — confirm.
256 result.push_back(std::move(r->value()));
268 return feature_names_;
271 [[nodiscard]]
size_t num_features()
const {
return feature_names_.size(); }
275 [[nodiscard]]
size_t total_rows()
const {
return total_rows_; }
// Fields of the per-row locator used by the index (the struct header and some
// fields, e.g. version and file_idx used elsewhere, are outside this view).
// Timestamp read from column 1 for this row; primary sort key of the index.
288 int64_t timestamp_ns = 0;
// Row group, within the file, that holds the row.
291 size_t row_group = 0;
// Position of the row inside that row group.
292 size_t row_offset = 0;
// Maps an entity id to the locations of all of its rows; build_index sorts
// each vector by timestamp_ns (then version, file, row group, row offset).
296 using EntityIndex = std::unordered_map<std::string, std::vector<RowLocation>>;
// Builds index_ by scanning every row group of every input file.
// For each row it reads column 0 (entity id, string), column 1 (timestamp,
// int64) and column 2 (version, int32) and appends a RowLocation under
// index_[entity id]; afterwards each entity's locations are sorted.
// Returns an empty expected<void> on success.
// Several lines of this function (declarations of num_files, path, ec,
// rdr_result and the heads of the error returns) are not visible in this view.
299 [[nodiscard]] expected<void> build_index() {
301 readers_.reserve(num_files);
303 for (
size_t fi = 0; fi < num_files; ++fi) {
// A path that does not exist is recorded as a null reader and counted as a
// failure; presumably the null keeps readers_ aligned with the file index.
309 if (!std::filesystem::exists(path, ec)) {
310 readers_.push_back(
nullptr);
311 ++failed_file_count_;
// Second failure path with the same bookkeeping (its condition is not visible
// here; presumably the reader could not be opened).
317 readers_.push_back(
nullptr);
318 ++failed_file_count_;
322 auto& rdr = *rdr_result;
323 const auto& schema = rdr.schema();
// Feature names are captured once, from the first schema with more than three
// columns: every column from index 3 onward is treated as a feature.
328 if (feature_names_.empty() && schema.num_columns() > 3) {
329 for (
size_t ci = 3; ci < schema.num_columns(); ++ci)
330 feature_names_.push_back(schema.column(ci).name);
333 const size_t num_rg =
static_cast<size_t>(rdr.num_row_groups());
335 for (
size_t rg = 0; rg < num_rg; ++rg) {
// Each of the three fixed columns is read in turn; the strings that follow
// each read are the error messages built when that read fails.
338 auto eid_result = rdr.read_column<std::string>(rg, 0);
341 "FeatureReader: failed to read entity_id from '" + path +
342 "' row group " + std::to_string(rg) +
343 ": " + eid_result.error().message};
346 auto ts_result = rdr.read_column<int64_t>(rg, 1);
349 "FeatureReader: failed to read timestamp_ns from '" + path +
350 "' row group " + std::to_string(rg) +
351 ": " + ts_result.error().message};
354 auto ver_result = rdr.read_column<int32_t>(rg, 2);
357 "FeatureReader: failed to read version from '" + path +
358 "' row group " + std::to_string(rg) +
359 ": " + ver_result.error().message};
362 const auto& eids = *eid_result;
363 const auto& tss = *ts_result;
364 const auto& vers = *ver_result;
// The three fixed columns must have the same number of rows.
365 if (eids.size() != tss.size() || eids.size() != vers.size()) {
368 "FeatureReader: fixed column length mismatch in '" + path +
369 "' row group " + std::to_string(rg) +
" (entity_id=" +
370 std::to_string(eids.size()) +
", timestamp_ns=" +
371 std::to_string(tss.size()) +
", version=" +
372 std::to_string(vers.size()) +
")"};
374 const size_t nrows = eids.size();
// One index entry per row, keyed by that row's entity id.
376 for (
size_t row = 0; row < nrows; ++row) {
378 loc.timestamp_ns = tss[row];
379 loc.version = vers[row];
382 loc.row_offset = row;
383 index_[eids[row]].push_back(loc);
386 total_rows_ += nrows;
// The opened reader is moved into a heap-allocated ParquetReader (presumably
// appended to readers_; the enclosing call is not visible here).
391 std::make_unique<ParquetReader>(std::move(*rdr_result)));
// Order each entity's rows by timestamp, breaking ties by version, then file
// index, row group and row offset, so the ordering is fully determined.
395 for (
auto& [eid, locs] : index_) {
396 std::sort(locs.begin(), locs.end(),
397 [](
const RowLocation& a,
const RowLocation& b) {
398 if (a.timestamp_ns != b.timestamp_ns)
399 return a.timestamp_ns < b.timestamp_ns;
400 if (a.version != b.version)
401 return a.version < b.version;
402 if (a.file_idx != b.file_idx)
403 return a.file_idx < b.file_idx;
404 if (a.row_group != b.row_group)
405 return a.row_group < b.row_group;
406 return a.row_offset < b.row_offset;
410 return expected<void>{};
// Decoded contents of a single row group, kept so repeated fetches from the
// same row group avoid re-reading it.
418 struct RowGroupCache {
// Identity of the cached row group; SIZE_MAX is presumably the "nothing
// cached yet" sentinel, since ensure_cached compares these against a request.
419 size_t file_idx = SIZE_MAX;
420 size_t row_group = SIZE_MAX;
// Fixed columns 0, 1 and 2 of the row group.
422 std::vector<std::string> eids;
423 std::vector<int64_t> tss;
424 std::vector<int32_t> vers;
// Feature columns (schema index 3 and up), one inner vector per column.
425 std::vector<std::vector<double>> feat_cols;
// Makes rg_cache_ hold the row group named by `loc` (file_idx + row_group) and
// yields a pointer to the cache, or an error if the reader is unavailable, a
// column read fails, or the column lengths disagree.
// rg_cache_mutex_ is held for the duration of this call only.
// NOTE(review): the result is a pointer into rg_cache_, and the lock is a
// function-local guard, so callers read the cache after the mutex is released
// (fetch_row shows no locking of its own) — confirm this is safe for the
// intended concurrent use.
429 [[nodiscard]] expected<const RowGroupCache*> ensure_cached(
430 const RowLocation& loc)
const {
431 std::lock_guard<std::mutex> cache_lk(rg_cache_mutex_);
// Cache hit: the requested row group is the one already stored.
434 if (rg_cache_.file_idx == loc.file_idx &&
435 rg_cache_.row_group == loc.row_group) {
// A file index past the end of readers_ or a null reader entry is an error.
439 if (loc.file_idx >= readers_.size() || !readers_[loc.file_idx])
441 "FeatureReader: reader for file index " +
442 std::to_string(loc.file_idx) +
" is not available"};
444 auto& rdr = *readers_[loc.file_idx];
445 const auto& schema = rdr.schema();
// Read the three fixed columns; the first failing read's error is returned.
448 auto eid_result = rdr.read_column<std::string>(loc.row_group, 0);
449 if (!eid_result)
return eid_result.error();
451 auto ts_result = rdr.read_column<int64_t>(loc.row_group, 1);
452 if (!ts_result)
return ts_result.error();
454 auto ver_result = rdr.read_column<int32_t>(loc.row_group, 2);
455 if (!ver_result)
return ver_result.error();
// Every column from index 3 onward is read as double into a local buffer.
458 std::vector<std::vector<double>> feat_cols;
459 for (
size_t ci = 3; ci < schema.num_columns(); ++ci) {
460 auto feat_result = rdr.read_column<
double>(loc.row_group, ci);
461 if (!feat_result)
return feat_result.error();
462 feat_cols.push_back(std::move(*feat_result));
// All columns must have the same row count as the entity-id column.
465 const size_t row_count = eid_result->size();
466 if (ts_result->size() != row_count || ver_result->size() != row_count) {
469 "FeatureReader: fixed column length mismatch for file index " +
470 std::to_string(loc.file_idx) +
", row group " +
471 std::to_string(loc.row_group)};
473 for (
size_t ci = 0; ci < feat_cols.size(); ++ci) {
474 if (feat_cols[ci].size() != row_count) {
477 "FeatureReader: feature column length mismatch for file index " +
478 std::to_string(loc.file_idx) +
", row group " +
479 std::to_string(loc.row_group) +
", column " +
// ci indexes feat_cols; +3 converts it back to the schema column index.
480 std::to_string(ci + 3)};
// The cache is overwritten only here, after every read and length check above
// has succeeded, so a failed load leaves the previous contents in place.
485 rg_cache_.file_idx = loc.file_idx;
486 rg_cache_.row_group = loc.row_group;
487 rg_cache_.eids = std::move(*eid_result);
488 rg_cache_.tss = std::move(*ts_result);
489 rg_cache_.vers = std::move(*ver_result);
490 rg_cache_.feat_cols = std::move(feat_cols);
// Materialises the row at `loc` as a FeatureVector.
// loc      file / row group / row offset of the row.
// project  feature names to return, in that order; empty selects every cached
//          feature column in stored order.
// Errors: any ensure_cached failure, a row offset outside the cached columns,
// or a projected name that is missing or refers to one of the first three
// (fixed) columns.
496 [[nodiscard]] expected<FeatureVector> fetch_row(
497 const RowLocation& loc,
498 const std::vector<std::string>& project)
const {
500 auto cache_result = ensure_cached(loc);
501 if (!cache_result)
return cache_result.error();
502 const auto* cache = *cache_result;
// Bounds checks against each fixed column before any indexing.
504 if (loc.row_offset >= cache->tss.size())
506 "FeatureReader: row_offset " +
507 std::to_string(loc.row_offset) +
508 " out of range for row group with " +
509 std::to_string(cache->tss.size()) +
" rows"};
512 if (loc.row_offset >= cache->eids.size() ||
513 loc.row_offset >= cache->vers.size()) {
515 "FeatureReader: cached row layout mismatch for file index " +
516 std::to_string(loc.file_idx) +
", row group " +
517 std::to_string(loc.row_group)};
519 fv.entity_id = cache->eids[loc.row_offset];
520 fv.timestamp_ns = cache->tss[loc.row_offset];
521 fv.version = cache->vers[loc.row_offset];
// No projection: emit one value per cached feature column.
524 if (project.empty()) {
526 fv.values.reserve(cache->feat_cols.size());
527 for (
const auto& col : cache->feat_cols) {
528 if (loc.row_offset >= col.size()) {
530 "FeatureReader: feature column shorter than cached row count"};
532 fv.values.push_back(col[loc.row_offset]);
// With a projection: resolve each requested name through this file's schema.
536 if (loc.file_idx < readers_.size() && readers_[loc.file_idx]) {
537 const auto& schema = readers_[loc.file_idx]->schema();
538 fv.values.reserve(project.size());
539 for (
const auto& fname : project) {
540 auto idx = schema.find_column(fname);
// Schema indices below 3 are the fixed columns, not features.
541 if (!idx.has_value() || *idx < 3) {
543 "FeatureReader: requested feature column '" +
544 fname +
"' is not present"};
// feat_cols starts at schema column 3, hence the offset.
547 size_t feat_idx = *idx - 3;
548 if (feat_idx >= cache->feat_cols.size()) {
550 "FeatureReader: cached projected feature column index out of range"};
553 const auto& col = cache->feat_cols[feat_idx];
554 if (loc.row_offset >= col.size()) {
556 "FeatureReader: projected feature column shorter than cached row count"};
558 fv.values.push_back(col[loc.row_offset]);
// Feature column names, filled by build_index from schema columns 3 and up.
572 std::vector<std::string> feature_names_;
// Sum of the row counts of every row group indexed by build_index.
573 size_t total_rows_{0};
// Number of input files for which build_index stored a null reader.
574 size_t failed_file_count_{0};
// Readers appended by build_index; null entries mark failed files and are
// rejected by ensure_cached. mutable because const members read through them.
579 mutable std::vector<std::unique_ptr<ParquetReader>> readers_;
// Locked by ensure_cached (a const member, hence mutable) around all access
// to rg_cache_.
583 mutable std::mutex rg_cache_mutex_;
// The single most recently loaded row group.
584 mutable RowGroupCache rg_cache_;