Skip to content

Instantly share code, notes, and snippets.

@niwinz
Last active November 24, 2016 11:09
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save niwinz/0301331bee4d5148330c to your computer and use it in GitHub Desktop.
Save niwinz/0301331bee4d5148330c to your computer and use it in GitHub Desktop.
Stream & Events based state management toolkit for ClojureScript.
;; Copyright (c) 2015-2016 Andrey Antukh <niwi@niwi.nz>
;; All rights reserved.
;;
;; Redistribution and use in source and binary forms, with or without
;; modification, are permitted provided that the following conditions are met:
;;
;; * Redistributions of source code must retain the above copyright notice, this
;; list of conditions and the following disclaimer.
;;
;; * Redistributions in binary form must reproduce the above copyright notice,
;; this list of conditions and the following disclaimer in the documentation
;; and/or other materials provided with the distribution.
;;
;; THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
;; AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
;; IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
;; DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
;; FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
;; DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
;; SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
;; CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
;; OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
;; OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
(ns potok.core
"Stream & Events based state management toolkit for ClojureScript."
(:refer-clojure :exclude [update])
(:require [beicon.core :as rx]))
;; --- Protocols
;; An abstraction that represents a simple state
;; transformation. The `update` function receives
;; the current state and returns the transformed
;; state. It can also be interpreted as a reducer.
(defprotocol UpdateEvent
  "A synchronous state transformation: given the current
  state, produce the next one (comparable to a reducer)."
  (update [event state]
    "Apply a transformation to the state."))
;; An abstraction that represents an asynchronous
;; computation. The `watch` function receives the
;; current state and the stream and should return
;; another stream of events (each of which can be an
;; `UpdateEvent`, `WatchEvent` or `EffectEvent`).
(defprotocol WatchEvent
  "An asynchronous computation that yields further events."
  (watch [event state stream]
    "Receive the current state and the stream and return
    a stream of new events."))
;; An abstraction for performing side effects only. It
;; receives the state and the stream, and its return
;; value is completely ignored.
(defprotocol EffectEvent
  "A pure side effect triggered by an event."
  (effect [event state stream]
    "Perform a side effect with the current state and the
    stream; the return value is ignored by the stream loop."))
;; An abstraction used for sending data to the stream loop.
(defprotocol PushStream
  "Internal entry point used to feed events into a stream loop."
  (^:private -push [_ event]
    "Push event into the stream."))
;; --- Predicates
(defn update?
  "Check whether `e` is an update event, i.e. whether
  it satisfies the `UpdateEvent` protocol."
  [e]
  (satisfies? UpdateEvent e))
(defn watch?
  "Check whether `e` is a watch event, i.e. whether
  it satisfies the `WatchEvent` protocol."
  [e]
  (satisfies? WatchEvent e))
(defn effect?
  "Check whether `e` is an effect event, i.e. whether
  it satisfies the `EffectEvent` protocol."
  [e]
  (satisfies? EffectEvent e))
;; --- Implementation Details
;; Plain functions are valid update events: the function is
;; simply called with the current state and its result becomes
;; the new state.
(extend-protocol UpdateEvent
  function
  (update [f current-state]
    (f current-state)))
(def ^:dynamic *on-error*
  "Optional user-supplied error handler. When bound to a function,
  `default-error-handler` delegates every error to it."
  nil)
(defn- default-error-handler
  "Fallback error handler.

  When `*on-error*` holds a function, hand `error` over to it and
  return whatever it returns. Otherwise log a hint and the error to
  the JavaScript console and rethrow `error`."
  [error]
  (let [handler *on-error*]
    ;; Guard clause: without a user handler there is nothing to
    ;; delegate to, so report the error and rethrow it.
    (when-not (fn? handler)
      (js/console.error "Using default error handler, consider using your own!")
      (js/console.error error)
      (throw error))
    (handler error)))
;; Route ClojureScript `print`/`println` output to the JavaScript console.
(enable-console-print!)
;; --- Public API
(defn init
  "Create and start a new stream loop.

  Builds a fresh event processing loop and returns a bi-directional
  rx stream: events are pushed into it and state changes are
  observed by subscribing to it.

  Accepts an optional map of options:

  - `:on-error`: function installed as the error handler of the state
    and watch streams. Defaults to `default-error-handler` and must
    satisfy `fn?`.

  The returned stream additionally has a `close` method that stops
  the loop; pushing an event into a closed loop throws.

  You probably should not call this function directly, because a
  default stream loop is already instantiated under the `stream` var.
  It is intended for advanced use cases where a single stream loop
  is not enough."
  ([] (init nil))
  ([{:keys [on-error] :or {on-error default-error-handler}}]
   {:pre [(fn? on-error)]}
   (let [bus        (rx/bus)
         ;; State stream: fold every update event over the previous
         ;; state, starting from (and initially emitting) nil.
         state-s    (->> (rx/filter update? bus)
                         (rx/scan (fn [state event] (update event state)) nil)
                         (rx/merge (rx/just nil))
                         (rx/catch on-error)
                         (rx/retry 1024)
                         (rx/share))
         ;; Pair each bus event accepted by `pred` with the latest
         ;; state, as an `[event state]` vector.
         with-state (fn [pred]
                      (->> (rx/filter pred bus)
                           (rx/with-latest-from vector state-s)))
         ;; Watch events produce streams of new events, flattened
         ;; into a single stream.
         watch-s    (->> (with-state watch?)
                         (rx/flat-map (fn [[event state]]
                                        (watch event state bus)))
                         (rx/catch on-error)
                         (rx/retry 1024))
         effect-s   (with-state effect?)
         ;; Events emitted by watchers are fed back into the bus.
         watch-sub  (rx/on-value watch-s (fn [event] (rx/push! bus event)))
         effect-sub (rx/on-value effect-s (fn [[event state]]
                                            (effect event state bus)))
         closed?    (volatile! false)]
     (specify! state-s
       PushStream
       (-push [_ event]
         (if @closed?
           (throw (ex-info "Stream loop already closed." {}))
           (rx/push! bus event)))

       Object
       (close [_]
         (vreset! closed? true)
         (.unsubscribe watch-sub)
         (.unsubscribe effect-sub)
         (rx/end! bus))))))
;; A default instance of the stream loop.
;; Created with the default options. `init` takes an options map, so
;; passing the not-yet-defined `stream` var to it only worked because
;; an undefined value destructures like nil; call the zero-arity
;; instead.
(defonce stream (init))
(defn emit!
  "Push an event, or every event of a vector of events, into a
  stream loop.

  With one argument the events go to the default loop held by the
  `stream` var. With two arguments they go to the given `stream`,
  which must satisfy the `PushStream` protocol.

  When `event` is a vector its items are pushed one by one, in
  order, and nil is returned; any other value is pushed as a single
  event."
  ([event]
   (if-not (vector? event)
     (-push stream event)
     (doseq [ev event]
       (-push stream ev))))
  ([stream event]
   {:pre [(satisfies? PushStream stream)]}
   (if-not (vector? event)
     (-push stream event)
     (doseq [ev event]
       (-push stream ev)))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment