Skip to content

Instantly share code, notes, and snippets.

@DreamLean
Created September 11, 2019 03:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save DreamLean/61b215bad3c8f839ac4c84536d67c804 to your computer and use it in GitHub Desktop.
Save DreamLean/61b215bad3c8f839ac4c84536d67c804 to your computer and use it in GitHub Desktop.
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
/**
 * Small TCP client that polls its connection on one background thread and
 * hands every chunk of received text to a registered callback.
 *
 * <p>The connection is opened in the constructor; call {@link #close()} to
 * stop polling and release the socket.
 */
public class Client {
    /** Maximum number of bytes consumed by a single read. */
    private static final int BUFFER_SIZE = 256;
    /** Fixed wire charset so both directions do not depend on the platform default. */
    private static final java.nio.charset.Charset CHARSET = java.nio.charset.StandardCharsets.UTF_8;

    private final SocketChannel client;
    private final ScheduledExecutorService scheduledExecutorService;
    // Assigned after the polling task has been submitted, so the task may observe null.
    private volatile ScheduledFuture<?> scheduledFuture;
    //default behavior: ignore incoming messages
    // volatile: written by the caller's thread, read by the polling thread.
    private volatile Consumer<String> consumer = (a) -> {};

    /**
     * Connects to the given endpoint and starts polling for incoming data.
     *
     * @param host remote host name or address
     * @param port remote TCP port
     * @throws IOException if the connection cannot be established
     */
    public Client(String host, int port) throws IOException {
        client = SocketChannel.open(new InetSocketAddress(host, port));
        //scheduler to read input messages
        scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(this::onMessage, 0, 250, TimeUnit.MILLISECONDS);
    }

    /**
     * Sends the given text to the server.
     *
     * @param msg text to send; encoded as UTF-8
     * @throws IOException if writing to the channel fails
     */
    public void sendMessage(String msg) throws IOException {
        ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes(CHARSET));
        // Keep writing until the whole message has been handed to the channel.
        while (buffer.hasRemaining()) {
            client.write(buffer);
        }
    }

    /**
     * Registers the callback that receives each chunk of incoming text.
     * Replaces any previously registered callback.
     *
     * @param consumer callback invoked on the polling thread with the trimmed text
     */
    public void onMessage(Consumer<String> consumer) {
        this.consumer = consumer;
    }

    /**
     * Stops polling and closes the connection. A failure while closing the
     * channel is printed rather than thrown.
     */
    public void close() {
        ScheduledFuture<?> future = scheduledFuture;
        if (future != null) {
            future.cancel(true);
        }
        scheduledExecutorService.shutdownNow();
        try {
            client.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Polling task: reads once from the channel and forwards the received
     * text to the registered callback.
     */
    private void onMessage() {
        try {
            ByteBuffer inputBuffer = ByteBuffer.allocate(BUFFER_SIZE);
            int bytesRead = client.read(inputBuffer);
            if (bytesRead < 0) {
                // End of stream: the peer closed the connection, so stop polling
                // instead of delivering empty messages forever.
                close();
                return;
            }
            if (bytesRead == 0) {
                return;
            }
            // Decode only the bytes actually received, not the whole backing array.
            consumer.accept(new String(inputBuffer.array(), 0, bytesRead, CHARSET).trim());
        } catch (java.nio.channels.ClosedChannelException ignored) {
            // Expected when close() shuts the channel while this task is reading.
        } catch (IOException e) {
            e.printStackTrace();
        } catch (RuntimeException e) {
            // A task that throws is never rescheduled by scheduleAtFixedRate;
            // report the callback failure and keep polling.
            e.printStackTrace();
        }
    }
}
import java.io.IOException;
/**
 * Demo entry point: connects to the server on localhost:40000, prints every
 * message it receives and exits once the server sends "Shutdown".
 */
public class ClientExample {
    public static void main(String[] args) throws IOException {
        Client client = new Client("localhost", 40000);
        client.onMessage(msg -> {
            // Anything other than the shutdown command is simply echoed to stdout.
            if (!msg.equals("Shutdown")) {
                System.out.println(msg);
                return;
            }
            System.out.println("Shutting down...");
            client.close();
            System.exit(0);
        });
    }
}
/**
 * Immutable key/value pair produced by {@link MessageParser}.
 * Fields in this class should be the values that the message parser
 * is able to extract from a raw message.
 */
public class Message {
    private final String key;
    private final String value;

    /**
     * @param key   command name of the message
     * @param value argument of the message
     */
    protected Message(String key, String value) {
        this.key = key;
        this.value = value;
    }

    /** @return the command name of the message */
    public String key() {
        return key;
    }

    /** @return the argument of the message */
    public String value() {
        return value;
    }
}

/**
 * Parser - must be configured to tailor one's protocol.
 * This implementation understands messages of the form {@code key=value}.
 */
class MessageParser {
    /** Character separating the key from the value. */
    private static final char SEPARATOR = '=';

    /**
     * Splits a raw message at the first {@code '='}.
     *
     * <p>Everything before the first separator is the key and everything
     * after it is the value, so the value may itself contain {@code '='}.
     * A message without a separator (for example {@code "Global"}) yields
     * the whole message as key and an empty value instead of failing.
     *
     * @param message raw message text; must not be null
     * @return the parsed message, never null
     * @throws NullPointerException if {@code message} is null
     */
    public static Message get(String message) {
        int separatorIndex = message.indexOf(SEPARATOR);
        if (separatorIndex < 0) {
            return new Message(message, "");
        }
        return new Message(message.substring(0, separatorIndex), message.substring(separatorIndex + 1));
    }
}
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
/**
 * Single-threaded, selector-based TCP server.
 *
 * <p>Accepted connections are tracked in a list (for broadcasting) and can
 * additionally be registered under a name. Incoming text is handed to the
 * callback registered through {@link #onMessage(BiConsumer)}.
 */
public class Server {
    /** Maximum number of bytes consumed by a single read. */
    private static final int BUFFER_SIZE = 256;
    /** Fixed wire charset so both directions do not depend on the platform default. */
    private static final java.nio.charset.Charset CHARSET = java.nio.charset.StandardCharsets.UTF_8;

    private final ServerSocketChannel serverSocket;
    private final Selector selector;
    private final List<SocketChannel> clients = new ArrayList<>();
    private final Map<String, SocketChannel> clientByName = new HashMap<>();
    //default behavior: ignore incoming messages
    private BiConsumer<SocketChannel, String> consumer = (a, b) -> {};

    /**
     * Binds the server socket and registers it with a new selector.
     *
     * @param host local host name or address to bind to
     * @param port local TCP port to bind to
     * @throws IOException if the selector or the socket cannot be set up
     */
    public Server(String host, int port) throws IOException {
        //server config
        selector = Selector.open();
        serverSocket = ServerSocketChannel.open();
        serverSocket.bind(new InetSocketAddress(host, port));
        serverSocket.configureBlocking(false);
        serverSocket.register(selector, SelectionKey.OP_ACCEPT);
    }

    /**
     * Runs the event loop: accepts new connections and dispatches incoming
     * messages. This method does not return normally.
     *
     * @throws IOException if selecting or accepting fails
     */
    public void listen() throws IOException {
        while (true) {
            selector.select();
            for (SelectionKey key : selector.selectedKeys()) {
                // A callback may have closed this key's channel earlier in the same
                // round; querying readiness on a cancelled key would throw.
                if (!key.isValid()) {
                    continue;
                }
                if (key.isAcceptable()) {
                    //onConnection
                    onConnection(selector, serverSocket);
                } else if (key.isReadable()) {
                    //onMessage
                    onMessage(key);
                }
            }
            //remove keys
            selector.selectedKeys().clear();
        }
    }

    /**
     * Registers the callback that receives each incoming message together
     * with the channel it arrived on. Replaces any previous callback.
     *
     * @param consumer callback invoked from the event loop
     */
    public void onMessage(BiConsumer<SocketChannel, String> consumer) {
        this.consumer = consumer;
    }

    /**
     * Sends a message to one client. I/O failures are printed, not thrown.
     *
     * @param client  target channel; a null target is reported and skipped
     * @param message text to send; encoded as UTF-8
     */
    public void reply(SocketChannel client, String message) {
        if (client == null) {
            // Typically the result of getClient() for an unknown name.
            System.err.println("reply: no target client, message dropped");
            return;
        }
        ByteBuffer buffer = ByteBuffer.wrap(message.getBytes(CHARSET));
        try {
            // NOTE(review): accepted channels are non-blocking, so a single write
            // may send only part of a large message — confirm this is acceptable.
            client.write(buffer);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Associates a client channel with a name, replacing any previous
     * channel registered under that name.
     */
    public void registerClient(SocketChannel client, String name) {
        clientByName.put(name, client);
    }

    /**
     * @param name name used in {@link #registerClient(SocketChannel, String)}
     * @return the channel registered under {@code name}, or null if none
     */
    public SocketChannel getClient(String name) {
        return clientByName.get(name);
    }

    /** Sends the message to every connected client. */
    public void broadcast(String message) {
        clients.forEach(client -> reply(client, message));
    }

    /**
     * Terminates a client connection and forgets it, including any names it
     * was registered under. A failure while closing is printed, not thrown.
     *
     * @param client channel to close; null is ignored
     */
    public void closeClient(SocketChannel client) {
        if (client == null) {
            return;
        }
        clients.remove(client);
        // Drop name registrations too, so getClient() never returns a closed channel.
        clientByName.values().removeIf(registered -> registered == client);
        try {
            client.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    //registers new client onConnection
    private void onConnection(Selector selector, ServerSocketChannel serverSocket) throws IOException {
        SocketChannel clientSocket = serverSocket.accept();
        if (clientSocket == null) {
            // Non-blocking accept: no connection was actually pending.
            return;
        }
        clientSocket.configureBlocking(false);
        clientSocket.register(selector, SelectionKey.OP_READ);
        clients.add(clientSocket);
    }

    //extracts message and consumes callback
    private void onMessage(SelectionKey key) throws IOException {
        SocketChannel client = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
        int bytesRead;
        try {
            bytesRead = client.read(buffer);
        } catch (IOException e) {
            // A failure on one connection must not stop the whole server.
            e.printStackTrace();
            closeClient(client);
            return;
        }
        if (bytesRead < 0) {
            // End of stream: the client disconnected. Without this the key would
            // stay readable forever and the callback would get empty messages.
            closeClient(client);
            return;
        }
        if (bytesRead == 0) {
            return;
        }
        // Decode only the bytes actually received, not the whole backing array.
        String input = new String(buffer.array(), 0, bytesRead, CHARSET).trim();
        consumer.accept(client, input);
    }
}
import java.io.IOException;
/**
 * Demo entry point: starts a server on localhost:40000 and defines a small
 * {@code key=value} protocol on top of it.
 */
public class ServerExample {
    public static void main(String[] args) throws IOException {
        Server server = new Server("localhost", 40000);
        //define protocol
        server.onMessage((client, msg) -> {
            Message message = MessageParser.get(msg);
            switch (message.key()) {
                case "Register":
                    // value = name to register the sender under
                    server.registerClient(client, message.value());
                    server.reply(client, "Client registered!\n");
                    break;
                case "SendTo": {
                    // value = name of the recipient
                    java.nio.channels.SocketChannel target = server.getClient(message.value());
                    if (target == null) {
                        // Nobody is registered under that name: tell the sender
                        // instead of passing a null channel to reply().
                        server.reply(client, "Unknown client: " + message.value() + "\n");
                    } else {
                        server.reply(target, "Message Sent");
                    }
                    break;
                }
                case "Echo":
                    server.reply(client, message.value());
                    break;
                case "Global":
                    server.broadcast("Global Message\n");
                    break;
                case "Close":
                    // Tell the client to shut down, then drop its connection.
                    server.reply(client, "Shutdown\n");
                    server.closeClient(client);
                    break;
            }
        });
        //start server
        server.listen();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment