Skip to content

Instantly share code, notes, and snippets.

@edhiley
Last active August 29, 2015 14:02
Show Gist options
  • Save edhiley/5b96a9edebf782f0d269 to your computer and use it in GitHub Desktop.
Two examples of running the same Riak MapReduce job: one with curl, one with the Python client.
# Run a Riak MapReduce job over the "pdsRecord" bucket using the Python client.
from __future__ import print_function  # identical print() on Python 2 and 3

import sys
from contextlib import closing

from riak import RiakClient, RiakNode

host, port = 'localhost', 10091
# Job timeout, reused by every MapReduce call below (and matching the value in
# the curl example). NOTE(review): Riak appears to interpret this in
# milliseconds - confirm for the deployed version.
timeout = 6000000

# NOTE(review): the client builds its node from the ``host``/``http_port``
# keywords; the previously used ``hosts=`` keyword appears to be ignored,
# silently falling back to the library's default host - confirm against the
# installed riak client version.
client = RiakClient(protocol='http', host=host, http_port=port)
if not client.ping():
    # Report on stderr and exit non-zero so the calling shell sees a failure.
    print("Cannot connect to riak on {}:{}".format(host, port),
          file=sys.stderr)
    sys.exit(1)

# Inputs: a range query on the "$key" index of bucket "pdsRecord",
# from key "0" to key "Z".
inputs = {
    "bucket": "pdsRecord",
    "index": "$key",
    "start": "0",
    "end": "Z",
}

# Map phase: Erlang function distinct_hidden_name_types_count:map_key_value.
map_phase = {
    "language": "erlang",
    "module": "distinct_hidden_name_types_count",
    "function": "map_key_value",
}

# Reduce phase: Erlang function common_reconciliation_functions:reduce.
reduce_phase = {
    "language": "erlang",
    "module": "common_reconciliation_functions",
    "function": "reduce",
}

query = [{"map": map_phase}, {"reduce": reduce_phase}]

# Blocking alternative, equivalent to the curl request:
# print(client.mapred(inputs=inputs, query=query, timeout=timeout))

# Stream (phase_num, data) pairs as they arrive. closing() shuts the stream
# down even if the loop is interrupted before it is fully consumed.
with closing(client.stream_mapred(inputs=inputs, query=query,
                                  timeout=timeout)) as results:
    for phase_num, data in results:
        print(phase_num, data)
# Same MapReduce job as the Python example, POSTed as JSON to Riak's HTTP
# /mapred endpoint; the response body is saved to the --output file.
# The job spec is read from stdin (--data @-) and supplied by the heredoc.
# The heredoc delimiter is quoted so the shell leaves "$key" unexpanded.
curl --verbose \
     --request POST \
     --header "content-type: application/json" \
     --output /tmp/distinct_hidden_name_types_count.txt \
     --data @- \
     http://localhost:10091/mapred <<'EOF'
{
"inputs": {
"bucket": "pdsRecord",
"index": "$key",
"start": "0",
"end": "Z"
},
"query" :
[
{ "map": {"language":"erlang", "module":"distinct_hidden_name_types_count", "function":"map_key_value" } },
{ "reduce": {"language":"erlang", "module":"common_reconciliation_functions", "function":"reduce" } }
],
"timeout": 6000000
}
EOF
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment