@jobliz
Created June 19, 2018 00:13
Running PySpark commands from a list
from typing import List
from pyspark import SparkContext
from pyspark.sql import SparkSession

commands = [
    ['read'],
    ['option', ("inferSchema", "true")],
    ['option', ("header", "true")],
    ['option', ("dateFormat", "dd/MM/yyyy H:m")],
    ['option', ("timestampFormat", "dd/MM/yyyy H:m")],
    ['csv', ("retail.csv",)]
]
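
# For reference, the list above encodes the equivalent chained call:
# spark.read.option("inferSchema", "true").option("header", "true")
#      .option("dateFormat", "dd/MM/yyyy H:m")
#      .option("timestampFormat", "dd/MM/yyyy H:m").csv("retail.csv")
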
context = SparkContext()
spark = SparkSession(context)
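# spark.sql.shuffle.partitions defaults to 200; lowered here, presumably
# because the example targets a small local dataset.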
spark.conf.set("spark.sql.shuffle.partitions", "5")


def execute_commands(ss: SparkSession, pairs: List[list]):
    """
    Executes commands in a SparkSession from a command list.
    Currently assumes that the first command doesn't receive parameters.
    TODO: Other starters that are not 'read' might receive parameters.
    TODO: Other commands that are not 'option' might receive more than two parameters.
    """
    root = None
    for pair in pairs:
        if root is None:
            # First command: plain attribute access (e.g. spark.read).
            root = getattr(ss, pair[0])
        else:
            method = getattr(root, pair[0])
            if len(pair[1]) == 1:
                root = method(pair[1][0])
            elif len(pair[1]) == 2:
                root = method(pair[1][0], pair[1][1])
            else:
                raise ValueError("Each command must receive 1 or 2 parameters")
    return root


print(execute_commands(spark, commands))
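

# --- A hedged sketch addressing the two TODOs above ---
# Assumes every command is [name] or [name, args_tuple]; arguments are
# unpacked with *, so any arity works, including for the first command.
# 'execute_commands_v2' is a hypothetical name, not part of the original gist.
def execute_commands_v2(ss: SparkSession, cmds: List[list]):
    target = ss
    for name, *rest in cmds:
        attr = getattr(target, name)
        # Bare commands like ['read'] are plain attribute access;
        # ['option', (...)]-style commands are called with unpacked args.
        target = attr(*rest[0]) if rest else attr
    return target

# Under those assumptions, this would produce the same DataFrame as above:
# print(execute_commands_v2(spark, commands))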