Created
May 21, 2017 09:32
-
-
Save daboross/414224a0422dc3d5e2e6f3f72c950ead to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
extern crate redis; | |
extern crate serde_json; | |
extern crate futures; | |
extern crate futures_cpupool; | |
use redis::Commands; | |
use std::collections::HashMap; | |
use std::thread; | |
use std::cell::Cell; | |
use futures::sync::mpsc::channel; | |
use futures::Sink; | |
use futures::Stream; | |
use futures::{BoxFuture, Future, future}; | |
use futures_cpupool::CpuPool; | |
use std::sync::{Arc, Mutex, TryLockError}; | |
use std::sync::PoisonError; | |
use std::marker::Sync; | |
#[derive(Debug, Clone)] | |
struct Message { | |
channel_name: String, | |
payload: String, | |
} | |
struct RedisPubSub { | |
thread_pool: Mutex<CpuPool>, | |
connection: Arc<Mutex<redis::Connection>>, | |
pub_sub: Arc<Mutex<redis::PubSub>>, | |
current_subscription_id: Arc<Mutex<Cell<u32>>>, | |
active_subscriptions: HashMap<u32, Arc<Fn(&str) + Sync + Send>>, | |
subscriptions_refs: Arc<Mutex<HashMap<String, Vec<u32>>>>, | |
} | |
impl RedisPubSub { | |
fn new(connection_string: &str) -> Result<RedisPubSub, redis::RedisError> { | |
let client: redis::Client = redis::Client::open(connection_string) | |
.expect(format!("Could not connect to redis with connection string: {}", | |
connection_string) | |
.as_str()); | |
client.get_pubsub() | |
.and_then(|pub_sub| { | |
client.get_connection() | |
.and_then(|connection| { | |
Ok(RedisPubSub { | |
thread_pool: Mutex::new(CpuPool::new(10)), | |
connection: Arc::new(Mutex::new(connection)), | |
pub_sub: Arc::new(Mutex::new(pub_sub)), | |
current_subscription_id: Arc::new(Mutex::new(Cell::new(0))), | |
active_subscriptions: HashMap::new(), | |
subscriptions_refs: Arc::new(Mutex::new(HashMap::new())), | |
}) | |
}) | |
}) | |
} | |
fn subscribe<F: 'static>(&mut self, channel: &str, on_message: F) -> BoxFuture<u32, String> | |
where F: Fn(&str) -> () + Sync + Send | |
{ | |
let id: u32 = self.current_subscription_id | |
.lock() | |
.map(|mutex| mutex.get() + 1) | |
.unwrap(); | |
self.current_subscription_id | |
.lock() | |
.map(|mutex| mutex.set(id)) | |
.unwrap(); | |
self.active_subscriptions | |
.insert(id, Arc::new(on_message)); | |
let mut subs = self.subscriptions_refs.lock().unwrap(); | |
if let Some(_) = subs.get(channel) { | |
self.subscriptions_refs | |
.lock() | |
.and_then(|mut subscriptions| { | |
subscriptions.get_mut(channel) | |
.and_then(|result| { | |
result.insert(0, id); | |
Some(result) | |
}); | |
Ok(subscriptions) | |
}); | |
} else { | |
let _ = self.pub_sub | |
.lock() | |
.map(|mut pub_sub| { | |
println!("Subscribing: {:?}", channel); | |
pub_sub.subscribe(channel) | |
}); | |
} | |
let pool = match self.thread_pool.try_lock() { | |
Ok(p) => p, | |
Err(e) => { | |
return future::err("Unavailable".to_string()).boxed(); | |
} | |
}; | |
let pub_sub = self.pub_sub.clone(); | |
let refs = self.subscriptions_refs.clone(); | |
let messages_future = pool.spawn_fn(move || { | |
let redis_msg = pub_sub.lock().unwrap().get_message().unwrap(); | |
let payload: String = redis_msg.get_payload().expect("No payload"); | |
println!("payload in thread: {:?}", payload); | |
Ok(Message { | |
payload: payload, | |
channel_name: redis_msg.get_channel_name().to_string(), | |
}) | |
}); | |
let active_subscriptions = self.active_subscriptions.clone(); | |
let channel: String = channel.to_string(); | |
messages_future.map(move |redis_msg| { | |
let subscriptions = refs.lock().unwrap(); | |
let subscription = subscriptions.get(&channel).unwrap(); | |
for subscription_id in subscription { | |
if redis_msg.channel_name == channel { | |
println!("subscription_id in map and msg {:?}", subscription_id); | |
println!("msg: {:?}", redis_msg); | |
active_subscriptions.get(&subscription_id) | |
.map(|on_message_closure| on_message_closure(&redis_msg.payload)); | |
} | |
} | |
subscription[0] | |
// refs.lock() | |
// .map(|subscriptions| { | |
// subscriptions.get(&channel) | |
// .map(|subscription| { | |
// subscription.iter() | |
// .cloned() | |
// .map(|subscription_id| { | |
// if redis_msg.channel_name == channel { | |
// println!("subscription_id in map and msg {:?}", subscription_id); | |
// println!("msg: {:?}", redis_msg); | |
// active_subscriptions.get(&subscription_id) | |
// .map(|on_message_closure| on_message_closure(&redis_msg.payload)); | |
// }; | |
// subscription_id | |
// }) | |
// .collect::<Vec<_>>() | |
// }) | |
// // Ok(subscriptions) | |
// }) | |
// .unwrap() | |
// .unwrap() | |
// .first() | |
// .map(|x| *x) | |
// .unwrap() | |
}) | |
.boxed() | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment