Skip to content

Instantly share code, notes, and snippets.

@DmytroMitin
Last active September 30, 2020 10:35
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save DmytroMitin/16d312dbafeae54518f7ac2c490426b0 to your computer and use it in GitHub Desktop.
Save DmytroMitin/16d312dbafeae54518f7ac2c490426b0 to your computer and use it in GitHub Desktop.
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import scala.reflect.internal.util.{AbstractFileClassLoader, BatchSourceFile}
import scala.reflect.io.{AbstractFile, VirtualDirectory}
import scala.tools.nsc.{Global, Settings}
import scala.reflect.runtime.universe
import scala.reflect.runtime.universe._
import scala.tools.nsc.io.Path
object App {
class MapFunc extends RichMapFunction[String, Any] {
var directory: AbstractFile = _
var typeInfo: TypeInformation[_] = _
override def open(parameters: Configuration): Unit = {
println("open")
val code =
"""|import scala.beans.BeanProperty
|
|case class Test(
| @BeanProperty var a: String,
| @BeanProperty var b: Int,
| @BeanProperty var c: String,
| @BeanProperty var d: Long) {
| def this() = {
| this(null, 0, null, 0)
| }
|}
|
|import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
|
|object Wrapper {
| def getTypeInfo: TypeInformation[Test] = TypeInformation.of(classOf[Test])
| //def getTypeInfo: TypeInformation[Test] = TypeInformation.of(new TypeHint[Test] {})
|}""".stripMargin
directory = AbstractFile.getDirectory(Path("dir")) // directory must exist
// directory = new VirtualDirectory("(memory)", None)
compileCode(code, None, directory)
typeInfo = callObjectMethod("Wrapper", directory, "getTypeInfo").asInstanceOf[TypeInformation[_]]
}
override def map(value: String): Any = {
println("map")
val values = Seq("aaa", 1, "ccc", 2L)
createClassInstance("Test", directory, values: _*)
}
}
def main(args: Array[String]): Unit = {
val func = new MapFunc
func.open(new Configuration)
// func.map("")
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.socketTextStream("localhost", 9999)
val typeInfo = func.typeInfo.asInstanceOf[TypeInformation[Any]]
println(s"typeInfo=$typeInfo")//PojoType<Test, fields = [a: String, b: Integer, c: String, d: Long]>
val mapped = stream.map(func)(typeInfo) //Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: UTF-8 is not serializable. The object probably contains or references non serializable fields.
println(s"mapped=$mapped")
val keyedStream = mapped.keyBy("a", "c")
println(s"keyedStream=$keyedStream")
}
def compileCode(code: String, classpathDirectory: Option[AbstractFile], outputDirectory: AbstractFile): Unit = {
val settings = new Settings
classpathDirectory.foreach(dir => settings.classpath.prepend(dir.toString))
settings.outputDirs.setSingleOutput(outputDirectory)
settings.usejavacp.value = true
val global = new Global(settings)
(new global.Run).compileSources(List(new BatchSourceFile("(inline)", code)))
}
def callObjectMethod(objectName: String, directory: AbstractFile, methodName: String, args: Any*): Any = {
val runtimeMirror = getRuntimeMirror(directory)
val objectSymbol = runtimeMirror.staticModule(objectName)
val objectModuleMirror = runtimeMirror.reflectModule(objectSymbol)
val objectInstance = objectModuleMirror.instance
val objectType = objectSymbol.typeSignature
val methodSymbol = objectType.decl(TermName(methodName)).asMethod
val objectInstanceMirror = runtimeMirror.reflect(objectInstance)
val methodMirror = objectInstanceMirror.reflectMethod(methodSymbol)
methodMirror(args: _*)
}
def createClassInstance(className: String, directory: AbstractFile, args: Any*): Any = {
val runtimeMirror = getRuntimeMirror(directory)
val classSymbol = runtimeMirror.staticClass(className)
val classType = classSymbol.typeSignature
val constructorSymbol = classType.decl(termNames.CONSTRUCTOR).alternatives.head.asMethod
val classMirror = runtimeMirror.reflectClass(classSymbol)
val constructorMirror = classMirror.reflectConstructor(constructorSymbol)
constructorMirror(args: _*)
}
def getRuntimeMirror(directory: AbstractFile): Mirror = {
val classLoader = new AbstractFileClassLoader(directory, this.getClass.getClassLoader)
universe.runtimeMirror(classLoader)
}
}
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: UTF-8 is not serializable. The object probably contains or references non serializable fields.
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1899)
at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:189)
at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:622)
at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:640)
at pckg.App$.main(App.scala:64)
at pckg.App.main(App.scala)
Caused by: java.io.NotSerializableException: sun.nio.cs.UTF_8
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:133)
... 11 more
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment