Skip to content

Instantly share code, notes, and snippets.

@andyczerwonka
Last active January 17, 2024 00:58
Show Gist options
  • Save andyczerwonka/23da899e21d3f69618360024038e4be4 to your computer and use it in GitHub Desktop.
Save andyczerwonka/23da899e21d3f69618360024038e4be4 to your computer and use it in GitHub Desktop.
import io.fury.Fury
import io.fury.config.Language
import org.junit.Test
import org.scalatest.Assertions._
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.util.Base64
import java.util.zip.GZIPInputStream
import java.util.zip.GZIPOutputStream
import scala.util.Try
import scala.util.Success
import scala.concurrent.Future
import scala.concurrent.ExecutionContext
import java.util.concurrent.ThreadLocalRandom
import com.typesafe.scalalogging.StrictLogging
import scala.concurrent.Await
import io.fury.ThreadLocalFury
import scala.annotation.nowarn
// What makes this fail is the nested collection in the case class. If you change it to
// a 1-dimensional collection, we no longer see the exception.
/** Test payload round-tripped through Fury; `data` is deliberately a nested (2-D) collection. */
final case class SampleData(label: String, data: Seq[Seq[Int]])
/**
 * Reproduction test for round-tripping [[SampleData]] through
 * Fury serialize -> gzip -> Base64 and back, both on the test thread and from a
 * `Future` running on the global execution context.
 */
@nowarn
class SerdeThreadingTest extends StrictLogging {

  /** Builder settings shared by every Fury instance in this test so their configs cannot drift. */
  private def baseFuryBuilder() =
    Fury
      .builder()
      .withLanguage(Language.JAVA)
      .requireClassRegistration(false)
      .withScalaOptimizationEnabled(true)
      .withRefTracking(true)
      .withStringCompressed(true)
      .withLongCompressed(true)
      .withIntCompressed(true)
      .withAsyncCompilation(true)

  /**
   * A [[ThreadLocalFury]] using the shared settings, bound to the class loader it is handed.
   *
   * A `lazy val` rather than a `def`: a `def` would construct a brand-new ThreadLocalFury
   * (and thus new per-thread Fury instances) on every access, defeating the thread-local cache.
   * Lazy so it is only built if a test actually uses it.
   */
  lazy val threadLocalFury: ThreadLocalFury =
    new ThreadLocalFury(classLoader => baseFuryBuilder().withClassLoader(classLoader).build())

  /** Thread-safe Fury instance used by [[encode]] and [[decode]]. */
  private val fury = baseFuryBuilder().buildThreadSafeFury()

  /**
   * Serializes `sampleData` with Fury, gzips the bytes and Base64-encodes the result.
   *
   * @param sampleData value to encode
   * @return Base64 text of the gzipped Fury payload
   */
  def encode(sampleData: SampleData): String = {
    val raw = fury.serialize(sampleData)
    val bos = new ByteArrayOutputStream(raw.length)
    val zos = new GZIPOutputStream(bos)
    // close() writes the gzip trailer, so it must run (even on failure) before reading bos.
    try zos.write(raw)
    finally zos.close()
    sleepBetween(500, 1000)
    Base64.getEncoder.encodeToString(bos.toByteArray)
  }

  /**
   * Inverse of [[encode]]: Base64-decodes, gunzips and Fury-deserializes `encoded`.
   *
   * @param encoded text produced by [[encode]]
   * @return `Success` with the decoded value, or `Failure` carrying whatever was thrown
   *         (bad Base64, bad gzip data, deserialization errors, or a ClassCastException)
   */
  def decode(encoded: String): Try[SampleData] =
    Try {
      val zis = new GZIPInputStream(new ByteArrayInputStream(Base64.getDecoder.decode(encoded)))
      // Close the stream even if reading fails; the bytes are fully in memory afterwards.
      val uncompressed =
        try zis.readAllBytes()
        finally zis.close()
      val result = fury.deserialize(uncompressed).asInstanceOf[SampleData]
      sleepBetween(500, 1000)
      result
    }

  // The threading is a red herring as this simple test fails as well
  @Test
  def testNonThreadedSerde(): Unit = {
    val data = SampleData("single sample", Seq.empty)
    val encoded = encode(data)
    val decoded = decode(encoded)
    assert(decoded == Success(data))
  }

  @Test
  def testNestedCollectionThreadedSerde(): Unit = {
    import scala.concurrent.duration._
    implicit val ec: ExecutionContext = ExecutionContext.global
    val tasks = for (i <- 1 to 1) yield Future {
      val data = SampleData(i.toString, Seq.empty)
      logger.info(s"Start encoding ${data.label}")
      val encoded = encode(data)
      logger.info(s"End encoding ${data.label}")
      encoded
    }
    val decodedFuture = for {
      f <- Future.sequence(tasks)
    } yield for {
      encoded <- f
    } yield {
      logger.info(s"Start decoding...")
      // Try.get rethrows the original failure; `val Success(x) = ...` would hide it behind a MatchError.
      val decoded = decode(encoded).get
      logger.info(s"End decoding ${decoded.label}")
      decoded
    }
    val result = Await.result(decodedFuture, 20.seconds)
    assert(result.size == 1)
  }

  /**
   * Sleeps for a random duration between `min` (inclusive) and `max` (exclusive) milliseconds.
   * Wrapped in `blocking` because it is called from futures on the global execution context.
   */
  def sleepBetween(min: Int, max: Int): Unit = {
    val sleepTime = ThreadLocalRandom.current().nextInt(min, max)
    scala.concurrent.blocking(Thread.sleep(sleepTime.toLong))
  }
}
@chaokunyang
Copy link

Could you use ThreadSafeFury instead? It seems that you are using the same Fury instance from multiple threads.

@andyczerwonka
Copy link
Author

andyczerwonka commented Jan 16, 2024

@andyczerwonka
Copy link
Author

Also note that the threading is a red herring. It fails the same way in a simple, single-threaded test. https://gist.github.com/andyczerwonka/23da899e21d3f69618360024038e4be4#file-serdethreadingtest-scala-L81-L88

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment