Skip to content

Instantly share code, notes, and snippets.

@anarchivist
Created October 14, 2010 04:04
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save anarchivist/625558 to your computer and use it in GitHub Desktop.
Save anarchivist/625558 to your computer and use it in GitHub Desktop.
sample MapReduce tasks for Disco to get tag counts from MARC files/streams
#!/usr/bin/env python
#
# pymarc_disco.py - Mark Matienzo
# sample MapReduce tasks for Disco to get tag counts from MARC files/streams
# usage: python pymarc_disco.py <input1> [input2 ... inputN]
import sys
from disco.core import Disco, result_iterator
from disco.settings import DiscoSettings
import pymarc
def read(fd, size, fname):
    """Return a pymarc reader over the input stream handed to the map phase.

    Registered as the job's ``map_reader``. Only ``fd`` is used; ``size``
    and ``fname`` are accepted because they are part of the reader's
    signature but are ignored here.
    """
    return pymarc.MARCReader(fd)
def map(record, params):
    """Emit a ``(tag, 1)`` pair for every field in one record.

    ``record`` must expose a ``fields`` iterable whose items carry a
    ``tag`` attribute. ``params`` is unused.

    The name shadows the ``map`` builtin; it is kept because the job
    submission further down passes this function as ``map=map``.
    """
    for field in record.fields:
        yield field.tag, 1
def reduce(iter, params):
    """Sum the per-field counts for each tag.

    ``iter`` yields ``(tag, count)`` pairs. They are sorted (which pulls
    every pair into memory at once), grouped by tag with ``kvgroup``, and
    one ``(tag, total)`` pair is yielded per distinct tag. ``params`` is
    unused.
    """
    # NOTE(review): imported inside the function, presumably so the name is
    # available wherever Disco executes this function -- confirm before
    # moving it to module level.
    from disco.util import kvgroup

    for tag, counts in kvgroup(sorted(iter)):
        yield tag, sum(counts)
# Connect to the Disco master named by the DISCO_MASTER setting.
disco = Disco(DiscoSettings()['DISCO_MASTER'])
print("Starting Disco job..")
print("Go to %s to see status of the job." % disco.master)

# Submit the tag-count job over every path/URL given on the command line and
# block until it finishes.
results = disco.new_job(name="tagcount",
                        input=sys.argv[1:],
                        map=map,
                        map_reader=read,
                        reduce=reduce,
                        save=True).wait()

print("Job done. Results:")
# Each result is a (tag, total) pair produced by reduce(). A single
# %-formatted argument keeps the output identical on Python 2 while also
# being valid Python 3 syntax.
for tag, count in result_iterator(results):
    print("%s %s" % (tag, count))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment