Skip to content

Instantly share code, notes, and snippets.

@nayato
Created September 18, 2017 18:04
Show Gist options
  • Save nayato/769f91db411f7be7f8f59b05e024d80e to your computer and use it in GitHub Desktop.
Save nayato/769f91db411f7be7f8f59b05e024d80e to your computer and use it in GitHub Desktop.
SASL exchange in rust-amqp1 expanded
#![feature(prelude_import)]
#![no_std]
#![feature(proc_macro, conservative_impl_trait, generators)]
#[prelude_import]
use std::prelude::v1::*;
#[macro_use]
extern crate std as std;
extern crate amqp1 as amqp;
extern crate futures_await as futures;
extern crate tokio_io;
extern crate tokio_core;
extern crate bytes;
use futures::prelude::*;
use tokio_core::net::TcpStream;
use tokio_core::reactor::{Core, Handle};
use tokio_io::AsyncRead;
use tokio_io::io::{read_exact, write_all};
use std::net::SocketAddr;
use futures::{Future, Sink, Stream};
use amqp::{Error, Result, ResultExt};
use amqp::types::Symbol;
use amqp::io::{AmqpDecoder, AmqpEncoder};
use amqp::protocol::{ProtocolId, encode_protocol_header,
decode_protocol_header};
use amqp::framing::{AmqpFrame, SaslFrame};
use amqp::protocol::{Frame, SaslFrameBody, SaslInit};
use bytes::BytesMut;
/// Entry point: creates a reactor and runs the future returned by `send` on it.
///
/// Panics if the reactor cannot be created or if the future resolves to an error.
fn main() {
    let mut reactor = Core::new().unwrap();
    // `send` takes its reactor handle by value, so hand it a fresh one directly.
    let exchange = send(reactor.handle());
    reactor.run(exchange).unwrap();
}
/// Connects to `127.0.0.1:5769` and performs the client side of a SASL PLAIN exchange,
/// printing the frame received after the `SaslInit` frame has been sent.
///
/// This is compiler-expanded code: the body is a generator passed to `::futures::__rt::gen`,
/// and every awaited future appears as a `loop` that polls it, `break`s with the result once
/// it is ready or failed, and otherwise `yield`s.
///
/// Steps, in order:
/// 1. open a TCP connection and split it into read and write halves;
/// 2. write the `ProtocolId::AmqpSasl` protocol header and read back an 8-byte header;
/// 3. fail unless the decoded header is `ProtocolId::AmqpSasl`;
/// 4. read one SASL frame and fail unless it is a `SaslMechanisms` frame listing `PLAIN`;
/// 5. send a `SaslInit` frame with the `PLAIN` mechanism and a prepared initial response;
/// 6. read the next frame and print it with a `sasl.outcome: ` prefix.
///
/// # Errors
///
/// Any error produced by the awaited futures or by `decode_protocol_header` is propagated
/// with `?`. A formatted message is returned as the error when the protocol id or the first
/// SASL frame is not what steps 3 and 4 expect.
///
/// # Panics
///
/// Panics if the hard-coded address literal fails to parse.
///
/// TODO (carried over from the pre-expansion source; the expander displaced it, so the
/// statement it referred to is unknown): surface for higher level to be able to respond
/// properly / validate.
fn send(handle: Handle)
 -> impl ::futures::__rt::MyFuture<Result<()>> + 'static {
    ::futures::__rt::gen(
        move || {
            let __e: ::futures::__rt::Result<_, _> = {
                {
                    let addr = "127.0.0.1:5769".parse().unwrap();
                    // Await: TCP connect.
                    let tcp = {
                        let mut future = TcpStream::connect(&addr, &handle);
                        loop {
                            match ::futures::Future::poll(&mut future) {
                                ::futures::__rt::Ok(::futures::Async::Ready(e)) => {
                                    break ::futures::__rt::Ok(e)
                                }
                                ::futures::__rt::Ok(::futures::Async::NotReady) => {
                                }
                                ::futures::__rt::Err(e) => {
                                    break ::futures::__rt::Err(e)
                                }
                            }
                            yield
                        }
                    }?;
                    // Both halves are only moved into the I/O helpers below, never mutated
                    // in place, so the bindings do not need `mut`.
                    let (reader, writer) = tcp.split();
                    let header_buf = encode_protocol_header(ProtocolId::AmqpSasl);
                    // Await: write the SASL protocol header; the writer is handed back.
                    let (writer, _) = {
                        let mut future = write_all(writer, header_buf);
                        loop {
                            match ::futures::Future::poll(&mut future) {
                                ::futures::__rt::Ok(::futures::Async::Ready(e)) => {
                                    break ::futures::__rt::Ok(e)
                                }
                                ::futures::__rt::Ok(::futures::Async::NotReady) => {
                                }
                                ::futures::__rt::Err(e) => {
                                    break ::futures::__rt::Err(e)
                                }
                            }
                            yield
                        }
                    }?;
                    // Fresh 8-byte buffer (shadows the outgoing header) for the peer's reply.
                    let header_buf = [0; 8];
                    // Await: read exactly 8 bytes; reader and filled buffer are handed back.
                    let (reader, header_buf) = {
                        let mut future = read_exact(reader, header_buf);
                        loop {
                            match ::futures::Future::poll(&mut future) {
                                ::futures::__rt::Ok(::futures::Async::Ready(e)) => {
                                    break ::futures::__rt::Ok(e)
                                }
                                ::futures::__rt::Ok(::futures::Async::NotReady) => {
                                }
                                ::futures::__rt::Err(e) => {
                                    break ::futures::__rt::Err(e)
                                }
                            }
                            yield
                        }
                    }?;
                    let protocol_id = decode_protocol_header(&header_buf)?;
                    if protocol_id != ProtocolId::AmqpSasl {
                        // Expanded `format!`. The closing backtick now sits right after the
                        // value, matching the "expected SASL mechanisms frame" message below.
                        return Err(::fmt::format(::std::fmt::Arguments::new_v1(&["expected SASL protocol id, seen `",
                                                                                 "` instead."],
                                                                               &match (&protocol_id,)
                                                                                    {
                                                                                    (__arg0,)
                                                                                    =>
                                                                                    [::std::fmt::ArgumentV1::new(__arg0,
                                                                                                                 ::std::fmt::Debug::fmt)],
                                                                                })).into());
                    }
                    let sasl_reader =
                        tokio_io::codec::FramedRead::new(reader,
                                                         AmqpDecoder::<SaslFrame>::new());
                    // Await: first SASL frame. On failure only the first element of the
                    // error value is kept (`e.0`).
                    let (sasl_frame, sasl_reader) = {
                        let mut future = sasl_reader.into_future();
                        loop {
                            match ::futures::Future::poll(&mut future) {
                                ::futures::__rt::Ok(::futures::Async::Ready(e)) => {
                                    break ::futures::__rt::Ok(e)
                                }
                                ::futures::__rt::Ok(::futures::Async::NotReady) => {
                                }
                                ::futures::__rt::Err(e) => {
                                    break ::futures::__rt::Err(e)
                                }
                            }
                            yield
                        }
                    }.map_err(|e| e.0)?;
                    let plain_symbol = Symbol::from_static("PLAIN");
                    if let Some(SaslFrame {
                                    body: SaslFrameBody::SaslMechanisms(mechs)
                                }) = sasl_frame {
                        // The peer must list PLAIN among its mechanisms.
                        if !mechs.sasl_server_mechanisms().0.iter().any(|m|
                                                                            *m
                                                                                ==
                                                                                plain_symbol)
                           {
                            return Err(::fmt::format(::std::fmt::Arguments::new_v1(&["only PLAIN SASL mechanism is supported. server supports: "],
                                                                                   &match (&mechs.sasl_server_mechanisms(),)
                                                                                        {
                                                                                        (__arg0,)
                                                                                        =>
                                                                                        [::std::fmt::ArgumentV1::new(__arg0,
                                                                                                                     ::std::fmt::Debug::fmt)],
                                                                                    })).into());
                        }
                    } else {
                        // Either no frame arrived (`None`) or it was not a mechanisms frame.
                        return Err(::fmt::format(::std::fmt::Arguments::new_v1(&["expected SASL mechanisms frame to arrive, seen `",
                                                                                 "` instead."],
                                                                               &match (&sasl_frame,)
                                                                                    {
                                                                                    (__arg0,)
                                                                                    =>
                                                                                    [::std::fmt::ArgumentV1::new(__arg0,
                                                                                                                 ::std::fmt::Debug::fmt)],
                                                                                })).into());
                    }
                    let sasl_writer =
                        tokio_io::codec::FramedWrite::new(writer,
                                                          AmqpEncoder::<SaslFrame>::new());
                    // NOTE(review): hard-coded arguments; presumably (authorization id,
                    // user name, password) - confirm against `SaslInit::prepare_response`.
                    let initial_response =
                        SaslInit::prepare_response("",
                                                   "duggie",
                                                   "pow wow");
                    // Await: send the SaslInit frame.
                    let sasl_writer = {
                        let mut future =
                            sasl_writer.send(SaslFrame::new(SaslFrameBody::SaslInit(SaslInit{mechanism:
                                                                                                 plain_symbol,
                                                                                             initial_response:
                                                                                                 Some(initial_response),
                                                                                             hostname:
                                                                                                 None,})));
                        loop {
                            match ::futures::Future::poll(&mut future) {
                                ::futures::__rt::Ok(::futures::Async::Ready(e)) => {
                                    break ::futures::__rt::Ok(e)
                                }
                                ::futures::__rt::Ok(::futures::Async::NotReady) => {
                                }
                                ::futures::__rt::Err(e) => {
                                    break ::futures::__rt::Err(e)
                                }
                            }
                            yield
                        }
                    }?;
                    // Await: the frame that follows SaslInit.
                    let (sasl_frame, sasl_reader) = {
                        let mut future = sasl_reader.into_future();
                        loop {
                            match ::futures::Future::poll(&mut future) {
                                ::futures::__rt::Ok(::futures::Async::Ready(e)) => {
                                    break ::futures::__rt::Ok(e)
                                }
                                ::futures::__rt::Ok(::futures::Async::NotReady) => {
                                }
                                ::futures::__rt::Err(e) => {
                                    break ::futures::__rt::Err(e)
                                }
                            }
                            yield
                        }
                    }.map_err(|e| e.0)?;
                    // Expanded `println!`: the frame is printed as received, not validated.
                    ::io::_print(::std::fmt::Arguments::new_v1(&["sasl.outcome: ",
                                                                 "\n"],
                                                               &match (&sasl_frame,)
                                                                    {
                                                                    (__arg0,)
                                                                    =>
                                                                    [::std::fmt::ArgumentV1::new(__arg0,
                                                                                                 ::std::fmt::Debug::fmt)],
                                                                }));
                    Ok(())
                }
            };
            // Emitted by the macro expansion: the `yield` after `return` is never reached.
            // NOTE(review): presumably it is there so the closure still counts as a
            // generator when the body contains no other `yield` - confirm.
            #[allow(unreachable_code)]
            { return __e; loop  { yield } }
        })
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment