Skip to content

Instantly share code, notes, and snippets.

@wsanchez
Created June 8, 2018 23:09
Show Gist options
  • Save wsanchez/d29c7843852981f8cd14f5d886a47c28 to your computer and use it in GitHub Desktop.
Save wsanchez/d29c7843852981f8cd14f5d886a47c28 to your computer and use it in GitHub Desktop.
from typing import Iterable, Optional
from alchimia.engine import TwistedEngine
from attr import Factory, attrib, attrs
from sqlalchemy import create_engine
from sqlalchemy.schema import Column, MetaData, Table
from sqlalchemy.types import String
from twisted.logger import Logger
# Public API.  Transmission is exported together with Index because callers
# need it to build the argument passed to Index.addTransmissions().
__all__ = (
    "Index",
    "Transmission",
)
@attrs(frozen=True, auto_attribs=True)
class Transmission:
    """
    Immutable record describing one radio transmission by the names of its
    station, system and channel.
    """

    station: str
    system: str
    channel: str
@attrs(frozen=True)
class Index(object):
    """
    Generates an index of :class:`Transmission` objects.

    Transmissions are stored as rows of the ``TRANSMISSION`` table in a
    SQLite database whose engine and schema are created on first use.
    """

    _log = Logger()

    # Schema
    _metaData = MetaData()

    _transmissions = Table(
        "TRANSMISSION", _metaData,
        Column("STATION", String, nullable=False),
        Column("SYSTEM", String, nullable=False),
        Column("CHANNEL", String, nullable=False),
    )

    @attrs
    class _State(object):
        """
        Internal mutable state for :class:`Index`.

        :class:`Index` is frozen, so the lazily created engine is held on
        this separate, mutable object instead of on the index itself.
        """

        db: Optional[TwistedEngine] = attrib(
            default=None, init=False,
        )

    _state: _State = attrib(default=Factory(_State), init=False)

    @property
    def _db(self) -> TwistedEngine:
        """
        The database engine for this index.

        On first access this creates the SQLite engine, creates the schema,
        and wraps the engine in a :class:`TwistedEngine`; later accesses
        return the cached engine.
        """
        if self._state.db is None:
            self._log.info("Initializing SQLite DB...")

            # NOTE(review): an in-memory URL ("sqlite:///:memory:") was
            # previously used here; the file-backed database is what is
            # currently in effect.
            engine = create_engine("sqlite:////tmp/test.sqlite", echo=True)

            # Schema creation runs synchronously on the plain engine, before
            # it is handed to the Twisted wrapper.
            self._metaData.create_all(engine)

            # Imported lazily -- presumably so that importing this module
            # does not install the default reactor as a side effect.
            from twisted.internet import reactor

            self._state.db = TwistedEngine.from_sqlalchemy_engine(
                reactor, engine
            )

        return self._state.db

    async def addTransmissions(
        self, transmissions: Iterable[Transmission]
    ) -> None:
        """
        Insert the given transmissions into the index.

        :param transmissions: the transmissions to add; may be any iterable,
            including an empty one or a one-shot generator.
        """
        self._log.info("Importing transmissions...")

        # Materialize first so that emptiness can be checked even when the
        # caller passed a generator.
        rows = [
            dict(
                STATION=transmission.station,
                SYSTEM=transmission.system,
                CHANNEL=transmission.channel,
            )
            for transmission in transmissions
        ]

        if not rows:
            # Nothing to insert.  An empty parameter list is not a no-op for
            # execute(): it appears to be treated as "no parameters", which
            # would attempt to insert one row of defaults and violate the
            # NOT NULL columns.
            return

        await self._db.execute(self._transmissions.insert(), rows)
if __name__ == "__main__":
    # Demo entry point: log to stdout at info level and above, then insert
    # two sample transmissions into a fresh Index under the Twisted reactor.
    from sys import stdout

    from twisted.internet.defer import Deferred, ensureDeferred
    from twisted.internet.interfaces import IReactorCore
    from twisted.internet.task import react
    from twisted.logger import (
        FilteringLogObserver, LogLevel, LogLevelFilterPredicate,
        globalLogBeginner, textFileLogObserver,
    )

    globalLogBeginner.beginLoggingTo([
        FilteringLogObserver(
            textFileLogObserver(stdout),
            [LogLevelFilterPredicate(defaultLogLevel=LogLevel.info)],
        )
    ])

    transmissions = [
        Transmission(station=station, system=system, channel=channel)
        for station, system, channel in (
            ("Radio 1", "A", "Ops"),
            ("Radio 4", "B", "Talk"),
        )
    ]

    def main(reactor: IReactorCore) -> Deferred:
        """
        Insert the sample transmissions; react() stops the reactor when the
        returned Deferred fires.
        """
        return ensureDeferred(Index().addTransmissions(transmissions))

    react(main, [])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment