Skip to content

Instantly share code, notes, and snippets.

@gfldex
Created July 14, 2020 22:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gfldex/5421e05d7ab2e3b89f38320ccc2f583d to your computer and use it in GitHub Desktop.
Save gfldex/5421e05d7ab2e3b89f38320ccc2f583d to your computer and use it in GitHub Desktop.
#! /usr/bin/env raku
# $*ERR.say: "<ERROR MESSAGE>";
for lines() -> $l {
if 10.rand < 2 {
$*ERR.say: $l;
} else {
$*OUT.say: $l;
}
}
exit 1;
#! /usr/bin/env raku
use NativeCall; sub free(Pointer) is native {*}; free(Pointer.new(1))
use v6;
subset Arrayish of Any where { !.isa('Code') && .^can(‚push‘) && .^can(‚list‘) }
subset CodeOrChannel of Any where Code | Channel;
class X::Shell::PipeeStartFailed is Exception {
has $.command;
has $.env-path;
}
class Shell::Pipe::Exitcode::Container {
has &.callback;
}
class Shell::Pipe::Path::Container {
has @.path;
}
class Shell::Command {
has $.command-path;
has $.command;
method whereis {
say %*ENV<PATH>.split(‚:‘).map(*.IO.add($.command));
}
}
class Shell::Pipe {
class BlockContainer {
has &.code;
has $.proc-in is rw;
has $.proc-out;
has $.proc-out-stdout;
method start { start {
await $.proc-out.ready;
await $.proc-in.ready with $.proc-in;
for $.proc-out-stdout.lines {
my $processed = &.code.($_) ~ "\n";
$.proc-in.write: $processed.Str.encode with $.proc-in;
}
$.proc-in.close-stdin with $.proc-in;
} }
}
constant NotYet = Mu.new but role { method defined { False } };
has @.pipees;
has @.starters; # list of Callable returning Awaitable
has $.exitcode is rw = { NotYet };
has $.name is rw = "Shell::Pipe <anon>";
has $.search-path is rw;
has &.done is rw;
has $.stderr is rw = CodeOrChannel;
has Bool $.quiet is rw; # divert STDERR away from terminal
method start {
do for @.starters.reverse -> &c { |c }
}
method sink {
note "sinking {self.gist}";
with $.stderr {
note "collecting STDERR";
for @.pipees.kv -> $index, $proc {
if $proc ~~ Proc::Async {
if $.stderr ~~ Code {
$proc.stderr.lines.tap: -> $line { $.stderr.($index, $line) };
} elsif $.stderr ~~ Channel {
$proc.stderr.lines.tap: -> $line { $.stderr.send( ($index, $line) ) };
}
}
}
} elsif $.quiet {
for @.pipees -> $proc {
if $proc ~~ Proc::Async {
$proc.stderr.tap: -> $s {};
}
}
}
my @proms = await(self.start).grep: * ~~ Proc;
# FIXME check if any Promise was broken, because a process did not start
my @exitcodes;
for @proms.reverse {
@exitcodes.push: .exitcode when Proc;
}
$.exitcode = @exitcodes but (@exitcodes.all == 0 ?? False !! True);
&.done.(self) with &.done;
@proms
}
method gist {
@.pipees.map(*.&gist-of-pipee).join(' ↦ ')
}
sub gist-of-pipee($e) {
given $e {
when Proc::Async { .path.IO.basename }
when Routine { .name }
when Block { „Block({.file.IO.basename}:{.line})“ }
when BlockContainer { „Block({.code.file.IO.basename}:{.code.line})“ }
when Arrayish { .?name // .WHAT.gist }
}
}
}
my multi infix:«|>»(Proc::Async:D $out, Proc::Async:D $in, :&done? = Code, :$stderr? = CodeOrChannel, Bool :$quiet?) {
note ‚$find |> $sort‘;
my $pipe = Shell::Pipe.new;
$pipe.done = &done with &done;
$pipe.stderr = $stderr with $stderr;
$pipe.quiet = $quiet;
$pipe.pipees.append: $out, $in;
$pipe.starters.append: -> {
$in.start, $out.start
}
$in.bind-stdin: $out.stdout;
$pipe
}
my multi infix:«|>»(Shell::Pipe:D $pipe where $pipe.pipees.tail ~~ Shell::Pipe::BlockContainer, Proc::Async:D $in) {
note ‚* |> {} |> $find‘;
my $blockish = $pipe.pipees.tail;
# FIXME workaround R#3778
$in.^attributes.grep(*.name eq '$!w')[0].set_value($in, True);
$blockish.proc-in = $in;
$pipe.pipees.push: $in;
$pipe.starters.append: -> {
$in.start
}
$pipe
}
my multi infix:«|>»(Shell::Pipe:D $pipe, Proc::Async:D $in, :&done? = Code, :$stderr? = CodeOrChannel, Bool :$quiet?) {
note ‚* |> $find‘;
$pipe.done = &done with &done;
$pipe.stderr = $stderr with $stderr;
$pipe.quiet = $quiet;
my $out = $pipe.pipees.tail;
$pipe.pipees.push: $in;
given $out {
when Proc::Async {
$in.bind-stdin: .stdout;
$pipe.starters.push: -> { $in.start };
}
when Arrayish {
fail "Arrayish not at the tail or head of a pipe.";
}
}
$pipe
}
my multi infix:«|>»(Proc::Async:D $out, Arrayish:D \a) {
note ‚$find |> @a‘;
my $pipe = Shell::Pipe.new;
$pipe.pipees.push: $out;
$pipe.pipees.push: a;
$out.stdout.lines.tap(-> \e { a.push: e });
$pipe.starters.push(-> { $out.start });
$pipe
}
my multi infix:«|>»(Shell::Pipe:D $pipe where $pipe.pipees.tail ~~ Proc::Async, Arrayish:D \a) {
note ‚* |> $find |> @a‘;
my $out = $pipe.pipees.tail;
$pipe.pipees.push: a;
$out.stdout.lines.tap(-> \e { a.push: e });
$pipe
}
my multi infix:«|>»(Shell::Pipe:D $pipe where $pipe.pipees.tail ~~ Shell::Pipe::BlockContainer, Arrayish:D \a) {
note ‚* |> {} |> @a‘;
my $cont = $pipe.pipees.tail;
my $fake-proc = class {
method write($blob) { a.push: $blob.decode }
method ready { my $p = Promise.new; $p.keep; $p }
method close-stdin { True }
}.new;
$cont.proc-in = $fake-proc;
$pipe.pipees.push: a;
$pipe
}
my multi infix:«|>»(Arrayish:D \a, Proc::Async:D $in) {
note ‚@a |> $find‘;
my $pipe = Shell::Pipe.new;
$pipe.pipees.push: a;
$pipe.pipees.push: $in;
# FIXME workaround R#3778
$in.^attributes.grep(*.name eq '$!w')[0].set_value($in, True);
$pipe.starters.push: -> {
| $in.start, start {
LEAVE try $in.close-stdin;
await $in.ready;
$in.write: „$_\n“.encode for a.list;
}
}
$pipe
}
my multi infix:«|>»(&c, Proc::Async:D $in) {
note ‚{} |> $find‘;
my $pipe = Shell::Pipe.new;
$pipe.pipees.push: &c;
$pipe.pipees.push: $in;
# FIXME workaround R#3778
$in.^attributes.grep(*.name eq '$!w')[0].set_value($in, True);
$pipe.starters.push: -> {
| $in.start, start {
LEAVE try $in.close-stdin;
await $in.ready;
$in.write: „$_\n“.encode for c
}
}
$pipe
}
my multi infix:«|>»(Shell::Pipe:D $pipe, &c) {
note ‚* |> $find |> {}‘;
my $out = $pipe.pipees.tail;
my $cont = Shell::Pipe::BlockContainer.new: :code(&c), :proc-out($$out), :proc-out-stdout($out.stdout);
$pipe.pipees.push: $out;
$pipe.pipees.push: $cont;
# $out.stdout.lines.tap(&c);
$pipe.starters.push: -> { $cont.start };
$pipe
}
my multi infix:«|>»(Proc::Async:D $out, &c) {
note ‚$find |> {}‘;
my $pipe = Shell::Pipe.new;
my $cont = Shell::Pipe::BlockContainer.new: :code(&c), :proc-out($out), :proc-out-stdout($out.stdout);
$pipe.pipees.push: $out;
$pipe.pipees.push: $cont;
$pipe.starters.push: -> { $out.start; }
$pipe.starters.push: -> { $cont.start; };
$pipe;
}
my multi infix:«|>»(Supply:D \s, Proc::Async:D $in) {
note ‚s |> $find‘;
my $pipe = Shell::Pipe.new;
$pipe.pipees.push: s;
$pipe.pipees.push: $in;
# FIXME workaround R#3778
$in.^attributes.grep(*.name eq '$!w')[0].set_value($in, True);
$pipe.starters.push: -> { start {
await $in.ready;
s.tap: -> $v { $in.write: „$v\n“.encode }, :done({ try $in.close-stdin }), :quit({ try $in.close-stdin });
} };
$pipe.starters.push: -> { $in.start };
$pipe
}
my multi infix:«|>»(Shell::Pipe:D $pipe where $pipe.pipees.tail ~~ Proc::Async, Supplier:D \s) {
note ‚* |> $find |> s‘;
my $out = $pipe.pipees.tail;
$pipe.pipees.push: s;
$out.stdout.lines.tap(-> $v {
s.emit($v);
});
$pipe
}
my multi infix:«|>»(Channel:D \c, Proc::Async:D $in, :&done? = Code) {
my $pipe = Shell::Pipe.new: :&done;
$pipe.pipees.push: c;
$pipe.pipees.push: $in;
# FIXME workaround R#3778
$in.^attributes.grep(*.name eq '$!w')[0].set_value($in, True);
$pipe.starters.push: -> { start {
for c.list -> $v {
$in.write: „$v\n“.encode;
}
$in.close-stdin;
} };
$pipe.starters.push: -> { $in.start };
$pipe
}
my multi infix:«|>»(Shell::Pipe:D $pipe where $pipe.pipees.tail ~~ Proc::Async, Channel:D \c) {
$pipe
}
my $find = Proc::Async.new('/usr/bin/find', '/tmp');
my $grep = Proc::Async.new('/bin/grep', 'a');
my $sort = Proc::Async.new('/usr/bin/sort');
my $segfaulter = Proc::Async.new('./segfaulter');
my $errorer = Proc::Async.new('./errorer');
my @a;
my @spy;
#
# sub my-little-filter { $^a ~~ s/a/b/ }
#
# my $pipe = $find \
# |> @spy \
# |> $grep \
# |> &my-little-filter \
# |> $sort \
# |> { .uc } \
# |> @a;
#
# say $pipe;
# @a = <1 b 4 a c>;
# my $p = $find |> $sort |> @a;
# say $p;
# { (‚a‘..‚z‘).roll(10) } |> $sort |> -> \e { @a.push: e };
#
# say @a;
# my $obj = class AnonClass {
# has @.a;
# method push(\e) { self.a.push: e; self }
# method list { self.a.list }
# }.new;
#
# { (‚a‘..‚z‘).roll(10) } |> $sort |> $obj;
#
# dd $obj;
#
# my $reverse = Proc::Async.new(</usr/bin/sort -r>);
#
# my $exitcodes;
# my $false = Proc::Async.new(</bin/false>);
# $obj |> $reverse |> $false |> store-exitcodes({ $exitcodes = $_ }) |> { .put }
# say so $exitcodes;
# $find |> { say $++; .uc } |> $sort |> { .lc } |> @a;
# dd @a;
# my $sup-out = ('a'..'z').pick(30).Supply;
# my $sup-in = Supplier.new;
# $sup-in.Supply.tap: { .say };
# $sup-out |> $sort |> $sup-in;
# my $c = Channel.new;
# Promise.in(1).then: {
# say ‚sending‘;
# for ('a'..'z').pick(30) {
# $c.send: .Str;
# }
# say 'closing';
# $c.close;
# };
#
# my @stderr;
# $c |> $grep |> $sort;
#
# $errorer |> $sort :done({ say .exitcode if .exitcode }) :stderr(-> $index, $line { say „ERR stream $index: $line“});
# my @err;
# my $c = Channel.new;
# $c.Supply.tap: -> ($index, $line) { @err[$index].push: $line; };
#
# $find |> $errorer |> $sort :stderr($c);
#
# $c.close;
#
# dd @err;
$find |> $errorer |> $sort :quiet;
Shell::Command.new(:command<find>).whereis;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment