Skip to content

Instantly share code, notes, and snippets.

@antonymilne
Created May 25, 2022 09:27
Show Gist options
  • Save antonymilne/792a748b0d921e2f9f78cc7dd9c13c97 to your computer and use it in GitHub Desktop.
Save antonymilne/792a748b0d921e2f9f78cc7dd9c13c97 to your computer and use it in GitHub Desktop.
Per-node Spark configuration in Kedro using lifecycle hooks.
from kedro.framework.context import KedroContext
from kedro.framework.hooks import hook_impl
from kedro.pipeline.node import Node
class SparkHooks:
    """Kedro hooks that (re)initialise a SparkSession per node.

    Before each node runs, the hook looks up a config-file pattern for that
    node and builds/updates a SparkSession from the matching config in the
    project's ``conf`` folder.
    """

    # Mapping of node names to spark configuration patterns. e.g. the node
    # named node_1 will use configuration matching the pattern spark_1*
    # (like spark_1.yml). You could make this per-pipeline or keyed on
    # node.tags if you prefer (so you tag each node with the spark config
    # it needs).
    NODE_SPARK_CONFIG = {"node_1": "spark_1*", "node_2": "spark_2*"}

    @hook_impl
    def after_context_created(
        self,
        context: KedroContext,
    ) -> None:
        """Capture the config loader and package name for later use.

        Args:
            context: The freshly-created ``KedroContext``.
        """
        self.config_loader = context.config_loader
        # NOTE(review): the original code read self._package_name in
        # init_spark_session without ever setting it (AttributeError at
        # runtime). Capture it here from the context; fall back to a
        # generic app name if the attribute is absent in this kedro
        # version — confirm against the kedro release in use.
        self._package_name = getattr(context, "_package_name", None) or "kedro"

    @hook_impl
    def before_node_run(self, node: Node) -> None:
        """Initialise Spark with the config mapped to this node, if any.

        Args:
            node: The node about to run.
        """
        # .get() instead of [] so nodes without a spark config entry run
        # untouched rather than raising KeyError.
        pattern = self.NODE_SPARK_CONFIG.get(node.name)
        if pattern is not None:
            self.init_spark_session(pattern)

    @hook_impl
    def after_node_run(self) -> None:
        # do we need something to teardown current spark config? I'm not sure.
        # `pass` is required so the method body is syntactically valid.
        pass

    def init_spark_session(self, spark_config_pattern: str) -> None:
        """Initialise a SparkSession using config from the project's conf folder.

        Args:
            spark_config_pattern: Glob pattern (e.g. ``"spark_1*"``) selecting
                which spark config files the config loader should read.
        """
        # Local imports: pyspark is only needed when Spark is actually used,
        # and the file as shared never imported these names at all.
        from pyspark import SparkConf
        from pyspark.sql import SparkSession

        # Load the spark configuration matching the pattern via the config
        # loader captured in after_context_created.
        parameters = self.config_loader.get(spark_config_pattern)
        spark_conf = SparkConf().setAll(parameters.items())

        # Initialise (or update) the spark session. getOrCreate() reuses an
        # existing session, applying the new conf where possible.
        spark_session_conf = (
            SparkSession.builder.appName(self._package_name)
            .enableHiveSupport()
            .config(conf=spark_conf)
        )
        _spark_session = spark_session_conf.getOrCreate()
        _spark_session.sparkContext.setLogLevel("WARN")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment