Skip to content

Instantly share code, notes, and snippets.

@fubhy
Last active July 6, 2018 17:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fubhy/d30944689ae6733ccbb37124e7f0d5ed to your computer and use it in GitHub Desktop.
Save fubhy/d30944689ae6733ccbb37124e7f0d5ed to your computer and use it in GitHub Desktop.
Avro serializer with a cached schema registry client
/** First byte of every message framed in the Confluent wire format. */
const MAGIC_BYTE = 0;

/** Size of the wire-format header: 1 magic byte + 4-byte big-endian schema id. */
const HEADER_LENGTH = 5;

/**
 * The part of a schema registry client this serializer depends on.
 * `getById` resolves a schema id to an object exposing
 * `decode(buffer, offset)` (an avsc type), or to null/undefined when the id
 * cannot be resolved.
 */
interface SchemaRegistryLike {
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- decoded values are schema-dependent
  getById(schemaId: number): Promise<any>;
}

/**
 * Decodes messages framed as: magic byte, 4-byte big-endian schema id,
 * Avro payload. The writer schema is looked up in the schema registry by id.
 */
class AvroSerializer {
  constructor(private readonly schemaRegistry: SchemaRegistryLike) {}

  /**
   * Decodes one framed message.
   *
   * @param message Raw message bytes.
   * @returns The value decoded with the writer schema. NOTE(review): annotated
   *   as `string`, but the runtime value is whatever the schema decodes to
   *   (e.g. an object for record schemas).
   * @throws Error if the message does not start with the magic byte, is
   *   shorter than the header, or references a schema id the registry cannot
   *   resolve.
   */
  public async decode(message: Buffer): Promise<string> {
    if (message[0] !== MAGIC_BYTE) {
      throw new Error('Message does not start with magic byte.');
    }
    // Without this check readInt32BE throws an unhelpful RangeError.
    if (message.length < HEADER_LENGTH) {
      throw new Error('Message is too short to contain a schema id.');
    }
    const schemaId = message.readInt32BE(1);
    const type = await this.schemaRegistry.getById(schemaId);
    // The registry client returns null for unknown ids / failed lookups;
    // without this check that surfaces as "Cannot read property 'decode'".
    if (type == null) {
      throw new Error(`Unknown schema id ${schemaId}.`);
    }
    return type.decode(message, HEADER_LENGTH).value;
  }
}

export default AvroSerializer;
import axios from 'axios';
import { parse } from 'avsc';
const VALID_METHODS = ['GET', 'POST', 'PUT', 'DELETE'];
const ACCEPT_HEADER = 'application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/json';

/** Parsed schema object produced by avsc's `parse`. */
type AvroType = ReturnType<typeof parse>;

/** True for 2xx HTTP status codes. */
function isSuccessStatus(status: number): boolean {
  return status >= 200 && status <= 299;
}

/**
 * Returns the schema as the JSON text the registry expects in the `schema`
 * field of request bodies.
 *
 * A string that already parses as JSON is sent unchanged; stringifying it
 * again would double-encode it. Anything else (a schema object, an avsc type,
 * or a bare primitive name such as `string`) goes through `JSON.stringify`.
 */
function toSchemaJson(avroSchema: string | object): string {
  if (typeof avroSchema === 'string') {
    try {
      JSON.parse(avroSchema);
      return avroSchema;
    } catch {
      // Not JSON text (e.g. a bare primitive type name): quote it below.
    }
  }
  return JSON.stringify(avroSchema);
}

/**
 * Client for the Confluent Schema Registry REST API with in-memory caches.
 *
 * - `idToSchema`: schema id -> parsed avsc type. It is filled only from
 *   schemas fetched from the registry, so `getById` always returns a parsed
 *   type (the Avro deserializer calls `decode` on it).
 * - `subjectToSchemaId` / `subjectToSchemaVersion`: subject ->
 *   (schema JSON text -> schema id / version).
 *
 * Requests resolve for every HTTP status; each public method maps error
 * statuses to its own result (throw, `null`, or `false`).
 */
class CachedSchemaRegistryClient {
  private url: string;
  private idToSchema: Map<number, AvroType> = new Map();
  private subjectToSchemaId: Map<string, Map<string, number>> = new Map();
  private subjectToSchemaVersion: Map<string, Map<string, number>> = new Map();

  /**
   * @param url Base URL of the registry; request paths are appended to it
   *   with a leading `/`.
   * @param caLocation Not used yet (see TODO below).
   * @param certLocation Not used yet (see TODO below).
   * @param keyLocation Not used yet (see TODO below).
   */
  constructor(
    url,
    caLocation?: string,
    certLocation?: string,
    keyLocation?: string,
  ) {
    this.url = url;
    // TODO: Handle certificates.
  }

  /**
   * Sends a request to the registry.
   *
   * `validateStatus` accepts every status so the promise also resolves for
   * 4xx/5xx responses; by default axios rejects on any non-2xx status, which
   * would make the status handling in the public methods unreachable.
   *
   * @throws Error (synchronously) if `method` is not one of VALID_METHODS.
   */
  private sendRequest(
    url: string,
    method: string = 'GET',
    body?: object,
    headers: Record<string, string> = {},
  ) {
    if (VALID_METHODS.indexOf(method) === -1) {
      throw new Error(`Method "${method}" is invalid. Valid methods include "${VALID_METHODS.join(", ")}".`);
    }
    // Content-Length is not set here: axios derives it from the serialized body.
    const bodyHeaders = body
      ? { 'Content-Type': 'application/vnd.schemaregistry.v1+json' }
      : {};
    return axios.request({
      url,
      method,
      // Defaults are spread (not nested) so they become real headers;
      // caller-supplied headers win.
      headers: {
        'Accept': ACCEPT_HEADER,
        ...bodyHeaders,
        ...headers,
      },
      data: body,
      validateStatus: () => true,
    });
  }

  /** Stores `value` at cache[subject][schemaJson], creating the inner map on demand. */
  private addToCache(
    cache: Map<string, Map<string, number>>,
    subject: string,
    schemaJson: string,
    value: number,
  ): void {
    let bySchema = cache.get(subject);
    if (bySchema === undefined) {
      bySchema = new Map();
      cache.set(subject, bySchema);
    }
    bySchema.set(schemaJson, value);
  }

  /** Reads cache[subject][schemaJson], or undefined when absent. */
  private readFromCache(
    cache: Map<string, Map<string, number>>,
    subject: string,
    schemaJson: string,
  ): number | undefined {
    const bySchema = cache.get(subject);
    return bySchema === undefined ? undefined : bySchema.get(schemaJson);
  }

  /** Caches the id (and, when truthy, the version) of a schema under a subject. */
  private cacheSubjectSchema(subject: string, schemaJson: string, schemaId: number, version?: number): void {
    this.addToCache(this.subjectToSchemaId, subject, schemaJson, schemaId);
    if (version) {
      this.addToCache(this.subjectToSchemaVersion, subject, schemaJson, version);
    }
  }

  /**
   * Parses `rawSchema` and caches it under `schemaId`. If a type is already
   * cached for that id it is kept and returned, so callers share one instance
   * per id.
   *
   * @throws Error if avsc cannot parse the schema returned by the registry.
   */
  private cacheParsedSchema(schemaId: number, rawSchema: string): AvroType {
    const existing = this.idToSchema.get(schemaId);
    if (existing !== undefined) {
      return existing;
    }
    let parsed: AvroType;
    try {
      parsed = parse(rawSchema);
    }
    catch (e) {
      throw new Error(`Received bad schema (id ${schemaId}) from registry.`);
    }
    this.idToSchema.set(schemaId, parsed);
    return parsed;
  }

  /**
   * Registers `avroSchema` under `subject` and returns its schema id. If this
   * exact schema JSON was registered before, the cached id is returned.
   *
   * Only the subject cache is filled here. The parsed type for the id is
   * fetched and cached by `getById` on first use, so `idToSchema` never holds
   * an unparsed schema.
   *
   * @throws Error on 409 (incompatible), 422 (invalid) or any other non-2xx status.
   */
  public async register(subject: string, avroSchema: string | object): Promise<number> {
    const schemaJson = toSchemaJson(avroSchema);
    const cachedId = this.readFromCache(this.subjectToSchemaId, subject, schemaJson);
    if (cachedId !== undefined) {
      return cachedId;
    }
    const url = `${this.url}/subjects/${encodeURIComponent(subject)}/versions`;
    const response = await this.sendRequest(url, 'POST', { schema: schemaJson });
    if (response.status === 409) {
      throw new Error('Incompatible Avro schema.');
    }
    if (response.status === 422) {
      throw new Error('Invalid Avro schema.');
    }
    if (!isSuccessStatus(response.status)) {
      throw new Error('Unable to register schema.');
    }
    const responseSchemaId = response.data['id'] as number;
    this.cacheSubjectSchema(subject, schemaJson, responseSchemaId);
    return responseSchemaId;
  }

  /**
   * Returns the parsed schema for `schemaId`, fetching it from the registry
   * on a cache miss.
   *
   * @returns The parsed type, or null on 404 or any other non-2xx status.
   * @throws Error if the registry returns a schema that cannot be parsed.
   */
  public async getById(schemaId: number): Promise<AvroType | null> {
    const cached = this.idToSchema.get(schemaId);
    if (cached !== undefined) {
      return cached;
    }
    const response = await this.sendRequest(`${this.url}/schemas/ids/${schemaId}`);
    if (!isSuccessStatus(response.status)) {
      // Covers 404 (unknown id) as well as other failures. TODO: Add logging.
      return null;
    }
    return this.cacheParsedSchema(schemaId, response.data['schema'] as string);
  }

  /**
   * Fetches the latest registered schema for `subject` and caches it.
   *
   * @returns `{ id, schema, version }`, where `schema` is the parsed type, or
   *   null on 404, 422 or any other non-2xx status.
   * @throws Error if the registry returns a schema that cannot be parsed.
   */
  public async getLatestSchema(subject: string): Promise<{
    id: number,
    schema: AvroType,
    version: number,
  } | null> {
    const url = `${this.url}/subjects/${encodeURIComponent(subject)}/versions/latest`;
    const response = await this.sendRequest(url);
    if (!isSuccessStatus(response.status)) {
      // Covers 404 (unknown subject), 422 and other failures. TODO: Add logging.
      return null;
    }
    const schemaId = parseInt(response.data['id'] as string, 10);
    const version = parseInt(response.data['version'] as string, 10);
    const rawSchema = response.data['schema'] as string;
    const schema = this.cacheParsedSchema(schemaId, rawSchema);
    this.cacheSubjectSchema(subject, rawSchema, schemaId, version);
    return { id: schemaId, schema, version };
  }

  /**
   * Looks up the version number of `avroSchema` within `subject`.
   *
   * @returns The version, or null on 404 or any other non-2xx status.
   */
  public async getVersion(subject: string, avroSchema: string | object): Promise<number | null> {
    const schemaJson = toSchemaJson(avroSchema);
    const cachedVersion = this.readFromCache(this.subjectToSchemaVersion, subject, schemaJson);
    if (cachedVersion !== undefined) {
      return cachedVersion;
    }
    const url = `${this.url}/subjects/${encodeURIComponent(subject)}`;
    const response = await this.sendRequest(url, 'POST', { schema: schemaJson });
    if (!isSuccessStatus(response.status)) {
      // Covers 404 (unknown subject/schema) and other failures. TODO: Add logging.
      return null;
    }
    const schemaId = parseInt(response.data['id'] as string, 10);
    const responseVersion = parseInt(response.data['version'] as string, 10);
    this.cacheSubjectSchema(subject, schemaJson, schemaId, responseVersion);
    return responseVersion;
  }

  /**
   * Asks the registry whether `avroSchema` is compatible with `version`
   * (default `'latest'`) of `subject`.
   *
   * @returns The registry's `is_compatible` flag. Returns false on 404, 422 or
   *   any other non-2xx status.
   */
  public async testCompatibility(subject: string, avroSchema: string | object, version: string = 'latest'): Promise<boolean> {
    const url = `${this.url}/compatibility/subjects/${encodeURIComponent(subject)}/versions/${encodeURIComponent(version)}`;
    const response = await this.sendRequest(url, 'POST', { schema: toSchemaJson(avroSchema) });
    if (!isSuccessStatus(response.status)) {
      // TODO: Add logging.
      return false;
    }
    return !!response.data['is_compatible'];
  }
}

export default CachedSchemaRegistryClient;
import * as Kafka from 'node-rdkafka';
import * as Rx from 'rxjs';
import { concatMap, multicast, switchMap } from 'rxjs/operators';
import CachedSchemaRegistryClient from './kafka/CachedSchemaRegistryClient';
import AvroSerializer from './kafka/AvroSerializer';
// Module-level serializer backed by the registry at SCHEMA_REGISTRY_URL.
// NOTE(review): nothing in the visible code reads this binding, because the
// `serializer` parameter of consumeRecords shadows it. Confirm whether it was
// meant to be that function's default argument, or whether it can be removed.
const registryClient = new CachedSchemaRegistryClient(process.env.SCHEMA_REGISTRY_URL);
const serializer = new AvroSerializer(registryClient);
/**
 * Consumes the Kafka `record` topic and returns a shared stream of messages
 * with their value and key decoded.
 *
 * The returned observable is connected before it is returned, so the Kafka
 * consumer is created immediately. Because of ReplaySubject(1), a late
 * subscriber first receives the most recent decoded record.
 *
 * @param serializer Object whose `decode(buffer)` is awaited for both the
 *   message value and key. Presumably an AvroSerializer; confirm with callers.
 * @returns Connected observable emitting `{ ...message, parsedValue, parsedKey }`.
 */
export default function consumeRecords(serializer) {
  const record$ = new Rx.Observable(subscriber => {
    // Set by the teardown so that a consumer becoming ready afterwards is
    // disconnected instead of starting to consume.
    let tornDown = false;
    let activeConsumer: Kafka.KafkaConsumer | undefined;
    try {
      const consumer = new Kafka.KafkaConsumer({
        'group.id': 'meltspot-graphql-records',
        'socket.keepalive.enable': true,
        'enable.auto.commit': true,
        'metadata.broker.list': process.env.KAFKA_BROKER
      }, {});
      activeConsumer = consumer;
      consumer.on('ready', () => {
        if (tornDown) {
          consumer.disconnect();
          return;
        }
        consumer.subscribe(['record']);
        consumer.consume();
      });
      consumer.on('data', value => subscriber.next(value));
      consumer.on('error', error => subscriber.error(error));
      consumer.connect({}, error => error && subscriber.error(error));
    }
    catch (error) {
      subscriber.error(error);
    }
    // Teardown runs on unsubscribe and after error/complete, so the consumer
    // does not keep its broker connection open once the stream has ended.
    return () => {
      tornDown = true;
      if (activeConsumer && activeConsumer.isConnected()) {
        activeConsumer.disconnect();
      }
    };
  });
  // Decode avro messages. concatMap (not switchMap) so that every record is
  // decoded and emitted in arrival order. switchMap would drop the pending
  // decode of a record as soon as the next record arrives.
  const decoded$ = record$.pipe(concatMap(async (item: any) => {
    const value = await (item.value && serializer.decode(item.value));
    const key = await (item.key && serializer.decode(item.key));
    return {
      ...item,
      parsedValue: value,
      parsedKey: key,
    };
  }));
  // Share (multicast) the records instead of creating non-shared
  // observables for each client.
  const multi$ = decoded$.pipe(multicast(new Rx.ReplaySubject(1))) as Rx.ConnectableObservable<any>;
  multi$.connect();
  return multi$;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment