Signet Forge 0.1.1
C++20 Parquet library with AI-native extensions
DEMO
Loading...
Searching...
No Matches
wal.hpp
Go to the documentation of this file.
1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright 2026 Johnson Ogundeji
3// wal.hpp — Write-Ahead Log for SignetStack Signet Forge
4// Sub-millisecond append with crash recovery and segment rolling.
5// Phase 8: Streaming WAL + Async Compaction.
6
10
11#pragma once
12
#include <algorithm>
#include <array>
#include <cassert>
#include <cerrno>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <ctime>
#include <filesystem>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <string_view>
#include <system_error>
#include <utility>
#include <vector>

#if !defined(_WIN32)
# include <fcntl.h>
# include <unistd.h>
#endif
#if defined(_WIN32)
# ifndef WIN32_LEAN_AND_MEAN
#  define WIN32_LEAN_AND_MEAN
# endif
# ifndef NOMINMAX
#  define NOMINMAX
# endif
# include <windows.h>
# include <io.h> // _get_osfhandle, _fileno
#endif
42
43#include "signet/error.hpp"
44#include "signet/types.hpp"
45
46namespace signet::forge {
47
48// ---------------------------------------------------------------------------
49// WAL record on-disk layout:
50// [magic(4)] [seq_num(8)] [timestamp_ns(8)] [data_size(4)] [data(N)] [crc32(4)]
51// Total overhead per record: 28 bytes.
52// ---------------------------------------------------------------------------
53
/// Per-record magic. The value spells ASCII "WAL1" when read as a big-endian
/// word; records store it little-endian (write_le32), so the on-disk bytes are
/// 0x31 0x4C 0x41 0x57.
///
/// `inline constexpr` (not `static`) so every translation unit shares one
/// object: inline functions in this header odr-use these constants, and
/// per-TU copies would make those inline definitions differ across TUs.
inline constexpr uint32_t WAL_RECORD_MAGIC = 0x57414C31u;

/// Size in bytes of the file header written once at offset 0 of each WAL file.
inline constexpr uint16_t WAL_FILE_HDR_SIZE = 16;

/// File header contents: "SIGNETWAL1" followed by NUL padding to 16 bytes.
inline constexpr char WAL_FILE_MAGIC[16] = "SIGNETWAL1\0\0\0\0\0"; // written once at pos 0

/// Hard cap on a single record's payload (64 MiB). Writers reject larger
/// payloads; readers treat a larger size field as corruption (CWE-400).
inline constexpr uint32_t WAL_MAX_RECORD_SIZE = 64u * 1024u * 1024u; // 64 MB hard cap per record

static_assert(sizeof(WAL_FILE_MAGIC) == WAL_FILE_HDR_SIZE,
              "WAL file magic must fill the file header exactly");
static_assert(WAL_MAX_RECORD_SIZE <= UINT32_MAX,
              "record size must fit the 32-bit on-disk size field");
68
69// ---------------------------------------------------------------------------
70// detail::crc32 — standard CRC-32 (polynomial 0xEDB88320), table-driven
71// ---------------------------------------------------------------------------
72namespace detail {
73
/// Advance a raw CRC-32 register over `length` bytes of `data`.
///
/// Reflected CRC-32 (polynomial 0xEDB88320) driven by a 256-entry lookup
/// table that is computed at compile time (`static constexpr`), so there is
/// no run-time initialisation. The register is neither pre- nor post-inverted
/// here; crc32() and crc32_combine() apply the standard 0xFFFFFFFF init/xorout.
///
/// @param state   Current (non-finalised) register value.
/// @param data    Bytes to feed; may be null when `length` is 0.
/// @param length  Number of bytes.
/// @return The updated register value.
inline uint32_t crc32_update(uint32_t state, const void* data, size_t length) noexcept {
    static constexpr std::array<uint32_t, 256> table = [] {
        std::array<uint32_t, 256> t{};
        for (uint32_t i = 0; i < 256; ++i) {
            uint32_t c = i;
            for (int k = 0; k < 8; ++k)
                c = (c & 1u) ? (0xEDB88320u ^ (c >> 1)) : (c >> 1);
            t[i] = c;
        }
        return t;
    }();

    const auto* p = static_cast<const uint8_t*>(data);
    for (size_t i = 0; i < length; ++i)
        state = table[(state ^ p[i]) & 0xFFu] ^ (state >> 8);
    return state;
}

/// Standard CRC-32 (as used by zlib/PNG) of `length` bytes at `data`.
/// crc32("123456789", 9) == 0xCBF43926; the CRC of zero bytes is 0.
inline uint32_t crc32(const void* data, size_t length) noexcept {
    return crc32_update(0xFFFFFFFFu, data, length) ^ 0xFFFFFFFFu;
}

/// Extend a finished CRC-32 with more bytes: given `crc_a == crc32(A)`,
/// returns crc32(A || B) where B is `len_b` bytes at `data_b`.
///
/// Note: unlike zlib's crc32_combine(), this takes B's bytes, not B's CRC.
inline uint32_t crc32_combine(uint32_t crc_a, const void* data_b, size_t len_b) noexcept {
    // Undo the final inversion to recover the running register, continue
    // over B, then re-apply the inversion.
    return crc32_update(crc_a ^ 0xFFFFFFFFu, data_b, len_b) ^ 0xFFFFFFFFu;
}
116
/// Wall-clock (realtime) timestamp in nanoseconds since the Unix epoch.
///
/// Windows uses timespec_get(TIME_UTC); other platforms use
/// clock_gettime(CLOCK_REALTIME). The timespec is zero-initialised, so a
/// failed clock call yields 0.
inline int64_t now_ns() noexcept {
    constexpr int64_t kNanosPerSecond = 1'000'000'000LL;
    timespec now{};
#if defined(_WIN32)
    timespec_get(&now, TIME_UTC);
#else
    ::clock_gettime(CLOCK_REALTIME, &now);
#endif
    const int64_t whole_seconds = static_cast<int64_t>(now.tv_sec);
    return whole_seconds * kNanosPerSecond + now.tv_nsec;
}
129
/// Store `v` into dst[0..3], least-significant byte first.
inline void write_le32(uint8_t* dst, uint32_t v) noexcept {
    for (unsigned byte = 0; byte < 4; ++byte)
        dst[byte] = static_cast<uint8_t>(v >> (8u * byte));
}
/// Store `v` into dst[0..7], least-significant byte first.
inline void write_le64(uint8_t* dst, uint64_t v) noexcept {
    for (unsigned byte = 0; byte < 8; ++byte)
        dst[byte] = static_cast<uint8_t>(v >> (8u * byte));
}
/// Load a 32-bit value stored least-significant byte first at src[0..3].
inline uint32_t read_le32(const uint8_t* src) noexcept {
    uint32_t value = 0;
    for (int byte = 3; byte >= 0; --byte)
        value = (value << 8) | src[byte];
    return value;
}
/// Load a 64-bit value stored least-significant byte first at src[0..7].
inline uint64_t read_le64(const uint8_t* src) noexcept {
    uint64_t value = 0;
    for (int byte = 7; byte >= 0; --byte)
        value = (value << 8) | static_cast<uint64_t>(src[byte]);
    return value;
}
174
/// Flush a descriptor's data and metadata to stable storage.
///
/// - macOS: plain fsync() may leave data in the drive's write cache, so
///   F_FULLFSYNC is requested first. Not every filesystem supports it (the
///   fcntl fails on e.g. some network/FUSE volumes); in that case fall back
///   to fsync() rather than failing every flush/close on such volumes.
/// - Windows: FlushFileBuffers() on the OS handle behind the CRT descriptor.
/// - Other POSIX: fsync().
///
/// @param fd  Open file descriptor (CRT descriptor on Windows).
/// @return 0 on success, -1 on failure (errno set on POSIX).
inline int full_fsync(int fd) noexcept {
#if defined(__APPLE__)
    if (::fcntl(fd, F_FULLFSYNC) == 0)
        return 0;
    return ::fsync(fd);
#elif defined(_WIN32)
    HANDLE h = reinterpret_cast<HANDLE>(_get_osfhandle(fd));
    if (h == INVALID_HANDLE_VALUE) return -1;
    return FlushFileBuffers(h) ? 0 : -1;
#else
    return ::fsync(fd);
#endif
}
192
193} // namespace detail
194
195// ---------------------------------------------------------------------------
196// WalEntry — one decoded record returned by WalReader
197// ---------------------------------------------------------------------------
198
/// One decoded WAL record as returned by WalReader.
struct WalEntry {
    int64_t seq{-1};                ///< Record sequence number; -1 when unset.
    int64_t timestamp_ns{0};        ///< Append time stamped by the writer (detail::now_ns()).
    std::vector<uint8_t> payload{}; ///< Record payload bytes (CRC-checked by the reader).
};
205
208
209// ---------------------------------------------------------------------------
210// WalWriterOptions — options for WalWriter::open()
211// (defined outside WalWriter to avoid Apple Clang default-member-init bug)
212// ---------------------------------------------------------------------------
213
219 bool sync_on_append = false;
220 bool sync_on_flush = true;
221 size_t buffer_size = 65536;
222 int64_t start_seq = 0;
223};
224
225// ---------------------------------------------------------------------------
226// WalWriter — append-only WAL file writer
227// ---------------------------------------------------------------------------
228
237public:
240
241 // Non-copyable, movable.
242 WalWriter(const WalWriter&) = delete;
243 WalWriter& operator=(const WalWriter&) = delete;
244 WalWriter(WalWriter&& o) noexcept
245 : file_(nullptr), closed_(true)
246 {
247 std::lock_guard<std::mutex> lk(o.mu_);
248 file_ = o.file_;
249 path_ = std::move(o.path_);
250 next_seq_ = o.next_seq_;
251 bytes_written_ = o.bytes_written_;
252 opts_ = o.opts_;
253 closed_ = o.closed_;
254 o.file_ = nullptr;
255 o.closed_ = true;
256 }
258
259 ~WalWriter() { if (!closed_) (void)close(); }
260
270 static expected<WalWriter> open(const std::string& path, Options opts = {}) {
271 // Try to open existing file for appending.
272 FILE* f = nullptr;
273#ifdef _WIN32
274 f = std::fopen(path.c_str(), "a+b");
275#else
276 // CWE-732: Open with explicit 0600 permissions to prevent world-writable files.
277 int fd = ::open(path.c_str(), O_RDWR | O_CREAT | O_APPEND, 0600);
278 if (fd >= 0) f = ::fdopen(fd, "a+b");
279#endif
280 if (!f)
282 std::string("WalWriter: cannot open '") + path + "': " + std::strerror(errno)};
283
284 // Apply stdio buffer size.
285 if (opts.buffer_size > 0)
286 std::setvbuf(f, nullptr, _IOFBF, opts.buffer_size);
287
288 // Determine current file size to decide whether to write the file header.
289 // H-10: Use platform-specific 64-bit seek to avoid long overflow
290 // on Windows LLP64 where long is 32-bit.
291#ifdef _WIN32
292 if (_fseeki64(f, 0, SEEK_END) != 0) {
293#else
294 if (fseeko(f, 0, SEEK_END) != 0) {
295#endif
296 std::fclose(f);
297 return Error{ErrorCode::IO_ERROR, "WalWriter: fseek failed"};
298 }
299 // L17: use 64-bit ftell on Windows for files > 2 GB.
300 // Note: _ftelli64 is required on Windows LLP64 data model where
301 // long is 32-bit; standard ftell would silently truncate offsets > 2 GB.
302#ifdef _WIN32
303 int64_t file_size = _ftelli64(f);
304#else
305 long file_size = std::ftell(f);
306#endif
307 if (file_size < 0) {
308 std::fclose(f);
309 return Error{ErrorCode::IO_ERROR, "WalWriter: ftell failed"};
310 }
311
312 int64_t next_seq = 0;
313 int64_t bytes_written = 0;
314
315 if (file_size == 0) {
316 // New file — start sequence at opts.start_seq (for segment continuity).
317 next_seq = opts.start_seq;
318 // Write 16-byte file header.
319 if (std::fwrite(WAL_FILE_MAGIC, 1, WAL_FILE_HDR_SIZE, f) != WAL_FILE_HDR_SIZE) {
320 std::fclose(f);
321 return Error{ErrorCode::IO_ERROR, "WalWriter: failed to write file header"};
322 }
323 std::fflush(f);
324 } else {
325 // Existing file — scan to determine the highest seq_num so we can
326 // continue the sequence without a gap.
327 std::fclose(f);
328 f = nullptr;
329
330 // Scan via a temporary reader to find next_seq.
331 FILE* rf = std::fopen(path.c_str(), "rb");
332 if (rf) {
333 // Skip file header.
334 // H-10: Use platform-specific 64-bit seek
335#ifdef _WIN32
336 _fseeki64(rf, WAL_FILE_HDR_SIZE, SEEK_SET);
337#else
338 fseeko(rf, WAL_FILE_HDR_SIZE, SEEK_SET);
339#endif
340 // Scan forward collecting seq numbers.
341 int64_t last_seq = -1;
342 while (true) {
343 uint8_t hdr[24]; // magic(4)+seq(8)+ts(8)+size(4)
344 if (std::fread(hdr, 1, 24, rf) != 24) break;
345 uint32_t magic = detail::read_le32(hdr);
346 if (magic != WAL_RECORD_MAGIC) break;
347 int64_t seq = static_cast<int64_t>(detail::read_le64(hdr + 4));
348 uint32_t data_sz = detail::read_le32(hdr + 20);
349 // CWE-400: reject corrupt/oversized records during resume scan
350 // (H18+L18: prevents unbounded fseek from crafted data_sz).
351 if (data_sz > WAL_MAX_RECORD_SIZE) break;
352 // Read data + CRC and verify integrity
353 std::vector<uint8_t> record_data(data_sz);
354 if (data_sz > 0 && std::fread(record_data.data(), 1, data_sz, rf) != data_sz) break;
355 uint8_t crc_buf[4];
356 if (std::fread(crc_buf, 1, 4, rf) != 4) break;
357 uint32_t stored_crc = detail::read_le32(crc_buf);
358 // CRC covers header(24) + data
359 std::vector<uint8_t> combined(24 + data_sz);
360 std::memcpy(combined.data(), hdr, 24);
361 if (data_sz > 0)
362 std::memcpy(combined.data() + 24, record_data.data(), data_sz);
363 uint32_t computed_crc = detail::crc32(combined.data(), combined.size());
364 if (computed_crc != stored_crc) break; // reject records with bad CRC
365 last_seq = seq;
366 }
367 std::fclose(rf);
368 if (last_seq >= 0) next_seq = last_seq + 1;
369 }
370
371 // Re-open for appending.
372#ifdef _WIN32
373 f = std::fopen(path.c_str(), "ab");
374#else
375 // CWE-732: Reopen with explicit 0600 permissions.
376 int fd2 = ::open(path.c_str(), O_WRONLY | O_CREAT | O_APPEND, 0600);
377 if (fd2 >= 0) f = ::fdopen(fd2, "ab");
378#endif
379 if (!f)
380 return Error{ErrorCode::IO_ERROR,
381 std::string("WalWriter: cannot reopen '") + path + "': " + std::strerror(errno)};
382 if (opts.buffer_size > 0)
383 std::setvbuf(f, nullptr, _IOFBF, opts.buffer_size);
384 }
385
386 return WalWriter(f, path, next_seq, bytes_written, opts);
387 }
388
397 [[nodiscard]] expected<int64_t> append(const uint8_t* data, size_t size) {
398 std::lock_guard<std::mutex> lk(mu_);
399 if (closed_)
400 return Error{ErrorCode::IO_ERROR, "WalWriter: already closed"};
401 if (size == 0) {
402 ++rejected_empty_count_;
403 return Error{ErrorCode::IO_ERROR, "WalWriter: empty record rejected"};
404 }
405 if (size > WAL_MAX_RECORD_SIZE)
406 return Error{ErrorCode::INVALID_ARGUMENT, "WAL record exceeds maximum size"};
407 if (size > static_cast<size_t>(UINT32_MAX))
408 return Error{ErrorCode::IO_ERROR, "WalWriter: record too large"};
409
410 const int64_t seq = next_seq_;
411 const int64_t ts = detail::now_ns();
412 const uint32_t dsz = static_cast<uint32_t>(size);
413
414 // Build header for CRC: magic(4) + seq(8) + ts(8) + size(4) = 24 bytes
415 uint8_t hdr[24];
416 detail::write_le32(hdr, WAL_RECORD_MAGIC);
417 detail::write_le64(hdr + 4, static_cast<uint64_t>(seq));
418 detail::write_le64(hdr + 12, static_cast<uint64_t>(ts));
419 detail::write_le32(hdr + 20, dsz);
420
421 // Compute CRC over header + data.
422 // We use a two-pass approach: hash header, then hash data.
423 // Since our crc32() requires a contiguous buffer, build one for header
424 // and extend manually using the table loop approach inline.
425 static constexpr auto make_table = []() {
426 std::array<uint32_t, 256> t{};
427 for (uint32_t i = 0; i < 256; ++i) {
428 uint32_t c = i;
429 for (int k = 0; k < 8; ++k)
430 c = (c & 1u) ? (0xEDB88320u ^ (c >> 1)) : (c >> 1);
431 t[i] = c;
432 }
433 return t;
434 };
435 static constexpr auto tbl = make_table();
436
437 uint32_t crc = 0xFFFFFFFFu;
438 for (size_t i = 0; i < 24; ++i)
439 crc = tbl[(crc ^ hdr[i]) & 0xFFu] ^ (crc >> 8);
440 for (size_t i = 0; i < size; ++i)
441 crc = tbl[(crc ^ data[i]) & 0xFFu] ^ (crc >> 8);
442 crc ^= 0xFFFFFFFFu;
443
444 uint8_t crc_buf[4];
445 detail::write_le32(crc_buf, crc);
446
447 // Write: header, data, crc.
448 if (std::fwrite(hdr, 1, 24, file_) != 24) return io_err("fwrite hdr");
449 if (size > 0)
450 if (std::fwrite(data, 1, size, file_) != size) return io_err("fwrite data");
451 if (std::fwrite(crc_buf, 1, 4, file_) != 4) return io_err("fwrite crc");
452
453 bytes_written_ += static_cast<int64_t>(28 + size);
454 ++next_seq_;
455
456 if (opts_.sync_on_append) {
457 if (std::fflush(file_) != 0)
458 return Error{ErrorCode::IO_ERROR, "WalWriter: fflush failed on sync_on_append"};
459 if (detail::full_fsync(::fileno(file_)) != 0)
460 return Error{ErrorCode::IO_ERROR, "WalWriter: fsync failed on sync_on_append"};
461 }
462
463 return seq;
464 }
465
469 [[nodiscard]] expected<int64_t> append(const std::vector<uint8_t>& data) {
470 return append(data.data(), data.size());
471 }
472
477 [[nodiscard]] expected<int64_t> append(const char* data, size_t size) {
478 return append(reinterpret_cast<const uint8_t*>(data), size);
479 }
480
484 [[nodiscard]] expected<int64_t> append(std::string_view sv) {
485 return append(reinterpret_cast<const uint8_t*>(sv.data()), sv.size());
486 }
487
495 [[nodiscard]] expected<void> flush(bool do_fsync = false) {
496 std::lock_guard<std::mutex> lk(mu_);
497 if (closed_)
498 return Error{ErrorCode::IO_ERROR, "WalWriter: already closed"};
499 if (std::fflush(file_) != 0)
500 return Error{ErrorCode::IO_ERROR, "WalWriter: fflush failed"};
501 if (do_fsync || opts_.sync_on_flush) {
502 if (detail::full_fsync(::fileno(file_)) != 0)
503 return Error{ErrorCode::IO_ERROR, "WalWriter: fsync failed"};
504 }
505 return {};
506 }
507
510 [[nodiscard]] expected<void> close() {
511 std::lock_guard<std::mutex> lk(mu_);
512 if (closed_) return {};
513 closed_ = true;
514 if (std::fflush(file_) != 0) {
515 std::fclose(file_); file_ = nullptr;
516 return Error{ErrorCode::IO_ERROR, "WalWriter: fflush failed on close"};
517 }
518 if (detail::full_fsync(::fileno(file_)) != 0) {
519 std::fclose(file_); file_ = nullptr;
520 return Error{ErrorCode::IO_ERROR, "WalWriter: fsync failed on close"};
521 }
522 std::fclose(file_);
523 file_ = nullptr;
524 return {};
525 }
526
528 [[nodiscard]] bool is_open() const noexcept { return !closed_; }
530 [[nodiscard]] int64_t next_seq() const noexcept { return next_seq_; }
532 [[nodiscard]] const std::string& path() const noexcept { return path_; }
534 [[nodiscard]] int64_t bytes_written() const noexcept { return bytes_written_; }
536 [[nodiscard]] uint64_t rejected_empty_count() const noexcept { return rejected_empty_count_; }
537
538private:
539 WalWriter(FILE* f, std::string path, int64_t next_seq,
540 int64_t bytes_written, Options opts)
541 : file_(f), path_(std::move(path)),
542 next_seq_(next_seq), bytes_written_(bytes_written),
543 opts_(opts), closed_(false) {}
544
545 static expected<int64_t> io_err(const char* ctx) {
546 return Error{ErrorCode::IO_ERROR,
547 std::string("WalWriter: ") + ctx + ": " + std::strerror(errno)};
548 }
549
550 FILE* file_ = nullptr;
551 std::string path_;
552 int64_t next_seq_ = 0;
553 int64_t bytes_written_ = 0;
554 uint64_t rejected_empty_count_ = 0;
555 Options opts_;
556 bool closed_ = false;
557 std::mutex mu_;
558};
559
560// ---------------------------------------------------------------------------
561// WalReader — sequential scan for crash recovery
562// ---------------------------------------------------------------------------
563
573public:
/// How a reader reacts to a damaged record (truncated header/payload/CRC,
/// bad magic, oversized length, or CRC mismatch).
enum class ValidationMode : uint8_t {
    BestEffort = 0,  ///< Treat the damage as end of log.
    Strict = 1,      ///< Report it as ErrorCode::CORRUPT_DATA.
};
578
579 WalReader(const WalReader&) = delete;
580 WalReader& operator=(const WalReader&) = delete;
581 WalReader(WalReader&& o) noexcept
582 : file_(o.file_), path_(std::move(o.path_)),
583 last_seq_(o.last_seq_), count_(o.count_), offset_(o.offset_)
584 { o.file_ = nullptr; }
586
587 ~WalReader() { close(); }
588
598 const std::string& path,
599 ValidationMode mode = ValidationMode::BestEffort) {
600 FILE* f = std::fopen(path.c_str(), "rb");
601 if (!f)
602 return Error{ErrorCode::IO_ERROR,
603 std::string("WalReader: cannot open '") + path + "': " + std::strerror(errno)};
604
605 // Read and validate 16-byte file header.
606 char hdr[WAL_FILE_HDR_SIZE];
607 if (std::fread(hdr, 1, WAL_FILE_HDR_SIZE, f) != WAL_FILE_HDR_SIZE) {
608 std::fclose(f);
609 return Error{ErrorCode::INVALID_FILE, "WalReader: file too short for header"};
610 }
611 if (std::memcmp(hdr, WAL_FILE_MAGIC, WAL_FILE_HDR_SIZE) != 0) {
612 std::fclose(f);
613 return Error{ErrorCode::INVALID_FILE, "WalReader: bad file magic"};
614 }
615
616 return WalReader(f, path, WAL_FILE_HDR_SIZE, mode);
617 }
618
627 if (!file_) return std::optional<WalEntry>{std::nullopt};
628
629 // Read 24-byte record header.
630 uint8_t hdr[24];
631 size_t n = std::fread(hdr, 1, 24, file_);
632 if (n == 0) {
633 // True EOF or error.
634 if (std::ferror(file_))
635 return Error{ErrorCode::IO_ERROR, "WalReader: fread header failed"};
636 return std::optional<WalEntry>{std::nullopt}; // clean EOF
637 }
638 if (n < 24)
639 return soft_stop_or_error("truncated record header");
640
641 uint32_t magic = detail::read_le32(hdr);
642 if (magic != WAL_RECORD_MAGIC)
643 return soft_stop_or_error("bad record magic");
644
645 int64_t seq = static_cast<int64_t>(detail::read_le64(hdr + 4));
646 int64_t ts = static_cast<int64_t>(detail::read_le64(hdr + 12));
647 uint32_t data_sz = detail::read_le32(hdr + 20);
648
649 // CWE-400: Uncontrolled Resource Consumption — reject oversized records
650 // to prevent unbounded memory allocation from crafted data_sz values.
651 if (data_sz > WAL_MAX_RECORD_SIZE) {
652 return soft_stop_or_error("record size exceeds WAL_MAX_RECORD_SIZE");
653 }
654
655 // Read payload bytes into raw_buf (renamed to avoid collision with WalEntry::payload field).
656 std::vector<uint8_t> raw_buf(data_sz);
657 if (data_sz > 0) {
658 size_t nr = std::fread(raw_buf.data(), 1, data_sz, file_);
659 if (nr < data_sz) {
660 if (std::ferror(file_))
661 return Error{ErrorCode::IO_ERROR, "WalReader: fread payload failed"};
662 return soft_stop_or_error("truncated payload");
663 }
664 }
665
666 // Read stored CRC.
667 uint8_t crc_buf[4];
668 if (std::fread(crc_buf, 1, 4, file_) != 4)
669 return soft_stop_or_error("truncated record CRC");
670
671 uint32_t stored_crc = detail::read_le32(crc_buf);
672
673 // Compute expected CRC over header + data.
674 static constexpr auto make_table = []() {
675 std::array<uint32_t, 256> t{};
676 for (uint32_t i = 0; i < 256; ++i) {
677 uint32_t c = i;
678 for (int k = 0; k < 8; ++k)
679 c = (c & 1u) ? (0xEDB88320u ^ (c >> 1)) : (c >> 1);
680 t[i] = c;
681 }
682 return t;
683 };
684 static constexpr auto tbl = make_table();
685
686 uint32_t crc = 0xFFFFFFFFu;
687 for (size_t i = 0; i < 24; ++i)
688 crc = tbl[(crc ^ hdr[i]) & 0xFFu] ^ (crc >> 8);
689 for (size_t i = 0; i < data_sz; ++i)
690 crc = tbl[(crc ^ raw_buf[i]) & 0xFFu] ^ (crc >> 8);
691 crc ^= 0xFFFFFFFFu;
692
693 if (crc != stored_crc)
694 return soft_stop_or_error("record CRC mismatch");
695
696 offset_ += 24 + data_sz + 4;
697 last_seq_ = seq;
698 ++count_;
699
700 WalEntry entry;
701 entry.seq = seq;
702 entry.timestamp_ns = ts;
703 entry.payload = std::move(raw_buf);
704 return std::optional<WalEntry>{std::move(entry)};
705 }
706
714 std::vector<WalEntry> results;
715 while (true) {
716 auto res = next();
717 if (!res) return res.error();
718 if (!res->has_value()) break;
719 results.push_back(std::move(res->value()));
720 }
721 return results;
722 }
723
725 [[nodiscard]] int64_t last_seq() const noexcept { return last_seq_; }
727 [[nodiscard]] int64_t count() const noexcept { return count_; }
729 [[nodiscard]] size_t offset() const noexcept { return offset_; }
730
732 void close() {
733 if (file_) { std::fclose(file_); file_ = nullptr; }
734 }
735
736private:
737 [[nodiscard]] expected<std::optional<WalEntry>> soft_stop_or_error(
738 const char* reason) const {
739 if (validation_mode_ == ValidationMode::Strict) {
740 return Error{ErrorCode::CORRUPT_DATA,
741 std::string("WalReader: ") + reason +
742 " in '" + path_ + "' at offset " + std::to_string(offset_)};
743 }
744 return std::optional<WalEntry>{std::nullopt};
745 }
746
747 WalReader(FILE* f, std::string path, size_t initial_offset, ValidationMode mode)
748 : file_(f), path_(std::move(path)), offset_(initial_offset), validation_mode_(mode) {}
749
750 FILE* file_ = nullptr;
751 std::string path_;
752 int64_t last_seq_ = -1;
753 ValidationMode validation_mode_ = ValidationMode::BestEffort;
754 int64_t count_ = 0;
755 size_t offset_ = WAL_FILE_HDR_SIZE;
756};
757
758// ---------------------------------------------------------------------------
759// WalManagerOptions — options for WalManager::open()
760// (defined outside WalManager to avoid Apple Clang default-member-init bug)
761// ---------------------------------------------------------------------------
762
/// Deployment stage a WalManager runs in. WalManager::open() refuses the
/// destructive `reset_on_open` option in Production.
enum class WalLifecycleMode : uint8_t {
    Development = 0,
    Benchmark   = 1,
    Production  = 2,
};
771
778 size_t max_segment_bytes = 64 * 1024 * 1024;
779 size_t max_records = 1'000'000;
780 bool sync_on_append = false;
781 bool sync_on_roll = true;
782 bool strict_replay = true;
783 std::string file_prefix = "wal";
784 std::string file_ext = ".wal";
785
786 // Safety guardrails
787 WalLifecycleMode lifecycle_mode = WalLifecycleMode::Development;
788 bool reset_on_open = false;
789 bool require_checkpoint_before_prune = false;
791};
792
793// ---------------------------------------------------------------------------
794// WalManager — rolling WAL segments
795// ---------------------------------------------------------------------------
796
807public:
810
811 WalManager(const WalManager&) = delete;
812 WalManager& operator=(const WalManager&) = delete;
813 WalManager(WalManager&& o) noexcept
814 : dir_(std::move(o.dir_)), opts_(o.opts_),
815 segments_(std::move(o.segments_)),
816 writer_(std::move(o.writer_)),
817 global_seq_(o.global_seq_),
818 segment_record_count_(o.segment_record_count_),
819 total_records_(o.total_records_),
820 prune_checkpoint_ready_(o.prune_checkpoint_ready_),
821 closed_(o.closed_)
822 { o.closed_ = true; }
824
825 ~WalManager() { (void)close(); }
826
836 static expected<WalManager> open(const std::string& dir, Options opts = {}) {
837 // Ensure directory exists first.
838 std::error_code mk_ec;
839 std::filesystem::create_directories(dir, mk_ec);
840 if (mk_ec) {
841 std::error_code isdir_ec;
842 if (!std::filesystem::is_directory(dir, isdir_ec)) {
843 return Error{ErrorCode::IO_ERROR,
844 std::string("WalManager: cannot create dir '") + dir + "': " + mk_ec.message()};
845 }
846 }
847
848 // CWE-22: file_prefix is concatenated into segment filenames —
849 // reject path separators and traversal to prevent directory escape.
850 {
851 const auto& pfx = opts.file_prefix;
852 if (pfx.find('/') != std::string::npos ||
853 pfx.find('\\') != std::string::npos ||
854 pfx.find("..") != std::string::npos ||
855 pfx.find('\0') != std::string::npos)
856 return Error{ErrorCode::IO_ERROR,
857 "WalManager: file_prefix contains path separator or traversal"};
858 }
859
860 if (opts.reset_on_open && opts.lifecycle_mode == WalLifecycleMode::Production) {
861 return Error{ErrorCode::IO_ERROR,
862 "WalManager: reset_on_open denied in production mode"};
863 }
864
865 if (opts.reset_on_open) {
866 auto reset_paths = scan_segments(dir, opts);
867 for (const auto& seg : reset_paths) {
868 std::error_code rm_ec;
869 std::filesystem::remove(seg, rm_ec);
870 if (rm_ec && rm_ec != std::make_error_code(std::errc::no_such_file_or_directory)) {
871 return Error{ErrorCode::IO_ERROR,
872 std::string("WalManager: reset remove '") + seg + "': " + rm_ec.message()};
873 }
874 }
875 }
876
877 // Discover existing WAL segment files in the directory.
878 std::vector<std::string> existing = scan_segments(dir, opts);
879
880 // Determine global_seq from the last segment if any exist.
881 int64_t global_seq = 0;
882 if (!existing.empty()) {
883 auto rd = WalReader::open(existing.back(),
884 opts.strict_replay ? WalReader::ValidationMode::Strict
885 : WalReader::ValidationMode::BestEffort);
886 if (!rd) {
887 if (opts.strict_replay) return rd.error();
888 } else {
889 auto entries = rd->read_all();
890 if (!entries) {
891 if (opts.strict_replay) return entries.error();
892 } else if (rd->last_seq() >= 0) {
893 global_seq = rd->last_seq() + 1;
894 }
895 }
896 }
897
898 // Count total records across all existing segments.
899 int64_t total = 0;
900 for (const auto& seg : existing) {
901 auto rd = WalReader::open(seg,
902 opts.strict_replay ? WalReader::ValidationMode::Strict
903 : WalReader::ValidationMode::BestEffort);
904 if (!rd) {
905 if (opts.strict_replay) return rd.error();
906 } else {
907 auto res = rd->read_all();
908 if (!res) {
909 if (opts.strict_replay) return res.error();
910 } else {
911 total += static_cast<int64_t>(res->size());
912 }
913 }
914 }
915
916 // Open a new active segment starting at the current global sequence number.
917 WalWriter::Options wopts;
918 wopts.sync_on_append = opts.sync_on_append;
919 wopts.sync_on_flush = opts.sync_on_roll;
920 wopts.start_seq = global_seq;
921
922 std::string seg_path = make_segment_path(dir, opts, global_seq);
923 auto writer = WalWriter::open(seg_path, wopts);
924 if (!writer) return writer.error();
925
926 WalManager mgr;
927 mgr.dir_ = dir;
928 mgr.opts_ = opts;
929 mgr.segments_ = std::move(existing);
930 mgr.segments_.push_back(seg_path);
931 mgr.writer_ = std::make_unique<WalWriter>(std::move(writer.value()));
932 mgr.global_seq_ = global_seq;
933 mgr.segment_record_count_ = 0;
934 mgr.total_records_ = total;
935 mgr.prune_checkpoint_ready_ = !opts.require_checkpoint_before_prune;
936 mgr.closed_ = false;
937 return mgr;
938 }
939
945 [[nodiscard]] expected<int64_t> append(const uint8_t* data, size_t size) {
946 std::lock_guard<std::mutex> lk(mu_);
947 if (closed_)
948 return Error{ErrorCode::IO_ERROR, "WalManager: already closed"};
949
950 // Check roll conditions.
951 bool need_roll = false;
952 if (opts_.max_records > 0 && static_cast<size_t>(segment_record_count_) >= opts_.max_records)
953 need_roll = true;
954 if (!need_roll && opts_.max_segment_bytes > 0) {
955 int64_t bw = writer_->bytes_written();
956 if (static_cast<size_t>(bw) + 28 + size > opts_.max_segment_bytes)
957 need_roll = true;
958 }
959
960 if (need_roll) {
961 auto rc = roll_locked();
962 if (!rc) return rc.error();
963 }
964
965 auto res = writer_->append(data, size);
966 if (!res) return res;
967 ++segment_record_count_;
968 ++total_records_;
969 ++global_seq_;
970 return res;
971 }
972
977 [[nodiscard]] expected<int64_t> append(const char* data, size_t size) {
978 return append(reinterpret_cast<const uint8_t*>(data), size);
979 }
980
984 [[nodiscard]] expected<int64_t> append(std::string_view sv) {
985 return append(reinterpret_cast<const uint8_t*>(sv.data()), sv.size());
986 }
987
990 [[nodiscard]] std::vector<std::string> segment_paths() const {
991 std::lock_guard<std::mutex> lk(mu_);
992 return segments_;
993 }
994
1004 [[nodiscard]] expected<void> commit_compaction_checkpoint(const std::string& note = "") {
1005 std::lock_guard<std::mutex> lk(mu_);
1006 if (!opts_.require_checkpoint_before_prune) {
1007 prune_checkpoint_ready_ = true;
1008 return {};
1009 }
1010
1011 if (opts_.checkpoint_manifest_path.empty()) {
1012 // Caller opts out of marker-file persistence and takes responsibility
1013 // for external durable checkpointing.
1014 prune_checkpoint_ready_ = true;
1015 return {};
1016 }
1017
1018 auto wr = write_manifest_atomic(opts_.checkpoint_manifest_path, note);
1019 if (!wr) return wr;
1020 prune_checkpoint_ready_ = true;
1021 return {};
1022 }
1023
1027 std::lock_guard<std::mutex> lk(mu_);
1028 prune_checkpoint_ready_ = !opts_.require_checkpoint_before_prune;
1029 }
1030
1039 [[nodiscard]] expected<void> remove_segment(const std::string& path) {
1040 std::lock_guard<std::mutex> lk(mu_);
1041 if (opts_.require_checkpoint_before_prune && !prune_checkpoint_ready_)
1042 return Error{ErrorCode::IO_ERROR,
1043 "WalManager: prune blocked until compaction checkpoint is committed"};
1044 // Do not remove the currently active segment.
1045 if (!segments_.empty() && segments_.back() == path)
1046 return Error{ErrorCode::IO_ERROR,
1047 "WalManager: cannot remove active segment"};
1048 {
1049 std::error_code ec;
1050 std::filesystem::remove(path, ec);
1051 if (ec && ec != std::make_error_code(std::errc::no_such_file_or_directory))
1052 return Error{ErrorCode::IO_ERROR,
1053 std::string("WalManager: remove '") + path + "': " + ec.message()};
1054 }
1055 auto it = std::find(segments_.begin(), segments_.end(), path);
1056 if (it != segments_.end()) segments_.erase(it);
1057 return {};
1058 }
1059
1061 [[nodiscard]] int64_t total_records() const noexcept {
1062 std::lock_guard<std::mutex> lk(mu_);
1063 return total_records_;
1064 }
1065
1068 [[nodiscard]] expected<void> close() {
1069 std::lock_guard<std::mutex> lk(mu_);
1070 if (closed_) return {};
1071 closed_ = true;
1072 if (writer_) {
1073 auto close_result = writer_->close();
1074 if (!close_result) return close_result.error();
1075 }
1076 return {};
1077 }
1078
1086 std::lock_guard<std::mutex> lk(mu_);
1087 // Flush active segment so the reader can see all written records.
1088 if (writer_ && !closed_) {
1089 auto flush_result = writer_->flush(false);
1090 if (!flush_result) return flush_result.error();
1091 }
1092 std::vector<WalEntry> result;
1093 for (const auto& seg : segments_) {
1094 auto rd = WalReader::open(seg,
1095 opts_.strict_replay ? WalReader::ValidationMode::Strict
1096 : WalReader::ValidationMode::BestEffort);
1097 if (!rd) {
1098 if (opts_.strict_replay) return rd.error();
1099 continue;
1100 }
1101 auto entries = rd->read_all();
1102 if (!entries) {
1103 if (opts_.strict_replay) return entries.error();
1104 continue;
1105 }
1106 for (auto& e : *entries)
1107 result.push_back(std::move(e));
1108 }
1109 return result;
1110 }
1111
1112private:
1113 WalManager() = default;
1114
1115 static expected<void> sync_parent_directory(const std::string& file_path) {
1116#if !defined(_WIN32)
1117 std::filesystem::path p(file_path);
1118 std::filesystem::path parent = p.parent_path();
1119 if (parent.empty()) return {};
1120
1121 int dfd = ::open(parent.string().c_str(), O_RDONLY);
1122 if (dfd < 0) {
1123 return Error{ErrorCode::IO_ERROR,
1124 std::string("WalManager: open parent dir failed: ") + std::strerror(errno)};
1125 }
1126 if (::fsync(dfd) != 0) {
1127 int e = errno;
1128 ::close(dfd);
1129 return Error{ErrorCode::IO_ERROR,
1130 std::string("WalManager: fsync parent dir failed: ") + std::strerror(e)};
1131 }
1132 ::close(dfd);
1133#endif
1134 return {};
1135 }
1136
1137 static expected<void> write_manifest_atomic(const std::string& path,
1138 const std::string& note) {
1139 std::filesystem::path p(path);
1140 if (p.empty()) {
1141 return Error{ErrorCode::IO_ERROR,
1142 "WalManager: checkpoint_manifest_path is empty"};
1143 }
1144
1145 std::error_code mk_ec;
1146 auto parent = p.parent_path();
1147 if (!parent.empty())
1148 std::filesystem::create_directories(parent, mk_ec);
1149 if (mk_ec) {
1150 return Error{ErrorCode::IO_ERROR,
1151 std::string("WalManager: cannot create manifest dir: ") + mk_ec.message()};
1152 }
1153
1154 const std::string tmp = path + ".tmp." + std::to_string(detail::now_ns());
1155 FILE* f = nullptr;
1156#ifdef _WIN32
1157 f = std::fopen(tmp.c_str(), "wb");
1158#else
1159 // CWE-732: Create manifest with explicit 0600 permissions.
1160 int mfd = ::open(tmp.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0600);
1161 if (mfd >= 0) f = ::fdopen(mfd, "wb");
1162#endif
1163 if (!f) {
1164 return Error{ErrorCode::IO_ERROR,
1165 std::string("WalManager: cannot open temp manifest '") + tmp + "': " + std::strerror(errno)};
1166 }
1167
1168 std::string body = "checkpoint_ns=" + std::to_string(detail::now_ns()) + "\n";
1169 if (!note.empty()) {
1170 body += "note=" + note + "\n";
1171 }
1172
1173 if (std::fwrite(body.data(), 1, body.size(), f) != body.size()) {
1174 std::fclose(f);
1175 std::remove(tmp.c_str());
1176 return Error{ErrorCode::IO_ERROR,
1177 std::string("WalManager: write manifest failed: ") + std::strerror(errno)};
1178 }
1179 if (std::fflush(f) != 0) {
1180 std::fclose(f);
1181 std::remove(tmp.c_str());
1182 return Error{ErrorCode::IO_ERROR,
1183 std::string("WalManager: fflush manifest failed: ") + std::strerror(errno)};
1184 }
1185 if (detail::full_fsync(::fileno(f)) != 0) {
1186 std::fclose(f);
1187 std::remove(tmp.c_str());
1188 return Error{ErrorCode::IO_ERROR,
1189 std::string("WalManager: fsync manifest failed: ") + std::strerror(errno)};
1190 }
1191 std::fclose(f);
1192
1193 std::error_code mv_ec;
1194 std::filesystem::rename(tmp, path, mv_ec);
1195 if (mv_ec) {
1196 std::remove(tmp.c_str());
1197 return Error{ErrorCode::IO_ERROR,
1198 std::string("WalManager: atomic rename manifest failed: ") + mv_ec.message()};
1199 }
1200
1201 auto sync_res = sync_parent_directory(path);
1202 if (!sync_res) return sync_res;
1203 return {};
1204 }
1205
1206 // Must be called with mu_ held.
1207 expected<void> roll_locked() {
1208 // Flush + fsync the current segment.
1209 if (opts_.sync_on_roll) {
1210 (void)writer_->flush(true);
1211 }
1212 (void)writer_->close();
1213
1214 // Open a new segment continuing the global sequence.
1215 WalWriter::Options wopts;
1216 wopts.sync_on_append = opts_.sync_on_append;
1217 wopts.sync_on_flush = opts_.sync_on_roll;
1218 wopts.start_seq = global_seq_;
1219
1220 std::string seg_path = make_segment_path(dir_, opts_, global_seq_);
1221 auto writer = WalWriter::open(seg_path, wopts);
1222 if (!writer) return writer.error();
1223
1224 writer_ = std::make_unique<WalWriter>(std::move(writer.value()));
1225 segments_.push_back(seg_path);
1226 segment_record_count_ = 0;
1227 return {};
1228 }
1229
1230 // Generate segment filename: {prefix}_{seq_start}_{timestamp_ms}.{ext}
1231 static std::string make_segment_path(const std::string& dir,
1232 const Options& opts,
1233 int64_t seq_start) {
1234 // timestamp in milliseconds for the filename (cross-platform).
1235 struct timespec ts{};
1236#if defined(_WIN32)
1237 timespec_get(&ts, TIME_UTC);
1238#else
1239 ::clock_gettime(CLOCK_REALTIME, &ts);
1240#endif
1241 int64_t ts_ms = static_cast<int64_t>(ts.tv_sec) * 1000LL
1242 + ts.tv_nsec / 1'000'000LL;
1243
1244 // Build: dir + "/" + prefix + "_" + seq + "_" + ts_ms + ext
1245 std::string name = dir + "/" + opts.file_prefix
1246 + "_" + std::to_string(seq_start)
1247 + "_" + std::to_string(ts_ms)
1248 + opts.file_ext;
1249 return name;
1250 }
1251
1252 // Scan directory for existing WAL segments matching prefix/ext.
1253 // Returns sorted list by embedded seq_start (numeric parse of filename).
1254 // Uses std::filesystem::directory_iterator (cross-platform; no opendir/readdir).
1255 static std::vector<std::string> scan_segments(const std::string& dir,
1256 const Options& opts) {
1257 std::vector<std::pair<int64_t, std::string>> found;
1258 std::error_code ec;
1259 for (const auto& entry : std::filesystem::directory_iterator(dir, ec)) {
1260 if (ec) break;
1261 if (!entry.is_regular_file()) continue;
1262 const std::string fname = entry.path().filename().string();
1263 // Must start with prefix + "_" and end with ext.
1264 const std::string pfx = opts.file_prefix + "_";
1265 if (fname.size() <= pfx.size() + opts.file_ext.size()) continue;
1266 if (fname.substr(0, pfx.size()) != pfx) continue;
1267 if (fname.substr(fname.size() - opts.file_ext.size()) != opts.file_ext) continue;
1268 // Extract seq_start from either:
1269 // prefix_<seq>_<ts>.wal (WalManager format)
1270 // prefix_<seq>.wal (WalMmapWriter format)
1271 size_t p1 = pfx.size();
1272 size_t ext_pos = fname.size() - opts.file_ext.size();
1273 if (ext_pos <= p1) continue;
1274 size_t p2 = fname.find('_', p1);
1275
1276 std::string seq_str;
1277 if (p2 == std::string::npos || p2 > ext_pos) {
1278 seq_str = fname.substr(p1, ext_pos - p1);
1279 } else {
1280 seq_str = fname.substr(p1, p2 - p1);
1281 }
1282
1283 int64_t seq_start = 0;
1284 try { seq_start = std::stoll(seq_str); } catch (...) { continue; }
1285 found.emplace_back(seq_start, entry.path().string());
1286 }
1287 std::sort(found.begin(), found.end(),
1288 [](const auto& a, const auto& b) { return a.first < b.first; });
1289 std::vector<std::string> paths;
1290 paths.reserve(found.size());
1291 for (auto& [seq, p] : found) paths.push_back(std::move(p));
1292 return paths;
1293 }
1294
1295 std::string dir_;
1296 Options opts_;
1297 std::vector<std::string> segments_;
1298 std::unique_ptr<WalWriter> writer_;
1299 int64_t global_seq_ = 0;
1300 int64_t segment_record_count_ = 0;
1301 int64_t total_records_ = 0;
1302 bool prune_checkpoint_ready_ = true;
1303 bool closed_ = false;
1304 mutable std::mutex mu_;
1305};
1306
1307} // namespace signet::forge
1308
1309// ---------------------------------------------------------------------------
1310// Memory-mapped WAL segment and ring writer.
1311// Included after signet::forge is closed above; the included header presumably
1312// reopens it so its detail_mmap:: helpers can use this file's declarations.
1313// WalMmapWriter produces files readable by WalReader with no reader changes.
1314// Excluded on Emscripten — WalMmapWriter requires std::thread + POSIX mmap.
1315// ---------------------------------------------------------------------------
1316#ifndef __EMSCRIPTEN__
1318#endif
1319
Manages multiple rolling WAL segment files in a directory.
Definition wal.hpp:806
WalManager & operator=(const WalManager &)=delete
WalManager(const WalManager &)=delete
static expected< WalManager > open(const std::string &dir, Options opts={})
Open a WAL directory, discovering existing segments and creating a new active segment ready for writing.
Definition wal.hpp:836
expected< void > commit_compaction_checkpoint(const std::string &note="")
Record that an external compaction checkpoint has been made durable, allowing old fully-compacted WAL segments to be pruned.
Definition wal.hpp:1004
WalManager(WalManager &&o) noexcept
Definition wal.hpp:813
expected< std::vector< WalEntry > > read_all()
Read all WAL entries across all segments (sealed and active).
Definition wal.hpp:1085
WalManager & operator=(WalManager &&)=delete
expected< void > remove_segment(const std::string &path)
Remove a fully-compacted segment from disk and from the tracking list.
Definition wal.hpp:1039
int64_t total_records() const noexcept
Total number of records written across all segments (including rolled ones).
Definition wal.hpp:1061
expected< int64_t > append(const uint8_t *data, size_t size)
Append a record to the current segment, rolling to a new segment if needed.
Definition wal.hpp:945
std::vector< std::string > segment_paths() const
List all WAL segment paths in sequence order (oldest first).
Definition wal.hpp:990
void clear_compaction_checkpoint()
Reset the checkpoint-ready flag so subsequent prune calls are blocked again until the next commit_compaction_checkpoint() call.
Definition wal.hpp:1026
expected< int64_t > append(const char *data, size_t size)
Append a record from a char pointer.
Definition wal.hpp:977
expected< void > close()
Close the manager and the active segment writer.
Definition wal.hpp:1068
expected< int64_t > append(std::string_view sv)
Append a record from a string_view.
Definition wal.hpp:984
Sequential WAL file reader for crash recovery and replay.
Definition wal.hpp:572
WalReader & operator=(WalReader &&)=delete
static expected< WalReader > open(const std::string &path, ValidationMode mode=ValidationMode::BestEffort)
Open a WAL file for sequential reading.
Definition wal.hpp:597
expected< std::optional< WalEntry > > next()
Read the next WAL entry from the file.
Definition wal.hpp:626
size_t offset() const noexcept
Current byte offset in the file (past the last read record).
Definition wal.hpp:729
WalReader(WalReader &&o) noexcept
Definition wal.hpp:581
WalReader(const WalReader &)=delete
void close()
Close the underlying file handle.
Definition wal.hpp:732
WalReader & operator=(const WalReader &)=delete
int64_t count() const noexcept
Number of records successfully read so far.
Definition wal.hpp:727
int64_t last_seq() const noexcept
Sequence number of the last successfully read record (-1 if none).
Definition wal.hpp:725
expected< std::vector< WalEntry > > read_all()
Read all valid entries from the current position to end-of-valid-data.
Definition wal.hpp:713
Append-only Write-Ahead Log writer with CRC-32 integrity per record.
Definition wal.hpp:236
WalWriter(WalWriter &&o) noexcept
Definition wal.hpp:244
expected< int64_t > append(const std::vector< uint8_t > &data)
Append a record from a byte vector.
Definition wal.hpp:469
static expected< WalWriter > open(const std::string &path, Options opts={})
Open or create a WAL file for appending.
Definition wal.hpp:270
WalWriter & operator=(WalWriter &&)=delete
const std::string & path() const noexcept
Filesystem path of the WAL file.
Definition wal.hpp:532
int64_t bytes_written() const noexcept
Total bytes written to this WAL file (header + all records).
Definition wal.hpp:534
uint64_t rejected_empty_count() const noexcept
Number of empty records rejected (CWE-754).
Definition wal.hpp:536
int64_t next_seq() const noexcept
The sequence number that will be assigned to the next appended record.
Definition wal.hpp:530
expected< int64_t > append(const char *data, size_t size)
Append a record from a char pointer.
Definition wal.hpp:477
WalWriterOptions Options
Alias for the options struct.
Definition wal.hpp:239
WalWriter(const WalWriter &)=delete
WalWriter & operator=(const WalWriter &)=delete
expected< int64_t > append(const uint8_t *data, size_t size)
Append a raw-byte record to the WAL.
Definition wal.hpp:397
expected< int64_t > append(std::string_view sv)
Append a record from a string_view.
Definition wal.hpp:484
expected< void > close()
Seal the WAL file: flush buffered data, fsync, and close the file handle.
Definition wal.hpp:510
expected< void > flush(bool do_fsync=false)
Flush the stdio buffer to the OS page cache.
Definition wal.hpp:495
bool is_open() const noexcept
True if the writer is open and accepting appends.
Definition wal.hpp:528
A lightweight result type that holds either a success value of type T or an Error.
Definition error.hpp:143
uint32_t read_le32(const uint8_t *src) noexcept
Read a 32-bit unsigned integer from little-endian byte order.
Definition wal.hpp:155
uint32_t crc32(const void *data, size_t length) noexcept
Compute CRC-32 over a contiguous byte buffer (polynomial 0xEDB88320).
Definition wal.hpp:85
int full_fsync(int fd) noexcept
Force durable flush to storage media.
Definition wal.hpp:180
int64_t now_ns() noexcept
Return nanoseconds since Unix epoch (cross-platform).
Definition wal.hpp:120
void write_le32(uint8_t *dst, uint32_t v) noexcept
Write a 32-bit unsigned integer in little-endian byte order.
Definition wal.hpp:133
uint32_t crc32_combine(uint32_t crc_a, const void *data_b, size_t len_b) noexcept
Combine two CRC regions without concatenating buffers.
Definition wal.hpp:108
void write_le64(uint8_t *dst, uint64_t v) noexcept
Write a 64-bit unsigned integer in little-endian byte order.
Definition wal.hpp:142
uint64_t read_le64(const uint8_t *src) noexcept
Read a 64-bit unsigned integer from little-endian byte order.
Definition wal.hpp:164
WalLifecycleMode
Controls safety guardrails for WAL segment lifecycle operations.
Definition wal.hpp:766
@ Development
Permissive: allows reset_on_open.
@ Benchmark
Same as Development, for benchmark harnesses.
@ Production
Strict: reset_on_open is rejected.
@ IO_ERROR
A file-system or stream I/O operation failed (open, read, write, rename).
@ INVALID_ARGUMENT
A caller-supplied argument is outside the valid range or violates a precondition.
Lightweight error value carrying an ErrorCode and a human-readable message.
Definition error.hpp:99
std::string message
A human-readable description of what went wrong (may be empty for OK).
Definition error.hpp:103
A single decoded WAL record returned by WalReader::next() or read_all().
Definition wal.hpp:200
int64_t seq
Sequence number (0-based, monotonically increasing)
Definition wal.hpp:201
std::vector< uint8_t > payload
Raw record bytes (application-defined content)
Definition wal.hpp:203
int64_t timestamp_ns
Wall-clock timestamp in nanoseconds since Unix epoch.
Definition wal.hpp:202
Configuration options for WalManager::open().
Definition wal.hpp:777
std::string checkpoint_manifest_path
Path for atomic checkpoint manifest file.
Definition wal.hpp:790
Configuration options for WalWriter::open().
Definition wal.hpp:218
int64_t start_seq
First sequence number for brand-new files.
Definition wal.hpp:222
bool sync_on_flush
If true, fsync on explicit flush() calls.
Definition wal.hpp:220
size_t buffer_size
stdio setvbuf buffer size in bytes
Definition wal.hpp:221
bool sync_on_append
If true, fsync after every record append.
Definition wal.hpp:219
Parquet format enumerations, type traits, and statistics structs.
Cross-platform memory-mapped WAL segment and ring writer.