Skip to content

Instantly share code, notes, and snippets.

@lmyyao
Created August 1, 2016 01:44
Show Gist options
  • Save lmyyao/a58b8885702c015d72739fbed58c8916 to your computer and use it in GitHub Desktop.
Save lmyyao/a58b8885702c015d72739fbed58c8916 to your computer and use it in GitHub Desktop.
Connection Pool Demo
# from .celery import app
import heapq
import socket
from collections import UserDict
from functools import singledispatch, update_wrapper
from operator import itemgetter
from select import poll, POLLIN
from urllib.parse import urlparse
import httplib2
class LRUCache:
    """Placeholder for a least-recently-used cache; defines no behaviour yet."""
class LFUCache(object):
    """Bounded key/value store that evicts the least-frequently-read key.

    Every successful ``get`` increments a per-key hit counter. When a new
    key is inserted into a full cache, the key with the lowest counter is
    removed first (ties go to the oldest inserted key).
    """

    def __init__(self, size=100):
        """Create an empty cache holding at most *size* entries."""
        self._data = {}   # key -> cached value
        self._hits = {}   # key -> number of successful get() calls
        self._size = size

    @property
    def full(self):
        """Whether the number of stored entries has reached the size limit."""
        return self._size <= len(self._data)

    def put(self, key, value):
        """Store *value* under *key* and reset that key's hit counter to 0.

        Eviction only happens when *key* is new and the cache is full;
        overwriting an existing key must not push out an unrelated entry.
        """
        if key not in self._data and self.full:
            self._delete()
        self._data[key] = value
        self._hits[key] = 0

    def get(self, key):
        """Return the value cached under *key*, or ``None`` when absent.

        A hit is recorded whenever the key is present, including when the
        cached value itself is falsy (``0``, ``""``, empty containers).
        """
        if key not in self._data:
            return None
        self._hits[key] += 1
        return self._data[key]

    def _delete(self):
        """Evict the entry with the fewest hits; do nothing when empty."""
        if not self._hits:
            return
        victim = min(self._hits, key=self._hits.__getitem__)
        del self._data[victim]
        del self._hits[victim]
class HeaderDict(UserDict):
    """Dictionary of HTTP headers whose keys are stored in Title-Case.

    Keys are normalised with ``str.title()`` on every access, so
    ``"content-type"``, ``"CONTENT-TYPE"`` and ``"Content-Type"`` all refer
    to the same entry for item access, deletion and membership tests.
    """

    def __setitem__(self, key, value):
        """Store *value* under the Title-Cased form of *key*."""
        return super().__setitem__(key.title(), value)

    def __getitem__(self, item):
        """Return the value stored under the Title-Cased form of *item*."""
        return super().__getitem__(item.title())

    def __delitem__(self, key):
        """Remove the entry stored under the Title-Cased form of *key*."""
        super().__delitem__(key.title())

    def __contains__(self, key):
        """Case-insensitive membership test; non-string keys are never present."""
        # Membership must agree with __getitem__, otherwise `in` (and any
        # helper built on it, such as get()) misses differently-cased keys.
        return isinstance(key, str) and key.title() in self.data
def methdispatch(func):
    """Make ``singledispatch`` usable on methods.

    The returned wrapper selects the implementation from the class of the
    second positional argument (the one following ``self``) and exposes
    ``register`` so overloads can be attached with
    ``@name.register(SomeType)``.
    """
    registry = singledispatch(func)

    def wrapper(*args, **kw):
        # args[0] is the instance; the argument after it drives the dispatch.
        dispatch_type = args[1].__class__
        implementation = registry.dispatch(dispatch_type)
        return implementation(*args, **kw)

    wrapper.register = registry.register
    update_wrapper(wrapper, func)
    return wrapper
def _is_active(sock):
    """Report whether *sock* looks like an idle, still-usable connection.

    Returns ``False`` when *sock* is ``None`` or when a zero-timeout poll
    reports an event on it (unread data or a dropped connection); returns
    ``True`` when the poll reports nothing for the socket.
    """
    # NOTE(review): the `else` is treated as belonging to the `for` loop
    # (no event for the socket -> active) -- confirm against the original.
    if sock is None:
        return False

    poller = poll()
    poller.register(sock, POLLIN)
    events = poller.poll(0.0)
    return all(fd != sock.fileno() for fd, _event in events)
class Pool(object):
    """Small HTTP connection pool keyed by ``"host:port"``.

    Connections are kept in an ``LFUCache`` of the given size. ``count``
    is the number of ``get_connection`` calls and ``hits`` the number of
    those calls answered with an already-pooled connection.
    """

    def __init__(self, size):
        """Create a pool that caches at most *size* connections."""
        self._connections = LFUCache(size)
        self.hits = 0
        self.count = 0

    def __str__(self):
        return 'Pool total request: {}, hits: {}'.format(self.count, self.hits)

    def get_connection(self, host, port, timeout):
        """Return a connection to *host*:*port*, reusing a pooled one if possible.

        A pooled connection is reused only when ``_is_active`` accepts its
        socket. Otherwise a new ``httplib2.HTTPConnectionWithTimeout`` is
        created with *timeout*, stored in the pool and returned.
        """
        key = "{0}:{1}".format(host, port)
        self.count += 1

        connection = self._connections.get(key)
        if connection is not None:
            if _is_active(connection.sock):
                self.hits += 1
                return connection
            # The pooled connection is stale; close it so its socket is
            # released instead of leaking when the entry is replaced below.
            connection.close()

        connection = httplib2.HTTPConnectionWithTimeout(host, port, timeout=timeout)
        self._connections.put(key, connection)
        return connection

    def close(self):
        """Currently a no-op.

        The underlying cache offers no public way to enumerate its
        entries, so pooled connections are not closed here.
        """
        pass
# Shared module-level pool (size 10) that Client falls back to when it is
# constructed without an explicit pool.
DefaultPool = Pool(10)
class Client(object):
    """One-shot HTTP client that borrows its connection from a pool.

    Construct it with a URL, call ``request()``, then inspect ``status``
    and ``response_headers``. Subclasses can override
    ``on_server_response`` to consume the body of a 200 response;
    ``on_error`` is dispatched on the type of the raised exception.
    """

    def __init__(self, url, method="GET", body=None, timeout=10, headers=None, pool=None):
        """Prepare a request for *url*.

        *headers* is copied into a ``HeaderDict``; *pool* defaults to the
        module-level ``DefaultPool`` when not given.
        """
        self.method = method
        self.request_body = body
        self.timeout = timeout
        self.request_headers = HeaderDict(headers or {})
        self.response_headers = None
        self.status = None
        self.pool = pool if pool is not None else DefaultPool
        self._parse_url(url)

    def _parse_url(self, url):
        """Split *url* into ``self.host``, ``self.port`` and ``self.path``."""
        parts = urlparse(url)
        # netloc may carry "user:pass@" and ":port"; hostname is the bare
        # host. Fall back to netloc only when no hostname could be parsed.
        self.host = parts.hostname or parts.netloc
        self.port = parts.port or 80
        path = parts.path or "/"
        if parts.query:
            # Keep the query string, it is part of the request target.
            path = "{0}?{1}".format(path, parts.query)
        self.path = path

    def request(self):
        """Send the request and record the response status.

        For a 200 response the headers are stored in
        ``self.response_headers`` and the body is handed to
        ``on_server_response``. Any exception raised along the way is
        passed to ``on_error`` instead of propagating.
        """
        try:
            http = self.pool.get_connection(self.host, self.port, self.timeout)
            http.request(self.method, self.path, self.request_body, self.request_headers)
            response = http.getresponse()
            self.status = response.status
            if response.status == 200:
                self.response_headers = HeaderDict(response.getheaders())
                self.on_server_response(response.read())
            else:
                # Drain the body: the pooled connection cannot send another
                # request until the previous response has been fully read.
                response.read()
        except Exception as e:
            self.on_error(e)

    def on_server_response(self, body):
        """Hook called with the body of a 200 response; does nothing by default."""
        # print(body)
        pass

    @methdispatch
    def on_error(self, e):
        """Fallback error handler: print the exception."""
        print(e)

    @on_error.register(socket.gaierror)
    def _(self, e):
        print(e, "gaierror")

    @on_error.register(TimeoutError)
    def _(self, e):
        print(e, "timeout")

    @on_error.register(ConnectionError)
    def _(self, e):
        print(e, "connectionerror")
# Demo run: request each URL in turn through the shared default pool, then
# print the pool's request/hit counters.
URLs = [
    "http://www.baidu.com",
    "http://www.baidu.com",
    "http://news.baidu.com",
    "http://news.baidu.com",
    "http://music.baidu.com",
]
for i in URLs:
    client = Client(i)
    client.request()
print(DefaultPool)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment