Skip to content

Instantly share code, notes, and snippets.

@goldeneggg
Created August 13, 2014 03:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save goldeneggg/ba5e08fa2cf8a646ac77 to your computer and use it in GitHub Desktop.
Save goldeneggg/ba5e08fa2cf8a646ac77 to your computer and use it in GitHub Desktop.
負荷やスループットを意識してgoroutineを実行する ref: http://qiita.com/jpshadowapps/items/a49c448d5e6b5f45f754
type LimitExector interface {
// 同時処理数に上限を設ける
// 上限を超えるリクエストは処理しない
ByMax(req ReqEntity)
// 1秒間の同時処理数に上限を設ける
// 上限を超えるリクエストは1秒待つ
ByMaxPerSecond(req ReqEntity)
// 処理に実行間隔を設ける => x秒間隔に1回処理を行う
ByTickerDuration(req ReqEntity)
// 処理に実行間隔を設ける => x秒間隔に1回処理を行う
// 瞬間的な高負荷(=burst)への対応として、先頭200リクエストまでは一気に処理する事を許可する
ByTickerDurationWithBurst(req ReqEntity)
}
// どんな制限を掛けるか?を指定するパラメータ用struct
type LimitParam struct {
BurstLimit int
DurationNanoSec int
}
// 制限を掛ける為のオブジェクト(ここではチャネル)を保持するstruct
// このstructにLimitExectorインタフェースのメソッドを実装する
type limit struct {
capacity chan time.Time // 最大同時処理数をコントロールするチャネル
ticker <-chan time.Time // 処理間隔をコントロールするチャネル
}
// LimitExectorのインスタンスを取得する
func NewLimitExector(limitParam LimitParam) LimitExector {
lmt := limit{}
if limitParam.BurstLimit > 0 {
lmt.capacity = make(chan time.Time, limitParam.BurstLimit)
for i := 0; i < limitParam.BurstLimit; i++ {
lmt.capacity <- time.Now()
}
}
if limitParam.DurationNanoSec > 0 {
lmt.ticker = time.Tick(time.Nanosecond * time.Duration(limitParam.DurationNanoSec))
}
return lmt
}
for req := range <リクエスト群> {
limitExector.ByXXXX(req)
}
func (lmt limit) ByMax(req ReqEntity) {
select {
case <-lmt.capacity:
go lmt.kickService(req)
default:
// 処理量オーバーの場合
// 実際の例では sorryサーバ的なトコに飛ばす とかありそう
fmt.Println("Capacity Over ByMax", req.Id)
}
}
func (lmt limit) kickService(req ReqEntity) {
if lmt.capacity != nil {
// 完了したらcapacityを再補充する
defer func() {
lmt.capacity <- time.Now()
fmt.Println("*refilled capacity")
}()
}
:
:
func (lmt limit) ByMaxPerSecond(req ReqEntity) {
select {
case <-lmt.capacity:
go lmt.kickService(req)
default:
// 処理量オーバーの場合
// 1秒待って再挑戦
time.Sleep(time.Second)
select {
case <-lmt.capacity:
go lmt.kickService(req)
default:
fmt.Println("Capacity Over ByMaxPerSecond", req.Id)
}
}
}
func (lmt limit) ByTickerDuration(req ReqEntity) {
// 指定した実行間隔を空けてサービスを起動する
<-lmt.ticker
go lmt.kickService(req)
}
func (lmt limit) ByTickerDurationWithBurst(req ReqEntity) {
select {
case <-lmt.capacity:
// 指定した最大同時処理量のサービスを同時起動する
go lmt.kickService(req)
case t := <-lmt.ticker:
// burstが起こっていれば指定したtickerの間隔で捌く
// 完了後のcapacityの補充は行わない
go lmt.kickServiceAndRefillCap(req, false)
fmt.Println("process by ticker", req.Id, t)
}
}
:
:
func (lmt limit) kickService(req ReqEntity) {
lmt.kickServiceAndRefillCap(req, true)
}
func (lmt limit) kickServiceAndRefillCap(req ReqEntity, isRefillCap bool) {
if lmt.capacity != nil && isRefillCap {
// 完了したらcapacityを補充する
defer func() {
lmt.capacity <- time.Now()
fmt.Println("*refilled capacity")
}()
}
:
:
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment