Skip to content

Instantly share code, notes, and snippets.

@icub3d
Last active October 11, 2016 14:59
Show Gist options
  • Star 9 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save icub3d/7505798 to your computer and use it in GitHub Desktop.
Save icub3d/7505798 to your computer and use it in GitHub Desktop.
An example of using doozer to keep track of your groupcache instances. If you want to run it yourself, first fetch the dependencies: `go get github.com/golang/groupcache` and `go get github.com/ha/doozer`.
package main
import (
"bufio"
"bytes"
"flag"
"fmt"
"github.com/golang/groupcache"
"github.com/ha/doozer"
"io"
"log"
"net"
"net/http"
"os"
"os/signal"
"path"
"strings"
"sync/atomic"
)
// Process-wide configuration and cache state. The four strings are
// filled in from command-line flags registered in init; pool and dict
// are assigned once in main.
var (
// addr is the host:port the HTTP server listens on. "http://"+addr is
// also the URL this instance advertises in the peer list.
addr string // The address of our httpd server.
// daddr is the host:port passed to doozer.Dial.
daddr string // The address of the doozer server.
// dictName is the database name sent in the DEFINE request.
dictName string // The name of the dictionary.
// dictAddr is the host:port of the dict server that query dials.
dictAddr string // The address of the dictionary server.
// This is our groupcache stuff.
// pool is the HTTP peer pool whose peer list watch keeps up to date.
pool *groupcache.HTTPPool
// dict is the cache group that handler reads definitions from.
dict *groupcache.Group
)
// init registers the command-line flags that configure the server and
// binds each one to its package-level variable, together with the
// default used when the flag is not given.
func init() {
	options := []struct {
		target *string
		name   string
		value  string
		usage  string
	}{
		{&addr, "addr", "127.0.0.1:8000", "the addr:port on which this server should run."},
		{&daddr, "doozer", "127.0.0.1:8046", "the addr:port on which doozer is running."},
		{&dictName, "dictname", "gcide", "the name of the dictionary to query."},
		{&dictAddr, "dictaddr", "dict.org:2628", "the addr:port to the dict server to query."},
	}
	for _, opt := range options {
		flag.StringVar(opt.target, opt.name, opt.value, opt.usage)
	}
}
// main parses the flags, connects to doozer, creates the groupcache
// peer pool and the "dict" group, starts the peer watcher in the
// background and then serves definition requests on addr until the
// HTTP server stops.
func main() {
	flag.Parse()

	// Setup the doozer connection.
	d, err := doozer.Dial(daddr)
	if err != nil {
		log.Fatalf("connecting to doozer: %v\n", err)
	}
	defer d.Close()

	// Setup the cache. On a cache miss the group calls the getter
	// below, which fetches the definition from the remote dict server.
	pool = groupcache.NewHTTPPool("http://" + addr)
	dict = groupcache.NewGroup("dict", 64<<20, groupcache.GetterFunc(
		func(ctx groupcache.Context, key string, dest groupcache.Sink) error {
			def, err := query(key)
			if err != nil {
				err = fmt.Errorf("querying remote dictionary: %v", err)
				log.Println(err)
				return err
			}
			log.Println("retrieved remote definition for", key)
			// Propagate a failure to populate the sink instead of
			// reporting success with nothing stored.
			return dest.SetString(def)
		}))

	// Start watching for changes and signals.
	go watch(d)

	// Add the handler for definition requests and then start the
	// server. ListenAndServe only returns on error, which is logged.
	http.Handle("/define/", http.HandlerFunc(handler))
	log.Println(http.ListenAndServe(addr, nil))
}
// watch updates the peer list of servers based on changes to the
// doozer configuration or signals from the OS.
//
// It reads the current peer list from the "/peers" doozer file, makes
// sure this server ("http://"+addr) is on it, writes the list back and
// hands it to the groupcache pool. It then loops forever: peer lists
// received from wait replace the pool's peers, and an interrupt signal
// removes this server from the list and exits the process. If the
// server cannot be registered, watch logs the failure and returns.
func watch(d *doozer.Conn) {
	peerFile := "/peers"
	self := "http://" + addr
	var peers []string
	var rev int64

	// Run the initial get.
	data, rev, err := d.Get(peerFile, nil)
	if err != nil {
		log.Println("initial peer list get:", err)
		log.Println("using empty set to start")
	} else {
		// Fields rather than Split: an empty or padded file must not
		// produce empty-string peers.
		peers = strings.Fields(string(data))
	}

	// Add myself to the list, unless a previous run that did not shut
	// down cleanly already left this address there.
	if !containsPeer(peers, self) {
		peers = append(peers, self)
	}
	rev, err = d.Set(peerFile, rev,
		[]byte(strings.Join(peers, " ")))
	if err != nil {
		log.Println("unable to add myself to the peer list (no longer watching):", err)
		return
	}
	pool.Set(peers...)
	log.Println("added myself to the peer list.")

	// Setup signal handling to deal with ^C. os.Kill is deliberately
	// not requested: SIGKILL cannot be trapped by a program.
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, os.Interrupt)

	// Get the channel that's listening for changes. From here on rev is
	// updated atomically by wait's goroutine.
	updates := wait(d, peerFile, &rev)
	for {
		select {
		case <-sigs:
			// Remove ourselves from the peer list and exit. Filtering
			// in place drops every occurrence without modifying the
			// slice while it is being indexed.
			remaining := peers[:0]
			for _, peer := range peers {
				if peer != self {
					remaining = append(remaining, peer)
				}
			}
			if len(remaining) != len(peers) {
				// Atomic load pairs with the atomic update in wait.
				_, err := d.Set(peerFile, atomic.LoadInt64(&rev),
					[]byte(strings.Join(remaining, " ")))
				if err != nil {
					log.Println("unable to remove myself from the peer list:", err)
				} else {
					log.Println("removed myself from peer list before exiting.")
				}
			}
			os.Exit(1)
		case update, ok := <-updates:
			// If the channel was closed, we should stop selecting on it.
			if !ok {
				updates = nil
				continue
			}
			// Otherwise, update the peer list.
			peers = update
			log.Println("got new peer list:", strings.Join(peers, " "))
			pool.Set(peers...)
		}
	}
}

// containsPeer reports whether peer is an element of peers.
func containsPeer(peers []string, peer string) bool {
	for _, p := range peers {
		if p == peer {
			return true
		}
	}
	return false
}
// wait waits for changes to the given file, starting after the given
// revision, on the given doozer connection. It sends updated peer
// lists on the returned channel.
//
// A background goroutine repeatedly waits for the next revision of the
// file. After each change it atomically advances *rev (only if *rev
// still holds the revision this goroutine last saw) and sends the
// whitespace-separated entries of the new body. If waiting fails, the
// error is logged and the channel is closed.
func wait(d *doozer.Conn, file string, rev *int64) chan []string {
	c := make(chan []string, 1)
	cur := *rev
	go func() {
		for {
			// Wait for the change.
			e, err := d.Wait(file, cur+1)
			if err != nil {
				log.Println("waiting failed (no longer watching):", err)
				close(c)
				return
			}
			// Update the revision and send the change on the channel.
			atomic.CompareAndSwapInt64(rev, cur, e.Rev)
			cur = e.Rev
			// Fields rather than Split: an empty body must yield an
			// empty peer list, not a single empty-string peer.
			c <- strings.Fields(string(e.Body))
		}
	}()
	return c
}
// handler handles all incoming requests for a definition.
func handler(w http.ResponseWriter, r *http.Request) {
log.Println("received request:", r.Method, r.URL.Path)
word := strings.Trim(path.Base(r.URL.Path), "/")
// Get the definition from groupcache and write it out.
var data []byte
err := dict.Get(nil, word, groupcache.AllocatingByteSliceSink(&data))
if err != nil {
log.Println("retreiving definition for", word, "-", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
io.Copy(w, bytes.NewReader(data))
}
// query is a helper function for the groupcache that queries a remote
// dict server for the first definition of the given word.
//
// It dials dictAddr, sends a DEFINE command for word in the dictName
// database and collects the reply's text lines up to the first "." or
// blank line. It returns an error, and no definition, when the word is
// empty or contains control characters, when connecting, sending or
// reading fails, or when the server answers with a 4xx/5xx status
// before any definition text.
func query(word string) (string, error) {
	// NOTE: I am aware this is brittle and doesn't really follow the
	// protocol all that well.

	// The word is written verbatim into a CRLF-terminated command line,
	// so anything containing control characters (CR/LF in particular)
	// could smuggle extra commands to the dict server.
	if word == "" {
		return "", fmt.Errorf("invalid word %q: empty", word)
	}
	if strings.IndexFunc(word, isControlRune) >= 0 {
		return "", fmt.Errorf("invalid word %q: contains control characters", word)
	}

	conn, err := net.Dial("tcp", dictAddr)
	if err != nil {
		return "", fmt.Errorf("connecting to dict: %v", err)
	}
	defer conn.Close()

	// Send the DEFINE request and read the response into a buffer.
	if _, err := fmt.Fprintf(conn, "DEFINE %s %s\r\n", dictName, word); err != nil {
		return "", fmt.Errorf("sending DEFINE request: %v", err)
	}
	scanner := bufio.NewScanner(conn)
	var response bytes.Buffer
	for scanner.Scan() {
		// Read the line, trim any excess new lines
		line := strings.Trim(scanner.Text(), "\r\n")
		if response.Len() == 0 && isErrorStatus(line) {
			// A negative reply (e.g. no match, invalid database) is not
			// a definition; returning it as one would get it cached.
			return "", fmt.Errorf("dict server error: %s", line)
		}
		if strings.HasPrefix(line, "2") || strings.HasPrefix(line, "1") {
			// Skip over any control data.
			continue
		}
		if line == "." || line == "" {
			// Quit when we reach the end of the first definition.
			break
		}
		// Store the line we just read.
		response.WriteString(line)
		response.WriteString("\n")
	}
	// Check for errors after the scan.
	if err := scanner.Err(); err != nil {
		return "", fmt.Errorf("reading line from connection: %v", err)
	}

	// Send the QUIT message and return the definition. QUIT is a
	// courtesy only; the connection is closed regardless, so a failed
	// write here is deliberately ignored.
	fmt.Fprintf(conn, "QUIT\r\n")
	return response.String(), nil
}

// isControlRune reports whether r is an ASCII control character.
func isControlRune(r rune) bool {
	return r < ' ' || r == 0x7f
}

// isErrorStatus reports whether line starts with a three-digit 4xx or
// 5xx status code followed by a space or the end of the line.
func isErrorStatus(line string) bool {
	if len(line) < 3 || (line[0] != '4' && line[0] != '5') {
		return false
	}
	for i := 1; i < 3; i++ {
		if line[i] < '0' || line[i] > '9' {
			return false
		}
	}
	return len(line) == 3 || line[3] == ' '
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment