@jctanner
Created July 12, 2019 19:24
jq database thingy
#!/usr/bin/env python
# http://stackoverflow.com/a/14299004
import calendar
import datetime
import json
#import logging
import os
import re
from multiprocessing.dummy import Pool as ThreadPool
from pprint import pprint
from graff.utils.systemtools import run_command
from graff.utils.systemtools import run_find
from graff.jqdb.reducers import JQReducers
'''
logging.basicConfig(
    level=logging.INFO,
    format='(%(threadName)-9s) %(message)s',
)
'''
from logzero import logger
# alias the module-level name so the logging.* calls below go through logzero
logging = logger
HOSTS = [
    'ssh -i ~/.ssh/id_ecdsa jtanner@g73.lab.net'
]
# an empty HOSTS list disables remote (ssh) execution in rawquery()
HOSTS = []
def threaded_cmd(cmd):
    '''Run a shell command, retrying once with .actor in place of .actor.login'''
    logging.info(cmd)
    (rc, so, se) = run_command(cmd)
    if rc != 0:
        logging.error(rc)
        logging.error(se)
    logger.info('rc: %s' % rc)
    logger.info('so: %s' % len(so))
    logger.info('se: %s' % len(se))
    if rc != 0:
        if 'actor.login' in cmd:
            # older gharchive events had a string .actor rather than .actor.login
            newcmd = cmd.replace('actor.login', 'actor')
            (rc, so, se) = threaded_cmd(newcmd)
        if rc != 0:
            with open('/tmp/errors.log', 'a') as f:
                f.write(cmd + '\n')
    return (rc, so, se)
class JQDatabase(object):
    '''A database-like thing with gharchive data and jq'''

    def __init__(self, cachedir=None):
        self.cachedir = cachedir
        self.cachefiles = {}
        self.build_cache_files()

    def build_cache_files(self):
        '''Index all the cached files by date'''
        files = run_find(self.cachedir + '/', ftype='f')
        for f in files:
            if not f.endswith('.gz'):
                continue
            # make a timestamp from the YYYY-MM-DD-HH.json.gz filename
            fname = os.path.basename(f)
            fdate = fname.replace('.json.gz', '')
            fdate = datetime.datetime.strptime(fdate, '%Y-%m-%d-%H')
            # inject into dict
            self.cachefiles[f] = fdate
    def get_file_content(self, filename, repository=None):
        '''Load every record from a file, optionally filtered to one repo'''
        if not repository:
            cmd = "zcat %s | jq -a -c -M '.'" % filename
            (rc, so, se) = run_command(cmd)
            jdata = self.splitjsonrecords(so)
        else:
            if isinstance(filename, list):
                jdata = self.select(repo=repository, queryfiles=filename)
            else:
                jdata = self.select(repo=repository, queryfiles=[filename])
        return jdata
    @property
    def issue_and_pr_actions(self):
        jquery = '.payload.action == "opened"'
        jquery += ' or .payload.action == "reopened"'
        jquery += ' or .payload.action == "closed"'
        jquery += ' or .payload.action == "merged"'
        return jquery
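    # The fragment above is meant to be wrapped in a jq select() by callers,
    # producing something along these lines:
    #   select(.payload.action == "opened" or .payload.action == "reopened"
    #          or .payload.action == "closed" or .payload.action == "merged")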
    def rawquery(self, jquery, start, stop, limit=None, queryfiles=[], mapper_cmd=None, threads=4):
        '''Run an arbitrary query across a range of files'''
        if not queryfiles and (start and stop):
            queryfiles = self.get_query_files(start, stop)
        if limit:
            queryfiles = queryfiles[:limit]
        pprint(queryfiles)
        #cmds = ['zcat %s | jq %s' % (x, jquery) for x in queryfiles]
        # timeline events had a 'repository' key instead of a 'repo' key
        shim = 'sed \'s/"repository":/"repo":/g\''
        cmds = ['zcat %s | %s | jq -a -c -M %s' % (x, shim, jquery) for x in queryfiles]
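        # Each generated command is a shell pipeline along these lines
        # (hypothetical file path; the jq program comes from the caller):
        #   zcat /data/ssd/gharchive/2015-01-01-17.json.gz \
        #     | sed 's/"repository":/"repo":/g' \
        #     | jq -a -c -M 'select(.repo.name == "ansible/ansible")'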
        # reduce/map the final outputs
        if mapper_cmd:
            cmds = [x + ' | ' + mapper_cmd for x in cmds]

        if not HOSTS:
            pool = ThreadPool(threads)
            outputs = pool.map(threaded_cmd, cmds)
            pool.close()
            pool.join()
        else:
            # split the commands ~3:1 between localhost and the remote host;
            # max() guards against a zero chunksize (and an infinite loop)
            # when there are fewer than four commands
            chunks = []
            chunksize = max(1, (len(cmds) // 4) * 3)
            while cmds:
                if len(cmds) < chunksize:
                    chunks.append(cmds)
                    cmds = []
                else:
                    chunks.append(cmds[:chunksize])
                    cmds = cmds[chunksize:]

            # the remote chunk is run over ssh, e.g.:
            #   ssh -i ~/.ssh/id_ecdsa jtanner@g73.lab.net
            #     "zcat /data/ssd/gharchive/2015/01/01/2015-01-01-17.json.gz |
            #      jq 'select(.repo.name == \"ansible/ansible\")'"
            for idx, x in enumerate(chunks[1]):
                x = x.replace("\\'", '\\\"')
                x = x.replace('"', '\\\"')
                x = '%s "%s"' % (HOSTS[0], x)
                chunks[1][idx] = x

            lpool = ThreadPool(4)
            rpool = ThreadPool(4)
            loutputs = lpool.imap_unordered(threaded_cmd, chunks[0])
            routputs = rpool.imap_unordered(threaded_cmd, chunks[1])
            lpool.close()
            rpool.close()
            lpool.join()
            rpool.join()

            outputs = []
            for lo in loutputs:
                outputs.append(lo)
            for ro in routputs:
                outputs.append(ro)

        results = []
        for output in outputs:
            if mapper_cmd:
                jdata = json.loads(output[1])
            else:
                jdata = self.splitjsonrecords(output[1])
            if isinstance(jdata, list):
                results += jdata
            elif isinstance(jdata, dict):
                results.append(jdata)
        return results
    def select(self, namespace=None, repo=None, actor=None, eventtype=None, actions=None,
               start=None, stop=None, queryfiles=[], mapper_cmd=None, reducer=None):
        '''Mimic select across the cached files'''
        # start+stop == YYYY-MM-DD
        if not queryfiles:
            queryfiles = self.get_query_files(start, stop)
        logger.debug('# QUERYFILES ...')
        pprint(queryfiles)

        jquery = '.'
        params = []

        if repo:
            if isinstance(repo, str):
                repo = [repo]
            rparams = []
            for r in repo:
                rparams.append(".repo.name == \"%s\"" % r)
                rparams.append(".repo.url == \"https://github.com/%s\"" % r)
            params.append('(' + ' or '.join(rparams) + ')')

        if actor:
            if not isinstance(actor, list):
                params.append(".actor.login == \"%s\"" % actor)
            else:
                aparams = []
                for _actor in actor:
                    aparams.append(".actor.login == \"%s\"" % _actor)
                params.append('(' + ' or '.join(aparams) + ')')

        if eventtype:
            if isinstance(eventtype, str):
                params.append(".type == \"%s\"" % eventtype)
            elif isinstance(eventtype, list):
                rparams = []
                for r in eventtype:
                    rparams.append(".type == \"%s\"" % r)
                params.append('(' + ' or '.join(rparams) + ')')

        # opened,closed,merged,etc
        if actions:
            if isinstance(actions, str):
                params.append(".payload.action == \"%s\"" % actions)
            elif isinstance(actions, list):
                rparams = []
                for r in actions:
                    rparams.append(".payload.action == \"%s\"" % r)
                params.append('(' + ' or '.join(rparams) + ')')

        if params:
            jquery = '\'select(' + ' and '.join(params) + ')\''
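        # As a sketch, select(repo='ansible/ansible', eventtype='IssuesEvent')
        # would assemble:
        #   'select((.repo.name == "ansible/ansible" or .repo.url ==
        #    "https://github.com/ansible/ansible") and .type == "IssuesEvent")'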
        if reducer:
            if not isinstance(reducer, list):
                if isinstance(JQReducers[reducer], dict):
                    # dict-style reducers carry a shell command piped in after jq
                    jquery += ' | %s' % JQReducers[reducer]['cmd']
                else:
                    # string reducers are jq filters appended inside the quoted program
                    jquery = jquery.rstrip("'")
                    jquery += ' | %s\'' % JQReducers[reducer]
            else:
                noncmds = [x for x in reducer if not isinstance(JQReducers[x], dict)]
                cmds = [x for x in reducer if isinstance(JQReducers[x], dict)]
                for noncmd in noncmds:
                    jquery = jquery.rstrip("'")
                    jquery += ' | %s\'' % JQReducers[noncmd]
                for cmd in cmds:
                    jquery += ' | %s' % JQReducers[cmd]['cmd']

        logger.debug('# JQUERY ...')
        pprint(jquery)
        results = self.rawquery(jquery, start, stop, queryfiles=queryfiles, mapper_cmd=mapper_cmd)
        return results
    def get_query_files(self, start, stop):
        '''Pick the cached files that fall inside [start, stop]'''
        # start+stop == YYYY-MM-DD
        if start is not None:
            try:
                start = datetime.datetime.strptime(start, '%Y-%m-%d')
            except ValueError:
                # YYYY-MM given; fall back to the first day of that month
                dp = start.split('-')
                start = datetime.datetime(int(dp[0]), int(dp[1]), 1)
        else:
            dates = list(self.cachefiles.values())
            start = sorted(dates)[0]

        if stop is not None:
            try:
                stop = datetime.datetime.strptime(stop, '%Y-%m-%d')
            except ValueError:
                # YYYY-MM given; fall back to the last day of that month
                dp = stop.split('-')
                md = calendar.monthrange(int(dp[0]), int(dp[1]))
                stop = datetime.datetime(int(dp[0]), int(dp[1]), md[1])
        else:
            dates = list(self.cachefiles.values())
            stop = sorted(dates)[-1]

        logger.debug('# START: %s' % start)
        logger.debug('# STOP: %s' % stop)

        queryfiles = []
        for item in self.cachefiles.items():
            if start != stop:
                if item[1] >= start and item[1] <= stop:
                    queryfiles.append(item[0])
            else:
                # assume a single day is wanted!?
                if item[1].year == start.year and \
                        item[1].month == start.month and \
                        item[1].day == start.day:
                    queryfiles.append(item[0])

        queryfiles = sorted(set(queryfiles))
        return queryfiles
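    # e.g. get_query_files('2015-01-01', '2015-01-02') would return every
    # cached 2015-01-01-*.json.gz and 2015-01-02-*.json.gz path, sorted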
    def splitjsonrecords(self, rawtext):
        '''Parse a stream of concatenated/stacked json documents'''
        # https://stackoverflow.com/a/50384432
        NOT_WHITESPACE = re.compile(r'[^\s]')

        def decode_stacked(document, pos=0, decoder=json.JSONDecoder()):
            while True:
                match = NOT_WHITESPACE.search(document, pos)
                if not match:
                    return
                pos = match.start()
                try:
                    obj, pos = decoder.raw_decode(document, pos)
                except json.JSONDecodeError:
                    # do something sensible if there's some error
                    raise
                yield obj

        records = []
        for obj in decode_stacked(rawtext):
            records.append(obj)
        return records
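    # e.g. splitjsonrecords('{"a": 1} {"b": 2}') -> [{'a': 1}, {'b': 2}]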
    def _splitjsonrecords(self, rawtext):
        '''split the newline delimited json from jq (superseded by splitjsonrecords)'''
        lines = rawtext.split('\n')
        records = []
        thisrecord = ''
        for x in lines:
            if x.startswith('{'):
                thisrecord = x
            elif x.startswith('}'):
                thisrecord += x
                records.append(thisrecord)
            elif not x:
                pass
            else:
                thisrecord += x
        records = [self.safe_ascii(x) for x in records]
        '''
        for idx,x in enumerate(records):
            if not 'id' in x:
                # needs to be a bigint
                m = hashlib.sha256()
                m.update(str(x))
                eid = str(m.hexdigest)
        '''
        return records
    def gzjsonload(self, jsonfile):
        '''Load one gzipped json file as a single document'''
        cmd = 'zcat %s' % jsonfile
        (rc, so, se) = run_command(cmd)
        jdata = json.loads(so)
        return jdata
    def safe_ascii(self, rawtext):
        '''
        rawtext2 = rawtext.replace('\\\\u0000', '<NULL>')
        newtext = None
        try:
            newtext = rawtext2.encode('ascii', 'ignore')
        except UnicodeDecodeError:
            newtext = rawtext2.decode('utf-8').encode('ascii', 'ignore')
        try:
            jdata = json.loads(newtext)
        except Exception as e:
            print(e)
        '''
        #lt = filter(lambda x:ord(x) != 0, list(rawtext))
        #jdata = json.loads(''.join(lt))
        try:
            jdata = json.loads(rawtext)
        except Exception as e:
            # re-raise so a bad record is not silently returned undefined
            print(e)
            raise
        '''
        if 'issue' in jdata['payload']:
            jdata['payload'].pop('issue', None)
        elif 'pull_request' in jdata['payload']:
            jdata['payload'].pop('pull_request', None)
        if 'u0000' in str(jdata):
            pass
        '''
        return jdata
if __name__ == "__main__":
    pass
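    # A minimal usage sketch (hypothetical cachedir; assumes a local mirror of
    # the gharchive hourly dumps named YYYY-MM-DD-HH.json.gz):
    #   db = JQDatabase(cachedir='/data/ssd/gharchive')
    #   events = db.select(
    #       repo='ansible/ansible',
    #       eventtype=['IssuesEvent', 'PullRequestEvent'],
    #       start='2015-01-01',
    #       stop='2015-01-31',
    #   )
    #   pprint(events[:5])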