Skip to content

Instantly share code, notes, and snippets.

@DmytroMitin
Last active October 2, 2020 02:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save DmytroMitin/f2f859273075370c4687a30e0c3a2431 to your computer and use it in GitHub Desktop.
Save DmytroMitin/f2f859273075370c4687a30e0c3a2431 to your computer and use it in GitHub Desktop.
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import scala.reflect.internal.util.{AbstractFileClassLoader, BatchSourceFile}
import scala.reflect.io.{AbstractFile, VirtualDirectory}
import scala.tools.nsc.{Global, Settings}
import scala.reflect.runtime.universe
import scala.reflect.runtime.universe._
object App {
class MapFunc extends RichMapFunction[String, Any] {
@transient var directory: AbstractFile = _
/*@transient*/ var typeInfo: TypeInformation[_] = _
override def open(parameters: Configuration): Unit = {
println("open")
val code =
"""|import scala.beans.BeanProperty
|
|case class Test(
| @BeanProperty var a: String,
| @BeanProperty var b: Int,
| @BeanProperty var c: String,
| @BeanProperty var d: Long) {
| def this() = {
| this(null, 0, null, 0)
| }
|}
|
|import org.apache.flink.api.common.typeinfo.TypeInformation
|
|object Wrapper {
| def getTypeInfo: TypeInformation[Test] = TypeInformation.of(classOf[Test])
|}""".stripMargin
directory = new VirtualDirectory("(memory)", None)
compileCode(code, None, directory)
typeInfo = callObjectMethod("Wrapper", directory, "getTypeInfo").asInstanceOf[TypeInformation[_]]
}
override def map(value: String): Any = {
println("map")
val values = Seq("aaa", 1, "ccc", 2L)
createClassInstance("Test", directory, values: _*)
}
}
def main(args: Array[String]): Unit = {
val func = new MapFunc
func.open(new Configuration)
// func.map("""{a: "aaa", b: 1, c: "ccc", d: 2}""")
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.socketTextStream("localhost", 9999)
val typeInfo = func.typeInfo.asInstanceOf[TypeInformation[Any]]
println(typeInfo)//PojoType<Test, fields = [a: String, b: Integer, c: String, d: Long]>
val res = stream.map(func)(typeInfo).keyBy("a", "c").sum("b")
println(res)//org.apache.flink.streaming.api.scala.DataStream@316acbb5
}
def compileCode(code: String, classpathDirectory: Option[AbstractFile], outputDirectory: AbstractFile): Unit = {
val settings = new Settings
classpathDirectory.foreach(dir => settings.classpath.prepend(dir.toString))
settings.outputDirs.setSingleOutput(outputDirectory)
settings.usejavacp.value = true
val global = new Global(settings)
(new global.Run).compileSources(List(new BatchSourceFile("(inline)", code)))
}
def callObjectMethod(objectName: String, directory: AbstractFile, methodName: String, args: Any*): Any = {
val runtimeMirror = getRuntimeMirror(directory)
val objectSymbol = runtimeMirror.staticModule(objectName)
val objectModuleMirror = runtimeMirror.reflectModule(objectSymbol)
val objectInstance = objectModuleMirror.instance
val objectType = objectSymbol.typeSignature
val methodSymbol = objectType.decl(TermName(methodName)).asMethod
val objectInstanceMirror = runtimeMirror.reflect(objectInstance)
val methodMirror = objectInstanceMirror.reflectMethod(methodSymbol)
methodMirror(args: _*)
}
def createClassInstance(className: String, directory: AbstractFile, args: Any*): Any = {
val runtimeMirror = getRuntimeMirror(directory)
val classSymbol = runtimeMirror.staticClass(className)
val classType = classSymbol.typeSignature
val constructorSymbol = classType.decl(termNames.CONSTRUCTOR).alternatives.head.asMethod
val classMirror = runtimeMirror.reflectClass(classSymbol)
val constructorMirror = classMirror.reflectConstructor(constructorSymbol)
constructorMirror(args: _*)
}
def getRuntimeMirror(directory: AbstractFile): Mirror = {
val classLoader = new AbstractFileClassLoader(directory, this.getClass.getClassLoader)
universe.runtimeMirror(classLoader)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment