Skip to content

Instantly share code, notes, and snippets.

@choffmeister
Last active June 28, 2017 18:41
Show Gist options
  • Save choffmeister/bd3522bd22bb361eb5eee049afceb3f0 to your computer and use it in GitHub Desktop.
Save choffmeister/bd3522bd22bb361eb5eee049afceb3f0 to your computer and use it in GitHub Desktop.
# ZooKeeper settings under the akka.cluster.seed.zookeeper namespace
# (presumably consumed by the akka-zk-cluster-seed extension; confirm against its reference.conf).
akka.cluster.seed.zookeeper {
# Mandatory substitution: there is no default and no `?`, so config resolution fails
# when the AKKA_CLUSTER_SEED_ZOOKEEPER_URL environment variable is not set.
url = ${AKKA_CLUSTER_SEED_ZOOKEEPER_URL}
# Presumably the znode path used to coordinate seed nodes for this cluster; TODO confirm.
path = "/akka/cluster/seed/documents"
}
# Settings read by MyServiceLocator from the "lagom.discovery.zookeeper" config path.
lagom {
discovery {
zookeeper {
# Mandatory: resolution fails when LAGOM_DISCOVERY_ZOOKEEPER_SERVER_HOSTNAME is not set.
server-hostname = ${LAGOM_DISCOVERY_ZOOKEEPER_SERVER_HOSTNAME}
# Default port, followed by an optional override: `${?VAR}` only replaces the value
# above when the environment variable is defined, so the order of these two lines matters.
server-port = 2181
server-port = ${?LAGOM_DISCOVERY_ZOOKEEPER_SERVER_PORT}
# Scheme used when building service URIs; same default/optional-override pattern.
uri-scheme = "http"
uri-scheme = ${?LAGOM_DISCOVERY_ZOOKEEPER_URI_SCHEME}
# Accepted by MyServiceLocator: "first", "random" or "round-robin"; anything else is rejected.
routing-policy = "round-robin"
}
}
}
// Extra dependencies for ZooKeeper-based Akka cluster seeding and Curator service discovery.
// `++=` appends a whole Seq of modules; `+=` only accepts a single ModuleID and does not
// compile when handed a Seq.
libraryDependencies ++= Seq(
  "com.sclasen" %% "akka-zk-cluster-seed" % "0.1.8",
  "org.apache.curator" % "curator-x-discovery" % "2.11.0"
)
import akka.actor.ActorSystem
import akka.cluster.seed.ZookeeperClusterSeed
/**
 * Mix-in component that obtains the ZookeeperClusterSeed extension for the
 * application's actor system and calls `join()` on it as soon as the trait is initialised.
 */
trait MyAkkaSeedComponent {
// Actor system supplied by whatever class mixes this trait in.
def actorSystem: ActorSystem
// Strict val: evaluated during trait initialisation, so `actorSystem` must already be usable then.
val extension = ZookeeperClusterSeed(actorSystem)
// Side effect at construction time. Presumably joins the Akka cluster using seed nodes
// coordinated through ZooKeeper; confirm against the akka-zk-cluster-seed documentation.
extension.join()
}
import java.io.Closeable
import java.net.{InetAddress, URI}
import java.util.concurrent.ConcurrentHashMap
import com.lightbend.lagom.scaladsl.api.Descriptor.Call
import com.lightbend.lagom.scaladsl.api.ServiceLocator
import com.typesafe.config.Config
import com.typesafe.config.ConfigException.BadValue
import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.curator.utils.CloseableUtils
import org.apache.curator.x.discovery.{ServiceDiscovery, ServiceDiscoveryBuilder, ServiceInstance}
import play.api.Logger
import scala.collection.concurrent.Map
import scala.collection.convert.decorateAsScala._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Random
/**
 * Lagom `ServiceLocator` backed by Apache Curator service discovery on ZooKeeper.
 *
 * Settings are read from the `lagom.discovery.zookeeper` section of `config`:
 * `server-hostname`, `server-port`, `uri-scheme` and `routing-policy`
 * (one of `first`, `random`, `round-robin`).
 *
 * The constructor connects to ZooKeeper and starts service discovery; call
 * [[close]] to release both.
 *
 * @param config application configuration containing `lagom.discovery.zookeeper`
 * @throws com.typesafe.config.ConfigException.BadValue if `routing-policy` is not a known policy
 */
class MyServiceLocator(config: Config) extends ServiceLocator with Closeable {
  import MyServiceLocator._
  // Local imports so this class does not depend on changes to the file-level import list.
  import java.util.concurrent.atomic.AtomicInteger
  import scala.util.control.NonFatal

  lazy val log = Logger(getClass)

  val locatorConfig = config.getConfig("lagom.discovery.zookeeper")
  val serverHostname = locatorConfig.getString("server-hostname")
  val serverPort = locatorConfig.getString("server-port")
  val scheme = locatorConfig.getString("uri-scheme")
  val routingPolicy = locatorConfig.getString("routing-policy")
  val zookeeperUri = s"$serverHostname:$serverPort"
  val zookeeperServicesPath = "/lagom/services"

  // One monotonically increasing counter per service name, shared by concurrent callers.
  private val roundRobinCounterFor: Map[String, AtomicInteger] =
    new ConcurrentHashMap[String, AtomicInteger]().asScala

  // Resolved before any ZooKeeper connection is opened, so that an invalid
  // routing policy fails fast without leaving a started client behind.
  private val instancePicker: InstancePicker[String] = routingPolicy match {
    case "first" =>
      (_, instances) => Some(pickFirstInstance(instances))
    case "random" =>
      (_, instances) => Some(pickRandomInstance(instances))
    case "round-robin" =>
      (name, instances) => Some(pickRoundRobinInstance(name, instances))
    case unknown =>
      throw new BadValue("lagom.discovery.zookeeper.routing-policy", s"'$unknown' is not a valid routing algorithm")
  }

  private val zookeeperClient: CuratorFramework =
    CuratorFrameworkFactory.newClient(zookeeperUri, new ExponentialBackoffRetry(1000, 3))

  private val serviceDiscovery: ServiceDiscovery[String] =
    ServiceDiscoveryBuilder
      .builder(classOf[String])
      .client(zookeeperClient)
      .basePath(zookeeperServicesPath)
      .build()

  zookeeperClient.start()
  serviceDiscovery.start()

  /**
   * Locates `name` and, when an instance is found, runs `block` against its URI.
   *
   * @return `Some(result)` of `block`, or `None` when no instance is registered
   */
  override def doWithService[T](name: String, serviceCall: Call[_, _])(block: (URI) => Future[T])(implicit ec: ExecutionContext): Future[Option[T]] = {
    locate(name).flatMap { uriOpt =>
      uriOpt.fold(Future.successful(Option.empty[T])) { uri =>
        block.apply(uri).map(Option.apply)
      }
    }
  }

  /**
   * Queries service discovery for `name` and picks one instance according to
   * the configured routing policy.
   *
   * The query runs synchronously on the calling thread. Non-fatal lookup errors
   * are reported as a failed `Future` rather than thrown, so callers composing
   * on the returned `Future` see them through the usual failure channel.
   *
   * @return the chosen instance URI, or `None` when no instance is registered
   */
  override def locate(name: String): Future[Option[URI]] =
    try {
      val instances = serviceDiscovery.queryForInstances(name).asScala.toList
      val chosen = instances match {
        case Nil      => None
        case _ :: Nil => toURIs(instances).headOption
        case _        => instancePicker(name, instances)
      }
      Future.successful(chosen)
    } catch {
      case NonFatal(e) => Future.failed(e)
    }

  /** Same as `locate(name)`; the service call descriptor is not used for routing. */
  override def locate(name: String, serviceCall: Call[_, _]): Future[Option[URI]] =
    locate(name)

  /** Closes service discovery and the ZooKeeper client, ignoring errors on close. */
  override def close(): Unit = {
    CloseableUtils.closeQuietly(serviceDiscovery)
    CloseableUtils.closeQuietly(zookeeperClient)
  }

  /** Picks the instance whose URI string sorts first. */
  private def pickFirstInstance(services: List[ServiceInstance[String]]): URI = {
    assert(services.size > 1)
    sortedURIs(services).head
  }

  /** Picks a uniformly random instance. */
  private def pickRandomInstance(services: List[ServiceInstance[String]]): URI = {
    assert(services.size > 1)
    val candidates = sortedURIs(services)
    // nextInt's bound is exclusive, so the full size is needed for the last instance to be eligible.
    candidates(Random.nextInt(candidates.size))
  }

  /** Cycles through the instances of `name` in URI order, one step per call. */
  private def pickRoundRobinInstance(name: String, services: List[ServiceInstance[String]]): URI = {
    assert(services.size > 1)
    val candidates = sortedURIs(services)
    val counter = roundRobinCounterFor.get(name).getOrElse {
      val fresh = new AtomicInteger(0)
      // Another thread may have won the race; use whichever counter ended up in the map.
      roundRobinCounterFor.putIfAbsent(name, fresh).getOrElse(fresh)
    }
    // The modulo keeps the index in range when the instance count shrinks between calls;
    // floorMod keeps it non-negative after the counter wraps around Int.MaxValue.
    candidates(Math.floorMod(counter.getAndIncrement(), candidates.size))
  }

  /** URIs of `services`, ordered by their string form so every policy sees a stable order. */
  private def sortedURIs(services: List[ServiceInstance[String]]): List[URI] =
    toURIs(services).sortWith(_.toString < _.toString)

  /**
   * Converts instances to `scheme://address:port` URIs; an empty or `localhost`
   * address is replaced by the loopback host address.
   */
  private def toURIs(services: List[ServiceInstance[String]]): List[URI] =
    services.map { service =>
      val address = service.getAddress
      val serviceAddress =
        if (address == "" || address == "localhost") InetAddress.getLoopbackAddress.getHostAddress
        else address
      new URI(s"$scheme://$serviceAddress:${service.getPort}")
    }
}
/** Companion holding type aliases used by the `MyServiceLocator` class. */
object MyServiceLocator {
// Routing strategy: given a service name and its discovered instances, yields the URI to call (or None).
type InstancePicker[T] = (String, List[ServiceInstance[T]]) => Option[URI]
}
import com.lightbend.lagom.scaladsl.api.ServiceInfo
import org.apache.curator.x.discovery.{ServiceInstance, UriSpec}
import play.api.Configuration
import play.api.inject.ApplicationLifecycle
import scala.concurrent.Future
/**
 * Mix-in component that provides a ZooKeeper-backed service locator and
 * registers this service in ZooKeeper for the lifetime of the application.
 *
 * On initialisation it registers a service instance named after
 * `serviceInfo.serviceName`; on application stop it unregisters the instance
 * and closes the ZooKeeper connections it opened.
 */
trait MyServiceLocatorComponent {
  def applicationLifecycle: ApplicationLifecycle
  def configuration: Configuration
  def serviceInfo: ServiceInfo

  // Must be a single shared instance: every MyServiceLocator opens and starts its own
  // ZooKeeper client, so constructing one per access would leak a connection each time.
  lazy val serviceLocator: MyServiceLocator = new MyServiceLocator(configuration.underlying)

  // docker injects this env variable and makes sure that other services
  // can reach this container under this name
  // NOTE(review): System.getenv returns null when HOSTNAME is unset; confirm it is always provided.
  val hostname = System.getenv("HOSTNAME")
  val name = serviceInfo.serviceName
  val id = s"$name-$hostname"

  val registry = new MyServiceRegistry(serviceLocator.zookeeperUri, serviceLocator.zookeeperServicesPath)

  val serviceInstance = ServiceInstance.builder[String]()
    .name(name)
    .id(id)
    .address(hostname)
    .port(9000)
    .uriSpec(new UriSpec("{scheme}://{serviceAddress}:{servicePort}"))
    .build()

  registry.register(serviceInstance)

  applicationLifecycle.addStopHook { () =>
    try registry.unregister(serviceInstance)
    finally {
      // Release the ZooKeeper connections even when unregistering fails.
      registry.close()
      serviceLocator.close()
    }
    Future.successful(())
  }
}
import java.io.Closeable
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.curator.utils.CloseableUtils
import org.apache.curator.x.discovery.{ServiceDiscoveryBuilder, ServiceInstance}
/**
 * Thin wrapper around Curator service discovery for registering and
 * unregistering service instances in ZooKeeper.
 *
 * The constructor connects to ZooKeeper and starts service discovery;
 * [[close]] releases both.
 *
 * @param zookeeperUrl          ZooKeeper connection string
 * @param zookeeperServicesPath base znode path under which services are registered
 */
class MyServiceRegistry(val zookeeperUrl: String, val zookeeperServicesPath: String) extends Closeable {

  val zkClient = CuratorFrameworkFactory.newClient(
    zookeeperUrl,
    new ExponentialBackoffRetry(1000, 3)
  )
  zkClient.start()

  val serviceDiscovery =
    ServiceDiscoveryBuilder
      .builder(classOf[String])
      .client(zkClient)
      .basePath(zookeeperServicesPath)
      .build()
  serviceDiscovery.start()

  /** Registers `serviceInstance` with service discovery. */
  def register(serviceInstance: ServiceInstance[String]): Unit =
    serviceDiscovery.registerService(serviceInstance)

  /** Removes `serviceInstance` from service discovery. */
  def unregister(serviceInstance: ServiceInstance[String]): Unit =
    serviceDiscovery.unregisterService(serviceInstance)

  /** Closes service discovery first, then the ZooKeeper client, ignoring errors on close. */
  override def close(): Unit =
    Seq[Closeable](serviceDiscovery, zkClient).foreach(c => CloseableUtils.closeQuietly(c))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment