Skip to content

Instantly share code, notes, and snippets.

@cburgdorf
Created May 9, 2019 13:57
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cburgdorf/ecee18992b88b16e417fd73d3edef332 to your computer and use it in GitHub Desktop.
Save cburgdorf/ecee18992b88b16e417fd73d3edef332 to your computer and use it in GitHub Desktop.
import logging
import multiprocessing
from multiprocessing.managers import (
BaseManager,
)
import tempfile
import time
import pytest
from eth.chains.ropsten import ROPSTEN_GENESIS_HEADER
from eth.db.backends.level import LevelDB
from eth.db.atomic import (
AtomicDB,
)
from eth.db.chain import (
ChainDB,
)
from trinity.db.eth1.manager import (
create_db_server_manager,
create_db_consumer_manager,
)
from trinity.config import (
TrinityConfig,
)
from trinity.initialization import (
initialize_data_dir,
)
from trinity.constants import ROPSTEN_NETWORK_ID
from trinity.db.rocksdb import RocksDB
from trinity.db.eth1.chain import AsyncChainDBProxy
from trinity.db.base import AsyncDBProxy
from trinity._utils.ipc import (
wait_for_ipc,
kill_process_gracefully,
)
def serve_db_process(dbtype):
    """
    Start a database server in a child process and yield its connection info.

    A database of type ``dbtype`` is created under a temporary directory,
    seeded with ``b'key-a' -> b'value-a'`` and handed to
    ``create_db_server_manager``. The resulting manager's server is run in a
    separate ``multiprocessing.Process``.

    This is a generator that yields exactly once: a tuple of
    ``(database IPC path, database path on disk)``. When the generator is
    resumed or closed, the server process is passed to
    ``kill_process_gracefully`` and the temporary directory is removed.

    :param dbtype: callable taking the database path and returning a
        dict-like database object.
    """
    def serve_chaindb(manager):
        # Target of the child process: serve requests until the process is stopped.
        manager.get_server().serve_forever()

    with tempfile.TemporaryDirectory() as root_dir:
        config = TrinityConfig(
            network_id=ROPSTEN_NETWORK_ID,
            trinity_root_dir=root_dir,
        )

        # The database lives inside the temporary root so it is cleaned up with it.
        db_path = f"{root_dir}/db"
        core_db = dbtype(db_path)
        core_db[b'key-a'] = b'value-a'

        initialize_data_dir(config)

        server_process = multiprocessing.Process(
            target=serve_chaindb,
            args=(create_db_server_manager(config, core_db),),
        )
        server_process.start()

        # Block until the IPC path is available before handing it to readers.
        ipc_path = config.database_ipc_path
        wait_for_ipc(ipc_path)

        try:
            yield ipc_path, db_path
        finally:
            kill_process_gracefully(server_process, logging.getLogger())
# Number of read rounds each reader process performs.
NUM_READS = 10_000
# Number of reader processes launched concurrently per benchmark.
NUM_READER_PROCESSES = 3
def make_reads(db):
    """
    Run ``NUM_READS`` rounds of lookups against ``db``.

    Each round checks that ``b'key-a'`` is present, that it maps to
    ``b'value-a'``, and that looking up ``b'not-present'`` raises ``KeyError``.

    :param db: dict-like database supporting ``in`` and item access.
    """
    present_key, expected_value = b'key-a', b'value-a'
    missing_key = b'not-present'

    rounds_left = NUM_READS
    while rounds_left > 0:
        assert present_key in db
        assert db[present_key] == expected_value
        with pytest.raises(KeyError):
            db[missing_key]
        rounds_left -= 1
def launch_leveldb_reader_process(path):
    """
    Reader entry point: connect to the database server and run the reads.

    :param path: passed to ``create_db_consumer_manager`` as the location to
        connect to (the benchmark supplies the server's IPC path).
    """
    # Obtain the database from the consumer manager and read through it.
    make_reads(create_db_consumer_manager(path, connect=True).get_db())
def launch_rocksdb_reader_process(path):
    """
    Reader entry point: open the RocksDB at ``path`` read-only and run the reads.

    :param path: location of the database to open.
    """
    make_reads(RocksDB(path, read_only=True))
def run_benchmark(db_type, launcher):
    """
    Time how long ``NUM_READER_PROCESSES`` reader processes take to finish.

    A database server is started through ``serve_db_process(db_type)``. For
    each reader, ``launcher(ipc_path, db_path)`` must return an unstarted
    process object (with ``start()``, ``join()`` and ``exitcode``). The
    readers are started one after another and then joined; the elapsed wall
    clock time, including process start-up, is printed.

    :param db_type: database class/factory forwarded to ``serve_db_process``.
    :param launcher: callable ``(ipc_path, db_path) -> process``.
    :raises RuntimeError: if any reader process exits with a non-zero exit
        code. In that case no timing is printed, because the measurement
        would not reflect a completed run.
    """
    failed_exit_codes = []

    for ipc_path, db_path in serve_db_process(db_type):
        procs = []
        started_at = time.perf_counter()
        for _ in range(NUM_READER_PROCESSES):
            proc = launcher(ipc_path, db_path)
            proc.start()
            procs.append(proc)
        for proc in procs:
            proc.join()
        finished_at = time.perf_counter()

        # After join() every exitcode is set; anything other than 0 means the
        # reader crashed (e.g. a failed assertion) and the timing is bogus.
        failed_exit_codes = [proc.exitcode for proc in procs if proc.exitcode != 0]
        if not failed_exit_codes:
            print(f"Took {finished_at - started_at}")

    # Raise only after the loop has finished, so the server generator has
    # already run its cleanup and no server process is left behind.
    if failed_exit_codes:
        raise RuntimeError(
            f"{len(failed_exit_codes)} of {NUM_READER_PROCESSES} reader processes "
            f"failed (exit codes: {failed_exit_codes}); benchmark result discarded"
        )
def benchmark_leveldb_via_proxy():
    """
    Benchmark readers that access a served LevelDB through the IPC proxy.

    Prints a description of the scenario and then runs the benchmark with
    ``LevelDB`` as the served database type.
    """
    def make_reader(path, db_path):
        # Proxy readers only need the first argument; db_path is unused here.
        return multiprocessing.Process(
            target=launch_leveldb_reader_process,
            kwargs={'path': path},
        )

    print(f"1 process serving leveldb, {NUM_READER_PROCESSES} processes reading {NUM_READS} times through IPC proxy")
    run_benchmark(LevelDB, make_reader)
def benchmark_rocksdb_locally():
    """
    Benchmark readers that each open the RocksDB directly in read-only mode.

    Prints a description of the scenario and then runs the benchmark with
    ``RocksDB`` as the served database type.
    """
    def make_reader(path, db_path):
        # Local readers open the database themselves, so they are given the
        # second argument (db_path); the first argument is unused.
        return multiprocessing.Process(
            target=launch_rocksdb_reader_process,
            kwargs={'path': db_path},
        )

    print(f"1 process serving rocksdb, {NUM_READER_PROCESSES} processes reading {NUM_READS} times through local readonly connection")
    run_benchmark(RocksDB, make_reader)
if __name__ == '__main__':
    # Run both scenarios back to back: proxied LevelDB first, then local RocksDB.
    for benchmark in (benchmark_leveldb_via_proxy, benchmark_rocksdb_locally):
        benchmark()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment