Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save vaquarkhan/374d9e8d83bfe5518215d65bf240a50b to your computer and use it in GitHub Desktop.
Save vaquarkhan/374d9e8d83bfe5518215d65bf240a50b to your computer and use it in GitHub Desktop.
Use Hadoop (Spark + s3-dist-cp on EMR) to compare two data tables on S3 and write out the differences.
import os
import subprocess
import sys

# Point PySpark at the EMR Spark installation.
os.environ["SPARK_HOME"] = r"/usr/lib/spark"

# Make the Spark Python bindings (and bundled py4j) importable.
sys.path.extend([
    r'/usr/lib/spark/python/',
    r'/usr/lib/spark/python/lib/py4j-src.zip',
])

from pyspark import SparkContext
from pyspark.sql import SQLContext
def main():
    """Diff two GLAD-alert CSV extracts on S3 and write adds/deletes back to S3.

    Reads the old and new summary tables, computes the rows present in only
    one of them, writes each difference set to a temp S3 prefix, concatenates
    the Hadoop part files into one object with s3-dist-cp, then deletes the
    temp prefix.
    """
    sc = SparkContext()
    sqlContext = SQLContext(sc)
    try:
        # print() with a single argument is valid in both Python 2 and 3
        # (the original `print 'x'` statements were Python-2-only syntax).
        print('reading in dataframes')
        old_df = sqlContext.read.csv('s3n://gfw2-data/alerts-tsv/temp/20171012_extract.csv',
                                     sep=',', header=True, inferSchema=True)
        new_df = sqlContext.read.csv('s3n://gfw2-data/alerts-tsv/temp/output-glad-summary-20171018/part-',
                                     sep=',', header=True, inferSchema=True)

        print('doing subtraction')
        # Rows present only in the old table are deletes; rows present only
        # in the new table are adds.
        to_remove = old_df.subtract(new_df)
        to_add = new_df.subtract(old_df)

        for df, label in zip((to_remove, to_add), ('deletes', 'adds')):
            print('starting {}'.format(label))
            s3_temp_dir = r's3://gfw2-data/alerts-tsv/temp/output-glad-diffs-{}/'.format(label)
            s3_dest_dir = r's3://gfw2-data/alerts-tsv/temp/output-glad-summary-diffs-{}/'.format(label)

            # write hadoop output in parts to S3
            df.write.csv(s3_temp_dir)

            # group all output files into one
            s3_cmd = ['s3-dist-cp', '--src', s3_temp_dir, '--dest', s3_dest_dir,
                      '--groupBy', '.*(part-r*).*']
            subprocess.check_call(s3_cmd)

            # remove the temp directory
            remove_temp_cmd = ['aws', 's3', 'rm', s3_temp_dir, '--recursive']
            subprocess.check_call(remove_temp_cmd)
    finally:
        # Always release the Spark context, even if a read/write/CLI step fails
        # (the original leaked it on any exception).
        sc.stop()
# Run only when executed as a script (e.g. via spark-submit), not on import.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment