Skip to content

Instantly share code, notes, and snippets.

@srijiths
Last active March 17, 2018 06:49
Show Gist options
  • Save srijiths/a50f75f40116a30b6a8ca2130c4bb874 to your computer and use it in GitHub Desktop.
Save srijiths/a50f75f40116a30b6a8ca2130c4bb874 to your computer and use it in GitHub Desktop.
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.confluent.connect.avro.AvroData;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import tech.allegro.schema.json2avro.converter.JsonAvroConverter;
/**
 * Converts an Avro schema plus raw JSON data into the Kafka Connect schema
 * and Connect data representation. This is what a source connector needs in
 * order to emit records.
 *
 * <p>Schemas are looked up by ID through a {@link SchemaRegistryClient} that
 * is created once in the constructor.
 *
 * @author Sreejith
 */
public class SchemaUtils {

    private static final Logger logger = LoggerFactory.getLogger(SchemaUtils.class);

    /** Capacity handed to {@link CachedSchemaRegistryClient} for its schema cache. */
    private static final int SCHEMA_CLIENT_CAPACITY = 1000;

    /** Cache size handed to {@link AvroData} for its schema conversion cache. */
    private static final int AVRO_DATA_CACHE_SIZE = 1000;

    private final List<String> schemaUrls;
    private final SchemaRegistryClient schemaClient;
    private final AvroData avroDataUtils;
    private final JsonAvroConverter jsonAvroConverter;

    /**
     * Initializes the Schema Registry client with the given URLs. One or more
     * URLs may be supplied.
     *
     * @param schemaUrls base URLs of the Schema Registry instance(s); must not
     *                   be {@code null}
     * @throws NullPointerException if {@code schemaUrls} is {@code null}
     */
    public SchemaUtils(List<String> schemaUrls) {
        Objects.requireNonNull(schemaUrls, "schemaUrls must not be null");
        // Defensive copy: later changes to the caller's list must not affect this instance.
        this.schemaUrls = new ArrayList<String>(schemaUrls);
        this.schemaClient = new CachedSchemaRegistryClient(this.schemaUrls, SCHEMA_CLIENT_CAPACITY);
        this.avroDataUtils = new AvroData(AVRO_DATA_CACHE_SIZE);
        this.jsonAvroConverter = new JsonAvroConverter();
    }

    /**
     * Fetches a schema from the registry by its ID.
     *
     * <p>Lookup failures are logged and reported to the caller as
     * {@code null} rather than as an exception, so the result must be checked
     * before it is passed on (for example to
     * {@link #toConnect(byte[], Schema)}).
     *
     * @param schemaId ID of the schema in the registry
     * @return the Avro schema, or {@code null} if the registry call failed with
     *         an {@link IOException} or {@link RestClientException}
     */
    public Schema getSchemaByID(int schemaId) {
        try {
            return schemaClient.getByID(schemaId);
        } catch (IOException | RestClientException e) {
            logger.error("Unable to get schema from Registry for ID {} ", schemaId, e);
            return null;
        }
    }

    /**
     * Converts the input JSON into a Connect value and the Avro schema into the
     * matching Connect schema.
     *
     * @param inputBytes raw JSON document as bytes; must not be {@code null}
     * @param avroSchema Avro schema describing the JSON document; must not be
     *                   {@code null}
     * @return the Connect schema together with the converted value
     * @throws NullPointerException if {@code inputBytes} or {@code avroSchema}
     *                              is {@code null}
     */
    public SchemaAndValue toConnect(byte[] inputBytes, Schema avroSchema) {
        // Fail fast with a clear message; getSchemaByID returns null on lookup failure.
        Objects.requireNonNull(inputBytes, "inputBytes must not be null");
        Objects.requireNonNull(avroSchema, "avroSchema must not be null");
        GenericData.Record record = jsonAvroConverter.convertToGenericDataRecord(inputBytes, avroSchema);
        return avroDataUtils.toConnectData(avroSchema, record);
    }
}
@srijiths
Copy link
Author

Add the following repository and dependencies to your pom.xml.

	<!-- Confluent artifacts are not on Maven Central; they are served from this repository. -->
	<repositories>
		<repository>
			<id>confluent</id>
			<name>Confluent</name>
			<url>https://packages.confluent.io/maven/</url>
		</repository>
	</repositories>
	<!--
		If your POM already has a <dependencies> section, merge the entries below into it.
		${confluent.version} and ${jsonconverter.version} must be defined in <properties>.
	-->
	<dependencies>
		<dependency>
			<groupId>io.confluent</groupId>
			<artifactId>kafka-schema-registry-client</artifactId>
			<version>${confluent.version}</version>
		</dependency>
		<dependency>
			<groupId>io.confluent</groupId>
			<artifactId>kafka-connect-avro-converter</artifactId>
			<version>${confluent.version}</version>
		</dependency>
		<dependency>
			<groupId>tech.allegro.schema.json2avro</groupId>
			<artifactId>converter</artifactId>
			<version>${jsonconverter.version}</version>
		</dependency>
	</dependencies>

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment