ETL to Elasticsearch with PySpark, Argo Workflows and Kubernetes
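Two manifests follow: a Workflow (djobi-workflow-) that supplies the date parameters, the pre/post-execute Python snippets, the SQL query and the Elasticsearch target, and a reusable WorkflowTemplate (pyspark-sql-elasticsearch) that runs the PySpark job via spark-submit, executes the SQL query and writes the result into an Elasticsearch index.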
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: djobi-workflow-
spec:
  entrypoint: djobi
  serviceAccountName: workflow
  ttlStrategy:
    secondsAfterSuccess: 14400
    secondsAfterFailure: 86400
  # some workflow arguments (the date partition to process)
  arguments:
    parameters:
      - name: day
        value: 10
      - name: month
        value: 11
      - name: year
        value: 2022
  templates:
    - name: djobi
      # only 1 step
      steps:
        - - name: clicks
            # reference the shared WorkflowTemplate (defined below)
            templateRef:
              name: pyspark-sql-elasticsearch
              template: pyspark
            arguments:
              # input parameters defined by the template
              parameters:
                - name: logLevel
                  value: info
                - name: pyCodePreExecute
                  value: |
                    # read parquet from HDFS, and register it as a view available from the SQL code
                    spark.read \
                      .parquet("/user/datahub/year={{workflow.parameters.year}}/month={{workflow.parameters.month}}/day={{workflow.parameters.day}}") \
                      .limit(100000) \
                      .createOrReplaceTempView("v_datahub")
                - name: pyCodePostExecute
                  value: |
                    from pyspark.sql.functions import lit
                    # let's add a "day" column
                    df = df.withColumn("day", lit("%02d/%02d/%d" % ({{workflow.parameters.day}}, {{workflow.parameters.month}}, {{workflow.parameters.year}})))
                - name: sql
                  value: |
                    WITH
                      v_aggs AS
                      (
                        ....
                      )
                    SELECT
                      map(
                        ...
                      ) as meta,
                      named_struct(
                        ....
                      ) as dimensions,
                      map(
                        ....
                      ) as metrics
                    FROM
                      v_aggs
                - name: esIndex
                  value: "agg-clicks"
                - name: esEndpoint
                  value: "my-elasticsearch:9200"
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: pyspark-sql-elasticsearch
  labels:
    workflows.argoproj.io/controller-instanceid: my-ci-controller
  annotations:
    workflows.argoproj.io/description: |
      Run a pyspark job to:
      - execute a SQL query with Spark
      - write the results into elasticsearch
spec:
  workflowMetadata:
    labels:
      workflows.argoproj.io/archive-strategy: "always"
    annotations:
      workflows.argoproj.io/description: |
        Run a pyspark job to:
        - execute a SQL query with Spark
        - write the results into elasticsearch
  entrypoint: pyspark
  ttlStrategy:
    secondsAfterSuccess: 14400
    secondsAfterFailure: 86400
  templates:
    - name: pyspark
      inputs:
        parameters:
          - name: pyCodePreExecute
            description: |
              Python code to execute before the SQL query, for example to register a temporary view
            default: ""
          - name: pyCodePostExecute
            description: |
              Python code to execute after the SQL query, for example to transform the result dataframe before writing
            default: ""
          - name: logLevel
            description: Spark root log level
            default: "info"
            enum: ["trace", "debug", "info", "warn", "error"]
          - name: sparkDriverOptions
            description: Spark driver Java options
            default: |
              -Dhttp.proxyHost=proxy.svc -Dhttp.proxyPort=2001 -Dhttps.proxyHost=proxy.svc -Dhttps.proxyPort=2001 -Dhttp.nonProxyHosts='127.0.0.1|*.svc|*.local'
          - name: sql
            description: the SQL query to execute
          - name: esIndex
            description: the output elasticsearch index
          - name: esEndpoint
            description: the output elasticsearch cluster node
        artifacts:
          - name: spark-config
            path: /opt/spark/conf/spark-defaults.conf
            raw:
              data: |
                # configure HDFS access
                spark.hadoop.dfs.nameservices: XXX:8020
                spark.hadoop.fs.defaultFS: hdfs://XXXX:8020
                spark.eventLog.enabled: false
                # download elasticsearch spark lib
                spark.jars.packages: org.elasticsearch:elasticsearch-spark-30_2.12:8.4.3
          - name: log4j2-properties
            path: /opt/spark/conf/log4j2.properties
            raw:
              data: |
                status = warn
                rootLogger.level = {{inputs.parameters.logLevel}}
                logger.py.name = org.apache.spark.api.python.PythonGatewayServer
                logger.py.level = {{inputs.parameters.logLevel}}
                logger.repl.name = org.apache.spark.repl.Main
                logger.repl.level = {{inputs.parameters.logLevel}}
                logger.spark.name = org.apache.spark
                logger.spark.level = {{inputs.parameters.logLevel}}
          - name: script-python
            path: /usr/share/app/script.py
            raw:
              data: |
                from pyspark.sql import SparkSession
                import json, requests
                spark = SparkSession.builder.getOrCreate()
                {{inputs.parameters.pyCodePreExecute}}
                df = spark.sql("""
                {{inputs.parameters.sql}}
                """)
                # post-execute hook: transform the dataframe before writing
                {{inputs.parameters.pyCodePostExecute}}
                # we use the official elasticsearch spark lib (pretty easy)
                df.write \
                  .format("org.elasticsearch.spark.sql") \
                  .option("inferSchema", "true") \
                  .option("es.nodes", "{{inputs.parameters.esEndpoint}}") \
                  .mode("append") \
                  .save("{{inputs.parameters.esIndex}}")
                # make sure the index is up-to-date (https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-refresh.html)
                requests.post("http://{{inputs.parameters.esEndpoint}}/{{inputs.parameters.esIndex}}/_refresh")
                # count how many documents we have added (we could add a query string here)
                with open("/tmp/results.json", "w") as resultsFile:
                    countURL = "http://{{inputs.parameters.esEndpoint}}/{{inputs.parameters.esIndex}}/_count"
                    countData = requests.get(countURL).json()
                    resultsFile.write(json.dumps({"status": "ok", "count": countData["count"]}))
      # define the step output from the file created by the python script
      outputs:
        parameters:
          - name: results
            valueFrom:
              default: "{\"status\" : \"failure\"}"
              path: /tmp/results.json
      # Run as UID 1000 = spark (from the docker image)
      securityContext:
        runAsUser: 1000
        runAsGroup: 1000
        fsGroup: 1000
      volumes:
        - name: tmp
          emptyDir: {}
        - name: spark-config
          emptyDir: {}
        - name: spark-maven-ivy-cache
          persistentVolumeClaim:
            claimName: spark-maven-ivy-cache
      script:
        image: ghcr.io/datatok/spark:v3.3.1-edge
        command: ["sh"]
        # set spark driver options for client mode
        source: |
          # print the rendered script to the logs (for debugging)
          cat /usr/share/app/script.py
          spark-submit \
            --master local[1] \
            --driver-java-options "{{inputs.parameters.sparkDriverOptions}}" \
            /usr/share/app/script.py
        resources:
          limits:
            cpu: '2'
            memory: 1Gi
          requests:
            cpu: 250m
            memory: 512Mi
        # strong security context
        securityContext:
          capabilities:
            drop:
              - ALL
          runAsUser: 1000
          runAsNonRoot: true
          readOnlyRootFilesystem: true
          allowPrivilegeEscalation: false
        # to access the spark driver UI
        ports:
          - name: spark-ui
            containerPort: 4040
        volumeMounts:
          - name: tmp
            mountPath: /tmp
          # this allows caching downloaded dependencies across runs
          - name: spark-maven-ivy-cache
            mountPath: /opt/spark/.ivy2
            subPath: ivy2
          - name: spark-maven-ivy-cache
            mountPath: /opt/spark/.m2
            subPath: m2
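A sketch of registering the WorkflowTemplate before submitting the Workflow, assuming the manifest is saved as pyspark-sql-elasticsearch.yaml (hypothetical filename); either kubectl or the Argo CLI works, since WorkflowTemplate is a regular custom resource:

# register (or update) the shared template in the cluster
kubectl apply -f pyspark-sql-elasticsearch.yaml
# alternatively, with the Argo CLI
argo template create pyspark-sql-elasticsearch.yaml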