Skip to content

Instantly share code, notes, and snippets.

@spullara
Created February 15, 2010 05:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save spullara/304434 to your computer and use it in GitHub Desktop.
Save spullara/304434 to your computer and use it in GitHub Desktop.
package com.sampullara.twitter.track;
import com.espertech.esper.client.*;
import com.sampullara.twitter.JsonNodeMap;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* This simple program calculates the median statuses and followers for a Twitter firehose feed over the last 60 seconds.
* Results 2/14/2010 @ 9:19PM PST for the sample feed:
* 1730 status updates, 120 followers, 1000 tweets per minute (the last is likely a twitter setting)
* <p/>
* User: sam
* Date: Feb 14, 2010
* Time: 8:15:50 PM
*/
public class EsperServer {
    /**
     * Wires a {@code Spritzer} feed into an Esper engine and prints rolling statistics.
     * <p/>
     * Registers the map-based "tweet" event type, creates a statement over a 60 second time
     * window that selects the median of user.statuses_count, the median of user.followers_count
     * and the event count, attaches a subscriber that prints every result row to standard
     * output, and only then submits the feed to an executor so that no event is delivered
     * before the pipeline is fully wired. The executor is not shut down here.
     *
     * @param args command line arguments; not used
     */
    public static void main(String[] args) {
        // Create the feed and its executor, but do not start it yet
        ExecutorService es = Executors.newCachedThreadPool();
        Spritzer spritzer = new Spritzer();

        // Configure and initialize the Esper system
        Configuration config = new Configuration();
        config.addEventType("tweet", tweetType());
        EPServiceProvider epsp = EPServiceProviderManager.getDefaultProvider(config);
        epsp.initialize();

        // Inject events into the Esper runtime
        final EPRuntime runtime = epsp.getEPRuntime();
        spritzer.addEventListener(new SpritzerListener() {
            public void messageReceived(SpritzerEvent se) {
                runtime.sendEvent(new JsonNodeMap(se.getNode()), "tweet");
            }

            // NOTE(review): presumably signals that this consumer is falling behind the feed - confirm
            public void tooSlow() {
                System.out.println("Too slow");
            }
        });

        // Execute a query over the data from the firehose
        EPStatement statement = epsp.getEPAdministrator().createEPL("select median(user.statuses_count), median(user.followers_count), count(*) as statuses from tweet.win:time(60 second)");
        statement.setSubscriber(new Object() {
            // The medians are boxed: per the Esper aggregation documentation median() yields null
            // when the window holds no events, which a primitive double parameter cannot receive.
            public void update(Double statuses, Double followers, long cnt) {
                System.out.println(statuses + ", " + followers + ", " + cnt);
            }
        });

        // Start the feed only now that the listener, statement and subscriber are all in place;
        // submitting it earlier could drop events or race with the listener registration.
        es.submit(spritzer);

        // Retained from the original.
        // NOTE(review): Esper documents createEPL as returning an already started statement,
        // so this call is probably redundant - confirm before removing.
        statement.start();
    }

    /**
     * Builds the partial type map that describes a tweet event to Esper.
     *
     * @return a map of property name to property type, with the nested "user" entry itself
     *         being a map of the user's property names to types
     */
    private static Map<String, Object> tweetType() {
        Map<String, Object> user = new HashMap<String, Object>();
        user.put("screen_name", String.class);
        user.put("lang", String.class);
        user.put("verified", Boolean.class);
        user.put("followers_count", Long.class);
        user.put("friends_count", Long.class);
        user.put("name", String.class);
        user.put("statuses_count", Long.class);
        user.put("id", Long.class);

        Map<String, Object> tweet = new HashMap<String, Object>();
        tweet.put("text", String.class);
        tweet.put("created_at", String.class);
        tweet.put("id", Long.class);
        tweet.put("user", user);
        return tweet;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment