Skip to content

Instantly share code, notes, and snippets.

@excavador
Last active July 7, 2017 04:29
Show Gist options
  • Save excavador/1da0c88504d36ba8b19f6cfe1ae5cca1 to your computer and use it in GitHub Desktop.
Save excavador/1da0c88504d36ba8b19f6cfe1ae5cca1 to your computer and use it in GitHub Desktop.
# State file consumed by bin/xargs-if-changed (passed below via XARGS_STATE).
# It lists bin/xargs-if-changed as a prerequisite, so whenever that prerequisite
# is newer the recipe discards the recorded state and leaves an empty file.
$(FMT).state: bin/xargs-if-changed
rm -f $@
touch $@
# Formatting stamp target. It is considered out of date when the tools, the
# state file, or any of the $(SIGN__GO__*) prerequisites are newer.
$(FMT): bin/xargs-if-changed bin/gofmt $(FMT).state $(SIGN__GO__SOURCE) $(SIGN__GO__TEST) $(SIGN__GO__GENERATED)
$(info [fmt])
# Each pipeline feeds the script output (presumably a list of Go file paths --
# verify against the scripts) to xargs-if-changed, which runs
# "bin/gofmt -l -s -w" on its input. XARGS_WRAP_OUTPUT=0 turns off the
# per-command output prefix.
make/02.files-go-source.sh | XARGS_STATE=$(FMT).state XARGS_WRAP_OUTPUT=0 bin/xargs-if-changed bin/gofmt -l -s -w
make/02.files-go-test.sh | XARGS_STATE=$(FMT).state XARGS_WRAP_OUTPUT=0 bin/xargs-if-changed bin/gofmt -l -s -w
# Refresh the stamp so the target is up to date until a prerequisite changes.
touch $@
# Short alias: "make fmt" builds the $(FMT) stamp.
fmt: $(FMT)
package prefixer
import (
"bytes"
"io"
)
// writer is an io.Writer decorator that emits prefix at the start of every
// delimeter-terminated line before forwarding the data to target.
type writer struct {
	// target receives the prefix and the forwarded data.
	target io.Writer
	// prefix is written in front of every line.
	prefix []byte
	// delimeter marks the end of a line.
	delimeter []byte
	// insert reports whether the prefix must be written before the next
	// chunk of data.
	insert bool
}
// Compile-time check that *writer satisfies io.Writer.
var _ io.Writer = &writer{}
// NewWriter wraps target so that prefix is emitted at the beginning of every
// line, where lines are terminated by delimeter.
//
// The arguments are validated in this order:
//   - an empty delimeter panics;
//   - a nil target yields a nil io.Writer;
//   - an empty prefix yields target itself, unwrapped.
//
// The prefix and delimeter slices are stored as given, not copied.
func NewWriter(target io.Writer, prefix []byte, delimeter []byte) io.Writer {
	if len(delimeter) < 1 {
		panic("delimeter must be not empty")
	}

	switch {
	case target == nil:
		return nil
	case len(prefix) < 1:
		// nothing to insert, so the wrapper would be a pure pass-through
		return target
	}

	wrapped := new(writer)
	wrapped.target = target
	wrapped.prefix = prefix
	wrapped.delimeter = delimeter
	// the very first write starts a new line
	wrapped.insert = true
	return wrapped
}
// Write forwards p to the target, emitting the prefix before the first byte
// of every line. A line ends with the delimeter; a delimeter that is split
// across two Write calls is not recognised.
//
// The returned n counts only bytes of p that were handed to the target, never
// prefix bytes, as the io.Writer contract requires. On error the line state is
// left untouched, so a caller that retries with the unwritten remainder of p
// does not get a duplicated prefix.
func (w *writer) Write(p []byte) (n int, err error) {
	for len(p) > 0 {
		// write the prefix if a new line starts here
		if w.insert {
			if _, err = w.target.Write(w.prefix); err != nil {
				return n, err
			}
			w.insert = false
		}

		// extract the current line, including its delimeter if present
		line, endsLine := p, false
		if stop := bytes.Index(p, w.delimeter); stop != -1 {
			line = p[:stop+len(w.delimeter)]
			endsLine = true
		}

		// write data
		written, writeErr := w.target.Write(line)
		n += written
		if writeErr != nil {
			return n, writeErr
		}

		// only a successfully written delimeter starts a new line
		w.insert = endsLine
		p = p[len(line):]
	}
	return n, nil
}
package prefixer_test
import (
"bytes"
"io"
"lib/prefixer"
"os"
"testing"
"github.com/stretchr/testify/assert"
)
// TestNegativeDelimeter verifies that NewWriter panics on an empty delimeter
// for every combination of target and prefix, including a nil target and an
// empty prefix.
func TestNegativeDelimeter(t *testing.T) {
	for _, writer := range []io.Writer{nil, os.Stdout} {
		for _, prefix := range [][]byte{nil, {}, []byte("[prefix]")} {
			for _, delimeter := range [][]byte{nil, {}} {
				assert.Panics(t, func() {
					// pass the loop variables so every combination is really exercised
					prefixer.NewWriter(writer, prefix, delimeter)
				}, "should panic on prefixer.NewWriter(%v, %q, %q)", writer, prefix, delimeter)
			}
		}
	}
}
func TestNegativeWriter(t *testing.T) {
for _, prefix := range [][]byte{nil, {}, []byte("[prefix]")} {
assert.Equal(t, nil, prefixer.NewWriter(nil, prefix, []byte("[delimeter]")),
`should return nil prefixer.NewWriter(nil, %s, <delimeter>`, prefix)
}
}
// TestNegativePrefix verifies that an empty prefix (nil or zero-length) makes
// NewWriter return the target itself instead of a wrapper.
func TestNegativePrefix(t *testing.T) {
	for _, prefix := range [][]byte{nil, {}} {
		// pass the loop variable so both the nil and the empty slice are checked
		assert.Equal(t, os.Stdout, prefixer.NewWriter(os.Stdout, prefix, []byte("[delimeter]")),
			"should return <writer> on prefixer.NewWriter(<writer>, %q, <delimeter>)", prefix)
	}
}
// Test feeds each input to a fresh prefixing writer in a single Write call
// and checks the produced output, the reported byte count and the error.
func Test(t *testing.T) {
	var data = map[string]string{
		"":                           "",
		"a":                          "[prefix]a",
		"a[delimeter]":               "[prefix]a[delimeter]",
		"a[delimeter]b":              "[prefix]a[delimeter][prefix]b",
		"a[delimeter]b[delimeter]":   "[prefix]a[delimeter][prefix]b[delimeter]",
		"aa":                         "[prefix]aa",
		"aa[delimeter]":              "[prefix]aa[delimeter]",
		"aa[delimeter]bb":            "[prefix]aa[delimeter][prefix]bb",
		"aa[delimeter]bb[delimeter]": "[prefix]aa[delimeter][prefix]bb[delimeter]",
	}
	for input, expected := range data {
		var result bytes.Buffer
		writer := prefixer.NewWriter(&result, []byte("[prefix]"), []byte("[delimeter]"))
		n, err := writer.Write([]byte(input))
		assert.NoError(t, err, "input: %s", input)
		// io.Writer must report the bytes consumed from the input, not the prefix bytes
		assert.Equal(t, len(input), n, "input: %s", input)
		assert.Equal(t, expected, result.String(), "input: %s", input)
	}
}
package main
import (
"bufio"
"context"
"crypto/md5"
"flag"
"fmt"
"io/ioutil"
"lib/prefixer"
"os"
"os/exec"
"os/signal"
"runtime"
"sort"
"strconv"
"strings"
"sync"
)
// Path is the path of a file taken from stdin or from the state file.
type Path string

// Sum is a hex-encoded checksum of a file's content.
type Sum string

// Target is the in-memory state: for every path, the set of checksums that
// have been recorded for it.
type Target map[Path]map[Sum]bool
// Usage prints the command-line help, followed by a newline, to stderr.
func Usage() {
	const help = `Usage: xargs-if-changed <tool with arguments>
tool
1. reads file paths from stdin
2. reads content of every file and calculate MD5
3. checks MD5 for this file in <XARGS_STATE>
4. if file was changed executes "<tool with arguments> <file path>"
5. if execution was successfull writes actual MD5 for <file path> to <XARGS_STATE>
environment variables:
XARGS_STATE <file path> path to file with state
XARGS_WRAP_OUTPUT <0|1> wrap or not <tool> stdout/stderr by prefix "<file>: "
XARGS_PARALLEL <integer> worker count`
	fmt.Fprintln(os.Stderr, help)
}
// ReadTarget loads the state file at path into a Target.
//
// Every non-empty line must have the form "<file path>\t<checksum>"; several
// lines may refer to the same file path. A state file that does not exist is
// not an error and yields an empty Target. Any other read failure and any
// malformed line is printed to stderr and returned as err; malformed lines
// are reported with their 1-based line number.
func ReadTarget(path string) (target Target, err error) {
	target = make(Target)
	var content []byte
	if content, err = ioutil.ReadFile(path); err != nil {
		if os.IsNotExist(err) {
			// missed target, it's ok: nothing has been processed yet
			err = nil
			return
		}
		// anything else (permissions, directory, I/O error) must not be
		// mistaken for an empty state
		err = fmt.Errorf("read target %s failed: %s", path, err)
		fmt.Fprintln(os.Stderr, err)
		return
	}
	// Split the content directly instead of using bufio.Scanner: the scanner
	// stops silently on lines longer than its token limit.
	for index, line := range strings.Split(string(content), "\n") {
		line = strings.TrimSuffix(line, "\r")
		if len(line) == 0 {
			continue
		}
		words := strings.Split(line, "\t")
		if len(words) != 2 {
			err = fmt.Errorf("parse target %s:%d invalid syntax\n%s", path, index+1, line)
			fmt.Fprintln(os.Stderr, err)
			return
		}
		sourcePath, sourceSum := Path(words[0]), Sum(words[1])
		if _, ok := target[sourcePath]; !ok {
			target[sourcePath] = make(map[Sum]bool)
		}
		target[sourcePath][sourceSum] = true
	}
	return
}
// Main implements the xargs-if-changed tool.
//
// It validates the command line and the XARGS_STATE, XARGS_WRAP_OUTPUT and
// XARGS_PARALLEL environment variables, loads the state file, and starts
// workerCount workers. One more goroutine reads file paths from stdin and
// feeds them to the workers. For every path a worker computes the MD5 of the
// file and, if that checksum is not yet recorded for the path, runs
// "<tool with arguments> <path>"; on success the checksum is recorded.
//
// The first failure (unreadable file, failed tool, stdin error) or an
// interrupt signal cancels the run: no further files are started and running
// tools are stopped through their context. The state collected so far is
// written back to the state file in either case.
//
// Every problem is reported on stderr. The returned err is non-nil if the
// arguments were invalid, the state could not be read or written, or the run
// was cancelled.
func Main() (err error) {
	flag.Usage = Usage
	flag.Parse()
	if flag.NArg() < 1 {
		flag.Usage()
		err = fmt.Errorf("you must provide tool for run")
		fmt.Fprintln(os.Stderr, err)
		return
	}

	targetPath := os.Getenv("XARGS_STATE")
	if len(targetPath) == 0 {
		err = fmt.Errorf("you must specify XARGS_STATE environment variable")
		fmt.Fprintln(os.Stderr, err)
		return
	}

	// output is wrapped unless explicitly disabled with "0"
	wrapOutput := os.Getenv("XARGS_WRAP_OUTPUT") != "0"

	var workerCount int
	if str := os.Getenv("XARGS_PARALLEL"); len(str) == 0 {
		workerCount = runtime.NumCPU()
	} else if workerCount, err = strconv.Atoi(str); err != nil {
		fmt.Fprintf(os.Stderr, "failed to parse XARGS_PARALLEL environment variable %s\n", err)
		return
	}
	if workerCount < 1 {
		err = fmt.Errorf("invalid XARGS_PARALLEL %d (must be positive)", workerCount)
		fmt.Fprintln(os.Stderr, err)
		return
	}

	var target Target
	if target, err = ReadTarget(targetPath); err != nil {
		return
	}

	// Snapshot the tool command line once; workers build their own argument
	// slices from it instead of appending to a shared slice concurrently.
	tool := flag.Args()

	task := make(chan Path, workerCount)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// translate an interrupt signal into cancellation of the whole run
	interrupt := make(chan os.Signal, 1)
	signal.Notify(interrupt, os.Interrupt)
	go func() {
		select {
		case <-ctx.Done():
		case <-interrupt:
			cancel()
		}
	}()

	workerCtx, workerCancel := context.WithCancel(ctx)
	defer workerCancel()

	var workerWait sync.WaitGroup
	var mutex sync.Mutex // guards target

	for i := 0; i < workerCount; i++ {
		workerWait.Add(1)
		go func() {
			defer workerWait.Done()
			for path := range task {
				// Stop picking up work once the run is cancelled. (A bare
				// "break" inside a select would only leave the select.)
				if workerCtx.Err() != nil {
					return
				}

				content, readErr := ioutil.ReadFile(string(path))
				if readErr != nil {
					fmt.Fprintf(os.Stderr, "read '%s' failed %s\n", path, readErr)
					workerCancel()
					return
				}
				actualSum := Sum(fmt.Sprintf("%x", md5.Sum(content)))

				mutex.Lock()
				already := target[path][actualSum]
				mutex.Unlock()
				if already {
					continue
				}

				cmd := make([]string, 0, len(tool)+1)
				cmd = append(cmd, tool...)
				cmd = append(cmd, string(path))

				run := exec.CommandContext(workerCtx, cmd[0], cmd[1:]...)
				run.Env = os.Environ()
				run.Stdout = os.Stdout
				run.Stderr = os.Stderr
				if wrapOutput {
					run.Stdout = prefixer.NewWriter(run.Stdout, []byte(strings.Join(cmd, " ")+": "), []byte("\n"))
					run.Stderr = prefixer.NewWriter(run.Stderr, []byte(strings.Join(cmd, " ")+": "), []byte("\n"))
				}

				runErr := run.Start()
				started := runErr == nil
				if started {
					runErr = run.Wait()
				}

				switch runErr {
				case nil:
					mutex.Lock()
					if _, ok := target[path]; !ok {
						target[path] = make(map[Sum]bool)
					}
					target[path][actualSum] = true
					mutex.Unlock()
				case context.Canceled:
					// the run was cancelled before the tool started
				default:
					if exitError, ok := runErr.(*exec.ExitError); started && ok && !exitError.Exited() {
						// canceled by context
						runErr = nil
					}
					if runErr != nil {
						fmt.Fprintf(os.Stderr, "%s failed: %s\n", strings.Join(cmd, " "), runErr)
						workerCancel()
					}
				}
			}
		}()
	}

	// feed paths from stdin to the workers
	workerWait.Add(1)
	go func() {
		defer workerWait.Done()
		defer close(task)
		scanner := bufio.NewScanner(os.Stdin)
		for scanner.Scan() {
			path := Path(scanner.Text())
			select {
			case <-workerCtx.Done():
				return
			case task <- path:
			}
		}
		// Scanner errors are only visible after Scan has returned false.
		if scanErr := scanner.Err(); scanErr != nil {
			fmt.Fprintf(os.Stderr, "read stdin problem %s\n", scanErr)
			workerCancel()
		}
	}()

	workerWait.Wait()

	// persist the state, sorted for a stable file layout
	var result []string
	for path, inner := range target {
		for sum := range inner {
			result = append(result, fmt.Sprintf("%s\t%s\n", string(path), string(sum)))
		}
	}
	sort.Strings(result)
	if err = ioutil.WriteFile(targetPath, []byte(strings.Join(result, "")), 0664); err != nil {
		fmt.Fprintf(os.Stderr, "write target %s failed %s\n", targetPath, err)
		return
	}

	// non-nil only if the run was cancelled (failure or interrupt)
	err = workerCtx.Err()
	return
}
// main runs Main and converts a failure into exit status 1. Main reports the
// details on stderr itself, so nothing is printed here.
func main() {
	err := Main()
	if err == nil {
		return
	}
	os.Exit(1)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment