Skip to content

Instantly share code, notes, and snippets.

@dehora
Created November 11, 2016 14:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dehora/fd777139c90dbb57233f11d521575564 to your computer and use it in GitHub Desktop.
Save dehora/fd777139c90dbb57233f11d521575564 to your computer and use it in GitHub Desktop.
Example Streams
package spike;
import java.util.concurrent.TimeUnit;
import nakadi.Cursor;
import nakadi.LoggingStreamObserverProvider;
import nakadi.LoggingStreamOffsetObserver;
import nakadi.NakadiClient;
import nakadi.Response;
import nakadi.StreamConfiguration;
import nakadi.StreamProcessor;
/**
 * Spike entry point: runs a health check against a Nakadi server at
 * {@code http://localhost:9080} and then starts a stream processor for the
 * {@code et-1} event type.
 */
class StreamsMain {

    /**
     * Builds the client, prints the health check result, then starts streaming.
     *
     * @param args command-line arguments; not read
     * @throws Exception propagated from {@link #streams(NakadiClient)}
     */
    public static void main(String[] args) throws Exception {
        final NakadiClient nakadi =
            NakadiClient.newBuilder().baseURI("http://localhost:9080").build();

        healthcheck(nakadi);
        streams(nakadi);
    }

    /**
     * Calls the server's health resource and prints the status line, the
     * headers and the response body to standard output, in that order.
     *
     * @param client the client used to reach the health resource
     */
    private static void healthcheck(NakadiClient client) {
        // NOTE(review): presumably healthcheckThrowing() raises on a failed check — confirm
        // against the client library; nothing is caught here either way.
        final Response health = client.resources().health().healthcheckThrowing();

        final String statusLine = health.statusCode() + " " + health.reason();
        System.out.println(statusLine);
        System.out.println(health.headers());
        System.out.println(health.responseBody().asString());
    }

    /**
     * Builds a stream processor with logging observers and starts it.
     *
     * <p>The processor is only started; this method never stops it.
     *
     * @param client the client whose resources supply the stream builder
     * @throws Exception declared for callers; nothing is caught locally
     */
    private static void streams(NakadiClient client) throws Exception {
        final StreamConfiguration configuration = et1Configuration();

        final StreamProcessor streamProcessor =
            client.resources()
                .streamBuilder()
                .streamConfiguration(configuration)
                .streamObserverFactory(new LoggingStreamObserverProvider())
                .streamOffsetObserver(new LoggingStreamOffsetObserver())
                .build();

        streamProcessor.start();
    }

    /**
     * Creates the stream settings used by this spike: event type {@code et-1},
     * a single cursor, a batch limit of 2 and a 3 second batch flush timeout.
     *
     * @return the populated stream configuration
     */
    private static StreamConfiguration et1Configuration() {
        // NOTE(review): the Cursor arguments look like (partition, offset) — confirm.
        final Cursor startCursor = new Cursor("0", "741");

        return new StreamConfiguration()
            .eventTypeName("et-1")
            .cursors(startCursor)
            .batchLimit(2)
            .batchFlushTimeout(3, TimeUnit.SECONDS);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment