Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
Streaming word extract
(ns streaming-word-extract
(:require
[pubsub-utils] ;; This is our local pubsub-utils namespace
[datasplash
[api :as ds]
[bq :as bq]
[pubsub :as ps]]
[clojure.string :as string])
(:import
(com.google.cloud.dataflow.sdk.options DataflowPipelineOptions)))
;; Google Cloud project in which this pipeline will be started.
(def project-id "[CLOUD PROJECT ID HERE]")

;; Options handed to the pipeline when it is created.
;; Unlike the Java example, no custom StreamingWordExtractOptions interface is
;; declared here, although Datasplash supports that approach as well.
(def pipeline-options
  (let [staging-location "gs://[STAGING LOCATION HERE]/jars"]
    {:runner          "DataflowPipelineRunner"
     :job-name        "streaming-word-extract"
     :project         project-id
     :streaming       true
     :stagingLocation staging-location}))
;; --------------------------------------------------------------------------------
;; Pure functions being used in the transformations further down
;; They can easily be unit tested
;; --------------------------------------------------------------------------------
; Tokenizes lines of text into individual words and removes the empty ones
(defn extract-words
  "Splits `line` on runs of whitespace and returns a lazy sequence of the
  resulting tokens with the empty strings dropped."
  [line]
  (let [tokens (string/split line #"\s+")]
    ;; A line starting with whitespace yields a leading \"\" token; drop it.
    (remove empty? tokens)))
;; This is where we apply the different transformation steps to the pipeline
;; The pipeline isn't run at this point, just configured through a series of
;; composed function calls
;; The actual functionality of each step is isolated to separate, pure functions
;; As you can see, each step has been given a name to make it easier to find in the
;; Dataflow UI afterwards
(defn apply-transforms-to-pipeline
  "Attaches the word-extraction steps to `pipeline`.

  Steps, in order: read from the PubSub topic `topic-name`, split each element
  into words with `extract-words`, upper-case every word, wrap each word in a
  row map keyed by :uppercase_word, and write the rows to `bigquery-table`.

  Returns the result of the final `bq/write-bq-table` call."
  [pipeline topic-name bigquery-table]
  (let [;; BigQuery row representation of one upper-cased word
        word->row  (fn [word] {:uppercase_word word})
        ;; write options, including the inline table schema
        bq-options {:schema [{:name "uppercase_word" :type "STRING" :mode "REQUIRED"}]
                    :name "write-to-bigquery"
                    :create-disposition :if-needed
                    :write-disposition :append}
        ;; elements read from the PubSub topic
        lines      (ps/read-from-pubsub topic-name {:name "read-from-pubsub"} pipeline)
        ;; mapcat flattens the sequence returned by extract-words
        words      (ds/mapcat extract-words {:name "extract-words"} lines)
        ;; Clojure's upper-case can be used directly as the mapping function
        uppercased (ds/map string/upper-case {:name "uppercase"} words)
        rows       (ds/map word->row {:name "create-row"} uppercased)]
    (bq/write-bq-table bigquery-table bq-options rows)))
(defn run-example
  "Configures and starts the streaming word-extract pipeline, then feeds it.

  Takes a single map with the keys :topic-name, :input-file and
  :bigquery-table. In order, it:
    1. creates the pipeline from `pipeline-options`,
    2. calls `pubsub-utils/setup-topic` for the topic,
    3. applies the transformations and runs the pipeline,
    4. calls `pubsub-utils/populate-topic` with `input-file`.

  Returns the result of `pubsub-utils/populate-topic`."
  [{:keys [topic-name input-file bigquery-table]}]
  (let [p          (ds/make-pipeline DataflowPipelineOptions [] pipeline-options)
        topic-path (pubsub-utils/full-topic-path project-id topic-name)]
    ;; the topic is set up before the pipeline that reads from it is started
    (pubsub-utils/setup-topic project-id topic-name)
    ;; attach every transformation to the pipeline, then start it
    (doto p
      (apply-transforms-to-pipeline topic-path bigquery-table)
      (ds/run-pipeline))
    ;; finally publish the contents of input-file to the topic
    (pubsub-utils/populate-topic project-id topic-name input-file)))
;; REPL helper: nothing in this form runs when the namespace is loaded.
;; Evaluate the inner form by hand to launch the example.
(comment
  (run-example
   {:topic-name     "streaming-word-extract"
    :input-file     "gs://dataflow-samples/shakespeare/kinglear.txt"
    :bigquery-table (str project-id ":streaming_word_extract.words")}))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment