@jamesrajendran
Created June 18, 2017 05:58
kafka notes - concepts and points to remember
-------------kafka notes-----------
why Kafka?
better throughput
Replication
built-in partitioning
Fault tolerance
topic names are unique across the cluster
location of a message -> topic -> partition -> offset
within a consumer group, more than one consumer cannot read from the same partition - a restriction that avoids reading the same message twice.
so the max number of consumers in a consumer group is the number of available partitions of the topic.
Replication is set at the topic level
for every partition there is a leader broker - producers and consumers talk to the partition's leader
To run more brokers (e.g. 3 on the same machine):
create 3 copies of config/server.properties
in each, set:
a unique broker.id
listeners=PLAINTEXT://:9093 -- change the port number if multiple brokers run on the same machine
log.dirs -- give a unique directory if on the same machine
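A sketch of what one of the copied files (say config/server-1.properties) might contain for a second broker on the same machine - the id, port and path values here are only illustrative:
broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/tmp/kafka-logs-1
zookeeper.connect=localhost:2181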
important properties:
broker.id
port
log.dirs
zookeeper.connect
delete.topic.enable
auto.create.topics.enable
default.replication.factor
num.partitions
log.retention.ms
log.retention.bytes (the size limit applies per partition, not per topic)
Partitioning - Kafka's default partitioner is used when the key is null, otherwise partitioning is based on the supplied key
but the partition can also be hardcoded using the ProducerRecord(topic, partition, timestamp, key, value) constructor - see the sketch below.
if the timestamp is not set, the broker will set it.
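A minimal Java sketch of hardcoding the partition with that constructor; the topic, key and value are made up, and producer is assumed to be an already-configured KafkaProducer<String, String>:
import org.apache.kafka.clients.producer.ProducerRecord;

// explicit partition 2 and an explicit timestamp; pass null for the timestamp
// to let Kafka fill it in
ProducerRecord<String, String> record =
    new ProducerRecord<>("orders", 2, System.currentTimeMillis(), "order-42", "payload");
producer.send(record);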
acks => acknowledgement, values 0, 1, all
0 --> no guarantee the record even reached the leader, highest throughput
1 --> reached the leader, but not the followers yet - slight risk of loss if the leader crashes
all --> reached the leader and replicated to the followers - no risk, lower throughput
max.in.flight.requests.per.connection -- in the asynchronous-acknowledgement scenario, how many requests can be sent before an acknowledgement comes back
more in-flight requests need more memory but give higher throughput
side-effect of asynchronous send plus retries --> if a batch fails, the next batch is sent successfully, and the failed batch is then retried, the order of events is lost.
if order is important - use synchronous send & max.in.flight.requests.per.connection=1 (see the producer sketch after the property list below)
other important producer properties:
buffer.memory
compression.type
batch.size - 0 means no batching
linger.ms - if the batch size is not reached, wait this long, then send the collected records as one request - if the batch size is reached before this time, the request is sent immediately.
client.id - beyond the host IP and port, a logical app name for example
max.request.size
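A sketch of a producer setup pulling these knobs together - ordering-safe acks plus the batching settings above; all values are illustrative, not recommendations:
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all");                                  // leader + followers must ack
props.put("retries", "3");
props.put("max.in.flight.requests.per.connection", "1");   // keep ordering even with retries
props.put("batch.size", "16384");                          // bytes per partition batch
props.put("linger.ms", "5");                               // wait up to 5 ms to fill a batch
props.put("compression.type", "snappy");
props.put("client.id", "demo-producer");                   // logical app name
KafkaProducer<String, String> producer = new KafkaProducer<>(props);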
Consumer considerations:
reading in parallel -
only one consumer in a group owns a partition - consumers in a consumer group do not share a partition - hence the number of partitions in a topic is the upper limit on the number of consumers in a group
consumer reassignment: happens while scaling up from one consumer to many, or when one of the consumers in the group crashes
Rebalance activity:
two players:
1. server-side Group Coordinator - GC
2. consumer-side Leader - L
GC -> (on the server) manages the list of group members
GC -> initiates the rebalance activity (reads are blocked for all members during the rebalance)
L -> executes the rebalance activity and sends the new partition assignment to the coordinator
GC -> communicates the new assignment to the consumers
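From the application side, a group is just configuration - a minimal Java sketch; running two copies of this with the same group.id makes the coordinator split the topic's partitions between them (topic and group names are made up):
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.*;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "orders-readers");        // same group.id => partitions are shared out
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("orders"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);   // timeout in ms
    for (ConsumerRecord<String, String> r : records)
        System.out.println(r.partition() + ":" + r.offset() + " " + r.value());
}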
-------
offset:
current offset - sent records - the position up to which Kafka thinks it has handed records out to this consumer
committed offset - processed records - the position the consumer has already processed (the consumer reports it back once records are handled successfully)
the committed offset is critical during rebalancing - it tells the consumer newly assigned to the partition where to start
committing offset:
auto - properties: enable.auto.commit, auto.commit.interval.ms (default 5 sec) - but this may cause records to be processed a second time
manual:
commitSync
commitAsync
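A sketch of the manual route, reusing a consumer configured as in the earlier sketch but with enable.auto.commit=false; commitAsync keeps the loop fast, commitSync gives a safe final commit on shutdown (process() is a hypothetical handler):
props.put("enable.auto.commit", "false");    // take over committing from the client
// build the consumer and subscribe as before, then:
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> r : records)
            process(r);                      // hypothetical processing step
        consumer.commitAsync();              // non-blocking commit of the current position
    }
} finally {
    consumer.commitSync();                   // blocking commit before closing
    consumer.close();
}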
manual commit scenario:
processing (inside the poll loop) takes longer than the poll interval, then either
1. the GC considers this consumer dead and triggers a rebalance, or
2. rebalancing happens for some other reason.
the current partitions will be taken away and given to others - so commit the offset up to which processing has completed.
commit before ownership is taken away - using a rebalance listener - committing the intermediate processed offsets instead of the current offset.
the ConsumerRebalanceListener interface has 2 methods:
1. onPartitionsRevoked - called just before partitions are revoked; this is where we commit:
maintain a list of offsets that are processed but not yet committed
and commit them when the partitions are revoked.
2. onPartitionsAssigned - called once the new assignment is in place.
rebalances occur when a new consumer is added, when poll is delayed because processing takes longer, or on other system failures.
create the rebalance listener by implementing the ConsumerRebalanceListener interface -
keep track of processed offsets and commit them in onPartitionsRevoked (see the sketch below).
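A sketch of such a listener; consumer is the KafkaConsumer from the earlier sketches, currentOffsets is a map the poll loop keeps updating with processed-but-not-yet-committed offsets, and the topic name is illustrative:
import java.util.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // commit exactly what has been processed before the partitions are taken away
        consumer.commitSync(currentOffsets);
    }
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // nothing special needed here for this scenario
    }
};
consumer.subscribe(Collections.singletonList("orders"), listener);

// inside the poll loop, after processing each record r:
// currentOffsets.put(new TopicPartition(r.topic(), r.partition()),
//                    new OffsetAndMetadata(r.offset() + 1));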
Advantages of a consumer group:
parallel processing of a topic
automatic management of partition assignment
entry/exit/failure of a consumer is detected and partition rebalancing is taken care of
one downside -- partition assignment is managed by Kafka, so we have no control over it -
a problem if, for example, you have a custom partitioner and want to process one portion of the data differently from the rest.
one issue with the rebalance listener:
suppose the processing inside the poll loop loads data into a DB, and the load is committed,
but the consumer crashes before the offsets are committed in onPartitionsRevoked.
these two activities are not 'atomic', and the DB load can't be rolled back, so those records get processed and loaded again.
To make it atomic:
assign the relevant partitions to the consumer explicitly,
store the messages in a DB table, store their corresponding offsets in another table, and make both writes a single 'transaction' (see the sketch below).
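A sketch of that pattern: the consumer is assigned its partition explicitly (no group management), seeks on startup to the offset read back from the DB, and writes each record plus its offset in one DB transaction; readOffsetFromDb and saveRecordAndOffsetInOneTx are hypothetical helpers:
import java.util.Collections;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

TopicPartition tp = new TopicPartition("orders", 0);
consumer.assign(Collections.singletonList(tp));     // explicit assignment, no rebalancing
consumer.seek(tp, readOffsetFromDb(tp));            // resume where the DB says we stopped

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> r : records)
        // record and offset go into the DB in a single transaction, so after a
        // crash they are either both there or both missing
        saveRecordAndOffsetInOneTx(r, r.offset() + 1);
}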
Schema Evolution:
if the data structure might change over time, we should be able to support both the old and the new schema.
working with Avro serialization:
create an Avro schema for the record
generate Avro source code from that schema
create the producer and consumer using KafkaAvroSerializer / KafkaAvroDeserializer
Confluent maintains a Schema Registry: it stores an id and the schema, the producer sends the id along with the data, and the consumer uses the id to fetch the schema from the registry.
Avro:
lets you define a schema for your data
can generate code for the schema (optional)
provides APIs to embed/extract the schema and to serialize and deserialize the data.
Avro schemas are defined in JSON.
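A hedged sketch of pointing a producer at Confluent's Avro serializer and Schema Registry; it assumes the Confluent kafka-avro-serializer jar is on the classpath, and the registry URL is a placeholder:
import java.util.Properties;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://localhost:8081");   // where ids <-> schemas live
KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props);
// the serializer registers/looks up the schema in the registry and sends only its id
// with each record; the consumer side uses KafkaAvroDeserializer the same way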
--------------------deleting kafka topics----------
use kafka-run-class.sh with the kafka.admin.TopicCommand --delete option
prerequisite: delete.topic.enable = true
if the server is down while the topic is deleted, the topic still shows up in the list, but once the server starts the log folders are deleted and the topic is permanently removed.
if the server is running during the deletion, the list will no longer show the deleted topic.
kafka-run-class.sh kafka.admin.TopicCommand --delete --topic ftoKafka_topic --zookeeper localhost:2181
Streaming app steps:
start Kafka if not already running
create the Kafka topic
generate logs
start the Flume agent whose source is the log data and whose sink is Kafka
run the Scala jar that does the analysis
----
/home/cloudera/Downloads/kafka_2.10-0.10.2.1/bin/kafka-server-start.sh /home/cloudera/Downloads/kafka_2.10-0.10.2.1/config/server.properties
/home/cloudera/Downloads/kafka_2.10-0.10.2.1/bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 1 --replication-factor 1 --topic ftokafkaTopicNew
/opt/gen_logs/start_logs.sh
flume-ng agent -n ftokafka -c /home/cloudera/flume_script -f /home/cloudera/flume_script/ftokafka.conf
spark-submit --class FlumeToKafkaToSpark --master local[2] --jars "/home/cloudera/Downloads/kafka_2.10-0.10.2.1/libs/spark-streaming_2.10-1.6.2.jar,/home/cloudera/Downloads/kafka_2.10-0.10.2.1/libs/spark-streaming-kafka_2.10-1.6.2.jar,/home/cloudera/Downloads/kafka_2.10-0.10.2.1/libs/kafka_2.10-0.10.2.1.jar,/home/cloudera/Downloads/kafka_2.10-0.10.2.1/libs/metrics-core-2.2.0.jar" /home/cloudera/ftokafkatospark_2.10-1.0.jar
to debug:
flume-ng agent -Dflume.root.logger=INFO,console -n ftokafka -c /home/cloudera/flume_script -f ftokafka.conf
spark-shell --jars "/home/cloudera/Downloads/kafka_2.10-0.10.2.1/libs/spark-streaming_2.10-1.6.2.jar,/home/cloudera/Downloads/kafka_2.10-0.10.2.1/libs/spark-streaming-kafka_2.10-1.6.2.jar,/home/cloudera/Downloads/kafka_2.10-0.10.2.1/libs/kafka_2.10-0.10.2.1.jar,/home/cloudera/Downloads/kafka_2.10-0.10.2.1/libs/metrics-core-2.2.0.jar"
retail_2.10-1.0.jar /user/dgadiraju/streaming/streamingdepartmentanalysis
sudo ssh 127200813data00.eap.g4ihos.itcs.hpecorp.net
cd /usr/hdp/2.5.0.0-1245/kafka/bin/
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic ftokafkaTopicNew
./kafka-topics.sh --list --zookeeper 127200813master.eap.g4ihos.itcs.hpecorp.net:2181,127200813data02.eap.g4ihos.itcs.hpecorp.net:2181,127200813data01.eap.g4ihos.itcs.hpecorp.net:2181,127200813data00.eap.g4ihos.itcs.hpecorp.net:2181
./kafka-console-producer.sh --broker-list 127200813data00.eap.g4ihos.itcs.hpecorp.net:9092 --topic test
/home/cloudera/Downloads/kafka_2.10-0.10.2.1/bin/kafka-console-consumer.sh --bootstrap-server 127200813data00.eap.g4ihos.itcs.hpecorp.net:9092 --topic ftokafkaTopicNew --from-beginning
countByDept.foreachRDD { rdd =>
  rdd.foreach { record =>
    println(record._1)
    println(record._2)
  }
}
--------------
./kafka-topics.sh --create --zookeeper 127200813data02.eap.g4ihos.itcs.hpecorp.net:2181 --replication-factor 1 --partitions 1 --topic test1
./kafka-console-producer.sh --broker-list 127200813data00.eap.g4ihos.itcs.hpecorp.net:9092 --topic test1
./kafka-console-consumer.sh --bootstrap-server 127200813data00.eap.g4ihos.itcs.hpecorp.net:9092 --topic test1 --from-beginning
kafka-topics.sh --describe --zookeeper 127200813data02.eap.g4ihos.itcs.hpecorp.net:2181 --topic test1
./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic test --zookeeper 127200813master.eap.g4ihos.itcs.hpecorp.net:2181,127200813data02.eap.g4ihos.itcs.hpecorp.net:2181,127200813data01.eap.g4ihos.itcs.hpecorp.net:2181,127200813data00.eap.g4ihos.itcs.hpecorp.net:2181
spark-shell --jars kafka-spark-consumer-1.0.10.jar
------------------concepts--------------------
important criteria - fault tolerance, data consistency, simpler API, lower end-to-end latency
Durable - exactly once - can replay any message or set of messages given the necessary selection criteria.
Reliable - at least once - able to replay an already received message - JMS, RabbitMQ, Amazon Kinesis
Spark Streaming - high performance!! due to RDD micro-batching
Apache Storm is supported by Hortonworks and MapR only
Spark Streaming has a higher-level API, similar to Spark batch, with SQL support
partitions - the number of partitions per topic is important -- you can't have more consumers in a group than partitions, or else the same partition's data would be consumed by more than one consumer.
Rebalancing: happens when consumers join or leave a group, or when failures happen.
-----------parallelizing input DStreams-----------------
multiple InputDStreams
val numInputDStreams = 5
val kafkaDStreams = (1 to numInputDStreams).map { _ => KafkaUtils.createStream(...) }
--
multiple consumer threads in one InputDStream
val consumerThreadsPerInputDstream = 3
val topics = Map("zerg.hydra" -> consumerThreadsPerInputDstream)
val stream = KafkaUtils.createStream(ssc, kafkaParams, topics, ...)
---------------kafka fraud receive data and respond synchronously??--------------
------------------reading http request parameters-------------------
protected void doGet(HttpServletRequest request, HttpServletResponse response)
    throws ServletException, IOException {
  String param1 = request.getParameter("param1");
  String param2 = request.getParameter("param2");
}
------------------rebalance-----------------------------
when consumer threads change dynamically (user-initiated or due to system failures)
------------flume to hdfs ---------------------
--ftohdfs.conf--
ftohdfs.sources = logsource
ftohdfs.sinks = hdfssink
ftohdfs.channels = mchannel
# Describe/configure the source
ftohdfs.sources.logsource.type = exec
ftohdfs.sources.logsource.command = tail -F /opt/gen_logs/logs/access.log
# Describe the sink
ftohdfs.sinks.hdfssink.type = hdfs
ftohdfs.sinks.hdfssink.hdfs.path = hdfs://quickstart.cloudera:8020/user/cloudera/flume_data
ftohdfs.sinks.hdfssink.hdfs.fileType = DataStream
# Use a channel which buffers events in memory
ftohdfs.channels.mchannel.type = memory
ftohdfs.channels.mchannel.capacity = 1000
ftohdfs.channels.mchannel.transactionCapacity = 100
# Bind the source and sink to the channel
ftohdfs.sources.logsource.channels = mchannel
ftohdfs.sinks.hdfssink.channel = mchannel
----
start the log gen - /opt/gen_logs/start_logs.sh
flume-ng agent -n ftohdfs -c . -f ftohdfs.conf
---------------flume to kafka--------------
install kafka: https://archive.cloudera.com/kafka/kafka/2/kafka/quickstart.html
start zookeeper if not already started (one comes with kafka):
bin/zookeeper-server-start.sh config/zookeeper.properties
start kafka server:
bin/kafka-server-start.sh config/server.properties
create topic:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic ftoKafka_topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
ftokafka.conf
ftokafka.sources = logsource
ftokafka.sinks = kafkasink
ftokafka.channels = mchannel
# Describe/configure the source
ftokafka.sources.logsource.type = exec
ftokafka.sources.logsource.command = tail -F /opt/gen_logs/logs/access.log
# Describe the sink
ftokafka.sinks.kafkasink.type = org.apache.flume.sink.kafka.KafkaSink
ftokafka.sinks.kafkasink.brokerList = localhost:9092
ftokafka.sinks.kafkasink.topic = ftokafka_topic
# Use a channel which buffers events in memory
ftokafka.channels.mchannel.type = memory
ftokafka.channels.mchannel.capacity = 1000
ftokafka.channels.mchannel.transactionCapacity = 100
# Bind the source and sink to the channel
ftokafka.sources.logsource.channels = mchannel
ftokafka.sinks.kafkasink.channel = mchannel
Start the Flume agent that moves data from the log file to the Kafka sink
flume-ng agent -n ftokafka -c . -f ftokafka.conf
consume on the console:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic ftokafkaTopicNew --from-beginning
--------------------kafka in cloudera--------------
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
bin/kafka-topics.sh --list --zookeeper localhost:2181
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic ftokafka_topic
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic ftokafka_topic --from-beginning
topic creation -- uses zookeeper
producer -- uses broker-list
consumer -- uses bootstrap-server (new consumer) or zookeeper (old consumer)