Skip to content

Instantly share code, notes, and snippets.

@sabrinalui
Last active November 29, 2022 21:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sabrinalui/e474b2c82a9eb67a082b4875adbe9471 to your computer and use it in GitHub Desktop.
Save sabrinalui/e474b2c82a9eb67a082b4875adbe9471 to your computer and use it in GitHub Desktop.
dask flyte task plugin
class DaskFunctionTask(PythonFunctionTask[DaskConfig]):
def pre_execute(self, user_params: ExecutionParameters):
self.cluster = KubeCluster(...)
self.client = Client(cluster)
return (
user_params.builder()
.add_attr("DASK_CLIENT", self.client)
.add_attr("DASK_CLUSTER", self.cluster)
.build()
)
def execute(self, **kwargs):
try:
return super().execute(**kwargs)
except Exception as e:
self.cleanup()
raise e
def post_execute(self, user_params: ExecutionParameters, rval: Any):
self.cleanup()
return rval
def cleanup(self):
self.client.close()
self.cluster.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment