Last active
May 21, 2018 06:53
-
-
Save drgarcia1986/2b8c1a8080de67877066 to your computer and use it in GitHub Desktop.
asyncio/muffin/aiohttp examples
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from concurrent.futures import ProcessPoolExecutor | |
import time | |
import muffin | |
# Application object for the process-pool example; the handler below
# registers its route on it.
app = muffin.Application('hello_world')
def get_msg(delay=5):
    """Return ``'foo'`` after blocking the caller for *delay* seconds.

    Stands in for slow, blocking work that should be kept off the event
    loop. It is a plain module-level function, so it can be handed to an
    executor as-is.

    :param delay: Seconds to block before returning. Defaults to 5, the
        value that used to be hard-coded.
    :returns: The string ``'foo'``.
    :raises ValueError: If *delay* is negative (raised by ``time.sleep``).
    """
    time.sleep(delay)
    return 'foo'
@app.register('/')
def msg(request):
    """Handle ``/`` by running ``get_msg`` in a process pool.

    :param request: The incoming request; its ``app.loop`` schedules the
        executor call.
    :returns: Whatever ``get_msg`` returned.
    """
    loop = request.app.loop
    # The pool lives only for this request and is shut down when the
    # ``with`` block exits.
    with ProcessPoolExecutor() as pool:
        result = yield from loop.run_in_executor(pool, get_msg)
    return result
if __name__ == '__main__':
    # Executed as a script: hand control to the application's command manager.
    app.manage()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from concurrent.futures import ThreadPoolExecutor | |
import time | |
import muffin | |
# Application object for the thread-pool example; the handler below
# registers its route on it.
app = muffin.Application('hello_world')
def get_msg(delay=5):
    """Return ``'foo'`` after blocking the calling thread for *delay* seconds.

    Stands in for slow, blocking work that should be kept off the event
    loop.

    :param delay: Seconds to block before returning. Defaults to 5, the
        value that used to be hard-coded.
    :returns: The string ``'foo'``.
    :raises ValueError: If *delay* is negative (raised by ``time.sleep``).
    """
    time.sleep(delay)
    return 'foo'
@app.register('/')
def msg(request):
    """Handle ``/`` by running ``get_msg`` in a two-worker thread pool.

    :param request: The incoming request; its ``app.loop`` schedules the
        executor call.
    :returns: Whatever ``get_msg`` returned.
    """
    loop = request.app.loop
    # The pool lives only for this request and is shut down when the
    # ``with`` block exits.
    with ThreadPoolExecutor(max_workers=2) as pool:
        result = yield from loop.run_in_executor(pool, get_msg)
    return result
if __name__ == '__main__':
    # Executed as a script: hand control to the application's command manager.
    app.manage()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import muffin | |
# Application object for the class-based-view example.
app = muffin.Application('cbv')
@app.register('/hello', '/hello/{name}')
class Hello(muffin.Handler):
    """Class-based handler registered for ``/hello`` and ``/hello/{name}``."""

    def get(self, request):
        """Greet the name from the path, else from the query string.

        A non-empty ``name`` path segment (e.g. ``/hello/foo``) wins;
        otherwise the ``name`` query parameter is used, defaulting to
        ``'anonymous'``.
        """
        who = (
            request.match_info.get('name')
            or request.GET.get('name', 'anonymous')
        )
        return {'hello': who}

    def post(self, request):
        """Echo the JSON body of the request under ``data_posted``."""
        payload = yield from request.json()
        return {'data_posted': payload}
if __name__ == '__main__':
    # Executed as a script: hand control to the application's command manager.
    app.manage()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import muffin | |
# Application object whose command manager receives the custom command below.
app = muffin.Application('cmdline')
@app.manage.command
def hello(name: str, upper=False):
    """
    Print a simple hello msg
    :param name: Your name :)
    :param upper: Show msg in uppercase
    """
    # NOTE(review): the generated ``-h`` text mirrors the docstring above,
    # so its wording is deliberately left untouched.
    greeting = 'Hello {}'.format(name)
    print(greeting.upper() if upper else greeting)
""" | |
$ muffin muffin_command_line hello Diego --upper | |
HELLO DIEGO | |
$ muffin muffin_command_line hello Diego | |
Hello Diego | |
$ muffin muffin_command_line hello -h | |
usage: muffin hello [-h] [--upper] name | |
Print a simple hello msg | |
positional arguments: | |
name Your name :) | |
optional arguments: | |
-h, --help show this help message and exit | |
--upper Enable show msg in uppercase | |
""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import muffin | |
# Application object for the minimal hello-world example.
app = muffin.Application('hello_world')
@app.register('/')
def hello(request):
    """Answer requests to ``/`` with a fixed greeting string."""
    greeting = 'Hello World!'
    return greeting
if __name__ == '__main__':
    # Usage: python muffin_hello_world_example.py run
    app.manage()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import muffin | |
@asyncio.coroutine
def version_middleware(app, handler):
    """Middleware factory that adds an ``X-Version`` header to responses.

    :param app: The application the middleware is attached to (not used).
    :param handler: The next handler in the chain; called with the request.
    :returns: A coroutine function wrapping *handler*.
    """
    @asyncio.coroutine
    def middleware(request):
        # Let the wrapped handler build the response, then stamp it.
        resp = yield from handler(request)
        resp.headers['X-Version'] = '1.0'
        return resp

    return middleware
# The middleware factory is passed in when the application is constructed.
app = muffin.Application('middleware_example', middlewares=(version_middleware,))
@app.register('/')
def version(request):
    """Answer ``/`` with a hint to look at the response headers."""
    body = 'check version in http header!'
    return body
if __name__ == '__main__':
    # Executed as a script: hand control to the application's command manager.
    app.manage()
""" | |
$ curl -i http://localhost:5000/ | |
HTTP/1.1 200 OK | |
CONTENT-TYPE: text/html; charset=utf-8 | |
CONTENT-LENGTH: 29 | |
X-VERSION: 1.0 | |
CONNECTION: keep-alive | |
DATE: Sun, 08 Nov 2015 00:38:38 GMT | |
SERVER: Python/3.4 aiohttp/0.18.3 | |
check version in http header! | |
""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import muffin | |
# Application object for the custom-status-code example.
app = muffin.Application('not_found')
@app.register('/not_found')
def not_found(request):
    """Answer ``/not_found`` with an explicit 404 and a JSON body."""
    return muffin.Response(
        text='{"msg": "Not Found"}',
        status=404,
        content_type='application/json',
    )
# Usage: muffin muffin_status_code run
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import aiohttp | |
# Pages fetched concurrently by the script at the bottom of this example.
URL_LIST = [
    'http://google.com',
    'http://abc.xyz',
    'http://github.com',
    'https://www.python.org/',
]
@asyncio.coroutine
def fetch_url(url):
    """Issue a GET for *url* and return the response body as text.

    :param url: Address to request.
    :returns: The value produced by ``response.text()``.
    """
    response = yield from aiohttp.request('GET', url)
    body = yield from response.text()
    return body
if __name__ == '__main__':
    # Fetch every URL in URL_LIST concurrently and print "<url>: <body length>".
    loop = asyncio.get_event_loop()
    try:
        # Map each scheduled task back to the URL it fetches.
        # ``asyncio.ensure_future`` (available from Python 3.4.4) replaces
        # ``asyncio.async``, which is deprecated and cannot even be parsed
        # on Python 3.7+, where ``async`` is a reserved word.
        urls = {
            asyncio.ensure_future(fetch_url(url)): url
            for url in URL_LIST
        }
        # ``wait`` yields (done, pending); only the finished set is used.
        responses, _ = loop.run_until_complete(asyncio.wait(urls))
        for response in responses:
            url = urls[response]
            # ``result()`` re-raises if that fetch failed.
            print('{}: {}'.format(url, len(response.result())))
    finally:
        # Close the loop even when one of the fetches raised.
        loop.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment