Created
May 25, 2013 07:30
-
-
Save laclefyoshi/5648271 to your computer and use it in GitHub Desktop.
hosebird_kafka java code
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package hosebird_kafka; | |
import com.twitter.hbc.ClientBuilder; | |
import com.twitter.hbc.core.Constants; | |
import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint; | |
import com.twitter.hbc.core.processor.StringDelimitedProcessor; | |
import com.twitter.hbc.httpclient.BasicClient; | |
import com.twitter.hbc.httpclient.auth.Authentication; | |
import com.twitter.hbc.httpclient.auth.OAuth1; | |
import kafka.javaapi.producer.Producer; | |
import kafka.producer.ProducerConfig; | |
import kafka.javaapi.producer.ProducerData; | |
import java.util.Properties; | |
import java.util.concurrent.BlockingQueue; | |
import java.util.concurrent.LinkedBlockingQueue; | |
import java.util.concurrent.TimeUnit; | |
/** | |
* simple Hosebird client | |
*/ | |
public class App { | |
public static void main(String[] args) { | |
Properties props = new Properties(); | |
props.put("broker.list", "0:localhost:9092"); | |
props.put("serializer.class", "kafka.serializer.StringEncoder"); | |
Producer<String, String> producer = new Producer(new ProducerConfig(props)); | |
String topic = "twitter-sample"; | |
String consumerKey = args[0]; | |
String consumerSecret = args[1]; | |
String token = args[2]; | |
String secret = args[3]; | |
BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10000); | |
StatusesSampleEndpoint endpoint = new StatusesSampleEndpoint(); | |
endpoint.stallWarnings(false); | |
Authentication auth = new OAuth1(consumerKey, consumerSecret, token, secret); | |
BasicClient client = new ClientBuilder() | |
.name("sampleExampleClient") | |
.hosts(Constants.STREAM_HOST) | |
.endpoint(endpoint) | |
.authentication(auth) | |
.processor(new StringDelimitedProcessor(queue)) | |
.build(); | |
client.connect(); | |
// Do whatever needs to be done with messages | |
for (int msgRead = 0; msgRead < 1000; msgRead++) { | |
if (client.isDone()) { | |
System.out.println("Client connection closed unexpectedly: " + | |
client.getExitEvent().getMessage()); | |
break; | |
} | |
try { | |
String msg = queue.poll(5, TimeUnit.SECONDS); | |
if (msg == null) { | |
System.out.println("Did not receive a message in 5 seconds"); | |
} else { | |
producer.send(new ProducerData<String, String>(topic, msg)); | |
} | |
} catch (InterruptedException ie) { | |
ie.printStackTrace(); | |
} | |
} | |
client.stop(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment