Last active
July 10, 2017 09:00
-
-
Save mpenet/a8266ac2d6e081701e74112bd20f3a5a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns foo.testdq | |
(:require | |
[clojure.tools.logging :as log] | |
[durable-queue :as dq])) | |
;; Options map handed to `dq/queues` below.
;; NOTE(review): :put-timeout and :take-timeout are passed here, but nothing in
;; this file shows the library reading them (the `put!`/`take!` calls below do
;; not pass a timeout) -- confirm they are recognised options.
(def opts
  {;; fsync behaviour
   :fsync-put?      false
   :fsync-take?     false
   :fsync-threshold 25
   ;; slab sizing: (* 1024 1024) = 1048576, presumably bytes -- TODO confirm
   :slab-size       (* 1024 1024)
   ;; timeouts (units not visible here; presumably milliseconds)
   :put-timeout     0
   :take-timeout    10000})
;; Queue set rooted at the "/tmp" directory, configured with `opts`.
(def queues
  (dq/queues "/tmp" opts))

;; Name of the single queue that every producer and consumer in this file uses.
(def id
  :foo)
(defn start-producers!
  "Spawns `num-producers` futures that each log \"Start puts\" once and then
  loop forever, putting a fresh random UUID string onto queue `id` of `q` and
  sleeping (rand-int 2) ms between puts.

  Returns a fully realised seq of the futures so the caller can later
  `future-cancel` them."
  [q num-producers]
  (letfn [(produce-forever []
            (log/info "Start puts")
            (loop []
              (dq/put! q id (str (java.util.UUID/randomUUID)))
              ;; (rand-int 2) yields 0 or 1, so this is a 0-1 ms pause.
              (Thread/sleep (rand-int 2))
              (recur)))]
    ;; doall forces the lazy seq so every future is started right away.
    (doall
     (repeatedly num-producers #(future (produce-forever))))))
;; Holds the payload of the most recently consumed task (nil until the first
;; task is processed). Written by the consumer futures; handy to inspect from
;; the REPL.
(def last-val
  (atom nil))
(defn start-consumers!
  "Starts the consuming side of the test.

  Spawns one reporter future that logs `(dq/stats q)` once per second, plus
  `num-workers` worker futures. Each worker loops forever: it takes a task
  from queue `id` of `q`, reads its payload into `last-val`, marks the task
  complete, and sleeps (rand-int 5) ms.

  Returns a seq of futures -- the stats reporter first, then the workers --
  so the caller can later `future-cancel` them."
  [q num-workers]
  (cons
   ;; Periodic stats reporter.
   (future
     (while true
       (log/info (dq/stats q))
       (Thread/sleep 1000)))
   ;; doall forces the lazy seq so every worker is started right away.
   (doall
    (repeatedly num-workers
                #(future
                   (while true
                     (let [t (dq/take! q id)
                           ;; Read the payload BEFORE completing the task.
                           ;; Once a task is completed the queue may reclaim
                           ;; its backing storage, so dereferencing afterwards
                           ;; is not safe.
                           v @t]
                       (reset! last-val v)
                       (dq/complete! t)
                       (Thread/sleep (rand-int 5)))))))))
;; Start the load test as soon as this namespace is loaded: consumers first
;; (10 workers plus the stats reporter), then 10 producers.
(def c
  (start-consumers! queues 10))

(def p
  (start-producers! queues 10))

;; REPL helpers -- evaluate by hand to inspect the last value or stop the test:
;; (prn @last-val)
;; (mapv future-cancel c)
;; (mapv future-cancel p)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment