public
Last active

Forking an enumerator computation

  • Download Gist
EnumeratorFork.hs
Haskell
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
{-# LANGUAGE ScopedTypeVariables #-}
 
import Control.Concurrent (killThread)
import Control.Concurrent.BoundedChan
import Control.Concurrent.Thread
import Control.Exception (SomeException)
import Control.Monad.CatchIO
import Control.Monad.Trans
import Prelude hiding (catch)
import qualified Data.Enumerator.List as E
import Snap.Iteratee
 
 
-- | An 'Iteratee' that forwards its input stream into a bounded channel.
--
-- Every element received is written to the channel as @Just x@. When
-- 'E.head' reports that there is no further input, a single @Nothing@ is
-- written as the end-of-stream marker and the iteratee finishes with @()@.
-- Writes go through 'writeChan', so the iteratee waits whenever the
-- bounded channel is full.
chanToIter :: BoundedChan (Maybe a) -> Iteratee a IO ()
chanToIter chan = pump
  where
    pump = do
        next <- E.head
        -- 'next' is already in the channel's wire format: @Just x@ for an
        -- element, @Nothing@ for end of input.
        liftIO (writeChan chan next)
        -- Keep going only while elements are still arriving.
        maybe (return ()) (const pump) next
 
 
-- | Turn a bounded channel of 'Maybe'-wrapped elements into an
-- 'Enumerator'.
--
-- Each @Just x@ read from the channel is fed to the iteratee as a
-- one-element chunk; @Nothing@ is the end-of-stream marker and is fed as
-- 'EOF'.
--
-- The channel is always read up to and including the @Nothing@ marker,
-- even when the iteratee yields or fails before the input ends, so that a
-- writer blocked on the full channel can always finish. Once the marker
-- has been read the channel is never read again: 'chanToIter' writes the
-- marker exactly once, so a further read would block forever.
chanToEnum :: BoundedChan (Maybe a) -> Enumerator a IO b
chanToEnum chan = check
  where
    -- Discard everything up to and including the end-of-stream marker.
    -- Only valid while the marker has not been read yet.
    drain = do
        mbX <- liftIO $ readChan chan
        maybe (return ()) (const drain) mbX

    check (Continue k) = do
        mbX <- liftIO $ readChan chan
        case mbX of
            Just x  -> lift (runIteratee $ k $ Chunks [x]) >>= check
            -- The marker is now consumed. Hand the resulting step back
            -- unchanged instead of looping through 'check', which would
            -- try to read (or drain) an already finished channel.
            Nothing -> lift (runIteratee $ k EOF) >>= returnI

    -- The iteratee finished before the input ended: swallow the rest so
    -- the writer side is not left blocked on the bounded channel.
    check (Yield x r) = drain >> yield x r

    -- Same reasoning as for 'Yield': unblock the writer first, then
    -- propagate the failure.
    check (Error e) = drain >> throwError e
 
 
-- | Run an enumerator-consuming computation on its own thread and return
-- an 'Iteratee' that feeds it.
--
-- A bounded channel with 4 slots is created and @func@ is forked with an
-- enumerator that reads from that channel ('chanToEnum'). The returned
-- iteratee pushes all of its input into the channel ('chanToIter'), then
-- waits for the forked computation's result. A @Right@ result becomes the
-- iteratee's value; a @Left@ exception from the forked computation is
-- rethrown with 'throwError'.
--
-- If feeding the channel or waiting for the result raises an exception,
-- the forked thread is killed before the exception is rethrown.
forkEnumerator :: MonadIO m =>
                  (Enumerator a IO b -> IO c)
               -> m (Iteratee a IO c)
forkEnumerator func = liftIO $ do
    chan <- newBoundedChan capacity
    (worker, awaitResult) <- forkIO (func (chanToEnum chan))
    return (feedAndCollect chan worker awaitResult)
  where
    -- Number of elements the channel buffers before the feeder blocks.
    capacity = 4

    feedAndCollect chan worker awaitResult =
        guarded >>= either throwError return
      where
        -- Push the whole input through the channel, then block until the
        -- forked computation reports its outcome.
        pumpThenWait = chanToIter chan >> liftIO awaitResult

        -- On any failure make sure the worker does not outlive us.
        guarded = pumpThenWait `catch` \(e :: SomeException) ->
            liftIO (killThread worker) >> throwError e

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.