Skip to content

Instantly share code, notes, and snippets.

@nkcoder
Last active March 26, 2016 15:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nkcoder/4e5ef554b0f97c660a3e to your computer and use it in GitHub Desktop.
Save nkcoder/4e5ef554b0f97c660a3e to your computer and use it in GitHub Desktop.
How to process a large number of small files with limited memory?
import java.io.*;
import java.nio.file.*;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.regex.Pattern;
import java.util.stream.Stream;
/**
 * Prepends a global, 1-based line number to every line of the log files named
 * {@code logtest.yyyy-MM-dd.log} in a directory.
 *
 * <p>Numbering continues across files in file-name order: the first line of a file is numbered
 * one past the last line of the file before it. Each original file is moved into a
 * {@code backup} sub-directory and replaced by its numbered version. Files are read line by
 * line, never loaded into memory as a whole.
 *
 * <p>Created by nkcoder on 3/24/16.
 */
public class ProcessLogFiles {

    /** Names of the log files to process, e.g. {@code logtest.2016-03-24.log}; dots are literal. */
    private static final Pattern LOG_FILE_NAME =
            Pattern.compile("logtest\\.[0-9]{4}-[0-9]{2}-[0-9]{2}\\.log");

    /** Suffix of the temporary numbered copy written next to each log file. */
    private static final String NUMBERED_SUFFIX = ".txt";

    /** Name of the sub-directory (inside the log directory) that receives the original files. */
    private static final String BACKUP_DIR_NAME = "backup";

    /**
     * Numbers the lines of every matching log file in {@code logPath}.
     *
     * @param logPath   the directory containing the logs
     * @param threadNum requested number of worker threads for the rewrite step; capped at five
     *                  times the number of available processors
     * @throws IllegalArgumentException if {@code logPath} is null or not a directory, or
     *                                  {@code threadNum} is not positive
     * @throws UncheckedIOException     if the directory cannot be listed or a file cannot be
     *                                  read or moved (failures inside worker tasks surface as a
     *                                  {@code RuntimeException} wrapping the
     *                                  {@code ExecutionException})
     */
    private static void process(final Path logPath, int threadNum) {
        if (logPath == null || !Files.isDirectory(logPath) || threadNum <= 0) {
            throw new IllegalArgumentException("invalid arguments!");
        }
        threadNum = Integer.min(threadNum, Runtime.getRuntime().availableProcessors() * 5);
        System.out.println("threadNum: " + threadNum);

        // 1. map file name to the number of lines of the file (keys sorted by file name)
        long timePoint1 = System.nanoTime();
        ConcurrentNavigableMap<String, Long> fileNameToStartLineNum = countLinesPerFile(logPath);
        long timePoint2 = System.nanoTime();
        System.out.println("map file name to line count: " + elapsedMillis(timePoint1, timePoint2) + "ms");

        // 2. turn each line count into the start line number of that file
        replaceCountsWithStartLineNumbers(fileNameToStartLineNum);
        long timePoint3 = System.nanoTime();
        System.out.println("map file name to start line number: " + elapsedMillis(timePoint2, timePoint3) + "ms");

        // 3. write a numbered copy of each file and move the original to the backup dir
        Path logBackupDir = logPath.resolve(BACKUP_DIR_NAME);
        try {
            Files.createDirectories(logBackupDir);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // A fixed pool really runs threadNum workers. A ThreadPoolExecutor with core size 0 and
        // an unbounded queue would only ever start a single thread.
        ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
        try {
            writeNumberedCopies(logPath, fileNameToStartLineNum, logBackupDir, executorService);
        } finally {
            // Release the worker threads even when a task failed.
            executorService.shutdown();
        }
        long timePoint4 = System.nanoTime();
        System.out.println("prepend line number: " + elapsedMillis(timePoint3, timePoint4) + "ms");

        // 4. give each numbered copy its original name
        renameBack(logPath, fileNameToStartLineNum);
        long timePoint5 = System.nanoTime();
        System.out.println("rename back files: " + elapsedMillis(timePoint4, timePoint5) + "ms");
    }

    /**
     * Maps each regular file in {@code logDir} whose name matches {@link #LOG_FILE_NAME} to its
     * number of lines.
     */
    private static ConcurrentNavigableMap<String, Long> countLinesPerFile(Path logDir) {
        File[] entries = logDir.toFile().listFiles();
        if (entries == null) {
            // listFiles() reports an I/O error by returning null rather than throwing.
            throw new UncheckedIOException(new IOException("cannot list directory: " + logDir));
        }
        // The skip-list map is safe to fill from the parallel stream below and keeps keys sorted.
        ConcurrentNavigableMap<String, Long> lineCounts = new ConcurrentSkipListMap<>();
        Stream.of(entries).parallel()
                .filter(file -> file.isFile() && LOG_FILE_NAME.matcher(file.getName()).matches())
                .forEach(file -> lineCounts.put(file.getName(), countLines(file.toPath())));
        return lineCounts;
    }

    /** Counts the lines of {@code file}, decoded as UTF-8, and closes the file afterwards. */
    private static long countLines(Path file) {
        // Files.lines holds the file open until the stream is closed. Closing it here keeps the
        // number of open file descriptors bounded when there are many files.
        try (Stream<String> lines = Files.lines(file)) {
            return lines.count();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Replaces each file's line count with the 1-based global number of its first line. Files
     * are walked in key (file name) order.
     */
    private static void replaceCountsWithStartLineNumbers(ConcurrentNavigableMap<String, Long> lineCounts) {
        long nextStartLine = 1;
        // ConcurrentSkipListMap iterators are weakly consistent, so replacing values while
        // iterating is safe.
        for (Map.Entry<String, Long> entry : lineCounts.entrySet()) {
            long lineCount = entry.getValue();
            lineCounts.replace(entry.getKey(), nextStartLine);
            nextStartLine += lineCount;
        }
    }

    /**
     * Submits one {@link #writeNumberedCopy} task per file to {@code executor} and waits for
     * all of them to finish.
     */
    private static void writeNumberedCopies(Path logDir, Map<String, Long> fileNameToStartLineNum,
                                            Path backupDir, ExecutorService executor) {
        List<Future<?>> futures = new ArrayList<>();
        for (Map.Entry<String, Long> entry : fileNameToStartLineNum.entrySet()) {
            Path logFile = logDir.resolve(entry.getKey());
            long startLineNum = entry.getValue();
            futures.add(executor.submit(() -> writeNumberedCopy(logFile, startLineNum, backupDir)));
        }
        for (Future<?> future : futures) {
            try {
                future.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for callers
                throw new RuntimeException(e);
            } catch (ExecutionException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Writes {@code logFile} with each line prefixed by {@code "<n>. "} (n starting at
     * {@code startLineNum}) to a sibling file with the {@code .txt} suffix, then moves
     * {@code logFile} into {@code backupDir}. Every output line ends with a newline.
     */
    private static void writeNumberedCopy(Path logFile, long startLineNum, Path backupDir) {
        Path numberedFile = logFile.resolveSibling(logFile.getFileName() + NUMBERED_SUFFIX);
        try {
            // Both streams use UTF-8, the same decoding countLines used.
            try (BufferedReader reader = Files.newBufferedReader(logFile);
                 BufferedWriter writer = Files.newBufferedWriter(numberedFile)) {
                long lineNum = startLineNum;
                String line;
                while ((line = reader.readLine()) != null) {
                    writer.write(lineNum++ + ". " + line + "\n");
                }
            }
            // Move only after both streams are closed; some platforms refuse to move open files.
            Files.move(logFile, backupDir.resolve(logFile.getFileName()),
                    StandardCopyOption.REPLACE_EXISTING);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Renames the {@code <name>.txt} copy of every processed file back to {@code <name>}. Only
     * the files this run produced are touched.
     */
    private static void renameBack(Path logDir, Map<String, Long> fileNameToStartLineNum) {
        fileNameToStartLineNum.keySet().parallelStream().forEach(fileName -> {
            Path target = logDir.resolve(fileName);
            try {
                Files.move(target.resolveSibling(fileName + NUMBERED_SUFFIX), target,
                        StandardCopyOption.REPLACE_EXISTING);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
    }

    /** Milliseconds between two {@link System#nanoTime()} readings. */
    private static long elapsedMillis(long startNanos, long endNanos) {
        return (endNanos - startNanos) / 1000000;
    }

    /**
     * Command-line entry point: {@code java ProcessLogFiles [-d <logDir>] [-t <threads>]}.
     *
     * <p>{@code -d} defaults to {@code logs_dir}; a relative path is resolved against the working
     * directory. {@code -t} defaults to {@code Integer.MAX_VALUE} (later capped). Empty or
     * non-positive values keep the default, and a non-numeric {@code -t} prints the usage
     * message.
     */
    public static void main(String[] args) {
        int threadNum = Integer.MAX_VALUE;
        String logDir = "logs_dir";
        // 1. parse args
        String usageStr = "usage: java ProcessLogFiles -d test_logs -t 10\n" +
                "options: -d and -t are optional.";
        if (args.length > 0 && args.length != 2 && args.length != 4) {
            System.out.println(usageStr);
            return;
        }
        for (int i = 0; i < args.length; i += 2) {
            String option = args[i];
            String value = args[i + 1];
            if (!option.equals("-d") && !option.equals("-t")) {
                System.out.println(usageStr);
                return;
            }
            if (value.isEmpty()) {
                continue; // an empty value keeps the default
            }
            if (option.equals("-d")) {
                logDir = value;
            } else {
                int requested;
                try {
                    requested = Integer.parseInt(value);
                } catch (NumberFormatException e) {
                    System.out.println(usageStr);
                    return;
                }
                if (requested > 0) {
                    threadNum = requested;
                }
            }
        }
        // 2. validate log dir
        Path logPath = Paths.get(logDir).toAbsolutePath();
        if (!Files.exists(logPath)) {
            System.out.println(logDir + " not exists, check your path!");
            return;
        }
        if (!Files.isDirectory(logPath)) {
            System.out.println(logDir + " is not a directory, check your path!");
            return;
        }
        // 3. process logs
        long start = System.nanoTime();
        process(logPath, threadNum);
        long end = System.nanoTime();
        System.out.println("total time: " + elapsedMillis(start, end) + "ms");
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment