Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
pypy+stackless+twisted示例
pypy+stackless+twisted 示例
#!/usr/bin/env python
# -*- coding=utf-8 -*-
# Author: likebeta <ixxoo.me@gmail.com>
# Create: 2015-10-11
import sys
import json
import struct
import logging
from twisted.python import log
from twisted.internet import defer
from twisted.internet import reactor
from twisted.internet.protocol import Protocol
from twisted.internet.protocol import ClientFactory
from twisted.internet.protocol import connectionDone
class SimpleProtocol(Protocol):
    """Client protocol that exchanges length-prefixed JSON messages.

    Each frame is a 4-byte unsigned length header (``struct`` format ``'I'``)
    followed by that many bytes of JSON text.  Decoded messages are passed to
    ``self.player.on_msg``; the ``player`` attribute is assigned by
    ``SimpleFactory.buildProtocol``.
    """

    def __init__(self):
        # Received bytes that do not yet form a complete frame.
        self._data = ''

    def dataReceived(self, data):
        """Buffer *data* and dispatch every complete frame to the player."""
        self._data += data
        while len(self._data) > 4:
            msg_len = struct.unpack('I', self._data[:4])[0]
            if msg_len > len(self._data) - 4:
                # The body has not fully arrived yet; keep what we have.
                return
            body_data = self._data[4:4 + msg_len]
            self._data = self._data[4 + msg_len:]
            msg = json.loads(body_data)
            log.msg("====recv====, len=%d body=%s" % (msg_len, body_data))
            self.player.on_msg(msg)

    def sendMsg(self, msg):
        """Serialize *msg* to JSON and write it as one frame.

        Returns True when the frame was handed to the transport, False when
        the protocol is not connected or the write raised.
        """
        if not self.connected:
            log.msg('not connect, cannot send msg %s' % msg['cmd'])
            return False
        data = json.dumps(msg)
        header_data = struct.pack('I', len(data))
        try:
            self.transport.write(header_data + data)
            log.msg("====send====, len=%d body=%s" % (len(data), data))
            return True
        except Exception:
            log.msg(msg['cmd'], logLevel=logging.ERROR)
            return False

    def stop(self):
        """Actively close the connection."""
        log.msg('active close connect')
        self.transport.loseConnection()

    def connectionMade(self):
        """Reset the receive buffer and start the player."""
        self._data = ''
        self.player.run()

    def connectionLost(self, reason=connectionDone):
        """Mark the protocol disconnected, then notify the factory."""
        # Clear the flag before notifying: the factory fires its deferred
        # synchronously, and whatever runs from there should already see this
        # protocol as disconnected.
        self.connected = 0
        self.factory.done(reason)
class SimpleFactory(ClientFactory):
    """Client factory that ties one player object to its connection.

    The deferred passed in is fired at most once: through ``callback`` when
    ``done`` is invoked, or through ``errback`` when the connection attempt
    fails.
    """

    protocol = SimpleProtocol

    def __init__(self, deferred, player):
        self.player = player
        self.deferred = deferred

    def done(self, reason):
        """Fire the pending deferred's callback with *reason*, if any."""
        pending = self.deferred
        if not pending:
            return
        self.deferred = None
        pending.callback(reason)

    def clientConnectionFailed(self, connector, reason):
        """Fire the pending deferred's errback with *reason*, if any."""
        pending = self.deferred
        if not pending:
            return
        self.deferred = None
        pending.errback(reason)

    def buildProtocol(self, addr):
        """Build the protocol and cross-link it with the player."""
        proto = ClientFactory.buildProtocol(self, addr)
        player = self.player
        proto.player = player
        player.protocol = proto
        return proto
class PlayerClient(object):
    """Simulated player that keeps sending a 'hold' request once per second.

    The ``protocol`` attribute is not set here; it is assigned from outside
    (``SimpleFactory.buildProtocol`` does so) before any method is used.
    """

    # Game id included in every request.
    gid = 1

    def __init__(self, userId):
        self.userId = userId

    def close(self):
        """Ask the protocol to close the connection."""
        self.protocol.stop()

    def run(self):
        """Send the first 'hold' request; close if it could not be sent."""
        if not self.req_hold():
            self.close()

    def on_msg(self, msg):
        """Handle a decoded server message; close on an 'error' message."""
        if 'error' in msg:
            self.close()

    def send_to_svrd(self, msg):
        """Send *msg* through the protocol and return its success flag."""
        return self.protocol.sendMsg(msg)

    def req_hold(self):
        """Send a 'hold' request and re-arm the one-second timer.

        Returns the result of the send, or False when the protocol is no
        longer connected (in which case the timer is not re-armed).
        """
        if not self.protocol.connected:
            # Explicit False keeps the return type consistent with the
            # success/failure flag returned below.
            return False
        msg = {
            'cmd': 'hold',
            'param': {
                'userId': self.userId,
                'gameId': self.gid,
            }
        }
        reactor.callLater(1, self.req_hold)
        return self.send_to_svrd(msg)
def done(reason):
    """Log the reason the connection ended, then stop the reactor."""
    log.msg('connect lost:', reason)
    reactor.stop()
def connect_failed(err):
    """Log a failed or closed connection, then stop the reactor.

    A failure whose string form equals that of ``connectionDone`` is reported
    as a graceful close; anything else is logged at ERROR level.
    """
    graceful = str(err) == str(connectionDone)
    if graceful:
        log.msg('connect close gracefully')
    else:
        log.msg(err, logLevel=logging.ERROR)
    reactor.stop()
class __FileLogObserver__(log.FileLogObserver):
    """File log observer with a custom time format and a fixed system tag."""

    timeFormat = '%m-%d %H:%M:%S.%f'

    def emit(self, eventDict):
        """Overwrite the event's 'system' field with '-' and emit it."""
        eventDict.update(system='-')
        log.FileLogObserver.emit(self, eventDict)
if __name__ == '__main__':
    # Send Twisted's log output to stdout through the custom observer.
    observer = __FileLogObserver__(sys.stdout)
    log.startLoggingWithObserver(observer.emit)

    # Open one TCP connection per simulated user id (10000..10100 inclusive).
    # Each factory gets its own fresh Deferred; no callbacks are attached to
    # it in this script.
    for user_id in range(10000, 10101):
        player = PlayerClient(user_id)
        finished = defer.Deferred()
        factory = SimpleFactory(finished, player)
        reactor.connectTCP('127.0.0.1', 9527, factory)

    reactor.run()
#!/usr/bin/env python
# -*- coding=utf-8 -*-
# Author: likebeta <ixxoo.me@gmail.com>
# Create: 2015-10-11
import sys
import socket
import struct
import json
from twisted.internet.protocol import Protocol
from twisted.python import log
import logging
import stackless
from twisted.internet import reactor
from twisted.web import client
class TcpProtocol(Protocol):
    """Server protocol: length-prefixed JSON in, one tasklet per message.

    Each frame is a 4-byte unsigned length header (``struct`` format ``'I'``)
    followed by that many bytes of JSON text.  Every decoded message is
    wrapped in a handler object (see ``makeTasklet``) whose ``run`` method is
    started in a new stackless tasklet.
    """

    def __init__(self):
        # Received bytes that do not yet form a complete frame.
        self._data = ''

    def connectionMade(self):
        """Enable TCP_NODELAY and keepalive; tune keepalive if supported."""
        self.transport.setTcpNoDelay(1)
        self.transport.setTcpKeepAlive(1)
        try:
            sock = self.transport.getHandle()
            sock.setsockopt(socket.SOL_TCP, socket.TCP_KEEPINTVL, 30)
            sock.setsockopt(socket.SOL_TCP, socket.TCP_KEEPCNT, 10)
        except Exception:
            # Best effort only: the message names the options actually set
            # above (the previous text mentioned TCP_KEEPIDLE, which this
            # code never touches).
            log.msg('not support TCP_KEEPINTVL/TCP_KEEPCNT', logLevel=logging.ERROR)

    def sendMessage(self, msg):
        """Serialize *msg* to JSON and write it as one frame.

        Returns True when the frame was handed to the transport; returns
        False when the protocol is not connected or any step raised.
        """
        try:
            data = json.dumps(msg)
            log.msg('==== SEND TCP:', repr(data))
            if self.transport and self.connected:
                header = struct.pack('I', len(data))
                self.transport.write(header + data)
                return True
            log.msg('==== ERROR: cannot connected !! protocol =', self, repr(data), logLevel=logging.ERROR)
        except Exception:
            import traceback
            traceback.print_exc()
            log.msg(msg, logLevel=logging.ERROR)
        return False

    def dataReceived(self, data):
        """Buffer *data* and start a tasklet for every complete frame.

        A frame that cannot be decoded or dispatched is logged and the
        connection is closed.
        """
        self._data += data
        while len(self._data) > 4:
            msg_len = struct.unpack('I', self._data[:4])[0]
            if msg_len > len(self._data) - 4:
                # Partial trailing frame: stop parsing, but still fall
                # through to the scheduling step below.
                break
            body_data = self._data[4:4 + msg_len]
            self._data = self._data[4 + msg_len:]
            try:
                log.msg('==== RECV TCP:', repr(body_data))
                msg = json.loads(body_data)
                tasklet = self.makeTasklet(msg, self)
                stackless.tasklet(tasklet.run)()
            except Exception:
                import traceback
                traceback.print_exc()
                log.msg(body_data, logLevel=logging.ERROR)
                self.transport.loseConnection()
                break
        # Always give the scheduler a turn on the next reactor iteration,
        # whichever way the loop ended, so that tasklets created earlier in
        # this call are not left waiting for unrelated traffic.
        reactor.callLater(0, stackless.schedule)

    def makeTasklet(self, msg, connection):
        """Return the handler object whose ``run`` processes *msg*."""
        return TaskletSimple(msg, connection)
class TaskletSimple(object):
    """Handler for one received message, meant to run in its own tasklet.

    ``run`` is the tasklet entry point.  ``wait_for_deferred`` lets
    ``handle`` block on a Deferred in sequential style by parking the
    tasklet on a stackless channel until the Deferred fires.
    """

    def __init__(self, msg, connection):
        self.msg = msg
        self.connection = connection

    def wait_for_deferred(self, d, tip=None):
        """Block on the return channel until *d* fires; return its result.

        If *d* fails, the failure's exception is raised here instead.
        *tip* is accepted but unused.
        """
        # NOTE(review): if d has already fired, the callback runs
        # synchronously inside addCallbacks and sends on a channel nobody is
        # receiving from yet -- confirm callers only pass pending deferreds.
        try:
            d.addCallbacks(self.__callback, self.__errorback)
            return self._return_channel.receive()
        except Exception:
            import traceback
            traceback.print_exc()
            # Bare raise keeps the original traceback (``raise e`` would
            # restart it at this line on Python 2).
            raise

    def __callback(self, msg):
        """Deferred callback: pass the result to the waiting tasklet."""
        try:
            self._return_channel.send(msg)
        except Exception as e:
            log.msg(str(e), logLevel=logging.ERROR)
            self._return_channel.send_exception(Exception, e)
        if stackless.getcurrent() != self._tasklet_instance:
            stackless.schedule()

    def __errorback(self, fault):
        """Deferred errback: raise the failure inside the waiting tasklet."""
        try:
            self._return_channel.send_exception(fault.type, fault.value)
        except Exception as e:
            log.msg(fault, logLevel=logging.ERROR)
            self._return_channel.send_exception(Exception, e)
        if stackless.getcurrent() != self._tasklet_instance:
            stackless.schedule()

    def run(self):
        """Tasklet entry point: set up the channel and call ``handle``.

        Exceptions raised by ``handle`` are printed and swallowed.
        """
        self._return_channel = stackless.channel()
        current = stackless.getcurrent()
        # Link the tasklet and this handler to each other.
        current._class_instance = self
        self._tasklet_instance = current
        try:
            self.handle()
        except Exception:
            import traceback
            traceback.print_exc()

    def handle(self):
        """Fetch a page over HTTP and reply with the request plus the result."""
        d = client.getPage('http://119.29.29.29/d?dn=www.ixxoo.me')
        result = self.wait_for_deferred(d)
        msg = {
            'cmd': self.msg['cmd'],
            'param': {
                'userId': self.msg['param']['userId'],
                'gameId': self.msg['param']['gameId'],
                'ip': result,
            }
        }
        self.connection.sendMessage(msg)
class __FileLogObserver__(log.FileLogObserver):
    """File log observer that tags each event with the current tasklet's id."""

    timeFormat = '%m-%d %H:%M:%S.%f'

    def emit(self, eventDict):
        """Set 'system' to the id() of the running tasklet and emit the event."""
        current_tasklet = stackless.getcurrent()
        eventDict['system'] = str(id(current_tasklet))
        log.FileLogObserver.emit(self, eventDict)
if __name__ == '__main__':
    # Send Twisted's log output to stdout through the custom observer.
    observer = __FileLogObserver__(sys.stdout)
    log.startLoggingWithObserver(observer.emit)

    from twisted.internet.protocol import ServerFactory

    # Listen on TCP port 9527, building a TcpProtocol per connection.
    factory = ServerFactory()
    factory.protocol = TcpProtocol
    reactor.listenTCP(9527, factory)

    # Run the reactor in a tasklet of its own, then start the scheduler.
    stackless.tasklet(reactor.run)()
    stackless.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.