Skip to content

Instantly share code, notes, and snippets.

@0xNineteen
Last active March 8, 2023 18:04
Show Gist options
  • Save 0xNineteen/0ce70e2dd950b4397c68f214f63e2321 to your computer and use it in GitHub Desktop.
Save 0xNineteen/0ce70e2dd950b4397c68f214f63e2321 to your computer and use it in GitHub Desktop.
discv5 + libp2p gossipsub -- built with lighthouse eth2 client
use std::collections::HashSet;
use std::error::Error;
use std::hash::Hash;
use std::sync::Arc;
use discv5::Enr;
use futures::lock::Mutex;
use libp2p::gossipsub::subscription_filter::AllowAllSubscriptionFilter;
use lighthouse_network::{EnrExt, GossipTopic, Eth2Enr, strip_peer_id};
use lighthouse_network::config::{Config, gossipsub_config};
use lighthouse_network::types::{SnappyTransform, GossipEncoding, GossipKind};
use lighthouse_network::{service::behaviour::Gossipsub, build_transport};
use futures::{prelude::*, select};
use libp2p::gossipsub::{Gossipsub as BaseGossipsub, GossipsubConfig, Sha256Topic, GossipsubEvent, MessageAuthenticity};
use libp2p::{
gossipsub, identity, swarm::NetworkBehaviour, swarm::SwarmEvent, PeerId, Swarm,
};
use ssz::Encode;
use types::{EthSpec, MainnetEthSpec, ChainSpec, BeaconState, EnrForkId};
use discv5::{enr::{self, EnrBuilder}, enr::CombinedKey, Discv5, Discv5Config, Discv5Event, Discv5ConfigBuilder};
use std::{net::{Ipv4Addr, SocketAddr, Ipv6Addr}, time::Duration, str::FromStr};
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::time::sleep;
use tokio::runtime::Builder;
use tracing_subscriber;
use tracing::{info, warn, Instrument};
use libp2p::multiaddr::{Multiaddr, Protocol as MProtocol};
/// Network behaviour handed to the libp2p `Swarm` built in `main`.
///
/// It wraps a single gossipsub instance. `main` matches on
/// `BehaviourEvent::Gossipsub`, which is presumably the event enum generated
/// by the `NetworkBehaviour` derive below (it is not defined in this file).
#[derive(NetworkBehaviour)]
pub struct Behaviour {
/// Gossipsub protocol instance, parameterised with `SnappyTransform` as the
/// data transform and `AllowAllSubscriptionFilter` as the subscription filter.
pub gossipsub: BaseGossipsub<SnappyTransform, AllowAllSubscriptionFilter>,
}
/// Computes the 4-byte fork digest used to build gossipsub topic names and the
/// `eth2` ENR field.
///
/// The digest is derived from the Bellatrix fork version of the mainnet
/// `ChainSpec` and the genesis validators root read from the mainnet genesis
/// state that is embedded into the binary at compile time.
///
/// # Panics
///
/// Panics if the embedded `genesis.ssz` cannot be decoded as a
/// `BeaconState<MainnetEthSpec>`. The file is a compile-time constant, so a
/// failure here is a build/packaging bug rather than a runtime condition.
pub fn get_fork_digest() -> [u8; 4] {
    let genesis_ssz = include_bytes!("../../../common/eth2_network_config/built_in_network_configs/mainnet/genesis.ssz");
    let spec = ChainSpec::mainnet();
    let genesis_state = BeaconState::<MainnetEthSpec>::from_ssz_bytes(genesis_ssz, &spec)
        .expect("embedded mainnet genesis.ssz must decode as a BeaconState");
    let genesis_validators_root = genesis_state.genesis_validators_root();
    // The digest is tied to one specific fork; change the version here to
    // follow a different fork's topics.
    let fork_version = spec.bellatrix_fork_version;
    ChainSpec::compute_fork_digest(fork_version, genesis_validators_root)
}
/// Runs a discv5 discovery service next to a libp2p gossipsub swarm.
///
/// Steps, in order:
/// 1. Build a local ENR carrying an `eth2` field (fork digest + next fork info)
///    and start discv5 on UDP `0.0.0.0:9000`, seeded with the hard-coded
///    bootnode ENRs.
/// 2. Derive the libp2p identity from the same secp256k1 key as the ENR, build
///    the transport and a gossipsub behaviour, and subscribe to the
///    `BeaconBlock` topic for the computed fork digest.
/// 3. Loop forever: log gossipsub messages/subscriptions, drain discv5 events,
///    and on every tick of the query interval look for peers whose ENR has the
///    same fork digest and a TCP port, dialing each newly seen one.
///
/// # Errors
///
/// Returns an error if any setup step fails (ENR/discv5 construction, socket
/// bind, key conversion, transport/gossipsub construction, listen, subscribe).
/// Failures to dial an individual discovered peer are logged and skipped so a
/// single bad address cannot terminate the event loop.
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let port = 9000;
    let query_interval_secs = 10;
    // Discovery queries are skipped while discv5 reports more peers than this.
    let max_connected_peers = 50;
    tracing_subscriber::fmt::init();

    let fork_digest = get_fork_digest(); // newest fork/get the newest blocks
    let spec = ChainSpec::mainnet();
    let enr_fork_id = EnrForkId {
        fork_digest, // bellatrix
        next_fork_version: spec.capella_fork_version,
        next_fork_epoch: spec.far_future_epoch,
    }
    .as_ssz_bytes();

    // setup enr + discv5
    let enr_key = CombinedKey::generate_secp256k1();
    let enr = enr::EnrBuilder::new("v4")
        .ip4(Ipv4Addr::LOCALHOST).udp4(port)
        .ip6(Ipv6Addr::LOCALHOST).udp6(port)
        .add_value("eth2", enr_fork_id.as_slice())
        .build(&enr_key)
        .map_err(|e| format!("Failed to build local ENR: {:?}", e))?;
    // Keep a copy of the signing key: `enr_key` is moved into discv5 below, and
    // the same key is reused afterwards to derive the libp2p peer id.
    let enr_sign_key = if let CombinedKey::Secp256k1(key) = &enr_key {
        key.clone()
    } else {
        // Not expected to be reachable: the key was just created by `generate_secp256k1`.
        panic!("ah");
    };
    let config = Discv5ConfigBuilder::new().enable_packet_filter().build();
    let mut discv5 = Discv5::new(enr, enr_key, config)
        .map_err(|e| format!("Failed to create discv5 service: {:?}", e))?;

    // init from a bootnode
    //../../common/eth2_network_config/built_in_network_configs/mainnet/boot_enr.yaml
    let bootnodes = vec![
        // Lighthouse Team (Sigma Prime)
        "enr:-Jq4QItoFUuug_n_qbYbU0OY04-np2wT8rUCauOOXNi0H3BWbDj-zbfZb7otA7jZ6flbBpx1LNZK2TDebZ9dEKx84LYBhGV0aDKQtTA_KgEAAAD__________4JpZIJ2NIJpcISsaa0ZiXNlY3AyNTZrMaEDHAD2JKYevx89W0CcFJFiskdcEzkH_Wdv9iW42qLK79ODdWRwgiMo",
        "enr:-Jq4QN_YBsUOqQsty1OGvYv48PMaiEt1AzGD1NkYQHaxZoTyVGqMYXg0K9c0LPNWC9pkXmggApp8nygYLsQwScwAgfgBhGV0aDKQtTA_KgEAAAD__________4JpZIJ2NIJpcISLosQxiXNlY3AyNTZrMaEDBJj7_dLFACaxBfaI8KZTh_SSJUjhyAyfshimvSqo22WDdWRwgiMo",
        // EF Team
        "enr:-Ku4QHqVeJ8PPICcWk1vSn_XcSkjOkNiTg6Fmii5j6vUQgvzMc9L1goFnLKgXqBJspJjIsB91LTOleFmyWWrFVATGngBh2F0dG5ldHOIAAAAAAAAAACEZXRoMpC1MD8qAAAAAP__________gmlkgnY0gmlwhAMRHkWJc2VjcDI1NmsxoQKLVXFOhp2uX6jeT0DvvDpPcU8FWMjQdR4wMuORMhpX24N1ZHCCIyg",
        "enr:-Ku4QG-2_Md3sZIAUebGYT6g0SMskIml77l6yR-M_JXc-UdNHCmHQeOiMLbylPejyJsdAPsTHJyjJB2sYGDLe0dn8uYBh2F0dG5ldHOIAAAAAAAAAACEZXRoMpC1MD8qAAAAAP__________gmlkgnY0gmlwhBLY-NyJc2VjcDI1NmsxoQORcM6e19T1T9gi7jxEZjk_sjVLGFscUNqAY9obgZaxbIN1ZHCCIyg",
        "enr:-Ku4QPn5eVhcoF1opaFEvg1b6JNFD2rqVkHQ8HApOKK61OIcIXD127bKWgAtbwI7pnxx6cDyk_nI88TrZKQaGMZj0q0Bh2F0dG5ldHOIAAAAAAAAAACEZXRoMpC1MD8qAAAAAP__________gmlkgnY0gmlwhDayLMaJc2VjcDI1NmsxoQK2sBOLGcUb4AwuYzFuAVCaNHA-dy24UuEKkeFNgCVCsIN1ZHCCIyg",
        "enr:-Ku4QEWzdnVtXc2Q0ZVigfCGggOVB2Vc1ZCPEc6j21NIFLODSJbvNaef1g4PxhPwl_3kax86YPheFUSLXPRs98vvYsoBh2F0dG5ldHOIAAAAAAAAAACEZXRoMpC1MD8qAAAAAP__________gmlkgnY0gmlwhDZBrP2Jc2VjcDI1NmsxoQM6jr8Rb1ktLEsVcKAPa08wCsKUmvoQ8khiOl_SLozf9IN1ZHCCIyg",
        // Teku team (Consensys)
        "enr:-KG4QOtcP9X1FbIMOe17QNMKqDxCpm14jcX5tiOE4_TyMrFqbmhPZHK_ZPG2Gxb1GE2xdtodOfx9-cgvNtxnRyHEmC0ghGV0aDKQ9aX9QgAAAAD__________4JpZIJ2NIJpcIQDE8KdiXNlY3AyNTZrMaEDhpehBDbZjM_L9ek699Y7vhUJ-eAdMyQW_Fil522Y0fODdGNwgiMog3VkcIIjKA",
        "enr:-KG4QDyytgmE4f7AnvW-ZaUOIi9i79qX4JwjRAiXBZCU65wOfBu-3Nb5I7b_Rmg3KCOcZM_C3y5pg7EBU5XGrcLTduQEhGV0aDKQ9aX9QgAAAAD__________4JpZIJ2NIJpcIQ2_DUbiXNlY3AyNTZrMaEDKnz_-ps3UUOfHWVYaskI5kWYO_vtYMGYCQRAR3gHDouDdGNwgiMog3VkcIIjKA",
        // Prysm team (Prysmatic Labs)
        "enr:-Ku4QImhMc1z8yCiNJ1TyUxdcfNucje3BGwEHzodEZUan8PherEo4sF7pPHPSIB1NNuSg5fZy7qFsjmUKs2ea1Whi0EBh2F0dG5ldHOIAAAAAAAAAACEZXRoMpD1pf1CAAAAAP__________gmlkgnY0gmlwhBLf22SJc2VjcDI1NmsxoQOVphkDqal4QzPMksc5wnpuC3gvSC8AfbFOnZY_On34wIN1ZHCCIyg",
        "enr:-Ku4QP2xDnEtUXIjzJ_DhlCRN9SN99RYQPJL92TMlSv7U5C1YnYLjwOQHgZIUXw6c-BvRg2Yc2QsZxxoS_pPRVe0yK8Bh2F0dG5ldHOIAAAAAAAAAACEZXRoMpD1pf1CAAAAAP__________gmlkgnY0gmlwhBLf22SJc2VjcDI1NmsxoQMeFF5GrS7UZpAH2Ly84aLK-TyvH-dRo0JM1i8yygH50YN1ZHCCJxA",
        "enr:-Ku4QPp9z1W4tAO8Ber_NQierYaOStqhDqQdOPY3bB3jDgkjcbk6YrEnVYIiCBbTxuar3CzS528d2iE7TdJsrL-dEKoBh2F0dG5ldHOIAAAAAAAAAACEZXRoMpD1pf1CAAAAAP__________gmlkgnY0gmlwhBLf22SJc2VjcDI1NmsxoQMw5fqqkw2hHC4F5HZZDPsNmPdB1Gi8JPQK7pRc9XHh-oN1ZHCCKvg",
        // Nimbus team
        "enr:-LK4QA8FfhaAjlb_BXsXxSfiysR7R52Nhi9JBt4F8SPssu8hdE1BXQQEtVDC3qStCW60LSO7hEsVHv5zm8_6Vnjhcn0Bh2F0dG5ldHOIAAAAAAAAAACEZXRoMpC1MD8qAAAAAP__________gmlkgnY0gmlwhAN4aBKJc2VjcDI1NmsxoQJerDhsJ-KxZ8sHySMOCmTO6sHM3iCFQ6VMvLTe948MyYN0Y3CCI4yDdWRwgiOM",
        "enr:-LK4QKWrXTpV9T78hNG6s8AM6IO4XH9kFT91uZtFg1GcsJ6dKovDOr1jtAAFPnS2lvNltkOGA9k29BUN7lFh_sjuc9QBh2F0dG5ldHOIAAAAAAAAAACEZXRoMpC1MD8qAAAAAP__________gmlkgnY0gmlwhANAdd-Jc2VjcDI1NmsxoQLQa6ai7y9PMN5hpLe5HmiJSlYzMuzP7ZhwRiwHvqNXdoN0Y3CCI4yDdWRwgiOM",
    ];
    for bootnode in bootnodes {
        let enr = bootnode
            .parse::<Enr>()
            .map_err(|e| format!("Invalid bootnode ENR: {:?}", e))?;
        discv5
            .add_enr(enr)
            .map_err(|e| format!("Failed to add bootnode ENR: {:?}", e))?;
    }

    // start disc service
    let listen_addr = SocketAddr::from((Ipv4Addr::UNSPECIFIED, port));
    discv5
        .start(listen_addr)
        .await
        .map_err(|e| format!("Failed to start discv5: {:?}", e))?;
    let mut event_stream = discv5
        .event_stream()
        .await
        .map_err(|e| format!("Failed to obtain discv5 event stream: {:?}", e))?;
    let mut query_interval = tokio::time::interval(Duration::from_secs(query_interval_secs));

    // setup gossip sub
    // derive peer_id from enr key
    let bytes = enr_sign_key.to_bytes();
    let secret = identity::secp256k1::SecretKey::from_bytes(bytes)
        .map_err(|e| format!("Failed to convert ENR key to libp2p key: {:?}", e))?;
    let local_keypair = identity::secp256k1::Keypair::from(secret);
    let local_keypair = identity::Keypair::Secp256k1(local_keypair);
    let local_peer_id = PeerId::from(local_keypair.public());
    println!("Local peer id: {local_peer_id}");

    let (transport, _) = build_transport(local_keypair.clone())
        .map_err(|e| format!("Failed to build transport: {:?}", e))?;

    // NOTE(review): eth2 gossip is believed to use unsigned messages and a
    // custom message-id function (cf. the imported but unused
    // `gossipsub_config`); `Signed` + `GossipsubConfig::default()` may cause
    // remote messages to be rejected — confirm against the network spec.
    let snappy_transform = SnappyTransform::new(65536);
    let gossipsub = BaseGossipsub::new_with_transform(
        MessageAuthenticity::Signed(local_keypair.clone()),
        GossipsubConfig::default(),
        None,
        snappy_transform,
    )
    .map_err(|e| format!("Could not construct gossipsub: {:?}", e))?;

    let behaviour = Behaviour { gossipsub };
    let mut swarm = Swarm::with_tokio_executor(transport, behaviour, local_peer_id);
    swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;

    // subscribe to the topics
    let topics = vec![GossipKind::BeaconBlock];
    for topic_kind in topics.iter() {
        let topic = GossipTopic::new(
            topic_kind.clone(),
            GossipEncoding::default(),
            fork_digest,
        );
        swarm.behaviour_mut().gossipsub.subscribe(&topic.into())?;
    }

    // Base64 ENRs of peers that have already been dialed, so that repeated
    // discovery results do not trigger repeated dials.
    let mut known_peers = HashSet::new();

    // listen for new events
    info!("starting event loop...");
    loop {
        tokio::select! {
            swarm_event = swarm.select_next_some() => {
                // Only gossipsub behaviour events are handled; every other
                // swarm event is ignored.
                if let SwarmEvent::Behaviour(BehaviourEvent::Gossipsub(gossip_event)) = swarm_event {
                    match gossip_event {
                        GossipsubEvent::Message {
                            propagation_source,
                            message: gs_msg,
                            ..
                        } => {
                            info!("gossip message: {propagation_source:#?} {gs_msg:#?}");
                        }
                        GossipsubEvent::Subscribed { peer_id, topic } => {
                            info!("subscribe: {peer_id:?} {topic:?}");
                        }
                        _ => {}
                    }
                }
            },
            Some(_) = event_stream.recv() => {
                // discv5 events are received and discarded.
            },
            _ = query_interval.tick() => {
                // get metrics
                let metrics = discv5.metrics();
                let connected_peers = discv5.connected_peers();
                info!("Connected peers: {}, Active sessions: {}, Unsolicited requests/s: {:.2}", connected_peers, metrics.active_sessions, metrics.unsolicited_requests_per_second);
                if connected_peers > max_connected_peers {
                    continue
                }
                info!("Searching for peers...");

                // pick a random node target
                let target_random_node_id = enr::NodeId::random();
                // Keep only peers on the same fork that advertise a TCP port.
                let predicate: Box<dyn Fn(&Enr) -> bool + Send> = Box::new(move |enr: &Enr| {
                    enr.eth2().map(|e2| e2.fork_digest) == Ok(fork_digest) && (enr.tcp4().is_some() || enr.tcp6().is_some())
                });

                match discv5.find_node_predicate(target_random_node_id, predicate, 16).await {
                    Err(e) => warn!("Find Node result failed: {:?}", e),
                    Ok(found_peers) => {
                        let n = found_peers.len();
                        info!("found {n:?} peers...");
                        for enr in found_peers {
                            // `insert` returns false for an ENR that was already recorded.
                            if !known_peers.insert(enr.to_base64()) {
                                continue;
                            }
                            // dial new peer
                            for mut multiaddr in enr.multiaddr() {
                                // ignore udp multiaddr if it exists; `nth(1)` cannot
                                // panic on an address with fewer than two components
                                if matches!(multiaddr.iter().nth(1), Some(MProtocol::Udp(_))) {
                                    continue;
                                }
                                strip_peer_id(&mut multiaddr);
                                // A failed dial must not abort the whole event loop.
                                if let Err(e) = swarm.dial(multiaddr.clone()) {
                                    warn!("Failed to dial {multiaddr}: {e:?}");
                                }
                            }
                        }
                    }
                }
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment