Skip to content

Instantly share code, notes, and snippets.

@pcmanus
Created December 9, 2015 13:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pcmanus/495d1d6f9418a99147ed to your computer and use it in GitHub Desktop.
Save pcmanus/495d1d6f9418a99147ed to your computer and use it in GitHub Desktop.
Insertion script for blog post example
"""Simple Data loader.
Usage:
  insert.py [-c] [-n=<x>] [-p=<y>]

Options:
  -c      enable compression (disabled by default)
  -n=<x>  total number of rows to insert (default: 1M)
  -p=<y>  number of partitions to insert the rows into (default: 100)
"""
from __future__ import print_function
from uuid import uuid4, uuid1
from docopt import docopt
from cassandra.cluster import Cluster
from random import randint, choice, random
from string import ascii_letters
# Upper bound on the number of in-flight asynchronous inserts: once the pending
# list grows past this, every outstanding future is awaited before continuing.
max_async_query = 1000

# Largest value that fits in a CQL ``int`` column (32-bit signed).
MAX_INT32 = 2 ** 31 - 1


def parse_count(raw, default):
    """Return a command-line count as a positive int.

    docopt hands option arguments back as strings (or ``None`` when the
    option was not given), so they must be converted before any arithmetic.

    raw     -- the value docopt produced for the option (str or None)
    default -- the int to use when the option was not supplied

    Raises ValueError if ``raw`` is not a positive integer.
    """
    if raw is None:
        return default
    value = int(raw)
    if value <= 0:
        raise ValueError('expected a positive integer, got %r' % (raw,))
    return value


def rows_per_partition_counts(rows, partitions):
    """Split ``rows`` across ``partitions`` as evenly as possible.

    Returns a list of ``partitions`` ints that sums to exactly ``rows``; the
    first ``rows % partitions`` partitions receive one extra row so that no
    row is dropped when the division is not exact.
    """
    base, extra = divmod(rows, partitions)
    return [base + 1 if index < extra else base for index in range(partitions)]


if __name__ == '__main__':
    args = docopt(__doc__)
    use_compression = args['-c']
    rows = parse_count(args['-n'], 1000000)
    partitions = parse_count(args['-p'], 100)
    # Explicit check rather than ``assert`` so it still runs under ``python -O``.
    if rows <= partitions:
        raise SystemExit('-n (%d) must be greater than -p (%d)' % (rows, partitions))

    cluster = Cluster()
    try:
        session = cluster.connect()
        session.execute("CREATE KEYSPACE ks WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '1' };")
        # An empty compressor class name is passed when -c was not given.
        session.execute("""
            CREATE TABLE ks.events (
                id uuid,
                received_at timeuuid,
                property_1 int,
                property_2 text,
                property_3 float,
                PRIMARY KEY (id, received_at)
            ) WITH compression = {'sstable_compression': '%s'}
        """ % ('LZ4Compressor' if use_compression else ''))
        insert = session.prepare('INSERT INTO ks.events (id, received_at, property_1, property_2, property_3) VALUES (?, ?, ?, ?, ?)')

        pending = []
        for row_count in rows_per_partition_counts(rows, partitions):
            # One random partition key per partition; uuid1() gives each row
            # within it a distinct time-based clustering value.
            key = uuid4()
            for _row in range(row_count):
                text = ''.join(choice(ascii_letters) for _ in range(randint(3, 10)))
                values = [key, uuid1(), randint(0, MAX_INT32), text, random()]
                pending.append(session.execute_async(insert, values))
                # Throttle: wait for the outstanding batch before queuing more.
                if len(pending) > max_async_query:
                    for future in pending:
                        future.result()
                    pending = []
        # Wait for whatever is left from the last, partial batch.
        for future in pending:
            future.result()

        # Sanity check that every requested row actually landed in the table.
        inserted = sum(1 for _ in session.execute('SELECT * from ks.events;'))
        if inserted != rows:
            raise RuntimeError('expected %d rows in ks.events, found %d' % (rows, inserted))
    finally:
        # Release the driver's connections and background threads.
        cluster.shutdown()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment