Signet Forge 0.1.0
C++20 Parquet library with AI-native extensions
DEMO
Loading...
Searching...
No Matches
column_batch.hpp
Go to the documentation of this file.
1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright 2026 Johnson Ogundeji
3// column_batch.hpp — Arena-backed columnar event batch for SignetStack Signet Forge
4//
5// ColumnBatch stores N rows × M feature columns in column-major layout so
6// that each column is a contiguous double[] suitable for zero-copy wrapping
7// as a TensorView / OrtValue without transposition.
8//
9// Usage pattern (producer):
10// auto batch = ColumnBatch::with_schema({{"price", TDT::FLOAT64},
11// {"qty", TDT::FLOAT64}}, 512);
12// batch.push_row({mid, qty});
13// ...
14// bus.publish(std::make_shared<ColumnBatch>(std::move(batch)));
15//
16// Usage pattern (consumer / ML inference):
17// auto tv = batch->column_view(0); // zero-copy TensorView
18// auto ot = batch->as_tensor(TDT::FLOAT32); // 2D OwnedTensor [rows, cols]
19// auto rec = batch->to_stream_record(ts_ns); // serialise → WAL
20//
21// Phase 9b: MPMC ColumnBatch Event Bus.
22
26
27#pragma once
28
29#include "signet/error.hpp"
31#include "signet/ai/streaming_sink.hpp" // StreamRecord
32
33#include <chrono>
34#include <cstddef>
35#include <cstdint>
36#include <cstring>
37#include <initializer_list>
38#include <memory>
39#include <span>
40#include <string>
41#include <vector>
42
43namespace signet::forge {
44
47
48// ============================================================================
49// ColumnDesc — schema descriptor for one column in a ColumnBatch
50// ============================================================================
51
57
58// ============================================================================
59// ColumnBatch — columnar, reference-counted event batch
60//
61// Data layout: columns_[col_idx][row_idx] — column-major for zero-copy tensor
62// wrapping. Each column is a contiguous std::vector<double>.
63// ============================================================================
64
74public:
75 // -------------------------------------------------------------------------
76 // Producer-side metadata (set before publishing)
77 // -------------------------------------------------------------------------
78
79 std::string source_id;
80 std::string symbol;
81 int64_t seq_first = 0;
82 int64_t seq_last = 0;
83 int64_t created_ns = 0;
84
85 // -------------------------------------------------------------------------
86 // Factory
87 // -------------------------------------------------------------------------
88
92 static ColumnBatch with_schema(std::vector<ColumnDesc> schema,
93 size_t reserve_rows = 64) {
95 b.schema_ = std::move(schema);
96 b.columns_.resize(b.schema_.size());
97 for (auto& col : b.columns_)
98 col.reserve(reserve_rows);
99 b.num_rows_ = 0;
100
101 using namespace std::chrono;
102 b.created_ns = static_cast<int64_t>(
103 duration_cast<nanoseconds>(
104 system_clock::now().time_since_epoch()).count());
105 return b;
106 }
107
109 ColumnBatch() = default;
112 ColumnBatch(const ColumnBatch&) = default;
113 ColumnBatch& operator=(const ColumnBatch&) = default;
114
115 // -------------------------------------------------------------------------
116 // Build API — called from producer thread
117 // -------------------------------------------------------------------------
118
121 [[nodiscard]] expected<void> push_row(const double* values, size_t count) {
122 if (count != schema_.size())
124 "ColumnBatch::push_row: got " + std::to_string(count) +
125 " values, schema has " + std::to_string(schema_.size())};
126 for (size_t i = 0; i < count; ++i)
127 columns_[i].push_back(values[i]);
128 ++num_rows_;
129 return expected<void>{};
130 }
131
135 [[nodiscard]] expected<void> push_row(std::initializer_list<double> values) {
136 std::vector<double> tmp(values);
137 return push_row(tmp.data(), tmp.size());
138 }
139
143 [[nodiscard]] expected<void> push_row(const std::vector<double>& values) {
144 return push_row(values.data(), values.size());
145 }
146
147 // -------------------------------------------------------------------------
148 // Query API — called from consumer / ML thread
149 // -------------------------------------------------------------------------
150
152 [[nodiscard]] size_t num_rows() const noexcept { return num_rows_; }
154 [[nodiscard]] size_t num_columns() const noexcept { return schema_.size(); }
156 [[nodiscard]] bool empty() const noexcept { return num_rows_ == 0; }
157
159 [[nodiscard]] const std::vector<ColumnDesc>& schema() const noexcept {
160 return schema_;
161 }
162
166 [[nodiscard]] TensorView column_view(size_t col_idx) const {
167 if (col_idx >= columns_.size() || columns_[col_idx].empty())
168 return TensorView{}; // invalid view
169 return TensorView{
170 columns_[col_idx].data(),
171 TensorShape{static_cast<int64_t>(num_rows_)},
173 };
174 }
175
177 [[nodiscard]] std::span<const double> column_span(size_t col_idx) const {
178 if (col_idx >= columns_.size())
179 return {};
180 return {columns_[col_idx].data(),
181 (std::min)(num_rows_, columns_[col_idx].size())};
182 }
183
184 // -------------------------------------------------------------------------
185 // as_tensor — assemble all columns into a 2D [rows × cols] OwnedTensor
186 //
187 // Uses BatchTensorBuilder to interleave columns into a single contiguous
188 // buffer. output_dtype defaults to FLOAT32 for ONNX compatibility.
189 // -------------------------------------------------------------------------
190
199 TensorDataType output_dtype = TensorDataType::FLOAT32) const {
200
201 if (num_rows_ == 0 || schema_.empty())
203 "ColumnBatch::as_tensor: batch is empty"};
204
205 BatchTensorBuilder builder;
206 for (size_t i = 0; i < schema_.size(); ++i) {
207 auto tv = column_view(i);
208 if (!tv.is_valid())
210 "ColumnBatch::as_tensor: column '" +
211 schema_[i].name + "' view is invalid"};
212 builder.add_column(schema_[i].name, tv);
213 }
214 return builder.build(output_dtype);
215 }
216
217 // -------------------------------------------------------------------------
218 // to_stream_record — serialise batch into a WAL StreamRecord
219 //
220 // Binary wire format (little-endian):
221 // [uint32 num_columns][uint32 num_rows]
222 // [uint64 column_name_len][column_name_bytes ...] × num_columns
223 // [float64 values × num_rows] × num_columns (column-major)
224 // -------------------------------------------------------------------------
225
235 int64_t timestamp_ns = 0,
236 uint32_t type_id = 0x434F4C42u /*"COLB"*/) const {
237
238 // CWE-190: Integer Overflow or Wraparound — check row count fits in
239 // uint32_t before narrowing cast into the serialization header.
240 if (num_rows_ > static_cast<size_t>(UINT32_MAX)) {
241 throw std::overflow_error(
242 "ColumnBatch::to_stream_record: num_rows exceeds UINT32_MAX ("
243 + std::to_string(num_rows_) + ") — batch too large for WAL serialization");
244 }
245 const auto ncols = static_cast<uint32_t>(schema_.size());
246 const auto nrows = static_cast<uint32_t>(num_rows_);
247
248 // Compute total payload size
249 size_t payload_bytes = sizeof(uint32_t) * 2; // ncols + nrows
250 for (const auto& desc : schema_) {
251 payload_bytes += sizeof(uint32_t) + desc.name.size();
252 }
253 // CWE-190: overflow check for sizeof(double) * ncols * nrows
254 {
255 const size_t ncols_sz = static_cast<size_t>(ncols);
256 const size_t nrows_sz = static_cast<size_t>(nrows);
257 if (ncols_sz > 0 && nrows_sz > SIZE_MAX / ncols_sz) {
258 throw std::overflow_error(
259 "ColumnBatch::to_stream_record: ncols*nrows overflows size_t");
260 }
261 const size_t cells = ncols_sz * nrows_sz;
262 if (cells > SIZE_MAX / sizeof(double)) {
263 throw std::overflow_error(
264 "ColumnBatch::to_stream_record: payload size overflows size_t");
265 }
266 payload_bytes += sizeof(double) * cells;
267 }
268
269 std::string payload;
270 payload.resize(payload_bytes);
271
272 char* p = payload.data();
273
274 auto write_u32 = [&](uint32_t v) {
275 std::memcpy(p, &v, sizeof(v)); p += sizeof(v);
276 };
277 auto write_f64 = [&](double v) {
278 std::memcpy(p, &v, sizeof(v)); p += sizeof(v);
279 };
280
281 write_u32(ncols);
282 write_u32(nrows);
283
284 for (const auto& desc : schema_) {
285 write_u32(static_cast<uint32_t>(desc.name.size()));
286 std::memcpy(p, desc.name.data(), desc.name.size());
287 p += desc.name.size();
288 }
289
290 for (size_t ci = 0; ci < schema_.size(); ++ci)
291 for (size_t ri = 0; ri < num_rows_; ++ri)
292 write_f64(columns_[ci][ri]);
293
294 StreamRecord rec;
295 rec.timestamp_ns = (timestamp_ns != 0) ? timestamp_ns : created_ns;
296 rec.type_id = type_id;
297 rec.payload = std::move(payload);
298 return rec;
299 }
300
301 // -------------------------------------------------------------------------
302 // Deserialise a StreamRecord payload back into a ColumnBatch
303 // -------------------------------------------------------------------------
304
313 const StreamRecord& rec) {
314
315 const char* p = rec.payload.data();
316 const char* end = p + rec.payload.size();
317
318 auto read_u32 = [&](uint32_t& v) -> bool {
319 if (p + sizeof(v) > end) return false;
320 std::memcpy(&v, p, sizeof(v)); p += sizeof(v);
321 return true;
322 };
323 auto read_f64 = [&](double& v) -> bool {
324 if (p + sizeof(v) > end) return false;
325 std::memcpy(&v, p, sizeof(v)); p += sizeof(v);
326 return true;
327 };
328
329 uint32_t ncols = 0, nrows = 0;
330 if (!read_u32(ncols) || !read_u32(nrows))
332 "ColumnBatch::from_stream_record: truncated header"};
333
334 // OOM guard: cap total cells to prevent crafted payloads from exhausting memory
335 static constexpr size_t MAX_BATCH_CELLS = 100'000'000; // 100M cells (~800 MB)
336 if (static_cast<size_t>(ncols) * static_cast<size_t>(nrows) > MAX_BATCH_CELLS)
338 "ColumnBatch::from_stream_record: ncols*nrows exceeds safety limit"};
339
340 std::vector<ColumnDesc> schema;
341 schema.reserve(ncols);
342 for (uint32_t ci = 0; ci < ncols; ++ci) {
343 uint32_t name_len = 0;
344 if (!read_u32(name_len))
346 "ColumnBatch::from_stream_record: truncated schema"};
347 if (p + name_len > end)
349 "ColumnBatch::from_stream_record: name overflow"};
350 ColumnDesc desc;
351 desc.name.assign(p, name_len);
352 p += name_len;
353 schema.push_back(std::move(desc));
354 }
355
356 ColumnBatch b = ColumnBatch::with_schema(std::move(schema), nrows);
357
358 for (uint32_t ci = 0; ci < ncols; ++ci) {
359 b.columns_[ci].resize(nrows);
360 for (uint32_t ri = 0; ri < nrows; ++ri) {
361 if (!read_f64(b.columns_[ci][ri]))
363 "ColumnBatch::from_stream_record: truncated data"};
364 }
365 }
366 b.num_rows_ = nrows;
367 b.created_ns = rec.timestamp_ns;
368
369 return b;
370 }
371
372 // -------------------------------------------------------------------------
373 // Utility
374 // -------------------------------------------------------------------------
375
377 void clear() {
378 for (auto& col : columns_) col.clear();
379 num_rows_ = 0;
380 }
381
384 void reserve(size_t rows) {
385 for (auto& col : columns_) col.reserve(rows);
386 }
387
388private:
389 std::vector<ColumnDesc> schema_;
390 std::vector<std::vector<double>> columns_;
391 size_t num_rows_{0};
392};
393
394// ---------------------------------------------------------------------------
395// SharedColumnBatch — the unit transferred between threads
396// ---------------------------------------------------------------------------
397
400using SharedColumnBatch = std::shared_ptr<ColumnBatch>;
401
403inline SharedColumnBatch make_column_batch(std::vector<ColumnDesc> schema,
404 size_t reserve_rows = 64) {
405 return std::make_shared<ColumnBatch>(
406 ColumnBatch::with_schema(std::move(schema), reserve_rows));
407}
408
409} // namespace signet::forge
Builds a single contiguous 2D tensor from multiple column tensors, suitable for passing to an ML inference engine.
BatchTensorBuilder & add_column(const std::string &name, const TensorView &tensor)
Add a column tensor as a feature source.
expected< OwnedTensor > build(TensorDataType output_dtype=TensorDataType::FLOAT32)
Build the final batch tensor.
A column-major batch of feature rows for ML inference and WAL serialization.
int64_t seq_first
First WAL sequence number in this batch.
expected< OwnedTensor > as_tensor(TensorDataType output_dtype=TensorDataType::FLOAT32) const
Assemble all columns into a single 2D [rows x cols] OwnedTensor.
void reserve(size_t rows)
Pre-allocate storage for the given number of rows in each column.
ColumnBatch & operator=(const ColumnBatch &)=default
Copy assignment.
expected< void > push_row(const double *values, size_t count)
Append one row of feature values.
std::span< const double > column_span(size_t col_idx) const
Span accessor for a single column — zero-copy, range-checked.
void clear()
Clear all row data while preserving the schema.
std::string symbol
Instrument symbol.
bool empty() const noexcept
True if the batch contains no rows.
expected< void > push_row(const std::vector< double > &values)
Append one row from a vector.
static ColumnBatch with_schema(std::vector< ColumnDesc > schema, size_t reserve_rows=64)
Create an empty ColumnBatch with the given schema.
expected< void > push_row(std::initializer_list< double > values)
Append one row from an initializer list (e.g. push_row({mid, qty})).
ColumnBatch(const ColumnBatch &)=default
Copy constructor.
ColumnBatch & operator=(ColumnBatch &&)=default
Move assignment.
const std::vector< ColumnDesc > & schema() const noexcept
The schema (column descriptors) this batch was created with.
size_t num_columns() const noexcept
Number of columns defined by the schema.
int64_t created_ns
Batch creation timestamp (ns since epoch)
size_t num_rows() const noexcept
Number of rows currently in the batch.
std::string source_id
Exchange / feed identifier.
ColumnBatch()=default
Default constructor (empty batch, no schema).
static expected< ColumnBatch > from_stream_record(const StreamRecord &rec)
Deserialize a StreamRecord payload back into a ColumnBatch.
ColumnBatch(ColumnBatch &&)=default
Move constructor.
int64_t seq_last
Last WAL sequence number in this batch.
TensorView column_view(size_t col_idx) const
Zero-copy TensorView over a single column's contiguous double array.
StreamRecord to_stream_record(int64_t timestamp_ns=0, uint32_t type_id=0x434F4C42u) const
Serialize the batch into a WAL StreamRecord.
A lightweight, non-owning view into a contiguous block of typed memory, interpreted as a multi-dimensional tensor.
A lightweight result type that holds either a success value of type T or an Error.
Definition error.hpp:145
std::shared_ptr< ColumnBatch > SharedColumnBatch
Thread-safe shared pointer to a ColumnBatch – the unit transferred between producer and consumer threads.
SharedColumnBatch make_column_batch(std::vector< ColumnDesc > schema, size_t reserve_rows=64)
Convenience factory: create a shared batch with a given schema.
@ IO_ERROR
A file-system or stream I/O operation failed (open, read, write, rename).
@ SCHEMA_MISMATCH
The requested column name or type does not match the file schema.
@ INTERNAL_ERROR
An unexpected internal error that does not fit any other category.
TensorDataType
Element data type for tensor storage, mapping to ONNX/PyTorch/TF type enums.
@ FLOAT64
IEEE 754 double-precision (8 bytes)
@ FLOAT32
IEEE 754 single-precision (4 bytes)
Lock-free SPSC/MPSC ring buffers, StreamingSink for background Parquet compaction,...
Describes a single column in a ColumnBatch schema.
std::string name
Column name (e.g. "price", "volume")
TensorDataType dtype
Physical storage type (always stored as double internally)
Lightweight error value carrying an ErrorCode and a human-readable message.
Definition error.hpp:101
A single record flowing through the StreamingSink pipeline.
int64_t timestamp_ns
Wall-clock timestamp in nanoseconds since Unix epoch.
uint32_t type_id
User-defined record type tag (0 = untyped)
std::string payload
Serialized record bytes (UTF-8 safe or binary via base64)
Describes the shape of a tensor as a vector of dimension sizes.
Zero-copy tensor bridge: maps Parquet column data directly into ML-framework-compatible tensor views ...