Skip to content

Instantly share code, notes, and snippets.

@Grogs
Last active June 4, 2022 22:58
Show Gist options
  • Save Grogs/f8a53f3757951293cf978a22a178aa99 to your computer and use it in GitHub Desktop.
Save Grogs/f8a53f3757951293cf978a22a178aa99 to your computer and use it in GitHub Desktop.
File backed implementation of ScalaCache Cache
import java.nio.ByteBuffer
import java.nio.channels.{AsynchronousFileChannel, CompletionHandler, FileLock}
import java.nio.file.StandardOpenOption.{CREATE, READ, WRITE}
import java.nio.file._
import java.util.concurrent.{Executors, TimeUnit}
import scala.compat.java8.FunctionConverters.asJavaConsumer
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.Try
import scalacache.Cache
import scalacache.serialization.Codec
class FileCache(prefix: String) extends Cache[Array[Byte]] {
private val scheduler = Executors.newScheduledThreadPool(2)
private val ec = ExecutionContext.fromExecutor(scheduler)
override def get[V](key: String)(implicit codec: Codec[V, Array[Byte]]): Future[Option[V]] = {
Future(AsynchronousFileChannel.open(path(key), READ))(ec).flatMap{
channel =>
readFile(codec, channel)
.map(Option.apply)
}.recover { case _: NoSuchFileException => None }
}
/**
* Note: At it stands, if the process is restarted, entries with a TTL won't get deleted :(
* Future work: maybe store TTL and creation time disk, then remove during get... Or remove on close
*/
override def put[V](key: String, value: V, ttl: Option[Duration])(implicit codec: Codec[V, Array[Byte]]): Future[Unit] = {
val res = writeFile(key, value, codec)
ttl.foreach { ttl =>
val delete = runnable {
val location = path(key)
Try(
Files.delete(location)
).recover { case e => e.printStackTrace() }
}
scheduler.schedule(delete, ttl.toMillis, TimeUnit.MILLISECONDS)
}
res
}
override def remove(key: String): Future[Unit] = {
Future(Files.delete(path(key)))(ec).recover{ case _ => () }
}
override def removeAll(): Future[Unit] = {
val completion = Promise[Unit]()
def deleteAll = {
Files.list(Paths.get(prefix)).forEach(
asJavaConsumer(item =>
Files.delete(item)
)
)
}
val deleteAndComplete = runnable {
completion.complete(Try(deleteAll))
}
scheduler.execute(deleteAndComplete)
completion.future
}
override def close(): Unit = {
scheduler.shutdown()
scheduler.awaitTermination(1, TimeUnit.MINUTES)
}
private def path(key: String) = Paths.get(s"$prefix/$key")
private def readFile[V](codec: Codec[V, Array[Byte]], channel: AsynchronousFileChannel): Future[V] = {
for {
fileSize <- Future.successful(channel.size().toInt) //Will fail if file is larger than 2.147 gigabytes
buffer = ByteBuffer.allocate(fileSize)
_ <- read(channel, buffer)
bytes = buffer.array()
value = codec.deserialize(bytes)
} yield value
}
private def lock[V](channel: AsynchronousFileChannel) = {
val lock = Promise[FileLock]()
channel.lock((), completionHandler(lock))
lock.future
}
private def read(channel: AsynchronousFileChannel, buffer: ByteBuffer): Future[Integer] = {
val completion = Promise[Integer]()
channel.read(buffer, 0, (), completionHandler(completion))
completion.future
}
private def writeFile[V](key: String, value: V, codec: Codec[V, Array[Byte]]) = {
val completion = Promise[Integer]()
val channel = AsynchronousFileChannel.open(path(key), CREATE, WRITE)
val bytes = codec.serialize(value)
val buffer = ByteBuffer.wrap(bytes)
val handler = completionHandler(completion)
channel.write(buffer, 0, (), handler)
completion.future.map { _ =>
channel.close()
()
}
}
private def completionHandler[V](promise: Promise[V]) = {
new CompletionHandler[V, Unit] {
def failed(exc: Throwable, attachment: Unit): Unit = promise.failure(exc)
def completed(result: V, attachment: Unit): Unit = promise.success(result)
}
}
private def runnable(f: => Unit) = new Runnable {
def run(): Unit = f
}
}
import java.nio.file.Files
import org.scalatest.concurrent.{Eventually, IntegrationPatience, ScalaFutures}
import org.scalatest.{FunSuite, Matchers}
import scala.collection.JavaConverters._
import scalacache.AnyRefBinaryCodec
import scala.concurrent.duration._
class FileCacheSpec extends FunSuite with Matchers with ScalaFutures with Eventually with IntegrationPatience{
test("remove") {
val cacheDir = Files.createTempDirectory("FileCacheSpec").toAbsolutePath
val cache = new FileCache(cacheDir.toString)
def files = Files.list(cacheDir).iterator().asScala.toList
files shouldBe empty
cache.put("a", 1, None)
cache.put("b", 2, None)
eventually(timeout(1.second)) {
files.size shouldBe 2
}
cache.remove("c").futureValue
files.size shouldBe 2
cache.remove("b").futureValue
files.size shouldBe 1
}
test("put without ttl") {
val cacheDir = Files.createTempDirectory("FileCacheSpec").toAbsolutePath
val cache = new FileCache(cacheDir.toString)
def files = Files.list(cacheDir).iterator().asScala.toList
files shouldBe empty
cache.put("someKey", 1, None).futureValue shouldBe (())
files.size shouldBe 1
}
test("put with ttl") {
val cacheDir = Files.createTempDirectory("FileCacheSpec").toAbsolutePath
val cache = new FileCache(cacheDir.toString)
def files = Files.list(cacheDir).iterator().asScala.toList
files shouldBe empty
val write = System.currentTimeMillis()
cache.put("someKey", 42, Option(300.millis)).futureValue shouldBe (())
val written = System.currentTimeMillis()
files.size shouldBe 1
eventually(timeout(1.second)){
files.size shouldBe 0
val timeLived = System.currentTimeMillis() - write
timeLived shouldBe > (300L)
timeLived shouldBe < (600L)
}
}
test("removeAll") {
val cacheDir = Files.createTempDirectory("FileCacheSpec").toAbsolutePath
val cache = new FileCache(cacheDir.toString)
def files = Files.list(cacheDir).iterator().asScala.toList
files shouldBe empty
cache.put("a", 1, None)
cache.put("b", 2, None)
cache.put("c", 3, None)
cache.put("d", 4, None)
eventually(timeout(1.second)) {
files.size shouldBe 4
}
cache.removeAll()
eventually(timeout(1.second)) {
files.size shouldBe 4
}
}
test("get") {
val cacheDir = Files.createTempDirectory("FileCacheSpec").toAbsolutePath
val cache = new FileCache(cacheDir.toString)
def files = Files.list(cacheDir).iterator().asScala.toList
cache.get[Int]("someKey").futureValue shouldBe None
cache.put("someKey", 42, None).futureValue
cache.get[Int]("someKey").futureValue shouldBe Some(42)
cache.get[String]("someKey").failed.futureValue shouldBe a [Throwable]
}
test("close") {
val cacheDir = Files.createTempDirectory("FileCacheSpec").toAbsolutePath
val cache = new FileCache(cacheDir.toString)
def files = Files.list(cacheDir).iterator().asScala.toList
//Queue up some async actions
cache.put("a", 1, None)
cache.put("b", 2, None)
cache.put("c", 3, None)
cache.put("d", 4, None)
cache.removeAll()
//Close should return after removeAll is processed
cache.close()
files shouldBe empty
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment