Skip to content

Instantly share code, notes, and snippets.

@fu5ha
Created March 12, 2018 21:42
Show Gist options
  • Save fu5ha/47426cefa0e66b3d319f946643908cb0 to your computer and use it in GitHub Desktop.
Save fu5ha/47426cefa0e66b3d319f946643908cb0 to your computer and use it in GitHub Desktop.
[2018-03-12][14:34:15][nano_rs][INFO] Starting nano-rs!
[2018-03-12][14:34:15][nano_rs][INFO] Listening on: [::1]:7075
[2018-03-12][14:34:15][nano_rs][INFO] Found initial peer: 139.162.199.142:7075
[2018-03-12][14:34:15][nano_rs][INFO] Found initial peer: 138.201.94.249:7075
[2018-03-12][14:34:15][nano_rs][INFO] Found initial peer: 128.199.199.97:7075
[2018-03-12][14:34:15][nano_rs][INFO] Found initial peer: 192.99.176.122:7075
[2018-03-12][14:34:15][nano_rs][INFO] Found initial peer: 192.99.176.121:7075
[2018-03-12][14:34:15][nano_rs][INFO] Found initial peer: 45.32.246.108:7075
[2018-03-12][14:34:15][nano_rs][INFO] Found initial peer: 138.68.2.234:7075
[2018-03-12][14:34:15][nano_rs][INFO] Found initial peer: 144.217.167.119:7075
[2018-03-12][14:34:15][nano_rs][INFO] Found initial peer: 192.95.57.248:7075
[2018-03-12][14:34:15][nano_rs][INFO] Found initial peer: [2600:3c0c::f03c:91ff:fee5:29e]:7075
[2018-03-12][14:34:15][nano_rs][INFO] Found initial peer: [2a01:7e00::f03c:91ff:fe7a:c863]:7075
[2018-03-12][14:34:15][nano_rs][INFO] Found initial peer: [2607:5300:201:3000::5d94]:7075
[2018-03-12][14:34:15][nano_rs][INFO] Found initial peer: [2001:19f0:5801:332:5400:ff:fe50:7ed7]:7075
[2018-03-12][14:34:15][mio::sys::unix::kqueue][TRACE] registering; token=Token(18446744073709551615); interests=Readable
[2018-03-12][14:34:15][mio::poll][TRACE] registering with poller
[2018-03-12][14:34:15][tokio_reactor::background][DEBUG] starting background reactor
[2018-03-12][14:34:15][tokio_threadpool][TRACE] build; num-workers=4
[2018-03-12][14:34:15][tokio_threadpool][TRACE] execute; count=1
[2018-03-12][14:34:15][tokio_threadpool][TRACE] submit to existing worker; idx=3; state=WorkerState { lifecycle: "WORKER_SHUTDOWN", is_pushed: true }
[2018-03-12][14:34:15][tokio_threadpool][TRACE] spawning new worker thread; idx=3
[2018-03-12][14:34:15][tokio_threadpool][TRACE] found work while draining; signal_work
[2018-03-12][14:34:15][tokio_threadpool][TRACE] signal_work -- spawn; idx=2
[2018-03-12][14:34:15][tokio_threadpool][TRACE] spawning new worker thread; idx=2
[2018-03-12][14:34:15][tokio_threadpool][TRACE] stole task
[2018-03-12][14:34:15][tokio_threadpool::task][TRACE] running; state=Scheduled
[2018-03-12][14:34:15][tokio_threadpool::task][TRACE] Task::run; state=Running
[2018-03-12][14:34:15][tokio::net::udp::frame][TRACE] sending frame
[2018-03-12][14:34:15][nano_lib_rs::message][TRACE] Serialized message: [82, 67, 5, 5, 1, 2, 0, 0]
[2018-03-12][14:34:15][tokio::net::udp::frame][TRACE] frame encoded; length=8
[2018-03-12][14:34:15][tokio::net::udp::frame][TRACE] sending frame
[2018-03-12][14:34:15][tokio::net::udp::frame][TRACE] flushing frame; length=8
[2018-03-12][14:34:15][mio::poll][TRACE] registering with poller
[2018-03-12][14:34:15][mio::sys::unix::kqueue][TRACE] registering; token=Token(1); interests=Readable | Writable | Error | Hup
[2018-03-12][14:34:15][tokio_threadpool::task][TRACE] -> not ready
[2018-03-12][14:34:15][tokio_threadpool][TRACE] try_steal_task -- signal_work; self=2; from=3
[2018-03-12][14:34:15][tokio_threadpool][TRACE] signal_work -- spawn; idx=1
[2018-03-12][14:34:15][tokio_threadpool][TRACE] spawning new worker thread; idx=1
[2018-03-12][14:34:15][tokio_reactor][TRACE] event Writable Token(1)
[2018-03-12][14:34:15][tokio_threadpool][TRACE] Notifier::notify; id=0x10281b370
[2018-03-12][14:34:15][tokio_threadpool][TRACE] submit to existing worker; idx=0; state=WorkerState { lifecycle: "WORKER_SHUTDOWN", is_pushed: true }
[2018-03-12][14:34:15][tokio_threadpool][TRACE] spawning new worker thread; idx=0
[2018-03-12][14:34:15][tokio_threadpool][TRACE] found work while draining; signal_work
[2018-03-12][14:34:15][tokio_threadpool::task][TRACE] running; state=Scheduled
[2018-03-12][14:34:15][tokio_threadpool::task][TRACE] Task::run; state=Running
[2018-03-12][14:34:15][tokio::net::udp::frame][TRACE] sending frame
[2018-03-12][14:34:15][tokio::net::udp::frame][TRACE] flushing frame; length=8
[2018-03-12][14:34:15][mio::poll][TRACE] deregistering handle with poller
[2018-03-12][14:34:15][tokio_reactor][DEBUG] dropping I/O source: 0
[2018-03-12][14:34:15][tokio_threadpool][TRACE] shutdown; state=State { lifecycle: 0, num_futures: 1 }
[2018-03-12][14:34:15][tokio_threadpool][TRACE] -> transitioned to shutdown
[2018-03-12][14:34:15][tokio_threadpool][TRACE] Shutdown::poll
[2018-03-12][14:34:15][tokio_reactor][DEBUG] loop process - 1 events, 0.010s
[2018-03-12][14:34:15][nano_rs][ERROR] Got error: Error(IoError(Os { code: 22, kind: InvalidInput, message: "Invalid argument" }), State { next_error: None, backtrace: None })
[2018-03-12][14:34:15][tokio_threadpool::task][TRACE] -> task complete
[2018-03-12][14:34:15][tokio_threadpool][TRACE] task complete; state=State { lifecycle: 2, num_futures: 0 }
[2018-03-12][14:34:15][tokio_threadpool][TRACE] -> shutting down workers
[2018-03-12][14:34:15][tokio_threadpool][TRACE] shutting down thread; idx=0
[2018-03-12][14:34:15][tokio_threadpool][TRACE] worker_terminated; num_workers=3
[2018-03-12][14:34:15][tokio_threadpool][TRACE] shutting down thread; idx=3
[2018-03-12][14:34:15][tokio_threadpool][TRACE] worker_terminated; num_workers=2
[2018-03-12][14:34:15][tokio_threadpool][TRACE] shutting down thread; idx=2
[2018-03-12][14:34:15][tokio_threadpool][TRACE] worker_terminated; num_workers=1
[2018-03-12][14:34:15][tokio_threadpool][TRACE] shutting down thread; idx=1
[2018-03-12][14:34:15][tokio_threadpool][TRACE] worker_terminated; num_workers=0
[2018-03-12][14:34:15][tokio_threadpool][TRACE] notifying shutdown task
[2018-03-12][14:34:15][tokio_threadpool][TRACE] Shutdown::poll
[2018-03-12][14:34:15][tokio_reactor][TRACE] event Readable Token(0)
[2018-03-12][14:34:15][tokio_reactor][DEBUG] loop process - 1 events, 0.000s
[2018-03-12][14:34:15][tokio_reactor::background][DEBUG] shutting background reactor on idle
[2018-03-12][14:34:15][tokio_reactor::background][DEBUG] background reactor has shutdown
[2018-03-12][14:34:15][nano_rs][INFO] Stopping nano-rs!
fn run() -> Result<()> {
info!("Starting nano-rs!");
let addr = "[::1]:7075".parse()?;
let socket = UdpSocket::bind(&addr)?;
info!("Listening on: {}", socket.local_addr()?);
let (sink, stream) = UdpFramed::new(socket, MessageCodec::new()).split();
let init_addrs = "rai.raiblocks.net:7075".to_socket_addrs()?;
let mut initial_peers = Vec::new();
for addr in init_addrs {
info!("Found initial peer: {}", addr);
initial_peers.push(addr);
}
if let None = initial_peers.get(0) {
return Err("Could not connect to initial peer".into());
}
// let _state = Arc::new(Mutex::new(State::new(initial_peers)));
let init_msgs = stream::iter_ok::<_, nano_lib_rs::error::Error>(initial_peers.into_iter()).map(|peer| {
(MessageBuilder::new(MessageKind::KeepAliveMessage).build(), peer)
});
let handler = sink.send_all(init_msgs).and_then(|(sink, _)| {
let out_stream = stream.map(|(msg, addr)| {
let kind = msg.kind();
info!("Received message of kind: {:?} from {}", kind, addr);
(MessageBuilder::new(MessageKind::KeepAliveMessage).build(), addr)
});
sink.send_all(out_stream)
});
tokio::run(
handler
.map(|_| ())
.map_err(|e| error!("Got error: {:?}", e))
);
info!("Stopping nano-rs!");
Ok(())
}
/// Stateless codec that converts between raw datagram bytes and `Message`s.
///
/// The private unit field keeps the tuple constructor private, so values can
/// only be created through [`MessageCodec::new`] or `Default`.
#[derive(Debug, Default)]
pub struct MessageCodec(());

impl MessageCodec {
    /// Creates a new codec. The codec holds no state.
    pub fn new() -> Self {
        MessageCodec(())
    }
}
impl Decoder for MessageCodec {
    type Item = Message;
    type Error = Error;

    /// Decodes one `Message` from `buf`.
    ///
    /// The first 8 bytes are deserialized with `bincode` into a
    /// `MessageHeader`; every remaining byte in `buf` becomes the message
    /// payload. On success `buf` is left empty.
    ///
    /// Returns `Ok(None)` when fewer than 8 bytes are available, and an error
    /// when the header bytes cannot be deserialized.
    fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>> {
        /// Number of bytes occupied by the serialized message header.
        const HEADER_LEN: usize = 8;

        if buf.len() < HEADER_LEN {
            return Ok(None);
        }

        let header_bytes = buf.split_to(HEADER_LEN);
        let header: MessageHeader = bincode::deserialize(&header_bytes[..])?;

        // Take everything that is left as the payload. `split_to(len)` returns
        // the bytes `[0, len)`; `split_off(len)` would instead return the empty
        // tail `[len, capacity)` and leave the payload behind in `buf`.
        let len = buf.len();
        let data = Bytes::from(buf.split_to(len));

        let message = Message::new(header, data);
        Ok(Some(message))
    }
}
impl Encoder for MessageCodec {
    type Item = Message;
    type Error = Error;

    /// Serializes `item` and appends the resulting bytes to `dst`.
    ///
    /// Fails with the error produced by `Message::serialize` if the message
    /// cannot be serialized; in that case `dst` is left untouched.
    fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<()> {
        let wire_bytes = item.serialize()?;
        trace!("Serialized message: {:?}", &wire_bytes[..]);

        // Grow the destination up front so the write below has room.
        let needed = wire_bytes.len();
        dst.reserve(needed);
        dst.put(wire_bytes);

        Ok(())
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment