Skip to content

Instantly share code, notes, and snippets.

@prakashrd
Last active June 29, 2017 01:59
Show Gist options
  • Save prakashrd/0d480f999f80e4bc5f0f02727597cac3 to your computer and use it in GitHub Desktop.
Save prakashrd/0d480f999f80e4bc5f0f02727597cac3 to your computer and use it in GitHub Desktop.
Convert a CSV file to Parquet
# A simple script to convert a traffic CSV to a Parquet file. Demonstrates reading CSV into a DataFrame, writing Parquet, and applying the model's default values to existing and missing columns.
import argparse
import json

from pyspark.sql import SparkSession
from pyspark.sql.functions import when, lit, col, udf
# Import data types
from pyspark.sql.types import *
def convert_csv_to_parquet(spark_context, custom_schema, csv_file, parquet_file, model=None):
    """Convert a tab-separated CSV file to a Parquet file.

    For every column that appears in both the CSV and the model, null
    values are replaced with the model's default. Columns that exist
    only in the model are appended as constant default-valued columns.

    spark_context: active SparkSession used for reading and writing
    custom_schema: StructType describing the source csv file
    csv_file: path to the source tab-separated csv file
    parquet_file: destination parquet file path
    model: optional mapping of column name -> default value; when None,
           falls back to the module-level ``traffic_model`` loaded from
           model.json in ``__main__`` (preserves the original behavior)
    """
    if model is None:
        # Backward-compatible fallback to the global used by the script.
        model = traffic_model
    df = spark_context.read.csv(csv_file, sep="\t", schema=custom_schema)
    df_columns = df.columns
    # Fill nulls in columns the CSV shares with the model.
    fill_defaults = {c: model[c] for c in df_columns if c in model}
    df = df.na.fill(fill_defaults)
    # Columns present only in the model are added with a constant default.
    for column in model:
        if column not in df_columns:
            df = df.withColumn(column, lit(model[column]))
    df.write.parquet(parquet_file)
def get_command_line_opts():
    """Parse command-line options for the conversion script.

    Returns ``[source_csv, target_parquet]``. The target is required
    because the script unconditionally writes to it; previously it was
    optional and a missing ``-t`` produced ``write.parquet(None)``.
    """
    p = argparse.ArgumentParser(
        description='Specify the source traffic CSV and target Parquet file')
    p.add_argument('-s', '--source_csv', help='source traffic csv file',
                   required=True, type=str)
    # Help text previously described "country codes" — a copy-paste error.
    p.add_argument('-t', '--target_parquet', help='destination parquet file path',
                   required=True, type=str)
    args = p.parse_args()
    return [args.source_csv, args.target_parquet]
def get_traffic_schema():
    """Build the StructType schema of the input traffic csv file."""
    # (column name, spark type, nullable), in source-file column order.
    spec = [
        ('publisher_id', IntegerType(), True),
        ('prev_publisher_id', IntegerType(), True),
        ('publication_id', IntegerType(), True),
        ('prev_publication_id', IntegerType(), True),
        ('section_id', IntegerType(), True),
        ('network_id', IntegerType(), True),
        ('cat1_id', IntegerType(), True),
        ('cat2_id', IntegerType(), True),
        ('cat3_id', IntegerType(), True),
        ('visitor_id', StringType(), True),
        ('visit_id', StringType(), True),
        ('xvisit_id', StringType(), True),
        ('country', IntegerType(), True),
        ('state', StringType(), True),
        ('duration', IntegerType(), True),
        ('xduration', IntegerType(), True),
        ('new_visitor', IntegerType(), True),
        ('is_mobile', IntegerType(), True),
        ('mobile_vendor', StringType(), True),
        ('mobile_handset', StringType(), True),
        ('browser_vendor', StringType(), True),
        ('browser_version', StringType(), True),
        ('os', StringType(), True),
        ('platform', StringType(), True),
        ('connection', StringType(), True),
        ('page_domain', StringType(), True),
        ('ref_domain', StringType(), True),
        ('page_url', StringType(), True),
        ('ad_slot_id', StringType(), True),
        ('hour', IntegerType(), True),
        ('plugin_partner', StringType(), False),
        ('postal_code', StringType(), False),
        ('ip', StringType(), True),
        ('agent', StringType(), True),
        ('referrer', StringType(), True),
        ('timestamp', IntegerType(), True),
        ('timezone', StringType(), True),
        ('is_panel', IntegerType(), True),
        ('time_index', IntegerType(), True),
        ('region', IntegerType(), True),
    ]
    return StructType([StructField(name, dtype, nullable)
                       for name, dtype, nullable in spec])
if __name__ == "__main__":
    # Parse CLI args, load the column-default model, then convert.
    source_csv, target_parquet = get_command_line_opts()
    # NOTE: traffic_model must remain module-level; convert_csv_to_parquet
    # reads it as a global.
    with open("model.json") as fh:
        traffic_model = json.load(fh)
    spark = (
        SparkSession
        .builder
        .appName("Python spark Traffic CSV to Parquet")
        .getOrCreate()
    )
    convert_csv_to_parquet(spark, get_traffic_schema(), source_csv, target_parquet)
    spark.stop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment