Skip to content

Instantly share code, notes, and snippets.

@sdroege
Created January 5, 2020 02:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sdroege/da6369a37a46b74984a7ade0a430922b to your computer and use it in GitHub Desktop.
Save sdroege/da6369a37a46b74984a7ade0a430922b to your computer and use it in GitHub Desktop.
async-tungstenite 0.2 gio example
[package]
name = "foo"
version = "0.1.0"
authors = ["Sebastian Dröge <sebastian@centricular.com>"]
edition = "2018"
[dependencies]
futures = "0.3"
async-tungstenite = { version = "0.2", default-features = false, features = ["stream"] }
url = "2"
glib = "0.9"
gio = "0.8"
use async_tungstenite::{client_async, tungstenite::Message};
use futures::prelude::*;
use gio::prelude::*;
use std::io;
async fn run() -> Result<(), Box<dyn std::error::Error>> {
let client = gio::SocketClient::new();
client.set_tls(true);
let connectable = gio::NetworkAddress::new("echo.websocket.org", 443);
let socket = client
.connect_async_future(&connectable)
.await
.map_err(|err| to_std_io_error(err))?;
let socket = IOStreamAsyncReadWrite::new(socket)
.map_err(|_| io::Error::new(io::ErrorKind::Other, "Unsupported gio::IOStream"))?;
let url = url::Url::parse("wss://echo.websocket.org").unwrap();
let (mut ws_stream, _) = client_async(url, socket).await?;
let text = "Hello, World!";
println!("Sending: \"{}\"", text);
ws_stream.send(Message::text(text)).await?;
let msg = ws_stream
.next()
.await
.ok_or_else(|| "didn't receive anything")??;
println!("Received: {:?}", msg);
Ok(())
}
/// Entry point: drives `run()` to completion on the default glib main
/// context and returns whatever result it produced.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    // The async function is executed by blocking on the default main context.
    glib::MainContext::default().block_on(run())
}
use futures::io::{AsyncRead, AsyncWrite};
use std::pin::Pin;
use std::task::{Context, Poll};
/// Bundles a `gio::IOStream` together with async adapters for its input and
/// output sides, so the pair can be handed around as a single value.
///
/// Instances are built by `IOStreamAsyncReadWrite::new`.
#[derive(Debug)]
struct IOStreamAsyncReadWrite<T: IsA<gio::IOStream>> {
/// The stream the two halves were obtained from. It is stored but never read
/// in the visible code — presumably kept so the stream stays alive as long
/// as the halves do (TODO confirm).
io_stream: T,
/// Async read adapter over the stream's pollable input stream.
read: gio::InputStreamAsyncRead<gio::PollableInputStream>,
/// Async write adapter over the stream's pollable output stream.
write: gio::OutputStreamAsyncWrite<gio::PollableOutputStream>,
}
impl<T: IsA<gio::IOStream>> IOStreamAsyncReadWrite<T> {
    /// Wraps `stream`, extracting async write and read adapters from its
    /// output and input streams.
    ///
    /// Each side must be present, must cast to the pollable stream type and
    /// must convert into its async adapter. If either side fails any of those
    /// steps, the untouched `stream` is handed back as the `Err` value.
    fn new(stream: T) -> Result<Self, T> {
        // Output side is prepared first, then the input side.
        let writer = stream
            .get_output_stream()
            .and_then(|out| out.dynamic_cast::<gio::PollableOutputStream>().ok())
            .and_then(|out| out.into_async_write().ok());
        let reader = stream
            .get_input_stream()
            .and_then(|inp| inp.dynamic_cast::<gio::PollableInputStream>().ok())
            .and_then(|inp| inp.into_async_read().ok());

        match (reader, writer) {
            (Some(read), Some(write)) => Ok(Self {
                io_stream: stream,
                read,
                write,
            }),
            _ => Err(stream),
        }
    }
}
impl<T: IsA<gio::IOStream> + Unpin> AsyncRead for IOStreamAsyncReadWrite<T> {
    /// Delegates the read to the wrapped `read` adapter.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let this = self.get_mut();
        Pin::new(&mut this.read).poll_read(cx, buf)
    }
}
impl<T: IsA<gio::IOStream> + Unpin> AsyncWrite for IOStreamAsyncReadWrite<T> {
    /// Delegates the write to the wrapped `write` adapter.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        let this = self.get_mut();
        Pin::new(&mut this.write).poll_write(cx, buf)
    }

    /// Delegates closing to the wrapped `write` adapter.
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let this = self.get_mut();
        Pin::new(&mut this.write).poll_close(cx)
    }

    /// Delegates flushing to the wrapped `write` adapter.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let this = self.get_mut();
        Pin::new(&mut this.write).poll_flush(cx)
    }
}
// SAFETY(review): these impls are unconditional — there is no `Send` or
// `Sync` bound on `T`, and nothing in the visible code establishes that the
// wrapped gio stream or its read/write adapters may be moved to or shared
// with another thread. Presumably they exist only to satisfy trait bounds
// required by a caller; confirm that values of this type never actually
// leave the thread that created them before relying on these impls.
unsafe impl<T: IsA<gio::IOStream>> Send for IOStreamAsyncReadWrite<T> {}
unsafe impl<T: IsA<gio::IOStream>> Sync for IOStreamAsyncReadWrite<T> {}
fn to_std_io_error(error: glib::Error) -> io::Error {
match error.kind::<gio::IOErrorEnum>() {
Some(io_error_enum) => io::Error::new(io_error_enum.into(), error),
None => io::Error::new(io::ErrorKind::Other, error),
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment