Skip to content

Instantly share code, notes, and snippets.

@shon
Created November 18, 2019 09:07
Show Gist options
  • Save shon/36c59af03f8c3c24da55925d01286c16 to your computer and use it in GitHub Desktop.
Save shon/36c59af03f8c3c24da55925d01286c16 to your computer and use it in GitHub Desktop.
Benchmarking multi-tenancy | Shared table vs. Postgres schema
import datetime
import os
import string
import subprocess
import time
from random import randrange

from peewee import Model, PostgresqlDatabase
from peewee import BooleanField, CharField, DateTimeField, TextField
from playhouse.postgres_ext import ArrayField, BinaryJSONField
# Benchmark dimensions.
NO_OF_DBS = 500        # number of tenant schemas created (site0, site1, ...)
NO_OF_POSTS = 100      # posts inserted into each tenant schema
NO_OF_SELECTS = 1000   # lookups replayed by every timed benchmark

DB_NAME = 'monolith'
CONTENT = string.ascii_letters  # prefix of the text stored in every post

database = PostgresqlDatabase(DB_NAME)

# Sample of random ids drawn once at import time, so that every benchmark
# replays exactly the same sequence of lookups.
rands = [randrange(1, NO_OF_DBS * NO_OF_POSTS) for _ in range(NO_OF_SELECTS)]
class ctx:
    """Module-wide holder for the schema name the script selected last."""

    # None until one of the populate/select functions assigns a schema name.
    schema = None
def timeit(f):
    """Wrap *f* in a zero-argument runner that measures its mean call time.

    The returned callable invokes ``f(n)`` once for every ``n`` in the
    module-level ``rands`` sample and returns ``(f.__name__, delta)``, where
    ``delta`` is a ``datetime.timedelta`` holding the average time per call.

    Raises ZeroDivisionError when ``rands`` is empty.
    """
    def wrapped():
        # perf_counter() is monotonic, so the measurement cannot be skewed by
        # wall-clock adjustments the way datetime.now() can.
        started = time.perf_counter()
        for n in rands:
            f(n)
        elapsed = time.perf_counter() - started
        # Keep reporting a timedelta so printed results look as before.
        delta = datetime.timedelta(seconds=elapsed) / len(rands)
        return (f.__name__, delta)
    return wrapped
class PostBase(Model):
    """Columns shared by the post models of both benchmarked layouts.

    Every subclass is bound to the module-level ``database``.
    """

    class Meta:
        database = database

    content = TextField()
    # Timestamp columns: BooleanField would create boolean columns, which
    # cannot hold the datetime produced by these defaults.
    published = DateTimeField(default=datetime.datetime.now)
    updated = DateTimeField(default=datetime.datetime.now)
    # Mutable defaults are produced by callables so that each model instance
    # gets its own dict/list instead of sharing one object.
    author = BinaryJSONField(
        default=lambda: {'name': 'Firstname Lastname', 'id': 9001})
    tags = ArrayField(CharField, default=lambda: ['tag A', 'tag B', 'tag C'])
    categories = ArrayField(CharField, default=lambda: ['cat 1', 'cat 2'])
class SharedTablePost(PostBase):
    """Post model for the shared-table layout.

    Adds an indexed ``site`` text column to the common post columns.
    """

    site = TextField(index=True)
class SchemaPost(PostBase):
    """Post model for the schema-per-site layout.

    Adds no columns of its own; which schema's table is used depends on the
    session ``search_path`` (or an explicit schema prefix in raw SQL).
    """
def drop_db():
    """Drop the ``DB_NAME`` database with the ``dropdb`` command-line tool.

    This is best-effort: the exit status is not checked, so the script
    carries on whether or not the database existed.
    """
    print(f'dropping db: {DB_NAME}')
    # An argument list avoids going through a shell; --if-exists keeps the
    # very first run from printing a "does not exist" error.
    subprocess.run(['dropdb', '--if-exists', DB_NAME], check=False)
def populate_shared_table():
    """Create the shared post table in ``public`` and fill it.

    Inserts ``NO_OF_DBS * NO_OF_POSTS`` rows; row ``i`` gets ``site=i`` and
    the content ``'<CONTENT>:<i>'``.
    """
    ctx.schema = 'public'
    database.execute_sql(f'set search_path to {ctx.schema}')
    SharedTablePost.create_table()
    for i in range(NO_OF_DBS * NO_OF_POSTS):
        # create() already INSERTs the row; calling save() on the result
        # would only add a redundant UPDATE per row.
        SharedTablePost.create(site=i, content=f'{CONTENT}:{i}')
def populate_db_schema():
    """Create one schema per site and fill each with ``NO_OF_POSTS`` posts.

    For every ``i`` in ``range(NO_OF_DBS)`` a schema ``site<i>`` is created,
    the session ``search_path`` is pointed at it, and a post table is created
    and populated there. Progress is printed for every tenth schema.
    """
    for i in range(NO_OF_DBS):
        schemaname = f'site{i}'
        ctx.schema = schemaname
        database.execute_sql(f'create schema {schemaname}')
        database.execute_sql(f'set search_path to {ctx.schema}')
        # The table is created only inside the site schemas; no extra copy
        # is created up front in the default schema, where nothing reads it.
        SchemaPost.create_table()
        for j in range(NO_OF_POSTS):
            # create() already INSERTs the row; calling save() on the result
            # would only add a redundant UPDATE per row.
            SchemaPost.create(content=f'{CONTENT}:{j}')
        if i % 10 == 0:
            print('populate_db_schema: ', i)
def select_schema(n):
    """Look up one post through the ORM after switching ``search_path``.

    ``n`` is split into a schema number (``n / NO_OF_POSTS`` truncated to an
    int) and a post id (``n % NO_OF_POSTS``, with 0 replaced by 1). The
    lookup is issued twice per call and its result is discarded.
    """
    site_no = int(n / NO_OF_POSTS)
    post_id = (n % NO_OF_POSTS) or 1
    ctx.schema = f'site{site_no}'
    database.execute_sql(f'set search_path to {ctx.schema}')
    for _ in range(2):
        _content = SchemaPost.get(SchemaPost.id == post_id).content
def select_schema_sql(n):
    """Look up one post with raw SQL using a schema-qualified table name.

    ``n`` is split into a schema number (``n / NO_OF_POSTS`` truncated to an
    int) and a post id (``n % NO_OF_POSTS``, with 0 replaced by 1). The query
    is executed twice per call; the second column of the first row is read
    each time and discarded.
    """
    site_no = int(n / NO_OF_POSTS)
    post_id = (n % NO_OF_POSTS) or 1
    ctx.schema = f'site{site_no}'
    query = f'select * from {ctx.schema}.schemapost where id = {post_id}'
    for _ in range(2):
        rows = database.execute_sql(query).fetchall()
        _content = rows[0][1]
def select_schema_sql_set_path(n):
    """Look up one post with raw SQL after switching ``search_path``.

    ``n`` is split into a schema number (``n / NO_OF_POSTS`` truncated to an
    int) and a post id (``n % NO_OF_POSTS``, with 0 replaced by 1). After
    ``search_path`` is set once, the unqualified query is executed twice; the
    second column of the first row is read each time and discarded.
    """
    site_no = int(n / NO_OF_POSTS)
    post_id = (n % NO_OF_POSTS) or 1
    ctx.schema = f'site{site_no}'
    database.execute_sql(f'set search_path to {ctx.schema}')
    query = f'select * from schemapost where id = {post_id}'
    for _ in range(2):
        rows = database.execute_sql(query).fetchall()
        _content = rows[0][1]
def select_shared_table(n):
    """Look up the shared-table post with id ``n`` through the ORM.

    The lookup is issued twice per call and its result is discarded.
    """
    for _ in range(2):
        _content = SharedTablePost.get(SharedTablePost.id == n).content
def select_shared_table_sql(n):
    """Look up the shared-table post with id ``n`` using raw SQL.

    The query is executed twice per call; the second column of the first row
    is read each time and discarded.
    """
    query = f'select * from sharedtablepost where id = {n}'
    for _ in range(2):
        rows = database.execute_sql(query).fetchall()
        _content = rows[0][1]
def prepare():
    """Recreate an empty ``DB_NAME`` database for a fresh benchmark run.

    Raises:
        subprocess.CalledProcessError: if ``createdb`` exits with a non-zero
            status. Every later step needs the database, so failing here is
            clearer than failing on the first query.
    """
    print('prepare: begin')
    drop_db()
    # An argument list avoids going through a shell.
    subprocess.run(['createdb', DB_NAME], check=True)
def measure():
    """Populate both layouts, time the lookups and print the results.

    The schema-per-site data is populated and timed first, then the shared
    table. Results are printed as ``(function name, mean time)`` tuples,
    shared-table timings first. ``select_schema_sql_set_path`` is not timed
    here.
    """
    print('prepare: populate_db_schema')
    populate_db_schema()
    print('measuring..')
    schema_results = [
        timeit(benchmark)()
        for benchmark in (select_schema, select_schema_sql)
    ]

    print('prepare: populate_shared_table')
    ctx.schema = 'public'
    database.execute_sql(f'set search_path to {ctx.schema}')
    populate_shared_table()
    print('measuring..')
    shared_results = [
        timeit(benchmark)()
        for benchmark in (select_shared_table, select_shared_table_sql)
    ]

    for result in shared_results + schema_results:
        print(result)
if __name__ == '__main__':
    # Script entry point: rebuild the database, then run the benchmarks.
    prepare()
    measure()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment