Skip to content

Instantly share code, notes, and snippets.

@roman
Last active December 30, 2015 20:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save roman/7878922 to your computer and use it in GitHub Desktop.
Save roman/7878922 to your computer and use it in GitHub Desktop.
Possible memory leak in Maestro.Distributed.Process.Platform.Supervisor v2
{-# LANGUAGE ScopedTypeVariables #-}
module CloudUtil
( module Control.Distributed.Process
, cloud
, cloudWithRemotable
, cloudWithRemotableAsync)
where
import Control.Exception (try, SomeException, bracket)
import Control.Concurrent (threadDelay)
import Control.Monad (void, forM_)
import qualified Control.Concurrent.MVar as MVar
import Control.Distributed.Process hiding (try, bracket)
import qualified Control.Distributed.Process.Node as Cloud
import qualified Network.Transport as Trans (Transport, closeTransport)
import qualified Network.Transport.Chan as Trans
-- import qualified Network.Transport.TCP as Trans
-- import qualified Maestro.Client.SupervisorTest.Helpers as Helpers
-- | Build a local Cloud Haskell node together with the transport it runs on.
--
-- The transport comes from "Network.Transport.Chan". The supplied function is
-- applied to 'Cloud.initRemoteTable' to produce the node's remote table. Both
-- the node and the transport are returned so the caller can close them.
mkLocalNode :: (RemoteTable -> RemoteTable) -> IO (Cloud.LocalNode, Trans.Transport)
mkLocalNode extendTable =
  -- TCP alternative kept from the original for reference:
  -- Right trans <- Trans.createTransport "localhost" "557766" Trans.defaultTCPParameters
  Trans.createTransport >>= \transport -> do
    node <- Cloud.newLocalNode transport (extendTable Cloud.initRemoteTable)
    return (node, transport)
-- | Run a 'Process' on a freshly created local node and return its result.
--
-- The first argument extends the initial remote table (see 'mkLocalNode').
-- The node and its transport are acquired and released with 'bracket', so
-- they are closed even when running the process or fetching its result
-- throws. Errors raised while closing are deliberately ignored: shutdown is
-- best effort.
cloudWithRemotable
  :: (RemoteTable -> RemoteTable) -> Process b -> IO b
cloudWithRemotable remoteTable cproc =
  bracket (mkLocalNode remoteTable) shutdown $ \(node, _) -> do
    resultVar <- MVar.newEmptyMVar
    -- The process hands its result back to IO through the MVar.
    Cloud.runProcess node (cproc >>= liftIO . MVar.putMVar resultVar)
    -- NOTE(review): short pause kept from the original; presumably it lets the
    -- node settle before the result is collected -- confirm it is still needed.
    threadDelay 100000
    MVar.takeMVar resultVar
  where
    -- Close the node first, then the transport, swallowing any exception.
    shutdown (node, trans) = do
      (_ :: Either SomeException ()) <- try $ Cloud.closeLocalNode node
      (_ :: Either SomeException ()) <- try $ Trans.closeTransport trans
      return ()
-- | Run a 'Process' on a freshly created local node, then keep the node alive
-- for a further 15 seconds before shutting it down.
--
-- The first argument extends the initial remote table (see 'mkLocalNode').
-- After @cproc@ returns, the same process prints @"Waiting..."@ once per
-- second, 15 times. The node and transport are released with 'bracket';
-- errors raised while closing them are deliberately ignored.
cloudWithRemotableAsync :: (RemoteTable -> RemoteTable) -> Process () -> IO ()
cloudWithRemotableAsync remoteTable cproc =
  bracket (mkLocalNode remoteTable) shutdown $ \(node, _) ->
    Cloud.runProcess node $ do
      cproc
      -- Linger so that work started by cproc has time to run before the node
      -- is closed by the bracket's release action.
      forM_ [1 .. 15 :: Int] $ \_ -> liftIO $ do
        putStrLn "Waiting..."
        threadDelay 1000000
  where
    -- Close the node first, then the transport, swallowing any exception.
    shutdown (node, trans) = do
      (_ :: Either SomeException ()) <- try $ Cloud.closeLocalNode node
      (_ :: Either SomeException ()) <- try $ Trans.closeTransport trans
      return ()
-- | 'cloudWithRemotable' with the remote table left unmodified (@id@).
cloud :: Process b -> IO b
cloud action = cloudWithRemotable id action
-- | 'cloudWithRemotableAsync' with the remote table left unmodified (@id@).
cloudAsync :: Process () -> IO ()
cloudAsync action = cloudWithRemotableAsync id action
{-# LANGUAGE TemplateHaskell #-}
module SupervisorTest.Helpers where
import Control.Monad.Trans (MonadIO(..))
import qualified Control.Distributed.Process as Cloud
import qualified Control.Distributed.Process.Closure as Cloud
-- | Child process body used by the supervisor test: prints the given number
-- and finishes.
testChild :: Int -> Cloud.Process ()
testChild = liftIO . print
-- Template Haskell splice that makes 'testChild' usable via closures.
-- Presumably this is what generates the @__remoteTable@ that SupervisorTest
-- passes to cloudWithRemotableAsync -- confirm against the
-- Control.Distributed.Process.Closure documentation.
$(Cloud.remotable ['testChild])
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE TemplateHaskell #-}
module SupervisorTest where
import Control.Monad (forever, when, void)
import Control.Monad.Trans (MonadIO(..), lift)
import Control.Concurrent (forkIO, killThread, threadDelay)
import System.Mem
import Maestro.Client.CloudUtil (cloud, cloudWithRemotableAsync)
import qualified Control.Distributed.Process as Cloud
import qualified Control.Distributed.Process.Closure as Cloud
import qualified Control.Distributed.Process.Platform.Time as CT
import qualified Control.Distributed.Process.Platform.Supervisor as SP
import qualified Control.Distributed.Process.Platform.ManagedProcess as MP
import Maestro.Client.SupervisorTest.Helpers
-- | Entry point: runs the closure-based scenario ('viaClosure') with 3000
-- start/terminate cycles.
main :: IO ()
main = viaClosure 3000
-- | Repeatedly add a closure-started child to a supervisor and then remove it.
--
-- Starts a supervisor with no initial children and performs @nTimes@ cycles.
-- Each cycle starts a transient worker running @testChild n@ (via a closure)
-- under the key @show n@, then terminates and deletes that same child,
-- printing the result of both calls. A non-positive @nTimes@ performs no
-- cycles.
viaClosure :: Int -> IO ()
viaClosure nTimes =
    -- Optional GC pressure while the scenario runs, kept from the original:
    -- tid <- forkIO $ forever $ threadDelay 100 >> performGC
    cloudWithRemotableAsync __remoteTable $ do
      sup <- SP.start (SP.RestartOne SP.defaultLimits) []
      loop sup (nTimes * 2)
    -- killThread tid
  where
    -- Spec for a transient worker that is terminated immediately on stop.
    toSpec !key !childStart =
      SP.ChildSpec
        { SP.childKey = key
        , SP.childType = SP.Worker
        , SP.childRestart = SP.Transient
        , SP.childStop = SP.TerminateImmediately
        , SP.childStart = childStart
        , SP.childRegName = Nothing
        }

    -- Counts down from 2 * nTimes: even steps start a child, odd steps remove
    -- the child started on the preceding even step.
    loop !sup !n
      -- Stop on anything non-positive so a negative count cannot run away.
      | n <= 0 = return ()
      | even n = do
          childStart <- SP.toChildStart ($(Cloud.mkClosure 'testChild) n)
          let key = show n
              childSpec = toSpec key childStart
          -- A failed pattern match here aborts the run: the child is expected
          -- to be added and running.
          SP.ChildAdded (SP.ChildRunning _) <- SP.startChild sup childSpec
          loop sup (n - 1)
      | otherwise = do
          -- n + 1 is the key used by the preceding even step.
          let key = show (n + 1)
          -- SP.statistics sup >>= liftIO . print
          SP.terminateChild sup key >>= liftIO . print
          SP.deleteChild sup key >>= liftIO . print
          -- SP.statistics sup >>= liftIO . print
          -- liftIO $ threadDelay 1000000
          loop sup (n - 1)
-- | Repeatedly add a message-driven child to a supervisor and then remove it.
--
-- Starts a supervisor with no initial children and performs @nTimes@ cycles.
-- Each cycle starts a transient worker under the key @show n@, sends it the
-- number @n@ (which the worker prints), then terminates and deletes that
-- child, prints the supervisor statistics and sleeps for one second. A
-- non-positive @nTimes@ performs no cycles.
viaSend :: Int -> IO ()
viaSend nTimes = cloud $ do
    sup <- SP.start (SP.RestartOne SP.defaultLimits) []
    cStart <- SP.toChildStart receiverChild
    go sup cStart
  where
    -- Spec for a transient worker that is terminated immediately on stop.
    toSpec !key !childStart =
      SP.ChildSpec
        { SP.childKey = key
        , SP.childType = SP.Worker
        , SP.childRestart = SP.Transient
        , SP.childStop = SP.TerminateImmediately
        , SP.childStart = childStart
        , SP.childRegName = Nothing
        }

    -- Worker body: waits for a single Int message and prints it. Named
    -- differently from the imported Helpers.testChild to avoid shadowing it.
    receiverChild :: Cloud.Process ()
    receiverChild = Cloud.receiveWait [Cloud.match printInt]
      where
        printInt :: Int -> Cloud.Process ()
        printInt !n = liftIO $ print n

    -- Counts down from 2 * nTimes: even steps start a child and message it,
    -- odd steps remove the child started on the preceding even step.
    go !sup !childStart = loop (nTimes * 2)
      where
        loop !n
          -- Stop on anything non-positive so a negative count cannot run away.
          | n <= 0 = return ()
          | even n = do
              let key = show n
                  childSpec = toSpec key childStart
              -- A failed pattern match here aborts the run: the child is
              -- expected to be added and running.
              SP.ChildAdded (SP.ChildRunning !cpid) <- SP.startChild sup childSpec
              Cloud.send cpid n
              loop (n - 1)
          | otherwise = do
              -- n + 1 is the key used by the preceding even step.
              let key = show (n + 1)
              SP.terminateChild sup key >>= liftIO . print
              SP.deleteChild sup key >>= liftIO . print
              SP.statistics sup >>= liftIO . print
              liftIO $ threadDelay 1000000
              loop (n - 1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment