-
-
Save kidpollo/d6462feae7d52d3c0352754d45d5b0f9 to your computer and use it in GitHub Desktop.
posmanoid
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
; core.clj
(defn -main
  "Start Ordasity worker by joining ZooKeeper cluster.

  Runs the startup steps below in order and then blocks the calling thread on
  a latch that this function never counts down, so the call does not return
  normally. Command-line `args` are accepted but not used."
  [& args]
  ;; The enable-* calls run before `mount/start`; presumably they set flags
  ;; that the mount states read when starting -- confirm against their definitions.
  (enable-http-server)
  (enable-ordacity)
  (mount/start)
  ;; clj-statsd handles its own non-reloadable state
  (initialize-statsd)
  (events/scheduled-mailboxes-reset)
  (schedule-mailboxes-sync)
  ;; A latch with a count of 1 that is never released: keeps this thread waiting.
  (let [keep-alive (CountDownLatch. 1)]
    (.await keep-alive)))
; worker.clj
;; Ordasity cluster membership for this process.
;;
;; :start -- when `join-ordacity-cluster?` is true, reads :zk-cluster, :zk-hosts
;;           and :zk-work-unit from `env` (all three must be non-nil), builds a
;;           Cluster with a listener backed by `cron/scheduler`, joins it, and
;;           yields the Cluster instance. Yields nil when the flag is false.
;; :stop  -- when the flag is true, calls `.forceShutdown` on the state value.
(defstate ^:private ordacity-cluster
  :start
  (when @join-ordacity-cluster?
    (let [{cluster-name :zk-cluster hosts :zk-hosts work-unit-name :zk-work-unit} env]
      (assert (some? cluster-name) ":zk-cluster must be set in env")
      (assert (some? hosts) ":zk-hosts must be set in env")
      (assert (some? work-unit-name) ":zk-work-unit must be set in env")
      (let [listener (make-listener cron/scheduler)
            config   (-> (ClusterConfig/builder)
                         (.setHosts hosts)
                         (.setWorkUnitName work-unit-name)
                         (.setAutoRebalanceInterval 60) ; default 60s, just explicit
                         (.build))
            cluster  (Cluster. cluster-name listener config)]
        (trace "Worker/joinCluster"
               (events/zookeper-cluster-joined work-unit-name)
               (.join cluster (Option/apply zookeeper/tw-zk-client)))
        ;; The state value must be the Cluster itself, because :stop calls
        ;; `.forceShutdown` on it. Returning the result of `trace`/`.join`
        ;; would hand :stop something else.
        ;; NOTE(review): Ordasity's `join` appears to return a status string,
        ;; not the cluster -- confirm against the Ordasity version in use.
        cluster)))
  :stop
  (when @join-ordacity-cluster?
    (.forceShutdown ordacity-cluster)))
(defn- make-listener
  "Build an Ordasity `ClusterListener` proxy bound to `queue`.

  `mailboxes` and `locks` are passed through to every `start-work` /
  `shutdown-work` call; the one-argument arity supplies a fresh atom holding
  an empty map for each of them.

  Callbacks:
    onJoin / onLeave           - do nothing
    onDisconnect / onReconnect - emit the matching ZooKeeper cluster event
    startWork / shutdownWork   - call `start-work` / `shutdown-work` for the
                                 given unit, wrapped in `trace`"
  ([queue]
   (let [mailboxes (atom {})
         locks     (atom {})]
     (make-listener queue mailboxes locks)))
  ([queue mailboxes locks]
   (proxy [ClusterListener] []
     (onJoin [_ignored])
     (onLeave [])
     (onDisconnect []
       (events/zookeeper-cluster-disconnected))
     (onReconnect []
       (events/zookeeper-cluster-reconnected))
     (startWork [work-unit]
       (trace "Worker/startWork" {:unit work-unit}
              (start-work work-unit queue mailboxes locks)))
     (shutdownWork [work-unit]
       (trace "Worker/shutdownWork" {:unit work-unit}
              (shutdown-work work-unit queue mailboxes locks))))))
(defn- start-work
  "Begin work on `unit` while holding the lock returned by `get-lock`.

  - If `mailboxes` already has a non-nil entry for `unit` (a mailbox or the
    `:missing` marker), only the start-wtf event is emitted.
  - Otherwise the mailbox is loaded. When nothing is loaded, `unit` is
    recorded as `:missing`.
  - A loaded mailbox is stored under `unit`; it is scheduled on `queue` (and
    the scheduled-mailboxes counter event emitted) only when it is both
    enabled and active, else the mailbox-disabled event is emitted."
  [unit queue mailboxes locks]
  (locking (get-lock unit locks)
    (let [known  (get @mailboxes unit)
          ;; Only hit the loader when the unit is not tracked yet.
          loaded (when (nil? known)
                   (mailbox/load-mailbox unit))]
      (cond
        (some? known)
        (events/scheduled-mailbox-start-wtf unit)

        (nil? loaded)
        (swap! mailboxes assoc unit :missing)

        :else
        (do
          (swap! mailboxes assoc unit loaded)
          (if-not (and (setting/enabled? loaded)
                       (api/active? loaded))
            (events/mailbox-disabled unit)
            (do
              (schedule loaded queue)
              (events/scheduled-mailboxes-inc unit))))))))
(defn- schedule
  "Schedule repeated mail collection for `mbox` on `queue` and remember the job.

  Calls `deschedule` first, then submits a fixed-delay task that runs
  `maybe-collect-mail` on `mbox`, and stores the value returned by the
  executor in the atom at `[:state :job]` of `mbox`.

  The first run happens after `initial-delay` milliseconds, or immediately
  when `(env :do-not-wait-on-startup)` is the string \"true\"; later runs are
  spaced `mail-check-interval` milliseconds after the previous run finishes.

  Returns the new value of the job atom."
  [mbox queue]
  ;; presumably cancels any job already stored for this mailbox -- confirm
  ;; against `deschedule`.
  (deschedule mbox queue)
  (reset! (get-in mbox [:state :job])
          (.scheduleWithFixedDelay ^ScheduledThreadPoolExecutor queue
                                   ;; The executor only needs a Runnable. Reifying
                                   ;; RunnableFuture with just `run` would leave all
                                   ;; of its Future methods abstract.
                                   ^Runnable (reify Runnable
                                               (run [_]
                                                 ;; NOTE: a fixed-delay task stops being
                                                 ;; rescheduled if `run` throws.
                                                 (maybe-collect-mail mbox)))
                                   ^long (if (= (env :do-not-wait-on-startup) "true")
                                           0
                                           initial-delay)
                                   ^long mail-check-interval
                                   TimeUnit/MILLISECONDS)))
; mail.core.clj
(defn maybe-collect-mail
  "Run `collect-mail` for `mbox` through the chain of `with-*` wrappers.

  The wrappers are applied top to bottom, so the last one listed is applied
  last (outermost, assuming each wrapper returns a function that calls the
  one it was given).

  Any `Exception` escaping the wrapped call is caught here: inside a logging
  context carrying the mailbox's site id, id and host, the mailbox is added
  to the invisible-error state and a diaper-error event is emitted with the
  host, user, the `:invisible-error-count` of the mailbox state, and the
  exception."
  [mbox]
  (let [pipeline (-> collect-mail
                     with-periodic-deletion
                     with-consistency-check
                     with-ensure-connection
                     with-periodic-disconnection
                     with-unfriendly-mailbox-disconnection
                     with-state-transition-handling
                     with-error-on-abort-handling
                     with-error-handling
                     with-newrelic-tracing
                     with-safe-reconnect-interval
                     with-backoff ; may shortcircuit call chain
                     with-resetting-slow-server-at
                     with-top-level-logging-context)]
    (try
      (pipeline mbox)
      (catch Exception e
        ;; Last-resort handler: pull out only what the report needs.
        (let [{:keys [site-id id host user state]} mbox]
          (with-logging-context {:site-id site-id :mailbox-id id :mailbox-host host}
            (add-to-invisible-error-state mbox)
            (events/mailbox-collect-mail-diaper-error host user (:invisible-error-count state) e)))))))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment