feat(blockchain): start aggregation early when 2/3 of subnet signatures have arrived#487

Open
MegaRedHand wants to merge 30 commits into main from feat/early-aggregation-start

@MegaRedHand MegaRedHand commented Jul 2, 2026

Collaborator

Closes #492

What

Start the committee-signature aggregation session up to 600 ms before the interval-2 boundary (T2) when a single attestation-data group already holds ≥2/3 of the signatures expected from this node's aggregation subnets, and hold any early-produced aggregates back from gossip until T2.

Why

Aggregation (leanVM XMSS proofs) is the dominant cost in the attestation pipeline. When the expected signatures have already arrived, waiting for the interval-2 tick to start wastes runway before block building consumes the proofs at interval 4. Starting early gives the proofs more wall-clock time without moving anything else in the slot.

How

  • Early trigger (aggregator only, once per slot): inside the window [T2−600ms, T2) — which opens 200 ms into interval 1, right as attestations begin arriving — start the session when the largest current-slot gossip group holds two-thirds of one committee's expected votes, rounded up, i.e. max_gossip_group_count_for_slot(slot) >= ceil(2 * N / (3 * C)) (validator count N, committee count C; per-data-group, so at most one group can qualify since each validator signs once). The threshold is computed at runtime from the live validator/committee counts (C == 0 never triggers).
  • When the check runs: after every stored current-slot gossip signature (the NewAttestation handler gates on attestation.slot == store.time() / INTERVALS_PER_SLOT, since a late- or future-slot signature can't advance the current slot's group count), and once via a one-shot EarlyAggregationCheck timer scheduled at the interval-1 tick so it fires exactly at window open (covers signatures that all arrived before the window).
  • The early session is the slot's only session: the interval-2 tick skips starting a second one when current_aggregation.session_id == slot. Falls back to the normal interval-2 start when the threshold is never met.
  • Publish alignment: aggregates must not reach gossip before T2. The off-thread worker computes T2 as a wall-clock instant (publish_at = genesis + slot·SLOT + 2·INTERVAL) and, for each produced aggregate still ahead of T2, delivers the AggregateProduced message via send_after timed to land at T2; if already at/past T2 it sends immediately. A normal interval-2 session starts at T2, so its aggregates always send without delay. The AggregateProduced handler is fenced by session_id, so a delayed message from a superseded session is dropped rather than applied.
  • AGGREGATION_DEADLINE 750 → 800 ms, now measured from session start (was the interval-2 tick); a const_assert enforces that the early window fits within one interval so neither subtraction can underflow.
  • New store method max_gossip_group_count_for_slot (one lock, no signature clones) drives the threshold cheaply on every gossip insert.
  • New metrics lean_aggregation_early_starts_total, lean_aggregation_early_start_lead_seconds; early field on session logs.
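
A minimal sketch of the threshold arithmetic described in the list above, for reviewers who want to check the rounding. The function names `early_min_group_sigs` and `early_threshold_met` are illustrative and may not match the final code; the formula `ceil(2 * N / (3 * C))` and the "C == 0 never triggers" rule are taken from the description.

```rust
/// Smallest size the largest current-slot gossip group must reach before
/// aggregation may start early: ceil(2 * N / (3 * C)).
/// Returns `None` when there are no committees, so the trigger never fires.
/// Note: `2 * validator_count` is assumed not to overflow u64 in this sketch.
fn early_min_group_sigs(validator_count: u64, committee_count: u64) -> Option<u64> {
    if committee_count == 0 {
        return None;
    }
    // div_ceil rounds up, so the threshold is never below a true 2/3.
    Some((2 * validator_count).div_ceil(3 * committee_count))
}

/// True when the largest group already holds enough signatures.
fn early_threshold_met(max_group_count: u64, validator_count: u64, committee_count: u64) -> bool {
    match early_min_group_sigs(validator_count, committee_count) {
        Some(min) => max_group_count >= min,
        None => false,
    }
}
```

For example, with N = 10 and C = 1 the threshold is 7 (floor division would have given 6), and with N = 100 and C = 4 it is 17.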

Accepted trade-offs

  • No top-up session: signatures arriving after the early snapshot are dropped for that slot.
  • Deadline lands on the interval-3 boundary for a normal session (removes the old 50 ms publish margin); it only stops new jobs from starting, so a job mid-proof still finishes and publishes.
  • Once-per-slot latch has a documented hole only under a corrupted validator registry: if the snapshot yields no jobs (no signer pubkey resolves) no session is installed and the check retries on later inserts as no-ops.
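
To make the deadline trade-off concrete, here is a small sketch of where the deadline falls within the slot. It assumes an 800 ms interval (consistent with T2 = 1.6 s in the measurements below); `deadline_offset_in_slot` is a hypothetical helper, not a function from this PR.

```rust
use std::time::Duration;

// Assumed values: interval length inferred from T2 = 1.6 s, deadline from the PR text.
const INTERVAL: Duration = Duration::from_millis(800);
const AGGREGATION_DEADLINE: Duration = Duration::from_millis(800);

/// Offset of the aggregation deadline from the start of the slot, given how
/// far before T2 the session started (zero for a normal interval-2 session).
/// Panics if `lead_before_t2` exceeds 2 * INTERVAL; the real window is far smaller.
fn deadline_offset_in_slot(lead_before_t2: Duration) -> Duration {
    let t2 = INTERVAL * 2;
    let session_start = t2 - lead_before_t2;
    session_start + AGGREGATION_DEADLINE
}
```

A normal session (lead 0) gets its deadline at 2400 ms, which is the interval-3 boundary. A session started at window open (lead 600 ms) gets it at 1800 ms, i.e. T2 + 200 ms.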

Testing

make fmt / make lint / make test all green; blockchain lib tests (40) pass.

Initial A/B on a local 4-node devnet (1 aggregator, fresh genesis each, ~75–81 slots), baseline main vs this branch. Measured with the initial 400 ms window; the publish-alignment and chain-health conclusions are window-independent (aggregates always publish at T2; a wider window only shifts the early-start lead distribution).

| Metric (slots ≥5) | Baseline | This branch |
| --- | --- | --- |
| Aggregate publish offset (aggregator, p50) | 1.744 s | 1.602 s (−142 ms, on T2 = 1.6 s) |
| Publish spread (p90 − min) | 56 ms | 2 ms |
| Aggregate arrival offset (receivers, p50) | 1.776 s | 1.647 s (−129 ms) |
| Aggregation compute (p50) | 141.9 ms | 145.8 ms (unchanged) |
| Block attestation_count (p50 / max) | 1 / 2 | 1 / 2 |
| Finalization (head − finalized, all nodes) | gap 3 | gap 3 |
| WARN / ERROR / panic | 0 / 0 / 0 | 0 / 0 / 0 |

Early path fired on every eligible slot, hold-till-T2 flushes present, zero stale drops, zero prior-worker-still-running warnings.

Caveat: short single-host run; validates the timing mechanism and confirms no regression, not longevity/contention.

Review feedback addressed

  • AggregateProduced fenced by session_id, so a delayed publish from a superseded session can't be applied to the current one.
  • Guarded the threshold against a zero committee count (never triggers instead of dividing by zero).
  • Added a const_assert that EARLY_AGGREGATION_WINDOW_MS fits within one interval.
  • The publish-delay mechanism moved into the worker (a timed send_after on the produced message); the separate flush-timer message and its slot-fencing were removed as no longer needed.
  • Reverted an unrelated no-op build_swarm subnet-computation refactor to keep the diff focused.

github-actions Bot commented Jul 2, 2026


🤖 Kimi Code Review

This PR implements early aggregation start with proper threshold checks, publish alignment, and clean plumbing. The logic is sound and follows the actor model patterns established in the codebase.

Correctness & Safety

  • Line 53-55 (crates/blockchain/src/aggregation.rs): The early window check ms_into_slot >= t2_offset - EARLY_AGGREGATION_WINDOW_MS is safe from underflow because 2 * MILLISECONDS_PER_INTERVAL (1600) > 400, but consider making this explicit with saturating_sub if these constants ever become configurable.
  • Line 418-420 (crates/blockchain/src/lib.rs): Good guard against zero attestation_committee_count in the early_aggregation_expected_sigs calculation. The modulo operation in compute_subscription_subnets could still panic if called with zero, but this matches existing behavior in build_swarm.
  • Line 1099-1100 (crates/blockchain/src/lib.rs): The stale aggregate cleanup using std::mem::take with warning log is the right approach for handling clock skew or delayed flush timers.
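
One way the explicit-underflow suggestion above could look, as a sketch only. It uses `checked_sub` rather than `saturating_sub` so that a misconfigured window disables the early path instead of silently widening it; that choice is an assumption of this example, as is the function name `in_early_window`. The window value is the 600 ms from the PR description (this review was written against 400 ms).

```rust
const MILLISECONDS_PER_INTERVAL: u64 = 800;
const EARLY_AGGREGATION_WINDOW_MS: u64 = 600;

/// True when `ms_into_slot` lies inside the half-open early window [T2 - window, T2).
fn in_early_window(ms_into_slot: u64) -> bool {
    let t2_offset = 2 * MILLISECONDS_PER_INTERVAL;
    // If the window were ever larger than T2's offset, treat it as "never open"
    // instead of underflowing.
    let Some(window_open) = t2_offset.checked_sub(EARLY_AGGREGATION_WINDOW_MS) else {
        return false;
    };
    (window_open..t2_offset).contains(&ms_into_slot)
}
```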

Performance

  • Line 1471 (crates/storage/src/store.rs): max_gossip_group_count_for_slot iterates the entire gossip buffer under lock. Because it is called on every gossip insert inside the early window, it could contend with signature insertion. Consider documenting the expected buffer size or confirming via the devnet metrics that this is acceptable.
  • Line 436 (crates/blockchain/src/lib.rs): The (0..validator_count).filter(...) loop at spawn time is O(n) over the validator set. Acceptable since it runs once at startup, but note that head_state().validators.len() clones the full state (as acknowledged in comments).

Code Quality

  • Line 316-322 (crates/blockchain/src/lib.rs): The already_started check in interval 2 correctly prevents double-starting when the early trigger fired, leveraging the fact that current_aggregation persists after worker completion.
  • Line 1053-1055 (crates/blockchain/src/lib.rs): The publish_aggregate helper properly isolates the P2P publish logic. Consider whether the inspect_err should increment a metrics counter for failed publishes (existing pattern elsewhere).
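
The once-per-slot latch referred to above can be sketched as follows. `Server`, `AggregationSession`, and `start_session_once` are simplified stand-ins invented for this example; only the `current_aggregation.session_id == slot` comparison comes from the PR.

```rust
/// Simplified stand-in for the per-slot session record.
struct AggregationSession {
    session_id: u64,
}

/// Simplified stand-in for the actor state.
struct Server {
    current_aggregation: Option<AggregationSession>,
}

impl Server {
    /// Starts a session for `slot` unless one is already recorded for it.
    /// Returns true when a new session was installed.
    fn start_session_once(&mut self, slot: u64) -> bool {
        let already_started = self
            .current_aggregation
            .as_ref()
            .is_some_and(|s| s.session_id == slot);
        if already_started {
            // Either the early trigger or the interval-2 tick got here first.
            return false;
        }
        self.current_aggregation = Some(AggregationSession { session_id: slot });
        true
    }
}
```

Because the record persists after the worker finishes, the interval-2 tick for the same slot sees the matching `session_id` and skips.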

Nits

  • Line 60 (crates/blockchain/src/aggregation.rs): early_threshold_met could use a doc example showing the 2/3 math (e.g., expected=3, max_group=2 returns true).
  • Line 228 (bin/ethlambda/src/main.rs): Good extraction of compute_subscription_subnets ensuring the p2p subscription set and the aggregation expected-count calculation use identical logic.

Consensus Considerations

The 800 ms deadline change (from 750 ms) removes the previous 50 ms propagation margin. The comment correctly notes this is acceptable because the deadline only stops new jobs, not mid-proof work. Verify in the devnet run (Task 9) that proof completion + publish doesn't regularly overrun into interval 3.

The publish alignment mechanism (buffering aggregates produced before T2) correctly prevents premature gossip while allowing early computation. The flush timer scheduled via send_after uses monotonic time, so wall-clock skew won't cause premature publication.

No blocking issues found. The implementation matches the design spec and maintains backward compatibility (interval-2 fallback path remains intact).


Automated review by Kimi (Moonshot AI) · kimi-k2.5 · custom prompt

github-actions Bot commented Jul 2, 2026


🤖 Codex Code Review

Findings

  1. crates/blockchain/src/aggregation.rs, crates/blockchain/src/lib.rs, crates/blockchain/src/lib.rs: FlushAggregatePublishes is not fenced by session_id, while pending_aggregate_publishes is a single global buffer. If an old flush timer fires late, it will drain and publish whatever is currently buffered, including aggregates from the next slot’s early session. That violates the new “do not gossip before interval 2” rule and can leak next-slot aggregates early. The flush message needs the same slot/session fencing as AggregationDeadline, and the buffered publishes need to be keyed or tagged by slot so stale timers cannot flush a newer session.

Aside from that race, the early-start threshold/latch flow looks internally consistent.

I couldn’t run the requested Rust verification in this environment because cargo/rustup failed to create temp files under a read-only home directory, so this review is static only.


Automated review by OpenAI Codex · gpt-5.4 · custom prompt

github-actions Bot commented Jul 2, 2026


🤖 Claude Code Review

Review: feat/early-aggregation-start (PR 487)

Overall this is a well-designed, carefully documented change. The timing math, session-fencing (session_id == slot latch), and time-based (rather than flag-based) publish-buffering decision in AggregateProduced are all sound — I traced through the interval-1 timer scheduling, the per-insert trigger, the early/normal session merge, and the flush-buffer logic and didn't find a correctness or race-condition bug in the hot path. A few points worth addressing:

Minor issues

  1. crates/blockchain/src/lib.rs:441 (start_aggregation_session): coverage::emit_agg_start_new_coverage now runs on every call to start_aggregation_session, including the documented "no-op" retries that occur when the snapshot is empty (the latch-hole case, lib.rs:443-448, where current_aggregation is never installed and the interval-2 tick or a later gossip insert retries). Since the underlying metric is a gauge (cov_recordset_..., not a counter), repeated calls aren't incorrect, just wasted work in the pathological case. Worth a one-line comment noting this is intentional, since it's easy to mistake for a bug on future read-through.

  2. crates/blockchain/src/aggregation.rs:60 (early_aggregation_slot): t2_offset - EARLY_AGGREGATION_WINDOW_MS will underflow (panic in debug builds) if EARLY_AGGREGATION_WINDOW_MS is ever increased to ≥ 2 * MILLISECONDS_PER_INTERVAL, or if MILLISECONDS_PER_INTERVAL - EARLY_AGGREGATION_WINDOW_MS at lib.rs:353 goes negative. Both are safe today (400 < 1600, 400 < 800) but there's no const_assert guarding the relationship between the two constants. A static_assertions-style check (or a debug_assert at startup) would make this invariant self-enforcing instead of relying on reviewers noticing it.

  3. crates/blockchain/src/lib.rs:190-200 (early_aggregation_expected_sigs) — computed once at spawn from store.head_state().validators.len(), under the explicit assumption the validator registry is static for this node's lifetime. That's reasonable for the current lean-consensus model (no deposit processing yet per the docs), but if validator-set growth is ever added, this denominator silently goes stale and the 2/3 threshold drifts (an undercount would make the early trigger too eager, not just less accurate). Given the PR already documents the "hot-standby model" assumption for subnet freezing, it'd be worth adding the same explicit caveat here so it isn't missed when deposits land.

  4. No unit tests for the new pure functions (interval2_boundary_ms, early_aggregation_slot, early_threshold_met in aggregation.rs). The plan doc states this was a deliberate scope decision ("no new unit tests… final validation is a devnet run"), but these are exactly the kind of boundary-condition-heavy (window edges, checked_sub, >=/<) functions that are cheap to unit test and easy to silently regress later (e.g., an off-by-one on the < t2_offset vs <= t2_offset boundary would only show up as a subtle timing shift on devnet, not a hard failure). Given the accepted-trade-offs section already flags timing precision as important, I'd push back gently on skipping tests here even though the rest of the validation plan is fine.
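
As an illustration of items 2 and 4, a compile-time guard plus the two subtractions it protects could look like the sketch below. It uses a plain `const` assertion from the standard language rather than the static_assertions crate. The helper names, the `<=` bound, and the 600 ms window value (from the PR description) are assumptions of this example.

```rust
const MILLISECONDS_PER_INTERVAL: u64 = 800;
const EARLY_AGGREGATION_WINDOW_MS: u64 = 600;

// Fails the build if the window no longer fits within one interval, so neither
// subtraction below can underflow.
const _: () = assert!(
    EARLY_AGGREGATION_WINDOW_MS <= MILLISECONDS_PER_INTERVAL,
    "early aggregation window must fit within one interval"
);

/// Milliseconds into the slot at which the early window opens (T2 - window).
const fn early_window_open_ms() -> u64 {
    2 * MILLISECONDS_PER_INTERVAL - EARLY_AGGREGATION_WINDOW_MS
}

/// Delay from the interval-1 tick until the window opens.
const fn timer_delay_from_interval1_ms() -> u64 {
    MILLISECONDS_PER_INTERVAL - EARLY_AGGREGATION_WINDOW_MS
}
```

Small pure functions like these are cheap to cover with boundary tests, for example asserting that the window opens at 1000 ms and that the timer delay is 200 ms.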

Things that look correct and worth calling out positively

  • The publish-buffering decision in AggregateProduced (lib.rs:1213) re-checks unix_now_ms() < t2_ms at handling time rather than trusting a captured "early" flag — this correctly handles the case where the AggregateProduced message is delayed past the FlushAggregatePublishes timer, avoiding both a stuck-buffer bug and a double-publish.
  • Session fencing (current != Some(msg.session_id) in the AggregateProduced handler) was already in place pre-PR and correctly protects the new buffering logic from stale-worker messages after a session is superseded.
  • The AGGREGATION_DEADLINE change (750ms→800ms, now measured from session start rather than a fixed interval-2 anchor) keeps the worst-case deadline pinned to the interval-3 boundary regardless of how early a session starts — early sessions get more runway, never less, which matches the stated goal.
  • compute_subscription_subnets extraction is a clean, behavior-preserving refactor — confirmed main.rs and build_swarm now call the exact same helper with the same inputs (validator_ids, attestation_committee_count, options.is_aggregator), so the "single source of truth" claim in the PR description holds.
  • Lock usage in store.rs (max_gossip_group_count_for_slot) is a single, non-nested Mutex acquisition consistent with existing methods — no new deadlock risk.

No blocking issues found; the items above are suggestions rather than must-fix defects.


Automated review by Claude (Anthropic) · sonnet · custom prompt

greptile-apps Bot commented Jul 2, 2026

Contributor

Greptile Summary

This PR introduces an early-aggregation trigger that allows the XMSS proof worker to start up to 600 ms before the interval-2 boundary when a 2/3 quorum of expected gossip signatures has arrived, giving the expensive leanVM computations more wall-clock runway before block-building consumes the proofs at interval 4.

  • Early session start: a one-shot timer at interval-1 and a per-insert check in NewAttestation call maybe_start_early_aggregation; the started session is the slot's only session, and the interval-2 tick skips if one is already present.
  • Publish alignment: early-produced aggregates are held via send_after until the T2 wall-clock instant before being forwarded to gossip, matching the pre-PR behavior for non-early sessions.
  • New metrics (lean_aggregation_early_starts_total, lean_aggregation_early_start_lead_seconds) and an early field on session-done log events enable observability of the feature.
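
The publish-alignment step summarized above can be sketched with the standard library alone. `interval2_boundary` and `publish_delay` are hypothetical helper names, and the slot and interval lengths are passed in as parameters because this page does not state the slot length. The actual worker hands the delay to `send_after`, which is not reproduced here.

```rust
use std::time::{Duration, SystemTime};

/// Wall-clock instant of the interval-2 boundary for `slot`:
/// genesis + slot * SLOT + 2 * INTERVAL.
fn interval2_boundary(genesis: SystemTime, slot: u64, slot_ms: u64, interval_ms: u64) -> SystemTime {
    genesis + Duration::from_millis(slot * slot_ms + 2 * interval_ms)
}

/// How long to hold an aggregate before delivering it.
/// `None` means the boundary has been reached or passed, so send immediately.
fn publish_delay(publish_at: SystemTime, now: SystemTime) -> Option<Duration> {
    // duration_since returns Err when `now` is later than `publish_at`,
    // which is exactly the send-immediately case.
    publish_at
        .duration_since(now)
        .ok()
        .filter(|delay| !delay.is_zero())
}
```

A normal interval-2 session starts at T2, so every aggregate it produces takes the `None` branch and is sent without delay.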

Confidence Score: 5/5

The early-aggregation path is well-guarded and the change is safe to merge; the single finding is a metric accuracy nit in a documented edge case.

The early-aggregation path is protected by a time-window check, a once-per-slot latch, and session_id fencing on delayed messages — no double-starts, no premature gossip, no cross-slot leakage. The only finding is a metric overcounting issue in the documented latch-hole edge case (corrupted validator registry), which has no effect on aggregation correctness or chain consensus.

crates/blockchain/src/lib.rs — the early-metric emission ordering in start_aggregation_session is the one area worth a second look.

Important Files Changed

| Filename | Overview |
| --- | --- |
| crates/blockchain/src/aggregation.rs | Adds publish-alignment logic to the worker via send_after keyed on publish_at (the T2 wall-clock instant), new message types for early-check and deadline handling, and the EARLY_AGGREGATION_WINDOW_MS const with a compile-time bounds assertion. |
| crates/blockchain/src/lib.rs | Core orchestration changes: interval-1 schedules EarlyAggregationCheck, interval-2 skips when a session is already present, maybe_start_early_aggregation checks the time window and threshold, and start_aggregation_session now computes publish_at and emits coverage/metrics. Early metric and log fire before the snapshot check, which can produce a misleading count when no snapshot is available. |
| crates/blockchain/src/metrics.rs | Adds LEAN_AGGREGATION_EARLY_STARTS_TOTAL counter and LEAN_AGGREGATION_EARLY_START_LEAD_SECONDS histogram, both registered in init(); straightforward metrics additions. |
| crates/storage/src/store.rs | Adds max_gossip_group_count_for_slot to Store backed by a single-lock scan over the gossip-signature buffer; cheap and correct for the per-insert threshold check. |

Sequence Diagram

%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
    participant T as Timer
    participant A as BlockChainServer
    participant W as Worker
    participant P as P2P

    A->>T: schedule EarlyAggregationCheck in 200ms (interval-1 tick)
    T->>A: EarlyAggregationCheck fires at T2-600ms
    A->>A: maybe_start_early_aggregation
    alt threshold met
        A->>W: spawn run_aggregation_worker publish_at equals T2
        A->>T: schedule AggregationDeadline in 800ms
        W->>W: aggregate_job
        W->>T: send_after delay_to_T2 AggregateProduced
        T->>A: AggregateProduced at T2
        A->>P: publish_aggregated_attestation
        T->>A: AggregationDeadline at T2 plus 200ms
        A->>W: cancel
    end
    A->>A: interval-2 tick skips if session already present

Reviews (2): Last reviewed commit: "Merge branch 'main' into feat/early-aggr..." | Re-trigger Greptile

Comment thread crates/net/p2p/src/lib.rs Outdated
Comment on lines +194 to +203
pub fn compute_subscription_subnets(
    validator_ids: &[u64],
    attestation_committee_count: u64,
    is_aggregator: bool,
    aggregate_subnet_ids: Option<&[u64]>,
) -> HashSet<u64> {
    let mut subnets: HashSet<u64> = validator_ids
        .iter()
        .map(|vid| vid % attestation_committee_count)
        .collect();

Contributor

P2 The new public function panics on a zero divisor (Rust's % operator panics when its right-hand side is 0, which is a remainder-by-zero panic rather than an integer overflow) if attestation_committee_count == 0 and validator_ids is non-empty. This is the same latent issue that existed inline in build_swarm, but it is now exposed as a callable public API without a documented precondition. Adding an early guard keeps the semantics identical to a zero-committee node (no validators belong to any subnet) while making the contract explicit.

Suggested change

```suggestion
pub fn compute_subscription_subnets(
    validator_ids: &[u64],
    attestation_committee_count: u64,
    is_aggregator: bool,
    aggregate_subnet_ids: Option<&[u64]>,
) -> HashSet<u64> {
    let mut subnets: HashSet<u64> = if attestation_committee_count == 0 {
        HashSet::new()
    } else {
        validator_ids
            .iter()
            .map(|vid| vid % attestation_committee_count)
            .collect()
    };
```

Comment thread crates/blockchain/src/lib.rs Outdated
Comment on lines 1221 to 1236
impl Handler<FlushAggregatePublishes> for BlockChainServer {
    async fn handle(&mut self, _msg: FlushAggregatePublishes, _ctx: &Context<Self>) {
        let pending = std::mem::take(&mut self.pending_aggregate_publishes);
        if pending.is_empty() {
            return;
        }
        let session_id = self.current_aggregation.as_ref().map(|s| s.session_id);
        info!(
            session_id,
            count = pending.len(),
            "Publishing aggregates held back until the interval-2 boundary"
        );
        for aggregate in pending {
            self.publish_aggregate(aggregate);
        }
    }

Contributor

P2 FlushAggregatePublishes carries no slot or session identifier, so it cannot distinguish which slot's buffer to drain. In normal operation this is fine — the timer fires within 400 ms and the next session is ≥4 s away. However, if the message is delayed past the start of the next session, start_aggregation_session drains the stale buffer (correct) but any aggregates the new session subsequently buffers before the stale flush message is processed would be published prematurely, violating the publish-alignment invariant. Adding a slot: u64 field and checking it against current_aggregation.session_id before draining would eliminate this window entirely.
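
A rough sketch of the fencing proposed in this comment, using simplified stand-in types: `Server`, its fields, and `handle_flush` are invented for illustration, and aggregates are represented as plain strings. Later commits in this PR removed the flush message altogether, so this only illustrates the suggestion as it stood at this point in the review.

```rust
/// Flush message tagged with the slot whose session scheduled it.
struct FlushAggregatePublishes {
    slot: u64,
}

/// Simplified stand-in for the actor state.
struct Server {
    current_session_id: Option<u64>,
    pending: Vec<String>,
    published: Vec<String>,
}

impl Server {
    fn handle_flush(&mut self, msg: FlushAggregatePublishes) {
        // A timer left over from an older session must not drain the buffer
        // that now belongs to a newer session.
        if self.current_session_id != Some(msg.slot) {
            return;
        }
        for aggregate in std::mem::take(&mut self.pending) {
            self.published.push(aggregate);
        }
    }
}
```

With the current session at slot 9, a late flush for slot 8 is ignored and the slot-9 aggregates stay buffered until their own flush arrives.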

…div-by-zero and window underflow

Addresses PR #487 review feedback:
- FlushAggregatePublishes now carries its session slot and drains only when
  it matches current_aggregation, matching AggregationDeadline's fencing. A
  flush timer delayed past the next session start can no longer publish the
  newer session's buffered aggregates before its interval-2 boundary.
- compute_subscription_subnets returns an empty set for a zero committee
  count instead of dividing by zero (defensive; callers enforce >= 1).
- const assert that EARLY_AGGREGATION_WINDOW_MS fits within one interval,
  making the no-underflow invariant self-enforcing.

Halve EARLY_AGGREGATION_WINDOW_MS (400 -> 200). Rescale the
lean_aggregation_early_start_lead_seconds histogram buckets to 0.025-0.2s
so they keep full resolution over the new window, and update the spec/plan
(also fixing an inverted deadline-bound phrasing).

Window opens at T2-600 = 200ms into interval 1, so the trigger can fire as
soon as attestations start arriving. Rescale the lead-time histogram buckets
to 0.075-0.6s to match.

@MegaRedHand MegaRedHand marked this pull request as draft July 2, 2026 20:09

Replace the per-validator subnet scan in BlockChain::spawn with the closed
form 2*N/(3*C) (two-thirds of one committee's expected votes), folding the
2/3 fraction into the precomputed threshold so early_threshold_met is now a
direct >= comparison. Drop the now-unused aggregation_subnets parameter and
its main.rs plumbing; compute_subscription_subnets is only used inside the
p2p crate now, so downgrade it to pub(crate).

…ntime

Drop the cached genesis_time_ms and early_aggregation_min_group_sigs fields
from BlockChainServer. Genesis time is read via store.config() at each use
site (matching the tick paths); the early-aggregation threshold is computed
on demand from head_state, reached only inside the early window and only
until a session starts.

…hAggregatePublishes

Move publish alignment from the actor into the aggregation worker. The worker
now receives the interval-2 boundary timestamp and, for each aggregate it
produces before that boundary, uses send_after to delay the AggregateProduced
message until the boundary (sending immediately if already past it). This
removes the FlushAggregatePublishes self-message, the pending_aggregate_publishes
buffer, and the T2 check in the AggregateProduced handler. Delivery timers are
tied to the actor's cancellation token via Context::from_ref, so they drop on
shutdown just like the old buffer did.

Type the aggregation worker's publish boundary as a SystemTime instead of a
raw u64 millisecond timestamp. The worker derives each aggregate's delay via
duration_since(SystemTime::now()), whose Err branch is exactly the
send-immediately (past-boundary) case. SystemTime (not Instant) because the
boundary is derived from genesis wall-clock time.

… worker

Inline five helpers that ended up with a single caller: compute_subscription_subnets
(back into build_swarm), interval2_boundary_ms and early_threshold_met (into their
call sites), the early_aggregation_min_group_sigs method (into the trigger), and
publish_aggregate (into the AggregateProduced handler). Keep early_aggregation_slot
(non-trivial window math, reads clearer named) and the Store/GossipSignatureBuffer
delegation pair (the file's encapsulation pattern). Also replace the worker's
match on duration_since with an if-else on the unwrapped delay.

Fold the window-detection math into maybe_start_early_aggregation (its only
caller) and drop the helper. Update the two doc comments that referenced it.

…ations

A late- or future-slot attestation cannot advance the current slot's
gossip group counts, so running the early-aggregation threshold check for
it is wasted work. Gate the attestation-driven call on the attestation
being for the store's current slot (store.time() / INTERVALS_PER_SLOT).
The window-opening timer (EarlyAggregationCheck) still fires regardless.

The subnet-computation reorganization in build_swarm was leftover churn
from the earlier extract/re-inline of compute_subscription_subnets and
produced no behavioral change (min over the deduped set equals min over
the mapped iterator; the subscription set collects identically). Restore
main's single-pass version to keep this PR focused on early aggregation.

@MegaRedHand MegaRedHand marked this pull request as ready for review July 2, 2026 21:55

The 2/3-of-expected-votes threshold used floor division, so for counts
not divisible by 3 it triggered slightly below a true 2/3. Round up with
div_ceil so the early start waits for a genuine 2/3 supermajority of one
committee's expected votes (ceil(2*N / (3*C))).

EARLY_AGGREGATION_WINDOW_MS was a bare u64 of milliseconds; make it a
Duration (EARLY_AGGREGATION_WINDOW) so its unit is carried in the type.
The timer site now subtracts Durations directly; the wall-clock window
bounds check converts once via as_millis(), and the compile-time
fits-within-one-interval assert uses the const as_millis().
Development

Successfully merging this pull request may close these issues.

Start aggregation earlier

1 participant