Skip to content

Instantly share code, notes, and snippets.

@j-keck
Last active June 10, 2020 12:02
Show Gist options
  • Save j-keck/749f5791a4bc0633f6e1c776c6d88e57 to your computer and use it in GitHub Desktop.
Save j-keck/749f5791a4bc0633f6e1c776c6d88e57 to your computer and use it in GitHub Desktop.
small ammonite script to fetch covid19 datasets from https://pavelmayer.de/covid/risks/
//
// covid19 dataset from https://pavelmayer.de/covid/risks/
//
// amm script.sc fetch <region>: fetch today's dataset and show the last entry for <region> (default: Ortenaukreis)
// amm script.sc cron : fetch every 24 hours
// amm script.sc show <region>: show the last dataset for <region> (default: Ortenaukreis)
// amm script.sc list <region>: list all datasets for <region> (default: Ortenaukreis)
//
import $ivy.`org.http4s::http4s-blaze-client:0.21.4`
import $ivy.`co.fs2::fs2-core:2.2.1`
import $ivy.`co.fs2::fs2-io:2.2.1`
import $ivy.`com.nrinaudo::kantan.csv:0.6.1`
//import $ivy.`ch.qos.logback:logback-classic:1.2.3`
import org.http4s.client.blaze._
import org.http4s.client._
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import cats.effect.{IO, ContextShift, Blocker}
import java.util.concurrent._
import java.nio.file.{Paths, Path}
import java.nio.file.StandardOpenOption._
import fs2.{Stream, text}
import fs2.io.file
import kantan.csv._
import kantan.csv.ops._
import cats._
import cats.implicits._
// Runtime infrastructure shared by all commands of this script.
// Every implicit carries an explicit type so that implicit resolution does not
// depend on type inference of the right-hand side.

// Shifts IO computations onto the global execution context.
implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

// Timer for time-based operations (e.g. the metered stream used by `cron`).
implicit val timer: cats.effect.Timer[IO] = IO.timer(ExecutionContext.global)

// Blocker handed to the fs2 file operations; it is backed by its own cached
// thread pool instead of the global execution context.
implicit val blocker: Blocker = {
  val blockingPool = Executors.newCachedThreadPool()
  Blocker.liftExecutorService(blockingPool)
}
// Command `fetch`: downloads the current dataset, then prints the last
// (date-sorted) entry found for `region`. Only a table header is printed when
// no entry matches.
@doc("fetch the actual data and display the current entry for the given region")
@main
def fetch(region: String = "Ortenaukreis") = {
  val program = for {
    _ <- download
    found <- entries(region)
    _ <- display(found.lastOption)
  } yield ()

  program.unsafeRunSync()
}
// Command `cron`: downloads the dataset once right away and then again every
// 24 hours. This call blocks for as long as the schedule is running.
//
// A failed download (for example a temporary network outage) is reported on
// stdout and the schedule carries on, so that one bad day does not end the job.
@doc("runs fetch every 24 hours")
@main
def cron(): Unit = {
  val duration = 24.hours
  println(s"run every $duration")

  // One download attempt which never fails the surrounding program.
  val attempt: IO[Unit] = download.handleErrorWith { err =>
    IO(println(s"download failed: $err"))
  }

  (attempt *>
    Stream.repeatEval(attempt)
      .metered(duration)
      .compile
      .drain
  ).unsafeRunSync()
}
// Command `show`: prints the last (date-sorted) entry stored for `region`
// without forcing a new download. Only a table header is printed when no
// entry matches.
@doc("show the last entry for a given region")
@main
def show(region: String = "Ortenaukreis") = {
  val program = entries(region).flatMap { found =>
    display(found.lastOption)
  }
  program.unsafeRunSync()
}
// Command `list`: prints every stored entry for `region`, sorted by date.
@doc("list all entries for a given region")
@main
def list(region: String = "Ortenaukreis") = {
  val program = for {
    found <- entries(region)
    _ <- display(found)
  } yield ()

  program.unsafeRunSync()
}
/** One row of a downloaded dataset, reduced to the columns this script uses.
  *
  * `date` is not part of the CSV itself: `entries` prepends it to every line,
  * taken from the name of the file the line was read from.
  * The remaining fields are picked by column index in `entryDecoder`; their
  * names follow the source data (presumably `rang` is the rank of the region -
  * TODO confirm against the dataset header).
  */
case class Entry(
  date: String,
  region: String,
  state: String,
  population: Int,
  risk: Float,
  rang: Int,
  rangChange: Int,
  rangYesterday: Int,
  rangeChangeStr: String,
  cases: Int,
  casesPer100k: Float,
  deaths: Int,
  deathsPer100k: Float,
  casesLast7Days: Float,
  casesLast7DaysBefore: Float
) {

  /** Ratio of the cases of the last 7 days to those of the 7 days before.
    * Both sides are offset by 5, so the denominator is not zero when the
    * earlier period had no cases.
    */
  lazy val rwk = (casesLast7Days + 5) / (casesLast7DaysBefore + 5)
}
// Decodes a CSV row into an `Entry` by picking columns by index.
// Index 0 is the date which `entries` prepends to each line, so every other
// index is the column position in the downloaded file shifted by one.
implicit val entryDecoder: RowDecoder[Entry] =
  RowDecoder.decoder(
    0,  // date
    2,  // region
    11, // state
    7,  // population
    27, // risk
    31, // rang
    32, // rangChange
    33, // rangYesterday
    34, // rangeChangeStr
    3,  // cases
    4,  // casesPer100k
    5,  // deaths
    6,  // deathsPer100k
    16, // casesLast7Days
    20  // casesLast7DaysBefore
  )(Entry.apply)
/** Downloads the current dataset and stores it as `data/data-<today>.csv`.
  *
  * The response body is fetched completely before the destination file is
  * opened. The file is opened with TRUNCATE_EXISTING, so opening it first
  * would wipe an earlier download of the same day whenever the request fails.
  *
  * The returned IO fails if the request or the file write fails.
  */
def download: IO[Unit] =
  for {
    ts <- IO(java.time.LocalDate.now)
    destPath = Paths.get(s"data/data-${ts}.csv")
    _ <- IO(println(s"download actual data as $destPath"))
    _ <- BlazeClientBuilder[IO](ExecutionContext.global).resource.use { client =>
      for {
        // fetch first: nothing on disk is touched when this step fails
        body <- client.expect[String]("https://pavelmayer.de/covid/risks/data.csv")
        _ <- file.createDirectories[IO](blocker, destPath.getParent())
        _ <- Stream.emit(body).covary[IO]
          .through(text.utf8Encode)
          .through(file.writeAll[IO](destPath, blocker, List(CREATE, TRUNCATE_EXISTING)))
          .compile
          .drain
      } yield ()
    }
  } yield ()
/** Reads all stored datasets and returns the entries whose region name
  * contains `region` (case-insensitive), sorted by date.
  *
  * If the `data/` directory does not exist yet, the current dataset is
  * downloaded first.
  *
  * @param region (part of) the region name to look for
  * @return the matching entries of all `data/data-*.csv` files; the IO fails
  *         if a line cannot be decoded into an `Entry`
  */
def entries(region: String): IO[List[Entry]] = {
  val dataDir = Paths.get("data/")
  // file name of a stored dataset, capturing the date part
  val DatedFile = raw"data-(\d{4}-\d{2}-\d{2})\.csv".r
  // lower-cased once instead of once per row
  val wanted = region.toLowerCase

  // Date of a dataset, taken from its file name;
  // the file name itself if it carries no date part.
  def dateOf(path: Path): String = {
    val fileName = path.getFileName().toString()
    fileName match {
      case DatedFile(day) => day
      case _ => fileName
    }
  }

  // Reads and parses one file, keeping only the rows of the wanted region.
  def parse(path: Path): Stream[IO, Entry] = {
    val date = dateOf(path)
    file.readAll[IO](path, blocker, 4096)
      .through(text.utf8Decode)
      .through(text.lines)
      .drop(1) // skip header
      .filter(_.nonEmpty) // filter out empty lines
      // the date is prepended as an additional first column (see entryDecoder)
      .map(line => s"${date},${line}".unsafeReadCsv[List, Entry](rfc).head)
      .filter(_.region.toLowerCase.contains(wanted))
  }

  Monad[IO].ifM(file.exists[IO](blocker, path = dataDir))(IO.unit, download) *>
    file.directoryStream[IO](blocker, path = dataDir, glob = "data-*.csv")
      .flatMap(path => parse(path))
      .compile
      .toList
      .map(_.sortBy(_.date))
}
/** Prints a table header followed by one table row per entry in `x`.
  *
  * @param x the entries to print, in any container with a `Functor` instance
  *          (the commands of this script pass an `Option` or a `List`)
  * @return an IO which prints to stdout when run
  */
def display[F[_]: Functor](x: F[Entry]): IO[Unit] = {
  // prints a single table row
  def printRow(entry: Entry): Unit = {
    import entry._
    println(f"| $date%s | $rangeChangeStr%1s $rang%3d($rangChange%3d) | $rwk%5.2f | 1/$risk%-7.1f | $region%-30s | $cases%-5d | $deaths%-6d |")
  }

  val header: IO[Unit] =
    IO(printf("| %-10s | %-10s | %-5s | %-9s | %-30s | %5s | %6s |\n", "Date", "Rang", "RwK", "Risk", "Region", "Cases", "Deaths"))

  // `map` is used for its side effect only, the mapped container is dropped
  val rows: IO[Unit] = IO {
    Functor[F].map(x)(printRow)
    ()
  }

  header *> rows
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment