Skip to content

Instantly share code, notes, and snippets.

@preaction
Last active August 21, 2018 22:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save preaction/925d02c9831b7758d42a43e0061d3145 to your computer and use it in GitHub Desktop.
Save preaction/925d02c9831b7758d42a43e0061d3145 to your computer and use it in GitHub Desktop.
package Local::Format;
BEGIN { $INC{'Local/Format.pm'} = __FILE__ };
# Construct a format object from a list of key/value options.
#
# Every option is stored on the object as-is.  The "delimiter" option
# falls back to a comma whenever it is missing or false.
sub new {
    my $class = shift;
    my $self  = { @_ };
    $self->{delimiter} = ',' unless $self->{delimiter};
    return bless $self, $class;
}
package Local::Format::csv;
BEGIN { $INC{'Local/Format/csv.pm'} = __FILE__ };
use parent 'Local::Format';
use feature qw( state );
use Text::CSV;
# Parse as many complete CSV records as possible out of a read buffer.
#
# Arguments:
#   $buffref - reference to the pending input text; every line consumed
#              here is removed from it, anything left is kept for later.
#   $eof     - true once the input has ended.  The unterminated remainder
#              of the buffer is then a complete record and is parsed too.
#
# The first record this object parses is taken as the header row and
# supplies the hash keys for all later records.
#
# Returns a list of hash references, one per data record parsed during
# this call (possibly empty).  Lines the CSV parser rejects are reported
# with warn() and skipped.
sub read_buffer {
    my ( $self, $buffref, $eof ) = @_;

    # The parser and the header names live on the object, so every object
    # honours its own delimiter and header row.
    my $csv = $self->{_csv_reader} ||= do {
        Text::CSV->new( { sep_char => $self->{delimiter} } )
            or die 'Cannot create CSV parser: ' . Text::CSV->error_diag() . "\n";
    };
    my $names = $self->{_csv_header} ||= [];

    # Pull every newline-terminated line off the front of the buffer.
    my @lines;
    while ( $$buffref =~ s/^(.*\n)// ) {
        push @lines, $1;
    }

    # At end of input there is no more text to wait for, so whatever is
    # left is the last record even though it has no trailing newline.
    if ( $eof && length $$buffref ) {
        push @lines, $$buffref;
        $$buffref = '';
    }

    my @docs;
    for my $line ( @lines ) {
        if ( !$csv->parse( $line ) ) {
            # Scalar context gives a single diagnostic value.
            my $diag = $csv->error_diag;
            warn "Skipping unparseable CSV line: $diag\n";
            next;
        }
        my @fields = $csv->fields;

        # No header yet: this record names the columns.
        if ( !@$names ) {
            @$names = @fields;
            next;
        }

        push @docs, {
            map {; $names->[ $_ ] => $fields[ $_ ] } 0..$#fields
        };
    }
    return @docs;
}
# Render one document (a hash reference) as CSV text.
#
# The sorted keys of the first document formatted by this object fix the
# column order for every later call.  The first call also emits a header
# row with those column names ahead of the data row.
#
# Returns the text to write: one newline-terminated line per row.  A row
# the CSV encoder rejects is reported with warn() and left out.
sub format {
    my ( $self, $doc ) = @_;

    # Encoder, column names and the header flag live on the object, so
    # each object writes its own header and uses its own delimiter.
    my $csv = $self->{_csv_writer} ||= do {
        Text::CSV->new( { sep_char => $self->{delimiter} || ',' } )
            or die 'Cannot create CSV writer: ' . Text::CSV->error_diag() . "\n";
    };
    my $names = $self->{_csv_columns} ||= [];
    if ( !@$names ) {
        @$names = sort keys %$doc;
    }

    my @rows;
    if ( !$self->{_csv_wrote_header} ) {
        push @rows, [ @$names ];
        $self->{_csv_wrote_header} = 1;
    }
    push @rows, [ map { $doc->{ $_ } } @$names ];

    my $str = '';
    for my $row ( @rows ) {
        if ( $csv->combine( @$row ) ) {
            # A literal newline: $/ is the *input* record separator and
            # may be changed or undefined by the surrounding program.
            $str .= $csv->string . "\n";
        }
        else {
            my $diag = $csv->error_diag;
            warn "Skipping CSV row that could not be encoded: $diag\n";
        }
    }
    return $str;
}
package Local::Format::json;
BEGIN { $INC{'Local/Format/json.pm'} = __FILE__ };
use parent 'Local::Format';
use feature qw( state );
use JSON::PP;
# Feed newly read text to an incremental JSON parser and return every
# document that is complete so far.
#
# Arguments:
#   $buffref - reference to the pending input text.  All of it is handed
#              to the parser (which keeps any incomplete tail internally)
#              and the buffer is emptied.
#   $eof     - accepted for interface parity with the other formats; not
#              used here.
#
# Returns the list of decoded documents (possibly empty).  Text that fails
# to parse is reported with warn() and skipped.
sub read_buffer {
    my ( $self, $buffref, $eof ) = @_;

    # One incremental parser per object: the parser holds unfinished input
    # between calls, so sharing it would mix text from different sources.
    my $json = $self->{_json_reader} ||= JSON::PP->new->relaxed;
    my @docs;

    # Work around a bug in JSON::PP: incr_parse() only returns the
    # first item, see: https://github.com/makamaka/JSON-PP/pull/7
    # Adapted from IO::Async::JSONStream by Paul Evans
    #
    # Void context on purpose: this only appends the text to the parser.
    $json->incr_parse( $$buffref );
    $$buffref = '';

    PARSE_ONE: {
        my $doc;
        my $parsed = eval {
            $doc = $json->incr_parse;
            1;
        };

        if ( !$parsed ) {
            chomp( my $error = $@ );
            warn "Skipping malformed JSON input: $error\n";
            # Drop the text parsed up to the error and try what follows.
            $json->incr_skip;
            redo PARSE_ONE;
        }

        if ( defined $doc ) {
            push @docs, $doc;
            redo PARSE_ONE;
        }
        # Nothing complete is left: fall out of the block.
    }
    return @docs;
}
# Encode one document as pretty-printed JSON text.
#
# The encoder is built once and reused: keys are emitted in canonical
# (sorted) order, nesting is indented by three spaces, and values that are
# not references are accepted.
sub format {
    my ( $self, $doc ) = @_;
    state $encoder = do {
        my $pp = JSON::PP->new;
        $pp->canonical( 1 );
        $pp->pretty( 1 );
        $pp->indent_length( 3 );
        $pp->allow_nonref( 1 );
        $pp;
    };
    return $encoder->encode( $doc );
}
package Local::Format::yaml;
BEGIN { $INC{'Local/Format/yaml.pm'} = __FILE__ };
use parent 'Local::Format';
use YAML qw( Load Dump );
# Extract the complete YAML documents from a read buffer.
#
# Arguments:
#   $buffref - reference to the pending input text; consumed text is
#              removed from it, anything left is kept for later.
#   $eof     - true once the input has ended.
#
# Before end of input a document is only known to be complete when the
# "---" line that starts the next one has arrived.  So only the text ahead
# of the last such line is parsed, and the rest stays in the buffer.  At
# end of input everything that is left is parsed.
#
# Returns the list of loaded documents, or an empty list when nothing
# complete is available.  Errors raised by YAML::Load propagate.
#
# NOTE(review): the text comes straight from the input stream; whether
# YAML::Load may build Perl objects from tagged nodes depends on the
# installed YAML version and settings -- confirm before feeding this
# untrusted data.
sub read_buffer {
    my ( $self, $buffref, $eof ) = @_;

    my $chunk;
    if ( $eof ) {
        $chunk    = $$buffref;
        $$buffref = '';
    }
    elsif ( $$buffref =~ s/\A(.*\n)(?=---(?:[ \t\r][^\n]*)?\n)//s ) {
        # Greedy match: everything up to the last complete "---" line.
        $chunk = $1;
    }

    # Explicit empty return, so "nothing available" is never mistaken for
    # a document by the caller.
    return if !defined $chunk || $chunk !~ /\S/;

    my @docs = YAML::Load( $chunk );
    return @docs;
}
sub format {
my ( $self, $doc ) = @_;
return YAML::Dump( $doc );
}
package Local::Format::default;
use parent 'Local::Format';
use Module::Runtime qw( use_module );
# Build the formatter selected by the YERTL_FORMAT environment variable,
# falling back to "yaml" when the variable is unset or false.
#
# The chosen name is appended to "Local::Format::", the resulting module
# is loaded with use_module(), and its constructor receives every argument
# given here.  Returns whatever that constructor returns.
sub new {
    my $class     = shift;
    my @ctor_args = @_;

    my $name = 'yaml';
    $name = $ENV{YERTL_FORMAT} if $ENV{YERTL_FORMAT};

    my $impl = use_module( join '::', 'Local::Format', $name );
    return $impl->new( @ctor_args );
}
package main;
use ETL::Yertl;
use Getopt::Long qw( GetOptions :config pass_through );
use IO::Async::Loop;
use IO::Async::Stream;
# Command-line driver.
#
# Usage: <script> [-d <delimiter>] <input-format>
#
# Reads documents from STDIN in the named input format and writes each one
# to STDOUT in the output format chosen by Local::Format::default.
GetOptions( \my %format_opt,
    'delimiter|d=s',
) or die "Usage: $0 [-d <delimiter>] <input-format>\n";

# Validate the input format before any I/O is set up, so a missing or
# unknown name gives a clear message.
my $in_format_name = shift @ARGV;
die "Usage: $0 [-d <delimiter>] <input-format>\n"
    unless defined $in_format_name && length $in_format_name;
my $in_format_class = "Local::Format::${in_format_name}";
die "Unknown input format '$in_format_name'\n"
    unless $in_format_name =~ /\A\w+\z/
        && $in_format_class->can( 'read_buffer' );

my $loop       = IO::Async::Loop->new;
my $out_format = Local::Format::default->new;
my $in_format  = $in_format_class->new( %format_opt );

# The loop ends when STDOUT has been closed, which happens only after the
# queued output has been written (see close_when_empty below).
my $stdout = IO::Async::Stream->new_for_stdout(
    on_closed => sub { $loop->stop },
);
$loop->add( $stdout );

my $stdin = IO::Async::Stream->new_for_stdin(
    on_read => sub {
        my ( $self, $buffref, $eof ) = @_;
        for my $doc ( $in_format->read_buffer( $buffref, $eof ) ) {
            my $text = $out_format->format( $doc );
            next unless defined $text && length $text;
            $stdout->write( $text );
        }

        # Input is finished: close STDOUT once its write buffer drains,
        # rather than stopping the loop with output still pending.
        $stdout->close_when_empty if $eof;

        # Explicit false return: do not re-invoke until more data arrives.
        return 0;
    },
);
$loop->add( $stdin );

$loop->run;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment