Skip to content

Instantly share code, notes, and snippets.

@Mons
Created July 21, 2017 14:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Mons/ea1841621dce7eaec786344b5b450419 to your computer and use it in GitHub Desktop.
Save Mons/ea1841621dce7eaec786344b5b450419 to your computer and use it in GitHub Desktop.
Simple debugging scribe server in perl
#!/usr/bin/env perl
use 5.010;
use strict;
use DDP;
use Data::Dumper;
# use Thrift;
# use Scribe::Thrift::scribe;
use Thrift::XS::MemoryBuffer;
use Thrift::XS::BinaryProtocol;
use AnyEvent::Socket;
use AnyEvent::Handle;
use EV;
# Two Thrift::XS memory buffers (8192 is the size argument handed to the
# buffer constructor), each wrapped in its own binary-protocol object.
# $ob/$o are what the --examples branch writes to and reads back from;
# $ib/$i are created alongside them but are not used elsewhere in this file.
my ($ib, $ob) = map { Thrift::XS::MemoryBuffer->new(8192) } 1 .. 2;
my ($i,  $o)  = map { Thrift::XS::BinaryProtocol->new($_) } $ib, $ob;
###
### Struct begin/end is a no-op
### Field end is a no-op
### Message end is a no-op
###
use DDP;
use Getopt::Long;
# Listen address; either half can be overridden with --listen host:port.
my ($host, $port) = qw(0.0.0.0 1463);
# Set by --examples: print sample packets instead of starting the server.
my $examples = 0;

# Split a "host:port" listen spec into (host, port).
#
#   $spec         - value given to --listen, e.g. "127.0.0.1:1463", ":1463"
#                   or "127.0.0.1"
#   $default_host - returned when the host part is missing or empty
#   $default_port - returned when the port part is missing or empty
#
# An empty part (as in ":1463" or "127.0.0.1:") keeps the default; a plain
# defined-or check would have replaced the default with an empty string.
sub _parse_listen {
    my ($spec, $default_host, $default_port) = @_;
    my ($h, $p) = split /:/, $spec // '', 2;
    my $new_host = length($h // '') ? $h : $default_host;
    my $new_port = length($p // '') ? $p : $default_port;
    return ($new_host, $new_port);
}

GetOptions(
    'l|listen=s' => sub { ($host, $port) = _parse_listen($_[1], $host, $port) },
    'examples'   => \$examples,
) or die <<EOH;
Usage:
$0 [OPTIONS]
-l --listen host:port
--examples
EOH
# --examples: serialise a few sample scribe packets through the Thrift
# protocol object, hex-dump each one next to a hand-written pack() of the
# same packet for comparison, then exit without starting the server.
#
# The struct-begin/end, field-end and message-end calls are mostly left out
# (kept only as the original author's exception example had them) because
# they are no-ops for this protocol.
if ($examples) {
    # "use" runs at compile time, so xd() is imported for the whole file,
    # not just for this branch.
    use Devel::Hexdump 'xd';

    say "Examples:\n";

    # Pull everything written so far out of the output buffer.
    my $drain = sub { $ob->read($ob->available()) };

    # --- Log call carrying a list of LogEntry structs ----------------------
    my @messages = (
        { category => "my-cat", message => "my message" },
    );
    # LogEntry members as [ name, field id ], in the order they are written.
    my @entry_fields = ( [ 'category', 1 ], [ 'message', 2 ] );

    $o->writeMessageBegin('Log', TMessageType::CALL, 0x11223344);
    if (@messages) {
        $o->writeFieldBegin('messages', TType::LIST, 1);
        $o->writeListBegin(TType::STRUCT, scalar @messages);
        for my $entry (@messages) {
            for my $spec (@entry_fields) {
                my ($key, $field_id) = @$spec;
                next unless defined $entry->{$key};
                $o->writeFieldBegin($key, TType::STRING, $field_id);
                $o->writeString($entry->{$key});
            }
            $o->writeFieldStop();    # end of this LogEntry
        }
        $o->writeListEnd();
    }
    $o->writeFieldStop();            # end of the call arguments
    warn "Request packet: \n" . xd(scalar $drain->());

    my $packed_call = pack 'n n N/a* N CnCN (CnN/a* CnN/a* C C)',
        0x8001, TMessageType::CALL,
        'Log',
        0x11223344,
        TType::LIST, 1,
        TType::STRUCT, 1,
        TType::STRING, 1, "my-cat",
        TType::STRING, 2, "my message",
        TType::STOP,
        TType::STOP;
    warn "Request packet [pack]: \n" . xd($packed_call);

    # --- Successful reply: field 0 ("success") holds the result code -------
    $o->writeMessageBegin('Log', TMessageType::REPLY, 0xdeadbeef);
    $o->writeFieldBegin('success', TType::I32, 0);
    $o->writeI32(0x1234);
    $o->writeFieldStop();
    warn "Response success with code (0x1234): \n" . xd($drain->());

    my $packed_reply = pack 'n n N/a* N CnN C',
        0x8001, TMessageType::REPLY,
        'Log',
        0xdeadbeef,
        TType::I32, 0, 0x1234,
        TType::STOP;
    warn "Response success with code (0x1234) [pack]: \n" . xd($packed_reply);

    # --- Exception reply written field by field -----------------------------
    $o->writeMessageBegin('Log', TMessageType::EXCEPTION, 0xdeadbeef);
    $o->writeStructBegin('TApplicationException');
    $o->writeFieldBegin('message', TType::STRING, 1);
    $o->writeString("Error message");
    $o->writeFieldBegin('type', TType::I32, 2);
    $o->writeI32(1234);
    $o->writeFieldStop();
    $o->writeStructEnd();
    warn "Exception packet (raw): \n" . xd($drain->());

    my $packed_exception = pack 'n n N/a* N (CnN/a* CnN C)',
        0x8001, TMessageType::EXCEPTION,
        'Log',
        0xdeadbeef,
        TType::STRING, 1, "Error message",
        TType::I32, 2, 1234,
        TType::STOP;
    warn "Exception packet [pack]: \n" . xd($packed_exception);

    # --- Exception reply serialised by a TApplicationException object ------
    my $exc = TApplicationException->new();
    $exc->{message} = 'Function not implemented.';
    $exc->{code}    = TApplicationException::UNKNOWN_METHOD;
    $o->writeMessageBegin('Log', TMessageType::EXCEPTION, 0xdeadbeef);
    $exc->write($o);
    warn "Exception packet (from obj): \n" . xd($drain->());

    exit;
}
# Lookup tables for the numeric codes seen on the wire.
#
#   %TYPES - Thrift field/element type code => symbolic name
#   %MSGS  - Thrift message type code       => symbolic name
#
# They are filled inside BEGIN so that the "use constant" lines below, which
# also run at compile time, can turn every name into a constant holding its
# code (T_STOP == 0, T_LIST == 15, M_CALL == 1, ...).
our %TYPES;
our %MSGS;
BEGIN {
    %TYPES = (
         0 => 'T_STOP',
         1 => 'T_VOID',
         2 => 'T_BOOL',
         3 => 'T_BYTE',
        #  3 => 'T_I08',   (shares code 3 with T_BYTE, so it cannot be a key)
         4 => 'T_DOUBLE',
         6 => 'T_I16',
         8 => 'T_I32',
         9 => 'T_U64',
        10 => 'T_I64',
        11 => 'T_STRING',
        # 11 => 'T_UTF7',  (shares code 11 with T_STRING)
        12 => 'T_STRUCT',
        13 => 'T_MAP',
        14 => 'T_SET',
        15 => 'T_LIST',
        16 => 'T_UTF8',
        # Was commented out as "1 => 'T_UTF16'", which would have clashed
        # with T_VOID; Thrift's code for UTF16 is 17.
        17 => 'T_UTF16',
    );
    %MSGS = (
        1 => 'M_CALL',
        2 => 'M_REPLY',
        3 => 'M_EXCEPTION',
        4 => 'M_ONEWAY',
    );
}
# reverse() flips code => name into name => code, the shape "use constant"
# expects for a batch definition.
use constant +{ reverse %TYPES };
use constant +{ reverse %MSGS };
# Symbolic name for a Thrift type code, or '?' when the code is unknown.
sub _type_name {
    my ($code) = @_;
    return $TYPES{ $code // '' } // '?';
}

# Remove one field header from the front of $$buf and return (type, id).
# A STOP marker is a single byte with no field id behind it, so the id is
# only consumed for real fields. Reading "type + id" unconditionally would
# swallow the first two bytes of whatever follows the STOP, which breaks
# decoding of every list element after the first.
sub _take_field_header {
    my ($buf) = @_;
    my ($type, $rest) = unpack 'C a*', $$buf;
    my $id;
    ($id, $rest) = unpack 'n a*', $rest // '' unless $type == T_STOP;
    $$buf = $rest // '';
    return ($type, $id);
}

# Decode one unframed "Log" call and print everything found on the way.
#
#   $frame - the message bytes without the 4-byte frame length prefix
#
# Returns ($seq, $seen_entry): the sequence id from the message header and a
# flag that is true when at least one category or message string was decoded.
sub _dump_log_call {
    my ($frame) = @_;

    my %call;
    my $data;
    (@call{qw(ver type name seq)}, $data) = unpack 'nn N/a* N a*', $frame;
    $data //= '';
    printf "\tVER=%d Type=%d:%s Name='%s' Seq=%d +%d more data\n",
        @call{qw(ver type)}, $MSGS{ $call{type} // '' } // '?',
        $call{name} // '', $call{seq}, length $data;

    my $seen_entry = 0;

    ARG: while (length $data) {
        my ($type, $id) = _take_field_header(\$data);
        if ($type == T_STOP) {
            print "\t\tfield <$type:" . _type_name($type) . ">\n";
            last ARG;
        }
        print "\t\tfield <$type:" . _type_name($type) . "><" . ($id // '') . ">\n";
        if ($type != T_LIST) {
            warn "\t\tCan't handle non-list\n";
            last ARG;
        }

        my ($etype, $count);
        ($etype, $count, $data) = unpack 'C N a*', $data;
        $data //= '';
        print "\t\tElements type: <" . ($etype // '') . ":" . _type_name($etype) . ">\n";

        # The element count comes from the client. Stop as soon as the frame
        # runs dry so a bogus count cannot keep this loop spinning.
        my $remaining = $count // 0;
        while ($remaining-- > 0 && length $data) {
            my %entry;
            FIELD: while (length $data) {
                my ($ftype, $fid) = _take_field_header(\$data);
                if ($ftype == T_STOP) {
                    print "\t\t\t<$ftype:" . _type_name($ftype) . ">\n";
                    last FIELD;
                }
                print "\t\t\t<$ftype:" . _type_name($ftype) . "> id:" . ($fid // '') . "\n";
                if ($ftype != T_STRING) {
                    # Values of other types cannot be skipped here, so the
                    # rest of the frame would be read out of alignment.
                    warn "\t\t\tUnsupported field type\n";
                    last ARG;
                }

                my $str;
                ($str, $data) = unpack 'N/a* a*', $data;
                $data //= '';
                if ($fid == 1) {
                    $entry{cat} = $str;
                    $seen_entry = 1;
                }
                elsif ($fid == 2) {
                    $entry{msg} = $str;
                    $seen_entry = 1;
                }
                else {
                    warn "\t\t\tUnknown subtype: $fid with data " . unpack("H*", $str // '') . "\n";
                }
            }
            print "\t\t[" . ($entry{cat} // '') . "] " . ($entry{msg} // '') . "\n";
        }
    }

    return ($call{seq}, $seen_entry);
}

# Build the unframed reply for a "Log" call.
#
#   $seq - sequence id to echo back
#   $ok  - true:  REPLY with I32 field 0 (the "success" result) set to 123
#          false: EXCEPTION with string field 1 "Empty log" and I32 field 2
#                 set to 1234
sub _build_log_reply {
    my ($seq, $ok) = @_;

    if ($ok) {
        return pack 'n n N/a* N (CnN C)',
            0x8001, M_REPLY,
            'Log',
            $seq,
            T_I32, 0, 123,    # reply code
            T_STOP;
    }

    return pack 'n n N/a* N (CnN/a* CnN C)',
        0x8001, M_EXCEPTION,
        'Log',
        $seq,
        T_STRING, 1, "Empty log",
        T_I32, 2, 1234,
        T_STOP;
}

# Accept scribe clients. Every frame on a connection is a 4-byte big-endian
# length followed by that many message bytes; each complete frame is
# hex-dumped, decoded, and answered with a framed reply.
tcp_server $host, $port, sub {
    my ($fh, @peer) = @_;
    say "Client connected @peer";

    # $h is captured by the on_error closure below. That reference is what
    # keeps the handle alive after this callback returns, and clearing it
    # there is what finally releases the handle.
    my $h;
    $h = AnyEvent::Handle->new(
        fh      => $fh,
        on_read => sub {
            my ($handle) = @_;
            my $rbuf = \$handle->{rbuf};
            my $have = length $$rbuf;
            my $oft  = 0;    # bytes of $$rbuf consumed so far

            while ($have >= $oft + 4) {
                my $len = unpack 'N', substr($$rbuf, $oft, 4);
                if ($have < $oft + 4 + $len) {
                    # Frame is still incomplete; wait for more input.
                    warn "need $len+4, have $have";
                    last;
                }

                my $frame = substr($$rbuf, $oft + 4, $len);
                print "received frame (length = $len)\n" . xd($frame);

                my ($seq, $seen_entry) = _dump_log_call($frame);
                my $reply = _build_log_reply($seq, $seen_entry);
                warn "Reply = \n" . xd($reply);
                $handle->push_write(pack 'N/a*', $reply);

                $oft += 4 + $len;
            }

            # Drop the frames handled above; keep any partial frame.
            substr($$rbuf, 0, $oft, '');
        },
        on_error => sub {
            my ($handle, $fatal, $msg) = @_;
            warn "Connection closed: $msg\n";
            $handle->destroy;
            undef $h;
        },
    );
}, sub {
    # Prepare callback: report the address actually bound. The value
    # returned from here is used as the listen queue length.
    my (undef, $bound_host, $bound_port) = @_;
    say "Server started at $bound_host:$bound_port";
    return 128;
};

EV::loop;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment