Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
package org.theplateisbad.consumer;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import com.google.common.collect.Lists;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Client;
import com.twitter.hbc.core.Constants;
import com.twitter.hbc.core.Hosts;
import com.twitter.hbc.core.HttpHosts;
import com.twitter.hbc.core.endpoint.Location;
import com.twitter.hbc.core.endpoint.Location.Coordinate;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import com.twitter.hbc.core.event.Event;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.OAuth1;
public class StreamConsumer {
static {
try {
Class.forName("org.postgresql.Driver");
} catch (ClassNotFoundException e) {
}
}
public static void main(String[] args) throws InterruptedException, SQLException {
Timer timer = new Timer();
final Connection con1 = DriverManager.getConnection("jdbc:postgresql://localhost/postgres?user=<user>&password=<password>");
final Connection con2 = DriverManager.getConnection("jdbc:postgresql://localhost/postgres?user=<user>&password=<password>");
final PreparedStatement insert = con1.prepareStatement("INSERT INTO tstest.tstream (tevent) VALUES (?::jsonb)", ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE);
final Statement delete = con2.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE);
TimerTask task = new TimerTask() {
@Override
public void run() {
try {
delete.executeUpdate("DELETE FROM tstest.tstream WHERE rec < clock_timestamp() - interval '1 HOUR'");
//delete.execute("VACUUM ANALYZE tstest.tstream");
System.out.println("60 Minute cutoff");
} catch (SQLException e) {
}
}
};
/** Set up your blocking queues: Be sure to size these properly based on expected TPS of your stream */
BlockingQueue<String> msgQueue = new LinkedBlockingQueue<String>(10000);
BlockingQueue<Event> eventQueue = new LinkedBlockingQueue<Event>(1000);
/** Declare the host you want to connect to, the endpoint, and authentication (basic auth or oauth) */
Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint();
// Optional: set up some followings and track terms
//List<Long> followings = Lists.newArrayList(1234L, 566788L);
//List<String> terms = Lists.newArrayList("Germany","Deutschland");
List<Location> locations = Lists.newArrayList(new Location(new Coordinate(6.7222,51.2254), new Coordinate(8.5344,51.5064)));
//hosebirdEndpoint.followings(followings);
//hosebirdEndpoint.trackTerms(terms);
hosebirdEndpoint.locations(locations);
// These secrets should be read from a config file
Authentication hosebirdAuth = new OAuth1("CONSUMER_KEY",
"CONSUMER_SECRET",
"ACCESS_TOKEN_KEY",
"ACCESS_TOKEN_SECRET");
ClientBuilder builder = new ClientBuilder()
.name("Hosebird-Client-01") // optional: mainly for the logs
.hosts(hosebirdHosts)
.authentication(hosebirdAuth)
.endpoint(hosebirdEndpoint)
.processor(new StringDelimitedProcessor(msgQueue))
.eventMessageQueue(eventQueue); // optional: use this if you want to process client events
Client hosebirdClient = builder.build();
// Attempts to establish a connection.
hosebirdClient.connect();
timer.scheduleAtFixedRate(task, 0, 300000);
while (!hosebirdClient.isDone()) {
String msg = msgQueue.take();
insert.setString(1,msg);
insert.executeUpdate();
System.out.println(msg);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.