@KingMob
Created November 13, 2020 23:39
Adaptation of java.util.concurrent.SubmissionPublisher to a Manifold sink
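(utils/when-class java.util.concurrent.SubmissionPublisher
  ;; TimeUnit and AtomicReference are imported too, since the sink below uses both
  (import '(java.util.concurrent SubmissionPublisher TimeUnit)
          '(java.util.concurrent.atomic AtomicReference))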
  (s/def-sink SubmissionPublisherSink
    [^SubmissionPublisher publisher
     ^AtomicReference last-put]
    (isSynchronous [_]
      false)
    (close [this]
      (utils/with-lock lock
        (.markClosed this)
        (.close publisher)))
    (description [this]
      {:type (.getCanonicalName (class publisher))
       :estimated-maximum-lag (.estimateMaximumLag publisher)
       :estimated-minimum-demand (.estimateMinimumDemand publisher)
       :maximum-buffer-capacity (.getMaxBufferCapacity publisher)
       :number-of-subscribers (.getNumberOfSubscribers publisher)
       :sink? true
       :closed? (.isClosed this)})
    (put [this x blocking?]
      (assert (some? x) "SubmissionPublisher cannot take `nil` as a message")
      (utils/with-lock lock
        (cond
          (s/closed? this)
          (if blocking?
            false
            (d/success-deferred false))
          blocking?
          (try
            ;; .submit() is not the same as .offer() with a Long/MAX_VALUE timeout
            ;; with respect to interrupt behavior, so this arity uses .submit()
            ;; and the timeout arity uses .offer()
            (.submit publisher x)
            true
            (catch Exception e
              ;; pass the exception to the logger so the cause is not lost
              (log/error e "Failed to submit" :val x)
              false))
          :else
          ;; chain onto the previous put so asynchronous submissions keep their order
          (let [d (d/deferred)
                d' (.getAndSet last-put d)
                f (fn [_]
                    (utils/with-lock lock
                      (try
                        (if (.isClosed publisher)
                          (d/success! d false)
                          (do
                            (.submit publisher x)
                            (d/success! d true)))
                        (catch Exception e
                          (d/success! d false)))))]
            (if (d/realized? d')
              (f nil)
              (d/on-realized d' f f))
            d))))
    (put [this x blocking? timeout timeout-val]
      (assert (some? x) "SubmissionPublisher cannot take `nil` as a message")
      (utils/with-lock lock
        (cond
          (s/closed? this)
          (if blocking?
            false
            (d/success-deferred false))
          blocking?
          (try
            ;; .offer() returns a negative number (the negated drop count) if the
            ;; item was dropped for any subscriber; otherwise it returns an
            ;; estimate of the maximum lag.
            ;; See https://docs.oracle.com/en/java/javase/15/docs/api/java.base/java/util/concurrent/SubmissionPublisher.html
            (let [status (.offer publisher x timeout TimeUnit/MILLISECONDS nil)]
              (if (>= status 0)
                true
                timeout-val))
            (catch Exception e
              ;; pass the exception to the logger so the cause is not lost
              (log/error e "Failed to submit" :val x)
              false))
          :else
          ;; chain onto the previous put so asynchronous submissions keep their order
          (let [d (d/deferred)
                d' (.getAndSet last-put d)
                f (fn [_]
                    (utils/with-lock lock
                      (try
                        (if (.isClosed publisher)
                          (d/success! d false)
                          (let [status (.offer publisher x timeout TimeUnit/MILLISECONDS nil)]
                            (if (>= status 0)
                              (d/success! d true)
                              (d/success! d timeout-val))))
                        (catch Exception e
                          (d/success! d false)))))]
            (if (d/realized? d')
              (f nil)
              (d/on-realized d' f f))
            d)))))
  (extend-protocol s/Sinkable
    SubmissionPublisher
    (to-sink [publisher]
      (->SubmissionPublisherSink
        publisher
        (AtomicReference. (d/success-deferred true))))))
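
;; Usage sketch (not part of the original gist). It assumes the code above has
;; been loaded, that manifold.stream is required under the hypothetical alias
;; `ms`, and that the JDK provides java.util.concurrent.Flow (Java 9+).
;; The subscriber below is illustrative only.
(comment
  (require '[manifold.stream :as ms])
  (import '(java.util.concurrent Flow$Subscriber Flow$Subscription SubmissionPublisher))

  (def publisher (SubmissionPublisher.))

  ;; A subscriber that requests unbounded demand and prints whatever it receives
  (.subscribe publisher
    (reify Flow$Subscriber
      (onSubscribe [_ subscription]
        (.request ^Flow$Subscription subscription Long/MAX_VALUE))
      (onNext [_ item] (println "received" item))
      (onError [_ t] (println "error" t))
      (onComplete [_] (println "complete"))))

  ;; ->sink should dispatch through the Sinkable extension defined above
  (def sink (ms/->sink publisher))

  ;; put! goes through the non-blocking arity and returns a deferred
  @(ms/put! sink :hello)

  ;; try-put! goes through the timeout arity (timeout in milliseconds)
  @(ms/try-put! sink :world 100 ::timed-out)

  ;; closing the sink also closes the underlying publisher
  (ms/close! sink))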