@angryTit
Created February 14, 2019 15:24
KinesisConsumer
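// Imports assume the Neo4j Java driver 1.x (org.neo4j.driver.v1) and aws-lambda-java-events.
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import org.neo4j.driver.v1.AuthTokens;
import org.neo4j.driver.v1.Config;
import org.neo4j.driver.v1.Driver;
import org.neo4j.driver.v1.GraphDatabase;
import org.neo4j.driver.v1.Session;

import java.net.URI;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class KinesisConsumer {
    // Created once in the constructor and reused by every handler call on this instance.
    // userName, password, log, MAKE_SOMETHING_BASED_ON_EVENT and parameters are defined elsewhere.
    private final Driver driver;

    public KinesisConsumer() {
        // Retry a failed transaction for up to 10 seconds before giving up.
        Config config = Config.build().withMaxTransactionRetryTime(10, TimeUnit.SECONDS).toConfig();

        // Option 1: causal cluster
        List<URI> uris = ...
        driver = GraphDatabase.routingDriver(uris, AuthTokens.basic(userName, password), config);

        // Option 2: single instance. Use this INSTEAD of the routing driver above;
        // a final field can only be assigned once.
        // driver = GraphDatabase.driver("bolt://122.122.122.122:7687", AuthTokens.basic(userName, password), config);
    }

    public void handler(KinesisEvent event, Context context) {
        try (Session session = driver.session()) {
            long start = System.currentTimeMillis();
            for (KinesisEvent.KinesisEventRecord each : event.getRecords()) {
                // One managed write transaction per record; the driver retries it on transient errors.
                session.writeTransaction(tx -> {
                    tx.run(MAKE_SOMETHING_BASED_ON_EVENT, parameters);
                    return 1;
                });
            }
            log.info("execution time {} ms", System.currentTimeMillis() - start);
        } // the session is closed here; the driver stays open for the next invocation
    }
}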
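
The handler above passes `parameters` to `tx.run` without showing where they come from. A minimal sketch of one way to build them, assuming the record payload is UTF-8 text and that the Cypher statement refers to a `$payload` parameter. The class name `RecordParameters`, the method `fromPayload`, and the `payload` key are hypothetical and not part of the original gist:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class RecordParameters {
    // Hypothetical helper: turns the raw payload of one Kinesis record into
    // a parameter map for a Cypher statement that uses $payload.
    public static Map<String, Object> fromPayload(ByteBuffer data) {
        // Decode a read-only view so the caller's buffer position is not moved.
        String payload = StandardCharsets.UTF_8.decode(data.asReadOnlyBuffer()).toString();
        Map<String, Object> parameters = new HashMap<>();
        parameters.put("payload", payload);
        return parameters;
    }
}
```

Inside the for loop this could be called as `RecordParameters.fromPayload(each.getKinesis().getData())`, with the result passed to `tx.run` as its second argument.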