-
-
Save martell/1afe6fb9442e550e7b3def82c76d5c5b to your computer and use it in GitHub Desktop.
actix-web-actors: port to futures 0.3 (work in progress)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/Cargo.toml b/Cargo.toml | |
index 3efc058d..92491f8a 100644 | |
--- a/Cargo.toml | |
+++ b/Cargo.toml | |
@@ -37,7 +37,7 @@ members = [ | |
#"actix-session", | |
"actix-identity", | |
#"actix-multipart", | |
- #"actix-web-actors", | |
+ "actix-web-actors", | |
"actix-web-codegen", | |
"test-server", | |
] | |
@@ -124,7 +124,7 @@ actix-web = { path = "." } | |
actix-http = { path = "actix-http" } | |
actix-http-test = { path = "test-server" } | |
actix-web-codegen = { path = "actix-web-codegen" } | |
-# actix-web-actors = { path = "actix-web-actors" } | |
+actix-web-actors = { path = "actix-web-actors" } | |
# actix-session = { path = "actix-session" } | |
actix-files = { path = "actix-files" } | |
# actix-multipart = { path = "actix-multipart" } | |
diff --git a/actix-web-actors/Cargo.toml b/actix-web-actors/Cargo.toml | |
index d5a6ce2c..e5f32c95 100644 | |
--- a/actix-web-actors/Cargo.toml | |
+++ b/actix-web-actors/Cargo.toml | |
@@ -19,12 +19,12 @@ path = "src/lib.rs" | |
[dependencies] | |
actix = "0.8.3" | |
-actix-web = "1.0.9" | |
-actix-http = "0.2.11" | |
-actix-codec = "0.1.2" | |
+actix-web = "2.0.0-alpha.1" | |
+actix-http = "0.3.0-alpha.1" | |
+actix-codec = "0.2.0-alpha.1" | |
bytes = "0.4" | |
-futures = "0.1.25" | |
+futures = "0.3.1" | |
[dev-dependencies] | |
env_logger = "0.6" | |
-actix-http-test = { version = "0.2.4", features=["ssl"] } | |
+actix-http-test = { version = "0.3.0-alpha.1", features=["openssl"] } | |
diff --git a/actix-web-actors/src/context.rs b/actix-web-actors/src/context.rs | |
index 31b29500..8adf0458 100644 | |
--- a/actix-web-actors/src/context.rs | |
+++ b/actix-web-actors/src/context.rs | |
@@ -1,4 +1,5 @@ | |
use std::collections::VecDeque; | |
+use std::task::{Context, Poll}; | |
use actix::dev::{ | |
AsyncContextParts, ContextFut, ContextParts, Envelope, Mailbox, ToEnvelope, | |
@@ -7,10 +8,11 @@ use actix::fut::ActorFuture; | |
use actix::{ | |
Actor, ActorContext, ActorState, Addr, AsyncContext, Handler, Message, SpawnHandle, | |
}; | |
+use actix_http::error::PayloadError; | |
use actix_web::error::{Error, ErrorInternalServerError}; | |
use bytes::Bytes; | |
-use futures::sync::oneshot::Sender; | |
-use futures::{Async, Future, Poll, Stream}; | |
+use futures::channel::oneshot::Sender; | |
+use futures::{Future, Stream}; | |
/// Execution context for http actors | |
pub struct HttpContext<A> | |
@@ -81,7 +83,7 @@ where | |
{ | |
#[inline] | |
/// Create a new HTTP Context from a request and an actor | |
- pub fn create(actor: A) -> impl Stream<Item = Bytes, Error = Error> { | |
+ pub fn create(actor: A) -> impl Stream<Item = Result<Bytes, PayloadError>> { | |
let mb = Mailbox::default(); | |
let ctx = HttpContext { | |
inner: ContextParts::new(mb.sender_producer()), | |
@@ -91,7 +93,7 @@ where | |
} | |
/// Create a new HTTP Context | |
- pub fn with_factory<F>(f: F) -> impl Stream<Item = Bytes, Error = Error> | |
+ pub fn with_factory<F>(f: F) -> impl Stream<Item = Result<Bytes, PayloadError>> | |
where | |
F: FnOnce(&mut Self) -> A + 'static, | |
{ | |
@@ -160,24 +162,23 @@ impl<A> Stream for HttpContextFut<A> | |
where | |
A: Actor<Context = HttpContext<A>>, | |
{ | |
- type Item = Bytes; | |
- type Error = Error; | |
+ type Item = Result<Bytes, PayloadError>; | |
- fn poll(&mut self) -> Poll<Option<Bytes>, Error> { | |
+ fn poll_next(&mut self, cx: &mut Context) -> Poll<Option<Self::Item>> { | |
if self.fut.alive() { | |
- match self.fut.poll() { | |
- Ok(Async::NotReady) | Ok(Async::Ready(())) => (), | |
- Err(_) => return Err(ErrorInternalServerError("error")), | |
+ match self.fut.poll(cx) { | |
+ Poll::Pending | Poll::Ready(Ok(())) => (), | |
+ Poll::Ready(Err(err)) => Err(ErrorInternalServerError("error")), | |
} | |
} | |
// frames | |
if let Some(data) = self.fut.ctx().stream.pop_front() { | |
- Ok(Async::Ready(data)) | |
+ Poll::Ready(data) | |
} else if self.fut.alive() { | |
- Ok(Async::NotReady) | |
+ Poll::Pending | |
} else { | |
- Ok(Async::Ready(None)) | |
+ Poll::Ready(None) | |
} | |
} | |
} | |
diff --git a/actix-web-actors/src/ws.rs b/actix-web-actors/src/ws.rs | |
index e25a7e6e..8442e3db 100644 | |
--- a/actix-web-actors/src/ws.rs | |
+++ b/actix-web-actors/src/ws.rs | |
@@ -1,6 +1,7 @@ | |
//! Websocket integration | |
use std::collections::VecDeque; | |
use std::io; | |
+use std::task::{Context, Poll}; | |
use actix::dev::{ | |
AsyncContextParts, ContextFut, ContextParts, Envelope, Mailbox, StreamHandler, | |
@@ -22,14 +23,14 @@ use actix_web::error::{Error, ErrorInternalServerError, PayloadError}; | |
use actix_web::http::{header, Method, StatusCode}; | |
use actix_web::{HttpRequest, HttpResponse}; | |
use bytes::{Bytes, BytesMut}; | |
-use futures::sync::oneshot::Sender; | |
-use futures::{Async, Future, Poll, Stream}; | |
+use futures::channel::oneshot::Sender; | |
+use futures::{Future, Stream}; | |
/// Do websocket handshake and start ws actor. | |
pub fn start<A, T>(actor: A, req: &HttpRequest, stream: T) -> Result<HttpResponse, Error> | |
where | |
A: Actor<Context = WebsocketContext<A>> + StreamHandler<Message, ProtocolError>, | |
- T: Stream<Item = Bytes, Error = PayloadError> + 'static, | |
+ T: Stream<Item = Result<Bytes, PayloadError>> + 'static, | |
{ | |
let mut res = handshake(req)?; | |
Ok(res.streaming(WebsocketContext::create(actor, stream))) | |
@@ -53,7 +54,7 @@ pub fn start_with_addr<A, T>( | |
) -> Result<(Addr<A>, HttpResponse), Error> | |
where | |
A: Actor<Context = WebsocketContext<A>> + StreamHandler<Message, ProtocolError>, | |
- T: Stream<Item = Bytes, Error = PayloadError> + 'static, | |
+ T: Stream<Item = Result<Bytes, PayloadError>> + 'static, | |
{ | |
let mut res = handshake(req)?; | |
let (addr, out_stream) = WebsocketContext::create_with_addr(actor, stream); | |
@@ -71,7 +72,7 @@ pub fn start_with_protocols<A, T>( | |
) -> Result<HttpResponse, Error> | |
where | |
A: Actor<Context = WebsocketContext<A>> + StreamHandler<Message, ProtocolError>, | |
- T: Stream<Item = Bytes, Error = PayloadError> + 'static, | |
+ T: Stream<Item = Result<Bytes, PayloadError>> + 'static, | |
{ | |
let mut res = handshake_with_protocols(req, protocols)?; | |
Ok(res.streaming(WebsocketContext::create(actor, stream))) | |
@@ -240,7 +241,7 @@ where | |
pub fn create<S>(actor: A, stream: S) -> impl Stream<Item = Bytes, Error = Error> | |
where | |
A: StreamHandler<Message, ProtocolError>, | |
- S: Stream<Item = Bytes, Error = PayloadError> + 'static, | |
+ S: Stream<Item = Result<Bytes, PayloadError>> + 'static, | |
{ | |
let (_, stream) = WebsocketContext::create_with_addr(actor, stream); | |
stream | |
@@ -258,7 +259,7 @@ where | |
) -> (Addr<A>, impl Stream<Item = Bytes, Error = Error>) | |
where | |
A: StreamHandler<Message, ProtocolError>, | |
- S: Stream<Item = Bytes, Error = PayloadError> + 'static, | |
+ S: Stream<Item = Result<Bytes, PayloadError>> + 'static, | |
{ | |
let mb = Mailbox::default(); | |
let mut ctx = WebsocketContext { | |
@@ -281,7 +282,7 @@ where | |
) -> impl Stream<Item = Bytes, Error = Error> | |
where | |
A: StreamHandler<Message, ProtocolError>, | |
- S: Stream<Item = Bytes, Error = PayloadError> + 'static, | |
+ S: Stream<Item = Result<Bytes, PayloadError>> + 'static, | |
{ | |
let mb = Mailbox::default(); | |
let mut ctx = WebsocketContext { | |
@@ -301,7 +302,7 @@ where | |
where | |
F: FnOnce(&mut Self) -> A + 'static, | |
A: StreamHandler<Message, ProtocolError>, | |
- S: Stream<Item = Bytes, Error = PayloadError> + 'static, | |
+ S: Stream<Item = Result<Bytes, PayloadError>> + 'static, | |
{ | |
let mb = Mailbox::default(); | |
let mut ctx = WebsocketContext { | |
@@ -414,11 +415,10 @@ impl<A> Stream for WebsocketContextFut<A> | |
where | |
A: Actor<Context = WebsocketContext<A>>, | |
{ | |
- type Item = Bytes; | |
- type Error = Error; | |
+ type Item = Result<Bytes, PayloadError>; | |
- fn poll(&mut self) -> Poll<Option<Bytes>, Error> { | |
- if self.fut.alive() && self.fut.poll().is_err() { | |
+ fn poll_next(&mut self, cx: &mut Context) -> Poll<Option<Self::Item>> { | |
+ if self.fut.alive() && self.fut.poll(cx).is_err() { | |
return Err(ErrorInternalServerError("error")); | |
} | |
@@ -433,11 +433,11 @@ where | |
} | |
if !self.buf.is_empty() { | |
- Ok(Async::Ready(Some(self.buf.take().freeze()))) | |
+ Poll::Ready(Some(self.buf.take().freeze())) | |
} else if self.fut.alive() && !self.closed { | |
- Ok(Async::NotReady) | |
+ Poll::Pending | |
} else { | |
- Ok(Async::Ready(None)) | |
+ Poll::Ready(None) | |
} | |
} | |
} | |
@@ -462,7 +462,7 @@ struct WsStream<S> { | |
impl<S> WsStream<S> | |
where | |
- S: Stream<Item = Bytes, Error = PayloadError>, | |
+ S: Stream<Item = Result<Bytes, PayloadError>> + 'static, | |
{ | |
fn new(stream: S, codec: Codec) -> Self { | |
Self { | |
@@ -476,23 +476,22 @@ where | |
impl<S> Stream for WsStream<S> | |
where | |
- S: Stream<Item = Bytes, Error = PayloadError>, | |
+ S: Stream<Item = Result<Bytes, PayloadError>> + 'static, | |
{ | |
- type Item = Message; | |
- type Error = ProtocolError; | |
+ type Item = Result<Message, ProtocolError>; | |
- fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { | |
+ fn poll_next(&mut self, cx: &mut Context) -> Poll<Option<Self::Item>> { | |
if !self.closed { | |
loop { | |
- match self.stream.poll() { | |
- Ok(Async::Ready(Some(chunk))) => { | |
+ match self.stream.poll_next(cx) { | |
+ Poll::Ready(Some(Ok(chunk))) => { | |
self.buf.extend_from_slice(&chunk[..]); | |
} | |
- Ok(Async::Ready(None)) => { | |
+ Poll::Ready(None) => { | |
self.closed = true; | |
break; | |
} | |
- Ok(Async::NotReady) => break, | |
+ Poll::Pending => break, | |
Err(e) => { | |
return Err(ProtocolError::Io(io::Error::new( | |
io::ErrorKind::Other, | |
@@ -506,9 +505,9 @@ where | |
match self.decoder.decode(&mut self.buf)? { | |
None => { | |
if self.closed { | |
- Ok(Async::Ready(None)) | |
+ Poll::Ready(None) | |
} else { | |
- Ok(Async::NotReady) | |
+ Poll::Pending | |
} | |
} | |
Some(frm) => { | |
@@ -531,7 +530,7 @@ where | |
Frame::Pong(s) => Message::Pong(s), | |
Frame::Close(reason) => Message::Close(reason), | |
}; | |
- Ok(Async::Ready(Some(msg))) | |
+ Poll::Ready(Some(Ok(msg))) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment