Skip to content

Instantly share code, notes, and snippets.

@torao
Created January 15, 2012 01:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save torao/1613789 to your computer and use it in GitHub Desktop.
Save torao/1613789 to your computer and use it in GitHub Desktop.
My Design Pattern for Asynchronous I/O
/*
* Example code accompanying http://d.hatena.ne.jp/kuromoyo/20120115/1326621574
*/
import java.io.*;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.*;
public class AsyncIO1 {
/**
 * Selector thread that multiplexes non-blocking I/O for a set of {@link Client}s.
 *
 * <p>Clients are handed over from any thread through {@link #add(Client)}; the
 * actual registration with the selector happens on this thread, right after
 * the next wake-up of {@code select()}. The loop ends when the thread is
 * interrupted or when the selector itself fails.
 */
public class Worker extends Thread {
    /** Selector owned by this worker; closed when {@link #run()} returns. */
    private final Selector selector;
    /** Clients waiting to be registered; guarded by its own monitor. */
    private final List<Client> queue = new ArrayList<Client>();

    /**
     * Creates the worker and opens its selector. The thread is not started.
     *
     * @throws IOException if the selector cannot be opened
     */
    public Worker() throws IOException {
        selector = Selector.open();
    }

    /**
     * Queues a client for registration and wakes the selector so that the
     * worker thread picks it up promptly.
     *
     * @param client the client to be served by this worker
     */
    public void add(Client client){
        synchronized(queue){
            queue.add(client);
        }
        selector.wakeup();
    }

    /**
     * Event loop: waits for ready keys and dispatches read/write events to the
     * attached clients until {@code select()} reports that the loop must stop.
     */
    @Override
    public void run(){
        ByteBuffer readBuffer = ByteBuffer.allocate(1024);
        try {
            while(select()){
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while(it.hasNext()){
                    SelectionKey key = it.next();
                    it.remove();
                    dispatch(key, readBuffer);
                }
            }
        } finally {
            // The selector was opened by this worker, so release it here.
            // Channels of still-registered clients are not closed by this.
            try {
                selector.close();
            } catch(IOException ex){
                ex.printStackTrace();
            }
        }
    }

    /**
     * Handles the ready operations of one key. Any failure is logged and
     * closes only the affected client.
     */
    private void dispatch(SelectionKey key, ByteBuffer readBuffer){
        Client client = (Client)key.attachment();
        try {
            if(key.isReadable()){
                // The buffer is shared by all clients; always start from a clean state
                // even if an earlier read was aborted half-way.
                readBuffer.clear();
                client.read(readBuffer);
            }
            // Checked independently of the read branch so that a connection that is
            // constantly readable cannot starve its own pending writes. The validity
            // check covers a client that closed itself while reading.
            if(key.isValid() && key.isWritable()){
                client.write();
            }
        } catch(Exception ex){
            ex.printStackTrace();
            client.close();
        }
    }

    /**
     * Blocks until at least one key is ready or the selector is woken up, then
     * registers all queued clients.
     *
     * @return {@code false} if the thread was interrupted or the selector
     *         failed, {@code true} if the event loop should continue
     */
    private boolean select(){
        try {
            selector.select();
        } catch(IOException ex){
            ex.printStackTrace();
            return false;
        }
        if(this.isInterrupted()){
            return false;
        }
        registerPending();
        return true;
    }

    /**
     * Registers every queued client with the selector. A client whose
     * registration fails is closed and skipped; it does not stop the worker.
     */
    private void registerPending(){
        List<Client> pending;
        synchronized(queue){
            if(queue.isEmpty()){
                return;
            }
            // Copy and release the lock: join() runs client code (init()) and
            // must not block threads calling add().
            pending = new ArrayList<Client>(queue);
            queue.clear();
        }
        for(Client client : pending){
            try {
                SelectionKey key = client.join(selector);
                key.attach(client);
            } catch(Exception ex){
                ex.printStackTrace();
                try {
                    client.close();
                } catch(RuntimeException closeFailure){
                    // Closing a client that never finished registering may fail itself.
                    closeFailure.printStackTrace();
                }
            }
        }
    }
}
/**
 * One line-oriented, non-blocking connection served by a selector thread.
 *
 * <p>Outgoing lines are obtained from {@link #send()} and terminated with
 * CR LF; incoming bytes are split on CR, LF or CR LF and each line is passed
 * to {@link #receive(String)} decoded as UTF-8. {@link #read(ByteBuffer)} and
 * {@link #write()} are meant to be called from the selector thread, while
 * {@link #setWritable(boolean)} may be called from any thread.
 */
public abstract class Client {
    /** Channel to the peer, switched to non-blocking mode in the constructor. */
    private final SocketChannel channel;
    /** Bytes of the line currently being sent, or {@code null} when no send is in progress. */
    private ByteBuffer out = null;
    /** Bytes of the incoming line that has not been terminated yet. */
    private final ByteArrayOutputStream inBuffer = new ByteArrayOutputStream();
    /** Selector registration; {@code null} until {@link #join(Selector)}. Volatile because other threads read it. */
    private volatile SelectionKey key = null;
    /** Whether the subclass currently wants {@link #send()} to be called. */
    private volatile boolean writable = false;
    /** Serialises changes of the key's interest set against changes of {@link #writable}. */
    private final Object interestLock = new Object();
    /** True when the last byte seen was CR, so that a directly following LF is part of the same terminator. */
    private boolean pendingLineFeed = false;

    /**
     * Connects to the given host and port.
     *
     * @throws IOException if the connection cannot be established
     */
    public Client(String server, int port) throws IOException{
        this(new InetSocketAddress(server, port));
    }

    /**
     * Connects to the given address.
     *
     * @throws IOException if the connection cannot be established
     */
    public Client(SocketAddress address) throws IOException{
        this(SocketChannel.open(address));
    }

    /**
     * Wraps an existing channel and switches it to non-blocking mode.
     *
     * @throws IOException if the blocking mode cannot be changed
     */
    public Client(SocketChannel channel) throws IOException{
        this.channel = channel;
        channel.configureBlocking(false);
    }

    /** Called once from {@link #join(Selector)} after the channel has been registered. */
    public abstract void init();

    /** Supplies the next line to send, without line terminator. */
    public abstract String send();

    /** Receives one complete incoming line, without line terminator. */
    public abstract void receive(String line);

    /**
     * Declares whether this client has data to send. Passing {@code true}
     * enables write interest and wakes the selector; passing {@code false}
     * only records the state, write interest is dropped by {@link #write()}
     * once the current line has been sent completely.
     *
     * <p>A request made before registration is remembered and applied by
     * {@link #join(Selector)}; a request made after {@link #close()} is ignored.
     */
    protected void setWritable(boolean ready){
        synchronized(interestLock){
            writable = ready;
            SelectionKey current = key;
            if(ready && current != null && current.isValid()){
                try {
                    current.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                    current.selector().wakeup();
                } catch(CancelledKeyException ex){
                    // The client was closed concurrently; there is nothing left to wake up.
                }
            }
        }
    }

    /**
     * Cancels the selector registration, if any, and closes the channel.
     * An I/O error while closing is logged, not propagated.
     */
    public void close(){
        SelectionKey current = key;
        if(current != null){
            current.cancel();
        }
        try{
            channel.close();
        } catch(IOException ex){
            ex.printStackTrace();
        }
    }

    /**
     * Registers the channel with the selector and then calls {@link #init()}.
     *
     * @return the new selection key
     * @throws IOException if the channel is closed
     */
    public SelectionKey join(Selector selector) throws IOException{
        synchronized(interestLock){
            int ops = SelectionKey.OP_READ;
            if(writable){
                // Honour a setWritable(true) that arrived before registration.
                ops |= SelectionKey.OP_WRITE;
            }
            key = channel.register(selector, ops);
        }
        init();
        return key;
    }

    /**
     * Sends as much of the current line as the channel accepts, fetching a new
     * line from {@link #send()} if none is in progress. When the line is fully
     * sent and no further send has been requested, write interest is dropped.
     *
     * @throws IOException if writing to the channel fails
     */
    public void write() throws IOException {
        if(out == null){
            String data = send() + "\r\n";
            out = ByteBuffer.wrap(data.getBytes("UTF-8"));
        }
        channel.write(out);
        if(! out.hasRemaining()){
            out = null;
            // Same lock as setWritable(): a request arriving right now must not be
            // overwritten by the interest-set reset below.
            synchronized(interestLock){
                if(! writable){
                    key.interestOps(SelectionKey.OP_READ);
                }
            }
        }
    }

    /**
     * Reads available bytes into {@code buffer} and delivers every completed
     * line to {@link #receive(String)}. If the peer has closed the connection
     * this client is closed; an unterminated trailing line is not delivered.
     *
     * @param buffer scratch buffer, expected to be cleared on entry; it is
     *               cleared again before this method returns, even on failure
     * @throws IOException if reading from the channel fails
     */
    public void read(ByteBuffer buffer) throws IOException{
        try {
            int count = channel.read(buffer);
            if(count < 0){
                // End of stream: without closing, the key would stay readable forever.
                close();
                return;
            }
            buffer.flip();
            while(buffer.hasRemaining()){
                byte ch = buffer.get();
                if(ch == '\n' && pendingLineFeed){
                    // Second half of a CR LF pair, possibly split across two reads.
                    pendingLineFeed = false;
                    continue;
                }
                pendingLineFeed = (ch == '\r');
                if(ch != '\r' && ch != '\n'){
                    inBuffer.write(ch & 0xFF);
                } else {
                    String line = inBuffer.toString("UTF-8");
                    inBuffer.reset();
                    receive(line);
                }
            }
        } finally {
            // The buffer may be shared between clients; never leave it flipped.
            buffer.clear();
        }
    }
}
/**
 * Runs the demo: starts one {@link Worker} and attaches ten clients connected
 * to {@code localhost:8976}. Once per second each client sends a line of
 * 1 MiB of {@code X} characters and prints the length of every line it
 * receives.
 *
 * @throws IOException if the worker's selector cannot be opened or a client
 *         cannot connect; in that case the already started worker is
 *         interrupted so that it does not keep the JVM alive
 */
public AsyncIO1() throws IOException{
    // The payload never changes, so build it once and share it between all clients.
    final int payloadLength = 1024 * 1024;
    char[] filler = new char[payloadLength];
    Arrays.fill(filler, 'X');
    final String payload = new String(filler);

    /** Client that requests a send of the payload once per second. */
    class TimeProducer extends Client{
        private final int id;
        // Daemon timer: the periodic trigger alone must not keep the JVM running.
        private final Timer timer = new Timer(true);

        public TimeProducer(int id) throws IOException{
            super("localhost", 8976);
            this.id = id;
        }

        /**
         * Starts the periodic trigger. Done here rather than in the constructor
         * because init() runs after the channel has been registered, so
         * setWritable() always finds a selection key.
         */
        @Override
        public void init(){
            timer.scheduleAtFixedRate(new TimerTask(){
                @Override
                public void run(){
                    setWritable(true);
                }
            }, 1000, 1000);
        }

        /** Stops the periodic trigger before closing the connection. */
        @Override
        public void close(){
            timer.cancel();
            super.close();
        }

        /** Clears the send request and returns the payload line. */
        @Override
        public String send() {
            setWritable(false);
            return payload;
        }

        /** Checks (with assertions enabled) and reports one received line. */
        @Override
        public void receive(String line) {
            assert(line.length() == payloadLength);
            assert(line.matches("X+"));
            System.out.printf("[%d] %d%n", id, line.length());
        }
    }

    Worker worker = new Worker();
    worker.start();
    boolean allAdded = false;
    try {
        for(int i=0; i<10; i++){
            worker.add(new TimeProducer(i));
        }
        allAdded = true;
    } finally {
        if(! allAdded){
            // Construction failed half-way: stop the worker instead of leaving
            // a thread blocked in select() forever.
            worker.interrupt();
        }
    }
}
/**
 * Program entry point. Command-line arguments are ignored.
 *
 * @param args unused
 * @throws IOException if constructing the demo fails
 */
public static void main(String[] args) throws IOException{
    // Construction alone starts the demo; no reference needs to be kept.
    new AsyncIO1();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment