Created
August 4, 2015 05:54
-
-
Save ergo70/4f804034b2dd099c3bff to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.theplateisbad.consumer; | |
import java.sql.Connection; | |
import java.sql.DriverManager; | |
import java.sql.PreparedStatement; | |
import java.sql.ResultSet; | |
import java.sql.SQLException; | |
import java.sql.Statement; | |
import java.util.List; | |
import java.util.Timer; | |
import java.util.TimerTask; | |
import java.util.concurrent.BlockingQueue; | |
import java.util.concurrent.LinkedBlockingQueue; | |
import com.google.common.collect.Lists; | |
import com.twitter.hbc.ClientBuilder; | |
import com.twitter.hbc.core.Client; | |
import com.twitter.hbc.core.Constants; | |
import com.twitter.hbc.core.Hosts; | |
import com.twitter.hbc.core.HttpHosts; | |
import com.twitter.hbc.core.endpoint.Location; | |
import com.twitter.hbc.core.endpoint.Location.Coordinate; | |
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint; | |
import com.twitter.hbc.core.event.Event; | |
import com.twitter.hbc.core.processor.StringDelimitedProcessor; | |
import com.twitter.hbc.httpclient.auth.Authentication; | |
import com.twitter.hbc.httpclient.auth.OAuth1; | |
public class StreamConsumer { | |
static { | |
try { | |
Class.forName("org.postgresql.Driver"); | |
} catch (ClassNotFoundException e) { | |
} | |
} | |
public static void main(String[] args) throws InterruptedException, SQLException { | |
Timer timer = new Timer(); | |
final Connection con1 = DriverManager.getConnection("jdbc:postgresql://localhost/postgres?user=<user>&password=<password>"); | |
final Connection con2 = DriverManager.getConnection("jdbc:postgresql://localhost/postgres?user=<user>&password=<password>"); | |
final PreparedStatement insert = con1.prepareStatement("INSERT INTO tstest.tstream (tevent) VALUES (?::jsonb)", ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE); | |
final Statement delete = con2.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE); | |
TimerTask task = new TimerTask() { | |
@Override | |
public void run() { | |
try { | |
delete.executeUpdate("DELETE FROM tstest.tstream WHERE rec < clock_timestamp() - interval '1 HOUR'"); | |
//delete.execute("VACUUM ANALYZE tstest.tstream"); | |
System.out.println("60 Minute cutoff"); | |
} catch (SQLException e) { | |
} | |
} | |
}; | |
/** Set up your blocking queues: Be sure to size these properly based on expected TPS of your stream */ | |
BlockingQueue<String> msgQueue = new LinkedBlockingQueue<String>(10000); | |
BlockingQueue<Event> eventQueue = new LinkedBlockingQueue<Event>(1000); | |
/** Declare the host you want to connect to, the endpoint, and authentication (basic auth or oauth) */ | |
Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST); | |
StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint(); | |
// Optional: set up some followings and track terms | |
//List<Long> followings = Lists.newArrayList(1234L, 566788L); | |
//List<String> terms = Lists.newArrayList("Germany","Deutschland"); | |
List<Location> locations = Lists.newArrayList(new Location(new Coordinate(6.7222,51.2254), new Coordinate(8.5344,51.5064))); | |
//hosebirdEndpoint.followings(followings); | |
//hosebirdEndpoint.trackTerms(terms); | |
hosebirdEndpoint.locations(locations); | |
// These secrets should be read from a config file | |
Authentication hosebirdAuth = new OAuth1("CONSUMER_KEY", | |
"CONSUMER_SECRET", | |
"ACCESS_TOKEN_KEY", | |
"ACCESS_TOKEN_SECRET"); | |
ClientBuilder builder = new ClientBuilder() | |
.name("Hosebird-Client-01") // optional: mainly for the logs | |
.hosts(hosebirdHosts) | |
.authentication(hosebirdAuth) | |
.endpoint(hosebirdEndpoint) | |
.processor(new StringDelimitedProcessor(msgQueue)) | |
.eventMessageQueue(eventQueue); // optional: use this if you want to process client events | |
Client hosebirdClient = builder.build(); | |
// Attempts to establish a connection. | |
hosebirdClient.connect(); | |
timer.scheduleAtFixedRate(task, 0, 300000); | |
while (!hosebirdClient.isDone()) { | |
String msg = msgQueue.take(); | |
insert.setString(1,msg); | |
insert.executeUpdate(); | |
System.out.println(msg); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment