Skip to content

Instantly share code, notes, and snippets.

@hoangtrucit
Created September 21, 2018 06:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hoangtrucit/28be985af2655563b1ab758de20fd749 to your computer and use it in GitHub Desktop.
Save hoangtrucit/28be985af2655563b1ab758de20fd749 to your computer and use it in GitHub Desktop.
package com.kinesisdemo.kinesisdemo.service.kinesis.consumer;
import com.kinesisdemo.kinesisdemo.service.kinesis.config.KinesisClient;
import com.kinesisdemo.kinesisdemo.service.kinesis.config.KinesisConfig;
import com.kinesisdemo.kinesisdemo.service.mysql.UserRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
 * Startup runner that prepares one {@link QueryEvent} per configured Kinesis shard.
 *
 * <p>On application start it creates a scheduled thread pool and, for each of the two
 * shard ids exposed by {@link KinesisConfig}, builds a {@link QueryEvent} and resolves
 * its initial shard iterator. On shutdown the pool is stopped and awaited.
 *
 * <p>NOTE(review): the calls that would actually schedule the pollers are commented
 * out in {@link #run(String...)}, so as written no polling takes place — confirm
 * whether that is intentional.
 */
@Component
public class Consumer implements CommandLineRunner {

    @Autowired
    private KinesisConfig kinesisConfig;

    @Autowired
    private KinesisClient kinesisClient;

    @Autowired
    private UserRepository userRepository;

    /** Pool intended to run the shard pollers; stays {@code null} until {@link #run} executes. */
    private ScheduledExecutorService executor;

    /**
     * Creates the executor and one fully initialised {@link QueryEvent} per shard.
     *
     * <p>Any failure is printed and swallowed so that a Kinesis problem does not
     * abort application startup.
     *
     * @param strings command-line arguments (unused)
     */
    @Override
    public void run(String... strings) {
        try {
            executor = Executors.newScheduledThreadPool(4);
            QueryEvent queryEvent1 = newQueryEvent(kinesisConfig.getShardOne());
            QueryEvent queryEvent2 = newQueryEvent(kinesisConfig.getShardTwo());
            // NOTE(review): scheduling is disabled here; the events above are built but never run.
            // executor.scheduleAtFixedRate(queryEvent1, 1, 1, TimeUnit.SECONDS);
            // executor.scheduleAtFixedRate(queryEvent2, 1, 1, TimeUnit.SECONDS);
        } catch (Throwable e) {
            e.printStackTrace();
            System.out.println(e.getMessage());
        }
    }

    /**
     * Stops the executor and blocks until all of its tasks have terminated.
     *
     * <p>Does nothing when the executor was never created (for example when the
     * context is closed before {@link #run} was invoked).
     *
     * @throws Exception if the waiting thread is interrupted
     */
    @PreDestroy
    public void onDestroy() throws Exception {
        if (executor == null) {
            return;
        }
        // shutdownNow() both rejects new tasks and interrupts running ones,
        // so a preceding shutdown() call adds nothing.
        executor.shutdownNow();
        while (!executor.awaitTermination(100, TimeUnit.MILLISECONDS)) {
            // Re-issue the interrupt in case a task swallowed the previous one.
            executor.shutdownNow();
        }
        System.out.println("Success stop consumer");
    }

    /** Builds a {@link QueryEvent} for {@code shardId} and fetches its initial shard iterator. */
    private QueryEvent newQueryEvent(String shardId) {
        return new QueryEvent()
                .setShardId(shardId)
                .setKinesisClient(kinesisClient)
                .setKinesisConfig(kinesisConfig)
                .setUserRepository(userRepository)
                .getPointerFromKinesis();
    }
}
package com.kinesisdemo.kinesisdemo.service.kinesis.consumer;
import com.amazonaws.services.kinesis.model.*;
import com.kinesisdemo.kinesisdemo.service.kinesis.config.KinesisClient;
import com.kinesisdemo.kinesisdemo.service.kinesis.config.KinesisConfig;
import com.kinesisdemo.kinesisdemo.service.mysql.ProccessData;
import com.kinesisdemo.kinesisdemo.service.mysql.UserRepository;
import java.util.Date;
import java.util.List;
/**
 * Runnable that reads one batch of records from a single Kinesis shard per invocation.
 *
 * <p>Configure it through the fluent setters, call {@link #getPointerFromKinesis()} once
 * to obtain the initial shard iterator, then invoke {@link #run()} repeatedly (for example
 * from a scheduled executor). Each run issues one GetRecords call, prints a summary line,
 * advances the stored iterator, and hands any non-empty batch to a new
 * {@link ProccessData} thread.
 *
 * <p>NOTE(review): an exception thrown by the Kinesis client propagates out of
 * {@link #run()}; when scheduled with {@code scheduleAtFixedRate} that suppresses all
 * further executions — confirm this is the desired failure mode.
 */
public class QueryEvent implements Runnable {

    /** Maximum number of records requested per GetRecords call. */
    private static final int RECORDS_LIMIT = 5000;

    /** Best-effort guard that skips a run while a previous fetch is still in flight. */
    private volatile boolean lock = false;

    /** Iterator for the next GetRecords call; empty or {@code null} means nothing to read. */
    private String shardIterator = "";

    private String shardId;
    private KinesisConfig kinesisConfig;
    private KinesisClient kinesisClient;
    private ShardIteratorType shardIteratorType = ShardIteratorType.LATEST;
    private Long millisBehindLatest;

    /** Start timestamp; only sent to Kinesis when the iterator type is {@code AT_TIMESTAMP}. */
    private Date date;

    private UserRepository userRepository;

    public UserRepository getUserRepository() {
        return userRepository;
    }

    public QueryEvent setUserRepository(UserRepository userRepository) {
        this.userRepository = userRepository;
        return this;
    }

    public QueryEvent setDate(Date date) {
        this.date = date;
        return this;
    }

    public QueryEvent setShardIteratorType(ShardIteratorType shardIteratorType) {
        this.shardIteratorType = shardIteratorType;
        return this;
    }

    public QueryEvent setShardId(String shardId) {
        this.shardId = shardId;
        return this;
    }

    public QueryEvent setKinesisConfig(KinesisConfig kinesisConfig) {
        this.kinesisConfig = kinesisConfig;
        return this;
    }

    public QueryEvent setKinesisClient(KinesisClient kinesisClient) {
        this.kinesisClient = kinesisClient;
        return this;
    }

    /**
     * Requests a shard iterator from Kinesis for the configured stream, shard and
     * iterator type, and stores it for subsequent {@link #run()} calls.
     *
     * @return this instance, for chaining
     */
    public QueryEvent getPointerFromKinesis() {
        GetShardIteratorResult iteratorResult = kinesisClient
                .getAmazonKinesis()
                .getShardIterator(buildQuery());
        this.shardIterator = iteratorResult.getShardIterator();
        return this;
    }

    /** Builds the GetShardIterator request from the current configuration. */
    private GetShardIteratorRequest buildQuery() {
        GetShardIteratorRequest request = new GetShardIteratorRequest();
        request.setStreamName(kinesisConfig.getStreamName());
        request.setShardId(shardId);
        request.setShardIteratorType(shardIteratorType);
        if (shardIteratorType == ShardIteratorType.AT_TIMESTAMP) {
            // The timestamp is only meaningful for AT_TIMESTAMP iterators.
            request.setTimestamp(date);
        }
        return request;
    }

    /**
     * Fetches one batch of records, logs a summary, advances the iterator and
     * dispatches a non-empty batch to a new {@link ProccessData} thread.
     */
    private void getData() {
        lock = true;
        try {
            GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
            getRecordsRequest.setShardIterator(shardIterator);
            getRecordsRequest.setLimit(RECORDS_LIMIT);

            GetRecordsResult result = kinesisClient
                    .getAmazonKinesis()
                    .getRecords(getRecordsRequest);
            List<Record> recordList = result.getRecords();
            // May become null (per the GetRecords API, when the shard has been closed);
            // run() treats a null iterator as "nothing left to read".
            shardIterator = result.getNextShardIterator();
            millisBehindLatest = result.getMillisBehindLatest();

            // %s formats the values via String.valueOf, so a null value prints "null"
            // instead of throwing.
            System.out.println(String.format(
                    "[shard]: %s [query type]: %s [total records]: %s [latest]: %s",
                    shardId,
                    shardIteratorType,
                    formatTotalRecords(recordList.size()),
                    millisBehindLatest
            ));

            if (!recordList.isEmpty()) {
                new Thread(new ProccessData(recordList, userRepository)).start();
            }
        } finally {
            // Always release the guard, otherwise one failed call would make
            // every later run() return immediately.
            lock = false;
        }
    }

    /** Renders the record count, wrapped in ANSI bold-red escape codes when positive. */
    private String formatTotalRecords(int x) {
        return String.format(x > 0 ? "\033[31;1m%d\033[0m" : "%d", x);
    }

    /** Intentionally empty; retained for compatibility with existing callers. */
    public void proccessData() {
    }

    /**
     * Performs one fetch unless a fetch is already in progress or there is no
     * shard iterator to read from.
     */
    @Override
    public void run() {
        if (lock || shardIterator == null || shardIterator.isEmpty()) {
            return;
        }
        getData();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment