Skip to content

Instantly share code, notes, and snippets.

@wsanchez
Last active February 26, 2021 18:42
Show Gist options
  • Save wsanchez/546a170b25339ac656dc86f55cc4740b to your computer and use it in GitHub Desktop.
Okta (Asyncio) + Twisted
##
# See the file COPYRIGHT for copyright information.
##
"""
Burning Man Profiles CLI
"""
from asyncio import ensure_future
from functools import partial, update_wrapper
from sys import stdout
from typing import Any, Awaitable, Callable, Optional, overload
from attr import attrs
import click
from okta.client import Client as OktaClient
from twisted.application.runner._runner import Runner
from twisted.logger import Logger
from twisted.internet import asyncioreactor as asyncioReactorModule
from twisted.internet import default as defaultReactorModule
from twisted.internet.defer import Deferred, ensureDeferred
from twisted.python.failure import Failure
__all__ = ()
log = Logger()
CommandHandler = Callable[..., None]
AsynchronousCommandHandler = Callable[..., Awaitable[None]]
@attrs(frozen=True, auto_attribs=True, slots=True, kw_only=True)
class Command:
    """
    Namespace class for the command line tool.

    Declares no attributes; it only exposes the L{Command.main} entry point.
    """

    @classmethod
    def main(cls) -> None:
        """
        Run the command line tool.

        Inside this method body the bare name C{main} resolves to the
        module-level click group of the same name, not to this classmethod,
        so this simply hands control to click.
        """
        main()
@overload
def runInReactor(
    *, reactorModule: Any
) -> Callable[[AsynchronousCommandHandler], CommandHandler]:
    ...  # pragma: no cover


@overload
def runInReactor(f: AsynchronousCommandHandler) -> CommandHandler:
    ...  # pragma: no cover


def runInReactor(
    f: Optional[AsynchronousCommandHandler] = None,
    *,
    reactorModule: Any = defaultReactorModule,
) -> Callable:
    """
    Decorator that takes an asynchronous callable and returns a synchronous
    callable that calls and awaits on the asynchronous callable in a Twisted
    reactor, and then stops the reactor.

    May be applied bare (C{@runInReactor}) or with a keyword argument
    (C{@runInReactor(reactorModule=...)}).

    @param f: The asynchronous command handler to wrap.  C{None} when the
        decorator is used in its keyword-argument form, in which case a
        decorator is returned instead of a wrapped handler.

    @param reactorModule: An object whose C{install()} is called each time
        the wrapped handler is invoked, before the reactor is imported.

    @return: The synchronous wrapper around C{f}, or, if C{f} is C{None}, a
        decorator producing such a wrapper.
    """

    def react(command: Callable[[], Awaitable[None]]) -> None:
        # Imported here rather than at module scope so that we get the
        # reactor that wrapper() installed just before calling us.
        from twisted.internet import reactor as installedReactor

        def whenRunning() -> Deferred:
            def error(failure: Failure) -> None:
                # The command raised: log it and shut the reactor down, since
                # the command's own stop() call was never reached.
                # NOTE(review): nothing here changes the process exit status
                # on failure; confirm whether that is intended.
                log.failure("Uncaught error: {log_failure}", failure=failure)
                installedReactor.stop()

            d = ensureDeferred(command())
            d.addErrback(error)
            return d

        runner = Runner(
            reactor=installedReactor,
            logFile=stdout,
            whenRunning=whenRunning,
        )
        runner.run()

    def wrapper(
        _f: AsynchronousCommandHandler, *args: object, **kwargs: object
    ) -> None:
        # Install the requested reactor first; the import below then binds
        # to it.
        reactorModule.install()
        from twisted.internet import reactor as installedReactor

        async def run() -> None:
            await _f(*args, **kwargs)
            # Only reached on success; failures are handled (and the reactor
            # stopped) by the errback added in react().
            installedReactor.stop()

        react(run)

    if f is None:
        # Keyword form: hand back a decorator that will receive the handler.
        def decorator(f: AsynchronousCommandHandler) -> CommandHandler:
            return update_wrapper(partial(wrapper, f), f)

        return decorator
    else:
        return update_wrapper(partial(wrapper, f), f)
@click.group()
@click.version_option()
@click.pass_context
def main(ctx: click.Context) -> None:
    # Root click command group; subcommands attach via @main.command().
    # The group itself does no work.  Documented with comments rather than
    # a docstring because click presents a command's docstring as its help
    # text, and that output should stay as it is.
    return None
@main.command()
@runInReactor(reactorModule=asyncioReactorModule)
async def okta_find_users() -> None:
    # Subcommand that lists users through the Okta client and prints the
    # raw result.  (Comments, not a docstring: click would show a docstring
    # as this command's help text.)

    # Imported inside the function so it picks up whichever reactor the
    # runInReactor decorator installed before calling us.
    from twisted.internet import reactor

    print(reactor)  # Should be twisted.internet.asyncioreactor.*

    # NOTE(review): URL and token look like placeholder values - confirm.
    config = {"orgUrl": "https://fnord.burningman.org", "token": "fnord"}
    client = OktaClient(config)

    # Convert the future-yielding coroutine into a Deferred
    pending = ensure_future(client.list_users())
    users, response, error = await Deferred.fromFuture(pending)

    print(users, response, error)
# Script entry point: when this file is executed directly (not imported),
# start the command line tool via Command.main().
if __name__ == "__main__":
    Command.main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment