Skip to content

Instantly share code, notes, and snippets.

@JKDingwall
Last active August 29, 2015 14:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save JKDingwall/db3706d4a86c1743bbb8 to your computer and use it in GitHub Desktop.
Save JKDingwall/db3706d4a86c1743bbb8 to your computer and use it in GitHub Desktop.
Test case for COUCHDB-2735
#!/usr/bin/env python2.7
from __future__ import print_function
import base64
import couchdbkit
import multiprocessing
import random
import restkit
import socketpool
import threading
import time
import uuid
# --- CouchDB connection settings -------------------------------------------
# Credentials used for HTTP basic auth against the server.
DB_ADMIN = "admin"
DB_PASSWORD = "password"
# Address of the CouchDB instance and the database the test works on.
DB_HOST = "127.0.0.1"
DB_PORT = 5984
DB_NAME = "atest"

# --- Workload sizing ---------------------------------------------------------
# number of documents to generate
DB_DOCS = 12000
# how many design documents to create
DDOC_COUNT = 8
# how many map/update pairs on each design document
DDOC_MAPS = 6
# how many edits to make per map; an earlier variant spread a fixed budget
# instead: 36000 / (DDOC_COUNT * DDOC_MAPS)
EDITS_PER_MAP = DB_DOCS
# how many slices in each update view
UPDATE_SLICES = 6
# max number of rows to retrieve in update
UPDATE_LIMIT = 250
# how many documents for bulk saves
BULK_SIZE = 25
def getserver():
    """\
    Build a couchdbkit Server object for the configured CouchDB instance.

    The server talks to http://DB_HOST:DB_PORT/, authenticates every request
    with HTTP basic auth (DB_ADMIN / DB_PASSWORD) and owns a new
    thread-backed socketpool connection pool.
    """
    # Each call creates its own pool; the keyword values are handed to
    # socketpool.ConnectionPool unchanged.
    connection_pool = socketpool.ConnectionPool(
        factory=restkit.conn.Connection,
        backend="thread",
        retry_delay=0.3,
        max_size=3,
        retry_max=3,
        timeout=None
    )
    authenticated = couchdbkit.resource.CouchdbResource(
        filters=[restkit.BasicAuth(DB_ADMIN, DB_PASSWORD)]
    )
    base_url = "http://{host}:{port}/".format(host=DB_HOST, port=DB_PORT)

    return couchdbkit.Server(
        uri=base_url,
        resource_instance=authenticated,
        pool=connection_pool
    )
def getdb(server=None):
    """\
    Return a couchdbkit handle on the DB_NAME database.

    server -- server object to use; when None a new one is obtained from
              getserver().
    """
    connection = getserver() if server is None else server
    return connection.get_db(DB_NAME)
def genddoc(num):
    """\
    Build the design document "_design/ddoc<num>" as a plain dict.

    The document carries DDOC_MAPS views (map0, map1, ...) and the same
    number of update handlers (update0, update1, ...), plus one
    validate_doc_update function.  All views share one map function and all
    update handlers share one update function.
    """
    # Rejects non-deleted documents whose configuration.a0 is empty, then
    # burns time comparing the new and old a0 arrays.
    validate_source = """\
function(newDoc, oldDoc, userCtx, secObj) {
if(newDoc._deleted) {
return;
}
if(!newDoc.configuration.a0.length) {
throw { forbidden: "just because" };
}
// pointless time wasting
var less = 0;
for(var ni in newDoc.configuration.a0) {
for(var oi in oldDoc.configuration.a0) {
if(newDoc.configuration.a0[ni] < oldDoc.configuration.a0[oi]) {
less += 1
}
}
}
}"""

    # Emits a random key per document: Math.round(Math.random() *
    # UPDATE_SLICES) rendered as a string.
    map_source = """\
function(doc) {{
emit(''+Math.round(Math.random()*{count}));
}}""".format(count=UPDATE_SLICES)

    # Randomises k0..k2 and appends to a0.  With the "pretend" query
    # parameter the changed document is only returned in the response body
    # ([null, rsp]); without it the document is handed back for saving.
    update_source = """\
function(doc, req) {
doc.configuration.k0 = Math.random();
doc.configuration.k1 = Math.random();
doc.configuration.k2 = Math.random();
doc.configuration.a0.push(Math.random());
if(req.query.pretend) {
rsp = {
code: 200,
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify(doc)
};
return([null, rsp]);
}
else {
rsp = {
code: 200,
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({})
};
return([doc, rsp]);
}
}"""

    indices = list(range(0, DDOC_MAPS))
    views = {
        "map{x}".format(x=i): {"map": map_source, "reduce": "_count"}
        for i in indices
    }
    updates = {"update{x}".format(x=i): update_source for i in indices}

    return {
        "_id": "_design/ddoc{x}".format(x=num),
        "language": "javascript",
        "views": views,
        "updates": updates,
        "validate_doc_update": validate_source
    }
def gendoc(doc=None):
"""\
Generate a document.
"""
# Reuse the caller's document when one is passed (and truthy); otherwise
# start a new one with a "doc-<uuid4>" _id and an empty a0 history.
doc = doc or {
"_id": "doc-" + str(uuid.uuid4()),
"configuration": {
"a0": []
},
# NOTE(review): the "longblob" value below is a copy of an earlier revision
# of this script used as bulk payload.  As shown it spans raw newlines and
# contains unescaped double quotes, so it cannot be a valid "..." literal --
# presumably the upstream file escapes it; confirm before editing.
"longblob": "#!/usr/bin/env python2.7

from __future__ import print_function

import base64
import couchdbkit
import json
import multiprocessing
import random
import restkit
import socketpool
import threading
import time
import uuid


DB_ADMIN = "admin"
DB_PASSWORD = "password"
DB_HOST = "127.0.0.1"
DB_PORT = 5985
DB_NAME = "atest"

DB_DOCS = 12000 # number of documents to generate

DDOC_COUNT = 8 # how many design documents to create
DDOC_MAPS = 6 # how many map/update pairs on each design document

#EDITS_PER_MAP = 36000 / (DDOC_COUNT * DDOC_MAPS) # how many edits to make per map
EDITS_PER_MAP = DB_DOCS
UPDATE_SLICES = 6 # how many slices in each update view
UPDATE_LIMIT = 250 # max number of rows to retrieve in update
BULK_SIZE = 25 # how many documents for bulk saves


def getserver():
    """\
    Provide a couchdbkit server connection.
    """
    uri = "http://{host}:{port}/".format(host=DB_HOST, port=DB_PORT)
    session_options = {
        "retry_delay": 0.3,
        "max_size": 3,
        "retry_max": 3,
        "timeout": None
    }
    pool = socketpool.ConnectionPool(
               factory=restkit.conn.Connection,
               backend="thread", **session_options
           )
    ri = couchdbkit.resource.CouchdbResource(
             filters=[restkit.BasicAuth(DB_ADMIN, DB_PASSWORD)]
         )

    return couchdbkit.Server(uri=uri, resource_instance=ri, pool=pool)


def getdb(server=None):
    """\
    Provide couchdbkit database connection.
    """
    if server is None:
        server = getserver()

    return server.get_db(DB_NAME)


def genddoc(num):
    """\
    Generate a design document
    """
    ddoc = {
        "_id": "_design/ddoc{x}".format(x=num),
        "language": "javascript",
        "views": {
        },
        "updates": {
        },
        "validate_doc_update": """\
function(newDoc, oldDoc, userCtx, secObj) {
    if(newDoc._deleted) {
       return;
    }

    if(!newDoc.configuration.a0.length) {
        throw { forbidden: "just because" };
    }

    // pointless time wasting
    var less = 0;
    for(var ni in newDoc.configuration.a0) {
        for(var oi in oldDoc.configuration.a0) {
            if(newDoc.configuration.a0[ni] < oldDoc.configuration.a0[oi]) {
                less += 1
            }
        }
    }
}"""
    }

    map = """\
function(doc) {{
    emit(''+Math.round(Math.random()*{count}));
}}""".format(count=UPDATE_SLICES)

    update = """\
function(doc, req) {
    doc.configuration.k0 = Math.random();
    doc.configuration.k1 = Math.random();
    doc.configuration.k2 = Math.random();
    doc.configuration.a0.push(Math.random());

    if(req.query.pretend) {
        rsp = {
            code: 200,
            headers: {
                'Content-Type': 'application/json'
            },
            body: JSON.stringify(doc)
        };

        return([null, rsp]);
    }
    else {
        rsp = {
            code: 200,
            headers: {
                'Content-Type': 'application/json'
            },
            body: JSON.stringify({})
        };

        return([doc, rsp]);
    }
}"""

    for x in range(0, DDOC_MAPS):
        ddoc["views"]["map{x}".format(x=x)] = {
            "map": map,
            "reduce": "_count"
        }

        ddoc["updates"]["update{x}".format(x=x)] = update

    return ddoc


def gendoc(doc=None):
    """\
    Generate a document.
    """
    doc = doc or {
        "_id": "doc-" + str(uuid.uuid4()),
        "configuration": {
            "a0": []
        },
        "longblob": ""
    }

    doc["garbage"] = {}
    for x in range(0, int(round(random.random() * 10))):
        xkey = base64.b64encode(str(random.random()))
        doc["garbage"][xkey] = {}
        for y in range(0, int(round(random.random() * 10))):
            doc["garbage"][xkey][base64.b64encode(str(random.random()))] = base64.b64encode(str(random.random()))
        

    doc["configuration"] = {
        "k0": base64.b64encode(str(random.random())),
        "k1": base64.b64encode(str(random.random())),
        "k2": base64.b64encode(str(random.random())),
        "a0": doc["configuration"].get("a0", []) + [random.random()]
    }

    return doc


def reset_database():
    """\
    Reset the database.
    """
    dbs = getserver()

    # delete any existing db instance
    try:
        dbs.delete_db(DB_NAME)
    except couchdbkit.exceptions.ResourceNotFound:
        pass
    # and create new empty db
    dbs.create_db(DB_NAME)

    dbc = getdb(dbs)

    # seed database with some documents
    remaining = DB_DOCS
    while remaining:
        make = remaining < 25 and remaining or BULK_SIZE
        remaining -= make

        dbc.save_docs([gendoc() for x in range(0, make)])

    # create design documents
    dbc.save_docs([genddoc(x) for x in range(0, DDOC_COUNT)])


def start_editors():
    """\
    Create the thread classes which will connect the view/update
    """

    class Editor(threading.Thread):
        def __init__(self, ddoc, view, slice):
            super(Editor, self).__init__(target=self.runwrap)
            self.daemon = True
            self.ddoc = ddoc
            self.view = view
            self.slice = slice
            self.id = "ddoc{ddoc}/view{view}/{slice}".format(ddoc=ddoc, view=view, slice=slice)
            self.dbcv = getdb()
            self.dbcu = getdb()
            self.stop = False
            self.remaining = EDITS_PER_MAP

            if random.random() < 0.8:
                print("{id} is bulk operator ({n} edits scheduled)".format(id=self.id, n=self.remaining))
                self.bulk = True
                self.batch = []
            else:
                print("{id} is single operator ({n} edits scheduled)".format(id=self.id, n=self.remaining))
                self.bulk = False

        def runwrap(self):
            try:
                self.run()
                print("editor {id} completed scheduled edits".format(id=self.id))
            except Exception as e:
                print("error in editor {id}".format(id=self.id))
            finally:
                self.remaining = 0

        def run(self):
            for v in self.dbcv.view(
                "ddoc{ddoc}/map{view}".format(ddoc=self.ddoc, view=self.view),
                key=str(self.slice),
                reduce="false"):
                #limit=UPDATE_LIMIT,

                self.update(v)

                if self.remaining == 0 or self.stop:
                    break

        def update(self, v):
            try:
                if self.bulk:
                    self.batch.append(
                        gendoc(self.dbcu.res.post(path="_design/ddoc{ddoc}/_update/update{u}/{id}".format(ddoc=self.ddoc, u=self.view, id=v["id"]), pretend="true").json_body)
                    )
                    if len(self.batch) == BULK_SIZE:
                        try:
                            time.sleep(2.5)
                            self.dbcu.save_docs(self.batch)
                        finally:
                            self.batch = []
                else:
                    self.dbcu.res.post(path="_design/ddoc{ddoc}/_update/update{u}/{id}".format(ddoc=self.ddoc, u=self.view, id=v["id"]))
            except (couchdbkit.exceptions.ResourceConflict, couchdbkit.exceptions.BulkSaveError):
                pass
            else:
                self.remaining -= 1


        def stopit(self):
            self.stop = True


    editors = []
    for ddoc in range(0, DDOC_COUNT):
        for view in range(0, DDOC_MAPS):
            for slice in range(0, UPDATE_SLICES):
                editors.append(Editor(ddoc, view, slice))

    try:
        print("starting editors")
        [editor.start() for editor in editors]
        while any([editor.remaining for editor in editors]):
            print([editor.remaining for editor in editors if editor.remaining])
            print("waiting for editors to complete")
            time.sleep(10)
    except KeyboardInterrupt:
        print("interrupting editors")
        [editor.stopit() for editor in editors]
        time.sleep(10)


def check_for_duplicates():
    """\
    Examine the _all_docs list to see if any id is duplicated.
    """
    dbc = getdb()
    counter = {}
    for d in dbc.all_docs():
        count = counter.get(d["id"], 0)
        count += 1
        counter[d["id"]] = count

    for (k, v) in counter.items():
        if v > 1:
            print(k)        


if __name__ == "__main__":
    print("resetting the database")
    reset_database()
    print("starting editors")
    start_editors()
    print("testing for duplicate ids in _all_docs")
    check_for_duplicates()
"
}
# Replace "garbage" with a fresh two-level dict.  The number of outer keys is
# int(round(random.random() * 10)); each outer key gets its own randomly
# sized inner dict.  Every key and value is base64 of str(random.random()).
doc["garbage"] = {}
for x in range(0, int(round(random.random() * 10))):
xkey = base64.b64encode(str(random.random()))
doc["garbage"][xkey] = {}
for y in range(0, int(round(random.random() * 10))):
doc["garbage"][xkey][base64.b64encode(str(random.random()))] = base64.b64encode(str(random.random()))
# Rebuild "configuration" with new random k0..k2 and extend the previous a0
# list (or an empty one when absent) by one random float, so a0 grows by one
# entry on every call.
doc["configuration"] = {
"k0": base64.b64encode(str(random.random())),
"k1": base64.b64encode(str(random.random())),
"k2": base64.b64encode(str(random.random())),
"a0": doc["configuration"].get("a0", []) + [random.random()]
}
return doc
def reset_database():
    """\
    Reset the database.

    Drops DB_NAME if it exists, creates it again, seeds it with DB_DOCS
    generated documents (saved in batches of at most BULK_SIZE) and finally
    stores DDOC_COUNT design documents.
    """
    dbs = getserver()

    # delete any existing db instance; a database that does not exist yet is
    # not an error for a reset
    try:
        dbs.delete_db(DB_NAME)
    except couchdbkit.exceptions.ResourceNotFound:
        pass
    # and create new empty db
    dbs.create_db(DB_NAME)

    dbc = getdb(dbs)

    # seed database with some documents
    remaining = DB_DOCS
    while remaining > 0:
        # Cap each batch at BULK_SIZE.  The batch size used to be compared
        # against a literal 25, which only matched while BULK_SIZE == 25.
        make = min(remaining, BULK_SIZE)
        remaining -= make

        dbc.save_docs([gendoc() for _ in range(make)])

    # create design documents
    dbc.save_docs([genddoc(x) for x in range(DDOC_COUNT)])
def start_editors():
    """\
    Create and run the editor threads which connect the views to the updates.

    One Editor is created for every (design document, view, slice)
    combination, i.e. DDOC_COUNT * DDOC_MAPS * UPDATE_SLICES threads.  The
    call blocks, printing progress every 10 seconds, until every editor
    reports no remaining edits.  Ctrl-C asks all editors to stop and waits
    10 seconds before returning.
    """

    class Editor(threading.Thread):
        """\
        Worker that reads one key of one view and edits each returned row.

        About 80% of editors (random.random() < 0.8) are "bulk" operators:
        they call the update handler with pretend=true, regenerate the
        returned document with gendoc() and save BULK_SIZE documents at a
        time.  The rest call the update handler directly for every row.
        """

        def __init__(self, ddoc, view, slice):
            # No target= here: run() is overridden below and Thread.start()
            # only ever calls run(), so a target would never be invoked.
            super(Editor, self).__init__()
            self.daemon = True
            self.ddoc = ddoc
            self.view = view
            self.slice = slice
            self.id = "ddoc{ddoc}/view{view}/{slice}".format(ddoc=ddoc, view=view, slice=slice)
            # separate database handles for view reads and for updates
            self.dbcv = getdb()
            self.dbcu = getdb()
            self.stop = False
            self.remaining = EDITS_PER_MAP

            if random.random() < 0.8:
                print("{id} is bulk operator ({n} edits scheduled)".format(id=self.id, n=self.remaining))
                self.bulk = True
                self.batch = []
            else:
                print("{id} is single operator ({n} edits scheduled)".format(id=self.id, n=self.remaining))
                self.bulk = False

        def run(self):
            """\
            Thread entry point: process the view and always end at remaining == 0.
            """
            try:
                self.edit_view()
                print("editor {id} completed scheduled edits".format(id=self.id))
            except Exception as e:
                # Report the failure instead of hiding it; the editor still
                # counts as finished so the coordinator is not blocked.
                print("error in editor {id}: {err!r}".format(id=self.id, err=e))
            finally:
                # The coordinator loop waits on this reaching zero, including
                # when the view had fewer rows than scheduled edits.
                self.remaining = 0

        def edit_view(self):
            """\
            Walk the rows of this editor's view slice, updating each one.
            """
            rows = self.dbcv.view(
                "ddoc{ddoc}/map{view}".format(ddoc=self.ddoc, view=self.view),
                key=str(self.slice),
                reduce="false")

            for v in rows:
                self.update(v)

                if self.remaining == 0 or self.stop:
                    break

        def update(self, v):
            """\
            Apply one edit for view row v; conflicts are ignored and not counted.
            """
            path = "_design/ddoc{ddoc}/_update/update{u}/{id}".format(
                ddoc=self.ddoc, u=self.view, id=v["id"])
            try:
                if self.bulk:
                    self.batch.append(
                        gendoc(self.dbcu.res.post(path=path, pretend="true").json_body)
                    )
                    if len(self.batch) == BULK_SIZE:
                        try:
                            # pause before saving so the batched revisions
                            # sit for a while before being written
                            time.sleep(2.5)
                            self.dbcu.save_docs(self.batch)
                        finally:
                            self.batch = []
                else:
                    self.dbcu.res.post(path=path)
            except (couchdbkit.exceptions.ResourceConflict, couchdbkit.exceptions.BulkSaveError):
                pass
            else:
                self.remaining -= 1

        def stopit(self):
            """\
            Ask the editor to stop after the row it is currently processing.
            """
            self.stop = True

    editors = []
    for ddoc in range(DDOC_COUNT):
        for view in range(DDOC_MAPS):
            for slice in range(UPDATE_SLICES):
                editors.append(Editor(ddoc, view, slice))

    try:
        print("starting editors")
        for editor in editors:
            editor.start()

        while True:
            pending = [editor.remaining for editor in editors if editor.remaining]
            if not pending:
                break
            print(pending)
            print("waiting for editors to complete")
            time.sleep(10)
    except KeyboardInterrupt:
        print("interrupting editors")
        for editor in editors:
            editor.stopit()
        # give the daemon threads a moment to finish their current row
        time.sleep(10)
def check_for_duplicates():
    """\
    Print every document id that the _all_docs listing returns more than once.

    Each offending id is printed as an (id, occurrences) tuple; nothing is
    printed when all ids are unique.
    """
    rows = getdb().all_docs()

    # tally how often each id shows up in the listing
    seen = {}
    for row in rows:
        doc_id = row["id"]
        seen[doc_id] = seen.get(doc_id, 0) + 1

    for doc_id, hits in seen.items():
        if hits <= 1:
            continue
        print((doc_id, hits))
if __name__ == "__main__":
    # Run the three phases of the test case in order, announcing each one
    # before it starts.
    for banner, phase in (
        ("resetting the database", reset_database),
        ("starting editors", start_editors),
        ("testing for duplicate ids in _all_docs", check_for_duplicates),
    ):
        print(banner)
        phase()
@JKDingwall
Copy link
Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment