Skip to content

Instantly share code, notes, and snippets.

@Gitart
Forked from fabrizioc1/redis_failover_proxy.go
Created September 25, 2018 10:15
Show Gist options
  • Save Gitart/8e06fe2e0c57754a2c1a202bc1f32df4 to your computer and use it in GitHub Desktop.
Save Gitart/8e06fe2e0c57754a2c1a202bc1f32df4 to your computer and use it in GitHub Desktop.
Redis Failover Proxy
package main
import (
"fmt"
"log"
"net"
"sync"
"time"
"strings"
"runtime"
)
//var DebugMode bool = len(os.Getenv("FAILOVER_PROXY_DEBUG")) > 0

// pipeIO copies bytes from input to output until a read or a write fails,
// and marks finished as done when it returns.
//
// Before every read, both connections are given a fresh deadline 500ms in
// the future, so a read that does not complete before its deadline also ends
// the loop. Read errors end the loop without logging; write errors are
// logged before returning.
func pipeIO(input net.Conn, output net.Conn, finished *sync.WaitGroup) {
	defer finished.Done()

	const timeout = 500 * time.Millisecond
	chunk := make([]byte, 65536)
	for {
		input.SetDeadline(time.Now().Add(timeout))
		output.SetDeadline(time.Now().Add(timeout))

		n, err := input.Read(chunk)
		if err != nil {
			// Any bytes returned together with an error are not forwarded.
			return
		}
		if _, err = output.Write(chunk[:n]); err != nil {
			log.Printf("pipeIO write error: %v", err)
			return
		}
	}
}
// handleConnection proxies a single accepted connection. It asks for a
// backend connection by sending on requests, receives it from connections,
// and then relays traffic in both directions with two pipeIO goroutines.
// It returns once both goroutines have finished, closing both connections.
func handleConnection(input net.Conn, requests chan bool, connections chan net.Conn) {
	requests <- true
	backend := <-connections

	// Deferred calls run last-in-first-out: backend is closed before input.
	defer input.Close()
	defer backend.Close()

	var relays sync.WaitGroup
	relays.Add(2)
	go pipeIO(input, backend, &relays) // client -> backend
	go pipeIO(backend, input, &relays) // backend -> client
	relays.Wait()
}
// connectionManager hands out backend connections. It keeps the index of the
// current master (initially 0) and, for every value received on requests,
// calls createConnection and sends the resulting connection on connections.
// If createConnection returns an error the process exits via log.Fatalf.
// The loop never returns.
func connectionManager(servers []string, requests chan bool, connections chan net.Conn) {
	current := 0
	log.Printf("connectionManager: master is %s", servers[current])

	for {
		<-requests

		backend, err := createConnection(servers, &current)
		if err != nil {
			log.Fatalf("connectionManager fatal error: %v", err)
		}
		connections <- backend
	}
}
// createConnection dials servers[*master] over TCP and returns the
// connection. Whenever a dial fails, the error is logged and executeFailover
// is called (which moves *master to the next server) before trying again.
// It gives up after len(servers)*2 failed attempts and returns the last dial
// error.
//
// An empty servers slice is reported as an error instead of returning a nil
// connection together with a nil error.
func createConnection(servers []string, master *int) (net.Conn, error) {
	if len(servers) == 0 {
		return nil, fmt.Errorf("createConnection: no servers configured")
	}

	var err error
	for i := 1; i <= len(servers)*2; i++ {
		var conn net.Conn
		conn, err = net.Dial("tcp", servers[*master])
		if err == nil {
			return conn, nil
		}
		log.Printf("createConnection error #%d: %v", i, err)
		executeFailover(servers, master)
	}
	return nil, err
}
// executeFailover advances *master to the next index in servers, wrapping
// around at the end of the slice, calls becomeMaster for that server, and
// logs the new master address.
func executeFailover(servers []string, master *int) {
	next := (*master + 1) % len(servers)
	*master = next

	promoted := servers[next]
	becomeMaster(promoted)
	log.Printf("executeFailover: master changed to %s", promoted)
}
// becomeMaster opens a TCP connection to server, sends the Redis command
// "SLAVEOF NO ONE" over it, and closes the connection. The reply is not
// read. If the dial fails, the process exits via log.Fatalf.
func becomeMaster(server string) {
	conn, dialErr := net.Dial("tcp", server)
	if dialErr != nil {
		log.Fatalf("becomeMaster error: %v", dialErr)
	}

	redisCommand(conn, "SLAVEOF NO ONE")
	// Close only after the command has been written.
	conn.Close()
}
// redisCommand sends command to Redis over conn, encoded as an array of bulk
// strings: "*<count>\r\n" followed by "$<len>\r\n<word>\r\n" per word.
// Protocol details: http://redis.io/topics/protocol
//
// The command is split on runs of whitespace, so repeated, leading or
// trailing spaces do not produce empty arguments. A command with no words
// sends nothing. The whole request goes out in a single write; a write
// failure is logged, since the function has no error return.
func redisCommand(conn net.Conn, command string) {
	words := strings.Fields(command)
	if len(words) == 0 {
		return
	}

	req := []byte(fmt.Sprintf("*%d\r\n", len(words)))
	for _, word := range words {
		req = append(req, fmt.Sprintf("$%d\r\n%s\r\n", len(word), word)...)
	}

	if _, err := conn.Write(req); err != nil {
		log.Printf("redisCommand write error: %v", err)
	}
}
// const MaxConnections = 5

// main runs the failover proxy. It listens on TCP port 7000, starts
// connectionManager for the backends localhost:7001 and localhost:7002, and
// serves every accepted connection in its own handleConnection goroutine.
// A listen failure terminates the process; accept failures are logged and
// the loop keeps accepting.
func main() {
	requests := make(chan bool)
	connections := make(chan net.Conn)

	log.Printf("NumCPU: %d", runtime.NumCPU())
	log.Printf("GOMAXPROCS: %d", runtime.GOMAXPROCS(0))

	ln, err := net.Listen("tcp", ":7000")
	if err != nil {
		// Fatalf rather than Fatal, so that %v is actually expanded.
		log.Fatalf("main listen error: %v", err)
	}

	servers := []string{"localhost:7001", "localhost:7002"}
	go connectionManager(servers, requests, connections)

	for {
		conn, err := ln.Accept()
		if err != nil {
			log.Printf("main accept error: %v", err)
			continue
		}
		go handleConnection(conn, requests, connections)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment