Skip to content

Instantly share code, notes, and snippets.

@enryold
Last active September 8, 2021 06:48
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save enryold/4be63342c8dd3d2cdd772178dd5b2b45 to your computer and use it in GitHub Desktop.
Save enryold/4be63342c8dd3d2cdd772178dd5b2b45 to your computer and use it in GitHub Desktop.
Java classes for Kinesis Firehose record transformation lambda
import com.amazonaws.protocol.json.JsonClientMetadata;
import com.amazonaws.protocol.json.SdkJsonProtocolFactory;
import com.amazonaws.protocol.json.StructuredJsonGenerator;
import com.amazonaws.services.kinesisfirehose.model.Record;
import com.amazonaws.services.kinesisfirehose.model.transform.RecordJsonMarshaller;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import java.nio.ByteBuffer;
import java.util.function.Function;
/**
 * Helper function that encodes data the same way AWSKinesisFirehoseClient does.
 *
 * <p>The bytes are wrapped in a Firehose {@link Record}, marshalled to JSON with the SDK's own
 * {@link RecordJsonMarshaller}, and the string value of the resulting {@code Record.Data} field
 * is read back out of that JSON document.
 */
public class FnEncodeBytesAsFirehose implements Function<ByteBuffer, String> {

    /**
     * Encodes the given payload through the SDK's record marshaller.
     *
     * @param byteBuffer the raw payload to place in the record's data field
     * @return the string the marshaller wrote for the {@code Data} field
     */
    @Override
    public String apply(ByteBuffer byteBuffer) {
        Record record = new Record();
        record.setData(byteBuffer);

        // CBOR is switched off so the generator emits a textual JSON document.
        final StructuredJsonGenerator jsonGenerator = new SdkJsonProtocolFactory(new JsonClientMetadata()
                .withProtocolVersion("1.1")
                .withSupportsCbor(false))
                .createGenerator();

        // Produces {"Record": { ...marshalled record... }}.
        jsonGenerator.writeStartObject();
        jsonGenerator.writeFieldName("Record");
        RecordJsonMarshaller.getInstance().marshall(record, jsonGenerator);
        jsonGenerator.writeEndObject();

        // Decode with an explicit charset rather than the platform default, so the result
        // does not depend on the JVM's file.encoding setting.
        String json = new String(jsonGenerator.getBytes(), java.nio.charset.StandardCharsets.UTF_8);

        JsonParser parser = new JsonParser();
        JsonObject root = parser.parse(json).getAsJsonObject();
        return root.get("Record").getAsJsonObject().get("Data").getAsString();
    }
}
import java.util.List;
/**
 * Input event for a Kinesis Firehose record transformation: invocation metadata plus the
 * list of records to transform.
 */
public class KinesisFirehoseEvent {

    /** Identifier of this invocation; defaults to the empty string. */
    String invocationId = "";

    /** ARN of the delivery stream that sent the event; defaults to the empty string. */
    String deliveryStreamArn = "";

    /** AWS region of the delivery stream; defaults to the empty string. */
    String region = "";

    /** Records to transform; {@code null} until {@link #setRecords(List)} is called. */
    List<KinesisFirehoseEventRecord> records;

    /** @return the invocation identifier */
    public String getInvocationId() {
        return this.invocationId;
    }

    /** @param invocationId the invocation identifier to store */
    public void setInvocationId(String invocationId) {
        this.invocationId = invocationId;
    }

    /** @return the ARN of the sending delivery stream */
    public String getDeliveryStreamArn() {
        return this.deliveryStreamArn;
    }

    /** @param deliveryStreamArn the delivery stream ARN to store */
    public void setDeliveryStreamArn(String deliveryStreamArn) {
        this.deliveryStreamArn = deliveryStreamArn;
    }

    /** @return the AWS region of the delivery stream */
    public String getRegion() {
        return this.region;
    }

    /** @param region the AWS region to store */
    public void setRegion(String region) {
        this.region = region;
    }

    /** @return the records to transform, exactly as last set (may be {@code null}) */
    public List<KinesisFirehoseEventRecord> getRecords() {
        return this.records;
    }

    /** @param records the records to transform */
    public void setRecords(List<KinesisFirehoseEventRecord> records) {
        this.records = records;
    }
}
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
/**
 * A single record handed to the transformation Lambda inside a {@code KinesisFirehoseEvent}.
 */
public class KinesisFirehoseEventRecord {

    /**
     * Record ID passed from Firehose to Lambda during the invocation. The transformed record
     * must carry the same ID; a mismatch between the original and the transformed record ID
     * is treated as a data transformation failure.
     */
    String recordId = "";

    /** Approximate time the record was sent to Kinesis Firehose, as a Unix epoch. */
    Long approximateArrivalEpoch;

    /** Record payload; it is sent to the Lambda function base64 encoded. */
    String data;

    /** Creates an empty record. */
    public KinesisFirehoseEventRecord() {
    }

    /** @return the record ID */
    public String getRecordId() {
        return this.recordId;
    }

    /** @param recordId the record ID to store */
    public void setRecordId(String recordId) {
        this.recordId = recordId;
    }

    /** @return the approximate arrival time as a Unix epoch */
    public Long getApproximateArrivalEpoch() {
        return this.approximateArrivalEpoch;
    }

    /** @param approximateArrivalEpoch the approximate arrival time as a Unix epoch */
    public void setApproximateArrivalEpoch(Long approximateArrivalEpoch) {
        this.approximateArrivalEpoch = approximateArrivalEpoch;
    }

    /** @return the payload string exactly as stored (no decoding applied) */
    public String getData() {
        return this.data;
    }

    /** @param data the payload string to store as-is (no encoding applied) */
    public void setData(String data) {
        this.data = data;
    }

    /**
     * Encodes the given bytes with {@link FnEncodeBytesAsFirehose} and stores the result
     * as this record's data.
     *
     * @param byteBuffer the raw payload to encode
     */
    public void setEncodedData(ByteBuffer byteBuffer) {
        FnEncodeBytesAsFirehose encoder = new FnEncodeBytesAsFirehose();
        this.data = encoder.apply(byteBuffer);
    }

    /**
     * Base64 decodes the data.
     *
     * @return the decoded payload interpreted as UTF-8 text
     */
    public String decodedData() {
        byte[] rawBytes = Base64.getDecoder().decode(this.data);
        return new String(rawBytes, StandardCharsets.UTF_8);
    }
}
import java.util.List;
/**
 * Response of a Kinesis Firehose record transformation: the list of transformed records,
 * plus the string constants used for a record's transformation result.
 */
public class KinesisFirehoseResponse {

    /** Result value for a record that was transformed successfully. */
    public static final String TRANSFORMED_STATE_OK = "Ok";

    /** Result value for a record that was dropped intentionally. */
    public static final String TRANSFORMED_STATE_DROPPED = "Dropped";

    /** Result value for a record that could not be transformed. */
    public static final String TRANSFORMED_STATE_PROCESSINGFAILED = "ProcessingFailed";

    /** The transformed records; {@code null} until assigned. */
    public List<KinesisFirehoseResponseRecord> records;
}
import java.io.UnsupportedEncodingException;
import java.util.Base64;
/**
 * A single transformed record returned to Firehose inside a {@code KinesisFirehoseResponse}.
 */
public class KinesisFirehoseResponseRecord {

    public KinesisFirehoseResponseRecord() {}

    /**
     * The record ID is passed from Firehose to Lambda during the invocation. The transformed
     * record must contain the same record ID. Any mismatch between the ID of the original
     * record and the ID of the transformed record is treated as a data transformation failure.
     */
    String recordId = "";

    /**
     * The status of the data transformation of the record. If a record has a status of "Ok"
     * or "Dropped", Firehose considers it successfully processed. Otherwise, Firehose
     * considers it unsuccessfully processed.
     *
     * <p>Possible values:
     * <ul>
     *   <li>Ok - the record was transformed successfully</li>
     *   <li>Dropped - the record was dropped intentionally by your processing logic</li>
     *   <li>ProcessingFailed - the record could not be transformed</li>
     * </ul>
     */
    String result = KinesisFirehoseResponse.TRANSFORMED_STATE_OK;

    /** The transformed data payload, after base64-encoding. */
    String data;

    /** @return the record ID */
    public String getRecordId() {
        return recordId;
    }

    /** @param recordId the record ID to store */
    public void setRecordId(String recordId) {
        this.recordId = recordId;
    }

    /** @return the transformation result string */
    public String getResult() {
        return result;
    }

    /** @param result the transformation result string to store */
    public void setResult(String result) {
        this.result = result;
    }

    /** @return the payload string exactly as stored */
    public String getData() {
        return data;
    }

    /** @param data the payload string to store as-is; no encoding is applied */
    public void setData(String data) {
        this.data = data;
    }

    /**
     * Base64 encodes the UTF-8 bytes of {@code unencodedData} and sets the data property.
     *
     * @param unencodedData the plain text payload
     */
    public void encodeAndSetData(String unencodedData) {
        // The Charset overload cannot throw UnsupportedEncodingException, so there is no
        // failure path that could be swallowed and leave data silently unset.
        encodeAndSetData(unencodedData.getBytes(java.nio.charset.StandardCharsets.UTF_8));
    }

    /**
     * Base64 encodes {@code unencodedData} and sets the data property.
     *
     * @param unencodedData the raw payload bytes
     */
    public void encodeAndSetData(byte[] unencodedData) {
        data = Base64.getEncoder().encodeToString(unencodedData);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment