@dfdeshom
Last active April 12, 2021 13:56
Parallelism in ES and Hadoop/Spark

1 shard corresponds to 1 Spark partition.

Reading from ES: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/arch.html#arch-reading . Beware of increasing the number of shards on ES for performance reasons:

A common concern (read optimization) for improving performance is to increase the number of shards and thus increase the number of tasks on the Hadoop side. Unless such gains are demonstrated through benchmarks, we recommend against such a measure since in most cases, an Elasticsearch shard can easily handle data streaming to a Hadoop or Spark task.
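The 1-shard-to-1-partition mapping can be seen from pyspark by reading through the Hadoop InputFormat. A minimal sketch, assuming the elasticsearch-hadoop jar is on the classpath; the host and index name are placeholders:

```python
def es_read_conf(host, index):
    """Build the Hadoop conf dict for EsInputFormat (keys from the es-hadoop docs)."""
    return {
        "es.nodes": host,       # ES node(s) to connect to
        "es.resource": index,   # index (or index/type) to read from
    }

# These calls need a running Spark context and ES cluster:
# rdd = sc.newAPIHadoopRDD(
#     inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
#     keyClass="org.apache.hadoop.io.NullWritable",
#     valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
#     conf=es_read_conf("localhost:9200", "my-index"),
# )
# rdd.getNumPartitions()  # equals the number of (primary) shards of the index
```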

Writing to ES: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/arch.html#arch-writing . Write performance can be increased by having more partitions:

elasticsearch-hadoop detects the number of (primary) shards where the write will occur and distributes the writes between these. The more splits/partitions available, the more mappers/reducers can write data in parallel to Elasticsearch.

Note that nothing is said about ES handling this number of writes automatically. Production testing will determine what an ES cluster can handle in terms of write load.

ES serialization settings

Settings here: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html#configuration-serialization

In particular, es.batch.write.retry.count controls how many times a failed bulk write is retried (the default is 3); if all retries fail, the whole Hadoop/Spark job fails. es.batch.size.entries and es.batch.size.bytes are 2 settings that can be increased to make writes more performant.
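As a sketch, the bulk/serialization settings above collected into a conf dict, shown with their documented defaults (raise the batch sizes gradually and benchmark):

```python
# Values below are the documented defaults for these es-hadoop settings.
es_batch_conf = {
    "es.batch.size.bytes": "1mb",       # bulk size limit in bytes
    "es.batch.size.entries": "1000",    # bulk size limit in documents
    "es.batch.write.retry.count": "3",  # job fails once retries are exhausted
}
# Merge these into the conf passed to the ES read/write calls.
```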

Runtime settings

Disable speculative execution, especially if we run into duplicate data:

speculative execution is an optimization, enabled by default, that allows Hadoop to create duplicate tasks of those which it considers hanged or slowed down. When doing data crunching or reading resources, having duplicate tasks is harmless and means at most a waste of computation resources; however when writing data to an external store, this can cause data corruption through duplicates or unnecessary updates.

For Spark, disable it through spark.speculation=false (this is already the default).
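A minimal sketch of setting this explicitly, as a plain dict you would pass to SparkConf().setAll(...):

```python
# spark.speculation is "false" by default; setting it explicitly documents intent.
runtime_conf = {
    "spark.speculation": "false",
}

# With a real pyspark installation:
# from pyspark import SparkConf, SparkContext
# sc = SparkContext(conf=SparkConf().setAll(list(runtime_conf.items())))
```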

Writing to and reading from ES using pyspark and the Hadoop API

The es-hadoop library has native Spark integration for writing RDDs and DataFrames, but that API is only available for Scala and Java: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html .

Here is the main doc for writing using the Hadoop API: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/mapreduce.html#_emphasis_new_emphasis_literal_org_apache_hadoop_mapreduce_literal_api . The format expected is roughly (None, {k:v}) :

EsOutputFormat expects a Map<Writable, Writable> representing a document value that is converted internally into a JSON document and indexed in Elasticsearch. Hadoop OutputFormat requires implementations to expect a key and a value however, since for Elasticsearch only the document (that is the value) is necessary, EsOutputFormat ignores the key.
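From pyspark, this means pairing each document with None before the write. A sketch, assuming the es-hadoop jar is on the classpath; host and index names are placeholders:

```python
def to_es_pairs(docs):
    """EsOutputFormat ignores the key, so pair each document dict with None."""
    return [(None, d) for d in docs]

# These calls need a running Spark context and ES cluster:
# rdd = sc.parallelize(to_es_pairs([{"title": "doc 1"}, {"title": "doc 2"}]))
# rdd.saveAsNewAPIHadoopFile(
#     path="-",  # ignored by EsOutputFormat, but the API requires a path
#     outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
#     keyClass="org.apache.hadoop.io.NullWritable",
#     valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
#     conf={"es.nodes": "localhost:9200", "es.resource": "my-index/doc"},
# )
```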

Not sure if one should also disable speculative execution when using the Hadoop API directly:

   conf.setBoolean("mapred.map.tasks.speculative.execution", false)    
   conf.setBoolean("mapred.reduce.tasks.speculative.execution", false)
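From pyspark, the same Hadoop-level flags can be set through Spark's spark.hadoop.* prefix, which Spark copies into the job's Hadoop Configuration. A sketch mirroring the conf.setBoolean calls above (old mapred-style property names, as in the snippet; newer Hadoop versions use mapreduce.map.speculative etc.):

```python
# Entries under "spark.hadoop." are propagated into the Hadoop Configuration.
hadoop_speculation_off = {
    "spark.hadoop.mapred.map.tasks.speculative.execution": "false",
    "spark.hadoop.mapred.reduce.tasks.speculative.execution": "false",
}
# Pass via SparkConf().setAll(list(hadoop_speculation_off.items())).
```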

Performance considerations

Summary of this page: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/performance.html . Notes:

  • read performance:
    • the number of results does not influence the performance of the connector nor Elasticsearch itself.
    • increasing read performance by adding more shards will most likely be beneficial only if the plan is to scale out, i.e. add more machines to handle the increased number of shards.
  • write performance:
    • a bulk request should not take longer than 1-2s to be successfully processed. If it takes longer, decrease the size of the bulk request; if it takes less, consider increasing it slowly.
    • increasing the number of retries and the wait time often masks the problem of the cluster being overloaded; it doesn't solve it.
    • limit the number of tasks writing to ES if your cluster is too small or the number of write tasks from Spark is too large.
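The last point can be sketched by coalescing before the write; max_write_tasks is an assumed limit you would establish by benchmarking your own cluster:

```python
def cap_write_tasks(num_partitions, max_write_tasks):
    """Return the partition count to coalesce to before writing to ES."""
    return min(num_partitions, max_write_tasks)

# With a real RDD:
# rdd = rdd.coalesce(cap_write_tasks(rdd.getNumPartitions(), 16))
```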

Index-tuning guide from ES: https://www.elastic.co/guide/en/elasticsearch/reference/current/tune-for-indexing-speed.html
