Skip to content

Instantly share code, notes, and snippets.

@dlutwuwei
Created May 22, 2014 13:51
Show Gist options
  • Save dlutwuwei/7acea660d02706e1c820 to your computer and use it in GitHub Desktop.
Save dlutwuwei/7acea660d02706e1c820 to your computer and use it in GitHub Desktop.
chord代码解析

使用peersim已经有一段时间了,但是对如何能够编制高效程序,使用peersim提供的接口还有很多不足,今天开始学习官网上提供的协议模拟包,希望能够学习到peersim高手的一些技巧。

CreateNw:对chord协议进行初始化,构造chord环的后继表,初始化finger表; ChordProtocol:处理FinalMessage和LookupMessage,维持chord网络拓扑结构; TrafficGenerator:随机产生查询消息,制造网络流量; MessageObserver:观察chord网络的消息量和查询成功率等;

以上是主要的五个需要配置的类,其他类主要是一些辅助类: ChordMessage:是一个接口,由FinalMessage和LookupMessage实现; Parameters:封装两个id,PID协议ID,TID传输层ID(事件驱动特用); NodeComparator:实现比较node对象以chordid的大小比较大小;

###一、配置文件讲解:

# PEERSIM CHORD

# random.seed 1234567890
simulation.endtime 10^6
simulation.logtime 10^6

simulation.experiments 1

network.size 5000
#配置传输层协议,消息的最小延迟为0,最大为100
protocol.tr UniformRandomTransport
{
     mindelay 0
     maxdelay 100
}
#配置chord协议,加上传输层协议
protocol.my ChordProtocol
{
     transport tr
}

#网络流传生成,这个contorl就是不停地查找其他节点
control.traffic TrafficGenerator
{
     protocol my
     step 100
}

#初始化chord环
init.create CreateNw  
{
     protocol my
     idLength 128
     succListSize 12
}

control.observer MessageCounterObserver
{
     protocol my
     step 90000
}
#制造网络动态性环境
control.dnet DynamicNetwork
{
add 20
add -25
minsize 3000
maxsize 7000
step 100000
init.0 ChordInitializer#节点加入网络时调用
{     
   protocol my
}
}

###二、各个类的代码分析

####createNw 下面是createNew的执行方法:

public boolean execute() { 
        for (int i = 0; i < Network.size(); i++) { 
             Node node = (Node) Network.get(i); 
             ChordProtocol cp = (ChordProtocol) node.getProtocol(pid); 
             cp.m = idLength; 
             cp.succLSize = successorLsize; 
             cp.varSuccList = 0; 
             cp.chordId = new BigInteger(idLength, CommonState.r);//id 
             cp.fingerTable = new Node[idLength]; 
             cp.successorList = new Node[successorLsize]; 
         } 
         NodeComparator nc = new NodeComparator(pid); 
         Network.sort(nc);//按照chordID排序,网络中节点的排序和chordid一致; 
         createFingerTable(); 
        return false; 
     }

这个类主要进行的就是初始化节点和chord协议的finger table

分布式哈希表的原理,大家应该知道吧,不懂地google去!successorList保存几个后继节点,对于chord不是必要的,可以防止后继节点丢失。finger table是记录网络总节点的路由表。

先看下面的方法,是在nodeOne 和 nodeTwo 之间找节点的方法,

public Node findId(BigInteger id, int nodeOne, int nodeTwo)  

通过折半查找,找到在nodeone和nodetwo之间ID==id的节点,在createFingerTable中使用, createFingerTable()初始化后继表和finger table下面是其方法体:

/** 
 * 构建finger表,初始化大小为idLength,实际使用很少一部分。 
 */ 
public void createFingerTable() { 
    BigInteger idFirst = ((ChordProtocol) Network.get(0).getProtocol(pid)).chordId; 
    BigInteger idLast = ((ChordProtocol) Network.get(Network.size() - 1).getProtocol(pid)).chordId; 

    //网络中所有节点构成一个环,从idFirst 到 idLast 
    for (int i = 0; i < Network.size(); i++) { 

        Node node = (Node) Network.get(i); 
        ChordProtocol cp = (ChordProtocol) node.getProtocol(pid); 
         
        //节点的后继表初始化为节点后的successorlsize个节点,前驱只保存一个节点 
        for (int a = 0; a < successorLsize; a++) { 
            if (a + i < (Network.size() - 1)) 
                 cp.successorList[a] = Network.get(a + i + 1); 
            else 
                 cp.successorList[a] = Network.get(a); 
         } 
        if (i > 0) 
             cp.predecessor = (Node) Network.get(i - 1); 
        else 
             cp.predecessor = (Node) Network.get(Network.size() - 1); 
        int j = 0; 
        
        /*
            fingertalbe初始化,符合chord原理,指示范围按2的指数增长。
            0|  fingertalbe[0]    |2
            3|  fingertalbe[1]    |4
            5|  fingertalbe[2]    |8
            9|  fingertalbe[3]    |16
            17| fingertalbe[4]    |32
            33| fingertalbe[5]    |64
            65| fingertalbe[6]    |128
            ......
        */
        for (j = 0; j < idLength; j++) { 
            BigInteger base; 
            if (j == 0) 
                 base = BigInteger.ONE; 
            else { 
                 base = BigInteger.valueOf(2); 
                for (int exp = 1; exp < j; exp++) { 
                     base = base.multiply(BigInteger.valueOf(2)); 
                } 
            } 
            // base = 2^j
            BigInteger pot = cp.chordId.add(base); 
            if (pot.compareTo(idLast) == 1) {//已走过一个环 
                 pot = (pot.mod(idLast)); 
                if (pot.compareTo(cp.chordId) != -1) {//超过或刚走到当前节点 
                    break; 
                } 
                if (pot.compareTo(idFirst) == -1) {//走到first节点和last节点中的一个。 
                     cp.fingerTable[j] = Network.get(Network.size() - 1); 
                    continue; 
                } 
             } 
             cp.fingerTable[j] = findId(pot, 0, Network.size() - 1); 
         } 
     } 
} 
} 

了解分布式哈希表的同学应该能看懂上面的代码,这里讲解一下: 每个节点都有一个finger table,都需要初始化, cp.fingerTable[j] = findId(pot, 0, Network.size() - 1);找到id == pot的节点,作为2^(j-1)到2^j区域的头节点。

###ChordProtocol 这个类为模拟chord协议进行主要实现过程。 下面的参数node就是运行此协议的节点。 这里的过程很多,完成了搜索消息的转发和路由修正,如果下一跳路由失踪了(下线),则执行修正。

主要执行过程为

public void processEvent(Node node, int pid, Object event) { 
    p.pid = pid; 
    currentNode = node.getIndex(); 
    if (event.getClass() == LookUpMessage.class) { 
        LookUpMessage message = (LookUpMessage) event; 
        message.increaseHopCounter(); 
        BigInteger target = message.getTarget(); 
        Transport t = (Transport) node.getProtocol(p.tid); 
        Node n = message.getSender(); 
        if (target == ((ChordProtocol) node.getProtocol(pid)).chordId) { 
            // 消息传送到目标,则返回结束消息,结束消息中含有LookUpMessage的有关信息。 
             t.send(node, n, new FinalMessage(message.getHopCounter()), pid); 
         } 
        if (target != ((ChordProtocol) node.getProtocol(pid)).chordId) { 
            // 还没有找到目标节点,消息传送到后继节点。 
             Node dest = find_successor(target);               
            if (dest.isUp() == false) { 
            // 如果下一跳的节点不存在,修正chord拓扑,直到找到下一跳。 
                do { 
                     varSuccList = 0; 
                     stabilize(node); //稳定化
                     stabilizations++; 
                     fixFingers(); //修正路由表
                     dest = find_successor(target); 
                 } while (dest.isUp() == false); 
             }             
            // 如果找到的下一跳节点已经超过了target,则表示target不存在,失败次数加1;否则继续往后寻找。 
            if (dest.getID() == successorList[0].getID() 
                     && (target.compareTo(((ChordProtocol) dest 
                             .getProtocol(p.pid)).chordId) < 0)) { 
                 fails++; 
             } else { 
                 t.send(message.getSender(), dest, message, pid); 
             } 
         } 
    } 
    //int[] tmp; 
    if (event.getClass() == FinalMessage.class) { 
         FinalMessage message = (FinalMessage) event; 
        /*
        tmp = new int[index + 1]; 
        System.arraycopy(lookupMessage, 0, tmp, 0, index) ; 
        lookupMessage = tmp;
        */ 
        lookupMessage = new int[index+1];//一个低级BUG。这样min永远为0。 
        lookupMessage[index] = message.getHopCounter(); 
        index++; 
        if(index > 3){ 
            System.out.println(); 
         } 
    } 
    //this.printFingers(); 
} 

首先判断event是FinalMessage还是LookUpMessage,执行相应的操作,但是在event.getclass()==FinalMessage.class时,作者的程序有问题,lookupMessage是一个记录每个LookUpMessage经历跳数的数组,由FinalMessage携带传回,按照作者的程序,每次只能记录最后一个消息的跳数,其他的均被清零,注释里是修改后的程序。

下面对上面chord运行方法调用的其他方法做一个注释: find_successor: 这个方法很重要功能是在finger table中寻找后继节点,寻找id的后继,通过调用closet_preciding_node来获取。

public Node find_successor(BigInteger id) { 
    try { 
        if (successorList[0] == null || successorList[0].isUp() == false) { 
             updateSuccessor(); 
         } 
        if (idInab(id, this.chordId, ((ChordProtocol) successorList[0] 
                 .getProtocol(p.pid)).chordId)) { 
            return successorList[0]; 
         } else { 
             Node tmp = closest_preceding_node(id); 
            return tmp; 
         } 
     } catch (Exception e) { 
         e.printStackTrace(); 
     } 
    return successorList[0]; 
 } 

stablize过程是后继节点失效后触发的,find_successor找到的后继失效,只能用successorlist里保存的节点试试,如果也不好使就调用updateSuccessorList()找其他保存的后继。

/** 
 * 修正mynode的后继。如果mynode后继的前驱存在:1)就是当前节点,结束,2)否则先修正自己的后继,再通知后继修正前驱。 
 * @param myNode 
 */ 
public void stabilize(Node myNode) { 
    try { 
        //获取后继的前驱 node
        Node node = ((ChordProtocol) successorList[0].getProtocol(p.pid)).predecessor;
        if (node != null) { 
            //如果后继的前驱是自己,则正常,不用修正
            if (this.chordId == ((ChordProtocol) node.getProtocol(p.pid)).chordId)
                return; 
                
            BigInteger remoteID = ((ChordProtocol) node.getProtocol(p.pid)).chordId;
            
            //如果node既不是自己,也不是后继自身,则令其为MYNODE的后继 
            if (idInab(remoteID, chordId, ((ChordProtocol) successorList[0].getProtocol(p.pid)).chordId)) 
                 successorList[0] = node; 
        
             ((ChordProtocol) successorList[0].getProtocol(p.pid)) 
                     .notify(myNode);//令mynode为后继的新的前驱 
         } 
         updateSuccessorList(); 
     } catch (Exception e1) { 
         e1.printStackTrace(); 
         updateSuccessor(); 
     } 
 } 

后继修改完了,该修正路由表了,这里只修正自己的路由表,分布式就好在这,其他节点发现后继变化也会自动修复:

/** 
 * 在stablize的基础上随正自己的路由表项,虽然不能保证路由表最新,但对提高定位效率很有意义。 
*/ 
public void fixFingers() { 
    if (next >= m - 1) 
         next = 0; 
    if (fingerTable[next] != null && fingerTable[next].isUp()) { 
         next++; 
        return; 
    } 
    
    BigInteger base; 
    if (next == 0) 
        base = BigInteger.ONE; 
    else { 
        base = BigInteger.valueOf(2); 
        for (int exp = 1; exp < next; exp++) { 
             base = base.multiply(BigInteger.valueOf(2)); 
         } 
     } 
     BigInteger pot = this.chordId.add(base); 
     BigInteger idFirst = ((ChordProtocol) Network.get(0).getProtocol(p.pid)).chordId; 
     BigInteger idLast = ((ChordProtocol) Network.get(Network.size() - 1) 
             .getProtocol(p.pid)).chordId; 

    //如果路由表项指针超过idLast,则mod(idlast);等等; 
    if (pot.compareTo(idLast) == 1) { 
         pot = (pot.mod(idLast)); 
        if (pot.compareTo(this.chordId) != -1) { 
             next++; 
            return; 
         } 
        if (pot.compareTo(idFirst) == -1) { 
            this.fingerTable[next] = Network.get(Network.size() - 1); 
             next++; 
            return; 
         } 
     } 
    do {
        //修正路由表项 
        fingerTable[next] = ((ChordProtocol) successorList[0] .getProtocol(p.pid)).find_successor(pot); 
        pot = pot.subtract(BigInteger.ONE);
        
        //指针回退一个节点,后继继续修正;
        ((ChordProtocol) successorList[0].getProtocol(p.pid)).fixFingers(); 
     } while (fingerTable[next] == null || fingerTable[next].isUp() == false); 
     next++; 
 } 

Written with StackEdit.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment