
@smrmkt /README.md
Last active Dec 21, 2016

emr cli example

Steps

  • Configure the aws cli beforehand.
  • Then run the following scripts in order:

./emr_create_spark_cluster.sh
./emr_pyspark_wc.sh
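The first step ("configure the aws cli beforehand") boils down to a one-off setup like the following; the sanity check at the end is an assumption on my part, not part of the original gist:

```shell
# One-time AWS CLI setup (interactive): stores credentials and a default
# region under ~/.aws/. Assumed prerequisite, not in the original gist.
aws configure

# Quick check that a profile, region, and credentials are in place:
aws configure list
```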

Notes

  • Replace the variables at the top of emr_create_spark_cluster.sh with values for your own environment.
  • The cluster's instance types, instance counts, and so on are placeholders, so change them as needed.
  • For other cli commands, see below.
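As one concrete example of another cli command: the cluster id that emr_pyspark_wc.sh takes as its first argument can be looked up like this (a sketch; the `--query` expression is my own choice, not from the gist):

```shell
# List clusters that are still starting, running, or waiting, showing
# only id / name / state. The --query filter is illustrative.
aws emr list-clusters --active \
  --query 'Clusters[].[Id,Name,Status.State]' --output table
```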
bootstrap.sh

```sh
#!/bin/bash -xe
# Bootstrap action run on every node at cluster startup. EMR nodes run
# Amazon Linux, so packages come from yum rather than apt-get.
sudo yum install -y python-setuptools
sudo easy_install pip
sudo pip install -U SimpleCV
```
emr_create_spark_cluster.sh

```sh
#!/bin/bash
BOOTSTRAP_PATH=$1                    # e.g. s3://my_buclet/scripts/
BOOTSTRAP_SCRIPT="bootstrap.sh"
KEY_NAME="XXXXX.pem"                 # EC2 key pair name (KeyName takes the pair name, not a file path)
SUBNET_ID="subnet-XXXXXXXX"          # e.g. subnet-fa16b1d7
MASTER_SECURITY_GROUP="sg-XXXXXXXX"  # e.g. sg-256fe358
SLAVE_SECURITY_GROUP="sg-XXXXXXXX"   # e.g. sg-256fe357

# The --ec2-attributes JSON is double-quoted so the variables above
# expand, and each value is itself quoted to keep the JSON valid.
aws emr create-cluster --applications Name=Hadoop Name=Hive Name=Spark Name=Tez \
--ec2-attributes "{
  \"KeyName\":\"${KEY_NAME}\",
  \"InstanceProfile\":\"EMR_EC2_DefaultRole\",
  \"SubnetId\":\"${SUBNET_ID}\",
  \"EmrManagedSlaveSecurityGroup\":\"${SLAVE_SECURITY_GROUP}\",
  \"EmrManagedMasterSecurityGroup\":\"${MASTER_SECURITY_GROUP}\"
}" \
--service-role EMR_DefaultRole --release-label emr-5.2.0 \
--name 'Spark Cluster' --instance-groups '[
  {
    "InstanceCount":1,
    "InstanceGroupType":"MASTER",
    "InstanceType":"m3.xlarge",
    "Name":"Master instance group - 1"
  },
  {
    "InstanceCount":5,
    "EbsConfiguration":{
      "EbsBlockDeviceConfigs":[
        {
          "VolumeSpecification":{
            "SizeInGB":840,
            "VolumeType":"gp2"
          },
          "VolumesPerInstance":1
        }
      ],
      "EbsOptimized":false
    },
    "InstanceGroupType":"CORE",
    "InstanceType":"r3.2xlarge",
    "Name":"Core instance group - 2"
  }
]' \
--bootstrap-action Path=${BOOTSTRAP_PATH}${BOOTSTRAP_SCRIPT}
```
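A step cannot be submitted until the cluster leaves the STARTING/BOOTSTRAPPING states. One way to wait for that, not part of the original scripts (CLUSTER_ID here stands for the ClusterId printed by the create-cluster call above):

```shell
# Block until the cluster reaches a running/waiting state, then print it.
# CLUSTER_ID is the "ClusterId" returned by create-cluster (placeholder).
CLUSTER_ID="j-XXXXXXXXXXXXX"
aws emr wait cluster-running --cluster-id "${CLUSTER_ID}"
aws emr describe-cluster --cluster-id "${CLUSTER_ID}" \
  --query 'Cluster.Status.State' --output text
```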
emr_pyspark_wc.sh

```sh
#!/bin/bash
CLUSTER_ID=$1    # obtainable via `aws emr list-clusters`
IN_PATH=$2       # e.g. s3://my_buclet/in_path/
OUT_PATH=$3      # e.g. s3://my_buclet/out_path/
PYSPARK_PATH=$4  # e.g. s3://my_buclet/scripts/
PYSPARK_SCRIPT="wc.py"

aws s3 cp spark/${PYSPARK_SCRIPT} ${PYSPARK_PATH}
aws emr add-steps --cluster-id ${CLUSTER_ID} \
--steps Type=Spark,Name=SparkWordCountApp,Args=[--deploy-mode,cluster,--master,yarn,--conf,spark.yarn.submit.waitAppCompletion=true,--num-executors,1,--executor-cores,1,--executor-memory,1g,${PYSPARK_PATH}${PYSPARK_SCRIPT},${IN_PATH},${OUT_PATH}],ActionOnFailure=CONTINUE
```
wc.py

```python
from __future__ import print_function

import sys

from pyspark import SparkContext

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: wc.py <input_path> <output_path>", file=sys.stderr)
        sys.exit(-1)

    sc = SparkContext(appName="WordCount")
    text_file = sc.textFile(sys.argv[1])
    counts = text_file.flatMap(lambda line: line.split(" ")) \
                      .map(lambda word: (word, 1)) \
                      .reduceByKey(lambda a, b: a + b)
    counts.saveAsTextFile(sys.argv[2])
    sc.stop()
```
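For a quick local sanity check of what wc.py computes, the same flatMap / map / reduceByKey pipeline can be reproduced with standard Unix tools (a sketch only; the sample input file is made up, not from the gist):

```shell
# Local word count equivalent to wc.py: split on spaces, group, count.
# The sample input is a made-up two-line file.
printf 'a b a\nb c\n' > /tmp/wc_sample.txt
tr -s ' ' '\n' < /tmp/wc_sample.txt | sort | uniq -c \
  | awk '{print $2"="$1}'   # word=count pairs, like wc.py's (word, n)
```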