Skip to content

Instantly share code, notes, and snippets.

@dpsoft
Last active July 9, 2020 14:02
Show Gist options
  • Save dpsoft/ecafda65ba002d3225a649cc5f6dac89 to your computer and use it in GitHub Desktop.
Save dpsoft/ecafda65ba002d3225a649cc5f6dac89 to your computer and use it in GitHub Desktop.
Armeria Instrumentation(first working approach)
# =================================== #
# kamon-armeria reference configuration #
# =================================== #
kamon.instrumentation.armeria {
# Settings to control the HTTP Server instrumentation.
#
# IMPORTANT: Besides the "initial-operation-name" and "unhandled-operation-name" settings, the entire configuration of
# the HTTP Server Instrumentation is based on the constructs provided by the Kamon Instrumentation Common library
# which will always fallback to the settings found under the "kamon.instrumentation.http-server.default" path. The
# default settings have been included here to make them easy to find and understand in the context of this project and
# commented out so that any changes to the default settings will actually have effect.
#
server {
#
# Configuration for HTTP context propagation.
#
propagation {
# Enables or disables HTTP context propagation on this HTTP server instrumentation. Please note that if
# propagation is disabled then some distributed tracing features will not work as expected (e.g. Spans can
# be created and reported but will not be linked across boundaries nor take trace identifiers from tags).
#enabled = yes
# HTTP propagation channel to be used by this instrumentation. Take a look at the kamon.propagation.http.default
# configuration for more details on how to configure the default HTTP context propagation.
channel = "default"
}
#
# Configuration for HTTP server metrics collection.
#
metrics {
# Enables collection of HTTP server metrics. When enabled the following metrics will be collected, assuming
# that the instrumentation is fully compliant:
#
# - http.server.requests
# - http.server.request.active
# - http.server.request.size
# - http.server.response.size
# - http.server.connection.lifetime
# - http.server.connection.usage
# - http.server.connection.open
#
# All metrics have at least three tags: component, interface and port. Additionally, the http.server.requests
# metric will also have a status_code tag with the status code group (1xx, 2xx and so on).
#
#enabled = yes
}
#
# Configuration for HTTP request tracing.
#
tracing {
# Enables HTTP request tracing. When enabled the instrumentation will create Spans for incoming requests
# and finish them when the response is sent back to the clients.
#enabled = yes
# Select a context tag that provides a preferred trace identifier. The preferred trace identifier will be used
# only if all these conditions are met:
# - the context tag is present.
# - there is no parent Span on the incoming context (i.e. this is the first service on the trace).
# - the identifier is valid in accordance to the identity provider.
#preferred-trace-id-tag = "none"
# Enables collection of span metrics using the `span.processing-time` metric.
#span-metrics = on
# Select which tags should be included as span and span metric tags. The possible options are:
# - span: the tag is added as a Span tag (i.e. using span.tag(...))
# - metric: the tag is added as a Span metric tag (i.e. using span.tagMetric(...))
# - off: the tag is not used.
#
tags {
# Use the http.url tag.
#url = span
# Use the http.method tag.
#method = metric
# Use the http.status_code tag.
#status-code = metric
# Copy tags from the context into the Spans with the specified purpose. For example, to copy a customer_type
# tag from the context into the HTTP Server Span created by the instrumentation, the following configuration
# should be added:
#
# from-context {
# customer_type = span
# }
#
from-context {
}
}
# Controls writing trace and span identifiers to HTTP response headers sent by the instrumented servers. The
# configuration can be set to either "none" to disable writing the identifiers on the response headers or to
# the header name to be used when writing the identifiers.
response-headers {
# HTTP response header name for the trace identifier, or "none" to disable it.
#trace-id = "trace-id"
# HTTP response header name for the server span identifier, or "none" to disable it.
#span-id = none
}
# Custom mappings between routes and operation names.
operations {
# The default operation name to be used when creating Spans to handle the HTTP server requests. In most
# cases it is not possible to define an operation name right at the moment of starting the HTTP server Span
# and in those cases, this operation name will be initially assigned to the Span. Instrumentation authors
# should do their best effort to provide a suitable operation name or make use of the "mappings" facilities.
default = "http.server.request"
# The operation name to be assigned when an application cannot find any route/endpoint/controller to handle
# a given request. Depending on the instrumented framework, it might be possible to apply this operation
# name automatically or not, check the frameworks' instrumentation docs for more details.
unhandled = "unhandled"
# FQCN for a HttpOperationNameGenerator implementation, or one of the following shorthand forms:
# - default: Uses the set default operation name
# - method: Uses the request HTTP method as the operation name.
#
name-generator = "kamon.armeria.DefaultNameGenerator"
# Provides custom mappings from HTTP paths into operation names. Meant to be used in cases where the bytecode
# instrumentation is not able to provide a sensible operation name that is free of high cardinality values.
# For example, with the following configuration:
# mappings {
# "/organization/*/user/*/profile" = "/organization/:orgID/user/:userID/profile"
# "/events/*/rsvps" = "EventRSVPs"
# }
#
# Requests to "/organization/3651/user/39652/profile" and "/organization/22234/user/54543/profile" will have
# the same operation name "/organization/:orgID/user/:userID/profile".
#
# Similarly, requests to "/events/aaa-bb-ccc/rsvps" and "/events/1234/rsvps" will have the same operation
# name "EventRSVPs".
#
# The patterns are expressed as globs and the operation names are free form.
#
mappings {
}
}
}
}
# Settings to control the HTTP Client instrumentation
#
# IMPORTANT: The entire configuration of the HTTP Client Instrumentation is based on the constructs provided by the
# Kamon Instrumentation Common library which will always fallback to the settings found under the
# "kamon.instrumentation.http-client.default" path. The default settings have been included here to make them easy to
# find and understand in the context of this project and commented out so that any changes to the default settings
# will actually have effect.
#
client {
#
# Configuration for HTTP context propagation.
#
propagation {
# Enables or disables HTTP context propagation on this HTTP client instrumentation. Please note that if
# propagation is disabled then some distributed tracing features will not work as expected (e.g. Spans can
# be created and reported but will not be linked across boundaries nor take trace identifiers from tags).
#enabled = yes
# HTTP propagation channel to be used by this instrumentation. Take a look at the kamon.propagation.http.default
# configuration for more details on how to configure the default HTTP context propagation.
#channel = "default"
}
tracing {
# Enables HTTP request tracing. When enabled the instrumentation will create Spans for outgoing requests
# and finish them when the response is received from the server.
#enabled = yes
# Enables collection of span metrics using the `span.processing-time` metric.
#span-metrics = on
# Select which tags should be included as span and span metric tags. The possible options are:
# - span: the tag is added as a Span tag (i.e. using span.tag(...))
# - metric: the tag is added as a Span metric tag (i.e. using span.tagMetric(...))
# - off: the tag is not used.
#
tags {
# Use the http.url tag.
#url = span
# Use the http.method tag.
#method = metric
# Use the http.status_code tag.
#status-code = metric
# Copy tags from the context into the Spans with the specified purpose. For example, to copy a customer_type
# tag from the context into the HTTP Client Span created by the instrumentation, the following configuration
# should be added:
#
# from-context {
# customer_type = span
# }
#
from-context {
}
}
operations {
# The default operation name to be used when creating Spans to handle the HTTP client requests. The HTTP
# Client instrumentation will always try to use the HTTP Operation Name Generator configured below to get
# a name, but if it fails to generate it then this name will be used.
#default = "http.client.request"
# FQCN for a HttpOperationNameGenerator implementation, or one of the following shorthand forms:
# - hostname: Uses the request Host as the operation name.
# - method: Uses the request HTTP method as the operation name.
#
name-generator = "kamon.armeria.PathOperationNameGenerator"
}
}
}
}
# Kanela agent settings used by this instrumentation.
kanela {
# Do not print the Kanela banner.
show-banner = false
modules {
# Registers the Armeria server instrumentation as a Kanela module.
armeria-server {
name = "Armeria Server Instrumentation"
stoppable = true
# Instrumentation classes that belong to this module.
instrumentations = [
"kamon.armeria.instrumentation.server.ArmeriaHttpServerInstrumentation"
]
# Package patterns the module is scoped to (presumably regular expressions matched against class names;
# verify against the Kanela documentation).
within = [ "io.netty..*", "com.linecorp.armeria..*"]
}
}
}
package kamon.armeria
import kamon.instrumentation.http.HttpMessage.Request
import kamon.instrumentation.http.HttpOperationNameGenerator
/** Placeholder object for the `kamon.armeria` package; it declares no members in this file. */
object Armeria
/**
  * Operation name generator that derives a dotted name from the request path and appends the
  * lower-cased HTTP method, e.g. `GET /foo/bar` becomes `foo.bar.get`.
  *
  * Generated names are memoised in a concurrent map keyed by the method concatenated with the path.
  */
class DefaultNameGenerator extends HttpOperationNameGenerator {

  import java.util.Locale
  import scala.collection.concurrent.TrieMap

  // Memoised operation names, keyed by "<method><path>".
  private val localCache = TrieMap.empty[String, String]

  // Matches `$paramname<regexp>` path fragments so that only `paramname` is kept.
  private val normalizePattern = """\$([^<]+)<[^>]+>""".r

  /**
    * Returns the operation name for the given request, computing and caching it on first use.
    *
    * @param request the incoming request; only its `method` and `path` are read.
    * @return always a `Some` holding the generated name.
    */
  def name(request: Request): Option[String] = {
    val cacheKey = s"${request.method}${request.path}"
    // The second argument is by-name, so the name is only built on a cache miss.
    Some(localCache.getOrElseUpdate(cacheKey, buildName(request)))
  }

  /** Converts paths of form GET /foo/bar/$paramname<regexp>/blah to foo.bar.paramname.blah.get */
  private def buildName(request: Request): String = {
    val dotted = normalizePattern
      .replaceAllIn(request.path, "$1")
      .replace('/', '.')
      .dropWhile(_ == '.')

    // Make sure a non-empty prefix ends with exactly one separator before the method suffix.
    val prefix =
      if (dotted.isEmpty || dotted.endsWith(".")) dotted
      else s"$dotted."

    s"$prefix${request.method.toLowerCase(Locale.ENGLISH)}"
  }
}
package kamon.armeria.instrumentation.server
import com.linecorp.armeria.server._
import io.netty.channel.ChannelPipeline
import kanela.agent.api.instrumentation.InstrumentationBuilder
import kanela.agent.api.instrumentation.bridge.FieldBridge
import kanela.agent.libs.net.bytebuddy.asm.Advice
/**
  * Kanela instrumentation definition for the Armeria HTTP server. It declares two targets:
  * Netty channels, which receive a mixin able to hold a RequestProcessingContext, and Armeria's
  * HttpServerPipelineConfigurator, whose `configureHttp` method is advised.
  */
class ArmeriaHttpServerInstrumentation extends InstrumentationBuilder {
// Every subtype of Netty's Channel gets HasRequestProcessingContextMixin mixed in.
onSubTypesOf("io.netty.channel.Channel")
.mixin(classOf[HasRequestProcessingContextMixin])
// Bridge InternalState onto the pipeline configurator and apply ConfigureMethodAdvisor to `configureHttp`.
onType("com.linecorp.armeria.server.HttpServerPipelineConfigurator")
.bridge(classOf[InternalState])
.advise(method("configureHttp"), classOf[ConfigureMethodAdvisor])
}
/** Advice class applied to `HttpServerPipelineConfigurator.configureHttp`; the advice itself lives in the companion object. */
class ConfigureMethodAdvisor
object ConfigureMethodAdvisor {
/**
  * Runs when the advised method exits and installs the Kamon request/response handlers in the
  * channel pipeline passed as the method's first argument.
  *
  * @param configurer the instrumented instance; it is cast to InternalState to read its server port.
  * @param p          the channel pipeline received by the advised method.
  */
@Advice.OnMethodExit
def around(@Advice.This configurer:Object,
@Advice.Argument(0) p: ChannelPipeline): Unit = {
// The cast relies on InternalState being bridged onto the instrumented type.
val serverPort = configurer.asInstanceOf[InternalState].getServerPort
val hostName = serverPort.localAddress().getHostName
val port = serverPort.localAddress().getPort
// NOTE(review): assumes a handler named "HttpServerHandler#0" is already in the pipeline at this point - confirm.
p.addBefore("HttpServerHandler#0", "armeria-http-server-request-handler", ArmeriaHttpServerRequestHandler(hostName, port))
p.addLast("armeria-http-server-response-handler", ArmeriaHttpServerResponseHandler())
}
}
/**
  * Accessor pair for the [[RequestProcessingContext]] attached to an object. In this file it is
  * implemented by [[HasRequestProcessingContextMixin]].
  */
trait HasRequestProcessingContext {

  /**
    * Stores the processing context.
    *
    * @param requestProcessingContext the context to store.
    */
  // Explicit `: Unit` replaces the deprecated procedure syntax (an abstract def without a result type).
  def setRequestProcessingContext(requestProcessingContext: RequestProcessingContext): Unit

  /** Returns the stored processing context; implementations may return `null` when none has been set. */
  def getRequestProcessingContext: RequestProcessingContext
}
/**
  * Mixin implementation of HasRequestProcessingContext that keeps the context in a volatile field.
  * The field starts out as `null` until the setter is called.
  */
class HasRequestProcessingContextMixin extends HasRequestProcessingContext {
// Volatile, so presumably intended to be read from a thread other than the writer - TODO confirm.
@volatile var requestProcessingContext:RequestProcessingContext = _
// Overwrites whatever context was stored before.
override def setRequestProcessingContext(requestProcessing: RequestProcessingContext): Unit =
requestProcessingContext = requestProcessing
// Returns the last stored context, or null if none has been stored.
override def getRequestProcessingContext:RequestProcessingContext =
requestProcessingContext
}
/**
  * Bridge interface used to read internal state of the instrumented pipeline configurator.
  */
trait InternalState {
// Bridged to the field named "port" of the instrumented type (presumably a non-public field; verify
// against the Armeria version in use).
@FieldBridge("port")
def getServerPort: ServerPort
}
package com.linecorp.armeria.server
import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter, ChannelOutboundHandlerAdapter, ChannelPromise}
import io.netty.handler.codec.http.HttpResponse
import kamon.Kamon
import kamon.armeria.instrumentation.server.HasRequestProcessingContext
import kamon.context.Storage
import kamon.instrumentation.http.HttpServerInstrumentation.RequestHandler
import kamon.instrumentation.http.{HttpMessage, HttpServerInstrumentation}
import kanela.agent.util.log.Logger
/**
  * Per-request state kept on the channel between the inbound request and the outbound response.
  *
  * @param requestHandler the Kamon request handler created for the request.
  * @param scope          the scope returned when the request's context was stored; closed by the
  *                       response handler once the response has been written.
  */
// Marked final: case classes are not meant to be extended.
final case class RequestProcessingContext(requestHandler: RequestHandler, scope: Storage.Scope)
/**
  * Inbound Netty handler that starts Kamon's HTTP server instrumentation for every decoded
  * Armeria request and stores the resulting handler and context scope on the channel.
  *
  * @param serverHost host name reported to the instrumentation.
  * @param serverPort port reported to the instrumentation.
  */
final class ArmeriaHttpServerRequestHandler(serverHost: String, serverPort: Int) extends ChannelInboundHandlerAdapter {

  private val httpServerConfig = Kamon.config().getConfig("kamon.instrumentation.armeria.server")
  private val serverInstrumentation =
    HttpServerInstrumentation.from(httpServerConfig, "armeria.server", serverHost, serverPort)

  /**
    * For decoded HTTP requests, creates a request handler, stores its context and remembers both on
    * the channel. Every message, whatever its type, is then passed on to the next inbound handler.
    */
  override def channelRead(ctx: ChannelHandlerContext, msg: Any): Unit = {
    msg match {
      case request: DecodedHttpRequest =>
        // The channel is expected to carry the HasRequestProcessingContext mixin.
        val channelState = ctx.channel().asInstanceOf[HasRequestProcessingContext]
        val handler = serverInstrumentation.createHandler(toRequest(request, serverHost, serverPort))
        val scope = Kamon.storeContext(handler.context)
        channelState.setRequestProcessingContext(RequestProcessingContext(handler, scope))
        Logger.info(() => s"Context Request Hash => ${handler.context.hashCode()}")

      case _ => // Not an HTTP request: nothing to record.
    }

    ctx.fireChannelRead(msg)
  }

  /** Adapts an Armeria request to the read-only request view used by the Kamon instrumentation. */
  private def toRequest(request: DecodedHttpRequest, serverHost: String, serverPort: Int): HttpMessage.Request =
    new HttpMessage.Request {
      import scala.jdk.CollectionConverters._

      override def host: String = serverHost

      override def port: Int = serverPort

      override def method: String = request.method().name()

      override def path: String = request.path()

      override def url: String = request.uri().toString

      // A missing header yields None rather than null.
      override def read(header: String): Option[String] = {
        val value = request.headers().get(header)
        Option(value)
      }

      override def readAll(): Map[String, String] =
        request.headers().asScala.map(entry => (entry.getKey.toString(), entry.getValue)).toMap
    }
}

object ArmeriaHttpServerRequestHandler {

  /** Factory for a request handler bound to the given server host and port. */
  def apply(serverHost: String, serverPort: Int): ArmeriaHttpServerRequestHandler = {
    val handler = new ArmeriaHttpServerRequestHandler(serverHost, serverPort)
    handler
  }
}
/**
  * Outbound Netty handler that completes Kamon's HTTP server instrumentation when an HTTP response
  * is written: it lets the request handler inspect/decorate the response, forwards the write, then
  * signals that the response was sent and closes the context scope stored for the request.
  */
class ArmeriaHttpServerResponseHandler extends ChannelOutboundHandlerAdapter {

  /**
    * Forwards every message down the pipeline. For HTTP responses that have a stored
    * [[RequestProcessingContext]] on the channel, the instrumentation is finished around the write.
    */
  override def write(ctx: ChannelHandlerContext, msg: Any, promise: ChannelPromise): Unit =
    msg match {
      case response: HttpResponse =>
        val channelState = ctx.channel().asInstanceOf[HasRequestProcessingContext]

        // The stored context is null until a request has been seen on this channel; a response
        // written before that must still be forwarded instead of failing with a NullPointerException.
        Option(channelState.getRequestProcessingContext) match {
          case Some(processingContext) =>
            processingContext.requestHandler.buildResponse(toResponse(response), processingContext.scope.context)
            Logger.info(() => s"Context Response Hash => ${processingContext.scope.context.hashCode()}")

            try ctx.write(msg, promise)
            finally {
              // Close the scope even when responseSent() throws.
              try processingContext.requestHandler.responseSent()
              finally processingContext.scope.close()
            }

          case None =>
            ctx.write(msg, promise)
        }

      case _ =>
        ctx.write(msg, promise)
    }

  /** Adapts a Netty response to the builder used by the Kamon instrumentation; `build()` returns the same response. */
  private def toResponse(response: HttpResponse): HttpMessage.ResponseBuilder[HttpResponse] =
    new HttpMessage.ResponseBuilder[HttpResponse] {
      override def build(): HttpResponse =
        response

      override def statusCode: Int =
        response.status().code()

      // Headers are added to (not replaced on) the underlying response.
      override def write(header: String, value: String): Unit =
        response.headers().add(header, value)
    }
}

object ArmeriaHttpServerResponseHandler {

  /** Factory for a new response handler. */
  def apply(): ArmeriaHttpServerResponseHandler =
    new ArmeriaHttpServerResponseHandler()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment