33# ifndef WIN32_LEAN_AND_MEAN
34# define WIN32_LEAN_AND_MEAN
55static constexpr uint32_t WAL_RECORD_MAGIC = 0x57414C31u;
58static constexpr uint16_t WAL_FILE_HDR_SIZE = 16;
61static constexpr char WAL_FILE_MAGIC[16] =
"SIGNETWAL1\0\0\0\0\0";
67static constexpr uint32_t WAL_MAX_RECORD_SIZE = 64u * 1024u * 1024u;
85inline uint32_t
crc32(
const void* data,
size_t length)
noexcept {
87 static constexpr auto make_table = []() {
88 std::array<uint32_t, 256> t{};
89 for (uint32_t i = 0; i < 256; ++i) {
91 for (
int k = 0; k < 8; ++k)
92 c = (c & 1u) ? (0xEDB88320u ^ (c >> 1)) : (c >> 1);
97 static constexpr auto table = make_table();
99 uint32_t crc = 0xFFFFFFFFu;
100 const auto* p =
static_cast<const uint8_t*
>(data);
101 for (
size_t i = 0; i < length; ++i)
102 crc = table[(crc ^ p[i]) & 0xFFu] ^ (crc >> 8);
103 return crc ^ 0xFFFFFFFFu;
108inline uint32_t
crc32_combine(uint32_t crc_a,
const void* data_b,
size_t len_b)
noexcept {
113 (void)crc_a; (void)data_b; (void)len_b;
121 struct timespec ts{};
123 timespec_get(&ts, TIME_UTC);
125 ::clock_gettime(CLOCK_REALTIME, &ts);
127 return static_cast<int64_t
>(ts.tv_sec) * 1'000'000'000LL + ts.tv_nsec;
134 dst[0] =
static_cast<uint8_t
>(v);
135 dst[1] =
static_cast<uint8_t
>(v >> 8);
136 dst[2] =
static_cast<uint8_t
>(v >> 16);
137 dst[3] =
static_cast<uint8_t
>(v >> 24);
143 dst[0] =
static_cast<uint8_t
>(v);
144 dst[1] =
static_cast<uint8_t
>(v >> 8);
145 dst[2] =
static_cast<uint8_t
>(v >> 16);
146 dst[3] =
static_cast<uint8_t
>(v >> 24);
147 dst[4] =
static_cast<uint8_t
>(v >> 32);
148 dst[5] =
static_cast<uint8_t
>(v >> 40);
149 dst[6] =
static_cast<uint8_t
>(v >> 48);
150 dst[7] =
static_cast<uint8_t
>(v >> 56);
156 return static_cast<uint32_t
>(src[0])
157 | (
static_cast<uint32_t
>(src[1]) << 8)
158 | (
static_cast<uint32_t
>(src[2]) << 16)
159 | (
static_cast<uint32_t
>(src[3]) << 24);
165 return static_cast<uint64_t
>(src[0])
166 | (
static_cast<uint64_t
>(src[1]) << 8)
167 | (
static_cast<uint64_t
>(src[2]) << 16)
168 | (
static_cast<uint64_t
>(src[3]) << 24)
169 | (
static_cast<uint64_t
>(src[4]) << 32)
170 | (
static_cast<uint64_t
>(src[5]) << 40)
171 | (
static_cast<uint64_t
>(src[6]) << 48)
172 | (
static_cast<uint64_t
>(src[7]) << 56);
181#if defined(__APPLE__)
182 return ::fcntl(fd, F_FULLFSYNC);
184 HANDLE h =
reinterpret_cast<HANDLE
>(
185 _get_osfhandle(
static_cast<int>(fd)));
186 if (h == INVALID_HANDLE_VALUE)
return -1;
187 return FlushFileBuffers(h) ? 0 : -1;
245 : file_(
nullptr), closed_(
true)
247 std::lock_guard<std::mutex> lk(o.mu_);
249 path_ = std::move(o.path_);
250 next_seq_ = o.next_seq_;
251 bytes_written_ = o.bytes_written_;
274 f = std::fopen(
path.c_str(),
"a+b");
277 int fd =
::open(
path.c_str(), O_RDWR | O_CREAT | O_APPEND, 0600);
278 if (fd >= 0) f = ::fdopen(fd,
"a+b");
282 std::string(
"WalWriter: cannot open '") +
path +
"': " + std::strerror(errno)};
285 if (opts.buffer_size > 0)
286 std::setvbuf(f,
nullptr, _IOFBF, opts.buffer_size);
292 if (_fseeki64(f, 0, SEEK_END) != 0) {
294 if (fseeko(f, 0, SEEK_END) != 0) {
303 int64_t file_size = _ftelli64(f);
305 long file_size = std::ftell(f);
315 if (file_size == 0) {
319 if (std::fwrite(WAL_FILE_MAGIC, 1, WAL_FILE_HDR_SIZE, f) != WAL_FILE_HDR_SIZE) {
331 FILE* rf = std::fopen(
path.c_str(),
"rb");
336 _fseeki64(rf, WAL_FILE_HDR_SIZE, SEEK_SET);
338 fseeko(rf, WAL_FILE_HDR_SIZE, SEEK_SET);
341 int64_t last_seq = -1;
344 if (std::fread(hdr, 1, 24, rf) != 24)
break;
346 if (magic != WAL_RECORD_MAGIC)
break;
351 if (data_sz > WAL_MAX_RECORD_SIZE)
break;
353 std::vector<uint8_t> record_data(data_sz);
354 if (data_sz > 0 && std::fread(record_data.data(), 1, data_sz, rf) != data_sz)
break;
356 if (std::fread(crc_buf, 1, 4, rf) != 4)
break;
359 std::vector<uint8_t> combined(24 + data_sz);
360 std::memcpy(combined.data(), hdr, 24);
362 std::memcpy(combined.data() + 24, record_data.data(), data_sz);
363 uint32_t computed_crc =
detail::crc32(combined.data(), combined.size());
364 if (computed_crc != stored_crc)
break;
368 if (last_seq >= 0)
next_seq = last_seq + 1;
373 f = std::fopen(
path.c_str(),
"ab");
376 int fd2 =
::open(
path.c_str(), O_WRONLY | O_CREAT | O_APPEND, 0600);
377 if (fd2 >= 0) f = ::fdopen(fd2,
"ab");
381 std::string(
"WalWriter: cannot reopen '") +
path +
"': " + std::strerror(errno)};
382 if (opts.buffer_size > 0)
383 std::setvbuf(f,
nullptr, _IOFBF, opts.buffer_size);
398 std::lock_guard<std::mutex> lk(mu_);
402 ++rejected_empty_count_;
405 if (size > WAL_MAX_RECORD_SIZE)
407 if (size >
static_cast<size_t>(UINT32_MAX))
410 const int64_t seq = next_seq_;
412 const uint32_t dsz =
static_cast<uint32_t
>(size);
425 static constexpr auto make_table = []() {
426 std::array<uint32_t, 256> t{};
427 for (uint32_t i = 0; i < 256; ++i) {
429 for (
int k = 0; k < 8; ++k)
430 c = (c & 1u) ? (0xEDB88320u ^ (c >> 1)) : (c >> 1);
435 static constexpr auto tbl = make_table();
437 uint32_t crc = 0xFFFFFFFFu;
438 for (
size_t i = 0; i < 24; ++i)
439 crc = tbl[(crc ^ hdr[i]) & 0xFFu] ^ (crc >> 8);
440 for (
size_t i = 0; i < size; ++i)
441 crc = tbl[(crc ^ data[i]) & 0xFFu] ^ (crc >> 8);
448 if (std::fwrite(hdr, 1, 24, file_) != 24)
return io_err(
"fwrite hdr");
450 if (std::fwrite(data, 1, size, file_) != size)
return io_err(
"fwrite data");
451 if (std::fwrite(crc_buf, 1, 4, file_) != 4)
return io_err(
"fwrite crc");
453 bytes_written_ +=
static_cast<int64_t
>(28 + size);
457 if (std::fflush(file_) != 0)
470 return append(data.data(), data.size());
478 return append(
reinterpret_cast<const uint8_t*
>(data), size);
485 return append(
reinterpret_cast<const uint8_t*
>(sv.data()), sv.size());
496 std::lock_guard<std::mutex> lk(mu_);
499 if (std::fflush(file_) != 0)
511 std::lock_guard<std::mutex> lk(mu_);
512 if (closed_)
return {};
514 if (std::fflush(file_) != 0) {
515 std::fclose(file_); file_ =
nullptr;
519 std::fclose(file_); file_ =
nullptr;
528 [[nodiscard]]
bool is_open() const noexcept {
return !closed_; }
530 [[nodiscard]] int64_t
next_seq() const noexcept {
return next_seq_; }
532 [[nodiscard]]
const std::string&
path() const noexcept {
return path_; }
534 [[nodiscard]] int64_t
bytes_written() const noexcept {
return bytes_written_; }
541 : file_(f), path_(std::move(
path)),
543 opts_(opts), closed_(false) {}
545 static expected<int64_t> io_err(
const char* ctx) {
547 std::string(
"WalWriter: ") + ctx +
": " + std::strerror(errno)};
550 FILE* file_ =
nullptr;
552 int64_t next_seq_ = 0;
553 int64_t bytes_written_ = 0;
554 uint64_t rejected_empty_count_ = 0;
556 bool closed_ =
false;
582 : file_(o.file_), path_(std::move(o.path_)),
583 last_seq_(o.last_seq_), count_(o.count_), offset_(o.offset_)
584 { o.file_ =
nullptr; }
598 const std::string& path,
600 FILE* f = std::fopen(path.c_str(),
"rb");
602 return Error{ErrorCode::IO_ERROR,
603 std::string(
"WalReader: cannot open '") + path +
"': " + std::strerror(errno)};
606 char hdr[WAL_FILE_HDR_SIZE];
607 if (std::fread(hdr, 1, WAL_FILE_HDR_SIZE, f) != WAL_FILE_HDR_SIZE) {
609 return Error{ErrorCode::INVALID_FILE,
"WalReader: file too short for header"};
611 if (std::memcmp(hdr, WAL_FILE_MAGIC, WAL_FILE_HDR_SIZE) != 0) {
613 return Error{ErrorCode::INVALID_FILE,
"WalReader: bad file magic"};
616 return WalReader(f, path, WAL_FILE_HDR_SIZE, mode);
627 if (!file_)
return std::optional<WalEntry>{std::nullopt};
631 size_t n = std::fread(hdr, 1, 24, file_);
634 if (std::ferror(file_))
635 return Error{ErrorCode::IO_ERROR,
"WalReader: fread header failed"};
636 return std::optional<WalEntry>{std::nullopt};
639 return soft_stop_or_error(
"truncated record header");
641 uint32_t magic = detail::read_le32(hdr);
642 if (magic != WAL_RECORD_MAGIC)
643 return soft_stop_or_error(
"bad record magic");
645 int64_t seq =
static_cast<int64_t
>(detail::read_le64(hdr + 4));
646 int64_t ts =
static_cast<int64_t
>(detail::read_le64(hdr + 12));
647 uint32_t data_sz = detail::read_le32(hdr + 20);
651 if (data_sz > WAL_MAX_RECORD_SIZE) {
652 return soft_stop_or_error(
"record size exceeds WAL_MAX_RECORD_SIZE");
656 std::vector<uint8_t> raw_buf(data_sz);
658 size_t nr = std::fread(raw_buf.data(), 1, data_sz, file_);
660 if (std::ferror(file_))
661 return Error{ErrorCode::IO_ERROR,
"WalReader: fread payload failed"};
662 return soft_stop_or_error(
"truncated payload");
668 if (std::fread(crc_buf, 1, 4, file_) != 4)
669 return soft_stop_or_error(
"truncated record CRC");
671 uint32_t stored_crc = detail::read_le32(crc_buf);
674 static constexpr auto make_table = []() {
675 std::array<uint32_t, 256> t{};
676 for (uint32_t i = 0; i < 256; ++i) {
678 for (
int k = 0; k < 8; ++k)
679 c = (c & 1u) ? (0xEDB88320u ^ (c >> 1)) : (c >> 1);
684 static constexpr auto tbl = make_table();
686 uint32_t crc = 0xFFFFFFFFu;
687 for (
size_t i = 0; i < 24; ++i)
688 crc = tbl[(crc ^ hdr[i]) & 0xFFu] ^ (crc >> 8);
689 for (
size_t i = 0; i < data_sz; ++i)
690 crc = tbl[(crc ^ raw_buf[i]) & 0xFFu] ^ (crc >> 8);
693 if (crc != stored_crc)
694 return soft_stop_or_error(
"record CRC mismatch");
696 offset_ += 24 + data_sz + 4;
703 entry.
payload = std::move(raw_buf);
704 return std::optional<WalEntry>{std::move(entry)};
714 std::vector<WalEntry> results;
717 if (!res)
return res.error();
718 if (!res->has_value())
break;
719 results.push_back(std::move(res->value()));
725 [[nodiscard]] int64_t
last_seq() const noexcept {
return last_seq_; }
727 [[nodiscard]] int64_t
count() const noexcept {
return count_; }
729 [[nodiscard]]
size_t offset() const noexcept {
return offset_; }
733 if (file_) { std::fclose(file_); file_ =
nullptr; }
738 const char* reason)
const {
739 if (validation_mode_ == ValidationMode::Strict) {
740 return Error{ErrorCode::CORRUPT_DATA,
741 std::string(
"WalReader: ") + reason +
742 " in '" + path_ +
"' at offset " + std::to_string(offset_)};
744 return std::optional<WalEntry>{std::nullopt};
747 WalReader(FILE* f, std::string path,
size_t initial_offset, ValidationMode mode)
748 : file_(f), path_(std::move(path)), offset_(initial_offset), validation_mode_(mode) {}
750 FILE* file_ =
nullptr;
752 int64_t last_seq_ = -1;
753 ValidationMode validation_mode_ = ValidationMode::BestEffort;
755 size_t offset_ = WAL_FILE_HDR_SIZE;
778 size_t max_segment_bytes = 64 * 1024 * 1024;
779 size_t max_records = 1'000'000;
780 bool sync_on_append =
false;
781 bool sync_on_roll =
true;
782 bool strict_replay =
true;
783 std::string file_prefix =
"wal";
784 std::string file_ext =
".wal";
788 bool reset_on_open =
false;
789 bool require_checkpoint_before_prune =
false;
814 : dir_(std::move(o.dir_)), opts_(o.opts_),
815 segments_(std::move(o.segments_)),
816 writer_(std::move(o.writer_)),
817 global_seq_(o.global_seq_),
818 segment_record_count_(o.segment_record_count_),
819 total_records_(o.total_records_),
820 prune_checkpoint_ready_(o.prune_checkpoint_ready_),
822 { o.closed_ =
true; }
838 std::error_code mk_ec;
839 std::filesystem::create_directories(dir, mk_ec);
841 std::error_code isdir_ec;
842 if (!std::filesystem::is_directory(dir, isdir_ec)) {
843 return Error{ErrorCode::IO_ERROR,
844 std::string(
"WalManager: cannot create dir '") + dir +
"': " + mk_ec.
message()};
848 if (opts.reset_on_open && opts.lifecycle_mode == WalLifecycleMode::Production) {
849 return Error{ErrorCode::IO_ERROR,
850 "WalManager: reset_on_open denied in production mode"};
853 if (opts.reset_on_open) {
854 auto reset_paths = scan_segments(dir, opts);
855 for (
const auto& seg : reset_paths) {
856 std::error_code rm_ec;
857 std::filesystem::remove(seg, rm_ec);
858 if (rm_ec && rm_ec != std::make_error_code(std::errc::no_such_file_or_directory)) {
859 return Error{ErrorCode::IO_ERROR,
860 std::string(
"WalManager: reset remove '") + seg +
"': " + rm_ec.message()};
866 std::vector<std::string> existing = scan_segments(dir, opts);
869 int64_t global_seq = 0;
870 if (!existing.empty()) {
871 auto rd = WalReader::open(existing.back(),
872 opts.strict_replay ? WalReader::ValidationMode::Strict
873 : WalReader::ValidationMode::BestEffort);
875 if (opts.strict_replay)
return rd.error();
877 auto entries = rd->read_all();
879 if (opts.strict_replay)
return entries.error();
880 }
else if (rd->last_seq() >= 0) {
881 global_seq = rd->last_seq() + 1;
888 for (
const auto& seg : existing) {
889 auto rd = WalReader::open(seg,
890 opts.strict_replay ? WalReader::ValidationMode::Strict
891 : WalReader::ValidationMode::BestEffort);
893 if (opts.strict_replay)
return rd.error();
895 auto res = rd->read_all();
897 if (opts.strict_replay)
return res.error();
899 total +=
static_cast<int64_t
>(res->size());
905 WalWriter::Options wopts;
906 wopts.sync_on_append = opts.sync_on_append;
907 wopts.sync_on_flush = opts.sync_on_roll;
908 wopts.start_seq = global_seq;
910 std::string seg_path = make_segment_path(dir, opts, global_seq);
911 auto writer = WalWriter::open(seg_path, wopts);
912 if (!writer)
return writer.error();
917 mgr.segments_ = std::move(existing);
918 mgr.segments_.push_back(seg_path);
919 mgr.writer_ = std::make_unique<WalWriter>(std::move(writer.value()));
920 mgr.global_seq_ = global_seq;
921 mgr.segment_record_count_ = 0;
922 mgr.total_records_ = total;
923 mgr.prune_checkpoint_ready_ = !opts.require_checkpoint_before_prune;
934 std::lock_guard<std::mutex> lk(mu_);
936 return Error{ErrorCode::IO_ERROR,
"WalManager: already closed"};
939 bool need_roll =
false;
940 if (opts_.max_records > 0 &&
static_cast<size_t>(segment_record_count_) >= opts_.max_records)
942 if (!need_roll && opts_.max_segment_bytes > 0) {
943 int64_t bw = writer_->bytes_written();
944 if (
static_cast<size_t>(bw) + 28 + size > opts_.max_segment_bytes)
949 auto rc = roll_locked();
950 if (!rc)
return rc.error();
953 auto res = writer_->append(data, size);
954 if (!res)
return res;
955 ++segment_record_count_;
966 return append(
reinterpret_cast<const uint8_t*
>(data), size);
973 return append(
reinterpret_cast<const uint8_t*
>(sv.data()), sv.size());
979 std::lock_guard<std::mutex> lk(mu_);
993 std::lock_guard<std::mutex> lk(mu_);
994 if (!opts_.require_checkpoint_before_prune) {
995 prune_checkpoint_ready_ =
true;
999 if (opts_.checkpoint_manifest_path.empty()) {
1002 prune_checkpoint_ready_ =
true;
1006 auto wr = write_manifest_atomic(opts_.checkpoint_manifest_path, note);
1008 prune_checkpoint_ready_ =
true;
1015 std::lock_guard<std::mutex> lk(mu_);
1016 prune_checkpoint_ready_ = !opts_.require_checkpoint_before_prune;
1028 std::lock_guard<std::mutex> lk(mu_);
1029 if (opts_.require_checkpoint_before_prune && !prune_checkpoint_ready_)
1030 return Error{ErrorCode::IO_ERROR,
1031 "WalManager: prune blocked until compaction checkpoint is committed"};
1033 if (!segments_.empty() && segments_.back() == path)
1034 return Error{ErrorCode::IO_ERROR,
1035 "WalManager: cannot remove active segment"};
1038 std::filesystem::remove(path, ec);
1039 if (ec && ec != std::make_error_code(std::errc::no_such_file_or_directory))
1040 return Error{ErrorCode::IO_ERROR,
1041 std::string(
"WalManager: remove '") + path +
"': " + ec.
message()};
1043 auto it = std::find(segments_.begin(), segments_.end(), path);
1044 if (it != segments_.end()) segments_.erase(it);
1050 std::lock_guard<std::mutex> lk(mu_);
1051 return total_records_;
1057 std::lock_guard<std::mutex> lk(mu_);
1058 if (closed_)
return {};
1061 auto close_result = writer_->close();
1062 if (!close_result)
return close_result.error();
1074 std::lock_guard<std::mutex> lk(mu_);
1076 if (writer_ && !closed_) {
1077 auto flush_result = writer_->flush(
false);
1078 if (!flush_result)
return flush_result.error();
1080 std::vector<WalEntry> result;
1081 for (
const auto& seg : segments_) {
1082 auto rd = WalReader::open(seg,
1083 opts_.strict_replay ? WalReader::ValidationMode::Strict
1084 : WalReader::ValidationMode::BestEffort);
1086 if (opts_.strict_replay)
return rd.error();
1089 auto entries = rd->read_all();
1091 if (opts_.strict_replay)
return entries.error();
1094 for (
auto& e : *entries)
1095 result.push_back(std::move(e));
1103 static expected<void> sync_parent_directory(
const std::string& file_path) {
1105 std::filesystem::path p(file_path);
1106 std::filesystem::path parent = p.parent_path();
1107 if (parent.empty())
return {};
1109 int dfd = ::open(parent.string().c_str(), O_RDONLY);
1111 return Error{ErrorCode::IO_ERROR,
1112 std::string(
"WalManager: open parent dir failed: ") + std::strerror(errno)};
1114 if (::fsync(dfd) != 0) {
1117 return Error{ErrorCode::IO_ERROR,
1118 std::string(
"WalManager: fsync parent dir failed: ") + std::strerror(e)};
1125 static expected<void> write_manifest_atomic(
const std::string& path,
1126 const std::string& note) {
1127 std::filesystem::path p(path);
1129 return Error{ErrorCode::IO_ERROR,
1130 "WalManager: checkpoint_manifest_path is empty"};
1133 std::error_code mk_ec;
1134 auto parent = p.parent_path();
1135 if (!parent.empty())
1136 std::filesystem::create_directories(parent, mk_ec);
1138 return Error{ErrorCode::IO_ERROR,
1139 std::string(
"WalManager: cannot create manifest dir: ") + mk_ec.message()};
1142 const std::string tmp = path +
".tmp." + std::to_string(detail::now_ns());
1145 f = std::fopen(tmp.c_str(),
"wb");
1148 int mfd = ::open(tmp.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0600);
1149 if (mfd >= 0) f = ::fdopen(mfd,
"wb");
1152 return Error{ErrorCode::IO_ERROR,
1153 std::string(
"WalManager: cannot open temp manifest '") + tmp +
"': " + std::strerror(errno)};
1156 std::string body =
"checkpoint_ns=" + std::to_string(detail::now_ns()) +
"\n";
1157 if (!note.empty()) {
1158 body +=
"note=" + note +
"\n";
1161 if (std::fwrite(body.data(), 1, body.size(), f) != body.size()) {
1163 std::remove(tmp.c_str());
1164 return Error{ErrorCode::IO_ERROR,
1165 std::string(
"WalManager: write manifest failed: ") + std::strerror(errno)};
1167 if (std::fflush(f) != 0) {
1169 std::remove(tmp.c_str());
1170 return Error{ErrorCode::IO_ERROR,
1171 std::string(
"WalManager: fflush manifest failed: ") + std::strerror(errno)};
1173 if (detail::full_fsync(::fileno(f)) != 0) {
1175 std::remove(tmp.c_str());
1176 return Error{ErrorCode::IO_ERROR,
1177 std::string(
"WalManager: fsync manifest failed: ") + std::strerror(errno)};
1181 std::error_code mv_ec;
1182 std::filesystem::rename(tmp, path, mv_ec);
1184 std::remove(tmp.c_str());
1185 return Error{ErrorCode::IO_ERROR,
1186 std::string(
"WalManager: atomic rename manifest failed: ") + mv_ec.message()};
1189 auto sync_res = sync_parent_directory(path);
1190 if (!sync_res)
return sync_res;
1195 expected<void> roll_locked() {
1197 if (opts_.sync_on_roll) {
1198 (void)writer_->flush(
true);
1200 (void)writer_->close();
1203 WalWriter::Options wopts;
1204 wopts.sync_on_append = opts_.sync_on_append;
1205 wopts.sync_on_flush = opts_.sync_on_roll;
1206 wopts.start_seq = global_seq_;
1208 std::string seg_path = make_segment_path(dir_, opts_, global_seq_);
1209 auto writer = WalWriter::open(seg_path, wopts);
1210 if (!writer)
return writer.error();
1212 writer_ = std::make_unique<WalWriter>(std::move(writer.value()));
1213 segments_.push_back(seg_path);
1214 segment_record_count_ = 0;
1219 static std::string make_segment_path(
const std::string& dir,
1220 const Options& opts,
1221 int64_t seq_start) {
1223 struct timespec ts{};
1225 timespec_get(&ts, TIME_UTC);
1227 ::clock_gettime(CLOCK_REALTIME, &ts);
1229 int64_t ts_ms =
static_cast<int64_t
>(ts.tv_sec) * 1000LL
1230 + ts.tv_nsec / 1'000'000LL;
1233 std::string name = dir +
"/" + opts.file_prefix
1234 +
"_" + std::to_string(seq_start)
1235 +
"_" + std::to_string(ts_ms)
1243 static std::vector<std::string> scan_segments(
const std::string& dir,
1244 const Options& opts) {
1245 std::vector<std::pair<int64_t, std::string>> found;
1247 for (
const auto& entry : std::filesystem::directory_iterator(dir, ec)) {
1249 if (!entry.is_regular_file())
continue;
1250 const std::string fname = entry.path().filename().string();
1252 const std::string pfx = opts.file_prefix +
"_";
1253 if (fname.size() <= pfx.size() + opts.file_ext.size())
continue;
1254 if (fname.substr(0, pfx.size()) != pfx)
continue;
1255 if (fname.substr(fname.size() - opts.file_ext.size()) != opts.file_ext)
continue;
1259 size_t p1 = pfx.size();
1260 size_t ext_pos = fname.size() - opts.file_ext.size();
1261 if (ext_pos <= p1)
continue;
1262 size_t p2 = fname.find(
'_', p1);
1264 std::string seq_str;
1265 if (p2 == std::string::npos || p2 > ext_pos) {
1266 seq_str = fname.substr(p1, ext_pos - p1);
1268 seq_str = fname.substr(p1, p2 - p1);
1271 int64_t seq_start = 0;
1272 try { seq_start = std::stoll(seq_str); }
catch (...) {
continue; }
1273 found.emplace_back(seq_start, entry.path().string());
1275 std::sort(found.begin(), found.end(),
1276 [](
const auto& a,
const auto& b) { return a.first < b.first; });
1277 std::vector<std::string> paths;
1278 paths.reserve(found.size());
1279 for (
auto& [seq, p] : found) paths.push_back(std::move(p));
1285 std::vector<std::string> segments_;
1286 std::unique_ptr<WalWriter> writer_;
1287 int64_t global_seq_ = 0;
1288 int64_t segment_record_count_ = 0;
1289 int64_t total_records_ = 0;
1290 bool prune_checkpoint_ready_ =
true;
1291 bool closed_ =
false;
1292 mutable std::mutex mu_;
1304#ifndef __EMSCRIPTEN__
Manages multiple rolling WAL segment files in a directory.
WalManager & operator=(const WalManager &)=delete
WalManager(const WalManager &)=delete
static expected< WalManager > open(const std::string &dir, Options opts={})
Open a WAL directory, discovering existing segments and creating a new active segment ready for writing.
expected< void > commit_compaction_checkpoint(const std::string &note="")
Record that an external compaction checkpoint has been made durable, allowing old fully-compacted WAL segments to be pruned with remove_segment().
WalManager(WalManager &&o) noexcept
expected< std::vector< WalEntry > > read_all()
Read all WAL entries across all segments (sealed and active).
WalManager & operator=(WalManager &&)=delete
expected< void > remove_segment(const std::string &path)
Remove a fully-compacted segment from disk and from the tracking list.
int64_t total_records() const noexcept
Total number of records written across all segments (including rolled ones).
expected< int64_t > append(const uint8_t *data, size_t size)
Append a record to the current segment, rolling to a new segment if needed.
std::vector< std::string > segment_paths() const
List all WAL segment paths in sequence order (oldest first).
void clear_compaction_checkpoint()
Reset the checkpoint-ready flag so subsequent prune calls are blocked again until the next commit_compaction_checkpoint() (only when require_checkpoint_before_prune is enabled).
expected< int64_t > append(const char *data, size_t size)
Append a record from a char pointer.
expected< void > close()
Close the manager and the active segment writer.
expected< int64_t > append(std::string_view sv)
Append a record from a string_view.
Sequential WAL file reader for crash recovery and replay.
WalReader & operator=(WalReader &&)=delete
static expected< WalReader > open(const std::string &path, ValidationMode mode=ValidationMode::BestEffort)
Open a WAL file for sequential reading.
expected< std::optional< WalEntry > > next()
Read the next WAL entry from the file.
size_t offset() const noexcept
Current byte offset in the file (past the last read record).
WalReader(WalReader &&o) noexcept
WalReader(const WalReader &)=delete
void close()
Close the underlying file handle.
WalReader & operator=(const WalReader &)=delete
int64_t count() const noexcept
Number of records successfully read so far.
int64_t last_seq() const noexcept
Sequence number of the last successfully read record (-1 if none).
expected< std::vector< WalEntry > > read_all()
Read all valid entries from the current position to end-of-valid-data.
Append-only Write-Ahead Log writer with CRC-32 integrity per record.
WalWriter(WalWriter &&o) noexcept
expected< int64_t > append(const std::vector< uint8_t > &data)
Append a record from a byte vector.
static expected< WalWriter > open(const std::string &path, Options opts={})
Open or create a WAL file for appending.
WalWriter & operator=(WalWriter &&)=delete
const std::string & path() const noexcept
Filesystem path of the WAL file.
int64_t bytes_written() const noexcept
Total bytes written to this WAL file (header + all records).
uint64_t rejected_empty_count() const noexcept
Number of empty records rejected (CWE-754).
int64_t next_seq() const noexcept
The sequence number that will be assigned to the next appended record.
expected< int64_t > append(const char *data, size_t size)
Append a record from a char pointer.
WalWriterOptions Options
Alias for the options struct.
WalWriter(const WalWriter &)=delete
WalWriter & operator=(const WalWriter &)=delete
expected< int64_t > append(const uint8_t *data, size_t size)
Append a raw-byte record to the WAL.
expected< int64_t > append(std::string_view sv)
Append a record from a string_view.
expected< void > close()
Seal the WAL file: flush buffered data, fsync, and close the file handle.
expected< void > flush(bool do_fsync=false)
Flush the stdio buffer to the OS page cache.
bool is_open() const noexcept
True if the writer is open and accepting appends.
A lightweight result type that holds either a success value of type T or an Error.
uint32_t read_le32(const uint8_t *src) noexcept
Read a 32-bit unsigned integer from little-endian byte order.
uint32_t crc32(const void *data, size_t length) noexcept
Compute CRC-32 over a contiguous byte buffer (polynomial 0xEDB88320).
int full_fsync(int fd) noexcept
Force durable flush to storage media.
int64_t now_ns() noexcept
Return nanoseconds since Unix epoch (cross-platform).
void write_le32(uint8_t *dst, uint32_t v) noexcept
Write a 32-bit unsigned integer in little-endian byte order.
uint32_t crc32_combine(uint32_t crc_a, const void *data_b, size_t len_b) noexcept
Combine two CRC regions without concatenating buffers.
void write_le64(uint8_t *dst, uint64_t v) noexcept
Write a 64-bit unsigned integer in little-endian byte order.
uint64_t read_le64(const uint8_t *src) noexcept
Read a 64-bit unsigned integer from little-endian byte order.
WalLifecycleMode
Controls safety guardrails for WAL segment lifecycle operations.
@ Development
Permissive: allows reset_on_open.
@ Benchmark
Same as Development, for benchmark harnesses.
@ Production
Strict: reset_on_open is rejected.
@ IO_ERROR
A file-system or stream I/O operation failed (open, read, write, rename).
@ INVALID_ARGUMENT
A caller-supplied argument is outside the valid range or violates a precondition.
Lightweight error value carrying an ErrorCode and a human-readable message.
std::string message
A human-readable description of what went wrong (may be empty for OK).
A single decoded WAL record returned by WalReader::next() or read_all().
int64_t seq
Sequence number (0-based, monotonically increasing)
std::vector< uint8_t > payload
Raw record bytes (application-defined content)
int64_t timestamp_ns
Wall-clock timestamp in nanoseconds since Unix epoch.
Configuration options for WalManager::open().
std::string checkpoint_manifest_path
Path for atomic checkpoint manifest file.
Configuration options for WalWriter::open().
int64_t start_seq
First sequence number for brand-new files.
bool sync_on_flush
If true, fsync on explicit flush() calls.
size_t buffer_size
stdio setvbuf buffer size in bytes
bool sync_on_append
If true, fsync after every record append.
Parquet format enumerations, type traits, and statistics structs.
Cross-platform memory-mapped WAL segment and ring writer.