Last active
January 15, 2021 21:55
-
-
Save routevegetable/16b4020528a10172e05ae6db71d35041 to your computer and use it in GitHub Desktop.
Remoting Experiment
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Observable;
import java.util.Observer;
/**
 * One side of an object connection.
 *
 * <p>Two instances joined by a pair of object streams can call methods on each other's objects.
 * Values that are {@link Serializable} travel by value; everything else is exported under a
 * numeric id and shows up on the other side as a dynamic proxy. The thread body
 * ({@link #run()}) is the dispatch loop that reads incoming messages, so the connection must be
 * {@link #start() started} before a remote call can complete.
 *
 * <p>The table and counter state is guarded by this object's monitor; outgoing messages are
 * serialized through the same monitor.
 */
public class ObjectConnection extends Thread {

    /** Export id of the bootstrap {@link ControlObject}; the constructor registers it first. */
    private static final int CONTROL_OBJECT_ID = 0;

    /** Bootstrap object each side exports so that the peer can fetch the initial object. */
    interface ControlObject {
        public Object getInitialObject();
    }

    private final ObjectOutputStream oos;
    private final ObjectInputStream ois;

    /** A local object handed out by reference, plus the number of references sent to the peer. */
    public static class ExportedObject {
        public Object obj;
        public int refs;

        public ExportedObject(Object o) {
            this.obj = o;
        }
    }

    // Objects we've sent out:
    private final Map<Integer, ExportedObject> exports = new HashMap<Integer, ExportedObject>();
    // Reverse index. Keyed by identity: two distinct objects that happen to be equals() must not
    // share an export id, and user hashCode()/equals() must not run under our lock.
    private final Map<Object, Integer> exportsBack = new IdentityHashMap<Object, Integer>();
    // Calls we're waiting on, by call id:
    private final Map<Integer, Observer> returns = new HashMap<Integer, Observer>();
    private int ctr;
    private int methodCallCtr;
    private volatile boolean running;

    /**
     * Creates one side of a connection.
     *
     * @param initialLocal the object the peer receives from {@link #getInitialRemoteObject(Class)};
     *                     may be {@code null}
     * @param ois          stream the dispatch loop reads messages from
     * @param oos          stream messages are written to
     */
    public ObjectConnection(final Object initialLocal, ObjectInputStream ois, ObjectOutputStream oos) {
        this.ois = ois;
        this.oos = oos;
        // Export the control object first so that it gets the well-known id.
        int id = findOrPutExport(new ControlObject() {
            public Object getInitialObject() {
                return initialLocal;
            }
        });
        assert (id == CONTROL_OBJECT_ID);
        running = true;
    }

    /**
     * Stops the dispatch loop and fails every call that is still waiting for a reply.
     *
     * <p>The streams are not closed here. If the dispatch thread is blocked in a read that does
     * not react to {@link #interrupt()}, it only exits once the stream itself is closed or the
     * next message arrives.
     */
    public void close() {
        running = false;
        interrupt();
        failPendingCalls();
    }

    /**
     * Fetches the peer's initial object.
     *
     * @param clazz interface the returned stub implements when the initial object is passed by
     *              reference
     * @return the initial object (by value if it is serializable, otherwise a stub), or
     *         {@code null} if the peer has none
     * @throws IllegalStateException if the connection is closed or the call cannot be sent
     */
    @SuppressWarnings("unchecked")
    public <T> T getInitialRemoteObject(Class<T> clazz) {
        ObjectReference ref = makeRemoteObjectReference(CONTROL_OBJECT_ID, ControlObject.class);
        // getInitialObject() is declared to return Object; tell the stub which type to proxy.
        ref.forcedReturnType = clazz;
        ControlObject co = (ControlObject) ref.getStub();
        return (T) co.getInitialObject();
    }

    /** Returns the export id for {@code o}, registering it if needed, and counts one more reference. */
    private synchronized int findOrPutExport(Object o) {
        Integer known = exportsBack.get(o);
        if (known != null) {
            exports.get(known).refs++;
            return known;
        }
        int id = ctr++;
        ExportedObject entry = new ExportedObject(o);
        entry.refs = 1;
        exports.put(id, entry);
        exportsBack.put(o, id);
        return id;
    }

    /** Drops one peer reference to export {@code id}; forgets the object when none are left. */
    private synchronized void decExportRefCount(int id) {
        if (id == CONTROL_OBJECT_ID) {
            // The control object is never counted up when the peer builds a stub for it, so it
            // must not be counted down either; it stays exported for the connection's lifetime.
            return;
        }
        ExportedObject entry = exports.get(id);
        if (entry != null && --entry.refs == 0) {
            exports.remove(id);
            exportsBack.remove(entry.obj);
        }
    }

    private ObjectReference makeRemoteObjectReference(int id, Class<?> clazz) {
        return new ObjectReference(id, this, clazz);
    }

    /**
     * Looks up a local object by export id.
     *
     * @throws IllegalArgumentException if nothing is exported under {@code id}
     */
    private synchronized Object findExportById(int id) {
        ExportedObject entry = exports.get(id);
        if (entry == null) {
            throw new IllegalArgumentException("No exported object with id " + id);
        }
        return entry.obj;
    }

    /**
     * Registers the observer that will receive the reply of a new call.
     *
     * @return the call id to put into the {@link MethodCall}
     * @throws IllegalStateException if the connection is no longer running
     */
    private synchronized int regMethodCallObserver(Observer o) {
        if (!running) {
            throw new IllegalStateException("Connection is closed");
        }
        returns.put(methodCallCtr, o);
        return methodCallCtr++;
    }

    private synchronized Observer takeReturnObserver(int id) {
        return returns.remove(id);
    }

    /** Wakes every caller still waiting for a reply with a "connection closed" failure. */
    private void failPendingCalls() {
        Observer[] waiting;
        synchronized (this) {
            waiting = returns.values().toArray(new Observer[0]);
            returns.clear();
        }
        for (Observer waiter : waiting) {
            Throw closed = new Throw();
            closed.errorMessage = "Connection closed before the remote call returned";
            closed.exceptionType = IllegalStateException.class;
            waiter.update(null, closed);
        }
    }

    /**
     * Turns a received value back into something usable locally.
     *
     * @param o          value as read from the stream
     * @param proxyClass declared type at the use site; used as the stub interface when {@code o}
     *                   refers to an object living on the peer
     */
    private Object decode(Serializable o, Class<?> proxyClass) {
        if (o instanceof NullReference) {
            return null;
        }
        if (o instanceof RemoteReference) {
            // "Remote" from the sender's point of view: this is one of ours.
            return findExportById(((RemoteReference) o).id);
        }
        if (o instanceof LocalReference) {
            // "Local" to the sender: build a stub for the peer's object.
            return makeRemoteObjectReference(((LocalReference) o).id, proxyClass).getStub();
        }
        // Normal serializable
        return o;
    }

    /** Converts a local value into its wire form: by value if serializable, else by reference. */
    private Serializable encode(Object o) {
        if (o == null) {
            return new NullReference();
        }
        if (o instanceof ObjectReference.RemotedObject) {
            // A stub for one of the peer's objects: send back the id the peer gave us.
            RemoteReference back = new RemoteReference();
            back.id = ((ObjectReference.RemotedObject) o).getObjectId();
            return back;
        }
        if (o instanceof Serializable) {
            return (Serializable) o;
        }
        // A plain local object: export it and send a reference.
        LocalReference ref = new LocalReference();
        ref.id = findOrPutExport(o);
        return ref;
    }

    /**
     * Writes one message to the peer.
     *
     * @return {@code true} if the message was written, {@code false} if the stream failed (the
     *         failure is printed, not thrown, so best-effort senders can ignore it)
     */
    private synchronized boolean sendMessage(Serializable s) {
        try {
            oos.writeObject(s);
            // Forget what has been written: otherwise an argument object sent a second time is
            // transmitted as a back-reference to its first (stale) state, and both streams keep
            // every message alive forever.
            oos.reset();
            // The caller usually blocks for a reply next, so do not leave the message in a buffer.
            oos.flush();
            return true;
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }

    /** Method call &amp; return dispatch loop. */
    @Override
    public void run() {
        try {
            while (running) {
                Object message;
                try {
                    message = ois.readObject();
                } catch (Exception e) {
                    if (running) {
                        e.printStackTrace();
                    }
                    break;
                }
                if (message == null) {
                    continue;
                }
                System.err.println("Received: " + message.getClass().getSimpleName());
                if (message instanceof MethodCall) {
                    dispatchCall((MethodCall) message);
                } else if (message instanceof MethodReturn) {
                    if (!deliverReply(((MethodReturn) message).callId, message)) {
                        break;
                    }
                } else if (message instanceof Throw) {
                    if (!deliverReply(((Throw) message).callId, message)) {
                        break;
                    }
                } else if (message instanceof DecrementReference) {
                    decExportRefCount(((DecrementReference) message).id);
                }
            }
        } finally {
            // Nobody will read replies any more; do not leave callers blocked forever.
            running = false;
            failPendingCalls();
        }
    }

    /** Hands a reply to the caller waiting on {@code callId}; {@code false} if nobody is. */
    private boolean deliverReply(int callId, Object reply) {
        Observer waiter = takeReturnObserver(callId);
        if (waiter == null) {
            return false;
        }
        waiter.update(null, reply);
        return true;
    }

    /**
     * Resolves and runs an incoming call. The peer always gets an answer: either a
     * {@link MethodReturn} or a {@link Throw}, including when the call cannot be dispatched.
     */
    private void dispatchCall(final MethodCall mc) {
        try {
            final Object targetObject = decode(mc.target, null);
            final Method m = findInvocableMethod(targetObject.getClass(), mc.methodName, mc.paramTypes);
            final Class<?>[] declared = m.getParameterTypes();
            final Object[] realArgs = new Object[mc.args.length];
            for (int i = 0; i < mc.args.length; i++) {
                realArgs[i] = decode(mc.args[i], declared[i]);
            }
            // While this method call happens, we need to be able to dispatch new calls
            // ...so do it in a new thread.
            new Thread() {
                public void run() {
                    Serializable reply;
                    try {
                        Object res = m.invoke(targetObject, realArgs);
                        MethodReturn mr = new MethodReturn();
                        mr.callId = mc.callId;
                        mr.obj = encode(res);
                        reply = mr;
                    } catch (InvocationTargetException e) {
                        // The target method itself threw: forward that exception.
                        reply = failureReply(mc.callId, e.getTargetException());
                    } catch (Exception e) {
                        e.printStackTrace();
                        reply = failureReply(mc.callId, e);
                    }
                    sendMessage(reply);
                }
            }.start();
        } catch (Exception e) {
            e.printStackTrace();
            sendMessage(failureReply(mc.callId, e));
        }
    }

    private static Throw failureReply(int callId, Throwable cause) {
        Throw th = new Throw();
        th.callId = callId;
        th.errorMessage = cause.getMessage();
        th.exceptionType = cause.getClass();
        return th;
    }

    /**
     * Finds a {@link Method} for {@code name} that can actually be invoked from here. The method
     * found on the runtime class is often declared by a non-public class (anonymous, private
     * nested, JDK-internal), so a declaration on a public supertype is preferred.
     */
    private static Method findInvocableMethod(Class<?> type, String name, Class<?>[] paramTypes)
            throws NoSuchMethodException {
        Method direct = type.getMethod(name, paramTypes);
        if (Modifier.isPublic(direct.getDeclaringClass().getModifiers())) {
            return direct;
        }
        for (Class<?> c = type; c != null; c = c.getSuperclass()) {
            Method viaClass = publicDeclaration(c, name, paramTypes);
            if (viaClass != null) {
                return viaClass;
            }
            for (Class<?> iface : c.getInterfaces()) {
                Method viaInterface = publicDeclaration(iface, name, paramTypes);
                if (viaInterface != null) {
                    return viaInterface;
                }
            }
        }
        try {
            direct.setAccessible(true);
        } catch (RuntimeException ignored) {
            // Not permitted here; invoke() will report the access problem to the caller.
        }
        return direct;
    }

    /** Returns {@code type}'s view of the method if a public type declares it, else {@code null}. */
    private static Method publicDeclaration(Class<?> type, String name, Class<?>[] paramTypes) {
        try {
            Method candidate = type.getMethod(name, paramTypes);
            return Modifier.isPublic(candidate.getDeclaringClass().getModifiers()) ? candidate : null;
        } catch (NoSuchMethodException e) {
            return null;
        }
    }

    // Is there any case where there can be no references to the object on the remoting side,
    // yet the object is travelling back to the hosting side?
    // No, a decref message *must* be serialized after the message itself, as the object
    // reference is actually needed to call the method.

    /**
     * A reference to a remote object; also the invocation handler behind its stub.
     */
    private static class ObjectReference implements InvocationHandler {

        /**
         * The interface implemented by all remote objects
         */
        private static interface RemotedObject {
            int getObjectId();
        }

        /** Collects the reply for one outstanding call and wakes the waiting caller. */
        private static final class MethodCallObserver implements Observer {
            private Object result;
            private boolean gotResult;

            public synchronized void update(Observable o, Object arg) {
                result = arg;
                gotResult = true;
                notifyAll();
            }

            synchronized Object waitForResult() throws InterruptedException {
                while (!gotResult) {
                    wait();
                }
                return result;
            }
        }

        private final ObjectConnection conn;
        private final int id;
        private final Class<?> clazz;

        // This is a hack for the ControlObject
        public Class<?> forcedReturnType;

        public ObjectReference(int id, ObjectConnection fd, Class<?> clazz) {
            this.id = id;
            this.conn = fd;
            this.clazz = clazz;
        }

        /**
         * Builds the proxy standing in for the remote object. It implements the requested type
         * when that is an interface; for any other declared type (for example {@code Object}) it
         * is an opaque handle that can still be passed back to the peer.
         */
        public Object getStub() {
            boolean typed = clazz != null && clazz.isInterface() && clazz != RemotedObject.class;
            Class<?>[] interfaces = typed
                    ? new Class<?>[] { clazz, RemotedObject.class }
                    : new Class<?>[] { RemotedObject.class };
            return Proxy.newProxyInstance(ObjectReference.class.getClassLoader(), interfaces, this);
        }

        /** Tells the peer that this reference is gone so it can release the exported object. */
        @Override
        protected void finalize() throws Throwable {
            try {
                if (id != CONTROL_OBJECT_ID && conn.running) {
                    DecrementReference dRef = new DecrementReference();
                    dRef.id = id;
                    conn.sendMessage(dRef);
                }
            } finally {
                super.finalize();
            }
        }

        /**
         * Forwards a call on the stub to the peer and blocks until the reply arrives.
         *
         * @throws Throwable the exception raised by the remote method, rebuilt locally;
         *                   {@link IllegalStateException} if the connection is closed, the call
         *                   cannot be sent, or the wait is interrupted
         */
        public Object invoke(Object proxy, final Method method, Object[] args) throws Throwable {
            if (method.getName().equals("getObjectId")) {
                return id;
            }
            MethodCall mc = new MethodCall();
            mc.target = new RemoteReference();
            mc.target.id = id;
            mc.paramTypes = method.getParameterTypes();
            mc.methodName = method.getName();
            Object[] actual = (args == null) ? new Object[0] : args;
            mc.args = new Serializable[actual.length];
            for (int i = 0; i < actual.length; i++) {
                mc.args[i] = conn.encode(actual[i]);
            }

            MethodCallObserver rob = new MethodCallObserver();
            // Register the observer, grabbing a handle to use with this method call
            mc.callId = conn.regMethodCallObserver(rob);
            if (!conn.sendMessage(mc)) {
                // Nothing was sent, so no reply will ever come.
                conn.takeReturnObserver(mc.callId);
                throw new IllegalStateException("Failed to send " + mc + " to the remote side");
            }

            // Pause this thread, waiting for the return
            Object result;
            try {
                result = rob.waitForResult();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for " + mc + " to return", e);
            }

            if (result instanceof Throw) {
                throw rebuildException((Throw) result);
            } else if (result instanceof MethodReturn) {
                MethodReturn mr = (MethodReturn) result;
                Class<?> expected = (forcedReturnType != null) ? forcedReturnType : method.getReturnType();
                return conn.decode(mr.obj, expected);
            } else {
                throw new RuntimeException("Unknown message type routed to MethodCallObserver");
            }
        }

        /**
         * Recreates the remote exception: with its message if the type has a public
         * {@code (String)} constructor, otherwise through its public no-arg constructor, and as a
         * plain {@link RuntimeException} naming the type if neither works.
         */
        private static Throwable rebuildException(Throw th) {
            Class<?> type = th.exceptionType;
            if (type != null && Throwable.class.isAssignableFrom(type)) {
                try {
                    // getConstructor throws (it never returns null) when there is no match.
                    return (Throwable) type.getConstructor(String.class).newInstance(th.errorMessage);
                } catch (Exception ignored) {
                    // No usable (String) constructor; try the no-arg one.
                }
                try {
                    return (Throwable) type.getConstructor().newInstance();
                } catch (Exception ignored) {
                    // No usable no-arg constructor either; fall back below.
                }
            }
            String typeName = (type == null) ? "unknown exception" : type.getName();
            return new RuntimeException("Remote call failed with " + typeName + ": " + th.errorMessage);
        }
    }

    /* ---------------- Messages ---------------- */

    /**
     * Decrement a reference to a remote object
     */
    public static class DecrementReference implements Serializable {
        public int id;

        public String toString() {
            return "DecrementReference[" + id + "]";
        }
    }

    /**
     * A Method call to a remote object
     */
    public static class MethodCall implements Serializable {
        /**
         * Handle to identify this method call
         */
        public int callId;
        /**
         * Target object of this method call
         */
        public RemoteReference target;
        /**
         * Name of method
         */
        public String methodName;
        /**
         * Method parameter types
         */
        public Class[] paramTypes;
        /**
         * Method arguments
         */
        public Serializable[] args;

        public String toString() {
            return "MethodCall#" + callId + "[" + methodName + "]";
        }
    }

    /**
     * A Method return from a remote object
     */
    public static class MethodReturn implements Serializable {
        /**
         * Handle used in method call
         */
        public int callId;
        /**
         * Object returned
         */
        public Serializable obj;
    }

    /**
     * Exception throw from a remote object
     */
    public static class Throw implements Serializable {
        public int callId;
        public String errorMessage;
        public Class<?> exceptionType;

        public String toString() {
            return "Exception[" + errorMessage + "]";
        }
    }

    /* ---------------- References ---------------- */

    /**
     * Object on the sender's side
     */
    public static class LocalReference implements Serializable {
        public int id;

        public String toString() {
            return "LocalReference[" + id + "]";
        }
    }

    /**
     * Object on the receiver's side
     */
    public static class RemoteReference implements Serializable {
        public int id;

        public String toString() {
            return "RemoteReference[" + id + "]";
        }
    }

    /**
     * A Null Reference
     */
    public static class NullReference implements Serializable {
        public String toString() {
            return "NullReference";
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment