Skip to content

Instantly share code, notes, and snippets.

@anaisbetts
Created December 7, 2016 18:40
Show Gist options
  • Save anaisbetts/4c3fd91dbec5ecc436962f88a974142c to your computer and use it in GitHub Desktop.
Save anaisbetts/4c3fd91dbec5ecc436962f88a974142c to your computer and use it in GitHub Desktop.
Rx Subscription in Rust
use std::cell::RefCell;
use std::collections::LinkedList;
use std::sync::atomic::*;
/// Something that can be unsubscribed and asked whether it is closed.
pub trait Subscription {
    /// Tears the subscription down.
    fn unsubscribe(&mut self);

    /// Reports whether the subscription is closed.
    fn closed(&self) -> bool;
}
/*
* EmptySubscription
*/
struct EmptySubscription {}
impl Subscription for EmptySubscription {
fn unsubscribe(&mut self) {}
fn closed(&self) -> bool { true }
}
static EMPTY_SUB: EmptySubscription = EmptySubscription {};
impl Subscription {
fn empty() -> &'static EmptySubscription {
return &EMPTY_SUB;
}
}
/// The shared empty subscription must report closed without ever being
/// unsubscribed.
#[test]
fn empty_subscription_should_always_be_closed() {
    // `<dyn Subscription>::` is the non-deprecated spelling of the
    // trait-object path that `empty()` is defined on.
    assert!(<dyn Subscription>::empty().closed());
}
/*
* AnonymousSubscription
*/
/// A subscription backed by a caller-supplied closure.
pub struct AnonymousSubscription<'a> {
    /// `true` once the subscription has been unsubscribed.
    is_unsubscribed: AtomicBool,
    /// The stored closure, boxed so any `FnMut()` valid for `'a` fits.
    /// Written with `dyn`: the bare `Box<FnMut()>` form is deprecated and an
    /// error in the 2021 edition.
    fun: Box<dyn FnMut() + 'a>,
}
impl<'a> AnonymousSubscription<'a> {
fn new<F : FnMut() + 'a>(f: F) -> AnonymousSubscription<'a> {
return AnonymousSubscription { fun: Box::new(f), is_unsubscribed: Default::default() };
}
}
impl<'a> Subscription for AnonymousSubscription<'a> {
    /// Invokes the stored closure on the first call; every later call is a
    /// no-op.
    fn unsubscribe(&mut self) {
        // The flag can go false -> true only once, so the exchange succeeds
        // for exactly one call.
        let first_call = self
            .is_unsubscribed
            .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
            .is_ok();

        if first_call {
            (self.fun)();
        }
    }

    /// Returns `true` once `unsubscribe` has been called.
    fn closed(&self) -> bool {
        self.is_unsubscribed.load(Ordering::Relaxed)
    }
}
/// Unsubscribing twice must run the closure only once.
#[test]
fn anon_subscription_should_unsubscribe_exactly_once() {
    let calls = RefCell::new(0);
    let mut sub = AnonymousSubscription::new(|| *calls.borrow_mut() += 1);

    // Nothing runs until the first unsubscribe.
    assert_eq!(*calls.borrow(), 0);

    sub.unsubscribe();
    assert_eq!(*calls.borrow(), 1);

    // The second call must not run the closure again.
    sub.unsubscribe();
    assert_eq!(*calls.borrow(), 1);
}
/// `closed()` is false before the first unsubscribe and true after every one.
#[test]
fn anon_subscription_should_report_closed() {
    let mut sub = AnonymousSubscription::new(|| {});
    assert!(!sub.closed());

    // The flag flips on the first call and stays set on the second.
    for _ in 0..2 {
        sub.unsubscribe();
        assert!(sub.closed());
    }
}
/*
* CompositeSubscription
*/
pub struct CompositeSubscription<'a> {
is_unsubscribed: AtomicBool,
subs: LinkedList<&'a mut Subscription>
}
impl<'a> CompositeSubscription<'a> {
fn new() -> CompositeSubscription<'a> {
return CompositeSubscription { is_unsubscribed: AtomicBool::new(false), subs: LinkedList::new() };
}
fn add(&mut self, s: &'a mut Subscription) {
self.subs.push_back(s);
}
}
impl<'a> Subscription for CompositeSubscription<'a> {
    /// Unsubscribes every child, most recently added first, and marks the
    /// composite as unsubscribed. Only the first call does any work.
    fn unsubscribe(&mut self) {
        // The flag can go false -> true only once; a failed exchange means an
        // earlier call already handled the children.
        let already_done = self
            .is_unsubscribed
            .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
            .is_err();
        if already_done {
            return;
        }

        // Popping hands each `&mut` child out by value and leaves the list
        // empty once the loop ends, so no separate clear is needed.
        while let Some(child) = self.subs.pop_back() {
            child.unsubscribe();
        }
    }

    /// Returns `true` once `unsubscribe` has been called.
    fn closed(&self) -> bool {
        self.is_unsubscribed.load(Ordering::Relaxed)
    }
}
/// Unsubscribing the composite runs each child's closure once, and a second
/// unsubscribe runs none of them again.
#[test]
fn composite_subscription_should_unsubscribe_exactly_once() {
    let calls = RefCell::new(0);
    let mut first = AnonymousSubscription::new(|| *calls.borrow_mut() += 1);
    let mut second = AnonymousSubscription::new(|| *calls.borrow_mut() += 1);
    assert_eq!(*calls.borrow(), 0);

    let mut group = CompositeSubscription::new();
    group.add(&mut first);
    group.add(&mut second);
    // Adding children must not run any closure.
    assert_eq!(*calls.borrow(), 0);

    group.unsubscribe();
    assert_eq!(*calls.borrow(), 2);

    group.unsubscribe();
    assert_eq!(*calls.borrow(), 2);
}
/// Unsubscribing a composite with no children must be harmless, even when
/// done twice.
#[test]
fn composite_subscription_should_unsubscribe_exactly_once_no_children() {
    let mut f = CompositeSubscription::new();
    assert!(!f.closed());

    f.unsubscribe();
    assert!(f.closed());

    // A second call has nothing left to do and must not panic.
    f.unsubscribe();
    assert!(f.closed());
}
#[test]
fn composite_subscription_should_report_closed() {
let mut f = CompositeSubscription::new();
assert_eq!(false, f.closed());
f.unsubscribe();
assert_eq!(true, f.closed());
f.unsubscribe();
assert_eq!(true, f.closed());
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment