Skip to content

Instantly share code, notes, and snippets.

@chrilves
Last active March 2, 2020 22:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save chrilves/dc1bd08e0665a7243b1d410e48cc5513 to your computer and use it in GitHub Desktop.
Save chrilves/dc1bd08e0665a7243b1d410e48cc5513 to your computer and use it in GitHub Desktop.
/** Simulates a resource can be created and closed
* like a file handler, database connection handler, etc.
*
* Like the ones above, its methods are only available if
* the resource is opened, it must always be closed
* after use to avoid leaks and closing and already closed
* connection is a fault.
*
* @param name the name of the connection
* @param failureProba the probability opening and closing fail.
* echoing will fail on probability {{{failureProba / 4}}}
*/
final class Connection(name: String, failureProba: Double = 0D) {
import Connection._
println(s"Opening Connection $name.")
// Opening the connection may fail with probaility {{{failureProba}}}
if (scala.util.Random.nextDouble < failureProba) throw OpeningFailed(name)
_numberOpened += 1 // We need to keep the number of connection opened valid
/** The state of the connection */
private var opened : Boolean = true
/** Close an OPENED the connection.
* If the connection is already closed, it fails.
*/
def close(): Unit =
if (opened) {
println(s"Closing connection $name.")
_numberOpened -= 1 // {{{close}}} was called, so the number of connection opened should be decreased.
opened = false
// Closing the connection may fail with probaility {{{failureProba}}}
if (scala.util.Random.nextDouble < failureProba) throw ClosingFailed(name)
}
else throw ConnectionAlreadyClosed(name)
/** print and return the input number.
* The connection MUST BE OPENED, otherwise it fails.
*/
def echo(i: Int): Int =
if (opened) {
println(s"Echoing $i on connection $name.")
if (scala.util.Random.nextDouble < (failureProba / 4)) throw EchoingFailed(name, i)
i
}
else throw ConnectionClosed(name)
/** Fails if the connection is opened */
def checkClosed(): Unit =
if (opened) throw ConnectionNotClosed(name)
}
object Connection {
/** Create a new connection with the given name */
def open(name: String, failureProba: Double = 0D): Connection = new Connection(name, failureProba)
/** Count the number of opened connections */
private var _numberOpened: Int = 0
/** Return the number of opened connections */
def numberOpened: Int = _numberOpened
/** Reset the number of opened connections to 0*/
def resetOpened(): Unit = _numberOpened = 0
final case class OpeningFailed(name: String) extends Exception(s"Opening $name failed!")
final case class ClosingFailed(name: String) extends Exception(s"Closing $name failed!")
final case class EchoingFailed(name: String, i: Int) extends Exception(s"Echoing $i on $name failed!")
final case class ConnectionAlreadyClosed(name: String) extends Exception(s"Can not close $name, it is already closed!")
final case class ConnectionClosed(name: String) extends Exception(s"Connection $name is closed!")
final case class ConnectionNotClosed(name: String) extends Exception(s"Connection $name is NOT closed!")
}
/** A safe wrapper to open and close a resource of type {{{A}}}. */
trait Resource[+A] { self =>
/** Open a new resource of type {{{A}}}
* Returns {{{(aNewA, functionClosingTheNewA)}}}
* - {{{aNewA}}} is the created resource.
* - {{{functionClosingTheNewA}}} is a function that closes the resource {{{aNewA}}}.
* A resource MUST NEVER be accessed/used after being closed.
*/
def open(): (A, () => Unit)
/* Call {{{f}}} with a newly created {{{a:A}}}.
* Ensures that {{{a}}} is properly closed after the call to {{{f}}}.
* The return value {{{f(a):B}}} MUST NEVER rely on {{{a}}} being opened.
*/
final def use[B](f: A => B): B = {
val (a, close) = open()
try f(a)
finally close()
}
final def map[B](f: A => B): Resource[B] =
new Resource[B] {
def open(): (B, () => Unit) = {
val (a, closeA) = self.open()
try (f(a), closeA)
catch { case e : Throwable =>
try closeA() catch { case e2: Throwable => e.addSuppressed(e2) }
throw e
}
}
}
final def flatMap[B](f: A => Resource[B]): Resource[B] =
new Resource[B] {
def open(): (B, () => Unit) = {
val (a, closeA) = self.open()
try {
val (b, closeB) = f(a).open()
(b, () => {
try closeB()
finally closeA()
})
} catch { case e: Throwable =>
closeA()
throw e
}
}
}
/* Call {{{f}}} with a newly created {{{a:A}}}.
* Ensures that the returned iterator will open/close an {{{a:A}}} properly.
*
* A resource {{{a:A}}} is created the first time the resulting iterator
* methods {{{hasNext}}}/{{{next}}} are called.
* The resource {{{a:A}}} is closed when there is no next value in the iterator
* OR if an exception was raised.
*/
final def useInIterator[B](f: A => Iterator[B]): Iterator[B] = {
sealed trait State
final case class Running(close: () => Unit, iter: Iterator[B]) extends State // Means a {{{a:A}}} has been opened and {{{f(a)}}} called.
final case object Finished extends State // Means the iterator {{{f(a:A)}}} has been consumed and {{{a:A}}} closed without error.
final case class Failed(error: Throwable) extends State // Means an exception has happened.
final class WrapedItetator(initialState: State) extends Iterator[B] {
var state: State = initialState
def hasNext: Boolean =
state match {
case Running(cls, iter) =>
try if (iter.hasNext)
true
else {
state = Finished
cls()
false
}
catch { case e : Throwable =>
if (state != Finished)
try cls()
catch { case e2: Throwable => e.addSuppressed(e2) }
state = Failed(e)
throw e
}
case Finished => false
case Failed(e) => throw e
}
def next(): B =
state match {
case Running(cls, iter) =>
try iter.next()
catch { case e : Throwable =>
try cls() catch { case e2: Throwable => e.addSuppressed(e2) }
state = Failed(e)
throw e
}
case Finished =>
throw new java.util.NoSuchElementException("next on empty iterator")
case Failed(e) =>
throw e
}
}
val (a, close) = self.open()
var opened: Boolean = true
try {
val iter = f(a)
if (!iter.hasNext) {
opened = false
close()
Iterator.empty
} else new WrapedItetator(Running(close, iter))
} catch { case e : Throwable =>
if (opened)
try close()
catch { case e2 : Throwable => e.addSuppressed(e2) }
throw e
}
}
}
object Resource {
/** A pure {{{a:A}}} that does not need to be opened/closed */
def pure[A](a:A): Resource[A] =
new Resource[A] {
def open(): (A, () => Unit) = (a, () => ())
}
/** Create a {{{Resource[A]}}} from an opening and closing function */
def of[A](opn: => A, cls: A => Unit): Resource[A] =
new Resource[A] {
def open(): (A, () => Unit) = {
val a = opn
(a, () => cls(a))
}
}
}
/***********
* TESTING *
***********/
final class Stats {
def count : Long = _count
def avg : Double = _sum / _count
def min : Double = _min
def max : Double = _max
def stddev: Double = math.sqrt(_sumSquares / _count - avg*avg)
private var _min : Double = Double.MaxValue
private var _max : Double = Double.MinValue
private var _sum : Double = 0
private var _sumSquares : Double = 0
private var _count : Long = 0
def +(d : Double): Unit = {
_min = math.min(_min, d)
_max = math.max(_max, d)
_sum += d
_sumSquares += d*d
_count += 1
}
def ++(s: Stats): Unit = {
_min = math.min(_min, s._min)
_max = math.max(_max, s._max)
_sum += s._sum
_sumSquares += s._sumSquares
_count += s._count
}
override def toString =
s"""count = ${count}
|avg = ${avg}
|min = ${min}
|max = ${max}
|stddev = ${stddev}
""".stripMargin
}
object Tests {
/** Open two resources A and B. B may depends on A, so B must be closed before A */
val rc : Resource[(Connection, Connection)] =
for {
a <- Resource.of[Connection](Connection.open("A", 0.2), _.close())
b <- Resource.of[Connection](Connection.open("B", 0.2), _.close())
} yield {
if (scala.util.Random.nextDouble < 0.2) throw new Exception("Map failed!")
(a,b)
}
/** Use the above resource to map an iterator.
* use the resulting operator to obverse what
* is happening.
*/
def iter() =
rc.useInIterator { case (connA,connB) =>
List(1,2,3,4,5)
.iterator
.map { i => connA.echo(i) + connB.echo(i) }
}
final case class TestStats(exceptions: Stats, correctClosing: Stats)
def test(times: Long): TestStats = {
val exceptions = new Stats
val correctClosing = new Stats
for (i <- 1L to times) {
Connection.resetOpened()
try {
iter().size
exceptions + 0d
} catch { case _ : Throwable =>
exceptions + 1d
}
correctClosing + (if (Connection.numberOpened == 0) 1d else 0d)
}
TestStats(exceptions, correctClosing)
}
}
object Benchmark {
def iterN(n: Long): Iterator[Long] =
new Iterator[Long] {
var _next: Long = -1
def hasNext: Boolean = _next < n
def next(): Long = {
_next += 1
_next
}
}
def iterFor(millis: Long): Iterator[Long] = {
val start = System.currentTimeMillis
new Iterator[Long] {
var _next: Long = -1
def hasNext: Boolean = System.currentTimeMillis < start + millis
def next(): Long = {
_next += 1
_next
}
}
}
val conn: Resource[Connection] =
Resource.of[Connection](Connection.open("A", 0D), _.close())
def bench(warm: Long, hot: Long)(mkIter: => Double): Stats = {
val s = new Stats
println("Warming")
for (i <- 1L to warm) mkIter
println("Statging")
for (i <- 1L to hot) s + mkIter
s
}
def time[A](a: => A): Double = {
val start = System.nanoTime
a
(System.nanoTime - start).toDouble / 1000000000L
}
val iterations: Long = 100000000L
def timeForNoRes(): Stats =
bench(100, 100)(time(iterN(iterations).size))
def timeForRes(): Stats =
bench(100, 100) {
val it = conn.useInIterator(_ => iterN(iterations))
time(it.size)
}
def numberForNoRes(): Stats =
bench(100, 100)(iterFor(100).size.toDouble)
def numberForRes(): Stats =
bench(100, 100)(conn.useInIterator(_ => iterFor(100)).size.toDouble)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment