Last active
August 29, 2015 14:14
-
-
Save erasmas/25b0eac0c5ba5648fb7d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
2015-02-04 12:47:45,300 INFO [LocalJobRunner Map Task Executor #0] mapred.Task (Task.java:initialize(581)) - Using ResourceCalculatorProcessTree : null | |
2015-02-04 12:47:45,327 INFO [LocalJobRunner Map Task Executor #0] io.MultiInputSplit (MultiInputSplit.java:readFields(161)) - current split input path: file:/tmp/staging/data/staging-path/part-00000 | |
2015-02-04 12:47:45,328 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:updateJobWithSplit(462)) - Processing split: cascading.tap.hadoop.io.MultiInputSplit@29033afe | |
2015-02-04 12:47:45,333 WARN [LocalJobRunner Map Task Executor #0] io.MultiInputFormat (Util.java:retry(768)) - unable to get record reader, but not retrying | |
java.io.IOException: file:/tmp/staging/data/staging-path/part-00000 not a SequenceFile | |
at org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:1850) | |
at org.apache.hadoop.io.SequenceFile$Reader.initialize(SequenceFile.java:1810) | |
at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1759) | |
at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1773) | |
at org.apache.hadoop.mapred.SequenceFileRecordReader.<init>(SequenceFileRecordReader.java:49) | |
at org.apache.hadoop.mapred.SequenceFileInputFormat.getRecordReader(SequenceFileInputFormat.java:64) | |
at cascading.tap.hadoop.io.MultiInputFormat$1.operate(MultiInputFormat.java:253) | |
at cascading.tap.hadoop.io.MultiInputFormat$1.operate(MultiInputFormat.java:248) | |
at cascading.util.Util.retry(Util.java:762) | |
at cascading.tap.hadoop.io.MultiInputFormat.getRecordReader(MultiInputFormat.java:247) | |
at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.<init>(MapTask.java:168) | |
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:409) | |
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342) | |
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243) | |
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) | |
at java.util.concurrent.FutureTask.run(FutureTask.java:266) | |
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) | |
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) | |
at java.lang.Thread.run(Thread.java:744) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns dm.etl | |
(:require [dm.conf :refer [job-conf]] | |
[dm.utils :refer :all] | |
[cascalog.api :refer :all] | |
[cascalog.checkpoint :refer [workflow]] | |
[cascalog.logic.vars :as v]) | |
(:import (cascalog.ops IdentityBuffer))) | |
(defn force-reduce
  "Builds a query around the generator `gen` that feeds every output field
  of `gen` through `cascalog.ops.IdentityBuffer` and emits the same number
  of fields under freshly generated nullable vars.

  NOTE(review): routing through a buffer is presumably what forces a reduce
  phase, as the name suggests -- confirm against Cascalog's planner."
  [gen]
  (let [arity       (count (get-out-fields gen))
        result-vars (v/gen-nullable-vars arity)
        source-vars (v/gen-nullable-vars arity)]
    (<- result-vars
        (gen :>> source-vars)
        ((IdentityBuffer.) :<< source-vars :>> result-vars))))
(defn run-query
  "Executes `query`, after wrapping it with `force-reduce`, and sinks the
  result into an `hfs-seqfile` tap rooted at `out-path`."
  [out-path query]
  (let [seqfile-sink (hfs-seqfile out-path)
        reduced      (force-reduce query)]
    (?- seqfile-sink reduced)))
(defn partition-data
  "Reads `fields` from the `hfs-seqfile` tap at `in-path`, trapping failing
  tuples into `error-sink`, and hands the resulting query to
  `sink-to-parquet` together with `scheme`, `out-path` and
  `partition-fields`. The sink call runs inside `with-job-conf` using
  `job-conf` from `dm.conf`.

  NOTE(review): `sink-to-parquet` is not defined in this file; presumably it
  comes from `dm.utils` -- verify its argument contract there."
  [fields scheme partition-fields in-path out-path error-sink]
  (let [staged  (hfs-seqfile in-path)
        trapped (<- fields
                    (staged :>> fields)
                    (:trap error-sink))]
    (with-job-conf job-conf
      (sink-to-parquet trapped scheme out-path partition-fields))))
(defn run-workflow
  "Runs a two-step checkpointed workflow rooted at \"/tmp/staging\":

  1. `run-query` writes `query` to the temporary directory bound to
     `staging-path`.
  2. `partition-data` (depending on the previous step) reads that directory
     and writes the output fields of `query` to `out-path` using `scheme`
     and `partition-fields`, trapping bad tuples into `error-sink`."
  [query scheme partition-fields out-path error-sink]
  (let [query-fields (get-out-fields query)]
    ;; The step names and the `staging-path` symbol are consumed by the
    ;; `workflow` macro, so they are kept exactly as written.
    ;; NOTE(review): the symbol name appears to become part of the on-disk
    ;; temp directory path -- confirm before renaming it.
    (workflow ["/tmp/staging"]
      run-query
      ([:tmp-dirs [staging-path]]
       (run-query staging-path query))

      partition-data
      ([:deps :last]
       (partition-data query-fields
                       scheme
                       partition-fields
                       staging-path
                       out-path
                       error-sink)))))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment