Skip to content

Instantly share code, notes, and snippets.

@kavanmevada
Created August 24, 2021 11:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kavanmevada/453c53c1c7f9a9a3a663bd16aa371673 to your computer and use it in GitHub Desktop.
Save kavanmevada/453c53c1c7f9a9a3a663bd16aa371673 to your computer and use it in GitHub Desktop.
epoll future
#![feature(async_stream)]
use std::future::Future;
use std::mem::size_of;
use std::net::{Ipv4Addr, SocketAddrV4};
use std::os::unix::io::FromRawFd;
use std::ptr::read;
use std::str::FromStr;
use std::{
cell::RefCell,
fs::File,
io,
pin::Pin,
stream::Stream,
sync::Arc,
task::{Context, Poll, Wake},
};
//use futures::StreamExt;
// use std::{cell::RefCell, future::Future, net::SocketAddr, os::unix::prelude::{AsRawFd, FromRawFd, RawFd}, pin::Pin, sync::Arc, task::{Context, Poll, Wake}};
// use async_io::Async;
// use futures::{Stream, lock::Mutex, ready};
// use async_std::{fs::File, io::{self, BufReader}, task};
// //use async_std::net::{TcpStream};
mod main_async;
mod sys;
thread_local!(static NOTIFY: RefCell<bool> = RefCell::new(true));
thread_local!(static POOL: RefCell<[Option<Pin<Box<dyn Future<Output = ()>>>>; 10]> = RefCell::default());
fn main() {
block_on(server()).expect("Error creating server!");
}
pub fn block_on<F, T>(future: F) -> T
where
F: Future<Output = T>,
{
let mut pin = Box::pin(future);
NOTIFY.with(|n| loop {
println!("Loop goes insane!");
if *n.borrow() {
*n.borrow_mut() = false;
let waker = std::task::Waker::from(Arc::from(Waker));
let mut cx = Context::from_waker(&waker);
let mutable = pin.as_mut().poll(&mut cx);
if let Poll::Ready(val) = mutable {
return val;
}
}
})
}
pub async fn run<T>(future: impl Future<Output = T>) -> T {
future.await
}
async fn server() -> Result<(), String> {
let listener = unsafe { Listener::new("127.0.0.1:8080")? };
let mut incoming = listener.incoming();
while let Some(stream) = incoming.next().await {
dbg!(stream);
}
println!("Kavan!");
Ok(())
}
#[derive(Debug)]
struct Listener(i32);
impl Listener {
pub unsafe fn new(addr: &str) -> Result<Self, String> {
let s = sys::Socket::from_str(addr)?;
Ok(Self(s.0))
}
pub fn incoming(&self) -> Incoming {
Incoming {
listener: self,
accept: None,
}
}
pub async unsafe fn accept(&self) -> Result<File, String> {
poll_fn(|_| {
match sys::Socket(self.0).accept() {
Ok(sock) => {
let file = File::from_raw_fd(sock);
Poll::Ready(Ok(file))
},
Err(e) => {
if e.kind() == std::io::ErrorKind::WouldBlock {
Poll::Pending
} else {
Err(e.to_string())?
}
}
}
}).await
}
}
struct Incoming<'a> {
listener: &'a Listener,
accept: Option<Pin<Box<dyn Future<Output = Result<File, String>> + Send + Sync + 'a>>>,
}
impl Stream for Incoming<'_> {
type Item = Result<File, String>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
println!("Pooling");
if self.accept.is_none() {
self.accept = Some(Box::pin(unsafe { self.listener.accept() }));
}
if let Some(f) = &mut self.accept {
let res = match f.as_mut().poll(cx) {
Poll::Ready(t) => t,
Poll::Pending => {
// Spawn task
println!("Future should be added to pool!");
return Poll::Pending
},
};
self.accept = None;
return Poll::Ready(Some(res));
}
}
}
}
impl StreamExt for Incoming<'_> {}
pub trait StreamExt: Stream {
fn next(&mut self) -> Next<'_, Self>
where
Self: Unpin,
{
Next { stream: self }
}
}
impl<St: ?Sized + Unpin> Unpin for Next<'_, St> {}
pub struct Next<'a, T: Unpin + ?Sized> {
pub(crate) stream: &'a mut T,
}
impl<T: Stream + Unpin + ?Sized> Future for Next<'_, T> {
type Output = Option<T::Item>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut *self.stream).poll_next(cx)
}
}
pub struct Waker;
impl Wake for Waker {
fn wake(self: Arc<Self>) {
NOTIFY.with(|f| *f.borrow_mut() = true)
}
}
// async fn server() -> io::Result<()> {
// let file = unsafe { File::from_raw_fd(0) };
// let afd = AsyncFd { file };
// let incoming = afd.incoming();
// while let Some(stream) = incoming.next().await {
// let stream = stream?;
// println!("Accepting from: {}", stream.peer_addr()?);
// let _handle = task::spawn(client(stream));
// }
// Ok(())
// }
// struct AsyncFd {
// file: Async<File>
// }
// impl AsyncFd {
// pub async fn accept(&self) -> io::Result<(AsyncFd, SocketAddr)> {
// let (stream, addr) = self.file.read_with(|io| AsyncFd { file: }).await?;
// //let (stream, addr) = self.watcher.accept().await?;
// let stream = TcpStream {
// watcher: Arc::new(stream),
// };
// Ok((stream, addr))
// }
// pub fn incoming(&self) -> Incoming<'_> {
// Incoming {
// listener: self,
// accept: None,
// }
// }
// }
// struct Incoming<'a> {
// listener: &'a AsyncFd,
// accept: Option<
// Pin<Box<dyn Future<Output = io::Result<()>> + Send + Sync + 'a>>,
// >,
// }
// impl Stream for Incoming<'_> {
// type Item = io::Result<()>;
// fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// loop {
// if self.accept.is_none() {
// self.accept = Some(Box::pin(self.listener.accept()));
// }
// if let Some(f) = &mut self.accept {
// let res = ready!(f.as_mut().poll(cx));
// self.accept = None;
// return Poll::Ready(Some(res.map(|stream| stream)));
// }
// }
// }
// }
// async fn client(stream: AsyncFd) -> Result<()> {
// let reader = BufReader::new(&stream);
// let mut lines = reader.lines();
// //dbg!(std::str::from_utf8(&lines));
// // let name = match lines.next().await {
// // None => std::io::Error::last_os_error(),
// // Some(line) => line?,
// // };
// println!("name = {:?}", lines);
// while let Some(line) = lines.next().await {
// let line = line?;
// let (dest, msg) = match line.find(':') {
// None => continue,
// Some(idx) => (&line[..idx], line[idx + 1 ..].trim()),
// };
// let dest: Vec<String> = dest.split(',').map(|name| name.trim().to_string()).collect();
// let msg: String = msg.trim().to_string();
// }
// Ok(())
// }
#[derive(Debug, Clone, Copy)]
pub struct PollFn<F: Sized> {
pub f: F,
}
impl<F> Unpin for PollFn<F> {}
impl<T, F> Future for PollFn<F>
where
F: FnMut(&mut Context<'_>) -> Poll<T>,
{
type Output = T;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
(&mut self.f)(cx)
}
}
pub async fn poll_fn<F, T>(f: F) -> T
where
F: FnMut(&mut Context<'_>) -> Poll<T>,
{
let fut = PollFn { f };
fut.await
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment