Skip to content

Instantly share code, notes, and snippets.

@gumdropsteve
Last active February 3, 2020 23:04
Show Gist options
  • Save gumdropsteve/a069b48112ca7ddb1791a3069f8dc4d6 to your computer and use it in GitHub Desktop.
Save gumdropsteve/a069b48112ca7ddb1791a3069f8dc4d6 to your computer and use it in GitHub Desktop.
Test pool vs no pool performance with BlazingSQL
import os
import urllib.request  # `import urllib` alone does not load the `request` submodule

from blazingsql import BlazingContext

# Benchmark BlazingSQL WITHOUT the memory pool (pool=False): run the netflow
# aggregation query n_runs times, then summarize the per-query timings from
# BlazingSQL's logs into false_pool_runtimes.csv (for comparison against a
# pool=True run).

# number of times to run the benchmark query
n_runs = 3
print(f'nruns = {n_runs}')

# --- check for data ---------------------------------------------------------
# base url of the sample data and the file name (saved in the current directory)
base_url = 'https://blazingsql-colab.s3.amazonaws.com/netflow_data/'
fn = 'nf-chunk2.csv'
if not os.path.isfile(fn):
    print(f'Downloading {base_url + fn} to {fn}')
    # download under a temporary name and rename only once it completes, so an
    # interrupted download is not mistaken for the full file on the next run
    tmp_fn = fn + '.part'
    urllib.request.urlretrieve(base_url + fn, tmp_fn)
    os.replace(tmp_fn, fn)

# --- clear old BlazingSQL logs ----------------------------------------------
# NOTE(review): presumably bc.log() below reads these files, so deleting them
# keeps timings from earlier sessions out of the results — confirm.
# os.remove raises FileNotFoundError when a log is absent; that is the normal
# "nothing to clear" case and is reported rather than treated as an error.
for log_file, missing_msg in (('RAL.0.log', 'no RAL log to remove'),
                              ('algebra.log', 'no algebra log to remove')):
    try:
        os.remove(log_file)
    except FileNotFoundError:
        print(missing_msg)

# --- start testing ----------------------------------------------------------
# connect to BlazingSQL with the memory pool disabled
bc = BlazingContext(pool=False)

# register the downloaded CSV (first row is the header) as table `netflow`
bc.create_table('netflow', os.path.abspath(fn), header=0)

# per (source IP, destination IP) pair: port count, byte totals, duration
# total, first/last flow date and number of attempts
query = '''
SELECT
a.firstSeenSrcIp as source,
a.firstSeenDestIp as destination,
count(a.firstSeenDestPort) as targetPorts,
SUM(a.firstSeenSrcTotalBytes) as bytesOut,
SUM(a.firstSeenDestTotalBytes) as bytesIn,
SUM(a.durationSeconds) as durationSeconds,
MIN(parsedDate) as firstFlowDate,
MAX(parsedDate) as lastFlowDate,
COUNT(*) as attemptCount
FROM
netflow a
GROUP BY
a.firstSeenSrcIp,
a.firstSeenDestIp
'''

# run the query n_runs times; only the logged timings are used, not the results
for _ in range(n_runs):
    bc.sql(query)

# Summarize the logs: one row per distinct relational-algebra plan (taken from
# the 'Query Start' entries), with the latest end time and the average, min and
# max of each query's 'Query Execution Done' duration, plus how many runs
# were seen.
log_query = """
SELECT
MAX(end_time) AS end_time, SUM(query_duration)/COUNT(query_duration) AS avg_time,
MIN(query_duration) AS min_time, MAX(query_duration) AS max_time, COUNT(query_duration) AS num_times,
relational_algebra
FROM (
SELECT
times.end_time as end_time, times.query_id, times.avg_time,
times.max_time as query_duration, times.min_time, ral.relational_algebra as relational_algebra
FROM (
SELECT
query_id, MAX(log_time) AS end_time, SUM(duration)/COUNT(duration) AS avg_time,
MIN(duration) AS min_time, MAX(duration) AS max_time
FROM
bsql_logs
WHERE
info = 'Query Execution Done'
GROUP BY
query_id ) AS times
INNER JOIN (
SELECT
query_id,
SUBSTRING(info, 13, 2000) AS relational_algebra
FROM
bsql_logs
WHERE
info LIKE 'Query Start%'
GROUP BY
query_id, info ) AS ral
ON
times.query_id = ral.query_id
ORDER BY
times.end_time DESC) AS temp GROUP BY relational_algebra
"""

# save the timing summary to CSV
bc.log(log_query).to_csv('false_pool_runtimes.csv', index=False)
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Pool vs No Pool\n",
"In this notebook we compare BlazingSQL query performance with and without the memory pool (`pool=True` vs. `pool=False`)."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Imports"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import numpy as np\n",
"from blazingsql import BlazingContext\n",
"import matplotlib.pyplot as plt; plt.rcdefaults()"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"BlazingContext ready\n"
]
},
{
"data": {
"text/plain": [
"<pyblazing.apiv2.context.BlazingTable at 0x7fdafb703d30>"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# identify path to data\n",
"path = os.getcwd() \n",
"\n",
"# start up BlazingSQL\n",
"bc = BlazingContext()\n",
"\n",
"# create tables for pool & no pool performance\n",
"bc.create_table('yes_pool', path + '/true_pool_runtimes.csv', header=0)\n",
"bc.create_table('no_pool', path + '/false_pool_runtimes.csv', header=0)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Visualize Results"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Pool is 24.6350% faster\n"
]
},
{
"data": {
"image/png": "\n",
"text/plain": [
"<Figure size 640x480 with 1 Axes>"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"# focus results to runtime\n",
"n_pool_performance = list(bc.sql('select avg_time from no_pool')['avg_time']) \n",
"y_pool_performance = list(bc.sql('select avg_time from yes_pool')['avg_time']) \n",
"\n",
"# set bar chart details\n",
"objects = (['Netflow Query'])\n",
"y_pos = np.arange(len(objects))\n",
"bar_width = 0.05\n",
"\n",
"# horizontal bar chart of query performances\n",
"pool_true = plt.barh(y_pos+bar_width*1.5, y_pool_performance, bar_width, align='center', \n",
" label=\"Default (pool=True)\", color='seagreen', alpha=0.9)\n",
"\n",
"pool_false = plt.barh(y_pos, n_pool_performance, bar_width, align='center', \n",
" label=\"No Pool (pool=False)\", color='darkslateblue', alpha=0.9)\n",
"\n",
"# format chart\n",
"plt.yticks(y_pos, objects)\n",
"plt.xlabel('Runtime (ms)')\n",
"plt.title('Netflow Pool vs No Pool', size=15)\n",
"plt.legend()\n",
"\n",
"# how much of a speedup have we achieved\n",
"speed_up = ((sum(n_pool_performance) / sum(y_pool_performance))-1)*100\n",
"print(f'Pool is {str(speed_up)[:7]}% faster')\n",
"\n",
"# display\n",
"plt.show()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.7"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
import os
import urllib.request  # `import urllib` alone does not load the `request` submodule

from blazingsql import BlazingContext

# Benchmark BlazingSQL WITH the memory pool (pool=True): run the netflow
# aggregation query n_runs times, then summarize the per-query timings from
# BlazingSQL's logs into true_pool_runtimes.csv (for comparison against a
# pool=False run).

# number of times to run the benchmark query
n_runs = 3
print(f'nruns = {n_runs}')

# --- check for data ---------------------------------------------------------
# base url of the sample data and the file name (saved in the current directory)
base_url = 'https://blazingsql-colab.s3.amazonaws.com/netflow_data/'
fn = 'nf-chunk2.csv'
if not os.path.isfile(fn):
    print(f'Downloading {base_url + fn} to {fn}')
    # download under a temporary name and rename only once it completes, so an
    # interrupted download is not mistaken for the full file on the next run
    tmp_fn = fn + '.part'
    urllib.request.urlretrieve(base_url + fn, tmp_fn)
    os.replace(tmp_fn, fn)

# --- clear old BlazingSQL logs ----------------------------------------------
# NOTE(review): presumably bc.log() below reads these files, so deleting them
# keeps timings from earlier sessions out of the results — confirm.
# os.remove raises FileNotFoundError when a log is absent; that is the normal
# "nothing to clear" case and is reported rather than treated as an error.
for log_file, missing_msg in (('RAL.0.log', 'no RAL log to remove'),
                              ('algebra.log', 'no algebra log to remove')):
    try:
        os.remove(log_file)
    except FileNotFoundError:
        print(missing_msg)

# --- start testing ----------------------------------------------------------
# connect to BlazingSQL with the memory pool enabled
bc = BlazingContext(pool=True)

# register the downloaded CSV (first row is the header) as table `netflow`
bc.create_table('netflow', os.path.abspath(fn), header=0)

# per (source IP, destination IP) pair: port count, byte totals, duration
# total, first/last flow date and number of attempts
query = '''
SELECT
a.firstSeenSrcIp as source,
a.firstSeenDestIp as destination,
count(a.firstSeenDestPort) as targetPorts,
SUM(a.firstSeenSrcTotalBytes) as bytesOut,
SUM(a.firstSeenDestTotalBytes) as bytesIn,
SUM(a.durationSeconds) as durationSeconds,
MIN(parsedDate) as firstFlowDate,
MAX(parsedDate) as lastFlowDate,
COUNT(*) as attemptCount
FROM
netflow a
GROUP BY
a.firstSeenSrcIp,
a.firstSeenDestIp
'''

# run the query n_runs times; only the logged timings are used, not the results
for _ in range(n_runs):
    bc.sql(query)

# Summarize the logs: one row per distinct relational-algebra plan (taken from
# the 'Query Start' entries), with the latest end time and the average, min and
# max of each query's 'Query Execution Done' duration, plus how many runs
# were seen.
log_query = """
SELECT
MAX(end_time) AS end_time, SUM(query_duration)/COUNT(query_duration) AS avg_time,
MIN(query_duration) AS min_time, MAX(query_duration) AS max_time, COUNT(query_duration) AS num_times,
relational_algebra
FROM (
SELECT
times.end_time as end_time, times.query_id, times.avg_time,
times.max_time as query_duration, times.min_time, ral.relational_algebra as relational_algebra
FROM (
SELECT
query_id, MAX(log_time) AS end_time, SUM(duration)/COUNT(duration) AS avg_time,
MIN(duration) AS min_time, MAX(duration) AS max_time
FROM
bsql_logs
WHERE
info = 'Query Execution Done'
GROUP BY
query_id ) AS times
INNER JOIN (
SELECT
query_id,
SUBSTRING(info, 13, 2000) AS relational_algebra
FROM
bsql_logs
WHERE
info LIKE 'Query Start%'
GROUP BY
query_id, info ) AS ral
ON
times.query_id = ral.query_id
ORDER BY
times.end_time DESC) AS temp GROUP BY relational_algebra
"""

# save the timing summary to CSV
bc.log(log_query).to_csv('true_pool_runtimes.csv', index=False)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment