Skip to content

Instantly share code, notes, and snippets.

@ceteri
Created July 3, 2012 23:05
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ceteri/3044049 to your computer and use it in GitHub Desktop.
Save ceteri/3044049 to your computer and use it in GitHub Desktop.
Cascading for the Impatient, Part 6
public class
Main
{
public static void
main( String[] args )
{
String docPath = args[ 0 ];
String wcPath = args[ 1 ];
String stopPath = args[ 2 ];
String tfidfPath = args[ 3 ];
String trapPath = args[ 4 ];
String checkPath = args[ 5 ];
Properties properties = new Properties();
AppProps.setApplicationJarClass( properties, Main.class );
HadoopFlowConnector flowConnector = new HadoopFlowConnector( properties );
// create source and sink taps
Tap docTap = new Hfs( new TextDelimited( true, "\t" ), docPath );
Tap wcTap = new Hfs( new TextDelimited( true, "\t" ), wcPath );
Fields stop = new Fields( "stop" );
Tap stopTap = new Hfs( new TextDelimited( stop, true, "\t" ), stopPath );
Tap tfidfTap = new Hfs( new TextDelimited( true, "\t" ), tfidfPath );
Tap trapTap = new Hfs( new TextDelimited( true, "\t" ), trapPath );
Tap checkTap = new Hfs( new TextDelimited( true, "\t" ), checkPath );
// use a stream assertion to validate the input data
Pipe docPipe = new Pipe( "token" );
AssertMatches assertMatches = new AssertMatches( "doc\\d+\\s.*" );
docPipe = new Each( docPipe, AssertionLevel.STRICT, assertMatches );
// specify a regex operation to split the "document" text lines into a token stream
Fields token = new Fields( "token" );
Fields text = new Fields( "text" );
RegexSplitGenerator splitter = new RegexSplitGenerator( token, "[ \\[\\]\\(\\),.]" );
Fields fieldSelector = new Fields( "doc_id", "token" );
docPipe = new Each( docPipe, text, splitter, fieldSelector );
// define "ScrubFunction" to clean up the token stream
Fields scrubArguments = new Fields( "doc_id", "token" );
docPipe = new Each( docPipe, scrubArguments, new ScrubFunction( scrubArguments ), Fields.RESULTS );
// perform a left join to remove stop words, discarding the rows
// which joined with stop words, i.e., were non-null after left join
Pipe stopPipe = new Pipe( "stop" );
Pipe tokenPipe = new HashJoin( docPipe, token, stopPipe, stop, new LeftJoin() );
tokenPipe = new Each( tokenPipe, stop, new RegexFilter( "^$" ) );
tokenPipe = new Retain( tokenPipe, fieldSelector );
// one branch of the flow tallies the token counts for term frequency (TF)
Pipe tfPipe = new Pipe( "TF", tokenPipe );
Fields tf_count = new Fields( "tf_count" );
tfPipe = new CountBy( tfPipe, new Fields( "doc_id", "token" ), tf_count );
Fields tf_token = new Fields( "tf_token" );
tfPipe = new Rename( tfPipe, token, tf_token );
// one branch counts the number of documents (D)
Fields doc_id = new Fields( "doc_id" );
Fields tally = new Fields( "tally" );
Fields rhs_join = new Fields( "rhs_join" );
Fields n_docs = new Fields( "n_docs" );
Pipe dPipe = new Unique( "D", tokenPipe, doc_id );
dPipe = new Each( dPipe, new Insert( tally, 1 ), Fields.ALL );
dPipe = new Each( dPipe, new Insert( rhs_join, 1 ), Fields.ALL );
dPipe = new SumBy( dPipe, rhs_join, tally, n_docs, long.class );
// one branch tallies the token counts for document frequency (DF)
Pipe dfPipe = new Unique( "DF", tokenPipe, Fields.ALL );
Fields df_count = new Fields( "df_count" );
dfPipe = new CountBy( dfPipe, token, df_count );
Fields df_token = new Fields( "df_token" );
Fields lhs_join = new Fields( "lhs_join" );
dfPipe = new Rename( dfPipe, token, df_token );
dfPipe = new Each( dfPipe, new Insert( lhs_join, 1 ), Fields.ALL );
// example use of a debug, to observe tuple stream; turn off below
dfPipe = new Each( dfPipe, DebugLevel.VERBOSE, new Debug( true ) );
// join to bring together all the components for calculating TF-IDF
// the D side of the join is smaller, so it goes on the RHS
Pipe idfPipe = new HashJoin( dfPipe, lhs_join, dPipe, rhs_join );
// create a checkpoint, to observe the intermediate data in DF stream
Checkpoint idfCheck = new Checkpoint( "checkpoint", idfPipe );
// the IDF side of the join is smaller, so it goes on the RHS
Pipe tfidfPipe = new CoGroup( tfPipe, tf_token, idfCheck, df_token );
// calculate the TF-IDF weights, per token, per document
Fields tfidf = new Fields( "tfidf" );
String expression = "(double) tf_count * Math.log( (double) n_docs / ( 1.0 + df_count ) )";
ExpressionFunction tfidfExpression = new ExpressionFunction( tfidf, expression, Double.class );
Fields tfidfArguments = new Fields( "tf_count", "df_count", "n_docs" );
tfidfPipe = new Each( tfidfPipe, tfidfArguments, tfidfExpression, Fields.ALL );
fieldSelector = new Fields( "tf_token", "doc_id", "tfidf" );
tfidfPipe = new Retain( tfidfPipe, fieldSelector );
tfidfPipe = new Rename( tfidfPipe, tf_token, token );
// keep track of the word counts, which are useful for QA
Pipe wcPipe = new Pipe( "wc", tfPipe );
wcPipe = new Retain( wcPipe, tf_token );
Fields count = new Fields( "count" );
wcPipe = new CountBy( wcPipe, tf_token, count );
// additionally, sort by count
wcPipe = new GroupBy( wcPipe, count, count );
// connect the taps, pipes, traps, checkpoints, etc., into a flow
FlowDef flowDef = FlowDef.flowDef()
.setName( "tfidf" )
.addSource( docPipe, docTap )
.addSource( stopPipe, stopTap )
.addTailSink( tfidfPipe, tfidfTap )
.addTailSink( wcPipe, wcTap )
.addTrap( docPipe, trapTap )
.addCheckpoint( idfCheck, checkTap );
// set to DebugLevel.VERBOSE for trace, or DebugLevel.NONE in production
flowDef.setDebugLevel( DebugLevel.VERBOSE );
// set to AssertionLevel.STRICT for all assertions, or AssertionLevel.NONE in production
flowDef.setAssertionLevel( AssertionLevel.STRICT );
// write a DOT file and run the flow
Flow tfidfFlow = flowConnector.connect( flowDef );
tfidfFlow.writeDOT( "dot/tfidf.dot" );
tfidfFlow.complete();
}
}
public class ScrubTest extends CascadingTestCase
{
@Test
public void testScrub()
{
Fields fieldDeclaration = new Fields( "doc_id", "token" );
Function scrub = new ScrubFunction( fieldDeclaration );
Tuple[] arguments = new Tuple[]{
new Tuple( "doc_1", "FoO" ),
new Tuple( "doc_1", " BAR " ),
new Tuple( "doc_1", " " ) // will be scrubed
};
ArrayList<Tuple> expectResults = new ArrayList<Tuple>();
expectResults.add( new Tuple( "doc_1", "foo" ) );
expectResults.add( new Tuple( "doc_1", "bar" ) );
TupleListCollector collector = invokeFunction( scrub, arguments, Fields.ALL );
Iterator<Tuple> it = collector.iterator();
ArrayList<Tuple> results = new ArrayList<Tuple>();
while( it.hasNext() )
results.add( it.next() );
assertEquals( "Scrub result is not expected", expectResults, results );
}
}
import java.text.SimpleDateFormat
apply plugin: 'java'
apply plugin: 'idea'
archivesBaseName = 'impatient'
repositories {
mavenLocal()
mavenCentral()
mavenRepo name: 'conjars', url: 'http://conjars.org/repo/'
}
dependencies {
compile( 'cascading:cascading-core:2.0.1' ) { exclude group: 'log4j' }
compile( 'cascading:cascading-hadoop:2.0.1' ) { transitive = true }
testCompile( 'cascading:cascading-test:2.0.1' )
testCompile( 'org.apache.hadoop:hadoop-test:1.0.3' )
testCompile( 'junit:junit:4.8.+' )
}
jar {
description = "Assembles a Hadoop ready jar file"
doFirst {
into( 'lib' ) {
from configurations.compile
}
}
manifest {
attributes( "Main-Class": "impatient/Main" )
}
}
task distCopy( type: Copy, dependsOn: jar ) {
into "${buildDir}/dist/${archivesBaseName}"
from 'README.md'
from 'build.gradle'
from( 'src' ) {into 'src'}
from( 'data' ) {into 'data'}
from "$buildDir/libs"
}
task dist( type: Tar, dependsOn: distCopy ) {
compression = "GZIP"
classifier = new SimpleDateFormat( "yyyyMMdd" ).format( new Date() )
from "${buildDir}/dist/"
}
test {
include 'impatient/**'
//makes the standard streams (err and out) visible at console when running tests
testLogging.showStandardStreams = true
//listening to test execution events
beforeTest { descriptor ->
logger.lifecycle("Running test: " + descriptor)
}
onOutput { descriptor, event ->
logger.lifecycle("Test: " + descriptor + " produced standard out/err: " + event.message )
}
}
bash-3.2$ ls
LICENSE.txt README.md build.gradle data docs src
bash-3.2$ hadoop version
Warning: $HADOOP_HOME is deprecated.
Hadoop 1.0.3
Subversion https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1.0 -r 1335192
Compiled by hortonfo on Tue May 8 20:31:25 UTC 2012
From source with checksum e6b0c1e23dcf76907c5fecb4b832f3be
bash-3.2$ gradle -version
------------------------------------------------------------
Gradle 1.0
------------------------------------------------------------
Gradle build time: Tuesday, June 12, 2012 12:56:21 AM UTC
Groovy: 1.8.6
Ant: Apache Ant(TM) version 1.8.2 compiled on December 20 2010
Ivy: 2.2.0
JVM: 1.6.0_33 (Apple Inc. 20.8-b03-424)
OS: Mac OS X 10.7.4 x86_64
bash-3.2$ gradle clean jar
:clean UP-TO-DATE
:compileJava
:processResources UP-TO-DATE
:classes
:jar
BUILD SUCCESSFUL
Total time: 5.249 secs
Warning: $HADOOP_HOME is deprecated.
12/11/23 01:17:29 INFO util.HadoopUtil: resolving application jar from found main method on: impatient.Main
12/11/23 01:17:29 INFO planner.HadoopPlanner: using application jar: /Users/ceteri/src/concur/Impatient/part6/./build/libs/impatient.jar
12/11/23 01:17:29 INFO property.AppProps: using app.id: 983379DAC9E8A56D75B7E7EEFE73A5D6
2012-11-23 01:17:29.158 java[14614:1903] Unable to load realm info from SCDynamicStore
12/11/23 01:17:29 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
12/11/23 01:17:29 WARN snappy.LoadSnappy: Snappy native library not loaded
12/11/23 01:17:29 INFO mapred.FileInputFormat: Total input paths to process : 1
12/11/23 01:17:29 INFO planner.FlowPlanner: found checkpoint: checkpoint, using tap: Hfs["TextDelimited[[UNKNOWN]->[ALL]]"]["output/check"]"]
12/11/23 01:17:29 INFO util.Version: Concurrent, Inc - Cascading 2.0.1
12/11/23 01:17:29 INFO flow.Flow: [tfidf] starting
12/11/23 01:17:29 INFO flow.Flow: [tfidf] source: Hfs["TextDelimited[['stop']]"]["data/en.stop"]"]
12/11/23 01:17:29 INFO flow.Flow: [tfidf] source: Hfs["TextDelimited[['doc_id', 'text']->[ALL]]"]["data/rain.txt"]"]
12/11/23 01:17:29 INFO flow.Flow: [tfidf] sink: Hfs["TextDelimited[[UNKNOWN]->['doc_id', 'tfidf', 'token']]"]["output/tfidf"]"]
12/11/23 01:17:29 INFO flow.Flow: [tfidf] sink: Hfs["TextDelimited[[UNKNOWN]->['tf_token', 'count']]"]["output/wc"]"]
12/11/23 01:17:29 INFO flow.Flow: [tfidf] parallel execution is enabled: false
12/11/23 01:17:29 INFO flow.Flow: [tfidf] starting jobs: 10
12/11/23 01:17:29 INFO flow.Flow: [tfidf] allocating threads: 1
12/11/23 01:17:29 INFO flow.FlowStep: [tfidf] starting step: (1/10)
12/11/23 01:17:29 INFO mapred.FileInputFormat: Total input paths to process : 1
12/11/23 01:17:30 INFO flow.FlowStep: [tfidf] submitted hadoop job: job_local_0001
12/11/23 01:17:30 INFO mapred.Task: Using ResourceCalculatorPlugin : null
12/11/23 01:17:30 INFO io.MultiInputSplit: current split input path: file:/Users/ceteri/src/concur/Impatient/part6/data/rain.txt
12/11/23 01:17:30 INFO mapred.MapTask: numReduceTasks: 0
12/11/23 01:17:30 INFO hadoop.FlowMapper: sourcing from: Hfs["TextDelimited[['doc_id', 'text']->[ALL]]"]["data/rain.txt"]"]
12/11/23 01:17:30 INFO hadoop.FlowMapper: sourcing from: Hfs["TextDelimited[['stop']]"]["data/en.stop"]"]
12/11/23 01:17:30 INFO hadoop.FlowMapper: sinking to: TempHfs["SequenceFile[['doc_id', 'token']]"][token_stop/17673/]
12/11/23 01:17:30 INFO hadoop.FlowMapper: trapping to: Hfs["TextDelimited[[UNKNOWN]->[ALL]]"]["output/trap"]"]
12/11/23 01:17:30 INFO collect.SpillableTupleList: attempting to load codec: org.apache.hadoop.io.compress.GzipCodec
12/11/23 01:17:30 INFO collect.SpillableTupleList: found codec: org.apache.hadoop.io.compress.GzipCodec
12/11/23 01:17:30 INFO mapred.FileInputFormat: Total input paths to process : 1
12/11/23 01:17:30 INFO util.Hadoop18TapUtil: setting up task: 'attempt_local_0001_m_000000_0' - file:/Users/ceteri/src/concur/Impatient/part6/output/trap/_temporary/_attempt_local_0001_m_000000_0
12/11/23 01:17:30 WARN stream.TrapHandler: exception trap on branch: 'token', for fields: [{2}:'doc_id', 'text'] tuple: ['zoink', 'null']
cascading.operation.AssertionException: argument tuple: ['zoink', 'null'] did not match: doc\d+\s.*
at cascading.operation.assertion.BaseAssertion.throwFail(BaseAssertion.java:107)
at cascading.operation.assertion.AssertMatches.doAssert(AssertMatches.java:84)
at cascading.flow.stream.ValueAssertionEachStage.receive(ValueAssertionEachStage.java:57)
at cascading.flow.stream.ValueAssertionEachStage.receive(ValueAssertionEachStage.java:33)
at cascading.flow.stream.SourceStage.map(SourceStage.java:102)
at cascading.flow.stream.SourceStage.run(SourceStage.java:58)
at cascading.flow.hadoop.FlowMapper.run(FlowMapper.java:124)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:436)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:372)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:212)
12/11/23 01:17:30 INFO collect.SpillableTupleList: attempting to load codec: org.apache.hadoop.io.compress.GzipCodec
12/11/23 01:17:30 INFO collect.SpillableTupleList: found codec: org.apache.hadoop.io.compress.GzipCodec
12/11/23 01:17:30 INFO io.TapOutputCollector: closing tap collector for: output/trap/part-m-00001-00000
12/11/23 01:17:30 INFO util.Hadoop18TapUtil: committing task: 'attempt_local_0001_m_000000_0' - file:/Users/ceteri/src/concur/Impatient/part6/output/trap/_temporary/_attempt_local_0001_m_000000_0
12/11/23 01:17:30 INFO util.Hadoop18TapUtil: saved output of task 'attempt_local_0001_m_000000_0' to file:/Users/ceteri/src/concur/Impatient/part6/output/trap
12/11/23 01:17:30 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
12/11/23 01:17:30 INFO mapred.LocalJobRunner:
12/11/23 01:17:30 INFO mapred.Task: Task attempt_local_0001_m_000000_0 is allowed to commit now
12/11/23 01:17:30 INFO mapred.FileOutputCommitter: Saved output of task 'attempt_local_0001_m_000000_0' to file:/tmp/hadoop-ceteri/token_stop_17673_4CCF3534B9F67CBCF5F864501C33F5B1
12/11/23 01:17:33 INFO mapred.LocalJobRunner: file:/Users/ceteri/src/concur/Impatient/part6/data/rain.txt:0+521
12/11/23 01:17:33 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done.
12/11/23 01:17:35 INFO flow.FlowStep: [tfidf] starting step: (2/10)
12/11/23 01:17:35 INFO mapred.FileInputFormat: Total input paths to process : 1
12/11/23 01:17:35 INFO flow.FlowStep: [tfidf] submitted hadoop job: job_local_0002
12/11/23 01:17:35 INFO mapred.Task: Using ResourceCalculatorPlugin : null
12/11/23 01:17:35 INFO io.MultiInputSplit: current split input path: file:/tmp/hadoop-ceteri/token_stop_17673_4CCF3534B9F67CBCF5F864501C33F5B1/part-00000
12/11/23 01:17:35 INFO mapred.MapTask: numReduceTasks: 1
12/11/23 01:17:35 INFO mapred.MapTask: io.sort.mb = 100
12/11/23 01:17:35 INFO mapred.MapTask: data buffer = 79691776/99614720
12/11/23 01:17:35 INFO mapred.MapTask: record buffer = 262144/327680
12/11/23 01:17:35 INFO hadoop.FlowMapper: sourcing from: TempHfs["SequenceFile[['doc_id', 'token']]"][token_stop/17673/]
12/11/23 01:17:35 INFO hadoop.FlowMapper: sinking to: GroupBy(TF)[by:[{2}:'doc_id', 'token']]
12/11/23 01:17:35 INFO assembly.AggregateBy: using threshold value: 10000
12/11/23 01:17:35 INFO mapred.MapTask: Starting flush of map output
12/11/23 01:17:35 INFO mapred.MapTask: Finished spill 0
12/11/23 01:17:35 INFO mapred.Task: Task:attempt_local_0002_m_000000_0 is done. And is in the process of commiting
12/11/23 01:17:38 INFO mapred.LocalJobRunner: file:/tmp/hadoop-ceteri/token_stop_17673_4CCF3534B9F67CBCF5F864501C33F5B1/part-00000:0+1539
12/11/23 01:17:38 INFO mapred.Task: Task 'attempt_local_0002_m_000000_0' done.
12/11/23 01:17:38 INFO mapred.Task: Using ResourceCalculatorPlugin : null
12/11/23 01:17:38 INFO mapred.LocalJobRunner:
12/11/23 01:17:38 INFO mapred.Merger: Merging 1 sorted segments
12/11/23 01:17:38 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 1321 bytes
12/11/23 01:17:38 INFO mapred.LocalJobRunner:
12/11/23 01:17:38 INFO hadoop.FlowReducer: sourcing from: GroupBy(TF)[by:[{2}:'doc_id', 'token']]
12/11/23 01:17:38 INFO hadoop.FlowReducer: sinking to: TempHfs["SequenceFile[['doc_id', 'tf_count', 'tf_token']]"][TF/93477/]
12/11/23 01:17:38 INFO mapred.Task: Task:attempt_local_0002_r_000000_0 is done. And is in the process of commiting
12/11/23 01:17:38 INFO mapred.LocalJobRunner:
12/11/23 01:17:38 INFO mapred.Task: Task attempt_local_0002_r_000000_0 is allowed to commit now
12/11/23 01:17:38 INFO mapred.FileOutputCommitter: Saved output of task 'attempt_local_0002_r_000000_0' to file:/tmp/hadoop-ceteri/TF_93477_60316665166041DC2ADDD203220CED34
12/11/23 01:17:41 INFO mapred.LocalJobRunner: reduce > reduce
12/11/23 01:17:41 INFO mapred.Task: Task 'attempt_local_0002_r_000000_0' done.
12/11/23 01:17:45 INFO flow.FlowStep: [tfidf] starting step: (4/10)
12/11/23 01:17:45 INFO mapred.FileInputFormat: Total input paths to process : 1
12/11/23 01:17:45 INFO flow.FlowStep: [tfidf] submitted hadoop job: job_local_0003
12/11/23 01:17:45 INFO mapred.Task: Using ResourceCalculatorPlugin : null
12/11/23 01:17:45 INFO io.MultiInputSplit: current split input path: file:/tmp/hadoop-ceteri/token_stop_17673_4CCF3534B9F67CBCF5F864501C33F5B1/part-00000
12/11/23 01:17:45 INFO mapred.MapTask: numReduceTasks: 1
12/11/23 01:17:45 INFO mapred.MapTask: io.sort.mb = 100
12/11/23 01:17:45 INFO mapred.MapTask: data buffer = 79691776/99614720
12/11/23 01:17:45 INFO mapred.MapTask: record buffer = 262144/327680
12/11/23 01:17:45 INFO hadoop.FlowMapper: sourcing from: TempHfs["SequenceFile[['doc_id', 'token']]"][token_stop/17673/]
12/11/23 01:17:45 INFO hadoop.FlowMapper: sinking to: GroupBy(DF)[by:[{?}:ALL]]
12/11/23 01:17:45 INFO mapred.MapTask: Starting flush of map output
12/11/23 01:17:45 INFO mapred.MapTask: Finished spill 0
12/11/23 01:17:45 INFO mapred.Task: Task:attempt_local_0003_m_000000_0 is done. And is in the process of commiting
12/11/23 01:17:48 INFO mapred.LocalJobRunner: file:/tmp/hadoop-ceteri/token_stop_17673_4CCF3534B9F67CBCF5F864501C33F5B1/part-00000:0+1539
12/11/23 01:17:48 INFO mapred.Task: Task 'attempt_local_0003_m_000000_0' done.
12/11/23 01:17:48 INFO mapred.Task: Using ResourceCalculatorPlugin : null
12/11/23 01:17:48 INFO mapred.LocalJobRunner:
12/11/23 01:17:48 INFO mapred.Merger: Merging 1 sorted segments
12/11/23 01:17:48 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 1229 bytes
12/11/23 01:17:48 INFO mapred.LocalJobRunner:
12/11/23 01:17:48 INFO hadoop.FlowReducer: sourcing from: GroupBy(DF)[by:[{?}:ALL]]
12/11/23 01:17:48 INFO hadoop.FlowReducer: sinking to: TempHfs["SequenceFile[['token', 'df_count']]"][DF/16161/]
12/11/23 01:17:48 INFO assembly.AggregateBy: using threshold value: 10000
12/11/23 01:17:48 INFO mapred.Task: Task:attempt_local_0003_r_000000_0 is done. And is in the process of commiting
12/11/23 01:17:48 INFO mapred.LocalJobRunner:
12/11/23 01:17:48 INFO mapred.Task: Task attempt_local_0003_r_000000_0 is allowed to commit now
12/11/23 01:17:48 INFO mapred.FileOutputCommitter: Saved output of task 'attempt_local_0003_r_000000_0' to file:/tmp/hadoop-ceteri/DF_16161_D43FAAAC63093FBD0A72E91718DD526E
12/11/23 01:17:51 INFO mapred.LocalJobRunner: reduce > reduce
12/11/23 01:17:51 INFO mapred.Task: Task 'attempt_local_0003_r_000000_0' done.
12/11/23 01:17:55 INFO flow.FlowStep: [tfidf] starting step: (6/10)
12/11/23 01:17:55 INFO mapred.FileInputFormat: Total input paths to process : 1
12/11/23 01:17:55 INFO flow.FlowStep: [tfidf] submitted hadoop job: job_local_0004
12/11/23 01:17:55 INFO mapred.Task: Using ResourceCalculatorPlugin : null
12/11/23 01:17:55 INFO io.MultiInputSplit: current split input path: file:/tmp/hadoop-ceteri/TF_93477_60316665166041DC2ADDD203220CED34/part-00000
12/11/23 01:17:55 INFO mapred.MapTask: numReduceTasks: 1
12/11/23 01:17:55 INFO mapred.MapTask: io.sort.mb = 100
12/11/23 01:17:55 INFO mapred.MapTask: data buffer = 79691776/99614720
12/11/23 01:17:55 INFO mapred.MapTask: record buffer = 262144/327680
12/11/23 01:17:55 INFO hadoop.FlowMapper: sourcing from: TempHfs["SequenceFile[['doc_id', 'tf_count', 'tf_token']]"][TF/93477/]
12/11/23 01:17:55 INFO hadoop.FlowMapper: sinking to: GroupBy(wc)[by:[{1}:'tf_token']]
12/11/23 01:17:55 INFO assembly.AggregateBy: using threshold value: 10000
12/11/23 01:17:55 INFO mapred.MapTask: Starting flush of map output
12/11/23 01:17:55 INFO mapred.MapTask: Finished spill 0
12/11/23 01:17:55 INFO mapred.Task: Task:attempt_local_0004_m_000000_0 is done. And is in the process of commiting
12/11/23 01:17:58 INFO mapred.LocalJobRunner: file:/tmp/hadoop-ceteri/TF_93477_60316665166041DC2ADDD203220CED34/part-00000:0+1573
12/11/23 01:17:58 INFO mapred.Task: Task 'attempt_local_0004_m_000000_0' done.
12/11/23 01:17:58 INFO mapred.Task: Using ResourceCalculatorPlugin : null
12/11/23 01:17:58 INFO mapred.LocalJobRunner:
12/11/23 01:17:58 INFO mapred.Merger: Merging 1 sorted segments
12/11/23 01:17:58 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 561 bytes
12/11/23 01:17:58 INFO mapred.LocalJobRunner:
12/11/23 01:17:58 INFO hadoop.FlowReducer: sourcing from: GroupBy(wc)[by:[{1}:'tf_token']]
12/11/23 01:17:58 INFO hadoop.FlowReducer: sinking to: TempHfs["SequenceFile[['tf_token', 'count']]"][wc/55541/]
12/11/23 01:17:58 INFO mapred.Task: Task:attempt_local_0004_r_000000_0 is done. And is in the process of commiting
12/11/23 01:17:58 INFO mapred.LocalJobRunner:
12/11/23 01:17:58 INFO mapred.Task: Task attempt_local_0004_r_000000_0 is allowed to commit now
12/11/23 01:17:58 INFO mapred.FileOutputCommitter: Saved output of task 'attempt_local_0004_r_000000_0' to file:/tmp/hadoop-ceteri/wc_55541_9293098C2837283CBA90A2661CE9DA5A
12/11/23 01:18:01 INFO mapred.LocalJobRunner: reduce > reduce
12/11/23 01:18:01 INFO mapred.Task: Task 'attempt_local_0004_r_000000_0' done.
12/11/23 01:18:05 INFO flow.FlowStep: [tfidf] starting step: (8/10)
12/11/23 01:18:05 INFO mapred.FileInputFormat: Total input paths to process : 1
12/11/23 01:18:05 INFO flow.FlowStep: [tfidf] submitted hadoop job: job_local_0005
12/11/23 01:18:05 INFO mapred.Task: Using ResourceCalculatorPlugin : null
12/11/23 01:18:05 INFO io.MultiInputSplit: current split input path: file:/tmp/hadoop-ceteri/DF_16161_D43FAAAC63093FBD0A72E91718DD526E/part-00000
12/11/23 01:18:05 INFO mapred.MapTask: numReduceTasks: 1
12/11/23 01:18:05 INFO mapred.MapTask: io.sort.mb = 100
12/11/23 01:18:05 INFO mapred.MapTask: data buffer = 79691776/99614720
12/11/23 01:18:05 INFO mapred.MapTask: record buffer = 262144/327680
12/11/23 01:18:05 INFO hadoop.FlowMapper: sourcing from: TempHfs["SequenceFile[['token', 'df_count']]"][DF/16161/]
12/11/23 01:18:05 INFO hadoop.FlowMapper: sinking to: GroupBy(DF)[by:[{1}:'token']]
12/11/23 01:18:05 INFO mapred.MapTask: Starting flush of map output
12/11/23 01:18:05 INFO mapred.MapTask: Finished spill 0
12/11/23 01:18:05 INFO mapred.Task: Task:attempt_local_0005_m_000000_0 is done. And is in the process of commiting
12/11/23 01:18:08 INFO mapred.LocalJobRunner: file:/tmp/hadoop-ceteri/DF_16161_D43FAAAC63093FBD0A72E91718DD526E/part-00000:0+784
12/11/23 01:18:08 INFO mapred.Task: Task 'attempt_local_0005_m_000000_0' done.
12/11/23 01:18:08 INFO mapred.Task: Using ResourceCalculatorPlugin : null
12/11/23 01:18:08 INFO mapred.LocalJobRunner:
12/11/23 01:18:08 INFO mapred.Merger: Merging 1 sorted segments
12/11/23 01:18:08 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 561 bytes
12/11/23 01:18:08 INFO mapred.LocalJobRunner:
12/11/23 01:18:08 INFO hadoop.FlowReducer: sourcing from: GroupBy(DF)[by:[{1}:'token']]
12/11/23 01:18:08 INFO hadoop.FlowReducer: sinking to: TempHfs["SequenceFile[['df_count', 'df_token', 'lhs_join']]"][DF/26111/]
['df_count', 'df_token', 'lhs_join']
['1', 'air', '1']
['3', 'area', '1']
['1', 'australia', '1']
['1', 'broken', '1']
['1', 'california's', '1']
['1', 'cause', '1']
['1', 'cloudcover', '1']
['1', 'death', '1']
['1', 'deserts', '1']
['1', 'downwind', '1']
['df_count', 'df_token', 'lhs_join']
['3', 'dry', '1']
['1', 'dvd', '1']
['1', 'effect', '1']
['1', 'known', '1']
['2', 'land', '1']
['2', 'lee', '1']
['2', 'leeward', '1']
['1', 'less', '1']
['1', 'lies', '1']
['3', 'mountain', '1']
['df_count', 'df_token', 'lhs_join']
['1', 'mountainous', '1']
['1', 'primary', '1']
['1', 'produces', '1']
['4', 'rain', '1']
['1', 'ranges', '1']
['1', 'secrets', '1']
['4', 'shadow', '1']
['1', 'sinking', '1']
['1', 'such', '1']
['1', 'valley', '1']
['df_count', 'df_token', 'lhs_join']
['1', 'women', '1']
tuples count: 31
12/11/23 01:18:08 INFO mapred.Task: Task:attempt_local_0005_r_000000_0 is done. And is in the process of commiting
12/11/23 01:18:08 INFO mapred.LocalJobRunner:
12/11/23 01:18:08 INFO mapred.Task: Task attempt_local_0005_r_000000_0 is allowed to commit now
12/11/23 01:18:08 INFO mapred.FileOutputCommitter: Saved output of task 'attempt_local_0005_r_000000_0' to file:/tmp/hadoop-ceteri/DF_26111_91B0B8E1930CF07B8291C81D474E249B
12/11/23 01:18:11 INFO mapred.LocalJobRunner: reduce > reduce
12/11/23 01:18:11 INFO mapred.Task: Task 'attempt_local_0005_r_000000_0' done.
12/11/23 01:18:15 INFO flow.FlowStep: [tfidf] starting step: (9/10) output/wc
12/11/23 01:18:15 INFO mapred.FileInputFormat: Total input paths to process : 1
12/11/23 01:18:15 INFO flow.FlowStep: [tfidf] submitted hadoop job: job_local_0006
12/11/23 01:18:15 INFO mapred.Task: Using ResourceCalculatorPlugin : null
12/11/23 01:18:15 INFO io.MultiInputSplit: current split input path: file:/tmp/hadoop-ceteri/wc_55541_9293098C2837283CBA90A2661CE9DA5A/part-00000
12/11/23 01:18:15 INFO mapred.MapTask: numReduceTasks: 1
12/11/23 01:18:15 INFO mapred.MapTask: io.sort.mb = 100
12/11/23 01:18:15 INFO mapred.MapTask: data buffer = 79691776/99614720
12/11/23 01:18:15 INFO mapred.MapTask: record buffer = 262144/327680
12/11/23 01:18:15 INFO hadoop.FlowMapper: sourcing from: TempHfs["SequenceFile[['tf_token', 'count']]"][wc/55541/]
12/11/23 01:18:15 INFO hadoop.FlowMapper: sinking to: GroupBy(wc)[by:[{1}:'count']]
12/11/23 01:18:15 INFO mapred.MapTask: Starting flush of map output
12/11/23 01:18:15 INFO mapred.MapTask: Finished spill 0
12/11/23 01:18:15 INFO mapred.Task: Task:attempt_local_0006_m_000000_0 is done. And is in the process of commiting
12/11/23 01:18:18 INFO mapred.LocalJobRunner: file:/tmp/hadoop-ceteri/wc_55541_9293098C2837283CBA90A2661CE9DA5A/part-00000:0+784
12/11/23 01:18:18 INFO mapred.Task: Task 'attempt_local_0006_m_000000_0' done.
12/11/23 01:18:18 INFO mapred.Task: Using ResourceCalculatorPlugin : null
12/11/23 01:18:18 INFO mapred.LocalJobRunner:
12/11/23 01:18:18 INFO mapred.Merger: Merging 1 sorted segments
12/11/23 01:18:18 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 654 bytes
12/11/23 01:18:18 INFO mapred.LocalJobRunner:
12/11/23 01:18:18 INFO hadoop.FlowReducer: sourcing from: GroupBy(wc)[by:[{1}:'count']]
12/11/23 01:18:18 INFO hadoop.FlowReducer: sinking to: Hfs["TextDelimited[[UNKNOWN]->['tf_token', 'count']]"]["output/wc"]"]
12/11/23 01:18:18 INFO mapred.Task: Task:attempt_local_0006_r_000000_0 is done. And is in the process of commiting
12/11/23 01:18:18 INFO mapred.LocalJobRunner:
12/11/23 01:18:18 INFO mapred.Task: Task attempt_local_0006_r_000000_0 is allowed to commit now
12/11/23 01:18:18 INFO mapred.FileOutputCommitter: Saved output of task 'attempt_local_0006_r_000000_0' to file:/Users/ceteri/src/concur/Impatient/part6/output/wc
12/11/23 01:18:21 INFO mapred.LocalJobRunner: reduce > reduce
12/11/23 01:18:21 INFO mapred.Task: Task 'attempt_local_0006_r_000000_0' done.
12/11/23 01:18:25 INFO flow.FlowStep: [tfidf] starting step: (3/10)
12/11/23 01:18:25 INFO mapred.FileInputFormat: Total input paths to process : 1
12/11/23 01:18:25 INFO flow.FlowStep: [tfidf] submitted hadoop job: job_local_0007
12/11/23 01:18:25 INFO mapred.Task: Using ResourceCalculatorPlugin : null
12/11/23 01:18:25 INFO io.MultiInputSplit: current split input path: file:/tmp/hadoop-ceteri/token_stop_17673_4CCF3534B9F67CBCF5F864501C33F5B1/part-00000
12/11/23 01:18:25 INFO mapred.MapTask: numReduceTasks: 1
12/11/23 01:18:25 INFO mapred.MapTask: io.sort.mb = 100
12/11/23 01:18:25 INFO mapred.MapTask: data buffer = 79691776/99614720
12/11/23 01:18:25 INFO mapred.MapTask: record buffer = 262144/327680
12/11/23 01:18:25 INFO hadoop.FlowMapper: sourcing from: TempHfs["SequenceFile[['doc_id', 'token']]"][token_stop/17673/]
12/11/23 01:18:25 INFO hadoop.FlowMapper: sinking to: GroupBy(D)[by:[{1}:'doc_id']]
12/11/23 01:18:25 INFO mapred.MapTask: Starting flush of map output
12/11/23 01:18:25 INFO mapred.MapTask: Finished spill 0
12/11/23 01:18:25 INFO mapred.Task: Task:attempt_local_0007_m_000000_0 is done. And is in the process of commiting
12/11/23 01:18:28 INFO mapred.LocalJobRunner: file:/tmp/hadoop-ceteri/token_stop_17673_4CCF3534B9F67CBCF5F864501C33F5B1/part-00000:0+1539
12/11/23 01:18:28 INFO mapred.Task: Task 'attempt_local_0007_m_000000_0' done.
12/11/23 01:18:28 INFO mapred.Task: Using ResourceCalculatorPlugin : null
12/11/23 01:18:28 INFO mapred.LocalJobRunner:
12/11/23 01:18:28 INFO mapred.Merger: Merging 1 sorted segments
12/11/23 01:18:28 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 127 bytes
12/11/23 01:18:28 INFO mapred.LocalJobRunner:
12/11/23 01:18:28 INFO hadoop.FlowReducer: sourcing from: GroupBy(D)[by:[{1}:'doc_id']]
12/11/23 01:18:28 INFO hadoop.FlowReducer: sinking to: TempHfs["SequenceFile[['rhs_join', 'n_docs']]"][D/83640/]
12/11/23 01:18:28 INFO assembly.AggregateBy: using threshold value: 10000
12/11/23 01:18:28 INFO mapred.Task: Task:attempt_local_0007_r_000000_0 is done. And is in the process of commiting
12/11/23 01:18:28 INFO mapred.LocalJobRunner:
12/11/23 01:18:28 INFO mapred.Task: Task attempt_local_0007_r_000000_0 is allowed to commit now
12/11/23 01:18:28 INFO mapred.FileOutputCommitter: Saved output of task 'attempt_local_0007_r_000000_0' to file:/tmp/hadoop-ceteri/D_83640_E85C09EE3BA89D689318F7CD11A25DB2
12/11/23 01:18:31 INFO mapred.LocalJobRunner: reduce > reduce
12/11/23 01:18:31 INFO mapred.Task: Task 'attempt_local_0007_r_000000_0' done.
12/11/23 01:18:35 INFO flow.FlowStep: [tfidf] starting step: (7/10)
12/11/23 01:18:35 INFO mapred.FileInputFormat: Total input paths to process : 1
12/11/23 01:18:35 INFO flow.FlowStep: [tfidf] submitted hadoop job: job_local_0008
12/11/23 01:18:35 INFO mapred.Task: Using ResourceCalculatorPlugin : null
12/11/23 01:18:35 INFO io.MultiInputSplit: current split input path: file:/tmp/hadoop-ceteri/D_83640_E85C09EE3BA89D689318F7CD11A25DB2/part-00000
12/11/23 01:18:35 INFO mapred.MapTask: numReduceTasks: 1
12/11/23 01:18:35 INFO mapred.MapTask: io.sort.mb = 100
12/11/23 01:18:36 INFO mapred.MapTask: data buffer = 79691776/99614720
12/11/23 01:18:36 INFO mapred.MapTask: record buffer = 262144/327680
12/11/23 01:18:36 INFO hadoop.FlowMapper: sourcing from: TempHfs["SequenceFile[['rhs_join', 'n_docs']]"][D/83640/]
12/11/23 01:18:36 INFO hadoop.FlowMapper: sinking to: GroupBy(D)[by:[{1}:'rhs_join']]
12/11/23 01:18:36 INFO mapred.MapTask: Starting flush of map output
12/11/23 01:18:36 INFO mapred.MapTask: Finished spill 0
12/11/23 01:18:36 INFO mapred.Task: Task:attempt_local_0008_m_000000_0 is done. And is in the process of commiting
12/11/23 01:18:38 INFO mapred.LocalJobRunner: file:/tmp/hadoop-ceteri/D_83640_E85C09EE3BA89D689318F7CD11A25DB2/part-00000:0+84
12/11/23 01:18:38 INFO mapred.Task: Task 'attempt_local_0008_m_000000_0' done.
12/11/23 01:18:38 INFO mapred.Task: Using ResourceCalculatorPlugin : null
12/11/23 01:18:38 INFO mapred.LocalJobRunner:
12/11/23 01:18:38 INFO mapred.Merger: Merging 1 sorted segments
12/11/23 01:18:38 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 11 bytes
12/11/23 01:18:38 INFO mapred.LocalJobRunner:
12/11/23 01:18:38 INFO hadoop.FlowReducer: sourcing from: GroupBy(D)[by:[{1}:'rhs_join']]
12/11/23 01:18:38 INFO hadoop.FlowReducer: sinking to: TempHfs["SequenceFile[['rhs_join', 'n_docs']]"][D/89167/]
12/11/23 01:18:38 INFO mapred.Task: Task:attempt_local_0008_r_000000_0 is done. And is in the process of commiting
12/11/23 01:18:38 INFO mapred.LocalJobRunner:
12/11/23 01:18:38 INFO mapred.Task: Task attempt_local_0008_r_000000_0 is allowed to commit now
12/11/23 01:18:38 INFO mapred.FileOutputCommitter: Saved output of task 'attempt_local_0008_r_000000_0' to file:/tmp/hadoop-ceteri/D_89167_9F804A5A7A9303E584C2148354770D65
12/11/23 01:18:41 INFO mapred.LocalJobRunner: reduce > reduce
12/11/23 01:18:41 INFO mapred.Task: Task 'attempt_local_0008_r_000000_0' done.
12/11/23 01:18:45 INFO flow.FlowStep: [tfidf] starting step: (10/10) output/check
12/11/23 01:18:46 INFO mapred.FileInputFormat: Total input paths to process : 1
12/11/23 01:18:46 INFO flow.FlowStep: [tfidf] submitted hadoop job: job_local_0009
12/11/23 01:18:46 INFO mapred.Task: Using ResourceCalculatorPlugin : null
12/11/23 01:18:46 INFO io.MultiInputSplit: current split input path: file:/tmp/hadoop-ceteri/DF_26111_91B0B8E1930CF07B8291C81D474E249B/part-00000
12/11/23 01:18:46 INFO mapred.MapTask: numReduceTasks: 0
12/11/23 01:18:46 INFO hadoop.FlowMapper: sourcing from: TempHfs["SequenceFile[['df_count', 'df_token', 'lhs_join']]"][DF/26111/]
12/11/23 01:18:46 INFO hadoop.FlowMapper: sourcing from: TempHfs["SequenceFile[['rhs_join', 'n_docs']]"][D/89167/]
12/11/23 01:18:46 INFO hadoop.FlowMapper: sinking to: Hfs["TextDelimited[['df_count', 'df_token', 'lhs_join', 'rhs_join', 'n_docs']]"]["output/check"]"]
12/11/23 01:18:46 INFO collect.SpillableTupleList: attempting to load codec: org.apache.hadoop.io.compress.GzipCodec
12/11/23 01:18:46 INFO collect.SpillableTupleList: found codec: org.apache.hadoop.io.compress.GzipCodec
12/11/23 01:18:46 INFO mapred.FileInputFormat: Total input paths to process : 1
12/11/23 01:18:46 INFO collect.SpillableTupleList: attempting to load codec: org.apache.hadoop.io.compress.GzipCodec
12/11/23 01:18:46 INFO collect.SpillableTupleList: found codec: org.apache.hadoop.io.compress.GzipCodec
12/11/23 01:18:46 INFO mapred.Task: Task:attempt_local_0009_m_000000_0 is done. And is in the process of commiting
12/11/23 01:18:46 INFO mapred.LocalJobRunner:
12/11/23 01:18:46 INFO mapred.Task: Task attempt_local_0009_m_000000_0 is allowed to commit now
12/11/23 01:18:46 INFO mapred.FileOutputCommitter: Saved output of task 'attempt_local_0009_m_000000_0' to file:/Users/ceteri/src/concur/Impatient/part6/output/check
12/11/23 01:18:49 INFO mapred.LocalJobRunner: file:/tmp/hadoop-ceteri/DF_26111_91B0B8E1930CF07B8291C81D474E249B/part-00000:0+846
12/11/23 01:18:49 INFO mapred.Task: Task 'attempt_local_0009_m_000000_0' done.
12/11/23 01:18:51 INFO flow.FlowStep: [tfidf] starting step: (5/10) output/tfidf
12/11/23 01:18:51 INFO mapred.FileInputFormat: Total input paths to process : 1
12/11/23 01:18:51 INFO mapred.FileInputFormat: Total input paths to process : 1
12/11/23 01:18:51 INFO flow.FlowStep: [tfidf] submitted hadoop job: job_local_0010
12/11/23 01:18:51 INFO mapred.Task: Using ResourceCalculatorPlugin : null
12/11/23 01:18:51 INFO io.MultiInputSplit: current split input path: file:/tmp/hadoop-ceteri/TF_93477_60316665166041DC2ADDD203220CED34/part-00000
12/11/23 01:18:51 INFO mapred.MapTask: numReduceTasks: 1
12/11/23 01:18:51 INFO mapred.MapTask: io.sort.mb = 100
12/11/23 01:18:51 INFO mapred.MapTask: data buffer = 79691776/99614720
12/11/23 01:18:51 INFO mapred.MapTask: record buffer = 262144/327680
12/11/23 01:18:51 INFO hadoop.FlowMapper: sourcing from: TempHfs["SequenceFile[['doc_id', 'tf_count', 'tf_token']]"][TF/93477/]
12/11/23 01:18:51 INFO hadoop.FlowMapper: sinking to: CoGroup(TF*checkpoint)[by:TF:[{1}:'tf_token']checkpoint:[{1}:'df_token']]
12/11/23 01:18:51 INFO mapred.MapTask: Starting flush of map output
12/11/23 01:18:51 INFO mapred.MapTask: Finished spill 0
12/11/23 01:18:51 INFO mapred.Task: Task:attempt_local_0010_m_000000_0 is done. And is in the process of commiting
12/11/23 01:18:54 INFO mapred.LocalJobRunner: file:/tmp/hadoop-ceteri/TF_93477_60316665166041DC2ADDD203220CED34/part-00000:0+1573
12/11/23 01:18:54 INFO mapred.Task: Task 'attempt_local_0010_m_000000_0' done.
12/11/23 01:18:54 INFO mapred.Task: Using ResourceCalculatorPlugin : null
12/11/23 01:18:54 INFO io.MultiInputSplit: current split input path: file:/Users/ceteri/src/concur/Impatient/part6/output/check/part-00000
12/11/23 01:18:54 INFO mapred.MapTask: numReduceTasks: 1
12/11/23 01:18:54 INFO mapred.MapTask: io.sort.mb = 100
12/11/23 01:18:54 INFO mapred.MapTask: data buffer = 79691776/99614720
12/11/23 01:18:54 INFO mapred.MapTask: record buffer = 262144/327680
12/11/23 01:18:54 INFO hadoop.FlowMapper: sourcing from: Hfs["TextDelimited[['df_count', 'df_token', 'lhs_join', 'rhs_join', 'n_docs']]"]["output/check"]"]
12/11/23 01:18:54 INFO hadoop.FlowMapper: sinking to: CoGroup(TF*checkpoint)[by:TF:[{1}:'tf_token']checkpoint:[{1}:'df_token']]
12/11/23 01:18:54 INFO mapred.MapTask: Starting flush of map output
12/11/23 01:18:54 INFO mapred.MapTask: Finished spill 0
12/11/23 01:18:54 INFO mapred.Task: Task:attempt_local_0010_m_000001_0 is done. And is in the process of commiting
12/11/23 01:18:57 INFO mapred.LocalJobRunner: file:/Users/ceteri/src/concur/Impatient/part6/output/check/part-00000:0+509
12/11/23 01:18:57 INFO mapred.Task: Task 'attempt_local_0010_m_000001_0' done.
12/11/23 01:18:57 INFO mapred.Task: Using ResourceCalculatorPlugin : null
12/11/23 01:18:57 INFO mapred.LocalJobRunner:
12/11/23 01:18:57 INFO mapred.Merger: Merging 2 sorted segments
12/11/23 01:18:57 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 2672 bytes
12/11/23 01:18:57 INFO mapred.LocalJobRunner:
12/11/23 01:18:57 INFO hadoop.FlowReducer: sourcing from: CoGroup(TF*checkpoint)[by:TF:[{1}:'tf_token']checkpoint:[{1}:'df_token']]
12/11/23 01:18:57 INFO hadoop.FlowReducer: sinking to: Hfs["TextDelimited[[UNKNOWN]->['doc_id', 'tfidf', 'token']]"]["output/tfidf"]"]
12/11/23 01:18:57 INFO collect.SpillableTupleList: attempting to load codec: org.apache.hadoop.io.compress.GzipCodec
12/11/23 01:18:57 INFO collect.SpillableTupleList: found codec: org.apache.hadoop.io.compress.GzipCodec
12/11/23 01:18:57 INFO mapred.Task: Task:attempt_local_0010_r_000000_0 is done. And is in the process of commiting
12/11/23 01:18:57 INFO mapred.LocalJobRunner:
12/11/23 01:18:57 INFO mapred.Task: Task attempt_local_0010_r_000000_0 is allowed to commit now
12/11/23 01:18:57 INFO mapred.FileOutputCommitter: Saved output of task 'attempt_local_0010_r_000000_0' to file:/Users/ceteri/src/concur/Impatient/part6/output/tfidf
12/11/23 01:19:00 INFO mapred.LocalJobRunner: reduce > reduce
12/11/23 01:19:00 INFO mapred.Task: Task 'attempt_local_0010_r_000000_0' done.
12/11/23 01:19:01 INFO util.Hadoop18TapUtil: deleting temp path output/trap/_temporary
12/11/23 01:19:01 INFO util.Hadoop18TapUtil: deleting temp path output/wc/_temporary
12/11/23 01:19:01 INFO util.Hadoop18TapUtil: deleting temp path output/check/_temporary
12/11/23 01:19:01 INFO util.Hadoop18TapUtil: deleting temp path output/tfidf/_temporary
bash-3.2$
bash-3.2$ more output/tfidf/part-00000
doc_id tfidf token
doc02 0.9162907318741551 air
doc01 0.44628710262841953 area
doc03 0.22314355131420976 area
doc02 0.22314355131420976 area
doc05 0.9162907318741551 australia
doc05 0.9162907318741551 broken
doc04 0.9162907318741551 california's
doc04 0.9162907318741551 cause
doc02 0.9162907318741551 cloudcover
doc04 0.9162907318741551 death
doc04 0.9162907318741551 deserts
doc03 0.9162907318741551 downwind
doc02 0.22314355131420976 dry
doc01 0.22314355131420976 dry
doc03 0.22314355131420976 dry
doc05 0.9162907318741551 dvd
doc04 0.9162907318741551 effect
doc04 0.9162907318741551 known
doc05 0.5108256237659907 land
doc03 0.5108256237659907 land
doc01 0.5108256237659907 lee
doc02 0.5108256237659907 lee
doc04 0.5108256237659907 leeward
doc03 0.5108256237659907 leeward
doc02 0.9162907318741551 less
doc03 0.9162907318741551 lies
doc03 0.22314355131420976 mountain
doc02 0.22314355131420976 mountain
doc04 0.22314355131420976 mountain
doc01 0.9162907318741551 mountainous
doc04 0.9162907318741551 primary
doc02 0.9162907318741551 produces
doc01 0.0 rain
doc02 0.0 rain
doc04 0.0 rain
doc03 0.0 rain
doc04 0.9162907318741551 ranges
doc05 0.9162907318741551 secrets
doc04 0.0 shadow
doc02 0.0 shadow
doc01 0.0 shadow
doc03 0.0 shadow
doc02 0.9162907318741551 sinking
doc04 0.9162907318741551 such
doc04 0.9162907318741551 valley
doc05 0.9162907318741551 women
bash-3.2$ more output/check/part-00000
df_count df_token lhs_join rhs_join n_docs
1 air 1 1 5
3 area 1 1 5
1 australia 1 1 5
1 broken 1 1 5
1 california's 1 1 5
1 cause 1 1 5
1 cloudcover 1 1 5
1 death 1 1 5
1 deserts 1 1 5
1 downwind 1 1 5
3 dry 1 1 5
1 dvd 1 1 5
1 effect 1 1 5
1 known 1 1 5
2 land 1 1 5
2 lee 1 1 5
2 leeward 1 1 5
1 less 1 1 5
1 lies 1 1 5
3 mountain 1 1 5
1 mountainous 1 1 5
1 primary 1 1 5
1 produces 1 1 5
4 rain 1 1 5
1 ranges 1 1 5
1 secrets 1 1 5
4 shadow 1 1 5
1 sinking 1 1 5
1 such 1 1 5
1 valley 1 1 5
1 women 1 1 5
bash-3.2$ more output/trap/part-m-00001-00000
zoink null
bash-3.2$
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment