Skip to content

Instantly share code, notes, and snippets.

@gglanzani
Created January 6, 2015 14:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gglanzani/440795adaaa22620500e to your computer and use it in GitHub Desktop.
Save gglanzani/440795adaaa22620500e to your computer and use it in GitHub Desktop.
import json

from riak import RiakClient
from riak import RiakMapReduce

# Reproduction script: run a three-phase Erlang MapReduce job over the first
# 40 keys returned by a search on the "grid_stats" index and print every
# key/value pair of each JSON-encoded result.

# NOTE(review): protocol='pbc' is combined with http_port=8087 here; confirm
# whether pb_port was intended before relying on this connection setup.
riak = RiakClient(protocol='pbc', host='172.17.12.22', http_port=8087)
bucket = riak.bucket("STATS")
mr = RiakMapReduce(riak)

search_results = bucket.search(query="industry_id:[1 TO 22]", index="grid_stats", rows=1000)
keys = [d['_yz_rk'] for d in search_results['docs']]
mr.add("STATS", keys[:40])

# Number of unique keys once the last five characters of each key are
# dropped (31 in this case).
unique_key_count = len({key[:-5] for key in keys[:40]})

# Only the final phase is declared with "keep": True.
mr.map(['grid_mr', 'yearfun'], {"language": "erlang", "keep": False})
mr.reduce(['riak_kv_mapreduce', 'reduce_sort'], {"language": "erlang", "keep": False})
mr.reduce(['grid_binary4', 'gridreducefilter'], {"language": "erlang", "keep": True})

for result in mr.run():
    # Each result is decoded as a JSON object and printed as "key value".
    for k, v in json.loads(result).items():
        print("%s %s" % (k, v))
import json

from riak import RiakClient
from riak import RiakMapReduce

# Reproduction script: run a three-phase Erlang MapReduce job over the first
# 30 keys returned by a search on the "grid_stats" index and print every
# key/value pair of each JSON-encoded result.

# NOTE(review): protocol='pbc' is combined with http_port=8087 here; confirm
# whether pb_port was intended before relying on this connection setup.
riak = RiakClient(protocol='pbc', host='172.17.12.22', http_port=8087)
bucket = riak.bucket("STATS")
mr = RiakMapReduce(riak)

search_results = bucket.search(query="industry_id:[1 TO 22]", index="grid_stats", rows=1000)
keys = [d['_yz_rk'] for d in search_results['docs']]
mr.add("STATS", keys[:30])

# Number of unique keys once the last five characters of each key are
# dropped (27 in this case).
unique_key_count = len({key[:-5] for key in keys[:30]})

# Only the final phase is declared with "keep": True.
mr.map(['grid_mr', 'yearfun'], {"language": "erlang", "keep": False})
mr.reduce(['riak_kv_mapreduce', 'reduce_sort'], {"language": "erlang", "keep": False})
mr.reduce(['grid_binary4', 'gridreducefilter'], {"language": "erlang", "keep": True})

for result in mr.run():
    # Each result is decoded as a JSON object and printed as "key value".
    for k, v in json.loads(result).items():
        print("%s %s" % (k, v))
# Output of the run above. Note: only 15 keys are returned, although 27 unique keys were submitted!
# 16FBC00507D0F1D2E06D4BB1E8AB8DA86BD2BDC9594556214908BEB04DC1A242 2014
# 72FE1567A941A02949979820345763AF04DC290DE3FB221B7844D5C57CB881D6 2012
# 8ED62B197342E2C5C143C74900048998D02DC9747C671EB43A0ED0CDA68314A9 2013
# 276C46C1FC720FD31FD331E4A716FE1F8DBF63D8F71BD02318FA41A264017816 2007
# 82D247B184D8C0B47DE32F7C7E630FDEB7A583457F43DD5AF51C393D28C60205 2014
# 81B0D44B4462B1E34199832ADB6BFF8B03159D1FE200562E3E874A9A9C6F71D6 2013
# CAFE88D023045184C9B64F30090C28C849A839432A50E5A05CF224B8240C46C9 2012
# 0A40AE7FB4BFCE25389CC6C9FAABD25975F27DC00EDB67C6218FD447AA107215 2008
# 8916C1835B4F38DAA45415E277123299CF5CAC2343AE01112C0C9562DD908FD0 2010
# CBDC010B0984002DEFDE60D133451D5D1E548C6BB56D3DDD6959091216084FEB 2009
# F73AB18932BC25EF6FE53C827A6A9B970FF86D165097B7172B1E0FC0B8854F93 4016
# BF93836C69CC6FD988DFC960F19D940FCA5AC86BD58CDA359950C806C811CAA1 2007
# 44EB97DEB2F996BAA69F54343F82261C7506675A49A0FB59423E11BF3321C1C2 2007
# 6BD962849AF27C9FB4C835F2C9578E0C390877AA2A35BB2BA5EEE6CE67C58077 2011
# D269CFCA858D15BA5BB4949D71C20F7E97CEB8FFAEF02234183EB17DF332A5CE 2012
---------------------------------------------------------------------------
RiakError Traceback (most recent call last)
<ipython-input-118-55ac1a58a0f2> in <module>()
2 mr.reduce(['riak_kv_mapreduce', 'reduce_sort'], {"language": "erlang", "keep": False})
3 mr.reduce(['grid_binary4', 'gridreducefilter'], {"language": "erlang", "keep": True})
----> 4 for result in mr.run():
5 for k, v in json.loads(result).iteritems():
6 print k, v
/Users/gio/.virtualenvs/numpy/lib/python2.7/site-packages/riak/mapreduce.pyc in run(self, timeout)
317 'when not allowed\n'
318 'original error: ' + e.value)
--> 319 raise e
320
321 # If the last phase is NOT a link phase, then return the result.
RiakError: 'Error running MapReduce operation. Headers: <httplib.HTTPMessage instance at 0x10b0842d8> Body: \'{"phase":2,"error":"{function_clause,[{grid_binary4,\\\'-gridreducefilter/2-fun-0-\\\',[<<\\\\"{\\\\\\\\\\\\"2DA8D2FB221F3CB8DE0006695F93879FC8B0FE21AE9A8D428A5E2F44224A34FA\\\\\\\\\\\\":4027,\\\\\\\\\\\\"276C46C1FC720FD31FD331E4A716FE1F8DBF63D8F71BD02318FA41A264017816\\\\\\\\\\\\":2008,\\\\\\\\\\\\"160ED09CD13AF4391520D043976FF8257162E9C29714625486343B7C5C05F877\\\\\\\\\\\\":2011,\\\\\\\\\\\\"14E72A0FD64D6CC61878FDA6D57F9F79201F8EA41C7FA7070A8FC60382F130AF\\\\\\\\\\\\":2008,\\\\\\\\\\\\"11DB78985329D9CA85841D94D5ECBADB5A783934D530A6950BBAD4F5BEEF14DD\\\\\\\\\\\\":2008,\\\\\\\\\\\\"11C80856AF6B69E34463443BFF3D875B29038CF05B5695FEBA8841B516AB1EE3\\\\\\\\\\\\":401...\\\\">>,...],...},...]}","input":null,"type":null,"stack":null}\''
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment