Instantly share code, notes, and snippets.

Embed
What would you like to do?
/*
* Copyright 2018 Shinya Mochida
*
* Licensed under the Apache License,Version2.0(the"License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,software
* Distributed under the License is distributed on an"AS IS"BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.example;
import com.mongodb.MongoClientSettings;
import com.mongodb.async.client.MongoClient;
import com.mongodb.async.client.MongoClients;
import com.mongodb.async.client.MongoCollection;
import com.mongodb.async.client.MongoDatabase;
import com.mongodb.event.CommandFailedEvent;
import com.mongodb.event.CommandListener;
import com.mongodb.event.CommandStartedEvent;
import com.mongodb.event.CommandSucceededEvent;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import java.time.LocalDateTime;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class App {
private static final Logger logger = LoggerFactory.getLogger(App.class);
public static void main(String[] args) throws InterruptedException {
final MongoClientSettings settings = MongoClientSettings.builder()
.addCommandListener(new CommandListener() {
@Override
public void commandStarted(CommandStartedEvent event) {
logger.info("command: db = {}, command = {}", event.getDatabaseName(), event.getCommandName());
}
@Override
public void commandSucceeded(CommandSucceededEvent event) {
logger.info("command succeed: request = {}, command = {}", event.getRequestId(), event.getCommandName());
}
@Override
public void commandFailed(CommandFailedEvent event) {
logger.info("command failed: request = {}, command = {}", event.getRequestId(), event.getCommandName());
logger.error("detail", event.getThrowable());
}
})
.applicationName("sample-app")
.applyToConnectionPoolSettings(builder -> builder.maxSize(1).minSize(1))
.build();
final MongoClient client = MongoClients.create(settings);
final MongoDatabase database = client.getDatabase("sample");
final MongoCollection<Document> collection = database.getCollection("test");
final Document firstDocument = new Document("id", UUID.randomUUID())
.append("name", "test user")
.append("created", LocalDateTime.now());
logger.info("document to be saved: {}", firstDocument);
final Mono<Document> firstMono = Mono.create(sink ->
collection.insertOne(firstDocument, (result, t) -> {
if (t == null) {
logger.info("inserted: {}", firstDocument);
sink.success(firstDocument);
} else {
logger.error("error", t);
sink.error(t);
}
}));
final Mono<List<Document>> secondMono = create100Users(collection, firstMono);
final Mono<Long> thirdMono = secondMono.then(Mono.create(sink ->
collection.countDocuments((count, t) -> {
if (t == null) {
logger.info("collection has {} items.", count);
sink.success(count);
} else {
logger.error("error", t);
sink.error(t);
}
})));
final Mono<List<Document>> fourthMono = create100Users(collection, thirdMono);
final Mono<List<Document>> fifthMono = create100Users(collection, fourthMono);
final Mono<List<Document>> sixthMono = create100Users(collection, fifthMono);
final Mono<Document> seventhMono = sixthMono.then(Mono.create(sink ->
collection.find().first((doc, t) -> {
if (t == null) {
logger.info("found document: {}", doc);
sink.success(doc);
} else {
logger.error("error", t);
sink.error(t);
}
})));
final CountDownLatch latch = new CountDownLatch(1);
seventhMono.doOnTerminate(() -> {
latch.countDown();
client.close();
})
.subscribe(doc -> logger.info("first document: {}", doc));
latch.await();
}
private static Mono<List<Document>> create100Users(MongoCollection<Document> collection, Mono<?> mono) {
return mono.then(Mono.create(sink -> {
final List<Document> documents = IntStream.range(1, 100)
.mapToObj(index -> String.format("user-%d", index))
.map(name -> new Document("id", UUID.randomUUID()).append("name", name).append("created", LocalDateTime.now()))
.collect(Collectors.toList());
final int size = documents.size();
logger.info("documents to be saved: {}..{}", documents.get(0), documents.get(size - 1));
collection.insertMany(documents, (v, t) -> {
if (t == null) {
logger.info("inserted: {}..{}", documents.get(0), documents.get(size - 1));
sink.success(documents);
} else {
logger.error("error", t);
sink.error(t);
}
});
}));
}
}
[main] INFO org.mongodb.driver.cluster - Cluster created with settings {hosts=[127.0.0.1:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500}
[main] INFO com.example.App - document to be saved: Document{{id=f7b4250e-91b1-47a2-80f7-5578fbf08a9e, name=test user, created=2018-07-04T01:24:27.990639}}
[MaintenanceTimer-1-thread-1] INFO org.mongodb.driver.connection - Opened connection [connectionId{localValue:1, serverValue:9}] to 127.0.0.1:27017
[cluster-ClusterId{value='5b3ba33ba2cb5776826f4ec2', description='null'}-127.0.0.1:27017] INFO org.mongodb.driver.connection - Opened connection [connectionId{localValue:2, serverValue:10}] to 127.0.0.1:27017
[cluster-ClusterId{value='5b3ba33ba2cb5776826f4ec2', description='null'}-127.0.0.1:27017] INFO org.mongodb.driver.cluster - Monitor thread successfully connected to server with description ServerDescription{address=127.0.0.1:27017, type=STANDALONE, state=CONNECTED, ok=true, version=ServerVersion{versionList=[4, 0, 0]}, minWireVersion=0, maxWireVersion=7, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=17914260}
[main] INFO com.example.App - command: db = sample, command = insert
[Thread-6] INFO com.example.App - command succeed: request = 8, command = insert
[Thread-6] INFO com.example.App - inserted: Document{{id=f7b4250e-91b1-47a2-80f7-5578fbf08a9e, name=test user, created=2018-07-04T01:24:27.990639, _id=5b3ba33ca2cb5776826f4ec3}}
[Thread-6] INFO com.example.App - documents to be saved: Document{{id=7d7eecff-7815-422b-8fac-22dc39b5fd5f, name=user-1, created=2018-07-04T01:24:28.156270}}..Document{{id=70855325-b54f-493a-83cc-05e9bfa4c38b, name=user-99, created=2018-07-04T01:24:28.165205}}
[Thread-6] INFO com.example.App - command: db = sample, command = insert
[Thread-3] INFO com.example.App - command succeed: request = 9, command = insert
[Thread-3] INFO com.example.App - inserted: Document{{id=7d7eecff-7815-422b-8fac-22dc39b5fd5f, name=user-1, created=2018-07-04T01:24:28.156270, _id=5b3ba33ca2cb5776826f4ec4}}..Document{{id=70855325-b54f-493a-83cc-05e9bfa4c38b, name=user-99, created=2018-07-04T01:24:28.165205, _id=5b3ba33ca2cb5776826f4f26}}
[Thread-3] INFO com.example.App - command: db = sample, command = aggregate
[Thread-2] INFO com.example.App - command succeed: request = 10, command = aggregate
[Thread-2] INFO com.example.App - collection has 698 items.
[Thread-2] INFO com.example.App - documents to be saved: Document{{id=aa71121b-786e-4f20-a809-818ae917a049, name=user-1, created=2018-07-04T01:24:28.204534}}..Document{{id=99b6743b-01bf-4d38-886e-ba6304e24137, name=user-99, created=2018-07-04T01:24:28.213854}}
[Thread-2] INFO com.example.App - command: db = sample, command = insert
[Thread-6] INFO com.example.App - command succeed: request = 11, command = insert
[Thread-6] INFO com.example.App - inserted: Document{{id=aa71121b-786e-4f20-a809-818ae917a049, name=user-1, created=2018-07-04T01:24:28.204534, _id=5b3ba33ca2cb5776826f4f27}}..Document{{id=99b6743b-01bf-4d38-886e-ba6304e24137, name=user-99, created=2018-07-04T01:24:28.213854, _id=5b3ba33ca2cb5776826f4f89}}
[Thread-6] INFO com.example.App - documents to be saved: Document{{id=8981ffb8-c490-45ae-b3f2-0218e9185ba8, name=user-1, created=2018-07-04T01:24:28.223607}}..Document{{id=87e30b04-7537-4e39-8584-b7ab4840953f, name=user-99, created=2018-07-04T01:24:28.230634}}
[Thread-6] INFO com.example.App - command: db = sample, command = insert
[Thread-1] INFO com.example.App - command succeed: request = 12, command = insert
[Thread-1] INFO com.example.App - inserted: Document{{id=8981ffb8-c490-45ae-b3f2-0218e9185ba8, name=user-1, created=2018-07-04T01:24:28.223607, _id=5b3ba33ca2cb5776826f4f8a}}..Document{{id=87e30b04-7537-4e39-8584-b7ab4840953f, name=user-99, created=2018-07-04T01:24:28.230634, _id=5b3ba33ca2cb5776826f4fec}}
[Thread-1] INFO com.example.App - documents to be saved: Document{{id=79584dd7-fca3-4ad7-829c-ab1c8ba36f03, name=user-1, created=2018-07-04T01:24:28.241548}}..Document{{id=221166eb-f74c-407d-960d-35a93b4efbbd, name=user-99, created=2018-07-04T01:24:28.248435}}
[Thread-1] INFO com.example.App - command: db = sample, command = insert
[Thread-4] INFO com.example.App - command succeed: request = 13, command = insert
[Thread-4] INFO com.example.App - inserted: Document{{id=79584dd7-fca3-4ad7-829c-ab1c8ba36f03, name=user-1, created=2018-07-04T01:24:28.241548, _id=5b3ba33ca2cb5776826f4fed}}..Document{{id=221166eb-f74c-407d-960d-35a93b4efbbd, name=user-99, created=2018-07-04T01:24:28.248435, _id=5b3ba33ca2cb5776826f504f}}
[Thread-4] INFO com.example.App - command: db = sample, command = find
[Thread-3] INFO com.example.App - command succeed: request = 14, command = find
[Thread-3] INFO com.example.App - found document: Document{{_id=5b3b98d4a2cb57761f409353, id=7c191c30-f0f0-41aa-8660-115292826333, name=test user, created=Wed Jul 04 09:40:04 JST 2018}}
[Thread-3] INFO com.example.App - first document: Document{{_id=5b3b98d4a2cb57761f409353, id=7c191c30-f0f0-41aa-8660-115292826333, name=test user, created=Wed Jul 04 09:40:04 JST 2018}}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment