Created
September 18, 2017 18:04
-
-
Save nayato/769f91db411f7be7f8f59b05e024d80e to your computer and use it in GitHub Desktop.
SASL exchange in rust-amqp1 (macro-expanded source)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#![feature(prelude_import)] | |
#![no_std] | |
#![feature(proc_macro, conservative_impl_trait, generators)] | |
#[prelude_import] | |
use std::prelude::v1::*; | |
#[macro_use] | |
extern crate std as std; | |
extern crate amqp1 as amqp; | |
extern crate futures_await as futures; | |
extern crate tokio_io; | |
extern crate tokio_core; | |
extern crate bytes; | |
use futures::prelude::*; | |
use tokio_core::net::TcpStream; | |
use tokio_core::reactor::{Core, Handle}; | |
use tokio_io::AsyncRead; | |
use tokio_io::io::{read_exact, write_all}; | |
use std::net::SocketAddr; | |
use futures::{Future, Sink, Stream}; | |
use amqp::{Error, Result, ResultExt}; | |
use amqp::types::Symbol; | |
use amqp::io::{AmqpDecoder, AmqpEncoder}; | |
use amqp::protocol::{ProtocolId, encode_protocol_header, | |
decode_protocol_header}; | |
use amqp::framing::{AmqpFrame, SaslFrame}; | |
use amqp::protocol::{Frame, SaslFrameBody, SaslInit}; | |
use bytes::BytesMut; | |
fn main() { | |
let mut core = Core::new().unwrap(); | |
let handle = core.handle(); | |
let client = send(handle); | |
core.run(client).unwrap(); | |
} | |
fn send(handle: Handle) | |
-> impl ::futures::__rt::MyFuture<Result<()>> + 'static { | |
::futures::__rt::gen( // todo: surface for higher level to be able to respond properly / validate | |
move || | |
{ | |
let __e: ::futures::__rt::Result<_, _> = | |
{ | |
{ | |
let addr = | |
"127.0.0.1:5769".parse().unwrap(); | |
let tcp = | |
{ | |
let mut future = | |
TcpStream::connect(&addr, | |
&handle); | |
loop { | |
match ::futures::Future::poll(&mut future) | |
{ | |
::futures::__rt::Ok(::futures::Async::Ready(e)) | |
=> { | |
break | |
::futures::__rt::Ok(e) | |
} | |
::futures::__rt::Ok(::futures::Async::NotReady) | |
=> { | |
} | |
::futures::__rt::Err(e) | |
=> { | |
break | |
::futures::__rt::Err(e) | |
} | |
} | |
yield | |
} | |
}?; | |
let (mut reader, mut writer) = | |
tcp.split(); | |
let header_buf = | |
encode_protocol_header(ProtocolId::AmqpSasl); | |
let (writer, _) = | |
{ | |
let mut future = | |
write_all(writer, | |
header_buf); | |
loop { | |
match ::futures::Future::poll(&mut future) | |
{ | |
::futures::__rt::Ok(::futures::Async::Ready(e)) | |
=> { | |
break | |
::futures::__rt::Ok(e) | |
} | |
::futures::__rt::Ok(::futures::Async::NotReady) | |
=> { | |
} | |
::futures::__rt::Err(e) | |
=> { | |
break | |
::futures::__rt::Err(e) | |
} | |
} | |
yield | |
} | |
}?; | |
let mut header_buf = [0; 8]; | |
let (reader, header_buf) = | |
{ | |
let mut future = | |
read_exact(reader, | |
header_buf); | |
loop { | |
match ::futures::Future::poll(&mut future) | |
{ | |
::futures::__rt::Ok(::futures::Async::Ready(e)) | |
=> { | |
break | |
::futures::__rt::Ok(e) | |
} | |
::futures::__rt::Ok(::futures::Async::NotReady) | |
=> { | |
} | |
::futures::__rt::Err(e) | |
=> { | |
break | |
::futures::__rt::Err(e) | |
} | |
} | |
yield | |
} | |
}?; | |
let protocol_id = | |
decode_protocol_header(&header_buf)?; | |
if protocol_id != | |
ProtocolId::AmqpSasl { | |
return Err(::fmt::format(::std::fmt::Arguments::new_v1(&["expected SASL protocol id, seen `", | |
" instead.`"], | |
&match (&protocol_id,) | |
{ | |
(__arg0,) | |
=> | |
[::std::fmt::ArgumentV1::new(__arg0, | |
::std::fmt::Debug::fmt)], | |
})).into()); | |
} | |
let sasl_reader = | |
tokio_io::codec::FramedRead::new(reader, | |
AmqpDecoder::<SaslFrame>::new()); | |
let (sasl_frame, sasl_reader) = | |
{ | |
let mut future = | |
sasl_reader.into_future(); | |
loop { | |
match ::futures::Future::poll(&mut future) | |
{ | |
::futures::__rt::Ok(::futures::Async::Ready(e)) | |
=> { | |
break | |
::futures::__rt::Ok(e) | |
} | |
::futures::__rt::Ok(::futures::Async::NotReady) | |
=> { | |
} | |
::futures::__rt::Err(e) | |
=> { | |
break | |
::futures::__rt::Err(e) | |
} | |
} | |
yield | |
} | |
}.map_err(|e| e.0)?; | |
let plain_symbol = | |
Symbol::from_static("PLAIN"); | |
if let Some(SaslFrame { | |
body: SaslFrameBody::SaslMechanisms(mechs) | |
}) = sasl_frame { | |
if !mechs.sasl_server_mechanisms().0.iter().any(|m| | |
*m | |
== | |
plain_symbol) | |
{ | |
return Err(::fmt::format(::std::fmt::Arguments::new_v1(&["only PLAIN SASL mechanism is supported. server supports: "], | |
&match (&mechs.sasl_server_mechanisms(),) | |
{ | |
(__arg0,) | |
=> | |
[::std::fmt::ArgumentV1::new(__arg0, | |
::std::fmt::Debug::fmt)], | |
})).into()); | |
} | |
} else { | |
return Err(::fmt::format(::std::fmt::Arguments::new_v1(&["expected SASL mechanisms frame to arrive, seen `", | |
"` instead."], | |
&match (&sasl_frame,) | |
{ | |
(__arg0,) | |
=> | |
[::std::fmt::ArgumentV1::new(__arg0, | |
::std::fmt::Debug::fmt)], | |
})).into()); | |
} | |
let sasl_writer = | |
tokio_io::codec::FramedWrite::new(writer, | |
AmqpEncoder::<SaslFrame>::new()); | |
let initial_response = | |
SaslInit::prepare_response("", | |
"duggie", | |
"pow wow"); | |
let sasl_writer = | |
{ | |
let mut future = | |
sasl_writer.send(SaslFrame::new(SaslFrameBody::SaslInit(SaslInit{mechanism: | |
plain_symbol, | |
initial_response: | |
Some(initial_response), | |
hostname: | |
None,}))); | |
loop { | |
match ::futures::Future::poll(&mut future) | |
{ | |
::futures::__rt::Ok(::futures::Async::Ready(e)) | |
=> { | |
break | |
::futures::__rt::Ok(e) | |
} | |
::futures::__rt::Ok(::futures::Async::NotReady) | |
=> { | |
} | |
::futures::__rt::Err(e) | |
=> { | |
break | |
::futures::__rt::Err(e) | |
} | |
} | |
yield | |
} | |
}?; | |
let (sasl_frame, sasl_reader) = | |
{ | |
let mut future = | |
sasl_reader.into_future(); | |
loop { | |
match ::futures::Future::poll(&mut future) | |
{ | |
::futures::__rt::Ok(::futures::Async::Ready(e)) | |
=> { | |
break | |
::futures::__rt::Ok(e) | |
} | |
::futures::__rt::Ok(::futures::Async::NotReady) | |
=> { | |
} | |
::futures::__rt::Err(e) | |
=> { | |
break | |
::futures::__rt::Err(e) | |
} | |
} | |
yield | |
} | |
}.map_err(|e| e.0)?; | |
::io::_print(::std::fmt::Arguments::new_v1(&["sasl.outcome: ", | |
"\n"], | |
&match (&sasl_frame,) | |
{ | |
(__arg0,) | |
=> | |
[::std::fmt::ArgumentV1::new(__arg0, | |
::std::fmt::Debug::fmt)], | |
})); | |
Ok(()) | |
} | |
}; | |
#[allow(unreachable_code)] | |
{ return __e; loop { yield } } | |
}) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment