@nmukerje
nmukerje / Build_Hudi
Last active May 18, 2021 20:28
How to build Hudi on EC2
## How to build Hudi on EC2
## install maven
sudo wget http://repos.fedorapeople.org/repos/dchen/apache-maven/epel-apache-maven.repo -O /etc/yum.repos.d/epel-apache-maven.repo
sudo sed -i s/\$releasever/6/g /etc/yum.repos.d/epel-apache-maven.repo
sudo yum install -y apache-maven
mvn --version
## install JDK 1.8
sudo yum install java-1.8.0
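## The gist stops after the prerequisites; a minimal sketch of the remaining
## build steps, assuming the upstream apache/hudi repo and the default Maven profile:
git clone https://github.com/apache/hudi.git
cd hudi
mvn clean package -DskipTests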
@nmukerje
nmukerje / gist:98907a6402d5fb14c95809e5fa1d92b7
Created February 8, 2021 22:49
Glue Studio SQL Transform
def SQLTransform(glueContext, dfc) -> DynamicFrameCollection:
    dfc.select(list(dfc.keys())[0]).toDF().createOrReplaceTempView("<SRC1>")
    ### Enter your SQL Statement here
    sql_statement = "SELECT bucket,key,struct.col1,array_col[0].array_col1[0].col1 FROM <SRC1> a ..."
    ###
    output_df = spark.sql(sql_statement)
    dyf_output = DynamicFrame.fromDF(output_df, glueContext, "SQLTransform0")
    return DynamicFrameCollection({"SQLTransform0": dyf_output}, glueContext)
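## Note: inside Glue Studio's custom transform editor, DynamicFrame,
## DynamicFrameCollection, and the spark session are already in scope.
## To reuse this function in a standalone Glue script you would bring them in
## yourself, e.g. (an assumption about your script, not part of the gist):
##   from awsglue.dynamicframe import DynamicFrame, DynamicFrameCollection
##   spark = glueContext.spark_session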
## Convert a StructType to MapType column:
## Useful when you want to move all dynamic fields of a schema within a
## StructType column into a single MapType column.
from pyspark.sql.types import *
from pyspark.sql.functions import *
import json

def toMap(d):
    if d:
        return json.loads(d)
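# The gist is truncated here. A minimal sketch of applying toMap, assuming a
# hypothetical DataFrame df with a struct column named 'struct_col' whose
# fields hold string values (to_json serializes the struct, toMap parses it back):
toMapUDF = udf(toMap, MapType(StringType(), StringType()))
df = df.withColumn('map_col', toMapUDF(to_json(col('struct_col'))))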
@nmukerje
nmukerje / Spark_Convert_Columns_LowerCase_Remove_Hyphens
Last active October 26, 2020 23:52
PySpark: makes nested columns lowercase and replaces hyphens with underscores.
from pyspark.sql import functions as F

def get_column_wise_schema(df_string_schema, df_columns):
    # Returns a dictionary containing column name and corresponding column schema as string.
    column_schema_dict = {}
    i = 0
    while i < len(df_columns):
        current_col = df_columns[i]
        next_col = df_columns[i + 1] if i < len(df_columns) - 1 else None
        current_col_split_key = '[' + current_col + ': ' if i == 0 else ' ' + current_col + ': '
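# The gist is truncated above. A minimal alternative sketch of the same idea,
# assuming a DataFrame df: recursively rewrite the schema, then cast to it
# (Spark matches struct fields by position on cast, so only the names change):
from pyspark.sql.types import StructType, StructField, ArrayType

def normalize(dt):
    if isinstance(dt, StructType):
        return StructType([StructField(f.name.lower().replace('-', '_'),
                                       normalize(f.dataType), f.nullable)
                           for f in dt.fields])
    if isinstance(dt, ArrayType):
        return ArrayType(normalize(dt.elementType), dt.containsNull)
    return dt

df = df.select([F.col(f.name).cast(normalize(f.dataType))
                  .alias(f.name.lower().replace('-', '_'))
                for f in df.schema.fields])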
# AWS Lambda function to AutoTerminate Idle EMR Clusters
#
# ENVIRONMENT VARIABLES to define for the Lambda function:
# LOOKBACK_TIME_IN_DAYS : 31
# IDLE_TIME_IN_MINS : 15 (should be intervals of 5 mins)
# HONOR_TERMINATION_PROTECTION : TRUE
#
import json,boto3,os
from datetime import datetime, timedelta
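# The gist is truncated after the imports -- a minimal sketch of a handler
# consistent with the description above (names and logic are illustrative
# assumptions, not the original code):
emr = boto3.client('emr')
cloudwatch = boto3.client('cloudwatch')

def lambda_handler(event, context):
    idle_mins = int(os.environ['IDLE_TIME_IN_MINS'])
    lookback = timedelta(days=int(os.environ['LOOKBACK_TIME_IN_DAYS']))
    clusters = emr.list_clusters(CreatedAfter=datetime.now() - lookback,
                                 ClusterStates=['WAITING'])
    for cluster in clusters['Clusters']:
        # EMR publishes IsIdle (1 = idle) to CloudWatch every 5 minutes.
        stats = cloudwatch.get_metric_statistics(
            Namespace='AWS/ElasticMapReduce', MetricName='IsIdle',
            Dimensions=[{'Name': 'JobFlowId', 'Value': cluster['Id']}],
            StartTime=datetime.now() - timedelta(minutes=idle_mins),
            EndTime=datetime.now(), Period=300, Statistics=['Minimum'])
        points = stats['Datapoints']
        if not points or any(p['Minimum'] < 1.0 for p in points):
            continue  # cluster did work during the idle window
        protected = emr.describe_cluster(
            ClusterId=cluster['Id'])['Cluster']['TerminationProtected']
        if protected:
            if os.environ.get('HONOR_TERMINATION_PROTECTION', 'TRUE') == 'TRUE':
                continue  # honor termination protection: skip this cluster
            emr.set_termination_protection(JobFlowIds=[cluster['Id']],
                                           TerminationProtected=False)
        emr.terminate_job_flows(JobFlowIds=[cluster['Id']])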
%%sh
echo '{"kernel_python_credentials" : {"url": "http://<EMR Master node Private IP>:8998/"}, "session_configs":
{"executorMemory": "2g","executorCores": 2,"numExecutors":4}}' > ~/.sparkmagic/config.json
less ~/.sparkmagic/config.json
pip install jupyter_contrib_nbextensions
jupyter contrib nbextension install --user
jupyter nbextension enable execute_time/ExecuteTime
jupyter nbconvert --to notebook --ExecutePreprocessor.kernel_name=pysparkkernel --ExecutePreprocessor.timeout=18000 --execute <notebook>.ipynb
@nmukerje
nmukerje / gist:92159d9b0b955e964ee99e47bbdf8b6d
Created March 20, 2019 21:26
Turn off bracket pasting mode in Terminal MacOS
Paste printf "\e[?2004l" in Terminal.
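To turn bracketed paste back on later: printf "\e[?2004h"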
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Flatten array of structs and structs
def flatten(df):
    # compute Complex Fields (Lists and Structs) in Schema
    complex_fields = dict([(field.name, field.dataType)
                           for field in df.schema.fields
                           if type(field.dataType) == ArrayType
                           or type(field.dataType) == StructType])
    while len(complex_fields) != 0:
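        # The gist is truncated here; a minimal sketch of the remaining loop body,
        # expanding structs into aliased columns and exploding arrays into rows:
        col_name = list(complex_fields.keys())[0]
        if type(complex_fields[col_name]) == StructType:
            expanded = [col(col_name + '.' + k).alias(col_name + '_' + k)
                        for k in [n.name for n in complex_fields[col_name]]]
            df = df.select("*", *expanded).drop(col_name)
        elif type(complex_fields[col_name]) == ArrayType:
            df = df.withColumn(col_name, explode_outer(col_name))
        # recompute the remaining complex fields after each expansion
        complex_fields = dict([(field.name, field.dataType)
                               for field in df.schema.fields
                               if type(field.dataType) == ArrayType
                               or type(field.dataType) == StructType])
    return df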