Skip to content

Instantly share code, notes, and snippets.

@CAFxX
Created August 25, 2021 00:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save CAFxX/6957aabdd929231c9785575e9d13d606 to your computer and use it in GitHub Desktop.
Save CAFxX/6957aabdd929231c9785575e9d13d606 to your computer and use it in GitHub Desktop.
package main
import "sync"
import "fmt"
// Work is a single unit of work that can be handed to a Queue.
type Work interface {
	// Process carries out the work.
	Process()
}

// WorkFunc adapts an ordinary function so that it satisfies Work.
type WorkFunc func()

// Process calls the wrapped function.
func (f WorkFunc) Process() { f() }
// Order selects which end of the pending queue workers take items from.
type Order uint8

// Supported orderings. Unspecified is the zero value of Order.
const (
	Unspecified Order = 0
	FIFO        Order = 1
	LIFO        Order = 2
)
// Queue runs enqueued work items on worker goroutines that are started on
// demand and exit as soon as no work is pending.
//
// The zero value is ready to use. A Queue must not be copied after first use,
// and the exported fields must not be changed once Enqueue has been called.
type Queue[W Work] struct {
	// MaxWorkers caps the number of worker goroutines alive at any time.
	// When it is zero or negative every Enqueue starts a new worker.
	MaxWorkers int
	// MaxQueue caps the number of pending items; Enqueue blocks while that
	// many items are waiting. Zero or negative means no limit.
	MaxQueue int
	// Order selects which item a worker takes next: FIFO takes the oldest
	// one, any other value (including Unspecified) takes the newest one.
	Order Order

	l    sync.Mutex     // guards nwrk, q and c.L
	c    sync.Cond      // signalled each time a worker frees a queue slot
	nwrk int            // number of live worker goroutines
	q    queue[W]       // pending work items
	wg   sync.WaitGroup // holds one count per live worker goroutine
}

// Enqueue adds w to the queue, blocking while the queue already holds
// MaxQueue items. If fewer than MaxWorkers workers are alive (or MaxWorkers
// is not positive) a new worker goroutine is started.
func (q *Queue[W]) Enqueue(w W) {
	q.l.Lock()
	if q.c.L == nil {
		// Bind the condition variable exactly once: Cond.Wait reads c.L
		// without holding the lock, so rewriting it later would be a race.
		q.c.L = &q.l
	}
	for q.MaxQueue > 0 && q.q.len() >= q.MaxQueue {
		q.c.Wait()
	}
	q.q.pushBack(w)
	if q.MaxWorkers > 0 && q.nwrk >= q.MaxWorkers {
		// Enough workers are alive; each of them re-checks the queue under
		// the lock before exiting, so one of them will pick this item up.
		q.l.Unlock()
		return
	}
	// Account for the new worker before releasing the lock so concurrent
	// Enqueue calls cannot overshoot MaxWorkers.
	q.nwrk++
	q.wg.Add(1)
	q.l.Unlock()
	go q.work()
}

// work is the body of a worker goroutine: it processes pending items until
// the queue is empty and then unregisters itself.
func (q *Queue[W]) work() {
	q.l.Lock()
	locked := true
	defer func() {
		// The lock is not held here only when Process did not return
		// normally (it panicked or ended the goroutine).
		if !locked {
			q.l.Lock()
		}
		q.nwrk--
		q.wg.Done()
		q.l.Unlock()
	}()
	for q.q.len() > 0 {
		var w W
		// The ok result is ignored: the loop condition, checked under the
		// lock, guarantees the queue is not empty.
		if q.Order == FIFO {
			w, _ = q.q.popFront()
		} else {
			w, _ = q.q.popBack()
		}
		if q.MaxQueue > 0 {
			// Every pop frees a slot, so wake one blocked producer every
			// time. Signalling only when the queue was full can strand
			// producers when several pops happen before a woken producer
			// gets to run.
			q.c.Signal()
		}
		q.l.Unlock()
		locked = false
		w.Process()
		q.l.Lock()
		locked = true
	}
}

// Wait blocks until every worker goroutine started so far has exited.
func (q *Queue[W]) Wait() {
	q.wg.Wait()
}
// queue is a slice-backed double-ended queue. Elements are appended at the
// back and may be removed from either end. The backing slice doubles when it
// fills up and is reallocated at a quarter of its length when fewer than an
// eighth of its slots are in use. It is not safe for concurrent use.
type queue[T any] struct {
	q []T // backing storage; the live elements are q[r:w]
	r int // index of the front element
	w int // index one past the back element
}

// len returns the number of elements currently stored.
func (q *queue[T]) len() int {
	return q.w - q.r
}

// pushBack appends v at the back of the queue.
func (q *queue[T]) pushBack(v T) {
	if q.r == q.w {
		// Empty: restart from the beginning of the storage.
		q.r, q.w = 0, 0
	}
	if q.w >= len(q.q) {
		n := q.w - q.r
		if n < len(q.q)/2 {
			// Less than half of the storage is live: slide the elements to
			// the front instead of growing.
			copy(q.q, q.q[q.r:q.w])
			// Clear the vacated slots so that they do not keep copies of
			// the moved elements reachable after those are popped.
			var zero T
			for i := n; i < q.w; i++ {
				q.q[i] = zero
			}
		} else {
			l := len(q.q)
			if l == 0 {
				l = 1
			}
			qn := make([]T, l*2)
			copy(qn, q.q[q.r:q.w])
			q.q = qn
		}
		q.r, q.w = 0, n
	}
	q.q[q.w] = v
	q.w++
}

// popFront removes and returns the oldest element. The boolean is false, and
// the value is the zero value of T, when the queue is empty.
func (q *queue[T]) popFront() (T, bool) {
	var zero T
	if q.r == q.w {
		return zero, false
	}
	v := q.q[q.r]
	q.q[q.r] = zero // do not retain the removed element
	q.r++
	q.shrink()
	return v, true
}

// popBack removes and returns the newest element. The boolean is false, and
// the value is the zero value of T, when the queue is empty.
func (q *queue[T]) popBack() (T, bool) {
	var zero T
	if q.r == q.w {
		return zero, false
	}
	q.w--
	v := q.q[q.w]
	q.q[q.w] = zero // do not retain the removed element
	q.shrink()
	return v, true
}

// shrink moves the live elements into storage a quarter of the current
// length once fewer than an eighth of the slots are in use.
func (q *queue[T]) shrink() {
	n := q.w - q.r
	if n >= len(q.q)/8 {
		return
	}
	qn := make([]T, len(q.q)/4)
	copy(qn, q.q[q.r:q.w])
	q.q, q.r, q.w = qn, 0, n
}
// main pushes 10000 printing jobs through a FIFO Queue limited to four
// workers and four pending items, then waits for the workers to finish.
func main() {
	pool := Queue[Work]{MaxWorkers: 4, MaxQueue: 4, Order: FIFO}
	for n := 0; n < 10000; n++ {
		id := n // per-iteration copy captured by the closure below
		job := WorkFunc(func() {
			fmt.Println("hello world", id)
		})
		pool.Enqueue(job)
	}
	pool.Wait()
}
@CAFxX
Copy link
Author

CAFxX commented Dec 6, 2022

simpler version

package fast

import (
	"sync"
)

// WorkerPool runs submitted functions on worker goroutines, keeping at most
// Max of them alive at once. Functions submitted while all workers are busy
// are queued and picked up by the workers in submission order.
type WorkerPool struct {
	// Max is the maximum number of worker goroutines. When it is zero or
	// negative, every submitted function simply gets its own goroutine.
	Max int

	m sync.Mutex // guards n and q
	n int        // number of running workers
	q cbuf       // functions waiting for a free worker
}

// Go schedules fn for execution and returns without waiting for it to run.
func (s *WorkerPool) Go(fn func()) {
	if s.Max <= 0 {
		go fn()
		return
	}

	s.m.Lock()
	spawn := s.n < s.Max
	if spawn {
		s.n++
	} else {
		s.q.put(fn)
	}
	s.m.Unlock()

	if spawn {
		go s.worker(fn)
	}
}

// worker runs fn and then keeps running queued functions until the queue is
// empty, at which point it unregisters itself and returns.
func (s *WorkerPool) worker(fn func()) {
	for more := true; more; {
		fn()

		s.m.Lock()
		fn, more = s.q.get()
		if !more {
			s.n--
			if s.n == 0 {
				// This is the last worker and there is nothing left to run:
				// drop the queue together with its storage. That releases
				// any references to previously queued functions, so they
				// can be garbage collected, and leaves an idle pool holding
				// no queue memory.
				s.q.reset()
			}
		}
		s.m.Unlock()
	}
}

// cbuf is a growable FIFO ring buffer of functions. One slot of the storage
// is always left unused so that r == w unambiguously means "empty". It is not
// safe for concurrent use.
type cbuf struct {
	e []func() // ring storage
	r int      // index of the next element to get
	w int      // index of the next slot to put into
}

// put appends v to the buffer, allocating the storage on first use and
// doubling it when the buffer is full.
func (c *cbuf) put(v func()) {
	switch {
	case len(c.e) == 0:
		const minLen = 4
		c.e, c.r, c.w = make([]func(), minLen), 0, 0
	case next(c.w+1, len(c.e)) == c.r:
		// Full: move the live elements, oldest first, into storage twice
		// the size.
		ne := make([]func(), len(c.e)*2)
		var n int
		if c.r <= c.w {
			n = copy(ne, c.e[c.r:c.w])
		} else {
			n = copy(ne, c.e[c.r:])
			n += copy(ne[n:], c.e[:c.w])
		}
		c.e, c.r, c.w = ne, 0, n
	}
	c.e[c.w] = v
	c.w = next(c.w+1, len(c.e))
}

// get removes and returns the oldest function in the buffer. It returns
// (nil, false) when the buffer is empty.
func (c *cbuf) get() (func(), bool) {
	if c.r == c.w {
		return nil, false
	}

	v := c.e[c.r]
	// Clear the slot so the buffer does not keep the consumed function (and
	// whatever it captures) reachable until the slot happens to be reused.
	c.e[c.r] = nil
	c.r = next(c.r+1, len(c.e))
	return v, true
}

// reset empties the buffer and drops its storage.
func (c *cbuf) reset() {
	*c = cbuf{}
}

// next wraps the index n around a ring of length m. n must be less than 2*m.
func next(n, m int) int {
	if n >= m {
		return n - m
	}
	return n
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment