Created
May 13, 2020 17:03
Star
You must be signed in to star a gist
Determines the type of a ZFS send stream (full or incremental) before it is received.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
	"os"
	"os/exec"
	"os/signal"
	"strings"
	"syscall"
)
// StreamType classifies a ZFS send stream as full, incremental, or unknown.
type StreamType string

const (
	// StreamTypeUnknown is returned when the stream fails to identify.
	StreamTypeUnknown StreamType = "unknown"
	// StreamTypeIncremental is when the stream is incremental.
	StreamTypeIncremental StreamType = "incremental"
	// StreamTypeFull is when the stream is a full stream.
	StreamTypeFull StreamType = "full"
)
const (
	// initialByteLen is how many bytes of the stream we need to peek to get
	// the header. It matches the "Total stream length = 312" that zstreamdump
	// reports when it is fed nothing but the BEGIN record.
	initialByteLen = 312
)
// StreamInfo holds auxiliary details parsed from the stream's BEGIN record.
type StreamInfo struct {
	// ToName is the value of the "toname" field printed by zstreamdump,
	// e.g. "rpool/home/tmp/fs1@1".
	ToName string
}
// ZFSSendStreamReader buffers up the first chunk of the send stream | |
// to get the header and identify the stream type. | |
type ZFSSendStreamReader struct { | |
streamReader io.Reader | |
streamCloser io.Closer | |
streamType StreamType | |
streamInfo StreamInfo | |
} | |
func (zssr *ZFSSendStreamReader) Read(p []byte) (n int, err error) { | |
return zssr.streamReader.Read(p) | |
} | |
func (zssr *ZFSSendStreamReader) Close() error { | |
return zssr.streamCloser.Close() | |
} | |
// StreamType returns the current determined status of the stream | |
func (zssr *ZFSSendStreamReader) StreamType() StreamType { | |
return zssr.streamType | |
} | |
// StreamInfo returns auxillary stream information after the type is determined, if it is not unknown. | |
func (zssr *ZFSSendStreamReader) StreamInfo() StreamInfo { | |
return zssr.streamInfo | |
} | |
// NewZFSSendStreamReader initializes a new ZFS stream reader. It blocks until stream info is available, | |
// and returns an error if the initial read fails to get the stream info. | |
func NewZFSSendStreamReader(closer io.ReadCloser) (*ZFSSendStreamReader, error) { | |
zsdPath, err := exec.LookPath("zstreamdump") | |
if err != nil { | |
return nil, err | |
} | |
initialBytes := make([]byte, initialByteLen) | |
peekBuffer := bytes.NewBuffer(initialBytes) | |
_, err = io.ReadFull(closer, initialBytes) | |
if err != nil { | |
return nil, err | |
} | |
cmd := exec.Command(zsdPath) | |
cmd.Stdin = bytes.NewReader(initialBytes) | |
output, err := cmd.Output() | |
if err != nil { | |
return nil, err | |
} | |
// Parse the output of zstreamdump. It looks something like this (line numbers added with cat -n) | |
// An incremental vs a fullstream is determined by whether fromguid has a value that's non-zero. | |
// 1 BEGIN record | |
// 2 hdrtype = 1 | |
// 3 features = 430004 | |
// 4 magic = 2f5bacbac | |
// 5 creation_time = 5ebc10f4 | |
// 6 type = 2 | |
// 7 flags = 0xc | |
// 8 toguid = b08c88f11bf2c7fc | |
// 9 fromguid = 0 | |
// 10 toname = rpool/home/tmp/fs1@1 | |
// 11 SUMMARY: | |
// 12 Total DRR_BEGIN records = 1 | |
// 13 Total DRR_END records = 0 | |
// 14 Total DRR_OBJECT records = 0 | |
// 15 Total DRR_FREEOBJECTS records = 0 | |
// 16 Total DRR_WRITE records = 0 | |
// 17 Total DRR_WRITE_BYREF records = 0 | |
// 18 Total DRR_WRITE_EMBEDDED records = 0 | |
// 19 Total DRR_FREE records = 0 | |
// 20 Total DRR_SPILL records = 0 | |
// 21 Total records = 1 | |
// 22 Total write size = 0 (0x0) | |
// 23 Total stream length = 312 (0x138) | |
streamType := StreamTypeUnknown | |
streamInfo := StreamInfo{} | |
for idx, line := range strings.Split(string(output), "\n") { | |
fmt.Fprintln(os.Stderr, idx, line) | |
if idx == 0 { | |
if line != "BEGIN record" { | |
return nil, errors.New("BEGIN record not found in zstreamdump output") | |
} | |
continue | |
} | |
// Once we run out of tab-delimited lines, we've finished reading the block | |
if !strings.HasPrefix(line,"\t") { | |
break | |
} | |
line = strings.TrimLeft(line, "\t") | |
parts := strings.SplitN(line, " = ",2) | |
if len(parts) != 2 { | |
break | |
} | |
key := parts[0] | |
value := parts[1] | |
switch key { | |
case "fromguid": | |
if value == "0" { | |
streamType = StreamTypeFull | |
} else { | |
streamType = StreamTypeIncremental | |
} | |
case "toname": | |
streamInfo.ToName = value | |
} | |
} | |
zssr := &ZFSSendStreamReader{ | |
streamReader: io.MultiReader(peekBuffer, closer), | |
streamCloser: closer, | |
streamType: streamType, | |
streamInfo: streamInfo, | |
} | |
return zssr, nil | |
} | |
func main() { | |
ch := make(chan os.Signal, 1) | |
signal.Notify(ch, os.Interrupt, syscall.SIGTERM) | |
go func() { | |
for _ = range ch { | |
fmt.Fprintln(os.Stderr, "Interrupted") | |
os.Exit(1) | |
} | |
}() | |
s := os.Stdin | |
if len(os.Args) > 1 { | |
var err error | |
s, err = os.Open(os.Args[1]) | |
if err != nil { | |
fmt.Fprintln(os.Stderr, err) | |
os.Exit(1) | |
} | |
} | |
zssr, err := NewZFSSendStreamReader(s) | |
if err != nil { | |
fmt.Println(err) | |
os.Exit(1) | |
} | |
fmt.Fprintln(os.Stderr, "Stream Type:", zssr.streamType) | |
fmt.Fprintln(os.Stderr, "Stream FS name:", zssr.streamInfo.ToName) | |
io.Copy(os.Stdout, zssr) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment