Last active
February 26, 2021 18:42
-
-
Save wsanchez/546a170b25339ac656dc86f55cc4740b to your computer and use it in GitHub Desktop.
Okta (Asyncio) + Twisted
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
## | |
# See the file COPYRIGHT for copyright information. | |
## | |
""" | |
Burning Man Profiles CLI | |
""" | |
from asyncio import ensure_future | |
from functools import partial, update_wrapper | |
from sys import stdout | |
from typing import Any, Awaitable, Callable, Optional, overload | |
from attr import attrs | |
import click | |
from okta.client import Client as OktaClient | |
from twisted.application.runner._runner import Runner | |
from twisted.logger import Logger | |
from twisted.internet import asyncioreactor as asyncioReactorModule | |
from twisted.internet import default as defaultReactorModule | |
from twisted.internet.defer import Deferred, ensureDeferred | |
from twisted.python.failure import Failure | |
__all__ = () | |
log = Logger() | |
CommandHandler = Callable[..., None] | |
AsynchronousCommandHandler = Callable[..., Awaitable[None]] | |
@attrs(frozen=True, auto_attribs=True, slots=True, kw_only=True) | |
class Command: | |
""" | |
Command Line | |
""" | |
@classmethod | |
def main(cls) -> None: | |
""" | |
Command line entry point. | |
""" | |
main() | |
@overload | |
def runInReactor(*, reactorModule: Any) -> Callable: | |
... # pragma: no cover | |
@overload | |
def runInReactor(f: AsynchronousCommandHandler) -> CommandHandler: | |
... # pragma: no cover | |
def runInReactor( | |
f: Optional[AsynchronousCommandHandler] = None, | |
*, | |
reactorModule: Any = defaultReactorModule, | |
) -> Callable: | |
""" | |
Decorator that takes an asynchronous callable and returns a synchronous | |
callable that calls and awaits on the asynchronous callable in a Twisted | |
reactor, and then stops the reactor. | |
""" | |
def react(command: Callable) -> None: | |
from twisted.internet import reactor as installedReactor | |
def whenRunning() -> Deferred: | |
def error(f: Failure) -> None: | |
log.failure("Uncaught error: {log_failure}", failure=f) | |
installedReactor.stop() | |
d = ensureDeferred(command()) | |
d.addErrback(error) | |
return d | |
runner = Runner( | |
reactor=installedReactor, | |
logFile=stdout, | |
whenRunning=whenRunning, | |
) | |
runner.run() | |
def wrapper( | |
_f: AsynchronousCommandHandler, *args: object, **kwargs: object | |
) -> None: | |
reactorModule.install() | |
from twisted.internet import reactor as installedReactor | |
async def run() -> None: | |
await _f(*args, **kwargs) | |
installedReactor.stop() | |
react(run) | |
if f is None: | |
def decorator(f: CommandHandler) -> CommandHandler: | |
return update_wrapper(partial(wrapper, f), f) | |
return decorator | |
else: | |
return update_wrapper(partial(wrapper, f), f) | |
@click.group() | |
@click.version_option() | |
@click.pass_context | |
def main(ctx: click.Context) -> None: | |
pass | |
@main.command() | |
@runInReactor(reactorModule=asyncioReactorModule) | |
async def okta_find_users() -> None: | |
from twisted.internet import reactor | |
print(reactor) # Should be twisted.internet.asyncioreactor.* | |
client = OktaClient( | |
dict(orgUrl="https://fnord.burningman.org", token="fnord") | |
) | |
# Convert the future-yielding coroutine into a Deferred | |
users, resp, err = await Deferred.fromFuture( | |
ensure_future(client.list_users()) | |
) | |
print(users, resp, err) | |
if __name__ == "__main__": | |
Command.main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment