Created on July 12, 2019 at 19:24
-
-
Save jctanner/e6d8e60fc3988853534865129057042c to your computer and use it in GitHub Desktop.
jq database thingy
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# http://stackoverflow.com/a/14299004 | |
import calendar | |
import datetime | |
import json | |
#import logging | |
import os | |
import re | |
from multiprocessing.dummy import Pool as ThreadPool | |
from pprint import pprint | |
from graff.utils.systemtools import run_command | |
from graff.utils.systemtools import run_find | |
from graff.jqdb.reducers import JQReducers | |
''' | |
logging.basicConfig( | |
level=logging.INFO, | |
format='(%(threadName)-9s) %(message)s', | |
) | |
''' | |
from logzero import logger | |
logging = logger | |
# SSH invocations for fanning query commands out to remote hosts.
HOSTS = [
    'ssh -i ~/.ssh/id_ecdsa jtanner@g73.lab.net'
]
# NOTE(review): remote execution is disabled — the list above is immediately
# discarded here, so rawquery() always takes the local thread-pool path.
HOSTS = []
def threaded_cmd(cmd):
    '''Run one shell pipeline and return its (rc, stdout, stderr) tuple.

    On failure, commands mentioning '.actor.login' are retried with the key
    rewritten to '.actor' (presumably an older event schema — unverified);
    commands that still fail are appended to /tmp/errors.log.
    '''
    logging.info(cmd)
    rc, so, se = run_command(cmd)

    failed = rc != 0
    if failed:
        logging.error(rc)
        logging.error(se)

    logger.info('rc: %s' % rc)
    logger.info('so: %s' % len(so))
    logger.info('se: %s' % len(se))

    if failed:
        if 'actor.login' in cmd:
            # recurse with the rewritten query; the replacement removes the
            # trigger substring, so this can only recurse one level deep
            rc, so, se = threaded_cmd(cmd.replace('actor.login', 'actor'))
        if rc != 0:
            # persist the *original* command for post-mortem inspection
            with open('/tmp/errors.log', 'a') as fh:
                fh.write(cmd + '\n')

    return (rc, so, se)
class JQDatabase(object):
    '''A database-like interface over gzipped gharchive dumps, queried via jq.

    Cache files are hourly dumps named YYYY-MM-DD-HH.json.gz.  Queries are
    compiled into `zcat ... | jq ...` shell pipelines and executed through a
    thread pool (with an optional, currently-disabled ssh fan-out to HOSTS).
    '''

    def __init__(self, cachedir=None):
        # cachedir: root directory containing the *.json.gz dumps
        self.cachedir = cachedir
        # absolute file path -> datetime parsed from the file's name
        self.cachefiles = {}
        self.build_cache_files()

    def build_cache_files(self):
        '''Index every .json.gz file under cachedir by its embedded timestamp.'''
        files = run_find(self.cachedir + '/', ftype='f')
        for fpath in files:
            if not fpath.endswith('.gz'):
                continue
            # filenames look like 2015-01-01-17.json.gz -> hour resolution
            fname = os.path.basename(fpath)
            fdate = fname.replace('.json.gz', '')
            fdate = datetime.datetime.strptime(fdate, '%Y-%m-%d-%H')
            self.cachefiles[fpath] = fdate

    def get_file_content(self, filename, repository=None):
        '''Return parsed JSON records from one file (or a list of files).

        Without a repository filter the file is streamed through jq
        unfiltered; otherwise select() builds a repo-filtered query.
        '''
        if not repository:
            cmd = "zcat %s | jq -a -c -M '.'" % filename
            (rc, so, se) = run_command(cmd)
            jdata = self.splitjsonrecords(so)
        else:
            queryfiles = filename if isinstance(filename, list) else [filename]
            jdata = self.select(repo=repository, queryfiles=queryfiles)
        return jdata

    @property
    def issue_and_pr_actions(self):
        '''jq boolean expression matching issue/PR lifecycle actions.'''
        jquery = '.payload.action == "opened"'
        jquery += ' or .payload.action == "reopened"'
        jquery += ' or .payload.action == "closed"'
        jquery += ' or .payload.action == "merged"'
        return jquery

    def rawquery(self, jquery, start, stop, limit=None, queryfiles=None,
                 mapper_cmd=None, threads=4):
        '''Run an arbitrary jq query across a date range of cache files.

        :param jquery: jq program (already shell-quoted by the caller)
        :param start: range start, 'YYYY-MM-DD' (used when queryfiles empty)
        :param stop: range stop, 'YYYY-MM-DD'
        :param limit: cap the number of files queried
        :param queryfiles: explicit file list; overrides start/stop
        :param mapper_cmd: extra shell command piped after jq per file
        :param threads: thread-pool size for the local execution path
        :returns: flat list of decoded JSON records/dicts
        '''
        # BUGFIX: default was a shared mutable list ([]); use None to avoid
        # state leaking between calls.
        if queryfiles is None:
            queryfiles = []
        if not queryfiles and (start and stop):
            queryfiles = self.get_query_files(start, stop)
        if limit:
            queryfiles = queryfiles[:limit]
        pprint(queryfiles)

        # timeline-era events had a 'repository' key instead of 'repo';
        # normalize before jq sees the document
        shim = 'sed \'s/"repository":/"repo":/g\''
        cmds = ['zcat %s | %s | jq -a -c -M %s' % (x, shim, jquery)
                for x in queryfiles]

        # optionally reduce/map each pipeline's output with a trailing command
        if mapper_cmd:
            cmds = [x + ' | ' + mapper_cmd for x in cmds]

        if not HOSTS:
            pool = ThreadPool(threads)
            outputs = pool.map(threaded_cmd, cmds)
            pool.close()
            pool.join()
        else:
            # split the command list ~3/4 local, remainder remote
            chunks = []
            # BUGFIX: was true division (float under py3), which breaks the
            # list slicing below; use floor division
            chunksize = (len(cmds) // 4) * 3
            while cmds:
                if len(cmds) < chunksize:
                    chunks.append(cmds)
                    cmds = []
                else:
                    chunks.append(cmds[:chunksize])
                    cmds = cmds[chunksize:]
            # NOTE(review): assumes at least two chunks were produced —
            # confirm for very small command lists.

            # wrap the remote chunk in an ssh invocation with escaped
            # quoting, e.g.:
            #   ssh -i ~/.ssh/id_ecdsa user@host
            #     "zcat ... | jq 'select(.repo.name == \"ansible/ansible\")'"
            for idx, rcmd in enumerate(chunks[1]):
                rcmd = rcmd.replace("\\'", '\\\"')
                rcmd = rcmd.replace('"', '\\\"')
                chunks[1][idx] = '%s "%s"' % (HOSTS[0], rcmd)

            lpool = ThreadPool(4)
            rpool = ThreadPool(4)
            loutputs = lpool.imap_unordered(threaded_cmd, chunks[0])
            routputs = rpool.imap_unordered(threaded_cmd, chunks[1])
            lpool.close()
            rpool.close()
            lpool.join()
            rpool.join()
            outputs = list(loutputs) + list(routputs)

        results = []
        for output in outputs:
            if mapper_cmd:
                # a mapper emits a single JSON document on stdout
                jdata = json.loads(output[1])
            else:
                # jq -c emits newline-delimited JSON records
                jdata = self.splitjsonrecords(output[1])
            if isinstance(jdata, list):
                results += jdata
            elif isinstance(jdata, dict):
                results.append(jdata)
        return results

    def select(self, namespace=None, repo=None, actor=None, eventtype=None,
               actions=None, start=None, stop=None, queryfiles=None,
               mapper_cmd=None, reducer=None):
        '''Mimic a SQL-ish select across the cached files.

        Filters (repo/actor/eventtype/actions) are ANDed together; each may
        be a scalar or a list (list members ORed).  start/stop are
        'YYYY-MM-DD' (or 'YYYY-MM') strings.  namespace is currently unused.
        '''
        # BUGFIX: default was a shared mutable list ([])
        if not queryfiles:
            queryfiles = self.get_query_files(start, stop)
        logger.debug('# QUERYFILES ...')
        pprint(queryfiles)

        jquery = '.'
        params = []
        if repo:
            if isinstance(repo, str):
                repo = [repo]
            rparams = []
            for r in repo:
                # timeline-era events expose .repo.url rather than .repo.name
                rparams.append(".repo.name == \"%s\"" % r)
                rparams.append(".repo.url == \"https://github.com/%s\"" % r)
            params.append('(' + ' or '.join(rparams) + ')')
        if actor:
            if not isinstance(actor, list):
                params.append(".actor.login == \"%s\"" % actor)
            else:
                aparams = [".actor.login == \"%s\"" % a for a in actor]
                params.append('(' + ' or '.join(aparams) + ')')
        if eventtype:
            if isinstance(eventtype, str):
                params.append(".type == \"%s\"" % eventtype)
            elif isinstance(eventtype, list):
                eparams = [".type == \"%s\"" % e for e in eventtype]
                params.append('(' + ' or '.join(eparams) + ')')
        # opened,closed,merged,etc
        if actions:
            if isinstance(actions, str):
                params.append(".payload.action == \"%s\"" % actions)
            elif isinstance(actions, list):
                aparams = [".payload.action == \"%s\"" % a for a in actions]
                params.append('(' + ' or '.join(aparams) + ')')

        if params:
            jquery = '\'select(' + ' and '.join(params) + ')\''

        if reducer:
            # BUGFIX: the scalar-reducer branch previously had its dict/str
            # cases swapped (and an unconditional debugger break).  Normalize
            # to a list and apply the same handling the list branch used.
            reducers = reducer if isinstance(reducer, list) else [reducer]
            # plain-string reducers are jq filters and belong *inside* the
            # single-quoted jq program
            for name in reducers:
                if not isinstance(JQReducers[name], dict):
                    jquery = jquery.rstrip("'")
                    jquery += ' | %s\'' % JQReducers[name]
            # dict reducers carry a standalone shell command appended after
            for name in reducers:
                if isinstance(JQReducers[name], dict):
                    jquery += ' | %s' % JQReducers[name]['cmd']

        logger.debug('# JQUERY ...')
        pprint(jquery)
        results = self.rawquery(jquery, start, stop, queryfiles=queryfiles,
                                mapper_cmd=mapper_cmd)
        return results

    def get_query_files(self, start, stop):
        '''Resolve start/stop strings to the sorted list of cache files.

        start/stop accept 'YYYY-MM-DD' or 'YYYY-MM'; None falls back to the
        earliest/latest indexed file.  Identical start and stop select a
        single day.
        '''
        if start is not None:
            try:
                start = datetime.datetime.strptime(start, '%Y-%m-%d')
            except ValueError:
                # BUGFIX: this handler previously parsed and assigned `stop`
                # (copy-paste error).  Treat 'YYYY-MM' as the first day of
                # that month.
                dp = start.split('-')
                start = datetime.datetime(int(dp[0]), int(dp[1]), 1)
        else:
            dates = list(self.cachefiles.values())
            start = sorted(dates)[0]
        if stop is not None:
            try:
                stop = datetime.datetime.strptime(stop, '%Y-%m-%d')
            except ValueError:
                # 'YYYY-MM' means the last day of that month
                dp = stop.split('-')
                md = calendar.monthrange(int(dp[0]), int(dp[1]))
                stop = datetime.datetime(int(dp[0]), int(dp[1]), md[1])
        else:
            dates = list(self.cachefiles.values())
            stop = sorted(dates)[-1]
        logger.debug('# START: %s' % start)
        logger.debug('# STOP: %s' % stop)

        queryfiles = []
        for fpath, fdate in self.cachefiles.items():
            if start != stop:
                if start <= fdate <= stop:
                    queryfiles.append(fpath)
            else:
                # identical bounds: assume a single day is wanted
                if (fdate.year, fdate.month, fdate.day) == \
                        (start.year, start.month, start.day):
                    queryfiles.append(fpath)
        queryfiles = sorted(set(queryfiles))
        return queryfiles

    def splitjsonrecords(self, rawtext):
        '''Parse a stream of concatenated/newline-delimited JSON documents.'''
        # https://stackoverflow.com/a/50384432
        NOT_WHITESPACE = re.compile(r'[^\s]')

        def decode_stacked(document, pos=0, decoder=json.JSONDecoder()):
            while True:
                match = NOT_WHITESPACE.search(document, pos)
                if not match:
                    return
                pos = match.start()
                # BUGFIX: previously wrapped in `except JSONDecodeError`,
                # an undefined bare name; let json.JSONDecodeError propagate
                obj, pos = decoder.raw_decode(document, pos)
                yield obj

        return list(decode_stacked(rawtext))

    def _splitjsonrecords(self, rawtext):
        '''(legacy) split pretty-printed json from jq by brace matching.

        Superseded by splitjsonrecords(); kept for reference/compatibility.
        '''
        records = []
        # BUGFIX: accumulator started life as a dict, which raised on inputs
        # that begin mid-record; start with an empty string instead
        thisrecord = ''
        for line in rawtext.split('\n'):
            if line.startswith('{'):
                thisrecord = line
            elif line.startswith('}'):
                thisrecord += line
                records.append(thisrecord)
            elif line:
                thisrecord += line
        records = [self.safe_ascii(x) for x in records]
        return records

    def gzjsonload(self, jsonfile):
        '''zcat a gzipped file and parse its whole contents as one JSON doc.'''
        cmd = 'zcat %s' % jsonfile
        (rc, so, se) = run_command(cmd)
        jdata = json.loads(so)
        return jdata

    def safe_ascii(self, rawtext):
        '''Parse one raw JSON record.

        The name is kept for backward compatibility with an earlier
        ascii-scrubbing implementation; the scrubbing (and its debugger
        breakpoints) has been removed — json handles unicode natively.
        '''
        jdata = json.loads(rawtext)
        return jdata
# Module is import-only for now; no CLI entry point has been implemented.
if __name__ == "__main__":
    pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.