Skip to content

Instantly share code, notes, and snippets.

@coocood
Created July 17, 2018 14:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save coocood/51ed5a4e4417fae4b74aa9b5055249d3 to your computer and use it in GitHub Desktop.
Save coocood/51ed5a4e4417fae4b74aa9b5055249d3 to your computer and use it in GitHub Desktop.
New row format prototype
package rowformat
import (
"encoding/binary"
"fmt"
"math"
"reflect"
"testing"
"time"
"unsafe"
"github.com/juju/errors"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
)
// codecVer is the version byte that starts every encoded row; decode
// silently ignores rows whose first byte differs.
const codecVer = 128
// Row is the decoded view of a row in the new row format. All of its
// slices alias the encoded buffer passed to decode, so that buffer must
// stay alive and unmodified while the Row is in use.
type Row struct {
	// small: colID []byte, offsets []uint16, optimized for most cases.
	// large: colID []uint32, offsets []uint32.
	large bool
	// Count of columns stored with a non-NULL value.
	numNotNullCols uint16
	// Count of columns stored explicitly as NULL.
	numNullCols uint16
	// Column IDs for the small layout: the not-null IDs come first, then
	// the null IDs; cut binary-searches each section, so each must be
	// sorted ascending.
	colIDs []byte
	// End offset into data of each not-null column value (small layout).
	offsets []uint16
	// Concatenated not-null column values.
	data []byte
	// for large row
	colIDs32  []uint32
	offsets32 []uint32
}
// decode parses an encoded row produced by encode and fills in row.
// The stored slices alias rowData; the caller must keep rowData alive
// and unmodified for the lifetime of the Row.
//
// Layout: version(1) | flag(1) | numNotNullCols(2 LE) | numNullCols(2 LE) |
// column IDs | offsets | data.
func (row *Row) decode(rowData []byte) {
	// Prototype behavior: rows with an unknown codec version are ignored.
	if rowData[0] != codecVer {
		return
	}
	// Bit 0 of the flag byte selects the large layout (32-bit IDs/offsets).
	row.large = rowData[1]&1 > 0
	row.numNotNullCols = binary.LittleEndian.Uint16(rowData[2:])
	row.numNullCols = binary.LittleEndian.Uint16(rowData[4:])
	cursor := 6
	if row.large {
		colIDsLen := int(row.numNotNullCols+row.numNullCols) * 4
		row.colIDs32 = bytesToU32Slice(rowData[cursor : cursor+colIDsLen])
		cursor += colIDsLen
		offsetsLen := int(row.numNotNullCols) * 4
		row.offsets32 = bytesToU32Slice(rowData[cursor : cursor+offsetsLen])
		cursor += offsetsLen
	} else {
		colIDsLen := int(row.numNotNullCols + row.numNullCols)
		row.colIDs = rowData[cursor : cursor+colIDsLen]
		cursor += colIDsLen
		offsetsLen := int(row.numNotNullCols) * 2
		row.offsets = bytes2U16Slice(rowData[cursor : cursor+offsetsLen])
		cursor += offsetsLen
	}
	row.data = rowData[cursor:]
}
// cut extracts the value of every requested column ID into buf.
// For each colIDs[i]:
//   - stored not-null: buf[i] is the value bytes (aliasing row.data);
//   - stored NULL:     buf[i] is nil;
//   - not stored:      buf[i] is the caller-supplied default origDefs[i].
//
// buf and origDefs must both have len(colIDs) elements.
func (row *Row) cut(colIDs []int64, origDefs, buf [][]byte) {
	notNull := int(row.numNotNullCols)
	total := notNull + int(row.numNullCols)
	for colIdx, colID := range colIDs {
		// Search the not-null section first.
		if h, ok := row.searchColID(colID, 0, notNull); ok {
			buf[colIdx] = row.getData(h)
			continue
		}
		// Then the null section.
		if _, ok := row.searchColID(colID, notNull, total); ok {
			buf[colIdx] = nil
		} else {
			buf[colIdx] = origDefs[colIdx]
		}
	}
}

// searchColID binary-searches the column-ID array in the half-open index
// range [lo, hi) for colID, returning the index and whether it was found.
// Each section of the ID array must be sorted ascending.
func (row *Row) searchColID(colID int64, lo, hi int) (int, bool) {
	for lo < hi {
		h := int(uint(lo+hi) >> 1) // avoid overflow when computing h
		// lo ≤ h < hi
		var v int64
		if row.large {
			v = int64(row.colIDs32[h])
		} else {
			v = int64(row.colIDs[h])
		}
		switch {
		case v < colID:
			lo = h + 1
		case v > colID:
			hi = h
		default:
			return h, true
		}
	}
	return 0, false
}
// getData returns the i-th not-null column value as a sub-slice of
// row.data. Value i spans [offset[i-1], offset[i]), with an implicit
// zero offset before the first value.
func (row *Row) getData(i int) []byte {
	var lo, hi uint32
	if row.large {
		hi = row.offsets32[i]
		if i > 0 {
			lo = row.offsets32[i-1]
		}
	} else {
		hi = uint32(row.offsets[i])
		if i > 0 {
			lo = uint32(row.offsets[i-1])
		}
	}
	return row.data[lo:hi]
}
// writeValuesToChunk appends each cut value in vals to the matching column
// of chk, decoding it according to tps. A nil value is appended as NULL.
// Exactly one entry must be appended per column so the chunk's rows stay
// aligned; a malformed value is therefore reported as an error instead of
// being skipped.
func writeValuesToChunk(chk *chunk.Chunk, tps []*types.FieldType, vals [][]byte) error {
	for colOff, val := range vals {
		if val == nil {
			chk.AppendNull(colOff)
			continue
		}
		switch tps[colOff].Tp {
		case mysql.TypeLonglong, mysql.TypeLong, mysql.TypeInt24, mysql.TypeShort, mysql.TypeTiny, mysql.TypeYear:
			// Integers are stored with a minimal little-endian width.
			// NOTE(review): widening from the narrow forms is unsigned, so
			// negative values would round-trip incorrectly — confirm the
			// encoder never shrinks negative integers.
			switch len(val) {
			case 1:
				chk.AppendInt64(colOff, int64(val[0]))
			case 2:
				chk.AppendInt64(colOff, int64(binary.LittleEndian.Uint16(val)))
			case 4:
				chk.AppendInt64(colOff, int64(binary.LittleEndian.Uint32(val)))
			case 8:
				chk.AppendInt64(colOff, int64(binary.LittleEndian.Uint64(val)))
			default:
				// Previously skipped silently, which left this column one
				// row short of its siblings.
				return errors.Errorf("invalid integer value length %d", len(val))
			}
		case mysql.TypeString, mysql.TypeVarchar, mysql.TypeVarString, mysql.TypeLongBlob, mysql.TypeMediumBlob, mysql.TypeBlob:
			chk.AppendBytes(colOff, val)
		case mysql.TypeFloat:
			chk.AppendFloat32(colOff, math.Float32frombits(binary.LittleEndian.Uint32(val)))
		case mysql.TypeDouble:
			chk.AppendFloat64(colOff, math.Float64frombits(binary.LittleEndian.Uint64(val)))
		case mysql.TypeNewDecimal:
			_, dec, _, _, err := codec.DecodeDecimal(val)
			if err != nil {
				return errors.Trace(err)
			}
			chk.AppendMyDecimal(colOff, dec)
		case mysql.TypeDatetime, mysql.TypeTimestamp, mysql.TypeDate:
			// TODO: not implemented in the prototype; appends nothing.
		case mysql.TypeEnum, mysql.TypeSet, mysql.TypeBit:
			// TODO
		case mysql.TypeDuration:
			// TODO
		case mysql.TypeJSON:
			// TODO
		}
	}
	return nil
}
// bytes2U16Slice reinterprets b as a []uint16 without copying.
// The result aliases b's memory, so b must outlive the returned slice;
// any trailing odd byte of b is ignored.
func bytes2U16Slice(b []byte) []uint16 {
	if len(b) == 0 {
		return nil
	}
	var out []uint16
	header := (*reflect.SliceHeader)(unsafe.Pointer(&out))
	header.Data = uintptr(unsafe.Pointer(&b[0]))
	header.Len = len(b) / 2
	header.Cap = header.Len
	return out
}
// bytesToU32Slice reinterprets b as a []uint32 without copying.
// The result aliases b's memory, so b must outlive the returned slice;
// trailing bytes beyond a multiple of 4 are ignored.
func bytesToU32Slice(b []byte) []uint32 {
	if len(b) == 0 {
		return nil
	}
	var out []uint32
	header := (*reflect.SliceHeader)(unsafe.Pointer(&out))
	header.Data = uintptr(unsafe.Pointer(&b[0]))
	header.Len = len(b) / 4
	header.Cap = header.Len
	return out
}
// encode appends the binary form of row to buf and returns the result.
// The layout mirrors decode: version, flag, column counts, column IDs,
// offsets, then the concatenated value data.
func (row *Row) encode(buf []byte) []byte {
	var flag byte
	if row.large {
		flag = 1
	}
	buf = append(buf, codecVer, flag)
	// Column counts, little-endian uint16 each.
	buf = append(buf, byte(row.numNotNullCols), byte(row.numNotNullCols>>8))
	buf = append(buf, byte(row.numNullCols), byte(row.numNullCols>>8))
	if row.large {
		buf = append(buf, u32SliceToBytes(row.colIDs32)...)
		buf = append(buf, u32SliceToBytes(row.offsets32)...)
	} else {
		buf = append(buf, row.colIDs...)
		buf = append(buf, u16SliceToBytes(row.offsets)...)
	}
	return append(buf, row.data...)
}
// u16SliceToBytes reinterprets u16s as raw bytes without copying.
// The result aliases u16s's memory, so u16s must outlive the returned
// slice.
func u16SliceToBytes(u16s []uint16) []byte {
	if len(u16s) == 0 {
		return nil
	}
	var out []byte
	header := (*reflect.SliceHeader)(unsafe.Pointer(&out))
	header.Data = uintptr(unsafe.Pointer(&u16s[0]))
	header.Len = len(u16s) * 2
	header.Cap = header.Len
	return out
}
// u32SliceToBytes reinterprets u32s as raw bytes without copying.
// The result aliases u32s's memory, so u32s must outlive the returned
// slice.
func u32SliceToBytes(u32s []uint32) []byte {
	if len(u32s) == 0 {
		return nil
	}
	var out []byte
	header := (*reflect.SliceHeader)(unsafe.Pointer(&out))
	header.Data = uintptr(unsafe.Pointer(&u32s[0]))
	header.Len = len(u32s) * 4
	header.Cap = header.Len
	return out
}
// BenchmarkDecodeRow measures decode + cut + chunk write for the new row
// format over numCols integer columns.
func BenchmarkDecodeRow(b *testing.B) {
	bin := makeRow(numCols)
	var oRow Row
	oRow.decode(bin)
	colIDs := makeQueryColumnIDs(colStart, colEnd)
	ft := types.NewFieldType(mysql.TypeLonglong)
	var tps []*types.FieldType
	for i := 0; i < len(colIDs); i++ {
		tps = append(tps, ft)
	}
	chk := chunk.NewChunkWithCapacity(tps, 1)
	buf := make([][]byte, len(colIDs))
	orignDefs := make([][]byte, len(colIDs))
	oRow.cut(colIDs, orignDefs, buf)
	if err := writeValuesToChunk(chk, tps, buf); err != nil {
		b.Fatal(err)
	}
	// Sanity print of both queried columns (was printing column 0 twice).
	fmt.Println(chk.GetRow(0).GetInt64(0), chk.GetRow(0).GetInt64(1))
	var row Row
	b.ResetTimer() // exclude the setup above from the measurement
	for i := 0; i < b.N; i++ {
		chk.Reset()
		row.decode(bin)
		row.cut(colIDs, orignDefs, buf)
		writeValuesToChunk(chk, tps, buf)
	}
	_ = row
}
// BenchmarkDecodeOldRow measures cut + decode of the old colID/value row
// format over the same column set as BenchmarkDecodeRow.
func BenchmarkDecodeOldRow(b *testing.B) {
	bin := makeOldRow(numCols)
	// Map each queried column ID to its output offset in buf.
	colIDs := make(map[int64]int)
	var off int
	for id := int64(colStart); id < colEnd; id++ {
		colIDs[id] = off
		off++
	}
	buf := make([][]byte, len(colIDs))
	ft := types.NewFieldType(mysql.TypeLonglong)
	var tps []*types.FieldType
	for i := 0; i < len(colIDs); i++ {
		tps = append(tps, ft)
	}
	chk := chunk.NewChunkWithCapacity(tps, 1)
	decoder := codec.NewDecoder(chk, time.Local)
	if err := CutRowNew(bin, colIDs, buf); err != nil {
		b.Fatal(err)
	}
	// NOTE(review): DecodeOne results are deliberately discarded here and
	// in the timed loop; errors would only skew the benchmark.
	for j, data := range buf {
		decoder.DecodeOne(data, j, tps[j])
	}
	b.ResetTimer() // exclude setup from the measurement
	for i := 0; i < b.N; i++ {
		chk.Reset()
		CutRowNew(bin, colIDs, buf)
		for j, data := range buf {
			decoder.DecodeOne(data, j, tps[j])
		}
	}
}
// CutRowNew cuts encoded row into byte slices and return columns' byte slice.
// Row layout: colID1, value1, colID2, value2, .....
// Only the columns present in colIDs are kept: buf[colIDs[id]] receives the
// raw value bytes of column id. Cutting stops early once every requested
// column has been seen.
func CutRowNew(data []byte, colIDs map[int64]int, buf [][]byte) error {
	if data == nil {
		return nil
	}
	if len(data) == 1 && data[0] == codec.NilFlag {
		return nil
	}
	var (
		cnt int
		b   []byte
	)
	for len(data) > 0 && cnt < len(colIDs) {
		// Get col id (data[0] is the flag byte preceding the varint).
		remain, cid, err := codec.DecodeVarint(data[1:])
		if err != nil {
			// Bug fix: this error was previously clobbered unchecked by
			// the CutOne call below.
			return errors.Trace(err)
		}
		// Get col value.
		b, data, err = codec.CutOne(remain)
		if err != nil {
			return errors.Trace(err)
		}
		offset, ok := colIDs[int64(cid)]
		if ok {
			buf[offset] = b
			cnt++
		}
	}
	return nil
}
// BenchmarkDecodeSysbenchRow measures decode + cut + chunk write of one
// varchar column (ID 3) from a sysbench-shaped row in the new format.
func BenchmarkDecodeSysbenchRow(b *testing.B) {
	bin := makeSysbenchRow()
	var oRow Row
	oRow.decode(bin)
	colIDs := []int64{3}
	rowTypes := makeSysbenchRowTypes()
	queryTypes := rowTypes[1:2]
	chk := chunk.NewChunkWithCapacity(queryTypes, 1)
	buf := make([][]byte, len(colIDs))
	orignDefs := make([][]byte, len(colIDs))
	oRow.cut(colIDs, orignDefs, buf)
	if err := writeValuesToChunk(chk, queryTypes, buf); err != nil {
		b.Fatal(err)
	}
	var row Row
	b.ResetTimer() // exclude setup from the measurement
	for i := 0; i < b.N; i++ {
		chk.Reset()
		row.decode(bin)
		row.cut(colIDs, orignDefs, buf)
		writeValuesToChunk(chk, queryTypes, buf)
	}
	_ = row
}
// BenchmarkDecodeSysbenchOldRow measures the same single-column query as
// BenchmarkDecodeSysbenchRow against the old row format.
func BenchmarkDecodeSysbenchOldRow(b *testing.B) {
	bin := makeSysbenchOldRow()
	colIDs := make(map[int64]int)
	colIDs[3] = 0
	buf := make([][]byte, len(colIDs))
	rowTypes := makeSysbenchRowTypes()
	queryTypes := rowTypes[1:2]
	chk := chunk.NewChunkWithCapacity(queryTypes, 1)
	decoder := codec.NewDecoder(chk, time.Local)
	if err := CutRowNew(bin, colIDs, buf); err != nil {
		b.Fatal(err)
	}
	// NOTE(review): DecodeOne results are deliberately discarded here and
	// in the timed loop.
	for j, data := range buf {
		decoder.DecodeOne(data, j, queryTypes[j])
	}
	b.ResetTimer() // exclude setup from the measurement
	for i := 0; i < b.N; i++ {
		chk.Reset()
		CutRowNew(bin, colIDs, buf)
		for j, data := range buf {
			decoder.DecodeOne(data, j, queryTypes[j])
		}
	}
}
const (
	// numCols is the number of columns in the generated benchmark rows.
	numCols = 3
	// colStart and colEnd bound the half-open range [colStart, colEnd)
	// of column IDs the benchmarks query.
	colStart = 1
	colEnd = 3
)
// makeRow hand-assembles a small-layout encoded row with nCols one-byte
// integer columns: column IDs 1..nCols holding values 0..nCols-1.
func makeRow(nCols int) []byte {
	// Header: version, flag=0 (small), counts as little-endian uint16s.
	buf := []byte{codecVer, 0, byte(nCols), byte(nCols >> 8), 0, 0}
	// Column IDs.
	for id := 1; id <= nCols; id++ {
		buf = append(buf, byte(id))
	}
	// End offsets, one uint16 per value (each value is one byte long).
	for end := 1; end <= nCols; end++ {
		buf = append(buf, byte(end), 0)
	}
	// Value data.
	for v := 0; v < nCols; v++ {
		buf = append(buf, byte(v))
	}
	return buf
}
// makeOldRow encodes a row in the old colID/value pair format with nCols
// integer columns: IDs 1..nCols holding values 0..nCols-1.
func makeOldRow(nCols int) []byte {
	var encoded []byte
	for col := 0; col < nCols; col++ {
		// Append one column ID followed by its value.
		encoded, _ = codec.EncodeValue(nil, encoded, types.NewIntDatum(int64(col)+1), types.NewIntDatum(int64(col)))
	}
	return encoded
}
// makeQueryColumnIDs returns every column ID in the half-open range
// [start, end), in ascending order.
func makeQueryColumnIDs(start, end int64) []int64 {
	ids := make([]int64, 0, end-start)
	for id := start; id < end; id++ {
		ids = append(ids, id)
	}
	return ids
}
// makeSysbenchRow encodes a sysbench-shaped row in the new format:
// a 4-byte column (ID 2), a 120-byte column (ID 3) and a 60-byte
// column (ID 4), all zero-filled.
func makeSysbenchRow() []byte {
	r := Row{
		numNotNullCols: 3,
		colIDs:         []byte{2, 3, 4},
		offsets:        []uint16{4, 124, 184},
		data:           make([]byte, 4+120+60),
	}
	return r.encode(nil)
}
func makeSysbenchOldRow() []byte {
bin, _ := codec.EncodeValue(nil, nil, types.NewIntDatum(2), types.NewIntDatum(0),
types.NewIntDatum(3), types.NewBytesDatum(make([]byte, 120)),
types.NewIntDatum(4), types.NewBytesDatum(make([]byte, 60)))
return bin
}
// makeSysbenchRowTypes returns the field types of the sysbench-shaped
// rows: one bigint column followed by two varchar columns.
func makeSysbenchRowTypes() []*types.FieldType {
	return []*types.FieldType{
		types.NewFieldType(mysql.TypeLonglong),
		types.NewFieldType(mysql.TypeVarchar),
		types.NewFieldType(mysql.TypeVarchar),
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment