Last active: August 29, 2015 14:24
-
-
Save JKDingwall/db3706d4a86c1743bbb8 to your computer and use it in GitHub Desktop.
Test case for COUCHDB-2735
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python2.7 | |
from __future__ import print_function | |
import base64 | |
import couchdbkit | |
import multiprocessing | |
import random | |
import restkit | |
import socketpool | |
import threading | |
import time | |
import uuid | |
# --- CouchDB connection settings ---------------------------------------------
DB_HOST = "127.0.0.1"
DB_PORT = 5984
DB_ADMIN = "admin"
DB_PASSWORD = "password"
DB_NAME = "atest"

# --- workload shape ----------------------------------------------------------
# Documents seeded into the database by reset_database().
DB_DOCS = 12000
# Design documents to create, and view/update-handler pairs in each one.
DDOC_COUNT = 8
DDOC_MAPS = 6
# Edits each editor thread aims to make.  An earlier variant spread a fixed
# budget instead: 36000 / (DDOC_COUNT * DDOC_MAPS).
EDITS_PER_MAP = DB_DOCS
# Distinct view keys ("slices") handed out to editor threads per view.
UPDATE_SLICES = 6
# Intended cap on rows per view query (the editor's limit= use is disabled).
UPDATE_LIMIT = 250
# Documents per bulk save.
BULK_SIZE = 25
def getserver():
    """\
    Build a couchdbkit Server for DB_HOST:DB_PORT.

    Requests authenticate as DB_ADMIN/DB_PASSWORD through a restkit
    BasicAuth filter and take their sockets from a thread-backed
    socketpool connection pool (at most 3 connections, 3 retries spaced
    0.3 s apart, timeout None).
    """
    credentials = restkit.BasicAuth(DB_ADMIN, DB_PASSWORD)
    resource = couchdbkit.resource.CouchdbResource(filters=[credentials])

    connections = socketpool.ConnectionPool(
        factory=restkit.conn.Connection,
        backend="thread",
        retry_delay=0.3,
        max_size=3,
        retry_max=3,
        timeout=None,
    )

    base_uri = "http://%s:%s/" % (DB_HOST, DB_PORT)
    return couchdbkit.Server(uri=base_uri, resource_instance=resource, pool=connections)
def getdb(server=None):
    """\
    Return the couchdbkit database handle for DB_NAME.

    :param server: existing couchdbkit Server to use; when None a new one
        is created with getserver().
    """
    conn = getserver() if server is None else server
    return conn.get_db(DB_NAME)
def genddoc(num, maps=None, slices=None):
    """\
    Generate a design document.

    The result has id ``_design/ddoc<num>``, a validate_doc_update function
    and ``maps`` pairs of a view ``map<x>`` (random key, ``_count`` reduce)
    plus an update handler ``update<x>``.

    :param num: number appended to the design document id.
    :param maps: how many view/update pairs to add; defaults to DDOC_MAPS.
    :param slices: how many distinct keys each view emits, ``"0"`` up to
        ``str(slices - 1)``; defaults to UPDATE_SLICES.
    :returns: the design document as a dict.
    """
    if maps is None:
        maps = DDOC_MAPS
    if slices is None:
        slices = UPDATE_SLICES

    ddoc = {
        "_id": "_design/ddoc{x}".format(x=num),
        "language": "javascript",
        "views": {
        },
        "updates": {
        },
        # Rejects non-deleted documents whose configuration.a0 is empty, then
        # deliberately burns CPU comparing the new and old a0 arrays.  oldDoc
        # is null when a document is being created, so the comparison is
        # skipped in that case instead of throwing a TypeError.
        "validate_doc_update": """\
function(newDoc, oldDoc, userCtx, secObj) {
    if(newDoc._deleted) {
       return;
    }

    if(!newDoc.configuration.a0.length) {
        throw { forbidden: "just because" };
    }

    // nothing to compare against when the document is being created
    if(!oldDoc) {
        return;
    }

    // pointless time wasting
    var less = 0;
    for(var ni in newDoc.configuration.a0) {
        for(var oi in oldDoc.configuration.a0) {
            if(newDoc.configuration.a0[ni] < oldDoc.configuration.a0[oi]) {
                less += 1
            }
        }
    }
}"""
    }

    # Emit a random key in "0".."slices-1".  Math.floor keeps every key
    # inside the range of slices the editors query; Math.round would also
    # emit str(slices), a key no editor ever reads.
    map_fn = """\
function(doc) {{
    emit(''+Math.floor(Math.random()*{count}));
}}""".format(count=slices)

    # Randomises the configuration and appends to a0.  With ?pretend the
    # handler returns [null, rsp] (nothing stored) and the modified document
    # as the response body; otherwise it returns [doc, rsp] with an empty
    # JSON body.
    update_fn = """\
function(doc, req) {
    doc.configuration.k0 = Math.random();
    doc.configuration.k1 = Math.random();
    doc.configuration.k2 = Math.random();
    doc.configuration.a0.push(Math.random());

    if(req.query.pretend) {
        var rsp = {
            code: 200,
            headers: {
                'Content-Type': 'application/json'
            },
            body: JSON.stringify(doc)
        };

        return([null, rsp]);
    }
    else {
        var rsp = {
            code: 200,
            headers: {
                'Content-Type': 'application/json'
            },
            body: JSON.stringify({})
        };

        return([doc, rsp]);
    }
}"""

    for x in range(maps):
        ddoc["views"]["map{x}".format(x=x)] = {
            "map": map_fn,
            "reduce": "_count"
        }
        ddoc["updates"]["update{x}".format(x=x)] = update_fn

    return ddoc
def gendoc(doc=None): | |
"""\ | |
Generate a document. | |
""" | |
doc = doc or { | |
"_id": "doc-" + str(uuid.uuid4()), | |
"configuration": { | |
"a0": [] | |
}, | |
"longblob": "#!/usr/bin/env python2.7

from __future__ import print_function

import base64
import couchdbkit
import json
import multiprocessing
import random
import restkit
import socketpool
import threading
import time
import uuid


# --- CouchDB connection settings ---------------------------------------------
DB_HOST = "127.0.0.1"
DB_PORT = 5985
DB_ADMIN = "admin"
DB_PASSWORD = "password"
DB_NAME = "atest"

# --- workload shape ----------------------------------------------------------
# Documents seeded into the database by reset_database().
DB_DOCS = 12000
# Design documents to create, and view/update-handler pairs in each one.
DDOC_COUNT = 8
DDOC_MAPS = 6
# Edits each editor thread aims to make.  An earlier variant spread a fixed
# budget instead: 36000 / (DDOC_COUNT * DDOC_MAPS).
EDITS_PER_MAP = DB_DOCS
# Distinct view keys ("slices") handed out to editor threads per view.
UPDATE_SLICES = 6
# Intended cap on rows per view query (the editor's limit= use is disabled).
UPDATE_LIMIT = 250
# Documents per bulk save.
BULK_SIZE = 25


def getserver():
    """\
    Build a couchdbkit Server for DB_HOST:DB_PORT.

    Requests authenticate as DB_ADMIN/DB_PASSWORD through a restkit
    BasicAuth filter and take their sockets from a thread-backed
    socketpool connection pool (at most 3 connections, 3 retries spaced
    0.3 s apart, timeout None).
    """
    credentials = restkit.BasicAuth(DB_ADMIN, DB_PASSWORD)
    resource = couchdbkit.resource.CouchdbResource(filters=[credentials])

    connections = socketpool.ConnectionPool(
        factory=restkit.conn.Connection,
        backend="thread",
        retry_delay=0.3,
        max_size=3,
        retry_max=3,
        timeout=None,
    )

    base_uri = "http://%s:%s/" % (DB_HOST, DB_PORT)
    return couchdbkit.Server(uri=base_uri, resource_instance=resource, pool=connections)


def getdb(server=None):
    """\
    Return the couchdbkit database handle for DB_NAME.

    :param server: existing couchdbkit Server to use; when None a new one
        is created with getserver().
    """
    conn = getserver() if server is None else server
    return conn.get_db(DB_NAME)


def genddoc(num, maps=None, slices=None):
    """\
    Generate a design document.

    The result has id ``_design/ddoc<num>``, a validate_doc_update function
    and ``maps`` pairs of a view ``map<x>`` (random key, ``_count`` reduce)
    plus an update handler ``update<x>``.

    :param num: number appended to the design document id.
    :param maps: how many view/update pairs to add; defaults to DDOC_MAPS.
    :param slices: how many distinct keys each view emits, ``"0"`` up to
        ``str(slices - 1)``; defaults to UPDATE_SLICES.
    :returns: the design document as a dict.
    """
    if maps is None:
        maps = DDOC_MAPS
    if slices is None:
        slices = UPDATE_SLICES

    ddoc = {
        "_id": "_design/ddoc{x}".format(x=num),
        "language": "javascript",
        "views": {
        },
        "updates": {
        },
        # Rejects non-deleted documents whose configuration.a0 is empty, then
        # deliberately burns CPU comparing the new and old a0 arrays.  oldDoc
        # is null when a document is being created, so the comparison is
        # skipped in that case instead of throwing a TypeError.
        "validate_doc_update": """\
function(newDoc, oldDoc, userCtx, secObj) {
    if(newDoc._deleted) {
       return;
    }

    if(!newDoc.configuration.a0.length) {
        throw { forbidden: "just because" };
    }

    // nothing to compare against when the document is being created
    if(!oldDoc) {
        return;
    }

    // pointless time wasting
    var less = 0;
    for(var ni in newDoc.configuration.a0) {
        for(var oi in oldDoc.configuration.a0) {
            if(newDoc.configuration.a0[ni] < oldDoc.configuration.a0[oi]) {
                less += 1
            }
        }
    }
}"""
    }

    # Emit a random key in "0".."slices-1".  Math.floor keeps every key
    # inside the range of slices the editors query; Math.round would also
    # emit str(slices), a key no editor ever reads.
    map_fn = """\
function(doc) {{
    emit(''+Math.floor(Math.random()*{count}));
}}""".format(count=slices)

    # Randomises the configuration and appends to a0.  With ?pretend the
    # handler returns [null, rsp] (nothing stored) and the modified document
    # as the response body; otherwise it returns [doc, rsp] with an empty
    # JSON body.
    update_fn = """\
function(doc, req) {
    doc.configuration.k0 = Math.random();
    doc.configuration.k1 = Math.random();
    doc.configuration.k2 = Math.random();
    doc.configuration.a0.push(Math.random());

    if(req.query.pretend) {
        var rsp = {
            code: 200,
            headers: {
                'Content-Type': 'application/json'
            },
            body: JSON.stringify(doc)
        };

        return([null, rsp]);
    }
    else {
        var rsp = {
            code: 200,
            headers: {
                'Content-Type': 'application/json'
            },
            body: JSON.stringify({})
        };

        return([doc, rsp]);
    }
}"""

    for x in range(maps):
        ddoc["views"]["map{x}".format(x=x)] = {
            "map": map_fn,
            "reduce": "_count"
        }
        ddoc["updates"]["update{x}".format(x=x)] = update_fn

    return ddoc


def gendoc(doc=None):
    """\
    Generate a document, or re-randomise an existing one.

    When ``doc`` is None (or otherwise falsy) a new document is started with
    id ``doc-<uuid4>``, an empty ``configuration.a0`` list and an empty
    ``longblob``.  The document then receives:

    * ``garbage``: a fresh dict of 0-10 entries, each a dict of 0-10 random
      base64 strings keyed by random base64 strings;
    * ``configuration``: rebuilt with random base64 ``k0``, ``k1`` and ``k2``
      values and the previous ``a0`` list plus one new random float.  Other
      configuration keys are dropped; a missing configuration counts as
      empty.

    :param doc: document dict to update in place, or None for a new one.
    :returns: the updated document (the same object when ``doc`` was given).
    """
    def noise():
        # base64.b64encode() needs bytes on Python 3 (a str only works on
        # 2.7); encoding then decoding keeps the result text on both.
        raw = str(random.random()).encode("ascii")
        return base64.b64encode(raw).decode("ascii")

    doc = doc or {
        "_id": "doc-" + str(uuid.uuid4()),
        "configuration": {
            "a0": []
        },
        "longblob": ""
    }

    garbage = {}
    for _ in range(int(round(random.random() * 10))):
        section = {}
        garbage[noise()] = section
        for _ in range(int(round(random.random() * 10))):
            section[noise()] = noise()
    doc["garbage"] = garbage

    previous = doc.get("configuration") or {}
    doc["configuration"] = {
        "k0": noise(),
        "k1": noise(),
        "k2": noise(),
        "a0": list(previous.get("a0", [])) + [random.random()]
    }

    return doc


def reset_database():
    """\
    Reset the database.

    Drops DB_NAME (a missing database is ignored) and recreates it. It then
    seeds DB_DOCS documents from gendoc() in bulk saves of at most BULK_SIZE
    documents, and finally saves DDOC_COUNT design documents from genddoc().
    """
    dbs = getserver()

    # delete any existing db instance
    try:
        dbs.delete_db(DB_NAME)
    except couchdbkit.exceptions.ResourceNotFound:
        pass
    # and create new empty db
    dbs.create_db(DB_NAME)

    dbc = getdb(dbs)

    # seed database with some documents; min() keeps every batch within
    # BULK_SIZE and never steps the counter below zero, whatever BULK_SIZE is
    remaining = DB_DOCS
    while remaining > 0:
        make = min(remaining, BULK_SIZE)
        remaining -= make
        dbc.save_docs([gendoc() for _ in range(make)])

    # create design documents
    dbc.save_docs([genddoc(x) for x in range(DDOC_COUNT)])


def start_editors():
    """\
    Create the thread classes which will connect the view/update, start
    them and wait for them to finish.

    One Editor thread is created per (design document, view, slice) combination.
    Each editor walks the rows its view returns for its slice key and edits
    every referenced document through the matching update handler. It works in
    one of two modes:

    * single operator: posts directly to the handler;
    * bulk operator (chosen with probability 0.8): fetches the handler's
      ``pretend`` output, re-randomises it with gendoc() and saves it in bulk
      batches of BULK_SIZE documents.

    Progress is polled every 10 seconds until no editor has edits remaining.
    Ctrl-C asks every editor to stop, then waits 10 seconds before returning.
    """

    class Editor(threading.Thread):
        """Thread editing the documents of one view slice."""

        def __init__(self, ddoc, view, slice_no):
            # runwrap is the thread body, passed as target.  Editor must not
            # override run(): the default Thread.run() is what invokes target,
            # so an override would bypass runwrap and its `finally`.
            super(Editor, self).__init__(target=self.runwrap)
            self.daemon = True
            self.ddoc = ddoc
            self.view = view
            self.slice = slice_no
            self.id = "ddoc{ddoc}/view{view}/{slice}".format(
                ddoc=ddoc, view=view, slice=slice_no)
            # one handle for reading the view, another for posting updates
            self.dbcv = getdb()
            self.dbcu = getdb()
            self.stop = False
            self.remaining = EDITS_PER_MAP

            if random.random() < 0.8:
                print("{id} is bulk operator ({n} edits scheduled)".format(id=self.id, n=self.remaining))
                self.bulk = True
                self.batch = []
            else:
                print("{id} is single operator ({n} edits scheduled)".format(id=self.id, n=self.remaining))
                self.bulk = False

        def runwrap(self):
            # Always zero `remaining`, so the polling loop below terminates
            # even when the slice has fewer rows than EDITS_PER_MAP or an
            # error ends this editor early.
            try:
                self.edit_slice()
                print("editor {id} completed scheduled edits".format(id=self.id))
            except Exception as e:
                print("error in editor {id}: {err!r}".format(id=self.id, err=e))
            finally:
                self.remaining = 0

        def edit_slice(self):
            # Every row the view emits under this editor's slice key
            # (UPDATE_LIMIT is deliberately not applied).
            rows = self.dbcv.view(
                "ddoc{ddoc}/map{view}".format(ddoc=self.ddoc, view=self.view),
                key=str(self.slice),
                reduce="false")
            for v in rows:
                self.update(v)
                if self.remaining == 0 or self.stop:
                    break

        def update(self, v):
            path = "_design/ddoc{ddoc}/_update/update{u}/{id}".format(
                ddoc=self.ddoc, u=self.view, id=v["id"])
            try:
                if self.bulk:
                    # NOTE(review): a partially filled batch is never saved
                    # when the editor stops, although those edits have
                    # already been counted against `remaining`.
                    pretend = self.dbcu.res.post(path=path, pretend="true").json_body
                    self.batch.append(gendoc(pretend))
                    if len(self.batch) == BULK_SIZE:
                        try:
                            # presumably widens the window in which concurrent
                            # edits conflict -- confirm intent
                            time.sleep(2.5)
                            self.dbcu.save_docs(self.batch)
                        finally:
                            self.batch = []
                else:
                    self.dbcu.res.post(path=path)
            except (couchdbkit.exceptions.ResourceConflict, couchdbkit.exceptions.BulkSaveError):
                # conflicts are expected under contention; not counted
                pass
            else:
                self.remaining -= 1

        def stopit(self):
            self.stop = True

    editors = [
        Editor(ddoc, view, slice_no)
        for ddoc in range(DDOC_COUNT)
        for view in range(DDOC_MAPS)
        for slice_no in range(UPDATE_SLICES)
    ]

    try:
        print("starting editors")
        for editor in editors:
            editor.start()
        while any(editor.remaining for editor in editors):
            print([editor.remaining for editor in editors if editor.remaining])
            print("waiting for editors to complete")
            time.sleep(10)
    except KeyboardInterrupt:
        print("interrupting editors")
        for editor in editors:
            editor.stopit()
        time.sleep(10)


def check_for_duplicates():
    """\
    Examine the _all_docs list to see if any id is duplicated.

    Prints each document id that occurs more than once in _all_docs.

    :returns: dict mapping every duplicated id to its number of occurrences
        (empty when all ids are unique).
    """
    from collections import Counter

    dbc = getdb()
    occurrences = Counter(row["id"] for row in dbc.all_docs())
    duplicates = {k: v for k, v in occurrences.items() if v > 1}
    for k in duplicates:
        print(k)
    return duplicates


if __name__ == "__main__":
    # Run the three phases in order, announcing each before it starts.
    steps = (
        ("resetting the database", reset_database),
        ("starting editors", start_editors),
        ("testing for duplicate ids in _all_docs", check_for_duplicates),
    )
    for banner, step in steps:
        print(banner)
        step()
" | |
} | |
doc["garbage"] = {} | |
for x in range(0, int(round(random.random() * 10))): | |
xkey = base64.b64encode(str(random.random())) | |
doc["garbage"][xkey] = {} | |
for y in range(0, int(round(random.random() * 10))): | |
doc["garbage"][xkey][base64.b64encode(str(random.random()))] = base64.b64encode(str(random.random())) | |
doc["configuration"] = { | |
"k0": base64.b64encode(str(random.random())), | |
"k1": base64.b64encode(str(random.random())), | |
"k2": base64.b64encode(str(random.random())), | |
"a0": doc["configuration"].get("a0", []) + [random.random()] | |
} | |
return doc | |
def reset_database():
    """\
    Reset the database.

    Drops DB_NAME (a missing database is ignored) and recreates it. It then
    seeds DB_DOCS documents from gendoc() in bulk saves of at most BULK_SIZE
    documents, and finally saves DDOC_COUNT design documents from genddoc().
    """
    dbs = getserver()

    # delete any existing db instance
    try:
        dbs.delete_db(DB_NAME)
    except couchdbkit.exceptions.ResourceNotFound:
        pass
    # and create new empty db
    dbs.create_db(DB_NAME)

    dbc = getdb(dbs)

    # seed database with some documents; min() keeps every batch within
    # BULK_SIZE and never steps the counter below zero, whatever BULK_SIZE is
    remaining = DB_DOCS
    while remaining > 0:
        make = min(remaining, BULK_SIZE)
        remaining -= make
        dbc.save_docs([gendoc() for _ in range(make)])

    # create design documents
    dbc.save_docs([genddoc(x) for x in range(DDOC_COUNT)])
def start_editors():
    """\
    Create the thread classes which will connect the view/update, start
    them and wait for them to finish.

    One Editor thread is created per (design document, view, slice) combination.
    Each editor walks the rows its view returns for its slice key and edits
    every referenced document through the matching update handler. It works in
    one of two modes:

    * single operator: posts directly to the handler;
    * bulk operator (chosen with probability 0.8): fetches the handler's
      ``pretend`` output, re-randomises it with gendoc() and saves it in bulk
      batches of BULK_SIZE documents.

    Progress is polled every 10 seconds until no editor has edits remaining.
    Ctrl-C asks every editor to stop, then waits 10 seconds before returning.
    """

    class Editor(threading.Thread):
        """Thread editing the documents of one view slice."""

        def __init__(self, ddoc, view, slice_no):
            # runwrap is the thread body, passed as target.  Editor must not
            # override run(): the default Thread.run() is what invokes target,
            # so an override would bypass runwrap and its `finally`.
            super(Editor, self).__init__(target=self.runwrap)
            self.daemon = True
            self.ddoc = ddoc
            self.view = view
            self.slice = slice_no
            self.id = "ddoc{ddoc}/view{view}/{slice}".format(
                ddoc=ddoc, view=view, slice=slice_no)
            # one handle for reading the view, another for posting updates
            self.dbcv = getdb()
            self.dbcu = getdb()
            self.stop = False
            self.remaining = EDITS_PER_MAP

            if random.random() < 0.8:
                print("{id} is bulk operator ({n} edits scheduled)".format(id=self.id, n=self.remaining))
                self.bulk = True
                self.batch = []
            else:
                print("{id} is single operator ({n} edits scheduled)".format(id=self.id, n=self.remaining))
                self.bulk = False

        def runwrap(self):
            # Always zero `remaining`, so the polling loop below terminates
            # even when the slice has fewer rows than EDITS_PER_MAP or an
            # error ends this editor early.
            try:
                self.edit_slice()
                print("editor {id} completed scheduled edits".format(id=self.id))
            except Exception as e:
                print("error in editor {id}: {err!r}".format(id=self.id, err=e))
            finally:
                self.remaining = 0

        def edit_slice(self):
            # Every row the view emits under this editor's slice key
            # (UPDATE_LIMIT is deliberately not applied).
            rows = self.dbcv.view(
                "ddoc{ddoc}/map{view}".format(ddoc=self.ddoc, view=self.view),
                key=str(self.slice),
                reduce="false")
            for v in rows:
                self.update(v)
                if self.remaining == 0 or self.stop:
                    break

        def update(self, v):
            path = "_design/ddoc{ddoc}/_update/update{u}/{id}".format(
                ddoc=self.ddoc, u=self.view, id=v["id"])
            try:
                if self.bulk:
                    # NOTE(review): a partially filled batch is never saved
                    # when the editor stops, although those edits have
                    # already been counted against `remaining`.
                    pretend = self.dbcu.res.post(path=path, pretend="true").json_body
                    self.batch.append(gendoc(pretend))
                    if len(self.batch) == BULK_SIZE:
                        try:
                            # presumably widens the window in which concurrent
                            # edits conflict -- confirm intent
                            time.sleep(2.5)
                            self.dbcu.save_docs(self.batch)
                        finally:
                            self.batch = []
                else:
                    self.dbcu.res.post(path=path)
            except (couchdbkit.exceptions.ResourceConflict, couchdbkit.exceptions.BulkSaveError):
                # conflicts are expected under contention; not counted
                pass
            else:
                self.remaining -= 1

        def stopit(self):
            self.stop = True

    editors = [
        Editor(ddoc, view, slice_no)
        for ddoc in range(DDOC_COUNT)
        for view in range(DDOC_MAPS)
        for slice_no in range(UPDATE_SLICES)
    ]

    try:
        print("starting editors")
        for editor in editors:
            editor.start()
        while any(editor.remaining for editor in editors):
            print([editor.remaining for editor in editors if editor.remaining])
            print("waiting for editors to complete")
            time.sleep(10)
    except KeyboardInterrupt:
        print("interrupting editors")
        for editor in editors:
            editor.stopit()
        time.sleep(10)
def check_for_duplicates():
    """\
    Examine the _all_docs list to see if any id is duplicated.

    Prints an ``(id, count)`` tuple for each document id that occurs more
    than once in _all_docs.

    :returns: dict mapping every duplicated id to its number of occurrences
        (empty when all ids are unique).
    """
    from collections import Counter

    dbc = getdb()
    occurrences = Counter(row["id"] for row in dbc.all_docs())
    duplicates = {k: v for k, v in occurrences.items() if v > 1}
    for k, v in duplicates.items():
        print((k, v))
    return duplicates
if __name__ == "__main__":
    # Run the three phases in order, announcing each before it starts.
    steps = (
        ("resetting the database", reset_database),
        ("starting editors", start_editors),
        ("testing for duplicate ids in _all_docs", check_for_duplicates),
    )
    for banner, step in steps:
        print(banner)
        step()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.
Produced for https://issues.apache.org/jira/browse/COUCHDB-2735