Skip to content

Instantly share code, notes, and snippets.

@pluveto
Created October 13, 2023 08:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pluveto/b7a96ffa24c9ed8ef301a40f11b84fad to your computer and use it in GitHub Desktop.
Save pluveto/b7a96ffa24c9ed8ef301a40f11b84fad to your computer and use it in GitHub Desktop.
import abc
class ConfigSource(metaclass=abc.ABCMeta):
@abc.abstractmethod
def sync(self, dest: str) -> bool:
return NotImplemented
class ObtuseAngle:
def talk(self):
print("I'm obtuse")
class LocalConfigSource:
def sync(self, dest: str) -> bool:
print(f"Syncing from local to {dest}")
return True
class S3ConfigSource(ConfigSource):
def sync(self, dest: str) -> bool:
print(f"Syncing from s3 to {dest}")
return True
class OnedriveConfigSource(ConfigSource):
def blabula(self):
pass
class ConfigService:
def __init__(self):
self._sources = {}
def add_source(self, name: str, source: ConfigSource):
if name in self._sources:
raise ValueError(f"Variant {name} already exists")
self._sources[name] = source
def source_names(self):
return self._sources.keys()
def sync(self, name: str, dest: str) -> bool:
if not name in self._sources:
raise ValueError(f"Unknown variant {name}")
return self._sources[name].sync(dest)
def protected_call(fn):
try:
fn()
except Exception as ex:
print(f"{type(ex).__name__}: {ex}")
if __name__ == "__main__":
service = ConfigService()
protected_call(lambda: service.add_source("obtuse", ObtuseAngle()))
protected_call(lambda: service.add_source("local", LocalConfigSource()))
protected_call(lambda: service.add_source("s3", S3ConfigSource()))
protected_call(lambda: service.add_source("onedrive", OnedriveConfigSource()))
print(f"Available sources: {service.source_names()}")
protected_call(lambda: service.sync("obtuse", "dir1"))
protected_call(lambda: service.sync("local", "dir1"))
protected_call(lambda: service.sync("s3", "dir1"))
protected_call(lambda: service.sync("onedrive", "dir1"))
import abc
from typing import Protocol
class ConfigSource(Protocol):
@abc.abstractmethod
def sync(self, dest: str) -> bool:
return NotImplemented
class ObtuseAngle:
def talk(self):
print("I'm obtuse")
class LocalConfigSource:
def sync(self, dest: str) -> bool:
print(f"Syncing from local to {dest}")
return True
class S3ConfigSource(ConfigSource):
def sync(self, dest: str) -> bool:
print(f"Syncing from s3 to {dest}")
return True
class OnedriveConfigSource(ConfigSource):
def blabula(self):
pass
class ConfigService:
def __init__(self):
self._sources = {}
def add_source(self, name: str, source: ConfigSource):
if name in self._sources:
raise ValueError(f"Variant {name} already exists")
self._sources[name] = source
def source_names(self):
return self._sources.keys()
def sync(self, name: str, dest: str) -> bool:
if not name in self._sources:
raise ValueError(f"Unknown variant {name}")
return self._sources[name].sync(dest)
def protected_call(fn):
try:
fn()
except Exception as ex:
print(f"{type(ex).__name__}: {ex}")
if __name__ == "__main__":
service = ConfigService()
protected_call(lambda: service.add_source("obtuse", ObtuseAngle()))
protected_call(lambda: service.add_source("local", LocalConfigSource()))
protected_call(lambda: service.add_source("s3", S3ConfigSource()))
protected_call(lambda: service.add_source("onedrive", OnedriveConfigSource()))
print(f"Available sources: {service.source_names()}")
protected_call(lambda: service.sync("obtuse", "dir1"))
protected_call(lambda: service.sync("local", "dir1"))
protected_call(lambda: service.sync("s3", "dir1"))
protected_call(lambda: service.sync("onedrive", "dir1"))
from typing import Protocol
class ConfigSource(Protocol):
def sync(self, dest: str) -> bool:
return NotImplemented
class ObtuseAngle:
def talk(self):
print("I'm obtuse")
class LocalConfigSource:
def sync(self, dest: str) -> bool:
print(f"Syncing from local to {dest}")
return True
class S3ConfigSource(ConfigSource):
def sync(self, dest: str) -> bool:
print(f"Syncing from s3 to {dest}")
return True
class OnedriveConfigSource(ConfigSource):
def blabula(self):
pass
class ConfigService:
def __init__(self):
self._sources = {}
def add_source(self, name: str, source: ConfigSource):
if name in self._sources:
raise ValueError(f"Variant {name} already exists")
self._sources[name] = source
def source_names(self):
return self._sources.keys()
def sync(self, name: str, dest: str) -> bool:
if not name in self._sources:
raise ValueError(f"Unknown variant {name}")
return self._sources[name].sync(dest)
def protected_call(fn):
try:
fn()
except Exception as ex:
print(f"{type(ex).__name__}: {ex}")
if __name__ == "__main__":
service = ConfigService()
protected_call(lambda: service.add_source("obtuse", ObtuseAngle()))
protected_call(lambda: service.add_source("local", LocalConfigSource()))
protected_call(lambda: service.add_source("s3", S3ConfigSource()))
protected_call(lambda: service.add_source("onedrive", OnedriveConfigSource()))
print(f"Available sources: {service.source_names()}")
protected_call(lambda: service.sync("obtuse", "dir1"))
protected_call(lambda: service.sync("local", "dir1"))
protected_call(lambda: service.sync("s3", "dir1"))
protected_call(lambda: service.sync("onedrive", "dir1"))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment