Skip to content

Instantly share code, notes, and snippets.

@jmcarp
Last active August 29, 2015 14:10
Show Gist options
  • Save jmcarp/1e0d17f4f4541ff63866 to your computer and use it in GitHub Desktop.
Save jmcarp/1e0d17f4f4541ff63866 to your computer and use it in GitHub Desktop.
water-butlers
import asyncio
import aiohttp
import tornado.web
import tornado.options
import tornado.concurrent
import tornado.platform.asyncio
URL = '...'
UPLOAD_URL = '...'
DEBUG = True
PORT = 8080
ADDRESS = '127.0.0.1'
CHUNK_SIZE = 1024 * 64
# Running Tornado on asyncio's event loop, including 'yield from' support in request handlers
# https://gist.github.com/BeholdMyGlory/11067131
def coroutine(func):
func = asyncio.coroutine(func)
def decorator(*args, **kwargs):
future = tornado.concurrent.Future()
def future_done(f):
try:
future.set_result(f.result())
except Exception as e:
future.set_exception(e)
asyncio.async(func(*args, **kwargs)).add_done_callback(future_done)
return future
return decorator
class DownloadHandler(tornado.web.RequestHandler):
@coroutine
def get(self):
resp = yield from aiohttp.request('get', URL)
while True:
chunk = yield from resp.content.read(CHUNK_SIZE)
if not chunk:
break
self.write(chunk)
self.flush()
@tornado.web.stream_request_body
class UploadHandler(tornado.web.RequestHandler):
def prepare(self):
self.stream = asyncio.StreamReader()
self.uploader = aiohttp.request(
'put',
UPLOAD_URL,
data=self.stream,
headers={'Content-Length': self.request.headers.get('Content-Length')},
)
def data_received(self, chunk):
self.stream.feed_data(chunk)
@coroutine
def put(self):
self.stream.feed_eof()
resp = yield from self.uploader
self.set_status(resp.status)
def make_app(debug):
return tornado.web.Application(
[
(r'/', DownloadHandler),
(r'/upload', UploadHandler),
],
debug=debug,
)
def main():
tornado.platform.asyncio.AsyncIOMainLoop().install()
tornado.options.parse_command_line()
app = make_app(DEBUG)
app.listen(PORT, ADDRESS)
asyncio.get_event_loop().set_debug(DEBUG)
asyncio.get_event_loop().run_forever()
if __name__ == '__main__':
main()
import asyncio
import aiohttp
from aiohttp import web, request
DOWNLOAD_URL = '...'
UPLOAD_URL = '...'
@asyncio.coroutine
def fetch(req):
resp = yield from request('GET', DOWNLOAD_URL)
stream = web.StreamResponse(req)
while True:
chunk = yield from resp.content.read(1024 * 128)
if not chunk:
break
stream.write(chunk)
return stream
@asyncio.coroutine
def upload(req):
stream = asyncio.StreamReader()
uploader = request('PUT', UPLOAD_URL, data=stream, headers={'Content-Length': req.headers.get('Content-Length')})
while True:
chunk = yield from req.payload.read(1024 * 128)
if not chunk:
break
stream.feed_data(chunk)
stream.feed_eof()
resp = yield from uploader
return web.Response(req, b'success')
app = web.Application()
app.router.add_route('GET', '/', fetch)
app.router.add_route('PUT', '/upload', upload)
loop = asyncio.get_event_loop()
f = loop.create_server(app.make_handler, '0.0.0.0', 8080)
srv = loop.run_until_complete(f)
loop.run_forever()
var request = require('request');
var express = require('express');
var app = express();
var downloadUrl = '...';
var uploadUrl = '...';
app.get('/', function (req, res) {
request.get(downloadUrl).pipe(res);
});
app.put('/upload/', function(req, res) {
req.pipe(request.put(uploadUrl)).pipe(res);
});
app.listen(8080);
package main
import (
"io"
"log"
"net/http"
)
func hello(w http.ResponseWriter, r *http.Request) {
resp, err := http.Get("...")
if (err != nil) {
log.Fatal(err)
}
io.Copy(w, resp.Body)
defer resp.Body.Close()
}
func main() {
http.HandleFunc("/", hello)
log.Fatal(http.ListenAndServe(":8080", nil))
}
import tornado.gen
import tornado.web
import tornado.ioloop
from tornado.httpclient import AsyncHTTPClient
import toro
DOWNLOAD_URL = '...'
UPLOAD_URL = '...'
class MainHandler(tornado.web.RequestHandler):
client = AsyncHTTPClient()
@tornado.gen.coroutine
def get(self):
yield self.client.fetch(DOWNLOAD_URL, streaming_callback=self.streaming)
self.finish()
def streaming(self, chunk):
self.write(chunk)
@tornado.web.stream_request_body
class UploadHandler(tornado.web.RequestHandler):
client = AsyncHTTPClient()
def prepare(self):
self.queue = toro.Queue()
self.uploading = False
@tornado.gen.coroutine
def data_received(self, chunk):
yield self.queue.put(chunk)
if not self.uploading:
self.uploading = True
request = tornado.httpclient.HTTPRequest(UPLOAD_URL, method='PUT', body_producer=self.produce, headers={'Content-Length': self.request.headers.get('Content-Length')})
self.client.fetch(request, callback=self.finished)
@tornado.gen.coroutine
def produce(self, write):
while True:
try:
chunk = yield self.queue.get(deadline=1000)
except toro.Timeout:
break
yield write(chunk)
def finished(self, response):
self.write(response.body)
self.set_status(response.code)
self.finish()
@tornado.web.asynchronous
def put(self):
pass
if __name__ == "__main__":
application = tornado.web.Application([
(r"/", MainHandler),
(r"/upload", UploadHandler),
], debug=True)
application.listen(8080)
tornado.ioloop.IOLoop.instance().start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment