@balajisivaraman
Last active November 10, 2019 14:42
Creating a wrapper for Hive ORC API using FS2
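The gist does not list its build dependencies. A rough build.sbt sketch that matches the fs2 0.10-era API used below (Sink, runLog, the two-argument Stream.bracket); the exact versions are assumptions, not taken from the gist:

libraryDependencies ++= Seq(
  "co.fs2"          %% "fs2-core"        % "0.10.7",
  "org.typelevel"   %% "cats-effect"     % "0.10.1",
  "org.tpolecat"    %% "doobie-core"     % "0.5.4",
  "org.tpolecat"    %% "doobie-postgres" % "0.5.4",
  "org.scodec"      %% "scodec-bits"     % "1.1.12",
  "org.apache.hive"  % "hive-exec"       % "2.3.6",
  "org.postgresql"   % "postgresql"      % "42.2.8"
)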
import java.io.IOException
import java.sql.Timestamp
import java.time.Instant
import cats.effect.{Effect, IO}
import cats.implicits._
import doobie._
import doobie.implicits._
import fs2.{Sink, Stream}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.metastore.{HiveMetaStoreClient, IMetaStoreClient}
import org.apache.hadoop.hive.metastore.api.Table
import org.apache.hadoop.hive.ql.io.orc.{OrcFile, Writer}
import org.apache.hadoop.hive.serde2.{AbstractSerDe, SerDeException}
import org.apache.hadoop.io.BytesWritable
import scodec.bits.ByteVector
// Typeclass for rendering a row of type A as a single CSV line.
trait CSVSerializer[A] {
  def toCSV(createdAt: Timestamp)(a: A): String
}
case class DataModel(id: Int, name: String)
object Main extends App {

  implicit object DataModelCSVSerializer extends CSVSerializer[DataModel] {
    override def toCSV(createdAt: Timestamp)(a: DataModel): String =
      List(
        a.id.show,
        a.name.show,
        createdAt.toString
      ).intercalate(",")
  }
  val xa = Transactor.fromDriverManager[IO](
    "org.postgresql.Driver",
    "jdbc:postgresql://localhost:5432/testdb",
    "postgres",
    ""
  )
  // A single load timestamp, shared by every row in this batch.
  val ts = new Timestamp(Instant.now.toEpochMilli)
  // Stream rows out of Postgres in 5000-row chunks, render each row as a CSV
  // line, and expose the UTF-8 bytes for the Hive SerDe to deserialize.
  val byteStream: Stream[IO, ByteVector] =
    sql"SELECT id, name FROM table_name"
      .query[DataModel]
      .streamWithChunkSize(5000)
      .transact(xa)
      .map((dm: DataModel) => DataModelCSVSerializer.toCSV(ts)(dm))
      .map((str: String) => ByteVector.view(str.getBytes("UTF-8")))
  println(
    "The no. of records imported is " + HiveImporter
      .writeToHive[IO](byteStream)
      .runLast
      .unsafeRunSync
      .getOrElse(0L))
}
object HiveImporter {

  def writeToHive[F[_]](s: Stream[F, ByteVector])(implicit F: Effect[F]): Stream[F, Long] = {
    val thriftUrl = "thrift://localhost:9083"
    for {
      msc      <- HiveMetastore.client[F](thriftUrl)
      table    <- msc.getTable("database", "table_name")
      basePath <- msc.getWritePath(table, None)
      writer   <- OrcWriter.writer[F](thriftUrl, createSerde(thriftUrl, table), getFilePath(basePath))
      // `.last` runs the sink to completion and emits exactly once, so the
      // row count below is emitted once rather than once per written row.
      _        <- s.to(writer.writeRows).last
      rowCount <- writer.getRowCount
    } yield rowCount
  }
}
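createSerde and getFilePath are referenced above but never defined in the gist. A hypothetical sketch of what they might look like, assuming a comma-delimited LazySimpleSerDe that mirrors the CSV lines produced in Main; as written they would live inside HiveImporter:

// Hypothetical definitions (the gist omits them).
import java.util.{Properties, UUID}
import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
import scala.collection.JavaConverters._

def createSerde(thriftUrl: String, table: Table): AbstractSerDe = {
  val cols  = table.getSd.getCols.asScala
  val serde = new LazySimpleSerDe
  val props = new Properties
  // Derive the column names/types from the table's storage descriptor and
  // match the comma delimiter used by the CSVSerializer.
  props.setProperty(serdeConstants.LIST_COLUMNS, cols.map(_.getName).mkString(","))
  props.setProperty(serdeConstants.LIST_COLUMN_TYPES, cols.map(_.getType).mkString(","))
  props.setProperty(serdeConstants.FIELD_DELIM, ",")
  serde.initialize(Utils.getHiveConf(thriftUrl), props)
  serde
}

// A unique file name under the table (or partition) directory, so concurrent
// loads never clobber each other.
def getFilePath(basePath: Path): Path =
  new Path(basePath, s"part-${UUID.randomUUID()}.orc")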
trait HiveMetastore[F[_]] {
  def getTable(database: String, table: String): Stream[F, Table]
  def getWritePath(table: Table, partitionVal: Option[String]): Stream[F, Path]
}
object HiveMetastore {

  // Acquire a metastore client and guarantee it is closed when the stream
  // terminates, whether normally or with an error.
  def client[F[_]](thriftUrl: String)(implicit F: Effect[F]): Stream[F, HiveMetastore[F]] = {
    def setup(thriftUrl: String): F[IMetaStoreClient] =
      F.delay {
        new HiveMetaStoreClient(Utils.getHiveConf(thriftUrl))
      }
    def cleanup(msc: IMetaStoreClient): F[Unit] =
      F.delay {
        msc.close()
      }
    Stream.bracket(setup(thriftUrl))({ msc =>
      Stream.eval(mkMetastore[F](msc))
    }, cleanup)
  }

  def mkMetastore[F[_]](msc: IMetaStoreClient)(implicit F: Effect[F]): F[HiveMetastore[F]] =
    F.pure {
      new HiveMetastore[F] {
        // Suspend the Thrift calls in F.delay so they run when the stream is
        // interpreted, not eagerly when the stream is built.
        def getTable(database: String, table: String): Stream[F, Table] =
          Stream.eval(F.delay(msc.getTable(database, table)))

        def getWritePath(table: Table, partitionVal: Option[String]): Stream[F, Path] =
          Stream.eval(F.delay {
            val location = partitionVal
              .map(value =>
                msc
                  .getPartition(table.getDbName, table.getTableName, value)
                  .getSd
                  .getLocation)
              .getOrElse(table.getSd.getLocation)
            new Path(location)
          })
      }
    }
}
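Utils.getHiveConf is another helper the gist leaves undefined. A minimal sketch, assuming all it does is point a HiveConf at the given Thrift metastore URI:

// Hypothetical definition (omitted from the gist).
object Utils {
  import org.apache.hadoop.hive.conf.HiveConf

  def getHiveConf(thriftUrl: String): HiveConf = {
    val conf = new HiveConf()
    conf.setVar(HiveConf.ConfVars.METASTOREURIS, thriftUrl)
    conf
  }
}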
trait OrcWriter[F[_]] {
  def writeBytes(byteVector: ByteVector): F[Unit]
  def writeRows: Sink[F, ByteVector]
  def getRowCount: Stream[F, Long]
}
object OrcWriter {

  // Acquire an ORC Writer whose close (which also flushes the ORC footer) is
  // guaranteed by Stream.bracket.
  def writer[F[_]](thriftUrl: String, serde: AbstractSerDe, filePath: Path)(
      implicit F: Effect[F]): Stream[F, OrcWriter[F]] = {
    def setup(thriftUrl: String, serde: AbstractSerDe, filePath: Path): F[Writer] =
      F.delay {
        val options = OrcFile.writerOptions(Utils.getHiveConf(thriftUrl))
        options.inspector(serde.getObjectInspector)
        OrcFile.createWriter(filePath, options)
      }
    def cleanup(w: Writer): F[Unit] =
      F.delay {
        w.close()
      }
    Stream.bracket(setup(thriftUrl, serde, filePath))({ w =>
      Stream.eval(mkOrcWriter[F](w, serde, filePath))
    }, cleanup)
  }

  def mkOrcWriter[F[_]](writer: Writer, serde: AbstractSerDe, filePath: Path)(
      implicit F: Effect[F]): F[OrcWriter[F]] = F.pure(
    new OrcWriter[F] {
      // Deserialize the CSV bytes through the table's SerDe and append the
      // resulting row; on failure, delete the partially written file. F.delay
      // (rather than catchNonFatal) keeps the write suspended in F.
      override def writeBytes(byteVector: ByteVector): F[Unit] =
        F.onError(
          F.delay(writer.addRow(serde.deserialize(new BytesWritable(byteVector.toArray))))) {
          case _: SerDeException | _: IOException =>
            F.delay(FilePathUtils.deleteFilePathIfExists(filePath)).void
        }

      override def writeRows: Sink[F, ByteVector] =
        _.flatMap(bs => Stream.eval(writeBytes(bs)))

      override def getRowCount: Stream[F, Long] =
        Stream.eval(F.delay(writer.getNumberOfRows))
    }
  )
}
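FilePathUtils.deleteFilePathIfExists, used in the error handler above, is also undefined in the gist. A plausible sketch using the Hadoop FileSystem API:

// Hypothetical definition (omitted from the gist).
object FilePathUtils {
  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.{FileSystem, Path}

  // Best-effort removal of a partially written ORC file after a failure.
  def deleteFilePathIfExists(path: Path): Unit = {
    val fs: FileSystem = path.getFileSystem(new Configuration())
    if (fs.exists(path)) {
      fs.delete(path, false)
      ()
    }
  }
}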