Skip to content

Instantly share code, notes, and snippets.

@kidpollo
Created September 1, 2016 23:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kidpollo/d6462feae7d52d3c0352754d45d5b0f9 to your computer and use it in GitHub Desktop.
Save kidpollo/d6462feae7d52d3c0352754d45d5b0f9 to your computer and use it in GitHub Desktop.
posmanoid
; core.clj
(defn -main
"Start Ordasity worker by joining ZooKeeper cluster."
[& args]
(enable-http-server)
(enable-ordacity)
(mount/start)
(initialize-statsd) ; clj-statsd handles its own non-reloadable state
(events/scheduled-mailboxes-reset)
(schedule-mailboxes-sync)
(.await (CountDownLatch. 1)))
; worker.clj
(defstate ^:private ordacity-cluster
:start
(when @join-ordacity-cluster?
(let [{cluster-name :zk-cluster hosts :zk-hosts work-unit-name :zk-work-unit} env]
(assert (some? cluster-name))
(assert (some? hosts))
(assert (some? work-unit-name))
(trace "Worker/joinCluster"
(events/zookeper-cluster-joined work-unit-name)
(.join ^Cluster
(Cluster. cluster-name
(make-listener cron/scheduler)
(-> (ClusterConfig/builder)
(.setHosts hosts)
(.setWorkUnitName work-unit-name)
(.setAutoRebalanceInterval 60) ; default 60s, just explicit
(.build)))
(Option/apply
zookeeper/tw-zk-client)))))
:stop
(when @join-ordacity-cluster?
(.forceShutdown ordacity-cluster)))
(defn- make-listener
"Create Ordasity ClusterListener
On StartWork, create and connect to mailbox defined by `unit'"
([queue] (make-listener queue (atom {}) (atom {})))
([queue mailboxes locks]
(proxy [ClusterListener] []
(onJoin [_])
(onLeave [])
(onDisconnect []
(events/zookeeper-cluster-disconnected))
(onReconnect []
(events/zookeeper-cluster-reconnected))
(startWork [unit]
(trace "Worker/startWork" {:unit unit}
(start-work unit queue mailboxes locks)))
(shutdownWork [unit]
(trace "Worker/shutdownWork" {:unit unit}
(shutdown-work unit queue mailboxes locks))))))
(defn- start-work [unit queue mailboxes locks]
(locking (get-lock unit locks)
(if-some [mbox (get @mailboxes unit)]
(events/scheduled-mailbox-start-wtf unit)
(if-some [mbox (mailbox/load-mailbox unit)]
(do
(swap! mailboxes assoc unit mbox)
(if (and (setting/enabled? mbox)
(api/active? mbox))
(do
(schedule mbox queue)
(events/scheduled-mailboxes-inc unit))
(events/mailbox-disabled unit)))
(swap! mailboxes assoc unit :missing)))))
(defn- schedule [mbox queue]
(deschedule mbox queue)
(reset! (get-in mbox [:state :job])
(.scheduleWithFixedDelay ^ScheduledThreadPoolExecutor queue
^Runnable (reify RunnableFuture
(run [_]
(maybe-collect-mail mbox)))
^long (if (= (env :do-not-wait-on-startup) "true")
0
initial-delay)
^long mail-check-interval
TimeUnit/MILLISECONDS)))
; mail.core.clj
(defn maybe-collect-mail [mbox]
(let [{:keys [site-id id host user state]} mbox
handle (-> (partial collect-mail)
with-periodic-deletion
with-consistency-check
with-ensure-connection
with-periodic-disconnection
with-unfriendly-mailbox-disconnection
with-state-transition-handling
with-error-on-abort-handling
with-error-handling
with-newrelic-tracing
with-safe-reconnect-interval
with-backoff ; may shortcircuit call chain
with-resetting-slow-server-at
with-top-level-logging-context)]
(try
(handle mbox)
(catch Exception e
(with-logging-context {:site-id site-id :mailbox-id id :mailbox-host host}
(add-to-invisible-error-state mbox)
(events/mailbox-collect-mail-diaper-error host user (:invisible-error-count state) e))))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment