Created
April 8, 2022 17:38
-
-
Save alanbernstein/03b0710fadcb19acc5b8a60c4b268e8d to your computer and use it in GitHub Desktop.
PQL+CLI - a simple local Pilosa client
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import argparse | |
import json | |
import random | |
import requests | |
import time | |
#from plotly.offline import plot | |
#import plotly.graph_objs as go | |
import q2 | |
# from pprint import pprint | |
from panda import pm | |
# Disabled command-line interface, kept for reference; an ``args`` Bunch
# further down supplies these options instead. (The --delete help text is a
# copy of --query's.)
#
# parser = argparse.ArgumentParser()
# parser.add_argument("-I", "--interactive", help="interactive mode", default=True)
# parser.add_argument("-b", "--host", help="set pilosa host", default="localhost:10101")
# parser.add_argument("-i", "--index", help="set pilosa index", default="i")
# parser.add_argument("-f", "--field", help="set pilosa field", default="f")
# parser.add_argument("-q", "--query", help="execute query")
# parser.add_argument("-d", "--delete", action='store_true', help="execute query")
# parser.add_argument("-s", "--schema", action='store_true', help="get schema")
# parser.add_argument("-g", "--generate", help="generate data")
# parser.add_argument("-v", "--visualize", help="visualize field contents")
# args = parser.parse_args()

# Not referenced elsewhere in this file.
default_query_index = 'i'

# Keyword arguments forwarded to plotly's offline ``plot()`` by histogram():
# hide the "edit" link and the mode bar.
plot_args = {
    # 'auto_open': True,
    # 'output_type': 'div',
    'show_link': False,
    'config': {'displayModeBar': False},
}
from bunch import Bunch | |
# Stand-in for parsed command-line options: edit these values to choose what
# main() does. Every option main() reads is listed (with the argparse-style
# defaults), so switching ``schema`` off does not raise AttributeError on the
# next option main() checks.
args = Bunch({
    'host': 'localhost:10101',
    'schema': True,
    'index': 'i',
    'field': 'f',
    'query': None,
    'generate': None,
    'visualize': None,
    'delete': False,
})
@pm
def main():
    """Run the single CLI action selected by the module-level ``args``.

    Actions are checked in this order and only the first enabled one runs:
    ``schema``, ``query``, ``generate``, ``visualize``, ``delete``. Options
    that ``args`` does not define are treated as unset (read with a
    ``getattr`` default), so a partial Bunch works as well as an argparse
    Namespace. ``args.host`` is normalized to a base URL in place.
    """
    print('cli args: %s' % args)
    args.host = normalize_host(args.host)
    index = getattr(args, 'index', None)
    field = getattr(args, 'field', None)
    query = getattr(args, 'query', None)
    generate = getattr(args, 'generate', None)
    visualize = getattr(args, 'visualize', None)

    if getattr(args, 'schema', False):
        # TODO: split index list on comma
        print('get schema')
        index_filter = None
        # can't get schema for just index=i, so 'i' shows the whole schema.
        # TODO figure out a better way
        if index and index != 'i':
            index_filter = [index]
        data = get_schema()
        print_schema_filtered(data, index_filter)
    elif query:
        print('run query')
        url = '%s/index/%s/query' % (args.host, index)
        print(url)
        print(query)
        resp = requests.post(url, data=query)
        # Print the raw body before decoding so a non-JSON error response is
        # still shown if json.loads fails on it.
        print(resp.content)
        resp_json = json.loads(resp.content)
        # formatted query results
        if query.startswith('TopN'):
            print('TopN query formatted:')
            total = 0
            print('%10s id' % ('count'))
            for pair in resp_json['results'][0]:
                print('%10d %s' % (pair['count'], pair['id']))
                total += pair['count']
            print('%d total' % total)
        else:
            print(json.dumps(resp_json, indent=2))
    elif generate:
        if generate == 'bsi':
            print('generating BSI data')
            generate_bsi_data(index, 'b')
        elif generate == 'uniform':
            print('generating uniform binary data')
            generate_uniform_data(index, field)
        elif generate == 'zipf':
            print('generating zipfian binary data')
            # TODO: not implemented yet
        elif generate == 'q2':
            print('generating q2 data')
            generate_q2_data(index, field)
        else:
            print('unknown generate mode: %s' % generate)
    elif visualize:
        if visualize == 'hist':
            print('visualize - histogram')
            histogram()
        elif visualize == 'image':
            image()
        else:
            print('unknown visualize mode: %s' % visualize)
    elif getattr(args, 'delete', False):
        if index:
            requests.delete('%s/index/%s' % (args.host, index))
        if field:
            # The URL needs the field name as its third component; the
            # original formatted it with only (host, index) -> TypeError.
            # NOTE(review): this runs after the whole index was deleted above
            # whenever both are set — confirm that ordering is intended.
            requests.delete('%s/index/%s/field/%s' % (args.host, index, field))
    else:
        # TODO: print usage
        pass
def image(row_max='first-empty'):
    """Render the rows of ``args.frame`` as a 0/1 heatmap image.

    Rows are fetched with one ``Bitmap`` query each, starting at rowID 0.
    With ``row_max='first-empty'`` fetching stops at the first row with no
    bits (that empty row is kept as the last image row). With an integer,
    rows ``0..row_max`` inclusive are fetched.

    Each row's ``bits`` list is expanded into a dense 0/1 list indexed by
    column, so the heatmap shows which columns are set. It no longer plots
    the raw ``bits`` values as intensities.

    Returns the plotly ``Heatmap`` trace. The figure is also written to
    ``pilosa-vis.html`` with ``plot_args``, the same way ``histogram()`` does.

    NOTE(review): ``go``/``plot`` come from the plotly imports that are
    commented out at the top of the file, and the default ``args`` has no
    ``frame`` key; both must be provided before this can run. The dense grid
    is as wide as the largest column ID, so keep column ranges small.
    """
    url = '%s/index/%s/query' % (args.host, args.index)
    rows = []
    row = 0
    while True:
        q = 'Bitmap(frame=%s, rowID=%d)' % (args.frame, row)
        resp = requests.post(url, data=q)
        bits = json.loads(resp.content)['results'][0]['bits']
        rows.append(bits)
        if (row_max == 'first-empty' and not bits) or row == row_max:
            break
        row += 1

    # Expand each sparse list of column IDs into a dense 0/1 row.
    width = max((max(bits) for bits in rows if bits), default=-1) + 1
    grid = []
    for bits in rows:
        dense = [0] * width
        for col in bits:
            dense[col] = 1
        grid.append(dense)

    heatmap = go.Heatmap(z=grid)
    layout = go.Layout(
        title='Bitmap image %s/%s' % (args.index, args.frame),
        xaxis=dict(title='columnID'),
        yaxis=dict(title='rowID'),
        showlegend=False,
    )
    fig = go.Figure(data=[heatmap], layout=layout)
    plot(fig, filename='pilosa-vis.html', **plot_args)
    return heatmap
def histogram():
    """Plot the TopN row counts of ``args.frame`` as a bar chart.

    Sends ``TopN(frame=...)`` to ``args.index``. It then writes a bar chart of
    count per rowID to ``pilosa-vis.html`` using plotly's offline ``plot``
    with ``plot_args``.

    NOTE(review): ``go``/``plot`` come from the plotly imports that are
    commented out at the top of the file, and the default ``args`` has no
    ``frame`` key; both must be provided before this can run.
    """
    url = '%s/index/%s/query' % (args.host, args.index)
    q = 'TopN(frame=%s)' % args.frame
    resp = requests.post(url, data=q)
    pairs = json.loads(resp.content)['results'][0]
    # Look values up by key (as main() does for TopN output). Unpacking
    # dict.values() depends on the server's JSON key order and on each pair
    # having exactly two keys, and it crashes when there are no pairs.
    row_ids = [pair['id'] for pair in pairs]
    counts = [pair['count'] for pair in pairs]
    bar = [go.Bar(x=row_ids, y=counts)]
    layout = go.Layout(
        title='TopN histogram %s/%s' % (args.index, args.frame),
        xaxis=dict(title='rowID'),
        yaxis=dict(title='count'),
        showlegend=False,
    )
    fig = go.Figure(data=bar, layout=layout)
    plot(fig, filename='pilosa-vis.html', **plot_args)
def generate_q2_data(index, frame):
    """Generate 1000 q2 records and pass each through ``q2.map_record``.

    NOTE(review): this is a stub. The mapped records are discarded and
    ``index``/``frame`` are unused, so nothing is written to Pilosa yet.
    """
    for _ in range(1000):
        q2.map_record(q2.generate_record())
def generate_uniform_data(index, frame, clear_frame=True, num_rows=50,
                          num_cols=50, density=0.1):
    """Fill ``index/frame`` with uniformly random bits.

    Each cell of a ``num_rows`` x ``num_cols`` grid is set independently with
    probability ``density``. All SetBit calls for one row are posted as a
    single query, and each response body is printed.

    Args:
        index: Pilosa index name.
        frame: frame to write; it is (re)created before writing.
        clear_frame: delete the frame first so bits from earlier runs are gone.
        num_rows: number of rows (rowID 0..num_rows-1).
        num_cols: number of columns (columnID 0..num_cols-1).
        density: probability that any single bit is set.
    """
    if clear_frame:
        delete_frame(index, frame)
    create_frame(index, frame)
    url = '%s/index/%s/query' % (args.host, index)
    count = 0
    for row in range(num_rows):
        calls = [
            'SetBit(frame=%s, rowID=%d, columnID=%d)' % (frame, row, col)
            for col in range(num_cols)
            if random.random() < density
        ]
        count += len(calls)
        if not calls:
            # Nothing to set in this row: don't post an empty query.
            continue
        resp = requests.post(url, data=''.join(calls))
        print(resp.content.strip())
    print('set %d bits with probability %f in %d x %d grid %s/%s' % (count, density, num_rows, num_cols, index, frame))
def generate_bsi_data(index, frame, num_cols=50):
    """Write random integer values into a new range-enabled frame.

    Creates ``index/frame`` with ``bsi=True`` and int fields ``a``, ``b`` and
    ``c`` (default bounds 0..1000). It then sets a random value in 0..1000
    for every field in each of ``num_cols`` columns, one SetFieldValue query
    per value. Responses other than HTTP 200 are printed.

    Args:
        index: Pilosa index name.
        frame: frame to create and write to.
        num_cols: number of columns (columnID 0..num_cols-1) to populate.
    """
    # create_frame_with_fields()
    create_frame(index, frame, bsi=True)
    fields = ['a', 'b', 'c']
    for f in fields:
        create_field(index, frame, f)
    url = '%s/index/%s/query' % (args.host, index)
    for col in range(num_cols):
        for f in fields:
            # randint is inclusive, matching create_field's default 0..1000 bounds.
            val = random.randint(0, 1000)
            # Use the frame that was just created; this used to hard-code frame=b.
            q = 'SetFieldValue(columnID=%d, frame=%s, %s=%d)' % (col, frame, f, val)
            resp = requests.post(url, data=q)
            if resp.status_code != 200:
                print(resp.content)
    print('wrote %d x %d random values to fields %s' % (len(fields), num_cols, fields))
def delete_frame(index, frame):
    """Send DELETE for ``index/frame`` to the server at ``args.host``.

    Prints a confirmation on HTTP 200, otherwise the raw response body.
    """
    resp = requests.delete('%s/index/%s/frame/%s' % (args.host, index, frame))
    if resp.status_code != 200:
        print(resp.content)
        return
    print('deleted frame %s/%s' % (index, frame))
def create_frame(index, frame, bsi=False):
    """Create ``index/frame`` on the server at ``args.host``.

    With ``bsi=True`` the request body sets ``options.rangeEnabled``, and the
    confirmation message calls it a "bsi frame". Prints a confirmation on
    HTTP 200, otherwise the raw response body.
    """
    if bsi:
        label, body = 'bsi ', {"options": {"rangeEnabled": True}}
    else:
        label, body = '', {}
    endpoint = '%s/index/%s/frame/%s' % (args.host, index, frame)
    resp = requests.post(endpoint, json=body)
    if resp.status_code != 200:
        print(resp.content)
        return
    print('created %sframe %s/%s' % (label, index, frame))
def create_field(index, frame, field, minv=0, maxv=1000):
    """Create int field ``field`` (min ``minv``, max ``maxv``) on ``index/frame``.

    Prints a confirmation on HTTP 200, otherwise the raw response body.
    """
    endpoint = '%s/index/%s/frame/%s/field/%s' % (args.host, index, frame, field)
    resp = requests.post(endpoint, json={'type': 'int', 'min': minv, 'max': maxv})
    if resp.status_code != 200:
        print(resp.content)
        return
    print('created field %s/%s/%s' % (index, frame, field))
def create_frame_with_fields(index, frame, field_names=('a', 'b', 'c'),
                             minv=0, maxv=1000):
    """Create range-enabled ``index/frame`` together with its int fields.

    Unlike ``create_frame`` + ``create_field``, the fields are declared in the
    frame-creation request itself. Every field gets type ``int`` with min
    ``minv`` and max ``maxv``. The defaults reproduce the previous hard-coded
    fields a, b and c with bounds 0..1000.

    Prints a confirmation on HTTP 200, otherwise the raw response body.
    """
    data = {
        "options": {
            "rangeEnabled": True,
            "fields": [
                {"name": name, "type": "int", "min": minv, "max": maxv}
                for name in field_names
            ],
        }
    }
    url = '%s/index/%s/frame/%s' % (args.host, index, frame)
    resp = requests.post(url, json=data)
    if resp.status_code == 200:
        print('created fields %s in %s/%s' % (','.join(field_names), index, frame))
    else:
        print(resp.content)
def _print_abbreviated(names, head=4, tail=6):
    """Print ``names`` one per line, replacing the middle of a long list.

    The first ``head`` and last ``tail`` names are printed. If any names lie
    between them, one ``... (N hidden)`` line with the exact count replaces
    them.
    """
    hidden = len(names) - head - tail
    if hidden <= 0:
        for name in names:
            print(' %s' % name)
        return
    for name in names[:head]:
        print(' %s' % name)
    print(' ... (%d hidden)' % hidden)
    # Slice by start index rather than names[-tail:], which is wrong for tail=0.
    for name in names[len(names) - tail:]:
        print(' %s' % name)


def print_schema_filtered(data, index_filter=None):
    """Print a decoded Pilosa ``/schema`` response as an index/field tree.

    Args:
        data: decoded schema. ``data['indexes']`` is a list of dicts with a
            ``name`` and an optional ``fields`` list.
        index_filter: optional collection of index names. When given, only
            those indexes are printed.

    Three field layouts are recognized (two different schema formats, plus a
    third one), checked in this order:
      * ``options.type`` present: one line with type, a "(keys)" marker and name;
      * ``options.fields`` present: the field name, then its sub-field names;
      * ``views`` present (old format): the field name, then its view names,
        excluding the 'standard' view.
    Long sub-field and view lists show only their first 4 and last 6
    entries. Fields that match none of these layouts are skipped.
    """
    if index_filter:
        print('print schema: %s' % ' '.join(index_filter))
    else:
        print('print schema')
    indexes = data.get('indexes')
    if not indexes:
        print('no indexes!')
        return
    print('<index>\n <field>\n ...\n\n')
    for index in indexes:
        if index_filter and index['name'] not in index_filter:
            continue
        print(index['name'])
        fields = index.get('fields')
        if not fields:
            print(' <no fields>')
            continue
        for field in fields:
            options = field.get('options') or {}
            if 'type' in options:
                # TODO: only print '(keys)' spacing if necessary...
                keystr = '(keys)' if options.get('keys') else ''
                print(' %5s %6s :: %s' % (options['type'], keystr, field['name']))
            elif 'fields' in options:
                # i think this is cluster-resize format
                print(' %s' % field['name'])
                _print_abbreviated([sub['name'] for sub in options['fields'] or []])
            elif 'views' in field:
                # old format
                print(' %s' % field['name'])
                _print_abbreviated([
                    view['name'] for view in field['views'] or []
                    if view['name'] != 'standard'
                ])
            # otherwise: no recognizable layout, nothing to show
def get_schema():
    """Fetch and return the decoded ``/schema`` document from ``args.host``.

    Prints the host, then the elapsed time for the request and decoding.

    Raises:
        requests.HTTPError: if the server answers with an error status. This
            happens here instead of as a decode error or KeyError later.
    """
    print(args.host)
    # perf_counter is monotonic, so the elapsed time can't jump with clock changes.
    t0 = time.perf_counter()
    resp = requests.get('%s/schema' % args.host)
    resp.raise_for_status()
    schema = json.loads(resp.content)
    print('got schema in %f sec' % (time.perf_counter() - t0))
    return schema
def normalize_host(host):
    """Return ``host`` as a base URL with a scheme and no trailing slash.

    ``'localhost:10101'`` becomes ``'http://localhost:10101'``. A host that
    already starts with ``http://`` or ``https://`` keeps its scheme. Checking
    for the full ``scheme://`` prefix means a hostname that merely begins with
    "http" (e.g. ``httpserver:10101``) still gets one. Trailing slashes are
    removed because callers append paths such as ``'/schema'``.
    """
    # TODO: generalize to other schemes if ever needed
    if not host.startswith(('http://', 'https://')):
        host = 'http://' + host
    return host.rstrip('/')
# Entry point: run the CLI only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment