Skip to content

Instantly share code, notes, and snippets.

@tomotaka
Created November 9, 2011 08:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tomotaka/1350862 to your computer and use it in GitHub Desktop.
Save tomotaka/1350862 to your computer and use it in GitHub Desktop.
feed-subscriber model messaging
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
public class Channel<E> {
private final Map<UUID, LinkedBlockingQueue<E>> channels = new HashMap<UUID, LinkedBlockingQueue<E>>();
@SuppressWarnings("hiding")
public <E> Channel() {
}
public Feeder<E> getFeeder() {
return new Feeder<E>() {
@Override
public void feed(E obj) {
synchronized(channels) {
Iterator<UUID> keys = channels.keySet().iterator();
while (keys.hasNext()) {
LinkedBlockingQueue<E> queue = channels.get(keys.next());
try {
queue.put(obj);
} catch (InterruptedException e) {
throw new RuntimeException("interrupted");
}
}
}
}
};
}
public Subscriber<E> getSubscriber() {
final UUID uuid = UUID.randomUUID();
final LinkedBlockingQueue<E> queue = new LinkedBlockingQueue<E>();
synchronized(channels) {
channels.put(uuid, queue);
}
return new Subscriber<E>() {
@Override
public E receive() {
try {
return queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
throw new RuntimeException("interrupted");
}
}
};
}
}
public interface Subscriber<E> {
public E receive();
}
import java.util.Random;
public class ChannelTest {
public static void main(String[] args) {
final Channel<String> channel = new Channel<String>();
final Random r = new Random();
Thread add = new Thread() {
public void run() {
Feeder<String> feeder = channel.getFeeder();
int i = 0;
while (true) {
String s = "message" + Integer.toString(i);
feeder.feed(s);
System.out.println("feeded: " + s);
i++;
try {
Thread.sleep(100 * r.nextInt(20));
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
};
add.start();
Thread read1 = new Thread() {
public void run() {
Subscriber<String> sub = channel.getSubscriber();
while (true) {
String msg = sub.receive();
System.out.println("read1 received: " + msg);
}
}
};
read1.start();
Thread read2 = new Thread() {
public void run() {
Subscriber<String> sub = channel.getSubscriber();
while (true) {
String msg = sub.receive();
System.out.println("read2 received: " + msg);
}
}
};
read2.start();
}
}
public interface Feeder<E> {
public void feed(E obj);
}
public interface Subscriber<E> {
public E receive();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment