Skip to content

Instantly share code, notes, and snippets.

@jshirley
Created January 12, 2011 15:09
Show Gist options
  • Save jshirley/776267 to your computer and use it in GitHub Desktop.
Save jshirley/776267 to your computer and use it in GitHub Desktop.
Kafka client rough cut
package Kafka::Client;
use Moose;
use Digest::CRC qw(crc32);
use IO::Socket;
# Request type value written at the start of every produce request
# (see encode_produce_request).
my $PRODUCE_REQUEST_ID = 0;

# Address of the server to connect to; passed to the socket as PeerAddr.
has host => (
    is       => 'ro',
    isa      => 'Str',
    required => 1,
);

# Port of the server to connect to; passed to the socket as PeerPort.
has port => (
    is       => 'ro',
    isa      => 'Num',
    required => 1,
);

# Connection to the server. Built on first access by _build_socket;
# calling close() on the client is delegated to the socket's close().
has socket => (
    is         => 'ro',
    isa        => 'IO::Socket',
    lazy_build => 1,
    handles    => { close => 'close' },
);
# Builder for the lazy 'socket' attribute.
#
# Opens a connection to host:port and returns the IO::Socket::INET object.
# Dies with the host, port and the reason reported by IO::Socket::INET when
# the connection cannot be established, instead of handing undef back to the
# attribute (which would only surface as a type-constraint failure).
sub _build_socket {
    my ($self) = @_;

    my $socket = IO::Socket::INET->new(
        PeerAddr => $self->host,
        PeerPort => $self->port,
    );

    # IO::Socket::INET->new returns undef on failure and leaves the reason in $@.
    die sprintf( 'Cannot connect to %s:%s: %s', $self->host, $self->port, $@ )
        unless $socket;

    return $socket;
}
# Send one or more messages to a topic.
#
# Arguments:
#   $messages  - a single message string, or a reference holding the messages
#   $topic     - topic name
#   $partition - partition number; any false value is treated as 0
#
# Returns whatever the socket's send() returns.
sub send {
    my $self = shift;
    my ( $messages, $topic, $partition ) = @_;

    $partition = 0 unless $partition;

    # A plain (non-reference) message is wrapped so the encoder always
    # receives an array reference.
    $messages = [$messages] unless ref $messages;

    # Obtain the socket first so a lazy connect happens before encoding.
    my $socket  = $self->socket;
    my $request = encode_produce_request( $topic, $partition, $messages );

    return $socket->send($request);
}
# Encode a single message payload.
#
# Layout of the returned byte string:
#   1 byte   magic value (always 0)
#   4 bytes  CRC-32 of the payload, big-endian
#   N bytes  the payload itself
#
# Takes the payload string and returns the encoded string. Produces no output.
sub encode_message {
    my ($msg) = @_;

    # 'N' is always an unsigned 32-bit big-endian integer. A CRC-32 can exceed
    # the signed 32-bit range, and 'i>' depends on the platform's int size.
    return pack( 'C', 0 ) . pack( 'N', crc32($msg) ) . $msg;
}
# Build a complete produce request for one topic/partition.
#
# Arguments:
#   $topic     - topic name
#   $partition - partition number
#   $messages  - array reference of message payload strings
#
# Layout of the returned byte string (all integers big-endian):
#   4 bytes  length of everything that follows
#   2 bytes  request type ($PRODUCE_REQUEST_ID)
#   2 bytes  topic length, followed by the topic
#   4 bytes  partition
#   4 bytes  message-set length, followed by the message set
# The message set is every encoded message, each prefixed with its own
# 4-byte length.
sub encode_produce_request {
    my ( $topic, $partition, $messages ) = @_;

    # join is essential here: map evaluated in scalar context yields the
    # number of generated elements, not their concatenation.
    my $message_set = join '', map {
        my $encoded = encode_message($_);
        pack( 'N', length($encoded) ) . $encoded;
    } @$messages;

    # 'N' and 'l>' are exactly 32 bits wide on every platform.
    my $data = pack( 'n', $PRODUCE_REQUEST_ID )
        . pack( 'n', length($topic) ) . $topic
        . pack( 'l>', $partition )
        . pack( 'N', length($message_set) ) . $message_set;

    return pack( 'N', length($data) ) . $data;
}
package main;

# Demo driver: connect to a fixed server and send one test message to the
# 'test' topic (partition left unspecified).
my %connection = ( host => '10.1.3.156', port => 9092 );
my $kafka      = Kafka::Client->new(%connection);

my ( $topic, $message ) = ( 'test', 'Fuck your mom' );
$kafka->send( $message, $topic );
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment