Created
January 12, 2017 08:50
-
-
Save urakozz/696fe68209e2e283b3c78a0a9522c62d to your computer and use it in GitHub Desktop.
Go Context, parallel computing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import "fmt" | |
import ( | |
"context" | |
"time" | |
"math/rand" | |
"sync" | |
) | |
// message is one unit of work handed to ProcessMessages: the worker
// derives a result from parameter and delivers it on responseChan.
//
// ctx is placed in a struct here, which is acceptable because the struct
// is only a passed message and the receiver needs to know when the
// request may be discarded.
type message struct {
	// responseChan is send-only from the holder's point of view.
	responseChan chan<- int
	// parameter is the input whose length ProcessMessages reports.
	parameter string
	// ctx signals, via Done, that the result is no longer wanted.
	ctx context.Context
}
func ProcessMessages(work <-chan message) { | |
for job := range work { | |
fmt.Println("range job, select 1") | |
select { | |
// If the context is finished, don't bother processing the | |
// message. | |
case <-job.ctx.Done(): | |
fmt.Println("range job, select 1, ctx done") | |
continue | |
default: | |
fmt.Println("range job, select 1, default") | |
} | |
// Assume this takes a long time to calculate | |
t := rand.Int63n(1000) | |
fmt.Println("range job, sleep", t) | |
time.Sleep(time.Duration(t) * time.Millisecond) | |
fmt.Println("range job, sleeped") | |
hardToCalculate := len(job.parameter) | |
fmt.Println("range job, select 2") | |
select { | |
case <-job.ctx.Done(): | |
fmt.Println("range job, select 2, ctx done") | |
case job.responseChan <- hardToCalculate: | |
fmt.Println("range job, select 2, write response") | |
} | |
} | |
} | |
func newRequest(ctx context.Context, input string, q chan <- message) { | |
r := make(chan int) | |
fmt.Println("newRequest: select 1") | |
select { | |
// If the context finishes before we can send msg onto q, | |
// exit early | |
case <-ctx.Done(): | |
fmt.Println("Context ended before q could see message") | |
return | |
case q <- message{ | |
responseChan: r, | |
parameter: input, | |
// We are placing a context in a struct. This is ok since it | |
// is only stored as a passed message and we want q to know | |
// when it can discard this message | |
ctx: ctx, | |
}: | |
fmt.Println("newRequest: select 1, send message to q") | |
} | |
func main() { | |
w := NewWrapper() | |
w.Set(&Entity{ID:123, Name:"testname", Price:.01}) | |
wg := sync.WaitGroup{} | |
wg.Add(4) | |
for i := 0; i < 4; i++ { | |
go func() { | |
defer wg.Done() | |
e := w.Get(int64(123)) | |
fmt.Println("> main, get", e) | |
}() | |
} | |
wg.Wait() | |
} | |
// Entity is the record kept by the storage providers in this file; it is
// looked up by its ID.
type Entity struct {
	ID    int64   // key used by Set and Get
	Name  string  // display name
	Price float64 // price value
}
type Cache struct { | |
storage map[int64]*Entity | |
} | |
type StorageProvider interface { | |
Set(e *Entity) | |
Get(ID int64) *Entity | |
} | |
func NewCache() StorageProvider { | |
return &Cache{storage:make(map[int64]*Entity)} | |
} | |
func (c *Cache) Set(e *Entity) { | |
c.storage[e.ID] = e | |
} | |
func (c *Cache) Get(ID int64) *Entity { | |
if e, ok := c.storage[ID]; ok { | |
return e | |
} | |
return nil | |
} | |
type DBAdapter struct { | |
Cache StorageProvider | |
sync.Mutex | |
} | |
func NewDBAdapter() StorageProvider { | |
return &DBAdapter{Cache:NewCache()} | |
} | |
func (d *DBAdapter) Set(e *Entity) { | |
d.Lock() | |
d.Cache.Set(e) | |
d.Unlock() | |
} | |
func (d *DBAdapter) Get(ID int64) *Entity { | |
d.Lock() | |
defer d.Unlock() | |
rand.Seed(time.Now().UnixNano()) | |
t := rand.Int63n(1000) | |
fmt.Println("db adapter sleep", t) | |
time.Sleep(time.Duration(t) * time.Millisecond) | |
fmt.Println("db adapter sleeped") | |
// call | |
return d.Cache.Get(ID) | |
} | |
type workMessage struct { | |
responseChan chan *Entity | |
ID int64 | |
ctx context.Context | |
waitChans chan chan *Entity | |
} | |
func (m *workMessage) Broadcast(e *Entity) { | |
for ch := range m.waitChans { | |
ch <- e | |
} | |
} | |
type Wrapper struct { | |
sync.RWMutex | |
inProcess map[int64]*workMessage | |
cache StorageProvider | |
db StorageProvider | |
waitTimeLimit time.Duration | |
} | |
func NewWrapper() StorageProvider { | |
context.Background() | |
w := &Wrapper{ | |
inProcess: make(map[int64]*workMessage), | |
cache: NewCache(), | |
db: NewDBAdapter(), | |
waitTimeLimit: 500 * time.Millisecond, | |
} | |
return w | |
} | |
func (d *Wrapper) Set(e *Entity) { | |
d.Lock() | |
// pretend cache doesn't stores values | |
// d.cache.Set(e) | |
d.Unlock() | |
d.db.Set(e) | |
} | |
func (d *Wrapper) Get(ID int64) *Entity { | |
e := d.cache.Get(ID) | |
if e != nil { | |
return e | |
} | |
fmt.Println("G0 lock") | |
d.Lock() | |
msg, ok := d.inProcess[ID] | |
if ok { | |
d.Unlock() | |
fmt.Println("G1.1 unlock and Wait here ", msg) | |
ch := make(chan *Entity) | |
msg.waitChans <- ch | |
fmt.Println("G1.2 chan to chans ", msg) | |
e = <- ch | |
fmt.Println("G1.3 got it ", e) | |
return e | |
} | |
if !ok { | |
fmt.Println("G2.1 get from db") | |
ctx, cancel := context.WithTimeout(context.Background(), d.waitTimeLimit) | |
defer cancel() | |
job := &workMessage{make(chan *Entity, 1), ID, ctx, make(chan chan *Entity)} | |
d.inProcess[ID] = job | |
d.Unlock() | |
fmt.Println("G2.2 unlock") | |
e = d.newDbRequest(job) | |
fmt.Println("G2.3 broadcast") | |
go job.Broadcast(e) | |
fmt.Println("G2.4 unlock") | |
d.Lock() | |
delete(d.inProcess, ID) | |
d.Unlock() | |
fmt.Println("G2.5 unlock") | |
return e | |
} | |
return d.cache.Get(ID) | |
} | |
func (d *Wrapper) newDbRequest(job *workMessage) *Entity { | |
defer close(job.responseChan) | |
go d.dbProcessMessages(job.ctx, job) | |
fmt.Println("r1 newRequest: select") | |
select { | |
case out := <-job.responseChan: | |
fmt.Printf("r2 return %+v\n", out) | |
return out | |
// If the context finishes before we could get the result, exit early | |
case <-job.ctx.Done(): | |
fmt.Println("r3 Context ended before processing message. Err:", job.ctx.Err()) | |
} | |
return nil | |
} | |
func (d *Wrapper) dbProcessMessages(ctx context.Context, job *workMessage) { | |
fmt.Println("j1, select 1") | |
select { | |
// If the context is finished, don't bother processing the | |
// message. | |
case <-ctx.Done(): | |
fmt.Println("j2, select 1, ctx done") | |
return | |
default: | |
fmt.Println("j3, select 1, default") | |
} | |
e := d.db.Get(job.ID) | |
fmt.Println("j1, select 2", e) | |
select { | |
case <-ctx.Done(): | |
fmt.Println("j4, select 2, ctx done") | |
case job.responseChan <- e: | |
fmt.Println("j5, select 2, write response") | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment