Skip to content

Instantly share code, notes, and snippets.

@DmytroMitin
Last active September 29, 2020 21:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save DmytroMitin/e33cd244b37f9b33b67f7ac3e6609d39 to your computer and use it in GitHub Desktop.
Save DmytroMitin/e33cd244b37f9b33b67f7ac3e6609d39 to your computer and use it in GitHub Desktop.
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import scala.reflect.internal.util.{AbstractFileClassLoader, BatchSourceFile}
import scala.reflect.io.{AbstractFile, VirtualDirectory}
import scala.tools.nsc.{Global, Settings}
object App {
/**
 * Flink map function that compiles a `Test` case class at runtime (in `open`)
 * and reflectively instantiates it for every incoming record.
 *
 * The incoming string itself is ignored: each call to `map` builds a `Test`
 * from a fixed set of values.
 */
class MapFunc extends RichMapFunction[String, Any] {
  // Runtime class of the generated `Test`; assigned in `open`.
  // Transient because it is rebuilt in `open` and comes from an in-memory class loader,
  // so it should not be part of the serialized function.
  @transient var clazz: Class[_] = _

  /**
   * Compiles the `Test` / `Wrapper` sources into an in-memory directory and
   * stores the runtime class of `Test` in `clazz`.
   *
   * @param parameters Flink configuration (unused here)
   */
  override def open(parameters: Configuration): Unit = {
    val code =
      """|import scala.beans.BeanProperty
         |case class Test(
         | @BeanProperty var a: String,
         | @BeanProperty var b: Int,
         | @BeanProperty var c: String,
         | @BeanProperty var d: Long) {
         | def this() = {
         | this(null, 0, null, 0)
         | }
         |}
         |
         |object Wrapper {
         | def getClassTag(): Class[_] = scala.reflect.classTag[Test].runtimeClass
         |}""".stripMargin
    val directory = new VirtualDirectory("(memory)", None)
    compileCode(code, None, directory)
    clazz = runObjectMethod("Wrapper", directory, "getClassTag").asInstanceOf[Class[_]]
  }

  /**
   * Creates a new instance of the generated `Test` class from fixed values.
   *
   * @param value the incoming record (ignored)
   * @return a freshly constructed `Test` instance
   * @throws IllegalStateException if the generated class has no public constructor
   *                               whose arity matches the supplied values
   */
  override def map(value: String): Any = {
    // Box the primitives explicitly so the values can be passed to the reflective varargs call.
    val values: Seq[AnyRef] = Seq("aaa", Int.box(1), "ccc", Long.box(2L))
    // `Test` has two public constructors (primary and no-arg) and `getConstructors`
    // returns them in no particular order, so select by arity instead of by index.
    val constructor = clazz.getConstructors
      .find(_.getParameterTypes.length == values.length)
      .getOrElse(throw new IllegalStateException(
        s"No public constructor with ${values.length} parameters found on ${clazz.getName}"))
    constructor.newInstance(values: _*)
  }
}
/**
 * Entry point: reads text lines from a socket on localhost:9999, maps each line
 * through [[MapFunc]], keys the resulting stream by fields "a" and "c", and prints
 * the keyed stream object.
 *
 * @param args command-line arguments (unused)
 */
def main(args: Array[String]): Unit = {
  // Type information picked up implicitly by `map`, whose result type is `Any`.
  implicit val anyTypeInfo: TypeInformation[Any] = TypeInformation.of(classOf[Any])
  val keyed = StreamExecutionEnvironment.getExecutionEnvironment
    .socketTextStream("localhost", 9999)
    .map(new MapFunc)
    .keyBy("a", "c")
  println(keyed)
  //org.apache.flink.api.common.InvalidProgramException: This type (GenericType<java.lang.Object>) cannot be used as key.
}
/**
 * Compiles Scala source text with an embedded compiler instance and writes the
 * resulting class files to `outputDirectory`.
 *
 * @param code               Scala source to compile
 * @param classpathDirectory optional extra directory prepended to the compiler classpath
 * @param outputDirectory    destination for the generated class files
 * @throws IllegalArgumentException if the compiler reports errors for `code`
 */
def compileCode(code: String, classpathDirectory: Option[AbstractFile], outputDirectory: AbstractFile): Unit = {
  val settings = new Settings
  classpathDirectory.foreach(dir => settings.classpath.prepend(dir.toString))
  settings.outputDirs.setSingleOutput(outputDirectory)
  // Also make the classpath of the running JVM visible to the compiler.
  settings.usejavacp.value = true
  val global = new Global(settings)
  val run = new global.Run
  run.compileSources(List(new BatchSourceFile("(inline)", code)))
  // Fail here rather than letting a later reflective lookup fail with an unrelated
  // "not found" error when nothing was emitted.
  if (global.reporter.hasErrors)
    throw new IllegalArgumentException(
      "Compilation of inline source failed; see compiler output for details")
}
/**
 * Invokes a no-argument method on a Scala `object` whose class files live in `directory`,
 * using runtime reflection.
 *
 * @param objectName name of the object to look up
 * @param directory  directory containing the compiled class files
 * @param methodName name of the method to invoke (called with no arguments)
 * @return whatever the invoked method returns
 */
def runObjectMethod(objectName: String, directory: AbstractFile, methodName: String): Any = {
  import scala.reflect.runtime.universe
  import scala.reflect.runtime.universe._
  // Class loader that serves classes from `directory`, delegating to this object's loader.
  val loader = new AbstractFileClassLoader(directory, this.getClass.getClassLoader)
  val mirror = universe.runtimeMirror(loader)
  val module = mirror.staticModule(objectName)
  val instance = mirror.reflectModule(module).instance
  val method = module.typeSignature.decl(TermName(methodName)).asMethod
  mirror.reflect(instance).reflectMethod(method)()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment