CouchDB Podman Compose
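A three-node CouchDB cluster on a fixed-address bridge network, driven by a workload container that stampedes it with document writes, view queries, revs-limit churn, and optional compaction. Start everything with podman-compose up --build. The compose file, the workload image's Dockerfile, and workload.py follow.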
# podman-compose up --build
version: '3.5'
services:
  couchdb1:
    image: couchdb:latest
    container_name: couchdb1
    environment:
      - "COUCHDB_USER=${COUCHDB_USER:-adm}"
      - "COUCHDB_PASSWORD=${COUCHDB_PASSWORD:-pass}"
      - "COUCHDB_SECRET=${COUCHDB_SECRET:-123456}"
      - "COUCHDB_ERLANG_COOKIE=${COUCHDB_ERLANG_COOKIE:-thecookie}"
      - "NODENAME=10.0.0.11"
    networks:
      couchdb-cluster:
        ipv4_address: 10.0.0.11
  couchdb2:
    image: couchdb:latest
    container_name: couchdb2
    environment:
      - "COUCHDB_USER=${COUCHDB_USER:-adm}"
      - "COUCHDB_PASSWORD=${COUCHDB_PASSWORD:-pass}"
      - "COUCHDB_SECRET=${COUCHDB_SECRET:-123456}"
      - "COUCHDB_ERLANG_COOKIE=${COUCHDB_ERLANG_COOKIE:-thecookie}"
      - "NODENAME=10.0.0.12"
    networks:
      couchdb-cluster:
        ipv4_address: 10.0.0.12
  couchdb3:
    image: couchdb:latest
    container_name: couchdb3
    environment:
      - "COUCHDB_USER=${COUCHDB_USER:-adm}"
      - "COUCHDB_PASSWORD=${COUCHDB_PASSWORD:-pass}"
      - "COUCHDB_SECRET=${COUCHDB_SECRET:-123456}"
      - "COUCHDB_ERLANG_COOKIE=${COUCHDB_ERLANG_COOKIE:-thecookie}"
      - "NODENAME=10.0.0.13"
    networks:
      couchdb-cluster:
        ipv4_address: 10.0.0.13
  workload:
    image: python:3.11-slim-bullseye
    container_name: workload
    build: .
    command: >-
      python workload.py
      -u http://10.0.0.11:5984
      -u http://10.0.0.12:5984
      -u http://10.0.0.13:5984
      -s couchdb@10.0.0.11
      -s couchdb@10.0.0.12
      -s couchdb@10.0.0.13
      -n 10000
      -w 6
      -t 4
      -k
    depends_on:
      - couchdb1
      - couchdb2
      - couchdb3
    networks:
      couchdb-cluster:
        ipv4_address: 10.0.0.14
networks:
  couchdb-cluster:
    driver: bridge
    ipam:
      config:
        - subnet: 10.0.0.0/24
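Once the stack is up, a quick way to confirm that all three nodes joined is to query _membership on any node. A minimal sketch, assuming the default adm:pass credentials and the addresses above; since the 10.0.0.0/24 addresses are not published to the host, run it from a container attached to the couchdb-cluster network:

# cluster_check.py -- hypothetical helper, not part of this gist
import requests

resp = requests.get('http://10.0.0.11:5984/_membership',
                    auth=('adm', 'pass'), timeout=10)
resp.raise_for_status()
membership = resp.json()
print('cluster_nodes:', membership['cluster_nodes'])
print('all_nodes:', membership['all_nodes'])

All three couchdb@10.0.0.1x names should appear in both lists once setup_cluster() in workload.py has run.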
Dockerfile
FROM python:3.11-slim-bullseye
# The workload image only needs requests; copy the gist contents
# (including workload.py) into the image's working directory.
RUN pip install --no-cache-dir requests
COPY . .
workload.py
#!/usr/bin/env python
#
# ./workload.py -x 2 -n 10 -t 2 -w 10

import copy
import sys
import time
import os
import argparse
import traceback
import random

import requests
from requests.exceptions import HTTPError
from requests.adapters import HTTPAdapter, Retry

import multiprocessing as mp
from multiprocessing.dummy import Pool as ThreadPool
from multiprocessing import Pool

BATCH = 1000
VIEW_BATCH = 500

# Revs limit will be lowered and increased between these values
MIN_REVS_LIMIT = 900
MAX_REVS_LIMIT = 1000

# Q, the default is 2
Q = 8

# When using a cluster, N = number of nodes
N = 3

USER = 'adm'
PASS = 'pass'

VIEW = 'v1'
WITH_REDUCE = False

TIMEOUT = 60
REQUESTS_RETRY_COUNT = 9
REQUESTS_RETRY_CODES = [500, 502, 503, 504]
REQUESTS_RETRY_BACKOFF_FACTOR = 0.1

CONFIG = {
    'couchdb': {
        'max_dbs_open': '5000'
    },
    'cluster': {
        'q': str(Q),
        'n': str(N)
    },
    'log': {
        'level': 'notice'
    },
    'query_server_config': {
        'os_process_timeout': 'infinity',
        'os_process_limit': '500'
    }
}

SETUP_NODES = [
    'couchdb@10.0.0.11',
    'couchdb@10.0.0.12',
    'couchdb@10.0.0.13'
]

DB_URLS = [
    'http://10.0.0.11:5984',
    'http://10.0.0.12:5984',
    'http://10.0.0.13:5984'
]

DB_NAME = 'db'

def log(*args):
    sargs = []
    for a in args:
        try:
            sargs.append(str(a))
        except Exception:
            sargs.append('?')
    msg = " ".join(sargs)
    sys.stderr.write(msg + '\n')
    sys.stderr.flush()


def pick_server(urls):
    if isinstance(urls, list):
        return random.choice(urls)
    return urls


def get_design_doc(docnum, key):
    ddoc = {
        "_id": "_design/%s" % docnum,
        "autoupdate": False,
        "views": {VIEW: {"map": 'function(d){emit(1,1);}'}}
    }
    if WITH_REDUCE:
        ddoc['views'][VIEW]['reduce'] = '''
            function(keys, values, rereduce) {
                if (rereduce) {
                    return sum(values);
                } else {
                    return 2 * values.length;
                }
            }
        '''
    return ddoc

class Server:
    def __init__(self, args, url, timeout=TIMEOUT):
        self.sess = requests.Session()
        retries = Retry(
            total=REQUESTS_RETRY_COUNT,
            backoff_factor=REQUESTS_RETRY_BACKOFF_FACTOR,
            status_forcelist=REQUESTS_RETRY_CODES
        )
        self.sess.mount('http://', HTTPAdapter(max_retries=retries))
        self.sess.auth = args.auth
        self.url = url.rstrip('/')
        self.timeout = timeout

    def _apply_timeout(self, kw):
        if self.timeout is not None and 'timeout' not in kw:
            kw['timeout'] = self.timeout
        return kw

    def get(self, path='', **kw):
        kw = self._apply_timeout(kw)
        r = self.sess.get(f'{self.url}/{path}', **kw)
        r.raise_for_status()
        return r.json()

    def post(self, path, **kw):
        kw = self._apply_timeout(kw)
        r = self.sess.post(f'{self.url}/{path}', **kw)
        r.raise_for_status()
        return r.json()

    def put(self, path, **kw):
        kw = self._apply_timeout(kw)
        r = self.sess.put(f'{self.url}/{path}', **kw)
        r.raise_for_status()
        return r.json()

    def delete(self, path, **kw):
        kw = self._apply_timeout(kw)
        r = self.sess.delete(f'{self.url}/{path}', **kw)
        r.raise_for_status()
        return r.json()

    def head(self, path, **kw):
        kw = self._apply_timeout(kw)
        r = self.sess.head(f'{self.url}/{path}', **kw)
        return r.status_code

    def version(self):
        return self.get()['version']

    def membership(self):
        return self.get('_membership')

    def cluster_setup(self, req):
        return self.post('_cluster_setup', json=req)

    def create_db(self, dbname):
        if dbname not in self:
            try:
                self.put(dbname, timeout=TIMEOUT)
            except HTTPError as err:
                response = err.response
                # Note: an error Response is falsy, so test for None explicitly
                if response is None:
                    raise Exception(f"{dbname} could not be created")
                if response.status_code == 412:
                    log(f" -> {dbname} PUT returned a 412. DB is already created")
                    return True
                raise
        if dbname not in self:
            raise Exception(f"{dbname} could not be created")
        return True

    def bulk_docs(self, dbname, docs, timeout=TIMEOUT):
        return self.post(f'{dbname}/_bulk_docs', json={'docs': docs}, timeout=timeout)

    def bulk_get(self, dbname, docs, timeout=TIMEOUT):
        return self.post(f'{dbname}/_bulk_get', json={'docs': docs}, timeout=timeout)

    def compact(self, dbname, **kw):
        kw = self._apply_timeout(kw)
        r = self.sess.post(f'{self.url}/{dbname}/_compact', json={}, **kw)
        r.raise_for_status()
        return r.json()

    def set_revs_limit(self, dbname, revs_limit):
        if not isinstance(revs_limit, str):
            revs_limit = str(revs_limit)
        r = self.sess.put(f'{self.url}/{dbname}/_revs_limit', data=revs_limit,
                          timeout=self.timeout)
        r.raise_for_status()
        return r.json()

    def get_revs_limit(self, dbname):
        r = self.sess.get(f'{self.url}/{dbname}/_revs_limit', timeout=self.timeout)
        r.raise_for_status()
        return int(r.json())

    def config_set(self, section, key, val):
        url = f'_node/_local/_config/{section}/{key}'
        return self.put(url, data='"' + val + '"')

    def config_get(self, section, key):
        url = f'_node/_local/_config/{section}/{key}'
        return self.get(url)

    def __iter__(self):
        dbs = self.get('_all_dbs')
        return iter(dbs)

    def __str__(self):
        return "<Server:%s>" % self.url

    def __contains__(self, dbname):
        res = self.head(dbname)
        if res == 200:
            return True
        if res == 404:
            return False
        raise Exception(f"Unexpected head status code {res}")

def execute_query_fun(args, dbname, ddoc, i, srv):
    params = {'limit': str(VIEW_BATCH)}
    try:
        res = srv.get(f'{dbname}/_design/{ddoc}/_view/{VIEW}', params=params)
        count = len(res['rows'])
        log(" **** %s/%s rows: %s" % (dbname, ddoc, count))
    except HTTPError as e:
        log(" ***** %s/%s view call failed with: %s" % (dbname, ddoc, e))


def execute_create_fun(args, pid, tid, i, srv):
    dbname = "%s_%s_%s_%s" % (args.dbname, pid, tid, i)
    revs_limit = MAX_REVS_LIMIT
    num_docs = args.num_docs
    srv.create_db(dbname)
    update_revs_limit(srv, dbname, revs_limit)
    key = [pid, tid, i]
    doc_id = i * num_docs
    all_docs = {}
    ddocs = [get_design_doc(docnum, key) for docnum in range(args.design_docs)]
    bulk_docs_with_retry(srv, dbname, ddocs)
    batches = num_docs // BATCH
    for b in range(batches):
        (doc_id, docs) = generate_docs(doc_id, BATCH, all_docs)
        bulk_docs_with_retry(srv, dbname, docs)
        if args.compact:
            srv.compact(dbname)
        revs_limit = update_revs_limit(srv, dbname, revs_limit)
        log(f" -> batch pid:{pid} tid:{tid} i:{i} batch:{b} revs_limit:{revs_limit}")
    left = num_docs - batches * BATCH
    (doc_id, docs) = generate_docs(doc_id, left, all_docs)
    bulk_docs_with_retry(srv, dbname, docs)
    verify_docs(srv, dbname, all_docs)
    if args.compact:
        srv.compact(dbname)

def bulk_docs_with_retry(srv, dbname, docs):
    try:
        # Server.post() already raises for bad statuses and returns parsed JSON
        return srv.bulk_docs(dbname, docs)
    except HTTPError as err:
        response = err.response
        # Note: an error Response is falsy, so test for None explicitly
        if response is not None and response.status_code == 500:
            # We expect a {{badmatch,{error,timeout}},[{ddoc_cache_entry_validation_funs,recover,..}
            # so we try to mask that
            log(f" -> retrying _bulk_docs due to a 500 error {dbname} {len(docs)} {response}")
            time.sleep(1.0)
            return bulk_docs_with_retry(srv, dbname, docs)
        raise


def update_revs_limit(srv, dbname, revs_limit):
    cur_revs_limit = srv.get_revs_limit(dbname)
    if cur_revs_limit != revs_limit:
        raise Exception(f" !!! Error revs_limit not persisted {cur_revs_limit} != {revs_limit}")
    if revs_limit >= MAX_REVS_LIMIT:
        revs_limit = MIN_REVS_LIMIT
    else:
        revs_limit += 1
    srv.set_revs_limit(dbname, revs_limit)
    return revs_limit

def generate_docs(doc_id, batch, all_docs):
    docs = []
    for i in range(batch):
        doc_id += 1
        doc = generate_doc(doc_id)
        docs.append(doc)
        all_docs[doc['_id']] = doc
    return (doc_id, docs)


def generate_doc(doc_id):
    doc_id_str = '%012d' % doc_id
    return {
        '_id': doc_id_str,
        'data': doc_id
    }


def verify_docs(srv, dbname, all_docs):
    batch = {}
    for doc_id in all_docs:
        doc = all_docs[doc_id]
        batch[doc_id] = doc
        if len(batch) >= BATCH:
            verify_batch(srv, dbname, batch)
            batch = {}
    if batch:
        verify_batch(srv, dbname, batch)

def verify_batch(srv, dbname, docs, retries=100, sleep=2.0):
    getdocs = [{'id': doc['_id']} for doc in docs.values()]
    res = srv.bulk_get(dbname, getdocs)
    if 'results' not in res:
        raise Exception("!!! Invalid _bulk_get response, no 'results' ***")
    results = res['results']
    # row example might be:
    # {
    #     'id': '000000000001',
    #     'docs': [
    #         {'ok': {'_id': '000000000001', '_rev': '1-13..', 'data': 1}}
    #     ]
    # }
    retry_docs = {}
    for row in results:
        row_id = row['id']
        if 'docs' not in row:
            raise Exception(f"!!! Invalid docs for row {row_id}")
        row_docs = row['docs']
        if len(row_docs) != 1:
            raise Exception(f"!!! Row docs length != 1 {row_docs}")
        doc_res = row_docs[0]
        if 'error' in doc_res:
            errobj = doc_res['error']
            is_timeout = errobj.get('reason') == 'timeout'
            is_internal_fabric_error = errobj.get('error') == 'internal_fabric_error'
            if is_timeout or is_internal_fabric_error:
                # Transient fetch failure: queue the doc for a retry and move on,
                # otherwise the 'ok' check below would raise on it
                retry_docs[row_id] = docs[row_id]
                continue
            raise Exception(f"!!! Doc error for row {doc_res}")
        if 'ok' not in doc_res:
            raise Exception(f"!!! Doc error for row {doc_res}")
        doc_body = doc_res['ok']
        all_docs_doc = docs[row_id]
        if all_docs_doc['data'] != doc_body['data']:
            raise Exception(f"!!! Error data doesn't match {all_docs_doc} != {doc_body}")
    if retry_docs:
        log(" Found some doc timeouts during _bulk_get fetch, retrying:", len(retry_docs))
        if retries <= 0:
            raise Exception(f"!!! Ran out of retries with these docs {retry_docs}")
        time.sleep(sleep)
        verify_batch(srv, dbname, retry_docs, retries - 1, sleep=sleep)

def thread_worker(args):
    pid = os.getpid()
    tid = args.tid
    url = pick_server(args.urls)
    srv = Server(args, url)
    srv.version()  # sanity check that the server is reachable
    tries = args.tries
    query = args.query
    if query:
        dbnames = [dbname for dbname in srv if dbname.startswith(args.dbname + '_')]
        if not dbnames:
            log(" >>> no databases matching prefix", args.dbname, "found")
            return
        for i in range(tries):
            dbname = random.choice(dbnames)
            ddoc = random.randrange(0, args.design_docs)
            try:
                execute_query_fun(args, dbname, ddoc, i, srv)
            except Exception as e:
                log(" >>> Query worker exception", e)
                traceback.print_exc(file=sys.stderr)
                continue
    else:
        for i in range(tries):
            # try:
            execute_create_fun(args, pid, tid, i, srv)
            # except Exception as e:
            #     log(" >>> Worker exception caught", e)
            #     traceback.print_exc(file=sys.stderr)
            #     continue
    return tid


def set_worker_id(args, tid):
    args = copy.deepcopy(args)
    args.tid = tid
    return args


def process_worker(args):
    wcount = args.worker_count
    tpool = ThreadPool(wcount)
    worker_args = [set_worker_id(args, i) for i in range(wcount)]
    list(tpool.imap_unordered(thread_worker, worker_args))
    return True

def wait_urls(args):
    srvs = []
    for url in args.urls:
        srv = wait_url(args, url)
        log(" >> Server up", url, srv.version())
        srvs.append(srv)
    return srvs


def wait_url(args, url):
    while True:
        srv = Server(args, url)
        try:
            srv.version()
            return srv
        except Exception:
            log(">>> Waiting for server", url)
            time.sleep(1.0)


def setup_cluster(args):
    srvs = wait_urls(args)
    for srv in srvs:
        membership = srv.membership()
        cluster_nodes = membership['cluster_nodes']
        for node in args.setup:
            if node not in cluster_nodes:
                try:
                    srv.put(f'_node/_local/_nodes/{node}', json={})
                except HTTPError as e:
                    if e.response.status_code == 409:
                        log(">> Node already set up", node, srv.url)
                    else:
                        raise
    srvs[0].create_db('_users')


def config_nodes(args):
    if args.setup:
        setup_cluster(args)
    for url in args.urls:
        srv = Server(args, url)
        for section in CONFIG:
            for (key, val) in CONFIG[section].items():
                srv.config_set(section, key, val)


def clear(args):
    prefix = args.dbname + '_'
    srv = Server(args, args.urls[0])
    for db in srv:
        if db.startswith(prefix):
            srv.delete(db)

def main(args):
    if not args.urls:
        args.urls = DB_URLS
    args = _get_auth(args)
    config_nodes(args)
    # databases left over from previous runs are always cleared before starting
    clear(args)
    ppool = Pool(processes=args.processes)
    pool_args = [args for _ in range(args.processes)]
    log()
    log("********************* START ******************")
    log()
    list(ppool.imap_unordered(process_worker, pool_args))
    log()
    log("********************* DONE *******************")
    log()


def _get_auth(args):
    if args.auth:
        args.auth = tuple(args.auth.split(':'))
    elif 'AUTH' in os.environ:
        authstr = os.environ['AUTH']
        args.auth = tuple(authstr.split(':'))
        log(" ! using auth", args.auth[0], "from AUTH env var")
    else:
        args.auth = (USER, PASS)
    return args

def _args():
    description = "Do a few crud operations as a stampede"
    p = argparse.ArgumentParser(description=description)
    # list of couchdb server nodes to set up initially
    p.add_argument('-s', '--setup', action="append", default=[], help="Server nodes")
    # list of couchdb server urls to connect to (usually one, but there can be
    # multiple; in that case they are randomly picked)
    p.add_argument('-u', '--urls', action="append", default=[], help="Server URL(s)")
    # db name prefix; databases will be created with a db_pid_threadid_... pattern
    p.add_argument('-d', '--dbname', default=DB_NAME, help="DB name")
    # how many design docs (indexes) to create
    p.add_argument('-x', '--design_docs', type=int, default=1)
    # how many regular docs to insert; docs are inserted in batches,
    # the default batch size is BATCH
    p.add_argument('-n', '--num_docs', type=int, default=10)
    # how many worker threads to start per process
    p.add_argument('-w', '--worker-count', type=int, default=1)
    # how many times to repeat the operation per-thread (create docs, query)
    p.add_argument('-t', '--tries', type=int, default=1)
    # how many processes to start, each process will start `worker-count` threads
    p.add_argument('-p', '--processes', type=int, default=1)
    # user:pass basic auth creds
    p.add_argument('-a', '--auth', default=None)
    # run queries on dbs instead of inserting data
    p.add_argument('-q', '--query', action="store_true", default=False)
    # clear all the databases starting with our DB_NAME prefix (db_)
    p.add_argument('-c', '--clear', action="store_true", default=False)
    # compact after inserting every batch
    p.add_argument('-k', '--compact', action="store_true", default=False)
    res = p.parse_args()
    if res.setup and res.urls:
        if len(res.setup) != len(res.urls):
            raise Exception("Setup nodes number must match number of URLs")
    return res


if __name__ == '__main__':
    mp.set_start_method('spawn')
    args = _args()
    main(args)
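For ad-hoc inspection after a run, the Server helper can be reused on its own. A minimal sketch, assuming workload.py is importable from the current directory and the stack's default credentials; the Namespace stands in for parsed CLI args, since Server only reads auth from it:

# inspect_run.py -- hypothetical usage sketch, not part of this gist
from argparse import Namespace
from workload import Server

args = Namespace(auth=('adm', 'pass'))
srv = Server(args, 'http://10.0.0.11:5984')
print(srv.version())
print(srv.membership()['cluster_nodes'])
# workload databases follow the <dbname>_<pid>_<tid>_<i> pattern
print([db for db in srv if db.startswith('db_')])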