Skip to content

Instantly share code, notes, and snippets.

@smalyshev
Created April 29, 2019 22:39
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save smalyshev/6e69b1c26432859366c26df02763909d to your computer and use it in GitHub Desktop.
# Window covering every row that shares the same ip, year, month and day;
# used below to count how many requests fall into each such group.
w = Window.partitionBy('ip', 'year', 'month', 'day')
# Sample up to 100 GET requests to query.wikidata.org from wmf.webrequest,
# taking one randomly chosen request per (ip, year, month, day) group, and
# return them as a pandas DataFrame with columns uri_query, time_firstbyte, ip.
# row_timestamp, ts_start, ts_end, max_q_by_day, random_seed and the window `w`
# are defined outside this statement.
results = (
    spark.read.table('wmf.webrequest')
    # Time range of interest (both bounds exclusive)
    .where(row_timestamp > ts_start)
    .where(row_timestamp < ts_end)
    # Only successful, uncached requests to the query service host,
    # excluding the /bigdata/ldf path
    .where(F.col('webrequest_source') == 'text')
    .where(F.col('uri_host') == 'query.wikidata.org')
    .where(F.col('cache_status') == 'miss')
    .where(F.col('uri_path') != '/bigdata/ldf')
    .where(F.col('http_status') == '200')
    # POSTs are useless for us since their query text is not in uri_query
    .where(F.col('http_method') == 'GET')
    # Skip very long query strings
    .withColumn('query_len', F.length(F.col('uri_query')))
    .where(F.col('query_len') < 2000)
    # Query takes between 5 and 30 secs - we don't want too easy or too heavy ones
    .where(F.col('time_firstbyte') > 5)
    .where(F.col('time_firstbyte') < 30)
    # Not too many from one source: drop every row belonging to an
    # (ip, year, month, day) group with more than max_q_by_day matching requests
    .withColumn('q_by_day', F.count(F.lit(1)).over(w))
    .where(F.col('q_by_day') <= max_q_by_day)
    # Collect each group's requests into one array so a single one can be picked
    .groupBy(F.col('ip'), F.col('year'), F.col('month'), F.col('day'))
    .agg(F.collect_list(F.struct('uri_query', 'time_firstbyte', 'ip')).alias('ip_requests'))
    # rand() is in [0, 1), so the floored product is a valid 0-based array index
    .withColumn('random_req_index', F.floor(F.rand(random_seed) * F.size(F.col('ip_requests'))))
    .select(F.col('ip_requests')[F.col('random_req_index')].alias('req'))
    # After the select above only `req` remains, so ip has to be read from the
    # struct; a bare F.col('ip') can no longer be resolved at this point.
    .select(F.col('req.uri_query'), F.col('req.time_firstbyte'), F.col('req.ip'))
    # No ordering is applied, so which 100 rows are kept is up to Spark
    .limit(100)
    .toPandas()
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment