@afsalthaj
Last active November 7, 2020 12:53
import cats.data.StateT
import cats.effect.{ IO, Resource }
import cats.syntax.traverse._
import cats.instances.list._
import Resources.StateInfo
/**
* `Resources` is a lightweight wrapper over `cats.effect.Resource`
* that guarantees strong acquisition and release semantics for a collection of resources,
* while allowing you to maintain some state.
*
* Assuming `S3Support.downloadFiles` returns `Resources`, the following are some examples:
*
* Example 1 (the most straightforward use case)
* {{{
* S3Support.downloadFiles(..).use((_, a) => sendFile(a))
* }}}
*
* The above usage guarantees the following:
* each file is downloaded individually (the download may include extra resourceful logic, such as converting to Parquet),
* `sendFile` is invoked on each file individually,
* and the resources associated with each file are cleaned up immediately and independently.
* This implies that a resource is not allocated until it is time to execute the corresponding `sendFile`.
*
* This avoids downloading an entire set of files (possibly everything under an S3 path)
* into a container at once. A failure in one of the file transfers deletes the corresponding resource immediately.
*
* You can also keep state across executions. The `List[B]` passed to function `f` of `use`
* represents the state of the entire process so far; the second argument `g` of `use` derives the `B`
* recorded for each processed `A`. Example: making sure we do not send
* duplicate files, without collecting resources into memory, so cleanup stays immediate.
*
* Example 2 (`execute`)
* {{{
* S3Support.downloadFiles().execute.use(allFiles => sendAllFiles(allFiles))
* }}}
*
* In this case, resources are accumulated (no further processing starts until all of them are acquired),
* allowing you to use the collection of downloaded and renamed files together (example: `sendAllFiles`).
* All resources associated with the collection are guaranteed to be released, and only
* after `sendAllFiles` has succeeded or failed.
* A practical example: for a JDBC transfer using Spark,
* we may need to collate all resources and close them only after a collective transfer to the SQL database.
*
* Example 3 (mix of `use` and `execute`):
*
* You can also acquire resources together but use each resource individually (perhaps in some complex process),
* and guarantee release of all resources acquired up until then, regardless of the outcome.
*
* {{{
* S3Support.downloadFiles().execute.use(allFiles => allFiles.traverse(a => sendFile(a)))
* }}}
*
* This means that, regardless of the order of execution, the granularity of resource usage, and the time of execution,
* resources will be cleaned up.
*
* This is roughly equivalent to a composable `try-with-resources` over a collection of resources in Java,
* and such strong semantics are missing in that language.
*
* Porting this idea to Java is practically impossible (unless we are ready to cut corners),
* which is one of the most important reasons why some code is written strictly in a language like Scala as of 2020.
*
* Note: The only disciplined way to use the output of `S3Support.downloadFiles` is to call either `.use` or `.execute`.
*/
case class Resources[A](resources: List[Resource[IO, A]]) {

  /**
   * Acquire all the resources without losing the resource context.
   * All the resources are released together, after all of them
   * have been used.
   */
  def execute: Resource[IO, List[A]] =
    resources.sequence[Resource[IO, *], A]

  /**
   * Resources are released as and when they are used, unlike `execute`.
   *
   * @param f given the list of `B` (derived from each `A` via `g`) already processed, and the current file `A`,
   *          execute the process that derives `C`.
   * @param g derives the `B` recorded in the state for each processed `A`.
   */
  def use[B, C](f: (List[B], A) => IO[C])(g: A => B): IO[List[C]] =
    resources
      .traverse[StateInfo[B, *], C](resource => {
        for {
          existingFiles <- StateT.get[IO, List[B]]
          // Acquire this resource, run `f`, and release it before moving on to the next one.
          dataAndResult <- StateT.liftF(resource.use(data => f(existingFiles, data).map(c => (data, c))))
          _ <- StateT.set[IO, List[B]](g(dataAndResult._1) :: existingFiles)
        } yield dataAndResult._2
      })
      .run(Nil: List[B])
      .map(_._2)
}
object Resources {
  type StateInfo[A, B] = StateT[IO, List[A], B]

  def trackDuplicate[A, B, C](
    f: A => IO[C]
  )(g: A => B)(onDuplicate: B => String): (List[B], A) => IO[C] = { (state, resource) =>
    if (state.contains(g(resource))) {
      IO.raiseError(new RuntimeException(onDuplicate(g(resource))))
    } else f(resource)
  }
}
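// A minimal usage sketch of the three patterns described in the Scaladoc above.
// Everything in `ResourcesExample` is hypothetical and only for illustration:
// `file` stands in for a real download (for example `S3Support.downloadFiles`),
// and `sendFile` stands in for a real transfer. Both only log what they do.
// It assumes the imports at the top of this file are in scope.
object ResourcesExample {
  // Hypothetical resource: acquisition and release only print a message.
  def file(name: String): Resource[IO, String] =
    Resource.make(IO(println(s"acquire $name")).map(_ => name))(n => IO(println(s"release $n")))

  // Hypothetical consumer of a single file.
  def sendFile(name: String): IO[Unit] = IO(println(s"send $name"))

  val files: Resources[String] = Resources(List(file("a"), file("b")))

  // Pattern 1: each file is acquired, sent, and released before the next one is acquired.
  val oneByOne: IO[List[Unit]] =
    files.use[String, Unit]((_, a) => sendFile(a))(identity)

  // Patterns 2 and 3: all files are acquired first, used inside `use`, then released together.
  val together: IO[Unit] =
    files.execute.use(all => all.traverse(a => sendFile(a)).map(_ => ()))

  // Pattern 1 with state: fail if a file name has already been processed.
  val noDuplicates: IO[List[Unit]] =
    files.use(
      Resources.trackDuplicate[String, String, Unit](a => sendFile(a))(identity)(n => s"Duplicate file: $n")
    )(identity)
}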
/**
* Resource handling using try-with-resources is suboptimal in Java (although it must be used if we are in Java).
* In fact, TWR (try-with-resources) is wrong for the following reasons:
*
* 1. TWR has edge cases, which they are trying to fix now. Essentially, in any try (var f = foo()), foo may open a resource and then do something with the resource that throws an exception; in this case, the resource will be opened but not closed. They may fix this, but for now it is a problem.
*
* 2. TWR is always sequential. You cannot do TWR in parallel with another try/catch. This can increase latency in some applications, e.g. when creating pools, allocating remote resources, or anything else whose acquisition overhead is significant. Things like cats.effect.Resource or zio.ZManaged are easily parallel, e.g. open(left).zipPar(open(right)).
*
* 3. TWR is always static. You cannot do TWR on a list of things, for example. You must have a statically fixed number of things you want to acquire/release. ZManaged can easily handle dynamic numbers of things, e.g. ZManaged.foreachPar(files) { file => open(file) }.use { ... }
*
* 4. TWR conflates the acquisition/release of a resource with its usage. This forces developers to pay attention to acquisition/release. You can't separate them, because the "use" code always has to go inside the try block. Something like ZManaged allows you to package up the acquire/release into a value. You can then apply traverse, and either use the resources together and close them together, or use them individually by looping through each one. The part of the code that uses a resource doesn't need to know how to acquire/release it. That is vastly more modular and leads to better coding patterns.
*/