Skip to content

Instantly share code, notes, and snippets.

@threadedstream
Last active April 7, 2022 18:41
Show Gist options
  • Save threadedstream/5794876a3b6fb7b8960be1cbf94d6f68 to your computer and use it in GitHub Desktop.
Save threadedstream/5794876a3b6fb7b8960be1cbf94d6f68 to your computer and use it in GitHub Desktop.
package main
import (
"sync"
"sync/atomic"
)
var (
sharedRes = 0
count = 0
atomicCount atomic.Value
sentinel atomic.Value
turnstile *sync.Cond
turnstile2 *sync.Cond
turnstileLock sync.Mutex
turnstile2Lock sync.Mutex
countLock sync.Mutex
)
const N = 60
// a simple barrier, page 21
// don't let other threads pass through condition.Wait() unless
// the count value is gte N
func barrier(wg *sync.WaitGroup, gid int) {
defer func() {
println("goroutine ", gid, " exited")
wg.Done()
}()
countLock.Lock()
atomicCount.Store(atomicCount.Load().(int) + 1)
if atomicCount.Load().(int) == N {
turnstile.Broadcast()
}
countLock.Unlock()
turnstileLock.Lock()
for atomicCount.Load().(int) != N {
turnstile.Wait()
}
turnstileLock.Unlock()
// critical point
sharedRes++
}
// reusable barrier, page 31
// Puzzle: Rewrite the barrier solution so that after all the threads have passed
// through, the turnstile is locked again
func reusablebarrier(wg *sync.WaitGroup, gid int) {
defer func() {
println("goroutine ", gid, " exited")
wg.Done()
}()
countLock.Lock()
atomicCount.Store(atomicCount.Load().(int) + 1)
if atomicCount.Load().(int) == N {
turnstile.Broadcast()
sentinel.Store(true)
}
countLock.Unlock()
turnstileLock.Lock()
for sentinel.Load().(bool) != true {
turnstile.Wait()
}
turnstileLock.Unlock()
// CRITICAL POINT
sharedRes++
countLock.Lock()
atomicCount.Store(atomicCount.Load().(int) - 1)
if atomicCount.Load().(int) == 0 {
turnstile2.Broadcast()
}
countLock.Unlock()
turnstile2Lock.Lock()
for atomicCount.Load().(int) != 0 {
turnstile2.Wait()
}
turnstile2Lock.Unlock()
}
func main() {
turnstile = sync.NewCond(&turnstileLock)
turnstile2 = sync.NewCond(&turnstile2Lock)
wg := sync.WaitGroup{}
atomicCount.Store(0)
println("****************** test barrier ****************** ")
wg.Add(N)
for i := 0; i < N; i++ {
go barrier(&wg, i)
}
wg.Wait()
println(sharedRes)
println("****************** barrier passed ****************** ")
println("****************** test reusable barrier ****************** ")
atomicCount.Store(0)
sentinel.Store(false)
wg.Add(N)
for i := 0; i < N; i++ {
go reusablebarrier(&wg, i)
}
wg.Wait()
println(sharedRes)
println("****************** reusable barrier passed ****************** ")
}
package main
import (
"sync"
"time"
)
const N = 5
var chopsticks [N]*sync.Cond
var mu [N]sync.Mutex
func min(x, y int) int {
if x < y {
return x
} else {
return y
}
}
func max(x, y int) int {
if x > y {
return x
} else {
return y
}
}
func philosopher(wg *sync.WaitGroup, i int) {
defer wg.Done()
philosopherLife:
for {
select {
case <-time.After(10 * time.Second):
break philosopherLife
default:
println("philosopher(", i, "): thinking...")
time.Sleep(1 * time.Second)
mu[min(i, (i+1)%5)].Lock()
chopsticks[min(i, (i+1)%5)].Wait()
mu[min(i, (i+1)%5)].Unlock()
mu[max(i, (i+1)%5)].Lock()
chopsticks[max(i, (i+1)%5)].Wait()
mu[max(i, (i+1)%5)].Unlock()
println("philosopher(", i, "): eating...")
time.Sleep(1 * time.Second)
println("philosopher(", i, "): finish eating...")
chopsticks[max(i, (i+1)%5)].Signal()
chopsticks[min(i, (i+1)%5)].Signal()
}
}
}
func main() {
for i := 0; i < N; i++ {
chopsticks[i] = sync.NewCond(&mu[i])
}
wg := sync.WaitGroup{}
wg.Add(N)
for i := 0; i < N; i++ {
go philosopher(&wg, i)
}
wg.Wait()
}
package main
import (
"sync"
"sync/atomic"
)
var sharedData = 0x0
type Semaphore struct {
counter atomic.Value
mu sync.Mutex
lock sync.Mutex
locked atomic.Value
}
func (m *Semaphore) down() {
for m.counter.Load() == 0 {
}
m.counter.Store(m.counter.Load().(int) - 1)
//if m.locked.Load() == true {
// m.mu.Unlock()
// m.locked.Store(false)
//}
//panic("multimutex: unlock of unlocked mutex")
}
func (m *Semaphore) up() {
m.counter.Store(m.counter.Load().(int) + 1)
}
//func modifySharedResource(wg *sync.WaitGroup) {
// defer wg.Done()
// for mutex.counter.Load() == 0 {
// // spin until it acquires a non-zero value
// //println("waiting till the counter becomes gt 0")
// }
// // mutex.Lock()
// down(&mutex.counter)
// // at this point, the scheduler decides that it's a time
// // to place goroutine in a waiting list letting other goroutines
// // do their job.
// sharedData++
// up(&mutex.counter)
//}
var woneCond *sync.Cond
var wtwoCond *sync.Cond
var oneM = &sync.Mutex{}
var twoM = &sync.Mutex{}
func workeronechannels(wg *sync.WaitGroup, done chan bool) {
defer wg.Done()
// starts first
// grab the lock and execute the first statement
println("executing first statement in workerone")
done <- true
<-done
println("executing second statement in workerone")
done <- true
}
func workertwochannels(wg *sync.WaitGroup, done chan bool) {
defer wg.Done()
// wait till the first goroutine finishes
// grab the lock and execute the first statement
<-done
println("executing first statement in workertwo")
done <- true
//time.Sleep(1 * time.Second)
<-done
// release the held lock
println("executing second statement in workertwo")
done <- true
}
func workerone(wg *sync.WaitGroup) {
defer wg.Done()
// starts first
// grab the lock and execute the first statement
println("executing first statement in workerone")
woneCond.Signal()
twoM.Lock()
wtwoCond.Wait()
twoM.Unlock()
println("executing second statement in workerone")
woneCond.Signal()
}
func workertwo(wg *sync.WaitGroup) {
defer wg.Done()
// wait till the first goroutine finishes
// grab the lock and execute the first statement
oneM.Lock()
woneCond.Wait()
oneM.Unlock()
println("executing first statement in workertwo")
wtwoCond.Signal()
oneM.Lock()
woneCond.Wait()
oneM.Unlock()
//time.Sleep(1 * time.Second)
// release the held lock
println("executing second statement in workertwo")
}
func workeronebook(wg *sync.WaitGroup) {
defer wg.Done()
println("statement 1 in workerone")
woneCond.Signal()
twoM.Lock()
wtwoCond.Wait()
twoM.Unlock()
println("statement 2 in workerone")
}
func workertwobook(wg *sync.WaitGroup) {
defer wg.Done()
println("statement 1 in workertwo")
wtwoCond.Signal()
oneM.Lock()
woneCond.Wait()
oneM.Unlock()
println("statement 2 in workertwo")
}
const GS = 0x10
func main() {
woneCond = sync.NewCond(oneM)
wtwoCond = sync.NewCond(twoM)
wg := sync.WaitGroup{}
wg.Add(2)
go workeronebook(&wg)
go workertwobook(&wg)
wg.Wait()
println(sharedData)
}
package main
import (
"sync"
"sync/atomic"
"time"
)
// 3.8 Queue page 45
//Semaphores can also be used to represent a queue. In this case, the initial value
//is 0, and usually the code is written so that it is not possible to signal unless
//there is a thread waiting, so the value of the semaphore is never positive.
//For example, imagine that threads represent ballroom dancers and that two
//kinds of dancers, leaders and followers, wait in two queues before entering the
//dance floor. When a leader arrives, it checks to see if there is a follower waiting.
//If so, they can both proceed. Otherwise it waits.
//Similarly, when a follower arrives, it checks for a leader and either proceeds
//or waits, accordingly.
//Puzzle: write code for leaders and followers that enforces these constraints.
const (
LeaderIdx = 0x0
FollowerIdx = 0x1
RendezvousIdx = 0x2
Extra = 0x3
)
var (
leaderReady atomic.Value
followerReady atomic.Value
leaders atomic.Value
followers atomic.Value
rendezvousMet atomic.Value
conditions []*sync.Cond
locks []sync.Mutex
)
func leaderQueue(id int) {
println("leader(", id, "): arrived at a dance floor")
println("leader(", id, "): checking to see whether there's a waiting follower")
locks[FollowerIdx].Lock()
for followerReady.Load().(bool) != true {
conditions[FollowerIdx].Wait()
}
locks[FollowerIdx].Unlock()
println("leader(", id, "): one is available! Dancing...")
leaderReady.Store(false)
followerReady.Store(false)
time.Sleep(2 * time.Second)
println("leader(", id, "): finish dancing. Leaving the dance floor")
leaderReady.Store(true)
followerReady.Store(true)
conditions[LeaderIdx].Signal()
conditions[FollowerIdx].Signal()
}
func followerQueue(id int) {
println("follower(", id, "): arrived at a dance floor")
println("follower(", id, "): checking to see whether there's a waiting leader")
locks[LeaderIdx].Lock()
for leaderReady.Load().(bool) != true {
conditions[LeaderIdx].Wait()
}
locks[LeaderIdx].Unlock()
println("follower(", id, "): one is available! Dancing...")
followerReady.Store(false)
leaderReady.Store(false)
time.Sleep(2 * time.Second)
println("follower(", id, "): finish dancing. Leaving the dance floor")
followerReady.Store(true)
leaderReady.Store(true)
conditions[LeaderIdx].Signal()
conditions[FollowerIdx].Signal()
}
// adapted version from a book
func leaderQueue2(id int) {
conditions[FollowerIdx].Signal()
locks[LeaderIdx].Lock()
for followerReady.Load().(bool) != true {
conditions[LeaderIdx].Wait()
}
locks[LeaderIdx].Unlock()
// dance
leaderReady.Store(false)
followerReady.Store(false)
println("leader(", id, "): dancing...")
time.Sleep(1 * time.Second)
followerReady.Store(true)
leaderReady.Store(false)
}
func followerQueue2(id int) {
conditions[LeaderIdx].Signal()
locks[FollowerIdx].Lock()
for leaderReady.Load().(bool) != true {
conditions[FollowerIdx].Wait()
}
locks[FollowerIdx].Unlock()
// dance
leaderReady.Store(false)
followerReady.Store(false)
println("follower(", id, "): dancing...")
time.Sleep(1 * time.Second)
followerReady.Store(true)
leaderReady.Store(false)
}
// exclusive queue solution from the book
func exclusiveQueueLeader(id int) {
locks[Extra].Lock()
if followers.Load().(int) > 0 {
followers.Store(followers.Load().(int) - 1)
conditions[FollowerIdx].Signal()
} else {
leaders.Store(leaders.Load().(int) + 1)
locks[Extra].Unlock()
locks[LeaderIdx].Lock()
conditions[LeaderIdx].Wait()
locks[LeaderIdx].Unlock()
}
//dancing
println("leader(", id, "): dancing...")
locks[RendezvousIdx].Lock()
conditions[RendezvousIdx].Wait()
locks[RendezvousIdx].Unlock()
locks[Extra].Unlock()
rendezvousMet.Store(false)
}
func exclusiveQueueFollower(id int) {
locks[Extra].Lock()
if leaders.Load().(int) > 0 {
leaders.Store(leaders.Load().(int) - 1)
conditions[LeaderIdx].Signal()
} else {
followers.Store(followers.Load().(int) + 1)
locks[Extra].Unlock()
locks[FollowerIdx].Lock()
conditions[FollowerIdx].Wait()
locks[FollowerIdx].Unlock()
}
// dance
println("follower(", id, "): dancing...")
conditions[RendezvousIdx].Signal()
}
func main() {
conditions = make([]*sync.Cond, 4)
locks = make([]sync.Mutex, 4)
for i := 0; i < 4; i++ {
conditions[i] = sync.NewCond(&locks[i])
}
//leaderReady.Store(true)
//followerReady.Store(true)
followers.Store(0)
leaders.Store(0)
rendezvousMet.Store(false)
leaderId := 0
followerId := 0
for {
go exclusiveQueueLeader(leaderId)
go exclusiveQueueFollower(followerId)
leaderId++
followerId++
time.Sleep(1 * time.Second)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment