Skip to content

Instantly share code, notes, and snippets.

@gregglind
Created November 17, 2014 21:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gregglind/cf04e10f6d99a6801e77 to your computer and use it in GitHub Desktop.
Save gregglind/cf04e10f6d99a6801e77 to your computer and use it in GitHub Desktop.
"""
ENV:
ONEPERCENT - use the 1% sample
JOB_* - these will go to the JOB enivornment, and are viewable from the map functions
RUN - to run from cli. (gross, sorry!)
Examples:
rm -rf driver.jar /tmp/output;
ONEPERCENT=1 JOB_SUBSAMPLE=.02 make ARGS="scripts/whatever.py /tmp/output" hadoop # implied 1 pct location
ONEPERCENT=1 make ARGS="scripts/filter.py /tmp/output /user/sguha/fhr/samples/output/1pct" hadoop
"""
"""
do addons cause bad outcome?
- do they install another browser which then is default?
"""
import itertools
import fileinput
try:
import ujson as json
except:
import json
import random
import sys
import datetime
import logging
import os
##########################################
## Jydoop related machinery, do not change
global setupjob
class Fake(object):
pass
try:
import jydoop
import healthreportutils
except ImportError:
logging.warn("using fake jydoop, healthreportutils")
healthreportutils = Fake()
healthreportutils.setupjob = lambda: 1
setupjob = healthreportutils.setupjob
omap = map
def setupjob_onepercent(job, args):
"""
Set up a job to run on one or more HDFS locations
Jobs expect one or more arguments, the HDFS path(s) to the data.
"""
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat as FileInputFormat
import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsTextInputFormat as MyInputFormat
print "ARGS", args
if len(args) < 1:
args.append("/user/sguha/fhr/samples/output/1pct")
print "adding 1pct path"
#raise Exception("Usage: <hdfs-location1> [ <location2> ] [ <location3> ] [ ... ]")
job.setInputFormatClass(MyInputFormat)
FileInputFormat.setInputPaths(job, ",".join(args));
job.getConfiguration().set("org.mozilla.jydoop.mappertype", "TEXT")
# set the job to run in the RESEARCH queue
job.getConfiguration().set("mapred.job.queue.name","research")
if 'ONEPERCENT' in os.environ:
print "setting up ONEPERCENT"
setupjob = setupjob_onepercent
def wrapsetup(setup):
print "wrapping"
def newsetup(job,*args,**kwargs):
print "SETUP WITH EXTRA ENV"
print "MAP.CHILD.ENV", job.getConfiguration().get("mapred.map.child.env")
v = job.getConfiguration().get("mapred.map.child.env")
vars = []
for k,v in os.environ.iteritems():
if k.startswith("JOB_"):
vars.append(k+"="+v)
if v:
vars = [v] + vars
v = ",".join(vars)
print v
job.getConfiguration().set("mapred.map.child.env", v)
print "MAP.CHILD.ENV", job.getConfiguration().get("mapred.map.child.env")
setup(job,*args,**kwargs)
return newsetup
setupjob = wrapsetup(setupjob)
##############################################
### Main code. If you make edits, start here.
addon_names= dict()
if "JOB_ADDONS" in os.environ:
addon_names = json.load(open(os.environ["JOB_ADDONS"]))
class FakeContext(object):
def write(self, k, v):
sys.stdout.write("%s\n" % (v,))
def uquote(thing):
return '"%s"' % unicode(thing)
def dump(*args):
return ','.join(omap(uquote,args))
def dt(dashdate):
return datetime.date(*omap(int, dashdate.split('-')))
## should these two functions bulk up to before / after?
## TODO, these need to bulk up to understand "period"
def defaultness(fhr, when):
before = dict(c=0,n=0)
after = dict(c=0,n=0)
for day,v in fhr['days'].iteritems():
D = before
if day > when:
D = after
D['n'] += 1
D['c'] += v.get("org.mozilla.appInfo.appinfo",{}).get("isDefaultBrowser",0)
#print n, c, c>=(n-c)
labels = ['n','y']
ans = labels[before['c'] >= (before['n']/2.0)] + '->' + \
labels[after['c'] >= (after['n']/2.0)]
return dict(before=before,after=after, ans=ans)
## TODO.
def sapness(fhr, when):
before = dict(c=0,n=0)
after = dict(c=0,n=0)
for day,v in fhr['days'].iteritems():
D = before
if day > when:
D = after
for sap,n in v.get("org.mozilla.searches.counts",{}).iteritems():
if sap == "_v": continue
#p = sap.split('.',1)[0]
D['c'] += n
#print ("FHR:", day, sap, n)
#print n, c, c>=(n-c)
labels = ['n','y']
ans = labels[before['c'] > 0] + '->' + \
labels[after['c'] > 0]
return dict(before=before,after=after, ans=ans)
def map(key, value, context):
''' m-r map function'''
pct = float(os.environ.get('JOB_SUBSAMPLE',1))
if random.random() > pct:
return
# for each addon event
# if it's during the fhr period?
# what addon was it?
# did the person go to non-default within a week?
# did they not return?
# did they go to 'sap 0', after that?
# then we need to find a proper control sample.
d = json.loads(value)
if not d['data'].get('days',{}):
return
minday = min(*d['data']['days'].keys())
maxday = max(*d['data']['days'].keys())
try:
dt(minday)
dt(maxday)
except:
logging.error("invalid min or max day, key:%s", key)
return False
# sample some days
controldayfrac = float(os.environ.get('JOB_CONTROL_DAYS_FRAC',.01))
#logging.warning("controldayfrac %s", controldayfrac)
installdays = set() # days when installs happened for a user, ex. '2013-02-13'
L = d['data']['last']
for (addonid, ainfo) in itertools.chain(
L.get('org.mozilla.addons.active',{}).iteritems(),
L.get('org.mozilla.addons.addons',{}).iteritems(),
L.get('org.mozilla.addons.plugins',{}).iteritems()
):
if addonid == "_v":
continue
if "installDay" not in ainfo:
logging.error("old style, ignore, key:%s", key)
return
try:
installDay = datetime.date.fromtimestamp(86400 * int(ainfo['installDay'])).strftime("%Y-%m-%d") ## 2013-02-13 or such.
installdays.add(installDay)
isInstallDay = True
foreign = ainfo.get("foreignInstall",False)
atype = ainfo.get("type","addon")
if minday < installDay <= maxday:
#context.write(key,dump(addonid,installDay,minday,maxday))
# so, does it have a return more than 7 days later?
return7 = (dt(maxday) - dt(installDay)).days >= 7
return14 = (dt(maxday) - dt(installDay)).days >= 14
return28 = (dt(maxday) - dt(installDay)).days >= 28
# are searches down to 0, after 3 days?
usesap = sapness(d['data'], installDay)
# default-status? does a change happen within 7 days?
# d-d, u-u, d-u, u-d
dd = defaultness(d['data'], installDay)
#
context.write(key,dump(key,
addonid,ainfo.get('name',addon_names.get(addonid,'unknown')),
int(return7),
int(return14),
int(return28),
usesap["ans"],
dd["ans"],
len(d['data']['days']),
usesap,
dd,
int(isInstallDay),
int(foreign),
atype
))
except Exception as exc:
logging.error("%s, %s" % (key,exc))
return
for (day,dayinfo) in d['data']['days'].iteritems():
if random.random() > controldayfrac: continue # 1/10000 of days?
# so, does it have a return more than 7 days later?
installDay = day
isInstallDay = installDay in installdays # as seen earlier
return7 = (dt(maxday) - dt(installDay)).days >= 7
return14 = (dt(maxday) - dt(installDay)).days >= 14
return28 = (dt(maxday) - dt(installDay)).days >= 28
# are searches down to 0, after 3 days?
usesap = sapness(d['data'], installDay)
# default-status? does a change happen within 7 days?
# d-d, u-u, d-u, u-d
dd = defaultness(d['data'], installDay)
foreign = False
#
addonid = 'alldays'
ainfo = dict(name="alldays")
t = [
key,
addonid,
ainfo.get('name',addon_names.get(addonid,'unknown')),
int(return7),
int(return14),
int(return28),
usesap["ans"],
dd["ans"],
len(d['data']['days']),
usesap,
dd,
int(foreign),
int(isInstallDay),
"day"
]
context.write(key,dump(*t))
# special rows, split for install or not.
if isInstallDay:
t[1:3] = ['installday', 'installday']
else:
t[1:3] = ['non-installday', 'non-installday']
context.write(key,dump(*t))
if os.environ.get("RUN",False):
for line in fileinput.input():
k, v = line.split("\t",1)
map(k, v, FakeContext())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment