if (!o.bg_deferred_) {
    o.bg_stop_.store(true, std::memory_order_release);
    o.bg_cv_.notify_all();
    if (o.bg_thread_.joinable())
        o.bg_thread_.join();
}
opts_ = std::move(o.opts_);
ring_ = std::move(o.ring_);
active_idx_.store(o.active_idx_.load(std::memory_order_acquire), std::memory_order_release);
next_seq_.store(o.next_seq_.load(std::memory_order_acquire), std::memory_order_release);
next_seg_id_ = o.next_seg_id_;
closed_.store(o.closed_.load(std::memory_order_acquire), std::memory_order_release);
bg_deferred_ = false;
bg_stop_.store(false, std::memory_order_release);
if (!closed_.load(std::memory_order_acquire) && !ring_.empty())
    bg_thread_ = std::thread(&WalMmapWriter::bg_worker, this);
o.closed_.store(true, std::memory_order_release);
o.active_idx_.store(0, std::memory_order_release);
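// The second move operation below repeats the same sequence: quiesce the
// source's background worker, transfer its state, then restart a local worker
// if the writer is still open.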
if (!o.bg_deferred_) {
    o.bg_stop_.store(true, std::memory_order_release);
    o.bg_cv_.notify_all();
    if (o.bg_thread_.joinable())
        o.bg_thread_.join();
}
opts_ = std::move(o.opts_);
ring_ = std::move(o.ring_);
active_idx_.store(o.active_idx_.load(std::memory_order_acquire), std::memory_order_release);
next_seq_.store(o.next_seq_.load(std::memory_order_acquire), std::memory_order_release);
next_seg_id_ = o.next_seg_id_;
closed_.store(o.closed_.load(std::memory_order_acquire), std::memory_order_release);
bg_deferred_ = false;
bg_stop_.store(false, std::memory_order_release);
if (!closed_.load(std::memory_order_acquire) && !ring_.empty())
    bg_thread_ = std::thread(&WalMmapWriter::bg_worker, this);
o.closed_.store(true, std::memory_order_release);
o.active_idx_.store(0, std::memory_order_release);
if (!closed_.load(std::memory_order_acquire))
525 "WalMmapWriter: ring_segments must be in [2, WAL_MMAP_MAX_RING]"};
531 "WalMmapWriter: segment_size must be multiple of 65536"};
if (opts.dir.empty())
std::filesystem::path p(opts.dir);
for (const auto& comp : p) {
        "WalMmapWriter: path traversal detected in dir"};
if (opts.dir.find("..") != std::string::npos)
        "WalMmapWriter: path traversal detected in dir"};
std::filesystem::create_directories(opts.dir, ec);
if (!std::filesystem::is_directory(opts.dir, ec2))
        std::string("WalMmapWriter: cannot create dir '") +
            opts.dir + "': " + ec.message()};
w.next_seq_.store(opts.start_seq, std::memory_order_relaxed);
w.ring_.push_back(std::make_unique<RingSlot>());
uint64_t first_id = 0;  // declaration assumed; assigned under the lock below
{ std::lock_guard<std::mutex> lk(w.bg_mutex_); first_id = w.next_seg_id_++; }
auto r0 = w.allocate_slot(*w.ring_[0], first_id, false);
if (!r0) return r0.error();
w.ring_[0]->first_seq =
    static_cast<uint64_t>(w.next_seq_.load(std::memory_order_relaxed));
w.ring_[0]->write_offset = 0;
w.ring_[0]->state.store(SlotState::ACTIVE, std::memory_order_release);
w.active_idx_.store(0, std::memory_order_release);
w.init_slot_header(*w.ring_[0]);
w.bg_deferred_ = true;
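// The factory defers the background worker (bg_deferred_ = true); it is
// started later, e.g. by the move operations above once the writer is live.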
if (closed_.load(std::memory_order_acquire))

if (size > static_cast<size_t>(UINT32_MAX))

if (size > WAL_MMAP_MAX_RECORD_SIZE)

const size_t entry_size = 28 + size;
if (entry_size > usable())
        "WalMmapWriter: record larger than segment usable space"};

RingSlot* active = ring_[active_idx_.load(std::memory_order_acquire)].get();
if (active->write_offset + entry_size > usable()) {
    auto r = rotate();
    if (!r) return r.error();
    active = ring_[active_idx_.load(std::memory_order_acquire)].get();
}
if (active->seg.data() == nullptr ||
    active->seg.data() + detail_mmap::MMAP_WAL_FILE_HDR_SIZE +
            active->write_offset + entry_size >
        active->seg.data() + active->seg.capacity()) {
        "WalMmapWriter: write would exceed mmap'd segment bounds"};
}
assert(active->seg.data() != nullptr);
assert(active->seg.data() + detail_mmap::MMAP_WAL_FILE_HDR_SIZE +
           active->write_offset + entry_size <=
       active->seg.data() + active->seg.capacity());
const int64_t seq = next_seq_.fetch_add(1, std::memory_order_relaxed);
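// Inferred record layout: a 24-byte header at dst (the header stores, using
// seq and dsz, are not shown), the payload at dst + 24, and a 4-byte CRC over
// header + payload stored after the payload, giving entry_size = 28 + size.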
const auto dsz = static_cast<uint32_t>(size);
uint8_t* dst = active->seg.data()
             + detail_mmap::MMAP_WAL_FILE_HDR_SIZE
             + active->write_offset;

std::memcpy(dst + 24, data, size);
const uint32_t crc = compute_crc(dst, 24 + size);

std::atomic_thread_fence(std::memory_order_release);
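// Release fence: the header, payload, and CRC stores must be visible before
// the write_offset bump below publishes the entry.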
active->write_offset += entry_size;
active->last_seq = static_cast<uint64_t>(seq);

auto flush_result = active->seg.flush_async();
if (!flush_result) return flush_result.error();
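// Crossing the 3/4 high-water mark asks the background worker to get a
// standby segment ready before the active one fills; bg_worker's wait
// predicate checks the same threshold.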
if (active->write_offset > usable() * 3 / 4)
    bg_cv_.notify_one();  // assumed wake-up; the original statement here is not shown
return append(reinterpret_cast<const uint8_t*>(data), size);

return append(reinterpret_cast<const uint8_t*>(sv.data()), sv.size());

return append(v.data(), v.size());
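// Usage sketch, assuming a factory named WalMmapWriter::create returning an
// expected<WalMmapWriter>; the factory name and any option fields other than
// dir, segment_size, and start_seq are not confirmed by this code.
//
//   WalMmapOptions opts;
//   opts.dir = "/var/lib/signet/wal";
//   auto w = WalMmapWriter::create(std::move(opts));
//   if (w) {
//       std::string_view rec = "event-payload";
//       (void)w->append(rec);   // string_view overload above
//       (void)w->close();       // flushes every ACTIVE segment
//   }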
if (closed_.load(std::memory_order_acquire))

RingSlot& active = *ring_[active_idx_.load(std::memory_order_acquire)];

    return active.seg.flush_sync();
return active.seg.flush_async();
if (closed_.load(std::memory_order_acquire))
    return {};
closed_.store(true, std::memory_order_release);

bg_stop_.store(true, std::memory_order_release);
bg_cv_.notify_all();  // assumed, matching the stop sequence in the move operations
if (bg_thread_.joinable())
    bg_thread_.join();
std::optional<Error> first_error;
for (auto& sp : ring_) {
    const auto st = sp->state.load(std::memory_order_acquire);
    if (st == SlotState::ACTIVE) {
        auto flush_result = sp->seg.flush_sync();
        if (!flush_result && !first_error.has_value()) {
            first_error = flush_result.error();
        }
    }
}
if (first_error.has_value())
    return *first_error;
[[nodiscard]] bool is_open() const noexcept { return !closed_.load(std::memory_order_acquire); }

[[nodiscard]] int64_t next_seq() const noexcept { return next_seq_.load(std::memory_order_acquire); }

[[nodiscard]] const std::string& dir() const noexcept { return opts_.dir; }
std::vector<std::string> paths;

std::lock_guard<std::mutex> lk(bg_mutex_);
for (const auto& sp : ring_) {
    const auto st = sp->state.load(std::memory_order_acquire);
    if (!sp->file_path.empty() &&
        st != SlotState::FREE && st != SlotState::ALLOCATING)
        paths.push_back(sp->file_path);
}
for (const auto& entry : std::filesystem::directory_iterator(opts_.dir, ec)) {
    if (!entry.is_regular_file())
        continue;
    const std::string fname = entry.path().filename().string();

    const std::string fpath = entry.path().string();
    if (std::find(paths.begin(), paths.end(), fpath) == paths.end())
        paths.push_back(fpath);
}
std::sort(paths.begin(), paths.end());
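// Slot lifecycle, reconstructed from the state transitions in this file:
//   FREE -> ALLOCATING -> STANDBY -> ACTIVE -> DRAINING -> FREE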
enum class SlotState : uint8_t {
    FREE,        // unused slot, no backing segment yet
    ALLOCATING,  // claimed via CAS while a segment is created
    STANDBY,     // pre-allocated segment ready for rotation
    ACTIVE,      // segment currently receiving appends
    DRAINING     // rotated out, awaiting final flush and recycling
};  // enumerator set taken from the uses below; declaration order assumed
std::atomic<size_t> write_offset{0};
uint64_t segment_id{0};
uint64_t first_seq{0};
uint64_t last_seq{0};
std::string file_path;
std::atomic<SlotState> state{SlotState::FREE};
RingSlot() = default;
RingSlot(const RingSlot&) = delete;
RingSlot& operator=(const RingSlot&) = delete;
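// Slots are deliberately non-copyable: append() caches raw RingSlot pointers
// and each slot owns an mmap'd segment, so ring_ holds them behind unique_ptr.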
    : opts_(std::move(opts)) {}
size_t usable() const noexcept {
    return opts_.segment_size - static_cast<size_t>(detail_mmap::MMAP_WAL_FILE_HDR_SIZE);
}
std::string make_segment_path(uint64_t segment_id) const {
    char buf[32];  // buffer size assumed; holds a zero-padded 10-digit id plus NUL
    std::snprintf(buf, sizeof(buf), "%010llu",
                  static_cast<unsigned long long>(segment_id));
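// Zero-padding to ten digits keeps lexicographic order identical to numeric
// order, so the plain std::sort in the segment listing yields creation order.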
void init_slot_header(RingSlot& slot) noexcept {
    std::memcpy(slot.seg.data(), detail_mmap::MMAP_WAL_FILE_MAGIC, 16);
    std::atomic_thread_fence(std::memory_order_release);
}

expected<void> allocate_slot(RingSlot& slot, uint64_t segment_id, bool skip_prefault) {
    slot.segment_id = segment_id;
    { std::lock_guard<std::mutex> lk(bg_mutex_); slot.file_path = make_segment_path(segment_id); }
    slot.write_offset = 0;

    slot.state.store(SlotState::FREE, std::memory_order_release);

    slot.seg = std::move(r.value());

    init_slot_header(slot);
    auto flush_result = slot.seg.flush_async();
    if (!flush_result) return flush_result.error();
    slot.state.store(SlotState::STANDBY, std::memory_order_release);
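// A STANDBY slot is fully allocated, headered, and flushed, so rotate() can
// switch onto it without touching the filesystem on the append path.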
expected<void> rotate() {
    RingSlot& old_slot = *ring_[active_idx_.load(std::memory_order_acquire)];
    old_slot.last_seq =
        static_cast<uint64_t>(next_seq_.load(std::memory_order_acquire) - 1);

    auto flush_result = old_slot.seg.flush_async();
    if (!flush_result) return flush_result.error();
    old_slot.state.store(SlotState::DRAINING, std::memory_order_release);
    int standby_idx = find_standby_idx();
    if (standby_idx < 0) {
        for (size_t i = 0; i < ring_.size(); ++i) {
            SlotState expected_s = SlotState::FREE;
            if (ring_[i]->state.compare_exchange_strong(
                    expected_s, SlotState::ALLOCATING,
                    std::memory_order_acq_rel,
                    std::memory_order_relaxed)) {
                uint64_t new_id = 0;  // declaration assumed; assigned under the lock below
                { std::lock_guard<std::mutex> lk(bg_mutex_); new_id = next_seg_id_++; }
                auto r = allocate_slot(*ring_[i], new_id, true);
                if (!r) return r.error();
                standby_idx = static_cast<int>(i);
                break;
            }
        }
    }

        "WalMmapWriter: ring full — all segments occupied. "
        "Process WAL segments before appending more data."};
    RingSlot& new_slot = *ring_[static_cast<size_t>(standby_idx)];
    new_slot.first_seq =
        static_cast<uint64_t>(next_seq_.load(std::memory_order_acquire));
    new_slot.write_offset = 0;
    init_slot_header(new_slot);
    new_slot.state.store(SlotState::ACTIVE, std::memory_order_release);
    active_idx_.store(static_cast<size_t>(standby_idx), std::memory_order_release);
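// rotate() prefers a prebuilt STANDBY slot; failing that, the loop above
// claims a FREE slot inline, and if none exists either, the ring is full and
// the append fails with the "ring full" error.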
int find_standby_idx() const noexcept {
    for (size_t i = 0; i < ring_.size(); ++i) {
        if (ring_[i]->state.load(std::memory_order_acquire) == SlotState::STANDBY)
            return static_cast<int>(i);
    }
    return -1;
}
static uint32_t compute_crc(const uint8_t* data, size_t len) noexcept {
    static constexpr auto make_table = []() {
        std::array<uint32_t, 256> t{};
        for (uint32_t i = 0; i < 256; ++i) {
            uint32_t c = i;
            for (int k = 0; k < 8; ++k)
                c = (c & 1u) ? (0xEDB88320u ^ (c >> 1)) : (c >> 1);
            t[i] = c;
        }
        return t;
    };
    static constexpr auto tbl = make_table();
    uint32_t crc = 0xFFFFFFFFu;
    for (size_t i = 0; i < len; ++i)
        crc = tbl[(crc ^ data[i]) & 0xFFu] ^ (crc >> 8);
    return crc ^ 0xFFFFFFFFu;
}
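// Standard CRC-32 (reflected, polynomial 0xEDB88320, as in zlib/IEEE 802.3);
// the 256-entry table is built at compile time by the constexpr lambda above.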
std::unique_lock<std::mutex> lk(bg_mutex_);
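// The predicate wakes the worker when a stop is requested, when any slot
// needs draining or refilling, or when the active segment crosses the 3/4
// high-water mark; the 100 ms timeout bounds latency if a notify is missed.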
bg_cv_.wait_for(lk, std::chrono::milliseconds(100), [this]() -> bool {
    if (bg_stop_.load(std::memory_order_relaxed))
        return true;
    for (const auto& sp : ring_) {
        const auto st = sp->state.load(std::memory_order_acquire);
        if (st == SlotState::DRAINING || st == SlotState::FREE)
            return true;
    }
    const size_t aidx = active_idx_.load(std::memory_order_acquire);
    if (aidx < ring_.size()) {
        const RingSlot& a = *ring_[aidx];
        if (a.write_offset > usable() * 3 / 4)
            return true;
    }
    return false;
});
const bool stop = bg_stop_.load(std::memory_order_relaxed);
for (auto& sp : ring_) {
    RingSlot& slot = *sp;
    if (slot.state.load(std::memory_order_acquire) != SlotState::DRAINING)
        continue;

    uint64_t new_id = 0;  // declaration assumed; assigned under the lock below
    { std::lock_guard<std::mutex> lk(bg_mutex_); new_id = next_seg_id_++; }

    slot.state.store(SlotState::FREE, std::memory_order_release);

    SlotState expected_s = SlotState::FREE;
    if (slot.state.compare_exchange_strong(
            expected_s, SlotState::ALLOCATING,
            std::memory_order_acq_rel,
            std::memory_order_relaxed)) {
        auto alloc_result = allocate_slot(slot, new_id, false);
        if (!alloc_result)
            fprintf(stderr,
                    "[SIGNET ERROR] WalMmapWriter bg: allocate_slot failed: %s\n",
                    alloc_result.error().message.c_str());
    }
}
int standby_count = 0;
for (const auto& sp : ring_) {
    if (sp->state.load(std::memory_order_acquire) == SlotState::STANDBY)
        ++standby_count;
}
if (standby_count < 1) {
    for (auto& sp : ring_) {
        RingSlot& slot = *sp;
        SlotState expected_s = SlotState::FREE;
        if (slot.state.compare_exchange_strong(
                expected_s, SlotState::ALLOCATING,
                std::memory_order_acq_rel,
                std::memory_order_relaxed)) {
            uint64_t new_id = 0;  // declaration assumed; assigned under the lock below
            { std::lock_guard<std::mutex> lk(bg_mutex_); new_id = next_seg_id_++; }
            (void)allocate_slot(slot, new_id, false);
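// Replenishment keeps at least one STANDBY segment ready so the append path
// rarely allocates inline; failures are discarded here ((void)) and are
// presumably retried on the worker's next pass.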
if (bg_stop_.load(std::memory_order_relaxed))
    break;
WalMmapOptions opts_;
std::vector<std::unique_ptr<RingSlot>> ring_;
std::atomic<size_t> active_idx_{0};
std::atomic<int64_t> next_seq_{0};
uint64_t next_seg_id_ = 0;
std::atomic<bool> closed_{false};
bool bg_deferred_ = false;

std::thread bg_thread_;
mutable std::mutex bg_mutex_;
std::condition_variable bg_cv_;
std::atomic<bool> bg_stop_{false};