Skip to content

Instantly share code, notes, and snippets.

@roman
Created June 5, 2015 00:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save roman/69cd9edd88144f48c7a7 to your computer and use it in GitHub Desktop.
Save roman/69cd9edd88144f48c7a7 to your computer and use it in GitHub Desktop.
Experiments with a Cont Monad approach for Observables
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE RecursiveDo #-}
module Main where
import Control.Concurrent (myThreadId)
import Control.Exception (SomeException)
import Control.Applicative (Alternative(..))
import Control.Monad (liftM, ap, unless, foldM_)
import Data.IORef (newIORef, atomicModifyIORef', readIORef)
import Rx.Disposable (Disposable, setDisposable, newSingleAssignmentDisposable, newDisposable, emptyDisposable, dispose, disposeWithResult)
import Rx.Scheduler (schedule, newThread)
data Notification a
= OnNext a
| OnError SomeException
| OnCompleted
deriving (Show)
data Concurrency
= A
| S
data Observer a
= Observer {
emitNotification :: Notification a -> IO ()
, isUnsubscribed :: IO Bool
, unsubscribeObserver :: IO ()
}
newtype Observable (concurrency :: Concurrency) a
= Observable {
internalSubscribe :: Observer a -> IO Disposable
}
instance Functor (Observable concurrency) where
fmap = liftM
instance Applicative (Observable concurrency) where
pure = return
(<*>) = ap
instance Alternative (Observable concurrency) where
empty = Observable $ \_ -> emptyDisposable
obA <|> obB = Observable $ \observer -> do
d1 <- internalSubscribe obA observer
d2 <- internalSubscribe obB observer
return (d1 `mappend` d2)
instance Monad (Observable concurrency) where
return a = Observable $ \observer -> do
emitNotification observer (OnNext a)
emitNotification observer OnCompleted
emptyDisposable
obsA >>= f = Observable $ \observer -> do
disposableRef <- newIORef mempty
appendDisposable disposableRef obsA $ \notificationA -> do
case notificationA of
OnNext a -> do
let obsB = f a
appendDisposable disposableRef obsB (emitNotification observer)
OnError err -> emitNotification observer (OnError err)
OnCompleted -> emitNotification observer OnCompleted
newDisposable "bind-operator" (readIORef disposableRef >>= dispose)
where
appendDisposable disposableRef obs callback = do
observer <- newObserver callback
currentD <- internalSubscribe obs observer
atomicModifyIORef' disposableRef (\d -> (d `mappend` currentD, ()))
observeList :: [a] -> Observable S a
observeList as = Observable $ \observer -> do
foldM_ (\result a -> do
unsubscribed <- isUnsubscribed observer
if unsubscribed
then return ()
else do
emitNotification observer (OnNext a)
return result)
()
as
emitNotification observer OnCompleted
emptyDisposable
asyncObserveList :: [a] -> Observable A a
asyncObserveList as = Observable $ \observer ->
schedule newThread $ do
foldM_ (\result a -> do
unsubscribed <- isUnsubscribed observer
if unsubscribed
then return ()
else do
emitNotification observer (OnNext a)
return result)
()
as
emitNotification observer OnCompleted
once :: Observable concurrency a
-> Observable concurrency a
once source = Observable $ \observer -> do
subscribe source
(\a -> do
emitNotification observer (OnNext a)
emitNotification observer OnCompleted
unsubscribeObserver observer)
(emitNotification observer . OnError)
(emitNotification observer OnCompleted)
newObserver :: (Notification a -> IO ()) -> IO (Observer a)
newObserver callback0 = do
isDoneRef <- newIORef False
return $ Observer (observer isDoneRef)
(readIORef isDoneRef)
(atomicModifyIORef' isDoneRef (\_ -> (True, ())))
where
observer isDoneRef OnCompleted = do
isDone <- readIORef isDoneRef
unless isDone (callback0 OnCompleted)
atomicModifyIORef' isDoneRef (\_ -> (True, ()))
observer isDoneRef notification = do
isDone <- readIORef isDoneRef
unless isDone (callback0 notification)
subscribe
:: String
-> Observable (s :: Concurrency) a
-> (a -> IO ())
-> (SomeException -> IO ())
-> IO ()
-> IO Disposable
subscribe desc obs onNext onErr onCompl = do
observer <- newObserver (\n -> notificationHandler n)
internalSubscribe obs observer
where
notificationHandler (OnNext a) = onNext a
notificationHandler (OnError err) = onErr err
notificationHandler (OnCompleted) = onCompl
main :: IO ()
main = do
let obs = do
a <- asyncObserveList [1..10]
b <- asyncObserveList [11..20]
return (a, b)
_ <- subscribe obs print print (putStrLn "done")
return ()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment