Last active
December 30, 2015 20:09
-
-
Save roman/7878922 to your computer and use it in GitHub Desktop.
Possible memory leak in Maestro.Distributed.Process.Platform.Supervisor v2
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{-# LANGUAGE ScopedTypeVariables #-} | |
module CloudUtil | |
( module Control.Distributed.Process | |
, cloud | |
, cloudWithRemotable | |
, cloudWithRemotableAsync) | |
where | |
import Control.Exception (try, SomeException, bracket) | |
import Control.Concurrent (threadDelay) | |
import Control.Monad (void, forM_) | |
import qualified Control.Concurrent.MVar as MVar | |
import Control.Distributed.Process hiding (try, bracket) | |
import qualified Control.Distributed.Process.Node as Cloud | |
import qualified Network.Transport as Trans (Transport, closeTransport) | |
import qualified Network.Transport.Chan as Trans | |
-- import qualified Network.Transport.TCP as Trans | |
-- import qualified Maestro.Client.SupervisorTest.Helpers as Helpers | |
-- | Create a transport and a local node running on top of it.
--
-- The node's remote table is obtained by applying the given transformer to
-- 'Cloud.initRemoteTable'. Both the node and the transport are returned so
-- that the caller can close each of them when it is done.
mkLocalNode :: (RemoteTable -> RemoteTable) -> IO (Cloud.LocalNode, Trans.Transport)
mkLocalNode remoteTable =
  -- Alternative kept from the original for switching to a TCP transport:
  -- Right trans <- Trans.createTransport "localhost" "557766" Trans.defaultTCPParameters
  Trans.createTransport >>= \transport -> do
    let table = remoteTable Cloud.initRemoteTable
    node <- Cloud.newLocalNode transport table
    return (node, transport)
-- | Run a 'Process' on a freshly created local node and return its result.
--
-- The node and transport come from 'mkLocalNode', using the given
-- remote-table transformer. The process result is handed back to the calling
-- thread through an 'MVar.MVar'.
--
-- The whole run is wrapped in 'bracket', so the node and the transport are
-- closed even when an exception escapes while the process is being run or
-- while the result is being awaited. Exceptions raised by the shutdown calls
-- themselves are deliberately ignored (best-effort cleanup).
cloudWithRemotable
  :: (RemoteTable -> RemoteTable) -> Process b -> IO b
cloudWithRemotable remoteTable cproc =
  bracket (mkLocalNode remoteTable) release run
  where
    -- Run the process, then collect the value it stored in the result slot.
    run (node, _) = do
      resultVar <- MVar.newEmptyMVar
      Cloud.runProcess node (cproc >>= liftIO . MVar.putMVar resultVar)
      -- Short pause (100 ms) before the result is collected, as before.
      threadDelay 100000
      MVar.takeMVar resultVar

    -- Close the node first, then the transport; failures are swallowed so
    -- that a failing shutdown never masks the outcome of the run.
    release (node, trans) = do
      (_ :: Either SomeException ()) <- try $ Cloud.closeLocalNode node
      (_ :: Either SomeException ()) <- try $ Trans.closeTransport trans
      return ()
-- | Run a 'Process' on a freshly created local node and keep that node alive
-- for a while after the process body has returned.
--
-- After @cproc@ finishes, the same process prints @"Waiting..."@ and sleeps
-- for one second, fifteen times in a row, before the node is torn down
-- (presumably to give work started by @cproc@ time to finish -- confirm with
-- the author). The node and the transport are always closed through
-- 'bracket'; exceptions raised by the shutdown calls are deliberately
-- ignored.
cloudWithRemotableAsync :: (RemoteTable -> RemoteTable) -> Process () -> IO ()
cloudWithRemotableAsync remoteTable cproc =
  bracket (mkLocalNode remoteTable) release run
  where
    run (node, _) = Cloud.runProcess node $ do
      cproc
      forM_ [1 .. waitRounds] $ \_ -> liftIO $ do
        putStrLn "Waiting..."
        threadDelay 1000000

    -- Number of one-second waiting rounds after the process body returns.
    waitRounds :: Int
    waitRounds = 15

    -- Close the node first, then the transport, ignoring shutdown failures.
    release (node, trans) = do
      (_ :: Either SomeException ()) <- try $ Cloud.closeLocalNode node
      (_ :: Either SomeException ()) <- try $ Trans.closeTransport trans
      return ()
-- | Run a 'Process' on a temporary local node with an unmodified remote
-- table and return its result; shorthand for 'cloudWithRemotable' applied
-- to 'id'.
cloud :: Process b -> IO b
cloud cproc = cloudWithRemotable id cproc
-- | Variant of 'cloudWithRemotableAsync' that leaves the remote table
-- unmodified (the transformer is 'id').
--
-- NOTE(review): this name does not appear in the module's export list, so it
-- is only usable inside this module -- confirm whether it should be exported.
cloudAsync :: Process () -> IO ()
cloudAsync cproc = cloudWithRemotableAsync id cproc
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{-# LANGUAGE TemplateHaskell #-} | |
module SupervisorTest.Helpers where | |
import Control.Monad.Trans (MonadIO(..)) | |
import qualified Control.Distributed.Process as Cloud | |
import qualified Control.Distributed.Process.Closure as Cloud | |
-- | Child process body used by the supervisor tests: prints the number it
-- was given and returns.
testChild :: Int -> Cloud.Process ()
testChild = liftIO . print

-- Template Haskell registration of 'testChild' with 'Cloud.remotable'. The
-- SupervisorTest module relies on it when it builds a closure with
-- @mkClosure 'testChild@ and passes @__remoteTable@ to the node setup.
$(Cloud.remotable ['testChild])
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{-# LANGUAGE BangPatterns #-} | |
{-# LANGUAGE TemplateHaskell #-} | |
module SupervisorTest where | |
import Control.Monad (forever, when, void) | |
import Control.Monad.Trans (MonadIO(..), lift) | |
import Control.Concurrent (forkIO, killThread, threadDelay) | |
import System.Mem | |
import Maestro.Client.CloudUtil (cloud, cloudWithRemotableAsync) | |
import qualified Control.Distributed.Process as Cloud | |
import qualified Control.Distributed.Process.Closure as Cloud | |
import qualified Control.Distributed.Process.Platform.Time as CT | |
import qualified Control.Distributed.Process.Platform.Supervisor as SP | |
import qualified Control.Distributed.Process.Platform.ManagedProcess as MP | |
import Maestro.Client.SupervisorTest.Helpers | |
-- | Entry point: runs the closure-based supervisor scenario with 3000
-- start/terminate rounds.
main :: IO ()
main = viaClosure 3000
-- | Repeatedly add a closure-started child to a fresh supervisor and remove
-- it again.
--
-- A supervisor with no initial children is started, then a counter runs down
-- from @nTimes * 2@:
--
-- * on even steps a child running the 'testChild' closure (applied to the
--   current counter) is added under the key @show n@;
-- * on odd steps the child added in the preceding step (key @show (n + 1)@)
--   is terminated and deleted, and both results are printed.
--
-- A non-positive @nTimes@ performs no rounds. The loop previously stopped
-- only when the counter was exactly @0@, so a negative @nTimes@ counted away
-- from zero and never terminated.
viaClosure :: Int -> IO ()
viaClosure nTimes = do
  -- Optional GC pressure while hunting the leak:
  -- tid <- forkIO $ forever $ threadDelay 100 >> performGC
  cloudWithRemotableAsync __remoteTable $ do
    sup <- SP.start (SP.RestartOne SP.defaultLimits) []
    loop sup $ nTimes * 2
  -- killThread tid
  where
    -- Worker spec for a transient child that is terminated immediately on
    -- stop and is not registered under a name.
    toSpec !key !childStart =
      SP.ChildSpec
        { SP.childKey = key
        , SP.childType = SP.Worker
        , SP.childRestart = SP.Transient
        , SP.childStop = SP.TerminateImmediately
        , SP.childStart = childStart
        , SP.childRegName = Nothing
        }

    -- Stop at zero, and also for any negative counter (see above).
    loop _ n | n <= 0 = return ()
    loop !sup !n
      | even n = do
          childStart <- SP.toChildStart ($(Cloud.mkClosure 'testChild) n)
          let key = show n
              childSpec = toSpec key childStart
          -- The pattern doubles as an assertion that the child is running.
          SP.ChildAdded (SP.ChildRunning _) <- SP.startChild sup childSpec
          loop sup $ n - 1
      | otherwise = do
          -- Key of the child that was added in the preceding (even) step.
          let key = show $ n + 1
          -- SP.statistics sup >>= liftIO . print
          SP.terminateChild sup key >>= liftIO . print
          SP.deleteChild sup key >>= liftIO . print
          -- SP.statistics sup >>= liftIO . print
          -- liftIO $ threadDelay 1000000
          loop sup $ n - 1
-- | Repeatedly add a message-driven child to a fresh supervisor and remove
-- it again.
--
-- A supervisor with no initial children is started and a single child-start
-- value is built once from a process that waits for one 'Int' message and
-- prints it. A counter then runs down from @nTimes * 2@:
--
-- * on even steps a child is added under the key @show n@ and the counter
--   value is sent to it;
-- * on odd steps the child added in the preceding step (key @show (n + 1)@)
--   is terminated and deleted, both results and the supervisor statistics
--   are printed, and the loop sleeps for one second.
--
-- A non-positive @nTimes@ performs no rounds. The loop previously stopped
-- only when the counter was exactly @0@, so a negative @nTimes@ counted away
-- from zero and never terminated.
viaSend :: Int -> IO ()
viaSend nTimes = cloud $ do
  sup <- SP.start (SP.RestartOne SP.defaultLimits) []
  cStart <- SP.toChildStart printingChild
  loop sup cStart $ nTimes * 2
  where
    -- Worker spec for a transient child that is terminated immediately on
    -- stop and is not registered under a name.
    toSpec !key !childStart =
      SP.ChildSpec
        { SP.childKey = key
        , SP.childType = SP.Worker
        , SP.childRestart = SP.Transient
        , SP.childStop = SP.TerminateImmediately
        , SP.childStart = childStart
        , SP.childRegName = Nothing
        }

    -- Child body: wait for a single 'Int' message and print it. Named so
    -- that it does not shadow the imported 'testChild'.
    printingChild :: Cloud.Process ()
    printingChild =
      Cloud.receiveWait [Cloud.match printInt]
      where
        printInt :: Int -> Cloud.Process ()
        printInt !n = liftIO $ print n

    -- Stop at zero, and also for any negative counter (see above).
    loop _ _ n | n <= 0 = return ()
    loop !sup !childStart !n
      | even n = do
          let key = show n
              childSpec = toSpec key childStart
          -- The pattern doubles as an assertion that the child is running.
          SP.ChildAdded (SP.ChildRunning !cpid) <- SP.startChild sup childSpec
          Cloud.send cpid n
          loop sup childStart $ n - 1
      | otherwise = do
          -- Key of the child that was added in the preceding (even) step.
          let key = show $ n + 1
          SP.terminateChild sup key >>= liftIO . print
          SP.deleteChild sup key >>= liftIO . print
          SP.statistics sup >>= liftIO . print
          liftIO $ threadDelay 1000000
          loop sup childStart $ n - 1
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment