@CMCDragonkai
Last active January 6, 2019 22:55
Haskell: Channels - an explanation from Tackling the Awkward Squad http://research.microsoft.com/en-us/um/people/simonpj/papers/marktoberdorf/mark.pdf
import Control.Concurrent.MVar

type Channel a = (ReadPort a, WritePort a)
type ReadPort a = MVar (Stream a)
type WritePort a = MVar (Stream a)
type Stream a = MVar (Item a)
data Item a = MkItem a (Stream a)

-- Create a channel whose Read and Write ports both point at the same empty box.
newChan :: IO (Channel a)
newChan = do
  readPort <- newEmptyMVar
  writePort <- newEmptyMVar
  stream <- newEmptyMVar
  putMVar readPort stream
  putMVar writePort stream
  return (readPort, writePort)

-- Fill the current empty box with the item and point the Write port at a new empty box.
putChan :: Channel a -> a -> IO ()
putChan (_, writePort) item = do
  streamA <- takeMVar writePort
  streamB <- newEmptyMVar
  putMVar writePort streamB
  putMVar streamA (MkItem item streamB)

-- Read the item in the next box (blocking while it is empty) and advance the Read port.
getChan :: Channel a -> IO a
getChan (readPort, _) = do
  streamA <- takeMVar readPort
  MkItem item streamB <- readMVar streamA -- pattern matching!
  putMVar readPort streamB
  return item

-- Share the Write port, but give the duplicate its own Read port.
dupChan :: Channel a -> IO (Channel a)
dupChan (_, writePort) = do
  newReadPort <- newEmptyMVar
  stream <- readMVar writePort
  putMVar newReadPort stream
  return (newReadPort, writePort)

-- takeMVar extracts the value and leaves the MVar empty (blocks while the MVar is empty)
-- putMVar puts a value into an empty MVar (blocks while the MVar is full)
-- readMVar reads the value and leaves the contents of the MVar intact

Channels allow you to implement communicating sequential processes (CSP). This example shows how an unbounded buffered channel works. Because the buffer is unbounded, a write never waits for a reader, so communication over the channel is asynchronous. In production you should be using Control.Concurrent.Chan: https://hackage.haskell.org/package/base-4.7.0.2/docs/Control-Concurrent-Chan.html
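
As a point of comparison, here is a minimal sketch of the same producer/consumer pattern using `Control.Concurrent.Chan` from base. The function name `roundTrip` is made up for this illustration; `newChan`, `writeChan` and `readChan` are the library's own.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Monad (forM_, replicateM)

-- Send the numbers 1..n through a Chan from a forked producer thread
-- and collect them on the calling thread. (Hypothetical helper.)
roundTrip :: Int -> IO [Int]
roundTrip n = do
  chan <- newChan
  -- writeChan does not wait for a reader: the channel is unbounded.
  _ <- forkIO $ forM_ [1 .. n] (writeChan chan)
  -- readChan blocks until an item is available, so no extra
  -- synchronisation is needed to wait for the producer.
  replicateM n (readChan chan)

main :: IO ()
main = roundTrip 5 >>= print
```

With a single producer the items come back in the order they were written, so `main` prints `[1,2,3,4,5]`.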

A channel is a FIFO pipe with 2 ports on it: a Read port and a Write port. Each of these 2 ports is implemented as an MVar pointer.

We then create another MVar pointer called Stream, which points to a box sitting in memory. This pointer is stored in both the Read and Write ports, so at the beginning both ports hold an alias/pointer to the same box.

Our type specifications indicate that this Stream box can either be empty or contain a pair of an item and another Stream MVar pointer. This data structure resembles a linked list built from recursively nested MVar pointers/boxes, where the empty box marks the end of the list.
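
To make the linked-list shape concrete, here is a small sketch that builds such a stream by hand and walks it again. The helpers `streamFromList` and `streamToList` are hypothetical names for this illustration and are not part of the gist; `tryReadMVar` needs base 4.7 or later.

```haskell
import Control.Concurrent.MVar

type Stream a = MVar (Item a)
data Item a = MkItem a (Stream a)

-- Build a stream holding the given items. The last box is an empty MVar,
-- which plays the role of the list terminator. (Hypothetical helper.)
streamFromList :: [a] -> IO (Stream a)
streamFromList []       = newEmptyMVar
streamFromList (x : xs) = do
  rest <- streamFromList xs
  newMVar (MkItem x rest)

-- Walk the stream without blocking, stopping at the first empty box.
-- (Hypothetical helper.)
streamToList :: Stream a -> IO [a]
streamToList stream = do
  contents <- tryReadMVar stream
  case contents of
    Nothing              -> return []
    Just (MkItem x rest) -> fmap (x :) (streamToList rest)

main :: IO ()
main = streamFromList "abc" >>= streamToList >>= putStrLn
```

Running `main` prints `abc`. In the channel itself the Write port always points at the empty box at the end of this chain, and the Read port at the first box that has not been read yet.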

A channel can be read from and written to by any thread, including the same one. In that sense a channel looks kind of like a pipe bent into a loop: a thread can write to the pipe and then read from the same pipe, getting back the value it put into the channel (as long as no other thread reads it first).

When a thread writes to the channel's Write port, it takes the Stream MVar pointer A out and replaces it with a new, empty MVar pointer B. It then fills pointer A's box with a pair of the item to be written and the pointer B. This means the Write port now holds pointer B, whose box is nested inside pointer A's box. Because the Write port always references the most deeply nested box, any subsequent write can enqueue its item in constant time.

When a thread reads from the channel's Read port, it takes the Stream MVar pointer A out and reads the contents of its box: a pair of the item and pointer B. If the box is empty, the read blocks until a writer fills it. This is ensured by the MVar implementation. The thread then puts the extracted pointer B back into the Read port and returns the item. Because the Read port always holds a reference to the next "to-be-opened" box, subsequent reads also extract their item in constant time. For the purposes of multicast (pub/sub), reading the contents of the Stream MVar pointer must not empty the box, which is why getChan uses readMVar rather than takeMVar. This allows duplicate channels to read the same items independently. Once an MVar pointer and its box are no longer referenced by any port or other box, they can be garbage collected.
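
The write and read steps above can be exercised with a short sketch in which a reader thread starts first and blocks on the empty box until the writer fills it. It repeats the gist's channel definitions (with `newMVar` as shorthand in `newChan`) so that it stands alone; the `demo` function and the 10 ms delay are assumptions made for this illustration, and the sketch makes no attempt to be safe under asynchronous exceptions.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar

type Channel a = (ReadPort a, WritePort a)
type ReadPort a = MVar (Stream a)
type WritePort a = MVar (Stream a)
type Stream a = MVar (Item a)
data Item a = MkItem a (Stream a)

newChan :: IO (Channel a)
newChan = do
  stream <- newEmptyMVar
  readPort <- newMVar stream   -- both ports start at the same empty box
  writePort <- newMVar stream
  return (readPort, writePort)

putChan :: Channel a -> a -> IO ()
putChan (_, writePort) item = do
  streamA <- takeMVar writePort
  streamB <- newEmptyMVar
  putMVar writePort streamB              -- Write port now points at the new empty box
  putMVar streamA (MkItem item streamB)  -- fill the old box, waking blocked readers

getChan :: Channel a -> IO a
getChan (readPort, _) = do
  streamA <- takeMVar readPort
  MkItem item streamB <- readMVar streamA  -- blocks while the box is empty
  putMVar readPort streamB                 -- advance to the next box
  return item

-- Hypothetical demo: the reader is forked before anything is written.
demo :: IO [Int]
demo = do
  chan <- newChan
  result <- newEmptyMVar
  _ <- forkIO $ do
    a <- getChan chan
    b <- getChan chan
    putMVar result [a, b]
  threadDelay 10000  -- give the reader time to block; not needed for correctness
  putChan chan 1
  putChan chan 2
  takeMVar result

main :: IO ()
main = demo >>= print
```

`main` prints `[1,2]`: the reader receives the items in the order they were written, whether or not it had already blocked when the first write happened.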

                                         FIFO Queue with reversible read                                        
                              Implemented using recursively nested pointers (MVars)                        
                                                                                                           
                                       +------+------------------+-+                                       
                                       |      +------------------+ |                                       
                                       |      |      +---------+ | |                                       
                                       |      |      |         | | |                                       
                       Dequeue <-----+ | A    | B    | C    +--+ | | <-----+ Enqueue                       
                                       |      |      |         | | |                                       
                                       |      |      +---------+ | |                                       
                                       |      +------------------+ |                                       
                                       +------+------------------+-+                                       
                                                     |                                                     
                                                     |                                                     
                                                     |                                                     
                                             +-------+-------+                                             
                                          +  |               |  ^                                          
Pointer to the most opened pointer (MVar) |  |               |  | Pointer to the most closed pointer (MVar)
                                          v  |               |  +                                          
                                             |               |                                             
                                  +-------------------------------------+                                  
                                  |          |               |          |                                  
                                  |          |               |          |                                  
                                  |      +---v---+       +---v---+      |                                  
                                  |      |       |       |       |      |                                  
                                  |      | Read  |       | Write |      |                                  
                                  |      |       |       |       |      |                                  
                                  |      +---+---+       +---^---+      |                                  
                                  |          |               |          |                                  
                                  |          |    Channel    |          |                                  
                                  +-------------------------------------+                                  
                                             |               |                                             
                                             |               |                                             
                          Thread B           |               |           Thread A                          
                                             |               |                                             
                         +---------+         |               |         +---------+                         
                         |         |         |               |         |         |                         
         Sequence  1     |         |         |               +---------+ Write A |                         
                         |         |         |               |         |         |                         
                   2     | Read A  <---------+               |         |         |                         
                         |         |         |               |         |         |                         
                   3     |         |         |               +---------+ Write B |                         
                         |         |         |               |         |         |                         
                   4     |         |         |               +---------+ Write C |                         
                         |         |         |                         |         |                         
                   5     | Read B  <---------+                         |         |                         
                         |         |         |                         |         |                         
                   6     |         |         +-------------------------> Read C  |                         
                         |         |                                   |         |                         
                         +---------+                                   +---------+                         

Duplicate channels can easily be made in order to set up a multicast/pub-sub topology. All that needs to be done is to create another channel with its own pair of Read and Write ports. The Write port is the exact same port as the Write port on the original channel, so writes made through any duplicate channel go to the same stream. The Read port, however, is completely independent, which allows independent reads from different threads. Imagine 2 threads with 2 channels: when thread A reads once, its Read port changes to hold the next nested MVar pointer, and this change needs to be independent of thread B's channel. If the Read port were shared, thread B's next read would start from the second pointer and would never be able to access the first pointer's item. With independent Read ports, reads coming from thread A are completely separated from reads happening in thread B. You must also make sure that a read from a box does not empty it; otherwise a read through one channel would destructively remove the contents of the box it read, preventing other channels from reading the same item.
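
The multicast behaviour can be seen with `dupChan` from `Control.Concurrent.Chan`, which follows the same scheme of a shared write end and an independent read end. This is a minimal single-threaded sketch; the function name `broadcast` is made up for this illustration.

```haskell
import Control.Concurrent.Chan (dupChan, newChan, readChan, writeChan)
import Control.Monad (forM_, replicateM)

-- Write three items to a channel that has one duplicate, then read all
-- three items back from each channel. (Hypothetical helper.)
broadcast :: IO ([Int], [Int])
broadcast = do
  original <- newChan
  copy <- dupChan original  -- shares the write end, has its own read end
  forM_ [1, 2, 3] (writeChan original)
  -- Reading from one channel does not consume the items seen by the other.
  fromOriginal <- replicateM 3 (readChan original)
  fromCopy <- replicateM 3 (readChan copy)
  return (fromOriginal, fromCopy)

main :: IO ()
main = broadcast >>= print
```

`main` prints `([1,2,3],[1,2,3])`. Note that a duplicate starts out empty: its Read port begins at the current write position, so it only sees items written after the call to `dupChan`.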
