
# AWS Lambda function to AutoTerminate Idle EMR Clusters
# ENVIRONMENT VARIABLES to define for the Lambda function:
# IDLE_TIME_IN_MINS : 15 (should be intervals of 5 mins)
import json,boto3,os
from datetime import datetime, timedelta
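The gist only shows the environment variable and imports; a minimal sketch of the rest of the handler, assuming the decision is driven by EMR's `IsIdle` CloudWatch metric (published every 5 minutes, which is why IDLE_TIME_IN_MINS should be a multiple of 5). The helper `should_terminate` is hypothetical, not part of the original gist:

```python
IDLE_PERIOD_MINS = 5  # EMR publishes one IsIdle datapoint every 5 minutes

def should_terminate(isidle_datapoints, idle_time_in_mins):
    """True when every datapoint in the idle window reported 1.0 (idle)."""
    required = idle_time_in_mins // IDLE_PERIOD_MINS
    window = isidle_datapoints[-required:]
    return len(window) == required and all(v == 1.0 for v in window)

def lambda_handler(event, context):
    import boto3, os  # imported here so the helper above stays stdlib-only
    from datetime import datetime, timedelta
    idle_mins = int(os.environ['IDLE_TIME_IN_MINS'])
    emr = boto3.client('emr')
    cw = boto3.client('cloudwatch')
    for cluster in emr.list_clusters(ClusterStates=['WAITING'])['Clusters']:
        stats = cw.get_metric_statistics(
            Namespace='AWS/ElasticMapReduce', MetricName='IsIdle',
            Dimensions=[{'Name': 'JobFlowId', 'Value': cluster['Id']}],
            StartTime=datetime.utcnow() - timedelta(minutes=idle_mins),
            EndTime=datetime.utcnow(), Period=300, Statistics=['Average'])
        points = [p['Average'] for p in sorted(stats['Datapoints'],
                                               key=lambda p: p['Timestamp'])]
        if should_terminate(points, idle_mins):
            emr.terminate_job_flows(JobFlowIds=[cluster['Id']])
```

Only clusters in the WAITING state are considered, so running jobs are never interrupted; a single missing or non-idle datapoint in the window keeps the cluster alive.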
Point Jupyter to EMR Cluster
echo '{"kernel_python_credentials" : {"url": "http://<EMR Master node Private IP>:8998/"}, "session_configs": {"executorMemory": "2g","executorCores": 2,"numExecutors":4}}' > ~/.sparkmagic/config.json
less ~/.sparkmagic/config.json
Jupyter Execution Time
pip install jupyter_contrib_nbextensions
jupyter contrib nbextension install --user
jupyter nbextension enable execute_time/ExecuteTime
Execute Jupyter Notebooks
jupyter nbconvert --to notebook --ExecutePreprocessor.kernel_name=pysparkkernel --ExecutePreprocessor.timeout=18000 --execute <notebook>.ipynb
nmukerje / gist:92159d9b0b955e964ee99e47bbdf8b6d
Created Mar 20, 2019
Turn off bracket pasting mode in Terminal MacOS
Paste printf "\e[?2004l" in the Terminal to turn bracketed paste mode off.
Pyspark Flatten json
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Flatten array of structs and structs
def flatten(df):
    # compute Complex Fields (Lists and Structs) in Schema
    complex_fields = dict([(field.name, field.dataType) for field in df.schema.fields
                           if type(field.dataType) == ArrayType or type(field.dataType) == StructType])
    while len(complex_fields) != 0:
        col_name = list(complex_fields.keys())[0]
        if type(complex_fields[col_name]) == StructType:
            # promote nested struct fields to top-level columns
            expanded = [col(col_name + '.' + k).alias(col_name + '_' + k)
                        for k in [n.name for n in complex_fields[col_name]]]
            df = df.select("*", *expanded).drop(col_name)
        else:  # ArrayType: one output row per array element
            df = df.withColumn(col_name, explode_outer(col_name))
        # recompute the remaining complex fields after the rewrite
        complex_fields = dict([(field.name, field.dataType) for field in df.schema.fields
                               if type(field.dataType) == ArrayType or type(field.dataType) == StructType])
    return df
gist:9bcdd58343c70a3a39f2a31134608360
=== Questions ===
How to automate deletion of expired data from EmrFSMetadata?
=== Actions ===
step 1: enable TTL on deletionTTL attribute in EmrFSMetadata dynamodb table;
step 2: update emrfs-site.xml on all nodes in the EMR cluster.
Set fs.s3.consistent.metadata.delete.ttl.enabled to true
Set fs.s3.consistent.metadata.delete.ttl.expiration.seconds to 3600 (1h)
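Step 2 translates into the following emrfs-site.xml fragment (a sketch using exactly the two property names and values listed above):

```xml
<property>
  <name>fs.s3.consistent.metadata.delete.ttl.enabled</name>
  <value>true</value>
</property>
<property>
  <name>fs.s3.consistent.metadata.delete.ttl.expiration.seconds</name>
  <value>3600</value>
</property>
```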
nmukerje /
Last active Jun 29, 2018
Converts the GDELT Dataset in S3 to Parquet.
# Get the column names
from urllib.request import urlopen
html = urlopen("").read().decode().rstrip()  # header-file URL elided in the original
columns = html.split('\t')

# Load 73,385,698 records from 2016
df1 = spark.read.option("delimiter", "\t").csv("s3://gdelt-open-data/events/2016*")
# Apply the schema
# Split SQLDATE to Year, Month and Day
from pyspark.sql.functions import expr
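The schema application and SQLDATE split described in the comments can be sketched as follows (a Spark session and the `columns` list from above are assumed; the helpers `split_sqldate` and `to_parquet` and the output path are hypothetical, not from the gist):

```python
# SQLDATE is formatted YYYYMMDD; pure-Python equivalent of the expr() splits below
def split_sqldate(sqldate):
    s = str(sqldate)
    return s[0:4], s[4:6], s[6:8]

def to_parquet(df1, columns):
    from pyspark.sql.functions import expr  # imported here: needs a Spark install
    # Apply the schema: name the unnamed CSV columns
    df2 = df1.toDF(*columns)
    # Split SQLDATE into Year, Month and Day
    df2 = (df2.withColumn('Year', expr('substring(SQLDATE, 1, 4)'))
              .withColumn('Month', expr('substring(SQLDATE, 5, 2)'))
              .withColumn('Day', expr('substring(SQLDATE, 7, 2)')))
    # Write partitioned Parquet back to S3
    df2.write.partitionBy('Year', 'Month').parquet('s3://<bucket>/gdelt-parquet/')
```

Partitioning by Year and Month lets downstream queries prune most of the 73M rows before reading any data.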
hive_ssl
>> Create the keystore
keytool -genkey -alias hiveserver2 -keyalg RSA -keystore /tmp/hs2keystore.jks -keysize 2048
Enter keystore password: XXXXXXXX
Re-enter new password: XXXXXXXX
What is your first and last name?
[Unknown]: localhost
What is the name of your organizational unit?
[Unknown]: myorg
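After generating the keystore, HiveServer2 is pointed at it. A sketch of the hive-site.xml properties involved (these are standard Hive property names; the path and password mirror the keytool step above):

```xml
<property>
  <name>hive.server2.use.SSL</name>
  <value>true</value>
</property>
<property>
  <name>hive.server2.keystore.path</name>
  <value>/tmp/hs2keystore.jks</value>
</property>
<property>
  <name>hive.server2.keystore.password</name>
  <value>XXXXXXXX</value>
</property>
```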
nmukerje /
Created Mar 28, 2018
Read multiline json from S3
data = sc.wholeTextFiles('s3://<bucket>/dataset249').map(lambda x:x[1])
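The reason for `wholeTextFiles` is that a line-based reader splits a multiline JSON record mid-object; a stdlib-only demonstration (the sample record here is made up):

```python
import json

# One record spanning several lines: sc.textFile (or spark.read.json with the
# default multiLine=False) would treat each line as a separate record and fail.
multiline = '''{
  "id": 249,
  "values": [1, 2, 3]
}'''

record = json.loads(multiline)
```

With the RDD from the gist, `spark.read.json(data)` then parses each whole-file string; on Spark 2.2+ you can instead read the path directly with `spark.read.json(path, multiLine=True)`.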