Signet Forge 0.1.0
C++20 Parquet library with AI-native extensions
DEMO
Loading...
Searching...
No Matches
wal.hpp
Go to the documentation of this file.
1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright 2026 Johnson Ogundeji
3// wal.hpp — Write-Ahead Log for SignetStack Signet Forge
4// Sub-millisecond append with crash recovery and segment rolling.
5// Phase 8: Streaming WAL + Async Compaction.
6
10
11#pragma once
12
13#include <algorithm>
14#include <array>
15#include <cassert>
16#include <cstdint>
17#include <cstdio>
18#include <cstring>
19#include <ctime>
20#include <filesystem>
21#include <memory>
22#include <mutex>
23#include <optional>
24#include <string>
25#include <utility>
26#include <vector>
27
28#if !defined(_WIN32)
29# include <fcntl.h>
30# include <unistd.h>
31#endif
32#if defined(_WIN32)
33# ifndef WIN32_LEAN_AND_MEAN
34# define WIN32_LEAN_AND_MEAN
35# endif
36# ifndef NOMINMAX
37# define NOMINMAX
38# endif
39# include <windows.h>
40# include <io.h> // _get_osfhandle, _fileno
41#endif
42
43#include "signet/error.hpp"
44#include "signet/types.hpp"
45
46namespace signet::forge {
47
48// ---------------------------------------------------------------------------
49// WAL record on-disk layout:
50// [magic(4)] [seq_num(8)] [timestamp_ns(8)] [data_size(4)] [data(N)] [crc32(4)]
51// Total overhead per record: 28 bytes.
52// ---------------------------------------------------------------------------
53
55static constexpr uint32_t WAL_RECORD_MAGIC = 0x57414C31u; // "WAL1"
56
58static constexpr uint16_t WAL_FILE_HDR_SIZE = 16;
59
61static constexpr char WAL_FILE_MAGIC[16] = "SIGNETWAL1\0\0\0\0\0"; // written once at pos 0
62
67static constexpr uint32_t WAL_MAX_RECORD_SIZE = 64u * 1024u * 1024u; // 64 MB hard cap per record
68
69// ---------------------------------------------------------------------------
70// detail::crc32 — standard CRC-32 (polynomial 0xEDB88320), table-driven
71// ---------------------------------------------------------------------------
72namespace detail {
73
85inline uint32_t crc32(const void* data, size_t length) noexcept {
86 // Build table on first call (lazy-init, thread-safe via constinit table below)
87 static constexpr auto make_table = []() {
88 std::array<uint32_t, 256> t{};
89 for (uint32_t i = 0; i < 256; ++i) {
90 uint32_t c = i;
91 for (int k = 0; k < 8; ++k)
92 c = (c & 1u) ? (0xEDB88320u ^ (c >> 1)) : (c >> 1);
93 t[i] = c;
94 }
95 return t;
96 };
97 static constexpr auto table = make_table();
98
99 uint32_t crc = 0xFFFFFFFFu;
100 const auto* p = static_cast<const uint8_t*>(data);
101 for (size_t i = 0; i < length; ++i)
102 crc = table[(crc ^ p[i]) & 0xFFu] ^ (crc >> 8);
103 return crc ^ 0xFFFFFFFFu;
104}
105
108inline uint32_t crc32_combine(uint32_t crc_a, const void* data_b, size_t len_b) noexcept {
109 // We want CRC over the combined region. Since we already have crc_a,
110 // we can't extend incrementally without the table's running state.
111 // This helper is intentionally unused in favour of the full-buffer variant;
112 // kept as a hook for future incremental use.
113 (void)crc_a; (void)data_b; (void)len_b;
114 return 0;
115}
116
120inline int64_t now_ns() noexcept {
121 struct timespec ts{};
122#if defined(_WIN32)
123 timespec_get(&ts, TIME_UTC);
124#else
125 ::clock_gettime(CLOCK_REALTIME, &ts);
126#endif
127 return static_cast<int64_t>(ts.tv_sec) * 1'000'000'000LL + ts.tv_nsec;
128}
129
133inline void write_le32(uint8_t* dst, uint32_t v) noexcept {
134 dst[0] = static_cast<uint8_t>(v);
135 dst[1] = static_cast<uint8_t>(v >> 8);
136 dst[2] = static_cast<uint8_t>(v >> 16);
137 dst[3] = static_cast<uint8_t>(v >> 24);
138}
142inline void write_le64(uint8_t* dst, uint64_t v) noexcept {
143 dst[0] = static_cast<uint8_t>(v);
144 dst[1] = static_cast<uint8_t>(v >> 8);
145 dst[2] = static_cast<uint8_t>(v >> 16);
146 dst[3] = static_cast<uint8_t>(v >> 24);
147 dst[4] = static_cast<uint8_t>(v >> 32);
148 dst[5] = static_cast<uint8_t>(v >> 40);
149 dst[6] = static_cast<uint8_t>(v >> 48);
150 dst[7] = static_cast<uint8_t>(v >> 56);
151}
155inline uint32_t read_le32(const uint8_t* src) noexcept {
156 return static_cast<uint32_t>(src[0])
157 | (static_cast<uint32_t>(src[1]) << 8)
158 | (static_cast<uint32_t>(src[2]) << 16)
159 | (static_cast<uint32_t>(src[3]) << 24);
160}
164inline uint64_t read_le64(const uint8_t* src) noexcept {
165 return static_cast<uint64_t>(src[0])
166 | (static_cast<uint64_t>(src[1]) << 8)
167 | (static_cast<uint64_t>(src[2]) << 16)
168 | (static_cast<uint64_t>(src[3]) << 24)
169 | (static_cast<uint64_t>(src[4]) << 32)
170 | (static_cast<uint64_t>(src[5]) << 40)
171 | (static_cast<uint64_t>(src[6]) << 48)
172 | (static_cast<uint64_t>(src[7]) << 56);
173}
174
180inline int full_fsync(int fd) noexcept {
181#if defined(__APPLE__)
182 return ::fcntl(fd, F_FULLFSYNC);
183#elif defined(_WIN32)
184 HANDLE h = reinterpret_cast<HANDLE>(
185 _get_osfhandle(static_cast<int>(fd)));
186 if (h == INVALID_HANDLE_VALUE) return -1;
187 return FlushFileBuffers(h) ? 0 : -1;
188#else
189 return ::fsync(fd);
190#endif
191}
192
193} // namespace detail
194
195// ---------------------------------------------------------------------------
196// WalEntry — one decoded record returned by WalReader
197// ---------------------------------------------------------------------------
198
200struct WalEntry {
201 int64_t seq = -1;
202 int64_t timestamp_ns = 0;
203 std::vector<uint8_t> payload;
204};
205
208
209// ---------------------------------------------------------------------------
210// WalWriterOptions — options for WalWriter::open()
211// (defined outside WalWriter to avoid Apple Clang default-member-init bug)
212// ---------------------------------------------------------------------------
213
219 bool sync_on_append = false;
220 bool sync_on_flush = true;
221 size_t buffer_size = 65536;
222 int64_t start_seq = 0;
223};
224
225// ---------------------------------------------------------------------------
226// WalWriter — append-only WAL file writer
227// ---------------------------------------------------------------------------
228
237public:
240
241 // Non-copyable, movable.
242 WalWriter(const WalWriter&) = delete;
243 WalWriter& operator=(const WalWriter&) = delete;
244 WalWriter(WalWriter&& o) noexcept
245 : file_(nullptr), closed_(true)
246 {
247 std::lock_guard<std::mutex> lk(o.mu_);
248 file_ = o.file_;
249 path_ = std::move(o.path_);
250 next_seq_ = o.next_seq_;
251 bytes_written_ = o.bytes_written_;
252 opts_ = o.opts_;
253 closed_ = o.closed_;
254 o.file_ = nullptr;
255 o.closed_ = true;
256 }
258
259 ~WalWriter() { if (!closed_) (void)close(); }
260
270 static expected<WalWriter> open(const std::string& path, Options opts = {}) {
271 // Try to open existing file for appending.
272 FILE* f = nullptr;
273#ifdef _WIN32
274 f = std::fopen(path.c_str(), "a+b");
275#else
276 // CWE-732: Open with explicit 0600 permissions to prevent world-writable files.
277 int fd = ::open(path.c_str(), O_RDWR | O_CREAT | O_APPEND, 0600);
278 if (fd >= 0) f = ::fdopen(fd, "a+b");
279#endif
280 if (!f)
282 std::string("WalWriter: cannot open '") + path + "': " + std::strerror(errno)};
283
284 // Apply stdio buffer size.
285 if (opts.buffer_size > 0)
286 std::setvbuf(f, nullptr, _IOFBF, opts.buffer_size);
287
288 // Determine current file size to decide whether to write the file header.
289 // H-10: Use platform-specific 64-bit seek to avoid long overflow
290 // on Windows LLP64 where long is 32-bit.
291#ifdef _WIN32
292 if (_fseeki64(f, 0, SEEK_END) != 0) {
293#else
294 if (fseeko(f, 0, SEEK_END) != 0) {
295#endif
296 std::fclose(f);
297 return Error{ErrorCode::IO_ERROR, "WalWriter: fseek failed"};
298 }
299 // L17: use 64-bit ftell on Windows for files > 2 GB.
300 // Note: _ftelli64 is required on Windows LLP64 data model where
301 // long is 32-bit; standard ftell would silently truncate offsets > 2 GB.
302#ifdef _WIN32
303 int64_t file_size = _ftelli64(f);
304#else
305 long file_size = std::ftell(f);
306#endif
307 if (file_size < 0) {
308 std::fclose(f);
309 return Error{ErrorCode::IO_ERROR, "WalWriter: ftell failed"};
310 }
311
312 int64_t next_seq = 0;
313 int64_t bytes_written = 0;
314
315 if (file_size == 0) {
316 // New file — start sequence at opts.start_seq (for segment continuity).
317 next_seq = opts.start_seq;
318 // Write 16-byte file header.
319 if (std::fwrite(WAL_FILE_MAGIC, 1, WAL_FILE_HDR_SIZE, f) != WAL_FILE_HDR_SIZE) {
320 std::fclose(f);
321 return Error{ErrorCode::IO_ERROR, "WalWriter: failed to write file header"};
322 }
323 std::fflush(f);
324 } else {
325 // Existing file — scan to determine the highest seq_num so we can
326 // continue the sequence without a gap.
327 std::fclose(f);
328 f = nullptr;
329
330 // Scan via a temporary reader to find next_seq.
331 FILE* rf = std::fopen(path.c_str(), "rb");
332 if (rf) {
333 // Skip file header.
334 // H-10: Use platform-specific 64-bit seek
335#ifdef _WIN32
336 _fseeki64(rf, WAL_FILE_HDR_SIZE, SEEK_SET);
337#else
338 fseeko(rf, WAL_FILE_HDR_SIZE, SEEK_SET);
339#endif
340 // Scan forward collecting seq numbers.
341 int64_t last_seq = -1;
342 while (true) {
343 uint8_t hdr[24]; // magic(4)+seq(8)+ts(8)+size(4)
344 if (std::fread(hdr, 1, 24, rf) != 24) break;
345 uint32_t magic = detail::read_le32(hdr);
346 if (magic != WAL_RECORD_MAGIC) break;
347 int64_t seq = static_cast<int64_t>(detail::read_le64(hdr + 4));
348 uint32_t data_sz = detail::read_le32(hdr + 20);
349 // CWE-400: reject corrupt/oversized records during resume scan
350 // (H18+L18: prevents unbounded fseek from crafted data_sz).
351 if (data_sz > WAL_MAX_RECORD_SIZE) break;
352 // Read data + CRC and verify integrity
353 std::vector<uint8_t> record_data(data_sz);
354 if (data_sz > 0 && std::fread(record_data.data(), 1, data_sz, rf) != data_sz) break;
355 uint8_t crc_buf[4];
356 if (std::fread(crc_buf, 1, 4, rf) != 4) break;
357 uint32_t stored_crc = detail::read_le32(crc_buf);
358 // CRC covers header(24) + data
359 std::vector<uint8_t> combined(24 + data_sz);
360 std::memcpy(combined.data(), hdr, 24);
361 if (data_sz > 0)
362 std::memcpy(combined.data() + 24, record_data.data(), data_sz);
363 uint32_t computed_crc = detail::crc32(combined.data(), combined.size());
364 if (computed_crc != stored_crc) break; // reject records with bad CRC
365 last_seq = seq;
366 }
367 std::fclose(rf);
368 if (last_seq >= 0) next_seq = last_seq + 1;
369 }
370
371 // Re-open for appending.
372#ifdef _WIN32
373 f = std::fopen(path.c_str(), "ab");
374#else
375 // CWE-732: Reopen with explicit 0600 permissions.
376 int fd2 = ::open(path.c_str(), O_WRONLY | O_CREAT | O_APPEND, 0600);
377 if (fd2 >= 0) f = ::fdopen(fd2, "ab");
378#endif
379 if (!f)
380 return Error{ErrorCode::IO_ERROR,
381 std::string("WalWriter: cannot reopen '") + path + "': " + std::strerror(errno)};
382 if (opts.buffer_size > 0)
383 std::setvbuf(f, nullptr, _IOFBF, opts.buffer_size);
384 }
385
386 return WalWriter(f, path, next_seq, bytes_written, opts);
387 }
388
397 [[nodiscard]] expected<int64_t> append(const uint8_t* data, size_t size) {
398 std::lock_guard<std::mutex> lk(mu_);
399 if (closed_)
400 return Error{ErrorCode::IO_ERROR, "WalWriter: already closed"};
401 if (size == 0) {
402 ++rejected_empty_count_;
403 return Error{ErrorCode::IO_ERROR, "WalWriter: empty record rejected"};
404 }
405 if (size > WAL_MAX_RECORD_SIZE)
406 return Error{ErrorCode::INVALID_ARGUMENT, "WAL record exceeds maximum size"};
407 if (size > static_cast<size_t>(UINT32_MAX))
408 return Error{ErrorCode::IO_ERROR, "WalWriter: record too large"};
409
410 const int64_t seq = next_seq_;
411 const int64_t ts = detail::now_ns();
412 const uint32_t dsz = static_cast<uint32_t>(size);
413
414 // Build header for CRC: magic(4) + seq(8) + ts(8) + size(4) = 24 bytes
415 uint8_t hdr[24];
416 detail::write_le32(hdr, WAL_RECORD_MAGIC);
417 detail::write_le64(hdr + 4, static_cast<uint64_t>(seq));
418 detail::write_le64(hdr + 12, static_cast<uint64_t>(ts));
419 detail::write_le32(hdr + 20, dsz);
420
421 // Compute CRC over header + data.
422 // We use a two-pass approach: hash header, then hash data.
423 // Since our crc32() requires a contiguous buffer, build one for header
424 // and extend manually using the table loop approach inline.
425 static constexpr auto make_table = []() {
426 std::array<uint32_t, 256> t{};
427 for (uint32_t i = 0; i < 256; ++i) {
428 uint32_t c = i;
429 for (int k = 0; k < 8; ++k)
430 c = (c & 1u) ? (0xEDB88320u ^ (c >> 1)) : (c >> 1);
431 t[i] = c;
432 }
433 return t;
434 };
435 static constexpr auto tbl = make_table();
436
437 uint32_t crc = 0xFFFFFFFFu;
438 for (size_t i = 0; i < 24; ++i)
439 crc = tbl[(crc ^ hdr[i]) & 0xFFu] ^ (crc >> 8);
440 for (size_t i = 0; i < size; ++i)
441 crc = tbl[(crc ^ data[i]) & 0xFFu] ^ (crc >> 8);
442 crc ^= 0xFFFFFFFFu;
443
444 uint8_t crc_buf[4];
445 detail::write_le32(crc_buf, crc);
446
447 // Write: header, data, crc.
448 if (std::fwrite(hdr, 1, 24, file_) != 24) return io_err("fwrite hdr");
449 if (size > 0)
450 if (std::fwrite(data, 1, size, file_) != size) return io_err("fwrite data");
451 if (std::fwrite(crc_buf, 1, 4, file_) != 4) return io_err("fwrite crc");
452
453 bytes_written_ += static_cast<int64_t>(28 + size);
454 ++next_seq_;
455
456 if (opts_.sync_on_append) {
457 if (std::fflush(file_) != 0)
458 return Error{ErrorCode::IO_ERROR, "WalWriter: fflush failed on sync_on_append"};
459 if (detail::full_fsync(::fileno(file_)) != 0)
460 return Error{ErrorCode::IO_ERROR, "WalWriter: fsync failed on sync_on_append"};
461 }
462
463 return seq;
464 }
465
469 [[nodiscard]] expected<int64_t> append(const std::vector<uint8_t>& data) {
470 return append(data.data(), data.size());
471 }
472
477 [[nodiscard]] expected<int64_t> append(const char* data, size_t size) {
478 return append(reinterpret_cast<const uint8_t*>(data), size);
479 }
480
484 [[nodiscard]] expected<int64_t> append(std::string_view sv) {
485 return append(reinterpret_cast<const uint8_t*>(sv.data()), sv.size());
486 }
487
495 [[nodiscard]] expected<void> flush(bool do_fsync = false) {
496 std::lock_guard<std::mutex> lk(mu_);
497 if (closed_)
498 return Error{ErrorCode::IO_ERROR, "WalWriter: already closed"};
499 if (std::fflush(file_) != 0)
500 return Error{ErrorCode::IO_ERROR, "WalWriter: fflush failed"};
501 if (do_fsync || opts_.sync_on_flush) {
502 if (detail::full_fsync(::fileno(file_)) != 0)
503 return Error{ErrorCode::IO_ERROR, "WalWriter: fsync failed"};
504 }
505 return {};
506 }
507
510 [[nodiscard]] expected<void> close() {
511 std::lock_guard<std::mutex> lk(mu_);
512 if (closed_) return {};
513 closed_ = true;
514 if (std::fflush(file_) != 0) {
515 std::fclose(file_); file_ = nullptr;
516 return Error{ErrorCode::IO_ERROR, "WalWriter: fflush failed on close"};
517 }
518 if (detail::full_fsync(::fileno(file_)) != 0) {
519 std::fclose(file_); file_ = nullptr;
520 return Error{ErrorCode::IO_ERROR, "WalWriter: fsync failed on close"};
521 }
522 std::fclose(file_);
523 file_ = nullptr;
524 return {};
525 }
526
528 [[nodiscard]] bool is_open() const noexcept { return !closed_; }
530 [[nodiscard]] int64_t next_seq() const noexcept { return next_seq_; }
532 [[nodiscard]] const std::string& path() const noexcept { return path_; }
534 [[nodiscard]] int64_t bytes_written() const noexcept { return bytes_written_; }
536 [[nodiscard]] uint64_t rejected_empty_count() const noexcept { return rejected_empty_count_; }
537
538private:
539 WalWriter(FILE* f, std::string path, int64_t next_seq,
540 int64_t bytes_written, Options opts)
541 : file_(f), path_(std::move(path)),
542 next_seq_(next_seq), bytes_written_(bytes_written),
543 opts_(opts), closed_(false) {}
544
545 static expected<int64_t> io_err(const char* ctx) {
546 return Error{ErrorCode::IO_ERROR,
547 std::string("WalWriter: ") + ctx + ": " + std::strerror(errno)};
548 }
549
550 FILE* file_ = nullptr;
551 std::string path_;
552 int64_t next_seq_ = 0;
553 int64_t bytes_written_ = 0;
554 uint64_t rejected_empty_count_ = 0;
555 Options opts_;
556 bool closed_ = false;
557 std::mutex mu_;
558};
559
560// ---------------------------------------------------------------------------
561// WalReader — sequential scan for crash recovery
562// ---------------------------------------------------------------------------
563
573public:
574 enum class ValidationMode : uint8_t {
575 BestEffort = 0,
576 Strict = 1
577 };
578
579 WalReader(const WalReader&) = delete;
580 WalReader& operator=(const WalReader&) = delete;
581 WalReader(WalReader&& o) noexcept
582 : file_(o.file_), path_(std::move(o.path_)),
583 last_seq_(o.last_seq_), count_(o.count_), offset_(o.offset_)
584 { o.file_ = nullptr; }
586
587 ~WalReader() { close(); }
588
598 const std::string& path,
599 ValidationMode mode = ValidationMode::BestEffort) {
600 FILE* f = std::fopen(path.c_str(), "rb");
601 if (!f)
602 return Error{ErrorCode::IO_ERROR,
603 std::string("WalReader: cannot open '") + path + "': " + std::strerror(errno)};
604
605 // Read and validate 16-byte file header.
606 char hdr[WAL_FILE_HDR_SIZE];
607 if (std::fread(hdr, 1, WAL_FILE_HDR_SIZE, f) != WAL_FILE_HDR_SIZE) {
608 std::fclose(f);
609 return Error{ErrorCode::INVALID_FILE, "WalReader: file too short for header"};
610 }
611 if (std::memcmp(hdr, WAL_FILE_MAGIC, WAL_FILE_HDR_SIZE) != 0) {
612 std::fclose(f);
613 return Error{ErrorCode::INVALID_FILE, "WalReader: bad file magic"};
614 }
615
616 return WalReader(f, path, WAL_FILE_HDR_SIZE, mode);
617 }
618
627 if (!file_) return std::optional<WalEntry>{std::nullopt};
628
629 // Read 24-byte record header.
630 uint8_t hdr[24];
631 size_t n = std::fread(hdr, 1, 24, file_);
632 if (n == 0) {
633 // True EOF or error.
634 if (std::ferror(file_))
635 return Error{ErrorCode::IO_ERROR, "WalReader: fread header failed"};
636 return std::optional<WalEntry>{std::nullopt}; // clean EOF
637 }
638 if (n < 24)
639 return soft_stop_or_error("truncated record header");
640
641 uint32_t magic = detail::read_le32(hdr);
642 if (magic != WAL_RECORD_MAGIC)
643 return soft_stop_or_error("bad record magic");
644
645 int64_t seq = static_cast<int64_t>(detail::read_le64(hdr + 4));
646 int64_t ts = static_cast<int64_t>(detail::read_le64(hdr + 12));
647 uint32_t data_sz = detail::read_le32(hdr + 20);
648
649 // CWE-400: Uncontrolled Resource Consumption — reject oversized records
650 // to prevent unbounded memory allocation from crafted data_sz values.
651 if (data_sz > WAL_MAX_RECORD_SIZE) {
652 return soft_stop_or_error("record size exceeds WAL_MAX_RECORD_SIZE");
653 }
654
655 // Read payload bytes into raw_buf (renamed to avoid collision with WalEntry::payload field).
656 std::vector<uint8_t> raw_buf(data_sz);
657 if (data_sz > 0) {
658 size_t nr = std::fread(raw_buf.data(), 1, data_sz, file_);
659 if (nr < data_sz) {
660 if (std::ferror(file_))
661 return Error{ErrorCode::IO_ERROR, "WalReader: fread payload failed"};
662 return soft_stop_or_error("truncated payload");
663 }
664 }
665
666 // Read stored CRC.
667 uint8_t crc_buf[4];
668 if (std::fread(crc_buf, 1, 4, file_) != 4)
669 return soft_stop_or_error("truncated record CRC");
670
671 uint32_t stored_crc = detail::read_le32(crc_buf);
672
673 // Compute expected CRC over header + data.
674 static constexpr auto make_table = []() {
675 std::array<uint32_t, 256> t{};
676 for (uint32_t i = 0; i < 256; ++i) {
677 uint32_t c = i;
678 for (int k = 0; k < 8; ++k)
679 c = (c & 1u) ? (0xEDB88320u ^ (c >> 1)) : (c >> 1);
680 t[i] = c;
681 }
682 return t;
683 };
684 static constexpr auto tbl = make_table();
685
686 uint32_t crc = 0xFFFFFFFFu;
687 for (size_t i = 0; i < 24; ++i)
688 crc = tbl[(crc ^ hdr[i]) & 0xFFu] ^ (crc >> 8);
689 for (size_t i = 0; i < data_sz; ++i)
690 crc = tbl[(crc ^ raw_buf[i]) & 0xFFu] ^ (crc >> 8);
691 crc ^= 0xFFFFFFFFu;
692
693 if (crc != stored_crc)
694 return soft_stop_or_error("record CRC mismatch");
695
696 offset_ += 24 + data_sz + 4;
697 last_seq_ = seq;
698 ++count_;
699
700 WalEntry entry;
701 entry.seq = seq;
702 entry.timestamp_ns = ts;
703 entry.payload = std::move(raw_buf);
704 return std::optional<WalEntry>{std::move(entry)};
705 }
706
714 std::vector<WalEntry> results;
715 while (true) {
716 auto res = next();
717 if (!res) return res.error();
718 if (!res->has_value()) break;
719 results.push_back(std::move(res->value()));
720 }
721 return results;
722 }
723
725 [[nodiscard]] int64_t last_seq() const noexcept { return last_seq_; }
727 [[nodiscard]] int64_t count() const noexcept { return count_; }
729 [[nodiscard]] size_t offset() const noexcept { return offset_; }
730
732 void close() {
733 if (file_) { std::fclose(file_); file_ = nullptr; }
734 }
735
736private:
737 [[nodiscard]] expected<std::optional<WalEntry>> soft_stop_or_error(
738 const char* reason) const {
739 if (validation_mode_ == ValidationMode::Strict) {
740 return Error{ErrorCode::CORRUPT_DATA,
741 std::string("WalReader: ") + reason +
742 " in '" + path_ + "' at offset " + std::to_string(offset_)};
743 }
744 return std::optional<WalEntry>{std::nullopt};
745 }
746
747 WalReader(FILE* f, std::string path, size_t initial_offset, ValidationMode mode)
748 : file_(f), path_(std::move(path)), offset_(initial_offset), validation_mode_(mode) {}
749
750 FILE* file_ = nullptr;
751 std::string path_;
752 int64_t last_seq_ = -1;
753 ValidationMode validation_mode_ = ValidationMode::BestEffort;
754 int64_t count_ = 0;
755 size_t offset_ = WAL_FILE_HDR_SIZE;
756};
757
758// ---------------------------------------------------------------------------
759// WalManagerOptions — options for WalManager::open()
760// (defined outside WalManager to avoid Apple Clang default-member-init bug)
761// ---------------------------------------------------------------------------
762
766enum class WalLifecycleMode : uint8_t {
767 Development = 0,
768 Benchmark = 1,
769 Production = 2
770};
771
778 size_t max_segment_bytes = 64 * 1024 * 1024;
779 size_t max_records = 1'000'000;
780 bool sync_on_append = false;
781 bool sync_on_roll = true;
782 bool strict_replay = true;
783 std::string file_prefix = "wal";
784 std::string file_ext = ".wal";
785
786 // Safety guardrails
787 WalLifecycleMode lifecycle_mode = WalLifecycleMode::Development;
788 bool reset_on_open = false;
789 bool require_checkpoint_before_prune = false;
791};
792
793// ---------------------------------------------------------------------------
794// WalManager — rolling WAL segments
795// ---------------------------------------------------------------------------
796
807public:
810
811 WalManager(const WalManager&) = delete;
812 WalManager& operator=(const WalManager&) = delete;
813 WalManager(WalManager&& o) noexcept
814 : dir_(std::move(o.dir_)), opts_(o.opts_),
815 segments_(std::move(o.segments_)),
816 writer_(std::move(o.writer_)),
817 global_seq_(o.global_seq_),
818 segment_record_count_(o.segment_record_count_),
819 total_records_(o.total_records_),
820 prune_checkpoint_ready_(o.prune_checkpoint_ready_),
821 closed_(o.closed_)
822 { o.closed_ = true; }
824
825 ~WalManager() { (void)close(); }
826
836 static expected<WalManager> open(const std::string& dir, Options opts = {}) {
837 // Ensure directory exists first.
838 std::error_code mk_ec;
839 std::filesystem::create_directories(dir, mk_ec);
840 if (mk_ec) {
841 std::error_code isdir_ec;
842 if (!std::filesystem::is_directory(dir, isdir_ec)) {
843 return Error{ErrorCode::IO_ERROR,
844 std::string("WalManager: cannot create dir '") + dir + "': " + mk_ec.message()};
845 }
846 }
847
848 if (opts.reset_on_open && opts.lifecycle_mode == WalLifecycleMode::Production) {
849 return Error{ErrorCode::IO_ERROR,
850 "WalManager: reset_on_open denied in production mode"};
851 }
852
853 if (opts.reset_on_open) {
854 auto reset_paths = scan_segments(dir, opts);
855 for (const auto& seg : reset_paths) {
856 std::error_code rm_ec;
857 std::filesystem::remove(seg, rm_ec);
858 if (rm_ec && rm_ec != std::make_error_code(std::errc::no_such_file_or_directory)) {
859 return Error{ErrorCode::IO_ERROR,
860 std::string("WalManager: reset remove '") + seg + "': " + rm_ec.message()};
861 }
862 }
863 }
864
865 // Discover existing WAL segment files in the directory.
866 std::vector<std::string> existing = scan_segments(dir, opts);
867
868 // Determine global_seq from the last segment if any exist.
869 int64_t global_seq = 0;
870 if (!existing.empty()) {
871 auto rd = WalReader::open(existing.back(),
872 opts.strict_replay ? WalReader::ValidationMode::Strict
873 : WalReader::ValidationMode::BestEffort);
874 if (!rd) {
875 if (opts.strict_replay) return rd.error();
876 } else {
877 auto entries = rd->read_all();
878 if (!entries) {
879 if (opts.strict_replay) return entries.error();
880 } else if (rd->last_seq() >= 0) {
881 global_seq = rd->last_seq() + 1;
882 }
883 }
884 }
885
886 // Count total records across all existing segments.
887 int64_t total = 0;
888 for (const auto& seg : existing) {
889 auto rd = WalReader::open(seg,
890 opts.strict_replay ? WalReader::ValidationMode::Strict
891 : WalReader::ValidationMode::BestEffort);
892 if (!rd) {
893 if (opts.strict_replay) return rd.error();
894 } else {
895 auto res = rd->read_all();
896 if (!res) {
897 if (opts.strict_replay) return res.error();
898 } else {
899 total += static_cast<int64_t>(res->size());
900 }
901 }
902 }
903
904 // Open a new active segment starting at the current global sequence number.
905 WalWriter::Options wopts;
906 wopts.sync_on_append = opts.sync_on_append;
907 wopts.sync_on_flush = opts.sync_on_roll;
908 wopts.start_seq = global_seq;
909
910 std::string seg_path = make_segment_path(dir, opts, global_seq);
911 auto writer = WalWriter::open(seg_path, wopts);
912 if (!writer) return writer.error();
913
914 WalManager mgr;
915 mgr.dir_ = dir;
916 mgr.opts_ = opts;
917 mgr.segments_ = std::move(existing);
918 mgr.segments_.push_back(seg_path);
919 mgr.writer_ = std::make_unique<WalWriter>(std::move(writer.value()));
920 mgr.global_seq_ = global_seq;
921 mgr.segment_record_count_ = 0;
922 mgr.total_records_ = total;
923 mgr.prune_checkpoint_ready_ = !opts.require_checkpoint_before_prune;
924 mgr.closed_ = false;
925 return mgr;
926 }
927
933 [[nodiscard]] expected<int64_t> append(const uint8_t* data, size_t size) {
934 std::lock_guard<std::mutex> lk(mu_);
935 if (closed_)
936 return Error{ErrorCode::IO_ERROR, "WalManager: already closed"};
937
938 // Check roll conditions.
939 bool need_roll = false;
940 if (opts_.max_records > 0 && static_cast<size_t>(segment_record_count_) >= opts_.max_records)
941 need_roll = true;
942 if (!need_roll && opts_.max_segment_bytes > 0) {
943 int64_t bw = writer_->bytes_written();
944 if (static_cast<size_t>(bw) + 28 + size > opts_.max_segment_bytes)
945 need_roll = true;
946 }
947
948 if (need_roll) {
949 auto rc = roll_locked();
950 if (!rc) return rc.error();
951 }
952
953 auto res = writer_->append(data, size);
954 if (!res) return res;
955 ++segment_record_count_;
956 ++total_records_;
957 ++global_seq_;
958 return res;
959 }
960
965 [[nodiscard]] expected<int64_t> append(const char* data, size_t size) {
966 return append(reinterpret_cast<const uint8_t*>(data), size);
967 }
968
972 [[nodiscard]] expected<int64_t> append(std::string_view sv) {
973 return append(reinterpret_cast<const uint8_t*>(sv.data()), sv.size());
974 }
975
978 [[nodiscard]] std::vector<std::string> segment_paths() const {
979 std::lock_guard<std::mutex> lk(mu_);
980 return segments_;
981 }
982
992 [[nodiscard]] expected<void> commit_compaction_checkpoint(const std::string& note = "") {
993 std::lock_guard<std::mutex> lk(mu_);
994 if (!opts_.require_checkpoint_before_prune) {
995 prune_checkpoint_ready_ = true;
996 return {};
997 }
998
999 if (opts_.checkpoint_manifest_path.empty()) {
1000 // Caller opts out of marker-file persistence and takes responsibility
1001 // for external durable checkpointing.
1002 prune_checkpoint_ready_ = true;
1003 return {};
1004 }
1005
1006 auto wr = write_manifest_atomic(opts_.checkpoint_manifest_path, note);
1007 if (!wr) return wr;
1008 prune_checkpoint_ready_ = true;
1009 return {};
1010 }
1011
1015 std::lock_guard<std::mutex> lk(mu_);
1016 prune_checkpoint_ready_ = !opts_.require_checkpoint_before_prune;
1017 }
1018
1027 [[nodiscard]] expected<void> remove_segment(const std::string& path) {
1028 std::lock_guard<std::mutex> lk(mu_);
1029 if (opts_.require_checkpoint_before_prune && !prune_checkpoint_ready_)
1030 return Error{ErrorCode::IO_ERROR,
1031 "WalManager: prune blocked until compaction checkpoint is committed"};
1032 // Do not remove the currently active segment.
1033 if (!segments_.empty() && segments_.back() == path)
1034 return Error{ErrorCode::IO_ERROR,
1035 "WalManager: cannot remove active segment"};
1036 {
1037 std::error_code ec;
1038 std::filesystem::remove(path, ec);
1039 if (ec && ec != std::make_error_code(std::errc::no_such_file_or_directory))
1040 return Error{ErrorCode::IO_ERROR,
1041 std::string("WalManager: remove '") + path + "': " + ec.message()};
1042 }
1043 auto it = std::find(segments_.begin(), segments_.end(), path);
1044 if (it != segments_.end()) segments_.erase(it);
1045 return {};
1046 }
1047
1049 [[nodiscard]] int64_t total_records() const noexcept {
1050 std::lock_guard<std::mutex> lk(mu_);
1051 return total_records_;
1052 }
1053
1056 [[nodiscard]] expected<void> close() {
1057 std::lock_guard<std::mutex> lk(mu_);
1058 if (closed_) return {};
1059 closed_ = true;
1060 if (writer_) {
1061 auto close_result = writer_->close();
1062 if (!close_result) return close_result.error();
1063 }
1064 return {};
1065 }
1066
1074 std::lock_guard<std::mutex> lk(mu_);
1075 // Flush active segment so the reader can see all written records.
1076 if (writer_ && !closed_) {
1077 auto flush_result = writer_->flush(false);
1078 if (!flush_result) return flush_result.error();
1079 }
1080 std::vector<WalEntry> result;
1081 for (const auto& seg : segments_) {
1082 auto rd = WalReader::open(seg,
1083 opts_.strict_replay ? WalReader::ValidationMode::Strict
1084 : WalReader::ValidationMode::BestEffort);
1085 if (!rd) {
1086 if (opts_.strict_replay) return rd.error();
1087 continue;
1088 }
1089 auto entries = rd->read_all();
1090 if (!entries) {
1091 if (opts_.strict_replay) return entries.error();
1092 continue;
1093 }
1094 for (auto& e : *entries)
1095 result.push_back(std::move(e));
1096 }
1097 return result;
1098 }
1099
1100private:
1101 WalManager() = default;
1102
1103 static expected<void> sync_parent_directory(const std::string& file_path) {
1104#if !defined(_WIN32)
1105 std::filesystem::path p(file_path);
1106 std::filesystem::path parent = p.parent_path();
1107 if (parent.empty()) return {};
1108
1109 int dfd = ::open(parent.string().c_str(), O_RDONLY);
1110 if (dfd < 0) {
1111 return Error{ErrorCode::IO_ERROR,
1112 std::string("WalManager: open parent dir failed: ") + std::strerror(errno)};
1113 }
1114 if (::fsync(dfd) != 0) {
1115 int e = errno;
1116 ::close(dfd);
1117 return Error{ErrorCode::IO_ERROR,
1118 std::string("WalManager: fsync parent dir failed: ") + std::strerror(e)};
1119 }
1120 ::close(dfd);
1121#endif
1122 return {};
1123 }
1124
1125 static expected<void> write_manifest_atomic(const std::string& path,
1126 const std::string& note) {
1127 std::filesystem::path p(path);
1128 if (p.empty()) {
1129 return Error{ErrorCode::IO_ERROR,
1130 "WalManager: checkpoint_manifest_path is empty"};
1131 }
1132
1133 std::error_code mk_ec;
1134 auto parent = p.parent_path();
1135 if (!parent.empty())
1136 std::filesystem::create_directories(parent, mk_ec);
1137 if (mk_ec) {
1138 return Error{ErrorCode::IO_ERROR,
1139 std::string("WalManager: cannot create manifest dir: ") + mk_ec.message()};
1140 }
1141
1142 const std::string tmp = path + ".tmp." + std::to_string(detail::now_ns());
1143 FILE* f = nullptr;
1144#ifdef _WIN32
1145 f = std::fopen(tmp.c_str(), "wb");
1146#else
1147 // CWE-732: Create manifest with explicit 0600 permissions.
1148 int mfd = ::open(tmp.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0600);
1149 if (mfd >= 0) f = ::fdopen(mfd, "wb");
1150#endif
1151 if (!f) {
1152 return Error{ErrorCode::IO_ERROR,
1153 std::string("WalManager: cannot open temp manifest '") + tmp + "': " + std::strerror(errno)};
1154 }
1155
1156 std::string body = "checkpoint_ns=" + std::to_string(detail::now_ns()) + "\n";
1157 if (!note.empty()) {
1158 body += "note=" + note + "\n";
1159 }
1160
1161 if (std::fwrite(body.data(), 1, body.size(), f) != body.size()) {
1162 std::fclose(f);
1163 std::remove(tmp.c_str());
1164 return Error{ErrorCode::IO_ERROR,
1165 std::string("WalManager: write manifest failed: ") + std::strerror(errno)};
1166 }
1167 if (std::fflush(f) != 0) {
1168 std::fclose(f);
1169 std::remove(tmp.c_str());
1170 return Error{ErrorCode::IO_ERROR,
1171 std::string("WalManager: fflush manifest failed: ") + std::strerror(errno)};
1172 }
1173 if (detail::full_fsync(::fileno(f)) != 0) {
1174 std::fclose(f);
1175 std::remove(tmp.c_str());
1176 return Error{ErrorCode::IO_ERROR,
1177 std::string("WalManager: fsync manifest failed: ") + std::strerror(errno)};
1178 }
1179 std::fclose(f);
1180
1181 std::error_code mv_ec;
1182 std::filesystem::rename(tmp, path, mv_ec);
1183 if (mv_ec) {
1184 std::remove(tmp.c_str());
1185 return Error{ErrorCode::IO_ERROR,
1186 std::string("WalManager: atomic rename manifest failed: ") + mv_ec.message()};
1187 }
1188
1189 auto sync_res = sync_parent_directory(path);
1190 if (!sync_res) return sync_res;
1191 return {};
1192 }
1193
1194 // Must be called with mu_ held.
1195 expected<void> roll_locked() {
1196 // Flush + fsync the current segment.
1197 if (opts_.sync_on_roll) {
1198 (void)writer_->flush(true);
1199 }
1200 (void)writer_->close();
1201
1202 // Open a new segment continuing the global sequence.
1203 WalWriter::Options wopts;
1204 wopts.sync_on_append = opts_.sync_on_append;
1205 wopts.sync_on_flush = opts_.sync_on_roll;
1206 wopts.start_seq = global_seq_;
1207
1208 std::string seg_path = make_segment_path(dir_, opts_, global_seq_);
1209 auto writer = WalWriter::open(seg_path, wopts);
1210 if (!writer) return writer.error();
1211
1212 writer_ = std::make_unique<WalWriter>(std::move(writer.value()));
1213 segments_.push_back(seg_path);
1214 segment_record_count_ = 0;
1215 return {};
1216 }
1217
1218 // Generate segment filename: {prefix}_{seq_start}_{timestamp_ms}.{ext}
1219 static std::string make_segment_path(const std::string& dir,
1220 const Options& opts,
1221 int64_t seq_start) {
1222 // timestamp in milliseconds for the filename (cross-platform).
1223 struct timespec ts{};
1224#if defined(_WIN32)
1225 timespec_get(&ts, TIME_UTC);
1226#else
1227 ::clock_gettime(CLOCK_REALTIME, &ts);
1228#endif
1229 int64_t ts_ms = static_cast<int64_t>(ts.tv_sec) * 1000LL
1230 + ts.tv_nsec / 1'000'000LL;
1231
1232 // Build: dir + "/" + prefix + "_" + seq + "_" + ts_ms + ext
1233 std::string name = dir + "/" + opts.file_prefix
1234 + "_" + std::to_string(seq_start)
1235 + "_" + std::to_string(ts_ms)
1236 + opts.file_ext;
1237 return name;
1238 }
1239
1240 // Scan directory for existing WAL segments matching prefix/ext.
1241 // Returns sorted list by embedded seq_start (numeric parse of filename).
1242 // Uses std::filesystem::directory_iterator (cross-platform; no opendir/readdir).
1243 static std::vector<std::string> scan_segments(const std::string& dir,
1244 const Options& opts) {
1245 std::vector<std::pair<int64_t, std::string>> found;
1246 std::error_code ec;
1247 for (const auto& entry : std::filesystem::directory_iterator(dir, ec)) {
1248 if (ec) break;
1249 if (!entry.is_regular_file()) continue;
1250 const std::string fname = entry.path().filename().string();
1251 // Must start with prefix + "_" and end with ext.
1252 const std::string pfx = opts.file_prefix + "_";
1253 if (fname.size() <= pfx.size() + opts.file_ext.size()) continue;
1254 if (fname.substr(0, pfx.size()) != pfx) continue;
1255 if (fname.substr(fname.size() - opts.file_ext.size()) != opts.file_ext) continue;
1256 // Extract seq_start from either:
1257 // prefix_<seq>_<ts>.wal (WalManager format)
1258 // prefix_<seq>.wal (WalMmapWriter format)
1259 size_t p1 = pfx.size();
1260 size_t ext_pos = fname.size() - opts.file_ext.size();
1261 if (ext_pos <= p1) continue;
1262 size_t p2 = fname.find('_', p1);
1263
1264 std::string seq_str;
1265 if (p2 == std::string::npos || p2 > ext_pos) {
1266 seq_str = fname.substr(p1, ext_pos - p1);
1267 } else {
1268 seq_str = fname.substr(p1, p2 - p1);
1269 }
1270
1271 int64_t seq_start = 0;
1272 try { seq_start = std::stoll(seq_str); } catch (...) { continue; }
1273 found.emplace_back(seq_start, entry.path().string());
1274 }
1275 std::sort(found.begin(), found.end(),
1276 [](const auto& a, const auto& b) { return a.first < b.first; });
1277 std::vector<std::string> paths;
1278 paths.reserve(found.size());
1279 for (auto& [seq, p] : found) paths.push_back(std::move(p));
1280 return paths;
1281 }
1282
1283 std::string dir_;
1284 Options opts_;
1285 std::vector<std::string> segments_;
1286 std::unique_ptr<WalWriter> writer_;
1287 int64_t global_seq_ = 0;
1288 int64_t segment_record_count_ = 0;
1289 int64_t total_records_ = 0;
1290 bool prune_checkpoint_ready_ = true;
1291 bool closed_ = false;
1292 mutable std::mutex mu_;
1293};
1294
1295} // namespace signet::forge
1296
1297// ---------------------------------------------------------------------------
1298// Memory-mapped WAL segment and ring writer.
1299// Included here so detail_mmap:: helpers have access to the signet::forge
1300// namespace already opened above; WalMmapWriter produces files readable by
1301// WalReader without any changes to the reader.
1302// Excluded on Emscripten — WalMmapWriter requires std::thread + POSIX mmap.
1303// ---------------------------------------------------------------------------
1304#ifndef __EMSCRIPTEN__
1306#endif
1307
Manages multiple rolling WAL segment files in a directory.
Definition wal.hpp:806
WalManager & operator=(const WalManager &)=delete
WalManager(const WalManager &)=delete
static expected< WalManager > open(const std::string &dir, Options opts={})
Open a WAL directory, discovering existing segments and creating a new active segment ready for writi...
Definition wal.hpp:836
expected< void > commit_compaction_checkpoint(const std::string &note="")
Record that an external compaction checkpoint has been made durable, allowing old fully-compacted WAL...
Definition wal.hpp:992
WalManager(WalManager &&o) noexcept
Definition wal.hpp:813
expected< std::vector< WalEntry > > read_all()
Read all WAL entries across all segments (sealed and active).
Definition wal.hpp:1073
WalManager & operator=(WalManager &&)=delete
expected< void > remove_segment(const std::string &path)
Remove a fully-compacted segment from disk and from the tracking list.
Definition wal.hpp:1027
int64_t total_records() const noexcept
Total number of records written across all segments (including rolled ones).
Definition wal.hpp:1049
expected< int64_t > append(const uint8_t *data, size_t size)
Append a record to the current segment, rolling to a new segment if needed.
Definition wal.hpp:933
std::vector< std::string > segment_paths() const
List all WAL segment paths in sequence order (oldest first).
Definition wal.hpp:978
void clear_compaction_checkpoint()
Reset the checkpoint-ready flag so subsequent prune calls are blocked again until the next commit_com...
Definition wal.hpp:1014
expected< int64_t > append(const char *data, size_t size)
Append a record from a char pointer.
Definition wal.hpp:965
expected< void > close()
Close the manager and the active segment writer.
Definition wal.hpp:1056
expected< int64_t > append(std::string_view sv)
Append a record from a string_view.
Definition wal.hpp:972
Sequential WAL file reader for crash recovery and replay.
Definition wal.hpp:572
WalReader & operator=(WalReader &&)=delete
static expected< WalReader > open(const std::string &path, ValidationMode mode=ValidationMode::BestEffort)
Open a WAL file for sequential reading.
Definition wal.hpp:597
expected< std::optional< WalEntry > > next()
Read the next WAL entry from the file.
Definition wal.hpp:626
size_t offset() const noexcept
Current byte offset in the file (past the last read record).
Definition wal.hpp:729
WalReader(WalReader &&o) noexcept
Definition wal.hpp:581
WalReader(const WalReader &)=delete
void close()
Close the underlying file handle.
Definition wal.hpp:732
WalReader & operator=(const WalReader &)=delete
int64_t count() const noexcept
Number of records successfully read so far.
Definition wal.hpp:727
int64_t last_seq() const noexcept
Sequence number of the last successfully read record (-1 if none).
Definition wal.hpp:725
expected< std::vector< WalEntry > > read_all()
Read all valid entries from the current position to end-of-valid-data.
Definition wal.hpp:713
Append-only Write-Ahead Log writer with CRC-32 integrity per record.
Definition wal.hpp:236
WalWriter(WalWriter &&o) noexcept
Definition wal.hpp:244
expected< int64_t > append(const std::vector< uint8_t > &data)
Append a record from a byte vector.
Definition wal.hpp:469
static expected< WalWriter > open(const std::string &path, Options opts={})
Open or create a WAL file for appending.
Definition wal.hpp:270
WalWriter & operator=(WalWriter &&)=delete
const std::string & path() const noexcept
Filesystem path of the WAL file.
Definition wal.hpp:532
int64_t bytes_written() const noexcept
Total bytes written to this WAL file (header + all records).
Definition wal.hpp:534
uint64_t rejected_empty_count() const noexcept
Number of empty records rejected (CWE-754).
Definition wal.hpp:536
int64_t next_seq() const noexcept
The sequence number that will be assigned to the next appended record.
Definition wal.hpp:530
expected< int64_t > append(const char *data, size_t size)
Append a record from a char pointer.
Definition wal.hpp:477
WalWriterOptions Options
Alias for the options struct.
Definition wal.hpp:239
WalWriter(const WalWriter &)=delete
WalWriter & operator=(const WalWriter &)=delete
expected< int64_t > append(const uint8_t *data, size_t size)
Append a raw-byte record to the WAL.
Definition wal.hpp:397
expected< int64_t > append(std::string_view sv)
Append a record from a string_view.
Definition wal.hpp:484
expected< void > close()
Seal the WAL file: flush buffered data, fsync, and close the file handle.
Definition wal.hpp:510
expected< void > flush(bool do_fsync=false)
Flush the stdio buffer to the OS page cache.
Definition wal.hpp:495
bool is_open() const noexcept
True if the writer is open and accepting appends.
Definition wal.hpp:528
A lightweight result type that holds either a success value of type T or an Error.
Definition error.hpp:145
uint32_t read_le32(const uint8_t *src) noexcept
Read a 32-bit unsigned integer from little-endian byte order.
Definition wal.hpp:155
uint32_t crc32(const void *data, size_t length) noexcept
Compute CRC-32 over a contiguous byte buffer (polynomial 0xEDB88320).
Definition wal.hpp:85
int full_fsync(int fd) noexcept
Force durable flush to storage media.
Definition wal.hpp:180
int64_t now_ns() noexcept
Return nanoseconds since Unix epoch (cross-platform).
Definition wal.hpp:120
void write_le32(uint8_t *dst, uint32_t v) noexcept
Write a 32-bit unsigned integer in little-endian byte order.
Definition wal.hpp:133
uint32_t crc32_combine(uint32_t crc_a, const void *data_b, size_t len_b) noexcept
Combine two CRC regions without concatenating buffers.
Definition wal.hpp:108
void write_le64(uint8_t *dst, uint64_t v) noexcept
Write a 64-bit unsigned integer in little-endian byte order.
Definition wal.hpp:142
uint64_t read_le64(const uint8_t *src) noexcept
Read a 64-bit unsigned integer from little-endian byte order.
Definition wal.hpp:164
WalLifecycleMode
Controls safety guardrails for WAL segment lifecycle operations.
Definition wal.hpp:766
@ Development
Permissive: allows reset_on_open.
@ Benchmark
Same as Development, for benchmark harnesses.
@ Production
Strict: reset_on_open is rejected.
@ IO_ERROR
A file-system or stream I/O operation failed (open, read, write, rename).
@ INVALID_ARGUMENT
A caller-supplied argument is outside the valid range or violates a precondition.
Lightweight error value carrying an ErrorCode and a human-readable message.
Definition error.hpp:101
std::string message
A human-readable description of what went wrong (may be empty for OK).
Definition error.hpp:105
A single decoded WAL record returned by WalReader::next() or read_all().
Definition wal.hpp:200
int64_t seq
Sequence number (0-based, monotonically increasing)
Definition wal.hpp:201
std::vector< uint8_t > payload
Raw record bytes (application-defined content)
Definition wal.hpp:203
int64_t timestamp_ns
Wall-clock timestamp in nanoseconds since Unix epoch.
Definition wal.hpp:202
Configuration options for WalManager::open().
Definition wal.hpp:777
std::string checkpoint_manifest_path
Path for atomic checkpoint manifest file.
Definition wal.hpp:790
Configuration options for WalWriter::open().
Definition wal.hpp:218
int64_t start_seq
First sequence number for brand-new files.
Definition wal.hpp:222
bool sync_on_flush
If true, fsync on explicit flush() calls.
Definition wal.hpp:220
size_t buffer_size
stdio setvbuf buffer size in bytes
Definition wal.hpp:221
bool sync_on_append
If true, fsync after every record append.
Definition wal.hpp:219
Parquet format enumerations, type traits, and statistics structs.
Cross-platform memory-mapped WAL segment and ring writer.