Skip to content

Instantly share code, notes, and snippets.

Last active July 6, 2018 17:29
Show Gist options
  • Save fubhy/d30944689ae6733ccbb37124e7f0d5ed to your computer and use it in GitHub Desktop.
Avro serializer with cached schema registry
/** First byte of every framed message; any other value is rejected. */
const MAGIC_BYTE = 0;

/** Header size: 1 magic byte followed by a 4-byte big-endian schema id. */
const HEADER_LENGTH = 5;

/**
 * Decodes schema-registry-framed Avro messages.
 *
 * Wire layout: byte 0 is MAGIC_BYTE, bytes 1-4 hold the schema id as a
 * big-endian int32, and the Avro-encoded payload starts at byte 5.
 */
class AvroSerializer {
  private schemaRegistry;

  /**
   * @param schemaRegistry Object whose `getById(id)` resolves to an avsc-style
   *   type exposing `decode(buffer, offset)`, or to a falsy value when the id
   *   is unknown.
   */
  constructor(schemaRegistry) {
    this.schemaRegistry = schemaRegistry;
  }

  /**
   * Decodes one framed message.
   *
   * NOTE(review): the result is whatever the writer schema describes (often a
   * record object), not necessarily a string; the declared return type is kept
   * so existing callers still compile.
   *
   * @param message Buffer holding the framed message.
   * @returns The decoded Avro value.
   * @throws Error when the magic byte is missing, the header is incomplete,
   *   the schema id cannot be resolved to a decoder, or the payload is truncated.
   */
  public async decode(message): Promise<string> {
    if (message[0] !== MAGIC_BYTE) {
      throw new Error('Message does not start with magic byte.');
    }
    // Checked explicitly so a short buffer yields a clear error instead of the
    // RangeError readInt32BE would throw.
    if (message.length < HEADER_LENGTH) {
      throw new Error(`Message is too short (${message.length} bytes) to contain a schema id.`);
    }
    const schemaId = message.readInt32BE(1);
    const type = await this.schemaRegistry.getById(schemaId);
    // The registry may answer null for unknown ids; fail with context instead
    // of a TypeError on `.decode`.
    if (!type || typeof type.decode !== 'function') {
      throw new Error(`No decodable schema found for id ${schemaId}.`);
    }
    const decoded = type.decode(message, HEADER_LENGTH);
    // avsc reports a truncated buffer with offset -1 (and an undefined value).
    if (decoded.offset < 0) {
      throw new Error(`Truncated Avro payload for schema id ${schemaId}.`);
    }
    return decoded.value;
  }
}

export default AvroSerializer;
import axios from 'axios';
import { parse } from 'avsc';
// Accept-header value listing schema-registry media types, most specific first.
const ACCEPT_HEADER = 'application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/json';
/**
 * Client for a schema registry's HTTP API that caches schemas by id and,
 * per subject, the id and version registered for each schema.
 */
class CachedSchemaRegistryClient {
private url: string;
// id -> schema. NOTE(review): declared Map<number, string>, but getById()
// stores parsed avsc types here while register()/getVersion() store the
// caller's schema, so the values are mixed — confirm the intended type.
private idToSchema: Map<number, string> = new Map();
// subject -> (schema -> registered schema id)
private subjectToSchemaId: Map<string, Map<string, number>> = new Map();
// subject -> (schema -> subject version)
private subjectToSchemaVersion: Map<string, Map<string, number>> = new Map();
// NOTE(review): the constructor's opening line is missing from this copy;
// given `this.url = url` below it presumably also takes a `url: string`
// parameter ahead of these — confirm. Its closing brace is missing too.
// The certificate locations are accepted but not used yet (see TODO).
caLocation?: string,
certLocation?: string,
keyLocation?: string,
) {
this.url = url;
// TODO: Handle certificates.
/**
 * Sends a request to the schema registry. Resolves with the raw axios
 * response for every HTTP status and leaves status handling to the caller.
 *
 * @param url Absolute request URL.
 * @param method HTTP method; must appear in VALID_METHODS.
 * @param body Optional payload; axios serializes plain objects as JSON.
 * @param headers Extra headers; they override the defaults below.
 * @throws Error when `method` is not in VALID_METHODS.
 */
private sendRequest(
  url: string,
  method: string = 'GET',
  body?: any,
  headers: any = {}
) {
  // NOTE(review): VALID_METHODS is not defined in the visible source; assumed
  // to be a module-level list of allowed method names.
  if (!VALID_METHODS.includes(method)) {
    throw new Error(`Method "${method}" is invalid. Valid methods include "${VALID_METHODS.join(", ")}".`);
  }
  const headerDefaults = {
    Accept: ACCEPT_HEADER,
  };
  // Content-Length is deliberately not set by hand. Object.keys(body).length
  // counts top-level keys, not serialized bytes. axios derives the correct
  // value from the encoded payload.
  const headerDefaultsForBody = body ? {
    'Content-Type': 'application/vnd.schemaregistry.v1+json',
  } : {};
  return axios.request({
    url,
    method,
    headers: {
      ...headerDefaults,
      ...headerDefaultsForBody,
      ...headers,
    },
    data: body,
    // axios rejects non-2xx responses by default. Resolve them instead so the
    // callers' status checks (404/409/422, ...) are reachable.
    validateStatus: () => true,
  });
}
/**
 * Stores `value` under `schema` in the inner map that `cache` keeps for
 * `subject`. The inner map is created the first time the subject is seen.
 */
private addToCache(cache, subject, schema, value): void {
  // Map#set returns the map itself, so the freshly created inner map can be
  // fetched in the same expression.
  const schemasForSubject = cache.has(subject)
    ? cache.get(subject)
    : cache.set(subject, new Map()).get(subject);
  schemasForSubject.set(schema, value);
}
/**
 * Caches a schema by id and, optionally, by subject.
 *
 * An id that is already cached keeps its first-seen schema; nothing is
 * overwritten. When `subject` is given, the subject's schema->id map is
 * updated. When `version` is also given (truthy), the subject's
 * schema->version map is updated too.
 *
 * The return type is left as `any` because the id cache holds both raw
 * schemas (from register/getVersion) and parsed avsc types (from getById).
 *
 * @param schema Schema to cache; also the key used in the per-subject maps.
 * @param schemaId Registry id of the schema.
 * @param subject Optional subject to index the schema under.
 * @param version Optional subject version; ignored when falsy.
 * @returns The schema cached for `schemaId` (the pre-existing one, if any).
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
private cacheSchema(schema, schemaId: number, subject?: string, version?: number): any {
  // Never overwrite an id that is already cached.
  let actualSchema = schema;
  if (this.idToSchema.has(schemaId)) {
    actualSchema = this.idToSchema.get(schemaId);
  } else {
    this.idToSchema.set(schemaId, actualSchema);
  }
  if (subject) {
    // Key by the caller's schema, which is what register()/getVersion() look
    // up by. Keying by a previously cached representation of this id would
    // make those lookups miss.
    this.addToCache(this.subjectToSchemaId, subject, schema, schemaId);
    if (version) {
      this.addToCache(this.subjectToSchemaVersion, subject, schema, version);
    }
  }
  return actualSchema;
}
/**
 * Registers `avroSchema` under `subject` and returns the registry's id for it.
 *
 * Ids are cached per subject, so registering the same schema again does not
 * contact the registry.
 *
 * NOTE(review): `avroSchema` is typed as string but is passed through
 * JSON.stringify, which suggests callers pass a schema object. If they pass
 * schema JSON text instead, it gets double-encoded — confirm with callers.
 *
 * @param subject Subject to register under.
 * @param avroSchema Schema to register; also the cache key.
 * @returns The schema id reported by the registry.
 * @throws Error on 409 (incompatible), 422 (invalid), any other non-2xx
 *   status, or a response without a numeric id.
 */
public async register(subject: string, avroSchema: string): Promise<number> {
  const schemasToId = this.subjectToSchemaId.get(subject);
  const cachedId = schemasToId && schemasToId.get(avroSchema);
  if (typeof cachedId !== 'undefined') {
    return cachedId;
  }
  // Encode the subject so characters such as '/' or '?' cannot alter the path.
  const url = `${this.url}/subjects/${encodeURIComponent(subject)}/versions`;
  const body = {
    schema: JSON.stringify(avroSchema),
  };
  const response = await this.sendRequest(url, 'POST', body);
  if (response.status === 409) {
    throw new Error('Incompatible Avro schema.');
  }
  if (response.status === 422) {
    throw new Error('Invalid Avro schema.');
  }
  if (response.status < 200 || response.status > 299) {
    throw new Error('Unable to register schema.');
  }
  // axios exposes the parsed response body as `response.data`.
  const responseSchemaId = response.data ? response.data.id : undefined;
  if (typeof responseSchemaId !== 'number') {
    throw new Error('Registry response did not include a schema id.');
  }
  this.cacheSchema(avroSchema, responseSchemaId, subject);
  return responseSchemaId;
}
/**
 * Returns the schema registered under `schemaId`. On a cache miss it is
 * fetched from the registry and parsed with avsc.
 *
 * NOTE(review): a cache hit returns whatever was cached for the id. When the
 * id was cached via register()/getVersion(), that is the caller's raw schema
 * rather than a parsed avsc type — confirm callers can handle both.
 *
 * @param schemaId Registry id to look up.
 * @returns The cached or freshly parsed schema, or null for 404 and any other
 *   non-2xx response.
 * @throws Error when the registry returns a schema avsc cannot parse.
 */
public async getById(schemaId: number) {
  if (this.idToSchema.has(schemaId)) {
    return this.idToSchema.get(schemaId);
  }
  const url = `${this.url}/schemas/ids/${schemaId}`;
  const response = await this.sendRequest(url);
  if (response.status < 200 || response.status > 299) {
    // TODO: Add logging (404 and other failures alike).
    return null;
  }
  let schema;
  try {
    schema = parse(response.data.schema);
  } catch (e) {
    // Keep the parser's reason instead of discarding it.
    const reason = e instanceof Error ? e.message : String(e);
    throw new Error(`Received bad schema (id ${schemaId}) from registry: ${reason}`);
  }
  // Cache outside the try so only parse failures are reported as bad schemas.
  return this.cacheSchema(schema, schemaId);
}
/**
 * Fetches the latest schema registered under `subject`.
 *
 * The schema is reused from the id cache when present. Otherwise the
 * registry's copy is parsed with avsc and cached under the subject and version.
 *
 * NOTE(review): despite the declared `schema: string`, the value is the cached
 * or parsed schema object — confirm the intended type.
 *
 * @param subject Subject to look up.
 * @returns id, schema and version, or null for any non-2xx response
 *   (including 404 and 422).
 * @throws Error when the registry returns a schema avsc cannot parse.
 */
public async getLatestSchema(subject: string): Promise<{
  id: number,
  schema: string,
  version: number,
} | null> {
  const url = `${this.url}/subjects/${encodeURIComponent(subject)}/versions/latest`;
  const response = await this.sendRequest(url);
  if (response.status < 200 || response.status > 299) {
    // TODO: Add logging (404, 422 and other failures alike).
    return null;
  }
  // axios exposes the parsed response body as `response.data`.
  const data = response.data || {};
  const schemaId = parseInt(String(data.id), 10);
  const version = parseInt(String(data.version), 10);
  let schema;
  if (this.idToSchema.has(schemaId)) {
    schema = this.idToSchema.get(schemaId);
  } else {
    let parsed;
    try {
      parsed = parse(data.schema);
    } catch (e) {
      const reason = e instanceof Error ? e.message : String(e);
      throw new Error(`Received bad schema (id ${schemaId}) from registry: ${reason}`);
    }
    schema = this.cacheSchema(parsed, schemaId, subject, version);
  }
  return { id: schemaId, schema, version };
}
/**
 * Looks up the version under which `avroSchema` is registered for `subject`.
 *
 * @param subject Subject to search.
 * @param avroSchema Schema to look up; sent through JSON.stringify and used
 *   as the cache key.
 * @returns The subject version, or null for 404, any other non-2xx response,
 *   or a response without numeric id/version fields.
 */
public async getVersion(subject: string, avroSchema: string): Promise<number | null> {
  const schemasToVersion = this.subjectToSchemaVersion.get(subject);
  const cachedVersion = schemasToVersion && schemasToVersion.get(avroSchema);
  if (typeof cachedVersion !== 'undefined') {
    return cachedVersion;
  }
  const url = `${this.url}/subjects/${encodeURIComponent(subject)}`;
  const body = {
    schema: JSON.stringify(avroSchema),
  };
  const response = await this.sendRequest(url, 'POST', body);
  if (response.status < 200 || response.status > 299) {
    // TODO: Add logging (404 and other failures alike).
    return null;
  }
  // axios exposes the parsed response body as `response.data`.
  const data = response.data || {};
  const schemaId = parseInt(String(data.id), 10);
  const responseVersion = parseInt(String(data.version), 10);
  if (Number.isNaN(schemaId) || Number.isNaN(responseVersion)) {
    // TODO: Add logging.
    return null;
  }
  this.cacheSchema(avroSchema, schemaId, subject, responseVersion);
  return responseVersion;
}
/**
 * Tests whether `avroSchema` is compatible with a version of `subject`.
 *
 * @param subject Subject to test against.
 * @param avroSchema Candidate schema; sent through JSON.stringify.
 * @param version Version to compare with; defaults to 'latest'.
 * @returns The registry's `is_compatible` verdict; false for 404, 422 and any
 *   other non-2xx response.
 */
public async testCompatibility(subject: string, avroSchema: string, version: string = 'latest'): Promise<boolean> {
  const url = `${this.url}/compatibility/subjects/${encodeURIComponent(subject)}/versions/${encodeURIComponent(version)}`;
  const body = {
    schema: JSON.stringify(avroSchema),
  };
  const response = await this.sendRequest(url, 'POST', body);
  if (response.status < 200 || response.status > 299) {
    // TODO: Add logging (404, 422 and other failures alike).
    return false;
  }
  // Only an explicit `is_compatible: true` in the response body counts as
  // compatible.
  return Boolean(response.data && response.data.is_compatible === true);
}
}

export default CachedSchemaRegistryClient;
import * as Kafka from 'node-rdkafka';
import * as Rx from 'rxjs';
import { multicast, switchMap } from 'rxjs/operators';
import CachedSchemaRegistryClient from './kafka/CachedSchemaRegistryClient';
import AvroSerializer from './kafka/AvroSerializer';
// NOTE(review): this statement appears truncated in this copy — the client's
// constructor arguments and the closing parentheses are missing. Also,
// consumeRecords() below takes its own `serializer` parameter, which shadows
// this module-level instance.
const serializer = new AvroSerializer(new CachedSchemaRegistryClient(
/**
 * Creates a shared stream of Kafka records whose key and value are decoded
 * with `serializer.decode`.
 *
 * NOTE(review): closing braces/parentheses and several lines (see the notes
 * below) appear to be missing from this copy.
 *
 * @param serializer Object with an async `decode(buffer)` method (presumably
 *   an AvroSerializer — confirm).
 * @returns A ConnectableObservable multicast through ReplaySubject(1). Nothing
 *   here calls `connect()`, so the Kafka consumer is only created once the
 *   caller connects it.
 */
export default function consumeRecords(serializer) {
const record$ = new Rx.Observable(subscriber => {
try {
// NOTE(review): several config keys appear to have been stripped to '' in this
// copy (one presumably a consumer group id). As written, the duplicate '' keys
// collapse into one property holding the last value — restore the real keys.
const consumer = new Kafka.KafkaConsumer({
'': 'meltspot-graphql-records',
'socket.keepalive.enable': true,
'': true,
'': process.env.KAFKA_BROKER
}, {});
// NOTE(review): the ready handler's body (presumably the topic subscription and
// the start of consumption) is not present in this copy — confirm.
consumer.on('ready', () => {
// NOTE(review): the handler expression is missing (presumably forwarding
// `value` to subscriber.next); this line does not parse as written.
consumer.on('data', value =>;
consumer.on('error', error => subscriber.error(error));
// Connection failures are reported through the observable's error channel.
consumer.connect({}, error => error && subscriber.error(error));
// NOTE(review): no teardown is visible for this subscription, so unsubscribing
// would not disconnect the consumer — confirm. The catch body is also missing.
catch (error) {
// Decode avro messages.
// NOTE(review): switchMap drops the pending decode whenever a new record
// arrives, so a record still waiting on the registry lookup is lost.
// concatMap (keeps order) or mergeMap would keep every record — confirm intent.
// A falsy key or value (e.g. null) skips decoding and is passed through as-is.
const decoded$ = record$.pipe(switchMap(async (item: any) => {
const value = await (item.value && serializer.decode(item.value));
const key = await (item.key && serializer.decode(item.key));
return {
parsedValue: value,
parsedKey: key,
// Share (multicast) the records instead of creating non-shared
// observables for each client. ReplaySubject(1) replays the most recent
// decoded record to each new subscriber.
const multi$ = decoded$.pipe(multicast(new Rx.ReplaySubject(1))) as Rx.ConnectableObservable<any>;
return multi$;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment