@proclamo-zz
Created October 13, 2016 08:19
Kinesis consumer java

The KCL (Kinesis Client Library) asks you to implement a factory of Consumers, which it then uses to create as many record processors as it needs (one for each shard the worker is assigned). The Consumer has to implement the IRecordProcessor interface, which requires three methods:

  1. public void initialize(InitializationInput ii): here you can get the shard ID, among other data.

  2. public void processRecords(ProcessRecordsInput pri): here you receive the batch of records and process them. From here you could hand the data off to Spark, for example. IMPORTANT: after processing the batch, you have to tell Kinesis to advance the checkpoint: checkpoint(pri.getCheckpointer());

  3. public void shutdown(ShutdownInput si): for now I only update the checkpoint here, but you could also close any files that are still open, etc.
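To make the checkpointing rule in item 2 concrete, here is a minimal, library-free sketch of why the checkpoint is taken only after the batch has been processed. `CheckpointModel`, `consume`, and the in-memory list are hypothetical stand-ins, not KCL types; the only KCL behavior assumed is that a restarted processor resumes from the last checkpoint.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of a shard with a checkpoint; none of these names are KCL types. */
public class CheckpointModel {
    private final List<String> shard;   // records in arrival order
    private int checkpoint = 0;         // index of the first record not yet checkpointed

    public CheckpointModel(List<String> shard) {
        this.shard = shard;
    }

    /**
     * Processes one batch starting at the checkpoint. If failAt >= 0, processing
     * "crashes" before handling the record at that index and no checkpoint is taken.
     * Returns the records that were actually processed in this call.
     */
    public List<String> consume(int batchSize, int failAt) {
        List<String> processed = new ArrayList<>();
        int end = Math.min(checkpoint + batchSize, shard.size());
        for (int i = checkpoint; i < end; i++) {
            if (i == failAt) {
                return processed;       // crash: the checkpoint stays where it was
            }
            processed.add(shard.get(i));
        }
        checkpoint = end;               // checkpoint only after the whole batch succeeded
        return processed;
    }

    public int getCheckpoint() {
        return checkpoint;
    }

    public static void main(String[] args) {
        CheckpointModel model = new CheckpointModel(Arrays.asList("a", "b", "c", "d"));
        System.out.println(model.consume(3, 2));   // prints [a, b], then "crashes" before c
        System.out.println(model.consume(3, -1));  // prints [a, b, c]: a and b are replayed
        System.out.println(model.getCheckpoint()); // prints 3
    }
}
```

In this model the records `a` and `b` are handled twice, which is the at-least-once behavior you get when checkpointing after processing, so whatever consumes the output should tolerate duplicates.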

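// Imports assume KCL 1.x (amazon-kinesis-client), Gson, and Apache Commons Logging.
// Logger, FileWritter, RawWritter, CSVWritter, Event, and FileWritterProperties are
// project classes that are not part of this gist.
import java.nio.charset.StandardCharsets;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.model.Record;
import com.google.gson.Gson;

public class Consumer implements IRecordProcessor {

    private static final Log log = LogFactory.getLog(Consumer.class);
    private static final long BACKOFF_TIME = 1000; // milliseconds

    private final Gson gson;
    private final FileWritterProperties fileWritterProperties;
    private Logger logger;     // parsed events, written as CSV
    private Logger rawLogger;  // raw JSON payloads
    private String shardId;

    public Consumer(FileWritterProperties fileWritterProperties, Gson gson) {
        log.info("New Consumer");
        this.fileWritterProperties = fileWritterProperties;
        this.gson = gson;
    }

    // Called once per shard, before any records are delivered.
    @Override
    public void initialize(InitializationInput ii) {
        shardId = ii.getShardId();
        String fileName = "track-" + shardId + "-" + ii.getExtendedSequenceNumber().getSubSequenceNumber();
        logger = FileWritter.getLogger(fileWritterProperties.getPath(), fileName);
        log.info("Created file " + fileWritterProperties.getPath() + "/" + fileName);
        rawLogger = RawWritter.getLogger(fileWritterProperties.getPath(), fileName);
        log.info("Created file " + fileWritterProperties.getPath() + "/raw/" + fileName);
    }

    // Called with each batch of records read from the shard.
    @Override
    public void processRecords(ProcessRecordsInput pri) {
        List<Record> records = pri.getRecords();
        log.info("Received " + records.size() + " records");
        for (Record record : records) {
            // Decode only the readable bytes; array() can expose the whole backing
            // array and is unsupported on read-only buffers.
            String json = StandardCharsets.UTF_8.decode(record.getData().asReadOnlyBuffer()).toString();
            log.info("Message received: " + json);
            rawLogger.info(json);
            try {
                Event event = gson.fromJson(json, Event.class);
                logger.info(CSVWritter.print(event));
            } catch (Exception ex) {
                // A malformed record is logged and skipped so it does not block the batch.
                log.error("Could not parse record", ex);
            }
        }
        // Record progress only after the whole batch has been handled.
        checkpoint(pri.getCheckpointer());
    }

    @Override
    public void shutdown(ShutdownInput si) {
        log.info("Shutdown record processor for shard " + shardId);
        // TERMINATE means the shard has ended, so its final position must be checkpointed.
        if (si.getShutdownReason() == ShutdownReason.TERMINATE) {
            checkpoint(si.getCheckpointer());
        }
    }

    private void checkpoint(IRecordProcessorCheckpointer checkpointer) {
        try {
            checkpointer.checkpoint();
            return; // success: no need to back off
        } catch (KinesisClientLibDependencyException | InvalidStateException | ThrottlingException | ShutdownException ex) {
            log.error("Checkpoint failed for shard " + shardId, ex);
        }
        // Back off after a failed checkpoint before the next batch is processed.
        try {
            Thread.sleep(BACKOFF_TIME);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
        }
    }
}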
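The checkpoint helper above makes a single attempt. A common refinement is to retry a few times and back off between attempts when the failure looks transient. The sketch below shows that pattern in plain Java so that it runs without the KCL on the classpath. `RetryingCheckpoint`, `Action`, `TransientException`, and the attempt count are assumptions made for illustration; in the Consumer, the action would be `checkpointer.checkpoint()` and the transient case would most likely be `ThrottlingException`.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Retry-with-backoff sketch; Action and TransientException are hypothetical stand-ins. */
public class RetryingCheckpoint {

    /** Stand-in for a failure worth retrying (for example, throttling). */
    public static class TransientException extends Exception {
        public TransientException(String message) {
            super(message);
        }
    }

    /** Stand-in for the call that records progress. */
    public interface Action {
        void run() throws TransientException;
    }

    /**
     * Runs the action up to maxAttempts times, sleeping backoffMillis between
     * failed attempts. Returns the number of attempts used, or -1 if every
     * attempt failed or the thread was interrupted while backing off.
     */
    public static int runWithRetry(Action action, int maxAttempts, long backoffMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                action.run();
                return attempt;                      // success: no sleep needed
            } catch (TransientException ex) {
                if (attempt == maxAttempts) {
                    break;                           // out of attempts: give up
                }
            }
            try {
                Thread.sleep(backoffMillis);         // back off before the next attempt
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();  // preserve the interrupt for the caller
                return -1;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Fails twice, then succeeds on the third call.
        int attempts = runWithRetry(() -> {
            if (calls.incrementAndGet() < 3) {
                throw new TransientException("throttled");
            }
        }, 5, 10L);
        System.out.println("Succeeded after " + attempts + " attempts"); // prints "Succeeded after 3 attempts"
    }
}
```

Non-retryable failures (a shutdown in progress, an invalid lease state) would normally skip the retry loop altogether; the sketch leaves that distinction out to stay short.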
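import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

// GsonConfigurator and FileWritterProperties are project classes not shown in this gist.
public class ConsumerFactory implements IRecordProcessorFactory {

    private final FileWritterProperties fileWritterProperties;
    private final GsonConfigurator gson;

    public ConsumerFactory(FileWritterProperties fileWritterProperties, GsonConfigurator gson) {
        this.fileWritterProperties = fileWritterProperties;
        this.gson = gson;
    }

    // The KCL calls this whenever it needs a processor for a shard, so return a new Consumer each time.
    @Override
    public IRecordProcessor createProcessor() {
        return new Consumer(fileWritterProperties, gson.createGson());
    }
}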
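The reason the factory returns a new Consumer on every call is that each processor keeps per-shard state (the shard ID and its output files). The following library-free sketch models that relationship. `FactoryModel`, `Processor`, `ProcessorFactory`, and `assign` are hypothetical names standing in for the worker, `IRecordProcessor`, and `IRecordProcessorFactory`; it is not how the KCL is implemented internally.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy model of how a worker uses a processor factory; not KCL code. */
public class FactoryModel {

    /** Stand-in for a record processor: holds per-shard state. */
    public static class Processor {
        private String shardId;
        private int recordsSeen = 0;

        public void initialize(String shardId) {
            this.shardId = shardId;
        }

        public void process(int recordCount) {
            recordsSeen += recordCount;
        }

        public String getShardId() {
            return shardId;
        }

        public int getRecordsSeen() {
            return recordsSeen;
        }
    }

    /** Stand-in for the processor factory. */
    public interface ProcessorFactory {
        Processor createProcessor();
    }

    /** Asks the factory for one fresh processor per shard, as a worker would. */
    public static Map<String, Processor> assign(ProcessorFactory factory, String... shardIds) {
        Map<String, Processor> processors = new LinkedHashMap<>();
        for (String shardId : shardIds) {
            Processor processor = factory.createProcessor();
            processor.initialize(shardId);
            processors.put(shardId, processor);
        }
        return processors;
    }

    public static void main(String[] args) {
        Map<String, Processor> processors = assign(Processor::new, "shard-0", "shard-1");
        processors.get("shard-0").process(5);
        // Each shard has its own instance, so counts do not leak between shards.
        System.out.println(processors.get("shard-0").getRecordsSeen()); // prints 5
        System.out.println(processors.get("shard-1").getRecordsSeen()); // prints 0
    }
}
```

A factory that handed out one shared instance would mix the state of different shards, which is why `createProcessor()` builds a new object each time.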
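// credentials, profile, streamName, fileWritterProperties, and gson come from the surrounding application (not shown).
String workerId = String.valueOf(UUID.randomUUID());
AWSCredentialsProvider credentialsProvider = credentials.profileCredentialsProvider();

// The first constructor argument is the KCL application name.
KinesisClientLibConfiguration config =
        new KinesisClientLibConfiguration(profile, streamName, credentialsProvider, workerId)
                // Start from the oldest available record when no checkpoint exists yet.
                .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);

ConsumerFactory factory = new ConsumerFactory(fileWritterProperties, gson);
Worker worker = new Worker.Builder()
        .recordProcessorFactory(factory)
        .config(config)
        .build();

// Start consuming; run() blocks until the worker shuts down.
worker.run();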
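In KCL 1.x, `Worker` implements `Runnable` and its `run()` blocks until the worker is shut down, so applications that need the calling thread for something else usually start it on a dedicated thread. The sketch below shows that arrangement with a stand-in class so it runs without the KCL: `WorkerRunner`, `LoopingWorker`, and `runFor` are hypothetical, and the real `Worker` would take the place of `LoopingWorker`.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch of running a blocking worker on its own thread; LoopingWorker is a stand-in. */
public class WorkerRunner {

    /** Hypothetical stand-in for a worker: run() blocks until shutdown() is called. */
    public static class LoopingWorker implements Runnable {
        private final AtomicBoolean running = new AtomicBoolean(true);
        private final AtomicInteger iterations = new AtomicInteger();

        @Override
        public void run() {
            while (running.get()) {
                iterations.incrementAndGet();   // a real worker would poll its shards here
                try {
                    Thread.sleep(5);
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        public void shutdown() {
            running.set(false);
        }

        public int getIterations() {
            return iterations.get();
        }
    }

    /** Starts the worker, lets it run for runMillis, then stops it and waits for the thread. */
    public static boolean runFor(LoopingWorker worker, long runMillis) {
        Thread thread = new Thread(worker, "kinesis-worker");
        thread.start();
        try {
            Thread.sleep(runMillis);
            worker.shutdown();
            thread.join(1000);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            worker.shutdown();
        }
        return !thread.isAlive();               // true if the worker thread has finished
    }

    public static void main(String[] args) {
        LoopingWorker worker = new LoopingWorker();
        System.out.println("stopped cleanly: " + runFor(worker, 50));
    }
}
```

In a long-running service the shutdown request would typically come from a JVM shutdown hook or a signal handler instead of a timer.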