Skip to content

Instantly share code, notes, and snippets.

@shouichi
Created November 26, 2012 17:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save shouichi/4149428 to your computer and use it in GitHub Desktop.
Save shouichi/4149428 to your computer and use it in GitHub Desktop.
/docs
/target
/lib
/classes
/checkouts
pom.xml
*.jar
*.class
.lein-deps-sum
.lein-failures
.lein-plugins
.lein-repl-history

Aibiki

Usage

The command below runs forever and keeps printing the number of tweets with and without geolocation data.

% lein run username password

The following command generates documentation under docs/.

% lein marg

References

License

Copyright © 2012 Aibiki

(ns aibiki.core
(:import [backtype.storm LocalCluster])
(:use [backtype.storm clojure config]
[aibiki.spouts twitter]
[aibiki.bolts geo-location]))
(defn -main
  "Entry point: builds the Twitter topology and submits it to an in-process
  Storm LocalCluster under the name \"twitter\".

  username / password are handed to the `twitter` spout. Statuses flow
  twitter-spout -> japanese?-bolt (lang-filter \"ja\") -> count-geo-location-bolt,
  each component running with a parallelism of 1 and shuffle grouping."
  [username password]
  (let [spouts  {"twitter-spout" (spout-spec (twitter username password) :p 1)}
        bolts   {"japanese?-bolt"
                 (bolt-spec {"twitter-spout" :shuffle} (lang-filter "ja") :p 1)
                 "count-geo-location-bolt"
                 (bolt-spec {"japanese?-bolt" :shuffle} count-geo-location :p 1)}
        topo    (topology spouts bolts)
        cluster (LocalCluster.)]
    (.submitTopology cluster "twitter" {TOPOLOGY-DEBUG false} topo)))
(ns aibiki.core-test
(:use [clojure test]
[aibiki core]))
(ns aibiki.bolts.geo-location
(:use [backtype.storm clojure config]))
;; Prepared bolt, parameterized by `lang`, that forwards a status only when
;; the language reported by its user (status.getUser().getLang()) equals `lang`.
;;
;; Input:  a tuple whose first value is the status object.
;; Output: one field, "lang-filter", carrying the same status, anchored to the
;;         input tuple.
(defbolt lang-filter
  ["lang-filter"]
  {:params [lang] :prepare true}
  [conf context collector]
  (bolt
    (execute [tuple]
      (let [status (.getValue tuple 0)]
        (when (= lang (.. status (getUser) (getLang)))
          (emit-bolt! collector [status] :anchor tuple))
        ;; Ack every input, whether it was forwarded or dropped. Without this,
        ;; a tuple tracked by a reliable spout would never complete and would
        ;; eventually be reported as failed.
        (ack! collector tuple)))))
;; Simple bolt that forwards a status only when `.getGeoLocation` returns a
;; non-nil value for it.
;;
;; Input:  a tuple whose first value is the status object.
;; Output: one field, "has-geo-location?", carrying the same status, anchored
;;         to the input tuple.
(defbolt has-geo-location?
  ["has-geo-location?"]
  [tuple collector]
  (let [status (.getValue tuple 0)]
    (when (.getGeoLocation status)
      (emit-bolt! collector [status] :anchor tuple))
    ;; Ack every input, forwarded or not, so that tracked tuples complete
    ;; instead of timing out.
    (ack! collector tuple)))
;; Prepared bolt that keeps a running tally of statuses with (:yes) and
;; without (:no) a geo location and prints the tally after every tuple.
;;
;; Input:  a tuple whose first value is the status object.
;; Output: declares the field "count-geo-location" but never emits.
;; State:  `counts` is an atom created once per bolt instance.
(defbolt count-geo-location
  ["count-geo-location"]
  {:prepare true}
  [conf context collector]
  (let [counts (atom {})]
    (bolt
      ;; Print the final tally (the map itself, not the atom wrapper).
      (cleanup [] (println @counts))
      (execute [tuple]
        (let [status (.getValue tuple 0)
              bucket (if (.getGeoLocation status) :yes :no)]
          ;; swap! returns the updated map, so what is printed is exactly the
          ;; tally produced by this tuple. fnil seeds a missing key with 0.
          (println (swap! counts update-in [bucket] (fnil inc 0)))
          ;; Ack the input so that tracked tuples complete instead of
          ;; timing out.
          (ack! collector tuple))))))
;; Debugging sink: prints every incoming tuple to standard output.
;;
;; Output: declares the field "printer" but never emits.
(defbolt printer
  ["printer"]
  [tuple collector]
  (println tuple)
  ;; Ack the input so that tracked tuples complete instead of timing out.
  (ack! collector tuple))
(ns aibiki.analizers.ja
(:import [org.atilika.kuromoji Tokenizer])
(:use [clojure.string :only [split]]))
;; Part-of-speech tag that `place-name?` looks for among a token's
;; comma-separated part-of-speech fields.
(def ^:private place-name "地域")
(defn tokenize
  "Tokenizes the string `s` with a Kuromoji Tokenizer and returns whatever
  `Tokenizer.tokenize` returns. A fresh tokenizer is built from
  `Tokenizer/builder` on every call."
  [s]
  (-> (Tokenizer/builder)
      (.build)
      (.tokenize s)))
(defn place-name?
  "Returns true when one of the comma-separated fields of the part-of-speech
  string of token `t` equals `place-name`, false otherwise."
  [t]
  (let [pos-fields (.split (.getPartOfSpeech t) ",")]
    ;; `some` with a set predicate yields the matching field or nil;
    ;; `boolean` turns that into a strict true/false.
    (boolean (some #{place-name} pos-fields))))
(ns aibiki.analizers.ja-test
(:use [clojure test]
[aibiki.analizers ja]))
;; Checks the number of tokens `tokenize` yields for an empty string, a single
;; word and a short sentence.
(deftest tokenize-test
  (testing "tokenizer"
    (are [expected text] (= expected (count (seq (tokenize text))))
      0 ""
      1 "九段下"
      3 "九段下生まれです")))
;; Tokenizes a three-token sentence and checks that only the first token
;; is classified as a place name.
(deftest place-name?-test
  (testing "place-name?"
    (let [[kudanshita umare desu] (seq (tokenize "九段下生まれです"))]
      (is (true? (place-name? kudanshita)))
      (is (false? (place-name? umare)))
      (is (false? (place-name? desu))))))
# Root logger: only ERROR and above, written to the appender named A1.
log4j.rootLogger=ERROR, A1
# A1 writes to the console.
log4j.appender.A1=org.apache.log4j.ConsoleAppender
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
# Pattern fields: %r elapsed milliseconds, %t thread name, %p level,
# %c logger name, %x nested diagnostic context, %m message, %n line separator.
log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
# ZooKeeper loggers get their own threshold (WARN), independent of the root level.
log4j.category.org.apache.zookeeper=WARN
;; Leiningen project definition for Aibiki.
(defproject aibiki "0.1.0-SNAPSHOT"
:description "Aibiki Storm: Read twitter/facebook streams."
:url "http://www.aibiki.jp/"
:license {:name "Eclipse Public License"
:url "http://www.eclipse.org/legal/epl-v10.html"}
;; Extra Maven repositories searched in addition to Leiningen's defaults;
;; presumably they host the kuromoji and foursquare-api artifacts listed below.
:repositories [["kuromoji" "http://www.atilika.org/nexus/content/repositories/atilika"]
["foursquareapijava" "http://foursquare-api-java.googlecode.com/svn/repository"]]
;; lein-marginalia provides the `lein marg` task.
:plugins [[lein-marginalia "0.7.1"]]
:dependencies [[org.clojure/clojure "1.4.0"]
[storm "0.8.1"]
[org.twitter4j/twitter4j-core "3.0.1"]
[org.twitter4j/twitter4j-stream "3.0.1"]
[fi.foyt/foursquare-api "1.0.2"]
[org.atilika.kuromoji/kuromoji "0.7.7"]]
;; NOTE(review): presumably keeps .clj sources out of the built jar - confirm
;; against the Leiningen sample project.clj.
:omit-source true
;; Namespace whose -main is invoked by `lein run`.
:main aibiki.core)
(ns aibiki.spouts.twitter
(:import [java.util.concurrent ConcurrentLinkedQueue]
[twitter4j TwitterStreamFactory StatusListener]
[twitter4j.conf ConfigurationBuilder])
(:use [backtype.storm clojure config]))
(defn- twitter-stream-listener
  "Returns a twitter4j StatusListener that offers every received status to
  `queue` and ignores deletion, track-limitation, scrub-geo and stall-warning
  notices.

  Stream errors are printed to standard error. StatusListener inherits
  `onException` from StreamListener; a proxy that leaves it out would throw
  UnsupportedOperationException as soon as twitter4j reported a stream error."
  [queue]
  (proxy [StatusListener] []
    (onStatus [status] (.offer queue status))
    (onDeletionNotice [notice])
    (onTrackLimitationNotice [notice])
    (onScrubGeo [user-id up-to-status-id])
    (onStallWarning [warning])
    (onException [ex]
      (.printStackTrace ex))))
(defn- twitter-stream
  "Builds a twitter4j stream configured with `username` and `password`,
  attaches a listener that feeds `queue`, starts the sample stream and
  returns the stream object."
  [username password queue]
  (let [config  (-> (ConfigurationBuilder.)
                    (.setUser username)
                    (.setPassword password)
                    (.build))
        factory (TwitterStreamFactory. config)]
    ;; doto returns the stream itself after wiring the listener and starting
    ;; the sample.
    (doto (.getInstance factory)
      (.addListener (twitter-stream-listener queue))
      (.sample))))
;; Spout parameterized by Twitter credentials. On preparation it opens a
;; stream whose listener pushes statuses onto an in-memory queue; each
;; nextTuple call emits at most one queued status as a single-field tuple
;; ("twitter"). Closing the spout calls `.cleanUp` on the stream.
(defspout twitter
  ["twitter"]
  {:params [username password]}
  [conf context collector]
  (let [pending (ConcurrentLinkedQueue.)
        source  (twitter-stream username password pending)]
    (spout
      (close []
        (.cleanUp source))
      (nextTuple []
        ;; .poll returns nil when the queue is empty; emit nothing then.
        (let [status (.poll pending)]
          (when status
            (emit-spout! collector [status])))))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment