Skip to content

Instantly share code, notes, and snippets.

@spektom
Created October 7, 2015 10:26
Show Gist options
  • Save spektom/eb7fcb20d23d0cacb27b to your computer and use it in GitHub Desktop.
Save spektom/eb7fcb20d23d0cacb27b to your computer and use it in GitHub Desktop.
(defn- db-connect
  "Logs that a Couchbase connection is being initiated, then returns
  whatever `couchbase/create-client` yields for `couchbase-conf`."
  [couchbase-conf]
  (log/info "Initiating connection with Couchbase")
  (couchbase/create-client couchbase-conf))
(defn make-key
  "Returns the key under which a device record will be written to Couchbase.

  The key is `app-id` and `device-id` joined with a colon, e.g.
  (make-key \"app\" \"dev\") => \"app:dev\". Both parts are passed through
  `str`, so a nil part contributes an empty string."
  [app-id device-id]
  ;; The docstring must precede the parameter vector; placed after it, the
  ;; string is just a discarded body expression and never reaches (doc ...).
  (str app-id ":" device-id))
(defn make-data
  "Creates the data that will be stored under the device key.

  Delegates to `af-record/encode` and returns its result unchanged."
  [msg]
  ;; Docstring moved ahead of the parameter vector so it is attached to the
  ;; var's metadata instead of being evaluated and thrown away.
  (af-record/encode msg))
(defn- write-message
  "Stores `msg` through `couchbase/set` on `db-client`.

  The record key is built by `make-key` from the message's :app_id and the
  :appsflyer_id nested under :device; the value is `(make-data msg)`.
  `write-policy` is handed to `couchbase/set` as-is. Returns whatever
  `couchbase/set` returns."
  [msg db-client write-policy]
  (let [device-id  (get-in msg [:device :appsflyer_id])
        record-key (make-key (:app_id msg) device-id)
        payload    (make-data msg)]
    (couchbase/set db-client record-key payload write-policy)))
(defn- read-messages
  "Reads messages from `input-channel` and writes them to Couchbase.

  Loops while `@running?` is truthy. Each message is written with a policy
  whose :expiry is the current Unix time in seconds plus `(:device-ttl config)`
  and whose :persist is :master. Any Throwable raised while taking or writing
  a message is logged and the loop continues with the next message.

  The loop also ends when `input-channel` is closed and drained: `<!!` then
  returns nil, which can never be a real message, so continuing would only
  spin on nil forever. Returns nil."
  [input-channel db-client config]
  (loop []
    (when @running?
      ;; `recur` is not allowed inside `try`, so the try block yields a flag
      ;; telling the loop whether to keep going.
      (let [continue?
            (try
              (if-some [msg (async/<!! input-channel)]
                (let [write-policy {:expiry (+ (quot (System/currentTimeMillis) 1000)
                                               (:device-ttl config))
                                    :persist :master}]
                  (write-message msg db-client write-policy)
                  true)
                ;; nil => channel closed; stop this reader.
                false)
              (catch Throwable ex
                (log/error ex)
                true))]
        (when continue?
          (recur))))))
(defn start
  "Connects to Couchbase and launches the writer futures.

  Reads the :couchbase section of `config`, opens one client from its
  :client-conf via `db-connect`, and starts :writers-num futures. Each future
  runs `read-messages` on `input-channel` with the shared client and the full
  `config`, logs a stop message when that call returns, and logs any
  Throwable that escapes it. Returns nil (the result of `dotimes`)."
  [config input-channel]
  (let [cb-settings  (:couchbase config)
        client       (db-connect (get cb-settings :client-conf))
        writer-count (get cb-settings :writers-num)]
    (log/info "Starting Couchbase writers using configuration:" cb-settings)
    (dotimes [idx writer-count]
      (future
        (try
          (read-messages input-channel client config)
          (log/info (str "Stopping Couchbase writer #" idx))
          (catch Throwable ex
            (log/error ex)))))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment