Skip to content

Instantly share code, notes, and snippets.

@mpenet
Last active July 10, 2017 09:00
Show Gist options
  • Save mpenet/a8266ac2d6e081701e74112bd20f3a5a to your computer and use it in GitHub Desktop.
Save mpenet/a8266ac2d6e081701e74112bd20f3a5a to your computer and use it in GitHub Desktop.
(ns foo.testdq
(:require
[clojure.tools.logging :as log]
[durable-queue :as dq]))
(def opts
{:fsync-threshold 25
:fsync-take? false
:fsync-put? false
:put-timeout 0
:slab-size (* 1024 1024)
:take-timeout 10000})
(def queues (dq/queues "/tmp" opts))
(def id :foo)
(defn start-producers!
[q num-producers]
(doall
(repeatedly num-producers
#(future
(log/info "Start puts")
(while true
(dq/put! q id (str (java.util.UUID/randomUUID)))
(Thread/sleep (rand-int 2)))))))
(def last-val (atom nil))
(defn start-consumers!
[q num-workers]
(cons
(future
(while true
(log/info (dq/stats q))
(Thread/sleep 1000)))
(doall
(repeatedly num-workers
#(future
(while true
(let [t (dq/take! q id)]
(dq/complete! t)
(reset! last-val @t)
(Thread/sleep (rand-int 5)))))))))
(def c (start-consumers! queues 10))
(def p (start-producers! queues 10))
;; (prn @last-val)
;; (mapv future-cancel c)
;; (mapv future-cancel p)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment