Skip to content

Instantly share code, notes, and snippets.

@JKDingwall
Last active August 29, 2015 14:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save JKDingwall/db3706d4a86c1743bbb8 to your computer and use it in GitHub Desktop.
Save JKDingwall/db3706d4a86c1743bbb8 to your computer and use it in GitHub Desktop.
Test case for COUCHDB-2735
#!/usr/bin/env python2.7
from __future__ import print_function
import base64
import couchdbkit
import multiprocessing
import random
import restkit
import socketpool
import threading
import time
import uuid
# Credentials used for HTTP basic auth against the CouchDB server.
DB_ADMIN = "admin"
DB_PASSWORD = "password"
# Address of the CouchDB instance under test.
DB_HOST = "127.0.0.1"
DB_PORT = 5984
# Database that is dropped, recreated and then edited by this script.
DB_NAME = "atest"
DB_DOCS = 12000 # number of documents to generate
DDOC_COUNT = 8 # how many design documents to create
DDOC_MAPS = 6 # how many map/update pairs on each design document
# Alternative budget that spreads a fixed number of edits over every map:
#EDITS_PER_MAP = 36000 / (DDOC_COUNT * DDOC_MAPS) # how many edits to make per map
# Each editor thread starts with this many edits scheduled.
EDITS_PER_MAP = DB_DOCS
UPDATE_SLICES = 6 # how many slices in each update view
# NOTE(review): only referenced by a commented-out `limit=` view argument.
UPDATE_LIMIT = 250 # max number of rows to retrieve in update
BULK_SIZE = 25 # how many documents for bulk saves
def getserver():
    """\
    Build a couchdbkit Server for http://DB_HOST:DB_PORT/.

    Every call creates its own thread-backed socketpool and a resource
    that authenticates with HTTP basic auth (DB_ADMIN / DB_PASSWORD).
    """
    pool_settings = dict(retry_delay=0.3, max_size=3, retry_max=3, timeout=None)
    connection_pool = socketpool.ConnectionPool(
        factory=restkit.conn.Connection,
        backend="thread",
        **pool_settings
    )
    auth_filter = restkit.BasicAuth(DB_ADMIN, DB_PASSWORD)
    resource = couchdbkit.resource.CouchdbResource(filters=[auth_filter])
    base_uri = "http://{host}:{port}/".format(host=DB_HOST, port=DB_PORT)
    return couchdbkit.Server(
        uri=base_uri, resource_instance=resource, pool=connection_pool
    )
def getdb(server=None):
    """\
    Return a couchdbkit handle on the DB_NAME database.

    server -- an existing couchdbkit server to reuse; when omitted a new
              one is created through getserver().
    """
    connection = getserver() if server is None else server
    return connection.get_db(DB_NAME)
def genddoc(num):
    """\
    Build design document number `num` ("_design/ddoc<num>").

    The document carries DDOC_MAPS views ("map0", "map1", ...) and the
    same number of update handlers ("update0", ...), all sharing the same
    JavaScript source, plus one validate_doc_update function.
    """
    # Skips deletions, forbids documents whose configuration.a0 is empty,
    # then burns time comparing the new and old a0 arrays.
    validate_source = """\
function(newDoc, oldDoc, userCtx, secObj) {
if(newDoc._deleted) {
return;
}
if(!newDoc.configuration.a0.length) {
throw { forbidden: "just because" };
}
// pointless time wasting
var less = 0;
for(var ni in newDoc.configuration.a0) {
for(var oi in oldDoc.configuration.a0) {
if(newDoc.configuration.a0[ni] < oldDoc.configuration.a0[oi]) {
less += 1
}
}
}
}"""
    # Emits a random slice number (as a string) for every document.
    map_source = """\
function(doc) {{
emit(''+Math.round(Math.random()*{count}));
}}""".format(count=UPDATE_SLICES)
    # Randomises the configuration; with ?pretend=... the changed document
    # is only returned in the response body instead of being stored.
    update_source = """\
function(doc, req) {
doc.configuration.k0 = Math.random();
doc.configuration.k1 = Math.random();
doc.configuration.k2 = Math.random();
doc.configuration.a0.push(Math.random());
if(req.query.pretend) {
rsp = {
code: 200,
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify(doc)
};
return([null, rsp]);
}
else {
rsp = {
code: 200,
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({})
};
return([doc, rsp]);
}
}"""
    indices = range(0, DDOC_MAPS)
    return {
        "_id": "_design/ddoc{x}".format(x=num),
        "language": "javascript",
        "views": dict(
            ("map{x}".format(x=i), {"map": map_source, "reduce": "_count"})
            for i in indices
        ),
        "updates": dict(
            ("update{x}".format(x=i), update_source) for i in indices
        ),
        "validate_doc_update": validate_source,
    }
def _random_token():
    """Return a base64 text token derived from a fresh random float."""
    # b64encode needs bytes on Python 3; decoding the result keeps the
    # token usable as a JSON key/value on both Python 2 and 3.
    raw = str(random.random()).encode("ascii")
    return base64.b64encode(raw).decode("ascii")


def gendoc(doc=None):
    """\
    Generate a document, or refresh the one passed in.

    doc -- an existing document to refresh in place; when omitted (or
           empty) a new document with a "doc-<uuid4>" id is created.

    The "garbage" member is replaced by up to ten random entries of up
    to ten random key/value pairs each, "configuration" gets new k0..k2
    tokens and one more random float is appended to its "a0" history.
    Returns the (mutated) document.
    """
    if not doc:
        doc = {
            "_id": "doc-" + str(uuid.uuid4()),
            "configuration": {"a0": []},
            "longblob": "",
        }
    garbage = {}
    for _ in range(int(round(random.random() * 10))):
        inner_size = int(round(random.random() * 10))
        garbage[_random_token()] = dict(
            (_random_token(), _random_token()) for _ in range(inner_size)
        )
    doc["garbage"] = garbage
    # Tolerate a supplied document that has no configuration/a0 yet.
    history = (doc.get("configuration") or {}).get("a0") or []
    doc["configuration"] = {
        "k0": _random_token(),
        "k1": _random_token(),
        "k2": _random_token(),
        "a0": history + [random.random()],
    }
    return doc
def reset_database():
    """\
    Reset the database.

    Drops DB_NAME if it exists, recreates it, seeds it with DB_DOCS
    generated documents saved in batches of at most BULK_SIZE, and
    finally stores DDOC_COUNT design documents.
    """
    server = getserver()
    # delete any existing db instance
    try:
        server.delete_db(DB_NAME)
    except couchdbkit.exceptions.ResourceNotFound:
        # nothing to delete
        pass
    # and create new empty db
    server.create_db(DB_NAME)
    database = getdb(server)
    # seed database with some documents; the batch size follows BULK_SIZE
    # so the countdown can never overshoot zero and loop forever
    remaining = DB_DOCS
    while remaining > 0:
        batch = min(remaining, BULK_SIZE)
        database.save_docs([gendoc() for _ in range(batch)])
        remaining -= batch
    # create design documents
    database.save_docs([genddoc(num) for num in range(DDOC_COUNT)])
def start_editors():
    """\
    Run one editor thread per (design document, view, slice) and wait.

    DDOC_COUNT * DDOC_MAPS * UPDATE_SLICES daemon threads are started.
    Every ten seconds the outstanding edit counts are printed until all
    editors report zero remaining edits.  A KeyboardInterrupt asks every
    editor to stop and then waits ten more seconds.
    """

    class Editor(threading.Thread):
        """Feed the rows of one view slice through the matching update handler."""

        def __init__(self, ddoc, view, slice):
            # No target is passed: Thread.start() always calls run(), so
            # the error handling has to live in run() itself to take effect.
            super(Editor, self).__init__()
            self.daemon = True
            self.ddoc = ddoc
            self.view = view
            self.slice = slice
            self.id = "ddoc{ddoc}/view{view}/{slice}".format(ddoc=ddoc, view=view, slice=slice)
            # separate connections for reading the view and for updating
            self.dbcv = getdb()
            self.dbcu = getdb()
            self.stop = False
            self.remaining = EDITS_PER_MAP
            # roughly 80% of the editors save through bulk requests
            self.bulk = random.random() < 0.8
            if self.bulk:
                print("{id} is bulk operator ({n} edits scheduled)".format(id=self.id, n=self.remaining))
                self.batch = []
            else:
                print("{id} is single operator ({n} edits scheduled)".format(id=self.id, n=self.remaining))

        def run(self):
            """Process the slice, report the outcome and always zero `remaining`."""
            try:
                self.edit_slice()
                print("editor {id} completed scheduled edits".format(id=self.id))
            except Exception as error:
                print("error in editor {id}: {error!r}".format(id=self.id, error=error))
            finally:
                # lets the waiting loop in start_editors() terminate even
                # when the slice ran out of rows or the editor failed
                self.remaining = 0

        def edit_slice(self):
            """Update every row of this editor's slice until done or stopped."""
            rows = self.dbcv.view(
                "ddoc{ddoc}/map{view}".format(ddoc=self.ddoc, view=self.view),
                key=str(self.slice),
                reduce="false")
            for row in rows:
                self.update(row)
                if self.remaining <= 0 or self.stop:
                    break

        def update(self, v):
            """Apply one edit to the document behind view row `v`."""
            path = "_design/ddoc{ddoc}/_update/update{u}/{id}".format(ddoc=self.ddoc, u=self.view, id=v["id"])
            try:
                if self.bulk:
                    # "pretend" makes the handler return the edited
                    # document instead of storing it; it is saved in bulk
                    pretended = self.dbcu.res.post(path=path, pretend="true").json_body
                    self.batch.append(gendoc(pretended))
                    if len(self.batch) == BULK_SIZE:
                        try:
                            # presumably widens the window for concurrent
                            # edits of the same documents -- TODO confirm
                            time.sleep(2.5)
                            self.dbcu.save_docs(self.batch)
                        finally:
                            self.batch = []
                else:
                    self.dbcu.res.post(path=path)
            except (couchdbkit.exceptions.ResourceConflict, couchdbkit.exceptions.BulkSaveError):
                # a failed edit is not counted against the schedule
                pass
            else:
                self.remaining -= 1

        def stopit(self):
            """Ask the editor to stop after the row it is working on."""
            self.stop = True

    editors = []
    for ddoc in range(0, DDOC_COUNT):
        for view in range(0, DDOC_MAPS):
            for part in range(0, UPDATE_SLICES):
                editors.append(Editor(ddoc, view, part))
    try:
        print("starting editors")
        for editor in editors:
            editor.start()
        while True:
            pending = [editor.remaining for editor in editors if editor.remaining]
            if not pending:
                break
            print(pending)
            print("waiting for editors to complete")
            time.sleep(10)
    except KeyboardInterrupt:
        print("interrupting editors")
        for editor in editors:
            editor.stopit()
        time.sleep(10)
def check_for_duplicates():
    """\
    Print every id that occurs more than once in the _all_docs listing.

    Each offender is printed as an (id, occurrences) tuple.
    """
    occurrences = {}
    for row in getdb().all_docs():
        doc_id = row["id"]
        occurrences[doc_id] = occurrences.get(doc_id, 0) + 1
    for doc_id, hits in occurrences.items():
        if hits > 1:
            print((doc_id, hits))
if __name__ == "__main__":
    # Run the three stages in order, announcing each one before it starts.
    for banner, stage in (
        ("resetting the database", reset_database),
        ("starting editors", start_editors),
        ("testing for duplicate ids in _all_docs", check_for_duplicates),
    ):
        print(banner)
        stage()
@JKDingwall
Copy link
Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment