View AutoTerminate_Idle_EMR_Clusters.py
# AWS Lambda function to AutoTerminate Idle EMR Clusters
#
# ENVIRONMENT VARIABLES to define for the Lambda function:
# LOOKBACK_TIME_IN_DAYS : 31
# IDLE_TIME_IN_MINS : 15 (should be intervals of 5 mins)
# HONOR_TERMINATION_PROTECTION : TRUE
#
import json, boto3, os
from datetime import datetime, timedelta
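#
# The gist preview stops at the imports. The handler below is only a sketch of
# one possible implementation, assuming the common pattern of polling the EMR
# IsIdle CloudWatch metric; it is not the original gist body.
def lambda_handler(event, context):
    emr = boto3.client('emr')
    cloudwatch = boto3.client('cloudwatch')

    lookback_days = int(os.environ['LOOKBACK_TIME_IN_DAYS'])
    idle_mins = int(os.environ['IDLE_TIME_IN_MINS'])
    honor_protection = os.environ['HONOR_TERMINATION_PROTECTION'].upper() == 'TRUE'

    # Only consider clusters created within the lookback window that are still waiting.
    clusters = emr.list_clusters(
        CreatedAfter=datetime.utcnow() - timedelta(days=lookback_days),
        ClusterStates=['WAITING'])['Clusters']

    for cluster in clusters:
        cluster_id = cluster['Id']

        # Skip clusters with termination protection when the flag says to honor it.
        if honor_protection:
            detail = emr.describe_cluster(ClusterId=cluster_id)['Cluster']
            if detail.get('TerminationProtected'):
                continue

        # IsIdle is emitted every 5 minutes; require every datapoint in the window to be 1.
        stats = cloudwatch.get_metric_statistics(
            Namespace='AWS/ElasticMapReduce',
            MetricName='IsIdle',
            Dimensions=[{'Name': 'JobFlowId', 'Value': cluster_id}],
            StartTime=datetime.utcnow() - timedelta(minutes=idle_mins),
            EndTime=datetime.utcnow(),
            Period=300,
            Statistics=['Average'])['Datapoints']

        if stats and all(dp['Average'] == 1.0 for dp in stats):
            emr.terminate_job_flows(JobFlowIds=[cluster_id])

    return {'statusCode': 200, 'body': json.dumps('done')}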
View Point Jupyter to EMR Cluster
%%sh
echo '{"kernel_python_credentials" : {"url": "http://<EMR Master node Private IP>:8998/"}, "session_configs":
{"executorMemory": "2g","executorCores": 2,"numExecutors":4}}' > ~/.sparkmagic/config.json
less ~/.sparkmagic/config.json
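If the notebook cannot connect, a quick check that the Livy endpoint named in that config is reachable (same placeholder IP as above):
curl http://<EMR Master node Private IP>:8998/sessions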
View Jupyter Execution Time
pip install jupyter_contrib_nbextensions
jupyter contrib nbextension install --user
jupyter nbextension enable execute_time/ExecuteTime
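To confirm the extension is installed and enabled, Jupyter can list its extensions:
jupyter nbextension list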
View Execute Jupyter Notebooks
jupyter nbconvert --to notebook --ExecutePreprocessor.kernel_name=pysparkkernel --ExecutePreprocessor.timeout=18000 --execute <notebook>.ipynb
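To leave the source notebook untouched and write the executed copy to a separate file, nbconvert's --output flag can be appended, e.g.:
jupyter nbconvert --to notebook --ExecutePreprocessor.kernel_name=pysparkkernel --ExecutePreprocessor.timeout=18000 --execute <notebook>.ipynb --output <notebook>-executed.ipynb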
nmukerje / gist:92159d9b0b955e964ee99e47bbdf8b6d
Created Mar 20, 2019
Turn off bracketed paste mode in Terminal on macOS
View gist:92159d9b0b955e964ee99e47bbdf8b6d
Paste printf "\e[?2004l" in Terminal.
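To make this persistent across sessions, the same escape sequence can be added to the shell profile, e.g. for bash:
echo 'printf "\e[?2004l"' >> ~/.bash_profile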
View Pyspark Flatten json
from pyspark.sql.types import *
from pyspark.sql.functions import *
#Flatten array of structs and structs
def flatten(df):
    # compute Complex Fields (Lists and Structs) in Schema
    complex_fields = dict([(field.name, field.dataType)
                           for field in df.schema.fields
                           if type(field.dataType) == ArrayType or type(field.dataType) == StructType])
    while len(complex_fields) != 0:
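        # (The gist preview is truncated here; the rest of the loop is a sketch of the
        # usual approach -- flatten one struct or explode one array per pass -- assuming
        # explode_outer is available in this Spark version.)
        col_name = list(complex_fields.keys())[0]

        # StructType: promote each nested field to a top-level column, then drop the struct.
        if type(complex_fields[col_name]) == StructType:
            expanded = [col(col_name + '.' + k.name).alias(col_name + '_' + k.name)
                        for k in complex_fields[col_name]]
            df = df.select("*", *expanded).drop(col_name)

        # ArrayType: explode array elements into rows (keeping rows with empty/null arrays).
        elif type(complex_fields[col_name]) == ArrayType:
            df = df.withColumn(col_name, explode_outer(col_name))

        # Recompute the remaining complex fields and keep iterating until none are left.
        complex_fields = dict([(field.name, field.dataType)
                               for field in df.schema.fields
                               if type(field.dataType) == ArrayType or type(field.dataType) == StructType])
    return df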
View gist:9bcdd58343c70a3a39f2a31134608360
=== Questions ===
How to automate deletion of expired data from EmrFSMetadata
=== Actions ===
step 1: enable TTL on the deletionTTL attribute in the EmrFSMetadata DynamoDB table;
step 2: update emrfs-site.xml on all nodes in the EMR cluster.
Set fs.s3.consistent.metadata.delete.ttl.enabled to true
Set fs.s3.consistent.metadata.delete.ttl.expiration.seconds to 3600 (1h)
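For step 2, one way to apply those two properties at cluster launch (rather than editing emrfs-site.xml on each node by hand) is an EMR configuration classification along these lines:
[
  {
    "Classification": "emrfs-site",
    "Properties": {
      "fs.s3.consistent.metadata.delete.ttl.enabled": "true",
      "fs.s3.consistent.metadata.delete.ttl.expiration.seconds": "3600"
    }
  }
]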
nmukerje / gdelta_parquet.py
Last active Jun 29, 2018
Converts the GDELT Dataset in S3 to Parquet.
View gdelta_parquet.py
# Get the column names (Python 2: urlopen lives in urllib; on Python 3 it is urllib.request)
from urllib import urlopen
html = urlopen("http://gdeltproject.org/data/lookups/CSV.header.dailyupdates.txt").read().rstrip()
columns = html.split('\t')
# Load 73,385,698 records from 2016
df1 = spark.read.option("delimiter", "\t").csv("s3://gdelt-open-data/events/2016*")
# Apply the schema
df2=df1.toDF(*columns)
# Split SQLDATE to Year, Month and Day
from pyspark.sql.functions import expr
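# (Preview truncated here; the steps below are a sketch of what the comment and the
# gist title imply -- split SQLDATE into partition columns and write Parquet.
# The output bucket/prefix is a placeholder, not from the original.)
df3 = df2.withColumn("Year", expr("substring(SQLDATE, 1, 4)")) \
         .withColumn("Month", expr("substring(SQLDATE, 5, 2)")) \
         .withColumn("Day", expr("substring(SQLDATE, 7, 2)"))
df3.write.partitionBy("Year", "Month").parquet("s3://<bucket>/gdelt-parquet/")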
View hive_ssl
>> Create the keystore and key pair
keytool -genkey -alias hiveserver2 -keyalg RSA -keystore /tmp/hs2keystore.jks -keysize 2048
Enter keystore password: XXXXXXXX
Re-enter new password: XXXXXXXX
What is your first and last name?
[Unknown]: localhost
What is the name of your organizational unit?
[Unknown]: myorg
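The keytool transcript is cut off above; to actually point HiveServer2 at that keystore, the usual hive-site.xml properties look roughly like this (the password shown is a placeholder):
<property>
  <name>hive.server2.use.SSL</name>
  <value>true</value>
</property>
<property>
  <name>hive.server2.keystore.path</name>
  <value>/tmp/hs2keystore.jks</value>
</property>
<property>
  <name>hive.server2.keystore.password</name>
  <value>XXXXXXXX</value>
</property>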
nmukerje / spark-test.py
Created Mar 28, 2018
Read multiline json from S3
View spark-test.py
data = sc.wholeTextFiles('s3://<bucket>/dataset249').map(lambda x:x[1])
print(data.collect())
df=spark.read.json(data)
df.printSchema()
df.count()
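On Spark 2.2+ the wholeTextFiles detour can usually be skipped, since the JSON reader handles multi-line records directly:
df = spark.read.option("multiLine", True).json('s3://<bucket>/dataset249')
df.printSchema()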