@Iftimie
Created September 6, 2020 12:35
Collector -> message broker -> kafka-connect -> HDFS

The first snippet registers an HDFS 3 sink connector with Kafka Connect; the second is the collector that publishes Avro-encoded phrases to the broker, from where the connector writes them to HDFS.
#!/bin/sh
# Register an HDFS 3 sink connector with the Kafka Connect REST API.
# TimeBasedPartitioner starts a new directory every 10 seconds
# (partition.duration.ms), named according to path.format; flush.size=3
# commits a file to HDFS after every three records; timestamp.extractor=Record
# takes the partitioning timestamp from each record itself.
curl -s \
  -X "POST" "http://localhost:8083/connectors/" \
  -H "Content-Type: application/json" \
  --data '{
    "name": "hdfs-sink-phrases",
    "config": {
      "connector.class": "io.confluent.connect.hdfs3.Hdfs3SinkConnector",
      "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
      "partition.duration.ms": 10000,
      "path.format": "YYYYMMdd_HHmm",
      "timestamp.extractor": "Record",
      "confluent.topic.bootstrap.servers": "assembler.broker:9092",
      "tasks.max": "1",
      "topics": "phrases",
      "hdfs.url": "hdfs://assembler.hadoop.namenode:9000",
      "flush.size": "3",
      "locale": "en",
      "timezone": "UTC",
      "topics.dir": "/phrases/1_sink",
      "logs.dir": "/phrases/1_sink/logs",
      "name": "hdfs-sink-phrases"
    }
  }'
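
Once the POST returns, you can check that the connector actually came up. A minimal sketch, assuming Connect's REST API is reachable at localhost:8083 and using only Python's standard library:

import json
import urllib.request

# GET /connectors/<name>/status reports the connector and task states.
with urllib.request.urlopen('http://localhost:8083/connectors/hdfs-sink-phrases/status') as resp:
    status = json.load(resp)
print(status['connector']['state'])  # expect "RUNNING" once the sink is healthy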
import os

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

# Assumed schemas: key and value each carry a single "phrase" string field.
value_schema_str = '{"type": "record", "name": "PhraseValue", "fields": [{"name": "phrase", "type": "string"}]}'
key_schema_str = '{"type": "record", "name": "PhraseKey", "fields": [{"name": "phrase", "type": "string"}]}'

class Collector(object):
    def __init__(self):
        value_schema = avro.loads(value_schema_str)
        key_schema = avro.loads(key_schema_str)
        self._producer = AvroProducer({
            'bootstrap.servers': f'{os.getenv("BROKER_HOST")}:9092',
            'schema.registry.url': f'http://{os.getenv("SCHEMA_REGISTRY_HOST")}:8081',
            'on_delivery': self._delivery_report
        }, default_key_schema=key_schema, default_value_schema=value_schema)

    @staticmethod
    def _delivery_report(err, msg):
        # Delivery callback invoked once per produced message.
        if err is not None:
            print(f'Delivery failed: {err}')

    def collect_phrase(self, phrase):
        self._producer.produce(topic='phrases', value={"phrase": phrase}, key={"phrase": phrase})
        self._producer.flush()
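
Wiring it together, a hypothetical caller only needs the two environment variables the producer reads:

# Hypothetical usage: BROKER_HOST and SCHEMA_REGISTRY_HOST must point at the
# running Kafka broker and Schema Registry.
collector = Collector()
collector.collect_phrase('hello kafka connect')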