syntax = "proto3";
package rpc.pkg;
// Request message accepted by the Greeter.SayHello RPC.
message HelloRequest {
  // Single `float` field, field number 1.
  float name = 1;
}
// Response message returned by the Greeter.SayHello RPC.
message HelloResponse {
  // Single `string` field, field number 1.
  string message = 1;
}
// Service exposing one unary RPC.
service Greeter {
  // Takes a HelloRequest and returns a HelloResponse.
  rpc SayHello (HelloRequest) returns (HelloResponse);
}
- After compiling, it generates the following Scala code:
package rpc.pkg
import higherkindness.mu.rpc.protocol._
import fs2.Stream
import shapeless.{:+:, CNil}
/**
 * Scala counterparts of the `rpc.pkg` protobuf definitions: the two
 * messages and the `Greeter` service algebra.
 */
object protocol {

  /** Request for `Greeter.SayHello`; carries a single `Float` named `name`. */
  @message final case class HelloRequest(name: Float)

  /** Response for `Greeter.SayHello`; carries a single `String` named `message`. */
  @message final case class HelloResponse(message: String)

  /**
   * Service algebra annotated with `@service(Protobuf)`.
   *
   * @tparam F effect type in which responses are produced
   */
  @service(Protobuf) trait Greeter[F[_]] {

    /** Takes a [[HelloRequest]] and yields a [[HelloResponse]] in `F`. */
    def SayHello(req: HelloRequest): F[HelloResponse]
  }
}
import cats.effect.{Sync, Timer}
import cats.syntax.functor._
import io.chrisdavenport.log4cats.Logger
import rpc.pkg.protocol
import rpc.pkg.protocol.Greeter
/**
 * Server-side implementation of `Greeter[F]`.
 *
 * Every request is logged at info level and answered with a fixed
 * `HelloResponse("hi")`.
 *
 * @tparam F effect type; requires `Timer`, `Sync` and a `Logger`
 */
class HelloServiceHandler[F[_]: Timer](implicit F: Sync[F], L: Logger[F]) extends Greeter[F] {

  /** Prefix used in the log line emitted for each request. */
  val serviceName: String = "Greeter"

  /**
   * Logs the incoming request, then returns the constant greeting.
   *
   * @param req the request received from the client
   * @return `HelloResponse("hi")` once the log effect has run
   */
  override def SayHello(req: protocol.HelloRequest): F[protocol.HelloResponse] = {
    val logRequest = L.info(s"$serviceName - Request: $req")
    // Discard the Unit produced by logging and substitute the reply.
    F.map(logRequest)(_ => protocol.HelloResponse("hi"))
  }
}
import java.net.InetAddress
import cats.effect.{ConcurrentEffect, ContextShift, Effect, Resource, Timer}
import cats.implicits._
import higherkindness.mu.rpc.ChannelForAddress
import higherkindness.mu.rpc.channel.{ManagedChannelInterpreter, UsePlaintext}
import io.chrisdavenport.log4cats.Logger
import io.grpc.{CallOptions, ManagedChannel}
import rpc.pkg.protocol.{Greeter, HelloRequest, HelloResponse}
/**
 * Client-facing algebra for calling the greeter service.
 *
 * @tparam F effect type in which the call is performed
 */
trait GreeterServiceClient[F[_]] {

  /**
   * Sends `name` to the service.
   *
   * @param name the float value placed in the request
   * @return the service's `HelloResponse`, in `F`
   */
  def sayHello(name: Float): F[HelloResponse]
}
/** Constructors for [[GreeterServiceClient]]. */
object GreeterServiceClient {

  /**
   * Wraps a `Greeter[F]` so that each call is followed by an info-level
   * log line containing the request value and the result.
   *
   * @param client the underlying `Greeter[F]` used to perform the RPC
   * @return a [[GreeterServiceClient]] delegating to `client`
   */
  def apply[F[_]](client: Greeter[F])(
      implicit F: Effect[F],
      L: Logger[F],
      T: Timer[F]): GreeterServiceClient[F] =
    new GreeterServiceClient[F] {
      val serviceName = "GreeterServiceClient"

      override def sayHello(name: Float): F[HelloResponse] =
        client
          .SayHello(HelloRequest(name))
          // Log after the call succeeds, keeping the response as the result.
          .flatTap(result => L.info(s"$serviceName - Request: $name - Result: $result"))
    }

  /**
   * Builds a stream that emits a [[GreeterServiceClient]] backed by a
   * managed channel to `hostname:port`.
   *
   * The hostname is resolved to an IP address inside `F` before the channel
   * is built.
   *
   * @param hostname   host name to resolve via `InetAddress.getByName`
   * @param port       port passed to `ChannelForAddress`
   * @param sslEnabled when `false`, the channel is configured with
   *                   `UsePlaintext()`; when `true`, no extra channel
   *                   configuration is supplied
   * @return a stream over the resource-managed client
   */
  def createClient[F[_]: ContextShift: Logger: Timer](
      hostname: String,
      port: Int,
      sslEnabled: Boolean = true)(
      implicit F: ConcurrentEffect[F]): fs2.Stream[F, GreeterServiceClient[F]] = {

    // Pure value, so it can be computed ahead of the effectful lookup.
    val channelConfig = if (sslEnabled) Nil else List(UsePlaintext())

    val channel: F[ManagedChannel] =
      for {
        ip      <- F.delay(InetAddress.getByName(hostname).getHostAddress)
        managed <- new ManagedChannelInterpreter[F](ChannelForAddress(ip, port), channelConfig).build
      } yield managed

    val greeterResource: Resource[F, Greeter[F]] =
      Greeter.clientFromChannel(channel, CallOptions.DEFAULT)

    fs2.Stream.resource(greeterResource).map(greeter => GreeterServiceClient(greeter))
  }
}
- After binding the service implementation to the RPC server, I created this app:
import cats.effect._
import fs2.Stream
import io.chrisdavenport.log4cats.Logger
import io.chrisdavenport.log4cats.slf4j.Slf4jLogger
/**
 * Client program: creates a logger, connects to the greeter service on
 * `localhost:19683` and performs a single `sayHello` call.
 *
 * @tparam F effect type; requires `ConcurrentEffect`, `ContextShift` and `Timer`
 */
class HelloProgram[F[_]: ConcurrentEffect: ContextShift: Timer] {

  /**
   * Stream emitting a plaintext (SSL disabled) client for the given address.
   *
   * @param host host name of the RPC server
   * @param port port of the RPC server
   */
  def greeterServiceClient(host: String, port: Int)(
      implicit L: Logger[F]): Stream[F, GreeterServiceClient[F]] =
    GreeterServiceClient.createClient(host, port, sslEnabled = false)

  /**
   * Entry point: builds the "hello-app" logger and runs [[clientProgram]] with it.
   *
   * @param args command-line arguments (not used by this method)
   * @return a stream emitting the program's exit code
   */
  def runProgram(args: List[String]): Stream[F, ExitCode] =
    Stream
      .eval(Slf4jLogger.fromName[F]("hello-app"))
      .flatMap(logger => clientProgram(logger))

  /**
   * Calls `sayHello(2.5f)` once on a freshly created client and emits
   * `ExitCode.Success` afterwards.
   */
  def clientProgram(implicit L: Logger[F]): Stream[F, ExitCode] =
    greeterServiceClient("localhost", 19683)
      .evalMap(client => client.sayHello(2.5f))
      .map(_ => ExitCode.Success)
}
/** `IOApp` launcher that runs [[HelloProgram]] in `IO`. */
object HelloApp extends IOApp {

  /**
   * Runs the program stream to completion and returns the first emitted
   * exit code, or `ExitCode.Error` when the stream emitted nothing.
   */
  def run(args: List[String]): IO[ExitCode] = {
    val program = new HelloProgram[IO]
    program.runProgram(args).compile.toList.map {
      case firstCode :: _ => firstCode
      case Nil            => ExitCode.Error
    }
  }
}
- Output on the server side:
INFO - Greeter - Request: HelloRequest(2.5)
- Output on the client side:
INFO - GreeterServiceClient - Request: 2.5 - Result: HelloResponse(hi)