@tjweir
Forked from gelisam/Main.hs
Created January 5, 2018 15:11
Distributing a computation using Cloud Haskell
-- in response to https://twitter.com/jfischoff/status/948768178070470656
{-# LANGUAGE TemplateHaskell #-}
module Main where
import Control.Concurrent (threadDelay)
import Control.Monad.IO.Class (liftIO)
import Data.Traversable (for)
import Text.Printf (printf)
import Control.Distributed.Process (Process, NodeId, spawn)
import Control.Distributed.Process (SendPort, newChan, sendChan, receiveChan)
import Control.Distributed.Process.Serializable (Serializable)
import Control.Distributed.Process (RemoteTable, Closure)
import Control.Distributed.Process.Closure (mkClosure, remotable)
import Control.Distributed.Process.Node (initRemoteTable)
import System.Environment (getArgs, getProgName)
import Control.Distributed.Process.Backend.SimpleLocalnet (startSlave, initializeBackend, startMaster)
-- We will run a distributed map over a number of nodes.
-- Here is the computation we will run on each node:
slowIncr :: Int -> IO Int
slowIncr n = do
  printf "incrementing %d...\n" n
  let seconds = 3
  threadDelay $ seconds * 1000 * 1000
  pure (n+1)

-- We cannot serialize arbitrary IO computations, so the set of remotable
-- computations has to be known in advance. This is not a big limitation in
-- practice, because we can construct complex computations by combining a
-- finite number of primitive remotable computations; but let's keep things
-- simple for now.
--
-- The type of each remotable computation must be a specialization of the type
-- @Serializable a => a -> Process ()@, for a _concrete_ type @a@. Polymorphism
-- is supported, but messier.
--
-- Since we want our computations to produce a value but we are forced to
-- return unit, let's ask for a SendPort to which we can send that value. Yes,
-- SendPort is Serializable.
remotableSlowIncr :: (Int, SendPort Int) -> Process ()
remotableSlowIncr (n, sendPort) = do
  r <- liftIO $ slowIncr n
  sendChan sendPort r

-- The idea is that the remote nodes are running the same code as we are, so if
-- we ask them to run the top-level definition called "remotableSlowIncr",
-- we'll both know which computation we mean. To do that, both sides need to
-- have a table mapping top-level names to the corresponding computation.
remotable ['remotableSlowIncr]
-- The above generated __remoteTable, which is a @RemoteTable -> RemoteTable@,
-- so we need to apply it to the empty table in order to get our remote table.
-- This is supposed to make it easy to combine remote tables from different
-- modules, but I wonder why they didn't simply write a Monoid instance for
-- RemoteTable?
remoteTable :: RemoteTable
remoteTable = __remoteTable initRemoteTable
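
-- A sketch of what combining tables from several modules might look like,
-- assuming two hypothetical modules Workers.Incr and Workers.Report (these
-- names are illustrative and not part of this gist) which each call
-- 'remotable' and export the generated __remoteTable:
--
-- > import qualified Workers.Incr
-- > import qualified Workers.Report
-- >
-- > combinedRemoteTable :: RemoteTable
-- > combinedRemoteTable = Workers.Incr.__remoteTable
-- >                     . Workers.Report.__remoteTable
-- >                     $ initRemoteTable
--
-- Since each generated value is a @RemoteTable -> RemoteTable@, ordinary
-- function composition is enough to merge them.
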
-- Now that 'remotableSlowIncr' is registered in the remote table, we can
-- construct a Closure for it, which describes both which remotable computation
-- we want to run and which parameters we want to give it.
mkSlowIncrClosure :: (Int, SendPort Int) -> Closure (Process ())
mkSlowIncrClosure = $(mkClosure 'remotableSlowIncr)
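
-- As a minimal sketch of how such a Closure is used, here is the single-node
-- case: create a channel, spawn the closure on the given node, and wait for
-- the answer. 'runOneIncr' is an illustrative name, not part of the original
-- gist; it only uses functions already imported above.
runOneIncr :: NodeId -> Int -> Process Int
runOneIncr node n = do
  -- the receive end stays here; the send end travels inside the closure
  (sendPort, receivePort) <- newChan
  _ <- spawn node $ mkSlowIncrClosure (n, sendPort)
  -- blocks until the remote process sends its result
  receiveChan receivePort
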
-- We can finally write a higher-order function which distributes a computation
-- over a number of nodes.
distributedTraverse :: (Serializable a, Serializable b)
                    => [NodeId]
                    -> ((a, SendPort b) -> Closure (Process ()))
                    -> [a]
                    -> Process [b]
distributedTraverse [] _ _ = error "expected at least one node"
distributedTraverse nodes cAction xs = do
  receivePorts <- for (zip (cycle nodes) xs) $ \(node, x) -> do
    (sendPort, receivePort) <- newChan
    _ <- spawn node $ cAction (x, sendPort)
    pure receivePort
  traverse receiveChan receivePorts

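
-- The @zip (cycle nodes) xs@ in 'distributedTraverse' hands out the inputs
-- round-robin. A small pure sketch of that assignment, isolated from the
-- networking ('roundRobin' is an illustrative helper, not part of the
-- original gist):
roundRobin :: [node] -> [a] -> [(node, a)]
roundRobin nodes xs = zip (cycle nodes) xs
-- For example, with three nodes and seven inputs:
--
-- > roundRobin "abc" [0..6 :: Int]
-- >   == [('a',0),('b',1),('c',2),('a',3),('b',4),('c',5),('a',6)]
--
-- Note that @cycle []@ raises an error, which is why 'distributedTraverse'
-- rejects an empty node list before reaching this step.
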
-- One of the key ideas of Cloud Haskell (aka distributed-process) is that
-- instead of viewing your distributed application as a collection of
-- collaborating programs, you can view it as a single concurrent program in
-- which threads can be scheduled to run on other nodes instead of other CPUs.
-- This is a paradigm called "distributed concurrency".
distributedMain :: [NodeId] -> Process ()
distributedMain nodes = do
  liftIO $ printf "distributing 10 computations...\n"
  ys <- distributedTraverse nodes mkSlowIncrClosure [0..9]
  liftIO $ printf "result: %s\n" (show ys)

-- Under the hood, of course, distributedMain is implemented by a collection of
-- collaborating programs.
main :: IO ()
main = do
  args <- getArgs
  case args of
    ["master", host, port] -> do
      backend <- initializeBackend host port remoteTable
      startMaster backend $ \slaveNodes -> do
        if null slaveNodes
          then liftIO $ printf "please start the slave nodes first\n"
          else distributedMain slaveNodes
    ["slave", host, port] -> do
      backend <- initializeBackend host port remoteTable
      printf "waiting for the master\n"
      startSlave backend
    _ -> do
      progName <- getProgName
      printf "usage:\n"
      printf " %s slave localhost 1234\n" progName
      printf " %s slave localhost 1235\n" progName
      printf " %s slave localhost 1236\n" progName
      printf " %s master localhost 1237\n" progName
      printf "\n"
      printf "To run this on different machines, replace localhost with the ip\n"
      printf "of the machine on which that node runs, so the other nodes can\n"
      printf "find it. The nodes find each other using broadcast packets, which\n"
      printf "are typically blocked by commercial routers, so this example should\n"
      printf "work on your home's local network, but not across the internet.\n"