Skip to content

Instantly share code, notes, and snippets.

@geota
Created August 13, 2015 17:50
Show Gist options
  • Save geota/ed47ecdead08ab0cab66 to your computer and use it in GitHub Desktop.
Save geota/ed47ecdead08ab0cab66 to your computer and use it in GitHub Desktop.
package com.amazonaws.services.kinesis.stormspout;
import com.amazonaws.AmazonClientException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.*;
import org.testng.annotations.Test;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import static org.testng.Assert.*;
/**
 * Manual integration check that the records read from one Kinesis shard come back in
 * non-decreasing sequence-number order.
 *
 * <p>The test talks to the live Kinesis service using the credentials profile, stream and
 * shard named in the constants below, so it only passes where those resources exist.
 *
 * <p>Created by amaceiras on 8/13/15.
 */
public class KinesisHelperTest {
    /** Name of the AWS credentials profile used to build the client. */
    private static final String PROFILE_NAME = "profile tst";
    /** Stream whose shard is scanned. */
    private static final String STREAM_NAME = "KIN_TEST_STREAM";
    /** Shard that is read from its oldest available record. */
    private static final String SHARD_ID = "shardId-000000000000";
    /** Maximum number of records asked for in a single GetRecords call. */
    private static final int RECORDS_PER_REQUEST = 10000;
    /** A progress line is printed each time this many records have been verified. */
    private static final int PROGRESS_INTERVAL = 100;
    /**
     * Number of consecutive empty responses after which the scan stops. An open shard keeps
     * handing out a non-null next iterator, so without this bound the test would never end.
     * NOTE(review): this is a heuristic for "caught up"; raise it if the stream is sparse.
     */
    private static final int MAX_CONSECUTIVE_EMPTY_POLLS = 10;

    /**
     * Reads the shard from TRIM_HORIZON and verifies that no record has a sequence number
     * smaller than the record read immediately before it.
     *
     * @throws RuntimeException if a record is found whose sequence number is lower than its
     *         predecessor's
     */
    @Test
    public void test() {
        AWSCredentials credentials = new ProfileCredentialsProvider(PROFILE_NAME).getCredentials();
        AmazonKinesisClient kinesisClient = new AmazonKinesisClient(credentials);
        System.out.println(kinesisClient.listStreams());

        GetShardIteratorRequest itrReq = new GetShardIteratorRequest()
                .withShardId(SHARD_ID)
                .withStreamName(STREAM_NAME)
                .withShardIteratorType(ShardIteratorType.TRIM_HORIZON);
        String shardIterator = kinesisClient.getShardIterator(itrReq).getShardIterator();

        // Only the previous sequence number is needed for the ordering check, so the
        // records themselves are not retained.
        String lastSequenceNumber = null;
        long processed = 0;
        int consecutiveEmptyPolls = 0;

        // Fetch first, then advance: this way the records of the final response (the one
        // whose next iterator is null) are still checked.
        while (shardIterator != null && consecutiveEmptyPolls < MAX_CONSECUTIVE_EMPTY_POLLS) {
            System.out.println("Requesting records");
            GetRecordsRequest req = new GetRecordsRequest()
                    .withLimit(RECORDS_PER_REQUEST)
                    .withShardIterator(shardIterator);
            GetRecordsResult res = kinesisClient.getRecords(req);

            List<Record> records = res.getRecords();
            consecutiveEmptyPolls = records.isEmpty() ? consecutiveEmptyPolls + 1 : 0;

            for (Record r : records) {
                String currentSequenceNumber = r.getSequenceNumber();
                if (lastSequenceNumber != null) {
                    // Sequence numbers are decimal strings; compare them numerically because
                    // String.compareTo misorders values of different length ("9" vs "10").
                    BigInteger current = new BigInteger(currentSequenceNumber);
                    BigInteger last = new BigInteger(lastSequenceNumber);
                    if (current.compareTo(last) < 0) {
                        throw new RuntimeException("OUT OF ORDER INSERT: last seq " + lastSequenceNumber + " is AFTER curent seq: " + currentSequenceNumber);
                    }
                    if (processed % PROGRESS_INTERVAL == 0) {
                        System.out.println("Processed " + processed + " with no out of order inserts");
                    }
                }
                lastSequenceNumber = currentSequenceNumber;
                processed++;
            }

            // Continue from where this response ended; reusing the initial iterator would
            // re-read the same batch on every pass.
            shardIterator = res.getNextShardIterator();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment