Skip to content

Instantly share code, notes, and snippets.

Last active February 8, 2023 14:21
Show Gist options
  • Save tanishiking/be9fac4aa02419d68c6770a85e53c936 to your computer and use it in GitHub Desktop.
Save tanishiking/be9fac4aa02419d68c6770a85e53c936 to your computer and use it in GitHub Desktop.
package main
import (
// Dispatcher represents a job dispatcher.
type Dispatcher struct {
sem chan struct{} // semaphore
jobBuffer chan *Job
worker Worker
wg sync.WaitGroup
// NewDispatcher creates a new job dispatcher.
// maxWorkers means the maximum number of goroutines that can work concurrently.
// buffers means the maximum size of the queue.
func NewDispatcher(worker Worker, maxWorkers int, buffers int) *Dispatcher {
return &Dispatcher{
// Restrict the number of goroutines using a buffered channel (as a counting semaphore)
sem: make(chan struct{}, maxWorkers),
jobBuffer: make(chan *Job, buffers),
worker: worker,
// Start starts a dispatcher.
// The dispatcher stops when it receives a value from `ctx.Done`.
func (d *Dispatcher) Start(ctx context.Context) {
go d.loop(ctx)
// Wait blocks until the dispatcher stops.
func (d *Dispatcher) Wait() {
// Add enqueues a job into the queue.
// If the number of enqueued jobs has already reached the maximum size,
// this blocks until another job has finished and the queue has space to accept a new job.
func (d *Dispatcher) Add(job *Job) {
d.jobBuffer <- job
func (d *Dispatcher) stop() {
func (d *Dispatcher) loop(ctx context.Context) {
var wg sync.WaitGroup
for {
select {
case <-ctx.Done():
// block until all the jobs finish
break Loop
case job := <-d.jobBuffer:
// Increment the waitgroup
// Decrement the semaphore count (acquire a slot; blocks while the semaphore channel is full)
d.sem <- struct{}{}
go func(job *Job) {
defer wg.Done()
// After the job has finished, increment the semaphore count (release the slot)
defer func() { <-d.sem }()
package main
// Job represents a job that can be enqueued into a dispatcher.
type Job struct {
URL string
package main
import (
func main() {
ctx, cancel := context.WithCancel(context.Background())
sigCh := make(chan os.Signal, 1)
defer close(sigCh)
signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGINT)
go func() {
// wait until receiving the signal
p := NewPrinter()
d := NewDispatcher(p, 10, 1000)
for i := 0; i < 100; i++ {
url := fmt.Sprintf("", i)
job := &Job{URL: url}
package main
import (
// Printer is a dummy worker that just prints the received URL.
type Printer struct{}
func NewPrinter() *Printer {
return &Printer{}
// Work waits for a few seconds and prints the received URL.
func (p *Printer) Work(j *Job) {
t := time.NewTimer(time.Duration(rand.Intn(5)) * time.Second)
defer t.Stop()
package main
type Worker interface {
Work(j *Job)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment