Skip to content

Instantly share code, notes, and snippets.

@HurricanKai
Last active September 5, 2022 18:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save HurricanKai/95a40808b849fd560251040d5271ba95 to your computer and use it in GitHub Desktop.
Save HurricanKai/95a40808b849fd560251040d5271ba95 to your computer and use it in GitHub Desktop.
Modification of with-lunatic/submillisecond file to accommodate "max_connections"
/// Messages handled by the supervisor process spawned in `run`.
#[derive(Clone, serde::Serialize, serde::Deserialize)]
pub enum AppMessage {
/// Sent by the listener process for every stream it accepts; the supervisor
/// spawns a linked `request_supervisor` process for the stream.
ConnectionAccepted(lunatic::net::TcpStream),
/// Tells the supervisor that a connection has finished; the supervisor reacts
/// by forwarding `ListenerMessage::TcpConnectionClosed` to the listener.
/// NOTE(review): presumably sent by `request_supervisor` — the sender is not
/// visible in this file section.
TcpStreamCompleted(),
/// Sent by the listener process when `accept` fails; the supervisor stops
/// its message loop on receipt.
TcpListenerClosed(),
}
/// Messages handled by the listener process spawned in `start_listener`.
#[derive(Clone, serde::Serialize, serde::Deserialize)]
pub enum ListenerMessage {
/// A previously accepted connection is gone; the listener increments its
/// count of connections it may still accept.
TcpConnectionClosed()
}
/// Spawns the accept-loop process and returns a handle to it.
///
/// The process is spawned linked to the caller under `tag`. It binds a TCP
/// listener on `addr` and forwards every accepted stream to `supervisor` as
/// [`AppMessage::ConnectionAccepted`].
///
/// `max_connections` is the number of streams that may be handed out before
/// the process stops accepting. Each [`ListenerMessage::TcpConnectionClosed`]
/// it receives frees one slot again; the number of free slots never grows
/// beyond `max_connections`.
///
/// If binding the address or accepting a connection fails, the process sends
/// [`AppMessage::TcpListenerClosed`] to `supervisor` and ends.
fn start_listener(
    tag: Tag,
    addr: String,
    supervisor: Process<AppMessage>,
    max_connections: u64,
) -> Process<ListenerMessage> {
    Process::spawn_link_tag(
        (addr, supervisor, max_connections),
        tag,
        |(addr, supervisor, max_connections), mailbox: Mailbox<ListenerMessage>| {
            let listener = match TcpListener::bind(addr) {
                Ok(listener) => listener,
                Err(_) => {
                    // Report the failure the same way an accept failure is
                    // reported, so the supervisor does not keep waiting for a
                    // listener that never started.
                    supervisor.send(AppMessage::TcpListenerClosed());
                    return;
                }
            };

            // Number of connections that may still be accepted right now.
            let mut allowed = max_connections;

            loop {
                if allowed > 0 {
                    match listener.accept() {
                        Ok((stream, _)) => {
                            supervisor.send(AppMessage::ConnectionAccepted(stream));
                            allowed -= 1;
                        }
                        Err(_) => {
                            supervisor.send(AppMessage::TcpListenerClosed());
                            break;
                        }
                    }
                } else {
                    // At capacity: nothing can be accepted until a slot is
                    // released, so wait for that message instead of polling
                    // the mailbox in a tight loop.
                    match mailbox.receive() {
                        ListenerMessage::TcpConnectionClosed() => {
                            allowed = (allowed + 1).min(max_connections);
                        }
                    }
                }

                // Pick up any further release messages with a short poll.
                // NOTE(review): the timeout is deliberately non-zero; confirm
                // how the lunatic version in use treats a zero timeout before
                // lowering it.
                loop {
                    match mailbox.receive_timeout(Duration::from_millis(1)) {
                        lunatic::MailboxResult::Message(ListenerMessage::TcpConnectionClosed()) => {
                            // Clamp so duplicate release notifications can
                            // never raise the budget above the configured cap.
                            allowed = (allowed + 1).min(max_connections);
                        }
                        lunatic::MailboxResult::TimedOut => break,
                        _ => unreachable!(),
                    }
                }
            }
        },
    )
}
/// Spawns the supervisor process for the server and returns a handle to it.
///
/// The supervisor starts the listener process via `start_listener` (linked
/// under a fresh tag) and then loops over its mailbox:
///
/// * `ConnectionAccepted(stream)` — spawns a linked `request_supervisor`
///   process with the stream, the handler reference and the supervisor's own
///   process handle.
/// * `TcpStreamCompleted()` or the death of any link other than the listener —
///   sends `ListenerMessage::TcpConnectionClosed` to the listener.
/// * `TcpListenerClosed()` or the death of the listener link — leaves the loop,
///   ending the supervisor.
///
/// `handler` is wrapped in a `FuncRef` so it can be passed into the spawned
/// process; `addr` and `max_connections` are forwarded to `start_listener`.
pub fn run<T, Arg, Ret>(handler: T, addr: String, max_connections: u64) -> Process<AppMessage>
where
    T: Handler<Arg, Ret> + Copy,
    T: Fn<T> + Copy
{
    let handler_ref = FuncRef::new(handler);
    Process::spawn(
        (handler_ref, addr, max_connections),
        |(handler_ref, addr, max_connections), mailbox: Mailbox<AppMessage>| {
            // Link failures are delivered as `LinkDied` results from here on.
            let mailbox = mailbox.catch_link_failure();
            let listener_tag = Tag::new();
            let listener_process =
                start_listener(listener_tag, addr, mailbox.this(), max_connections);

            loop {
                match mailbox.receive() {
                    lunatic::MailboxResult::Message(AppMessage::ConnectionAccepted(stream)) => {
                        Process::spawn_link(
                            (stream, handler_ref, Some(mailbox.this())),
                            request_supervisor,
                        );
                    }
                    // The listener is gone, either by its own report or because
                    // its link died: nothing more to supervise.
                    lunatic::MailboxResult::Message(AppMessage::TcpListenerClosed()) => break,
                    lunatic::MailboxResult::LinkDied(tag) if tag == listener_tag => break,
                    // A connection finished, or a linked process other than the
                    // listener died: hand the slot back to the listener.
                    lunatic::MailboxResult::Message(AppMessage::TcpStreamCompleted())
                    | lunatic::MailboxResult::LinkDied(_) => {
                        listener_process.send(ListenerMessage::TcpConnectionClosed())
                    }
                    _ => unreachable!(),
                }
            }
        },
    )
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment