Last active
October 23, 2015 09:33
-
-
Save agaoglu/a3ddc7e3b5cdb2bf1ab1 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.twitter.scalding._ | |
/** Replays an image-server access log against several simple cache
  * configurations and reports, for each one, how the cache would behave.
  *
  * For every (`inactive`, `minuses`) pair the job emits:
  *  - `maxsize`: the peak sum of object sizes held in the cache at once,
  *  - `upstreamrequests`: requests that still had to be served upstream,
  *  - `upstreamtransfer`: bytes transferred by those upstream requests.
  *
  * Input (`--input`): TSV with columns eventtime, host, path, bytessent.
  * NOTE(review): eventtime is offset by millisecond windows below, so it is
  * presumably epoch milliseconds — confirm against the real source.
  * Output (`--output`): TSV of inactive, minuses, maxsize, upstreamrequests,
  * upstreamtransfer.
  */
class CachesizeApproximation(args: Args) extends Job(args) {
  val MINUTE = Duration.MIN_IN_MS
  val HOUR = Duration.HOUR_IN_MS

  // (1) These are different test configurations; every key is simulated once
  // per (inactive, minuses) pair.
  //  - inactive: an object is evicted once it has gone this long (ms) without
  //    a request; every request pushes its eviction time forward.
  //  - minuses: requests needed within one inactivity window before the
  //    object is admitted to the cache.
  val inactive = List(
    10*MINUTE,
    30*MINUTE,
    HOUR,
    2*HOUR,
    6*HOUR,
    24*HOUR)
  val minuses = List(1L, 2L, 3L)

  // (2) Not our actual source but it'll do
  val log = new Tsv(args("input"), ('eventtime, 'host, 'path, 'bytessent))

  // (3) Some preliminary filtering
  val access = log
    .filter('host) {host: String => host == "images.example.com"}
    .filter('path) {path: String => path != null}
    .map('path -> 'key) {path: String => CachesizeApproximation.cacheKey(path)} // (4)
    // Tsv yields Strings; convert the size to Long so the per-key max below is
    // numeric rather than lexicographic ("999" would otherwise beat "1000").
    .map('bytessent -> 'bytes) {bytes: Long => bytes}

  access
    .groupBy('key) {_ // (5)
      .reducers(64)
      // Object size: the largest response ever sent for this key.
      .max('bytes -> 'filesize)
      // Request histogram: timestamp -> number of requests at that timestamp.
      .mapPlusMap('eventtime -> 'euses){time: Long =>
        Map(time -> 1L)
      } {counts => counts}
    }
    // (6) Fan every key out to each cache configuration.
    .flatMap('key -> 'inactive) {key: String => inactive}
    .flatMap('key -> 'minuses) {key: String => minuses}
    .flatMap( // (7)
      ('inactive, 'minuses, 'euses, 'filesize) ->
      ('etime, 'change, 'uses)) {
      a: (Int, Long, Map[Long, Long], Long) =>
        val (window, threshold, euses, filesize) = a
        CachesizeApproximation.simulate(window, threshold, euses, filesize) // (8)
    }
    .groupBy('inactive, 'minuses) {_ // (9)
      // NOTE(review): events sharing an etime are folded in no particular
      // order, so maxsize can differ between runs when an admission and an
      // eviction coincide.
      .sortBy('etime)
      .foldLeft(
        ('change, 'uses, 'filesize) ->
        ('lastsize, 'maxsize, 'upstreamrequests, 'upstreamtransfer)
      )((0L, 0L, 0L, 0L)) {
        (acc: (Long, Long, Long, Long), event: (Long, Long, Long)) =>
          val (cacheSize, peakSize, requests, transfer) = acc
          val (change, uses, filesize) = event
          val newSize = cacheSize + change
          // Every event's `uses` were served upstream; evictions carry 0 uses,
          // so no sign test on `change` is needed (and zero-byte objects are
          // not double counted).
          (newSize,
           math.max(peakSize, newSize),
           requests + uses,
           transfer + filesize * uses)
      }
    }
    .project('inactive, 'minuses, 'maxsize, 'upstreamrequests, 'upstreamtransfer)
    .write(new Tsv(args("output")))
}

/** Pure helpers for [[CachesizeApproximation]]; kept out of the Job so the
  * pipeline closures call them statically instead of capturing the Job.
  */
object CachesizeApproximation {

  /** Cache key for a request path: everything before the first '?', so all
    * query-string variants of a path share one cache entry.
    *
    * Equivalent to `path.split("\\?")(0)` except that it never throws: a path
    * made only of '?' characters (e.g. "?") yields "" instead of an
    * ArrayIndexOutOfBoundsException.
    */
  private def cacheKey(path: String): String = {
    val queryStart = path.indexOf('?')
    if (queryStart < 0) path else path.substring(0, queryStart)
  }

  /** Replays one key's request history against a single cache configuration.
    *
    * Returns `(etime, change, uses)` events, most recent first (downstream
    * re-sorts by etime):
    *  - `(t, +filesize, threshold)`: object admitted at `t`; `threshold`
    *    requests went upstream to get it there.
    *  - `(t, -filesize, 0)`: object evicted at `t`.
    *  - `(t, 0, n)` with `0 < n < threshold`: `n` requests went upstream but
    *    the object was never admitted; `t` is when that window closed.
    *
    * @param window    inactivity window in ms; every request moves the exit
    *                  time to `request time + window`
    * @param threshold requests needed within one window to admit the object
    * @param euses     request timestamp -> number of requests at that timestamp
    * @param filesize  object size in bytes
    */
  private def simulate(
      window: Long,
      threshold: Long,
      euses: Map[Long, Long],
      filesize: Long): List[(Long, Long, Long)] = {
    type Event = (Long, Long, Long)

    // Opens a fresh window at `time` holding `uses` requests: admit at once if
    // that reaches the threshold, otherwise leave a pending (not cached) entry.
    def open(time: Long, uses: Long, history: List[Event]): List[Event] =
      if (uses >= threshold)
        (time + window, -filesize, 0L) :: (time, filesize, threshold) :: history
      else
        (time + window, 0L, uses) :: history

    euses.toList.sortBy(_._1).foldLeft(List.empty[Event]) {
      // Pending window still open: pool the requests and re-evaluate
      // admission. Pending entries are recognised by a positive use count, so a
      // zero-byte object's eviction (change 0, uses 0) is not mistaken for one.
      case ((exit, 0L, pending) :: rest, (time, uses)) if pending > 0 && time <= exit =>
        open(time, pending + uses, rest)
      // Object is cached and requested before expiring: served from cache,
      // only its eviction time moves.
      case ((exit, change, cachedUses) :: rest, (time, _)) if time <= exit =>
        (time + window, change, cachedUses) :: rest
      // No open window: first request, or the previous window has expired.
      case (history, (time, uses)) =>
        open(time, uses, history)
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment