Skip to content

Instantly share code, notes, and snippets.

@snower
Last active August 29, 2015 14:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save snower/02a208ad99835fd56e57 to your computer and use it in GitHub Desktop.
Save snower/02a208ad99835fd56e57 to your computer and use it in GitHub Desktop.
Proxy that automatically appends a refresh timestamp to static resource URLs
# -*- coding: utf-8 -*-
# 14/12/16
# create by: snower
import functools
import logging
import re
import sys
import time
import traceback

from tornado import gen
from tornado.escape import xhtml_escape
from tornado.httpclient import HTTPError
from tornado.httpclient import AsyncHTTPClient, HTTPRequest
from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop
from tornado.web import Application, RequestHandler
# Process-wide singletons: assigned by start() and read by stop().
application = None
server = None
ioloop = None
# Command-line configuration, read at import time.
# argv[1] is handed to HTTPServer.bind(); argv[2] is prefixed to the URI of
# every proxied request.
PORT = sys.argv[1]
PROXY_PASS = sys.argv[2]
def request(method):
    """Decorate a handler coroutine so failures become an HTML error page.

    The wrapped method is run as a Tornado coroutine.  Any exception it
    raises is logged and, unless the response has already been finished,
    reported to the client as a small HTML document containing the exception
    message and the traceback.

    Args:
        method: a handler method returning a yieldable (the handlers in this
            file are ``gen.coroutine`` functions).

    Returns:
        A coroutine method with the same name and docstring as ``method``.
    """
    @functools.wraps(method)
    @gen.coroutine
    def _(self, *args, **kwargs):
        try:
            yield method(self, *args, **kwargs)
        except Exception as e:
            # Keep a server-side record; the page below is the only other
            # trace of the failure.
            logging.exception("proxy error while handling %s %s",
                              self.request.method, self.request.uri)
            if self._finished:
                # The failure happened after the response was completed;
                # finishing a second time would raise again.
                return
            # NOTE(review): errors are reported with status 200, presumably so
            # the page is always displayed -- confirm before changing.
            self.set_status(200)
            # The message and traceback can contain text that originated
            # from the client or the upstream server, so escape both before
            # embedding them in HTML.
            message = xhtml_escape(u"%s" % (e,))
            tb = "".join(
                "<p>%s</p>\n" % xhtml_escape(line)
                for line in traceback.format_exc().split("\n")
            )
            self.finish(
                u'\n'
                u'<meta charset="utf-8" />\n'
                u'<meta name="viewport" content="width=device-width,initial-scale=1,maximum-scale=1,minimum-scale=1,user-scalable=no" />\n'
                u'<div>proxy error: %s</div>\n<div>%s</div>'
                % (message, tb)
            )
    return _
class ProxyRequestHandler(RequestHandler):
    """Catch-all handler that relays every request to ``PROXY_PASS``.

    Successful textual GET responses are additionally passed through
    ``rtpath`` so that quoted references to static resources carry a
    ``ts=<unix time>`` query parameter.  All other methods are relayed
    unchanged.
    """

    # Patterns locating quoted references to static resources; group 1 is
    # the URL that receives the timestamp parameter.
    rs = (
        re.compile(r'[\'"](.+?\/.+?\.html.*?)[\'"]'),
        re.compile(r'[\'"](.+?\/.+?\.js.*?)[\'"]'),
        re.compile(r'[\'"](.+?\/.+?\.css.*?)[\'"]'),
        re.compile(r'[\'"](.+?\/.+?\.(png|jpeg|jpg|gif|webp).*?)[\'"]'),
    )

    # Upstream headers describing the upstream connection or framing rather
    # than the payload; Tornado produces its own values when replying.
    _SKIPPED_RESPONSE_HEADERS = frozenset(
        ('content-length', 'transfer-encoding', 'connection'))

    # Content-Type fragments (besides the ``text/`` prefix) treated as text.
    _TEXT_TYPE_MARKERS = ('javascript', 'json', 'xml')

    def check_etag_header(self):
        """Always report "no ETag match" so the full response is sent."""
        return False

    def set_etag_header(self):
        """Do nothing: this handler does not compute an ETag of its own."""
        return

    @request
    @gen.coroutine
    def head(self, url):
        """Relay a HEAD request unchanged."""
        yield self._relay(url, 'head')

    @request
    @gen.coroutine
    def get(self, url):
        """Relay a GET request, time-stamping static URLs in textual bodies."""
        response = yield self.fetch(url, 'get')
        body = None
        if (not response.error and response.body
                and self._is_rewritable(response)):
            body = self.rtpath(response.body)
        yield self.write_response(response, body)

    @request
    @gen.coroutine
    def post(self, url):
        """Relay a POST request unchanged."""
        yield self._relay(url, 'post')

    @request
    @gen.coroutine
    def delete(self, url):
        """Relay a DELETE request unchanged."""
        yield self._relay(url, 'delete')

    @request
    @gen.coroutine
    def patch(self, url):
        """Relay a PATCH request unchanged."""
        yield self._relay(url, 'patch')

    @request
    @gen.coroutine
    def put(self, url):
        """Relay a PUT request unchanged."""
        yield self._relay(url, 'put')

    @request
    @gen.coroutine
    def options(self, url):
        """Relay an OPTIONS request unchanged."""
        yield self._relay(url, 'options')

    @gen.coroutine
    def _relay(self, url, method):
        """Fetch the upstream response for ``method`` and send it back as is."""
        response = yield self.fetch(url, method)
        yield self.write_response(response)

    @gen.coroutine
    def fetch(self, url, method):
        """Send the current request upstream and return the upstream response.

        Args:
            url: path captured by the route; unused, because the upstream URL
                is built from ``self.request.uri`` so the query string is kept.
            method: HTTP method name in any case.

        Returns:
            The upstream ``HTTPResponse``; error statuses are returned as
            responses rather than raised.

        Raises:
            HTTPError: when the failure carries no response object, so there
                is nothing to relay.
        """
        client = AsyncHTTPClient()
        upstream = HTTPRequest(
            PROXY_PASS + self.request.uri,
            method.upper(),
            self.request.headers,
            self.request.body or None,
            connect_timeout=5,
            request_timeout=5,
            follow_redirects=False,
            max_redirects=0,
        )
        try:
            response = yield client.fetch(upstream)
        except HTTPError as e:
            if e.response is None:
                # Without a response there is nothing to relay; surface the
                # real error instead of failing later on a None response.
                raise
            response = e.response
        raise gen.Return(response)

    @gen.coroutine
    def write_response(self, response, body=None):
        """Copy status, headers and body of ``response`` to the client.

        Args:
            response: the upstream ``HTTPResponse``.
            body: replacement payload; when ``None`` the upstream body is
                sent (or no body at all if it is empty).
        """
        self.set_status(response.code, response.reason)
        seen = set()
        # get_all() yields repeated headers (e.g. Set-Cookie) one by one.
        for key, value in response.headers.get_all():
            lowered = key.lower()
            if lowered in self._SKIPPED_RESPONSE_HEADERS:
                continue
            if lowered in seen:
                self.add_header(key, value)
            else:
                # First occurrence replaces any default set by Tornado.
                self.set_header(key, value)
                seen.add(lowered)
        if body is None:
            body = response.body or None
        self.finish(body)

    def _is_rewritable(self, response):
        """Tell whether the body of ``response`` is plain text safe to edit."""
        headers = response.headers
        if headers.get('Content-Encoding'):
            # The payload is still encoded; editing it would corrupt it.
            return False
        content_type = headers.get('Content-Type', '').lower()
        if not content_type:
            return True
        if content_type.startswith('text/'):
            return True
        return any(marker in content_type
                   for marker in self._TEXT_TYPE_MARKERS)

    def rtpath(self, body):
        """Append ``ts=<unix time>`` to every static URL matched by ``rs``.

        ``?ts=`` is used when the matched text has no ``?`` yet, ``&ts=``
        otherwise.  Each match is edited in place, so regex metacharacters
        in a URL are harmless and a URL that occurs several times is stamped
        once per occurrence.

        Args:
            body: response payload, text or bytes.

        Returns:
            The rewritten payload, of the same type as ``body``.
        """
        stamp = "ts=" + str(int(time.time()))
        # On Python 3 the payload arrives as bytes while the patterns are
        # text; latin-1 maps every byte to one code point and back, so the
        # round trip is lossless.
        as_bytes = isinstance(body, bytes) and not isinstance(body, str)
        text = body.decode('latin-1') if as_bytes else body

        def add_stamp(match):
            url = match.group(1)
            if url.endswith("?" + stamp) or url.endswith("&" + stamp):
                # Already stamped by an earlier pattern in this call.
                return match.group(0)
            separator = "&" if "?" in url else "?"
            whole = match.group(0)
            offset = match.start(0)
            head = whole[:match.start(1) - offset]
            tail = whole[match.end(1) - offset:]
            return head + url + separator + stamp + tail

        for pattern in self.rs:
            text = pattern.sub(add_stamp, text)
        return text.encode('latin-1') if as_bytes else text
# Route table: one catch-all pattern sends every path to the proxy handler.
urls = ((r'^(.*)$', ProxyRequestHandler),)
def start():
    """Create the application and HTTP server, then run the IOLoop.

    The created objects are stored in the module globals ``application``,
    ``server`` and ``ioloop``.  The call blocks inside ``ioloop.start()``
    until the loop is stopped.
    """
    global application, server, ioloop

    app_settings = dict(debug=False, autoreload=False)
    application = Application(urls, **app_settings)

    server = HTTPServer(application, xheaders=True)
    server.bind(PORT)
    server.start()

    ioloop = IOLoop.instance()
    ioloop.start()
def stop(max_wait=None):
    """Schedule a graceful shutdown of the server started by ``start()``.

    The shutdown runs on the IOLoop via ``add_callback_from_signal``: the
    HTTP server stops accepting connections, then the loop is polled every
    0.5 seconds and stopped once it reports no pending callbacks or timeouts.

    Args:
        max_wait: optional number of seconds after which the loop is stopped
            even if work is still pending.  ``None`` (the default) waits
            without a limit.
    """
    loop = ioloop
    if loop is None:
        # start() has not run, so there is nothing to shut down.
        return

    def _shutdown():
        # Only the HTTPServer needs stopping; tornado.web.Application
        # exposes no stop() method.
        server.stop()
        deadline = None if max_wait is None else time.time() + max_wait

        def stop_loop():
            now = time.time()
            # _callbacks/_timeouts are private attributes that not every
            # IOLoop implementation has; when absent, stop right away.
            pending = (getattr(loop, '_callbacks', None)
                       or getattr(loop, '_timeouts', None))
            expired = deadline is not None and now >= deadline
            if pending and not expired:
                loop.add_timeout(now + 0.5, stop_loop)
            else:
                loop.stop()

        stop_loop()

    loop.add_callback_from_signal(_shutdown)
if __name__ == "__main__":
    # Script entry point: configure INFO-level logging, then serve.
    logging.basicConfig(
        format="%(asctime)s %(process)d %(levelname)s %(message)s",
        level=logging.INFO,
    )
    start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment