Last active
July 8, 2024 19:42
-
-
Save mattlord/6c8a2b90b2cb100da7fe22c90196065e to your computer and use it in GitHub Desktop.
Re-use stateful decoder in vstreamer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/examples/common/scripts/vttablet-up.sh b/examples/common/scripts/vttablet-up.sh | |
index daa40aee89..a3e1caadc6 100755 | |
--- a/examples/common/scripts/vttablet-up.sh | |
+++ b/examples/common/scripts/vttablet-up.sh | |
@@ -54,6 +54,8 @@ vttablet \ | |
--service_map 'grpc-queryservice,grpc-tabletmanager,grpc-updatestream' \ | |
--pid_file $VTDATAROOT/$tablet_dir/vttablet.pid \ | |
--heartbeat_on_demand_duration=5s \ | |
+ --grpc_max_message_size 9900000000 \ | |
+ --pprof-http \ | |
> $VTDATAROOT/$tablet_dir/vttablet.out 2>&1 & | |
# Block waiting for the tablet to be listening | |
diff --git a/go/mysql/binlog_event.go b/go/mysql/binlog_event.go | |
index 84f92c3809..eca16d24e1 100644 | |
--- a/go/mysql/binlog_event.go | |
+++ b/go/mysql/binlog_event.go | |
@@ -19,6 +19,7 @@ package mysql | |
import ( | |
"fmt" | |
+ "github.com/klauspost/compress/zstd" | |
"vitess.io/vitess/go/mysql/collations" | |
"vitess.io/vitess/go/mysql/replication" | |
@@ -129,7 +130,7 @@ type BinlogEvent interface { | |
// the uncompressed payload. You must call Close() when you are done | |
// with the TransactionPayload to ensure that the underlying resources | |
// used are cleaned up. | |
- TransactionPayload(BinlogFormat) (*TransactionPayload, error) | |
+ TransactionPayload(BinlogFormat, *zstd.Decoder) (*TransactionPayload, error) | |
// NextLogFile returns the name of the next binary log file & pos. | |
// This is only valid if IsRotate() returns true | |
NextLogFile(BinlogFormat) (string, uint64, error) | |
diff --git a/go/mysql/binlog_event_compression.go b/go/mysql/binlog_event_compression.go | |
index c60c88397a..7921548a08 100644 | |
--- a/go/mysql/binlog_event_compression.go | |
+++ b/go/mysql/binlog_event_compression.go | |
@@ -56,7 +56,7 @@ const ( | |
// At what size should we switch from the in-memory buffer | |
// decoding to streaming mode which is much slower, but does | |
// not require everything be done in memory. | |
- zstdInMemoryDecompressorMaxSize = 128 << (10 * 2) // 128MiB | |
+ ZstdInMemoryDecompressorMaxSize = 128 << (10 * 2) // 128MiB | |
) | |
var ( | |
@@ -72,15 +72,16 @@ var ( | |
// Create a stateless reader that caches decompressors. This is used | |
// for smaller events that we want to handle entirely using in-memory | |
// buffers. | |
- zstdDecoder, _ = zstd.NewReader(nil, zstd.WithDecoderConcurrency(0)) | |
+ statelessDecoder, _ = zstd.NewReader(nil, zstd.WithDecoderConcurrency(0)) | |
) | |
type TransactionPayload struct { | |
- size uint64 | |
+ size uint64 // The size of the compressed transaction payload event | |
compressionType uint64 | |
uncompressedSize uint64 | |
- payload []byte | |
+ payload []byte // The raw compressed transaction payload | |
reader io.Reader | |
+ statefulDecoder *zstd.Decoder | |
iterator func() (BinlogEvent, error) | |
} | |
@@ -145,7 +146,7 @@ func (ev binlogEvent) IsTransactionPayload() bool { | |
// We need to extract the compressed transaction payload from the GTID | |
// event, decompress it with zstd, and then process the internal events | |
// (e.g. Query and Row events) that make up the transaction. | |
-func (ev binlogEvent) TransactionPayload(format BinlogFormat) (*TransactionPayload, error) { | |
+func (ev binlogEvent) TransactionPayload(format BinlogFormat, statefulDecoder *zstd.Decoder) (*TransactionPayload, error) { | |
tp := &TransactionPayload{} | |
if err := tp.process(ev.Bytes()[format.HeaderLength:]); err != nil { | |
return nil, vterrors.Wrapf(err, "error decoding transaction payload event") | |
@@ -268,20 +269,26 @@ func (tp *TransactionPayload) decompress() error { | |
// Switch to slower but less memory intensive stream mode for | |
// larger payloads. | |
- if tp.uncompressedSize > zstdInMemoryDecompressorMaxSize { | |
+ if tp.uncompressedSize > ZstdInMemoryDecompressorMaxSize { | |
in := bytes.NewReader(tp.payload) | |
- streamDecoder, err := zstd.NewReader(in, zstd.WithDecoderMaxMemory(zstdInMemoryDecompressorMaxSize)) | |
+ var err error | |
+ statefulDecoder := tp.statefulDecoder | |
+ if statefulDecoder == nil { | |
+ statefulDecoder, err = zstd.NewReader(in, zstd.WithDecoderMaxMemory(ZstdInMemoryDecompressorMaxSize)) | |
+ } else { | |
+ statefulDecoder.Reset(in) | |
+ } | |
if err != nil { | |
return err | |
} | |
compressedTrxPayloadsUsingStream.Add(1) | |
- tp.reader = streamDecoder.IOReadCloser() | |
+ tp.reader = statefulDecoder.IOReadCloser() | |
return nil | |
} | |
// Process smaller payloads using only in-memory buffers. | |
decompressedBytes := make([]byte, 0, tp.uncompressedSize) // Perform a single pre-allocation | |
- decompressedBytes, err := zstdDecoder.DecodeAll(tp.payload, decompressedBytes[:0]) | |
+ decompressedBytes, err := statelessDecoder.DecodeAll(tp.payload, decompressedBytes[:0]) | |
if err != nil { | |
return err | |
} | |
@@ -302,7 +309,9 @@ func (tp *TransactionPayload) Close() { | |
case *bytes.Reader: | |
reader = nil | |
case io.ReadCloser: | |
- reader.Close() | |
+ if tp.statefulDecoder == nil { | |
+ reader.Close() | |
+ } | |
} | |
tp.iterator = nil | |
} | |
diff --git a/go/mysql/binlog_event_filepos.go b/go/mysql/binlog_event_filepos.go | |
index 8a2976da80..0413d149ca 100644 | |
--- a/go/mysql/binlog_event_filepos.go | |
+++ b/go/mysql/binlog_event_filepos.go | |
@@ -20,6 +20,7 @@ import ( | |
"encoding/binary" | |
"fmt" | |
+ "github.com/klauspost/compress/zstd" | |
"vitess.io/vitess/go/mysql/replication" | |
) | |
@@ -247,7 +248,7 @@ func (ev filePosFakeEvent) Rows(BinlogFormat, *TableMap) (Rows, error) { | |
return Rows{}, nil | |
} | |
-func (ev filePosFakeEvent) TransactionPayload(BinlogFormat) (*TransactionPayload, error) { | |
+func (ev filePosFakeEvent) TransactionPayload(BinlogFormat, *zstd.Decoder) (*TransactionPayload, error) { | |
return &TransactionPayload{}, nil | |
} | |
diff --git a/go/mysql/binlog_event_mysql56_test.go b/go/mysql/binlog_event_mysql56_test.go | |
index f173e27e4a..22afb05bf1 100644 | |
--- a/go/mysql/binlog_event_mysql56_test.go | |
+++ b/go/mysql/binlog_event_mysql56_test.go | |
@@ -191,7 +191,7 @@ func TestMysql56DecodeTransactionPayload(t *testing.T) { | |
totalSize += len(eventStr) | |
require.True(t, strings.HasPrefix(eventStr, want)) | |
} | |
- require.Greater(t, totalSize, zstdInMemoryDecompressorMaxSize) | |
+ require.Greater(t, totalSize, ZstdInMemoryDecompressorMaxSize) | |
} | |
} | |
} | |
diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go | |
index 3413c53d81..f6e4a6bcaf 100644 | |
--- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go | |
+++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go | |
@@ -24,6 +24,7 @@ import ( | |
"strings" | |
"time" | |
+ "github.com/klauspost/compress/zstd" | |
"google.golang.org/protobuf/encoding/prototext" | |
"vitess.io/vitess/go/constants/sidecar" | |
@@ -84,6 +85,10 @@ type vstreamer struct { | |
pos replication.Position | |
stopPos string | |
+ // For re-use in decoding large compressed transaction | |
+ // payload events. | |
+ zstdDecoder *zstd.Decoder | |
+ | |
phase string | |
vse *Engine | |
} | |
@@ -157,6 +162,9 @@ func (vs *vstreamer) SetVSchema(vschema *localVSchema) { | |
// Cancel stops the streaming. | |
func (vs *vstreamer) Cancel() { | |
vs.cancel() | |
+ if vs.zstdDecoder != nil { | |
+ vs.zstdDecoder.Close() | |
+ } | |
} | |
// Stream streams binlog events. | |
@@ -642,7 +650,13 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e | |
return nil, fmt.Errorf("compressed transaction payload events are not supported with database flavor %s", | |
vs.vse.env.Config().DB.Flavor) | |
} | |
- tp, err := ev.TransactionPayload(vs.format) | |
+ if vs.zstdDecoder == nil { | |
+ vs.zstdDecoder, err = zstd.NewReader(nil, zstd.WithDecoderMaxMemory(mysql.ZstdInMemoryDecompressorMaxSize)) | |
+ if err != nil { | |
+ return nil, vterrors.Wrap(err, "failed to create reusable stateful zstd decoder") | |
+ } | |
+ } | |
+ tp, err := ev.TransactionPayload(vs.format, vs.zstdDecoder) | |
if err != nil { | |
return nil, err | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment