Skip to content

Instantly share code, notes, and snippets.

@andyczerwonka
Last active January 17, 2024 00:58
Show Gist options
  • Save andyczerwonka/23da899e21d3f69618360024038e4be4 to your computer and use it in GitHub Desktop.
Save andyczerwonka/23da899e21d3f69618360024038e4be4 to your computer and use it in GitHub Desktop.
import io.fury.Fury
import io.fury.config.Language
import org.junit.Test
import org.scalatest.Assertions._
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.util.Base64
import java.util.zip.GZIPInputStream
import java.util.zip.GZIPOutputStream
import scala.util.Try
import scala.util.Success
import scala.concurrent.Future
import scala.concurrent.ExecutionContext
import java.util.concurrent.ThreadLocalRandom
import com.typesafe.scalalogging.StrictLogging
import scala.concurrent.Await
import io.fury.ThreadLocalFury
import scala.annotation.nowarn
/** Payload used to reproduce the serialization failure.
  *
  * Per the author's observation, the failure is triggered by the nested collection in
  * `data`; switching it to a one-dimensional collection makes the exception go away.
  *
  * @param label human-readable identifier of the sample
  * @param data  nested (two-level) collection of integers
  */
case class SampleData(
  label: String,
  data: Seq[Seq[Int]]
)
/** Reproduction tests for a Fury serialization failure on `SampleData`.
  *
  * Values are round-tripped as: Fury serialize -> GZIP -> Base64 string, and back.
  * Both `encode` and `decode` sleep for a random 500-1000 ms after doing their work.
  */
@nowarn
class SerdeThreadingTest extends StrictLogging {

  /** Builds a `ThreadLocalFury` configured with the same options as `fury`, plus the
    * class loader supplied by the factory callback.
    *
    * This is a `def`, so every call constructs a new `ThreadLocalFury`. It is not used
    * by the tests in this class.
    */
  def threadLocalFury: ThreadLocalFury =
    new ThreadLocalFury(classLoader => {
      Fury
        .builder()
        .withLanguage(Language.JAVA)
        .requireClassRegistration(false)
        .withScalaOptimizationEnabled(true)
        .withRefTracking(true)
        .withStringCompressed(true)
        .withLongCompressed(true)
        .withIntCompressed(true)
        .withAsyncCompilation(true)
        .withClassLoader(classLoader)
        .build()
    })

  // Shared serializer used by every test; obtained from `buildThreadSafeFury()`.
  private val fury = Fury
    .builder()
    .withLanguage(Language.JAVA)
    .requireClassRegistration(false)
    .withScalaOptimizationEnabled(true)
    .withRefTracking(true)
    .withStringCompressed(true)
    .withLongCompressed(true)
    .withIntCompressed(true)
    .withAsyncCompilation(true)
    .buildThreadSafeFury()

  /** Serializes `sampleData` with Fury, GZIP-compresses the bytes and Base64-encodes them.
    *
    * @param sampleData value to encode
    * @return Base64 text of the GZIP-compressed Fury payload
    */
  def encode(sampleData: SampleData): String = {
    val raw = fury.serialize(sampleData)
    val bos = new ByteArrayOutputStream(raw.length)
    val zos = new GZIPOutputStream(bos)
    // close() writes the remaining compressed data and closes `bos` as well; doing it in
    // `finally` guarantees the stream is released even when write() throws.
    try zos.write(raw)
    finally zos.close()
    sleepBetween(500, 1000)
    // ByteArrayOutputStream keeps its buffer after close(), so reading it here is safe.
    Base64.getEncoder.encodeToString(bos.toByteArray)
  }

  /** Reverses `encode`: Base64-decode, GZIP-decompress, then Fury-deserialize.
    *
    * @param encoded Base64 text as produced by `encode`
    * @return `Success` with the decoded value, or `Failure` holding the non-fatal
    *         exception raised by any step
    */
  def decode(encoded: String): Try[SampleData] =
    Try {
      val zis = new GZIPInputStream(
        new ByteArrayInputStream(Base64.getDecoder.decode(encoded))
      )
      val result =
        try fury.deserialize(zis.readAllBytes()).asInstanceOf[SampleData]
        finally zis.close() // also closes the wrapped ByteArrayInputStream
      sleepBetween(500, 1000)
      result
    }

  // The threading is a red herring as this simple test fails as well
  @Test
  def testNonThreadedSerde(): Unit = {
    val data = SampleData("single sample", Seq.empty)
    val encoded = encode(data)
    val decoded = decode(encoded)
    assert(decoded == Success(data))
  }

  /** Encodes on the global execution context, decodes once all encodings are done, and
    * waits up to 20 seconds for the overall result.
    */
  @Test
  def testNestedCollectionThreadedSerde(): Unit = {
    import scala.concurrent.duration._
    implicit val ec: ExecutionContext = ExecutionContext.global

    // Single source of truth for the number of tasks and the expected result size.
    val taskCount = 1

    val tasks = for (i <- 1 to taskCount) yield Future {
      val data = SampleData(i.toString, Seq.empty)
      logger.info(s"Start encoding ${data.label}")
      val encoded = encode(data)
      logger.info(s"End encoding ${data.label}")
      encoded
    }

    val decodedFuture = Future.sequence(tasks).map { encodedAll =>
      encodedAll.map { encoded =>
        logger.info("Start decoding...")
        // `.get` rethrows the original decoding exception; a refutable
        // `val Success(x) = ...` pattern would replace it with a MatchError.
        val decoded = decode(encoded).get
        logger.info(s"End decoding ${decoded.label}")
        decoded
      }
    }

    val result = Await.result(decodedFuture, 20.seconds)
    assert(result.size == taskCount)
  }

  /** Blocks the current thread for a random number of milliseconds in `[min, max)`.
    *
    * @param min inclusive lower bound, in milliseconds
    * @param max exclusive upper bound, in milliseconds
    */
  def sleepBetween(min: Int, max: Int): Unit = {
    val sleepTime = ThreadLocalRandom.current().nextInt(min, max)
    Thread.sleep(sleepTime.toLong)
  }
}
@andyczerwonka
Copy link
Author

This test generates the following error (which is the same as the original error described here):

Exception in thread "fury-jit-compiler-14" java.lang.RuntimeException: Create sequential serializer failed, 
class: class io.citrine.mithril.SampleData
	at io.fury.serializer.CodegenSerializer.loadCodegenSerializer(CodegenSerializer.java:51)
CodegenSerializer.java:51
	at io.fury.resolver.ClassResolver.lambda$getObjectSerializerClass$2(ClassResolver.java:966)
ClassResolver.java:966
	at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:131)
	at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:75)
	at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:82)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
ThreadPoolExecutor.java:1128
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
ThreadPoolExecutor.java:628
	at java.base/java.lang.Thread.run(Thread.java:829)
Thread.java:829
Caused by: java.lang.IllegalArgumentException: Expected AbstractCollectionSerializer but got io.fury.serializer.Serializer
	at io.fury.util.Preconditions.checkArgument(Preconditions.java:81)
Preconditions.java:81
	at io.fury.builder.BaseObjectCodecBuilder.deserializeForCollection(BaseObjectCodecBuilder.java:1210)
BaseObjectCodecBuilder.java:1210
	at io.fury.builder.BaseObjectCodecBuilder.deserializeForNotNull(BaseObjectCodecBuilder.java:1162)
BaseObjectCodecBuilder.java:1162
	at io.fury.builder.BaseObjectCodecBuilder.lambda$readContainerElement$12(BaseObjectCodecBuilder.java:1400)
BaseObjectCodecBuilder.java:1400
	at io.fury.builder.BaseObjectCodecBuilder.readRef(BaseObjectCodecBuilder.java:1093)
BaseObjectCodecBuilder.java:1093
	at io.fury.builder.BaseObjectCodecBuilder.readContainerElement(BaseObjectCodecBuilder.java:1397)
BaseObjectCodecBuilder.java:1397
	at io.fury.builder.BaseObjectCodecBuilder.lambda$readContainerElements$2223955c$1(BaseObjectCodecBuilder.java:1359)
BaseObjectCodecBuilder.java:1359
	at io.fury.codegen.Expression$ForLoop.doGenCode(Expression.java:2366)
Expression.java:2366
	at io.fury.codegen.Expression.genCode(Expression.java:105)
Expression.java:105
	at io.fury.codegen.Expression$ListExpression.doGenCode(Expression.java:184)
Expression.java:184
	at io.fury.codegen.Expression.genCode(Expression.java:105)
Expression.java:105
	at io.fury.codegen.ExpressionOptimizer.invokeGenerated(ExpressionOptimizer.java:123)
ExpressionOptimizer.java:123
	at io.fury.codegen.ExpressionOptimizer.invokeGenerated(ExpressionOptimizer.java:71)
ExpressionOptimizer.java:71
	at io.fury.builder.BaseObjectCodecBuilder.readCollectionCodegen(BaseObjectCodecBuilder.java:1297)
BaseObjectCodecBuilder.java:1297
	at io.fury.builder.BaseObjectCodecBuilder.deserializeForCollection(BaseObjectCodecBuilder.java:1220)
BaseObjectCodecBuilder.java:1220
	at io.fury.builder.BaseObjectCodecBuilder.deserializeForNotNull(BaseObjectCodecBuilder.java:1162)
BaseObjectCodecBuilder.java:1162
	at io.fury.builder.BaseObjectCodecBuilder.deserializeForNotNull(BaseObjectCodecBuilder.java:1122)
BaseObjectCodecBuilder.java:1122
	at io.fury.builder.BaseObjectCodecBuilder.lambda$deserializeFor$7(BaseObjectCodecBuilder.java:1073)
BaseObjectCodecBuilder.java:1073
	at io.fury.builder.BaseObjectCodecBuilder.readRef(BaseObjectCodecBuilder.java:1093)
BaseObjectCodecBuilder.java:1093
	at io.fury.builder.BaseObjectCodecBuilder.deserializeFor(BaseObjectCodecBuilder.java:1073)
BaseObjectCodecBuilder.java:1073
	at io.fury.builder.BaseObjectCodecBuilder.deserializeFor(BaseObjectCodecBuilder.java:1057)
BaseObjectCodecBuilder.java:1057
	at io.fury.builder.ObjectCodecBuilder.lambda$deserializeGroup$37fcf467$1(ObjectCodecBuilder.java:525)
ObjectCodecBuilder.java:525
	at io.fury.builder.ObjectCodecBuilder.deserializeGroup(ObjectCodecBuilder.java:539)
ObjectCodecBuilder.java:539
	at io.fury.builder.ObjectCodecBuilder.buildDecodeExpression(ObjectCodecBuilder.java:442)
ObjectCodecBuilder.java:442
	at io.fury.builder.BaseObjectCodecBuilder.genCode(BaseObjectCodecBuilder.java:208)
BaseObjectCodecBuilder.java:208
	at io.fury.codegen.CompileUnit.getCode(CompileUnit.java:57)
CompileUnit.java:57
	at io.fury.codegen.JaninoUtils.toBytecode(JaninoUtils.java:75)
JaninoUtils.java:75
	at io.fury.codegen.JaninoUtils.toBytecode(JaninoUtils.java:67)
JaninoUtils.java:67
	at io.fury.codegen.CodeGenerator.compile(CodeGenerator.java:147)
CodeGenerator.java:147
	at io.fury.builder.CodecUtils.loadOrGenCodecClass(CodecUtils.java:94)
CodecUtils.java:94
	at io.fury.builder.CodecUtils.loadOrGenObjectCodecClass(CodecUtils.java:45)
CodecUtils.java:45
	at io.fury.serializer.CodegenSerializer.loadCodegenSerializer(CodegenSerializer.java:48)
CodegenSerializer.java:48
	... 7 more

@chaokunyang
Copy link

Could you use ThreadSafeFury instead? It seems that you are using the same Fury instance from multiple threads.

@andyczerwonka
Copy link
Author

andyczerwonka commented Jan 16, 2024

@andyczerwonka
Copy link
Author

Also note, the threading is a red herring. It fails the same way in a simple, single-threaded test. https://gist.github.com/andyczerwonka/23da899e21d3f69618360024038e4be4#file-serdethreadingtest-scala-L81-L88

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment