Skip to content

Instantly share code, notes, and snippets.

@ggzeng
Last active October 23, 2019 05:23
Show Gist options
  • Save ggzeng/1cb3195f8a773d98374d0276ea74c719 to your computer and use it in GitHub Desktop.
Save ggzeng/1cb3195f8a773d98374d0276ea74c719 to your computer and use it in GitHub Desktop.
使用tunny包创建goroutine池
package main
import (
"io/ioutil"
"net/http"
"runtime"
"github.com/Jeffail/tunny"
)
// main starts an HTTP server on :8080 whose /work endpoint funnels each
// request body through a tunny worker pool sized to the number of CPUs.
func main() {
	numCPUs := runtime.NumCPU()

	// pool.Process invokes this function for every job; the argument given to
	// Process is what arrives here as payload.
	pool := tunny.NewFunc(numCPUs, func(payload interface{}) interface{} {
		var result []byte

		// TODO: Something CPU heavy with payload

		return result
	})
	defer pool.Close()

	http.HandleFunc("/work", func(w http.ResponseWriter, r *http.Request) {
		// Register the close before reading so the body is released on every
		// path, including the error path below.
		defer r.Body.Close()

		input, err := ioutil.ReadAll(r.Body)
		if err != nil {
			http.Error(w, "Internal error", http.StatusInternalServerError)
			// http.Error does not end the handler; without this return the
			// partial input would still be processed and written out.
			return
		}

		// Funnel this work into our pool. This call is synchronous and will
		// block until the job is completed.
		result := pool.Process(input)
		w.Write(result.([]byte))
	})

	// ListenAndServe only returns on failure (for example, the port is already
	// in use), so surface the error instead of exiting silently.
	if err := http.ListenAndServe(":8080", nil); err != nil {
		panic(err)
	}
}
// 实现callback有两个方法
// printHello prints a fixed greeting to standard output. It takes no
// arguments and returns nothing, so it can be passed around as a plain func().
func printHello() {
	fmt.Println("Hello!")
}
// Approach 1: build the pool with NewFunc and invoke the callback ourselves.
pool3 := tunny.NewFunc(3, func(job interface{}) interface{} {
	// Run the payload only when it is a plain func(); anything else yields nil.
	if callback, isFunc := job.(func()); isFunc {
		callback()
		return callback
	}
	return nil
})
pool3.Process(printHello)
// 方法2
pool2 := tunny.NewCallback(2) // 省去了自己实现调用函数
var wg sync.WaitGroup
pool2 := tunny.NewCallback(2)
for i:= 0; i < 10; i++ { // 控制并发量的例子
wg.Add(1) // 注意:需要写到go fun外面
go func() {
defer wg.Done()
pool2.Process(printHello)
}()
}
wg.Wait()
fmt.Println("all done")
package main
import (
"fmt"
"github.com/Jeffail/tunny"
)
// printHello prints "Hello " followed by the payload and returns 1.
// The payload must hold a string; any other dynamic type makes the type
// assertion panic.
func printHello(str interface{}) interface{} {
	name := str.(string)
	fmt.Println("Hello " + name)
	return 1
}
// printHi prints "Hi " followed by the payload and returns 1.
// The payload must hold a string; any other dynamic type makes the type
// assertion panic.
func printHi(str interface{}) interface{} {
	name := str.(string)
	fmt.Println("Hi " + name)
	return 1
}
// myWorker is a pool worker whose job logic lives in a swappable function
// field instead of being hard-coded in the type.
type myWorker struct {
	processor func(interface{}) interface{}
}

// Process executes a job by handing payload to the currently configured
// processor and returning whatever it produces.
func (m *myWorker) Process(payload interface{}) interface{} {
	run := m.processor
	return run(payload)
}

// BlockUntilReady is invoked before a job is executed, comparable to an init
// step. This worker needs no preparation, so it returns immediately.
func (m *myWorker) BlockUntilReady() {}

// Interrupt is invoked when a job is terminated while it is still executing.
// This worker has nothing to unblock, so it does nothing.
func (m *myWorker) Interrupt() {}

// Terminate is invoked when the worker's goroutine is shut down. This worker
// holds no resources, so it does nothing.
func (m *myWorker) Terminate() {}
// main builds a three-worker pool around a single myWorker and shows that
// swapping the worker's processor changes what later jobs run.
func main() {
	work1 := new(myWorker)

	// The constructor returns the same instance every time it is called, so
	// reassigning work1.processor below affects the whole pool.
	pool1 := tunny.New(3, func() tunny.Worker {
		return work1
	})
	// Close the pool on exit so its workers are shut down, matching the other
	// pool examples.
	defer pool1.Close()

	work1.processor = printHello
	pool1.Process("xxx")

	work1.processor = printHi
	pool1.Process("xxx")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment