Skip to content

Instantly share code, notes, and snippets.

View qi-qi's full-sized avatar

Qi Qi qi-qi

View GitHub Profile
@qi-qi
qi-qi / emr-master
Last active March 11, 2019 15:42
emr-master
# Submit the assembled batch job to YARN on EMR in cluster deploy mode; the driver
# runs on the cluster, with entry point class "Main" from the jar below.
# NOTE(review): jar lives in a dev bucket (acast-data-dev) — confirm before prod use.
spark-submit --master yarn --deploy-mode cluster --class Main s3://acast-data-dev/batch_scala_s3_2.11-0.1.jar
// sbt build definition for the Spark batch job.
// Scala 2.11 matches the Spark 2.4.0 runtime shipped with EMR 5.21.0.
name := "batch_scala"
version := "0.1"
scalaVersion := "2.11.12"
// EMR-hosted Maven repository so resolved artifact versions line up with the
// EMR 5.21.0 cluster runtime.
resolvers += "EMR Maven Repository" at "https://s3.eu-west-1.amazonaws.com/eu-west-1-emr-artifacts/emr-5.21.0/repos/maven/"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "2.4.0",
// NOTE(review): snippet is truncated here — the rest of the dependency list
// (presumably spark-sql etc.) and the closing ")" are not visible in this paste.
// Fixture DataFrame of (id, dt) pairs containing deliberate duplicates
// (("A","2018-03-01") and ("B","2018-03-01") each appear twice), presumably to
// exercise de-duplication / grouping logic — TODO confirm against the caller.
// NOTE(review): relies on java.sql.Date and an active SparkSession's
// spark.implicits._ (for .toDF) being in scope — neither is visible here.
// Row order is meaningful only until a shuffle; do not reorder the literals.
val sample = Seq(
("A", Date.valueOf("2018-03-01")),
("A", Date.valueOf("2018-03-02")),
("A", Date.valueOf("2018-03-01")),
("B", Date.valueOf("2018-03-01")),
("B", Date.valueOf("2018-03-01")),
("C", Date.valueOf("2018-03-01")),
("C", Date.valueOf("2018-03-02")),
("C", Date.valueOf("2018-03-03")))
.toDF("id", "dt")
# Queue a Spark step on an existing EMR cluster (j-2LBN2MDG76KPW): the
# extract-requests job for run date 2019-03-01 (last Args element), sized at
# 20 executors x 5 cores x 12g. ActionOnFailure=CONTINUE keeps the cluster and
# any later steps running if this step fails.
aws emr add-steps --cluster-id j-2LBN2MDG76KPW --steps Type=Spark,Name="data-job-extract-requests",ActionOnFailure=CONTINUE,Args=[--master,yarn,--deploy-mode,cluster,--driver-memory,12g,--num-executors,20,--executor-cores,5,--executor-memory,12g,--class,com.acast.data.job.extract.requests.Main,s3://acast-data-job/extract/data-job-extract-requests/data-job-extract-requests_2.11-1.0.jar,2019-03-01]
@qi-qi
qi-qi / athena-array-join.sql
Created April 7, 2019 16:29
athena-array-join
-- Flatten the rt_filters array of (k, v) structs into a single comma-separated
-- "k_v" string, and keep only rows whose flattened filters contain bots_false
-- followed by uablacklist_true. The flattening expression is computed once in a
-- derived table instead of being repeated verbatim in the WHERE clause
-- (Presto/Athena cannot reference a SELECT alias in WHERE directly).
SELECT ua_source, rt
FROM (
  SELECT
    ua_source,
    array_join(
      transform(rt_filters, x -> concat(x.k, '_', cast(x.v AS varchar))),
      ','
    ) AS rt
  FROM "data_extract"."batch_parquet"
) AS flattened
WHERE rt LIKE '%bots_false%uablacklist_true%'
LIMIT 100
-- Join each session row to its IAB index row on the full (session_id, dt, h)
-- key, restricted to the 2019-02-02 / hour-0 partition on both sides (dt and h
-- look like partition columns, so the predicates also prune partitions).
-- LIMIT without ORDER BY: which 100 rows come back is arbitrary.
SELECT *
FROM "data_transform"."session_parquet" AS s
INNER JOIN "data_transform"."index_iab_parquet" AS i
  ON  s.session_id = i.session_id
  AND s.dt = i.dt
  AND s.h  = i.h
WHERE s.dt = '2019-02-02'
  AND i.h  = 0
LIMIT 100;
// Spark batch job entry point. Visible portion: load the raw request batch for
// runDate (plus, per the helper's name, some extra/look-back dates), then pipe
// it through filter -> de-dup -> batch-id transforms.
object Job {
// Reuses the session already started by the driver; SparkSession.active throws
// if no session exists, so this assumes the caller has initialized Spark.
private lazy val spark = SparkSession.active
import spark.implicits._
// NOTE(review): snippet is truncated — dstPathBatch and dstPathAd are unused in
// the visible lines; presumably the writes happen further down. Also, the
// procedure syntax `def run(...) { ... }` (no `: Unit =`) is deprecated in
// modern Scala; worth changing to `def run(...): Unit = { ... }` at the source.
def run(srcPath: String, dstPathBatch: String, dstPathAd: String, runDate: LocalDate) {
val batchDf = loadBatchExtraDates(srcPath, runDate)
.transform(Util.filterByEventTimestamp(runDate))
.transform(Util.removeDuplicates)
.transform(Util.makeBatchId)
# Mirror one day's ad_parquet partition from S3 into the current directory,
# skipping the zero-byte "*folder*" marker objects some tools leave in S3.
aws s3 sync s3://acast-data-extract-requests/ad_parquet/dt=2019-03-01/ ./dt=2019-03-01/ --exclude="*/*folder*"
-- Athena CTAS: materialize selected CloudFront stitch columns as Snappy-
-- compressed Parquet at a fixed external S3 location, partitioned by dt and
-- bucketed 25 ways on request_id. The partition column (dt) stays last in the
-- SELECT list, as Athena CTAS requires.
CREATE TABLE data_extract.stitch_parquet
WITH (
  external_location   = 's3://acast-data-extract-cloudfront/stitch_parquet/',
  format              = 'parquet',
  parquet_compression = 'SNAPPY',
  partitioned_by      = ARRAY['dt'],
  bucketed_by         = ARRAY['request_id'],
  bucket_count        = 25
) AS
SELECT request_id, bytes, edge_location, dt
FROM data_raw.cf_stitch
@qi-qi
qi-qi / firehose-s3-prefix.txt
Last active April 17, 2019 18:27
AWS Firehose Prefix
# Switch a Firehose S3 destination to custom time-based prefixes
# (dt=YYYY-MM-DD/h=HH style), with failed records routed under a
# processing-failed/ prefix that also records the error type.
# Quoting note: the '-' characters inside !{timestamp:yyyy'-'MM'-'dd} must reach
# Firehose as literal single quotes (Java date-format escaping), hence the
# shell-level '...'-'...' concatenation.
# The version id and destination id below come from describe-delivery-stream.
aws firehose update-destination --delivery-stream-name 'test' --current-delivery-stream-version-id '7' --destination-id 'destinationId-000000000001' --extended-s3-destination-update '{"Prefix":"test_hose/dt=!{timestamp:yyyy'-'MM'-'dd}/h=!{timestamp:HH}/","ErrorOutputPrefix":"test_hose/processing-failed/dt=!{timestamp:yyyy'-'MM'-'dd}/h=!{timestamp:HH}/!{firehose:error-output-type}"}'
# Same update for the production stream (note hh= instead of h= here).
aws firehose update-destination --delivery-stream-name 'requests-batch-all' --current-delivery-stream-version-id '16' --destination-id 'destinationId-000000000001' --extended-s3-destination-update '{"Prefix":"dt=!{timestamp:yyyy'-'MM'-'dd}/hh=!{timestamp:HH}/","ErrorOutputPrefix":"processing-failed/dt=!{timestamp:yyyy'-'MM'-'dd}/hh=!{timestamp:HH}/!{firehose:error-output-type}"}'
# Inspect current destination config (source of the version/destination ids above).
aws firehose describe-delivery-stream --delivery-stream-name test
https://docs.aws.amazon.com/firehose/latest/dev/s3-prefixes.html
https://aws.amazon.com/about-aws/whats-new/2019/02/amazon-kinesis-data-firehose-announces-support-for-custom-amazon-s3-prefixes/