Skip to content

Instantly share code, notes, and snippets.

@szczeles
Created March 31, 2020 13:25
Show Gist options
  • Save szczeles/3207acb04ed9230efd57106b42b03c48 to your computer and use it in GitHub Desktop.
Integration tests of Spark applications
"""Join raw events with ad metadata for a single date partition.

Reads a date-partitioned Parquet dataset of events and a JSON dataset of
ads, joins them on the ad identifier, and writes two Parquet outputs:
events enriched with matching ad rows, and events whose ``ad_id`` has no
match in the ads dataset (the "invalid" records). Intended to be run as
a Spark application via spark-submit.
"""
from pyspark.sql import SparkSession
from argparse import ArgumentParser

# Parse command-line arguments. All flags are required so a missing one
# fails fast with a clear argparse error instead of passing None to Spark.
parser = ArgumentParser(description='Enrich events with ad metadata')
parser.add_argument('--input-events', required=True,
                    help='Events, parquet format')
parser.add_argument('--input-ads', required=True,
                    help='Ads, JSON format')
parser.add_argument('--output-joined', required=True,
                    help='Output location of enriched data')
parser.add_argument('--output-invalid', required=True,
                    help='Invalid data')
parser.add_argument('--dt', required=True,
                    help='Date partition indicator')
args = parser.parse_args()

# Load the data, restricting events to the requested date partition.
spark = SparkSession.builder.getOrCreate()
all_events = spark.read.parquet(args.input_events)
events = all_events.where(all_events.dt == args.dt)
ads = spark.read.json(args.input_ads)

# Save the results. The filtered events feed two joins, so cache them to
# avoid re-reading and re-filtering the source for the second pass; build
# the join condition once and reuse it.
events.cache()
join_cond = events.ad_id == ads.id
events.join(ads, join_cond) \
    .write.parquet(f'{args.output_joined}/dt={args.dt}')
# Left-anti join keeps only events with no matching ad — the invalid data.
events.join(ads, join_cond, 'leftanti') \
    .write.parquet(f'{args.output_invalid}/dt={args.dt}')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment