Last active
August 21, 2018 22:55
-
-
Save preaction/925d02c9831b7758d42a43e0061d3145 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package Local::Format; | |
BEGIN { $INC{'Local/Format.pm'} = __FILE__ }; | |
# Construct a format object from a list of key/value options.
#
# The "delimiter" option falls back to a comma whenever the caller leaves
# it out or passes a false value. The option hash itself becomes the object.
sub new {
    my $class   = shift;
    my %options = @_;
    $options{delimiter} = ',' unless $options{delimiter};
    my $self = bless \%options, $class;
    return $self;
}
package Local::Format::csv; | |
BEGIN { $INC{'Local/Format/csv.pm'} = __FILE__ }; | |
use parent 'Local::Format'; | |
use feature qw( state ); | |
use Text::CSV; | |
# Pull complete CSV lines out of the buffer and turn every data row into a
# hash reference keyed by the column names found on the first line.
#
#   $buffref - reference to the pending input string. Each consumed line is
#              removed from it; an unterminated trailing line stays in the
#              buffer until more input arrives.
#   $eof     - true when no further input will arrive. A final line that
#              lacks a trailing newline is then parsed too, not dropped.
#
# Returns the list of row hash references produced by this call.
#
# The parser and the header names are stored on the object rather than in
# "state" variables, so each instance honours its own delimiter and keeps
# its own column names.
sub read_buffer {
    my ( $self, $buffref, $eof ) = @_;
    my $csv = $self->{_csv_reader}
        ||= Text::CSV->new( { sep_char => $self->{delimiter} } );
    my $names = $self->{_csv_reader_names} ||= [];
    my @docs;
    LINE: while ( length $$buffref ) {
        my $line;
        if ( $$buffref =~ s/^(.*\n)// ) {
            $line = $1;
        }
        elsif ( $eof ) {
            # No newline left in the buffer: this is the unterminated last line
            $line     = $$buffref;
            $$buffref = '';
        }
        else {
            # Incomplete line; wait for the rest of it
            last LINE;
        }

        if ( !$csv->parse( $line ) ) {
            # Do not turn an unparseable line into a bogus row or header
            warn "Skipping CSV line that could not be parsed: "
                . $csv->error_diag . "\n";
            next LINE;
        }
        my @fields = $csv->fields;

        if ( !@$names ) {
            # First successfully parsed line is the header
            @$names = @fields;
            next LINE;
        }
        push @docs, {
            map {; $names->[ $_ ] => $fields[ $_ ] }
            0..$#fields
        };
    }
    return @docs;
}
# Render one document (a hash reference) as CSV text.
#
# The column order is the sorted key list of the first document that has
# any keys. The very first call on an object also emits the header line
# ahead of the data row.
#
# Returns the CSV text for this document, each line ending in "\n".
#
# The writer, the column list and the header flag are kept on the object,
# not in "state" variables, so every instance writes its own header. The
# writer uses the object's "delimiter" option (comma when unset), matching
# what read_buffer does on input. Lines end in a literal newline rather
# than $/, which is the *input* record separator and may be changed or
# undefined by unrelated code.
sub format {
    my ( $self, $doc ) = @_;
    my $csv = $self->{_csv_writer}
        ||= Text::CSV->new( { sep_char => $self->{delimiter} // ',' } );
    my $names = $self->{_csv_writer_names} ||= [];
    if ( !@$names ) {
        @$names = sort keys %$doc;
    }

    my $str = '';
    if ( !$self->{_csv_wrote_header} ) {
        $self->{_csv_wrote_header} = 1;
        if ( $csv->combine( @$names ) ) {
            $str .= $csv->string . "\n";
        }
        else {
            warn "Could not write CSV header: " . $csv->error_diag . "\n";
        }
    }

    if ( $csv->combine( map { $doc->{ $_ } } @$names ) ) {
        $str .= $csv->string . "\n";
    }
    else {
        # Report the row instead of appending an undefined string
        warn "Could not write CSV row: " . $csv->error_diag . "\n";
    }
    return $str;
}
package Local::Format::json; | |
BEGIN { $INC{'Local/Format/json.pm'} = __FILE__ }; | |
use parent 'Local::Format'; | |
use feature qw( state ); | |
use JSON::PP; | |
# Feed the buffered text to an incremental JSON parser and return every
# complete document that can be extracted from it so far.
#
#   $buffref - reference to the pending input string. The whole string is
#              handed to the parser and the buffer is emptied; text of an
#              incomplete document is retained inside the parser.
#   $eof     - accepted for interface compatibility; not used here.
#
# Returns the list of decoded documents.
#
# The parser is stored on the object instead of in a "state" variable so
# that partial input from one instance can never leak into another.
sub read_buffer {
    my ( $self, $buffref, $eof ) = @_;
    my $json = $self->{_json_reader} ||= JSON::PP->new->relaxed;
    my @docs;
    # Work around a bug in JSON::PP: incr_parse() only returns the
    # first item, see: https://github.com/makamaka/JSON-PP/pull/7
    # Adapted from IO::Async::JSONStream by Paul Evans
    $json->incr_parse( $$buffref );
    $$buffref = '';
    PARSE_ONE: while ( 1 ) {
        my $doc;
        my $ok = eval {
            $doc = $json->incr_parse;
            1;
        };
        if ( !$ok ) {
            # Report the bad input, drop it, and carry on with what follows
            chomp( my $error = $@ );
            warn "Skipping JSON input that could not be parsed: $error\n";
            $json->incr_skip;
            next PARSE_ONE;
        }
        # An undefined result means no further complete document is
        # available yet. Testing definedness (not truth) keeps documents
        # such as 0, "" or false from ending the loop early.
        last PARSE_ONE if !defined $doc;
        push @docs, $doc;
    }
    return @docs;
}
# Serialise one document as pretty-printed JSON text.
#
# The encoder is built once and reused: keys are emitted in sorted order,
# nesting is indented by three spaces, and non-reference values are
# accepted as documents.
sub format {
    my ( $self, $doc ) = @_;
    state $encoder = do {
        my $pp = JSON::PP->new;
        $pp->canonical( 1 );
        $pp->pretty( 1 );
        $pp->indent_length( 3 );
        $pp->allow_nonref( 1 );
        $pp;
    };
    my $text = $encoder->encode( $doc );
    return $text;
}
package Local::Format::yaml; | |
BEGIN { $INC{'Local/Format/yaml.pm'} = __FILE__ }; | |
use parent 'Local::Format'; | |
use YAML qw( Load Dump ); | |
# Extract complete YAML documents from the buffer and return them decoded.
#
#   $buffref - reference to the pending input string. Consumed text is
#              removed from it; text that may still be an unfinished
#              document is left in place for the next call.
#   $eof     - true when no further input will arrive; everything left in
#              the buffer is then parsed.
#
# Returns the list of decoded documents, or an empty list when nothing
# can be parsed yet.
#
# Before end of input, only the text in front of the last "---" document
# marker line is parsed: the document after that marker may still be
# growing, and parsing it early would yield a truncated or invalid
# document. Its output is therefore delayed until the next marker or EOF.
sub read_buffer {
    my ( $self, $buffref, $eof ) = @_;
    my $chunk;
    if ( $eof ) {
        $chunk    = $$buffref;
        $$buffref = '';
    }
    elsif ( $$buffref =~ s/\A(.*\n)(?=---(?:[ \t][^\n]*)?\n)//s ) {
        $chunk = $1;
    }
    # Explicit empty return: never hand the caller a bogus document when
    # there is nothing (or only whitespace) to parse
    return unless defined $chunk && $chunk =~ /\S/;
    # NOTE(review): YAML::Load on external input can construct blessed
    # objects depending on YAML.pm settings -- confirm the input is trusted
    my @docs = YAML::Load( $chunk );
    return @docs;
}
# Serialise one document as YAML text by handing it to YAML::Dump.
sub format {
    my $self = shift;
    my ( $doc ) = @_;
    return YAML::Dump( $doc );
}
package Local::Format::default; | |
use parent 'Local::Format'; | |
use Module::Runtime qw( use_module ); | |
# Factory constructor: build an object of the format class named by the
# YERTL_FORMAT environment variable, using "yaml" when it is unset or
# false. All constructor arguments are passed through unchanged.
sub new {
    my $class  = shift;
    my $format = 'yaml';
    $format = $ENV{YERTL_FORMAT} if $ENV{YERTL_FORMAT};
    my $format_class = "Local::Format::$format";
    my $loaded       = use_module( $format_class );
    return $loaded->new( @_ );
}
package main; | |
use ETL::Yertl; | |
use Getopt::Long qw( GetOptions :config pass_through ); | |
use IO::Async::Loop; | |
use IO::Async::Stream; | |
# Driver: read documents in the format named on the command line from
# standard input and write each one to standard output in the format
# chosen by Local::Format::default.

# Options that are handed to the input format's constructor
GetOptions( \my %format_opt,
    'delimiter|d=s',
);

# The first remaining argument names the input format
my $in_format_name = shift @ARGV;
die "Usage: $0 [--delimiter <char>] <input-format>\n"
    unless defined $in_format_name && length $in_format_name;
my $in_format_class = "Local::Format::${in_format_name}";
die qq{Unknown input format "$in_format_name"\n}
    unless $in_format_class->can( 'new' );
my $in_format = $in_format_class->new( %format_opt );

my $loop = IO::Async::Loop->new;
my $out_format = Local::Format::default->new;

# The loop is stopped only once standard output has been closed, which
# happens after its write buffer has drained (see on_read_eof below).
my $stdout = IO::Async::Stream->new_for_stdout(
    on_closed => sub { $loop->stop },
);
$loop->add( $stdout );

my $stdin = IO::Async::Stream->new_for_stdin(
    on_read => sub {
        my ( $self, $buffref, $eof ) = @_;
        for my $doc ( $in_format->read_buffer( $buffref, $eof ) ) {
            $stdout->write( $out_format->format( $doc ) );
        }
        # Explicit false: everything parseable was consumed, so do not ask
        # to be called again until more input arrives
        return 0;
    },
    # Writes are queued, so stopping the loop right here could lose the
    # output still pending; close stdout once it has been flushed instead.
    on_read_eof => sub { $stdout->close_when_empty },
);
$loop->add( $stdin );
$loop->run;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment