@dbist
Last active December 6, 2019 17:38

Start CockroachDB

Start CRDB with a --max-sql-memory value appropriate for your machine's RAM. The default of 128 MB is not enough for this workload; the command below allots a quarter of system memory (.25).

cockroach start \
--insecure \
--store=node1 \
--listen-addr=localhost:26257 \
--http-addr=localhost:8080 \
--join=localhost:26257,localhost:26258,localhost:26259 \
--background \
--max-sql-memory=.25
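
The --join flag above lists three addresses, but only the first node's command is shown. As a rough sketch, assuming the other two nodes mirror node1 on ports 26258 and 26259 (the store names node2 and node3 and the HTTP ports 8081 and 8082 are assumptions, not taken from the original), the three start commands and the one-time cockroach init could be generated like this:

```python
import shlex

# Join list copied from the start command above
JOIN = "localhost:26257,localhost:26258,localhost:26259"

def start_command(node, max_sql_memory=".25"):
    """Return the argv for starting local node `node` (1-based)."""
    return [
        "cockroach", "start",
        "--insecure",
        "--store=node{}".format(node),                   # node2/node3 are assumed names
        "--listen-addr=localhost:{}".format(26256 + node),
        "--http-addr=localhost:{}".format(8079 + node),  # 8081/8082 are assumed ports
        "--join={}".format(JOIN),
        "--background",
        "--max-sql-memory={}".format(max_sql_memory),
    ]

def init_command():
    """Return the argv for the one-time cluster initialization."""
    return ["cockroach", "init", "--insecure", "--host=localhost:26257"]

if __name__ == "__main__":
    # Print the commands instead of running them, so they can be reviewed first
    for n in (1, 2, 3):
        print(shlex.join(start_command(n)))
    print(shlex.join(init_command()))
```

Printing rather than executing keeps the sketch safe to run on a machine without CockroachDB installed.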

Install Ray

pip3 install ray

I wrote the following code to execute an IMPORT job into CRDB. Adding multiprocessing with Ray was a matter of following their README.

#! /usr/bin/python3

import subprocess as sb
import sys, ray, os, uuid

init = """
cockroach sql --insecure --host=localhost:26257 \
  --execute=\"CREATE DATABASE IF NOT EXISTS parallelload;\"
"""

def normalize_statement():
  id = uuid.uuid1().hex  # 32 hex characters, no hyphens
  return """cockroach sql --insecure --host=localhost:26257 \
   --database=\"parallelload\" \
   --execute=\"IMPORT TABLE orders{} (
   o_orderkey           INTEGER NOT NULL PRIMARY KEY,
   o_custkey            INTEGER NOT NULL,
   o_orderstatus        CHAR(1) NOT NULL,
   o_totalprice         FLOAT NOT NULL,
   o_orderdate          DATE NOT NULL,
   o_orderpriority      CHAR(15) NOT NULL,
   o_clerk              CHAR(15) NOT NULL,
   o_shippriority       INTEGER NOT NULL,
   o_comment            VARCHAR(79) NOT NULL,
   INDEX o_ck           (o_custkey ASC),
   INDEX o_od           (o_orderdate ASC)
 ) CSV DATA ('https://storage.googleapis.com/cockroach-fixtures/tpch-csv/sf-1/orders.tbl.1')
 WITH delimiter = '|';\"""".format(id)

def initialization():
   print("Initial Step: Creating Database")
   sb.run(init, shell=True, check=True)

@ray.remote
def execute_import():
   statement = normalize_statement()
   print("Running process: {}".format(os.getpid()))
   try:
      sb.run(statement, shell=True, check=True)
   except sb.CalledProcessError as e:
      # Output is not captured by sb.run here, so report only the exit code
      sys.stderr.write(
         "execute_import() : [ERROR]: error code = %s\n" % e.returncode)

if __name__ == "__main__":
   initialization()
   # Cap Ray at 50 MiB of memory and 75 MiB of object store (values are in bytes);
   # to run Ray uncapped, call ray.init() without arguments
   ray.init(memory=52428800, object_store_memory=78643200)
   futures = [execute_import.remote() for _ in range(1000)]
   print(ray.get(futures))
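
Because the script passes one long string through shell=True, the SQL has to survive two layers of quoting. One possible alternative, sketched here on the assumption that the same cockroach sql flags are used, is to pass an argument list so the statement reaches cockroach untouched, and to capture stderr so a failure report has something to print. The names build_argv and run_statement are hypothetical helpers, not part of the original script.

```python
import subprocess as sb
import sys

def build_argv(statement, database="parallelload", host="localhost:26257"):
    """Build the cockroach sql argument list; no shell quoting is needed."""
    return [
        "cockroach", "sql",
        "--insecure",
        "--host={}".format(host),
        "--database={}".format(database),
        "--execute={}".format(statement),
    ]

def run_statement(statement):
    """Run one SQL statement and return True on success, False on failure."""
    try:
        result = sb.run(build_argv(statement), check=True,
                        capture_output=True, text=True)
    except sb.CalledProcessError as e:
        # stderr is populated because capture_output=True was passed
        sys.stderr.write("[ERROR] exit code {}: {}\n".format(e.returncode, e.stderr))
        return False
    print(result.stdout, end="")
    return True
```

Inside a Ray task, run_statement(...) could stand in for the sb.run call; capture_output requires Python 3.7 or later.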

Ray-specific parts

import ray
ray.init()

@ray.remote
def f(x):
    return x * x

futures = [f.remote(i) for i in range(4)]
print(ray.get(futures))
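
For comparison only, the same fan-out and gather shape can be written with the standard library's concurrent.futures. This is not Ray's API, just a sketch of the pattern that f.remote and ray.get express, with threads standing in for Ray workers.

```python
from concurrent.futures import ThreadPoolExecutor

def f(x):
    return x * x

with ThreadPoolExecutor(max_workers=4) as pool:
    # Submit every task up front, roughly what f.remote(i) does in Ray
    futures = [pool.submit(f, i) for i in range(4)]
    # Block until each result is ready, roughly what ray.get(futures) does
    results = [fut.result() for fut in futures]

print(results)  # [0, 1, 4, 9]
```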

Cockroach-specific parts

IMPORT TABLE orders{} (
   o_orderkey           INTEGER NOT NULL PRIMARY KEY,
   o_custkey            INTEGER NOT NULL,
   o_orderstatus        CHAR(1) NOT NULL,
   o_totalprice         FLOAT NOT NULL,
   o_orderdate          DATE NOT NULL,
   o_orderpriority      CHAR(15) NOT NULL,
   o_clerk              CHAR(15) NOT NULL,
   o_shippriority       INTEGER NOT NULL,
   o_comment            VARCHAR(79) NOT NULL,
   INDEX o_ck           (o_custkey ASC),
   INDEX o_od           (o_orderdate ASC)
 ) CSV DATA ('https://storage.googleapis.com/cockroach-fixtures/tpch-csv/sf-1/orders.tbl.1')
 WITH delimiter = '|';
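
The {} after orders is a Python str.format placeholder, not SQL. The script fills it with a hyphen-free UUID so that every import creates a distinct table. A minimal sketch of that naming step (the helper name unique_table_name is hypothetical):

```python
import uuid

def unique_table_name(prefix="orders"):
    # uuid1().hex is the 32-character hexadecimal form without hyphens
    return "{}{}".format(prefix, uuid.uuid1().hex)

name = unique_table_name()
print(name)
```

The result has the same shape as the table names listed at the end of this gist: the prefix followed by 32 hexadecimal characters.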

Then execute the code and watch it go

python3 import_with_ray.py
(pid=85485) job_id	status	fraction_completed	rows	index_entries	system_records	bytes
(pid=85485) 509795360310591489	succeeded	1	187500	375000	0	27768880
(pid=85485) Running process: 85485
(pid=85495) job_id	status	fraction_completed	rows	index_entries	system_records	bytes
(pid=85495) 509795420722757633	succeeded	1	187500	375000	0	27768880
(pid=85495) Running process: 85495
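
Each import prints a tab-separated header and one result row, prefixed by Ray with the worker's pid. If you want to total rows or check statuses across jobs, a small parser along these lines might help. It assumes the exact prefix and column layout shown above, and the function name parse_job_line is made up for illustration.

```python
import re

# Matches the "(pid=12345) " prefix that Ray adds to worker output
PID_PREFIX = re.compile(r"^\(pid=\d+\)\s")

def parse_job_line(header_line, row_line):
    """Pair the tab-separated header with its result row as a dict."""
    header = PID_PREFIX.sub("", header_line).rstrip("\n").split("\t")
    row = PID_PREFIX.sub("", row_line).rstrip("\n").split("\t")
    return dict(zip(header, row))

header = "(pid=85485) job_id\tstatus\tfraction_completed\trows\tindex_entries\tsystem_records\tbytes"
row = "(pid=85485) 509795360310591489\tsucceeded\t1\t187500\t375000\t0\t27768880"

job = parse_job_line(header, row)
print(job["status"], int(job["rows"]))  # succeeded 187500
```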

Finally, open the cockroach sql shell and verify that your tables are loaded. Note that the output is truncated for brevity.

root@localhost:26257/parallelload> show tables;
...
  ordersfef573fc183c11eaa7b2acde48001122
  ordersff03fb32183e11ea9700acde48001122
  ordersff479678183c11ea9700acde48001122
  ordersff58aa96183f11eaa1e9acde48001122
  ordersff8e417c183c11ea99bbacde48001122
  ordersff931a5a183f11ea8265acde48001122
(1004 rows)

Time: 15.318ms