Skip to content

Instantly share code, notes, and snippets.

@s1037989
Last active October 19, 2023 02:38
Show Gist options
  • Save s1037989/1c8d219d74fb7a9bb93c72bbc97edb02 to your computer and use it in GitHub Desktop.
Save s1037989/1c8d219d74fb7a9bb93c72bbc97edb02 to your computer and use it in GitHub Desktop.
diode
# perl diode recv ring
# perl diode send cc00
# perl diode send ccff 0003 610062
# perl diode send ff01 0 0003 610062
# perl diode sendfile file [name]
use 5.010;
use strict;
use warnings;
use Errno;
use Digest::MD5;
use Fcntl qw(:DEFAULT :mode :seek);
use Time::HiRes qw(nanosleep time);
use IO::Socket::Packet; # TODO: how to set this up without an object?
use Socket qw(SOCK_RAW);
use Socket::Packet qw(
PACKET_OUTGOING
PF_PACKET AF_PACKET
ETH_P_ALL
pack_sockaddr_ll unpack_sockaddr_ll recv_len
);
use constant DEBUG => $ENV{DIODE_DEBUG} // 0;
use constant DIODE => $ENV{DIODE_IFNAME} || 'diode'; # TODO: test between machines; TODO: how to work use a virtual interface "pair"?
use constant {
PROTOCOL => ETH_P_ALL, # TODO: how to use a custom protocol?
ETH => 0x0000,
};
use constant {
FRAME_SIZE => 2048,
FRAME_NR => 256000,
KERNEL_BLOCK_SIZE => 16384,
};
use constant {
NS_RECV => 1,
NS_SEND => 1, # TODO: this should be calculated based on the speed of the link and how well the receiver can keep up (10 is not that much faster than 100)
# rather than just a constant pause between packets; perhaps also a larger pause between current bytes written or evenly every n seconds?
};
use constant VALID_CMDS => {
"00" => [], # ping
"01" => [], # stop
"10" => [ # start sending file
"[Aa]{1,4}",
],
"11" => [ # stop sending file
"[Aa]{1,4}",
],
"ff" => [ # custom command
"a (abc|[ab])",
"Z (ZYX|[ZX])",
],
};
my $queue = {};
my $subcommand = \&{'cmd_'.shift @ARGV};
# pre-set dst, src, eth, and id; take op, len, and payload from args
@ARGV = (('00'x6), ('00'x6), ('00'x2), $ARGV[0], (length($ARGV[1])?sprintf('%032s', $ARGV[1]):''), map { $_||'' } @ARGV[2,3]) if $subcommand eq \&cmd_send;
$subcommand->(@ARGV);
### subcommands
sub cmd_recv {
my $recv_type = shift // 'normal';
my $sock = $recv_type eq 'ring' ? &_sockring : &_sock;
while (1) {
nanosleep NS_RECV;
my $packet;
my $rv = _sockread($sock, \$packet, \my %info);
next if !defined($rv) && $!{EAGAIN};
my ($dst, $src, $eth, $op1, $op2, $id, $payload_len, $payload) = unpack 'a6a6a2a1a1a16a2a*', $packet;
next unless $dst eq pack 'H*', sprintf '%012x', 0; # TODO: use a custom dst
next unless $src eq pack 'H*', sprintf '%012x', 0; # TODO: use a custom src
# next unless $eth eq pack 'H*', sprintf '%04x', ETH; # TODO: how to use a custom eth?
next unless $op1 eq "\xff" || $op1 eq "\xcc"; # valid op1 is cc (command) or ff (file)
$payload_len = unpack "S", pack "S", hex unpack 'H*', $payload_len;
if ($payload_len != length $payload) {
warn sprintf "%s: %s\n", 'length mismatch', join ' ', map { unpack 'H*', $_ } unpack 'a6a6a2a*', $packet;
$sock->done_ring_frame if $sock->can('done_ring_frame');
next;
}
$id = unpack "H*", $id;
if ($op1 eq "\xff") {
my $addl_data = $op2;
_file($id, $addl_data, $payload);
}
elsif ($op1 eq "\xcc") {
$op1 = unpack 'H*', $op1;
$op2 = unpack 'H*', $op2;
my $command = \&{"_command_$op2"};
if (_is_valid_cmd($op1.$op2, $payload)) {
$command->($id, $payload);
}
else {
warn "invalid command $payload\n";
}
}
$sock->done_ring_frame if $sock->can('done_ring_frame');
}
}
sub cmd_sendfile { # TODO: allow sending multiple files from the cmdline, but they should be done in serial, not parallel
my $subcall = ref $_[0] ? 1 : 0;
my $sock = ref $_[0] ? shift : &_sock;
$_[2] = sprintf '%04x', PROTOCOL;
my ($file, $name) = (shift, shift);
$name ||= $file;
warn "executing file callback script [$file $name]\n"; # TODO: implement file callback script
die "file $file does not exist\n" unless -e $file;
open my $fh, '<', $file;
sysseek($fh, 0, SEEK_SET);
my $digest = Digest::MD5->new->addfile($fh)->digest;
my $hexdigest = unpack('H*', $digest);
my @cmd = (qw(000000000000 000000000000 0000 cc10), $hexdigest, sprintf('%04x', hex length $name), unpack('H*', $name)); # TODO: refactor so that a command like this is just 2 commands
my $ret = '';
my @alarm_cmd = (@cmd[0..2], 'cc00', $cmd[4], '0000');
$SIG{ALRM} = sub { alarm 1; cmd_send($sock, @alarm_cmd); } unless $SIG{ALRM};
alarm 1;
my $read_bytes = my $write_bytes = 0;
my $start_time = time;
cmd_send($sock, @cmd);
sysseek($fh, 0, SEEK_SET);
while ($ret = sysread($fh, my $read_buffer, 131072, 0)) {
$read_bytes += $ret;
my @read_buffer = unpack '(a1400)*', $read_buffer;
undef $read_buffer;
while (my $write_buffer = shift @read_buffer) {
my $addl_data = $read_bytes < -s $file || scalar @read_buffer ? "\x01" : "\x00";
$write_buffer = ("\x00"x6).("\x00"x6).("\x00"x2).("\xff".$addl_data).$digest.pack('H*', sprintf('%04x', length $write_buffer)).$write_buffer;
nanosleep NS_SEND;
my $rv = syswrite($sock, $write_buffer, length $write_buffer); # TODO: need a timeout here
$write_bytes += $rv;
if (!defined($rv) && $!{EAGAIN}) {
say "EAGAIN";
} elsif ($rv != length $write_buffer) {
say "incomplete write";
} else {
my ($dst, $src, $eth, $op1, $op2, $id, $payload_len, $payload) = unpack 'a6a6a2a1a1a16a2a*', $write_buffer;
$queue->{$hexdigest} += length $payload;
say join ' ', (map { unpack 'H*', $_ } $dst, $src, $eth, $op1.$op2, $id), _trunc(unpack('H*', $payload), 8) if DEBUG > 0;
}
}
}
say "total: $queue->{$hexdigest}" if $queue->{$hexdigest};
say time - $start_time unless $subcall;
}
sub cmd_send {
my $subcall = ref $_[0] ? 1 : 0;
my $sock = ref $_[0] ? shift : &_sock;
# Command Validation:
my $is_valid_cmd = _is_valid_cmd($_[3], pack 'H*', ($_[6]//''));
@_[5,6] = ('')x2 if $is_valid_cmd == -1; # remove args from cmds that don't accept args
die sprintf "invalid command %s\n", join ' ', split /\x00/, pack 'H*', ($_[6]//'') if !$is_valid_cmd;
$_[2] = sprintf '%04x', PROTOCOL;
my $loop = 1;
$loop = $_[5] // $loop if $_[3] eq 'ff01';
@_[5,6] = ('0578', "61" x 1400) if $_[3] eq 'ff01';
my $c = 1;
my @alarm_cmd = (@_[0..2], 'cc00', $_[4], '0000');
$SIG{ALRM} = sub { alarm 1; cmd_send($sock, @alarm_cmd); } unless $SIG{ALRM};
alarm 1;
for (1..$loop) {
nanosleep NS_SEND;
$_[3] = 'ff00' if $_[3] eq 'ff01' && $c == $loop;
my $buffer = pack 'H*', join '', @_;
my $rv = syswrite($sock, $buffer, length $buffer); # TODO: need a timeout here
if (!defined($rv) && $!{EAGAIN}) {
say "EAGAIN";
} elsif ($rv != length $buffer) {
say "incomplete write";
} else {
my ($dst, $src, $eth, $op1, $op2, $id, $payload_len, $payload) = unpack 'a6a6a2a1a1a16a2a*', $buffer;
$queue->{$_[4]} += length $payload if $op1 eq "\xff";
say join ' ', (map { unpack 'H*', $_ } $dst, $src, $eth, $op1.$op2, $id), _trunc(unpack('H*', $payload), 8) if DEBUG > 0;
}
} continue {
$c++;
}
say "total: $queue->{$_[4]}" if $_[4] && $queue->{$_[4]};
}
# private subs
sub _is_valid_cmd {
my ($op1, $op2) = unpack 'a2a2', $_[0];
return 0 unless $op1 eq 'cc';
return 0 unless exists VALID_CMDS->{$op2};
my $valid_cmd = VALID_CMDS->{$op2};
return -1 unless scalar @$valid_cmd;
$_[1] =~ s/\x00/ /g;
grep { $_[1] =~ $_ } map { s/\s+/ /g; qr(^$_$) } @$valid_cmd;
}
sub _sock {
my $ifname = DIODE;
open IFINDEX, "/sys/class/net/$ifname/ifindex";
my $ifindex = <IFINDEX>;
close IFINDEX;
socket(my $sock, AF_PACKET, SOCK_RAW, 0) or die "Cannot socket() - $!\n";
bind($sock, pack_sockaddr_ll(PROTOCOL, $ifindex, 0, 0, "")) or die "Cannot bind() - $!\n";
my $flags = fcntl($sock, F_GETFL, 0) or die "Couldn't get flags for \$sock : $!\n";
$flags |= O_NONBLOCK;
fcntl($sock, F_SETFL, $flags) or die "Couldn't set flags for \$sock: $!\n";
return $sock;
}
sub _sockread {
my ($sock, $packet, $info) = @_;
if ($sock->can('wait_ring_frame')) {
$sock->wait_ring_frame($$packet, $info);
}
else {
sysread($sock, $$packet, 1500);
}
}
sub _sockring {
my $ifname = DIODE;
my $sock = IO::Socket::Packet->new(Type => SOCK_RAW, IfName => $ifname, Protocol => PROTOCOL)
or die "Cannot create PF_PACKET socket - $!";
unless( eval { $sock->setup_rx_ring(FRAME_SIZE, FRAME_NR, KERNEL_BLOCK_SIZE) } ) {
die "Cannot setup PACKET_RX_RING - $@" if $@;
die "Cannot setup PACKET_RX_RING - $!";
}
$sock->blocking(0);
return $sock;
}
sub _trunc {
my ($string, $keep_length) = @_;
$keep_length //= 4;
my $length = length($string);
# If the string is shorter or equal to 8 characters, no truncation is needed
return $string if $length <= $keep_length * 2;
# Calculate the length of the parts to keep
my $keep_start = substr($string, 0, $keep_length);
my $keep_end = substr($string, -$keep_length);
# Construct the truncated string
my $truncated = $keep_start . "..." . $keep_end . " (${\($length/2)} bytes)";
return $truncated;
}
# files and commands
sub _command_00 { say 'ping' }
sub _command_01 { say 'stop' and exit }
sub _command_10 {
my ($id, $payload) = @_;
my $file = $payload;
$queue->{$id} = {meta => {file => $file}, data => ''};
my $fh = $queue->{$id}->{fh};
close $fh if $fh;
delete $queue->{$id}->{fh} if $fh;
open $queue->{$id}->{fh}, '>', './'.$file; # TODO: use cmdline argument for outdir; TODO: create a lockfile for writing to the file
say "receiving file $file ($id)";
}
sub _command_11 {
my ($id, $payload) = @_;
my $file = $payload;
my $fh = $queue->{$id}->{fh};
close $fh if $fh;
delete $queue->{$id}->{fh} if $fh;
say "closed file $file ($id)";
}
sub _command_ff {
my ($id, $payload) = @_;
my @args = split /\x00/, $payload;
warn "executing command callback script [@args]\n"; # TODO: implement command callback script
}
sub _file {
my ($id, $addl_data, $payload) = @_;
warn "file $id not initialized\n" and return unless my $fh = $queue->{$id}->{fh};
$queue->{$id}->{meta}->{_start_time} //= time;
# uncomment to not write to disk and go faster for testing
# return if $addl_data eq "\x01";
# die time - $queue->{$id}->{meta}->{_start_time};
$queue->{$id}->{meta}->{data} .= $payload;
my $data = $queue->{$id}->{meta}->{data};
my $file = $queue->{$id}->{meta}->{file};
say sprintf "received %d bytes (%d total) of data to file %s (%s)", length $payload, (length($data) + -s $file) , $file, $id if DEBUG > 1;
return unless length $data >= 131072 || $addl_data eq "\x00";
(syswrite($fh, $data) // -1) == length $data or die "failed to write to file $file: $!\n";
$queue->{$id}->{meta}->{data} = '';
say sprintf "added %d bytes (%d total) of data to file %s (%s)", length $data, -s $file, $file, $id if DEBUG > 0;
return unless $addl_data eq "\x00";
$queue->{$id}->{meta}->{end_time} = time;
close $fh;
delete $queue->{$id}->{fh};
open $fh, '<', $file;
sysseek($fh, 0, SEEK_SET);
my $hexdigest = Digest::MD5->new->addfile($fh)->hexdigest;
close $fh;
$queue->{$id}->{meta}->{start_time} = delete $queue->{$id}->{meta}->{_start_time};
my $duration = ($queue->{$id}->{meta}->{end_time} - $queue->{$id}->{meta}->{start_time});
if ($hexdigest eq $id) {
warn sprintf "closed file $id (%d bytes) in %f seconds (%0.3fMbps)\n", -s $file, $duration, ((8 * -s $file) / $duration / 1_000_000);
warn "executing file callback script on $file\n"; # TODO: implement file callback script
}
else {
warn sprintf "closed file $id (%d bytes) in %f seconds (%0.3fMbps) (checksum mismatch)\n", -s $file, $duration, ((8 * -s $file) / $duration / 1_000_000);
unlink $file;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment