Distributing a computation using Cloud Haskell
-- in response to https://twitter.com/jfischoff/status/948768178070470656
{-# LANGUAGE TemplateHaskell #-}
module Main where
import Control.Concurrent (threadDelay)
import Control.Monad.IO.Class (liftIO)
import Data.Traversable (for)
import Text.Printf (printf)
import Control.Distributed.Process (Process, NodeId, spawn)
import Control.Distributed.Process (SendPort, newChan, sendChan, receiveChan)
import Control.Distributed.Process.Serializable (Serializable)
import Control.Distributed.Process (RemoteTable, Closure)
import Control.Distributed.Process.Closure (mkClosure, remotable)
import Control.Distributed.Process.Node (initRemoteTable)
import System.Environment (getArgs, getProgName)
import Control.Distributed.Process.Backend.SimpleLocalnet (startSlave, initializeBackend, startMaster)
-- We will run a distributed map over a number of nodes.
-- Here is the computation we will run on each node:
slowIncr :: Int -> IO Int
slowIncr n = do
  printf "incrementing %d...\n" n
  let seconds = 3
  threadDelay $ seconds * 1000 * 1000
  pure (n+1)
-- We cannot serialize arbitrary IO computations, so the set of remotable
-- computations has to be known in advance. This is not a big limitation in
-- practice, because we can construct complex computations by combining a
-- finite number of primitive remotable computations; but let's keep things
-- simple for now.
--
-- The type of each remotable computation must be a specialization of the type
-- @Serializable a => a -> Process ()@, for a _concrete_ type @a@. Polymorphism
-- is supported, but messier.
--
-- Since we want our computations to produce a value but we are forced to
-- return unit, let's ask for a SendPort to which we can send that value. Yes,
-- SendPort is Serializable.
remotableSlowIncr :: (Int, SendPort Int) -> Process ()
remotableSlowIncr (n, sendPort) = do
  r <- liftIO $ slowIncr n
  sendChan sendPort r
-- The idea is that the remote nodes are running the same code as we are, so if
-- we ask them to run the top-level definition called "remotableSlowIncr",
-- we'll both know which computation we mean. To do that, both sides need to
-- have a table mapping top-level names to the corresponding computation.
remotable ['remotableSlowIncr]
-- The splice above generated __remoteTable, a @RemoteTable -> RemoteTable@
-- transformer, which we apply to the built-in initRemoteTable to obtain our
-- remote table. Representing each module's table as a transformer is supposed
-- to make it easy to combine the remote tables of different modules, but I
-- wonder why they didn't simply write a Monoid instance for RemoteTable?
remoteTable :: RemoteTable
remoteTable = __remoteTable initRemoteTable
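-- As a sketch of that composition (the module name "Worker" below is made up
-- for illustration; any module which calls 'remotable' gets its own
-- __remoteTable), combining the tables of two modules is plain function
-- composition:
--
-- > remoteTable :: RemoteTable
-- > remoteTable = Main.__remoteTable
-- >             . Worker.__remoteTable
-- >             $ initRemoteTable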
-- Now that we have a remote table, we can use it to construct a Closure, which
-- describes both which remotable computation we want to run and which
-- parameters we want to give it.
mkSlowIncrClosure :: (Int, SendPort Int) -> Closure (Process ())
mkSlowIncrClosure = $(mkClosure 'remotableSlowIncr)
-- We can finally write a higher-order function which distributes a computation
-- over a number of nodes.
distributedTraverse :: (Serializable a, Serializable b)
                    => [NodeId]
                    -> ((a, SendPort b) -> Closure (Process ()))
                    -> [a]
                    -> Process [b]
distributedTraverse [] _ _ = error "expected at least one node"
distributedTraverse nodes cAction xs = do
  receivePorts <- for (zip (cycle nodes) xs) $ \(node, x) -> do
    (sendPort, receivePort) <- newChan
    _ <- spawn node $ cAction (x, sendPort)
    pure receivePort
  traverse receiveChan receivePorts
-- One of the key ideas of Cloud Haskell (aka distributed-process) is that
-- instead of viewing your distributed application as a collection of
-- collaborating programs, you can view it as a single concurrent program in
-- which threads can be scheduled to run on other nodes instead of other CPUs.
-- This is a paradigm called "distributed concurrency".
distributedMain :: [NodeId] -> Process ()
distributedMain nodes = do
  liftIO $ printf "distributing 10 computations...\n"
  ys <- distributedTraverse nodes mkSlowIncrClosure [0..9]
  liftIO $ printf "result: %s\n" (show ys)
-- Under the hood, of course, distributedMain is implemented by a collection of
-- collaborating programs.
main :: IO ()
main = do
  args <- getArgs
  case args of
    ["master", host, port] -> do
      backend <- initializeBackend host port remoteTable
      startMaster backend $ \slaveNodes -> do
        if null slaveNodes
          then liftIO $ printf "please start the slave nodes first\n"
          else distributedMain slaveNodes
    ["slave", host, port] -> do
      backend <- initializeBackend host port remoteTable
      printf "waiting for the master\n"
      startSlave backend
    _ -> do
      progName <- getProgName
      printf "usage:\n"
      printf " %s slave localhost 1234\n" progName
      printf " %s slave localhost 1235\n" progName
      printf " %s slave localhost 1236\n" progName
      printf " %s master localhost 1237\n" progName
      printf "\n"
      printf "To run this on different machines, replace localhost with the ip\n"
      printf "of the machine on which that node runs, so the other nodes can\n"
      printf "find it. The nodes find each other using broadcast packets, which\n"
      printf "are typically blocked by commercial routers, so this example should\n"
      printf "work on your home's local network, but not across the internet.\n"
gelisam commented Jan 5, 2018

If SimpleLocalnet isn't working on your network (the master complains that you need to start the slaves first even though you did), try this reimplementation of the SimpleLocalnet API which uses a hardcoded list of nodes instead of broadcast packets.

gelisam commented Jan 5, 2018

One important aspect I did not cover is error-handling. When running distributed computations, there is always the possibility that some of the remote nodes will crash or that we will lose communication with them. Cloud Haskell supports sophisticated ways of dealing with those error conditions, but for simplicity I did not cover any of them.
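
A minimal sketch of the simplest of those mechanisms: replace receiveChan with receiveChanTimeout (also exported by Control.Distributed.Process, so it would need to be added to the imports), so that a crashed or unreachable slave can no longer block the master forever. The ten-second timeout is an arbitrary value chosen for illustration; results that do not arrive in time come back as Nothing.

    distributedTraverseTimeout
      :: (Serializable a, Serializable b)
      => [NodeId]
      -> ((a, SendPort b) -> Closure (Process ()))
      -> [a]
      -> Process [Maybe b]
    distributedTraverseTimeout [] _ _ = error "expected at least one node"
    distributedTraverseTimeout nodes cAction xs = do
      receivePorts <- for (zip (cycle nodes) xs) $ \(node, x) -> do
        (sendPort, receivePort) <- newChan
        _ <- spawn node $ cAction (x, sendPort)
        pure receivePort
      -- wait at most ten seconds (threadDelay-style microseconds) per result
      let tenSeconds = 10 * 1000 * 1000
      traverse (receiveChanTimeout tenSeconds) receivePorts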
