/// <summary>
/// Adapts a <see cref="SchemaRegistryAvroObjectSerializer"/> to the
/// <see cref="IAsyncSerializer{T}"/> interface so that values of type
/// <typeparamref name="T"/> can be turned into a byte array asynchronously.
/// </summary>
/// <typeparam name="T">The type of the values handed to <see cref="SerializeAsync"/>.</typeparam>
public class KafkaAvroAsyncSerializer<T> : IAsyncSerializer<T>
{
    private readonly SchemaRegistryAvroObjectSerializer serializer;

    /// <summary>
    /// Creates the underlying <see cref="SchemaRegistryAvroObjectSerializer"/> on top of a new
    /// <see cref="SchemaRegistryClient"/> built from the given URL and credential.
    /// </summary>
    /// <param name="schemaRegistryUrl">Passed as the first argument to the <see cref="SchemaRegistryClient"/> constructor.</param>
    /// <param name="credential">Credential passed to the <see cref="SchemaRegistryClient"/> constructor.</param>
    /// <param name="schemaGroup">Schema group name passed to the underlying serializer.</param>
    /// <param name="autoRegisterSchemas">
    /// Value assigned to <c>SchemaRegistryAvroObjectSerializerOptions.AutoRegisterSchemas</c>;
    /// defaults to <c>false</c>.
    /// </param>
    public KafkaAvroAsyncSerializer(string schemaRegistryUrl, TokenCredential credential, string schemaGroup, bool autoRegisterSchemas = false)
    {
        var client = new SchemaRegistryClient(schemaRegistryUrl, credential);
        var options = new SchemaRegistryAvroObjectSerializerOptions
        {
            AutoRegisterSchemas = autoRegisterSchemas
        };

        this.serializer = new SchemaRegistryAvroObjectSerializer(client, schemaGroup, options);
    }

    /// <summary>
    /// Serializes <paramref name="o"/> into a new byte array using the underlying serializer.
    /// </summary>
    /// <param name="o">The value to serialize; may be <c>null</c>.</param>
    /// <param name="context">Serialization context supplied by the caller; not used by this implementation.</param>
    /// <returns>
    /// <c>null</c> when <paramref name="o"/> is <c>null</c>; otherwise the bytes the underlying
    /// serializer wrote for <paramref name="o"/>.
    /// </returns>
    public async Task<byte[]> SerializeAsync(T o, SerializationContext context)
    {
        // NOTE(review): a null payload is passed through as null rather than serialized;
        // presumably this maps to a Kafka tombstone/null value - confirm against the producer.
        if (o == null)
        {
            return null;
        }

        using (var stream = new MemoryStream())
        {
            // ConfigureAwait(false): this is library code with no need to resume on the caller's
            // synchronization context, and callers that block on the returned task would
            // otherwise risk a deadlock on single-threaded contexts.
            // No cancellation token is available from the parameters, hence CancellationToken.None.
            await this.serializer
                .SerializeAsync(stream, o, typeof(T), CancellationToken.None)
                .ConfigureAwait(false);

            return stream.ToArray();
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment