#!/usr/bin/env python3
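"""Join the old and new YouTube-activities Avro drains into one dataset.

Rows from the new drain win on uid collisions (a FULL OUTER JOIN plus
per-column if(...) fallbacks); the result is written as parquet to the
--drain path, partitioned by platform and date.
"""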
import argparse
import os
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext
# CLI: the only argument is the output ("drain") path for the joined dataset.
args_parser = argparse.ArgumentParser(description='YT Activities join')
args_parser.add_argument(
    '--drain',
    type=str,
    required=True,
)
if __name__ == '__main__':
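    # PYSPARK_SUBMIT_ARGS is read when the SparkContext below launches the
    # JVM gateway, so the spark-avro package is fetched from Maven without
    # an explicit spark-submit --packages flag.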
    packages_args = '--packages com.databricks:spark-avro_2.10:2.0.1'
    os.environ['PYSPARK_SUBMIT_ARGS'] = '{} pyspark-shell'.format(
        packages_args
    )
    conf = SparkConf().setAppName('Join YT activities')
    # Read Avro files even when they lack the .avro extension.
    conf.set('spark.hadoop.avro.mapred.ignore.inputs.without.extension', 'false')
    sc = SparkContext(conf=conf)
    hive = HiveContext(sc)
    args = args_parser.parse_args()
    drain = args.drain
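    # Load the previous and the current Avro drains from S3.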
    # The *_old directory is assumed to hold the previous drain.
    df_old = hive.read.option('mergeSchema', 'true') \
        .format('com.databricks.spark.avro') \
        .load('s3a://tubular-cassiopeia/tasks/cassandra2hive/cassiopeia-avro-drain/youtube_activities_old/')
    df_new = hive.read.option('mergeSchema', 'true') \
        .format('com.databricks.spark.avro') \
        .load('s3a://tubular-cassiopeia/tasks/cassandra2hive/cassiopeia-avro-drain/youtube_activities/')
    df_old.registerTempTable('old_dataframe')
    df_new.registerTempTable('new_dataframe')
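    # Full outer join on uid, with new_dataframe taking precedence: each
    # if(new_dataframe.uid IS NULL, old, new) falls back to the old row's
    # value only when the uid is absent from the new drain.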
    df_joined = hive.sql(
        """
        SELECT
            if(new_dataframe.uid IS NULL, old_dataframe.uid, new_dataframe.uid) as uid,
            if(new_dataframe.uid IS NULL, old_dataframe.activity, new_dataframe.activity) as activity,
            if(new_dataframe.uid IS NULL, old_dataframe.author_id, new_dataframe.author_id) as author_id,
            if(new_dataframe.uid IS NULL, old_dataframe.channel_id, new_dataframe.channel_id) as channel_id,
            if(new_dataframe.uid IS NULL, old_dataframe.comment_id, new_dataframe.comment_id) as comment_id,
            if(new_dataframe.uid IS NULL, unix_timestamp(old_dataframe.created_at), unix_timestamp(new_dataframe.created_at)) as created_at,
            if(new_dataframe.uid IS NULL, to_date(old_dataframe.created_at), to_date(new_dataframe.created_at)) as date,
            if(new_dataframe.uid IS NULL, old_dataframe.video_id, new_dataframe.video_id) as video_id,
            'youtube' as platform,
            ceil(rand(2) * 100) % 5 as rnd
        FROM new_dataframe
        FULL OUTER JOIN old_dataframe ON new_dataframe.uid = old_dataframe.uid
        """
    )
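    # rnd takes values 0-4 and splits each (platform, date) partition across
    # five shuffle buckets, so no single task has to write a whole day's
    # data; it is dropped again before the final write.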
    df_joined.repartition(3600, 'platform', 'date', 'rnd') \
        .drop('rnd') \
        .write.parquet(path=drain, partitionBy=['platform', 'date'])
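# A minimal sketch of how the script might be launched (the filename and
# output path below are assumptions, not from the original gist):
#
#   python join_yt_activities.py \
#       --drain s3a://tubular-cassiopeia/tasks/cassandra2hive/youtube_activities_joined/
#
# Because PYSPARK_SUBMIT_ARGS is set before the SparkContext starts, plain
# `python` suffices as long as pyspark is importable; the spark-avro package
# is then pulled in automatically when the JVM gateway launches.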