⚠ This page is served via a proxy. Original site: https://github.com
This service does not collect credentials or authentication data.
Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ export-sample-test-data:

.PHONY: docs
docs:
cargo docs --document-private-items --exclude rollup-node-chain-orchestrator
cargo +$(NIGHTLY_TOOLCHAIN) docs --document-private-items --exclude rollup-node-chain-orchestrator

.PHONY: pr
pr: lint test docs
Expand Down
2 changes: 2 additions & 0 deletions crates/chain-orchestrator/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ use scroll_network::NewBlockWithPeer;
pub enum ChainOrchestratorEvent {
/// A received block failed the consensus checks.
BlockFailedConsensusChecks(B256, PeerId),
/// A finalized block was received from a peer.
L2FinalizedBlockReceived(B256, PeerId),
/// A new block has been received from the network but we have insufficient data to process it
/// due to being in optimistic mode.
InsufficientDataForReceivedBlock(B256),
Expand Down
11 changes: 11 additions & 0 deletions crates/chain-orchestrator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -937,6 +937,17 @@ impl<
) -> Result<Option<ChainOrchestratorEvent>, ChainOrchestratorError> {
tracing::debug!(target: "scroll::chain_orchestrator", block_hash = ?block_with_peer.block.header.hash_slow(), block_number = ?block_with_peer.block.number, peer_id = ?block_with_peer.peer_id, "Received new block from peer");

// Check we are not handling a finalized block.
if block_with_peer.block.header.number <= self.engine.fcs().finalized_block_info().number {
self.network
.handle()
.block_import_outcome(BlockImportOutcome::finalized_block(block_with_peer.peer_id));
return Ok(Some(ChainOrchestratorEvent::L2FinalizedBlockReceived(
block_with_peer.block.header.hash_slow(),
block_with_peer.peer_id,
)));
}

if let Err(err) =
self.consensus.validate_new_block(&block_with_peer.block, &block_with_peer.signature)
{
Expand Down
7 changes: 7 additions & 0 deletions crates/network/src/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ pub struct BlockImportOutcome {
}

impl BlockImportOutcome {
/// Creates a new `BlockImportOutcome` instance for a finalized block with the given peer ID.
pub fn finalized_block(peer: PeerId) -> Self {
Self { peer, result: Err(BlockImportError::L2FinalizedBlockReceived(peer)) }
}

/// Creates a new `BlockImportOutcome` instance for an invalid block with the given peer ID.
pub fn invalid_block(peer: PeerId) -> Self {
Self { peer, result: Err(BlockImportError::Validation(BlockValidationError::InvalidBlock)) }
Expand Down Expand Up @@ -56,6 +61,8 @@ pub enum BlockImportError {
Consensus(ConsensusError),
/// An error occurred during block validation.
Validation(BlockValidationError),
/// A finalized block was received from a peer.
L2FinalizedBlockReceived(PeerId),
}

/// A consensus related error that can occur during block import.
Expand Down
103 changes: 74 additions & 29 deletions crates/network/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ use reth_tokio_util::{EventSender, EventStream};
use rollup_node_primitives::{sig_encode_hash, BlockInfo};
use scroll_alloy_hardforks::ScrollHardforks;
use scroll_wire::{
NewBlock, ScrollWireConfig, ScrollWireEvent, ScrollWireManager, ScrollWireProtocolHandler,
LRU_CACHE_SIZE,
NewBlock, PeerState, ScrollWireConfig, ScrollWireEvent, ScrollWireManager,
ScrollWireProtocolHandler, LRU_CACHE_SIZE,
};
use std::{
pin::Pin,
Expand Down Expand Up @@ -184,12 +184,26 @@ impl<
// Compute the block hash.
let hash = block.block.hash_slow();

// Filter the peers that have not seen this block hash.
// Filter the peers that have not seen this block hash via either protocol.
// We iterate over all connected scroll-wire peers.
let peers: Vec<FixedBytes<64>> = self
.scroll_wire
.state()
.iter()
.filter_map(|(peer_id, blocks)| (!blocks.contains(&hash)).then_some(*peer_id))
.connected_peers()
.filter_map(|peer_id| {
// Check if peer has seen this block via any protocol
let has_seen = self
.scroll_wire
.peer_state()
.get(peer_id)
.is_some_and(|state| state.has_seen(&hash));

// Only announce if peer hasn't seen this block
if !has_seen {
Some(*peer_id)
} else {
None
}
})
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if we should change the logic below.

If we have a connection with a peer via scroll wire - only send via scroll wire. If we don't have a connection with a peer via scroll wire then send only via eth wire.

What do you think?

Copy link
Member Author

@yiweichi yiweichi Jan 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that's a good idea! The only thing is about implementation.

If we don't have a connection with a peer via scroll wire then send only via eth wire.

The implementation in my mind is that, we will need to save all none-scroll-wire peerIds here, so we can send block directly to the peer only via eth wire by inner_network_handle. But seems the trait of inner_network_handle does export method send_eth_message.

And also I'm not sure how to maintain the none-scroll-wire peerlist on RN side, how do we handle peer disconnect.

Do you have a viable implementation in your mind?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Modify this trait to implement it https://github.com/scroll-tech/reth/blob/079764e909be74219f32796342c4dd911fdd30a9/crates/net/network-api/src/block.rs#L15-L26

/// Provides a listener for new blocks on the eth wire protocol.
pub trait EthWireProvider<N: NetworkPrimitives> {
    /// Create a new eth wire block listener.
    fn eth_wire_block_listener(
        &self,
    ) -> impl Future<
        Output = Result<EventStream<NewBlockWithPeer<N::Block>>, oneshot::error::RecvError>,
    > + Send;

    /// Announce a new block to the network over the eth wire protocol.
    fn eth_wire_announce_block(&self, block: N::NewBlockPayload, hash: B256);
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking you could use something like this

        let peers = self.inner_network_handle.get_all_peers().await?;
        for peer in peers {
            if peer.capabilities.capabilities().iter().any(|cap| matches!(cap, ScrollWire)) {
                todo!("send via scroll wire")
            } else {
                todo!("send via eth wire")
            };
        }

.collect();

// TODO: remove this once we deprecate l2geth.
Expand Down Expand Up @@ -240,17 +254,31 @@ impl<
ScrollWireEvent::NewBlock { peer_id, block, signature } => {
let block_hash = block.hash_slow();
trace!(target: "scroll::network::manager", peer_id = ?peer_id, block = ?block_hash, signature = ?signature, "Received new block");

// Check if we have already received this block via scroll-wire from this peer, if
// so penalize it.
let state = self
.scroll_wire
.peer_state_mut()
.entry(peer_id)
.or_insert_with(|| PeerState::new(LRU_CACHE_SIZE));

if state.has_seen_via_scroll_wire(&block_hash) {
tracing::warn!(target: "scroll::network::manager", peer_id = ?peer_id, block = ?block_hash, "Peer sent duplicate block via scroll-wire, penalizing");
self.inner_network_handle.reputation_change(
peer_id,
reth_network_api::ReputationChangeKind::BadBlock,
);
return None;
}
// Update the state: peer has seen this block via scroll-wire
state.insert_scroll_wire(block_hash);

if self.blocks_seen.contains(&(block_hash, signature)) {
None
} else {
// Update the state of the peer cache i.e. peer has seen this block.
self.scroll_wire
.state_mut()
.entry(peer_id)
.or_insert_with(|| LruCache::new(LRU_CACHE_SIZE))
.insert(block_hash);
// Update the state of the block cache i.e. we have seen this block.
self.blocks_seen.insert((block.hash_slow(), signature));
self.blocks_seen.insert((block_hash, signature));

Some(ScrollNetworkManagerEvent::NewBlock(NewBlockWithPeer {
peer_id,
Expand Down Expand Up @@ -310,6 +338,11 @@ impl<
self.inner_network_handle
.reputation_change(peer, reth_network_api::ReputationChangeKind::BadBlock);
}
Err(BlockImportError::L2FinalizedBlockReceived(peer)) => {
trace!(target: "scroll::network::manager", peer_id = ?peer, "Block import failed - finalized block received - penalizing peer");
self.inner_network_handle
.reputation_change(peer, reth_network_api::ReputationChangeKind::BadBlock);
}
}
}

Expand Down Expand Up @@ -339,25 +372,37 @@ impl<
.and_then(|i| Signature::from_raw(&extra_data[i..]).ok())
{
let block_hash = block.hash_slow();
if self.blocks_seen.contains(&(block_hash, signature)) {

// Check if we have already received this block from this peer via eth-wire, if so,
// penalize the peer.
let state = self
.scroll_wire
.peer_state_mut()
.entry(peer_id)
.or_insert_with(|| PeerState::new(LRU_CACHE_SIZE));

if state.has_seen_via_eth_wire(&block_hash) {
tracing::warn!(target: "scroll::bridge::import", peer_id = ?peer_id, block = ?block_hash, "Peer sent duplicate block via eth-wire, penalizing");
self.inner_network_handle
.reputation_change(peer_id, reth_network_api::ReputationChangeKind::BadBlock);
return None;
}
trace!(target: "scroll::bridge::import", peer_id = %peer_id, block_hash = %block_hash, signature = %signature.to_string(), extra_data = %extra_data.to_string(), "Received new block from eth-wire protocol");
// Update the state: peer has seen this block via eth-wire
state.insert_eth_wire(block_hash);

// Update the state of the peer cache i.e. peer has seen this block.
self.scroll_wire
.state_mut()
.entry(peer_id)
.or_insert_with(|| LruCache::new(LRU_CACHE_SIZE))
.insert(block_hash);

// Update the state of the block cache i.e. we have seen this block.
self.blocks_seen.insert((block_hash, signature));
Some(ScrollNetworkManagerEvent::NewBlock(NewBlockWithPeer {
peer_id,
block,
signature,
}))
if self.blocks_seen.contains(&(block_hash, signature)) {
None
} else {
trace!(target: "scroll::bridge::import", peer_id = %peer_id, block_hash = %block_hash, signature = %signature.to_string(), extra_data = %extra_data.to_string(), "Received new block from eth-wire protocol");

// Update the state of the block cache i.e. we have seen this block.
self.blocks_seen.insert((block_hash, signature));
Some(ScrollNetworkManagerEvent::NewBlock(NewBlockWithPeer {
peer_id,
block,
signature,
}))
}
} else {
tracing::warn!(target: "scroll::bridge::import", peer_id = %peer_id, "Failed to extract signature from block extra data, penalizing peer");
self.inner_network_handle
Expand Down
2 changes: 1 addition & 1 deletion crates/scroll-wire/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ pub use config::ScrollWireConfig;

mod connection;
mod manager;
pub use manager::{ScrollWireManager, LRU_CACHE_SIZE};
pub use manager::{PeerState, ScrollWireManager, LRU_CACHE_SIZE};

mod protocol;
pub use protocol::{NewBlock, ScrollWireEvent, ScrollWireProtocolHandler};
94 changes: 77 additions & 17 deletions crates/scroll-wire/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,53 +16,114 @@ use tracing::trace;
/// The size of the LRU cache used to track blocks that have been seen by peers.
pub const LRU_CACHE_SIZE: u32 = 100;

/// Tracks block announced and received state for a peer.
#[derive(Debug)]
pub struct PeerState {
/// blocks announced to the peer
announced: LruCache<B256>,
/// blocks received via scroll-wire protocol, this is used to penalize peers that send
/// duplicate blocks via scroll-wire.
scroll_wire_received: LruCache<B256>,
/// blocks received via eth-wire protocol, this is used to penalize peers that send duplicate
/// blocks via eth-wire.
eth_wire_received: LruCache<B256>,
}

impl PeerState {
/// Creates a new `PeerBlockState` with the specified LRU cache capacity.
pub fn new(capacity: u32) -> Self {
Self {
announced: LruCache::new(capacity),
scroll_wire_received: LruCache::new(capacity),
eth_wire_received: LruCache::new(capacity),
}
}

/// Check if peer knows about this block (either received or announced).
pub fn has_seen(&self, hash: &B256) -> bool {
self.announced.contains(hash) ||
self.scroll_wire_received.contains(hash) ||
self.eth_wire_received.contains(hash)
}

/// Check if peer has received this block via scroll-wire specifically (for duplicate
/// detection).
pub fn has_seen_via_scroll_wire(&self, hash: &B256) -> bool {
self.scroll_wire_received.contains(hash)
}

/// Check if peer has received this block via eth-wire specifically (for duplicate detection).
pub fn has_seen_via_eth_wire(&self, hash: &B256) -> bool {
self.eth_wire_received.contains(hash)
}

/// Record that this peer has received a block via scroll-wire.
pub fn insert_scroll_wire(&mut self, hash: B256) {
self.scroll_wire_received.insert(hash); // Track for duplicate detection
}

/// Record that this peer has received a block via eth-wire.
pub fn insert_eth_wire(&mut self, hash: B256) {
self.eth_wire_received.insert(hash); // Track for duplicate detection
}

/// Record that we have announced a block to this peer.
pub fn insert_announced(&mut self, hash: B256) {
self.announced.insert(hash); // Only update unified announced, not protocol-specific
}
}

/// A manager for the `ScrollWire` protocol.
#[derive(Debug)]
pub struct ScrollWireManager {
/// A stream of [`ScrollWireEvent`]s produced by the scroll wire protocol.
events: UnboundedReceiverStream<ScrollWireEvent>,
/// A map of connections to peers.
connections: HashMap<PeerId, UnboundedSender<ScrollMessage>>,
/// A map of the state of the scroll wire protocol. Currently the state for each peer
/// is just a cache of the last 100 blocks seen by each peer.
state: HashMap<PeerId, LruCache<B256>>,
/// Unified state tracking block state and blocks received from each peer via both protocols.
peer_state: HashMap<PeerId, PeerState>,
}

impl ScrollWireManager {
/// Creates a new [`ScrollWireManager`] instance.
pub fn new(events: UnboundedReceiver<ScrollWireEvent>) -> Self {
trace!(target: "scroll::wire::manager", "Creating new ScrollWireManager instance");
Self { events: events.into(), connections: HashMap::new(), state: HashMap::new() }
Self { events: events.into(), connections: HashMap::new(), peer_state: HashMap::new() }
}

/// Announces a new block to the specified peer.
pub fn announce_block(&mut self, peer_id: PeerId, block: &NewBlock, hash: B256) {
if let Entry::Occupied(to_connection) = self.connections.entry(peer_id) {
// We send the block to the peer. If we receive an error we remove the peer from the
// connections map and delete its state as the connection is no longer valid.
// connections map and peer_block_state as the connection is no longer valid.
if to_connection.get().send(ScrollMessage::new_block(block.clone())).is_err() {
trace!(target: "scroll::wire::manager", peer_id = %peer_id, "Failed to send block to peer - dropping peer.");
self.state.remove(&peer_id);
self.peer_state.remove(&peer_id);
to_connection.remove();
} else {
// Upon successful sending of the block we update the state of the peer.
trace!(target: "scroll::wire::manager", peer_id = %peer_id, "Announced block to peer");
self.state
// Record that we announced this block to the peer
self.peer_state
.entry(peer_id)
.or_insert_with(|| LruCache::new(LRU_CACHE_SIZE))
.insert(hash);
.or_insert_with(|| PeerState::new(LRU_CACHE_SIZE))
.insert_announced(hash);
}
}
}

/// Returns the state of the `ScrollWire` protocol.
pub const fn state(&self) -> &HashMap<PeerId, LruCache<B256>> {
&self.state
/// Returns an iterator over the connected peer IDs.
pub fn connected_peers(&self) -> impl Iterator<Item = &PeerId> {
self.connections.keys()
}

/// Returns a reference to the peer state map.
pub const fn peer_state(&self) -> &HashMap<PeerId, PeerState> {
&self.peer_state
}

/// Returns a mutable reference to the state of the `ScrollWire` protocol.
pub const fn state_mut(&mut self) -> &mut HashMap<PeerId, LruCache<B256>> {
&mut self.state
/// Returns a mutable reference to the peer state map.
pub const fn peer_state_mut(&mut self) -> &mut HashMap<PeerId, PeerState> {
&mut self.peer_state
}
}

Expand Down Expand Up @@ -94,7 +155,6 @@ impl Future for ScrollWireManager {
direction
);
this.connections.insert(peer_id, to_connection);
this.state.insert(peer_id, LruCache::new(100));
}
None => break,
}
Expand Down
Loading