Skip to content

Instantly share code, notes, and snippets.

@nefilim
Last active January 14, 2024 18:07
Show Gist options
  • Save nefilim/b47805255431b00866c07f3375fa36ff to your computer and use it in GitHub Desktop.
Save nefilim/b47805255431b00866c07f3375fa36ff to your computer and use it in GitHub Desktop.
http4s 7359
package org.home4s.hue
import cats.ApplicativeThrow
import cats.effect.kernel.Async
import cats.effect.{IO, IOApp, Resource}
import fs2.Stream
import fs2.io.net.tls.TLSContext
import org.home4s.hue.TestStreamsConfig.StaticHueConfig
import org.http4s.client.Client
import org.http4s.ember.client.EmberClientBuilder
import org.http4s.headers.Accept
import org.http4s.{Header, Headers, HttpVersion, MediaType, Method, Request, ServerSentEvent, Uri}
import org.typelevel.ci.CIStringSyntax
import org.typelevel.log4cats.slf4j.Slf4jLogger
import org.typelevel.log4cats.{Logger, StructuredLogger}
import scala.concurrent.duration.DurationInt
object TestStreams extends IOApp.Simple {
def eventStream[F[_]: ApplicativeThrow](
client: Client[F],
apiKey: String
)(implicit logger: Logger[F]): Stream[F, ServerSentEvent] = {
val req = Request[F](
Method.GET,
Uri.unsafeFromString(s"https://192.168.100.170/eventstream/clip/v2"),
httpVersion = HttpVersion.`HTTP/2`,
headers = Headers(
Header.Raw(ci"hue-application-key", apiKey),
Accept(MediaType.`text/event-stream`),
)
)
client
.stream(req)
.flatMap(_.body)
.through(ServerSentEvent.decoder[F])
.debug()
.handleErrorWith { e =>
Stream.eval(logger.error(e)(s"event stream error")) >>
Stream.raiseError[F](e)
}
.onFinalize(logger.warn("event stream shutting down"))
.repeat
}
def buildHTTPClient[F[_]: Async](implicit logger: Logger[F]): Resource[F, Client[F]] = {
for {
_ <- Resource.eval(logger.info("building HTTP client"))
tlsContext <- TLSContext.Builder.forAsync[F].insecureResource // TODO don't use insecure
client <- EmberClientBuilder.default[F]
.withTLSContext(tlsContext)
.withHttp2
.withTimeout(5.seconds)
.withIdleConnectionTime(30.seconds)
.build
} yield client
}
def buildStream[F[_]: Async](implicit logger: Logger[F]): Stream[F, Unit] = {
Stream.exec(logger.info("building stream")) ++
Stream.resource(buildHTTPClient[F]).flatMap { client =>
Stream(
eventStream[F](client, StaticHueConfig.apiKey),
Stream.awakeEvery[F](20.seconds).evalMap { _ =>
client.expect[String](request[F](StaticHueConfig.apiKey))
}
.handleErrorWith { e =>
Stream.eval(logger.error(e)("command stream error")) >>
Stream.raiseError[F](e)
}
.debug()
.onFinalize(logger.warn("command stream shutting down"))
).parJoin(2)
}
.onFinalize(logger.warn("shutting stream down"))
.handleErrorWith { e =>
Stream.exec(logger.error(e)("stream failed")) ++ buildStream
}
.map(_ => ())
}
override def run: IO[Unit] = {
(for {
case implicit0(logger: StructuredLogger[IO]) <- Resource.eval(Slf4jLogger.create[IO])
_ <- buildStream[IO].compile.drain.background
} yield ()).useForever
}
def request[F[_]](apiKey: String): Request[F] = {
Request(
Method.GET,
Uri.unsafeFromString(s"https://192.168.100.170/clip/v2/resource/bridge"),
httpVersion = HttpVersion.`HTTP/2`,
headers = Headers(
Accept(MediaType.application.json), // should we be adding this manually?
Header.Raw(ci"hue-application-key", apiKey)
)
)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment