@jspooner · Last active July 4, 2018
Spark(emr-5.2.0) + Elasticsearch Bulk Loading Optimization on AWS
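In order below: a tuned elasticsearch.yml, the EC2 bootstrap script that builds each data node, the JVM heap options, the cluster setting and index template applied before the load, the Spark write itself, and the EMR cluster configuration.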
# ======================== Elasticsearch Configuration =========================
#
# NOTE: Elasticsearch comes with reasonable defaults for most settings.
# Before you set out to tweak and tune the configuration, make sure you
# understand what you are trying to accomplish and the consequences.
#
# The primary way of configuring a node is via this file. This template lists
# the most important settings you may want to configure for a production cluster.
#
# Please see the documentation for further information on configuration options:
# <http://www.elastic.co/guide/en/elasticsearch/reference/current/setup-configuration.html>
#
# ---------------------------------- Cluster -----------------------------------
#
# Use a descriptive name for your cluster:
#
#cluster.name: my-application
cluster.name: ES5
#
# ------------------------------------ Node ------------------------------------
#
# Use a descriptive name for the node:
#
#node.name: node-1
#
# Add custom attributes to the node:
#
#node.attr.rack: r1
#
# ----------------------------------- Paths ------------------------------------
#
# Path to directory where to store the data (separate multiple locations by comma):
#
#path.data: /path/to/data
path.data: /media/ephemeral0,/media/ephemeral1
#
# Path to log files:
#
#path.logs: /path/to/logs
#
# ----------------------------------- Memory -----------------------------------
#
# Lock the memory on startup:
#
#bootstrap.memory_lock: true
#
# Make sure that the heap size is set to about half the memory available
# on the system and that the owner of the process is allowed to use this
# limit.
#
# Elasticsearch performs poorly when the system is swapping the memory.
#
# ---------------------------------- Network -----------------------------------
#
# Set the bind address to a specific IP (IPv4 or IPv6):
#
#network.host: 192.168.0.1
network.host: [_local_, _ec2_]
#
# Set a custom port for HTTP:
#
#http.port: 9200
#
# Allow large bulk request bodies (the default cap is 100mb):
http.max_content_length: 500mb
#
# For more information, see the documentation at:
# <http://www.elastic.co/guide/en/elasticsearch/reference/current/modules-network.html>
#
# --------------------------------- Discovery ----------------------------------
#
# Pass an initial list of hosts to perform discovery when a new node is started:
# The default list of hosts is ["127.0.0.1", "[::1]"]
#
#discovery.zen.ping.unicast.hosts: ["host1", "host2"]
# discovery.zen.ping.unicast.hosts: ["ec2-54-176-209-44.us-west-1.compute.amazonaws.com", "ec2-54-241-112-51.us-west-1.compute.amazonaws.com"]
#
# Prevent the "split brain" by configuring the majority of nodes (total number of nodes / 2 + 1):
#
#discovery.zen.minimum_master_nodes: 3
#
discovery.zen.hosts_provider: ec2
discovery.ec2.groups: ElasticSearchTest
#
# For more information, see the documentation at:
# <http://www.elastic.co/guide/en/elasticsearch/reference/current/modules-discovery.html>
#
# ---------------------------------- Gateway -----------------------------------
#
# Block initial recovery after a full cluster restart until N nodes are started:
#
#gateway.recover_after_nodes: 3
gateway.recover_after_nodes: 2
#
# For more information, see the documentation at:
# <http://www.elastic.co/guide/en/elasticsearch/reference/current/modules-gateway.html>
#
# ---------------------------------- Various -----------------------------------
#
# Require explicit names when deleting indices:
#
#action.destructive_requires_name: true
#
action.auto_create_index: .security,.monitoring*,.watches,.triggered_watches,.watcher-history*,bulk*
#
# Elasticsearch HEAD standalone support
http.cors.enabled: true
http.cors.allow-origin: /https?:\/\/localhost(:[0-9]+)?/
# ---------------------------------- Thread Pools -----------------------------------
#
#thread_pool.bulk.size: 9
# Deeper bulk queue so bursts from the Spark executors are buffered
# rather than rejected:
thread_pool.bulk.queue_size: 1000
#indices.memory.index_buffer_size: '40%'
# ---------------------------------- Queries -----------------------------------
# Raise the bool-query clause limit (default 1024) for very large terms queries:
indices.query.bool.max_clause_count: 600000
# ---------------------------------- xpack -----------------------------------
#
xpack.security.enabled: false
#
xpack.monitoring.enabled: true
#
# ---------------------------------- node -----------------------------------
node.master: false
node.data: true
node.ingest: false
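With the elasticsearch.yml above in place on each node, a quick sanity check that EC2 discovery actually formed the cluster; this sketch assumes $ES_HOST points at any one node:

# List nodes, their roles, and the elected master
curl -s "$ES_HOST:9200/_cat/nodes?v&h=name,node.role,master,heap.percent"
# Health should go green once gateway.recover_after_nodes is satisfied
curl -s "$ES_HOST:9200/_cat/health?v"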
#!/bin/bash
set -x # enable bash debug mode
export INSTANCE_IP=$1
export SSH_CMD="ssh -o ControlPath=~/.ssh/master-$$ -o ControlMaster=auto -o ControlPersist=60 ec2-user@$INSTANCE_IP"
export SUDO_CMD="$SSH_CMD sudo"
rsync -avz ../../elasticsearch ec2-user@$INSTANCE_IP:/tmp/repo
$SUDO_CMD /tmp/repo/elasticsearch/scripts/tag_instance.sh us-east-1
# setup mounts
$SUDO_CMD cp /tmp/repo/elasticsearch/config/etc/fstab /etc/fstab
$SUDO_CMD chmod 0644 /etc/fstab
$SUDO_CMD chown root: /etc/fstab
# build filesystems
$SUDO_CMD mkfs -t ext4 /dev/xvdb
$SUDO_CMD mkfs -t ext4 /dev/xvdc
$SUDO_CMD mkdir /media/ephemeral1  # /media/ephemeral0 already exists on the Amazon Linux AMI
$SUDO_CMD mount /media/ephemeral0
$SUDO_CMD mount /media/ephemeral1
## Installing Elasticsearch
# Remove Java 7
$SUDO_CMD yum remove -y java-1.7.0-openjdk
# Install Java 8
$SUDO_CMD yum install -y java-1.8.0
# Add RPM Packages
$SUDO_CMD rpm -i https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.2.0.rpm
$SUDO_CMD yum install -y sysstat
# Add Services to init startup
$SUDO_CMD chkconfig --add elasticsearch
# Install EC2 Discovery plugin
$SUDO_CMD /usr/share/elasticsearch/bin/elasticsearch-plugin install discovery-ec2 --batch
# chown the data disks to the elasticsearch user
$SUDO_CMD chown -R elasticsearch: /media/ephemeral*
# Copy config files into target locations
$SUDO_CMD cp /tmp/repo/elasticsearch/config/etc/elasticsearch/* /etc/elasticsearch/.
$SUDO_CMD cp /tmp/repo/elasticsearch/config/etc/sysconfig/elasticsearch /etc/sysconfig/elasticsearch
$SUDO_CMD chmod 0644 /etc/sysconfig/elasticsearch
# Install plugins
$SUDO_CMD /usr/share/elasticsearch/bin/elasticsearch-plugin install x-pack --batch
$SUDO_CMD /usr/share/elasticsearch/bin/elasticsearch-plugin install mapper-murmur3 --batch
$SUDO_CMD /usr/share/elasticsearch/bin/elasticsearch-plugin install repository-s3 --batch
$SUDO_CMD service elasticsearch restart
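A quick post-install check from the workstation, reusing the $SSH_CMD alias defined at the top of the script:

# Service up, plugins present, and the node answering on 9200
$SSH_CMD sudo service elasticsearch status
$SSH_CMD /usr/share/elasticsearch/bin/elasticsearch-plugin list
$SSH_CMD curl -s localhost:9200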
# Heap settings for /etc/elasticsearch/jvm.options: roughly half the
# machine's RAM. Note that 32g sits right at the compressed-oops cutoff;
# a value just under it (e.g. 31g) is often the safer choice.
-Xms32g
-Xmx32g
# NOTE: store throttling was removed in Elasticsearch 2.0 (merge IO is
# auto-throttled since then), so a 5.x cluster will reject this setting;
# it is kept here for reference when loading into older clusters.
curl -s -XPUT "$ES_HOST:9200/_cluster/settings" -d '{
  "transient": {
    "indices.store.throttle.type": "none"
  }
}'
curl -XPUT -s "$ES_HOST:9200/_template/bulk-sightings" -d '{
  "template": "bulk-*",
  "settings": {
    "index.store.type": "niofs",
    "index.translog.durability": "async",
    "index.translog.sync_interval": "5s",
    "index": {
      "number_of_shards": 10,
      "number_of_replicas": 0,
      "refresh_interval": "-1"
    }
  },
  "mappings": {
    "sighting": {
      "_source": {
        "enabled": false
      },
      "_all": {
        "enabled": false
      },
      "properties": {
        "device_id": {
          "type": "keyword",
          "fields": {
            "hash": {
              "type": "murmur3"
            }
          }
        },
        "hour": {
          "type": "integer"
        },
        "date": {
          "type": "date",
          "format": "epoch_second"
        },
        "gh9": {
          "type": "keyword"
        },
        "requests": {
          "type": "integer",
          "index": false
        },
        "index_suffix": {
          "type": "keyword",
          "index": false
        },
        "location": {
          "type": "geo_point"
        }
      }
    }
  }
}'
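While the Spark write below runs, rising "rejected" counts on the bulk thread pool are the signal that es.batch.size.bytes or thread_pool.bulk.queue_size needs adjusting; one way to watch them, with the same $ES_HOST assumption:

# Poll the bulk thread pools every few seconds during the load
watch -n 5 "curl -s '$ES_HOST:9200/_cat/thread_pool/bulk?v&h=node_name,active,queue,rejected'"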
import org.elasticsearch.spark.sql._
import org.elasticsearch.spark.rdd.EsSpark
import org.elasticsearch.spark._

// Comma-separated ES data node hosts; the real list is omitted from the gist.
val esNodes = "NODE_1:9200,NODE_2:9200" // placeholder
val esConfig: Map[String, String] = Map(
  "es.nodes" -> esNodes,
  "es.nodes.discovery" -> "false",
  "es.nodes.wan.only" -> "true",
  "es.batch.write.retry.count" -> "1",
  "es.batch.size.entries" -> "0",  // no entry-count cap; flush on bytes only
  "es.batch.size.bytes" -> "5mb",
  "es.net.http.auth.user" -> "elastic",
  "es.net.http.auth.pass" -> "changeme",
  "es.resource.write" -> "bulk-sightings-{index_suffix}/sighting"
)
// typedEsDF is the DataFrame to index, prepared earlier in the notebook.
typedEsDF.repartition(400).saveToEs(esConfig)
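The template above leaves each bulk-* index with refresh disabled and zero replicas, so once the job finishes the usual final step is to restore both and optionally force-merge; same $ES_HOST assumption as before:

# Re-enable refresh and add a replica now that the load is done
curl -s -XPUT "$ES_HOST:9200/bulk-*/_settings" -d '{
  "index": { "refresh_interval": "1s", "number_of_replicas": 1 }
}'
# Optional: merge down to one segment per shard for faster reads
curl -s -XPOST "$ES_HOST:9200/bulk-*/_forcemerge?max_num_segments=1"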
[
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.scheduler.mode": "FAIR",
      "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
      "spark.sql.autoBroadcastJoinThreshold": "300000000"
    }
  },
  {
    "Classification": "spark-log4j",
    "Properties": {
      "log4j.category.org.elasticsearch.spark": "TRACE"
    }
  },
  {
    "Classification": "spark",
    "Properties": {
      "maximizeResourceAllocation": "true"
    }
  },
  {
    "Classification": "zeppelin-env",
    "Configurations": [
      {
        "Classification": "export",
        "Properties": {
          "ZEPPELIN_NOTEBOOK_USER": "hadoop",
          "SPARK_SUBMIT_OPTIONS": "\"$SPARK_SUBMIT_OPTIONS --packages org.elasticsearch:elasticsearch-spark-20_2.11:5.1.1 --jars /home/hadoop/jars/mysql-connector-java-6.0.5.jar,/home/hadoop/jars/geohex4j-3.2.2.jar --conf spark.executor.extraLibraryPath=/home/hadoop/jars/mysql-connector-java-6.0.5.jar\""
        }
      }
    ]
  }
]
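For completeness, one way to launch an EMR cluster with the classifications above saved as configurations.json; the instance type, count, and key name here are placeholders, not values from the gist:

aws emr create-cluster \
  --name "spark-es-bulk-load" \
  --release-label emr-5.2.0 \
  --applications Name=Spark Name=Zeppelin \
  --instance-type r3.2xlarge \
  --instance-count 5 \
  --ec2-attributes KeyName=my-key \
  --use-default-roles \
  --configurations file://configurations.json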