Skip to content
20 changes: 6 additions & 14 deletions crates/blockchain/src/block_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use std::{
use ethlambda_crypto::aggregate_proofs;
use ethlambda_state_transition::{
attestation_data_matches_chain, justified_slots_ops, process_block, process_slots,
slot_is_justifiable_after,
};
use ethlambda_types::{
ShortRoot,
Expand Down Expand Up @@ -338,11 +337,6 @@ fn entry_passes_filters(
{
return Err("target_already_justified");
}
if !is_genesis_self_vote
&& !slot_is_justifiable_after(att_data.target.slot, projected_finalized_slot)
{
return Err("target_not_justifiable");
}
Ok(())
}

Expand Down Expand Up @@ -384,16 +378,14 @@ fn score_entry(
let total = prior_count + new_voters.len();
let crosses_2_3 = 3 * total >= 2 * validator_count;

// 3SF-mini finalization requires the source to lie past the finalized
// boundary (a source at or behind it is already final and must not
// re-finalize) and no slot strictly between source.slot and target.slot to
// still be justifiable (so source and target are consecutive justified
// checkpoints in the projected post-state). Mirrors `try_finalize` in the
// state transition.
// The simple BFT finality condition finalizes the source when it lies past
// the finalized boundary (a source at or behind it is already final and must
// not re-finalize) and the target is its immediate successor, so the two are
// consecutive justified checkpoints in the projected post-state. Mirrors
// `try_finalize` in the state transition.
let finalizes = crosses_2_3
&& att_data.source.slot > projected_finalized_slot
&& (att_data.source.slot + 1..att_data.target.slot)
.all(|s| !slot_is_justifiable_after(s, projected_finalized_slot));
&& att_data.source.slot + 1 == att_data.target.slot;

let tier = if is_genesis_self_vote(att_data) || !crosses_2_3 {
Tier::Build
Expand Down
17 changes: 13 additions & 4 deletions crates/blockchain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::{HashMap, HashSet, VecDeque};
use std::time::{Duration, Instant, SystemTime};

use ethlambda_network_api::{BlockChainToP2PRef, InitP2P};
use ethlambda_state_transition::is_proposer;
use ethlambda_state_transition::{is_heartbeat_committee_member, is_proposer};
use ethlambda_storage::{ALL_TABLES, Store};
use ethlambda_types::{
ShortRoot,
Expand Down Expand Up @@ -433,9 +433,18 @@ impl BlockChainServer {

// Publish to gossip network
if let Some(ref p2p) = self.p2p {
let _ = p2p.publish_attestation(signed_attestation).inspect_err(
|err| error!(%slot, %validator_id, %err, "Failed to publish attestation"),
);
let _ = p2p
.publish_attestation(signed_attestation.clone())
.inspect_err(
|err| error!(%slot, %validator_id, %err, "Failed to publish attestation"),
);
let head_state = self.store.head_state();
let num_validators = head_state.validators.len() as u64;
if is_heartbeat_committee_member(validator_id, slot, num_validators) {
let _ = p2p.publish_heartbeat_attestation(signed_attestation).inspect_err(
|err| error!(%slot, %validator_id, %err, "Failed to publish attestation"),
);
}
info!(%slot, %validator_id, "Published attestation");
}
}
Expand Down
41 changes: 17 additions & 24 deletions crates/blockchain/src/store.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::collections::HashSet;

use ethlambda_state_transition::{is_proposer, slot_is_justifiable_after};
use ethlambda_state_transition::{is_heartbeat_committee_member, is_proposer};
use ethlambda_storage::{ForkCheckpoints, Store};
use ethlambda_types::{
ShortRoot,
Expand Down Expand Up @@ -39,10 +39,10 @@ fn accept_new_attestations(store: &mut Store, log_tree: bool) {
/// fork choice tree to the terminal.
pub fn update_head(store: &mut Store, log_tree: bool) {
let blocks = store.get_live_chain();
let attestations = store.extract_latest_known_attestations();
let attestations = store.get_last_slot_votes();
let old_head = store.head();
let (new_head, weights) = ethlambda_fork_choice::compute_lmd_ghost_head(
store.latest_justified().root,
store.safe_target(),
&blocks,
&attestations,
0,
Expand Down Expand Up @@ -108,13 +108,11 @@ pub fn update_head(store: &mut Store, log_tree: bool) {
/// evidence even when live participation has collapsed: exactly the failure
/// mode safe target is supposed to prevent. See leanSpec PR #680.
fn update_safe_target(store: &mut Store) {
let head_state = store.get_state(&store.head()).expect("head state exists");
let num_validators = head_state.validators.len() as u64;

let min_target_score = (num_validators * 2).div_ceil(3);

let blocks = store.get_live_chain();
let attestations = store.extract_latest_new_attestations();
let attestations = store.get_last_period_votes();
// Use a 2/3 threshold of the number of voting validators
let min_target_score = (attestations.len() as u64 * 2).div_ceil(3);

let (safe_target, _weights) = ethlambda_fork_choice::compute_lmd_ghost_head(
store.latest_justified().root,
&blocks,
Expand Down Expand Up @@ -364,6 +362,12 @@ pub fn on_gossip_attestation(
}
metrics::inc_pq_sig_attestation_signatures_valid();

let num_validators = target_state.validators.len() as u64;
// If the validator is in the heartbeat committee, persist the vote for fork choice usage.
if is_heartbeat_committee_member(validator_id, attestation.data.slot, num_validators) {
store.insert_heartbeat_vote(validator_id, attestation.data.clone());
}

// Only aggregators persist the signature for later aggregation at
// interval 2. Non-aggregators drop the validated attestation — they
// still participate in the mesh so peers see the message propagate.
Expand Down Expand Up @@ -640,7 +644,10 @@ pub fn get_attestation_target(store: &Store) -> Checkpoint {
pub fn get_attestation_target_with_checkpoints(
store: &Store,
justified: Checkpoint,
finalized: Checkpoint,
// Unused under the simple BFT finality condition (every slot is justifiable,
// so the target no longer needs a justifiability walk-back). Kept on the
// signature pending the finality redesign.
_finalized: Checkpoint,
) -> Checkpoint {
// Start from current head
let mut target_block_root = store.head();
Expand Down Expand Up @@ -668,20 +675,6 @@ pub fn get_attestation_target_with_checkpoints(
}
}

let finalized_slot = finalized.slot;

// Ensure target is in justifiable slot range
//
// Walk back until we find a slot that satisfies justifiability rules
// relative to the latest finalized checkpoint.
while target_header.slot > finalized_slot
&& !slot_is_justifiable_after(target_header.slot, finalized_slot)
{
target_block_root = target_header.parent_root;
target_header = store
.get_block_header(&target_block_root)
.expect("parent block exists");
}
// Guard: clamp target to justified (not in the spec).
//
// The spec's walk-back has no lower bound, so it can produce attestations
Expand Down
67 changes: 18 additions & 49 deletions crates/blockchain/state_transition/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ use tracing::{info, warn};
pub mod justified_slots_ops;
pub mod metrics;

pub const HEARTBEAT_COMMITTEE_SIZE: usize = 4;

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("target slot {target_slot} is in the past (current is {current_slot})")]
Expand Down Expand Up @@ -231,6 +233,21 @@ pub fn is_proposer(validator_index: u64, slot: u64, num_validators: u64) -> bool
current_proposer(slot, num_validators) == Some(validator_index)
}

/// Check if a validator is part of the heartbeat committee for a given slot.
///
/// The heartbeat committee is formed by the proposer and the next N validators.
pub fn is_heartbeat_committee_member(validator_index: u64, slot: u64, num_validators: u64) -> bool {
let Some(proposer) = current_proposer(slot, num_validators) else {
return false;
};
for i in 0..HEARTBEAT_COMMITTEE_SIZE as u64 {
if validator_index == (proposer + i) % num_validators {
return true;
}
}
false
}

/// Apply attestations and update justification/finalization
/// according to the Lean Consensus 3SF-mini rules.
fn process_attestations(
Expand Down Expand Up @@ -396,11 +413,6 @@ fn is_valid_vote(state: &State, data: &AttestationData) -> bool {
return false;
}

// Ensure the target falls on a slot that can be justified after the finalized one.
if !slot_is_justifiable_after(target.slot, state.latest_finalized.slot) {
return false;
}

true
}

Expand All @@ -426,9 +438,7 @@ fn try_finalize(
}

// Consider whether finalization can advance.
if ((source.slot + 1)..target.slot)
.any(|slot| slot_is_justifiable_after(slot, state.latest_finalized.slot))
{
if source.slot + 1 != target.slot {
metrics::inc_finalizations("error");
return;
}
Expand Down Expand Up @@ -537,47 +547,6 @@ pub fn attestation_data_matches_chain(
&& historical_block_hashes[head_slot] == data.head.root
}

/// Checks if the slot is a valid candidate for justification after a given finalized slot.
///
/// According to the 3SF-mini specification, a slot is justifiable if its
/// distance (`delta`) from the last finalized slot is:
/// 1. Less than or equal to 5.
/// 2. A perfect square (e.g., 9, 16, 25...).
/// 3. A pronic number (of the form x^2 + x, e.g., 6, 12, 20...).
///
/// See https://github.com/ethereum/research/blob/c003fe1c1a785797e7b53e3cbf9569b989be6e93/3sf-mini/consensus.py#L52-L54
/// for the 3SF-mini reference.
///
/// For why we have unjustifiable slots, consider that in high-latency
/// scenarios, validators may vote for many different slots, making none of them
/// reach the supermajority threshold. By having unjustifiable slots, we can
/// funnel votes towards only some slots, increasing finalization chances.
pub fn slot_is_justifiable_after(slot: u64, finalized_slot: u64) -> bool {
let Some(delta) = slot.checked_sub(finalized_slot) else {
// Candidate slot must not be before finalized slot
return false;
};
// Rule 1: The first 5 slots after finalization are always justifiable.
//
// Examples: delta = 0, 1, 2, 3, 4, 5
delta <= 5
// Rule 2: Slots at perfect square distances are justifiable.
//
// Examples: delta = 1, 4, 9, 16, 25, 36, 49, 64, ...
// Check: integer square root squared equals delta
|| delta.isqrt().pow(2) == delta
// Rule 3: Slots at pronic number distances are justifiable.
//
// Pronic numbers have the form n(n+1): 2, 6, 12, 20, 30, 42, 56, ...
// Mathematical insight: For pronic delta = n(n+1), we have:
// 4*delta + 1 = 4n(n+1) + 1 = (2n+1)^2
// Check: 4*delta+1 is an odd perfect square
|| delta
.checked_mul(4)
.and_then(|v| v.checked_add(1))
.is_some_and(|val| val.isqrt().pow(2) == val && val % 2 == 1)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
4 changes: 4 additions & 0 deletions crates/net/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ use spawned_concurrency::protocol;
pub trait BlockChainToP2P: Send + Sync {
fn publish_block(&self, block: SignedBlock) -> Result<(), ActorError>;
fn publish_attestation(&self, attestation: SignedAttestation) -> Result<(), ActorError>;
fn publish_heartbeat_attestation(
&self,
attestation: SignedAttestation,
) -> Result<(), ActorError>;
fn publish_aggregated_attestation(
&self,
attestation: SignedAggregatedAttestation,
Expand Down
28 changes: 26 additions & 2 deletions crates/net/p2p/src/gossipsub/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use super::{
attestation_subnet_topic,
},
};
use crate::{P2PServer, metrics};
use crate::{P2PServer, gossipsub::messages::HEARTBEAT_TOPIC_KIND, metrics};

pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) {
let Event::Message {
Expand Down Expand Up @@ -95,7 +95,10 @@ pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) {
);
}
}
Some(kind) if kind.starts_with(ATTESTATION_SUBNET_TOPIC_PREFIX) => {
Some(kind)
if kind.starts_with(ATTESTATION_SUBNET_TOPIC_PREFIX)
|| kind == HEARTBEAT_TOPIC_KIND =>
{
info!(kind = "attestation", peer_count, "P2P message received");
let compressed_len = message.data.len();
let Ok(uncompressed_data) = decompress_message(&message.data)
Expand Down Expand Up @@ -196,6 +199,27 @@ pub async fn publish_block(server: &mut P2PServer, signed_block: SignedBlock) {
);
}

pub async fn publish_heartbeat_attestation(server: &mut P2PServer, attestation: SignedAttestation) {
let slot = attestation.data.slot;
let validator = attestation.validator_id;

// Encode to SSZ
let ssz_bytes = attestation.to_ssz();

// Compress with raw snappy
let compressed = compress_message(&ssz_bytes);

// Publish to gossipsub
server
.swarm_handle
.publish(server.heartbeat_topic.clone(), compressed);
info!(
%slot,
validator,
"Published heartbeat attestation to gossipsub"
);
}

pub async fn publish_aggregated_attestation(
server: &mut P2PServer,
attestation: SignedAggregatedAttestation,
Expand Down
11 changes: 11 additions & 0 deletions crates/net/p2p/src/gossipsub/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ pub const FORK_DIGEST: &str = "12345678";

/// Topic kind for block gossip
pub const BLOCK_TOPIC_KIND: &str = "block";

/// Topic kind for heartbeat gossip
pub const HEARTBEAT_TOPIC_KIND: &str = "heartbeat";

/// Topic kind prefix for per-committee attestation subnets.
///
/// Full topic format: `/leanconsensus/{FORK_DIGEST}/attestation_{subnet_id}/ssz_snappy`
Expand Down Expand Up @@ -38,3 +42,10 @@ pub fn attestation_subnet_topic(subnet_id: u64) -> libp2p::gossipsub::IdentTopic
"/leanconsensus/{FORK_DIGEST}/{ATTESTATION_SUBNET_TOPIC_PREFIX}_{subnet_id}/ssz_snappy"
))
}

/// Build a heartbeat gossipsub topic.
pub fn heartbeat_topic() -> libp2p::gossipsub::IdentTopic {
libp2p::gossipsub::IdentTopic::new(format!(
"/leanconsensus/{FORK_DIGEST}/{HEARTBEAT_TOPIC_KIND}/ssz_snappy"
))
}
3 changes: 2 additions & 1 deletion crates/net/p2p/src/gossipsub/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ mod messages;
pub use encoding::decompress_message;
pub use handler::{
handle_gossipsub_message, publish_aggregated_attestation, publish_attestation, publish_block,
publish_heartbeat_attestation,
};
pub use messages::{aggregation_topic, attestation_subnet_topic, block_topic};
pub use messages::{aggregation_topic, attestation_subnet_topic, block_topic, heartbeat_topic};
Loading
Loading