Skip to content

Instantly share code, notes, and snippets.

View Samrose-Ahmed's full-sized avatar

Samrose Samrose-Ahmed

View GitHub Profile
/*-
* #%L
* athena-example
* %%
* Copyright (C) 2019 Amazon Web Services
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
@Samrose-Ahmed
Samrose-Ahmed / MetadataHandler.kt
Created December 5, 2023 01:31
athmetahandler1
/*-
* #%L
* athena-example
* %%
* Copyright (C) 2019 Amazon Web Services
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
fun checkDuplicate(sequencer: String): Boolean {
// TTL to expire old DynamoDB items
val expireTime = ((System.currentTimeMillis() / 1000L) + DDB_ITEM_EXPIRE_SECONDS).toString()
val attrs = mapOf(
"sequencer" to AttributeValue(sequencer),
"ttl" to AttributeValue().apply { this.setN(expireTime) }
)
val req = PutItemRequest(DUPLICATES_DDB_TABLE_NAME, attrs)
.apply { this.conditionExpression = "attribute_not_exists(sequencer)" }
/**
 * Computes Iceberg [Metrics] for the Parquet file located at [s3Path].
 *
 * The file is opened through the file-level `fileIO`, and the metrics configuration
 * is derived from [table] via `MetricsConfig.forTable`. The actual computation is
 * delegated to `ParquetUtil.fileMetrics`.
 *
 * @param s3Path full `s3://bucket/key` location of the Parquet data file.
 * @param table the Iceberg table whose metrics settings should be applied.
 * @return the metrics returned by `ParquetUtil.fileMetrics` for that file.
 */
fun readParquetMetrics(s3Path: String, table: Table): Metrics {
    // Open the input first, then resolve the table's metrics config (same order as before).
    val parquetInput = fileIO.newInputFile(s3Path)
    val tableMetricsConfig = MetricsConfig.forTable(table)
    return ParquetUtil.fileMetrics(parquetInput, tableMetricsConfig)
}
fun processRecord(sqsMessage: SQSMessage): Unit {
val record = S3EventNotification.parseJson(sqsMessage.body).records[0]
val s3Bucket = record.s3.bucket.name
val s3Object = record.s3.`object`
val s3ObjectKey = s3Object.key
val s3ObjectSize = s3Object.sizeAsLong
val s3Path = "s3://$s3Bucket/$s3ObjectKey"
if (checkDuplicate(s3Object.sequencer)) {
println("Found duplicate SQS message for key: ${s3ObjectKey}. Skipping...")
class IcebergMetadataWriter {
// In a real world usecase, you can create this dynamically from the data.
val icebergTable: Table = icebergCatalog.loadTable(TableIdentifier.of(Namespace.of(NAMESPACE), TABLE_NAME))
val appendFiles: AppendFiles = icebergTable.newAppend()
// Lambda handler
fun handle(sqsEvent: SQSEvent) {
for (record in sqsEvent.records) {
processRecord(record, tableObjs)
}
{
"expiration": "2022-02-14T13:08:46.864Z",
"conditions": [
{ "acl": "bucket-owner-full-control" },
{ "bucket": "my-bucket" },
["starts-with", "$key", "stuff/clientId"],
["content-length-range", 1048576, 10485760]
]
}

Keybase proof

I hereby claim:

  • I am samrose-ahmed on github.
  • I am samrosea (https://keybase.io/samrosea) on keybase.
  • I have a public key ASDcVIqlPmilrdWFz53bERzvk_nxMdVOhGjoDk-WKv0g4go

To claim this, I am signing this object: