Skip to content

Instantly share code, notes, and snippets.

@sainnr
Created March 7, 2018 09:17
Show Gist options
  • Save sainnr/31b4055d70e210786c8666862e1b007b to your computer and use it in GitHub Desktop.
Save sainnr/31b4055d70e210786c8666862e1b007b to your computer and use it in GitHub Desktop.
Simple Java reader for line-delimited JSON (JSON Lines) files, e.g. OpenSanctions data sets. More details at https://medium.com/@vladimirsalin/reading-lined-json-files-with-java-d0f376671f9d
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Stream;
/**
 * Simple streamed reader for line-delimited JSON (JSON Lines) input: converts each line to a POJO
 * entry and performs a specified action on every successfully parsed entry.
 *
 * <p>Input is decoded as UTF-8. Lines that fail to parse are logged and skipped instead of
 * aborting the whole run.
 *
 * @author Vladimir Salin @ SwiftDil
 */
public class LineBasedJsonReader {
    private static final Logger log = LoggerFactory.getLogger(LineBasedJsonReader.class);

    /** UTF-8 byte-order mark; some tools prepend it, and Jackson rejects it inside a String. */
    private static final String UTF8_BOM = "\uFEFF";

    private final ObjectMapper objectMapper;

    /**
     * Creates a reader that converts lines with the given Jackson mapper.
     *
     * @param objectMapper mapper used to convert each line; must not be {@code null}
     * @throws NullPointerException if {@code objectMapper} is {@code null}
     */
    public LineBasedJsonReader(ObjectMapper objectMapper) {
        this.objectMapper = Objects.requireNonNull(objectMapper, "objectMapper");
    }

    /**
     * Parses a provided input in a streamed way. Each non-blank line is expected to hold one JSON
     * value, which is converted to {@code entryClass} and handed to {@code consumer}.
     *
     * <p>The input is decoded as UTF-8, and a leading UTF-8 byte-order mark is ignored. Blank
     * lines are skipped. A line that cannot be parsed, or that contains a bare JSON {@code null},
     * is logged and counted as a failure; processing then continues with the next line.
     * Exceptions thrown by {@code consumer} are not caught.
     *
     * <p>The stream is read to its end but NOT closed; the caller remains responsible for it.
     *
     * @param stream lined JSON input; must not be {@code null}
     * @param entryClass POJO class to convert each line to; must not be {@code null}
     * @param consumer action to perform on each successfully parsed entry; must not be {@code null}
     * @return number of entries passed to {@code consumer}
     * @throws NullPointerException if any argument is {@code null}
     * @throws UncheckedIOException if reading from {@code stream} fails
     */
    public int parseAsStream(final InputStream stream, final Class entryClass, final Consumer<? super Object> consumer) {
        Objects.requireNonNull(stream, "stream");
        Objects.requireNonNull(entryClass, "entryClass");
        Objects.requireNonNull(consumer, "consumer");
        // Cast once so Jackson's generic readValue(String, Class<T>) is called without an
        // unchecked-conversion warning; the result is only ever used as an Object.
        final Class<?> targetType = entryClass;

        final long start = System.nanoTime();
        int total = 0;
        int failed = 0;
        int lineNumber = 0;

        // Explicit UTF-8: the platform default charset may differ and would corrupt non-ASCII data.
        // The reader is intentionally not closed, since closing it would close the caller's stream.
        final BufferedReader reader =
                new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8));

        String line;
        while ((line = readLine(reader)) != null) {
            lineNumber++;
            if (lineNumber == 1 && line.startsWith(UTF8_BOM)) {
                line = line.substring(UTF8_BOM.length());
            }
            if (line.trim().isEmpty()) {
                // Blank lines carry no entry; don't report them as parse failures.
                continue;
            }
            total++;

            final Object entry;
            try {
                entry = objectMapper.readValue(line, targetType);
            } catch (IOException e) {
                log.error("Failed to parse a line {}. Reason: {}", lineNumber, e.getMessage());
                log.debug("Stacktrace: ", e);
                failed++;
                continue;
            }
            if (entry == null) {
                // A bare JSON `null` deserializes to null; it is never handed to the consumer,
                // so count it as a failure to keep the returned count accurate.
                log.error("Failed to parse a line {}. Reason: {}", lineNumber, "JSON null is not a valid entry");
                failed++;
                continue;
            }
            consumer.accept(entry);
        }

        final long tookMs = (System.nanoTime() - start) / 1_000_000L;
        log.info("Parsed {} lines with {} failures. Took {}ms", total, failed, tookMs);
        return total - failed;
    }

    /**
     * Reads the next line, rethrowing I/O errors unchecked. This matches
     * {@link BufferedReader#lines()}, so callers see the same exception type as before.
     */
    private static String readLine(final BufferedReader reader) {
        try {
            return reader.readLine();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment