Skip to content

Instantly share code, notes, and snippets.

@zbjornson
Created October 12, 2015 19:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zbjornson/ac6a378129f1675ca5ff to your computer and use it in GitHub Desktop.
Save zbjornson/ac6a378129f1675ca5ff to your computer and use it in GitHub Desktop.
Running arbitrary jobs in parallel on Google Dataflow
package demo.pipeline;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.coders.SerializableCoder;
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
import com.google.cloud.dataflow.sdk.coders.VoidCoder;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.transforms.MapElements;
import com.google.cloud.dataflow.sdk.transforms.SimpleFunction;
import com.google.cloud.dataflow.sdk.util.GcsUtil;
import com.google.cloud.dataflow.sdk.util.GcsUtil.GcsUtilFactory;
import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
@SuppressWarnings("serial")
public class StarterPipeline2 {

  // Logger must reference the enclosing class; the original was bound to
  // StarterPipeline (copy-paste from a sibling file).
  private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline2.class);

  // Scratch directory on the Dataflow worker VM used for per-task input/output.
  private static final File TEMP_PATH = new File("/dataflow/logs/taskrunner/harness/");

  // Maximum number of worker CPUs we are allowed to request.
  private static final int QUOTA = 90; // CPUs

  /**
   * Entry point: lists every object under gs://my-bucket/, then launches a
   * Dataflow job with one single-CPU worker per file (capped at {@link #QUOTA})
   * where each element is processed independently by {@link Processor}.
   *
   * @param args standard Dataflow pipeline options (project, staging, runner, ...)
   */
  public static void main(String[] args) {
    PipelineOptionsFactory.Builder opts = PipelineOptionsFactory.fromArgs(args)
        .withValidation();
    GcsUtil gcsUtil = new GcsUtilFactory().create(opts.create());
    try {
      // List files in bucket.
      List<GcsPath> bucketListing = gcsUtil.expand(GcsPath.fromUri("gs://my-bucket/*"));

      // GcsPath cannot be serialized directly; carry plain Strings instead.
      List<String> files = new LinkedList<>();
      for (GcsPath path : bucketListing) {
        files.add(path.toString());
      }

      // Use one worker per file, up to the quota.
      DataflowPipelineOptions dfopts = opts.as(DataflowPipelineOptions.class);
      dfopts.setNumWorkers(Math.min(files.size(), QUOTA));
      dfopts.setWorkerMachineType("n1-standard-1");

      Pipeline p = Pipeline.create(dfopts);
      p.apply(Create.of(files).withCoder(StringUtf8Coder.of()))
          .apply(MapElements.via(new Processor()))
          .setCoder(VoidCoder.of());
      p.run();
    } catch (IOException e) {
      // Bucket listing failed; log with the full stack trace via SLF4J
      // instead of printStackTrace().
      LOG.error("Failed to list input bucket", e);
    }
  }

  /**
   * Returns a fresh, uniquely-named (not yet created) directory under
   * {@link #TEMP_PATH} for one task's scratch space.
   */
  static File getTempFile() {
    String uuid = UUID.randomUUID().toString();
    return new File(TEMP_PATH, uuid);
  }

  /**
   * Runs an external command, inheriting this process's stdio, and waits for
   * it to finish. NOTE(review): the command is tokenized on single spaces, so
   * arguments containing spaces are not supported.
   *
   * @param cmd whitespace-separated command line (e.g. "gsutil -m cp a b")
   * @throws InterruptedException if interrupted while waiting for the process
   * @throws IOException if the process cannot be started
   */
  static void exec(String cmd) throws InterruptedException, IOException {
    LOG.info("Running: " + cmd);
    Process proc = new ProcessBuilder().inheritIO().command(cmd.split(" ")).start();
    try {
      int exitVal = proc.waitFor();
      if (exitVal == 0) {
        LOG.info("Command (" + cmd + ") exited with code " + exitVal);
      } else {
        // A nonzero exit code was previously logged at INFO and treated as
        // success; surface it as an error so failures are visible.
        LOG.error("Command (" + cmd + ") failed with exit code " + exitVal);
      }
    } catch (InterruptedException e) {
      LOG.error("Command (" + cmd + ") was interrupted", e);
      throw e;
    }
  }

  /**
   * Per-element worker: copies one GCS object to local scratch space, leaves a
   * hook for arbitrary local processing, and uploads the results back to GCS.
   */
  static class Processor extends SimpleFunction<String, Void> {
    @Override
    public Void apply(String filePath) {
      try {
        // getTempFile/exec are declared on this class; the original
        // mistakenly qualified them with the unrelated StarterPipeline class.
        File tempDir = getTempFile();
        tempDir.mkdirs();

        // Copy file to <temp>/input. Could also use GcsUtil.open to get a
        // SeekableByteStream.
        File inputDir = new File(tempDir, "input");
        inputDir.mkdirs();
        String cmd = String.format("gsutil -m cp %s %s/", filePath, inputDir.getPath());
        exec(cmd);

        File outputDir = new File(tempDir, "output");
        outputDir.mkdirs();
        //
        // Do work here on the file in <temp>/input, writing to <temp>/output
        //
        // The original passed a stray second argument (filePath) to a format
        // string with a single %s; it was silently ignored and is removed.
        cmd = String.format("gsutil -m cp %s/* gs://my-bucket/output/", outputDir.getPath());
        exec(cmd);
      } catch (IOException | InterruptedException e) {
        LOG.error("Processing failed for " + filePath, e);
      }
      return null;
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment