Skip to content

Instantly share code, notes, and snippets.

@kolo
Created January 6, 2017 08:23
Show Gist options
  • Save kolo/e2b982a072219d5a5a1a4ecec6496f3b to your computer and use it in GitHub Desktop.
Save kolo/e2b982a072219d5a5a1a4ecec6496f3b to your computer and use it in GitHub Desktop.
Example of using the amboy job queue.
package main
import (
"context"
"fmt"
"os"
"os/signal"
"time"
"golang.org/x/time/rate"
"github.com/mongodb/amboy"
"github.com/mongodb/amboy/job"
"github.com/mongodb/amboy/queue"
)
// main creates a local unordered amboy queue, starts it, launches a
// rate-limited producer that feeds the queue with TimeJob instances, and then
// blocks until the process is interrupted, at which point the shared context
// is cancelled.
func main() {
	ctx, cancel := context.WithCancel(context.Background())

	// Named q rather than queue so the imported queue package is not shadowed
	// for the rest of the function.
	q := queue.NewLocalUnordered(2)
	if err := q.Start(ctx); err != nil {
		// A queue that failed to start cannot accept jobs; bail out instead of
		// silently continuing.
		fmt.Fprintf(os.Stderr, "error: %v\n", err)
		cancel()
		os.Exit(1)
	}

	producer := &JobProducer{
		lim: rate.NewLimiter(1, 1), // one event per second, burst of one
		q:   q,
	}
	producer.Start(ctx)

	// Watch blocks until an interrupt arrives and invokes cancel itself.
	interrupter := &Interrupter{cancel}
	interrupter.Watch()
}
// TimeJob is a job whose Run method prints the current time. It embeds
// job.Base from the amboy job package.
type TimeJob struct {
job.Base
}
// ID returns the current wall-clock time formatted as RFC 3339.
//
// NOTE(review): the value is recomputed on every call, so successive calls on
// the same job can return different strings — confirm this is acceptable for
// the queue consuming these jobs.
func (j *TimeJob) ID() string {
	now := time.Now()
	return now.Format(time.RFC3339)
}
// Run prints the current time in RFC 3339 format to standard output and then
// calls MarkComplete (presumably promoted from the embedded job.Base).
func (j *TimeJob) Run() {
	stamp := time.Now().Format(time.RFC3339)
	fmt.Printf("Time is %s\n", stamp)
	j.MarkComplete()
}
// JobProducer submits TimeJob instances to a queue at a pace governed by a
// rate limiter.
type JobProducer struct {
// lim decides when the next job may be submitted.
lim *rate.Limiter
// q is the queue that receives the produced jobs.
q amboy.Queue
}
// Start launches a goroutine that submits a new TimeJob to the queue each time
// the rate limiter grants a token, and returns immediately. The goroutine
// prints a notice and exits once ctx is cancelled. Errors returned by Put are
// printed and do not stop the producer.
func (p *JobProducer) Start(ctx context.Context) {
	go func() {
		for {
			// Wait blocks until a token is available or ctx is done, so the
			// goroutine sleeps between jobs instead of spinning on Allow.
			if err := p.lim.Wait(ctx); err != nil {
				if ctx.Err() == nil {
					// Not a cancellation: the limiter can never satisfy the
					// request (e.g. zero burst), so retrying would only spin.
					fmt.Printf("error: %v\n", err)
				}
				fmt.Println("Closing job producer")
				return
			}

			if err := p.q.Put(&TimeJob{}); err != nil {
				fmt.Printf("error: %v\n", err)
			}
		}
	}()
}
// Interrupter invokes a context cancel function when the process receives an
// interrupt signal.
type Interrupter struct {
// cancel is called by Watch once an interrupt has been received.
cancel context.CancelFunc
}
// Watch blocks until the process receives an interrupt signal (Ctrl-C), then
// prints a notice and invokes the cancel function.
func (i *Interrupter) Watch() {
	// Buffered with capacity one because signal.Notify never blocks on send;
	// an unbuffered channel could miss a signal delivered before the receive.
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)
	// Stop relaying signals to c once this function is done with it.
	defer signal.Stop(c)

	<-c
	fmt.Println("Ctrl-C...")
	i.cancel()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment