diff --git a/Cargo.lock b/Cargo.lock index e212fa4..91545e1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4867,8 +4867,7 @@ dependencies = [ [[package]] name = "saorsa-core" version = "0.26.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa8cc1b7f59f97d018760ff150bbb4f217197c41622b83f7085c9cf0424b736e" +source = "git+https://github.com/WithAutonomi/saorsa-core?branch=feat/trust-quarantine-thresholds#3afe290442df42a7b6ca8989f860d185d1f6f9b4" dependencies = [ "anyhow", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index 7d08918..88f570a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,8 +41,11 @@ mimalloc = "0.1" # with ant-protocol's re-exports. ant-protocol = "2.2.0" -# Core (provides EVERYTHING: networking, DHT, security, trust, storage) -saorsa-core = "0.26.0" +# Core (provides EVERYTHING: networking, DHT, security, trust, storage). +# Pinned to WithAutonomi/saorsa-core PR #119 (trust quarantine thresholds); +# the matching `[patch.crates-io]` below redirects ant-protocol's transitive +# saorsa-core to the same source so Cargo unifies on one copy. +saorsa-core = { git = "https://github.com/WithAutonomi/saorsa-core", branch = "feat/trust-quarantine-thresholds" } saorsa-pqc = "0.5" # Payment verification - autonomi network lookup + EVM payment @@ -196,3 +199,11 @@ unused_async = "allow" cognitive_complexity = "allow" # Allow non-const functions during initial development (may need runtime features later) missing_const_for_fn = "allow" + +[patch.crates-io] +# Redirect the saorsa-core that ant-protocol (and other crates) pull from +# crates.io onto the same git source as the node's direct dependency, +# eliminating the duplicate 0.26.0 copies (crates.io vs git) that otherwise +# collide on shared types such as `saorsa_core::address::MultiAddr`. +# Tracks WithAutonomi/saorsa-core PR #119 (trust quarantine thresholds). 
+saorsa-core = { git = "https://github.com/WithAutonomi/saorsa-core", branch = "feat/trust-quarantine-thresholds" } diff --git a/docs/adr/ADR-0003-full-node-detection-and-eviction.md b/docs/adr/ADR-0003-full-node-detection-and-eviction.md new file mode 100644 index 0000000..2da2a5d --- /dev/null +++ b/docs/adr/ADR-0003-full-node-detection-and-eviction.md @@ -0,0 +1,260 @@ +# ADR-0003: Full-node detection, penalisation, and eviction + +- **Status:** Proposed +- **Date:** 2026-06-25 +- **Decision owners:** Mick +- **Reviewers:** +- **Supersedes:** none +- **Superseded by:** none +- **Related:** ant-client ADR-0002 (client-side fallback and diagnostics); ADR-0002 (gossip-triggered storage-commitment audit — shares the trust/eviction path); saorsa-core trust-score eviction (the enforcement layer — already implemented, no change required) + +## Context + +A network design axiom frames this whole ADR: **a full node and a dishonest node are +treated identically.** A node that cannot accept puts is an unhealthy close-group +member and must be evicted from routing; the close group must always be healthy and +accept puts. This holds **only while fullness is a minority within any +neighbourhood** — there must always be healthy peers holding the data when a full one +is shed. A globally near-capacity network is the explicit failure boundary, where both +eviction (nothing healthy to shift to) and client fallback (no willing acceptor) +degrade together. Every mechanism below is designed for the minority-full regime and +must degrade gracefully — not cascade-evict — when that assumption breaks. + +Verified current behaviour: + +- A node rejects a put when its disk is full with a **distinct** + `ProtocolError::StorageFailed`, *before* payment verification + (`src/storage/handler.rs:274-281`; `src/storage/lmdb.rs:599-621`). +- A direct client PUT does **not** reject on the node's own storage-responsibility + view (`src/storage/handler.rs:283-285`). 
Acceptance is bounded only *indirectly* by + the issuer-in-local-20-closest test (`src/payment/verifier.rs:942-1003`; + `PAID_QUOTE_ISSUER_CLOSENESS_WIDTH = K_BUCKET_SIZE = 20`). +- Fresh replication requires a proof of payment **and** enforces closeness via + `admission::is_responsible(... storage_admission_width = close_group + margin)`, + reusing the same `ClientPut` verification path + (`src/replication/mod.rs:1902-1916, 1987-2007, 2035-2048`). +- ADR-0002 established the principle that misbehaviour must be **attributable** before + eviction is enabled. + +The gap: nothing currently turns "a close peer is full" into an attributable penalty, +and nothing stops a non-close node from accepting a client put and then +mis-attributing the resulting replication failures to honest nodes. + +## Decision Drivers + +- Detect full close peers from **direct, locally-observed, verifiable** signals — + never client hearsay, which is an eclipse/grief vector. +- Never wrongly penalise an honest-but-slow node — mirror ADR-0002's + deterministic-vs-transient split and adaptive grace. +- Every close-group peer is responsible for holding the chunk; this node's failure to + deliver its push does not excuse a peer that lacks the data — all are tested and + penalised alike. +- Keep the close group healthy as full nodes are shed, without losing readability of + data already stored (safe while fullness stays a minority). +- Do not strangle the client fallback (ant-client ADR-0002): the self-closeness gate + width and the fallback ceiling are the same knob viewed from two sides. +- Degrade gracefully at the near-capacity boundary instead of cascade-evicting. + +## Considered Options + +1. **Penalise on client-reported full rejections.** Rejected: unverifiable; lets a + client grief honest nodes into eviction. +2. 
**Detect fullness only through the existing periodic responsible-chunk audit.** + Rejected as insufficient: too slow to react to a peer that is full at put time, and + it does not observe the fresh-replication path where the failure actually shows up. +3. **Detect during fresh replication, verify possession after a 5–15 minute delay, feed + an attributable penalty into the trust score, evict via saorsa-core's existing + eviction, and gate client-put acceptance on self-closeness (chosen).** + +## Decision + +### Detection + +- When a node fresh-replicates a chunk to its close group, it records **every** close + peer responsible for the chunk and schedules a **delayed possession check** for each + one. The push outcome — accepted, refused with `StorageFailed`, or undelivered — does + not change *who* is checked: the delayed check below is the single, authoritative test + and it runs against the **whole** close group. All events are **locally observed** by + the replicating node itself. +- **Best-effort delivery, but no delivery-based exemption.** The node still tries to + deliver each fresh-replication request and retries that peer **up to 2 times** on a + delivery/transport failure. But delivery success is **not** a precondition for + judgement: a close peer is responsible for holding the chunk regardless of whether + *this* node's push got through (the chunk also reaches it via the client's own puts, + other replicators, and neighbour sync). A peer the push never reached after the + retries is therefore **still tested and still penalised** if it lacks the data — the + same as every other close peer. +- **Delayed verification — 5 to 15 minutes after fresh replication.** The node waits a + randomised interval in `[5 min, 15 min]`, then queries each close peer (a PaidForList + possession query) for whether it actually holds the chunk. 
The delay (a) gives fresh + replication time to settle, so an honest peer still mid-store is never judged + prematurely, and (b) makes the check a surprise the peer cannot anticipate. +- **Asymmetric, unrewarding scoring.** A peer confirmed to hold the chunk receives + **no positive trust** — storing what it was paid to store is the baseline + expectation, not meritorious. A peer that does **not** hold the chunk receives a + penalty **as severe as a normal AuditChallenge failure** (the same magnitude as the + responsible-chunk / storage-commitment audit penalty). +- Only the observing node's **own direct interaction** produces a penalty. No + third-party report and no client claim ever does. + +### Accounting (reusing the ADR-0002 philosophy) + +- A confirmed **not-present** result after the delayed window is a **deterministic + failure** — re-asking cannot turn a genuine absence into possession — and is + penalised at **AuditChallenge severity on its first occurrence**, exactly as ADR-0002 + acts on deterministic audit failures the first time. The push outcome (accepted, + `StorageFailed`, or undelivered) does not change this — the verdict comes solely from + the possession check, applied identically to every close peer. +- A peer unreachable **when the possession check itself is run** has not yet *yielded* + a verdict — distinct from a peer the replication push failed to reach. The check keeps + the same grace allowance as ADR-0002's audit deadline misses (resets on success, + scales with the network-wide timeout level, never with deterministic failures) and is + **re-attempted until it returns present or absent**. This grace buys time to obtain the + answer; it is not a way to skip the test: every close peer is ultimately judged on whether it + holds the chunk. +- The score moves in one direction only: storing earns nothing, and only a confirmed + absence moves it — downward. 
+ +### Eviction (provided by saorsa-core — already implemented) + +- The trust-score eviction this plan needs **already exists in saorsa-core**: full-peer + penalties feed the peer's trust score, and a peer below the threshold is immediately + evicted from routing. **No saorsa-core change is required by this ADR** — the + node-side work here only emits the full-peer penalty, at AuditChallenge severity, into + that existing trust system. +- This ADR therefore **depends on, rather than defines,** saorsa-core's existing + threshold/eviction policy, its recovery path (a node that frees capacity can + re-enter), and any near-capacity protection that avoids cascade-evicting a + uniformly-full neighbourhood. These are confirmed here as integration behaviour + (see Validation), not implemented as part of this work. + +### Self-closeness gate on client puts + +- Add a gate so a node accepts a client put only when it considers **itself** within + its local K closest to the address. This makes an accepting node's subsequent + fresh-replication participation legitimate, so it cannot mis-penalise honest nodes — + the concern behind the original "only accept a client put if within the local K + closest" requirement. It replaces today's *indirect* issuer-in-20 bound with an + explicit self-closeness check. +- **Coupling (must design as one knob):** the gate width MUST be **≥ the client + fallback ceiling** (ant-client ADR-0002, bounded by the 20-wide window), or the gate + strangles the fallback it is meant to coexist with. Express both against the same + width and choose them together. + +### Fresh-replication admission during convergence + +A healthy node that *should* hold a chunk can transiently fail to recognise its own +responsibility: while full nodes closer to the key still sit in its routing table +(not yet detected and evicted), they push it past the narrow +`storage_admission_width` (close group + small margin) in its *own* view. 
With that +narrow window it would then **reject** a fresh-replication offer it ought to accept — +so the replication path that is meant to heal coverage instead refuses it, stalling +convergence. This is also inconsistent: the same node already accepts a **client +PUT** for that key within the wider `K_BUCKET_SIZE` window (both the payment +issuer-closeness check and the self-closeness gate above use it), so a key could land +on the node directly from a client but not via replication. + +**Decision:** widen the **fresh-replication accept** admission to the K-wide +paid-close-group neighbourhood (`paid_list_close_group_size`, equal to +`K_BUCKET_SIZE` = 20) — the same window client PUTs use — so transient view-skew +from un-evicted full nodes no longer causes spurious rejection. A node accepts a +fresh offer when it is within its own local `paid_list_close_group_size`-closest to +the key. Two properties keep this safe: + +- **No extra storage from the accept side.** The sender still fans out only to its + `close_group_size` targets, so widening the *receiver's* accept window adds no + stores — it only stops those targets from rejecting due to view-skew. +- **Retention stays narrow.** Long-term retention/pruning keeps using + `storage_admission_width`, so steady-state replication is still ≈ `close_group_size`. Any transient + over-coverage is reclaimed once the close group converges (full nodes evicted → the + node's view tightens → it sits correctly inside or outside the narrow window), and + the multi-day prune hysteresis comfortably spans the minutes-long eviction window, + so a legitimate replica is never pruned mid-convergence. 
+ +**Sender side (deliberately not widened).** The mirror case — the *sender's* own +close group being mostly full, so its `close_group_size` offers never reach the +healthy nodes ranked beyond the full ones — is left to the existing convergence loop: +the possession check evicts the full nodes, the close group tightens to healthy +members, and the responsible-chunk repair (neighbour sync) fetches the chunk to the +nodes that become responsible. Widening the sender fan-out to `K_BUCKET_SIZE` was +considered and rejected — it would triple fresh-replication fan-out on *every* write +and hold large transient over-replication for the full prune-hysteresis window, for a +case the repair path already heals. + +## Consequences + +### Positive + +- Full peers become attributable and are shed, so the close group self-heals toward + peers that can keep storing. +- Honest nodes are protected three ways: the 5–15 minute settle delay before any check, + the deterministic-vs-transient split, and saorsa-core's adaptive grace — so a peer + still mid-store or briefly slow is never penalised. +- Client fallback acceptance and node-side acceptance stay aligned because the gate and + the fallback ceiling are a single tuned width. + +### Negative / Trade-offs + +- Detection adds replication-time verification cost (an extra possession check per + fresh-replication wave). +- **Enabling eviction is a coordinated, breaking change**: the network must run + detection before eviction can be relied on, consistent with ADR-0002's rollout + gating. +- The near-capacity boundary is a real limitation — mitigated by saorsa-core's existing + safety valve and recovery path, not eliminated. +- The self-closeness gate changes today's permissive client-put acceptance and must be + rolled out in step with the client fallback. 
+ +### Neutral / Operational + +- New node-side tunables: the fresh-replication delivery retry budget (up to 2 per + peer), the post-put verification envelope (the 5–15 minute delay window plus per-check + timeout and re-attempt budget), and the self-closeness gate width. The penalty + magnitude is not new — it reuses the existing AuditChallenge severity. +- Trust score, eviction threshold, recovery, and near-capacity protection are + **already implemented in saorsa-core**; this node only emits penalties into them — + **no saorsa-core change in this work**. +- Runs **alongside** ADR-0002's gossip-triggered storage-commitment audit and the + periodic responsible-chunk audit, sharing the same trust/eviction path; full-node + detection is simply another attributable-misbehaviour source feeding it. + +## Validation + +How we will know this decision remains correct: + +- **Minority-full testnet:** full close peers accrue penalties from direct observation + only, cross the threshold, are evicted, and the close group recomputes to healthy + peers — with stored data still readable throughout (the surviving majority held the + replicas). +- **Honest-node safety:** under induced churn, honest-but-slow nodes are not evicted + (grace plus adaptive timeout scaling hold; no eviction death spiral), and a client + cannot induce eviction of an honest node by claiming it is full. +- **Self-closeness gate:** non-close nodes no longer accept client puts, and the chosen + gate width still leaves at least quorum-many acceptors available for the client + fallback set (cross-checked against ant-client ADR-0002). +- **Near-capacity:** when a neighbourhood is uniformly full, the node degrades to + best-effort storage rather than cascade-evicting. 
+- **Tests required before Accepted:** the possession check fires only within the 5–15 + minute window and never before it, and runs for **every** close-group peer; a peer + confirmed holding the chunk receives **no** trust change, while a confirmed not-present + peer records exactly one penalty at **AuditChallenge severity** against the right + peer — identically whether the push was accepted, refused with `StorageFailed`, or + never delivered; a peer the push never reached after 2 retries is still tested and + still penalised if it lacks the chunk; client/third-party claims record none; a peer + unreachable *at check time* is re-attempted under the grace allowance until it yields + present/absent; the adaptive timeout grace tracks widespread timeouts but never + deterministic failures; the node emits into saorsa-core such that an + evicted-for-fullness node can re-enter after it frees capacity (integration); the + self-closeness gate width is ≥ the fallback ceiling; a healthy node with un-evicted + full nodes ahead of it in its routing table (so it ranks outside + `storage_admission_width` but within `K_BUCKET_SIZE`) still **accepts** a + fresh-replication offer for the key, while retention/pruning stays scoped to + `storage_admission_width`. +- **Re-open triggers:** revisit thresholds if false positives appear; revisit the + near-capacity degradation if the network approaches global capacity. + +## Notes for AI-assisted work + +AI tools may help draft this ADR, but **must not mark it Accepted without human +review**. Accepted ADRs are immutable: create a new superseding ADR rather than +editing this one. diff --git a/src/replication/config.rs b/src/replication/config.rs index 571c934..c762cbd 100644 --- a/src/replication/config.rs +++ b/src/replication/config.rs @@ -47,6 +47,34 @@ pub const NEIGHBOR_SYNC_SCOPE: usize = 20; /// round. pub const NEIGHBOR_SYNC_PEER_COUNT: usize = 4; +/// Best-effort delivery retries for a fresh-replication push, per peer. 
+/// +/// ADR-0003: on a transport/send failure the offer is retried up to this many +/// times so a transient hiccup does not silently drop it. This is delivery +/// assurance only — possession is judged separately by the delayed possession +/// check, which still penalises a close peer that lacks the chunk even if the +/// push never reached it. +pub const FRESH_REPLICATION_DELIVERY_MAX_RETRIES: u32 = 2; + +const POSSESSION_CHECK_DELAY_MIN_SECS: u64 = 5 * 60; +const POSSESSION_CHECK_DELAY_MAX_SECS: u64 = 15 * 60; + +/// Lower bound of the delay before a fresh-replication possession check runs +/// (ADR-0003). +/// +/// The delay lets replication settle so an honest peer still mid-store is not +/// judged prematurely, and makes the check unpredictable to the peer. +pub const POSSESSION_CHECK_DELAY_MIN: Duration = + Duration::from_secs(POSSESSION_CHECK_DELAY_MIN_SECS); + +/// Upper bound of the possession-check delay (ADR-0003). +pub const POSSESSION_CHECK_DELAY_MAX: Duration = + Duration::from_secs(POSSESSION_CHECK_DELAY_MAX_SECS); + +// The possession probe reuses the `AuditChallenge` wire and the bandwidth- +// calibrated `audit_response_timeout(1)` deadline, so it needs no bespoke +// per-probe timeout or retry constants. + /// Width used when deciding whether this node may locally store or retain a /// chunk. #[must_use] @@ -108,8 +136,9 @@ pub const MAX_CONCURRENT_REPLICATION_SENDS: usize = 3; /// their disk reads don't stall replication. This caps how many run at once /// across the engine, restoring backpressure: a peer flooding audit challenges /// cannot fan out unbounded `get_raw` reads or multi-MiB byte serves. When the -/// cap is hit, the challenge is dropped — the auditor graces a non-response as a -/// timeout, so honest auditors are unaffected and only a flooder is throttled. +/// cap is hit, the challenge is dropped and the caller's audit-specific timeout +/// policy applies. 
The cap must therefore stay high enough for honest audit +/// traffic while still throttling flooders. /// Sized to cover a handful of concurrent honest auditors (the per-peer /// gossip-audit cooldown is 30 min, so genuine concurrent audits are few) while /// bounding the byte round's worst-case resident bytes @@ -120,9 +149,10 @@ pub const MAX_CONCURRENT_AUDIT_RESPONSES: usize = 16; /// /// The global [`MAX_CONCURRENT_AUDIT_RESPONSES`] ceiling alone is not /// flood-fair: one peer spamming challenges could occupy every slot and starve -/// honest auditors (whose dropped challenges convert to timeouts → strikes on -/// the honest peers). This per-peer cap guarantees no source holds more than -/// its share, so a flood self-throttles. Audits are cooldown-gated (one +/// honest auditors (whose dropped challenges convert to audit failures or +/// timeout verdicts on the challenged peers). This per-peer cap guarantees no +/// source holds more than its share, so a flood self-throttles. Audits are +/// cooldown-gated (one /// gossip-triggered audit per peer per 30 min), so 2 in-flight per peer /// comfortably covers the legitimate round-1 + round-2 overlap. pub const MAX_AUDIT_RESPONSES_PER_PEER: u32 = 2; @@ -277,23 +307,6 @@ const _: () = assert!( "wire cap must fit at least one max-size chunk per byte-challenge response" ); -/// Rollout gate for timeout-driven eviction. -/// -/// When `false`, a peer that crosses the consecutive-timeout strike threshold -/// is logged but NOT reported to the trust engine (no eviction). This PR is a -/// breaking wire change (old nodes cannot decode the new `StorageCommitment` -/// gossip), so a not-yet-upgraded peer times out on every new audit and looks -/// exactly like a non-storing peer; penalising timeouts during the mixed-version -/// window would make upgraded nodes evict every old node — a death spiral. 
-/// -/// Confirmed storage-integrity failures (`DigestMismatch`/`KeyAbsent`/ -/// `Rejected`/`MalformedResponse`) are NEVER gated by this — those only come -/// from a peer that actually answered with bad data, never an old node. Flip to -/// `true` in a small follow-up release once the fleet has upgraded. This is a -/// real `const` (not commented-out code) so both gate sites compile and stay in -/// sync, and the flip is one line. -pub const TIMEOUT_EVICTION_ENABLED: bool = false; - /// Verification request timeout (per-batch). const VERIFICATION_REQUEST_TIMEOUT_SECS: u64 = 15; /// Verification request timeout (per-batch). @@ -313,28 +326,6 @@ pub const PENDING_VERIFY_MAX_AGE: Duration = Duration::from_secs(PENDING_VERIFY_ /// Trust event weight for confirmed audit failures. pub const AUDIT_FAILURE_TRUST_WEIGHT: f64 = 5.0; -/// Consecutive audit *timeouts* a peer may accumulate before a timeout is -/// reported as an `ApplicationFailure` trust event. -/// -/// The audit response timeout is an economic deterrent calibrated for -/// residential bandwidth, not a hard cryptographic bound: a single slow -/// response is routine for an honest node under transient load (GC pause, -/// disk flush, a burst of concurrent requests). Penalizing on the first -/// timeout false-positives those nodes. -/// -/// Requiring `N` *consecutive* timeouts before penalizing removes that -/// false-positive while preserving the deterrent against a peer that does not -/// actually store the data and must fetch it at audit time: such a peer is -/// slow on *every* audit and accumulates a fresh strike each tick until it -/// crosses the threshold, whereas an honest node answers normally between rare -/// slow ticks and any success resets its strike counter to zero (see -/// `handle_audit_result`). The discriminator is *persistence* of slowness -/// versus *transience*. This deliberately does not widen the per-challenge -/// window. 
Applies ONLY to `AuditFailureReason::Timeout`; confirmed -/// storage-integrity failures (`DigestMismatch` / `KeyAbsent` / `Rejected` / -/// `MalformedResponse`) remain instantly punishable. -pub const AUDIT_TIMEOUT_STRIKE_THRESHOLD: u32 = 3; - /// Probability of launching a subtree audit when a peer's *changed* commitment /// is ingested via gossip (ADR-0002). Keeps audits occasional surprise exams. pub const AUDIT_ON_GOSSIP_PROBABILITY: f64 = 0.2; @@ -428,6 +419,13 @@ pub struct ReplicationConfig { /// Seconds to wait for `DhtNetworkEvent::BootstrapComplete` before /// proceeding with bootstrap sync (covers bootstrap nodes with no peers). pub bootstrap_complete_timeout_secs: u64, + /// Lower bound of the delay before a fresh-replication possession check + /// runs (ADR-0003). Defaults to [`POSSESSION_CHECK_DELAY_MIN`]; tests + /// shorten it so the scheduled check fires quickly. + pub possession_check_delay_min: Duration, + /// Upper bound of the possession-check delay window (ADR-0003). Defaults + /// to [`POSSESSION_CHECK_DELAY_MAX`]. + pub possession_check_delay_max: Duration, } impl Default for ReplicationConfig { @@ -454,6 +452,8 @@ impl Default for ReplicationConfig { verification_request_timeout: VERIFICATION_REQUEST_TIMEOUT, fetch_request_timeout: FETCH_REQUEST_TIMEOUT, bootstrap_complete_timeout_secs: BOOTSTRAP_COMPLETE_TIMEOUT_SECS, + possession_check_delay_min: POSSESSION_CHECK_DELAY_MIN, + possession_check_delay_max: POSSESSION_CHECK_DELAY_MAX, } } } @@ -592,12 +592,10 @@ impl ReplicationConfig { /// A relay attacker on a residential link (~5-12 MB/s) who must /// fetch the same `k × 4 MiB` over the network sees ~10-100× higher /// latency than disk for the data alone, plus per-chunk round-trips, - /// and misses the budget — recording a timeout strike (per - /// `handle_audit_timeout` → `handle_audit_failure`). 
After - /// [`AUDIT_TIMEOUT_STRIKE_THRESHOLD`] consecutive timeouts this would - /// fire an `application_failure` trust event — but note that report is - /// currently suppressed for the breaking rollout (grep - /// TIMEOUT-EVICTION-DISABLED); the strike accounting still runs. + /// and misses the budget. In the periodic responsible-chunk + /// `AuditChallenge`, prune-confirmation, and ADR-0003 possession-check paths + /// that timeout is an immediate audit failure. The heavier subtree audit + /// still graces timeouts separately. /// /// This is an economic deterrent for the §7 relay limit calibrated /// for residential bandwidth, NOT a hard bound: a relay on a @@ -739,14 +737,6 @@ mod tests { assert!((AUDIT_FAILURE_TRUST_WEIGHT - 5.0).abs() <= f64::EPSILON); } - #[test] - fn audit_timeout_strike_threshold_is_three() { - // Smallest threshold that tolerates back-to-back transient slowness - // while still penalizing a persistently-slow non-storing peer within a - // few audit ticks. - assert_eq!(AUDIT_TIMEOUT_STRIKE_THRESHOLD, 3); - } - #[test] fn replication_protocol_id_is_v2() { // The v12 storage-bound audit changes replication SEMANTICS. The diff --git a/src/replication/fresh.rs b/src/replication/fresh.rs index af3a93a..80e9766 100644 --- a/src/replication/fresh.rs +++ b/src/replication/fresh.rs @@ -14,7 +14,9 @@ use saorsa_core::P2PNode; use tokio::sync::Semaphore; use crate::ant_protocol::XorName; -use crate::replication::config::{ReplicationConfig, REPLICATION_PROTOCOL_ID}; +use crate::replication::config::{ + ReplicationConfig, FRESH_REPLICATION_DELIVERY_MAX_RETRIES, REPLICATION_PROTOCOL_ID, +}; use crate::replication::paid_list::PaidList; use crate::replication::protocol::{ FreshReplicationOffer, PaidNotify, ReplicationMessage, ReplicationMessageBody, @@ -36,9 +38,10 @@ pub struct FreshWriteEvent { /// Execute fresh replication for a newly accepted record. /// -/// Sends fresh offers to close group members and `PaidNotify` to -/// `PaidCloseGroup`. 
Both are fire-and-forget (no ack tracking or retry per -/// Section 6.1, rule 8). +/// Sends fresh offers to close group members (with bounded delivery retries, +/// ADR-0003) and `PaidNotify` to `PaidCloseGroup`. Returns the close-group +/// peers responsible for the key (excluding self) so the caller can schedule +/// the delayed possession check; `PaidNotify` remains fire-and-forget. /// /// The `send_semaphore` limits how many outbound chunk transfers can be /// in-flight concurrently across the entire replication engine, preventing @@ -51,7 +54,7 @@ pub async fn replicate_fresh( paid_list: &Arc, config: &ReplicationConfig, send_semaphore: &Arc, -) { +) -> Vec { let self_id = *p2p_node.peer_id(); // Rule 6: Node that validates PoP adds K to PaidForList(self). @@ -88,11 +91,15 @@ pub async fn replicate_fresh( "Failed to encode FreshReplicationOffer for {}", hex::encode(key), ); - return; + return Vec::new(); }; + // Share one encoded copy across the per-peer send tasks so a retry only + // re-materialises the buffer for the (consuming) send call, keeping the + // common single-attempt path at one clone per peer. + let encoded = Arc::new(encoded); for peer in &target_peers { let p2p = Arc::clone(p2p_node); - let data = encoded.clone(); + let data = Arc::clone(&encoded); let peer_id = *peer; let sem = Arc::clone(send_semaphore); tokio::spawn(async move { @@ -103,11 +110,37 @@ pub async fn replicate_fresh( "Replication send permit acquired for peer {peer_id} ({} available)", sem.available_permits() ); - if let Err(e) = p2p - .send_message(&peer_id, REPLICATION_PROTOCOL_ID, data, &[]) - .await - { - debug!("Failed to send fresh offer to {peer_id}: {e}"); + // ADR-0003: best-effort delivery. Retry the push up to + // FRESH_REPLICATION_DELIVERY_MAX_RETRIES times on a transport + // failure so a transient hiccup doesn't silently drop the offer. + // Possession is judged separately by the delayed possession check. 
+ let mut attempt = 0u32; + loop { + match p2p + .send_message( + &peer_id, + REPLICATION_PROTOCOL_ID, + data.as_ref().clone(), + &[], + ) + .await + { + Ok(()) => break, + Err(e) => { + if attempt >= FRESH_REPLICATION_DELIVERY_MAX_RETRIES { + debug!( + "Failed to send fresh offer to {peer_id} after {} attempts: {e}", + attempt + 1 + ); + break; + } + attempt += 1; + debug!( + "Retrying fresh offer to {peer_id} (attempt {}): {e}", + attempt + 1 + ); + } + } } }); } @@ -122,6 +155,8 @@ pub async fn replicate_fresh( hex::encode(key), target_peers.len() ); + + target_peers } /// Send `PaidNotify(K)` to every peer in `PaidCloseGroup(K)` (fire-and-forget). diff --git a/src/replication/mod.rs b/src/replication/mod.rs index 0f6394a..80776cd 100644 --- a/src/replication/mod.rs +++ b/src/replication/mod.rs @@ -23,6 +23,7 @@ pub mod config; pub mod fresh; pub mod neighbor_sync; pub mod paid_list; +pub mod possession; pub mod protocol; pub mod pruning; pub mod quorum; @@ -215,18 +216,6 @@ pub struct ReplicationEngine { /// are lightweight (`PeerSyncRecord` is two fields) and peer IDs are /// naturally bounded by the routing table's k-bucket capacity. sync_history: Arc>>, - /// Per-peer consecutive audit-timeout strike counter. - /// - /// A timeout increments the peer's strike count; a successful audit - /// response resets it to zero. Only when a peer reaches - /// [`config::AUDIT_TIMEOUT_STRIKE_THRESHOLD`] consecutive timeouts is a - /// timeout reported as an `ApplicationFailure` trust event. This separates - /// honest transient slowness (resets on the next normal response) from a - /// peer that does not store the data and is slow on every audit. Lives - /// outside `NeighborSyncState` so it is never wiped by a neighbor-sync - /// cycle reset. Grows with peer churn like `sync_history`; entries are a - /// single `u32` and peer IDs are bounded by k-bucket capacity. - audit_timeout_strikes: Arc>>, /// Per-peer cooldown for gossip-triggered subtree audits (ADR-0002). 
/// /// Records when each peer was last audited so a burst of gossiped @@ -302,7 +291,7 @@ pub struct ReplicationEngine { /// [`MAX_AUDIT_RESPONSES_PER_PEER`]. The GLOBAL semaphore alone is not /// flood-fair: one peer spamming challenges could occupy every slot and /// starve honest auditors, whose dropped challenges then convert to - /// timeouts and record strikes on the HONEST peers (codex-r2 A). This + /// audit timeouts against HONEST peers (codex-r2 A). This /// per-peer cap guarantees no single source can hold more than its share, /// so a flood self-throttles without denying service to everyone else. audit_responder_inflight: Arc>>, @@ -311,6 +300,12 @@ pub struct ReplicationEngine { /// When present, `start()` spawns a drainer task that calls /// `replicate_fresh` for each event. fresh_write_rx: Option>, + /// Sender for delayed possession-check events (ADR-0003). The fresh-write + /// drainer pushes the responsible close-group peers here after each fresh + /// replication; the possession-check scheduler drains the paired receiver. + possession_check_tx: mpsc::UnboundedSender, + /// Receiver paired with `possession_check_tx`; taken by the scheduler task. + possession_check_rx: Option>, /// Shutdown token. shutdown: CancellationToken, /// Background task handles. 
@@ -345,6 +340,7 @@ impl ReplicationEngine { let initial_neighbors = NeighborSyncState::new_cycle(Vec::new()); let config = Arc::new(config); + let (possession_check_tx, possession_check_rx) = mpsc::unbounded_channel(); Ok(Self { config: Arc::clone(&config), @@ -355,7 +351,6 @@ impl ReplicationEngine { queues: Arc::new(RwLock::new(ReplicationQueues::new())), sync_state: Arc::new(RwLock::new(initial_neighbors)), sync_history: Arc::new(RwLock::new(HashMap::new())), - audit_timeout_strikes: Arc::new(RwLock::new(HashMap::new())), audit_on_gossip_cooldown: Arc::new(RwLock::new(HashMap::new())), sync_cycle_epoch: Arc::new(RwLock::new(0)), repair_proofs: Arc::new(RwLock::new(RepairProofs::new())), @@ -373,6 +368,8 @@ impl ReplicationEngine { audit_responder_semaphore: Arc::new(Semaphore::new(MAX_CONCURRENT_AUDIT_RESPONSES)), audit_responder_inflight: Arc::new(RwLock::new(HashMap::new())), fresh_write_rx: Some(fresh_write_rx), + possession_check_tx, + possession_check_rx: Some(possession_check_rx), shutdown, task_handles: Vec::new(), }) @@ -475,6 +472,26 @@ impl ReplicationEngine { .await } + /// Test-only: run the possession check immediately for `key` against + /// `peers`, bypassing the scheduler's randomised 5-15 minute settle delay. + /// + /// Penalises any peer that does not hold `key` at `AuditChallenge` + /// severity (ADR-0003). Lets e2e tests assert the detection+penalty path + /// deterministically without waiting for the scheduled check. + #[cfg(any(test, feature = "test-utils"))] + pub async fn run_possession_check_now(&self, key: XorName, peers: Vec) { + possession::run_possession_check( + key, + peers, + &self.p2p_node, + &self.storage, + &self.config, + &self.sync_state, + &self.shutdown, + ) + .await; + } + /// Start all background tasks. 
/// /// `dht_events` must be subscribed **before** `P2PNode::start()` so that @@ -500,6 +517,7 @@ impl ReplicationEngine { self.start_verification_worker(); self.start_bootstrap_sync(dht_events); self.start_fresh_write_drainer(); + self.start_possession_check_scheduler(); info!( "Replication engine started with {} background tasks", @@ -566,9 +584,13 @@ impl ReplicationEngine { self.sync_trigger.notify_one(); } - /// Execute fresh replication for a newly stored record. + /// Execute fresh replication for a newly stored record, then schedule the + /// delayed possession check for the responsible close-group peers + /// (ADR-0003). The production PUT path schedules via the fresh-write + /// drainer; this direct entry point schedules here so callers (and tests) + /// that drive replication directly still get the possession check. pub async fn replicate_fresh(&self, key: &XorName, data: &[u8], proof_of_payment: &[u8]) { - fresh::replicate_fresh( + let peers = fresh::replicate_fresh( key, data, proof_of_payment, @@ -578,6 +600,11 @@ impl ReplicationEngine { &self.send_semaphore, ) .await; + if !peers.is_empty() { + let _ = self + .possession_check_tx + .send(possession::PossessionCheckEvent { key: *key, peers }); + } } // ======================================================================= @@ -594,6 +621,7 @@ impl ReplicationEngine { let paid_list = Arc::clone(&self.paid_list); let config = Arc::clone(&self.config); let send_semaphore = Arc::clone(&self.send_semaphore); + let possession_tx = self.possession_check_tx.clone(); let shutdown = self.shutdown.clone(); let handle = tokio::spawn(async move { @@ -602,7 +630,7 @@ impl ReplicationEngine { () = shutdown.cancelled() => break, event = rx.recv() => { let Some(event) = event else { break }; - fresh::replicate_fresh( + let peers = fresh::replicate_fresh( &event.key, &event.data, &event.payment_proof, @@ -612,6 +640,15 @@ impl ReplicationEngine { &send_semaphore, ) .await; + // Schedule the delayed possession check 
(ADR-0003) for + // the responsible close-group peers. A closed receiver + // (engine shutting down) is ignored. + if !peers.is_empty() { + let _ = possession_tx.send(possession::PossessionCheckEvent { + key: event.key, + peers, + }); + } } } } @@ -620,6 +657,65 @@ impl ReplicationEngine { self.task_handles.push(handle); } + /// Spawn the possession-check scheduler (ADR-0003). + /// + /// Drains scheduled possession-check events and, for each, waits a + /// randomised 5-15 minute settle delay before probing every responsible + /// peer for actual possession. A peer that cryptographically fails to prove + /// possession, including by timeout, is penalised at `AuditChallenge` + /// severity. + fn start_possession_check_scheduler(&mut self) { + let Some(mut rx) = self.possession_check_rx.take() else { + return; + }; + let p2p = Arc::clone(&self.p2p_node); + let storage = Arc::clone(&self.storage); + let config = Arc::clone(&self.config); + let sync_state = Arc::clone(&self.sync_state); + let shutdown = self.shutdown.clone(); + + let handle = tokio::spawn(async move { + loop { + tokio::select! { + () = shutdown.cancelled() => break, + event = rx.recv() => { + let Some(event) = event else { break }; + // Spawn a per-chunk delayed check so the drain loop + // keeps pace with the write rate. Each check sleeps the + // randomised settle delay, then probes every peer. + let p2p = Arc::clone(&p2p); + let storage = Arc::clone(&storage); + let config = Arc::clone(&config); + let sync_state = Arc::clone(&sync_state); + let shutdown = shutdown.clone(); + let delay_min = config.possession_check_delay_min; + let delay_max = config.possession_check_delay_max; + tokio::spawn(async move { + let delay = possession::random_delay(delay_min, delay_max); + tokio::select! 
{ + () = shutdown.cancelled() => {} + () = tokio::time::sleep(delay) => { + possession::run_possession_check( + event.key, + event.peers, + &p2p, + &storage, + &config, + &sync_state, + &shutdown, + ) + .await; + } + } + }); + } + } + } + debug!("Possession-check scheduler shut down"); + }); + self.task_handles.push(handle); + } + #[allow(clippy::too_many_lines)] fn start_message_handler(&mut self) { let mut p2p_events = self.p2p_node.subscribe_events(); @@ -642,7 +738,6 @@ impl ReplicationEngine { let ever_capable_peers = Arc::clone(&self.ever_capable_peers); let recent_provers = Arc::clone(&self.recent_provers); let sig_verify_attempts = Arc::clone(&self.sig_verify_attempts); - let audit_timeout_strikes = Arc::clone(&self.audit_timeout_strikes); let audit_on_gossip_cooldown = Arc::clone(&self.audit_on_gossip_cooldown); let sync_state = Arc::clone(&self.sync_state); let audit_responder_semaphore = Arc::clone(&self.audit_responder_semaphore); @@ -655,7 +750,6 @@ impl ReplicationEngine { config: Arc::clone(&config), recent_provers: Arc::clone(&recent_provers), sync_state: Arc::clone(&sync_state), - audit_timeout_strikes: Arc::clone(&audit_timeout_strikes), cooldown: Arc::clone(&audit_on_gossip_cooldown), }; @@ -777,10 +871,6 @@ impl ReplicationEngine { last_commitment_by_peer.write().await.remove(&peer_id); recent_provers.write().await.forget_peer(&peer_id); sig_verify_attempts.write().await.remove(&peer_id); - // Drop the timeout-strike entry too, so a - // departed peer leaves no residual (keeps this - // map bounded under churn, like its siblings). - audit_timeout_strikes.write().await.remove(&peer_id); // Same for the gossip-audit cooldown (ADR-0002). 
audit_on_gossip_cooldown.write().await.remove(&peer_id); // The sticky `commitment_capable` flag is @@ -827,7 +917,6 @@ impl ReplicationEngine { config: Arc::clone(&config), recent_provers: Arc::clone(&self.recent_provers), sync_state: Arc::clone(&sync_state), - audit_timeout_strikes: Arc::clone(&self.audit_timeout_strikes), cooldown: Arc::clone(&self.audit_on_gossip_cooldown), }; @@ -910,12 +999,6 @@ impl ReplicationEngine { let bootstrap_state = Arc::clone(&self.bootstrap_state); let is_bootstrapping = Arc::clone(&self.is_bootstrapping); let sync_state = Arc::clone(&self.sync_state); - // Needed so the responsible-chunk audit routes failures through the same - // strike/grace path as the storage-commitment audit (timeouts graced, - // not penalised on the first occurrence) and can revoke holder credit on - // a confirmed failure. - let recent_provers = Arc::clone(&self.recent_provers); - let audit_timeout_strikes = Arc::clone(&self.audit_timeout_strikes); let handle = tokio::spawn(async move { // Invariant 19: wait for bootstrap to drain before starting audits. @@ -949,15 +1032,7 @@ impl ReplicationEngine { ) .await }; - handle_audit_result( - &result, - &p2p, - &sync_state, - &recent_provers, - &audit_timeout_strikes, - &config, - ) - .await; + handle_audit_result(&result, &p2p, &sync_state, &config).await; } // Then run periodically. @@ -981,15 +1056,7 @@ impl ReplicationEngine { ) .await }; - handle_audit_result( - &result, - &p2p, - &sync_state, - &recent_provers, - &audit_timeout_strikes, - &config, - ) - .await; + handle_audit_result(&result, &p2p, &sync_state, &config).await; } } } @@ -1531,9 +1598,9 @@ impl Drop for AuditResponderGuard { /// Try to admit one audit-responder task for `source`: take a global permit AND /// a per-peer slot (both bounded). 
Returns `None` (caller drops the challenge, -/// which the auditor graces as a timeout) if either ceiling is hit, so one -/// flooder can neither exhaust the global pool's effect on others nor exceed -/// its own per-peer share (codex-r2 A). +/// leaving the remote auditor to apply that audit path's timeout policy) if +/// either ceiling is hit, so one flooder can neither exhaust the global pool's +/// effect on others nor exceed its own per-peer share (codex-r2 A). async fn admit_audit_responder( semaphore: &Arc, inflight: &Arc>>, @@ -1703,8 +1770,9 @@ async fn handle_replication_message( // block all other replication traffic until its digests complete // (head-of-line blocking). The same flood-fair admission applies: a // global ceiling AND a per-peer cap, dropping the challenge if either - // is hit (an honest auditor graces a non-response as a timeout, while - // a flooder is held to its per-peer share and cannot starve others). + // is hit. Responsible/prune audit timeouts are penalised by the + // caller, so the caps must remain high enough for honest audit load; + // the per-peer share still prevents one flooder from starving others. let Some(guard) = admit_audit_responder(audit_responder_semaphore, audit_responder_inflight, source) .await @@ -1748,10 +1816,10 @@ async fn handle_replication_message( // // A bounded, flood-fair admission restores backpressure (codex#1 + // codex-r2 A): a global ceiling AND a per-peer cap. If either is hit - // we drop this challenge — the auditor graces a non-response as a - // timeout, so an honest auditor is unaffected and only a flooder is - // throttled (and it cannot starve other peers, since its share is - // capped per-peer). + // we drop this challenge. Subtree auditors grace timeout + // non-responses, so capacity drops throttle flooders without turning + // into trust penalties (and one source cannot starve other peers, + // since its share is capped per-peer). 
info!( "Audit challenge received: kind=subtree source={source} request_response={}", rr_message_id.is_some(), @@ -1982,13 +2050,19 @@ async fn handle_fresh_offer( return Ok(()); } - // Rule 7: check storage admission. Fresh chunk receivers accept the close - // group plus a small margin to absorb local routing-table disagreement. + // Rule 7: check storage admission. Fresh chunk receivers accept across the + // paid-close-group neighbourhood (`paid_list_close_group_size`, = K_BUCKET_SIZE, + // the same width client PUTs use), not just the close group plus a small + // margin (ADR-0003). During full-node shunning a healthy replica's routing + // table may still list closer full nodes it hasn't evicted yet, ranking it + // outside the narrow window in its own view; the wider accept window absorbs + // that transient skew so the chunk still lands. Retention (pruning) stays at + // `storage_admission_width`, so steady-state replication is unchanged. if !admission::is_responsible( &self_id, &offer.key, p2p_node, - storage_admission_width(config.close_group_size), + config.paid_list_close_group_size, ) .await { @@ -3686,94 +3760,45 @@ fn first_failed_key_label(confirmed_failed_keys: &[XorName]) -> String { ) } -/// Execute the side effects for a confirmed storage-commitment audit failure. +/// Execute the side effects for a subtree storage-commitment audit failure. /// -/// [`plan_failed_audit`] is the pure decision INCLUDING the strike selection -/// (record-a-strike-for-`Timeout` vs leave-untouched for confirmed failures), -/// extracted so the whole glue — not just the verdict — is testable without a -/// live `P2PNode`. This function is only the resulting I/O. Timeouts are graced -/// and rollout-gated (TIMEOUT-EVICTION-DISABLED); confirmed failures penalize on -/// the first occurrence and revoke holder credit. 
-async fn handle_failed_audit( +/// Subtree timeouts are fully graced: the multi-round, multi-chunk challenge can +/// legitimately time out on slow or loaded honest peers, so it never touches the +/// responsible-chunk audit path or its timeout accounting. Confirmed subtree +/// failures still penalise immediately and revoke holder credit. +async fn handle_subtree_failed_audit( challenged_peer: &PeerId, confirmed_failed_key_count: usize, reason: &AuditFailureReason, p2p_node: &Arc, sync_state: &Arc>, recent_provers: &Arc>, - audit_timeout_strikes: &Arc>>, ) { - let action = { - let mut strikes = audit_timeout_strikes.write().await; - plan_failed_audit(reason, &mut strikes, challenged_peer) - }; - match action { - AuditFailureAction::TimeoutGrace => { - // Honest transient slowness: no penalty, no credit loss, retain the - // bootstrap claim. Only *sustained* timeouts (a peer that always - // has to refetch) survive to the threshold — the per-challenge - // window is never widened. - debug!( - "Audit timeout for {challenged_peer} (under the {}-strike threshold); \ - within grace, retaining bootstrap claim, no penalty", - config::AUDIT_TIMEOUT_STRIKE_THRESHOLD - ); - } - AuditFailureAction::TimeoutPenalize => { - // Strikes are tracked/logged so the mechanism stays observable; the - // trust report that drives eviction is gated behind - // `TIMEOUT_EVICTION_ENABLED` (off this release — see its doc for the - // rollout-death-spiral rationale). Confirmed storage-integrity - // failures (ConfirmedPenalize below) are unaffected. 
- warn!( - "Audit timeout for {challenged_peer}: reached the {}-strike threshold of \ - consecutive timeouts ({})", - config::AUDIT_TIMEOUT_STRIKE_THRESHOLD, - if config::TIMEOUT_EVICTION_ENABLED { - "penalizing" - } else { - "eviction disabled this release — not penalizing" - } - ); - if config::TIMEOUT_EVICTION_ENABLED { - p2p_node - .report_trust_event( - challenged_peer, - TrustEvent::ApplicationFailure(config::AUDIT_FAILURE_TRUST_WEIGHT), - ) - .await; - } - } - AuditFailureAction::ConfirmedPenalize => { - // The caller (handle_subtree_audit_result) already logged the rich - // failure line with reason + per-category summary; avoid a redundant - // second error log here. `confirmed_failed_key_count` is retained in - // the signature for callers/tests that assert on it. - let _ = confirmed_failed_key_count; - // Peer returned a non-bootstrap response — clear the active claim - // while retaining claim history. - { - let mut state = sync_state.write().await; - state.clear_active_bootstrap_claim(challenged_peer); - } - // Revoke holder credit on a CONFIRMED failure (DigestMismatch / - // KeyAbsent / Rejected / MalformedResponse): the peer no longer - // provably holds what it committed to, so it must not keep §6 - // holder credit for the proof TTL. The §5 `forget_commitment` path - // only fires on an "unknown commitment hash" reply; genuine byte - // loss surfaces here. 
- { - let mut provers_guard = recent_provers.write().await; - apply_audit_failure_credit_revocation(&mut provers_guard, challenged_peer, reason); - } - p2p_node - .report_trust_event( - challenged_peer, - TrustEvent::ApplicationFailure(config::AUDIT_FAILURE_TRUST_WEIGHT), - ) - .await; - } + if matches!(reason, AuditFailureReason::Timeout) { + debug!( + "Audit timeout for {challenged_peer} fully graced \ + (subtree audit does not evict on timeout)" + ); + return; } + + // The caller already logged the rich failure line with reason + per-category + // summary; avoid a redundant second error log here. + let _ = confirmed_failed_key_count; + { + let mut state = sync_state.write().await; + state.clear_active_bootstrap_claim(challenged_peer); + } + { + let mut provers_guard = recent_provers.write().await; + apply_audit_failure_credit_revocation(&mut provers_guard, challenged_peer, reason); + } + p2p_node + .report_trust_event( + challenged_peer, + TrustEvent::ApplicationFailure(config::AUDIT_FAILURE_TRUST_WEIGHT), + ) + .await; } /// Handle audit result: log findings and emit trust events. @@ -3782,7 +3807,6 @@ async fn handle_subtree_audit_result( p2p_node: &Arc, sync_state: &Arc>, recent_provers: &Arc>, - audit_timeout_strikes: &Arc>>, config: &ReplicationConfig, ) { match result { @@ -3797,14 +3821,6 @@ async fn handle_subtree_audit_result( let mut state = sync_state.write().await; state.clear_active_bootstrap_claim(challenged_peer); } - // A normal response proves the slowness (if any) was transient, so - // reset the timeout-strike counter. Only *sustained* timeouts (a - // peer that must refetch on every audit) survive this reset to - // accumulate toward the penalty threshold. 
- { - let mut strikes = audit_timeout_strikes.write().await; - strikes.remove(challenged_peer); - } p2p_node .report_trust_event( challenged_peer, @@ -3831,20 +3847,16 @@ async fn handle_subtree_audit_result( summary.absent_keys, summary.digest_mismatch_keys, ); - // Route the side effects through the strike-grace path: timeouts - // are graced (and rollout-gated by TIMEOUT-EVICTION-DISABLED), - // deterministic failures penalize on the first occurrence and - // revoke holder credit. Do NOT report ApplicationFailure inline - // here — that would evict honest not-yet-upgraded peers on a - // single timeout during the breaking rollout. - handle_failed_audit( + // Route the side effects through the subtree-only failure path. + // Responsible-chunk `AuditChallenge` handling intentionally uses + // its own old immediate-penalty handler below. + handle_subtree_failed_audit( challenged_peer, confirmed_failed_keys.len(), reason, p2p_node, sync_state, recent_provers, - audit_timeout_strikes, ) .await; } @@ -3898,11 +3910,9 @@ async fn handle_subtree_audit_result( /// bootstrap claim. A `Timeout` does not (the peer may still be legitimately /// bootstrapping); every confirmed storage-integrity reason does. /// -/// Both audits now funnel through [`handle_failed_audit`], which derives the -/// clear-vs-retain decision from [`decide_audit_failure_action`]; this predicate -/// is retained as the readable single-source-of-truth that those tests assert -/// against (it is the exact `reason != Timeout` rule the action planner uses). -#[cfg(test)] +/// Responsible-chunk `AuditChallenge` failures use this directly: timeouts keep +/// the bootstrap claim but are still reported as audit failures, matching the +/// pre-ADR-0002 behaviour. 
fn audit_failure_clears_bootstrap_claim(reason: &AuditFailureReason) -> bool { !matches!(reason, AuditFailureReason::Timeout) } @@ -3910,162 +3920,106 @@ fn audit_failure_clears_bootstrap_claim(reason: &AuditFailureReason) -> bool { /// Handle the result of a responsible-chunk audit tick (audit #2): emit trust /// events and manage bootstrap-claim state. /// -/// Delegates to [`handle_subtree_audit_result`] so BOTH audits share one -/// failure path: timeouts go through the strike/grace logic (graced under the -/// threshold, eviction gated off this release via `TIMEOUT-EVICTION-DISABLED`) -/// and only confirmed storage-integrity failures penalise on the first -/// occurrence and revoke holder credit. Previously this handler reported -/// `ApplicationFailure` inline for EVERY failure including `Timeout`, which — -/// with the breaking v2 wire change — would false-penalise honest -/// not-yet-upgraded peers on a single audit. (Audit #2 cannot credit holders, -/// so the shared handler's strike-reset/credit-revocation is a superset of what -/// it needs; the responsible-chunk audit never produces `Passed { .. }` with -/// holder credit, so nothing is over-credited.) +/// This is intentionally separate from the subtree audit result handler. A +/// responsible-chunk `AuditChallenge` `Failed` result reports +/// `ApplicationFailure` immediately for every reason, including `Timeout`, +/// restoring the pre-ADR-0002 behaviour. async fn handle_audit_result( result: &AuditTickResult, p2p_node: &Arc, sync_state: &Arc>, - recent_provers: &Arc>, - audit_timeout_strikes: &Arc>>, config: &ReplicationConfig, ) { - handle_subtree_audit_result( - result, - p2p_node, - sync_state, - recent_provers, - audit_timeout_strikes, - config, - ) - .await; -} - -/// What the audit-failure handler should do for a given failure, given the -/// peer's post-increment timeout-strike count. Pure (no I/O) so the whole -/// decision can be exercised end-to-end without a live `P2PNode`. 
-#[derive(Debug, Clone, Copy, PartialEq, Eq)] -enum AuditFailureAction { - /// Timeout under the strike threshold: no trust penalty, no credit - /// revocation, retain the bootstrap claim (honest transient slowness). - TimeoutGrace, - /// Timeout at/over the threshold: report `ApplicationFailure`. Bootstrap - /// claim retained; holder credit NOT revoked (the peer never admitted byte - /// loss). The non-storing-peer case. - TimeoutPenalize, - /// Confirmed storage-integrity failure: penalize immediately, clear the - /// active bootstrap claim, and revoke holder credit. - ConfirmedPenalize, -} - -/// Upper bound on a peer's consecutive-timeout strike count. Must exceed the -/// largest reachable adaptive threshold (base + `MAX_ADAPTIVE_TIMEOUT_GRACE`) so -/// a genuinely non-responsive peer's count can always catch up to and cross an -/// inflated threshold — otherwise capping at the base would make timeout -/// penalties unreachable once the adaptive threshold rose. -const AUDIT_TIMEOUT_STRIKE_MAX: u32 = 64; - -/// Maximum extra grace the adaptive mechanism may add on top of the base -/// threshold. Bounds how far a (possibly stale) set of timing-out peers can -/// widen the window, so a small persistent failing cohort cannot push the -/// threshold arbitrarily high and shield a bad node indefinitely. -const MAX_ADAPTIVE_TIMEOUT_GRACE: u32 = 2 * config::AUDIT_TIMEOUT_STRIKE_THRESHOLD; - -/// Record an audit timeout for `peer` and return its new consecutive-timeout -/// strike count, saturating at [`AUDIT_TIMEOUT_STRIKE_MAX`] (well above any -/// reachable adaptive threshold). A successful audit removes the peer's entry -/// (the `Passed` arm of [`handle_subtree_audit_result`]), so only *consecutive* -/// timeouts accumulate here. 
-fn record_audit_timeout_strike(strikes: &mut HashMap, peer: &PeerId) -> u32 { - let count = strikes.entry(*peer).or_insert(0); - *count = count.saturating_add(1).min(AUDIT_TIMEOUT_STRIKE_MAX); - *count -} - -/// The adaptive timeout-strike threshold for judging `peer` (ADR-0002 "Network -/// Resilience"): `min(median of the OTHER timing-out peers' counts, -/// MAX_ADAPTIVE_TIMEOUT_GRACE) + base threshold`. -/// -/// In a healthy network almost no peer carries timeout strikes, so the median -/// is 0 and the threshold is the base [`config::AUDIT_TIMEOUT_STRIKE_THRESHOLD`]. -/// During genuine disruption many *honest* peers time out together, lifting the -/// median and widening the grace so the audit system does not pile onto a -/// struggling network — but the widening is capped at `MAX_ADAPTIVE_TIMEOUT_GRACE` -/// so a stale failing cohort cannot inflate it without bound. -/// -/// `peer` is EXCLUDED from the median so a lone timing-out peer cannot raise its -/// own grace bar. Combined with the map being fed ONLY by timeouts (deterministic -/// failures never touch it), this closes self-inflation and bounds -/// attacker-inflation of the grace window. -fn adaptive_timeout_threshold(strikes: &HashMap, peer: &PeerId) -> u32 { - let grace = median_timeout_strikes_excluding(strikes, peer).min(MAX_ADAPTIVE_TIMEOUT_GRACE); - grace.saturating_add(config::AUDIT_TIMEOUT_STRIKE_THRESHOLD) -} - -/// Lower median of the current per-peer consecutive-timeout counts, excluding -/// `peer`. No other peers → 0. -fn median_timeout_strikes_excluding(strikes: &HashMap, peer: &PeerId) -> u32 { - let mut counts: Vec = strikes - .iter() - .filter(|(p, _)| *p != peer) - .map(|(_, c)| *c) - .collect(); - if counts.is_empty() { - return 0; - } - counts.sort_unstable(); - // Lower median: for even-sized inputs take the lower of the two middle - // values ((len-1)/2), so the grace is conservative rather than inflated. 
- counts.get((counts.len() - 1) / 2).copied().unwrap_or(0) -} - -/// Whether a peer's consecutive-timeout strike count reaches the (adaptive) -/// threshold for emitting an `ApplicationFailure` trust event. -fn timeout_strike_reaches_threshold(strikes: u32, threshold: u32) -> bool { - strikes >= threshold -} - -/// Decide what to do about a confirmed audit failure. `timeout_strikes_after` -/// is the peer's strike count after recording this event and `timeout_threshold` -/// the adaptive threshold to compare against (both only meaningful when -/// `reason == Timeout`). Pure, so the integration-level decision can be asserted -/// in tests with no networking. -fn decide_audit_failure_action( - reason: &AuditFailureReason, - timeout_strikes_after: u32, - timeout_threshold: u32, -) -> AuditFailureAction { - if matches!(reason, AuditFailureReason::Timeout) { - if timeout_strike_reaches_threshold(timeout_strikes_after, timeout_threshold) { - AuditFailureAction::TimeoutPenalize - } else { - AuditFailureAction::TimeoutGrace + match result { + AuditTickResult::Passed { + challenged_peer, + keys_checked, + } => { + debug!("Audit passed for {challenged_peer} ({keys_checked} keys)"); + { + let mut state = sync_state.write().await; + state.clear_active_bootstrap_claim(challenged_peer); + } + p2p_node + .report_trust_event( + challenged_peer, + TrustEvent::ApplicationSuccess(REPLICATION_TRUST_WEIGHT), + ) + .await; } - } else { - AuditFailureAction::ConfirmedPenalize + AuditTickResult::Failed { evidence } => { + if let FailureEvidence::AuditFailure { + challenged_peer, + confirmed_failed_keys, + summary, + reason, + .. 
+ } = evidence + { + let first_failed_key = first_failed_key_label(confirmed_failed_keys); + error!( + "Audit failure for {challenged_peer}: reason={reason:?}, confirmed_failed_keys={}, challenged_keys={}, absent_keys={}, digest_mismatch_keys={}, first_failed_key={first_failed_key}", + confirmed_failed_keys.len(), + summary.challenged_keys, + summary.absent_keys, + summary.digest_mismatch_keys, + ); + if audit_failure_clears_bootstrap_claim(reason) { + let mut state = sync_state.write().await; + state.clear_active_bootstrap_claim(challenged_peer); + } else { + debug!("Audit timeout for {challenged_peer}; retaining active bootstrap claim"); + } + p2p_node + .report_trust_event( + challenged_peer, + TrustEvent::ApplicationFailure(config::AUDIT_FAILURE_TRUST_WEIGHT), + ) + .await; + } + } + AuditTickResult::BootstrapClaim { peer } => { + let should_report = { + let now = Instant::now(); + let mut state = sync_state.write().await; + match state.observe_bootstrap_claim(*peer, now, config.bootstrap_claim_grace_period) + { + BootstrapClaimObservation::WithinGrace { .. 
} => { + debug!("Audit: peer {peer} claims bootstrapping (within grace period)"); + false + } + BootstrapClaimObservation::PastGrace { first_seen } => { + warn!( + "Audit: peer {peer} claiming bootstrap past grace period \ + ({:?} > {:?}), reporting abuse", + now.duration_since(first_seen), + config.bootstrap_claim_grace_period, + ); + true + } + BootstrapClaimObservation::Repeated { first_seen } => { + warn!( + "Audit: peer {peer} repeated bootstrap claim after previously \ + stopping; first claim was {:?} ago, reporting abuse", + now.duration_since(first_seen), + ); + true + } + } + }; + if should_report { + p2p_node + .report_trust_event( + peer, + TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT), + ) + .await; + } + } + AuditTickResult::Idle | AuditTickResult::InsufficientKeys => {} } } -/// Plan the response to a confirmed audit failure, performing the -/// strike-selection glue in-process: a `Timeout` records a strike against -/// `peer` (so consecutive timeouts accumulate) and is judged against the -/// adaptive threshold; every other reason is a confirmed failure that does NOT -/// touch the strike map. The caller owns the lock and performs the resulting I/O. -fn plan_failed_audit( - reason: &AuditFailureReason, - strikes: &mut HashMap, - peer: &PeerId, -) -> AuditFailureAction { - // Snapshot the adaptive threshold from the *other* peers' counts (excluding - // this peer), so a single peer's own timeouts cannot raise its own grace bar. - let threshold = adaptive_timeout_threshold(strikes, peer); - let strikes_after = if matches!(reason, AuditFailureReason::Timeout) { - record_audit_timeout_strike(strikes, peer) - } else { - 0 - }; - decide_audit_failure_action(reason, strikes_after, threshold) -} - /// Whether a confirmed audit failure with this reason should revoke the /// peer's `recent_provers` holder credit immediately (v12 §6). 
/// @@ -4109,7 +4063,6 @@ struct GossipAuditTrigger { config: Arc, recent_provers: Arc>, sync_state: Arc>, - audit_timeout_strikes: Arc>>, cooldown: Arc>>, } @@ -4220,7 +4173,6 @@ async fn maybe_trigger_gossip_audit( &trigger.p2p_node, &trigger.sync_state, &trigger.recent_provers, - &trigger.audit_timeout_strikes, &trigger.config, ) .await; @@ -4672,12 +4624,9 @@ async fn rebuild_and_rotate_commitment( #[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)] mod tests { use super::{ - adaptive_timeout_threshold, apply_audit_failure_credit_revocation, - audit_failure_clears_bootstrap_claim, audit_failure_revokes_holder_credit, - audit_launch_decision, config, cooldown_allows_audit, decide_audit_failure_action, - first_failed_key_label, fresh_offer_payment_context, median_timeout_strikes_excluding, - paid_notify_payment_context, plan_failed_audit, record_audit_timeout_strike, - timeout_strike_reaches_threshold, AuditFailureAction, AUDIT_TIMEOUT_STRIKE_MAX, + apply_audit_failure_credit_revocation, audit_failure_clears_bootstrap_claim, + audit_failure_revokes_holder_credit, audit_launch_decision, config, cooldown_allows_audit, + first_failed_key_label, fresh_offer_payment_context, paid_notify_payment_context, }; use crate::payment::VerificationContext; use crate::replication::recent_provers::RecentProvers; @@ -4728,197 +4677,6 @@ mod tests { PeerId::from_bytes(bytes) } - // HELPER-LEVEL: counter arithmetic + threshold predicate. The reset is - // simulated by an in-test `strikes.remove`; the real reset path (the - // `Passed` arm) is covered at the glue level below. 
- #[test] - fn single_timeout_then_success_emits_no_failure_and_resets() { - let peer = strike_peer(1); - let mut strikes: HashMap = HashMap::new(); - let base = config::AUDIT_TIMEOUT_STRIKE_THRESHOLD; - let after_one = record_audit_timeout_strike(&mut strikes, &peer); - assert_eq!(after_one, 1); - assert!(!timeout_strike_reaches_threshold(after_one, base)); - strikes.remove(&peer); - assert!(!strikes.contains_key(&peer)); - } - - #[test] - fn consecutive_timeouts_cross_threshold_at_n() { - let peer = strike_peer(2); - let mut strikes: HashMap = HashMap::new(); - let n = config::AUDIT_TIMEOUT_STRIKE_THRESHOLD; - let mut last = 0; - for i in 1..=n { - last = record_audit_timeout_strike(&mut strikes, &peer); - if i < n { - assert!(!timeout_strike_reaches_threshold(last, n)); - } - } - assert!(timeout_strike_reaches_threshold(last, n)); - // The count keeps climbing past the base threshold (so it can also - // cross a higher *adaptive* threshold), but is bounded by the strike - // cap — no unbounded growth. - let mut c = last; - for _ in 0..200 { - c = record_audit_timeout_strike(&mut strikes, &peer); - } - assert_eq!( - c, - super::AUDIT_TIMEOUT_STRIKE_MAX, - "count saturates at the max cap" - ); - assert!(c > n, "count must be able to exceed the base threshold"); - } - - // ADR-0002 Network Resilience: adaptive timeout threshold. - - #[test] - fn median_timeout_strikes_basics() { - let target = strike_peer(99); - let mut strikes: HashMap = HashMap::new(); - // No other peers → 0 (healthy network, threshold == base). - assert_eq!(median_timeout_strikes_excluding(&strikes, &target), 0); - strikes.insert(strike_peer(1), 1); - strikes.insert(strike_peer(2), 3); - strikes.insert(strike_peer(3), 5); - // Sorted [1,3,5], lower-median index 1 → 3. - assert_eq!(median_timeout_strikes_excluding(&strikes, &target), 3); - } - - // ADVERSARIAL (ADR point e + sybil-inflation bound). Two invariants the - // existing suite leaves unpinned: - // 1. 
EVEN-count inputs must take the LOWER of the two middle values. The - // existing basics test only feeds an odd-length cohort, so an - // implementation that used `len/2` (upper median) would still pass it. - // Here [1,4] -> lower median 1 (not 4) and [2,4,6,8] -> 4 (not 6). - // 2. A sybil cohort pinned at the *strike cap* (the most an attacker could - // ever drive fabricated peers to) STILL cannot push the grace past - // MAX_ADAPTIVE_TIMEOUT_GRACE: the threshold saturates at base + max - // grace regardless of how high or how numerous the cohort is. - // FLIPS IF: median switches to the upper element on even input, or the - // grace clamp (`.min(MAX_ADAPTIVE_TIMEOUT_GRACE)`) is removed. - #[test] - fn even_count_takes_lower_median_and_sybil_cohort_cannot_exceed_grace_bound() { - let target = strike_peer(150); - - // Even count == 2: lower of [1, 4] is 1. - let mut two: HashMap = HashMap::new(); - two.insert(strike_peer(1), 1); - two.insert(strike_peer(2), 4); - assert_eq!( - median_timeout_strikes_excluding(&two, &target), - 1, - "even-count median must take the LOWER middle value (1), not the upper (4)" - ); - - // Even count == 4: sorted [2,4,6,8], lower median index (4-1)/2 = 1 → 4. - let mut four: HashMap = HashMap::new(); - for (i, v) in (10u8..).zip([2u32, 4, 6, 8]) { - four.insert(strike_peer(i), v); - } - assert_eq!( - median_timeout_strikes_excluding(&four, &target), - 4, - "even-count median must be the lower middle (4), not the upper (6)" - ); - - // Sybil cohort pinned at the strike CAP — the strongest inflation an - // attacker could mount — must not lift the threshold past base + max - // grace. Try several cohort sizes (odd and even) to be sure. 
- for cohort in [2u8, 5, 8, 20] { - let mut strikes: HashMap = HashMap::new(); - for i in 0..cohort { - strikes.insert(strike_peer(50 + i), super::AUDIT_TIMEOUT_STRIKE_MAX); - } - let threshold = adaptive_timeout_threshold(&strikes, &target); - assert_eq!( - threshold, - config::AUDIT_TIMEOUT_STRIKE_THRESHOLD + super::MAX_ADAPTIVE_TIMEOUT_GRACE, - "a sybil cohort at the strike cap (size {cohort}) must saturate the grace at \ - the bound, never exceed it" - ); - } - - // And even at the bounded-but-inflated threshold, a genuinely - // non-responsive target can still cross it (cap > max reachable - // threshold), so the bound never shields a bad node forever. - let mut strikes: HashMap = HashMap::new(); - for i in 0..8u8 { - strikes.insert(strike_peer(80 + i), super::AUDIT_TIMEOUT_STRIKE_MAX); - } - let threshold = adaptive_timeout_threshold(&strikes, &target); - let mut c = 0; - for _ in 0..(threshold + 5) { - c = record_audit_timeout_strike(&mut strikes, &target); - } - assert!( - timeout_strike_reaches_threshold(c, threshold), - "target must still cross the bounded inflated threshold ({c} vs {threshold})" - ); - } - - #[test] - fn lone_timing_out_peer_does_not_inflate_its_own_grace() { - // The peer under judgement is excluded from the median, so a single bad - // peer (the common case) is judged against the base threshold and caught - // — it cannot raise its own bar as its strike count climbs. - let bad = strike_peer(7); - let mut strikes: HashMap = HashMap::new(); - strikes.insert(bad, 5); // its own large count must not count - assert_eq!( - adaptive_timeout_threshold(&strikes, &bad), - config::AUDIT_TIMEOUT_STRIKE_THRESHOLD - ); - } - - #[test] - fn widespread_timeouts_widen_the_grace() { - // Genuine disruption: many OTHER honest peers carry timeout strikes. The - // median rises, so the threshold for any given peer widens beyond the - // base — the audit system does not pile onto a struggling network. 
- let target = strike_peer(100); - let mut strikes: HashMap = HashMap::new(); - for i in 0..9u8 { - strikes.insert(strike_peer(i), 4); - } - assert_eq!( - adaptive_timeout_threshold(&strikes, &target), - 4 + config::AUDIT_TIMEOUT_STRIKE_THRESHOLD - ); - assert!( - adaptive_timeout_threshold(&strikes, &target) > config::AUDIT_TIMEOUT_STRIKE_THRESHOLD - ); - } - - #[test] - fn adaptive_grace_only_responds_to_timeouts_not_deterministic_failures() { - // The strike map is fed ONLY by timeouts (plan_failed_audit records a - // strike for Timeout and never for confirmed failures). So a flood of - // deterministic failures cannot inflate the median to buy grace. - let target = strike_peer(101); - let mut strikes: HashMap = HashMap::new(); - // Many confirmed (non-timeout) failures: these must NOT touch the map. - for i in 0..9u8 { - let action = plan_failed_audit( - &AuditFailureReason::DigestMismatch, - &mut strikes, - &strike_peer(i), - ); - assert_eq!(action, AuditFailureAction::ConfirmedPenalize); - } - assert!( - strikes.is_empty(), - "deterministic failures must not record strikes" - ); - // Threshold stays at the base — an attacker cannot widen grace by - // failing audits on purpose. - assert_eq!( - adaptive_timeout_threshold(&strikes, &target), - config::AUDIT_TIMEOUT_STRIKE_THRESHOLD - ); - } - // ADR-0002: "occasional surprise exams, keeps load low" — the per-peer // cooldown must collapse a gossip flood into at most one audit per window. @@ -5032,35 +4790,6 @@ mod tests { } } - #[test] - fn inflated_adaptive_threshold_is_still_reachable_and_bounded() { - // When the median lifts the threshold above the base, a genuinely - // non-responsive peer's strike count must still be able to - // reach it (the count is no longer capped at the base). And the grace - // widening itself is bounded so it can't shield a bad node forever. - let target = strike_peer(200); - let mut strikes: HashMap = HashMap::new(); - // A cohort of other peers each at a high strike count. 
- for i in 0..9u8 { - strikes.insert(strike_peer(i), 10); - } - let threshold = adaptive_timeout_threshold(&strikes, &target); - // Grace is capped, so the threshold cannot exceed base + max grace. - assert!( - threshold <= config::AUDIT_TIMEOUT_STRIKE_THRESHOLD + super::MAX_ADAPTIVE_TIMEOUT_GRACE - ); - assert!(threshold > config::AUDIT_TIMEOUT_STRIKE_THRESHOLD); - // The target peer can accumulate strikes past that inflated threshold. - let mut c = 0; - for _ in 0..threshold + 5 { - c = record_audit_timeout_strike(&mut strikes, &target); - } - assert!( - timeout_strike_reaches_threshold(c, threshold), - "a persistent peer must be able to cross the inflated threshold ({c} vs {threshold})" - ); - } - #[test] fn audit_on_gossip_constants_match_adr() { // Tripwire on the ADR-locked tunables. The spot-check count sits at the @@ -5083,159 +4812,6 @@ mod tests { )); } - // E2E (pure decision): an honest peer that times out once, recovers, - // repeatedly, never reaches a penalty because each success resets strikes. - // FLIPS IF: the strike threshold is removed or success stops resetting. - #[test] - fn e2e_honest_intermittent_timeouts_never_penalized() { - let peer = strike_peer(10); - let base = config::AUDIT_TIMEOUT_STRIKE_THRESHOLD; - let mut strikes: HashMap = HashMap::new(); - for _ in 0..10 { - let after = record_audit_timeout_strike(&mut strikes, &peer); - assert_eq!( - decide_audit_failure_action(&AuditFailureReason::Timeout, after, base), - AuditFailureAction::TimeoutGrace - ); - strikes.remove(&peer); - } - assert!(!strikes.contains_key(&peer)); - } - - // E2E: a peer that times out on EVERY audit (never reset) crosses the - // threshold and is penalized — the deterrent against non-storing peers. - // FLIPS IF: per-challenge window widened so it answers in time, or strikes - // reset without a success. 
- #[test] - fn e2e_persistent_timeouts_get_penalized() { - let peer = strike_peer(11); - let mut strikes: HashMap = HashMap::new(); - let threshold = config::AUDIT_TIMEOUT_STRIKE_THRESHOLD; - let mut penalized_at = None; - for tick in 1..=(threshold + 2) { - let after = record_audit_timeout_strike(&mut strikes, &peer); - if decide_audit_failure_action(&AuditFailureReason::Timeout, after, threshold) - == AuditFailureAction::TimeoutPenalize - && penalized_at.is_none() - { - penalized_at = Some(tick); - } - } - assert_eq!(penalized_at, Some(threshold)); - } - - // Glue: a Timeout through the real plan_failed_audit MUST record a strike on - // the map AND penalize once enough accumulate. - // FLIPS IF: the handler stops feeding Timeout through the strike counter - // (e.g. strikes_after hard-coded to 0). (Mutation-verified.) - #[test] - fn e2e_glue_timeout_records_strike_and_penalizes_at_threshold() { - let peer = strike_peer(20); - let mut strikes: HashMap = HashMap::new(); - let threshold = config::AUDIT_TIMEOUT_STRIKE_THRESHOLD; - let mut action = AuditFailureAction::TimeoutGrace; - for tick in 1..=threshold { - action = plan_failed_audit(&AuditFailureReason::Timeout, &mut strikes, &peer); - assert_eq!(strikes.get(&peer).copied(), Some(tick)); - } - assert_eq!(action, AuditFailureAction::TimeoutPenalize); - } - - // Glue: a confirmed failure through plan_failed_audit must NOT touch the - // strike map and must return ConfirmedPenalize. 
- #[test] - fn e2e_glue_confirmed_failure_leaves_strike_map_untouched() { - let peer = strike_peer(21); - let mut strikes: HashMap = HashMap::new(); - for reason in [ - AuditFailureReason::DigestMismatch, - AuditFailureReason::KeyAbsent, - AuditFailureReason::Rejected, - AuditFailureReason::MalformedResponse, - ] { - assert_eq!( - plan_failed_audit(&reason, &mut strikes, &peer), - AuditFailureAction::ConfirmedPenalize - ); - } - assert!(strikes.is_empty()); - } - - // ADR-0002 "Accounting and False Positives", adversarial: a DETERMINISTIC - // failure is acted on the FIRST time it occurs, "regardless of network - // conditions". Here the strike map is pre-loaded with many *other* peers - // timing out, which inflates the adaptive timeout grace to its cap — the - // most forgiving the network ever gets. Under that maximally-relaxed - // window: - // - a brand-new peer's FIRST deterministic failure (DigestMismatch / - // Rejected / MalformedResponse) STILL returns ConfirmedPenalize, never - // a grace lane, and never touches the strike map; while - // - that same peer's FIRST timeout is only TimeoutGrace. - // This proves the inflated grace is the timeout-only lane and can NEVER be - // weaponized to buy a deterministic failure even one round of delay. - // FLIPS IF: deterministic failures start consulting the strike threshold, - // or ConfirmedPenalize is collapsed into a timeout action. - #[test] - fn deterministic_failure_penalizes_first_time_under_inflated_grace() { - let mut strikes: HashMap = HashMap::new(); - // Saturate the adaptive grace: many other peers each carrying a high - // consecutive-timeout count, so the median (and thus the grace) is - // pushed to its MAX cap for any newly-judged peer. - for b in 100..150u8 { - let other = strike_peer(b); - for _ in 0..AUDIT_TIMEOUT_STRIKE_MAX { - record_audit_timeout_strike(&mut strikes, &other); - } - } - let victim = strike_peer(7); - // Sanity: the grace seen by the victim is genuinely inflated above base. 
- let inflated = adaptive_timeout_threshold(&strikes, &victim); - assert!( - inflated > config::AUDIT_TIMEOUT_STRIKE_THRESHOLD, - "test precondition: grace must be inflated, got {inflated}" - ); - - // First deterministic failure of each kind -> ConfirmedPenalize on - // occurrence #1, and the victim is never inserted into the strike map. - for reason in [ - AuditFailureReason::DigestMismatch, - AuditFailureReason::Rejected, - AuditFailureReason::MalformedResponse, - ] { - let action = plan_failed_audit(&reason, &mut strikes, &victim); - assert_eq!( - action, - AuditFailureAction::ConfirmedPenalize, - "{reason:?} must penalize on the first occurrence regardless of grace" - ); - assert_ne!( - action, - AuditFailureAction::TimeoutPenalize, - "a deterministic failure must NOT be routed through the (eviction-gated) \ - timeout-penalize lane" - ); - assert!( - !strikes.contains_key(&victim), - "deterministic failure must not touch the timeout strike map" - ); - // And it always revokes holder credit / clears the claim. - assert!(audit_failure_revokes_holder_credit(&reason)); - assert!(audit_failure_clears_bootstrap_claim(&reason)); - } - - // The SAME victim's first timeout, under the same inflated grace, is - // only TimeoutGrace (no penalty, no revocation, claim retained). - let timeout_action = plan_failed_audit(&AuditFailureReason::Timeout, &mut strikes, &victim); - assert_eq!(timeout_action, AuditFailureAction::TimeoutGrace); - assert_eq!(strikes.get(&victim).copied(), Some(1)); - assert!(!audit_failure_revokes_holder_credit( - &AuditFailureReason::Timeout - )); - assert!(!audit_failure_clears_bootstrap_claim( - &AuditFailureReason::Timeout - )); - } - /// The exact decision the `Failed` arm of `handle_subtree_audit_result` /// uses: confirmed failures revoke credit, `Timeout` does not. 
#[test] @@ -5335,7 +4911,7 @@ mod tests { #[test] fn first_failed_key_label_falls_back_when_empty() { - // Should never happen in production (handle_audit_failure rejects + // Should never happen in production (audit failure handling rejects // empty sets), but the formatter must still produce a valid label // so the log line doesn't contain a misleading default. assert_eq!(first_failed_key_label(&[]), "0x"); diff --git a/src/replication/possession.rs b/src/replication/possession.rs new file mode 100644 index 0000000..cab27cf --- /dev/null +++ b/src/replication/possession.rs @@ -0,0 +1,525 @@ +//! Delayed possession verification for fresh replication (ADR-0003). +//! +//! After a node fresh-replicates a chunk, every close-group peer responsible +//! for it is checked 5-15 minutes later for actual possession. The check is a +//! single-key cryptographic +//! [`AuditChallenge`](crate::replication::protocol::AuditChallenge): the probed +//! peer must return `BLAKE3(nonce ‖ peer_id ‖ key ‖ bytes)` computed over the +//! chunk it claims to hold. It cannot produce that digest without the bytes, so +//! — unlike a self-reported presence flag — a peer cannot escape the check by +//! falsely asserting possession. A peer that holds the chunk earns nothing — +//! storing what it was paid to store is the baseline expectation, not +//! meritorious; a peer that returns the absent sentinel, or a digest that does +//! not match the checker's canonical copy (cryptographic proof it lacks the +//! bytes), is penalised at `AuditChallenge` severity. Delivery of the original +//! push is irrelevant: a peer the push never reached is still checked and +//! penalised if it lacks the chunk. +//! +//! A peer unreachable at check time is penalised immediately at audit severity, +//! matching the responsible-chunk `AuditChallenge` path. A matching bootstrap +//! claim uses the shared bootstrap-claim grace/abuse tracker; peer-side +//! 
malformed, rejected, or mismatched responses are audit failures. + +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use rand::Rng; +use saorsa_core::identity::PeerId; +use saorsa_core::{P2PNode, TrustEvent}; +use tokio::sync::RwLock; +use tokio_util::sync::CancellationToken; + +use crate::ant_protocol::XorName; +use crate::logging::{debug, warn}; +use crate::replication::config::{ + ReplicationConfig, AUDIT_FAILURE_TRUST_WEIGHT, REPLICATION_PROTOCOL_ID, +}; +use crate::replication::protocol::{ + compute_audit_digest, AuditChallenge, AuditResponse, ReplicationMessage, + ReplicationMessageBody, ABSENT_KEY_DIGEST, +}; +use crate::replication::types::{BootstrapClaimObservation, NeighborSyncState}; +use crate::storage::LmdbStorage; + +use super::REPLICATION_TRUST_WEIGHT; + +/// A possession probe challenges exactly one key, so the per-probe response +/// budget is the audit-response timeout sized for a single chunk. +const POSSESSION_PROBE_KEY_COUNT: usize = 1; + +/// A scheduled possession check for one freshly-replicated chunk. +pub struct PossessionCheckEvent { + /// Content-address of the chunk. + pub key: XorName, + /// Close-group peers responsible for holding it (excludes self). + pub peers: Vec, +} + +/// Verdict of cryptographically probing a single peer for possession of a chunk. +#[cfg_attr(test, derive(Debug, PartialEq, Eq))] +enum ProbeOutcome { + /// Peer returned a digest proving it holds the chunk's bytes. + Present, + /// Peer failed the audit challenge: absent sentinel, digest mismatch, + /// rejection, mismatched challenge ID, wrong digest count, or malformed reply. + Failed, + /// No response (transport error / deadline). Penalised immediately at + /// audit-failure severity. + Timeout, + /// Peer returned a matching bootstrap claim. Graced only through the shared + /// bootstrap-claim tracker. + BootstrapClaim, + /// The probe could not be sent locally. Graced: no penalty. 
+ Inconclusive, +} + +/// Pick a randomised delay in `[min, max]` to wait before a possession check +/// runs. The bounds come from `ReplicationConfig` (defaulting to +/// `POSSESSION_CHECK_DELAY_MIN`/`MAX`) so tests can shorten them. +#[must_use] +pub fn random_delay(min: Duration, max: Duration) -> Duration { + let to_millis = |d: Duration| u64::try_from(d.as_millis()).unwrap_or(u64::MAX); + let min_ms = to_millis(min); + let max_ms = to_millis(max); + if min_ms >= max_ms { + return min; + } + Duration::from_millis(rand::thread_rng().gen_range(min_ms..=max_ms)) +} + +/// Run the possession check for one chunk against every responsible peer. +/// +/// Recomputes the expected audit digest from the checker's own canonical copy +/// of `key`, so the check is meaningful only while the checker still holds the +/// chunk — which it does immediately after accepting and fresh-replicating a +/// PUT. If the checker no longer holds it (e.g. pruned), the check is moot and +/// is skipped without penalising anyone. +/// +/// A peer that fails to prove possession, including by timeout, is penalised at +/// `AuditChallenge` severity immediately. A responsive peer is left unrewarded. +pub(crate) async fn run_possession_check( + key: XorName, + peers: Vec, + p2p_node: &Arc, + storage: &Arc, + config: &ReplicationConfig, + sync_state: &Arc>, + shutdown: &CancellationToken, +) { + let key_hex = hex::encode(key); + + // Read our canonical copy once: the audit digest is recomputed from these + // bytes for every peer (hoisted out of the per-peer loop). If we no longer + // hold the chunk we cannot verify any peer's proof, and we are no longer a + // responsible checker for it — skip without penalising anyone. 
+ let local_bytes = match storage.get_raw(&key).await { + Ok(Some(bytes)) => bytes, + Ok(None) => { + debug!("Possession check: checker no longer holds {key_hex}; skipping"); + return; + } + Err(e) => { + warn!("Possession check: failed to read local {key_hex}: {e}; skipping"); + return; + } + }; + + // Single-key probe budget, matched to the audit response timeout's + // bandwidth-calibrated deadline (tight enough that a relay that must refetch + // the bytes blows it, generous for an honest local-disk read). + let probe_timeout = config.audit_response_timeout(POSSESSION_PROBE_KEY_COUNT); + + for peer in peers { + if shutdown.is_cancelled() { + return; + } + match probe_once(&key, &local_bytes, &peer, p2p_node, probe_timeout).await { + ProbeOutcome::Present => { + debug!("Possession check: {peer} proved possession of {key_hex}"); + clear_possession_bootstrap_claim(&peer, sync_state).await; + } + ProbeOutcome::Failed => { + clear_possession_bootstrap_claim(&peer, sync_state).await; + report_possession_audit_failure( + &peer, + &key_hex, + "failed to prove possession", + p2p_node, + ) + .await; + } + ProbeOutcome::Timeout => { + report_possession_audit_failure(&peer, &key_hex, "timed out", p2p_node).await; + } + ProbeOutcome::BootstrapClaim => { + handle_possession_bootstrap_claim(&peer, &key_hex, p2p_node, config, sync_state) + .await; + } + ProbeOutcome::Inconclusive => { + debug!( + "Possession check: inconclusive probe of {peer} for {key_hex}; not penalised" + ); + } + } + } +} + +async fn clear_possession_bootstrap_claim( + peer: &PeerId, + sync_state: &Arc>, +) { + sync_state.write().await.clear_active_bootstrap_claim(peer); +} + +async fn report_possession_audit_failure( + peer: &PeerId, + key_hex: &str, + reason: &str, + p2p_node: &Arc, +) { + warn!("Possession check: {peer} {reason} for {key_hex}; penalising at audit severity"); + p2p_node + .report_trust_event( + peer, + TrustEvent::ApplicationFailure(AUDIT_FAILURE_TRUST_WEIGHT), + ) + .await; +} + +async 
fn handle_possession_bootstrap_claim( + peer: &PeerId, + key_hex: &str, + p2p_node: &Arc, + config: &ReplicationConfig, + sync_state: &Arc>, +) { + let (now, observation) = { + let now = Instant::now(); + let mut state = sync_state.write().await; + ( + now, + state.observe_bootstrap_claim(*peer, now, config.bootstrap_claim_grace_period), + ) + }; + + match observation { + BootstrapClaimObservation::WithinGrace { .. } => { + debug!( + "Possession check: peer {peer} claims bootstrapping for {key_hex} \ + (within grace period)" + ); + } + BootstrapClaimObservation::PastGrace { first_seen } => { + warn!( + "Possession check: peer {peer} claiming bootstrap for {key_hex} past grace period \ + ({:?} > {:?}), reporting abuse", + now.duration_since(first_seen), + config.bootstrap_claim_grace_period, + ); + p2p_node + .report_trust_event( + peer, + TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT), + ) + .await; + } + BootstrapClaimObservation::Repeated { first_seen } => { + warn!( + "Possession check: peer {peer} repeated bootstrap claim for {key_hex} after \ + previously stopping; first claim was {:?} ago, reporting abuse", + now.duration_since(first_seen), + ); + p2p_node + .report_trust_event( + peer, + TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT), + ) + .await; + } + } +} + +/// Send one single-key cryptographic [`AuditChallenge`] and interpret the +/// response. The peer proves possession by returning +/// `compute_audit_digest(nonce, peer, key, bytes)`; absence is proven by the +/// [`ABSENT_KEY_DIGEST`] sentinel or any digest that does not match the +/// checker's canonical copy. A transport failure / deadline is a `Timeout`; a +/// matching bootstrap response is a `BootstrapClaim`; a local encode failure is +/// `Inconclusive`; peer-side malformed, rejected, or mismatched replies are +/// `Failed`. 
+async fn probe_once( + key: &XorName, + local_bytes: &[u8], + peer: &PeerId, + p2p_node: &Arc, + probe_timeout: Duration, +) -> ProbeOutcome { + // Fresh nonce per probe so a stored digest cannot be replayed, and bind the + // challenge to this peer's identity so it cannot relay another node's proof. + let (nonce, challenge_id) = { + let mut rng = rand::thread_rng(); + let nonce: [u8; 32] = rng.gen(); + let challenge_id: u64 = rng.gen(); + (nonce, challenge_id) + }; + let challenge = AuditChallenge { + challenge_id, + nonce, + challenged_peer_id: *peer.as_bytes(), + keys: vec![*key], + }; + let msg = ReplicationMessage { + request_id: challenge_id, + body: ReplicationMessageBody::AuditChallenge(challenge), + }; + let Ok(encoded) = msg.encode() else { + warn!( + "Failed to encode possession challenge for {}", + hex::encode(key) + ); + return ProbeOutcome::Inconclusive; + }; + + let response = match p2p_node + .send_request(peer, REPLICATION_PROTOCOL_ID, encoded, probe_timeout) + .await + { + Ok(response) => response, + Err(e) => { + debug!("Possession probe to {peer} got no response: {e}"); + return ProbeOutcome::Timeout; + } + }; + + let decoded = match ReplicationMessage::decode(&response.data) { + Ok(decoded) => decoded, + Err(e) => { + debug!("Failed to decode possession response from {peer}: {e}"); + return ProbeOutcome::Failed; + } + }; + + let ReplicationMessageBody::AuditResponse(resp) = decoded.body else { + debug!("Unexpected possession response type from {peer}"); + return ProbeOutcome::Failed; + }; + + interpret_audit_response( + key, + local_bytes, + peer.as_bytes(), + &nonce, + challenge_id, + resp, + ) +} + +/// Classify an [`AuditResponse`] into a possession verdict. Pure (no I/O): the +/// digest is verified against `local_bytes`, the checker's canonical copy. 
+fn interpret_audit_response( + key: &XorName, + local_bytes: &[u8], + challenged_peer_id: &[u8; 32], + nonce: &[u8; 32], + challenge_id: u64, + response: AuditResponse, +) -> ProbeOutcome { + match response { + AuditResponse::Digests { + challenge_id: resp_id, + digests, + } => { + if resp_id != challenge_id || digests.len() != 1 { + return ProbeOutcome::Failed; + } + let received = digests[0]; + if received == ABSENT_KEY_DIGEST { + return ProbeOutcome::Failed; + } + let expected = compute_audit_digest(nonce, challenged_peer_id, key, local_bytes); + if received == expected { + ProbeOutcome::Present + } else { + // A non-sentinel digest that does not match our canonical bytes + // proves the peer cannot reproduce the content — treat as absent + // (matches the audit's DigestMismatch handling). + ProbeOutcome::Failed + } + } + AuditResponse::Bootstrapping { + challenge_id: resp_id, + } => { + if resp_id == challenge_id { + ProbeOutcome::BootstrapClaim + } else { + ProbeOutcome::Failed + } + } + AuditResponse::Rejected { .. 
} => ProbeOutcome::Failed, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::replication::config::{POSSESSION_CHECK_DELAY_MAX, POSSESSION_CHECK_DELAY_MIN}; + + const PEER_ID: [u8; 32] = [0x42; 32]; + const NONCE: [u8; 32] = [0x7a; 32]; + const CHALLENGE_ID: u64 = 0xDEAD_BEEF; + const KEY: XorName = [0x11; 32]; + const BYTES: &[u8] = b"possession-check payload"; + + fn digests_response(challenge_id: u64, digests: Vec<[u8; 32]>) -> AuditResponse { + AuditResponse::Digests { + challenge_id, + digests, + } + } + + #[test] + fn random_delay_is_within_bounds() { + for _ in 0..100 { + let d = random_delay(POSSESSION_CHECK_DELAY_MIN, POSSESSION_CHECK_DELAY_MAX); + assert!(d >= POSSESSION_CHECK_DELAY_MIN); + assert!(d <= POSSESSION_CHECK_DELAY_MAX); + } + } + + #[test] + fn matching_digest_is_present() { + let valid = compute_audit_digest(&NONCE, &PEER_ID, &KEY, BYTES); + let verdict = interpret_audit_response( + &KEY, + BYTES, + &PEER_ID, + &NONCE, + CHALLENGE_ID, + digests_response(CHALLENGE_ID, vec![valid]), + ); + assert_eq!(verdict, ProbeOutcome::Present); + } + + #[test] + fn absent_sentinel_is_failed() { + let verdict = interpret_audit_response( + &KEY, + BYTES, + &PEER_ID, + &NONCE, + CHALLENGE_ID, + digests_response(CHALLENGE_ID, vec![ABSENT_KEY_DIGEST]), + ); + assert_eq!(verdict, ProbeOutcome::Failed); + } + + #[test] + fn forged_digest_is_failed() { + // A peer that lacks the bytes cannot compute the right digest; whatever + // non-sentinel value it sends must not match our canonical copy. 
+ let forged = [0x99; 32]; + let valid = compute_audit_digest(&NONCE, &PEER_ID, &KEY, BYTES); + assert_ne!(forged, valid, "test fixture must use a wrong digest"); + let verdict = interpret_audit_response( + &KEY, + BYTES, + &PEER_ID, + &NONCE, + CHALLENGE_ID, + digests_response(CHALLENGE_ID, vec![forged]), + ); + assert_eq!(verdict, ProbeOutcome::Failed); + } + + #[test] + fn mismatched_challenge_id_is_failed() { + let valid = compute_audit_digest(&NONCE, &PEER_ID, &KEY, BYTES); + let verdict = interpret_audit_response( + &KEY, + BYTES, + &PEER_ID, + &NONCE, + CHALLENGE_ID, + digests_response(CHALLENGE_ID.wrapping_add(1), vec![valid]), + ); + assert_eq!(verdict, ProbeOutcome::Failed); + } + + #[test] + fn wrong_arity_is_failed() { + let valid = compute_audit_digest(&NONCE, &PEER_ID, &KEY, BYTES); + let verdict = interpret_audit_response( + &KEY, + BYTES, + &PEER_ID, + &NONCE, + CHALLENGE_ID, + digests_response(CHALLENGE_ID, vec![valid, ABSENT_KEY_DIGEST]), + ); + assert_eq!(verdict, ProbeOutcome::Failed); + } + + #[test] + fn bootstrapping_is_bootstrap_claim() { + let verdict = interpret_audit_response( + &KEY, + BYTES, + &PEER_ID, + &NONCE, + CHALLENGE_ID, + AuditResponse::Bootstrapping { + challenge_id: CHALLENGE_ID, + }, + ); + assert_eq!(verdict, ProbeOutcome::BootstrapClaim); + } + + #[test] + fn bootstrapping_with_wrong_challenge_id_is_failed() { + let verdict = interpret_audit_response( + &KEY, + BYTES, + &PEER_ID, + &NONCE, + CHALLENGE_ID, + AuditResponse::Bootstrapping { + challenge_id: CHALLENGE_ID.wrapping_add(1), + }, + ); + assert_eq!(verdict, ProbeOutcome::Failed); + } + + #[tokio::test] + async fn possession_success_clears_active_bootstrap_claim_but_keeps_history() { + let peer = PeerId::from_bytes(PEER_ID); + let sync_state = Arc::new(RwLock::new(NeighborSyncState::new_cycle(Vec::new()))); + { + let mut state = sync_state.write().await; + let now = Instant::now(); + state.bootstrap_claims.insert(peer, now); + 
state.bootstrap_claim_history.insert(peer, now); + } + + clear_possession_bootstrap_claim(&peer, &sync_state).await; + + let state = sync_state.read().await; + assert!(!state.bootstrap_claims.contains_key(&peer)); + assert!(state.bootstrap_claim_history.contains_key(&peer)); + } + + #[test] + fn rejected_is_failed() { + let verdict = interpret_audit_response( + &KEY, + BYTES, + &PEER_ID, + &NONCE, + CHALLENGE_ID, + AuditResponse::Rejected { + challenge_id: CHALLENGE_ID, + reason: "nope".to_string(), + }, + ); + assert_eq!(verdict, ProbeOutcome::Failed); + } +} diff --git a/src/replication/protocol.rs b/src/replication/protocol.rs index 5058bab..1181de5 100644 --- a/src/replication/protocol.rs +++ b/src/replication/protocol.rs @@ -438,11 +438,9 @@ pub enum SubtreeAuditResponse { /// peer would not prove without touching credit it legitimately re-earned for a /// newer commitment. Lying therefore does not let a deleter keep "proven holder" /// status for that root until the credit TTL — the loophole a plain timeout -/// would leave. It also accumulates a timeout strike, so a peer -/// that self-graces on every audit still crosses the strike threshold once -/// timeout-eviction is enabled. An honest peer that genuinely rotated simply -/// re-earns credit on the next audit of its current commitment, so the grace -/// strips no peer that is actually holding its responsible data. The grace +/// would leave. A peer that self-graces on every audit remains uncredited for the +/// pinned commitments it refuses to prove; an honest peer that genuinely rotated +/// simply re-earns credit on the next audit of its current commitment. The grace /// removes only the false TRUST PENALTY for the genuinely-ambiguous /// rotated/transient case; it does not remove the possession requirement. 
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] @@ -642,7 +640,7 @@ mod tests { // The auditor batches its round-2 sample to MAX_BYTE_CHALLENGE_KEYS per // challenge precisely so this worst case — every requested chunk at // MAX_CHUNK_SIZE — still encodes. If this fails, honest responders - // would hit encode errors and be penalized as timeouts. + // would hit encode errors and fail otherwise valid byte challenges. let items: Vec = (0..crate::replication::config::MAX_BYTE_CHALLENGE_KEYS) .map(|i| SubtreeByteItem::Present { key: [u8::try_from(i).unwrap_or(u8::MAX); 32], diff --git a/src/replication/pruning.rs b/src/replication/pruning.rs index 9727836..e1e0d3f 100644 --- a/src/replication/pruning.rs +++ b/src/replication/pruning.rs @@ -1154,23 +1154,10 @@ async fn peer_proves_record( let encoded = encode_prune_audit_challenge(&peer, key, challenge_id, nonce)?; let Some(decoded) = send_prune_audit_challenge(&peer, &key, encoded, p2p_node, config).await else { - // No decoded response means a timeout or an undecodable reply — the - // same "no response" case the main audit path treats as a timeout. The - // penalty is gated behind `TIMEOUT_EVICTION_ENABLED` (off this - // release — a not-yet-upgraded or briefly-slow peer must not be evicted - // by a no-response during the breaking rollout). Mirrors the suppressed - // timeout penalty in handle_failed_audit; only a DECODED - // PruneAuditStatus::Failed below (a peer that answered with bad/absent - // bytes) is penalised regardless of the gate. - if crate::replication::config::TIMEOUT_EVICTION_ENABLED { - report_prune_audit_failure_once(&peer, &key, p2p_node, config, report_state).await; - } else { - debug!( - "Prune audit for {peer} key {} got no decodable response \ - (eviction disabled this release — not penalising)", - hex::encode(key) - ); - } + // No decoded response means a timeout or malformed reply. 
Prune + // confirmation reuses `AuditChallenge` semantics, so this is an immediate + // audit failure just like a decoded bad proof below. + report_prune_audit_failure_once(&peer, &key, p2p_node, config, report_state).await; return None; }; diff --git a/src/storage/handler.rs b/src/storage/handler.rs index 586b517..13c52d0 100644 --- a/src/storage/handler.rs +++ b/src/storage/handler.rs @@ -38,13 +38,27 @@ use crate::client::compute_address; use crate::error::{Error, Result}; use crate::logging::{debug, info, warn}; use crate::payment::{PaymentVerifier, QuoteGenerator, VerificationContext}; +use crate::replication::admission; +use crate::replication::config::K_BUCKET_SIZE; use crate::replication::fresh::FreshWriteEvent; use crate::storage::lmdb::LmdbStorage; use bytes::Bytes; +use parking_lot::RwLock; use saorsa_core::P2PNode; use std::sync::Arc; use tokio::sync::mpsc; +/// Width of the self-closeness gate on client PUTs (ADR-0003): a node accepts +/// a PUT only when it is within its own local `SELF_CLOSENESS_GATE_WIDTH` +/// closest peers to the address. +/// +/// Set to the client's PUT fallback ceiling (`K_BUCKET_SIZE`), wider than the +/// storage-admission width, so a client routing past full close-group members +/// onto further peers (ADR-0002) is still accepted here, while a genuinely far +/// node — which could only mis-attribute fresh-replication failures — is +/// turned away. +const SELF_CLOSENESS_GATE_WIDTH: usize = K_BUCKET_SIZE; + /// ANT protocol handler. /// /// Handles chunk PUT/GET/Quote requests using LMDB storage for persistence @@ -59,6 +73,10 @@ pub struct AntProtocol { quote_generator: Arc, /// Channel for notifying the replication engine about newly-stored chunks. fresh_write_tx: Option>, + /// The node's P2P handle, attached post-construction via + /// `attach_p2p_node`. Drives the self-closeness gate on client PUTs; + /// `None` in unit tests that never attach a node. 
+ p2p_node: RwLock>>, } impl AntProtocol { @@ -90,6 +108,7 @@ impl AntProtocol { payment_verifier, quote_generator, fresh_write_tx: None, + p2p_node: RwLock::new(None), } } @@ -99,8 +118,9 @@ impl AntProtocol { /// checks can use the live routing view. Idempotent: calling twice /// replaces the verifier handle. pub fn attach_p2p_node(&self, node: Arc) { + *self.p2p_node.write() = Some(Arc::clone(&node)); self.payment_verifier.attach_p2p_node(node); - debug!("AntProtocol: P2PNode attached for payment live-DHT checks"); + debug!("AntProtocol: P2PNode attached for payment live-DHT checks and self-closeness gate"); } /// Set the channel sender for fresh-write replication events. @@ -280,9 +300,28 @@ impl AntProtocol { return ChunkPutResponse::Error(ProtocolError::StorageFailed(e.to_string())); } + // Self-closeness gate (ADR-0003): accept a client PUT only when this + // node is within its own local closest view of the address, so the + // fresh replication it triggers is legitimate and cannot mis-penalise + // honest peers. The width is the client's PUT fallback ceiling + // (`SELF_CLOSENESS_GATE_WIDTH`), so a client routing past full + // close-group members onto further peers is still accepted here. + // Skipped when no P2P handle is attached (unit tests). Bind the handle + // out of the lock first so no guard is held across the `.await`. + let attached = self.p2p_node.read().as_ref().map(Arc::clone); + if let Some(p2p) = attached { + let self_id = *p2p.peer_id(); + if !admission::is_responsible(&self_id, &address, &p2p, SELF_CLOSENESS_GATE_WIDTH).await + { + debug!("Rejecting PUT for {addr_hex}: not within local closest peers"); + return ChunkPutResponse::Error(ProtocolError::StorageFailed( + "node is not within its local closest peers for this address".to_string(), + )); + } + } + // 5. Verify payment. The ClientPut context applies the store-strength - // payment cache and verifies live proofs. 
Direct client PUT does not - // reject based on this node's local storage-responsibility view. + // payment cache and verifies live proofs. let payment_result = self .payment_verifier .verify_payment( diff --git a/tests/e2e/replication.rs b/tests/e2e/replication.rs index eb8ee49..df9e05a 100644 --- a/tests/e2e/replication.rs +++ b/tests/e2e/replication.rs @@ -5,11 +5,12 @@ #![allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)] +use super::testnet::TestNetworkConfig; use super::TestHarness; use ant_node::client::compute_address; use ant_node::replication::commitment_state::{BuiltCommitment, ResponderCommitmentState}; use ant_node::replication::config::{ - storage_admission_width, REPAIR_HINT_MIN_AGE, REPLICATION_PROTOCOL_ID, + storage_admission_width, K_BUCKET_SIZE, REPAIR_HINT_MIN_AGE, REPLICATION_PROTOCOL_ID, }; use ant_node::replication::protocol::{ compute_audit_digest, AuditChallenge, AuditResponse, FetchRequest, FetchResponse, @@ -32,6 +33,20 @@ use tokio::sync::RwLock; const PROPAGATION_TIMEOUT: Duration = Duration::from_secs(15); /// Interval between propagation poll checks. const PROPAGATION_POLL_INTERVAL: Duration = Duration::from_millis(200); +/// Checker node used by the full-node shunning regression. +const FULL_NODE_SHUN_CHECKER_INDEX: usize = 3; +/// Target node made disk-full by the full-node shunning regression. +const FULL_NODE_SHUN_TARGET_INDEX: usize = 4; +/// Search budget for finding a key whose close group contains the full node. +const FULL_NODE_SHUN_KEY_SEARCH_ATTEMPTS: usize = 10_000; +/// Fast lower bound for the full-node shunning scheduler check. +const FULL_NODE_SHUN_POSSESSION_DELAY_MIN: Duration = Duration::from_millis(200); +/// Fast upper bound for the full-node shunning scheduler check. +const FULL_NODE_SHUN_POSSESSION_DELAY_MAX: Duration = Duration::from_millis(500); +/// Dummy proof length used when a test only needs to reach pre-payment gates. 
+const DUMMY_PAYMENT_PROOF_LEN: usize = 64; +/// Dummy proof byte used when a test only needs to reach pre-payment gates. +const DUMMY_PAYMENT_PROOF_BYTE: u8 = 0x01; /// Send a replication request via saorsa-core's request-response mechanism /// and decode the response. @@ -252,6 +267,428 @@ async fn test_fresh_replication_propagates_to_close_group() { harness.teardown().await.expect("teardown"); } +/// ADR-0003: the delayed possession check penalises a responsible peer that +/// does NOT hold the chunk, and leaves a peer that DOES hold it unpenalised. +/// +/// Drives the check directly (`run_possession_check_now`, bypassing the 5-15 +/// minute settle delay) so the detection+penalty path is asserted +/// deterministically over real transport. The penalty is observed as a drop in +/// the checker's trust score for the absent peer (the same signal saorsa-core +/// eviction acts on), via `P2PNode::peer_trust`. +#[tokio::test] +#[serial] +async fn possession_check_penalises_absent_peer_only() { + let harness = TestHarness::setup_small().await.expect("setup"); + harness.warmup_dht().await.expect("warmup"); + + // A is the checker; B will be absent, C will hold the chunk. All three are + // regular nodes (idx >= 3) with running replication engines and storage. + let a = harness.test_node(3).expect("node a"); + let b = harness.test_node(4).expect("node b"); + let c = harness.test_node(5).expect("node c"); + + let p2p_a = a.p2p_node.as_ref().expect("p2p a"); + let engine_a = a.replication_engine.as_ref().expect("engine a"); + let peer_b = *b.p2p_node.as_ref().expect("p2p b").peer_id(); + let peer_c = *c.p2p_node.as_ref().expect("p2p c").peer_id(); + + let content = b"adr-0003 possession-check payload"; + let address = compute_address(content); + + // The checker A must hold the chunk it verifies: the possession check + // recomputes the audit digest from its own canonical copy. 
In production the + // PUT handler stores K before fresh-replicating; here we store it on the + // checker explicitly. + a.ant_protocol + .as_ref() + .expect("proto a") + .storage() + .put(&address, content) + .await + .expect("put on a (checker)"); + + // C holds the chunk; B never stores it. + c.ant_protocol + .as_ref() + .expect("proto c") + .storage() + .put(&address, content) + .await + .expect("put on c"); + + assert!( + !b.ant_protocol + .as_ref() + .expect("proto b") + .storage() + .exists(&address) + .expect("exists b"), + "precondition: B must not hold the chunk" + ); + assert!( + c.ant_protocol + .as_ref() + .expect("proto c") + .storage() + .exists(&address) + .expect("exists c"), + "precondition: C must hold the chunk" + ); + + let trust_b_before = p2p_a.peer_trust(&peer_b); + let trust_c_before = p2p_a.peer_trust(&peer_c); + + // Probe both peers now (no scheduler delay). B is absent -> penalised; C is + // present -> untouched. + engine_a + .run_possession_check_now(address, vec![peer_b, peer_c]) + .await; + + let trust_b_after = p2p_a.peer_trust(&peer_b); + let trust_c_after = p2p_a.peer_trust(&peer_c); + + assert!( + trust_b_after < trust_b_before, + "absent peer B must be penalised: {trust_b_before} -> {trust_b_after}" + ); + assert!( + trust_c_after >= trust_c_before - f64::EPSILON, + "present peer C must not be penalised: {trust_c_before} -> {trust_c_after}" + ); + + harness.teardown().await.expect("teardown"); +} + +/// ADR-0003: the possession-check *scheduler* (not the direct-drive path) +/// fires after the configured delay and penalises an absent close peer. +/// +/// Uses a shortened possession delay so the scheduled check runs in well under +/// a second. No payment cache is pre-populated, so the close-group peers reject +/// the fresh offer and are absent when the scheduled check probes them. Proves +/// the `replicate_fresh` -> enqueue -> delayed scheduler -> penalty wiring. 
+#[tokio::test] +#[serial] +async fn possession_scheduler_penalises_absent_close_peer_after_delay() { + let mut net_config = TestNetworkConfig::small(); + net_config.replication_config = Some(ReplicationConfig { + possession_check_delay_min: Duration::from_millis(200), + possession_check_delay_max: Duration::from_millis(500), + ..ReplicationConfig::default() + }); + let harness = TestHarness::setup_with_config(net_config) + .await + .expect("setup"); + harness.warmup_dht().await.expect("warmup"); + + let a = harness.test_node(3).expect("node a"); + let p2p_a = a.p2p_node.as_ref().expect("p2p a"); + let engine_a = a.replication_engine.as_ref().expect("engine a"); + let self_a = *p2p_a.peer_id(); + + let content = b"adr-0003 scheduler-wiring payload"; + let address = compute_address(content); + + // A's close group for this key = exactly the peers the scheduled possession + // check targets. With no payment cache anywhere, they reject the fresh offer + // and are absent when probed. + let close_group_size = ReplicationConfig::default().close_group_size; + let close_group: Vec = p2p_a + .dht_manager() + .find_closest_nodes_local_with_self(&address, close_group_size) + .await + .iter() + .filter(|n| n.peer_id != self_a) + .map(|n| n.peer_id) + .collect(); + assert!(!close_group.is_empty(), "expected a non-empty close group"); + + let trust_before: Vec = close_group.iter().map(|p| p2p_a.peer_trust(p)).collect(); + + // The checker must hold the chunk it later probes for: the possession check + // recomputes the audit digest from its own copy. `replicate_fresh` assumes + // the PUT handler already stored K locally, so store it on the checker here. + a.ant_protocol + .as_ref() + .expect("proto a") + .storage() + .put(&address, content) + .await + .expect("put on a (checker)"); + + // Trigger fresh replication; the engine enqueues the possession check, which + // fires ~200-500 ms later and penalises the absent close peers. 
+ let dummy_pop = [0x01u8; 64]; + engine_a + .replicate_fresh(&address, content, &dummy_pop) + .await; + + // Poll until at least one absent close peer is penalised (trust drops). + let deadline = tokio::time::Instant::now() + PROPAGATION_TIMEOUT; + let mut penalised = false; + while tokio::time::Instant::now() < deadline { + penalised = close_group + .iter() + .zip(trust_before.iter()) + .any(|(peer, &before)| p2p_a.peer_trust(peer) < before - f64::EPSILON); + if penalised { + break; + } + tokio::time::sleep(PROPAGATION_POLL_INTERVAL).await; + } + assert!( + penalised, + "the scheduled possession check should have penalised an absent close peer" + ); + + harness.teardown().await.expect("teardown"); +} + +/// ADR-0003 full-node shunning: a close-group peer that is disk-full rejects a +/// fresh-replication offer before payment verification, remains absent for the +/// key, and is penalised when the checker probes possession. +/// +/// This bridges the two protections that make a full node get shunned by close +/// groups: capacity rejection creates a missing replica, and the delayed +/// possession-check verdict turns that absence into the trust signal that +/// saorsa-core eviction acts on. 
+#[tokio::test] +#[serial] +async fn full_close_group_node_rejects_replica_and_is_penalised_as_absent() { + let mut net_config = TestNetworkConfig::small(); + net_config.replication_config = Some(ReplicationConfig { + possession_check_delay_min: FULL_NODE_SHUN_POSSESSION_DELAY_MIN, + possession_check_delay_max: FULL_NODE_SHUN_POSSESSION_DELAY_MAX, + ..ReplicationConfig::default() + }); + net_config + .storage_disk_reserve_overrides + .insert(FULL_NODE_SHUN_TARGET_INDEX, u64::MAX); + let harness = TestHarness::setup_with_config(net_config) + .await + .expect("setup"); + harness.warmup_dht().await.expect("warmup"); + + let checker = harness + .test_node(FULL_NODE_SHUN_CHECKER_INDEX) + .expect("checker node"); + let full_node = harness + .test_node(FULL_NODE_SHUN_TARGET_INDEX) + .expect("full node"); + let checker_p2p = checker.p2p_node.as_ref().expect("checker p2p"); + let checker_engine = checker.replication_engine.as_ref().expect("checker engine"); + let full_p2p = full_node.p2p_node.as_ref().expect("full node p2p"); + let full_peer = *full_p2p.peer_id(); + + let close_group_size = ReplicationConfig::default().close_group_size; + let admission_width = storage_admission_width(close_group_size); + let mut candidate = None; + for attempt in 0..FULL_NODE_SHUN_KEY_SEARCH_ATTEMPTS { + let content = format!("adr-0003 full-node shunning payload {attempt}").into_bytes(); + let address = compute_address(&content); + let full_node_in_checker_close_group = checker_p2p + .dht_manager() + .find_closest_nodes_local_with_self(&address, close_group_size) + .await + .iter() + .any(|node| node.peer_id == full_peer); + if !full_node_in_checker_close_group { + continue; + } + + let full_node_admits_self = full_p2p + .dht_manager() + .find_closest_nodes_local_with_self(&address, admission_width) + .await + .iter() + .any(|node| node.peer_id == full_peer); + if full_node_admits_self { + candidate = Some((content, address)); + break; + } + } + let (content, address) = + 
candidate.expect("find key where full node is a responsible close-group peer"); + + for idx in 0..harness.node_count() { + if let Some(protocol) = harness + .test_node(idx) + .and_then(|node| node.ant_protocol.as_ref()) + { + protocol.payment_verifier().cache_insert(address); + } + } + + let dummy_payment_proof = vec![DUMMY_PAYMENT_PROOF_BYTE; DUMMY_PAYMENT_PROOF_LEN]; + let offer = FreshReplicationOffer { + key: address, + data: content.clone(), + proof_of_payment: dummy_payment_proof.clone(), + }; + let response = send_replication_request( + checker_p2p, + &full_peer, + ReplicationMessage { + request_id: rand::random(), + body: ReplicationMessageBody::FreshReplicationOffer(offer), + }, + PROPAGATION_TIMEOUT, + ) + .await; + + match response.body { + ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected { + key, + reason, + }) => { + assert_eq!(key, address); + assert!( + reason.contains("Insufficient disk space"), + "expected disk-full rejection, got: {reason}" + ); + } + other => panic!("expected disk-full rejection, got: {other:?}"), + } + + let full_storage = full_node + .ant_protocol + .as_ref() + .expect("full node protocol") + .storage(); + assert!( + !full_storage.exists(&address).expect("exists on full node"), + "full node must not store the rejected replica" + ); + + // The checker must hold the chunk it probes for: the possession check + // recomputes the audit digest from its own copy. `replicate_fresh` assumes + // the PUT handler already stored K locally, so store it on the checker here. 
+ checker + .ant_protocol + .as_ref() + .expect("checker protocol") + .storage() + .put(&address, &content) + .await + .expect("put on checker"); + + let trust_before = checker_p2p.peer_trust(&full_peer); + checker_engine + .replicate_fresh(&address, &content, &dummy_payment_proof) + .await; + + let deadline = tokio::time::Instant::now() + PROPAGATION_TIMEOUT; + let mut trust_after = trust_before; + while tokio::time::Instant::now() < deadline { + trust_after = checker_p2p.peer_trust(&full_peer); + if trust_after < trust_before - f64::EPSILON { + break; + } + tokio::time::sleep(PROPAGATION_POLL_INTERVAL).await; + } + assert!( + trust_after < trust_before - f64::EPSILON, + "full close-group peer should be shunned by the scheduled possession check: \ + {trust_before} -> {trust_after}" + ); + + harness.teardown().await.expect("teardown"); +} + +/// ADR-0003 self-closeness gate: a node accepts a client PUT only when it is +/// within its own local `K_BUCKET_SIZE`-closest to the address. +/// +/// Needs more than `K_BUCKET_SIZE` nodes so a far node falls outside its own +/// closest view. The responsible (closest) node accepts and stores; the +/// non-responsible (farthest) node rejects before payment with a closeness +/// error. Guards both the regression risk (gate must not reject responsible +/// puts) and the intended reject behaviour. +#[tokio::test] +#[serial] +async fn self_closeness_gate_accepts_responsible_rejects_far_node() { + let harness = TestHarness::setup().await.expect("setup"); + harness.warmup_dht().await.expect("warmup"); + + let content = b"adr-0003 self-closeness gate payload"; + let address = compute_address(content); + + // Rank all peers by XOR distance to the address (identical from any node's + // view once routing tables are warm). 
+ let ranker_p2p = harness + .test_node(3) + .expect("ranker") + .p2p_node + .as_ref() + .expect("p2p"); + let ranked: Vec = ranker_p2p + .dht_manager() + .find_closest_nodes_local_with_self(&address, harness.node_count()) + .await + .iter() + .map(|n| n.peer_id) + .collect(); + assert!( + ranked.len() > K_BUCKET_SIZE, + "need > K_BUCKET_SIZE nodes to exercise the reject path; got {}", + ranked.len() + ); + + // Only nodes that actually run a protocol handler can serve a client PUT. + let has_protocol = |peer: &PeerId| { + node_index_for_peer(&harness, peer) + .and_then(|idx| harness.test_node(idx)) + .is_some_and(|n| n.ant_protocol.is_some()) + }; + + // Closest node with a handler -> within its own K-closest -> gate accepts. + let close_peer = ranked + .iter() + .copied() + .find(|p| has_protocol(p)) + .expect("a close node with a handler"); + // Farthest node beyond the gate width with a handler -> gate rejects. + let far_peer = ranked + .iter() + .copied() + .enumerate() + .rev() + .find(|(rank, p)| *rank >= K_BUCKET_SIZE && has_protocol(p)) + .map(|(_, p)| p) + .expect("a far node (rank >= K_BUCKET_SIZE) with a handler"); + + let close_idx = node_index_for_peer(&harness, &close_peer).expect("close idx"); + let far_idx = node_index_for_peer(&harness, &far_peer).expect("far idx"); + + // Accept path: a responsible node stores the chunk (gate passes). + let close_result = harness + .test_node(close_idx) + .expect("close node") + .store_chunk(content) + .await; + assert!( + close_result.is_ok(), + "responsible (closest) node must accept the PUT, got {close_result:?}" + ); + + // Reject path: a non-responsible node rejects before payment with a + // closeness error. 
+ let far_result = harness + .test_node(far_idx) + .expect("far node") + .store_chunk(content) + .await; + assert!( + far_result.is_err(), + "non-responsible (farthest) node must reject the PUT" + ); + let err = format!("{}", far_result.expect_err("far rejection")); + assert!( + err.contains("closest"), + "rejection should cite closeness, got: {err}" + ); + + harness.teardown().await.expect("teardown"); +} + /// `PaidForList` persistence (Section 18 #43). /// /// Insert a key into the `PaidList`, verify it persists by reopening the diff --git a/tests/e2e/testnet.rs b/tests/e2e/testnet.rs index 6b5ae84..3eb19a8 100644 --- a/tests/e2e/testnet.rs +++ b/tests/e2e/testnet.rs @@ -35,6 +35,7 @@ use saorsa_core::{ identity::NodeIdentity, IPDiversityConfig as CoreDiversityConfig, MultiAddr, NodeConfig as CoreNodeConfig, P2PEvent, P2PNode, }; +use std::collections::HashMap; use std::net::{Ipv4Addr, SocketAddr}; use std::path::PathBuf; use std::sync::Arc; @@ -222,6 +223,17 @@ pub struct TestNetworkConfig { /// this network (e.g. Anvil testnet) for on-chain verification. /// When `None`, defaults to `ArbitrumOne`. pub evm_network: Option, + + /// Optional replication-config override applied to every node's + /// replication engine. `None` uses `ReplicationConfig::default()`. Tests + /// use this to shorten timers — e.g. the ADR-0003 possession-check delay. + pub replication_config: Option, + + /// Optional per-node storage disk-reserve overrides. + /// + /// Tests can set a node's reserve above available disk space to make its + /// capacity pre-check fail deterministically without filling the host disk. 
+ pub storage_disk_reserve_overrides: HashMap, } impl Default for TestNetworkConfig { @@ -255,6 +267,8 @@ impl Default for TestNetworkConfig { enable_node_logging: false, payment_enforcement: false, evm_network: None, + replication_config: None, + storage_disk_reserve_overrides: HashMap::new(), } } } @@ -1040,9 +1054,19 @@ impl TestNetwork { })?); // Initialize AntProtocol for this node with payment enforcement setting - let ant_protocol = - Self::create_ant_protocol(&data_dir, self.config.evm_network.clone(), &identity) - .await?; + let storage_disk_reserve = self + .config + .storage_disk_reserve_overrides + .get(&index) + .copied() + .unwrap_or_default(); + let ant_protocol = Self::create_ant_protocol_with_disk_reserve( + &data_dir, + self.config.evm_network.clone(), + storage_disk_reserve, + &identity, + ) + .await?; Ok(TestNode { index, @@ -1080,10 +1104,25 @@ impl TestNetwork { data_dir: &std::path::Path, evm_network: Option, identity: &saorsa_core::identity::NodeIdentity, + ) -> Result { + Self::create_ant_protocol_with_disk_reserve(data_dir, evm_network, 0, identity).await + } + + /// Create an `AntProtocol` handler with an explicit storage disk reserve. + /// + /// # Errors + /// + /// Returns an error if LMDB storage initialisation fails. 
+ pub async fn create_ant_protocol_with_disk_reserve( + data_dir: &std::path::Path, + evm_network: Option, + disk_reserve: u64, + identity: &saorsa_core::identity::NodeIdentity, ) -> Result { // Create LMDB storage let storage_config = LmdbStorageConfig { root_dir: data_dir.to_path_buf(), + disk_reserve, ..LmdbStorageConfig::test_default() }; let storage = LmdbStorage::new(storage_config) @@ -1284,7 +1323,7 @@ impl TestNetwork { (&node.p2p_node, &node.ant_protocol, &node.node_identity) { let shutdown = CancellationToken::new(); - let repl_config = ReplicationConfig::default(); + let repl_config = self.config.replication_config.clone().unwrap_or_default(); let (_fresh_tx, fresh_rx) = tokio::sync::mpsc::unbounded_channel(); let node_identity = Arc::clone(id); match ReplicationEngine::new(