Last active
March 26, 2016 15:04
-
-
Save nkcoder/4e5ef554b0f97c660a3e to your computer and use it in GitHub Desktop.
How to process a large number of small files with limited memory?
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.*; | |
import java.nio.file.*; | |
import java.util.ArrayList; | |
import java.util.Iterator; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.concurrent.*; | |
import java.util.stream.Stream; | |
/**
 * Prepends a global, continuous line number to every line of a directory of log files.
 *
 * <p>Log files are the regular files named {@code logtest.YYYY-MM-DD.log} directly inside the
 * target directory. Files are numbered in lexicographic file-name order (chronological for this
 * naming scheme). The first line of each file therefore continues the numbering where the
 * previous file ended. Every original file is moved into a {@code backup} sub-directory and
 * replaced by its numbered copy. Files are read and written line by line, so no file is ever
 * held in memory as a whole.
 *
 * <p>Created by nkcoder on 3/24/16.
 */
public class ProcessLogFiles {

    /** Names of the log files to process, e.g. {@code logtest.2016-03-24.log} (dots are literal). */
    private static final java.util.regex.Pattern LOG_FILE_NAME =
            java.util.regex.Pattern.compile("logtest\\.[0-9]{4}-[0-9]{2}-[0-9]{2}\\.log");

    /** Suffix of the temporary numbered copy written next to each original file. */
    private static final String TMP_SUFFIX = ".txt";

    /** Sub-directory of the log directory that receives the untouched original files. */
    private static final String BACKUP_DIR_NAME = "backup";

    /**
     * Processes the logs with the specified number of threads.
     *
     * @param logPath the path of the logs directory
     * @param threadNum requested worker thread count; capped at 5 x available processors
     * @throws IllegalArgumentException if {@code logPath} is null or not a directory, or
     *     {@code threadNum <= 0}
     * @throws RuntimeException if reading, writing or moving any file fails
     */
    private static void process(final Path logPath, int threadNum) {
        if (logPath == null || !Files.isDirectory(logPath) || threadNum <= 0) {
            throw new IllegalArgumentException("invalid arguments!");
        }
        threadNum = Integer.min(threadNum, Runtime.getRuntime().availableProcessors() * 5);
        System.out.println("threadNum: " + threadNum);
        // A fixed pool really runs threadNum workers. A ThreadPoolExecutor with corePoolSize 0
        // and an unbounded queue would never grow past a single thread.
        ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
        try {
            // 1. map file name to the number of lines of the file
            long timePoint1 = System.nanoTime();
            ConcurrentSkipListMap<String, Long> fileNameToStartLineNum = countLinesPerFile(logPath);
            long timePoint2 = System.nanoTime();
            System.out.println("map file name to line count: " + (timePoint2 - timePoint1) / 1000000 + "ms");

            // 2. map file name to start line number of the file
            toStartLineNumbers(fileNameToStartLineNum);
            long timePoint3 = System.nanoTime();
            System.out.println("map file name to start line number: " + (timePoint3 - timePoint2) / 1000000 + "ms");

            // 3. prepend line number to each line of each file
            List<Path> numberedFiles = prependLineNumbers(logPath, fileNameToStartLineNum, executorService);
            long timePoint4 = System.nanoTime();
            System.out.println("prepend line number: " + (timePoint4 - timePoint3) / 1000000 + "ms");

            // 4. rename back files
            renameBack(numberedFiles);
            long timePoint5 = System.nanoTime();
            System.out.println("rename back files: " + (timePoint5 - timePoint4) / 1000000 + "ms");
        } finally {
            // On the normal path every task has already completed. After a failure this stops
            // outstanding work, so the (non-daemon) pool threads cannot keep the JVM alive.
            executorService.shutdownNow();
        }
    }

    /** Stage 1: maps the name of every matching log file in {@code logPath} to its line count. */
    private static ConcurrentSkipListMap<String, Long> countLinesPerFile(Path logPath) {
        File[] allFiles = logPath.toFile().listFiles();
        if (allFiles == null) {
            throw new UncheckedIOException(new IOException("cannot list directory: " + logPath));
        }
        // Sorted by file name, which fixes the order in which line numbers are assigned.
        ConcurrentSkipListMap<String, Long> lineCounts = new ConcurrentSkipListMap<>();
        Stream.of(allFiles).parallel()
                .filter(file -> file.isFile() && LOG_FILE_NAME.matcher(file.getName()).matches())
                .forEach(file -> lineCounts.put(file.getName(), countLines(file.toPath())));
        return lineCounts;
    }

    /** Counts the lines of {@code file} (decoded as UTF-8), closing the file afterwards. */
    private static long countLines(Path file) {
        // Files.lines keeps the file open until the stream is closed; without
        // try-with-resources every processed file would leak a descriptor.
        try (Stream<String> lines = Files.lines(file)) {
            return lines.count();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Stage 2: replaces each line count, in key order, with the 1-based number of the file's first line. */
    private static void toStartLineNumbers(ConcurrentSkipListMap<String, Long> lineCounts) {
        long nextStart = 1;
        for (Map.Entry<String, Long> entry : lineCounts.entrySet()) {
            long lineCount = entry.getValue();
            lineCounts.put(entry.getKey(), nextStart);
            nextStart += lineCount;
        }
    }

    /**
     * Stage 3: writes a numbered copy of every file on {@code executor}, then moves each
     * original into the backup directory.
     *
     * @return the temporary numbered copies produced by this run
     */
    private static List<Path> prependLineNumbers(Path logPath, Map<String, Long> startLines,
                                                 ExecutorService executor) {
        Path backupDir = logPath.resolve(BACKUP_DIR_NAME);
        try {
            Files.createDirectories(backupDir); // no-op if it already exists
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<Future<Path>> futures = new ArrayList<>(startLines.size());
        for (Map.Entry<String, Long> entry : startLines.entrySet()) {
            Path source = logPath.resolve(entry.getKey());
            long firstLine = entry.getValue();
            futures.add(executor.submit(() -> numberFile(source, firstLine, backupDir)));
        }
        // wait for operations to finish
        List<Path> numberedFiles = new ArrayList<>(futures.size());
        for (Future<Path> future : futures) {
            try {
                numberedFiles.add(future.get());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for callers
                throw new RuntimeException(e);
            } catch (ExecutionException e) {
                throw new RuntimeException(e);
            }
        }
        return numberedFiles;
    }

    /**
     * Copies {@code source} to {@code source + ".txt"}, prefixing its lines with consecutive
     * numbers starting at {@code firstLine}. Then moves {@code source} into {@code backupDir}.
     *
     * @return the path of the numbered copy
     */
    private static Path numberFile(Path source, long firstLine, Path backupDir) throws IOException {
        Path target = source.resolveSibling(source.getFileName() + TMP_SUFFIX);
        // UTF-8 on both sides: it is the charset countLines (Files.lines) already requires,
        // so the copy does not depend on the platform default charset.
        try (BufferedReader reader = Files.newBufferedReader(source);
             BufferedWriter writer = Files.newBufferedWriter(target)) {
            long lineNumber = firstLine;
            String line;
            while ((line = reader.readLine()) != null) {
                writer.write(lineNumber + ". " + line + "\n");
                lineNumber++;
            }
        }
        // Move only after both streams are closed: moving an open file fails on some platforms.
        Files.move(source, backupDir.resolve(source.getFileName()), StandardCopyOption.REPLACE_EXISTING);
        return target;
    }

    /** Stage 4: renames each numbered copy back to its original name by stripping the ".txt" suffix. */
    private static void renameBack(List<Path> numberedFiles) {
        numberedFiles.parallelStream().forEach(file -> {
            // Strip the suffix from the file name only, never from parent directory names.
            String name = file.getFileName().toString();
            Path original = file.resolveSibling(name.substring(0, name.length() - TMP_SUFFIX.length()));
            try {
                Files.move(file, original, StandardCopyOption.REPLACE_EXISTING);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
    }

    /**
     * Command-line entry point: {@code java ProcessLogFiles [-d dir] [-t threads]}.
     *
     * <p>{@code dir} defaults to {@code logs_dir} (resolved against the working directory when
     * relative). {@code threads} defaults to {@code Integer.MAX_VALUE}, which is then capped
     * by {@code process}. A non-numeric thread count prints the usage text.
     */
    public static void main(String[] args) {
        int threadNum = Integer.MAX_VALUE;
        String logDir = "logs_dir";
        // 1. parse args
        String usageStr = "usage: java ProcessLogFiles -d test_logs -t 10\n" +
                "options: -d and -t are optional.";
        if (args.length > 0 && args.length != 2 && args.length != 4) {
            System.out.println(usageStr);
            return;
        }
        for (int i = 0; i < args.length; i += 2) {
            String option = args[i];
            String value = args[i + 1];
            if (!option.equals("-d") && !option.equals("-t")) {
                System.out.println(usageStr);
                return;
            }
            if (value.isEmpty()) {
                continue; // an empty value keeps the default
            }
            if (option.equals("-d")) {
                logDir = value;
            } else {
                int parsed;
                try {
                    parsed = Integer.parseInt(value);
                } catch (NumberFormatException e) {
                    System.out.println(usageStr);
                    return;
                }
                if (parsed > 0) { // non-positive counts keep the default
                    threadNum = parsed;
                }
            }
        }
        // 2. validate log dir (relative paths resolve against the working directory)
        Path logPath = Paths.get(logDir).toAbsolutePath();
        if (!Files.exists(logPath)) {
            System.out.println(logDir + " not exists, check your path!");
            return;
        }
        // 3. process logs
        long start = System.nanoTime();
        process(logPath, threadNum);
        long end = System.nanoTime();
        System.out.println("total time: " + (end - start) / 1000000 + "ms");
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment