Last active
November 16, 2015 12:00
-
-
Save s1ck/b33e6a4874c15c35cd16 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.flink.api.common.functions.FilterFunction; | |
import org.apache.flink.api.common.functions.MapFunction; | |
import org.apache.flink.api.java.DataSet; | |
import org.apache.flink.api.java.ExecutionEnvironment; | |
import org.apache.flink.api.java.functions.FunctionAnnotation; | |
import org.apache.flink.graph.Edge; | |
import org.apache.flink.graph.Graph; | |
import org.apache.flink.graph.Vertex; | |
import org.apache.flink.graph.library.ConnectedComponents; | |
import org.apache.flink.types.NullValue; | |
import org.s1ck.ldbc.LDBCConstants; | |
import org.s1ck.ldbc.LDBCToFlink; | |
import org.s1ck.ldbc.tuples.LDBCEdge; | |
import org.s1ck.ldbc.tuples.LDBCVertex; | |
public class ConnectedComponentsExample { | |
public static void main(String[] args) throws Exception { | |
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); | |
final LDBCToFlink ldbcToFlink = new LDBCToFlink( | |
"/home/s1ck/Devel/Java/ldbc_snb_datagen/social_network", | |
env); | |
// filter vertices with label “Person” | |
DataSet<LDBCVertex> ldbcVertices = ldbcToFlink.getVertices() | |
.filter(new VertexLabelFilter(LDBCConstants.VERTEX_CLASS_PERSON)); | |
// filter edges with label “knows” | |
DataSet<LDBCEdge> ldbcEdges = ldbcToFlink.getEdges() | |
.filter(new EdgeLabelFilter(LDBCConstants.EDGE_CLASS_KNOWS)); | |
// create Gelly vertices suitable for connected components | |
DataSet<Vertex<Long, Long>> vertices = ldbcVertices.map(new VertexInitializer()); | |
// create Gelly edges suitable for connected components | |
DataSet<Edge<Long, NullValue>> edges = ldbcEdges.map(new EdgeInitializer()); | |
// create Gelly graph | |
Graph<Long, Long, NullValue> g = Graph.fromDataSet(vertices, edges, env); | |
// run connected components on the subgraph for 10 iterations | |
DataSet<Vertex<Long, Long>> components = g.run(new ConnectedComponents<Long, NullValue>(10)); | |
// print the component id of the first 10 vertices | |
components.first(10).print(); | |
} | |
private static class VertexLabelFilter implements FilterFunction<LDBCVertex> { | |
private final String vertexLabel; | |
private VertexLabelFilter(String vertexLabel) { | |
this.vertexLabel = vertexLabel; | |
} | |
public boolean filter(LDBCVertex ldbcVertex) throws Exception { | |
return ldbcVertex.getLabel().equals(vertexLabel); | |
} | |
} | |
private static class EdgeLabelFilter implements FilterFunction<LDBCEdge> { | |
private final String edgeLabel; | |
private EdgeLabelFilter(String edgeLabel) { | |
this.edgeLabel = edgeLabel; | |
} | |
public boolean filter(LDBCEdge ldbcEdge) throws Exception { | |
return ldbcEdge.getLabel().equals(edgeLabel); | |
} | |
} | |
@FunctionAnnotation.ForwardedFields("f0") | |
private static class VertexInitializer implements MapFunction<LDBCVertex, Vertex<Long, Long>> { | |
public Vertex<Long, Long> map(LDBCVertex ldbcVertex) throws | |
Exception { | |
return new Vertex<Long, Long>(ldbcVertex.getVertexId(), ldbcVertex.getVertexId()); | |
} | |
} | |
@FunctionAnnotation.ForwardedFields("f2->f0;f3->f1") | |
private static class EdgeInitializer implements MapFunction<LDBCEdge, Edge<Long, NullValue>> { | |
public Edge<Long, NullValue> map(LDBCEdge ldbcEdge) throws Exception { | |
return new Edge<Long, NullValue>(ldbcEdge.getSourceVertexId(), | |
ldbcEdge.getTargetVertexId(), NullValue.getInstance()); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment