Skip to content

Instantly share code, notes, and snippets.

@pardom-zz
Created June 8, 2016 13:13
Show Gist options
  • Save pardom-zz/bc6cef79f71bfb28d9f9bcb99afd4e72 to your computer and use it in GitHub Desktop.
Save pardom-zz/bc6cef79f71bfb28d9f9bcb99afd4e72 to your computer and use it in GitHub Desktop.
RPM
import rx.Observable
import java.util.LinkedHashMap
/**
 * A [MutableStreamMap] implemented on top of [LinkedHashMap].
 *
 * Every constructor simply forwards its arguments to the [LinkedHashMap] constructor with the
 * same parameter list.
 */
class HashStreamMap<K> : LinkedHashMap<K, Observable<*>>, MutableStreamMap<K> {

    constructor() : super()

    constructor(initialCapacity: Int) : super(initialCapacity)

    constructor(initialCapacity: Int, loadFactor: Float) : super(initialCapacity, loadFactor)

    constructor(m: MutableMap<out K, out Observable<*>>?) : super(m)

    constructor(initialCapacity: Int, loadFactor: Float, accessOrder: Boolean) :
        super(initialCapacity, loadFactor, accessOrder)

    /**
     * Looks up the stream stored under [key] and returns it typed as an [Observable] of [T].
     *
     * The cast is unchecked: the caller picks [T] and nothing verifies it against the stored
     * stream. The result is cast to a non-null type, so a key with no entry makes the cast fail.
     */
    @Suppress("UNCHECKED_CAST")
    override fun <T : Any> stream(key: K): Observable<T> = get(key) as Observable<T>
}
import rx.Observable
import rx.subjects.ReplaySubject
import rx.subjects.Subject
import rx.subscriptions.CompositeSubscription
/**
 * Base class that turns a map of input streams (keyed by [I]) into a map of output streams
 * (keyed by [O]).
 *
 * Subclasses implement [setUp], which receives an internal proxy map instead of the caller's
 * sources. The output map is created lazily on first use and then reused, while [attach] and
 * [detach] connect and disconnect the caller's sources to and from that proxy.
 */
abstract class Model<I : Key, O : Key> {

    /** Stand-in for the real sources; this is what [setUp] sees. */
    private val proxy = SourcesProxy<I>()

    /** Output streams, built by [setUp] the first time they are requested. */
    private val sinks: StreamMap<O> by lazy { setUp(proxy) }

    /**
     * Builds the output streams from [sources].
     *
     * Invoked through the lazy [sinks] property, with the internal proxy as [sources].
     */
    protected abstract fun setUp(sources: StreamMap<I>): StreamMap<O>

    /** Hook called by [attach] after the sources have been connected. Does nothing by default. */
    protected open fun onAttach() {
        // no op
    }

    /** Hook called by [detach] after the sources have been disconnected. Does nothing by default. */
    protected open fun onDetach() {
        // no op
    }

    /**
     * Connects every stream in [sources] to the internal proxy, runs [onAttach] and returns the
     * output streams.
     */
    fun attach(sources: StreamMap<I>): StreamMap<O> {
        proxy.attach(sources)
        onAttach()
        return sinks
    }

    /** Drops the subscriptions made by [attach] and then runs [onDetach]. */
    fun detach() {
        proxy.detach()
        onDetach()
    }

    /** Marker for stream keys; a key can be invoked on a [StreamMap] to fetch its stream. */
    interface Key {

        /**
         * Returns the stream registered for this key in [streamMap], typed as [T].
         *
         * Both the map cast and the element type are unchecked.
         */
        @Suppress("UNCHECKED_CAST")
        operator fun <T : Any> invoke(streamMap: StreamMap<out Key>): Observable<T> {
            val widened = streamMap as StreamMap<Key>
            return widened.stream<T>(this)
        }
    }

    // Proxy

    /**
     * Mutable stream map that creates a [ReplaySubject] for any key that has no entry yet and
     * relays attached source streams into those subjects.
     */
    private class SourcesProxy<K : Key> : MutableStreamMap<K> by mutableStreamMapOf<K>() {

        /** Subscriptions created by [attach]; emptied by [detach]. */
        private val subscriptions = CompositeSubscription()

        /**
         * Returns the entry stored under [key], first inserting a new [ReplaySubject] when the
         * key has no entry yet.
         *
         * NOTE(review): in RxJava 1.x `ReplaySubject.create(1)` appears to set an initial buffer
         * capacity rather than bound the replay to one item (`createWithSize(1)` does that);
         * confirm which was intended.
         */
        @Suppress("UNCHECKED_CAST")
        override fun <T : Any> stream(key: K): Observable<T> =
            getOrPut(key) { ReplaySubject.create<T>(1) } as Observable<T>

        /**
         * Subscribes the proxy entry of every key in [sources] to the matching source stream
         * and remembers each resulting subscription.
         */
        fun attach(sources: StreamMap<K>) {
            for (key in sources.keys) {
                val upstream = sources.stream<Any>(key)
                // The proxy entry is used as the observer, so it must be a Subject.
                @Suppress("UNCHECKED_CAST")
                val relay = stream<Any>(key) as Subject<Any, Any>
                subscriptions.add(upstream.subscribe(relay))
            }
        }

        /** Clears every subscription recorded by [attach]. */
        fun detach() {
            subscriptions.clear()
        }
    }
}
import rx.Observable
// Maps
/**
 * A read-only map from keys of type [K] to [Observable] streams, with a typed lookup helper.
 */
interface StreamMap<K> : Map<K, Observable<*>> {
/**
 * Returns the stream associated with [key], typed as an [Observable] of [T].
 *
 * The element type is chosen by the caller; nothing in this interface checks that it matches
 * what the stored stream actually emits.
 */
fun <T : Any> stream(key: K): Observable<T>
}
/** A [StreamMap] whose entries can also be modified through the [MutableMap] interface. */
interface MutableStreamMap<K> : StreamMap<K>, MutableMap<K, Observable<*>>
// Utils
/**
 * Creates a [StreamMap] holding the given key/stream [pairs].
 *
 * The map is built by [hashStreamMapOf] and returned through the read-only interface.
 */
fun <K> streamMapOf(vararg pairs: Pair<K, Observable<*>>): StreamMap<K> {
    return hashStreamMapOf(*pairs)
}
/**
 * Creates a [MutableStreamMap] holding the given key/stream [pairs].
 *
 * The map is built by [hashStreamMapOf].
 */
fun <K> mutableStreamMapOf(vararg pairs: Pair<K, Observable<*>>): MutableStreamMap<K> {
    return hashStreamMapOf(*pairs)
}
/**
 * Creates a [HashStreamMap] holding the given key/stream [pairs].
 *
 * The initial capacity is derived from the number of pairs via [mapCapacity].
 */
fun <K> hashStreamMapOf(vararg pairs: Pair<K, Observable<*>>): MutableStreamMap<K> {
    val result = HashStreamMap<K>(mapCapacity(pairs.size))
    result.putAll(pairs)
    return result
}
// Taken from Maps.kt
/** `Int.MAX_VALUE / 2 + 1`, i.e. 2^30; sizes at or above this get the maximum capacity. */
private val INT_MAX_POWER_OF_TWO: Int = Int.MAX_VALUE / 2 + 1

/**
 * Picks an initial capacity for a hash map expected to hold [expectedSize] entries.
 *
 * - below 3: one more than the expected size;
 * - below [INT_MAX_POWER_OF_TWO]: the expected size plus a third of it (integer division);
 * - otherwise: [Int.MAX_VALUE].
 */
private fun mapCapacity(expectedSize: Int): Int = when {
    expectedSize < 3 -> expectedSize + 1
    expectedSize < INT_MAX_POWER_OF_TWO -> expectedSize + expectedSize / 3
    else -> Int.MAX_VALUE // any large value
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment