Skip to content

Instantly share code, notes, and snippets.

@rob-blackbourn
Last active October 8, 2020 10:19
Show Gist options
  • Save rob-blackbourn/49f2f053bf3106e71a8fed735f9b91c2 to your computer and use it in GitHub Desktop.
Save rob-blackbourn/49f2f053bf3106e71a8fed735f9b91c2 to your computer and use it in GitHub Desktop.
Graphene 3 example subscription
import typing
import graphene
from jetblack_tweeter import Tweeter
from .types import TweetType
class Subscription(graphene.ObjectType):
filter = graphene.Field(
TweetType,
track=graphene.List(graphene.String, required=True))
@staticmethod
async def subscribe_filter(root, info, track: typing.List[str]):
tweeter: Tweeter = info.context['tweeter']
async for tweet in tweeter.stream.filter(track=track):
yield tweet
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment