Created
May 2, 2018 10:51
-
-
Save kellydavid/a5a4ba238f92248a8de86b804ed33304 to your computer and use it in GitHub Desktop.
A quick test run for getting started with the Alpakka Kinesis Connector
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.NotUsed | |
import akka.actor.ActorSystem | |
import akka.stream.alpakka.kinesis.ShardSettings | |
import akka.stream.alpakka.kinesis.scaladsl.KinesisSource | |
import akka.stream.scaladsl.{Keep, Sink, Source} | |
import akka.stream.{ActorMaterializer, Materializer} | |
import com.amazonaws.regions.{Region, Regions} | |
import com.amazonaws.services.kinesis.model.{Record, ShardIteratorType} | |
import com.amazonaws.services.kinesis.{AmazonKinesisAsyncClientBuilder, AmazonKinesisClient} | |
import org.scalatest.FunSuite | |
import scala.concurrent.Await | |
import scala.concurrent.duration._ | |
class AlpakkaKinesisTest extends FunSuite { | |
// kinesis client | |
val client: AmazonKinesisClient = { | |
val client = new AmazonKinesisClient() | |
client.setRegion(Region.getRegion(Regions.EU_WEST_1)) | |
client | |
} | |
val testStream: String = "testStreamA" | |
def convertRecordsToStrings(records: List[Record]): List[String] = { | |
records.map(r => new String(r.getData.array())) | |
} | |
test("A getting started test case"){ | |
implicit val system: ActorSystem = ActorSystem() | |
implicit val materializer: Materializer = ActorMaterializer() | |
implicit val amazonKinesisAsync: com.amazonaws.services.kinesis.AmazonKinesisAsync = | |
AmazonKinesisAsyncClientBuilder.standard().withRegion("eu-west-1").build() | |
val describeStream = client.describeStream(testStream) | |
val settings = ShardSettings( | |
streamName = testStream, | |
shardId = describeStream.getStreamDescription.getShards.get(0).getShardId, | |
shardIteratorType = ShardIteratorType.TRIM_HORIZON, | |
refreshInterval = 300.millis, | |
limit = 500 | |
) | |
val source: Source[com.amazonaws.services.kinesis.model.Record, NotUsed] = | |
KinesisSource.basic(settings, amazonKinesisAsync) | |
val pipeline = source | |
.map(middle => {println(new String(middle.getData.array())); middle}) | |
.toMat(Sink.collection)(Keep.right) | |
val fut = pipeline.run() | |
val finalResult = Await.result(fut, 10.seconds).toList | |
val finalResultConverted = convertRecordsToStrings(finalResult) | |
println(finalResultConverted) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment