Created
June 8, 2016 18:28
-
-
Save cyberterror/679f1859e124cb99321e57a999ea2445 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Handles one non-blocking {@link SocketChannel}: reads incoming bytes and writes
 * buffers queued through {@link #send(ByteBuffer)}.
 *
 * <p>The constructor registers the channel with the given selector and attaches this
 * handler to the resulting key. {@link #run()} performs one read or one send pass
 * depending on the current state; it is presumably invoked by the loop that dispatches
 * the selector's ready keys (that loop is not part of this class).
 *
 * <p>Threading: {@link #send(ByteBuffer)} may be called from a thread other than the one
 * calling {@link #run()}. The switch between the reading and sending states is done
 * while holding this object's monitor so that the two sides cannot overwrite each other.
 */
public class Handler implements Runnable {

    /** Count of created handlers; used to build the session name. */
    private static int count;

    /** Possible values of {@link #state}. */
    static final int READING = 0, SENDING = 1;

    /** Sequence number of this handler (1 for the first handler created). */
    private final int sessionCount;

    /** Two variables used in the communication process. */
    private final SelectionKey handlerKey;
    private final SocketChannel socketChannel;

    /** String representation of session name and address, mostly used by the view. */
    private final String sessionName;
    private final String sessionAddress;

    /** The queue of buffers waiting to be sent. */
    private final BlockingQueue<ByteBuffer> outBufferBlockingQueue = new LinkedBlockingQueue<>();

    /** Buffer taken from the queue that has not been completely written yet, or null. */
    private ByteBuffer pendingOut;

    /** Reusable buffer for incoming data. */
    private final ByteBuffer input = ByteBuffer.allocate(100);

    /** Current mode; volatile because send() and run() may execute on different threads. */
    volatile int state = READING;

    // GETTERS

    /** @return the session name, "Scan Engine N" where N is this handler's sequence number */
    public String getSessionName() {
        return sessionName;
    }

    /** @return the remote address of the channel as captured at construction time */
    public String getSessionAddress() {
        return sessionAddress;
    }

    /** @return "READING" while in the reading state, otherwise "SENDING" */
    public String getState() {
        return state == READING ? "READING" : "SENDING";
    }

    /** @return the live queue of outgoing buffers (not a copy) */
    public BlockingQueue<ByteBuffer> getOutBufferBlockingQueue() {
        return outBufferBlockingQueue;
    }

    /**
     * Switches the channel to non-blocking mode, registers it with {@code selector} for
     * read events and attaches this handler to the selection key.
     *
     * @param selector      selector the channel is registered with
     * @param socketChannel connected channel to serve
     * @throws IOException if configuring the channel or querying its remote address fails
     */
    public Handler(Selector selector, SocketChannel socketChannel) throws IOException {
        this.socketChannel = socketChannel;
        socketChannel.configureBlocking(false);
        this.sessionCount = nextSessionNumber();
        this.sessionName = "Scan Engine " + sessionCount;
        // getRemoteAddress() returns null for an unconnected channel; valueOf avoids an NPE.
        this.sessionAddress = String.valueOf(socketChannel.getRemoteAddress());
        // Register and attach last so that the handler is not reachable through the key
        // before its fields are assigned.
        handlerKey = socketChannel.register(selector, 0);
        handlerKey.attach(this);
        handlerKey.interestOps(SelectionKey.OP_READ);
        selector.wakeup();
    }

    /** Returns the next handler sequence number; synchronized so concurrent creation cannot lose an increment. */
    private static synchronized int nextSessionNumber() {
        return ++count;
    }

    /**
     * Performs one pass of I/O: a read when in the reading state, then a send when in the
     * sending state. If the connection is no longer usable the handler disconnects instead.
     */
    @Override
    public void run() {
        // TODO Sometimes sudden disconnects and connects can break everything. Need to debug!
        if (state == READING) {
            // Check connection beforehand
            if (!checkConnection()) {
                System.out.println("Connection failed for reading!");
                return;
            }
            try {
                read();
            } catch (IOException e) {
                // The client forcibly closed the connection: close the channel and cancel the key.
                System.out.println("The remote forcibly closed the connection during reading!");
                closeAfterFailure();
                return;
            }
        }
        if (state == SENDING) {
            // Check connection beforehand
            if (!checkConnection()) {
                System.out.println("Connection failed for sending!");
                return;
            }
            try {
                sendProcess();
            } catch (IOException e) {
                // The client forcibly closed the connection: close the channel and cancel the key.
                System.out.println("The remote forcibly closed the connection during sending!");
                closeAfterFailure();
            }
        }
    }

    /** Closes the channel after an I/O failure; the key is cancelled even if close() throws. */
    private void closeAfterFailure() {
        try {
            socketChannel.close();
        } catch (IOException e1) {
            e1.printStackTrace();
        } finally {
            handlerKey.cancel();
        }
    }

    /**
     * For sudden disconnects.
     *
     * @return true if the key is valid and the channel is open; otherwise disconnects and
     *         returns false
     */
    private boolean checkConnection() {
        if (handlerKey.isValid() && socketChannel.isOpen()) {
            return true;
        }
        try {
            disconnect();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * Reads whatever is currently available from the channel into the input buffer.
     *
     * @throws IOException if the read or the disconnect on end-of-stream fails
     */
    private void read() throws IOException {
        // TODO Data arrives in chunks, so a scheme for joining and parsing those chunks
        // still has to be designed. Apart from that this works correctly.
        int readCount = socketChannel.read(input);
        if (readCount == -1) {
            // Remote entity shut the socket down cleanly. Do the
            // same from our end and cancel the channel.
            disconnect();
        }
        if (readCount >= 1) {
            readProcess(readCount);
        }
    }

    /**
     * Copies the bytes just read out of the input buffer and resets the buffer.
     *
     * @param readCount number of bytes the preceding read placed into the buffer
     */
    private synchronized void readProcess(int readCount) {
        // Here we get a partial message, a full message or more than one message.
        // From this point the bytes need to be handed to a MessageReader (not implemented yet).
        input.flip();
        byte[] bytes = new byte[readCount];
        input.get(bytes);
        // Without clear() the limit set by flip() would stay in place and every later
        // read would be capped at the smallest read count seen so far.
        input.clear();
        //System.out.println("Received " + readCount + " bytes from " + getSessionName() + "@" + getSessionAddress());
    }

    /**
     * Public method for sending data: queues the buffer and switches the handler to the
     * sending state so that the next {@link #run()} writes it.
     *
     * <p>If the calling thread is interrupted while queueing, the buffer is not queued,
     * the interrupt flag is restored and the state is left unchanged. As before,
     * {@code interestOps} throws {@code CancelledKeyException} if the key was cancelled.
     *
     * @param byteBuffer data to send, positioned at the first byte to write
     */
    public void send(final ByteBuffer byteBuffer) {
        try {
            // Put the data in the queue
            outBufferBlockingQueue.put(byteBuffer);
        } catch (InterruptedException e) {
            System.out.println("Problems in outBufferBlockingQueue!!");
            Thread.currentThread().interrupt();
            // Nothing was queued, so there is nothing to switch to SENDING for.
            return;
        }
        // Same monitor as sendProcess(): the switch back to READING there cannot
        // overwrite this request once the buffer is in the queue.
        synchronized (this) {
            state = SENDING;
            handlerKey.interestOps(SelectionKey.OP_WRITE);
        }
        handlerKey.selector().wakeup();
    }

    /**
     * Writes queued buffers to the channel. Returns to the reading state only when every
     * queued buffer has been written completely; if the channel accepts only part of a
     * buffer the handler stays in the sending state and continues on the next call.
     *
     * @throws IOException if writing to the channel fails
     */
    private synchronized void sendProcess() throws IOException {
        while (true) {
            if (pendingOut == null) {
                // poll() instead of take(): never block the calling thread on an empty queue.
                pendingOut = outBufferBlockingQueue.poll();
                if (pendingOut == null) {
                    break;
                }
            }
            socketChannel.write(pendingOut);
            if (pendingOut.hasRemaining()) {
                // The channel took only part of the buffer; keep the write interest and
                // resume on the next run() instead of spinning here.
                return;
            }
            pendingOut = null;
        }
        //System.out.println("Send " + out.position() + " bytes to " + getSessionName() + "@" + getSessionAddress());
        handlerKey.interestOps(SelectionKey.OP_READ);
        state = READING;
    }

    /**
     * Closes the channel, cancels the selection key and wakes the selector up.
     *
     * @throws IOException if closing the channel fails (the key is cancelled regardless)
     */
    public void disconnect() throws IOException {
        System.out.println("bye bye " + sessionAddress);
        try {
            socketChannel.close();
        } finally {
            handlerKey.cancel();
            handlerKey.selector().wakeup();
        }
    }

    /** @return the session name and address joined by '@' */
    @Override
    public String toString() {
        return sessionName + "@" + sessionAddress;
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment