Skip to content

Instantly share code, notes, and snippets.

@hnaohiro
Created December 7, 2017 07:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hnaohiro/f2b5466b15c09076a8788d797d1bf960 to your computer and use it in GitHub Desktop.
Save hnaohiro/f2b5466b15c09076a8788d797d1bf960 to your computer and use it in GitHub Desktop.
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}
import com.amazonaws.client.builder.AwsClientBuilder
import com.amazonaws.services.kinesis.model.{GetRecordsRequest, PutRecordRequest}
import com.amazonaws.services.kinesis.{AmazonKinesis, AmazonKinesisClientBuilder}
import scala.collection.JavaConverters._
import scala.util.Random
/**
 * Minimal end-to-end Kinesis sample: makes sure a stream exists, writes one record to it and
 * prints every record that can be read back from the stream's shards.
 *
 * The client is pointed at `http://localhost:4568` with dummy static credentials.
 * NOTE(review): presumably a local Kinesis emulator listens on that port -- confirm.
 *
 * The helper methods keep their tuning constants method-local on purpose: with `App`, vals in
 * the object body are only initialised once `main` runs, so object-level constants would be
 * unset for anyone calling the helpers directly.
 */
object Sample extends App {
  import scala.annotation.tailrec

  /**
   * Makes sure a stream called `name` exists and blocks until it reports the `ACTIVE` status.
   *
   * The stream is created with `shardCount` shards only when `listStreams()` does not already
   * report it. The wait also happens for a pre-existing stream, because it may still be in the
   * middle of being created.
   *
   * NOTE(review): only the first page returned by `listStreams()` is inspected -- confirm that
   * is sufficient for the target account.
   *
   * @param client     Kinesis client used for all calls
   * @param name       name of the stream
   * @param shardCount number of shards requested when the stream has to be created
   * @throws java.util.concurrent.TimeoutException if the stream does not become active in time
   */
  def createStream(client: AmazonKinesis, name: String, shardCount: Int): Unit = {
    val alreadyExists = client.listStreams().getStreamNames.asScala.contains(name)
    if (!alreadyExists) client.createStream(name, shardCount)
    awaitActive(client, name)
  }

  /**
   * Polls `describeStream` until the stream status is `ACTIVE`, sleeping between attempts so
   * the endpoint is not hammered, and giving up once `timeoutMillis` has elapsed.
   */
  private def awaitActive(
      client: AmazonKinesis,
      name: String,
      timeoutMillis: Long = 60000L,
      pollIntervalMillis: Long = 500L
  ): Unit = {
    val deadline = System.nanoTime() + timeoutMillis * 1000000L

    @tailrec
    def poll(): Unit = {
      val status = client.describeStream(name).getStreamDescription.getStreamStatus
      if (status != "ACTIVE") {
        // Subtraction keeps the comparison correct even if nanoTime wraps around.
        if (System.nanoTime() - deadline >= 0) {
          throw new java.util.concurrent.TimeoutException(
            s"Stream '$name' is still $status after $timeoutMillis ms"
          )
        }
        Thread.sleep(pollIntervalMillis)
        poll()
      }
    }

    poll()
  }

  /**
   * Writes a single record whose payload is `data` encoded as UTF-8.
   *
   * @param client       Kinesis client used for the call
   * @param streamName   stream to write to
   * @param partitionKey partition key sent with the record
   * @param data         text payload
   */
  def putRecord(client: AmazonKinesis, streamName: String, partitionKey: String, data: String): Unit = {
    val request = new PutRecordRequest()
      .withStreamName(streamName)
      .withPartitionKey(partitionKey)
      .withData(ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8)))
    client.putRecord(request)
    ()
  }

  /**
   * Reads the records of every shard listed by `describeStream`, starting at `TRIM_HORIZON`,
   * and returns their payloads decoded as UTF-8, shard by shard.
   *
   * NOTE(review): only the shards on the first `describeStream` page are read -- confirm that
   * is sufficient for streams with many shards.
   *
   * @param client     Kinesis client used for all calls
   * @param streamName stream to read from
   * @return decoded payloads as an immutable sequence
   */
  def getRecords(client: AmazonKinesis, streamName: String): Seq[String] =
    client
      .describeStream(streamName)
      .getStreamDescription
      .getShards
      .asScala
      .map(_.getShardId)
      .toVector
      .flatMap(shardId => readShard(client, streamName, shardId))

  /**
   * Drains one shard from `TRIM_HORIZON`.
   *
   * A single `GetRecords` call may return an empty or partial batch, so the next shard iterator
   * is followed until the response reports `MillisBehindLatest == 0`, no further iterator is
   * returned, or the call budget is used up (in which case the records read so far are
   * returned). When the backend does not report `MillisBehindLatest` at all, one call is made.
   */
  private def readShard(client: AmazonKinesis, streamName: String, shardId: String): Vector[String] = {
    val maxCalls = 20
    // Short pause between consecutive reads of the same shard to stay clear of read throttling.
    val pauseMillis = 250L

    @tailrec
    def drain(iterator: String, collected: Vector[String], callsLeft: Int): Vector[String] = {
      val result = client.getRecords(new GetRecordsRequest().withShardIterator(iterator))
      val batch = result.getRecords.asScala.map(record => decodeUtf8(record.getData))
      val soFar = collected ++ batch
      val caughtUp = Option(result.getMillisBehindLatest).forall(_.longValue == 0L)
      Option(result.getNextShardIterator) match {
        case Some(next) if !caughtUp && callsLeft > 1 =>
          Thread.sleep(pauseMillis)
          drain(next, soFar, callsLeft - 1)
        case _ =>
          soFar
      }
    }

    val start = client.getShardIterator(streamName, shardId, "TRIM_HORIZON").getShardIterator
    drain(start, Vector.empty, maxCalls)
  }

  /** Decodes a UTF-8 payload through a read-only view so the record's own buffer is not consumed. */
  private def decodeUtf8(data: ByteBuffer): String =
    StandardCharsets.UTF_8.decode(data.asReadOnlyBuffer()).toString

  /**
   * Prints the sequence returned by [[getRecords]] for `streamName` to standard output.
   *
   * @param client     Kinesis client used for all calls
   * @param streamName stream to read from
   */
  def printRecords(client: AmazonKinesis, streamName: String): Unit =
    println(getRecords(client, streamName))

  val kinesis: AmazonKinesis = AmazonKinesisClientBuilder
    .standard()
    .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("dummy", "dummy")))
    // The signing region is deliberately left null, as in the original sample.
    .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:4568", null))
    .build()

  val streamName = "test"

  try {
    createStream(kinesis, streamName, 5)
    putRecord(kinesis, streamName, "key1", s"hello てすと ${Random.nextInt(100)}")
    printRecords(kinesis, streamName)
  } finally {
    // Release the client's resources even when one of the calls above fails.
    kinesis.shutdown()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment