-
-
Save mehmetgunturkun/e3f7974819a29bc6cd048eb4366707cc to your computer and use it in GitHub Desktop.
[Finagle.Mysql] Value Extraction with Implicit Converters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.twitter.finagle.mysql | |
import com.twitter.finagle.mysql.transport.{MysqlBuf, MysqlBufReader} | |
import com.twitter.io.Buf | |
/**
 * A `Row` gives name-based access to the [[Value]]s of a mysql row.
 * The [[Value]] stored under a mysql column name is obtained through
 * the `apply` method.
 */
trait Row {

  /**
   * Column meta-data: one [[Field]] per column in the Row.
   * Indexing is 0-based, so `fields(0)` describes the first column.
   */
  val fields: IndexedSeq[Field]

  /** The values for this Row. */
  val values: IndexedSeq[Value]

  /**
   * Looks up the index of the column with the given name.
   *
   * @param columnName name of the column.
   * @return Some(Int) if a column with the given name exists.
   *         Otherwise, None.
   */
  def indexOf(columnName: String): Option[Int]

  /**
   * Looks up the Value stored in the column with the given name.
   *
   * @param columnName name of the column.
   * @return Some(Value) if a column with the given name exists.
   *         Otherwise, None.
   */
  def apply(columnName: String): Option[Value] = {
    val position = indexOf(columnName)
    apply(position)
  }

  /**
   * Resolves an optional column index to the value at that index.
   * The index is passed to `values` as-is; no bounds check is done here.
   *
   * @param columnIndex index of the column, if known.
   * @return the value at the index, or None when no index was given.
   */
  protected def apply(columnIndex: Option[Int]): Option[Value] =
    columnIndex.map(idx => values(idx))

  /** Renders the row as the sequence of (field, value) pairs, paired by position. */
  override def toString: String = fields.zip(values).toString
}
/**
 * Turns the textual form of a column into a [[Value]], with one entry
 * point for signed columns and one for unsigned columns.
 *
 * The `Signed` and `Unsigned` type parameters do not appear in either
 * method signature; they only distinguish one converter instance from
 * another at the type level.
 */
private[mysql] trait StringValueConverter[Signed, Unsigned] {

  /** Converts `str` to the [[Value]] used when the column is signed. */
  def convertSigned(str: String): Value

  /** Converts `str` to the [[Value]] used when the column is unsigned. */
  def convertUnsigned(str: String): Value
}
/**
 * Implicit [[StringValueConverter]] instances, one per (signed, unsigned)
 * type pair. Every instance parses the string with the standard
 * `String` numeric conversions (`toByte`, `toShort`, `toInt`, `toLong`)
 * or `BigInt(str)`; those throw on malformed input.
 */
private[mysql] object StringValueConverter {

  /** Signed parses as Byte; unsigned is widened to Short. */
  implicit object StringTinyIntValueConverter extends StringValueConverter[Byte, Short] {
    def convertSigned(str: String): Value = ByteValue(str.toByte)
    def convertUnsigned(str: String): Value = ShortValue(str.toShort)
  }

  /** Signed parses as Short; unsigned is widened to Int. */
  implicit object StringShortValueConverter extends StringValueConverter[Short, Int] {
    def convertSigned(str: String): Value = ShortValue(str.toShort)
    def convertUnsigned(str: String): Value = IntValue(str.toInt)
  }

  /** Both signed and unsigned parse as Int. */
  implicit object StringInt24ValueConverter extends StringValueConverter[Int, Int] {
    def convertSigned(str: String): Value = IntValue(str.toInt)
    def convertUnsigned(str: String): Value = IntValue(str.toInt)
  }

  /** Signed parses as Int; unsigned is widened to Long. */
  implicit object StringIntValueConverter extends StringValueConverter[Int, Long] {
    def convertSigned(str: String): Value = IntValue(str.toInt)
    def convertUnsigned(str: String): Value = LongValue(str.toLong)
  }

  /** Signed parses as Long; unsigned is widened to BigInt. */
  implicit object StringLongValueConverter extends StringValueConverter[Long, BigInt] {
    def convertSigned(str: String): Value = LongValue(str.toLong)
    def convertUnsigned(str: String): Value = BigIntValue(BigInt(str))
  }
}
/**
 * A [[Row]] whose data is presumed to be encoded with the mysql
 * text-based protocol.
 * [[http://dev.mysql.com/doc/internals/en/com-query-response.html#packet-ProtocolText::ResultsetRow]]
 */
class StringEncodedRow(rawRow: Buf, val fields: IndexedSeq[Field], indexMap: Map[String, Int])
    extends Row {
  private val reader = MysqlBuf.reader(rawRow)

  import StringValueConverter._

  /**
   * Decodes the string representation of every column into the matching
   * Value object. One length-coded chunk is read from the shared reader
   * per field, in field order.
   * [[http://dev.mysql.com/doc/internals/en/com-query-response.html#packet-ProtocolText::ResultsetRow]]
   */
  lazy val values: IndexedSeq[Value] =
    fields.map { field =>
      val charset = field.charset
      reader.readLengthCodedBytes() match {
        case null =>
          NullValue
        case bytes if bytes.isEmpty =>
          EmptyValue
        case bytes if !Charset.isCompatible(charset) =>
          // The charset cannot be decoded here, so hand back the raw bytes.
          RawValue(field.fieldType, field.charset, isBinary = false, bytes)
        case bytes =>
          val str = new String(bytes, Charset(charset))
          field.fieldType match {
            case Type.Tiny => getValueFromString[Byte, Short](str, field)
            case Type.Short => getValueFromString[Short, Int](str, field)
            case Type.Int24 => getValueFromString[Int, Int](str, field)
            case Type.Long => getValueFromString[Int, Long](str, field)
            case Type.LongLong => getValueFromString[Long, BigInt](str, field)
            case Type.Float => FloatValue(str.toFloat)
            case Type.Double => DoubleValue(str.toDouble)
            case Type.Year => ShortValue(str.toShort)

            // Nonbinary strings as stored in the CHAR, VARCHAR, and TEXT data types
            case Type.VarChar | Type.String | Type.VarString | Type.TinyBlob | Type.Blob |
                Type.MediumBlob if !Charset.isBinary(charset) =>
              StringValue(str)

            // LongBlobs indicate a sequence of bytes with length >= 2^24 which
            // can't fit into a Array[Byte]. This should be streamed and
            // support for this needs to begin at the transport layer.
            case Type.LongBlob =>
              throw new UnsupportedOperationException("LongBlob is not supported!")

            // Everything else is returned undecoded.
            case typ => RawValue(typ, charset, isBinary = false, bytes)
          }
      }
    }

  /**
   * Converts `str` with the implicit converter selected by `[T, R]`,
   * choosing the unsigned conversion when `field.isUnsigned()` is true
   * and the signed conversion otherwise.
   *
   * @param str the textual column value.
   * @param field meta-data of the column the value belongs to.
   * @param converter converter instance for the (signed, unsigned) type pair.
   */
  def getValueFromString[T, R](str: String, field: Field)(
    implicit converter: StringValueConverter[T, R]
  ): Value =
    if (field.isUnsigned()) converter.convertUnsigned(str)
    else converter.convertSigned(str)

  /** Looks the column name up in the index map supplied at construction. */
  def indexOf(name: String): Option[Int] = indexMap.get(name)
}
/**
 * Reads a column from a [[MysqlBufReader]] and wraps it in a [[Value]],
 * with one entry point for signed columns and one for unsigned columns.
 *
 * The `Signed` and `Unsigned` type parameters do not appear in either
 * method signature; they only distinguish one converter instance from
 * another at the type level.
 */
private[mysql] trait BinaryValueConverter[Signed, Unsigned] {

  /** Reads from `reader` the [[Value]] used when the column is signed. */
  def convertSigned(reader: MysqlBufReader): Value

  /** Reads from `reader` the [[Value]] used when the column is unsigned. */
  def convertUnsigned(reader: MysqlBufReader): Value
}
/**
 * Implicit [[BinaryValueConverter]] instances, one per (signed, unsigned)
 * type pair. Each conversion performs exactly one read on the supplied
 * reader and wraps the result in the corresponding [[Value]].
 */
private[mysql] object BinaryValueConverter {

  /** Signed uses `readByte`; unsigned uses `readUnsignedByte` and yields a ShortValue. */
  implicit object BinaryTinyIntValueConverter extends BinaryValueConverter[Byte, Short] {
    def convertSigned(reader: MysqlBufReader): Value = ByteValue(reader.readByte())
    def convertUnsigned(reader: MysqlBufReader): Value = ShortValue(reader.readUnsignedByte)
  }

  /** Signed uses `readShortLE`; unsigned uses `readUnsignedShortLE` and yields an IntValue. */
  implicit object BinaryShortValueConverter extends BinaryValueConverter[Short, Int] {
    def convertSigned(reader: MysqlBufReader): Value = ShortValue(reader.readShortLE)
    def convertUnsigned(reader: MysqlBufReader): Value = IntValue(reader.readUnsignedShortLE())
  }

  /** Signed and unsigned both use `readIntLE` and yield an IntValue. */
  implicit object BinaryInt24ValueConverter extends BinaryValueConverter[Int, Int] {
    def convertSigned(reader: MysqlBufReader): Value = IntValue(reader.readIntLE())
    def convertUnsigned(reader: MysqlBufReader): Value = IntValue(reader.readIntLE())
  }

  /** Signed uses `readIntLE`; unsigned uses `readUnsignedIntLE` and yields a LongValue. */
  implicit object BinaryIntValueConverter extends BinaryValueConverter[Int, Long] {
    def convertSigned(reader: MysqlBufReader): Value = IntValue(reader.readIntLE)
    def convertUnsigned(reader: MysqlBufReader): Value = LongValue(reader.readUnsignedIntLE)
  }

  /** Signed uses `readLongLE`; unsigned uses `readUnsignedLongLE` and yields a BigIntValue. */
  implicit object BinaryLongValueConverter extends BinaryValueConverter[Long, BigInt] {
    def convertSigned(reader: MysqlBufReader): Value = LongValue(reader.readLongLE)
    def convertUnsigned(reader: MysqlBufReader): Value = BigIntValue(reader.readUnsignedLongLE())
  }
}
/**
 * A [[Row]] whose data is presumed to be encoded with the mysql
 * binary protocol.
 * [[http://dev.mysql.com/doc/internals/en/binary-protocol-resultset-row.html]]
 */
class BinaryEncodedRow(rawRow: Buf, val fields: IndexedSeq[Field], indexMap: Map[String, Int])
    extends Row {
  private val reader: MysqlBufReader = MysqlBuf.reader(rawRow)
  // Step over the first byte of the row so the reader sits on the null bitmap.
  reader.skip(1)

  import BinaryValueConverter._

  /**
   * In a binary encoded row, null values are not sent from the
   * server. Instead, the server sends a bit vector where
   * each bit corresponds to the index of the column. If the bit
   * is set, the value is null.
   */
  val nullBitmap: BigInt = {
    // One bit per field plus the 2 reserved bits, rounded up to whole bytes.
    val bitmapLength = (fields.size + 7 + 2) / 8
    val wireBytes = reader.take(bitmapLength)
    // BigInt expects big-endian bytes, hence the reversal.
    BigInt(wireBytes.reverse)
  }

  /**
   * Tells whether the bit for the column at `index` is set in the
   * null bitmap. The first 2 bits are reserved, so the column's bit
   * sits at `index + 2`.
   */
  def isNull(index: Int): Boolean = {
    val bitPosition = index + 2
    nullBitmap.testBit(bitPosition)
  }

  /**
   * Decodes the binary representation of every column into the matching
   * Value object. Columns are read from the shared reader in field order;
   * a column flagged in the null bitmap yields NullValue without reading.
   */
  lazy val values: IndexedSeq[Value] =
    fields.zipWithIndex.map {
      case (field, idx) =>
        if (isNull(idx)) {
          NullValue
        } else {
          field.fieldType match {
            case Type.Tiny => getValueFromMysqlBufReader[Byte, Short](reader, field)
            case Type.Short => getValueFromMysqlBufReader[Short, Int](reader, field)
            case Type.Int24 => getValueFromMysqlBufReader[Int, Int](reader, field)
            case Type.Long => getValueFromMysqlBufReader[Int, Long](reader, field)
            case Type.LongLong => getValueFromMysqlBufReader[Long, BigInt](reader, field)
            case Type.Float => FloatValue(reader.readFloatLE())
            case Type.Double => DoubleValue(reader.readDoubleLE())
            case Type.Year => ShortValue(reader.readShortLE())

            // Nonbinary strings as stored in the CHAR, VARCHAR, and TEXT data types
            case Type.VarChar | Type.String | Type.VarString | Type.TinyBlob | Type.Blob |
                Type.MediumBlob
                if !Charset.isBinary(field.charset) && Charset.isCompatible(field.charset) =>
              StringValue(reader.readLengthCodedString(Charset(field.charset)))

            case Type.LongBlob =>
              throw new UnsupportedOperationException("LongBlob is not supported!")

            // Everything else is returned as undecoded length-coded bytes.
            case typ =>
              RawValue(typ, field.charset, isBinary = true, reader.readLengthCodedBytes())
          }
        }
    }

  /**
   * Reads one value from `reader` with the implicit converter selected by
   * `[T, R]`, choosing the unsigned read when `field.isUnsigned()` is true
   * and the signed read otherwise.
   *
   * @param reader the reader positioned at the column's data.
   * @param field meta-data of the column being read.
   * @param converter converter instance for the (signed, unsigned) type pair.
   */
  def getValueFromMysqlBufReader[T, R](reader: MysqlBufReader, field: Field)(
    implicit converter: BinaryValueConverter[T, R]
  ): Value =
    if (field.isUnsigned()) converter.convertUnsigned(reader)
    else converter.convertSigned(reader)

  /** Looks the column name up in the index map supplied at construction. */
  def indexOf(name: String): Option[Int] = indexMap.get(name)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment