Skip to content

Instantly share code, notes, and snippets.

@jungomi
Last active December 7, 2015 23:37
Show Gist options
  • Save jungomi/dd63bf71e7e0e5241098 to your computer and use it in GitHub Desktop.
Save jungomi/dd63bf71e7e0e5241098 to your computer and use it in GitHub Desktop.
Project 3 DS
package coin;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.jgroups.Address;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.ReceiverAdapter;
import org.jgroups.View;
import org.jgroups.protocols.TP;
public class CoinExchanger extends ReceiverAdapter {
private static final Random rand = new Random();
private static final int DEFAULT_NUM_THREADS = 4;
private static final int DEFAULT_COINS = 3;
private static int num_threads = DEFAULT_NUM_THREADS;
private static enum ExchangerState {
STARTING, RUNNING, ENDED;
}
private JChannel channel;
private View currentView;
private Address leader;
private Address myself;
private int coins;
private ExchangerState state = ExchangerState.STARTING;
public CoinExchanger(String name, int coins) throws Exception {
channel = new JChannel();
channel.setReceiver(this);
channel.setName(name);
this.coins = coins;
}
public void viewAccepted(View view) {
synchronized (this) {
currentView = view;
leader = currentView.getMembers().get(0);
System.out.format("** New view: %s\n", currentView);
}
}
public void receive(Message msg) {
synchronized (this) {
coins++;
System.out.format("<< [%s] received coin from [%s] (Coins: %d)\n",
channel.getName(), msg.getSrc(), coins);
}
}
public void mainLoop() throws Exception {
channel.connect("CoinExchangerGroup");
TP tp = channel.getProtocolStack().getTransport();
System.out.format("Transport layer: %s, total sent: %s, total received: %s\n",
channel.getName(), tp.getNumMessagesSent(),
tp.getNumMessagesReceived());
while (state != ExchangerState.ENDED) {
sendCoin();
updateThreadState();
Thread.sleep(1000);
}
System.out.format("== [%s] total sent: %s, total received: %s\n",
channel.getName(), tp.getNumMessagesSent(),
tp.getNumMessagesReceived());
synchronized (this) {
System.out.format("== [%s] Exiting with %d coins\n", channel.getName(), coins);
}
channel.close();
}
public void sendCoin() {
List<Address> members = currentView.getMembers();
Address recipient = members.get(rand.nextInt(members.size()));
// don't send to self (just keep it!)
if (recipient.equals(channel.getAddress())) {
return;
}
Message msg = new Message(recipient, null, "Coin");
try {
channel.send(msg);
synchronized (this) {
coins--;
System.out.format(">> [%s] sent coin to [%s] (coins: %d)\n",
channel.getName(), recipient, coins);
}
} catch (Exception e) {
e.printStackTrace();
}
}
public synchronized void updateThreadState() {
if (currentView.getMembers().size() > 1) {
state = ExchangerState.RUNNING;
} else {
if (state == ExchangerState.RUNNING) {
state = ExchangerState.ENDED;
}
}
boolean hasInput = false;
try {
hasInput = (System.in.available() > 0);
} catch (IOException e) {
}
if (coins <= 0 || hasInput) {
state = ExchangerState.ENDED;
}
}
public static void main(String[] args) throws Exception {
int num_coins = DEFAULT_COINS;
if (args.length > 0) num_threads = Integer.parseInt(args[0]);
if (args.length > 1) num_coins = Integer.parseInt(args[1]);
System.setProperty("java.net.preferIPv4Stack", "true");
System.out.format("Using %d threads with %d coins\n", num_threads, num_coins);
List<Thread> threads = new ArrayList<Thread>(num_threads);
for (int i = 0; i < num_threads; i++) {
final String name = "Thread " + (i + 1);
final int coins = num_coins;
Thread t = new Thread() {
public void run() {
try {
CoinExchanger ce = new CoinExchanger(name, coins);
ce.mainLoop();
} catch (Exception e){
e.printStackTrace();
}
}
};
threads.add(t);
t.start();
}
for (Thread t : threads) {
t.join();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment