Skip to content

Instantly share code, notes, and snippets.

@majk-p
Last active January 10, 2024 08:47
Show Gist options
  • Save majk-p/558b231d0b7a73b1cd46ad39d8d438a5 to your computer and use it in GitHub Desktop.
Save majk-p/558b231d0b7a73b1cd46ad39d8d438a5 to your computer and use it in GitHub Desktop.
Reproduction of a bug where using an AT_TIMESTAMP Kinesis shard iterator breaks LocalStack
import cats.effect.IO
import cats.Endo
import cats.implicits.*
import io.laserdisc.pure.kinesis.tagless.KinesisAsyncClientOp
import software.amazon.awssdk.core.async.AsyncRequestBody
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest
import software.amazon.awssdk.services.kinesis.model.ListShardsRequest
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType
import scala.jdk.CollectionConverters.*
import java.time.Instant
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest
import software.amazon.awssdk.core.SdkBytes
import software.amazon.awssdk.services.kinesis.model.PutRecordResponse
object aws {

  /** Extension methods adding small publish/read helpers on top of the
    * tagless Kinesis client.
    */
  object extensions {
    extension (kinesisClient: KinesisAsyncClientOp[IO]) {

      /** Reads events from every shard of `streamName` using an
        * `AT_TIMESTAMP` shard iterator positioned at `from`.
        *
        * @param streamName name of the stream to read
        * @param from       timestamp passed to the shard iterator request
        * @return the UTF-8 decoded payloads of the records that were fetched
        */
      def readEventsAtTimestamp(
          streamName: String,
          from: Instant
      ): IO[List[String]] =
        readEvents(streamName) { builder =>
          builder
            .shardIteratorType(ShardIteratorType.AT_TIMESTAMP)
            .timestamp(from)
        }

      /** Reads events from every shard of `streamName` using a
        * `TRIM_HORIZON` shard iterator.
        *
        * @param streamName name of the stream to read
        * @return the UTF-8 decoded payloads of the records that were fetched
        */
      def readEventsTrimHorizon(streamName: String): IO[List[String]] =
        readEvents(streamName) { builder =>
          builder.shardIteratorType(ShardIteratorType.TRIM_HORIZON)
        }

      /** Publishes a single record whose payload is the UTF-8 bytes of
        * `eventContent`, always using the partition key `"some-partition"`.
        *
        * @param streamName   name of the target stream
        * @param eventContent text to publish
        * @return the raw `PutRecordResponse` returned by the client
        */
      def sendEvent(
          streamName: String,
          eventContent: String
      ): IO[PutRecordResponse] = {
        val request =
          PutRecordRequest
            .builder()
            .streamName(streamName)
            .data(SdkBytes.fromUtf8String(eventContent))
            .partitionKey("some-partition")
            .build()
        kinesisClient.putRecord(request)
      }

      /** Shared read path: lists the shards, obtains one iterator per shard
        * (customised through `iteratorBuilderExtension`) and performs a
        * single `getRecords` call per iterator.
        *
        * Only one `getRecords` call is made per shard; the next shard
        * iterator contained in the response is not followed.
        *
        * @param streamName               name of the stream to read
        * @param iteratorBuilderExtension function applied to the shard
        *                                 iterator request builder before it
        *                                 is built (e.g. to set the iterator
        *                                 type)
        * @return record payloads from all shards, decoded as UTF-8 strings
        */
      private def readEvents(
          streamName: String
      )(
          iteratorBuilderExtension: Endo[GetShardIteratorRequest.Builder]
      ): IO[List[String]] = {
        // Shard iterator request for one shard, with the caller's tweaks applied.
        def shardIteratorRequest(shardId: String): GetShardIteratorRequest = {
          val baseBuilder =
            GetShardIteratorRequest
              .builder()
              .streamName(streamName)
              .shardId(shardId)
          iteratorBuilderExtension(baseBuilder).build()
        }

        // Records request for an already obtained shard iterator.
        def recordsRequest(shardIterator: String): GetRecordsRequest =
          GetRecordsRequest.builder().shardIterator(shardIterator).build()

        val listShardsRequest =
          ListShardsRequest.builder().streamName(streamName).build()

        kinesisClient.listShards(listShardsRequest).flatMap { listed =>
          val shardIds = listed.shards().asScala.toList.map(_.shardId())

          shardIds
            .parTraverse { shardId =>
              kinesisClient.getShardIterator(shardIteratorRequest(shardId))
            }
            .flatMap { iteratorResponses =>
              iteratorResponses
                .map(_.shardIterator())
                .parTraverse { shardIterator =>
                  kinesisClient.getRecords(recordsRequest(shardIterator))
                }
            }
            .map { recordsResponses =>
              for {
                response <- recordsResponses
                record <- response.records().asScala
              } yield record.data().asUtf8String()
            }
        }
      }
    }
  }
}
//> using scala 3
//> using dep "org.typelevel::cats-effect:3.5.2"
//> using dep "io.laserdisc::fs2-aws-kinesis:6.1.1"
//> using dep "com.dimafeng::testcontainers-scala-localstack-v2:0.41.0"
import aws.extensions.*
import cats.effect.*
import com.dimafeng.testcontainers.LocalStackV2Container
import org.testcontainers.containers.localstack.LocalStackContainer.Service
/** Reproduction entry point.
  *
  * Starts a LocalStack container with Kinesis enabled, creates a stream,
  * publishes one event and reads it back. Passing `TRIM_HORIZON` as the first
  * program argument reads with a `TRIM_HORIZON` iterator; any other arguments
  * (or none) read with an `AT_TIMESTAMP` iterator.
  */
object Main extends IOApp {

  /** Payload of the single event that is published and then read back. */
  val myEvent: String = "event"

  /** Container, Kinesis client and stream, acquired in that order; yields the
    * client together with the name of the created stream.
    */
  val localstackResources = for {
    container <- localstack.utils.runLocalstack(Seq(Service.KINESIS))
    _ <- localStackInfo(container)
    kinesisClient <- localstack.utils.kinesisClient(container)
    streamName <-
      localstack.utils.kinesisStreamResource(kinesisClient)("my-stream")
  } yield (kinesisClient, streamName)

  /** Publishes [[myEvent]], reads the stream back and prints what was found.
    *
    * @param args program arguments; `"TRIM_HORIZON"` as the first argument
    *             selects the `TRIM_HORIZON` read path
    * @return `ExitCode.Success` once the result has been printed
    */
  override def run(args: List[String]): IO[ExitCode] =
    localstackResources.use { (kinesisClient, streamName) =>
      for {
        // Take the AT_TIMESTAMP lower bound BEFORE publishing. Taking it
        // afterwards (as this code used to) positions the iterator past the
        // event just written, so an empty result would be expected even from
        // a correctly behaving backend.
        from <- IO.realTimeInstant
        _ <- kinesisClient.sendEvent(streamName, myEvent)
        found <- args match {
          case "TRIM_HORIZON" :: _ =>
            kinesisClient.readEventsTrimHorizon(streamName)
          case _ =>
            kinesisClient.readEventsAtTimestamp(streamName, from)
        }
        _ <- IO.println(s"Found kinesis event: $found")
      } yield ExitCode.Success
    }

  /** Prints the LocalStack tag in use and a `docker logs` command for the
    * started container.
    *
    * @param container the already started LocalStack container
    * @return a resource that only performs the printing; nothing is released
    */
  def localStackInfo(container: LocalStackV2Container): Resource[IO, Unit] = {
    // The first 10 characters of the container id are used in the hint.
    val containerId = container.container.getContainerId().take(10)
    IO
      .println(s"""Started Localstack container v${container.tag}
                  |Read container logs by running:
                  |docker logs -f $containerId""".stripMargin)
      .toResource
  }
}
package localstack
import cats.Endo
import cats.effect.IO
import cats.effect.Resource
import cats.implicits.*
import com.dimafeng.testcontainers.LocalStackV2Container
import com.dimafeng.testcontainers.SingleContainer
import io.laserdisc.pure.kinesis.tagless.KinesisAsyncClientOp
import io.laserdisc.pure.kinesis.tagless.{Interpreter => KinesisInterpreter}
import org.testcontainers.containers.localstack.LocalStackContainer.Service
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient
import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest
import software.amazon.awssdk.services.kinesis.model.DeleteStreamRequest
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest
import scala.jdk.CollectionConverters.*
import scala.jdk.FutureConverters.*
import scala.util.Random
/** Resource helpers for running LocalStack in a test container and talking to
  * its Kinesis endpoint.
  */
object utils {

  /** Creates and starts a LocalStack container with the given services.
    *
    * @param services LocalStack services to enable
    * @return the started container, closed when the resource is released
    */
  def runLocalstack(
      services: Seq[LocalStackV2Container.Service]
  ): Resource[IO, LocalStackV2Container] =
    containerResource(createContainer(services))

  /** Builds a tagless Kinesis client configured with the container's Kinesis
    * endpoint override, region and static credentials.
    *
    * @param container a started LocalStack container
    * @return the client wrapped in the interpreter's resource
    */
  def kinesisClient(
      container: LocalStackV2Container
  ): Resource[IO, KinesisAsyncClientOp[IO]] =
    KinesisInterpreter
      .apply[IO]
      .KinesisAsyncClientOpResource(
        KinesisAsyncClient
          .builder()
          .endpointOverride(container.endpointOverride(Service.KINESIS))
          .region(container.region)
          .credentialsProvider(container.staticCredentialsProvider)
      )

  /** Creates a stream named `<streamName>-<8 random alphanumeric chars>`,
    * waits until it exists and deletes it when the resource is released.
    *
    * @param kinesisClient        client used to create, describe and delete
    *                             the stream
    * @param streamName           prefix of the generated stream name
    * @param additionalParameters extra customisation applied to the create
    *                             request builder, which is pre-populated with
    *                             the generated name and `shardCount(1)`
    * @return the generated stream name
    */
  def kinesisStreamResource(
      kinesisClient: KinesisAsyncClientOp[IO]
  )(
      streamName: String,
      additionalParameters: Endo[CreateStreamRequest.Builder] = identity
  ): Resource[IO, String] =
    Resource.make(for {
      // The random suffix is generated inside IO so each acquisition gets a new name.
      sn <- IO(Random.alphanumeric.take(8).mkString).map(randomSuffix =>
        s"$streamName-$randomSuffix"
      )
      _ <- kinesisClient.createStream(
        additionalParameters(
          CreateStreamRequest.builder().streamName(sn).shardCount(1)
        ).build()
      )
      // Wait for the waiter to report that the stream exists before handing it out.
      _ <- kinesisClient.waiter.flatMap { waiter =>
        val describeStreamRequest =
          DescribeStreamRequest.builder().streamName(sn).build()
        IO.fromFuture(
          IO(waiter.waitUntilStreamExists(describeStreamRequest).asScala)
        )
      }
    } yield sn)(sn =>
      kinesisClient
        .deleteStream(DeleteStreamRequest.builder().streamName(sn).build())
        .void
    )

  /** Builds (without starting) a LocalStack container definition.
    *
    * The docker image is set to `localstack/localstack:<localStackTag>` and
    * the container environment gets `DEBUG=1`.
    *
    * @param services      LocalStack services to enable
    * @param localStackTag image tag to run
    */
  private def createContainer(
      services: Seq[LocalStackV2Container.Service],
      localStackTag: String = "2.3.2"
  ): IO[LocalStackV2Container] =
    IO {
      LocalStackV2Container(tag = localStackTag, services = services)
        .configure(
          _.setDockerImageName(s"localstack/localstack:$localStackTag")
        )
        .configure { container =>
          container.withEnv("DEBUG", "1")
          ()
        }
    }

  /** Starts the container produced by `container` and wraps it in a resource
    * created with `Resource.fromAutoCloseable`.
    *
    * `start()` is run via `IO.blocking` so that the blocking call does not
    * occupy a compute-pool thread.
    *
    * @param container effect producing a not-yet-started container
    * @return the started container as a resource
    */
  private def containerResource[T <: SingleContainer[?]](
      container: IO[T]
  ): Resource[IO, T] =
    Resource.fromAutoCloseable(
      container.flatTap(c => IO.blocking(c.start()))
    )
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment