Skip to content

Instantly share code, notes, and snippets.

@dignifiedquire
Last active March 15, 2020 21:39
Show Gist options
  • Save dignifiedquire/69608edd6199b1d2ea96c0785d7b0cd5 to your computer and use it in GitHub Desktop.
Save dignifiedquire/69608edd6199b1d2ea96c0785d7b0cd5 to your computer and use it in GitHub Desktop.
jobs
use anyhow::Result;
use async_std::prelude::*;
use async_std::sync::Arc;
use async_std::sync::{channel, Receiver, Sender};
use async_std::task;
use log::info;
use std::time::Duration;
fn setup_logger() {
pretty_env_logger::init();
}
#[derive(Debug, Default)]
struct Context {}
impl Context {
async fn configure(self) -> ConfiguredContext {
ConfiguredContext::default()
}
async fn import(&mut self) {
info!("importing");
}
}
#[derive(Debug, Default, Clone)]
struct ConfiguredContext {}
#[derive(Debug, Clone)]
struct RunningContext {
inner: Arc<InnerContext>,
}
#[derive(Debug)]
struct InnerContext {
inbox: ImapConnection,
smtp: SmtpConnection,
}
#[derive(Debug)]
struct ImapConnection {
/// Channel to notify that the inbox task was finished.
shutdown_receiver: Receiver<()>,
stop_sender: Sender<()>,
idle_interrupt_sender: Sender<()>,
jobs_receiver: Receiver<ImapJob>,
jobs_sender: Sender<ImapJob>,
}
#[derive(Debug)]
struct SmtpConnection {
/// Channel to notify that the smtp task was finished.
shutdown_receiver: Receiver<()>,
stop_sender: Sender<()>,
jobs_receiver: Receiver<SmtpJob>,
jobs_sender: Sender<SmtpJob>,
}
impl SmtpConnection {
async fn new() -> (Self, SmtpConnectionHandlers) {
let (jobs_sender, jobs_receiver) = channel(50);
let (stop_sender, stop_receiver) = channel(1);
let (shutdown_sender, shutdown_receiver) = channel(1);
let handlers = SmtpConnectionHandlers {
connection: Smtp::new().await,
stop_receiver,
shutdown_sender,
};
let conn = SmtpConnection {
shutdown_receiver,
stop_sender,
jobs_sender,
jobs_receiver,
};
(conn, handlers)
}
async fn send_job(&self, job: SmtpJob) {
self.jobs_sender.send(job).await
}
async fn stop(&self) {
self.stop_sender.send(()).await;
self.shutdown_receiver.recv().await;
}
}
#[derive(Debug, Clone)]
enum ImapJob {
Move { from: String, to: String },
}
#[derive(Debug, Clone)]
enum SmtpJob {
SendMessage { msg: String },
}
impl ConfiguredContext {
async fn run(self) -> RunningContext {
RunningContext::new().await
}
async fn export(&self) {
info!("exporting");
}
}
#[derive(Debug)]
struct Imap {
idle_interrupt: Receiver<()>,
}
#[derive(Debug)]
struct Smtp {}
impl Smtp {
async fn new() -> Self {
Smtp {}
}
async fn send(&mut self, msg: &str) {
info!("_smtp_ sending message: {}", msg);
}
}
impl Imap {
async fn new(idle_interrupt: Receiver<()>) -> Self {
Imap { idle_interrupt }
}
async fn fetch(&mut self) {
info!("_imap_ fetching");
}
async fn idle(&mut self) {
use futures::future::FutureExt;
info!("_imap_ idle start");
enum Idle {
Interrupt,
Done,
}
// clear channel to make sure we don't reuse an old interrupt.
if !self.idle_interrupt.is_empty() {
self.idle_interrupt.recv().await;
}
let fut = task::sleep(Duration::from_secs(5))
.map(|_| Idle::Done)
.race(self.idle_interrupt.recv().map(|_| Idle::Interrupt));
match fut.await {
Idle::Interrupt => info!("_imap_ idle interrupted"),
Idle::Done => info!("_imap_ idle done"),
}
}
async fn mov(&mut self, from: &str, to: &str) {
info!("_imap_ move from: '{}' to: '{}'", from, to);
}
}
impl ImapConnection {
async fn new() -> (Self, ImapConnectionHandlers) {
let (jobs_sender, jobs_receiver) = channel(50);
let (stop_sender, stop_receiver) = channel(1);
let (idle_interrupt_sender, idle_interrupt_receiver) = channel(1);
let (shutdown_sender, shutdown_receiver) = channel(1);
let handlers = ImapConnectionHandlers {
connection: Imap::new(idle_interrupt_receiver).await,
stop_receiver,
shutdown_sender,
};
let conn = ImapConnection {
shutdown_receiver,
stop_sender,
idle_interrupt_sender,
jobs_sender,
jobs_receiver,
};
(conn, handlers)
}
async fn send_job(&self, job: ImapJob) {
self.jobs_sender
.send(job)
.join(self.idle_interrupt_sender.send(()))
.await;
}
async fn stop(&self) {
self.stop_sender.send(()).await;
self.shutdown_receiver.recv().await;
}
}
struct ImapConnectionHandlers {
connection: Imap,
stop_receiver: Receiver<()>,
shutdown_sender: Sender<()>,
}
struct SmtpConnectionHandlers {
connection: Smtp,
stop_receiver: Receiver<()>,
shutdown_sender: Sender<()>,
}
impl RunningContext {
async fn new() -> Self {
let (inbox, inbox_handlers) = ImapConnection::new().await;
let (smtp, smtp_handlers) = SmtpConnection::new().await;
let inner = InnerContext { inbox, smtp };
let ctx = RunningContext {
inner: Arc::new(inner),
};
let ctx1 = ctx.clone();
task::spawn(async move {
let ImapConnectionHandlers {
mut connection,
stop_receiver,
shutdown_sender,
} = inbox_handlers;
let fut = async move {
loop {
info!(" loop-imap");
match ctx1
.inner
.inbox
.jobs_receiver
.recv()
.timeout(Duration::from_millis(200))
.await
{
Ok(Some(job)) => {
// execute job
info!(" executing job: {:?}", job);
match job {
ImapJob::Move { from, to } => connection.mov(&from, &to).await,
}
}
Ok(None) | Err(async_std::future::TimeoutError { .. }) => {
// fetch
connection.fetch().await;
// idle
connection.idle().await;
info!(" idle done");
}
}
}
};
info!(" awaiting inbox future");
fut.race(stop_receiver.recv()).await;
info!(" inbox interrupted");
shutdown_sender.send(()).await;
});
let ctx1 = ctx.clone();
task::spawn(async move {
let SmtpConnectionHandlers {
mut connection,
stop_receiver,
shutdown_sender,
} = smtp_handlers;
let fut = async move {
loop {
info!(" loop-smtp");
match ctx1.inner.smtp.jobs_receiver.recv().await {
Some(job) => {
// execute job
info!(" executing job: {:?}", job);
match job {
SmtpJob::SendMessage { msg } => connection.send(&msg).await,
}
}
None => {}
}
}
};
info!(" awaiting smtp future");
fut.race(stop_receiver.recv()).await;
info!(" smtp interrupted");
shutdown_sender.send(()).await;
});
ctx
}
async fn send_inbox_job(&self, job: ImapJob) {
self.inner.inbox.send_job(job).await;
}
async fn send_smtp_job(&self, job: SmtpJob) {
self.inner.smtp.send_job(job).await;
}
async fn send_message(&self, msg: impl AsRef<str>) {
self.send_smtp_job(SmtpJob::SendMessage {
msg: msg.as_ref().to_string(),
})
.await;
}
async fn mov(&self, from: impl AsRef<str>, to: impl AsRef<str>) {
self.send_inbox_job(ImapJob::Move {
from: from.as_ref().to_string(),
to: to.as_ref().to_string(),
})
.await;
}
async fn stop(self) -> ConfiguredContext {
self.inner.inbox.stop().join(self.inner.smtp.stop()).await;
drop(self);
info!("dropped");
ConfiguredContext::default()
}
}
#[async_std::main]
async fn main() -> Result<()> {
setup_logger();
let ctx = Context::default();
let ctx = ctx.configure().await;
info!("running");
let ctx = ctx.run().await;
let ctx1 = ctx.clone();
let mov_task = task::spawn(async move {
ctx1.mov("hello", "world").await;
});
let ctx1 = ctx.clone();
let send_msg_task = task::spawn(async move {
for i in 0..10 {
ctx1.send_message(format!("hello_{}", i)).await;
}
});
info!("sleep start");
task::sleep(Duration::from_secs(1)).await;
info!("sleep end");
ctx.send_message("world").await;
ctx.mov("a", "b").await;
task::sleep(Duration::from_secs(4)).await;
send_msg_task.await;
mov_task.await;
info!("stopping");
let _ctx = ctx.stop().await;
info!("stopped");
Ok(())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment