Skip to content

Instantly share code, notes, and snippets.

@s1ck
Last active November 16, 2015 12:00
Show Gist options
  • Save s1ck/b33e6a4874c15c35cd16 to your computer and use it in GitHub Desktop.
Save s1ck/b33e6a4874c15c35cd16 to your computer and use it in GitHub Desktop.
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.library.ConnectedComponents;
import org.apache.flink.types.NullValue;
import org.s1ck.ldbc.LDBCConstants;
import org.s1ck.ldbc.LDBCToFlink;
import org.s1ck.ldbc.tuples.LDBCEdge;
import org.s1ck.ldbc.tuples.LDBCVertex;
/**
 * Example job that reads an LDBC social network data set via {@link LDBCToFlink},
 * keeps only the vertices and edges carrying the labels
 * {@link LDBCConstants#VERTEX_CLASS_PERSON} and {@link LDBCConstants#EDGE_CLASS_KNOWS},
 * builds a Gelly {@link Graph} from them and runs {@link ConnectedComponents} on it.
 *
 * <p>Usage: {@code ConnectedComponentsExample [ldbcDirectory]}. When no argument is
 * given, {@link #DEFAULT_INPUT_PATH} is used.
 */
public class ConnectedComponentsExample {

  /** Data set location used when no path is passed on the command line. */
  private static final String DEFAULT_INPUT_PATH =
      "/home/s1ck/Devel/Java/ldbc_snb_datagen/social_network";

  /** Iteration limit handed to the connected components algorithm. */
  private static final int MAX_ITERATIONS = 10;

  /** Number of result vertices that are printed. */
  private static final int PRINT_LIMIT = 10;

  /**
   * Runs the example.
   *
   * @param args optional; {@code args[0]} is the directory of the LDBC data set.
   *             Falls back to {@link #DEFAULT_INPUT_PATH} when absent.
   * @throws Exception if any of the invoked Flink / LDBC operations fails
   */
  public static void main(String[] args) throws Exception {
    // Allow the data set location to be supplied by the caller instead of relying
    // on a machine-specific absolute path.
    final String inputPath =
        (args != null && args.length > 0) ? args[0] : DEFAULT_INPUT_PATH;

    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    final LDBCToFlink ldbcToFlink = new LDBCToFlink(inputPath, env);

    // filter vertices with label "Person"
    DataSet<LDBCVertex> ldbcVertices = ldbcToFlink.getVertices()
        .filter(new VertexLabelFilter(LDBCConstants.VERTEX_CLASS_PERSON));

    // filter edges with label "knows"
    DataSet<LDBCEdge> ldbcEdges = ldbcToFlink.getEdges()
        .filter(new EdgeLabelFilter(LDBCConstants.EDGE_CLASS_KNOWS));

    // create Gelly vertices suitable for connected components
    DataSet<Vertex<Long, Long>> vertices = ldbcVertices.map(new VertexInitializer());

    // create Gelly edges suitable for connected components
    DataSet<Edge<Long, NullValue>> edges = ldbcEdges.map(new EdgeInitializer());

    // create Gelly graph
    Graph<Long, Long, NullValue> g = Graph.fromDataSet(vertices, edges, env);

    // run connected components on the subgraph with an iteration limit of MAX_ITERATIONS
    DataSet<Vertex<Long, Long>> components =
        g.run(new ConnectedComponents<Long, NullValue>(MAX_ITERATIONS));

    // print the component id of the first PRINT_LIMIT vertices
    components.first(PRINT_LIMIT).print();
  }

  /** Keeps only those vertices whose label equals the configured label. */
  private static class VertexLabelFilter implements FilterFunction<LDBCVertex> {

    private final String vertexLabel;

    /**
     * @param vertexLabel label a vertex must carry to pass the filter
     */
    private VertexLabelFilter(String vertexLabel) {
      this.vertexLabel = vertexLabel;
    }

    /**
     * @param ldbcVertex vertex to test
     * @return {@code true} if the vertex label equals the configured label
     */
    @Override
    public boolean filter(LDBCVertex ldbcVertex) throws Exception {
      return ldbcVertex.getLabel().equals(vertexLabel);
    }
  }

  /** Keeps only those edges whose label equals the configured label. */
  private static class EdgeLabelFilter implements FilterFunction<LDBCEdge> {

    private final String edgeLabel;

    /**
     * @param edgeLabel label an edge must carry to pass the filter
     */
    private EdgeLabelFilter(String edgeLabel) {
      this.edgeLabel = edgeLabel;
    }

    /**
     * @param ldbcEdge edge to test
     * @return {@code true} if the edge label equals the configured label
     */
    @Override
    public boolean filter(LDBCEdge ldbcEdge) throws Exception {
      return ldbcEdge.getLabel().equals(edgeLabel);
    }
  }

  /**
   * Maps an {@link LDBCVertex} to a Gelly vertex whose id and value are both the
   * LDBC vertex id.
   *
   * <p>NOTE(review): the forwarded-field annotation presumes that tuple field
   * {@code f0} of {@link LDBCVertex} holds the vertex id - confirm against the
   * LDBCVertex definition.
   */
  @FunctionAnnotation.ForwardedFields("f0")
  private static class VertexInitializer
      implements MapFunction<LDBCVertex, Vertex<Long, Long>> {

    /**
     * @param ldbcVertex source vertex
     * @return Gelly vertex using the LDBC vertex id as both id and value
     */
    @Override
    public Vertex<Long, Long> map(LDBCVertex ldbcVertex) throws Exception {
      return new Vertex<Long, Long>(ldbcVertex.getVertexId(), ldbcVertex.getVertexId());
    }
  }

  /**
   * Maps an {@link LDBCEdge} to a Gelly edge between the same source and target
   * vertex ids, carrying no edge value.
   *
   * <p>NOTE(review): the forwarded-field annotation presumes that tuple fields
   * {@code f2} / {@code f3} of {@link LDBCEdge} hold the source / target vertex
   * id - confirm against the LDBCEdge definition.
   */
  @FunctionAnnotation.ForwardedFields("f2->f0;f3->f1")
  private static class EdgeInitializer
      implements MapFunction<LDBCEdge, Edge<Long, NullValue>> {

    /**
     * @param ldbcEdge source edge
     * @return Gelly edge from the LDBC source id to the LDBC target id with a
     *         {@link NullValue} payload
     */
    @Override
    public Edge<Long, NullValue> map(LDBCEdge ldbcEdge) throws Exception {
      return new Edge<Long, NullValue>(
          ldbcEdge.getSourceVertexId(),
          ldbcEdge.getTargetVertexId(),
          NullValue.getInstance());
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment