Skip to content

Instantly share code, notes, and snippets.

@erasmas
Last active August 29, 2015 14:14
Show Gist options
  • Save erasmas/25b0eac0c5ba5648fb7d to your computer and use it in GitHub Desktop.
2015-02-04 12:47:45,300 INFO [LocalJobRunner Map Task Executor #0] mapred.Task (Task.java:initialize(581)) - Using ResourceCalculatorProcessTree : null
2015-02-04 12:47:45,327 INFO [LocalJobRunner Map Task Executor #0] io.MultiInputSplit (MultiInputSplit.java:readFields(161)) - current split input path: file:/tmp/staging/data/staging-path/part-00000
2015-02-04 12:47:45,328 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:updateJobWithSplit(462)) - Processing split: cascading.tap.hadoop.io.MultiInputSplit@29033afe
2015-02-04 12:47:45,333 WARN [LocalJobRunner Map Task Executor #0] io.MultiInputFormat (Util.java:retry(768)) - unable to get record reader, but not retrying
java.io.IOException: file:/tmp/staging/data/staging-path/part-00000 not a SequenceFile
at org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:1850)
at org.apache.hadoop.io.SequenceFile$Reader.initialize(SequenceFile.java:1810)
at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1759)
at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1773)
at org.apache.hadoop.mapred.SequenceFileRecordReader.<init>(SequenceFileRecordReader.java:49)
at org.apache.hadoop.mapred.SequenceFileInputFormat.getRecordReader(SequenceFileInputFormat.java:64)
at cascading.tap.hadoop.io.MultiInputFormat$1.operate(MultiInputFormat.java:253)
at cascading.tap.hadoop.io.MultiInputFormat$1.operate(MultiInputFormat.java:248)
at cascading.util.Util.retry(Util.java:762)
at cascading.tap.hadoop.io.MultiInputFormat.getRecordReader(MultiInputFormat.java:247)
at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.<init>(MapTask.java:168)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:409)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:744)
(ns dm.etl
(:require [dm.conf :refer [job-conf]]
[dm.utils :refer :all]
[cascalog.api :refer :all]
[cascalog.checkpoint :refer [workflow]]
[cascalog.logic.vars :as v])
(:import (cascalog.ops IdentityBuffer)))
(defn force-reduce
  "Forces a reduce phase on the generator `gen` by piping every tuple
  through a cascading IdentityBuffer. Emits fresh nullable output vars
  with the same arity as `gen`'s out-fields; tuple contents are unchanged."
  [gen]
  (let [arity       (count (get-out-fields gen))
        input-vars  (v/gen-nullable-vars arity)
        output-vars (v/gen-nullable-vars arity)]
    (<- output-vars
        (gen :>> input-vars)
        ((IdentityBuffer.) :<< input-vars :>> output-vars))))
(defn run-query
  "Executes `query` with a forced reduce phase (see force-reduce) and
  sinks the result as a SequenceFile under `out-path`."
  [out-path query]
  (let [sink    (hfs-seqfile out-path)
        reduced (force-reduce query)]
    (?- sink reduced)))
(defn partition-data
  "Reads `fields` from the SequenceFile at `in-path`, trapping bad
  tuples to `error-sink`, then — under `job-conf` — writes the result
  to `out-path` as Parquet partitioned on `partition-fields`.
  NOTE(review): `sink-to-parquet` is presumably from dm.utils — confirm."
  [fields scheme partition-fields in-path out-path error-sink]
  (let [input (hfs-seqfile in-path)
        query (<- fields
                  (input :>> fields)
                  (:trap error-sink))]
    (with-job-conf job-conf
      (sink-to-parquet query scheme out-path partition-fields))))
(defn run-workflow
  "Runs a two-step checkpointed workflow over `query`:
   1. run-query      — executes the query with a forced reduce and stages
                       the result as a SequenceFile in a workflow tmp dir;
   2. partition-data — reads the staged data and sinks it to `out-path`
                       as Parquet partitioned on `partition-fields`,
                       trapping bad tuples to `error-sink`.
   The 6-arity variant additionally takes `checkpoint-dir`, the root
   directory for workflow checkpoints/tmp dirs; the 5-arity variant keeps
   the original behavior and defaults it to \"/tmp/staging\"."
  ([query scheme partition-fields out-path error-sink]
   (run-workflow query scheme partition-fields out-path error-sink "/tmp/staging"))
  ([query scheme partition-fields out-path error-sink checkpoint-dir]
   (let [fields (get-out-fields query)]
     (workflow [checkpoint-dir]
       ;; Step 1: stage query output in a workflow-managed tmp dir.
       run-query ([:tmp-dirs [staging-path]]
                  (run-query staging-path query))
       ;; Step 2: depends on step 1; partition the staged data to Parquet.
       partition-data ([:deps :last]
                       (partition-data fields scheme partition-fields
                                       staging-path out-path error-sink))))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment