Skip to content

Instantly share code, notes, and snippets.

@tkrs
Last active January 3, 2021 19:50
Show Gist options
  • Save tkrs/8068f42b1a1c6d98183cfac680edbca9 to your computer and use it in GitHub Desktop.
Save tkrs/8068f42b1a1c6d98183cfac680edbca9 to your computer and use it in GitHub Desktop.
Bigtable CLI
import $ivy.`com.github.tkrs::orcus-bigtable:0.25.2`
import $ivy.`com.github.tkrs::orcus-cats-effect:0.25.2`
import $ivy.`com.github.tkrs::mess-core:0.2.4`
import $ivy.`com.github.scopt::scopt:4.0.0`
import java.time.Instant
import cats.effect.IO
import cats.instances.map._
import cats.syntax.all._
import cats.syntax.group
import com.google.cloud.bigtable.data.v2.BigtableDataClient
import com.google.cloud.bigtable.data.v2.BigtableDataSettings
import com.google.cloud.bigtable.data.v2.models.Filters.FILTERS
import com.google.cloud.bigtable.data.v2.models.Query
import com.google.cloud.bigtable.data.v2.models.RowCell
import com.google.protobuf.ByteString
import mainargs.TokensReader
import mess._
import mess.codec._
import orcus.async.instances.catsEffect.effect._
import orcus.bigtable.DataClient
import orcus.bigtable.async.implicits._
import org.msgpack.core.MessagePack
import scopt.Read
/**
 * Runs a key-prefix query against a Bigtable table and prints the returned row, if any.
 *
 * The output lists the row key, the summed size of all family names, qualifiers and
 * values, and then every cell grouped by family and by qualifier (qualifiers sorted).
 * Prints "not found" when the query yields no row. The client is always closed.
 *
 * @param project        project id passed to `buildClient`
 * @param instance       instance id passed to `buildClient`
 * @param table          table the query is created for
 * @param profile        app profile id; an empty string means "do not set one"
 * @param key            row-key prefix of the query
 * @param cellsPerColumn optional cells-per-column limit added to the filter chain
 * @param format         family -> qualifier -> value format; "*" acts as a wildcard at both levels
 */
@main
def lookup(
    project: String,
    instance: String,
    table: String,
    profile: String = "",
    key: ByteString,
    @arg(name = "cells-per-column")
    cellsPerColumn: Option[Int] = None,
    @arg(doc = "family:qualifier:format,...")
    format: Map[String, Map[String, Format]] = Map.empty
): Unit = {
  val client = buildClient(project, instance, profile)

  // The results of filter(...) are discarded on purpose: the code relies on the chain
  // accumulating the added filters in place.
  val rowFilter = FILTERS.chain()
  cellsPerColumn.foreach(limit => rowFilter.filter(FILTERS.limit().cellsPerColumn(limit)))

  // Without a "*" family entry, restrict the read to the families named in `format`.
  if (!format.contains("*")) {
    val familyFilter =
      format.keysIterator.foldLeft(FILTERS.interleave()) { (acc, familyName) =>
        acc.filter(FILTERS.family().exactMatch(familyName))
      }
    rowFilter.filter(familyFilter)
  }

  val query = Query.create(table).prefix(key).filter(rowFilter)

  try {
    client.readRowAsync(query).unsafeRunSync() match {
      case Some(row) =>
        // Sum of family-name, qualifier and value sizes over every cell of the row.
        val totalSize =
          row.families.toList
            .flatMap { case (_, familyCells) => familyCells }
            .foldLeft(0) { (acc, cell) =>
              acc + cell.getFamily().size + cell.getQualifier().size() + cell.getValue().size()
            }

        println(s""""${row.rowKey}":""")
        println(s""" size: $totalSize""")
        println(s""" families:""")

        row.families.foreach { case (family, familyCells) =>
          // Family-specific formats win over the "*" family; otherwise everything is a string.
          val columnFormats: Map[String, Format] =
            format.getOrElse(
              family,
              format.getOrElse("*", Map[String, Format]("*" -> Format.String))
            )

          println(s""" "${family}":""")

          val byQualifier =
            familyCells.groupBy(_.getQualifier().toStringUtf8()).toList.sortBy(_._1)

          byQualifier.foreach { case (qualifier, versions) =>
            println(s""" "$qualifier":""")
            versions.foreach { cell =>
              // NOTE(review): dividing by 1000 presumably converts microseconds to millis — confirm.
              val timestamp = Instant.ofEpochMilli(cell.getTimestamp() / 1000L)
              val rendered  = formatValue(qualifier, cell, columnFormats)
              println(s" - value: ${rendered}")
              println(s" size: ${cell.getValue().size()}")
              println(s" timestamp: ${timestamp}")
            }
          }
        }
      case _ =>
        println("not found")
    }
  } finally client.close()
}
/**
 * Command-line reader for `ByteString` arguments (short name "key").
 *
 * Takes the first token and copies it into a `ByteString` as UTF-8.
 */
implicit object readByteString
    extends TokensReader[ByteString](
      "key",
      tokens =>
        TokensReader.tryEither {
          ByteString.copyFromUtf8(tokens.head)
        }
    )
/**
 * Command-line reader for the `format` option (short name "format").
 *
 * The first token is a comma-separated list of `family:qualifier:format` entries.
 * Entries are merged into a `family -> (qualifier -> Format)` map; when the same
 * family/qualifier pair appears more than once, the last entry wins.
 *
 * An entry that does not consist of exactly three `:`-separated parts, or that
 * names an unknown format, raises an `IllegalArgumentException` inside
 * `TokensReader.tryEither`.
 */
implicit object ReadFormat
    extends TokensReader[Map[String, Map[String, Format]]](
      "format",
      tokens =>
        TokensReader.tryEither {
          tokens.head
            .split(',')
            .toList
            .map { entry =>
              entry.split(':') match {
                case Array(family, qualifier, fmt) =>
                  (family, qualifier, Format(fmt))
                case _ =>
                  // Explicit fallback: a bare MatchError on an Array prints only an
                  // object reference, which tells the user nothing about the bad input.
                  throw new IllegalArgumentException(
                    s"invalid format entry '$entry': expected family:qualifier:format"
                  )
              }
            }
            .foldLeft(Map.empty[String, Map[String, Format]]) {
              case (acc, (family, qualifier, fmt)) =>
                val merged = acc.getOrElse(family, Map.empty[String, Format]) + (qualifier -> fmt)
                acc.updated(family, merged)
            }
        }
    )
/**
 * How a cell value is rendered when printed.
 *
 * @param value the name used to select this format on the command line
 */
sealed abstract class Format(val value: String)

object Format {

  /** Every supported format, in the order they are listed in error messages. */
  private val all: List[Format] = List(String, Int, Long, Msgpack)

  /**
   * Resolves a format by its command-line name.
   *
   * @param v one of "string", "int", "long" or "msgpack" (case-sensitive)
   * @return the matching format
   * @throws IllegalArgumentException if `v` names no known format; the message
   *                                  contains the rejected input and the supported names
   */
  def apply(v: String): Format =
    all.find(_.value == v).getOrElse {
      throw new IllegalArgumentException(
        s"unknown format '$v' (expected one of: ${all.map(_.value).mkString(", ")})"
      )
    }

  case object String  extends Format("string")
  case object Int     extends Format("int")
  case object Long    extends Format("long")
  case object Msgpack extends Format("msgpack")
}
/**
 * Creates a Bigtable data client wrapped in an orcus `DataClient[IO]`.
 *
 * @param project  project id set on the client settings
 * @param instance instance id set on the client settings
 * @param profile  app profile id; only applied when non-empty
 * @return a client built from those settings
 */
def buildClient(
    project: String,
    instance: String,
    profile: String
): DataClient[IO] = {
  val base =
    BigtableDataSettings
      .newBuilder()
      .setProjectId(project)
      .setInstanceId(instance)

  // An empty profile means "leave the app profile unset".
  val configured = if (profile.isEmpty) base else base.setAppProfileId(profile)

  DataClient(BigtableDataClient.create(configured.build()))
}
/**
 * Renders a single cell value as text according to the configured format.
 *
 * The format is looked up by qualifier, then by the "*" wildcard, and falls back
 * to `Format.String`. An empty value is always rendered as the empty string.
 * A value that cannot be decoded as the requested Int/Long makes the decoder's
 * error propagate as an exception.
 *
 * @param q    qualifier of the cell, used as the lookup key in `f`
 * @param cell cell whose value is rendered
 * @param f    qualifier -> format mapping for the cell's family
 * @return the rendered value; strings are wrapped in double quotes
 */
def formatValue(q: String, cell: RowCell, f: Map[String, Format]): String = {
  val raw = cell.getValue()

  // Decodes the raw bytes with the primitive decoder for A and stringifies the result.
  def decoded[A: orcus.bigtable.codec.PrimitiveDecoder]: String =
    orcus.bigtable.codec.PrimitiveDecoder[A].apply(raw).toTry.get.toString()

  if (raw.isEmpty()) ""
  else {
    val chosen: Format = f.getOrElse(q, f.getOrElse("*", Format.String))
    chosen match {
      case Format.String =>
        s""""${raw.toStringUtf8()}""""
      case Format.Int =>
        decoded[Int]
      case Format.Long =>
        decoded[Long]
      case Format.Msgpack =>
        val unpacked = Fmt.unpack(raw.toByteArray(), MessagePack.DEFAULT_UNPACKER_CONFIG)
        formatMsgpack(unpacked)
    }
  }
}
/**
 * Renders a decoded MessagePack value as human-readable text.
 *
 * Scalars use their `toString`, strings are wrapped in double quotes, nil is
 * printed as "nil" and unit as the empty string. Binary payloads are rendered
 * by `binString`. Maps and arrays are rendered recursively as
 * `{ k: v, ... }` and `[ a, b, ... ]`.
 *
 * @param a the MessagePack value to render
 * @return the textual rendering
 */
def formatMsgpack(a: Fmt): String = a match {
  case Fmt.MNil       => "nil"
  case Fmt.MUnit      => ""
  case v: Fmt.MBool   => v.value.toString()
  case Fmt.MByte(v)   => v.toString()
  case Fmt.MShort(v)  => v.toString()
  case Fmt.MInt(v)    => v.toString()
  case Fmt.MLong(v)   => v.toString()
  case Fmt.MBigInt(v) => v.toString()
  case Fmt.MFloat(v)  => v.toString()
  case Fmt.MDouble(v) => v.toString()
  case Fmt.MString(v) => s""""${v}""""
  case Fmt.MBin(v)    => binString(v)
  case Fmt.MExtension(t, l, v) =>
    // "%02x" = hex, zero-padded to two digits. The previous "%x02" printed the
    // hex value followed by a literal "02".
    val typeHex = "%02x".format(t)
    s"""{ type: 0x$typeHex, size: $l, value: ${binString(v)} }"""
  case entries: Fmt.MMap =>
    entries.iterator
      .map { case (k, v) =>
        s"${formatMsgpack(k)}: ${formatMsgpack(v)}"
      }
      .mkString("{ ", ", ", " }")
  case items: Fmt.MArray =>
    items.iterator.map(formatMsgpack).mkString("[ ", ", ", " ]")
}
/**
 * Renders a byte array as one hexadecimal literal, e.g. bytes (1, -1, 16) become "0x01ff10".
 *
 * @param xs bytes to render; an empty array yields "0x"
 * @return "0x" followed by two lowercase hex digits per byte
 */
def binString(xs: Array[Byte]): String =
  // "%02x" zero-pads each byte to two hex digits. The previous "%x02" emitted the
  // unpadded hex value followed by a literal "02" for every byte.
  xs.iterator.map(b => "%02x".format(b)).mkString("0x", "", "")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment