Last active
September 5, 2018 16:56
-
-
Save plinyar/e854c9a2279911406f7e3f23c84fc476 to your computer and use it in GitHub Desktop.
Sample AWS Kinesis Firehose lambda transformation in Java (Kotlin indeed)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.amazonaws.services.lambda.runtime.Context | |
import com.amazonaws.services.lambda.runtime.LambdaLogger | |
class FirehoseTransformer { | |
private lateinit var logger: LambdaLogger | |
fun handler(event: KinesisFirehoseEvent, context: Context): KinesisFirehoseResponse { | |
logger = context.getLogger() | |
logger.log("Lambda started. Got messages ${event.records.size}.") | |
val response = KinesisFirehoseResponse() | |
for (rec in event.records) { | |
val srcData = rec.decodedData() | |
logger.log("Got message ${srcData}") | |
//now deserialize you srcData from underlying format (e.g. JSON) and transform it to trgData | |
val trgData = srcData | |
val transformedRecord = KinesisFirehoseResponse.FirehoseRecord() | |
transformedRecord.recordId = rec.recordId | |
transformedRecord.result = KinesisFirehoseResponse.TRANSFORMED_STATE_OK | |
transformedRecord.encodeAndSetData(trgData) | |
response.records.add(transformedRecord) | |
} | |
return response | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.nio.charset.StandardCharsets | |
import java.util.* | |
/** | |
* This class represents the input event from Amazon Kinesis Firehose. It used as the input parameter | |
* for Lambda functions. | |
**/ | |
class KinesisFirehoseEvent | |
{ | |
/** | |
* The Id of the invocation. | |
**/ | |
var invocationId : String = "" | |
/** | |
* The ARN of the delivery stream sending the event. | |
**/ | |
var deliveryStreamArn = "" | |
/** | |
* The AWS region for delivery stream. | |
**/ | |
var region = "" | |
/** | |
* The Kinesis records to transform. | |
**/ | |
lateinit var records : List<FirehoseRecord> | |
/** | |
* The records for the Kinesis Firehose event to process and transform. | |
**/ | |
public class FirehoseRecord | |
{ | |
/** | |
*The record ID is passed from Firehose to Lambda during the invocation. The transformed record must | |
*contain the same record ID. Any mismatch between the ID of the original record and the ID of the | |
*transformed record is treated as a data transformation failure. | |
**/ | |
var recordId = "" | |
/** | |
* The approximate time the record was sent to Kinesis Firehose as a Unix epoch. | |
**/ | |
var approximateArrivalEpoch : Long? = null | |
// /** | |
// * The approximate time the record was sent to Kinesis Firehose. | |
// **/ | |
// [IgnoreDataMember] | |
// public DateTime ApproximateArrivalTimestamp | |
// { | |
// get | |
// { | |
// var epoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc); | |
// return epoch.AddMilliseconds(ApproximateArrivalEpoch); | |
// } | |
// } | |
/** | |
* The data sent through as a Kinesis Firehose record. The data is sent to the Lambda function base64 encoded. | |
**/ | |
var data : String? = null | |
/** | |
* Base64 decodes the data. | |
**/ | |
fun decodedData() = String(Base64.getDecoder().decode(data), StandardCharsets.UTF_8); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.nio.charset.StandardCharsets | |
import java.util.* | |
class KinesisFirehoseResponse { | |
companion object { | |
/** | |
* The record was transformed successfully. | |
**/ | |
val TRANSFORMED_STATE_OK = "Ok"; | |
/** | |
* The record was dropped intentionally by your processing logic. | |
**/ | |
val TRANSFORMED_STATE_DROPPED = "Dropped"; | |
/** | |
* The record could not be transformed. | |
**/ | |
val TRANSFORMED_STATE_PROCESSINGFAILED = "ProcessingFailed"; | |
} | |
/** | |
* The transformed records from the KinesisFirehoseEvent. | |
**/ | |
var records = mutableListOf<FirehoseRecord>() | |
/** | |
* The transformed records after processing KinesisFirehoseEvent.Records | |
**/ | |
public class FirehoseRecord | |
{ | |
/** | |
*The record ID is passed from Firehose to Lambda during the invocation. The transformed record must | |
*contain the same record ID. Any mismatch between the ID of the original record and the ID of the | |
*transformed record is treated as a data transformation failure. | |
**/ | |
var recordId = "" | |
/** | |
* The status of the data transformation of the record. The possible values are: "Ok" | |
* (the record was transformed successfully), "Dropped" (the record was dropped intentionally | |
* by your processing logic), and "ProcessingFailed" (the record could not be transformed). | |
* If a record has a status of "Ok" or "Dropped", Firehose considers it successfully | |
* processed. Otherwise, Firehose considers it unsuccessfully processed. | |
* | |
* Possible values: | |
* * Ok - The record was transformed successfully | |
* * Dropped- The record was dropped intentionally by your processing logic | |
* * ProcessingFailed - The record could not be transformed | |
**/ | |
var result = TRANSFORMED_STATE_OK | |
/** | |
* The transformed data payload, after base64-encoding. | |
**/ | |
var data : String? = null | |
/** | |
* Base64 encodes the unencodedData and sets the data property. | |
**/ | |
fun encodeAndSetData(unencodedData: String) | |
{ | |
data = Base64.getEncoder().encodeToString(unencodedData.toByteArray(StandardCharsets.UTF_8)); | |
} | |
fun encodeAndSetData(unencodedData : ByteArray) | |
{ | |
data = Base64.getEncoder().encodeToString(unencodedData); | |
} | |
} | |
} |
Also the data field in KinesisFirehoseEvent is ByteBuffer but this post assumes it is encoded string, not sure how to handle this.
got it working... i dont need to decode when using KinesisFirehoseEvent
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi I am using KinesisFirehoseEvent class from com.amazonaws.services.lambda.runtime.events
Do i still need to decode and encode back? .... i am getting decode error when i am testing with aws console with sample data
{
"records": [
{
"recordId": "49546986683135544286507457936321625675700192471156785154",
"data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0IDEyMy4=",
"approximateArrivalTimestamp": 1495072949453
}
],
"region": "us-west-2",
"deliveryStreamArn": "arn:aws:kinesis:EXAMPLE",
"invocationId": "invocationIdExample"
}
this is my code
for (final KinesisFirehoseEvent.Record record: kinesisFirehoseEvent.getRecords()) {
final String data1 = new String(record.getData().array(), "UTF-8");
}