Skip to content

Instantly share code, notes, and snippets.

@haridas
Created March 24, 2015 03:10
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save haridas/e7db5d28536fab4ebe52 to your computer and use it in GitHub Desktop.
Save haridas/e7db5d28536fab4ebe52 to your computer and use it in GitHub Desktop.
Ketama-based consistent hashing implementation for the python-memcached library.
"""
To test this script, start 8 memcached servers, one per port, using this
command:
$ memcached -d -p {PortNumber}
PortNumber:
11211
11212
11213
11214
11215
11216
11217
11218
Once the servers are running, run this program:
$ python new_memecached.py
"""
import bisect
import random
import string
from binascii import crc32

import memcache
class MemcacheClient(memcache.Client):
    """Memcache client that can have a host added at run time.

    Server selection is delegated unchanged to ``memcache.Client``. Adding a
    host at run time grows the bucket list that selection works on, which in
    practice invalidates almost the whole cache.
    NOTE(review): presumably this is because the stock client maps a key to
    ``hash % len(buckets)`` -- confirm against the python-memcached source.
    """

    def _get_server(self, key):
        """Return whatever the stock ``memcache.Client`` picks for ``key``."""
        return super(MemcacheClient, self)._get_server(key)

    def add_server(self, server):
        """Add a host to the client at run time.

        :param server: Server spec accepted by ``memcache._Host``, e.g.
                       ``<IP>:<PORT>`` or a ``(<IP>:<PORT>, weight)`` tuple.
        """
        # Create a new host entry with the same settings as the client.
        host = memcache._Host(
            server, self.debug, dead_retry=self.dead_retry,
            socket_timeout=self.socket_timeout,
            flush_on_reconnect=self.flush_on_reconnect
        )

        # Add this to our server choices.
        self.servers.append(host)

        # Give the host one bucket per unit of weight so a weighted host gets
        # its share of keys. A host given as a plain string is expected to
        # have weight 1 and therefore gets a single bucket, as before.
        # NOTE(review): mirrors how python-memcached is believed to build
        # its buckets in ``_init_buckets`` -- confirm.
        self.buckets.extend([host] * host.weight)
class KetamaMemcacheClient(memcache.Client):
    """Memcache client that picks servers with "ketama" consistent hashing.

    Every server is hashed onto a fixed-size ring at ``weight`` positions
    (slots). A key is hashed onto the same ring and is served by the owner of
    the first slot at or after the key's position, wrapping around at the end
    of the ring. Adding or removing a server therefore only remaps the keys
    that fall next to that server's slots instead of nearly all keys.
    """

    # Number of ring slots requested for a server that is given without an
    # explicit weight. More slots per server spread the keys more evenly;
    # adjust the weight to see how the cache-miss rate changes.
    DEFAULT_SERVER_WEIGHT = 200

    # Total number of positions on the ring; every hash is reduced modulo
    # this value.
    RING_SIZE = 2 ** 16

    def __init__(self, *args, **kwargs):
        """Initialise the ring state, then the underlying memcache client.

        The ring attributes are created first because
        ``memcache.Client.__init__`` is expected to call ``set_servers``.
        """
        # Mapping between ring slot -> server.
        self._ketama_server_ring = {}

        # Sorted list of the occupied slots on the ring.
        self._ketama_server_slots = []

        super(KetamaMemcacheClient, self).__init__(*args, **kwargs)

    def _get_server(self, key):
        """Get the memcache server responsible for the given key.

        Starting at the first occupied slot at or after the key's ring
        position, walk the ring clockwise (wrapping around) and return the
        first server that accepts a connection.

        NOTE(review): python-memcached is believed to also accept
        ``(serverhash, key)`` tuples as keys; those are not handled here.

        :param key: The cache key.
        :return: A tuple ``(server_obj, key)``; ``server_obj`` is ``None``
                 when the ring is empty or no server could be connected.
        """
        slots = self._ketama_server_slots
        if not slots:
            return None, key

        # Map the key onto the ring slot space.
        h_key = self._generate_ring_slot(key)

        # Index of the first slot >= h_key; equals len(slots) when the key
        # hashes past the last slot, in which case the modulo below wraps to
        # the first slot.
        start = bisect.bisect_left(slots, h_key)

        # Each server owns many slots; try each server at most once.
        tried = set()
        for offset in range(len(slots)):
            server = self._ketama_server_ring[
                slots[(start + offset) % len(slots)]]
            if id(server) in tried:
                continue
            tried.add(id(server))
            if server.connect():
                return server, key

        # Every server refused the connection: report "no server" rather
        # than handing back a host that is known to be unreachable.
        return None, key

    def _make_host(self, server):
        """Build a ``memcache._Host`` for ``server`` with the default weight.

        :param server: ``<IP>:<PORT>`` string or ``(<IP>:<PORT>, weight)``.
        :return: A ``memcache._Host`` configured like this client.
        """
        if not isinstance(server, tuple):
            server = (server, self.DEFAULT_SERVER_WEIGHT)
        return memcache._Host(
            server, self.debug, dead_retry=self.dead_retry,
            socket_timeout=self.socket_timeout,
            flush_on_reconnect=self.flush_on_reconnect)

    def add_server(self, server):
        """Add a new server to the client.

        :param server: Server host in <IP>:<PORT> format,
                       or a tuple of (<IP>:<PORT>, weight).
        """
        server_obj = self._make_host(server)
        # Keep ``self.servers`` in sync with the ring, as ``set_servers``
        # does, so code that iterates ``self.servers`` also sees this host.
        self.servers.append(server_obj)
        self._place_server_on_ring(server_obj)

    def set_servers(self, servers):
        """Replace the client's server pool.

        :param servers: List of server hosts in <IP>:<PORT> format,
                        or list of tuples of the form (<IP>:<PORT>, weight).
        """
        # Set the default weight if weight isn't passed.
        self.servers = [self._make_host(s) for s in servers]

        # Start from an empty ring so servers from an earlier call do not
        # linger on it.
        self._ketama_server_ring = {}
        self._ketama_server_slots = []

        # An explicit loop is required: ``map`` is lazy on Python 3 and
        # would never place anything on the ring.
        for server_obj in self.servers:
            self._place_server_on_ring(server_obj)

    def _place_server_on_ring(self, server):
        """Place the given server on the ring.

        :param server: An instance of :class:`memcache._Host`.
        """
        ring = self._ketama_server_ring
        for slot in self._get_server_slots_on_ring(server):
            if slot in ring:
                # Slot collision: the slot keeps its first owner and this
                # server simply ends up with one slot fewer. With e.g.
                # 8 x 200 slots on a 65536-slot ring, a handful of
                # collisions is expected (roughly n^2 / (2 * RING_SIZE)).
                # TODO: Handle it.
                continue
            ring[slot] = server
            self._ketama_server_slots.append(slot)

        # Keep the slot list sorted; lookups rely on it.
        self._ketama_server_slots.sort()

    def _get_server_slots_on_ring(self, server):
        """Return the list of ring slots for the given server.

        One slot is generated per unit of ``server.weight`` by hashing
        ``"<ip>:<port>_<index>"``. The slots are hash values, so they may
        collide with each other or with another server's slots.

        :param server: An object of :class:`memcache._Host`.
        :return: list of slots on the ring.
        """
        address = "{}:{}".format(server.ip, server.port)
        return [
            self._generate_ring_slot("{}_{}".format(address, index))
            for index in range(server.weight)
        ]

    def _generate_ring_slot(self, key):
        """Hash ``key`` to a position on the ring.

        :param key: Key which needs to be mapped to the hash space.
        :type key: str
        :return: Integer slot in ``range(RING_SIZE)`` for ``key``.
        """
        # crc32 needs bytes on Python 3; on Python 2 ``str`` already is
        # ``bytes``, so plain string keys are passed through untouched.
        if not isinstance(key, bytes):
            key = key.encode("utf-8")

        # The mask normalises Python 2's possibly negative crc32 result.
        # With the default RING_SIZE of 2 ** 16 the modulo keeps the low
        # 16 bits of the checksum.
        return (crc32(key) & 0xffffffff) % self.RING_SIZE
def random_key(size):
    """Generate a random key of ``size`` ASCII letters.

    :param size: Number of characters in the key.
    :return: String of ``size`` characters drawn from ``a-z`` and ``A-Z``;
             empty when ``size`` is 0.
    """
    # ``string.ascii_letters`` exists on Python 2 and 3 and is not affected
    # by the locale, unlike Python 2's ``string.letters``.
    alphabet = string.ascii_letters
    return ''.join(random.choice(alphabet) for _ in range(size))
if __name__ == '__main__':
    # Demo: fill a 7-server pool with random keys, add an 8th server and
    # report how many of the keys can still be found, once per client type.

    # We have 7 running memcached servers (ports 11211-11217); the 8th
    # (port 11218) is added in the middle of each run.
    servers = ['127.0.0.1:1121%d' % i for i in range(1, 8)]

    def _print_hit_rate(client, keys, template):
        """Print ``template`` filled with the percentage of ``keys`` found."""
        valid_keys = client.get_multi(keys)
        print(template % ((len(valid_keys) / float(len(keys))) * 100))

    def _run_demo(banner, client_class, servers):
        """Run the set / add-server / get scenario with ``client_class``."""
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print(banner)

        # We have 100 keys to split across our servers.
        keys = [random_key(10) for _ in range(100)]

        client = client_class(servers=servers)

        # Distribute the keys on our servers.
        for key in keys:
            client.set(key, 1)

        # Check how many keys come back.
        _print_hit_rate(client, keys, '%s percent of keys matched')

        # We add another server...and pow!
        client.add_server('127.0.0.1:11218')
        print('Added new server')

        # Check how many keys still come back.
        _print_hit_rate(client, keys, '%s percent of keys stil matched')

    def _naive_memcache_client(servers):
        """Run the demo with the stock (modulo-style) server selection."""
        _run_demo(
            "============= USING NAIVE HASHING ALGORITHM ================",
            MemcacheClient, servers)

    def _ketama_memcache_client(servers):
        """Run the demo with ketama consistent hashing."""
        _run_demo(
            "============= USING KETAMA HASHING ALGORITHM ================",
            KetamaMemcacheClient, servers)

    _naive_memcache_client(servers)
    _ketama_memcache_client(servers)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment