
@plinyar
Last active September 5, 2018 16:56
Sample AWS Kinesis Firehose Lambda transformation in Java (actually Kotlin)
import com.amazonaws.services.lambda.runtime.Context
import com.amazonaws.services.lambda.runtime.LambdaLogger

class FirehoseTransformer {
    private lateinit var logger: LambdaLogger

    fun handler(event: KinesisFirehoseEvent, context: Context): KinesisFirehoseResponse {
        logger = context.logger
        logger.log("Lambda started. Got ${event.records.size} messages.")
        val response = KinesisFirehoseResponse()
        for (rec in event.records) {
            val srcData = rec.decodedData()
            logger.log("Got message $srcData")
            // Now deserialize srcData from its underlying format (e.g. JSON) and transform it into trgData.
            val trgData = srcData
            val transformedRecord = KinesisFirehoseResponse.FirehoseRecord()
            // The output record must carry the same recordId as the input record.
            transformedRecord.recordId = rec.recordId
            transformedRecord.result = KinesisFirehoseResponse.TRANSFORMED_STATE_OK
            transformedRecord.encodeAndSetData(trgData)
            response.records.add(transformedRecord)
        }
        return response
    }
}
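The handler above marks every record `Ok` and passes the payload through unchanged. A minimal sketch of how the placeholder comment might be filled in, assuming a hypothetical rule: each payload is a decimal integer that gets doubled, blank payloads are dropped, and anything unparseable is reported as failed. `OutRecord`, `transform` and `processRecord` are illustrative names, not part of the gist; `OutRecord` stands in for `KinesisFirehoseResponse.FirehoseRecord` so the sketch runs without AWS jars.

```kotlin
import java.nio.charset.StandardCharsets
import java.util.Base64

// Hypothetical stand-in for KinesisFirehoseResponse.FirehoseRecord.
data class OutRecord(val recordId: String, val result: String, val data: String)

// Hypothetical transformation: the payload is a decimal integer, emitted doubled.
// Throws NumberFormatException for any other payload.
fun transform(src: String): String = (src.trim().toLong() * 2).toString()

// Decodes one Firehose record, applies transform(), and picks the result status.
fun processRecord(recordId: String, base64Data: String): OutRecord {
    val src = String(Base64.getDecoder().decode(base64Data), StandardCharsets.UTF_8)
    if (src.isBlank()) {
        // Nothing worth delivering: report an intentional drop, echoing the input payload.
        return OutRecord(recordId, "Dropped", base64Data)
    }
    return try {
        val trg = transform(src)
        OutRecord(recordId, "Ok", Base64.getEncoder().encodeToString(trg.toByteArray(StandardCharsets.UTF_8)))
    } catch (e: NumberFormatException) {
        // One bad record is reported as failed instead of failing the whole batch.
        OutRecord(recordId, "ProcessingFailed", base64Data)
    }
}
```

Every branch returns exactly one record with the original `recordId`, since Firehose treats an ID mismatch between input and output as a transformation failure.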
import java.nio.charset.StandardCharsets
import java.time.Instant
import java.util.Base64

/**
 * This class represents the input event from Amazon Kinesis Firehose. It is used as the input parameter
 * for Lambda functions.
 */
class KinesisFirehoseEvent {
    /**
     * The ID of the invocation.
     */
    var invocationId: String = ""

    /**
     * The ARN of the delivery stream sending the event.
     */
    var deliveryStreamArn = ""

    /**
     * The AWS region of the delivery stream.
     */
    var region = ""

    /**
     * The Kinesis Firehose records to transform.
     */
    lateinit var records: List<FirehoseRecord>

    /**
     * A single record of the Kinesis Firehose event to process and transform.
     */
    class FirehoseRecord {
        /**
         * The record ID is passed from Firehose to Lambda during the invocation. The transformed record must
         * contain the same record ID. Any mismatch between the ID of the original record and the ID of the
         * transformed record is treated as a data transformation failure.
         */
        var recordId = ""

        /**
         * The approximate time the record was sent to Kinesis Firehose, in milliseconds since the Unix epoch.
         * Named after the "approximateArrivalTimestamp" field of the incoming JSON event so it gets populated.
         */
        var approximateArrivalTimestamp: Long? = null

        /**
         * The approximate time the record was sent to Kinesis Firehose, or null if the timestamp is missing.
         */
        fun approximateArrivalTime(): Instant? = approximateArrivalTimestamp?.let { Instant.ofEpochMilli(it) }

        /**
         * The data sent through as a Kinesis Firehose record. The data is sent to the Lambda function base64-encoded.
         */
        var data: String? = null

        /**
         * Base64-decodes the data and interprets the bytes as UTF-8 text.
         */
        fun decodedData() =
            String(Base64.getDecoder().decode(requireNotNull(data) { "Record $recordId has no data" }), StandardCharsets.UTF_8)
    }
}
import java.nio.charset.StandardCharsets
import java.util.Base64

class KinesisFirehoseResponse {
    companion object {
        /**
         * The record was transformed successfully.
         */
        const val TRANSFORMED_STATE_OK = "Ok"

        /**
         * The record was dropped intentionally by your processing logic.
         */
        const val TRANSFORMED_STATE_DROPPED = "Dropped"

        /**
         * The record could not be transformed.
         */
        const val TRANSFORMED_STATE_PROCESSINGFAILED = "ProcessingFailed"
    }

    /**
     * The transformed records from the KinesisFirehoseEvent.
     */
    var records = mutableListOf<FirehoseRecord>()

    /**
     * A transformed record produced by processing one of KinesisFirehoseEvent.records.
     */
    class FirehoseRecord {
        /**
         * The record ID is passed from Firehose to Lambda during the invocation. The transformed record must
         * contain the same record ID. Any mismatch between the ID of the original record and the ID of the
         * transformed record is treated as a data transformation failure.
         */
        var recordId = ""

        /**
         * The status of the data transformation of the record. If a record has a status of "Ok" or
         * "Dropped", Firehose considers it successfully processed. Otherwise, Firehose considers it
         * unsuccessfully processed.
         *
         * Possible values:
         *  * Ok - The record was transformed successfully.
         *  * Dropped - The record was dropped intentionally by your processing logic.
         *  * ProcessingFailed - The record could not be transformed.
         */
        var result = TRANSFORMED_STATE_OK

        /**
         * The transformed data payload, after base64-encoding.
         */
        var data: String? = null

        /**
         * Base64-encodes the UTF-8 bytes of unencodedData and sets the data property.
         */
        fun encodeAndSetData(unencodedData: String) {
            data = Base64.getEncoder().encodeToString(unencodedData.toByteArray(StandardCharsets.UTF_8))
        }

        /**
         * Base64-encodes raw bytes and sets the data property.
         */
        fun encodeAndSetData(unencodedData: ByteArray) {
            data = Base64.getEncoder().encodeToString(unencodedData)
        }
    }
}
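The Lambda runtime serializes the response object to JSON for you. For reference, a hand-rolled sketch of the shape Firehose expects back: a top-level `records` array whose entries carry `recordId`, `result` and base64 `data`. `ResponseRecord` and `toFirehoseJson` are illustrative names, and the string templates skip JSON escaping, which is assumed safe here only because record IDs, result codes and base64 text contain no quotes or backslashes.

```kotlin
import java.nio.charset.StandardCharsets
import java.util.Base64

// Minimal stand-in mirroring the fields of KinesisFirehoseResponse.FirehoseRecord.
data class ResponseRecord(val recordId: String, val result: String, val data: String)

// Illustration only: builds the response JSON by string templates, without escaping.
fun toFirehoseJson(records: List<ResponseRecord>): String =
    records.joinToString(prefix = "{\"records\":[", separator = ",", postfix = "]}") {
        "{\"recordId\":\"${it.recordId}\",\"result\":\"${it.result}\",\"data\":\"${it.data}\"}"
    }
```

In a real function you would return the `KinesisFirehoseResponse` object and let the runtime produce this JSON; the sketch only makes the expected wire format visible.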
@codesolace

Hi,

Is there a Java example? What would be the corresponding event type in Java for KinesisFirehoseEvent?

Regards,
Ragesh

@lourenco-lima-cunha

Hi @rageshd, if you want something similar in Java, just look at the basic AWS articles:

https://docs.aws.amazon.com/lambda/latest/dg/java-handler-using-predefined-interfaces.html

The only difference is that you use the KinesisFirehoseEvent class already provided by the AWS SDK as the request object.

For the response, use or create whatever POJO you want; just follow what the article describes.

If you still need it, I can put something in a gist for you.

@szaluk

szaluk commented Apr 6, 2018

@llcfromhell - Just converted this to Java and it works perfectly! Thanks for the jump start!

@rahultokase

Hi,
I adapted the code above to Java, but I observed that my Java Lambda transformation function is invoked 4 times for the same record ID. Any idea why?

@akumariiit

Hi, I am using the KinesisFirehoseEvent class from com.amazonaws.services.lambda.runtime.events.
Do I still need to decode and encode back? I am getting a decode error when testing from the AWS console with this sample data:

{
  "records": [
    {
      "recordId": "49546986683135544286507457936321625675700192471156785154",
      "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0IDEyMy4=",
      "approximateArrivalTimestamp": 1495072949453
    }
  ],
  "region": "us-west-2",
  "deliveryStreamArn": "arn:aws:kinesis:EXAMPLE",
  "invocationId": "invocationIdExample"
}
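When the event is bound to a POJO with a `String` data field, as the gist's own `KinesisFirehoseEvent` is, the `data` value above is still base64 text. A sketch decoding it with the same `Base64` call that the gist's `decodedData()` uses; `decodeFirehoseData` is an illustrative name.

```kotlin
import java.nio.charset.StandardCharsets
import java.util.Base64

// Same logic as KinesisFirehoseEvent.FirehoseRecord.decodedData() in the gist,
// pulled out as a free function so it runs on its own.
fun decodeFirehoseData(base64Data: String): String =
    String(Base64.getDecoder().decode(base64Data), StandardCharsets.UTF_8)
```

For the sample record, `decodeFirehoseData("SGVsbG8sIHRoaXMgaXMgYSB0ZXN0IDEyMy4=")` returns `"Hello, this is a test 123."`.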

This is my code:

for (final KinesisFirehoseEvent.Record record : kinesisFirehoseEvent.getRecords()) {
    final String data1 = new String(record.getData().array(), "UTF-8");
    logger.log(data1 + "\n");
    final byte[] data2 = Base64.getDecoder().decode(data1);
}

@akumariiit

Also, the data field in KinesisFirehoseEvent is a ByteBuffer, but this post assumes it is an encoded string; I'm not sure how to handle this.

@akumariiit

Got it working... I don't need to decode when using KinesisFirehoseEvent.
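As the thread concludes, with `com.amazonaws.services.lambda.runtime.events.KinesisFirehoseEvent` the `ByteBuffer` returned by `getData()` already holds the decoded bytes, so a second base64 decode fails. Note also that `ByteBuffer.array()` returns the whole backing array regardless of position, limit or array offset, and throws for read-only or direct buffers. A sketch of reading only the buffer's readable bytes as UTF-8 text; `byteBufferToUtf8` is a hypothetical helper name.

```kotlin
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Copies only the readable bytes (position until limit) into a fresh array.
// duplicate() gets its own position, so the caller's buffer is left untouched.
fun byteBufferToUtf8(buffer: ByteBuffer): String {
    val view = buffer.duplicate()
    val bytes = ByteArray(view.remaining())
    view.get(bytes)
    return String(bytes, StandardCharsets.UTF_8)
}
```

Applied to the record's `getData()` result, this yields the plain payload (for the console sample, the text the base64 string decodes to) without any `Base64` call.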
