Skip to content

Instantly share code, notes, and snippets.

@roman
Created January 31, 2014 21:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save roman/8744038 to your computer and use it in GitHub Desktop.
Save roman/8744038 to your computer and use it in GitHub Desktop.
Composition of Observables in rxhs
getLastPublishedMessages :: PubSub -> ClientId -> IO (Either SomeException [WebResponse])
getLastPublishedMessages pubsub cid = do
msubject <- getClientSubject pubsub cid
case msubject of
Just subject ->
Observable.toEither .
Observable.first .
Observable.filter (not . null) .
Observable.bufferInterval (Time.microSeconds 1000) .
Observable.concatMap excludeClientId $
Observable.toObservable subject
Nothing ->
return
$ Left
$ SomeException
$ ErrorCall "unknown clientId"
where
excludeClientId req =
if _requestClientId req == cid then [] else [Request $ fmap Aeson.toJSON req]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment