Skip to content

Instantly share code, notes, and snippets.

@erewok
Last active March 7, 2023 22:37
Show Gist options
  • Save erewok/7cf27c54673f0e6fa0265e5ef24df6ff to your computer and use it in GitHub Desktop.
Save erewok/7cf27c54673f0e6fa0265e5ef24df6ff to your computer and use it in GitHub Desktop.
Azure service-bus Receiver with Opentelemetry Tracing Bug
# Poetry project definition for the Service Bus / OpenTelemetry reproduction script.
[tool.poetry]
name = "service-bus-otel-test"
version = "1.0.0"
description = "Service Bus otel test"
authors = ["Erik Aker <eraker@gmail.com>"]

[tool.poetry.dependencies]
python = "^3.11"
azure-core = "^1.26.3"
# Exact pin (no caret) -- presumably the release the warning was reproduced
# against; confirm before bumping.
azure-servicebus = "7.8.2"
# all otel stuff in here
opentelemetry-api = "^1.16.0"
opentelemetry-sdk = "^1.16.0"
# exporter
opentelemetry-exporter-otlp-proto-grpc = "^1.16.0"
# instrumenters
azure-core-tracing-opentelemetry = "^1.0.0b9"
uamqp = "^1.6.4"

# Declared exactly once: TOML does not allow the same table to be defined
# more than once, so repeated [build-system] headers make the file unparseable.
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
from contextlib import asynccontextmanager
import datetime
import logging.config
import os
from azure.core.tracing.ext.opentelemetry_span import OpenTelemetrySpan
from azure.core.settings import settings
from azure.servicebus import ServiceBusMessage
from azure.servicebus.aio import ServiceBusClient
# otel libs
from opentelemetry.sdk import resources # type: ignore
from opentelemetry.sdk.trace import TracerProvider # type: ignore
from opentelemetry import trace # type: ignore
# unused here, but can be used to export spans to a collector
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter # type: ignore
from opentelemetry.sdk.trace.export import BatchSpanProcessor # type: ignore
# Service Bus connection details are read from the environment; a missing
# variable raises KeyError as soon as the module is imported.
CONNECTION_STR = os.environ["SERVICEBUS_CONNECTION_STR"]
QUEUE_NAME = os.environ["SERVICEBUS_QUEUE_NAME"]

# Module-level tracer used by the receive handler to open its spans.
tracer: trace.Tracer = trace.get_tracer(__name__)
# Logging setup for the demo: a single stream handler on the root logger, with
# the "azure" and "uamqp" logger trees capped at WARNING.
logging.config.dictConfig({
    "version": 1,
    "disable_existing_loggers": False,
    "handlers": {
        "default": {
            "level": "WARNING",
            "class": "logging.StreamHandler",
        },
    },
    "loggers": {
        "": {"handlers": ["default"], "level": "INFO"},
        # Per-logger settings only take effect inside "loggers"; as top-level
        # keys of the config dict they are never applied. No handler is
        # attached here: records propagate to the root logger's handler, and
        # attaching the same handler again would emit every record twice.
        "azure": {"level": "WARNING"},
        "uamqp": {"level": "WARNING"},
    },
})
def setup_otel():
    """Switch the Azure SDK to OpenTelemetry tracing and install a tracer provider.

    Registers ``OpenTelemetrySpan`` as the Azure SDK tracing implementation,
    builds a ``TracerProvider`` whose resource carries the deployment
    environment, service name and service version, and sets it as the global
    tracer provider.
    """
    # Tell azure-core to produce its spans through the OpenTelemetry plugin.
    settings.tracing_implementation = OpenTelemetrySpan

    # Most tracing backends require at least a service name on the resource.
    resource_attributes = {
        resources.DEPLOYMENT_ENVIRONMENT: "local",
        resources.SERVICE_NAME: "service-bus-otel-test",
        resources.SERVICE_VERSION: "0.0.1",
    }
    tracer_provider = TracerProvider(
        resource=resources.Resource(attributes=resource_attributes)
    )

    # No span processor is attached in this demo. To ship spans to a
    # collector, add one before registering the provider:
    #     processor = BatchSpanProcessor(
    #         OTLPSpanExporter(endpoint="http://localhost:4317", insecure=True)
    #     )
    #     tracer_provider.add_span_processor(processor)
    trace.set_tracer_provider(tracer_provider)
@asynccontextmanager
async def start_span(
    tracer: trace.Tracer,
    name: str,
):
    """Open a CONSUMER span called *name* for the duration of an ``async with`` body.

    The span is started through ``tracer.start_as_current_span`` and the
    resulting span object is yielded to the caller; it is closed when the
    ``async with`` block exits.

    NOTE: no explicit parent context (for example one derived from a
    message's diagnostic id) is passed when the span is started.
    """
    span_scope = tracer.start_as_current_span(name, kind=trace.SpanKind.CONSUMER)
    with span_scope as active_span:
        yield active_span
async def on_receive(msg: object):
    """Handle one received message by printing it inside a tracing span.

    msg: the received item; it is only formatted into the output line, so any
        object with a meaningful ``str()`` works (the queue loop in this file
        passes the message object yielded by the receiver, not a plain string).
    """
    # The span object itself is not needed, so it is not bound to a name.
    async with start_span(tracer, "service-bus-otel-test"):
        # Timezone-aware UTC timestamp: datetime.utcnow() is deprecated and
        # returns a naive value. The printed time now carries a +00:00 offset.
        received_at = datetime.datetime.now(tz=datetime.timezone.utc)
        print(f"MSG RECEIVED [{msg}] at {received_at}")
async def receiver():
    """Receive messages from the configured queue and complete each one.

    Opens a Service Bus client from ``CONNECTION_STR``, obtains a receiver for
    ``QUEUE_NAME`` and iterates over it. Every message is handed to
    ``on_receive`` and then completed on the queue.
    """
    client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR)
    async with client:
        queue_receiver = client.get_queue_receiver(queue_name=QUEUE_NAME)
        async with queue_receiver:
            async for incoming in queue_receiver:
                await on_receive(incoming)
                # Completed only after the handler has returned; if the
                # handler raises, this line is never reached.
                await queue_receiver.complete_message(incoming)
async def sender(msg: str, delay: int):
    """Schedule a single message on the configured queue.

    msg: body of the message to send
    delay: number of seconds from now (UTC) used as the scheduled time
    """
    outgoing = ServiceBusMessage(msg)
    # Timezone-aware "now" in UTC, shifted by the requested delay.
    deliver_at = datetime.datetime.now(
        tz=datetime.timezone.utc
    ) + datetime.timedelta(seconds=delay)

    client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR)
    async with client:
        queue_sender = client.get_queue_sender(queue_name=QUEUE_NAME)
        async with queue_sender:
            await queue_sender.schedule_messages(outgoing, deliver_at)
async def main(args):
    """Set up tracing, then either send one message or run the receiver.

    args: parsed command-line namespace; ``args.send`` is the message to send
        (``None`` when the option was not given) and ``args.delay`` is the
        delivery delay in seconds.

    Receiving is the default whenever no message was supplied; the
    ``--receiver`` flag is not consulted here.

    Raises:
        ValueError: if ``--send`` was given with an empty message.
    """
    setup_otel()

    if args.send is None:
        print("STARTING RECEIVER")
        await receiver()
        return

    msg = args.send
    # Explicit validation instead of ``assert``: asserts are stripped under
    # ``python -O``, and the previous truthiness branch meant an empty message
    # silently started the receiver instead of being rejected.
    if not msg:
        raise ValueError("Message must not be empty")

    # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated).
    sent_at = datetime.datetime.now(tz=datetime.timezone.utc)
    print(f"SENDING MESSAGE [{msg}] WITH DELAY {args.delay} at {sent_at}")
    await sender(msg, delay=args.delay)
if __name__ == "__main__":
    # TO SEND:
    # $ python service-bus-otel-test.py -s test20 -d 20
    # TO RECEIVE:
    # $ python service-bus-otel-test.py -r
    import argparse
    import asyncio

    # The first positional parameter of ArgumentParser is ``prog``; the title
    # is passed as ``description`` so the usage line keeps the real script name.
    parser = argparse.ArgumentParser(description="Azure Queues Test")
    parser.add_argument("-s", "--send", type=str, help="Send value into the queue")
    parser.add_argument("-d", "--delay", type=int, help="Message delay", default=0)
    parser.add_argument("-r", "--receiver", action="store_true", help="run receiver")
    args = parser.parse_args()

    asyncio.run(main(args))
@erewok
Copy link
Author

erewok commented Mar 7, 2023

Run Sender in one terminal with:

❯ python service-bus-otel-test.py -s test0 -d 0
SENDING MESSAGE [test0] WITH DELAY 0 at 2023-03-07 20:05:06.987793

Run Receiver in another terminal and see:

❯ python service-bus-otel-test.py -r
STARTING RECEIVER
/my-repo/.venv/lib/python3.11/site-packages/azure/servicebus/_common/utils.py:287: RuntimeWarning: coroutine 'BaseHandler._add_span_request_attributes' was never awaited
  receiver._add_span_request_attributes(receive_span)  # type: ignore  # pylint: disable=protected-access
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
MSG RECEIVED [test0] at 2023-03-07 20:05:08.326821

@erewok
Copy link
Author

erewok commented Mar 7, 2023

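The problem appears to be that `BaseHandler._add_span_request_attributes` is an async method (calling it only creates a coroutine), but it is called without `await` at `azure/servicebus/_common/utils.py`, line 287, so the coroutine is never awaited and triggers the `RuntimeWarning` shown above.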

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment