Skip to content

Instantly share code, notes, and snippets.

@fsarradin
Created February 26, 2020 20:44
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save fsarradin/88447deb2cf44c2f559ef3aa3fd0ee43 to your computer and use it in GitHub Desktop.
Save fsarradin/88447deb2cf44c2f559ef3aa3fd0ee43 to your computer and use it in GitHub Desktop.
Read a fixed-length format file using ZIO, ZStream, and Magnolia
// sbt build definition for this sample project.
name := "zkafka"
version := "0.1"
scalaVersion := "2.13.1"
// zio / zio-streams supply the effect and stream types used by `zkafka`;
// magnolia supplies the macro-based typeclass derivation used by `FromFixedLengthData`.
libraryDependencies ++= Seq(
"dev.zio" %% "zio" % "1.0.0-RC17",
"dev.zio" %% "zio-streams" % "1.0.0-RC17",
"com.propensive" %% "magnolia" % "0.12.6"
)
H
D12345 John 32
A12345 5 RUE DU TEMPLES93170 BAGNOLET
D12346 Mary 30
D12347 Sebastian 35
D12348 Ahmed 21
D12349 Tony 63
F
package io.univalence
import java.io.{File, FileInputStream, IOException}
import java.util.concurrent.TimeUnit
import scala.language.experimental.macros
import scala.util.Random
import zio._
import zio.clock.Clock
import zio.console._
import zio.duration.Duration
import zio.stream._
object zkafka extends zio.App {
  import FromFixedLengthData._

  /** Reads `data.flf`, attaches every address record to the user it belongs to
    * and prints the resulting map.
    *
    * @return 0 when the file was processed, 1 when an IOException occurred
    *         (its stack trace is printed before returning)
    */
  override def run(args: List[String]): ZIO[zio.ZEnv, Nothing, Int] = {
    val program: ZIO[Console, IOException, Unit] =
      for {
        data <- open("data.flf").chunks
                  .aggregate(ZSink.utf8DecodeChunk)
                  .aggregate(ZSink.splitLines)
                  .mapConcatChunk(identity)
                  // drop blank lines and the 'H' / 'F' records (header and
                  // footer lines in the sample data)
                  .filter(line => line.nonEmpty && !Set('H', 'F').contains(line.head))
                  .map(parse)
                  .fold(Map.empty[UserId, CompleteUser]) {
                    case (users, None) => users
                    case (users, Some(Left(u))) =>
                      users.updated(u.id, CompleteUser(u.id, u.name, u.age, None))
                    case (users, Some(Right(a))) =>
                      // An address whose user has not been read (yet) is skipped:
                      // indexing the map directly would throw NoSuchElementException
                      // and kill the fiber with a defect instead of a typed error.
                      users.get(a.userId).fold(users) { u =>
                        users.updated(a.userId, u.copy(address = Some(a)))
                      }
                  }
        _ <- putStrLn(data.toString)
      } yield ()

    // Report failures inside the effect rather than as a side effect of a pure fold.
    program.foldM(
      error => ZIO.effectTotal(error.printStackTrace()) *> UIO.succeed(1),
      _ => UIO.succeed(0)
    )
  }

  /** `None` for unknown record types, `Left` for a user record, `Right` for an address record. */
  type ParseResult = Option[Either[User, Address]]

  /** Decodes one record line, dispatching on its first character.
    *
    * 'D' lines decode as [[User]], 'A' lines as [[Address]]; any other line
    * (including the empty string) yields `None`. Malformed 'D'/'A' lines still
    * throw from the underlying field decoders.
    */
  def parse(line: String): ParseResult =
    line.headOption match {
      case Some('D') => Some(Left(fromFixedLength[User](line)))
      case Some('A') => Some(Right(fromFixedLength[Address](line)))
      case _         => None
    }

  /** Streams the bytes of `filename`.
    *
    * The file is opened only when the stream runs and is closed when the stream
    * ends, whether it succeeds, fails or is interrupted. Errors opening the file
    * (e.g. FileNotFoundException) are reported in the IOException error channel
    * instead of being thrown while the stream is being built.
    */
  def open(filename: String): ZStreamChunk[Any, IOException, Byte] = {
    val acquire: IO[IOException, FileInputStream] =
      ZIO.effect(new FileInputStream(new File(filename))).refineOrDie {
        case e: IOException => e
      }
    // Closing is best effort: a failure to close must not mask the stream's result.
    val release: FileInputStream => UIO[Unit] =
      fis => ZIO.effect(fis.close()).ignore

    ZStreamChunk(
      ZStream
        .managed(ZManaged.make(acquire)(release))
        .flatMap(fis => ZStream.fromInputStream(fis).chunks)
    )
  }
}
/** Describes where one field sits inside a fixed-length record line. */
trait FixedLengthField {

  /** Index of the first character of the field (passed to `substring` by implementors). */
  val offset: Int

  /** Number of characters reserved for the field. */
  val length: Int

  /** Exclusive end index of the field. Lazy so that it is computed only after
    * the implementing object has initialised `offset` and `length`.
    */
  lazy val endOffset: Int = length + offset
}
/** Identifier of a user; both user (`D`) and address (`A`) records carry one. */
case class UserId(value: String)

/** Layout of the user id: 5 characters starting at index 1. */
object UserId extends FixedLengthField {
  override val offset: Int = 1
  override val length: Int = 5

  /** Reads the id columns of `line` verbatim (no trimming). */
  def from(line: String): UserId = {
    val raw = line.substring(offset, endOffset)
    UserId(raw)
  }
}
/** A user's name. */
case class UserName(value: String)

/** Layout of the user name: 10 characters starting at index 6. */
object UserName extends FixedLengthField {
  override val offset: Int = 6
  override val length: Int = 10

  /** Reads the name columns of `line` and strips the surrounding padding. */
  def from(line: String): UserName = {
    val padded = line.substring(offset, endOffset)
    UserName(padded.trim)
  }
}
/** A user's age. */
case class UserAge(value: Int)

/** Layout of the user age: 3 characters starting at index 16, the last field
  * of a user record.
  */
object UserAge extends FixedLengthField {
  override val offset: Int = 16
  override val length: Int = 3

  /** Reads and parses the age columns of `line`.
    *
    * The end index is clamped to the line length so that lines whose trailing
    * padding was stripped still parse; previously such lines threw
    * StringIndexOutOfBoundsException. A line shorter than `offset` still throws.
    *
    * @throws NumberFormatException if the trimmed field is not an integer
    */
  def from(line: String): UserAge =
    UserAge(line.substring(offset, math.min(endOffset, line.length)).trim.toInt)
}
/** A decoded user (`D`) record. */
case class User(
    id: UserId,
    name: UserName,
    age: UserAge
)
/** Street part of an address. */
case class AddressStreet(value: String)

/** Layout of the street: 20 characters starting at index 6. */
object AddressStreet extends FixedLengthField {
  override val offset: Int = 6
  override val length: Int = 20

  /** Reads the street columns of `line` and strips the surrounding padding. */
  def from(line: String): AddressStreet = {
    val padded = line.substring(offset, endOffset)
    AddressStreet(padded.trim)
  }
}
/** Post code of an address, kept as text. */
case class AddressPostCode(value: String)

/** Layout of the post code: 5 characters starting at index 26. */
object AddressPostCode extends FixedLengthField {
  override val offset: Int = 26
  override val length: Int = 5

  /** Reads the post-code columns of `line` and strips the surrounding padding. */
  def from(line: String): AddressPostCode = {
    val padded = line.substring(offset, endOffset)
    AddressPostCode(padded.trim)
  }
}
/** City of an address. */
case class AddressCity(value: String)

/** Layout of the city: 20 characters starting at index 31, the last field of
  * an address record.
  */
object AddressCity extends FixedLengthField {
  override val offset: Int = 31
  override val length: Int = 20

  /** Reads the city columns of `line` and strips the surrounding padding.
    *
    * The end index is clamped to the line length so that lines whose trailing
    * padding was stripped (e.g. by an editor) still parse; previously such lines
    * threw StringIndexOutOfBoundsException. A line shorter than `offset` still throws.
    */
  def from(line: String): AddressCity =
    AddressCity(line.substring(offset, math.min(endOffset, line.length)).trim)
}
/** A decoded address (`A`) record; `userId` links it to its [[User]]. */
case class Address(
    userId: UserId,
    street: AddressStreet,
    postCode: AddressPostCode,
    city: AddressCity
)
/** A user together with the address read for it, if one was found. */
case class CompleteUser(
    id: UserId,
    name: UserName,
    age: UserAge,
    address: Option[Address]
)
/** Typeclass for values that can be decoded from a single fixed-length record line.
  *
  * @tparam A the decoded type
  */
trait FromFixedLengthData[A] {

  /** Decodes an `A` from the whole record `line`. */
  def fromFixedLength(line: String): A
}
object FromFixedLengthData {
  import magnolia._

  /** Summons the implicit decoder for `A`. */
  @inline def apply[A](
      implicit ev: FromFixedLengthData[A]
  ): FromFixedLengthData[A] = ev

  /** Alias expected by Magnolia's derivation macro. */
  type Typeclass[A] = FromFixedLengthData[A]

  // Leaf instances: each field type reads its own columns out of the whole line.
  implicit val userIdFromLength: FromFixedLengthData[UserId] = line =>
    UserId.from(line)
  implicit val userNameFromLength: FromFixedLengthData[UserName] = line =>
    UserName.from(line)
  implicit val userAgeFromLength: FromFixedLengthData[UserAge] = line =>
    UserAge.from(line)
  implicit val addressStreetFromLength: FromFixedLengthData[AddressStreet] =
    line => AddressStreet.from(line)
  implicit val addressPostCodeFromLength: FromFixedLengthData[AddressPostCode] =
    line => AddressPostCode.from(line)
  implicit val addressCityFromLength: FromFixedLengthData[AddressCity] = line =>
    AddressCity.from(line)

  /** Builds a case class decoder by handing the SAME line to every parameter's
    * decoder; each parameter's instance knows which columns it owns.
    */
  def combine[T](ctx: CaseClass[Typeclass, T]): Typeclass[T] =
    line =>
      ctx.construct { p =>
        p.typeclass.fromFixedLength(line)
      }

  /** Derives a decoder for any case class whose fields all have an instance. */
  implicit def gen[T]: Typeclass[T] = macro Magnolia.gen[T]

  /** Decodes `line` as an `A` using the implicit decoder in scope. */
  def fromFixedLength[A: FromFixedLengthData](line: String): A =
    FromFixedLengthData[A].fromFixedLength(line)

  /** Demo: decodes one user line and one address line and prints them. */
  def main(args: Array[String]): Unit = {
    // Lines are built with padTo so every field has exactly the width declared
    // by its companion object; shorter literals make `substring` throw
    // StringIndexOutOfBoundsException.
    val dataUser =
      "D12345" +
        "John".padTo(UserName.length, ' ') +
        "32".padTo(UserAge.length, ' ')
    val dataAddress =
      "A12345" +
        "5 RUE DU TEMPLES".padTo(AddressStreet.length, ' ') +
        "93170" +
        "BAGNOLET".padTo(AddressCity.length, ' ')
    println(fromFixedLength[User](dataUser))
    println(fromFixedLength[Address](dataAddress))
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment