Signet Forge 0.1.0
C++20 Parquet library with AI-native extensions
DEMO
Loading...
Searching...
No Matches
wal_mapped_segment.hpp
Go to the documentation of this file.
1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright 2026 Johnson Ogundeji
5//
6// Provides:
7// MappedSegment — RAII mmap'd file (POSIX + Windows)
8// WalMmapOptions — options struct (namespace scope — Apple Clang restriction)
9// WalMmapWriter — ring of N mmap'd segments with bg pre-allocation thread
10//
11// File format: IDENTICAL to WalWriter (WAL_FILE_MAGIC at byte 0, records at byte 16).
12// WalReader works unchanged on WalMmapWriter segment files.
13//
14// Thread safety: NOT thread-safe for concurrent appends. Single-writer model.
15// flush() and close() must be called from the same thread as append().
16//
17// Crash safety: CRC is written LAST with a release fence.
18// WalReader stops at bad/missing CRC, recovering all complete records.
19//
20// This file is included at the bottom of wal.hpp so that the detail_mmap helpers
21// defined here do not pollute the signet::forge::detail namespace.
22
23#pragma once
#include <algorithm>
#include <array>
#include <atomic>
#include <cassert>
#include <cerrno>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <ctime>
#include <filesystem>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <string_view>
#include <thread>
#include <vector>
40
41#if defined(_WIN32)
42# ifndef WIN32_LEAN_AND_MEAN
43# define WIN32_LEAN_AND_MEAN
44# endif
45# ifndef NOMINMAX
46# define NOMINMAX
47# endif
48# include <windows.h>
49#else
50# include <sys/mman.h>
51# include <sys/stat.h>
52# include <unistd.h>
53# include <fcntl.h>
54# if defined(__APPLE__)
55# include <sys/types.h>
56# endif
57#endif
58
59#include "signet/error.hpp"
60
61namespace signet::forge {
62
63// ---------------------------------------------------------------------------
64// Constants
65// ---------------------------------------------------------------------------
66
/// Minimum segment size (64 KiB); also the required size-alignment granularity.
static constexpr size_t WAL_MMAP_MIN_SEGMENT = 65536ULL;
/// Default segment size: 64 MiB.
static constexpr size_t WAL_MMAP_DEFAULT_SEGMENT = 64ULL * 1024ULL * 1024ULL;
/// Maximum segment size: 16 GiB.
static constexpr size_t WAL_MMAP_MAX_SEGMENT = 16ULL * 1024ULL * 1024ULL * 1024ULL;
/// Maximum number of slots in the WalMmapWriter segment ring.
static constexpr size_t WAL_MMAP_MAX_RING = 16;
/// Per-record payload cap (64 MiB), enforced in WalMmapWriter::append().
static constexpr size_t WAL_MMAP_MAX_RECORD_SIZE = 64ULL * 1024ULL * 1024ULL;
77
78// ---------------------------------------------------------------------------
79// detail_mmap — private helpers (no dependency on wal.hpp detail:: namespace)
80// ---------------------------------------------------------------------------
81
83namespace detail_mmap {
84
/// Return the current wall-clock time in nanoseconds since the Unix epoch.
///
/// Rewritten on std::chrono::system_clock (whose epoch is the Unix epoch as
/// of C++20, and in practice on all mainstream implementations): the original
/// used timespec_get / ::clock_gettime(CLOCK_REALTIME), which require <ctime>
/// — a header this file never includes — and needed a _WIN32 #ifdef.
/// <chrono> is already in the include list, so this is both portable and
/// self-contained.
inline int64_t now_ns() noexcept {
    const auto since_epoch = std::chrono::system_clock::now().time_since_epoch();
    return static_cast<int64_t>(
        std::chrono::duration_cast<std::chrono::nanoseconds>(since_epoch).count());
}
95
/// Serialize a 32-bit value into 4 bytes, least-significant byte first
/// (little-endian), independent of host byte order.
inline void write_le32(uint8_t* dst, uint32_t v) noexcept {
    for (int byte = 0; byte < 4; ++byte)
        dst[byte] = static_cast<uint8_t>(v >> (8 * byte));
}
103
/// Serialize a 64-bit value into 8 bytes, least-significant byte first
/// (little-endian), independent of host byte order.
inline void write_le64(uint8_t* dst, uint64_t v) noexcept {
    for (int byte = 0; byte < 8; ++byte)
        dst[byte] = static_cast<uint8_t>(v >> (8 * byte));
}
115
/// Per-record magic value written (little-endian) at the start of every record.
static constexpr uint32_t MMAP_WAL_RECORD_MAGIC = 0x57414C31u;
/// Size of the file header that precedes the first record (records start at byte 16).
static constexpr uint16_t MMAP_WAL_FILE_HDR_SIZE = 16;
/// 16-byte file magic at byte 0: "SIGNETWAL1" NUL-padded — same format as WalWriter.
static constexpr char MMAP_WAL_FILE_MAGIC[16] = {'S','I','G','N','E','T','W','A','L','1',
                                                 '\0','\0','\0','\0','\0','\0'};
123
124} // namespace detail_mmap
125
126// ---------------------------------------------------------------------------
127// MappedSegment — RAII cross-platform mmap'd file
128// ---------------------------------------------------------------------------
129
public:
    /// Default-construct an invalid (unmapped) segment.
    MappedSegment() = default;
    /// RAII: unmap the region and close OS handles on destruction.
    ~MappedSegment() noexcept { close(); }
    /// Non-copyable: the mapping and its OS handles have unique ownership.
    MappedSegment(const MappedSegment&) = delete;
146
148 :
149#if defined(_WIN32)
150 hFile_(o.hFile_), hMap_(o.hMap_),
151#else
152 fd_(o.fd_),
153#endif
154 ptr_(o.ptr_), size_(o.size_), path_(std::move(o.path_))
155 {
156#if defined(_WIN32)
157 o.hFile_ = INVALID_HANDLE_VALUE;
158 o.hMap_ = nullptr;
159#else
160 o.fd_ = -1;
161#endif
162 o.ptr_ = nullptr;
163 o.size_ = 0;
164 }
165
167 if (this != &o) {
168 close();
169#if defined(_WIN32)
170 hFile_ = o.hFile_; o.hFile_ = INVALID_HANDLE_VALUE;
171 hMap_ = o.hMap_; o.hMap_ = nullptr;
172#else
173 fd_ = o.fd_; o.fd_ = -1;
174#endif
175 ptr_ = o.ptr_; o.ptr_ = nullptr;
176 size_ = o.size_; o.size_ = 0;
177 path_ = std::move(o.path_);
178 }
179 return *this;
180 }
181
191 static expected<MappedSegment> create(const std::string& path, size_t size) noexcept {
192 if (size < WAL_MMAP_MIN_SEGMENT || size > WAL_MMAP_MAX_SEGMENT)
193 return Error{ErrorCode::IO_ERROR, "MappedSegment: invalid size"};
194 if (size % WAL_MMAP_MIN_SEGMENT != 0)
195 return Error{ErrorCode::IO_ERROR, "MappedSegment: size must be multiple of 65536"};
196
197 MappedSegment seg;
198 seg.size_ = size;
199 seg.path_ = path;
200
201#if defined(_WIN32)
202 // --- Windows path ---
203 seg.hFile_ = CreateFileA(
204 path.c_str(),
205 GENERIC_READ | GENERIC_WRITE,
206 FILE_SHARE_READ | FILE_SHARE_WRITE,
207 nullptr,
208 CREATE_ALWAYS,
209 FILE_ATTRIBUTE_NORMAL,
210 nullptr);
211 if (seg.hFile_ == INVALID_HANDLE_VALUE) {
213 std::string("MappedSegment: CreateFileA failed for '") + path + "'"};
214 }
215
216 // Extend file to requested size
217 LARGE_INTEGER li;
218 li.QuadPart = static_cast<LONGLONG>(size);
219 if (!SetFilePointerEx(seg.hFile_, li, nullptr, FILE_BEGIN) ||
220 !SetEndOfFile(seg.hFile_)) {
221 CloseHandle(seg.hFile_);
222 seg.hFile_ = INVALID_HANDLE_VALUE;
224 std::string("MappedSegment: SetEndOfFile failed for '") + path + "'"};
225 }
226
227 DWORD size_hi = static_cast<DWORD>(size >> 32);
228 DWORD size_lo = static_cast<DWORD>(size & 0xFFFFFFFFULL);
229 seg.hMap_ = CreateFileMappingA(
230 seg.hFile_,
231 nullptr,
232 PAGE_READWRITE,
233 size_hi,
234 size_lo,
235 nullptr);
236 if (!seg.hMap_) {
237 CloseHandle(seg.hFile_);
238 seg.hFile_ = INVALID_HANDLE_VALUE;
240 std::string("MappedSegment: CreateFileMappingA failed for '") + path + "'"};
241 }
242
243 void* ptr = MapViewOfFile(seg.hMap_, FILE_MAP_ALL_ACCESS, 0, 0, size);
244 if (!ptr) {
245 CloseHandle(seg.hMap_);
246 CloseHandle(seg.hFile_);
247 seg.hMap_ = nullptr;
248 seg.hFile_ = INVALID_HANDLE_VALUE;
250 std::string("MappedSegment: MapViewOfFile failed for '") + path + "'"};
251 }
252 seg.ptr_ = static_cast<uint8_t*>(ptr);
253
254#else
255 // --- POSIX path ---
256 int fd = ::open(path.c_str(), O_RDWR | O_CREAT | O_TRUNC, 0600);
257 if (fd < 0) {
259 std::string("MappedSegment: open failed for '") + path + "': " + std::strerror(errno)};
260 }
261
262#if defined(__APPLE__)
263 // Best-effort pre-allocation hint on macOS
264 struct fstore fs{};
265 fs.fst_flags = F_ALLOCATECONTIG;
266 fs.fst_posmode = F_PEOFPOSMODE;
267 fs.fst_offset = 0;
268 fs.fst_length = static_cast<off_t>(size);
269 // Ignore return — fallback to non-contiguous if it fails
270 if (::fcntl(fd, F_PREALLOCATE, &fs) < 0) {
271 fs.fst_flags = F_ALLOCATEALL;
272 (void)::fcntl(fd, F_PREALLOCATE, &fs);
273 }
274#endif
275
276 if (::ftruncate(fd, static_cast<off_t>(size)) < 0) {
277 ::close(fd);
279 std::string("MappedSegment: ftruncate failed for '") + path + "': " + std::strerror(errno)};
280 }
281
282 void* ptr = ::mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
283 if (ptr == MAP_FAILED) {
284 ::close(fd);
286 std::string("MappedSegment: mmap failed for '") + path + "': " + std::strerror(errno)};
287 }
288
289 // Sequential access hint — best-effort
290 ::madvise(ptr, size, MADV_SEQUENTIAL);
291
292 seg.fd_ = fd;
293 seg.ptr_ = static_cast<uint8_t*>(ptr);
294#endif
295
296 return seg;
297 }
298
300 [[nodiscard]] uint8_t* data() const noexcept { return ptr_; }
302 [[nodiscard]] size_t capacity() const noexcept { return size_; }
304 [[nodiscard]] bool is_valid() const noexcept { return ptr_ != nullptr; }
306 [[nodiscard]] const std::string& path() const noexcept { return path_; }
307
311 [[nodiscard]] expected<void> flush_async() noexcept {
312 if (!ptr_) return {};
313#if defined(_WIN32)
314 if (!FlushViewOfFile(ptr_, size_)) {
316 "MappedSegment: FlushViewOfFile failed for '" + path_ + "'"};
317 }
318#else
319 if (::msync(ptr_, size_, MS_ASYNC) != 0) {
321 std::string("MappedSegment: msync(MS_ASYNC) failed for '") +
322 path_ + "': " + std::strerror(errno)};
323 }
324#endif
325 return {};
326 }
327
331 [[nodiscard]] expected<void> flush_sync() noexcept {
332 if (!ptr_) return {};
333#if defined(_WIN32)
334 if (!FlushViewOfFile(ptr_, size_)) {
336 "MappedSegment: FlushViewOfFile failed for '" + path_ + "'"};
337 }
338 if (hFile_ != INVALID_HANDLE_VALUE && !FlushFileBuffers(hFile_)) {
340 "MappedSegment: FlushFileBuffers failed for '" + path_ + "'"};
341 }
342#else
343 if (::msync(ptr_, size_, MS_SYNC) != 0) {
345 std::string("MappedSegment: msync(MS_SYNC) failed for '") +
346 path_ + "': " + std::strerror(errno)};
347 }
348#endif
349 return {};
350 }
351
    /// Unmap the region and close all OS handles. Idempotent: a second call
    /// (or a call on a default-constructed segment) is a no-op.
    void close() noexcept {
        if (!ptr_) return;
#if defined(_WIN32)
        // STRICT ORDER (Windows Issue #3): Unmap → CloseMap → CloseFile
        UnmapViewOfFile(ptr_);
        ptr_ = nullptr;
        if (hMap_ != nullptr) {
            CloseHandle(hMap_);
            hMap_ = nullptr;
        }
        if (hFile_ != INVALID_HANDLE_VALUE) {
            CloseHandle(hFile_);
            hFile_ = INVALID_HANDLE_VALUE;
        }
#else
        // POSIX: unmap the region first, then close the backing descriptor.
        ::munmap(ptr_, size_);
        ptr_ = nullptr;
        if (fd_ >= 0) {
            ::close(fd_);
            fd_ = -1;
        }
#endif
        size_ = 0;
    }
377
    /// Touch one byte in every 4 KiB page of the mapping so the OS faults all
    /// pages in now, keeping page-fault latency off the writer's hot path.
    void prefault() noexcept {
        if (!ptr_) return;
        // volatile prevents the compiler from eliding the otherwise-dead reads.
        volatile const uint8_t* p = ptr_;
        volatile uint8_t sink = 0;
        for (size_t i = 0; i < size_; i += 4096)
            sink = p[i];
        (void)sink;
        std::atomic_thread_fence(std::memory_order_release);
    }
391
private:
#if defined(_WIN32)
    HANDLE hFile_ = INVALID_HANDLE_VALUE;  // backing file handle
    HANDLE hMap_ = nullptr;                // file-mapping object
#else
    int fd_ = -1;                          // backing file descriptor
#endif
    uint8_t* ptr_ = nullptr;               // start of mapped region (null = unmapped)
    size_t size_ = 0;                      // mapped size in bytes
    std::string path_;                     // path of the backing file
};
403
404// ---------------------------------------------------------------------------
405// WalMmapOptions — options for WalMmapWriter::open()
406// (defined at namespace scope — Apple Clang default-member-init restriction)
407// ---------------------------------------------------------------------------
408
    // NOTE(review): the `struct WalMmapOptions {` opening line appears to have
    // been lost in extraction — confirm against the original header.
    std::string dir;                                 // output directory for segment files
    std::string name_prefix = "wal_mmap";            // filename prefix for segment files
    size_t ring_segments = 4;                        // ring slots, validated to [2, WAL_MMAP_MAX_RING]
    size_t segment_size = WAL_MMAP_DEFAULT_SEGMENT;  // bytes per segment, multiple of 65536
    bool sync_on_append = false;                     // flush_async() after every append
    bool sync_on_flush = false;                      // flush() uses flush_sync() instead of flush_async()
    bool use_large_pages = false;                    // huge-pages hint — not read in visible code; confirm
    int64_t start_seq = 0;                           // sequence number of the first record
};
425
426// ---------------------------------------------------------------------------
427// WalMmapWriter — ring of N memory-mapped WAL segments with bg pre-allocation
428// ---------------------------------------------------------------------------
429
451public:
452 WalMmapWriter(const WalMmapWriter&) = delete;
454
456 // If the source has a running bg thread, stop it before transferring.
457 // If bg_deferred_, the thread was never started — skip join.
458 if (!o.bg_deferred_) {
459 o.bg_stop_.store(true, std::memory_order_release);
460 o.bg_cv_.notify_all();
461 if (o.bg_thread_.joinable())
462 o.bg_thread_.join();
463 }
464
465 opts_ = std::move(o.opts_);
466 ring_ = std::move(o.ring_);
467 active_idx_.store(o.active_idx_.load(std::memory_order_acquire), std::memory_order_release);
468 next_seq_.store(o.next_seq_.load(std::memory_order_acquire), std::memory_order_release);
469 next_seg_id_ = o.next_seg_id_;
470 closed_.store(o.closed_.load(std::memory_order_acquire), std::memory_order_release);
471 bg_deferred_ = false;
472 bg_stop_.store(false, std::memory_order_release);
473 if (!closed_.load(std::memory_order_acquire) && !ring_.empty())
474 bg_thread_ = std::thread(&WalMmapWriter::bg_worker, this);
475
476 o.closed_.store(true, std::memory_order_release);
477 o.active_idx_.store(0, std::memory_order_release);
478 }
479
481 if (this != &o) {
482 (void)close();
483 // Stop source bg thread (unless deferred)
484 if (!o.bg_deferred_) {
485 o.bg_stop_.store(true, std::memory_order_release);
486 o.bg_cv_.notify_all();
487 if (o.bg_thread_.joinable())
488 o.bg_thread_.join();
489 }
490
491 opts_ = std::move(o.opts_);
492 ring_ = std::move(o.ring_);
493 active_idx_.store(o.active_idx_.load(std::memory_order_acquire), std::memory_order_release);
494 next_seq_.store(o.next_seq_.load(std::memory_order_acquire), std::memory_order_release);
495 next_seg_id_ = o.next_seg_id_;
496 closed_.store(o.closed_.load(std::memory_order_acquire), std::memory_order_release);
497 bg_deferred_ = false;
498 bg_stop_.store(false, std::memory_order_release);
499 if (!closed_.load(std::memory_order_acquire) && !ring_.empty())
500 bg_thread_ = std::thread(&WalMmapWriter::bg_worker, this);
501
502 o.closed_.store(true, std::memory_order_release);
503 o.active_idx_.store(0, std::memory_order_release);
504 }
505 return *this;
506 }
507
509 if (!closed_.load(std::memory_order_acquire))
510 (void)close();
511 }
512
522 // --- Validate options ---
523 if (opts.ring_segments < 2 || opts.ring_segments > WAL_MMAP_MAX_RING)
525 "WalMmapWriter: ring_segments must be in [2, WAL_MMAP_MAX_RING]"};
526 if (opts.segment_size < WAL_MMAP_MIN_SEGMENT ||
527 opts.segment_size > WAL_MMAP_MAX_SEGMENT)
528 return Error{ErrorCode::IO_ERROR, "WalMmapWriter: invalid segment_size"};
529 if (opts.segment_size % WAL_MMAP_MIN_SEGMENT != 0)
531 "WalMmapWriter: segment_size must be multiple of 65536"};
532 if (opts.dir.empty())
533 return Error{ErrorCode::IO_ERROR, "WalMmapWriter: dir must not be empty"};
534
535 // Security: reject path traversal
536 // Check for ".." components using filesystem path parsing
537 {
538 std::filesystem::path p(opts.dir);
539 for (const auto& comp : p) {
540 if (comp == "..")
542 "WalMmapWriter: path traversal detected in dir"};
543 }
544 }
545 // Also reject literal ".." substring patterns
546 if (opts.dir.find("..") != std::string::npos)
548 "WalMmapWriter: path traversal detected in dir"};
549
550 // --- Create directory ---
551 {
552 std::error_code ec;
553 std::filesystem::create_directories(opts.dir, ec);
554 if (ec) {
555 // If directory already exists that's fine; any other error is fatal
556 std::error_code ec2;
557 if (!std::filesystem::is_directory(opts.dir, ec2))
559 std::string("WalMmapWriter: cannot create dir '") +
560 opts.dir + "': " + ec.message()};
561 }
562 }
563
564 // --- Construct writer ---
565 WalMmapWriter w(opts);
566 w.next_seq_.store(opts.start_seq, std::memory_order_relaxed);
567
568 // Allocate ring slots (heap-allocated to avoid copying atomics)
569 for (size_t i = 0; i < opts.ring_segments; ++i)
570 w.ring_.push_back(std::make_unique<RingSlot>());
571
572 // Synchronously allocate + prefault slot 0 (writer thread, blocking is fine once)
573 uint64_t first_id;
574 { std::lock_guard<std::mutex> lk(w.bg_mutex_); first_id = w.next_seg_id_++; }
575 auto r0 = w.allocate_slot(*w.ring_[0], first_id, /*skip_prefault=*/false);
576 if (!r0) return r0.error();
577
578 w.ring_[0]->first_seq = static_cast<uint64_t>(w.next_seq_.load(std::memory_order_relaxed));
579 w.ring_[0]->write_offset = 0;
580 w.ring_[0]->state.store(SlotState::ACTIVE, std::memory_order_release);
581 w.active_idx_.store(0, std::memory_order_release);
582 w.init_slot_header(*w.ring_[0]);
583
584 // Background thread is started by the move constructor to avoid a
585 // data race: starting the thread here would give it a pointer to the
586 // local `w`, which is about to be moved into expected<WalMmapWriter>.
587 w.bg_deferred_ = true;
588
589 return w;
590 }
591
    /// Append one record (24-byte header + payload + 4-byte CRC) to the active
    /// mmap'd segment, rotating to a standby segment when full.
    /// Returns the record's sequence number. Single-writer only — not safe for
    /// concurrent append() calls.
    [[nodiscard]] expected<int64_t> append(const uint8_t* data, size_t size) {
        if (closed_.load(std::memory_order_acquire))
            return Error{ErrorCode::IO_ERROR, "WalMmapWriter: already closed"};
        if (size > static_cast<size_t>(UINT32_MAX))
            return Error{ErrorCode::IO_ERROR, "WalMmapWriter: record too large (> 4 GB)"};
        // CWE-400: Uncontrolled Resource Consumption — enforce WAL_MAX_RECORD_SIZE cap.
        if (size > WAL_MMAP_MAX_RECORD_SIZE)
            return Error{ErrorCode::IO_ERROR, "WalMmapWriter: record exceeds WAL_MAX_RECORD_SIZE (64 MB)"};

        const size_t entry_size = 28 + size; // 24-byte hdr + data + 4-byte crc

        if (entry_size > usable())
            // NOTE(review): a `return Error{ErrorCode::IO_ERROR,` line appears
            // to have been lost in extraction here — restore from the original.
            "WalMmapWriter: record larger than segment usable space"};

        // Check if active slot has room; rotate if not
        RingSlot* active = ring_[active_idx_.load(std::memory_order_acquire)].get();
        if (active->write_offset + entry_size > usable()) {
            auto r = rotate();
            if (!r) return r.error();
            active = ring_[active_idx_.load(std::memory_order_acquire)].get();
        }

        // CWE-617: Reachable Assertion — runtime bounds check replaces assert()
        // so that out-of-bounds writes are caught in Release builds (not just Debug).
        if (active->seg.data() == nullptr ||
            active->seg.data() + detail_mmap::MMAP_WAL_FILE_HDR_SIZE +
                active->write_offset + entry_size
                > active->seg.data() + active->seg.capacity()) {
            // NOTE(review): a `return Error{ErrorCode::IO_ERROR,` line appears
            // to have been lost in extraction here — restore from the original.
            "WalMmapWriter: write would exceed mmap'd segment bounds"};
        }
        assert(active->seg.data() != nullptr);
        assert(active->seg.data() + detail_mmap::MMAP_WAL_FILE_HDR_SIZE +
                   active->write_offset + entry_size
               <= active->seg.data() + active->seg.capacity());

        const int64_t seq = next_seq_.fetch_add(1, std::memory_order_relaxed);
        const int64_t ts = detail_mmap::now_ns();
        const auto dsz = static_cast<uint32_t>(size);

        uint8_t* dst = active->seg.data()
                     + detail_mmap::MMAP_WAL_FILE_HDR_SIZE
                     + active->write_offset;

        // Write 24-byte record header
        detail_mmap::write_le32(dst, detail_mmap::MMAP_WAL_RECORD_MAGIC);
        detail_mmap::write_le64(dst + 4, static_cast<uint64_t>(seq));
        detail_mmap::write_le64(dst + 12, static_cast<uint64_t>(ts));
        detail_mmap::write_le32(dst + 20, dsz);

        // Write payload
        if (size > 0)
            std::memcpy(dst + 24, data, size);

        // Compute CRC over header (24 bytes) + data (contiguous in mmap buffer)
        const uint32_t crc = compute_crc(dst, 24 + size);

        // RELEASE FENCE: ensure header + data bytes are visible before CRC
        // (crash safety on ARM/POWER architectures with weak memory ordering)
        std::atomic_thread_fence(std::memory_order_release);

        // Write CRC LAST — crash safety: WalReader stops at missing/bad CRC
        detail_mmap::write_le32(dst + 24 + size, crc);

        active->write_offset += entry_size;
        active->last_seq = static_cast<uint64_t>(seq);

        if (opts_.sync_on_append) {
            auto flush_result = active->seg.flush_async();
            if (!flush_result) return flush_result.error();
        }

        // Wake bg thread early when active segment is 75% full
        if (active->write_offset > usable() * 3 / 4)
            bg_cv_.notify_one();

        return seq;
    }
680
    /// Append a record from a char buffer — thin wrapper over the uint8_t* overload.
    [[nodiscard]] expected<int64_t> append(const char* data, size_t size) {
        return append(reinterpret_cast<const uint8_t*>(data), size);
    }
688
    /// Append a record from a string_view — delegates to the uint8_t* overload.
    [[nodiscard]] expected<int64_t> append(std::string_view sv) {
        return append(reinterpret_cast<const uint8_t*>(sv.data()), sv.size());
    }
695
    /// Append a record from a byte vector — delegates to the uint8_t* overload.
    [[nodiscard]] expected<int64_t> append(const std::vector<uint8_t>& v) {
        return append(v.data(), v.size());
    }
702
    /// Flush the active segment. When do_sync (or opts_.sync_on_flush) is set,
    /// performs a blocking flush_sync(); otherwise a non-blocking flush_async().
    [[nodiscard]] expected<void> flush(bool do_sync = false) {
        if (closed_.load(std::memory_order_acquire))
            return Error{ErrorCode::IO_ERROR, "WalMmapWriter: already closed"};
        RingSlot& active = *ring_[active_idx_.load(std::memory_order_acquire)];
        if (do_sync || opts_.sync_on_flush)
            return active.seg.flush_sync();
        return active.seg.flush_async();
    }
714
    /// Close the writer: stop the background thread, sync-flush the ACTIVE
    /// slot, and close every segment. Idempotent. Returns the first flush
    /// error encountered (segments are still closed either way).
    [[nodiscard]] expected<void> close() {
        if (closed_.load(std::memory_order_acquire)) return {};
        closed_.store(true, std::memory_order_release);

        // Stop background thread
        bg_stop_.store(true, std::memory_order_release);
        bg_cv_.notify_all();
        if (bg_thread_.joinable())
            bg_thread_.join();

        // Flush and close all slots
        std::optional<Error> first_error;
        for (auto& sp : ring_) {
            if (!sp) continue;
            const auto st = sp->state.load(std::memory_order_acquire);
            if (st == SlotState::ACTIVE) {
                // Only the ACTIVE slot can hold records not yet flushed.
                auto flush_result = sp->seg.flush_sync();
                if (!flush_result && !first_error.has_value()) {
                    first_error = flush_result.error();
                }
            }
            sp->seg.close();
        }
        if (first_error.has_value()) return *first_error;
        return {};
    }
743
    /// True while the writer has not been closed.
    [[nodiscard]] bool is_open() const noexcept { return !closed_.load(std::memory_order_acquire); }
    /// Next sequence number that append() will assign.
    [[nodiscard]] int64_t next_seq() const noexcept { return next_seq_.load(std::memory_order_acquire); }
    /// Output directory the writer was opened with.
    [[nodiscard]] const std::string& dir() const noexcept { return opts_.dir; }
750
    /// Return the sorted paths of all live ring segments plus any files in
    /// opts_.dir whose names start with opts_.name_prefix (e.g. segments left
    /// over from previous sessions). Duplicates are removed.
    [[nodiscard]] std::vector<std::string> segment_paths() const {
        std::vector<std::string> paths;

        // Hold bg_mutex_ while reading ring slots — the bg thread may be
        // writing to file_path in allocate_slot() concurrently.
        {
            std::lock_guard<std::mutex> lk(bg_mutex_);
            for (const auto& sp : ring_) {
                const auto st = sp->state.load(std::memory_order_acquire);
                if (!sp->file_path.empty() &&
                    st != SlotState::FREE && st != SlotState::ALLOCATING)
                    paths.push_back(sp->file_path);
            }
        }

        // Also scan directory for older segment files from previous sessions
        std::error_code ec;
        for (const auto& entry : std::filesystem::directory_iterator(opts_.dir, ec)) {
            if (ec) break;
            if (!entry.is_regular_file()) continue;
            const std::string fname = entry.path().filename().string();
            if (fname.size() < opts_.name_prefix.size() ||
                fname.substr(0, opts_.name_prefix.size()) != opts_.name_prefix)
                continue;
            const std::string fpath = entry.path().string();
            // Skip files already reported from the live ring above.
            if (std::find(paths.begin(), paths.end(), fpath) == paths.end())
                paths.push_back(fpath);
        }

        std::sort(paths.begin(), paths.end());
        return paths;
    }
787
788private:
    /// Lifecycle of one ring slot, as driven by append()/rotate()/bg_worker():
    /// FREE → ALLOCATING → STANDBY → ACTIVE → DRAINING → (recycled to) FREE.
    enum class SlotState : uint8_t {
        FREE = 0,        // no segment mapped in this slot
        ALLOCATING = 1,  // a thread is creating/mapping a fresh segment
        STANDBY = 2,     // fresh segment ready to become ACTIVE
        ACTIVE = 3,      // currently receiving appends
        DRAINING = 4     // exhausted; waiting for the bg thread to recycle it
    };

    /// One slot of the segment ring. Non-copyable: owns a MappedSegment and atomics.
    struct RingSlot {
        MappedSegment seg;
        std::atomic<size_t> write_offset{0}; // bytes written past WAL_FILE_HDR_SIZE
        uint64_t segment_id{0};              // monotonically increasing id (names the file)
        uint64_t first_seq{0};               // first record sequence in this segment
        uint64_t last_seq{0};                // last record sequence in this segment
        std::string file_path;               // backing file path (written under bg_mutex_)
        std::atomic<SlotState> state{SlotState::FREE};

        RingSlot() = default;
        RingSlot(const RingSlot&) = delete;
        RingSlot& operator=(const RingSlot&) = delete;
    };
812
    /// Private: open() is the only way to construct a working writer.
    explicit WalMmapWriter(WalMmapOptions opts)
        : opts_(std::move(opts)) {}

    /// Usable record bytes per segment: capacity minus the 16-byte file header.
    size_t usable() const noexcept {
        return opts_.segment_size - static_cast<size_t>(detail_mmap::MMAP_WAL_FILE_HDR_SIZE);
    }
820
    /// Build "<dir>/<prefix>_<10-digit zero-padded id>.wal" for a segment id.
    std::string make_segment_path(uint64_t segment_id) const {
        char buf[32];  // 20-digit max uint64 + NUL fits comfortably
        std::snprintf(buf, sizeof(buf), "%010llu", static_cast<unsigned long long>(segment_id));
        return opts_.dir + "/" + opts_.name_prefix + "_" + buf + ".wal";
    }
827
    /// Stamp the 16-byte file magic at offset 0 so the segment is a valid
    /// (empty) WAL file; the release fence orders the header bytes before
    /// any subsequently written record bytes.
    void init_slot_header(RingSlot& slot) noexcept {
        std::memcpy(slot.seg.data(), detail_mmap::MMAP_WAL_FILE_MAGIC, 16);
        std::atomic_thread_fence(std::memory_order_release);
    }
833
    /// Create + mmap a fresh segment file into `slot` (optionally prefaulting
    /// its pages), write the file header, and publish the slot as STANDBY.
    /// On failure the slot is returned to FREE so a later attempt can retry.
    /// Caller must have already moved the slot into the ALLOCATING state.
    expected<void> allocate_slot(RingSlot& slot, uint64_t segment_id, bool skip_prefault) {
        slot.segment_id = segment_id;
        // file_path is read by segment_paths() under bg_mutex_ — write under the same lock.
        { std::lock_guard<std::mutex> lk(bg_mutex_); slot.file_path = make_segment_path(segment_id); }
        slot.write_offset = 0;
        slot.first_seq = 0;
        slot.last_seq = 0;

        auto r = MappedSegment::create(slot.file_path, opts_.segment_size);
        if (!r) {
            slot.state.store(SlotState::FREE, std::memory_order_release);
            return r.error();
        }
        slot.seg = std::move(r.value());

        if (!skip_prefault)
            slot.seg.prefault();

        // Standby segments are initialized as syntactically valid empty WAL files
        // so recovery/compaction can safely parse or skip them deterministically.
        init_slot_header(slot);
        auto flush_result = slot.seg.flush_async();
        if (!flush_result) return flush_result.error();

        slot.state.store(SlotState::STANDBY, std::memory_order_release);
        return {};
    }
861
    /// Retire the current ACTIVE slot (→ DRAINING) and promote a STANDBY slot
    /// to ACTIVE. If no STANDBY exists, falls back to synchronously allocating
    /// a FREE slot on the writer thread (skipping prefault to bound latency).
    /// Errors if every slot is occupied.
    expected<void> rotate() {
        RingSlot& old_slot = *ring_[active_idx_.load(std::memory_order_acquire)];
        old_slot.last_seq = static_cast<uint64_t>(next_seq_.load(std::memory_order_acquire) - 1);

        // Async flush before setting DRAINING so the bg thread doesn't close
        // the segment before the flush is scheduled
        auto flush_result = old_slot.seg.flush_async();
        if (!flush_result) return flush_result.error();
        old_slot.state.store(SlotState::DRAINING, std::memory_order_release);

        // Try to find a STANDBY slot pre-allocated by the bg thread
        int standby_idx = find_standby_idx();

        if (standby_idx < 0) {
            // No standby available — fallback: find a FREE slot and allocate synchronously
            // (writer thread, skip prefault to minimize latency)
            for (size_t i = 0; i < ring_.size(); ++i) {
                SlotState expected_s = SlotState::FREE;
                // CAS claims the slot so the bg thread cannot race us into it.
                if (ring_[i]->state.compare_exchange_strong(
                        expected_s, SlotState::ALLOCATING,
                        std::memory_order_acq_rel,
                        std::memory_order_relaxed)) {
                    uint64_t new_id;
                    { std::lock_guard<std::mutex> lk(bg_mutex_); new_id = next_seg_id_++; }
                    auto r = allocate_slot(*ring_[i], new_id, /*skip_prefault=*/true);
                    if (!r) return r.error();
                    standby_idx = static_cast<int>(i);
                    break;
                }
            }
        }

        if (standby_idx < 0)
            return Error{ErrorCode::IO_ERROR,
                         "WalMmapWriter: ring full — all segments occupied. "
                         "Process WAL segments before appending more data."};

        RingSlot& new_slot = *ring_[static_cast<size_t>(standby_idx)];
        new_slot.first_seq = static_cast<uint64_t>(next_seq_.load(std::memory_order_acquire));
        new_slot.write_offset = 0;
        init_slot_header(new_slot);
        new_slot.state.store(SlotState::ACTIVE, std::memory_order_release);
        active_idx_.store(static_cast<size_t>(standby_idx), std::memory_order_release);

        // Let the bg thread recycle the DRAINING slot promptly.
        bg_cv_.notify_one();
        return {};
    }
910
912 int find_standby_idx() const noexcept {
913 for (size_t i = 0; i < ring_.size(); ++i) {
914 if (ring_[i]->state.load(std::memory_order_acquire) == SlotState::STANDBY)
915 return static_cast<int>(i);
916 }
917 return -1;
918 }
919
921 static uint32_t compute_crc(const uint8_t* data, size_t len) noexcept {
922 static constexpr auto make_table = []() {
923 std::array<uint32_t, 256> t{};
924 for (uint32_t i = 0; i < 256; ++i) {
925 uint32_t c = i;
926 for (int k = 0; k < 8; ++k)
927 c = (c & 1u) ? (0xEDB88320u ^ (c >> 1)) : (c >> 1);
928 t[i] = c;
929 }
930 return t;
931 };
932 static constexpr auto tbl = make_table();
933 uint32_t crc = 0xFFFFFFFFu;
934 for (size_t i = 0; i < len; ++i)
935 crc = tbl[(crc ^ data[i]) & 0xFFu] ^ (crc >> 8);
936 return crc ^ 0xFFFFFFFFu;
937 }
938
    /// Background thread loop: recycles DRAINING slots into fresh STANDBY
    /// segments and keeps at least one STANDBY available. Wakes on bg_cv_
    /// notification or every 100 ms; exits after a final service pass once
    /// bg_stop_ is set. Exceptions are swallowed — the thread must not throw.
    void bg_worker() {
        while (true) {
            try {
                {
                    std::unique_lock<std::mutex> lk(bg_mutex_);
                    bg_cv_.wait_for(lk, std::chrono::milliseconds(100), [this]() -> bool {
                        if (bg_stop_.load(std::memory_order_relaxed)) return true;
                        // Wake if any slot needs servicing
                        for (const auto& sp : ring_) {
                            const auto st = sp->state.load(std::memory_order_acquire);
                            if (st == SlotState::DRAINING || st == SlotState::FREE)
                                return true;
                        }
                        // Wake if active slot is getting full
                        const size_t aidx = active_idx_.load(std::memory_order_acquire);
                        if (aidx < ring_.size()) {
                            const RingSlot& a = *ring_[aidx];
                            if (a.write_offset > usable() * 3 / 4)
                                return true;
                        }
                        return false;
                    });
                }

                // Latch stop BEFORE servicing so a final pass still runs below.
                const bool stop = bg_stop_.load(std::memory_order_relaxed);

                // Step 1: Recycle DRAINING slots — close + re-create as fresh segments
                for (auto& sp : ring_) {
                    RingSlot& slot = *sp;
                    if (slot.state.load(std::memory_order_acquire) != SlotState::DRAINING)
                        continue;

                    // Close (unmap) the exhausted segment
                    slot.seg.close();

                    // Allocate a new segment into this slot
                    uint64_t new_id;
                    { std::lock_guard<std::mutex> lk(bg_mutex_); new_id = next_seg_id_++; }

                    // Transition DRAINING → FREE so allocate_slot CAS succeeds
                    slot.state.store(SlotState::FREE, std::memory_order_release);

                    // allocate_slot does its own CAS (FREE → ALLOCATING → STANDBY)
                    // Use a temporary ALLOCATING state here
                    SlotState expected_s = SlotState::FREE;
                    if (slot.state.compare_exchange_strong(
                            expected_s, SlotState::ALLOCATING,
                            std::memory_order_acq_rel,
                            std::memory_order_relaxed)) {
                        auto alloc_result = allocate_slot(slot, new_id, /*skip_prefault=*/false);
                        if (!alloc_result) {
                            fprintf(stderr, "[SIGNET ERROR] WalMmapWriter bg: allocate_slot failed: %s\n",
                                    alloc_result.error().message.c_str());
                            // Slot transitions back to FREE so rotate() can retry
                        }
                    }
                }

                // Step 2: Ensure at least 1 STANDBY slot exists
                int standby_count = 0;
                for (const auto& sp : ring_) {
                    if (sp->state.load(std::memory_order_acquire) == SlotState::STANDBY)
                        ++standby_count;
                }
                if (standby_count < 1) {
                    for (auto& sp : ring_) {
                        RingSlot& slot = *sp;
                        SlotState expected_s = SlotState::FREE;
                        if (slot.state.compare_exchange_strong(
                                expected_s, SlotState::ALLOCATING,
                                std::memory_order_acq_rel,
                                std::memory_order_relaxed)) {
                            uint64_t new_id;
                            { std::lock_guard<std::mutex> lk(bg_mutex_); new_id = next_seg_id_++; }
                            // Best-effort: failure leaves the slot FREE for a retry.
                            (void)allocate_slot(slot, new_id, /*skip_prefault=*/false);
                            break;
                        }
                    }
                }

                if (stop) break;

            } catch (...) {
                // Prevent std::terminate — bg thread must not throw
                if (bg_stop_.load(std::memory_order_relaxed)) break;
            }
        }
    }
1028
    // --- Data members ---
    WalMmapOptions opts_;
    // unique_ptr slots: RingSlot holds atomics and is neither copyable nor movable.
    std::vector<std::unique_ptr<RingSlot>> ring_;
    std::atomic<size_t> active_idx_{0};  // CWE-362: atomic for lock-free publish path
    std::atomic<int64_t> next_seq_{0};   // H-11: atomic to prevent data race with bg thread
    uint64_t next_seg_id_ = 0;           // protected by bg_mutex_
    std::atomic<bool> closed_{false};    // CWE-362: atomic for safe cross-thread close detection
    bool bg_deferred_ = false;           // bg thread start deferred until after move

    std::thread bg_thread_;              // pre-allocation / recycling worker (bg_worker)
    mutable std::mutex bg_mutex_;        // guards next_seg_id_ and RingSlot::file_path
    std::condition_variable bg_cv_;      // wakes bg_worker for servicing
    std::atomic<bool> bg_stop_{false};   // signals bg_worker to exit
};
1043
1044} // namespace signet::forge
RAII cross-platform memory-mapped file segment.
MappedSegment & operator=(const MappedSegment &)=delete
MappedSegment & operator=(MappedSegment &&o) noexcept
uint8_t * data() const noexcept
Pointer to the start of the mapped memory region.
MappedSegment(const MappedSegment &)=delete
expected< void > flush_async() noexcept
Asynchronously flush dirty pages to storage (non-blocking).
const std::string & path() const noexcept
File path of this segment.
size_t capacity() const noexcept
Size of the mapped region in bytes.
void prefault() noexcept
Pre-fault every 4 KB page in the mapping by reading one byte per page.
static expected< MappedSegment > create(const std::string &path, size_t size) noexcept
Create (overwrite) and memory-map a new segment file of the given byte size.
expected< void > flush_sync() noexcept
Synchronously flush dirty pages to storage (blocks until complete).
MappedSegment()=default
Default-construct an invalid (unmapped) segment.
bool is_valid() const noexcept
Check whether the segment is currently mapped and valid.
MappedSegment(MappedSegment &&o) noexcept
void close() noexcept
Unmap and close all OS handles. Safe to call multiple times.
High-performance WAL writer using a ring of N memory-mapped segments.
expected< int64_t > append(std::string_view sv)
Append a record from a string_view.
expected< int64_t > append(const uint8_t *data, size_t size)
Append a record to the WAL.
int64_t next_seq() const noexcept
Return the next sequence number that will be assigned.
expected< void > flush(bool do_sync=false)
Flush the active segment to storage.
expected< int64_t > append(const char *data, size_t size)
Append a record from a char buffer.
std::vector< std::string > segment_paths() const
Return paths of all current ring segments plus any older WAL files in the directory.
expected< void > close()
Close the writer: stop the background thread, flush and close all segments.
WalMmapWriter(const WalMmapWriter &)=delete
WalMmapWriter & operator=(WalMmapWriter &&o)
bool is_open() const noexcept
Check whether the writer is still open (not yet closed).
static expected< WalMmapWriter > open(WalMmapOptions opts)
Open a new WalMmapWriter in the given directory.
expected< int64_t > append(const std::vector< uint8_t > &v)
Append a record from a byte vector.
WalMmapWriter & operator=(const WalMmapWriter &)=delete
const std::string & dir() const noexcept
Return the output directory path.
const Error & error() const
Access the error payload (valid for both success and failure; check ok() on the returned Error).
Definition error.hpp:261
A lightweight result type that holds either a success value of type T or an Error.
Definition error.hpp:145
void write_le32(uint8_t *dst, uint32_t v) noexcept
Write a 32-bit value in little-endian byte order.
int64_t now_ns() noexcept
Return the current wall-clock time in nanoseconds since epoch.
void write_le64(uint8_t *dst, uint64_t v) noexcept
Write a 64-bit value in little-endian byte order.
@ IO_ERROR
A file-system or stream I/O operation failed (open, read, write, rename).
Lightweight error value carrying an ErrorCode and a human-readable message.
Definition error.hpp:101
Configuration options for WalMmapWriter::open().
std::string name_prefix
Filename prefix for segment files.
size_t ring_segments
Number of ring segments (2 to WAL_MMAP_MAX_RING).
bool use_large_pages
Enable huge pages (requires OS privileges).
std::string dir
Output directory for segment files.
size_t segment_size
Size of each segment in bytes (multiple of 65536).
int64_t start_seq
Starting sequence number for the first record.
bool sync_on_flush
If true, use MS_SYNC on flush() instead of MS_ASYNC.
bool sync_on_append
If true, flush_async() after every append (HFT: false).