Created
January 31, 2014 21:52
-
-
Save roman/8744038 to your computer and use it in GitHub Desktop.
Composition of Observables in rxhs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- | Fetch the first non-empty batch of messages published for a client.
--
-- Looks up the subject registered for @cid@ in @pubsub@. When one exists, its
-- stream is run through these stages, in order:
--
--   1. requests whose '_requestClientId' equals @cid@ are dropped; every other
--      request has 'Aeson.toJSON' mapped over it and is wrapped in 'Request';
--   2. the remaining items are grouped by 'Observable.bufferInterval' with an
--      interval of @Time.microSeconds 1000@;
--   3. empty buffers are filtered out;
--   4. 'Observable.first' is applied and the outcome is converted with
--      'Observable.toEither'.
--
-- When no subject is registered for @cid@, the result is a 'Left' holding
-- @ErrorCall "unknown clientId"@ wrapped in 'SomeException'.
--
-- NOTE(review): presumably this blocks until some other client publishes a
-- message, and stream errors surface as 'Left' — confirm against the
-- Observable library's 'first' / 'toEither' semantics.
getLastPublishedMessages :: PubSub -> ClientId -> IO (Either SomeException [WebResponse])
getLastPublishedMessages pubsub cid =
  getClientSubject pubsub cid >>= maybe unknownClient firstNonEmptyBatch
  where
    -- No subject registered for this client id.
    unknownClient =
      return (Left (SomeException (ErrorCall "unknown clientId")))

    -- Pipeline reads bottom-to-top: source, exclude own requests, buffer,
    -- skip empty buffers, take the first, convert to 'Either'.
    firstNonEmptyBatch subject =
      Observable.toEither
        . Observable.first
        . Observable.filter (not . null)
        . Observable.bufferInterval (Time.microSeconds 1000)
        . Observable.concatMap excludeClientId
        $ Observable.toObservable subject

    -- Zero-or-one result per request so it can be used with 'concatMap':
    -- requests issued by the asking client itself yield nothing.
    excludeClientId req
      | _requestClientId req == cid = []
      | otherwise = [Request (fmap Aeson.toJSON req)]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment