Skip to content

Instantly share code, notes, and snippets.

@riga
Last active August 26, 2023 18:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save riga/eb4337b90b2285c00aa7a2f2b4ac2f22 to your computer and use it in GitHub Desktop.
Save riga/eb4337b90b2285c00aa7a2f2b4ac2f22 to your computer and use it in GitHub Desktop.
Dynamic law workflows
# coding: utf-8
import law
class LongRunning(law.Task):
def output(self):
return law.LocalFileTarget("$PWD/data/long_running.json")
def run(self):
# let's assume it's computationally expensive to compute n
expensive_computation = lambda: 5
self.output().dump({"n": expensive_computation()}, indent=4, formatter="json")
class CreateChars(law.LocalWorkflow):
# the initial default value of the *cache_branch_map* attribute
cache_branch_map_default = False
# collection class that wraps branch outputs of a workflow
output_collection_cls = law.SiblingFileCollection
def create_branch_map(self):
inp = self.input()["long_running"]
if not inp.exists():
return [None]
# cache the branch map once the input exists
self.cache_branch_map = True
return {
i: chr(97 + i)
for i in range(inp.load(formatter="json")["n"])
}
def workflow_requires(self):
reqs = super().workflow_requires()
reqs["long_running"] = self.requires_from_branch()
return reqs
def requires(self):
return LongRunning.req(self)
def output(self):
# return a placeholder target as long as the branch map is not final,
# mainly not to use branch 0 and which would be logically incorrect
return (
law.LocalFileTarget(f"$PWD/data/char_{self.branch}.json")
if self.cache_branch_map
else law.LocalFileTarget("$PWD/data/char_PLACEHOLDER.json")
)
def run(self):
self.output().dump({"char": self.branch_data}, indent=4, formatter="json")
class CreateAlphabet(law.Task):
def requires(self):
return CreateChars.req(self) # this will be the workflow
def output(self):
return law.LocalFileTarget("$PWD/data/alphabet.txt")
def run(self):
alphabet = "".join(
inp.load(formatter="json")["char"]
for inp in self.input()["collection"].targets.values()
)
self.output().dump(alphabet, formatter="text")
self.publish_message(f"\nalphabet: {alphabet}\n")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment