Skip to content

Instantly share code, notes, and snippets.

@jmingov
Forked from neon520/ppwt2.go
Last active June 18, 2018 21:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jmingov/e5f1795316ad1e56d45098d8204cc8ff to your computer and use it in GitHub Desktop.
Save jmingov/e5f1795316ad1e56d45098d8204cc8ff to your computer and use it in GitHub Desktop.
package main
import (
"fmt"
"log"
"math/rand"
"time"
)
// Threader holds the state shared by the producer goroutine, the worker
// goroutines and the receiver goroutine. Init creates every channel
// unbuffered.
type Threader struct {
	// TIME_LIMIT is not read or written by any method of Threader; it is a
	// separate thing from the package-level TIME_LIMIT constant.
	TIME_LIMIT time.Time

	// numWorkers is the number of worker goroutines started by Init.
	numWorkers int

	// jobs is the total number of jobs; job IDs run from 0 to jobs-1.
	jobs int

	// jobsProduced counts the jobs sent by produce in the current round.
	jobsProduced int

	// jobsWorking is zeroed by Init; only its address is ever logged.
	jobsWorking int

	// doneProducing carries the final signal from produce to Init, and
	// doneReceiving carries the completion signal from receiver to produce.
	doneProducing, doneReceiving chan bool

	// pwCounterChan carries +1/-1 reports from workers to produce, pwChan
	// carries job IDs from produce to workers, and wrChan carries finished
	// job IDs from workers to receiver.
	pwCounterChan, pwChan, wrChan chan int

	// errChan is created by Init but not used by any other method.
	errChan chan error

	// initTime is set by Init and passed to end to report elapsed time.
	initTime time.Time
}
// TIME_LIMIT is the duration moreThanTL compares elapsed time against.
// produce uses that check to decide when to reset its produced-jobs counter.
const TIME_LIMIT = time.Second
// main runs a Threader with 4 workers over 27 jobs and returns when Init does.
func main() {
	new(Threader).Init(4, 27)
}
// Init prepares the Threader, launches the producer, the receiver and
// numWorkers worker goroutines, and then blocks until the producer signals
// on doneProducing.
//
// numWorkers is the number of worker goroutines to start; jobs is the total
// number of jobs to hand out.
func (tr *Threader) Init(numWorkers int, jobs int) {
	// Every channel is unbuffered, so each send is a hand-off to a receiver.
	tr.doneProducing, tr.doneReceiving = make(chan bool), make(chan bool)
	tr.pwCounterChan, tr.pwChan, tr.wrChan = make(chan int), make(chan int), make(chan int)
	tr.errChan = make(chan error)

	tr.initTime = time.Now()
	tr.numWorkers, tr.jobsWorking = numWorkers, 0
	log.Printf("Init %p", &tr.jobsWorking)
	tr.jobs = jobs

	// All fields the goroutines read are set before any of them starts.
	go tr.produce()
	go tr.receiver()
	for remaining := tr.numWorkers; remaining > 0; remaining-- {
		go tr.work()
	}

	// Block until the producer reports that everything is finished.
	<-tr.doneProducing
}
// produce sends the job IDs 0..tr.jobs-1 to the workers over pwChan and, once
// the receiver reports completion on doneReceiving, forwards that signal to
// Init on doneProducing.
//
// Jobs are sent in rounds of at most numWorkers. When a round is full the
// producer waits until more than TIME_LIMIT has passed since the round
// began, then starts a new round with jobsProduced preset to the number of
// jobs the workers currently report as in flight. Workers report on
// pwCounterChan: +1 when they pick a job up, -1 when they finish it.
//
// pwCounterChan is unbuffered, so the producer keeps receiving from it after
// the last job has been sent. Otherwise the workers holding the final jobs
// block on their counter send, the receiver never sees those jobs, and every
// goroutine ends up asleep.
//
// NOTE: with numWorkers <= 0 a round is always full, so no job is ever sent.
func (tr *Threader) produce() {
	next := 0     // ID of the next job to send
	inFlight := 0 // running sum of the workers' +1/-1 reports
	tr.jobsProduced = 0
	roundStart := time.Now()
	log.Printf("Jobs Working %p", &tr.jobsWorking)

	// record folds one worker report into the in-flight count.
	record := func(delta int) {
		inFlight += delta
		fmt.Println("received javi", delta, "jval", inFlight)
	}

	// The bound is checked before sending, so jobs <= 0 sends nothing instead
	// of sending jobs forever.
	for next < tr.jobs {
		// Take every report that is already pending, without blocking.
	drain:
		for {
			select {
			case delta := <-tr.pwCounterChan:
				record(delta)
			default:
				break drain
			}
		}

		if tr.jobsProduced >= tr.numWorkers {
			if moreThanTL(roundStart) {
				// Time limit passed: start a new round, counting the jobs
				// still in flight against it.
				tr.jobsProduced = inFlight
				roundStart = time.Now()
				continue
			}
			// Round is full: sleep until a worker reports or the time limit
			// is reached, rather than spinning on the non-blocking drain.
			select {
			case delta := <-tr.pwCounterChan:
				record(delta)
			case <-time.After(TIME_LIMIT - time.Since(roundStart)):
			}
			continue
		}

		// Normal case: hand the next job to whichever worker is waiting.
		log.Printf("Sending %d", next)
		tr.pwChan <- next
		next++
		tr.jobsProduced++
	}
	end(tr.initTime, "PRODUCE")

	// Keep serving worker reports until the receiver is done, then let Init
	// return.
	for {
		select {
		case delta := <-tr.pwCounterChan:
			record(delta)
		case done := <-tr.doneReceiving:
			tr.doneProducing <- done
			return
		}
	}
}
// work is the body of one worker goroutine. It draws a random number once at
// start-up; when that number is a multiple of 3 the worker pauses for ten
// seconds on every job it handles. For each job ID received from pwChan it
// reports +1 on pwCounterChan, pauses if it is a slow worker, reports -1, and
// forwards the ID on wrChan. The loop never returns.
func (tr *Threader) work() {
	// Decided once per worker, not once per job.
	slow := rand.Intn(7)%3 == 0
	for {
		job := <-tr.pwChan

		// Report the job as started.
		tr.pwCounterChan <- 1
		if slow {
			time.Sleep(10 * time.Second)
		}
		// Report the job as finished.
		tr.pwCounterChan <- -1

		// Hand the finished job ID to the receiver.
		tr.wrChan <- job
	}
}
// receiver takes finished job IDs from wrChan, logs each one and, once
// tr.jobs results have arrived, logs the elapsed time and signals
// doneReceiving.
//
// Results are counted rather than waiting for the ID jobs-1. Workers finish
// out of order (some pause for ten seconds per job), so the highest ID can
// arrive while earlier jobs are still running. With jobs <= 0 that ID would
// never arrive at all.
func (tr *Threader) receiver() {
	for received := 0; received < tr.jobs; received++ {
		log.Printf("Received %d", <-tr.wrChan)
	}
	end(tr.initTime, "RECEIVE")
	tr.doneReceiving <- true
}
////
// helpers
////
// moreThanTL reports whether strictly more than TIME_LIMIT has elapsed
// since t.
func moreThanTL(t time.Time) bool {
	return time.Since(t) > TIME_LIMIT
}
// end logs how much time has passed since t, labelled with s.
func end(t time.Time, s string) {
	log.Printf("%s took %s", s, time.Since(t))
}
// random returns a pseudo-random int in the half-open range [min, max).
// When the range is empty (max <= min) it returns min instead of panicking.
//
// The global generator is deliberately not reseeded here. Seeding it with
// the current Unix second on every call made all calls within the same
// second return the same value. Since Go 1.20 the global source is seeded
// randomly at start-up; on older toolchains, seed once at program start if
// run-to-run variation is needed.
func random(min, max int) int {
	if max <= min {
		return min
	}
	return rand.Intn(max-min) + min
}
@jmingov
Copy link
Author

jmingov commented Jun 18, 2018

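FIXME: El fin del loop del producer está desincronizado con el loop que consume javi. Cuando el producer termina, sale del bucle y deja de leer del channel tr.pwCounterChan; como el channel no tiene buffer, los workers que todavía tienen jobs se quedan bloqueados al enviar y el programa acaba en deadlock.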

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment