@wey-gu
Created March 1, 2022 07:13
Run Louvain in nebula-algorithm

Here is an example of running the Louvain algorithm; it's basically what I did in this post: https://siwei.io/nebula-livejournal/

Nebula Console:

CREATE SPACE louvain (partition_num=15, replica_factor=1, vid_type=FIXED_STRING(256)) comment="louvain test";

# wait for the new space to become ready on the storage hosts, then switch to it
:sleep 20
USE louvain;

CREATE TAG user() COMMENT = 'user';

CREATE EDGE relation(weight int) COMMENT = 'user relation';


INSERT VERTEX user() VALUES "110":(), "211":(), "312":(), "413":(), "514":(), "615":();
INSERT VERTEX user() VALUES "710":(), "811":(), "912":(), "1013":(), "1114":(), "1215":();
INSERT VERTEX user() VALUES "1310":(), "1411":(), "1512":(), "1613":(), "1714":(), "1815":();
INSERT VERTEX user() VALUES "1910":(), "2011":(), "2112":(), "2213":(), "2314":(), "2415":();
INSERT VERTEX user() VALUES "2510":(), "2611":(), "2712":(), "2813":(), "2914":(), "3015":();
INSERT VERTEX user() VALUES "3110":(), "3211":(), "3312":(), "3413":(), "3514":(), "3615":();
INSERT VERTEX user() VALUES "3710":(), "3811":(), "3912":(), "4013":(), "4114":(), "4215":();
INSERT VERTEX user() VALUES "4310":(), "4411":(), "4512":(), "4613":(), "4714":(), "4815":();
INSERT VERTEX user() VALUES "4910":(), "5011":(), "5112":(), "5213":(), "5314":(), "5415":();
INSERT VERTEX user() VALUES "5510":(), "5611":(), "5712":(), "5813":(), "5914":(), "6015":();

INSERT EDGE relation(weight) VALUES "110"->"211":(1);
INSERT EDGE relation(weight) VALUES "312"->"710":(2);
INSERT EDGE relation(weight) VALUES "811"->"912":(3);
INSERT EDGE relation(weight) VALUES "1013"->"1114":(4);
INSERT EDGE relation(weight) VALUES "1215"->"1310":(5);
INSERT EDGE relation(weight) VALUES "1411"->"1512":(6);
INSERT EDGE relation(weight) VALUES "1613"->"1714":(7);
INSERT EDGE relation(weight) VALUES "1815"->"1910":(8);
INSERT EDGE relation(weight) VALUES "2011"->"2112":(9);
INSERT EDGE relation(weight) VALUES "2213"->"2314":(10);
INSERT EDGE relation(weight) VALUES "2415"->"2510":(11);
INSERT EDGE relation(weight) VALUES "2611"->"2712":(12);
INSERT EDGE relation(weight) VALUES "2813"->"6015" :(13);
INSERT EDGE relation(weight) VALUES "2914"->"3015":(14);
INSERT EDGE relation(weight) VALUES "3110"->"3211":(15);
INSERT EDGE relation(weight) VALUES "3312"->"3413":(16);
INSERT EDGE relation(weight) VALUES "3514"->"3615":(17);
INSERT EDGE relation(weight) VALUES "3710"->"3811":(18);
INSERT EDGE relation(weight) VALUES "3912"->"4013":(19);
INSERT EDGE relation(weight) VALUES "4114"->"4215":(20);
INSERT EDGE relation(weight) VALUES "413"->"514":(21);
INSERT EDGE relation(weight) VALUES "615"->"710":(22);
INSERT EDGE relation(weight) VALUES "811"->"912":(23);
INSERT EDGE relation(weight) VALUES "1013"->"1114":(24);
INSERT EDGE relation(weight) VALUES "1215"->"1310":(25);
INSERT EDGE relation(weight) VALUES "1411"->"1512":(26);
INSERT EDGE relation(weight) VALUES "1613"->"1714":(27);
INSERT EDGE relation(weight) VALUES "1815"->"1910":(28);
INSERT EDGE relation(weight) VALUES "2011"->"2112":(29);
INSERT EDGE relation(weight) VALUES "2213"->"2314":(30);
INSERT EDGE relation(weight) VALUES "2415"->"2510":(31);
INSERT EDGE relation(weight) VALUES "2611"->"2712":(32);
INSERT EDGE relation(weight) VALUES "2813"->"2914":(33);
INSERT EDGE relation(weight) VALUES "3015"->"3110":(34);
INSERT EDGE relation(weight) VALUES "3211"->"3312":(35);
INSERT EDGE relation(weight) VALUES "3413"->"3514":(36);
INSERT EDGE relation(weight) VALUES "3615"->"3710":(37);
INSERT EDGE relation(weight) VALUES "3811"->"3912":(38);
INSERT EDGE relation(weight) VALUES "4013"->"4114":(39);
INSERT EDGE relation(weight) VALUES "4215"->"4310":(40);
INSERT EDGE relation(weight) VALUES "4411"->"4512":(41);
INSERT EDGE relation(weight) VALUES "4613"->"4714":(42);
INSERT EDGE relation(weight) VALUES "4815"->"4910":(43);
INSERT EDGE relation(weight) VALUES "5011"->"5112":(44);
INSERT EDGE relation(weight) VALUES "5213"->"5314":(45);
INSERT EDGE relation(weight) VALUES "5415"->"710":(46);
INSERT EDGE relation(weight) VALUES "811"->"912":(47);
INSERT EDGE relation(weight) VALUES "1013"->"1114":(48);
INSERT EDGE relation(weight) VALUES "1215"->"1310":(49);
INSERT EDGE relation(weight) VALUES "1411"->"1512":(50);
INSERT EDGE relation(weight) VALUES "1613"->"1714":(51);
INSERT EDGE relation(weight) VALUES "1815"->"1910":(52);
INSERT EDGE relation(weight) VALUES "2011"->"2112":(53);
INSERT EDGE relation(weight) VALUES "2213"->"2314":(54);
INSERT EDGE relation(weight) VALUES "2415"->"2510":(55);
INSERT EDGE relation(weight) VALUES "2611"->"2712":(56);
INSERT EDGE relation(weight) VALUES "2813"->"2914":(57);
INSERT EDGE relation(weight) VALUES "3015"->"3110":(58);
INSERT EDGE relation(weight) VALUES "3211"->"3312":(59);
INSERT EDGE relation(weight) VALUES "3413"->"3514":(60);
INSERT EDGE relation(weight) VALUES "3615"->"3710":(61);
INSERT EDGE relation(weight) VALUES "3811"->"3912":(62);
INSERT EDGE relation(weight) VALUES "4013"->"4114":(63);
INSERT EDGE relation(weight) VALUES "4215"->"5510":(64);
INSERT EDGE relation(weight) VALUES "5611"->"5712":(65);
INSERT EDGE relation(weight) VALUES "5813"->"5914":(66);


Create the algorithm environment in Docker:

cd ~
mkdir -p test/nebula-algorithm
cd test/nebula-algorithm

# attach the Spark container to the same network as the nebula-docker-compose deployment
docker run --name spark-master --network nebula-docker-compose_nebula-net \
    -h spark-master -e ENABLE_INIT_DAEMON=false -d \
    -v ${HOME}/test/nebula-algorithm/:/root \
    bde2020/spark-master:2.4.5-hadoop2.7

wget https://repo1.maven.org/maven2/com/vesoft/nebula-algorithm/2.6.2/nebula-algorithm-2.6.2.jar
vim algo-louvain.conf
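
The metaAddress values in the config below must be reachable from the Spark container, and the metad IPs differ per deployment. One way to look them up, assuming the default nebula-docker-compose network name:

docker network inspect nebula-docker-compose_nebula-net \
    | grep -E '"Name"|"IPv4Address"'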

docker exec -it spark-master bash
cd /root

/spark/bin/spark-submit --master "local" --conf spark.rpc.askTimeout=6000s \
    --class com.vesoft.nebula.algorithm.Main \
    --driver-memory 16g nebula-algorithm-2.6.2.jar \
    -p algo-louvain.conf
...

22/01/10 10:34:05 INFO TaskSetManager: Starting task 0.0 in stage 796.0 (TID 1000, localhost, executor driver, partition 0, ANY, 7767 bytes)
22/01/10 10:34:05 INFO Executor: Running task 0.0 in stage 796.0 (TID 1000)
22/01/10 10:34:05 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks including 2 local blocks and 0 remote blocks
22/01/10 10:34:05 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
22/01/10 10:34:05 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
22/01/10 10:34:05 INFO SQLHadoopMapReduceCommitProtocol: Using output committer class org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
22/01/10 10:34:05 INFO FileOutputCommitter: Saved output of task 'attempt_20220110103405_0796_m_000000_1000' to file:/output/_temporary/0/task_20220110103405_0796_m_000000
22/01/10 10:34:05 INFO SparkHadoopMapRedUtil: attempt_20220110103405_0796_m_000000_1000: Committed
22/01/10 10:34:05 INFO Executor: Finished task 0.0 in stage 796.0 (TID 1000). 1654 bytes result sent to driver
22/01/10 10:34:05 INFO TaskSetManager: Finished task 0.0 in stage 796.0 (TID 1000) in 120 ms on localhost (executor driver) (1/1)
22/01/10 10:34:05 INFO TaskSchedulerImpl: Removed TaskSet 796.0, whose tasks have all completed, from pool
22/01/10 10:34:05 INFO DAGScheduler: ResultStage 796 (csv at AlgoWriter.scala:53) finished in 0.147 s
22/01/10 10:34:05 INFO DAGScheduler: Job 22 finished: csv at AlgoWriter.scala:53, took 0.309399 s
22/01/10 10:34:05 INFO FileFormatWriter: Write Job 658734e4-ce53-4ca8-92cd-6d7f9421fc54 committed.
22/01/10 10:34:05 INFO FileFormatWriter: Finished processing stats for write job 658734e4-ce53-4ca8-92cd-6d7f9421fc54.
22/01/10 10:34:05 INFO SparkContext: Invoking stop() from shutdown hook
22/01/10 10:34:05 INFO SparkUI: Stopped Spark web UI at http://spark-master:4040
22/01/10 10:34:06 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
22/01/10 10:34:06 INFO MemoryStore: MemoryStore cleared
22/01/10 10:34:06 INFO BlockManager: BlockManager stopped
22/01/10 10:34:06 INFO BlockManagerMaster: BlockManagerMaster stopped
22/01/10 10:34:06 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
22/01/10 10:34:06 INFO SparkContext: Successfully stopped SparkContext
22/01/10 10:34:06 INFO ShutdownHookManager: Shutdown hook called
22/01/10 10:34:06 INFO ShutdownHookManager: Deleting directory /tmp/spark-0bdbb853-cbc8-4876-827c-937e1748f6a9
22/01/10 10:34:06 INFO ShutdownHookManager: Deleting directory /tmp/spark-0ab58035-9c32-4b2e-a649-4a34d67a4d06

bash-5.0# ls -l /output/
total 4
-rw-r--r--    1 root     root             0 Jan 10 10:34 _SUCCESS
-rw-r--r--    1 root     root           192 Jan 10 10:34 part-00000-01c6be1f-b8be-4e68-b708-156bab8ec2b6-c000.csv
bash-5.0# head /output/part-00000-01c6be1f-b8be-4e68-b708-156bab8ec2b6-c000.csv
_id,louvain
6015,2813
2914,2813
2813,2813
3514,3015
3413,3015
3312,3015
4114,3015
3211,3015
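
Each row maps a vertex id (_id) to the community it was assigned (louvain). For a quick count of community sizes straight from the CSV (plain awk, nothing nebula-algorithm specific):

bash-5.0# awk -F, 'NR>1 {n[$2]++} END {for (c in n) print c, n[c]}' /output/part-*.csv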

Reference:

algo-louvain.conf:

{
  # Spark related config
  spark: {
    app: {
        name: louvain
        # spark.app.partitionNum
        partitionNum:10
    }
    master:local
  }

  data: {
    # data source. options: nebula, csv, json
    source: nebula
    # data sink; the algorithm result will be written into this sink. options: nebula, csv, text
    sink: csv
    # whether the algorithm uses edge weight
    hasWeight: true
  }

  # Nebula Graph related config
  nebula: {
    # the algorithm's data source from Nebula. This nebula.read config takes effect only when data.source is nebula.
    read: {
        # Nebula metad server address; separate multiple addresses with commas
        metaAddress: "172.20.0.3:9559"
        # Nebula space
        space: louvain
        # Nebula edge types; with multiple labels, data from those edge types is unioned together
        labels: ["relation"]
        # Nebula edge property name for each edge type; this property is used as the weight column for the algorithm.
        # Make sure the weightCols correspond to the labels.
        weightCols: ["weight"]
    }

    # the algorithm result sink into Nebula. This nebula.write config takes effect only when data.sink is nebula.
    write:{
        # Nebula graphd server address; separate multiple addresses with commas
        graphAddress: "graphd:9669"
        # Nebula metad server address; separate multiple addresses with commas
        metaAddress: "172.20.0.3:9559,172.20.0.4:9559,172.20.0.2:9559"
        user:root
        pswd:nebula
        # Nebula space name
        space:algo
        # Nebula tag name; the algorithm result will be written into this tag
        tag:louvain
        type:insert
    }
  }

  local: {
    # the algorithm's data source from a local file. This local.read config takes effect only when data.source is csv or json.
    read:{
        filePath: "hdfs://10.1.1.168:9000/edge/work_for.csv"
        # srcId column
        srcId:"_c0"
        # dstId column
        dstId:"_c1"
        # weight column
        #weight: "col3"
        # whether the csv file has a header
        header: false
        # csv file's delimiter
        delimiter:","
    }

    # the algorithm result sink into a local file. This local.write config takes effect only when data.sink is csv or text.
    write:{
        resultPath:/output/
    }
  }


  algorithm: {
    # the algorithm that you are going to execute, pick one from [pagerank, louvain, connectedcomponent,
    # labelpropagation, shortestpaths, degreestatic, kcore, stronglyconnectedcomponent, trianglecount,
    # betweenness]
    executeAlgo: louvain

    # Louvain parameters
    louvain: {
        maxIter: 20
        internalIter: 10
        tol: 0.5
    }

    # ConnectedComponent parameters
    connectedcomponent: {
        maxIter: 20
    }

    # LabelPropagation parameters
    labelpropagation: {
        maxIter: 20
    }

    # ShortestPaths parameters
    shortestpaths: {
        # landmark vertices from which the shortest paths to all vertices are computed
        landmarks: "1"
    }

    # Vertex degree statistics parameters
    degreestatic: {}

    # KCore parameters
    kcore: {
        maxIter: 10
        degree: 1
    }

    # TriangleCount parameters
    trianglecount: {}

    # Betweenness centrality parameters
    betweenness: {
        maxIter: 5
    }
  }
}
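
Note that the nebula.write section is not exercised in this run (data.sink is csv). If you switch the sink to nebula, the target space and tag must exist in advance; roughly like below, assuming the result lands in a string property named after the algorithm (double-check the property name against the nebula-algorithm docs for your version):

CREATE SPACE algo (partition_num=15, replica_factor=1, vid_type=FIXED_STRING(256));
:sleep 20
USE algo;
CREATE TAG louvain(louvain string);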