Skip to content

Instantly share code, notes, and snippets.

@hughsaunders
Last active December 12, 2015 09:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hughsaunders/4751341 to your computer and use it in GitHub Desktop.
Save hughsaunders/4751341 to your computer and use it in GitHub Desktop.
Analysing Dependencies in Chef Cookbooks
#!/usr/bin/python
# Script for extracting dependencies from Chef cookbook repositories.
# Hugh Saunders 2013
# The output is "item,dependency", one pair per line. The idea is to then graph these with Gephi (https://gephi.org/)
# There are three methods for working out dependencies see -h or the argparse section in main()
# If you wish to use the cookbook metadata option, use:
# knife cookbook metadata -o cookbook-dir --all
# to generate the json files.
import sys
import os
import json
import re
import argparse
class Cookbook:
    """A cookbook described by a ``metadata.json`` file.

    Used when dependencies are derived from cookbook metadata. Every
    instance registers itself in the class-level ``cookbooks`` list so that
    ``get_all_cookbook_deps`` can report on every cookbook loaded so far.
    """

    # Registry of every Cookbook instantiated so far (shared, class-level).
    cookbooks = []

    def __init__(self, file_path):
        """Parse the JSON metadata at ``file_path`` and register the instance.

        Raises whatever ``open`` / ``json.load`` raise for a missing or
        malformed file.
        """
        # The context manager closes the handle even if parsing fails.
        with open(file_path, 'r') as metadata_file:
            self.json = json.load(metadata_file)
        Cookbook.cookbooks.append(self)

    def get_deps(self):
        """Return a list of the cookbook names this cookbook depends on."""
        # list() keeps the return type a list on Python 3 as well as 2.
        return list(self.json['dependencies'].keys())

    def get_name(self):
        """Return the cookbook name recorded in the metadata."""
        return self.json['name']

    @classmethod
    def get_all_cookbook_deps(cls):
        """Return ``(cookbook_name, dependency_name)`` pairs for all cookbooks."""
        return [
            (cookbook.get_name(), dep)
            for cookbook in cls.cookbooks
            for dep in cookbook.get_deps()
        ]
class GetDeps:
    """Walk a chef repository and load the cookbooks, recipes and roles in it."""

    def __init__(self, start_dir):
        """Remember ``start_dir`` as the default root for ``process_chef_repo``."""
        self.start_dir = start_dir

    @staticmethod
    def _ruby_files(files):
        """Return the entries of ``files`` whose name ends in ``.rb``."""
        return [f for f in files if f.endswith('.rb')]

    def process_chef_repo(self, start_dir=None):
        """Load everything found under ``start_dir`` (default: ``self.start_dir``).

        For each directory visited, exactly one of these applies, in order:
        a ``metadata.json`` file creates a ``Cookbook``; a directory named
        ``recipes`` has each ``.rb`` file loaded and processed as a
        ``Recipe``; a directory named ``roles`` has each ``.rb`` file loaded
        and processed as a ``Role``.
        """
        if not start_dir:
            start_dir = self.start_dir
        for directory, _folders, files in os.walk(start_dir):
            # normpath drops a trailing separator, which would otherwise make
            # the last path component an empty string for the top directory.
            dir_name = os.path.basename(os.path.normpath(directory))
            if 'metadata.json' in files:
                Cookbook(os.path.join(directory, 'metadata.json'))
            elif dir_name == 'recipes':
                for recipe_file in self._ruby_files(files):
                    recipe = Recipe.recipe_from_path(os.path.join(directory, recipe_file))
                    recipe.process_recipe()
            elif dir_name == 'roles':
                for role_file in self._ruby_files(files):
                    role = Role.role_from_path(os.path.join(directory, role_file))
                    role.process_role()
class Recipe:
    """A Chef recipe together with the recipes and roles it depends on.

    Recipes loaded from disk are named ``cookbook::recipe_name``; a recipe
    that is only referenced by a bare name is stored under that bare name.
    Every instance registers itself in the class-level ``recipes`` dict.
    """

    # Registry of all known recipes, keyed by recipe name (shared, class-level).
    recipes = {}
    # (?:...) is a non-capturing group, (?P<name>...) a named capture group.
    name_re = re.compile(r'(?:(?P<cookbook>[^:]+)(?:::))?(?P<recipe_name>.+)?')
    # Captures the first argument of a get_<something>_endpoint("role", ...) call.
    endpoint_re = re.compile(r'get_(?:[^_]+)_endpoint\(["\'\s]*([^"\']+)["\'],')
    # Captures the argument of include_recipe "name" / include_recipe 'name'.
    include_re = re.compile(r'include_recipe\s*[\'"]([^\'"]+)[\'"]')

    def __init__(self, name):
        """Create a recipe called ``name`` and register it in ``Recipe.recipes``."""
        self.name = name
        self.file_path = None
        self.dep_roles = []
        self.dep_recipes = []
        Recipe.recipes[self.name] = self

    @classmethod
    def name_from_path(cls, path):
        """Derive ``(full_name, recipe_name, cookbook)`` from a recipe file path.

        The cookbook is the directory containing the ``recipes`` directory and
        the recipe name is the path component after it, up to its first dot.
        Raises ``ValueError`` if no directory component is named ``recipes``.
        """
        path_array = path.split(os.sep)
        # Use the LAST 'recipes' directory: an earlier component with the same
        # name (e.g. a checkout under /srv/recipes/...) is not the one that
        # holds this file.
        directories = path_array[:-1]
        recipes_index = len(directories) - 1 - directories[::-1].index('recipes')
        cookbook = path_array[recipes_index - 1]
        recipe_name = path_array[recipes_index + 1].split('.')[0]
        name = '%s::%s' % (cookbook, recipe_name)
        return name, recipe_name, cookbook

    def process_recipe(self):
        """Scan the recipe file and record its dependencies.

        A recipe depends on another recipe if it includes it
        (``include_recipe``), and on a role if it calls
        ``get_*_endpoint(Role, _, _)``. Requires ``file_path`` to be set.
        """
        assert self.file_path, 'recipe %r has no file to process' % self.name
        # Read the file once and close it promptly.
        with open(self.file_path, 'r') as recipe_file:
            source = recipe_file.read()
        self.dep_roles += [Role.get_role(role_name)
                           for role_name in self.endpoint_re.findall(source)]
        self.dep_recipes += [Recipe.get_recipe(recipe_name)
                             for recipe_name in self.include_re.findall(source)]

    def get_dep_roles(self, _active=None):
        """Return the roles this recipe depends on, directly or via includes.

        ``_active`` is internal: it holds the recipes currently being expanded
        so that circular ``include_recipe`` chains terminate instead of
        recursing forever. Results for acyclic graphs are unchanged.
        """
        active = set() if _active is None else _active
        if self in active:
            # Already being expanded further up the call chain: a cycle.
            return []
        active.add(self)
        try:
            dep_roles = list(self.dep_roles)
            for recipe in self.dep_recipes:
                dep_roles += recipe.get_dep_roles(active)
        finally:
            active.discard(self)
        return dep_roles

    @classmethod
    def strip_module(cls, name):
        """Return ``name`` without its leading ``cookbook::`` part, if any."""
        match = cls.name_re.match(name)
        return match.group('recipe_name')

    @classmethod
    def get_recipe(cls, name):
        """Return the recipe registered for ``name``, creating it if needed.

        An exact name match wins. Otherwise an existing recipe with the same
        recipe name is reused when at least one of the two names carries no
        cookbook (so ``c`` and ``b::c`` are the same recipe). Two fully
        qualified names from different cookbooks, such as ``apache::default``
        and ``mysql::default``, are never merged.
        """
        if name in cls.recipes:
            return cls.recipes[name]
        wanted = cls.name_re.match(name)
        wanted_cookbook = wanted.group('cookbook')
        wanted_recipe = wanted.group('recipe_name')
        for known_name, recipe in cls.recipes.items():
            known = cls.name_re.match(known_name)
            if known.group('recipe_name') != wanted_recipe:
                continue
            known_cookbook = known.group('cookbook')
            if (wanted_cookbook is None or known_cookbook is None
                    or known_cookbook == wanted_cookbook):
                return recipe
        # No usable match: the constructor registers the new instance.
        return cls(name)

    @classmethod
    def recipe_from_path(cls, path):
        """Return the recipe for the file at ``path``, recording where it lives.

        An already-set ``file_path`` is left untouched.
        """
        recipe = cls.get_recipe(cls.name_from_path(path)[0])
        if not recipe.file_path:
            recipe.file_path = path
        return recipe
class Role:
    """A Chef role together with the recipes and roles it references.

    Every instance registers itself in the class-level ``roles`` dict, keyed
    by role name.
    """

    # Registry of all known roles, keyed by role name (shared, class-level).
    roles = {}
    # Capture the name inside role[...] / recipe[...] run-list entries.
    # Kept as class attributes so they are compiled once; they remain
    # reachable as self.role_re / self.recipe_re.
    role_re = re.compile(r'role\[([^]]*)\]')
    recipe_re = re.compile(r'recipe\[([^]]*)\]')

    def __init__(self, name=None, file_path=None):
        """Create a role and register it in ``Role.roles`` under ``name``."""
        self.name = name
        self.file_path = file_path
        self.dep_recipes = []
        self.dep_roles = []
        Role.roles[self.name] = self

    def process_role(self):
        """Scan the role file for ``recipe[...]`` and ``role[...]`` references."""
        # Read the file once and close it promptly.
        with open(self.file_path, 'r') as role_file:
            source = role_file.read()
        self.dep_recipes += [Recipe.get_recipe(recipe_name)
                             for recipe_name in self.recipe_re.findall(source)]
        self.dep_roles += [Role.get_role(role_name)
                           for role_name in self.role_re.findall(source)]

    def get_deps_from_recipes(self):
        """Return the roles that this role's recipes depend on."""
        deps = []
        for recipe in self.dep_recipes:
            deps += recipe.get_dep_roles()
        return deps

    def get_deps_from_roles(self):
        """Return the roles referenced directly by this role's definition."""
        return self.dep_roles

    @classmethod
    def name_from_path(cls, path):
        """Return the role name: the file name of ``path`` up to its first dot."""
        return os.path.split(path)[1].split('.')[0]

    @classmethod
    def get_role(cls, name):
        """Return the role registered as ``name``, creating it if needed."""
        if name not in Role.roles:
            # The constructor registers the new instance in Role.roles.
            Role(name=name)
        return Role.roles[name]

    @classmethod
    def role_from_path(cls, path):
        """Return the role for the file at ``path``, recording where it lives.

        An already-set ``file_path`` is left untouched.
        """
        role = cls.get_role(cls.name_from_path(path))
        if not role.file_path:
            role.file_path = path
        return role
def main(argv=None):
    """Command-line entry point: print ``item,dependency`` pairs for a chef repo.

    ``argv`` is the argument list without the program name; when it is
    ``None`` the arguments are taken from ``sys.argv``.

    Returns 0 when dependencies were printed, and 1 when no dependency
    source was selected or no dependencies were found. Argument errors exit
    through ``argparse`` (``SystemExit``).
    """
    # Only fall back to sys.argv when no list was supplied at all; an explicit
    # empty list is a real (invalid) command line.
    if argv is None:
        argv = sys.argv[1:]

    parser = argparse.ArgumentParser()
    parser.add_argument('path', help='Path to root of chef cookbooks repository')
    parser.add_argument('--cookbook-metadata',
                        help='List dependencies between cookbooks from cookbook metadata.',
                        action='store_true')
    parser.add_argument('--get-endpoint',
                        help='List dependencies between roles. '
                             'Role A is dependent on role B if one of the recipes required by '
                             'role A calls get_*_endpoint("Role B",_,_)',
                        action='store_true')
    parser.add_argument('--role-def',
                        help='List dependencies between roles as listed in the role definitions.',
                        action='store_true')
    parser.add_argument('--remove-self-deps',
                        help='Dont list self dependencies (eg A-->A)',
                        action='store_true')
    args = parser.parse_args(argv)

    # print("...") with a single argument behaves the same on Python 2 and 3.
    if not (args.cookbook_metadata or args.get_endpoint or args.role_def):
        parser.print_help()
        print("")
        print("Atleast one of --cookbook-metadata, --get-endpoint, --role-def is required.")
        return 1

    gd = GetDeps(args.path)
    gd.process_chef_repo()

    # Set of (item, dependency) name pairs; the set removes duplicates.
    deps = set()

    # Add cookbook metadata deps if requested
    if args.cookbook_metadata:
        deps.update(Cookbook.get_all_cookbook_deps())

    # Add get-endpoint deps if requested
    if args.get_endpoint:
        for role in Role.roles.values():
            for dep in role.get_deps_from_recipes():
                deps.add((role.name, dep.name))

    # Add role-deps if requested
    if args.role_def:
        for role in Role.roles.values():
            for dep in role.get_deps_from_roles():
                deps.add((role.name, dep.name))

    # Remove loopback connections (A-->A) if requested. The list is built
    # first so the set is not modified while it is being iterated.
    if args.remove_self_deps:
        deps.difference_update([dep for dep in deps if dep[0] == dep[1]])

    if not deps:
        print("No deps found")
        return 1

    # Output order follows set iteration order and is not guaranteed.
    for dep in deps:
        print("%s,%s" % dep)
    return 0
# When executed as a script, use main()'s return value as the exit status.
if __name__ == "__main__":
    exit_status = main()
    sys.exit(exit_status)
__author__ = 'Hugh Saunders'
#Unit tests for Chef Cookbook Dependency Grapher Script.
import unittest
import os
import sys
import StringIO
import argparse
from dep_graph import Cookbook, GetDeps, Recipe, Role
from dep_graph import main as depgraph_main
class TestDepGraph(unittest.TestCase):
    """Tests for the dependency grapher, run against the ``test_data`` tree."""

    def setUp(self):
        self.test_dir = "test_data"

    # -- cookbook metadata -------------------------------------------------

    def read_cookbook_metadata(self):
        """Load the metadata of cookbooks a and b into a fresh registry."""
        # Reset first so the result does not depend on which tests ran before.
        Cookbook.cookbooks = []
        Cookbook(os.path.join(self.test_dir, 'cookbooks', 'a', 'metadata.json'))
        Cookbook(os.path.join(self.test_dir, 'cookbooks', 'b', 'metadata.json'))

    def test_cookbook_metadata(self):
        self.read_cookbook_metadata()
        self.assertEqual([('a', 'b')], Cookbook.get_all_cookbook_deps())

    # -- recipes -------------------------------------------------------------

    def read_recipe(self, path):
        """Load and process the recipe file at ``path``."""
        r = Recipe.recipe_from_path(path)
        r.process_recipe()

    def read_recipes(self):
        """Load the three test recipes into a fresh recipe registry."""
        Recipe.recipes = {}
        self.read_recipe(os.path.join(self.test_dir, 'cookbooks', 'a', 'recipes', 'a.rb'))
        self.read_recipe(os.path.join(self.test_dir, 'cookbooks', 'b', 'recipes', 'b.rb'))
        self.read_recipe(os.path.join(self.test_dir, 'cookbooks', 'b', 'recipes', 'c.rb'))

    def test_recipe_include(self):
        self.read_recipes()
        self.assertEqual('b::b', Recipe.get_recipe('a::a').dep_recipes[0].name)
        self.assertEqual('c', Recipe.get_recipe('b::b').dep_recipes[0].name)

    def test_recipe_role_dep(self):
        self.read_recipes()
        self.assertEqual('c', Recipe.get_recipe('b::b').get_dep_roles()[0].name)

    def test_recipe_role_dep_indirect(self):
        self.read_recipes()
        self.assertEqual('c', Recipe.get_recipe('a::a').get_dep_roles()[0].name)

    def test_recipe_names(self):
        self.read_recipes()
        self.assertEqual(['a::a', 'b::b', 'c'],
                         sorted([r.name for r in Recipe.recipes.values()]))

    # -- roles ---------------------------------------------------------------

    def read_role(self, role):
        """Load and process the role file named ``role`` from the roles dir."""
        r = Role.role_from_path(os.path.join(self.test_dir, 'roles', role))
        r.process_role()

    def read_roles(self):
        """Load both test roles into a fresh role registry."""
        Role.roles = {}
        self.read_role('rolea.rb')
        self.read_role('roleb.rb')

    def test_role_role_dep(self):
        self.read_roles()
        self.assertEqual('roleb', Role.get_role('rolea').get_deps_from_roles()[0].name)

    def test_role_recipe_dep(self):
        self.read_roles()
        self.assertEqual(['c', 'd'],
                         sorted([r.name for r in Role.get_role('roleb').get_deps_from_recipes()]))

    # -- whole repository ----------------------------------------------------

    def reset_instance_lists(self):
        """Empty every class-level registry so a test starts from scratch."""
        Recipe.recipes = {}
        Role.roles = {}
        Cookbook.cookbooks = []

    def test_process_repo(self):
        self.reset_instance_lists()
        # Instantiate GetDeps and process a whole directory.
        gd = GetDeps(self.test_dir)
        gd.process_chef_repo(self.test_dir)
        # Same assertions as in the tests above: process_chef_repo should load
        # exactly the same data as all the specific loads do.
        self.assertEqual(['c', 'd'],
                         sorted([r.name for r in Role.get_role('roleb').get_deps_from_recipes()]))
        self.assertEqual('roleb', Role.get_role('rolea').get_deps_from_roles()[0].name)
        self.assertEqual(['a::a', 'b::b', 'c'],
                         sorted([r.name for r in Recipe.recipes.values()]))
        self.assertEqual('c', Recipe.get_recipe('a::a').get_dep_roles()[0].name)
        self.assertEqual('c', Recipe.get_recipe('b::b').get_dep_roles()[0].name)
        self.assertEqual('b::b', Recipe.get_recipe('a::a').dep_recipes[0].name)
        self.assertEqual('c', Recipe.get_recipe('b::b').dep_recipes[0].name)
        self.assertEqual([('a', 'b')], Cookbook.get_all_cookbook_deps())

    # -- command line --------------------------------------------------------

    def main_test(self, result, *args):
        """Run the script's main() with ``args`` and compare its stdout.

        ``result`` is the expected output. Lines are compared without regard
        to order, because main() prints the contents of a set and set
        iteration order is not guaranteed.
        """
        self.reset_instance_lists()
        real_out = sys.stdout
        string_out = StringIO.StringIO()
        sys.stdout = string_out
        try:
            depgraph_main(list(args) + [self.test_dir])
        finally:
            # Always restore stdout, even if main() raises, so later tests
            # and the test runner's own output are not swallowed.
            sys.stdout = real_out
        self.assertEqual(sorted(result.splitlines()),
                         sorted(string_out.getvalue().splitlines()))

    def test_main_cookbook_meta(self):
        self.main_test('a,b\n', '--cookbook-meta')

    def test_main_get_endpoint(self):
        self.main_test('roleb,d\nroleb,c\n', '--get-endpoint')

    def test_main_role_deps(self):
        self.main_test('rolea,roleb\n', '--role-def')

    def test_main_endpoint_and_role_deps(self):
        self.main_test('rolea,roleb\nroleb,d\nroleb,c\n', '--get-endpoint', '--role-def')

    def test_main_all(self):
        self.main_test('a,b\nroleb,d\nrolea,roleb\nroleb,c\n',
                       '--get-endpoint', '--role-def', '--cookbook-meta')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment