-
-
Save Texlo-Dev/011a5f34556ca50ff7fa4675b56f9446 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use crate::{ | |
errors::Result, | |
cfg::Config, | |
discord::DiscordWorker, | |
}; | |
use byteorder::{LE, ReadBytesExt as _}; | |
use futures::{ | |
compat::Future01CompatExt as _, | |
future::TryFutureExt as _, | |
}; | |
use hyper::{ | |
client::{Client as HyperClient, HttpConnector}, | |
Body, | |
}; | |
use hyper_tls::HttpsConnector; | |
use redis_async::{ | |
client::{self as redis_client, PairedConnection}, | |
resp::{RespValue, FromResp} | |
}; | |
use reqwest::r#async::Client; | |
use std::{ | |
sync::Arc, | |
collections::HashMap, | |
}; | |
use serenity::{ | |
http::Client as SerenityHttpClient, | |
model::event::GatewayEvent, | |
}; | |
pub struct State { | |
config: Arc<Config>, | |
http: Arc<Client>, | |
redis: Arc<PairedConnection>, | |
serenity: Arc<SerenityHttpClient> | |
} | |
pub struct Worker { | |
discord: DiscordWorker, | |
redis_pop: PairedConnection, | |
} | |
impl Worker { | |
pub async fn new(cfg: Config) -> Result<Self> { | |
debug!("Beginning connection to Redis."); | |
let redis_addr = cfg.parse_redis()?; | |
let redis = Arc::new(await!(redis_client::paired_connect(&redis_addr).compat())?); // This connection is successful. | |
debug!("Making second connection to Redis."); | |
let redis2 = await!(redis_client::paired_connect(&redis_addr).compat())?; // This also connects, but the connection is lost. | |
let config = Arc::new(cfg); | |
let hyper = Arc::new(HyperClient::builder().build(HttpsConnector::new(4)?)); | |
let http = Arc::new(Client::builder().use_rustls_tls().build()?); | |
let serenity = Arc::new( | |
SerenityHttpClient::new(Arc::clone(&hyper), Arc::new(config.discord_token.clone()))?, | |
); | |
let state = Arc::new(State { | |
config, | |
http, | |
redis, | |
serenity | |
}); | |
let discord = DiscordWorker::new(Arc::clone(&state)); | |
Ok(Self { | |
redis_pop: redis2, | |
discord | |
}) | |
} | |
pub async fn run(&self) -> Result<()> { | |
loop { | |
let (event, shard_id) = match await!(self.recv()) { | |
Ok(v) => v, | |
Err(r) => { | |
warn!("There was an error while attempting to receive a redis event. {:?}", r); // FAILS to receive, throws error here. | |
continue; | |
}, | |
}; | |
trace!("Sending event to Discord Dispatcher."); | |
self.discord.dispatch(event, shard_id); | |
trace!("Event has been send to the Discord handler."); | |
} | |
} | |
async fn blpop(&self) -> Result<Vec<RespValue>> { | |
let array = resp_array!["BLPOP", "sharder:from", 0]; | |
await!(self.redis_pop.send(array).compat()).map_err(From::from) // FAILS to send payload. | |
} | |
async fn recv(&self) -> Result<(GatewayEvent, u64)> { | |
let mut parts = await!(self.blpop())?; | |
let part = if parts.len() == 2 { | |
parts.remove(1) | |
} else { | |
warn!("blpop result part count != 2: {:?}", parts); | |
None?; | |
unreachable!(); | |
}; | |
let mut message: Vec<u8> = match FromResp::from_resp(part) { | |
Ok(msg) => msg, | |
Err(why) => { | |
warn!("Err parsing part to bytes: {:?}", why); | |
None?; | |
unreachable!(); | |
}, | |
}; | |
let message_len = message.len(); | |
let shard_id = { | |
let mut shard_bytes = &message[message_len - 2..]; | |
shard_bytes.read_u16::<LE>()? as u64 | |
}; | |
message.truncate(message_len - 2); | |
let event = serde_json::from_slice(&message)?; | |
trace!("Got event: {:?}", event); | |
Ok((event, shard_id)) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment