Created
June 8, 2018 23:09
-
-
Save wsanchez/d29c7843852981f8cd14f5d886a47c28 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import Iterable, Optional | |
from alchimia.engine import TwistedEngine | |
from attr import Factory, attrib, attrs | |
from sqlalchemy import create_engine | |
from sqlalchemy.schema import Column, MetaData, Table | |
from sqlalchemy.types import String | |
from twisted.logger import Logger | |
# Public API of this module.  Transmission is exported alongside Index
# because Index.addTransmissions() takes Transmission objects as input.
__all__ = (
    "Index",
    "Transmission",
)
@attrs(frozen=True)
class Transmission:
    """
    A single radio transmission.

    Instances are frozen: the three fields are given at construction time
    and cannot be reassigned afterwards.
    """

    # Station for the transmission (for example "Radio 1").
    station: str = attrib()

    # System for the transmission (for example "A").
    system: str = attrib()

    # Channel for the transmission (for example "Ops").
    channel: str = attrib()
@attrs(frozen=True)
class Index(object):
    """
    Generates an index of :class:`Transmission`s.

    Transmissions are stored in a ``TRANSMISSION`` table of a database that
    is accessed through an alchimia :class:`TwistedEngine`.  The engine is
    created lazily, the first time the database is needed, and is then
    re-used for the lifetime of the index.

    :param databaseURL: SQLAlchemy URL of the database to use.  Defaults to
        the SQLite file ``/tmp/test.sqlite``.
    """

    _log = Logger()

    # Schema
    _metaData = MetaData()

    _transmissions = Table(
        "TRANSMISSION", _metaData,
        Column("STATION", String, nullable=False),
        Column("SYSTEM", String, nullable=False),
        Column("CHANNEL", String, nullable=False),
    )

    @attrs
    class _State(object):
        """
        Internal mutable state for :class:`Index`.

        :class:`Index` itself is frozen, so anything that has to change
        after construction (the lazily created engine) lives here.
        """

        db: Optional[TwistedEngine] = attrib(
            default=None, init=False,
        )

    databaseURL: str = attrib(default="sqlite:////tmp/test.sqlite")

    _state: _State = attrib(default=Factory(_State), init=False)

    @property
    def _db(self) -> TwistedEngine:
        """
        The database engine, created and cached on first access.

        On first access this creates a SQLAlchemy engine for
        ``databaseURL``, creates the schema, and wraps the engine in a
        :class:`TwistedEngine` bound to the global reactor.
        """
        state = self._state

        if state.db is None:
            self._log.info("Initializing SQLite DB...")

            engine = create_engine(self.databaseURL, echo=True)

            # The schema is created with the plain (blocking) engine, before
            # it is handed over to the Twisted wrapper.
            self._metaData.create_all(engine)

            # NOTE(review): imported here rather than at module level,
            # presumably so that importing this module does not install the
            # default reactor as a side effect -- confirm before moving.
            from twisted.internet import reactor

            state.db = TwistedEngine.from_sqlalchemy_engine(
                reactor, engine
            )

        return state.db

    async def addTransmissions(
        self, transmissions: Iterable[Transmission]
    ) -> None:
        """
        Insert the given transmissions into the database.

        :param transmissions: the transmissions to store.  Any iterable is
            accepted; it is consumed exactly once.  An empty iterable is a
            no-op and does not touch the database.
        """
        self._log.info("Importing transmissions...")

        rows = [
            dict(
                STATION=transmission.station,
                SYSTEM=transmission.system,
                CHANNEL=transmission.channel,
            )
            for transmission in transmissions
        ]

        if not rows:
            # SQLAlchemy does not treat an empty parameter list as "insert
            # nothing": it executes the INSERT once with no values, which
            # cannot satisfy the NOT NULL columns.  Skip the round trip.
            return

        await self._db.execute(self._transmissions.insert(), rows)
if __name__ == "__main__":
    # Demo entry point: log to standard output and insert two sample
    # transmissions into the index.
    from sys import stdout

    from twisted.internet.defer import Deferred, ensureDeferred
    from twisted.internet.interfaces import IReactorCore
    from twisted.internet.task import react
    from twisted.logger import (
        FilteringLogObserver, LogLevel, LogLevelFilterPredicate,
        globalLogBeginner, textFileLogObserver,
    )

    # Text log observer on stdout, filtered with a default level of "info".
    globalLogBeginner.beginLoggingTo([
        FilteringLogObserver(
            textFileLogObserver(stdout),
            [LogLevelFilterPredicate(defaultLogLevel=LogLevel.info)],
        )
    ])

    # Sample data, as (station, system, channel) triples.
    transmissions = [
        Transmission(station=station, system=system, channel=channel)
        for station, system, channel in (
            ("Radio 1", "A", "Ops"),
            ("Radio 4", "B", "Talk"),
        )
    ]

    def main(reactor: IReactorCore) -> Deferred:
        """
        Add the sample transmissions to a fresh :class:`Index`.

        :param reactor: the reactor supplied by :func:`react`; not used
            directly here.

        :return: a Deferred wrapping the ``addTransmissions`` coroutine,
            which :func:`react` waits on.
        """
        coroutine = Index().addTransmissions(transmissions)
        return ensureDeferred(coroutine)

    react(main, [])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment