Skip to content

Instantly share code, notes, and snippets.

@juanpedromoreno
Last active April 28, 2019 18:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save juanpedromoreno/350b4051823cfaee6dcb455657534133 to your computer and use it in GitHub Desktop.
Save juanpedromoreno/350b4051823cfaee6dcb455657534133 to your computer and use it in GitHub Desktop.
Mu - Numeric Types in .proto
  • protocol.proto file:
syntax = "proto3";

package rpc.pkg;


// Request payload for Greeter.SayHello.
// The single field is declared as a proto3 `float` (field number 1).
message HelloRequest {
  float name = 1;
}

// Response payload for Greeter.SayHello: one string field (field number 1).
message HelloResponse {
  string message = 1;
}

// Greeter service with a single unary RPC: one HelloRequest in, one HelloResponse out.
service Greeter {
  rpc SayHello (HelloRequest) returns (HelloResponse);
}
  • After compiling, it generates the following Scala code:
package rpc.pkg
import higherkindness.mu.rpc.protocol._
import fs2.Stream
import shapeless.{:+:, CNil}

/**
 * Scala definitions produced from `protocol.proto` (package `rpc.pkg`).
 *
 * Contains the request/response case classes and the `Greeter` service algebra.
 */
object protocol { 

// Request message: the proto `float name = 1` field appears here as a Scala `Float`.
@message final case class HelloRequest(name: Float)
// Response message: the proto `string message = 1` field appears here as a Scala `String`.
@message final case class HelloResponse(message: String)
// Service algebra, abstract in the effect type `F`, annotated for the Protobuf serialization.
@service(Protobuf) trait Greeter[F[_]] {
  // Unary call: one request in, one response wrapped in `F` out.
  def SayHello(req: HelloRequest): F[HelloResponse]
}

}
  • For the server, I wrote the following handler:
import cats.effect.{Sync, Timer}
import cats.syntax.functor._
import io.chrisdavenport.log4cats.Logger
import rpc.pkg.protocol
import rpc.pkg.protocol.Greeter

/**
 * Server-side implementation of the `Greeter` service.
 *
 * Every request is logged and answered with the same fixed greeting.
 *
 * @tparam F effect type; a `Timer` is demanded by the signature (not used in this body),
 *           while `Sync` and `Logger` instances are taken implicitly
 */
class HelloServiceHandler[F[_]:Timer](implicit F: Sync[F], L: Logger[F]) extends Greeter[F] {

  /** Label placed at the start of each log line written by this handler. */
  val serviceName = "Greeter"

  /**
   * Logs the incoming request, then yields `HelloResponse("hi")`.
   *
   * @param req the request received from the client; only used for logging
   * @return the constant greeting, produced after the log effect has run
   */
  override def SayHello(req: protocol.HelloRequest): F[protocol.HelloResponse] = {
    // Built up front so the mapping function below just returns an existing value.
    val reply  = protocol.HelloResponse("hi")
    val logged = L.info(s"$serviceName - Request: $req")
    F.map(logged)(_ => reply)
  }
}
  • For the client:
import java.net.InetAddress

import cats.effect.{ConcurrentEffect, ContextShift, Effect, Resource, Timer}
import cats.implicits._
import higherkindness.mu.rpc.ChannelForAddress
import higherkindness.mu.rpc.channel.{ManagedChannelInterpreter, UsePlaintext}
import io.chrisdavenport.log4cats.Logger
import io.grpc.{CallOptions, ManagedChannel}
import rpc.pkg.protocol.{Greeter, HelloRequest, HelloResponse}

/**
 * Client-facing API for the greeter service, abstract in the effect type `F`.
 *
 * @tparam F effect type in which the call result is delivered
 */
trait GreeterServiceClient[F[_]] {

  /**
   * Sends a greeting request carrying the given value.
   *
   * @param name the `Float` placed in the request's `name` field
   * @return the service response, wrapped in `F`
   */
  def sayHello(name: Float): F[HelloResponse]

}

/** Constructors for [[GreeterServiceClient]] instances. */
object GreeterServiceClient {

  /**
   * Wraps a raw `Greeter` client so that each call is logged once its response arrives.
   *
   * @param client the underlying RPC client to delegate to
   * @return a [[GreeterServiceClient]] that forwards to `client` and logs request and result
   */
  def apply[F[_]](client: Greeter[F])(
      implicit F: Effect[F],
      L: Logger[F],
      T: Timer[F]): GreeterServiceClient[F] =
    new GreeterServiceClient[F] {

      val serviceName = "GreeterServiceClient"

      // Perform the call, log request + result as a side effect, and hand back the result.
      override def sayHello(name: Float): F[HelloResponse] =
        client
          .SayHello(HelloRequest(name))
          .flatTap(result => L.info(s"$serviceName - Request: $name - Result: $result"))
    }

  /**
   * Builds a client for the service at `hostname:port` and exposes it as a stream.
   *
   * @param hostname   host name handed to `InetAddress.getByName` for resolution
   * @param port       port of the remote service
   * @param sslEnabled when `false`, the channel is configured with `UsePlaintext()`;
   *                   when `true` (the default), no extra channel configuration is passed
   * @return a stream built from the client `Resource`, mapped into a [[GreeterServiceClient]]
   */
  def createClient[F[_]: ContextShift: Logger: Timer](
      hostname: String,
      port: Int,
      sslEnabled: Boolean = true)(
      implicit F: ConcurrentEffect[F]): fs2.Stream[F, GreeterServiceClient[F]] = {

    // The plaintext option is added only when SSL has been switched off.
    val channelConfig = if (sslEnabled) Nil else List(UsePlaintext())

    // Host lookup is suspended in F so it runs when the channel effect is evaluated.
    val resolvedAddress: F[String] =
      F.delay(InetAddress.getByName(hostname).getHostAddress)

    val channel: F[ManagedChannel] =
      resolvedAddress.flatMap { ip =>
        new ManagedChannelInterpreter[F](ChannelForAddress(ip, port), channelConfig).build
      }

    val greeterResource: Resource[F, Greeter[F]] =
      Greeter.clientFromChannel(channel, CallOptions.DEFAULT)

    fs2.Stream.resource(greeterResource).map(greeter => GreeterServiceClient(greeter))
  }
}
  • After binding the service implementation to the RPC server, I created this app:
import cats.effect._
import fs2.Stream
import io.chrisdavenport.log4cats.Logger
import io.chrisdavenport.log4cats.slf4j.Slf4jLogger

/**
 * Client program: creates a logger, connects to the greeter service and sends one request.
 *
 * @tparam F effect type; requires `ConcurrentEffect`, `ContextShift` and `Timer` instances
 */
class HelloProgram[F[_]: ConcurrentEffect: ContextShift: Timer] {

  /**
   * Streams a [[GreeterServiceClient]] for the given endpoint, with SSL disabled.
   *
   * @param host host name of the service
   * @param port port of the service
   */
  def greeterServiceClient(host: String, port: Int)(
    implicit L: Logger[F]): Stream[F, GreeterServiceClient[F]] =
    GreeterServiceClient.createClient[F](host, port, sslEnabled = false)

  /**
   * Entry point of the program: builds the "hello-app" logger and runs [[clientProgram]].
   *
   * @param args command-line arguments; not consulted by this implementation
   * @return a stream emitting the exit code produced by [[clientProgram]]
   */
  def runProgram(args: List[String]): Stream[F, ExitCode] =
    Stream
      .eval(Slf4jLogger.fromName[F]("hello-app"))
      .flatMap(logger => clientProgram(logger))

  /**
   * Connects to `localhost:19683`, sends `sayHello(2.5f)` and emits `ExitCode.Success`.
   */
  def clientProgram(implicit L: Logger[F]): Stream[F, ExitCode] =
    greeterServiceClient("localhost", 19683)
      .evalMap(client => client.sayHello(2.5f))
      .map(_ => ExitCode.Success)
}

/** Application entry point: runs [[HelloProgram]] in `IO`. */
object HelloApp extends IOApp {

  /**
   * Runs the program stream to completion and reports its first emitted exit code.
   *
   * @param args command-line arguments, forwarded to `HelloProgram.runProgram`
   * @return the first exit code emitted, or `ExitCode.Error` when the stream emitted nothing
   */
  def run(args: List[String]): IO[ExitCode] = {
    val program = new HelloProgram[IO]
    program.runProgram(args).compile.toList.map {
      case firstCode :: _ => firstCode
      case Nil            => ExitCode.Error
    }
  }
}
  • Output on the server side:
INFO  - Greeter - Request: HelloRequest(2.5)
  • Output on the client side:
INFO  - GreeterServiceClient - Request: 2.5 - Result: HelloResponse(hi)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment