Skip to content

Instantly share code, notes, and snippets.

@chewbranca
Created June 17, 2014 21:59
Show Gist options
  • Save chewbranca/9117f0871477b88b022a to your computer and use it in GitHub Desktop.
Save chewbranca/9117f0871477b88b022a to your computer and use it in GitHub Desktop.
Rough script to test migrating a file from CouchDB --> BigCouch
#!/usr/bin/env python
import os
import sys
import json
import time
import shutil
import inspect
import requests
# Base URLs used by get_db_urls() / get_dbs_db_url() to build request URLs.
# NOTE(review): presumably 5984 is the stand-alone CouchDB, 15984 the clustered
# BigCouch interface and 15986 a single node's own interface -- confirm.
couch_host = "http://localhost:5984"
clustered_host = "http://localhost:15984"
single_node_host = "http://localhost:15986"
# Default ``auth`` value http() passes to requests when the caller gives none.
parent_auth = ()
# log() only prints while this is true.
DEBUG = True
# Not read by the functions in this file; each test derives its own database
# name from its function name via gen_test_db().
dbname = "test_couch_files"
# Shard range directory and shard file suffix used by node_shard_path().
shard_range = "00000000-ffffffff"
shard_suffix = ".1401320900"
# The characters of shard_suffix as ASCII codes; dbs_doc() stores this list
# under the "shard_suffix" key. Keep it in sync with shard_suffix.
shard_suffix_arr = [46, 49, 52, 48, 49, 51, 50, 48, 57, 48, 48]
# Machine-specific checkout locations; the on-disk paths are built from these.
src_dir = "/Users/russell/src"
couch_db_path = src_dir + "/couchdb/tmp/lib"
def node_db_path(i=1):
    """Return the data directory path of BigCouch dev node number *i*.

    The path is rooted at the module-level ``src_dir``.
    """
    return "{root}/bigcouch/dev/lib/node{index}/data".format(root=src_dir, index=i)
def node_shards_path(i=1, s_range=None):
    """Return node *i*'s data directory, or its shard directory for *s_range*.

    With ``s_range`` left as None the plain data directory from
    node_db_path() is returned; otherwise ``/shards/<s_range>`` is appended.
    """
    base = node_db_path(i)
    if s_range is None:
        return base
    return base + "/shards/" + s_range
def node_shard_path(dbname, i=1):
    """Return the path of the ``.couch`` shard file for *dbname* on node *i*.

    The directory comes from node_shards_path() for the module-level
    ``shard_range``; the file name is ``<dbname><shard_suffix>.couch``.
    """
    directory = node_shards_path(i, shard_range)
    filename = "{0}{1}.couch".format(dbname, shard_suffix)
    return "{0}/{1}".format(directory, filename)
def http(method, url, data=None, auth=None, headers=None, assertion=None):
    """Send one HTTP request through ``requests`` and return ``(status, body)``.

    Parameters:
        method    -- name of the ``requests`` function to call ('get', 'put', ...).
        url       -- full URL to request.
        data      -- optional object; JSON-encoded before sending.
        auth      -- auth value handed to requests; defaults to ``parent_auth``.
        headers   -- request headers; defaults to a JSON Content-Type header.
        assertion -- optional expected status code.

    Returns a tuple of the response status code and the decoded JSON body.
    If the body is not valid JSON the raw response text is returned instead,
    so the status code is never hidden behind a decode error.

    Raises AssertionError when ``assertion`` is given and the status differs.
    """
    meth = getattr(requests, method)
    if headers is None:
        headers = {"Content-Type": "application/json"}
    if data is not None:
        data = json.dumps(data)
    if auth is None:
        auth = parent_auth
    log("HTTP[{0}]: {1}".format(method, url))
    resp = meth(url, data=data, auth=auth, headers=headers)

    # Decode the body once; it is needed for both the error message and the
    # return value. A non-JSON body makes resp.json() raise ValueError.
    try:
        body = resp.json()
    except ValueError:
        body = resp.text

    # Only build the (comparatively expensive) failure message on a mismatch,
    # and raise explicitly so the check also runs under ``python -O``.
    if assertion is not None and assertion != resp.status_code:
        try:
            name = auth[0] if len(auth) == 2 else "undefined"
        except TypeError:
            # auth may be an object without a length rather than a tuple.
            name = "undefined"
        # func_to_db() is called from this frame so it names http()'s caller.
        msg = "ERROR IN {5}: {0} {1} - {6} || expected ({2}), but received ({3}): {4}".format(
            method.upper(), url, assertion, resp.status_code, body, func_to_db(), name)
        raise AssertionError(msg)
    return resp.status_code, body
def log(msg):
    """Print *msg* prefixed with the calling function's name while DEBUG is set.

    Nothing is printed when the module-level ``DEBUG`` flag is false.
    """
    if not DEBUG:
        return
    # func_to_db() is called directly from this frame so that it resolves to
    # the function that called log(). A parenthesised single argument prints
    # identically under Python 2 and Python 3.
    print("[DEBUG::{0}] {1}".format(func_to_db(), msg))
def func_to_db():
    """Return the name of the function two frames above this call.

    Frame 0 is func_to_db itself and frame 1 is the helper that called it, so
    frame 2 is the helper's caller. If the stack holds fewer than three
    frames, the outermost available frame's name is returned instead of
    raising IndexError.
    """
    # context=0: only the function name is needed, so skip reading source
    # lines for every frame on the stack.
    frames = inspect.stack(0)
    depth = min(2, len(frames) - 1)
    return frames[depth][3]
def gen_test_db():
    """Return the calling function's name, used by the tests as a database name.

    Only the name is produced; the database itself is not created here.
    """
    # Must call func_to_db() directly: it looks two frames up, which from
    # here is the function that called gen_test_db().
    return func_to_db()
def passed_test():
    """Log a ``<caller> passed`` message for the function that called this."""
    caller = func_to_db()
    log("{0} passed".format(caller))
def delete_db(db):
    """Send an HTTP DELETE to the database URL *db* and return http()'s result.

    The status code is not checked here.
    """
    result = http('delete', db)
    return result
def create_db(db, cycle=True):
    """Create the database at URL *db*, requiring a 201 response.

    When *cycle* is true the database is deleted first; the outcome of that
    delete is not checked.
    """
    if cycle:
        delete_db(db)
    http('put', db, assertion=201)
def save_ddoc(url, ddoc):
    """PUT the document *ddoc* to *url* and print the resulting status code.

    The status is only reported, not checked.
    """
    code, _resp = http('put', url, data=ddoc)
    # Single-argument print call: same output on Python 2 and Python 3.
    print("Saving ddoc({0}): {1}".format(code, url))
def delete_doc(url, force=False):
    """Delete the document at *url* using its current revision.

    Returns True when the document was deleted (the delete must answer 200),
    or when it did not exist (404) and *force* is false. Any other outcome,
    including a 404 while *force* is true, raises Exception.
    """
    status, resp = http('get', url)
    if status == 200:
        http('delete', url + '?rev=' + resp['_rev'], assertion=200)
        return True
    if status == 404 and not force:
        return True
    raise Exception("Error deleting doc({0}): {1}".format(status, resp))
def dbs_doc(dbname):
    """Build the shard-map document for *dbname*.

    The document maps the single module-level ``shard_range`` to three local
    nodes (and each node back to that range), carries ``shard_suffix_arr``
    and uses *dbname* as its ``_id``.
    """
    nodes = ("node1@127.0.0.1", "node2@127.0.0.1", "node3@127.0.0.1")
    return {
        "by_range": {shard_range: list(nodes)},
        "by_node": {node: [shard_range] for node in nodes},
        "shard_suffix": shard_suffix_arr,
        "_id": dbname,
    }
def file_exists(filename):
    """Return True when *filename* names an existing path, else False.

    ``None`` is treated as "no such file" rather than raising TypeError.
    """
    if filename is None:
        return False
    return os.path.exists(filename)
def get_db_urls(dbname):
    """Return the URLs of *dbname* on each configured host.

    The result is a 3-tuple ordered as (couch_host URL, single_node_host URL,
    clustered_host URL).
    """
    hosts = (couch_host, single_node_host, clustered_host)
    return tuple("{0}/{1}".format(base, dbname) for base in hosts)
def get_dbs_db_url(dbname):
    """Return the URL of the document named *dbname* in the ``dbs`` database
    on ``single_node_host``."""
    return "{host}/dbs/{doc}".format(host=single_node_host, doc=dbname)
# Setup hint for the shard directories the clustered paths below point into:
# mkdir -p dev/lib/node{1,2,3}/data/shards/00000000-ffffffff
def get_db_paths(dbname):
    """Return the on-disk ``.couch`` file paths for *dbname*.

    The result is a 3-tuple: the file under ``couch_db_path``, the file in
    node 1's data directory, and a list with the shard file path on nodes
    1, 2 and 3.
    """
    couch_file = "{0}/{1}.couch".format(couch_db_path, dbname)
    node_file = "{0}/{1}.couch".format(node_db_path(), dbname)
    shard_files = []
    for node in (1, 2, 3):
        shard_files.append(node_shard_path(dbname, node))
    return (couch_file, node_file, shard_files)
def test_single_node():
    """Copy an empty database file into node 1's data directory and read it.

    Creates the database on ``couch_host``, copies its file to the single
    node's data directory, then checks that the single node serves it with
    ``update_seq`` and ``doc_count`` both 0.
    """
    # gen_test_db() must be called from this frame: it names the database
    # after the calling function.
    dbname = gen_test_db()
    source_url, node_url, _clustered_url = get_db_urls(dbname)
    create_db(source_url)

    source_file, node_file, _shard_files = get_db_paths(dbname)
    assert file_exists(source_file)

    # Start from a clean slate on the target node.
    delete_db(node_url)
    assert not file_exists(node_file)

    log("Copy db file: cp {0} {1}".format(source_file, node_file))
    shutil.copy(source_file, node_file)
    assert file_exists(node_file)

    status, info = http('get', node_url, assertion=200)
    log("RESP[{0}]: {1}".format(status, info))
    assert info['update_seq'] == 0
    assert info['doc_count'] == 0
def test_single_node_with_doc():
    """Copy a one-document database file into node 1's data directory.

    Creates the database on ``couch_host``, stores one document, copies the
    file to the single node's data directory, then checks that the single
    node reports one update and one document and serves that document.
    """
    # gen_test_db() must be called from this frame: it names the database
    # after the calling function.
    dbname = gen_test_db()
    source_url, node_url, _clustered_url = get_db_urls(dbname)
    doc_id = "asdf"
    create_db(source_url)
    http('put', source_url + "/" + doc_id, data={"foo": "bar"}, assertion=201)

    source_file, node_file, _shard_files = get_db_paths(dbname)
    assert file_exists(source_file)

    # Start from a clean slate on the target node.
    delete_db(node_url)
    assert not file_exists(node_file)

    source_stats = os.stat(source_file)
    log("HOST FILE STATS({2}) -- [{0}]: {1}".format(
        source_stats.st_size, source_stats, source_file))
    log("Copy db file: cp {0} {1}".format(source_file, node_file))
    shutil.copy(source_file, node_file)
    assert file_exists(node_file)
    copied_stats = os.stat(node_file)
    log("FILE STATS[{0}]: {1}".format(copied_stats.st_size, copied_stats))

    status, info = http('get', node_url, assertion=200)
    log("RESP[{0}]: {1}".format(status, info))
    assert info['update_seq'] == 1
    assert info['doc_count'] == 1

    status, doc = http('get', node_url + "/" + doc_id, assertion=200)
    log("RESP B[{0}]: {1}".format(status, doc))
    assert doc["_id"] == doc_id
    assert doc["foo"] == "bar"
def test_clustered_node():
    """Copy an empty database file into every node's shard directory.

    Creates the database on ``couch_host``, replaces the shard-map document
    in the ``dbs`` database, copies the database file to the shard path on
    each node, then checks that the clustered host reports sequence number 0
    (with a non-empty second ``update_seq`` element) and no documents.
    """
    # gen_test_db() must be called from this frame: it names the database
    # after the calling function.
    dbname = gen_test_db()
    host_db, _single_node_db, clustered_db = get_db_urls(dbname)
    dbs_db_doc_url = get_dbs_db_url(dbname)
    dbs_db_doc = dbs_doc(dbname)
    create_db(host_db)

    host_file, _single_node_file, clustered_files = get_db_paths(dbname)
    log("CLUSTERED FILES: {}".format(clustered_files))
    assert file_exists(host_file)

    # Replace any previous shard map for this database.
    delete_doc(dbs_db_doc_url)
    http('put', dbs_db_doc_url, data=dbs_db_doc, assertion=201)

    log("Copy db file: cp {0} {1}".format(host_file, clustered_files))
    for shard_file in clustered_files:
        shutil.copy(host_file, shard_file)
    for shard_file in clustered_files:
        assert file_exists(shard_file)

    status, resp = http('get', clustered_db, assertion=200)
    log("RESP[{0}]: {1}".format(status, resp))
    # The clustered update_seq is a two-element pair; the first is the number.
    num, extra = resp['update_seq']
    assert num == 0
    assert len(extra) > 0
    assert resp['doc_count'] == 0
def test_clustered_node_with_doc():
    """Copy a one-document database file into every node's shard directory.

    Creates the database on ``couch_host`` and stores one document, deletes
    the clustered database, copies the database file to the shard path on
    each node, writes the shard-map document, then checks that the clustered
    host reports sequence number 1 and one document.
    """
    # gen_test_db() must be called from this frame: it names the database
    # after the calling function.
    dbname = gen_test_db()
    host_db, _single_node_db, clustered_db = get_db_urls(dbname)
    doc_id = "asdf"
    dbs_db_doc_url = get_dbs_db_url(dbname)
    dbs_db_doc = dbs_doc(dbname)
    create_db(host_db)
    http('put', host_db + "/" + doc_id, data={"foo": "bar"}, assertion=201)

    host_file, _single_node_file, clustered_files = get_db_paths(dbname)
    log("CLUSTERED FILES: {}".format(clustered_files))
    assert file_exists(host_file)

    # Unlike test_clustered_node, the clustered database itself is deleted
    # and the shard map is only written after the files are in place.
    delete_db(clustered_db)

    log("Copy db file: cp {0} {1}".format(host_file, clustered_files))
    for shard_file in clustered_files:
        shutil.copy(host_file, shard_file)
    for shard_file in clustered_files:
        assert file_exists(shard_file)

    # NOTE(review): presumably gives the nodes time to settle after the
    # delete and file copy before the shard map is written -- confirm.
    time.sleep(2)
    http('put', dbs_db_doc_url, data=dbs_db_doc, assertion=201)

    status, resp = http('get', clustered_db, assertion=200)
    log("RESP[{0}]: {1}".format(status, resp))
    # The clustered update_seq is a two-element pair; the first is the number.
    num, extra = resp['update_seq']
    assert num == 1
    assert len(extra) > 0
    assert resp['doc_count'] == 1
def run_test(name, test):
    """Log that the test called *name* is starting, then call *test*."""
    banner = "Running test: {}".format(name)
    log(banner)
    test()
def main(test=None):
    """Run one named test, or every ``test_*`` function in this module.

    Parameters:
        test -- name of a single module-level function to run; when None,
                all callables whose name starts with ``test_`` are run in
                the name order returned by inspect.getmembers().

    Raises AttributeError when *test* names nothing in this module.
    """
    # Single-argument print call: same output on Python 2 and Python 3.
    print("Running tests({})".format(test))
    module = sys.modules[__name__]
    if test is not None:
        run_test(test, getattr(module, test))
        return
    for name, member in inspect.getmembers(module):
        # Skip non-callable attributes that merely share the test_ prefix.
        if name.startswith("test_") and callable(member):
            run_test(name, member)
if __name__ == "__main__":
    # Exactly one command-line argument selects a single test by name; any
    # other argument count runs every test (main's default of None).
    main(sys.argv[1] if len(sys.argv) == 2 else None)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment