Created
March 16, 2017 06:05
-
-
Save cloverstd/cde14b9da199df08df0eee55e048b7bb to your computer and use it in GitHub Desktop.
Workerpool with golang channel
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"container/list" | |
"fmt" | |
"log" | |
"net/http" | |
"strconv" | |
"sync" | |
"time" | |
) | |
type Storage struct { | |
l sync.RWMutex | |
Capacity int | |
container *list.List | |
cargo chan int | |
Free int | |
} | |
func NewStorage(cap int) *Storage { | |
return &Storage{ | |
Capacity: cap, | |
Free: cap, | |
container: list.New(), | |
cargo: make(chan int), | |
} | |
} | |
func (s *Storage) Put(cargo int) bool { | |
s.l.Lock() | |
defer s.l.Unlock() | |
if s.Free == 0 || s.Capacity == s.container.Len() { | |
return false | |
} | |
s.container.PushBack(cargo) | |
s.Free -= 1 | |
fmt.Printf("storage free %d\n", s.Free) | |
go s.dispatch(cargo) | |
return true | |
} | |
func (s *Storage) dispatch(cargo int) { | |
s.cargo <- cargo | |
} | |
func (s *Storage) Get() int { | |
s.l.Lock() | |
defer s.l.Unlock() | |
var item interface{} | |
first := s.container.Front() | |
if first != nil { | |
item = s.container.Remove(first) | |
s.Free += 1 | |
} | |
value, ok := item.(int) | |
if !ok { | |
value = 0 | |
} | |
return value | |
} | |
type Task interface { | |
Run() error | |
} | |
type Worker struct { | |
Name string | |
task chan Task | |
idle bool | |
isStart bool | |
wg sync.WaitGroup | |
quit chan bool | |
} | |
func (worker *Worker) Start(factory *Factory) error { | |
if worker.isStart { | |
return fmt.Errorf("worker %s has start", worker.Name) | |
} | |
worker.isStart = true | |
go func() { | |
for { | |
fmt.Printf("worker %s wait task\n", worker.Name) | |
factory.worker <- worker | |
select { | |
case task := <-worker.task: | |
worker.do(task) | |
case q := <-worker.quit: | |
if q { | |
return | |
} | |
} | |
} | |
}() | |
return nil | |
} | |
func (worker *Worker) do(task Task) { | |
task.Run() | |
} | |
func (worker *Worker) stop() { | |
worker.quit <- true | |
} | |
type Factory struct { | |
MaxWorker int | |
task chan Task | |
workerList []*Worker | |
worker chan *Worker | |
storage *Storage | |
} | |
func NewFactory(maxWorker int) *Factory { | |
factory := Factory{ | |
MaxWorker: maxWorker, | |
workerList: make([]*Worker, 0, maxWorker), | |
task: make(chan Task, maxWorker), | |
worker: make(chan *Worker), | |
storage: NewStorage(maxWorker * 5), | |
} | |
for i := 0; i < maxWorker; i++ { | |
factory.workerList = append(factory.workerList, &Worker{ | |
Name: fmt.Sprintf("%d", i), | |
task: make(chan Task), | |
quit: make(chan bool), | |
}) | |
factory.workerList[i].Start(&factory) | |
} | |
return &factory | |
} | |
func (factory *Factory) Dispatch() { | |
go func() { | |
fmt.Println("dispatch task start") | |
for { | |
select { | |
case <-factory.storage.cargo: | |
go func() { | |
sleep := factory.storage.Get() | |
if sleep <= 0 { | |
return | |
} | |
fmt.Println("dispatch task") | |
task := SleepTask{sleep} | |
worker := <-factory.worker | |
worker.task <- task | |
fmt.Println("dispatch task done") | |
}() | |
} | |
} | |
}() | |
} | |
type SleepTask struct { | |
sleep int | |
} | |
func (task SleepTask) Run() error { | |
time.Sleep(time.Duration(task.sleep) * time.Second) | |
fmt.Printf("task sleep %d done\n", task.sleep) | |
return nil | |
} | |
var ( | |
factory *Factory | |
) | |
func main() { | |
factory = NewFactory(5) | |
factory.Dispatch() | |
if err := server(); err != nil { | |
log.Println(err) | |
} | |
} | |
func server() error { | |
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { | |
sleepStr := r.URL.Path[1:] | |
sleep, err := strconv.Atoi(sleepStr) | |
if err != nil { | |
fmt.Fprintf(w, "error %s", err) | |
} else { | |
if factory.storage.Put(sleep) { | |
fmt.Fprintf(w, "ok %d", sleep) | |
} else { | |
fmt.Fprintln(w, "满了") | |
} | |
} | |
}) | |
return http.ListenAndServe(":8888", nil) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment