Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
import com.twitter.scalding._
/**
 * Scalding job that replays an access log against a grid of simulated cache
 * configurations and reports, per configuration, the peak cache size and the
 * amount of traffic that would still have gone upstream.
 *
 * A configuration is a pair of:
 *  - `'inactive`: how long after its last request an object stays in the cache
 *    (values built from `Duration.MIN_IN_MS` / `Duration.HOUR_IN_MS`), and
 *  - `'minuses`: how many requests inside one such window are needed before
 *    the object is admitted to the cache.
 *
 * Input  (`args("input")`):  TSV with fields `eventtime, host, path, bytessent`.
 * Output (`args("output")`): TSV with fields
 * `inactive, minuses, maxsize, upstreamrequests, upstreamtransfer`.
 *
 * NOTE(review): `'eventtime` is added to the inactivity windows directly, so it
 * is presumably an epoch timestamp in milliseconds -- confirm against the real
 * log source.
 */
class CachesizeApproximation(args : Args) extends Job(args) {
  val MINUTE = Duration.MIN_IN_MS
  val HOUR = Duration.HOUR_IN_MS

  // (1) These are different test configurations
  val inactive = List(
    10*MINUTE,
    30*MINUTE,
    HOUR,
    2*HOUR,
    6*HOUR,
    24*HOUR)
  val minuses = List(1L, 2L, 3L)

  // (2) Not our actual source but it'll do
  val log = new Tsv(args("input"), ('eventtime, 'host, 'path, 'bytessent))

  // (3) Some preliminary filtering
  val access = log
    .filter('host) {host: String => host == "images.example.com"}
    .filter('path) {path: String => path != null}
    // (4) The cache key is the path without its query string. takeWhile is used
    // instead of split("\\?")(0) because split drops trailing empty strings and
    // returns an empty array for a path such as "?", which made (0) throw.
    .map('path -> 'key) {path: String => path.takeWhile(_ != '?') }

  /**
   * Turns the request history of a single object into cache size-change events
   * for one (inactivity window, admission threshold) configuration.
   *
   * Each returned tuple is `(etime, change, uses)`:
   *  - `(t, +filesize, threshold)`      the object enters the cache at `t`,
   *  - `(t + ttl, -filesize, threshold)` the object leaves the cache again,
   *  - `(t + ttl, 0, n)`                a window that closed after `n` requests
   *                                     without the object ever being admitted.
   *
   * The list is built by prepending, so it is in reverse chronological order;
   * the caller sorts by `'etime` afterwards.
   *
   * @param ttl        inactivity window, in the same unit as the timestamps
   * @param threshold  number of requests within a window required for admission
   * @param usesByTime request count per event timestamp
   * @param filesize   size attributed to the object
   * @return the size-change events for this object
   */
  private def cacheEvents(
      ttl: Long,
      threshold: Long,
      usesByTime: Map[Long, Long],
      filesize: Long): List[(Long, Long, Long)] = {

    // Starts (or restarts) a window at `time` having seen `uses` requests so
    // far: admit the object if the threshold is reached, otherwise just record
    // the pending request count.
    def open(time: Long, uses: Long, older: List[(Long, Long, Long)]) =
      if (uses >= threshold)
        (time + ttl, -filesize, threshold) :: (time, filesize, threshold) :: older
      else
        (time + ttl, 0L, uses) :: older

    usesByTime.toList.sorted.foldLeft(List.empty[(Long, Long, Long)]) { // (8)
      // Still inside a window in which the object was not admitted yet: add the
      // new requests to the pending count and re-evaluate admission.
      case ((exit, 0L, prevuses) :: rest, (time, uses)) if time <= exit =>
        open(time, uses + prevuses, rest)
      // Object is cached and requested again before it expires: only push the
      // exit event further into the future.
      case ((exit, exitsize, prevuses) :: rest, (time, _)) if time <= exit =>
        (time + ttl, exitsize, prevuses) :: rest
      // First request ever, or the previous window has already closed.
      case (history, (time, uses)) =>
        open(time, uses, history)
    }
  }

  access
    .groupBy('key) {_ // (5)
      .reducers(64)
      // NOTE(review): the Tsv source declares no column types, so 'bytessent is
      // presumably read as a string and this max may compare lexicographically
      // ("9" > "10") -- confirm against the real source.
      .max('bytessent -> 'filesize)
      // Collect a timestamp -> request-count map per key.
      .mapPlusMap('eventtime -> 'euses){x: Long =>
        Map(x -> 1L)
      } {m => m}
    }
    // (6) Cross every key with every test configuration.
    .flatMap('key -> 'inactive) {key: String => inactive}
    .flatMap('key -> 'minuses) {key: String => minuses}
    .flatMap( // (7)
      ('inactive, 'minuses, 'euses, 'filesize) ->
        ('etime, 'change, 'uses)) {
      a: (Int, Long, Map[Long, Long], Long) =>
        // Local names deliberately differ from the class members `inactive`
        // and `minuses`, which hold the full configuration lists.
        val (ttl, threshold, usesByTime, filesize) = a
        cacheEvents(ttl, threshold, usesByTime, filesize)
    }
    .groupBy('inactive, 'minuses) {_ // (9)
      .sortBy('etime)
      // Replay all events of one configuration in time order, tracking the
      // running cache size, its maximum, and the upstream request/byte totals.
      .foldLeft(
        ('change, 'uses, 'filesize) ->
          ('lastsize, 'maxsize, 'upstreamrequests, 'upstreamtransfer)
      )((0L, 0L, 0L, 0L)) {
        (p: (Long, Long, Long, Long), c: (Long, Long, Long)) =>
          val (lastsize, maxsize, upstreamrequests, upstreamtransfer) = p
          val (change, uses, filesize) = c
          val newlastsize = lastsize + change
          // Only non-negative changes (admissions and never-admitted windows)
          // contribute upstream traffic; exit events (negative change) do not.
          val wentUpstream = change >= 0
          ( newlastsize,
            math.max(newlastsize, maxsize),
            if (wentUpstream) upstreamrequests + uses else upstreamrequests,
            if (wentUpstream) upstreamtransfer + filesize * uses else upstreamtransfer
          )
      }
    }
    .project('inactive, 'minuses, 'maxsize, 'upstreamrequests, 'upstreamtransfer)
    .write(new Tsv(args("output")))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment