Skip to content

Instantly share code, notes, and snippets.

@clee
Last active April 3, 2016 17:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save clee/92d4b260596411bf74366e9e2d789745 to your computer and use it in GitHub Desktop.
Save clee/92d4b260596411bf74366e9e2d789745 to your computer and use it in GitHub Desktop.
watchdog daemon for reconfiguring multihome routing setups
package main
import (
"io"
"log"
"strconv"
"os/exec"
"bytes"
"github.com/moovweb/gokogiri"
"github.com/moovweb/gokogiri/xpath"
)
// iptables_read dumps the current netfilter rules as XML by piping the output
// of iptables-save into iptables-xml.
//
// It returns whatever iptables-xml wrote to its standard output. Failures are
// logged rather than returned: if either command cannot be started the result
// is nil, and if a command exits with an error the bytes collected so far are
// still returned.
func iptables_read() []byte {
	out := new(bytes.Buffer)
	save := exec.Command("iptables-save")
	toXML := exec.Command("iptables-xml")

	// An in-process pipe connects iptables-save's stdout to iptables-xml's stdin.
	reader, writer := io.Pipe()
	save.Stdout = writer
	toXML.Stdin = reader
	toXML.Stdout = out

	if err := save.Start(); err != nil {
		log.Printf("error starting iptables-save: %v\n", err)
		return nil
	}
	if err := toXML.Start(); err != nil {
		log.Printf("error starting iptables-xml: %v\n", err)
		// Nothing will ever read from the pipe now. Fail it so the copy of
		// iptables-save's output cannot block forever, then reap the
		// process that is already running.
		reader.CloseWithError(err)
		if err := save.Wait(); err != nil {
			log.Printf("error from iptables-save: %v\n", err)
		}
		return nil
	}

	if err := save.Wait(); err != nil {
		log.Printf("error from iptables-save: %v\n", err)
	}
	// Closing the write end signals end-of-input to iptables-xml.
	writer.Close()
	if err := toXML.Wait(); err != nil {
		log.Printf("error from iptables-xml: %v\n", err)
	}
	return out.Bytes()
}
// iptables_write loads a rule set given as iptables XML: the document is fed
// to xsltproc (using /usr/share/iptables/iptables.xslt) and the transformed
// output is piped into iptables-restore.
//
// xml is the complete XML document to apply. Failures are logged rather than
// returned; if either command cannot be started nothing is applied.
func iptables_write(xml []byte) {
	xsltproc := exec.Command("xsltproc", "/usr/share/iptables/iptables.xslt", "/dev/stdin")
	restore := exec.Command("iptables-restore")

	// An in-process pipe connects xsltproc's stdout to iptables-restore's stdin.
	// iptables-restore's own stdout is not needed and is left unset.
	reader, writer := io.Pipe()
	xsltproc.Stdin = bytes.NewBuffer(xml)
	xsltproc.Stdout = writer
	restore.Stdin = reader

	if err := xsltproc.Start(); err != nil {
		log.Printf("error starting xsltproc: %v\n", err)
		return
	}
	if err := restore.Start(); err != nil {
		log.Printf("error starting iptables-restore: %v\n", err)
		// Nothing will ever read from the pipe now. Fail it so the copy of
		// xsltproc's output cannot block forever, then reap the process
		// that is already running.
		reader.CloseWithError(err)
		if err := xsltproc.Wait(); err != nil {
			log.Printf("error from xsltproc: %v\n", err)
		}
		return
	}

	if err := xsltproc.Wait(); err != nil {
		log.Printf("error from xsltproc: %v\n", err)
	}
	// Closing the write end signals end-of-input to iptables-restore.
	writer.Close()
	if err := restore.Wait(); err != nil {
		log.Printf("error from iptables-restore: %v\n", err)
	}
}
// iptables_set_probability rewrites the probability of the statistic-match
// rule in the live iptables rule set and loads the result back.
//
// p is written with 11 decimal places into the first node matching
// /iptables-rules/table/chain/rule/conditions/statistic/probability.
// Presumably this is the share of traffic sent to the secondary provider
// (per the disabled log line below) — confirm against the rule set.
//
// Every failure is logged and leaves the active rules untouched; nothing is
// returned to the caller.
func iptables_set_probability(p float64) {
	// log.Printf("setting probability for secondary provider to %f\n", p)
	xmlData := iptables_read()
	if len(xmlData) == 0 {
		log.Println("no iptables XML available, leaving rules untouched")
		return
	}

	doc, err := gokogiri.ParseXml(xmlData)
	if err != nil {
		log.Printf("error parsing iptables XML: %v\n", err)
		return
	}
	// The document is backed by libxml2 memory that Go's GC does not reclaim.
	defer doc.Free()

	root := doc.Root()
	if root == nil {
		log.Println("iptables XML has no root element, leaving rules untouched")
		return
	}

	// modify the rule that has the 'probability' member
	query := xpath.Compile("/iptables-rules/table/chain/rule/conditions/statistic/probability")
	defer query.Free()
	nodes, err := root.Search(query)
	if err != nil {
		log.Printf("error searching iptables XML: %v\n", err)
		return
	}
	if len(nodes) != 1 {
		log.Printf("weird: number of probability nodes in XML is %v\n", len(nodes))
	}
	if len(nodes) == 0 {
		// Nothing to rewrite; do not reload an unchanged rule set.
		return
	}

	nodes[0].SetContent(strconv.FormatFloat(p, 'f', 11, 64))
	iptables_write(root.ToBuffer(nil))
}
package main
import (
"log"
"sync"
"time"
"github.com/looplab/fsm"
)
// IPLink describes one uplink that the watchdog probes.
//
// The field order matters: links are built with positional composite
// literals.
type IPLink struct {
	// Gateway is the address that gets pinged to test the link.
	Gateway string
	// Source is the local address the ping is sent from.
	Source string
	// Sequence supplies the sequence number for each ping on this link.
	Sequence chan int
	// Up records whether the most recent ping on this link succeeded.
	Up bool
}
// nextEvent maps the current state and the latest probe results to the name
// of the event to fire, or "" when the state should stay as it is.
func nextEvent(state string, primaryUp, secondaryUp bool) string {
	bothUp := primaryUp && secondaryUp
	onlyPrimaryDown := !primaryUp && secondaryUp
	onlySecondaryDown := primaryUp && !secondaryUp

	switch state {
	case "balanced":
		switch {
		case onlyPrimaryDown:
			return "warn_primary"
		case onlySecondaryDown:
			return "warn_secondary"
		}
	case "primary_danger":
		switch {
		case bothUp:
			return "balance"
		case onlyPrimaryDown:
			return "disable_primary"
		case onlySecondaryDown:
			return "warn_secondary"
		}
	case "secondary_danger":
		switch {
		case bothUp:
			return "balance"
		case onlyPrimaryDown:
			return "warn_primary"
		case onlySecondaryDown:
			return "disable_secondary"
		}
	case "primary_down":
		switch {
		case bothUp:
			return "balance"
		case onlySecondaryDown:
			return "warn_secondary"
		}
	case "secondary_down":
		switch {
		case bothUp:
			return "balance"
		case onlyPrimaryDown:
			return "warn_primary"
		}
	default:
		// "init" (or anything unexpected): move to the balanced state.
		return "balance"
	}
	return ""
}

// main runs the watchdog loop. Roughly once per second it pings the gateway
// of both links in parallel, feeds the results into a state machine, and lets
// the state machine's callbacks adjust the iptables probability.
//
// links[0] is treated as the primary link and links[1] as the secondary. A
// link must fail two consecutive rounds (danger, then down) before traffic is
// moved off it. The loop never returns.
func main() {
	// weightsBalanced records whether the balanced probability is the one
	// currently applied. The source state alone is not enough: the path
	// primary_down -> secondary_danger -> balanced (or its mirror) reaches
	// "balanced" from a danger state while a disable_* weight is still active.
	weightsBalanced := false

	s := fsm.NewFSM(
		"init",
		fsm.Events{
			{Name: "balance", Src: []string{"init", "primary_danger", "primary_down", "secondary_danger", "secondary_down"}, Dst: "balanced"},
			{Name: "warn_primary", Src: []string{"init", "balanced", "secondary_danger", "secondary_down"}, Dst: "primary_danger"},
			{Name: "warn_secondary", Src: []string{"init", "balanced", "primary_danger", "primary_down"}, Dst: "secondary_danger"},
			{Name: "disable_primary", Src: []string{"primary_danger"}, Dst: "primary_down"},
			{Name: "disable_secondary", Src: []string{"secondary_danger"}, Dst: "secondary_down"},
		},
		fsm.Callbacks{
			"balance": func(e *fsm.Event) {
				if weightsBalanced {
					// A brief hiccup that never changed the weights.
					// log.Printf("recovered from %s hiccup\n", e.Src)
					return
				}
				log.Println("restoring balance to the Force")
				iptables_set_probability(1.0 / 3.0)
				weightsBalanced = true
			},
			"disable_primary": func(e *fsm.Event) {
				log.Println("primary link down, disabling traffic")
				iptables_set_probability(1.0)
				weightsBalanced = false
			},
			"disable_secondary": func(e *fsm.Event) {
				log.Println("secondary link down, disabling traffic")
				iptables_set_probability(0.0)
				weightsBalanced = false
			},
		},
	)

	// assume that we're starting off balanced... for now
	links := []IPLink{
		{"71.56.220.1", "71.56.223.49", seq(), true},
		{"207.225.112.5", "75.171.245.221", seq(), true},
	}

	var wg sync.WaitGroup
	for {
		roundStart := time.Now()

		// ping both IPs at the same time
		wg.Add(len(links))
		for i := range links {
			go func(l *IPLink) {
				defer wg.Done()
				// Each goroutine writes only its own link, and the results
				// are read after Wait, so no further locking is needed.
				l.Up = ping4(l.Gateway, l.Source, l.Sequence) == nil
				// if !l.Up { log.Printf("link is down!\n") }
			}(&links[i])
		}
		wg.Wait()

		if event := nextEvent(s.Current(), links[0].Up, links[1].Up); event != "" {
			if err := s.Event(event); err != nil {
				log.Printf("error firing %s event: %v\n", event, err)
			}
		}

		// Aim for one round per second. If the round overran the interval
		// (or the clock jumped), fall back to a full one-second pause.
		pause := time.Second - time.Since(roundStart)
		if pause < 0 || pause > time.Second {
			pause = time.Second
		}
		time.Sleep(pause)
	}
}
package main
import (
"errors"
"fmt"
"math"
"net"
"os"
"time"
"golang.org/x/net/icmp"
"golang.org/x/net/ipv4"
)
// seq returns an unbuffered channel that yields an endless run of sequence
// numbers: 0, 1, 2, ... up to math.MaxInt32-1, then starting over at 0.
//
// The numbers come from a goroutine that runs for the life of the process;
// it blocks until each value is received.
func seq() chan int {
	out := make(chan int)
	go func() {
		next := 0
		for {
			// Wrap around before the counter reaches MaxInt32.
			if next == math.MaxInt32 {
				next = 0
			}
			out <- next
			next++
		}
	}()
	return out
}
func ping4(target, source string, sequence chan int) error {
network := "ip4:icmp"
dst := &net.IPAddr{IP: net.ParseIP(target).To4()}
if dst == nil {
return errors.New("Failed to parse target IP address!")
}
protocol := 1 // iana.ProtocolICMP
mtype := ipv4.ICMPTypeEcho
c, err := icmp.ListenPacket(network, source)
if err != nil {
return err
}
defer c.Close()
write_message := icmp.Message{
Type: mtype,
Code: 0,
Body: &icmp.Echo{
ID: os.Getpid() & 0xFFFF, Seq: <- sequence,
Data: []byte("Sentry mode activated."),
},
}
write_bytes, err := write_message.Marshal(nil)
if err != nil {
return err
}
ping_start := time.Now()
if n, err := c.WriteTo(write_bytes, dst); err != nil {
return err
} else if n != len(write_bytes) {
return fmt.Errorf("got %v; want %v", n, len(write_bytes))
}
read_bytes := make([]byte, 1500)
if err := c.SetReadDeadline(time.Now().Add(800 * time.Millisecond)); err != nil {
return err
}
n, peer, err := c.ReadFrom(read_bytes)
if err != nil {
return err
}
ping_latency := time.Now().Sub(ping_start)
read_message, err := icmp.ParseMessage(protocol, read_bytes[:n])
if err != nil {
return err
}
switch read_message.Type {
case ipv4.ICMPTypeEchoReply:
// fmt.Printf("latency: %+v (%+v from %v)\n", ping_latency, read_message, peer)
return nil
case ipv4.ICMPTypeDestinationUnreachable:
return fmt.Errorf("destination %s unreachable", target)
default:
return fmt.Errorf("latency: %+v (got %+v from %v; wanted echo reply)", ping_latency, read_message, peer)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment