Skip to content

Instantly share code, notes, and snippets.

@scbizu
Last active March 14, 2019 18:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save scbizu/6650f1a286dfb544701552303af3d0a4 to your computer and use it in GitHub Desktop.
Save scbizu/6650f1a286dfb544701552303af3d0a4 to your computer and use it in GitHub Desktop.
A disque lock sample
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/zencoder/disque-go/disque"
)
// main seeds the "resource_semaphore" queue with a single job and then starts
// 20 goroutines that each run hi, blocking until every one of them has
// reported completion.
//
// See what happened in http://ww1.sinaimg.cn/large/005uxZXHly1g12w946z64j3132184k3y.jpg
func main() {
	p, d, err := getDisqueClient()
	if err != nil {
		log.Printf("disque get client failed:%s", err.Error())
		return
	}

	// Push with default settings
	queueName := "resource_semaphore"
	jobDetails := "job"
	timeout := time.Second // take no longer than 1 second to enqueue the message
	_, err = d.Push(queueName, jobDetails, timeout)

	// The connection is only needed for the Push above, so hand it back to the
	// pool right away instead of deferring: log.Fatalf exits the process via
	// os.Exit, which would skip a deferred Put.
	p.Put(d)
	if err != nil {
		log.Fatalf("disque push: %s", err.Error())
	}

	const workers = 20

	// Buffered to the number of workers so no goroutine blocks on its
	// completion signal.
	done := make(chan struct{}, workers)
	for i := 0; i < workers; i++ {
		go func(seq int) {
			log.Printf("Now at %d seq.", seq)
			hi(seq)
			done <- struct{}{}
		}(i)
	}

	// Wait for one completion signal per worker.
	for i := 0; i < workers; i++ {
		<-done
	}
}
// hi is the body of one worker. It obtains its own connection through
// getDisqueClient, polls the "resource_semaphore" queue until a job is
// delivered, prints a greeting containing the job's message, and then NACKs
// the job (presumably so it becomes available to the other workers again —
// confirm against Disque's NACK semantics).
//
// seq identifies the worker in log and greeting output. Failures are logged
// rather than returned.
func hi(seq int) {
	p, d, err := getDisqueClient()
	if err != nil {
		log.Printf("[%d]disque get client failed:%s", seq, err.Error())
		return
	}
	defer p.Put(d) // return a connection to the pool

	// A nil job with a nil error is treated as "nothing delivered within the
	// one-second fetch timeout", so keep polling until a job shows up.
	var job *disque.Job
	for job == nil {
		job, err = d.Fetch("resource_semaphore", time.Second)
		if err != nil {
			log.Printf("[%d]disque fetch failed: %s", seq, err.Error())
			return
		}
	}

	fmt.Printf("[%d]say hi to %s\n", seq, job.Message)

	// Report a failed NACK instead of silently dropping the error.
	if err := d.Nack(job.JobID); err != nil {
		log.Printf("[%d]disque nack failed: %s", seq, err.Error())
	}
}
// getDisqueClient creates a new connection pool for the Disque server at
// 127.0.0.1:7711 and checks one connection out of it.
//
// It returns the pool together with the connection so the caller can hand the
// connection back with Put. If no connection can be obtained, both are nil and
// the error from Get is returned.
func getDisqueClient() (*disque.Pool, *disque.Disque, error) {
	const (
		// check connection stats every 1000 Fetch's
		statsCycle = 1000
		// initial capacity of the pool
		initialSize = 1
		// max capacity that the pool can be resized to
		maxSize = 1
		// timeout for idle connections
		idleLimit = 15 * time.Minute
	)

	// array of 1 or more Disque servers
	servers := []string{"127.0.0.1:7711"}

	pool := disque.NewPool(servers, statsCycle, initialSize, maxSize, idleLimit)

	// get a connection from the pool
	conn, err := pool.Get(context.Background())
	if err != nil {
		return nil, nil, err
	}
	return pool, conn, nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment