@valerybriz · Last active June 1, 2023
PySpark Elastic Bulk Loader

Bulk-loads Parquet files from Google Cloud Storage into an Elasticsearch index via the elasticsearch-spark connector, authenticating with an API key over TLS.
import logging
import sys

from pyspark.sql import SparkSession

APP_NAME = "Elastic Bulk loader"
INPUT_PATH = "gs://PATH/TO/THE/PARQUETS"
OUTPUT_PATH = "gs://PATH/TO/THE/PARQUETS"
ELASTIC_HOST = "http://localhost"
ELASTIC_PORT = "9200"
WRITE_MODE = "append"

def run(spark: SparkSession, elastic_search_index: str,
        truststore_location: str, truststore_pass: str,
        elastic_host: str, api_key: str) -> None:
    # We need the full path to the truststore file on the cluster
    truststore_location = f"file:///PATH/TO/THE/FILE/{truststore_location}"
    if not elastic_host:
        elastic_host = ELASTIC_HOST
    else:
        elastic_host = f"http://{elastic_host}"
    # API-key authorization header that the connection will use
    auth_token = f"ApiKey {api_key}"
    df = spark.read.load(path=INPUT_PATH, format="parquet")
    # Optional configurations for Elasticsearch:
    #   es.http.timeout
    #   es.http.retries
    #   es.batch.size.entries
    #   es.batch.write.retry.count
    #   es.batch.write.retry.wait
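    # A hedged sketch of how those tuning knobs could be chained into the
    # writer below; the values are illustrative assumptions, not
    # recommended defaults:
    #   .option("es.batch.size.entries", "1000") \
    #   .option("es.batch.write.retry.count", "3") \
    #   .option("es.batch.write.retry.wait", "30s") \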
    df.write.format("org.elasticsearch.spark.sql") \
        .option("es.nodes.wan.only", "true") \
        .option("es.nodes.discovery", "false") \
        .option("es.nodes", elastic_host) \
        .option("es.port", ELASTIC_PORT) \
        .option("es.net.ssl.truststore.location", truststore_location) \
        .option("es.net.ssl.truststore.pass", truststore_pass) \
        .option("es.index.auto.create", "true") \
        .option("es.write.operation", "index") \
        .option("es.net.ssl.cert.allow.self.signed", "true") \
        .option("es.net.http.header.Authorization", auth_token) \
        .option("es.mapping.id", "id") \
        .option("es.net.ssl", "true") \
        .option("es.field.read.empty.as.null", "true") \
        .option("es.resource", elastic_search_index) \
        .mode(WRITE_MODE) \
        .save()
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    arguments = sys.argv
    logging.info(f"Argument list passed: {arguments}")
    # Five positional arguments are read below, so argv needs at least 6 entries
    if len(arguments) < 6:
        logging.warning("Arguments missing: expected <elastic_index> "
                        "<truststore_location> <truststore_pass> "
                        "<elastic_host> <api_key>")
        sys.exit(1)
    elastic_index = arguments[1]
    truststore_location = arguments[2]
    truststore_pass = arguments[3]
    elastic_host = arguments[4]
    api_key = arguments[5]
    spark = SparkSession.builder \
        .appName(APP_NAME) \
        .getOrCreate()
    logging.info("Application Initialized: " + spark.sparkContext.appName)
    run(spark, elastic_index, truststore_location, truststore_pass,
        elastic_host, api_key)
    logging.info("Application Done: " + spark.sparkContext.appName)
    spark.stop()
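For a quick sanity check after a load, the same connector can read the index back into a DataFrame. The sketch below is a hedged example, not part of the original job: it would run before spark.stop(), reuses the elastic_index and elastic_host arguments parsed above, and omits the SSL/truststore and ApiKey options for brevity (a real read against the secured cluster would need the same es.net.* options as the write). The job as a whole also assumes the elasticsearch-spark connector jar matching your Spark and Scala versions is on the classpath, e.g. supplied via spark-submit --packages.

# Hedged read-back sketch: count the documents now visible in the index.
verify_df = spark.read.format("org.elasticsearch.spark.sql") \
    .option("es.nodes.wan.only", "true") \
    .option("es.nodes", f"http://{elastic_host}") \
    .option("es.port", ELASTIC_PORT) \
    .option("es.resource", elastic_index) \
    .load()
logging.info(f"Documents visible in {elastic_index}: {verify_df.count()}")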