From 1187a62171c92be031a1af050c248855145793ac Mon Sep 17 00:00:00 2001
From: Warm Beer
Date: Fri, 26 Jun 2026 12:11:36 +0200
Subject: [PATCH 01/12] docs(adr): add ADR-0003 full-node detection, penalisation, and eviction

Node-side role in the full-node-shunning plan (full == dishonest ==
evict). After fresh replication, the node tests every responsible
close-group peer 5-15 min later for possession; storing earns no
positive trust, while not storing is penalised at AuditChallenge
severity. Delivery of the push is best-effort (up to 2 retries/peer)
but never an exemption: undelivered peers are still tested and
penalised alike. Adds a self-closeness gate on client puts, coupled so
its width stays >= the client fallback ceiling. Trust-score eviction is
already implemented in saorsa-core; this ADR only emits the signal.

Co-Authored-By: Claude Opus 4.8 (1M context)
---
 ...R-0003-full-node-detection-and-eviction.md | 216 ++++++++++++++++++
 1 file changed, 216 insertions(+)
 create mode 100644 docs/adr/ADR-0003-full-node-detection-and-eviction.md

diff --git a/docs/adr/ADR-0003-full-node-detection-and-eviction.md b/docs/adr/ADR-0003-full-node-detection-and-eviction.md
new file mode 100644
index 0000000..9a8aa55
--- /dev/null
+++ b/docs/adr/ADR-0003-full-node-detection-and-eviction.md
@@ -0,0 +1,216 @@
+# ADR-0003: Full-node detection, penalisation, and eviction
+
+- **Status:** Proposed
+- **Date:** 2026-06-25
+- **Decision owners:** Mick
+- **Reviewers:**
+- **Supersedes:** none
+- **Superseded by:** none
+- **Related:** ant-client ADR-0002 (client-side fallback and diagnostics); ADR-0002 (gossip-triggered storage-commitment audit — shares the trust/eviction path); saorsa-core trust-score eviction (the enforcement layer — already implemented, no change required)
+
+## Context
+
+A network design axiom frames this whole ADR: **a full node and a dishonest node are
+treated identically.** A node that cannot accept puts is an unhealthy close-group
+member and must be evicted from routing; the close group must always be healthy and
+accept puts. This holds **only while fullness is a minority within any
+neighbourhood** — there must always be healthy peers holding the data when a full one
+is shed. A globally near-capacity network is the explicit failure boundary, where both
+eviction (nothing healthy to shift to) and client fallback (no willing acceptor)
+degrade together. Every mechanism below is designed for the minority-full regime and
+must degrade gracefully — not cascade-evict — when that assumption breaks.
+
+Verified current behaviour:
+
+- A node rejects a put when its disk is full with a **distinct**
+  `ProtocolError::StorageFailed`, *before* payment verification
+  (`src/storage/handler.rs:274-281`; `src/storage/lmdb.rs:599-621`).
+- A direct client PUT does **not** reject on the node's own storage-responsibility
+  view (`src/storage/handler.rs:283-285`). Acceptance is bounded only *indirectly* by
+  the issuer-in-local-20-closest test (`src/payment/verifier.rs:942-1003`;
+  `PAID_QUOTE_ISSUER_CLOSENESS_WIDTH = K_BUCKET_SIZE = 20`).
+- Fresh replication requires a proof of payment **and** enforces closeness via
+  `admission::is_responsible(... storage_admission_width = close_group + margin)`,
+  reusing the same `ClientPut` verification path
+  (`src/replication/mod.rs:1902-1916, 1987-2007, 2035-2048`).
+- ADR-0002 established the principle that misbehaviour must be **attributable** before
+  eviction is enabled.
+
+The gap: nothing currently turns "a close peer is full" into an attributable penalty,
+and nothing stops a non-close node from accepting a client put and then
+mis-attributing the resulting replication failures to honest nodes.
+
+## Decision Drivers
+
+- Detect full close peers from **direct, locally-observed, verifiable** signals —
+  never client hearsay, which is an eclipse/grief vector.
+- Never wrongly penalise an honest-but-slow node — mirror ADR-0002's
+  deterministic-vs-transient split and adaptive grace.
+- Every close-group peer is responsible for holding the chunk; this node's failure to
+  deliver its push does not excuse a peer that lacks the data — all are tested and
+  penalised alike.
+- Keep the close group healthy as full nodes are shed, without losing readability of
+  data already stored (safe while fullness stays a minority).
+- Do not strangle the client fallback (ant-client ADR-0002): the self-closeness gate
+  width and the fallback ceiling are the same knob viewed from two sides.
+- Degrade gracefully at the near-capacity boundary instead of cascade-evicting.
+
+## Considered Options
+
+1. **Penalise on client-reported full rejections.** Rejected: unverifiable; lets a
+   client grief honest nodes into eviction.
+2. **Detect fullness only through the existing periodic responsible-chunk audit.**
+   Rejected as insufficient: too slow to react to a peer that is full at put time, and
+   it does not observe the fresh-replication path where the failure actually shows up.
+3. **Detect during fresh replication, verify possession after a 5–15 minute delay, feed
+   an attributable penalty into the trust score, evict via saorsa-core's existing
+   eviction, and gate client-put acceptance on self-closeness (chosen).**
+
+## Decision
+
+### Detection
+
+- When a node fresh-replicates a chunk to its close group, it records **every** close
+  peer responsible for the chunk and schedules a **delayed possession check** for each
+  one. The push outcome — accepted, refused with `StorageFailed`, or undelivered — does
+  not change *who* is checked: the delayed check below is the single, authoritative test
+  and it runs against the **whole** close group. All events are **locally observed** by
+  the replicating node itself.
+- **Best-effort delivery, but no delivery-based exemption.** The node still tries to
+  deliver each fresh-replication request and retries that peer **up to 2 times** on a
+  delivery/transport failure. But delivery success is **not** a precondition for
+  judgement: a close peer is responsible for holding the chunk regardless of whether
+  *this* node's push got through (the chunk also reaches it via the client's own puts,
+  other replicators, and neighbour sync). A peer the push never reached after the
+  retries is therefore **still tested and still penalised** if it lacks the data — the
+  same as every other close peer.
+- **Delayed verification — 5 to 15 minutes after fresh replication.** The node waits a
+  randomised interval in `[5 min, 15 min]`, then queries each close peer (a PaidForList
+  possession query) for whether it actually holds the chunk. The delay (a) gives fresh
+  replication time to settle, so an honest peer still mid-store is never judged
+  prematurely, and (b) makes the check a surprise the peer cannot anticipate.
+- **Asymmetric, unrewarding scoring.** A peer confirmed to hold the chunk receives
+  **no positive trust** — storing what it was paid to store is the baseline
+  expectation, not meritorious. A peer that does **not** hold the chunk receives a
+  penalty **as severe as a normal AuditChallenge failure** (the same magnitude as the
+  responsible-chunk / storage-commitment audit penalty).
+- Only the observing node's **own direct interaction** produces a penalty. No
+  third-party report and no client claim ever does.
+
+### Accounting (reusing the ADR-0002 philosophy)
+
+- A confirmed **not-present** result after the delayed window is a **deterministic
+  failure** — re-asking cannot turn a genuine absence into possession — and is
+  penalised at **AuditChallenge severity on its first occurrence**, exactly as ADR-0002
+  acts on deterministic audit failures the first time. The push outcome (accepted,
+  `StorageFailed`, or undelivered) does not change this — the verdict comes solely from
+  the possession check, applied identically to every close peer.
+- A peer unreachable **when the possession check itself is run** has not yet *yielded*
+  a verdict — distinct from a peer the replication push failed to reach. The check keeps
+  the same grace allowance as ADR-0002's audit deadline misses (resets on success,
+  scales with the network-wide timeout level, never with deterministic failures) and is
+  **re-attempted until it returns present or absent**. This grace buys time to obtain the
+  answer, not a way to skip the test: every close peer is ultimately judged on whether it
+  holds the chunk.
+- The score moves in one direction only: storing earns nothing, and only a confirmed
+  absence moves it — downward.
+
+### Eviction (provided by saorsa-core — already implemented)
+
+- The trust-score eviction this plan needs **already exists in saorsa-core**: full-peer
+  penalties feed the peer's trust score, and a peer below the threshold is immediately
+  evicted from routing. **No saorsa-core change is required by this ADR** — the
+  node-side work here only emits the full-peer penalty, at AuditChallenge severity, into
+  that existing trust system.
+- This ADR therefore **depends on, rather than defines,** saorsa-core's existing
+  threshold/eviction policy, its recovery path (a node that frees capacity can
+  re-enter), and any near-capacity protection that avoids cascade-evicting a
+  uniformly-full neighbourhood. These are confirmed here as integration behaviour
+  (see Validation), not implemented as part of this work.
+
+### Self-closeness gate on client puts
+
+- Add a gate so a node accepts a client put only when it considers **itself** within
+  its local K closest to the address. This makes an accepting node's subsequent
+  fresh-replication participation legitimate, so it cannot mis-penalise honest nodes —
+  the concern behind the original "only accept a client put if within the local K
+  closest" requirement. It replaces today's *indirect* issuer-in-20 bound with an
+  explicit self-closeness check.
+- **Coupling (must design as one knob):** the gate width MUST be **≥ the client
+  fallback ceiling** (ant-client ADR-0002, bounded by the 20-wide window), or the gate
+  strangles the fallback it is meant to coexist with. Express both against the same
+  width and choose them together.
+
+## Consequences
+
+### Positive
+
+- Full peers become attributable and are shed, so the close group self-heals toward
+  peers that can keep storing.
+- Honest nodes are protected three ways: the 5–15 minute settle delay before any check,
+  the deterministic-vs-transient split, and saorsa-core's adaptive grace — so a peer
+  still mid-store or briefly slow is never penalised.
+- Client fallback acceptance and node-side acceptance stay aligned because the gate and
+  the fallback ceiling are a single tuned width.
+
+### Negative / Trade-offs
+
+- Detection adds replication-time verification cost (an extra possession check per
+  fresh-replication wave).
+- **Enabling eviction is a coordinated, breaking change**: the network must run
+  detection before eviction can be relied on, consistent with ADR-0002's rollout
+  gating.
+- The near-capacity boundary is a real limitation — mitigated by saorsa-core's existing
+  safety valve and recovery path, not eliminated.
+- The self-closeness gate changes today's permissive client-put acceptance and must be
+  rolled out in step with the client fallback.
+
+### Neutral / Operational
+
+- New node-side tunables: the fresh-replication delivery retry budget (up to 2 per
+  peer), the post-put verification envelope (the 5–15 minute delay window plus per-check
+  timeout and re-attempt budget), and the self-closeness gate width. The penalty
+  magnitude is not new — it reuses the existing AuditChallenge severity.
+- Trust score, eviction threshold, recovery, and near-capacity protection are
+  **already implemented in saorsa-core**; this node only emits penalties into them —
+  **no saorsa-core change in this work**.
+- Runs **alongside** ADR-0002's gossip-triggered storage-commitment audit and the
+  periodic responsible-chunk audit, sharing the same trust/eviction path; full-node
+  detection is simply another attributable-misbehaviour source feeding it.
+
+## Validation
+
+How we will know this decision remains correct:
+
+- **Minority-full testnet:** full close peers accrue penalties from direct observation
+  only, cross the threshold, are evicted, and the close group recomputes to healthy
+  peers — with stored data still readable throughout (the surviving majority held the
+  replicas).
+- **Honest-node safety:** under induced churn, honest-but-slow nodes are not evicted
+  (grace plus adaptive timeout scaling hold; no eviction death spiral), and a client
+  cannot induce eviction of an honest node by claiming it is full.
+- **Self-closeness gate:** non-close nodes no longer accept client puts, and the chosen
+  gate width still leaves at least quorum-many acceptors available for the client
+  fallback set (cross-checked against ant-client ADR-0002).
+- **Near-capacity:** when a neighbourhood is uniformly full, the node degrades to
+  best-effort storage rather than cascade-evicting.
+- **Tests required before Accepted:** the possession check fires only within the 5–15
+  minute window and never before it, and runs for **every** close-group peer; a peer
+  confirmed holding the chunk receives **no** trust change, while a confirmed not-present
+  peer records exactly one penalty at **AuditChallenge severity** against the right
+  peer — identically whether the push was accepted, refused with `StorageFailed`, or
+  never delivered; a peer the push never reached after 2 retries is still tested and
+  still penalised if it lacks the chunk; client/third-party claims record none; a peer
+  unreachable *at check time* is re-attempted under the grace allowance until it yields
+  present/absent; the adaptive timeout grace tracks widespread timeouts but never
+  deterministic failures; the node emits into saorsa-core such that an
+  evicted-for-fullness node can re-enter after it frees capacity (integration); the
+  self-closeness gate width is ≥ the fallback ceiling.
+- **Re-open triggers:** revisit thresholds if false positives appear; revisit the
+  near-capacity degradation if the network approaches global capacity.
+
+## Notes for AI-assisted work
+
+AI tools may help draft this ADR, but **must not mark it Accepted without human
+review**. Accepted ADRs are immutable: create a new superseding ADR rather than
+editing this one.

From bc07c03dccc5663ee514e9ce60dfab54ffd651fc Mon Sep 17 00:00:00 2001
From: Warm Beer
Date: Fri, 26 Jun 2026 14:18:23 +0200
Subject: [PATCH 02/12] feat(replication): retry fresh-replication delivery up to 2x per peer
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

First slice of ADR-0003. replicate_fresh now retries each per-peer push
up to FRESH_REPLICATION_DELIVERY_MAX_RETRIES on a transport failure, so
a transient hiccup doesn't silently drop the offer. The encoded offer is
shared via Arc so the common single-attempt path keeps one clone per
peer.
Delivery assurance only — possession scoring (the delayed 5-15 min check
and AuditChallenge-severity penalty) is the next stage.

Co-Authored-By: Claude Opus 4.8 (1M context)
---
 src/replication/config.rs |  9 ++++++++
 src/replication/fresh.rs  | 46 +++++++++++++++++++++++++++++++++------
 2 files changed, 48 insertions(+), 7 deletions(-)

diff --git a/src/replication/config.rs b/src/replication/config.rs
index 571c934..3bfaf3c 100644
--- a/src/replication/config.rs
+++ b/src/replication/config.rs
@@ -47,6 +47,15 @@ pub const NEIGHBOR_SYNC_SCOPE: usize = 20;
 /// round.
 pub const NEIGHBOR_SYNC_PEER_COUNT: usize = 4;

+/// Best-effort delivery retries for a fresh-replication push, per peer.
+///
+/// ADR-0003: on a transport/send failure the offer is retried up to this many
+/// times so a transient hiccup does not silently drop it. This is delivery
+/// assurance only — possession is judged separately by the delayed possession
+/// check, which still penalises a close peer that lacks the chunk even if the
+/// push never reached it.
+pub const FRESH_REPLICATION_DELIVERY_MAX_RETRIES: u32 = 2;
+
 /// Width used when deciding whether this node may locally store or retain a
 /// chunk.
 #[must_use]
diff --git a/src/replication/fresh.rs b/src/replication/fresh.rs
index af3a93a..7b17bbd 100644
--- a/src/replication/fresh.rs
+++ b/src/replication/fresh.rs
@@ -14,7 +14,9 @@ use saorsa_core::P2PNode;
 use tokio::sync::Semaphore;

 use crate::ant_protocol::XorName;
-use crate::replication::config::{ReplicationConfig, REPLICATION_PROTOCOL_ID};
+use crate::replication::config::{
+    ReplicationConfig, FRESH_REPLICATION_DELIVERY_MAX_RETRIES, REPLICATION_PROTOCOL_ID,
+};
 use crate::replication::paid_list::PaidList;
 use crate::replication::protocol::{
     FreshReplicationOffer, PaidNotify, ReplicationMessage, ReplicationMessageBody,
@@ -90,9 +92,13 @@ pub async fn replicate_fresh(
         );
         return;
     };
+    // Share one encoded copy across the per-peer send tasks so a retry only
+    // re-materialises the buffer for the (consuming) send call, keeping the
+    // common single-attempt path at one clone per peer.
+    let encoded = Arc::new(encoded);
     for peer in &target_peers {
         let p2p = Arc::clone(p2p_node);
-        let data = encoded.clone();
+        let data = Arc::clone(&encoded);
         let peer_id = *peer;
         let sem = Arc::clone(send_semaphore);
         tokio::spawn(async move {
@@ -103,11 +109,37 @@ pub async fn replicate_fresh(
                 "Replication send permit acquired for peer {peer_id} ({} available)",
                 sem.available_permits()
             );
-            if let Err(e) = p2p
-                .send_message(&peer_id, REPLICATION_PROTOCOL_ID, data, &[])
-                .await
-            {
-                debug!("Failed to send fresh offer to {peer_id}: {e}");
+            // ADR-0003: best-effort delivery. Retry the push up to
+            // FRESH_REPLICATION_DELIVERY_MAX_RETRIES times on a transport
+            // failure so a transient hiccup doesn't silently drop the offer.
+            // Possession is judged separately by the delayed possession check.
+            let mut attempt = 0u32;
+            loop {
+                match p2p
+                    .send_message(
+                        &peer_id,
+                        REPLICATION_PROTOCOL_ID,
+                        data.as_ref().clone(),
+                        &[],
+                    )
+                    .await
+                {
+                    Ok(()) => break,
+                    Err(e) => {
+                        if attempt >= FRESH_REPLICATION_DELIVERY_MAX_RETRIES {
+                            debug!(
+                                "Failed to send fresh offer to {peer_id} after {} attempts: {e}",
+                                attempt + 1
+                            );
+                            break;
+                        }
+                        attempt += 1;
+                        debug!(
+                            "Retrying fresh offer to {peer_id} (attempt {}): {e}",
+                            attempt + 1
+                        );
+                    }
+                }
             }
         });
     }

From 9d43c2708d4c9bf5a9a69b77a72abf0e8f944644 Mon Sep 17 00:00:00 2001
From: Warm Beer
Date: Fri, 26 Jun 2026 15:04:48 +0200
Subject: [PATCH 03/12] build(deps): pin saorsa-core to PR #119 git branch to fix version skew

ant-protocol 2.2.0 pulled saorsa-core 0.26.0 from crates.io while the
node used a local path of the same version, producing two copies that
collided on shared types (e.g. saorsa_core::address::MultiAddr) and
broke the ant-devnet binary. Point the direct dependency at
WithAutonomi/saorsa-core PR #119 (trust quarantine thresholds) and add a
matching [patch.crates-io] so the transitive copy unifies onto one
source. Full workspace now builds.

Co-Authored-By: Claude Opus 4.8 (1M context)
---
 Cargo.lock |  3 +--
 Cargo.toml | 15 +++++++++++++--
 2 files changed, 14 insertions(+), 4 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index e212fa4..91545e1 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4867,8 +4867,7 @@ dependencies = [
 [[package]]
 name = "saorsa-core"
 version = "0.26.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fa8cc1b7f59f97d018760ff150bbb4f217197c41622b83f7085c9cf0424b736e"
+source = "git+https://github.com/WithAutonomi/saorsa-core?branch=feat/trust-quarantine-thresholds#3afe290442df42a7b6ca8989f860d185d1f6f9b4"
 dependencies = [
  "anyhow",
  "async-trait",
diff --git a/Cargo.toml b/Cargo.toml
index 7d08918..88f570a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -41,8 +41,11 @@ mimalloc = "0.1"
 # with ant-protocol's re-exports.
 ant-protocol = "2.2.0"

-# Core (provides EVERYTHING: networking, DHT, security, trust, storage)
-saorsa-core = "0.26.0"
+# Core (provides EVERYTHING: networking, DHT, security, trust, storage).
+# Pinned to WithAutonomi/saorsa-core PR #119 (trust quarantine thresholds);
+# the matching `[patch.crates-io]` below redirects ant-protocol's transitive
+# saorsa-core to the same source so Cargo unifies on one copy.
+saorsa-core = { git = "https://github.com/WithAutonomi/saorsa-core", branch = "feat/trust-quarantine-thresholds" }
 saorsa-pqc = "0.5"

 # Payment verification - autonomi network lookup + EVM payment
@@ -196,3 +199,11 @@ unused_async = "allow"
 cognitive_complexity = "allow"
 # Allow non-const functions during initial development (may need runtime features later)
 missing_const_for_fn = "allow"
+
+[patch.crates-io]
+# Redirect the saorsa-core that ant-protocol (and other crates) pull from
+# crates.io onto the same git source as the node's direct dependency,
+# eliminating the duplicate 0.26.0 copies (crates.io vs git) that otherwise
+# collide on shared types such as `saorsa_core::address::MultiAddr`.
+# Tracks WithAutonomi/saorsa-core PR #119 (trust quarantine thresholds).
+saorsa-core = { git = "https://github.com/WithAutonomi/saorsa-core", branch = "feat/trust-quarantine-thresholds" }

From 260e31c717df5a55f65d1275097005564bca2606 Mon Sep 17 00:00:00 2001
From: Warm Beer
Date: Fri, 26 Jun 2026 15:13:55 +0200
Subject: [PATCH 04/12] feat(storage): add self-closeness gate on client PUTs

Implements the ADR-0003 gate. A client PUT is now accepted only when
this node is within its own local SELF_CLOSENESS_GATE_WIDTH
(= K_BUCKET_SIZE) closest peers to the address, so the fresh replication
it triggers is legitimate and cannot mis-penalise honest peers. The
width equals the client's PUT fallback ceiling (ant-client ADR-0002), so
a client routing past full close-group members onto further peers is
still accepted; a genuinely far node is turned away.
The P2P handle is bound out of the lock before the await, and the gate
no-ops when no handle is attached (unit tests). Fresh-replication
receives keep their own narrower admission check.

Co-Authored-By: Claude Opus 4.8 (1M context)
---
 src/storage/handler.rs | 45 +++++++++++++++++++++++++++++++++++++++---
 1 file changed, 42 insertions(+), 3 deletions(-)

diff --git a/src/storage/handler.rs b/src/storage/handler.rs
index 586b517..13c52d0 100644
--- a/src/storage/handler.rs
+++ b/src/storage/handler.rs
@@ -38,13 +38,27 @@ use crate::client::compute_address;
 use crate::error::{Error, Result};
 use crate::logging::{debug, info, warn};
 use crate::payment::{PaymentVerifier, QuoteGenerator, VerificationContext};
+use crate::replication::admission;
+use crate::replication::config::K_BUCKET_SIZE;
 use crate::replication::fresh::FreshWriteEvent;
 use crate::storage::lmdb::LmdbStorage;
 use bytes::Bytes;
+use parking_lot::RwLock;
 use saorsa_core::P2PNode;
 use std::sync::Arc;
 use tokio::sync::mpsc;

+/// Width of the self-closeness gate on client PUTs (ADR-0003): a node accepts
+/// a PUT only when it is within its own local `SELF_CLOSENESS_GATE_WIDTH`
+/// closest peers to the address.
+///
+/// Set to the client's PUT fallback ceiling (`K_BUCKET_SIZE`), wider than the
+/// storage-admission width, so a client routing past full close-group members
+/// onto further peers (ant-client ADR-0002) is still accepted here, while a genuinely far
+/// node — which could only mis-attribute fresh-replication failures — is
+/// turned away.
+const SELF_CLOSENESS_GATE_WIDTH: usize = K_BUCKET_SIZE;
+
 /// ANT protocol handler.
 ///
 /// Handles chunk PUT/GET/Quote requests using LMDB storage for persistence
@@ -59,6 +73,10 @@ pub struct AntProtocol {
     quote_generator: Arc<QuoteGenerator>,
     /// Channel for notifying the replication engine about newly-stored chunks.
     fresh_write_tx: Option>,
+    /// The node's P2P handle, attached post-construction via
+    /// `attach_p2p_node`. Drives the self-closeness gate on client PUTs;
+    /// `None` in unit tests that never attach a node.
+    p2p_node: RwLock<Option<Arc<P2PNode>>>,
 }

 impl AntProtocol {
@@ -90,6 +108,7 @@ impl AntProtocol {
             payment_verifier,
             quote_generator,
             fresh_write_tx: None,
+            p2p_node: RwLock::new(None),
         }
     }

@@ -99,8 +118,9 @@ impl AntProtocol {
     /// checks can use the live routing view. Idempotent: calling twice
     /// replaces the verifier handle.
     pub fn attach_p2p_node(&self, node: Arc<P2PNode>) {
+        *self.p2p_node.write() = Some(Arc::clone(&node));
         self.payment_verifier.attach_p2p_node(node);
-        debug!("AntProtocol: P2PNode attached for payment live-DHT checks");
+        debug!("AntProtocol: P2PNode attached for payment live-DHT checks and self-closeness gate");
     }

     /// Set the channel sender for fresh-write replication events.
@@ -280,9 +300,28 @@ impl AntProtocol {
             return ChunkPutResponse::Error(ProtocolError::StorageFailed(e.to_string()));
         }

+        // Self-closeness gate (ADR-0003): accept a client PUT only when this
+        // node is within its own local closest view of the address, so the
+        // fresh replication it triggers is legitimate and cannot mis-penalise
+        // honest peers. The width is the client's PUT fallback ceiling
+        // (`SELF_CLOSENESS_GATE_WIDTH`), so a client routing past full
+        // close-group members onto further peers is still accepted here.
+        // Skipped when no P2P handle is attached (unit tests). Bind the handle
+        // out of the lock first so no guard is held across the `.await`.
+        let attached = self.p2p_node.read().as_ref().map(Arc::clone);
+        if let Some(p2p) = attached {
+            let self_id = *p2p.peer_id();
+            if !admission::is_responsible(&self_id, &address, &p2p, SELF_CLOSENESS_GATE_WIDTH).await
+            {
+                debug!("Rejecting PUT for {addr_hex}: not within local closest peers");
+                return ChunkPutResponse::Error(ProtocolError::StorageFailed(
+                    "node is not within its local closest peers for this address".to_string(),
+                ));
+            }
+        }
+
         // 5. Verify payment. The ClientPut context applies the store-strength
-        // payment cache and verifies live proofs. Direct client PUT does not
-        // reject based on this node's local storage-responsibility view.
+        // payment cache and verifies live proofs.
         let payment_result = self
             .payment_verifier
             .verify_payment(

From 50b472a3335b3d96a38bf90b473b3da4f43dd56c Mon Sep 17 00:00:00 2001
From: Warm Beer
Date: Fri, 26 Jun 2026 15:29:05 +0200
Subject: [PATCH 05/12] feat(replication): add delayed possession-check scheduler

Implements the detection+penalty core of ADR-0003. After fresh
replication, replicate_fresh now returns the responsible close-group
peers; the fresh-write drainer enqueues a possession-check event, and a
new scheduler task waits a randomised 5-15 minute settle delay
(POSSESSION_CHECK_DELAY_MIN/MAX) before probing every responsible peer
for actual possession via a presence-only VerificationRequest. A peer
confirmed absent is penalised at AuditChallenge severity
(report_trust_event(ApplicationFailure(AUDIT_FAILURE_TRUST_WEIGHT)));
a peer that holds it earns nothing; an unreachable peer yields no
verdict and is re-probed under a bounded grace, never penalised. Every
responsible peer is tested regardless of whether the original push
reached it. New possession module, config tunables, and a delay-bounds
unit test. 376 replication tests pass.

Co-Authored-By: Claude Opus 4.8 (1M context)
---
 src/replication/config.rs     |  32 ++++++
 src/replication/fresh.rs      |  13 ++-
 src/replication/mod.rs        |  71 ++++++++++++-
 src/replication/possession.rs | 192 ++++++++++++++++++++++++++++++++++
 4 files changed, 302 insertions(+), 6 deletions(-)
 create mode 100644 src/replication/possession.rs

diff --git a/src/replication/config.rs b/src/replication/config.rs
index 3bfaf3c..7f988be 100644
--- a/src/replication/config.rs
+++ b/src/replication/config.rs
@@ -56,6 +56,38 @@ pub const NEIGHBOR_SYNC_PEER_COUNT: usize = 4;
 /// push never reached it.
 pub const FRESH_REPLICATION_DELIVERY_MAX_RETRIES: u32 = 2;

+const POSSESSION_CHECK_DELAY_MIN_SECS: u64 = 5 * 60;
+const POSSESSION_CHECK_DELAY_MAX_SECS: u64 = 15 * 60;
+const POSSESSION_CHECK_TIMEOUT_SECS: u64 = 15;
+const POSSESSION_CHECK_RETRY_BACKOFF_SECS: u64 = 30;
+
+/// Lower bound of the delay before a fresh-replication possession check runs
+/// (ADR-0003).
+///
+/// The delay lets replication settle so an honest peer still mid-store is not
+/// judged prematurely, and makes the check unpredictable to the peer.
+pub const POSSESSION_CHECK_DELAY_MIN: Duration =
+    Duration::from_secs(POSSESSION_CHECK_DELAY_MIN_SECS);
+
+/// Upper bound of the possession-check delay (ADR-0003).
+pub const POSSESSION_CHECK_DELAY_MAX: Duration =
+    Duration::from_secs(POSSESSION_CHECK_DELAY_MAX_SECS);
+
+/// Per-peer request timeout for a single possession probe (ADR-0003).
+pub const POSSESSION_CHECK_TIMEOUT: Duration = Duration::from_secs(POSSESSION_CHECK_TIMEOUT_SECS);
+
+/// Maximum possession-probe attempts per peer before giving up without a
+/// verdict (ADR-0003).
+///
+/// A peer unreachable at check time is re-attempted under this bounded grace
+/// and is never penalised as absent.
+pub const POSSESSION_CHECK_MAX_ATTEMPTS: u32 = 3;
+
+/// Backoff between possession-probe attempts when a peer yields no verdict
+/// (ADR-0003).
+pub const POSSESSION_CHECK_RETRY_BACKOFF: Duration =
+    Duration::from_secs(POSSESSION_CHECK_RETRY_BACKOFF_SECS);
+
 /// Width used when deciding whether this node may locally store or retain a
 /// chunk.
 #[must_use]
diff --git a/src/replication/fresh.rs b/src/replication/fresh.rs
index 7b17bbd..80e9766 100644
--- a/src/replication/fresh.rs
+++ b/src/replication/fresh.rs
@@ -38,9 +38,10 @@ pub struct FreshWriteEvent {

 /// Execute fresh replication for a newly accepted record.
 ///
-/// Sends fresh offers to close group members and `PaidNotify` to
-/// `PaidCloseGroup`. Both are fire-and-forget (no ack tracking or retry per
-/// Section 6.1, rule 8).
+/// Sends fresh offers to close group members (with bounded delivery retries,
+/// ADR-0003) and `PaidNotify` to `PaidCloseGroup`. Returns the close-group
+/// peers responsible for the key (excluding self) so the caller can schedule
+/// the delayed possession check; `PaidNotify` remains fire-and-forget.
 ///
 /// The `send_semaphore` limits how many outbound chunk transfers can be
 /// in-flight concurrently across the entire replication engine, preventing
@@ -53,7 +54,7 @@ pub async fn replicate_fresh(
     paid_list: &Arc<PaidList>,
     config: &ReplicationConfig,
     send_semaphore: &Arc<Semaphore>,
-) {
+) -> Vec<PeerId> {
     let self_id = *p2p_node.peer_id();

     // Rule 6: Node that validates PoP adds K to PaidForList(self).
@@ -90,7 +91,7 @@ pub async fn replicate_fresh(
             "Failed to encode FreshReplicationOffer for {}",
             hex::encode(key),
         );
-        return;
+        return Vec::new();
     };
     // Share one encoded copy across the per-peer send tasks so a retry only
     // re-materialises the buffer for the (consuming) send call, keeping the
@@ -154,6 +155,8 @@ pub async fn replicate_fresh(
         hex::encode(key),
         target_peers.len()
     );
+
+    target_peers
 }

 /// Send `PaidNotify(K)` to every peer in `PaidCloseGroup(K)` (fire-and-forget).
diff --git a/src/replication/mod.rs b/src/replication/mod.rs
index 0f6394a..d0e6e81 100644
--- a/src/replication/mod.rs
+++ b/src/replication/mod.rs
@@ -23,6 +23,7 @@ pub mod config;
 pub mod fresh;
 pub mod neighbor_sync;
 pub mod paid_list;
+pub mod possession;
 pub mod protocol;
 pub mod pruning;
 pub mod quorum;
@@ -311,6 +312,12 @@ pub struct ReplicationEngine {
     /// When present, `start()` spawns a drainer task that calls
     /// `replicate_fresh` for each event.
     fresh_write_rx: Option>,
+    /// Sender for delayed possession-check events (ADR-0003). The fresh-write
+    /// drainer pushes the responsible close-group peers here after each fresh
+    /// replication; the possession-check scheduler drains the paired receiver.
+    possession_check_tx: mpsc::UnboundedSender<possession::PossessionCheckEvent>,
+    /// Receiver paired with `possession_check_tx`; taken by the scheduler task.
+    possession_check_rx: Option<mpsc::UnboundedReceiver<possession::PossessionCheckEvent>>,
     /// Shutdown token.
     shutdown: CancellationToken,
     /// Background task handles.
@@ -345,6 +352,7 @@ impl ReplicationEngine {

         let initial_neighbors = NeighborSyncState::new_cycle(Vec::new());
         let config = Arc::new(config);
+        let (possession_check_tx, possession_check_rx) = mpsc::unbounded_channel();

         Ok(Self {
             config: Arc::clone(&config),
@@ -373,6 +381,8 @@ impl ReplicationEngine {
             audit_responder_semaphore: Arc::new(Semaphore::new(MAX_CONCURRENT_AUDIT_RESPONSES)),
             audit_responder_inflight: Arc::new(RwLock::new(HashMap::new())),
             fresh_write_rx: Some(fresh_write_rx),
+            possession_check_tx,
+            possession_check_rx: Some(possession_check_rx),
             shutdown,
             task_handles: Vec::new(),
         })
@@ -500,6 +510,7 @@ impl ReplicationEngine {
         self.start_verification_worker();
         self.start_bootstrap_sync(dht_events);
         self.start_fresh_write_drainer();
+        self.start_possession_check_scheduler();

         info!(
             "Replication engine started with {} background tasks",
@@ -594,6 +605,7 @@ impl ReplicationEngine {
         let paid_list = Arc::clone(&self.paid_list);
         let config = Arc::clone(&self.config);
         let send_semaphore = Arc::clone(&self.send_semaphore);
+        let possession_tx = self.possession_check_tx.clone();
         let shutdown = self.shutdown.clone();

         let handle = tokio::spawn(async move {
@@ -602,7 +614,7 @@ impl ReplicationEngine {
                     () = shutdown.cancelled() => break,
                     event = rx.recv() => {
                         let Some(event) = event else { break };
-                        fresh::replicate_fresh(
+                        let peers = fresh::replicate_fresh(
                             &event.key,
                             &event.data,
                             &event.payment_proof,
@@ -612,6 +624,15 @@ impl ReplicationEngine {
                             &send_semaphore,
                         )
                         .await;
+                        // Schedule the delayed possession check (ADR-0003) for
+                        // the responsible close-group peers. A closed receiver
+                        // (engine shutting down) is ignored.
+ if !peers.is_empty() { + let _ = possession_tx.send(possession::PossessionCheckEvent { + key: event.key, + peers, + }); + } } } } @@ -620,6 +641,54 @@ impl ReplicationEngine { self.task_handles.push(handle); } + /// Spawn the possession-check scheduler (ADR-0003). + /// + /// Drains scheduled possession-check events and, for each, waits a + /// randomised 5-15 minute settle delay before probing every responsible + /// peer for actual possession. A peer that lacks the chunk is penalised at + /// `AuditChallenge` severity; an unreachable peer is re-probed under a + /// bounded grace and never penalised. + fn start_possession_check_scheduler(&mut self) { + let Some(mut rx) = self.possession_check_rx.take() else { + return; + }; + let p2p = Arc::clone(&self.p2p_node); + let shutdown = self.shutdown.clone(); + + let handle = tokio::spawn(async move { + loop { + tokio::select! { + () = shutdown.cancelled() => break, + event = rx.recv() => { + let Some(event) = event else { break }; + // Spawn a per-chunk delayed check so the drain loop + // keeps pace with the write rate. Each check sleeps the + // randomised settle delay, then probes every peer. + let p2p = Arc::clone(&p2p); + let shutdown = shutdown.clone(); + tokio::spawn(async move { + let delay = possession::random_delay(); + tokio::select! { + () = shutdown.cancelled() => {} + () = tokio::time::sleep(delay) => { + possession::run_possession_check( + event.key, + event.peers, + &p2p, + &shutdown, + ) + .await; + } + } + }); + } + } + } + debug!("Possession-check scheduler shut down"); + }); + self.task_handles.push(handle); + } + #[allow(clippy::too_many_lines)] fn start_message_handler(&mut self) { let mut p2p_events = self.p2p_node.subscribe_events(); diff --git a/src/replication/possession.rs b/src/replication/possession.rs new file mode 100644 index 0000000..5ccefb7 --- /dev/null +++ b/src/replication/possession.rs @@ -0,0 +1,192 @@ +//! Delayed possession verification for fresh replication (ADR-0003). +//! 
+//! After a node fresh-replicates a chunk, every close-group peer responsible
+//! for it is checked 5-15 minutes later for actual possession. A peer that
+//! holds the chunk earns nothing — storing what it was paid to store is the
+//! baseline expectation, not meritorious; a peer confirmed *not* to hold it is
+//! penalised at `AuditChallenge` severity. Delivery of the original push is
+//! irrelevant: a peer the push never reached is still checked and penalised if
+//! it lacks the chunk. A peer merely unreachable at check time yields no
+//! verdict — it is re-attempted under a bounded grace and never penalised as
+//! absent.
+
+use std::sync::Arc;
+use std::time::Duration;
+
+use rand::Rng;
+use saorsa_core::identity::PeerId;
+use saorsa_core::{P2PNode, TrustEvent};
+use tokio_util::sync::CancellationToken;
+
+use crate::ant_protocol::XorName;
+use crate::logging::{debug, warn};
+use crate::replication::config::{
+    self, POSSESSION_CHECK_DELAY_MAX, POSSESSION_CHECK_DELAY_MIN, POSSESSION_CHECK_MAX_ATTEMPTS,
+    POSSESSION_CHECK_RETRY_BACKOFF, POSSESSION_CHECK_TIMEOUT, REPLICATION_PROTOCOL_ID,
+};
+use crate::replication::protocol::{
+    ReplicationMessage, ReplicationMessageBody, VerificationRequest,
+};
+
+/// A scheduled possession check for one freshly-replicated chunk.
+pub struct PossessionCheckEvent {
+    /// Content-address of the chunk.
+    pub key: XorName,
+    /// Close-group peers responsible for holding it (excludes self).
+    pub peers: Vec<PeerId>,
+}
+
+/// Verdict of probing a single peer for possession of a chunk.
+enum ProbeOutcome {
+    /// Peer confirmed it holds the chunk.
+    Present,
+    /// Peer confirmed it does not hold the chunk.
+    Absent,
+    /// No verdict obtained (timeout / transport error / malformed response).
+    NoVerdict,
+}
+
+/// Pick a randomised delay in `[POSSESSION_CHECK_DELAY_MIN,
+/// POSSESSION_CHECK_DELAY_MAX]` to wait before a possession check runs.
+#[must_use]
+pub fn random_delay() -> Duration {
+    let to_millis = |d: Duration| u64::try_from(d.as_millis()).unwrap_or(u64::MAX);
+    let min = to_millis(POSSESSION_CHECK_DELAY_MIN);
+    let max = to_millis(POSSESSION_CHECK_DELAY_MAX);
+    if min >= max {
+        return POSSESSION_CHECK_DELAY_MIN;
+    }
+    Duration::from_millis(rand::thread_rng().gen_range(min..=max))
+}
+
+/// Run the possession check for one chunk against every responsible peer.
+///
+/// Penalises each peer confirmed absent at `AuditChallenge` severity, leaves
+/// present peers unrewarded, and never penalises a peer that only failed to
+/// yield a verdict.
+pub async fn run_possession_check(
+    key: XorName,
+    peers: Vec<PeerId>,
+    p2p_node: &Arc<P2PNode>,
+    shutdown: &CancellationToken,
+) {
+    let key_hex = hex::encode(key);
+    for peer in peers {
+        if shutdown.is_cancelled() {
+            return;
+        }
+        match probe_with_grace(&key, &peer, p2p_node, shutdown).await {
+            ProbeOutcome::Present => {
+                debug!("Possession check: {peer} holds {key_hex}");
+            }
+            ProbeOutcome::Absent => {
+                warn!(
+                    "Possession check: {peer} is missing {key_hex}; penalising at audit severity"
+                );
+                p2p_node
+                    .report_trust_event(
+                        &peer,
+                        TrustEvent::ApplicationFailure(config::AUDIT_FAILURE_TRUST_WEIGHT),
+                    )
+                    .await;
+            }
+            ProbeOutcome::NoVerdict => {
+                debug!(
+                    "Possession check: no verdict from {peer} for {key_hex} after grace; \
+                     not penalised"
+                );
+            }
+        }
+    }
+}
+
+/// Probe a peer for possession, re-attempting on no-verdict up to the grace
+/// bound. A definite Present/Absent verdict short-circuits immediately.
+async fn probe_with_grace(
+    key: &XorName,
+    peer: &PeerId,
+    p2p_node: &Arc<P2PNode>,
+    shutdown: &CancellationToken,
+) -> ProbeOutcome {
+    for attempt in 1..=POSSESSION_CHECK_MAX_ATTEMPTS {
+        match probe_once(key, peer, p2p_node).await {
+            ProbeOutcome::NoVerdict if attempt < POSSESSION_CHECK_MAX_ATTEMPTS => {
+                tokio::select! {
+                    () = shutdown.cancelled() => return ProbeOutcome::NoVerdict,
+                    () = tokio::time::sleep(POSSESSION_CHECK_RETRY_BACKOFF) => {}
+                }
+            }
+            outcome => return outcome,
+        }
+    }
+    ProbeOutcome::NoVerdict
+}
+
+/// Send one presence-only `VerificationRequest` and interpret the response.
+async fn probe_once(key: &XorName, peer: &PeerId, p2p_node: &Arc<P2PNode>) -> ProbeOutcome {
+    let request = VerificationRequest {
+        keys: vec![*key],
+        // Presence-only: no paid-list status is needed to judge possession.
+        paid_list_check_indices: Vec::new(),
+    };
+    let msg = ReplicationMessage {
+        request_id: rand::random(),
+        body: ReplicationMessageBody::VerificationRequest(request),
+    };
+    let Ok(encoded) = msg.encode() else {
+        warn!(
+            "Failed to encode possession request for {}",
+            hex::encode(key)
+        );
+        return ProbeOutcome::NoVerdict;
+    };
+
+    let response = match p2p_node
+        .send_request(
+            peer,
+            REPLICATION_PROTOCOL_ID,
+            encoded,
+            POSSESSION_CHECK_TIMEOUT,
+        )
+        .await
+    {
+        Ok(response) => response,
+        Err(e) => {
+            debug!("Possession probe to {peer} failed: {e}");
+            return ProbeOutcome::NoVerdict;
+        }
+    };
+
+    let decoded = match ReplicationMessage::decode(&response.data) {
+        Ok(decoded) => decoded,
+        Err(e) => {
+            debug!("Failed to decode possession response from {peer}: {e}");
+            return ProbeOutcome::NoVerdict;
+        }
+    };
+
+    let ReplicationMessageBody::VerificationResponse(resp) = decoded.body else {
+        debug!("Unexpected possession response type from {peer}");
+        return ProbeOutcome::NoVerdict;
+    };
+
+    match resp.results.iter().find(|r| r.key == *key) {
+        Some(result) if result.present => ProbeOutcome::Present,
+        Some(_) => ProbeOutcome::Absent,
+        None => ProbeOutcome::NoVerdict,
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn random_delay_is_within_bounds() {
+        for _ in 0..100 {
+            let d = random_delay();
+            assert!(d >= POSSESSION_CHECK_DELAY_MIN);
+            assert!(d <= POSSESSION_CHECK_DELAY_MAX);
+        }
+    }
+}
From 04341849c8d3a685f29f77877f8d416ed4b3d27d Mon Sep 17
00:00:00 2001
From: Warm Beer
Date: Fri, 26 Jun 2026 16:06:26 +0200
Subject: [PATCH 06/12] test(replication): e2e proof of possession-check penalty

Adds a test-only ReplicationEngine::run_possession_check_now that drives the
ADR-0003 possession check without the 5-15 min scheduler delay, plus an e2e
test on a 10-node testnet: node A probes an absent peer B and a present peer C
over real transport; B's trust score drops (penalised, the signal saorsa-core
eviction acts on) while C's is untouched. Proves the detection+penalty core
deterministically.

Co-Authored-By: Claude Opus 4.8 (1M context)
---
 src/replication/mod.rs   | 11 ++++++
 tests/e2e/replication.rs | 80 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 91 insertions(+)

diff --git a/src/replication/mod.rs b/src/replication/mod.rs
index d0e6e81..64afd60 100644
--- a/src/replication/mod.rs
+++ b/src/replication/mod.rs
@@ -485,6 +485,17 @@ impl ReplicationEngine {
             .await
     }
 
+    /// Test-only: run the possession check immediately for `key` against
+    /// `peers`, bypassing the scheduler's randomised 5-15 minute settle delay.
+    ///
+    /// Penalises any peer that does not hold `key` at `AuditChallenge`
+    /// severity (ADR-0003). Lets e2e tests assert the detection+penalty path
+    /// deterministically without waiting for the scheduled check.
+    #[cfg(any(test, feature = "test-utils"))]
+    pub async fn run_possession_check_now(&self, key: XorName, peers: Vec<PeerId>) {
+        possession::run_possession_check(key, peers, &self.p2p_node, &self.shutdown).await;
+    }
+
     /// Start all background tasks.
     ///
     /// `dht_events` must be subscribed **before** `P2PNode::start()` so that
diff --git a/tests/e2e/replication.rs b/tests/e2e/replication.rs
index eb8ee49..11342a4 100644
--- a/tests/e2e/replication.rs
+++ b/tests/e2e/replication.rs
@@ -252,6 +252,86 @@ async fn test_fresh_replication_propagates_to_close_group() {
     harness.teardown().await.expect("teardown");
 }
 
+/// ADR-0003: the delayed possession check penalises a responsible peer that
+/// does NOT hold the chunk, and leaves a peer that DOES hold it unpenalised.
+///
+/// Drives the check directly (`run_possession_check_now`, bypassing the 5-15
+/// minute settle delay) so the detection+penalty path is asserted
+/// deterministically over real transport. The penalty is observed as a drop in
+/// the checker's trust score for the absent peer (the same signal saorsa-core
+/// eviction acts on), via `P2PNode::peer_trust`.
+#[tokio::test]
+#[serial]
+async fn possession_check_penalises_absent_peer_only() {
+    let harness = TestHarness::setup_small().await.expect("setup");
+    harness.warmup_dht().await.expect("warmup");
+
+    // A is the checker; B will be absent, C will hold the chunk. All three are
+    // regular nodes (idx >= 3) with running replication engines and storage.
+    let a = harness.test_node(3).expect("node a");
+    let b = harness.test_node(4).expect("node b");
+    let c = harness.test_node(5).expect("node c");
+
+    let p2p_a = a.p2p_node.as_ref().expect("p2p a");
+    let engine_a = a.replication_engine.as_ref().expect("engine a");
+    let peer_b = *b.p2p_node.as_ref().expect("p2p b").peer_id();
+    let peer_c = *c.p2p_node.as_ref().expect("p2p c").peer_id();
+
+    let content = b"adr-0003 possession-check payload";
+    let address = compute_address(content);
+
+    // C holds the chunk; B never stores it.
+    c.ant_protocol
+        .as_ref()
+        .expect("proto c")
+        .storage()
+        .put(&address, content)
+        .await
+        .expect("put on c");
+
+    assert!(
+        !b.ant_protocol
+            .as_ref()
+            .expect("proto b")
+            .storage()
+            .exists(&address)
+            .expect("exists b"),
+        "precondition: B must not hold the chunk"
+    );
+    assert!(
+        c.ant_protocol
+            .as_ref()
+            .expect("proto c")
+            .storage()
+            .exists(&address)
+            .expect("exists c"),
+        "precondition: C must hold the chunk"
+    );
+
+    let trust_b_before = p2p_a.peer_trust(&peer_b);
+    let trust_c_before = p2p_a.peer_trust(&peer_c);
+
+    // Probe both peers now (no scheduler delay). B is absent -> penalised; C is
+    // present -> untouched.
+    engine_a
+        .run_possession_check_now(address, vec![peer_b, peer_c])
+        .await;
+
+    let trust_b_after = p2p_a.peer_trust(&peer_b);
+    let trust_c_after = p2p_a.peer_trust(&peer_c);
+
+    assert!(
+        trust_b_after < trust_b_before,
+        "absent peer B must be penalised: {trust_b_before} -> {trust_b_after}"
+    );
+    assert!(
+        trust_c_after >= trust_c_before - f64::EPSILON,
+        "present peer C must not be penalised: {trust_c_before} -> {trust_c_after}"
+    );
+
+    harness.teardown().await.expect("teardown");
+}
+
 /// `PaidForList` persistence (Section 18 #43).
 ///
 /// Insert a key into the `PaidList`, verify it persists by reopening the
From fe9e4447a9fe28bbbd8cf30f946b2cf5737a6d1f Mon Sep 17 00:00:00 2001
From: Warm Beer
Date: Fri, 26 Jun 2026 16:32:49 +0200
Subject: [PATCH 07/12] test(replication): e2e proof of possession-check scheduler wiring

Proves the full replicate_fresh -> enqueue -> delayed scheduler -> penalty
chain. Makes the 5-15 min possession delay a ReplicationConfig field
(POSSESSION_CHECK_DELAY_MIN/MAX as defaults) so tests can shorten it, threads
an optional replication_config override through TestNetworkConfig, and has the
public replicate_fresh entry point schedule the possession check (the
production PUT path already does via the drainer).
The e2e test runs a 10-node testnet with a ~200-500ms delay, triggers fresh replication with no payment cache so close peers reject and are absent, and asserts the scheduled check penalises an absent close peer. 675 lib tests pass. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/replication/config.rs | 9 +++++ src/replication/mod.rs | 18 +++++++-- src/replication/possession.rs | 24 ++++++------ tests/e2e/replication.rs | 74 +++++++++++++++++++++++++++++++++++ tests/e2e/testnet.rs | 8 +++- 5 files changed, 118 insertions(+), 15 deletions(-) diff --git a/src/replication/config.rs b/src/replication/config.rs index 7f988be..98d9259 100644 --- a/src/replication/config.rs +++ b/src/replication/config.rs @@ -469,6 +469,13 @@ pub struct ReplicationConfig { /// Seconds to wait for `DhtNetworkEvent::BootstrapComplete` before /// proceeding with bootstrap sync (covers bootstrap nodes with no peers). pub bootstrap_complete_timeout_secs: u64, + /// Lower bound of the delay before a fresh-replication possession check + /// runs (ADR-0003). Defaults to [`POSSESSION_CHECK_DELAY_MIN`]; tests + /// shorten it so the scheduled check fires quickly. + pub possession_check_delay_min: Duration, + /// Upper bound of the possession-check delay window (ADR-0003). Defaults + /// to [`POSSESSION_CHECK_DELAY_MAX`]. 
+ pub possession_check_delay_max: Duration, } impl Default for ReplicationConfig { @@ -495,6 +502,8 @@ impl Default for ReplicationConfig { verification_request_timeout: VERIFICATION_REQUEST_TIMEOUT, fetch_request_timeout: FETCH_REQUEST_TIMEOUT, bootstrap_complete_timeout_secs: BOOTSTRAP_COMPLETE_TIMEOUT_SECS, + possession_check_delay_min: POSSESSION_CHECK_DELAY_MIN, + possession_check_delay_max: POSSESSION_CHECK_DELAY_MAX, } } } diff --git a/src/replication/mod.rs b/src/replication/mod.rs index 64afd60..3f15d4a 100644 --- a/src/replication/mod.rs +++ b/src/replication/mod.rs @@ -588,9 +588,13 @@ impl ReplicationEngine { self.sync_trigger.notify_one(); } - /// Execute fresh replication for a newly stored record. + /// Execute fresh replication for a newly stored record, then schedule the + /// delayed possession check for the responsible close-group peers + /// (ADR-0003). The production PUT path schedules via the fresh-write + /// drainer; this direct entry point schedules here so callers (and tests) + /// that drive replication directly still get the possession check. pub async fn replicate_fresh(&self, key: &XorName, data: &[u8], proof_of_payment: &[u8]) { - fresh::replicate_fresh( + let peers = fresh::replicate_fresh( key, data, proof_of_payment, @@ -600,6 +604,11 @@ impl ReplicationEngine { &self.send_semaphore, ) .await; + if !peers.is_empty() { + let _ = self + .possession_check_tx + .send(possession::PossessionCheckEvent { key: *key, peers }); + } } // ======================================================================= @@ -664,6 +673,7 @@ impl ReplicationEngine { return; }; let p2p = Arc::clone(&self.p2p_node); + let config = Arc::clone(&self.config); let shutdown = self.shutdown.clone(); let handle = tokio::spawn(async move { @@ -677,8 +687,10 @@ impl ReplicationEngine { // randomised settle delay, then probes every peer. 
let p2p = Arc::clone(&p2p); let shutdown = shutdown.clone(); + let delay_min = config.possession_check_delay_min; + let delay_max = config.possession_check_delay_max; tokio::spawn(async move { - let delay = possession::random_delay(); + let delay = possession::random_delay(delay_min, delay_max); tokio::select! { () = shutdown.cancelled() => {} () = tokio::time::sleep(delay) => { diff --git a/src/replication/possession.rs b/src/replication/possession.rs index 5ccefb7..79b3b6a 100644 --- a/src/replication/possession.rs +++ b/src/replication/possession.rs @@ -21,8 +21,8 @@ use tokio_util::sync::CancellationToken; use crate::ant_protocol::XorName; use crate::logging::{debug, warn}; use crate::replication::config::{ - self, POSSESSION_CHECK_DELAY_MAX, POSSESSION_CHECK_DELAY_MIN, POSSESSION_CHECK_MAX_ATTEMPTS, - POSSESSION_CHECK_RETRY_BACKOFF, POSSESSION_CHECK_TIMEOUT, REPLICATION_PROTOCOL_ID, + self, POSSESSION_CHECK_MAX_ATTEMPTS, POSSESSION_CHECK_RETRY_BACKOFF, POSSESSION_CHECK_TIMEOUT, + REPLICATION_PROTOCOL_ID, }; use crate::replication::protocol::{ ReplicationMessage, ReplicationMessageBody, VerificationRequest, @@ -46,17 +46,18 @@ enum ProbeOutcome { NoVerdict, } -/// Pick a randomised delay in `[POSSESSION_CHECK_DELAY_MIN, -/// POSSESSION_CHECK_DELAY_MAX]` to wait before a possession check runs. +/// Pick a randomised delay in `[min, max]` to wait before a possession check +/// runs. The bounds come from `ReplicationConfig` (defaulting to +/// `POSSESSION_CHECK_DELAY_MIN`/`MAX`) so tests can shorten them. 
#[must_use] -pub fn random_delay() -> Duration { +pub fn random_delay(min: Duration, max: Duration) -> Duration { let to_millis = |d: Duration| u64::try_from(d.as_millis()).unwrap_or(u64::MAX); - let min = to_millis(POSSESSION_CHECK_DELAY_MIN); - let max = to_millis(POSSESSION_CHECK_DELAY_MAX); - if min >= max { - return POSSESSION_CHECK_DELAY_MIN; + let min_ms = to_millis(min); + let max_ms = to_millis(max); + if min_ms >= max_ms { + return min; } - Duration::from_millis(rand::thread_rng().gen_range(min..=max)) + Duration::from_millis(rand::thread_rng().gen_range(min_ms..=max_ms)) } /// Run the possession check for one chunk against every responsible peer. @@ -180,11 +181,12 @@ async fn probe_once(key: &XorName, peer: &PeerId, p2p_node: &Arc) -> Pr #[cfg(test)] mod tests { use super::*; + use crate::replication::config::{POSSESSION_CHECK_DELAY_MAX, POSSESSION_CHECK_DELAY_MIN}; #[test] fn random_delay_is_within_bounds() { for _ in 0..100 { - let d = random_delay(); + let d = random_delay(POSSESSION_CHECK_DELAY_MIN, POSSESSION_CHECK_DELAY_MAX); assert!(d >= POSSESSION_CHECK_DELAY_MIN); assert!(d <= POSSESSION_CHECK_DELAY_MAX); } diff --git a/tests/e2e/replication.rs b/tests/e2e/replication.rs index 11342a4..ca62c95 100644 --- a/tests/e2e/replication.rs +++ b/tests/e2e/replication.rs @@ -5,6 +5,7 @@ #![allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)] +use super::testnet::TestNetworkConfig; use super::TestHarness; use ant_node::client::compute_address; use ant_node::replication::commitment_state::{BuiltCommitment, ResponderCommitmentState}; @@ -332,6 +333,79 @@ async fn possession_check_penalises_absent_peer_only() { harness.teardown().await.expect("teardown"); } +/// ADR-0003: the possession-check *scheduler* (not the direct-drive path) +/// fires after the configured delay and penalises an absent close peer. +/// +/// Uses a shortened possession delay so the scheduled check runs in well under +/// a second. 
No payment cache is pre-populated, so the close-group peers reject +/// the fresh offer and are absent when the scheduled check probes them. Proves +/// the `replicate_fresh` -> enqueue -> delayed scheduler -> penalty wiring. +#[tokio::test] +#[serial] +async fn possession_scheduler_penalises_absent_close_peer_after_delay() { + let mut net_config = TestNetworkConfig::small(); + net_config.replication_config = Some(ReplicationConfig { + possession_check_delay_min: Duration::from_millis(200), + possession_check_delay_max: Duration::from_millis(500), + ..ReplicationConfig::default() + }); + let harness = TestHarness::setup_with_config(net_config) + .await + .expect("setup"); + harness.warmup_dht().await.expect("warmup"); + + let a = harness.test_node(3).expect("node a"); + let p2p_a = a.p2p_node.as_ref().expect("p2p a"); + let engine_a = a.replication_engine.as_ref().expect("engine a"); + let self_a = *p2p_a.peer_id(); + + let content = b"adr-0003 scheduler-wiring payload"; + let address = compute_address(content); + + // A's close group for this key = exactly the peers the scheduled possession + // check targets. With no payment cache anywhere, they reject the fresh offer + // and are absent when probed. + let close_group_size = ReplicationConfig::default().close_group_size; + let close_group: Vec = p2p_a + .dht_manager() + .find_closest_nodes_local_with_self(&address, close_group_size) + .await + .iter() + .filter(|n| n.peer_id != self_a) + .map(|n| n.peer_id) + .collect(); + assert!(!close_group.is_empty(), "expected a non-empty close group"); + + let trust_before: Vec = close_group.iter().map(|p| p2p_a.peer_trust(p)).collect(); + + // Trigger fresh replication; the engine enqueues the possession check, which + // fires ~200-500 ms later and penalises the absent close peers. + let dummy_pop = [0x01u8; 64]; + engine_a + .replicate_fresh(&address, content, &dummy_pop) + .await; + + // Poll until at least one absent close peer is penalised (trust drops). 
+ let deadline = tokio::time::Instant::now() + PROPAGATION_TIMEOUT; + let mut penalised = false; + while tokio::time::Instant::now() < deadline { + penalised = close_group + .iter() + .zip(trust_before.iter()) + .any(|(peer, &before)| p2p_a.peer_trust(peer) < before - f64::EPSILON); + if penalised { + break; + } + tokio::time::sleep(PROPAGATION_POLL_INTERVAL).await; + } + assert!( + penalised, + "the scheduled possession check should have penalised an absent close peer" + ); + + harness.teardown().await.expect("teardown"); +} + /// `PaidForList` persistence (Section 18 #43). /// /// Insert a key into the `PaidList`, verify it persists by reopening the diff --git a/tests/e2e/testnet.rs b/tests/e2e/testnet.rs index 6b5ae84..949aa4d 100644 --- a/tests/e2e/testnet.rs +++ b/tests/e2e/testnet.rs @@ -222,6 +222,11 @@ pub struct TestNetworkConfig { /// this network (e.g. Anvil testnet) for on-chain verification. /// When `None`, defaults to `ArbitrumOne`. pub evm_network: Option, + + /// Optional replication-config override applied to every node's + /// replication engine. `None` uses `ReplicationConfig::default()`. Tests + /// use this to shorten timers — e.g. the ADR-0003 possession-check delay. 
+ pub replication_config: Option, } impl Default for TestNetworkConfig { @@ -255,6 +260,7 @@ impl Default for TestNetworkConfig { enable_node_logging: false, payment_enforcement: false, evm_network: None, + replication_config: None, } } } @@ -1284,7 +1290,7 @@ impl TestNetwork { (&node.p2p_node, &node.ant_protocol, &node.node_identity) { let shutdown = CancellationToken::new(); - let repl_config = ReplicationConfig::default(); + let repl_config = self.config.replication_config.clone().unwrap_or_default(); let (_fresh_tx, fresh_rx) = tokio::sync::mpsc::unbounded_channel(); let node_identity = Arc::clone(id); match ReplicationEngine::new( From fa0048318f6ee4a8abbdbe32f6c1d61819fe4fb6 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Fri, 26 Jun 2026 16:39:37 +0200 Subject: [PATCH 08/12] test(storage): e2e proof of self-closeness gate accept/reject Adds an e2e test on a 25-node testnet: the closest (responsible) node accepts a client PUT (gate must not break normal uploads), while the farthest node (rank >= K_BUCKET_SIZE, outside its own closest view) rejects before payment with a closeness error. Proves both the regression guard and the intended reject behaviour of the ADR-0003 self-closeness gate. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- tests/e2e/replication.rs | 97 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 96 insertions(+), 1 deletion(-) diff --git a/tests/e2e/replication.rs b/tests/e2e/replication.rs index ca62c95..6acc10c 100644 --- a/tests/e2e/replication.rs +++ b/tests/e2e/replication.rs @@ -10,7 +10,7 @@ use super::TestHarness; use ant_node::client::compute_address; use ant_node::replication::commitment_state::{BuiltCommitment, ResponderCommitmentState}; use ant_node::replication::config::{ - storage_admission_width, REPAIR_HINT_MIN_AGE, REPLICATION_PROTOCOL_ID, + storage_admission_width, K_BUCKET_SIZE, REPAIR_HINT_MIN_AGE, REPLICATION_PROTOCOL_ID, }; use ant_node::replication::protocol::{ compute_audit_digest, AuditChallenge, AuditResponse, FetchRequest, FetchResponse, @@ -406,6 +406,101 @@ async fn possession_scheduler_penalises_absent_close_peer_after_delay() { harness.teardown().await.expect("teardown"); } +/// ADR-0003 self-closeness gate: a node accepts a client PUT only when it is +/// within its own local `K_BUCKET_SIZE`-closest to the address. +/// +/// Needs more than `K_BUCKET_SIZE` nodes so a far node falls outside its own +/// closest view. The responsible (closest) node accepts and stores; the +/// non-responsible (farthest) node rejects before payment with a closeness +/// error. Guards both the regression risk (gate must not reject responsible +/// puts) and the intended reject behaviour. +#[tokio::test] +#[serial] +async fn self_closeness_gate_accepts_responsible_rejects_far_node() { + let harness = TestHarness::setup().await.expect("setup"); + harness.warmup_dht().await.expect("warmup"); + + let content = b"adr-0003 self-closeness gate payload"; + let address = compute_address(content); + + // Rank all peers by XOR distance to the address (identical from any node's + // view once routing tables are warm). 
+ let ranker_p2p = harness + .test_node(3) + .expect("ranker") + .p2p_node + .as_ref() + .expect("p2p"); + let ranked: Vec = ranker_p2p + .dht_manager() + .find_closest_nodes_local_with_self(&address, harness.node_count()) + .await + .iter() + .map(|n| n.peer_id) + .collect(); + assert!( + ranked.len() > K_BUCKET_SIZE, + "need > K_BUCKET_SIZE nodes to exercise the reject path; got {}", + ranked.len() + ); + + // Only nodes that actually run a protocol handler can serve a client PUT. + let has_protocol = |peer: &PeerId| { + node_index_for_peer(&harness, peer) + .and_then(|idx| harness.test_node(idx)) + .is_some_and(|n| n.ant_protocol.is_some()) + }; + + // Closest node with a handler -> within its own K-closest -> gate accepts. + let close_peer = ranked + .iter() + .copied() + .find(|p| has_protocol(p)) + .expect("a close node with a handler"); + // Farthest node beyond the gate width with a handler -> gate rejects. + let far_peer = ranked + .iter() + .copied() + .enumerate() + .rev() + .find(|(rank, p)| *rank >= K_BUCKET_SIZE && has_protocol(p)) + .map(|(_, p)| p) + .expect("a far node (rank >= K_BUCKET_SIZE) with a handler"); + + let close_idx = node_index_for_peer(&harness, &close_peer).expect("close idx"); + let far_idx = node_index_for_peer(&harness, &far_peer).expect("far idx"); + + // Accept path: a responsible node stores the chunk (gate passes). + let close_result = harness + .test_node(close_idx) + .expect("close node") + .store_chunk(content) + .await; + assert!( + close_result.is_ok(), + "responsible (closest) node must accept the PUT, got {close_result:?}" + ); + + // Reject path: a non-responsible node rejects before payment with a + // closeness error. 
+ let far_result = harness + .test_node(far_idx) + .expect("far node") + .store_chunk(content) + .await; + assert!( + far_result.is_err(), + "non-responsible (farthest) node must reject the PUT" + ); + let err = format!("{}", far_result.expect_err("far rejection")); + assert!( + err.contains("closest"), + "rejection should cite closeness, got: {err}" + ); + + harness.teardown().await.expect("teardown"); +} + /// `PaidForList` persistence (Section 18 #43). /// /// Insert a key into the `PaidList`, verify it persists by reopening the From 52b6f49fa22c630ffa0b7502ddf5a3e3771314e4 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Fri, 26 Jun 2026 18:18:20 +0200 Subject: [PATCH 09/12] test(replication): prove full close-group nodes are shunned --- tests/e2e/replication.rs | 153 +++++++++++++++++++++++++++++++++++++++ tests/e2e/testnet.rs | 39 +++++++++- 2 files changed, 189 insertions(+), 3 deletions(-) diff --git a/tests/e2e/replication.rs b/tests/e2e/replication.rs index 6acc10c..ab3dc01 100644 --- a/tests/e2e/replication.rs +++ b/tests/e2e/replication.rs @@ -33,6 +33,20 @@ use tokio::sync::RwLock; const PROPAGATION_TIMEOUT: Duration = Duration::from_secs(15); /// Interval between propagation poll checks. const PROPAGATION_POLL_INTERVAL: Duration = Duration::from_millis(200); +/// Checker node used by the full-node shunning regression. +const FULL_NODE_SHUN_CHECKER_INDEX: usize = 3; +/// Target node made disk-full by the full-node shunning regression. +const FULL_NODE_SHUN_TARGET_INDEX: usize = 4; +/// Search budget for finding a key whose close group contains the full node. +const FULL_NODE_SHUN_KEY_SEARCH_ATTEMPTS: usize = 10_000; +/// Fast lower bound for the full-node shunning scheduler check. +const FULL_NODE_SHUN_POSSESSION_DELAY_MIN: Duration = Duration::from_millis(200); +/// Fast upper bound for the full-node shunning scheduler check. 
+const FULL_NODE_SHUN_POSSESSION_DELAY_MAX: Duration = Duration::from_millis(500);
+/// Dummy proof length used when a test only needs to reach pre-payment gates.
+const DUMMY_PAYMENT_PROOF_LEN: usize = 64;
+/// Dummy proof byte used when a test only needs to reach pre-payment gates.
+const DUMMY_PAYMENT_PROOF_BYTE: u8 = 0x01;
 
 /// Send a replication request via saorsa-core's request-response mechanism
 /// and decode the response.
@@ -406,6 +420,145 @@ async fn possession_scheduler_penalises_absent_close_peer_after_delay() {
     harness.teardown().await.expect("teardown");
 }
 
+/// ADR-0003 full-node shunning: a close-group peer that is disk-full rejects a
+/// fresh-replication offer before payment verification, remains absent for the
+/// key, and is penalised when the checker probes possession.
+///
+/// This bridges the two protections that make a full node get shunned by close
+/// groups: capacity rejection creates a missing replica, and the delayed
+/// possession-check verdict turns that absence into the trust signal that
+/// saorsa-core eviction acts on.
+#[tokio::test]
+#[serial]
+async fn full_close_group_node_rejects_replica_and_is_penalised_as_absent() {
+    let mut net_config = TestNetworkConfig::small();
+    net_config.replication_config = Some(ReplicationConfig {
+        possession_check_delay_min: FULL_NODE_SHUN_POSSESSION_DELAY_MIN,
+        possession_check_delay_max: FULL_NODE_SHUN_POSSESSION_DELAY_MAX,
+        ..ReplicationConfig::default()
+    });
+    net_config
+        .storage_disk_reserve_overrides
+        .insert(FULL_NODE_SHUN_TARGET_INDEX, u64::MAX);
+    let harness = TestHarness::setup_with_config(net_config)
+        .await
+        .expect("setup");
+    harness.warmup_dht().await.expect("warmup");
+
+    let checker = harness
+        .test_node(FULL_NODE_SHUN_CHECKER_INDEX)
+        .expect("checker node");
+    let full_node = harness
+        .test_node(FULL_NODE_SHUN_TARGET_INDEX)
+        .expect("full node");
+    let checker_p2p = checker.p2p_node.as_ref().expect("checker p2p");
+    let checker_engine = checker.replication_engine.as_ref().expect("checker engine");
+    let full_p2p = full_node.p2p_node.as_ref().expect("full node p2p");
+    let full_peer = *full_p2p.peer_id();
+
+    let close_group_size = ReplicationConfig::default().close_group_size;
+    let admission_width = storage_admission_width(close_group_size);
+    let mut candidate = None;
+    for attempt in 0..FULL_NODE_SHUN_KEY_SEARCH_ATTEMPTS {
+        let content = format!("adr-0003 full-node shunning payload {attempt}").into_bytes();
+        let address = compute_address(&content);
+        let full_node_in_checker_close_group = checker_p2p
+            .dht_manager()
+            .find_closest_nodes_local_with_self(&address, close_group_size)
+            .await
+            .iter()
+            .any(|node| node.peer_id == full_peer);
+        if !full_node_in_checker_close_group {
+            continue;
+        }
+
+        let full_node_admits_self = full_p2p
+            .dht_manager()
+            .find_closest_nodes_local_with_self(&address, admission_width)
+            .await
+            .iter()
+            .any(|node| node.peer_id == full_peer);
+        if full_node_admits_self {
+            candidate = Some((content, address));
+            break;
+        }
+    }
+    let (content, address) =
+        candidate.expect("find key where full node is a responsible close-group peer");
+
+    for idx in 0..harness.node_count() {
+        if let Some(protocol) = harness
+            .test_node(idx)
+            .and_then(|node| node.ant_protocol.as_ref())
+        {
+            protocol.payment_verifier().cache_insert(address);
+        }
+    }
+
+    let dummy_payment_proof = vec![DUMMY_PAYMENT_PROOF_BYTE; DUMMY_PAYMENT_PROOF_LEN];
+    let offer = FreshReplicationOffer {
+        key: address,
+        data: content.clone(),
+        proof_of_payment: dummy_payment_proof.clone(),
+    };
+    let response = send_replication_request(
+        checker_p2p,
+        &full_peer,
+        ReplicationMessage {
+            request_id: rand::random(),
+            body: ReplicationMessageBody::FreshReplicationOffer(offer),
+        },
+        PROPAGATION_TIMEOUT,
+    )
+    .await;
+
+    match response.body {
+        ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
+            key,
+            reason,
+        }) => {
+            assert_eq!(key, address);
+            assert!(
+                reason.contains("Insufficient disk space"),
+                "expected disk-full rejection, got: {reason}"
+            );
+        }
+        other => panic!("expected disk-full rejection, got: {other:?}"),
+    }
+
+    let full_storage = full_node
+        .ant_protocol
+        .as_ref()
+        .expect("full node protocol")
+        .storage();
+    assert!(
+        !full_storage.exists(&address).expect("exists on full node"),
+        "full node must not store the rejected replica"
+    );
+
+    let trust_before = checker_p2p.peer_trust(&full_peer);
+    checker_engine
+        .replicate_fresh(&address, &content, &dummy_payment_proof)
+        .await;
+
+    let deadline = tokio::time::Instant::now() + PROPAGATION_TIMEOUT;
+    let mut trust_after = trust_before;
+    while tokio::time::Instant::now() < deadline {
+        trust_after = checker_p2p.peer_trust(&full_peer);
+        if trust_after < trust_before - f64::EPSILON {
+            break;
+        }
+        tokio::time::sleep(PROPAGATION_POLL_INTERVAL).await;
+    }
+    assert!(
+        trust_after < trust_before - f64::EPSILON,
+        "full close-group peer should be shunned by the scheduled possession check: \
+         {trust_before} -> {trust_after}"
+    );
+
+    harness.teardown().await.expect("teardown");
+}
+
 /// ADR-0003 self-closeness gate: a node accepts a client PUT only when it is
 /// within its own local `K_BUCKET_SIZE`-closest to the address.
 ///
diff --git a/tests/e2e/testnet.rs b/tests/e2e/testnet.rs
index 949aa4d..3eb19a8 100644
--- a/tests/e2e/testnet.rs
+++ b/tests/e2e/testnet.rs
@@ -35,6 +35,7 @@ use saorsa_core::{
     identity::NodeIdentity, IPDiversityConfig as CoreDiversityConfig, MultiAddr,
     NodeConfig as CoreNodeConfig, P2PEvent, P2PNode,
 };
+use std::collections::HashMap;
 use std::net::{Ipv4Addr, SocketAddr};
 use std::path::PathBuf;
 use std::sync::Arc;
@@ -227,6 +228,12 @@ pub struct TestNetworkConfig {
     /// replication engine. `None` uses `ReplicationConfig::default()`. Tests
     /// use this to shorten timers — e.g. the ADR-0003 possession-check delay.
     pub replication_config: Option<ReplicationConfig>,
+
+    /// Optional per-node storage disk-reserve overrides.
+    ///
+    /// Tests can set a node's reserve above available disk space to make its
+    /// capacity pre-check fail deterministically without filling the host disk.
+ pub storage_disk_reserve_overrides: HashMap, } impl Default for TestNetworkConfig { @@ -261,6 +268,7 @@ impl Default for TestNetworkConfig { payment_enforcement: false, evm_network: None, replication_config: None, + storage_disk_reserve_overrides: HashMap::new(), } } } @@ -1046,9 +1054,19 @@ impl TestNetwork { })?); // Initialize AntProtocol for this node with payment enforcement setting - let ant_protocol = - Self::create_ant_protocol(&data_dir, self.config.evm_network.clone(), &identity) - .await?; + let storage_disk_reserve = self + .config + .storage_disk_reserve_overrides + .get(&index) + .copied() + .unwrap_or_default(); + let ant_protocol = Self::create_ant_protocol_with_disk_reserve( + &data_dir, + self.config.evm_network.clone(), + storage_disk_reserve, + &identity, + ) + .await?; Ok(TestNode { index, @@ -1086,10 +1104,25 @@ impl TestNetwork { data_dir: &std::path::Path, evm_network: Option, identity: &saorsa_core::identity::NodeIdentity, + ) -> Result { + Self::create_ant_protocol_with_disk_reserve(data_dir, evm_network, 0, identity).await + } + + /// Create an `AntProtocol` handler with an explicit storage disk reserve. + /// + /// # Errors + /// + /// Returns an error if LMDB storage initialisation fails. + pub async fn create_ant_protocol_with_disk_reserve( + data_dir: &std::path::Path, + evm_network: Option, + disk_reserve: u64, + identity: &saorsa_core::identity::NodeIdentity, ) -> Result { // Create LMDB storage let storage_config = LmdbStorageConfig { root_dir: data_dir.to_path_buf(), + disk_reserve, ..LmdbStorageConfig::test_default() }; let storage = LmdbStorage::new(storage_config) From 8639fdabacfb1350ecf25ee7ca43c5cd3a2e7cfd Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Fri, 26 Jun 2026 20:07:49 +0200 Subject: [PATCH 10/12] feat(replication): widen fresh-replication accept admission during convergence MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Amends ADR-0003. 
A healthy replica whose routing table still lists closer full nodes it hasn't evicted yet ranks itself outside the narrow storage_admission_width window and would wrongly reject a fresh-replication offer it should accept — stalling convergence, and inconsistent with the wider K-window it already accepts client PUTs within. Widen the fresh-offer accept gate to config.paid_list_close_group_size (= K_BUCKET_SIZE = 20). Safe: the sender still fans out only to close_group_size, so this adds no stores; retention/pruning stays at storage_admission_width, so steady-state replication is unchanged and any transient over-coverage is reclaimed once the close group converges (the multi-day prune hysteresis spans the minutes-long eviction window). Sender fan-out deliberately not widened — the repair path heals that case. 675 lib tests + fresh-replication & full-close-group-shunning e2e tests pass. Co-Authored-By: Claude Opus 4.8 (1M context) --- ...R-0003-full-node-detection-and-eviction.md | 46 ++++++++++++++++++- src/replication/mod.rs | 12 +++-- 2 files changed, 54 insertions(+), 4 deletions(-) diff --git a/docs/adr/ADR-0003-full-node-detection-and-eviction.md b/docs/adr/ADR-0003-full-node-detection-and-eviction.md index 9a8aa55..2da2a5d 100644 --- a/docs/adr/ADR-0003-full-node-detection-and-eviction.md +++ b/docs/adr/ADR-0003-full-node-detection-and-eviction.md @@ -141,6 +141,46 @@ mis-attributing the resulting replication failures to honest nodes. strangles the fallback it is meant to coexist with. Express both against the same width and choose them together. +### Fresh-replication admission during convergence + +A healthy node that *should* hold a chunk can transiently fail to recognise its own +responsibility: while full nodes closer to the key still sit in its routing table +(not yet detected and evicted), they push it past the narrow +`storage_admission_width` (close group + small margin) in its *own* view. 
With that +narrow window it would then **reject** a fresh-replication offer it ought to accept — +so the replication path that is meant to heal coverage instead refuses it, stalling +convergence. This is also inconsistent: the same node already accepts a **client +PUT** for that key within the wider `K_BUCKET_SIZE` window (both the payment +issuer-closeness check and the self-closeness gate above use it), so a key could land +on the node directly from a client but not via replication. + +**Decision:** widen the **fresh-replication accept** admission to the K-wide +paid-close-group neighbourhood (`paid_list_close_group_size`, equal to +`K_BUCKET_SIZE` = 20) — the same window client PUTs use — so transient view-skew +from un-evicted full nodes no longer causes spurious rejection. A node accepts a +fresh offer when it is within its own local `paid_list_close_group_size`-closest to +the key. Two properties keep this safe: + +- **No extra storage from the accept side.** The sender still fans out only to its + `close_group_size` targets, so widening the *receiver's* accept window adds no + stores — it only stops those targets from rejecting due to view-skew. +- **Retention stays narrow.** Long-term retention/pruning keeps using + `storage_admission_width`, so steady-state replication is still ≈ K. Any transient + over-coverage is reclaimed once the close group converges (full nodes evicted → the + node's view tightens → it sits correctly inside or outside the narrow window), and + the multi-day prune hysteresis comfortably spans the minutes-long eviction window, + so a legitimate replica is never pruned mid-convergence. 
+ +**Sender side (deliberately not widened).** The mirror case — the *sender's* own +close group being mostly full, so its `close_group_size` offers never reach the +healthy nodes ranked beyond the full ones — is left to the existing convergence loop: +the possession check evicts the full nodes, the close group tightens to healthy +members, and the responsible-chunk repair (neighbour sync) fetches the chunk to the +nodes that become responsible. Widening the sender fan-out to `K_BUCKET_SIZE` was +considered and rejected — it would triple fresh-replication fan-out on *every* write +and hold large transient over-replication for the full prune-hysteresis window, for a +case the repair path already heals. + ## Consequences ### Positive @@ -205,7 +245,11 @@ How we will know this decision remains correct: present/absent; the adaptive timeout grace tracks widespread timeouts but never deterministic failures; the node emits into saorsa-core such that an evicted-for-fullness node can re-enter after it frees capacity (integration); the - self-closeness gate width is ≥ the fallback ceiling. + self-closeness gate width is ≥ the fallback ceiling; a healthy node with un-evicted + full nodes ahead of it in its routing table (so it ranks outside + `storage_admission_width` but within `K_BUCKET_SIZE`) still **accepts** a + fresh-replication offer for the key, while retention/pruning stays scoped to + `storage_admission_width`. - **Re-open triggers:** revisit thresholds if false positives appear; revisit the near-capacity degradation if the network approaches global capacity. diff --git a/src/replication/mod.rs b/src/replication/mod.rs index 3f15d4a..fb9f6e1 100644 --- a/src/replication/mod.rs +++ b/src/replication/mod.rs @@ -2074,13 +2074,19 @@ async fn handle_fresh_offer( return Ok(()); } - // Rule 7: check storage admission. Fresh chunk receivers accept the close - // group plus a small margin to absorb local routing-table disagreement. + // Rule 7: check storage admission. 
Fresh chunk receivers accept across the + // paid-close-group neighbourhood (`paid_list_close_group_size`, = K_BUCKET_SIZE, + // the same width client PUTs use), not just the close group plus a small + // margin (ADR-0003). During full-node shunning a healthy replica's routing + // table may still list closer full nodes it hasn't evicted yet, ranking it + // outside the narrow window in its own view; the wider accept window absorbs + // that transient skew so the chunk still lands. Retention (pruning) stays at + // `storage_admission_width`, so steady-state replication is unchanged. if !admission::is_responsible( &self_id, &offer.key, p2p_node, - storage_admission_width(config.close_group_size), + config.paid_list_close_group_size, ) .await { From b2c83d35c4937a81caea6fb99c5a4fdba1a03977 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Mon, 29 Jun 2026 17:42:31 +0200 Subject: [PATCH 11/12] fix(replication)!: restore audit challenge timeout penalties Prune-confirmation and fresh-replication possession audits now use AuditChallenge-style failure semantics for timeouts and bad proofs, while subtree audits keep their separate timeout grace behavior. Possession checks also share the bootstrap-claim grace and abuse tracking used by responsible and prune audits. BREAKING CHANGE: AuditChallenge-style prune and possession audit timeouts are penalized immediately instead of using timeout grace. 
--- src/replication/config.rs | 88 +--- src/replication/mod.rs | 852 +++++++--------------------------- src/replication/possession.rs | 483 ++++++++++++++++--- src/replication/protocol.rs | 10 +- src/replication/pruning.rs | 21 +- 5 files changed, 594 insertions(+), 860 deletions(-) diff --git a/src/replication/config.rs b/src/replication/config.rs index 98d9259..c762cbd 100644 --- a/src/replication/config.rs +++ b/src/replication/config.rs @@ -58,8 +58,6 @@ pub const FRESH_REPLICATION_DELIVERY_MAX_RETRIES: u32 = 2; const POSSESSION_CHECK_DELAY_MIN_SECS: u64 = 5 * 60; const POSSESSION_CHECK_DELAY_MAX_SECS: u64 = 15 * 60; -const POSSESSION_CHECK_TIMEOUT_SECS: u64 = 15; -const POSSESSION_CHECK_RETRY_BACKOFF_SECS: u64 = 30; /// Lower bound of the delay before a fresh-replication possession check runs /// (ADR-0003). @@ -73,20 +71,9 @@ pub const POSSESSION_CHECK_DELAY_MIN: Duration = pub const POSSESSION_CHECK_DELAY_MAX: Duration = Duration::from_secs(POSSESSION_CHECK_DELAY_MAX_SECS); -/// Per-peer request timeout for a single possession probe (ADR-0003). -pub const POSSESSION_CHECK_TIMEOUT: Duration = Duration::from_secs(POSSESSION_CHECK_TIMEOUT_SECS); - -/// Maximum possession-probe attempts per peer before giving up without a -/// verdict (ADR-0003). -/// -/// A peer unreachable at check time is re-attempted under this bounded grace -/// and is never penalised as absent. -pub const POSSESSION_CHECK_MAX_ATTEMPTS: u32 = 3; - -/// Backoff between possession-probe attempts when a peer yields no verdict -/// (ADR-0003). -pub const POSSESSION_CHECK_RETRY_BACKOFF: Duration = - Duration::from_secs(POSSESSION_CHECK_RETRY_BACKOFF_SECS); +// The possession probe reuses the `AuditChallenge` wire and the bandwidth- +// calibrated `audit_response_timeout(1)` deadline, so it needs no bespoke +// per-probe timeout or retry constants. /// Width used when deciding whether this node may locally store or retain a /// chunk. 
@@ -149,8 +136,9 @@ pub const MAX_CONCURRENT_REPLICATION_SENDS: usize = 3; /// their disk reads don't stall replication. This caps how many run at once /// across the engine, restoring backpressure: a peer flooding audit challenges /// cannot fan out unbounded `get_raw` reads or multi-MiB byte serves. When the -/// cap is hit, the challenge is dropped — the auditor graces a non-response as a -/// timeout, so honest auditors are unaffected and only a flooder is throttled. +/// cap is hit, the challenge is dropped and the caller's audit-specific timeout +/// policy applies. The cap must therefore stay high enough for honest audit +/// traffic while still throttling flooders. /// Sized to cover a handful of concurrent honest auditors (the per-peer /// gossip-audit cooldown is 30 min, so genuine concurrent audits are few) while /// bounding the byte round's worst-case resident bytes @@ -161,9 +149,10 @@ pub const MAX_CONCURRENT_AUDIT_RESPONSES: usize = 16; /// /// The global [`MAX_CONCURRENT_AUDIT_RESPONSES`] ceiling alone is not /// flood-fair: one peer spamming challenges could occupy every slot and starve -/// honest auditors (whose dropped challenges convert to timeouts → strikes on -/// the honest peers). This per-peer cap guarantees no source holds more than -/// its share, so a flood self-throttles. Audits are cooldown-gated (one +/// honest auditors (whose dropped challenges convert to audit failures or +/// timeout verdicts on the challenged peers). This per-peer cap guarantees no +/// source holds more than its share, so a flood self-throttles. Audits are +/// cooldown-gated (one /// gossip-triggered audit per peer per 30 min), so 2 in-flight per peer /// comfortably covers the legitimate round-1 + round-2 overlap. pub const MAX_AUDIT_RESPONSES_PER_PEER: u32 = 2; @@ -318,23 +307,6 @@ const _: () = assert!( "wire cap must fit at least one max-size chunk per byte-challenge response" ); -/// Rollout gate for timeout-driven eviction. 
-/// -/// When `false`, a peer that crosses the consecutive-timeout strike threshold -/// is logged but NOT reported to the trust engine (no eviction). This PR is a -/// breaking wire change (old nodes cannot decode the new `StorageCommitment` -/// gossip), so a not-yet-upgraded peer times out on every new audit and looks -/// exactly like a non-storing peer; penalising timeouts during the mixed-version -/// window would make upgraded nodes evict every old node — a death spiral. -/// -/// Confirmed storage-integrity failures (`DigestMismatch`/`KeyAbsent`/ -/// `Rejected`/`MalformedResponse`) are NEVER gated by this — those only come -/// from a peer that actually answered with bad data, never an old node. Flip to -/// `true` in a small follow-up release once the fleet has upgraded. This is a -/// real `const` (not commented-out code) so both gate sites compile and stay in -/// sync, and the flip is one line. -pub const TIMEOUT_EVICTION_ENABLED: bool = false; - /// Verification request timeout (per-batch). const VERIFICATION_REQUEST_TIMEOUT_SECS: u64 = 15; /// Verification request timeout (per-batch). @@ -354,28 +326,6 @@ pub const PENDING_VERIFY_MAX_AGE: Duration = Duration::from_secs(PENDING_VERIFY_ /// Trust event weight for confirmed audit failures. pub const AUDIT_FAILURE_TRUST_WEIGHT: f64 = 5.0; -/// Consecutive audit *timeouts* a peer may accumulate before a timeout is -/// reported as an `ApplicationFailure` trust event. -/// -/// The audit response timeout is an economic deterrent calibrated for -/// residential bandwidth, not a hard cryptographic bound: a single slow -/// response is routine for an honest node under transient load (GC pause, -/// disk flush, a burst of concurrent requests). Penalizing on the first -/// timeout false-positives those nodes. 
-/// -/// Requiring `N` *consecutive* timeouts before penalizing removes that -/// false-positive while preserving the deterrent against a peer that does not -/// actually store the data and must fetch it at audit time: such a peer is -/// slow on *every* audit and accumulates a fresh strike each tick until it -/// crosses the threshold, whereas an honest node answers normally between rare -/// slow ticks and any success resets its strike counter to zero (see -/// `handle_audit_result`). The discriminator is *persistence* of slowness -/// versus *transience*. This deliberately does not widen the per-challenge -/// window. Applies ONLY to `AuditFailureReason::Timeout`; confirmed -/// storage-integrity failures (`DigestMismatch` / `KeyAbsent` / `Rejected` / -/// `MalformedResponse`) remain instantly punishable. -pub const AUDIT_TIMEOUT_STRIKE_THRESHOLD: u32 = 3; - /// Probability of launching a subtree audit when a peer's *changed* commitment /// is ingested via gossip (ADR-0002). Keeps audits occasional surprise exams. pub const AUDIT_ON_GOSSIP_PROBABILITY: f64 = 0.2; @@ -642,12 +592,10 @@ impl ReplicationConfig { /// A relay attacker on a residential link (~5-12 MB/s) who must /// fetch the same `k × 4 MiB` over the network sees ~10-100× higher /// latency than disk for the data alone, plus per-chunk round-trips, - /// and misses the budget — recording a timeout strike (per - /// `handle_audit_timeout` → `handle_audit_failure`). After - /// [`AUDIT_TIMEOUT_STRIKE_THRESHOLD`] consecutive timeouts this would - /// fire an `application_failure` trust event — but note that report is - /// currently suppressed for the breaking rollout (grep - /// TIMEOUT-EVICTION-DISABLED); the strike accounting still runs. + /// and misses the budget. In the periodic responsible-chunk + /// `AuditChallenge`, prune-confirmation, and ADR-0003 possession-check paths + /// that timeout is an immediate audit failure. The heavier subtree audit + /// still graces timeouts separately. 
/// /// This is an economic deterrent for the §7 relay limit calibrated /// for residential bandwidth, NOT a hard bound: a relay on a @@ -789,14 +737,6 @@ mod tests { assert!((AUDIT_FAILURE_TRUST_WEIGHT - 5.0).abs() <= f64::EPSILON); } - #[test] - fn audit_timeout_strike_threshold_is_three() { - // Smallest threshold that tolerates back-to-back transient slowness - // while still penalizing a persistently-slow non-storing peer within a - // few audit ticks. - assert_eq!(AUDIT_TIMEOUT_STRIKE_THRESHOLD, 3); - } - #[test] fn replication_protocol_id_is_v2() { // The v12 storage-bound audit changes replication SEMANTICS. The diff --git a/src/replication/mod.rs b/src/replication/mod.rs index fb9f6e1..80776cd 100644 --- a/src/replication/mod.rs +++ b/src/replication/mod.rs @@ -216,18 +216,6 @@ pub struct ReplicationEngine { /// are lightweight (`PeerSyncRecord` is two fields) and peer IDs are /// naturally bounded by the routing table's k-bucket capacity. sync_history: Arc>>, - /// Per-peer consecutive audit-timeout strike counter. - /// - /// A timeout increments the peer's strike count; a successful audit - /// response resets it to zero. Only when a peer reaches - /// [`config::AUDIT_TIMEOUT_STRIKE_THRESHOLD`] consecutive timeouts is a - /// timeout reported as an `ApplicationFailure` trust event. This separates - /// honest transient slowness (resets on the next normal response) from a - /// peer that does not store the data and is slow on every audit. Lives - /// outside `NeighborSyncState` so it is never wiped by a neighbor-sync - /// cycle reset. Grows with peer churn like `sync_history`; entries are a - /// single `u32` and peer IDs are bounded by k-bucket capacity. - audit_timeout_strikes: Arc>>, /// Per-peer cooldown for gossip-triggered subtree audits (ADR-0002). /// /// Records when each peer was last audited so a burst of gossiped @@ -303,7 +291,7 @@ pub struct ReplicationEngine { /// [`MAX_AUDIT_RESPONSES_PER_PEER`]. 
The GLOBAL semaphore alone is not /// flood-fair: one peer spamming challenges could occupy every slot and /// starve honest auditors, whose dropped challenges then convert to - /// timeouts and record strikes on the HONEST peers (codex-r2 A). This + /// audit timeouts against HONEST peers (codex-r2 A). This /// per-peer cap guarantees no single source can hold more than its share, /// so a flood self-throttles without denying service to everyone else. audit_responder_inflight: Arc>>, @@ -363,7 +351,6 @@ impl ReplicationEngine { queues: Arc::new(RwLock::new(ReplicationQueues::new())), sync_state: Arc::new(RwLock::new(initial_neighbors)), sync_history: Arc::new(RwLock::new(HashMap::new())), - audit_timeout_strikes: Arc::new(RwLock::new(HashMap::new())), audit_on_gossip_cooldown: Arc::new(RwLock::new(HashMap::new())), sync_cycle_epoch: Arc::new(RwLock::new(0)), repair_proofs: Arc::new(RwLock::new(RepairProofs::new())), @@ -493,7 +480,16 @@ impl ReplicationEngine { /// deterministically without waiting for the scheduled check. #[cfg(any(test, feature = "test-utils"))] pub async fn run_possession_check_now(&self, key: XorName, peers: Vec) { - possession::run_possession_check(key, peers, &self.p2p_node, &self.shutdown).await; + possession::run_possession_check( + key, + peers, + &self.p2p_node, + &self.storage, + &self.config, + &self.sync_state, + &self.shutdown, + ) + .await; } /// Start all background tasks. @@ -665,15 +661,17 @@ impl ReplicationEngine { /// /// Drains scheduled possession-check events and, for each, waits a /// randomised 5-15 minute settle delay before probing every responsible - /// peer for actual possession. A peer that lacks the chunk is penalised at - /// `AuditChallenge` severity; an unreachable peer is re-probed under a - /// bounded grace and never penalised. + /// peer for actual possession. A peer that fails to cryptographically prove + /// possession, including by timing out, is penalised at `AuditChallenge` + /// severity.
fn start_possession_check_scheduler(&mut self) { let Some(mut rx) = self.possession_check_rx.take() else { return; }; let p2p = Arc::clone(&self.p2p_node); + let storage = Arc::clone(&self.storage); let config = Arc::clone(&self.config); + let sync_state = Arc::clone(&self.sync_state); let shutdown = self.shutdown.clone(); let handle = tokio::spawn(async move { @@ -686,6 +684,9 @@ impl ReplicationEngine { // keeps pace with the write rate. Each check sleeps the // randomised settle delay, then probes every peer. let p2p = Arc::clone(&p2p); + let storage = Arc::clone(&storage); + let config = Arc::clone(&config); + let sync_state = Arc::clone(&sync_state); let shutdown = shutdown.clone(); let delay_min = config.possession_check_delay_min; let delay_max = config.possession_check_delay_max; @@ -698,6 +699,9 @@ impl ReplicationEngine { event.key, event.peers, &p2p, + &storage, + &config, + &sync_state, &shutdown, ) .await; @@ -734,7 +738,6 @@ impl ReplicationEngine { let ever_capable_peers = Arc::clone(&self.ever_capable_peers); let recent_provers = Arc::clone(&self.recent_provers); let sig_verify_attempts = Arc::clone(&self.sig_verify_attempts); - let audit_timeout_strikes = Arc::clone(&self.audit_timeout_strikes); let audit_on_gossip_cooldown = Arc::clone(&self.audit_on_gossip_cooldown); let sync_state = Arc::clone(&self.sync_state); let audit_responder_semaphore = Arc::clone(&self.audit_responder_semaphore); @@ -747,7 +750,6 @@ impl ReplicationEngine { config: Arc::clone(&config), recent_provers: Arc::clone(&recent_provers), sync_state: Arc::clone(&sync_state), - audit_timeout_strikes: Arc::clone(&audit_timeout_strikes), cooldown: Arc::clone(&audit_on_gossip_cooldown), }; @@ -869,10 +871,6 @@ impl ReplicationEngine { last_commitment_by_peer.write().await.remove(&peer_id); recent_provers.write().await.forget_peer(&peer_id); sig_verify_attempts.write().await.remove(&peer_id); - // Drop the timeout-strike entry too, so a - // departed peer leaves no residual (keeps 
this - // map bounded under churn, like its siblings). - audit_timeout_strikes.write().await.remove(&peer_id); // Same for the gossip-audit cooldown (ADR-0002). audit_on_gossip_cooldown.write().await.remove(&peer_id); // The sticky `commitment_capable` flag is @@ -919,7 +917,6 @@ impl ReplicationEngine { config: Arc::clone(&config), recent_provers: Arc::clone(&self.recent_provers), sync_state: Arc::clone(&sync_state), - audit_timeout_strikes: Arc::clone(&self.audit_timeout_strikes), cooldown: Arc::clone(&self.audit_on_gossip_cooldown), }; @@ -1002,12 +999,6 @@ impl ReplicationEngine { let bootstrap_state = Arc::clone(&self.bootstrap_state); let is_bootstrapping = Arc::clone(&self.is_bootstrapping); let sync_state = Arc::clone(&self.sync_state); - // Needed so the responsible-chunk audit routes failures through the same - // strike/grace path as the storage-commitment audit (timeouts graced, - // not penalised on the first occurrence) and can revoke holder credit on - // a confirmed failure. - let recent_provers = Arc::clone(&self.recent_provers); - let audit_timeout_strikes = Arc::clone(&self.audit_timeout_strikes); let handle = tokio::spawn(async move { // Invariant 19: wait for bootstrap to drain before starting audits. @@ -1041,15 +1032,7 @@ impl ReplicationEngine { ) .await }; - handle_audit_result( - &result, - &p2p, - &sync_state, - &recent_provers, - &audit_timeout_strikes, - &config, - ) - .await; + handle_audit_result(&result, &p2p, &sync_state, &config).await; } // Then run periodically. @@ -1073,15 +1056,7 @@ impl ReplicationEngine { ) .await }; - handle_audit_result( - &result, - &p2p, - &sync_state, - &recent_provers, - &audit_timeout_strikes, - &config, - ) - .await; + handle_audit_result(&result, &p2p, &sync_state, &config).await; } } } @@ -1623,9 +1598,9 @@ impl Drop for AuditResponderGuard { /// Try to admit one audit-responder task for `source`: take a global permit AND /// a per-peer slot (both bounded). 
Returns `None` (caller drops the challenge, -/// which the auditor graces as a timeout) if either ceiling is hit, so one -/// flooder can neither exhaust the global pool's effect on others nor exceed -/// its own per-peer share (codex-r2 A). +/// leaving the remote auditor to apply that audit path's timeout policy) if +/// either ceiling is hit, so one flooder can neither exhaust the global pool's +/// effect on others nor exceed its own per-peer share (codex-r2 A). async fn admit_audit_responder( semaphore: &Arc, inflight: &Arc>>, @@ -1795,8 +1770,9 @@ async fn handle_replication_message( // block all other replication traffic until its digests complete // (head-of-line blocking). The same flood-fair admission applies: a // global ceiling AND a per-peer cap, dropping the challenge if either - // is hit (an honest auditor graces a non-response as a timeout, while - // a flooder is held to its per-peer share and cannot starve others). + // is hit. Responsible/prune audit timeouts are penalised by the + // caller, so the caps must remain high enough for honest audit load; + // the per-peer share still prevents one flooder from starving others. let Some(guard) = admit_audit_responder(audit_responder_semaphore, audit_responder_inflight, source) .await @@ -1840,10 +1816,10 @@ async fn handle_replication_message( // // A bounded, flood-fair admission restores backpressure (codex#1 + // codex-r2 A): a global ceiling AND a per-peer cap. If either is hit - // we drop this challenge — the auditor graces a non-response as a - // timeout, so an honest auditor is unaffected and only a flooder is - // throttled (and it cannot starve other peers, since its share is - // capped per-peer). + // we drop this challenge. Subtree auditors grace timeout + // non-responses, so capacity drops throttle flooders without turning + // into trust penalties (and one source cannot starve other peers, + // since its share is capped per-peer). 
info!( "Audit challenge received: kind=subtree source={source} request_response={}", rr_message_id.is_some(), @@ -3784,94 +3760,45 @@ fn first_failed_key_label(confirmed_failed_keys: &[XorName]) -> String { ) } -/// Execute the side effects for a confirmed storage-commitment audit failure. +/// Execute the side effects for a subtree storage-commitment audit failure. /// -/// [`plan_failed_audit`] is the pure decision INCLUDING the strike selection -/// (record-a-strike-for-`Timeout` vs leave-untouched for confirmed failures), -/// extracted so the whole glue — not just the verdict — is testable without a -/// live `P2PNode`. This function is only the resulting I/O. Timeouts are graced -/// and rollout-gated (TIMEOUT-EVICTION-DISABLED); confirmed failures penalize on -/// the first occurrence and revoke holder credit. -async fn handle_failed_audit( +/// Subtree timeouts are fully graced: the multi-round, multi-chunk challenge can +/// legitimately time out on slow or loaded honest peers, so it never touches the +/// responsible-chunk audit path or its timeout accounting. Confirmed subtree +/// failures still penalise immediately and revoke holder credit. +async fn handle_subtree_failed_audit( challenged_peer: &PeerId, confirmed_failed_key_count: usize, reason: &AuditFailureReason, p2p_node: &Arc, sync_state: &Arc>, recent_provers: &Arc>, - audit_timeout_strikes: &Arc>>, ) { - let action = { - let mut strikes = audit_timeout_strikes.write().await; - plan_failed_audit(reason, &mut strikes, challenged_peer) - }; - match action { - AuditFailureAction::TimeoutGrace => { - // Honest transient slowness: no penalty, no credit loss, retain the - // bootstrap claim. Only *sustained* timeouts (a peer that always - // has to refetch) survive to the threshold — the per-challenge - // window is never widened. 
- debug!( - "Audit timeout for {challenged_peer} (under the {}-strike threshold); \ - within grace, retaining bootstrap claim, no penalty", - config::AUDIT_TIMEOUT_STRIKE_THRESHOLD - ); - } - AuditFailureAction::TimeoutPenalize => { - // Strikes are tracked/logged so the mechanism stays observable; the - // trust report that drives eviction is gated behind - // `TIMEOUT_EVICTION_ENABLED` (off this release — see its doc for the - // rollout-death-spiral rationale). Confirmed storage-integrity - // failures (ConfirmedPenalize below) are unaffected. - warn!( - "Audit timeout for {challenged_peer}: reached the {}-strike threshold of \ - consecutive timeouts ({})", - config::AUDIT_TIMEOUT_STRIKE_THRESHOLD, - if config::TIMEOUT_EVICTION_ENABLED { - "penalizing" - } else { - "eviction disabled this release — not penalizing" - } - ); - if config::TIMEOUT_EVICTION_ENABLED { - p2p_node - .report_trust_event( - challenged_peer, - TrustEvent::ApplicationFailure(config::AUDIT_FAILURE_TRUST_WEIGHT), - ) - .await; - } - } - AuditFailureAction::ConfirmedPenalize => { - // The caller (handle_subtree_audit_result) already logged the rich - // failure line with reason + per-category summary; avoid a redundant - // second error log here. `confirmed_failed_key_count` is retained in - // the signature for callers/tests that assert on it. - let _ = confirmed_failed_key_count; - // Peer returned a non-bootstrap response — clear the active claim - // while retaining claim history. - { - let mut state = sync_state.write().await; - state.clear_active_bootstrap_claim(challenged_peer); - } - // Revoke holder credit on a CONFIRMED failure (DigestMismatch / - // KeyAbsent / Rejected / MalformedResponse): the peer no longer - // provably holds what it committed to, so it must not keep §6 - // holder credit for the proof TTL. The §5 `forget_commitment` path - // only fires on an "unknown commitment hash" reply; genuine byte - // loss surfaces here. 
- { - let mut provers_guard = recent_provers.write().await; - apply_audit_failure_credit_revocation(&mut provers_guard, challenged_peer, reason); - } - p2p_node - .report_trust_event( - challenged_peer, - TrustEvent::ApplicationFailure(config::AUDIT_FAILURE_TRUST_WEIGHT), - ) - .await; - } + if matches!(reason, AuditFailureReason::Timeout) { + debug!( + "Audit timeout for {challenged_peer} fully graced \ + (subtree audit does not evict on timeout)" + ); + return; + } + + // The caller already logged the rich failure line with reason + per-category + // summary; avoid a redundant second error log here. + let _ = confirmed_failed_key_count; + { + let mut state = sync_state.write().await; + state.clear_active_bootstrap_claim(challenged_peer); + } + { + let mut provers_guard = recent_provers.write().await; + apply_audit_failure_credit_revocation(&mut provers_guard, challenged_peer, reason); } + p2p_node + .report_trust_event( + challenged_peer, + TrustEvent::ApplicationFailure(config::AUDIT_FAILURE_TRUST_WEIGHT), + ) + .await; } /// Handle audit result: log findings and emit trust events. @@ -3880,7 +3807,6 @@ async fn handle_subtree_audit_result( p2p_node: &Arc, sync_state: &Arc>, recent_provers: &Arc>, - audit_timeout_strikes: &Arc>>, config: &ReplicationConfig, ) { match result { @@ -3895,14 +3821,6 @@ async fn handle_subtree_audit_result( let mut state = sync_state.write().await; state.clear_active_bootstrap_claim(challenged_peer); } - // A normal response proves the slowness (if any) was transient, so - // reset the timeout-strike counter. Only *sustained* timeouts (a - // peer that must refetch on every audit) survive this reset to - // accumulate toward the penalty threshold. 
- { - let mut strikes = audit_timeout_strikes.write().await; - strikes.remove(challenged_peer); - } p2p_node .report_trust_event( challenged_peer, @@ -3929,20 +3847,16 @@ async fn handle_subtree_audit_result( summary.absent_keys, summary.digest_mismatch_keys, ); - // Route the side effects through the strike-grace path: timeouts - // are graced (and rollout-gated by TIMEOUT-EVICTION-DISABLED), - // deterministic failures penalize on the first occurrence and - // revoke holder credit. Do NOT report ApplicationFailure inline - // here — that would evict honest not-yet-upgraded peers on a - // single timeout during the breaking rollout. - handle_failed_audit( + // Route the side effects through the subtree-only failure path. + // Responsible-chunk `AuditChallenge` handling intentionally uses + // its own old immediate-penalty handler below. + handle_subtree_failed_audit( challenged_peer, confirmed_failed_keys.len(), reason, p2p_node, sync_state, recent_provers, - audit_timeout_strikes, ) .await; } @@ -3996,11 +3910,9 @@ async fn handle_subtree_audit_result( /// bootstrap claim. A `Timeout` does not (the peer may still be legitimately /// bootstrapping); every confirmed storage-integrity reason does. /// -/// Both audits now funnel through [`handle_failed_audit`], which derives the -/// clear-vs-retain decision from [`decide_audit_failure_action`]; this predicate -/// is retained as the readable single-source-of-truth that those tests assert -/// against (it is the exact `reason != Timeout` rule the action planner uses). -#[cfg(test)] +/// Responsible-chunk `AuditChallenge` failures use this directly: timeouts keep +/// the bootstrap claim but are still reported as audit failures, matching the +/// pre-ADR-0002 behaviour. 
fn audit_failure_clears_bootstrap_claim(reason: &AuditFailureReason) -> bool { !matches!(reason, AuditFailureReason::Timeout) } @@ -4008,162 +3920,106 @@ fn audit_failure_clears_bootstrap_claim(reason: &AuditFailureReason) -> bool { /// Handle the result of a responsible-chunk audit tick (audit #2): emit trust /// events and manage bootstrap-claim state. /// -/// Delegates to [`handle_subtree_audit_result`] so BOTH audits share one -/// failure path: timeouts go through the strike/grace logic (graced under the -/// threshold, eviction gated off this release via `TIMEOUT-EVICTION-DISABLED`) -/// and only confirmed storage-integrity failures penalise on the first -/// occurrence and revoke holder credit. Previously this handler reported -/// `ApplicationFailure` inline for EVERY failure including `Timeout`, which — -/// with the breaking v2 wire change — would false-penalise honest -/// not-yet-upgraded peers on a single audit. (Audit #2 cannot credit holders, -/// so the shared handler's strike-reset/credit-revocation is a superset of what -/// it needs; the responsible-chunk audit never produces `Passed { .. }` with -/// holder credit, so nothing is over-credited.) +/// This is intentionally separate from the subtree audit result handler. A +/// responsible-chunk `AuditChallenge` `Failed` result reports +/// `ApplicationFailure` immediately for every reason, including `Timeout`, +/// restoring the pre-ADR-0002 behaviour. async fn handle_audit_result( result: &AuditTickResult, p2p_node: &Arc<P2PNode>, sync_state: &Arc<RwLock<NeighborSyncState>>, - recent_provers: &Arc<RwLock<RecentProvers>>, - audit_timeout_strikes: &Arc<RwLock<HashMap<PeerId, u32>>>, config: &ReplicationConfig, ) { - handle_subtree_audit_result( - result, - p2p_node, - sync_state, - recent_provers, - audit_timeout_strikes, - config, - ) - .await; -} - -/// What the audit-failure handler should do for a given failure, given the -/// peer's post-increment timeout-strike count. Pure (no I/O) so the whole -/// decision can be exercised end-to-end without a live `P2PNode`.
-#[derive(Debug, Clone, Copy, PartialEq, Eq)] -enum AuditFailureAction { - /// Timeout under the strike threshold: no trust penalty, no credit - /// revocation, retain the bootstrap claim (honest transient slowness). - TimeoutGrace, - /// Timeout at/over the threshold: report `ApplicationFailure`. Bootstrap - /// claim retained; holder credit NOT revoked (the peer never admitted byte - /// loss). The non-storing-peer case. - TimeoutPenalize, - /// Confirmed storage-integrity failure: penalize immediately, clear the - /// active bootstrap claim, and revoke holder credit. - ConfirmedPenalize, -} - -/// Upper bound on a peer's consecutive-timeout strike count. Must exceed the -/// largest reachable adaptive threshold (base + `MAX_ADAPTIVE_TIMEOUT_GRACE`) so -/// a genuinely non-responsive peer's count can always catch up to and cross an -/// inflated threshold — otherwise capping at the base would make timeout -/// penalties unreachable once the adaptive threshold rose. -const AUDIT_TIMEOUT_STRIKE_MAX: u32 = 64; - -/// Maximum extra grace the adaptive mechanism may add on top of the base -/// threshold. Bounds how far a (possibly stale) set of timing-out peers can -/// widen the window, so a small persistent failing cohort cannot push the -/// threshold arbitrarily high and shield a bad node indefinitely. -const MAX_ADAPTIVE_TIMEOUT_GRACE: u32 = 2 * config::AUDIT_TIMEOUT_STRIKE_THRESHOLD; - -/// Record an audit timeout for `peer` and return its new consecutive-timeout -/// strike count, saturating at [`AUDIT_TIMEOUT_STRIKE_MAX`] (well above any -/// reachable adaptive threshold). A successful audit removes the peer's entry -/// (the `Passed` arm of [`handle_subtree_audit_result`]), so only *consecutive* -/// timeouts accumulate here. 
-fn record_audit_timeout_strike(strikes: &mut HashMap<PeerId, u32>, peer: &PeerId) -> u32 { - let count = strikes.entry(*peer).or_insert(0); - *count = count.saturating_add(1).min(AUDIT_TIMEOUT_STRIKE_MAX); - *count -} - -/// The adaptive timeout-strike threshold for judging `peer` (ADR-0002 "Network -/// Resilience"): `min(median of the OTHER timing-out peers' counts, -/// MAX_ADAPTIVE_TIMEOUT_GRACE) + base threshold`. -/// -/// In a healthy network almost no peer carries timeout strikes, so the median -/// is 0 and the threshold is the base [`config::AUDIT_TIMEOUT_STRIKE_THRESHOLD`]. -/// During genuine disruption many *honest* peers time out together, lifting the -/// median and widening the grace so the audit system does not pile onto a -/// struggling network — but the widening is capped at `MAX_ADAPTIVE_TIMEOUT_GRACE` -/// so a stale failing cohort cannot inflate it without bound. -/// -/// `peer` is EXCLUDED from the median so a lone timing-out peer cannot raise its -/// own grace bar. Combined with the map being fed ONLY by timeouts (deterministic -/// failures never touch it), this closes self-inflation and bounds -/// attacker-inflation of the grace window. -fn adaptive_timeout_threshold(strikes: &HashMap<PeerId, u32>, peer: &PeerId) -> u32 { - let grace = median_timeout_strikes_excluding(strikes, peer).min(MAX_ADAPTIVE_TIMEOUT_GRACE); - grace.saturating_add(config::AUDIT_TIMEOUT_STRIKE_THRESHOLD) -} - -/// Lower median of the current per-peer consecutive-timeout counts, excluding -/// `peer`. No other peers → 0. -fn median_timeout_strikes_excluding(strikes: &HashMap<PeerId, u32>, peer: &PeerId) -> u32 { - let mut counts: Vec<u32> = strikes - .iter() - .filter(|(p, _)| *p != peer) - .map(|(_, c)| *c) - .collect(); - if counts.is_empty() { - return 0; - } - counts.sort_unstable(); - // Lower median: for even-sized inputs take the lower of the two middle - // values ((len-1)/2), so the grace is conservative rather than inflated.
- counts.get((counts.len() - 1) / 2).copied().unwrap_or(0) -} - -/// Whether a peer's consecutive-timeout strike count reaches the (adaptive) -/// threshold for emitting an `ApplicationFailure` trust event. -fn timeout_strike_reaches_threshold(strikes: u32, threshold: u32) -> bool { - strikes >= threshold -} - -/// Decide what to do about a confirmed audit failure. `timeout_strikes_after` -/// is the peer's strike count after recording this event and `timeout_threshold` -/// the adaptive threshold to compare against (both only meaningful when -/// `reason == Timeout`). Pure, so the integration-level decision can be asserted -/// in tests with no networking. -fn decide_audit_failure_action( - reason: &AuditFailureReason, - timeout_strikes_after: u32, - timeout_threshold: u32, -) -> AuditFailureAction { - if matches!(reason, AuditFailureReason::Timeout) { - if timeout_strike_reaches_threshold(timeout_strikes_after, timeout_threshold) { - AuditFailureAction::TimeoutPenalize - } else { - AuditFailureAction::TimeoutGrace + match result { + AuditTickResult::Passed { + challenged_peer, + keys_checked, + } => { + debug!("Audit passed for {challenged_peer} ({keys_checked} keys)"); + { + let mut state = sync_state.write().await; + state.clear_active_bootstrap_claim(challenged_peer); + } + p2p_node + .report_trust_event( + challenged_peer, + TrustEvent::ApplicationSuccess(REPLICATION_TRUST_WEIGHT), + ) + .await; } - } else { - AuditFailureAction::ConfirmedPenalize + AuditTickResult::Failed { evidence } => { + if let FailureEvidence::AuditFailure { + challenged_peer, + confirmed_failed_keys, + summary, + reason, + .. 
+ } = evidence + { + let first_failed_key = first_failed_key_label(confirmed_failed_keys); + error!( + "Audit failure for {challenged_peer}: reason={reason:?}, confirmed_failed_keys={}, challenged_keys={}, absent_keys={}, digest_mismatch_keys={}, first_failed_key={first_failed_key}", + confirmed_failed_keys.len(), + summary.challenged_keys, + summary.absent_keys, + summary.digest_mismatch_keys, + ); + if audit_failure_clears_bootstrap_claim(reason) { + let mut state = sync_state.write().await; + state.clear_active_bootstrap_claim(challenged_peer); + } else { + debug!("Audit timeout for {challenged_peer}; retaining active bootstrap claim"); + } + p2p_node + .report_trust_event( + challenged_peer, + TrustEvent::ApplicationFailure(config::AUDIT_FAILURE_TRUST_WEIGHT), + ) + .await; + } + } + AuditTickResult::BootstrapClaim { peer } => { + let should_report = { + let now = Instant::now(); + let mut state = sync_state.write().await; + match state.observe_bootstrap_claim(*peer, now, config.bootstrap_claim_grace_period) + { + BootstrapClaimObservation::WithinGrace { .. 
} => { + debug!("Audit: peer {peer} claims bootstrapping (within grace period)"); + false + } + BootstrapClaimObservation::PastGrace { first_seen } => { + warn!( + "Audit: peer {peer} claiming bootstrap past grace period \ + ({:?} > {:?}), reporting abuse", + now.duration_since(first_seen), + config.bootstrap_claim_grace_period, + ); + true + } + BootstrapClaimObservation::Repeated { first_seen } => { + warn!( + "Audit: peer {peer} repeated bootstrap claim after previously \ + stopping; first claim was {:?} ago, reporting abuse", + now.duration_since(first_seen), + ); + true + } + } + }; + if should_report { + p2p_node + .report_trust_event( + peer, + TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT), + ) + .await; + } + } + AuditTickResult::Idle | AuditTickResult::InsufficientKeys => {} } } -/// Plan the response to a confirmed audit failure, performing the -/// strike-selection glue in-process: a `Timeout` records a strike against -/// `peer` (so consecutive timeouts accumulate) and is judged against the -/// adaptive threshold; every other reason is a confirmed failure that does NOT -/// touch the strike map. The caller owns the lock and performs the resulting I/O. -fn plan_failed_audit( - reason: &AuditFailureReason, - strikes: &mut HashMap<PeerId, u32>, - peer: &PeerId, -) -> AuditFailureAction { - // Snapshot the adaptive threshold from the *other* peers' counts (excluding - // this peer), so a single peer's own timeouts cannot raise its own grace bar. - let threshold = adaptive_timeout_threshold(strikes, peer); - let strikes_after = if matches!(reason, AuditFailureReason::Timeout) { - record_audit_timeout_strike(strikes, peer) - } else { - 0 - }; - decide_audit_failure_action(reason, strikes_after, threshold) -} - /// Whether a confirmed audit failure with this reason should revoke the /// peer's `recent_provers` holder credit immediately (v12 §6).
/// @@ -4207,7 +4063,6 @@ struct GossipAuditTrigger { config: Arc, recent_provers: Arc>, sync_state: Arc>, - audit_timeout_strikes: Arc>>, cooldown: Arc>>, } @@ -4318,7 +4173,6 @@ async fn maybe_trigger_gossip_audit( &trigger.p2p_node, &trigger.sync_state, &trigger.recent_provers, - &trigger.audit_timeout_strikes, &trigger.config, ) .await; @@ -4770,12 +4624,9 @@ async fn rebuild_and_rotate_commitment( #[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)] mod tests { use super::{ - adaptive_timeout_threshold, apply_audit_failure_credit_revocation, - audit_failure_clears_bootstrap_claim, audit_failure_revokes_holder_credit, - audit_launch_decision, config, cooldown_allows_audit, decide_audit_failure_action, - first_failed_key_label, fresh_offer_payment_context, median_timeout_strikes_excluding, - paid_notify_payment_context, plan_failed_audit, record_audit_timeout_strike, - timeout_strike_reaches_threshold, AuditFailureAction, AUDIT_TIMEOUT_STRIKE_MAX, + apply_audit_failure_credit_revocation, audit_failure_clears_bootstrap_claim, + audit_failure_revokes_holder_credit, audit_launch_decision, config, cooldown_allows_audit, + first_failed_key_label, fresh_offer_payment_context, paid_notify_payment_context, }; use crate::payment::VerificationContext; use crate::replication::recent_provers::RecentProvers; @@ -4826,197 +4677,6 @@ mod tests { PeerId::from_bytes(bytes) } - // HELPER-LEVEL: counter arithmetic + threshold predicate. The reset is - // simulated by an in-test `strikes.remove`; the real reset path (the - // `Passed` arm) is covered at the glue level below. 
- #[test] - fn single_timeout_then_success_emits_no_failure_and_resets() { - let peer = strike_peer(1); - let mut strikes: HashMap<PeerId, u32> = HashMap::new(); - let base = config::AUDIT_TIMEOUT_STRIKE_THRESHOLD; - let after_one = record_audit_timeout_strike(&mut strikes, &peer); - assert_eq!(after_one, 1); - assert!(!timeout_strike_reaches_threshold(after_one, base)); - strikes.remove(&peer); - assert!(!strikes.contains_key(&peer)); - } - - #[test] - fn consecutive_timeouts_cross_threshold_at_n() { - let peer = strike_peer(2); - let mut strikes: HashMap<PeerId, u32> = HashMap::new(); - let n = config::AUDIT_TIMEOUT_STRIKE_THRESHOLD; - let mut last = 0; - for i in 1..=n { - last = record_audit_timeout_strike(&mut strikes, &peer); - if i < n { - assert!(!timeout_strike_reaches_threshold(last, n)); - } - } - assert!(timeout_strike_reaches_threshold(last, n)); - // The count keeps climbing past the base threshold (so it can also - // cross a higher *adaptive* threshold), but is bounded by the strike - // cap — no unbounded growth. - let mut c = last; - for _ in 0..200 { - c = record_audit_timeout_strike(&mut strikes, &peer); - } - assert_eq!( - c, - super::AUDIT_TIMEOUT_STRIKE_MAX, - "count saturates at the max cap" - ); - assert!(c > n, "count must be able to exceed the base threshold"); - } - - // ADR-0002 Network Resilience: adaptive timeout threshold. - - #[test] - fn median_timeout_strikes_basics() { - let target = strike_peer(99); - let mut strikes: HashMap<PeerId, u32> = HashMap::new(); - // No other peers → 0 (healthy network, threshold == base). - assert_eq!(median_timeout_strikes_excluding(&strikes, &target), 0); - strikes.insert(strike_peer(1), 1); - strikes.insert(strike_peer(2), 3); - strikes.insert(strike_peer(3), 5); - // Sorted [1,3,5], lower-median index 1 → 3. - assert_eq!(median_timeout_strikes_excluding(&strikes, &target), 3); - } - - // ADVERSARIAL (ADR point e + sybil-inflation bound). Two invariants the - // existing suite leaves unpinned: - // 1.
EVEN-count inputs must take the LOWER of the two middle values. The - // existing basics test only feeds an odd-length cohort, so an - // implementation that used `len/2` (upper median) would still pass it. - // Here [1,4] -> lower median 1 (not 4) and [2,4,6,8] -> 4 (not 6). - // 2. A sybil cohort pinned at the *strike cap* (the most an attacker could - // ever drive fabricated peers to) STILL cannot push the grace past - // MAX_ADAPTIVE_TIMEOUT_GRACE: the threshold saturates at base + max - // grace regardless of how high or how numerous the cohort is. - // FLIPS IF: median switches to the upper element on even input, or the - // grace clamp (`.min(MAX_ADAPTIVE_TIMEOUT_GRACE)`) is removed. - #[test] - fn even_count_takes_lower_median_and_sybil_cohort_cannot_exceed_grace_bound() { - let target = strike_peer(150); - - // Even count == 2: lower of [1, 4] is 1. - let mut two: HashMap<PeerId, u32> = HashMap::new(); - two.insert(strike_peer(1), 1); - two.insert(strike_peer(2), 4); - assert_eq!( - median_timeout_strikes_excluding(&two, &target), - 1, - "even-count median must take the LOWER middle value (1), not the upper (4)" - ); - - // Even count == 4: sorted [2,4,6,8], lower median index (4-1)/2 = 1 → 4. - let mut four: HashMap<PeerId, u32> = HashMap::new(); - for (i, v) in (10u8..).zip([2u32, 4, 6, 8]) { - four.insert(strike_peer(i), v); - } - assert_eq!( - median_timeout_strikes_excluding(&four, &target), - 4, - "even-count median must be the lower middle (4), not the upper (6)" - ); - - // Sybil cohort pinned at the strike CAP — the strongest inflation an - // attacker could mount — must not lift the threshold past base + max - // grace. Try several cohort sizes (odd and even) to be sure.
- for cohort in [2u8, 5, 8, 20] { - let mut strikes: HashMap<PeerId, u32> = HashMap::new(); - for i in 0..cohort { - strikes.insert(strike_peer(50 + i), super::AUDIT_TIMEOUT_STRIKE_MAX); - } - let threshold = adaptive_timeout_threshold(&strikes, &target); - assert_eq!( - threshold, - config::AUDIT_TIMEOUT_STRIKE_THRESHOLD + super::MAX_ADAPTIVE_TIMEOUT_GRACE, - "a sybil cohort at the strike cap (size {cohort}) must saturate the grace at \ - the bound, never exceed it" - ); - } - - // And even at the bounded-but-inflated threshold, a genuinely - // non-responsive target can still cross it (cap > max reachable - // threshold), so the bound never shields a bad node forever. - let mut strikes: HashMap<PeerId, u32> = HashMap::new(); - for i in 0..8u8 { - strikes.insert(strike_peer(80 + i), super::AUDIT_TIMEOUT_STRIKE_MAX); - } - let threshold = adaptive_timeout_threshold(&strikes, &target); - let mut c = 0; - for _ in 0..(threshold + 5) { - c = record_audit_timeout_strike(&mut strikes, &target); - } - assert!( - timeout_strike_reaches_threshold(c, threshold), - "target must still cross the bounded inflated threshold ({c} vs {threshold})" - ); - } - - #[test] - fn lone_timing_out_peer_does_not_inflate_its_own_grace() { - // The peer under judgement is excluded from the median, so a single bad - // peer (the common case) is judged against the base threshold and caught - // — it cannot raise its own bar as its strike count climbs. - let bad = strike_peer(7); - let mut strikes: HashMap<PeerId, u32> = HashMap::new(); - strikes.insert(bad, 5); // its own large count must not count - assert_eq!( - adaptive_timeout_threshold(&strikes, &bad), - config::AUDIT_TIMEOUT_STRIKE_THRESHOLD - ); - } - - #[test] - fn widespread_timeouts_widen_the_grace() { - // Genuine disruption: many OTHER honest peers carry timeout strikes. The - // median rises, so the threshold for any given peer widens beyond the - // base — the audit system does not pile onto a struggling network.
- let target = strike_peer(100); - let mut strikes: HashMap<PeerId, u32> = HashMap::new(); - for i in 0..9u8 { - strikes.insert(strike_peer(i), 4); - } - assert_eq!( - adaptive_timeout_threshold(&strikes, &target), - 4 + config::AUDIT_TIMEOUT_STRIKE_THRESHOLD - ); - assert!( - adaptive_timeout_threshold(&strikes, &target) > config::AUDIT_TIMEOUT_STRIKE_THRESHOLD - ); - } - - #[test] - fn adaptive_grace_only_responds_to_timeouts_not_deterministic_failures() { - // The strike map is fed ONLY by timeouts (plan_failed_audit records a - // strike for Timeout and never for confirmed failures). So a flood of - // deterministic failures cannot inflate the median to buy grace. - let target = strike_peer(101); - let mut strikes: HashMap<PeerId, u32> = HashMap::new(); - // Many confirmed (non-timeout) failures: these must NOT touch the map. - for i in 0..9u8 { - let action = plan_failed_audit( - &AuditFailureReason::DigestMismatch, - &mut strikes, - &strike_peer(i), - ); - assert_eq!(action, AuditFailureAction::ConfirmedPenalize); - } - assert!( - strikes.is_empty(), - "deterministic failures must not record strikes" - ); - // Threshold stays at the base — an attacker cannot widen grace by - // failing audits on purpose. - assert_eq!( - adaptive_timeout_threshold(&strikes, &target), - config::AUDIT_TIMEOUT_STRIKE_THRESHOLD - ); - } - // ADR-0002: "occasional surprise exams, keeps load low" — the per-peer // cooldown must collapse a gossip flood into at most one audit per window. @@ -5130,35 +4790,6 @@ mod tests { } } - #[test] - fn inflated_adaptive_threshold_is_still_reachable_and_bounded() { - // When the median lifts the threshold above the base, a genuinely - // non-responsive peer's strike count must still be able to - // reach it (the count is no longer capped at the base). And the grace - // widening itself is bounded so it can't shield a bad node forever. - let target = strike_peer(200); - let mut strikes: HashMap<PeerId, u32> = HashMap::new(); - // A cohort of other peers each at a high strike count.
- for i in 0..9u8 { - strikes.insert(strike_peer(i), 10); - } - let threshold = adaptive_timeout_threshold(&strikes, &target); - // Grace is capped, so the threshold cannot exceed base + max grace. - assert!( - threshold <= config::AUDIT_TIMEOUT_STRIKE_THRESHOLD + super::MAX_ADAPTIVE_TIMEOUT_GRACE - ); - assert!(threshold > config::AUDIT_TIMEOUT_STRIKE_THRESHOLD); - // The target peer can accumulate strikes past that inflated threshold. - let mut c = 0; - for _ in 0..threshold + 5 { - c = record_audit_timeout_strike(&mut strikes, &target); - } - assert!( - timeout_strike_reaches_threshold(c, threshold), - "a persistent peer must be able to cross the inflated threshold ({c} vs {threshold})" - ); - } - #[test] fn audit_on_gossip_constants_match_adr() { // Tripwire on the ADR-locked tunables. The spot-check count sits at the @@ -5181,159 +4812,6 @@ mod tests { )); } - // E2E (pure decision): an honest peer that times out once, recovers, - // repeatedly, never reaches a penalty because each success resets strikes. - // FLIPS IF: the strike threshold is removed or success stops resetting. - #[test] - fn e2e_honest_intermittent_timeouts_never_penalized() { - let peer = strike_peer(10); - let base = config::AUDIT_TIMEOUT_STRIKE_THRESHOLD; - let mut strikes: HashMap<PeerId, u32> = HashMap::new(); - for _ in 0..10 { - let after = record_audit_timeout_strike(&mut strikes, &peer); - assert_eq!( - decide_audit_failure_action(&AuditFailureReason::Timeout, after, base), - AuditFailureAction::TimeoutGrace - ); - strikes.remove(&peer); - } - assert!(!strikes.contains_key(&peer)); - } - - // E2E: a peer that times out on EVERY audit (never reset) crosses the - // threshold and is penalized — the deterrent against non-storing peers. - // FLIPS IF: per-challenge window widened so it answers in time, or strikes - // reset without a success.
- #[test] - fn e2e_persistent_timeouts_get_penalized() { - let peer = strike_peer(11); - let mut strikes: HashMap<PeerId, u32> = HashMap::new(); - let threshold = config::AUDIT_TIMEOUT_STRIKE_THRESHOLD; - let mut penalized_at = None; - for tick in 1..=(threshold + 2) { - let after = record_audit_timeout_strike(&mut strikes, &peer); - if decide_audit_failure_action(&AuditFailureReason::Timeout, after, threshold) - == AuditFailureAction::TimeoutPenalize - && penalized_at.is_none() - { - penalized_at = Some(tick); - } - } - assert_eq!(penalized_at, Some(threshold)); - } - - // Glue: a Timeout through the real plan_failed_audit MUST record a strike on - // the map AND penalize once enough accumulate. - // FLIPS IF: the handler stops feeding Timeout through the strike counter - // (e.g. strikes_after hard-coded to 0). (Mutation-verified.) - #[test] - fn e2e_glue_timeout_records_strike_and_penalizes_at_threshold() { - let peer = strike_peer(20); - let mut strikes: HashMap<PeerId, u32> = HashMap::new(); - let threshold = config::AUDIT_TIMEOUT_STRIKE_THRESHOLD; - let mut action = AuditFailureAction::TimeoutGrace; - for tick in 1..=threshold { - action = plan_failed_audit(&AuditFailureReason::Timeout, &mut strikes, &peer); - assert_eq!(strikes.get(&peer).copied(), Some(tick)); - } - assert_eq!(action, AuditFailureAction::TimeoutPenalize); - } - - // Glue: a confirmed failure through plan_failed_audit must NOT touch the - // strike map and must return ConfirmedPenalize.
- #[test] - fn e2e_glue_confirmed_failure_leaves_strike_map_untouched() { - let peer = strike_peer(21); - let mut strikes: HashMap<PeerId, u32> = HashMap::new(); - for reason in [ - AuditFailureReason::DigestMismatch, - AuditFailureReason::KeyAbsent, - AuditFailureReason::Rejected, - AuditFailureReason::MalformedResponse, - ] { - assert_eq!( - plan_failed_audit(&reason, &mut strikes, &peer), - AuditFailureAction::ConfirmedPenalize - ); - } - assert!(strikes.is_empty()); - } - - // ADR-0002 "Accounting and False Positives", adversarial: a DETERMINISTIC - // failure is acted on the FIRST time it occurs, "regardless of network - // conditions". Here the strike map is pre-loaded with many *other* peers - // timing out, which inflates the adaptive timeout grace to its cap — the - // most forgiving the network ever gets. Under that maximally-relaxed - // window: - // - a brand-new peer's FIRST deterministic failure (DigestMismatch / - // Rejected / MalformedResponse) STILL returns ConfirmedPenalize, never - // a grace lane, and never touches the strike map; while - // - that same peer's FIRST timeout is only TimeoutGrace. - // This proves the inflated grace is the timeout-only lane and can NEVER be - // weaponized to buy a deterministic failure even one round of delay. - // FLIPS IF: deterministic failures start consulting the strike threshold, - // or ConfirmedPenalize is collapsed into a timeout action. - #[test] - fn deterministic_failure_penalizes_first_time_under_inflated_grace() { - let mut strikes: HashMap<PeerId, u32> = HashMap::new(); - // Saturate the adaptive grace: many other peers each carrying a high - // consecutive-timeout count, so the median (and thus the grace) is - // pushed to its MAX cap for any newly-judged peer. - for b in 100..150u8 { - let other = strike_peer(b); - for _ in 0..AUDIT_TIMEOUT_STRIKE_MAX { - record_audit_timeout_strike(&mut strikes, &other); - } - } - let victim = strike_peer(7); - // Sanity: the grace seen by the victim is genuinely inflated above base.
- let inflated = adaptive_timeout_threshold(&strikes, &victim); - assert!( - inflated > config::AUDIT_TIMEOUT_STRIKE_THRESHOLD, - "test precondition: grace must be inflated, got {inflated}" - ); - - // First deterministic failure of each kind -> ConfirmedPenalize on - // occurrence #1, and the victim is never inserted into the strike map. - for reason in [ - AuditFailureReason::DigestMismatch, - AuditFailureReason::Rejected, - AuditFailureReason::MalformedResponse, - ] { - let action = plan_failed_audit(&reason, &mut strikes, &victim); - assert_eq!( - action, - AuditFailureAction::ConfirmedPenalize, - "{reason:?} must penalize on the first occurrence regardless of grace" - ); - assert_ne!( - action, - AuditFailureAction::TimeoutPenalize, - "a deterministic failure must NOT be routed through the (eviction-gated) \ - timeout-penalize lane" - ); - assert!( - !strikes.contains_key(&victim), - "deterministic failure must not touch the timeout strike map" - ); - // And it always revokes holder credit / clears the claim. - assert!(audit_failure_revokes_holder_credit(&reason)); - assert!(audit_failure_clears_bootstrap_claim(&reason)); - } - - // The SAME victim's first timeout, under the same inflated grace, is - // only TimeoutGrace (no penalty, no revocation, claim retained). - let timeout_action = plan_failed_audit(&AuditFailureReason::Timeout, &mut strikes, &victim); - assert_eq!(timeout_action, AuditFailureAction::TimeoutGrace); - assert_eq!(strikes.get(&victim).copied(), Some(1)); - assert!(!audit_failure_revokes_holder_credit( - &AuditFailureReason::Timeout - )); - assert!(!audit_failure_clears_bootstrap_claim( - &AuditFailureReason::Timeout - )); - } - /// The exact decision the `Failed` arm of `handle_subtree_audit_result` /// uses: confirmed failures revoke credit, `Timeout` does not. 
#[test] @@ -5433,7 +4911,7 @@ mod tests { #[test] fn first_failed_key_label_falls_back_when_empty() { - // Should never happen in production (handle_audit_failure rejects + // Should never happen in production (audit failure handling rejects // empty sets), but the formatter must still produce a valid label // so the log line doesn't contain a misleading default. assert_eq!(first_failed_key_label(&[]), "0x"); diff --git a/src/replication/possession.rs b/src/replication/possession.rs index 79b3b6a..cab27cf 100644 --- a/src/replication/possession.rs +++ b/src/replication/possession.rs @@ -1,32 +1,51 @@ //! Delayed possession verification for fresh replication (ADR-0003). //! //! After a node fresh-replicates a chunk, every close-group peer responsible -//! for it is checked 5-15 minutes later for actual possession. A peer that -//! holds the chunk earns nothing — storing what it was paid to store is the -//! baseline expectation, not meritorious; a peer confirmed *not* to hold it is -//! penalised at `AuditChallenge` severity. Delivery of the original push is -//! irrelevant: a peer the push never reached is still checked and penalised if -//! it lacks the chunk. A peer merely unreachable at check time yields no -//! verdict — it is re-attempted under a bounded grace and never penalised as -//! absent. +//! for it is checked 5-15 minutes later for actual possession. The check is a +//! single-key cryptographic +//! [`AuditChallenge`](crate::replication::protocol::AuditChallenge): the probed +//! peer must return `BLAKE3(nonce ‖ peer_id ‖ key ‖ bytes)` computed over the +//! chunk it claims to hold. It cannot produce that digest without the bytes, so +//! — unlike a self-reported presence flag — a peer cannot escape the check by +//! falsely asserting possession. A peer that holds the chunk earns nothing — +//! storing what it was paid to store is the baseline expectation, not +//! meritorious; a peer that returns the absent sentinel, or a digest that does +//! 
not match the checker's canonical copy (cryptographic proof it lacks the +//! bytes), is penalised at `AuditChallenge` severity. Delivery of the original +//! push is irrelevant: a peer the push never reached is still checked and +//! penalised if it lacks the chunk. +//! +//! A peer unreachable at check time is penalised immediately at audit severity, +//! matching the responsible-chunk `AuditChallenge` path. A matching bootstrap +//! claim uses the shared bootstrap-claim grace/abuse tracker; peer-side +//! malformed, rejected, or mismatched responses are audit failures. use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use rand::Rng; use saorsa_core::identity::PeerId; use saorsa_core::{P2PNode, TrustEvent}; +use tokio::sync::RwLock; use tokio_util::sync::CancellationToken; use crate::ant_protocol::XorName; use crate::logging::{debug, warn}; use crate::replication::config::{ - self, POSSESSION_CHECK_MAX_ATTEMPTS, POSSESSION_CHECK_RETRY_BACKOFF, POSSESSION_CHECK_TIMEOUT, - REPLICATION_PROTOCOL_ID, + ReplicationConfig, AUDIT_FAILURE_TRUST_WEIGHT, REPLICATION_PROTOCOL_ID, }; use crate::replication::protocol::{ - ReplicationMessage, ReplicationMessageBody, VerificationRequest, + compute_audit_digest, AuditChallenge, AuditResponse, ReplicationMessage, + ReplicationMessageBody, ABSENT_KEY_DIGEST, }; +use crate::replication::types::{BootstrapClaimObservation, NeighborSyncState}; +use crate::storage::LmdbStorage; + +use super::REPLICATION_TRUST_WEIGHT; + +/// A possession probe challenges exactly one key, so the per-probe response +/// budget is the audit-response timeout sized for a single chunk. +const POSSESSION_PROBE_KEY_COUNT: usize = 1; /// A scheduled possession check for one freshly-replicated chunk. pub struct PossessionCheckEvent { @@ -36,14 +55,22 @@ pub struct PossessionCheckEvent { pub peers: Vec<PeerId>, } -/// Verdict of probing a single peer for possession of a chunk.
+/// Verdict of cryptographically probing a single peer for possession of a chunk. +#[cfg_attr(test, derive(Debug, PartialEq, Eq))] enum ProbeOutcome { - /// Peer confirmed it holds the chunk. + /// Peer returned a digest proving it holds the chunk's bytes. Present, - /// Peer confirmed it does not hold the chunk. - Absent, - /// No verdict obtained (timeout / transport error / malformed response). - NoVerdict, + /// Peer failed the audit challenge: absent sentinel, digest mismatch, + /// rejection, mismatched challenge ID, wrong digest count, or malformed reply. + Failed, + /// No response (transport error / deadline). Penalised immediately at + /// audit-failure severity. + Timeout, + /// Peer returned a matching bootstrap claim. Graced only through the shared + /// bootstrap-claim tracker. + BootstrapClaim, + /// The probe could not be sent locally. Graced: no penalty. + Inconclusive, } /// Pick a randomised delay in `[min, max]` to wait before a possession check @@ -62,99 +89,205 @@ pub fn random_delay(min: Duration, max: Duration) -> Duration { /// Run the possession check for one chunk against every responsible peer. /// -/// Penalises each peer confirmed absent at `AuditChallenge` severity, leaves -/// present peers unrewarded, and never penalises a peer that only failed to -/// yield a verdict. -pub async fn run_possession_check( +/// Recomputes the expected audit digest from the checker's own canonical copy +/// of `key`, so the check is meaningful only while the checker still holds the +/// chunk — which it does immediately after accepting and fresh-replicating a +/// PUT. If the checker no longer holds it (e.g. pruned), the check is moot and +/// is skipped without penalising anyone. +/// +/// A peer that fails to prove possession, including by timeout, is penalised at +/// `AuditChallenge` severity immediately. A responsive peer is left unrewarded. 
+pub(crate) async fn run_possession_check( key: XorName, peers: Vec<PeerId>, p2p_node: &Arc<P2PNode>, + storage: &Arc<LmdbStorage>, + config: &ReplicationConfig, + sync_state: &Arc<RwLock<NeighborSyncState>>, shutdown: &CancellationToken, ) { let key_hex = hex::encode(key); + + // Read our canonical copy once: the audit digest is recomputed from these + // bytes for every peer (hoisted out of the per-peer loop). If we no longer + // hold the chunk we cannot verify any peer's proof, and we are no longer a + // responsible checker for it — skip without penalising anyone. + let local_bytes = match storage.get_raw(&key).await { + Ok(Some(bytes)) => bytes, + Ok(None) => { + debug!("Possession check: checker no longer holds {key_hex}; skipping"); + return; + } + Err(e) => { + warn!("Possession check: failed to read local {key_hex}: {e}; skipping"); + return; + } + }; + + // Single-key probe budget, matched to the audit response timeout's + // bandwidth-calibrated deadline (tight enough that a relay that must refetch + // the bytes blows it, generous for an honest local-disk read).
+ let probe_timeout = config.audit_response_timeout(POSSESSION_PROBE_KEY_COUNT); + for peer in peers { if shutdown.is_cancelled() { return; } - match probe_with_grace(&key, &peer, p2p_node, shutdown).await { + match probe_once(&key, &local_bytes, &peer, p2p_node, probe_timeout).await { ProbeOutcome::Present => { - debug!("Possession check: {peer} holds {key_hex}"); + debug!("Possession check: {peer} proved possession of {key_hex}"); + clear_possession_bootstrap_claim(&peer, sync_state).await; } - ProbeOutcome::Absent => { - warn!( - "Possession check: {peer} is missing {key_hex}; penalising at audit severity" - ); - p2p_node - .report_trust_event( - &peer, - TrustEvent::ApplicationFailure(config::AUDIT_FAILURE_TRUST_WEIGHT), - ) + ProbeOutcome::Failed => { + clear_possession_bootstrap_claim(&peer, sync_state).await; + report_possession_audit_failure( + &peer, + &key_hex, + "failed to prove possession", + p2p_node, + ) + .await; + } + ProbeOutcome::Timeout => { + report_possession_audit_failure(&peer, &key_hex, "timed out", p2p_node).await; + } + ProbeOutcome::BootstrapClaim => { + handle_possession_bootstrap_claim(&peer, &key_hex, p2p_node, config, sync_state) .await; } - ProbeOutcome::NoVerdict => { + ProbeOutcome::Inconclusive => { debug!( - "Possession check: no verdict from {peer} for {key_hex} after grace; \ - not penalised" + "Possession check: inconclusive probe of {peer} for {key_hex}; not penalised" ); } } } } -/// Probe a peer for possession, re-attempting on no-verdict up to the grace -/// bound. A definite Present/Absent verdict short-circuits immediately. 
-async fn probe_with_grace( - key: &XorName, +async fn clear_possession_bootstrap_claim( + peer: &PeerId, + sync_state: &Arc<RwLock<NeighborSyncState>>, +) { + sync_state.write().await.clear_active_bootstrap_claim(peer); +} + +async fn report_possession_audit_failure( peer: &PeerId, + key_hex: &str, + reason: &str, p2p_node: &Arc<P2PNode>, - shutdown: &CancellationToken, -) -> ProbeOutcome { - for attempt in 1..=POSSESSION_CHECK_MAX_ATTEMPTS { - match probe_once(key, peer, p2p_node).await { - ProbeOutcome::NoVerdict if attempt < POSSESSION_CHECK_MAX_ATTEMPTS => { - tokio::select! { - () = shutdown.cancelled() => return ProbeOutcome::NoVerdict, - () = tokio::time::sleep(POSSESSION_CHECK_RETRY_BACKOFF) => {} - } - } - outcome => return outcome, +) { + warn!("Possession check: {peer} {reason} for {key_hex}; penalising at audit severity"); + p2p_node + .report_trust_event( + peer, + TrustEvent::ApplicationFailure(AUDIT_FAILURE_TRUST_WEIGHT), + ) + .await; +} + +async fn handle_possession_bootstrap_claim( + peer: &PeerId, + key_hex: &str, + p2p_node: &Arc<P2PNode>, + config: &ReplicationConfig, + sync_state: &Arc<RwLock<NeighborSyncState>>, +) { + let (now, observation) = { + let now = Instant::now(); + let mut state = sync_state.write().await; + ( + now, + state.observe_bootstrap_claim(*peer, now, config.bootstrap_claim_grace_period), + ) + }; + + match observation { + BootstrapClaimObservation::WithinGrace { ..
} => { + debug!( + "Possession check: peer {peer} claims bootstrapping for {key_hex} \ + (within grace period)" + ); + } + BootstrapClaimObservation::PastGrace { first_seen } => { + warn!( + "Possession check: peer {peer} claiming bootstrap for {key_hex} past grace period \ + ({:?} > {:?}), reporting abuse", + now.duration_since(first_seen), + config.bootstrap_claim_grace_period, + ); + p2p_node + .report_trust_event( + peer, + TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT), + ) + .await; + } + BootstrapClaimObservation::Repeated { first_seen } => { + warn!( + "Possession check: peer {peer} repeated bootstrap claim for {key_hex} after \ + previously stopping; first claim was {:?} ago, reporting abuse", + now.duration_since(first_seen), + ); + p2p_node + .report_trust_event( + peer, + TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT), + ) + .await; } } - ProbeOutcome::NoVerdict } -/// Send one presence-only `VerificationRequest` and interpret the response. -async fn probe_once(key: &XorName, peer: &PeerId, p2p_node: &Arc<P2PNode>) -> ProbeOutcome { - let request = VerificationRequest { +/// Send one single-key cryptographic [`AuditChallenge`] and interpret the +/// response. The peer proves possession by returning +/// `compute_audit_digest(nonce, peer, key, bytes)`; absence is proven by the +/// [`ABSENT_KEY_DIGEST`] sentinel or any digest that does not match the +/// checker's canonical copy. A transport failure / deadline is a `Timeout`; a +/// matching bootstrap response is a `BootstrapClaim`; a local encode failure is +/// `Inconclusive`; peer-side malformed, rejected, or mismatched replies are +/// `Failed`. +async fn probe_once( + key: &XorName, + local_bytes: &[u8], + peer: &PeerId, + p2p_node: &Arc<P2PNode>, + probe_timeout: Duration, +) -> ProbeOutcome { + // Fresh nonce per probe so a stored digest cannot be replayed, and bind the + // challenge to this peer's identity so it cannot relay another node's proof.
+ let (nonce, challenge_id) = { + let mut rng = rand::thread_rng(); + let nonce: [u8; 32] = rng.gen(); + let challenge_id: u64 = rng.gen(); + (nonce, challenge_id) + }; + let challenge = AuditChallenge { + challenge_id, + nonce, + challenged_peer_id: *peer.as_bytes(), keys: vec![*key], - // Presence-only: no paid-list status is needed to judge possession. - paid_list_check_indices: Vec::new(), }; let msg = ReplicationMessage { - request_id: rand::random(), - body: ReplicationMessageBody::VerificationRequest(request), + request_id: challenge_id, + body: ReplicationMessageBody::AuditChallenge(challenge), }; let Ok(encoded) = msg.encode() else { warn!( - "Failed to encode possession request for {}", + "Failed to encode possession challenge for {}", hex::encode(key) ); - return ProbeOutcome::NoVerdict; + return ProbeOutcome::Inconclusive; }; let response = match p2p_node - .send_request( - peer, - REPLICATION_PROTOCOL_ID, - encoded, - POSSESSION_CHECK_TIMEOUT, - ) + .send_request(peer, REPLICATION_PROTOCOL_ID, encoded, probe_timeout) .await { Ok(response) => response, Err(e) => { - debug!("Possession probe to {peer} failed: {e}"); - return ProbeOutcome::NoVerdict; + debug!("Possession probe to {peer} got no response: {e}"); + return ProbeOutcome::Timeout; } }; @@ -162,19 +295,67 @@ async fn probe_once(key: &XorName, peer: &PeerId, p2p_node: &Arc<P2PNode>) -> Pr Ok(decoded) => decoded, Err(e) => { debug!("Failed to decode possession response from {peer}: {e}"); - return ProbeOutcome::NoVerdict; + return ProbeOutcome::Failed; } }; - let ReplicationMessageBody::VerificationResponse(resp) = decoded.body else { + let ReplicationMessageBody::AuditResponse(resp) = decoded.body else { debug!("Unexpected possession response type from {peer}"); - return ProbeOutcome::NoVerdict; + return ProbeOutcome::Failed; }; - match resp.results.iter().find(|r| r.key == *key) { - Some(result) if result.present => ProbeOutcome::Present, - Some(_) => ProbeOutcome::Absent, - None =>
ProbeOutcome::NoVerdict, + interpret_audit_response( + key, + local_bytes, + peer.as_bytes(), + &nonce, + challenge_id, + resp, + ) +} + +/// Classify an [`AuditResponse`] into a possession verdict. Pure (no I/O): the +/// digest is verified against `local_bytes`, the checker's canonical copy. +fn interpret_audit_response( + key: &XorName, + local_bytes: &[u8], + challenged_peer_id: &[u8; 32], + nonce: &[u8; 32], + challenge_id: u64, + response: AuditResponse, +) -> ProbeOutcome { + match response { + AuditResponse::Digests { + challenge_id: resp_id, + digests, + } => { + if resp_id != challenge_id || digests.len() != 1 { + return ProbeOutcome::Failed; + } + let received = digests[0]; + if received == ABSENT_KEY_DIGEST { + return ProbeOutcome::Failed; + } + let expected = compute_audit_digest(nonce, challenged_peer_id, key, local_bytes); + if received == expected { + ProbeOutcome::Present + } else { + // A non-sentinel digest that does not match our canonical bytes + // proves the peer cannot reproduce the content — treat as absent + // (matches the audit's DigestMismatch handling). + ProbeOutcome::Failed + } + } + AuditResponse::Bootstrapping { + challenge_id: resp_id, + } => { + if resp_id == challenge_id { + ProbeOutcome::BootstrapClaim + } else { + ProbeOutcome::Failed + } + } + AuditResponse::Rejected { .. 
} => ProbeOutcome::Failed, } } @@ -183,6 +364,19 @@ mod tests { use super::*; use crate::replication::config::{POSSESSION_CHECK_DELAY_MAX, POSSESSION_CHECK_DELAY_MIN}; + const PEER_ID: [u8; 32] = [0x42; 32]; + const NONCE: [u8; 32] = [0x7a; 32]; + const CHALLENGE_ID: u64 = 0xDEAD_BEEF; + const KEY: XorName = [0x11; 32]; + const BYTES: &[u8] = b"possession-check payload"; + + fn digests_response(challenge_id: u64, digests: Vec<[u8; 32]>) -> AuditResponse { + AuditResponse::Digests { + challenge_id, + digests, + } + } + #[test] fn random_delay_is_within_bounds() { for _ in 0..100 { @@ -191,4 +385,141 @@ mod tests { assert!(d <= POSSESSION_CHECK_DELAY_MAX); } } + + #[test] + fn matching_digest_is_present() { + let valid = compute_audit_digest(&NONCE, &PEER_ID, &KEY, BYTES); + let verdict = interpret_audit_response( + &KEY, + BYTES, + &PEER_ID, + &NONCE, + CHALLENGE_ID, + digests_response(CHALLENGE_ID, vec![valid]), + ); + assert_eq!(verdict, ProbeOutcome::Present); + } + + #[test] + fn absent_sentinel_is_failed() { + let verdict = interpret_audit_response( + &KEY, + BYTES, + &PEER_ID, + &NONCE, + CHALLENGE_ID, + digests_response(CHALLENGE_ID, vec![ABSENT_KEY_DIGEST]), + ); + assert_eq!(verdict, ProbeOutcome::Failed); + } + + #[test] + fn forged_digest_is_failed() { + // A peer that lacks the bytes cannot compute the right digest; whatever + // non-sentinel value it sends must not match our canonical copy. 
+ let forged = [0x99; 32]; + let valid = compute_audit_digest(&NONCE, &PEER_ID, &KEY, BYTES); + assert_ne!(forged, valid, "test fixture must use a wrong digest"); + let verdict = interpret_audit_response( + &KEY, + BYTES, + &PEER_ID, + &NONCE, + CHALLENGE_ID, + digests_response(CHALLENGE_ID, vec![forged]), + ); + assert_eq!(verdict, ProbeOutcome::Failed); + } + + #[test] + fn mismatched_challenge_id_is_failed() { + let valid = compute_audit_digest(&NONCE, &PEER_ID, &KEY, BYTES); + let verdict = interpret_audit_response( + &KEY, + BYTES, + &PEER_ID, + &NONCE, + CHALLENGE_ID, + digests_response(CHALLENGE_ID.wrapping_add(1), vec![valid]), + ); + assert_eq!(verdict, ProbeOutcome::Failed); + } + + #[test] + fn wrong_arity_is_failed() { + let valid = compute_audit_digest(&NONCE, &PEER_ID, &KEY, BYTES); + let verdict = interpret_audit_response( + &KEY, + BYTES, + &PEER_ID, + &NONCE, + CHALLENGE_ID, + digests_response(CHALLENGE_ID, vec![valid, ABSENT_KEY_DIGEST]), + ); + assert_eq!(verdict, ProbeOutcome::Failed); + } + + #[test] + fn bootstrapping_is_bootstrap_claim() { + let verdict = interpret_audit_response( + &KEY, + BYTES, + &PEER_ID, + &NONCE, + CHALLENGE_ID, + AuditResponse::Bootstrapping { + challenge_id: CHALLENGE_ID, + }, + ); + assert_eq!(verdict, ProbeOutcome::BootstrapClaim); + } + + #[test] + fn bootstrapping_with_wrong_challenge_id_is_failed() { + let verdict = interpret_audit_response( + &KEY, + BYTES, + &PEER_ID, + &NONCE, + CHALLENGE_ID, + AuditResponse::Bootstrapping { + challenge_id: CHALLENGE_ID.wrapping_add(1), + }, + ); + assert_eq!(verdict, ProbeOutcome::Failed); + } + + #[tokio::test] + async fn possession_success_clears_active_bootstrap_claim_but_keeps_history() { + let peer = PeerId::from_bytes(PEER_ID); + let sync_state = Arc::new(RwLock::new(NeighborSyncState::new_cycle(Vec::new()))); + { + let mut state = sync_state.write().await; + let now = Instant::now(); + state.bootstrap_claims.insert(peer, now); + 
state.bootstrap_claim_history.insert(peer, now); + } + + clear_possession_bootstrap_claim(&peer, &sync_state).await; + + let state = sync_state.read().await; + assert!(!state.bootstrap_claims.contains_key(&peer)); + assert!(state.bootstrap_claim_history.contains_key(&peer)); + } + + #[test] + fn rejected_is_failed() { + let verdict = interpret_audit_response( + &KEY, + BYTES, + &PEER_ID, + &NONCE, + CHALLENGE_ID, + AuditResponse::Rejected { + challenge_id: CHALLENGE_ID, + reason: "nope".to_string(), + }, + ); + assert_eq!(verdict, ProbeOutcome::Failed); + } } diff --git a/src/replication/protocol.rs b/src/replication/protocol.rs index 5058bab..1181de5 100644 --- a/src/replication/protocol.rs +++ b/src/replication/protocol.rs @@ -438,11 +438,9 @@ pub enum SubtreeAuditResponse { /// peer would not prove without touching credit it legitimately re-earned for a /// newer commitment. Lying therefore does not let a deleter keep "proven holder" /// status for that root until the credit TTL — the loophole a plain timeout -/// would leave. It also accumulates a timeout strike, so a peer -/// that self-graces on every audit still crosses the strike threshold once -/// timeout-eviction is enabled. An honest peer that genuinely rotated simply -/// re-earns credit on the next audit of its current commitment, so the grace -/// strips no peer that is actually holding its responsible data. The grace +/// would leave. A peer that self-graces on every audit remains uncredited for the +/// pinned commitments it refuses to prove; an honest peer that genuinely rotated +/// simply re-earns credit on the next audit of its current commitment. The grace /// removes only the false TRUST PENALTY for the genuinely-ambiguous /// rotated/transient case; it does not remove the possession requirement. 
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] @@ -642,7 +640,7 @@ mod tests { // The auditor batches its round-2 sample to MAX_BYTE_CHALLENGE_KEYS per // challenge precisely so this worst case — every requested chunk at // MAX_CHUNK_SIZE — still encodes. If this fails, honest responders - // would hit encode errors and be penalized as timeouts. + // would hit encode errors and fail otherwise valid byte challenges. let items: Vec = (0..crate::replication::config::MAX_BYTE_CHALLENGE_KEYS) .map(|i| SubtreeByteItem::Present { key: [u8::try_from(i).unwrap_or(u8::MAX); 32], diff --git a/src/replication/pruning.rs b/src/replication/pruning.rs index 9727836..e1e0d3f 100644 --- a/src/replication/pruning.rs +++ b/src/replication/pruning.rs @@ -1154,23 +1154,10 @@ async fn peer_proves_record( let encoded = encode_prune_audit_challenge(&peer, key, challenge_id, nonce)?; let Some(decoded) = send_prune_audit_challenge(&peer, &key, encoded, p2p_node, config).await else { - // No decoded response means a timeout or an undecodable reply — the - // same "no response" case the main audit path treats as a timeout. The - // penalty is gated behind `TIMEOUT_EVICTION_ENABLED` (off this - // release — a not-yet-upgraded or briefly-slow peer must not be evicted - // by a no-response during the breaking rollout). Mirrors the suppressed - // timeout penalty in handle_failed_audit; only a DECODED - // PruneAuditStatus::Failed below (a peer that answered with bad/absent - // bytes) is penalised regardless of the gate. - if crate::replication::config::TIMEOUT_EVICTION_ENABLED { - report_prune_audit_failure_once(&peer, &key, p2p_node, config, report_state).await; - } else { - debug!( - "Prune audit for {peer} key {} got no decodable response \ - (eviction disabled this release — not penalising)", - hex::encode(key) - ); - } + // No decoded response means a timeout or malformed reply. 
Prune + // confirmation reuses `AuditChallenge` semantics, so this is an immediate + // audit failure just like a decoded bad proof below. + report_prune_audit_failure_once(&peer, &key, p2p_node, config, report_state).await; return None; }; From e2e022df0b811295c3bd011396fbbdad7a9425e2 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Mon, 29 Jun 2026 18:12:38 +0200 Subject: [PATCH 12/12] test(replication): seed checker storage for possession audits --- tests/e2e/replication.rs | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/tests/e2e/replication.rs b/tests/e2e/replication.rs index ab3dc01..df9e05a 100644 --- a/tests/e2e/replication.rs +++ b/tests/e2e/replication.rs @@ -295,6 +295,18 @@ async fn possession_check_penalises_absent_peer_only() { let content = b"adr-0003 possession-check payload"; let address = compute_address(content); + // The checker A must hold the chunk it verifies: the possession check + // recomputes the audit digest from its own canonical copy. In production the + // PUT handler stores K before fresh-replicating; here we store it on the + // checker explicitly. + a.ant_protocol + .as_ref() + .expect("proto a") + .storage() + .put(&address, content) + .await + .expect("put on a (checker)"); + // C holds the chunk; B never stores it. c.ant_protocol .as_ref() @@ -392,6 +404,17 @@ async fn possession_scheduler_penalises_absent_close_peer_after_delay() { let trust_before: Vec = close_group.iter().map(|p| p2p_a.peer_trust(p)).collect(); + // The checker must hold the chunk it later probes for: the possession check + // recomputes the audit digest from its own copy. `replicate_fresh` assumes + // the PUT handler already stored K locally, so store it on the checker here. 
+ a.ant_protocol + .as_ref() + .expect("proto a") + .storage() + .put(&address, content) + .await + .expect("put on a (checker)"); + // Trigger fresh replication; the engine enqueues the possession check, which // fires ~200-500 ms later and penalises the absent close peers. let dummy_pop = [0x01u8; 64]; @@ -536,6 +559,18 @@ async fn full_close_group_node_rejects_replica_and_is_penalised_as_absent() { "full node must not store the rejected replica" ); + // The checker must hold the chunk it probes for: the possession check + // recomputes the audit digest from its own copy. `replicate_fresh` assumes + // the PUT handler already stored K locally, so store it on the checker here. + checker + .ant_protocol + .as_ref() + .expect("checker protocol") + .storage() + .put(&address, &content) + .await + .expect("put on checker"); + let trust_before = checker_p2p.peer_trust(&full_peer); checker_engine .replicate_fresh(&address, &content, &dummy_payment_proof)