-
-
Save nefilim/b47805255431b00866c07f3375fa36ff to your computer and use it in GitHub Desktop.
http4s 7359
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.home4s.hue

import cats.ApplicativeThrow
import cats.effect.kernel.Async
import cats.effect.{IO, IOApp, Resource}
import fs2.Stream
import fs2.io.net.tls.TLSContext
import org.home4s.hue.TestStreamsConfig.StaticHueConfig
import org.http4s.client.Client
import org.http4s.ember.client.EmberClientBuilder
import org.http4s.headers.Accept
import org.http4s.implicits._
import org.http4s.{Header, Headers, HttpVersion, MediaType, Method, Request, ServerSentEvent, Uri}
import org.typelevel.ci.CIStringSyntax
import org.typelevel.log4cats.slf4j.Slf4jLogger
import org.typelevel.log4cats.{Logger, StructuredLogger}
import scala.concurrent.duration.DurationInt
/** Demo application that builds an Ember HTTP/2 client and runs two streams against a
  * bridge at a fixed address, concurrently:
  *
  *  - a server-sent-event subscription on `/eventstream/clip/v2`, and
  *  - a poll of `/clip/v2/resource/bridge` every 20 seconds.
  *
  * When either stream fails, the client is released, the failure is logged and the whole
  * pipeline is rebuilt after a short pause.
  */
object TestStreams extends IOApp.Simple {

  /** Base address of the bridge. Checked at compile time by the `uri` literal instead of
    * being parsed (and possibly throwing) at runtime.
    */
  private val BridgeUri: Uri = uri"https://192.168.100.170"

  /** Name of the request header that carries the API key. */
  private val ApiKeyHeaderName = ci"hue-application-key"

  /** Pause between a pipeline failure and the next rebuild. Without it, an unreachable
    * bridge makes the restart logic spin in a tight reconnect loop. The value is an
    * arbitrary choice — tune as needed.
    */
  private val RestartDelay = 5.seconds

  /** Builds the API-key header shared by every request sent to the bridge. */
  private def apiKeyHeader(apiKey: String): Header.Raw =
    Header.Raw(ApiKeyHeaderName, apiKey)

  /** Subscribes to the bridge's event stream and decodes it as server-sent events.
    *
    * Errors are logged and then re-raised, so the caller decides how to recover. When the
    * response body ends without an error, the request is issued again (`repeat`).
    *
    * @param client HTTP client used to issue the request
    * @param apiKey value sent in the `hue-application-key` header
    * @param logger logger used for the error and shutdown messages
    * @return the decoded events; as written it only ends by failing or being cancelled
    */
  def eventStream[F[_]: ApplicativeThrow](
      client: Client[F],
      apiKey: String
  )(implicit logger: Logger[F]): Stream[F, ServerSentEvent] = {
    val req = Request[F](
      Method.GET,
      BridgeUri / "eventstream" / "clip" / "v2",
      httpVersion = HttpVersion.`HTTP/2`,
      headers = Headers(
        apiKeyHeader(apiKey),
        Accept(MediaType.`text/event-stream`)
      )
    )

    client
      .stream(req)
      .flatMap(_.body)
      .through(ServerSentEvent.decoder[F])
      .debug() // prints every decoded event to stdout
      .handleErrorWith { e =>
        Stream.eval(logger.error(e)("event stream error")) >>
          Stream.raiseError[F](e)
      }
      .onFinalize(logger.warn("event stream shutting down"))
      .repeat
  }

  /** Builds the HTTP/2-enabled Ember client as a resource.
    *
    * The TLS context comes from the builder's `insecureResource` (presumably this skips
    * certificate validation — confirm against the fs2 documentation before reusing it).
    *
    * @param logger logger used to announce client construction
    * @return the client; it is released when the resource is released
    */
  def buildHTTPClient[F[_]: Async](implicit logger: Logger[F]): Resource[F, Client[F]] =
    for {
      _          <- Resource.eval(logger.info("building HTTP client"))
      tlsContext <- TLSContext.Builder.forAsync[F].insecureResource // TODO don't use insecure
      client <- EmberClientBuilder
        .default[F]
        .withTLSContext(tlsContext)
        .withHttp2
        .withTimeout(5.seconds)
        .withIdleConnectionTime(30.seconds)
        .build
    } yield client

  /** Builds the full pipeline: one client shared by the event subscription and the
    * periodic bridge poll, run concurrently.
    *
    * If either branch fails, the failure is logged, the client is released and, after
    * [[RestartDelay]], the pipeline is rebuilt from scratch with a fresh client.
    *
    * @param logger logger used for lifecycle and error messages
    * @return a stream emitting `()` for every event received and every poll completed
    */
  def buildStream[F[_]: Async](implicit logger: Logger[F]): Stream[F, Unit] = {
    // Polls the bridge resource every 20 seconds; errors are logged and re-raised.
    def commandStream(client: Client[F]): Stream[F, Unit] =
      Stream
        .awakeEvery[F](20.seconds)
        .evalMap(_ => client.expect[String](request[F](StaticHueConfig.apiKey)))
        .handleErrorWith { e =>
          Stream.eval(logger.error(e)("command stream error")) >>
            Stream.raiseError[F](e)
        }
        .debug() // prints every response body to stdout
        .onFinalize(logger.warn("command stream shutting down"))
        .map(_ => ())

    val pipeline: Stream[F, Unit] =
      Stream
        .resource(buildHTTPClient[F])
        .flatMap { client =>
          // Both branches are mapped to Unit first, so the joined stream has a precise
          // element type rather than an inferred common supertype.
          Stream(
            eventStream[F](client, StaticHueConfig.apiKey).map(_ => ()),
            commandStream(client)
          ).parJoin(2)
        }
        .onFinalize(logger.warn("shutting stream down"))
        .handleErrorWith { e =>
          // `++` takes its right-hand side by name, so the recursive rebuild is only
          // constructed after the log line and the pause have run.
          Stream.exec(logger.error(e)("stream failed")) ++
            Stream.sleep_[F](RestartDelay) ++
            buildStream[F]
        }

    Stream.exec(logger.info("building stream")) ++ pipeline
  }

  /** Application entry point: creates the logger and runs the pipeline until cancelled.
    *
    * The stream runs in the foreground, so a failure that escapes the restart logic
    * terminates the application with that error rather than being lost in a background
    * fiber.
    */
  override def run: IO[Unit] =
    Slf4jLogger.create[IO].flatMap { created =>
      implicit val logger: StructuredLogger[IO] = created
      buildStream[IO].compile.drain
    }

  /** Builds the HTTP/2 GET request for the bridge resource (`/clip/v2/resource/bridge`).
    *
    * @param apiKey value sent in the `hue-application-key` header
    * @return the request, asking for a JSON response
    */
  def request[F[_]](apiKey: String): Request[F] =
    Request[F](
      Method.GET,
      BridgeUri / "clip" / "v2" / "resource" / "bridge",
      httpVersion = HttpVersion.`HTTP/2`,
      headers = Headers(
        Accept(MediaType.application.json), // should we be adding this manually?
        apiKeyHeader(apiKey)
      )
    )
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment