Skip to content

Instantly share code, notes, and snippets.

@jhartman86
Created June 6, 2018 19:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jhartman86/9fe89a4ee0ece98b27d3274f171024f6 to your computer and use it in GitHub Desktop.
optimistic fan-out concurrency job queue
package queue
import (
"fmt"
"log"
"sync"
"time"
"github.com/jhartman86/conduit/pkg/system/database"
)
// Worker is the consumer callback invoked once per dequeued job. It
// receives the job payload and returns a human-readable result message
// and/or an error. NOTE(review): as of next(), both return values are
// currently discarded by the caller.
type Worker func(p interface{}) (msg string, err error)
// Config holds the user-supplied settings for a queue created via New.
type Config struct {
// Key identifies this queue; used for logging and stored on each WorkUnit.
Key string
// Concurrency is the number of worker goroutines spawned per iteration.
// Must be >= 1 (enforced by New).
Concurrency int
// BatchLoadSize is the number of jobs loaded (and the channel buffer
// size) per iteration.
BatchLoadSize int
// Interval is the pause between iterations, in seconds (multiplied by
// time.Second in next()).
Interval int
// Consumer is the Worker func invoked for every job in the batch.
Consumer Worker
// notice this is *unexported*; its set internally to true only
// if the configuration is valid (and defaults to false)
valid bool
}
/*
kue is an encompassing struct that contains all the settings and
handlers for running a queue. It is unexported; instances are
produced by New.
*/
type kue struct {
// config is the validated Config passed to New.
config Config
// running reports whether the next() loop should keep iterating;
// guarded by locker.
running bool
// iterations counts completed next() passes. NOTE(review): written
// without holding locker in next() — known race, see the @todo there.
iterations int
// locker guards running (and should guard iterations).
locker sync.RWMutex
}
/*
WorkUnit would otherwise be an unexported type, if we didn't need to use it
during database migrations for the models setup; it should be considered a
representation for *internal* payloads used by the Queue.
*/
type WorkUnit struct {
// ID is the primary key; defaulted server-side from a v4 UUID.
ID string `gorm:"primary_key;default:from_uuid(uuid_v4())"`
// Status is the job's lifecycle state (indexed). NOTE(review): the
// set of valid status values is not visible in this file — confirm.
Status string `gorm:"not null;index"`
// Key is the owning queue's Config.Key (indexed).
Key string `gorm:"not null;index"`
// Payload is the job body handed to the Worker.
Payload string `gorm:"not null"`
// ResponseMessage stores the Worker's msg result; NULL until set.
ResponseMessage database.NullString `gorm:"default:null"`
// ResponseError stores the Worker's error text; NULL until set.
ResponseError database.NullString `gorm:"default:null"`
CreatedAt time.Time
UpdatedAt time.Time
}
/*
New returns a configured kue, but does error checking first to ensure
the configs are valid. Concurrency must be at least 1; on violation the
zero-value kue and an error are returned. On success the unexported
valid flag is set so Start will accept the queue.

NOTE(review): kue embeds a sync.RWMutex, so returning it by value is
flagged by `go vet` (copylocks); harmless here because the mutex is
unused before the copy, but returning *kue would be cleaner — kept
as-is to preserve the public interface.
*/
func New(c Config) (k kue, err error) {
	if c.Concurrency < 1 {
		// Error strings are lowercase without punctuation per Go convention.
		err = fmt.Errorf(`queues cannot be created with concurrency less than 1`)
		return
	}
	// Mark the config as validated; Start refuses configs without this.
	c.valid = true
	k.config = c
	return
}
/*
Start is the public entry to begin the long running blocking loop.
It refuses to run a queue whose configuration was not validated by
New, and refuses to start twice. The "already running?" check and the
flag flip happen inside a single critical section: the original
IsRunning() + setRunning(true) sequence left a window in which two
concurrent Start calls could both proceed.
*/
func (k *kue) Start() error {
	if !k.config.valid {
		return fmt.Errorf(`queue with invalid configuration will not be started`)
	}
	// Atomic check-and-set of the running flag.
	k.locker.Lock()
	if k.running {
		k.locker.Unlock()
		return fmt.Errorf(`queue is already running`)
	}
	k.running = true
	k.locker.Unlock()
	// Block in the iteration loop until Stop() is observed.
	return k.next()
}
/*
Stop clears the running flag, which has the positive consequence of
letting any currently in-flight next() iteration finish its batch
before the loop observes the flag and exits.
*/
func (k *kue) Stop() {
	k.locker.Lock()
	k.running = false
	k.locker.Unlock()
}
/*
IsRunning atomically checks whether the queue should continue
running; it *does not* check whether there is a currently in
flight - but as of yet unfinished - set of workers running in
the current next() iteration.

Fixed: this is a pure read, so it now takes the read lock (RLock)
instead of the exclusive write lock — concurrent IsRunning callers
no longer serialize against each other.
*/
func (k *kue) IsRunning() bool {
	k.locker.RLock()
	defer k.locker.RUnlock()
	return k.running
}
/*
setRunning assigns the running flag under the write lock. Only used
internally; kept as a method so the locking mechanics stay in one
place.
*/
func (k *kue) setRunning(to bool) {
	k.locker.Lock()
	k.running = to
	k.locker.Unlock()
}
/*
next runs the iteration loop, each pass:
  - Fanning out to the number of goroutines set by the concurrency level
  - Each routine ranges over the pipeline channel, handling jobs by
    invoking the Consumer func passed to the Kue config
  - Waiting until all goroutines have completed
  - ... pausing for Config.Interval seconds ... then iterating again

Fixed vs. the original:
  - the recursive self-call is now a for-loop (Go does not eliminate
    tail calls, so a long-lived queue would grow the stack unboundedly);
  - the iteration counter is incremented under the existing locker,
    resolving the @todo race note;
  - the Worker's (msg, err) results are no longer silently discarded —
    they are logged (persisting them to WorkUnit remains a @todo).
*/
func (k *kue) next() error {
	for {
		// Increment the iteration counter under the lock (was a data race).
		k.locker.Lock()
		k.iterations++
		iteration := k.iterations
		k.locker.Unlock()
		log.Printf("\n\n--- Queue: %s [iteration: %d] ---\n\n", k.config.Key, iteration)

		// Wait group ensures all workers finish before the next pass.
		var wg sync.WaitGroup
		// Buffered pipeline of *up to* the batch size to send jobs into.
		pipeline := make(chan WorkUnit, k.config.BatchLoadSize)

		// Fan out Config.Concurrency workers; each drains the pipeline
		// until it is closed by batchLoader, then signals Done.
		for i := 1; i <= k.config.Concurrency; i++ {
			wg.Add(1)
			// Pass the consumer and channel as arguments for explicit scoping.
			go func(w Worker, pl <-chan WorkUnit) {
				defer wg.Done()
				for job := range pl {
					// @todo: persist msg/err onto the WorkUnit record;
					// for now surface them in the log instead of dropping them.
					if msg, err := w(job.Payload); err != nil {
						log.Printf("queue %s: job failed: %v", k.config.Key, err)
					} else if msg != "" {
						log.Printf("queue %s: job ok: %s", k.config.Key, msg)
					}
				}
			}(k.config.Consumer, pipeline)
		}

		// Workers are non-blocking; feed the pipeline (loader closes it).
		k.batchLoader(pipeline)
		// Wait for all routines to complete this batch.
		wg.Wait()

		// Stop() may have been called while workers were busy; exit
		// without sleeping if so.
		if !k.IsRunning() {
			return nil
		}
		// Take a break for the configured interval (seconds).
		time.Sleep(time.Duration(k.config.Interval) * time.Second)
	}
}
/*
batchLoader is in charge of querying for the next set of things to
work on. It currently emits BatchLoadSize placeholder payloads, then
closes the channel so ranging workers can drain and exit.
*/
func (k *kue) batchLoader(c chan WorkUnit) {
	defer close(c)
	for n := 0; n < k.config.BatchLoadSize; n++ {
		c <- WorkUnit{Payload: fmt.Sprintf(`id-%d`, n+1)}
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment