Skip to content

Instantly share code, notes, and snippets.

@martell
Created November 21, 2019 09:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save martell/1afe6fb9442e550e7b3def82c76d5c5b to your computer and use it in GitHub Desktop.
actix-web-actors: migration to futures 0.3 (work in progress)
diff --git a/Cargo.toml b/Cargo.toml
index 3efc058d..92491f8a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -37,7 +37,7 @@ members = [
#"actix-session",
"actix-identity",
#"actix-multipart",
- #"actix-web-actors",
+ "actix-web-actors",
"actix-web-codegen",
"test-server",
]
@@ -124,7 +124,7 @@ actix-web = { path = "." }
actix-http = { path = "actix-http" }
actix-http-test = { path = "test-server" }
actix-web-codegen = { path = "actix-web-codegen" }
-# actix-web-actors = { path = "actix-web-actors" }
+actix-web-actors = { path = "actix-web-actors" }
# actix-session = { path = "actix-session" }
actix-files = { path = "actix-files" }
# actix-multipart = { path = "actix-multipart" }
diff --git a/actix-web-actors/Cargo.toml b/actix-web-actors/Cargo.toml
index d5a6ce2c..e5f32c95 100644
--- a/actix-web-actors/Cargo.toml
+++ b/actix-web-actors/Cargo.toml
@@ -19,12 +19,12 @@ path = "src/lib.rs"
[dependencies]
actix = "0.8.3"
-actix-web = "1.0.9"
-actix-http = "0.2.11"
-actix-codec = "0.1.2"
+actix-web = "2.0.0-alpha.1"
+actix-http = "0.3.0-alpha.1"
+actix-codec = "0.2.0-alpha.1"
bytes = "0.4"
-futures = "0.1.25"
+futures = "0.3.1"
[dev-dependencies]
env_logger = "0.6"
-actix-http-test = { version = "0.2.4", features=["ssl"] }
+actix-http-test = { version = "0.3.0-alpha.1", features=["openssl"] }
diff --git a/actix-web-actors/src/context.rs b/actix-web-actors/src/context.rs
index 31b29500..8adf0458 100644
--- a/actix-web-actors/src/context.rs
+++ b/actix-web-actors/src/context.rs
@@ -1,4 +1,5 @@
use std::collections::VecDeque;
+use std::task::{Context, Poll};
use actix::dev::{
AsyncContextParts, ContextFut, ContextParts, Envelope, Mailbox, ToEnvelope,
@@ -7,10 +8,11 @@ use actix::fut::ActorFuture;
use actix::{
Actor, ActorContext, ActorState, Addr, AsyncContext, Handler, Message, SpawnHandle,
};
+use actix_http::error::PayloadError;
use actix_web::error::{Error, ErrorInternalServerError};
use bytes::Bytes;
-use futures::sync::oneshot::Sender;
-use futures::{Async, Future, Poll, Stream};
+use futures::channel::oneshot::Sender;
+use futures::{Future, Stream};
/// Execution context for http actors
pub struct HttpContext<A>
@@ -81,7 +83,7 @@ where
{
#[inline]
/// Create a new HTTP Context from a request and an actor
- pub fn create(actor: A) -> impl Stream<Item = Bytes, Error = Error> {
+ pub fn create(actor: A) -> impl Stream<Item = Result<Bytes, PayloadError>> {
let mb = Mailbox::default();
let ctx = HttpContext {
inner: ContextParts::new(mb.sender_producer()),
@@ -91,7 +93,7 @@ where
}
/// Create a new HTTP Context
- pub fn with_factory<F>(f: F) -> impl Stream<Item = Bytes, Error = Error>
+ pub fn with_factory<F>(f: F) -> impl Stream<Item = Result<Bytes, PayloadError>>
where
F: FnOnce(&mut Self) -> A + 'static,
{
@@ -160,24 +162,23 @@ impl<A> Stream for HttpContextFut<A>
where
A: Actor<Context = HttpContext<A>>,
{
- type Item = Bytes;
- type Error = Error;
+ type Item = Result<Bytes, PayloadError>;
- fn poll(&mut self) -> Poll<Option<Bytes>, Error> {
+ fn poll_next(&mut self, cx: &mut Context) -> Poll<Option<Self::Item>> {
if self.fut.alive() {
- match self.fut.poll() {
- Ok(Async::NotReady) | Ok(Async::Ready(())) => (),
- Err(_) => return Err(ErrorInternalServerError("error")),
+ match self.fut.poll(cx) {
+ Poll::Pending | Poll::Ready(Ok(())) => (),
+ Poll::Ready(Err(err)) => Err(ErrorInternalServerError("error")),
}
}
// frames
if let Some(data) = self.fut.ctx().stream.pop_front() {
- Ok(Async::Ready(data))
+ Poll::Ready(data)
} else if self.fut.alive() {
- Ok(Async::NotReady)
+ Poll::Pending
} else {
- Ok(Async::Ready(None))
+ Poll::Ready(None)
}
}
}
diff --git a/actix-web-actors/src/ws.rs b/actix-web-actors/src/ws.rs
index e25a7e6e..8442e3db 100644
--- a/actix-web-actors/src/ws.rs
+++ b/actix-web-actors/src/ws.rs
@@ -1,6 +1,7 @@
//! Websocket integration
use std::collections::VecDeque;
use std::io;
+use std::task::{Context, Poll};
use actix::dev::{
AsyncContextParts, ContextFut, ContextParts, Envelope, Mailbox, StreamHandler,
@@ -22,14 +23,14 @@ use actix_web::error::{Error, ErrorInternalServerError, PayloadError};
use actix_web::http::{header, Method, StatusCode};
use actix_web::{HttpRequest, HttpResponse};
use bytes::{Bytes, BytesMut};
-use futures::sync::oneshot::Sender;
-use futures::{Async, Future, Poll, Stream};
+use futures::channel::oneshot::Sender;
+use futures::{Future, Stream};
/// Do websocket handshake and start ws actor.
pub fn start<A, T>(actor: A, req: &HttpRequest, stream: T) -> Result<HttpResponse, Error>
where
A: Actor<Context = WebsocketContext<A>> + StreamHandler<Message, ProtocolError>,
- T: Stream<Item = Bytes, Error = PayloadError> + 'static,
+ T: Stream<Item = Result<Bytes, PayloadError>> + 'static,
{
let mut res = handshake(req)?;
Ok(res.streaming(WebsocketContext::create(actor, stream)))
@@ -53,7 +54,7 @@ pub fn start_with_addr<A, T>(
) -> Result<(Addr<A>, HttpResponse), Error>
where
A: Actor<Context = WebsocketContext<A>> + StreamHandler<Message, ProtocolError>,
- T: Stream<Item = Bytes, Error = PayloadError> + 'static,
+ T: Stream<Item = Result<Bytes, PayloadError>> + 'static,
{
let mut res = handshake(req)?;
let (addr, out_stream) = WebsocketContext::create_with_addr(actor, stream);
@@ -71,7 +72,7 @@ pub fn start_with_protocols<A, T>(
) -> Result<HttpResponse, Error>
where
A: Actor<Context = WebsocketContext<A>> + StreamHandler<Message, ProtocolError>,
- T: Stream<Item = Bytes, Error = PayloadError> + 'static,
+ T: Stream<Item = Result<Bytes, PayloadError>> + 'static,
{
let mut res = handshake_with_protocols(req, protocols)?;
Ok(res.streaming(WebsocketContext::create(actor, stream)))
@@ -240,7 +241,7 @@ where
pub fn create<S>(actor: A, stream: S) -> impl Stream<Item = Bytes, Error = Error>
where
A: StreamHandler<Message, ProtocolError>,
- S: Stream<Item = Bytes, Error = PayloadError> + 'static,
+ S: Stream<Item = Result<Bytes, PayloadError>> + 'static,
{
let (_, stream) = WebsocketContext::create_with_addr(actor, stream);
stream
@@ -258,7 +259,7 @@ where
) -> (Addr<A>, impl Stream<Item = Bytes, Error = Error>)
where
A: StreamHandler<Message, ProtocolError>,
- S: Stream<Item = Bytes, Error = PayloadError> + 'static,
+ S: Stream<Item = Result<Bytes, PayloadError>> + 'static,
{
let mb = Mailbox::default();
let mut ctx = WebsocketContext {
@@ -281,7 +282,7 @@ where
) -> impl Stream<Item = Bytes, Error = Error>
where
A: StreamHandler<Message, ProtocolError>,
- S: Stream<Item = Bytes, Error = PayloadError> + 'static,
+ S: Stream<Item = Result<Bytes, PayloadError>> + 'static,
{
let mb = Mailbox::default();
let mut ctx = WebsocketContext {
@@ -301,7 +302,7 @@ where
where
F: FnOnce(&mut Self) -> A + 'static,
A: StreamHandler<Message, ProtocolError>,
- S: Stream<Item = Bytes, Error = PayloadError> + 'static,
+ S: Stream<Item = Result<Bytes, PayloadError>> + 'static,
{
let mb = Mailbox::default();
let mut ctx = WebsocketContext {
@@ -414,11 +415,10 @@ impl<A> Stream for WebsocketContextFut<A>
where
A: Actor<Context = WebsocketContext<A>>,
{
- type Item = Bytes;
- type Error = Error;
+ type Item = Result<Bytes, PayloadError>;
- fn poll(&mut self) -> Poll<Option<Bytes>, Error> {
- if self.fut.alive() && self.fut.poll().is_err() {
+ fn poll_next(&mut self, cx: &mut Context) -> Poll<Option<Self::Item>> {
+ if self.fut.alive() && self.fut.poll(cx).is_err() {
return Err(ErrorInternalServerError("error"));
}
@@ -433,11 +433,11 @@ where
}
if !self.buf.is_empty() {
- Ok(Async::Ready(Some(self.buf.take().freeze())))
+ Poll::Ready(Some(self.buf.take().freeze()))
} else if self.fut.alive() && !self.closed {
- Ok(Async::NotReady)
+ Poll::Pending
} else {
- Ok(Async::Ready(None))
+ Poll::Ready(None)
}
}
}
@@ -462,7 +462,7 @@ struct WsStream<S> {
impl<S> WsStream<S>
where
- S: Stream<Item = Bytes, Error = PayloadError>,
+ S: Stream<Item = Result<Bytes, PayloadError>> + 'static,
{
fn new(stream: S, codec: Codec) -> Self {
Self {
@@ -476,23 +476,22 @@ where
impl<S> Stream for WsStream<S>
where
- S: Stream<Item = Bytes, Error = PayloadError>,
+ S: Stream<Item = Result<Bytes, PayloadError>> + 'static,
{
- type Item = Message;
- type Error = ProtocolError;
+ type Item = Result<Message, ProtocolError>;
- fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ fn poll_next(&mut self, cx: &mut Context) -> Poll<Option<Self::Item>> {
if !self.closed {
loop {
- match self.stream.poll() {
- Ok(Async::Ready(Some(chunk))) => {
+ match self.stream.poll_next(cx) {
+ Poll::Ready(Some(Ok(chunk))) => {
self.buf.extend_from_slice(&chunk[..]);
}
- Ok(Async::Ready(None)) => {
+ Poll::Ready(None) => {
self.closed = true;
break;
}
- Ok(Async::NotReady) => break,
+ Poll::Pending => break,
Err(e) => {
return Err(ProtocolError::Io(io::Error::new(
io::ErrorKind::Other,
@@ -506,9 +505,9 @@ where
match self.decoder.decode(&mut self.buf)? {
None => {
if self.closed {
- Ok(Async::Ready(None))
+ Poll::Ready(None)
} else {
- Ok(Async::NotReady)
+ Poll::Pending
}
}
Some(frm) => {
@@ -531,7 +530,7 @@ where
Frame::Pong(s) => Message::Pong(s),
Frame::Close(reason) => Message::Close(reason),
};
- Ok(Async::Ready(Some(msg)))
+ Poll::Ready(Some(Ok(msg)))
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment