Skip to content

Instantly share code, notes, and snippets.

@rcarver
Last active February 1, 2016 00:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rcarver/59633d83fb9ef4634111 to your computer and use it in GitHub Desktop.
Save rcarver/59633d83fb9ef4634111 to your computer and use it in GitHub Desktop.
Example pipeline program in go
// Package main runs a program that counts input lines. It's an example of a
// program that cleanly handles input and output to pipes, and cancellation.
//
// Examples
//
// # Compile
//
// > go build -o go-pipeline .
//
// # Success
//
// > echo 'abc\ndefg\nhij\na\nb\nc' | ./go-pipeline -name a -sleep 0 | ./go-pipeline -name b -sleep 2s
// a Counted 6
// a Exiting
// b:1 a:1 abc
// b:2 a:2 defg
// b:3 a:3 hij
// b:4 a:4 a
// b:5 a:5 b
// b:6 a:6 c
// b Counted 6
// b Exiting
//
// # Cancel the second process with CTRL-C
//
// › echo 'abc\ndefg\nhij\na\nb\nc' | ./go-pipeline -name a -sleep 0 | ./go-pipeline -name b -sleep 1s
// a Counted 6
// a Exiting
// b:1 a:1 abc
// b:2 a:2 defg
// ^Cb Cancelling: interrupt
// b Counted 2
//
// # Cancel both processes with CTRL-C
//
// › echo 'abc\ndefg\nhij\na\nb\nc' | ./go-pipeline -name a -sleep 1s | ./go-pipeline -name b -sleep 1s
// b:1 a:1 abc
// b:2 a:2 defg
// ^Cb Cancelling: interrupt
// a Cancelling: interrupt
// a Counted 2
// b Counted 2
//
package main
import (
"bufio"
"flag"
"log"
"os"
"os/signal"
"sync"
"time"
"golang.org/x/net/context"
)
func main() {
// Parse inputs
var (
name string
sleep time.Duration
)
flag.StringVar(&name, "name", "a", "name, used in output")
flag.DurationVar(&sleep, "sleep", 0, "sleep between lines")
flag.Parse()
// Initialize output streams.
stdout := log.New(os.Stdout, "", 0)
stderr := log.New(os.Stderr, "", 0)
// Verify input stream.
if !isPipe(os.Stdin) {
stderr.Printf("No input")
os.Exit(1)
}
// Catch signals for cancellation.
sigs := make(chan os.Signal)
signal.Notify(sigs,
os.Interrupt, // CTRL-C
)
// Context drives control flow for cancellation.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Handle signals.
go func() {
// Handle signal.
sig := <-sigs
stderr.Printf("%s Cancelling: %s", name, sig)
cancel()
}()
// Handle shutdown.
var shutdown sync.WaitGroup
shutdown.Add(1)
go func() {
// Coordinate other shutdown activity.
<-ctx.Done()
stderr.Printf("%s Exiting", name)
shutdown.Done()
}()
// Main program.
var cnt int
scanner := bufio.NewScanner(os.Stdin)
for scanner.Scan() {
select {
case <-ctx.Done(): // Exit loop if canceled.
break
default:
cnt++
text := scanner.Text()
stdout.Printf("%s:%d %s\n", name, cnt, text)
select {
case <-time.After(sleep):
case <-ctx.Done(): // Abort sleep if canceled.
}
}
}
stderr.Printf("%s Counted %d", name, cnt)
// Exit.
cancel() // Ensure shutdown is triggered.
shutdown.Wait() // Ensure shutdown is complete.
os.Exit(0)
}
func isPipe(f *os.File) bool {
info, _ := f.Stat()
if (info.Mode() & os.ModeCharDevice) == os.ModeCharDevice {
return false
}
return true
}
run: build
echo 'a\nb\nc\nd\ne' | ./go-pipeline -name a -sleep 1s | ./go-pipeline -name b -sleep 2s
build:
go build -o go-pipeline .
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment