Skip to content

Instantly share code, notes, and snippets.

@nkcmr

nkcmr/main.go

Created Jun 11, 2017
Embed
What would you like to do?
a concurrent network diagnostic tool of sorts
package main
import (
// "io/ioutil"
"bytes"
"encoding/json"
"fmt"
"io"
"os"
"os/exec"
"sync"
"sync/atomic"
"time"
)
// chunkSize is the size, in bytes, of the buffer used for each read of a
// checker command's output.
const chunkSize = 4096
// Checker is a function that produces a channel of LineUpdate values. The
// checkers defined in main each start a goroutine and return the channel right
// away; the checker is the side that closes the channel, which is what lets
// lineUpdateFanIn finish draining it.
type Checker func() chan LineUpdate
// LineUpdate describes a change to one line of a checker's output. main
// serializes each update to JSON, using the short keys in the struct tags.
type LineUpdate struct {
// Checker is the name of the check that this line update is for (main uses
// "ping").
Checker string `json:"ckr"`
// Sequence is an integer for a line update that will be unique across a
// single job. main draws it from one counter that is advanced atomically.
Sequence uint64 `json:"seq"`
// LineNumber is a zero-indexed integer that represents the line in the
// output that needs to be updated.
LineNumber uint64 `json:"lno"`
// LineContent represents the updated contents of the line.
LineContent []string `json:"lin"`
}
// lineUpdateFanIn merges every channel in cs into one channel and returns it.
// One goroutine per source forwards its updates; the merged channel is closed
// after all sources have been closed and drained. Updates from a single source
// keep their order, but there is no ordering between different sources.
func lineUpdateFanIn(cs ...chan LineUpdate) chan LineUpdate {
	merged := make(chan LineUpdate, 1)

	var pending sync.WaitGroup
	pending.Add(len(cs))
	for _, src := range cs {
		go func(in chan LineUpdate) {
			defer pending.Done()
			for update := range in {
				merged <- update
			}
		}(src)
	}

	// Close the merged channel only once every forwarder has finished, so no
	// forwarder can ever send on a closed channel.
	go func() {
		pending.Wait()
		close(merged)
	}()

	return merged
}
// main runs every checker concurrently, merges their line updates into one
// stream and prints each update to stdout as a single line of JSON. It stops
// when all checkers have finished, or when no update has arrived for five
// seconds.
func main() {
	// seq is shared by the checkers and is only ever touched via sync/atomic.
	seq := uint64(100)
	checkers := []Checker{
		pingChecker(&seq),
		mtrChecker(),
	}

	channels := make([]chan LineUpdate, 0, len(checkers))
	for _, c := range checkers {
		channels = append(channels, c())
	}
	updates := lineUpdateFanIn(channels...)

coreLoop:
	for {
		select {
		case lu, ok := <-updates:
			if !ok {
				fmt.Println("looks like we're all done here.")
				break coreLoop
			}
			data, err := json.Marshal(&lu)
			if err != nil {
				fmt.Fprintf(os.Stderr, "encoding line update: %v\n", err)
				continue
			}
			os.Stdout.Write(append(data, '\n'))
		case <-time.After(5 * time.Second):
			fmt.Println("hmm. haven't received any updates in a while. gonna die now...")
			break coreLoop
		}
	}
}

// pingChecker returns a Checker that runs ping against nick.comer.io and
// reports every line the command prints as a LineUpdate tagged "ping". seq is
// the shared sequence counter; it is advanced atomically for each update.
// Failures to start or run the command are reported on stderr and end the
// check; the returned channel is closed in every case.
func pingChecker(seq *uint64) Checker {
	return func() chan LineUpdate {
		outc := make(chan LineUpdate, 1)
		go func() {
			defer close(outc)

			pingc := exec.Command("/sbin/ping", "-c", "10", "nick.comer.io")
			pingcOut, err := pingc.StdoutPipe()
			if err != nil {
				fmt.Fprintf(os.Stderr, "ping: opening stdout pipe: %v\n", err)
				return
			}
			if err := pingc.Start(); err != nil {
				fmt.Fprintf(os.Stderr, "ping: starting command: %v\n", err)
				return
			}

			lno := uint64(0)
			err = forEachLine(pingcOut, func(line []byte) {
				outc <- LineUpdate{
					Checker:     "ping",
					Sequence:    atomic.AddUint64(seq, 1),
					LineNumber:  lno,
					LineContent: []string{string(line)},
				}
				lno++
			})
			if err != nil {
				fmt.Fprintf(os.Stderr, "ping: reading output: %v\n", err)
			}

			// Wait must come after the last read from the pipe: it closes the
			// pipe and releases the resources held for the child process.
			if err := pingc.Wait(); err != nil {
				fmt.Fprintf(os.Stderr, "ping: command finished: %v\n", err)
			}
		}()
		return outc
	}
}

// mtrChecker returns a Checker that runs mtr for at most five seconds and
// copies its combined stdout and stderr straight to os.Stdout. It never sends
// a LineUpdate; the returned channel is only closed, once the run is over, so
// that the merged update stream does not end while mtr is still running.
func mtrChecker() Checker {
	return func() chan LineUpdate {
		outc := make(chan LineUpdate, 1)
		go func() {
			defer close(outc)

			mtrc := exec.Command("/Users/nkcmr/.homebrew/sbin/mtr", "--split", "-c", "5", "nick.comer.io")
			// mtr is being weird. starts in background. drives up CPU allocation
			// to 100% and just doesn't say anything :(
			// That is why its output is collected as a whole under a timeout
			// rather than streamed line by line like ping's.
			var combined bytes.Buffer
			mtrc.Stdout = &combined
			mtrc.Stderr = &combined

			// Start in this goroutine so that mtrc.Process is known to be set
			// before the timeout branch below may need to kill it.
			if err := mtrc.Start(); err != nil {
				fmt.Fprintf(os.Stderr, "mtr: starting command: %v\n", err)
				return
			}

			// Buffered so the waiter can finish even after a timeout, when
			// nobody receives from done any more.
			done := make(chan error, 1)
			go func() {
				done <- mtrc.Wait()
			}()

			select {
			case err := <-done:
				if err != nil {
					fmt.Fprintf(os.Stderr, "mtr: command finished: %v\n", err)
				}
				os.Stdout.Write(append(combined.Bytes(), '\n'))
			case <-time.After(5 * time.Second):
				if err := mtrc.Process.Kill(); err != nil {
					fmt.Fprintf(os.Stderr, "mtr: killing command: %v\n", err)
				}
			}
		}()
		return outc
	}
}

// forEachLine reads r in chunkSize pieces and calls emit once per
// newline-terminated line, without the newline. Bytes after the last newline
// are held back until more data arrives, so a line that straddles two reads is
// delivered whole; whatever is left at the end of the input is delivered as a
// final line. The slice passed to emit is only valid during the call. It
// returns the first read error other than io.EOF.
func forEachLine(r io.Reader, emit func(line []byte)) error {
	var pending []byte
	chunk := make([]byte, chunkSize)
	for {
		n, err := r.Read(chunk)
		// A Read may return data together with an error, so consume the n
		// bytes before looking at err.
		pending = append(pending, chunk[:n]...)
		for {
			i := bytes.IndexByte(pending, '\n')
			if i < 0 {
				break
			}
			emit(pending[:i])
			pending = pending[i+1:]
		}
		if err != nil {
			if len(pending) > 0 {
				emit(pending)
			}
			if err == io.EOF {
				return nil
			}
			return err
		}
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment