Skip to content

Instantly share code, notes, and snippets.

@spektom
Created October 7, 2015 10:36
Show Gist options
  • Save spektom/cbac704632422310f4fb to your computer and use it in GitHub Desktop.
Save spektom/cbac704632422310f4fb to your computer and use it in GitHub Desktop.
(defn- db-connect
  "Builds and returns an AsyncClient for the hosts listed in `aerospike-conf`.

  Reads :hosts, :port and :async-max-commands from `aerospike-conf`. Every
  host is paired with the same :port. The number of selector threads is set
  to the number of processors reported by the JVM runtime."
  [aerospike-conf]
  (log/info "Initiating connection with Aerospike DB")
  (let [policy (AsyncClientPolicy.)
        port (:port aerospike-conf)
        ;; Lazy seq; realized by into-array below.
        cluster-hosts (for [host-name (:hosts aerospike-conf)]
                        (Host. host-name port))]
    (set! (.-asyncMaxCommands policy) (:async-max-commands aerospike-conf))
    (set! (.-asyncSelectorThreads policy) (.availableProcessors (Runtime/getRuntime)))
    (AsyncClient. policy (into-array Host cluster-hosts))))
(defn- make-write-policy
  "Creates write policy for device records.

  The policy uses CommitLevel/COMMIT_MASTER, takes its expiration from
  (:device-ttl config), and uses RecordExistsAction/CREATE_ONLY as the
  record-exists action. Returns the configured WritePolicy."
  [config]
  (let [write-policy (WritePolicy.)]
    (set! (. write-policy commitLevel) CommitLevel/COMMIT_MASTER)
    (set! (. write-policy expiration) (:device-ttl config))
    (set! (. write-policy recordExistsAction) RecordExistsAction/CREATE_ONLY)
    write-policy))
;; Namespace passed to the Key constructor in make-key.
(def ^:private key-namespace
  "organic")
(defn make-key
  "Returns key under which device record will be written to Aerospike.

  The key material is `app-id` and `device-id` joined with a colon, with all
  ISO control characters removed, then hashed with bt/hash using :murmur128.
  The resulting Key is created in `key-namespace` with a nil set name."
  [app-id device-id]
  (let [device-key (str app-id ":" device-id)
        ;; Strip control characters before hashing.
        clean-device-key (apply str (remove #(Character/isISOControl ^char %) ^String device-key))
        hash-key (bt/hash clean-device-key :murmur128)]
    (new Key ^String key-namespace nil hash-key)))
(defn make-data
  "Create data bins that will be stored under device key.

  Returns a one-element array holding a single Bin, constructed with a nil
  name, whose value is the result of (af-record/encode msg)."
  [msg]
  (into-array [(new Bin nil (af-record/encode msg))]))
;; Writes one message through `db-client` using `write-policy`.
;; The record key is built from (:app_id msg) and the :appsflyer_id found
;; under (:device msg); the bins come from make-data. The listener argument
;; of `put` is passed as nil.
(defn- write-message
  [msg ^AsyncClient db-client ^WritePolicy write-policy]
  (let [device-info (:device msg)
        record-key (make-key (:app_id msg) (:appsflyer_id device-info))
        bins (make-data msg)]
    (.put db-client write-policy nil ^Key record-key bins)))
;(. db-client put write-policy nil ^Key (make-key (:app_id msg) (.toString (java.util.UUID/randomUUID))) data)))
(defn- read-messages
  "Reads messages from input channel and writes them to Aerospike.

  Blocks the calling thread, taking one message at a time from
  `input-channel` and passing it to write-message, for as long as the
  `running?` flag is truthy. Returns nil when the flag becomes falsey or when
  `input-channel` is closed (a blocking take on a closed channel yields nil).
  Any Throwable raised while taking or writing is logged and the loop
  continues with the next message."
  [input-channel db-client write-policy]
  (loop []
    ;; The try yields true to keep looping and false to stop; recur has to
    ;; live outside the try form.
    (when (and @running?
               (try
                 (let [msg (async/<!! input-channel)]
                   (if (nil? msg)
                     ;; nil can only mean the channel was closed: stop rather
                     ;; than spin forever writing nil messages.
                     false
                     (do
                       (write-message msg db-client write-policy)
                       true)))
                 (catch Throwable ex
                   ;; Throwable first, message second, so the logger receives
                   ;; the exception itself (with its stack trace) rather than
                   ;; only its printed form.
                   ;; NOTE(review): assumes `log` is clojure.tools.logging or
                   ;; an API-compatible logger - confirm against the ns form.
                   (log/error ex "Failed to write message to Aerospike")
                   true)))
      (recur))))
(defn start
  "Connects to Aerospike and launches the writer workers.

  Builds the write policy from `config`, opens one client from
  (:aerospike config) that every writer shares, and starts
  (:writers-num aerospike-conf) futures, each running read-messages on
  `input-channel`. Each writer logs a message when its read loop returns.
  Returns nil; the futures are not handed back to the caller."
  [config input-channel]
  (let [aerospike-conf (:aerospike config)
        write-policy (make-write-policy config)
        db-client (db-connect aerospike-conf)]
    (log/info "Starting Aerospike writers using configuration:" aerospike-conf)
    (dotimes [writer-n (:writers-num aerospike-conf)]
      (future
        (try
          (read-messages input-channel db-client write-policy)
          (log/info (str "Stopping Aerospike writer #" writer-n))
          (catch Throwable ex
            ;; Throwable first, message second, so the exception is logged
            ;; with its stack trace instead of only its printed form.
            ;; NOTE(review): assumes `log` is clojure.tools.logging or an
            ;; API-compatible logger - confirm against the ns form.
            (log/error ex (str "Aerospike writer #" writer-n " failed"))))))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment