Skip to content

Instantly share code, notes, and snippets.

@Squiry
Created March 7, 2020 12:20
Show Gist options
  • Save Squiry/57e1801fadcdea8294579abd51f033ac to your computer and use it in GitHub Desktop.
Save Squiry/57e1801fadcdea8294579abd51f033ac to your computer and use it in GitHub Desktop.
/**
 * Round-trips a PostgreSQL large object through the server-side {@code lo_*} SQL functions.
 *
 * <p>Steps, all executed inside a single transaction on one connection:
 * <ol>
 *   <li>create a large object with {@code lo_creat} and open it with {@code lo_open};</li>
 *   <li>append the chunks "1", "2", "3", "4" one after another with {@code lowrite};</li>
 *   <li>close the descriptor and write "5" at offset 4 with {@code lo_put};</li>
 *   <li>read the object back with {@code lo_get}: whole, from offset 3, and the first 3 bytes.</li>
 * </ol>
 * The three reads are expected to yield "12345", "45" and "123", in that order.
 *
 * <p>The connection is managed with {@code Flux.usingWhen} so that it is closed on completion,
 * on error and on cancellation; chaining {@code close()} with {@code concatWith} would skip the
 * close whenever an earlier step fails.
 */
@Test
void largeObjects() {
    // Chunks are written sequentially (concatMap below), so the object content becomes "1234".
    Flux<ByteBuffer> dataChunks = Flux.just(
        US_ASCII.encode("1"),
        US_ASCII.encode("2"),
        US_ASCII.encode("3"),
        US_ASCII.encode("4")
    );

    Function<PostgresqlConnection, Flux<ByteBuffer>> process = c -> c.createStatement("SELECT lo_creat(-1)").execute()
        .flatMap(result -> result.map((row, meta) -> row.get(0, Long.class)))
        // 131072 == 0x20000, which is the INV_WRITE open mode in PostgreSQL's large-object API.
        .flatMap(oid -> c.createStatement("SELECT lo_open($1, 131072)")
            .bind(0, oid)
            .execute()
            .flatMap(r -> r.map((row, meta) -> row.get(0, Integer.class)))
            .single()
            // Keep the oid next to the descriptor: later statements need both.
            .map(fd -> Tuples.of(oid, fd))
        )
        .flatMap(oidAndFd -> {
            long oid = oidAndFd.getT1();
            int fd = oidAndFd.getT2();

            // Shared mapping for the lo_get statements: the single column of each row as a ByteBuffer.
            Function<PostgresqlResult, Flux<ByteBuffer>> firstColumnAsBuffer =
                result -> result.map((row, meta) -> row.get(0, ByteBuffer.class));

            // concatMap (not flatMap) keeps the writes strictly in chunk order.
            Mono<Void> write = dataChunks.concatMap(buf -> c.createStatement("SELECT lowrite($1, $2)")
                    .bind(0, fd)
                    .bind(1, buf)
                    .execute()
                    .flatMap(PostgresqlResult::getRowsUpdated))
                .then();

            Mono<Void> closeFd = c.createStatement("SELECT lo_close($1)")
                .bind(0, fd)
                .execute()
                .flatMap(PostgresqlResult::getRowsUpdated)
                .then();

            // Writes "5" at byte offset 4, i.e. right after "1234".
            Mono<Void> put = c.createStatement("SELECT lo_put($1, $2, $3)")
                .bind(0, oid)
                .bind(1, 4)
                .bind(2, US_ASCII.encode("5"))
                .execute()
                .flatMap(PostgresqlResult::getRowsUpdated)
                .then();

            // then(Mono<Void>) already yields Mono<Void>; no trailing then() is needed.
            Mono<Void> prepare = write.then(closeFd).then(put);

            Mono<ByteBuffer> getAll = c.createStatement("SELECT lo_get($1)")
                .bind(0, oid)
                .execute()
                .flatMap(firstColumnAsBuffer)
                .single();

            // From offset 3 with an effectively unbounded length: the tail of the object.
            Mono<ByteBuffer> getFrom = c.createStatement("SELECT lo_get($1, $2::bigint, $3)")
                .bind(0, oid)
                .bind(1, 3)
                .bind(2, Integer.MAX_VALUE)
                .execute()
                .flatMap(firstColumnAsBuffer)
                .single();

            // From offset 0, length 3: the head of the object.
            Mono<ByteBuffer> getFromFor = c.createStatement("SELECT lo_get($1, $2::bigint, $3)")
                .bind(0, oid)
                .bind(1, 0)
                .bind(2, 3)
                .execute()
                .flatMap(firstColumnAsBuffer)
                .single();

            return prepare.then(getAll).concatWith(getFrom).concatWith(getFromFor);
        });

    // usingWhen closes the connection on complete, error and cancel, so a failing statement
    // no longer leaves the connection (and its open transaction) behind.
    Flux<ByteBuffer> results = Flux.usingWhen(
        this.connectionFactory.create(),
        c -> c.beginTransaction()
            .thenMany(process.apply(c))
            .concatWith(c.commitTransaction().then(Mono.empty())),
        PostgresqlConnection::close
    );

    results
        .as(StepVerifier::create)
        .expectNext(US_ASCII.encode("12345"))
        .expectNext(US_ASCII.encode("45"))
        .expectNext(US_ASCII.encode("123"))
        .verifyComplete();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment