Skip to content

Instantly share code, notes, and snippets.

@urakozz
Created January 12, 2017 08:50
Show Gist options
  • Save urakozz/696fe68209e2e283b3c78a0a9522c62d to your computer and use it in GitHub Desktop.
Save urakozz/696fe68209e2e283b3c78a0a9522c62d to your computer and use it in GitHub Desktop.
Go Context, parallel computing
package main
import "fmt"
import (
"context"
"time"
"math/rand"
"sync"
)
// A message processes parameter and returns the result on responseChan.
// ctx is places in a struct, but this is ok to do.
type message struct {
responseChan chan <- int
parameter string
ctx context.Context
}
func ProcessMessages(work <-chan message) {
for job := range work {
fmt.Println("range job, select 1")
select {
// If the context is finished, don't bother processing the
// message.
case <-job.ctx.Done():
fmt.Println("range job, select 1, ctx done")
continue
default:
fmt.Println("range job, select 1, default")
}
// Assume this takes a long time to calculate
t := rand.Int63n(1000)
fmt.Println("range job, sleep", t)
time.Sleep(time.Duration(t) * time.Millisecond)
fmt.Println("range job, sleeped")
hardToCalculate := len(job.parameter)
fmt.Println("range job, select 2")
select {
case <-job.ctx.Done():
fmt.Println("range job, select 2, ctx done")
case job.responseChan <- hardToCalculate:
fmt.Println("range job, select 2, write response")
}
}
}
func newRequest(ctx context.Context, input string, q chan <- message) {
r := make(chan int)
fmt.Println("newRequest: select 1")
select {
// If the context finishes before we can send msg onto q,
// exit early
case <-ctx.Done():
fmt.Println("Context ended before q could see message")
return
case q <- message{
responseChan: r,
parameter: input,
// We are placing a context in a struct. This is ok since it
// is only stored as a passed message and we want q to know
// when it can discard this message
ctx: ctx,
}:
fmt.Println("newRequest: select 1, send message to q")
}
func main() {
w := NewWrapper()
w.Set(&Entity{ID:123, Name:"testname", Price:.01})
wg := sync.WaitGroup{}
wg.Add(4)
for i := 0; i < 4; i++ {
go func() {
defer wg.Done()
e := w.Get(int64(123))
fmt.Println("> main, get", e)
}()
}
wg.Wait()
}
type Entity struct {
ID int64
Name string
Price float64
}
type Cache struct {
storage map[int64]*Entity
}
type StorageProvider interface {
Set(e *Entity)
Get(ID int64) *Entity
}
func NewCache() StorageProvider {
return &Cache{storage:make(map[int64]*Entity)}
}
func (c *Cache) Set(e *Entity) {
c.storage[e.ID] = e
}
func (c *Cache) Get(ID int64) *Entity {
if e, ok := c.storage[ID]; ok {
return e
}
return nil
}
type DBAdapter struct {
Cache StorageProvider
sync.Mutex
}
func NewDBAdapter() StorageProvider {
return &DBAdapter{Cache:NewCache()}
}
func (d *DBAdapter) Set(e *Entity) {
d.Lock()
d.Cache.Set(e)
d.Unlock()
}
func (d *DBAdapter) Get(ID int64) *Entity {
d.Lock()
defer d.Unlock()
rand.Seed(time.Now().UnixNano())
t := rand.Int63n(1000)
fmt.Println("db adapter sleep", t)
time.Sleep(time.Duration(t) * time.Millisecond)
fmt.Println("db adapter sleeped")
// call
return d.Cache.Get(ID)
}
type workMessage struct {
responseChan chan *Entity
ID int64
ctx context.Context
waitChans chan chan *Entity
}
func (m *workMessage) Broadcast(e *Entity) {
for ch := range m.waitChans {
ch <- e
}
}
type Wrapper struct {
sync.RWMutex
inProcess map[int64]*workMessage
cache StorageProvider
db StorageProvider
waitTimeLimit time.Duration
}
func NewWrapper() StorageProvider {
context.Background()
w := &Wrapper{
inProcess: make(map[int64]*workMessage),
cache: NewCache(),
db: NewDBAdapter(),
waitTimeLimit: 500 * time.Millisecond,
}
return w
}
func (d *Wrapper) Set(e *Entity) {
d.Lock()
// pretend cache doesn't stores values
// d.cache.Set(e)
d.Unlock()
d.db.Set(e)
}
func (d *Wrapper) Get(ID int64) *Entity {
e := d.cache.Get(ID)
if e != nil {
return e
}
fmt.Println("G0 lock")
d.Lock()
msg, ok := d.inProcess[ID]
if ok {
d.Unlock()
fmt.Println("G1.1 unlock and Wait here ", msg)
ch := make(chan *Entity)
msg.waitChans <- ch
fmt.Println("G1.2 chan to chans ", msg)
e = <- ch
fmt.Println("G1.3 got it ", e)
return e
}
if !ok {
fmt.Println("G2.1 get from db")
ctx, cancel := context.WithTimeout(context.Background(), d.waitTimeLimit)
defer cancel()
job := &workMessage{make(chan *Entity, 1), ID, ctx, make(chan chan *Entity)}
d.inProcess[ID] = job
d.Unlock()
fmt.Println("G2.2 unlock")
e = d.newDbRequest(job)
fmt.Println("G2.3 broadcast")
go job.Broadcast(e)
fmt.Println("G2.4 unlock")
d.Lock()
delete(d.inProcess, ID)
d.Unlock()
fmt.Println("G2.5 unlock")
return e
}
return d.cache.Get(ID)
}
func (d *Wrapper) newDbRequest(job *workMessage) *Entity {
defer close(job.responseChan)
go d.dbProcessMessages(job.ctx, job)
fmt.Println("r1 newRequest: select")
select {
case out := <-job.responseChan:
fmt.Printf("r2 return %+v\n", out)
return out
// If the context finishes before we could get the result, exit early
case <-job.ctx.Done():
fmt.Println("r3 Context ended before processing message. Err:", job.ctx.Err())
}
return nil
}
func (d *Wrapper) dbProcessMessages(ctx context.Context, job *workMessage) {
fmt.Println("j1, select 1")
select {
// If the context is finished, don't bother processing the
// message.
case <-ctx.Done():
fmt.Println("j2, select 1, ctx done")
return
default:
fmt.Println("j3, select 1, default")
}
e := d.db.Get(job.ID)
fmt.Println("j1, select 2", e)
select {
case <-ctx.Done():
fmt.Println("j4, select 2, ctx done")
case job.responseChan <- e:
fmt.Println("j5, select 2, write response")
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment