Skip to content

Instantly share code, notes, and snippets.

@Hanny2010
Last active March 13, 2024 02:45
Show Gist options
  • Star 9 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save Hanny2010/2ebee61894c661b0cab91fd131e2ef6e to your computer and use it in GitHub Desktop.
Save Hanny2010/2ebee61894c661b0cab91fd131e2ef6e to your computer and use it in GitHub Desktop.
Connect to Redis server over Squid Proxy
import socket
from redis.client import Redis
from redis.connection import Connection, ConnectionPool
from redis._compat import iteritems
class ProxyConnection(Connection):
    """Redis TCP connection that can be tunnelled through an HTTP proxy.

    When ``proxy`` is falsy the socket connects straight to the resolved
    Redis address. Otherwise the socket connects to ``proxy`` and an HTTP
    ``CONNECT`` request is issued for the resolved Redis address before the
    socket is handed back to the caller.
    """

    # Upper bound on the bytes accepted for the proxy's response head, so a
    # misbehaving proxy cannot make the handshake read forever.
    _MAX_PROXY_RESPONSE_HEAD = 65536

    def __init__(self, host='localhost', port=6379, db=0, password=None,
                 socket_timeout=None, socket_connect_timeout=None,
                 socket_keepalive=False, socket_keepalive_options=None,
                 socket_type=0, retry_on_timeout=False, encoding='utf-8',
                 encoding_errors='strict', decode_responses=False,
                 socket_read_size=65536, health_check_interval=0, proxy=None,
                 **kwargs):
        """Store the proxy address and delegate everything else to the parent.

        Args:
            proxy: Address passed to ``socket.connect`` for the proxy (the
                usage example passes a ``(host, port)`` tuple), or ``None``
                to connect directly.
            **kwargs: Any additional options, forwarded unchanged to the
                parent ``__init__``.

        All remaining parameters are forwarded to the parent ``__init__``
        by keyword.
        """
        self.proxy = proxy
        super().__init__(
            host=host, port=port, db=db, password=password,
            socket_timeout=socket_timeout,
            socket_connect_timeout=socket_connect_timeout,
            socket_keepalive=socket_keepalive,
            socket_keepalive_options=socket_keepalive_options,
            socket_type=socket_type, retry_on_timeout=retry_on_timeout,
            encoding=encoding, encoding_errors=encoding_errors,
            decode_responses=decode_responses,
            socket_read_size=socket_read_size,
            health_check_interval=health_check_interval,
            **kwargs
        )

    def _connect(self):
        """Create a connected TCP socket, optionally via the HTTP proxy.

        Every address returned by ``getaddrinfo`` for the Redis host is tried
        in turn; the first socket that connects is returned.

        Raises:
            socket.error: the last connection error, or a generic error when
                ``getaddrinfo`` returned no addresses.
        """
        # NOTE(review): the socket is created with the address family of the
        # resolved *Redis* address but, when a proxy is set, it is connected
        # to the proxy. A family mismatch between the two makes that
        # candidate fail and the loop moves on to the next one.
        err = None
        for res in socket.getaddrinfo(self.host, self.port, self.socket_type,
                                      socket.SOCK_STREAM):
            family, socktype, proto, canonname, socket_address = res
            sock = None
            try:
                sock = socket.socket(family, socktype, proto)
                # TCP_NODELAY
                sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

                # TCP_KEEPALIVE
                if self.socket_keepalive:
                    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
                    options = self.socket_keepalive_options or {}
                    for k, v in options.items():
                        sock.setsockopt(socket.IPPROTO_TCP, k, v)

                # set the socket_connect_timeout before we connect; it also
                # bounds the proxy handshake below
                sock.settimeout(self.socket_connect_timeout)

                # connect (directly, or through the proxy tunnel)
                sock = self._http_proxy_connect(sock, socket_address,
                                                self.proxy)

                # set the socket_timeout now that we're connected
                sock.settimeout(self.socket_timeout)
                return sock

            except socket.error as exc:
                err = exc
                if sock is not None:
                    sock.close()

        if err is not None:
            raise err
        raise socket.error("socket.getaddrinfo returned an empty list")

    # https://gist.github.com/frxstrem/4487802
    def _http_proxy_connect(self, sock, address, proxy=None):
        """Connect ``sock`` to ``address``, tunnelling via ``proxy`` if given.

        Args:
            sock: An unconnected socket.
            address: Target socket address; only the first two items (host
                and port) are used, so both the 2-tuples and the 4-tuples
                produced by ``getaddrinfo`` are accepted.
            proxy: Address of the HTTP proxy, or a falsy value to connect to
                ``address`` directly.

        Returns:
            The same socket, connected and ready for use.

        Raises:
            IOError: the proxy reply is malformed, uses an unsupported HTTP
                version, or carries a non-2xx status. The socket is closed
                before the error propagates.
        """
        if not proxy:
            sock.connect(address)
            return sock

        target_host, target_port = address[0], address[1]
        if ':' in target_host:
            # IPv6 literals must be bracketed in an authority component.
            target_host = '[%s]' % target_host
        authority = '%s:%d' % (target_host, target_port)

        try:
            sock.connect(proxy)
            request = 'CONNECT %s HTTP/1.0\r\nHost: %s\r\n\r\n' % (
                authority, authority)
            sock.sendall(request.encode('ascii'))

            head = self._read_proxy_response_head(sock)
            statusline = head.split(b'\n', 1)[0].decode('latin-1').rstrip('\r')

            parts = statusline.split(' ', 2)
            if len(parts) < 2:
                raise IOError('Bad response')
            version, status = parts[0], parts[1]

            if version not in ('HTTP/1.0', 'HTTP/1.1'):
                raise IOError('Unsupported HTTP version')

            try:
                status = int(status)
            except ValueError:
                raise IOError('Bad response')

            # Anything other than 2xx means the tunnel was NOT established;
            # handing the socket to the Redis protocol would only produce
            # confusing parse errors later.
            if not 200 <= status < 300:
                raise IOError('Proxy CONNECT failed: %s' % statusline)
        except Exception:
            sock.close()
            raise

        return sock

    def _read_proxy_response_head(self, sock):
        """Read the proxy reply up to and including the blank line.

        Bytes are read one at a time so nothing past the end of the HTTP
        head is consumed; whatever follows belongs to the tunnelled stream.

        Raises:
            IOError: the peer closed the connection before the head was
                complete, or the head exceeds ``_MAX_PROXY_RESPONSE_HEAD``.
        """
        head = bytearray()
        while not head.endswith((b'\r\n\r\n', b'\n\n')):
            if len(head) >= self._MAX_PROXY_RESPONSE_HEAD:
                raise IOError('Bad response')
            chunk = sock.recv(1)
            if not chunk:
                raise IOError('Bad response')
            head += chunk
        return bytes(head)
class ProxyConnectionPool(ConnectionPool):
    """Connection pool whose connection class defaults to ``ProxyConnection``."""

    def __init__(self, connection_class=ProxyConnection, max_connections=None,
                 **connection_kwargs):
        """Forward all arguments, by keyword, to the parent pool.

        Args:
            connection_class: Class used to create connections.
            max_connections: Passed through unchanged to the parent.
            **connection_kwargs: Remaining options, passed through unchanged.
        """
        super().__init__(
            connection_class=connection_class,
            max_connections=max_connections,
            **connection_kwargs
        )
class ProxyRedis(Redis):
    """Redis client whose connections are made through ``ProxyConnectionPool``."""

    def __init__(self, host='localhost', port=6379,
                 db=0, password=None, socket_timeout=None,
                 socket_connect_timeout=None,
                 socket_keepalive=None, socket_keepalive_options=None,
                 connection_pool=None, unix_socket_path=None,
                 encoding='utf-8', encoding_errors='strict',
                 charset=None, errors=None,
                 decode_responses=False, retry_on_timeout=False,
                 ssl=False, ssl_keyfile=None, ssl_certfile=None,
                 ssl_cert_reqs='required', ssl_ca_certs=None,
                 max_connections=None, single_connection_client=False,
                 health_check_interval=0, proxy=None):
        """Build the client on a caller-supplied or a new proxy-aware pool.

        Args:
            connection_pool: Existing pool to use as-is. When ``None``, a
                ``ProxyConnectionPool`` is created from the other arguments.
            proxy: Proxy address handed to the pool's connections; only used
                when a new pool is created.

        ``unix_socket_path``, ``charset``, ``errors``, the ``ssl*`` options
        and ``single_connection_client`` are accepted but not used by this
        constructor.
        """
        # Only build a pool when the caller did not provide one; previously
        # a supplied pool was silently replaced.
        if connection_pool is None:
            kwargs = {
                'db': db,
                'password': password,
                'socket_timeout': socket_timeout,
                'encoding': encoding,
                'encoding_errors': encoding_errors,
                'decode_responses': decode_responses,
                'retry_on_timeout': retry_on_timeout,
                'max_connections': max_connections,
                'health_check_interval': health_check_interval,
                # TCP specific options
                'host': host,
                'port': port,
                'socket_connect_timeout': socket_connect_timeout,
                'socket_keepalive': socket_keepalive,
                'socket_keepalive_options': socket_keepalive_options,
                'proxy': proxy,
            }
            connection_pool = ProxyConnectionPool(**kwargs)
        super().__init__(connection_pool=connection_pool)
# Example usage (a separate script from the module above).
# `host`, `password`, `PROXY_IP` and `PROXY_PORT` are not defined in this
# snippet; the caller must provide them before this line runs.
# NOTE(review): assumes the module above is saved as proxy_connection.py - confirm.
import redis
from proxy_connection import ProxyRedis
redis_client = ProxyRedis(host=host, password=password, proxy=(PROXY_IP, PROXY_PORT))
@yoogottamk
Copy link

yoogottamk commented Oct 7, 2020

Hello @Hanny2010!
Thank you!! This was really helpful.

This has a small bug though: line 131 of redis_proxy_connection.py doesn't check whether connection_pool was provided or not.

Changing

        connection_pool = ProxyConnectionPool(**kwargs)

to

        if connection_pool is None:
            kwargs = ...
            connection_pool = ProxyConnectionPool(**kwargs)

will fix this!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment