Created
October 7, 2015 10:36
-
-
Save spektom/cbac704632422310f4fb to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(defn- db-connect
  "Opens an asynchronous Aerospike client for the cluster described by
  `aerospike-conf`.

  Reads `:hosts`, `:port` and `:async-max-commands` from the map; every host
  is paired with the same port. The selector thread count is set to the number
  of processors reported by the JVM runtime. Returns the new AsyncClient."
  [aerospike-conf]
  (log/info "Initiating connection with Aerospike DB")
  (let [policy (AsyncClientPolicy.)
        port (:port aerospike-conf)
        ;; Lazy seq of seed hosts; realized by into-array below.
        seeds (map (fn [host-name] (Host. host-name port))
                   (:hosts aerospike-conf))]
    (set! (.-asyncMaxCommands policy) (:async-max-commands aerospike-conf))
    (set! (.-asyncSelectorThreads policy)
          (.availableProcessors (Runtime/getRuntime)))
    (AsyncClient. policy (into-array Host seeds))))
(defn- make-write-policy
  "Creates write policy for device records.

  Builds a WritePolicy with commit level COMMIT_MASTER, expiration taken from
  `(:device-ttl config)`, and record-exists action CREATE_ONLY, then returns
  it."
  ;; The docstring must precede the argument vector; placed after it, the
  ;; string is just an evaluated-and-discarded body expression.
  [config]
  (let [write-policy (new WritePolicy)]
    (set! (. write-policy commitLevel) CommitLevel/COMMIT_MASTER)
    (set! (. write-policy expiration) (:device-ttl config))
    (set! (. write-policy recordExistsAction) RecordExistsAction/CREATE_ONLY)
    write-policy))
;; Namespace string handed to the Key constructor in make-key.
(def ^:private key-namespace "organic")
(defn make-key
  "Returns key under which device record will be written to Aerospike.

  `app-id` and `device-id` are joined with a colon, ISO control characters
  are stripped from the joined string, and the result is hashed with
  murmur128. The hash becomes the user key of a Key built with
  `key-namespace` and nil as the second constructor argument."
  ;; Docstring moved ahead of the argument vector so it is attached as :doc
  ;; metadata instead of being a discarded body expression.
  [app-id device-id]
  (let [device-key (str app-id ":" device-id)
        clean-device-key (apply str (remove #(Character/isISOControl ^char %) ^String device-key))
        hash-key (bt/hash clean-device-key :murmur128)]
    (new Key ^String key-namespace nil hash-key)))
(defn make-data
  "Create data bins that will be stored under device key.

  Returns a one-element array containing a single Bin constructed with a nil
  name and `(af-record/encode msg)` as its value."
  ;; Docstring moved ahead of the argument vector so it is attached as :doc
  ;; metadata instead of being a discarded body expression.
  [msg]
  (into-array [(new Bin nil (af-record/encode msg))]))
(defn- write-message
  "Issues an asynchronous put of `msg` through `db-client` using
  `write-policy`.

  The record key is built from `(:app_id msg)` and the `:appsflyer_id` found
  under `(:device msg)`; the bins come from `make-data`. nil is passed in the
  listener position of `put`."
  [msg ^AsyncClient db-client ^WritePolicy write-policy]
  (let [app-id (:app_id msg)
        device-id (:appsflyer_id (:device msg))
        record-key (make-key app-id device-id)
        bins (make-data msg)]
    (.put db-client write-policy nil ^Key record-key bins)))
;(. db-client put write-policy nil ^Key (make-key (:app_id msg) (.toString (java.util.UUID/randomUUID))) data))) | |
(defn- read-messages
  "Reads messages from input channel and writes them to Aerospike.

  Blocks on `input-channel` and hands every message to `write-message` for as
  long as `running?` dereferences to true. Returns nil when `running?` becomes
  false or when the channel is closed. An exception raised while handling a
  message is logged and the loop continues with the next message."
  [input-channel db-client write-policy]
  (loop []
    (when @running?
      ;; `recur` is not allowed inside `try`, so the try yields a flag that
      ;; tells the loop whether to keep going.
      (let [continue?
            (try
              (if-some [msg (async/<!! input-channel)]
                (do
                  (write-message msg db-client write-policy)
                  true)
                ;; <!! yields nil only when the channel is closed; without
                ;; this check the loop would spin forever writing nil
                ;; messages.
                false)
              (catch Throwable ex
                ;; Pass a message along with the throwable so the logger
                ;; records it as an exception rather than as plain text.
                (log/error ex "Failed to write message to Aerospike")
                true))]
        (when continue?
          (recur))))))
(defn start
  "Connects to Aerospike and launches the writer workers.

  Takes the Aerospike settings from `(:aerospike config)`, builds one write
  policy and one client, then starts `(:writers-num aerospike-conf)` futures.
  Each future runs `read-messages` on `input-channel` with the shared client
  and policy, logs when that call returns, and logs any Throwable that
  escapes. Returns nil (the result of `dotimes`)."
  [config input-channel]
  (let [aerospike-conf (:aerospike config)
        policy (make-write-policy config)
        client (db-connect aerospike-conf)
        ;; Body of a single writer; idx is only used in the log line.
        run-writer (fn [idx]
                     (try
                       (read-messages input-channel client policy)
                       (log/info (str "Stopping Aerospike writer #" idx))
                       (catch Throwable ex
                         (log/error ex))))]
    (log/info "Starting Aerospike writers using configuration:" aerospike-conf)
    (dotimes [writer-n (:writers-num aerospike-conf)]
      (future (run-writer writer-n)))))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment