Created
November 17, 2014 21:53
-
-
Save gregglind/cf04e10f6d99a6801e77 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
ENV: | |
ONEPERCENT - use the 1% sample | |
JOB_* - these will go to the JOB environment, and are viewable from the map functions
RUN - to run from cli. (gross, sorry!) | |
Examples: | |
rm -rf driver.jar /tmp/output; | |
ONEPERCENT=1 JOB_SUBSAMPLE=.02 make ARGS="scripts/whatever.py /tmp/output" hadoop # implied 1 pct location | |
ONEPERCENT=1 make ARGS="scripts/filter.py /tmp/output /user/sguha/fhr/samples/output/1pct" hadoop | |
""" | |
""" | |
do addons cause bad outcome? | |
- do they install another browser which then is default? | |
""" | |
import itertools | |
import fileinput | |
try: | |
import ujson as json | |
except: | |
import json | |
import random | |
import sys | |
import datetime | |
import logging | |
import os | |
########################################## | |
## Jydoop related machinery, do not change | |
# NOTE: `global` at module scope has no effect; kept as in the original.
global setupjob


class Fake(object):
    """Empty attribute holder used to stand in for a module that is missing."""
    pass


try:
    import jydoop
    import healthreportutils
except ImportError:
    # Not running under jydoop: substitute a stub so the rest of the module
    # can still be imported and run locally.
    logging.warning("using fake jydoop, healthreportutils")
    healthreportutils = Fake()
    # The stub must tolerate being called like a real setupjob, i.e. with a
    # job plus arbitrary extra arguments (wrapsetup forwards them all).
    healthreportutils.setupjob = lambda *args, **kwargs: 1

setupjob = healthreportutils.setupjob
# Keep a handle on the builtin map(): this module later defines its own
# map(key, value, context) function, which hides the builtin.
omap = map
def setupjob_onepercent(job, args):
    """
    Set up a job to run on one or more HDFS locations.

    ``args`` holds the HDFS path(s) to read.  When it is empty the job falls
    back to the one-percent sample location instead of failing.  The caller's
    ``args`` sequence is left untouched; a private copy is extended.

    The job is configured to read sequence files as text, to use the "TEXT"
    jydoop mapper type, and to run in the "research" queue.
    """
    # Java classes, importable only when running under Jython with Hadoop on
    # the classpath, hence the function-scope imports.
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat as FileInputFormat
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsTextInputFormat as MyInputFormat

    print("ARGS %s" % (args,))
    paths = list(args)
    if not paths:
        paths.append("/user/sguha/fhr/samples/output/1pct")
        print("adding 1pct path")

    job.setInputFormatClass(MyInputFormat)
    FileInputFormat.setInputPaths(job, ",".join(paths))
    job.getConfiguration().set("org.mozilla.jydoop.mappertype", "TEXT")
    # set the job to run in the RESEARCH queue
    job.getConfiguration().set("mapred.job.queue.name", "research")
# Defining ONEPERCENT in the environment (any value, even empty) switches the
# job setup to the one-percent-sample variant.
if "ONEPERCENT" in os.environ:
    print("setting up ONEPERCENT")
    setupjob = setupjob_onepercent
def wrapsetup(setup):
    """Wrap a job-setup function so that JOB_* variables reach the mappers.

    The returned function has the signature ``newsetup(job, *args, **kwargs)``.
    Before delegating to ``setup`` it appends every ``JOB_*`` variable of the
    current process environment, as ``NAME=value``, to the job's
    ``mapred.map.child.env`` setting.  Any value already present in that
    setting is kept and placed first.
    """
    print("wrapping")

    def newsetup(job, *args, **kwargs):
        conf = job.getConfiguration()
        print("SETUP WITH EXTRA ENV")
        existing = conf.get("mapred.map.child.env")
        print("MAP.CHILD.ENV %s" % (existing,))

        # Use names distinct from `existing`: reusing one variable for both
        # the saved setting and the loop value would discard the setting.
        # Sorted so the resulting setting is deterministic.
        entries = [
            name + "=" + value
            for name, value in sorted(os.environ.items())
            if name.startswith("JOB_")
        ]
        if existing:
            entries.insert(0, existing)

        child_env = ",".join(entries)
        print(child_env)
        conf.set("mapred.map.child.env", child_env)
        print("MAP.CHILD.ENV %s" % (conf.get("mapred.map.child.env"),))
        setup(job, *args, **kwargs)

    return newsetup
# Whichever setup function was selected above, wrap it so the JOB_* environment
# variables are added to the job's mapred.map.child.env setting.
setupjob = wrapsetup(setupjob)
############################################## | |
### Main code. If you make edits, start here. | |
# Optional addon-id -> display-name table, loaded from the JSON file named by
# the JOB_ADDONS environment variable.  Stays empty when the variable is unset.
addon_names = dict()
if "JOB_ADDONS" in os.environ:
    # `with` closes the file even if the JSON is malformed.
    with open(os.environ["JOB_ADDONS"]) as addons_file:
        addon_names = json.load(addons_file)
class FakeContext(object):
    """Stand-in for the map-reduce context that prints to standard output."""

    def write(self, k, v):
        """Write ``v`` followed by a newline to stdout; ``k`` is not used."""
        line = "%s\n" % (v,)
        sys.stdout.write(line)
def uquote(thing):
    """Return ``thing`` as text wrapped in double quotes.

    Double quotes inside the text are doubled (``"`` becomes ``""``) so the
    result stays a single well-formed quoted field when fields are joined
    with commas.
    """
    try:
        text_type = unicode  # Python 2 / Jython
    except NameError:
        text_type = str  # Python 3 has no separate `unicode` type
    return '"%s"' % text_type(thing).replace('"', '""')
def dump(*args):
    """Quote every argument with uquote() and join the results with commas."""
    quoted_fields = [uquote(field) for field in args]
    return ','.join(quoted_fields)
def dt(dashdate):
    """Convert a dash-separated date string such as '2013-02-13' to a date."""
    numbers = [int(piece) for piece in dashdate.split('-')]
    return datetime.date(*numbers)
## should these two functions bulk up to before / after? | |
## TODO, these need to bulk up to understand "period" | |
def defaultness(fhr, when):
    """Summarise the default-browser flag before and after the day ``when``.

    Days whose key compares greater than ``when`` count as "after"; all other
    days (including ``when`` itself) count as "before".  For each side, ``n``
    is the number of days and ``c`` the sum of their ``isDefaultBrowser``
    values (0 when a day has none).  A side is labelled 'y' when ``c`` is at
    least half of ``n`` and 'n' otherwise.

    Returns a dict with ``before``, ``after`` and ``ans`` ('<label>-><label>').
    """
    before = {'c': 0, 'n': 0}
    after = {'c': 0, 'n': 0}

    for day, measurements in fhr['days'].items():
        bucket = after if day > when else before
        bucket['n'] += 1
        appinfo = measurements.get("org.mozilla.appInfo.appinfo", {})
        bucket['c'] += appinfo.get("isDefaultBrowser", 0)

    def label(counts):
        if counts['c'] >= (counts['n'] / 2.0):
            return 'y'
        return 'n'

    ans = label(before) + '->' + label(after)
    return {'before': before, 'after': after, 'ans': ans}
## TODO. | |
def sapness(fhr, when):
    """Summarise search counts before and after the day ``when``.

    Days whose key compares greater than ``when`` count as "after"; all other
    days count as "before".  For each side, ``c`` is the total of every entry
    under "org.mozilla.searches.counts" except the "_v" entry; ``n`` is never
    incremented and stays 0.  A side is labelled 'y' when its total is above
    zero and 'n' otherwise.

    Returns a dict with ``before``, ``after`` and ``ans`` ('<label>-><label>').
    """
    before = {'c': 0, 'n': 0}
    after = {'c': 0, 'n': 0}

    for day, measurements in fhr['days'].items():
        bucket = after if day > when else before
        search_counts = measurements.get("org.mozilla.searches.counts", {})
        for sap, count in search_counts.items():
            if sap != "_v":
                bucket['c'] += count

    def label(counts):
        if counts['c'] > 0:
            return 'y'
        return 'n'

    ans = label(before) + '->' + label(after)
    return {'before': before, 'after': after, 'ans': ans}
def _outcome_fields(data, day, days_left):
    """Build the outcome columns shared by addon rows and control rows.

    ``days_left`` is the number of days from ``day`` to the record's last
    day.  Returns, in order: the "still present 7 / 14 / 28 days later"
    flags, the search and default-browser before->after labels, the number of
    days in the record, and the two full summary dicts.
    """
    usesap = sapness(data, day)
    dd = defaultness(data, day)
    return [
        int(days_left >= 7),
        int(days_left >= 14),
        int(days_left >= 28),
        usesap["ans"],
        dd["ans"],
        len(data['days']),
        usesap,
        dd,
    ]


def map(key, value, context):
    """Map-reduce map function: emit addon-install rows and control-day rows.

    ``value`` is a JSON document whose ``data`` member holds ``days`` (keyed
    by 'YYYY-MM-DD' strings) and ``last`` (the addon sections).  The name
    shadows the builtin ``map`` on purpose; the builtin is kept as ``omap``.

    Sampling is controlled through the environment:
      JOB_SUBSAMPLE          fraction of records to process (default 1)
      JOB_CONTROL_DAYS_FRAC  fraction of days sampled as controls (default .01)

    For every addon installed after the first and no later than the last day
    of the record, one row is written:
      key, addon id, addon name, return7, return14, return28, search label,
      default label, day count, search summary, default summary,
      isInstallDay, foreign, addon type
    For every sampled control day two rows are written, one labelled
    'alldays' and one labelled 'installday' or 'non-installday'.

    Returns None, or False when the first or last day is not a valid date.
    """
    pct = float(os.environ.get('JOB_SUBSAMPLE', 1))
    if random.random() > pct:
        return

    d = json.loads(value)
    data = d['data']
    days = data.get('days', {})
    if not days:
        return

    # Pass the keys as ONE iterable: min(*keys) with a single key would
    # compare the characters of that key instead of the keys themselves.
    minday = min(days)
    maxday = max(days)
    try:
        dt(minday)
        last_day = dt(maxday)
    except (ValueError, TypeError):
        logging.error("invalid min or max day, key:%s", key)
        return False

    controldayfrac = float(os.environ.get('JOB_CONTROL_DAYS_FRAC', .01))
    installdays = set()  # days when installs happened for a user, ex. '2013-02-13'
    epoch = datetime.date(1970, 1, 1)

    last = data['last']
    addon_sections = itertools.chain(
        last.get('org.mozilla.addons.active', {}).items(),
        last.get('org.mozilla.addons.addons', {}).items(),
        last.get('org.mozilla.addons.plugins', {}).items(),
    )
    for addonid, ainfo in addon_sections:
        if addonid == "_v":
            continue
        if "installDay" not in ainfo:
            logging.error("old style, ignore, key:%s", key)
            return
        try:
            # installDay is a count of days since the Unix epoch.  Plain date
            # arithmetic keeps the result independent of the machine's
            # timezone (date.fromtimestamp converts in local time).
            install_day = (
                epoch + datetime.timedelta(days=int(ainfo['installDay']))
            ).isoformat()  # 2013-02-13 or such.
            installdays.add(install_day)
            foreign = ainfo.get("foreignInstall", False)
            atype = ainfo.get("type", "addon")
            if minday < install_day <= maxday:
                days_left = (last_day - dt(install_day)).days
                row = [
                    key,
                    addonid,
                    ainfo.get('name', addon_names.get(addonid, 'unknown')),
                ]
                row.extend(_outcome_fields(data, install_day, days_left))
                # An addon row always describes an install day.
                row.extend([int(True), int(foreign), atype])
                context.write(key, dump(*row))
        except Exception as exc:
            # Best effort: one bad addon entry abandons the whole record.
            logging.error("%s, %s", key, exc)
            return

    for day in days:
        # Sample a fraction of days (JOB_CONTROL_DAYS_FRAC, default 1%).
        if random.random() > controldayfrac:
            continue
        try:
            days_left = (last_day - dt(day)).days
        except (ValueError, TypeError):
            # Only the first and last keys were validated above.
            logging.error("invalid day %s, key:%s", day, key)
            continue
        is_install_day = day in installdays  # as seen earlier
        row = [key, 'alldays', 'alldays']
        row.extend(_outcome_fields(data, day, days_left))
        # NOTE(review): control rows emit (foreign, isInstallDay) while addon
        # rows emit (isInstallDay, foreign).  The order is kept as found;
        # confirm against the consumer of this output before aligning them.
        row.extend([int(False), int(is_install_day), "day"])
        context.write(key, dump(*row))

        # special rows, split for install or not.
        label = 'installday' if is_install_day else 'non-installday'
        row[1:3] = [label, label]
        context.write(key, dump(*row))
# Command-line mode: when RUN is set to a non-empty value, read lines from the
# files named on the command line (or stdin) and run each one through map().
if os.environ.get("RUN",False):
    for line in fileinput.input():
        # Each line is "<key>\t<json value>"; split on the first tab only.
        k, v = line.split("\t",1)
        map(k, v, FakeContext())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment