Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
Remoting Experiment
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;
import java.util.Observable;
import java.util.Observer;
/**
* One side of an object connection.
*/
public class ObjectConnection extends Thread {
interface ControlObject {
public Object getInitialObject();
}
private ObjectOutputStream oos;
private ObjectInputStream ois;
public static class ExportedObject {
public Object obj;
public int refs;
public ExportedObject(Object o) {
this.obj = o;
}
}
// Objects we've sent out:
private Map<Integer, ExportedObject> exports = new HashMap<Integer, ExportedObject>();
private Map<Object, Integer> exportsBack = new HashMap<Object, Integer>();
// Objects we're waiting for:
private Map<Integer, Observer> returns = new HashMap<Integer, Observer>();
private int ctr;
private int methodCallCtr;
private volatile boolean running;
public ObjectConnection(final Object initialLocal, ObjectInputStream ois, ObjectOutputStream oos) {
this.ois = ois;
this.oos = oos;
// Export this object
int id = findOrPutExport(new ControlObject() {
public Object getInitialObject() {
return initialLocal;
}
});
assert(id == 0);
running = true;
}
public void close() {
running = false;
interrupt();
}
public <T> T getInitialRemoteObject(Class<T> clazz) {
ObjectReference ref = makeRemoteObjectReference(0, ControlObject.class);
ref.forcedReturnType = clazz;
ControlObject co = (ControlObject) ref.getStub();
return (T) co.getInitialObject();
}
private synchronized int findOrPutExport(Object o) {
//System.err.println("Sending object: " + o);
if(exportsBack.containsKey(o)) {
int id = exportsBack.get(o);
ExportedObject ho = exports.get(id);
ho.refs++;
//System.err.println("Old id: " + id);
return id;
} else {
ExportedObject ho = new ExportedObject(o);
ho.refs++;
exports.put(ctr, ho);
exportsBack.put(o, ctr);
//System.err.println("New id: " + ctr);
return ctr++;
}
}
private synchronized void decExportRefCount(int id) {
//System.err.println("Decrementing object: " + id);
if(exports.containsKey(id)) {
//System.err.println("Forgetting object: " + id);
ExportedObject ho = exports.get(id);
if(--ho.refs == 0) {
// Remove the object from our local directory
exports.remove(id);
exportsBack.remove(ho.obj);
}
}
return;
}
private ObjectReference<?> makeRemoteObjectReference(int id, Class<?> clazz) {
return new ObjectReference(id, this, clazz);
}
private synchronized Object findExportById(int id) {
return exports.get(id).obj;
}
private synchronized int regMethodCallObserver(Observer o) {
returns.put(methodCallCtr, o);
return methodCallCtr++;
}
private synchronized Observer takeReturnObserver(int id) {
return returns.remove(id);
}
private Object decode(Serializable o, Class<?> proxyClass) {
if(o instanceof NullReference) {
return null;
} else if(o instanceof RemoteReference) {
// This is one of ours
RemoteReference rr = (RemoteReference) o;
return findExportById(rr.id);
} else if(o instanceof LocalReference) {
// This is a remote object
LocalReference lr = (LocalReference) o;
return makeRemoteObjectReference(lr.id, proxyClass).getStub();
} else {
// Normal serializable
return o;
}
}
private Serializable encode(Object o) {
if(o == null) {
return new NullReference();
}
if(o instanceof ObjectReference.RemotedObject) {
// It's a remote object. Get the reference we used for it
ObjectReference.RemotedObject ro = (ObjectReference.RemotedObject) o;
RemoteReference rr = new RemoteReference();
rr.id = ro.getObjectId();
return rr;
} else if(o instanceof Serializable){
// It's a serializable
return (Serializable) o;
} else {
// It's a local object; get a reference for it.
int id = findOrPutExport(o);
LocalReference lr = new LocalReference();
lr.id = id;
return lr;
}
}
private synchronized void sendMessage(Serializable s) {
try {
oos.writeObject(s);
} catch (IOException e) {
e.printStackTrace();
}
}
public void run() {
// Method call & return dispatch loop
while(running) {
Object o = null;
try {
o = ois.readObject();
} catch (Exception e) {
e.printStackTrace();
break;
}
System.err.println("Received: " + o.getClass().getSimpleName());
if(o instanceof MethodCall) {
// It's a method call
try {
final MethodCall mc = (MethodCall) o;
final Object targetObject = decode(mc.target, null);
final Method m = targetObject.getClass().getMethod(mc.methodName, mc.paramTypes);
final Object[] realArgs = new Object[mc.args.length];
for(int i = 0; i < mc.args.length; i++)
realArgs[i] = decode(mc.args[i], m.getParameterTypes()[i]);
// While this method call happens, we need to be able to dispatch new calls
// ...so do it in a new thread.
new Thread() {
public void run() {
Object res;
try {
try {
res = m.invoke(targetObject, realArgs);
MethodReturn mr = new MethodReturn();
mr.callId = mc.callId;
mr.obj = encode(res);
sendMessage(mr);
} catch(InvocationTargetException e) {
Throw th = new Throw();
th.callId = mc.callId;
th.errorMessage = e.getTargetException().getMessage();
th.exceptionType = e.getTargetException().getClass();
sendMessage(th);
}
} catch(Exception e) {
e.printStackTrace();
// Shouldn't happen
}
}
}.start();
} catch (Exception e) {
e.printStackTrace();
break;
}
} else if(o instanceof MethodReturn) {
// It's a return
MethodReturn mr = (MethodReturn) o;
Observer ob = takeReturnObserver(mr.callId);
if(ob == null) {
break;
}
ob.update(null, mr);
} else if(o instanceof Throw) {
// It's a throw
Throw th = (Throw) o;
Observer ob = takeReturnObserver(th.callId);
if(ob == null) {
break;
}
ob.update(null, th);
} else if(o instanceof DecrementReference) {
DecrementReference dr = (DecrementReference) o;
decExportRefCount(dr.id);
}
}
}
// Is there any case where there can be no references to the object on the remoting side, yet the object is travelling back to the hosting side?
// No, a decref message *must* be serialized after the message itself, as the object reference is actually needed to call the method.
/**
* A reference to a remote object
*/
private static class ObjectReference<T> implements InvocationHandler {
/**
* The interface implemented by all remote objects
*/
private static interface RemotedObject {
int getObjectId();
}
private final ObjectConnection conn;
private final int id;
private final Class<T> clazz;
// This is a hack for the ControlObject
public Class<?> forcedReturnType;
public ObjectReference(int id, ObjectConnection fd, Class<T> clazz) {
this.id = id;
this.conn = fd;
this.clazz = clazz;
}
@SuppressWarnings("unchecked")
public T getStub() {
return (T) Proxy.newProxyInstance(this.getClass().getClassLoader(),
new Class[] { clazz, RemotedObject.class },
this);
}
protected void finalize() throws Throwable {
DecrementReference dRef = new DecrementReference();
dRef.id = id;
conn.sendMessage(dRef);
}
public Object invoke(Object proxy, final Method method, Object[] args) throws Throwable {
if(method.getName().equals("getObjectId")) {
return id;
}
MethodCall mc = new MethodCall();
mc.target = new RemoteReference();
mc.target.id = id;
mc.paramTypes = method.getParameterTypes();
mc.methodName = method.getName();
if(args == null)
args = new Object[0];
mc.args = new Serializable[args.length];
for(int i = 0; i < args.length; i++) {
mc.args[i] = conn.encode(args[i]);
}
/**
* To catch the return value
*/
class MethodCallObserver implements Observer {
public volatile Object result;
public volatile boolean gotResult;
public synchronized void update(Observable o, Object arg) {
result = arg;
gotResult = true;
notify();
}
public synchronized Object waitForResult() {
while(!gotResult) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
return null;
}
}
return result;
}
}
MethodCallObserver rob = new MethodCallObserver();
// Register the observer, grabbing a handle to use with this method call
mc.callId = conn.regMethodCallObserver(rob);
// Write the method call
conn.sendMessage(mc);
//System.err.println("Sent method call: " + mc.toString() + ", waiting for return value...");
// Pause this thread, waiting for the return
rob.waitForResult();
Object result = rob.result;
if(result instanceof Throw) {
// It was an exception
Throw th = (Throw)result;
Constructor<?> cons = th.exceptionType.getConstructor(String.class);
if(cons != null) {
throw (Throwable)cons.newInstance(th.errorMessage);
} else {
throw (Throwable)th.exceptionType.newInstance();
}
} else if(result instanceof MethodReturn) {
// It was a normal return
MethodReturn mr = (MethodReturn) result;
return conn.decode((Serializable)mr.obj, forcedReturnType != null ? forcedReturnType : method.getReturnType());
} else {
throw new RuntimeException("Unknown message type routed to MethodCallObserver");
}
}
}
/* ---------------- Messages ---------------- */
/**
* Decrement a reference to a remote object
*/
public static class DecrementReference implements Serializable {
public int id;
public String toString() {
return "DecrementReference[" + id + "]";
}
}
/**
* A Method call to a remote object
*/
public static class MethodCall implements Serializable {
/**
* Handle to identify this method call
*/
public int callId;
/**
* Target object of this method call
*/
public RemoteReference target;
/**
* Name of method
*/
public String methodName;
/**
* Method parameter types
*/
public Class[] paramTypes;
/**
* Method arguments
*/
public Serializable[] args;
public String toString() {
return "MethodCall#" + callId + "[" + methodName + "]";
}
}
/**
* A Method return from a remote object
*/
public static class MethodReturn implements Serializable {
/**
* Handle used in method call
*/
public int callId;
/**
* Object returned
*/
public Serializable obj;
}
/**
* Exception throw from a remote object
*/
public static class Throw implements Serializable {
public int callId;
public String errorMessage;
public Class<?> exceptionType;
public String toString() {
return "Exception[" + errorMessage + "]";
}
}
/* ---------------- References ---------------- */
/**
* Object on the sender's side
*/
public static class LocalReference implements Serializable {
public int id;
public String toString() {
return "LocalReference[" + id + "]";
}
}
/**
* Object on the receiver's side
*/
public static class RemoteReference implements Serializable {
public int id;
public String toString() {
return "RemoteReference[" + id + "]";
}
}
/**
* A Null Reference
*/
public static class NullReference implements Serializable {
public String toString() {
return "NullReference";
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment