Created
December 12, 2018 00:31
-
-
Save mccutchen/174bf5681264871644dfb5b8ec079cd9 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
// Here's a half-assed "jepsen" test for nsq_to_file that exercises its | |
// management of files in -work-dir and -output-dir in the face of multiple | |
// processes sharing those directories. | |
// | |
// Usage: | |
// | |
// 1. Ensure that the versions of nsqd & nsq_to_file you intend to test are on | |
// your $PATH: | |
// | |
// cd $GOPATH/src/github.com/nsqio/nsq && git checkout my_branch && make | |
// export PATH="$GOPATH/src/github.com/nsqio/nsq/build:$PATH" | |
// | |
// 2. Run this test: | |
// | |
// go run nsq_to_file_jepsen.go | |
// | |
// Test behavior: | |
// | |
// This test will launch 1 nsqd process and N nsq_to_file processes (N is set by
// -nsq-to-file-count), publish M messages (M is set by -count) to a given topic,
// and validate that every expected message was archived exactly once in the
// output dir.
// | |
// During the course of the test, each nsq_to_file process is restarted on a
// fixed interval (set by -nsq-to-file-ttl). This produces colliding file names in
// the working and output directories, which nsq_to_file must account for when
// choosing names.
// | |
// All output from nsqd and each nsq_to_file process is captured in a single | |
// combined log file for later examination. | |
import ( | |
"bufio" | |
"bytes" | |
"compress/gzip" | |
"context" | |
"encoding/json" | |
"flag" | |
"fmt" | |
"io" | |
"io/ioutil" | |
"log" | |
"net/http" | |
"os" | |
"os/exec" | |
"os/signal" | |
"path/filepath" | |
"syscall" | |
"time" | |
"golang.org/x/sync/errgroup" | |
) | |
// httpClient is the single client used for every request this test sends to
// nsqd's HTTP API. Any request taking longer than 250ms fails instead of
// blocking.
var httpClient = &http.Client{Timeout: 250 * time.Millisecond}
// Msg is a test message published to nsqd and consumed by nsq_to_file | |
type Msg struct { | |
ID int `json:"id"` | |
Topic string `json:"topic"` | |
} | |
// runNsqd starts an nsqd process whose data directory is
// outputRoot/nsqd_data. All of its stdout and stderr are sent to logSink.
//
// It blocks until ctx is canceled, then sends nsqd SIGTERM and returns the
// result of waiting on the process. If nsqd exits on its own before ctx is
// canceled, it returns an error right away. Examples are a crash or a failure
// to bind its ports. Without this check the test would keep running against a
// dead nsqd.
func runNsqd(ctx context.Context, outputRoot string, logSink io.Writer) error {
	log.Printf("running nsqd")
	dataPath := filepath.Join(outputRoot, "nsqd_data")
	if err := os.MkdirAll(dataPath, 0770); err != nil {
		return fmt.Errorf("error creating nsqd data path %s: %s", dataPath, err)
	}

	cmd := exec.Command("nsqd", "-data-path", dataPath)
	cmd.Stdout = logSink
	cmd.Stderr = logSink
	if err := cmd.Start(); err != nil {
		return fmt.Errorf("error running nsqd: %s", err)
	}

	// Wait in the background so an early exit is observed immediately.
	exited := make(chan error, 1)
	go func() { exited <- cmd.Wait() }()

	select {
	case err := <-exited:
		return fmt.Errorf("nsqd exited unexpectedly (err=%v)", err)
	case <-ctx.Done():
	}

	log.Printf("stopping nsqd")
	// A Signal error means nsqd already exited. The Wait result received
	// below reports how it exited.
	_ = cmd.Process.Signal(syscall.SIGTERM)
	return <-exited
}
// runNsqToFile runs one nsq_to_file consumer for topic against the local nsqd
// (TCP :4150). It writes gzipped files under outputRoot/work and
// outputRoot/output and sends all process output to logSink. Every ttl the
// process is sent SIGTERM and relaunched, which exercises nsq_to_file's file
// naming across restarts.
//
// If ctx is already canceled when a (re)launch is due, it returns nil without
// starting a process. Once ctx is canceled it stops the current process and
// returns the result of waiting on it. It returns an error in three cases:
// nsq_to_file cannot be started, it exits on its own before being told to
// stop, or it exits with an error when stopped for a restart.
func runNsqToFile(ctx context.Context, topic string, outputRoot string, ttl time.Duration, logSink io.Writer) error {
	log.Printf("running nsq_to_file for topic %#v, restarting every %s", topic, ttl)
	for {
		// Don't launch a fresh process if ctx was canceled during a restart.
		if ctx.Err() != nil {
			return nil
		}

		cmd := exec.Command(
			"nsq_to_file",
			"-nsqd-tcp-address", ":4150",
			"-topic", topic,
			"-output-dir", filepath.Join(outputRoot, "output"),
			"-work-dir", filepath.Join(outputRoot, "work"),
			"-filename-format", "<TOPIC>/<TOPIC>.<DATETIME>+0000.<HOST><REV>.log",
			"-datetime-format", "%Y-%m-%d_%H-%M",
			"-gzip",
		)
		cmd.Stdout = logSink
		cmd.Stderr = logSink
		if err := cmd.Start(); err != nil {
			return fmt.Errorf("error starting nsq_to_file: %s", err)
		}

		// Wait in the background so a process that dies on its own is noticed
		// immediately instead of only at the next scheduled restart.
		exited := make(chan error, 1)
		go func() { exited <- cmd.Wait() }()

		select {
		case err := <-exited:
			return fmt.Errorf("nsq_to_file exited unexpectedly (err=%v)", err)
		case <-ctx.Done():
			log.Printf("stopping nsq_to_file")
			// A Signal error means the process already exited. The Wait
			// result received below reports how.
			_ = cmd.Process.Signal(syscall.SIGTERM)
			return <-exited
		case <-time.After(ttl):
			_ = cmd.Process.Signal(syscall.SIGTERM)
			if err := <-exited; err != nil {
				return err
			}
		}
	}
}
func runPublisher(ctx context.Context, topic string, count int, duration time.Duration) error { | |
makeMessage := func(topic string, id int) []byte { | |
msg, _ := json.Marshal(Msg{ID: id, Topic: topic}) | |
return msg | |
} | |
publish := func(topic string, msg []byte) error { | |
url := fmt.Sprintf("http://127.0.0.1:4151/mpub?topic=%s", topic) | |
req, _ := http.NewRequest("POST", url, bytes.NewReader(msg)) | |
resp, err := httpClient.Do(req) | |
if err != nil { | |
return err | |
} | |
defer resp.Body.Close() | |
ioutil.ReadAll(resp.Body) // body must be read to re-use http cx | |
if resp.StatusCode != 200 { | |
return fmt.Errorf("bad response from nsqd: HTTP %d", resp.StatusCode) | |
} | |
return nil | |
} | |
log.Printf("publishing %d messages over %s", count, duration) | |
targetDelay := duration / time.Duration(count) | |
lastTime := time.Now() | |
for i := 0; ; i++ { | |
msg := makeMessage(topic, i) | |
err := publish(topic, msg) | |
if err != nil { | |
return fmt.Errorf("error publishing message: %s", err) | |
} | |
if i == count-1 { | |
return nil | |
} | |
delay := targetDelay - time.Now().Sub(lastTime) | |
lastTime = time.Now() | |
select { | |
case <-time.After(delay): | |
continue | |
case <-ctx.Done(): | |
log.Printf("stopping publisher") | |
return nil | |
} | |
} | |
} | |
func pollNsqdForFinish(ctx context.Context, expectedMsgCount int, interval time.Duration) error { | |
type channelInfo struct { | |
Depth int `json:"depth"` | |
InFlightCount int `json:"in_flight_count"` | |
MessageCount int `json:"message_count"` | |
Name string `json:"channel_name"` | |
} | |
type topicInfo struct { | |
Channels []channelInfo `json:"channels"` | |
Depth int `json:"depth"` | |
InFlightCount int `json:"in_flight_count"` | |
MessageCount int `json:"message_count"` | |
Name string `json:"topic_name"` | |
} | |
type statsResp struct { | |
Topics []topicInfo | |
} | |
for { | |
req, _ := http.NewRequest("GET", "http://127.0.0.1:4151/stats?format=json", nil) | |
resp, err := httpClient.Do(req) | |
if err != nil { | |
return fmt.Errorf("error connecting to nsqd: %s", err) | |
} | |
stats := &statsResp{} | |
err = json.NewDecoder(resp.Body).Decode(stats) | |
resp.Body.Close() | |
if err != nil { | |
return fmt.Errorf("error parsing nsqd stats response: %s", err) | |
} | |
for _, t := range stats.Topics { | |
for _, c := range t.Channels { | |
if c.Name == "nsq_to_file" { | |
log.Printf("nsq_to_file stats: %d messages; %d channel depth, %d in flight", c.MessageCount, c.Depth, c.InFlightCount) | |
if c.MessageCount >= expectedMsgCount && c.Depth == 0 && c.InFlightCount == 0 { | |
log.Printf("nsqd fully drained") | |
return nil | |
} | |
} | |
} | |
} | |
select { | |
case <-time.After(interval): | |
continue | |
case <-ctx.Done(): | |
return nil | |
} | |
} | |
} | |
func report(topic string, outputDir string, expectedMsgCount int) bool { | |
orphanedFiles, err := filepath.Glob(filepath.Join(outputDir, "work", topic, "*")) | |
if err != nil { | |
log.Printf("failure: error listing work dir: %s", err) | |
return false | |
} | |
if len(orphanedFiles) > 0 { | |
log.Printf("failure: found orphaned files in working dir:") | |
for _, path := range orphanedFiles { | |
log.Printf(" - %s", path) | |
} | |
return false | |
} | |
foundIds := make(map[int]int) | |
outPattern := filepath.Join(outputDir, "output", topic, "*") | |
outputFiles, err := filepath.Glob(outPattern) | |
if err != nil { | |
log.Printf("failure: error listing output dir: %s", err) | |
return false | |
} | |
if len(outputFiles) == 0 { | |
log.Printf("failure: found no output files matching %s", outPattern) | |
return false | |
} | |
for _, path := range outputFiles { | |
err := loadOutputFile(path, foundIds) | |
if err != nil { | |
log.Printf("failure: %s", err) | |
return false | |
} | |
} | |
passed := true | |
if len(foundIds) != expectedMsgCount { | |
passed = false | |
log.Printf("failure: expected %d messages, got %d", expectedMsgCount, len(foundIds)) | |
} | |
for id, count := range foundIds { | |
if count > 1 { | |
passed = false | |
log.Printf("failure: got %d messages with id %d", count, id) | |
} | |
} | |
if passed { | |
log.Printf("success: found exactly %d messages in %d output files", len(foundIds), len(outputFiles)) | |
log.Printf("output dir: %s", filepath.Join(outputDir, "output")) | |
} | |
return passed | |
} | |
func loadOutputFile(path string, idMap map[int]int) error { | |
f, err := os.Open(path) | |
if err != nil { | |
return fmt.Errorf("error opening output file %s: %s", path, err) | |
} | |
defer f.Close() | |
gzf, err := gzip.NewReader(f) | |
if err != nil { | |
return fmt.Errorf("error opening gzip reader: %s: %s", path, err) | |
} | |
defer gzf.Close() | |
scanner := bufio.NewScanner(gzf) | |
for scanner.Scan() { | |
line := scanner.Bytes() | |
msg := &Msg{} | |
err := json.Unmarshal(line, msg) | |
if err != nil { | |
return fmt.Errorf("error parsing output msg: %s: %s", string(line), err) | |
} | |
idMap[msg.ID]++ | |
} | |
return scanner.Err() | |
} | |
// header logs text between two separator lines so each phase of the test
// stands out in the log.
func header(text string) {
	const rule = "============================================================="
	log.Print(rule)
	// text is not a format string. Print keeps any '%' it contains from being
	// read as a formatting verb (and satisfies go vet's printf check).
	log.Print(text)
	log.Print(rule)
}
func main() { | |
var ( | |
duration = flag.Duration("duration", 3*time.Minute, "how long should the test last") | |
count = flag.Int("count", 100000, "number of test messages to publish") | |
topic = flag.String("topic", "foo", "NSQ topic to which test messages should be published") | |
nsqToFileCount = flag.Int("nsq-to-file-count", 10, "how many nsq_to_file processses will run") | |
nsqToFileTTL = flag.Duration("nsq-to-file-ttl", 10*time.Second, "how often should nsq_to_file will be restarted") | |
testDir = flag.String("dir", "", "test output dir (generated if not given; deleted if exists)") | |
) | |
flag.Parse() | |
ctx, cancel := context.WithCancel(context.Background()) | |
defer cancel() | |
g, ctx := errgroup.WithContext(ctx) | |
sigCh := make(chan os.Signal, 1) | |
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) | |
go func() { | |
<-sigCh | |
cancel() | |
}() | |
header("setup") | |
if *testDir == "" { | |
d, err := ioutil.TempDir("", "nsq-to-file-test") | |
if err != nil { | |
log.Fatalf("error creating test dir: %s", err) | |
} | |
testDir = &d | |
} | |
log.Printf("removing working dir %s (if it exists)", *testDir) | |
if err := os.RemoveAll(*testDir); err != nil { | |
log.Fatalf("error removing test dir %s: %s", *testDir, err) | |
} | |
log.Printf("creating working dir %s", *testDir) | |
if err := os.MkdirAll(*testDir, 0755); err != nil { | |
log.Fatalf("error creating test dir %s: %s", *testDir, err) | |
} | |
logFile, err := os.Create(filepath.Join(*testDir, "logs.txt")) | |
if err != nil { | |
log.Fatalf("error creating log file: %s", err) | |
} | |
defer logFile.Close() | |
log.Printf("nsqd and nsq_to_file logs combined in %s", logFile.Name()) | |
g.Go(func() error { | |
return runNsqd(ctx, *testDir, logFile) | |
}) | |
time.Sleep(1 * time.Second) | |
for i := 0; i < *nsqToFileCount; i++ { | |
g.Go(func() error { | |
return runNsqToFile(ctx, *topic, *testDir, *nsqToFileTTL, logFile) | |
}) | |
} | |
time.Sleep(1 * time.Second) | |
header("generating messages") | |
g.Go(func() error { | |
return runPublisher(ctx, *topic, *count, *duration) | |
}) | |
pollNsqdForFinish(ctx, *count, 5*time.Second) | |
cancel() | |
if err := g.Wait(); err != nil { | |
log.Fatalf("error running test: %s", err) | |
} | |
header("report") | |
passed := report(*topic, *testDir, *count) | |
if !passed { | |
os.Exit(1) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment