Skip to content

Instantly share code, notes, and snippets.

@aalexandrov
Created January 29, 2015 10:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save aalexandrov/90bf21f66bf604676f37 to your computer and use it in GitHub Desktop.
Save aalexandrov/90bf21f66bf604676f37 to your computer and use it in GitHub Desktop.
The TypeSerializerInputFormat throws the following exception: The type returned by the input format could not be automatically determined.
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.io.{TypeSerializerInputFormat, TypeSerializerOutputFormat}
import org.apache.flink.api.java.typeutils.TupleTypeInfo
import org.apache.flink.api.java.{ExecutionEnvironment, _}
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.util.Collector
/**
 * Word-count example that round-trips its intermediate result through Flink's
 * binary `TypeSerializerOutputFormat` / `TypeSerializerInputFormat`.
 *
 * The program runs two jobs on the same environment:
 *  1. tokenize two hard-coded sentences into `(word, 1)` tuples and write them
 *     to `file:///tmp/temp` in serialized form;
 *  2. read the serialized tuples back, group by word, sum the counts and write
 *     the result as CSV to `file:///tmp/output`.
 */
object SerializedFormatExample {

  /**
   * Splits each input line on `separator` and emits one `(word, 1)` tuple per token.
   *
   * @param separator delimiter handed to `String.split`, so it is interpreted as a
   *                  regular expression
   */
  class Tokenizer(val separator: String) extends RichFlatMapFunction[String, tuple.Tuple2[String, Integer]] {

    /**
     * Emits a `(word, 1)` tuple for every token of `line`.
     *
     * @param line input line to tokenize
     * @param out  collector receiving the emitted tuples
     */
    // `override` (rather than the Java `@Override` annotation) lets the Scala
    // compiler verify that this really implements the inherited method.
    override def flatMap(line: String, out: Collector[tuple.Tuple2[String, Integer]]): Unit =
      line.split(separator).foreach { word =>
        out.collect(new tuple.Tuple2[String, Integer](word, 1))
      }
  }

  /**
   * Entry point.
   *
   * @param args `args(0)` selects the environment: `"remote"` connects to
   *             `localhost:6123`, any other value runs in a local environment
   */
  def main(args: Array[String]): Unit = {
    if (args.length < 1) {
      println("Usage: <jar> [remote|local]")
      System.exit(-1)
    }

    val env =
      if (args(0) == "remote") ExecutionEnvironment.createRemoteEnvironment("localhost", 6123)
      else ExecutionEnvironment.createLocalEnvironment()

    // Job 1: tokenize the sample text and persist the tuples in serialized form.
    val text = env
      .fromElements(
        "Who's there?",
        "I think I hear them. Stand, ho! Who's there?")
      .flatMap(new Tokenizer(" "))

    text.write(new TypeSerializerOutputFormat[tuple.Tuple2[String, Integer]], "file:///tmp/temp", WriteMode.OVERWRITE)
    env.execute("tmp")

    // Job 2: read the serialized tuples back and aggregate them.
    val tmp = {
      val typeInformation = new TupleTypeInfo[tuple.Tuple2[String, Integer]](
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.INT_TYPE_INFO)

      val inFormat = new TypeSerializerInputFormat[tuple.Tuple2[String, Integer]](typeInformation.createSerializer())
      inFormat.setFilePath("file:///tmp/temp")

      // `env.readFile(inFormat, path)` asks Flink to derive the produced type from
      // the input format class, which is generic here and therefore fails with
      // "The type returned by the input format could not be automatically
      // determined". Supplying the type information explicitly avoids that lookup.
      env.createInput(inFormat, typeInformation)
    }

    val wordCounts = tmp
      .groupBy(0)
      .sum(1)

    wordCounts.writeAsCsv("file:///tmp/output", WriteMode.OVERWRITE)
    env.execute("Word Count Example")
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment