Skip to content

Instantly share code, notes, and snippets.

@tanishiking
Last active February 8, 2023 14:21
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tanishiking/be9fac4aa02419d68c6770a85e53c936 to your computer and use it in GitHub Desktop.
Save tanishiking/be9fac4aa02419d68c6770a85e53c936 to your computer and use it in GitHub Desktop.
package main
import (
"context"
"sync"
)
// Dispatcher represents a job dispatcher.
type Dispatcher struct {
sem chan struct{} // semaphore
jobBuffer chan *Job
worker Worker
wg sync.WaitGroup
}
// NewDispatcher will create a new instance of job dispatcher.
// maxWorkers means the maximum number of goroutines that can work concurrently.
// buffers means the maximum size of the queue.
func NewDispatcher(worker Worker, maxWorkers int, buffers int) *Dispatcher {
return &Dispatcher{
// Restrict the number of goroutine using buffered channel (as counting semaphor)
sem: make(chan struct{}, maxWorkers),
jobBuffer: make(chan *Job, buffers),
worker: worker,
}
}
// Start starts a dispatcher.
// This dispatcher will stops when it receive a value from `ctx.Done`.
func (d *Dispatcher) Start(ctx context.Context) {
d.wg.Add(1)
go d.loop(ctx)
}
// Wait blocks until the dispatcher stops.
func (d *Dispatcher) Wait() {
d.wg.Wait()
}
// Add enqueues a job into the queue.
// If the number of enqueued jobs has already reached to the maximum size,
// this will block until the other job has finish and the queue has space to accept a new job.
func (d *Dispatcher) Add(job *Job) {
d.jobBuffer <- job
}
func (d *Dispatcher) stop() {
d.wg.Done()
}
func (d *Dispatcher) loop(ctx context.Context) {
var wg sync.WaitGroup
Loop:
for {
select {
case <-ctx.Done():
// block until all the jobs finishes
wg.Wait()
break Loop
case job := <-d.jobBuffer:
// Increment the waitgroup
wg.Add(1)
// Decrement a semaphore count
d.sem <- struct{}{}
go func(job *Job) {
defer wg.Done()
// After the job finished, increment a semaphore count
defer func() { <-d.sem }()
d.worker.Work(job)
}(job)
}
}
d.stop()
}
package main
// Job represents a interface of job that can be enqueued into a dispatcher.
type Job struct {
URL string
}
package main
import (
"context"
"fmt"
"os"
"os/signal"
"syscall"
)
func main() {
ctx, cancel := context.WithCancel(context.Background())
sigCh := make(chan os.Signal, 1)
defer close(sigCh)
signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGINT)
go func() {
// wait until receiving the signal
<-sigCh
cancel()
}()
p := NewPrinter()
d := NewDispatcher(p, 10, 1000)
d.Start(ctx)
for i := 0; i < 100; i++ {
url := fmt.Sprintf("http://example.com/%d", i)
job := &Job{URL: url}
d.Add(job)
}
d.Wait()
}
package main
import (
"fmt"
"math/rand"
"time"
)
// Printer is a dummy worker that just prints received URL.
type Printer struct{}
func NewPrinter() *Printer {
return &Printer{}
}
// Work waits for a few seconds and print a received URL.
func (p *Printer) Work(j *Job) {
t := time.NewTimer(time.Duration(rand.Intn(5)) * time.Second)
defer t.Stop()
<-t.C
fmt.Println(j.URL)
}
package main
type Worker interface {
Work(j *Job)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment