Created
November 9, 2011 08:41
-
-
Save tomotaka/1350862 to your computer and use it in GitHub Desktop.
feed-subscriber model messaging
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class Channel<E> { | |
private final Map<UUID, LinkedBlockingQueue<E>> channels = new HashMap<UUID, LinkedBlockingQueue<E>>(); | |
@SuppressWarnings("hiding") | |
public <E> Channel() { | |
} | |
public Feeder<E> getFeeder() { | |
return new Feeder<E>() { | |
@Override | |
public void feed(E obj) { | |
synchronized(channels) { | |
Iterator<UUID> keys = channels.keySet().iterator(); | |
while (keys.hasNext()) { | |
LinkedBlockingQueue<E> queue = channels.get(keys.next()); | |
try { | |
queue.put(obj); | |
} catch (InterruptedException e) { | |
throw new RuntimeException("interrupted"); | |
} | |
} | |
} | |
} | |
}; | |
} | |
public Subscriber<E> getSubscriber() { | |
final UUID uuid = UUID.randomUUID(); | |
final LinkedBlockingQueue<E> queue = new LinkedBlockingQueue<E>(); | |
synchronized(channels) { | |
channels.put(uuid, queue); | |
} | |
return new Subscriber<E>() { | |
@Override | |
public E receive() { | |
try { | |
return queue.take(); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
throw new RuntimeException("interrupted"); | |
} | |
} | |
}; | |
} | |
} | |
/**
 * Receiving end of a messaging channel.
 *
 * @param <E> type of the objects handed out by this subscriber
 */
public interface Subscriber<E> {
    /**
     * Returns the next object available to this subscriber.
     *
     * @return the received object
     */
    E receive();
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.Random; | |
public class ChannelTest { | |
public static void main(String[] args) { | |
final Channel<String> channel = new Channel<String>(); | |
final Random r = new Random(); | |
Thread add = new Thread() { | |
public void run() { | |
Feeder<String> feeder = channel.getFeeder(); | |
int i = 0; | |
while (true) { | |
String s = "message" + Integer.toString(i); | |
feeder.feed(s); | |
System.out.println("feeded: " + s); | |
i++; | |
try { | |
Thread.sleep(100 * r.nextInt(20)); | |
} catch (InterruptedException e) { | |
// TODO Auto-generated catch block | |
e.printStackTrace(); | |
} | |
} | |
} | |
}; | |
add.start(); | |
Thread read1 = new Thread() { | |
public void run() { | |
Subscriber<String> sub = channel.getSubscriber(); | |
while (true) { | |
String msg = sub.receive(); | |
System.out.println("read1 received: " + msg); | |
} | |
} | |
}; | |
read1.start(); | |
Thread read2 = new Thread() { | |
public void run() { | |
Subscriber<String> sub = channel.getSubscriber(); | |
while (true) { | |
String msg = sub.receive(); | |
System.out.println("read2 received: " + msg); | |
} | |
} | |
}; | |
read2.start(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Sending end of a messaging channel.
 *
 * @param <E> type of the objects accepted by this feeder
 */
public interface Feeder<E> {
    /**
     * Hands an object to this feeder for delivery.
     *
     * @param obj the object to deliver
     */
    void feed(E obj);
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Receiving end of a messaging channel.
 *
 * @param <E> type of the objects handed out by this subscriber
 */
public interface Subscriber<E> {
    /**
     * Returns the next object available to this subscriber.
     *
     * @return the received object
     */
    E receive();
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment