Skip to content

Instantly share code, notes, and snippets.

@danthelion
Last active October 20, 2022 11:55
Show Gist options
  • Save danthelion/cb972b669bbbfcaea0461f4fe5b37754 to your computer and use it in GitHub Desktop.
Save danthelion/cb972b669bbbfcaea0461f4fe5b37754 to your computer and use it in GitHub Desktop.
check schema before producing
client = SchemaRegistryClient(url="http://127.0.0.1:8081")
class User(BaseModel):
ts: str
name: str
country: str
age: int
for i in range(100):
data = User(
ts=datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"),
name=fake.name(),
country=fake.country(),
age=fake.random_int(min=0, max=100),
)
# Get the latest schema from the registry and serialize the message with it
compatibility = client.test_compatibility("USERS-value", data.schema_json(), schema_type="JSON")
if not compatibility:
raise Exception("Schema is not compatible with the latest version")
producer.send(topic=TOPIC, key=str(i).encode("utf-8"), value=json.dumps(data.dict()).encode("utf-8"))
print(f"Sent message {i} -> {data}")
sleep(2)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment