feat(blockchain): start aggregation early when 2/3 of subnet signatures have arrived#487
feat(blockchain): start aggregation early when 2/3 of subnet signatures have arrived#487MegaRedHand wants to merge 30 commits into
Conversation
…me_ms, spec corrections)
… count into actor
…on, add log identity fields
🤖 Kimi Code ReviewThis PR implements early aggregation start with proper threshold checks, publish alignment, and clean plumbing. The logic is sound and follows the actor model patterns established in the codebase. Correctness & Safety
Performance
Code Quality
Nits
Consensus Considerations The 800 ms deadline change (from 750 ms) removes the previous 50 ms propagation margin. The comment correctly notes this is acceptable because the deadline only stops new jobs, not mid-proof work. Verify in the devnet run (Task 9) that proof completion + publish doesn't regularly overrun into interval 3. The publish alignment mechanism (buffering aggregates produced before T2) correctly prevents premature gossip while allowing early computation. The flush timer scheduled via No blocking issues found. The implementation matches the design spec and maintains backward compatibility (interval-2 fallback path remains intact). Automated review by Kimi (Moonshot AI) · kimi-k2.5 · custom prompt |
🤖 Codex Code ReviewFindings
Aside from that race, the early-start threshold/latch flow looks internally consistent. I couldn’t run the requested Rust verification in this environment because Automated review by OpenAI Codex · gpt-5.4 · custom prompt |
🤖 Claude Code ReviewReview:
|
Greptile SummaryThis PR introduces an early-aggregation trigger that allows the XMSS proof worker to start up to 600 ms before the interval-2 boundary when a 2/3 quorum of expected gossip signatures has arrived, giving the expensive leanVM computations more wall-clock runway before block-building consumes the proofs at interval 4.
Confidence Score: 5/5The early-aggregation path is well-guarded and the change is safe to merge; the single finding is a metric accuracy nit in a documented edge case. The early-aggregation path is protected by a time-window check, a once-per-slot latch, and session_id fencing on delayed messages — no double-starts, no premature gossip, no cross-slot leakage. The only finding is a metric overcounting issue in the documented latch-hole edge case (corrupted validator registry), which has no effect on aggregation correctness or chain consensus. crates/blockchain/src/lib.rs — the early-metric emission ordering in start_aggregation_session is the one area worth a second look.
|
| Filename | Overview |
|---|---|
| crates/blockchain/src/aggregation.rs | Adds publish-alignment logic to the worker via send_after keyed on publish_at (the T2 wall-clock instant), new message types for early-check and deadline handling, and the EARLY_AGGREGATION_WINDOW_MS const with a compile-time bounds assertion. |
| crates/blockchain/src/lib.rs | Core orchestration changes: interval-1 schedules EarlyAggregationCheck, interval-2 skips when a session is already present, maybe_start_early_aggregation checks the time window and threshold, and start_aggregation_session now computes publish_at and emits coverage/metrics. Early metric and log fire before the snapshot check, which can produce a misleading count when no snapshot is available. |
| crates/blockchain/src/metrics.rs | Adds LEAN_AGGREGATION_EARLY_STARTS_TOTAL counter and LEAN_AGGREGATION_EARLY_START_LEAD_SECONDS histogram, both registered in init(); straightforward metrics additions. |
| crates/storage/src/store.rs | Adds max_gossip_group_count_for_slot to Store backed by a single-lock scan over the gossip-signature buffer; cheap and correct for the per-insert threshold check. |
Sequence Diagram
%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
participant T as Timer
participant A as BlockChainServer
participant W as Worker
participant P as P2P
A->>T: schedule EarlyAggregationCheck in 200ms (interval-1 tick)
T->>A: EarlyAggregationCheck fires at T2-600ms
A->>A: maybe_start_early_aggregation
alt threshold met
A->>W: spawn run_aggregation_worker publish_at equals T2
A->>T: schedule AggregationDeadline in 800ms
W->>W: aggregate_job
W->>T: send_after delay_to_T2 AggregateProduced
T->>A: AggregateProduced at T2
A->>P: publish_aggregated_attestation
T->>A: AggregationDeadline at T2 plus 200ms
A->>W: cancel
end
A->>A: interval-2 tick skips if session already present
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
sequenceDiagram
participant T as Timer
participant A as BlockChainServer
participant W as Worker
participant P as P2P
A->>T: schedule EarlyAggregationCheck in 200ms (interval-1 tick)
T->>A: EarlyAggregationCheck fires at T2-600ms
A->>A: maybe_start_early_aggregation
alt threshold met
A->>W: spawn run_aggregation_worker publish_at equals T2
A->>T: schedule AggregationDeadline in 800ms
W->>W: aggregate_job
W->>T: send_after delay_to_T2 AggregateProduced
T->>A: AggregateProduced at T2
A->>P: publish_aggregated_attestation
T->>A: AggregationDeadline at T2 plus 200ms
A->>W: cancel
end
A->>A: interval-2 tick skips if session already present
Reviews (2): Last reviewed commit: "Merge branch 'main' into feat/early-aggr..." | Re-trigger Greptile
| pub fn compute_subscription_subnets( | ||
| validator_ids: &[u64], | ||
| attestation_committee_count: u64, | ||
| is_aggregator: bool, | ||
| aggregate_subnet_ids: Option<&[u64]>, | ||
| ) -> HashSet<u64> { | ||
| let mut subnets: HashSet<u64> = validator_ids | ||
| .iter() | ||
| .map(|vid| vid % attestation_committee_count) | ||
| .collect(); |
There was a problem hiding this comment.
The new public function panics with integer overflow (division by zero) if
attestation_committee_count == 0 and validator_ids is non-empty — the same latent issue that existed inline in build_swarm, but now exposed as a callable public API without a documented precondition. Adding an early guard keeps the semantics identical to a zero-committee node (no validators belong to any subnet) while making the contract explicit.
| pub fn compute_subscription_subnets( | |
| validator_ids: &[u64], | |
| attestation_committee_count: u64, | |
| is_aggregator: bool, | |
| aggregate_subnet_ids: Option<&[u64]>, | |
| ) -> HashSet<u64> { | |
| let mut subnets: HashSet<u64> = validator_ids | |
| .iter() | |
| .map(|vid| vid % attestation_committee_count) | |
| .collect(); | |
| pub fn compute_subscription_subnets( | |
| validator_ids: &[u64], | |
| attestation_committee_count: u64, | |
| is_aggregator: bool, | |
| aggregate_subnet_ids: Option<&[u64]>, | |
| ) -> HashSet<u64> { | |
| let mut subnets: HashSet<u64> = if attestation_committee_count == 0 { | |
| HashSet::new() | |
| } else { | |
| validator_ids | |
| .iter() | |
| .map(|vid| vid % attestation_committee_count) | |
| .collect() | |
| }; |
Prompt To Fix With AI
This is a comment left during a code review.
Path: crates/net/p2p/src/lib.rs
Line: 194-203
Comment:
The new public function panics with integer overflow (division by zero) if `attestation_committee_count == 0` and `validator_ids` is non-empty — the same latent issue that existed inline in `build_swarm`, but now exposed as a callable public API without a documented precondition. Adding an early guard keeps the semantics identical to a zero-committee node (no validators belong to any subnet) while making the contract explicit.
```suggestion
pub fn compute_subscription_subnets(
validator_ids: &[u64],
attestation_committee_count: u64,
is_aggregator: bool,
aggregate_subnet_ids: Option<&[u64]>,
) -> HashSet<u64> {
let mut subnets: HashSet<u64> = if attestation_committee_count == 0 {
HashSet::new()
} else {
validator_ids
.iter()
.map(|vid| vid % attestation_committee_count)
.collect()
};
```
How can I resolve this? If you propose a fix, please make it concise.| impl Handler<FlushAggregatePublishes> for BlockChainServer { | ||
| async fn handle(&mut self, _msg: FlushAggregatePublishes, _ctx: &Context<Self>) { | ||
| let pending = std::mem::take(&mut self.pending_aggregate_publishes); | ||
| if pending.is_empty() { | ||
| return; | ||
| } | ||
| let session_id = self.current_aggregation.as_ref().map(|s| s.session_id); | ||
| info!( | ||
| session_id, | ||
| count = pending.len(), | ||
| "Publishing aggregates held back until the interval-2 boundary" | ||
| ); | ||
| for aggregate in pending { | ||
| self.publish_aggregate(aggregate); | ||
| } | ||
| } |
There was a problem hiding this comment.
FlushAggregatePublishes carries no slot or session identifier, so it cannot distinguish which slot's buffer to drain. In normal operation this is fine — the timer fires within 400 ms and the next session is ≥4 s away. However, if the message is delayed past the start of the next session, start_aggregation_session drains the stale buffer (correct) but any aggregates the new session subsequently buffers before the stale flush message is processed would be published prematurely, violating the publish-alignment invariant. Adding a slot: u64 field and checking it against current_aggregation.session_id before draining would eliminate this window entirely.
Prompt To Fix With AI
This is a comment left during a code review.
Path: crates/blockchain/src/lib.rs
Line: 1221-1236
Comment:
`FlushAggregatePublishes` carries no slot or session identifier, so it cannot distinguish which slot's buffer to drain. In normal operation this is fine — the timer fires within 400 ms and the next session is ≥4 s away. However, if the message is delayed past the start of the next session, `start_aggregation_session` drains the stale buffer (correct) but any aggregates the *new* session subsequently buffers before the stale flush message is processed would be published prematurely, violating the publish-alignment invariant. Adding a `slot: u64` field and checking it against `current_aggregation.session_id` before draining would eliminate this window entirely.
How can I resolve this? If you propose a fix, please make it concise.…div-by-zero and window underflow Addresses PR #487 review feedback: - FlushAggregatePublishes now carries its session slot and drains only when it matches current_aggregation, matching AggregationDeadline's fencing. A flush timer delayed past the next session start can no longer publish the newer session's buffered aggregates before its interval-2 boundary. - compute_subscription_subnets returns an empty set for a zero committee count instead of dividing by zero (defensive; callers enforce >= 1). - const assert that EARLY_AGGREGATION_WINDOW_MS fits within one interval, making the no-underflow invariant self-enforcing.
Halve EARLY_AGGREGATION_WINDOW_MS (400 -> 200). Rescale the lean_aggregation_early_start_lead_seconds histogram buckets to 0.025-0.2s so they keep full resolution over the new window, and update the spec/plan (also fixing an inverted deadline-bound phrasing).
Window opens at T2-600 = 200ms into interval 1, so the trigger can fire as soon as attestations start arriving. Rescale the lead-time histogram buckets to 0.075-0.6s to match.
Replace the per-validator subnet scan in BlockChain::spawn with the closed form 2*N/(3*C) (two-thirds of one committee's expected votes), folding the 2/3 fraction into the precomputed threshold so early_threshold_met is now a direct >= comparison. Drop the now-unused aggregation_subnets parameter and its main.rs plumbing; compute_subscription_subnets is only used inside the p2p crate now, so downgrade it to pub(crate).
…ntime Drop the cached genesis_time_ms and early_aggregation_min_group_sigs fields from BlockChainServer. Genesis time is read via store.config() at each use site (matching the tick paths); the early-aggregation threshold is computed on demand from head_state, reached only inside the early window and only until a session starts.
…hAggregatePublishes Move publish alignment from the actor into the aggregation worker. The worker now receives the interval-2 boundary timestamp and, for each aggregate it produces before that boundary, uses send_after to delay the AggregateProduced message until the boundary (sending immediately if already past it). This removes the FlushAggregatePublishes self-message, the pending_aggregate_publishes buffer, and the T2 check in the AggregateProduced handler. Delivery timers are tied to the actor's cancellation token via Context::from_ref, so they drop on shutdown just like the old buffer did.
Type the aggregation worker's publish boundary as a SystemTime instead of a raw u64 millisecond timestamp. The worker derives each aggregate's delay via duration_since(SystemTime::now()), whose Err branch is exactly the send-immediately (past-boundary) case. SystemTime (not Instant) because the boundary is derived from genesis wall-clock time.
… worker Inline five helpers that ended up with a single caller: compute_subscription_subnets (back into build_swarm), interval2_boundary_ms and early_threshold_met (into their call sites), the early_aggregation_min_group_sigs method (into the trigger), and publish_aggregate (into the AggregateProduced handler). Keep early_aggregation_slot (non-trivial window math, reads clearer named) and the Store/GossipSignatureBuffer delegation pair (the file's encapsulation pattern). Also replace the worker's match on duration_since with an if-else on the unwrapped delay.
Fold the window-detection math into maybe_start_early_aggregation (its only caller) and drop the helper. Update the two doc comments that referenced it.
…ations A late- or future-slot attestation cannot advance the current slot's gossip group counts, so running the early-aggregation threshold check for it is wasted work. Gate the attestation-driven call on the attestation being for the store's current slot (store.time() / INTERVALS_PER_SLOT). The window-opening timer (EarlyAggregationCheck) still fires regardless.
The subnet-computation reorganization in build_swarm was leftover churn from the earlier extract/re-inline of compute_subscription_subnets and produced no behavioral change (min over the deduped set equals min over the mapped iterator; the subscription set collects identically). Restore main's single-pass version to keep this PR focused on early aggregation.
The 2/3-of-expected-votes threshold used floor division, so for counts not divisible by 3 it triggered slightly below a true 2/3. Round up with div_ceil so the early start waits for a genuine 2/3 supermajority of one committee's expected votes (ceil(2*N / (3*C))).
EARLY_AGGREGATION_WINDOW_MS was a bare u64 of milliseconds; make it a Duration (EARLY_AGGREGATION_WINDOW) so its unit is carried in the type. The timer site now subtracts Durations directly; the wall-clock window bounds check converts once via as_millis(), and the compile-time fits-within-one-interval assert uses the const as_millis().
Closes #492
What
Start the committee-signature aggregation session up to 600 ms before the interval-2 boundary (T2) when a single attestation-data group already holds ≥2/3 of the signatures expected from this node's aggregation subnets, and hold any early-produced aggregates back from gossip until T2.
Why
Aggregation (leanVM XMSS proofs) is the dominant cost in the attestation pipeline. When the expected signatures have already arrived, waiting for the interval-2 tick to start wastes runway before block building consumes the proofs at interval 4. Starting early gives the proofs more wall-clock time without moving anything else in the slot.
How
[T2−600ms, T2)— which opens 200 ms into interval 1, right as attestations begin arriving — start the session when the largest current-slot gossip group holds two-thirds of one committee's expected votes, rounded up, i.e.max_gossip_group_count_for_slot(slot) >= ceil(2 * N / (3 * C))(validator countN, committee countC; per-data-group, so at most one group can qualify since each validator signs once). The threshold is computed at runtime from the live validator/committee counts (C == 0never triggers).NewAttestationhandler gates onattestation.slot == store.time() / INTERVALS_PER_SLOT, since a late- or future-slot signature can't advance the current slot's group count), and once via a one-shotEarlyAggregationChecktimer scheduled at the interval-1 tick so it fires exactly at window open (covers signatures that all arrived before the window).current_aggregation.session_id == slot. Falls back to the normal interval-2 start when the threshold is never met.publish_at = genesis + slot·SLOT + 2·INTERVAL) and, for each produced aggregate still ahead of T2, delivers theAggregateProducedmessage viasend_aftertimed to land at T2; if already at/past T2 it sends immediately. A normal interval-2 session starts at T2, so its aggregates always send without delay. TheAggregateProducedhandler is fenced bysession_id, so a delayed message from a superseded session is dropped rather than applied.AGGREGATION_DEADLINE750 → 800 ms, now measured from session start (was the interval-2 tick); aconst_assertenforces that the early window fits within one interval so neither subtraction can underflow.max_gossip_group_count_for_slot(one lock, no signature clones) drives the threshold cheaply on every gossip insert.lean_aggregation_early_starts_total,lean_aggregation_early_start_lead_seconds;earlyfield on session logs.Accepted trade-offs
Testing
make fmt/make lint/make testall green; blockchain lib tests (40) pass.Initial A/B on a local 4-node devnet (1 aggregator, fresh genesis each, ~75–81 slots), baseline
mainvs this branch. Measured with the initial 400 ms window; the publish-alignment and chain-health conclusions are window-independent (aggregates always publish at T2; a wider window only shifts the early-start lead distribution).Early path fired on every eligible slot, hold-till-T2 flushes present, zero stale drops, zero prior-worker-still-running warnings.
Caveat: short single-host run; validates the timing mechanism and confirms no regression, not longevity/contention.
Review feedback addressed
AggregateProducedfenced bysession_id, so a delayed publish from a superseded session can't be applied to the current one.const_assertthatEARLY_AGGREGATION_WINDOW_MSfits within one interval.send_afteron the produced message); the separate flush-timer message and its slot-fencing were removed as no longer needed.build_swarmsubnet-computation refactor to keep the diff focused.