Created
February 23, 2018 15:11
Star
You must be signed in to star a gist
Blog post: Evaluating ScyllaDB for production 2/2
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3.5 | |
from time import time | |
import dask.dataframe as dd | |
import pandas as pd | |
from cassandra.cluster import Cluster | |
from cassandra.concurrent import execute_concurrent | |
from cassandra.policies import ConstantReconnectionPolicy | |
from cassandra.query import tuple_factory | |
from dask.distributed import Client | |
def get_df_from_scylla(pandas_df):
    """Enrich *pandas_df* with a ``partnerid`` column fetched from ScyllaDB.

    For every distinct value in ``pandas_df['id']`` a prepared SELECT is
    issued concurrently against the ``partner_id_match_400M_rows`` table,
    and the (id, partnerid) results are merged back onto the input frame.

    Runs once per dask partition (see ``map_partitions`` below), so each
    worker opens and tears down its own cluster connection.

    Parameters
    ----------
    pandas_df : pandas.DataFrame
        Must contain an ``id`` column; duplicate ids are queried only once.

    Returns
    -------
    pandas.DataFrame
        The input frame inner-merged with the query results on ``id``.

    Raises
    ------
    RuntimeError
        If any of the concurrent queries fails.
    """
    ts = time()
    cluster = Cluster(
        ['scylla-node-1', 'scylla-node-2', 'scylla-node-3'],
        connect_timeout=300,
        executor_threads=2,
        control_connection_timeout=None,
        protocol_version=4,
        reconnection_policy=ConstantReconnectionPolicy(
            delay=1, max_attempts=300), )
    # Ensure the cluster is shut down even if a query raises; the original
    # code leaked the connection pool on error.
    try:
        session = cluster.connect()
        session.default_timeout = 300
        session.row_factory = tuple_factory
        session.set_keyspace('test_keyspace')
        select_partnerid = session.prepare(
            "SELECT id, partnerid FROM partner_id_match_400M_rows WHERE id = ?")
        # Query each distinct id exactly once (drop_duplicates preserves
        # first-seen order, matching the original set-based dedup).
        statements_and_params = [
            (select_partnerid, (row_id, ))
            for row_id in pandas_df['id'].drop_duplicates()
        ]
        concurrent = execute_concurrent(
            session,
            statements_and_params,
            concurrency=512,
            raise_on_first_error=True,
            results_generator=True)
        print('waiting for', len(statements_and_params), 'concurrent queries')
        results = []
        for (success, result) in concurrent:
            if not success:
                raise RuntimeError('something bad happened', success, result)
            results.extend(result)
        print('creating dataframe')
        pandas_df = pandas_df.merge(
            pd.DataFrame(results, columns=['id', 'partnerid'], dtype=object),
            on='id')
    finally:
        cluster.shutdown()
    print('run time:', time() - ts)
    print('')
    return pandas_df
if __name__ == '__main__':
    ts = time()
    client = Client('dask-scheduler.fqdn:8786')
    # Close the scheduler connection even if the job fails mid-way.
    try:
        client.restart()
        nworkers = len(client.ncores())
        print(nworkers, 'workers available')
        # One partition per worker so every worker runs exactly one
        # get_df_from_scylla call.
        ids_df = dd.read_sql_table(
            table='population_10M_rows',
            uri='hive://hive-server.fqdn:10000/test_db',
            index_col='id',
            columns=['id'],
            npartitions=nworkers,
            limits=(1, 10000000), )
        print('read sql done:', time() - ts)
        ids_df = ids_df.map_partitions(
            get_df_from_scylla,
            meta={'id': object, 'partnerid': object})
        # Materialize ONCE and keep the result: the original discarded the
        # compute() output and then forced a full second distributed run via
        # ids_df.count('columns') just to print the row count.
        result_df = ids_df.compute()
        print('compute done:', time() - ts)
        print('count:', len(result_df))
    finally:
        client.close()
    print('run time:', time() - ts)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
from time import time | |
import dask.dataframe as dd | |
import pandas as pd | |
import pyarrow.parquet as pq | |
from cassandra.cluster import Cluster | |
from cassandra.concurrent import execute_concurrent | |
from cassandra.policies import ConstantReconnectionPolicy | |
from cassandra.query import tuple_factory | |
from dask.distributed import Client | |
from hdfs3 import HDFileSystem | |
def get_df_from_scylla(pandas_df):
    """Enrich *pandas_df* with a ``partnerid`` column fetched from ScyllaDB.

    For every distinct value in ``pandas_df['id']`` a prepared SELECT is
    issued concurrently against the ``partner_id_match_400M_rows`` table,
    and the (id, partnerid) results are merged back onto the input frame.

    Runs once per dask partition (see ``map_partitions`` in ``run``), so
    each worker opens and tears down its own cluster connection.

    Parameters
    ----------
    pandas_df : pandas.DataFrame
        Must contain an ``id`` column; duplicate ids are queried only once.

    Returns
    -------
    pandas.DataFrame
        The input frame inner-merged with the query results on ``id``.

    Raises
    ------
    RuntimeError
        If any of the concurrent queries fails.
    """
    ts = time()
    cluster = Cluster(
        ['scylla-node-1', 'scylla-node-2', 'scylla-node-3'],
        connect_timeout=300,
        executor_threads=2,
        control_connection_timeout=None,
        protocol_version=4,
        reconnection_policy=ConstantReconnectionPolicy(
            delay=1, max_attempts=300), )
    # Ensure the cluster is shut down even if a query raises; the original
    # code leaked the connection pool on error.
    try:
        session = cluster.connect()
        session.default_timeout = 300
        session.row_factory = tuple_factory
        session.set_keyspace('test_keyspace')
        select_partnerid = session.prepare(
            "SELECT id, partnerid FROM partner_id_match_400M_rows WHERE id = ?")
        # Query each distinct id exactly once (drop_duplicates preserves
        # first-seen order, matching the original set-based dedup).
        statements_and_params = [
            (select_partnerid, (row_id, ))
            for row_id in pandas_df['id'].drop_duplicates()
        ]
        concurrent = execute_concurrent(
            session,
            statements_and_params,
            concurrency=512,
            raise_on_first_error=True,
            results_generator=True)
        print('waiting for', len(statements_and_params), 'concurrent queries')
        results = []
        for (success, result) in concurrent:
            if not success:
                raise RuntimeError('something bad happened', success, result)
            results.extend(result)
        print('creating dataframe')
        pandas_df = pandas_df.merge(
            pd.DataFrame(results, columns=['id', 'partnerid'], dtype=object),
            on='id')
    finally:
        cluster.shutdown()
    print('run time:', time() - ts)
    print('')
    return pandas_df
def run(ts):
    """Load the id population from a Parquet file on HDFS, fan the lookups
    out across the dask cluster via ``get_df_from_scylla``, and print
    progress timings.

    Parameters
    ----------
    ts : float
        Epoch start time (``time.time()``) used for elapsed-time logging.
    """
    client = Client('dask-scheduler.fqdn:8786')
    # Close the scheduler connection even if the job fails mid-way.
    try:
        client.restart()
        nworkers = len(client.ncores())
        print(nworkers, 'workers available')
        hdfs = HDFileSystem()
        with hdfs.open('/path/to/population_10M_rows.parquet') as f:
            # pyarrow Table, not a DataFrame — only the 'id' column is read.
            table = pq.read_table(f, columns=['id'], nthreads=2)
            ids = table.column(0)
            # One partition per worker so every worker runs exactly one
            # get_df_from_scylla call.
            ids_dd = dd.from_pandas(
                pd.DataFrame(ids.to_pandas(), columns=['id'], dtype=object),
                npartitions=nworkers)
        print('load parquet done:', time() - ts)
        print(ids_dd.npartitions, 'npartitions')
        ids_dd = ids_dd.map_partitions(
            get_df_from_scylla,
            meta={'id': object, 'partnerid': object})
        # Materialize ONCE and keep the result: the original discarded the
        # compute() output and then forced a full second distributed run via
        # ids_dd.count('columns') just to print the row count.
        result_df = ids_dd.compute()
        print('compute done:', time() - ts)
        print('count:', len(result_df))
    finally:
        client.close()
if __name__ == '__main__':
    # Record the start time, hand it to the job, and report total elapsed.
    start = time()
    run(start)
    print('run time:', time() - start)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment