Created
October 12, 2015 23:59
-
-
Save kevinsu/988d32d4c1939aff1374 to your computer and use it in GitHub Desktop.
Camus s3 hack
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/CamusJob.java
@@ -243,13 +243,13 @@ public void run(Class<? extends InputFormat> inputFormatClass,
     if (getLog4jConfigure(job)) {
       DOMConfigurator.configure("log4j.xml");
     }
-    FileSystem fs = FileSystem.get(job.getConfiguration());
-
     log.info("Dir Destination set to: " + EtlMultiOutputFormat.getDestinationPath(job));
     Path execBasePath = new Path(props.getProperty(ETL_EXECUTION_BASE_PATH));
     Path execHistory = new Path(props.getProperty(ETL_EXECUTION_HISTORY_PATH));
+    FileSystem fs = FileSystem.get(execBasePath.toUri(), job.getConfiguration());
+
     if (!fs.exists(execBasePath)) {
       log.info("The execution base path does not exist. Creating the directory");
       fs.mkdirs(execBasePath);
camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/mapred/EtlInputFormat.java
@@ -483,8 +483,8 @@ private boolean createMessageDecoder(JobContext context, String topic) {
   }
   private void writePrevious(Collection<EtlKey> missedKeys, JobContext context) throws IOException {
-    FileSystem fs = FileSystem.get(context.getConfiguration());
     Path output = FileOutputFormat.getOutputPath(context);
+    FileSystem fs = FileSystem.get(output.toUri(), context.getConfiguration());
     if (fs.exists(output)) {
       fs.mkdirs(output);
@@ -502,8 +502,8 @@ private void writePrevious(Collection<EtlKey> missedKeys, JobContext context) th
   }
   protected void writeRequests(List<CamusRequest> requests, JobContext context) throws IOException {
-    FileSystem fs = FileSystem.get(context.getConfiguration());
     Path output = FileOutputFormat.getOutputPath(context);
+    FileSystem fs = FileSystem.get(output.toUri(), context.getConfiguration());
     if (fs.exists(output)) {
       fs.mkdirs(output);
camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/mapred/EtlMultiOutputCommitter.java
@@ -12,6 +12,7 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
@@ -89,13 +90,14 @@ public EtlMultiOutputCommitter(Path outputPath, TaskAttemptContext context, Logg
   public void commitTask(TaskAttemptContext context) throws IOException {
     ArrayList<Map<String, Object>> allCountObject = new ArrayList<Map<String, Object>>();
-    FileSystem fs = FileSystem.get(context.getConfiguration());
+    Path workPath = super.getWorkPath();
+    Path baseOutDir = EtlMultiOutputFormat.getDestinationPath(context);
+    FileSystem workFs = FileSystem.get(workPath.toUri(), context.getConfiguration());
+    FileSystem destFs = FileSystem.get(baseOutDir.toUri(), context.getConfiguration());
     if (EtlMultiOutputFormat.isRunMoveData(context)) {
-      Path workPath = super.getWorkPath();
       log.info("work path: " + workPath);
-      Path baseOutDir = EtlMultiOutputFormat.getDestinationPath(context);
       log.info("Destination base path: " + baseOutDir);
-      for (FileStatus f : fs.listStatus(workPath)) {
+      for (FileStatus f : workFs.listStatus(workPath)) {
         String file = f.getPath().getName();
         log.info("work file: " + file);
         if (file.startsWith("data")) {
@@ -107,16 +109,15 @@ public void commitTask(TaskAttemptContext context) throws IOException {
               getPartitionedPath(context, file, count.getEventCount(), count.getLastKey().getOffset());
           Path dest = new Path(baseOutDir, partitionedFile);
-
-          if (!fs.exists(dest.getParent())) {
-            mkdirs(fs, dest.getParent());
+          if (!destFs.exists(dest.getParent())) {
+            mkdirs(destFs, dest.getParent());
           }
           commitFile(context, f.getPath(), dest);
           log.info("Moved file from: " + f.getPath() + " to: " + dest);
           if (EtlMultiOutputFormat.isRunTrackingPost(context)) {
-            count.writeCountsToMap(allCountObject, fs, new Path(workPath, EtlMultiOutputFormat.COUNTS_PREFIX + "."
+            count.writeCountsToMap(allCountObject, workFs, new Path(workPath, EtlMultiOutputFormat.COUNTS_PREFIX + "."
                 + dest.getName().replace(recordWriterProvider.getFilenameExtension(), "")));
           }
         }
@@ -124,7 +125,7 @@ public void commitTask(TaskAttemptContext context) throws IOException {
     if (EtlMultiOutputFormat.isRunTrackingPost(context)) {
       Path tempPath = new Path(workPath, "counts." + context.getConfiguration().get("mapred.task.id"));
-      OutputStream outputStream = new BufferedOutputStream(fs.create(tempPath));
+      OutputStream outputStream = new BufferedOutputStream(workFs.create(tempPath));
       ObjectMapper mapper = new ObjectMapper();
       log.info("Writing counts to : " + tempPath.toString());
       long time = System.currentTimeMillis();
@@ -137,7 +138,7 @@ public void commitTask(TaskAttemptContext context) throws IOException {
     SequenceFile.Writer offsetWriter =
         SequenceFile.createWriter(
-            fs,
+            workFs,
             context.getConfiguration(),
             new Path(super.getWorkPath(), EtlMultiOutputFormat.getUniqueFile(context,
                 EtlMultiOutputFormat.OFFSET_PREFIX, "")), EtlKey.class, NullWritable.class);
@@ -149,7 +150,9 @@ public void commitTask(TaskAttemptContext context) throws IOException {
   }
   protected void commitFile(JobContext job, Path source, Path target) throws IOException {
-    FileSystem.get(job.getConfiguration()).rename(source, target);
+    FileSystem sourceFs = FileSystem.get(source.toUri(), job.getConfiguration());
+    FileSystem targetFs = FileSystem.get(target.toUri(), job.getConfiguration());
+    FileUtil.copy(sourceFs, source, targetFs, target, true, true, job.getConfiguration());
   }
   public String getPartitionedPath(JobContext context, String file, int count, long offset) throws IOException {
camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/mapred/EtlMultiOutputRecordWriter.java
@@ -41,12 +41,12 @@ public EtlMultiOutputRecordWriter(TaskAttemptContext context, EtlMultiOutputComm
       InterruptedException {
     this.context = context;
     this.committer = committer;
+    Path errorPath = new Path(committer.getWorkPath(), EtlMultiOutputFormat.getUniqueFile(context,
+        EtlMultiOutputFormat.ERRORS_PREFIX, ""));
     errorWriter =
         SequenceFile.createWriter(
-            FileSystem.get(context.getConfiguration()),
-            context.getConfiguration(),
-            new Path(committer.getWorkPath(), EtlMultiOutputFormat.getUniqueFile(context,
-                EtlMultiOutputFormat.ERRORS_PREFIX, "")), EtlKey.class, ExceptionWritable.class);
+            FileSystem.get(errorPath.toUri(), context.getConfiguration()),
+            context.getConfiguration(), errorPath, EtlKey.class, ExceptionWritable.class);
     if (EtlInputFormat.getKafkaMaxHistoricalDays(context) != -1) {
       int maxDays = EtlInputFormat.getKafkaMaxHistoricalDays(context);
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment