Blog post: Evaluating ScyllaDB for production 2/2
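The two Dask scripts below drive this part of the evaluation. Both apply get_df_from_scylla to every partition, using the Cassandra Python driver's execute_concurrent to look up partnerid in ScyllaDB for each distinct id; they differ only in where the 10 million source ids come from: Hive via dd.read_sql_table in the first script, a Parquet file on HDFS in the second.

For reference, the prepared SELECT in both scripts assumes a table shaped roughly like the sketch below. The keyspace, table, and column names match the scripts; the column types and replication settings are assumptions made for illustration, not taken from the actual cluster.

#!/usr/bin/env python3
# Hypothetical schema setup for the lookup table queried by the scripts.
# Column types and replication factor are assumptions, not the real DDL.
from cassandra.cluster import Cluster

cluster = Cluster(['scylla-node-1', 'scylla-node-2', 'scylla-node-3'])
session = cluster.connect()
session.execute("""
    CREATE KEYSPACE IF NOT EXISTS test_keyspace
    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3}
""")
session.execute("""
    CREATE TABLE IF NOT EXISTS test_keyspace.partner_id_match_400M_rows (
        id text PRIMARY KEY,
        partnerid text
    )
""")
cluster.shutdown()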
#!/usr/bin/env python3.5
from time import time

import dask.dataframe as dd
import pandas as pd
from cassandra.cluster import Cluster
from cassandra.concurrent import execute_concurrent
from cassandra.policies import ConstantReconnectionPolicy
from cassandra.query import tuple_factory
from dask.distributed import Client


def get_df_from_scylla(pandas_df):
    # For every distinct id in this partition, fetch the matching partnerid
    # from ScyllaDB and merge it back onto the partition.
    ts = time()
    cluster = Cluster(
        ['scylla-node-1', 'scylla-node-2', 'scylla-node-3'],
        connect_timeout=300,
        executor_threads=2,
        control_connection_timeout=None,
        protocol_version=4,
        reconnection_policy=ConstantReconnectionPolicy(
            delay=1, max_attempts=300), )
    session = cluster.connect()
    session.default_timeout = 300
    session.row_factory = tuple_factory
    session.set_keyspace('test_keyspace')
    select_partnerid = session.prepare(
        "SELECT id, partnerid FROM partner_id_match_400M_rows WHERE id = ?")
    # One prepared-statement execution per distinct id.
    ids = set()
    statements_and_params = []
    for id in pandas_df['id']:
        if id in ids:
            continue
        ids.add(id)
        statements_and_params.append((select_partnerid, (id, )))
    # Fire the lookups through the driver's concurrent executor,
    # 512 requests in flight at a time.
    concurrent = execute_concurrent(
        session,
        statements_and_params,
        concurrency=512,
        raise_on_first_error=True,
        results_generator=True)
    print('waiting for', len(statements_and_params), 'concurrent queries')
    results = []
    for (success, result) in concurrent:
        if not success:
            raise Exception('something bad happened', success, result)
        else:
            for r in result:
                results.append(r)
    print('creating dataframe')
    # Join the rows returned by ScyllaDB back onto the partition on id.
    pandas_df = pandas_df.merge(
        pd.DataFrame(results, columns=['id', 'partnerid'], dtype=object),
        on='id')
    cluster.shutdown()
    print('run time:', time() - ts)
    print('')
    return pandas_df


if __name__ == '__main__':
    ts = time()
    client = Client('dask-scheduler.fqdn:8786')
    client.restart()
    nworkers = len(client.ncores())
    print(nworkers, 'workers available')
    # Read the 10M source ids from Hive, one Dask partition per worker.
    ids_df = dd.read_sql_table(
        table='population_10M_rows',
        uri='hive://hive-server.fqdn:10000/test_db',
        index_col='id',
        columns=['id'],
        npartitions=nworkers,
        limits=(1, 10000000), )
    print('read sql done:', time() - ts)
    # Enrich each partition with partnerid from ScyllaDB on the workers.
    ids_df = ids_df.map_partitions(
        get_df_from_scylla, meta={'id': object, 'partnerid': object})
    ids_df.compute()
    print('compute done:', time() - ts)
    # Row count of the merged result (len() triggers another computation).
    count = ids_df.count('columns')
    print('count:', len(count))
    client.close()
    print('run time:', time() - ts)
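The second variant below keeps the ScyllaDB lookup unchanged and only swaps the input side: instead of reading the ids from Hive, it loads them from a Parquet file on HDFS with pyarrow and hdfs3 and converts them to a Dask dataframe with dd.from_pandas. A file in the expected single-column shape could be produced along the lines of this sketch; the HDFS path is the same placeholder the script uses and the generated id values are purely illustrative.

#!/usr/bin/env python3
# Hypothetical helper that writes a single-column Parquet file of ids to HDFS,
# matching what the second script reads back. Path and id values are placeholders.
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from hdfs3 import HDFileSystem

ids = pd.DataFrame({'id': ['id-%08d' % i for i in range(10000000)]})
table = pa.Table.from_pandas(ids, preserve_index=False)

hdfs = HDFileSystem()
with hdfs.open('/path/to/population_10M_rows.parquet', 'wb') as f:
    pq.write_table(table, f)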
#!/usr/bin/env python3
from time import time

import dask.dataframe as dd
import pandas as pd
import pyarrow.parquet as pq
from cassandra.cluster import Cluster
from cassandra.concurrent import execute_concurrent
from cassandra.policies import ConstantReconnectionPolicy
from cassandra.query import tuple_factory
from dask.distributed import Client
from hdfs3 import HDFileSystem


def get_df_from_scylla(pandas_df):
    # Same per-partition ScyllaDB lookup as in the first script.
    ts = time()
    cluster = Cluster(
        ['scylla-node-1', 'scylla-node-2', 'scylla-node-3'],
        connect_timeout=300,
        executor_threads=2,
        control_connection_timeout=None,
        protocol_version=4,
        reconnection_policy=ConstantReconnectionPolicy(
            delay=1, max_attempts=300), )
    session = cluster.connect()
    session.default_timeout = 300
    session.row_factory = tuple_factory
    session.set_keyspace('test_keyspace')
    select_partnerid = session.prepare(
        "SELECT id, partnerid FROM partner_id_match_400M_rows WHERE id = ?")
    ids = set()
    statements_and_params = []
    for id in pandas_df['id']:
        if id in ids:
            continue
        ids.add(id)
        statements_and_params.append((select_partnerid, (id, )))
    concurrent = execute_concurrent(
        session,
        statements_and_params,
        concurrency=512,
        raise_on_first_error=True,
        results_generator=True)
    print('waiting for', len(statements_and_params), 'concurrent queries')
    results = []
    for (success, result) in concurrent:
        if not success:
            raise Exception('something bad happened', success, result)
        else:
            for r in result:
                results.append(r)
    print('creating dataframe')
    pandas_df = pandas_df.merge(
        pd.DataFrame(results, columns=['id', 'partnerid'], dtype=object),
        on='id')
    cluster.shutdown()
    print('run time:', time() - ts)
    print('')
    return pandas_df


def run(ts):
    client = Client('dask-scheduler.fqdn:8786')
    client.restart()
    nworkers = len(client.ncores())
    print(nworkers, 'workers available')
    # Read the id column of the 10M-row population from a Parquet file on HDFS.
    hdfs = HDFileSystem()
    with hdfs.open('/path/to/population_10M_rows.parquet') as f:
        df = pq.read_table(f, columns=['id'], nthreads=2)
    ids = df.column(0)
    ids_dd = dd.from_pandas(
        pd.DataFrame(ids.to_pandas(), columns=['id'], dtype=object),
        npartitions=nworkers)
    print('load parquet done:', time() - ts)
    print(ids_dd.npartitions, 'npartitions')
    # Enrich each partition with partnerid from ScyllaDB on the workers.
    ids_dd = ids_dd.map_partitions(
        get_df_from_scylla, meta={'id': object, 'partnerid': object})
    ids_dd.compute()
    print('compute done:', time() - ts)
    # Row count of the merged result (len() triggers another computation).
    count = ids_dd.count('columns')
    print('count:', len(count))
    client.close()


if __name__ == '__main__':
    ts = time()
    run(ts)
    print('run time:', time() - ts)
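To sanity-check the ScyllaDB lookup in isolation, get_df_from_scylla from either script can also be called directly on a small pandas frame, without a Dask cluster, assuming the Scylla nodes from the scripts are reachable. The id values below are placeholders:

# Hypothetical smoke test: run the per-partition lookup on a handful of ids
# with get_df_from_scylla in scope; the ids are placeholders, not real data.
import pandas as pd

sample = pd.DataFrame({'id': ['id-00000001', 'id-00000002', 'id-00000003']},
                      dtype=object)
enriched = get_df_from_scylla(sample)
print(enriched.head())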