Skip to content

Instantly share code, notes, and snippets.

@matt-martin
Last active August 29, 2015 14:07
Show Gist options
  • Save matt-martin/4cd9cab6ec761eb7f100 to your computer and use it in GitHub Desktop.
Save matt-martin/4cd9cab6ec761eb7f100 to your computer and use it in GitHub Desktop.
Slow compilation in scalding job
cascading version: 3.0.0, build: wip-57
application id: 843F17CE3594477D8C5E536EA2FC4CDE
application name:
application version:
platform: local:3.0.0-wip-57:Concurrent, Inc.
frameworks:
duration 2.114
PreBalanceAssembly 0.553
=======================
LoneGroupAssert 0.036
MissingGroupAssert 0.033
BufferAfterEveryAssert 0.353
EveryAfterBufferAssert 0.074
SplitBeforeEveryAssert 0.048
BalanceAssembly 0.003
=======================
PostBalanceAssembly 0.002
=======================
PreResolveAssembly 0.319
=======================
RemoveNoOpPipeTransformer 0.280
ApplyAssertionLevelTransformer 0.019
ApplyDebugLevelTransformer 0.009
ResolveAssembly 0.024
=======================
PostResolveAssembly 0.111
=======================
BlockingHashJoinAnnotator 0.050
HashJoinBlockingHashJoinAnnotator 0.039
PartitionSteps 0.595
=======================
WholeGraphStepPartitioner 0.594
PostSteps 0.008
=======================
PartitionNodes 0.467
=======================
WholeGraphNodePartitioner 0.467
PostNodes 0.001
=======================
PartitionPipelines 0.000
=======================
PostPipelines 0.001
=======================
cascading version: 3.0.0, build: wip-57
application id: AF515DFF676743C4A82D4CCC521F5B96
application name:
application version:
platform: local:3.0.0-wip-57:Concurrent, Inc.
frameworks:
duration 8.090
PreBalanceAssembly 3.230
=======================
LoneGroupAssert 0.075
MissingGroupAssert 0.089
BufferAfterEveryAssert 1.825
EveryAfterBufferAssert 0.728
SplitBeforeEveryAssert 0.488
BalanceAssembly 0.006
=======================
PostBalanceAssembly 0.008
=======================
PreResolveAssembly 1.038
=======================
RemoveNoOpPipeTransformer 0.968
ApplyAssertionLevelTransformer 0.034
ApplyDebugLevelTransformer 0.030
ResolveAssembly 0.076
=======================
PostResolveAssembly 0.490
=======================
BlockingHashJoinAnnotator 0.300
HashJoinBlockingHashJoinAnnotator 0.178
PartitionSteps 1.383
=======================
WholeGraphStepPartitioner 1.366
PostSteps 0.016
=======================
PartitionNodes 1.655
=======================
WholeGraphNodePartitioner 1.648
PostNodes 0.003
=======================
PartitionPipelines 0.074
=======================
PostPipelines 0.022
=======================
import cascading.flow.local.LocalFlowConnector;
import cascading.pipe.Merge;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.local.TextDelimited;
import cascading.tap.Tap;
import cascading.tap.local.FileTap;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
/**
 * Reproduces slow Cascading flow planning.
 *
 * <p>Writes {@code numInputPaths} small tab-delimited input files, wraps each in a
 * {@link FileTap}, and joins their pipes as a chain of two-way {@link Merge}s, where each
 * new pipe is merged with the result of all previous merges. The flow is then run with a
 * {@link LocalFlowConnector}, and the path of the (retained) output file is printed.
 */
public class CascadingJobWithSlowCompilation {
    public static void main( String[] args ) throws IOException {
        final int numInputPaths = 100;
        Map<String, Tap> sourceTaps = new HashMap<String, Tap>(numInputPaths);
        // Accumulated pipe; stays null until the first input has been added.
        Pipe chained = null;
        for (int idx = 0; idx < numInputPaths; idx++) {
            String inputPath = writeSampleInput();
            sourceTaps.put(inputPath, new FileTap(new TextDelimited(true, "\t"), inputPath));
            Pipe head = new Pipe(inputPath);
            // Every step wraps the previous result in another two-way Merge.
            chained = (chained == null) ? head : new Merge(chained, head);
        }

        File outFile = File.createTempFile("test", ".txt");
        Properties props = new Properties();
        AppProps.setApplicationJarClass(props, CascadingJobWithSlowCompilation.class);
        LocalFlowConnector connector = new LocalFlowConnector( props );
        Tap sinkTap = new FileTap( new TextDelimited( true, "\t" ), outFile.getAbsolutePath() );
        connector.connect( sourceTaps, sinkTap, chained ).complete();
        System.out.println("Output in " + outFile);
    }

    /**
     * Creates a temp file (deleted on JVM exit) containing the two lines "test" and
     * "testValue".
     *
     * @return the absolute path of the new file
     * @throws IOException if the file cannot be created or written
     */
    private static String writeSampleInput() throws IOException {
        File sample = File.createTempFile("test", ".txt");
        sample.deleteOnExit();
        PrintWriter out = new PrintWriter(sample);
        try {
            out.println("test");
            out.println("testValue");
        } finally {
            out.close();
        }
        return sample.getAbsolutePath();
    }
}
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import cascading.flow.local.LocalFlowConnector;
import cascading.operation.Identity;
import cascading.pipe.Each;
import cascading.pipe.Merge;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.local.TextDelimited;
import cascading.tap.Tap;
import cascading.tap.local.FileTap;
import cascading.tuple.Fields;
/**
 * Baseline for comparison with the slow-planning repro.
 *
 * <p>Writes {@code numInputPaths} small tab-delimited input files. Each one is passed
 * through an {@link Identity} on field "test", and all of them are combined with a single
 * n-way {@link Merge}. The flow is run with a {@link LocalFlowConnector}. The output file
 * is kept, and its path is printed.
 */
public class CascadingSimple {
    public static void main( String[] args ) throws IOException {
        int numInputPaths = 365;
        Map<String, Tap> sources = new HashMap<String, Tap>(numInputPaths);
        List<Pipe> pipes = new ArrayList<Pipe>(numInputPaths);
        for (int i = 0; i < numInputPaths; i++) {
            // create simple file: header line "test" followed by one value row
            File tempFile = File.createTempFile("test", ".txt");
            tempFile.deleteOnExit();
            PrintWriter writer = new PrintWriter(tempFile);
            try {
                writer.println("test");
                writer.println("testValue");
            } finally {
                writer.close();
            }
            // add tap; the pipe name must match the source map key so connect() can bind them
            sources.put(tempFile.getAbsolutePath(), new FileTap(new TextDelimited(true, "\t"), tempFile.getAbsolutePath()));
            pipes.add(new Each( new Pipe( tempFile.getAbsolutePath() ), new Fields( "test" ), new Identity()));
        }
        File outFile = File.createTempFile("test", ".txt");
        Properties properties = new Properties();
        // Register the class that owns main(); CascadingMain is not defined in this code.
        AppProps.setApplicationJarClass( properties, CascadingSimple.class );
        LocalFlowConnector flowConnector = new LocalFlowConnector( properties );
        // create the sink tap
        Tap outTap = new FileTap( new TextDelimited( true, "\t" ), outFile.getAbsolutePath() );
        // a single n-way Merge over all input pipes (contrast with chained two-way merges)
        Pipe merge = new Merge("merge", pipes.toArray(new Pipe[numInputPaths]));
        // run the flow
        flowConnector.connect( sources, outTap, merge ).complete();
        System.out.println("Output in " + outFile);
    }
}
The CascadingSimple code completes in a matter of seconds when I run it on my laptop. However, the very similar job in ScaldingSimple.scala pauses for about 45 seconds, which I think corresponds to the job compilation (flow planning) stage. See line 12 of the log below (timestamp 2014-10-15 18:13:19,836) and line 13 (timestamp 2014-10-15 18:14:05,416). ScaldingSimple combines its inputs pairwise with `++`, producing a chain of two-way merges. If I rewrite it to combine all inputs with a single n-way Merge instead, it runs about as quickly as the original Cascading code (see ScaldingSimpleFast.scala):
2014-10-15 18:13:19,836 DEBUG [main] cascading.flow.planner.ElementGraph (ElementGraph.java:636) - setting outgoing declared: 'offset', 'line'
2014-10-15 18:13:19,836 DEBUG [main] cascading.flow.planner.ElementGraph (ElementGraph.java:638) - setting outgoing group: {_pipe_235='offset', _pipe_234='offset'}
2014-10-15 18:13:19,836 DEBUG [main] cascading.flow.planner.ElementGraph (ElementGraph.java:632) - for modifier: Merge(_pipe_236+_pipe_237)[by: _pipe_236:[{1}:0] _pipe_237:[{1}:0]]
2014-10-15 18:13:19,836 DEBUG [main] cascading.flow.planner.ElementGraph (ElementGraph.java:636) - setting outgoing declared: 'offset', 'line'
2014-10-15 18:13:19,836 DEBUG [main] cascading.flow.planner.ElementGraph (ElementGraph.java:638) - setting outgoing group: {_pipe_236='offset', _pipe_237='offset'}
2014-10-15 18:13:19,836 DEBUG [main] cascading.flow.planner.ElementGraph (ElementGraph.java:632) - for modifier: FileTap["TextLine[['offset', 'line']->[ALL]]"]["/var/folders/xv/vnz_xgwn7kd3wxnpm6q9b8hh0003bb/T/test7364894661987697688.txt"]
2014-10-15 18:14:05,416 INFO [flow ScaldingSimple] cascading.util.Version ( Version.java:78) - Concurrent, Inc - Cascading 2.5.5
2014-10-15 18:14:05,418 INFO [flow ScaldingSimple] cascading.flow.Flow (BaseFlow.java:1354) - [ScaldingSimple] starting
2014-10-15 18:14:05,418 INFO [flow ScaldingSimple] cascading.flow.Flow (BaseFlow.java:1354) - [ScaldingSimple] source: FileTap["TextLine[['offset', 'line']->[ALL]]"]["/var/folders/xv/vnz_xgwn7kd3wxnpm6q9b8hh0003bb/T/test3526484590140769188.txt"]
2014-10-15 18:14:05,418 INFO [flow ScaldingSimple] cascading.flow.Flow (BaseFlow.java:1354) - [ScaldingSimple] source: FileTap["TextLine[['offset', 'line']->[ALL]]"]["/var/folders/xv/vnz_xgwn7kd3wxnpm6q9b8hh0003bb/T/test4606544618344204517.txt"]
2014-10-15 18:14:05,418 INFO [flow ScaldingSimple] cascading.flow.Flow (BaseFlow.java:1354) - [ScaldingSimple] source: FileTap["TextLine[['offset', 'line']->[ALL]]"]["/var/folders/xv/vnz_xgwn7kd3wxnpm6q9b8hh0003bb/T/test98774922385137058.txt"]
import java.io.{PrintWriter, File}
import com.twitter.scalding._
/**
 * Scalding job that exhibits the slow flow planning.
 *
 * Writes 120 small two-line input files and combines their TextLine sources pairwise
 * with `reduce(_ ++ _)`. The result is a chain of two-way merges rather than one n-way
 * merge. The merged pipe is written to a fresh temp file.
 */
class ScaldingSimple(args : Args) extends Job(args) {
  val input = (1 to 120).map(
    _ => {
      val tempFile = File.createTempFile("test", ".txt")
      // Remove the generated input when the JVM exits, as the Java versions do.
      tempFile.deleteOnExit()
      val writer : PrintWriter = new PrintWriter(tempFile)
      try {
        writer.println("test")
        writer.println("testValue")
      } finally {
        writer.close()
      }
      sourceToRichPipe(TextLine(tempFile.getAbsolutePath))
    }
  ).reduce(_ ++ _)
  val output = TextLine(File.createTempFile("test", ".txt").getAbsolutePath)
  input.write(output)
}
object ScaldingSimple extends App {
  // Build the job in local mode with an empty argument map, then plan and run its flow.
  private val jobArgs = Mode.putMode(Local(true), new Args(Map()))
  private val job = new ScaldingSimple(jobArgs)
  job.buildFlow.complete()
}
import java.io.{PrintWriter, File}
import cascading.pipe.Merge
import com.twitter.scalding._
/**
 * Faster variant of ScaldingSimple.
 *
 * Writes 120 small two-line input files, reads each as a TextLine pipe, and combines
 * them with a single n-way cascading `Merge` named "merge" instead of chained pairwise
 * `++`. The merged pipe is written to a fresh temp file.
 */
class ScaldingSimpleFast(args : Args) extends Job(args) {
  val input = new Merge("merge", (1 to 120).map(
    _ => {
      val tempFile = File.createTempFile("test", ".txt")
      // Remove the generated input when the JVM exits, as the Java versions do.
      tempFile.deleteOnExit()
      val writer : PrintWriter = new PrintWriter(tempFile)
      try {
        writer.println("test")
        writer.println("testValue")
      } finally {
        writer.close()
      }
      TextLine(tempFile.getAbsolutePath).read
    }
  ).toSeq:_*)
  val output = TextLine(File.createTempFile("test", ".txt").getAbsolutePath)
  input.write(output)
}
object ScaldingSimpleFast extends App {
  // Build the job in local mode with an empty argument map, then plan and run its flow.
  private val jobArgs = Mode.putMode(Local(true), new Args(Map()))
  private val job = new ScaldingSimpleFast(jobArgs)
  job.buildFlow.complete()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment