ETL to Elasticsearch with PySpark, Argo Workflows and Kubernetes
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: djobi-workflow-
spec:
  entrypoint: djobi
  serviceAccountName: workflow
  ttlStrategy:
    secondsAfterSuccess: 14400
    secondsAfterFailure: 86400
  # workflow arguments: the date partition to process
  arguments:
    parameters:
      - name: day
        value: 10
      - name: month
        value: 11
      - name: year
        value: 2022
  templates:
    - name: djobi
      # only one step
      steps:
        - - name: clicks
            # reference the pyspark-sql-elasticsearch WorkflowTemplate (defined below)
            templateRef:
              name: pyspark-sql-elasticsearch
              template: pyspark
            arguments:
              # input parameters defined by the template
              parameters:
                - name: logLevel
                  value: info
                - name: pyCodePreExecute
                  value: |
                    # read parquet from HDFS and register it as a view available from the SQL code
                    spark.read \
                      .parquet("/user/datahub/year={{workflow.parameters.year}}/month={{workflow.parameters.month}}/day={{workflow.parameters.day}}") \
                      .limit(100000) \
                      .createOrReplaceTempView("v_datahub")
                - name: pyCodePostExecute
                  value: |
                    from pyspark.sql.functions import lit
                    # add a "day" column built from the workflow parameters
                    df = df.withColumn("day", lit("%02d/%02d/%d" % ({{workflow.parameters.day}}, {{workflow.parameters.month}}, {{workflow.parameters.year}})))
                - name: sql
                  value: |
                    WITH
                      v_aggs AS
                      (
                        ....
                      )
                    SELECT
                      map(
                        ...
                      ) as meta,
                      named_struct(
                        ....
                      ) as dimensions,
                      map(
                        ....
                      ) as metrics
                    FROM
                      v_aggs
                - name: esIndex
                  value: "agg-clicks"
                - name: esEndpoint
                  value: "my-elasticsearch:9200"
---
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: pyspark-sql-elasticsearch
  labels:
    workflows.argoproj.io/controller-instanceid: my-ci-controller
  annotations:
    workflows.argoproj.io/description: |
      Run a pyspark job to:
      - execute a SQL query with Spark
      - write the results into Elasticsearch
spec:
  workflowMetadata:
    labels:
      workflows.argoproj.io/archive-strategy: "always"
    annotations:
      workflows.argoproj.io/description: |
        Run a pyspark job to:
        - execute a SQL query with Spark
        - write the results into Elasticsearch
  entrypoint: pyspark
  ttlStrategy:
    secondsAfterSuccess: 14400
    secondsAfterFailure: 86400
  templates:
    - name: pyspark
      inputs:
        parameters:
          - name: pyCodePreExecute
            description: |
              Python code executed before the SQL query, e.g. to register a view
            default: ""
          - name: pyCodePostExecute
            description: |
              Python code executed after the SQL query, e.g. to transform the result dataframe df
            default: ""
          - name: logLevel
            description: Spark root log level
            default: "info"
            enum: ["trace", "debug", "info", "warn", "error"]
          - name: sparkDriverOptions
            description: Spark driver Java options
            default: |
              -Dhttp.proxyHost=proxy.svc -Dhttp.proxyPort=2001 -Dhttps.proxyHost=proxy.svc -Dhttps.proxyPort=2001 -Dhttp.nonProxyHosts='127.0.0.1|*.svc|*.local'
          - name: sql
            description: the SQL query to execute
          - name: esIndex
            description: the output Elasticsearch index
          - name: esEndpoint
            description: the output Elasticsearch cluster node
        artifacts:
          - name: spark-config
            path: /opt/spark/conf/spark-defaults.conf
            raw:
              data: |
                # configure HDFS access
                spark.hadoop.dfs.nameservices: XXX:8020
                spark.hadoop.fs.defaultFS: hdfs://XXXX:8020
                spark.eventLog.enabled: false
                # download the elasticsearch-spark lib
                spark.jars.packages: org.elasticsearch:elasticsearch-spark-30_2.12:8.4.3
          - name: log4j2-properties
            path: /opt/spark/conf/log4j2.properties
            raw:
              data: |
                status = warn
                rootLogger.level = {{inputs.parameters.logLevel}}
                logger.py.name = org.apache.spark.api.python.PythonGatewayServer
                logger.py.level = {{inputs.parameters.logLevel}}
                logger.repl.name = org.apache.spark.repl.Main
                logger.repl.level = {{inputs.parameters.logLevel}}
                logger.spark.name = org.apache.spark
                logger.spark.level = {{inputs.parameters.logLevel}}
          - name: script-python
            path: /usr/share/app/script.py
            raw:
              data: |
                from pyspark.sql import SparkSession
                import json, requests

                spark = SparkSession.builder.getOrCreate()

                # optional pre-execute code (e.g. register the views used by the SQL below)
                {{inputs.parameters.pyCodePreExecute}}

                df = spark.sql("""
                {{inputs.parameters.sql}}
                """)

                # optional post-execute code (e.g. enrich the result dataframe)
                {{inputs.parameters.pyCodePostExecute}}

                # we use the official elasticsearch-spark lib (pretty easy)
                df.write \
                  .format("org.elasticsearch.spark.sql") \
                  .option("inferSchema", "true") \
                  .option("es.nodes", "{{inputs.parameters.esEndpoint}}") \
                  .mode("append") \
                  .save("{{inputs.parameters.esIndex}}")

                # make sure the index is up-to-date (https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-refresh.html)
                requests.post("http://{{inputs.parameters.esEndpoint}}/{{inputs.parameters.esIndex}}/_refresh")

                # count how many documents we have added (a query string could be added here)
                with open("/tmp/results.json", "w") as resultsFile:
                    countURL = "http://{{inputs.parameters.esEndpoint}}/{{inputs.parameters.esIndex}}/_count"
                    countData = requests.get(countURL).json()
                    resultsFile.write(json.dumps({"status": "ok", "count": countData["count"]}))
      # define the outputs from the file created by the python script
      outputs:
        parameters:
          - name: results
            valueFrom:
              default: "{\"status\" : \"failure\"}"
              path: /tmp/results.json
      # Run as UID 1000 = spark (from the docker image)
      securityContext:
        runAsUser: 1000
        runAsGroup: 1000
        fsGroup: 1000
      volumes:
        - name: tmp
          emptyDir: {}
        - name: spark-config
          emptyDir: {}
        - name: spark-maven-ivy-cache
          persistentVolumeClaim:
            claimName: spark-maven-ivy-cache
      script:
        image: ghcr.io/datatok/spark:v3.3.1-edge
        command: ["sh"]
        # set spark driver options for client mode
        source: |
          cat /usr/share/app/script.py
          spark-submit \
            --master local[1] \
            --driver-java-options "{{inputs.parameters.sparkDriverOptions}}" \
            /usr/share/app/script.py
        resources:
          limits:
            cpu: '2'
            memory: 1Gi
          requests:
            cpu: 250m
            memory: 512Mi
        # strong security context
        securityContext:
          capabilities:
            drop:
              - ALL
          runAsUser: 1000
          runAsNonRoot: true
          readOnlyRootFilesystem: true
          allowPrivilegeEscalation: false
        # to access the spark driver UI
        ports:
          - name: spark-ui
            containerPort: 4040
        volumeMounts:
          - name: tmp
            mountPath: /tmp
          # this allows caching the dependencies downloaded via spark.jars.packages
          - name: spark-maven-ivy-cache
            mountPath: /opt/spark/.ivy2
            subPath: ivy2
          - name: spark-maven-ivy-cache
            mountPath: /opt/spark/.m2
            subPath: m2
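
To sanity-check what a run has written, the same connector can read the index back into a dataframe. A minimal sketch, assuming the endpoint and index used in the workflow above, and that the elasticsearch-spark package is already on the driver's classpath (e.g. via spark.jars.packages as in spark-defaults.conf):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# read the agg-clicks index back through the elasticsearch-spark data source
clicks = spark.read \
    .format("org.elasticsearch.spark.sql") \
    .option("es.nodes", "my-elasticsearch:9200") \
    .load("agg-clicks")

clicks.printSchema()
print(clicks.count())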