Skip to content

Instantly share code, notes, and snippets.

@sixhat
Last active May 29, 2018 09:00
Show Gist options
  • Save sixhat/a061e906688057030642458770af08b8 to your computer and use it in GitHub Desktop.
Save sixhat/a061e906688057030642458770af08b8 to your computer and use it in GitHub Desktop.
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Random;
/**
* This implements the process running the events.
* Run from the command line as:
* java Lamport config-file-path node-id
*
* config-file is a list of hosts, one per line, with the structure:
* id host udp-port
*
* @author "David Sousa-Rodrigues"
*/
public class Lamport {
    /** Number of events (local or send) generated by the sending thread. */
    public static final int NUM_EVENTS = 8;

    /**
     * Nested class Node represents each node in the distributed system as
     * described by one line of the config file.
     *
     * @author "David Sousa-Rodrigues"
     */
    class Node {
        private int id;
        private String host;
        private int port;

        /** Creates an empty node (id 0, empty host, port 0) to be filled by {@link #parseNodeLine}. */
        public Node() {
            this.id = 0;
            this.host = "";
            this.port = 0;
        }

        /**
         * Creates a fully specified node.
         *
         * @param id   node identifier
         * @param host host name or address of the node
         * @param port UDP port the node listens on
         */
        public Node(int id, String host, int port) {
            this.id = id;
            this.host = host;
            this.port = port;
        }

        /**
         * A simple parser for each line in the config file. The line must hold
         * exactly three whitespace-separated fields: {@code id host udp-port}.
         * Prints an error and exits with status 3 when the field count is wrong.
         *
         * @param str host description
         * @throws NumberFormatException if the id or the port is not an integer
         */
        public void parseNodeLine(String str) {
            // Split on any run of whitespace so tabs or repeated spaces are accepted.
            String[] elms = str.trim().split("\\s+");
            if (elms.length != 3) {
                System.err.println("Number of arguments is incorrect");
                System.exit(3);
            }
            this.id = Integer.parseInt(elms[0]);
            this.host = elms[1];
            this.port = Integer.parseInt(elms[2]);
        }
    }

    int myId;
    String myHost;
    int myPort;
    private HashMap<Integer, Node> nodes;
    private int lamportClock;
    private Random rng;
    private DatagramSocket udp;

    /** Creates a process whose Lamport clock starts at 0. */
    public Lamport() {
        lamportClock = 0;
        rng = new Random();
    }

    /**
     * This method increases the clock on every event. It needs to be synchronized
     * because the sender and receiver threads will be concurrently updating this
     * clock. The new value is returned so that callers report the timestamp of
     * their own event instead of re-reading the shared field, which the other
     * thread may already have advanced.
     *
     * @param remoteLC This is the remote Lamport Clock received (will be 0 if local)
     * @return the clock value assigned to this event
     */
    private synchronized int bumpLC(int remoteLC) {
        lamportClock = 1 + Math.max(lamportClock, remoteLC);
        return lamportClock;
    }

    /**
     * Creates the datagram socket to send udp packets according to what was in the
     * config file and creates the send and receive threads to perform the two
     * tasks. Blocks until both threads finish; the receive loop has no exit
     * condition.
     */
    private void setupNetworking() {
        Node self = nodes.get(myId);
        if (self == null) {
            // Without this check the lookup below fails with a bare NullPointerException.
            System.err.println("Node id " + myId + " is not present in the config file");
            System.exit(5);
            return;
        }
        try {
            udp = new DatagramSocket(self.port);
        } catch (SocketException e) {
            e.printStackTrace();
            return;
        }

        // Destinations are drawn from the ids actually configured (minus our own),
        // so ids need not be the contiguous range 0..n-1.
        final List<Integer> peerIds = new ArrayList<>();
        for (Integer candidate : nodes.keySet()) {
            if (candidate.intValue() != myId) {
                peerIds.add(candidate);
            }
        }

        Thread receiveT = new Thread(new Runnable() {
            @Override
            public void run() {
                byte[] buf = new byte[1024];
                DatagramPacket dp = new DatagramPacket(buf, buf.length);
                while (true) {
                    try {
                        udp.receive(dp);
                        String payload = new String(dp.getData(), 0, dp.getLength(), StandardCharsets.UTF_8);
                        String[] data = payload.trim().split("\\s+");
                        if (data.length < 2) {
                            System.err.println("Ignoring malformed datagram: " + payload);
                            continue;
                        }
                        int remoteID = Integer.parseInt(data[0]);
                        int remoteLC = Integer.parseInt(data[1]);
                        int stamp = bumpLC(remoteLC);
                        System.out.println(remoteID + "\t" + myId + "\t" + stamp + "\t <--");
                    } catch (NumberFormatException e) {
                        // A garbage packet must not terminate the receive loop.
                        System.err.println("Ignoring malformed datagram: " + e.getMessage());
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

        Thread sendT = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    // Wait 5 seconds to let us launch the 4 different clocks from the console.
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                for (int i = 0; i < NUM_EVENTS; i++) {
                    sendEvent();
                }
            }

            /**
             * Generates one event: bumps the clock, then either records a local
             * event or sends "id clock" to a randomly chosen peer and pauses for
             * up to 3 seconds.
             */
            private void sendEvent() {
                int stamp = bumpLC(0);
                // With no peers configured every event is necessarily local.
                if (peerIds.isEmpty() || rng.nextBoolean()) {
                    System.out.println("\t\t" + stamp);
                    return;
                }
                Node n = nodes.get(peerIds.get(rng.nextInt(peerIds.size())));
                byte[] payload = (myId + " " + stamp).getBytes(StandardCharsets.UTF_8);
                DatagramPacket toSend = new DatagramPacket(payload, payload.length);
                toSend.setPort(n.port);
                try {
                    toSend.setAddress(InetAddress.getByName(n.host));
                    udp.send(toSend);
                    System.out.println(myId + "\t" + n.id + "\t" + stamp);
                } catch (IOException e) {
                    e.printStackTrace();
                }
                try {
                    Thread.sleep(rng.nextInt(3000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        receiveT.start();
        sendT.start();
        try {
            sendT.join();
            System.out.println("Sending Thread Stopped");
            receiveT.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Preserve the interrupt status for whoever called us.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Reads the config file and fills {@code nodes} with one entry per non-blank
     * line, keyed by node id. Exits with status 4 when the file does not exist.
     *
     * @param string path of the config file
     * @throws NumberFormatException if a line holds a non-integer id or port
     */
    private void loadConfigFile(String string) {
        nodes = new HashMap<>();
        try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(string)))) {
            String line;
            // readLine() returning null is the end-of-file signal; ready() only
            // says whether a read would block.
            while ((line = br.readLine()) != null) {
                if (line.trim().isEmpty()) {
                    continue; // tolerate blank and trailing empty lines
                }
                Node n = new Node();
                n.parseNodeLine(line);
                nodes.put(n.id, n);
            }
        } catch (FileNotFoundException e) {
            System.err.println("Config File not Found");
            System.exit(4);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Entry point: {@code java Lamport config-file-path node-id}.
     * Exits with status 1 on a wrong argument count and with status 2 when a
     * number cannot be parsed.
     *
     * @param args config file path followed by this node's id
     */
    public static void main(String[] args) {
        if (args.length != 2) {
            System.err.println("Number of arguments must be 2 (config-file, self-id)");
            System.exit(1);
        }
        try {
            int id = Integer.parseInt(args[1]);
            Lamport lc = new Lamport();
            lc.myId = id;
            lc.loadConfigFile(args[0]);
            lc.setupNetworking();
        } catch (NumberFormatException e) {
            System.err.println("ID format is wrong");
            System.exit(2);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment