Last active
March 8, 2023 18:04
-
-
Save 0xNineteen/0ce70e2dd950b4397c68f214f63e2321 to your computer and use it in GitHub Desktop.
discv5 + libp2p gossipsub -- built with lighthouse eth2 client
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::collections::HashSet; | |
use std::error::Error; | |
use std::hash::Hash; | |
use std::sync::Arc; | |
use discv5::Enr; | |
use futures::lock::Mutex; | |
use libp2p::gossipsub::subscription_filter::AllowAllSubscriptionFilter; | |
use lighthouse_network::{EnrExt, GossipTopic, Eth2Enr, strip_peer_id}; | |
use lighthouse_network::config::{Config, gossipsub_config}; | |
use lighthouse_network::types::{SnappyTransform, GossipEncoding, GossipKind}; | |
use lighthouse_network::{service::behaviour::Gossipsub, build_transport}; | |
use futures::{prelude::*, select}; | |
use libp2p::gossipsub::{Gossipsub as BaseGossipsub, GossipsubConfig, Sha256Topic, GossipsubEvent, MessageAuthenticity}; | |
use libp2p::{ | |
gossipsub, identity, swarm::NetworkBehaviour, swarm::SwarmEvent, PeerId, Swarm, | |
}; | |
use ssz::Encode; | |
use types::{EthSpec, MainnetEthSpec, ChainSpec, BeaconState, EnrForkId}; | |
use discv5::{enr::{self, EnrBuilder}, enr::CombinedKey, Discv5, Discv5Config, Discv5Event, Discv5ConfigBuilder}; | |
use std::{net::{Ipv4Addr, SocketAddr, Ipv6Addr}, time::Duration, str::FromStr}; | |
use tokio::sync::mpsc::{channel, Receiver, Sender}; | |
use tokio::time::sleep; | |
use tokio::runtime::Builder; | |
use tracing_subscriber; | |
use tracing::{info, warn, Instrument}; | |
use libp2p::multiaddr::{Multiaddr, Protocol as MProtocol}; | |
#[derive(NetworkBehaviour)] | |
pub struct Behaviour { | |
pub gossipsub: BaseGossipsub<SnappyTransform, AllowAllSubscriptionFilter>, | |
} | |
// used in gossipsub | |
pub fn get_fork_digest() -> [u8; 4] { | |
let bytes = include_bytes!("../../../common/eth2_network_config/built_in_network_configs/mainnet/genesis.ssz"); | |
let spec = ChainSpec::mainnet(); | |
let genesis_state: Result<BeaconState<MainnetEthSpec>, ssz::DecodeError> = BeaconState::from_ssz_bytes(bytes, &spec); | |
let genesis_state = genesis_state.unwrap(); | |
let gvr = genesis_state.genesis_validators_root(); | |
let fork_version = spec.bellatrix_fork_version; | |
// let fork_version = spec.genesis_fork_version; | |
let fork_digest = ChainSpec::compute_fork_digest(fork_version, gvr); | |
fork_digest | |
} | |
#[tokio::main] | |
async fn main() -> Result<(), Box<dyn Error>> { | |
let port = 9000; | |
let query_interval = 10; | |
tracing_subscriber::fmt::init(); | |
let fork_digest = get_fork_digest(); // newest fork/get the newest blocks | |
let spec = ChainSpec::mainnet(); | |
let enr_fork_id = EnrForkId { | |
fork_digest: fork_digest, // bellatrix | |
next_fork_version: spec.capella_fork_version, | |
next_fork_epoch: spec.far_future_epoch, | |
}.as_ssz_bytes(); | |
// setup enr + discv5 | |
let enr_key = CombinedKey::generate_secp256k1(); | |
let enr = enr::EnrBuilder::new("v4") | |
.ip4(Ipv4Addr::LOCALHOST).udp4(port) | |
.ip6(Ipv6Addr::LOCALHOST).udp6(port) | |
.add_value("eth2", enr_fork_id.as_slice()) | |
.build(&enr_key) | |
.unwrap(); | |
let _enr_sign_key = if let CombinedKey::Secp256k1(key) = &enr_key { | |
key.clone() | |
} else { panic!("ah"); }; | |
let config = Discv5ConfigBuilder::new().enable_packet_filter().build(); | |
let mut discv5 = Discv5::new(enr, enr_key, config).unwrap(); | |
// init from a bootnode | |
//../../common/eth2_network_config/built_in_network_configs/mainnet/boot_enr.yaml | |
let bootnodes = vec![ | |
// Lighthouse Team (Sigma Prime) | |
"enr:-Jq4QItoFUuug_n_qbYbU0OY04-np2wT8rUCauOOXNi0H3BWbDj-zbfZb7otA7jZ6flbBpx1LNZK2TDebZ9dEKx84LYBhGV0aDKQtTA_KgEAAAD__________4JpZIJ2NIJpcISsaa0ZiXNlY3AyNTZrMaEDHAD2JKYevx89W0CcFJFiskdcEzkH_Wdv9iW42qLK79ODdWRwgiMo", | |
"enr:-Jq4QN_YBsUOqQsty1OGvYv48PMaiEt1AzGD1NkYQHaxZoTyVGqMYXg0K9c0LPNWC9pkXmggApp8nygYLsQwScwAgfgBhGV0aDKQtTA_KgEAAAD__________4JpZIJ2NIJpcISLosQxiXNlY3AyNTZrMaEDBJj7_dLFACaxBfaI8KZTh_SSJUjhyAyfshimvSqo22WDdWRwgiMo", | |
// EF Team | |
"enr:-Ku4QHqVeJ8PPICcWk1vSn_XcSkjOkNiTg6Fmii5j6vUQgvzMc9L1goFnLKgXqBJspJjIsB91LTOleFmyWWrFVATGngBh2F0dG5ldHOIAAAAAAAAAACEZXRoMpC1MD8qAAAAAP__________gmlkgnY0gmlwhAMRHkWJc2VjcDI1NmsxoQKLVXFOhp2uX6jeT0DvvDpPcU8FWMjQdR4wMuORMhpX24N1ZHCCIyg", | |
"enr:-Ku4QG-2_Md3sZIAUebGYT6g0SMskIml77l6yR-M_JXc-UdNHCmHQeOiMLbylPejyJsdAPsTHJyjJB2sYGDLe0dn8uYBh2F0dG5ldHOIAAAAAAAAAACEZXRoMpC1MD8qAAAAAP__________gmlkgnY0gmlwhBLY-NyJc2VjcDI1NmsxoQORcM6e19T1T9gi7jxEZjk_sjVLGFscUNqAY9obgZaxbIN1ZHCCIyg", | |
"enr:-Ku4QPn5eVhcoF1opaFEvg1b6JNFD2rqVkHQ8HApOKK61OIcIXD127bKWgAtbwI7pnxx6cDyk_nI88TrZKQaGMZj0q0Bh2F0dG5ldHOIAAAAAAAAAACEZXRoMpC1MD8qAAAAAP__________gmlkgnY0gmlwhDayLMaJc2VjcDI1NmsxoQK2sBOLGcUb4AwuYzFuAVCaNHA-dy24UuEKkeFNgCVCsIN1ZHCCIyg", | |
"enr:-Ku4QEWzdnVtXc2Q0ZVigfCGggOVB2Vc1ZCPEc6j21NIFLODSJbvNaef1g4PxhPwl_3kax86YPheFUSLXPRs98vvYsoBh2F0dG5ldHOIAAAAAAAAAACEZXRoMpC1MD8qAAAAAP__________gmlkgnY0gmlwhDZBrP2Jc2VjcDI1NmsxoQM6jr8Rb1ktLEsVcKAPa08wCsKUmvoQ8khiOl_SLozf9IN1ZHCCIyg", | |
// Teku team (Consensys) | |
"enr:-KG4QOtcP9X1FbIMOe17QNMKqDxCpm14jcX5tiOE4_TyMrFqbmhPZHK_ZPG2Gxb1GE2xdtodOfx9-cgvNtxnRyHEmC0ghGV0aDKQ9aX9QgAAAAD__________4JpZIJ2NIJpcIQDE8KdiXNlY3AyNTZrMaEDhpehBDbZjM_L9ek699Y7vhUJ-eAdMyQW_Fil522Y0fODdGNwgiMog3VkcIIjKA", | |
"enr:-KG4QDyytgmE4f7AnvW-ZaUOIi9i79qX4JwjRAiXBZCU65wOfBu-3Nb5I7b_Rmg3KCOcZM_C3y5pg7EBU5XGrcLTduQEhGV0aDKQ9aX9QgAAAAD__________4JpZIJ2NIJpcIQ2_DUbiXNlY3AyNTZrMaEDKnz_-ps3UUOfHWVYaskI5kWYO_vtYMGYCQRAR3gHDouDdGNwgiMog3VkcIIjKA", | |
// Prysm team (Prysmatic Labs) | |
"enr:-Ku4QImhMc1z8yCiNJ1TyUxdcfNucje3BGwEHzodEZUan8PherEo4sF7pPHPSIB1NNuSg5fZy7qFsjmUKs2ea1Whi0EBh2F0dG5ldHOIAAAAAAAAAACEZXRoMpD1pf1CAAAAAP__________gmlkgnY0gmlwhBLf22SJc2VjcDI1NmsxoQOVphkDqal4QzPMksc5wnpuC3gvSC8AfbFOnZY_On34wIN1ZHCCIyg", | |
"enr:-Ku4QP2xDnEtUXIjzJ_DhlCRN9SN99RYQPJL92TMlSv7U5C1YnYLjwOQHgZIUXw6c-BvRg2Yc2QsZxxoS_pPRVe0yK8Bh2F0dG5ldHOIAAAAAAAAAACEZXRoMpD1pf1CAAAAAP__________gmlkgnY0gmlwhBLf22SJc2VjcDI1NmsxoQMeFF5GrS7UZpAH2Ly84aLK-TyvH-dRo0JM1i8yygH50YN1ZHCCJxA", | |
"enr:-Ku4QPp9z1W4tAO8Ber_NQierYaOStqhDqQdOPY3bB3jDgkjcbk6YrEnVYIiCBbTxuar3CzS528d2iE7TdJsrL-dEKoBh2F0dG5ldHOIAAAAAAAAAACEZXRoMpD1pf1CAAAAAP__________gmlkgnY0gmlwhBLf22SJc2VjcDI1NmsxoQMw5fqqkw2hHC4F5HZZDPsNmPdB1Gi8JPQK7pRc9XHh-oN1ZHCCKvg", | |
// Nimbus team | |
"enr:-LK4QA8FfhaAjlb_BXsXxSfiysR7R52Nhi9JBt4F8SPssu8hdE1BXQQEtVDC3qStCW60LSO7hEsVHv5zm8_6Vnjhcn0Bh2F0dG5ldHOIAAAAAAAAAACEZXRoMpC1MD8qAAAAAP__________gmlkgnY0gmlwhAN4aBKJc2VjcDI1NmsxoQJerDhsJ-KxZ8sHySMOCmTO6sHM3iCFQ6VMvLTe948MyYN0Y3CCI4yDdWRwgiOM", | |
"enr:-LK4QKWrXTpV9T78hNG6s8AM6IO4XH9kFT91uZtFg1GcsJ6dKovDOr1jtAAFPnS2lvNltkOGA9k29BUN7lFh_sjuc9QBh2F0dG5ldHOIAAAAAAAAAACEZXRoMpC1MD8qAAAAAP__________gmlkgnY0gmlwhANAdd-Jc2VjcDI1NmsxoQLQa6ai7y9PMN5hpLe5HmiJSlYzMuzP7ZhwRiwHvqNXdoN0Y3CCI4yDdWRwgiOM", | |
]; | |
for bootnode in bootnodes { | |
let enr: Enr = bootnode.parse().unwrap(); | |
discv5.add_enr(enr).unwrap(); | |
} | |
// start disc service | |
let listen_addr = format!("0.0.0.0:{port}").parse::<SocketAddr>().unwrap(); | |
discv5.start(listen_addr).await.unwrap(); | |
let mut event_stream = discv5.event_stream().await.unwrap(); | |
let mut query_interval = tokio::time::interval(Duration::from_secs(query_interval)); | |
// setup gossip sub | |
// derive peer_id from enr key | |
let bytes = _enr_sign_key.to_bytes(); | |
let secret = identity::secp256k1::SecretKey::from_bytes(bytes).unwrap(); | |
let local_keypair = identity::secp256k1::Keypair::from(secret); | |
let local_keypair = identity::Keypair::Secp256k1(local_keypair); | |
let local_peer_id = PeerId::from(local_keypair.public()); | |
println!("Local peer id: {local_peer_id}"); | |
let (transport, _) = build_transport(local_keypair.clone()) | |
.map_err(|e| format!("Failed to build transport: {:?}", e))?; | |
let snappy_transform = SnappyTransform::new(65536); | |
let gossipsub = BaseGossipsub::new_with_transform( | |
MessageAuthenticity::Signed(local_keypair.clone()), | |
GossipsubConfig::default(), | |
None, | |
snappy_transform | |
) | |
.map_err(|e| format!("Could not construct gossipsub: {:?}", e))?; | |
let behaviour = Behaviour { | |
gossipsub, | |
}; | |
let mut swarm = Swarm::with_tokio_executor( | |
transport, | |
behaviour, | |
local_peer_id | |
); | |
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; | |
// // subscribe to the topics | |
// let local_fork_id = discv5.local_enr().eth2().unwrap().fork_digest; | |
// println!("fork_digest: local vs global: {fork_digest:?} {local_fork_id:?}"); | |
// return Ok(()); | |
// let topics = lighthouse_network::types::CORE_TOPICS; | |
let topics = vec![ | |
GossipKind::BeaconBlock, | |
]; | |
for topic_kind in topics.iter() { | |
let topic = GossipTopic::new( | |
topic_kind.clone(), | |
GossipEncoding::default(), | |
fork_digest | |
); | |
swarm.behaviour_mut().gossipsub.subscribe(&topic.into())?; | |
} | |
let mut known_peers = HashSet::new(); | |
// listen for new events | |
info!("starting event loop..."); | |
loop { | |
tokio::select! { | |
swarm_event = swarm.select_next_some() => { | |
match swarm_event { | |
SwarmEvent::Behaviour(behaviour_event) => match behaviour_event { | |
// Handle sub-behaviour events. | |
BehaviourEvent::Gossipsub(ge) => { | |
match ge { | |
GossipsubEvent::Message { | |
propagation_source, | |
message_id: id, | |
message: gs_msg, | |
} => { | |
info!("gossip message: {propagation_source:#?} {gs_msg:#?}"); | |
}, | |
GossipsubEvent::Subscribed { peer_id, topic } => { | |
info!("subscribe: {peer_id:?} {topic:?}"); | |
} | |
_ => {} | |
} | |
}, | |
_ => {} | |
// BehaviourEvent::Eth2Rpc(re) => self.inject_rpc_event(re), | |
// BehaviourEvent::Discovery(de) => self.inject_discovery_event(de), | |
// BehaviourEvent::Identify(ie) => self.inject_identify_event(ie), | |
// BehaviourEvent::PeerManager(pe) => self.inject_pm_event(pe), | |
}, | |
_ => {}, | |
}; | |
}, | |
Some(discv5_ev) = event_stream.recv() => { | |
// info!("discv5: {discv5_ev:?}"); | |
}, | |
_ = query_interval.tick() => { | |
// get metrics | |
let metrics = discv5.metrics(); | |
let connected_peers = discv5.connected_peers(); | |
info!("Connected peers: {}, Active sessions: {}, Unsolicited requests/s: {:.2}", connected_peers, metrics.active_sessions, metrics.unsolicited_requests_per_second); | |
if connected_peers > 50 { | |
continue | |
} | |
info!("Searching for peers..."); | |
// pick a random node target | |
let target_random_node_id = enr::NodeId::random(); | |
let predicate: Box<dyn Fn(&Enr) -> bool + Send> = Box::new(move |enr: &Enr| { | |
enr.eth2().map(|e2| e2.fork_digest) == Ok(fork_digest) && (enr.tcp4().is_some() || enr.tcp6().is_some()) | |
}); | |
match discv5.find_node_predicate(target_random_node_id, predicate, 16).await { | |
Err(e) => warn!("Find Node result failed: {:?}", e), | |
Ok(found_peers) => { | |
let n = found_peers.len(); | |
info!("found {n:?} peers..."); | |
for enr in found_peers { | |
let enr_b64 = enr.to_base64(); | |
if !known_peers.contains(&enr_b64) { | |
// dial new peer | |
for mut multiaddr in enr.multiaddr() { | |
// ignore udp multiaddr if it exists | |
let components = multiaddr.iter().collect::<Vec<_>>(); | |
if let MProtocol::Udp(_) = components[1] { | |
continue; | |
} | |
// info!("dialing peer..."); | |
strip_peer_id(&mut multiaddr); | |
swarm.dial(multiaddr)?; | |
} | |
// update records | |
known_peers.insert(enr_b64); | |
} | |
} | |
} | |
} | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment