Skip to content

Instantly share code, notes, and snippets.

@psigen
Last active November 20, 2015 19:17
Show Gist options
  • Save psigen/b2cf8fb252485d30a8f6 to your computer and use it in GitHub Desktop.
Save psigen/b2cf8fb252485d30a8f6 to your computer and use it in GitHub Desktop.
Vehicle Tunnel Service
package com.platypus.com.platypus.crw.udp;
import com.platypus.crw.udp.UdpConstants;
import com.platypus.crw.udp.UdpServer;
import com.platypus.crw.udp.UdpServer.Request;
import java.io.IOException;
import java.net.*;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
 * Standalone service for supporting vehicle registration for UDP communication.
 *
 * Should be run standalone on a publicly visible server.
 *
 * <p>Summary of what this class does:
 * <ul>
 * <li>{@code CMD_REGISTER}: the sender is recorded as a vehicle and a dedicated
 * {@link DatagramSocket} (the "tunnel") is opened for it. Repeating the request
 * refreshes the registration.</li>
 * <li>{@code CMD_CONNECT}: the request names a tunnel port; it is forwarded to the
 * matching vehicle with the tunnel address substituted, and the sender is recorded
 * as that vehicle's controller.</li>
 * <li>{@code CMD_LIST}: the name and tunnel address of every registered vehicle
 * are returned to the sender.</li>
 * <li>Datagrams arriving on a tunnel socket are relayed between the vehicle and
 * its controller.</li>
 * <li>A periodic task counts registrations down and removes the ones that have
 * not been refreshed.</li>
 * </ul>
 *
 * <p>The client table is guarded by {@code synchronized (_clients)}; the controller
 * address of a client is guarded by {@code synchronized} on that client.
 *
 * @author Prasanna Velagapudi <psigen@gmail.com>
 */
public class VehicleTunnelService {

    private static final Logger logger = Logger.getLogger(VehicleTunnelService.class.getName());

    /** UDP port used when none is specified. */
    public static final int DEFAULT_UDP_PORT = 6077;

    /** Traffic class value applied to every tunnel socket. */
    public static final int IPTOS_LOWDELAY = 0x10;

    protected final UdpServer _udpServer;
    protected final Timer _registrationTimer = new Timer();
    protected final Map<SocketAddress, Client> _clients = new LinkedHashMap<SocketAddress, Client>();

    /** Book-keeping for one registered vehicle and its tunnel socket. */
    protected static class Client {
        /** Remaining registration timer ticks before this entry is dropped. */
        int ttl;
        /** Name supplied by the vehicle in its first registration request. */
        String name;
        /** Address the registration request came from. */
        SocketAddress vehicle_addr;
        /** Address of the last sender of a connect request; null until then. */
        SocketAddress controller_addr;
        /** Local host address combined with the tunnel socket's local port. */
        InetSocketAddress tunnel_addr;
        /** Socket on which vehicle and controller traffic is relayed. */
        DatagramSocket socket;
    }

    /**
     * Starts the service on {@link #DEFAULT_UDP_PORT}.
     */
    public VehicleTunnelService() {
        this(DEFAULT_UDP_PORT);
    }

    /**
     * Starts the service on the given UDP port and schedules the periodic
     * registration expiry task.
     *
     * @param udpPort port passed to the underlying {@link UdpServer}
     */
    public VehicleTunnelService(int udpPort) {
        _udpServer = new UdpServer(udpPort);
        _udpServer.setHandler(_handler);
        _udpServer.start();
        _registrationTimer.scheduleAtFixedRate(_registrationTask, 0, UdpConstants.REGISTRATION_RATE_MS);
    }

    /**
     * Stops the UDP server, cancels the registration timer and closes every
     * tunnel socket so that the relay threads terminate.
     */
    public void shutdown() {
        _udpServer.stop();
        _registrationTimer.cancel();
        synchronized (_clients) {
            for (Client client : _clients.values()) {
                // Closing the socket makes the blocked receive() in the relay thread fail,
                // which that thread treats as its signal to exit.
                client.socket.close();
            }
            _clients.clear();
        }
    }

    /**
     * Relays datagrams arriving on one client's tunnel socket: vehicle traffic
     * goes to the controller and controller traffic goes to the vehicle. Runs
     * until the tunnel socket is closed.
     */
    private static final class TunnelHandler implements Runnable {

        private final Client _client;

        public TunnelHandler(final Client c) {
            _client = c;
        }

        @Override
        public void run() {
            final byte[] buffer = new byte[UdpConstants.MAX_PACKET_SIZE];
            final DatagramPacket packet = new DatagramPacket(buffer, buffer.length);

            while (!_client.socket.isClosed()) {
                try {
                    // The previous datagram left the packet length at its own size;
                    // make the whole buffer available again before reusing the packet.
                    packet.setLength(buffer.length);
                    _client.socket.receive(packet);

                    if (redirect(packet)) {
                        _client.socket.send(packet);
                    }
                } catch (IOException e) {
                    if (_client.socket.isClosed()) {
                        // The registration expired or the service shut down.
                        break;
                    }
                    logger.severe("Error sending to " + _client.name + ": " + e);
                }
            }
        }

        /**
         * Rewrites the destination of a received packet to the opposite end of
         * the tunnel.
         *
         * @param packet datagram just received on the tunnel socket
         * @return true if the packet now has a destination and should be sent,
         *         false if it must be dropped
         */
        private boolean redirect(DatagramPacket packet) {
            final SocketAddress source = packet.getSocketAddress();

            synchronized (_client) {
                if (source.equals(_client.vehicle_addr)) {
                    if (_client.controller_addr == null) {
                        // No controller has connected yet, so there is nowhere to relay to.
                        logger.fine("Dropping vehicle packet, no controller connected: " +
                                _client.vehicle_addr);
                        return false;
                    }
                    packet.setSocketAddress(_client.controller_addr);
                    logger.fine("V2C: " +
                            _client.vehicle_addr + " -> " +
                            _client.socket.getLocalPort() + " -> " +
                            _client.controller_addr);
                    return true;
                }

                if (source.equals(_client.controller_addr)) {
                    packet.setSocketAddress(_client.vehicle_addr);
                    logger.fine("C2V: " +
                            _client.controller_addr + " -> " +
                            _client.socket.getLocalPort() + " -> " +
                            _client.vehicle_addr);
                    return true;
                }

                // Neither end of the tunnel: do not echo the datagram back to its sender.
                logger.warning("Unknown transmission from: " + source);
                return false;
            }
        }
    }

    private final UdpServer.RequestHandler _handler = new UdpServer.RequestHandler() {

        @Override
        public void received(Request req) {
            try {
                final String command = req.stream.readUTF();

                switch (UdpConstants.COMMAND.fromStr(command)) {
                    case CMD_REGISTER:
                        handleRegister(req);
                        break;
                    case CMD_CONNECT:
                        handleConnect(req, command);
                        break;
                    case CMD_LIST:
                        handleList(req, command);
                        break;
                    case UNKNOWN:
                    case CMD_REGISTER_POSE_LISTENER:
                    case CMD_SEND_POSE:
                    case CMD_SET_POSE:
                    case CMD_GET_POSE:
                    case CMD_REGISTER_IMAGE_LISTENER:
                    case CMD_SEND_IMAGE:
                    case CMD_CAPTURE_IMAGE:
                    case CMD_REGISTER_CAMERA_LISTENER:
                    case CMD_SEND_CAMERA:
                    case CMD_START_CAMERA:
                    case CMD_STOP_CAMERA:
                    case CMD_GET_CAMERA_STATUS:
                    case CMD_REGISTER_SENSOR_LISTENER:
                    case CMD_SEND_SENSOR:
                    case CMD_SET_SENSOR_TYPE:
                    case CMD_GET_SENSOR_TYPE:
                    case CMD_GET_NUM_SENSORS:
                    case CMD_REGISTER_VELOCITY_LISTENER:
                    case CMD_SEND_VELOCITY:
                    case CMD_SET_VELOCITY:
                    case CMD_GET_VELOCITY:
                    case CMD_REGISTER_WAYPOINT_LISTENER:
                    case CMD_SEND_WAYPOINT:
                    case CMD_START_WAYPOINTS:
                    case CMD_STOP_WAYPOINTS:
                    case CMD_GET_WAYPOINTS:
                    case CMD_GET_WAYPOINT_STATUS:
                    case CMD_IS_CONNECTED:
                    case CMD_IS_AUTONOMOUS:
                    case CMD_SET_AUTONOMOUS:
                    case CMD_SET_GAINS:
                    case CMD_GET_GAINS:
                        // No action is taken for these commands here.
                        break;
                    default:
                        logger.log(Level.WARNING, "Ignoring unknown command: {0}", command);
                }
            } catch (IOException e) {
                logger.log(Level.WARNING, "Failed to parse request: {0}", req.ticket);
            }
        }

        @Override
        public void timeout(long ticket, SocketAddress destination) {
            logger.log(Level.SEVERE, "Unexpected timeout: {0}", ticket);
        }
    };

    /**
     * Handles {@code CMD_REGISTER}: creates a tunnel for a sender that is not
     * yet in the client table and resets the sender's registration countdown.
     *
     * @param req the registration request; the vehicle name is read from it only
     *            when the sender is not yet registered
     * @throws IOException if the request cannot be read or the local host
     *                     address cannot be determined
     */
    private void handleRegister(Request req) throws IOException {
        synchronized (_clients) {
            Client c = _clients.get(req.source);

            if (c == null) {
                c = openTunnel(req.source, req.stream.readUTF());
                new Thread(new TunnelHandler(c)).start();
                _clients.put(req.source, c);
            }

            // Update the registration count for this client
            c.ttl = UdpConstants.REGISTRATION_TIMEOUT_COUNT;
        }
    }

    /**
     * Creates a client entry with a freshly opened and configured tunnel socket.
     * The socket is closed again if any configuration step fails.
     *
     * @param vehicleAddr address the registration came from
     * @param name vehicle name taken from the registration request
     * @return the populated client entry
     * @throws UnknownHostException if the local host address cannot be resolved
     * @throws RuntimeException if the socket cannot be opened or configured
     */
    private static Client openTunnel(SocketAddress vehicleAddr, String name) throws UnknownHostException {
        final Client c = new Client();
        c.vehicle_addr = vehicleAddr;
        c.controller_addr = null;
        c.name = name;

        boolean ready = false;
        try {
            c.socket = new DatagramSocket();
            c.socket.setSendBufferSize(UdpConstants.MAX_PACKET_SIZE);
            c.socket.setTrafficClass(IPTOS_LOWDELAY);
            c.tunnel_addr = new InetSocketAddress(
                    InetAddress.getLocalHost(), c.socket.getLocalPort());
            ready = true;
        } catch (SocketException e) {
            logger.severe("Unable to open desired UDP socket.");
            throw new RuntimeException("Unable to open desired UDP socket.", e);
        } finally {
            // Covers both the SocketException path and an UnknownHostException
            // from getLocalHost(): never leave a half-configured socket open.
            if (!ready && c.socket != null) {
                c.socket.close();
            }
        }
        return c;
    }

    /**
     * Handles {@code CMD_CONNECT}: finds the client whose tunnel socket uses the
     * requested port, records the sender as its controller and forwards the
     * request to the vehicle with the tunnel address substituted.
     *
     * @param req the connect request, carrying a host name and a port
     * @param command the command string to repeat in the forwarded request
     * @throws IOException if the request cannot be read or the forwarded
     *                     request cannot be written
     */
    private void handleConnect(Request req, String command) throws IOException {
        // Unpack address to which to connect.
        final String hostname = req.stream.readUTF();
        final int port = req.stream.readInt();

        // The port comes straight off the network; InetSocketAddress rejects
        // out-of-range values with an unchecked exception, so filter them here.
        if (port < 0 || port > 0xFFFF) {
            logger.warning("Connection requested for invalid port: " + port);
            return;
        }
        final InetSocketAddress addr = new InetSocketAddress(hostname, port);

        // Only the port is used to select the tunnel.
        Client target = null;
        synchronized (_clients) {
            for (Client candidate : _clients.values()) {
                if (candidate.socket.getLocalPort() == addr.getPort()) {
                    target = candidate;
                    break;
                }
            }
        }

        // Ignore requests for non-tunneled clients.
        if (target == null) {
            logger.warning("Connection requested for invalid client: " + addr);
            return;
        }
        logger.info("Connection request for: " +
                req.source + " -> " +
                target.tunnel_addr + " -> " +
                target.vehicle_addr);

        // Forward this connection request to the vehicle in question.
        synchronized (target) {
            target.controller_addr = req.source;

            UdpServer.Response respCon = new UdpServer.Response(req.ticket, target.vehicle_addr);
            respCon.stream.writeUTF(command);
            respCon.stream.writeUTF(target.tunnel_addr.getAddress().getHostAddress());
            respCon.stream.writeInt(target.tunnel_addr.getPort());
            _udpServer.respond(respCon);
        }
    }

    /**
     * Handles {@code CMD_LIST}: replies to the sender with the number of
     * registered clients followed by each client's name, tunnel host address
     * and tunnel port.
     *
     * @param req the list request
     * @param command the command string to repeat in the response
     * @throws IOException if the response cannot be written
     */
    private void handleList(Request req, String command) throws IOException {
        final UdpServer.Response respList = new UdpServer.Response(req);
        respList.stream.writeUTF(command);

        synchronized (_clients) {
            respList.stream.writeInt(_clients.size());
            for (Client client : _clients.values()) {
                respList.stream.writeUTF(client.name);
                respList.stream.writeUTF(client.tunnel_addr.getAddress().getHostAddress());
                respList.stream.writeInt(client.tunnel_addr.getPort());
            }
        }
        _udpServer.respond(respList);
    }

    // Removes outdated registrations from client list
    protected TimerTask _registrationTask = new TimerTask() {
        @Override
        public void run() {
            synchronized (_clients) {
                final Iterator<Client> it = _clients.values().iterator();
                while (it.hasNext()) {
                    final Client client = it.next();
                    if (client.ttl == 0) {
                        // Closing the socket also ends this client's relay thread.
                        client.socket.close();
                        it.remove();
                    } else {
                        client.ttl--;
                    }
                }
            }
        }
    };

    /**
     * Returns a map containing the current set of clients.
     *
     * @return a new map from the socket address of clients to their text names,
     *         in the iteration order of the client table
     */
    public Map<SocketAddress, String> getClients() {
        final Map<SocketAddress, String> map = new LinkedHashMap<SocketAddress, String>();
        synchronized (_clients) {
            for (Client client : _clients.values()) {
                map.put(client.vehicle_addr, client.name);
            }
        }
        return map;
    }

    /**
     * Simple startup script that runs the VehicleTunnelService using the
     * specified UDP port if available and prints a list of connected clients.
     *
     * @param args takes a UDP port number as the first argument.
     * @throws UnknownHostException if the local host address cannot be resolved
     *                              for the startup message
     */
    public static void main(String args[]) throws UnknownHostException {
        final int port = (args.length > 0) ? Integer.parseInt(args[0]) : DEFAULT_UDP_PORT;

        // Start on the requested port, which is also the port reported below.
        final VehicleTunnelService service = new VehicleTunnelService(port);
        System.out.println("Started vehicle tunnel: " + InetAddress.getLocalHost() + ":" + port);

        // Periodically print the registered clients
        Timer printer = new Timer();
        printer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                Collection<Map.Entry<SocketAddress, String>> clients = service.getClients().entrySet();
                if (clients.isEmpty()) {
                    System.out.println("NO CLIENTS.");
                    return;
                }
                System.out.println("CLIENT LIST:");
                for (Map.Entry<SocketAddress, String> e : clients) {
                    System.out.println("\t" + e.getValue() + " : " + e.getKey());
                }
            }
        }, 0, 1000);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment