Cascading for the Impatient, Part 4
package impatient;

import java.util.Properties;

import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.operation.aggregator.Count;
import cascading.operation.regex.RegexFilter;
import cascading.operation.regex.RegexSplitGenerator;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.HashJoin;
import cascading.pipe.Pipe;
import cascading.pipe.assembly.Retain;
import cascading.pipe.joiner.LeftJoin;
import cascading.property.AppProps;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;

public class Main {
  public static void main( String[] args ) {
    String docPath = args[ 0 ];
    String wcPath = args[ 1 ];
    String stopPath = args[ 2 ];

    Properties properties = new Properties();
    AppProps.setApplicationJarClass( properties, Main.class );
    HadoopFlowConnector flowConnector = new HadoopFlowConnector( properties );

    // create source and sink taps
    Tap docTap = new Hfs( new TextDelimited( true, "\t" ), docPath );
    Tap wcTap = new Hfs( new TextDelimited( true, "\t" ), wcPath );

    Fields stop = new Fields( "stop" );
    Tap stopTap = new Hfs( new TextDelimited( stop, true, "\t" ), stopPath );

    // specify a regex operation to split the "document" text lines into a token stream
    Fields token = new Fields( "token" );
    Fields text = new Fields( "text" );
    RegexSplitGenerator splitter = new RegexSplitGenerator( token, "[ \\[\\]\\(\\),.]" );
    Fields fieldSelector = new Fields( "doc_id", "token" );
    Pipe docPipe = new Each( "token", text, splitter, fieldSelector );

    // use "ScrubFunction" to clean up the token stream
    Fields scrubArguments = new Fields( "doc_id", "token" );
    docPipe = new Each( docPipe, scrubArguments, new ScrubFunction( scrubArguments ), Fields.RESULTS );

    // perform a left join to remove stop words: rows that matched a stop
    // word have a non-null "stop" field after the join, so keep only the
    // rows where that field is empty
    Pipe stopPipe = new Pipe( "stop" );
    Pipe tokenPipe = new HashJoin( docPipe, token, stopPipe, stop, new LeftJoin() );
    tokenPipe = new Each( tokenPipe, stop, new RegexFilter( "^$" ) );

    // determine the word counts
    Pipe wcPipe = new Pipe( "wc", tokenPipe );
    wcPipe = new Retain( wcPipe, token );
    wcPipe = new GroupBy( wcPipe, token );
    wcPipe = new Every( wcPipe, Fields.ALL, new Count(), Fields.ALL );

    // connect the taps, pipes, etc., into a flow
    FlowDef flowDef = FlowDef.flowDef()
      .setName( "wc" )
      .addSource( docPipe, docTap )
      .addSource( stopPipe, stopTap )
      .addTailSink( wcPipe, wcTap );

    // write a DOT file and run the flow
    Flow wcFlow = flowConnector.connect( flowDef );
    wcFlow.writeDOT( "dot/wc.dot" );
    wcFlow.complete();
  }
}
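
The pipeline above applies a ScrubFunction that isn't shown in this gist. A minimal sketch of such a custom Cascading Function follows, assuming the cleanup is just trim-and-lowercase (the tutorial's actual ScrubFunction.java may do more):

package impatient;

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

public class ScrubFunction extends BaseOperation implements Function {
  public ScrubFunction( Fields fieldDeclaration ) {
    // expects two arguments per tuple: doc_id and token
    super( 2, fieldDeclaration );
  }

  public void operate( FlowProcess flowProcess, FunctionCall functionCall ) {
    TupleEntry argument = functionCall.getArguments();
    String docId = argument.getString( 0 );
    String token = scrubText( argument.getString( 1 ) );

    // drop tokens that scrub down to nothing
    if( token.length() > 0 )
      functionCall.getOutputCollector().add( new Tuple( docId, token ) );
  }

  // assumed cleanup rule: trim whitespace and lowercase
  public String scrubText( String text ) {
    return text.trim().toLowerCase();
  }
}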
stop
a
about
after
all
along
an
and
any
are
around
as
asked
at
away
back
be
been
before
between
both
but
by
can
could
did
do
even
few
for
from
get
got
had
hand
has
have
he
he
her
here
high
him
his
how
i
if
in
into
is
it
its
just
large
like
long
man
many
more
most
much
my
near
new
next
no
not
now
of
off
on
one
or
other
our
out
over
right
said
see
she
side
small
so
some
than
that
the
their
them
then
there
these
they
this
those
through
time
to
too
two
up
us
used
was
way
we
were
what
when
where
which
while
who
will
with
within
would
you
your
bash-3.2$ rm -rf derby.log metastore_db/
bash-3.2$ hive -hiveconf hive.metastore.warehouse.dir=/tmp/metadb < src/scripts/wc.q
WARNING: org.apache.hadoop.metrics.jvm.EventCounter is deprecated. Please use org.apache.hadoop.log.metrics.EventCounter in all the log4j.properties files.
Logging initialized using configuration in jar:file:/Users/ceteri/opt/hive-0.9.0-bin/lib/hive-common-0.9.0.jar!/hive-log4j.properties
Hive history file=/tmp/ceteri/hive_job_log_ceteri_201212231521_680816595.txt
2012-12-23 15:21:11.165 java[7881:1903] Unable to load realm info from SCDynamicStore
hive> -- prepare DDL for loading the raw data
>
> CREATE TABLE raw_docs (
> doc_id STRING,
> text STRING
> )
> ROW FORMAT DELIMITED
> FIELDS TERMINATED BY '\t'
> STORED AS TEXTFILE
> ;
OK
Time taken: 3.619 seconds
hive>
> CREATE TABLE raw_stop (
> stop STRING
> )
> ROW FORMAT DELIMITED
> FIELDS TERMINATED BY '\t'
> STORED AS TEXTFILE
> ;
OK
Time taken: 0.025 seconds
hive>
> -- load the raw data
>
> LOAD DATA
> LOCAL INPATH 'data/rain.txt'
> OVERWRITE INTO TABLE raw_docs
> ;
Copying data from file:/Users/ceteri/src/concur/Impatient/part4/data/rain.txt
Copying file: file:/Users/ceteri/src/concur/Impatient/part4/data/rain.txt
Loading data to table default.raw_docs
Deleted file:/tmp/metadb/raw_docs
OK
Time taken: 0.204 seconds
hive>
> LOAD DATA
> LOCAL INPATH 'data/en.stop'
> OVERWRITE INTO TABLE raw_stop
> ;
Copying data from file:/Users/ceteri/src/concur/Impatient/part4/data/en.stop
Copying file: file:/Users/ceteri/src/concur/Impatient/part4/data/en.stop
Loading data to table default.raw_stop
Deleted file:/tmp/metadb/raw_stop
OK
Time taken: 0.075 seconds
hive>
> -- additional steps to remove headers, yay
>
> CREATE TABLE docs (
> doc_id STRING,
> text STRING
> )
> ;
OK
Time taken: 0.024 seconds
hive>
> INSERT OVERWRITE TABLE docs
> SELECT
> *
> FROM raw_docs
> WHERE doc_id <> 'doc_id'
> ;
Total MapReduce jobs = 2
Launching Job 1 out of 2
Number of reduce tasks is set to 0 since there's no reduce operator
12/12/23 15:21:16 WARN conf.HiveConf: DEPRECATED: Ignoring hive-default.xml found on the CLASSPATH at /Users/ceteri/opt/hive-0.9.0-bin/conf/hive-default.xml
12/12/23 15:21:16 WARN conf.HiveConf: hive-site.xml not found on CLASSPATH
WARNING: org.apache.hadoop.metrics.jvm.EventCounter is deprecated. Please use org.apache.hadoop.log.metrics.EventCounter in all the log4j.properties files.
Execution log at: /tmp/ceteri/ceteri_20121223152121_120b279e-0911-4dcc-9d7b-fc9d76ed0562.log
2012-12-23 15:21:16.918 java[7939:1903] Unable to load realm info from SCDynamicStore
Job running in-process (local Hadoop)
Hadoop job information for null: number of mappers: 0; number of reducers: 0
2012-12-23 15:21:19,333 null map = 0%, reduce = 0%
2012-12-23 15:21:22,338 null map = 100%, reduce = 0%
Ended Job = job_local_0001
Execution completed successfully
Mapred Local Task Succeeded . Convert the Join into MapJoin
Ended Job = 1640864005, job is filtered out (removed at runtime).
Moving data to: file:/tmp/hive-ceteri/hive_2012-12-23_15-21-15_366_7720940276006194670/-ext-10000
Loading data to table default.docs
Deleted file:/tmp/metadb/docs
Table default.docs stats: [num_partitions: 0, num_files: 1, num_rows: 5, total_size: 498, raw_data_size: 493]
OK
Time taken: 7.393 seconds
hive>
> CREATE TABLE stop (
> stop STRING
> )
> ;
OK
Time taken: 0.019 seconds
hive>
> INSERT OVERWRITE TABLE stop
> SELECT
> *
> FROM raw_stop
> WHERE stop <> 'stop'
> ;
Total MapReduce jobs = 2
Launching Job 1 out of 2
Number of reduce tasks is set to 0 since there's no reduce operator
12/12/23 15:21:23 WARN conf.HiveConf: DEPRECATED: Ignoring hive-default.xml found on the CLASSPATH at /Users/ceteri/opt/hive-0.9.0-bin/conf/hive-default.xml
12/12/23 15:21:23 WARN conf.HiveConf: hive-site.xml not found on CLASSPATH
WARNING: org.apache.hadoop.metrics.jvm.EventCounter is deprecated. Please use org.apache.hadoop.log.metrics.EventCounter in all the log4j.properties files.
Execution log at: /tmp/ceteri/ceteri_20121223152121_f1511b39-cffa-4197-8eb4-cdb017ea797e.log
2012-12-23 15:21:24.070 java[7966:1903] Unable to load realm info from SCDynamicStore
Job running in-process (local Hadoop)
Hadoop job information for null: number of mappers: 0; number of reducers: 0
2012-12-23 15:21:26,961 null map = 0%, reduce = 0%
2012-12-23 15:21:29,966 null map = 100%, reduce = 0%
Ended Job = job_local_0001
Execution completed successfully
Mapred Local Task Succeeded . Convert the Join into MapJoin
Ended Job = 1551628365, job is filtered out (removed at runtime).
Moving data to: file:/tmp/hive-ceteri/hive_2012-12-23_15-21-22_781_7709385169981999922/-ext-10000
Loading data to table default.stop
Deleted file:/tmp/metadb/stop
Table default.stop stats: [num_partitions: 0, num_files: 1, num_rows: 119, total_size: 539, raw_data_size: 420]
OK
Time taken: 7.571 seconds
hive>
> -- tokenize using external Python script
>
> CREATE TABLE tokens (
> token STRING
> )
> ;
OK
Time taken: 0.026 seconds
hive>
> INSERT OVERWRITE TABLE tokens
> SELECT
> TRANSFORM(text) USING 'python ./src/scripts/tokenizer.py' AS token
> FROM docs
> ;
Total MapReduce jobs = 2
Launching Job 1 out of 2
Number of reduce tasks is set to 0 since there's no reduce operator
12/12/23 15:21:31 WARN conf.HiveConf: DEPRECATED: Ignoring hive-default.xml found on the CLASSPATH at /Users/ceteri/opt/hive-0.9.0-bin/conf/hive-default.xml
12/12/23 15:21:31 WARN conf.HiveConf: hive-site.xml not found on CLASSPATH
WARNING: org.apache.hadoop.metrics.jvm.EventCounter is deprecated. Please use org.apache.hadoop.log.metrics.EventCounter in all the log4j.properties files.
Execution log at: /tmp/ceteri/ceteri_20121223152121_43bba36d-b43f-4098-87f4-e2388633b086.log
2012-12-23 15:21:31.946 java[7994:1903] Unable to load realm info from SCDynamicStore
Job running in-process (local Hadoop)
Hadoop job information for null: number of mappers: 0; number of reducers: 0
2012-12-23 15:21:34,666 null map = 0%, reduce = 0%
2012-12-23 15:21:37,670 null map = 100%, reduce = 0%
Ended Job = job_local_0001
Execution completed successfully
Mapred Local Task Succeeded . Convert the Join into MapJoin
Ended Job = -2104034200, job is filtered out (removed at runtime).
Moving data to: file:/tmp/hive-ceteri/hive_2012-12-23_15-21-30_385_6822997415441284398/-ext-10000
Loading data to table default.tokens
Deleted file:/tmp/metadb/tokens
Table default.tokens stats: [num_partitions: 0, num_files: 1, num_rows: 89, total_size: 454, raw_data_size: 365]
OK
Time taken: 7.626 seconds
hive>
> -- filter with a left join, then count
>
> SELECT token, COUNT(*) AS count
> FROM (
> SELECT
> *
> FROM tokens LEFT OUTER JOIN stop
> ON (tokens.token = stop.stop)
> WHERE stop IS NULL
> ) t
> GROUP BY token
> ;
Total MapReduce jobs = 2
Launching Job 1 out of 2
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapred.reduce.tasks=<number>
12/12/23 15:21:39 WARN conf.HiveConf: DEPRECATED: Ignoring hive-default.xml found on the CLASSPATH at /Users/ceteri/opt/hive-0.9.0-bin/conf/hive-default.xml
12/12/23 15:21:39 WARN conf.HiveConf: hive-site.xml not found on CLASSPATH
WARNING: org.apache.hadoop.metrics.jvm.EventCounter is deprecated. Please use org.apache.hadoop.log.metrics.EventCounter in all the log4j.properties files.
Execution log at: /tmp/ceteri/ceteri_20121223152121_b14dcc61-a51b-4a18-b35e-6faf75706b82.log
2012-12-23 15:21:39.618 java[8022:1903] Unable to load realm info from SCDynamicStore
Job running in-process (local Hadoop)
Hadoop job information for null: number of mappers: 0; number of reducers: 0
2012-12-23 15:21:41,868 null map = 0%, reduce = 0%
2012-12-23 15:21:44,872 null map = 100%, reduce = 0%
2012-12-23 15:21:50,880 null map = 100%, reduce = 100%
Ended Job = job_local_0001
Execution completed successfully
Mapred Local Task Succeeded . Convert the Join into MapJoin
Launching Job 2 out of 2
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapred.reduce.tasks=<number>
12/12/23 15:21:52 WARN conf.HiveConf: DEPRECATED: Ignoring hive-default.xml found on the CLASSPATH at /Users/ceteri/opt/hive-0.9.0-bin/conf/hive-default.xml
12/12/23 15:21:52 WARN conf.HiveConf: hive-site.xml not found on CLASSPATH
WARNING: org.apache.hadoop.metrics.jvm.EventCounter is deprecated. Please use org.apache.hadoop.log.metrics.EventCounter in all the log4j.properties files.
Execution log at: /tmp/ceteri/ceteri_20121223152121_b14dcc61-a51b-4a18-b35e-6faf75706b82.log
2012-12-23 15:21:52.315 java[8049:1903] Unable to load realm info from SCDynamicStore
Job running in-process (local Hadoop)
Hadoop job information for null: number of mappers: 0; number of reducers: 0
2012-12-23 15:21:54,126 null map = 0%, reduce = 0%
2012-12-23 15:21:57,131 null map = 100%, reduce = 0%
2012-12-23 15:22:00,135 null map = 100%, reduce = 100%
Ended Job = job_local_0001
Execution completed successfully
Mapred Local Task Succeeded . Convert the Join into MapJoin
OK
air 1
area 4
australia 1
broken 1
california's 1
cause 1
cloudcover 1
death 1
deserts 1
downwind 1
dry 3
dvd 1
effect 1
known 1
land 2
lee 2
leeward 2
less 1
lies 1
mountain 3
mountainous 1
primary 1
produces 1
rain 5
ranges 1
secrets 1
shadow 4
sinking 1
such 1
valley 1
women 1
Time taken: 22.384 seconds
hive> bash-3.2$
bash-3.2$ ls
LICENSE.txt README.md build.gradle data src
bash-3.2$ hadoop version
Warning: $HADOOP_HOME is deprecated.
Hadoop 1.0.3
Subversion https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1.0 -r 1335192
Compiled by hortonfo on Tue May 8 20:31:25 UTC 2012
From source with checksum e6b0c1e23dcf76907c5fecb4b832f3be
bash-3.2$ gradle -version
------------------------------------------------------------
Gradle 1.0
------------------------------------------------------------
Gradle build time: Tuesday, June 12, 2012 12:56:21 AM UTC
Groovy: 1.8.6
Ant: Apache Ant(TM) version 1.8.2 compiled on December 20 2010
Ivy: 2.2.0
JVM: 1.6.0_33 (Apple Inc. 20.8-b03-424)
OS: Mac OS X 10.7.4 x86_64
bash-3.2$ gradle clean jar
:clean UP-TO-DATE
:compileJava
:processResources UP-TO-DATE
:classes
:jar
BUILD SUCCESSFUL
Total time: 7.836 secs
bash-3.2$ hadoop jar ./build/libs/impatient.jar data/rain.txt output/wc data/en.stop
Warning: $HADOOP_HOME is deprecated.
12/07/23 13:11:39 INFO util.HadoopUtil: resolving application jar from found main method on: impatient.Main
12/07/23 13:11:39 INFO planner.HadoopPlanner: using application jar: /Users/ceteri/src/concur/Impatient/part4/./build/libs/impatient.jar
12/07/23 13:11:39 INFO property.AppProps: using app.id: D22F09ABBCAB0AE1A6D24FFF0F6C64E3
2012-07-23 13:11:39.978 java[3209:1903] Unable to load realm info from SCDynamicStore
12/07/23 13:11:40 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
12/07/23 13:11:40 WARN snappy.LoadSnappy: Snappy native library not loaded
12/07/23 13:11:40 INFO mapred.FileInputFormat: Total input paths to process : 1
12/07/23 13:11:40 INFO util.Version: Concurrent, Inc - Cascading 2.0.1
12/07/23 13:11:40 INFO flow.Flow: [wc] starting
12/07/23 13:11:40 INFO flow.Flow: [wc] source: Hfs["TextDelimited[['stop']]"]["data/en.stop"]"]
12/07/23 13:11:40 INFO flow.Flow: [wc] source: Hfs["TextDelimited[['doc_id', 'text']->[ALL]]"]["data/rain.txt"]"]
12/07/23 13:11:40 INFO flow.Flow: [wc] sink: Hfs["TextDelimited[[UNKNOWN]->['token', 'count']]"]["output/wc"]"]
12/07/23 13:11:40 INFO flow.Flow: [wc] parallel execution is enabled: false
12/07/23 13:11:40 INFO flow.Flow: [wc] starting jobs: 1
12/07/23 13:11:40 INFO flow.Flow: [wc] allocating threads: 1
12/07/23 13:11:40 INFO flow.FlowStep: [wc] starting step: (1/1) output/wc
12/07/23 13:11:40 INFO mapred.FileInputFormat: Total input paths to process : 1
12/07/23 13:11:40 INFO flow.FlowStep: [wc] submitted hadoop job: job_local_0001
12/07/23 13:11:40 INFO mapred.Task: Using ResourceCalculatorPlugin : null
12/07/23 13:11:40 INFO io.MultiInputSplit: current split input path: file:/Users/ceteri/src/concur/Impatient/part4/data/rain.txt
12/07/23 13:11:40 INFO mapred.MapTask: numReduceTasks: 1
12/07/23 13:11:40 INFO mapred.MapTask: io.sort.mb = 100
12/07/23 13:11:40 INFO mapred.MapTask: data buffer = 79691776/99614720
12/07/23 13:11:40 INFO mapred.MapTask: record buffer = 262144/327680
12/07/23 13:11:40 INFO hadoop.FlowMapper: sourcing from: Hfs["TextDelimited[['doc_id', 'text']->[ALL]]"]["data/rain.txt"]"]
12/07/23 13:11:40 INFO hadoop.FlowMapper: sourcing from: Hfs["TextDelimited[['stop']]"]["data/en.stop"]"]
12/07/23 13:11:40 INFO hadoop.FlowMapper: sinking to: GroupBy(wc)[by:[{1}:'token']]
12/07/23 13:11:40 INFO collect.SpillableTupleList: attempting to load codec: org.apache.hadoop.io.compress.GzipCodec
12/07/23 13:11:40 INFO collect.SpillableTupleList: found codec: org.apache.hadoop.io.compress.GzipCodec
12/07/23 13:11:40 INFO mapred.FileInputFormat: Total input paths to process : 1
12/07/23 13:11:40 INFO collect.SpillableTupleList: attempting to load codec: org.apache.hadoop.io.compress.GzipCodec
12/07/23 13:11:40 INFO collect.SpillableTupleList: found codec: org.apache.hadoop.io.compress.GzipCodec
12/07/23 13:11:40 INFO mapred.MapTask: Starting flush of map output
12/07/23 13:11:40 INFO mapred.MapTask: Finished spill 0
12/07/23 13:11:40 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
12/07/23 13:11:43 INFO mapred.LocalJobRunner: file:/Users/ceteri/src/concur/Impatient/part4/data/rain.txt:0+510
12/07/23 13:11:43 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done.
12/07/23 13:11:43 INFO mapred.Task: Using ResourceCalculatorPlugin : null
12/07/23 13:11:43 INFO mapred.LocalJobRunner:
12/07/23 13:11:43 INFO mapred.Merger: Merging 1 sorted segments
12/07/23 13:11:43 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 751 bytes
12/07/23 13:11:43 INFO mapred.LocalJobRunner:
12/07/23 13:11:43 INFO hadoop.FlowReducer: sourcing from: GroupBy(wc)[by:[{1}:'token']]
12/07/23 13:11:43 INFO hadoop.FlowReducer: sinking to: Hfs["TextDelimited[[UNKNOWN]->['token', 'count']]"]["output/wc"]"]
12/07/23 13:11:43 INFO mapred.Task: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
12/07/23 13:11:43 INFO mapred.LocalJobRunner:
12/07/23 13:11:43 INFO mapred.Task: Task attempt_local_0001_r_000000_0 is allowed to commit now
12/07/23 13:11:43 INFO mapred.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to file:/Users/ceteri/src/concur/Impatient/part4/output/wc
12/07/23 13:11:46 INFO mapred.LocalJobRunner: reduce > reduce
12/07/23 13:11:46 INFO mapred.Task: Task 'attempt_local_0001_r_000000_0' done.
12/07/23 13:11:50 INFO util.Hadoop18TapUtil: deleting temp path output/wc/_temporary
bash-3.2$ more output/wc/part-00000
token count
air 1
area 4
australia 1
broken 1
california's 1
cause 1
cloudcover 1
death 1
deserts 1
downwind 1
dry 3
dvd 1
effect 1
known 1
land 2
lee 2
leeward 2
less 1
lies 1
mountain 3
mountainous 1
primary 1
produces 1
rain 5
ranges 1
secrets 1
shadow 4
sinking 1
such 1
valley 1
women 1
bash-3.2$
bash-3.2$ rm -rf output
bash-3.2$ mkdir -p dot
bash-3.2$ pig -version
Warning: $HADOOP_HOME is deprecated.
Apache Pig version 0.10.0 (r1328203)
compiled Apr 19 2012, 22:54:12
bash-3.2$ pig -p docPath=./data/rain.txt -p wcPath=./output/wc -p stopPath=./data/en.stop ./src/scripts/wc.pig
Warning: $HADOOP_HOME is deprecated.
2012-12-22 10:41:33,271 [main] INFO org.apache.pig.Main - Apache Pig version 0.10.0 (r1328203) compiled Apr 19 2012, 22:54:12
2012-12-22 10:41:33,272 [main] INFO org.apache.pig.Main - Logging error messages to: /Users/ceteri/src/concur/Impatient/part4/pig_1356201693269.log
2012-12-22 10:41:33.371 java[2020:1903] Unable to load realm info from SCDynamicStore
2012-12-22 10:41:33,579 [main] INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: file:///
2012-12-22 10:41:34,152 [main] WARN org.apache.pig.PigServer - Encountered Warning IMPLICIT_CAST_TO_CHARARRAY 2 time(s).
2012-12-22 10:41:34,152 [main] WARN org.apache.pig.PigServer - Encountered Warning USING_OVERLOADED_FUNCTION 1 time(s).
2012-12-22 10:41:34,300 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler - File concatenation threshold: 100 optimistic? false
2012-12-22 10:41:34,309 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.CombinerOptimizer - Choosing to move algebraic foreach to combiner
2012-12-22 10:41:34,325 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size before optimization: 2
2012-12-22 10:41:34,325 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size after optimization: 2
2012-12-22 10:41:34,361 [main] WARN org.apache.pig.PigServer - Encountered Warning IMPLICIT_CAST_TO_CHARARRAY 2 time(s).
2012-12-22 10:41:34,362 [main] WARN org.apache.pig.PigServer - Encountered Warning USING_OVERLOADED_FUNCTION 1 time(s).
2012-12-22 10:41:34,364 [main] INFO org.apache.pig.tools.pigstats.ScriptState - Pig features used in the script: HASH_JOIN,GROUP_BY,FILTER
2012-12-22 10:41:34,396 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler - File concatenation threshold: 100 optimistic? false
2012-12-22 10:41:34,399 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.CombinerOptimizer - Choosing to move algebraic foreach to combiner
2012-12-22 10:41:34,402 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size before optimization: 2
2012-12-22 10:41:34,402 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size after optimization: 2
2012-12-22 10:41:34,417 [main] INFO org.apache.pig.tools.pigstats.ScriptState - Pig script settings are added to the job
2012-12-22 10:41:34,428 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - mapred.job.reduce.markreset.buffer.percent is not set, set to default 0.3
2012-12-22 10:41:34,431 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - creating jar file Job6009769361318502147.jar
2012-12-22 10:41:38,205 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - jar file Job6009769361318502147.jar created
2012-12-22 10:41:38,215 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - Setting up single store job
2012-12-22 10:41:38,221 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - BytesPerReducer=1000000000 maxReducers=999 totalInputFileSize=1054
2012-12-22 10:41:38,221 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - Neither PARALLEL nor default parallelism is set for this job. Setting number of reducers to 1
2012-12-22 10:41:38,274 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 1 map-reduce job(s) waiting for submission.
2012-12-22 10:41:38,282 [Thread-6] WARN org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2012-12-22 10:41:38,385 [Thread-6] INFO org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
2012-12-22 10:41:38,385 [Thread-6] INFO org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
2012-12-22 10:41:38,390 [Thread-6] WARN org.apache.hadoop.io.compress.snappy.LoadSnappy - Snappy native library not loaded
2012-12-22 10:41:38,392 [Thread-6] INFO org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths (combined) to process : 1
2012-12-22 10:41:38,398 [Thread-6] INFO org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
2012-12-22 10:41:38,398 [Thread-6] INFO org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
2012-12-22 10:41:38,398 [Thread-6] INFO org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths (combined) to process : 1
2012-12-22 10:41:38,583 [Thread-7] INFO org.apache.hadoop.mapred.Task - Using ResourceCalculatorPlugin : null
2012-12-22 10:41:38,595 [Thread-7] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigRecordReader - Current split being processed file:/Users/ceteri/src/concur/Impatient/part4/data/en.stop:0+544
2012-12-22 10:41:38,599 [Thread-7] INFO org.apache.hadoop.mapred.MapTask - io.sort.mb = 100
2012-12-22 10:41:38,688 [Thread-7] INFO org.apache.hadoop.mapred.MapTask - data buffer = 79691776/99614720
2012-12-22 10:41:38,690 [Thread-7] INFO org.apache.hadoop.mapred.MapTask - record buffer = 262144/327680
2012-12-22 10:41:38,730 [Thread-7] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigRecordReader - Created input record counter: Input records from _1_en.stop
2012-12-22 10:41:38,744 [Thread-7] INFO org.apache.hadoop.mapred.MapTask - Starting flush of map output
2012-12-22 10:41:38,752 [Thread-7] INFO org.apache.hadoop.mapred.MapTask - Finished spill 0
2012-12-22 10:41:38,753 [Thread-7] INFO org.apache.hadoop.mapred.Task - Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
2012-12-22 10:41:38,775 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - HadoopJobId: job_local_0001
2012-12-22 10:41:38,776 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 0% complete
2012-12-22 10:41:41,568 [Thread-7] INFO org.apache.hadoop.mapred.LocalJobRunner -
2012-12-22 10:41:41,568 [Thread-7] INFO org.apache.hadoop.mapred.Task - Task 'attempt_local_0001_m_000000_0' done.
2012-12-22 10:41:41,572 [Thread-7] INFO org.apache.hadoop.mapred.Task - Using ResourceCalculatorPlugin : null
2012-12-22 10:41:41,576 [Thread-7] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigRecordReader - Current split being processed file:/Users/ceteri/src/concur/Impatient/part4/data/rain.txt:0+510
2012-12-22 10:41:41,576 [Thread-7] INFO org.apache.hadoop.mapred.MapTask - io.sort.mb = 100
2012-12-22 10:41:41,654 [Thread-7] INFO org.apache.hadoop.mapred.MapTask - data buffer = 79691776/99614720
2012-12-22 10:41:41,655 [Thread-7] INFO org.apache.hadoop.mapred.MapTask - record buffer = 262144/327680
2012-12-22 10:41:41,679 [Thread-7] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigRecordReader - Created input record counter: Input records from _0_rain.txt
2012-12-22 10:41:41,691 [Thread-7] INFO org.apache.hadoop.mapred.MapTask - Starting flush of map output
2012-12-22 10:41:41,694 [Thread-7] INFO org.apache.hadoop.mapred.MapTask - Finished spill 0
2012-12-22 10:41:41,698 [Thread-7] INFO org.apache.hadoop.mapred.Task - Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
2012-12-22 10:41:44,571 [Thread-7] INFO org.apache.hadoop.mapred.LocalJobRunner -
2012-12-22 10:41:44,572 [Thread-7] INFO org.apache.hadoop.mapred.Task - Task 'attempt_local_0001_m_000001_0' done.
2012-12-22 10:41:44,583 [Thread-7] INFO org.apache.hadoop.mapred.Task - Using ResourceCalculatorPlugin : null
2012-12-22 10:41:44,583 [Thread-7] INFO org.apache.hadoop.mapred.LocalJobRunner -
2012-12-22 10:41:44,588 [Thread-7] INFO org.apache.hadoop.mapred.Merger - Merging 2 sorted segments
2012-12-22 10:41:44,596 [Thread-7] INFO org.apache.hadoop.mapred.Merger - Down to the last merge-pass, with 2 segments left of total size: 3284 bytes
2012-12-22 10:41:44,596 [Thread-7] INFO org.apache.hadoop.mapred.LocalJobRunner -
2012-12-22 10:41:44,635 [Thread-7] INFO org.apache.hadoop.mapred.Task - Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
2012-12-22 10:41:44,636 [Thread-7] INFO org.apache.hadoop.mapred.LocalJobRunner -
2012-12-22 10:41:44,636 [Thread-7] INFO org.apache.hadoop.mapred.Task - Task attempt_local_0001_r_000000_0 is allowed to commit now
2012-12-22 10:41:44,639 [Thread-7] INFO org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter - Saved output of task 'attempt_local_0001_r_000000_0' to file:/tmp/temp1482927600/tmp-1364727125
2012-12-22 10:41:47,587 [Thread-7] INFO org.apache.hadoop.mapred.LocalJobRunner - reduce > reduce
2012-12-22 10:41:47,587 [Thread-7] INFO org.apache.hadoop.mapred.Task - Task 'attempt_local_0001_r_000000_0' done.
2012-12-22 10:41:47,589 [Thread-7] WARN org.apache.hadoop.mapred.FileOutputCommitter - Output path is null in cleanup
2012-12-22 10:41:48,796 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 50% complete
2012-12-22 10:41:48,799 [main] WARN org.apache.pig.tools.pigstats.PigStatsUtil - Failed to get RunningJob for job job_local_0001
2012-12-22 10:41:48,800 [main] INFO org.apache.pig.tools.pigstats.ScriptState - Pig script settings are added to the job
2012-12-22 10:41:48,801 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - mapred.job.reduce.markreset.buffer.percent is not set, set to default 0.3
2012-12-22 10:41:48,801 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - creating jar file Job8611044961811192709.jar
2012-12-22 10:41:52,423 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - jar file Job8611044961811192709.jar created
2012-12-22 10:41:52,428 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - Setting up single store job
2012-12-22 10:41:52,433 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - BytesPerReducer=1000000000 maxReducers=999 totalInputFileSize=1037
2012-12-22 10:41:52,433 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - Neither PARALLEL nor default parallelism is set for this job. Setting number of reducers to 1
2012-12-22 10:41:52,445 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 1 map-reduce job(s) waiting for submission.
2012-12-22 10:41:52,505 [Thread-11] INFO org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
2012-12-22 10:41:52,505 [Thread-11] INFO org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
2012-12-22 10:41:52,505 [Thread-11] INFO org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths (combined) to process : 1
2012-12-22 10:41:52,588 [Thread-12] INFO org.apache.hadoop.mapred.Task - Using ResourceCalculatorPlugin : null
2012-12-22 10:41:52,592 [Thread-12] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigRecordReader - Current split being processed file:/tmp/temp1482927600/tmp-1364727125/part-r-00000:0+1037
2012-12-22 10:41:52,593 [Thread-12] INFO org.apache.hadoop.mapred.MapTask - io.sort.mb = 100
2012-12-22 10:41:52,612 [Thread-12] INFO org.apache.hadoop.mapred.MapTask - data buffer = 79691776/99614720
2012-12-22 10:41:52,612 [Thread-12] INFO org.apache.hadoop.mapred.MapTask - record buffer = 262144/327680
2012-12-22 10:41:52,638 [Thread-12] INFO org.apache.hadoop.mapred.MapTask - Starting flush of map output
2012-12-22 10:41:52,653 [Thread-12] INFO org.apache.hadoop.mapred.MapTask - Finished spill 0
2012-12-22 10:41:52,655 [Thread-12] INFO org.apache.hadoop.mapred.Task - Task:attempt_local_0002_m_000000_0 is done. And is in the process of commiting
2012-12-22 10:41:52,946 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - HadoopJobId: job_local_0002
2012-12-22 10:41:55,586 [Thread-12] INFO org.apache.hadoop.mapred.LocalJobRunner -
2012-12-22 10:41:55,586 [Thread-12] INFO org.apache.hadoop.mapred.Task - Task 'attempt_local_0002_m_000000_0' done.
2012-12-22 10:41:55,594 [Thread-12] INFO org.apache.hadoop.mapred.Task - Using ResourceCalculatorPlugin : null
2012-12-22 10:41:55,594 [Thread-12] INFO org.apache.hadoop.mapred.LocalJobRunner -
2012-12-22 10:41:55,594 [Thread-12] INFO org.apache.hadoop.mapred.Merger - Merging 1 sorted segments
2012-12-22 10:41:55,595 [Thread-12] INFO org.apache.hadoop.mapred.Merger - Down to the last merge-pass, with 1 segments left of total size: 809 bytes
2012-12-22 10:41:55,595 [Thread-12] INFO org.apache.hadoop.mapred.LocalJobRunner -
2012-12-22 10:41:55,608 [Thread-12] INFO org.apache.hadoop.mapred.Task - Task:attempt_local_0002_r_000000_0 is done. And is in the process of commiting
2012-12-22 10:41:55,610 [Thread-12] INFO org.apache.hadoop.mapred.LocalJobRunner -
2012-12-22 10:41:55,610 [Thread-12] INFO org.apache.hadoop.mapred.Task - Task attempt_local_0002_r_000000_0 is allowed to commit now
2012-12-22 10:41:55,613 [Thread-12] INFO org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter - Saved output of task 'attempt_local_0002_r_000000_0' to file:/Users/ceteri/src/concur/Impatient/part4/output/wc
2012-12-22 10:41:58,590 [Thread-12] INFO org.apache.hadoop.mapred.LocalJobRunner - reduce > reduce
2012-12-22 10:41:58,591 [Thread-12] INFO org.apache.hadoop.mapred.Task - Task 'attempt_local_0002_r_000000_0' done.
2012-12-22 10:41:58,592 [Thread-12] WARN org.apache.hadoop.mapred.FileOutputCommitter - Output path is null in cleanup
2012-12-22 10:42:02,969 [main] WARN org.apache.pig.tools.pigstats.PigStatsUtil - Failed to get RunningJob for job job_local_0002
2012-12-22 10:42:02,971 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 100% complete
2012-12-22 10:42:02,973 [main] INFO org.apache.pig.tools.pigstats.SimplePigStats - Script Statistics:
HadoopVersion PigVersion UserId StartedAt FinishedAt Features
1.0.3 0.10.0 ceteri 2012-12-22 10:41:34 2012-12-22 10:42:02 HASH_JOIN,GROUP_BY,FILTER
Success!
Job Stats (time in seconds):
JobId Maps Reduces MaxMapTime MinMapTIme AvgMapTime MaxReduceTime MinReduceTime AvgReduceTime Alias Feature Outputs
job_local_0001 1 1 n/a n/a n/a n/a n/a n/a docPipe,stopPipe,tokenPipe HASH_JOIN
job_local_0002 1 1 n/a n/a n/a n/a n/a n/a tokenGroups,wcPipe GROUP_BY,COMBINER file:///Users/ceteri/src/concur/Impatient/part4/output/wc,
Input(s):
Successfully read 0 records from: "file:///Users/ceteri/src/concur/Impatient/part4/data/rain.txt"
Successfully read 0 records from: "file:///Users/ceteri/src/concur/Impatient/part4/data/en.stop"
Output(s):
Successfully stored 0 records in: "file:///Users/ceteri/src/concur/Impatient/part4/output/wc"
Counters:
Total records written : 0
Total bytes written : 0
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 0
Total records proactively spilled: 0
Job DAG:
job_local_0001 -> job_local_0002,
job_local_0002
2012-12-22 10:42:02,973 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
bash-3.2$ cat output/wc/part-r-00000
air 1
dry 3
dvd 1
lee 2
area 4
land 2
less 1
lies 1
rain 5
such 1
cause 1
death 1
known 1
women 1
broken 1
effect 1
ranges 1
shadow 4
valley 1
deserts 1
leeward 2
primary 1
secrets 1
sinking 1
downwind 1
mountain 3
produces 1
australia 1
cloudcover 1
mountainous 1
california's 1
bash-3.2$
docPipe = LOAD '$docPath' USING PigStorage('\t', 'tagsource') AS (doc_id, text);
docPipe = FILTER docPipe BY doc_id != 'doc_id';
stopPipe = LOAD '$stopPath' USING PigStorage('\t', 'tagsource') AS (stop:chararray);
stopPipe = FILTER stopPipe BY stop != 'stop';
-- specify a regex operation to split the "document" text lines into a token stream
tokenPipe = FOREACH docPipe GENERATE doc_id, FLATTEN(TOKENIZE(LOWER(text), ' [](),.')) AS token;
tokenPipe = FILTER tokenPipe BY token MATCHES '\\w.*';
-- perform a left join to remove stop words, discarding the rows
-- which joined with stop words, i.e., were non-null after left join
tokenPipe = JOIN tokenPipe BY token LEFT, stopPipe BY stop;
tokenPipe = FILTER tokenPipe BY stopPipe::stop IS NULL;
-- determine the word counts
tokenGroups = GROUP tokenPipe BY token;
wcPipe = FOREACH tokenGroups GENERATE group AS token, COUNT(tokenPipe) AS count;
-- output
STORE wcPipe INTO '$wcPath' using PigStorage('\t', 'tagsource');
EXPLAIN -out dot/wc_pig.dot -dot wcPipe;
-- prepare DDL for loading the raw data
CREATE TABLE raw_docs (
doc_id STRING,
text STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
;
CREATE TABLE raw_stop (
stop STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
;
-- load the raw data
LOAD DATA
LOCAL INPATH 'data/rain.txt'
OVERWRITE INTO TABLE raw_docs
;
LOAD DATA
LOCAL INPATH 'data/en.stop'
OVERWRITE INTO TABLE raw_stop
;
-- additional steps to remove headers, yay
CREATE TABLE docs (
doc_id STRING,
text STRING
)
;
INSERT OVERWRITE TABLE docs
SELECT
*
FROM raw_docs
WHERE doc_id <> 'doc_id'
;
CREATE TABLE stop (
stop STRING
)
;
INSERT OVERWRITE TABLE stop
SELECT
*
FROM raw_stop
WHERE stop <> 'stop'
;
-- tokenize using external Python script
CREATE TABLE tokens (
token STRING
)
;
INSERT OVERWRITE TABLE tokens
SELECT
TRANSFORM(text) USING 'python ./src/scripts/tokenizer.py' AS token
FROM docs
;
-- filter with a left join, then count
SELECT token, COUNT(*) AS count
FROM (
SELECT
*
FROM tokens LEFT OUTER JOIN stop
ON (tokens.token = stop.stop)
WHERE stop IS NULL
) t
GROUP BY token
;
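The wc.q script above pipes rows through src/scripts/tokenizer.py, which isn't included in this gist. Hive's TRANSFORM clause simply streams each input row to the child process on stdin and reads one output row per line from stdout, so any executable honoring that contract works. As an illustration only, sketched in Java to match the rest of this gist (the real script is Python, and the splitting rule is an assumption, mirroring the regex used in the Cascading version):

import java.io.BufferedReader;
import java.io.InputStreamReader;

// Hypothetical stand-in for src/scripts/tokenizer.py: reads rows from
// stdin (Hive TRANSFORM streams one tab-delimited row per line) and
// writes one token per output line.
public class Tokenizer {
  public static void main( String[] args ) throws Exception {
    BufferedReader reader = new BufferedReader( new InputStreamReader( System.in ) );
    String line;

    while( ( line = reader.readLine() ) != null ) {
      // split on the same punctuation the Cascading RegexSplitGenerator uses
      for( String token : line.toLowerCase().split( "[ \\[\\]\\(\\),.]" ) )
        if( token.trim().length() > 0 )
          System.out.println( token.trim() );
    }
  }
}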