Skip to content

Instantly share code, notes, and snippets.

@haydenflinner
Created September 22, 2018 15:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save haydenflinner/97eec5d3a0f84312bc221367acd3bfa2 to your computer and use it in GitHub Desktop.
Save haydenflinner/97eec5d3a0f84312bc221367acd3bfa2 to your computer and use it in GitHub Desktop.
A pyinvoke tasks.py that implements make-like file dependencies.
import functools
import itertools
import os
from pathlib import Path

import structlog

import invoke
from invoke import task, Collection
log = structlog.get_logger()
def create_timestamp_differ(file_outputs_query, file_inputs_query, precursor=None):
    """
    Create an `invoke.task` check that returns True (meaning "skip") when all
    of the files specified by ``ctx[file_outputs_query]`` are newer than all
    of the filenames listed in ``ctx[file_inputs_query]``.

    :param file_outputs_query: Dotted path into the ctx config (e.g.
        ``'build.outputfiles'``) naming the list of output filenames. May be
        falsy, in which case the task always runs.
    :param file_inputs_query: Dotted path into the ctx config naming the list
        of input filenames. May be falsy (no inputs).
    :param precursor: Optional task object. When given, its resolved input
        filenames are published on ``precursor.outputs`` / ``precursor.output``
        so downstream tasks can read them. We take the task object (not the
        filenames) because paths can't be resolved until we have a ctx, i.e.
        when the check actually runs; this lets programs modify ctx at runtime
        without strange behavior.
    """
    def resolve_for_filenames(ctx, ctx_query):
        """
        Resolve a dotted config query into a list of filenames.

        >>> resolve_for_filenames(ctx, 'buildprogram.outputfiles')
        ['outputfile1', 'outputfile2']  # If ctx.buildprogram.outputfiles == ['outputfile1', ...]
        """
        if not ctx_query:
            return []
        # TODO try defaulting to task-name.outputs
        last_result = ctx
        for key in ctx_query.split('.'):
            if key not in last_result:
                raise ValueError(
                    "ctx query {!r} could not be resolved. Did you forget to "
                    "configure it at ctx.{}?".format(ctx_query, ctx_query))
            last_result = last_result[key]
        # Copy into a fresh list so callers can't mutate the config.
        return list(last_result)

    @task
    def timestamp_differ(ctx):
        """
        Will be executed as a check before running a `make_task`.
        Returns True if the task can be skipped.
        """
        input_filenames = resolve_for_filenames(ctx, file_inputs_query)
        # Leave the filenames resolved for our user.
        if precursor:
            precursor.outputs = input_filenames
            precursor.output = input_filenames[0] if input_filenames else None
        output_filenames = resolve_for_filenames(ctx, file_outputs_query)
        # Always run things that don't produce a file.
        if not output_filenames:
            log.debug(event="timestamp_differ.returning.have_to_run", skipping=False)
            return False
        # If any files are missing (whether inputs or outputs), run the task.
        # We run when missing inputs because hopefully the task will error out
        # and notify the user, rather than silently ignore that it was
        # supposed to do something.
        if any(not Path(p).exists()
               for p in itertools.chain(input_filenames, output_filenames)):
            log.debug(event="timestamp_differ.returning.filemissing",
                      input_filenames=input_filenames)
            return False
        # All exist; skip only if the oldest output is newer than the
        # youngest (most recently modified) input.
        def make_timestamp_list(filenames):
            return sorted(Path(p).stat().st_mtime for p in filenames)
        oldest_output = make_timestamp_list(output_filenames)[0]
        youngest_input = make_timestamp_list(input_filenames)[-1]
        skipping = youngest_input < oldest_output
        log.debug(event="timestamp_differ.returning.haveoutput",
                  youngest_input=youngest_input, oldest_output=oldest_output,
                  skipping=skipping)
        return skipping

    return timestamp_differ
class MakeTask(invoke.tasks.Task):
    """
    An `invoke.task` replacement that supports make-like file dependencies.

    make_task works just like GNU-make: by checking the timestamps on the last
    update of each file that you depend on against the timestamp of the files
    you create, we can decide whether or not you need to run.

    @param outputs: Query string (dotted path) used to index into your `ctx`
    to determine the filepath that you output to. Example:
    ```
    @make_task(pre=[my_earlier_task], outputs='build.outputfilenames')
    def build(ctx, myparam1):
        pass
    ns.configure({"build": {"outputfilenames": ["outfile1"]}})
    ```

    @param pre: List of tasks that this task depends on. If they are
    `make_tasks`, they will only run if needed. Note that you can access the
    output of a pretask at pretaskname.output regardless of how it configures
    its output in ctx or with make_task. That is:
    ```
    @make_task(pre=[my_earlier_task], outputs='build.outputfilenames')
    def build(ctx):
        file.open(my_earlier_task.output)
        return 'Found it!'
    ```
    will always work as long as my_earlier_task is a `make_task` and configured
    at least one output file. Note that `output` is just a provided shortcut
    for `outputs[0]`.

    `pre` was chosen to overload the `invoke` pre because the transition should
    be seamless; If you specify regular pre's that aren't make_tasks, they will
    run as they always did. If they are make_tasks, they will skip if they
    aren't required.
    """
    def __init__(self, task,
                 file_inputs=None, pre=None, checks=None, outputs=None,
                 *args, **kwargs):
        self.outputs = outputs
        # Copy the caller's lists: we append below, and must never mutate
        # arguments the caller may be reusing elsewhere.
        pre = list(pre) if pre else []
        checks = list(checks) if checks else []
        for precursor in pre:
            if hasattr(precursor, 'outputs'):
                # One of US!
                # Rely on invoke to do our dirty work with `checks`.
                # Alternatively, we might be able to reimplement something
                # like they have by basically wrapping the task in another task.
                checks.append(create_timestamp_differ(
                    self.outputs,
                    file_inputs_query=precursor.outputs,
                    precursor=precursor))
        if file_inputs:
            checks.append(create_timestamp_differ(
                self.outputs, file_inputs_query=file_inputs))
        super(MakeTask, self).__init__(task, pre=pre, checks=checks, *args, **kwargs)
make_task = functools.partial(task, klass=MakeTask)
@make_task(file_inputs='step1.inputs', outputs='step1.outputfiles')
def step1(ctx):
    """
    First pipeline stage: reads its input path from ctx.step1.inputs and
    touches the output file named by ctx.step1.outputfiles[0].

    Raises if the configured input file does not exist.
    """
    path = ctx.step1.inputs[0]  # Could be a .c file for example
    if not os.path.exists(path):
        # `assert` is silently stripped under `python -O`; raise explicitly
        # so a missing input always fails loudly.
        raise FileNotFoundError("step1 input file is missing: {}".format(path))
    ctx.run('touch {}'.format(ctx.step1.outputfiles[0]))
@make_task(pre=[step1], outputs='step2.out')
def step2(ctx):
    """
    Second pipeline stage: touches ctx.step2.out[0] once step1's output
    exists.

    Even though step1 configured its files at ctx.step1.outputfiles, we can
    read them here through `step1.output` -- a make_task feature: after
    resolving a task's output filenames from the ctx, the result is stored on
    that_task.outputs, with the first filename at that_task.output.
    """
    produced = step1.output
    log.info(event='step2.checking', path=(produced))
    if not os.path.exists(produced):
        raise RuntimeError("Huh?")
    ctx.run('touch {}'.format(ctx.step2.out[0]))
@make_task(pre=[step2], file_inputs='step2.out')
def step3(ctx):
    """
    Third pipeline stage: touches ctx.step3.outputs[0] once step2's output
    exists.

    This stage declares both a file dependency (step2's spot in the config --
    that's what you get for digging around in another task's private data)
    AND a pre-task that produces it. That redundancy is just for testing;
    normally pre=[step2] alone is the way to go.
    """
    upstream_file = step2.output
    log.info(event='step3.checking', path=(upstream_file))
    if not os.path.exists(upstream_file):
        raise RuntimeError("Huh?")
    ctx.run('touch {}'.format(ctx.step3.outputs[0]))
@make_task(pre=[step3])
def build(ctx):
    """Terminal pipeline step: logs success after all pre-tasks have run
    (or been skipped) and prints a visual separator."""
    log.info(event="build.succeeded")
    print('==============================================')
@task
def test(ctx):
    """Exercise the make_task pipeline end-to-end by repeatedly touching
    files and checking which `touch` commands `invoke build` echoes."""
    expected_cmds = ['touch x', 'touch y', 'touch z']
    # Case 1: the whole pipeline should run when source.c changes.
    log.info(event="test.wholepipeline")
    ctx.run('touch {}'.format(ctx.step1.inputs[0]))
    res = ctx.run('invoke build')
    assert all(cmd in res.stdout for cmd in expected_cmds)
    # Case 2: only the last step should run if the next-to-last step's
    # output changed.
    log.info(event="test.laststeponly")
    ctx.run('touch {}'.format(ctx.step2.out[0]))
    res = ctx.run('invoke build')
    assert not any(cmd in res.stdout for cmd in expected_cmds[:2])
    assert expected_cmds[2] in res.stdout
    # Case 3: for good measure, kick the middle step.
    # I hope this constant switch between ctx.step.outputfiles or ctx.step.out
    # or ctx.step.outputs will encourage you to use task.outputs, which will
    # always be there regardless of how upstream tasks are configured :)
    log.info(event="test.middledown")
    ctx.run('touch {}'.format(ctx.step1.outputfiles[0]))
    res = ctx.run('invoke build')
    assert expected_cmds[0] not in res.stdout
    assert all(cmd in res.stdout for cmd in expected_cmds[1:])
    # Case 4: make_tasks with just a pre and nothing else should still work.
    log.info(event="test.justsuccess")
    res = ctx.run('invoke build')
    # touch z has to run because I forgot to put a file_outputs tag on it.
    # That's fine, won't hurt anything :P
    assert not any(cmd in res.stdout for cmd in expected_cmds[:2])
    assert "succeeded" in res.stdout
# Register the pipeline tasks with invoke and configure their file lists.
ns = Collection(step1, step2, step3, build, test)
ns.configure(
    {  # Using the name scheme { task_name: inputs/outputs } is only convention,
       # you could config each task from anywhere you'd like and name
       # inputs/outputs whatever, too.
        'step1': {
            'inputs': ['source.c'],
            'outputfiles': ['x'],
        },
        'step2': {
            'out': ['y']
        },
        'step3': {
            'outputs': ['z']
        },
        # echo=True so the `test` task can grep the touch commands in stdout.
        'run': {'echo': True, 'pty': True}
    })
"""
It would be better to follow a convention for these things, and maybe not nest them in the global ctx, i.e.:
ns.configure(
{ "buildinfo": {
'step1': {
'ins' : ['source.c'],
'outs': ['x'],
},
'step2': {
# TODO Add the ability to give a callable here, like this:
'ins' : [step1, 'regular_file'],
# To allow configuring pres from the config, if that's what you're into.
'out' : ['y']
},
'step3': {
'outs' : ['z']
}},
'run': { 'echo': True, 'pty': True}
})
Now that we have this convention, we can iterate over all of the steps in a
pipeline and apply some transformation, or generate extra steps based on
the type of files present as inputs. Anything is possible with Python as your
config language :D
"""
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment