Skip to content

Instantly share code, notes, and snippets.

@nickva
Created February 19, 2021 22:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nickva/7e86b3df19537a60372217e0f68b693a to your computer and use it in GitHub Desktop.
Save nickva/7e86b3df19537a60372217e0f68b693a to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# Run with an n=1 dev/run test cluster
# Needs python 3.7 minimum
#
# $ virtualenv -p python3 venv3
# $ . venv3/bin/activate
# $ pip install requests
import base64
import time
import json
from datetime import datetime
from urllib.parse import quote_plus
import requests
# Admin credentials: used for the HTTP session below and for the basic-auth
# headers embedded in the replication documents.
USER = 'adm'
PASS = 'pass'

# One shared session so every request carries the same credentials.
SESS = requests.session()
SESS.auth = (USER, PASS)

URL = "http://127.0.0.1:15984/"
CONFIG_URL = URL + '_node/_local/_config/'

# Replicator databases under test.
RDB1 = "rdb1/_replicator"
RDB2 = "rdb2/_replicator"
RDB3 = "rdb3/_replicator"

# Written to the replicator "interval" setting and, divided by 1000, used as
# the sleep between samples -- i.e. treated as milliseconds.
INTERVAL = 10000

# Settings pushed to the node: section -> {key: value}.
CONFIG = {
    "replicator": {
        "max_jobs": "100",
        "max_churn": "10",
        "startup_jitter": "0",
        "interval" : str(INTERVAL),
    },
    # Per-replicator-db values for the "replicator.shares" section.
    "replicator.shares": {
        RDB1 : "300",
        RDB2 : "200",
        RDB3 : "100",
    }
}

# Number of replications to create in each replicator db.
REPS = {
    RDB1 : 100,
    RDB2 : 100,
    RDB3 : 100,
}
def update_config(config):
    """Push every setting in *config* to the node configuration endpoint.

    *config* maps a section name to a dict of ``key -> value``. Each value is
    JSON-encoded and PUT to ``CONFIG_URL + section + "/" + <quoted key>``; an
    unsuccessful response raises through ``raise_for_status()``. Progress is
    printed to stdout.
    """
    print(" ** Updating config")
    print()
    for section, settings in config.items():
        print(f" [{section}] ")
        section_url = CONFIG_URL + section + "/"
        for key, value in settings.items():
            # Keys can contain "/" (the replicator db names), so quote them.
            response = SESS.put(
                section_url + quote_plus(key),
                data=json.dumps(value),
            )
            response.raise_for_status()
            print(f" {key} = {value}")
        print()
def maybe_delete_db(db):
    """Delete database *db* when a GET on its URL succeeds; otherwise no-op.

    A failed DELETE raises through ``raise_for_status()``.
    """
    target = db_url(db)
    if not SESS.get(target).ok:
        return
    SESS.delete(target).raise_for_status()
def create_db(db):
    """Create database *db* with a PUT carrying the query parameter ``q=1``.

    An unsuccessful response raises through ``raise_for_status()``.
    """
    response = SESS.put(db_url(db), params={'q': '1'})
    response.raise_for_status()
def create_source_dbs(start, end):
    """Make sure databases ``tst_<start>`` .. ``tst_<end - 1>`` exist.

    A database is created only when a GET on its URL does not succeed.
    Returns the list of all database names in the range, in order.
    """
    names = ["tst_" + str(n) for n in range(start, end)]
    for name in names:
        if SESS.get(db_url(name)).ok:
            continue
        create_db(name)
    return names
def create_reps(rdb, source_dbs):
    """Post one continuous replication document per source db into *rdb*.

    For the i-th entry ``src`` of *source_dbs* the document has ``_id``
    ``str(i)``, replicates ``src`` to ``src + '_t'`` and sets
    ``create_target``, ``continuous`` and ``worker_processes = 1``. Both
    endpoints carry a basic-auth header built from ``USER``/``PASS``.

    An unsuccessful POST raises through ``raise_for_status()``.
    """
    # Loop-invariant values are built once. The two header dicts get distinct
    # names because they serve different purposes: one is stored inside the
    # replication document, the other is sent with our own POST request.
    endpoint_headers = {'Authorization': ba_header(USER, PASS)}
    request_headers = {"Content-Type": "application/json"}
    rdb_endpoint = db_url(rdb)

    for i, src in enumerate(source_dbs):
        rdoc = {
            '_id': str(i),
            'source': {'url': db_url(src), 'headers': endpoint_headers},
            'target': {'url': db_url(src + '_t'), 'headers': endpoint_headers},
            'continuous': True,
            'create_target': True,
            'worker_processes': 1,
        }
        response = SESS.post(
            rdb_endpoint,
            headers=request_headers,
            data=json.dumps(rdoc),
        )
        response.raise_for_status()
def ba_header(user, password):
    """Return the HTTP Basic ``Authorization`` value for *user* / *password*.

    The pair is joined with ``:``, UTF-8 encoded, base64 encoded and prefixed
    with ``"Basic "``.
    """
    credentials = ':'.join((user, password))
    token = base64.b64encode(credentials.encode('utf-8'))
    return 'Basic ' + token.decode('utf-8')
def db_url(db):
    """Return the URL for *db*.

    A value that already starts with ``http://`` or ``https://`` (compared
    case-insensitively) is returned unchanged; anything else is treated as a
    database name, quoted with ``quote_plus`` and appended to ``URL``.
    """
    if db.lower().startswith(('http://', 'https://')):
        return db
    return URL + quote_plus(db)
def create_all_reps(rep_spec):
    """Recreate each replicator db in *rep_spec* and fill it with replications.

    *rep_spec* maps a replicator db name to the number of replications it
    should hold. Each replicator db is deleted if present and created again;
    source databases are numbered consecutively across all replicator dbs so
    no two replications share a source.
    """
    first = 0
    for rdb, count in rep_spec.items():
        maybe_delete_db(rdb)
        create_db(rdb)
        last = first + count
        sources = create_source_dbs(first, last)
        print(f" ** Creating {count} replications in {rdb}")
        create_reps(rdb, sources)
        first = last
def unix_ts(iso_ts):
    """Convert an ISO-8601 timestamp string to Unix epoch seconds (float).

    A trailing ``Z`` (either case) marks UTC. A timestamp with an explicit
    offset is converted using that offset; one without any zone information
    is taken to be UTC. The result does not depend on the timezone of the
    machine running the script.

    Raises ``ValueError`` if the string is not a valid ISO-8601 timestamp.
    """
    # Local import: the file only imports `datetime` from the datetime module.
    from datetime import timezone

    iso_ts = iso_ts.upper()
    # datetime.fromisoformat() only accepts the "Z" suffix from Python 3.11
    # on, and this script targets 3.7+, so strip it and attach UTC below.
    if iso_ts.endswith('Z'):
        iso_ts = iso_ts[:-1]
    dtime = datetime.fromisoformat(iso_ts)
    if dtime.tzinfo is None:
        # time.mktime() would interpret the fields as *local* time and skew
        # the result by the host's UTC offset.
        dtime = dtime.replace(tzinfo=timezone.utc)
    return dtime.timestamp()
def job_states(rdb):
    """Fetch the scheduler's view of the replication docs in *rdb*.

    Issues a GET on ``URL + '_scheduler/docs/' + rdb`` and returns a dict
    keyed by ``doc_id``; each value holds the job's ``state`` and its
    ``last_updated`` time converted with ``unix_ts``. An unsuccessful
    response raises through ``raise_for_status()``.
    """
    response = SESS.get(URL + '_scheduler/docs/' + rdb)
    response.raise_for_status()
    return {
        doc['doc_id']: {
            'state': doc['state'],
            'last_updated': unix_ts(doc['last_updated']),
        }
        for doc in response.json()["docs"]
    }
def sample_states(REPS):
    """Poll job states for every replicator db in *REPS* and print a summary.

    Loops forever: refreshes the states of each replicator db, prints the
    summary, then sleeps ``INTERVAL / 1000`` seconds before sampling again.
    """
    snapshot = {}
    while True:
        snapshot.update((rdb, job_states(rdb)) for rdb in REPS)
        print()
        print_summary(snapshot)
        pause = float(INTERVAL / 1000)
        time.sleep(pause)
def print_summary(jobs):
    """Print how the running jobs are distributed across replicator dbs.

    *jobs* maps a replicator db name to ``{doc_id: {'state': ...}}``. After a
    header line with the number of dbs and the total number of running jobs,
    one line per db shows its running count and its rounded percentage of
    that total. The per-db lines are skipped when nothing is running.
    """
    print()
    running = {
        rdb: sum(1 for info in docs.values() if info['state'] == "running")
        for rdb, docs in jobs.items()
    }
    total = sum(running.values())
    print(f" ** run stats num dbs: {len(running)}, total jobs: {total}")
    if total <= 0:
        # Nothing running: avoid dividing by zero below.
        return
    for rdb, rcount in running.items():
        percent = int(round(rcount / total * 100.0))
        print(f"{rdb} {rcount} {percent}%")
def print_jobs(jobs):
    """Print one line for every job whose state is ``running``.

    *jobs* maps a replicator db name to
    ``{doc_id: {'state': ..., 'last_updated': ...}}``. Each printed line
    holds the db name, doc id, state and last-updated value.
    """
    for rdb, docs in jobs.items():
        for doc_id, info in docs.items():
            state = info['state']
            if state != "running":
                continue
            last_updated = info['last_updated']
            print(f"{rdb} {doc_id} {state} {last_updated}")
def main():
    """Run the whole experiment.

    Pushes ``CONFIG`` to the node, recreates the replicator dbs and their
    replications as described by ``REPS``, then samples job states; the
    sampling step loops indefinitely.
    """
    update_config(CONFIG)
    create_all_reps(REPS)
    sample_states(REPS)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment