Created
October 7, 2015 10:46
-
-
Save spektom/611618709458f4b06c44 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(defn- db-connect [] | |
(let [client-policy (new AsyncClientPolicy) | |
hosts (map #(new Host % 3000) ["aerospike001.local" | |
"aerospike002.local" | |
"aerospike003.local" | |
"aerospike004.local"])] | |
(set! (. client-policy asyncMaxCommands) 1000) | |
(set! (. client-policy asyncSelectorThreads) (.availableProcessors (Runtime/getRuntime))) | |
(new AsyncClient client-policy (into-array Host hosts)))) | |
(defn- make-read-policy [] | |
(let [read-policy (new Policy)] | |
(set! (. read-policy consistencyLevel) ConsistencyLevel/CONSISTENCY_ONE) | |
(set! (. read-policy timeout) 3000) | |
read-policy)) | |
(def read-policy (make-read-policy)) | |
(defn make-key [app-id device-id] | |
(new Key "organic" nil (str app-id ":" device-id))) | |
(defn- handle-msg [db-client msg] | |
(let [app-id (:app_id msg) | |
device-id (:appsflyer_id (:device msg)) | |
^Key key (make-key app-id device-id) | |
record-listener (reify RecordListener | |
(onSuccess [this key record] | |
(if record (swap! stats/success-count inc) (swap! stats/failure-count inc))) | |
(onFailure [this exception] | |
(swap! stats/failure-count inc)))] | |
(. db-client get read-policy record-listener key))) | |
(defn start-client [input-channel] | |
(let [db-client (db-connect)] | |
(dotimes [n 8] | |
(future | |
(while true | |
(try | |
(let [msg (async/<!! input-channel)] | |
(handle-msg db-client msg)) | |
(catch Exception ex | |
(.printStackTrace ex)))))))) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment