Skip to content

Instantly share code, notes, and snippets.

@apg
Created January 16, 2015 19:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save apg/574aaab6ad3475d4dd94 to your computer and use it in GitHub Desktop.
Save apg/574aaab6ad3475d4dd94 to your computer and use it in GitHub Desktop.
really crappy prototype of dawk
package main
import (
"bufio"
"io"
"regexp"
"strings"
"sync"
)
// Program is an awk-style pipeline: Begin produces the Environment, each Rule
// handles its matching input lines in its own goroutine, and End receives the
// merged result of all rules.
type Program struct {
// Begin is called once at the start of Run; the Environment it returns is
// passed to every Rule.
Begin func() Environment
// Rules are the pattern/action pairs; Run starts one goroutine per entry.
Rules []*Rule
// End is called once at the end of Run with the merged Frame of all rules.
End func(Frame)
// channels[i] carries the lines that matched Rules[i] to that rule's goroutine.
channels []chan string
// results receives one Frame from each rule after its input channel is closed.
results chan Frame
// wg is handed to every Rule.Run and waited on by wait.
wg *sync.WaitGroup
}
// Environment holds the settings returned by Program.Begin and passed to every
// Rule and its Action.
type Environment struct {
FS string // field separator; Rule.Run passes it to strings.Split to break a line into fields
}
// Rule pairs a regular expression with the action to run on lines that match it.
type Rule struct {
// RE selects the lines this rule handles; Program.Send tests every line with
// RE.MatchString.
RE *regexp.Regexp
// Action is invoked once per received line with the environment, the rule's
// accumulator Frame, the line split on Environment.FS, and the full line.
Action func(Environment, Frame, []string, string)
// Input is the channel Run reads lines from; Program.Run assigns it.
Input chan string
// Output is the channel Run sends its accumulated Frame to; Program.Run assigns it.
Output chan Frame
}
// Frame is the accumulator that rule actions write into: a set of named
// integer counters. It is meant to become a CRDT eventually; for now it is a
// plain map, and two frames are "merged" by adding their values key by key.
type Frame map[string]int

// NewFrame returns an empty Frame that is ready to be written to.
func NewFrame() Frame {
	return Frame{}
}
// NewProgram builds a Program from its three parts: begin supplies the
// Environment, rules are the pattern/action pairs, and end consumes the merged
// result. It allocates one unbuffered line channel per rule and a results
// channel with room for one Frame per rule.
func NewProgram(begin func() Environment, rules []*Rule, end func(Frame)) *Program {
	n := len(rules)

	lines := make([]chan string, 0, n)
	for range rules {
		lines = append(lines, make(chan string))
	}

	prog := &Program{}
	prog.Begin = begin
	prog.Rules = rules
	prog.End = end
	prog.channels = lines
	prog.results = make(chan Frame, n)
	prog.wg = &sync.WaitGroup{}
	return prog
}
// Run executes the program against input: it calls Begin, starts one goroutine
// per rule, dispatches every input line to the rules whose pattern matches,
// and finally calls End with the merged frames of all rules.
//
// Reading stops at end of input or at the first read error. Run has no error
// return, so a read error is not reported; the lines read so far are still
// processed and End is still called.
//
// Run closes the per-rule channels when the input is exhausted, so a Program
// cannot be run a second time.
func (p *Program) Run(input *bufio.Reader) {
	environment := p.Begin()

	// Wire each rule to its own input channel and the shared results channel,
	// then start it.
	for i, r := range p.Rules {
		r.Input = p.channels[i]
		r.Output = p.results
		go r.Run(environment, p.wg)
	}

	// Read input and dispatch it line by line. ReadLine hands back a line
	// longer than the reader's buffer in several pieces (isPrefix == true);
	// those pieces are joined in long so rules always see whole lines.
	var long []byte
	for {
		chunk, isPrefix, err := input.ReadLine()
		if err == io.EOF {
			break // normal end of input
		}
		if err != nil {
			// A failing reader keeps returning its error, so looping until
			// io.EOF would never terminate. Stop reading instead.
			break
		}
		if isPrefix {
			long = append(long, chunk...)
			continue
		}
		if len(long) > 0 {
			// Final piece of a long line.
			long = append(long, chunk...)
			p.Send(string(long))
			long = long[:0]
			continue
		}
		p.Send(string(chunk))
	}
	// Input ended in the middle of a long line: dispatch what was read.
	if len(long) > 0 {
		p.Send(string(long))
	}

	// Closing the channels tells every rule that no more lines will arrive.
	for _, c := range p.channels {
		close(c)
	}

	merged := p.wait() // until all rules have finished.
	p.End(merged)
}
// Send offers line to every rule in order. A rule whose RE matches receives
// the line on its channel; the send blocks until that rule's goroutine has
// taken it. Rules that do not match are skipped.
func (p *Program) Send(line string) {
	for idx := range p.Rules {
		if !p.Rules[idx].RE.MatchString(line) {
			continue
		}
		p.channels[idx] <- line
	}
}
// wait blocks until every rule has delivered its Frame and returns the merge
// of all of them.
func (p *Program) wait() Frame {
	accum := NewFrame()

	// Collect exactly one frame per rule. This is done before wg.Wait because
	// each rule calls wg.Add from inside its own goroutine: waiting first
	// could run before those Adds, which sync.WaitGroup does not allow. A rule
	// only sends its frame after its Add, so once all frames have arrived
	// every Add has happened and the Wait below is properly ordered.
	for range p.Rules {
		accum.Merge(<-p.results)
	}

	// Every rule has sent its frame; wait for the goroutines' deferred Done.
	p.wg.Wait()
	return accum
}
// Run consumes lines from r.Input until that channel is closed, calling
// r.Action for each one with a private accumulator Frame, the line split on
// env.FS, and the line itself. When the input is exhausted the accumulated
// Frame is sent on r.Output.
//
// Run registers itself on wg (Add on entry, Done on return). Because the Add
// happens here rather than in the caller, a caller that starts Run with `go`
// cannot assume the Add has taken place by the time it calls wg.Wait; the
// Frame arriving on r.Output is the reliable completion signal.
func (r *Rule) Run(env Environment, wg *sync.WaitGroup) {
	wg.Add(1)
	defer wg.Done()

	frame := NewFrame()
	for {
		line, open := <-r.Input
		if !open {
			break
		}
		fields := strings.Split(line, env.FS)
		r.Action(env, frame, fields, line)
	}
	r.Output <- frame
}
// Merge folds o into f by adding each of o's values to the value f holds
// under the same key; keys absent from f start from zero. o is not modified.
func (f Frame) Merge(o Frame) {
	for key := range o {
		f[key] = f[key] + o[key]
	}
}
package main
import (
"bufio"
"fmt"
"os"
"regexp"
)
func main() {
yesRule := &Rule{
RE: regexp.MustCompile("yes"),
Action: func(env Environment, f Frame, Fields []string, Full string) {
f["yeses"] += 1
},
}
noRule := &Rule{
RE: regexp.MustCompile("no"),
Action: func(env Environment, f Frame, Fields []string, Full string) {
f["nos"] += 1
},
}
rules := []*Rule{yesRule, noRule}
begin := func() Environment {
return Environment{" "}
}
end := func(f Frame) {
fmt.Printf("Yes: %d\nNo: %d\n", f["yeses"], f["nos"])
switch {
case f["yeses"] > f["nos"]:
fmt.Println("Yes wins!")
case f["yeses"] < f["nos"]:
fmt.Println("No wins!")
default:
fmt.Println("Tie!")
}
}
prog := NewProgram(begin, rules, end)
prog.Run(bufio.NewReader(os.Stdin))
}
[apg@amend dawk]$ ./dawk
yes
no
yes
Yes: 2
No: 1
Yes wins!
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment