Last active
May 29, 2018 09:00
-
-
Save sixhat/a061e906688057030642458770af08b8 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
/**
 * One process of a small distributed system whose events are ordered with a
 * Lamport logical clock.
 *
 * <p>Run from the command line as:
 * <pre>
 *   java Lamport config-file-path node-id
 * </pre>
 *
 * <p>The config file lists one host per line with the structure:
 * <pre>
 *   id host udp-port
 * </pre>
 * Fields are separated by whitespace and blank lines are ignored.
 *
 * <p>Every event prints one tab-separated line: a local event prints only the
 * clock (third column), a send prints {@code sender receiver clock} and a
 * receive prints {@code sender receiver clock <--}.
 *
 * <p>Exit codes: 1 wrong number of arguments, 2 node id is not an integer,
 * 3 malformed config line, 4 config file not found, 5 node id missing from the
 * config file.
 *
 * @author "David Sousa-Rodrigues"
 */
public class Lamport {
    public static final int NUM_EVENTS = 8;

    /** Size in bytes of the buffer used to receive one datagram. */
    private static final int RECEIVE_BUFFER_SIZE = 1024;
    /** Pause before the first event, so the other nodes can be launched by hand. */
    private static final long STARTUP_DELAY_MS = 5000;
    /** Exclusive upper bound of the random pause that follows a send event. */
    private static final int MAX_SEND_PAUSE_MS = 3000;

    /**
     * Nested Class Node represents each node in the Distributed System.
     *
     * @author "David Sousa-Rodrigues"
     */
    class Node {
        private int id;
        private String host;
        private int port;

        /** Creates a node with id 0, an empty host and port 0. */
        public Node() {
            this(0, "", 0);
        }

        /**
         * Creates a fully described node.
         *
         * @param id   identifier of the node
         * @param host host name or address of the node
         * @param port UDP port the node listens on
         */
        public Node(int id, String host, int port) {
            this.id = id;
            this.host = host;
            this.port = port;
        }

        /**
         * A simple parser for each line in the config file. The line must hold
         * exactly three whitespace separated fields: {@code id host udp-port}.
         * Any run of spaces or tabs counts as one separator. A line with the
         * wrong number of fields, or with a non-numeric id or port, terminates
         * the process with exit code 3.
         *
         * @param str host description
         */
        public void parseNodeLine(String str) {
            String line = str.trim();
            // "\\s+" accepts tabs and repeated spaces; a single-space split
            // produced empty fields and rejected otherwise valid lines.
            String[] elms = line.split("\\s+");
            if (elms.length != 3) {
                System.err.println("Number of arguments is incorrect");
                System.exit(3);
            }
            int parsedId = 0;
            int parsedPort = 0;
            try {
                parsedId = Integer.parseInt(elms[0]);
                parsedPort = Integer.parseInt(elms[2]);
            } catch (NumberFormatException e) {
                // Report this as a config problem here; otherwise it reaches
                // main and is blamed on the node-id argument.
                System.err.println("Node id and port must be integers: " + line);
                System.exit(3);
            }
            this.id = parsedId;
            this.host = elms[1];
            this.port = parsedPort;
        }

        /**
         * Renders the node in the same layout as a config line.
         *
         * @return {@code "id host port"} separated by single spaces
         */
        @Override
        public String toString() {
            return id + " " + host + " " + port;
        }
    }

    int myId;
    String myHost;
    int myPort;
    private Map<Integer, Node> nodes;
    private int lamportClock;
    private Random rng;
    private DatagramSocket udp;

    /** Creates a process whose clock starts at 0. */
    public Lamport() {
        lamportClock = 0;
        rng = new Random();
    }

    /**
     * This method increases the clock on every event. It needs to be synchronized
     * because the sender and receiver threads will be concurrently updating this
     * clock.
     *
     * @param remoteLC This is the remote Lamport Clock received (will be 0 if local)
     * @return the clock value assigned to this event; callers must use it
     *         instead of re-reading the field, which the other thread may have
     *         advanced in the meantime
     */
    private synchronized int bumpLC(int remoteLC) {
        lamportClock = 1 + Math.max(lamportClock, remoteLC);
        return lamportClock;
    }

    /**
     * Creates the datagram socket on the port the config file assigns to this
     * node and runs the send and receive threads. Returns only after both
     * threads have ended or the calling thread is interrupted; the receive
     * thread normally runs until the process is killed.
     */
    private void setupNetworking() {
        Node self = nodes.get(myId);
        if (self == null) {
            System.err.println("Node id " + myId + " is not present in the config file");
            System.exit(5);
            return;
        }
        myHost = self.host;
        myPort = self.port;
        try {
            udp = new DatagramSocket(self.port);
        } catch (SocketException e) {
            e.printStackTrace();
            return;
        }
        Thread receiveT = new Thread(new Runnable() {
            @Override
            public void run() {
                receiveLoop();
            }
        }, "lamport-receiver");
        Thread sendT = new Thread(new Runnable() {
            @Override
            public void run() {
                sendLoop();
            }
        }, "lamport-sender");
        receiveT.start();
        sendT.start();
        try {
            sendT.join();
            System.out.println("Sending Thread Stopped");
            // Keep listening: peers may still be sending their events.
            receiveT.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // Closing the socket also makes a blocked receive fail, which
            // ends the receive loop.
            udp.close();
        }
    }

    /**
     * Receives datagrams until the socket is closed. Each well-formed datagram
     * ({@code "senderId clock"}) is a receive event that advances the clock;
     * malformed datagrams are reported and skipped.
     */
    private void receiveLoop() {
        byte[] buf = new byte[RECEIVE_BUFFER_SIZE];
        DatagramPacket dp = new DatagramPacket(buf, buf.length);
        while (true) {
            String payload = "";
            try {
                udp.receive(dp);
                payload = new String(dp.getData(), dp.getOffset(), dp.getLength(),
                        StandardCharsets.UTF_8).trim();
                String[] data = payload.split("\\s+");
                if (data.length != 2) {
                    System.err.println("Ignoring malformed packet: " + payload);
                    continue;
                }
                int remoteID = Integer.parseInt(data[0]);
                int remoteLC = Integer.parseInt(data[1]);
                int stamp = bumpLC(remoteLC);
                System.out.println(remoteID + "\t" + myId + "\t" + stamp + "\t <--");
            } catch (NumberFormatException e) {
                // A bad packet must not kill the receiver thread.
                System.err.println("Ignoring malformed packet: " + payload);
            } catch (IOException e) {
                if (udp.isClosed()) {
                    return;
                }
                e.printStackTrace();
            }
        }
    }

    /**
     * Waits for the start-up delay and then generates {@link #NUM_EVENTS}
     * events. Stops early, keeping the interrupt flag set, if the thread is
     * interrupted.
     */
    private void sendLoop() {
        // Send only to ids that exist in the config, never to this node.
        List<Integer> peers = new ArrayList<>(nodes.keySet());
        peers.remove(Integer.valueOf(myId));
        try {
            // Wait to let us launch the different clocks from the console.
            Thread.sleep(STARTUP_DELAY_MS);
            for (int i = 0; i < NUM_EVENTS; i++) {
                sendEvent(peers);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Generates one event. With probability one half, or when there is no peer
     * to talk to, the event is local and only the clock is printed. Otherwise
     * the clock is sent to a randomly chosen peer and the thread pauses for a
     * random time below {@link #MAX_SEND_PAUSE_MS} milliseconds.
     *
     * @param peers ids of every configured node except this one
     * @throws InterruptedException if interrupted during the pause
     */
    private void sendEvent(List<Integer> peers) throws InterruptedException {
        int stamp = bumpLC(0);
        if (rng.nextBoolean() || peers.isEmpty()) {
            System.out.println("\t\t" + stamp);
            return;
        }
        Node n = nodes.get(peers.get(rng.nextInt(peers.size())));
        byte[] payload = (myId + " " + stamp).getBytes(StandardCharsets.UTF_8);
        try {
            DatagramPacket toSend = new DatagramPacket(payload, payload.length,
                    InetAddress.getByName(n.host), n.port);
            udp.send(toSend);
            System.out.println(myId + "\t" + n.id + "\t" + stamp);
        } catch (IOException e) {
            e.printStackTrace();
        }
        Thread.sleep(rng.nextInt(MAX_SEND_PAUSE_MS));
    }

    /**
     * Reads the config file into the node table, one node per non-blank line.
     * Terminates the process with exit code 4 when the file does not exist.
     *
     * @param string path of the config file
     */
    private void loadConfigFile(String string) {
        nodes = new HashMap<>();
        try (BufferedReader br = new BufferedReader(
                new InputStreamReader(new FileInputStream(string), StandardCharsets.UTF_8))) {
            String line;
            // readLine() returning null is the end-of-file signal; ready()
            // only says whether a read would block.
            while ((line = br.readLine()) != null) {
                if (line.trim().isEmpty()) {
                    continue;
                }
                Node n = new Node();
                n.parseNodeLine(line);
                nodes.put(n.id, n);
            }
        } catch (FileNotFoundException e) {
            System.err.println("Config File not Found");
            System.exit(4);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Program entry point.
     *
     * @param args {@code args[0]} is the config file path and {@code args[1]}
     *             the id of this node
     */
    public static void main(String[] args) {
        if (args.length != 2) {
            System.err.println("Number of arguments must be 2 (config-file, self-id)");
            System.exit(1);
        }
        int id;
        try {
            id = Integer.parseInt(args[1]);
        } catch (NumberFormatException e) {
            System.err.println("ID format is wrong");
            System.exit(2);
            return;
        }
        Lamport lc = new Lamport();
        lc.myId = id;
        lc.loadConfigFile(args[0]);
        lc.setupNetworking();
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment