Skip to content

Instantly share code, notes, and snippets.

What would you like to do?
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.library.ConnectedComponents;
import org.apache.flink.types.NullValue;
import org.s1ck.ldbc.LDBCConstants;
import org.s1ck.ldbc.LDBCToFlink;
import org.s1ck.ldbc.tuples.LDBCEdge;
import org.s1ck.ldbc.tuples.LDBCVertex;
public class ConnectedComponentsExample {
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
final LDBCToFlink ldbcToFlink = new LDBCToFlink(
// filter vertices with label “Person”
DataSet<LDBCVertex> ldbcVertices = ldbcToFlink.getVertices()
.filter(new VertexLabelFilter(LDBCConstants.VERTEX_CLASS_PERSON));
// filter edges with label “knows”
DataSet<LDBCEdge> ldbcEdges = ldbcToFlink.getEdges()
.filter(new EdgeLabelFilter(LDBCConstants.EDGE_CLASS_KNOWS));
// create Gelly vertices suitable for connected components
DataSet<Vertex<Long, Long>> vertices = VertexInitializer());
// create Gelly edges suitable for connected components
DataSet<Edge<Long, NullValue>> edges = EdgeInitializer());
// create Gelly graph
Graph<Long, Long, NullValue> g = Graph.fromDataSet(vertices, edges, env);
// run connected components on the subgraph for 10 iterations
DataSet<Vertex<Long, Long>> components = ConnectedComponents<Long, NullValue>(10));
// print the component id of the first 10 vertices
private static class VertexLabelFilter implements FilterFunction<LDBCVertex> {
private final String vertexLabel;
private VertexLabelFilter(String vertexLabel) {
this.vertexLabel = vertexLabel;
public boolean filter(LDBCVertex ldbcVertex) throws Exception {
return ldbcVertex.getLabel().equals(vertexLabel);
private static class EdgeLabelFilter implements FilterFunction<LDBCEdge> {
private final String edgeLabel;
private EdgeLabelFilter(String edgeLabel) {
this.edgeLabel = edgeLabel;
public boolean filter(LDBCEdge ldbcEdge) throws Exception {
return ldbcEdge.getLabel().equals(edgeLabel);
private static class VertexInitializer implements MapFunction<LDBCVertex, Vertex<Long, Long>> {
public Vertex<Long, Long> map(LDBCVertex ldbcVertex) throws
Exception {
return new Vertex<Long, Long>(ldbcVertex.getVertexId(), ldbcVertex.getVertexId());
private static class EdgeInitializer implements MapFunction<LDBCEdge, Edge<Long, NullValue>> {
public Edge<Long, NullValue> map(LDBCEdge ldbcEdge) throws Exception {
return new Edge<Long, NullValue>(ldbcEdge.getSourceVertexId(),
ldbcEdge.getTargetVertexId(), NullValue.getInstance());
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment