37#include <initializer_list>
93 size_t reserve_rows = 64) {
95 b.schema_ = std::move(
schema);
96 b.columns_.resize(b.schema_.size());
97 for (
auto& col : b.columns_)
98 col.reserve(reserve_rows);
101 using namespace std::chrono;
103 duration_cast<nanoseconds>(
104 system_clock::now().time_since_epoch()).count());
122 if (count != schema_.size())
124 "ColumnBatch::push_row: got " + std::to_string(count) +
125 " values, schema has " + std::to_string(schema_.size())};
126 for (
size_t i = 0; i < count; ++i)
127 columns_[i].push_back(values[i]);
136 std::vector<double> tmp(values);
137 return push_row(tmp.data(), tmp.size());
144 return push_row(values.data(), values.size());
152 [[nodiscard]]
size_t num_rows() const noexcept {
return num_rows_; }
154 [[nodiscard]]
size_t num_columns() const noexcept {
return schema_.size(); }
156 [[nodiscard]]
bool empty() const noexcept {
return num_rows_ == 0; }
159 [[nodiscard]]
const std::vector<ColumnDesc>&
schema() const noexcept {
167 if (col_idx >= columns_.size() || columns_[col_idx].empty())
170 columns_[col_idx].data(),
177 [[nodiscard]] std::span<const double>
column_span(
size_t col_idx)
const {
178 if (col_idx >= columns_.size())
180 return {columns_[col_idx].data(),
181 (std::min)(num_rows_, columns_[col_idx].size())};
201 if (num_rows_ == 0 || schema_.empty())
203 "ColumnBatch::as_tensor: batch is empty"};
206 for (
size_t i = 0; i < schema_.size(); ++i) {
210 "ColumnBatch::as_tensor: column '" +
211 schema_[i].name +
"' view is invalid"};
214 return builder.
build(output_dtype);
235 int64_t timestamp_ns = 0,
236 uint32_t type_id = 0x434F4C42u )
const {
240 if (num_rows_ >
static_cast<size_t>(UINT32_MAX)) {
241 throw std::overflow_error(
242 "ColumnBatch::to_stream_record: num_rows exceeds UINT32_MAX ("
243 + std::to_string(num_rows_) +
") — batch too large for WAL serialization");
245 const auto ncols =
static_cast<uint32_t
>(schema_.size());
246 const auto nrows =
static_cast<uint32_t
>(num_rows_);
249 size_t payload_bytes =
sizeof(uint32_t) * 2;
250 for (
const auto& desc : schema_) {
251 payload_bytes +=
sizeof(uint32_t) + desc.name.size();
255 const size_t ncols_sz =
static_cast<size_t>(ncols);
256 const size_t nrows_sz =
static_cast<size_t>(nrows);
257 if (ncols_sz > 0 && nrows_sz > SIZE_MAX / ncols_sz) {
258 throw std::overflow_error(
259 "ColumnBatch::to_stream_record: ncols*nrows overflows size_t");
261 const size_t cells = ncols_sz * nrows_sz;
262 if (cells > SIZE_MAX /
sizeof(
double)) {
263 throw std::overflow_error(
264 "ColumnBatch::to_stream_record: payload size overflows size_t");
266 payload_bytes +=
sizeof(double) * cells;
270 payload.resize(payload_bytes);
272 char* p = payload.data();
274 auto write_u32 = [&](uint32_t v) {
275 std::memcpy(p, &v,
sizeof(v)); p +=
sizeof(v);
277 auto write_f64 = [&](
double v) {
278 std::memcpy(p, &v,
sizeof(v)); p +=
sizeof(v);
284 for (
const auto& desc : schema_) {
285 write_u32(
static_cast<uint32_t
>(desc.name.size()));
286 std::memcpy(p, desc.name.data(), desc.name.size());
287 p += desc.name.size();
290 for (
size_t ci = 0; ci < schema_.size(); ++ci)
291 for (
size_t ri = 0; ri < num_rows_; ++ri)
292 write_f64(columns_[ci][ri]);
297 rec.
payload = std::move(payload);
315 const char* p = rec.
payload.data();
316 const char* end = p + rec.
payload.size();
318 auto read_u32 = [&](uint32_t& v) ->
bool {
319 if (p +
sizeof(v) > end)
return false;
320 std::memcpy(&v, p,
sizeof(v)); p +=
sizeof(v);
323 auto read_f64 = [&](
double& v) ->
bool {
324 if (p +
sizeof(v) > end)
return false;
325 std::memcpy(&v, p,
sizeof(v)); p +=
sizeof(v);
329 uint32_t ncols = 0, nrows = 0;
330 if (!read_u32(ncols) || !read_u32(nrows))
332 "ColumnBatch::from_stream_record: truncated header"};
335 static constexpr size_t MAX_BATCH_CELLS = 100'000'000;
336 if (
static_cast<size_t>(ncols) *
static_cast<size_t>(nrows) > MAX_BATCH_CELLS)
338 "ColumnBatch::from_stream_record: ncols*nrows exceeds safety limit"};
340 std::vector<ColumnDesc>
schema;
342 for (uint32_t ci = 0; ci < ncols; ++ci) {
343 uint32_t name_len = 0;
344 if (!read_u32(name_len))
346 "ColumnBatch::from_stream_record: truncated schema"};
347 if (p + name_len > end)
349 "ColumnBatch::from_stream_record: name overflow"};
351 desc.
name.assign(p, name_len);
353 schema.push_back(std::move(desc));
358 for (uint32_t ci = 0; ci < ncols; ++ci) {
359 b.columns_[ci].resize(nrows);
360 for (uint32_t ri = 0; ri < nrows; ++ri) {
361 if (!read_f64(b.columns_[ci][ri]))
363 "ColumnBatch::from_stream_record: truncated data"};
378 for (
auto& col : columns_) col.clear();
385 for (
auto& col : columns_) col.reserve(rows);
389 std::vector<ColumnDesc> schema_;
390 std::vector<std::vector<double>> columns_;
404 size_t reserve_rows = 64) {
405 return std::make_shared<ColumnBatch>(
Builds a single contiguous 2D tensor from multiple column tensors, suitable for passing to an ML infe...
BatchTensorBuilder & add_column(const std::string &name, const TensorView &tensor)
Add a column tensor as a feature source.
expected< OwnedTensor > build(TensorDataType output_dtype=TensorDataType::FLOAT32)
Build the final batch tensor.
A column-major batch of feature rows for ML inference and WAL serialization.
int64_t seq_first
First WAL sequence number in this batch.
expected< OwnedTensor > as_tensor(TensorDataType output_dtype=TensorDataType::FLOAT32) const
Assemble all columns into a single 2D [rows x cols] OwnedTensor.
void reserve(size_t rows)
Pre-allocate storage for the given number of rows in each column.
ColumnBatch & operator=(const ColumnBatch &)=default
Copy assignment.
expected< void > push_row(const double *values, size_t count)
Append one row of feature values.
std::span< const double > column_span(size_t col_idx) const
Span accessor for a single column — zero-copy, range-checked.
void clear()
Clear all row data while preserving the schema.
std::string symbol
Instrument symbol.
bool empty() const noexcept
True if the batch contains no rows.
expected< void > push_row(const std::vector< double > &values)
Append one row from a vector.
static ColumnBatch with_schema(std::vector< ColumnDesc > schema, size_t reserve_rows=64)
Create an empty ColumnBatch with the given schema.
expected< void > push_row(std::initializer_list< double > values)
Append one row from an initializer list (e.g. `push_row({1.0, 2.0, 3.0})`).
ColumnBatch(const ColumnBatch &)=default
Copy constructor.
ColumnBatch & operator=(ColumnBatch &&)=default
Move assignment.
const std::vector< ColumnDesc > & schema() const noexcept
The schema (column descriptors) this batch was created with.
size_t num_columns() const noexcept
Number of columns defined by the schema.
int64_t created_ns
Batch creation timestamp (ns since epoch)
size_t num_rows() const noexcept
Number of rows currently in the batch.
std::string source_id
Exchange / feed identifier.
ColumnBatch()=default
Default constructor (empty batch, no schema).
static expected< ColumnBatch > from_stream_record(const StreamRecord &rec)
Deserialize a StreamRecord payload back into a ColumnBatch.
ColumnBatch(ColumnBatch &&)=default
Move constructor.
int64_t seq_last
Last WAL sequence number in this batch.
TensorView column_view(size_t col_idx) const
Zero-copy TensorView over a single column's contiguous double array.
StreamRecord to_stream_record(int64_t timestamp_ns=0, uint32_t type_id=0x434F4C42u) const
Serialize the batch into a WAL StreamRecord.
A lightweight, non-owning view into a contiguous block of typed memory, interpreted as a multi-dimens...
A lightweight result type that holds either a success value of type T or an Error.
std::shared_ptr< ColumnBatch > SharedColumnBatch
Thread-safe shared pointer to a ColumnBatch – the unit transferred between producer and consumer threads.
SharedColumnBatch make_column_batch(std::vector< ColumnDesc > schema, size_t reserve_rows=64)
Convenience factory: create a shared batch with a given schema.
@ IO_ERROR
A file-system or stream I/O operation failed (open, read, write, rename).
@ SCHEMA_MISMATCH
The requested column name or type does not match the file schema.
@ INTERNAL_ERROR
An unexpected internal error that does not fit any other category.
TensorDataType
Element data type for tensor storage, mapping to ONNX/PyTorch/TF type enums.
@ FLOAT64
IEEE 754 double-precision (8 bytes)
@ FLOAT32
IEEE 754 single-precision (4 bytes)
Lock-free SPSC/MPSC ring buffers, StreamingSink for background Parquet compaction,...
Describes a single column in a ColumnBatch schema.
std::string name
Column name (e.g. "price", "volume")
TensorDataType dtype
Physical storage type (always stored as double internally)
Lightweight error value carrying an ErrorCode and a human-readable message.
A single record flowing through the StreamingSink pipeline.
int64_t timestamp_ns
Wall-clock timestamp in nanoseconds since Unix epoch.
uint32_t type_id
User-defined record type tag (0 = untyped)
std::string payload
Serialized record bytes (UTF-8 safe or binary via base64)
Describes the shape of a tensor as a vector of dimension sizes.
Zero-copy tensor bridge: maps Parquet column data directly into ML-framework-compatible tensor views ...