Created
September 14, 2016 13:31
-
-
Save datafibers/4b842ebc5b3c9e754ceaf78695e7567e to your computer and use it in GitHub Desktop.
Flink Remote Issues
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.myorg.quickstart; | |
import org.apache.flink.api.common.functions.MapFunction; | |
import org.apache.flink.api.common.functions.FlatMapFunction; | |
import org.apache.flink.api.java.tuple.Tuple2; | |
import org.apache.flink.streaming.api.datastream.DataStream; | |
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; | |
import org.apache.flink.util.Collector; | |
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; | |
import org.apache.flink.configuration.Configuration; | |
public class WordCount { | |
public static void main(String[] args) throws Exception { | |
final LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(new Configuration(), false); | |
cluster.start(); | |
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", 6123); | |
final long N = 10; | |
DataStream<Long> stream = env.generateSequence(1, N); | |
stream.map(new MapFunction<Long, Long>() { | |
@Override | |
public Long map(Long value) throws Exception { | |
return 2 * value; | |
} | |
}).print(); | |
env.execute("WC Test"); | |
} | |
public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> { | |
@Override | |
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { | |
// normalize and split the line | |
String[] tokens = value.toLowerCase().split("\\W+"); | |
// emit the pairs | |
for (String token : tokens) { | |
if (token.length() > 0) { | |
out.collect(new Tuple2<String, Integer>(token, 1)); | |
} | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
The above code runs successfully on my local machine (Windows + IDEA) against a local mini-cluster.
Then I commented out lines 16 and 17 and started Flink in VirtualBox with either start-local.sh or start-cluster.sh. This time, however, my program fails in IDEA: it complains that it cannot connect to the JobManager, as shown below. Can anyone help me find out why it fails? Do I need any special settings for Flink in VirtualBox? I have already forwarded the JobManager port 6123 in the VirtualBox settings. I even compiled the jar and ran it inside VirtualBox, with the same error.
09:16:00,631 INFO org.apache.flink.streaming.api.environment.RemoteStreamEnvironment - Running remotely at localhost:6123
09:16:01,220 INFO org.apache.flink.client.program.StandaloneClusterClient - Submitting job with JobID: 2e53f2e2f66f023c2f4748f7fe6d4545. Waiting for job completion.
Submitting job with JobID: 2e53f2e2f66f023c2f4748f7fe6d4545. Waiting for job completion.
09:16:01,220 INFO org.apache.flink.client.program.StandaloneClusterClient - Starting client actor system.
09:16:02,212 INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
09:16:02,295 INFO Remoting - Starting remoting
09:16:02,530 INFO Remoting - Remoting started; listening on addresses :[akka.tcp://flink@127.0.0.1:56050]
09:16:02,542 INFO org.apache.flink.runtime.client.JobClientActor - Disconnect from JobManager null.
09:16:02,550 INFO org.apache.flink.runtime.client.JobClientActor - Received job Window WordCount (2e53f2e2f66f023c2f4748f7fe6d4545).
09:16:02,550 INFO org.apache.flink.runtime.client.JobClientActor - Could not submit job Window WordCount (2e53f2e2f66f023c2f4748f7fe6d4545), because there is no connection to a JobManager.
09:16:02,624 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@127.0.0.1:6123] has failed, address is now gated for [5000] ms. Reason is: [Association failed with [akka.tcp://flink@127.0.0.1:6123]].
Process finished with exit code 1