Skip to content

Instantly share code, notes, and snippets.

@dantswain
Created February 18, 2016 19:57
Show Gist options
  • Save dantswain/d517a57eac7cd19f2c07 to your computer and use it in GitHub Desktop.
Save dantswain/d517a57eac7cd19f2c07 to your computer and use it in GitHub Desktop.
KafkaEx consumer group begin set to "kafka_ex"
iex(1)> KafkaEx.create_worker(:foo, uris: Application.get_env(:kafka_ex, :brokers), consumer_group: "bar")
14:53:54.712 [debug] Succesfully connected to "localhost" on port 9092
14:53:54.714 [debug] Succesfully connected to "localhost" on port 9093
14:53:54.716 [debug] Succesfully connected to "localhost" on port 9094
{:ok, #PID<0.138.0>}
iex(2)> Enum.each(1..1000, fn x -> KafkaEx.produce("foo", 0, "bar#{x}") end)
:ok
iex(3)> KafkaEx.fetch("foo", 0, worker_name: :foo, max_bytes: 100)
[%KafkaEx.Protocol.Fetch.Response{partitions: [%{error_code: 0,
hw_mark_offset: 1000, last_offset: 2,
message_set: [%KafkaEx.Protocol.Fetch.Message{attributes: 0,
crc: 2966837974, key: "", offset: 0, value: "bar1"},
%KafkaEx.Protocol.Fetch.Message{attributes: 0, crc: 702483308, key: "",
offset: 1, value: "bar2"},
%KafkaEx.Protocol.Fetch.Message{attributes: 0, crc: 1591229434, key: "",
offset: 2, value: "bar3"}], partition: 0}], topic: "foo"}]
iex(4)> KafkaEx.fetch("foo", 0, worker_name: :foo, max_bytes: 100)
[%KafkaEx.Protocol.Fetch.Response{partitions: [%{error_code: 0,
hw_mark_offset: 1000, last_offset: 5,
message_set: [%KafkaEx.Protocol.Fetch.Message{attributes: 0,
crc: 3233590873, key: "", offset: 3, value: "bar4"},
%KafkaEx.Protocol.Fetch.Message{attributes: 0, crc: 3082526415, key: "",
offset: 4, value: "bar5"},
%KafkaEx.Protocol.Fetch.Message{attributes: 0, crc: 783470453, key: "",
offset: 5, value: "bar6"}], partition: 0}], topic: "foo"}]
iex(5)> :sys.get_stat
get_state/1 get_state/2 get_status/1 get_status/2
iex(5)> :sys.get_state(:foo).consumer_group
"kafka_ex"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment