Skip to content

Instantly share code, notes, and snippets.

@joubertnel
Created May 31, 2015 16:37
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save joubertnel/e8a60da2d74372181d2c to your computer and use it in GitHub Desktop.
Save joubertnel/e8a60da2d74372181d2c to your computer and use it in GitHub Desktop.
CSP channels implementation with semaphores
CMSC 421 Operating Systems Lecture Notes
(c) 1994 Howard E. Motteler
Message Passing
================
The goal of critical sections, monitors, etc. is to allow
processes to _communicate_
Two general approaches to communication
- shared memory (with sem's or monitors)
- messages
Distributed systems must use message passing
Typical message passing operations are
- send(destination, message)
- receive(source)
Messages can be
- sent to specific destinations
- sent on `channels'
- sent one at a time (as in CSP's rendezvous protocol)
- queued at source, dest, or in-between (e.g., email)
- point to point (one sender and one receiver)
- broadcast (sent to multiple destinations)
- acknowledged or unacknowledged
Destinations can be
- internet addresses
- process id's
- any process doing a receive()
Channels can be
- static (like programming language variables)
- dynamic (ie., they need an `open' operation)
- 1-way or 2-way
Example: Bounded Buffer with message passing
---------------------------------------------
Assumptions:
- Message are buffered
- the message buffer acts as the queue
- buffer size is N
Producer:
while (alive) {
generate an item V;
receive(consumer); // get `empty' from the consumer
send(consumer, V); // send data to the consumer
}
Consumer:
for (i=0; i<N; i++)
send(producer, dummy); // send N `empties' to producer
while (alive) {
V = receive(producer); // get value from producer
send(producer, dummy); // send an `empty' back
use the item V;
}
Q: Why not have
- Producer: send(consumer, V);
- Consumer: V = receive(producer);
A: This should give *unbounded* buffer
(unbounded except by system limitations)
Example: CSP
-------------
CSP (`communicating sequential processes') is a simple
message passing language
CSP processes communicate through _channels_
CSP channels are
- statically allocated (declared like variables)
- point to point
- one way
CSP processes do not share memory space, and can run on either
one or several processors
Suppose C is a channel variable shared by processes P1 and P2
- C ! x means send x on channel C
- C ? y means receive a value on channel C and save in y
P1 P2
: :
C ! x C ? y
: :
If P1 does C ! x first
- P1 waits for C ? y
- both P1 and P2 continue
- y contains the value from x
If P2 does C ? y first
- P2 waits for C ! x,
- both P1 and P2 continue
- y contains the value from x
CSP communication is a _rendezvous_, and no queues are needed
(In practice, this works best over relatively short distances)
CSP has a special construct ``ALT'' that makes it possible to
wait for input from more than one channel at a time
ALT
C1 ? x # wait for something on C1
C3 ! x # do this if we get something on C1
C2 ? x # wait for something on C2
C3 ! x # do this if we get something on C2
This example acts as a `merge', taking input from either C1 or
C2 at any time, and sending the value received out on C3
CSP channels can be implemented with sem's and shared memory
Each channel needs two semaphores S1 and S2, both initially 0,
and a shared memory location, L
C ! x C ? y
L = x; wait(S1)
signal(S1) y = L;
wait(S2) signal(S2)
Example: bounded buffer in CSP
-------------------------------
Producer ----> Buffer ----> Consumer
process channel process channel process
`pchan' `cchan'
Producer:
WHILE true DO BEGIN
generate x
pchan ! x // send x to the buffer
END
Consumer:
WHILE true DO BEGIN
cchan ? y // receive y from the buffer
use y
END
Buffer:
INTEGER buf[N], in=0, out=0;
WHILE true DO
ALT
// test buffer not full and sender ready:
(in < out + N) & pchan ? buf[in mod N]
in := in + 1
// test buffer not empty and receiver ready:
(out < in) & cchan ! buf[out mod N]
out := out + 1
Unfortunately, the buffer here is busy-waiting; if it is not
running on a seperate processor, a sleep could be added to the
while-loop
The actual implementations of CSP (such as the `Occam' language)
may not provide the `guarded send' that we used here
Equivalence of IPC Primitives
==============================
Some IPC primitives can be used to simulate others
- CSP channels with with sem's and shared memory
- Monitors with semaphores (example in text)
- Message passing with sem's and shared memory
- Semaphores with messages
Messages with Semaphores and Shared Memory
-------------------------------------------
Most forms of message passing can be simulated with semaphores and
shared memory
Message Passing implemented with semaphores is similar to the
bounded buffer example:
P1 --> ||||||| --> P1
producer buffer consumer
or sender or receiver
The buffer in a message passing system may be
- part of the sender
- part of a separate process
- part of the receiver
The details of the semaphore/shared memory simulation depend on
what sort of message passing system we want
Semaphores with messages
-------------------------
Semaphores can be implemented in a message passing system, by using
a synchronization process.
Note that since semaphores are usually used with shared memory, this
example would not bee not too useful for a true distributed system.
Further, in any shared memory system, there are probably easier ways
to implement semaphores.
Assumptions:
- messages are queued
- message are sent to process id's
(example for a single semaphore)
signal() {
msg[] = [SIG, pid];
send(synch, msg);
}
wait() {
msg[] = [WAIT, pid];
send(synch, msg);
receive();
}
synch process:
while (alive) {
msg = receive();
fnx = msg[1];
pid = msg[2];
if (fnx == SIG)
s = s + 1;
if (s <= 0) {
get p from list L;
send(p, dummy);
}
}
else if (fnx == WAIT) {
s = s - 1;
if (s < 0)
add pid to L;
else
send(pid);
}
}
Q: do the synch process or signal and wait procedures
need mutual exclusion?
A: No, since the synch process only receives and processes one
message at a time.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment