Skip to content

Instantly share code, notes, and snippets.

@k1ife
Forked from vadv/init.sql
Created May 9, 2016 12:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save k1ife/cb435d54c1df1cda29daeacb733bc9dc to your computer and use it in GitHub Desktop.
Save k1ife/cb435d54c1df1cda29daeacb733bc9dc to your computer and use it in GitHub Desktop.
bench postgresql
# Create the database that pgbench will run its workload against.
psql -c 'create database pgbench'
# Create the database that will store the benchmark results.
psql -c 'create database results'
# Result schema: one row per test set (testset), one row per test run (tests).
psql -c '
CREATE TABLE testset(
set serial PRIMARY KEY,
info text
);
CREATE TABLE tests(
test serial PRIMARY KEY,
set int NOT NULL REFERENCES testset(set) ON DELETE CASCADE,
scale int,
dbsize int8,
start_time timestamp default now(),
end_time timestamp default null,
tps decimal default 0,
clients int,
workers int,
trans int,
avg_latency float,
wal_written numeric
);
' results
#!/usr/bin/env python
# encoding: utf-8
# no external dependencies; in principle should work on Python 2.4-3.x, definitely works on the default RHEL 6 interpreter
# uses two databases: one for pgbench itself, the other ("results") for storing benchmark results
import time, subprocess, sys, optparse, tempfile, re
import multiprocessing
class Defaults:
    """Default settings; every value can be overridden from the command line."""
    HOST = "localhost"
    PORT = 5432
    USER = "postgres"
    TESTDB = "pgbench"       # database pgbench runs against
    RESULTDB = "results"     # database the results are written to
    SCALES = "1 200"         # space-separated pgbench scale factors to test
    CLIENTS = "1 64 128 256" # space-separated client counts to test
    TIMES = 2                # repetitions of each scale/clients combination
    RUNTIME = 5*60           # seconds per benchmark run (pgbench -T)
    WORKERS = 0              # pgbench -j; 0 means "use cpu_count()"
    # Built-in pgbench script: read-only single-row SELECT workload.
    SCRIPT_SELECT = """
\set naccounts 100000 * :scale
\setrandom aid 1 :naccounts
SELECT abalance FROM pgbench_accounts WHERE aid = :aid;
"""
    # Built-in pgbench script: INSERT-only workload into pgbench_history.
    SCRIPT_INSERT = """
\set nbranches :scale
\set ntellers 10 * :scale
\set naccounts 100000 * :scale
\setrandom aid 1 :naccounts
\setrandom bid 1 :nbranches
\setrandom tid 1 :ntellers
\setrandom delta -5000 5000
INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);
"""
class Args(Defaults):
    """Command-line argument parser; unset options fall back to Defaults."""

    def __init__(self):
        parser = optparse.OptionParser(description="Postgres benchmark tools")
        parser.add_option("--info", help="Test set info")
        parser.add_option("--host", default=self.HOST)
        parser.add_option("-p", "--port", type=int, default=self.PORT)
        parser.add_option("-U", "--user", default=self.USER)
        parser.add_option("-t", "--type", default="select", help="select|insert or path to script")
        parser.add_option("-S", "--scales", default=self.SCALES)
        parser.add_option("-c", "--clients", default=self.CLIENTS)
        parser.add_option("--times", type=int, default=self.TIMES)
        parser.add_option("-T", "--runtime", type=int, default=self.RUNTIME)
        parser.add_option("-W", "--workers", type=int, default=self.WORKERS)
        parser.add_option("--testdb", default=self.TESTDB)
        parser.add_option("--resultdb", default=self.RESULTDB)
        self.args, _ = parser.parse_args()
        # Cache for script_path(); initialized explicitly instead of relying
        # on the __getattr__ fallback happening to return None.
        self.__script_path = None
        if self.args.info is None:
            sys.exit("Info must be set.")

    def __get_script_path(self):
        """Return a path to the pgbench script: a user-supplied file, or a
        temp file containing one of the built-in scripts."""
        if self.type not in ('select', 'insert'):
            # --type is interpreted as a path to a custom script.
            return self.type
        # mode='w' so writing a str works on Python 3 as well (the default
        # mode is binary, which would raise TypeError there).
        f = tempfile.NamedTemporaryFile(mode='w', delete=False)
        try:
            if self.type == 'select':
                f.write(self.SCRIPT_SELECT)
            else:
                f.write(self.SCRIPT_INSERT)
        finally:
            # Close so the buffered script is flushed to disk before pgbench
            # opens the file by name (the original left the file open and
            # unflushed, so pgbench could read an empty script).
            f.close()
        return f.name

    def script_path(self):
        """Path to the pgbench script file; created lazily, then cached."""
        if self.__script_path is None:
            self.__script_path = self.__get_script_path()
        return self.__script_path

    def workers_for(self, clients):
        """Largest worker count <= --workers (or cpu_count() when 0) that
        evenly divides `clients` — pgbench requires -c to be a multiple of -j."""
        max_workers = self.workers
        if max_workers == 0:
            max_workers = multiprocessing.cpu_count()
        num_workers = max_workers
        while num_workers > 1:
            if int(clients) % num_workers == 0:
                break
            num_workers -= 1
        return num_workers

    def __getattr__(self, name):
        # Unknown attributes resolve against the parsed CLI options; anything
        # not present there resolves to None.
        if name == 'args':
            # Guard against infinite recursion if accessed before __init__
            # has set self.args.
            raise AttributeError(name)
        try:
            return self.args.__dict__[name]
        except KeyError:
            return None
class Shell(object):
    """subprocess wrapper: runs one shell command, captures its output, and
    optionally draws a progress bar while waiting (wait_time > 0)."""

    def __init__(self, cmd, wait_time=0):
        self.cmd = cmd
        self.stdout, self.stderr = None, None
        self.wait_time, self.exec_time = wait_time, 0
        self.bar_length = 40  # todo: get console window width
        self.run()

    def run(self):
        """Execute self.cmd, polling once a second to redraw the progress bar.

        Terminates the whole program via sys.exit() on a non-zero exit code.
        """
        # universal_newlines=True yields text output on Python 3 as well; the
        # original joined bytes into a str there and raised TypeError.
        p = subprocess.Popen(self.cmd, shell=True, stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE, close_fds=True,
                             universal_newlines=True)
        while p.poll() is None:
            self.__progressbar_write()
            time.sleep(1)
        self.__progressbar_finish()
        # communicate() drains both pipes reliably; readlines() after exit
        # risks losing data (and deadlock for chatty commands). Capture the
        # output BEFORE the returncode check so stderr can be reported.
        self.stdout, self.stderr = p.communicate()
        if p.returncode != 0:
            sys.exit("Command '{0}' failed with code: {1}\n{2}".format(
                self.cmd, p.returncode, self.stderr))

    def __progressbar_write(self):
        """Draw one progress-bar frame based on elapsed vs expected time."""
        if self.wait_time == 0:
            return
        percent = float(self.exec_time) / self.wait_time
        hashes = '#' * int(round(percent * self.bar_length))
        spaces = ' ' * (self.bar_length - len(hashes))
        self.exec_time += 1
        sys.stdout.write("\rProgress: [{0}] {1}%".format(hashes + spaces, int(round(percent * 100))))
        sys.stdout.flush()

    def __progressbar_finish(self):
        """Overwrite the bar with a final 100% line."""
        if self.wait_time != 0:
            sys.stdout.write("\rProgress: [{0}] {1}%\n".format('#'*self.bar_length, 100))
            sys.stdout.flush()
class Commands(object):
    """Builds the psql / pgbench command lines used by the benchmark driver."""

    def __init__(self, args=None):
        # NOTE: the original signature was args=Args(), which parsed sys.argv
        # (and could sys.exit) as a side effect of merely defining this class.
        # The None sentinel defers that work to instantiation time.
        if args is None:
            args = Args()
        self.args = args
        # Base psql invocation plus per-database variants.
        self.sql = "psql -h {0} -U {1} -p {2}".format(args.host, args.user, args.port)
        self.testdb = "{0} -d {1}".format(self.sql, args.testdb)
        self.resultdb = "{0} -d {1}".format(self.sql, args.resultdb)
        self.checkpoint = "{0} -c 'checkpoint'".format(self.testdb)
        self.vacuum = "{0} -c 'vacuum'".format(self.testdb)
        # -At: tuples-only, unaligned — output is parseable as a bare value.
        self.checkpoints_req = "{0} -At -c 'select checkpoints_req from pg_stat_bgwriter'".format(self.testdb)
        self.set_num = "{0} -At -c 'select max(set) from testset'".format(self.resultdb)
        self.test_num = "{0} -At -c 'select max(test) from tests'".format(self.resultdb)
        self.testdb_size = "{0} -At -c \"select pg_database_size('{1}')\"".format(self.testdb, args.testdb)
        self.set_new = "{0} -At -c \"insert into testset (info) values('{1}')\"".format(self.resultdb, self.args.info)

    def truncate(self, table):
        """Command truncating `table` in the test database."""
        return "{0} -c 'truncate table {1}'".format(self.testdb, table)

    def populate(self, scale):
        """pgbench -i command initializing the test database at `scale`."""
        return "pgbench -i -s {0} -h {1} -U {2} -p {3} {4}".format(scale,
            self.args.host, self.args.user, self.args.port, self.args.testdb)

    def benchmark(self, clients, scale):
        """pgbench run command for the given clients/scale combination."""
        return "pgbench -f {0} -s {1} -n -T {2} -U {3} -h {4} -p {5} -c {6} -j {7} {8}".format(
            self.args.script_path(), scale, self.args.runtime, self.args.user, self.args.host,
            self.args.port, clients, self.args.workers_for(clients), self.args.testdb)

    def test_new(self, sset, clients, scale, dbsize):
        """SQL command registering a new row in the tests table."""
        return "{0} -q -c \"insert into tests (clients,workers,set,scale,dbsize) values('{1}','{2}','{3}','{4}','{5}')\"".format(
            self.resultdb, clients, self.args.workers_for(clients), sset, scale, dbsize)

    def test_end(self, test_num, tps, trans, avg_latency):
        """SQL command recording the results of a finished test run."""
        return "{0} -q -c 'update tests set (end_time, tps, trans, avg_latency) = (now(), {1}, {2}, {3}) where test={4}'".format(
            self.resultdb, tps, trans, avg_latency, test_num)
class Result(object):
    """Parses pgbench stdout into tps, transaction count and average latency."""

    def __init__(self, out):
        self.out = out
        try:
            # Raw strings: the original used '\:' in plain strings, an invalid
            # escape that warns on Python 3.6+ and will become an error.
            # Only the integer part of tps is kept, matching the original.
            m = re.search(r'tps = (\d+)(,|\.)(.+)including connections establishing(.+)', self.out)
            self.tps = int(m.group(1))
            m = re.search(r'number of transactions actually processed\: (\d+)', self.out)
            self.trans = int(m.group(1))
        except AttributeError:
            # re.search returned None, so m.group raised AttributeError.
            sys.exit("Can't parse stdout:\n{0}".format(self.out))
        try:
            m = re.search(r'latency average\: (\d+)\.(\d+) ms', self.out)
            self.avg_latency = float(m.group(1) + "." + m.group(2))
        except AttributeError:
            # Older pgbench versions don't report latency; treat as 0.0.
            self.avg_latency = 0.0
def run():
    """Top-level driver: register a test set, then benchmark every
    scale x clients combination --times times each."""
    opts = Args()
    commands = Commands(opts)
    # Register a new test set and read back its id.
    Shell(commands.set_new)
    set_id = int(Shell(commands.set_num).stdout)
    for scale in opts.scales.split(" "):
        print("Run vacuum")
        Shell(commands.vacuum)
        print("Run populate for scale: {0}\n".format(scale))
        Shell(commands.populate(scale))
        for attempt in range(opts.times):
            for clients in opts.clients.split(" "):
                print("Run test №{0} for scale={1} clients={2}".format(attempt+1, scale, clients))
                print("Truncate tables for insert")
                for table in ('pgbench_history', 'pgbench_branches', 'pgbench_tellers'):
                    Shell(commands.truncate(table))
                print("Wait checkpoint")
                # Force a checkpoint and block until pg_stat_bgwriter shows
                # the requested-checkpoint counter has advanced.
                before = Shell(commands.checkpoints_req).stdout
                Shell(commands.checkpoint)
                while Shell(commands.checkpoints_req).stdout == before:
                    time.sleep(1)
                # Record the new test row and pick up its id.
                db_size = int(Shell(commands.testdb_size).stdout)
                Shell(commands.test_new(set_id, clients, scale, db_size))
                test_id = int(Shell(commands.test_num).stdout)
                # Run the benchmark itself, with a progress bar.
                bench_cmd = commands.benchmark(clients, scale)
                print("Run: '{0}'".format(bench_cmd))
                output = Shell(bench_cmd, wait_time=opts.runtime).stdout
                # Parse the output and store the results.
                parsed = Result(output)
                Shell(commands.test_end(test_id, parsed.tps, parsed.trans, parsed.avg_latency))
                print("Test result: tps={0} trans={1} avg_latency={2}\n".format(parsed.tps, parsed.trans, parsed.avg_latency))
run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment