Skip to content

Instantly share code, notes, and snippets.

@Mekajiki
Forked from shouichi/.gitignore
Created November 26, 2012 18:03
Show Gist options
  • Save Mekajiki/4149683 to your computer and use it in GitHub Desktop.
Save Mekajiki/4149683 to your computer and use it in GitHub Desktop.
/target
/lib
/classes
/checkouts
pom.xml
*.jar
*.class
.lein-deps-sum
.lein-failures
.lein-plugins
.lein-repl-history

Aibiki

Usage

The command below runs forever, repeatedly printing the number of tweets with and without geo-location data.

% lein run username password

References

License

Copyright © 2012 Aibiki

(ns aibiki.core
(:import [java.util.concurrent ConcurrentLinkedQueue]
[twitter4j TwitterStreamFactory StatusListener]
[twitter4j.conf ConfigurationBuilder]
[backtype.storm LocalCluster])
(:use [backtype.storm clojure config])
(:gen-class))
(defn twitter-stream-listener
  "Returns a twitter4j StatusListener that offers every received status onto
  `queue`. Deletion notices, track-limitation notices, scrub-geo requests and
  stall warnings are ignored. Stream errors are reported on *err*."
  [queue]
  (proxy [StatusListener] []
    (onStatus [status] (.offer queue status))
    (onDeletionNotice [notice])
    (onTrackLimitationNotice [notice])
    (onScrubGeo [user-id up-to-status-id])
    (onStallWarning [warning])
    ;; StatusListener extends StreamListener, which declares onException.
    ;; A proxy method left unimplemented throws UnsupportedOperationException,
    ;; so without this handler a stream error would surface as an exception
    ;; thrown from inside twitter4j's callback rather than being reported.
    (onException [ex]
      (binding [*out* *err*]
        (println "twitter stream error:" ex)))))
(defn twitter-stream
  "Creates a twitter4j TwitterStream authenticated with `username` and
  `password`, registers a listener that offers each status onto `queue`, and
  calls .sample on it. Returns the TwitterStream instance."
  [username password queue]
  (let [conf (-> (ConfigurationBuilder.)
                 (.setUser username)
                 (.setPassword password)
                 (.build))]
    ;; doto returns the stream itself after registering and sampling.
    (doto (.getInstance (TwitterStreamFactory. conf))
      (.addListener (twitter-stream-listener queue))
      (.sample))))
(defspout twitter
  ["twitter"]
  {:params [username password]}
  [conf context collector]
  ;; The twitter4j listener thread fills `queue`; Storm drains it through
  ;; nextTuple on the spout's own thread.
  (let [queue (ConcurrentLinkedQueue.)
        stream (twitter-stream username password queue)]
    (spout
      (close []
        (.cleanUp stream))
      (nextTuple []
        (if-let [status (.poll queue)]
          (emit-spout! collector [status])
          ;; Nothing buffered: back off briefly instead of letting Storm call
          ;; nextTuple in a tight loop and burn a CPU core.
          (Thread/sleep 1))))))
(defbolt lang-filter
  ["lang-filter"]
  {:params [lang] :prepare true}
  [conf context collector]
  (bolt
    (execute [tuple]
      ;; Forward only statuses whose author's getLang equals `lang`,
      ;; anchored to the input tuple.
      (let [status (.getValue tuple 0)]
        (when (= lang (.. status (getUser) (getLang)))
          (emit-bolt! collector [status] :anchor tuple))
        ;; DSL bolts are not auto-acking; ack every input, including those
        ;; filtered out, so tracked tuples do not time out and replay.
        (ack! collector tuple)))))
(defbolt has-geo-location?
  ["has-geo-location?"]
  [tuple collector]
  ;; Forward only statuses whose getGeoLocation is non-nil, anchored to the
  ;; input tuple.
  (let [status (.getValue tuple 0)]
    (when (.getGeoLocation status)
      (emit-bolt! collector [status] :anchor tuple))
    ;; Ack every input (forwarded or dropped); DSL bolts do not ack for us.
    (ack! collector tuple)))
(defbolt count-get-location
  ["count-get-location"]
  {:prepare true}
  [conf context collector]
  ;; Running tally: {:yes n} for statuses with a GeoLocation, {:no n} without.
  (let [counts (atom {})]
    (bolt
      (cleanup []
        (println @counts))
      (execute [tuple]
        (let [status (.getValue tuple 0)
              bucket (if (.getGeoLocation status) :yes :no)]
          ;; Print the map swap! produced rather than the atom object, and
          ;; use swap!'s return value so the printed tally is exactly the
          ;; state after this update.
          (println (swap! counts update-in [bucket] (fnil inc 0)))
          (ack! collector tuple))))))
(defbolt printer
  ["printer"]
  [tuple collector]
  ;; Debug sink: prints each incoming tuple and emits nothing.
  (println tuple)
  ;; Ack so tracked tuples are not replayed after the message timeout.
  (ack! collector tuple))
(defn -main
  "Builds the topology twitter-spout -> japanese?-bolt (lang-filter \"ja\")
  -> count-get-location-bolt and submits it to an in-process LocalCluster
  under the name \"twitter\"."
  [username password]
  (let [spouts {"twitter-spout"
                (spout-spec (twitter username password) :p 1)}
        bolts {"japanese?-bolt"
               (bolt-spec {"twitter-spout" :shuffle} (lang-filter "ja") :p 1)
               "count-get-location-bolt"
               (bolt-spec {"japanese?-bolt" :shuffle} count-get-location :p 1)}
        ;; Named `topo` so the DSL's `topology` function is not shadowed.
        topo (topology spouts bolts)
        local-cluster (LocalCluster.)]
    (.submitTopology local-cluster "twitter" {TOPOLOGY-DEBUG false} topo)))
(ns aibiki.core-test
(:use clojure.test
aibiki.core))
(deftest a-test
  ;; Replaces the lein template's always-failing placeholder with a real
  ;; check of the only piece of aibiki.core that runs without a network.
  (testing "twitter-stream-listener offers each status onto the queue"
    (let [queue (java.util.concurrent.ConcurrentLinkedQueue.)
          listener (twitter-stream-listener queue)
          ;; A method-less reify is enough: the listener only enqueues it.
          status (reify twitter4j.Status)]
      (is (instance? twitter4j.StatusListener listener))
      (.onStatus listener status)
      (is (identical? status (.poll queue)))
      (is (nil? (.poll queue))))))
log4j.rootLogger=ERROR, A1
log4j.appender.A1=org.apache.log4j.ConsoleAppender
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
log4j.category.org.apache.zookeeper=WARN
;; Leiningen project definition; `lein run` invokes aibiki.core/-main.
(defproject aibiki "0.1.0-SNAPSHOT"
  :description "Aibiki Storm: Read twitter/facebook streams."
  :url "http://www.aibiki.jp/"
  :main aibiki.core
  :license {:url "http://www.eclipse.org/legal/epl-v10.html"
            :name "Eclipse Public License"}
  :dependencies [[org.clojure/clojure "1.4.0"]
                 [storm "0.8.1"]
                 ;; twitter4j core API plus its streaming client.
                 [org.twitter4j/twitter4j-core "3.0.1"]
                 [org.twitter4j/twitter4j-stream "3.0.1"]])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment