Created
August 12, 2020 20:09
-
-
Save Jonty/01b7fb2b0896431dc41ad65383b26f41 to your computer and use it in GitHub Desktop.
Pipecleaner is a tool for validating concourse pipelines
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
"""Pipecleaner is a tool for validating Concourse pipelines.

Someone rewrote this in Go because "it's not Python 3 compatible",
so now they have a version 5x longer and less comprehensible. This
is a "port" to Python 3, via the 2to3 tool. It took 600ms to do.

It can check for the following issues:

* Resources being used in a job that have not been defined in the
  `resources:` block. (Fatal)
* Resources that have been defined in the `resources:` block and are not
  used in the pipeline. (Warning)
* Resources being used in a task's `inputs:` block that have not been
  `get:`-ted. (Fatal)
* Resources that have been `get:`-ted and are not used in the job.
  (Warning)
* Task `outputs:` that are not used later in the job. (Warning)
* Scriptlets that fail the tests implemented by `shellcheck`
  http://www.shellcheck.net/ (Fatal)
* Task `params:` whose name looks like a secret but whose value is not a
  `((...))` interpolation. (Warning)
"""
import yaml | |
import re | |
import sys | |
import os | |
import getopt | |
import subprocess | |
class Pipecleaner(object):
    """Static checker for Concourse pipeline definitions.

    ``validate`` loads a pipeline YAML file and returns a dict that maps each
    check name to a list of findings. Every finding is a dict holding a
    boolean ``fatal`` flag plus context keys such as ``job``, ``resource``,
    ``task`` or ``~`` (free-form detail).
    """

    # fly's ``((var))`` placeholder syntax. The upstream (Go) pattern also
    # contains ``\pL``; Python's ``re`` rejects that escape, and ``\w``
    # already matches Unicode letters for ``str`` patterns.
    _PLACEHOLDER_RE = re.compile(r'\(\((!?[-/.\w]+)\)\)')
    # Replacement text. It includes $(date) so that shellcheck doesn't assume
    # the substituted values have safe contents.
    _PLACEHOLDER_VALUE = 'DUMMY-$(date)'
    # Param names that look like they carry a secret.
    _SECRET_NAME_RE = re.compile(r'SECRET|KEY(?!S)')
    # This list of dialects may expand over time if/when shellcheck expands
    # its set of dialects.
    _SHELLCHECK_DIALECTS = ('sh', 'bash', 'dash', 'ksh')
    # Step keys whose value is a collection of nested steps.
    _LIST_STEP_KEYS = ('aggregate', 'in_parallel', 'do')
    # Step keys whose value is a single nested step.
    _SINGLE_STEP_KEYS = ('on_success', 'on_failure', 'on_abort', 'on_error',
                         'ensure', 'try')

    def __init__(self):
        pass

    def validate(self, filename):
        """Load the pipeline stored in *filename* and return its findings."""
        data = self.load_pipeline(filename)
        return self.check_pipeline(data)

    @classmethod
    def _substitute_placeholders(cls, raw):
        """Return *raw* with every ``((var))`` placeholder replaced."""
        return cls._PLACEHOLDER_RE.sub(cls._PLACEHOLDER_VALUE, raw)

    def load_pipeline(self, filename):
        """Read *filename*, neutralise ``((var))`` placeholders and parse it.

        Returns whatever the YAML document deserialises to. Errors from
        opening or parsing the file propagate to the caller.
        """
        with open(filename) as handle:
            raw = handle.read()
        # Regexp taken from https://github.com/cloudfoundry/bosh-cli/blob/21639e8/director/template/template.go#L90
        # (Fly uses bosh-cli's template package to do the interpolation.)
        raw = self._substitute_placeholders(raw)
        # safe_load: a pipeline is plain data, so arbitrary object
        # construction is never needed (and a bare yaml.load() without a
        # Loader is rejected by current PyYAML releases).
        return yaml.safe_load(raw)

    @staticmethod
    def _build_script(args, variables):
        """Return the script text that is fed to shellcheck.

        Every element of *args* except the last becomes a ``set <switch>``
        line (``-c`` is skipped), each name in *variables* is assigned a
        dummy value and exported, and the last element of *args* (the
        scriptlet itself) is appended verbatim. Empty *args* yields a script
        without a body instead of raising.
        """
        args = args or []
        lines = []
        for switch in args[:-1]:
            if switch != "-c":
                lines.append("set " + switch)
        for name in variables or {}:
            # Include $(date) so that shellcheck doesn't assume these
            # variables have safe contents.
            lines.append(name + "=\"DUMMY-$(date)\"")
            lines.append("export " + name)
        body = args[-1] if args else ""
        return "".join(line + "\n" for line in lines) + body

    def call_shellcheck(self, shell, args, variables):
        """Return the exit code and combined output of running shellcheck.

        *shell* is passed to shellcheck's ``-s`` option; *args* and
        *variables* are turned into the checked script by ``_build_script``.
        """
        script = self._build_script(args, variables)
        # Text mode: the script is a str, and the captured output is later
        # formatted into a report line.
        process = subprocess.Popen(["shellcheck", "-s", shell, "-"],
                                   stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT,
                                   universal_newlines=True)
        output = process.communicate(script)[0]
        return process.returncode, output

    @classmethod
    def _iter_steps(cls, plan):
        """Yield the steps of *plan* with all nesting flattened away.

        A step that wraps other steps is yielded first (without its wrapper
        keys), followed by the wrapped steps in declaration order. Steps are
        shallow-copied, so the caller's data is left untouched. Entries that
        are not mappings are skipped.
        """
        # Stack with the next step to visit at the end of the list.
        stack = list(reversed(plan or []))
        while stack:
            step = stack.pop()
            if not isinstance(step, dict):
                continue
            step = dict(step)
            nested = []
            for key in cls._LIST_STEP_KEYS:
                if key in step:
                    block = step.pop(key)
                    # `in_parallel` may be a mapping that carries its steps
                    # under `steps:` next to options such as `limit:`.
                    if isinstance(block, dict):
                        block = block.get('steps')
                    nested.extend(block or [])
            for key in cls._SINGLE_STEP_KEYS:
                if key in step:
                    nested.append(step.pop(key))
            if nested:
                stack.extend(reversed(nested))
                stack.append(step)
                continue
            yield step

    def _check_task(self, job_name, item, get_resources, used_resources,
                    output_resources, errors):
        """Run the task-level checks for one ``task:`` step.

        Records the task's inputs in *used_resources* and its outputs in
        *output_resources*, and appends findings to *errors* for unfetched
        inputs, shellcheck failures and secret-looking params whose value is
        not interpolated.
        """
        if 'file' in item:
            # The path is opened as given; a failure to read it propagates.
            config = self.load_pipeline(item['file'])
        else:
            config = item.get('config')
        config = config or {}
        task_name = item['task']

        for task_input in config.get('inputs') or []:
            name = task_input['name']
            used_resources.add(name)
            # Resources listed as an `input:` to a task that have not been
            # `get:` or `put:` (or output by an earlier task) in this job
            if name not in get_resources and name not in output_resources:
                errors['unfetched_resource'].append({
                    'job': job_name,
                    'resource': name,
                    'task': task_name,
                    'fatal': True
                })

        for task_output in config.get('outputs') or []:
            output_resources.add(task_output['name'])

        config_params = config.get('params') or {}
        run = config.get('run') or {}
        run_args = run.get('args') or []
        # Without args there is no scriptlet to check.
        if run_args and run.get('path') in self._SHELLCHECK_DIALECTS:
            # Step-level params override the ones from the task config.
            combined_params = dict(config_params)
            combined_params.update(item.get('params') or {})
            exitcode, output = self.call_shellcheck(run['path'], run_args,
                                                    combined_params)
            if exitcode != 0:
                errors['shellcheck'].append({
                    'job': job_name,
                    'task': task_name,
                    '~': output,
                    'fatal': True,
                })

        for key, value in config_params.items():
            if not self._SECRET_NAME_RE.search(str(key)):
                continue
            # An interpolated value was rewritten to contain DUMMY by
            # load_pipeline; anything else (including non-strings) is a
            # literal written into the pipeline.
            if not (isinstance(value, str) and 'DUMMY' in value):
                errors['secret-interpolation'].append({
                    'job': job_name,
                    'task': task_name,
                    '~': 'Key "' + str(key) + '" looks like a secret, but the value is not interpolated',
                    'fatal': False,
                })

    def check_pipeline(self, data):
        """Check a parsed pipeline and return the findings grouped by check.

        *data* is the deserialised pipeline (a mapping with optional
        ``resources`` and ``jobs`` lists); it is not modified. The result
        maps each check name to a list of finding dicts; set-derived findings
        are emitted in sorted order so the report is reproducible.
        """
        errors = {
            'unknown_resource': [],
            'unfetched_resource': [],
            'unused_fetch': [],
            'unused_resource': [],
            'unused_output': [],
            'shellcheck': [],
            'secret-interpolation': [],
        }
        data = data or {}
        defined_resource_names = {
            resource['name'] for resource in data.get('resources') or []
        }
        overall_used_resources = set()

        for job in data.get('jobs') or []:
            job_name = job['name']
            get_resources = set()
            used_resources = set()
            output_resources = set()
            triggered_resources = set()

            for item in self._iter_steps(job.get('plan')):
                if 'get' in item:
                    get_resources.add(item['get'])
                    # Only a truthy `trigger:` marks the resource as a
                    # trigger; an explicit `trigger: false` does not.
                    if item.get('trigger'):
                        triggered_resources.add(item['get'])
                    # `get:` referring to a resource that has not been
                    # defined in `resources:`
                    if item['get'] not in defined_resource_names:
                        errors['unknown_resource'].append({
                            'job': job_name,
                            'resource': item['get'],
                            'fatal': True
                        })

                # A 'put' also registers the resource as fetched
                if 'put' in item:
                    output_resources.add(item['put'])
                    used_resources.add(item['put'])
                    # It is a common pattern to define an output that is then
                    # used in an ensure block that does not need explicit
                    # inputs
                    put_file = (item.get('params') or {}).get('file')
                    if isinstance(put_file, str):
                        directory = os.path.split(put_file)[0]
                        used_resources.add(directory)
                        # For a nested path the artifact is the leading
                        # directory component.
                        used_resources.add(directory.split('/', 1)[0])
                    # `put:` referring to a resource that has not been
                    # defined in `resources:`
                    if item['put'] not in defined_resource_names:
                        errors['unknown_resource'].append({
                            'job': job_name,
                            'resource': item['put'],
                            'fatal': True
                        })

                if 'task' in item:
                    self._check_task(job_name, item, get_resources,
                                     used_resources, output_resources, errors)

            overall_used_resources |= used_resources | get_resources

            # Resources that were fetched with a `get:` but never referred to
            # We ignore triggered resources as they are often only used to
            # trigger and not used in the tasks
            get_remainder = get_resources - used_resources - triggered_resources
            for resource in sorted(get_remainder, key=str):
                errors['unused_fetch'].append({
                    'job': job_name,
                    'resource': resource,
                    'fatal': False
                })

            # `output:` from tasks that were never referred to
            out_remainder = output_resources - used_resources
            for resource in sorted(out_remainder, key=str):
                errors['unused_output'].append({
                    'job': job_name,
                    'resource': resource,
                    'fatal': False
                })

        # Resources that were defined in the `resources:` block but never used
        unused_resources = defined_resource_names - overall_used_resources
        for resource in sorted(unused_resources, key=str):
            errors['unused_resource'].append({
                'resource': resource,
                'fatal': False
            })
        return errors
if __name__ == '__main__':
    def usage():
        """Show the expected invocation and terminate with exit status 2."""
        print("\npipecleaner.py pipeline1.yml [pipelineN.yml...]")
        sys.exit(2)

    # No options are defined, so any option-style argument ends in usage().
    try:
        opts, args = getopt.getopt(sys.argv[1:], '', [])
    except getopt.GetoptError:
        usage()

    files = args
    if not files:
        usage()

    # ANSI escape sequences used to highlight the report.
    BOLD = '\033[1m'
    ENDC = '\033[0m'
    FMT = '%s' + BOLD + '* ' + ENDC + '%s' + ': %s'

    # Stays None until a finding is seen; becomes True once any finding is
    # fatal.
    fatal = None
    cleaner = Pipecleaner()
    for filename in files:
        errors = cleaner.validate(filename)
        # Only print the per-file banner when there is something to report.
        if any(errors.values()):
            print("\n==", BOLD, filename, ENDC, "==")
        for err_type, err_list in errors.items():
            heading = err_type.replace('_', ' ').title()
            for err in err_list:
                # The flag selects the prefix and is not part of the details.
                is_fatal = err.pop('fatal')
                msg_prefix = '\033[91mERROR ' if is_fatal else '\033[93mWARNING '
                fatal = fatal or is_fatal
                details = ', '.join(
                    "%s='%s'" % pair for pair in sorted(err.items())
                )
                print(FMT % (msg_prefix, heading, details))

    # Exit status 10 signals that at least one fatal finding was reported.
    if fatal is True:
        sys.exit(10)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment