@gumdropsteve
Last active February 3, 2020 23:04
Test pool vs no pool performance with BlazingSQL
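The two scripts below are identical except for the pool flag passed to BlazingContext and the name of the output CSV: with pool=False, GPU memory is allocated on demand, while pool=True pre-allocates a GPU memory pool up front so repeated queries can reuse it. Each script downloads a netflow CSV, runs the same aggregation n_runs times, then pulls per-query timings out of BlazingSQL's logs and writes them to a CSV.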
import os
import urllib.request
from blazingsql import BlazingContext
# set number of times to run each query
n_runs = 3
# let user know
print(f'n_runs = {n_runs}')
'''CHECK FOR DATA
'''
# tag base url & file name
base_url = 'https://blazingsql-colab.s3.amazonaws.com/netflow_data/'
fn = 'nf-chunk2.csv'
# do we already have the data?
if not os.path.isfile(fn):
    # no, so let us know then download it
    print(f'Downloading {base_url + fn} to {fn}')
    urllib.request.urlretrieve(base_url + fn, fn)
'''CLEAR BSQL LOGS
'''
# remove existing RAL & algebra logs
# (os.system never raises on a missing file, so delete via os.remove instead)
for log_file in ('RAL.0.log', 'algebra.log'):
    try:
        os.remove(log_file)
    except FileNotFoundError:
        print(f'no {log_file} to remove')
'''START TESTING
'''
# connect to BlazingSQL (no pre-allocated GPU memory pool)
bc = BlazingContext(pool=False)
# determine path to data
path = os.path.join(os.getcwd(), fn)
# create table
bc.create_table('netflow', path, header=0)
# define the query
query = '''
    SELECT
        a.firstSeenSrcIp AS source,
        a.firstSeenDestIp AS destination,
        COUNT(a.firstSeenDestPort) AS targetPorts,
        SUM(a.firstSeenSrcTotalBytes) AS bytesOut,
        SUM(a.firstSeenDestTotalBytes) AS bytesIn,
        SUM(a.durationSeconds) AS durationSeconds,
        MIN(parsedDate) AS firstFlowDate,
        MAX(parsedDate) AS lastFlowDate,
        COUNT(*) AS attemptCount
    FROM
        netflow a
    GROUP BY
        a.firstSeenSrcIp,
        a.firstSeenDestIp
    '''
# query the table n_runs times
for i in range(n_runs):
    bc.sql(query)
# This query against the logs will tell you the average execution time for every query.
log_query = """
SELECT
    MAX(end_time) AS end_time,
    SUM(query_duration) / COUNT(query_duration) AS avg_time,
    MIN(query_duration) AS min_time,
    MAX(query_duration) AS max_time,
    COUNT(query_duration) AS num_times,
    relational_algebra
FROM (
    SELECT
        times.end_time AS end_time,
        times.query_id,
        times.avg_time,
        times.max_time AS query_duration,
        times.min_time,
        ral.relational_algebra AS relational_algebra
    FROM (
        SELECT
            query_id,
            MAX(log_time) AS end_time,
            SUM(duration) / COUNT(duration) AS avg_time,
            MIN(duration) AS min_time,
            MAX(duration) AS max_time
        FROM
            bsql_logs
        WHERE
            info = 'Query Execution Done'
        GROUP BY
            query_id) AS times
    INNER JOIN (
        SELECT
            query_id,
            SUBSTRING(info, 13, 2000) AS relational_algebra
        FROM
            bsql_logs
        WHERE
            info LIKE 'Query Start%'
        GROUP BY
            query_id, info) AS ral
    ON times.query_id = ral.query_id
    ORDER BY times.end_time DESC) AS temp
GROUP BY
    relational_algebra
"""
# save results to CSV
bc.log(log_query).to_csv('false_pool_runtimes.csv', index=False)
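bc.log() runs SQL against BlazingSQL's internal log table (bsql_logs above) and hands back the result as a DataFrame, which is why .to_csv() can be called on it directly. The pool=True variant of the same script follows.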
import os
import urllib.request
from blazingsql import BlazingContext
# set number of times to run each query
n_runs = 3
# let user know
print(f'n_runs = {n_runs}')
'''CHECK FOR DATA
'''
# tag base url & file name
base_url = 'https://blazingsql-colab.s3.amazonaws.com/netflow_data/'
fn = 'nf-chunk2.csv'
# do we already have the data?
if not os.path.isfile(fn):
    # no, so let us know then download it
    print(f'Downloading {base_url + fn} to {fn}')
    urllib.request.urlretrieve(base_url + fn, fn)
'''CLEAR BSQL LOGS
'''
# remove existing RAL & algebra logs
# (os.system never raises on a missing file, so delete via os.remove instead)
for log_file in ('RAL.0.log', 'algebra.log'):
    try:
        os.remove(log_file)
    except FileNotFoundError:
        print(f'no {log_file} to remove')
'''START TESTING
'''
# connect to BlazingSQL (with a pre-allocated GPU memory pool)
bc = BlazingContext(pool=True)
# determine path to data
path = os.path.join(os.getcwd(), fn)
# create table
bc.create_table('netflow', path, header=0)
# define the query
query = '''
    SELECT
        a.firstSeenSrcIp AS source,
        a.firstSeenDestIp AS destination,
        COUNT(a.firstSeenDestPort) AS targetPorts,
        SUM(a.firstSeenSrcTotalBytes) AS bytesOut,
        SUM(a.firstSeenDestTotalBytes) AS bytesIn,
        SUM(a.durationSeconds) AS durationSeconds,
        MIN(parsedDate) AS firstFlowDate,
        MAX(parsedDate) AS lastFlowDate,
        COUNT(*) AS attemptCount
    FROM
        netflow a
    GROUP BY
        a.firstSeenSrcIp,
        a.firstSeenDestIp
    '''
# query the table n_runs times
for i in range(n_runs):
    bc.sql(query)
# This query against the logs will tell you the average execution time for every query.
log_query = """
SELECT
    MAX(end_time) AS end_time,
    SUM(query_duration) / COUNT(query_duration) AS avg_time,
    MIN(query_duration) AS min_time,
    MAX(query_duration) AS max_time,
    COUNT(query_duration) AS num_times,
    relational_algebra
FROM (
    SELECT
        times.end_time AS end_time,
        times.query_id,
        times.avg_time,
        times.max_time AS query_duration,
        times.min_time,
        ral.relational_algebra AS relational_algebra
    FROM (
        SELECT
            query_id,
            MAX(log_time) AS end_time,
            SUM(duration) / COUNT(duration) AS avg_time,
            MIN(duration) AS min_time,
            MAX(duration) AS max_time
        FROM
            bsql_logs
        WHERE
            info = 'Query Execution Done'
        GROUP BY
            query_id) AS times
    INNER JOIN (
        SELECT
            query_id,
            SUBSTRING(info, 13, 2000) AS relational_algebra
        FROM
            bsql_logs
        WHERE
            info LIKE 'Query Start%'
        GROUP BY
            query_id, info) AS ral
    ON times.query_id = ral.query_id
    ORDER BY times.end_time DESC) AS temp
GROUP BY
    relational_algebra
"""
# save results to CSV
bc.log(log_query).to_csv('true_pool_runtimes.csv', index=False)
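Once both scripts have run, the two CSVs can be compared side by side. A minimal sketch with pandas (assuming pandas is installed; the avg_time column comes from the log query's alias above):

import pandas as pd

# load the per-query timings written by each script
no_pool = pd.read_csv('false_pool_runtimes.csv')
with_pool = pd.read_csv('true_pool_runtimes.csv')

# compare average execution time across the logged queries
print('avg_time without pool:', no_pool['avg_time'].mean())
print('avg_time with pool:   ', with_pool['avg_time'].mean())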