Skip to content

Instantly share code, notes, and snippets.

View qi-qi's full-sized avatar

Qi Qi qi-qi

View GitHub Profile
@qi-qi
qi-qi / athena-join-parquet.sql
Last active March 8, 2019 10:43
athena-join-parquet.sql
CREATE table test.stats_20190210_20190306 WITH (
bucketed_by = ARRAY['event_ts'],
bucket_count = 10,
format = 'orc',
external_location = 's3://acast-data-dev/stats_20190210_20190306/') AS
with t2 as (select request_id, bytes from test.cf_stitch)
SELECT cast(from_iso8601_timestamp(t1.event_date) AS timestamp) AS event_ts,
t1.best_effort_user_id,
t1.ip,
@qi-qi
qi-qi / emr-5.21
Last active March 8, 2019 13:55
emr-5.21
[hadoop@ip-192-168-47-191 ~]$ sudo find / -iname "*amzn*.jar"
/mnt/var/lib/hadoop-httpfs/tomcat-deployment/webapps/webhdfs/WEB-INF/lib/hadoop-hdfs-client-2.8.5-amzn-1.jar
/mnt/var/lib/hadoop-httpfs/tomcat-deployment/webapps/webhdfs/WEB-INF/lib/hadoop-auth-2.8.5-amzn-1.jar
/mnt/var/lib/hadoop-httpfs/tomcat-deployment/webapps/webhdfs/WEB-INF/lib/hadoop-hdfs-2.8.5-amzn-1.jar
/mnt/var/lib/hadoop-httpfs/tomcat-deployment/webapps/webhdfs/WEB-INF/lib/hadoop-annotations-2.8.5-amzn-1.jar
/mnt/var/lib/hadoop-httpfs/tomcat-deployment/webapps/webhdfs/WEB-INF/lib/hadoop-common-2.8.5-amzn-1.jar
/usr/lib/hadoop-mapreduce/hadoop-mapreduce-client-core-2.8.5-amzn-1.jar
/usr/lib/hadoop-mapreduce/hadoop-rumen-2.8.5-amzn-1.jar
/usr/lib/hadoop-mapreduce/hadoop-distcp-2.8.5-amzn-1.jar
/usr/lib/hadoop-mapreduce/hadoop-azure-2.8.5-amzn-1.jar
// sbt project coordinates for the batch_scala job jar.
name := "batch_scala"
version := "0.1"
// Scala binary version 2.11 — NOTE(review): keep in sync with the _2.11 suffix
// of the jar submitted to EMR (batch_scala_s3_2.11-0.1.jar).
scalaVersion := "2.11.12"
// Extra Maven repository hosted in the eu-west-1 EMR artifacts bucket for
// EMR release 5.21.0.
resolvers += "EMR Maven Repository" at "https://s3.eu-west-1.amazonaws.com/eu-west-1-emr-artifacts/emr-5.21.0/repos/maven/"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "2.4.0",
@qi-qi
qi-qi / emr-master
Last active March 11, 2019 15:42
emr-master
# Submit class Main from the jar on S3 to YARN in cluster deploy mode
# (the Spark driver runs inside the cluster, not on the submitting host).
spark-submit \
    --master yarn \
    --deploy-mode cluster \
    --class Main \
    s3://acast-data-dev/batch_scala_s3_2.11-0.1.jar
// Toy DataFrame with columns (id, dt): ids A and B each contain a repeated
// (id, 2018-03-01) row, and C spans 2018-03-01 through 2018-03-03.
// Requires spark.implicits._ (for toDF) and java.sql.Date in scope.
val sample = Seq(
  "A" -> "2018-03-01",
  "A" -> "2018-03-02",
  "A" -> "2018-03-01",
  "B" -> "2018-03-01",
  "B" -> "2018-03-01",
  "C" -> "2018-03-01",
  "C" -> "2018-03-02",
  "C" -> "2018-03-03"
).map { case (id, day) => (id, Date.valueOf(day)) }
  .toDF("id", "dt")
# Add a Spark step named data-job-extract-requests to EMR cluster j-2LBN2MDG76KPW.
# It runs com.acast.data.job.extract.requests.Main from the given jar in YARN
# cluster mode with the listed driver/executor sizing. The trailing argument
# 2019-03-01 is passed to Main (presumably the run date — confirm).
# ActionOnFailure=CONTINUE keeps the cluster running if this step fails.
#
# The whole --steps value is single-quoted so the shell never treats the
# `Args=[...]` list as a glob pattern. In zsh, for example, an unquoted
# `[...]` can abort the command with "no matches found". Quoting also means
# the AWS CLI shorthand parser receives the text unchanged.
aws emr add-steps \
    --cluster-id j-2LBN2MDG76KPW \
    --steps 'Type=Spark,Name=data-job-extract-requests,ActionOnFailure=CONTINUE,Args=[--master,yarn,--deploy-mode,cluster,--driver-memory,12g,--num-executors,20,--executor-cores,5,--executor-memory,12g,--class,com.acast.data.job.extract.requests.Main,s3://acast-data-job/extract/data-job-extract-requests/data-job-extract-requests_2.11-1.0.jar,2019-03-01]'
-- CTAS: materialise request_id, bytes and edge_location from data_raw.cf_stitch
-- as Snappy-compressed Parquet, partitioned by dt and split into 25 buckets on
-- request_id, under the given S3 prefix.
-- dt is selected last because it is the partition column (Athena CTAS expects
-- partition columns at the end of the SELECT list).
-- NOTE(review): Athena requires external_location to be empty before the CTAS
-- runs — confirm before re-running.
CREATE TABLE data_extract.stitch_parquet
WITH (
    partitioned_by      = ARRAY['dt'],
    bucketed_by         = ARRAY['request_id'],
    bucket_count        = 25,
    format              = 'parquet',
    parquet_compression = 'SNAPPY',
    external_location   = 's3://acast-data-extract-cloudfront/stitch_parquet/'
) AS
SELECT
    request_id,
    bytes,
    edge_location,
    dt
FROM data_raw.cf_stitch
@qi-qi
qi-qi / athena-array-join.sql
Created April 7, 2019 16:29
athena-array-join
-- Flatten each row's rt_filters array of (k, v) entries into one comma-separated
-- string of "k_v" items (rt). Keep rows whose rt contains 'bots_false' followed
-- later by 'uablacklist_true', returning up to 100 of them (no ORDER BY, so
-- which rows come back is arbitrary).
-- NOTE(review): LIKE is a substring match and depends on the element order
-- inside rt_filters — confirm that is intended.
WITH batch_rt AS (
    SELECT
        ua_source,
        array_join(
            transform(rt_filters, x -> concat(x.k, '_', CAST(x.v AS varchar))),
            ','
        ) AS rt
    FROM "data_extract"."batch_parquet"
)
SELECT
    ua_source,
    rt
FROM batch_rt
WHERE rt LIKE '%bots_false%uablacklist_true%'
LIMIT 100
-- Match session rows to index_iab rows on (session_id, dt, h), limited to
-- dt = '2019-02-02' and h = 0. Returns up to 100 rows; there is no ORDER BY,
-- so which rows come back is arbitrary.
SELECT *
FROM "data_transform"."session_parquet" AS sess
INNER JOIN "data_transform"."index_iab_parquet" AS iab
    ON  sess.session_id = iab.session_id
    AND sess.dt = iab.dt
    AND sess.h = iab.h
WHERE sess.dt = '2019-02-02'
  AND iab.h = 0
LIMIT 100;
object Job {
private lazy val spark = SparkSession.active
import spark.implicits._
def run(srcPath: String, dstPathBatch: String, dstPathAd: String, runDate: LocalDate) {
val batchDf = loadBatchExtraDates(srcPath, runDate)
.transform(Util.filterByEventTimestamp(runDate))
.transform(Util.removeDuplicates)
.transform(Util.makeBatchId)