This will help you get started on running your first Gobblin pipeline to extract file metadata from HDFS and store it in Ground with Kafka being the intermediary.
The following need to be set up and running before you start the pipelines:
- Ground
- HDFS
- Confluent Kafka, Zookeeper, Schema Registry
The data ingestion takes place through two Gobblin pipelines: the first extracts data from HDFS and stores it in a Kafka topic, and the second reads the data from the topic and writes it to Ground. Both pipelines are run from within ground-ingest. Inside the top-level directory, execute the following command:
```
mvn clean install -DskipTests
```
This will build all the components inside Ground, including ground-ingest. Once the build succeeds, a zip file of the ground-ingest snapshot will be created inside the target directory of ground-ingest. Unzip the file locally:
```
cd ground-ingest/target
unzip ground-ingest-0.1-SNAPSHOT-bin.zip
```
A new folder called ground-ingest-0.1-SNAPSHOT will be created inside target. All Gobblin pipelines will now be run from within this folder.
A pull file is a Gobblin configuration file that contains properties specifying the locations of the various files required to run the pipeline. A separate pull file is required for each of the two Gobblin pipelines you want to run.
In ground-ingest, the pull file lives in two locations. The main copy is at ground-ingest/src/main/resources/jobConf/; any changes to the pull file need to be made there. A copy of this pull file is also located inside the snapshot created earlier, at ground-ingest/target/ground-ingest-0.1-SNAPSHOT/conf/jobConf/; this is the copy you will be using when you run the pipeline. To be safe, make any changes to the pull file in both locations.
The following are the two pull files needed to run the Gobblin pipelines for the HDFS data ingestion.
HDFS-Kafka Pull File
```
job.name=Pull from hdfs
job.group=hdfs
job.description=A getting started example for hdfs
job.schedule=
writer.kafka.topic=SampleTopic
writer.kafka.producerConfig.schemaRegistry.schema.name=SampleTopic
writer.kafka.producerConfig.bootstrap.servers=localhost:9092
source.class=edu.berkeley.ground.ingest.FileMetadataSource
source.filebased.fs.uri=hdfs://localhost:8020
source.filebased.data.directory=hdfs://localhost:8020/(location of HDFS directory)
extract.table.name=GroundEntity
extract.table.type=SNAPSHOT_ONLY
extract.namespace=edu.berkeley.ground.ingest
writer.builder.class=gobblin.kafka.writer.KafkaDataWriterBuilder
writer.destination.type=KAFKA
writer.output.format=AVRO
data.publisher.type=gobblin.publisher.NoopPublisher
writer.kafka.producerConfig.value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
writer.kafka.producerConfig.key.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
writer.kafka.producerConfig.schema.registry.url=http://localhost:8081
```
The property job.schedule should be set if the pipeline needs to run on a schedule; its value must be a CronTrigger expression. When executed, this Gobblin pipeline will create a new Kafka topic called SampleTopic and store the file metadata there. The property source.filebased.fs.uri must be set to the HDFS URI, and source.filebased.data.directory must point to the HDFS directory where the files are stored. Make sure that writer.kafka.producerConfig.schema.registry.url is set correctly.
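For example, since Gobblin's scheduler is Quartz-based, job.schedule takes a Quartz cron expression. The value below (fire every two minutes) is only an illustration; leave the property empty to run the job once:

```
job.schedule=0 0/2 * * * ?
```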
Kafka-Ground Pull File
```
job.name=Pull from kafka
job.group=kafka
job.description=A getting started example for kafka
source.class=gobblin.source.extractor.extract.kafka.KafkaDeserializerSource
kafka.deserializer.type=CONFLUENT_AVRO
kafka.schema.registry.url=http://localhost:8081
extract.namespace=edu.berkeley.ground.ingest
data.publisher.type=gobblin.publisher.NoopPublisher
writer.builder.class=edu.berkeley.ground.ingest.GroundDataWriterBuilder
kafka.brokers=localhost:9092
topic.whitelist=SampleTopic
bootstrap.with.offset=earliest
writer.ground.serverAddress=http://localhost:9090/
```
The second pipeline reads from SampleTopic and writes to Ground. The property writer.ground.serverAddress should be set to the correct Ground server address.
To run Gobblin in standalone mode, we need a Gobblin configuration file (such as gobblin-standalone.properties), and for each job we wish to run, we also need a job configuration file (or pull file). The Gobblin configuration file contains properties specifying the locations of configuration files, working directories, etc. By default these properties point to certain environment variables. Make sure that the environment variable JAVA_HOME is set correctly. Next, set the environment variable GOBBLIN_JOB_CONFIG_DIR to point to the folder inside the snapshot containing the pull file, i.e. ground-ingest/target/ground-ingest-0.1-SNAPSHOT/conf/jobConf. Finally, create a folder to serve as Gobblin's working directory; Gobblin will write job output as well as other information there, such as locks and the state store. Set the environment variable GOBBLIN_WORK_DIR to point to that folder.
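The environment setup above can be sketched as follows; the paths are examples only and should be adjusted to your Java installation and checkout location:

```shell
# Example environment setup for Gobblin standalone mode.
# JAVA_HOME path below is an example; keep your existing value if already set.
export JAVA_HOME="${JAVA_HOME:-/usr/lib/jvm/java-8-openjdk-amd64}"

# Point Gobblin at the pull files inside the unzipped snapshot
# (assumes you are in the repository's top-level directory).
export GOBBLIN_JOB_CONFIG_DIR="$PWD/ground-ingest/target/ground-ingest-0.1-SNAPSHOT/conf/jobConf"

# Create a working directory for job output, locks, and the state store.
mkdir -p /tmp/gobblin-work-dir
export GOBBLIN_WORK_DIR=/tmp/gobblin-work-dir
```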
Once all the services are up and running (HDFS, Kafka, Ground, Zookeeper, Schema Registry), and all the environment variables correctly set, you are now ready to run the pipelines.
- HDFS - Kafka
Make sure you have the correct pull file stored in both locations mentioned before. Next, run the run_scheduler.sh script from within the snapshot directory created earlier.
```
cd ground-ingest-0.1-SNAPSHOT
./bin/run_scheduler.sh
```
The logs will be displayed in the terminal window. To check whether the data has been successfully written to the Kafka topic, you can inspect the Kafka logs:
```
cd /tmp/kafka-logs/SampleTopic
```
Check the log file there. You should see data corresponding to the files, written in the Avro format.
- Kafka - Ground
Again, make sure to replace the pull file in both locations with the one corresponding to this pipeline. Run the run_scheduler.sh script the same way as before. To check whether data has been correctly written to Ground, you can manually inspect the Ground database in Postgres to see the nodes and node versions being created.
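As a sketch of that manual check, you could query the Ground database with psql; the table names below are assumptions based on Ground's relational model and may differ in your deployment:

```sql
-- Hypothetical queries; run inside psql connected to the Ground database.
-- Table names (node, node_version) are assumptions, not confirmed by this guide.
SELECT * FROM node;
SELECT * FROM node_version;
```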