Skip to content

Instantly share code, notes, and snippets.

@mehmetgunturkun
Created May 23, 2017 18:16
Show Gist options
  • Save mehmetgunturkun/e3f7974819a29bc6cd048eb4366707cc to your computer and use it in GitHub Desktop.
Save mehmetgunturkun/e3f7974819a29bc6cd048eb4366707cc to your computer and use it in GitHub Desktop.
[Finagle.Mysql] Value Extraction with Implicit Converters
package com.twitter.finagle.mysql
import com.twitter.finagle.mysql.transport.{MysqlBuf, MysqlBufReader}
import com.twitter.io.Buf
/**
 * A `Row` pairs the column metadata of a mysql row with its [[Value]]s.
 * An individual [[Value]] can be looked up by mysql column name through
 * the `apply` method.
 */
trait Row {

  /**
   * One [[Field]] of column meta-data per column in the Row.
   * The sequence is 0-indexed, so `fields(0)` describes the first column.
   */
  val fields: IndexedSeq[Field]

  /** The values for this Row. */
  val values: IndexedSeq[Value]

  /**
   * Looks up the position of a column by name.
   *
   * @param columnName name of the column.
   * @return Some(index) when a column with that name exists, None otherwise.
   */
  def indexOf(columnName: String): Option[Int]

  /**
   * Looks up the Value stored under the given column name.
   *
   * @param columnName name of the column.
   * @return Some(Value) when a column with that name exists, None otherwise.
   */
  def apply(columnName: String): Option[Value] = {
    val position = indexOf(columnName)
    apply(position)
  }

  /**
   * Resolves an optional column index to the value at that position.
   * A `None` index yields `None`; a defined index is applied to `values` directly.
   */
  protected def apply(columnIndex: Option[Int]): Option[Value] =
    columnIndex.map(idx => values(idx))

  /** Renders the row as the sequence of (field, value) pairs. */
  override def toString: String = fields.zip(values).toString
}
/**
* Type class that converts the textual form of a column into a [[Value]].
*
* Neither type parameter appears in a member signature; the pair only
* distinguishes one implicit instance from another, so a caller selects an
* instance by naming the pair (for example `[Byte, Short]`).
*
* @tparam Signed tag identifying the signed flavour of the column
* @tparam Unsigned tag identifying the unsigned flavour of the column
*/
private[mysql] trait StringValueConverter[Signed, Unsigned] {
/** Converts `str` for a column that is treated as signed. */
def convertSigned(str: String): Value
/** Converts `str` for a column that is treated as unsigned. */
def convertUnsigned(str: String): Value
}
/**
 * Implicit [[StringValueConverter]] instances, keyed by their
 * `[Signed, Unsigned]` type pair. Every conversion parses the whole string
 * with the standard numeric parsers and wraps the result in a [[Value]].
 */
private[mysql] object StringValueConverter {

  /** `[Byte, Short]`: signed text is parsed as a Byte, unsigned text as a Short. */
  implicit object StringTinyIntValueConverter extends StringValueConverter[Byte, Short] {
    def convertSigned(str: String): Value = {
      val parsed = str.toByte
      ByteValue(parsed)
    }

    def convertUnsigned(str: String): Value = {
      val parsed = str.toShort
      ShortValue(parsed)
    }
  }

  /** `[Short, Int]`: signed text is parsed as a Short, unsigned text as an Int. */
  implicit object StringShortValueConverter extends StringValueConverter[Short, Int] {
    def convertSigned(str: String): Value = {
      val parsed = str.toShort
      ShortValue(parsed)
    }

    def convertUnsigned(str: String): Value = {
      val parsed = str.toInt
      IntValue(parsed)
    }
  }

  /** `[Int, Int]`: both signed and unsigned text are parsed as an Int. */
  implicit object StringInt24ValueConverter extends StringValueConverter[Int, Int] {
    def convertSigned(str: String): Value = {
      val parsed = str.toInt
      IntValue(parsed)
    }

    def convertUnsigned(str: String): Value = {
      val parsed = str.toInt
      IntValue(parsed)
    }
  }

  /** `[Int, Long]`: signed text is parsed as an Int, unsigned text as a Long. */
  implicit object StringIntValueConverter extends StringValueConverter[Int, Long] {
    def convertSigned(str: String): Value = {
      val parsed = str.toInt
      IntValue(parsed)
    }

    def convertUnsigned(str: String): Value = {
      val parsed = str.toLong
      LongValue(parsed)
    }
  }

  /** `[Long, BigInt]`: signed text is parsed as a Long, unsigned text as a BigInt. */
  implicit object StringLongValueConverter extends StringValueConverter[Long, BigInt] {
    def convertSigned(str: String): Value = {
      val parsed = str.toLong
      LongValue(parsed)
    }

    def convertUnsigned(str: String): Value = {
      val parsed = BigInt(str)
      BigIntValue(parsed)
    }
  }
}
/**
 * A [[Row]] whose raw data is presumed to be encoded with the mysql
 * text-based protocol.
 * [[http://dev.mysql.com/doc/internals/en/com-query-response.html#packet-ProtocolText::ResultsetRow]]
 *
 * @param rawRow the undecoded row contents
 * @param fields column meta-data; one length-coded value is read from `rawRow` per entry, in order
 * @param indexMap column name to column index, consulted by `indexOf`
 */
class StringEncodedRow(rawRow: Buf, val fields: IndexedSeq[Field], indexMap: Map[String, Int]) extends Row {
  import StringValueConverter._

  private val reader = MysqlBuf.reader(rawRow)

  /**
   * Reads one length-coded value per field and turns its string
   * representation into an appropriate Value object.
   * [[http://dev.mysql.com/doc/internals/en/com-query-response.html#packet-ProtocolText::ResultsetRow]]
   *
   * The reader is consumed sequentially, so the decoding happens once, on
   * first access.
   */
  lazy val values: IndexedSeq[Value] = fields.map { field =>
    val charset = field.charset
    reader.readLengthCodedBytes() match {
      case null =>
        NullValue
      case raw if raw.isEmpty =>
        EmptyValue
      case raw if !Charset.isCompatible(charset) =>
        // The bytes cannot be decoded with this charset; hand them back untouched.
        RawValue(field.fieldType, charset, isBinary = false, raw)
      case raw =>
        decodeText(field, raw)
    }
  }

  /**
   * Decodes the non-empty bytes of a column whose charset is compatible,
   * choosing the Value type from the column's field type.
   */
  private def decodeText(field: Field, raw: Array[Byte]): Value = {
    val charset = field.charset
    val str = new String(raw, Charset(charset))
    field.fieldType match {
      case Type.Tiny => getValueFromString[Byte, Short](str, field)
      case Type.Short => getValueFromString[Short, Int](str, field)
      case Type.Int24 => getValueFromString[Int, Int](str, field)
      case Type.Long => getValueFromString[Int, Long](str, field)
      case Type.LongLong => getValueFromString[Long, BigInt](str, field)
      case Type.Float => FloatValue(str.toFloat)
      case Type.Double => DoubleValue(str.toDouble)
      case Type.Year => ShortValue(str.toShort)

      // Nonbinary strings as stored in the CHAR, VARCHAR, and TEXT data types
      case Type.VarChar | Type.String | Type.VarString | Type.TinyBlob | Type.Blob | Type.MediumBlob
          if !Charset.isBinary(charset) =>
        StringValue(str)

      // LongBlobs indicate a sequence of bytes with length >= 2^24 which
      // can't fit into a Array[Byte]. This should be streamed and
      // support for this needs to begin at the transport layer.
      case Type.LongBlob =>
        throw new UnsupportedOperationException("LongBlob is not supported!")

      // Everything else is returned as the undecoded bytes.
      case typ =>
        RawValue(typ, charset, isBinary = false, raw)
    }
  }

  /**
   * Converts `str` with the implicitly selected converter, using its
   * unsigned conversion when `field` reports itself unsigned and its signed
   * conversion otherwise.
   *
   * @param str textual form of the column value
   * @param field meta-data of the column the text belongs to
   * @param converter converter instance selected by the `[T, R]` pair
   */
  def getValueFromString[T, R](str: String, field: Field)(implicit converter: StringValueConverter[T, R]): Value =
    if (!field.isUnsigned()) converter.convertSigned(str)
    else converter.convertUnsigned(str)

  /** Index of the column called `name`, if `indexMap` has an entry for it. */
  def indexOf(name: String): Option[Int] = indexMap.get(name)
}
/**
* Type class that reads the next column value from a [[MysqlBufReader]] and
* wraps it in a [[Value]].
*
* Neither type parameter appears in a member signature; the pair only
* distinguishes one implicit instance from another, so a caller selects an
* instance by naming the pair (for example `[Byte, Short]`).
*
* @tparam Signed tag identifying the signed flavour of the column
* @tparam Unsigned tag identifying the unsigned flavour of the column
*/
private[mysql] trait BinaryValueConverter[Signed, Unsigned] {
/** Reads a value from `reader` for a column that is treated as signed. */
def convertSigned(reader: MysqlBufReader): Value
/** Reads a value from `reader` for a column that is treated as unsigned. */
def convertUnsigned(reader: MysqlBufReader): Value
}
/**
 * Implicit [[BinaryValueConverter]] instances, keyed by their
 * `[Signed, Unsigned]` type pair. Each conversion performs exactly one read
 * on the supplied reader and wraps what it read in a [[Value]].
 */
private[mysql] object BinaryValueConverter {

  /** `[Byte, Short]`: signed reads a byte, unsigned reads an unsigned byte. */
  implicit object BinaryTinyIntValueConverter extends BinaryValueConverter[Byte, Short] {
    def convertSigned(reader: MysqlBufReader): Value = {
      val raw = reader.readByte()
      ByteValue(raw)
    }

    def convertUnsigned(reader: MysqlBufReader): Value = {
      val raw = reader.readUnsignedByte
      ShortValue(raw)
    }
  }

  /** `[Short, Int]`: signed reads a little-endian short, unsigned an unsigned little-endian short. */
  implicit object BinaryShortValueConverter extends BinaryValueConverter[Short, Int] {
    def convertSigned(reader: MysqlBufReader): Value = {
      val raw = reader.readShortLE
      ShortValue(raw)
    }

    def convertUnsigned(reader: MysqlBufReader): Value = {
      val raw = reader.readUnsignedShortLE()
      IntValue(raw)
    }
  }

  /** `[Int, Int]`: signed and unsigned both read a little-endian int. */
  implicit object BinaryInt24ValueConverter extends BinaryValueConverter[Int, Int] {
    def convertSigned(reader: MysqlBufReader): Value = {
      val raw = reader.readIntLE()
      IntValue(raw)
    }

    def convertUnsigned(reader: MysqlBufReader): Value = {
      val raw = reader.readIntLE()
      IntValue(raw)
    }
  }

  /** `[Int, Long]`: signed reads a little-endian int, unsigned an unsigned little-endian int. */
  implicit object BinaryIntValueConverter extends BinaryValueConverter[Int, Long] {
    def convertSigned(reader: MysqlBufReader): Value = {
      val raw = reader.readIntLE
      IntValue(raw)
    }

    def convertUnsigned(reader: MysqlBufReader): Value = {
      val raw = reader.readUnsignedIntLE
      LongValue(raw)
    }
  }

  /** `[Long, BigInt]`: signed reads a little-endian long, unsigned an unsigned little-endian long. */
  implicit object BinaryLongValueConverter extends BinaryValueConverter[Long, BigInt] {
    def convertSigned(reader: MysqlBufReader): Value = {
      val raw = reader.readLongLE
      LongValue(raw)
    }

    def convertUnsigned(reader: MysqlBufReader): Value = {
      val raw = reader.readUnsignedLongLE()
      BigIntValue(raw)
    }
  }
}
/**
 * A [[Row]] whose raw data is presumed to be encoded with the
 * mysql binary protocol.
 * [[http://dev.mysql.com/doc/internals/en/binary-protocol-resultset-row.html]]
 *
 * @param rawRow the undecoded row contents
 * @param fields column meta-data; columns are decoded from `rawRow` in this order
 * @param indexMap column name to column index, consulted by `indexOf`
 */
class BinaryEncodedRow(rawRow: Buf, val fields: IndexedSeq[Field], indexMap: Map[String, Int]) extends Row {
  import BinaryValueConverter._

  private val reader: MysqlBufReader = MysqlBuf.reader(rawRow)

  // The first byte of the row is not part of the null bitmap or the values
  // (presumably the packet header described in the protocol reference above).
  reader.skip(1)

  /**
   * In a binary encoded row, null values are not sent from the
   * server. Instead, the server sends a bit vector where
   * each bit corresponds to the index of the column. If the bit
   * is set, the value is null.
   *
   * The vector holds one bit per field plus 2 reserved bits, rounded up to
   * whole bytes. The bytes are reversed before being handed to `BigInt`.
   */
  val nullBitmap: BigInt = {
    val bitmapLength = (fields.size + 7 + 2) / 8
    val bitmapBytes = reader.take(bitmapLength)
    BigInt(bitmapBytes.reverse)
  }

  /**
   * Tells whether the column at `index` is null, i.e. whether its bit is set
   * in `nullBitmap`. The first 2 bits are reserved, hence the offset.
   */
  def isNull(index: Int): Boolean = nullBitmap.testBit(index + 2)

  /**
   * Convert the binary representation of each value
   * into an appropriate Value object. Columns flagged in the null bitmap
   * consume nothing from the reader.
   */
  lazy val values: IndexedSeq[Value] = fields.zipWithIndex.map {
    case (field, idx) =>
      if (isNull(idx)) NullValue
      else readValue(field)
  }

  /** Reads the next non-null column from the reader according to its field type. */
  private def readValue(field: Field): Value = field.fieldType match {
    case Type.Tiny => getValueFromMysqlBufReader[Byte, Short](reader, field)
    case Type.Short => getValueFromMysqlBufReader[Short, Int](reader, field)
    case Type.Int24 => getValueFromMysqlBufReader[Int, Int](reader, field)
    case Type.Long => getValueFromMysqlBufReader[Int, Long](reader, field)
    case Type.LongLong => getValueFromMysqlBufReader[Long, BigInt](reader, field)
    case Type.Float => FloatValue(reader.readFloatLE())
    case Type.Double => DoubleValue(reader.readDoubleLE())
    case Type.Year => ShortValue(reader.readShortLE())

    // Nonbinary strings as stored in the CHAR, VARCHAR, and TEXT data types
    case Type.VarChar | Type.String | Type.VarString | Type.TinyBlob | Type.Blob | Type.MediumBlob
        if !Charset.isBinary(field.charset) && Charset.isCompatible(field.charset) =>
      StringValue(reader.readLengthCodedString(Charset(field.charset)))

    case Type.LongBlob =>
      throw new UnsupportedOperationException("LongBlob is not supported!")

    // Everything else is returned as the undecoded length-coded bytes.
    case typ =>
      RawValue(typ, field.charset, isBinary = true, reader.readLengthCodedBytes())
  }

  /**
   * Reads a value with the implicitly selected converter, using its unsigned
   * conversion when `field` reports itself unsigned and its signed
   * conversion otherwise.
   *
   * @param reader reader positioned at the column's data
   * @param field meta-data of the column being read
   * @param converter converter instance selected by the `[T, R]` pair
   */
  def getValueFromMysqlBufReader[T, R](reader: MysqlBufReader, field: Field)(implicit converter: BinaryValueConverter[T, R]): Value =
    if (!field.isUnsigned()) converter.convertSigned(reader)
    else converter.convertUnsigned(reader)

  /** Index of the column called `name`, if `indexMap` has an entry for it. */
  def indexOf(name: String): Option[Int] = indexMap.get(name)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment