Skip to content

Instantly share code, notes, and snippets.

@bstaint
Created September 25, 2019 02:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bstaint/3df1d4b99a10297cc79620952be7f5f4 to your computer and use it in GitHub Desktop.
Save bstaint/3df1d4b99a10297cc79620952be7f5f4 to your computer and use it in GitHub Desktop.
# gevent has to patch the standard library before any other module gets a
# chance to import the unpatched socket/threading/ssl primitives, so the
# patch call deliberately precedes the remaining imports.
from gevent import monkey
monkey.patch_all()

from bottle import Bottle  # noqa: E402
from gevent.pywsgi import WSGIHandler  # noqa: E402
from gevent.queue import Queue  # noqa: E402
# Canned response sent when the connection limiter rejects a request.
# Despite the BAD_REQUEST_* names, the status is 429 (Too Many Requests).
BAD_REQUEST_STATUS = '429 Too Many Requests'
BAD_REQUEST_BODY = b'Too many requests'
BAD_REQUEST_HEADERS = [
    ('Content-Type', 'text/plain'),
    # The connection is not reused after a rejection.
    ('Connection', 'close'),
    # Header values are strings, so the byte count is stringified.
    ('Content-Length', str(len(BAD_REQUEST_BODY))),
]
class NewConnLimiter:
    """Cap the number of requests that may be in flight at the same time.

    A bounded queue is used purely as a slot counter: each admitted request
    puts one token in, and takes one out again when it finishes.

    Args:
        cc: Maximum number of concurrently admitted requests.
    """

    def __init__(self, cc: int):
        self.cc = cc
        # One queued token per in-flight request; the token value is unused.
        self.queue = Queue(cc)

    def get(self) -> bool:
        """Try to claim a slot without waiting.

        Returns:
            True if a slot was claimed (the caller must later call
            ``release()``), False if ``cc`` slots are already taken.
        """
        if self.queue.qsize() >= self.cc:
            print("Reached the rate limitation")
            return False
        # Capacity was checked just above, so a non-blocking put is enough
        # and makes it explicit that admission never waits.
        self.queue.put_nowait(1)
        return True

    def release(self) -> None:
        """Give back a slot previously claimed with ``get()``.

        ``Queue.get`` blocks while the queue is empty, so only call this
        after a ``get()`` that returned True.
        """
        self.queue.get()


# Shared limiter instance: at most one request is admitted at a time.
limiter = NewConnLimiter(1)
class MyWSGIHandler(WSGIHandler):
    """WSGI handler that answers 429 when the shared limiter has no free slot."""

    def handle_one_response(self):
        """Serve one request, or reject it if the limiter is exhausted.

        When ``limiter.get()`` refuses a slot, the canned 429 response is
        written directly and the connection is marked for closing. Otherwise
        the request is processed by the base handler and the slot is handed
        back afterwards, even if processing raises.
        """
        if not limiter.get():
            # NOTE(review): the same handler object appears to serve every
            # request of a keep-alive connection, and the base
            # handle_one_response resets these fields per response. Resetting
            # them here keeps a previous response's state from suppressing
            # the 429 status line -- confirm against the installed gevent.
            self.status = None
            self.headers_sent = False
            # A copy is passed so the module-level header list stays intact
            # whatever start_response does with its argument.
            self.start_response(BAD_REQUEST_STATUS, BAD_REQUEST_HEADERS[:])
            self.result = [BAD_REQUEST_BODY]
            self.process_result()
            self.close_connection = True
            return
        try:
            super().handle_one_response()
        finally:
            # Always return the slot; a leaked slot would make every later
            # request be rejected once the limit is reached.
            limiter.release()
def streamHandler(vid: int):
    """Placeholder callback for a video request; does nothing yet.

    Args:
        vid: Integer video id.

    Returns:
        None.
    """
    return None
def RegisterHandlers():
    """Create the Bottle application and register its routes.

    Returns:
        A Bottle app with GET /videos/<vid:int> bound to streamHandler.
    """
    application = Bottle()
    application.route(
        path='/videos/<vid:int>',
        method="GET",
        callback=streamHandler,
    )
    return application
if __name__ == "__main__":
    # Run the app on Bottle's gevent server; handler_class is passed along
    # so requests go through MyWSGIHandler and its connection limit.
    run_options = {
        "host": 'localhost',
        "port": 9000,
        "server": "gevent",
        "handler_class": MyWSGIHandler,
    }
    app = RegisterHandlers()
    app.run(**run_options)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment