Skip to content

Instantly share code, notes, and snippets.

@plinyar
Last active September 5, 2018 16:56
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save plinyar/e854c9a2279911406f7e3f23c84fc476 to your computer and use it in GitHub Desktop.
Save plinyar/e854c9a2279911406f7e3f23c84fc476 to your computer and use it in GitHub Desktop.
Sample AWS Kinesis Firehose lambda transformation in Java (Kotlin indeed)
import com.amazonaws.services.lambda.runtime.Context
import com.amazonaws.services.lambda.runtime.LambdaLogger
class FirehoseTransformer {
private lateinit var logger: LambdaLogger
fun handler(event: KinesisFirehoseEvent, context: Context): KinesisFirehoseResponse {
logger = context.getLogger()
logger.log("Lambda started. Got messages ${event.records.size}.")
val response = KinesisFirehoseResponse()
for (rec in event.records) {
val srcData = rec.decodedData()
logger.log("Got message ${srcData}")
//now deserialize you srcData from underlying format (e.g. JSON) and transform it to trgData
val trgData = srcData
val transformedRecord = KinesisFirehoseResponse.FirehoseRecord()
transformedRecord.recordId = rec.recordId
transformedRecord.result = KinesisFirehoseResponse.TRANSFORMED_STATE_OK
transformedRecord.encodeAndSetData(trgData)
response.records.add(transformedRecord)
}
return response
}
}
import java.nio.charset.StandardCharsets
import java.util.*
/**
* This class represents the input event from Amazon Kinesis Firehose. It used as the input parameter
* for Lambda functions.
**/
class KinesisFirehoseEvent
{
/**
* The Id of the invocation.
**/
var invocationId : String = ""
/**
* The ARN of the delivery stream sending the event.
**/
var deliveryStreamArn = ""
/**
* The AWS region for delivery stream.
**/
var region = ""
/**
* The Kinesis records to transform.
**/
lateinit var records : List<FirehoseRecord>
/**
* The records for the Kinesis Firehose event to process and transform.
**/
public class FirehoseRecord
{
/**
*The record ID is passed from Firehose to Lambda during the invocation. The transformed record must
*contain the same record ID. Any mismatch between the ID of the original record and the ID of the
*transformed record is treated as a data transformation failure.
**/
var recordId = ""
/**
* The approximate time the record was sent to Kinesis Firehose as a Unix epoch.
**/
var approximateArrivalEpoch : Long? = null
// /**
// * The approximate time the record was sent to Kinesis Firehose.
// **/
// [IgnoreDataMember]
// public DateTime ApproximateArrivalTimestamp
// {
// get
// {
// var epoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
// return epoch.AddMilliseconds(ApproximateArrivalEpoch);
// }
// }
/**
* The data sent through as a Kinesis Firehose record. The data is sent to the Lambda function base64 encoded.
**/
var data : String? = null
/**
* Base64 decodes the data.
**/
fun decodedData() = String(Base64.getDecoder().decode(data), StandardCharsets.UTF_8);
}
}
import java.nio.charset.StandardCharsets
import java.util.*
class KinesisFirehoseResponse {
companion object {
/**
* The record was transformed successfully.
**/
val TRANSFORMED_STATE_OK = "Ok";
/**
* The record was dropped intentionally by your processing logic.
**/
val TRANSFORMED_STATE_DROPPED = "Dropped";
/**
* The record could not be transformed.
**/
val TRANSFORMED_STATE_PROCESSINGFAILED = "ProcessingFailed";
}
/**
* The transformed records from the KinesisFirehoseEvent.
**/
var records = mutableListOf<FirehoseRecord>()
/**
* The transformed records after processing KinesisFirehoseEvent.Records
**/
public class FirehoseRecord
{
/**
*The record ID is passed from Firehose to Lambda during the invocation. The transformed record must
*contain the same record ID. Any mismatch between the ID of the original record and the ID of the
*transformed record is treated as a data transformation failure.
**/
var recordId = ""
/**
* The status of the data transformation of the record. The possible values are: "Ok"
* (the record was transformed successfully), "Dropped" (the record was dropped intentionally
* by your processing logic), and "ProcessingFailed" (the record could not be transformed).
* If a record has a status of "Ok" or "Dropped", Firehose considers it successfully
* processed. Otherwise, Firehose considers it unsuccessfully processed.
*
* Possible values:
* * Ok - The record was transformed successfully
* * Dropped- The record was dropped intentionally by your processing logic
* * ProcessingFailed - The record could not be transformed
**/
var result = TRANSFORMED_STATE_OK
/**
* The transformed data payload, after base64-encoding.
**/
var data : String? = null
/**
* Base64 encodes the unencodedData and sets the data property.
**/
fun encodeAndSetData(unencodedData: String)
{
data = Base64.getEncoder().encodeToString(unencodedData.toByteArray(StandardCharsets.UTF_8));
}
fun encodeAndSetData(unencodedData : ByteArray)
{
data = Base64.getEncoder().encodeToString(unencodedData);
}
}
}
@akumariiit
Copy link

Also the data field in KinesisFirehoseEvent is ByteBuffer but this post assumes it is encoded string, not sure how to handle this.

@akumariiit
Copy link

got it working... i dont need to decode when using KinesisFirehoseEvent

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment