Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
Blog post: Evaluating ScyllaDB for production 2/2
#!/usr/bin/env python3.5
from time import time
import dask.dataframe as dd
import pandas as pd
from cassandra.cluster import Cluster
from cassandra.concurrent import execute_concurrent
from cassandra.policies import ConstantReconnectionPolicy
from cassandra.query import tuple_factory
from dask.distributed import Client
def get_df_from_scylla(pandas_df):
    """Enrich one pandas partition with the ``partnerid`` stored in Scylla.

    Intended to be passed to ``map_partitions``: each call opens its own
    cluster connection, issues one prepared ``SELECT`` per distinct value of
    the ``id`` column, and merges the returned rows back onto *pandas_df*.

    Args:
        pandas_df: pandas DataFrame exposing an ``id`` column.

    Returns:
        *pandas_df* merged on ``id`` (pandas' default join type) with the
        ``(id, partnerid)`` rows returned by Scylla.

    Raises:
        Exception: if a query is reported as unsuccessful; driver errors
            raised while connecting or iterating results propagate as-is.
    """
    ts = time()
    cluster = Cluster(
        ['scylla-node-1', 'scylla-node-2', 'scylla-node-3'],
        connect_timeout=300,
        executor_threads=2,
        control_connection_timeout=None,
        protocol_version=4,
        reconnection_policy=ConstantReconnectionPolicy(
            delay=1, max_attempts=300), )
    # Everything after the Cluster is created runs under try/finally so the
    # driver's connections are released even when a query or the merge fails.
    try:
        session = cluster.connect()
        session.default_timeout = 300
        session.row_factory = tuple_factory
        session.set_keyspace('test_keyspace')
        select_partnerid = session.prepare(
            "SELECT id, partnerid FROM partner_id_match_400M_rows WHERE id = ?")

        # dict.fromkeys de-duplicates while keeping first-seen order, so each
        # distinct id is queried exactly once.
        unique_ids = dict.fromkeys(pandas_df['id'])
        statements_and_params = [
            (select_partnerid, (row_id, )) for row_id in unique_ids]

        concurrent = execute_concurrent(
            session,
            statements_and_params,
            concurrency=512,
            raise_on_first_error=True,
            results_generator=True)
        print('waiting for', len(statements_and_params), 'concurrent queries')

        results = []
        for (success, result) in concurrent:
            if not success:
                raise Exception('something bad happened', success, result)
            # Each result is an iterable of (id, partnerid) tuples.
            results.extend(result)

        print('creating dataframe')
        pandas_df = pandas_df.merge(
            pd.DataFrame(results, columns=['id', 'partnerid'], dtype=object),
            on='id')
    finally:
        cluster.shutdown()
    print('run time:', time() - ts)
    print('')
    return pandas_df
if __name__ == '__main__':
    # Benchmark driver: read the ids from Hive through Dask, look each one up
    # in Scylla on the workers, then report the row count and timings.
    ts = time()
    client = Client('dask-scheduler.fqdn:8786')
    # try/finally guarantees the scheduler connection is closed on failure.
    try:
        client.restart()
        nworkers = len(client.ncores())
        print(nworkers, 'workers available')
        ids_df = dd.read_sql_table(
            table='population_10M_rows',
            uri='hive://hive-server.fqdn:10000/test_db',
            index_col='id',
            columns=['id'],
            npartitions=nworkers,
            limits=(1, 10000000), )
        print('read sql done:', time() - ts)
        ids_df = ids_df.map_partitions(get_df_from_scylla,
                                       meta={'id': object, 'partnerid': object})
        # Keep the computed pandas result: counting on the lazy Dask frame
        # would re-execute the whole graph and run every Scylla query again.
        result_df = ids_df.compute()
        print('compute done:', time() - ts)
        count = result_df.count(axis='columns')
        print('count:', len(count))
    finally:
        client.close()
    print('run time:', time() - ts)
#!/usr/bin/env python3
from time import time
import dask.dataframe as dd
import pandas as pd
import pyarrow.parquet as pq
from cassandra.cluster import Cluster
from cassandra.concurrent import execute_concurrent
from cassandra.policies import ConstantReconnectionPolicy
from cassandra.query import tuple_factory
from dask.distributed import Client
from hdfs3 import HDFileSystem
def get_df_from_scylla(pandas_df):
    """Enrich one pandas partition with the ``partnerid`` stored in Scylla.

    Intended to be passed to ``map_partitions``: each call opens its own
    cluster connection, issues one prepared ``SELECT`` per distinct value of
    the ``id`` column, and merges the returned rows back onto *pandas_df*.

    Args:
        pandas_df: pandas DataFrame exposing an ``id`` column.

    Returns:
        *pandas_df* merged on ``id`` (pandas' default join type) with the
        ``(id, partnerid)`` rows returned by Scylla.

    Raises:
        Exception: if a query is reported as unsuccessful; driver errors
            raised while connecting or iterating results propagate as-is.
    """
    ts = time()
    cluster = Cluster(
        ['scylla-node-1', 'scylla-node-2', 'scylla-node-3'],
        connect_timeout=300,
        executor_threads=2,
        control_connection_timeout=None,
        protocol_version=4,
        reconnection_policy=ConstantReconnectionPolicy(
            delay=1, max_attempts=300), )
    # Everything after the Cluster is created runs under try/finally so the
    # driver's connections are released even when a query or the merge fails.
    try:
        session = cluster.connect()
        session.default_timeout = 300
        session.row_factory = tuple_factory
        session.set_keyspace('test_keyspace')
        select_partnerid = session.prepare(
            "SELECT id, partnerid FROM partner_id_match_400M_rows WHERE id = ?")

        # dict.fromkeys de-duplicates while keeping first-seen order, so each
        # distinct id is queried exactly once.
        unique_ids = dict.fromkeys(pandas_df['id'])
        statements_and_params = [
            (select_partnerid, (row_id, )) for row_id in unique_ids]

        concurrent = execute_concurrent(
            session,
            statements_and_params,
            concurrency=512,
            raise_on_first_error=True,
            results_generator=True)
        print('waiting for', len(statements_and_params), 'concurrent queries')

        results = []
        for (success, result) in concurrent:
            if not success:
                raise Exception('something bad happened', success, result)
            # Each result is an iterable of (id, partnerid) tuples.
            results.extend(result)

        print('creating dataframe')
        pandas_df = pandas_df.merge(
            pd.DataFrame(results, columns=['id', 'partnerid'], dtype=object),
            on='id')
    finally:
        cluster.shutdown()
    print('run time:', time() - ts)
    print('')
    return pandas_df
def run(ts):
    """Run the parquet-backed benchmark and print progress timings.

    Loads the ``id`` column of the population parquet file from HDFS, spreads
    it over one Dask partition per worker, looks every id up in Scylla via
    ``get_df_from_scylla`` and prints the resulting row count.

    Args:
        ts: start timestamp (as returned by ``time()``) that the elapsed-time
            prints are measured against.
    """
    client = Client('dask-scheduler.fqdn:8786')
    # try/finally guarantees the scheduler connection is closed on failure.
    try:
        client.restart()
        nworkers = len(client.ncores())
        print(nworkers, 'workers available')

        hdfs = HDFileSystem()
        with hdfs.open('/path/to/population_10M_rows.parquet') as f:
            df = pq.read_table(f, columns=['id'], nthreads=2)
        ids = df.column(0)
        ids_dd = dd.from_pandas(
            pd.DataFrame(ids.to_pandas(), columns=['id'], dtype=object),
            npartitions=nworkers)
        print('load parquet done:', time() - ts)
        print(ids_dd.npartitions, 'npartitions')

        ids_dd = ids_dd.map_partitions(get_df_from_scylla,
                                       meta={'id': object, 'partnerid': object})
        # Keep the computed pandas result: counting on the lazy Dask frame
        # would re-execute the whole graph and run every Scylla query again.
        result_df = ids_dd.compute()
        print('compute done:', time() - ts)
        count = result_df.count(axis='columns')
        print('count:', len(count))
    finally:
        client.close()
if __name__ == '__main__':
    # Wall-clock origin; also handed to run() so its progress prints are
    # measured from the same starting point.
    started_at = time()
    run(started_at)
    elapsed = time() - started_at
    print('run time:', elapsed)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment