-
-
Save Gitart/8e06fe2e0c57754a2c1a202bc1f32df4 to your computer and use it in GitHub Desktop.
Redis Failover Proxy
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"log" | |
"net" | |
"sync" | |
"time" | |
"strings" | |
"runtime" | |
) | |
//var DebugMode bool = len(os.Getenv("FAILOVER_PROXY_DEBUG")) > 0 | |
func pipeIO(input net.Conn, output net.Conn, finished *sync.WaitGroup) { | |
defer finished.Done() | |
var buf [65536]byte | |
for { | |
input.SetDeadline(time.Now().Add(500 * time.Millisecond)) | |
output.SetDeadline(time.Now().Add(500 * time.Millisecond)) | |
bytes_read, read_err := input.Read(buf[0:]) | |
if read_err != nil { | |
return | |
} | |
//fmt.Println(string(buf[0:])) | |
_, write_err := output.Write(buf[0:bytes_read]) | |
if write_err != nil { | |
log.Printf("pipeIO write error: %v",write_err) | |
return | |
} | |
} | |
} | |
func handleConnection(input net.Conn, requests chan bool, connections chan net.Conn) { | |
requests <- true | |
output := <- connections | |
defer input.Close() | |
defer output.Close() | |
var finished sync.WaitGroup | |
finished.Add(2) | |
go pipeIO(input,output,&finished) | |
go pipeIO(output,input,&finished) | |
finished.Wait() | |
//log.Printf("handleConnection: finished") | |
} | |
func connectionManager(servers []string, requests chan bool, connections chan net.Conn) { | |
master := 0 | |
log.Printf("connectionManager: master is %s", servers[master]) | |
for { | |
select { | |
case <- requests: | |
conn, err := createConnection(servers,&master) | |
if err != nil { | |
log.Fatalf("connectionManager fatal error: %v",err) | |
} | |
connections <- conn | |
} | |
} | |
} | |
// | |
// Failover to next server if connection can not be established | |
// Give up after a number of attempts | |
// | |
func createConnection(servers []string, master *int) (net.Conn, error) { | |
var output net.Conn | |
var err error | |
for i:=1; i<=len(servers)*2; i++ { | |
output, err = net.Dial("tcp", servers[*master]) | |
if err == nil { | |
return output, err | |
} | |
log.Printf("createConnection error #%d: %v",i,err) | |
executeFailover(servers,master) | |
} | |
return output, err | |
} | |
func executeFailover(servers []string, master *int) { | |
*master = (*master + 1) % len(servers) | |
becomeMaster(servers[*master]) | |
log.Printf("executeFailover: master changed to %s", servers[*master]) | |
} | |
func becomeMaster(server string) { | |
conn, err := net.Dial("tcp", server) | |
if err != nil { | |
log.Fatalf("becomeMaster error: %v",err) | |
} | |
redisCommand(conn,"SLAVEOF NO ONE") | |
defer conn.Close() | |
} | |
// | |
// Send a command to Redis | |
// Protocol details: http://redis.io/topics/protocol | |
// | |
func redisCommand(conn net.Conn, command string) { | |
words := strings.Split(command," ") | |
fmt.Fprintf(conn, "*%d\r\n",len(words)) | |
for _, word := range words { | |
fmt.Fprintf(conn, "$%d\r\n%s\r\n",len(word),word) | |
} | |
} | |
// const MaxConnections = 5 | |
func main() { | |
requests := make(chan bool) | |
connections := make(chan net.Conn) | |
log.Printf("NumCPU: %d",runtime.NumCPU()) | |
log.Printf("GOMAXPROCS: %d",runtime.GOMAXPROCS(0)) | |
ln, err := net.Listen("tcp", ":7000") | |
if err != nil { | |
log.Fatal("main listen error: %v",err) | |
} | |
servers := []string{"localhost:7001", "localhost:7002"} | |
go connectionManager(servers, requests, connections) | |
for { | |
conn, err := ln.Accept() | |
if err != nil { | |
log.Printf("main accept error: %v",err) | |
continue | |
} | |
go handleConnection(conn, requests, connections) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment