Skip to content

Instantly share code, notes, and snippets.

@monorkin
Created April 10, 2019 17:41
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save monorkin/c463f34764ab23af2fd0fb0c19716177 to your computer and use it in GitHub Desktop.
Save monorkin/c463f34764ab23af2fd0fb0c19716177 to your computer and use it in GitHub Desktop.
Registry for Actix SyncArbiter Actors
/// === DESCRIPTION ===
/// This is a naive implementation of a registry for Actix Actors that
/// run in a SyncContext. Using it you can retrieve an address for
/// any actor registered with it from any point in your program.
///
/// === LICENSE ===
/// Copyright 2019 Stanko K.R. <hey@stanko.io>
///
/// Permission is hereby granted, free of charge, to any person
/// obtaining a copy of this software and associated documentation
/// files (the "Software"), to deal in the Software without restriction,
/// including without limitation the rights to use, copy, modify, merge,
/// publish, distribute, sublicense, and/or sell copies of the Software,
/// and to permit persons to whom the Software is furnished to do so,
/// subject to the following conditions:
///
/// The above copyright notice and this permission notice shall be
/// included in all copies or substantial portions of the Software.
///
/// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
/// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
/// OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
/// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
/// BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
/// ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
/// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
/// SOFTWARE.
///
/// === EXAMPLE ===
/// fn main() {
/// let system = actix::System::new("analytics");
///
/// // Redis is an Actor with a SyncContext
/// let addr = SyncArbiter::start(10, || Redis::new("redis://redis").unwrap());
/// // We register Redis with the registry
/// SyncRegistry::set(addr);
/// // To get the address of Redis use the following `SyncRegistry::<Redis>::get()`
///
/// Arbiter::start(|_| DatapointBucket::default());
///
/// server::new(move || {
/// let state = AppState {};
///
/// App::with_state(state).resource("/datapoints", |r| {
/// r.method(http::Method::POST)
/// .with_async(controllers::datapoints::create)
/// })
/// })
/// .bind("0.0.0.0:8080")
/// .unwrap()
/// .start();
///
/// println!("Server started.");
///
/// system.run();
/// }
///
///
/// === CONTENTS OF sync_registry.rs ===
use actix::prelude::*;
use futures::future::Future;
use std::any::{TypeId};
use std::collections::BTreeMap;
use std::marker::PhantomData;
use std::sync::{Arc, Mutex};
// I used a BTreeMap here because, as far as I know, it's the best suited
// for quick insertions and lookups (and is part of the std package). I
// saw that the official `Registry` implementation uses a `HashMap` with
// a custom hasher, so take this decision with caution :/
// The phantom data is needed for type checks to pass - since the
// Actor's type is recursively defined.
/// Registry actor holding addresses of sync actors of type `A`.
///
/// Addresses are stored behind `Arc<Mutex<..>>` so that shared handles to
/// the same address can be handed out to callers.
#[derive(Debug)]
pub struct SyncRegistry<A>
where
    A: Actor<Context = SyncContext<A>> + Send,
{
    /// Registered addresses, keyed by `TypeId`.
    pub registry: BTreeMap<TypeId, Arc<Mutex<Addr<A>>>>,
    /// Zero-sized marker tying the registry to the actor type `A`.
    phantom: PhantomData<A>,
}
// The registry is just an actor in the system, the only major difference being
// that it's run as a `SystemService` and it can be supervised. If it's not run as a
// system service this code will deadlock since the same arbiter will be tasked with
// retrieving and waiting for the data from the registry (chicken and the egg problem)
/// The registry itself is an ordinary actor running in a `Context<Self>`.
impl<A> Actor for SyncRegistry<A>
where
    A: Actor<Context = SyncContext<A>> + Send,
{
    type Context = Context<Self>;
}
/// Marker impl: the registry relies on the default `Supervised` behavior.
impl<A> actix::Supervised for SyncRegistry<A> where A: Actor<Context = SyncContext<A>> + Send {}
/// Marker impl: lets the registry be obtained through `from_registry()`.
impl<A> SystemService for SyncRegistry<A> where A: Actor<Context = SyncContext<A>> + Send {}
/// The default registry is an empty one, identical to `SyncRegistry::new()`.
impl<A> Default for SyncRegistry<A>
where
    A: Actor<Context = SyncContext<A>> + Send,
{
    fn default() -> Self {
        Self::new()
    }
}
// Implements public methods on the registry. This isn't strictly needed
// but it provides a nicer interface than direct calls of send to the
// registry actor.
impl<A: Actor<Context = SyncContext<A>> + Send> SyncRegistry<A> {
    /// Creates an empty registry.
    pub fn new() -> Self {
        SyncRegistry {
            registry: BTreeMap::new(),
            phantom: PhantomData,
        }
    }

    /// Looks up the address registered for actor type `A`.
    ///
    /// Sends a `Get` message to the registry service and blocks the calling
    /// thread (`wait()`) until the reply arrives.
    ///
    /// Returns `None` when no address has been registered for `A`.
    ///
    /// # Panics
    ///
    /// Panics if the message cannot be delivered to the registry actor
    /// (the send future resolves to a mailbox error).
    pub fn get() -> Option<Arc<Mutex<Addr<A>>>> {
        let registry = SyncRegistry::<A>::from_registry();
        let id = TypeId::of::<A>();
        registry
            .send(Get::new(id))
            .wait()
            .expect("SyncRegistry: mailbox error while looking up an address")
            // The `Get` handler replies `Err(())` for an unknown type; report
            // that as `None` instead of panicking on a second `unwrap()`.
            .unwrap_or(None)
    }

    /// Registers `addr` as the address for actor type `A`.
    ///
    /// The `Set` message is sent with `do_send`, so the handler's reply
    /// (whether the address was actually stored) is not observed here.
    pub fn set(addr: Addr<A>) {
        let registry: Addr<SyncRegistry<A>> = SyncRegistry::from_registry();
        let id = TypeId::of::<A>();
        registry.do_send(Set::new(id, addr));
    }
}
/// Answers `Get` requests with a shared handle to the stored address.
impl<A> Handler<Get<A>> for SyncRegistry<A>
where
    A: Actor<Context = SyncContext<A>> + Send,
{
    type Result = Result<Option<Arc<Mutex<Addr<A>>>>, ()>;

    /// Replies `Ok(Some(handle))` when an entry exists for `msg.type_id`,
    /// and `Err(())` when nothing is stored under that key.
    fn handle(&mut self, msg: Get<A>, _context: &mut Self::Context) -> Self::Result {
        self.registry
            .get(&msg.type_id)
            // Hand out another reference to the same shared address.
            .map(|shared| Some(Arc::clone(shared)))
            .ok_or(())
    }
}
/// Stores an address in the registry on `Set` requests.
impl<A: Actor<Context = SyncContext<A>> + Send> Handler<Set<A>> for SyncRegistry<A> {
    type Result = Result<bool, ()>;

    /// Inserts `msg.addr` under `msg.type_id` unless that key is already taken.
    ///
    /// Replies `Ok(true)` when the address was stored and `Ok(false)` when an
    /// entry already existed; the existing entry is left untouched. This
    /// handler never replies `Err`.
    fn handle(&mut self, msg: Set<A>, _context: &mut Self::Context) -> Self::Result {
        use std::collections::btree_map::Entry;

        // A single lookup via the entry API decides between "keep existing"
        // and "insert", and lets the owned `Arc` move in without a clone.
        match self.registry.entry(msg.type_id) {
            Entry::Occupied(_) => Ok(false),
            Entry::Vacant(slot) => {
                slot.insert(msg.addr);
                Ok(true)
            }
        }
    }
}
/// Message asking the registry for the address stored under `type_id`.
pub struct Get<A>
where
    A: Actor<Context = SyncContext<A>> + Send,
{
    /// Key to look up in the registry map.
    type_id: TypeId,
    /// Zero-sized marker carrying the actor type `A`.
    phantom: PhantomData<A>,
}
/// Reply type of a `Get` request: an optional shared address, or `Err(())`.
impl<A> Message for Get<A>
where
    A: Actor<Context = SyncContext<A>> + Send,
{
    type Result = Result<Option<Arc<Mutex<Addr<A>>>>, ()>;
}
impl<A> Get<A>
where
    A: Actor<Context = SyncContext<A>> + Send,
{
    /// Builds a lookup request for the given `type_id`.
    pub fn new(type_id: TypeId) -> Self {
        Get {
            type_id,
            phantom: PhantomData,
        }
    }
}
/// Message asking the registry to store `addr` under `type_id`.
pub struct Set<A>
where
    A: Actor<Context = SyncContext<A>> + Send,
{
    /// Key under which the address is stored.
    type_id: TypeId,
    /// The address to store, already wrapped for shared access.
    addr: Arc<Mutex<Addr<A>>>,
}
/// Reply type of a `Set` request: a `bool` wrapped in a `Result`.
impl<A> Message for Set<A>
where
    A: Actor<Context = SyncContext<A>> + Send,
{
    type Result = Result<bool, ()>;
}
impl<A> Set<A>
where
    A: Actor<Context = SyncContext<A>> + Send,
{
    /// Builds a store request, wrapping `addr` in `Arc<Mutex<..>>`.
    pub fn new(type_id: TypeId, addr: Addr<A>) -> Self {
        let shared = Arc::new(Mutex::new(addr));
        Set {
            type_id,
            addr: shared,
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment