Created
January 29, 2015 10:56
-
-
Save aalexandrov/90bf21f66bf604676f37 to your computer and use it in GitHub Desktop.
Reading the serialized data back with `env.readFile(...)` and a `TypeSerializerInputFormat` fails with the exception: "The type returned by the input format could not be automatically determined."
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.flink.api.common.functions.RichFlatMapFunction | |
import org.apache.flink.api.common.typeinfo.BasicTypeInfo | |
import org.apache.flink.api.java.io.{TypeSerializerInputFormat, TypeSerializerOutputFormat} | |
import org.apache.flink.api.java.typeutils.TupleTypeInfo | |
import org.apache.flink.api.java.{ExecutionEnvironment, _} | |
import org.apache.flink.core.fs.FileSystem.WriteMode | |
import org.apache.flink.util.Collector | |
/**
 * Word-count example that round-trips an intermediate DataSet through Flink's binary
 * TypeSerializer file formats.
 *
 * Job 1 tokenizes a few hard-coded lines and writes the `(word, 1)` tuples to `TempPath`
 * with a [[TypeSerializerOutputFormat]]. Job 2 reads those tuples back with a
 * [[TypeSerializerInputFormat]], sums the counts per word, and writes the result as CSV to
 * `OutputPath`.
 */
object SerializedFormatExample {

  /** Location of the serialized intermediate tuples; overwritten on every run. */
  private val TempPath = "file:///tmp/temp"

  /** Location of the final CSV word counts; overwritten on every run. */
  private val OutputPath = "file:///tmp/output"

  /**
   * Splits each input line on `separator` and emits one `(word, 1)` tuple per non-empty token.
   *
   * @param separator handed to `String.split`, so it is interpreted as a regular expression
   */
  class Tokenizer(val separator: String) extends RichFlatMapFunction[String, tuple.Tuple2[String, Integer]] {

    override def flatMap(line: String, out: Collector[tuple.Tuple2[String, Integer]]): Unit =
      // Adjacent separators make split() yield "" tokens; they are not words, so skip them.
      for (word <- line.split(separator) if word.nonEmpty)
        out.collect(new tuple.Tuple2[String, Integer](word, 1))
  }

  /**
   * Runs the write job and then the read-and-count job.
   *
   * @param args `args(0)` selects the environment. `"remote"` connects to `localhost:6123`;
   *             any other value runs a local environment. With no argument, the program
   *             prints usage and exits with status -1.
   */
  def main(args: Array[String]): Unit = {
    if (args.length < 1) {
      println("Usage: <jar> [remote|local]")
      System.exit(-1)
    }

    // NOTE(review): no jar files are passed to createRemoteEnvironment, so user classes such as
    // Tokenizer must already be on the remote cluster's classpath — confirm before using "remote".
    val env =
      if (args(0) == "remote") ExecutionEnvironment.createRemoteEnvironment("localhost", 6123)
      else ExecutionEnvironment.createLocalEnvironment()

    // Job 1: tokenize and persist the (word, 1) tuples in Flink's binary serializer format.
    val words = env
      .fromElements(
        "Who's there?",
        "I think I hear them. Stand, ho! Who's there?")
      .flatMap(new Tokenizer(" "))
    words.write(new TypeSerializerOutputFormat[tuple.Tuple2[String, Integer]], TempPath, WriteMode.OVERWRITE)
    env.execute("tmp")

    // Job 2: read the tuples back. This type must match what Job 1 wrote.
    val wordType = new TupleTypeInfo[tuple.Tuple2[String, Integer]](
      BasicTypeInfo.STRING_TYPE_INFO,
      BasicTypeInfo.INT_TYPE_INFO)
    val inFormat = new TypeSerializerInputFormat[tuple.Tuple2[String, Integer]](wordType.createSerializer())
    inFormat.setFilePath(TempPath)
    // The format is built from a serializer alone, so env.readFile(...) cannot infer the produced
    // type ("The type returned by the input format could not be automatically determined").
    // createInput with an explicit TypeInformation skips that inference.
    val serializedWords = env.createInput(inFormat, wordType)

    val wordCounts = serializedWords
      .groupBy(0)
      .sum(1)
    wordCounts.writeAsCsv(OutputPath, WriteMode.OVERWRITE)
    env.execute("Word Count Example")
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment