@jasonnerothin · Created September 24, 2019
Read parquet, apply simple transformations, and write parquet using Spark SQL
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from random import randint, seed

input_file = '/tmp/input.snappy.parquet'
output_file = '/tmp/output.snappy.parquet'

spark = SparkSession(SparkContext('local', 'make-recs-application'))


class ReadRewriteParquet:

    def __init__(self):
        self.id_ctr = 0

    def ids(self):
        """Read the input parquet, print its row count, and return a transformed DataFrame."""
        df = spark.read.parquet(input_file)
        cnt = df.count()
        print(cnt)
        # Note: randint() is evaluated once here, so every row gets the same constant "id".
        return df. \
            selectExpr("id as patientId"). \
            withColumn("amount", lit(40)). \
            withColumn("id", lit(randint(0, 1112)))
        # sort("]", ascending=True). \

    def make_record(self, patient_id):
        """Build a single [id, patientId, amount] record and advance the counter."""
        x = self.id_ctr
        record = [x, patient_id, 40.0]
        self.id_ctr = self.id_ctr + 1
        print(record)
        return record


if __name__ == '__main__':
    seed()
    f = ReadRewriteParquet()
    id_df = f.ids()
    id_df.write.parquet(output_file, compression='snappy')
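To sanity-check the result, the output can be read back with the same session. The snippet below is a minimal sketch, assuming the `spark` session and `output_file` path defined in the script above; it reloads the written parquet and prints its schema and row count.

# Verification sketch: reload the parquet written above and inspect it.
out_df = spark.read.parquet(output_file)
out_df.printSchema()            # expect columns: patientId, amount, id
print(out_df.count())           # should match the count printed by ids()
out_df.show(5, truncate=False)  # peek at a few rows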