Skip to content

Instantly share code, notes, and snippets.

@alex8224
Created December 29, 2015 12:33
Show Gist options
  • Save alex8224/816eeb17f93606089ff1 to your computer and use it in GitHub Desktop.
Save alex8224/816eeb17f93606089ff1 to your computer and use it in GitHub Desktop.
greenlet in tornado
# -*- coding:utf-8 -*-
from __future__ import absolute_import
import sys
import greenlet
import socket
import time
from tornado.iostream import IOStream
from tornado.gen import coroutine, Return
from tornado.concurrent import Future
from tornado.ioloop import IOLoop
def synclize(func):
    """Make a Tornado generator coroutine callable in blocking style.

    ``func`` is wrapped with ``tornado.gen.coroutine``.  The returned wrapper
    starts that coroutine, switches to the parent of the calling greenlet
    while the coroutine is pending, and is resumed by an IOLoop callback once
    the coroutine's future completes.

    Returns:
        A wrapper that returns the coroutine's result, or raises (inside the
        calling greenlet) whatever exception the coroutine finished with.

    Raises:
        RuntimeError: if the wrapper is called from a greenlet that has no
            parent, because there is then nothing to switch to while waiting.
    """
    from functools import wraps

    coro = coroutine(func)

    @wraps(func)
    def _sync_call(*args, **kwargs):
        child_gr = greenlet.getcurrent()
        main = child_gr.parent
        if main is None:
            # Fail before the coroutine is started instead of crashing on
            # ``None.switch()`` after it is already running.
            raise RuntimeError(
                "%s() must be called from a child greenlet"
                % getattr(func, "__name__", "synclized function")
            )

        def callback(future):
            # ``future.result()`` re-raises the stored exception on every
            # Future flavour; ``Future.exc_info()`` is not available on
            # asyncio-backed futures, and failing here would leave the
            # waiting greenlet suspended forever.
            try:
                result = future.result()
            except BaseException:  # forwarded to the waiter, not swallowed
                failure = sys.exc_info()
            else:
                failure = None

            if failure is not None:
                child_gr.throw(*failure)
            else:
                child_gr.switch(result)

        IOLoop.instance().add_future(coro(*args, **kwargs), callback)
        # Hand control back to the parent until ``callback`` resumes us.
        return main.switch()

    return _sync_call
def spawn(callable_obj, *args, **kwargs):
    """Run ``callable_obj(*args, **kwargs)`` in a new greenlet.

    The greenlet is switched to immediately; control comes back here when it
    finishes or switches back to its parent.

    Returns:
        A ``Future`` that receives the callable's return value, or the
        ``Exception`` it raised.
    """
    outcome = Future()

    def _run():
        try:
            outcome.set_result(callable_obj(*args, **kwargs))
        except Exception as exc:
            outcome.set_exception(exc)

    worker = greenlet.greenlet(_run)
    worker.switch()
    return outcome
def greentask(func):
    """Decorator: run every call to ``func`` in its own greenlet.

    Calling the decorated function starts a new greenlet that executes
    ``func(*args, **kwargs)`` and immediately returns a ``Future``.

    Returns:
        A wrapper whose return value is a ``Future`` resolved with the
        result of ``func``, or failed with the ``Exception`` it raised.
    """
    def call_func(*args, **kwargs):
        future = Future()

        def inner_call():
            try:
                result = func(*args, **kwargs)
                future.set_result(result)
            except Exception as ex:
                # Report the failure as a failure; storing the exception
                # with set_result() would make it look like a normal result.
                future.set_exception(ex)

        greenlet.greenlet(inner_call).switch()
        return future

    return call_func
class Waiter(object):
    """Rendezvous point between the creating greenlet and its parent.

    The greenlet that constructs the waiter is remembered together with its
    parent.  ``get`` suspends by switching to that parent; ``switch`` resumes
    the remembered greenlet with a value.
    """

    def __init__(self):
        current = greenlet.getcurrent()
        self._greenlet = current
        self._main = current.parent

    def switch(self, value):
        """Resume the greenlet that created this waiter, passing ``value``."""
        waiting = self._greenlet
        waiting.switch(value)

    def get(self):
        """Switch to the parent greenlet and return what we are resumed with."""
        parent = self._main
        return parent.switch()
def sleep(seconds):
    """Suspend the calling greenlet for ``seconds`` seconds.

    A timeout is registered on the current IOLoop and control is switched to
    the parent greenlet; the timeout callback switches back here.

    Args:
        seconds: delay as a number of seconds.
    """
    waiter = Waiter()
    loop = IOLoop.current()
    # Absolute add_timeout deadlines are measured on the IOLoop.time() scale,
    # which is not necessarily time.time() when the loop uses a custom clock.
    # The value handed to waiter.switch is discarded below, so None suffices.
    loop.add_timeout(loop.time() + seconds, waiter.switch, None)
    waiter.get()
class AsyncSocket(object):
    """Socket-like facade over a Tornado ``IOStream``.

    ``connect``, ``sendall`` and ``read`` are generator coroutines wrapped by
    ``synclize``, so they look like blocking calls to the calling greenlet
    while the IOLoop performs the actual I/O.
    """

    def __init__(self, sock):
        self._iostream = IOStream(sock)

    @synclize
    def connect(self, address):
        """Connect the underlying stream to ``address``."""
        stream = self._iostream
        yield stream.connect(address)

    @synclize
    def sendall(self, buff):
        """Write ``buff`` to the underlying stream."""
        stream = self._iostream
        yield stream.write(buff)

    @synclize
    def read(self, nbytes):
        """Read ``nbytes`` bytes from the stream and return them."""
        data = yield self._iostream.read_bytes(nbytes)
        raise Return(data)

    def close(self):
        """Close the underlying stream."""
        stream = self._iostream
        stream.close()

    def set_nodelay(self, flag):
        """Forward ``flag`` to the stream's ``set_nodelay``."""
        stream = self._iostream
        stream.set_nodelay(flag)
def async_connect(self, sock=None):
    """Open the connection over an ``AsyncSocket`` instead of a blocking one.

    Written as a method body: ``patch_pymysql`` assigns it to
    ``Connection.connect``, so ``self`` is the connection object.  It creates
    a TCP socket, connects to ``(self.host, self.port)``, runs the server
    handshake and authentication, then applies ``sql_mode``,
    ``init_command`` and ``autocommit_mode`` when they are set.

    Args:
        sock: accepted for signature compatibility only; as in the original
            implementation it is ignored and a fresh socket is always made.

    Raises:
        Any exception raised while connecting or initialising the session.
        The socket is closed first, then the exception is re-raised as is.
    """
    try:
        raw_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket = AsyncSocket(raw_sock)
        self.socket.connect((self.host, self.port))
        self.socket.set_nodelay(True)
        # The connection reads its packets through the same wrapper.
        self._rfile = self.socket
        self._get_server_information()
        self._request_authentication()

        if self.sql_mode is not None:
            c = self.cursor()
            c.execute("SET sql_mode=%s", (self.sql_mode,))
            c.close()

        if self.init_command is not None:
            c = self.cursor()
            c.execute(self.init_command)
            c.close()
            self.commit()

        if self.autocommit_mode is not None:
            self.autocommit(self.autocommit_mode)
    except Exception:
        # Release the socket on every failure, not only socket.error, so a
        # failed handshake or init command does not leak an open stream.
        # getattr keeps a missing attribute from masking the real error.
        opened = getattr(self, "socket", None)
        if opened:
            opened.close()
        raise
def patch_pymysql():
    """Replace ``Connection.connect`` in pymysql with ``async_connect``.

    The module is looked up in ``sys.modules``, so ``pymysql`` must already
    have been imported; otherwise the lookup raises ``KeyError``.
    """
    connection_cls = sys.modules["pymysql"].connections.Connection
    connection_cls.connect = async_connect
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment