Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
class DataProcSparkOperator(BaseOperator):
    """
    Run a Spark job on Cloud DataProc, optionally on a cluster created for the job.

    When ``dataproc_cluster_properties`` is given, a cluster is created before
    the job is submitted and deleted again afterwards (whether or not the job
    succeeded). Otherwise the job is submitted to the existing cluster named
    by ``dataproc_cluster``.

    :param main_jar: passed with ``main_class`` to the job's ``set_main``.
    :param main_class: passed with ``main_jar`` to the job's ``set_main``.
    :param arguments: arguments added to the job (templated field).
    :param archives: archive URIs added to the job.
    :param files: file URIs added to the job.
    :param labels: labels attached to the job. When ``None``, ``task-id`` and
        ``dag-id`` labels are derived from the task and DAG ids.
    :param dataproc_cluster: name of the cluster to run on. When ``None`` and
        ``dataproc_cluster_properties`` is set, a unique name is generated.
    :param dataproc_spark_properties: job properties layered on top of a copy
        of ``DEFAULT_SPARK_DATAPROC_PROPERTIES``.
    :param dataproc_spark_jars: jar file URIs added to the job.
    :param project_id: project id handed to both hooks and the job template.
    :param dataproc_cluster_properties: keyword arguments for
        ``DataProcClusterHook.create_cluster``; ``None`` means "do not create
        a cluster".
    :param gcp_conn_id: connection id handed to both hooks.
    :param delegate_to: delegation target handed to both hooks.
    :param allowed_envs: stored on the instance; presumably read by
        ``limit_by_environment`` to decide whether to run -- TODO confirm.
    """
    template_fields = ['arguments']
    ui_color = '#0273d4'

    @apply_defaults
    def __init__(
            self,
            main_jar=None,
            main_class=None,
            arguments=None,
            archives=None,
            files=None,
            labels=None,
            dataproc_cluster=None,
            dataproc_spark_properties=None,
            dataproc_spark_jars=None,
            project_id=None,
            dataproc_cluster_properties=None,
            gcp_conn_id='google_cloud_default',
            delegate_to=None,
            allowed_envs=Environment.ALL,
            *args,
            **kwargs):
        super(DataProcSparkOperator, self).__init__(*args, **kwargs)

        # Start from a copy of the defaults so the module-level dict is never
        # mutated, then let caller-supplied properties override them.
        dataproc_properties = DEFAULT_SPARK_DATAPROC_PROPERTIES.copy()
        if dataproc_spark_properties:
            dataproc_properties.update(dataproc_spark_properties)

        self.project_id = project_id
        self.gcp_conn_id = gcp_conn_id
        self.delegate_to = delegate_to
        self.main_jar = main_jar
        self.main_class = main_class
        self.arguments = arguments
        self.archives = archives
        self.files = files
        self.labels = labels
        self.dataproc_cluster = dataproc_cluster
        self.dataproc_properties = dataproc_properties
        self.dataproc_jars = dataproc_spark_jars
        self.dataproc_cluster_properties = dataproc_cluster_properties
        self.allowed_envs = allowed_envs

    @staticmethod
    def _sanitize_label(value):
        """Lower-case ``value``, replace every character outside ``[a-z0-9-]``
        with ``-`` and truncate the result to 63 characters."""
        return re.sub(r'[^a-z0-9-]', '-', value.lower())[:63]

    @limit_by_environment
    def execute(self, context):
        """
        Create the cluster if requested, submit the Spark job, then tear the
        cluster down.

        :param context: task context; not used by this method.
        :raises Exception: any error raised while building or submitting the
            job is logged and re-raised. The cluster, if one was created here,
            is deleted either way.
        """
        # Create a cluster if requested
        cluster_hook = None
        cluster_created = False
        if self.dataproc_cluster_properties is not None:
            # Ensure cluster name exists, create a random one if not
            if self.dataproc_cluster is None:
                curr_time_millis = int(round(time.time() * 1000))
                self.dataproc_cluster = 'dp-%s-%d' % (uuid.uuid4().hex, curr_time_millis)

            # Work on a copy so the dict supplied by the caller (which may be
            # shared between several operators) is not modified in place.
            cluster_properties = dict(self.dataproc_cluster_properties)
            cluster_properties['cluster_name'] = self.dataproc_cluster

            cluster_hook = DataProcClusterHook(
                project_id=self.project_id,
                gcp_conn_id=self.gcp_conn_id,
                delegate_to=self.delegate_to
            )
            cluster_created = cluster_hook.create_cluster(**cluster_properties)

        # Run job on cluster
        try:
            hook = DataProcHook(gcp_conn_id=self.gcp_conn_id,
                                delegate_to=self.delegate_to,
                                project_id=self.project_id)
            job = hook.create_job_template(self.project_id,
                                           self.task_id,
                                           self.dataproc_cluster,
                                           "sparkJob",
                                           self.dataproc_properties)
            job.set_main(self.main_jar, self.main_class)
            job.add_args(self.arguments)
            job.add_jar_file_uris(self.dataproc_jars)
            job.add_archive_uris(self.archives)
            job.add_file_uris(self.files)

            labels = self.labels
            if labels is None:
                # No explicit labels: tag the job with where it came from.
                labels = {
                    'task-id': self._sanitize_label(self.task_id),
                    'dag-id': self._sanitize_label(self.dag_id),
                }
            job.add_labels(labels)

            logging.info("Submitting job: \n%s", job.pretty())
            hook.submit(job.build())
        except Exception as e:  # "as" form is valid on Python 2.6+ and Python 3
            logging.error("job submission failed %s", e)
            raise
        finally:
            # Tear down cluster when we're done
            if cluster_created:
                cluster_hook.delete_cluster(self.dataproc_cluster)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.