Skip to content

Instantly share code, notes, and snippets.

@caprica
Last active October 15, 2021 21:02
Show Gist options
  • Save caprica/73b0200103bea437b45c841014b2ee57 to your computer and use it in GitHub Desktop.
Some experiments using Reactor to parse XML/JSON from a Flux of DataBuffer instances
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
/**
* Some experiments using Reactor in WebFlux to parse an XML/JSON file given a {@link Flux} of {@link DataBuffer}.
* <p>
* Use for whatever purpose, feel free to feedback better solutions.
* <p>
* Note that the operations are intentionally written out long-hand rather than in a more compact form so it is clearer
* what is going on, and what the various return values types are.
* <p>
* Required dependencies are:
* <ul>
* <li>org.springframework:spring-core:5.2.9.RELEASE for {@link DataBufferUtils} etc</li>
* <li>io.projectreactor:reactor-core:3.3.10.RELEASE for {@link Flux}, {@link Mono} etc</li>
* <li>com.fasterxml.jackson.core:jackson-databind:2.11.2 for {@link ObjectMapper} etc</li>
* </ul>
* Yes, I know Jackson has a reactive JSON parser, that is not the point!
*/
public class FluxDataBufferParseExperiments {

    /** Local file read by every test. */
    private static final String SAMPLE_INPUT_FILE = "input.json";

    /** File written by {@link #blockingTest()}. */
    private static final String SAMPLE_OUTPUT_FILE = "output.json";

    /**
     * With {@link #naiveTest()} the buffer size is significant with regards to the approach working
     * or not, see the comments on that method.
     */
    private static final int BUFFER_SIZE = 1024;

    /**
     * Shared mapper instance: {@link ObjectMapper} is expensive to create and is thread-safe once
     * configured, so a single cached instance is preferred over creating a new one per buffer.
     */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /**
     * This implementation simply uses {@link DataBufferUtils} to write the data buffers to standard
     * output.
     * <p>
     * Not particularly useful, but you have to start somewhere.
     */
    private static void basicTest() {
        System.out.println("Running basic test...");
        Flux<DataBuffer> buffers = DataBufferUtils.write(getBuffers(), System.out);
        // Block until every buffer has been written to the output stream.
        Mono.when(buffers).block();
        System.out.println();
        System.out.println("Basic test complete.");
        System.out.println();
    }

    /**
     * This implementation will process multiple data buffers in turn, each data buffer will have a
     * maximum size of {@link #BUFFER_SIZE}.
     * <p>
     * This means that when invoking {@link DataBuffer#asInputStream()} you will get an
     * {@link InputStream} for that buffer only. So trying to use this input stream as an input to
     * e.g. a JSON or XML parser will fail, as it will only see the data from that buffer before
     * failing.
     * <p>
     * If the entire input fits inside a single buffer, it will work.
     */
    private static void naiveTest() {
        System.out.println("Running naive test...");
        Flux<JsonNode> result = getBuffers().flatMap(dataBuffer -> {
            // asInputStream(true) releases the underlying buffer when the stream is closed; the
            // no-arg asInputStream() used with no close() would leak the buffer.
            try (InputStream in = dataBuffer.asInputStream(true)) {
                return Flux.just(OBJECT_MAPPER.readTree(in));
            } catch (Exception e) {
                return Flux.error(e);
            }
        });
        try {
            Mono.when(result).block();
        } catch (Exception e) {
            // Expected whenever the input spans more than one buffer, see the Javadoc above.
            System.out.printf("Expected error parsing buffer: %s%n", e.getMessage());
        }
        System.out.println("Naive test complete.");
        System.out.println();
    }

    /**
     * This implementation uses the Spring WebFlux {@link DataBufferUtils} to join the individual
     * data buffers into a single buffer.
     * <p>
     * Feeding this single data buffer to a JSON/XML parser will then work as expected.
     * <p>
     * Clearly with the various blocking operations this is not a fully reactive asynchronous
     * approach. It does have the advantage of actually working, and being quite simple to
     * understand.
     * <p>
     * For more confidence that this version is working, delete the sample output file before
     * running each test.
     */
    private static void blockingTest() {
        System.out.println("Running blocking test...");
        // Aggregate the whole stream of buffers into one composite buffer.
        Mono<DataBuffer> monoDb = DataBufferUtils.join(getBuffers());
        System.out.println("Prepare JSON...");
        Mono<JsonNode> result = monoDb.flatMap(dataBuffer -> {
            // Release the joined buffer when the stream is closed to avoid a buffer leak.
            try (InputStream in = dataBuffer.asInputStream(true)) {
                return Mono.just(OBJECT_MAPPER.readTree(in));
            } catch (Exception e) {
                return Mono.error(e);
            }
        });
        System.out.println("Adding consumer for next()");
        result = result.doOnNext(jsonNode -> {
            System.out.println("Writing JSON...");
            // Explicit UTF-8 rather than the platform-default charset used by FileWriter; the
            // writer is flushed and closed by try-with-resources.
            try (BufferedWriter w = Files.newBufferedWriter(
                    Path.of(SAMPLE_OUTPUT_FILE), StandardCharsets.UTF_8)) {
                w.write(jsonNode.toPrettyString());
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            System.out.println("Finished writing JSON.");
        });
        result.block();
        System.out.println("After block.");
        System.out.printf("Blocking test complete, check output file: %s%n", SAMPLE_OUTPUT_FILE);
        System.out.println();
    }

    /**
     * Get a publisher of a stream of {@link DataBuffer} instances for a local file.
     *
     * @return stream of data buffers
     */
    private static Flux<DataBuffer> getBuffers() {
        return DataBufferUtils.read(
            Path.of(SAMPLE_INPUT_FILE),
            new DefaultDataBufferFactory(),
            BUFFER_SIZE,
            StandardOpenOption.READ);
    }

    public static void main(String[] args) throws Exception {
        basicTest();
        naiveTest();
        blockingTest();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment