Skip to content

Instantly share code, notes, and snippets.

@arnaudsj
Last active August 29, 2015 14:07
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save arnaudsj/f1967a7d66609c094447 to your computer and use it in GitHub Desktop.
Save arnaudsj/f1967a7d66609c094447 to your computer and use it in GitHub Desktop.
Spark streaming tweets using Flambo Clojure library
(ns flambo-streaming-101.core
(:require [flambo.api :as f]
[flambo.streaming :as fs]
[flambo.conf :as fconf]
[flambo.function :refer [pair-function]]
[clj-time.format :as tf]
[clj-time.core :as tc]
[clojure.tools.trace :refer [trace]]
[clojure.java.shell :refer [sh]]
[clojure.pprint :refer [pprint]]
[org.satta.glob :refer [glob]])
(:import [org.apache.spark.streaming.twitter TwitterUtils]
[org.apache.log4j Level Logger])
(:gen-class))
(def master "local[*]")
(def conf {})
(def env {
"spark.executor.memory" "1G",
"spark.files.overwrite" "true"
})
;; We don't need to see everything ;
(.setLevel (Logger/getRootLogger) Level/WARN)
(def ^:dynamic *app-consumer-key* "XXX")
(def ^:dynamic *app-consumer-secret* "XXX")
(def ^:dynamic *user-access-token* "XXX")
(def ^:dynamic *user-access-token-secret* "XXX")
(System/setProperty "twitter4j.oauth.consumerKey", *app-consumer-key*)
(System/setProperty "twitter4j.oauth.consumerSecret", *app-consumer-secret*)
(System/setProperty "twitter4j.oauth.accessToken", *user-access-token*)
(System/setProperty "twitter4j.oauth.accessTokenSecret", *user-access-token-secret*)
(defn new-spark-context []
(let [c (-> (fconf/spark-conf)
(fconf/master master)
(fconf/app-name "core-nlp-flambo-101")
(fconf/set "spark.akka.timeout" "300")
(fconf/set conf)
(fconf/set-executor-env env))]
(fs/streaming-context c 2000) ))
(defonce sc (new-spark-context))
(fs/checkpoint sc "/tmp/")
(defonce tweet-stream (TwitterUtils/createStream sc))
;; ==================
;; Stream processing
;; ==================
(defonce tweet-stream-text (fs/map tweet-stream (f/fn[t] (.getText t))))
(defn -main []
(fs/print tweet-stream-text)
(.start sc)
(.awaitTermination sc))
(defproject flambo-streaming-101 "0.1.0-SNAPSHOT"
:description "Streaming example using Flambo"
:license {:name "Eclipse Public License"
:url "http://www.eclipse.org/legal/epl-v10.html"}
:dependencies [[org.clojure/clojure "1.6.0"]
[yieldbot/flambo "0.4.0-SNAPSHOT"]
[org.apache.spark/spark-core_2.10 "1.1.0"]
[org.apache.spark/spark-streaming_2.10 "1.1.0"]
[org.apache.spark/spark-streaming-twitter_2.10 "1.1.0"]
[org.apache.spark/spark-streaming-kafka_2.10 "1.1.0"]
[clj-time "0.8.0"]
[org.clojure/tools.trace "0.7.8"]
[clj-glob "1.0.0"]]
:main flambo-streaming-101.core
:profiles {:dev
{:aot [core-nlp-flambo-101.core]}
:uberjar
{:aot :all}})
#!/bin/bash
JAVA_HOME= spark-submit --deploy-mode cluster --master spark://1.1.1.1:7077 --name flambo-streaming-101 --class flambo-streaming-101.core /path/to/jar/on/cluster/flambo-streaming-101-0.1.0-SNAPSHOT-standalone.jar
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment