@kevinsu
Created October 12, 2015 23:59
Camus s3 hack
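
The whole hack is one idea applied in four places: stock Camus calls FileSystem.get(conf), which always hands back the filesystem named by fs.defaultFS (typically HDFS), no matter where the path it is about to touch actually lives. When the destination or execution paths sit on S3, each FileSystem has to be resolved from the path's own URI instead. A minimal sketch of the difference, assuming a Hadoop classpath with an s3n filesystem configured (the class and bucket names are made up for illustration):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FsResolutionDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // FileSystem.get(conf) resolves fs.defaultFS, regardless of what
    // paths you later hand to it.
    FileSystem defaultFs = FileSystem.get(conf);
    System.out.println("default fs: " + defaultFs.getUri());

    // FileSystem.get(uri, conf) resolves whichever filesystem owns the
    // path's scheme and authority -- an S3 filesystem for an s3n:// path.
    Path dest = new Path("s3n://example-bucket/camus/dest"); // hypothetical bucket
    FileSystem destFs = FileSystem.get(dest.toUri(), conf);
    System.out.println("dest fs:    " + destFs.getUri());
  }
}
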
camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/CamusJob.java
@@ -243,13 +243,13 @@ public void run(Class<? extends InputFormat> inputFormatClass,
if (getLog4jConfigure(job)) {
DOMConfigurator.configure("log4j.xml");
}
- FileSystem fs = FileSystem.get(job.getConfiguration());
-
log.info("Dir Destination set to: " + EtlMultiOutputFormat.getDestinationPath(job));
Path execBasePath = new Path(props.getProperty(ETL_EXECUTION_BASE_PATH));
Path execHistory = new Path(props.getProperty(ETL_EXECUTION_HISTORY_PATH));
+ FileSystem fs = FileSystem.get(execBasePath.toUri(), job.getConfiguration());
+
if (!fs.exists(execBasePath)) {
log.info("The execution base path does not exist. Creating the directory");
fs.mkdirs(execBasePath);
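
In CamusJob, the filesystem is now derived from etl.execution.base.path, so an execution path on S3 resolves to an S3-backed FileSystem and the exists/mkdirs calls that follow run against the right store instead of the cluster default.
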
camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/mapred/EtlInputFormat.java
@@ -483,8 +483,8 @@ private boolean createMessageDecoder(JobContext context, String topic) {
}
private void writePrevious(Collection<EtlKey> missedKeys, JobContext context) throws IOException {
- FileSystem fs = FileSystem.get(context.getConfiguration());
Path output = FileOutputFormat.getOutputPath(context);
+ FileSystem fs = FileSystem.get(output.toUri(), context.getConfiguration());
if (fs.exists(output)) {
fs.mkdirs(output);
@@ -502,8 +502,8 @@ private void writePrevious(Collection<EtlKey> missedKeys, JobContext context) th
}
protected void writeRequests(List<CamusRequest> requests, JobContext context) throws IOException {
- FileSystem fs = FileSystem.get(context.getConfiguration());
Path output = FileOutputFormat.getOutputPath(context);
+ FileSystem fs = FileSystem.get(output.toUri(), context.getConfiguration());
if (fs.exists(output)) {
fs.mkdirs(output);
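
writePrevious and writeRequests get the same one-line fix: the FileSystem is resolved from the job's output path, so the skipped-key and request files are written to whichever filesystem owns that path.
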
...-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/mapred/EtlMultiOutputCommitter.java
@@ -12,6 +12,7 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
@@ -89,13 +90,14 @@ public EtlMultiOutputCommitter(Path outputPath, TaskAttemptContext context, Logg
public void commitTask(TaskAttemptContext context) throws IOException {
ArrayList<Map<String, Object>> allCountObject = new ArrayList<Map<String, Object>>();
- FileSystem fs = FileSystem.get(context.getConfiguration());
+ Path workPath = super.getWorkPath();
+ Path baseOutDir = EtlMultiOutputFormat.getDestinationPath(context);
+ FileSystem workFs = FileSystem.get(workPath.toUri(), context.getConfiguration());
+ FileSystem destFs = FileSystem.get(baseOutDir.toUri(), context.getConfiguration());
if (EtlMultiOutputFormat.isRunMoveData(context)) {
- Path workPath = super.getWorkPath();
log.info("work path: " + workPath);
- Path baseOutDir = EtlMultiOutputFormat.getDestinationPath(context);
log.info("Destination base path: " + baseOutDir);
- for (FileStatus f : fs.listStatus(workPath)) {
+ for (FileStatus f : workFs.listStatus(workPath)) {
String file = f.getPath().getName();
log.info("work file: " + file);
if (file.startsWith("data")) {
@@ -107,16 +109,15 @@ public void commitTask(TaskAttemptContext context) throws IOException {
getPartitionedPath(context, file, count.getEventCount(), count.getLastKey().getOffset());
Path dest = new Path(baseOutDir, partitionedFile);
-
- if (!fs.exists(dest.getParent())) {
- mkdirs(fs, dest.getParent());
+ if (!destFs.exists(dest.getParent())) {
+ mkdirs(destFs, dest.getParent());
}
commitFile(context, f.getPath(), dest);
log.info("Moved file from: " + f.getPath() + " to: " + dest);
if (EtlMultiOutputFormat.isRunTrackingPost(context)) {
- count.writeCountsToMap(allCountObject, fs, new Path(workPath, EtlMultiOutputFormat.COUNTS_PREFIX + "."
+ count.writeCountsToMap(allCountObject, workFs, new Path(workPath, EtlMultiOutputFormat.COUNTS_PREFIX + "."
+ dest.getName().replace(recordWriterProvider.getFilenameExtension(), "")));
}
}
@@ -124,7 +125,7 @@ public void commitTask(TaskAttemptContext context) throws IOException {
if (EtlMultiOutputFormat.isRunTrackingPost(context)) {
Path tempPath = new Path(workPath, "counts." + context.getConfiguration().get("mapred.task.id"));
- OutputStream outputStream = new BufferedOutputStream(fs.create(tempPath));
+ OutputStream outputStream = new BufferedOutputStream(workFs.create(tempPath));
ObjectMapper mapper = new ObjectMapper();
log.info("Writing counts to : " + tempPath.toString());
long time = System.currentTimeMillis();
@@ -137,7 +138,7 @@ public void commitTask(TaskAttemptContext context) throws IOException {
SequenceFile.Writer offsetWriter =
SequenceFile.createWriter(
- fs,
+ workFs,
context.getConfiguration(),
new Path(super.getWorkPath(), EtlMultiOutputFormat.getUniqueFile(context,
EtlMultiOutputFormat.OFFSET_PREFIX, "")), EtlKey.class, NullWritable.class);
@@ -149,7 +150,9 @@ public void commitTask(TaskAttemptContext context) throws IOException {
}
protected void commitFile(JobContext job, Path source, Path target) throws IOException {
- FileSystem.get(job.getConfiguration()).rename(source, target);
+ FileSystem sourceFs = FileSystem.get(source.toUri(), job.getConfiguration());
+ FileSystem targetFs = FileSystem.get(target.toUri(), job.getConfiguration());
+ FileUtil.copy(sourceFs, source, targetFs, target, true, true, job.getConfiguration());
}
public String getPartitionedPath(JobContext context, String file, int count, long offset) throws IOException {
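
commitTask is where two filesystems can actually meet: the task work path normally stays on HDFS while the destination may be S3, so the committer now holds a workFs and a destFs and uses each for its own paths. The deeper change is in commitFile: FileSystem.rename cannot move a file across filesystem boundaries, so the patch replaces it with FileUtil.copy with deleteSource=true, a copy-then-delete that behaves like a move but is no longer atomic. Roughly the same pattern as a standalone helper (the class name is invented for illustration):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

public final class CrossFsMove {
  private CrossFsMove() {}

  // "Move" source to target even when they live on different filesystems.
  public static void move(Path source, Path target, Configuration conf) throws IOException {
    FileSystem sourceFs = FileSystem.get(source.toUri(), conf);
    FileSystem targetFs = FileSystem.get(target.toUri(), conf);
    // copy(srcFS, src, dstFS, dst, deleteSource, overwrite, conf):
    // copies the bytes, then deletes the source on success.
    FileUtil.copy(sourceFs, source, targetFs, target, true, true, conf);
  }
}
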
...l-kafka/src/main/java/com/linkedin/camus/etl/kafka/mapred/EtlMultiOutputRecordWriter.java
@@ -41,12 +41,12 @@ public EtlMultiOutputRecordWriter(TaskAttemptContext context, EtlMultiOutputComm
InterruptedException {
this.context = context;
this.committer = committer;
+ Path errorPath = new Path(committer.getWorkPath(), EtlMultiOutputFormat.getUniqueFile(context,
+ EtlMultiOutputFormat.ERRORS_PREFIX, ""));
errorWriter =
SequenceFile.createWriter(
- FileSystem.get(context.getConfiguration()),
- context.getConfiguration(),
- new Path(committer.getWorkPath(), EtlMultiOutputFormat.getUniqueFile(context,
- EtlMultiOutputFormat.ERRORS_PREFIX, "")), EtlKey.class, ExceptionWritable.class);
+ FileSystem.get(errorPath.toUri(), context.getConfiguration()),
+ context.getConfiguration(), errorPath, EtlKey.class, ExceptionWritable.class);
if (EtlInputFormat.getKafkaMaxHistoricalDays(context) != -1) {
int maxDays = EtlInputFormat.getKafkaMaxHistoricalDays(context);
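
Last piece: the errors SequenceFile is created against the filesystem that owns the committer's work path rather than the default one, matching the counts and offsets writers in the committer above.
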