Skip to content

Instantly share code, notes, and snippets.

@huandu
Created January 28, 2015 11:36
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save huandu/66837eba3ee039505d5b to your computer and use it in GitHub Desktop.
Save huandu/66837eba3ee039505d5b to your computer and use it in GitHub Desktop.
一个简单的并发可控、任务可随意拼接的任务队列实现
// 一个简单的并发可控、任务可随意拼接的任务队列实现。
// 仅作概念演示用,细节不要纠结。
//
// 基本结构:
// Context:所有任务共享的上下文,任务通过上下文交换数据
// Dispatcher:任务队列管理器,负责创建 Context 并把它放入合适的工作队列
// Worker:任务的抽象接口
// XXXWorker:各个具体的任务处理逻辑
package main
import (
"fmt"
"time"
)
type JobId string
type JobData string
type WorkerFactory func() Worker
type WorkerConfig struct {
Name JobId
Factory WorkerFactory
Count int // 需要启动的 worker 数量
}
// 所有的任务都会读取 Context 的内容,所以这个结构会很大。
// 当它变得过于复杂的时候需要重构,不过这就不是现在讨论的问题了。
type Context struct {
Jobs []JobId
// 各种可能被用到的字段
Data JobData
Foo string
Bar string
Player string
}
// 任务调度器
type Dispatcher struct {
done chan bool
jobChannels map[JobId]*JobChannels
}
type JobChannels struct {
input chan *Context
output chan *Context
}
// Worker 的接口
type Worker interface {
Work(input <-chan *Context, output chan<- *Context)
}
// 各种 worker
type FooWorker struct{}
type BarWorker struct{}
type PlayerWorker struct{}
func main() {
fmt.Println("starting...")
dispatcher := NewDispatcher()
// 这里用来演示通过网络异步收到 job 的情况
go func() {
job1 := []JobId{"foo", "bar", "player"}
job2 := []JobId{"foo", "player"} // 跳过 bar
job3 := []JobId{"bar", "foo"} // 逆序
// 执行任务,每个任务可以带一个自定义数据,现在先简单用 string,未来应该根据设计
dispatcher.Dispatch(job1, "job1")
dispatcher.Dispatch(job2, "job2")
dispatcher.Dispatch(job3, "job3")
time.Sleep(time.Second)
dispatcher.Stop()
}()
dispatcher.Start()
}
func NewDispatcher() *Dispatcher {
return &Dispatcher{
done: make(chan bool),
}
}
var workerConfig = []*WorkerConfig{
&WorkerConfig{"foo", NewFooWorker, 1},
&WorkerConfig{"bar", NewBarWorker, 2},
&WorkerConfig{"player", NewPlayerWorker, 3},
}
func (d *Dispatcher) Start() {
d.jobChannels = make(map[JobId]*JobChannels)
// 启动足够数量的 worker
for _, config := range workerConfig {
channels := &JobChannels{
input: make(chan *Context),
output: make(chan *Context),
}
d.jobChannels[config.Name] = channels
for i := 0; i < config.Count; i++ {
worker := config.Factory()
go worker.Work(channels.input, channels.output)
}
}
// 做输入输出的调度工作
for _, channels := range d.jobChannels {
go d.monitor(channels.output)
}
<-d.done
}
func (d *Dispatcher) monitor(output <-chan *Context) {
for ctx := range output {
go d.dispatch(ctx)
}
}
func (d *Dispatcher) dispatch(ctx *Context) {
// 所有任务都完成了
if len(ctx.Jobs) == 0 {
fmt.Println("job is done! Name:", ctx.Data, "Data:", *ctx)
return
}
// 把 ctx 放入合适的任务队列,开始执行任务
job := ctx.Jobs[0]
ctx.Jobs = ctx.Jobs[1:]
channels := d.jobChannels[job]
channels.input <- ctx
}
func (d *Dispatcher) Stop() {
d.done <- true
}
func (d *Dispatcher) Dispatch(jobs []JobId, data JobData) {
// 首先初始化一个上下文
ctx := &Context{
Jobs: jobs,
Data: data,
}
// 开始派发任务
d.dispatch(ctx)
}
func NewFooWorker() Worker {
return &FooWorker{}
}
func NewBarWorker() Worker {
return &BarWorker{}
}
func NewPlayerWorker() Worker {
return &PlayerWorker{}
}
func (foo *FooWorker) Work(input <-chan *Context, output chan<- *Context) {
for ctx := range input {
fmt.Println("Worker foo: current job name is", ctx.Data)
ctx.Foo = "foo-done"
output <- ctx
}
}
func (bar *BarWorker) Work(input <-chan *Context, output chan<- *Context) {
for ctx := range input {
fmt.Println("Worker bar: current job name is", ctx.Data)
ctx.Bar = "bar-done"
output <- ctx
}
}
func (player *PlayerWorker) Work(input <-chan *Context, output chan<- *Context) {
for ctx := range input {
fmt.Println("Worker player: current job name is", ctx.Data)
ctx.Player = "player-done"
output <- ctx
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment