Signet Forge 0.1.0
C++20 Parquet library with AI-native extensions
DEMO
Loading...
Searching...
No Matches
human_oversight.hpp
Go to the documentation of this file.
1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright 2026 Johnson Ogundeji
3// See LICENSE_COMMERCIAL for full terms.
4#pragma once
5
6#if !defined(SIGNET_ENABLE_COMMERCIAL) || !SIGNET_ENABLE_COMMERCIAL
7#error "signet/ai/human_oversight.hpp requires SIGNET_ENABLE_COMMERCIAL=ON (AGPL-3.0 commercial tier). See LICENSE_COMMERCIAL."
8#endif
9
10// ---------------------------------------------------------------------------
11// human_oversight.hpp -- EU AI Act Article 14 Human Oversight Implementation
12//
13// Provides a complete human oversight layer for high-risk AI systems:
14//
15// Art.14(1): Systems designed to allow effective human oversight
16// Art.14(2): Ability to fully understand capabilities and limitations
17// Art.14(3): Ability to correctly interpret outputs
18// Art.14(4): Ability to override, interrupt, or halt the system ("stop button")
19// Art.14(5): All override events logged with full provenance
20//
21// Components:
22// - OverrideSource / OverrideAction / HaltReason enums
23// - HumanOverrideRecord: captures each human intervention event
24// - SystemHaltRecord: captures system halt ("stop button") events
25// - HumanOverrideLogWriter: Parquet writer with hash chaining
26// - HumanOverrideLogReader: reader with chain verification
27// - OverrideRateMonitor: sliding-window override frequency tracker
28//
29// Header-only. Part of the signet::forge AI module.
30// ---------------------------------------------------------------------------
31
34#include "signet/error.hpp"
35#include "signet/schema.hpp"
36#include "signet/types.hpp"
37#include "signet/writer.hpp"
38#include "signet/reader.hpp"
39
40#include <algorithm>
41#include <cctype>
42#include <cstdint>
43#include <cstring>
44#include <deque>
45#include <filesystem>
46#include <functional>
47#include <mutex>
48#include <stdexcept>
49#include <string>
50#include <vector>
51
52namespace signet::forge {
53
54// ---------------------------------------------------------------------------
55// Enumerations
56// ---------------------------------------------------------------------------
57
/// Classifies the source recorded for an override event.
///
/// The numeric values are part of the persisted format: the record
/// serializer writes this enum as a 32-bit integer and the Parquet schema
/// stores it in the int32 "source" column.
enum class OverrideSource : int32_t {
    ALGORITHMIC = 0,
    HUMAN       = 1,
    AUTOMATED   = 2,
};
65
/// Action recorded for an override event.
///
/// The numeric values are part of the persisted format: the record
/// serializer writes this enum as a 32-bit integer and the Parquet schema
/// stores it in the int32 "action" column. Readers in this file range-check
/// stored values against 0..4.
enum class OverrideAction : int32_t {
    APPROVE  = 0,
    MODIFY   = 1,
    REJECT   = 2,
    ESCALATE = 3,
    HALT     = 4,
};
74
76enum class HaltReason : int32_t {
77 MANUAL = 0,
80 REGULATORY = 3,
81 MAINTENANCE = 4,
82 EXTERNAL = 5,
83};
84
85// ---------------------------------------------------------------------------
86// HumanOverrideRecord
87// ---------------------------------------------------------------------------
88
95 int64_t timestamp_ns{0};
96 std::string operator_id;
97 std::string operator_role;
100 std::string system_id;
101
102 // --- Original AI output context ---
104 std::string original_output;
106
107 // --- Override details ---
108 std::string override_output;
109 std::string rationale;
110 int32_t urgency{0};
111
112 // --- Serialization ---
113
114 [[nodiscard]] inline std::vector<uint8_t> serialize() const {
115 std::vector<uint8_t> buf;
116 buf.reserve(256);
117
118 append_le64(buf, static_cast<uint64_t>(timestamp_ns));
119 append_string(buf, operator_id);
120 append_string(buf, operator_role);
121 append_le32(buf, static_cast<uint32_t>(source));
122 append_le32(buf, static_cast<uint32_t>(action));
123 append_string(buf, system_id);
124 append_string(buf, original_decision_id);
125 append_string(buf, original_output);
126 append_float(buf, original_confidence);
127 append_string(buf, override_output);
128 append_string(buf, rationale);
129 append_le32(buf, static_cast<uint32_t>(urgency));
130
131 return buf;
132 }
133
134 [[nodiscard]] static inline expected<HumanOverrideRecord> deserialize(
135 const uint8_t* data, size_t size) {
136 size_t offset = 0;
138
139 if (!read_le64(data, size, offset, rec.timestamp_ns))
140 return Error{ErrorCode::CORRUPT_PAGE, "HumanOverrideRecord: truncated timestamp_ns"};
141 if (!read_string(data, size, offset, rec.operator_id))
142 return Error{ErrorCode::CORRUPT_PAGE, "HumanOverrideRecord: truncated operator_id"};
143 if (!read_string(data, size, offset, rec.operator_role))
144 return Error{ErrorCode::CORRUPT_PAGE, "HumanOverrideRecord: truncated operator_role"};
145
146 int32_t src_val = 0;
147 if (!read_le32(data, size, offset, src_val))
148 return Error{ErrorCode::CORRUPT_PAGE, "HumanOverrideRecord: truncated source"};
149 rec.source = static_cast<OverrideSource>(src_val);
150 if (src_val < 0 || src_val > 2)
152
153 int32_t act_val = 0;
154 if (!read_le32(data, size, offset, act_val))
155 return Error{ErrorCode::CORRUPT_PAGE, "HumanOverrideRecord: truncated action"};
156 rec.action = static_cast<OverrideAction>(act_val);
157 if (act_val < 0 || act_val > 4)
159
160 if (!read_string(data, size, offset, rec.system_id))
161 return Error{ErrorCode::CORRUPT_PAGE, "HumanOverrideRecord: truncated system_id"};
162 if (!read_string(data, size, offset, rec.original_decision_id))
163 return Error{ErrorCode::CORRUPT_PAGE, "HumanOverrideRecord: truncated original_decision_id"};
164 if (!read_string(data, size, offset, rec.original_output))
165 return Error{ErrorCode::CORRUPT_PAGE, "HumanOverrideRecord: truncated original_output"};
166 if (!read_float(data, size, offset, rec.original_confidence))
167 return Error{ErrorCode::CORRUPT_PAGE, "HumanOverrideRecord: truncated original_confidence"};
168 if (!read_string(data, size, offset, rec.override_output))
169 return Error{ErrorCode::CORRUPT_PAGE, "HumanOverrideRecord: truncated override_output"};
170 if (!read_string(data, size, offset, rec.rationale))
171 return Error{ErrorCode::CORRUPT_PAGE, "HumanOverrideRecord: truncated rationale"};
172
173 int32_t urg_val = 0;
174 if (!read_le32(data, size, offset, urg_val))
175 return Error{ErrorCode::CORRUPT_PAGE, "HumanOverrideRecord: truncated urgency"};
176 rec.urgency = urg_val;
177
178 return rec;
179 }
180
181private:
182 // -- Serialization helpers (same as DecisionRecord) -----------------------
183
/// Append `v` to `buf` as four bytes, least-significant byte first.
static inline void append_le32(std::vector<uint8_t>& buf, uint32_t v) {
    for (unsigned shift = 0; shift < 32; shift += 8) {
        buf.push_back(static_cast<uint8_t>(v >> shift));
    }
}
190
/// Append `v` to `buf` as eight bytes, least-significant byte first.
static inline void append_le64(std::vector<uint8_t>& buf, uint64_t v) {
    uint64_t rest = v;
    for (int remaining = 8; remaining > 0; --remaining) {
        buf.push_back(static_cast<uint8_t>(rest & 0xFFu));
        rest >>= 8;
    }
}
195
196 static inline void append_float(std::vector<uint8_t>& buf, float v) {
197 uint32_t bits;
198 std::memcpy(&bits, &v, 4);
199 append_le32(buf, bits);
200 }
201
202 static inline void append_string(std::vector<uint8_t>& buf, const std::string& s) {
203 const size_t clamped = (std::min)(s.size(), static_cast<size_t>(UINT32_MAX));
204 append_le32(buf, static_cast<uint32_t>(clamped));
205 buf.insert(buf.end(), s.begin(), s.begin() + static_cast<ptrdiff_t>(clamped));
206 }
207
208 // -- Deserialization helpers -----------------------------------------------
209
/// Decode a little-endian 64-bit integer at `data[offset]`.
///
/// @param data    Start of the buffer.
/// @param size    Total buffer length in bytes.
/// @param offset  Read position; advanced by 8 on success, untouched on failure.
/// @param out     Receives the decoded value on success.
/// @return false if fewer than 8 bytes remain, true otherwise.
static inline bool read_le64(const uint8_t* data, size_t size, size_t& offset, int64_t& out) {
    constexpr size_t kWidth = 8;
    if (size < offset + kWidth) return false;

    const uint8_t* src = data + offset;
    uint64_t acc = 0;
    // Fold from the most-significant (last) byte down to the first.
    for (size_t i = kWidth; i-- > 0;) {
        acc = (acc << 8) | static_cast<uint64_t>(src[i]);
    }

    out = static_cast<int64_t>(acc);
    offset += kWidth;
    return true;
}
219
/// Decode a little-endian 32-bit integer at `data[offset]`.
///
/// @param data    Start of the buffer.
/// @param size    Total buffer length in bytes.
/// @param offset  Read position; advanced by 4 on success, untouched on failure.
/// @param out     Receives the decoded value on success.
/// @return false if fewer than 4 bytes remain, true otherwise.
static inline bool read_le32(const uint8_t* data, size_t size, size_t& offset, int32_t& out) {
    constexpr size_t kWidth = 4;
    if (size < offset + kWidth) return false;

    const uint8_t* src = data + offset;
    uint32_t acc = 0;
    // Fold from the most-significant (last) byte down to the first.
    for (size_t i = kWidth; i-- > 0;) {
        acc = (acc << 8) | static_cast<uint32_t>(src[i]);
    }

    out = static_cast<int32_t>(acc);
    offset += kWidth;
    return true;
}
229
/// Decode a float stored as a little-endian 32-bit pattern at `data[offset]`.
///
/// @param data    Start of the buffer.
/// @param size    Total buffer length in bytes.
/// @param offset  Read position; advanced by 4 on success, untouched on failure.
/// @param out     Receives the decoded value on success.
/// @return false if fewer than 4 bytes remain, true otherwise.
static inline bool read_float(const uint8_t* data, size_t size, size_t& offset, float& out) {
    if (size < offset + 4) return false;

    const uint8_t* src = data + offset;
    const uint32_t raw = static_cast<uint32_t>(src[0])
                       | (static_cast<uint32_t>(src[1]) << 8)
                       | (static_cast<uint32_t>(src[2]) << 16)
                       | (static_cast<uint32_t>(src[3]) << 24);

    // Copy the bit pattern into the float instead of casting pointers.
    std::memcpy(&out, &raw, 4);
    offset += 4;
    return true;
}
239
240 static inline bool read_string(const uint8_t* data, size_t size, size_t& offset, std::string& out) {
241 int32_t len = 0;
242 if (!read_le32(data, size, offset, len)) return false;
243 if (len < 0) return false;
244 auto ulen = static_cast<size_t>(len);
245 if (offset + ulen > size) return false;
246 out.assign(reinterpret_cast<const char*>(data + offset), ulen);
247 offset += ulen;
248 return true;
249 }
250};
251
252// ---------------------------------------------------------------------------
253// Parquet schema for human override logs
254// ---------------------------------------------------------------------------
255
259[[nodiscard]] inline Schema human_override_log_schema() {
260 return Schema::builder("human_override_log")
261 .column<int64_t>("timestamp_ns", LogicalType::TIMESTAMP_NS)
262 .column<std::string>("operator_id")
263 .column<std::string>("operator_role")
264 .column<int32_t>("source") // OverrideSource enum
265 .column<int32_t>("action") // OverrideAction enum
266 .column<std::string>("system_id")
267 .column<std::string>("original_decision_id")
268 .column<std::string>("original_output")
269 .column<double>("original_confidence")
270 .column<std::string>("override_output")
271 .column<std::string>("rationale")
272 .column<int32_t>("urgency")
273 .column<int64_t>("chain_seq")
274 .column<std::string>("chain_hash")
275 .column<std::string>("prev_hash")
276 .column<int64_t>("row_id")
277 .column<int32_t>("row_version")
278 .column<std::string>("row_origin_file")
279 .column<std::string>("row_prev_hash")
280 .build();
281}
282
283// ---------------------------------------------------------------------------
284// OverrideRateMonitor
285// ---------------------------------------------------------------------------
286
292 int64_t window_ns = INT64_C(3600000000000);
293
296 int64_t alert_threshold = 10;
297
300};
301
310public:
311 using AlertCallback = std::function<void(int64_t override_count, int64_t window_ns)>;
312 using HaltCallback = std::function<void(HaltReason reason, const std::string& detail)>;
313
315 : opts_(std::move(opts)) {}
316
319 std::lock_guard<std::mutex> lock(mu_);
320 alert_cb_ = std::move(cb);
321 }
322
325 std::lock_guard<std::mutex> lock(mu_);
326 halt_cb_ = std::move(cb);
327 }
328
331 int64_t record_override(int64_t timestamp_ns) {
332 std::lock_guard<std::mutex> lock(mu_);
333
334 // Reject out-of-order timestamps to maintain deque invariant
335 if (!timestamps_.empty() && timestamp_ns < timestamps_.back()) {
336 return -1; // error: out-of-order timestamp
337 }
338
339 timestamps_.push_back(timestamp_ns);
340 evict_old(timestamp_ns);
341
342 auto count = static_cast<int64_t>(timestamps_.size());
343
344 if (count >= opts_.alert_threshold) {
345 if (alert_cb_) {
346 alert_cb_(count, opts_.window_ns);
347 }
348 if (opts_.auto_halt_on_threshold && halt_cb_) {
350 "Override rate " + std::to_string(count) +
351 " exceeded threshold " + std::to_string(opts_.alert_threshold));
352 }
353 }
354
355 return count;
356 }
357
359 [[nodiscard]] int64_t current_count(int64_t now_ns) {
360 std::lock_guard<std::mutex> lock(mu_);
361 evict_old(now_ns);
362 return static_cast<int64_t>(timestamps_.size());
363 }
364
366 void trigger_halt(HaltReason reason, const std::string& detail = "") {
367 std::lock_guard<std::mutex> lock(mu_);
368 if (halt_cb_) {
369 halt_cb_(reason, detail);
370 }
371 }
372
374 [[nodiscard]] const OverrideRateMonitorOptions& options() const noexcept { return opts_; }
375
376private:
377 void evict_old(int64_t now_ns) {
378 int64_t cutoff = now_ns - opts_.window_ns;
379 while (!timestamps_.empty() && timestamps_.front() < cutoff) {
380 timestamps_.pop_front();
381 }
382 }
383
384 OverrideRateMonitorOptions opts_;
385 std::deque<int64_t> timestamps_;
386 AlertCallback alert_cb_;
387 HaltCallback halt_cb_;
388 std::mutex mu_;
389};
390
391// ---------------------------------------------------------------------------
392// HumanOverrideLogWriter
393// ---------------------------------------------------------------------------
394
412public:
417 inline HumanOverrideLogWriter(const std::string& output_dir,
418 const std::string& chain_id = "",
419 size_t max_records = 100000)
420 : output_dir_(output_dir)
421 , chain_id_(chain_id.empty() ? generate_chain_id() : chain_id)
422 , max_records_(max_records)
423 , schema_(human_override_log_schema())
424 , lineage_tracker_(chain_id.empty() ? chain_id_ : chain_id, 1)
425 {
426 auto license = commercial::require_feature("HumanOverrideLogWriter");
427 if (!license) {
428 throw std::runtime_error(license.error().message);
429 }
430
431 // Validate output_dir (CWE-22: Path Traversal)
432 if (output_dir_.empty())
433 throw std::invalid_argument("HumanOverrideLogWriter: output_dir must not be empty");
434 for (size_t s = 0, e; s <= output_dir_.size(); s = e + 1) {
435 e = output_dir_.find_first_of("/\\", s);
436 if (e == std::string::npos) e = output_dir_.size();
437 if (output_dir_.substr(s, e - s) == "..")
438 throw std::invalid_argument(
439 "HumanOverrideLogWriter: output_dir must not contain '..' path traversal");
440 }
441 // Validate chain_id: [a-zA-Z0-9_-]+
442 for (char c : chain_id_) {
443 if (!std::isalnum(static_cast<unsigned char>(c)) && c != '_' && c != '-')
444 throw std::invalid_argument(
445 "HumanOverrideLogWriter: chain_id must only contain [a-zA-Z0-9_-]");
446 }
447 }
448
450 [[nodiscard]] inline expected<HashChainEntry> log(const HumanOverrideRecord& record) {
451 std::lock_guard<std::mutex> lock(write_mutex_);
452 auto usage = commercial::record_usage_rows("HumanOverrideLogWriter::log", 1);
453 if (!usage) return usage.error();
454
455 auto data = record.serialize();
456
457 int64_t ts = record.timestamp_ns;
458 if (ts == 0) ts = now_ns();
459
460 auto entry = chain_.append(data.data(), data.size(), ts);
461
462 pending_records_.push_back(record);
463 pending_entries_.push_back(entry);
464 pending_data_.push_back(std::move(data));
465
466 if (pending_records_.size() >= max_records_) {
467 auto result = flush_unlocked();
468 if (!result) return result.error();
469 }
470
471 ++total_records_;
472 return entry;
473 }
474
476 [[nodiscard]] inline expected<void> flush() {
477 std::lock_guard<std::mutex> lock(write_mutex_);
478 return flush_unlocked();
479 }
480
482 [[nodiscard]] inline expected<void> close() {
483 std::lock_guard<std::mutex> lock(write_mutex_);
484 if (!pending_records_.empty()) {
485 return flush_unlocked();
486 }
487 return expected<void>{};
488 }
489
490private:
491 [[nodiscard]] inline expected<void> flush_unlocked() {
492 if (pending_records_.empty()) {
493 return expected<void>{};
494 }
495
496 int64_t start_seq = pending_entries_.front().sequence_number;
497 int64_t end_seq = pending_entries_.back().sequence_number;
498
499 current_file_path_ = output_dir_ + "/human_override_log_" + chain_id_ + "_"
500 + std::to_string(start_seq) + "_"
501 + std::to_string(end_seq) + ".parquet";
502
503 WriterOptions opts;
504 opts.created_by = "SignetStack signet-forge human_override_log v1.0";
505
506 auto meta = current_metadata();
507 auto meta_kvs = meta.to_key_values();
508 for (auto& [k, v] : meta_kvs) {
509 opts.file_metadata.push_back(thrift::KeyValue(std::move(k), std::move(v)));
510 }
511
512 auto writer_result = ParquetWriter::open(current_file_path_, schema_, opts);
513 if (!writer_result) return writer_result.error();
514 auto& writer = *writer_result;
515
516 size_t n = pending_records_.size();
517 for (size_t i = 0; i < n; ++i) {
518 const auto& rec = pending_records_[i];
519 const auto& entry = pending_entries_[i];
520 const auto& row_data = pending_data_[i];
521 auto lineage = lineage_tracker_.next(row_data.data(), row_data.size());
522
523 std::vector<std::string> row;
524 row.reserve(19);
525
526 row.push_back(std::to_string(rec.timestamp_ns));
527 row.push_back(rec.operator_id);
528 row.push_back(rec.operator_role);
529 row.push_back(std::to_string(static_cast<int32_t>(rec.source)));
530 row.push_back(std::to_string(static_cast<int32_t>(rec.action)));
531 row.push_back(rec.system_id);
532 row.push_back(rec.original_decision_id);
533 row.push_back(rec.original_output);
534 row.push_back(double_to_string(static_cast<double>(rec.original_confidence)));
535 row.push_back(rec.override_output);
536 row.push_back(rec.rationale);
537 row.push_back(std::to_string(rec.urgency));
538 row.push_back(std::to_string(entry.sequence_number));
539 row.push_back(hash_to_hex(entry.entry_hash));
540 row.push_back(hash_to_hex(entry.prev_hash));
541 row.push_back(std::to_string(lineage.row_id));
542 row.push_back(std::to_string(lineage.row_version));
543 row.push_back(lineage.row_origin_file);
544 row.push_back(lineage.row_prev_hash);
545
546 auto write_result = writer.write_row(row);
547 if (!write_result) return write_result.error();
548 }
549
550 auto close_result = writer.close();
551 if (!close_result) return close_result.error();
552
553 pending_records_.clear();
554 pending_entries_.clear();
555 pending_data_.clear();
556 ++file_count_;
557
558 return expected<void>{};
559 }
560
561public:
563 [[nodiscard]] inline AuditMetadata current_metadata() const {
564 AuditMetadata meta;
565 meta.chain_id = chain_id_;
566
567 if (!pending_entries_.empty()) {
568 meta.start_sequence = pending_entries_.front().sequence_number;
569 meta.end_sequence = pending_entries_.back().sequence_number;
570 meta.first_hash = hash_to_hex(pending_entries_.front().entry_hash);
571 meta.last_hash = hash_to_hex(pending_entries_.back().entry_hash);
572 meta.prev_file_hash = hash_to_hex(pending_entries_.front().prev_hash);
573 } else if (!chain_.entries().empty()) {
574 const auto& last = chain_.entries().back();
575 meta.start_sequence = last.sequence_number;
576 meta.end_sequence = last.sequence_number;
577 meta.first_hash = hash_to_hex(last.entry_hash);
578 meta.last_hash = hash_to_hex(last.entry_hash);
579 }
580
581 meta.record_count = static_cast<int64_t>(pending_entries_.size());
582 meta.record_type = "human_override";
583 return meta;
584 }
585
586 [[nodiscard]] inline size_t pending_records() const { return pending_records_.size(); }
587 [[nodiscard]] inline int64_t total_records() const { return total_records_; }
588 [[nodiscard]] inline std::string current_file_path() const { return current_file_path_; }
589
590private:
591 std::string output_dir_;
592 std::string chain_id_;
593 size_t max_records_;
594 Schema schema_;
595 AuditChainWriter chain_;
596 std::vector<HumanOverrideRecord> pending_records_;
597 std::vector<HashChainEntry> pending_entries_;
598 std::vector<std::vector<uint8_t>> pending_data_;
599 RowLineageTracker lineage_tracker_;
600 std::string current_file_path_;
601 int64_t total_records_{0};
602 int64_t file_count_{0};
603 mutable std::mutex write_mutex_;
604
/// Format `v` with 17 significant digits ("%.17g"), enough for a double to
/// survive a round trip through text.
static inline std::string double_to_string(double v) {
    char text[32];
    std::snprintf(text, sizeof(text), "%.17g", v);
    return std::string(text);
}
610};
611
612// ---------------------------------------------------------------------------
613// HumanOverrideLogReader
614// ---------------------------------------------------------------------------
615
623public:
624 [[nodiscard]] static inline expected<HumanOverrideLogReader> open(const std::string& path) {
625 auto license = commercial::require_feature("HumanOverrideLogReader");
626 if (!license) return license.error();
627
628 auto reader_result = ParquetReader::open(path);
629 if (!reader_result) return reader_result.error();
630
632 hlr.reader_ = std::make_unique<ParquetReader>(std::move(*reader_result));
633 hlr.path_ = path;
634
635 auto load_result = hlr.load_columns();
636 if (!load_result) return load_result.error();
637
638 return hlr;
639 }
640
649
652 size_t n = col_timestamp_ns_.size();
653 std::vector<HumanOverrideRecord> records;
654 records.reserve(n);
655
656 for (size_t i = 0; i < n; ++i) {
658 rec.timestamp_ns = col_timestamp_ns_[i];
659 rec.operator_id = col_operator_id_[i];
660 rec.operator_role = col_operator_role_[i];
661 rec.source = static_cast<OverrideSource>(col_source_[i]);
662 if (col_source_[i] < 0 || col_source_[i] > 2)
664 rec.action = static_cast<OverrideAction>(col_action_[i]);
665 if (col_action_[i] < 0 || col_action_[i] > 4)
667 rec.system_id = col_system_id_[i];
668 rec.original_decision_id = col_original_decision_id_[i];
669 rec.original_output = col_original_output_[i];
670 rec.original_confidence = static_cast<float>(col_original_confidence_[i]);
671 rec.override_output = col_override_output_[i];
672 rec.rationale = col_rationale_[i];
673 rec.urgency = col_urgency_[i];
674 records.push_back(std::move(rec));
675 }
676
677 return records;
678 }
679
681 [[nodiscard]] inline expected<AuditMetadata> audit_metadata() const {
682 const auto& kvs = reader_->key_value_metadata();
683 AuditMetadata meta;
684
685 for (const auto& kv : kvs) {
686 if (!kv.value.has_value()) continue;
687 const auto& val = *kv.value;
688
689 if (kv.key == "signetstack.audit.chain_id") meta.chain_id = val;
690 else if (kv.key == "signetstack.audit.first_seq") { try { meta.start_sequence = std::stoll(val); } catch (...) {} }
691 else if (kv.key == "signetstack.audit.last_seq") { try { meta.end_sequence = std::stoll(val); } catch (...) {} }
692 else if (kv.key == "signetstack.audit.first_hash") meta.first_hash = val;
693 else if (kv.key == "signetstack.audit.last_hash") meta.last_hash = val;
694 else if (kv.key == "signetstack.audit.prev_file_hash") meta.prev_file_hash = val;
695 else if (kv.key == "signetstack.audit.record_count") { try { meta.record_count = std::stoll(val); } catch (...) {} }
696 else if (kv.key == "signetstack.audit.record_type") meta.record_type = val;
697 }
698
699 return meta;
700 }
701
704 size_t n = col_timestamp_ns_.size();
705 if (n == 0) {
707 empty_ok.valid = true;
708 empty_ok.entries_checked = 0;
709 empty_ok.error_message = "Empty file — no entries to verify";
710 return empty_ok;
711 }
712
713 // Reconstruct chain entries from stored columns
714 std::vector<HashChainEntry> entries;
715 entries.reserve(n);
716
717 for (size_t i = 0; i < n; ++i) {
718 HashChainEntry entry;
719 entry.sequence_number = col_chain_seq_[i];
720 entry.timestamp_ns = col_timestamp_ns_[i];
721
722 auto eh = hex_to_hash(col_chain_hash_[i]);
723 auto ph = hex_to_hash(col_prev_hash_[i]);
724 if (!eh || !ph) {
726 bad.valid = false;
727 bad.entries_checked = static_cast<int64_t>(i);
728 bad.first_bad_index = static_cast<int64_t>(i);
729 bad.error_message = !eh ? "entry_hash deserialization failed at record "
730 + std::to_string(i)
731 : "prev_hash deserialization failed at record "
732 + std::to_string(i);
733 return bad;
734 }
735 entry.entry_hash = *eh;
736 entry.prev_hash = *ph;
737
738 // Recompute data_hash from the record
740 rec.timestamp_ns = col_timestamp_ns_[i];
741 rec.operator_id = col_operator_id_[i];
742 rec.operator_role = col_operator_role_[i];
743 rec.source = static_cast<OverrideSource>(col_source_[i]);
744 if (col_source_[i] < 0 || col_source_[i] > 2)
746 rec.action = static_cast<OverrideAction>(col_action_[i]);
747 if (col_action_[i] < 0 || col_action_[i] > 4)
749 rec.system_id = col_system_id_[i];
750 rec.original_decision_id = col_original_decision_id_[i];
751 rec.original_output = col_original_output_[i];
752 rec.original_confidence = static_cast<float>(col_original_confidence_[i]);
753 rec.override_output = col_override_output_[i];
754 rec.rationale = col_rationale_[i];
755 rec.urgency = col_urgency_[i];
756
757 auto serialized = rec.serialize();
758 entry.data_hash = crypto::detail::sha256::sha256(serialized.data(), serialized.size());
759
760 entries.push_back(std::move(entry));
761 }
762
763 AuditChainVerifier verifier;
764 return verifier.verify(entries);
765 }
766
768 [[nodiscard]] inline size_t record_count() const { return col_timestamp_ns_.size(); }
769
771 [[nodiscard]] inline const std::string& path() const { return path_; }
772
773private:
774 std::unique_ptr<ParquetReader> reader_;
775 std::string path_;
776
777 // Column data
778 std::vector<int64_t> col_timestamp_ns_;
779 std::vector<std::string> col_operator_id_;
780 std::vector<std::string> col_operator_role_;
781 std::vector<int32_t> col_source_;
782 std::vector<int32_t> col_action_;
783 std::vector<std::string> col_system_id_;
784 std::vector<std::string> col_original_decision_id_;
785 std::vector<std::string> col_original_output_;
786 std::vector<double> col_original_confidence_;
787 std::vector<std::string> col_override_output_;
788 std::vector<std::string> col_rationale_;
789 std::vector<int32_t> col_urgency_;
790 std::vector<int64_t> col_chain_seq_;
791 std::vector<std::string> col_chain_hash_;
792 std::vector<std::string> col_prev_hash_;
793 std::vector<int64_t> col_row_id_;
794 std::vector<int32_t> col_row_version_;
795 std::vector<std::string> col_row_origin_file_;
796 std::vector<std::string> col_row_prev_hash_;
797
798 [[nodiscard]] inline expected<void> load_columns() {
799 int64_t num_rgs = reader_->num_row_groups();
800
801 for (int64_t rg = 0; rg < num_rgs; ++rg) {
802 size_t rg_idx = static_cast<size_t>(rg);
803
804 // Col 0: timestamp_ns (INT64)
805 auto r0 = reader_->read_column<int64_t>(rg_idx, 0);
806 if (!r0) return r0.error();
807 col_timestamp_ns_.insert(col_timestamp_ns_.end(), r0->begin(), r0->end());
808
809 // Col 1: operator_id (STRING)
810 auto r1 = reader_->read_column<std::string>(rg_idx, 1);
811 if (!r1) return r1.error();
812 col_operator_id_.insert(col_operator_id_.end(),
813 std::make_move_iterator(r1->begin()), std::make_move_iterator(r1->end()));
814
815 // Col 2: operator_role (STRING)
816 auto r2 = reader_->read_column<std::string>(rg_idx, 2);
817 if (!r2) return r2.error();
818 col_operator_role_.insert(col_operator_role_.end(),
819 std::make_move_iterator(r2->begin()), std::make_move_iterator(r2->end()));
820
821 // Col 3: source (INT32)
822 auto r3 = reader_->read_column<int32_t>(rg_idx, 3);
823 if (!r3) return r3.error();
824 col_source_.insert(col_source_.end(), r3->begin(), r3->end());
825
826 // Col 4: action (INT32)
827 auto r4 = reader_->read_column<int32_t>(rg_idx, 4);
828 if (!r4) return r4.error();
829 col_action_.insert(col_action_.end(), r4->begin(), r4->end());
830
831 // Col 5: system_id (STRING)
832 auto r5 = reader_->read_column<std::string>(rg_idx, 5);
833 if (!r5) return r5.error();
834 col_system_id_.insert(col_system_id_.end(),
835 std::make_move_iterator(r5->begin()), std::make_move_iterator(r5->end()));
836
837 // Col 6: original_decision_id (STRING)
838 auto r6 = reader_->read_column<std::string>(rg_idx, 6);
839 if (!r6) return r6.error();
840 col_original_decision_id_.insert(col_original_decision_id_.end(),
841 std::make_move_iterator(r6->begin()), std::make_move_iterator(r6->end()));
842
843 // Col 7: original_output (STRING)
844 auto r7 = reader_->read_column<std::string>(rg_idx, 7);
845 if (!r7) return r7.error();
846 col_original_output_.insert(col_original_output_.end(),
847 std::make_move_iterator(r7->begin()), std::make_move_iterator(r7->end()));
848
849 // Col 8: original_confidence (DOUBLE)
850 auto r8 = reader_->read_column<double>(rg_idx, 8);
851 if (!r8) return r8.error();
852 col_original_confidence_.insert(col_original_confidence_.end(), r8->begin(), r8->end());
853
854 // Col 9: override_output (STRING)
855 auto r9 = reader_->read_column<std::string>(rg_idx, 9);
856 if (!r9) return r9.error();
857 col_override_output_.insert(col_override_output_.end(),
858 std::make_move_iterator(r9->begin()), std::make_move_iterator(r9->end()));
859
860 // Col 10: rationale (STRING)
861 auto r10 = reader_->read_column<std::string>(rg_idx, 10);
862 if (!r10) return r10.error();
863 col_rationale_.insert(col_rationale_.end(),
864 std::make_move_iterator(r10->begin()), std::make_move_iterator(r10->end()));
865
866 // Col 11: urgency (INT32)
867 auto r11 = reader_->read_column<int32_t>(rg_idx, 11);
868 if (!r11) return r11.error();
869 col_urgency_.insert(col_urgency_.end(), r11->begin(), r11->end());
870
871 // Col 12: chain_seq (INT64)
872 auto r12 = reader_->read_column<int64_t>(rg_idx, 12);
873 if (!r12) return r12.error();
874 col_chain_seq_.insert(col_chain_seq_.end(), r12->begin(), r12->end());
875
876 // Col 13: chain_hash (STRING)
877 auto r13 = reader_->read_column<std::string>(rg_idx, 13);
878 if (!r13) return r13.error();
879 col_chain_hash_.insert(col_chain_hash_.end(),
880 std::make_move_iterator(r13->begin()), std::make_move_iterator(r13->end()));
881
882 // Col 14: prev_hash (STRING)
883 auto r14 = reader_->read_column<std::string>(rg_idx, 14);
884 if (!r14) return r14.error();
885 col_prev_hash_.insert(col_prev_hash_.end(),
886 std::make_move_iterator(r14->begin()), std::make_move_iterator(r14->end()));
887
888 // Col 15: row_id (INT64) — optional row lineage columns
889 if (reader_->schema().num_columns() > 15) {
890 auto r15 = reader_->read_column<int64_t>(rg_idx, 15);
891 if (!r15) return r15.error();
892 col_row_id_.insert(col_row_id_.end(), r15->begin(), r15->end());
893 }
894
895 // Col 16: row_version (INT32)
896 if (reader_->schema().num_columns() > 16) {
897 auto r16 = reader_->read_column<int32_t>(rg_idx, 16);
898 if (!r16) return r16.error();
899 col_row_version_.insert(col_row_version_.end(), r16->begin(), r16->end());
900 }
901
902 // Col 17: row_origin_file (STRING)
903 if (reader_->schema().num_columns() > 17) {
904 auto r17 = reader_->read_column<std::string>(rg_idx, 17);
905 if (!r17) return r17.error();
906 col_row_origin_file_.insert(col_row_origin_file_.end(),
907 std::make_move_iterator(r17->begin()), std::make_move_iterator(r17->end()));
908 }
909
910 // Col 18: row_prev_hash (STRING)
911 if (reader_->schema().num_columns() > 18) {
912 auto r18 = reader_->read_column<std::string>(rg_idx, 18);
913 if (!r18) return r18.error();
914 col_row_prev_hash_.insert(col_row_prev_hash_.end(),
915 std::make_move_iterator(r18->begin()), std::make_move_iterator(r18->end()));
916 }
917 }
918 return expected<void>{};
919 }
920};
921
922} // namespace signet::forge
Verifies hash chain integrity.
static VerificationResult verify(const uint8_t *chain_data, size_t chain_size)
Verify a chain from serialized bytes.
Builds SHA-256 hash chains during Parquet writes.
const std::vector< HashChainEntry > & entries() const
Return a const reference to the internal entry list.
HashChainEntry append(const uint8_t *record_data, size_t record_size, int64_t timestamp_ns)
Append a record to the chain with an explicit timestamp.
Reads human override log Parquet files and verifies hash chain integrity.
expected< std::vector< HumanOverrideRecord > > read_all() const
Get all override records from the file.
const std::string & path() const
Get the file path.
HumanOverrideLogReader(HumanOverrideLogReader &&)=default
HumanOverrideLogReader & operator=(HumanOverrideLogReader &&)=default
expected< AuditMetadata > audit_metadata() const
Get the audit chain metadata from the Parquet file's key-value metadata.
size_t record_count() const
Get number of records in the file.
HumanOverrideLogReader & operator=(const HumanOverrideLogReader &)=delete
static expected< HumanOverrideLogReader > open(const std::string &path)
HumanOverrideLogReader(const HumanOverrideLogReader &)=delete
AuditChainVerifier::VerificationResult verify_chain() const
Verify the hash chain integrity.
Writes human override events to Parquet files with cryptographic hash chaining for tamper-evident audit trails.
expected< void > flush()
Flush current records to a Parquet file.
expected< HashChainEntry > log(const HumanOverrideRecord &record)
Log a human override event. Returns the hash chain entry.
expected< void > close()
Close the writer (flushes remaining records).
HumanOverrideLogWriter(const std::string &output_dir, const std::string &chain_id="", size_t max_records=100000)
Create a human override log writer.
AuditMetadata current_metadata() const
Get the chain metadata for the current batch.
Sliding-window override rate monitor — EU AI Act Art.14(5).
int64_t current_count(int64_t now_ns)
Get the current override count within the window.
std::function< void(int64_t override_count, int64_t window_ns)> AlertCallback
void trigger_halt(HaltReason reason, const std::string &detail="")
Manually trigger a system halt (Art.14(4) "stop button").
const OverrideRateMonitorOptions & options() const noexcept
Get the configured options.
void set_alert_callback(AlertCallback cb)
Register a callback for when override rate exceeds threshold.
std::function< void(HaltReason reason, const std::string &detail)> HaltCallback
int64_t record_override(int64_t timestamp_ns)
Record an override event at the given timestamp.
void set_halt_callback(HaltCallback cb)
Register a callback for system halt requests.
OverrideRateMonitor(OverrideRateMonitorOptions opts={})
static expected< ParquetReader > open(const std::filesystem::path &path)
Open and parse a Parquet file, returning a ready-to-query reader.
Definition reader.hpp:189
static expected< ParquetWriter > open(const std::filesystem::path &path, const Schema &schema, const Options &options=Options{})
Open a new Parquet file for writing.
Definition writer.hpp:303
Per-row lineage tracking inspired by Iceberg V3-style data governance.
RowLineage next(const uint8_t *row_data, size_t row_size)
Generate lineage for the next row.
SchemaBuilder & column(std::string col_name, LogicalType logical_type=LogicalType::NONE)
Add a typed column, deducing PhysicalType from T.
Definition schema.hpp:107
Immutable schema description for a Parquet file.
Definition schema.hpp:192
static SchemaBuilder builder(std::string name)
Create a SchemaBuilder for fluent column construction.
Definition schema.hpp:228
const Error & error() const
Access the error payload (valid for both success and failure; check ok() on the returned Error).
Definition error.hpp:261
A lightweight result type that holds either a success value of type T or an Error.
Definition error.hpp:145
const Error & error() const
Access the error payload.
Definition error.hpp:199
std::array< uint8_t, 32 > sha256(const uint8_t *data, size_t size)
Compute SHA-256 hash of arbitrary-length input.
Definition sha256.hpp:165
int64_t now_ns()
Return the current time as nanoseconds since the Unix epoch (UTC).
expected< std::array< uint8_t, 32 > > hex_to_hash(const std::string &hex)
Convert a 64-character lowercase hex string back to a 32-byte hash.
Schema human_override_log_schema()
Build the Parquet schema for human override log files.
HaltReason
Reason for system halt — EU AI Act Art.14(4) "stop button".
@ EXTERNAL
External event (market halt, circuit breaker)
@ ANOMALY_DETECTED
Anomalous behavior detected.
@ REGULATORY
Regulatory or compliance-driven halt.
@ SAFETY_THRESHOLD
Override rate exceeded safety threshold.
@ MAINTENANCE
Scheduled maintenance halt.
@ MANUAL
Operator manually halted the system.
OverrideAction
What action the human override took — EU AI Act Art.14(4).
@ ESCALATE
Human escalated to a higher authority.
@ REJECT
Human rejected the AI system's output entirely.
@ MODIFY
Human modified the AI system's output.
@ HALT
Human triggered system halt ("stop button")
@ APPROVE
Human approved the AI system's output as-is.
@ TIMESTAMP_NS
Timestamp — INT64, nanoseconds since Unix epoch.
std::string hash_to_hex(const std::array< uint8_t, 32 > &hash)
Convert a 32-byte SHA-256 hash to a lowercase hexadecimal string (64 chars).
@ CORRUPT_PAGE
A data page failed integrity checks (bad CRC, truncated, or exceeds size limits).
OverrideSource
Source of a decision or override — EU AI Act Art.14(4).
@ AUTOMATED
Automated safety system override (e.g. risk gate)
@ HUMAN
Human operator override.
@ ALGORITHMIC
Original AI system output (no human intervention)
std::string generate_chain_id()
Generate a simple chain identifier based on the current timestamp.
Per-row lineage tracking (Iceberg V3-style) with monotonic row IDs, mutation versioning,...
Schema definition types: Column<T>, SchemaBuilder, and Schema.
bool valid
True if the entire chain passed all integrity checks.
int64_t first_bad_index
Index of the first entry that failed verification, or -1 if all entries are valid.
int64_t entries_checked
Number of entries that were successfully verified before a failure was detected (or the total count if the entire chain is valid).
std::string error_message
Human-readable description of the verification outcome.
Chain summary stored in Parquet key-value metadata.
int64_t end_sequence
Sequence number of the last entry in this file's chain segment.
int64_t record_count
Number of audit records in this segment.
std::string last_hash
Hex string of the last entry's entry_hash in this segment.
std::string chain_id
Unique identifier for this chain (generated by generate_chain_id()).
std::string record_type
Record type: "decision", "inference", etc.
std::string first_hash
Hex string of the first entry's entry_hash in this segment.
std::string prev_file_hash
Hex string of the first entry's prev_hash (links to the prior file).
int64_t start_sequence
Sequence number of the first entry in this file's chain segment.
Lightweight error value carrying an ErrorCode and a human-readable message.
Definition error.hpp:101
A single link in the cryptographic hash chain.
int64_t sequence_number
0-indexed position in the chain, monotonically increasing.
std::array< uint8_t, 32 > entry_hash
SHA-256 commitment over (sequence_number, timestamp_ns, prev_hash, data_hash).
int64_t timestamp_ns
Nanoseconds since Unix epoch when this entry was created.
std::array< uint8_t, 32 > prev_hash
SHA-256 hash of the previous entry (all zeros for the first entry, or a user-supplied continuation hash).
std::array< uint8_t, 32 > data_hash
SHA-256 hash of the record/row data that this entry covers.
A single human oversight event with full provenance.
std::string rationale
Human-provided reason for the override (Art.14(5))
std::string system_id
AI system identifier (matches ReportOptions::system_id)
int32_t urgency
Override urgency level (0=routine, 1=elevated, 2=critical)
int64_t timestamp_ns
When the override occurred (ns since epoch)
std::string operator_role
Operator role (e.g. "trader", "risk_officer", "supervisor")
std::string operator_id
Human operator identifier (pseudonymised per GDPR Art.25)
OverrideSource source
Who initiated this action.
std::vector< uint8_t > serialize() const
float original_confidence
AI system's confidence in the original output.
std::string original_output
String representation of the AI system's original output.
static expected< HumanOverrideRecord > deserialize(const uint8_t *data, size_t size)
std::string override_output
The human's replacement output (if action == MODIFY)
OverrideAction action
What action was taken.
std::string original_decision_id
Reference to the DecisionRecord/order_id being overridden.
Options for the override rate monitor.
bool auto_halt_on_threshold
If true, automatically fire the halt callback when threshold is exceeded.
int64_t alert_threshold
Override rate threshold (overrides per window) that triggers an alert.
int64_t window_ns
Sliding window duration for rate calculation (nanoseconds).
Parquet format enumerations, type traits, and statistics structs.