Skip to content

Instantly share code, notes, and snippets.

@daboross
Created May 21, 2017 09:32
Show Gist options
  • Save daboross/414224a0422dc3d5e2e6f3f72c950ead to your computer and use it in GitHub Desktop.
Save daboross/414224a0422dc3d5e2e6f3f72c950ead to your computer and use it in GitHub Desktop.
extern crate redis;
extern crate serde_json;
extern crate futures;
extern crate futures_cpupool;
use redis::Commands;
use std::collections::HashMap;
use std::thread;
use std::cell::Cell;
use futures::sync::mpsc::channel;
use futures::Sink;
use futures::Stream;
use futures::{BoxFuture, Future, future};
use futures_cpupool::CpuPool;
use std::sync::{Arc, Mutex, TryLockError};
use std::sync::PoisonError;
use std::marker::Sync;
/// An owned copy of one pub/sub message, built from the redis client's
/// message so it can be handed from the worker thread to the subscriber
/// callbacks.
#[derive(Debug, Clone)]
struct Message {
/// Name of the channel the message was received on.
channel_name: String,
/// Message payload, decoded as a `String`.
payload: String,
}
/// Callback-based wrapper around a redis pub/sub connection.
///
/// Callers register a closure per channel through `subscribe`; incoming
/// messages are read on a thread pool and dispatched to the closures that
/// are registered for the message's channel.
struct RedisPubSub {
/// Pool on which the blocking "wait for the next message" call is run.
thread_pool: Mutex<CpuPool>,
/// Regular (non pub/sub) connection opened by `new`; not used by any code
/// visible in this file.
connection: Arc<Mutex<redis::Connection>>,
/// Pub/sub connection, shared with the worker closures spawned on the pool.
pub_sub: Arc<Mutex<redis::PubSub>>,
/// Last subscription id handed out; starts at 0 and is incremented by
/// `subscribe` before use.
current_subscription_id: Arc<Mutex<Cell<u32>>>,
/// Subscription id -> callback invoked with the message payload.
active_subscriptions: HashMap<u32, Arc<Fn(&str) + Sync + Send>>,
/// Channel name -> ids of the subscriptions listening on that channel.
subscriptions_refs: Arc<Mutex<HashMap<String, Vec<u32>>>>,
}
impl RedisPubSub {
fn new(connection_string: &str) -> Result<RedisPubSub, redis::RedisError> {
let client: redis::Client = redis::Client::open(connection_string)
.expect(format!("Could not connect to redis with connection string: {}",
connection_string)
.as_str());
client.get_pubsub()
.and_then(|pub_sub| {
client.get_connection()
.and_then(|connection| {
Ok(RedisPubSub {
thread_pool: Mutex::new(CpuPool::new(10)),
connection: Arc::new(Mutex::new(connection)),
pub_sub: Arc::new(Mutex::new(pub_sub)),
current_subscription_id: Arc::new(Mutex::new(Cell::new(0))),
active_subscriptions: HashMap::new(),
subscriptions_refs: Arc::new(Mutex::new(HashMap::new())),
})
})
})
}
fn subscribe<F: 'static>(&mut self, channel: &str, on_message: F) -> BoxFuture<u32, String>
where F: Fn(&str) -> () + Sync + Send
{
let id: u32 = self.current_subscription_id
.lock()
.map(|mutex| mutex.get() + 1)
.unwrap();
self.current_subscription_id
.lock()
.map(|mutex| mutex.set(id))
.unwrap();
self.active_subscriptions
.insert(id, Arc::new(on_message));
let mut subs = self.subscriptions_refs.lock().unwrap();
if let Some(_) = subs.get(channel) {
self.subscriptions_refs
.lock()
.and_then(|mut subscriptions| {
subscriptions.get_mut(channel)
.and_then(|result| {
result.insert(0, id);
Some(result)
});
Ok(subscriptions)
});
} else {
let _ = self.pub_sub
.lock()
.map(|mut pub_sub| {
println!("Subscribing: {:?}", channel);
pub_sub.subscribe(channel)
});
}
let pool = match self.thread_pool.try_lock() {
Ok(p) => p,
Err(e) => {
return future::err("Unavailable".to_string()).boxed();
}
};
let pub_sub = self.pub_sub.clone();
let refs = self.subscriptions_refs.clone();
let messages_future = pool.spawn_fn(move || {
let redis_msg = pub_sub.lock().unwrap().get_message().unwrap();
let payload: String = redis_msg.get_payload().expect("No payload");
println!("payload in thread: {:?}", payload);
Ok(Message {
payload: payload,
channel_name: redis_msg.get_channel_name().to_string(),
})
});
let active_subscriptions = self.active_subscriptions.clone();
let channel: String = channel.to_string();
messages_future.map(move |redis_msg| {
let subscriptions = refs.lock().unwrap();
let subscription = subscriptions.get(&channel).unwrap();
for subscription_id in subscription {
if redis_msg.channel_name == channel {
println!("subscription_id in map and msg {:?}", subscription_id);
println!("msg: {:?}", redis_msg);
active_subscriptions.get(&subscription_id)
.map(|on_message_closure| on_message_closure(&redis_msg.payload));
}
}
subscription[0]
// refs.lock()
// .map(|subscriptions| {
// subscriptions.get(&channel)
// .map(|subscription| {
// subscription.iter()
// .cloned()
// .map(|subscription_id| {
// if redis_msg.channel_name == channel {
// println!("subscription_id in map and msg {:?}", subscription_id);
// println!("msg: {:?}", redis_msg);
// active_subscriptions.get(&subscription_id)
// .map(|on_message_closure| on_message_closure(&redis_msg.payload));
// };
// subscription_id
// })
// .collect::<Vec<_>>()
// })
// // Ok(subscriptions)
// })
// .unwrap()
// .unwrap()
// .first()
// .map(|x| *x)
// .unwrap()
})
.boxed()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment