Skip to content

Instantly share code, notes, and snippets.

@marantz
Created June 29, 2021 00:32
Show Gist options
  • Save marantz/fbc1caea5a449e44b70083a597732bb2 to your computer and use it in GitHub Desktop.
Save marantz/fbc1caea5a449e44b70083a597732bb2 to your computer and use it in GitHub Desktop.
MongoDB Change Streams Java Spring Boot Application Sample
# Default document: selects which profile is active when none is supplied externally.
spring:
  profiles:
    active: develop
---
# Settings applied only while the "develop" profile is active.
# The activation key is spring.config.activate.on-profile; a misspelled key is
# silently ignored, which would make this document apply to every profile.
spring:
  config:
    activate:
      on-profile: develop
  data:
    mongodb:
      # Change streams require a replica set or sharded cluster; the commented URI
      # below is the replica-set form of the connection string.
      #uri: "mongodb://admin:admin@localhost:27017,localhost:27018,localhost:27019/test?authSource=admin&replicaSet=replset"
      uri: "mongodb://localhost:27017/test"
      database: test
---
# Logging configuration shared by all profiles.
logging:
  level:
    root: error
    sun.rmi: error
    org.springframework: info
    org.project: debug
    com.zaxxer.hikari: off
  pattern:
    console: "%d{HH:mm:ss.SSS} - %msg%n"
package org.project;
import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.client.*;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.ReplaceOptions;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.client.result.UpdateResult;
import org.bson.BsonDocument;
import org.bson.BsonTimestamp;
import org.bson.Document;
import org.bson.codecs.configuration.CodecRegistry;
import org.bson.codecs.pojo.PojoCodecProvider;
import org.bson.conversions.Bson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.data.mongo.MongoDataAutoConfiguration;
import org.springframework.boot.autoconfigure.data.mongo.MongoRepositoriesAutoConfiguration;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.autoconfigure.jdbc.DataSourceTransactionManagerAutoConfiguration;
import org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.core.env.Environment;
import java.util.Arrays;
import java.util.List;
import static com.mongodb.client.model.Aggregates.match;
import static com.mongodb.client.model.Filters.and;
import static com.mongodb.client.model.Filters.or;
// NOTE(review): JDK-internal doclint import, looks like an IDE auto-import slip - confirm it can be dropped.
import static com.sun.tools.doclint.Entity.or;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static org.bson.codecs.configuration.CodecRegistries.fromProviders;
import static org.bson.codecs.configuration.CodecRegistries.fromRegistries;
@SpringBootApplication
@EnableAutoConfiguration(exclude = {
MongoAutoConfiguration.class,
MongoDataAutoConfiguration.class,
MongoRepositoriesAutoConfiguration.class,
DataSourceAutoConfiguration.class,
DataSourceTransactionManagerAutoConfiguration.class,
HibernateJpaAutoConfiguration.class
})
public class MaintenanceSyncApplication {
// SLF4J logger shared by all methods of this class.
private static final Logger logger = LoggerFactory.getLogger(MaintenanceSyncApplication.class);
// Spring environment; batchInit reads the active profile and the MongoDB URI from it.
@Autowired
private Environment environment;
// Shared MongoDB client; assigned in batchInit and used by the static helper methods.
private static MongoClient mongoClient;
// Value of the spring.profiles.active property, assigned in batchInit.
private static String PROFILE; // devel, prod
// First command-line argument; batchInit falls back to "n" when no argument is given.
private static String WATCH_MODE; // ARGS 1 - 't' : Token, 's' : TimeStamp
/**
 * Loads the resume token previously stored for a namespace.
 *
 * <p>The token is read from the {@code resume.token} collection, from the document whose
 * {@code _id} equals {@code nameSpace}, field {@code resumeToken}. The collection is read
 * as {@link BsonDocument} so the token keeps its exact BSON types without a JSON round trip.
 *
 * @param nameSpace key of the stored token document
 * @return the stored resume token, or {@code null} when none is stored or the lookup failed
 */
private static BsonDocument getToken(String nameSpace) {
    try {
        BsonDocument stored = mongoClient.getDatabase("resume")
                .getCollection("token", BsonDocument.class)
                .find(new Document("_id", nameSpace))
                .first();
        // first() returns null when nothing has been saved yet; that is a normal
        // first-run condition, not an error.
        if (stored != null && stored.isDocument("resumeToken")) {
            return stored.getDocument("resumeToken");
        }
    } catch (Exception e) {
        // Best effort: a failed lookup must not prevent the stream from starting.
        logger.error("getToken Error - {}", e.toString(), e);
    }
    logger.info("Change Stream Start without Token!!");
    return null;
}
/**
 * Watches {@code source.coll} in token mode and passes every change event to
 * {@code changeStreamDataHandler}.
 *
 * <p>When a resume token is stored, the stream starts after it; otherwise it starts from
 * the current point in time. The cursor is closed when the loop ends or an exception
 * escapes.
 */
private static void watchCollectionTokens(){
    // The handler stores tokens under the event namespace rendered as "db.collection",
    // so the lookup key has to be the watched namespace for a resume to ever happen.
    final String nameSpace = "source.coll";
    BsonDocument resumeToken = getToken(nameSpace);

    ChangeStreamIterable<Document> changeStream = mongoClient.getDatabase("source")
            .getCollection("coll")
            .watch()
            .fullDocument(FullDocument.UPDATE_LOOKUP);
    if (resumeToken != null) {
        changeStream = changeStream.startAfter(resumeToken);
    }

    try (MongoCursor<ChangeStreamDocument<Document>> cursor = changeStream.iterator()) {
        while (cursor.hasNext()) {
            ChangeStreamDocument<Document> doc = cursor.next();
            logger.debug("ChangeStreamDocument : {}", doc);
            changeStreamDataHandler( doc );
        }
    }
}
/**
 * Upserts the resume token for a namespace into the {@code resume.token} collection.
 *
 * @param nameSpace value used as the {@code _id} of the token document
 * @param token     resume token written to the {@code resumeToken} field
 */
private static void saveToken(String nameSpace, BsonDocument token) {
    Document byNamespace = new Document("_id",nameSpace);
    Document setToken = new Document("$set",new Document("resumeToken",token));
    // upsert: the token document is created on the first save.
    mongoClient.getDatabase("resume")
            .getCollection("token")
            .updateOne(byNamespace, setToken, new UpdateOptions().upsert(true));
}
/**
 * Applies one change event to {@code target.coll} and then records the event's resume token.
 *
 * <ul>
 *   <li>INSERT / UPDATE / REPLACE: the full document is written with a replace-with-upsert
 *       keyed by the event's document key, so replayed events do not fail on duplicates.</li>
 *   <li>DELETE: the document identified by the document key is removed.</li>
 *   <li>Any other operation type is logged and ignored.</li>
 * </ul>
 *
 * <p>Failures while applying the event are logged and not rethrown. The resume token is
 * saved even when applying failed, so a failing event is skipped rather than retried.
 *
 * @param streamData change event received from the change stream
 */
private static void changeStreamDataHandler( ChangeStreamDocument<Document> streamData) {
    MongoCollection<Document> collection = mongoClient.getDatabase("target").getCollection("coll");
    try {
        BsonDocument documentKey = streamData.getDocumentKey();
        Document fullDocument = streamData.getFullDocument();
        switch (streamData.getOperationType()) {
            case INSERT:
            case UPDATE:
            case REPLACE:
                // With UPDATE_LOOKUP the full document is absent when the source document
                // was deleted before the lookup ran; there is nothing to write then.
                if (documentKey == null || fullDocument == null) {
                    logger.warn(" changeStreamDataHandler - no document to apply for {}", streamData.getOperationType());
                    break;
                }
                collection.replaceOne(documentKey, fullDocument, new ReplaceOptions().upsert(true));
                break;
            case DELETE:
                // Delete events carry no full document, only the key of the removed document.
                if (documentKey != null) {
                    collection.deleteOne(documentKey);
                }
                break;
            default:
                logger.debug(" changeStreamDataHandler - ignored operation {}", streamData.getOperationType());
                break;
        }
    } catch (Exception e) {
        logger.error(" changeStreamDataHandler Error - {}", e.toString(), e);
    } finally {
        // Some events may carry no namespace; guard so the finally block cannot throw
        // a NullPointerException and terminate the watch loop.
        if (streamData.getNamespace() != null) {
            saveToken(streamData.getNamespace().toString(),streamData.getResumeToken());
        } else {
            logger.warn(" changeStreamDataHandler - event without namespace, resume token not saved");
        }
    }
}
/**
 * Loads the cluster time previously stored for a namespace.
 *
 * <p>The value is read from the {@code test.time} collection, from the document whose
 * {@code _id} equals {@code nameSpace}, field {@code clusterTime}.
 *
 * @param nameSpace key of the stored time document
 * @return the stored cluster time, or {@code null} when none is stored or the lookup failed
 */
private static BsonTimestamp getTime(String nameSpace) {
    try {
        Document stored = mongoClient.getDatabase("test")
                .getCollection("time")
                .find(new Document("_id",nameSpace))
                .first();
        // first() returns null when nothing has been saved yet; handle that explicitly
        // instead of relying on a NullPointerException.
        if (stored != null) {
            BsonTimestamp clusterTime = stored.get("clusterTime",BsonTimestamp.class);
            if (clusterTime != null) {
                return clusterTime;
            }
        }
    } catch (Exception e) {
        // Best effort: a failed lookup must not prevent the stream from starting.
        logger.error("getTime Error - {}", e.toString(), e);
    }
    logger.info("Change Stream Start with New Time Stamp!!");
    return null;
}
/**
 * Upserts the cluster time for a namespace into the {@code test.time} collection.
 *
 * @param nameSpace   value used as the {@code _id} of the time document
 * @param clusterTime timestamp written to the {@code clusterTime} field
 */
private static void saveTime(String nameSpace, BsonTimestamp clusterTime) {
    Document byNamespace = new Document("_id",nameSpace);
    Document setTime = new Document("$set", new Document("clusterTime",clusterTime));
    // upsert: the time document is created on the first save.
    mongoClient.getDatabase("test")
            .getCollection("time")
            .updateOne(byNamespace, setTime, new UpdateOptions().upsert(true));
}
/**
 * Watches {@code test.meta} in time-stamp mode and passes every change event to
 * {@code changeStreamDataHandler}.
 *
 * <p>When a cluster time is stored for the namespace, the stream starts at that operation
 * time; otherwise it starts from the current point in time. After each event is handled,
 * its cluster time is persisted with {@code saveTime} so the next run can pick up from
 * there. Starting at an operation time includes events at that time, so the last handled
 * event may be delivered again after a restart. The cursor is closed when the loop ends
 * or an exception escapes.
 */
private static void watchCollectionTimes() {
    // Same key for reading and writing the stored cluster time.
    final String nameSpace = "test.meta";
    BsonTimestamp clusterTime = getTime(nameSpace);

    ChangeStreamIterable<Document> changeStream = mongoClient.getDatabase("test")
            .getCollection("meta")
            .watch()
            .fullDocument(FullDocument.UPDATE_LOOKUP);
    if (clusterTime != null) {
        // Resume from the stored time stamp.
        changeStream = changeStream.startAtOperationTime(clusterTime);
    }

    try (MongoCursor<ChangeStreamDocument<Document>> cursor = changeStream.iterator()) {
        while (cursor.hasNext()) {
            ChangeStreamDocument<Document> doc = cursor.next();
            logger.debug("ChangeStreamDocument : {}", doc);
            changeStreamDataHandler( doc );
            // Without this the stored time is never written and getTime always finds nothing.
            BsonTimestamp eventTime = doc.getClusterTime();
            if (eventTime != null) {
                saveTime(nameSpace, eventTime);
            }
        }
    }
}
/**
 * Watches {@code test.meta} from the current point in time, without any resume state,
 * and passes every change event to {@code changeStreamDataHandler}.
 *
 * <p>The cursor is closed when the loop ends or an exception escapes.
 */
private static void watchCollection() {
    /*
    // Sample watch pipeline (not applied): pass inserts and deletes, and only those
    // updates that touch field "a".
    List<Bson> pipeline = singletonList(
            match(
                    or(
                            Filters.in("operationType", asList("insert", "delete"))
                            ,and(
                                    Filters.eq("operationType","update"),
                                    // name the field under updateDescription.updatedFields
                                    Filters.exists(
                                            "updateDescription.updatedFields.a",
                                            true)
                            )
                    )
            )
    );
    */
    try (MongoCursor<ChangeStreamDocument<Document>> cursor =
                 mongoClient.getDatabase("test")
                         .getCollection("meta")
                         .watch()
                         .fullDocument(FullDocument.UPDATE_LOOKUP)
                         .iterator()) {
        while (cursor.hasNext()) {
            ChangeStreamDocument<Document> doc = cursor.next();
            logger.debug("ChangeStreamDocument : {}", doc);
            changeStreamDataHandler( doc );
        }
    }
}
/**
 * Runs the change-stream loop selected by {@code WATCH_MODE}:
 * {@code "t"} uses resume tokens, {@code "s"} uses a stored time stamp,
 * and any other value watches without resume state.
 */
public static void batchWorker() {
    // Token mode
    if ("t".equals(WATCH_MODE)) {
        watchCollectionTokens();
        return;
    }
    // Time-stamp mode
    if ("s".equals(WATCH_MODE)) {
        watchCollectionTimes();
        return;
    }
    // Default: plain watch
    watchCollection();
}
/**
 * Reads the run-time settings and opens the shared MongoDB client.
 *
 * <p>Sets {@code PROFILE} from {@code spring.profiles.active}, sets {@code WATCH_MODE}
 * from the first argument (or {@code "n"} when there is none), and creates the client
 * from {@code spring.data.mongodb.uri} with the default codecs plus an automatic POJO codec.
 *
 * @param args command-line arguments passed to the runner
 * @return {@code true} when the client was created; {@code false} when any step threw
 */
private boolean batchInit(String[] args) {
    try {
        PROFILE = environment.getProperty("spring.profiles.active");
        logger.info("Active Profiles - {}", PROFILE );

        WATCH_MODE = (args.length > 0) ? args[0] : "n";
        logger.info("WATCH_MODE - {}", WATCH_MODE );

        // MongoDB connection string from the Spring environment
        String mongoUri = environment.getProperty("spring.data.mongodb.uri");
        ConnectionString connectionString = new ConnectionString(mongoUri);

        // Register automatic POJO mapping after the driver's default codecs
        PojoCodecProvider pojoProvider = PojoCodecProvider.builder()
                .automatic(true)
                .build();
        CodecRegistry codecRegistry = fromRegistries(
                MongoClientSettings.getDefaultCodecRegistry(),
                fromProviders(pojoProvider)
        );

        // Combine the connection string and the codec registry into the client settings
        MongoClientSettings.Builder settings = MongoClientSettings.builder();
        settings.applyConnectionString(connectionString);
        settings.codecRegistry(codecRegistry);

        // Publish the client through the shared static field
        mongoClient = MongoClients.create(settings.build());
        return true;
    } catch (Exception e) {
        logger.error("{}", e.toString());
        return false;
    }
}
/**
 * Builds the runner that initializes the MongoDB client and then runs the change-stream
 * worker.
 *
 * <p>The worker only starts when {@code batchInit} succeeds; otherwise the runner fails
 * with an {@link IllegalStateException} instead of continuing with a missing client.
 * The client is closed when the worker returns or throws.
 *
 * @param ctx application context (not used by the runner)
 * @return the runner executed by Spring Boot after start-up
 */
@Bean
public CommandLineRunner commandLineRunner(ApplicationContext ctx) {
    return args -> {
        logger.info("--------------------------------------------------------------------------------");
        logger.info("Start MaintenanceSyncApplication - {}", Arrays.toString(args));
        logger.info("--------------------------------------------------------------------------------");
        // --------------------------------------------------------------------------------
        // batch Work
        try {
            if (!batchInit(args)) {
                // batchInit has already logged the cause.
                throw new IllegalStateException("batchInit failed - MongoDB client was not created");
            }
            batchWorker();
        } finally {
            if (mongoClient != null) {
                mongoClient.close();
            }
        }
        // batch Work End
        // --------------------------------------------------------------------------------
        logger.info("--------------------------------------------------------------------------------");
        logger.info("MaintenanceSyncApplication Finished");
        logger.info("--------------------------------------------------------------------------------");
    };
}
/**
 * Application entry point; starts Spring Boot with this class as the primary source.
 *
 * @param args command-line arguments, forwarded to the application
 */
public static void main(String[] args) {
    SpringApplication application = new SpringApplication(MaintenanceSyncApplication.class);
    application.run(args);
}
}
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the MaintenanceSync change-stream application. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.0</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>net.sec</groupId>
    <artifactId>MaintenanceSync</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>MaintenanceSync</name>
    <description>Maintenance Sync Batch Application</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-mongodb</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- Builds the executable jar as an extra artifact with the "spring-boot" classifier. -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                        <configuration>
                            <classifier>spring-boot</classifier>
                            <!-- Must name this project's application class. -->
                            <mainClass>org.project.MaintenanceSyncApplication</mainClass>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <!-- Copies resource files into target/classes during process-classes. -->
            <plugin>
                <artifactId>maven-resources-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <id>copy-resources</id>
                        <phase>process-classes</phase>
                        <goals>
                            <goal>copy-resources</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>${basedir}/target/classes</outputDirectory>
                            <resources>
                                <resource>
                                    <!-- NOTE(review): Maven's standard layout is src/main/resources; confirm this path is intended. -->
                                    <directory>${basedir}/main/resources/</directory>
                                    <includes>
                                        <include>**/*.*</include>
                                    </includes>
                                </resource>
                            </resources>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <!-- Copies the project's dependencies into target/lib during package. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <id>copy-dependencies</id>
                        <phase>package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <!--<outputDirectory>../../${lib.dir}</outputDirectory><overWriteIfNewer>true</overWriteIfNewer>-->
                    <outputDirectory>${project.build.directory}/lib</outputDirectory>
                    <overWriteReleases>false</overWriteReleases>
                    <overWriteSnapshots>false</overWriteSnapshots>
                    <overWriteIfNewer>true</overWriteIfNewer>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment