Skip to content

Instantly share code, notes, and snippets.

@cyberterror
Created June 8, 2016 18:28
Show Gist options
  • Save cyberterror/679f1859e124cb99321e57a999ea2445 to your computer and use it in GitHub Desktop.
Save cyberterror/679f1859e124cb99321e57a999ea2445 to your computer and use it in GitHub Desktop.
public class Handler implements Runnable{
/** Count of objects is used in session Name*/
private static int count;
private int sessionCount;
/** Two variables used in communication process*/
private final SelectionKey handlerKey;
private final SocketChannel socketChannel;
/** String representation of session name and address mostly used by view*/
private final String sessionName;
private final String sessionAddress;
/** The queue of buffers to be to send*/
private BlockingQueue<ByteBuffer> outBufferBlockingQueue = new LinkedBlockingQueue<>();
private final ByteBuffer input = ByteBuffer.allocate(100);
static final int READING = 0, SENDING = 1;
int state = READING;
// GETTERS
public String getSessionName() {return sessionName;}
public String getSessionAddress() {return sessionAddress;}
public String getState() {
if (state == 0) return "READING";
else return "SENDING";
}
public BlockingQueue<ByteBuffer> getOutBufferBlockingQueue() {return outBufferBlockingQueue;}
public Handler(Selector selector, SocketChannel socketChannel) throws IOException {
this.socketChannel = socketChannel;
socketChannel.configureBlocking(false);
handlerKey = socketChannel.register(selector, 0);
handlerKey.attach(this);
handlerKey.interestOps(SelectionKey.OP_READ);
selector.wakeup();
count++;
this.sessionCount = count;
this.sessionName = "Scan Engine " + sessionCount;
this.sessionAddress = socketChannel.getRemoteAddress().toString();
}
@Override
public void run() {
/** todo Sometimes sudden disconnects and connects can brake everything. Need to debug!!!*/
if (state == READING) {
// Check connection beforehand
if (!checkConnection()){
System.out.println("Connection failed for reading!");
return;
}
try {
read();
} catch (IOException e) {
/** If client has forcibly closed the connection we need to properly close the connection and cancle key*/
System.out.println("The remote forcibly closed the connection during reading!");
try {
socketChannel.close();
handlerKey.cancel();
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
if (state == SENDING) {
// Check connection beforehand
if (!checkConnection()){
System.out.println("Connection failed for sending!");
return;
}
try {
sendProcess();
} catch (IOException e) {
/** If client has forcibly closed the connection we need to properly close the connection and cancle key*/
System.out.println("The remote forcibly closed the connection during sending!");
try {
socketChannel.close();
handlerKey.cancel();
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
}
/** For sudden disconnects*/
private boolean checkConnection() {
if (handlerKey.isValid() && socketChannel.isOpen()) return true;
else {
try {
disconnect();
} catch (IOException e) {
e.printStackTrace();
}
return false;
}
}
private void read() throws IOException{
/** todo Так как у нас принимается информация кусками надо придумать систему как соединять и парсить эти куски
* В остальном работает корректно*/
int readCount = socketChannel.read(input);
if (readCount == -1) {
// Remote entity shut the socket down cleanly. Do the
// same from our end and cancel the channel.
disconnect();
}
if (readCount >= 1) {
readProcess(readCount);
}
}
synchronized private void readProcess(int readCount) {
/** Here we get the null, part, full message or more than a message data block - bytes
* From this moment we need to proceed it to MessageReader */
input.flip();
byte[] bytes = new byte[readCount];
byte[] array = input.array();
System.arraycopy(array, 0, bytes, 0, readCount);
//System.out.println("Received " + readCount + " bytes from " + getSessionName() + "@" + getSessionAddress());
}
/** Public method for sending data*/
public void send(final ByteBuffer byteBuffer) {
try {
/** Put the data in the queue*/
outBufferBlockingQueue.put(byteBuffer);
} catch (InterruptedException e) {
System.out.println("Problems in outBufferBlockingQueue!!");
}
state = SENDING;
handlerKey.interestOps(SelectionKey.OP_WRITE);
handlerKey.selector().wakeup();
}
synchronized private void sendProcess() throws IOException {
ByteBuffer out = null;
try {
out = getOutBufferBlockingQueue().take();
} catch (InterruptedException e) {
e.printStackTrace();
}
while (out.hasRemaining()) {
socketChannel.write(out);
}
//System.out.println("Send " + out.position() + " bytes to " + getSessionName() + "@" + getSessionAddress());
handlerKey.interestOps(SelectionKey.OP_READ);
state = READING;
}
public void disconnect() throws IOException {
System.out.println("bye bye " + sessionAddress);
socketChannel.close();
handlerKey.cancel();
handlerKey.selector().wakeup();
}
@Override
public String toString() {
return sessionName + "@" + sessionAddress;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment