@robert-stuttaford
Last active May 1, 2017 07:07
Using Onyx with Trapperkeeper
{:environment :production
 :global {:logging-config "./logback.xml"}
 :onyx {:job-scheduler :onyx.job-scheduler/balanced
        :task-scheduler :onyx.task-scheduler/balanced
        :peer-config {:onyx.messaging/impl :netty
                      :onyx.messaging/peer-port-range [40200 40220]
                      :onyx.messaging/peer-ports [40199]
                      :onyx.messaging/bind-addr "localhost"
                      :onyx.messaging/backpressure-strategy :high-restart-latency}
        :peer-count 20
        :zookeeper {:address "127.0.0.1:2186"
                    :port 2186}}}

(ns a.trapperkeeper.service.onyx
  (:require [clojure.tools.logging :as log]
            [onyx.api :as onyx]
            [plumbing.core :refer [defnk]]
            [puppetlabs.trapperkeeper.core :as tk]
            [puppetlabs.trapperkeeper.services :refer [service-context]])
  (:import [java.util UUID]))
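
;; Builds an Onyx config map from the :onyx section of the Trapperkeeper config.
;; :server? adds the ZooKeeper server settings; :peer? merges in :peer-config.
(defnk config [onyx-id job-scheduler {server? false} {peer? false} peer-config zookeeper]
  (merge
   (cond-> {:zookeeper/address (:address zookeeper)
            :onyx/id onyx-id
            :onyx.peer/job-scheduler job-scheduler}
     server? (merge {:zookeeper/server? true
                     :zookeeper.server/port (:port zookeeper)}))
   (if peer?
     peer-config
     {})))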
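
;; REPL sketch of what `config` returns for the two ways `start` calls it.
;; The input values are illustrative assumptions (the service itself uses a
;; random UUID for :onyx-id), so read this as an example, not real settings.
(comment
  (def sample
    {:onyx-id "demo-id"
     :job-scheduler :onyx.job-scheduler/balanced
     :peer-config {:onyx.messaging/impl :netty}
     :zookeeper {:address "127.0.0.1:2186" :port 2186}})

  ;; Env config: base keys plus the ZooKeeper server settings.
  (config (assoc sample :server? true))
  ;; => {:zookeeper/address "127.0.0.1:2186"
  ;;     :onyx/id "demo-id"
  ;;     :onyx.peer/job-scheduler :onyx.job-scheduler/balanced
  ;;     :zookeeper/server? true
  ;;     :zookeeper.server/port 2186}

  ;; Peer config: base keys plus everything under :peer-config.
  (config (assoc sample :peer? true))
  ;; => {:zookeeper/address "127.0.0.1:2186"
  ;;     :onyx/id "demo-id"
  ;;     :onyx.peer/job-scheduler :onyx.job-scheduler/balanced
  ;;     :onyx.messaging/impl :netty}
  )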
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;; Service

(defprotocol OnyxService
  (submit-job [this catalog workflow flow-conditions lifecycles])
  (kill-job [this job-uuid]))

(tk/defservice service
  OnyxService
  [[:ConfigService get-in-config]]
  (start [this context]
    (log/info "Starting Onyx service")
    (let [onyx-config (-> (get-in-config [:onyx])
                          (assoc :onyx-id (UUID/randomUUID)))
          ;; The env config carries the ZooKeeper server settings;
          ;; the peer config carries the messaging settings.
          env-config  (config (assoc onyx-config :server? true))
          peer-config (config (assoc onyx-config :peer? true))
          env         (onyx/start-env env-config)
          peer-group  (onyx/start-peer-group peer-config)
          peers       (onyx/start-peers (:peer-count onyx-config) peer-group)]
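      ;; Keep the peer group in the context so that stop can shut it down.
      (assoc context :env env :peer-config peer-config :peer-group peer-group :peers peers)))
  (stop [this {:keys [env peer-group peers] :as context}]
    (log/info "Stopping Onyx service")
    (doseq [peer peers]
      (try
        (onyx/shutdown-peer peer)
        (catch Throwable e (log/warn e "Error during peer shutdown"))))
    (try
      (log/info "* Stopping peer group")
      (onyx/shutdown-peer-group peer-group)
      (catch Throwable e (log/warn e "Error during peer group shutdown")))
    (try
      (log/info "* Stopping env")
      (onyx/shutdown-env env)
      (catch Throwable e (log/warn e "Error during env shutdown")))
    (dissoc context :env :peer-group :peers))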
  (submit-job
    [this catalog workflow flow-conditions lifecycles]
    (log/info "Submitting job")
    (onyx/submit-job (:peer-config (service-context this))
                     {:catalog catalog
                      :workflow workflow
                      :lifecycles lifecycles
                      :flow-conditions flow-conditions
                      :task-scheduler (get-in-config [:onyx :task-scheduler])}))
  (kill-job
    [this job-id]
    (log/info "Killing job" job-id)
    (onyx/kill-job (:peer-config (service-context this)) job-id)))
;; We have another service which calls (submit-job) in its own (start) implementation, and (kill-job) in its (stop).
;; This job has Datomic tx-report-queue :input and Datomic transact :output catalog items.
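
;; A minimal sketch of such a consuming service, under stated assumptions. The
;; namespace, the service name `job-runner` and the `job` placeholder are
;; hypothetical and not part of the original code. It also assumes that
;; onyx/submit-job returns a map containing :job-id, which may differ between
;; Onyx versions. Declaring :OnyxService as a dependency lets Trapperkeeper
;; start the Onyx service first and stop it last.
(ns a.trapperkeeper.service.job-runner
  (:require [clojure.tools.logging :as log]
            [puppetlabs.trapperkeeper.core :as tk]))

;; Placeholder job definition: fill in the real catalog (for example the
;; Datomic :input and :output entries), workflow, flow conditions and lifecycles.
(def job
  {:catalog [] :workflow [] :flow-conditions [] :lifecycles []})

(tk/defservice job-runner
  [[:OnyxService submit-job kill-job]]
  (start [this context]
    (let [{:keys [catalog workflow flow-conditions lifecycles]} job
          result (submit-job catalog workflow flow-conditions lifecycles)]
      (log/info "Submitted job" (:job-id result))
      ;; Remember the job id so that stop can kill the job.
      (assoc context :job-id (:job-id result))))
  (stop [this context]
    (when-let [job-id (:job-id context)]
      (kill-job job-id))
    (dissoc context :job-id)))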