Last active
June 29, 2017 01:59
-
-
Save prakashrd/0d480f999f80e4bc5f0f02727597cac3 to your computer and use it in GitHub Desktop.
Convert a CSV file to Parquet
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# A simple script to convert a traffic CSV file to a Parquet file. Demonstrates CSV-to-Parquet conversion, the usage of udfs, and applying the default column values from a JSON model.
import argparse
import json

from pyspark.sql import SparkSession
# Import data types
from pyspark.sql.types import *
from pyspark.sql.functions import when, lit, col, udf
def convert_csv_to_parquet(spark_context, custom_schema, csv_file, parquet_file, defaults=None):
    """
    Convert a tab-separated CSV file to a Parquet file, applying default values.

    spark_context: spark instance; its ``read.csv`` is used to load the file
    custom_schema: the schema of the source csv file
    csv_file: the source csv_file (read with a tab separator)
    parquet_file: destination parquet file
    defaults: optional mapping of column name -> default value. When omitted,
        the module-level ``traffic_model`` is used, which keeps existing
        four-argument callers working unchanged.

    Columns present in both the CSV and the mapping have their nulls filled
    with the mapped value. Columns named in the mapping but absent from the
    CSV are appended as constant columns holding the mapped value.
    """
    if defaults is None:
        # Fall back to the script-level model only when the caller did not
        # supply one, so the function is also usable when imported.
        defaults = traffic_model

    df = spark_context.read.csv(csv_file, sep="\t", schema=custom_schema)
    df_columns = df.columns
    present = set(df_columns)

    # Existing columns: replace missing values with the model's default.
    fill_values = {name: defaults[name] for name in df_columns if name in defaults}
    df = df.na.fill(fill_values)

    # Columns only the model knows about: add them with the default value.
    for name, value in defaults.items():
        if name not in present:
            df = df.withColumn(name, lit(value))

    df.write.parquet(parquet_file)
def get_command_line_opts():
    """
    Parse the script's command-line options.

    Returns a two-element list: [source_csv, target_parquet].
    argparse prints a usage message and exits (SystemExit) when a required
    option is missing.
    """
    parser = argparse.ArgumentParser(
        description='Specify the source traffic CSV and the target Parquet file')
    parser.add_argument('-s', '--source_csv', help='source traffic csv file',
                        required=True, type=str)
    # The target is required as well: without it the script has nowhere to
    # write, so reject the invocation up front instead of failing at write time.
    parser.add_argument('-t', '--target_parquet', help='target parquet file',
                        required=True, type=str)
    args = parser.parse_args()
    return [args.source_csv, args.target_parquet]
def get_traffic_schema():
    """
    Build the schema of the input traffic csv file.

    Returns a StructType with one StructField per CSV column, in file order.
    """
    integer, string = IntegerType, StringType

    # (column name, column type, nullable) in the order the columns appear.
    layout = (
        ('publisher_id', integer, True),
        ('prev_publisher_id', integer, True),
        ('publication_id', integer, True),
        ('prev_publication_id', integer, True),
        ('section_id', integer, True),
        ('network_id', integer, True),
        ('cat1_id', integer, True),
        ('cat2_id', integer, True),
        ('cat3_id', integer, True),
        ('visitor_id', string, True),
        ('visit_id', string, True),
        ('xvisit_id', string, True),
        ('country', integer, True),
        ('state', string, True),
        ('duration', integer, True),
        ('xduration', integer, True),
        ('new_visitor', integer, True),
        ('is_mobile', integer, True),
        ('mobile_vendor', string, True),
        ('mobile_handset', string, True),
        ('browser_vendor', string, True),
        ('browser_version', string, True),
        ('os', string, True),
        ('platform', string, True),
        ('connection', string, True),
        ('page_domain', string, True),
        ('ref_domain', string, True),
        ('page_url', string, True),
        ('ad_slot_id', string, True),
        ('hour', integer, True),
        ('plugin_partner', string, False),
        ('postal_code', string, False),
        ('ip', string, True),
        ('agent', string, True),
        ('referrer', string, True),
        ('timestamp', integer, True),
        ('timezone', string, True),
        ('is_panel', integer, True),
        ('time_index', integer, True),
        ('region', integer, True),
    )

    return StructType([
        StructField(column, kind(), nullable)
        for column, kind, nullable in layout
    ])
if __name__ == "__main__":
    # Script entry point: parse options, load the column defaults, convert.
    source_csv, target_parquet = get_command_line_opts()

    # Column defaults; convert_csv_to_parquet reads this module-level name.
    with open("model.json") as json_data_file:
        traffic_model = json.load(json_data_file)

    spark = SparkSession \
        .builder \
        .appName("Python spark Traffic CSV to Parquet") \
        .getOrCreate()
    try:
        convert_csv_to_parquet(spark, get_traffic_schema(), source_csv, target_parquet)
    finally:
        # Always release the Spark session, even when the conversion fails.
        spark.stop()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment