-
-
Save altendky/6a085be24502f3e2bce915fcf1db298d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import print_function | |
import argparse | |
import errno | |
import functools | |
import os | |
import os.path | |
import shlex | |
import shutil | |
import stat | |
import subprocess | |
import sys | |
import time | |
this = os.path.normpath(os.path.abspath(__file__)) | |
project_root = os.path.dirname(this) | |
venv_path = os.path.join(project_root, 'venv') | |
venv_common_bin = os.path.join(venv_path, 'Scripts') | |
venv_python = os.path.join(venv_common_bin, 'python') | |
lib_src = os.path.join(project_root, 'python', 'libs') | |
requirements_stem = os.path.join(project_root, 'requirements') | |
py3 = sys.version_info[0] == 3 | |
class ExitError(Exception): | |
pass | |
def check_call(command, *args, **kwargs): | |
command = list(command) | |
print('Launching: ') | |
for arg in command: | |
print(' {}'.format(arg)) | |
return subprocess.check_call(command, *args, **kwargs) | |
def check_output(command, *args, **kwargs): | |
command = list(command) | |
print('Launching: ') | |
for arg in command: | |
print(' {}'.format(arg)) | |
return subprocess.check_output(command, *args, **kwargs) | |
def read_dot_env(): | |
env = {} | |
with open(os.path.join(project_root, '.env')) as f: | |
for line in f: | |
line = line.strip() | |
if line.startswith('#'): | |
continue | |
k, _, v = line.partition('=') | |
env[k] = v | |
return env | |
custom_env = { | |
'PIP_SRC': lib_src, | |
} | |
def create(bare_test, hil_test): | |
d = { | |
'linux': linux_create, | |
'win32': windows_create, | |
} | |
dispatch(d, bare_test=bare_test, hil_test=hil_test) | |
def common_create( | |
bare_test, | |
hil_test, | |
python, | |
venv_bin, | |
requirements_platform, | |
symlink, | |
): | |
if os.path.exists(venv_path): | |
raise ExitError( | |
'venv already exists. if you know it is safe, remove it with:\n' | |
' {} --rm'.format(str(this)) | |
) | |
try: | |
os.mkdir(lib_src) | |
except OSError: | |
pass | |
env = dict(os.environ) | |
env.update(read_dot_env()) | |
env.update(custom_env) | |
check_call( | |
[ | |
python, | |
'-m', 'venv', | |
'--prompt', os.path.join('gridtied', os.path.basename(venv_path)), | |
venv_path, | |
], | |
cwd=project_root, | |
env=env, | |
) | |
if symlink: | |
os.symlink(venv_bin, venv_common_bin) | |
check_call( | |
[ | |
venv_python, | |
'-m', 'pip', | |
'install', | |
'--upgrade', 'pip', | |
], | |
cwd=project_root, | |
env=env, | |
) | |
install_requirements( | |
bare_test=bare_test, | |
hil_test=hil_test, | |
requirements_platform=requirements_platform, | |
) | |
def install_requirements(bare_test, hil_test, requirements_platform): | |
base = requirements_stem | |
if bare_test: | |
base += '.' + 'bare_test' | |
base += '.' + requirements_platform | |
to_install = [base] | |
if hil_test: | |
to_install.append( | |
requirements_stem + '.hil_test.' + requirements_platform | |
) | |
env = dict(os.environ) | |
env.update(read_dot_env()) | |
env.update(custom_env) | |
for requirements in to_install: | |
install_requirements_file( | |
python=venv_python, | |
env=env, | |
requirements=requirements, | |
) | |
def install_requirements_file(python, env, requirements): | |
check_call( | |
[ | |
python, | |
'-m', 'pip', | |
'install', | |
'-r', requirements, | |
], | |
cwd=project_root, | |
env=env, | |
) | |
def linux_create(bare_test, hil_test): | |
venv_bin = os.path.join(venv_path, 'bin') | |
common_create( | |
bare_test=bare_test, | |
hil_test=hil_test, | |
python='python3.6', | |
venv_bin=venv_bin, | |
requirements_platform='linux', | |
symlink=True, | |
) | |
def windows_create(bare_test, hil_test): | |
python_path = check_output( | |
[ | |
'py', | |
'-3.6-32', | |
'-c', 'import sys; print(sys.executable)', | |
], | |
cwd=str(project_root), | |
) | |
if py3: | |
python_path = python_path.decode() | |
python_path = python_path.strip() | |
common_create( | |
bare_test=bare_test, | |
hil_test=hil_test, | |
python=python_path, | |
venv_bin=venv_common_bin, | |
requirements_platform='windows', | |
symlink=False, | |
) | |
def rm(ignore_missing): | |
try: | |
rmtree(venv_path) | |
except OSError as e: | |
if e.errno != errno.ENOENT: | |
raise | |
if not ignore_missing: | |
raise ExitError('venv not found at: {}'.format(venv_path)) | |
def ensure(bare_test, hil_test, quick): | |
d = { | |
'linux': functools.partial( | |
common_ensure, | |
requirements_platform='linux', | |
), | |
'win32': functools.partial( | |
common_ensure, | |
requirements_platform='windows', | |
), | |
} | |
dispatch(d, bare_test=bare_test, hil_test=hil_test, quick=quick) | |
def common_ensure(bare_test, hil_test, quick, requirements_platform): | |
existed = os.path.exists(venv_path) | |
if not existed: | |
create(bare_test=bare_test, hil_test=hil_test) | |
elif not quick: | |
install_requirements( | |
bare_test=bare_test, | |
hil_test=hil_test, | |
requirements_platform=requirements_platform, | |
) | |
check(bare_test=bare_test, hil_test=hil_test) | |
if existed: | |
print('venv already present and passes some basic checks') | |
else: | |
print('venv created and passed some basic checks') | |
def clean_path(path): | |
return os.path.normpath(os.path.abspath(path)) | |
def check(bare_test, hil_test): | |
activate = os.path.join(venv_common_bin, 'activate') | |
expected_name = 'VIRTUAL_ENV' | |
# try: | |
with open(activate) as f: | |
for line in f: | |
line = line.strip() | |
try: | |
name, original_venv_path = line.split('=', 1) | |
except ValueError: | |
continue | |
if name == expected_name: | |
original_venv_path, = shlex.split(original_venv_path) | |
break | |
else: | |
raise Exception( | |
'{} assignment not found ' | |
'in "{}"'.format(expected_name,activate), | |
) | |
# except OSError as e: | |
# if e.errno == errno.ENOENT: | |
# | |
# | |
# raise | |
if clean_path(venv_path) != clean_path(original_venv_path): | |
raise ExitError( | |
'venv should be at "{}" but has been moved to "{}"'.format( | |
original_venv_path, | |
venv_path, | |
), | |
) | |
gridtied = os.path.join(venv_common_bin, 'gridtied') | |
executables = [gridtied] | |
if bare_test: | |
dss = os.path.join(venv_common_bin, 'dss') | |
executables.append(dss) | |
if hil_test: | |
try: | |
check_call( | |
( | |
venv_python, | |
'-c', | |
'import typhoon', | |
), | |
) | |
except CalledProcessError: | |
raise ExitError( | |
'typhoon library can not be imported' | |
) | |
for executable in executables: | |
try: | |
check_call( | |
[ | |
executable, | |
], | |
stdin=subprocess.PIPE, | |
stdout=subprocess.PIPE, | |
) | |
except OSError as e: | |
if e.errno == errno.ENOENT: | |
raise ExitError( | |
'required file "{}" not found'.format(gridtied), | |
) | |
elif e.errno == errno.EACCES: | |
raise ExitError( | |
'required file "{}" not runnable'.format(gridtied), | |
) | |
raise | |
def dispatch(d, *args, **kwargs): | |
for name, f in d.items(): | |
if sys.platform.startswith(name): | |
f(*args, **kwargs) | |
break | |
else: | |
raise ExitError('Platform not supported: {}'.format(sys.platform)) | |
def add_bare_flag(parser): | |
parser.add_argument( | |
'--bare-test', | |
action='store_true', | |
help='Include bare-board testing requirements', | |
) | |
def add_hil_flag(parser): | |
parser.add_argument( | |
'--hil-test', | |
action='store_true', | |
help='Include HiL requirements', | |
) | |
def add_test_flags(parser): | |
group = parser.add_mutually_exclusive_group() | |
add_bare_flag(group) | |
add_hil_flag(group) | |
def main(): | |
parser = argparse.ArgumentParser(description='Create and manage the venv') | |
parser.set_defaults(func=parser.print_help) | |
subparsers = parser.add_subparsers() | |
check_parser = subparsers.add_parser( | |
'check', | |
description='Do some basic validity checks against the venv', | |
) | |
check_parser.set_defaults(func=check) | |
create_parser = subparsers.add_parser( | |
'create', | |
description='Create the venv', | |
) | |
add_test_flags(create_parser) | |
create_parser.set_defaults(func=create) | |
ensure_parser = subparsers.add_parser( | |
'ensure', | |
description='Create the venv if not already present', | |
) | |
add_test_flags(ensure_parser) | |
ensure_parser.add_argument( | |
'--quick', | |
action='store_true', | |
help=( | |
'Consider valid if venv directory exists, ' | |
'do not make sure that all packages are installed' | |
), | |
) | |
ensure_parser.set_defaults(func=ensure) | |
rm_parser = subparsers.add_parser('rm', description='Remove the venv') | |
rm_parser.add_argument( | |
'--ignore-missing', | |
action='store_true', | |
help='Do not raise an error if no venv is present', | |
) | |
rm_parser.set_defaults(func=rm) | |
args = parser.parse_args() | |
cleaned = {k: v for k, v in vars(args).items() if k != 'func'} | |
args.func(**cleaned) | |
# TODO: CAMPid 0238493420143087667542054268097120437916848 | |
# http://stackoverflow.com/a/21263493/228539 | |
def del_rw(action, name, exc): | |
os.chmod(name, stat.S_IWRITE) | |
if os.path.isdir(name): | |
os.rmdir(name) | |
else: | |
os.remove(name) | |
def rmtree(path, retries=4): | |
for remaining in reversed(range(retries)): | |
try: | |
shutil.rmtree(path, onerror=del_rw) | |
except OSError as e: | |
if remaining == 0 or e.errno == errno.ENOENT: | |
raise | |
else: | |
break | |
print('{} remaining removal attempts'.format(remaining)) | |
time.sleep(0.5) | |
def _entry_point(): | |
try: | |
sys.exit(main()) | |
except ExitError as e: | |
sys.stderr.write(str(e) + '\n') | |
sys.exit(1) | |
if __name__ == '__main__': | |
_entry_point() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment