Skip to content

Instantly share code, notes, and snippets.

@usbuild
Created May 7, 2015 13:25
Show Gist options
  • Save usbuild/341b1ce7ca03de53f007 to your computer and use it in GitHub Desktop.
Save usbuild/341b1ce7ca03de53f007 to your computer and use it in GitHub Desktop.
from google.protobuf import service
from test_pb2 import TestService_Stub, TestService, TestMessage, TestClientService, TestClientService_Stub
import asyncore, threading, socket, struct
class ServerService(TestService):
    """Server-side TestService that answers through a TestClientService stub."""

    def setStub(self, ch):
        """Create the stub used to call back to the peer over channel `ch`."""
        self.stub = TestClientService_Stub(ch)

    def Test(self, controller, reply, done):
        """Read `reply.name` as an integer, add one, and send it to the peer.

        The incremented value is sent as the `name` of a new TestMessage via
        the stub's ClientTest call. `controller` and `done` are not used.
        """
        incremented = int(reply.name) + 1
        outgoing = TestMessage()
        outgoing.name = str(incremented)
        self.stub.ClientTest(None, outgoing, None)
class ClientService(TestClientService):
    """Client-side TestClientService that prints whatever it is sent."""

    def setStub(self, ch):
        """Create the stub used to call the peer's TestService over `ch`."""
        self.stub = TestService_Stub(ch)

    def ClientTest(self, controller, reply, done):
        """Print the received message; `controller` and `done` are not used."""
        # Parenthesised single-argument form prints the same text as the
        # bare print statement.
        print(reply)
class RpcChannel(service.RpcChannel):
    """RPC channel that frames each request and queues it on a handler."""

    def __init__(self, sock):
        """Remember `sock`, the object whose sendData() receives the frames."""
        self._sock = sock

    def CallMethod(self, method_descriptor, rpc_controller, request, response_class, done):
        """Serialize `request` into one frame and hand it to the handler.

        Frame layout (little-endian): a 4-byte unsigned length covering the
        method index and the payload, a 2-byte unsigned method index, then
        the serialized request. `rpc_controller`, `response_class` and `done`
        are not used.
        """
        payload = request.SerializeToString()
        # The length prefix counts the 2-byte method index plus the payload.
        length_field = struct.pack('<I', len(payload) + 2)
        method_field = struct.pack('<H', method_descriptor.index)
        self._sock.sendData(length_field + method_field + payload)
class TestHandler(asyncore.dispatcher):
    """asyncore dispatcher that exchanges length-prefixed RPC frames.

    One frame on the wire (little-endian):
        4 bytes  unsigned length of everything that follows
        2 bytes  unsigned index into the service descriptor's method list
        N bytes  serialized request message

    Incoming bytes are accumulated in `rbuf` and only complete frames are
    dispatched, so frames that arrive split across reads or several frames
    that arrive in one read are both handled.
    """

    def __init__(self, serviceObj, sock=None):
        """Wrap `sock` (if given) and wire `serviceObj` to an RpcChannel.

        `serviceObj.setStub()` is called with the new channel so the service
        can send calls back over this same connection.
        """
        asyncore.dispatcher.__init__(self, sock)
        # Byte-string buffers; b"" is the same object type as "" on Python 2.
        self.wbuf = b""
        self.rbuf = b""
        self.serviceObj = serviceObj
        self.channel = RpcChannel(self)
        self.serviceObj.setStub(self.channel)

    def handle_read(self):
        """Read available bytes and dispatch every complete frame."""
        chunk = self.recv(8192)
        if not chunk:
            # recv() yields an empty string when the peer has closed; there
            # is nothing to parse.
            return
        self._consume(chunk)

    def _consume(self, chunk):
        """Append `chunk` to the read buffer and dispatch complete frames."""
        self.rbuf += chunk
        while len(self.rbuf) >= 4:
            (total_len,) = struct.unpack_from('<I', self.rbuf)
            if total_len < 2:
                # A frame must at least carry the 2-byte method index; the
                # stream cannot be resynchronised after a bad length.
                self.rbuf = b""
                raise ValueError("malformed frame length: %d" % total_len)
            frame_end = 4 + total_len
            if len(self.rbuf) < frame_end:
                # Frame is incomplete; wait for more data.
                break
            frame = self.rbuf[4:frame_end]
            self.rbuf = self.rbuf[frame_end:]
            self._dispatch(frame)

    def _dispatch(self, frame):
        """Decode one frame body (method index + payload) and invoke it."""
        (method_idx,) = struct.unpack_from('<H', frame)
        descriptor = self.serviceObj.GetDescriptor()
        method = descriptor.methods[method_idx]
        request = self.serviceObj.GetRequestClass(method)()
        request.ParseFromString(frame[2:])
        self.serviceObj.CallMethod(method, None, request, None)

    def handle_close(self):
        """Close the socket when the connection is closed."""
        self.close()

    def writable(self):
        """Report write interest only while there is queued output."""
        return len(self.wbuf) > 0

    def readable(self):
        """Always report read interest."""
        return True

    def handle_write(self):
        """Send as much queued output as possible and keep the remainder."""
        sent = self.send(self.wbuf)
        self.wbuf = self.wbuf[sent:]

    def sendData(self, data):
        """Queue `data` for sending; handle_write() performs the actual send."""
        self.wbuf += data

    def connectT(self, host, port):
        """Create a TCP socket and start connecting to (`host`, `port`)."""
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.connect((host, port))
class Server(asyncore.dispatcher):
    """Listening dispatcher that gives each connection its own TestHandler."""

    def __init__(self):
        """Open a reusable TCP listener on 127.0.0.1:8889 with backlog 5."""
        asyncore.dispatcher.__init__(self)
        listen_addr = ("127.0.0.1", 8889)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.set_reuse_addr()
        self.bind(listen_addr)
        self.listen(5)

    def handle_accept(self):
        """Accept a pending connection and attach a fresh ServerService."""
        accepted = self.accept()
        if accepted is None:
            # accept() produced no connection; nothing to do.
            return
        conn, _peer = accepted
        TestHandler(ServerService(), conn)
# Demo driver: start the listener, run the asyncore loop in a background
# thread, then connect a client and issue one Test call.

# Instantiate the listener before the loop thread is started.
Server()
# Run asyncore.loop in its own thread so the main thread can act as client.
thd = threading.Thread(target=asyncore.loop, args=())
thd.start()
# Client side: a ClientService wired to a handler connected to the server.
serviceObj2 = ClientService()
client = TestHandler(serviceObj2)
client.connectT("127.0.0.1", 8889)
# Send a Test request carrying the name "12345".
msg = TestMessage()
msg.name = "12345"
serviceObj2.stub.Test(None, msg, None)
# Block until the loop thread finishes.
thd.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment