Signet Forge 0.1.0
C++20 Parquet library with AI-native extensions
DEMO
Loading...
Searching...
No Matches
rle.hpp
Go to the documentation of this file.
1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright 2026 Johnson Ogundeji
3
13
14#pragma once
15
16#include <algorithm>
17#include <cassert>
18#include <cstdint>
19#include <cstring>
20#include <vector>
21
22namespace signet::forge {
23
24// ===========================================================================
25// RLE/Bit-Packing Hybrid Encoding (Parquet spec)
26// ===========================================================================
27//
28// This implements the RLE/Bit-Packing Hybrid encoding used throughout Parquet:
29// - Definition levels and repetition levels
30// - Boolean columns (when RLE-encoded)
31// - Dictionary indices (RLE_DICTIONARY encoding)
32//
33// Wire format (each run begins with a varint header):
34//
35// header & 1 == 0 => RLE run
36// run_length = header >> 1
37// followed by the repeated value in ceil(bit_width/8) bytes, LE
38//
39// header & 1 == 1 => Bit-packed group
40// group_count = header >> 1
41// each group holds 8 values, packed at bit_width bits per value, LSB first
42// total bytes = group_count * bit_width
43//
44// For def/rep levels the encoded payload is prefixed with a 4-byte LE length.
45// For RLE_DICTIONARY, the bit_width is stored as a single preceding byte.
46//
47
48// ---------------------------------------------------------------------------
49// Varint helpers (unsigned LEB128)
50// ---------------------------------------------------------------------------
51
/// Append `value` to `buf` as an unsigned LEB128 varint.
///
/// Seven payload bits are emitted per byte, least-significant group first;
/// every byte except the last has its high (continuation) bit set.
///
/// @param buf    Destination buffer; bytes are appended at the end.
/// @param value  Unsigned value to encode.
/// @return Number of bytes appended (1..10).
inline size_t encode_varint(std::vector<uint8_t>& buf, uint64_t value) {
    const size_t size_before = buf.size();
    // Emit all groups that still have more payload after them.
    for (; value > 0x7F; value >>= 7) {
        buf.push_back(static_cast<uint8_t>(0x80 | (value & 0x7F)));
    }
    // Final group: continuation bit clear.
    buf.push_back(static_cast<uint8_t>(value));
    return buf.size() - size_before;
}
71
/// Decode an unsigned LEB128 varint from `data[pos .. size)`.
///
/// On success `pos` is advanced past the varint and the decoded value is
/// returned. On any failure the function returns 0 and restores `pos` to the
/// value it had on entry, so a caller can detect failure by checking that
/// `pos` did not move (a successful decode always consumes at least one byte).
///
/// Failure cases:
///  - the buffer ends before a byte without the continuation bit is seen
///    (truncated varint);
///  - the varint is longer than 10 bytes;
///  - payload bits would be shifted out of the 64-bit result (CWE-190).
///
/// @param data  Start of the encoded buffer.
/// @param pos   In/out read cursor (byte index into `data`).
/// @param size  Total number of readable bytes in `data`.
/// @return The decoded value, or 0 on failure.
inline uint64_t decode_varint(const uint8_t* data, size_t& pos, size_t size) {
    const size_t start_pos = pos;
    uint64_t result = 0;

    // shift takes the values 0, 7, ..., 63: at most 10 bytes are consumed.
    for (int shift = 0; shift < 64 && pos < size; shift += 7) {
        const uint8_t byte = data[pos++];
        const uint64_t payload = byte & 0x7F;

        // Reject payload bits that do not fit in 64 bits instead of silently
        // dropping them (only possible on the 10th byte, where shift == 63).
        if (((payload << shift) >> shift) != payload) break;

        result |= payload << shift;
        if ((byte & 0x80) == 0) {
            return result;
        }
    }

    // Truncated, overlong, or overflowing varint: report failure uniformly
    // rather than handing back a partially accumulated value.
    pos = start_pos;
    return 0;
}
100
101// ---------------------------------------------------------------------------
102// Bit-packing helpers
103// ---------------------------------------------------------------------------
104
/// Append 8 values to `out`, packed back-to-back at `bit_width` bits each,
/// least-significant bit first.
///
/// Exactly `bit_width` bytes are appended. Only the low `bit_width` bits of
/// each value are written; higher bits are ignored. A `bit_width` of 0
/// appends nothing.
///
/// @param out        Destination buffer (appended to).
/// @param values     Pointer to at least 8 readable values.
/// @param bit_width  Bits per packed value.
inline void bit_pack_8(std::vector<uint8_t>& out, const uint64_t* values, int bit_width) {
    if (bit_width == 0) return;  // zero-width values occupy no bytes

    // 8 values * bit_width bits == bit_width bytes, zero-initialised so the
    // packing loop can OR bits into place.
    const size_t base = out.size();
    out.resize(base + (8 * static_cast<size_t>(bit_width) + 7) / 8, 0);
    uint8_t* const dst = out.data() + base;

    int cursor = 0;  // absolute bit position within dst, shared by all 8 values
    for (int slot = 0; slot < 8; ++slot) {
        uint64_t pending = values[slot];
        for (int left = bit_width; left > 0;) {
            const int bit_in_byte = cursor % 8;
            // Never cross a byte boundary in a single step.
            const int take = (std::min)(left, 8 - bit_in_byte);
            const uint64_t low_bits = pending & ((uint64_t{1} << take) - 1);
            dst[cursor / 8] |= static_cast<uint8_t>(low_bits << bit_in_byte);
            pending >>= take;
            cursor += take;
            left -= take;
        }
    }
}
143
/// Unpack 8 values of `bit_width` bits each from `src` (LSB-first packing).
///
/// Reads `bit_width` bytes from `src` and writes 8 entries to `values`.
/// A `bit_width` of 0 reads nothing and writes eight zeros.
///
/// @param src        Packed input; must have at least `bit_width` readable bytes.
/// @param values     Output array with room for 8 values.
/// @param bit_width  Bits per packed value.
inline void bit_unpack_8(const uint8_t* src, uint64_t* values, int bit_width) {
    if (bit_width == 0) {
        std::fill(values, values + 8, uint64_t{0});
        return;
    }

    // A 64-bit shift would be undefined, so the full-width mask is special-cased.
    const uint64_t value_mask = (bit_width == 64) ? ~uint64_t(0)
                                                  : (uint64_t(1) << bit_width) - 1;

    int cursor = 0;  // absolute bit position within src, shared by all 8 values
    for (int slot = 0; slot < 8; ++slot) {
        uint64_t assembled = 0;
        for (int got = 0; got < bit_width;) {
            const int bit_in_byte = cursor % 8;
            // Read up to the end of the current byte, no further.
            const int take = (std::min)(bit_width - got, 8 - bit_in_byte);
            const uint64_t piece =
                (src[cursor / 8] >> bit_in_byte) & ((uint64_t{1} << take) - 1);
            assembled |= piece << got;
            cursor += take;
            got += take;
        }
        values[slot] = assembled & value_mask;
    }
}
184
185// ===========================================================================
186// RleEncoder
187// ===========================================================================
188//
189// Encodes a stream of unsigned integer values using the Parquet RLE/Bit-Pack
190// Hybrid scheme. The encoder buffers values and decides per-group whether to
191// emit an RLE run (for repeated values) or a bit-packed group of 8.
192//
193// Usage:
194// RleEncoder enc(bit_width);
195// for (auto v : values) enc.put(v);
196// enc.flush();
197// // enc.data() contains the encoded bytes (no length prefix)
198//
212public:
217 explicit RleEncoder(int bit_width)
218 : bit_width_(bit_width < 0 ? 0 : (bit_width > 64 ? 0 : bit_width))
219 , byte_width_(bit_width_ > 0 ? static_cast<int>((bit_width_ + 7) / 8) : 0) {}
220
228 void put(uint64_t value) {
229 // bit_width=0: all values are implicitly zero, nothing to encode.
230 // Matches static encode() which returns empty for bit_width=0.
231 if (bit_width_ == 0) return;
232
233 // Accumulate values into the pending buffer
234 if (rle_count_ == 0) {
235 // Start a new potential RLE run
236 rle_value_ = value;
237 rle_count_ = 1;
238 } else if (value == rle_value_ && bp_count_ == 0) {
239 // Extend current RLE run
240 ++rle_count_;
241 } else {
242 // Value differs from current run, or we're already in bit-pack mode
243 if (bp_count_ == 0 && rle_count_ >= 8) {
244 // We have a worthwhile RLE run, flush it
245 flush_rle_run();
246 // Start new run with this value
247 rle_value_ = value;
248 rle_count_ = 1;
249 } else {
250 // Move RLE-buffered values into bit-pack buffer
251 while (rle_count_ > 0) {
252 bp_buffer_[bp_count_++] = rle_value_;
253 --rle_count_;
254 if (bp_count_ == 8) {
255 flush_bp_group();
256 }
257 }
258 // Add the new value to bit-pack buffer
259 bp_buffer_[bp_count_++] = value;
260 if (bp_count_ == 8) {
261 flush_bp_group();
262 }
263 }
264 }
265 }
266
272 void flush() {
273 // First: if we have pending bit-packed values plus RLE values,
274 // drain everything
275 if (bp_count_ > 0 && rle_count_ > 0) {
276 // Move remaining RLE values into bit-pack
277 while (rle_count_ > 0) {
278 bp_buffer_[bp_count_++] = rle_value_;
279 --rle_count_;
280 if (bp_count_ == 8) {
281 flush_bp_group();
282 }
283 }
284 }
285
286 // Flush any pending RLE run
287 if (rle_count_ > 0) {
288 flush_rle_run();
289 }
290
291 // Flush any remaining bit-pack values (partial group, pad with zeros)
292 if (bp_count_ > 0) {
293 // Pad to 8 with zeros
294 while (bp_count_ < 8) {
295 bp_buffer_[bp_count_++] = 0;
296 }
297 flush_bp_group();
298 }
299
300 // Flush accumulated bit-pack groups
301 flush_bp_groups();
302 }
303
308 [[nodiscard]] const std::vector<uint8_t>& data() const { return buffer_; }
309
313 [[nodiscard]] size_t encoded_size() const { return buffer_.size(); }
314
319 void reset() {
320 buffer_.clear();
321 rle_count_ = 0;
322 rle_value_ = 0;
323 bp_count_ = 0;
324 bp_groups_.clear();
325 bp_group_count_ = 0;
326 }
327
328 // -----------------------------------------------------------------------
329 // Static convenience methods
330 // -----------------------------------------------------------------------
331
342 static std::vector<uint8_t> encode(const uint32_t* values, size_t count, int bit_width) {
343 if (bit_width == 0) {
344 // Parquet levels may legally use bit_width=0 when all values are 0.
345 // We encode this as an empty payload (no runs needed).
346 for (size_t i = 0; i < count; ++i) {
347 if (values[i] != 0) return {};
348 }
349 return {};
350 }
351 if (bit_width < 1 || bit_width > 64) return {};
352 RleEncoder enc(bit_width);
353 for (size_t i = 0; i < count; ++i) {
354 enc.put(static_cast<uint64_t>(values[i]));
355 }
356 enc.flush();
357 return enc.buffer_;
358 }
359
371 static std::vector<uint8_t> encode_with_length(const uint32_t* values, size_t count,
372 int bit_width) {
373 auto payload = encode(values, count, bit_width);
374 if (payload.size() > UINT32_MAX) return {}; // CWE-190: Integer Overflow — payload too large for uint32 length prefix
375 std::vector<uint8_t> result;
376 result.reserve(4 + payload.size());
377 // 4-byte LE length prefix
378 uint32_t len = static_cast<uint32_t>(payload.size());
379 result.push_back(static_cast<uint8_t>(len));
380 result.push_back(static_cast<uint8_t>(len >> 8));
381 result.push_back(static_cast<uint8_t>(len >> 16));
382 result.push_back(static_cast<uint8_t>(len >> 24));
383 result.insert(result.end(), payload.begin(), payload.end());
384 return result;
385 }
386
387private:
388 int bit_width_;
389 int byte_width_;
390
391 std::vector<uint8_t> buffer_;
392
393 uint64_t rle_value_ = 0;
394 size_t rle_count_ = 0;
395
396 uint64_t bp_buffer_[8] = {};
397 int bp_count_ = 0;
398
399 std::vector<uint8_t> bp_groups_;
400 int bp_group_count_ = 0;
401
403 void write_rle_value(uint64_t value) {
404 for (int i = 0; i < byte_width_; ++i) {
405 buffer_.push_back(static_cast<uint8_t>(value & 0xFF));
406 value >>= 8;
407 }
408 }
409
411 void flush_rle_run() {
412 // First flush any pending bit-pack groups
413 flush_bp_groups();
414
415 // CWE-190: Integer Overflow / CWE-682: Incorrect Calculation — cap before left shift.
416 // Runs exceeding SIZE_MAX/2 are capped to prevent varint overflow on (count << 1).
417 // In practice, Parquet page sizes limit run lengths well below this threshold.
418 if (rle_count_ > (SIZE_MAX >> 1)) rle_count_ = SIZE_MAX >> 1;
419 // header = (run_length << 1) | 0
420 encode_varint(buffer_, rle_count_ << 1);
421 write_rle_value(rle_value_);
422 rle_count_ = 0;
423 }
424
426 void flush_bp_group() {
427 bit_pack_8(bp_groups_, bp_buffer_, bit_width_);
428 ++bp_group_count_;
429 bp_count_ = 0;
430 std::memset(bp_buffer_, 0, sizeof(bp_buffer_));
431 }
432
434 void flush_bp_groups() {
435 if (bp_group_count_ == 0) return;
436
437 // header = (group_count << 1) | 1
438 encode_varint(buffer_, (static_cast<uint64_t>(bp_group_count_) << 1) | 1);
439 buffer_.insert(buffer_.end(), bp_groups_.begin(), bp_groups_.end());
440
441 bp_groups_.clear();
442 bp_group_count_ = 0;
443 }
444};
445
446// ===========================================================================
447// RleDecoder
448// ===========================================================================
449//
450// Decodes a byte stream encoded with the Parquet RLE/Bit-Pack Hybrid scheme.
451//
452// Usage:
453// RleDecoder dec(data, size, bit_width);
454// uint64_t val;
455// while (dec.get(&val)) { /* process val */ }
456//
468public:
475 RleDecoder(const uint8_t* data, size_t size, int bit_width)
476 : data_(data)
477 , size_(size)
478 , pos_(0)
479 , bit_width_(bit_width < 0 ? 0 : (bit_width > 64 ? 0 : bit_width))
480 , byte_width_(bit_width_ > 0 ? static_cast<int>((bit_width_ + 7) / 8) : 0)
481 , rle_remaining_(0)
482 , rle_value_(0)
483 , bp_remaining_(0)
484 , bp_index_(0) {}
485
495 bool get(uint64_t* value) {
496 // If we have buffered RLE values, return one
497 if (rle_remaining_ > 0) {
498 *value = rle_value_;
499 --rle_remaining_;
500 return true;
501 }
502
503 // If we have buffered bit-packed values, return one
504 if (bp_remaining_ > 0) {
505 *value = bp_buffer_[bp_index_++];
506 --bp_remaining_;
507 return true;
508 }
509
510 // Need to read the next header
511 if (pos_ >= size_) return false;
512
513 uint64_t header = decode_varint(data_, pos_, size_);
514
515 if ((header & 1) == 0) {
516 // RLE run
517 uint64_t run_length = header >> 1;
518 if (run_length == 0) return false;
519 static constexpr uint64_t MAX_RLE_RUN = 256 * 1024 * 1024; // 256M
520 if (run_length > MAX_RLE_RUN) return false;
521
522 // Read the value in byte_width_ LE bytes
523 uint64_t val = 0;
524 int bytes_read = 0;
525 for (int i = 0; i < byte_width_ && pos_ < size_; ++i) {
526 val |= static_cast<uint64_t>(data_[pos_++]) << (8 * i);
527 ++bytes_read;
528 }
529 if (bytes_read < byte_width_) return false; // CWE-125: Out-of-bounds Read — truncated RLE value
530
531 rle_value_ = val;
532 rle_remaining_ = run_length - 1; // return one now
533 *value = val;
534 return true;
535 } else {
536 // Bit-packed groups
537 uint64_t group_count = header >> 1;
538 if (group_count == 0) return false;
539
540 // Cap group_count to available data to prevent OOM from corrupt varints
541 size_t avail_bytes = (pos_ < size_) ? (size_ - pos_) : 0;
542 if (bit_width_ > 0) {
543 size_t max_groups = avail_bytes / static_cast<size_t>(bit_width_) + 1;
544 if (group_count > max_groups) group_count = max_groups;
545 } else {
546 // bit_width_ == 0: all values are 0, cap to available data
547 if (group_count > avail_bytes + 1) group_count = avail_bytes + 1;
548 }
549 if (group_count == 0) return false;
550
551 size_t total_values = group_count * 8;
552 size_t total_bytes = group_count * static_cast<size_t>(bit_width_);
553 // Value-count cap: prevent OOM when bit_width_==0 (CWE-770)
554 static constexpr size_t MAX_BP_VALUES = 32 * 1024 * 1024; // 32M values
555 if (total_values > MAX_BP_VALUES) return false;
556 // Byte-based cap: prevent huge allocations from corrupt varints (CWE-770)
557 static constexpr size_t MAX_BP_BYTES = 256 * 1024 * 1024; // 256 MB
558 if (total_bytes > MAX_BP_BYTES) return false;
559
560 if (pos_ + total_bytes > size_) {
561 // Truncated data: only decode what we can
562 total_bytes = size_ - pos_;
563 }
564
565 // Decode all groups into bp_decoded_
566 bp_decoded_.resize(total_values);
567 size_t src_offset = 0;
568 size_t val_offset = 0;
569 for (uint64_t g = 0; g < group_count; ++g) {
570 if (src_offset + static_cast<size_t>(bit_width_) > total_bytes) break;
571 bit_unpack_8(data_ + pos_ + src_offset,
572 bp_decoded_.data() + val_offset,
573 bit_width_);
574 src_offset += static_cast<size_t>(bit_width_);
575 val_offset += 8;
576 }
577
578 pos_ += total_bytes;
579
580 // If no groups were decoded (all truncated), stop
581 if (val_offset == 0) return false;
582
583 // Set up buffered read state
584 bp_buffer_ = bp_decoded_.data();
585 bp_index_ = 1;
586 bp_remaining_ = val_offset - 1;
587 *value = bp_decoded_[0];
588 return true;
589 }
590 }
591
601 bool get_batch(uint64_t* out, size_t count) {
602 for (size_t i = 0; i < count; ++i) {
603 if (!get(&out[i])) return false;
604 }
605 return true;
606 }
607
608 // -----------------------------------------------------------------------
609 // Static convenience methods
610 // -----------------------------------------------------------------------
611
626 static std::vector<uint32_t> decode(const uint8_t* data, size_t size,
627 int bit_width, size_t num_values) {
628 if (bit_width == 0) {
629 // Canonical empty payload for a zero-width stream. Reject any
630 // non-empty payload as malformed to avoid ambiguous decoding.
631 if (size != 0) return {};
632 return std::vector<uint32_t>(num_values, 0u);
633 }
634 if (bit_width < 1 || bit_width > 64) return {};
635 RleDecoder dec(data, size, bit_width);
636 std::vector<uint32_t> result;
637 result.reserve(num_values);
638 uint64_t val;
639 for (size_t i = 0; i < num_values; ++i) {
640 if (!dec.get(&val)) break;
641 result.push_back(static_cast<uint32_t>(val));
642 }
643 return result;
644 }
645
658 static std::vector<uint32_t> decode_with_length(const uint8_t* data, size_t size,
659 int bit_width, size_t num_values) {
660 if (size < 4) return {};
661
662 uint32_t payload_len = static_cast<uint32_t>(data[0])
663 | (static_cast<uint32_t>(data[1]) << 8)
664 | (static_cast<uint32_t>(data[2]) << 16)
665 | (static_cast<uint32_t>(data[3]) << 24);
666
667 size_t available = (std::min)(static_cast<size_t>(payload_len), size - 4);
668 return decode(data + 4, available, bit_width, num_values);
669 }
670
671private:
672 const uint8_t* data_;
673 size_t size_;
674 size_t pos_;
675 int bit_width_;
676 int byte_width_;
677
678 size_t rle_remaining_;
679 uint64_t rle_value_;
680
681 size_t bp_remaining_;
682 size_t bp_index_;
683 uint64_t* bp_buffer_ = nullptr;
684 std::vector<uint64_t> bp_decoded_;
685};
686
687} // namespace signet::forge
Streaming decoder for the Parquet RLE/Bit-Packing Hybrid scheme.
Definition rle.hpp:467
bool get(uint64_t *value)
Read the next decoded value.
Definition rle.hpp:495
bool get_batch(uint64_t *out, size_t count)
Read a batch of decoded values.
Definition rle.hpp:601
static std::vector< uint32_t > decode(const uint8_t *data, size_t size, int bit_width, size_t num_values)
Decode values from an RLE-encoded buffer without a length prefix.
Definition rle.hpp:626
static std::vector< uint32_t > decode_with_length(const uint8_t *data, size_t size, int bit_width, size_t num_values)
Decode from a buffer that starts with a 4-byte LE length prefix.
Definition rle.hpp:658
RleDecoder(const uint8_t *data, size_t size, int bit_width)
Construct a decoder over a raw encoded byte buffer.
Definition rle.hpp:475
Streaming encoder for the Parquet RLE/Bit-Packing Hybrid scheme.
Definition rle.hpp:211
void flush()
Flush any pending values to the output buffer.
Definition rle.hpp:272
void reset()
Reset the encoder to its initial state, preserving the bit width.
Definition rle.hpp:319
static std::vector< uint8_t > encode_with_length(const uint32_t *values, size_t count, int bit_width)
Encode with a 4-byte little-endian length prefix.
Definition rle.hpp:371
static std::vector< uint8_t > encode(const uint32_t *values, size_t count, int bit_width)
Encode an array of uint32 values using the RLE/Bit-Pack Hybrid scheme.
Definition rle.hpp:342
RleEncoder(int bit_width)
Construct an encoder for values of the given bit width.
Definition rle.hpp:217
const std::vector< uint8_t > & data() const
Returns a reference to the encoded byte buffer (without length prefix).
Definition rle.hpp:308
size_t encoded_size() const
Returns the size of the encoded data in bytes.
Definition rle.hpp:313
void put(uint64_t value)
Add a single value to the encoding stream.
Definition rle.hpp:228
uint64_t decode_varint(const uint8_t *data, size_t &pos, size_t size)
Decode an unsigned varint (LEB128) from a byte buffer.
Definition rle.hpp:84
size_t encode_varint(std::vector< uint8_t > &buf, uint64_t value)
Encode an unsigned varint (LEB128) into a byte buffer.
Definition rle.hpp:62
void bit_unpack_8(const uint8_t *src, uint64_t *values, int bit_width)
Unpack exactly 8 values at the given bit width from a byte buffer.
Definition rle.hpp:154
void bit_pack_8(std::vector< uint8_t > &out, const uint64_t *values, int bit_width)
Pack exactly 8 values at the given bit width into a byte buffer.
Definition rle.hpp:116