Created
February 1, 2022 12:30
-
-
Save Primetalk/562c5b80994c7c96e945947ec0f36734 to your computer and use it in GitHub Desktop.
Split a CSV file based on a field name
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env scala-cli
// Usage:
//   1. Install scala-cli as described here: https://scala-cli.virtuslab.org/install/
//      (For macOS it's: `brew install Virtuslab/scala-cli/scala-cli`)
//   2. Run `scala-cli <this gist url> <original-file>.csv <count of same value chunks> <chunk id field>`
//   3. It'll produce many smaller files, each of which will contain the requested number of
//      line chunks sharing the same identifier.
//      For each line it'll extract the field and compare it with the previous line's value.
//      If the value is new, a counter is incremented.
//      After `partSize` distinct values it'll emit a new file.

// FIX: scala-cli directives must use the `//> using` comment marker; the bare
// `using lib "..."` form is legacy syntax and is rejected by current scala-cli.
//> using dep "co.fs2::fs2-core:3.2.3"
//> using dep "co.fs2::fs2-io:3.2.3"

import fs2._
import java.nio.file.Path
import cats.effect.IO
import cats.effect.IOApp
import cats.effect.ExitCode
import cats.effect.unsafe.implicits.global
object csvUtils:
  /** Returns a function extracting the `fieldIndex`-th comma-separated field of a line.
    * NOTE(review): naive `split(',')` — does not handle quoted fields that contain
    * commas; acceptable only for simple CSV inputs.
    */
  def fieldExtractor(fieldIndex: Int): String => String =
    _.split(',').apply(fieldIndex)

  /** Splits the CSV file `filename` into subfiles named `<filename>.<i>.csv`.
    *
    * Adjacent rows sharing the same value of column `fieldName` form a group;
    * each subfile receives the header line plus `partSize` consecutive groups.
    *
    * @param filename  path of the source CSV file
    * @param partSize  number of same-key groups per output file
    * @param fieldName header name of the column used as the grouping key
    * @return the number of subfiles written
    */
  def splitCsv(filename: String, partSize: Int, fieldName: String): IO[Long] =
    val bytes: Stream[IO, Byte] = fs2.io.file.readAll(Path.of(filename), 10_000)
    val lines = bytes
      .through(text.utf8.decode)
      .through(text.lines)
      .filter(_.nonEmpty) // drop blank lines (including the trailing one)
    val subfiles: Stream[IO, Stream[IO, String]] = lines.head.flatMap { header =>
      val indexOfFieldName = header.split(',').indexOf(fieldName)
      // FIX: previously a missing field silently produced index -1, and every row
      // then failed with ArrayIndexOutOfBoundsException; fail fast with a clear message.
      require(indexOfFieldName >= 0, s"Field '$fieldName' not found in header '$header'")
      val rows = lines.tail
      val key  = fieldExtractor(indexOfFieldName)
      val parts = rows
        .groupAdjacentBy(key) // group adjacent rows by key
        .map(_._2)            // throw away the key, keep the chunk of rows
      val subfileRows = parts
        .sliding(partSize, partSize) // combine `partSize` groups per subfile
        .map(_.flatMap(identity))    // flatten groups into a simple sequence of rows
      subfileRows.map(rows =>
        Stream.emit(header) ++ // prepend the same header to each subfile
          Stream.chunk(rows))
    }
    val writeSubfileCommands: Stream[IO, String] =
      subfiles.zipWithIndex.flatMap { (stm, i) =>
        val subFilename = s"$filename.$i.csv"
        val streamWithLineEnds =
          stm.intersperse("\n") ++ // separate rows by newlines
            Stream.emit("\n")      // add a newline at the end of the file
        val ioToSaveFile = streamWithLineEnds
          .through(text.utf8.encode)
          .through(fs2.io.file.writeAll(Path.of(subFilename)))
          .compile.drain.as(subFilename)
        Stream.eval(ioToSaveFile)
      }
    writeSubfileCommands
      // FIX: the original `.map(... => Stream.eval(IO(println(...))))` only built
      // inner streams and never ran them, so nothing was ever printed. `evalMap`
      // actually executes the logging effect once per saved file; the element
      // count (number of subfiles) is unchanged.
      .evalMap(subfilename => IO(println(s"Saved file $subfilename")))
      .compile.count
// Script entry point: validate CLI arguments, run the split, report the subfile count.
// FIX: previously missing or malformed arguments crashed with opaque
// ArrayIndexOutOfBoundsException / NumberFormatException stack traces.
if args.length < 3 then
  println("Usage: <script> <original-file>.csv <part-size> <field-name>")
  sys.exit(1)
val filename = args(0)
val partSize = args(1).toIntOption.getOrElse {
  println(s"Part size must be an integer, got '${args(1)}'")
  sys.exit(1)
}
val fieldName = args(2)
println(s"Reading file '$filename' to split in chunks of size $partSize based on field name '$fieldName'.")
// unsafeRunSync is acceptable here: this is the very edge of the program.
val count = csvUtils.splitCsv(filename, partSize, fieldName).unsafeRunSync()
println(count)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment