Skip to content

Instantly share code, notes, and snippets.

@eknowlton
Last active August 24, 2019 18:25
Show Gist options
  • Save eknowlton/fffca26d0f642a9e3f8628beba2fb5d8 to your computer and use it in GitHub Desktop.
Save eknowlton/fffca26d0f642a9e3f8628beba2fb5d8 to your computer and use it in GitHub Desktop.
imap.rs
extern crate config;
extern crate ctrlc;
extern crate native_tls;
extern crate serde;
extern crate threadpool;
mod configuration;
mod imap_client;
use std::sync::mpsc::channel;
use threadpool::ThreadPool;
use std::io::Write;
use std::process::{Command, Stdio};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use configuration::ConfigObject;
use imap_client::ImapClient;
fn pass_to_ingress(
body: &[u8],
url: &str,
password: &str,
) -> std::io::Result<std::process::Output> {
let mut child = Command::new("bundle")
.env("URL", url)
.env("INGRESS_PASSWORD", password)
.args(&["exec", "rails", "action_mailbox:ingress:imap"])
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.spawn()?;
let mut_stdin = child.stdin.as_mut().unwrap();
mut_stdin.write_all(body)?;
let output = child.wait_with_output()?;
Ok(output)
}
fn main() {
let config = match ConfigObject::new("config/actionmailbox_imap.yml") {
Ok(config) => config,
Err(error) => {
println!("Failed to build configuration.");
println!("Error: {}", error);
std::process::exit(64)
}
};
let client = match ImapClient::new(&config) {
Ok(client) => client,
Err(error) => {
println!("Failed to create IMAP client and connect to server.");
println!("Error: {}", error);
std::process::exit(126);
}
};
println!("Connected to IMAP server and created client.");
let mut session = match client.login() {
Ok(session) => session,
Err((error, _)) => {
println!("Failed logging into the IMAP server. ");
println!("Error: {}", error);
std::process::exit(126);
}
};
println!("Logged into the IMAP server.");
match session.select(config.mailbox()) {
Err(error) => {
println!("A error occured selecting the inbox.");
println!("Error: {}", error);
std::process::exit(126);
}
_ => (),
}
println!("Selected '{}' mailbox.", config.mailbox());
let running = Arc::new(AtomicBool::new(true));
let r = running.clone();
ctrlc::set_handler(move || {
r.store(false, Ordering::SeqCst);
})
.expect("Error listening for SIGTERM.");
loop {
let pool = ThreadPool::new(4);
let (tx, rx) = channel();
let idle = match session.idle() {
Err(error) => {
println!("Failed to send command: IDLE");
println!("IMAP server may not support IDLE command.");
println!("Error: {}", error);
std::process::exit(126);
}
Ok(idle) => idle,
};
println!("Begin listening for activity on IMAP server.");
match idle.wait_keepalive() {
Err(error) => {
println!("Failed to wait and keepalive.");
println!("Error: {}", error);
std::process::exit(126);
}
_ => (),
}
println!("Activity detected.");
println!("Grabbing new messages from mailbox.");
let mut message_ids = match session.search("NOT DELETED NOT SEEN") {
Ok(message_ids) => message_ids,
Err(error) => {
println!("Failed to search for NOT DELETED NOT SEEN messages.");
println!("Error: {}", error);
std::process::exit(126);
}
};
for message_id in message_ids.drain() {
println!("Passing message to ingress: Seq {}", message_id);
let tx = tx.clone();
match session.store(format!("{}", message_id), "+FLAGS (\\Seen)") {
Err(error) => {
println!("Failed to mark message as Seen: Seq {}", message_id);
println!("Error: {}", error);
}
_ => (),
}
// fetch message
let messages = match session.fetch(format!("{}", message_id), "RFC822") {
Ok(messages) => messages,
Err(error) => {
println!("Failed to fetch message: Seq {}", message_id);
println!("Error: {}", error);
continue;
}
};
let message = match messages.iter().next() {
Some(message) => message,
None => {
println!("Failed to fetch message: Seq {}", message_id);
match session.store(format!("{}", message_id), "-flags (\\Seen)") {
Err(error) => {
println!("Failed to fetch message: Seq {}", message_id);
println!("Error: {}", error);
}
_ => (),
}
continue;
}
};
let body = match message.body() {
Some(body) => body,
None => {
println!("Failed to read body or empty body: Seq {}", message_id);
match session.store(format!("{}", message_id), "-FLAGS (\\Seen)") {
Err(error) => {
println!("Error marking message unread: Seq {}", message_id);
println!("Error: {}", error);
}
_ => (),
}
continue;
}
};
let url = match std::env::var("URL") {
Ok(url) => url,
_ => {
println!("Environment variable URL missing. URL is required.");
std::process::exit(64);
}
};
let ingress_password = match std::env::var("INGRESS_PASSWORD") {
Ok(url) => url,
_ => {
println!("Environment variable URL missing. URL is required.");
std::process::exit(64);
}
};
pool.execute(
move || match pass_to_ingress(body, &url[..], &ingress_password[..]) {
Ok(output) => {
let response = match String::from_utf8(output.stdout) {
Ok(response) => response,
Err(_) => String::from("Error reading STDOUT"),
};
println!("Response from ingress command: {}", response);
if output.status.success() {
tx.send(Ok(message_id)).expect("Something here");
return;
}
tx.send(Err(message_id)).expect("Something here");
}
Err(error) => {
println!("Failed to pass to ingress.");
println!("Error: {}", error);
std::process::exit(126);
}
},
);
}
while running.load(Ordering::SeqCst) {
match rx.try_recv() {
Ok(result) => match result {
Ok(message_id) => {
println!("Message successfully passed to ingres. Seq {}", message_id);
match session.store(format!("{}", message_id), "+FLAGS (\\Deleted)") {
Err(error) => {
println!("Error deleting message: Seq {}", message_id);
println!("Error: {}", error);
}
_ => (),
}
match session.expunge() {
Ok(_) => (),
Err(error) => {
println!("Failed to expunge.");
println!("Error: {}", error);
}
}
}
Err(message_id) => {
match session.store(format!("{}", message_id), "-FLAGS (\\Seen)") {
Err(error) => {
println!("Error marking message unread: Seq {}", message_id);
println!("Error: {}", error);
}
_ => (),
}
}
},
_ => (),
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment