Skip to content

Instantly share code, notes, and snippets.

@highsmallxu
Last active May 16, 2020 12:44
Show Gist options
  • Save highsmallxu/b47e2e58902bcd9d94322bdfc763b558 to your computer and use it in GitHub Desktop.
Save highsmallxu/b47e2e58902bcd9d94322bdfc763b558 to your computer and use it in GitHub Desktop.
import fastavro
import json
json_stream = faust_app.topic('my-json-topic', value_type=bytes)
# manual deseralization of JSON
@app.agent(json_stream)
async def processor(records):
async for record in records:
des_data = json.loads(record)
# manual deseralization of AVRO
avro_stream = faust_app.topic('my-avro-topic', value_type=bytes)
avro_schema_str = {
'doc': 'A schema',
'name': 'Transaction',
'namespace': 'test',
'type': 'record',
'fields': [
{'name': 'amount', 'type': 'float'},
{'name': 'receiver', 'type': 'str'},
{'name': 'date', 'type': 'str'},
],
}
def fast_avro_decode(schema, encoded_message):
stringio = io.BytesIO(encoded_message)
return fastavro.schemaless_reader(stringio, schema)
@app.agent(avro_stream)
async def processor(records):
async for record in records:
schema = fastavro.parse_schema(avro_schema_str)
des_data = fast_avro_decode(schema, record)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment