Last active
August 29, 2015 14:07
-
-
Save matt-martin/4cd9cab6ec761eb7f100 to your computer and use it in GitHub Desktop.
Slow compilation in scalding job
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
cascading version: 3.0.0, build: wip-57 | |
application id: 843F17CE3594477D8C5E536EA2FC4CDE | |
application name: | |
application version: | |
platform: local:3.0.0-wip-57:Concurrent, Inc. | |
frameworks: | |
duration 2.114 | |
PreBalanceAssembly 0.553 | |
======================= | |
LoneGroupAssert 0.036 | |
MissingGroupAssert 0.033 | |
BufferAfterEveryAssert 0.353 | |
EveryAfterBufferAssert 0.074 | |
SplitBeforeEveryAssert 0.048 | |
BalanceAssembly 0.003 | |
======================= | |
PostBalanceAssembly 0.002 | |
======================= | |
PreResolveAssembly 0.319 | |
======================= | |
RemoveNoOpPipeTransformer 0.280 | |
ApplyAssertionLevelTransformer 0.019 | |
ApplyDebugLevelTransformer 0.009 | |
ResolveAssembly 0.024 | |
======================= | |
PostResolveAssembly 0.111 | |
======================= | |
BlockingHashJoinAnnotator 0.050 | |
HashJoinBlockingHashJoinAnnotator 0.039 | |
PartitionSteps 0.595 | |
======================= | |
WholeGraphStepPartitioner 0.594 | |
PostSteps 0.008 | |
======================= | |
PartitionNodes 0.467 | |
======================= | |
WholeGraphNodePartitioner 0.467 | |
PostNodes 0.001 | |
======================= | |
PartitionPipelines 0.000 | |
======================= | |
PostPipelines 0.001 | |
======================= | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
cascading version: 3.0.0, build: wip-57 | |
application id: AF515DFF676743C4A82D4CCC521F5B96 | |
application name: | |
application version: | |
platform: local:3.0.0-wip-57:Concurrent, Inc. | |
frameworks: | |
duration 8.090 | |
PreBalanceAssembly 3.230 | |
======================= | |
LoneGroupAssert 0.075 | |
MissingGroupAssert 0.089 | |
BufferAfterEveryAssert 1.825 | |
EveryAfterBufferAssert 0.728 | |
SplitBeforeEveryAssert 0.488 | |
BalanceAssembly 0.006 | |
======================= | |
PostBalanceAssembly 0.008 | |
======================= | |
PreResolveAssembly 1.038 | |
======================= | |
RemoveNoOpPipeTransformer 0.968 | |
ApplyAssertionLevelTransformer 0.034 | |
ApplyDebugLevelTransformer 0.030 | |
ResolveAssembly 0.076 | |
======================= | |
PostResolveAssembly 0.490 | |
======================= | |
BlockingHashJoinAnnotator 0.300 | |
HashJoinBlockingHashJoinAnnotator 0.178 | |
PartitionSteps 1.383 | |
======================= | |
WholeGraphStepPartitioner 1.366 | |
PostSteps 0.016 | |
======================= | |
PartitionNodes 1.655 | |
======================= | |
WholeGraphNodePartitioner 1.648 | |
PostNodes 0.003 | |
======================= | |
PartitionPipelines 0.074 | |
======================= | |
PostPipelines 0.022 | |
======================= | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cascading.flow.local.LocalFlowConnector; | |
import cascading.pipe.Merge; | |
import cascading.pipe.Pipe; | |
import cascading.property.AppProps; | |
import cascading.scheme.local.TextDelimited; | |
import cascading.tap.Tap; | |
import cascading.tap.local.FileTap; | |
import java.io.File; | |
import java.io.IOException; | |
import java.io.PrintWriter; | |
import java.util.HashMap; | |
import java.util.Map; | |
import java.util.Properties; | |
public class CascadingJobWithSlowCompilation { | |
public static void main( String[] args ) throws IOException { | |
int numInputPaths = 100; | |
Map<String, Tap> sources = new HashMap<String, Tap>(numInputPaths); | |
Pipe pipe = null; | |
for (int i = 0; i < numInputPaths; i++) { | |
// create simple file | |
File tempFile = File.createTempFile("test", ".txt"); | |
tempFile.deleteOnExit(); | |
PrintWriter writer = new PrintWriter(tempFile); | |
try { | |
writer.println("test"); | |
writer.println("testValue"); | |
} finally { | |
writer.close(); | |
} | |
// add tap | |
sources.put(tempFile.getAbsolutePath(), new FileTap(new TextDelimited(true, "\t"), tempFile.getAbsolutePath())); | |
if (pipe == null) { | |
pipe = new Pipe(tempFile.getAbsolutePath()); | |
} else { | |
pipe = new Merge(pipe, new Pipe(tempFile.getAbsolutePath())); | |
} | |
} | |
File outFile = File.createTempFile("test", ".txt"); | |
Properties properties = new Properties(); | |
AppProps.setApplicationJarClass(properties, CascadingJobWithSlowCompilation.class); | |
LocalFlowConnector flowConnector = new LocalFlowConnector( properties ); | |
// create the sink tap | |
Tap outTap = new FileTap( new TextDelimited( true, "\t" ), outFile.getAbsolutePath() ); | |
// run the flow | |
flowConnector.connect( sources, outTap, pipe ).complete(); | |
System.out.println("Output in " + outFile); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.File; | |
import java.io.IOException; | |
import java.io.PrintWriter; | |
import java.util.ArrayList; | |
import java.util.HashMap; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Properties; | |
import cascading.flow.local.LocalFlowConnector; | |
import cascading.operation.Identity; | |
import cascading.pipe.Each; | |
import cascading.pipe.Merge; | |
import cascading.pipe.Pipe; | |
import cascading.property.AppProps; | |
import cascading.scheme.local.TextDelimited; | |
import cascading.tap.Tap; | |
import cascading.tap.local.FileTap; | |
import cascading.tuple.Fields; | |
public class CascadingSimple { | |
public static void main( String[] args ) throws IOException { | |
int numInputPaths = 365; | |
Map<String, Tap> sources = new HashMap<String, Tap>(numInputPaths); | |
List<Pipe> pipes = new ArrayList<Pipe>(numInputPaths); | |
for (int i = 0; i < numInputPaths; i++) { | |
// create simple file | |
File tempFile = File.createTempFile("test", ".txt"); | |
tempFile.deleteOnExit(); | |
PrintWriter writer = new PrintWriter(tempFile); | |
try { | |
writer.println("test"); | |
writer.println("testValue"); | |
} finally { | |
writer.close(); | |
} | |
// add tap | |
sources.put(tempFile.getAbsolutePath(), new FileTap(new TextDelimited(true, "\t"), tempFile.getAbsolutePath())); | |
pipes.add(new Each( new Pipe( tempFile.getAbsolutePath() ), new Fields( "test" ), new Identity())); | |
} | |
File outFile = File.createTempFile("test", ".txt"); | |
Properties properties = new Properties(); | |
AppProps.setApplicationJarClass( properties, CascadingMain.class ); | |
LocalFlowConnector flowConnector = new LocalFlowConnector( properties ); | |
// create the sink tap | |
Tap outTap = new FileTap( new TextDelimited( true, "\t" ), outFile.getAbsolutePath() ); | |
// specify a pipe to connect the taps | |
Pipe merge = new Merge("merge", pipes.toArray(new Pipe[numInputPaths])); | |
// run the flow | |
flowConnector.connect( sources, outTap, merge ).complete(); | |
System.out.println("Output in " + outFile); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
The CascadingSimple code completes in a matter of seconds when I run it on my laptop. However, the very similar job represented by ScaldingSimple.scala has a ~45 second pause, which I think corresponds to the job compilation stage (see line 12, with a timestamp of 2014-10-15 18:13:19,836, and line 13, with a timestamp of 2014-10-15 18:14:05,416). If I rewrite the initial Scalding code to combine all the pipes with a single Merge, it runs more or less as quickly as the original Cascading code (see ScaldingSimpleFast.scala):
2014-10-15 18:13:19,836 DEBUG [main] cascading.flow.planner.ElementGraph (ElementGraph.java:636) - setting outgoing declared: 'offset', 'line' | |
2014-10-15 18:13:19,836 DEBUG [main] cascading.flow.planner.ElementGraph (ElementGraph.java:638) - setting outgoing group: {_pipe_235='offset', _pipe_234='offset'} | |
2014-10-15 18:13:19,836 DEBUG [main] cascading.flow.planner.ElementGraph (ElementGraph.java:632) - for modifier: Merge(_pipe_236+_pipe_237)[by: _pipe_236:[{1}:0] _pipe_237:[{1}:0]] | |
2014-10-15 18:13:19,836 DEBUG [main] cascading.flow.planner.ElementGraph (ElementGraph.java:636) - setting outgoing declared: 'offset', 'line' | |
2014-10-15 18:13:19,836 DEBUG [main] cascading.flow.planner.ElementGraph (ElementGraph.java:638) - setting outgoing group: {_pipe_236='offset', _pipe_237='offset'} | |
2014-10-15 18:13:19,836 DEBUG [main] cascading.flow.planner.ElementGraph (ElementGraph.java:632) - for modifier: FileTap["TextLine[['offset', 'line']->[ALL]]"]["/var/folders/xv/vnz_xgwn7kd3wxnpm6q9b8hh0003bb/T/test7364894661987697688.txt"] | |
2014-10-15 18:14:05,416 INFO [flow ScaldingSimple] cascading.util.Version ( Version.java:78) - Concurrent, Inc - Cascading 2.5.5 | |
2014-10-15 18:14:05,418 INFO [flow ScaldingSimple] cascading.flow.Flow (BaseFlow.java:1354) - [ScaldingSimple] starting | |
2014-10-15 18:14:05,418 INFO [flow ScaldingSimple] cascading.flow.Flow (BaseFlow.java:1354) - [ScaldingSimple] source: FileTap["TextLine[['offset', 'line']->[ALL]]"]["/var/folders/xv/vnz_xgwn7kd3wxnpm6q9b8hh0003bb/T/test3526484590140769188.txt"] | |
2014-10-15 18:14:05,418 INFO [flow ScaldingSimple] cascading.flow.Flow (BaseFlow.java:1354) - [ScaldingSimple] source: FileTap["TextLine[['offset', 'line']->[ALL]]"]["/var/folders/xv/vnz_xgwn7kd3wxnpm6q9b8hh0003bb/T/test4606544618344204517.txt"] | |
2014-10-15 18:14:05,418 INFO [flow ScaldingSimple] cascading.flow.Flow (BaseFlow.java:1354) - [ScaldingSimple] source: FileTap["TextLine[['offset', 'line']->[ALL]]"]["/var/folders/xv/vnz_xgwn7kd3wxnpm6q9b8hh0003bb/T/test98774922385137058.txt"] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.{PrintWriter, File} | |
import com.twitter.scalding._ | |
/**
 * Scalding job that reads 120 small temp files and combines them pairwise with `++`.
 *
 * Each file becomes its own `TextLine` source pipe; `reduce(_ ++ _)` then folds the 120 pipes
 * together two at a time. That pairwise shape is deliberate (it is the assembly this job exists
 * to exercise), so it is not flattened here.
 *
 * @param args job arguments, passed straight through to `Job`
 */
class ScaldingSimple(args : Args) extends Job(args) {
  val input = (1 to 120).map(
    _ => {
      val tempFile = File.createTempFile("test", ".txt")
      // Clean up the generated input when the JVM exits instead of leaving 120 files behind.
      tempFile.deleteOnExit()
      val writer : PrintWriter = new PrintWriter(tempFile)
      try {
        writer.println("test")
        writer.println("testValue")
      } finally {
        writer.close()
      }
      sourceToRichPipe(TextLine(tempFile.getAbsolutePath))
    }
  ).reduce(_ ++ _)

  // The output file is intentionally left in place after the run.
  val output = TextLine(File.createTempFile("test", ".txt").getAbsolutePath)
  input.write(output)
}
/** Entry point: builds the `ScaldingSimple` flow in `Local` mode with empty args and runs it to completion. */
object ScaldingSimple extends App {
  new ScaldingSimple(Mode.putMode(Local(true), new Args(Map()))).buildFlow.complete()
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.{PrintWriter, File} | |
import cascading.pipe.Merge | |
import com.twitter.scalding._ | |
/**
 * Scalding job that reads 120 small temp files and combines them with one Cascading `Merge`.
 *
 * Each file is read as its own `TextLine` pipe, and all 120 pipes are handed to a single
 * `Merge` named "merge" as varargs, rather than being combined two at a time.
 *
 * @param args job arguments, passed straight through to `Job`
 */
class ScaldingSimpleFast(args : Args) extends Job(args) {
  val input = new Merge("merge", (1 to 120).map(
    _ => {
      val tempFile = File.createTempFile("test", ".txt")
      // Clean up the generated input when the JVM exits instead of leaving 120 files behind.
      tempFile.deleteOnExit()
      val writer : PrintWriter = new PrintWriter(tempFile)
      try {
        writer.println("test")
        writer.println("testValue")
      } finally {
        writer.close()
      }
      TextLine(tempFile.getAbsolutePath).read
    }
  ).toSeq:_*)

  // The output file is intentionally left in place after the run.
  val output = TextLine(File.createTempFile("test", ".txt").getAbsolutePath)
  input.write(output)
}
/** Entry point: builds the `ScaldingSimpleFast` flow in `Local` mode with empty args and runs it to completion. */
object ScaldingSimpleFast extends App {
  new ScaldingSimpleFast(Mode.putMode(Local(true), new Args(Map()))).buildFlow.complete()
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment