Skip to content

Instantly share code, notes, and snippets.

@pirate
Last active December 18, 2023 10:18
Show Gist options
  • Save pirate/7193ab54557b051aa1e3a83191b69793 to your computer and use it in GitHub Desktop.
Save pirate/7193ab54557b051aa1e3a83191b69793 to your computer and use it in GitHub Desktop.
Example of how to pluginize a complex app using a hooks system
"""
Example of a pluginized architecture breaking up a large app
with complex behavior (ArchiveBox), into a series of steps
that plugins can hook into.
(read from the bottom to top to get a quick overview)
"""
import re
import json
from datetime import datetime
from copy import deepcopy
from functools import wraps, partial
from typing import Union
from prettyprinter import cpprint
class Mapping(dict):
    """Dictionary whose keys are also reachable with dot.notation.

    Reading an attribute that is not a key returns ``None`` instead of
    raising, assigning an attribute stores a key, and deleting an attribute
    removes the key (raising ``KeyError`` when it does not exist).
    """

    def __getattr__(self, key, default=None):
        # only invoked when normal attribute lookup fails, so real dict
        # methods such as .items() keep working
        return dict.get(self, key, default)

    def __setattr__(self, key, value):
        dict.__setitem__(self, key, value)

    def __delattr__(self, key):
        dict.__delitem__(self, key)
class classproperty(object):
    """Descriptor that exposes a function of the class as a read-only attribute."""

    def __init__(self, f):
        # f is called with the owning class every time the attribute is read
        self.f = f

    def __get__(self, obj, owner):
        # obj (the instance, if any) is ignored: the value depends on the class only
        compute = self.f
        return compute(owner)
def deep_convert_dict(obj):
    """Return obj with plain dicts replaced by Mapping objects, recursively.

    A plain dict is copied into a new Mapping; an object that already is a
    Mapping is updated in place. Only values reached through ``.items()`` are
    visited, so dicts stored inside lists are left as they are. Anything
    without ``.items()`` is returned unchanged.
    """
    is_plain_dict = isinstance(obj, dict) and not isinstance(obj, Mapping)
    converted = Mapping(obj) if is_plain_dict else obj

    try:
        for name, child in converted.items():
            converted[name] = deep_convert_dict(child)
    except AttributeError:
        # raised when the object has no .items(): treat it as a leaf value
        pass

    return converted
def recursive_merge(dict1, dict2, concat_lists=True, lists_unique=True, allow_mismatched_types=False):
    """Lazily deep-merge two dictionaries, yielding ``(key, merged_value)`` pairs.

    Keys are visited in first-seen order (dict1's keys, then dict2's new keys).
    For a key present in both inputs:

    * two dicts are merged recursively into a Mapping,
    * two lists are concatenated (dict1's items first) when ``concat_lists``
      is true, otherwise dict2's list wins,
    * anything else is overridden by dict2's value.

    Args:
        dict1: base dictionary.
        dict2: dictionary whose values take precedence.
        concat_lists: concatenate lists found under the same key.
        lists_unique: drop duplicate items from concatenated lists,
            keeping the first occurrence.
        allow_mismatched_types: when false, a non-None value in dict1 whose
            type is not a subclass of dict2's value type raises TypeError;
            when true, dict2's value silently wins.

    Raises:
        AssertionError: a key collides with the name of a dict attribute.
        TypeError: value types differ and allow_mismatched_types is false.

    Because this is a generator, errors surface only when it is consumed.
    """
    # keys that would shadow dict attributes break dot.notation access on Mapping
    reserved_names = frozenset(dir({}))

    def unique_in_order(items):
        """Drop duplicates from items while keeping first-seen order."""
        try:
            return list(dict.fromkeys(items))
        except TypeError:
            # unhashable items (e.g. dicts inside a list): compare by equality
            unique = []
            for item in items:
                if item not in unique:
                    unique.append(item)
            return unique

    for key in dict.fromkeys(tuple(dict1.keys()) + tuple(dict2.keys())):
        if key in reserved_names:
            # raised explicitly so the check also runs under `python -O`
            raise AssertionError(
                f'Key {key} is not allowed in state as it conflicts with a builtin dict method!'
            )

        # convert native dict objects into Mapping whenever encountered
        val1 = deep_convert_dict(dict1.get(key))
        val2 = deep_convert_dict(dict2.get(key))

        if key not in dict2:
            yield (key, val1)
            continue
        if key not in dict1:
            yield (key, val2)
            continue

        # compare types AFTER conversion so a plain dict and a Mapping match
        val1_type, val2_type = type(val1), type(val2)
        if val1 is not None and not issubclass(val1_type, val2_type):
            if not allow_mismatched_types:
                raise TypeError(
                    f'Value in dict1[{key}] has different type than value in dict2! '
                    f'{val1} ({val1_type.__name__}) != {val2} ({val2_type.__name__})'
                )
            # mismatched types cannot be merged: dict2 wins, emitted exactly once
            yield (key, val2)
            continue

        if isinstance(val1, dict) and isinstance(val2, dict):
            merged_children = recursive_merge(
                val1,
                val2,
                concat_lists=concat_lists,
                lists_unique=lists_unique,
                allow_mismatched_types=allow_mismatched_types,
            )
            yield (key, Mapping(merged_children))
        elif isinstance(val1, list) and isinstance(val2, list):
            if not concat_lists:
                yield (key, val2)
            elif lists_unique:
                yield (key, unique_in_order(val1 + val2))
            else:
                yield (key, val1 + val2)
        else:
            # If one of the values is not a dict, you can't continue merging it.
            # Value from second dict overrides one in first and we move on.
            yield (key, val2)
def deepmerge(dict1, dict2):
    """Deep-merge dict2 into dict1 and return the result as a new Mapping.

    Uses the strict defaults of recursive_merge: lists under the same key are
    concatenated without duplicates and mismatched value types are rejected.
    """
    merged_pairs = recursive_merge(
        dict1,
        dict2,
        concat_lists=True,
        lists_unique=True,
        allow_mismatched_types=False,
    )
    return Mapping(merged_pairs)
def update_state(func):
    """Decorator: treat the hook's return value as a patch deep-merged into state.

    The wrapped function is called as ``func(cls, state, *args, **kwargs)``
    and the wrapper returns ``deepmerge(state, patch)``.
    """
    @wraps(func)
    def wrapper(cls, state, *args, **kwargs):
        # sanity checks: only plugin classes may use this, and state must be dict-like
        assert issubclass(cls, ArchiveBoxPlugin)
        assert isinstance(state, (dict, Mapping))
        return deepmerge(state, func(cls, state, *args, **kwargs))

    return wrapper
def flatten_hooks(hooks, plugins):
    """Fold the hooks of every plugin into ``hooks`` and return the result.

    Args:
        hooks: starting collection of hooks (a list of hook names, or a dict).
        plugins: iterable of objects exposing ``get_hooks()``.

    Returns:
        ``hooks`` itself when ``plugins`` is empty. Otherwise a new list of
        hook names in first-seen order without duplicates, or, when both
        sides are dicts, the deep-merged Mapping.
    """
    for plugin in plugins:
        plugin_hooks = plugin.get_hooks()
        if isinstance(hooks, dict) and isinstance(plugin_hooks, dict):
            hooks = deepmerge(hooks, plugin_hooks)
        else:
            # get_hooks() returns a list of names, which deepmerge cannot
            # handle (it needs .keys()), so union the names in order instead
            hooks = list(dict.fromkeys([*hooks, *plugin_hooks]))
    return hooks
def get_plugin(plugin_name):
    """Return the module-level object (plugin class) named ``plugin_name``.

    Raises KeyError when this module defines no global with that name.
    """
    module_namespace = globals()
    return module_namespace[plugin_name]
def load_plugins(state, plugins: Union[list, dict]):
    """Register each named plugin in state and return the updated state.

    For every name in ``plugins`` the plugin's hook names are merged into
    ``state.hooks`` and an initial ``plugins.<name>.state`` entry marked
    ``loaded='imported'`` / ``enabled=True`` is created.
    """
    for plugin_name in plugins:
        initial_entry = {
            'state': {
                'loaded': 'imported',
                'enabled': True,
            },
        }
        patch = {
            'hooks': get_plugin(plugin_name).get_hooks(),
            'plugins': {plugin_name: initial_entry},
        }
        state = deepmerge(state, patch)
    return state
def run_hooks(state: Mapping, hook_name: str=None):
    """Run ``hook_name`` on every plugin listed in ``state.plugins``, in order.

    Each plugin receives the state returned by the previous one. Afterwards
    ``meta.active_hook`` is set to the bare hook name and the new state is
    returned. The hook name is printed for tracing.
    """
    print()
    print('>', hook_name)

    # iterate over the plugin names of the incoming state; `state` itself is
    # rebound to a fresh Mapping on every pass
    for plugin_name in state.plugins:
        plugin_cls = get_plugin(plugin_name)
        state = plugin_cls.run_hook(state, hook_name)

    meta_patch = {'meta': {'active_hook': hook_name}}
    return deepmerge(state, meta_patch)
class ArchiveBoxPlugin:
    """Base class for all plugins; everything is a class attribute or classmethod.

    Hooks are classmethods named ``hook_<step>`` that receive the global state
    and return a new state. Hooks wrapped in ``@update_state`` return only a
    patch, which the decorator deep-merges into the state.
    """

    ENABLED = True          # initial value of the plugin's `enabled` flag
    REQUIRED = False        # copied into the plugin's metadata by hook_setup_plugins
    DEFAULT_CONFIG = {}     # merged into state.config.state by hook_config_start
    REQUIRED_CONFIG = []    # config keys that must be truthy for the plugin to stay enabled
    REQUIRED_PLUGINS = []   # plugins that must be enabled for this one to stay enabled
    REQUIRED_HOOKS = []     # hook names that must be in state.hooks when enabled
    ADVERTISED_HOOKS = []   # copied into the plugin's metadata as `hooks_advertised`

    @classproperty
    def NAME(cls):
        """Human-readable name: the CamelCase class name split into words."""
        words = re.findall(r'[A-Z](?:[a-z]+|[A-Z]*(?=[A-Z]|$))', cls.__name__)
        return ' '.join(words)

    @classproperty
    def CONFIG_NAME(cls):
        """Class name without "Plugin", upper-cased; used in config key prefixes."""
        return cls.__name__.replace("Plugin", "").upper()

    @classmethod
    def get_hooks(cls, include_super=True):
        """Return the names of the ``hook_*`` attributes of this plugin.

        With ``include_super`` (default) inherited hooks are included; otherwise
        only callables/classmethods/staticmethods defined on the class itself.
        """
        if include_super:
            return [name for name in dir(cls) if name.startswith('hook_')]

        # find all methods implemented in the class itself (not superclass)
        return [
            name
            for name, member in vars(cls).items()
            if name.startswith('hook_')
            and (callable(member) or isinstance(member, (classmethod, staticmethod)))
        ]

    @classmethod
    def get_plugin_state(cls, state, plugin_name=None):
        """Return ``state.plugins[<name>].state`` (this plugin by default).

        Raises KeyError when the plugin has no entry in ``state.plugins``.
        """
        return state.plugins[plugin_name or cls.__name__].state

    @classmethod
    def set_plugin_state(cls, plugin_state, plugin_name=None):
        """Build a state patch that merges ``plugin_state`` into a plugin's state."""
        return Mapping({
            'plugins': {
                (plugin_name or cls.__name__): {
                    'state': plugin_state,
                }
            }
        })

    @classmethod
    def get_plugin_config(cls, state: dict, plugin_name: str=None, flatten: bool=True):
        """Return the ``PLUGIN_<CONFIG_NAME>_*`` entries of ``state.config.state``.

        With ``flatten`` the prefix is stripped and keys are lower-cased
        (``PLUGIN_FOO_BINARY`` -> ``binary``); otherwise keys are kept as is.
        """
        if plugin_name:
            config_name = state.plugins[plugin_name].config_name
        else:
            config_name = cls.CONFIG_NAME
        prefix = f'PLUGIN_{config_name}_'
        return Mapping({
            # slice instead of str.replace so only the leading prefix is removed
            (key[len(prefix):].lower() if flatten else key): val
            for key, val in state.config.state.items()
            if key.startswith(prefix)
        })

    @classmethod
    def set_plugin_config(cls, flat_lowercase_config: dict):
        """Build a state patch setting this plugin's prefixed config values.

        The values are written under ``config.state``, the same place
        get_plugin_config and hook_config_start use, so a value set here is
        visible to later reads.
        """
        return Mapping({
            'config': {
                'state': {
                    f'PLUGIN_{cls.CONFIG_NAME}_{key.upper()}': val
                    for key, val in flat_lowercase_config.items()
                },
            }
        })

    @classmethod
    def set_general_config(cls, flat_config: dict):
        """Build a state patch merging ``flat_config`` into ``config.state`` unprefixed."""
        return Mapping({
            'config': {
                'state': flat_config,
            }
        })

    @classmethod
    def is_enabled(cls, state):
        """Return the plugin's ``enabled`` flag, or True if it has no state yet."""
        try:
            return cls.get_plugin_state(state).enabled
        except (KeyError, AttributeError):
            # before initial plugin state is set up: either the plugin has no
            # entry at all (KeyError) or its entry has no `state` yet (None)
            return True

    @classmethod
    def run_hook(cls, state, hook_name, *args, **kwargs):
        """Run this plugin's ``hook_name`` (if it has one) and return the new state.

        Disabled plugins return the state untouched. Otherwise
        ``meta.active_hook`` is set to ``<ClassName>.<hook_name>``, whether or
        not the plugin implements the hook.
        """
        # the dynamic calling + result merging logic here is critical and fragile,
        # be careful editing this function, make sure to test output before and after
        if not cls.is_enabled(state):
            return state

        # look the hook up separately from calling it, so an AttributeError
        # raised inside a hook is reported instead of being mistaken for
        # "this plugin does not implement the hook"
        hook_function = getattr(cls, hook_name, None)
        if hook_function is not None:
            print(' >', cls.__name__, hook_name)
            state = hook_function(state, *args, **kwargs)

        return deepmerge(state, {
            'meta': {
                'active_hook': f'{cls.__name__}.{hook_name}',
            },
        })

    @classmethod
    @update_state
    def hook_setup_plugins(cls, state):
        """load the plugin metadata into the global state object"""
        return {
            'plugins': {
                cls.__name__: {
                    'key': cls.__name__,
                    'name': cls.NAME,
                    'config_name': cls.CONFIG_NAME,
                    'path': f'./plugins/{cls.__name__}',
                    'required': cls.REQUIRED,
                    'state': {
                        'loaded': 'initialized',
                        'enabled': cls.ENABLED,
                    },
                    'hooks': cls.get_hooks(),
                    'hooks_defined': cls.get_hooks(include_super=False),
                    'hooks_advertised': cls.ADVERTISED_HOOKS,
                    'required_plugins': cls.REQUIRED_PLUGINS,
                    'required_config': cls.REQUIRED_CONFIG,
                    'default_config': cls.DEFAULT_CONFIG,
                },
            },
        }

    @classmethod
    @update_state
    def hook_config_start(cls, state):
        """load the default config into the global state object"""
        # lowercase keys are plugin-specific and get the PLUGIN_<NAME>_ prefix;
        # any other key is treated as a general config key and kept as is
        prefixed_configs = {
            (f'PLUGIN_{cls.CONFIG_NAME}_{key.upper()}' if key.islower() else key): value
            for key, value in cls.DEFAULT_CONFIG.items()
        }
        return {
            'config': {
                'state': {
                    **prefixed_configs,
                    f'PLUGIN_{cls.CONFIG_NAME}_ENABLED': cls.ENABLED,
                },
            },
        }

    @classmethod
    @update_state
    def hook_config_plugins(cls, state):
        """load the plugin enabled status based on its dependencies' presence/status

        Raises AssertionError when the plugin ends up enabled but one of its
        REQUIRED_HOOKS is not registered in ``state.hooks``.
        """
        self_enabled = cls.get_plugin_state(state).enabled
        has_required_plugins = all(
            cls.get_plugin_state(state, plugin_name).enabled
            for plugin_name in cls.REQUIRED_PLUGINS
        )

        config_values = state.config.state
        has_required_configs = all(
            # a missing required config counts as "not satisfied", not a crash
            config_values.get(
                config_key
                if config_key.isupper() else
                f'PLUGIN_{cls.CONFIG_NAME}_{config_key.upper()}'
            )
            for config_key in cls.REQUIRED_CONFIG
        ) and cls.get_plugin_config(state).enabled

        # coerce to bool: a None/str here would fail the strict type check
        # when merged over the existing boolean `enabled` flag
        should_enable = bool(
            self_enabled
            and has_required_plugins
            and has_required_configs
        )

        if should_enable:
            missing_hooks = [
                hook_name
                for hook_name in cls.REQUIRED_HOOKS
                if hook_name not in state.hooks
            ]
            if missing_hooks:
                raise AssertionError(
                    f'{cls.__name__} requires hooks missing from state.hooks: {missing_hooks}'
                )

        return cls.set_plugin_state({
            'loaded': 'configured',
            'enabled': should_enable,
        })
class ArchiveBoxCorePlugin(ArchiveBoxPlugin):
    """Core plugin: defines no hooks of its own, only inherits the base ones."""

    # flagged as required in the plugin metadata
    REQUIRED = True
class ConfigFilePlugin(ArchiveBoxPlugin):
    """Stub of a config-file backend: its hooks only set marker flags."""

    NAME = 'Config via Config File'
    REQUIRED = True

    @classmethod
    @update_state
    def hook_config(cls, state):
        """Mark the file config as loaded in ``config.state``."""
        # placeholder for:
        # config = load_config_file(state.config.config_file, state.config.schema)
        loaded_marker = {'loaded_file_config': True}
        return {'config': {'state': loaded_marker}}

    @classmethod
    @update_state
    def hook_config_save(cls, state):
        """Mark the config file as saved in ``config.state``."""
        # placeholder for:
        # config = write_config_file(state.config.config_file, state.config.schema, state.config)
        saved_marker = {'saved_config_file': True}
        return {'config': {'state': saved_marker}}
class ConfigEnvPlugin(ArchiveBoxPlugin):
    """Stub of an environment-variable config backend."""

    NAME = 'Config via Environment Variables'
    REQUIRED = True

    @classmethod
    @update_state
    def hook_config(cls, state):
        """Mark the env config as loaded and force-enable the dark theme."""
        # placeholder for:
        # config = load_config_env(state.config.schema, state.config.state)
        # config = {'loaded_env_config': True}
        env_values = {
            'loaded_env_config': True,
            'PLUGIN_DARKTHEME_ENABLED': True,  # just for testing
        }
        return {'config': {'state': env_values}}
class DarkThemePlugin(ArchiveBoxPlugin):
    """Plugin that only contributes a template directory through its default config."""

    # uppercase key, so hook_config_start stores it unprefixed
    DEFAULT_CONFIG = {
        'TEMPLATE_DIRS': ['./plugins/DarkThemePlugin'],
    }
class PocketHTMLParserPlugin(ArchiveBoxPlugin):
    """Example parser: turns each line of ``state.parse.urls_text`` into a link."""

    DEFAULT_CONFIG = {
        'cleanup': True,
    }

    @classmethod
    @update_state
    def hook_parse(cls, state):
        """Return a patch adding one link per line of ``state.parse.urls_text``.

        When the plugin's ``cleanup`` config is truthy each link is prefixed
        with ``clean+``.
        """
        # `cleanup` is a config value (see DEFAULT_CONFIG), not plugin state;
        # read it once because it cannot change during the loop
        cleanup = cls.get_plugin_config(state).cleanup
        prefix = 'clean+' if cleanup else ''

        # build a new list instead of appending to state.parse.links: the
        # incoming state must not be mutated, and the merge performed by
        # @update_state already appends these links to the existing ones
        parsed_links = [
            prefix + line
            for line in state.parse.urls_text.split('\n')
        ]
        return {
            'parse': {
                'links': parsed_links,
            }
        }
class ChromiumDependencyPlugin(ArchiveBoxPlugin):
    """Example dependency plugin that records the configured browser binary."""

    REQUIRED_CONFIG = ['binary']
    DEFAULT_CONFIG = {
        'binary': 'chromium',
    }

    @classmethod
    @update_state
    def hook_config(cls, state):
        """Record binary and version, enabling the plugin only if both are truthy."""
        binary = cls.get_plugin_config(state).binary
        version = '92.234.234'  # hard-coded stand-in for a detected version
        # version = False
        is_usable = bool(binary and version)

        patch = {}
        patch.update(cls.set_plugin_state({
            'binary': binary,
            'version': version,
            'enabled': is_usable,
        }))
        patch.update(cls.set_plugin_config({
            'enabled': is_usable,
        }))
        return patch
class PlaywrightExtractorPlugin(ArchiveBoxPlugin):
    """Example extractor that drives the snapshot steps through sub-hooks.

    ``hook_snapshot_start`` and ``hook_snapshot`` fan out into the hooks in
    ADVERTISED_HOOKS, which are run on every plugin via run_hooks. The browser
    objects are represented by placeholder strings.
    """

    # the browser binary is read from ChromiumDependencyPlugin's state in
    # hook_snapshot_start_setup_context, so that is the real dependency
    REQUIRED_PLUGINS = ['ChromiumDependencyPlugin']
    DEFAULT_CONFIG = {
        'enabled': True,
        'user_agent': 'Chrome',
        'geolocation': '234,234',
    }
    # a list (like the base class default) keeps the order stable and the
    # state JSON-serialisable
    ADVERTISED_HOOKS = [
        'hook_snapshot_start_setup_browser',
        'hook_snapshot_start_setup_context',
        'hook_snapshot_start_setup_page',
        'hook_snapshot_load_start',
        'hook_snapshot_load',
        'hook_snapshot_load_end',
    ]

    @classmethod
    def hook_snapshot_start(cls, state):
        """Run the three browser setup sub-hooks on all plugins, in order."""
        state = run_hooks(state, 'hook_snapshot_start_setup_browser')
        state = run_hooks(state, 'hook_snapshot_start_setup_context')
        state = run_hooks(state, 'hook_snapshot_start_setup_page')
        return state

    @classmethod
    def hook_snapshot(cls, state):
        """Run the three page-load sub-hooks on all plugins, in order."""
        state = run_hooks(state, 'hook_snapshot_load_start')
        state = run_hooks(state, 'hook_snapshot_load')
        state = run_hooks(state, 'hook_snapshot_load_end')
        return state

    @classmethod
    @update_state
    def hook_snapshot_start_setup_browser(cls, state):
        """Store the browser placeholder in this plugin's state."""
        return cls.set_plugin_state({
            'browser': 'sync_playwright.chromium',
        })

    @classmethod
    @update_state
    def hook_snapshot_start_setup_context(cls, state):
        """Store the browser context arguments in this plugin's state."""
        return cls.set_plugin_state({
            'context_args': {
                'executable_path': '/bin/' + cls.get_plugin_state(state, 'ChromiumDependencyPlugin').binary,
                'timeout': 60_000,
            },
        })

    @classmethod
    @update_state
    def hook_snapshot_start_setup_page(cls, state):
        """Store the context and page placeholders in this plugin's state."""
        context = 'browser.launch_persistent_context(**runner.context_args)'
        page = 'context.new_page()'
        return cls.set_plugin_state({
            'context': context,
            'page': page,
        })

    @classmethod
    def hook_snapshot_load_start(cls, state):
        """No-op: returns the state unchanged."""
        return state

    @classmethod
    def hook_snapshot_load(cls, state):
        """No-op: returns the state unchanged."""
        # placeholder for the real navigation, which needs a real page object:
        # cls.get_plugin_state(state).page.goto(state.snapshot.url)
        return state

    @classmethod
    def hook_snapshot_load_end(cls, state):
        """No-op: returns the state unchanged."""
        return state
class TitleRecorderPlugin(ArchiveBoxPlugin):
    """Example extractor that appends a page title to ``snapshot.results``."""

    ENABLED = True
    REQUIRED_PLUGINS = ['PlaywrightExtractorPlugin']
    REQUIRED_HOOKS = ['hook_snapshot_load_end']

    @classmethod
    @update_state
    def hook_snapshot_load_end(cls, state):
        """Return a patch adding ``title:<title>`` to the snapshot results."""
        # placeholder for:
        # title = cls.get_plugin_state(state, 'PlaywrightExtractorPlugin').page.title()
        title = 'Example title'
        title_result = 'title:' + title
        return {'snapshot': {'results': [title_result]}}
class VideoRecorderPlugin(ArchiveBoxPlugin):
    """Example extractor that adds video-recording options to the browser context."""

    ENABLED = True
    REQUIRED_PLUGINS = ['PlaywrightExtractorPlugin']
    REQUIRED_HOOKS = [
        'hook_snapshot_start_setup_context',
        'hook_snapshot_load_end',
    ]
    # all keys are uppercase, so hook_config_start stores them unprefixed
    DEFAULT_CONFIG = {
        'TEMPLATE_DIRS': [
            './plugins/VideoRecorderPlugin',
        ],
        'INDEX_COLUMNS': [
            {'key': 'VideoRecorder', 'name': 'Video Recording', 'icon': 'video.png'},
        ],
        'SNAPSHOT_PREVIEWS': [
            {'key': 'VideoRecorder', 'name': 'Video Recording', 'icon': 'video.png', 'src': 'VideoRecorder/recording.mp4'},
        ],
    }

    @classmethod
    @update_state
    def hook_snapshot_start_setup_context(cls, state: dict):
        """Merge the recording options into PlaywrightExtractorPlugin's context_args."""
        recording_args = {
            'record_video_dir': './video',
            'slow_mo': 0,
        }
        # the patch targets the extractor's plugin state, not this plugin's own
        return cls.set_plugin_state(
            {'context_args': recording_args},
            'PlaywrightExtractorPlugin',
        )

    @classmethod
    @update_state
    def hook_snapshot_load_end(cls, state: dict):
        """Return a patch adding the recording path to the snapshot results."""
        # placeholder for:
        # Path(state.archive.PlaywrightExtractorPlugin.page.video.path()).move_to('./VideoRecorder/recording.mp4')
        recording_path = './VideoRecorder/recording.mp4'
        return {'snapshot': {'results': [recording_path]}}
# Plugin class names in load order; hooks run on the plugins in this order.
ALL_PLUGINS = [
    # core + config backends
    'ArchiveBoxCorePlugin',
    'ConfigFilePlugin',
    'ConfigEnvPlugin',
    # theme + parser
    'DarkThemePlugin',
    'PocketHTMLParserPlugin',
    # browser dependency, the extractor using it, and its recorders
    'ChromiumDependencyPlugin',
    'PlaywrightExtractorPlugin',
    'TitleRecorderPlugin',
    'VideoRecorderPlugin',
]
# Starting point of the global state object that every hook receives and patches.
INITIAL_STATE = {
    # hook names known up front; plugins add their own when loaded
    'hooks': [
        # config
        'hook_config_start',
        'hook_config',
        'hook_config_plugins',
        'hook_config_end',
        'hook_config_save',
        # parse
        'hook_parse_start',
        'hook_parse',
        'hook_parse_end',
        # archive
        'hook_archive_start',
        'hook_archive',
        'hook_archive_end',
        # snapshot
        'hook_pre_snapshot',
        'hook_snapshot',
        'hook_post_snapshot',
        # results
        'hook_pre_save_result',
        'hook_save_result',
        'hook_post_save_result',
        'hook_render_icon',
    ],
    # per-plugin metadata and state, keyed by plugin class name
    'plugins': {},
    'config': {
        'config_file_path': None,
        'schema': {},
        'state': {},
    },
    # bookkeeping filled in while the pipeline runs
    'meta': {
        'start_time': None,
        'end_time': None,
        'active_hook': None,
        'version': None,
    },
    # example input for the parse step
    'parse': {
        'urls_text': 'https://example.com\nhttps://example.com/other',
        'links': [],
    },
    # example input for the snapshot step
    'snapshot': {
        'url': 'https://example.com',
        'results': [],
    },
}

# recorded in state.meta.version at the start of every run
VERSION = '0.7.1'
def run(state=INITIAL_STATE, plugins=ALL_PLUGINS):
    """Run the whole example pipeline and return the final state.

    Loads the plugins, then runs every pipeline hook in order. The final
    state (with ``meta.end_time`` set) is pretty-printed both on success and
    on failure; on failure the original exception is re-raised afterwards.

    Args:
        state: initial state dictionary (not modified by this function itself).
        plugins: names of the plugin classes to load, in order.
    """
    def finish(final_state):
        """Stamp the end time, dump the state and return it."""
        final_state = deepmerge(final_state, {
            'meta': {
                'end_time': datetime.now().isoformat(),
            },
        })
        print()
        print('STATE DUMP:')
        # print(json.dumps(final_state, indent=4))
        cpprint(final_state)
        print()
        return final_state

    # hooks in execution order
    pipeline = (
        'hook_setup_plugins',
        'hook_config_start',
        'hook_config',
        'hook_config_plugins',
        'hook_config_end',
        # load_django(config=state.config.state) would go here
        'hook_parse_start',
        'hook_parse',
        'hook_parse_end',
        'hook_archive_start',
        'hook_archive',
        'hook_snapshot_start',
        'hook_snapshot',
        'hook_snapshot_end',
        'hook_archive_end',
    )

    try:
        state = deepmerge(state, {
            'meta': {
                'start_time': datetime.now().isoformat(),
                'version': VERSION,
            },
        })
        state = load_plugins(state, plugins)
        for hook_name in pipeline:
            state = run_hooks(state, hook_name)
    except Exception:
        # dump whatever state was reached to help debugging, then propagate
        finish(state)
        raise

    return finish(state)
# Run the example pipeline with the default state and plugins when this file
# is executed as a script.
if __name__ == '__main__':
    run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment