-
-
Save angryTit/565ba608b718005bfed0ac91d07407c2 to your computer and use it in GitHub Desktop.
KinesisConsumer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class KinesisConsumer { | |
private final Driver driver; | |
public KinesisConsumer() { | |
//in case of casual cluster | |
List<URI> uris = ... | |
driver = GraphDatabase.routingDriver(uris, AuthTokens.basic(userName, password), | |
Config.build().withMaxTransactionRetryTime(10, TimeUnit.SECONDS).toConfig()); | |
//in case of single instance | |
driver = GraphDatabase.driver("bolt://122.122.122.122:7687", AuthTokens.basic(userName, password), | |
Config.build().withMaxTransactionRetryTime(10, TimeUnit.SECONDS).toConfig()); | |
} | |
public void handler(KinesisEvent event, Context context) { | |
try (Session session = driver.session()) { | |
long start = System.currentTimeMillis(); | |
for (KinesisEvent.KinesisEventRecord each : event.getRecords()) { | |
session.writeTransaction(new TransactionWork<Integer>() { | |
@Override | |
public Integer execute(Transaction tx) { | |
tx.run(MAKE_SOMETHING_BASED_ON_EVENT, parameters); | |
return 1; | |
} | |
}); | |
}//for loop | |
log.info("execution time {}", System.currentTimeMillis() - start); | |
}//try session block | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment