Skip to content

Instantly share code, notes, and snippets.

@jooyunghan
Created September 19, 2012 05:01
Show Gist options
  • Save jooyunghan/3747769 to your computer and use it in GitHub Desktop.
Save jooyunghan/3747769 to your computer and use it in GitHub Desktop.
Main.java
package com.jooyunghan.concurrency;
import static java.lang.String.format;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Immutable envelope for one integer travelling from a producer to the assembler.
 *
 * <p>Besides the payload it carries the sender's identifier and a reply queue on
 * which the receiver acknowledges the message. All fields are {@code final}: a
 * message is shared between threads and must not change after construction.
 */
class Message {
    /** Reply queue; the receiver puts a value here to acknowledge this message. */
    final BlockingQueue<Boolean> wait;

    /** Identifier of the producer that created this message. */
    final int sender;

    /** The integer payload. */
    final int data;

    /**
     * Creates a message.
     *
     * @param sender identifier of the producer sending the message
     * @param wait   reply queue the receiver uses to acknowledge the message
     * @param data   the integer payload
     */
    public Message(int sender, BlockingQueue<Boolean> wait, int data) {
        this.sender = sender;
        this.wait = wait;
        this.data = data;
    }
}
/**
 * Emits the integers {@code 1..max} in an endless cycle onto a shared queue.
 *
 * <p>After handing off each message the producer blocks until the receiver
 * acknowledges it on the producer's private reply queue, so a producer never has
 * more than one unacknowledged message in flight.
 *
 * <p>Note that {@code max} doubles as the sender identifier stored in every
 * {@link Message} this producer creates.
 */
class Producer implements Runnable {
    private final int max;
    private final BlockingQueue<Message> out;

    /**
     * @param max upper bound of the emitted cycle; also used as this producer's sender id
     * @param out queue the messages are put on
     */
    public Producer(int max, BlockingQueue<Message> out) {
        this.max = max;
        this.out = out;
    }

    /**
     * Produces messages until the running thread is interrupted. On interruption
     * the thread's interrupt status is restored and the method returns.
     */
    @Override
    public void run() {
        int i = 1;
        // One reply queue is reused for every message sent by this producer.
        BlockingQueue<Boolean> wait = new ArrayBlockingQueue<Boolean>(1);
        try {
            while (true) {
                out.put(new Message(max, wait, i));
                i++;
                if (i > max) {
                    i = 1;
                }
                // Block until the receiver acknowledges the message just sent.
                wait.take();
            }
        } catch (InterruptedException e) {
            // Interruption is the stop signal: keep the flag visible to the
            // code that owns this thread instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * Pairs up messages from the inbound queue and emits each pair as a
 * {@code "x-y"} string on the outbound queue.
 *
 * <p>Within a pair, the payload of the message whose sender id equals the
 * configured lead sender is printed first. If neither message comes from the
 * lead sender, the second message's payload is printed first.
 *
 * <p>NOTE(review): this assumes the two messages of a pair come from different
 * senders, which holds when every sender waits for an acknowledgement before
 * sending again — confirm for any new sender implementation.
 */
class Assembler implements Runnable {
    /** Lead sender id used by the two-argument constructor. */
    private static final int DEFAULT_LEAD_SENDER = 12;

    private final BlockingQueue<Message> in;
    private final BlockingQueue<String> out;
    private final int leadSender;

    /**
     * Creates an assembler whose lead sender id is 12.
     *
     * @param in  queue the messages are taken from
     * @param out queue the assembled strings are put on
     */
    public Assembler(BlockingQueue<Message> in, BlockingQueue<String> out) {
        this(in, out, DEFAULT_LEAD_SENDER);
    }

    /**
     * Creates an assembler with an explicit lead sender.
     *
     * @param in         queue the messages are taken from
     * @param out        queue the assembled strings are put on
     * @param leadSender sender id whose payload is placed first in each pair
     */
    public Assembler(BlockingQueue<Message> in, BlockingQueue<String> out, int leadSender) {
        this.in = in;
        this.out = out;
        this.leadSender = leadSender;
    }

    /**
     * Assembles pairs until the running thread is interrupted. On interruption
     * the thread's interrupt status is restored and the method returns.
     */
    @Override
    public void run() {
        try {
            while (true) {
                Message first = in.take();
                Message second = in.take();
                // Acknowledge both messages before publishing the result.
                first.wait.put(true);
                second.wait.put(true);
                if (first.sender == leadSender) {
                    out.put(format("%d-%d", first.data, second.data));
                } else {
                    out.put(format("%d-%d", second.data, first.data));
                }
            }
        } catch (InterruptedException e) {
            // Interruption is the stop signal: keep the flag visible to the
            // code that owns this thread instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * Takes strings from a queue and prints each one to standard output, prefixed
 * with a running count that starts at 1.
 */
class Consumer implements Runnable {
    private final BlockingQueue<String> in;

    /**
     * @param in queue the strings are taken from
     */
    public Consumer(BlockingQueue<String> in) {
        this.in = in;
    }

    /**
     * Prints items until the running thread is interrupted. On interruption the
     * thread's interrupt status is restored and the method returns.
     */
    @Override
    public void run() {
        int count = 0;
        try {
            while (true) {
                String x = in.take();
                count++;
                System.out.println(count + ": Read " + x);
            }
        } catch (InterruptedException e) {
            // Interruption is the stop signal: keep the flag visible to the
            // code that owns this thread instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * Demo entry point: wires two producers, one assembler and one consumer
 * together with single-slot blocking queues, lets them run for one second and
 * then stops every worker by interrupting it.
 */
public class Main {
    /**
     * Runs the pipeline for one second.
     *
     * @param args ignored
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {
        final BlockingQueue<Message> mailbox = new ArrayBlockingQueue<Message>(1);
        final BlockingQueue<String> assembled = new ArrayBlockingQueue<String>(1);
        final Runnable p1 = new Producer(12, mailbox);
        final Runnable p2 = new Producer(10, mailbox);
        final Runnable assembler = new Assembler(mailbox, assembled);
        final Runnable consumer = new Consumer(assembled);

        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            // execute() rather than submit(): no Future is kept, so a failure in
            // a task should reach the thread's uncaught-exception handler
            // instead of being captured in a discarded Future.
            pool.execute(p1);
            pool.execute(p2);
            pool.execute(assembler);
            pool.execute(consumer);
            Thread.sleep(1000);
        } finally {
            // Always interrupt the workers, even if sleep() was interrupted;
            // otherwise the non-daemon pool threads keep the JVM alive.
            pool.shutdownNow();
        }
    }
}
@jooyunghan
Copy link
Author

This code models the following:

  • two producers generate integers
  • the assembler gathers integers from the producers and sends an 'assembled' string to the consumer
  • the consumer reads strings from the assembler and prints them out

The producers share a single blocking queue, which acts as a mailbox for the assembler.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment