Skip to content

Instantly share code, notes, and snippets.

@scttnlsn
Last active July 5, 2023 14:30
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save scttnlsn/5537977 to your computer and use it in GitHub Desktop.
Save scttnlsn/5537977 to your computer and use it in GitHub Desktop.
Queue server in Go
package main
import (
	"bufio"
	"flag"
	"fmt"
	"net"
	"strconv"
	"strings"
	"sync"
	"time"
)
// ----------------------------------------------------------------------------
// Item
// ----------------------------------------------------------------------------
var (
	// itemsMu guards n and items. Each client connection is served on its
	// own goroutine, so NewItem and CompleteItem may run concurrently, and
	// unsynchronized writes to a Go map are a fatal runtime error.
	itemsMu sync.Mutex

	// n is the id that the next call to NewItem will hand out.
	n = 1

	// items holds every item that has been created and not yet completed,
	// keyed by id.
	items = map[int]*Item{}
)

// Item is a single queued value together with its delivery state.
type Item struct {
	// id is the identifier assigned by NewItem.
	id int
	// value is the payload supplied when the item was created.
	value string
	// dequeued reports whether the item is currently handed out to a
	// consumer. It is written by the Queue methods and read by CompleteItem.
	dequeued bool
	// complete receives true once CompleteItem accepts the item.
	complete chan bool
}

// NewItem creates an item holding value, assigns it the next id, registers
// it in items and returns it. It is safe for concurrent use.
func NewItem(value string) *Item {
	itemsMu.Lock()
	defer itemsMu.Unlock()

	item := &Item{
		id:    n,
		value: value,
		// Capacity 1: an item is completed at most once (it is removed from
		// items first), so the completion signal never blocks the sender
		// even when no goroutine is waiting on it at that instant.
		complete: make(chan bool, 1),
	}
	n++
	items[item.id] = item
	return item
}

// CompleteItem marks the item with the given id as done. It returns false
// when the id is unknown or the item is not currently dequeued; otherwise it
// removes the item from items, signals completion and returns true.
func CompleteItem(id int) bool {
	itemsMu.Lock()
	item, ok := items[id]
	if !ok || !item.dequeued {
		itemsMu.Unlock()
		return false
	}
	delete(items, id)
	itemsMu.Unlock()

	// Signal outside the lock so a channel operation never holds up other
	// callers of NewItem or CompleteItem.
	item.Complete()
	return true
}

// Complete sends the completion signal on the item's complete channel.
func (i *Item) Complete() {
	i.complete <- true
}
// ----------------------------------------------------------------------------
// Queue
// ----------------------------------------------------------------------------
// Queue is a channel of items; producers send on it and consumers receive
// from it.
type Queue chan *Item

// NewQueue returns a new, unbuffered Queue.
func NewQueue() Queue {
	q := make(Queue)
	return q
}
// Enqueue clears the item's dequeued flag and then sends the item on the
// queue channel. The send is a plain channel send, so the call does not
// return until the channel accepts the item.
func (q Queue) Enqueue(item *Item) {
	// Reset the flag first: once the send completes the item may already be
	// in a consumer's hands.
	item.dequeued = false

	q <- item
}
// Dequeue receives the next item from the queue.
//
// When block is true the call waits until an item is available. When block
// is false it returns (nil, false) immediately if no item is ready.
//
// On success the item is marked as dequeued, a goroutine running SetTimeout
// is started for it, and (item, true) is returned.
func (q Queue) Dequeue(block bool) (*Item, bool) {
	var item *Item
	if block {
		item = <-q
	} else {
		select {
		case item = <-q:
		default:
			return nil, false
		}
	}

	// Mark the item before returning rather than leaving it solely to the
	// SetTimeout goroutine: CompleteItem rejects items whose dequeued flag
	// is false, so a completion issued right after this call returns must
	// not depend on when that goroutine happens to be scheduled.
	item.dequeued = true
	go q.SetTimeout(item)
	return item, true
}
// SetTimeout marks item as dequeued and waits up to 30 seconds for a signal
// on item.complete. If the signal arrives first it returns; if the timer
// fires first it puts the item back on the queue with Enqueue.
func (q Queue) SetTimeout(item *Item) {
	item.dequeued = true

	// An explicit timer (instead of time.After) lets it be stopped as soon
	// as the item completes, rather than lingering until it would have
	// fired.
	timer := time.NewTimer(30 * time.Second)
	defer timer.Stop()

	select {
	case <-timer.C:
		q.Enqueue(item)
	case <-item.complete:
	}
}
// ----------------------------------------------------------------------------
// Client
// ----------------------------------------------------------------------------
// Client is one connected peer: its connection, a buffered reader over that
// connection, and the queue it operates on.
type Client struct {
	// conn is the underlying network connection; replies are written to it.
	conn net.Conn
	// buffer reads command lines from conn.
	buffer *bufio.Reader
	// queue is the Queue used for this client's enqueue/dequeue commands.
	queue Queue
}
// NewClient wraps conn in a buffered reader and returns a Client bound to
// the queue q.
func NewClient(conn net.Conn, q Queue) *Client {
	return &Client{
		conn:   conn,
		buffer: bufio.NewReader(conn),
		queue:  q,
	}
}
// Handle reads newline-terminated commands from the client and dispatches
// them until the client sends "quit" or a read fails. The connection is
// closed when Handle returns.
//
// Recognized commands:
//
//	enq <value>  enqueue value
//	deq          dequeue without blocking
//	bdeq         dequeue, waiting for an item
//	done <id>    complete the item with the given id
//	quit         close the connection
//
// Anything else is answered with an "ERR ..." line.
func (c *Client) Handle() {
	// Close on every exit path, not only on "quit"; otherwise a peer that
	// simply disconnects leaves the socket open on this side.
	defer c.conn.Close()

	for {
		line, err := c.buffer.ReadString('\n')
		if err != nil {
			return
		}

		// Strip the line terminator, including the "\r" sent by clients
		// that use CRLF line endings, so it does not end up in the command
		// name or the argument.
		tokens := strings.SplitN(strings.TrimRight(line, "\r\n"), " ", 2)

		switch tokens[0] {
		case "enq":
			if len(tokens) < 2 {
				fmt.Fprintf(c.conn, "ERR missing arg\n")
				continue
			}
			c.Enqueue(tokens[1])
		case "deq":
			c.Dequeue(false)
		case "bdeq":
			c.Dequeue(true)
		case "done":
			if len(tokens) < 2 {
				fmt.Fprintf(c.conn, "ERR missing arg\n")
				continue
			}
			id, err := strconv.Atoi(tokens[1])
			if err != nil {
				fmt.Fprintf(c.conn, "ERR invalid id (%s)\n", tokens[1])
				continue
			}
			c.Complete(id)
		case "quit":
			return
		default:
			fmt.Fprintf(c.conn, "ERR unknown command (%s)\n", tokens[0])
		}
	}
}
// Enqueue creates a new item for val, hands it to the queue on a separate
// goroutine, and replies to the client with "<id>: OK".
func (c *Client) Enqueue(val string) {
	queued := NewItem(val)

	// The queue send runs in the background so the reply below is written
	// without waiting for it.
	go c.queue.Enqueue(queued)

	fmt.Fprintf(c.conn, "%d: OK\n", queued.id)
}
// Dequeue takes an item from the queue (waiting for one when block is true)
// and writes it to the client as `<id>: "<value>"`. When no item was
// obtained it writes "null" instead.
func (c *Client) Dequeue(block bool) {
	item, ok := c.queue.Dequeue(block)
	if !ok {
		fmt.Fprintf(c.conn, "null\n")
		return
	}
	fmt.Fprintf(c.conn, "%d: \"%s\"\n", item.id, item.value)
}
// Complete asks CompleteItem to finish the item with the given id and
// reports the outcome to the client: "<id>: OK" on success, an
// "ERR invalid id" line otherwise.
func (c *Client) Complete(id int) {
	if !CompleteItem(id) {
		fmt.Fprintf(c.conn, "ERR invalid id (%d)\n", id)
		return
	}
	fmt.Fprintf(c.conn, "%d: OK\n", id)
}
// ----------------------------------------------------------------------------
// Main
// ----------------------------------------------------------------------------
// port is the TCP port the server listens on; it is bound to the "port"
// command-line flag.
var port uint

// init registers the "port" flag with a default of 8080.
func init() {
	const (
		name     = "port"
		fallback = 8080
		usage    = "the port to listen on"
	)
	flag.UintVar(&port, name, fallback, usage)
}
// main parses the command-line flags, listens for TCP connections on the
// configured port and serves each accepted connection on its own goroutine,
// all sharing a single queue. It prints the error and returns if listening
// or accepting fails.
func main() {
	// Without this call the registered -port flag is never read and the
	// server always listens on the default port.
	flag.Parse()

	q := NewQueue()

	listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
	if err != nil {
		fmt.Println(err.Error())
		return
	}
	defer listener.Close()

	for {
		conn, err := listener.Accept()
		if err != nil {
			fmt.Println(err.Error())
			return
		}
		client := NewClient(conn, q)
		go client.Handle()
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment