@kubukoz
Last active April 2, 2021 14:24
Return NotFound if http4s response stream is empty, wrap it in a different status otherwise
/*
Copyright 2021 Jakub Kozłowski
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
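import cats.effect.MonadThrow
import cats.implicits._
import fs2.Pull
import fs2.Stream

//Example usage: onEmptyOrNonEmpty(stream)(Ok(_))(NotFound()).flatten
//Uses uncons1 + extendScopeTo rather than peek1: peek1 pushes the head back onto
//the returned stream and does not extend the scope (see the discussion below).
def onEmptyOrNonEmpty[F[_]: MonadThrow, A, B](
  stream: Stream[F, A]
)(
  onNonEmpty: Stream[F, A] => B
)(
  onEmpty: B
)(
  implicit SC: fs2.Compiler[F, F]
): F[B] = stream
  .pull
  .uncons1
  .flatMap {
    _.traverse_ { case (h, t) =>
      //Keep the stream's resources open until the re-assembled stream is consumed.
      Pull.extendScopeTo(t).flatMap { tEx =>
        Pull.output1(onNonEmpty(Stream.emit(h) ++ tEx))
      }
    }
  }
  .stream
  .compile
  .last
  .map(_.getOrElse(onEmpty))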
@kubukoz
Author

kubukoz commented Mar 25, 2021

Note: as @ChristopherDavenport reminded me, there's a .pull.peek1 which handles the scope extension already, so we can simplify that part:

     implicit SC: fs2.Compiler[F, F]
   ): F[Response[F]] = stream
     .pull
-    .uncons1
+    .peek1
     .flatMap {
       _.traverse_ { case (h, t) =>
-        Pull.extendScopeTo(t).flatMap { tEx =>
-          Pull.output1(onNonEmpty((Stream.emit(h) ++ tEx)))
-        }
+        Pull.output1(onNonEmpty((Stream.emit(h) ++ t)))
       }
     }
     .stream

Bonus point: We don't really need to stick to Response[F]:

   import cats.implicits._
   import fs2.Pull
   import fs2.Stream
-  import org.http4s.Response
 
   //Example usage: onEmptyOrNonEmpty(stream)(Ok(_))(NotFound())
-  def onEmptyOrNonEmpty[F[_]: MonadThrow, A](
+  def onEmptyOrNonEmpty[F[_]: MonadThrow, A, B](
     stream: Stream[F, A]
   )(
-    onNonEmpty: Stream[F, A] => F[Response[F]]
+    onNonEmpty: Stream[F, A] => F[B]
   )(
-    onEmpty: F[Response[F]]
+    onEmpty: F[B]
   )(
     implicit SC: fs2.Compiler[F, F]
-  ): F[Response[F]] = stream
+  ): F[B] = stream
     .pull
     .peek1
     .flatMap {

and then we can push the flattening of the F[B] part to the user, because why not (they'll just need to .flatten on the whole result):

   def onEmptyOrNonEmpty[F[_]: MonadThrow, A, B](
     stream: Stream[F, A]
   )(
-    onNonEmpty: Stream[F, A] => F[B]
+    onNonEmpty: Stream[F, A] => B
   )(
-    onEmpty: F[B]
+    onEmpty: B
   )(
     implicit SC: fs2.Compiler[F, F]
   ): F[B] = stream
@@ -80,7 +80,7 @@ object demo0 {
     .stream
     .compile
     .last
-    .flatMap(_.getOrElse(onEmpty))
+    .map(_.getOrElse(onEmpty))
 
 }
 

Final result:

import cats.effect.MonadThrow
import cats.implicits._
import fs2.Pull
import fs2.Stream

//Example usage: onEmptyOrNonEmpty(stream)(Ok(_))(NotFound()).flatten
def onEmptyOrNonEmpty[F[_]: MonadThrow, A, B](
  stream: Stream[F, A]
)(
  onNonEmpty: Stream[F, A] => B
)(
  onEmpty: B
)(
  implicit SC: fs2.Compiler[F, F]
): F[B] = stream
  .pull
  .peek1
  .flatMap {
    _.traverse_ { case (h, t) =>
      Pull.output1(onNonEmpty((Stream.emit(h) ++ t)))
    }
  }
  .stream
  .compile
  .last
  .map(_.getOrElse(onEmpty))

@msosnicki

Why exactly did the initial version need extendScopeTo? It looks like peek1 (which is implemented in terms of uncons1) doesn't do any of this.

It's very hard to find any resources about fs2 scopes and how they work, so I'd appreciate it if you have some to share on this topic :)

@kubukoz
Author

kubukoz commented Mar 26, 2021

@msosnicki uh, that's a great catch. Apparently peek1 doesn't do what we need here (I admit, I haven't run that later code).

Basically, the idea is that if you just pull.uncons1 and compile the stream that results from that (which will contain the rest of the stream in the value - this is our B that we built from onNonEmpty), the resources of the stream will be closed when that effect completes - which may (and probably will) be before you even stream anything to the caller.

Extending the scope lets us explicitly say "don't close this stream yet" and pass the lifetime of all these resources to the "inner" stream (the one returned in the result: B). At least that's how I understand it (the original version worked like that).

Now that I think of it, there might be some gotchas related to cancellation/failure: if the outer stream is cancelled or fails after the scope is extended (it shouldn't be able to fail after a successful uncons1, so cancellation is more likely), there's a chance the resources won't get closed... I'll do some research and testing and maybe post something longer about this.
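
A small sketch of one observable difference between uncons1 and peek1, separate from the scope question: peek1 pushes the head back, so the stream it returns still contains that element, and prepending `Stream.emit(h)` to it would emit the head twice. This assumes fs2 3.x; the object and helper names are made up for illustration.

```scala
import fs2.{Pull, Pure, Stream}

object PeekVsUncons {
  // Re-emits only the stream handed back by a pull step, ignoring the reported head.
  def tailOf(
    step: Pull[Pure, Nothing, Option[(Int, Stream[Pure, Int])]]
  ): List[Int] = {
    val pull: Pull[Pure, Int, Unit] = step.flatMap {
      case Some((_, tail)) => tail.pull.echo
      case None            => Pull.done
    }
    pull.stream.toList
  }

  val source: Stream[Pure, Int] = Stream(1, 2, 3)

  // uncons1 consumes the head: the returned stream starts at the second element.
  val afterUncons1: List[Int] = tailOf(source.pull.uncons1) // List(2, 3)

  // peek1 pushes the head back: the returned stream starts at the first element.
  val afterPeek1: List[Int] = tailOf(source.pull.peek1) // List(1, 2, 3)
}
```

So a peek1-based version would pass `t` on its own rather than `Stream.emit(h) ++ t`, and would still need the scope extension described above.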

@msosnicki

Ok, that makes sense! So if I understand correctly, it is required if for whatever reason the inner stream compilation happens after the peek1.stream compilation. Here it's the case because in http4s Response this stream is retained and compiled later on. But if I wanted to use the same combinator with a B that compiles and consumes the inner stream internally, I think it wouldn't be required, as it wouldn't leak outside.

Anyway, looking forward to some longer post about it :) Thanks!

@kubukoz
Author

kubukoz commented Mar 29, 2021

Might take some time :D

I think if we didn't have a compile that essentially runs before the response body, we wouldn't need to touch scopes, yeah. B can't consume the stream internally now, because it's no longer in F[B] (of course you can pass an effect, but the result will be F[F[...]] so it's a separate "compilation" and needs scope extension)
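
For reference, the http4s case discussed here could look roughly like the sketch below. It assumes http4s 0.23 with cats-effect 3 and fs2 3.x; the `items` function, the route path and the object name are hypothetical, and the combinator is the scope-extending variant reconstructed from the lines removed in the first diff above, not a tested implementation.

```scala
import cats.effect.{IO, MonadThrow}
import cats.implicits._
import fs2.{Pull, Stream}
import org.http4s.HttpRoutes
import org.http4s.dsl.io._

object NotFoundOnEmpty {
  // Scope-extending variant: the lease keeps resources open until the
  // re-assembled stream (the response body) has been consumed.
  def onEmptyOrNonEmpty[F[_]: MonadThrow, A, B](
    stream: Stream[F, A]
  )(onNonEmpty: Stream[F, A] => B)(onEmpty: B)(
    implicit SC: fs2.Compiler[F, F]
  ): F[B] =
    stream.pull.uncons1
      .flatMap {
        _.traverse_ { case (h, t) =>
          Pull.extendScopeTo(t).flatMap { tEx =>
            Pull.output1(onNonEmpty(Stream.emit(h) ++ tEx))
          }
        }
      }
      .stream
      .compile
      .last
      .map(_.getOrElse(onEmpty))

  // Hypothetical data source standing in for a database query or similar.
  def items(tag: String): Stream[IO, String] =
    if (tag == "known") Stream("a", "b").covary[IO] else Stream.empty

  val routes: HttpRoutes[IO] = HttpRoutes.of[IO] {
    case GET -> Root / "items" / tag =>
      // B is IO[Response[IO]] here, so the result is IO[IO[Response[IO]]]
      // and the caller flattens it.
      onEmptyOrNonEmpty(items(tag))(Ok(_))(NotFound()).flatten
  }
}
```

The outer compile only decides between Ok and NotFound; the body is streamed later by the server, which is why the tail's scope has to outlive that first compile.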

@ChristopherDavenport

@kubukoz I'm sure people will copy this, might make sense to throw a license file up, so them doing so is alright.

@kubukoz
Author

kubukoz commented Apr 2, 2021

Good idea, thanks :)
