Skip to content

Instantly share code, notes, and snippets.

@fabianvf
Created December 20, 2018 22:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fabianvf/92c5f48092820e3fa0749adf0ab8c973 to your computer and use it in GitHub Desktop.
Save fabianvf/92c5f48092820e3fa0749adf0ab8c973 to your computer and use it in GitHub Desktop.
from __future__ import print_function
import os
import re
import yaml
# Maps a template directory prefix (relative to the walked root) to the Go
# expression used as the directory argument of filepath.Join in generated code.
# Formatter.base_dir picks the longest key that prefixes a file's directory.
# A "{short_name}" placeholder is filled in by Formatter.format's second
# str.format pass.
base_dirs = {
    # Directories whose constants live in the scaffold package.
    'build': 'scaffold.BuildDir',
    'build/test-framework': 'scaffold.BuildTestDir',
    'deploy': 'scaffold.DeployDir',
    'deploy/crds': 'scaffold.CrdsDir',

    # Per-resource role directory: two Join arguments in a single entry.
    'roles/{{.Resource.LowerKind}}': 'RolesDir, {short_name}.Resource.LowerKind',

    # Directories whose constants are listed in Formatter.dir_constants.
    'molecule': 'MoleculeDir',
    'molecule/test-cluster': 'MoleculeTestClusterDir',
    'molecule/test-local': 'MoleculeTestLocalDir',
    'molecule/default': 'MoleculeDefaultDir',
}
class Formatter(object):
    """Render one template file as the Go source of a scaffold type.

    An instance wraps a single file found under the walked template root.
    Its properties derive the Go type name, the output path expression, the
    struct fields and the template body, and ``format`` splices them into
    ``TMPL``.  Template variables that are not yet in the module-level
    ``conversions`` cache are asked for interactively on stdin.
    """

    # Go expressions for the directory constants that generate_constants
    # writes into constants.go.
    dir_constants = {
        'RolesDir': '"roles"',
        'MoleculeDir': '"molecule"',
        'MoleculeTestClusterDir': 'MoleculeDir + filePathSep + "test-cluster"',
        'MoleculeTestLocalDir': 'MoleculeDir + filePathSep + "test-local"',
        'MoleculeDefaultDir': 'MoleculeDir + filePathSep + "default"',
    }

    # Class-level ``None`` defaults for the per-instance caches filled lazily
    # by the properties below (name-mangled to ``_Formatter__<attr>``).
    __name = None
    __filename = None
    __vars_decl = None
    __base_dir = None
    __raw_base_dir = None
    __vars_value = None
    __file_content = None

    def __init__(self, root, input_file):
        """Read ``input_file`` from directory ``root`` and collect its variables.

        Args:
            root: directory containing the file, including the top-level
                template directory (which is stripped for ``self.root``).
            input_file: file name inside ``root``.
        """
        self.constants_map = {}
        self.go_vars = set()
        self.input_root = root
        self.root = self._remove_top_dir(root)
        self.input_file = input_file
        # Template variables that appear in the path itself.
        self.filename_vars = self.extract_vars(os.path.join(root, input_file))
        with open(os.path.join(root, input_file)) as f:
            self.input_content = f.read()
        # Template variables that appear in the file body.
        self.content_vars = self.extract_vars(self.input_content)

    @staticmethod
    def _prompt(message):
        """Ask the user for a line of input on both Python 2 and Python 3."""
        try:
            reader = raw_input  # Python 2
        except NameError:
            reader = input  # Python 3: raw_input was renamed to input
        return reader(message)

    def _remove_top_dir(self, path):
        """Return ``path`` without its first '/'-separated component ('' if none)."""
        if '/' in path:
            return path.split('/', 1)[1]
        return ''

    @property
    def name(self):
        """Go type name: camel-cased directory plus camel-cased file stem."""
        if self.__name is not None:
            return self.__name
        if '.' in self.input_file:
            # Drop the last extension, glue the remaining dot-separated parts
            # together and capitalize (which lower-cases the rest).
            name = camel_case(''.join(filter(None, self.input_file.split('.')[0:-1])).capitalize())
        else:
            name = camel_case(self.input_file)
        name = camel_case(self.raw_base_dir) + name
        # Strip any brace-delimited template expression left in the name.
        self.__name = re.sub(r'{.*}', '', name)
        return self.__name

    def process_vars(self, s, names):
        """Replace every variable in ``names`` inside ``s`` with its conversion.

        Variables missing from ``conversions`` are requested from the user
        and stored through ``update_conversions`` before being substituted.
        """
        if names:
            for var in names:
                if not conversions.get(var):
                    update_conversions(var, self._prompt('{}: '.format(var)))
                s = s.replace(var, conversions[var])
        return s

    @property
    def raw_base_dir(self):
        """Directory relative to the template root, with path variables converted."""
        if self.__raw_base_dir is not None:
            return self.__raw_base_dir
        base_dir = self.root
        base_dir = self.process_vars(base_dir, self.filename_vars)
        self.__raw_base_dir = base_dir
        return self.__raw_base_dir

    @property
    def base_dir(self):
        """Go expression for this file's directory, looked up in ``base_dirs``."""
        if self.__base_dir is not None:
            return self.__base_dir
        base_dir = self.raw_base_dir
        # Longest matching prefix first, so the most specific entry wins.
        matches = sorted(
            [(k, v) for k, v in base_dirs.items() if base_dir.startswith(k.strip('/'))],
            key=lambda x: 0 - len(x[0]),
        )
        if matches:
            base_dir = matches[0][1]
            base_dir = base_dir.replace(matches[0][0], '')
        elif base_dir:
            # Deliberate breakpoint: a directory without a base_dirs entry
            # needs the developer's attention before generation continues.
            __import__('pdb').set_trace()
        self.__base_dir = base_dir
        return self.__base_dir

    @property
    def filename(self):
        """Name of the Go constant holding the output file name.

        As a side effect the constant and its value (the converted input
        file name) are recorded in ``constants_map``.
        """
        if self.__filename is not None:
            return self.__filename
        filename = self.process_vars(self.input_file, self.filename_vars)
        self.constants_map[camel_case(self.name) + 'File'] = filename
        self.__filename = camel_case(self.name + 'File')
        return self.__filename

    @property
    def file_content(self):
        """Converted file body with every brace doubled.

        The doubling protects the body's literal braces from the second
        ``str.format`` pass in ``format``.
        """
        if self.__file_content is not None:
            return self.__file_content
        self.__file_content = self.process_vars(self.input_content, self.content_vars).replace('{', '{{').replace('}', '}}')
        return self.__file_content

    def parse_vars(self):
        """Compute the struct field declarations and GetInput assignments.

        Fills the caches behind ``vars_decl`` and ``vars_value`` and extends
        ``go_vars`` with every field name found.  Field types and assignment
        values come from ``conversions``; missing ones are prompted for.  An
        assignment whose conversion is the literal "IGNORE" is left out.
        """
        decl_lines = []
        value_lines = []
        declared = set()
        if 'Resource' in self.filepath_string:
            # The leading newline puts a blank line between the embedded
            # input.Input and the explicit fields in the rendered struct.
            decl_lines.append('\n\tResource scaffold.Resource')
            self.go_vars.add('Resource')
            declared.add('Resource')
        var_re = r'\.([A-Z][a-z]*\.?.*?)[ }]+'
        matches = self.extract_vars(self.file_content)
        go_vars = {
            y.split('.')[0]
            for x in matches
            for y in re.findall(var_re, x)
            if y != 'ProjectName'
        }
        if go_vars:
            self.go_vars = self.go_vars.union(go_vars)
            # Sorted so the generated Go is identical from run to run.
            ordered = sorted(go_vars)
            # A field that was already declared above must not be repeated.
            fields = [var for var in ordered if var not in declared]
            for var in fields:
                if not conversions.get(var):
                    update_conversions(var, self._prompt('type of {}?: '.format(var)))
            if fields:
                longest = max(map(len, fields))
                decl_lines.extend(
                    '\t{} {}'.format(item.ljust(longest), conversions.get(item))
                    for item in fields
                )
            targets = ['{short_name}.' + var for var in ordered]
            for target in targets:
                if not conversions.get(target):
                    update_conversions(target, self._prompt('{} = ?: '.format(target)))
            value_lines = [
                '\t{} = {}'.format(target, conversions.get(target))
                for target in targets
                if conversions.get(target) != "IGNORE"
            ]
        # Joining line by line keeps every declaration on its own line, also
        # when the Resource field is followed by template-variable fields.
        self.__vars_decl = '\n'.join(decl_lines)
        self.__vars_value = '\n'.join(value_lines)
        return

    @property
    def vars_decl(self):
        """Struct field declarations, computed on first access by ``parse_vars``."""
        if self.__vars_decl is not None:
            return self.__vars_decl
        self.parse_vars()
        return self.__vars_decl

    @property
    def vars_value(self):
        """GetInput field assignments, computed on first access by ``parse_vars``."""
        if self.__vars_value is not None:
            return self.__vars_value
        self.parse_vars()
        return self.__vars_value

    @property
    def short_name(self):
        """Go receiver name: the lower-cased first letter of ``name``."""
        return self.name[0].lower()

    @property
    def private_name(self):
        """``name`` with its first letter lower-cased."""
        return self.name[0].lower() + ''.join(self.name[1:])

    @property
    def source_filename(self):
        """File name of the generated Go source: snake-cased ``name`` plus '.go'."""
        return snake_case(self.name) + '.go'

    @property
    def constants(self):
        """One Go ``const`` line per entry recorded in ``constants_map``."""
        ret = ""
        for k, v in self.constants_map.items():
            ret = ret + 'const {} = "{}"\n'.format(k, v)
        return ret

    @property
    def filepath_string(self):
        """Go expression for the output path assigned in GetInput."""
        if self.raw_base_dir:
            return 'filepath.Join({base_dir}, {filename})'.format(base_dir=self.base_dir, filename=self.filename)
        return '{filename}'.format(filename=self.filename)

    def format(self):
        """Return the complete Go source rendered from ``TMPL``.

        The template is formatted twice with the same arguments: the first
        pass inserts the values, the second resolves placeholders contained
        in those values (such as "{short_name}") and collapses the doubled
        braces.
        """
        format_args = dict(
            name=self.name,
            vars_decl=self.vars_decl,
            vars_value=self.vars_value,
            short_name=self.short_name,
            private_name=self.private_name,
            filepath_string=self.filepath_string,
            base_dir=self.base_dir,
            filename=self.filename,
            file_content=self.file_content,
            constants=self.constants,
        )
        return TMPL.format(**format_args).format(**format_args)

    def extract_vars(self, text):
        """Return every '{{...}}' or '{%...%}' template expression found in ``text``."""
        return re.findall(r'{[{%].*?[}%]}', text)
# Go source template for one generated scaffold type.  Formatter.format
# renders it with two successive str.format passes, which is why literal Go
# braces are written as four braces here.
TMPL = """// Copyright 2018 The Operator-SDK Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package ansible
{constants}
type {name} struct {{{{
\tinput.Input
{vars_decl}
}}}}
// GetInput - gets the input
func ({short_name} *{name}) GetInput() (input.Input, error) {{{{
\tif {short_name}.Path == "" {{{{
\t\t{short_name}.Path = {filepath_string}
\t}}}}
\t{short_name}.TemplateBody = {private_name}AnsibleTmpl
{vars_value}
\treturn {short_name}.Input, nil
}}}}
const {private_name}AnsibleTmpl = `{file_content}`
"""
# Go source template for constants.go.  generate_constants fills the
# {constants} placeholder with one line per Formatter.dir_constants entry.
CONSTANTS_TMPL = """// Copyright 2018 The Operator-SDK Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package ansible
import (
"path/filepath"
)
const (
\tfilePathSep = string(filepath.Separator)
{constants}
)
"""
# Go snippet defining doAnsibleScaffold.  generate_cmd fills the {modules}
# placeholder and prints the result; the template goes through a single
# str.format pass, so literal Go braces are doubled once.
CMD_TEMPLATE = """
func doAnsibleScaffold() {{
\tcfg := &input.Config{{
\t\tAbsProjectPath: filepath.Join(projutil.MustGetwd(), projectName),
\t\tProjectName: projectName,
\t}}
\tresource, err := scaffold.NewResource(apiVersion, kind)
\tif err != nil {{
\t\tlog.Fatalf("invalid apiVersion and kind: (%v)", err)
\t}}
\ts := &scaffold.Scaffold{{}}
\terr = s.Execute(cfg,
\t\t&scaffold.ServiceAccount{{}},
\t\t&scaffold.Role{{
\t\t\tIsClusterScoped: isClusterScoped,
\t\t}},
\t\t&scaffold.RoleBinding{{
\t\t\tIsClusterScoped: isClusterScoped,
\t\t}},
\t\t&scaffold.Crd{{
\t\t\tResource: resource,
\t\t}},
\t\t&scaffold.Cr{{
\t\t\tResource: resource,
\t\t}},
{modules}
\t)
\tif err != nil {{
\t\tlog.Fatalf("new ansible scaffold failed: (%v)", err)
\t}}
\t// Decide on playbook.
\tif generatePlaybook {{
\t\tlog.Infof("Generating %s playbook.", strings.Title(operatorType))
\t\terr := s.Execute(cfg,
\t\t\t&ansible.Playbook{{
\t\t\t\tResource: *resource,
\t\t\t}},
\t\t)
\t\tif err != nil {{
\t\t\tlog.Fatalf("new ansible playbook scaffold failed: (%v)", err)
\t\t}}
\t}}
\t// update deploy/role.yaml for the given resource r.
\tif err := scaffold.UpdateRoleForResource(resource, cfg.AbsProjectPath); err != nil {{
\t\tlog.Fatalf("failed to update the RBAC manifest for the resource (%v, %v): (%v)", resource.APIVersion, resource.Kind, err)
\t}}
}}
"""
# One "&ansible.<name>{...}," argument for the s.Execute call in
# CMD_TEMPLATE.  generate_cmd fills {name} and {args}.
MODULE_TMPL = """\t\t&ansible.{name}{{{args}\t\t}},
"""
# Answers given for template variables so far, keyed by variable.  They are
# persisted in the '.cache' YAML file by update_conversions and reloaded here
# so that the user is only asked once per variable.
conversions = {}
if os.path.exists('.cache'):
    with open('.cache') as f:
        # safe_load is sufficient for the plain string mapping stored in the
        # cache and never constructs arbitrary objects; yaml.load without an
        # explicit Loader is rejected by current PyYAML releases.  An empty
        # file loads as None, hence the fallback to an empty dict.
        conversions = yaml.safe_load(f) or {}
def update_conversions(key, value):
    """Store ``value`` for ``key`` in ``conversions`` and rewrite '.cache'.

    A falsy ``value`` (for example an empty answer) maps the key to itself.
    """
    conversions[key] = value if value else key
    serialized = yaml.dump(conversions, default_flow_style=False)
    with open('.cache', 'w') as cache_file:
        cache_file.write(serialized)
def capitalize(s):
    """Upper-case the first character of ``s`` and keep the rest unchanged.

    Falsy input (empty string, None) is returned as is.
    """
    if s:
        return s[:1].upper() + s[1:]
    return s
def snake_case(s):
    """Convert a camel-cased identifier to lower-case words joined by '_'.

    A word is a run of upper-case letters followed by a run of lower-case
    letters; any other character only separates words.  Falsy input is
    returned unchanged.
    """
    if not s:
        return s
    words = [chunk for chunk in re.findall(r'([A-Z]*[a-z]*)', s) if chunk]
    return '_'.join(words).lower()
def camel_case(s):
    """Drop '/', '-', '_' and '.' from ``s``, capitalizing the piece after each.

    The separators are handled one after another, in that order, and every
    resulting piece has its first character upper-cased by ``capitalize``.
    """
    result = s
    for separator in ('/', '-', '_', '.'):
        pieces = result.split(separator)
        result = ''.join(capitalize(piece) for piece in pieces)
    return result
def generate_constants(formatter):
    """Write constants.go with one aligned line per Formatter.dir_constants entry.

    Args:
        formatter: not used by the function; kept for the existing call
            signature.
    """
    entries = Formatter.dir_constants.items()
    # Pad every name to the longest one so the '=' signs line up.
    width = max(len(const_name) for const_name, _ in entries)
    body = '\n'.join(
        '\t{} = {}'.format(const_name.ljust(width), expression)
        for const_name, expression in entries
    )
    with open('constants.go', 'w') as out:
        out.write(CONSTANTS_TMPL.format(constants=body))
def generate_cmd(go_vars):
    """Print the Go doAnsibleScaffold function that instantiates every type.

    Args:
        go_vars: mapping of generated Go type name to the collection of
            field names that type declares.

    Only the fields listed in ``vars_mapping`` are initialised in the
    printed struct literals; they are emitted in sorted order so the output
    does not depend on set iteration order.
    """
    # Struct field -> Go expression available inside doAnsibleScaffold.
    vars_mapping = {
        'GeneratePlaybook': 'generatePlaybook',
        'IsClusterScoped': 'isClusterScoped',
        'Resource': '*resource',
    }
    args_tmpl = '\t\t\t{var_name}: {var_value},'
    modules = []
    for name, type_vars in go_vars.items():
        args = [
            args_tmpl.format(var_name=var, var_value=vars_mapping[var])
            for var in sorted(vars_mapping)
            if var in type_vars
        ]
        modules.append(MODULE_TMPL.format(
            name=name,
            args='\n' + '\n'.join(args) + '\n' if args else "",
        ))
    print(CMD_TEMPLATE.format(modules='\n'.join(modules)))
def main():
    """Generate a Go file per template under 'tmpl', then constants.go and the command.

    Every file found by walking 'tmpl' is rendered through Formatter and
    written to the current directory under Formatter.source_filename.  The
    Go field names of each generated type are collected and handed to
    generate_cmd, which prints the scaffold function.
    """
    go_vars = {}
    # Stays None when 'tmpl' holds no files, so the generate_constants call
    # below does not fail on an unbound name.
    formatter = None
    for root, _dirs, files in os.walk('tmpl'):
        for input_file in files:
            formatter = Formatter(root, input_file)
            # Render before opening the target so a failure while rendering
            # does not leave a truncated .go file behind.
            rendered = formatter.format()
            with open(formatter.source_filename, 'w') as f:
                f.write(rendered)
            go_vars[formatter.name] = formatter.go_vars
    generate_constants(formatter)
    generate_cmd(go_vars)
if __name__ == '__main__':
    try:
        import ipdb
    except ImportError:
        # ipdb is only a debugging convenience: without it the generator
        # still runs, errors simply propagate as a normal traceback.
        main()
    else:
        # Open a post-mortem ipdb session if main() raises.
        with ipdb.launch_ipdb_on_exception():
            main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment