Skip to content

Instantly share code, notes, and snippets.

@awwx
Last active May 4, 2024 11:51
Show Gist options
  • Save awwx/6d02e6ea702bdc499bcf847e9b9bb98f to your computer and use it in GitHub Desktop.
Save awwx/6d02e6ea702bdc499bcf847e9b9bb98f to your computer and use it in GitHub Desktop.
observe Missionary task and flow events
;; SPDX-License-Identifier: EPL-2.0 OR MIT
(ns mobserve
(:require
[missionary.core :as m])
(:import
missionary.Cancelled))
;; FlowIterator pairs two zero-argument functions: dereferencing an
;; instance calls `transfer`, and invoking it with no arguments calls
;; `cancel`.  Clojure and ClojureScript name the IDeref / IFn
;; interfaces and their methods differently, so the type is defined
;; once per platform behind a reader conditional.
#?(:clj
   (deftype FlowIterator [transfer cancel]
     clojure.lang.IFn
     (invoke [_]
       (cancel))

     clojure.lang.IDeref
     (deref [_]
       (transfer)))

   :cljs
   (deftype FlowIterator [transfer cancel]
     IFn
     (-invoke [_]
       (cancel))

     IDeref
     (-deref [_]
       (transfer))))
(defn println-reporter
  "Default reporter: prints `description`, `msg` and the `pr-str`
  rendering of each of `args` on one line via `println`."
  [description msg & args]
  ;; Render every argument eagerly, up front: a lazy sequence would be
  ;; realized inside println, after it has changed the print mode, and
  ;; dynamic bindings do not mix well with lazy evaluation.
  (let [rendered (into [] (map pr-str) args)]
    (apply println description msg rendered)))
(defn characterize-error
  "Returns `:canceled` when `e` is an instance of `Cancelled`;
  otherwise returns `e` unchanged."
  [e]
  (if (instance? Cancelled e)
    :canceled
    e))
(defn observe-task
  "Wraps `task` in a function with the same `(fn [s f])` shape that
  reports each step of the task's life before forwarding it.

  `report` is called as `(report description event & args)` with these
  events:
    :task-instantiates  before `task` is called
    :task-instantiated  after `task` has returned its cancel function
    :task-succeeded v   before the success callback `s` receives `v`
    :task-failed x      before the failure callback `f` receives the
                        error; `x` is `(characterize-error e)`
    :caller-cancels     before the underlying cancel function is called

  With two arguments, `println-reporter` is used as `report`."
  ([description task]
   (observe-task println-reporter description task))
  ([report description task]
   (let [emit (partial report description)]
     (fn [s f]
       (emit :task-instantiates)
       (let [on-success  (fn [v]
                           (emit :task-succeeded v)
                           (s v))
             on-failure  (fn [e]
                           (emit :task-failed (characterize-error e))
                           (f e))
             cancel-task (task on-success on-failure)]
         (emit :task-instantiated)
         (fn []
           (emit :caller-cancels)
           (cancel-task)))))))
(defn hook-call
  "Calls `thunk` and returns its value, notifying one of two hooks.

  If `thunk` throws, `call-threw` is called with the exception and the
  same exception is rethrown.  Otherwise `successful-call` is called
  with the value before it is returned."
  [thunk successful-call call-threw]
  (let [result (try
                 (thunk)
                 (catch #?(:clj Throwable, :cljs :default) ex
                   (call-threw ex)
                   (throw ex)))]
    ;; The success hook runs outside the try, so an exception thrown by
    ;; `successful-call` itself is never handed to `call-threw`.
    (doto result successful-call)))
(defn observe-flow
  "Wraps `flow` in a function with the same `(fn [notifier terminator])`
  shape that reports each interaction before forwarding it.  The
  wrapper returns a `FlowIterator` standing in for the iterator that
  `flow` returned.

  `report` is called as `(report description event & args)` with these
  events:
    :consumer-instantiates  before `flow` is called
    :producer-notifies      before `notifier` is called
    :producer-terminates    before `terminator` is called
    :producer-instantiated  after `flow` has returned its iterator
    :consumer-transfers     before the underlying iterator is deref'd
    :transferred-value v    after the deref returned `v`
    :transferred-error x    after the deref threw; `x` is
                            `(characterize-error e)`, and the exception
                            is rethrown by `hook-call`
    :consumer-cancels       before the underlying iterator is invoked

  With two arguments, `println-reporter` is used as `report`."
  ([description flow]
   (observe-flow println-reporter description flow))
  ([report description flow]
   (let [emit (partial report description)]
     (fn [notifier terminator]
       (emit :consumer-instantiates)
       (let [on-notify    (fn notify []
                            (emit :producer-notifies)
                            (notifier))
             on-terminate (fn terminate []
                            (emit :producer-terminates)
                            (terminator))
             upstream     (flow on-notify on-terminate)]
         (emit :producer-instantiated)
         (FlowIterator.
           (fn transfer []
             (emit :consumer-transfers)
             (hook-call
               (fn [] (deref upstream))
               (fn [value]
                 (emit :transferred-value value))
               (fn [ex]
                 (emit :transferred-error (characterize-error ex)))))
           (fn cancel []
             (emit :consumer-cancels)
             (upstream))))))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment