Skip to content

Instantly share code, notes, and snippets.

@kellydavid
Created May 2, 2018 10:51
Show Gist options
  • Save kellydavid/a5a4ba238f92248a8de86b804ed33304 to your computer and use it in GitHub Desktop.
Save kellydavid/a5a4ba238f92248a8de86b804ed33304 to your computer and use it in GitHub Desktop.
A quick test run for getting started with the Alpakka Kinesis Connector
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.alpakka.kinesis.ShardSettings
import akka.stream.alpakka.kinesis.scaladsl.KinesisSource
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.{ActorMaterializer, Materializer}
import com.amazonaws.regions.{Region, Regions}
import com.amazonaws.services.kinesis.model.{Record, ShardIteratorType}
import com.amazonaws.services.kinesis.{AmazonKinesisAsyncClientBuilder, AmazonKinesisClient}
import org.scalatest.FunSuite
import scala.concurrent.Await
import scala.concurrent.duration._
class AlpakkaKinesisTest extends FunSuite {
// kinesis client
val client: AmazonKinesisClient = {
val client = new AmazonKinesisClient()
client.setRegion(Region.getRegion(Regions.EU_WEST_1))
client
}
val testStream: String = "testStreamA"
def convertRecordsToStrings(records: List[Record]): List[String] = {
records.map(r => new String(r.getData.array()))
}
test("A getting started test case"){
implicit val system: ActorSystem = ActorSystem()
implicit val materializer: Materializer = ActorMaterializer()
implicit val amazonKinesisAsync: com.amazonaws.services.kinesis.AmazonKinesisAsync =
AmazonKinesisAsyncClientBuilder.standard().withRegion("eu-west-1").build()
val describeStream = client.describeStream(testStream)
val settings = ShardSettings(
streamName = testStream,
shardId = describeStream.getStreamDescription.getShards.get(0).getShardId,
shardIteratorType = ShardIteratorType.TRIM_HORIZON,
refreshInterval = 300.millis,
limit = 500
)
val source: Source[com.amazonaws.services.kinesis.model.Record, NotUsed] =
KinesisSource.basic(settings, amazonKinesisAsync)
val pipeline = source
.map(middle => {println(new String(middle.getData.array())); middle})
.toMat(Sink.collection)(Keep.right)
val fut = pipeline.run()
val finalResult = Await.result(fut, 10.seconds).toList
val finalResultConverted = convertRecordsToStrings(finalResult)
println(finalResultConverted)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment