Skip to content

Instantly share code, notes, and snippets.

@albertoleal
Last active June 9, 2016 12:12
Show Gist options
  • Save albertoleal/7c4e05efe45883ce91cbbae8b9cf46c5 to your computer and use it in GitHub Desktop.
Save albertoleal/7c4e05efe45883ce91cbbae8b9cf46c5 to your computer and use it in GitHub Desktop.
Gocached with channels
type Server interface {
Running() bool
Start() error
Stop() error
}
type cache struct {
MagicNumber byte
MessageType byte
KeyLength uint16
ExtrasLength byte
DataType byte
}
type cacheRequest struct {
cache
ReservedField uint16
TotalMessageBody uint32
Opaque uint32
Cas uint64
}
type cacheResponse struct {
cache
Status uint16
BodyLength uint32
Opaque uint32
Cas uint64
}
type tcpServer struct {
host string
port int32
responseCh <-chan cacheResponse
closing chan chan error
server net.Listener
storage store.Storage
running bool
}
type request struct {
connection net.Conn
request cacheRequest
}
func (s *tcpServer) Start() error {
s.responseCh = make(chan cacheResponse)
...
go acceptClients()
}
func (s *tcpServer) Stop() error {
errCh := make(chan error)
s.closing <- errCh
return <-errCh
}
func (s *tcpServer) acceptClients() {
for {
go s.handleConnection()
conn, _ := s.server.Accept()
s.connections <- conn
}
}
func (s *tcpServer) handleConnection() {
var err error
for {
select {
case connection := <-s.connections:
go s.handleRequest(connection)
case request := <-s.responseCh:
s.respondTo(request)
case errCh := <-s.closing:
errCh <- err
close(s.closing)
s.running = false
return
}
}
}
func (s *tcpServer) handleRequest(connection net.Conn) {
var req cacheRequest
err := binary.Read(connection, binary.BigEndian, &req)
if err == io.EOF {
return
}
if err != nil {
fmt.Println("binary.Read failed:", err)
return
}
extras := make([]byte, int32(req.ExtrasLength), int32(req.ExtrasLength))
if req.ExtrasLength > 0 {
_, err = io.ReadFull(connection, extras)
if err != nil {
fmt.Println("reading extras failed:", err)
return
}
}
s.requestCh <- request{
connection: connection,
request: req,
}
}
func (s *tcpServer) respondTo(req cacheRequest) {
keyBytes := make([]byte, req.request.KeyLength, req.request.KeyLength)
if _, err := io.ReadFull(req.connection, keyBytes); err != nil {
fmt.Println("reading key failed:", err)
return
}
key := string(keyBytes)
resp := prepareResponse(&req.request)
switch req.request.MessageType {
case messageGet:
if !s.handleGet(req.connection, resp, key) {
break
}
case messageSet:
if !s.handleSet(req.connection, &req.request, resp, key) {
break
}
case messageDelete:
if !s.handleDelete(req.connection, resp, key) {
break
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment