// epoll future
// Gist kavanmevada/453c53c1c7f9a9a3a663bd16aa371673, created August 24, 2021 11:58.
#![feature(async_stream)] | |
use std::future::Future; | |
use std::mem::size_of; | |
use std::net::{Ipv4Addr, SocketAddrV4}; | |
use std::os::unix::io::FromRawFd; | |
use std::ptr::read; | |
use std::str::FromStr; | |
use std::{ | |
cell::RefCell, | |
fs::File, | |
io, | |
pin::Pin, | |
stream::Stream, | |
sync::Arc, | |
task::{Context, Poll, Wake}, | |
}; | |
//use futures::StreamExt; | |
// use std::{cell::RefCell, future::Future, net::SocketAddr, os::unix::prelude::{AsRawFd, FromRawFd, RawFd}, pin::Pin, sync::Arc, task::{Context, Poll, Wake}}; | |
// use async_io::Async; | |
// use futures::{Stream, lock::Mutex, ready}; | |
// use async_std::{fs::File, io::{self, BufReader}, task}; | |
// //use async_std::net::{TcpStream}; | |
mod main_async; | |
mod sys; | |
thread_local! {
    /// Per-thread wake flag, initially `true`; `Waker::wake` sets it back to `true`.
    static NOTIFY: RefCell<bool> = RefCell::new(true);

    /// Per-thread array of ten optional boxed futures, all `None` at start.
    static POOL: RefCell<[Option<Pin<Box<dyn Future<Output = ()>>>>; 10]> = RefCell::default();
}
fn main() { | |
block_on(server()).expect("Error creating server!"); | |
} | |
/// Runs `future` to completion on the calling thread and returns its output.
///
/// The future is polled once up front and then again after every wake-up.
/// Between polls the thread is parked instead of spinning, and the waker
/// unparks it, so a wake issued from *any* thread is observed. A wake that
/// arrives while the future is still being polled is not lost either:
/// `unpark` leaves a token that makes the following `park` return at once.
///
/// If the future returns `Poll::Pending` without ever arranging a wake-up,
/// this function blocks forever.
pub fn block_on<F, T>(future: F) -> T
where
    F: Future<Output = T>,
{
    /// Waker that unparks the thread blocked inside `block_on`.
    struct ThreadWaker(std::thread::Thread);

    impl Wake for ThreadWaker {
        fn wake(self: Arc<Self>) {
            self.0.unpark();
        }

        fn wake_by_ref(self: &Arc<Self>) {
            self.0.unpark();
        }
    }

    let mut pinned = Box::pin(future);
    // Built once: the waker only needs to know which thread to unpark.
    let waker = std::task::Waker::from(Arc::new(ThreadWaker(std::thread::current())));
    let mut cx = Context::from_waker(&waker);

    loop {
        if let Poll::Ready(value) = pinned.as_mut().poll(&mut cx) {
            return value;
        }
        // `park` may return spuriously; that only costs one extra poll.
        std::thread::park();
    }
}
/// Awaits `future` and hands back whatever it resolves to.
pub async fn run<T>(future: impl Future<Output = T>) -> T {
    let output = future.await;
    output
}
async fn server() -> Result<(), String> { | |
let listener = unsafe { Listener::new("127.0.0.1:8080")? }; | |
let mut incoming = listener.incoming(); | |
while let Some(stream) = incoming.next().await { | |
dbg!(stream); | |
} | |
println!("Kavan!"); | |
Ok(()) | |
} | |
#[derive(Debug)] | |
struct Listener(i32); | |
impl Listener { | |
pub unsafe fn new(addr: &str) -> Result<Self, String> { | |
let s = sys::Socket::from_str(addr)?; | |
Ok(Self(s.0)) | |
} | |
pub fn incoming(&self) -> Incoming { | |
Incoming { | |
listener: self, | |
accept: None, | |
} | |
} | |
pub async unsafe fn accept(&self) -> Result<File, String> { | |
poll_fn(|_| { | |
match sys::Socket(self.0).accept() { | |
Ok(sock) => { | |
let file = File::from_raw_fd(sock); | |
Poll::Ready(Ok(file)) | |
}, | |
Err(e) => { | |
if e.kind() == std::io::ErrorKind::WouldBlock { | |
Poll::Pending | |
} else { | |
Err(e.to_string())? | |
} | |
} | |
} | |
}).await | |
} | |
} | |
struct Incoming<'a> { | |
listener: &'a Listener, | |
accept: Option<Pin<Box<dyn Future<Output = Result<File, String>> + Send + Sync + 'a>>>, | |
} | |
impl Stream for Incoming<'_> { | |
type Item = Result<File, String>; | |
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | |
loop { | |
println!("Pooling"); | |
if self.accept.is_none() { | |
self.accept = Some(Box::pin(unsafe { self.listener.accept() })); | |
} | |
if let Some(f) = &mut self.accept { | |
let res = match f.as_mut().poll(cx) { | |
Poll::Ready(t) => t, | |
Poll::Pending => { | |
// Spawn task | |
println!("Future should be added to pool!"); | |
return Poll::Pending | |
}, | |
}; | |
self.accept = None; | |
return Poll::Ready(Some(res)); | |
} | |
} | |
} | |
} | |
impl StreamExt for Incoming<'_> {} | |
pub trait StreamExt: Stream { | |
fn next(&mut self) -> Next<'_, Self> | |
where | |
Self: Unpin, | |
{ | |
Next { stream: self } | |
} | |
} | |
impl<St: ?Sized + Unpin> Unpin for Next<'_, St> {} | |
pub struct Next<'a, T: Unpin + ?Sized> { | |
pub(crate) stream: &'a mut T, | |
} | |
impl<T: Stream + Unpin + ?Sized> Future for Next<'_, T> { | |
type Output = Option<T::Item>; | |
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | |
Pin::new(&mut *self.stream).poll_next(cx) | |
} | |
} | |
pub struct Waker; | |
impl Wake for Waker { | |
fn wake(self: Arc<Self>) { | |
NOTIFY.with(|f| *f.borrow_mut() = true) | |
} | |
} | |
// async fn server() -> io::Result<()> { | |
// let file = unsafe { File::from_raw_fd(0) }; | |
// let afd = AsyncFd { file }; | |
// let incoming = afd.incoming(); | |
// while let Some(stream) = incoming.next().await { | |
// let stream = stream?; | |
// println!("Accepting from: {}", stream.peer_addr()?); | |
// let _handle = task::spawn(client(stream)); | |
// } | |
// Ok(()) | |
// } | |
// struct AsyncFd { | |
// file: Async<File> | |
// } | |
// impl AsyncFd { | |
// pub async fn accept(&self) -> io::Result<(AsyncFd, SocketAddr)> { | |
// let (stream, addr) = self.file.read_with(|io| AsyncFd { file: }).await?; | |
// //let (stream, addr) = self.watcher.accept().await?; | |
// let stream = TcpStream { | |
// watcher: Arc::new(stream), | |
// }; | |
// Ok((stream, addr)) | |
// } | |
// pub fn incoming(&self) -> Incoming<'_> { | |
// Incoming { | |
// listener: self, | |
// accept: None, | |
// } | |
// } | |
// } | |
// struct Incoming<'a> { | |
// listener: &'a AsyncFd, | |
// accept: Option< | |
// Pin<Box<dyn Future<Output = io::Result<()>> + Send + Sync + 'a>>, | |
// >, | |
// } | |
// impl Stream for Incoming<'_> { | |
// type Item = io::Result<()>; | |
// fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | |
// loop { | |
// if self.accept.is_none() { | |
// self.accept = Some(Box::pin(self.listener.accept())); | |
// } | |
// if let Some(f) = &mut self.accept { | |
// let res = ready!(f.as_mut().poll(cx)); | |
// self.accept = None; | |
// return Poll::Ready(Some(res.map(|stream| stream))); | |
// } | |
// } | |
// } | |
// } | |
// async fn client(stream: AsyncFd) -> Result<()> { | |
// let reader = BufReader::new(&stream); | |
// let mut lines = reader.lines(); | |
// //dbg!(std::str::from_utf8(&lines)); | |
// // let name = match lines.next().await { | |
// // None => std::io::Error::last_os_error(), | |
// // Some(line) => line?, | |
// // }; | |
// println!("name = {:?}", lines); | |
// while let Some(line) = lines.next().await { | |
// let line = line?; | |
// let (dest, msg) = match line.find(':') { | |
// None => continue, | |
// Some(idx) => (&line[..idx], line[idx + 1 ..].trim()), | |
// }; | |
// let dest: Vec<String> = dest.split(',').map(|name| name.trim().to_string()).collect(); | |
// let msg: String = msg.trim().to_string(); | |
// } | |
// Ok(()) | |
// } | |
/// Future adapter whose `poll` simply calls the wrapped closure.
#[derive(Debug, Clone, Copy)]
pub struct PollFn<F> {
    /// Closure invoked on every poll.
    pub f: F,
}

impl<F> Unpin for PollFn<F> {}

impl<T, F> Future for PollFn<F>
where
    F: FnMut(&mut Context<'_>) -> Poll<T>,
{
    type Output = T;

    /// Calls the closure with `cx` and returns its result unchanged.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        let this = self.get_mut();
        (this.f)(cx)
    }
}
pub async fn poll_fn<F, T>(f: F) -> T | |
where | |
F: FnMut(&mut Context<'_>) -> Poll<T>, | |
{ | |
let fut = PollFn { f }; | |
fut.await | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment