Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save diegofreitas/ac9ce4a7b7a598b4b0a8a009d9203372 to your computer and use it in GitHub Desktop.
Save diegofreitas/ac9ce4a7b7a598b4b0a8a009d9203372 to your computer and use it in GitHub Desktop.
[APACHE FLINK] LocalStreamEnvironment where QueryableState is available
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.*;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
/**
 * A {@link LocalStreamEnvironment} whose {@link #execute(String)} runs the job on a
 * {@link LocalFlinkMiniCluster} constructed with its boolean constructor argument set to
 * {@code false}.
 *
 * <p>NOTE(review): the flag presumably selects separate actor systems instead of a single shared
 * one, which is what makes queryable state reachable in a local run -- confirm against the
 * {@code LocalFlinkMiniCluster} constructor documentation of the Flink version in use.
 *
 * <p>Created by diego.freitas on 7/18/17.
 */
public class LocalQueryableStreamEnvironment extends LocalStreamEnvironment {

    /** User-supplied settings, never {@code null}; merged last in {@link #execute(String)}. */
    private final Configuration config;

    /**
     * Creates a new local stream environment that configures its local executor with the given
     * configuration.
     *
     * @param config The configuration used to configure the local executor. A {@code null} value
     *               is stored as an empty configuration so that {@link #execute(String)} does not
     *               fail with a {@link NullPointerException} when merging the user settings.
     */
    public LocalQueryableStreamEnvironment(Configuration config) {
        super(config);
        this.config = config == null ? new Configuration() : config;
    }

    /**
     * Creates a {@link LocalQueryableStreamEnvironment} with the given default parallelism.
     *
     * @param parallelism   The parallelism set on the new environment.
     * @param configuration The configuration handed to the environment's constructor.
     * @return The newly created environment.
     */
    public static LocalQueryableStreamEnvironment createLocalEnvironment(int parallelism, Configuration configuration) {
        LocalQueryableStreamEnvironment currentEnvironment = new LocalQueryableStreamEnvironment(configuration);
        currentEnvironment.setParallelism(parallelism);
        return currentEnvironment;
    }

    /**
     * Executes the streaming program on a freshly started local mini cluster under the given job
     * name, blocks until the job finishes, and stops the cluster afterwards.
     *
     * @param jobName name of the job
     * @return The result of the job execution, containing elapsed time and accumulators.
     * @throws Exception if building, submitting or running the job fails.
     */
    @Override
    public JobExecutionResult execute(String jobName) throws Exception {
        // transform the streaming program into a JobGraph
        StreamGraph streamGraph = getStreamGraph();
        streamGraph.setJobName(jobName);
        JobGraph jobGraph = streamGraph.getJobGraph();

        // start from the job's own configuration and layer the derived defaults on top
        Configuration configuration = new Configuration();
        configuration.addAll(jobGraph.getJobConfiguration());
        // NOTE(review): -1 presumably means "let Flink size managed memory itself" -- confirm
        configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L);
        // provide as many task slots as the job's maximum parallelism
        configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());

        // add (and override) the settings with what the user defined
        configuration.addAll(this.config);

        // the 'false' flag is the point of this class; see the class-level note
        LocalFlinkMiniCluster exec = new LocalFlinkMiniCluster(configuration, false);
        try {
            exec.start();
            return exec.submitJobAndWait(jobGraph, getConfig().isSysoutLoggingEnabled());
        } finally {
            // reset the recorded transformations and always shut the mini cluster down
            transformations.clear();
            exec.stop();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment