// athmetahandler1
/*-
 * #%L
 * athena-example
 * %%
 * Copyright (C) 2019 Amazon Web Services
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
package com.matano.iceberg.datasize

import com.amazonaws.athena.connector.lambda.QueryStatusChecker
import com.amazonaws.athena.connector.lambda.data.BlockAllocator
import com.amazonaws.athena.connector.lambda.data.BlockWriter
import com.amazonaws.athena.connector.lambda.data.SchemaBuilder
import com.amazonaws.athena.connector.lambda.domain.Split
import com.amazonaws.athena.connector.lambda.domain.TableName
import com.amazonaws.athena.connector.lambda.handlers.MetadataHandler
import com.amazonaws.athena.connector.lambda.metadata.*
import com.amazonaws.athena.connector.lambda.metadata.optimizations.DataSourceOptimizations
import com.amazonaws.athena.connector.lambda.metadata.optimizations.OptimizationSubType
import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.FilterPushdownSubType
import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.LimitPushdownSubType
import com.amazonaws.athena.connector.lambda.security.EncryptionKeyFactory
import com.amazonaws.services.athena.AmazonAthena
import com.amazonaws.services.secretsmanager.AWSSecretsManager
import com.matano.iceberg.IcebergMetadataWriter.Companion.createIcebergCatalog
import org.apache.iceberg.catalog.Catalog
import org.apache.iceberg.catalog.Namespace
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.expressions.Expressions
import org.slf4j.LoggerFactory
import java.time.ZoneId
import kotlin.math.max
import kotlin.math.min
//DO NOT REMOVE - this will not be _unused_ when customers go through the tutorial and uncomment
//the TODOs
/**
 * This class is part of a tutorial that will walk you through how to build a connector for your
 * custom data source. The README for this module (athena-example) will guide you through preparing
 * your development environment, modifying this example MetadataHandler, building, deploying, and then
 * using your new source in an Athena query.
 *
 *
 * More specifically, this class is responsible for providing Athena with metadata about the schemas (aka databases),
 * tables, and table partitions that your source contains. Lastly, this class tells Athena how to split up reads against
 * this source. This gives you control over the level of performance and parallelism your source can support.
 *
 *
 * For more examples, please see the other connectors in this repository (e.g. athena-cloudwatch, athena-docdb, etc...)
 */
class DatasizeMetadataHandler : MetadataHandler {
    constructor(configOptions: Map<String?, String?>?) : super(SOURCE_TYPE, configOptions)

    // The Iceberg catalog is created lazily, once per Lambda container, and reused across requests.
    val icebergCatalog: Catalog by lazy {
        createIcebergCatalog()
    }

    // Full-argument constructor, useful for testing with injected AWS clients.
    protected constructor(
        keyFactory: EncryptionKeyFactory?,
        awsSecretsManager: AWSSecretsManager?,
        athena: AmazonAthena?,
        spillBucket: String?,
        spillPrefix: String?,
        configOptions: Map<String?, String?>?
    ) : super(keyFactory, awsSecretsManager, athena, SOURCE_TYPE, spillBucket, spillPrefix, configOptions)
    /**
     * Used to get the list of schemas (aka databases) that this source contains.
     *
     * @param allocator Tool for creating and managing Apache Arrow Blocks.
     * @param request Provides details on who made the request and which Athena catalog they are querying.
     * @return A ListSchemasResponse which primarily contains a Set<String> of schema names and a catalog name
     * corresponding to the Athena catalog that was queried.
     */
    override fun doListSchemaNames(allocator: BlockAllocator, request: ListSchemasRequest): ListSchemasResponse {
        logger.info("doListSchemaNames: enter - $request")
        val schemas: Set<String> = hashSetOf("matano_db")
        return ListSchemasResponse(request.catalogName, schemas)
    }
    /**
     * Used to get a paginated list of tables that this source contains.
     *
     * @param allocator Tool for creating and managing Apache Arrow Blocks.
     * @param request Provides details on who made the request and which Athena catalog and database they are querying.
     * @return A ListTablesResponse which primarily contains a List<TableName> enumerating the tables in this
     * catalog, database tuple. It also contains the catalog name corresponding to the Athena catalog that was queried.
     * @implNote A complete (un-paginated) list of tables should be returned if the request's pageSize is set to
     * ListTablesRequest.UNLIMITED_PAGE_SIZE_VALUE.
     */
    override fun doListTables(allocator: BlockAllocator, request: ListTablesRequest): ListTablesResponse {
        logger.info("doListTables: enter - $request")
        // Tables live in the Iceberg "matano" namespace but are exposed to Athena as "matano_db".
        val namespace = Namespace.of("matano")
        val tables = icebergCatalog.listTables(namespace).map { ti -> TableName("matano_db", ti.name()) }.toMutableList()
        // A null next-token indicates the complete (un-paginated) table list was returned.
        return ListTablesResponse(request.catalogName, tables, null)
    }
    /**
     * Used to get the definition (field names, types, descriptions, etc...) of a Table.
     *
     * @param allocator Tool for creating and managing Apache Arrow Blocks.
     * @param request Provides details on who made the request and which Athena catalog, database, and table they are querying.
     * @return A GetTableResponse which primarily contains:
     * 1. An Apache Arrow Schema object describing the table's columns, types, and descriptions.
     * 2. A Set<String> of partition column names (or empty if the table isn't partitioned).
     * 3. A TableName object confirming the schema and table name the response is for.
     * 4. A catalog name corresponding to the Athena catalog that was queried.
     */
    override fun doGetTable(allocator: BlockAllocator, request: GetTableRequest): GetTableResponse {
        logger.info("doGetTable: enter - $request")
        // Every table is exposed with the same fixed schema: one partition column (ts_hour)
        // plus the data file path and its size in bytes.
        val partitionColNames: Set<String> = hashSetOf("ts_hour")
        val tableSchemaBuilder = SchemaBuilder.newBuilder()
        // tableSchemaBuilder.addStringField("ts_hour")
        tableSchemaBuilder.addDateMilliField("ts_hour")
        tableSchemaBuilder.addStringField("file_path")
        tableSchemaBuilder.addBigIntField("size")
        return GetTableResponse(request.catalogName,
                request.tableName,
                tableSchemaBuilder.build(),
                partitionColNames)
    }
    /**
     * Used to get the partitions that must be read from the request table in order to satisfy the requested predicate.
     *
     * @param blockWriter Used to write rows (partitions) into the Apache Arrow response.
     * @param request Provides details of the catalog, database, and table being queried as well as any filter predicate.
     * @param queryStatusChecker A QueryStatusChecker that you can use to stop doing work for a query that has already terminated.
     * @note Partitions are partially opaque to Amazon Athena in that it only understands your partition columns and
     * how to filter out partitions that do not meet the query's constraints. Any additional columns you add to the
     * partition data are ignored by Athena but passed on to calls on GetSplits.
     */
    @Throws(Exception::class)
    override fun getPartitions(blockWriter: BlockWriter, request: GetTableLayoutRequest, queryStatusChecker: QueryStatusChecker) {
        logger.info("getPartitions: enter - $request")
        if (!queryStatusChecker.isQueryRunning) {
            logger.info("getPartitions: Query was already finished")
            return
        }
        // Enumerate one partition row per hour, covering roughly the last four years.
        val currentTsHour = System.currentTimeMillis() / 1000 / 60 / 60
        val startTsHour = currentTsHour - 24 * 365 * 4
        for (tsHour in startTsHour..currentTsHour) {
            val tsHourMillis = tsHour * 60 * 60 * 1000L
            blockWriter.writeRows { block, rowNum ->
                var matched = true
                matched = matched and block.setValue("ts_hour", rowNum, tsHourMillis)
                // Return the number of rows written; Athena drops rows that fail its constraints.
                return@writeRows if (matched) 1 else 0
            }
        }
    }
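    // Athena evaluates the query's ts_hour constraints against each generated partition row, so a
    // query such as WHERE ts_hour >= timestamp '2023-12-01 00:00:00' prunes most of the
    // ~35,000 hourly rows (24 * 365 * 4) before doGetSplits is called with only the survivors.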
    /**
     * Used to split up the reads required to scan the requested batch of partition(s).
     *
     * @param allocator Tool for creating and managing Apache Arrow Blocks.
     * @param request Provides details of the catalog, database, table, and partition(s) being queried as well as
     * any filter predicate.
     * @return A GetSplitsResponse which primarily contains:
     * 1. A Set<Split> which represent read operations Amazon Athena must perform by calling your read function.
     * 2. (Optional) A continuation token which allows you to paginate the generation of splits for large queries.
     * @note A Split is a mostly opaque object to Amazon Athena. Amazon Athena will use the optional SpillLocation and
     * optional EncryptionKey for pipelined reads but all properties you set on the Split are passed to your read
     * function to help you perform the read.
     */
    override fun doGetSplits(allocator: BlockAllocator, request: GetSplitsRequest): GetSplitsResponse {
        logger.info("doGetSplits: enter - $request")
        val catalogName = request.catalogName
        val partitions = request.partitions
        val tsHour = partitions.getFieldReader("ts_hour")
        val splits = java.util.HashSet<Split>()
        // Derive the [min, max] hour range from the partition rows Athena kept after pruning.
        // Note: minTsHour must start at Long.MAX_VALUE; initializing it to 0 would make min() a no-op.
        var minTsHour: Long = Long.MAX_VALUE
        var maxTsHour: Long = 0
        for (i in 0 until partitions.rowCount) {
            // Set the reader to the partition row we are on
            tsHour.position = i
            val unixHours = tsHour.readLocalDateTime().atZone(ZoneId.of("UTC")).toEpochSecond() / 60 / 60
            minTsHour = min(minTsHour, unixHours)
            maxTsHour = max(maxTsHour, unixHours)
        }
        logger.info("doGetSplits: minTsHour=$minTsHour, maxTsHour=$maxTsHour")
        val tableId = TableIdentifier.of(Namespace.of("matano"), request.tableName.tableName)
        logger.info("doGetSplits: tableId=$tableId")
        val table = icebergCatalog.loadTable(tableId) ?: throw RuntimeException("Table not found: $tableId")
        logger.info("doGetSplits: table=$table")
        // Plan the Iceberg scan for data files whose hour(ts) partition falls in the requested range.
        val filePaths = mutableListOf<String>()
        table.newScan().filter(
            Expressions.and(
                Expressions.greaterThanOrEqual(Expressions.hour("ts"), minTsHour),
                Expressions.lessThanOrEqual(Expressions.hour("ts"), maxTsHour)
            )
        ).ignoreResiduals().planFiles().use { s ->
            s.forEach { f ->
                // todo: inject ts_hour
                filePaths.add(f.file().path().toString())
            }
        }
        // Chunk into 50 files per split so each split is a bounded, parallelizable unit of work.
        val filesChunks = filePaths.chunked(50)
        for (filesChunk in filesChunks) {
            val split = Split.newBuilder(makeSpillLocation(request), makeEncryptionKey())
                .add("files", filesChunk.joinToString(","))
                .build()
            splits.add(split)
        }
        /**
         * TODO: For each partition in the request, create 1 or more splits. Splits
         * are parallelizable units of work. Each represents a part of your table
         * that needs to be read for the query. Splits are opaque to Athena aside from the
         * spill location and encryption key. All properties added to a split are solely
         * for your use when Athena calls your readWithConstraints(...) function to perform
         * the read. In this example we just need to know the partition details (year, month, day).
         *
         * Split split = Split.newBuilder(makeSpillLocation(request), makeEncryptionKey())
         *         .add("year", String.valueOf(year.readInteger()))
         *         .add("month", String.valueOf(month.readInteger()))
         *         .add("day", String.valueOf(day.readInteger()))
         *         .build();
         *
         * splits.add(split);
         *
         */
        logger.info("doGetSplits: exit - " + splits.size)
        return GetSplitsResponse(catalogName, splits)
    }
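    // A matching RecordHandler (not part of this gist; any names below are illustrative) would
    // recover the file list from each split inside its readWithConstraint implementation, e.g.:
    //   val files = recordsRequest.split.getProperty("files").split(",")
    // and then scan each file to emit one (ts_hour, file_path, size) row per data file.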
    /**
     * Used to describe the types of capabilities supported by a data source. An engine can use this to determine what
     * portions of the query to push down. A connector that returns any optimization will guarantee that the associated
     * predicate will be pushed down.
     * @param allocator Tool for creating and managing Apache Arrow Blocks.
     * @param request Provides details about the catalog being used.
     * @return A GetDataSourceCapabilitiesResponse object which returns a map of supported optimizations that
     * the connector is advertising to the consumer. The connector assumes all responsibility for whatever is passed here.
     */
    override fun doGetDataSourceCapabilities(allocator: BlockAllocator, request: GetDataSourceCapabilitiesRequest): GetDataSourceCapabilitiesResponse {
        var capabilities: Map<String, List<OptimizationSubType>> = mutableMapOf()
        capabilities = capabilities.plus(DataSourceOptimizations.SUPPORTS_FILTER_PUSHDOWN.withSupportedSubTypes(*FilterPushdownSubType.values()).toPair())
        capabilities = capabilities.plus(DataSourceOptimizations.SUPPORTS_LIMIT_PUSHDOWN.withSupportedSubTypes(*LimitPushdownSubType.values()).toPair())
        /*
         * TODO: Add capabilities which your connector will support.
         * The capabilities you return here will cause Athena to fill the Constraints object in the RecordHandler's
         * readWithConstraint method's ReadRecordsRequest parameter with the specific pushdowns your connector supports
         * and expect your record handler to correctly evaluate the pushdowns present.
         *
         * Example: capabilities.putAll(DataSourceOptimizations.SUPPORTS_FILTER_PUSHDOWN.withSupportedSubTypes(FilterPushdownSubType.ALL));
         * This tells Athena your connector can handle simple associative filters, like colA > 10.
         *
         * See athena-federation-sdk/src/main/java/com/amazonaws/athena/connector/lambda/metadata/optimizations/DataSourceOptimizations.java for options.
         * Pushdown subtypes are found in athena-federation-sdk/src/main/java/com/amazonaws/athena/connector/lambda/metadata/optimizations/pushdown/
         */
        return GetDataSourceCapabilitiesResponse(request.catalogName, capabilities)
    }
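    // With filter and limit pushdown advertised, Athena forwards simple predicates (e.g. ts_hour
    // ranges) and LIMIT clauses to the connector via the Constraints object, and the record
    // handler is then responsible for honoring them during the read.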
    val logger = LoggerFactory.getLogger(this::class.java)

    companion object {
        /**
         * Used to aid in debugging. Athena will use this name in conjunction with your catalog id
         * to correlate relevant query errors.
         */
        private const val SOURCE_TYPE = "matano_datasize"
    }
}
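
// A minimal sketch of how this handler could be wired up as the Lambda entry point, assuming a
// matching DatasizeRecordHandler exists (hypothetical; not shown in this gist). CompositeHandler
// is the federation SDK's standard way of combining the metadata and record handlers:
//
//   class DatasizeCompositeHandler : CompositeHandler(
//       DatasizeMetadataHandler(System.getenv()),
//       DatasizeRecordHandler(System.getenv())
//   )
//
// Once registered as an Athena data source (the catalog name below is illustrative), the exposed
// table could then be queried like:
//
//   SELECT ts_hour, sum(size) AS bytes
//   FROM "datasize_catalog"."matano_db"."some_table"
//   WHERE ts_hour >= current_timestamp - interval '7' day
//   GROUP BY 1 ORDER BY 1;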