Created
November 13, 2020 23:39
-
-
Save KingMob/149e1b21674c9b027699753143878ab8 to your computer and use it in GitHub Desktop.
Adaptation of java.util.concurrent.SubmissionPublisher to a Manifold sink
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(utils/when-class java.util.concurrent.SubmissionPublisher | |
(import '(java.util.concurrent SubmissionPublisher)) | |
(s/def-sink SubmissionPublisherSink | |
[^SubmissionPublisher publisher | |
^AtomicReference last-put] | |
(isSynchronous [_] | |
false) | |
(close [this] | |
(utils/with-lock lock | |
(.markClosed this) | |
(.close publisher))) | |
(description [this] | |
{:type (.getCanonicalName (class publisher)) | |
:estimated-maximum-lag (.estimateMaximumLag publisher) | |
:estimated-minimum-demand (.estimateMinimumDemand publisher) | |
:maximum-buffer-capacity (.getMaxBufferCapacity publisher) | |
:number-of-subscribers (.getNumberOfSubscribers publisher) | |
:sink? true | |
:closed? (.isClosed this)}) | |
(put [this x blocking?] | |
(assert (some? x) "SubmissionPublisher cannot take `nil` as a message") | |
(utils/with-lock lock | |
(cond | |
(s/closed? this) | |
(if blocking? | |
false | |
(d/success-deferred false)) | |
blocking? | |
(try | |
;; .submit() not actually the same as .offer() with Long/MAX_VALUE | |
;; timeout, wrt interrupt behavior, so we use them both | |
(.submit publisher x) | |
true | |
(catch Exception e | |
(log/error "Failed to submit" :val x) | |
false)) | |
:else | |
(let [d (d/deferred) | |
d' (.getAndSet last-put d) | |
f (fn [_] | |
(utils/with-lock lock | |
(try | |
(if (.isClosed publisher) | |
(d/success! d false) | |
(do | |
(.submit publisher x) | |
(d/success! d true))) | |
(catch Exception e | |
(d/success! d false)))))] | |
(if (d/realized? d') | |
(f nil) | |
(d/on-realized d' f f)) | |
d)))) | |
(put [this x blocking? timeout timeout-val] | |
(assert (some? x) "SubmissionPublisher cannot take `nil` as a message") | |
(utils/with-lock lock | |
(cond | |
(s/closed? this) | |
(if blocking? | |
false | |
(d/success-deferred false)) | |
blocking? | |
(try | |
;; .offer() returns negative number if unable to send to all subscribers | |
;; See https://docs.oracle.com/en/java/javase/15/docs/api/java.base/java/util/concurrent/SubmissionPublisher.html | |
(let [status (.offer publisher x timeout TimeUnit/MILLISECONDS nil)] | |
(if (>= status 0) | |
true | |
timeout-val)) | |
(catch Exception e | |
(log/error "Failed to submit" :val x) | |
false)) | |
:else | |
(let [d (d/deferred) | |
d' (.getAndSet last-put d) | |
f (fn [_] | |
(utils/with-lock lock | |
(try | |
(if (.isClosed publisher) | |
(d/success! d false) | |
(let [status (.offer publisher x timeout TimeUnit/MILLISECONDS nil)] | |
(if (>= status 0) | |
(d/success! d true) | |
(d/success! d timeout-val)))) | |
(catch Exception e | |
(d/success! d false)))))] | |
(if (d/realized? d') | |
(f nil) | |
(d/on-realized d' f f)) | |
d))))) | |
(extend-protocol s/Sinkable | |
SubmissionPublisher | |
(to-sink [publisher] | |
(->SubmissionPublisherSink | |
publisher | |
(AtomicReference. (d/success-deferred true)))))) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment