Skip to content

Instantly share code, notes, and snippets.

@karpanGit
karpanGit / pyspark, local vs global views.py
Created May 3, 2022 19:18
pyspark, local vs global views
spark = (
SparkSession.builder
.appName('learn')
# .config('spark.sql.shuffle.partitions', 10)
# .config('spark.default.parallelism', 10)
# .config('spark.executor.memory', '1g')
# .config('spark.driver.memory', '1g')
# .config('spark.executor.instances', 1)
#.config('spark.executor.cores', 2)
.getOrCreate()
@karpanGit
karpanGit / pyspark, create struct from columns.py
Created May 1, 2022 14:20
pyspark, create struct from columns
# simple example, create struct
import pyspark.sql.functions as F
df = [[1, 'mplah', 'gogo'], [2, 'mplah2', 'gogo2'], [3, 'mplah3', 'gogo3']]
df = spark.createDataFrame(df, schema=['x', 'y', 'z'])
res = df.select(F.col('x'), F.struct(F.col('x').alias('_x'), F.col('y').alias('_y')).alias('_xy'))
res.show()
# | x| _xy|
# +---+-----------+
# | 1| {1, mplah}|
# | 2|{2, mplah2}|
@karpanGit
karpanGit / pyspark, explode array, explode list, collect_list, collect_set.py
Created May 1, 2022 14:17
pyspark, explode array, explode list, collect_list, collect_set
# simple example: explode array
import pyspark.sql.functions as F
df = [[1, 'mplah', ['a', 'b', 'c']], [2, 'mplah2', ['a2', 'b2', 'c2']], [3, 'mplah3', ['a3', 'b3', 'c3', 'd3']]]
df = spark.createDataFrame(df, schema=['x', 'y', 'z'])
df.printSchema()
res = df.select(F.col('x'), F.explode(F.col('z')).alias('z'))
res.show(truncate=False)
res = df.select(F.col('x'), F.posexplode(F.col('z')).alias('z_id', 'z'))
res.show(truncate=False)
@karpanGit
karpanGit / pyspark, from pyspark to json and back.py
Created May 1, 2022 09:38
pyspark, from pyspark to json and back
# from pyspark schema to json and the other way round
import pyspark.sql.types as T
from pprint import pprint
# -- create simple dataframe and schema
df = [[1, 'mplah', ['Panos', 'George'], {'a': 'b', 'c': 'd'}, ('mplip1', 'mplip1_')], [2, 'mplah2', ['Panos2', 'George2'], {'a2': 'b2', 'c2': 'd2'}, ('mplip2', 'mplip2_')] ]
schema = T.StructType([
T.StructField('x1', T.LongType()),
T.StructField('x2', T.StringType()),
T.StructField('x3', T.ArrayType(StringType())),
T.StructField('x4', T.MapType(StringType(), StringType())),
@karpanGit
karpanGit / pyspark, read csv by specifying date and datetime format.py
Created May 1, 2022 08:39
pyspark, read csv by specifying date and datetime format
# note the FAILFAST mode. It is much preferred to PERMISSIVE to catch errors early
# read dates and datetimes using the default ISO format
# date: yyyy-MM-dd
# datetime: yyyy-MM-ddTHH:mm:ss.SSS
import pyspark.sql.types as T
with open(r'D:/junk/tmp.csv', 'wt') as f:
f.write('1\t2022-10-03\t2022-10-03T06:02:01.657\n')
f.write('1\t2022-10-13\t2021-10-03T06:32:01.001')
@karpanGit
karpanGit / pyspark, manually create schema containing complex columns, populate dataframe and extract data.py
Created April 30, 2022 13:23
pyspark, manually create schema containing complex columns, populate dataframe and extract data
# create a simple schema and populate an example dataframe
childSchema = StructType([
StructField('child name', StringType(), nullable=False),
StructField('child age', LongType(), nullable=False)
])
schema = StructType([
StructField('name', StringType(), nullable=False),
StructField('age', LongType(), nullable=False),
StructField('children', ArrayType(childSchema, containsNull=False), nullable=False)
])
@karpanGit
karpanGit / pyspark, extract data from structs with scalars and structs with arrays.py
Created April 30, 2022 13:09
pyspark, extract data from structs with scalars and structs with arrays
# extract data from array of structs, nested
import json
from pyspark.sql.types import *
structureSchema = StructType([
StructField('name', ArrayType(StructType([
StructField('firstname', StringType(), True),
StructField('middlename', StringType(), True),
StructField('lastname', StringType(), True)
]))),
@karpanGit
karpanGit / pyspark, create map from arrays.py
Last active April 30, 2022 12:06
pyspark, create map from arrays
# experiment with map
# experiment 1
df = [[1,2], [3,4]]
df = spark.createDataFrame(df, schema=['a', 'b'])
res = df.select(f.array([f.lit(col) for col in df.columns]).alias('names'), f.array(f.col('a'), f.col('b')).alias('values'))
res.printSchema()
res = res.select(f.map_from_arrays(f.col('names'), f.col('values')).alias('mapped'))
res.printSchema()
# |-- mapped: map (nullable = false)
@karpanGit
karpanGit / pyspark, read json.py
Created April 27, 2022 05:48
pyspark, read json
# read single singleline json
ex1 = '''{"a": "hello", "b": "hello 2", "c":{"x":[1,2,3], "y": "bye"} }'''
with open(r'./junk/ex1.json', 'wt') as f:
f.write(ex1)
data = spark.read.json(r'./junk/ex1.json')
data.show()
# +-----+-------+----------------+
# | a| b| c|
# +-----+-------+----------------+
# |hello|hello 2|{[1, 2, 3], bye}|
@karpanGit
karpanGit / python, enforcing type checking.py
Created January 9, 2022 15:43
python, enforcing type checking
def f(a: int, b: str, c: float):
import inspect
args = inspect.getfullargspec(f).args
annotations = inspect.getfullargspec(f).annotations
# annotations = f.__annotations__
# print(type(locals()), locals())
for x in args:
print(x, '->',
'arg is', type(locals()[x]), ',',
'annotation is', annotations[x],