@breath103
Last active February 23, 2018 08:34
CloudFront logs Athena table partition indexer
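The code below is split across three files: an Athena query helper (athena.ts), a Lambda handler triggered by S3 "object created" events whenever CloudFront delivers a new access-log file, and the typings for that S3 event (s3_event.ts). For each record, the handler copies the log object into an hour-partitioned logs/<distributionId>/<year>/<month>/<day>/<hour>/ prefix and registers that prefix as a partition of the cloudfront_logs table with ALTER TABLE ... ADD PARTITION, so new logs become queryable in Athena as they arrive.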
// athena.ts
import * as AWS from "aws-sdk";

// Athena Helper
export async function executeAthenaQuery(query: string | string[], outputLocation: string) {
  const athena = new AWS.Athena();

  let queryString: string;
  if (typeof query === "string") {
    queryString = query;
  } else {
    queryString = query.join("\n");
  }

  const queryExecution = await athena.startQueryExecution({
    QueryString: queryString,
    ResultConfiguration: { OutputLocation: outputLocation },
  }).promise();

  while (true) {
    const latestExecution =
      await athena.getQueryExecution({ QueryExecutionId: queryExecution.QueryExecutionId! }).promise();

    await new Promise((resolve, reject) => {
      setTimeout(resolve, 1000);
    });

    switch (latestExecution.QueryExecution!.Status!.State) {
      case "FAILED": {
        throw new Error(`Execution failed. ${latestExecution.QueryExecution!.QueryExecutionId}`);
      }
      case "CANCELLED": {
        throw new Error(`Execution cancelled. ${latestExecution.QueryExecution!.QueryExecutionId}`);
      }
      case "QUEUED":
      case "RUNNING": break;
      case "SUCCEEDED": {
        return latestExecution;
      }
    }
  }
}
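
// Usage sketch (not part of the original gist): the helper accepts any Athena
// statement. The table name and output location below are placeholders.
export async function listCloudfrontLogPartitions(): Promise<void> {
  const execution = await executeAthenaQuery(
    ["SHOW PARTITIONS cloudfront_logs"], // table created by the handler below
    "s3://my-log-bucket/athena/",        // hypothetical query-result location
  );
  console.log(`Succeeded: ${execution.QueryExecution!.QueryExecutionId}`);
}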
// Lambda handler (file name not shown in the gist)
import * as AWS from "aws-sdk";
import HandlerWrapper from "../handler_wrapper";
import { executeAthenaQuery } from "./athena";
import { S3Event } from "./s3_event";
import * as debug from "debug";

const log = debug("HANDLER");
export async function __handler(event: S3Event) {
  log(`Received Total: ${event.Records.length} records`);

  try {
    const logS3Bucket: string = (() => {
      const raw = process.env.CLOUDFRONT_LOG_S3_BUCKET;
      if (typeof raw !== "string") {
        throw new Error(`CLOUDFRONT_LOG_S3_BUCKET must be provided`);
      }
      return raw;
    })();

    const athenaOutputLocation = `s3://${logS3Bucket}/athena/`;
    const s3RootLocation = `s3://${logS3Bucket}/log/`;
    const tableName = "cloudfront_logs";

    const resetTable = false;
    if (resetTable) {
      // tslint:disable
      // Drop Athena Table
      await executeAthenaQuery([`DROP TABLE IF EXISTS ${tableName}`], athenaOutputLocation);
      // Create Table
      await executeAthenaQuery(
        `CREATE EXTERNAL TABLE ${tableName}(
          \`date\` date COMMENT '',
          time string COMMENT '',
          location string COMMENT '',
          bytes bigint COMMENT '',
          requestip string COMMENT '',
          method string COMMENT '',
          host string COMMENT '',
          uri string COMMENT '',
          status int COMMENT '',
          referrer string COMMENT '',
          useragent string COMMENT '',
          querystring string COMMENT '',
          cookie string COMMENT '',
          resulttype string COMMENT '',
          requestid string COMMENT '',
          hostheader string COMMENT '',
          requestprotocol string COMMENT '',
          requestbytes bigint COMMENT '',
          timetaken double COMMENT '',
          xforwardedfor string COMMENT '',
          sslprotocol string COMMENT '',
          sslcipher string COMMENT '',
          responseresulttype string COMMENT '',
          protocol_version string COMMENT '',
          fle_status string COMMENT '',
          fle_encrypted_fields string COMMENT ''
        )
        PARTITIONED BY (
          distributionId string, year string, month string, day string, hour string
        )
        ROW FORMAT SERDE
          'org.apache.hadoop.hive.serde2.RegexSerDe'
        WITH SERDEPROPERTIES (
'input.regex'='^(?!#)([^ \\t]+)\\s+([^ \\t]+)\\s+([^ \\t]+)\\s+([^ \\t]+)\\s+([^ \\t]+)\\s+([^ \\t]+)\\s+([^ \\t]+)\\s+([^ \\t]+)\\s+([^ \\t]+)\\s+([^ \\t]+)\\s+([^ \\t]+)\\s+([^ \\t]+)\\s+([^ \\t]+)\\s+([^ \\t]+)\\s+([^ \\t]+)\\s+([^ \\t]+)\\s+([^ \\t]+)\\s+([^ \\t]+)\\s+([^ \\t]+)\\s+([^ \\t]+)\\s+([^ \\t]+)\\s+([^ \\t]+)\\s+([^ \\t]+)\\s+([^ \\t]+)\\s+([^ \\t]+)\\s+([^ \\t]+)$')
        STORED AS INPUTFORMAT
          'org.apache.hadoop.mapred.TextInputFormat'
        OUTPUTFORMAT
          'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
        LOCATION
          '${s3RootLocation}'
        `, athenaOutputLocation);
      // tslint:enable
    }

    const s3 = new AWS.S3();

    for (const record of event.Records) {
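      // CloudFront names each log object "<distributionId>.<YYYY-MM-DD-HH>.<uniqueId>.gz"
      // (optionally under a prefix), e.g. "E2EXAMPLE123.2018-02-23-08.a1b2c3d4.gz";
      // the splits below pull the partition values out of that file name.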
      const key = record.s3.object.key;
      const pathComponents = key.split("/");
      const [distributionId, date, hash] = pathComponents[pathComponents.length - 1].split(".");
      const [year, month, day, hour] = date.split("-");

      log(`Processing ${record.s3.bucket.name}/${record.s3.object.key}`);

      // First, copy the log file into its partitioned folder
      const newKeyFolder = `logs/${distributionId}/${year}/${month}/${day}/${hour}/`;
      const newKey = `${newKeyFolder}${hash}.gz`;
      await s3.copyObject({
        Bucket: record.s3.bucket.name,
        Key: newKey,
        CopySource: `/${record.s3.bucket.name}/${record.s3.object.key}`,
      }).promise();

      // And register the folder as a partition
      await executeAthenaQuery([
        `ALTER TABLE ${tableName}`,
        // tslint:disable-next-line
        `ADD IF NOT EXISTS PARTITION (distributionId="${distributionId}", year="${year}", month="${month}", day="${day}", hour="${hour}")`,
        `LOCATION 's3://${record.s3.bucket.name}/${newKeyFolder}'`,
      ], athenaOutputLocation);
    }
  } catch (e) {
    log("Error : ", e);
    throw e;
  }
}
export const handler = HandlerWrapper.safelyWrap(__handler);
// s3_event.ts
// tslint:disable
export interface S3Event {
  Records: S3EventRecord[];
}

export interface S3EventRecord {
  eventVersion: string;
  eventSource: string;
  awsRegion: string;
  eventTime: string; // e.g. 2017-04-11T11:29:00.611Z
  eventName: string;
  userIdentity: {
    principalId: string;
  };
  requestParameters: {
    sourceIPAddress: string;
  };
  responseElements: {
    ["x-amz-request-id"]: string;
    ["x-amz-id-2"]: string;
  };
  s3: {
    s3SchemaVersion: string;
    configurationId: string;
    bucket: {
      name: string;
      ownerIdentity: {
        principalId: string;
      };
      arn: string;
    };
    object: {
      key: string;
      size: number;
      eTag: string;
      sequencer: string;
    };
  };
}
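
// Local-testing sketch (not part of the original gist): a minimal event that
// satisfies the interface above. All values are made up; the handler only reads
// s3.bucket.name and s3.object.key from each record.
export const sampleS3Event: S3Event = {
  Records: [
    {
      eventVersion: "2.0",
      eventSource: "aws:s3",
      awsRegion: "us-east-1",
      eventTime: "2018-02-23T08:34:00.000Z",
      eventName: "ObjectCreated:Put",
      userIdentity: { principalId: "EXAMPLE" },
      requestParameters: { sourceIPAddress: "127.0.0.1" },
      responseElements: {
        "x-amz-request-id": "EXAMPLE",
        "x-amz-id-2": "EXAMPLE",
      },
      s3: {
        s3SchemaVersion: "1.0",
        configurationId: "cloudfront-log-created",
        bucket: {
          name: "my-log-bucket",
          ownerIdentity: { principalId: "EXAMPLE" },
          arn: "arn:aws:s3:::my-log-bucket",
        },
        object: {
          key: "E2EXAMPLE123.2018-02-23-08.a1b2c3d4.gz",
          size: 1024,
          eTag: "d41d8cd98f00b204e9800998ecf8427e",
          sequencer: "0055AED6DCD90281E5",
        },
      },
    },
  ],
};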