Created
June 29, 2021 00:32
-
-
Save marantz/fbc1caea5a449e44b70083a597732bb2 to your computer and use it in GitHub Desktop.
MongoDB Change Streams Java Spring Boot Application Sample
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Default profile selection; override with --spring.profiles.active=<name>.
spring:
  profiles:
    active: develop
---
# Settings that apply only while the "develop" profile is active.
# Spring Boot 2.4+ uses spring.config.activate.on-profile for this.
spring:
  config:
    activate:
      on-profile: develop
  data:
    mongodb:
      # Replica-set example. Change streams are only available on replica sets
      # and sharded clusters, so a standalone mongod will reject watch().
      #uri: "mongodb://admin:admin@localhost:27017,localhost:27018,localhost:27019/test?authSource=admin&replicaSet=replset"
      uri: "mongodb://localhost:27017/test"
      database: test
---
# Logging configuration shared by every profile.
logging:
  level:
    root: error
    sun.rmi: error
    org.springframework: info
    org.project: debug
    # Quoted so the YAML parser keeps the literal level name instead of a boolean.
    com.zaxxer.hikari: "off"
  pattern:
    console: "%d{HH:mm:ss.SSS} - %msg%n"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.project; | |
import com.mongodb.ConnectionString; | |
import com.mongodb.MongoClientSettings; | |
import com.mongodb.client.*; | |
import com.mongodb.client.model.Filters; | |
import com.mongodb.client.model.UpdateOptions; | |
import com.mongodb.client.model.changestream.ChangeStreamDocument; | |
import com.mongodb.client.model.changestream.FullDocument; | |
import com.mongodb.client.result.UpdateResult; | |
import org.bson.BsonDocument; | |
import org.bson.BsonTimestamp; | |
import org.bson.Document; | |
import org.bson.codecs.configuration.CodecRegistry; | |
import org.bson.codecs.pojo.PojoCodecProvider; | |
import org.bson.conversions.Bson; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.boot.CommandLineRunner; | |
import org.springframework.boot.SpringApplication; | |
import org.springframework.boot.autoconfigure.EnableAutoConfiguration; | |
import org.springframework.boot.autoconfigure.SpringBootApplication; | |
import org.springframework.boot.autoconfigure.data.mongo.MongoDataAutoConfiguration; | |
import org.springframework.boot.autoconfigure.data.mongo.MongoRepositoriesAutoConfiguration; | |
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; | |
import org.springframework.boot.autoconfigure.jdbc.DataSourceTransactionManagerAutoConfiguration; | |
import org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration; | |
import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration; | |
import org.springframework.context.ApplicationContext; | |
import org.springframework.context.annotation.Bean; | |
import org.springframework.core.env.Environment; | |
import java.util.Arrays; | |
import java.util.List; | |
import static com.mongodb.client.model.Aggregates.match; | |
import static com.mongodb.client.model.Filters.and; | |
import static com.mongodb.client.model.Filters.or; | |
import static com.sun.tools.doclint.Entity.or; | |
import static java.util.Arrays.asList; | |
import static java.util.Collections.singletonList; | |
import static org.bson.codecs.configuration.CodecRegistries.fromProviders; | |
import static org.bson.codecs.configuration.CodecRegistries.fromRegistries; | |
@SpringBootApplication | |
@EnableAutoConfiguration(exclude = { | |
MongoAutoConfiguration.class, | |
MongoDataAutoConfiguration.class, | |
MongoRepositoriesAutoConfiguration.class, | |
DataSourceAutoConfiguration.class, | |
DataSourceTransactionManagerAutoConfiguration.class, | |
HibernateJpaAutoConfiguration.class | |
}) | |
public class MaintenanceSyncApplication { | |
private static final Logger logger = LoggerFactory.getLogger(MaintenanceSyncApplication.class); | |
@Autowired | |
private Environment environment; | |
private static MongoClient mongoClient; | |
private static String PROFILE; // devel, prod | |
private static String WATCH_MODE; // ARGS 1 - 't' : Token, 's' : TimeStamp | |
private static BsonDocument getToken(String nameSpace) { | |
MongoDatabase mongoDatabase = mongoClient.getDatabase("resume"); | |
MongoCollection<Document> collection = mongoDatabase.getCollection("token"); | |
BsonDocument resumeToken = null; | |
Document resumeTokenDoc = null; | |
try { | |
resumeTokenDoc = | |
collection.find(new Document("_id",nameSpace)) | |
.first() | |
.get("resumeToken",Document.class); | |
if( resumeTokenDoc != null ) { | |
//logger.debug("resumeTokenDoc - {}",resumeTokenDoc.toString()); | |
resumeToken = BsonDocument.parse(resumeTokenDoc.toJson()); | |
//logger.debug("resumeToken - {}",resumeToken.toString()); | |
} | |
} catch (Exception e) { | |
logger.error("getToken Error - {}",e.toString()); | |
logger.info("Change Stream Start without Token!!"); | |
} | |
return resumeToken; | |
} | |
private static void watchCollectionTokens(){ | |
MongoCursor<ChangeStreamDocument<Document>> cursor = null; | |
BsonDocument resumeToken = getToken("resume.token"); | |
if( resumeToken == null ) { | |
cursor = mongoClient.getDatabase("source") | |
.getCollection("coll") | |
.watch() | |
.fullDocument(FullDocument.UPDATE_LOOKUP) | |
.iterator(); | |
} else { | |
cursor = mongoClient.getDatabase("source") | |
.getCollection("coll") | |
.watch() | |
.startAfter(resumeToken) | |
.fullDocument(FullDocument.UPDATE_LOOKUP) | |
.iterator(); | |
} | |
while (cursor.hasNext()) { | |
ChangeStreamDocument<Document> doc = cursor.next(); | |
logger.debug("ChangeStreamDocument : {}", doc.toString()); | |
changeStreamDataHandler( doc ); | |
} | |
} | |
private static void saveToken(String nameSpace, BsonDocument token) { | |
MongoDatabase mongoDatabase = mongoClient.getDatabase("resume"); | |
MongoCollection<Document> collection = mongoDatabase.getCollection("token"); | |
UpdateOptions updateOptions = new UpdateOptions().upsert(true); | |
UpdateResult updateResult = collection.updateOne( | |
new Document("_id",nameSpace), | |
new Document("$set",new Document("resumeToken",token)), | |
updateOptions | |
); | |
} | |
private static void changeStreamDataHandler( ChangeStreamDocument<Document> streamData) { | |
MongoDatabase mongoDatabase = mongoClient.getDatabase("target"); | |
MongoCollection<Document> collection = mongoDatabase.getCollection("coll"); | |
try { | |
//-------------------------------------------------------------------------------------------------------------- | |
switch (streamData.getOperationType()) { | |
case INSERT: | |
case UPDATE: | |
case DELETE: | |
case REPLACE: | |
collection.insertOne(streamData.getFullDocument()); | |
break; | |
} | |
} catch (Exception e) { | |
logger.error(" changeStreamDataHandler Error - {}", e.toString()); | |
} finally { | |
saveToken(streamData.getNamespace().toString(),streamData.getResumeToken()); | |
} | |
} | |
private static BsonTimestamp getTime(String nameSpace) { | |
MongoDatabase mongoDatabase = mongoClient.getDatabase("test"); | |
MongoCollection<Document> collection = mongoDatabase.getCollection("time"); | |
BsonTimestamp clusterTime = null; | |
try { | |
clusterTime = collection.find(new Document("_id",nameSpace)) | |
.first() | |
.get("clusterTime",BsonTimestamp.class) | |
; | |
} catch (Exception e) { | |
logger.info("Change Stream Start with New Time Stamp!!"); | |
} | |
return clusterTime; | |
} | |
private static void saveTime(String nameSpace, BsonTimestamp clusterTime) { | |
MongoDatabase mongoDatabase = mongoClient.getDatabase("test"); | |
MongoCollection<Document> collection = mongoDatabase.getCollection("time"); | |
UpdateOptions updateOptions = new UpdateOptions().upsert(true); | |
UpdateResult updateResult = collection.updateOne( | |
new Document("_id",nameSpace), | |
new Document("$set", new Document("clusterTime",clusterTime)), | |
updateOptions | |
); | |
} | |
private static void watchCollectionTimes() { | |
MongoCursor<ChangeStreamDocument<Document>> cursor = null; | |
BsonTimestamp clusterTime = getTime("test.meta"); | |
if( clusterTime == null ) { | |
cursor = mongoClient.getDatabase("test") | |
.getCollection("meta") | |
.watch() | |
.fullDocument(FullDocument.UPDATE_LOOKUP) | |
.iterator(); | |
} else { | |
cursor = mongoClient.getDatabase("test") | |
.getCollection("meta") | |
.watch() | |
// startAtOperationTime from Get Time Stamp | |
.startAtOperationTime(clusterTime) | |
.fullDocument(FullDocument.UPDATE_LOOKUP) | |
.iterator(); | |
} | |
while (cursor.hasNext()) { | |
ChangeStreamDocument<Document> doc = cursor.next(); | |
logger.debug("ChangeStreamDocument : {}", doc.toString()); | |
changeStreamDataHandler( doc ); | |
} | |
} | |
private static void watchCollection() { | |
/* | |
// Watch Pipelines | |
List<Bson> pipeline = singletonList( | |
match( | |
or( | |
Filters.in("operationType", asList("insert", "delete")) | |
,and( | |
Filters.eq("operationType","update"), | |
// updateDescription.updatedFields 에 필드 지정 | |
Filters.exists( | |
"updateDescription.updatedFields.a", | |
true) | |
) | |
) | |
) | |
); | |
*/ | |
MongoCursor<ChangeStreamDocument<Document>> cursor = | |
mongoClient.getDatabase("test") | |
.getCollection("meta") | |
.watch() | |
.fullDocument(FullDocument.UPDATE_LOOKUP) | |
.iterator(); | |
while (cursor.hasNext()) { | |
ChangeStreamDocument<Document> doc = cursor.next(); | |
logger.debug("ChangeStreamDocument : {}", doc); | |
changeStreamDataHandler( doc ); | |
} | |
} | |
public static void batchWorker() { | |
// Change Stream Run Time Mode | |
if ("t".equals(WATCH_MODE)) { // Token Mode | |
watchCollectionTokens(); | |
} | |
else if ("s".equals(WATCH_MODE)) { // Time Stamp | |
watchCollectionTimes(); | |
} else { | |
watchCollection(); | |
} | |
} | |
private boolean batchInit(String[] args) { | |
try { | |
PROFILE = environment.getProperty("spring.profiles.active"); | |
logger.info("Active Profiles - {}", PROFILE ); | |
if( args.length > 0) { | |
WATCH_MODE = args[0]; | |
} else { | |
WATCH_MODE = "n"; | |
} | |
logger.info("WATCH_MODE - {}", WATCH_MODE ); | |
//logger.info("spring.data.mongodb.uri - {}", environment.getProperty("spring.data.mongodb.uri") ); | |
// MongoDB Connection | |
ConnectionString connectionString; | |
connectionString = new ConnectionString( | |
environment.getProperty("spring.data.mongodb.uri") | |
); | |
// POJO Object를 등록 | |
CodecRegistry pojoCodecRegistry = fromProviders( | |
PojoCodecProvider. | |
builder(). | |
//register(""). | |
automatic(true). | |
build() | |
); | |
CodecRegistry codecRegistry = fromRegistries( | |
MongoClientSettings.getDefaultCodecRegistry(), | |
pojoCodecRegistry | |
); | |
// MongoDB Connection String 과 POJO Object 등의 옵션을 MongoClientSettings 에 적용 | |
MongoClientSettings mongoClientSettings = | |
MongoClientSettings.builder() | |
.applyConnectionString(connectionString) | |
.codecRegistry(codecRegistry) | |
.build(); | |
// 전역 객체에 MongoDB Client 접속 | |
mongoClient = MongoClients.create(mongoClientSettings); | |
} catch (Exception e) { | |
logger.error("{}", e.toString()); | |
return false; | |
} | |
return true; | |
} | |
@Bean | |
public CommandLineRunner commandLineRunner(ApplicationContext ctx) { | |
return args -> { | |
logger.info("--------------------------------------------------------------------------------"); | |
logger.info("Start MaintenanceSyncApplication - {}", Arrays.toString(args)); | |
logger.info("--------------------------------------------------------------------------------"); | |
// -------------------------------------------------------------------------------- | |
// batch Work | |
batchInit(args); | |
batchWorker(); | |
// batch Work End | |
// -------------------------------------------------------------------------------- | |
logger.info("--------------------------------------------------------------------------------"); | |
logger.info("MaintenanceSyncApplication Finished"); | |
logger.info("--------------------------------------------------------------------------------"); | |
}; | |
} | |
public static void main(String[] args) { | |
SpringApplication.run(MaintenanceSyncApplication.class, args); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the MaintenanceSync change-stream sample (Spring Boot 2.4, Java 8). -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.0</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>net.sec</groupId>
    <artifactId>MaintenanceSync</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>MaintenanceSync</name>
    <description>Maintenance Sync Batch Application</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-mongodb</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                        <configuration>
                            <classifier>spring-boot</classifier>
                            <!-- Must name the class that declares main(); the application class
                                 of this project is org.project.MaintenanceSyncApplication. -->
                            <mainClass>org.project.MaintenanceSyncApplication</mainClass>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-resources-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <id>copy-resources</id>
                        <phase>process-classes</phase>
                        <goals>
                            <goal>copy-resources</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>${basedir}/target/classes</outputDirectory>
                            <resources>
                                <resource>
                                    <!-- Standard Maven layout keeps resources under src/main/resources. -->
                                    <directory>${basedir}/src/main/resources/</directory>
                                    <includes>
                                        <include>**/*.*</include>
                                    </includes>
                                </resource>
                            </resources>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <id>copy-dependencies</id>
                        <phase>package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <!--<outputDirectory>../../${lib.dir}</outputDirectory><overWriteIfNewer>true</overWriteIfNewer>-->
                    <outputDirectory>${project.build.directory}/lib</outputDirectory>
                    <overWriteReleases>false</overWriteReleases>
                    <overWriteSnapshots>false</overWriteSnapshots>
                    <overWriteIfNewer>true</overWriteIfNewer>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment