Last active
November 20, 2015 19:17
-
-
Save psigen/b2cf8fb252485d30a8f6 to your computer and use it in GitHub Desktop.
Vehicle Tunnel Service
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.platypus.com.platypus.crw.udp; | |
import com.platypus.crw.udp.UdpConstants; | |
import com.platypus.crw.udp.UdpServer; | |
import com.platypus.crw.udp.UdpServer.Request; | |
import java.io.IOException; | |
import java.net.*; | |
import java.util.Collection; | |
import java.util.HashMap; | |
import java.util.Iterator; | |
import java.util.LinkedHashMap; | |
import java.util.Map; | |
import java.util.Timer; | |
import java.util.TimerTask; | |
import java.util.logging.Level; | |
import java.util.logging.Logger; | |
/** | |
* Standalone service for supporting vehicle registration for UDP communication. | |
* | |
* Should be run standalone on a publicly visible server. | |
* | |
* @author Prasanna Velagapudi <psigen@gmail.com> | |
*/ | |
public class VehicleTunnelService { | |
private static final Logger logger = Logger.getLogger(VehicleTunnelService.class.getName()); | |
public static final int DEFAULT_UDP_PORT = 6077; | |
public static final int IPTOS_LOWDELAY = 0x10; | |
protected final UdpServer _udpServer; | |
protected final Timer _registrationTimer = new Timer(); | |
protected final Map<SocketAddress, Client> _clients = new LinkedHashMap<SocketAddress, Client>(); | |
protected static class Client { | |
int ttl; | |
String name; | |
SocketAddress vehicle_addr; | |
SocketAddress controller_addr; | |
InetSocketAddress tunnel_addr; | |
DatagramSocket socket; | |
} | |
public VehicleTunnelService() { | |
this(DEFAULT_UDP_PORT); | |
} | |
public VehicleTunnelService(int udpPort) { | |
_udpServer = new UdpServer(udpPort); | |
_udpServer.setHandler(_handler); | |
_udpServer.start(); | |
_registrationTimer.scheduleAtFixedRate(_registrationTask, 0, UdpConstants.REGISTRATION_RATE_MS); | |
} | |
public void shutdown() { | |
_udpServer.stop(); | |
} | |
private final class TunnelHandler implements Runnable { | |
private final Client _client; | |
public TunnelHandler(final Client c) { | |
_client = c; | |
} | |
@Override | |
public void run() { | |
final byte[] buffer = new byte[UdpConstants.MAX_PACKET_SIZE]; | |
final DatagramPacket packet = new DatagramPacket(buffer, buffer.length); | |
try { | |
_client.socket.receive(packet); | |
// Send controller data to the vehicle and vice-versa. | |
synchronized (_client) { | |
if (packet.getSocketAddress().equals(_client.vehicle_addr)) { | |
packet.setSocketAddress(_client.controller_addr); | |
logger.fine("V2C: " + | |
_client.vehicle_addr + " -> " + | |
_client.socket.getLocalPort() + " -> " + | |
_client.controller_addr); | |
} else if (packet.getSocketAddress().equals(_client.controller_addr)) { | |
_client.controller_addr = packet.getSocketAddress(); | |
packet.setSocketAddress(_client.vehicle_addr); | |
logger.fine("C2V: " + | |
_client.controller_addr + " -> " + | |
_client.socket.getLocalPort() + " -> " + | |
_client.vehicle_addr); | |
} else { | |
logger.warning("Unknown transmission from: " + packet.getSocketAddress()); | |
} | |
} | |
_client.socket.send(packet); | |
} catch (IOException e) { | |
logger.severe("Error sending to " + _client.name + ": " + e); | |
} | |
} | |
} | |
private final UdpServer.RequestHandler _handler = new UdpServer.RequestHandler() { | |
@Override | |
public void received(Request req) { | |
try { | |
final String command = req.stream.readUTF(); | |
switch (UdpConstants.COMMAND.fromStr(command)) { | |
case UNKNOWN: | |
break; | |
case CMD_REGISTER: | |
synchronized(_clients) { | |
// Look for client in table | |
Client c = _clients.get(req.source); | |
// If not found, create a new entry | |
if (c == null) { | |
c = new Client(); | |
c.vehicle_addr = req.source; | |
c.controller_addr = null; | |
c.name = req.stream.readUTF(); | |
try { | |
c.socket = new DatagramSocket(); | |
c.socket.setSendBufferSize(UdpConstants.MAX_PACKET_SIZE); | |
c.socket.setTrafficClass(IPTOS_LOWDELAY); | |
c.tunnel_addr = new InetSocketAddress( | |
InetAddress.getLocalHost(), c.socket.getLocalPort()); | |
} catch (SocketException e) { | |
logger.severe("Unable to open desired UDP socket."); | |
throw new RuntimeException("Unable to open desired UDP socket.", e); | |
} | |
new Thread(new TunnelHandler(c)).start(); | |
_clients.put(req.source, c); | |
} | |
// Update the registration count for this client | |
c.ttl = UdpConstants.REGISTRATION_TIMEOUT_COUNT; | |
} | |
break; | |
case CMD_CONNECT: | |
// Unpack address to which to connect. | |
String hostname = req.stream.readUTF(); | |
int port = req.stream.readInt(); | |
InetSocketAddress addr = new InetSocketAddress(hostname, port); | |
// Rewrite request to go through registry. | |
Client c = null; | |
synchronized(_clients) { | |
for (Map.Entry<SocketAddress, Client> e : _clients.entrySet()) { | |
if (e.getValue().socket.getLocalPort() == addr.getPort()) { | |
c = e.getValue(); | |
break; | |
} | |
} | |
} | |
// Ignore requests for non-tunneled clients. | |
if (c == null) { | |
logger.warning("Connection requested for invalid client: " + addr); | |
return; | |
} else { | |
logger.info("Connection request for: " + | |
req.source + " -> " + | |
c.tunnel_addr + " -> " + | |
c.vehicle_addr); | |
} | |
// Forward this connection request to the server in question. | |
synchronized(c) { | |
c.controller_addr = req.source; | |
UdpServer.Response respCon = new UdpServer.Response(req.ticket, c.vehicle_addr); | |
respCon.stream.writeUTF(command); | |
respCon.stream.writeUTF(c.tunnel_addr.getAddress().getHostAddress()); | |
respCon.stream.writeInt(c.tunnel_addr.getPort()); | |
_udpServer.respond(respCon); | |
} | |
break; | |
case CMD_LIST: | |
// Create a response to the same client | |
UdpServer.Response respList = new UdpServer.Response(req); | |
respList.stream.writeUTF(command); | |
// List all of the clients | |
synchronized(_clients) { | |
respList.stream.writeInt(_clients.size()); | |
for (Map.Entry<SocketAddress, Client> e : _clients.entrySet()) { | |
respList.stream.writeUTF(e.getValue().name); | |
respList.stream.writeUTF(e.getValue().tunnel_addr.getAddress().getHostAddress()); | |
respList.stream.writeInt(e.getValue().tunnel_addr.getPort()); | |
} | |
} | |
_udpServer.respond(respList); | |
break; | |
case CMD_REGISTER_POSE_LISTENER: | |
break; | |
case CMD_SEND_POSE: | |
break; | |
case CMD_SET_POSE: | |
break; | |
case CMD_GET_POSE: | |
break; | |
case CMD_REGISTER_IMAGE_LISTENER: | |
break; | |
case CMD_SEND_IMAGE: | |
break; | |
case CMD_CAPTURE_IMAGE: | |
break; | |
case CMD_REGISTER_CAMERA_LISTENER: | |
break; | |
case CMD_SEND_CAMERA: | |
break; | |
case CMD_START_CAMERA: | |
break; | |
case CMD_STOP_CAMERA: | |
break; | |
case CMD_GET_CAMERA_STATUS: | |
break; | |
case CMD_REGISTER_SENSOR_LISTENER: | |
break; | |
case CMD_SEND_SENSOR: | |
break; | |
case CMD_SET_SENSOR_TYPE: | |
break; | |
case CMD_GET_SENSOR_TYPE: | |
break; | |
case CMD_GET_NUM_SENSORS: | |
break; | |
case CMD_REGISTER_VELOCITY_LISTENER: | |
break; | |
case CMD_SEND_VELOCITY: | |
break; | |
case CMD_SET_VELOCITY: | |
break; | |
case CMD_GET_VELOCITY: | |
break; | |
case CMD_REGISTER_WAYPOINT_LISTENER: | |
break; | |
case CMD_SEND_WAYPOINT: | |
break; | |
case CMD_START_WAYPOINTS: | |
break; | |
case CMD_STOP_WAYPOINTS: | |
break; | |
case CMD_GET_WAYPOINTS: | |
break; | |
case CMD_GET_WAYPOINT_STATUS: | |
break; | |
case CMD_IS_CONNECTED: | |
break; | |
case CMD_IS_AUTONOMOUS: | |
break; | |
case CMD_SET_AUTONOMOUS: | |
break; | |
case CMD_SET_GAINS: | |
break; | |
case CMD_GET_GAINS: | |
break; | |
default: | |
logger.log(Level.WARNING, "Ignoring unknown command: {0}", command); | |
} | |
} catch (IOException e) { | |
logger.log(Level.WARNING, "Failed to parse request: {0}", req.ticket); | |
} | |
} | |
@Override | |
public void timeout(long ticket, SocketAddress destination) { | |
logger.log(Level.SEVERE, "Unexpected timeout: {0}", ticket); | |
} | |
}; | |
// Removes outdated registrations from client list | |
protected TimerTask _registrationTask = new TimerTask() { | |
@Override | |
public void run() { | |
synchronized(_clients) { | |
for (Iterator<Map.Entry<SocketAddress, Client>> it = _clients.entrySet().iterator(); it.hasNext();) { | |
Map.Entry<SocketAddress, Client> client = it.next(); | |
if (client.getValue().ttl == 0) { | |
client.getValue().socket.close(); | |
it.remove(); | |
} else { | |
client.getValue().ttl--; | |
} | |
} | |
} | |
} | |
}; | |
/** | |
* Returns a map containing the current set of clients. | |
* | |
* @return a map from the socket address of clients to their text names | |
*/ | |
public Map<SocketAddress, String> getClients() { | |
HashMap<SocketAddress, String> map = new LinkedHashMap<SocketAddress, String>(); | |
synchronized(_clients) { | |
for (Client client : _clients.values()) { | |
map.put(client.vehicle_addr, client.name); | |
} | |
} | |
return map; | |
} | |
/** | |
* Simple startup script that runs the VehicleTunnelService using the | |
* specified UDP port if available and prints a list of connected clients. | |
* | |
* @param args takes a UDP port number as the first argument. | |
*/ | |
public static void main(String args[]) throws UnknownHostException { | |
final int port = (args.length > 0) ? Integer.parseInt(args[0]) : DEFAULT_UDP_PORT; | |
final VehicleTunnelService service = new VehicleTunnelService(); | |
System.out.println("Started vehicle tunnel: " + InetAddress.getLocalHost() + ":" + port); | |
// Periodically print the registered clients | |
Timer printer = new Timer(); | |
printer.scheduleAtFixedRate(new TimerTask() { | |
@Override | |
public void run() { | |
Collection<Map.Entry<SocketAddress, String>> clients = service.getClients().entrySet(); | |
if (clients.size() > 0) { | |
System.out.println("CLIENT LIST:"); | |
for (Map.Entry<SocketAddress, String> e : clients) { | |
System.out.println("\t" + e.getValue() + " : " + e.getKey()); | |
} | |
} else { | |
System.out.println("NO CLIENTS."); | |
} | |
} | |
}, 0, 1000); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment