Skip to content

Instantly share code, notes, and snippets.

View talfco's full-sized avatar

Felix Kuestahler talfco

View GitHub Profile
public String persist(byte[] msg, String index, String type, String id) throws IOException {
if (checkForAvroSingleObjectEncoding(msg)) {
String jsonDoc = convertAvroBinaryToJSON(msg);
esPersistencyManager.createUpdateDocument(index,type,jsonDoc,id);
return jsonDoc;
}
else {
logger.error("Received message wasn't Avro Single Object encoded");
throw new IOException("Received message wasn't Avro Single Object encoded");
}
public String persist(byte[] msg, String index, String type, String id) throws IOException {
if (checkForAvroSingleObjectEncoding(msg)) {
String jsonDoc = convertAvroBinaryToJSON(msg);
esPersistencyManager.createUpdateDocument(index,type,jsonDoc,id);
return jsonDoc;
}
else {
logger.error("Received message wasn't Avro Single Object encoded");
throw new IOException("Received message wasn't Avro Single Object encoded");
}
private String convertAvroBinaryToJSON(byte[] msg) throws IOException {
long msgFingerprint = getAvroFingerprint(msg);
Schema mgsSchema = registry.getSchema(msgFingerprint);
byte[] payload = extractPayload(msg);
DatumReader<Object> reader = null;
if (msgFingerprint == this.backendComponentSchemaFingerprint)
reader = new GenericDatumReader<>(mgsSchema);
else
reader = new GenericDatumReader<>(mgsSchema,this.backendComponentSchema);
{ "namespace": "business.model.strategy",
"type": "record",
"doc" : "Schema for the Coporate Business Model Strategy Part",
"name": "BusinessModelStrategy",
"fields": [
{ "name": "avro_fingerprint", "type": "long"},
{ "name": "index_ipid", "type": "string"},
{ "name": "last_update_timestamp", "type": "long", "logical-type":"time-micros"},
{ "name": "last_update_loginid", "type": "string"},
{ "name": "client_ipid", "type": "string"},
public JsonObject readDocumentByIdAsObject(String index, String id) throws IOException{
return esClient.execute(new Get.Builder(index, id).build()).getJsonObject();
}
@talfco
talfco / createUpdateDocument.java
Last active May 21, 2020 08:26
Updating a ElasticSearch Document
public void createUpdateDocument(String index, String type, String docJson, String id) throws IOException {
Index esIndex;
if (id == null) {
esIndex = new Index.Builder(docJson).index(index).type(type).build();
} else {
esIndex = new Index.Builder(docJson).index(index).type(type).id(id).build();
}
JestResult jestResult = esClient.execute(esIndex);
if(jestResult.isSucceeded()) {
logger.info("Document persisted");
package net.cloudburo.avro.registry;
import com.google.gson.JsonObject;
import org.apache.avro.Schema;
import net.cloudburo.elasticsearch.ESPersistencyManager;
public class ElasticSearchSchemaRegistry extends SchemaRegistry {
private static final String esIndex = "avroschema";
@talfco
talfco / avro-message-schema.json
Last active May 21, 2020 07:08
A simple schema for a Avro Message
{ "namespace": "business.model.strategy",
"type": "record",
"doc" : "A simple Avro example schema",
"name": "Person",
"fields": [
{ "name": "personid", "type": "long",
"name": "lastname", "type": "string",
"name": "surname", "type": [ "null", { "type": "string" }],"default": null}
]
}
@talfco
talfco / maven.pom
Last active May 21, 2020 06:36
Avro Compiler in Maven.pom
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.9.2</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
@talfco
talfco / SchemaRegistry.java
Last active May 21, 2020 06:38
Avro Schema Registry Abstract class
import org.apache.avro.Schema;
import org.apache.avro.SchemaNormalization;
public abstract class SchemaRegistry {
public abstract long registerSchema(Schema schema) throws IOException;
public abstract Schema getSchema(long fingerprint) throws IOException;
public long getSchemaFingerprint(Schema schema) {
return SchemaNormalization.parsingFingerprint64(schema);