Skip to content

Instantly share code, notes, and snippets.

@mattlord
Last active July 8, 2024 19:42
Show Gist options
  • Save mattlord/6c8a2b90b2cb100da7fe22c90196065e to your computer and use it in GitHub Desktop.
Save mattlord/6c8a2b90b2cb100da7fe22c90196065e to your computer and use it in GitHub Desktop.
Re-use stateful decoder in vstreamer
diff --git a/examples/common/scripts/vttablet-up.sh b/examples/common/scripts/vttablet-up.sh
index daa40aee89..a3e1caadc6 100755
--- a/examples/common/scripts/vttablet-up.sh
+++ b/examples/common/scripts/vttablet-up.sh
@@ -54,6 +54,8 @@ vttablet \
--service_map 'grpc-queryservice,grpc-tabletmanager,grpc-updatestream' \
--pid_file $VTDATAROOT/$tablet_dir/vttablet.pid \
--heartbeat_on_demand_duration=5s \
+ --grpc_max_message_size 9900000000 \
+ --pprof-http \
> $VTDATAROOT/$tablet_dir/vttablet.out 2>&1 &
# Block waiting for the tablet to be listening
diff --git a/go/mysql/binlog_event.go b/go/mysql/binlog_event.go
index 84f92c3809..eca16d24e1 100644
--- a/go/mysql/binlog_event.go
+++ b/go/mysql/binlog_event.go
@@ -19,6 +19,7 @@ package mysql
import (
"fmt"
+ "github.com/klauspost/compress/zstd"
"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/mysql/replication"
@@ -129,7 +130,7 @@ type BinlogEvent interface {
// the uncompressed payload. You must call Close() when you are done
// with the TransactionPayload to ensure that the underlying resources
// used are cleaned up.
- TransactionPayload(BinlogFormat) (*TransactionPayload, error)
+ TransactionPayload(BinlogFormat, *zstd.Decoder) (*TransactionPayload, error)
// NextLogFile returns the name of the next binary log file & pos.
// This is only valid if IsRotate() returns true
NextLogFile(BinlogFormat) (string, uint64, error)
diff --git a/go/mysql/binlog_event_compression.go b/go/mysql/binlog_event_compression.go
index c60c88397a..7921548a08 100644
--- a/go/mysql/binlog_event_compression.go
+++ b/go/mysql/binlog_event_compression.go
@@ -56,7 +56,7 @@ const (
// At what size should we switch from the in-memory buffer
// decoding to streaming mode which is much slower, but does
// not require everything be done in memory.
- zstdInMemoryDecompressorMaxSize = 128 << (10 * 2) // 128MiB
+ ZstdInMemoryDecompressorMaxSize = 128 << (10 * 2) // 128MiB
)
var (
@@ -72,15 +72,16 @@ var (
// Create a stateless reader that caches decompressors. This is used
// for smaller events that we want to handle entirely using in-memory
// buffers.
- zstdDecoder, _ = zstd.NewReader(nil, zstd.WithDecoderConcurrency(0))
+ statelessDecoder, _ = zstd.NewReader(nil, zstd.WithDecoderConcurrency(0))
)
type TransactionPayload struct {
- size uint64
+ size uint64 // The size of the compressed transaction payload event
compressionType uint64
uncompressedSize uint64
- payload []byte
+ payload []byte // The raw compressed transaction payload
reader io.Reader
+ statefulDecoder *zstd.Decoder
iterator func() (BinlogEvent, error)
}
@@ -145,7 +146,7 @@ func (ev binlogEvent) IsTransactionPayload() bool {
// We need to extract the compressed transaction payload from the GTID
// event, decompress it with zstd, and then process the internal events
// (e.g. Query and Row events) that make up the transaction.
-func (ev binlogEvent) TransactionPayload(format BinlogFormat) (*TransactionPayload, error) {
- tp := &TransactionPayload{}
+func (ev binlogEvent) TransactionPayload(format BinlogFormat, statefulDecoder *zstd.Decoder) (*TransactionPayload, error) {
+ tp := &TransactionPayload{statefulDecoder: statefulDecoder} // nil makes decompress() allocate its own stream decoder
if err := tp.process(ev.Bytes()[format.HeaderLength:]); err != nil {
return nil, vterrors.Wrapf(err, "error decoding transaction payload event")
@@ -268,20 +269,26 @@ func (tp *TransactionPayload) decompress() error {
// Switch to slower but less memory intensive stream mode for
// larger payloads.
- if tp.uncompressedSize > zstdInMemoryDecompressorMaxSize {
+ if tp.uncompressedSize > ZstdInMemoryDecompressorMaxSize {
in := bytes.NewReader(tp.payload)
- streamDecoder, err := zstd.NewReader(in, zstd.WithDecoderMaxMemory(zstdInMemoryDecompressorMaxSize))
+ var err error
+ statefulDecoder := tp.statefulDecoder
+ if statefulDecoder == nil {
+ statefulDecoder, err = zstd.NewReader(in, zstd.WithDecoderMaxMemory(ZstdInMemoryDecompressorMaxSize))
+ } else {
+ err = statefulDecoder.Reset(in) // Reset returns an error; capture it so the check below covers the reuse path too
+ }
if err != nil {
return err
}
compressedTrxPayloadsUsingStream.Add(1)
- tp.reader = streamDecoder.IOReadCloser()
+ tp.reader = statefulDecoder.IOReadCloser()
return nil
}
// Process smaller payloads using only in-memory buffers.
decompressedBytes := make([]byte, 0, tp.uncompressedSize) // Perform a single pre-allocation
- decompressedBytes, err := zstdDecoder.DecodeAll(tp.payload, decompressedBytes[:0])
+ decompressedBytes, err := statelessDecoder.DecodeAll(tp.payload, decompressedBytes[:0])
if err != nil {
return err
}
@@ -302,7 +309,9 @@ func (tp *TransactionPayload) Close() {
case *bytes.Reader:
reader = nil
case io.ReadCloser:
- reader.Close()
+ if tp.statefulDecoder == nil {
+ reader.Close()
+ }
}
tp.iterator = nil
}
diff --git a/go/mysql/binlog_event_filepos.go b/go/mysql/binlog_event_filepos.go
index 8a2976da80..0413d149ca 100644
--- a/go/mysql/binlog_event_filepos.go
+++ b/go/mysql/binlog_event_filepos.go
@@ -20,6 +20,7 @@ import (
"encoding/binary"
"fmt"
+ "github.com/klauspost/compress/zstd"
"vitess.io/vitess/go/mysql/replication"
)
@@ -247,7 +248,7 @@ func (ev filePosFakeEvent) Rows(BinlogFormat, *TableMap) (Rows, error) {
return Rows{}, nil
}
-func (ev filePosFakeEvent) TransactionPayload(BinlogFormat) (*TransactionPayload, error) {
+func (ev filePosFakeEvent) TransactionPayload(BinlogFormat, *zstd.Decoder) (*TransactionPayload, error) {
return &TransactionPayload{}, nil
}
diff --git a/go/mysql/binlog_event_mysql56_test.go b/go/mysql/binlog_event_mysql56_test.go
index f173e27e4a..22afb05bf1 100644
--- a/go/mysql/binlog_event_mysql56_test.go
+++ b/go/mysql/binlog_event_mysql56_test.go
@@ -191,7 +191,7 @@ func TestMysql56DecodeTransactionPayload(t *testing.T) {
totalSize += len(eventStr)
require.True(t, strings.HasPrefix(eventStr, want))
}
- require.Greater(t, totalSize, zstdInMemoryDecompressorMaxSize)
+ require.Greater(t, totalSize, ZstdInMemoryDecompressorMaxSize)
}
}
}
diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
index 3413c53d81..f6e4a6bcaf 100644
--- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
+++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
@@ -24,6 +24,7 @@ import (
"strings"
"time"
+ "github.com/klauspost/compress/zstd"
"google.golang.org/protobuf/encoding/prototext"
"vitess.io/vitess/go/constants/sidecar"
@@ -84,6 +85,10 @@ type vstreamer struct {
pos replication.Position
stopPos string
+ // For re-use in decoding large compressed transaction
+ // payload events.
+ zstdDecoder *zstd.Decoder
+
phase string
vse *Engine
}
@@ -157,6 +162,9 @@ func (vs *vstreamer) SetVSchema(vschema *localVSchema) {
// Cancel stops the streaming.
func (vs *vstreamer) Cancel() {
vs.cancel()
+ if vs.zstdDecoder != nil {
+ vs.zstdDecoder.Close()
+ }
}
// Stream streams binlog events.
@@ -642,7 +650,13 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e
return nil, fmt.Errorf("compressed transaction payload events are not supported with database flavor %s",
vs.vse.env.Config().DB.Flavor)
}
- tp, err := ev.TransactionPayload(vs.format)
+ if vs.zstdDecoder == nil {
+ vs.zstdDecoder, err = zstd.NewReader(nil, zstd.WithDecoderMaxMemory(mysql.ZstdInMemoryDecompressorMaxSize))
+ if err != nil {
+ return nil, vterrors.Wrap(err, "failed to create reusable stateful zstd decoder")
+ }
+ }
+ tp, err := ev.TransactionPayload(vs.format, vs.zstdDecoder)
if err != nil {
return nil, err
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment