Created
July 19, 2017 13:41
-
-
Save diegofreitas/ac9ce4a7b7a598b4b0a8a009d9203372 to your computer and use it in GitHub Desktop.
[APACHE FLINK] LocalStreamEnvironment where QueryableState is available
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.flink.api.common.JobExecutionResult; | |
import org.apache.flink.configuration.*; | |
import org.apache.flink.runtime.jobgraph.JobGraph; | |
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; | |
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; | |
import org.apache.flink.streaming.api.graph.StreamGraph; | |
/** | |
* Created by diego.freitas on 7/18/17. | |
*/ | |
public class LocalQueryableStreamEnvironment extends LocalStreamEnvironment { | |
private Configuration config; | |
/** | |
* Creates a new local stream environment that configures its local executor with the given configuration. | |
* | |
* @param config The configuration used to configure the local executor. | |
*/ | |
public LocalQueryableStreamEnvironment(Configuration config) { | |
super(config); | |
this.config = config; | |
} | |
public static LocalQueryableStreamEnvironment createLocalEnvironment(int parallelism, Configuration configuration) { | |
LocalQueryableStreamEnvironment currentEnvironment = new LocalQueryableStreamEnvironment(configuration); | |
currentEnvironment.setParallelism(parallelism); | |
return currentEnvironment; | |
} | |
/** | |
* Executes the JobGraph of the on a mini cluster of CLusterUtil with a user | |
* specified name. | |
* | |
* @param jobName | |
* name of the job | |
* @return The result of the job execution, containing elapsed time and accumulators. | |
*/ | |
@Override | |
public JobExecutionResult execute(String jobName) throws Exception { | |
// transform the streaming program into a JobGraph | |
StreamGraph streamGraph = getStreamGraph(); | |
streamGraph.setJobName(jobName); | |
JobGraph jobGraph = streamGraph.getJobGraph(); | |
Configuration configuration = new Configuration(); | |
configuration.addAll(jobGraph.getJobConfiguration()); | |
configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L); | |
configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, jobGraph.getMaximumParallelism()); | |
// add (and override) the settings with what the user defined | |
configuration.addAll(this.config); | |
LocalFlinkMiniCluster exec = new LocalFlinkMiniCluster(configuration, false); | |
try { | |
exec.start(); | |
return exec.submitJobAndWait(jobGraph, getConfig().isSysoutLoggingEnabled()); | |
} | |
finally { | |
transformations.clear(); | |
exec.stop(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment