Last active
November 25, 2021 18:32
-
-
Save Bryji/2bb77d85421d2fb1ea3ccc3c5940ea2a to your computer and use it in GitHub Desktop.
Kinesis Firehose Transformation Lambda objects in Java (do not seem to be provided for in an AWS SDK for now!)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
mport java.util.List; | |
public class KinesisFirehoseEvent { | |
String invocationId; | |
String deliveryStreamArn; | |
String region; | |
List<KinesisFirehoseInputRecord> records; | |
public String getInvocationId() { | |
return invocationId; | |
} | |
public void setInvocationId(String invocationId) { | |
this.invocationId = invocationId; | |
} | |
public String getDeliveryStreamArn() { | |
return deliveryStreamArn; | |
} | |
public void setDeliveryStreamArn(String deliveryStreamArn) { | |
this.deliveryStreamArn = deliveryStreamArn; | |
} | |
public String getRegion() { | |
return region; | |
} | |
public void setRegion(String region) { | |
this.region = region; | |
} | |
public List<KinesisFirehoseInputRecord> getRecords() { | |
return records; | |
} | |
public void setRecords(List<KinesisFirehoseInputRecord> records) { | |
this.records = records; | |
} | |
@Override | |
public String toString() { | |
return "KinesisFirehoseEvent [invocationId=" + invocationId + ", deliveryStreamArn=" + deliveryStreamArn | |
+ ", region=" + region + ", records=" + records + "]"; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.Base64; | |
public class KinesisFirehoseInputRecord { | |
String recordId; | |
Long approximateArrivalTimestamp; | |
String data; | |
public String getRecordId() { | |
return recordId; | |
} | |
public void setRecordId(String recordId) { | |
this.recordId = recordId; | |
} | |
public Long getApproximateArrivalTimestamp() { | |
return approximateArrivalTimestamp; | |
} | |
public void setApproximateArrivalTimestamp(Long approximateArrivalTimestamp) { | |
this.approximateArrivalTimestamp = approximateArrivalTimestamp; | |
} | |
public String getData() { | |
return data; | |
} | |
public String getDataDecoded() { | |
return new String(Base64.getDecoder().decode(data)); | |
} | |
public void setData(String data) { | |
this.data = data; | |
} | |
@Override | |
public String toString() { | |
return "KinesisFirehoseRecord [recordId=" + recordId + ", approximateArrivalTimestamp=" | |
+ approximateArrivalTimestamp + ", data=" + getDataDecoded() + "]"; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class KinesisFirehoseOutputRecord { | |
String recordId; | |
String data; | |
String result; | |
public String getRecordId() { | |
return recordId; | |
} | |
public void setRecordId(String recordId) { | |
this.recordId = recordId; | |
} | |
public String getData() { | |
return data; | |
} | |
public void setData(String data) { | |
this.data = data; | |
} | |
public String getResult() { | |
return result; | |
} | |
public void setResult(String result) { | |
this.result = result; | |
} | |
@Override | |
public String toString() { | |
return "KinesisFirehoseOuputRecord [recordId=" + recordId + ", data=" + data + ", result=" + result + "]"; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.List; | |
public class KinesisFirehoseResponse { | |
List<KinesisFirehoseOutputRecord> records; | |
public KinesisFirehoseResponse(List<KinesisFirehoseOutputRecord> results) { | |
this.records = results; | |
} | |
public List<KinesisFirehoseOutputRecord> getRecords() { | |
return records; | |
} | |
public void setRecords(List<KinesisFirehoseOutputRecord> records) { | |
this.records = records; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.List; | |
import java.util.stream.Collectors; | |
import com.amazonaws.services.lambda.runtime.Context; | |
import com.amazonaws.services.lambda.runtime.RequestHandler; | |
public class KinesisFirehoseTransformer implements RequestHandler<KinesisFirehoseEvent, KinesisFirehoseResponse> { | |
@Override | |
public KinesisFirehoseResponse handleRequest(KinesisFirehoseEvent event, Context context) { | |
context.getLogger().log("Input: " + event); | |
List<KinesisFirehoseOutputRecord> results = event.getRecords().stream().map(record -> { | |
KinesisFirehoseOutputRecord outRec = new KinesisFirehoseOutputRecord(); | |
outRec.setRecordId(record.getRecordId()); | |
outRec.setData(record.getData()); // do more interesting transformation stuff here | |
outRec.setResult("Ok"); | |
return outRec; | |
}).collect(Collectors.toList()); | |
return new KinesisFirehoseResponse(results); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
the events are defined in following repository:
https://github.com/aws/aws-lambda-java-libs/tree/master/aws-lambda-java-events