Skip to content

Instantly share code, notes, and snippets.

@quake
Last active April 25, 2018 07:37
Show Gist options
  • Save quake/a395da438d8110acb5acee517b84ec1f to your computer and use it in GitHub Desktop.
Save quake/a395da438d8110acb5acee517b84ec1f to your computer and use it in GitHub Desktop.
[dev-dependencies]
tokio-core = "0.1"
libp2p-tcp-transport = { path = "../tcp-transport"}
libp2p-peerstore = { path = "../peerstore"}
multiplex = { path = "../multiplex-rs" }
rand = "0.4"
env_logger = "0.5.4"
extern crate futures;
extern crate libp2p_swarm;
extern crate libp2p_floodsub;
extern crate libp2p_tcp_transport;
extern crate libp2p_peerstore;
extern crate multiplex;
extern crate tokio_core;
extern crate tokio_io;
extern crate rand;
extern crate env_logger;
use libp2p_swarm::{swarm, Multiaddr, MuxedTransport, StreamMuxer, Transport, SwarmController};
use libp2p_tcp_transport::TcpConfig;
use libp2p_floodsub::{FloodSubUpgrade, TopicBuilder, FloodSubController, FloodSubReceiver};
use tokio_core::reactor::Core;
use tokio_io::codec::length_delimited::Framed;
use std::sync::{atomic, mpsc};
use std::thread;
use std::time::Duration;
use futures::sync::oneshot;
use libp2p_peerstore::PeerId;
use futures::{Future, Stream};
use multiplex::MultiplexConfig;
/// Floodsub smoke test between two nodes connected over local TCP.
///
/// A background thread listens on `/ip4/127.0.0.1/tcp/0`, reports the address
/// returned by `listen_on` to the main thread, and publishes one message
/// (handed over through `msg_rx`) on the topic `"test"`. The main thread dials
/// that address, subscribes to the same topic and asserts that every message
/// delivered by its floodsub receiver carries the payload `hello`.
///
/// NOTE(review): on the listener side `publish` is called before the event loop
/// is started, i.e. before any connection can have been accepted — confirm that
/// floodsub is expected to deliver a message queued that early.
#[test]
fn basic_floodsub() {
    /// Builds a `PeerId` from 2048 random bytes used in place of a real public key.
    fn random_peer_id() -> PeerId {
        let key = (0..2048).map(|_| rand::random::<u8>()).collect::<Vec<_>>();
        PeerId::from_public_key(&key)
    }

    // Ignore the result: the logger may already have been initialised elsewhere.
    let _ = env_logger::try_init();

    // `addr_*` carries the listener's address to the dialer,
    // `msg_*` carries the payload the listener side has to publish.
    let (addr_tx, addr_rx) = mpsc::channel();
    let (msg_tx, msg_rx) = mpsc::channel::<&[u8]>();

    let bg_thread = thread::spawn(move || {
        // The receiver is not read on this side, but the binding is kept so
        // that it lives exactly as long as it did before.
        let (floodsub_upgrade, _floodsub_rx) = FloodSubUpgrade::new(random_peer_id());
        let mut core = Core::new().unwrap();
        let transport = TcpConfig::new(core.handle()).with_upgrade(floodsub_upgrade.clone());

        let (listener, addr) = transport
            .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap())
            .unwrap();
        addr_tx.send(addr).unwrap();

        // Take the first incoming connection only and drive its floodsub future.
        let future = listener
            .into_future()
            .map_err(|(err, _)| err)
            .and_then(|(client, _)| client.unwrap().map(|v| v.0))
            .and_then(|floodsub_future| floodsub_future);

        let topic = TopicBuilder::new("test").build();
        let floodsub_ctl = FloodSubController::new(&floodsub_upgrade);
        // Blocks until the main thread hands over the payload.
        floodsub_ctl.publish(&topic, msg_rx.recv().unwrap().to_vec());

        // NOTE(review): the outcome of the listener-side event loop is discarded,
        // as before; confirm whether an error here should fail the test.
        let _ = core.run(future);
    });

    let (floodsub_upgrade, floodsub_rx) = FloodSubUpgrade::new(random_peer_id());
    let mut core = Core::new().unwrap();
    let transport = TcpConfig::new(core.handle()).with_upgrade(floodsub_upgrade.clone());

    // Blocks until the background thread has reported its listening address.
    let future = transport
        .dial(addr_rx.recv().unwrap())
        .unwrap_or_else(|_| panic!("failed to dial the listener address"))
        .and_then(|(floodsub_future, _)| floodsub_future);

    let topic = TopicBuilder::new("test").build();
    let floodsub_ctl = FloodSubController::new(&floodsub_upgrade);
    floodsub_ctl.subscribe(&topic);

    // A send error means the publisher thread is gone; fail loudly instead of
    // silently waiting for a message that can never arrive.
    msg_tx
        .send(b"hello")
        .expect("publisher thread exited before receiving the payload");

    let rx = floodsub_rx.for_each(|msg| {
        assert_eq!(b"hello".to_vec(), msg.data);
        Ok(())
    });

    // Run until either the connection future or the receiver stream finishes.
    // An error must fail the test; otherwise a broken connection would pass
    // without a single assertion having run.
    if let Err((err, _)) = core.run(future.select(rx)) {
        panic!("client event loop failed: {:?}", err);
    }

    bg_thread.join().unwrap();
}
DEBUG 2018-04-25T07:33:55Z: libp2p_floodsub: Queuing sub/unsub message ; sub = ["5zKtTsxw"] ; unsub = []
DEBUG 2018-04-25T07:33:55Z: libp2p_floodsub: Message queued for 0 remotes
DEBUG 2018-04-25T07:33:55Z: libp2p_floodsub: Queueing publish message ; topics = ["5zKtTsxw"] ; data_len = 5
DEBUG 2018-04-25T07:33:55Z: libp2p_floodsub: Message queued for 0 remotes
DEBUG 2018-04-25T07:33:55Z: libp2p_floodsub: Upgrading connection to /ip4/127.0.0.1/tcp/64984 as floodsub
TRACE 2018-04-25T07:33:55Z: libp2p_floodsub: Effectively sending message to remote
DEBUG 2018-04-25T07:33:55Z: libp2p_floodsub: Upgrading connection to /ip4/127.0.0.1/tcp/64983 as floodsub
TRACE 2018-04-25T07:33:55Z: libp2p_floodsub: Effectively sending message to remote
TRACE 2018-04-25T07:33:55Z: libp2p_floodsub: Received packet from /ip4/127.0.0.1/tcp/64983
TRACE 2018-04-25T07:33:55Z: libp2p_floodsub: Received packet from /ip4/127.0.0.1/tcp/64984
TRACE 2018-04-25T07:33:55Z: libp2p_floodsub: Remote /ip4/127.0.0.1/tcp/64984 subscribed to TopicHash { hash: "5zKtTsxw" }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment