Skip to content

Instantly share code, notes, and snippets.

@Lancetnik
Created September 18, 2023 17:36
Show Gist options
  • Save Lancetnik/e479184d08f3322a705235147254c5c5 to your computer and use it in GitHub Desktop.
Save Lancetnik/e479184d08f3322a705235147254c5c5 to your computer and use it in GitHub Desktop.
FastStream Kafka RPC
import asyncio
from typing import Annotated, Any
from uuid import uuid4
from faststream import FastStream, Context, context
from faststream.kafka import KafkaBroker
context.set_global("responses", {})
Responses = Annotated[
dict[str, asyncio.Future[Any]],
Context("responses"),
]
CorrelationId = Annotated[str, Context("message.correlation_id")]
broker = KafkaBroker()
app = FastStream(broker)
@broker.subscriber("responses")
async def handle_responses(
msg: Any,
cor_id: CorrelationId,
responses: Responses,
):
if (future := responses.get(cor_id)):
future.set_result(msg)
@broker.subscriber("in")
async def just_handler():
return "Response"
@app.after_startup
async def rpc_pub(responses: Responses):
cor_id = str(uuid4())
future = responses[cor_id] = asyncio.Future()
await broker.publish(
message="test",
topic="in",
reply_to="responses",
correlation_id=cor_id,
)
result = await future
del responses[cor_id]
assert result == "Response"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment