Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Perl Assíncrono
=pod
=encoding UTF-8
=head1 Perl Assíncrono
=head2 Introdução
Esse artigo é dedicado aos L<20 anos da WWW|http://www.wired.com/geekdad/2011/08/world-wide-web-20-years/> C<:)>
Para muitos programadores Web, "assíncrono" remete a Ajax (afinal, é acrônimo para I<asynchronous JavaScript and XML>).
Para alguns outros, remete a L<Node.js|http://nodejs.org/>.
Evidentemente, a participação do JavaScript (ou, mais genericamente, L<ECMAScript|http://pt.wikipedia.org/wiki/ECMAScript>)
na formação de opinião a respeito de L<programação orientada a eventos|http://pt.wikipedia.org/wiki/Programa%C3%A7%C3%A3o_orientada_a_eventos> foi significativa.
JavaScript, por sua vez, deve muito para L<Scheme|http://sao-paulo.pm.org/equinocio/2011/set/5> (assim como Perl deve muito para Lisp).
Não que a programação funcional e a orientada a eventos sejam duas faces da mesma moeda, mas ambos os conceitos requerem o mesmo "estado de espírito", por assim dizer.
E esse "estado de espírito" se aplicou muito bem no universo da I<World Wide Web>.
=head2 Um pouco de história
A abordagem de I<event loop> é uma "técnica milenar", presente no nosso dia-a-dia nas mais diversas formas.
O exemplo mais clássico é I<User Interface>: o programa fica rodando em ciclo contínuo, aguardando I<input> do usuário.
Quem nasceu na década de 80, inconscientemente (ou não) explorou a propriedade intrínsica de I<event loop>, desativando o modo "turbo" da CPU nas etapas mais difíceis dos jogos para entrar em modo "câmera lenta" (isso funcionava por que o I<timer>, muitas vezes, era um mero contador dentro do loop principal).
Historicamente, o elo mais lento de um I<event loop> foi a interface entre a cadeira e o teclado.
E gastar ciclos da CPU para aguardar I/O é muito custoso em sistemas multitarefa.
Então, abstraindo, ao invés de perguntar ao teclado toda hora se tem alguma tecla pressionada, fazemos o teclado disparar um aviso de que "algo aconteceu", e só então CPU se encarrega de verificar.
Obviamente, o que vale para o teclado, vale também para o HD.
E também para uma placa de rede.
E é aí que chegamos lá: I<Asynchronous I/O> (também conhecido como I<non-blocking I/O>) funciona muito bem para qualquer transação de dados que é significativamente mais lenta do que a CPU.
Graças à Internet, isso é particularmente útil para a comunicação B<entre> as CPUs localizadas em máquinas distintas.
No caso de um banco de dados, podemos delegar uma I<query> para um servidor e fazer outras coisas que não dependem diretamente do resultado da mesma.
Por muitos anos, sentiu-se a falta de uma linguagem comum entre programas se comunicando (no caso do banco de dados, precisaria ter cliente e servidor "conversando" através do mesmo protocolo; por exemplo, o do MySQL).
Mas essa linguagem existiu desde 1991, apesar de só ter entrado na moda quando a apelidaram de REST (I<Representational state transfer>).
A visão que L<Tim Berners-Lee|http://pt.wikipedia.org/wiki/Tim_Berners-Lee> & Cia. tiveram naquela época foi muito... além.
Bom, espero que a minha colocação sobre um elo de casualidade entre I<Asynchronous I/O> e I<World Wide Web> fique um pouco mais clara em vista dessa breve explicação.
Mas, vamos ao Perl, que se destaca em ambas as áreas C<:D>
=head2 Perl
I<Asynchronous I/O> não é módulo, nem biblioteca, nem I<framework>: é apenas um paradigma.
Portanto, pode-se fazer os programas assíncronos em Perl I<from scratch>, assim como pode-se fazer programas orientados a objetos sem usar o L<Moose>.
Seguem algumas das possibilidades:
=over
=item *
L<fork()|perlfunc/fork> (B<CORE>) - a forma mais tradicional e robusta de se fazer mais de uma coisa ao mesmo tempo, em sistemas UNIX.
Também é a única forma decente de se fazer uso do SMP (I<symmetric multiprocessing>).
Simplificando, a chamada de C<fork()> duplica o código atualmente executado, com todos os I<handles> abertos, I<buffers>, variáveis, etc.
Daí em diante, o processo-pai e o processo-filho seguem independentes.
Agora, o que isso tem a ver com I<Asynchronous I/O>?
Por si só, pouca coisa, mas, acrescentando ao C<fork()> os sinais, filas, I<pipes>, semáforos e memória compartilhada, dá para ir muito longe.
Um começo é a página L<Perl interprocess communication|perlipc>.
=item *
L<threads> (B<CORE>) - B<controverso>!
Diferentemente de muitas das coisas do Perl, nem sempre se comporta da forma que você espera.
Ou seja, requer um conhecimento técnico bastante elaborado.
Também não é muito amigável com os I<frameworks> de eventos (o L<EV> ao menos lida bem com o C<fork()>).
=item *
L<AnyEvent> - conforme a documentação do próprio módulo diz, está para os I<frameworks> de loops de eventos assim como L<DBI> está para os bancos de dados.
Por essa razão, será o foco do presente artigo.
=item *
L<Coro> - implementa I<coroutines> em Perl.
Diga-se de passagem, implementa de uma forma muito engenhosa.
E o que seriam I<coroutines>?
Simplificando, é mais ou menos o mesmo mecanismo que o sistema operacional emprega para rodar vários programas "simultaneamente" na mesma CPU.
Isso é, guarda estado do que está rodando, "congela", roda outro, guarda estado, "congela", e assim por diante.
=item *
L<IO::Lambda> - outra abordagem interessante, implementa I/O assíncrono como um L<cálculo lambda|http://pt.wikipedia.org/wiki/C%C3%A1lculo_lambda>.
I<Lispers gonna Lisp>.
=item *
L<EV>, L<Event>, L<IO::Async>, L<POE> - diversos I<frameworks> de I/O assíncrono.
A parte boa é: todos servem como I<backend> para o L<AnyEvent>!
=back
=head2 AnyEvent
A razão de ser do L<AnyEvent> é providenciar uma API compatível, portável e leve para programas assíncronos/orientados a eventos.
Apesar de ter um I<backend> próprio, implementado em I<Pure Perl>, o L<AnyEvent> pode fazer uso de módulos e bibliotecas especializados.
Por exemplo, é possível aproveitar o loop de eventos do próprio L<Tk|http://sao-paulo.pm.org/equinocio/2011/set/6>.
Todavia, o I<backend> que mais se destaca é o L<EV>: além de apresentar o melhor desempenho, é o único (até onde eu saiba) que suporta C<fork()> completamente.
Então, para prosseguir com os exemplos desse artigo:
perl -MCPAN -e "install EV"
perl -MCPAN -e "install AnyEvent"
=head2 Cliente HTTP
B<Atenção!>
Poupe-se do trabalho de copiar e colar o código picotado! O link para a versão completa está em L</"Referências">!
Um cliente HTTP é provavelmente a coisa mais simples que faça algo relativamente útil.
Também ilustra bem a disparidade existente entre o processamento e o I/O:
/usr/bin/time curl -o /dev/null http://www.cpan.org/src/5.0/perl-5.14.1.tar.gz
...
0.03user 0.14system 1:42.13elapsed 0%CPU (0avgtext+0avgdata 12752maxresident)k
280inputs+0outputs (1major+893minor)pagefaults 0swaps
Traduzindo: o processo que fez download do código-fonte do Perl levou um minuto e quarenta e dois segundos para completar, porém empregou uma quantidade irrisória de ciclos de CPU para isso.
O resto do tempo ficou parado, aguardando I/O.
Aqui reescrevi o exemplo da L<documentação oficial do AnyEvent|AnyEvent::Intro>, só fiz questão em utilizar L<HTTP::Request> para o processamento dos I<headers>.
Começamos com o nosso I<boilerplate>:
#!/usr/bin/env perl
use common::sense;
use AnyEvent;
use Data::Printer;
use HTTP::Request;
use constant CRLF => "\015\012";
Um destaque especial para L<common::sense>, que, assim como o L<AnyEvent> e o L<Coro>, é da autoria do L<Marc Lehmann|https://metacpan.org/author/MLEHMANN>.
Em suma, é um atalho para:
use feature qw(say state switch);
use strict;
use utf8;
use warnings;
Só que gasta menos memória (!) do que dar um C<use> em todos eles, um a um.
E outro destaque para o L<Data::Printer|http://sao-paulo.pm.org/equinocio/2011/set/10>, um excelente visualizador de dados do L<Breno G. de Oliveira|https://metacpan.org/author/GARU>.
Agora, registramos a I<conditional variable> e I<callback> para processar a resposta:
my $cv = AE::cv;
my $cb = sub {
p @_;
$cv->send;
};
I<Conditional variable> é o alicerce do L<AnyEvent>: através dela declaramos ao loop de eventos que estamos esperando alguma coisa.
No caso, quando C<$cb> será executado, enviará um C<send()> ao respectivo C<recv()> (a ser declarado).
Enquanto isso, montamos o nosso L<HTTP::Request>:
my $req = new HTTP::Request(GET => 'http://www.cpan.org/');
$req->header(Host => $req->uri->host_port);
$req->header(User_Agent => "AnyEvent/$AE::VERSION Perl/$] ($^O)");
E um L<AnyEvent::Handle> para a nossa conexão:
my $buf = '';
my $h;
$h = new AnyEvent::Handle
connect => [$req->uri->host => $req->uri->port],
on_eof => sub {
$cb->(HTTP::Response->parse($buf));
$h->destroy;
},
on_error => sub {
$cb->(HTTP::Response->new(500, $!));
$h->destroy;
};
Alguns dos eventos (C<on_eof>, C<on_error>) já são declarados na instanciação do I<handle>.
Um cuidado especial deve ser tomado com os escopos léxicos: C<$h> precisa ser visível nos I<closures> dos eventos, por isso, C<my $h = new AnyEvent::Handle> não daria certo!
Felizmente, L<AnyEvent> não é 100% assíncrono, e preserva a nossa sanidade através do emprego de filas para dados em I<streams>.
Assim, podemos colocar a nossa requisição na fila do I<handle>:
$h->push_write(
$req->method . ' ' . $req->uri->path_query . ' HTTP/1.0' . CRLF .
$req->headers->as_string(CRLF) . CRLF .
$req->content
);
Para receber a resposta, mais um I<callback>, e fechamos com C<recv()> aguardando o respectivo C<send()> através do C<$cv>:
$h->on_read(
sub {
my ($h) = @_;
$buf .= $h->rbuf;
$h->rbuf = '';
}
);
$cv->recv;
Detalhe importante: o I<buffer> do I<handle> precisa ser esvaziado! Então, concatenamos o seu conteúdo no nosso C<$buf>, que será processado no final da conexão (C<on_eof>).
Em suma, o nosso fluxo é: cadastrar os I<callbacks> para os eventos e ativar o loop através da I<conditional variable>.
Conforme os eventos vão acontecendo, os I<callbacks> são disparados.
A sequência dos acontecimentos raramente segue a organização lógica do código-fonte.
Tanto que, nesse exemplo, o I<callback> que termina o loop é o primeiro a ser declarado!
Isso pode parecer desmotivante, especialmente para uma aplicação tão simples e que não utilizou nenhuma vantagem da abordagem assíncrona.
Mas calma que estamos chegando lá C<:D>
=head2 Servidor HTTP
Tradicionalmente (tradição essa que provém do L<Apache|http://httpd.apache.org/> e antes dele, do L<NCSA|http://en.wikipedia.org/wiki/NCSA_HTTPd>), os servidores HTTP tendem a empregar o velho e bom C<fork()> para lidar com várias conexões simultâneas.
Ou seja: para cada cliente que conecta, é criado um I<worker> exclusivo, que atende àquele cliente enquanto o servidor está de prontidão para as outras conexões.
Porém o C<fork()> é relativamente custoso, por mais otimizado que seja (não se copia o processo inteiro; basta replicar apenas as páginas I<writable> da memória, e reaproveitar as I<read-only> em todas as cópias).
I<Threads> (não especificamente os do Perl) são mais eficientes, ainda assim o custo para organizar a bagaça toda é bastante elevado.
Eis que surge o L<problema C10k|http://en.wikipedia.org/wiki/C10k_problem>: como lidar com uma quantidade alta de conexões simultâneas?
Rodar 10 mil processos é algo perturbador.
O Apache atualmente lida com as conexões de várias formas, inclusive é possível combinar C<fork()> e I<threads>, entretanto, a sua arquitetura limita bastante o desempenho.
Assim, faz-se necessário combinar o Apache com L<nginx|http://www.nginx.org/> ou L<lighttpd|http://www.lighttpd.net/>.
E como esses últimos conseguem a proeza de atender a C10k?
I<Asynchronous I/O>!
O servidor que desenvolvi para servir como exemplo aqui está longe disso, ainda assim, é evidentemente muito mais veloz do que o L<Net::Server::HTTP>:
Server Software: Net::Server::HTTP/0.99
Server Hostname: 127.0.0.1
Server Port: 8080
...
Requests per second: 96.96 [#/sec] (mean)
Time per request: 103.140 [ms] (mean)
Time per request: 10.314 [ms] (mean, across all concurrent requests)
Transfer rate: 71.86 [Kbytes/sec] received
Server Software: AnyEvent/6.02
Server Hostname: 127.0.0.1
Server Port: 8888
...
Requests per second: 1801.45 [#/sec] (mean)
Time per request: 5.551 [ms] (mean)
Time per request: 0.555 [ms] (mean, across all concurrent requests)
Transfer rate: 2274.68 [Kbytes/sec] received
Então, vamos começar:
#!/usr/bin/env perl
use common::sense;
use AnyEvent;
use AnyEvent::Handle;
use AnyEvent::Log;
use AnyEvent::Socket;
use HTTP::Headers;
use HTTP::Response;
use constant CRLF => "\015\012";
use constant MAXCONN => 100;
use constant TIMEOUT => 10;
$AnyEvent::Log::FILTER->level('debug');
Um I<feature> muito bacana do L<AnyEvent> é que ele tem um L<sisteminha de log|AnyEvent::Log>.
Apesar do próprio autor o descrever como simplório, para mim, pessoalmente, substitui o L<Log::Log4perl> em muitos casos.
Inicializando o servidor:
our %pool;
my $srv = tcp_server(
'127.0.0.1' => 8888,
sub {
my ($fh, $host, $port) = @_;
# código do servidor
...;
}
);
AE->cv->wait;
Como o objetivo do servidor é ficar permanentemente disponível até ser explicitamente terminado, não há necessidade de tratamento de I<conditional variable>.
É só dar um C<wait()>, que faz o papel do C<recv()>.
Adentrando o código do servidor, as coisas começam a ficar interessantes.
Assim exercemos o controle de limite de conexões:
if (scalar keys %pool > MAXCONN) {
AE::log error =>
"deny connection from $host:$port (too many connections)\n";
return;
} else {
AE::log warn =>
"new connection from $host:$port\n";
}
E instanciamos o L<AnyEvent::Handle> para cada conexão:
my $h = new AnyEvent::Handle(
fh => $fh,
on_eof => \&cleanup,
on_error => \&cleanup,
timeout => TIMEOUT,
);
$pool{fileno($fh)} = $h;
AE::log debug =>
sprintf "%d connection(s) in pool\n", scalar keys %pool;
O hash C<%pool> é essencial não apenas para o controle de número de conexões.
Como o I<handle> é local ao escopo léxico do I<closure>, é detonado pelo I<garbage collector> assim que o I<closure> termina, o que impede a execução dos seus C<callbacks>.
Então, o C<%pool> serve também para preservar as referências aos I<handles>.
Agora, enfileiramos o tratamento da primeira linha da requisição:
my ($req, $hdr);
$h->push_read(regex => qr{\015?\012}, sub {
my ($h, $data) = @_;
$data =~ s/\s+$//s;
$req = $data;
AE::log debug => "request: [$req]\n";
});
(de novo, C<$req> e C<$hdr> estão declarados B<fora> dos I<closures>)
Este evento será disparado quando aparecer o primeiro I<line terminator> no I<stream>.
Em outras palavras: o código acima pega a primeira linha da requisição (C<GET / HTTP/1.0>).
E para tratar o resto da requisição, que consiste de I<headers> (obrigatoriamente) e I<content> (opcional):
$h->push_read(regex => qr{(\015?\012){2}}, sub {
my ($h, $data) = @_;
$hdr = $data;
AE::log debug => "got headers\n";
if ($hdr =~ m{\bContent-length:\s*(\d+)\b}is) {
AE::log debug => "expecting content\n";
$h->push_read(chunk => int($1), sub {
my ($h, $data) = @_;
reply($h, $req, $hdr, $data);
});
} else {
reply($h, $req, $hdr);
}
});
Nesse caso, temos uma decisão a tomar: se vier I<header> com C<Content-length>, cadastramos mais um evento que pega a quantidade de I<octets> especificada.
Senão, estamos feitos.
A rotina de limpeza (que vinculamos a C<on_eof> e C<on_error>) faz de tudo para terminar a conexão:
sub cleanup {
my ($h, $fatal, $msg) = @_;
AE::log debug => "closing connection\n";
my $id = fileno($h->{fh});
delete $pool{$id} if defined $id;
eval {
no warnings;
shutdown $h->{fh}, 2;
};
$h->destroy;
};
Aliás, no fluxo normal (isso é, mais ou menos de acordo com o protocolo HTTP 1.0 como o conheço), a rotina de C<reply()> deverá ser chamada B<antes> da C<cleanup()>.
Normalmente, a requisição termina após uma linha em branco (dois I<line terminators> em seguida), ou, se for POST, após esgotar todos os I<octets> especificados no I<header>.
Formulando a resposta para o cliente, através do L<HTTP::Response>:
sub reply {
my ($h, $req, $hdr, $content) = @_;
my $res = HTTP::Response->new(
200 => 'OK',
HTTP::Headers->new(
Connection => 'close',
Content_Type => 'text/html; charset=utf-8',
Server => "AnyEvent/$AE::VERSION Perl/$] ($^O)",
)
);
$res->date(time);
$res->protocol('HTTP/1.0');
if ($req =~ m{^(GET|HEAD|POST|PUT)\s+(.+)\s+(HTTP/1\.[01])$}i) {
my ($method, $uri, $protocol) = ($1, $2, $3);
AE::log debug => "sending response\n";
$res->content("Hello, World!");
} else {
AE::log error => "bad request\n";
$res->code(400);
$res->message('Bad Request');
$res->content('Bad Request');
}
$h->push_write($res->as_string(CRLF));
cleanup($h);
};
Este servidorzinho só responde I<Hello, World!> para os métodos GET, HEAD, POST e PUT, para qualquer L<URI> (ou erro I<400 - Bad Request> para requisições desconhecidas).
É trivial combiná-lo com L<File::Slurp> para servir conteúdo estático, mas isso cabe a você, caro leitor C<;)>
=head2 Proxy HTTP
Perl é famoso por ser uma linguagem-cola (I<glue>).
Como bônus, e numa jogada de reaproveitamento via I<Copy & Paste>, juntemos os nossos cliente + servidor HTTP para fazer um I<proxy>.
Para isso, o L</"Cliente HTTP"> foi "encapsulado" numa subrotina, C<http_req()>.
E no L</"Servidor HTTP">, substituímos a rotina C<reply()> por esta:
sub reply {
my ($h, $req, $hdr, $content) = @_;
if ($req =~ m{^(GET|HEAD|POST|PUT)\s+(https?://.+)\s+(HTTP/1\.[01])$}i) {
# código para métodos HTTP
...;
} elsif ($req =~ m{^CONNECT\s+([\w\.\-]+):(\d+)\s+(HTTP/1\.[01])$}i) {
# código para túnel
...;
} else {
AE::log error => "bad request\n";
$h->push_write(
HTTP::Response->new(
400 => 'Bad Request',
undef, 'Bad Request'
)->as_string(CRLF)
);
cleanup($h);
}
}
A principal diferença está na adição do método CONNECT, que permite o acesso a túnel encriptado HTTPS via I<proxy>.
Formular a resposta para os métodos C</(GET|HEAD|POST|PUT)/> consiste em desmontar os I<headers>, construir uma requisição, enviá-la e retornar a resposta:
my ($method, $uri, $protocol) = ($1, $2, $3);
my $headers = HTTP::Headers->new;
for (split /\015?\012/, $hdr) {
$headers->header($1 => $2) if m{^\s*([\w\-]+)\s*:\s*(.+)$};
}
$headers->remove_header('Proxy-Connection');
$headers->header(Connection => 'close');
AE::log debug => "fetching $uri\n";
http_req
new HTTP::Request($method => $uri, $headers, $content),
sub {
my $n = 2**12; # 4KB
my $buf = $_[0]->as_string(CRLF);
my $len = length $buf;
AE::log debug => "sending $uri ($len bytes)\n";
$h->push_write($_)
for unpack "a$n" x ($len / $n - 1) . 'a*', $buf;
AE::timer 0, 0, sub { cleanup($h); };
}
O problema dessa abordagem é que se faz necessário aguardar o término da requisição encaminhada, para só então retornar a resposta.
Por um lado, isso leva bastante tempo, em que o cliente fica simplesmente esperando o I<proxy> baixar a resposta.
Por outro, a resposta é armazenada inteiramente na memória, então, baixe ISO através desse I<proxy> por conta e risco!
E, para terminar, é necessário fragmentar a resposta para enviá-la através do C<push_write> (pelo menos, com L<EV> como I<backend>).
Em contrapartida, o tratamento para CONNECT é muito mais elegante:
my ($peer_host, $peer_port, $protocol) = ($1, $2, $3);
AE::log debug => "connecting to $peer_host:$peer_port\n";
my $peer_h;
$peer_h = new AnyEvent::Handle
connect => [$peer_host => $peer_port],
on_eof => sub {
$peer_h->destroy;
cleanup($h);
},
on_error => sub {
$peer_h->destroy;
cleanup($h);
},
on_connect => sub {
AE::log debug => "connected to $peer_host:$peer_port\n";
$h->push_write(
"HTTP/1.0 200 Connection established" .
(CRLF x 2)
);
$h->on_read(
sub {
AE::log debug => "send to $peer_host:$peer_port\n";
$peer_h->push_write($_[0]->rbuf);
$_[0]->rbuf = '';
}
);
$peer_h->on_read(
sub {
AE::log debug => "recv from $peer_host:$peer_port\n";
$h->push_write($_[0]->rbuf);
$_[0]->rbuf = '';
}
);
};
Ao receber um CONNECT, o I<proxy> se conecta a endereço/porta especificados e faz um "curto-circuito", vinculando a saída do cliente à entrada do servidor remoto, e vice-versa.
Quanto à funcionalidade: zerei o I<cache> do meu L<Chrome|http://www.google.com/chrome>, apontei ele para usar o I<proxy> recém-criado, e reiniciei.
Abriu todas as minhas abas, entre elas a interface do Gmail e do Google Reader, e funcionou decentemente por algumas horas.
Mas, é claro, cada caso é um caso, então não me responsabilizo se o seu SSD pifar após o uso de qualquer um dos meus códigos C<:P>
=head2 Apêndice: AnyEvent::Util
Lembrando: L<AnyEvent> é uma API para I<frameworks> de loops de eventos.
De forma alguma sequer um deles se encarrega em distribuir o processamento entre as várias CPUs que você tem!
Traduzindo: se você tem um sistema com 8 núcleos, não espere que os seus programas que empreguem o paradigma assíncrono automagicamente passem a aproveitar os seus recursos de I<hardware>.
Para delegar o serviço pesado a outras CPUs, você ainda precisa de um C<fork()>.
Recapitulando: se o seu problema é I/O, a sua solução é (provavelmente) L<AnyEvent>.
E, se o seu problema é processamento, a solução é C<fork()> (L<threads> não é solução, é C<$problema ** 2>, I<sorry>).
Agora, se você precisa processar lotes de dados e receber os resultados...
Você precisa de mais um I<glue>: L<AnyEvent::Util>.
Como um (mau) exemplo, segue a implementação do L<sleep sort|http://dis.4chan.org/read/prog/1295544154>:
#!/usr/bin/env perl
use common::sense;
use AnyEvent::Util;
use List::Util qw(shuffle);
my $cv = AE::cv;
$AnyEvent::Util::MAX_FORKS = 10;
for my $i (shuffle(1 .. 10)) {
$cv->begin;
fork_call {
sleep $i;
} sub {
say shift;
$cv->end;
};
}
$cv->wait;
Obviamente, espera-se que o programador utilize algo envolvendo o módulo L<Graph> no lugar do C<sleep()>, para fazer jus à técnica.
Neste momento, estamos prontos para dar um novo significado à expressão "mal gosto":
#!/usr/bin/env perl
use common::sense;
use AnyEvent::Util;
use List::Util qw(shuffle);
my $cv = AE::cv;
for my $i (shuffle(1 .. 10)) {
$cv->begin;
run_cmd ['/bin/sleep', $i],
'>' => sub {
say $i;
$cv->end;
};
}
$cv->wait;
É exatamente o que parece: I<sleep sort> chamando o processo L<sleep(1)> externo.
Mas é claro que você enxergou aqui a possibilidade de interfacear com o L<xz(1)> sem apostar todas as fichas no L<IO::Compress::Lzma>.
=head2 Considerações
Algumas observações para escapar da armadilha do I<spaghetti code>:
=over
=item *
Os programas orientados a eventos tendem a crescer "na diagonal".
Isso é contornável criando subrotinas a torto e a direito...
O que pode causar outro problema.
=item *
O escopo das variáveis pode causar uma grande confusão!
Um C<my> mal-empregado num I<closure> pode se tornar uma grande ameaça à saúde mental.
=item *
Atenção redobrada para I<conditional variables>: tudo o que sobe tem que descer!
Um C<recv()> é anulado por B<qualquer> C<send()>.
Para exercer controle sobre eventos do mesmo tipo rodando em paralelo, empregue C<begin()>/C<end()>: C<begin()> incrementa o contador da I<conditional variables>, e C<end()> o decrementa.
=item *
Debugar programas assíncronos não é divertido.
Pare para pensar e trate de acertar na primeira.
É sério.
=item *
O L<namespace AnyEvent|https://metacpan.org/search?q=AnyEvent> tem muita coisa boa.
Pesquise antes de fazer você mesmo.
No pior caso (quando não tem exatamente aquilo que você pretende fazer), aproveite o módulo mais similar/simples como I<boilerplate> C<;)>
=back
=head2 Referências
=over
=item *
O código completo deste artigo, com os exemplos consolidados: L<https://gist.github.com/781246>
=item *
L<How I Explained REST to My Wife|http://tomayko.com/writings/rest-to-my-wife>
=item *
L<Hypertext Transfer Protocol - HTTP E<sol> 1.0|http://tools.ietf.org/html/rfc1945>
=item *
L<Hypertext Transfer Protocol - HTTP E<sol> 1.1|http://tools.ietf.org/html/rfc2616>
=item *
L<teepeedee2|http://cliki.net/teepeedee2>
=back
=head2 Autor
Stanislaw Pusep L<stas@sysd.org|mailto:stas@sysd.org>
Blog: L<http://sysd.org/>
GitHub: L<https://github.com/creaktive>
=head2 Licença
Este texto está licenciado sob os termos da Creative Commons by-sa,
L<http://creativecommons.org/licenses/by-sa/3.0/br/>
=begin pod:xhtml
<center>
<a rel="license" href="http://creativecommons.org/licenses/by-sa/3.0/br/"><img alt="Licença Creative Commons" style="border-width:0" src="http://i.creativecommons.org/l/by-sa/3.0/88x31.png" /></a><br />This work is licensed under a <a rel="license" href="http://creativecommons.org/licenses/by-sa/3.0/br/">Creative Commons Attribution-ShareAlike License</a>.
</center>
=end pod:xhtml
=cut
#!/usr/bin/env perl
use common::sense;
use AnyEvent;
use AE::HTTP::Tiny;
use Data::Printer colored => 1;
use HTTP::Request;
my $cv = AE::cv;
http_req
new HTTP::Request(GET => $ARGV[0]),
sub {
p @_;
$cv->send;
};
$cv->recv;
#!/usr/bin/env perl
use common::sense;
use AnyEvent;
use AnyEvent::Handle;
use AnyEvent::Log;
use AnyEvent::Socket;
use HTTP::Headers;
use HTTP::Request;
use HTTP::Response;
use constant CRLF => "\015\012";
use constant MAXCONN => 100;
use constant TIMEOUT => 10;
$AnyEvent::Log::FILTER->level('debug');
our %pool;
my $srv = tcp_server(
'127.0.0.1' => 8888,
sub {
my ($fh, $host, $port) = @_;
if (scalar keys %pool > MAXCONN) {
AE::log error =>
"deny connection from $host:$port (too many connections)\n";
return;
} else {
AE::log warn =>
"new connection from $host:$port\n";
}
my $h = new AnyEvent::Handle(
fh => $fh,
on_eof => \&cleanup,
on_error => \&cleanup,
timeout => TIMEOUT,
);
$pool{fileno($fh)} = $h;
AE::log debug =>
sprintf "%d connection(s) in pool\n", scalar keys %pool;
my ($req, $hdr);
$h->push_read(regex => qr{\015?\012}, sub {
my ($h, $data) = @_;
$data =~ s/\s+$//s;
$req = $data;
AE::log debug => "request: [$req]\n";
});
$h->push_read(regex => qr{(\015?\012){2}}, sub {
my ($h, $data) = @_;
$hdr = $data;
AE::log debug => "got headers\n";
if ($hdr =~ m{\bContent-length:\s*(\d+)\b}is) {
AE::log debug => "expecting content\n";
$h->push_read(chunk => int($1), sub {
my ($h, $data) = @_;
reply($h, $req, $hdr, $data);
});
} else {
reply($h, $req, $hdr);
}
});
}
);
AE->cv->wait;
sub cleanup {
my ($h, $fatal, $msg) = @_;
AE::log debug => "closing connection\n";
my $id = fileno($h->{fh});
delete $pool{$id} if defined $id;
eval {
no warnings; ## no critic
shutdown $h->{fh}, 2;
};
$h->destroy;
};
use File::Slurp;
sub reply {
my ($h, $req, $hdr, $content) = @_;
my $res = HTTP::Response->new(
200 => 'OK',
HTTP::Headers->new(
Connection => 'close',
Content_Type => 'text/html; charset=utf-8',
Server => "AnyEvent/$AE::VERSION Perl/$] ($^O)",
)
);
$res->date(time);
$res->protocol('HTTP/1.0');
if ($req =~ m{^(GET|HEAD|POST|PUT)\s+(.+)\s+(HTTP/1\.[01])$}i) {
my ($method, $uri, $protocol) = ($1, $2, $3);
AE::log debug => "sending response\n";
$res->content(scalar read_file(\*DATA));
} else {
AE::log error => "bad request\n";
$res->code(400);
$res->message('Bad Request');
$res->content('Bad Request');
}
$h->push_write($res->as_string(CRLF));
cleanup($h);
};
__DATA__
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 3.2//EN">
<html>
<head>
<title></title>
</head>
<body>
<p>
Lorem ipsum dolor sit amet, consectetur adipiscing elit. Donec
ut est diam. Aenean id dui non turpis fringilla dictum eget non
lectus. Nullam commodo augue quis magna volutpat eleifend. Mauris
aliquam gravida tempus. Duis consectetur nunc eget sem auctor et
congue risus ullamcorper. Ut luctus sem massa, id venenatis
massa. Mauris ac felis mi, scelerisque convallis magna.
Vestibulum volutpat commodo convallis. Nullam euismod tortor et
metus sollicitudin tempor. Curabitur et sapien vel augue tempor
mollis imperdiet eu enim. Proin vel ultrices sapien. Vestibulum
ante ipsum primis in faucibus orci luctus et ultrices posuere
cubilia Curae; Cras aliquet, augue vel blandit bibendum, nunc
odio dignissim nunc, interdum dignissim libero est vel ligula.
Fusce nec odio ac neque condimentum hendrerit vitae vel nisl.
</p>
</body>
</html>
#!/usr/bin/env perl
use common::sense;
use AnyEvent;
use AnyEvent::Handle;
use AnyEvent::Log;
use AnyEvent::Socket;
use HTTP::Headers;
use HTTP::Request;
use HTTP::Response;
use constant CRLF => "\015\012";
use constant MAXCONN => 100;
use constant TIMEOUT => 10;
$AnyEvent::Log::FILTER->level('debug');
our %pool;
my $srv = tcp_server(
'127.0.0.1' => 8888,
sub {
my ($fh, $host, $port) = @_;
if (scalar keys %pool > MAXCONN) {
AE::log error =>
"deny connection from $host:$port (too many connections)\n";
return;
} else {
AE::log warn =>
"new connection from $host:$port\n";
}
my $h = new AnyEvent::Handle(
fh => $fh,
on_eof => \&cleanup,
on_error => \&cleanup,
timeout => TIMEOUT,
);
$pool{fileno($fh)} = $h;
AE::log debug =>
sprintf "%d connection(s) in pool\n", scalar keys %pool;
my ($req, $hdr);
$h->push_read(regex => qr{\015?\012}, sub {
my ($h, $data) = @_;
$data =~ s/\s+$//s;
$req = $data;
AE::log debug => "request: [$req]\n";
});
$h->push_read(regex => qr{(\015?\012){2}}, sub {
my ($h, $data) = @_;
$hdr = $data;
AE::log debug => "got headers\n";
if ($hdr =~ m{\bContent-length:\s*(\d+)\b}is) {
AE::log debug => "expecting content\n";
$h->push_read(chunk => int($1), sub {
my ($h, $data) = @_;
reply($h, $req, $hdr, $data);
});
} else {
reply($h, $req, $hdr);
}
});
}
);
AE->cv->wait;
sub cleanup {
my ($h, $fatal, $msg) = @_;
AE::log debug => "closing connection\n";
my $id = fileno($h->{fh});
delete $pool{$id} if defined $id;
eval {
no warnings; ## no critic
shutdown $h->{fh}, 2;
};
$h->destroy;
};
use AE::HTTP::Tiny;
sub reply {
my ($h, $req, $hdr, $content) = @_;
if ($req =~ m{^(GET|HEAD|POST|PUT)\s+(https?://.+)\s+(HTTP/1\.[01])$}i) {
my ($method, $uri, $protocol) = ($1, $2, $3);
my $headers = HTTP::Headers->new;
for (split /\015?\012/, $hdr) {
$headers->header($1 => $2) if m{^\s*([\w\-]+)\s*:\s*(.+)$};
}
$headers->remove_header('Proxy-Connection');
$headers->header(Connection => 'close');
AE::log debug => "fetching $uri\n";
http_req
new HTTP::Request($method => $uri, $headers, $content),
sub {
my $n = 2**12; # 4KB
my $buf = $_[0]->as_string(CRLF);
my $len = length $buf;
AE::log debug => "sending $uri ($len bytes)\n";
$h->push_write($_)
for unpack "a$n" x ($len / $n - 1) . 'a*', $buf;
AE::timer 0, 0, sub { cleanup($h); };
}
} elsif ($req =~ m{^CONNECT\s+([\w\.\-]+):(\d+)\s+(HTTP/1\.[01])$}i) {
my ($peer_host, $peer_port, $protocol) = ($1, $2, $3);
AE::log debug => "connecting to $peer_host:$peer_port\n";
my $peer_h;
$peer_h = new AnyEvent::Handle
connect => [$peer_host => $peer_port],
on_eof => sub {
$peer_h->destroy;
cleanup($h);
},
on_error => sub {
$peer_h->destroy;
cleanup($h);
},
on_connect => sub {
AE::log debug => "connected to $peer_host:$peer_port\n";
$h->push_write(
"HTTP/1.0 200 Connection established" .
(CRLF x 2)
);
$h->on_read(
sub {
AE::log debug => "send to $peer_host:$peer_port\n";
$peer_h->push_write($_[0]->rbuf);
$_[0]->rbuf = '';
}
);
$peer_h->on_read(
sub {
AE::log debug => "recv from $peer_host:$peer_port\n";
$h->push_write($_[0]->rbuf);
$_[0]->rbuf = '';
}
);
};
} else {
AE::log error => "bad request\n";
$h->push_write(
HTTP::Response->new(
400 => 'Bad Request',
undef, 'Bad Request'
)->as_string(CRLF)
);
cleanup($h);
}
}
package AE::HTTP::Tiny;
use common::sense;
use AnyEvent::Handle;
use HTTP::Response;
use constant CRLF => "\015\012";
sub http_req ($&) { ## no critic
my ($req, $cb) = @_;
return $cb->(HTTP::Response->new(500, 'Usage: http_req(HTTP::Request->new(...), sub {...})'))
if
ref($req) ne 'HTTP::Request'
or ref($cb) ne 'CODE';
return $cb->(HTTP::Response->new(500, 'Unsupported Protocol'))
unless $req->uri->scheme =~ m{^https?$};
$req->header(Content_Length => length $req->content) if $req->content;
$req->header(Host => $req->uri->host_port);
$req->header(User_Agent => "AnyEvent/$AE::VERSION Perl/$] ($^O)");
my $buf = '';
my $h;
$h = new AnyEvent::Handle
connect => [$req->uri->host => $req->uri->port],
on_eof => sub {
$cb->(HTTP::Response->parse($buf));
$h->destroy;
},
on_error => sub {
$cb->(HTTP::Response->new(500, $!));
$h->destroy;
},
tls => ($req->uri->scheme eq 'https') ? 'connect' : undef;
$h->push_write(
$req->method . ' ' . $req->uri->path_query . ' HTTP/1.0' . CRLF .
$req->headers->as_string(CRLF) . CRLF .
$req->content
);
$h->on_read(
sub {
my ($h) = @_;
$buf .= $h->rbuf;
$h->rbuf = '';
}
);
}
use base 'Exporter';
our @EXPORT = qw(http_req);
1;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment