-
-
Save danielstockton/1004aed11873daa4730199829f2bef19 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns etl.jobs.basic | |
(:require [aero.core :refer [read-config]] | |
[clojure.data.json :as json] | |
[etl.tasks.math :as math] | |
[onyx.job :refer [add-task register-job]] | |
[onyx.plugin.core-async :refer [get-core-async-channels]] | |
[onyx.plugin.kinesis :as kinesis] | |
[onyx.plugin.s3-output :as output] | |
[onyx.tasks.core-async :as core-async-task] | |
[onyx.tasks.kinesis :as kinesis-task] | |
[onyx.tasks.s3 :as s3-task] | |
[onyx.test-helper :refer [with-test-env feedback-exception!]])) | |
;; The :aws entry of resources/config.edn, read once when this namespace loads.
;; build-job pulls :access-key and :secret-key out of this map for the S3 task.
(def aws-credentials
  (-> "resources/config.edn"
      read-config
      :aws))
(defn build-job
  "Builds the job map for the :in -> :identity -> :out workflow, where :in is
  a Kinesis consumer task and :out is an S3 output task.

  Takes a single options map:
    :batch-settings  map merged into the :identity catalog entry and into the
                     Kinesis consumer options
    :stream-name     value used for :kinesis/stream-name
    :region          value used for both :kinesis/region and :s3/region
    :bucket          S3 bucket the :out task writes to (optional, defaults to
                     \"newsroom-logs\")

  Returns the base job after both tasks have been added with add-task."
  [{:keys [batch-settings
           stream-name
           region
           bucket]
    ;; Default keeps existing callers, which never pass :bucket, unchanged.
    :or {bucket "newsroom-logs"}}]
  (let [base-job {:workflow [[:in :identity]
                             [:identity :out]]
                  :catalog [(merge {:onyx/name :identity
                                    :onyx/fn :clojure.core/identity
                                    :onyx/type :function}
                                   batch-settings)]
                  :lifecycles []
                  :windows []
                  :triggers []
                  :flow-conditions []
                  :task-scheduler :onyx.task-scheduler/balanced}
        kinesis-opts {:kinesis/stream-name stream-name
                      :kinesis/shard-initialize-type :latest
                      :kinesis/deserializer-fn :onyx.tasks.kinesis/deserialize-message-edn
                      :kinesis/region region
                      :onyx/min-peers 1
                      :onyx/max-peers 1}
        ;; The S3 task carries its own batch size/timeout (2000/2000) and does
        ;; NOT pick up batch-settings, unlike the other two tasks.
        ;; NOTE(review): confirm whether :s3/multi-upload true also requires
        ;; :s3/prefix-key in the onyx-amazon-s3 version in use.
        s3-opts {:s3/encryption :none
                 :s3/access-key (:access-key aws-credentials)
                 :s3/secret-key (:secret-key aws-credentials)
                 :s3/multi-upload true
                 :s3/region region
                 :s3/serialize-per-element? true
                 :onyx/min-peers 1
                 :onyx/max-peers 1
                 :onyx/batch-size 2000
                 :onyx/batch-timeout 2000}]
    (-> base-job
        (add-task (kinesis-task/consumer :in (merge kinesis-opts batch-settings)))
        ;; Alternative in-memory output, kept for local debugging:
        ;; (add-task (core-async-task/output :out batch-settings))
        (add-task (s3-task/s3-output :out
                                     bucket
                                     :onyx.tasks.kinesis/serialize-message-edn
                                     s3-opts)))))
;; register-job implementation for the "basic-job" dispatch value.
;; Neither argument is consulted: the job is always built for the
;; "impressions" stream in us-east-1 with a batch size of 1 and a
;; batch timeout of 1000.
(defmethod register-job "basic-job"
  [_job-name _config]
  (build-job {:region "us-east-1"
              :stream-name "impressions"
              :batch-settings {:onyx/batch-size 1
                               :onyx/batch-timeout 1000}}))
;; REPL scratch harness: starts a test env under a fresh tenancy id, submits
;; the job built by build-job, puts ten generated segments onto the Kinesis
;; stream, then kills the job. Nothing here is evaluated at namespace load.
(comment
  (let [{:keys [env-config peer-config]} (read-config "resources/config.edn")
        ;; Random tenancy id shared by the env and peer configs for this run.
        tenancy-id (str (java.util.UUID/randomUUID))
        env-config (assoc env-config :onyx/tenancy-id tenancy-id)
        peer-config (assoc peer-config :onyx/tenancy-id tenancy-id)
        batch-settings {:onyx/batch-size 1 :onyx/batch-timeout 1000}
        stream-name "impressions"
        region "us-east-1"
        segments (mapv (fn [n] {:partition-key (rand-int 200)
                                :data {:n n}})
                       (range 10))
        client (kinesis/new-client {:kinesis/region region})
        ;; Reuse the locals above so the job and the put request always
        ;; target the same stream and region.
        job (build-job {:batch-settings batch-settings
                        :stream-name stream-name
                        :region region})
        ;; {:keys [out]} (get-core-async-channels job)
        ]
    (with-test-env [test-env [3 env-config peer-config]]
      (onyx.test-helper/validate-enough-peers! test-env job)
      (let [job-id (:job-id (onyx.api/submit-job peer-config job))]
        ;; (feedback-exception! peer-config job-id)
        ;; Pause before producing; presumably gives the consumer time to
        ;; start, since it reads from :latest - TODO confirm.
        (Thread/sleep 2000)
        (.putRecords client
                     (kinesis/build-put-request stream-name
                                                segments
                                                kinesis-task/serialize-message-edn))
        ;; Core-async variant that waited for output before killing the job:
        ;; (let [results (onyx.plugin.core-async/take-segments! out 20000)]
        ;;   (onyx.api/kill-job peer-config job-id)
        ;;   results)
        ;; NOTE(review): the job is killed right after the put with no wait
        ;; for the S3 task to write anything - confirm this is intended.
        (onyx.api/kill-job peer-config job-id)))))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment