Skip to content

Instantly share code, notes, and snippets.

@danielstockton
Last active December 14, 2017 11:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save danielstockton/1004aed11873daa4730199829f2bef19 to your computer and use it in GitHub Desktop.
Save danielstockton/1004aed11873daa4730199829f2bef19 to your computer and use it in GitHub Desktop.
(ns etl.jobs.basic
(:require [aero.core :refer [read-config]]
[clojure.data.json :as json]
[etl.tasks.math :as math]
[onyx.job :refer [add-task register-job]]
[onyx.plugin.core-async :refer [get-core-async-channels]]
[onyx.plugin.kinesis :as kinesis]
[onyx.plugin.s3-output :as output]
[onyx.tasks.core-async :as core-async-task]
[onyx.tasks.kinesis :as kinesis-task]
[onyx.tasks.s3 :as s3-task]
[onyx.test-helper :refer [with-test-env feedback-exception!]]))
;; AWS credential map: the :aws entry of resources/config.edn, read once when
;; this namespace is loaded. build-job pulls :access-key and :secret-key from it.
(def aws-credentials
  (-> "resources/config.edn"
      read-config
      :aws))
(defn build-job
  "Assembles the Onyx job map for the workflow :in -> :identity -> :out.

  Takes a single map with the keys:
    :batch-settings  task-map entries merged into the :identity catalog entry
                     and into the Kinesis consumer options (they win on
                     conflicting keys)
    :stream-name     used as :kinesis/stream-name for the :in task
    :region          used as both :kinesis/region and :s3/region

  The :in task is created with kinesis-task/consumer and the :out task with
  s3-task/s3-output targeting the `newsroom-logs` bucket. The S3 task carries
  its own batch size/timeout and does not use :batch-settings.

  Returns the job map produced by threading both tasks through add-task."
  [{:keys [batch-settings
           stream-name
           region]}]
  (let [identity-entry (merge {:onyx/name :identity
                               :onyx/fn :clojure.core/identity
                               :onyx/type :function}
                              batch-settings)
        skeleton       {:workflow [[:in :identity]
                                   [:identity :out]]
                        :catalog [identity-entry]
                        :lifecycles []
                        :windows []
                        :triggers []
                        :flow-conditions []
                        :task-scheduler :onyx.task-scheduler/balanced}
        consumer-opts  {:kinesis/stream-name stream-name
                        :kinesis/shard-initialize-type :latest
                        :kinesis/deserializer-fn :onyx.tasks.kinesis/deserialize-message-edn
                        :kinesis/region region
                        :onyx/min-peers 1
                        :onyx/max-peers 1}
        ;; Credentials come from the namespace-level aws-credentials map.
        bucket-opts    {:s3/encryption :none
                        :s3/access-key (:access-key aws-credentials)
                        :s3/secret-key (:secret-key aws-credentials)
                        :s3/multi-upload true
                        :s3/region region
                        :s3/serialize-per-element? true
                        :onyx/min-peers 1
                        :onyx/max-peers 1
                        :onyx/batch-size 2000
                        :onyx/batch-timeout 2000}
        ;; Input first, then output -- same order as the original threading.
        with-input     (add-task skeleton
                                 (kinesis-task/consumer
                                  :in
                                  (merge consumer-opts batch-settings)))]
    ;; Alternative sink kept for local debugging:
    ;; (add-task with-input (core-async-task/output :out batch-settings))
    (add-task with-input
              (s3-task/s3-output :out
                                 "newsroom-logs"
                                 :onyx.tasks.kinesis/serialize-message-edn
                                 bucket-opts))))
;; Registers this namespace's job under the name "basic-job". Neither dispatch
;; argument is consulted; the job is always built with a batch size of 1, a
;; 1000 batch timeout, the "impressions" stream and the us-east-1 region.
(defmethod register-job "basic-job"
  [_job-name _config]
  (build-job {:batch-settings {:onyx/batch-size 1
                               :onyx/batch-timeout 1000}
              :stream-name "impressions"
              :region "us-east-1"}))
(comment
  ;; REPL scratch, never evaluated on load: starts a 3-peer test environment
  ;; under a fresh tenancy id, submits the job, pushes ten records onto the
  ;; Kinesis stream and then kills the job.
  (let [{:keys [env-config peer-config]} (read-config "resources/config.edn")
        tenancy-id     (str (java.util.UUID/randomUUID))
        env-config     (assoc env-config :onyx/tenancy-id tenancy-id)
        peer-config    (assoc peer-config :onyx/tenancy-id tenancy-id)
        batch-settings {:onyx/batch-size 1 :onyx/batch-timeout 1000}
        stream-name    "impressions"
        region         "us-east-1"
        ;; Ten records, each with a random partition key in [0, 200).
        segments       (vec (for [n (range 10)]
                              {:partition-key (rand-int 200)
                               :data {:n n}}))
        client         (kinesis/new-client {:kinesis/region region})
        job            (build-job {:batch-settings batch-settings
                                   :stream-name stream-name
                                   :region region})
        ;; When using the core.async output instead of S3:
        ;; {:keys [out]} (get-core-async-channels job)
        ]
    (with-test-env [test-env [3 env-config peer-config]]
      (onyx.test-helper/validate-enough-peers! test-env job)
      (let [job-id (:job-id (onyx.api/submit-job peer-config job))]
        ;; (feedback-exception! peer-config job-id)
        ;; NOTE(review): the pause presumably gives the consumer time to start,
        ;; since the shard iterator is initialised at :latest -- confirm.
        (Thread/sleep 2000)
        (->> (kinesis/build-put-request stream-name
                                        segments
                                        kinesis-task/serialize-message-edn)
             (.putRecords client))
        ;; core.async variant of the teardown:
        ;; (let [results (onyx.plugin.core-async/take-segments! out 20000)]
        ;;   (onyx.api/kill-job peer-config job-id)
        ;;   results)
        (onyx.api/kill-job peer-config job-id))))
  )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment