Skip to content

Instantly share code, notes, and snippets.

@florianvazelle
Last active May 21, 2022 13:33
Show Gist options
  • Save florianvazelle/c7d8c49fb005a72ed5962ee55a83b000 to your computer and use it in GitHub Desktop.
Save florianvazelle/c7d8c49fb005a72ed5962ee55a83b000 to your computer and use it in GitHub Desktop.
Minimal example of an "ASGI-like" streaming server with Twisted

To execute this example:

  • in a terminal, run:
python async_stream_server.py
  • in another terminal, run:
curl http://localhost:1234/
import asyncio
from twisted.internet import asyncioreactor
# Install the asyncio-backed reactor on a dedicated event loop. This runs
# before `twisted.internet.reactor` is imported further down, so the reactor
# imported there is the asyncio one (importing it first would install
# Twisted's default reactor instead).
twisted_loop = asyncio.new_event_loop()
asyncioreactor.install(twisted_loop)
import time
from sys import stdout
from twisted.web import http
from twisted.python.log import startLogging
from twisted.internet import reactor, endpoints
# Size, in bytes, of each body chunk produced by iterable_content() (64 KiB).
CHUNK_SIZE = 2**16
def async_partial(async_fn, *partial_args, **partial_kwargs):
    """Return a coroutine function with some arguments pre-bound.

    Async counterpart of ``functools.partial``: awaiting a call to the
    returned wrapper awaits ``async_fn`` with the bound arguments followed
    by the call-time ones.

    Args:
        async_fn: Callable returning an awaitable (typically an ``async def``).
        *partial_args: Positional arguments placed before the call-time ones.
        **partial_kwargs: Keyword arguments bound up front. A call-time
            keyword with the same name overrides the bound value.

    Returns:
        A coroutine function that forwards to ``async_fn`` and returns
        whatever it returns.
    """

    async def wrapped(*args, **kwargs):
        # Call-time keywords win over bound ones, as with functools.partial.
        merged_kwargs = {**partial_kwargs, **kwargs}
        return await async_fn(*partial_args, *args, **merged_kwargs)

    return wrapped
def iterable_content(count=5, delay=1, chunk_size=None):
    """Yield ``count`` chunks of filler bytes, pausing before each one.

    This is a plain (synchronous) generator: the pause is a blocking
    ``time.sleep``, so it stalls whichever thread is iterating it.

    Args:
        count: Number of chunks to produce. Defaults to 5.
        delay: Seconds to sleep before producing each chunk. Defaults to 1.
        chunk_size: Length in bytes of each chunk. ``None`` (the default)
            means the module-level ``CHUNK_SIZE``.

    Yields:
        ``bytes`` objects made of ``chunk_size`` repetitions of ``b"a"``.
    """
    if chunk_size is None:
        # Looked up at call time so the module constant remains the default.
        chunk_size = CHUNK_SIZE
    # Every chunk has the same content, so build it once.
    chunk = b"a" * chunk_size
    for _ in range(count):
        time.sleep(delay)
        yield chunk
async def application(send):
    """Stream the demo content to the client through ``send``.

    Each chunk from ``iterable_content()`` is sent as
    ``{"body": chunk, "more_body": True}``; a final
    ``{"more_body": False}`` message marks the end of the response.

    Args:
        send: Coroutine function accepting one message dict.
    """
    loop = asyncio.get_running_loop()
    chunks = iterable_content()
    exhausted = object()
    while True:
        # iterable_content() sleeps synchronously between chunks. Advancing
        # it in the default executor keeps the event loop free, so the
        # previous chunk can actually be flushed to the socket and other
        # connections keep being served while we wait. A sentinel default is
        # used because StopIteration must not be raised into a future.
        part = await loop.run_in_executor(None, next, chunks, exhausted)
        if part is exhausted:
            break
        await send(
            {
                "body": part,
                "more_body": True,
            }
        )
    await send({"more_body": False})
class Dummy(http.Request):
    """HTTP request whose response body is produced by ``application``."""

    # Task running ``application`` for this request; set by process().
    _stream_task = None
    # True once the connection was lost before the response was finished.
    _client_gone = False

    def process(self):
        """Schedule ``application`` on the asyncio loop for this request."""
        # notifyFinish() errbacks when the connection is lost before
        # finish(); use that to stop streaming to a client that went away.
        self.notifyFinish().addErrback(self._on_connection_lost)
        # Keep a reference to the task: the event loop only holds weak
        # references to tasks, so an unreferenced one may be collected
        # before it completes.
        self._stream_task = asyncio.ensure_future(
            application(send=async_partial(self.handle_reply)),
            loop=asyncio.get_event_loop(),
        )

    def _on_connection_lost(self, failure):
        """Record the disconnect and cancel the streaming task."""
        self._client_gone = True
        if self._stream_task is not None:
            self._stream_task.cancel()
        # Returning None marks the failure as handled.

    async def handle_reply(self, message):
        """Write one application message to the HTTP response.

        Args:
            message: Dict with an optional ``"body"`` (bytes, default
                ``b""``) and an optional ``"more_body"`` flag (default
                ``False``). A falsy ``"more_body"`` finishes the response.
        """
        if self._client_gone:
            # Writing to or finishing a request whose connection is gone
            # is pointless (finish() would raise), so drop the message.
            return
        http.Request.write(self, message.get("body", b""))
        if not message.get("more_body", False):
            http.Request.finish(self)
        print('HTTP response chunk')
class DummyFactory(http.HTTPFactory):
    """HTTP factory whose protocols create ``Dummy`` request objects."""

    def buildProtocol(self, addr):
        """Build the stock HTTP protocol and point it at ``Dummy``.

        Args:
            addr: Peer address, passed through to the parent factory.

        Returns:
            The protocol built by ``http.HTTPFactory`` with its
            ``requestFactory`` attribute set to ``Dummy``.
        """
        channel = super().buildProtocol(addr)
        channel.requestFactory = Dummy
        return channel
# Send Twisted's log output to stdout.
startLogging(stdout)
# Serve HTTP on TCP port 1234 using the streaming request class.
endpoints.serverFromString(reactor, "tcp:1234").listen(DummyFactory())
# Make the loop the reactor was installed on the current asyncio loop, so
# asyncio.get_event_loop() in Dummy.process() schedules onto it. This is the
# same object the reactor holds, referenced without touching the reactor's
# private `_asyncioEventloop` attribute.
asyncio.set_event_loop(twisted_loop)
reactor.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment