Created
January 15, 2012 01:31
-
-
Save torao/1613789 to your computer and use it in GitHub Desktop.
My Design Pattern for Asyncronous I/O
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* description code for http://d.hatena.ne.jp/kuromoyo/20120115/1326621574 | |
*/ | |
import java.io.*; | |
import java.net.*; | |
import java.nio.ByteBuffer; | |
import java.nio.channels.*; | |
import java.util.*; | |
public class AsyncIO1 { | |
public class Worker extends Thread { | |
private final Selector selector; | |
private final List<Client> queue = new ArrayList<Client>(); | |
public Worker() throws IOException { | |
selector = Selector.open(); | |
} | |
public void add(Client client){ | |
synchronized(queue){ | |
queue.add(client); | |
} | |
selector.wakeup(); | |
} | |
@Override | |
public void run(){ | |
ByteBuffer readBuffer = ByteBuffer.allocate(1024); | |
while(true){ | |
if(! select()){ | |
break; | |
} | |
Set<SelectionKey> keys = selector.selectedKeys(); | |
Iterator<SelectionKey> it = keys.iterator(); | |
while(it.hasNext()){ | |
SelectionKey key = it.next(); | |
it.remove(); | |
Client client = (Client)key.attachment(); | |
try { | |
if(key.isReadable()){ | |
client.read(readBuffer); | |
} else if(key.isWritable()){ | |
client.write(); | |
} | |
} catch(Exception ex){ | |
ex.printStackTrace(); | |
client.close(); | |
} | |
} | |
} | |
} | |
private boolean select(){ | |
try { | |
selector.select(); | |
if(this.isInterrupted()){ | |
return false; | |
} | |
synchronized(queue){ | |
while(! queue.isEmpty()){ | |
Client client = queue.remove(0); | |
SelectionKey key = client.join(selector); | |
key.attach(client); | |
} | |
} | |
return true; | |
} catch(IOException ex){ | |
ex.printStackTrace(); | |
return false; | |
} | |
} | |
} | |
public abstract class Client { | |
private final SocketChannel channel; | |
private ByteBuffer out = null; | |
private final ByteArrayOutputStream inBuffer = new ByteArrayOutputStream(); | |
private SelectionKey key = null; | |
private volatile boolean writable = false; | |
public Client(String server, int port) throws IOException{ | |
this(new InetSocketAddress(server, port)); | |
} | |
public Client(SocketAddress address) throws IOException{ | |
this(SocketChannel.open(address)); | |
} | |
public Client(SocketChannel channel) throws IOException{ | |
this.channel = channel; | |
channel.configureBlocking(false); | |
} | |
public abstract void init(); | |
public abstract String send(); | |
public abstract void receive(String line); | |
protected void setWritable(boolean ready){ | |
writable = ready; | |
if(ready){ | |
key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); | |
key.selector().wakeup(); | |
} | |
} | |
public void close(){ | |
try{ | |
key.cancel(); | |
channel.close(); | |
} catch(IOException ex){ | |
ex.printStackTrace(); | |
} | |
} | |
public SelectionKey join(Selector selector) throws IOException{ | |
key = channel.register(selector, SelectionKey.OP_READ); | |
init(); | |
return key; | |
} | |
public void write() throws IOException { | |
if(out == null){ | |
String data = send() + "\r\n"; | |
out = ByteBuffer.wrap(data.getBytes("UTF-8")); | |
} | |
channel.write(out); | |
if(out.remaining() == 0){ | |
out = null; | |
if(! writable){ | |
key.interestOps(SelectionKey.OP_READ); | |
} | |
} | |
} | |
public void read(ByteBuffer buffer) throws IOException{ | |
channel.read(buffer); | |
buffer.flip(); | |
while(buffer.remaining() > 0){ | |
byte ch = buffer.get(); | |
if(ch != '\r' && ch != '\n'){ | |
inBuffer.write(ch & 0xFF); | |
} else { | |
if(ch == '\r' && buffer.remaining() > 0 && buffer.get() != '\n'){ | |
buffer.position(buffer.position() - 1); | |
} | |
String line = inBuffer.toString("UTF-8"); | |
inBuffer.reset(); | |
receive(line); | |
} | |
} | |
buffer.clear(); | |
} | |
} | |
public AsyncIO1() throws IOException{ | |
class TimeProducer extends Client{ | |
private final int id; | |
private final Timer timer = new Timer(); | |
public TimeProducer(int id) throws IOException{ | |
super("localhost", 8976); | |
this.id = id; | |
timer.scheduleAtFixedRate(new TimerTask(){ | |
@Override | |
public void run(){ | |
setWritable(true); | |
} | |
}, 1000, 1000); | |
} | |
@Override | |
public void init(){ } | |
@Override | |
public String send() { | |
setWritable(false); | |
StringBuilder buffer = new StringBuilder(); | |
while(buffer.length() < 1024 * 1024){ | |
buffer.append("X"); | |
} | |
return buffer.toString(); | |
} | |
@Override | |
public void receive(String line) { | |
assert(line.length() == 1024 * 1024); | |
assert(line.matches("X+")); | |
System.out.printf("[%d] %d%n", id, line.length()); | |
} | |
} | |
Worker worker = new Worker(); | |
worker.start(); | |
for(int i=0; i<10; i++){ | |
worker.add(new TimeProducer(i)); | |
} | |
} | |
public static void main(String[] args) throws IOException{ | |
new AsyncIO1(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment