The command below runs forever and keeps printing the number of tweets with and without geolocation data.
% lein run username password
The following command will generate documents under docs/
% lein marg
Copyright © 2012 Aibiki
/docs | |
/target | |
/lib | |
/classes | |
/checkouts | |
pom.xml | |
*.jar | |
*.class | |
.lein-deps-sum | |
.lein-failures | |
.lein-plugins | |
.lein-repl-history |
The command below runs forever and keeps printing the number of tweets with and without geolocation data.
% lein run username password
The following command will generate documents under docs/
% lein marg
Copyright © 2012 Aibiki
(ns aibiki.core | |
(:import [backtype.storm LocalCluster]) | |
(:use [backtype.storm clojure config] | |
[aibiki.spouts twitter] | |
[aibiki.bolts geo-location])) | |
(defn -main
  "Entry point: builds the Twitter topology and submits it to an in-process
  Storm LocalCluster under the name \"twitter\".

  username / password are handed to the twitter spout. The topology wires
  twitter-spout -> japanese?-bolt (lang-filter \"ja\") -> count-geo-location-bolt,
  each with parallelism 1 and shuffle grouping."
  [username password]
  (let [twitter-topology
        (topology
          {"twitter-spout"
           (spout-spec (twitter username password) :p 1)}
          {"japanese?-bolt"
           (bolt-spec {"twitter-spout" :shuffle}
                      (lang-filter "ja")
                      :p 1)
           "count-geo-location-bolt"
           (bolt-spec {"japanese?-bolt" :shuffle}
                      count-geo-location
                      :p 1)})
        local-cluster (LocalCluster.)]
    (.submitTopology local-cluster
                     "twitter"
                     {TOPOLOGY-DEBUG false}
                     twitter-topology)))
(ns aibiki.core-test | |
(:use [clojure test] | |
[aibiki core])) |
(ns aibiki.bolts.geo-location | |
(:use [backtype.storm clojure config])) | |
;; Prepared bolt, parameterised by `lang`, that forwards a status only when
;; the language of the status' user equals `lang`.
;;
;; Input : tuple whose field 0 is a status object (it must respond to
;;         getUser, and the user to getLang).
;; Output: one tuple ["lang-filter"] carrying the same status, anchored to
;;         the input tuple, for matching statuses; nothing otherwise.
(defbolt lang-filter
  ["lang-filter"]
  {:params [lang] :prepare true}
  [conf context collector]
  (bolt
    (execute [tuple]
      (let [status (.getValue tuple 0)]
        (when (= lang (.. status (getUser) (getLang)))
          (emit-bolt! collector [status] :anchor tuple))
        ;; Ack every input, whether or not it was forwarded, so that a
        ;; tracked tuple tree can complete instead of timing out.
        (ack! collector tuple)))))
;; Simple bolt that forwards a status only when it carries a geo location.
;;
;; Input : tuple whose field 0 is a status object responding to getGeoLocation.
;; Output: one tuple ["has-geo-location?"] carrying the same status, anchored
;;         to the input tuple, when getGeoLocation returns a non-nil value.
(defbolt has-geo-location?
  ["has-geo-location?"]
  [tuple collector]
  (let [status (.getValue tuple 0)]
    (when (.getGeoLocation status)
      (emit-bolt! collector [status] :anchor tuple))
    ;; Ack every input, forwarded or dropped, so tracked tuples do not
    ;; linger until they time out.
    (ack! collector tuple)))
;; Prepared bolt that keeps a running tally of statuses with (:yes) and
;; without (:no) a geo location and prints the tally after every tuple.
;;
;; Input : tuple whose field 0 is a status object responding to getGeoLocation.
;; Output: nothing is emitted; the counts map is printed to stdout on every
;;         execute and once more on cleanup.
(defbolt count-geo-location
  ["count-geo-location"]
  {:prepare true}
  [conf context collector]
  (let [counts (atom {})]
    (bolt
      ;; Print the map itself, not the Atom wrapper.
      (cleanup [] (println @counts))
      (execute [tuple]
        (let [status  (.getValue tuple 0)
              counter (if (.getGeoLocation status) :yes :no)]
          ;; swap! returns the updated map, so what is printed is exactly the
          ;; state produced by this tuple.
          (println (swap! counts (partial merge-with +) {counter 1}))
          ;; Ack the input so a tracked tuple tree can complete.
          (ack! collector tuple))))))
;; Debugging bolt: prints every incoming tuple to stdout and emits nothing.
(defbolt printer
  ["printer"]
  [tuple collector]
  (println tuple)
  ;; Ack the input so a tracked tuple tree can complete.
  (ack! collector tuple))
(ns aibiki.analizers.ja | |
(:import [org.atilika.kuromoji Tokenizer]) | |
(:use [clojure.string :only [split]])) | |
;; Part-of-speech tag that place-name? looks for in a token's
;; comma-separated part-of-speech string.
(def ^:private place-name "地域")
(defn tokenize
  "Tokenizes the string s with a freshly built Kuromoji Tokenizer and returns
  whatever the tokenizer's tokenize method yields."
  [s]
  (-> (Tokenizer/builder)
      (.build)
      (.tokenize s)))
(defn place-name?
  "Returns true when one of the comma-separated entries of token t's
  part-of-speech string equals the place-name tag, false otherwise."
  [t]
  (let [pos-tags (split (.getPartOfSpeech t) #",")]
    (boolean (some #{place-name} pos-tags))))
(ns aibiki.analizers.ja-test | |
(:use [clojure test] | |
[aibiki.analizers ja])) | |
;; Checks the number of tokens produced for an empty string, a single
;; place name, and a short sentence.
(deftest tokenize-test
  (testing "tokenizer"
    (are [expected text] (= expected (count (seq (tokenize text))))
      0 ""
      1 "九段下"
      3 "九段下生まれです")))
;; Tokenizes a three-token sentence and checks that only the first token
;; is classified as a place name.
(deftest place-name?-test
  (testing "place-name?"
    (let [[first-token second-token third-token]
          (seq (tokenize "九段下生まれです"))]
      (is (true? (place-name? first-token)))
      (is (false? (place-name? second-token)))
      (is (false? (place-name? third-token))))))
log4j.rootLogger=ERROR, A1 | |
log4j.appender.A1=org.apache.log4j.ConsoleAppender | |
log4j.appender.A1.layout=org.apache.log4j.PatternLayout | |
log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n | |
log4j.category.org.apache.zookeeper=WARN |
;; Leiningen project definition for Aibiki.
(defproject aibiki "0.1.0-SNAPSHOT"
  ;; --- Project metadata ---
  :description "Aibiki Storm: Read twitter/facebook streams."
  :url "http://www.aibiki.jp/"
  :license {:url  "http://www.eclipse.org/legal/epl-v10.html"
            :name "Eclipse Public License"}

  ;; --- Build / run settings ---
  ;; Namespace whose -main is invoked by `lein run`.
  :main aibiki.core
  :omit-source true
  :plugins [[lein-marginalia "0.7.1"]]

  ;; --- Dependency resolution ---
  ;; Extra Maven repositories, keyed by the library they are named after.
  :repositories [["kuromoji"
                  "http://www.atilika.org/nexus/content/repositories/atilika"]
                 ["foursquareapijava"
                  "http://foursquare-api-java.googlecode.com/svn/repository"]]
  :dependencies [[org.clojure/clojure "1.4.0"]
                 [storm "0.8.1"]
                 [org.twitter4j/twitter4j-core "3.0.1"]
                 [org.twitter4j/twitter4j-stream "3.0.1"]
                 [fi.foyt/foursquare-api "1.0.2"]
                 [org.atilika.kuromoji/kuromoji "0.7.7"]])
(ns aibiki.spouts.twitter | |
(:import [java.util.concurrent ConcurrentLinkedQueue] | |
[twitter4j TwitterStreamFactory StatusListener] | |
[twitter4j.conf ConfigurationBuilder]) | |
(:use [backtype.storm clojure config])) | |
(defn- twitter-stream-listener
  "Builds a StatusListener that offers every received status to queue.

  All other notifications are ignored, except stream errors, whose stack
  trace is printed."
  [queue]
  (proxy [StatusListener] []
    (onStatus [status] (.offer queue status))
    (onDeletionNotice [notice])
    (onTrackLimitationNotice [notice])
    (onScrubGeo [user-id up-to-status-id])
    (onStallWarning [warning])
    ;; onException is inherited from StreamListener. A proxy throws
    ;; UnsupportedOperationException for any interface method it does not
    ;; implement, so without this a stream error (bad credentials, dropped
    ;; connection) would surface as an unrelated exception in the
    ;; twitter4j thread.
    (onException [ex] (.printStackTrace ex))))
(defn- twitter-stream
  "Creates a twitter stream configured with username and password, attaches
  a listener that feeds queue, starts sampling, and returns the stream."
  [username password queue]
  (let [builder (doto (ConfigurationBuilder.)
                  (.setUser username)
                  (.setPassword password))
        factory (TwitterStreamFactory. (.build builder))]
    (doto (.getInstance factory)
      (.addListener (twitter-stream-listener queue))
      (.sample))))
;; Spout, parameterised by username and password, that emits one tuple
;; ["twitter"] per status received from the twitter stream.
;;
;; Statuses arrive on a listener and are buffered in a queue; nextTuple
;; polls that queue and emits at most one status per call. close cleans up
;; the underlying stream.
(defspout twitter
  ["twitter"]
  {:params [username password]}
  [conf context collector]
  (let [pending (ConcurrentLinkedQueue.)
        source  (twitter-stream username password pending)]
    (spout
      (close []
        (.cleanUp source))
      (nextTuple []
        (let [status (.poll pending)]
          ;; poll returns nil when the queue is empty; emit nothing then.
          (when status
            (emit-spout! collector [status])))))))