Skip to content

Instantly share code, notes, and snippets.

@jwmatthews
Created November 14, 2013 02:42
Show Gist options
  • Save jwmatthews/7460446 to your computer and use it in GitHub Desktop.
Save jwmatthews/7460446 to your computer and use it in GitHub Desktop.
package com.sciencebehindsweat
import scala.collection.parallel._
import scala.compat.Platform
import java.io.File
import java.io.FileWriter
/** Bulk downloader for athlete profile pages.
  *
  * Reads athlete ids from the first column of a CSV file, skips every id whose
  * page is already stored under [[outputDir]], and fetches the remaining pages
  * through a parallel collection backed by a pool of [[numThreads]] threads.
  */
object Main {
  import scala.util.Try
  import scala.util.control.NonFatal

  /** Base address; an athlete id is appended to it to form the page URL. */
  val url_athlete: String = "http://games.crossfit.com/athlete/"

  /** Directory that receives one HTML file per downloaded athlete. */
  val outputDir: String = "./output"

  /** Size of the fork/join pool used for the parallel download. */
  val numThreads: Int = 50

  /** Path of the file in which the page for `id` is (or will be) stored. */
  def getAthleteFileName(id: String): String =
    s"${outputDir}/athlete_${id}.html"

  /** Downloads and stores the page for `id`, reporting failure instead of throwing.
    *
    * @return `true` when the page was saved, `false` when a non-fatal error occurred
    *         (the error is printed). Fatal JVM errors and interrupts still propagate.
    */
  def processAthlete(id: String): Boolean =
    try _processAthlete(id)
    catch {
      // NonFatal lets OutOfMemoryError, InterruptedException, etc. escape
      // instead of being silently recorded as a failed download.
      case NonFatal(ex) =>
        println("Caught Exception: " + ex)
        false
    }

  /** Fetches the page for `id` and writes it to [[getAthleteFileName]].
    *
    * Both the network source and the file writer are closed even when an
    * error occurs. If writing fails, the partial file is removed so that
    * [[isDownloaded]] does not later mistake it for a finished download.
    *
    * @return always `true`; every failure is signalled by an exception
    */
  def _processAthlete(id: String): Boolean = {
    val start = System.currentTimeMillis()
    val sourceURL = url_athlete + id

    // Fetch URL data
    val source = scala.io.Source.fromURL(sourceURL)
    val data =
      try {
        source.getLines().mkString("\n")
      } finally {
        source.close()
      }

    // Store data to file
    val target = new File(getAthleteFileName(id))
    val fw = new FileWriter(target)
    try {
      try {
        fw.write(data)
      } finally {
        fw.close()
      }
    } catch {
      case NonFatal(ex) =>
        // Do not leave a truncated page behind; it would be skipped on re-run.
        target.delete()
        throw ex
    }

    val end = System.currentTimeMillis()
    println(s"Fetched ${data.length} bytes from ${sourceURL} in ${(end-start)/1000.0} seconds")
    true
  }

  /** Reads the athlete ids from the first comma-separated column of `filePath`.
    *
    * Ids are trimmed; lines whose first column is empty (blank lines, lines
    * consisting only of commas) are skipped rather than yielding an empty id.
    *
    * @param filePath path of the CSV file to read
    * @return the ids in file order
    */
  def getIds(filePath: String): Array[String] = {
    println(s"Will parse: '${filePath}'")
    val source = scala.io.Source.fromFile(filePath)
    try {
      source.getLines()
        // split(",") drops trailing empty fields, so "," yields an empty
        // array; headOption avoids indexing past its end.
        .map(line => line.split(",").headOption.fold("")(_.trim))
        .filter(_.nonEmpty)
        .toArray
    } finally {
      source.close()
    }
  }

  /** Whether a regular file for `id` already exists in [[outputDir]]. */
  def isDownloaded(id: String): Boolean =
    new File(getAthleteFileName(id)).isFile

  /** Entry point.
    *
    * @param args `args(0)` is the path of the CSV file with athlete ids;
    *             the optional `args(1)` limits how many pending ids are processed
    */
  def main(args: Array[String]): Unit = {
    val start = System.currentTimeMillis()
    if (args.length < 1) {
      println("Please re-run with path to a csv file of athelete ids")
      System.exit(1)
    }

    // Check that output directory exists; without it every download would fail.
    val dir = new File(outputDir)
    if (!dir.isDirectory && !dir.mkdirs()) {
      println(s"Unable to create output directory '${outputDir}'")
      System.exit(1)
    }

    val csv_file = args(0)
    // De-duplicate so two workers never write the same output file at once.
    val potential_ids = getIds(csv_file).distinct
    // Filter out the IDs we've already processed
    val ids = potential_ids.filterNot(isDownloaded)

    // Check if we want to limit the data we work on.
    val limit: Option[Int] =
      if (args.length > 1) {
        val parsed = Try(args(1).toInt).toOption
        if (parsed.isEmpty) {
          println(s"Limit must be an integer, got '${args(1)}'")
          System.exit(1)
        }
        parsed
      } else None
    val athleteIds = limit.fold(ids)(n => ids.take(n)).par

    println(s"Of the possible ${potential_ids.length} ids available ${potential_ids.length - ids.length} have already been download.")
    println(s"Will process ${athleteIds.length} athletes (out of ${ids.length} potential) with ${numThreads} threads.")

    // http://docs.scala-lang.org/overviews/parallel-collections/configuration.html
    athleteIds.tasksupport = new ForkJoinTaskSupport(new scala.concurrent.forkjoin.ForkJoinPool(numThreads))
    val results = athleteIds.map(processAthlete)
    results.count(!_) match {
      case 0 => println(s"Succes, processed ${athleteIds.length} entries, no errors reported.")
      case num_errors => println(s"Error, attempted ${athleteIds.length}, found ${num_errors} errors.")
    }

    val end = System.currentTimeMillis()
    println(s"Completed in ${(end-start)/1000.0} seconds")
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment