Created
June 13, 2024 12:28
-
-
Save DylanVerstraete/80e858b194a4ad7e318b0a9829edb926 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use crate::Client; | |
use alloy::{providers::Provider, rpc::types::eth::Block}; | |
use anyhow::Result; | |
use async_trait::async_trait; | |
use futures_util::StreamExt; | |
use tokio::sync::mpsc; | |
use tokio::task::JoinHandle; | |
use tracing::{debug, info}; | |
use crate::Error; | |
#[async_trait] | |
pub trait BlockSubscription: Send + Sync { | |
fn cancel(&self) -> Result<()>; | |
async fn next(&mut self) -> Option<Block>; | |
} | |
const BUFFER_SIZE: usize = 100; | |
/// `NewBlockSubscription` is a struct that references to a receiving end of a channel where blocks are pushed upon | |
/// It subscribes to the head of the chain and pushes new blocks to the channel | |
#[derive(Debug)] | |
struct NewBlockSubscription { | |
receiver: mpsc::Receiver<Block>, | |
handle: JoinHandle<Result<(), Error>>, | |
} | |
#[async_trait] | |
impl BlockSubscription for NewBlockSubscription { | |
/// Cancel the subscription | |
fn cancel(&self) -> Result<()> { | |
// Cancel the subscription task | |
debug!("Canceling subscription"); | |
self.handle.abort(); | |
Ok(()) | |
} | |
/// Get the next proof | |
async fn next(&mut self) -> Option<Block> { | |
// Receive the next proof from the channel | |
self.receiver.recv().await | |
} | |
} | |
/// Subscribe to the latest heads of the chain | |
/// This function returns a `BlockSubscription` trait object | |
pub fn subscribe_latest_heads(client: Client) -> Result<Box<dyn BlockSubscription>, Error> { | |
let (sender, receiver) = mpsc::channel(BUFFER_SIZE); | |
let client = client.clone(); | |
let handle = tokio::spawn(async move { | |
let subscription = client.provider.subscribe_blocks().await?; | |
let mut stream = subscription.into_stream(); | |
loop { | |
if let Some(block) = stream.next().await { | |
let block_number = block.header.number.unwrap_or_default(); | |
info!("Received block number: {}", block_number); | |
let block = client.get_block(block_number).await?; | |
sender.send(block).await.ok(); | |
} | |
} | |
}); | |
Ok(Box::new(NewBlockSubscription { receiver, handle })) | |
} | |
/// `BlockFetcher` is a struct that fetches blocks from a given height with a given interval | |
struct BlockFetcher { | |
pub client: Client, | |
pub from_height: u64, | |
pub interval: u64, | |
pub switch_to_latest: bool, | |
} | |
impl BlockFetcher { | |
pub fn new(client: Client, from_height: u64, interval: u64) -> Self { | |
Self { | |
client, | |
from_height, | |
interval, | |
switch_to_latest: false, | |
} | |
} | |
} | |
#[async_trait] | |
impl BlockSubscription for BlockFetcher { | |
fn cancel(&self) -> Result<()> { | |
debug!("Canceling subscription"); | |
Ok(()) | |
} | |
async fn next(&mut self) -> Option<Block> { | |
let block = self.client.get_block(self.from_height).await.ok(); | |
self.from_height += self.interval; | |
// This wont work since the sub will be opened every time next is called | |
if block.is_none() { | |
self.switch_to_latest = true; | |
let mut sub = subscribe_latest_heads(self.client.clone()).ok()?; | |
return sub.next().await; | |
} | |
block | |
} | |
} | |
/// Subscription configuration | |
/// - `start_block`: The block number to start the subscription from | |
/// - `interval`: The interval to fetch blocks | |
/// This is only relevant when fetching blocks from a specific height | |
/// An interval is provided to speed up the fetching process | |
pub struct SubscriptionConfig { | |
pub start_block: u64, | |
pub interval: u64, | |
} | |
impl Client { | |
/// Open a subscription to the chain | |
/// This function returns a `BlockSubscription` trait object | |
/// - `config`: Subscription configuration | |
/// If no configuration is provided, it will subscribe to the latest heads | |
/// If a configuration is provided, it will fetch blocks from a specific height with a given interval & switch to latest heads if it's all caught up | |
pub fn open_subscription( | |
&self, | |
config: Option<SubscriptionConfig>, | |
) -> Result<Box<dyn BlockSubscription>> { | |
let client = self.clone(); | |
if let Some(config) = config { | |
Ok(Box::new(BlockFetcher::new( | |
client, | |
config.start_block, | |
config.interval, | |
))) | |
} else { | |
Ok(subscribe_latest_heads(client)?) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment