Skip to content

Instantly share code, notes, and snippets.

What would you like to do?
# Build `results`: for every (ip, year, month, day) group that passes the
# filters below, pick one random request and expose its uri_query,
# time_firstbyte and ip.
#
# Names used but not defined in this snippet (expected to come from earlier in
# the file): F (pyspark.sql.functions), Window, row_timestamp, ts_start,
# ts_end, max_q_by_day, random_seed.

# Key identifying one source IP on one calendar day; shared by the window and
# the groupBy so the two cannot drift apart.
per_ip_day = [F.col('ip'), F.col('year'), F.col('month'), F.col('day')]
w = Window.partitionBy(*per_ip_day)

# The whole chain is wrapped in parentheses so the leading-dot continuation
# lines (and the comments between them) are valid Python.
results = (
    # NOTE(review): the original had only the bare string 'wmf.webrequest'
    # here, which has no .where(); reading the table through the session is
    # assumed. `spark` is presumed to be the active SparkSession - confirm.
    spark.table('wmf.webrequest')
    .where(row_timestamp > ts_start)
    .where(row_timestamp < ts_end)
    .where(F.col('webrequest_source') == 'text')
    # NOTE(review): an empty uri_host looks like a hostname that was stripped
    # from the snippet - confirm the intended value.
    .where(F.col('uri_host') == '')
    .where(F.col('cache_status') == 'miss')
    .where(F.col('uri_path') != '/bigdata/ldf')
    .where(F.col('http_status') == '200')
    # Posts are useless for us since we don't have queries
    .where(F.col('http_method') == 'GET')
    .withColumn('query_len', F.length(F.col('uri_query')))
    .where(F.col('query_len') < 2000)
    # Query takes between 5 and 30 secs - we don't want too easy or too heavy ones
    .where(F.col('time_firstbyte') > 5)
    .where(F.col('time_firstbyte') < 30)
    # Not too many from one source: count the surviving rows per ip/day and
    # drop every row of a group that exceeds the cap.
    .withColumn('q_by_day', F.count(F.lit(1)).over(w))
    .where(F.col('q_by_day') <= max_q_by_day)
    # Collapse each ip/day group into a single row holding all its requests.
    .groupBy(*per_ip_day)
    .agg(F.collect_list(F.struct('uri_query', 'time_firstbyte', 'ip')).alias('ip_requests'))
    # rand() is in [0, 1) and every group has at least one element, so the
    # floored product is a valid 0-based index into ip_requests.
    .withColumn('random_req_index', F.floor(F.rand(random_seed) * F.size(F.col('ip_requests'))))
    # Materialise the chosen request as `req`; the original selected req.*
    # without ever defining it. floor() yields a long, so cast to int for the
    # array index.
    .withColumn('req', F.col('ip_requests')[F.col('random_req_index').cast('int')])
    .select(F.col('req.uri_query'), F.col('req.time_firstbyte'), F.col('ip'))
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment