Skip to content

Instantly share code, notes, and snippets.

@thislight
Created October 19, 2019 10:42
Show Gist options
  • Save thislight/f4834a4218c35675526073144b7cfd59 to your computer and use it in GitHub Desktop.
Save thislight/f4834a4218c35675526073144b7cfd59 to your computer and use it in GitHub Desktop.
g.py and js_sdk.py
import os
import glob
import yaml
import json
import glob
import argparse
from pluginbase import PluginBase
try:
from yaml import CLoader as Loader, CDumper as Dumper
except ImportError:
from yaml import Loader, Dumper
class GenExt(object):
    """Base class for generator extensions.

    Each hook is a no-op here; an extension overrides the ones it needs.
    """

    def at_generate(self, key, document, gconfig, scontroller):
        """Hook called with one parsed document; does nothing by default."""
        return None

    def at_end(self, gconfig, scontroller):
        """Hook called after all documents were handled; does nothing by default."""
        return None

    def at_init(self, gconfig, scontroller):
        """Hook called before any document is handled; does nothing by default."""
        return None
class ExtManager(object):
    """Finds extension plugins in a directory and instantiates their entry points."""

    def __init__(self, ext_dir):
        self.dir: str = ext_dir
        # Maps each extension's ``__extname__`` to its created instance.
        self.exts = {}
        self.plugin_base = PluginBase('g.exts')

    def _search(self):
        """Build a plugin source whose search path is the extension directory."""
        return self.plugin_base.make_plugin_source(
            identifier="gexts", searchpath=[self.dir])

    def search(self, arguments):
        """Load every plugin found and register an instance of its ``__entry__``.

        ``arguments`` is passed unchanged to each entry point. Returns ``False``
        when no plugin is found, otherwise the number of plugins found.
        """
        source = self._search()
        found = source.list_plugins()
        total = len(found)
        if total <= 0:
            return False
        for plugin_name in found:
            entry = source.load_plugin(plugin_name).__entry__
            ext_name = entry.__extname__
            print('Found plugin {} called {}'.format(entry, ext_name))
            self.exts[ext_name] = entry(arguments)
        return total
class GroupConfig(object):
    """A group config: its metadata plus the YAML files it points at.

    ``conf_dict`` must provide ``name``, ``version``, ``type`` and ``files``.
    ``config_dir`` is the directory the listed files are relative to, either as
    a path string or as a list of path components (the form produced by
    ``Main.get_the_dir_of``).
    """

    def __init__(self, conf_dict, config_dir):
        self.dir = config_dir
        self.name = conf_dict['name']
        self.version = conf_dict['version']
        self.type = conf_dict['type']
        self.files = conf_dict['files']
        # path -> raw text, filled by get_contents()
        self.contents = {}
        # path -> parsed YAML document, filled by parse_documents()
        self.documents = {}

    def _base_dir(self):
        """Return the config directory as a single path string."""
        if isinstance(self.dir, str):
            return self.dir
        parts = list(self.dir)
        if not parts:
            return ""
        # Joining with os.sep (rather than os.path.join) keeps the leading
        # empty component of an absolute path, e.g. ['', 'a', 'b'] -> '/a/b'.
        # A lone [''] is the filesystem root.
        return os.sep.join(parts) or os.sep

    def expand_files(self):
        """Replace ``self.files`` with the absolute paths it refers to.

        An entry starting with ``?`` is a glob pattern (the ``?`` is stripped);
        any other entry is a single file. Both are resolved relative to the
        config directory. Glob matches are sorted so that generation order is
        deterministic.
        """
        base = self._base_dir()
        updated_files = []
        for filename in self.files:
            if filename.startswith('?'):
                pattern = os.path.join(base, filename[1:])
                for p in sorted(glob.glob(pattern)):
                    rp = os.path.abspath(p)
                    print("Found config {}".format(rp))
                    updated_files.append(rp)
            else:
                rp = os.path.abspath(os.path.join(base, filename))
                print("Found single config {}".format(rp))
                updated_files.append(rp)
        self.files = updated_files

    def get_contents(self):
        """Read every file in ``self.files`` into ``self.contents``."""
        for p in self.files:
            with open(p, mode='r', encoding='utf-8') as f:
                self.contents[p] = f.read()

    def parse_documents(self):
        """Parse every loaded text as YAML into ``self.documents``."""
        for p, text in self.contents.items():
            # NOTE(review): the full YAML loader can construct arbitrary Python
            # objects; switch to a safe loader if configs may be untrusted.
            self.documents[p] = yaml.load(text, Loader=Loader)
class SourceController(object):
    """Collects generated files in memory and writes them under a target directory."""

    def __init__(self, target_dir):
        self.dir = target_dir
        # relative path (with '/' separators) -> file content
        self.files = {}

    def overwrite(self, path, content):
        """Set the content of ``path``, replacing anything stored before."""
        self.files[path] = content

    def write_to_real(self):
        """Write every stored file to disk, creating parent directories as needed.

        ``str`` content is written as UTF-8 text; ``bytes`` content is written
        unchanged in binary mode.
        """
        for path, content in self.files.items():
            realpath = os.path.join(self.dir, path).replace('/', os.sep)
            parent = os.path.dirname(realpath)
            # dirname is empty for a bare file name; makedirs('') would raise.
            if parent:
                os.makedirs(parent, exist_ok=True)
            if isinstance(content, bytes):
                with open(realpath, mode='wb') as f:
                    f.write(content)
            else:
                with open(realpath, mode='w', encoding='utf-8') as f:
                    f.write(content)
class Main(object):
    """Command-line driver: parse arguments, load extensions, run the chosen one."""

    def __init__(self):
        self.create_argument_parser()
        self.debug = False

    @staticmethod
    def _parse_bool(text):
        """Convert a command-line token to a bool.

        ``type=bool`` must not be used with argparse: ``bool("False")`` is
        ``True`` because any non-empty string is truthy.
        """
        lowered = text.strip().lower()
        if lowered in ('1', 'true', 't', 'yes', 'y', 'on'):
            return True
        if lowered in ('', '0', 'false', 'f', 'no', 'n', 'off'):
            return False
        raise argparse.ArgumentTypeError(
            "expected a boolean value, got {!r}".format(text))

    def create_ext_manager(self, extdir):
        """Create the extension manager for ``extdir`` and keep it on ``self.extman``."""
        self.extman = ExtManager(extdir)

    def init_all_exts(self, group_config):
        """Load all extensions; returns the (falsy when none found) search result."""
        return self.extman.search({
            'main': self,
            'group_config': group_config
        })

    def get_the_dir_of(self, fpath):
        """Return the directory part of ``fpath`` as a list of path components."""
        return fpath.split(os.sep)[:-1]

    def create_group_config(self, fpath):
        """Load the YAML group config at ``fpath`` into a ``GroupConfig``."""
        with open(fpath, mode='r', encoding='utf-8') as f:
            # NOTE(review): the full YAML loader can construct arbitrary Python
            # objects; switch to a safe loader if configs may be untrusted.
            return GroupConfig(
                yaml.load(f.read(), Loader=Loader),
                self.get_the_dir_of(fpath)
            )

    def create_argument_parser(self):
        """Build the argument parser and store it on ``self.argument_parser``."""
        parser = argparse.ArgumentParser(
            'g.py',
            description = "A tool to generate codes for SDKs of lightstands"
        )
        parser.add_argument(
            '--group-config','-g', type=str, metavar='group_config',
            help="determind which group config needed to generate code",
            dest='group_config', required=True
        )
        parser.add_argument(
            '--exts', type=str, metavar='exts\' directory',
            dest='exts_dir', required=True
        )
        parser.add_argument(
            '--codec', type=str, metavar='codec',
            dest='codec', required=True
        )
        parser.add_argument(
            '--output', '-o', type=str, metavar='output directory',
            dest='output_dir', required=True
        )
        # Accepts `--debug`, `--debug True` and `--debug False`.
        parser.add_argument(
            '--debug', type=self._parse_bool, nargs='?', const=True,
            default=False, dest='debug'
        )
        self.argument_parser = parser

    def generate(self, ext, gconfig, controller):
        """Run ``ext`` over every parsed document, then write the results to disk."""
        ext.at_init(gconfig, controller)
        for k, doc in gconfig.documents.items():
            if self.debug:
                print(f"ext.at_generate with ({k}, {doc}, {gconfig}, {controller})")
            ext.at_generate(k, doc, gconfig, controller)
        ext.at_end(gconfig, controller)
        controller.write_to_real()

    def main(self):
        """Entry point: parse the command line and generate with the chosen extension."""
        namespace = self.argument_parser.parse_args()
        self.debug = namespace.debug
        self.create_ext_manager(namespace.exts_dir)
        gconfig = self.create_group_config(namespace.group_config)
        if not self.init_all_exts(gconfig):
            print("Could not found any ext!")
            print("exts_dir = {}".format(namespace.exts_dir))
            return
        target_codec = namespace.codec
        ext = self.extman.exts.get(target_codec, None)
        if not ext:
            print("Could not found ext {}".format(target_codec))
            return
        ext.debug = self.debug
        print("Load Ext: {}".format(ext.__extname__))
        controller = SourceController(namespace.output_dir)
        gconfig.expand_files()
        gconfig.get_contents()
        gconfig.parse_documents()
        self.generate(ext, gconfig, controller)
if __name__ == "__main__":
    # Run the command-line driver when executed as a script.
    Main().main()
import os
import glob
import yaml
import json
import json
import argparse
import datetime
from functools import reduce
import tornado.template as ttemplate
# Upper bound on macro-expansion passes before giving up.
MAX_MACRO_EXPAND_TIMES = 100

# Tornado template for the whole generated JavaScript file. It reads
# `ext_name`, `current_time`, `code_body` and the `exports` mapping
# (exported name -> JavaScript expression) from the render namespace.
# CommonJS modules export through `module.exports`; assigning to
# `module.export` would export nothing.
FILE_TEMPLATE = """/*
This file is generated automatically.
DO NOT EDIT IT BY HAND.
Generation Ext.: {{ ext_name }}
Time: {{ current_time }}
Please use this file under the license of this project.
*/
{{ code_body }}
module.exports = {
{% for k in exports %}
"{{ k }}": {{ exports[k] }},
{% end %}
};
"""
# Tornado template for one JavaScript map class. It reads `mapobj` and calls
# `make_map_description_of`, `make_map_name`, `key_names` and
# `make_data_verify_code` from the render namespace.
# NOTE(review): the emitted verify() has no final `return true;`, so a
# successful check falls through and yields undefined - confirm intended.
MAP_TEMPLATE = """
/* Map {{ mapobj.name }}
{{ make_map_description_of(mapobj) }}
*/
class {{ make_map_name(mapobj) }} {
constructor({% raw '{' %}{{ ','.join(key_names(mapobj.keys)) }}{% raw '}' %}){
{% for arg in mapobj.keys %}
this.{{ arg.name }} = {{ arg.name }};
{% end %}
}
verify(){
{% for arg in mapobj.keys %}
{{ make_data_verify_code(arg) }}
{% end %}
}
}
"""
# Tornado template for one JavaScript message class. It reads `command` and
# `arguments` and calls `make_command_description_of`, `make_cls_name`,
# `argument_names`, `is_map`, `make_map_name_raw` and `make_data_verify_code`.
# Arguments whose type is a map are wrapped in the matching map class.
# NOTE(review): as with the map class, verify() never returns true explicitly.
MSG_TEMPLATE = """
/* Message for {{ command.command }}
{{ make_command_description_of(command) }}
*/
class {{ make_cls_name(command) }} {
constructor({% raw '{' %}{{ ','.join(argument_names(arguments)) }}{% raw '}' %}){
{% for arg in arguments %}
{% if not is_map(arg.type) %}
this.{{ arg.name }} = {{ arg.name }};
{% else %}
this.{{ arg.name }} = new {{ make_map_name_raw(arg.type) }}({{arg.name}});
{% end %}
{% end %}
}
verify(){
{% for arg in arguments %}
{{ make_data_verify_code(arg) }}
{% end %}
}
}
"""
# Tornado template emitting one `if (!(rule)) return false;` guard per entry of
# `arguments`; the rule text comes from `make_verify_rule(arg)`.
DATA_VERIFY_TEMPLATE = """
{% for arg in arguments %}
if (!({{ make_verify_rule(arg) }})){
return false;
}
{% end %}
"""
# Tornado template for the `pick_message` JavaScript function: an if / else-if
# chain over `all_commands` that instantiates the class named by
# `make_cls_name(command.command)`, or returns null for an unknown command.
PICK_MSG_TEMPLATE = """
let pick_message = (command, arguments) => {
{% set first_if = True%}
{% for command in all_commands %}
{{ '' if first_if else 'else'}} if (command == "{{ command.command }}"){
return new {{ make_cls_name(command.command) }}(arguments);
}
{% if first_if %}
{% set first_if = False %}
{% end %}
{% end %}
else {
return null;
};
};
"""
# Tornado template for the `pick_map` JavaScript function: same shape as
# `pick_message`, iterating `all_maps` and using `make_map_name_raw(map_.name)`.
PICK_MAP_TEMPLATE = """
let pick_map = (name, arguments) => {
{% set first_if = True%}
{% for map_ in all_maps %}
{{ '' if first_if else 'else'}} if (name == "{{ map_.name }}"){
return new {{ make_map_name_raw(map_.name) }}(arguments);
}
{% if first_if %}
{% set first_if = False %}
{% end %}
{% end %}
else {
return null;
};
};
"""
# Registry of macro name -> callable; filled by `use_as_macro` and consulted by
# `_expand_macro`.
MACROSTORE = {}
def _str2pyval(v: str):
if v == 'T':
return True
elif v == 'F':
return False
elif v == 'nil':
return None
elif (v.startswith('"') and v.endswith('"')):
return v[1:-1]
else:
return int(v)
def _expand_macro(macro_str, **extra_arguments: dict):
if not isinstance(macro_str, str):
return macro_str
if (not (macro_str.startswith("(;;") and macro_str.endswith(")"))):
return macro_str
real_command = macro_str[3:-1]
command_parts = real_command.split(' ')
command = command_parts[0]
arguments = map(_str2pyval, command_parts[1:])
macrof = MACROSTORE.get(command, None)
if not macrof:
return macro_str
return macrof(*arguments, **extra_arguments)
def use_as_macro(name):
    """Return a decorator that registers a function in ``MACROSTORE`` under ``name``.

    The decorated function is returned unchanged. Registering a name that is
    already taken trips an assertion.
    """
    def register(func):
        assert name not in MACROSTORE
        MACROSTORE[name] = func
        return func

    return register
@use_as_macro('refcommand')
def _macro_refcommand(target: str, **extra):
    """Macro ``refcommand``: look ``target`` up in ``extra['commands']`` (``None`` if absent)."""
    return extra['commands'].get(target)
@use_as_macro('refmap')
def _macro_refmap(target: str, **extra):
    """Macro ``refmap``: look ``target`` up in ``extra['maps']`` (``None`` if absent)."""
    return extra['maps'].get(target)
@use_as_macro('_return_extra')
def _macro_return_extra(target: str = None, **extra):
    """Macro ``_return_extra``: return the keyword context it was expanded with.

    ``target`` is ignored and optional: the ``__expand_macro__`` methods invoke
    this macro as ``(;;_return_extra)`` with no argument, which used to fail
    with a missing-positional-argument ``TypeError``.
    """
    return extra
def _get_raw_versions(map_: dict) -> list:
versions = []
raw_versions = map_['versions']
assert(hasattr(raw_versions, '__iter__'))
assert(hasattr(raw_versions, '__getitem__'))
for k in raw_versions:
versions.append((k, raw_versions[k]))
return versions
def _get_arguments(map_: dict, key: str) -> list:
arguments = []
raw_arguments = map_[key]
for k in raw_arguments:
assert(isinstance(raw_arguments[k], dict))
arguments.append(CommandArgument.from_map(raw_arguments[k]))
return arguments
def _get_commands(map_: dict, key: str) -> list:
commands = []
raw_commands = map_[key]
for k in raw_commands:
assert(isinstance(raw_commands[k], dict))
commands.append(Command.from_map(raw_commands[k]))
return commands
def _expand_macro_r(d, f):
if isinstance(d, list):
return map(f, d)
elif isinstance(d, dict):
new_map = {}
for k in d:
new_map[_expand_macro_r(d, f)] = _expand_macro_r(d[k], f)
return new_map
elif hasattr(d, "__expand_macro__"):
return d.__expand_macro__(f)
else:
return f(d)
class CommandArgument(object):
    """One named, typed argument of a command (or key of a map).

    ``type`` is the list of tokens of the type expression, e.g.
    ``["string", "||", "null"]``. ``description`` and ``map`` are optional.
    """

    def __init__(self, name, type_: list, **kargs: dict):
        self.name = name
        self.type = type_
        self.description = kargs.get('description', None)
        self.map = kargs.get('map', None)

    @classmethod
    def from_map(cls, json: dict):
        """Build an argument from a dict with ``name``, ``type`` and optional ``description`` / ``map``."""
        return cls(
            json['name'],
            cls.parse_type(json['type']),
            description=json.get('description', None),
            map=json.get('map', None),
        )

    @staticmethod
    def parse_type(type_string: str) -> list:
        """Split a type expression into whitespace-separated tokens.

        ``str.split()`` without arguments also drops the empty tokens that
        repeated or surrounding spaces would otherwise produce.
        """
        return type_string.split()

    def expand_macro(self, **extra):
        """Expand macros in place in every field, with ``extra`` as macro context."""
        def expand(v):
            return _expand_macro(v, **extra)

        self.name = expand(self.name)
        self.type = _expand_macro_r(self.type, expand)
        if self.description:
            self.description = _expand_macro_r(self.description, expand)
        # Expanding an absent map is a no-op, so only a present one matters.
        if self.map:
            self.map = _expand_macro_r(self.map, expand)

    def __expand_macro__(self, f):
        """Hook for ``_expand_macro_r``: recover the macro context from ``f`` and expand in place."""
        extra = f("(;;_return_extra)")
        self.expand_macro(**extra)
        return self
def _is_map(names: list, all_maps: dict):
return reduce(lambda x, y: x or y, map(lambda n: n in all_maps, names), False)
class Command(object):
    """A protocol command: name, version notes, description, arguments and follow-up commands."""

    def __init__(self, command: str):
        self.command = command
        # (version, note) tuples
        self.versions: list = []
        self.description: str = None
        # CommandArgument instances
        self.arguments: list = []
        # Command instances that may follow this one
        self.next_states: list = []

    @classmethod
    def from_map(cls, json: dict):
        """Build a command from a dict with ``name``, ``versions``, ``arguments``,
        ``next_states`` and an optional ``description``."""
        built = Command(json['name'])
        built.versions.extend(_get_raw_versions(json))
        built.description = json.get('description', None)
        built.arguments = _get_arguments(json, 'arguments')
        built.next_states = _get_commands(json, 'next_states')
        return built

    def expand_macro(self, **extra):
        """Expand macros in place in every field, with ``extra`` as macro context."""
        def expand(v):
            return _expand_macro(v, **extra)

        self.command = expand(self.command)
        self.versions = _expand_macro_r(self.versions, expand)
        self.description = expand(self.description)
        self.arguments = _expand_macro_r(self.arguments, expand)
        self.next_states = _expand_macro_r(self.next_states, expand)

    def __expand_macro__(self, f):
        """Hook for ``_expand_macro_r``: recover the macro context from ``f`` and expand in place."""
        context = f("(;;_return_extra)")
        self.expand_macro(**context)
        return self

    def expand_map(self, maps: dict):
        """Turn inline dict maps of map-typed arguments into ``CommandMap`` objects, recursively."""
        for argument in self.arguments:
            if _is_map(argument.type, maps) and isinstance(argument.map, dict):
                argument.map = CommandMap.from_map(argument.map)
        for follow_up in self.next_states:
            follow_up.expand_map(maps)
class CommandMap(object):
    """A named map type: version notes, description and its keys (``CommandArgument`` objects)."""

    def __init__(self, name: str):
        self.name = name
        # (version, note) tuples
        self.versions: list = []
        self.description: str = None
        # CommandArgument instances
        self.keys: list = []

    @classmethod
    def from_map(cls, json: dict):
        """Build a map from a dict with ``name``, ``versions``, ``keys`` and an optional ``description``."""
        ins = cls(json['name'])
        ins.versions.extend(_get_raw_versions(json))
        ins.description = json.get('description')
        ins.keys.extend(_get_arguments(json, 'keys'))
        return ins

    def expand_macro(self, **extra):
        """Expand macros in place in every field, with ``extra`` as macro context."""
        def expand(v):
            return _expand_macro(v, **extra)

        self.name = expand(self.name)
        self.versions = _expand_macro_r(self.versions, expand)
        self.description = expand(self.description)
        self.keys = _expand_macro_r(self.keys, expand)

    def __expand_macro__(self, f):
        """Hook for ``_expand_macro_r``: recover the macro context from ``f`` and expand in place."""
        extra = f("(;;_return_extra)")
        # expand_macro only takes keywords, so the context must be unpacked.
        self.expand_macro(**extra)
        return self

    def expand_map(self, maps: dict):
        """Turn inline dict maps of map-typed keys into ``CommandMap`` objects."""
        for k in self.keys:
            if _is_map(k.type, maps) and isinstance(k.map, dict):
                k.map = CommandMap.from_map(k.map)
def _still_has_macro(i):
if isinstance(i, list):
return reduce(lambda x, y: x or y, _still_has_macro(i), False)
elif isinstance(i, dict):
def check(d): return lambda k: _still_has_macro(
k) or _still_has_macro(d[k])
return reduce(check(i), i.keys(), False)
else:
return True if isinstance(i, str) and (i.startswith("(;;") and i.endswith(")")) else False
class StandsJavaScriptSDKGen(object):
    """Extension that renders the collected maps and commands into ``sdk_def.js``.

    ``at_generate`` collects ``CommandMap`` / ``Command`` objects from each
    document; ``at_end`` expands macros, renders every class plus the
    ``pick_map`` / ``pick_message`` helpers and hands the file to the source
    controller.
    """

    __extname__ = 'js_sdk'

    # JavaScript check for each built-in type token; ``{name}`` is the value
    # expression being verified.
    _BUILTIN_VERIFY_RULES = {
        "string": "(typeof({name}) === 'string')",
        "null": "({name} == null)",
        "number": "(typeof({name}) === 'number')",
        "int": "((typeof({name}) === 'number') && Number.isSafeInteger({name}))",
        "not(": "!(",
        "not": "!",
    }

    def __init__(self, arguments):
        self.commands = {}
        self.maps = {}
        self.template_message_class = self._make_template(MSG_TEMPLATE, "message_class")
        self.template_data_verify = self._make_template(DATA_VERIFY_TEMPLATE, "data_verify")
        self.template_pick_message = self._make_template(PICK_MSG_TEMPLATE, "pick_message")
        self.template_map_class = self._make_template(MAP_TEMPLATE, "map_class")
        self.template_pick_map = self._make_template(PICK_MAP_TEMPLATE, "pick_map")
        self.template_file = self._make_template(FILE_TEMPLATE, "file")

    @staticmethod
    def _make_template(source, name):
        """Compile a template with HTML autoescaping turned off.

        The output is JavaScript; the default autoescape would turn ``&&`` and
        quotes into HTML entities.
        """
        return ttemplate.Template(
            source, name=name, whitespace='all', autoescape=None)

    @staticmethod
    def _render(template, **namespace) -> str:
        """Render ``template`` and return text (``Template.generate`` yields bytes)."""
        rendered = template.generate(**namespace)
        if isinstance(rendered, bytes):
            return rendered.decode('utf-8')
        return rendered

    def at_init(self, gconfig, scontroller):
        """Remember the group config and the source controller."""
        self.gconfig = gconfig
        self.sc = scontroller

    @staticmethod
    def gtool_make_cls_name_raw(command: str) -> str:
        """Turn a dotted command name into a class name, e.g. ``auth.login`` -> ``AuthLogin``.

        An object with a ``command`` attribute is accepted as well.
        """
        name = getattr(command, 'command', command)
        return name.replace('.', ' ').title().replace(' ', '')

    @classmethod
    def gtool_make_cls_name(cls, command: "Command") -> str:
        """Class name for a ``Command`` object."""
        return cls.gtool_make_cls_name_raw(command.command)

    @staticmethod
    def gtool_argument_names(arguments: list) -> list:
        """Names of the given arguments, in order."""
        return [argument.name for argument in arguments]

    @staticmethod
    def gtool_make_map_name_raw(name: str):
        """Turn a dotted map name into a class name, e.g. ``user.info`` -> ``UserInfoMap``."""
        return name.replace('.', ' ').title().replace(' ', '') + "Map"

    @classmethod
    def gtool_make_map_name(cls, mapobj: "CommandMap"):
        """Class name for a ``CommandMap`` object."""
        return cls.gtool_make_map_name_raw(mapobj.name)

    def _map_class_name_for(self, type_):
        """Map class name for a type given as a name or as a list of type tokens.

        For a token list, the first token naming a known map is used.
        """
        if isinstance(type_, str):
            return self.gtool_make_map_name_raw(type_)
        for token in type_:
            if token in self.maps:
                return self.gtool_make_map_name_raw(token)
        raise LookupError(
            "Could not found map or command called {}".format(' '.join(type_)))

    def _verify_rule_for(self, token: str, all_tokens: list):
        """Return a callable ``valname -> JavaScript fragment`` for one type token."""
        if token in ("||", "(", ")"):
            return lambda _valname, _text=token: _text
        if not token or not token[0].isalpha():
            raise SyntaxError(
                "Could not parse {} in {}".format(token, ' '.join(all_tokens)))
        # Maps and commands are registered under their raw names.
        if token in self.maps or token in self.commands:
            # The generated verify() methods return false on failure and fall
            # through otherwise, so compare against false explicitly.
            return lambda valname: "({}.verify() !== false)".format(valname)
        pattern = self._BUILTIN_VERIFY_RULES.get(token)
        if pattern is None:
            raise LookupError(
                "Could not found map or command called {}".format(token))
        return lambda valname, _pattern=pattern: _pattern.format(name=valname)

    def gtool_make_verify_rule(self, ts: list, _r: list = None) -> list:
        """Translate type tokens into callables that each emit one JavaScript fragment.

        The result has one callable per token, in token order, followed by any
        callables passed in ``_r``. ``ts`` is not modified.

        Raises ``SyntaxError`` for a token that is neither punctuation nor a
        word, and ``LookupError`` for an unknown word.
        """
        rules = [self._verify_rule_for(token, ts) for token in ts]
        if _r:
            rules.extend(_r)
        return rules

    def gtool_make_data_verify_rule_function(self):
        """Return ``f(arg)`` producing the JavaScript condition that checks ``this.<arg.name>``."""
        def make_data_verify_rule(arg: "CommandArgument"):
            in_method_name = "this.{}".format(arg.name)
            return ' '.join(
                rule(in_method_name)
                for rule in self.gtool_make_verify_rule(arg.type))
        return make_data_verify_rule

    def gtool_make_data_verify_code_function(self):
        """Return ``f(arguments)`` rendering the verification guards.

        ``arguments`` may be one argument or a list/tuple of them; the class
        templates call it with a single argument per loop iteration.
        """
        make_rule = self.gtool_make_data_verify_rule_function()

        def make_data_verify_code(arguments) -> str:
            if not isinstance(arguments, (list, tuple)):
                arguments = [arguments]
            return self._render(
                self.template_data_verify,
                arguments=arguments,
                make_verify_rule=make_rule,
            )
        return make_data_verify_code

    @staticmethod
    def gtool_map_key_names(args_list: list) -> list:
        """Names of the given map keys, in order."""
        return [key.name for key in args_list]

    def gtool_make_command_description_of_function(self):
        """Return ``f(command)`` giving the description followed by the version changelog."""
        def make_command_description_of(command: "Command") -> str:
            changelog = '\n'.join(
                "{}: {}".format(t[0], t[1]) for t in command.versions)
            return "{}\nchangelog:\n{}".format(command.description, changelog)
        return make_command_description_of

    def gtool_make_map_description_of_function(self):
        """Return ``f(mapobj)`` giving the description followed by the version changelog."""
        def make_map_description_of(mapobj: "CommandMap") -> str:
            changelog = '\n'.join(
                "{}: {}".format(t[0], t[1]) for t in mapobj.versions)
            return "{}\nchanglog:\n{}".format(mapobj.description, changelog)
        return make_map_description_of

    def gtool_is_map(self, names):
        """True when ``names`` (a name or a list of names) mentions a known map."""
        if isinstance(names, list):
            return _is_map(names, self.maps)
        return _is_map([names], self.maps)

    def generate_message_class(self, command: "Command") -> str:
        """Render the JavaScript class for one command."""
        return self._render(
            self.template_message_class,
            command=command,
            arguments=command.arguments,
            make_cls_name=self.gtool_make_cls_name,
            argument_names=self.gtool_argument_names,
            make_data_verify_code=self.gtool_make_data_verify_code_function(),
            make_command_description_of=self.gtool_make_command_description_of_function(),
            is_map=self.gtool_is_map,
            # The template passes the argument's token list, not a bare name.
            make_map_name_raw=self._map_class_name_for,
        )

    def generate_map_class(self, mapobj: "CommandMap") -> str:
        """Render the JavaScript class for one map."""
        return self._render(
            self.template_map_class,
            mapobj=mapobj,
            make_map_name=self.gtool_make_map_name,
            key_names=self.gtool_map_key_names,
            make_map_description_of=self.gtool_make_map_description_of_function(),
            make_data_verify_code=self.gtool_make_data_verify_code_function(),
        )

    def generate_pick_message(self) -> str:
        """Render the ``pick_message`` dispatcher over all commands."""
        return self._render(
            self.template_pick_message,
            all_commands=list(self.commands.values()),
            make_cls_name=self.gtool_make_cls_name_raw,
        )

    def generate_file(self, code_body: str, exports: dict) -> str:
        """Wrap ``code_body`` in the file header and export table."""
        # Naive UTC timestamp, same text format as datetime.utcnow().isoformat().
        now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        return self._render(
            self.template_file,
            ext_name=self.__extname__,
            current_time=now.isoformat(),
            code_body=code_body,
            exports=exports,
        )

    def generate_pick_map(self) -> str:
        """Render the ``pick_map`` dispatcher over all maps."""
        return self._render(
            self.template_pick_map,
            all_maps=list(self.maps.values()),
            make_map_name_raw=self.gtool_make_map_name_raw,
        )

    def at_generate(self, k, document, gconfig, scontroller):
        """Collect the maps and commands declared in one document.

        The document must be a list. An entry with a ``map`` key is registered
        as a map under that value, one with a ``command`` key as a command;
        other entries are ignored. Duplicate names trip an assertion.
        """
        if not isinstance(document, list):
            raise EnvironmentError("Could not recvice a non-list document")
        for d in document:
            assert(hasattr(d, '__contains__'))
            assert(hasattr(d, '__getitem__'))
            if "map" in d:
                name = d['map']
                assert(not (name in self.maps))
                self.maps[name] = CommandMap.from_map(d)
            elif "command" in d:
                name = d['command']
                assert(not (name in self.commands))
                self.commands[name] = Command.from_map(d)

    def _still_needs_expansion(self):
        """True while either registry still holds an unexpanded macro."""
        return _still_has_macro(self.maps) or _still_has_macro(self.commands)

    def _try_expand_all_macro(self, macro_infob):
        """Expand macros until none remain or ``MAX_MACRO_EXPAND_TIMES`` passes were made."""
        def expand(x):
            # The context entries are keyword arguments for the macros.
            return _expand_macro(x, **macro_infob)

        for _ in range(MAX_MACRO_EXPAND_TIMES):
            if not self._still_needs_expansion():
                return
            self.maps = _expand_macro_r(self.maps, expand)
            self.commands = _expand_macro_r(self.commands, expand)
        if self._still_needs_expansion():
            print("After {} times expanding, macro still cloud not be expanded".format(
                MAX_MACRO_EXPAND_TIMES))

    def _try_expand_maps(self):
        """Resolve inline map definitions on every map and command."""
        for m in self.maps.values():
            m.expand_map(self.maps)
        for c in self.commands.values():
            c.expand_map(self.maps)

    def _make_defintion_exports(self):
        """Build the export table: every class name plus the two pick helpers."""
        exports = {}
        for m in self.maps.values():
            cls_name = self.gtool_make_map_name(m)
            exports[cls_name] = cls_name
        for c in self.commands.values():
            cls_name = self.gtool_make_cls_name(c)
            exports[cls_name] = cls_name
        exports['pick_message'] = "pick_message"
        exports['pick_map'] = "pick_map"
        return exports

    def at_end(self, gconfig, scontroller):
        """Expand macros, render the whole SDK and store it as ``sdk_def.js``."""
        macro_infob = {
            'commands': self.commands,
            'maps': self.maps
        }
        self._try_expand_all_macro(macro_infob)
        self._try_expand_maps()
        defintion_codeparts = [
            self.generate_map_class(m) for m in self.maps.values()]
        defintion_codeparts.append(self.generate_pick_map())
        defintion_codeparts.extend(
            self.generate_message_class(c) for c in self.commands.values())
        defintion_codeparts.append(self.generate_pick_message())
        defintion_code = self.generate_file(
            '\n'.join(defintion_codeparts), self._make_defintion_exports())
        scontroller.overwrite('sdk_def.js', defintion_code)
# Plugin entry point: the extension loader reads `plugin.__entry__`, calls it
# with the shared arguments and registers the instance under `__extname__`.
__entry__ = StandsJavaScriptSDKGen
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment