-
-
Save mattdeboard/ecd0d033d1977bf6da78 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(defn read-from | |
"Read from source queue, and write the message to both to the destination | |
queue as well as to disk." | |
[creds src dest] | |
(let [ch (receive! creds src) | |
{:keys [in-chan]} (batching-sends creds dest) | |
wo (writer output-file :append true)] | |
(async/go | |
(with-open [o (writer output-file :append true)] | |
(loop [] | |
;; Using `alts!' in this way allows us to flush buffer output to disk | |
;; and close the channel after 10 seconds of no new messages. | |
(let [[entry source-chan] (async/alts! [ch (async/timeout 500)] | |
:priority true) | |
s (json/write-str entry)] | |
;; `entry' will be nil when a take operation has been performed with | |
;; a channel returned by `timeout'. When that happens, flush buffer | |
;; to disk and close the channel. | |
(if (nil? entry) | |
(do | |
(.flush o) | |
(async/close! ch)) | |
;; If `entry' is not nil, instead write the message to `in-chan' | |
;; (which is our secondary queue for already-processed messages), | |
;; remove the message from the source queue (via `processed!'), | |
;; and write the data (with a newline appended) to the buffer | |
;; before recurring. | |
(do | |
(async/>! in-chan entry) | |
(processed! creds src entry) | |
(.write o (str s "\n")) | |
(recur))))))))) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment