Created
October 1, 2014 09:09
-
-
Save freider/a69710e50310094db40e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class PigJob(luigi.Task):
    """Luigi task that assembles a Pig script and runs it with the pig CLI.

    Subclasses supply ``script_body``, ``requires`` and ``output``.  The
    generated script is made of, in order: REGISTER statements, a job name,
    one ``load`` statement per input, the subclass-supplied body, and a
    final ``store`` of the relation named ``OUTPUTS``.

    The job writes to a temporary HDFS target; the result is moved to the
    real output only after the pig process exits with status 0.
    """

    def script_body(self):
        """Return the Pig Latin statements forming the body of the job.

        Override this in subclasses (a plain string class attribute named
        ``script_body`` is accepted as well).  Every key of ``requires()``
        is available as a relation of the same name, and the body must
        define a relation called ``OUTPUTS``, which is what gets stored.

        Example body::

            -- Your pig script body goes here
        """

    def requires(self):
        """Return a dictionary mapping relation names to input tasks.

        Each entry is inserted as a ``load`` statement at the beginning of
        the produced pig script.
        """
        pass

    def output(self):
        """Return the ``HdfsTarget`` the job result is written to."""
        pass

    def pig_command_path(self):
        """Return the pig executable to invoke.

        Read from the ``command`` option of the ``[pig]`` section of the
        luigi configuration, with ``"pig"`` passed as the default.
        """
        return luigi.configuration.get_config().get("pig", "command", "pig")

    def cmdline(self, filename):
        """Build the argument list used to run the script in *filename*.

        The ``-l <location>`` option is only included when
        ``log_location()`` returns a truthy value.
        """
        log_location = self.log_location()
        log_options = ["-l", log_location] if log_location else []
        return [self.pig_command_path()] + log_options + ["-f", filename]

    def run(self):
        """Compile the script, run pig, and publish the output on success.

        Raises:
            TypeError: if ``output()`` is not a ``luigi.HdfsTarget``.
            Exception: if the output path is not writable, or if the pig
                process exits with a non-zero return code.
        """
        target = self.output()
        # Explicit check instead of ``assert`` so that it is still enforced
        # when Python runs with -O.
        if not isinstance(target, luigi.HdfsTarget):
            raise TypeError(
                "PigJob.output() must return a luigi.HdfsTarget, got %r" %
                (target,))
        if not target.is_writable():
            raise Exception(
                "Can not write to output path %s Probably wrong permissions." %
                target.path)

        outputpath = target.path
        tmpoutput = luigi.HdfsTarget(is_tmp=True)
        compiled_script = self.build_script(tmpoutput.path)

        # Text mode so that writing a ``str`` script works on Python 2 and 3.
        # The context manager guarantees the temporary file is closed (and
        # thereby deleted) even when launching pig raises.
        with tempfile.NamedTemporaryFile(mode="w") as scriptfile:
            scriptfile.write(compiled_script)
            # pig opens the file by name, so the content must be on disk
            # before the subprocess starts.
            scriptfile.flush()

            cmdline = self.cmdline(scriptfile.name)
            logger.info("Running cmdline: %r", cmdline)
            logger.info("With pig script:\n%s", compiled_script)
            p = subprocess.Popen(cmdline)
            p.communicate()

        if p.returncode != 0:
            logger.error("Error when running script:\n%s", compiled_script)
            raise Exception(
                "Pig script failed with return value: %s" % (p.returncode,))

        logger.info(
            "Job completed with temporary output in %s", tmpoutput.path)
        tmpoutput.move_dir(target)
        logger.info("Moved final data set to %s", outputpath)

    def log_location(self):
        """Return the value passed to pig's ``-l`` option.

        Defaults to ``/dev/null``.  Return a falsy value to omit the option.
        """
        return "/dev/null"

    def build_script(self, output_path):
        """Return the complete pig script that stores into *output_path*.

        Raises:
            NotImplementedError: if no script body has been provided.
        """
        body = self.script_body
        # ``script_body`` is normally a method, so it has to be called;
        # joining the bound method itself raises TypeError.  A plain string
        # attribute override is passed through unchanged.
        if callable(body):
            body = body()
        if body is None:
            raise NotImplementedError(
                "%s.script_body must provide the pig script body as a string"
                % type(self).__name__)

        parts = [
            self._register_snippet(),
            self._job_name_snippet(),
            self._input_snippet(),
            body,
            self._output_snippet(output_path),
        ]
        return '\n\n'.join(parts)

    def _register_snippet(self):
        """Return the REGISTER statements for the jars the script relies on."""
        return """
        REGISTER hdfs:///lib/jars/piggybank.jar;
        REGISTER hdfs:///lib/jars/jackson-core-asl-1.8.8.jar;
        REGISTER hdfs:///lib/jars/avro-1.7.4.jar;
        REGISTER hdfs:///lib/jars/jackson-mapper-asl-1.8.8.jar;
        REGISTER hdfs:///lib/jars/json-simple-1.1.jar;
        REGISTER hdfs:///lib/jars/snappy-java-1.0.4.1.jar;
        """

    def _job_name_snippet(self):
        """Return the statement naming the job after this task's id."""
        return "SET job.name '{name}';".format(name=self.task_id)

    def _input_snippet(self):
        """Return one Avro ``load`` statement per entry of ``self.input()``.

        ``self.input()`` is expected to be a dictionary (see ``requires``);
        each key becomes the relation name and the target's ``path`` the
        load location.
        """
        # ``items()`` rather than ``iteritems()`` so this runs on both
        # Python 2 and Python 3.
        return '\n'.join(
            "{name} = load '{path}' USING "
            "org.apache.pig.piggybank.storage.avro.AvroStorage ();".format(
                name=name,
                path=target.path
            )
            for name, target in self.input().items()
        )

    def _output_snippet(self, output_path):
        """Return the statement storing ``OUTPUTS`` as Avro in *output_path*.

        The Avro schema is taken from the ``schema`` attribute of the target
        returned by ``output()`` and rendered with ``get_schema_string``.
        """
        #<pig
        pattern = (
            "store OUTPUTS into '{path}' using "
            "org.apache.pig.piggybank.storage.avro.AvroStorage"
            "('schema', '{schema}');"
        )
        #pig>
        return pattern.format(
            path=output_path,
            schema=get_schema_string(self.output().schema)
        )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment