Skip to content

Instantly share code, notes, and snippets.

@joshmarshall
Last active August 3, 2018 07:34
Show Gist options
  • Save joshmarshall/561bf3bf6c1477d99e7b to your computer and use it in GitHub Desktop.
Save joshmarshall/561bf3bf6c1477d99e7b to your computer and use it in GitHub Desktop.
Tornado wrapper for Cassandra driver futures
from cassandra.cluster import Cluster, OperationTimedOut
from cassandra.decoder import SyntaxException
from tornado.concurrent import Future
from tornado.testing import AsyncTestCase, gen_test
class TornadoCassandra(object):
def __init__(self, session, ioloop):
self._session = session
self._ioloop = ioloop
def execute(self, *args, **kwargs):
tornado_future = Future()
cassandra_future = self._session.execute_async(*args, **kwargs)
self._ioloop.add_callback(
self._callback, cassandra_future, tornado_future)
return tornado_future
def _callback(self, cassandra_future, tornado_future):
try:
# should spend just about no time blocking.
result = cassandra_future.result(timeout=0)
except OperationTimedOut:
return self._ioloop.add_callback(
self._callback, cassandra_future, tornado_future)
except Exception, exc:
return tornado_future.set_exception(exc)
tornado_future.set_result(result)
class TestTornadoCassandra(AsyncTestCase):
def setUp(self):
super(TestTornadoCassandra, self).setUp()
self.cluster = Cluster(["127.0.0.1"])
self.session = self.cluster.connect()
self.session.execute(
"CREATE KEYSPACE IF NOT EXISTS testingfuture WITH REPLICATION = "
"{ 'class': 'SimpleStrategy', 'replication_factor': 1 }")
self.session.execute("USE testingfuture;")
self.session.execute(
"CREATE TABLE IF NOT EXISTS footable (\n"
"key VARCHAR, \n"
"url VARCHAR, \n"
"PRIMARY KEY (key));")
self.session.execute(
"INSERT INTO footable (key, url) "
"VALUES (%s, %s)", ("foobar", "http://foo.com"))
self.connection = TornadoCassandra(self.session, ioloop=self.io_loop)
def tearDown(self):
super(TestTornadoCassandra, self).tearDown()
self.session.execute("DROP KEYSPACE testingfuture;")
@gen_test
def test_query(self):
results = yield self.connection.execute(
"SELECT key, url FROM footable;")
self.assertEqual(1, len(results))
self.assertEqual(("foobar", "http://foo.com"), results[0])
@gen_test
def test_exception(self):
with self.assertRaises(SyntaxException):
yield self.connection.execute("foobar!")
@gen_test
def test_lots_of_queries(self):
futures = []
count = 2048
for i in range(count):
futures.append(self.connection.execute(
"SELECT key FROM footable;"))
results = 0
for future in futures:
yield future
results += 1
self.assertEqual(count, results)
@tantra35
Copy link

tantra35 commented Mar 22, 2017

Hm, this is looks like synchronous solution which work at the end of the tornado ioloop. IMHO much better write something like this:

from cassandra.cluster import OperationTimedOut
from tornado.concurrent import Future

class TornadoCassandra(object):
    def __init__(self, session):
        self._session = session

    def execute(self, *args, **kwargs):
        tornado_future = Future()
        cassandra_future = self._session.execute_async(*args, **kwargs)

        cassandra_future.add_callbacks(
                  lambda _result: tornado_future.set_result(_result),
                  lambda _exception: tornado_future.set_exception(_exception)
        )

        return tornado_future

@nicholasamorim
Copy link

Hm, this is looks like synchronous solution which work at the end of the tornado ioloop

@tantra35, could you explain why you see it like that? I'm just curious as both solution do essentially the same thing.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment