Skip to content

Instantly share code, notes, and snippets.

@wrouesnel
Created May 13, 2020 17:03
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save wrouesnel/631a12732fc6e888b5d682b9f46d38e7 to your computer and use it in GitHub Desktop.
Implementation of determining ZFS stream type before receiving.
package main
import (
"bytes"
"errors"
"fmt"
"io"
"os"
"os/exec"
"os/signal"
"strings"
"syscall"
)
// StreamType classifies a ZFS send stream by how it was generated.
type StreamType string

// Recognised StreamType values.
const (
	// StreamTypeUnknown is reported when the stream could not be identified.
	StreamTypeUnknown = StreamType("unknown")
	// StreamTypeIncremental is reported for an incremental stream.
	StreamTypeIncremental = StreamType("incremental")
	// StreamTypeFull is reported for a full stream.
	StreamTypeFull = StreamType("full")
)
// initialByteLen is the number of leading stream bytes that must be peeked
// in order to obtain the stream header.
const initialByteLen = 312
// StreamInfo holds auxiliary details parsed from the header of a send stream.
type StreamInfo struct {
// ToName is the value of the "toname" field reported by zstreamdump for the
// stream's BEGIN record; it is empty if that field was not seen.
ToName string
}
// ZFSSendStreamReader buffers up the first chunk of the send stream
// to get the header and identify the stream type.
//
// Read is served from streamReader and Close is forwarded to streamCloser;
// the type and info fields are fixed at construction time.
type ZFSSendStreamReader struct {
// streamReader is the source for Read. The constructor sets it to the peeked
// header bytes followed by the remainder of the underlying stream.
streamReader io.Reader
// streamCloser is what Close closes.
streamCloser io.Closer
// streamType is the stream type determined from the header.
streamType StreamType
// streamInfo holds the additional header fields that were parsed.
streamInfo StreamInfo
}
// Read implements io.Reader by delegating to the wrapped stream reader.
func (r *ZFSSendStreamReader) Read(p []byte) (n int, err error) {
	n, err = r.streamReader.Read(p)
	return n, err
}
// Close implements io.Closer by closing the wrapped stream closer and
// returning its result.
func (r *ZFSSendStreamReader) Close() error {
	closeErr := r.streamCloser.Close()
	return closeErr
}
// StreamType reports the stream type recorded for this reader.
func (r *ZFSSendStreamReader) StreamType() StreamType {
	kind := r.streamType
	return kind
}
// StreamInfo reports the auxiliary stream information recorded for this
// reader. It is only meaningful when the stream type is not unknown.
func (r *ZFSSendStreamReader) StreamInfo() StreamInfo {
	info := r.streamInfo
	return info
}
// NewZFSSendStreamReader initializes a new ZFS stream reader and identifies the
// stream type before any of the stream is handed to a consumer.
//
// It reads the first initialByteLen bytes from closer (blocking until they are
// available), pipes them to the external zstreamdump tool, and parses the
// BEGIN record that tool prints. The returned reader replays the peeked bytes
// ahead of the remainder of closer, so consumers see the complete, unmodified
// stream. Closing the returned reader closes closer.
//
// An error is returned when zstreamdump cannot be found on PATH, when the
// header cannot be read in full, when zstreamdump fails, or when its output
// does not start with a BEGIN record. Returned errors wrap the underlying
// cause, so use errors.Is / errors.As to inspect them. closer is not closed on
// error; it remains the caller's responsibility.
//
// If the BEGIN record contains no fromguid field, no error is returned and the
// stream type is reported as StreamTypeUnknown.
func NewZFSSendStreamReader(closer io.ReadCloser) (*ZFSSendStreamReader, error) {
	zsdPath, err := exec.LookPath("zstreamdump")
	if err != nil {
		return nil, fmt.Errorf("locating zstreamdump: %w", err)
	}

	header := make([]byte, initialByteLen)
	if _, err := io.ReadFull(closer, header); err != nil {
		return nil, fmt.Errorf("reading stream header: %w", err)
	}

	cmd := exec.Command(zsdPath)
	cmd.Stdin = bytes.NewReader(header)
	output, err := cmd.Output()
	if err != nil {
		// Output captures stderr into ExitError when cmd.Stderr is nil; surface
		// it so the caller can see why zstreamdump rejected the header.
		var exitErr *exec.ExitError
		if errors.As(err, &exitErr) && len(exitErr.Stderr) > 0 {
			return nil, fmt.Errorf("running zstreamdump: %w: %s", err, bytes.TrimSpace(exitErr.Stderr))
		}
		return nil, fmt.Errorf("running zstreamdump: %w", err)
	}

	streamType, streamInfo, err := parseZStreamDumpHeader(string(output))
	if err != nil {
		return nil, err
	}

	zssr := &ZFSSendStreamReader{
		// Replay the header we consumed, then continue with the live stream.
		// The reader is created only after header has been filled.
		streamReader: io.MultiReader(bytes.NewReader(header), closer),
		streamCloser: closer,
		streamType:   streamType,
		streamInfo:   streamInfo,
	}
	return zssr, nil
}

// parseZStreamDumpHeader extracts the stream type and target name from the
// BEGIN record at the top of zstreamdump's output.
//
// The output looks something like this (line numbers added with cat -n). An
// incremental vs a full stream is determined by whether fromguid has a value
// that's non-zero.
//
//	 1 BEGIN record
//	 2 hdrtype = 1
//	 3 features = 430004
//	 4 magic = 2f5bacbac
//	 5 creation_time = 5ebc10f4
//	 6 type = 2
//	 7 flags = 0xc
//	 8 toguid = b08c88f11bf2c7fc
//	 9 fromguid = 0
//	10 toname = rpool/home/tmp/fs1@1
//	11 SUMMARY:
//	12 Total DRR_BEGIN records = 1
//	13 Total DRR_END records = 0
//	14 Total DRR_OBJECT records = 0
//	15 Total DRR_FREEOBJECTS records = 0
//	16 Total DRR_WRITE records = 0
//	17 Total DRR_WRITE_BYREF records = 0
//	18 Total DRR_WRITE_EMBEDDED records = 0
//	19 Total DRR_FREE records = 0
//	20 Total DRR_SPILL records = 0
//	21 Total records = 1
//	22 Total write size = 0 (0x0)
//	23 Total stream length = 312 (0x138)
//
// Only the tab-prefixed "key = value" lines directly after the first line are
// examined. An error is returned if the first line is not "BEGIN record".
func parseZStreamDumpHeader(output string) (StreamType, StreamInfo, error) {
	// strings.Split always yields at least one element, so lines[0] is safe.
	lines := strings.Split(output, "\n")
	if lines[0] != "BEGIN record" {
		return StreamTypeUnknown, StreamInfo{}, errors.New("BEGIN record not found in zstreamdump output")
	}

	streamType := StreamTypeUnknown
	streamInfo := StreamInfo{}
	for _, line := range lines[1:] {
		// Once we run out of tab-delimited lines, we've finished reading the block.
		if !strings.HasPrefix(line, "\t") {
			break
		}
		parts := strings.SplitN(strings.TrimLeft(line, "\t"), " = ", 2)
		if len(parts) != 2 {
			break
		}
		key, value := parts[0], parts[1]
		switch key {
		case "fromguid":
			if value == "0" {
				streamType = StreamTypeFull
			} else {
				streamType = StreamTypeIncremental
			}
		case "toname":
			streamInfo.ToName = value
		}
	}
	return streamType, streamInfo, nil
}
// main reads a ZFS send stream from the file named by the first command-line
// argument (or from stdin when no argument is given), reports the detected
// stream type and target name on stderr, and copies the unmodified stream to
// stdout. It exits with status 1 on interrupt/SIGTERM or on any error.
func main() {
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
	go func() {
		// The first signal terminates the process, so a single receive suffices.
		<-ch
		fmt.Fprintln(os.Stderr, "Interrupted")
		os.Exit(1)
	}()

	s := os.Stdin
	if len(os.Args) > 1 {
		f, err := os.Open(os.Args[1])
		if err != nil {
			fmt.Fprintln(os.Stderr, err)
			os.Exit(1)
		}
		s = f
	}

	zssr, err := NewZFSSendStreamReader(s)
	if err != nil {
		// stdout carries the stream itself, so diagnostics must go to stderr.
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}

	fmt.Fprintln(os.Stderr, "Stream Type:", zssr.StreamType())
	fmt.Fprintln(os.Stderr, "Stream FS name:", zssr.StreamInfo().ToName)

	// Close explicitly rather than with defer: os.Exit skips deferred calls.
	_, copyErr := io.Copy(os.Stdout, zssr)
	closeErr := zssr.Close()
	if copyErr != nil {
		fmt.Fprintln(os.Stderr, copyErr)
		os.Exit(1)
	}
	if closeErr != nil {
		fmt.Fprintln(os.Stderr, closeErr)
		os.Exit(1)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment