Skip to content

Instantly share code, notes, and snippets.

@Bryji
Last active November 25, 2021 18:32
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save Bryji/2bb77d85421d2fb1ea3ccc3c5940ea2a to your computer and use it in GitHub Desktop.
Save Bryji/2bb77d85421d2fb1ea3ccc3c5940ea2a to your computer and use it in GitHub Desktop.
Kinesis Firehose transformation Lambda objects in Java (these do not seem to be provided by an AWS SDK for now!)
import java.util.List;
/**
 * Mutable bean describing one Kinesis Firehose transformation invocation:
 * an invocation id, the delivery stream ARN, the region, and the list of
 * input records to process.
 */
public class KinesisFirehoseEvent {

    String invocationId;
    String deliveryStreamArn;
    String region;
    List<KinesisFirehoseInputRecord> records;

    /** @return the invocation id, or {@code null} if it was never set */
    public String getInvocationId() {
        return this.invocationId;
    }

    /** @param invocationId the invocation id to store */
    public void setInvocationId(String invocationId) {
        this.invocationId = invocationId;
    }

    /** @return the delivery stream ARN, or {@code null} if it was never set */
    public String getDeliveryStreamArn() {
        return this.deliveryStreamArn;
    }

    /** @param deliveryStreamArn the delivery stream ARN to store */
    public void setDeliveryStreamArn(String deliveryStreamArn) {
        this.deliveryStreamArn = deliveryStreamArn;
    }

    /** @return the region, or {@code null} if it was never set */
    public String getRegion() {
        return this.region;
    }

    /** @param region the region to store */
    public void setRegion(String region) {
        this.region = region;
    }

    /** @return the stored record list itself (not a copy); may be {@code null} */
    public List<KinesisFirehoseInputRecord> getRecords() {
        return this.records;
    }

    /** @param records the record list to store; kept by reference */
    public void setRecords(List<KinesisFirehoseInputRecord> records) {
        this.records = records;
    }

    /**
     * Renders every field, including each record's own string form.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder();
        text.append("KinesisFirehoseEvent [invocationId=").append(invocationId);
        text.append(", deliveryStreamArn=").append(deliveryStreamArn);
        text.append(", region=").append(region);
        text.append(", records=").append(records);
        text.append("]");
        return text.toString();
    }
}
import java.util.Base64;
/**
 * Mutable bean for a single input record of a Kinesis Firehose
 * transformation event. The payload is held in {@code data} as a
 * Base64-encoded string; {@link #getDataDecoded()} returns it as text.
 */
public class KinesisFirehoseInputRecord {

    String recordId;
    Long approximateArrivalTimestamp;
    String data;

    /** @return the record id, or {@code null} if it was never set */
    public String getRecordId() {
        return recordId;
    }

    /** @param recordId the record id to store */
    public void setRecordId(String recordId) {
        this.recordId = recordId;
    }

    /** @return the approximate arrival timestamp, or {@code null} if it was never set */
    public Long getApproximateArrivalTimestamp() {
        return approximateArrivalTimestamp;
    }

    /** @param approximateArrivalTimestamp the timestamp to store */
    public void setApproximateArrivalTimestamp(Long approximateArrivalTimestamp) {
        this.approximateArrivalTimestamp = approximateArrivalTimestamp;
    }

    /** @return the payload exactly as stored (still Base64-encoded), or {@code null} */
    public String getData() {
        return data;
    }

    /**
     * Decodes the Base64 payload and interprets the bytes as UTF-8 text.
     *
     * @return the decoded text, or {@code null} when no payload is set
     * @throws IllegalArgumentException if the payload is not valid Base64
     */
    public String getDataDecoded() {
        if (data == null) {
            return null;
        }
        byte[] raw = Base64.getDecoder().decode(data);
        // Explicit charset: the single-argument String constructor would use the
        // platform default, making the result depend on the host configuration.
        return new String(raw, java.nio.charset.StandardCharsets.UTF_8);
    }

    /** @param data the Base64-encoded payload to store */
    public void setData(String data) {
        this.data = data;
    }

    /**
     * Renders the record with its payload decoded for readability. Never throws:
     * a missing payload prints as {@code null} and a payload that is not valid
     * Base64 is printed as stored.
     */
    @Override
    public String toString() {
        return "KinesisFirehoseRecord [recordId=" + recordId + ", approximateArrivalTimestamp="
                + approximateArrivalTimestamp + ", data=" + printableData() + "]";
    }

    /** Returns the decoded payload, falling back to the stored string when decoding fails. */
    private String printableData() {
        try {
            return getDataDecoded();
        } catch (IllegalArgumentException notBase64) {
            // toString() is used for logging; a malformed payload must not abort it.
            return data;
        }
    }
}
/**
 * Mutable bean for a single record returned from a Kinesis Firehose
 * transformation: the record id, the payload string, and a result status.
 */
public class KinesisFirehoseOutputRecord {

    String recordId;
    String data;
    // NOTE(review): the handler in this gist sets "Ok"; confirm the other
    // accepted status values against the Firehose documentation.
    String result;

    /** @return the record id, or {@code null} if it was never set */
    public String getRecordId() {
        return recordId;
    }

    /** @param recordId the record id to store */
    public void setRecordId(String recordId) {
        this.recordId = recordId;
    }

    /** @return the payload exactly as stored, or {@code null} if it was never set */
    public String getData() {
        return data;
    }

    /** @param data the payload to store */
    public void setData(String data) {
        this.data = data;
    }

    /** @return the result status, or {@code null} if it was never set */
    public String getResult() {
        return result;
    }

    /** @param result the result status to store */
    public void setResult(String result) {
        this.result = result;
    }

    /**
     * Renders all three fields, labelled with this class's name.
     */
    @Override
    public String toString() {
        // Label previously read "KinesisFirehoseOuputRecord" (missing 't').
        return "KinesisFirehoseOutputRecord [recordId=" + recordId + ", data=" + data + ", result=" + result + "]";
    }
}
import java.util.List;
/**
 * Response wrapper holding the list of output records produced by a
 * transformation.
 */
public class KinesisFirehoseResponse {

    List<KinesisFirehoseOutputRecord> records;

    /**
     * Creates a response around the given list.
     *
     * @param results the output records; kept by reference, not copied
     */
    public KinesisFirehoseResponse(List<KinesisFirehoseOutputRecord> results) {
        records = results;
    }

    /** @param records the output records to store; kept by reference */
    public void setRecords(List<KinesisFirehoseOutputRecord> records) {
        this.records = records;
    }

    /** @return the stored record list itself (not a copy) */
    public List<KinesisFirehoseOutputRecord> getRecords() {
        return this.records;
    }
}
import java.util.List;
import java.util.stream.Collectors;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
/**
 * Lambda request handler that logs the incoming event and returns one output
 * record per input record, each carrying the input's id and payload unchanged
 * with result {@code "Ok"}.
 */
public class KinesisFirehoseTransformer implements RequestHandler<KinesisFirehoseEvent, KinesisFirehoseResponse> {

    /**
     * Logs the event, maps every input record to an output record, and wraps
     * the outputs in a response.
     *
     * @param event   the incoming event whose records are transformed
     * @param context the Lambda context; only its logger is used
     * @return a response containing one output record per input record, in order
     */
    @Override
    public KinesisFirehoseResponse handleRequest(KinesisFirehoseEvent event, Context context) {
        context.getLogger().log("Input: " + event);

        List<KinesisFirehoseOutputRecord> results = event.getRecords()
                .stream()
                .map(KinesisFirehoseTransformer::passThrough)
                .collect(Collectors.toList());

        return new KinesisFirehoseResponse(results);
    }

    /** Builds the output record for one input record: same id, same payload, result "Ok". */
    private static KinesisFirehoseOutputRecord passThrough(KinesisFirehoseInputRecord source) {
        KinesisFirehoseOutputRecord target = new KinesisFirehoseOutputRecord();
        target.setRecordId(source.getRecordId());
        target.setData(source.getData()); // do more interesting transformation stuff here
        target.setResult("Ok");
        return target;
    }
}
@wasserholz
Copy link

@nburana
Copy link

nburana commented Mar 29, 2021

Super helpful, thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment