@mccutchen
Created December 12, 2018 00:31
package main

// Here's a half-assed "jepsen" test for nsq_to_file that exercises its
// management of files in -work-dir and -output-dir in the face of multiple
// processes sharing those directories.
//
// Usage:
//
// 1. Ensure that the versions of nsqd & nsq_to_file you intend to test are on
// your $PATH:
//
// cd $GOPATH/src/github.com/nsqio/nsq && git checkout my_branch && make
// export PATH="$GOPATH/src/github.com/nsqio/nsq/build:$PATH"
//
// 2. Run this test:
//
// go run nsq_to_file_jepsen.go
//
// Test behavior:
//
// This test will launch 1 nsqd process and N nsq_to_file processes, publish M
// messages to a given topic, and validate that the expected messages were
// correctly archived in the output dir.
//
// During the course of the test, each nsq_to_file process will be restarted on
// some interval, to generate duplicate files in the working directory and
// output directory that must be accounted for when nsq_to_file chooses names.
//
// All output from nsqd and each nsq_to_file process is captured in a single
// combined log file for later examination.
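//
// Test parameters (duration, message count, topic, number of nsq_to_file
// processes, restart interval, and output dir) can be adjusted via the flags
// defined in main(), e.g.:
//
//	go run nsq_to_file_jepsen.go -duration 1m -count 10000 -nsq-to-file-count 5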

import (
	"bufio"
	"bytes"
	"compress/gzip"
	"context"
	"encoding/json"
	"flag"
	"fmt"
	"io"
	"io/ioutil"
	"log"
	"net/http"
	"os"
	"os/exec"
	"os/signal"
	"path/filepath"
	"syscall"
	"time"

	"golang.org/x/sync/errgroup"
)

var (
	httpClient = &http.Client{
		Timeout: 250 * time.Millisecond,
	}
)

// Msg is a test message published to nsqd and consumed by nsq_to_file
type Msg struct {
	ID    int    `json:"id"`
	Topic string `json:"topic"`
}
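
// runNsqd launches a single nsqd process with its data dir under outputRoot
// and its stdout/stderr redirected to logSink, then waits for ctx to be
// cancelled before stopping it with SIGTERM.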
func runNsqd(ctx context.Context, outputRoot string, logSink io.Writer) error {
	log.Printf("running nsqd")
	dataPath := filepath.Join(outputRoot, "nsqd_data")
	err := os.MkdirAll(dataPath, 0770)
	if err != nil {
		return fmt.Errorf("error creating nsqd data path %s: %s", dataPath, err)
	}
	cmd := exec.Command("nsqd", "-data-path", dataPath)
	cmd.Stdout = logSink
	cmd.Stderr = logSink
	err = cmd.Start()
	if err != nil {
		return fmt.Errorf("error running nsqd: %s", err)
	}
	<-ctx.Done()
	log.Printf("stopping nsqd")
	cmd.Process.Signal(syscall.SIGTERM)
	return cmd.Wait()
}
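
// runNsqToFile runs an nsq_to_file process consuming the given topic into the
// shared work and output dirs under outputRoot, restarting it every ttl until
// ctx is cancelled.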
func runNsqToFile(ctx context.Context, topic string, outputRoot string, ttl time.Duration, logSink io.Writer) error {
	log.Printf("running nsq_to_file for topic %#v, restarting every %s", topic, ttl)
	for {
		cmd := exec.Command(
			"nsq_to_file",
			"-nsqd-tcp-address", ":4150",
			"-topic", topic,
			"-output-dir", filepath.Join(outputRoot, "output"),
			"-work-dir", filepath.Join(outputRoot, "work"),
			"-filename-format", "<TOPIC>/<TOPIC>.<DATETIME>+0000.<HOST><REV>.log",
			"-datetime-format", "%Y-%m-%d_%H-%M",
			"-gzip",
		)
		cmd.Stdout = logSink
		cmd.Stderr = logSink
		err := cmd.Start()
		if err != nil {
			return fmt.Errorf("error starting nsq_to_file: %s", err)
		}
		select {
		case <-ctx.Done():
			log.Printf("stopping nsq_to_file")
			cmd.Process.Signal(syscall.SIGTERM)
			return cmd.Wait()
		case <-time.After(ttl):
			// log.Printf("restarting nsq_to_file")
			cmd.Process.Signal(syscall.SIGTERM)
			err := cmd.Wait()
			if err != nil {
				return err
			}
			continue
		}
	}
}
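
// runPublisher publishes count JSON test messages to the given topic via
// nsqd's HTTP API, spacing them out so that publishing takes roughly the
// given duration.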
func runPublisher(ctx context.Context, topic string, count int, duration time.Duration) error {
	makeMessage := func(topic string, id int) []byte {
		msg, _ := json.Marshal(Msg{ID: id, Topic: topic})
		return msg
	}
	publish := func(topic string, msg []byte) error {
		url := fmt.Sprintf("http://127.0.0.1:4151/mpub?topic=%s", topic)
		req, _ := http.NewRequest("POST", url, bytes.NewReader(msg))
		resp, err := httpClient.Do(req)
		if err != nil {
			return err
		}
		defer resp.Body.Close()
		ioutil.ReadAll(resp.Body) // body must be fully read to re-use the HTTP connection
		if resp.StatusCode != 200 {
			return fmt.Errorf("bad response from nsqd: HTTP %d", resp.StatusCode)
		}
		return nil
	}
	log.Printf("publishing %d messages over %s", count, duration)
	targetDelay := duration / time.Duration(count)
	lastTime := time.Now()
	for i := 0; ; i++ {
		msg := makeMessage(topic, i)
		err := publish(topic, msg)
		if err != nil {
			return fmt.Errorf("error publishing message: %s", err)
		}
		if i == count-1 {
			return nil
		}
		delay := targetDelay - time.Since(lastTime)
		lastTime = time.Now()
		select {
		case <-time.After(delay):
			continue
		case <-ctx.Done():
			log.Printf("stopping publisher")
			return nil
		}
	}
}
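
// pollNsqdForFinish polls nsqd's /stats endpoint every interval until the
// nsq_to_file channel reports at least expectedMsgCount messages with zero
// depth and zero in-flight messages, or ctx is cancelled.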
func pollNsqdForFinish(ctx context.Context, expectedMsgCount int, interval time.Duration) error {
	type channelInfo struct {
		Depth         int    `json:"depth"`
		InFlightCount int    `json:"in_flight_count"`
		MessageCount  int    `json:"message_count"`
		Name          string `json:"channel_name"`
	}
	type topicInfo struct {
		Channels      []channelInfo `json:"channels"`
		Depth         int           `json:"depth"`
		InFlightCount int           `json:"in_flight_count"`
		MessageCount  int           `json:"message_count"`
		Name          string        `json:"topic_name"`
	}
	type statsResp struct {
		Topics []topicInfo
	}
	for {
		req, _ := http.NewRequest("GET", "http://127.0.0.1:4151/stats?format=json", nil)
		resp, err := httpClient.Do(req)
		if err != nil {
			return fmt.Errorf("error connecting to nsqd: %s", err)
		}
		stats := &statsResp{}
		err = json.NewDecoder(resp.Body).Decode(stats)
		resp.Body.Close()
		if err != nil {
			return fmt.Errorf("error parsing nsqd stats response: %s", err)
		}
		for _, t := range stats.Topics {
			for _, c := range t.Channels {
				if c.Name == "nsq_to_file" {
					log.Printf("nsq_to_file stats: %d messages; %d channel depth, %d in flight", c.MessageCount, c.Depth, c.InFlightCount)
					if c.MessageCount >= expectedMsgCount && c.Depth == 0 && c.InFlightCount == 0 {
						log.Printf("nsqd fully drained")
						return nil
					}
				}
			}
		}
		select {
		case <-time.After(interval):
			continue
		case <-ctx.Done():
			return nil
		}
	}
}
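
// report validates the test run: the work dir must contain no orphaned files,
// and the output dir must contain every expected message id exactly once.
// It returns true if the run passed.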
func report(topic string, outputDir string, expectedMsgCount int) bool {
	orphanedFiles, err := filepath.Glob(filepath.Join(outputDir, "work", topic, "*"))
	if err != nil {
		log.Printf("failure: error listing work dir: %s", err)
		return false
	}
	if len(orphanedFiles) > 0 {
		log.Printf("failure: found orphaned files in working dir:")
		for _, path := range orphanedFiles {
			log.Printf(" - %s", path)
		}
		return false
	}
	foundIds := make(map[int]int)
	outPattern := filepath.Join(outputDir, "output", topic, "*")
	outputFiles, err := filepath.Glob(outPattern)
	if err != nil {
		log.Printf("failure: error listing output dir: %s", err)
		return false
	}
	if len(outputFiles) == 0 {
		log.Printf("failure: found no output files matching %s", outPattern)
		return false
	}
	for _, path := range outputFiles {
		err := loadOutputFile(path, foundIds)
		if err != nil {
			log.Printf("failure: %s", err)
			return false
		}
	}
	passed := true
	if len(foundIds) != expectedMsgCount {
		passed = false
		log.Printf("failure: expected %d messages, got %d", expectedMsgCount, len(foundIds))
	}
	for id, count := range foundIds {
		if count > 1 {
			passed = false
			log.Printf("failure: got %d messages with id %d", count, id)
		}
	}
	if passed {
		log.Printf("success: found exactly %d messages in %d output files", len(foundIds), len(outputFiles))
		log.Printf("output dir: %s", filepath.Join(outputDir, "output"))
	}
	return passed
}
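
// loadOutputFile reads a gzipped output file line by line, parsing each line
// as a Msg and incrementing idMap for each message id seen.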
func loadOutputFile(path string, idMap map[int]int) error {
	f, err := os.Open(path)
	if err != nil {
		return fmt.Errorf("error opening output file %s: %s", path, err)
	}
	defer f.Close()
	gzf, err := gzip.NewReader(f)
	if err != nil {
		return fmt.Errorf("error opening gzip reader: %s: %s", path, err)
	}
	defer gzf.Close()
	scanner := bufio.NewScanner(gzf)
	for scanner.Scan() {
		line := scanner.Bytes()
		msg := &Msg{}
		err := json.Unmarshal(line, msg)
		if err != nil {
			return fmt.Errorf("error parsing output msg: %s: %s", string(line), err)
		}
		idMap[msg.ID]++
	}
	return scanner.Err()
}
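
// header prints a banner to visually separate the phases of the test in the
// log output.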
func header(text string) {
	log.Printf("=============================================================")
	log.Print(text)
	log.Printf("=============================================================")
}

func main() {
	var (
		duration       = flag.Duration("duration", 3*time.Minute, "how long the test should last")
		count          = flag.Int("count", 100000, "number of test messages to publish")
		topic          = flag.String("topic", "foo", "NSQ topic to which test messages should be published")
		nsqToFileCount = flag.Int("nsq-to-file-count", 10, "how many nsq_to_file processes will run")
		nsqToFileTTL   = flag.Duration("nsq-to-file-ttl", 10*time.Second, "how often nsq_to_file will be restarted")
		testDir        = flag.String("dir", "", "test output dir (generated if not given; deleted if exists)")
	)
	flag.Parse()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	g, ctx := errgroup.WithContext(ctx)

	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigCh
		cancel()
	}()

	header("setup")
	if *testDir == "" {
		d, err := ioutil.TempDir("", "nsq-to-file-test")
		if err != nil {
			log.Fatalf("error creating test dir: %s", err)
		}
		testDir = &d
	}
	log.Printf("removing working dir %s (if it exists)", *testDir)
	if err := os.RemoveAll(*testDir); err != nil {
		log.Fatalf("error removing test dir %s: %s", *testDir, err)
	}
	log.Printf("creating working dir %s", *testDir)
	if err := os.MkdirAll(*testDir, 0755); err != nil {
		log.Fatalf("error creating test dir %s: %s", *testDir, err)
	}
	logFile, err := os.Create(filepath.Join(*testDir, "logs.txt"))
	if err != nil {
		log.Fatalf("error creating log file: %s", err)
	}
	defer logFile.Close()
	log.Printf("nsqd and nsq_to_file logs combined in %s", logFile.Name())

	// Start nsqd and give it a moment to bind its ports before the
	// nsq_to_file consumers connect.
	g.Go(func() error {
		return runNsqd(ctx, *testDir, logFile)
	})
	time.Sleep(1 * time.Second)
	for i := 0; i < *nsqToFileCount; i++ {
		g.Go(func() error {
			return runNsqToFile(ctx, *topic, *testDir, *nsqToFileTTL, logFile)
		})
	}
	time.Sleep(1 * time.Second)

	header("generating messages")
	g.Go(func() error {
		return runPublisher(ctx, *topic, *count, *duration)
	})
	if err := pollNsqdForFinish(ctx, *count, 5*time.Second); err != nil {
		log.Printf("error waiting for nsqd to drain: %s", err)
	}
	cancel()
	if err := g.Wait(); err != nil {
		log.Fatalf("error running test: %s", err)
	}

	header("report")
	passed := report(*topic, *testDir, *count)
	if !passed {
		os.Exit(1)
	}
}