Skip to content

Instantly share code, notes, and snippets.

@freider
Created October 1, 2014 09:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save freider/a69710e50310094db40e to your computer and use it in GitHub Desktop.
Save freider/a69710e50310094db40e to your computer and use it in GitHub Desktop.
class PigJob(luigi.Task):
    """Luigi task that runs a generated Pig script over Avro inputs.

    The generated script is, in order: REGISTER statements for
    ``register_jars``, a ``SET job.name`` line, one AvroStorage LOAD per
    entry of ``self.input()``, the user's ``script_body``, and a STORE of the
    ``OUTPUTS`` relation via AvroStorage.

    Subclasses provide ``requires``, ``output`` and ``script_body``.
    """

    # Jars REGISTERed at the top of every generated script. Override in a
    # subclass to add or replace libraries.
    register_jars = (
        "hdfs:///lib/jars/piggybank.jar",
        "hdfs:///lib/jars/jackson-core-asl-1.8.8.jar",
        "hdfs:///lib/jars/avro-1.7.4.jar",
        "hdfs:///lib/jars/jackson-mapper-asl-1.8.8.jar",
        "hdfs:///lib/jars/json-simple-1.1.jar",
        "hdfs:///lib/jars/snappy-java-1.0.4.1.jar",
    )

    def script_body(self):
        """Return the user-written part of the Pig script.

        Subclasses override this either as a method returning a string or
        as a plain string class attribute. Each input alias returned by
        ``requires`` is already loaded, and the body must define the
        relation ``OUTPUTS``, which is what gets stored.
        """
        raise NotImplementedError(
            "Subclasses of PigJob must define script_body")

    def requires(self):
        """Return a dict mapping Pig alias -> required task.

        Each alias becomes a variable at the start of the generated script,
        bound to a LOAD of that task's output path.
        """

    def output(self):
        """Return the ``luigi.HdfsTarget`` the job writes to.

        ``run`` rejects any other target type. ``_output_snippet`` also
        reads the target's ``schema`` attribute; presumably this is an Avro
        schema understood by ``get_schema_string`` (defined elsewhere).
        Confirm this against the concrete target class.
        """

    def pig_command_path(self):
        """Return the pig executable from config ``[pig] command`` (default "pig")."""
        return luigi.configuration.get_config().get("pig", "command", "pig")

    def cmdline(self, filename):
        """Build the argv list that runs pig on the script at *filename*."""
        cmd = [self.pig_command_path()]
        log_location = self.log_location()
        if log_location:
            # A falsy log_location omits the -l (log file) option entirely.
            cmd += ["-l", log_location]
        cmd += ["-f", filename]
        return cmd

    def run(self):
        """Compile the Pig script, run it, and publish its output.

        The script stores into a temporary HDFS target. That directory is
        moved to ``self.output()`` only when pig exits with status 0.

        Raises:
            TypeError: if ``output()`` is not a ``luigi.HdfsTarget``.
            Exception: if the output path is not writable, or pig exits
                with a non-zero status.
        """
        output = self.output()
        # An explicit check rather than assert, so it still runs under -O.
        if not isinstance(output, luigi.HdfsTarget):
            raise TypeError(
                "PigJob.output() must return a luigi.HdfsTarget, got %r"
                % (output,))
        if not output.is_writable():
            raise Exception(
                "Can not write to output path %s Probably wrong permissions." %
                output.path)

        tmpoutput = luigi.HdfsTarget(is_tmp=True)
        compiled_script = self.build_script(tmpoutput.path)

        # Text mode so the str script can be written on Python 3 as well.
        # The context manager closes (and thereby deletes) the temp file
        # once pig has finished with it.
        with tempfile.NamedTemporaryFile(mode="w", suffix=".pig") as scriptfile:
            scriptfile.write(compiled_script)
            # pig opens the file by name, so the contents must be flushed.
            scriptfile.flush()
            cmdline = self.cmdline(scriptfile.name)
            logger.info("Running cmdline: %r", cmdline)
            logger.info("With pig script:\n%s", compiled_script)
            returncode = subprocess.call(cmdline)

        if returncode != 0:
            logger.error("Error when running script:\n%s", compiled_script)
            raise Exception(
                "Pig script failed with return value: %s" % (returncode,))

        logger.info(
            "Job completed with temporary output in %s", tmpoutput.path)
        tmpoutput.move_dir(output)
        logger.info("Moved final data set to %s", output.path)

    def log_location(self):
        """Return the path passed to pig's ``-l`` option (falsy to omit it)."""
        return "/dev/null"

    def build_script(self, output_path):
        """Assemble the complete Pig script, storing its result to *output_path*."""
        parts = [
            self._register_snippet(),
            self._job_name_snippet(),
            self._input_snippet(),
            self._script_body_text(),
            self._output_snippet(output_path),
        ]
        return "\n\n".join(parts)

    def _script_body_text(self):
        """Return ``script_body`` as text.

        Accepts either a method or a string class attribute.
        """
        body = self.script_body
        if callable(body):
            body = body()
        return body

    def _register_snippet(self):
        """Return one ``REGISTER <jar>;`` line per entry of ``register_jars``."""
        return "\n".join(
            "REGISTER {jar};".format(jar=jar) for jar in self.register_jars)

    def _job_name_snippet(self):
        """Return a SET statement naming the Pig job after this task's id."""
        return "SET job.name '{name}';".format(name=self.task_id)

    def _input_snippet(self):
        """Return one AvroStorage LOAD statement per input.

        Each key of ``self.input()`` becomes the Pig alias for that target.
        Aliases are emitted in sorted order, so the script is deterministic.
        """
        return "\n".join(
            "{name} = load '{path}' USING "
            "org.apache.pig.piggybank.storage.avro.AvroStorage ();".format(
                name=name,
                path=target.path,
            )
            for name, target in sorted(self.input().items())
        )

    def _output_snippet(self, output_path):
        """Return the STORE statement that writes ``OUTPUTS`` to *output_path*.

        The Avro schema string comes from ``get_schema_string`` (defined
        elsewhere) applied to ``self.output().schema``.
        """
        #<pig
        pattern = (
            "store OUTPUTS into '{path}' using "
            "org.apache.pig.piggybank.storage.avro.AvroStorage"
            "('schema', '{schema}');"
        )
        #pig>
        return pattern.format(
            path=output_path,
            schema=get_schema_string(self.output().schema)
        )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment