Skip to content

Instantly share code, notes, and snippets.

@tailhook
Created April 4, 2016 21:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tailhook/7f9df0d53818549d40d7902779758771 to your computer and use it in GitHub Desktop.
Save tailhook/7f9df0d53818549d40d7902779758771 to your computer and use it in GitHub Desktop.
Sendfile
--- examples/serve.rs 2016-03-29 00:14:50.712643740 +0300
+++ examples/serve_sendfile.rs 2016-04-05 00:13:24.651021841 +0300
@@ -14,7 +14,8 @@
enum Http {
ReadHeaders,
- SendResponse,
+ SendingFile { offset: u64 },
+ BodySent,
}
impl Protocol for Http {
@@ -40,15 +41,36 @@
"\r\n",
"Hello World!\r\n",
).as_bytes()).unwrap();
- Intent::of(Http::SendResponse).expect_flush()
- .deadline(scope.now() + Duration::new(10, 0))
+ // Wait until headers are flushed into network buffers
+ Intent::of(Http::SendingFile {
+ offset: 0,
+ file: File::open("/some/file"),
+ }).expect_flush().deadline(scope.now() + Duration::new(10, 0))
}
- fn bytes_flushed(self, _transport: &mut Transport<TcpStream>,
+ fn bytes_flushed(self, transport: &mut Transport<TcpStream>,
_scope: &mut Scope<Context>)
-> Intent<Self>
{
- // TODO(tailhook) or maybe start over?
- Intent::done()
+ match self {
+ ReadHeaders => unreachable!(),
+ SendingFile { offset, file } => {
+ match transport.sendfile(file, offset) {
+ Ok(0) => {
+ // Either connection closed or end of file
+ Intent::done()
+ }
+ Ok(bytes) => {
+ Intent::of(Http::SendingFile {
+ offset: offset + bytes,
+ file: file,
+ }).expect_flush().deadline(scope.now() + Duration::new(10, 0))
+ }
+ Err(e) => {
+ Intent::error(Box::new(e));
+ }
+ }
+ }
+ }
}
fn timeout(self, _transport: &mut Transport<TcpStream>,
_scope: &mut Scope<Context>)
extern crate rotor;
extern crate rotor_stream;
use std::error::Error;
use std::fs::File;
use std::io::{Write, stderr};
use std::time::Duration;
use rotor::mio::tcp::{TcpListener, TcpStream};
use rotor::{Scope};
use rotor_stream::{Accept, Stream, Protocol, Intent, Transport, Exception};
/// Field-less value used as the `Context` of the `Http` protocol and handed
/// to the event loop when it is started in `main`.
struct Context;
/// Path of the file whose contents are sent as the response body.
const FILE_PATH: &'static str = "/some/file";

/// Per-connection state of the sendfile-based HTTP example.
enum Http {
    /// Waiting for the request head, i.e. everything up to the blank line.
    ReadHeaders,
    /// The response head has been queued and the file body is being sent.
    SendingFile {
        /// Number of file bytes already passed to the transport.
        offset: u64,
        /// Open handle of the file being served.
        file: File,
    },
    /// Declared by the original state list; nothing in this file
    /// constructs it.
    BodySent,
}

impl Protocol for Http {
    type Context = Context;
    type Socket = TcpStream;
    type Seed = ();

    /// Begins a connection by asking for a request head of at most 4096
    /// bytes terminated by `\r\n\r\n`, with a ten second deadline.
    fn create(_seed: (), _sock: &mut TcpStream, scope: &mut Scope<Context>)
        -> Intent<Self>
    {
        Intent::of(Http::ReadHeaders).expect_delimiter(b"\r\n\r\n", 4096)
            .deadline(scope.now() + Duration::new(10, 0))
    }

    /// Called once the request head has arrived: opens the file, queues the
    /// response head and moves to `SendingFile`.
    ///
    /// Returns `Intent::error` when the file cannot be opened or inspected.
    fn bytes_read(self, transport: &mut Transport<TcpStream>,
                  _end: usize, scope: &mut Scope<Context>)
        -> Intent<Self>
    {
        // The message says "from", so report the remote end of the socket.
        println!("Request from {:?}", transport.socket().peer_addr());

        // Open the file and read its size before queuing any output, so a
        // failure never follows an already written "200 OK" head.
        let opened = File::open(FILE_PATH)
            .and_then(|file| file.metadata().map(|meta| (file, meta.len())));
        let (file, length) = match opened {
            Ok(pair) => pair,
            Err(e) => return Intent::error(Box::new(e)),
        };

        // Content-Length must describe the file; a fixed length with an
        // inline body would make clients stop reading before the file data.
        write!(transport.output(), concat!(
            "HTTP/1.0 200 OK\r\n",
            "Server: rotor-stream-example-serve\r\n",
            "Connection: close\r\n",
            "Content-Length: {}\r\n",
            "\r\n",
        ), length).unwrap();

        // Wait until headers are flushed into network buffers
        Intent::of(Http::SendingFile {
            offset: 0,
            file: file,
        }).expect_flush().deadline(scope.now() + Duration::new(10, 0))
    }

    /// Called after queued output has been flushed: pushes the next part of
    /// the file, or finishes when nothing more was sent.
    fn bytes_flushed(self, transport: &mut Transport<TcpStream>,
                     scope: &mut Scope<Context>)
        -> Intent<Self>
    {
        match self {
            // `ReadHeaders` only ever waits for a delimiter, never a flush.
            Http::ReadHeaders => unreachable!(),
            Http::SendingFile { offset, file } => {
                // NOTE(review): `Transport::sendfile` is used as in the
                // original sketch; this assumes it borrows the file and
                // returns the number of bytes sent -- confirm against the API.
                let sent = transport.sendfile(&file, offset);
                match sent {
                    Ok(0) => {
                        // Either connection closed or end of file
                        Intent::done()
                    }
                    Ok(bytes) => {
                        Intent::of(Http::SendingFile {
                            // The cast keeps the sum in `u64` whichever
                            // integer type the byte count has.
                            offset: offset + bytes as u64,
                            file: file,
                        }).expect_flush()
                            .deadline(scope.now() + Duration::new(10, 0))
                    }
                    Err(e) => Intent::error(Box::new(e)),
                }
            }
            // Never constructed in this file; there is nothing left to send.
            Http::BodySent => Intent::done(),
        }
    }

    /// Reports an expired deadline on stderr and returns `Intent::done()`.
    fn timeout(self, _transport: &mut Transport<TcpStream>,
               _scope: &mut Scope<Context>)
        -> Intent<Self>
    {
        writeln!(&mut stderr(), "Timeout happened").ok();
        Intent::done()
    }

    /// Treated as impossible by this example; panics if it is ever called.
    fn wakeup(self, _transport: &mut Transport<TcpStream>,
              _scope: &mut Scope<Context>)
        -> Intent<Self>
    {
        unreachable!();
    }

    /// Prints `reason` to stderr and returns `Intent::done()`.
    fn exception(self, _transport: &mut Transport<Self::Socket>,
                 reason: Exception, _scope: &mut Scope<Self::Context>)
        -> Intent<Self>
    {
        writeln!(&mut stderr(), "Error: {}", reason).ok();
        Intent::done()
    }

    /// Prints `reason` to stderr and returns `None`, so no error object is
    /// handed back to the caller.
    fn fatal(self, reason: Exception, _scope: &mut Scope<Self::Context>)
        -> Option<Box<Error>>
    {
        writeln!(&mut stderr(), "Error: {}", reason).ok();
        None
    }
}
/// Entry point: builds the event loop, binds a listener on 127.0.0.1:3000,
/// registers an acceptor that creates `Stream<Http>` machines, and runs the
/// loop with a `Context` value.
fn main() {
    let config = rotor::Config::new();
    let mut event_loop = rotor::Loop::new(&config).unwrap();

    let addr = "127.0.0.1:3000".parse().unwrap();
    let listener = TcpListener::bind(&addr).unwrap();

    // The flag keeps the name `ok` so the assertion message is unchanged.
    let ok = event_loop
        .add_machine_with(|scope| Accept::<Stream<Http>, _>::new(listener, (), scope))
        .is_ok();
    assert!(ok);

    event_loop.run(Context).unwrap();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment