Notes on implementing synchronization techniques using semaphores, in Rust, from The Little Book of Semaphores

Link to the book

Semaphore implementation in Rust taken from the now-deprecated std-semaphore

Chapter 3

3.3 Rendezvous

If you have two threads executing the following:

Thread A          Thread B
statement a1      statement b1
statement a2      statement b2

the goal is to enforce that a1 happens before b2 and b1 happens before a2. There is no ordering constraint between a1 and b1. Create two semaphores, aArrived and bArrived, and arrange the waits and signals like this:

Thread A               Thread B
statement a1           statement b1
aArrived.signal()      bArrived.signal()
bArrived.wait()        aArrived.wait()
statement a2           statement b2

Rust remark: It seems like the proper way to pass semaphores to threads is by wrapping them in an Arc and cloning them, i.e.

let sem_a = Arc::new(Semaphore::new(0));
let sem_b = Arc::new(Semaphore::new(0));
let sem_a2 = sem_a.clone();
let sem_b2 = sem_b.clone();

let t = thread::spawn(move || {
    println!("statement b1");
    sem_b2.release();
    let _g = sem_a2.access();
    println!("statement b2");
});

basically. Note that the semaphores are initialized with a count of 0, so in this sense whatever resource they represent starts out already acquired.
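For completeness, here's a minimal sketch of what the corresponding thread A could look like under the same setup; the un-cloned sem_a and sem_b are moved into this closure, and joining both handles is my addition:

let t2 = thread::spawn(move || {
    println!("statement a1");
    sem_a.release();          // aArrived.signal()
    let _g = sem_b.access();  // bArrived.wait()
    println!("statement a2");
});

t.join().unwrap();
t2.join().unwrap();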

3.4 Mutex

The situation requires that a resource only be accessed by a single thread at a time. Pretty straightforward: create a semaphore initialized to 1, have each thread acquire it before entering the critical section, and release it when done.

let mutex = Arc::new(Semaphore::new(1));
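A minimal sketch of how the threads might use it (the loop bound and printouts are just for illustration):

let mut handles = vec![];
for i in 0..4 {
    let t_mutex = mutex.clone();
    handles.push(thread::spawn(move || {
        let _g = t_mutex.access(); // acquired here, released when the guard is dropped
        println!("thread {} has exclusive access to the critical section", i);
    }));
}
for handle in handles {
    handle.join().unwrap();
}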

3.5 Multiplex

This generalizes the mutex problem so that multiple threads may run in the critical section at the same time, but with an upper limit of n threads. The solution is to initialize a semaphore to n:

const NUM_THREADS_ALLOWED: isize = 5;
let multiplex = Arc::new(Semaphore::new(NUM_THREADS_ALLOWED));
let mut handles = vec![];

for i in 0..10 {
    let t_multiplex = multiplex.clone();
    let handle = thread::spawn(move || {
        let _g = t_multiplex.access();
        // critical section
    });
    handles.push(handle);
}
for handle in handles {
    handle.join().unwrap();
}

3.6 Barrier

Generalizing the rendezvous problem, we seek to make n threads wait until all of them have reached the barrier before any of them proceeds into the critical section. That is, the first n-1 threads block until the nth thread arrives, and then all proceed. The solution requires:

n = the number of threads
count = 0
mutex = Semaphore(1)
barrier = Semaphore(0)

where the barrier is a semaphore that starts out locked (zero or negative) and is unlocked once all n threads arrive. As a thread gets to the barrier it acquires the mutex, increments count, and checks whether count equals n; if it does, it unlocks the barrier and the mutex, otherwise it unlocks the mutex and proceeds to the turnstile pattern, which is

barrier.wait()
barrier.signal()

There are several opportunities for deadlock. If, instead of the turnstile pattern, we used a bare barrier.wait(), then when the nth thread signals the barrier the value goes from -n+1 to -n+2: a single thread wakes up and proceeds, leaving the other threads blocked. Another deadlock can occur if the turnstile is placed inside the mutex; in fact, waiting on a semaphore while holding a mutex is a classic deadlock pattern. We can imagine a thread entering the mutex and blocking on the semaphore: because the mutex is locked, no other thread can enter and increment the count.

The technique of counting the number of threads is useful and can be generalized outside of turnstiles/barriers, as we will see in chapter 4 with writers and readers.

Rust implementation:

const NUM_THREADS_ALLOWED: isize = 5;
let barrier = Arc::new(Semaphore::new(0));
let mutex = Arc::new(Semaphore::new(1));
let counter = Arc::new(RwLock::new(0));
let mut handles = vec![];

for i in 0..NUM_THREADS_ALLOWED {
    let t_barrier = barrier.clone();
    let _t_mutex = mutex.clone();
    let t_counter = counter.clone();
    let handle = thread::spawn(move || {
        let mut num = t_counter.write().unwrap();
        *num += 1;
        if *num == NUM_THREADS_ALLOWED {
            t_barrier.release();
        }
        drop(num);
        let _g = t_barrier.access();
        // critical section
    });
    handles.push(handle);
}
for handle in handles {
    handle.join().unwrap();
}

Rust remark: In this case it's more convenient to use RwLock. Unlike AtomicUsize or its counterparts, I think it only needs a single (shared) reference through the Arc to be readable. The mutex is redundant since the RwLock basically acts like one. Also, Semaphore::access returns an RAII guard that releases the semaphore when dropped, which is why there is no explicit call to release.
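For comparison, here is a hedged sketch of what the counter could look like with an atomic instead of the RwLock. This is an alternative I'm sketching, not the approach used above, and it assumes the same barrier and handles setup:

use std::sync::atomic::{AtomicIsize, Ordering};

// snip
let counter = Arc::new(AtomicIsize::new(0));

for _i in 0..NUM_THREADS_ALLOWED {
    let t_barrier = barrier.clone();
    let t_counter = counter.clone();
    let handle = thread::spawn(move || {
        // fetch_add returns the previous value, so +1 gives this thread's arrival number
        if t_counter.fetch_add(1, Ordering::SeqCst) + 1 == NUM_THREADS_ALLOWED {
            t_barrier.release();
        }
        let _g = t_barrier.access();
        // critical section
    });
    handles.push(handle);
}
// snip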

3.7 Reusable barrier

Building off of the barrier problem: once all the threads are past the barrier, it remains unlocked. We would like to re-lock the barrier once the threads have gone through. The naive solution would be to add code after the critical section that mirrors the barrier code: a thread acquires the mutex, decrements count, releases the mutex, then checks whether count == 0 and, if so, tries to acquire the barrier. Note that we're assuming this code is running in a loop.

This actually answers the question of what happens when you check the counter outside of the mutex. It's possible, though perhaps not too likely, that a thread, say the (n-1)th, gets interrupted right before it checks count == 0. When the nth thread releases the mutex after decrementing count, both threads will see count == 0 and both will wait on the barrier, creating a deadlock. Moving the check inside the mutex solves the deadlock for this case, but there's still the possibility of a thread passing through, looping back around, and re-acquiring the mutex to increment the counter, effectively lapping the other threads and defeating the purpose of a generalized rendezvous.

The eventual solution is to have a second turnstile whose locked state is opposite that of the first. In the first mutex, when we see that count == n, we lock the second turnstile and unlock the first. The threads pass through into the critical section and then wait for the nth thread to get into the mutex again, at which point it locks the first turnstile and unlocks the second.

Rust implementation:

const NUM_THREADS_ALLOWED: isize = 2;
let barrier = Arc::new(Semaphore::new(0));
let turnstile = Arc::new(Semaphore::new(1));
let mutex = Arc::new(Semaphore::new(1));
let counter = Arc::new(RwLock::new(0));
let mut handles = vec![];

for i in 0..NUM_THREADS_ALLOWED {
    let t_barrier = barrier.clone();
    let _t_mutex = mutex.clone();
    let t_counter = counter.clone();
    let t_turnstile = turnstile.clone();
    let handle = thread::spawn(move || {
        for j in 0..2 {
            let mut num = t_counter.write().unwrap();
            *num += 1;
            if *num == NUM_THREADS_ALLOWED {
                t_turnstile.acquire();
                t_barrier.release();
            }
            drop(num);
            t_barrier.acquire();
            t_barrier.release();
                
            // critical section

            let mut num = t_counter.write().unwrap();
            *num -= 1;
            if *num == 0 {
                t_barrier.acquire();
                t_turnstile.release();
            }
            drop(num);
            t_turnstile.acquire();
            t_turnstile.release();
        }
    });
    handles.push(handle);
}
for handle in handles {
    handle.join().unwrap();
}

Rust remark: Here, the first turnstile is called barrier because I mostly copied code from the barrier solution.

We can make a couple statements about the correctness of the solution:

  1. Only the nth thread can lock or unlock the turnstiles.
  2. Before a thread can unlock the first turnstile, it has to close the second, and vice versa; it is therefore impossible for a thread to get ahead of the others by more than one turnstile.

3.8 Queues

Didn't do that yet.

Chapter 4

4.1 Producer-consumer problem

Unlike the previous problems, where all threads were homogeneous, here the threads fall into two categories: producers and consumers. Producers do something like

event = waitForEvent()
buffer.add(event)

and consumers do

event = buffer.get()
event.process()

Naturally, we need each thread to have exclusive access to the buffer while interacting with it. We further want each consumer thread to block if there are no events in the buffer. Having said that, waitForEvent and event.process can run concurrently.

We definitely want a mutex to protect the buffer; we also want a semaphore for the events in the buffer, whose count will represent the number of items. This solution actually illustrates a property of semaphores that might not be immediately obvious: there's nothing preventing a thread from signaling/releasing as many times as it wants, and no requirement that the semaphore be decremented (wait/acquire) first.

So our producer will look something like this

event = waitForEvent()
mutex.wait()
	buffer.add(event)
mutex.signal()
items.signal()

Note that we did not signal items inside the mutex. We'll see why after the consumer code here:

items.wait()
mutex.wait()
	event = buffer.get()
mutex.signal()
event.process()

What happens if items were signaled inside the mutex? Say the consumer thread unblocks immediately after being signaled; it then immediately blocks on the mutex, which is still held by the producer. Blocking and waking threads are expensive operations, so it's better to signal once it's known that a consumer can actually proceed. This doesn't preclude a producer from beating it to the mutex, though.

There's a slight issue with the consumer, though: it's quite possible that multiple consumers will block on the items.wait() line, in which case the items semaphore that's supposed to track the events in the buffer will be inaccurate. We might try to address that by checking the buffer inside the mutex. That, of course, is the deadlock pattern we saw previously:

mutex.wait()
	items.wait()
    event = buffer.get()
mutex.signal()
event.process()

i.e., blocking on a semaphore while holding a mutex.

Rust implementation:

struct Event {}

impl Event {
    fn process(&self) {
        println!("processing an event");
    }
}

fn wait_for_event() -> Event {
    thread::sleep(time::Duration::from_secs(2));
    Event {}
}

fn main() {
    let mutex = Arc::new(Semaphore::new(1));
    let items = Arc::new(Semaphore::new(0));
    let buffer = Arc::new(Mutex::new(Vec::new()));
    let mut handles = vec![];

    let mutex1 = mutex.clone();
    let items1 =  items.clone();
    let buffer1 = buffer.clone();
    let handle1 = thread::spawn(move || {
        loop {
            let event = wait_for_event();
            mutex1.acquire();
            let mut buf = buffer1.lock().unwrap();
            buf.push(event);
            mutex1.release();
            items1.release();
        }
    });
    handles.push(handle1);

    let mutex2 = mutex.clone();
    let items2 = items.clone();
    let buffer2 = buffer.clone();
    let handle2 = thread::spawn(move || {
        loop {
            items2.acquire();
            mutex2.acquire();
            let mut buf = buffer2.lock().unwrap();
            let event = buf.pop().unwrap();
            mutex2.release();
            event.process();
        }
    });
    handles.push(handle2);

    for handle in handles {
        handle.join().unwrap();
    }
}

Rust remark: Locking a Mutex and unwrapping the result gives a MutexGuard, which implements Deref and DerefMut, so deref coercion comes into play, which is nice.
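A tiny illustration of that coercion, assuming the buffer from the code above:

let mut buf = buffer.lock().unwrap(); // LockResult unwrapped into a MutexGuard<Vec<Event>>
buf.push(Event {});                   // Vec::push, reached through DerefMut
let _n = buf.len();                   // Vec::len, reached through Deref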

Suppose now we want a finite buffer; call the limit bufferSize. If only we could do something like

if items >= bufferSize:
	block()

but we can't, because a semaphore doesn't let us check its value; it only offers wait/acquire and signal/release. We could try a counter protected by another mutex, but that seems like a path towards pain. It turns out we can simply initialize a semaphore (spaces, below) to the buffer size, have each producer acquire it before adding an item, and have each consumer release it after removing one.

Rust implementation:

// snip
let spaces = Arc::new(Semaphore::new(5));
let spaces1 = spaces.clone();
let handle1 = thread::spawn(move || {
    loop {
        let event = wait_for_event();
        spaces1.acquire();
        mutex1.acquire();
        let mut buf = buffer1.lock().unwrap();
        buf.push(event);
        mutex1.release();
        items1.release();
    }
});
handles.push(handle1);

let mutex2 = mutex.clone();
let items2 = items.clone();
let buffer2 = buffer.clone();
let spaces2 = spaces.clone();
let handle2 = thread::spawn(move || {
    loop {
        thread::sleep(time::Duration::from_secs(2));
        items2.acquire();
        mutex2.acquire();
        let mut buf = buffer2.lock().unwrap();
        let event = buf.pop().unwrap();
        mutex2.release();
        spaces2.release();
        event.process();
    }
});
// snip

4.2 Readers-writers problem

This is similar to the producer-consumer problem but with a twist:

  1. Any number of readers can be in the critical section simultaneously.
  2. Writers must have exclusive access to the critical section.

Clearly we'll need a mutex for the writers, but we need something else to deal with the readers. A mutex spanning the critical section won't work because we need readers to be able to enter it simultaneously. What about a queue like in the producer-consumer problem? That might work, but how do we prevent a writer from queuing up?

There's a good chance we might think about the problem in the wrong way and get stuck. It's not necessarily a bad idea to think about the critical section as a room. In this mental model, if a writer gets to the room first, it can go in but tells every other thread to wait until it leaves. Let's generalize that: if a writer gets to the room and sees that it is empty, it can go in. When a reader gets to the room and sees that it is empty, it can go in. If a reader gets to the room and sees another reader inside, it can also go in. So we have a nice binary condition that applies to both readers and writers: whether the room is empty or not. Naively, our writer code might be something like this:

roomEmpty.wait()
	critical section
roomEmpty.signal()

because writers want exclusive access. So to determine whether the room is empty, all a reader has to do is roomEmpty.wait(), but that would give it exclusive access, which is only fine if it's the first one in. How does the next reader know there's already a reader inside? Again, semaphores don't let you check their value, so we need to keep track of it some other way. We already know how to do this from the barrier problem: a shared counter called readers, which means we'll also need a mutex to protect it. Naively we might do something like this

mutex.wait()
    if readers > 0:        // a reader is already in the room, so just increment and go
        readers += 1
    else:
        readers += 1
        roomEmpty.wait()   // first one in locks against writers
mutex.signal()

// critical section

mutex.wait()
    readers -= 1
    if readers == 0:
        roomEmpty.signal() // last one out unlocks
mutex.signal()

but we can see that the first block can be refactored, since we're incrementing in both branches.

mutex.wait()
    readers += 1
    if readers == 1:
        roomEmpty.wait()
mutex.signal()
// snip

The refactor changes our mental model just a little bit. If we think of the mutex as making readers queue up in front of the room: previously, when it was a reader's turn (i.e., when it acquired the mutex), it would check whether a reader was inside and, if so, enter, otherwise wait for the room to be empty. With the refactor, when it's a reader's turn it just checks whether it's the first reader in line; if it is, it tries to get the room, otherwise it just goes in.

Some statements we can make:

  • Only one reader can queue waiting for roomEmpty, but several writers might.
  • When a reader signals roomEmpty the room must be empty.

It turns out this pattern is called a Lightswitch, which seems kind of obvious when you think about it: instead of the room being occupied or empty, the light is on or off, and of course the last one to leave turns out the lights.

Rust implementation:

const NUM_WRITERS: isize = 2;
const NUM_READERS: isize = 10;
let mut handles = vec![];

let mutex = Arc::new(Semaphore::new(1));
let room_empty = Arc::new(Semaphore::new(1));
let readers = Arc::new(RwLock::new(0));

for i in 0..NUM_WRITERS {
    let w_room_empty = room_empty.clone();
    let handle = thread::spawn(move || {
        loop {
            let _g = w_room_empty.access();
            println!("writer entering and leaving critical section");
        }
    });
    handles.push(handle);
}

for i in 0..NUM_READERS {
    let r_mutex = mutex.clone();
    let r_room_empty = room_empty.clone();
    let r_readers = readers.clone();

    let handle = thread::spawn(move || {
        loop {
            r_mutex.acquire();
            let mut num = r_readers.write().unwrap();
            *num += 1;
            if *num == 1 {
                r_room_empty.acquire()
            }
            drop(num);
            r_mutex.release();

            println!("reader: i'm in the critical section");

            r_mutex.acquire();
            let mut num = r_readers.write().unwrap();
            *num -= 1;
            if *num == 0 {
                r_room_empty.release();
            }
            drop(num);
            r_mutex.release();
        }
    });
    handles.push(handle);
}
for handle in handles {
    handle.join().unwrap();
}

We can implement a Lightswitch fairly easily as long as we keep some things in mind:

pub struct Lightswitch {
    counter: Mutex<usize>,
    mutex: Semaphore
}

impl Lightswitch {
    pub fn new(count: usize) -> Lightswitch {
        Lightswitch { counter: Mutex::new(count), mutex: Semaphore::new(1)}
    }

    pub fn lock(&self, semaphore: Arc<Semaphore>) {
        let _g = self.mutex.access();
        let mut num = self.counter.lock().unwrap();
        *num += 1;
        if *num == 1 {
            semaphore.acquire();
        }
    }

    pub fn unlock(&self, semaphore: Arc<Semaphore>) {
        let _g = self.mutex.access();
        let mut num = self.counter.lock().unwrap();
        *num -= 1;
        if *num == 0 {
            semaphore.release();
        }
    }
}

The first thing is that Arc only gives out immutable borrows. Even if we declare a mutable variable holding a clone of the Arc<Lightswitch>, we still can't get a mutable borrow of the Lightswitch itself. If we didn't know that, we might be tempted to use &mut self in the methods; after all, it would let us do self.counter += 1 without any fuss. The problem shows up when we try to compile: we get cannot borrow immutable borrowed content as mutable, because our method expects a mutable borrow but Arc only gives us immutable ones. So we need interior mutability, which is not as straightforward in a multithreaded context. This awesome blog post series explains interior mutability in a variety of contexts. The basic idea is that Rust requires data shared between threads, which is what Arc lets us do, to be thread-safe. There are a few options, as listed in the blog post; for this purpose Mutex is as good as any.
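As a standalone sketch of the same idea, independent of the Lightswitch: Arc shares the data, Mutex provides the interior mutability, and all the mutation happens through shared references.

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0usize));
    let handles: Vec<_> = (0..4)
        .map(|_| {
            let c = counter.clone();
            // each thread only has a shared reference to the Mutex,
            // yet can still mutate the value behind it through the guard
            thread::spawn(move || {
                *c.lock().unwrap() += 1;
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    assert_eq!(*counter.lock().unwrap(), 4);
}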

With all of that said and done, let's rewrite our solution using our Lightswitch type; the reader part becomes substantially simpler.

const NUM_WRITERS: isize = 2;
const NUM_READERS: isize = 10;
let mut handles = vec![];

let room_empty = Arc::new(Semaphore::new(1));
let lightswitch = Arc::new(Lightswitch::new(0));

for _i in 0..NUM_WRITERS {
    let w_room_empty = room_empty.clone();
    let handle = thread::spawn(move || {
        loop {
            let _g = w_room_empty.access();
            println!("writer entering and leaving critical section");
        }
    });
    handles.push(handle);
}

for _i in 0..NUM_READERS {
    let r_lightswitch = lightswitch.clone();
    let r_room_empty = room_empty.clone();

    let handle = thread::spawn(move || {
        loop {
            r_lightswitch.lock(r_room_empty.clone());
            println!("reader: i'm in the critical section");
            r_lightswitch.unlock(r_room_empty.clone());
        }
    });
    handles.push(handle);
}
for handle in handles {
    handle.join().unwrap();
}

Let's note that we could also implement a LightswitchGuard, like the semaphore's counterpart, so that we wouldn't have to unlock manually.
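A hedged sketch of what that could look like, mirroring the SemaphoreGuard pattern (the names LightswitchGuard and access are mine):

pub struct LightswitchGuard<'a> {
    switch: &'a Lightswitch,
    semaphore: Arc<Semaphore>,
}

impl<'a> Drop for LightswitchGuard<'a> {
    fn drop(&mut self) {
        // unlock the switch when the guard goes out of scope
        self.switch.unlock(self.semaphore.clone());
    }
}

impl Lightswitch {
    pub fn access(&self, semaphore: Arc<Semaphore>) -> LightswitchGuard<'_> {
        self.lock(semaphore.clone());
        LightswitchGuard { switch: self, semaphore }
    }
}

The reader loop body would then shrink to taking the guard and printing, e.g. let _g = r_lightswitch.access(r_room_empty.clone());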

It turns out that while we don't get deadlock, we do get starvation. It's very likely that readers will keep queuing back up after leaving the critical section without there ever being a 'last' reader to turn the lightswitch off. So we probably want a way for writers to 'preempt' the readers. How do we do that? Let's list the constraints we've implemented so far. We have a roomEmpty semaphore that writers compete with each other to acquire, as well as with a single reader. When a reader acquires it, it turns on the switch and lets all the other readers go in until the last one turns it off, if that ever happens. There's no real constraint on who goes in; it's simply all the readers. We want to add a constraint that both readers and writers have to go through, but that a writer can lock behind itself once it has gone through, so that it can wait for the room to drain of readers and eventually enter it. We've used it before: it's the turnstile pattern. Readers wait and then signal in succession, but a writer waits, and once it acquires roomEmpty it goes through its critical section and only then signals the turnstile.

In the book's pseudocode the writer becomes

turnstile.wait()
	roomEmpty.wait()
    // critical section
turnstile.signal()
roomEmpty.signal()

and the reader becomes

turnstile.wait()
turnstile.signal()

readSwitch.lock(roomEmpty)
	// critical section
readSwitch.unlock(roomEmpty)

Rust implementation:

const NUM_WRITERS: isize = 10;
const NUM_READERS: isize = 10;
let mut handles = vec![];

let room_empty = Arc::new(Semaphore::new(1));
let read_switch = Arc::new(Lightswitch::new(0));
let turnstile = Arc::new(Semaphore::new(1));

for i in 0..NUM_WRITERS {
    let w_room_empty = room_empty.clone();
    let w_turnstile = turnstile.clone();
    let handle = thread::spawn(move || {
        loop {
            w_turnstile.acquire();
            let _g = w_room_empty.access();
            println!("writer {} entering and leaving critical section", i);
            w_turnstile.release();
        }
    });
    handles.push(handle);
}

for i in 0..NUM_READERS {
    let r_read_switch = read_switch.clone();
    let r_room_empty = room_empty.clone();
    let r_turnstile = turnstile.clone();
    let handle = thread::spawn(move || {
        loop {
            r_turnstile.acquire();
            r_turnstile.release();
            r_read_switch.lock(r_room_empty.clone());
            println!("reader {}: i'm in the critical section", i);
            r_read_switch.unlock(r_room_empty.clone());
        }
    });
    handles.push(handle);
}
for handle in handles {
    handle.join().unwrap();
}

Running this shows that there is no more starvation, but it doesn't address fairness. Recall that the writers are competing against a single reader for roomEmpty. When NUM_WRITERS is small compared to NUM_READERS we see more balance, and less as the number of writers increases.

We now want to give more priority to writers. The extra condition we seek to impose is that once a writer arrives, no readers are allowed to enter until all writers have left the critical section. This is basically asking us to impose a lightswitch constraint on the writers. Do we build off of our no-starve solution? That is, with a roomEmpty mutex, a turnstile, and a lightswitch for readers, can we simply add a lightswitch for writers? Probably not: if a writer and a reader are waiting on the turnstile while another writer is in its critical section, it's possible that the reader proceeds next, which violates our condition.

We still need two lightswitches, but the turnstile won't work. Can we get away with the two lightswitches and a single mutex? Not quite: with just one mutex shared by the two lightswitches, writers would end up in the critical section at the same time. So we need at least two mutexes, one that both readers and writers compete for, and another for the writers to compete for among themselves. The book's mutex names in its hint are noReaders and noWriters.

Let's look at what the Rust code would look like for the writers:

w_write_switch.lock(w_no_readers);
w_no_writers.acquire();
println!("writer {}: in the critical section", i);
w_no_writers.release();
w_write_switch.unlock(w_no_readers);

The key thing here is that a writer holds both mutexes while in its critical section, because a writer has to have exclusive access to it across all threads. We also know that a reader must not hold the no_readers mutex in its critical section, since otherwise readers couldn't be in the critical section concurrently. In the event that a writer gets to the no_readers mutex first, a reader should try to acquire it right away. The readers' code is just as straightforward:

r_no_readers.acquire();
r_read_switch.lock(r_no_writers.clone());
r_no_readers.release();
println!("reader {}: in the critical section", i);
r_read_switch.unlock(r_no_writers.clone());

And the full implementation

const NUM_WRITERS: isize = 2;
const NUM_READERS: isize = 10;
let mut handles = vec![];

let no_readers = Arc::new(Semaphore::new(1));
let no_writers = Arc::new(Semaphore::new(1));
let read_switch = Arc::new(Lightswitch::new(0));
let write_switch = Arc::new(Lightswitch::new(0));

for i in 0..NUM_WRITERS {
    let w_no_readers = no_readers.clone();
    let w_no_writers = no_writers.clone();
    let w_write_switch = write_switch.clone();
    let handle = thread::spawn(move || {
        loop {
            w_write_switch.lock(w_no_readers.clone());
            w_no_writers.acquire();
            println!("writer {}: in the critical section", i);
            w_no_writers.release();
            w_write_switch.unlock(w_no_readers.clone());
        }
    });
    handles.push(handle);
}

for i in 0..NUM_READERS {
    let r_no_readers = no_readers.clone();
    let r_no_writers = no_writers.clone();
    let r_read_switch = read_switch.clone();
    let handle = thread::spawn(move || {
        loop {
            r_no_readers.acquire();
            r_read_switch.lock(r_no_writers.clone());
            r_no_readers.release();
            println!("reader {}: in the critical section", i);
            r_read_switch.unlock(r_no_writers.clone());
        }
    });
    handles.push(handle);
}
for handle in handles {
    handle.join().unwrap();
}

As it turns out, this solution gives us the opposite of the problem we had before: now the readers will starve.

4.3 No-starve mutex

As the book describes it, we're working with weak semaphores, so we don't have the guarantee of a bounded number of threads being woken up before a given waiting thread gets woken up itself. So how do we go about attacking the problem? If we want to bound that number, we need a way to constrain access to the critical section. If we're talking about bounding, we're probably going to need a counter, and therefore a mutex to protect it. The first time we counted threads was in the barrier problem, which we then generalized to make it reusable; the solution there was a second turnstile whose locked state was opposite the first.

Now, barriers and turnstiles aren't meant to solve mutual exclusion by themselves, but I brought them up to point out that adding a turnstile is what let writers preempt readers in the readers-writers problem. In this case, however, we can't assume our threads are heterogeneous, but what if we could write our code as if they were? We could just use a second counter, and incrementing/decrementing a given counter would be tantamount to a thread declaring itself to be in that category.

In the preemptible readers-writers problem the readers went through a turnstile that the writers could lock so they could get their turn. In our situation we want both categories to be able to lock the other out, so we probably want two turnstiles in total. To sum up: two counters, say room1 and room2, two turnstiles, and a mutex. The mutex was for protecting the counter back when we thought we only needed one; do we need more mutexes? Let's try to think it through. The first question is how to divide the threads up between the rooms. We could try something like: odd-numbered threads go in room1 and even-numbered threads in room2.

Already we should stop ourselves and ask when we have ever used metadata about the threads. The answer is that we haven't, so we're on the wrong track. In fact, the idea of two different categories of threads trying to preempt each other after a certain number have gone through is mostly wrong. The actual solution is more like a two-stage pipeline.

The basic idea is that every thread tries to enter room1 by incrementing its counter under the mutex. It then enters the first turnstile and, while inside, increments room2 and acquires the mutex again. Once acquired, it decrements room1 and checks whether it's 0: if so, it releases the mutex and unlocks the second turnstile, otherwise it releases the mutex and signals the first turnstile. Next, it tries to enter the second turnstile; once inside, it decrements room2 (which has no mutex of its own; it's protected by the second turnstile) and executes the critical section. Afterwards it checks whether room2 is empty: if so, it unlocks the first turnstile, otherwise it exits by signaling the second turnstile.

Starvation is prevented because a thread that exits the second turnstile and loops back to the beginning can only get as far as room1 while there are still threads in room2. This ensures that with n threads, a given thread has to wait for at most n-1 others to complete before getting its chance to proceed.

Rust implementation:

const NUM_THREADS: isize = 10;
let mutex = Arc::new(Semaphore::new(1));
let room1 = Arc::new(RwLock::new(0));
let room2 = Arc::new(RwLock::new(0));
let turnstile1 = Arc::new(Semaphore::new(1));
let turnstile2 = Arc::new(Semaphore::new(0));
let mut handles = vec![];

for i in 0..NUM_THREADS {
    let t_mutex = mutex.clone();
    let t_room1 = room1.clone();
    let t_room2 = room2.clone();
    let t_turnstile1 = turnstile1.clone();
    let t_turnstile2 = turnstile2.clone();

    let handle = thread::spawn(move || {
        loop {
            {
                let _g = t_mutex.access();
                let mut num = t_room1.write().unwrap();
                *num += 1;
            }
            t_turnstile1.acquire();
            {
                let mut num = t_room2.write().unwrap();
                *num += 1;
            }
            t_mutex.acquire();
            {
                let mut num = t_room1.write().unwrap();
                *num -= 1;
                if *num == 0 {
                    t_mutex.release();
                    t_turnstile2.release();
                } else {
                    t_mutex.release();
                    t_turnstile1.release();
                }
            }
            t_turnstile2.acquire();
            {
                let mut num = t_room2.write().unwrap();
                *num -= 1;
                println!("thread {}: I am in the critical section", i);
                if *num == 0 {
                    t_turnstile1.release();
                } else {
                    t_turnstile2.release();
                }
            }
        }
    });
    handles.push(handle)
}

for handle in handles {
    handle.join().unwrap();
}

Rust remark: here I decided to put some blocks of code into their own scope to take advantage of the RAII guards and to avoid manually calling drop. I don't actually know what's more idiomatic.

4.4 Dining philosophers

The most classic of classic synchronization problems that needs no introduction. As it turns out, there's a nice implementation detailed in an earlier version of The Rust Programming Language book here, but I decided to do it the way the book structures it. I did copy how the solution uses iterators to map over the philosophers vector and spawn threads that way.

Rust implementation of the naive solution:

fn left<T>(i: T) -> T {
    i
}

fn right(i: usize, len: usize) -> usize {
    (i + 1) % len
}

fn think(p: &str) {
    println!("{} is thinking", p);
//    thread::sleep(time::Duration::from_secs(3));
}

fn eat(p: &str) {
    println!("{} is eating", p);
//    thread::sleep(time::Duration::from_secs(3));
}

fn get_forks(i: usize, forks: &Arc<Vec<Semaphore>>, name: &str) {
    println!("{} is attempting to get the right fork", name);
    forks[right(i, forks.len())].acquire();
    println!("{} is attempting to get the left fork", name);
    forks[left(i)].acquire();
}

fn put_forks(i: usize, forks: &Arc<Vec<Semaphore>>, name: &str) {
    println!("{} is releasing the right fork", name);
    forks[right(i, forks.len())].release();
    println!("{} is releasing the left fork", name);
    forks[left(i)].release();
}

fn main() {
    let forks: Vec<_> = (0..5).map(|_x| Semaphore::new(1)).collect();
    let forks = Arc::new(forks);

    let philosophers = vec![
        "Lev Landau",
        "Lars Onsager",
        "Chien-Shiung Wu",
        "Arnold Sommerfeld",
        "Abdus Salam"
    ];

    let handles: Vec<_> = philosophers.into_iter().enumerate().map(|p| {
        let t_forks = forks.clone();
        thread::spawn(move || {
            let (index, name) = p;
            loop {
               think(name);
               get_forks(index, &t_forks, name);
               eat(name);
               put_forks(index, &t_forks, name)
            }
        })
    }).collect();
    for handle in handles {
        handle.join().unwrap();
    }
}

Rust remark: When I wrote left I wanted both left and right to be generic, but I was running into issues getting the right trait bounds for right, so I just used usize, which is the type of the index given by enumerate (see the sketch after this remark for one possible generic version). Note that

let forks: Vec<_> = (0..5).map(|_x| Semaphore::new(1)).collect();

is the same as

let forks = (0..5).map(|_x| Semaphore::new(1)).collect::<Vec<_>>();

but it probably comes down to readability, as opposed to one being more idiomatic than the other. I find the first one to be more readable. I also went with the names of physicists instead of philosophers: Lev Landau won the 1962 Nobel Prize in physics for superfluidity, Lars Onsager won the 1968 Nobel Prize in chemistry, but is known by physicists for solving the 2D Ising model, Chien-Shiung Wu was an experimentalist most known for showing that the weak force does not obey parity inversion symmetry, Arnold Sommerfeld made numerous contributions especially in the use of complex analysis in solving physics problems, and Abdus Salam is of course known for unifying the weak force with the EM force into the electroweak force, known as Glashow-Weinberg-Salam theory.
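Going back to the earlier point about generics: for what it's worth, here's a hedged sketch of one way right could be made generic. The trait bounds are my guess at what's sufficient, and usize satisfies all of them:

use std::ops::{Add, Rem};

fn right<T>(i: T, len: T) -> T
where
    T: Add<Output = T> + Rem<Output = T> + From<u8>,
{
    // wrap around the table: (i + 1) % len, for any index type with these bounds
    (i + <T as From<u8>>::from(1)) % len
}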

Of course, we run into the famous deadlock when we run the program: the classic case where each philosopher picks up the fork on, say, their right side and then waits forever for the fork on their left to free up, which never happens because we have n philosophers and n forks.

This observation is the basis of the first hint: what if there were only n-1 philosophers at the table with n forks? If all philosophers try to pick up the fork on their right, then at least one philosopher will have immediate access to two forks, by the pigeonhole principle. This works in either direction, of course.

To implement this, all we really need is a multiplex. Since we're now working under the assumption that the semaphores available to us are strong, i.e. bounded, we also avoid starvation.

The Rust implementation is fairly straightforward; we need

let multiplex = Arc::new(Semaphore::new(4));

and then clone it into t_multiplex. Then our loop becomes

loop {
    think(name);
    let _g = t_multiplex.access();
    get_forks(index, &t_forks, name);
    eat(name);
    put_forks(index, &t_forks, name);
}

The book does the multiplex wait/signal inside get_forks/put_forks respectively, but it seems equivalent to just take the RAII guard right before get_forks, since it will be dropped when it goes out of scope at the end of the loop iteration, at which point the semaphore is signaled.

The next puzzle asks what would happen if Philosopher 0 were a lefty. Deadlock becomes impossible, and we can show this by enumerating the possibilities. If she tries to pick up her left fork and succeeds, she will eventually be able to pick up her right fork, because Philosopher 1 can only be holding it after having picked up the one on their right, and will therefore finish and release it. If she tries to pick up her left fork but is too slow, she will wait for it to become available. Meanwhile Philosopher 1, having successfully picked up the fork on his right, will then try to pick up the one on his left, which he will be able to do because Philosopher 0 is still waiting on her left fork.
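A hedged sketch of what the lefty variant could look like in the Rust code above; only get_forks changes, and put_forks can stay as it is since the release order doesn't matter for deadlock:

fn get_forks(i: usize, forks: &Arc<Vec<Semaphore>>, name: &str) {
    if i == 0 {
        // philosopher 0 is a lefty: left fork first, then right
        println!("{} is attempting to get the left fork", name);
        forks[left(i)].acquire();
        println!("{} is attempting to get the right fork", name);
        forks[right(i, forks.len())].acquire();
    } else {
        println!("{} is attempting to get the right fork", name);
        forks[right(i, forks.len())].acquire();
        println!("{} is attempting to get the left fork", name);
        forks[left(i)].acquire();
    }
}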
