@sklam
Created October 23, 2017 20:50
Random-walk PageRank in Numba: multithreaded CPU and single-GPU CUDA implementations.

README

Setup

Create conda environment with:

conda create -n pagerank35 python=3.5 numba pyculib cudatoolkit networkx

Activate the environment with:

source activate pagerank35
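
Optionally, as a sanity check (not required), verify that numba can see a CUDA-capable GPU:

python -c "from numba import cuda; cuda.detect()"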

Usage

main.py is the main program for demonstrating the random-walk pagerank implementation.

python main.py [<n_communities>]

Arguments:

  • n_communities : int; defaults to 100

    The number of communities in the randomly generated graph.
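
    For example, to run the demo on a randomly generated graph with 200 communities (an arbitrary value, for illustration only):

    python main.py 200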

"""
Implementation of CPU random walk pagerank.
The implementation is optimized with the numba JIT compiler
and uses multi-threads for parallel execution.
"""
import multiprocessing
from concurrent.futures import ThreadPoolExecutor
import os
from numba import jit, float32, uint64, uint32
import numpy as np
CPU_COUNT = int(os.environ.get('CPU_COUNT', multiprocessing.cpu_count()))
MAX32 = uint32(0xffffffff)
@jit("(uint64[::1], uint64)", nogil=True)
def xorshift(states, id):
x = states[id]
x ^= x >> 12
x ^= x << 25
x ^= x >> 27
states[id] = x
return uint64(x) * uint64(2685821657736338717)
@jit("float32(uint64[::1], uint64)", nogil=True)
def xorshift_float(states, id):
return float32(float32(MAX32 & xorshift(states, id)) / float32(MAX32))
@jit("boolean(intp, uint32[::1], uint32[::1], uint32[::1], uint32[::1], "
"float32, uint64[::1])", nogil=True)
def random_walk_per_node(curnode, coupons, visits, colidx, edges, resetprob,
randstates):
moving = False
for _ in range(coupons[curnode]):
randnum = xorshift_float(randstates, curnode)
if randnum < resetprob:
# Terminate coupon
continue
else:
base = colidx[curnode]
offset = colidx[curnode + 1]
# If the edge list is non-empty
if offset - base > 0:
# Pick a random destination
randint = xorshift(randstates, curnode)
randdestid = uint64(randint % uint64(offset - base)) + uint64(base)
dest = edges[randdestid]
else:
# Random restart
randint = xorshift(randstates, curnode)
randdestid = randint % uint64(visits.size)
dest = randdestid
# Increment visit count
visits[dest] += 1
moving = True
return moving
@jit(nogil=True)
def job(tid, step, coupons, visits, colidx, edges, resetprob, randstates):
moving = False
base = step * tid
numnodes = colidx.size - 1
for i in range(base, min(base + step, numnodes)):
# XXX: numba bug with returned boolean types
if 1 == random_walk_per_node(i, coupons, visits, colidx, edges,
resetprob, randstates):
moving = True
return moving
@jit(nogil=True)
def sum_vertical(temp, visits, start, stop):
    # for n in range(visits.size):
    for n in range(start, stop):
        for i in range(temp.shape[0]):
            visits[n] += temp[i, n]


def random_walk_round(coupons, visits, colidx, edges, resetprob, randstates):
    npar = CPU_COUNT
    numnodes = colidx.size - 1
    split_visits = np.zeros((npar, visits.size), dtype=visits.dtype)
    assert numnodes == visits.size
    step = int(np.ceil(numnodes / npar))
    with ThreadPoolExecutor(max_workers=npar) as e:
        futures = [e.submit(job, *(tid, step, coupons, split_visits[tid],
                                   colidx, edges, resetprob, randstates))
                   for tid in range(npar)]
        moving = any(f.result() for f in futures)

        # with ThreadPoolExecutor(max_workers=npar) as e:
        futures = [
            e.submit(sum_vertical, *(split_visits, visits, n, min(n + step, visits.size)))
            for n in range(0, visits.size, step)
        ]
        [f.result() for f in futures]
    return moving

def random_walk(nodes, coupons, colidx, edges, resetprob):
    visits = np.zeros(len(nodes), dtype=np.uint32)
    total = visits.copy()
    randstates = np.random.randint(1, 0xffffffff, size=len(nodes)).astype(np.uint64)
    while True:
        visits.fill(0)
        moving = random_walk_round(coupons, visits, colidx, edges, resetprob,
                                   randstates)
        if not moving:
            break
        else:
            coupons[:] = visits[:]
        total += visits
    return total
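
A minimal sketch of driving this module directly (the toy CSR graph and the values below are illustrative only; it mirrors what compute() in main.py does):

import math
import numpy as np
import cpu_randomwalk

# Toy directed graph in CSR form: 0 -> 1, 0 -> 2, 1 -> 2, 2 -> 0
colidx = np.array([0, 2, 3, 4], dtype=np.uint32)  # row pointers (indptr)
edges = np.array([1, 2, 2, 0], dtype=np.uint32)   # column indices
nodes = list(range(3))

# Every node starts with K coupons (random-walk tokens), as in main.py.
K = max(1, int(2 * math.log(len(nodes))))
coupons = np.full(len(nodes), K, dtype=np.uint32)

ranks = cpu_randomwalk.random_walk(nodes, coupons, colidx, edges, resetprob=0.01)
print(ranks)  # per-node visit counts; larger counts mean higher rank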
"""
Implementation of CUDA accelerated random walk pagerank.
The implementation is optimized with the CUDA JIT in numba
for single GPU execution.
"""
import numpy as np
from numba import uint64, uint32, float32, cuda
from pyculib import sorting
MAX32 = uint32(0xffffffff)
@cuda.jit("(uint64[::1], uint64)", device=True)
def cuda_xorshift(states, id):
x = states[id]
x ^= x >> 12
x ^= x << 25
x ^= x >> 27
states[id] = x
return uint64(x) * uint64(2685821657736338717)
@cuda.jit("float32(uint64[::1], uint64)", device=True)
def cuda_xorshift_float(states, id):
return float32(float32(MAX32 & cuda_xorshift(states, id)) / float32(MAX32))
@cuda.jit(device=True)
def cuda_random_walk_per_node(curnode, visits, colidx, edges, resetprob,
randstates):
tid = cuda.threadIdx.x
randnum = cuda_xorshift_float(randstates, tid)
if randnum >= resetprob:
base = colidx[curnode]
offset = colidx[curnode + 1]
# If the edge list is non-empty
if offset - base > 0:
# Pick a random destination
randint = cuda_xorshift(randstates, tid)
randdestid = (uint64(randint % uint64(offset - base)) +
uint64(base))
dest = edges[randdestid]
else:
# Random restart
randint = cuda_xorshift(randstates, tid)
randdestid = randint % uint64(visits.size)
dest = randdestid
# Increment visit count
cuda.atomic.add(visits, dest, 1)
MAX_TPB = 64 * 2


@cuda.jit("void(uint32[::1], uint32[::1], uint32[::1], uint32[::1], float32, "
          "uint64[::1], uint32[::1])")
def cuda_random_walk_round(coupons, visits, colidx, edges, resetprob,
                           randstates, remap):
    sm_randstates = cuda.shared.array(MAX_TPB, dtype=uint64)
    tid = cuda.threadIdx.x
    blkid = cuda.blockIdx.x
    if blkid < coupons.size:
        workitem = remap[blkid]
        sm_randstates[tid] = randstates[workitem] + tid
        count = coupons[workitem]
        # While there are coupons
        while count:
            # Try to assign coupons to every thread in the block
            assigned = min(count, cuda.blockDim.x)
            count -= assigned
            # Thread within assigned range
            if tid < assigned:
                cuda_random_walk_per_node(workitem, visits, colidx, edges,
                                          resetprob, sm_randstates)
            # Kill the thread
            else:
                return
        if tid == 0:
            randstates[workitem] = sm_randstates[tid]


@cuda.jit("void(uint32[::1], uint32[::1])")
def cuda_reset_and_add_visits(visits, total):
    tid = cuda.grid(1)
    if tid < visits.size:
        total[tid] += visits[tid]
        visits[tid] = 0


@cuda.jit("void(uint32[::1], uint32[::1])")
def cuda_search_non_zero(bins, count):
    loc = cuda.grid(1)
    if loc < bins.size:
        if bins[loc] != 0:
            cuda.atomic.add(count, 0, 1)

def random_walk(nodes, coupons, colidx, edges, resetprob):
    visits = np.zeros(len(nodes), dtype=np.uint32)
    numnodes = len(nodes)
    randstates = np.random.randint(1, 0xffffffff, size=len(nodes)).astype(
        np.uint64)
    d_remap = cuda.to_device(np.arange(len(nodes), dtype=np.uint32))
    nnz = np.array([len(nodes)], dtype=np.uint32)
    d_nnz = cuda.to_device(nnz)
    d_coupons = cuda.to_device(coupons)
    d_visits = cuda.to_device(visits)
    d_visits_tmp = cuda.to_device(visits)
    d_colidx = cuda.to_device(colidx)
    d_edges = cuda.to_device(edges)
    d_randstates = cuda.to_device(randstates)
    d_total = cuda.to_device(visits)
    round_count = 0
    sorter = sorting.RadixSort(maxcount=d_remap.size, dtype=d_visits.dtype,
                               descending=True)
    while nnz[0]:  # and round_count < 3:
        round_count += 1
        # Run random walk kernel
        device_args = (d_coupons, d_visits, d_colidx, d_edges, resetprob,
                       d_randstates, d_remap)
        cuda_random_walk_round[nnz[0], MAX_TPB](*device_args)
        # Prepare for next round
        d_coupons.copy_to_device(d_visits)
        d_visits_tmp.copy_to_device(d_visits)
        # Remap indices so that earlier ones have more to do
        d_remap = sorter.argsort(keys=d_visits_tmp)
        d_nnz.copy_to_device(np.zeros(1, dtype=np.uint32))
        cuda_search_non_zero.forall(d_visits_tmp.size)(d_visits_tmp,
                                                       d_nnz)
        nnz = d_nnz.copy_to_host()
        cuda_reset_and_add_visits.forall(numnodes)(d_visits, d_total)
    return d_total.copy_to_host()
main.py

#!/usr/bin/env python
"""
Main program for demonstrating the random-walk pagerank implementation.

    python main.py [<n_communities>]

Arguments:

    n_communities : int; defaults to 100
        The number of communities in the randomly generated graph.
"""
import sys
from timeit import default_timer as timer
import operator
import math
import random
import contextlib

import networkx as nx
import numpy as np

import cpu_randomwalk
import cuda_randomwalk

def make_random_graph(n_communities):
    seed = 123
    random.seed(seed)
    # Generate a very sparse community graph.
    communities = [random.randrange(5, 100) for _ in range(n_communities)]
    prob_inner_connect = 0.002
    prob_outer_connect = 0.0001
    G = nx.random_partition_graph(communities, prob_inner_connect,
                                  prob_outer_connect,
                                  directed=True, seed=seed)
    print('# of nodes: {}'.format(G.number_of_nodes()))
    print('# of edges: {}'.format(G.number_of_edges()))
    # To draw the graph with `dot`,
    # use the following to write out the graph:
    #   nx.nx_pydot.write_dot(G, 'my.dot')
    return G


def numpy_reference_implementation(G):
    pr = nx.pagerank_numpy(G)
    toplist = list(sorted(pr.items(), key=operator.itemgetter(1), reverse=True))
    print('top5', [i[0] for i in toplist[:5]])


def random_walk_implementation(G, gpu=False):
    spmat = nx.to_scipy_sparse_matrix(G)
    colidx = spmat.indptr
    edges = spmat.indices
    nodes = list(G.nodes())
    compute(colidx, edges, nodes, gpu=gpu)

def compute(colidx, edges, nodes, gpu=False):
    # Each node starts with K coupons; a coupon is one random-walk token that
    # terminates with probability `resetprob` at each step.
    coupons = np.zeros(len(nodes), dtype="uint32")
    resetprob = 0.01
    assert 0 < resetprob < 1
    K = int(2 * math.log(len(nodes)))
    assert K >= 1
    coupons.fill(K)
    colidx = colidx.astype(np.uint32)
    edges = edges.astype(np.uint32)
    if gpu:
        module = cuda_randomwalk
    else:
        module = cpu_randomwalk
    ranks = module.random_walk(nodes, coupons, colidx, edges, resetprob)
    print('top5', [v for k, v in sorted(zip(ranks, nodes), reverse=True)][:5])


@contextlib.contextmanager
def timing():
    s = timer()
    yield
    e = timer()
    print('execution time: {:.2f}s'.format(e - s))

if __name__ == '__main__':
    n_communities = 100
    try:
        n_communities = int(sys.argv[1])
    except IndexError:
        pass
    print('# of communities: {}'.format(n_communities))
    G = make_random_graph(n_communities)

    print('multiCPU random walk')
    with timing():
        random_walk_implementation(G, gpu=False)

    print('single-GPU random walk')
    with timing():
        random_walk_implementation(G, gpu=True)

    print('numpy reference')
    if G.number_of_edges() > 100000:
        # the numpy based implementation will consume too much memory and time
        # for large graphs.
        print('disabled for large graph')
    else:
        with timing():
            numpy_reference_implementation(G)
SkBlaz commented Mar 14, 2018

Hello, thanks again for this very nice example. One question, what are the coupons? Is this related to GPU's thread-block model?
