Created
July 17, 2018 14:09
-
-
Save coocood/51ed5a4e4417fae4b74aa9b5055249d3 to your computer and use it in GitHub Desktop.
New row format prototype
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package rowformat | |
import ( | |
"encoding/binary" | |
"fmt" | |
"math" | |
"reflect" | |
"testing" | |
"time" | |
"unsafe" | |
"github.com/juju/errors" | |
"github.com/pingcap/tidb/mysql" | |
"github.com/pingcap/tidb/types" | |
"github.com/pingcap/tidb/util/chunk" | |
"github.com/pingcap/tidb/util/codec" | |
) | |
// codecVer is the version byte that prefixes every row encoded in the new format.
const codecVer = 128
// Row is the decoded, zero-copy view of a row in the prototype format.
// Column IDs are stored sorted ascending: the not-null columns first,
// followed by the explicitly-null columns (which have IDs but no data).
type Row struct {
	// small: colID []byte, offsets []uint16, optimized for most cases.
	// large: colID []uint32, offsets []uint32.
	large          bool
	numNotNullCols uint16 // count of columns that have a value in data
	numNullCols    uint16 // count of explicitly-null columns (ID only)
	colIDs         []byte // small format: column IDs (not-null IDs, then null IDs)
	offsets        []uint16 // small format: end offset into data per not-null column
	data           []byte // concatenated values of the not-null columns
	// for large row
	colIDs32  []uint32
	offsets32 []uint32
}
func (row *Row) decode(rowData []byte) { | |
if rowData[0] != codecVer { | |
return | |
} | |
row.large = rowData[1]&1 > 0 | |
row.numNotNullCols = binary.LittleEndian.Uint16(rowData[2:]) | |
row.numNullCols = binary.LittleEndian.Uint16(rowData[4:]) | |
cursor := 6 | |
if row.large { | |
colIDsLen := int(row.numNotNullCols+row.numNullCols) * 4 | |
row.colIDs32 = bytesToU32Slice(rowData[cursor : cursor+colIDsLen]) | |
cursor += colIDsLen | |
offsetsLen := int(row.numNotNullCols) * 4 | |
row.offsets32 = bytesToU32Slice(rowData[cursor : cursor+offsetsLen]) | |
cursor += offsetsLen | |
} else { | |
colIDsLen := int(row.numNotNullCols + row.numNullCols) | |
row.colIDs = rowData[cursor : cursor+colIDsLen] | |
cursor += colIDsLen | |
offsetsLen := int(row.numNotNullCols) * 2 | |
row.offsets = bytes2U16Slice(rowData[cursor : cursor+offsetsLen]) | |
cursor += offsetsLen | |
} | |
row.data = rowData[cursor:] | |
return | |
} | |
func (row *Row) cut(colIDs []int64, origDefs, buf [][]byte) { | |
for colIdx, colID := range colIDs { | |
// Search the column in not-null columns array. | |
i, j := 0, int(row.numNotNullCols) | |
var found bool | |
for i < j { | |
h := int(uint(i+j) >> 1) // avoid overflow when computing h | |
// i ≤ h < j | |
var v int64 | |
if row.large { | |
v = int64(row.colIDs32[h]) | |
} else { | |
v = int64(row.colIDs[h]) | |
} | |
if v < colID { | |
i = h + 1 | |
} else if v > colID { | |
j = h | |
} else { | |
found = true | |
buf[colIdx] = row.getData(h) | |
break | |
} | |
} | |
if found { | |
continue | |
} | |
// Search the column in null columns array. | |
i, j = int(row.numNotNullCols), int(row.numNotNullCols+row.numNullCols) | |
for i < j { | |
h := int(uint(i+j) >> 1) // avoid overflow when computing h | |
// i ≤ h < j | |
var v int64 | |
if row.large { | |
v = int64(row.colIDs32[h]) | |
} else { | |
v = int64(row.colIDs[h]) | |
} | |
if v < colID { | |
i = h + 1 | |
} else if v > colID { | |
j = h | |
} else { | |
found = true | |
break | |
} | |
} | |
if found { | |
buf[colIdx] = nil | |
} else { | |
buf[colIdx] = origDefs[colIdx] | |
} | |
} | |
return | |
} | |
func (row *Row) getData(i int) []byte { | |
var start, end uint32 | |
if row.large { | |
if i > 0 { | |
start = row.offsets32[i-1] | |
} | |
end = row.offsets32[i] | |
} else { | |
if i > 0 { | |
start = uint32(row.offsets[i-1]) | |
} | |
end = uint32(row.offsets[i]) | |
} | |
return row.data[start:end] | |
} | |
// writeValuesToChunk appends one row of raw column values to chk.
// vals[i] holds the encoded value of column i (nil means NULL) and
// tps[i] its field type. Several types are still unimplemented (TODO),
// for which nothing is appended.
func writeValuesToChunk(chk *chunk.Chunk, tps []*types.FieldType, vals [][]byte) error {
	for colOff, val := range vals {
		if val == nil {
			chk.AppendNull(colOff)
			continue
		}
		switch tps[colOff].Tp {
		case mysql.TypeLonglong, mysql.TypeLong, mysql.TypeInt24, mysql.TypeShort, mysql.TypeTiny, mysql.TypeYear:
			// Integers are stored little-endian in the smallest width that fits.
			// NOTE(review): these conversions zero-extend; a negative value stored
			// in fewer than 8 bytes would decode incorrectly — confirm the encoder
			// either sign-extends or only emits non-negative values this way.
			// NOTE(review): lengths other than 1/2/4/8 silently append nothing,
			// which would desynchronize the chunk's columns.
			switch len(val) {
			case 1:
				chk.AppendInt64(colOff, int64(val[0]))
			case 2:
				chk.AppendInt64(colOff, int64(binary.LittleEndian.Uint16(val)))
			case 4:
				chk.AppendInt64(colOff, int64(binary.LittleEndian.Uint32(val)))
			case 8:
				chk.AppendInt64(colOff, int64(binary.LittleEndian.Uint64(val)))
			}
		case mysql.TypeString, mysql.TypeVarchar, mysql.TypeVarString, mysql.TypeLongBlob, mysql.TypeMediumBlob, mysql.TypeBlob:
			chk.AppendBytes(colOff, val)
		case mysql.TypeFloat:
			chk.AppendFloat32(colOff, math.Float32frombits(binary.LittleEndian.Uint32(val)))
		case mysql.TypeDouble:
			chk.AppendFloat64(colOff, math.Float64frombits(binary.LittleEndian.Uint64(val)))
		case mysql.TypeNewDecimal:
			_, dec, _, _, err := codec.DecodeDecimal(val)
			if err != nil {
				return errors.Trace(err)
			}
			chk.AppendMyDecimal(colOff, dec)
		case mysql.TypeDatetime, mysql.TypeTimestamp, mysql.TypeDate:
			// TODO
		case mysql.TypeEnum, mysql.TypeSet, mysql.TypeBit:
			// TODO
		case mysql.TypeDuration:
			// TODO
		case mysql.TypeJSON:
			// TODO
		}
	}
	return nil
}
// bytes2U16Slice reinterprets b as a []uint16 without copying.
// The result aliases b's backing array (native endianness); b must
// outlive the returned slice. Any trailing odd byte is ignored.
func bytes2U16Slice(b []byte) []uint16 {
	if len(b) == 0 {
		return nil
	}
	n := len(b) / 2
	var res []uint16
	sh := (*reflect.SliceHeader)(unsafe.Pointer(&res))
	sh.Data = uintptr(unsafe.Pointer(&b[0]))
	sh.Len = n
	sh.Cap = n
	return res
}
// bytesToU32Slice reinterprets b as a []uint32 without copying.
// The result aliases b's backing array (native endianness); b must
// outlive the returned slice. Trailing bytes beyond a multiple of 4
// are ignored.
func bytesToU32Slice(b []byte) []uint32 {
	if len(b) == 0 {
		return nil
	}
	n := len(b) / 4
	var res []uint32
	sh := (*reflect.SliceHeader)(unsafe.Pointer(&res))
	sh.Data = uintptr(unsafe.Pointer(&b[0]))
	sh.Len = n
	sh.Cap = n
	return res
}
func (row *Row) encode(buf []byte) []byte { | |
buf = append(buf, codecVer) | |
flag := byte(0) | |
if row.large { | |
flag = 1 | |
} | |
buf = append(buf, flag) | |
buf = append(buf, byte(row.numNotNullCols), byte(row.numNotNullCols>>8)) | |
buf = append(buf, byte(row.numNullCols), byte(row.numNullCols>>8)) | |
if row.large { | |
buf = append(buf, u32SliceToBytes(row.colIDs32)...) | |
buf = append(buf, u32SliceToBytes(row.offsets32)...) | |
} else { | |
buf = append(buf, row.colIDs...) | |
buf = append(buf, u16SliceToBytes(row.offsets)...) | |
} | |
buf = append(buf, row.data...) | |
return buf | |
} | |
// u16SliceToBytes reinterprets u16s as raw bytes without copying.
// The result aliases u16s' backing array (native endianness); u16s must
// outlive the returned slice.
func u16SliceToBytes(u16s []uint16) []byte {
	if len(u16s) == 0 {
		return nil
	}
	n := len(u16s) * 2
	var res []byte
	sh := (*reflect.SliceHeader)(unsafe.Pointer(&res))
	sh.Data = uintptr(unsafe.Pointer(&u16s[0]))
	sh.Len = n
	sh.Cap = n
	return res
}
// u32SliceToBytes reinterprets u32s as raw bytes without copying.
// The result aliases u32s' backing array (native endianness); u32s must
// outlive the returned slice.
func u32SliceToBytes(u32s []uint32) []byte {
	if len(u32s) == 0 {
		return nil
	}
	n := len(u32s) * 4
	var res []byte
	sh := (*reflect.SliceHeader)(unsafe.Pointer(&res))
	sh.Data = uintptr(unsafe.Pointer(&u32s[0]))
	sh.Len = n
	sh.Cap = n
	return res
}
func BenchmarkDecodeRow(b *testing.B) { | |
bin := makeRow(numCols) | |
var oRow Row | |
oRow.decode(bin) | |
colIDs := makeQueryColumnIDs(colStart, colEnd) | |
ft := types.NewFieldType(mysql.TypeLonglong) | |
var tps []*types.FieldType | |
for i := 0; i < len(colIDs); i++ { | |
tps = append(tps, ft) | |
} | |
chk := chunk.NewChunkWithCapacity(tps, 1) | |
buf := make([][]byte, len(colIDs)) | |
orignDefs := make([][]byte, len(colIDs)) | |
oRow.cut(colIDs, orignDefs, buf) | |
writeValuesToChunk(chk, tps, buf) | |
fmt.Println(chk.GetRow(0).GetInt64(0), chk.GetRow(0).GetInt64(0)) | |
var row Row | |
for i := 0; i < b.N; i++ { | |
chk.Reset() | |
row.decode(bin) | |
row.cut(colIDs, orignDefs, buf) | |
writeValuesToChunk(chk, tps, buf) | |
} | |
_ = row | |
} | |
func BenchmarkDecodeOldRow(b *testing.B) { | |
bin := makeOldRow(numCols) | |
colIDs := make(map[int64]int) | |
var off int | |
for i := int64(colStart); i < colEnd; i++ { | |
colIDs[i] = off | |
off++ | |
} | |
buf := make([][]byte, len(colIDs)) | |
ft := types.NewFieldType(mysql.TypeLonglong) | |
var tps []*types.FieldType | |
for i := 0; i < len(colIDs); i++ { | |
tps = append(tps, ft) | |
} | |
chk := chunk.NewChunkWithCapacity(tps, 1) | |
decoder := codec.NewDecoder(chk, time.Local) | |
CutRowNew(bin, colIDs, buf) | |
for j, data := range buf { | |
decoder.DecodeOne(data, j, tps[j]) | |
} | |
for i := 0; i < b.N; i++ { | |
chk.Reset() | |
CutRowNew(bin, colIDs, buf) | |
for j, data := range buf { | |
decoder.DecodeOne(data, j, tps[j]) | |
} | |
} | |
} | |
// CutRowNew cuts encoded row into byte slices and return columns' byte slice. | |
// Row layout: colID1, value1, colID2, value2, ..... | |
func CutRowNew(data []byte, colIDs map[int64]int, buf [][]byte) error { | |
if data == nil { | |
return nil | |
} | |
if len(data) == 1 && data[0] == codec.NilFlag { | |
return nil | |
} | |
var ( | |
cnt int | |
b []byte | |
) | |
for len(data) > 0 && cnt < len(colIDs) { | |
// Get col id. | |
remain, cid, err := codec.DecodeVarint(data[1:]) | |
// Get col value. | |
b, data, err = codec.CutOne(remain) | |
if err != nil { | |
return errors.Trace(err) | |
} | |
offset, ok := colIDs[int64(cid)] | |
if ok { | |
buf[offset] = b | |
cnt++ | |
} | |
} | |
return nil | |
} | |
func BenchmarkDecodeSysbenchRow(b *testing.B) { | |
bin := makeSysbenchRow() | |
var oRow Row | |
oRow.decode(bin) | |
colIDs := []int64{3} | |
rowTypes := makeSysbenchRowTypes() | |
queryTypes := rowTypes[1:2] | |
chk := chunk.NewChunkWithCapacity(queryTypes, 1) | |
buf := make([][]byte, len(colIDs)) | |
orignDefs := make([][]byte, len(colIDs)) | |
oRow.cut(colIDs, orignDefs, buf) | |
writeValuesToChunk(chk, queryTypes, buf) | |
var row Row | |
for i := 0; i < b.N; i++ { | |
chk.Reset() | |
row.decode(bin) | |
row.cut(colIDs, orignDefs, buf) | |
writeValuesToChunk(chk, queryTypes, buf) | |
} | |
_ = row | |
} | |
func BenchmarkDecodeSysbenchOldRow(b *testing.B) { | |
bin := makeSysbenchOldRow() | |
colIDs := make(map[int64]int) | |
colIDs[3] = 0 | |
buf := make([][]byte, len(colIDs)) | |
rowTypes := makeSysbenchRowTypes() | |
queryTypes := rowTypes[1:2] | |
chk := chunk.NewChunkWithCapacity(queryTypes, 1) | |
decoder := codec.NewDecoder(chk, time.Local) | |
CutRowNew(bin, colIDs, buf) | |
for j, data := range buf { | |
decoder.DecodeOne(data, j, queryTypes[j]) | |
} | |
for i := 0; i < b.N; i++ { | |
chk.Reset() | |
CutRowNew(bin, colIDs, buf) | |
for j, data := range buf { | |
decoder.DecodeOne(data, j, queryTypes[j]) | |
} | |
} | |
} | |
// Benchmark fixture parameters: generated rows have numCols columns with
// IDs starting at 1; queries request the IDs in [colStart, colEnd).
const (
	numCols  = 3
	colStart = 1
	colEnd   = 3
)
func makeRow(nCols int) []byte { | |
var buf []byte | |
buf = append(buf, codecVer, 0) | |
buf = append(buf, byte(nCols), byte(nCols>>8)) | |
buf = append(buf, 0, 0) | |
// column IDs | |
for i := 0; i < nCols; i++ { | |
buf = append(buf, byte(i+1)) | |
} | |
// Offsets | |
for i := 0; i < nCols; i++ { | |
buf = append(buf, byte(i+1), 0) | |
} | |
// Data | |
for i := 0; i < nCols; i++ { | |
buf = append(buf, byte(i)) | |
} | |
return buf | |
} | |
// makeOldRow builds an old-format row as (colID, value) pairs encoded
// with codec.EncodeValue. Column i (0-based) gets ID i+1 and int value i.
// Encoding errors are ignored; this is a benchmark fixture only.
func makeOldRow(nCols int) []byte {
	var row []byte
	for i := 0; i < nCols; i++ {
		// column ID and value
		row, _ = codec.EncodeValue(nil, row, types.NewIntDatum(int64(i)+1), types.NewIntDatum(int64(i)))
	}
	return row
}
// makeQueryColumnIDs returns the column IDs in the half-open range [start, end).
func makeQueryColumnIDs(start, end int64) []int64 {
	ids := make([]int64, 0, end-start)
	for id := start; id < end; id++ {
		ids = append(ids, id)
	}
	return ids
}
func makeSysbenchRow() []byte { | |
var row Row | |
row.numNotNullCols = 3 | |
row.colIDs = []byte{2, 3, 4} | |
row.offsets = []uint16{4, 124, 184} | |
row.data = make([]byte, 4+120+60) | |
return row.encode(nil) | |
} | |
// makeSysbenchOldRow encodes a sysbench-like row in the old format:
// col 2 is int 0, col 3 is a 120-byte blob, col 4 is a 60-byte blob.
// The encode error is ignored; this is a benchmark fixture only.
func makeSysbenchOldRow() []byte {
	bin, _ := codec.EncodeValue(nil, nil, types.NewIntDatum(2), types.NewIntDatum(0),
		types.NewIntDatum(3), types.NewBytesDatum(make([]byte, 120)),
		types.NewIntDatum(4), types.NewBytesDatum(make([]byte, 60)))
	return bin
}
func makeSysbenchRowTypes() []*types.FieldType { | |
fields := make([]*types.FieldType, 3) | |
fields[0] = types.NewFieldType(mysql.TypeLonglong) | |
fields[1] = types.NewFieldType(mysql.TypeVarchar) | |
fields[2] = types.NewFieldType(mysql.TypeVarchar) | |
return fields | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment