=== Questions ===
How to automate deletion of expired data from the EmrFSMetadata DynamoDB table
=== Actions ===
step 1: enable TTL on the deletionTTL attribute in the EmrFSMetadata DynamoDB table;
step 2: update emrfs-site.xml on all nodes in the EMR cluster (a sketch of both steps follows).
Set fs.s3.consistent.metadata.delete.ttl.enabled to true
Set fs.s3.consistent.metadata.delete.ttl.expiration.seconds to 3600 (1h)
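A minimal sketch of both steps, assuming boto3 for the DynamoDB TTL call and an emrfs-site configuration classification for a new cluster (an existing cluster needs emrfs-site.xml edited on every node instead):

import boto3

# step 1: enable TTL on the deletionTTL attribute of the EmrFS metadata table
boto3.client('dynamodb').update_time_to_live(
    TableName='EmrFSMetadata',
    TimeToLiveSpecification={'Enabled': True, 'AttributeName': 'deletionTTL'})

# step 2: the emrfs-site properties listed above, expressed as a configuration
# classification passed when the cluster is created
emrfs_site_config = [{
    "Classification": "emrfs-site",
    "Properties": {
        "fs.s3.consistent.metadata.delete.ttl.enabled": "true",
        "fs.s3.consistent.metadata.delete.ttl.expiration.seconds": "3600"
    }
}]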
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Flatten array of structs and structs
def flatten(df):
    # compute Complex Fields (Lists and Structs) in Schema
    complex_fields = dict([(field.name, field.dataType)
                           for field in df.schema.fields
                           if type(field.dataType) == ArrayType or type(field.dataType) == StructType])
    while len(complex_fields) != 0:
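        # (the preview truncates the gist here; sketch of the loop body, assuming the usual flatten pattern)
        col_name = list(complex_fields.keys())[0]
        if type(complex_fields[col_name]) == StructType:
            # promote each struct field to a top-level column named parent_child
            expanded = [col(col_name + '.' + k).alias(col_name + '_' + k)
                        for k in [n.name for n in complex_fields[col_name].fields]]
            df = df.select("*", *expanded).drop(col_name)
        elif type(complex_fields[col_name]) == ArrayType:
            # explode arrays into rows so nested structs surface on the next pass
            df = df.withColumn(col_name, explode_outer(col_name))
        # recompute the remaining complex fields after this pass
        complex_fields = dict([(field.name, field.dataType)
                               for field in df.schema.fields
                               if type(field.dataType) == ArrayType or type(field.dataType) == StructType])
    return df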
@nmukerje
nmukerje / gist:92159d9b0b955e964ee99e47bbdf8b6d
Created March 20, 2019 21:26
Turn off bracket pasting mode in Terminal on macOS
Paste printf "\e[?2004l" in Terminal.
jupyter nbconvert --to notebook --ExecutePreprocessor.kernel_name=pysparkkernel --ExecutePreprocessor.timeout=18000 --execute <notebook>.ipynb
pip install jupyter_contrib_nbextensions
jupyter contrib nbextension install --user
jupyter nbextension enable execute_time/ExecuteTime
%%sh
echo '{"kernel_python_credentials" : {"url": "http://<EMR Master node Private IP>:8998/"}, "session_configs":
{"executorMemory": "2g","executorCores": 2,"numExecutors":4}}' > ~/.sparkmagic/config.json
less ~/.sparkmagic/config.json
# AWS Lambda function to AutoTerminate Idle EMR Clusters
#
# ENVIRONMENT VARIABLES to define for the Lambda function:
# LOOKBACK_TIME_IN_DAYS : 31
# IDLE_TIME_IN_MINS : 15 (should be a multiple of 5 minutes)
# HONOR_TERMINATION_PROTECTION : TRUE
#
import json,boto3,os
from datetime import datetime, timedelta
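# The listing truncates the gist after the imports. Below is a minimal sketch, not the
# gist's exact code, assuming the EMR 'IsIdle' CloudWatch metric (emitted every 5 minutes):
def lambda_handler(event, context):
    lookback_days = int(os.environ['LOOKBACK_TIME_IN_DAYS'])
    idle_mins = int(os.environ['IDLE_TIME_IN_MINS'])
    honor_protection = os.environ['HONOR_TERMINATION_PROTECTION'].upper() == 'TRUE'

    emr = boto3.client('emr')
    cloudwatch = boto3.client('cloudwatch')
    created_after = datetime.utcnow() - timedelta(days=lookback_days)

    for cluster in emr.list_clusters(CreatedAfter=created_after,
                                     ClusterStates=['WAITING'])['Clusters']:
        cluster_id = cluster['Id']
        # every datapoint in the idle window must report the cluster as idle
        stats = cloudwatch.get_metric_statistics(
            Namespace='AWS/ElasticMapReduce', MetricName='IsIdle',
            Dimensions=[{'Name': 'JobFlowId', 'Value': cluster_id}],
            StartTime=datetime.utcnow() - timedelta(minutes=idle_mins),
            EndTime=datetime.utcnow(), Period=300, Statistics=['Minimum'])
        datapoints = stats['Datapoints']
        if not datapoints or any(dp['Minimum'] < 1.0 for dp in datapoints):
            continue
        protected = emr.describe_cluster(ClusterId=cluster_id)['Cluster']['TerminationProtected']
        if protected:
            if honor_protection:
                continue
            emr.set_termination_protection(JobFlowIds=[cluster_id], TerminationProtected=False)
        emr.terminate_job_flows(JobFlowIds=[cluster_id])
        print(json.dumps({'terminated': cluster_id}))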
@nmukerje
nmukerje / Spark_Convert_Columns_LowerCase_Remove_Hyphens
Last active October 26, 2020 23:52
PySpark: makes nested columns lower case and replaces hyphens with underscores.
from pyspark.sql import functions as F

def get_column_wise_schema(df_string_schema, df_columns):
    # Returns a dictionary containing column name and corresponding column schema as string.
    column_schema_dict = {}
    i = 0
    while i < len(df_columns):
        current_col = df_columns[i]
        next_col = df_columns[i + 1] if i < len(df_columns) - 1 else None
        current_col_split_key = '[' + current_col + ': ' if i == 0 else ' ' + current_col + ': '
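The preview cuts the schema-string parsing off at this point. As an alternative, hedged sketch (not the gist's approach), the same renaming can be done by rebuilding the schema recursively and casting each top-level column to the cleaned type, since Spark casts struct fields by position and takes the field names from the target type:

from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, ArrayType

def clean_name(name):
    return name.lower().replace('-', '_')

def clean_type(dtype):
    # recursively rebuild the data type with lower-cased, underscore-only field names
    if isinstance(dtype, StructType):
        return StructType([StructField(clean_name(f.name), clean_type(f.dataType), f.nullable)
                           for f in dtype.fields])
    if isinstance(dtype, ArrayType):
        return ArrayType(clean_type(dtype.elementType), dtype.containsNull)
    return dtype

def clean_columns(df):
    # the cast keeps the data and replaces only the (nested) field names
    return df.select([F.col(f.name).cast(clean_type(f.dataType)).alias(clean_name(f.name))
                      for f in df.schema.fields])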
## Convert a StructType to MapType column:
## Useful when you want to move all Dynamic Fields of a Schema within a StructType column into a single MapType column.
from pyspark.sql.types import *
from pyspark.sql.functions import *
import json

def toMap(d):
    if d:
        return json.loads(d)
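The preview stops before the function is wired up. A hedged sketch of typical usage: register toMap as a UDF and feed it the struct column serialized with to_json (column names here are illustrative):

toMapUDF = udf(toMap, MapType(StringType(), StringType()))

# 'struct_col' / 'map_col' are illustrative names; the declared value type assumes
# string values, so adjust MapType's value type to match the struct's fields
df = df.withColumn('map_col', toMapUDF(to_json(col('struct_col')))).drop('struct_col')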
@nmukerje
nmukerje / gist:98907a6402d5fb14c95809e5fa1d92b7
Created February 8, 2021 22:49
Glue Studio SQL Transform
# Glue Studio custom transform; relies on glueContext, spark, DynamicFrame and
# DynamicFrameCollection provided by the surrounding Glue job script.
def SQLTransform(glueContext, dfc) -> DynamicFrameCollection:
    # register the first incoming DynamicFrame as a temp view for Spark SQL
    dfc.select(list(dfc.keys())[0]).toDF().createOrReplaceTempView("<SRC1>")
    ### Enter your SQL Statement here
    sql_statement = "SELECT bucket,key,struct.col1,array_col[0].array_col1[0].col1 FROM <SRC1> a ..."
    ###
    output_df = spark.sql(sql_statement)
    dyf_output = DynamicFrame.fromDF(output_df, glueContext, "SQLTransform0")
    return DynamicFrameCollection({"SQLTransform0": dyf_output}, glueContext)