Skip to content

Instantly share code, notes, and snippets.

@cloverstd
Created March 16, 2017 06:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cloverstd/cde14b9da199df08df0eee55e048b7bb to your computer and use it in GitHub Desktop.
Save cloverstd/cde14b9da199df08df0eee55e048b7bb to your computer and use it in GitHub Desktop.
Workerpool with golang channel
package main
import (
"container/list"
"fmt"
"log"
"net/http"
"strconv"
"sync"
"time"
)
type Storage struct {
l sync.RWMutex
Capacity int
container *list.List
cargo chan int
Free int
}
func NewStorage(cap int) *Storage {
return &Storage{
Capacity: cap,
Free: cap,
container: list.New(),
cargo: make(chan int),
}
}
func (s *Storage) Put(cargo int) bool {
s.l.Lock()
defer s.l.Unlock()
if s.Free == 0 || s.Capacity == s.container.Len() {
return false
}
s.container.PushBack(cargo)
s.Free -= 1
fmt.Printf("storage free %d\n", s.Free)
go s.dispatch(cargo)
return true
}
func (s *Storage) dispatch(cargo int) {
s.cargo <- cargo
}
func (s *Storage) Get() int {
s.l.Lock()
defer s.l.Unlock()
var item interface{}
first := s.container.Front()
if first != nil {
item = s.container.Remove(first)
s.Free += 1
}
value, ok := item.(int)
if !ok {
value = 0
}
return value
}
type Task interface {
Run() error
}
type Worker struct {
Name string
task chan Task
idle bool
isStart bool
wg sync.WaitGroup
quit chan bool
}
func (worker *Worker) Start(factory *Factory) error {
if worker.isStart {
return fmt.Errorf("worker %s has start", worker.Name)
}
worker.isStart = true
go func() {
for {
fmt.Printf("worker %s wait task\n", worker.Name)
factory.worker <- worker
select {
case task := <-worker.task:
worker.do(task)
case q := <-worker.quit:
if q {
return
}
}
}
}()
return nil
}
func (worker *Worker) do(task Task) {
task.Run()
}
func (worker *Worker) stop() {
worker.quit <- true
}
type Factory struct {
MaxWorker int
task chan Task
workerList []*Worker
worker chan *Worker
storage *Storage
}
func NewFactory(maxWorker int) *Factory {
factory := Factory{
MaxWorker: maxWorker,
workerList: make([]*Worker, 0, maxWorker),
task: make(chan Task, maxWorker),
worker: make(chan *Worker),
storage: NewStorage(maxWorker * 5),
}
for i := 0; i < maxWorker; i++ {
factory.workerList = append(factory.workerList, &Worker{
Name: fmt.Sprintf("%d", i),
task: make(chan Task),
quit: make(chan bool),
})
factory.workerList[i].Start(&factory)
}
return &factory
}
func (factory *Factory) Dispatch() {
go func() {
fmt.Println("dispatch task start")
for {
select {
case <-factory.storage.cargo:
go func() {
sleep := factory.storage.Get()
if sleep <= 0 {
return
}
fmt.Println("dispatch task")
task := SleepTask{sleep}
worker := <-factory.worker
worker.task <- task
fmt.Println("dispatch task done")
}()
}
}
}()
}
type SleepTask struct {
sleep int
}
func (task SleepTask) Run() error {
time.Sleep(time.Duration(task.sleep) * time.Second)
fmt.Printf("task sleep %d done\n", task.sleep)
return nil
}
var (
factory *Factory
)
func main() {
factory = NewFactory(5)
factory.Dispatch()
if err := server(); err != nil {
log.Println(err)
}
}
func server() error {
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
sleepStr := r.URL.Path[1:]
sleep, err := strconv.Atoi(sleepStr)
if err != nil {
fmt.Fprintf(w, "error %s", err)
} else {
if factory.storage.Put(sleep) {
fmt.Fprintf(w, "ok %d", sleep)
} else {
fmt.Fprintln(w, "满了")
}
}
})
return http.ListenAndServe(":8888", nil)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment