Skip to content

Instantly share code, notes, and snippets.

@thobbs
Created March 7, 2014 19:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save thobbs/9418453 to your computer and use it in GitHub Desktop.
Save thobbs/9418453 to your computer and use it in GitHub Desktop.
Multiprocess Example
#!/usr/bin/env python
import logging
# Send every record at DEBUG and above, from any logger, through a single
# stream handler attached to the root logger. The format shows timestamp,
# level, logger name and message.
handler = logging.StreamHandler()
handler.setFormatter(
    logging.Formatter(fmt="%(asctime)s [%(levelname)s] %(name)s: %(message)s")
)
log = logging.getLogger()
log.addHandler(handler)
log.setLevel('DEBUG')
from multiprocessing import Process
from cassandra.cluster import Cluster
from cassandra.io.libevreactor import LibevConnection
# Name of the keyspace that main() drops (if it already exists), re-creates,
# and then selects on its session before creating and filling the table.
KEYSPACE = "testkeyspace"
def do_stuff(procnum):
    """Worker entry point: open a private connection and read the table.

    Builds its own ``Cluster`` for 127.0.0.1 using ``LibevConnection``,
    runs the same full-table ``SELECT`` ten times, and logs how many rows
    each query returned. The cluster is shut down on the way out, even if
    connecting or querying raises.

    Args:
        procnum: Integer label for this worker; only used in log messages.
    """
    log.info("Starting process %d", procnum)
    cluster = Cluster(['127.0.0.1'])
    cluster.connection_class = LibevConnection
    try:
        session = cluster.connect()
        # Use the shared constant so the worker reads the keyspace main()
        # actually created.
        query = "SELECT * FROM %s.mytable" % KEYSPACE
        for _ in range(10):
            # list() makes the count work whether execute() hands back a
            # plain list or an iterable result object without __len__.
            rows = list(session.execute(query))
            log.info("Process %d fetched %d rows", procnum, len(rows))
    finally:
        # Always release the connection, including on failure.
        cluster.shutdown()
def _reset_schema(session):
    """Drop KEYSPACE if it exists, re-create it, and create an empty ``mytable``."""
    # NOTE(review): system.schema_keyspaces looks like the pre-3.0 location
    # of keyspace metadata -- confirm against the target Cassandra version.
    rows = session.execute("SELECT keyspace_name FROM system.schema_keyspaces")
    if any(row[0] == KEYSPACE for row in rows):
        log.info("dropping existing keyspace...")
        session.execute("DROP KEYSPACE " + KEYSPACE)

    log.info("creating keyspace...")
    session.execute("""
        CREATE KEYSPACE %s
        WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '2' }
        """ % KEYSPACE)

    log.info("setting keyspace...")
    session.set_keyspace(KEYSPACE)

    log.info("creating table...")
    session.execute("""
        CREATE TABLE mytable (
            thekey text,
            col1 text,
            col2 text,
            PRIMARY KEY (thekey, col1)
        )
        """)


def _insert_rows(session):
    """Insert 10000 rows keyed ``key0`` .. ``key9999`` via a prepared statement."""
    prepared = session.prepare("""
        INSERT INTO mytable (thekey, col1, col2)
        VALUES (?, ?, ?)
        """)
    for i in range(10000):
        if i % 1000 == 0:
            log.info("Inserting row %d", i)
        session.execute(prepared.bind(("key%d" % i, 'b', 'b')))


def main():
    """Rebuild the test keyspace, fill it, then query it from four processes.

    The parent connects to 127.0.0.1, resets the schema, loads the rows and
    shuts its cluster down. Only after that does it start four ``do_stuff``
    worker processes and wait for all of them to finish. Each worker opens
    its own connection.
    """
    cluster = Cluster(['127.0.0.1'])
    cluster.connection_class = LibevConnection
    try:
        session = cluster.connect()
        _reset_schema(session)
        _insert_rows(session)
    finally:
        # Release the parent's connection even if setup fails. On success
        # this still runs before any worker process is started, as in the
        # original ordering.
        cluster.shutdown()

    procs = [Process(target=do_stuff, args=(i,)) for i in range(4)]
    for proc in procs:
        proc.start()
    for proc in procs:
        proc.join()
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment