Skip to content

Instantly share code, notes, and snippets.

@thobbs
Forked from arodrime/Datastax Python driver with multiprocessing.Pool
Last active August 29, 2015 13:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save thobbs/9491892 to your computer and use it in GitHub Desktop.
Save thobbs/9491892 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
import logging
# Root-logger configuration: one stream handler with a timestamped format,
# attached to the root logger, which is set to INFO.
handler = logging.StreamHandler()
handler.setFormatter(
    logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s")
)

log = logging.getLogger()
log.addHandler(handler)
log.setLevel('INFO')
from multiprocessing import Pool
from cassandra.cluster import Cluster
from cassandra.io.libevreactor import LibevConnection
# Name of the scratch keyspace that main() drops (if it already exists),
# recreates, and populates with test rows.
KEYSPACE = "testkeyspace"
def do_stuff(procnum):
    """Worker task: open a fresh Cassandra connection and read the test table.

    main() submits this function to a ``multiprocessing.Pool``. Each call
    builds its own ``Cluster`` (with ``LibevConnection`` as the connection
    class), runs the same ``SELECT`` ten times, logs the number of rows
    fetched each time, and shuts the cluster down.

    Args:
        procnum: Integer label for this task; used only in log messages.
    """
    # The single-letter prints are progress markers showing how far the
    # connection setup got before any stall.
    print('a')
    log.info("Starting process %d", procnum)
    cluster = Cluster(['127.0.0.1'])
    cluster.connection_class = LibevConnection
    try:
        print('b')
        session = cluster.connect()
        print('c')

        # Build the query from KEYSPACE so it stays in sync with the
        # keyspace that main() creates.
        query = "SELECT * FROM %s.mytable" % KEYSPACE
        for _ in range(10):
            rows = session.execute(query)
            # Materialize before counting so this works whether execute()
            # hands back a plain list or an iterable result set.
            log.info("Process %d fetched %d rows", procnum, len(list(rows)))
    finally:
        # Always release the driver's connections, even if connect() or a
        # query raised.
        cluster.shutdown()
def main():
    """Rebuild the test keyspace, load 10000 rows, then read them in a pool.

    Steps:
      1. Connect to 127.0.0.1, drop ``KEYSPACE`` if it is listed in
         ``system.schema_keyspaces``, and recreate it together with
         ``mytable``.
      2. Insert 10000 rows through a prepared statement, logging every
         1000th insert.
      3. Shut the cluster down, then start a two-process ``Pool`` and submit
         four ``do_stuff`` tasks, waiting for each result in turn.

    An exception raised by a worker task propagates out of ``future.get()``.
    """
    cluster = Cluster(['127.0.0.1'])
    cluster.connection_class = LibevConnection
    try:
        session = cluster.connect()

        rows = session.execute("SELECT keyspace_name FROM system.schema_keyspaces")
        if any(row[0] == KEYSPACE for row in rows):
            log.info("dropping existing keyspace...")
            session.execute("DROP KEYSPACE " + KEYSPACE)

        log.info("creating keyspace...")
        session.execute("""
            CREATE KEYSPACE %s
            WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '2' }
            """ % KEYSPACE)

        log.info("setting keyspace...")
        session.set_keyspace(KEYSPACE)

        log.info("creating table...")
        session.execute("""
            CREATE TABLE mytable (
                thekey text,
                col1 text,
                col2 text,
                PRIMARY KEY (thekey, col1)
            )
            """)

        prepared = session.prepare("""
            INSERT INTO mytable (thekey, col1, col2)
            VALUES (?, ?, ?)
            """)

        for i in range(10000):
            if i % 1000 == 0:
                log.info("Insering row %d", i)
            session.execute(prepared.bind(("key%d" % i, 'b', 'b')))
    finally:
        # Release the parent's connections even if setup failed. This also
        # runs before the pool below is created -- presumably so worker
        # processes do not inherit live driver state; keep this ordering.
        cluster.shutdown()

    futures = []
    pool = Pool(processes=2)
    try:
        for i in range(4):
            print(i)
            futures.append(pool.apply_async(do_stuff, args=(i,)))

        for i, future in enumerate(futures):
            log.info("Waiting for process %d to complete", i)
            future.get()
            log.info("Process %d completed", i)
    finally:
        # Close and reap the workers even when a task raised, so the pool
        # is not left dangling.
        pool.close()
        pool.join()
# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment