Start CockroachDB (CRDB) with a `--max-sql-memory` value appropriate for your machine's RAM.
The default of 128 MiB is not enough for this workload; the command below uses `.25`,
i.e. 25% of system memory.
# Start node 1 of a local 3-node cluster in insecure mode, in the background.
# --join lists all three nodes; presumably node2/node3 are started the same way
# on ports 26258/26259 (with their own --store and --http-addr) and the cluster
# is initialized with `cockroach init` -- those steps are not shown here.
# --max-sql-memory=.25 gives SQL 25% of system memory instead of the default.
cockroach start \
--insecure \
--store=node1 \
--listen-addr=localhost:26257 \
--http-addr=localhost:8080 \
--join=localhost:26257,localhost:26258,localhost:26259 \
--background \
--max-sql-memory=.25
# Install Ray, which the script below uses to fan IMPORT jobs out across processes.
pip3 install ray
I wrote the following code to execute an IMPORT job into CRDB. Adding
multiprocessing with Ray was just a matter of following the Ray README.
#! /usr/bin/python3
import subprocess as sb
import sys, ray, os, uuid
# Shell command (run with shell=True) that creates the target database once,
# before any IMPORT tasks are launched. The leading/trailing newlines are kept
# from the original triple-quoted form; the shell ignores them.
init = (
    "\n"
    "cockroach sql --insecure --host=localhost:26257 "
    '--execute="CREATE DATABASE IF NOT EXISTS parallelload;"'
    "\n"
)
def normalize_statement():
    """Build a shell command that IMPORTs one uniquely named TPC-H ``orders`` table.

    Each call appends a fresh 32-character hex UUID to the table name
    (``orders<hex>``), so many tasks can run the returned command concurrently
    without colliding on the table name.

    NOTE(review): ``IMPORT TABLE ... (<schema>) CSV DATA`` may not be supported
    by newer CockroachDB releases (which use CREATE TABLE + IMPORT INTO);
    verify against the version you run.

    Returns:
        str: a ``cockroach sql`` command line meant to be run with
        ``subprocess.run(..., shell=True)``.
    """
    import textwrap

    # uuid1().hex is the hyphen-free 32-char form (same as stripping the four
    # hyphens from str(uuid1())); named `suffix` to avoid shadowing builtin id.
    suffix = uuid.uuid1().hex
    sql = textwrap.dedent("""\
        IMPORT TABLE orders{suffix} (
        o_orderkey INTEGER NOT NULL PRIMARY KEY,
        o_custkey INTEGER NOT NULL,
        o_orderstatus CHAR(1) NOT NULL,
        o_totalprice FLOAT NOT NULL,
        o_orderdate DATE NOT NULL,
        o_orderpriority CHAR(15) NOT NULL,
        o_clerk CHAR(15) NOT NULL,
        o_shippriority INTEGER NOT NULL,
        o_comment VARCHAR(79) NOT NULL,
        INDEX o_ck (o_custkey ASC),
        INDEX o_od (o_orderdate ASC)
        ) CSV DATA ('https://storage.googleapis.com/cockroach-fixtures/tpch-csv/sf-1/orders.tbl.1')
        WITH delimiter = '|';""").format(suffix=suffix)
    # The SQL contains no double quotes, `$`, backticks or backslashes, so it is
    # safe to wrap in double quotes for the shell.
    return (
        "cockroach sql --insecure --host=localhost:26257 "
        '--database="parallelload" '
        '--execute="{}"'.format(sql)
    )
def initialization():
    """Create the ``parallelload`` database by running the module's ``init`` command.

    Raises:
        subprocess.CalledProcessError: if the command exits non-zero
        (because ``check=True``).
    """
    banner = "Initial Step: Creating Database"
    print(banner)
    sb.run(init, check=True, shell=True)
@ray.remote
def execute_import():
    """Ray task: run one IMPORT of a uniquely named ``orders`` table.

    Builds the command with ``normalize_statement()`` and runs it through the
    shell. stdout (the IMPORT job summary) streams straight to the console;
    stderr is captured so a failure can be reported with the actual error
    text. Failures are written to stderr instead of raised, so one failed
    import does not make ``ray.get`` fail for the whole batch.

    Returns:
        None
    """
    statement = normalize_statement()
    print("Running process: {}".format(os.getpid()))
    try:
        result = sb.run(
            statement,
            shell=True,
            check=True,
            # Capture only stderr: without capturing, CalledProcessError
            # carries no output and the error report was always "None".
            stderr=sb.PIPE,
            universal_newlines=True,
        )
    except sb.CalledProcessError as e:
        sys.stderr.write(
            "execute_import() : [ERROR]: stderr = %s, error code = %s\n"
            % (e.stderr, e.returncode))
    else:
        # A successful run may still emit notices on stderr; pass them through.
        if result.stderr:
            sys.stderr.write(result.stderr)
if __name__ == "__main__":
    # Number of concurrent IMPORT tasks; each one creates its own orders<hex> table.
    num_imports = 1000

    initialization()
    # Ray is capped here, presumably to leave RAM for the local CockroachDB
    # nodes. Call ray.init() with no arguments to run Ray uncapped.
    ray.init(
        memory=50 * 1024 * 1024,               # 52428800 bytes = 50 MiB
        object_store_memory=75 * 1024 * 1024,  # 78643200 bytes = 75 MiB
    )
    try:
        futures = [execute_import.remote() for _ in range(num_imports)]
        # execute_import returns nothing, so printing ray.get(futures) would
        # only print a list of Nones; wait for completion and summarize instead.
        ray.get(futures)
        print("{} import tasks finished; failures, if any, were reported on "
              "stderr.".format(len(futures)))
    finally:
        ray.shutdown()
Ray-specific parts
import ray

# Initialize Ray with default settings.
ray.init()


@ray.remote
def f(x):
    """Return x squared; decorated so it runs as a Ray task via ``f.remote(x)``."""
    return x * x


# Launch four tasks, then block until every result is available.
futures = [f.remote(value) for value in range(4)]
squares = ray.get(futures)
print(squares)
Cockroach-specific parts
-- Template copied from the Python script above: "{}" is not valid SQL; the
-- script's str.format() replaces it with a hex UUID so each task gets its own table.
-- The inline schema defines the table; CSV DATA points at a '|'-delimited
-- TPC-H fixture file.
-- NOTE(review): newer CockroachDB releases may not accept IMPORT TABLE with an
-- inline schema; confirm against your version.
IMPORT TABLE orders{} (
o_orderkey INTEGER NOT NULL PRIMARY KEY,
o_custkey INTEGER NOT NULL,
o_orderstatus CHAR(1) NOT NULL,
o_totalprice FLOAT NOT NULL,
o_orderdate DATE NOT NULL,
o_orderpriority CHAR(15) NOT NULL,
o_clerk CHAR(15) NOT NULL,
o_shippriority INTEGER NOT NULL,
o_comment VARCHAR(79) NOT NULL,
INDEX o_ck (o_custkey ASC),
INDEX o_od (o_orderdate ASC)
) CSV DATA ('https://storage.googleapis.com/cockroach-fixtures/tpch-csv/sf-1/orders.tbl.1')
WITH delimiter = '|';
Save the script as `import_with_ray.py`, then run it and watch it go:
# Run the Ray + IMPORT script (saved as import_with_ray.py).
python3 import_with_ray.py
(pid=85485) job_id status fraction_completed rows index_entries system_records bytes
(pid=85485) 509795360310591489 succeeded 1 187500 375000 0 27768880
(pid=85485) Running process: 85485
(pid=85495) job_id status fraction_completed rows index_entries system_records bytes
(pid=85495) 509795420722757633 succeeded 1 187500 375000 0 27768880
(pid=85495) Running process: 85495
Finally, open a `cockroach sql` shell and verify that your tables are loaded.
Note: the output below is truncated for brevity.
root@localhost:26257/parallelload> show tables;
...
ordersfef573fc183c11eaa7b2acde48001122
ordersff03fb32183e11ea9700acde48001122
ordersff479678183c11ea9700acde48001122
ordersff58aa96183f11eaa1e9acde48001122
ordersff8e417c183c11ea99bbacde48001122
ordersff931a5a183f11ea8265acde48001122
(1004 rows)
Time: 15.318ms