@luminousmen
Created August 1, 2019 18:34
Big Data file formats: a small PySpark benchmark comparing CSV, JSON, Avro and Parquet
#!/bin/bash
# Run the benchmark: repeat the chosen operation for each file format END times.
END=3
FUNC="stats"
for ((i = 1; i <= END; i++)); do
    for fmt in csv json avro parquet; do
        spark-submit --packages org.apache.spark:spark-avro_2.11:2.4.0 script.py "$fmt" "$FUNC"
    done
done
# Usage: spark-submit --packages org.apache.spark:spark-avro_2.11:2.4.0 script.py <fmt> <op>
# Dataset: http://academictorrents.com/details/9b13183dc4d60676b773c9e2cd6de5e5542cee9a
import time
import argparse

from pyspark.sql import SparkSession
def groupby(sdf):
    # groupBy().count() is a lazy transformation; collect() forces execution
    # so the timing in the __main__ block measures real work.
    return sdf.groupBy("rating").count().collect()

def stats(sdf, field="rating"):
    # Each agg() is lazy as well; collect() triggers the actual computation.
    return (
        sdf.agg({field: "max"}).collect(),
        sdf.agg({field: "min"}).collect(),
        sdf.agg({field: "count"}).collect(),
    )

def random_batch(sdf):
    # Sample roughly 5% of the rows without replacement and pull them to the driver.
    return sdf.sample(False, 0.05).collect()

def distinct(sdf):
    return sdf.distinct().count()

def filtering(sdf, date="2005-11-15"):
    # Plain string comparison works here because the dates are in ISO format.
    return sdf.filter(sdf.date > date).count()

def get_op(op):
    # Map the operation name given on the command line to its implementation.
    return {
        "stats": stats,
        "random_batch": random_batch,
        "distinct": distinct,
        "filtering": filtering,
        "groupby": groupby,
    }.get(op)
def read(fmt):
    # Only CSV needs the header option; the other formats are self-describing.
    if fmt == "json":
        sdf = spark.read.json("netflix.json")
    elif fmt == "csv":
        sdf = spark.read.option("header", "true").csv("netflix.csv")
    elif fmt == "avro":
        sdf = spark.read.format("avro").load("netflix.avro")
    elif fmt == "parquet":
        sdf = spark.read.parquet("netflix.parquet")
    else:
        raise ValueError("unknown format: {}".format(fmt))
    return sdf
def write(sdf, fmt, name="test"):
    # Rename the default CSV column names (_c0, _c1, _c2) to meaningful ones.
    sdf = sdf.withColumnRenamed("_c0", "user_id") \
             .withColumnRenamed("_c1", "rating") \
             .withColumnRenamed("_c2", "date")
    if fmt == "json":
        sdf.write.json("{}.json".format(name))
    elif fmt == "csv":
        sdf.write.option("header", "true").csv("{}.csv".format(name))
    elif fmt == "avro":
        sdf.write.format("avro").save("{}.avro".format(name))
    elif fmt == "parquet":
        sdf.write.parquet("{}.parquet".format(name))
def mute_spark_logs(sc):
    """Mute Spark info logging via the JVM log4j bridge."""
    logger = sc._jvm.org.apache.log4j
    logger.LogManager.getLogger("org").setLevel(logger.Level.ERROR)
    logger.LogManager.getLogger("akka").setLevel(logger.Level.ERROR)
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("fmt", type=str, help="csv, json, avro or parquet")
    parser.add_argument("op", type=str, help="stats, random_batch, distinct, filtering or groupby")
    args = parser.parse_args()

    spark = SparkSession.builder.master("local[*]").getOrCreate()
    mute_spark_logs(spark.sparkContext)

    sdf = read(args.fmt)

    # Spark evaluates lazily, so the file scan itself happens when the
    # operation's action runs; the timing below therefore includes read cost.
    start = time.time()
    get_op(args.op)(sdf)
    print("{}: {}".format(args.fmt, time.time() - start))
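The benchmark assumes netflix.json, netflix.avro and netflix.parquet already exist next to netflix.csv, but nothing above creates them; write() is defined yet never called. Below is a minimal one-off conversion sketch, assuming the Python file above is saved as script.py and that the raw CSV has no header row (which is what the _c0/_c1/_c2 renames in write() imply). The file name prepare.py is hypothetical.

# prepare.py -- hypothetical one-off conversion script; run it once with:
#   spark-submit --packages org.apache.spark:spark-avro_2.11:2.4.0 prepare.py
from pyspark.sql import SparkSession

from script import write  # the benchmark file above, assumed saved as script.py

spark = SparkSession.builder.master("local[*]").getOrCreate()

# Read the raw CSV without a header (columns arrive as _c0, _c1, _c2);
# write() renames them and emits netflix.<fmt> for each remaining format.
sdf = spark.read.csv("netflix.csv")
for fmt in ("json", "avro", "parquet"):
    write(sdf, fmt, name="netflix")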