@Primetalk · Created February 1, 2022
Split a CSV file based on a field name
#!/usr/bin/env scala-cli
// Usage:
// 1. Install scala-cli as described here: https://scala-cli.virtuslab.org/install/
//    (For macOS it's: `brew install Virtuslab/scala-cli/scala-cli`.)
// 2. Run `scala-cli <this gist url> <original-file>.csv <count of same-value chunks> <chunk id field>`.
// 3. It'll produce many smaller files, each containing the requested number of
//    chunks of lines that share the same identifier value.
// For each line the script extracts the chunk id field and compares it with the
// previous line's value. When the value changes, the chunk counter is incremented;
// once the requested count of chunks is reached, a new subfile is emitted.
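// Example invocation (file and field names are hypothetical):
//   scala-cli split.sc events.csv 100 user_id
// For an events.csv whose user_id column holds runs of equal values, this writes
// events.csv.0.csv, events.csv.1.csv, ..., each containing up to 100 such runs
// and starting with the original header line.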
//> using lib "co.fs2::fs2-core:3.2.3"
//> using lib "co.fs2::fs2-io:3.2.3"
import fs2._
import java.nio.file.Path
import cats.effect.IO
import cats.effect.IOApp
import cats.effect.ExitCode
import cats.effect.unsafe.implicits.global
object csvUtils:
  /** Extract the key value from a CSV line. */
  def fieldExtractor(fieldIndex: Int): String => String =
    _.split(',').apply(fieldIndex)
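  // Limitation: a plain split(',') assumes field values never contain commas;
  // quoted fields like "a,b" would be split incorrectly.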
  def splitCsv(filename: String, partSize: Int, fieldName: String): IO[Long] =
    val bytes: Stream[IO, Byte] = fs2.io.file.readAll(Path.of(filename), 10_000)
    val lines = bytes
      .through(text.utf8.decode)
      .through(text.lines)
      .filter(_.nonEmpty)
    val subfiles: Stream[IO, Stream[IO, String]] = lines.head.flatMap { header =>
      val indexOfFieldName = header.split(',').indexOf(fieldName)
      val rows = lines.tail
      val key = fieldExtractor(indexOfFieldName)
      val parts = rows
        .groupAdjacentBy(key) // group consecutive rows by key
        .map(_._2)            // throw away the key
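      // groupAdjacentBy only merges *consecutive* equal keys: for key values
      // a,a,b,b,a it yields chunks [a,a], [b,b], [a], so the input is expected
      // to be already ordered by the chunk id field.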
      val subfileRows = parts
        .sliding(partSize, partSize) // combine `partSize` parts per subfile
        .map(_.flatMap(identity))    // flatten parts into a plain sequence of rows
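      // With step == size, sliding produces non-overlapping groups; fs2's
      // chunkN(partSize) should express essentially the same grouping.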
      subfileRows
        .map(rows =>
          Stream.emit(header) ++ // prepend the same header to each subfile
            Stream.chunk(rows))
    }
    val writeSubfileCommands: Stream[IO, String] =
      subfiles.zipWithIndex.flatMap { (stm, i) =>
        val subFilename = s"$filename.$i.csv"
        val streamWithLineEnds = (
          stm.intersperse("\n") ++ // separate rows by newlines
            Stream.emit("\n")      // add a trailing newline at the end
        )
        val ioToSaveFile = streamWithLineEnds
          .through(text.utf8.encode)
          .through(fs2.io.file.writeAll(Path.of(subFilename)))
          .compile.drain.as(subFilename)
        Stream.eval(ioToSaveFile)
      }
    writeSubfileCommands
      .evalTap(subfilename => IO(println(s"Saved file $subfilename"))) // log each saved file
      .compile.count // number of subfiles written
val filename = args(0)
val partSize = args(1).toInt
val fieldName = args(2)
println(s"Reading file '$filename' to split in chunks of size $partSize based on field name '$fieldName'.")
val count = csvUtils.splitCsv(filename, partSize, fieldName).unsafeRunSync()
println(s"Saved $count subfiles.")
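// Hypothetical session (file names made up for illustration):
//   $ scala-cli split.sc events.csv 100 user_id
//   Reading file 'events.csv' to split in chunks of size 100 based on field name 'user_id'.
//   Saved file events.csv.0.csv
//   Saved file events.csv.1.csv
//   Saved 2 subfiles.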