A source schema has to be declared before extracting the data from the source. To define the source schema source.schema
property is available which takes a JSON value defining the source schema. This schema is used by Converters to perform data type or data format conversions. The java class representation of a source schema can be found here Schema.java.
In Gobblin library a Converter is an interface for classes that implement data transformations, e.g., data type conversions, schema projections, data manipulations, data filtering, etc. This interface is responsible for converting both schema and data records. Classes implementing this interface are composible and can be chained together to achieve more complex data transformations.
A converter basically needs four inputs:
- Input schema
- Output schema type
- Input data
- Output data type
There are various inbuilt Converters available within gobblin-core. However, you can also implement your own converter by extending abstract class org.apache.gobblin.converter.Converter
. Below, is example of such a custom implementation of Gobblin Converter which replaces multiple newlines and spaces from JSON values.
package org.apache.gobblin.example.sample;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.converter.Converter;
import org.apache.gobblin.converter.DataConversionException;
import org.apache.gobblin.converter.SchemaConversionException;
import org.apache.gobblin.converter.SingleRecordIterable;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
public class FilterSpacesConverter extends Converter<JsonArray, JsonArray, JsonObject, JsonObject> {
@Override
public JsonArray convertSchema(JsonArray inputSchema, WorkUnitState workUnit)
throws SchemaConversionException {
return inputSchema; //We are not doing any schema conversion
}
@Override
public Iterable<JsonObject> convertRecord(JsonArray outputSchema, JsonObject inputRecord, WorkUnitState workUnit)
throws DataConversionException {
String jsonStr = inputRecord.toString().replaceAll("\\s{2,}", " ");
return new SingleRecordIterable<>(new JsonParser().parse(jsonStr).getAsJsonObject());
}
}
The converters can also be chained to perform sequential conversion on each input record. To chain converters use the property converter.classes
and provide a list of comma separated converters with full reference name of converters. The execution order of the converters is same as defined in the comma separated list.
For example:
If you are reading data from a JsonSource and you want to write data into Avro format. For this you can chain the converters to convert from Json string to Json and the convert Json into Avro. By using the following property in your .pull file.
converter.classes="org.apache.gobblin.converter.json.JsonStringToJsonIntermediateConverter,org.apache.gobblin.converter.avro.JsonIntermediateToAvroConverter"
- AvroFieldRetrieverConverter.java
- AvroRecordToAvroWritableConverter.java
- AvroToAvroCopyableConverter.java
- AvroToBytesConverter.java
- BytesToAvroConverter.java
- FlattenNestedKeyConverter.java
- JsonIntermediateToAvroConverter.java
- JsonRecordAvroSchemaToAvroConverter.java
- CsvToJsonConverter.java
- CsvToJsonConverterV2.java
- AvroFieldsPickConverter.java
- AvroFilterConverter.java
- AvroToRestJsonEntryConverter.java
- BytesToJsonConverter.java
- JsonStringToJsonIntermediateConverter.java
- JsonToStringConverter.java
- ObjectStoreConverter.java
- ObjectStoreDeleteConverter.java
- HiveSerDeConverter.java
- ObjectToStringConverter.java
- StringFilterConverter.java
- StringSplitterConverter.java
- StringSplitterToListConverter.java
- StringToBytesConverter.java
- TextToStringConverter.java
- GobblinMetricsPinotFlattenerConverter.java
The following section discusses the specification to define source schema using a JSON format and few examples of how primitive, complex and nested data types are defined using the schema.
Key Name | Value data type | Description |
---|---|---|
columnName | String | The name of the JSON key which will contain the data. |
isNullable | Boolean | Can data be null? |
comment | String | Field description just for documentation purpose. |
dataType | JSON | Provides more information about the data type. |
dataType.type | String | Type of data to store. ex: int, long etc |
dataType.name | String | Provide a name to your data type. |
dataType.items | String/JSON | Used for array type to define the data type of items contained by the array. If data type of array items is primitive the String is used as value otherwise for complex type dataType JSON should be used as a value to provide further information on complex array items. |
dataType.values | String/JSON/Array | Used by map and record types to define the data type of the values. In case of records it will always be Array type defining fields. In case of map it could be String or JSON based on primitive or complex data type involved. |
dataype.symbols | Array | Array of strings to define the enum symbols. |
watermark | Boolean | To specify if the key is used as a watermark. Or use extract.delta.fields property to define comma separated list of watermark fields. |
unique | Boolean | To specify if the key should be unique set of records. |
defaultValue | Object | To specify the default value. |
The converters which perform data format conversions such as CSV to JSON, JSON to AVRO etc. will have to perform data type conversions. Below, is the list of such converters and the data types they support.
Converter | Data types |
---|---|
JsonIntermediateToAvroConverter.java |
|
JsonIntermediateToParquetGroupConverter.java |
|
{
"jobRoles": 42,
"peopleWeightAvg": 50.5,
"peopleOrg": "EvilCorp",
"peopleAvgSal": 342222.65,
"peopleCount": 8344242342,
"peopleBrain": null,
"public": false
}
[
{
"columnName": "jobRoles",
"isNullable": false,
"comment": "Number of roles in the org"
"dataType": {
"type": "int"
}
},
{
"columnName": "peopleWeightAvg",
"isNullable": false,
"comment": "Avg weight of people in org"
"dataType": {
"type": "float"
}
},
{
"columnName": "peopleOrg",
"isNullable": false,
"comment": "Name of org people works for"
"dataType": {
"type": "string"
}
},
{
"columnName": "peopleAvgSal",
"isNullable": false,
"comment": "Avg salary of people in org"
"dataType": {
"type": "double"
}
},
{
"columnName": "peopleCount",
"isNullable": false,
"comment": "Count of people in org"
"dataType": {
"type": "long"
}
},
{
"columnName": "peopleBrain",
"comment": "Brain obj of people"
"dataType": {
"type": "null"
}
},
{
"columnName": "public",
"isNullable": false,
"comment": "Is data public"
"dataType": {
"type": "boolean"
}
}
]
{
"arrayOfInts": [25, 50, 75]
}
[
{
"columnName": "arrayOfInts",
"isNullable": false,
"comment": "Items in array have same data type as defined in dataType."
"dataType": {
"type": "array",
"items": "int"
}
}
]
Maps can contain n number of key value pairs with constraint of same data type for values and keys are always string.
{
"bookDetails":{
"harry potter and the deathly hallows": 10245,
"harry potter and the cursed child": 20362
}
}
[
{
"columnName": "bookDetails",
"isNullable": false,
"comment": "Maps always have string as keys and all values have same type as defined in dataType"
"dataType": {
"type": "map",
"values": "long"
}
}
]
Unlike map, values in record type are not bound by single value type. Keys and values have to be declared in the schema with data type.
{
"userDetails": {
"userName": "anonyoumous",
"userAge": 50,
}
}
[
{
"columnName": "userDetails",
"isNullable": false,
"comment": "user detail"
"dataType": {
"type": "record",
"values": [
{
"columnName": "userName",
"dataType":{
"type":"string"
}
},
{
"columnName": "userAge",
"dataType":{
"type":"int"
}
}
]
}
}
]
{
"userStatus": "ACTIVE"
}
[
{
"columnName": "userStatus",
"dataType":{
"type": "enum",
"symbols":[
"ACTIVE", "INACTIVE"
]
}
}
]
Complex types can be used to created nested schemas.
Few of the examples to show how nested schema is written
[
{
"columnName": "userName",
"dataType": {
"type": "string"
}
},
{
"columnName": "purchase",
"dataType": {
"type": "array",
"items": {
"dataType": {
"type": "record",
"values": [
{
"columnName": "ProductName",
"dataType": {
"type": "string"
}
},
{
"columnName": "ProductPrice",
"dataType": {
"type": "long"
}
}
]
}
}
}
}
]
[
{
"columnName": "persons",
"dataType": {
"type": "map",
"values": {
"dataType": {
"type": "array",
"items": "int"
}
}
}
}
]