-
-
Save plinyar/e854c9a2279911406f7e3f23c84fc476 to your computer and use it in GitHub Desktop.
import com.amazonaws.services.lambda.runtime.Context | |
import com.amazonaws.services.lambda.runtime.LambdaLogger | |
/**
 * AWS Lambda handler that transforms Kinesis Firehose records.
 *
 * Each incoming record is base64-decoded, passed through the transformation step (currently the
 * identity), re-encoded, and returned under the same record ID. A record whose processing throws
 * is reported as [KinesisFirehoseResponse.TRANSFORMED_STATE_PROCESSINGFAILED] and carries its
 * original payload. This avoids failing the whole invocation, which would make Firehose retry
 * the entire batch.
 */
class FirehoseTransformer {
    /**
     * Lambda entry point.
     *
     * @param event the batch of records sent by Firehose.
     * @param context the Lambda invocation context; only its logger is used.
     * @return one output record per input record, in the same order.
     */
    fun handler(event: KinesisFirehoseEvent, context: Context): KinesisFirehoseResponse {
        // Kept local: there is no reason to hold per-invocation state on the handler instance.
        val logger = context.logger
        logger.log("Lambda started. Got messages ${event.records.size}.")
        val response = KinesisFirehoseResponse()
        event.records.mapTo(response.records) { rec -> transformRecord(rec, logger) }
        return response
    }

    /** Transforms a single input record into its output record, never throwing for bad input. */
    private fun transformRecord(
        rec: KinesisFirehoseEvent.FirehoseRecord,
        logger: LambdaLogger,
    ): KinesisFirehoseResponse.FirehoseRecord {
        val transformedRecord = KinesisFirehoseResponse.FirehoseRecord()
        // The output must carry the same record ID as the input; Firehose treats a mismatch as a failure.
        transformedRecord.recordId = rec.recordId
        try {
            val srcData = rec.decodedData()
            logger.log("Got message ${srcData}")
            // Now deserialize srcData from its underlying format (e.g. JSON) and transform it to trgData.
            val trgData = srcData
            transformedRecord.result = KinesisFirehoseResponse.TRANSFORMED_STATE_OK
            transformedRecord.encodeAndSetData(trgData)
        } catch (e: Exception) {
            // Fail only this record: report it as ProcessingFailed and pass the original payload back unchanged.
            logger.log("Failed to transform record ${rec.recordId}: ${e}")
            transformedRecord.result = KinesisFirehoseResponse.TRANSFORMED_STATE_PROCESSINGFAILED
            transformedRecord.data = rec.data
        }
        return transformedRecord
    }
}
import java.nio.charset.StandardCharsets | |
import java.util.* | |
/**
 * Input event that Amazon Kinesis Firehose passes to a data-transformation Lambda function.
 *
 * All properties are mutable and have defaults, so the object can be created through the
 * no-argument constructor and filled in through setters.
 */
class KinesisFirehoseEvent {
    /** The ID of the invocation. */
    var invocationId: String = ""

    /** The ARN of the delivery stream sending the event. */
    var deliveryStreamArn = ""

    /** The AWS region of the delivery stream. */
    var region = ""

    /**
     * The Kinesis records to transform.
     *
     * Defaults to an empty list instead of being `lateinit`. An event without a `records` field
     * then yields zero records rather than throwing on access.
     */
    var records: List<FirehoseRecord> = emptyList()

    /** A single record of the Kinesis Firehose event to process and transform. */
    class FirehoseRecord {
        /**
         * The record ID passed from Firehose to Lambda during the invocation. The transformed record
         * must contain the same record ID. Any mismatch between the ID of the original record and the
         * ID of the transformed record is treated as a data transformation failure.
         */
        var recordId = ""

        /**
         * Approximate time the record arrived in Kinesis Firehose, in milliseconds since the Unix epoch.
         *
         * Named after the `approximateArrivalTimestamp` key of the Firehose event JSON, so the
         * value is actually populated when the event is deserialized.
         */
        var approximateArrivalTimestamp: Long? = null

        /**
         * Alias of [approximateArrivalTimestamp], kept for backward compatibility with code that
         * used the earlier property name. Reads and writes go through to the same value.
         */
        var approximateArrivalEpoch: Long?
            get() = approximateArrivalTimestamp
            set(value) {
                approximateArrivalTimestamp = value
            }

        /** The record payload, base64 encoded as sent to the Lambda function. */
        var data: String? = null

        /**
         * Returns [approximateArrivalTimestamp] as an instant, or `null` when no timestamp was provided.
         * This replaces the commented-out C# `ApproximateArrivalTimestamp` helper from the original port.
         */
        fun approximateArrivalInstant(): java.time.Instant? =
            approximateArrivalTimestamp?.let { java.time.Instant.ofEpochMilli(it) }

        /**
         * Base64-decodes [data] and interprets the bytes as UTF-8 text.
         *
         * @throws IllegalStateException if [data] is null.
         * @throws IllegalArgumentException if [data] is not valid base64.
         */
        fun decodedData(): String {
            val encoded = checkNotNull(data) { "Firehose record $recordId has no data" }
            return String(Base64.getDecoder().decode(encoded), StandardCharsets.UTF_8)
        }
    }
}
import java.nio.charset.StandardCharsets | |
import java.util.* | |
/**
 * Result returned from a Firehose data-transformation Lambda: one output record per input record.
 */
class KinesisFirehoseResponse {
    /** Output records, one for each record of the incoming KinesisFirehoseEvent. */
    var records = mutableListOf<FirehoseRecord>()

    /** One transformed record, produced from an entry of KinesisFirehoseEvent.records. */
    class FirehoseRecord {
        /**
         * ID of the source record, passed from Firehose to Lambda during the invocation. It must be
         * copied unchanged; Firehose treats any mismatch between input and output IDs as a data
         * transformation failure.
         */
        var recordId = ""

        /**
         * Outcome of transforming this record. Firehose counts "Ok" and "Dropped" as successfully
         * processed and anything else as unsuccessfully processed.
         *
         * - "Ok": the record was transformed successfully.
         * - "Dropped": the record was dropped intentionally by your processing logic.
         * - "ProcessingFailed": the record could not be transformed.
         */
        var result: String = KinesisFirehoseResponse.TRANSFORMED_STATE_OK

        /** The transformed payload, base64 encoded. */
        var data: String? = null

        /** Stores [unencodedData] in [data] as the base64 encoding of its UTF-8 bytes. */
        fun encodeAndSetData(unencodedData: String) {
            encodeAndSetData(unencodedData.toByteArray(StandardCharsets.UTF_8))
        }

        /** Stores [unencodedData] in [data] as base64 text. */
        fun encodeAndSetData(unencodedData: ByteArray) {
            data = Base64.getEncoder().encodeToString(unencodedData)
        }
    }

    companion object {
        /** Status for a record that was transformed successfully. */
        val TRANSFORMED_STATE_OK = "Ok"

        /** Status for a record that your processing logic dropped on purpose. */
        val TRANSFORMED_STATE_DROPPED = "Dropped"

        /** Status for a record that could not be transformed. */
        val TRANSFORMED_STATE_PROCESSINGFAILED = "ProcessingFailed"
    }
}
Hi @rageshd, if you want something similar in Java, just look at the basic AWS articles:
https://docs.aws.amazon.com/lambda/latest/dg/java-handler-using-predefined-interfaces.html
The only difference is that you should use the KinesisFirehoseEvent class already provided by the AWS SDK as the request object.
The response can be any POJO you want to use or create; just follow what is in the article.
If you still need it, I can provide something in a gist for you.
@llcfromhell - Just converted this to Java and it works perfectly! Thanks for the jump start!
Hi
I modified the above code for Java, but I observed that my Java Lambda transformation function is getting called 4 times for the same record ID. Any idea why?
Hi, I am using the KinesisFirehoseEvent class from com.amazonaws.services.lambda.runtime.events.
Do I still need to decode the data and encode it back? I am getting a decode error when I test in the AWS console with this sample data:
{
"records": [
{
"recordId": "49546986683135544286507457936321625675700192471156785154",
"data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0IDEyMy4=",
"approximateArrivalTimestamp": 1495072949453
}
],
"region": "us-west-2",
"deliveryStreamArn": "arn:aws:kinesis:EXAMPLE",
"invocationId": "invocationIdExample"
}
This is my code:
for (final KinesisFirehoseEvent.Record record: kinesisFirehoseEvent.getRecords()) {
final String data1 = new String(record.getData().array(), "UTF-8");
logger.log(data1+"\n");
final byte[] data2 = Base64.getDecoder().decode(data1);
}
Also, the data field in KinesisFirehoseEvent is a ByteBuffer, but this post assumes it is a base64-encoded string; I'm not sure how to handle this.
Got it working: I don't need to decode the data when using KinesisFirehoseEvent.
Hi,
Is there a Java example? What would be the corresponding event type in Java for KinesisFirehoseEvent?
Regards,
Ragesh