Skip to content

Instantly share code, notes, and snippets.

@mattdeboard
Last active September 11, 2015 19:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mattdeboard/ecd0d033d1977bf6da78 to your computer and use it in GitHub Desktop.
Save mattdeboard/ecd0d033d1977bf6da78 to your computer and use it in GitHub Desktop.
(defn read-from
"Read from source queue, and write the message to both to the destination
queue as well as to disk."
[creds src dest]
(let [ch (receive! creds src)
{:keys [in-chan]} (batching-sends creds dest)
wo (writer output-file :append true)]
(async/go
(with-open [o (writer output-file :append true)]
(loop []
;; Using `alts!' in this way allows us to flush buffer output to disk
;; and close the channel after 10 seconds of no new messages.
(let [[entry source-chan] (async/alts! [ch (async/timeout 500)]
:priority true)
s (json/write-str entry)]
;; `entry' will be nil when a take operation has been performed with
;; a channel returned by `timeout'. When that happens, flush buffer
;; to disk and close the channel.
(if (nil? entry)
(do
(.flush o)
(async/close! ch))
;; If `entry' is not nil, instead write the message to `in-chan'
;; (which is our secondary queue for already-processed messages),
;; remove the message from the source queue (via `processed!'),
;; and write the data (with a newline appended) to the buffer
;; before recurring.
(do
(async/>! in-chan entry)
(processed! creds src entry)
(.write o (str s "\n"))
(recur)))))))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment