Skip to content

Instantly share code, notes, and snippets.

@aep
Created October 29, 2018 15:55
Show Gist options
  • Save aep/b5f6e2172108e8744eb590e2e4940a47 to your computer and use it in GitHub Desktop.
Save aep/b5f6e2172108e8744eb590e2e4940a47 to your computer and use it in GitHub Desktop.
use carrier::*;
use futures::{Future, Sink, Stream};
use failure::Error;
use diesel::{self, RunQueryDsl};
use std::env;
use std::time::{SystemTime};
use std::sync::Mutex;
use futures::sync::mpsc;
use std::mem;
use std::thread;
use tokio;
use futures;
use std::io;
use super::{
DB,
models,
schema,
};
lazy_static! {
static ref STOP: Mutex<Vec<mpsc::Sender<()>>> = Mutex::new(Vec::new());
}
pub fn restart() {
let (tx, rx) = mpsc::channel(10);
{
let mut stop = STOP.lock().unwrap();
for mut stop in stop.drain(..) {
stop.start_send(()).ok();
}
stop.push(tx);
}
let secrets = keystore::Secrets::load().unwrap();
thread::spawn(||{
tokio::run(futures::lazy(move || {
use schema::shadows::dsl::*;
use models::*;
use diesel::RunQueryDsl;
let db = DB.get().unwrap();
let r = shadows.load::<Shadow>(&*db).unwrap();
let mut fs : Vec<Box<Future<Item=(), Error=()> + Send>> = Vec::new();
for shadow in r {
if !shadow.enabled {
continue;
}
let shadow_ = shadow.clone();
let f = subscribe(secrets.identity.clone(), shadow.identity.parse().unwrap()).map_err(|e| error!("{}", e));
fs.push(Box::new(f));
}
fs.push(Box::new(rx.into_future()
.then(|_|{
info!("subscriber worker ends");
Ok(())
})
));
futures::future::select_all(fs)
.then(|_|Ok(()))
}));
});
}
fn update_device(identity: String, shadow: String, available_now: bool) -> Result<(), Error> {
let db = DB.get().expect("database failure");
let new = models::Publisher {
identity: identity,
shadow: shadow,
available_now: available_now,
available_change_at: SystemTime::now(),
last_seen_at: SystemTime::now(),
};
diesel::insert_into(schema::publishers::table)
.values(&new)
.on_conflict((schema::publishers::identity, schema::publishers::shadow))
.do_update()
.set(&new)
.execute(&*db)
?;
Ok(())
}
fn subscribe(
secret: identity::Secret,
shadow: identity::Address,
) -> impl Future<Item = (), Error = Error> {
let domain = env::var("CARRIER_BROKER_DOMAIN").unwrap_or("2.carrier.devguard.io".to_string());
connect::connect(domain, secret.clone()).and_then(move |(ep, mut brk, sock, addr)| {
brk.message("/carrier.broker.v1/broker/subscribe")
.unwrap()
.send(proto::SubscribeRequest {
shadow: shadow.as_bytes().to_vec(),
filter: Vec::new(),
}).flatten_stream()
.for_each(move |m: proto::SubscribeChange| {
let shadow = shadow.clone();
match m.m {
Some(proto::subscribe_change::M::Publish(m)) => {
let identity = identity::Identity::from_bytes(&m.identity).expect("decoding identity");
info!("publish: {}", identity);
update_device(identity.to_string(), shadow.to_string(), true)?;
let (stop_tx, stop_rx) = mpsc::channel(10);
STOP.lock().unwrap().push(stop_tx);
let ft = subscriber::connect(identity.clone(), ep.clone(), &mut brk, sock.try_clone().unwrap(), addr.clone(), secret.clone())
.and_then(move |mut channel| {
super::system_stats::spawn(channel, identity.clone(), shadow.clone())
});
let ft = ft.select(stop_rx.into_future().then(|_|Ok(()))).then(|_|Ok(()));
tokio::spawn(ft);
},
Some(proto::subscribe_change::M::Unpublish(m)) => {
let identity = identity::Identity::from_bytes(&m.identity).expect("decoding identity");
info!("unpublish: {}", identity);
update_device(identity.to_string(), shadow.to_string(), false)?;
},
Some(proto::subscribe_change::M::Supersede(_)) => {
},
None => (),
}
Ok(())
})
.and_then(|_| {
Ok(())
})
})
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment