Signet Forge 0.1.0
C++20 Parquet library with AI-native extensions
DEMO
Loading...
Searching...
No Matches
column_index.hpp
Go to the documentation of this file.
1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright 2026 Johnson Ogundeji
3#pragma once
4
19
21#include "signet/statistics.hpp" // from_le_bytes<T>
22
23#include <cstdint>
24#include <cstring>
25#include <string>
26#include <vector>
27
28namespace signet::forge {
29
36 int64_t offset = 0;
38 int64_t first_row_index = 0;
39
43 enc.begin_struct();
44
45 // field 1: offset (i64)
47 enc.write_i64(offset);
48
49 // field 2: compressed_page_size (i32)
52
53 // field 3: first_row_index (i64)
56
57 enc.write_stop();
58 enc.end_struct();
59 }
60
64 dec.begin_struct();
65 for (;;) {
66 auto [fid, ftype] = dec.read_field_header();
67 if (ftype == thrift::compact_type::STOP) break;
68 switch (fid) {
69 case 1: offset = dec.read_i64(); break;
70 case 2: compressed_page_size = dec.read_i32(); break;
71 case 3: first_row_index = dec.read_i64(); break;
72 default: dec.skip_field(ftype); break;
73 }
74 }
75 dec.end_struct();
76 }
77};
78
87 bool valid_ = true;
88 std::vector<PageLocation> page_locations;
89
91 [[nodiscard]] bool valid() const { return valid_; }
92
96 enc.begin_struct();
97
98 // field 1: page_locations (list<struct>)
101 static_cast<int32_t>(page_locations.size()));
102 for (const auto& loc : page_locations) {
103 loc.serialize(enc);
104 }
105
106 enc.write_stop();
107 enc.end_struct();
108 }
109
113 dec.begin_struct();
114 for (;;) {
115 auto [fid, ftype] = dec.read_field_header();
116 if (ftype == thrift::compact_type::STOP) break;
117 switch (fid) {
118 case 1: {
119 auto [elem_type, count] = dec.read_list_header();
120 // CWE-400: Uncontrolled Resource Consumption — 10M cap on list counts
121 if (count < 0 || static_cast<size_t>(count) > 10'000'000) {
122 valid_ = false;
123 return; // list count exceeds 10M cap or is negative
124 }
125 page_locations.resize(static_cast<size_t>(count));
126 for (int32_t i = 0; i < count; ++i) {
127 page_locations[static_cast<size_t>(i)].deserialize(dec);
128 }
129 break;
130 }
131 default: dec.skip_field(ftype); break;
132 }
133 }
134 dec.end_struct();
135 }
136};
137
148 bool valid_ = true;
149 std::vector<bool> null_pages;
150 std::vector<std::string> min_values;
151 std::vector<std::string> max_values;
152
/// Ordering of min values across pages.
/// The underlying type is int32_t because the value is written to and read
/// from the Thrift stream as an i32.
enum class BoundaryOrder : int32_t {
    UNORDERED  = 0,  ///< Min values have no particular order.
    ASCENDING  = 1,  ///< Min values are non-decreasing across pages.
    DESCENDING = 2   ///< Min values are non-increasing across pages.
};
160
161 std::vector<int64_t> null_counts;
162
164 [[nodiscard]] bool valid() const { return valid_; }
165
169 enc.begin_struct();
170
171 // field 1: null_pages (list<bool>)
174 static_cast<int32_t>(null_pages.size()));
175 for (bool np : null_pages) {
176 enc.write_bool(np);
177 }
178
179 // field 2: min_values (list<binary>)
182 static_cast<int32_t>(min_values.size()));
183 for (const auto& mv : min_values) {
184 enc.write_string(mv);
185 }
186
187 // field 3: max_values (list<binary>)
190 static_cast<int32_t>(max_values.size()));
191 for (const auto& mv : max_values) {
192 enc.write_string(mv);
193 }
194
195 // field 4: boundary_order (i32)
197 enc.write_i32(static_cast<int32_t>(boundary_order));
198
199 // field 5: null_counts (list<i64>, optional -- written only if non-empty)
200 if (!null_counts.empty()) {
203 static_cast<int32_t>(null_counts.size()));
204 for (int64_t nc : null_counts) {
205 enc.write_i64(nc);
206 }
207 }
208
209 enc.write_stop();
210 enc.end_struct();
211 }
212
216 dec.begin_struct();
217 for (;;) {
218 auto [fid, ftype] = dec.read_field_header();
219 if (ftype == thrift::compact_type::STOP) break;
220 switch (fid) {
221 case 1: {
222 // null_pages: list<bool>
223 auto [elem_type, count] = dec.read_list_header();
224 // CWE-400: Uncontrolled Resource Consumption — 10M cap on list counts
225 if (count < 0 || static_cast<size_t>(count) > 10'000'000) {
226 valid_ = false; return; // list count exceeds 10M cap or is negative
227 }
228 null_pages.resize(static_cast<size_t>(count));
229 for (int32_t i = 0; i < count; ++i) {
230 null_pages[static_cast<size_t>(i)] = dec.read_bool();
231 }
232 break;
233 }
234 case 2: {
235 // min_values: list<binary>
236 auto [elem_type, count] = dec.read_list_header();
237 // CWE-400: Uncontrolled Resource Consumption — 10M cap on list counts
238 if (count < 0 || static_cast<size_t>(count) > 10'000'000) {
239 valid_ = false; return; // list count exceeds 10M cap or is negative
240 }
241 min_values.resize(static_cast<size_t>(count));
242 for (int32_t i = 0; i < count; ++i) {
243 min_values[static_cast<size_t>(i)] = dec.read_string();
244 }
245 break;
246 }
247 case 3: {
248 // max_values: list<binary>
249 auto [elem_type, count] = dec.read_list_header();
250 // CWE-400: Uncontrolled Resource Consumption — 10M cap on list counts
251 if (count < 0 || static_cast<size_t>(count) > 10'000'000) {
252 valid_ = false; return; // list count exceeds 10M cap or is negative
253 }
254 max_values.resize(static_cast<size_t>(count));
255 for (int32_t i = 0; i < count; ++i) {
256 max_values[static_cast<size_t>(i)] = dec.read_string();
257 }
258 break;
259 }
260 case 4:
261 boundary_order = static_cast<BoundaryOrder>(dec.read_i32());
262 break;
263 case 5: {
264 // null_counts: list<i64>
265 auto [elem_type, count] = dec.read_list_header();
266 // CWE-400: Uncontrolled Resource Consumption — 10M cap on list counts
267 if (count < 0 || static_cast<size_t>(count) > 10'000'000) {
268 valid_ = false; return; // list count exceeds 10M cap or is negative
269 }
270 null_counts.resize(static_cast<size_t>(count));
271 for (int32_t i = 0; i < count; ++i) {
272 null_counts[static_cast<size_t>(i)] = dec.read_i64();
273 }
274 break;
275 }
276 default: dec.skip_field(ftype); break;
277 }
278 }
279 dec.end_struct();
280 }
281
294 [[nodiscard]] std::vector<size_t> filter_pages(
295 const std::string& min_val,
296 const std::string& max_val,
297 PhysicalType physical_type = PhysicalType::BYTE_ARRAY) const {
298
299 // Typed comparison helper: returns negative/zero/positive like strcmp.
300 auto typed_compare = [&](const std::string& a, const std::string& b) -> int {
301 switch (physical_type) {
302 case PhysicalType::INT32: {
303 if (a.size() >= sizeof(int32_t) && b.size() >= sizeof(int32_t)) {
304 auto va = from_le_bytes<int32_t>({reinterpret_cast<const uint8_t*>(a.data()),
305 reinterpret_cast<const uint8_t*>(a.data()) + a.size()});
306 auto vb = from_le_bytes<int32_t>({reinterpret_cast<const uint8_t*>(b.data()),
307 reinterpret_cast<const uint8_t*>(b.data()) + b.size()});
308 return (va < vb) ? -1 : (va > vb) ? 1 : 0;
309 }
310 break;
311 }
312 case PhysicalType::INT64: {
313 if (a.size() >= sizeof(int64_t) && b.size() >= sizeof(int64_t)) {
314 auto va = from_le_bytes<int64_t>({reinterpret_cast<const uint8_t*>(a.data()),
315 reinterpret_cast<const uint8_t*>(a.data()) + a.size()});
316 auto vb = from_le_bytes<int64_t>({reinterpret_cast<const uint8_t*>(b.data()),
317 reinterpret_cast<const uint8_t*>(b.data()) + b.size()});
318 return (va < vb) ? -1 : (va > vb) ? 1 : 0;
319 }
320 break;
321 }
322 case PhysicalType::FLOAT: {
323 if (a.size() >= sizeof(float) && b.size() >= sizeof(float)) {
324 auto va = from_le_bytes<float>({reinterpret_cast<const uint8_t*>(a.data()),
325 reinterpret_cast<const uint8_t*>(a.data()) + a.size()});
326 auto vb = from_le_bytes<float>({reinterpret_cast<const uint8_t*>(b.data()),
327 reinterpret_cast<const uint8_t*>(b.data()) + b.size()});
328 return (va < vb) ? -1 : (va > vb) ? 1 : 0;
329 }
330 break;
331 }
333 if (a.size() >= sizeof(double) && b.size() >= sizeof(double)) {
334 auto va = from_le_bytes<double>({reinterpret_cast<const uint8_t*>(a.data()),
335 reinterpret_cast<const uint8_t*>(a.data()) + a.size()});
336 auto vb = from_le_bytes<double>({reinterpret_cast<const uint8_t*>(b.data()),
337 reinterpret_cast<const uint8_t*>(b.data()) + b.size()});
338 return (va < vb) ? -1 : (va > vb) ? 1 : 0;
339 }
340 break;
341 }
342 default:
343 break;
344 }
345 // Fallback: lexicographic comparison
346 return (a < b) ? -1 : (a > b) ? 1 : 0;
347 };
348
349 std::vector<size_t> matching;
350 size_t num_pages = min_values.size();
351
352 for (size_t i = 0; i < num_pages; ++i) {
353 // Skip all-null pages -- they cannot contain any matching data
354 if (i < null_pages.size() && null_pages[i]) {
355 continue;
356 }
357
358 // Skip if page max < query min (entire page below the range)
359 if (i < max_values.size() && typed_compare(max_values[i], min_val) < 0) {
360 continue;
361 }
362
363 // Skip if page min > query max (entire page above the range)
364 if (i < min_values.size() && typed_compare(min_values[i], max_val) > 0) {
365 continue;
366 }
367
368 matching.push_back(i);
369 }
370
371 return matching;
372 }
373};
374
396public:
398 void start_page() {
399 pages_.emplace_back();
400 }
401
404 void set_min(const std::string& min_val) {
405 if (!pages_.empty()) {
406 pages_.back().min_value = min_val;
407 }
408 }
409
412 void set_max(const std::string& max_val) {
413 if (!pages_.empty()) {
414 pages_.back().max_value = max_val;
415 }
416 }
417
420 void set_null_page(bool is_null) {
421 if (!pages_.empty()) {
422 pages_.back().null_page = is_null;
423 }
424 }
425
428 void set_null_count(int64_t count) {
429 if (!pages_.empty()) {
430 pages_.back().null_count = count;
431 }
432 }
433
436 void set_first_row_index(int64_t row_index) {
437 if (!pages_.empty()) {
438 pages_.back().first_row_index = row_index;
439 }
440 }
441
445 void set_page_location(int64_t offset, int32_t compressed_size) {
446 if (!pages_.empty()) {
447 pages_.back().offset = offset;
448 pages_.back().compressed_size = compressed_size;
449 }
450 }
451
461 ColumnIndex ci;
462 ci.null_pages.reserve(pages_.size());
463 ci.min_values.reserve(pages_.size());
464 ci.max_values.reserve(pages_.size());
465 ci.null_counts.reserve(pages_.size());
466
467 bool has_any_null_counts = false;
468 for (const auto& p : pages_) {
469 ci.null_pages.push_back(p.null_page);
470 ci.min_values.push_back(p.min_value);
471 ci.max_values.push_back(p.max_value);
472 ci.null_counts.push_back(p.null_count);
473 if (p.null_count > 0) {
474 has_any_null_counts = true;
475 }
476 }
477
478 // Determine boundary order by scanning min_values (type-aware)
479 ci.boundary_order = detect_boundary_order(ci.min_values, pt);
480
481 // null_counts is optional in the Parquet spec, but it is always
482 // emitted here -- even when every page reports zero nulls -- for
483 // completeness (readers expect it for null-page detection).
484 // has_any_null_counts is therefore computed but deliberately unused.
485 (void)has_any_null_counts;
486
487 return ci;
488 }
489
492 [[nodiscard]] OffsetIndex build_offset_index() const {
493 OffsetIndex oi;
494 oi.page_locations.reserve(pages_.size());
495
496 for (const auto& p : pages_) {
497 PageLocation loc;
498 loc.offset = p.offset;
499 loc.compressed_page_size = p.compressed_size;
500 loc.first_row_index = p.first_row_index;
501 oi.page_locations.push_back(loc);
502 }
503
504 return oi;
505 }
506
508 void reset() {
509 pages_.clear();
510 }
511
513 [[nodiscard]] size_t num_pages() const { return pages_.size(); }
514
515private:
/// Statistics and location accumulated for one page.
/// Every field starts empty/zero/false and is filled in by the set_* methods.
struct PageInfo {
    std::string min_value{};       ///< Minimum value, as passed to set_min().
    std::string max_value{};       ///< Maximum value, as passed to set_max().
    bool null_page{false};         ///< All-nulls flag, as passed to set_null_page().
    int64_t null_count{0};         ///< Null count, as passed to set_null_count().
    int64_t first_row_index{0};    ///< First row index, as passed to set_first_row_index().
    int64_t offset{0};             ///< File offset, as passed to set_page_location().
    int32_t compressed_size{0};    ///< Compressed size, as passed to set_page_location().
};
526
527 std::vector<PageInfo> pages_;
528
534 [[nodiscard]] static ColumnIndex::BoundaryOrder detect_boundary_order(
535 const std::vector<std::string>& values,
537
538 if (values.size() <= 1) {
540 }
541
542 // Type-aware comparator: returns <0, 0, >0 like strcmp
543 auto typed_cmp = [pt](const std::string& a, const std::string& b) -> int {
544 switch (pt) {
546 if (a.size() >= sizeof(int32_t) && b.size() >= sizeof(int32_t)) {
547 int32_t va, vb;
548 std::memcpy(&va, a.data(), sizeof(int32_t));
549 std::memcpy(&vb, b.data(), sizeof(int32_t));
550 return (va < vb) ? -1 : (va > vb) ? 1 : 0;
551 }
552 break;
554 if (a.size() >= sizeof(int64_t) && b.size() >= sizeof(int64_t)) {
555 int64_t va, vb;
556 std::memcpy(&va, a.data(), sizeof(int64_t));
557 std::memcpy(&vb, b.data(), sizeof(int64_t));
558 return (va < vb) ? -1 : (va > vb) ? 1 : 0;
559 }
560 break;
562 if (a.size() >= sizeof(float) && b.size() >= sizeof(float)) {
563 float va, vb;
564 std::memcpy(&va, a.data(), sizeof(float));
565 std::memcpy(&vb, b.data(), sizeof(float));
566 return (va < vb) ? -1 : (va > vb) ? 1 : 0;
567 }
568 break;
570 if (a.size() >= sizeof(double) && b.size() >= sizeof(double)) {
571 double va, vb;
572 std::memcpy(&va, a.data(), sizeof(double));
573 std::memcpy(&vb, b.data(), sizeof(double));
574 return (va < vb) ? -1 : (va > vb) ? 1 : 0;
575 }
576 break;
577 default:
578 break;
579 }
580 // Fallback: lexicographic for BYTE_ARRAY, BOOLEAN, etc.
581 return a.compare(b);
582 };
583
584 bool ascending = true;
585 bool descending = true;
586
587 for (size_t i = 1; i < values.size(); ++i) {
588 int cmp = typed_cmp(values[i], values[i - 1]);
589 if (cmp < 0) ascending = false;
590 if (cmp > 0) descending = false;
591 if (!ascending && !descending) break;
592 }
593
594 if (ascending) return ColumnIndex::BoundaryOrder::ASCENDING;
595 if (descending) return ColumnIndex::BoundaryOrder::DESCENDING;
597 }
598};
599
600} // namespace signet::forge
Builder that accumulates per-page statistics during column writing.
void set_page_location(int64_t offset, int32_t compressed_size)
Record the page location (file offset and compressed size) for the current page.
size_t num_pages() const
Number of pages accumulated so far.
void set_min(const std::string &min_val)
Record the minimum value for the current page (binary-encoded).
void set_max(const std::string &max_val)
Record the maximum value for the current page (binary-encoded).
void set_null_page(bool is_null)
Mark the current page as all-nulls (or not).
void start_page()
Start a new page. Must be called before set_min/set_max etc.
void set_first_row_index(int64_t row_index)
Record the first row index for the current page (relative to row group).
ColumnIndex build_column_index(PhysicalType pt=PhysicalType::BYTE_ARRAY) const
Finalize and return the ColumnIndex from accumulated page info.
void reset()
Reset the builder, discarding all accumulated page info.
OffsetIndex build_offset_index() const
Finalize and return the OffsetIndex from accumulated page info.
void set_null_count(int64_t count)
Record the null count for the current page.
Thrift Compact Protocol reader.
Definition compact.hpp:267
void begin_struct()
Push a new field-ID context for reading a nested struct.
Definition compact.hpp:508
void end_struct()
Pop the field-ID context after finishing a nested struct.
Definition compact.hpp:515
FieldHeader read_field_header()
Read a field header.
Definition compact.hpp:285
int64_t read_i64()
Read a 64-bit integer (zigzag + varint64 decode).
Definition compact.hpp:353
ListHeader read_list_header()
Read a list header. Returns element type and count.
Definition compact.hpp:400
void skip_field(uint8_t thrift_type)
Skip a field without parsing its value.
Definition compact.hpp:427
std::string read_string()
Read a string (varint-length-prefixed UTF-8 bytes).
Definition compact.hpp:380
bool read_bool()
Read a boolean value.
Definition compact.hpp:332
int32_t read_i32()
Read a 32-bit integer (zigzag + varint decode).
Definition compact.hpp:348
Thrift Compact Protocol writer.
Definition compact.hpp:72
void begin_struct()
Push a new field-ID context for a nested struct.
Definition compact.hpp:100
void write_bool(bool val)
Write a standalone bool (not embedded in a field header).
Definition compact.hpp:108
void end_struct()
Pop the field-ID context after finishing a nested struct.
Definition compact.hpp:103
void write_string(const std::string &val)
Write a string as varint-length-prefixed UTF-8 bytes.
Definition compact.hpp:163
void write_field(int16_t field_id, uint8_t thrift_type)
Write a field header.
Definition compact.hpp:85
void write_i32(int32_t val)
Write a 32-bit integer as zigzag + varint.
Definition compact.hpp:134
void write_stop()
Write struct stop marker (0x00).
Definition compact.hpp:97
void write_i64(int64_t val)
Write a 64-bit integer as zigzag + varint.
Definition compact.hpp:139
void write_list_header(uint8_t elem_type, int32_t size)
Write a list header.
Definition compact.hpp:185
Thrift Compact Protocol encoder and decoder for Parquet metadata serialization.
constexpr uint8_t STRUCT
Nested struct.
Definition compact.hpp:39
constexpr uint8_t I32
32-bit signed integer (zigzag + varint).
Definition compact.hpp:32
constexpr uint8_t BINARY
Length-prefixed bytes (also used for STRING).
Definition compact.hpp:35
constexpr uint8_t LIST
List container.
Definition compact.hpp:36
constexpr uint8_t STOP
Struct stop marker.
Definition compact.hpp:27
constexpr uint8_t BOOL_TRUE
Boolean true (embedded in field header).
Definition compact.hpp:28
constexpr uint8_t I64
64-bit signed integer (zigzag + varint).
Definition compact.hpp:33
PhysicalType
Parquet physical (storage) types as defined in parquet.thrift.
Definition types.hpp:20
@ INT64
64-bit signed integer (little-endian).
@ INT32
32-bit signed integer (little-endian).
@ BYTE_ARRAY
Variable-length byte sequence (strings, binary).
@ FLOAT
IEEE 754 single-precision float.
@ DOUBLE
IEEE 754 double-precision float.
Per-column-chunk statistics tracker and little-endian byte helpers.
Per-page min/max statistics for predicate pushdown.
void deserialize(thrift::CompactDecoder &dec)
Deserialize this ColumnIndex from a Thrift compact decoder.
bool valid_
False if deserialization failed (M-V7).
bool valid() const
Check if deserialization was successful.
BoundaryOrder boundary_order
Boundary order of min values.
std::vector< bool > null_pages
True if the corresponding page is all nulls.
std::vector< size_t > filter_pages(const std::string &min_val, const std::string &max_val, PhysicalType physical_type=PhysicalType::BYTE_ARRAY) const
Filter pages by a value range for predicate pushdown.
std::vector< std::string > max_values
Binary-encoded maximum value per page.
BoundaryOrder
Ordering of min values across pages, used to short-circuit filtering.
@ UNORDERED
Min values have no particular order.
@ ASCENDING
Min values are non-decreasing across pages.
@ DESCENDING
Min values are non-increasing across pages.
void serialize(thrift::CompactEncoder &enc) const
Serialize this ColumnIndex to a Thrift compact encoder.
std::vector< std::string > min_values
Binary-encoded minimum value per page.
std::vector< int64_t > null_counts
Null count per page (optional).
Page locations for random access within a column chunk.
void deserialize(thrift::CompactDecoder &dec)
Deserialize this OffsetIndex from a Thrift compact decoder.
void serialize(thrift::CompactEncoder &enc) const
Serialize this OffsetIndex to a Thrift compact encoder.
bool valid_
False if deserialization failed (M-V7).
bool valid() const
Check if deserialization was successful.
std::vector< PageLocation > page_locations
One entry per data page.
File offset and size descriptor for a single data page.
int32_t compressed_page_size
Size of the page in compressed bytes.
void serialize(thrift::CompactEncoder &enc) const
Serialize this PageLocation to a Thrift compact encoder.
int64_t first_row_index
First row in this page (relative to row group).
int64_t offset
Absolute file offset of the page header.
void deserialize(thrift::CompactDecoder &dec)
Deserialize this PageLocation from a Thrift compact decoder.