63 size_t start = buf.size();
64 while (value >= 0x80) {
65 buf.push_back(
static_cast<uint8_t
>(value & 0x7F) | 0x80);
68 buf.push_back(
static_cast<uint8_t
>(value));
69 return buf.size() - start;
84inline uint64_t
decode_varint(
const uint8_t* data,
size_t& pos,
size_t size) {
87 size_t start_pos = pos;
89 uint8_t
byte = data[pos++];
90 result |=
static_cast<uint64_t
>(
byte & 0x7F) << shift;
91 if ((
byte & 0x80) == 0) {
95 if (shift >= 64) { pos = start_pos;
return 0; }
116inline void bit_pack_8(std::vector<uint8_t>& out,
const uint64_t* values,
int bit_width) {
117 if (bit_width == 0)
return;
120 size_t start = out.size();
121 out.resize(start + (8 *
static_cast<size_t>(bit_width) + 7) / 8, 0);
122 uint8_t* dst = out.data() + start;
125 for (
int i = 0; i < 8; ++i) {
126 uint64_t val = values[i];
128 int bits_remaining = bit_width;
129 int cur_bit = bit_offset;
130 while (bits_remaining > 0) {
131 int byte_idx = cur_bit / 8;
132 int bit_idx = cur_bit % 8;
133 int bits_to_write = (std::min)(bits_remaining, 8 - bit_idx);
134 uint8_t mask =
static_cast<uint8_t
>((val & ((uint64_t{1} << bits_to_write) - 1)) << bit_idx);
135 dst[byte_idx] |= mask;
136 val >>= bits_to_write;
137 cur_bit += bits_to_write;
138 bits_remaining -= bits_to_write;
140 bit_offset += bit_width;
154inline void bit_unpack_8(
const uint8_t* src, uint64_t* values,
int bit_width) {
155 if (bit_width == 0) {
156 for (
int i = 0; i < 8; ++i) values[i] = 0;
160 uint64_t mask = (bit_width == 64) ? ~uint64_t(0)
161 : (uint64_t(1) << bit_width) - 1;
164 for (
int i = 0; i < 8; ++i) {
166 int bits_remaining = bit_width;
167 int cur_bit = bit_offset;
169 while (bits_remaining > 0) {
170 int byte_idx = cur_bit / 8;
171 int bit_idx = cur_bit % 8;
172 int bits_avail = 8 - bit_idx;
173 int bits_to_read = (std::min)(bits_remaining, bits_avail);
174 uint64_t chunk = (src[byte_idx] >> bit_idx) & ((uint64_t{1} << bits_to_read) - 1);
175 val |= chunk << val_bit;
176 cur_bit += bits_to_read;
177 val_bit += bits_to_read;
178 bits_remaining -= bits_to_read;
180 values[i] = val & mask;
181 bit_offset += bit_width;
218 : bit_width_(bit_width < 0 ? 0 : (bit_width > 64 ? 0 : bit_width))
219 , byte_width_(bit_width_ > 0 ? static_cast<int>((bit_width_ + 7) / 8) : 0) {}
228 void put(uint64_t value) {
231 if (bit_width_ == 0)
return;
234 if (rle_count_ == 0) {
238 }
else if (value == rle_value_ && bp_count_ == 0) {
243 if (bp_count_ == 0 && rle_count_ >= 8) {
251 while (rle_count_ > 0) {
252 bp_buffer_[bp_count_++] = rle_value_;
254 if (bp_count_ == 8) {
259 bp_buffer_[bp_count_++] = value;
260 if (bp_count_ == 8) {
275 if (bp_count_ > 0 && rle_count_ > 0) {
277 while (rle_count_ > 0) {
278 bp_buffer_[bp_count_++] = rle_value_;
280 if (bp_count_ == 8) {
287 if (rle_count_ > 0) {
294 while (bp_count_ < 8) {
295 bp_buffer_[bp_count_++] = 0;
// Read-only accessor: returns a const reference to the buffer_ member, so no
// copy of the bytes is made. Marked [[nodiscard]] so that a call whose result
// is ignored draws a compiler warning.
308 [[nodiscard]]
const std::vector<uint8_t>&
data()
const {
return buffer_; }
342 static std::vector<uint8_t>
encode(
const uint32_t* values,
size_t count,
int bit_width) {
343 if (bit_width == 0) {
346 for (
size_t i = 0; i < count; ++i) {
347 if (values[i] != 0)
return {};
351 if (bit_width < 1 || bit_width > 64)
return {};
353 for (
size_t i = 0; i < count; ++i) {
354 enc.
put(
static_cast<uint64_t
>(values[i]));
373 auto payload =
encode(values, count, bit_width);
374 if (payload.size() > UINT32_MAX)
return {};
375 std::vector<uint8_t> result;
376 result.reserve(4 + payload.size());
378 uint32_t len =
static_cast<uint32_t
>(payload.size());
379 result.push_back(
static_cast<uint8_t
>(len));
380 result.push_back(
static_cast<uint8_t
>(len >> 8));
381 result.push_back(
static_cast<uint8_t
>(len >> 16));
382 result.push_back(
static_cast<uint8_t
>(len >> 24));
383 result.insert(result.end(), payload.begin(), payload.end());
391 std::vector<uint8_t> buffer_;
393 uint64_t rle_value_ = 0;
394 size_t rle_count_ = 0;
396 uint64_t bp_buffer_[8] = {};
399 std::vector<uint8_t> bp_groups_;
400 int bp_group_count_ = 0;
403 void write_rle_value(uint64_t value) {
404 for (
int i = 0; i < byte_width_; ++i) {
405 buffer_.push_back(
static_cast<uint8_t
>(value & 0xFF));
411 void flush_rle_run() {
418 if (rle_count_ > (SIZE_MAX >> 1)) rle_count_ = SIZE_MAX >> 1;
421 write_rle_value(rle_value_);
426 void flush_bp_group() {
427 bit_pack_8(bp_groups_, bp_buffer_, bit_width_);
430 std::memset(bp_buffer_, 0,
sizeof(bp_buffer_));
434 void flush_bp_groups() {
435 if (bp_group_count_ == 0)
return;
438 encode_varint(buffer_, (
static_cast<uint64_t
>(bp_group_count_) << 1) | 1);
439 buffer_.insert(buffer_.end(), bp_groups_.begin(), bp_groups_.end());
479 , bit_width_(bit_width < 0 ? 0 : (bit_width > 64 ? 0 : bit_width))
480 , byte_width_(bit_width_ > 0 ? static_cast<int>((bit_width_ + 7) / 8) : 0)
495 bool get(uint64_t* value) {
497 if (rle_remaining_ > 0) {
504 if (bp_remaining_ > 0) {
505 *value = bp_buffer_[bp_index_++];
511 if (pos_ >= size_)
return false;
515 if ((header & 1) == 0) {
517 uint64_t run_length = header >> 1;
518 if (run_length == 0)
return false;
519 static constexpr uint64_t MAX_RLE_RUN = 256 * 1024 * 1024;
520 if (run_length > MAX_RLE_RUN)
return false;
525 for (
int i = 0; i < byte_width_ && pos_ < size_; ++i) {
526 val |=
static_cast<uint64_t
>(data_[pos_++]) << (8 * i);
529 if (bytes_read < byte_width_)
return false;
532 rle_remaining_ = run_length - 1;
537 uint64_t group_count = header >> 1;
538 if (group_count == 0)
return false;
541 size_t avail_bytes = (pos_ < size_) ? (size_ - pos_) : 0;
542 if (bit_width_ > 0) {
543 size_t max_groups = avail_bytes /
static_cast<size_t>(bit_width_) + 1;
544 if (group_count > max_groups) group_count = max_groups;
547 if (group_count > avail_bytes + 1) group_count = avail_bytes + 1;
549 if (group_count == 0)
return false;
551 size_t total_values = group_count * 8;
552 size_t total_bytes = group_count *
static_cast<size_t>(bit_width_);
554 static constexpr size_t MAX_BP_VALUES = 32 * 1024 * 1024;
555 if (total_values > MAX_BP_VALUES)
return false;
557 static constexpr size_t MAX_BP_BYTES = 256 * 1024 * 1024;
558 if (total_bytes > MAX_BP_BYTES)
return false;
560 if (pos_ + total_bytes > size_) {
562 total_bytes = size_ - pos_;
566 bp_decoded_.resize(total_values);
567 size_t src_offset = 0;
568 size_t val_offset = 0;
569 for (uint64_t g = 0; g < group_count; ++g) {
570 if (src_offset +
static_cast<size_t>(bit_width_) > total_bytes)
break;
572 bp_decoded_.data() + val_offset,
574 src_offset +=
static_cast<size_t>(bit_width_);
581 if (val_offset == 0)
return false;
584 bp_buffer_ = bp_decoded_.data();
586 bp_remaining_ = val_offset - 1;
587 *value = bp_decoded_[0];
602 for (
size_t i = 0; i < count; ++i) {
603 if (!
get(&out[i]))
return false;
626 static std::vector<uint32_t>
decode(
const uint8_t* data,
size_t size,
627 int bit_width,
size_t num_values) {
628 if (bit_width == 0) {
631 if (size != 0)
return {};
632 return std::vector<uint32_t>(num_values, 0u);
634 if (bit_width < 1 || bit_width > 64)
return {};
636 std::vector<uint32_t> result;
637 result.reserve(num_values);
639 for (
size_t i = 0; i < num_values; ++i) {
640 if (!dec.
get(&val))
break;
641 result.push_back(
static_cast<uint32_t
>(val));
659 int bit_width,
size_t num_values) {
660 if (size < 4)
return {};
662 uint32_t payload_len =
static_cast<uint32_t
>(data[0])
663 | (
static_cast<uint32_t
>(data[1]) << 8)
664 | (
static_cast<uint32_t
>(data[2]) << 16)
665 | (
static_cast<uint32_t
>(data[3]) << 24);
667 size_t available = (std::min)(
static_cast<size_t>(payload_len), size - 4);
668 return decode(data + 4, available, bit_width, num_values);
672 const uint8_t* data_;
678 size_t rle_remaining_;
681 size_t bp_remaining_;
683 uint64_t* bp_buffer_ =
nullptr;
684 std::vector<uint64_t> bp_decoded_;
Streaming decoder for the Parquet RLE/Bit-Packing Hybrid scheme.
bool get(uint64_t *value)
Read the next decoded value.
bool get_batch(uint64_t *out, size_t count)
Read a batch of decoded values.
static std::vector< uint32_t > decode(const uint8_t *data, size_t size, int bit_width, size_t num_values)
Decode values from an RLE-encoded buffer without a length prefix.
static std::vector< uint32_t > decode_with_length(const uint8_t *data, size_t size, int bit_width, size_t num_values)
Decode from a buffer that starts with a 4-byte LE length prefix.
RleDecoder(const uint8_t *data, size_t size, int bit_width)
Construct a decoder over a raw encoded byte buffer.
Streaming encoder for the Parquet RLE/Bit-Packing Hybrid scheme.
void flush()
Flush any pending values to the output buffer.
void reset()
Reset the encoder to its initial state, preserving the bit width.
static std::vector< uint8_t > encode_with_length(const uint32_t *values, size_t count, int bit_width)
Encode with a 4-byte little-endian length prefix.
static std::vector< uint8_t > encode(const uint32_t *values, size_t count, int bit_width)
Encode an array of uint32 values using the RLE/Bit-Packing Hybrid scheme.
RleEncoder(int bit_width)
Construct an encoder for values of the given bit width.
const std::vector< uint8_t > & data() const
Returns a reference to the encoded byte buffer (without length prefix).
size_t encoded_size() const
Returns the size of the encoded data in bytes.
void put(uint64_t value)
Add a single value to the encoding stream.
uint64_t decode_varint(const uint8_t *data, size_t &pos, size_t size)
Decode an unsigned varint (LEB128) from a byte buffer.
size_t encode_varint(std::vector< uint8_t > &buf, uint64_t value)
Encode an unsigned varint (LEB128) into a byte buffer.
void bit_unpack_8(const uint8_t *src, uint64_t *values, int bit_width)
Unpack exactly 8 values at the given bit width from a byte buffer.
void bit_pack_8(std::vector< uint8_t > &out, const uint64_t *values, int bit_width)
Pack exactly 8 values at the given bit width into a byte buffer.