Skip to content

Instantly share code, notes, and snippets.

@dlecocq
Created May 13, 2016 16:35
Show Gist options
  • Save dlecocq/857dfe77e9bb884b2a926a80a15e18db to your computer and use it in GitHub Desktop.
Save dlecocq/857dfe77e9bb884b2a926a80a15e18db to your computer and use it in GitHub Desktop.
#! /usr/bin/env python
from gevent import monkey
monkey.patch_all()
import time
import ujson as json
from functools import wraps
import gevent
import gevent.queue
from drop import BdbQueue, FlatQueue
def timer(func):
@wraps(func)
def decorated(*args, **kwargs):
start = -time.time()
result = func(*args, **kwargs)
start += time.time()
print '%s(*, **%s) => %10.5f' % (func.__name__, kwargs, start)
return result
return decorated
data = {
'url': 'https://moz.com/',
'referrer': 'http://moz.com/'
}
@timer
def time_puts(queue, count):
for i in xrange(count):
queue.put(data)
@timer
def time_pops(queue, count):
for i in xrange(count):
queue.get()
queue = gevent.queue.Queue()
for count in [10, 100, 1000, 10000, 100000]:
time_puts(queue, count=count)
time_pops(queue, count=count)
queue = BdbQueue('/tmp/queue/bar')
for count in [10, 100, 1000, 10000]:
time_puts(queue, count=count)
time_pops(queue, count=count)
queue = FlatQueue('/tmp/queue/foo')
for count in [10, 100, 1000, 10000]:
time_puts(queue, count=count)
time_pops(queue, count=count)
import struct
import Queue
from bsddb3 import DB
import gevent.event
import simplejson as json
class BdbQueue(object):
def __init__(self, path):
self._db = DB.DB()
self._db.open(path, dbtype=DB.DB_BTREE, flags=DB.DB_CREATE)
# Syncronization that allows us to blockingly wait for results
self._event = gevent.event.Event()
# Properly initialize the number of entries
self._length = self._db.stat()['ndata']
# Get the value of the last key as our starting point
last = self._db.cursor().last()
if last:
last = self.unpack(last[0]) + 1
else:
last = 0
self._keys = iter(self.keys_generator(last))
def pack(self, count):
return struct.pack('!Q', count)
def unpack(self, packed):
return struct.unpack('!Q', packed)[0]
def keys_generator(self, count=0):
while True:
yield self.pack(count)
count += 1
def sync(self):
self._db.sync()
def put(self, item):
self._db.put(next(self._keys), json.dumps(item))
self.sync()
self._length += 1
self._event.set()
def get(self):
'''Pop off a result.'''
try:
while True:
while not self._length:
self._event.wait()
self._event.clear()
cursor = self._db.cursor()
result = cursor.first()
if result:
key, value = result
result = json.loads(value)
cursor.delete()
self.sync()
self._length -= 1
return result
except:
raise Queue.Empty()
def __iter__(self):
while True:
try:
yield self.get()
except Queue.Empty:
break
class FlatQueue(object):
def __init__(self, path):
self._path = path
self._fout = open(self._path, 'w+')
self._fin = open(self._path)
self._length = 0
self._event = gevent.event.Event()
def close(self):
# TODO(dan): robust
self._fout.close()
self._fin.close()
def put(self, item):
self._fout.write(json.dumps(item))
self._fout.write('\n')
self._fout.flush()
self._length += 1
self._event.set()
def get(self):
try:
while True:
while not self._length:
self._event.wait()
self._event.clear()
line = self._fin.readline()
if line:
result = json.loads(line)
self._length -= 1
return result
except:
raise Queue.Empty()
def __iter__(self):
while True:
try:
yield self.get()
except Queue.Empty:
break
Gevent Queue
============
time_puts(*, **{'count': 10}) => 0.00002
time_pops(*, **{'count': 10}) => 0.00001
time_puts(*, **{'count': 100}) => 0.00005
time_pops(*, **{'count': 100}) => 0.00009
time_puts(*, **{'count': 1000}) => 0.00044
time_pops(*, **{'count': 1000}) => 0.00053
time_puts(*, **{'count': 10000}) => 0.00437
time_pops(*, **{'count': 10000}) => 0.00511
time_puts(*, **{'count': 100000}) => 0.04923
time_pops(*, **{'count': 100000}) => 0.06889
Bdb Queue
=========
time_puts(*, **{'count': 10}) => 0.00257
time_pops(*, **{'count': 10}) => 0.00217
time_puts(*, **{'count': 100}) => 0.01238
time_pops(*, **{'count': 100}) => 0.01311
time_puts(*, **{'count': 1000}) => 0.17238
time_pops(*, **{'count': 1000}) => 0.14911
time_puts(*, **{'count': 10000}) => 1.28870
time_pops(*, **{'count': 10000}) => 1.23262
Flat Queue
==========
time_puts(*, **{'count': 10}) => 0.00016
time_pops(*, **{'count': 10}) => 0.00007
time_puts(*, **{'count': 100}) => 0.00087
time_pops(*, **{'count': 100}) => 0.00031
time_puts(*, **{'count': 1000}) => 0.00725
time_pops(*, **{'count': 1000}) => 0.00368
time_puts(*, **{'count': 10000}) => 0.08436
time_pops(*, **{'count': 10000}) => 0.03117
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment