Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save jerrymartins/4059348564c10b57709bf89fc6a60c88 to your computer and use it in GitHub Desktop.
Save jerrymartins/4059348564c10b57709bf89fc6a60c88 to your computer and use it in GitHub Desktop.
Multi-threaded CSV Reader/Writer for Java
// uses Apache commons CSV, IO and Lang
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVPrinter;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import java.io.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Stream;
import static java.lang.String.format;
import static java.util.Collections.synchronizedList;
import static java.util.Objects.isNull;
import static java.util.Objects.nonNull;
import static java.util.Spliterator.CONCURRENT;
import static java.util.Spliterators.spliteratorUnknownSize;
import static java.util.concurrent.Executors.callable;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static java.util.stream.Collectors.toList;
import static java.util.stream.IntStream.range;
import static java.util.stream.StreamSupport.stream;
import static org.apache.commons.lang3.time.StopWatch.createStarted;
public class App {
private static final int CSV_CHUNK_SIZE = 10000;
public static void main(String[] args) throws InterruptedException, IOException {
var outDir = new File("out");
FileUtils.deleteQuietly(outDir);
FileUtils.forceMkdir(outDir);
runMultiThreadCsvWriteTest();
runMultiThreadCsvReadTest();
}
private static void runMultiThreadCsvWriteTest() throws InterruptedException {
var executor = newFixedThreadPool(10);
var files = synchronizedList(new ArrayList<File>());
var latch = new CountDownLatch(10);
var allChunksStopwatch = createStarted();
executor.invokeAll(
range(0, 10)
.mapToObj(i -> callable(() -> {
var filename = format("out/chunk_%d.csv", i);
try {
writeCsvFile(filename, i);
} catch (IOException ex) {
throw new RuntimeException(format("Error writing to file: %s", filename), ex);
}
files.add(new File(filename));
latch.countDown();
}))
.collect(toList())
);
latch.await();
executor.shutdownNow();
allChunksStopwatch.stop();
System.out.println(format("Writing 10 CSV chunks took %dms", allChunksStopwatch.getTime()));
try {
buildFinalCsvFile(files);
} catch (IOException ex) {
throw new RuntimeException("Failed to build file: final_file.csv", ex);
}
}
private static void writeCsvFile(String filename, int number) throws IOException {
System.out.println(format("Writing records to file: %s", filename));
var csvFileStopwatch = createStarted();
try (
var fileWriter = new FileWriter(filename);
var csvFile = new CSVPrinter(fileWriter, CSVFormat.EXCEL)
) {
range(0, CSV_CHUNK_SIZE).forEach(i -> {
try {
csvFile.printRecord("name", number, "address", "number", "age");
} catch (IOException ex) {
throw new RuntimeException(ex);
}
});
}
csvFileStopwatch.stop();
System.out.println(
format(
"Writing %d records to CSV file chunk took %sms",
CSV_CHUNK_SIZE,
csvFileStopwatch.getTime()
)
);
}
private static void buildFinalCsvFile(List<File> files) throws IOException {
System.out.println("Building final file: final_file.csv");
var recordsFile = new File("out/final_file.csv");
var buildStopwatch = createStarted();
try(var fileOutputStream = FileUtils.openOutputStream(recordsFile)) {
for (var file : files) {
try (var fileInputStream = FileUtils.openInputStream(file)) {
IOUtils.copy(fileInputStream, fileOutputStream);
}
}
}
buildStopwatch.stop();
System.out.println(format("Building final file took %sms", buildStopwatch.getTime()));
}
private static void runMultiThreadCsvReadTest() throws InterruptedException, IOException {
var readAllStopwatch = createStarted();
var recordsFile = new File("out/final_file.csv");
var csvChunkFiles = splitRecordsFileIntoChunks(recordsFile);
var executor = newFixedThreadPool(10);
var latch = new CountDownLatch(10);
var recordCounts = executor.invokeAll(
csvChunkFiles.map(f ->
(Callable<Integer>)() -> {
var count = readCsvFile(f);
latch.countDown();
return count;
}).collect(toList())
);
latch.await();
executor.shutdownNow();
var totalRecordsRead = recordCounts.stream()
.mapToInt(rc -> {
try {
return rc.get();
} catch (Throwable ex) {
throw new RuntimeException(ex);
}
}).sum();
readAllStopwatch.stop();
System.out.println(
format(
"Reading %d records from CSV chunk files took %dms",
totalRecordsRead,
readAllStopwatch.getTime()
)
);
}
private static Stream<File> splitRecordsFileIntoChunks(File recordsFile) throws IOException {
var chunkFilesStopwatch = createStarted();
var chunkFiles = new ArrayList<File>();
try (
var fileReader = new FileReader(recordsFile);
var bufferedReader = new BufferedReader(fileReader)
) {
// open initial file
var currentFileRecordCount = 0;
var currentFile = new File(format("out/in_chunk_%d.csv", chunkFiles.size()));
var currentFileWriter = new FileWriter(currentFile);
var currentBufferedWriter = new BufferedWriter(currentFileWriter);
var line = bufferedReader.readLine();
while (nonNull(line)) {
// dump line into chunk file
currentBufferedWriter.write(line);
currentBufferedWriter.newLine();
currentFileRecordCount++;
line = bufferedReader.readLine();
if (nonNull(line) && currentFileRecordCount > 9999) {
// open next file if we are at chunk limit and still reading full file
currentBufferedWriter.close();
currentFileWriter.close();
currentBufferedWriter.close();
currentFileWriter.close();
chunkFiles.add(currentFile);
currentFileRecordCount = 0;
currentFile = new File(format("out/in_chunk_%d.csv", chunkFiles.size()));
currentFileWriter = new FileWriter(currentFile);
currentBufferedWriter = new BufferedWriter(currentFileWriter);
} else if (isNull(line)) {
// ensure we add the last file to chunks list
chunkFiles.add(currentFile);
}
}
}
chunkFilesStopwatch.stop();
System.out.println(
format(
"Splitting file '%s' into '%d' chunks of (at most) 10K records each took %dms",
recordsFile.getName(),
chunkFiles.size(),
chunkFilesStopwatch.getTime()
)
);
return chunkFiles.stream();
}
private static int readCsvFile(File csvFileHandle) {
var readStopwatch = createStarted();
try (
var fileReader = new FileReader(csvFileHandle);
var csvFile = new CSVParser(fileReader, CSVFormat.EXCEL)
) {
var numRecords = stream(
spliteratorUnknownSize(csvFile.iterator(), CONCURRENT), true
).mapToInt(r -> 1).sum();
readStopwatch.stop();
System.out.println(
format(
"Reading %d records from CSV file '%s' took %dms",
numRecords,
csvFileHandle.getName(),
readStopwatch.getTime()
)
);
return numRecords;
} catch (Throwable ex) {
throw new RuntimeException(
format("Error reading from chunk file: %s", csvFileHandle.getName()),
ex
);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment