Skip to content

Instantly share code, notes, and snippets.

@DmytroMitin
Last active September 29, 2020 21:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save DmytroMitin/a23e45a546790630e838e60c7206adcd to your computer and use it in GitHub Desktop.
Save DmytroMitin/a23e45a546790630e838e60c7206adcd to your computer and use it in GitHub Desktop.
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import scala.beans.BeanProperty
/** Small Flink streaming program: maps socket text lines to [[App.Test]] records and keys them by field name. */
object App {

  /**
    * Record type emitted by [[MapFunc]] and used as the stream element type.
    *
    * The `@BeanProperty` vars expose JavaBean-style getters/setters and the
    * auxiliary constructor provides a no-argument way to build an instance.
    * Per the Flink type-system documentation these are the properties a class
    * needs to be analysed as a POJO, which is what lets `keyBy` address the
    * fields `a` and `c` by name in `main`.
    */
  case class Test(
      @BeanProperty var a: String,
      @BeanProperty var b: Int,
      @BeanProperty var c: String,
      @BeanProperty var d: Long) {

    /** No-argument constructor: reference fields start as `null`, numeric fields as zero. */
    def this() = this(null, 0, null, 0L)
  }

  /** Map function that ignores its input line and always emits the same fixed [[Test]] record. */
  class MapFunc extends RichMapFunction[String, Test] {
    override def map(value: String): Test = Test("aaa", 1, "ccc", 2L)
  }

  /**
    * Builds the pipeline `socket text -> MapFunc -> keyBy("a", "c")` and prints
    * the resulting keyed stream object. No `execute` call is made on the
    * environment here, so only the stream graph is constructed.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    // The element type must be the concrete `Test` class. When the stream is
    // typed as `Any`, Flink only has GenericType<java.lang.Object> to work
    // with and `keyBy` fails with:
    //   org.apache.flink.api.common.InvalidProgramException:
    //   This type (GenericType<java.lang.Object>) cannot be used as key.
    implicit val typeInfo: TypeInformation[Test] = TypeInformation.of(classOf[Test])

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.socketTextStream("localhost", 9999)
    println(stream.map(new MapFunc).keyBy("a", "c"))
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment