
=== Questions ===
How to automate deletion of expired data from the EmrFSMetadata table?
=== Actions ===
Step 1: Enable TTL on the deletionTTL attribute in the EmrFSMetadata DynamoDB table (this can be scripted; see the sketch after these steps).
Step 2: Update emrfs-site.xml on all nodes in the EMR cluster:
  Set fs.s3.consistent.metadata.delete.ttl.enabled to true
  Set fs.s3.consistent.metadata.delete.ttl.expiration.seconds to 3600 (1 hour)
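
Step 1 can be automated with a short boto3 call; a minimal sketch, assuming the table name EmrFSMetadata and attribute deletionTTL from above (the region is an assumption):

import boto3

# Enable TTL so DynamoDB purges EMRFS metadata items once their deletionTTL timestamp passes
ddb = boto3.client('dynamodb', region_name='us-east-1')  # region is an assumption
ddb.update_time_to_live(
    TableName='EmrFSMetadata',
    TimeToLiveSpecification={'Enabled': True, 'AttributeName': 'deletionTTL'}
)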
nmukerje / gdelta_parquet.py
Last active March 1, 2021 04:07
Converts the GDELT Dataset in S3 to Parquet.
# Get the column names from the GDELT header file (Python 2; on Python 3 use urllib.request.urlopen and decode the bytes)
from urllib import urlopen
header = urlopen("http://gdeltproject.org/data/lookups/CSV.header.dailyupdates.txt").read().rstrip()
columns = header.split('\t')
# Load 73,385,698 records from 2016
df1 = spark.read.option("delimiter", "\t").csv("s3://gdelt-open-data/events/2016*")
# Apply the schema
df2 = df1.toDF(*columns)
# Split SQLDATE to Year, Month and Day
from pyspark.sql.functions import expr
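The gist preview is truncated here; a possible continuation of the split-and-write step (a sketch, not the original code, assuming SQLDATE is a yyyyMMdd string as in the GDELT header):

# Assumed continuation: derive Year/Month/Day from the yyyyMMdd SQLDATE column
df3 = df2.withColumn('Year', expr('substring(SQLDATE, 1, 4)')) \
         .withColumn('Month', expr('substring(SQLDATE, 5, 2)')) \
         .withColumn('Day', expr('substring(SQLDATE, 7, 2)'))
# Write the result out as Parquet (illustrative output path)
df3.write.partitionBy('Year', 'Month').parquet('s3://<bucket>/gdelt-parquet/')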
>> Created the keystore for HiveServer2 SSL:
keytool -genkey -alias hiveserver2 -keyalg RSA -keystore /tmp/hs2keystore.jks -keysize 2048
Enter keystore password: XXXXXXXX
Re-enter new password: XXXXXXXX
What is your first and last name?
[Unknown]: localhost
What is the name of your organizational unit?
[Unknown]: myorg
nmukerje / spark-test.py
Created March 28, 2018 17:12
Read multiline json from S3
# Read each file whole so multi-line JSON records stay intact
data = sc.wholeTextFiles('s3://<bucket>/dataset249').map(lambda x: x[1])
print(data.collect())

df = spark.read.json(data)
df.printSchema()
df.count()
nmukerje / install_rkernel.txt
Created March 3, 2018 00:07
Install R IRKernel on SageMaker
$> sudo yum install R
$> R
R> install.packages('devtools')
R> devtools::install_github('IRkernel/IRkernel')
R> IRkernel::installspec()
nmukerje / install_sagemaker.sh
Created February 16, 2018 00:45
Install sagemaker_pyspark on EMR
sudo pip install --upgrade pip
sudo /usr/local/bin/pip install sagemaker_pyspark
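Once installed, a typical way to use sagemaker_pyspark (a sketch based on its documented helpers, not part of this gist) is to put its bundled jars on the Spark classpath:

from pyspark.sql import SparkSession
from sagemaker_pyspark import classpath_jars

# Add the SageMaker Spark jars shipped with the package to the driver classpath
classpath = ":".join(classpath_jars())
spark = (SparkSession.builder
         .config("spark.driver.extraClassPath", classpath)
         .getOrCreate())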
nmukerje / emr-ssh.py
Last active February 8, 2018 00:48
SSH to EMR Master
import paramiko
## get keypair from S3 or application host
k = paramiko.RSAKey.from_private_key_file("<path to keypair.pem>")
c = paramiko.SSHClient()
c.set_missing_host_key_policy(paramiko.AutoAddPolicy())
print "connecting"
c.connect( hostname = "<emr master node>", username = "hadoop", pkey = k )
print "connected"
command = 'nohup sqoop import -D mapred.job.name=SqoopTest121 --connect jdbc:postgresql://db.rds.amazonaws.com:5432/apostgres --username user --table random_data --m 10 --password XXXX --split-by id >> /tmp/logs/sqoop/SqoopTest121.log 2>&1 &'
print "Executing {}".format( command )
nmukerje / s3-watcher.sh
Created February 7, 2018 21:15
Syncs files from EC2 to S3.
#!/bin/bash
BUCKET='<bucket>/sqoop/'

function upload() {
  local path=$1
  local file=$2
  echo "$path$file"
  # Copy the file to S3 in the background; quoting guards against spaces in names
  aws s3 cp "$path$file" "s3://$BUCKET$file" &
}
nmukerje / gist:3647491bf4a35be2da7ae32be7585b53
Last active January 11, 2018 01:33
Exports a Zeppelin Notebook in S3 to a Python file in S3
import json, boto3
from io import StringIO

def notebook2py(nb_bucket, nb_key, py_bucket, py_key):
    s3c = boto3.client('s3')
    obj = s3c.get_object(Bucket=nb_bucket, Key=nb_key)
    content = json.loads(obj['Body'].read())
    # Keep only enabled %pyspark paragraphs, stripping the '%pyspark' directive
    notebook_text = ['\n' + item['text'][8:] for item in content['paragraphs']
                     if 'enabled' in item['config'] and item['config']['enabled'] and item['text'].startswith('%pyspark')]
    io_handle = StringIO('\n'.join(notebook_text))
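    # (Assumed completion; the gist preview is truncated here.)
    # Upload the assembled PySpark source to the target S3 location.
    s3c.put_object(Bucket=py_bucket, Key=py_key, Body=io_handle.read())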
nmukerje / gist:3e216e03edb82c1b69d41aa4ec46cedd
Last active May 10, 2018 22:17
Python Dependency Zip creation
# Install pip and virtualenv if not already installed
$> sudo easy_install pip
$> sudo pip install virtualenv
$> sudo pip install virtualenvwrapper

Add to .bash_profile:

# set where virtual environments will live
export WORKON_HOME=$HOME/.virtualenvs
# ensure all new environments are isolated from the site-packages directory