Skip to content

Instantly share code, notes, and snippets.

@gvolpe
Last active July 6, 2023 00:19
Show Gist options
  • Star 8 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gvolpe/40b1f38ebbcbb76266dc40cad587c469 to your computer and use it in GitHub Desktop.
Save gvolpe/40b1f38ebbcbb76266dc40cad587c469 to your computer and use it in GitHub Desktop.
CSV file reader using the Fs2 streaming library.
import cats.effect._
import cats.syntax.functor._
import fs2._
import java.nio.file.Paths
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
import scala.util.Try
/**
 * Streams a CSV file, parses each data row into a [[CsvApp.Sample]] and prints it.
 *
 * Rows that cannot be parsed are skipped instead of ending the stream, so a single
 * malformed line does not silently discard the remainder of the file.
 */
object CsvApp extends IOApp {

  /** Pool used by `io.file.readAll` for blocking reads; shut down by `program` when the stream ends. */
  val blockingExecutionContext = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))

  /** One data row of the CSV; fields follow the column order consumed by `parseSample`. */
  final case class Sample(
    policyID: Long,
    stateCode: String,
    county: String,
    eqSiteLimit: Double,
    huSiteLimit: Double,
    flSiteLimit: Double,
    frSiteLimit: Double,
    tiv2011: Double,
    tiv2012: Double,
    eqSiteDeductible: Double,
    huSiteDeductible: Double,
    flSiteDeductible: Double,
    frSiteDeductible: Double,
    pointLatitude: Double,
    pointLongitude: Double,
    line: String,
    construction: String,
    pointGranularity: Int
  )

  /**
   * Converts the columns of one CSV row into a [[Sample]].
   *
   * @return `None` if the row does not have exactly 18 columns, or if any numeric column
   *         fails to parse (the `NumberFormatException` is captured by `Try`).
   */
  val parseSample: List[String] => Option[Sample] = {
    case (id :: code :: county :: esl :: hsl :: fll :: frl :: tv1 :: tv2 :: esd :: hsd :: fld :: frd :: plat :: plon :: line :: cons :: pg :: Nil) =>
      Try(
        Sample(
          policyID = id.toLong,
          stateCode = code,
          county = county,
          eqSiteLimit = esl.toDouble,
          huSiteLimit = hsl.toDouble,
          flSiteLimit = fll.toDouble,
          frSiteLimit = frl.toDouble,
          tiv2011 = tv1.toDouble,
          tiv2012 = tv2.toDouble,
          eqSiteDeductible = esd.toDouble,
          huSiteDeductible = hsd.toDouble,
          flSiteDeductible = fld.toDouble,
          frSiteDeductible = frd.toDouble,
          pointLatitude = plat.toDouble,
          pointLongitude = plon.toDouble,
          line = line,
          construction = cons,
          // trimmed because it is the last column and may carry stray whitespace
          pointGranularity = pg.trim.toInt
        )
      ).toOption
    case _ => None
  }

  /**
   * Decodes UTF-8 bytes into rows of fields, dropping the header line.
   *
   * Splitting is naive: quoted fields that contain commas are not supported.
   * The `-1` limit keeps trailing empty columns, so a row's column count is preserved
   * (plain `split(',')` would silently drop them).
   */
  def csvParser[F[_]]: Pipe[F, Byte, List[String]] =
    _.through(text.utf8Decode)
      .through(text.lines)
      .drop(1) // remove headers
      .map(_.split(",", -1).toList) // separate by comma, keeping trailing empty columns

  /**
   * Parses CSV bytes into samples.
   *
   * Rows rejected by `parseSample` are dropped (`unNone`) rather than ending the stream
   * (`unNoneTerminate`), so every valid row in the input is emitted.
   */
  def samples[F[_]]: Pipe[F, Byte, Sample] =
    _.through(csvParser[F])
      .map(parseSample) // parse each line into an optional sample
      .unNone // skip rows that failed to parse

  // Get file from https://support.spatialkey.com/spatialkey-sample-csv-data/ and convert it to UTF-8
  /** Reads the sample file and prints every successfully parsed row. */
  val parser: Stream[IO, Unit] =
    io.file
      .readAll[IO](Paths.get("/home/gvolpe/sample.csv"), blockingExecutionContext, 4096)
      .through(samples[IO])
      .evalMap(x => IO(println(x)))

  /** Runs `parser` to completion and always shuts down the blocking pool afterwards. */
  val program: IO[Unit] =
    parser.compile.drain.guarantee(IO(blockingExecutionContext.shutdown()))

  override def run(args: List[String]): IO[ExitCode] =
    program.as(ExitCode.Success)
}
@1244064566
Copy link

haha

@gvolpe
Copy link
Author

gvolpe commented Dec 24, 2018

haha

?

@asachdeva
Copy link

NICE... Thanks for this awesome gist. Talk about local reasoning — this is it. As of this writing, not much beats fs2 and cats-effect when it comes to streaming and effects. Of course, this comment will probably go stale within the next few months.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment