Skip to content

Instantly share code, notes, and snippets.

@ma-ric
Last active September 16, 2015 14:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ma-ric/d10bd7b201771dffdc85 to your computer and use it in GitHub Desktop.
Save ma-ric/d10bd7b201771dffdc85 to your computer and use it in GitHub Desktop.
# The MIT License (MIT)
#
# Copyright (c) 2015 ma-ric
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
"""Reload package and contained subpackages and modules"""
__author__ = 'ma-ric'
import types
import importlib
import sys
from collections import deque
def is_module(item):
    """Tell whether *item* is a module object (an instance of ``types.ModuleType``)."""
    module_type = types.ModuleType
    return isinstance(item, module_type)
def is_package(item):
    """Tell whether *item* is a package.

    A module object counts as a package when its ``__package__`` attribute
    equals its own ``__name__``. Anything that is not a module is not a
    package.
    """
    # Guard clause: non-module objects can never be packages.
    if not isinstance(item, types.ModuleType):
        return False
    return item.__package__ == item.__name__
def reload_module(module):
    """Reload a single module via ``importlib.reload`` and report it on stdout.

    Raises AssertionError when *module* is not a module object.
    """
    assert isinstance(module, types.ModuleType)
    name = module.__name__
    print("reload module:", name)
    importlib.reload(module)
def is_sub_module(module0, module1):
    """Return True when *module1* lives in the package of *module0* or below it.

    The decision is made purely on the two modules' ``__package__`` values:
    *module1* qualifies when its package is the same as the package of
    *module0*, or a dotted descendant of it.

    Args:
        module0: module (or package) defining the reference package.
        module1: module to classify.

    Returns:
        bool. False whenever either module has no package (``None`` or ``''``),
        e.g. a top-level plain module.

    Raises:
        AssertionError: if either argument is not a module object.
    """
    assert isinstance(module0, types.ModuleType), module0
    assert isinstance(module1, types.ModuleType), module1
    p0 = module0.__package__
    p1 = module1.__package__
    # Without a package on either side there is nothing to be "inside" of;
    # this also avoids str.startswith(None) and the match-everything ''.
    if not p0 or not p1:
        return False
    # Compare on a dotted boundary so that "foobar" is not mistaken for a
    # member of package "foo".
    return p1 == p0 or p1.startswith(p0 + ".")
def get_module(item):
    """Return the module that *item* is, or that *item* claims to belong to.

    Args:
        item: any object, typically an attribute found on a module.

    Returns:
        *item* itself when it is a module object; otherwise the entry of
        ``sys.modules`` named by ``item.__module__``. ``None`` is returned
        when the object has no usable ``__module__`` or when that module is
        not currently loaded.
    """
    if isinstance(item, types.ModuleType):
        return item
    module_name = getattr(item, '__module__', None)
    # __module__ is not guaranteed to be a string (or even hashable).
    if not isinstance(module_name, str):
        return None
    # .get(): an object may name a module that is not (or no longer) loaded;
    # that must not abort the scan of the remaining attributes.
    return sys.modules.get(module_name)
def print_modules(modules, *title):
    """Print the optional *title* words followed by the list of module names."""
    names = [mod.__name__ for mod in modules]
    print(*title, names)
def module_key(module):
    """Return the dictionary key used for *module*: its ``__name__``."""
    key = module.__name__
    return key
class PackageReload:
    """Work out a dependency-respecting reload order for a package and apply it.

    Dependencies are discovered by inspecting module attributes: a module
    "uses" another package module when one of its non-dunder attributes is
    that module, or is an object whose ``__module__`` names it.

    Attributes:
        package: the package object being reloaded.
        used_modules: maps a module name to the set of package modules
            (module objects) that module uses.
        module_usage: maps a module name to the set of names of the package
            modules that use it (the inverse of ``used_modules``).
    """

    def __init__(self, package):
        """Store *package*; raises AssertionError if it is not a package."""
        assert is_package(package)
        self.package = package
        self.used_modules = dict()
        self.module_usage = dict()

    def used_package_modules(self, module):
        """Return the set of package modules referenced by *module*'s attributes.

        Every non-dunder name from ``dir(module)`` is resolved with
        ``get_module``; the result is kept when it is a different module that
        ``is_sub_module`` places inside ``self.package``.
        """
        modules = set()
        for item_name in dir(module):
            if item_name.startswith('__'):
                continue
            used = get_module(getattr(module, item_name))
            if used and used is not module and is_sub_module(self.package, used):
                modules.add(used)
        return modules

    def register_used_package_modules(self, parent_module):
        """Record and return the package modules used by *parent_module*."""
        used = self.used_package_modules(parent_module)
        self.used_modules[module_key(parent_module)] = used
        return used

    def is_registered(self, module):
        """Tell whether *module* already has an entry in ``used_modules``."""
        assert is_module(module)
        return module_key(module) in self.used_modules

    def collect_dependence_data(self):
        """Fill ``used_modules`` for the package and every module reachable from it.

        Prints the resulting table when done.
        """
        # Start from a clean slate so a repeated call does not skip modules
        # because of entries left over from a previous run.
        self.used_modules.clear()
        unvisited = deque(self.register_used_package_modules(self.package))
        while unvisited:
            module = unvisited.pop()
            if self.is_registered(module):
                continue
            # Follow *every* used package module. used_package_modules()
            # already restricts the set to self.package; restricting further
            # to the parent's own __package__ would leave some dependencies
            # unregistered and their users could then never be scheduled.
            unvisited.extend(self.register_used_package_modules(module))
        self.print_used_modules()

    def print_used_modules(self):
        """Print ``used_modules`` as an indented table."""
        print("used_modules")
        for m, used_modules in self.used_modules.items():
            print("\t", m)
            for um in used_modules:
                print("\t\t", um.__name__)
        print("-"*50)

    def index_module_usage(self):
        """Build ``module_usage`` (who uses whom) from ``used_modules`` and print it."""
        self.module_usage.clear()
        for module_user, used_modules in self.used_modules.items():
            # Make sure modules nobody uses still get an (empty) entry.
            self.module_usage.setdefault(module_user, set())
            for um in used_modules:
                self.module_usage.setdefault(module_key(um), set()).add(module_user)
        self.print_module_usage()

    def print_module_usage(self):
        """Print ``module_usage`` as an indented table."""
        print("module_usage")
        for m, module_users in self.module_usage.items():
            print("\t", m)
            for mu in module_users:
                print("\t\t", mu)
        print("-"*50)

    def get_reload_order(self):
        """Return module names ordered so that dependencies come before users.

        The dependency data is (re)collected first. The order is produced
        level by level: modules without unresolved dependencies are emitted,
        which in turn releases the modules that were only waiting for them.
        Ties are broken by module name so the result is reproducible.

        When only modules caught in a circular dependency remain, the one
        with the fewest unresolved dependencies is emitted anyway (and
        reported on stdout) so that every discovered module is reloaded.

        Returns:
            collections.deque of module names.
        """
        self.collect_dependence_data()
        self.index_module_usage()
        reload_order = deque()
        # Names of the dependencies each module is still waiting for.
        unresolved = {
            name: {module_key(dep) for dep in deps}
            for name, deps in self.used_modules.items()
        }
        ready = sorted(name for name, deps in unresolved.items() if not deps)
        for name in ready:
            del unresolved[name]
        while ready or unresolved:
            if not ready:
                # Everything left is part of (or behind) a cycle: break it at
                # the module with the fewest outstanding dependencies.
                forced = min(unresolved, key=lambda n: (len(unresolved[n]), n))
                print("circular dependency, forcing reload of:", forced)
                del unresolved[forced]
                ready = [forced]
            reload_order.extend(ready)
            released = []
            for name in ready:
                for user in sorted(self.module_usage[name]):
                    waiting_for = unresolved.get(user)
                    if waiting_for is None:
                        # Already scheduled (forced out of a cycle earlier).
                        continue
                    waiting_for.discard(name)
                    if not waiting_for:
                        del unresolved[user]
                        released.append(user)
            ready = released
        return reload_order

    def reload(self):
        """Reload every discovered module in the order from ``get_reload_order``."""
        for module_name in self.get_reload_order():
            reload_module(sys.modules[module_name])
def reload_package(package):
    """Reload *package* and the package modules it uses.

    Thin convenience wrapper: builds a ``PackageReload`` for *package* and
    runs its ``reload`` method.
    """
    PackageReload(package).reload()
# Public API. __all__ must list *names* (strings); listing the function
# objects makes ``from <module> import *`` fail with a TypeError.
__all__ = ["reload_module", "reload_package"]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment