Skip to content

Instantly share code, notes, and snippets.

@Texlo-Dev
Last active February 19, 2019 00:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Texlo-Dev/011a5f34556ca50ff7fa4675b56f9446 to your computer and use it in GitHub Desktop.
Save Texlo-Dev/011a5f34556ca50ff7fa4675b56f9446 to your computer and use it in GitHub Desktop.
use crate::{
errors::Result,
cfg::Config,
discord::DiscordWorker,
};
use byteorder::{LE, ReadBytesExt as _};
use futures::{
compat::Future01CompatExt as _,
future::TryFutureExt as _,
};
use hyper::{
client::{Client as HyperClient, HttpConnector},
Body,
};
use hyper_tls::HttpsConnector;
use redis_async::{
client::{self as redis_client, PairedConnection},
resp::{RespValue, FromResp}
};
use reqwest::r#async::Client;
use std::{
sync::Arc,
collections::HashMap,
};
use serenity::{
http::Client as SerenityHttpClient,
model::event::GatewayEvent,
};
pub struct State {
config: Arc<Config>,
http: Arc<Client>,
redis: Arc<PairedConnection>,
serenity: Arc<SerenityHttpClient>
}
pub struct Worker {
discord: DiscordWorker,
redis_pop: PairedConnection,
}
impl Worker {
pub async fn new(cfg: Config) -> Result<Self> {
debug!("Beginning connection to Redis.");
let redis_addr = cfg.parse_redis()?;
let redis = Arc::new(await!(redis_client::paired_connect(&redis_addr).compat())?); // This connection is successful.
debug!("Making second connection to Redis.");
let redis2 = await!(redis_client::paired_connect(&redis_addr).compat())?; // This also connects, but the connection is lost.
let config = Arc::new(cfg);
let hyper = Arc::new(HyperClient::builder().build(HttpsConnector::new(4)?));
let http = Arc::new(Client::builder().use_rustls_tls().build()?);
let serenity = Arc::new(
SerenityHttpClient::new(Arc::clone(&hyper), Arc::new(config.discord_token.clone()))?,
);
let state = Arc::new(State {
config,
http,
redis,
serenity
});
let discord = DiscordWorker::new(Arc::clone(&state));
Ok(Self {
redis_pop: redis2,
discord
})
}
pub async fn run(&self) -> Result<()> {
loop {
let (event, shard_id) = match await!(self.recv()) {
Ok(v) => v,
Err(r) => {
warn!("There was an error while attempting to receive a redis event. {:?}", r); // FAILS to receive, throws error here.
continue;
},
};
trace!("Sending event to Discord Dispatcher.");
self.discord.dispatch(event, shard_id);
trace!("Event has been send to the Discord handler.");
}
}
async fn blpop(&self) -> Result<Vec<RespValue>> {
let array = resp_array!["BLPOP", "sharder:from", 0];
await!(self.redis_pop.send(array).compat()).map_err(From::from) // FAILS to send payload.
}
async fn recv(&self) -> Result<(GatewayEvent, u64)> {
let mut parts = await!(self.blpop())?;
let part = if parts.len() == 2 {
parts.remove(1)
} else {
warn!("blpop result part count != 2: {:?}", parts);
None?;
unreachable!();
};
let mut message: Vec<u8> = match FromResp::from_resp(part) {
Ok(msg) => msg,
Err(why) => {
warn!("Err parsing part to bytes: {:?}", why);
None?;
unreachable!();
},
};
let message_len = message.len();
let shard_id = {
let mut shard_bytes = &message[message_len - 2..];
shard_bytes.read_u16::<LE>()? as u64
};
message.truncate(message_len - 2);
let event = serde_json::from_slice(&message)?;
trace!("Got event: {:?}", event);
Ok((event, shard_id))
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment