Skip to content

Instantly share code, notes, and snippets.

@laclefyoshi
Created May 25, 2013 07:30
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save laclefyoshi/5648271 to your computer and use it in GitHub Desktop.
Save laclefyoshi/5648271 to your computer and use it in GitHub Desktop.
hosebird_kafka java code
package hosebird_kafka;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Constants;
import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import com.twitter.hbc.httpclient.BasicClient;
import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.OAuth1;
import kafka.javaapi.producer.Producer;
import kafka.producer.ProducerConfig;
import kafka.javaapi.producer.ProducerData;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* simple Hosebird client
*/
public class App {
public static void main(String[] args) {
Properties props = new Properties();
props.put("broker.list", "0:localhost:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
Producer<String, String> producer = new Producer(new ProducerConfig(props));
String topic = "twitter-sample";
String consumerKey = args[0];
String consumerSecret = args[1];
String token = args[2];
String secret = args[3];
BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10000);
StatusesSampleEndpoint endpoint = new StatusesSampleEndpoint();
endpoint.stallWarnings(false);
Authentication auth = new OAuth1(consumerKey, consumerSecret, token, secret);
BasicClient client = new ClientBuilder()
.name("sampleExampleClient")
.hosts(Constants.STREAM_HOST)
.endpoint(endpoint)
.authentication(auth)
.processor(new StringDelimitedProcessor(queue))
.build();
client.connect();
// Do whatever needs to be done with messages
for (int msgRead = 0; msgRead < 1000; msgRead++) {
if (client.isDone()) {
System.out.println("Client connection closed unexpectedly: " +
client.getExitEvent().getMessage());
break;
}
try {
String msg = queue.poll(5, TimeUnit.SECONDS);
if (msg == null) {
System.out.println("Did not receive a message in 5 seconds");
} else {
producer.send(new ProducerData<String, String>(topic, msg));
}
} catch (InterruptedException ie) {
ie.printStackTrace();
}
}
client.stop();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment