Skip to content

Instantly share code, notes, and snippets.

@jcarrano
Last active February 14, 2024 16:51
Show Gist options
  • Save jcarrano/9c9f4bbddd4cc978cd7cc04e2f4c18a6 to your computer and use it in GitHub Desktop.
Save jcarrano/9c9f4bbddd4cc978cd7cc04e2f4c18a6 to your computer and use it in GitHub Desktop.
Program Evaluation and Review Technique (PERT) Montecarlo Simulator (Python and Haskell)
SPEC-1 7 -2 1
RES-1 14 -4 5
PROTO-1 RES-1 14 -4 10
SPEC-2 SPEC-1, PROTO-1 4 -1 2
DSN-1 SPEC-2 30 -10 15
ORD-1 SPEC-2, DSN-1 2 -1 3
PROD-1 DSN-1 30 -5 20
DSN-2 SPEC-1 7 -3 1
DSN-3 SPEC-1 3 -1 1
PROD-2 DSN-2 45 -10 15
TST-1 PROD-1, PROD-2, ORD-1 7 -3 7
DSN-4 TST-1 15 -9 10
PROD-3 TST-1 15 -11 7
ORD-2 DSN-4 2 -1 3
PROD-4 DSN-4 30 -5 20
OPT-1 TST-1 15 -5 10
TST-2 PROD-3, PROD-4, ORD-2 7 -2 7
PROD-5 DSN-3 7 -4 1
TST-3 TST-2, OPT-1,PROD-5 7 -4 5
{-# LANGUAGE DeriveGeneric #-}
import GHC.Generics
import Data.Maybe
import Control.Monad
import System.Environment
import System.IO
import qualified Data.Vector as V
import qualified Data.Text as T
import qualified Data.ByteString.Char8 as B
import qualified Data.ByteString.Lazy as BL
import qualified Data.Map.Strict as Map
import qualified Data.Set as Set
import Data.Random (MonadRandom, sample)
import Data.Random.Distribution.Triangular
import Data.Csv
data Activity = Activity { act_name :: ActivityName
, dependencies :: Set.Set ActivityName
, mode :: Double
, upper_bound :: Double
, lower_bound :: Double
} deriving (Generic, Show)
type ActivityName = String
splitcomma x = map ((B.pack).(T.unpack).(T.strip)) $ filter (not.(T.null)) $ T.splitOn (T.pack ",") (T.pack $ B.unpack x)
instance (Ord a, FromField a) => FromField (Set.Set a) where
parseField x = fmap Set.fromList $ traverse parseField (splitcomma x)
instance FromRecord Activity
type PERT = Map.Map ActivityName Activity
type PERTLayered = [Set.Set ActivityName]
contains :: PERTLayered -> ActivityName -> Bool
contains layers key = or $ map (Set.member key) layers
make_dag :: PERT -> Maybe (PERTLayered, Set.Set ActivityName)
make_dag pert = make_dag' pert [] (Map.keysSet pert)
make_dag' :: PERT -> PERTLayered -> Set.Set ActivityName -> Maybe (PERTLayered, Set.Set ActivityName)
make_dag' pending_activities layers potential_terminal_acts =
if Map.null pending_activities
then Just (reverse layers, potential_terminal_acts)
else let
processable = Map.filter ((all (layers `contains`)).dependencies) pending_activities
pending2 = Map.difference pending_activities processable
layers2 = (Map.keysSet processable):layers
processable_deps = Set.unions $ map dependencies $ Map.elems processable
terminals2 = Set.filter (not.(`Set.member` processable_deps)) potential_terminal_acts
in if Map.null processable
then Nothing
else make_dag' pending2 layers2 terminals2
vadd = V.zipWith (+)
vmax = V.zipWith max
simulate :: MonadRandom m => PERT -> (PERTLayered, Set.Set ActivityName) -> Int -> m (V.Vector Double)
simulate pert (layers, targets) runs = do
timings <- foldM sim_layer Map.empty layers
return $ foldl1 vmax $ map (timings Map.!) (Set.toList targets)
where
sim_act finish_times act_name = do
duration <- V.replicateM runs $ sample $ Triangular (upper_bound act) (mode act) (lower_bound act)
return $ if Set.null deps then duration
else duration `vadd` (foldl1 vmax $ Set.map (finish_times Map.!) $ deps)
where
act = pert Map.! act_name
deps = dependencies act
sim_layer finish_times layer = do
new_times <- mapM (\s -> (sim_act finish_times s)>>=(\ft -> return (s, ft))) $ Set.toList layer
return $ Map.union (Map.fromList new_times) finish_times
mean v = (sum v) / (fromIntegral $ length v)
std v = let vm = mean v in sqrt $ (sum $ V.map (\x->(x-vm)*(x-vm)) v) / (fromIntegral $ length v)
binsearch fx tol (xlow, xhigh) = let midpoint = (xhigh+xlow)/2
fxm = fx midpoint
new_lo = if fxm > 0 then xlow else midpoint
new_hi = if fxm > 0 then midpoint else xhigh
in if xhigh-xlow < tol then midpoint
else binsearch fx tol (new_lo, new_hi)
estimate samples confidence = let n_confidence = confidence * (fromIntegral $ length samples)
count min_duration acc duration = acc+(if duration<min_duration then 1 else 0)
in binsearch (\x -> (foldl (count x) 0 samples)-n_confidence) 1 (minimum samples, maximum samples)
main = do
args <- getArgs
csvData <- BL.readFile $ head args
case decode NoHeader csvData of
Left err -> putStrLn err
Right v -> sim_results $ Map.fromList $ map record2tuple $ V.toList (v::(V.Vector Activity))
where
sim_results pert = case make_dag pert of
Just (layered, terminals) -> do
sim_data <- simulate pert (layered, terminals) 1000000
putStrLn $ "Sigma = " ++ show (std sim_data) ++ ", mu = " ++ show (mean sim_data)
putStrLn $ "Project duration: " ++ show (round $ estimate sim_data 0.95) ++ " days with 95% confidence"
Nothing -> putStrLn "Graph is not DAG"
record2tuple a = (act_name a, a {upper_bound=mode a + upper_bound a,
lower_bound=mode a + lower_bound a})
#!/usr/bin/env python
# pert.py
# Montecarlo simulation of a PERT graph in < 100 lines.
# Juan I Carrano, 2013 2024
# Input format is a CSV with: ACTIVITY-NAME,DEPENDENCIES,Median-time,Delta-minus,Delta-plus
from functools import reduce
from itertools import chain
import numpy as np
class Activity:
def __init__(self, dep, mode_time, delta0, delta1):
self.dep = dep
self.mode_time = mode_time
self.best_time = mode_time + delta0
self.worst_time = mode_time + delta1
def simulate(self, n = None):
return np.random.triangular(self.best_time, self.mode_time,
self.worst_time, n)
class GraphError(Exception): ...
class Pert:
def __init__(self):
self.activities = {}
self.pending_activities = {}
self.terminals = None
def add_activity(self, *args):
self.pending_activities[name] = Activity(*args[1:])
def make_dag(self):
while self.pending_activities:
insertable_activities = [(k, v) for k, v in
self.pending_activities.items()
if all(d in self.activities for d in v.dep)]
if not insertable_activities:
raise GraphError("Circular or incomplete dependencies.")
self.activities.update(insertable_activities)
for k, v in insertable_activities:
del self.pending_activities[k]
self.terminals = self.activities.keys() - set(chain.from_iterable(
(v.dep for v in self.activities.values())))
def run_sim(self, N = None):
durations = {k: v.simulate(N or 1) for k, v in self.activities.items()}
finish_times = {}
for k, v in self.activities.items():
finish_times[k] = durations[k] + reduce(np.maximum,
(finish_times[d] for d in v.dep), 0)
return reduce(np.maximum, (finish_times[a] for a in self.terminals), 0)
if __name__ == '__main__':
import csv, sys
from matplotlib import pyplot as plt
with open(sys.argv[1]) as f:
table = csv.reader(f)
pert = Pert()
for entry in table:
name, _deps, _t0, _delta0, _delta1 = entry
deps = _deps.replace(',', ' ').split()
pert.add_activity(name, deps, int(_t0), int(_delta0), int(_delta1))
pert.make_dag()
results = pert.run_sim(int(sys.argv[2]) if len(sys.argv) > 2 else 1000000)
n, bins, patches = plt.hist(results, 150, density=True, facecolor="#BEE34F", edgecolor = "#AAAAAA")
sigma, mu = np.std(results), np.mean(results)
print(f"Sigma = {sigma:g}, mu = {mu:g}")
print(f"Project duration: {np.percentile(results, 95)} days with 95% confidence")
plt.plot(bins, np.exp(-.5*((bins - mu)/sigma)**2)/(sigma*np.sqrt(2*np.pi)), 'r--')
plt.xlabel(u"Project duration [days]"); plt.ylabel(u"Est. probability density");
plt.show()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment