Created
November 23, 2022 15:57
-
-
Save capezotte/1fed33feec1d5097322f50768e49f9d0 to your computer and use it in GitHub Desktop.
poor man's xargs (ft. skalibs)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#define SUBGETOPT_SHORT
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <poll.h>
#include <signal.h>
#include <spawn.h>
#include <stdlib.h>
#include <string.h>
#include <sys/wait.h>
#include <unistd.h>
#include <skalibs/types.h>
#include <skalibs/bytestr.h>
#include <skalibs/sgetopt.h>
#include <skalibs/selfpipe.h>
#include <skalibs/posixplz.h>
#include <skalibs/buffer.h>
#include <skalibs/bufalloc.h>
#include <skalibs/djbunix.h>
#include <skalibs/strerr2.h>
#include <skalibs/genalloc.h>
#include <skalibs/skamisc.h>
/*
 * Round x up to the next power of two (x itself if it already is one).
 *
 * Bit-smearing trick (adapted from a Stack Overflow answer): after x--,
 * OR-ing in successively larger right shifts sets every bit below the
 * highest set bit, so the final increment carries into the next power of two.
 *
 * Returns 0 for x == 0, and also when the result does not fit in an
 * unsigned int (x > UINT_MAX / 2 + 1); main() checks for 0 to detect that.
 */
static inline unsigned int nextpow2(unsigned int x) {
  x--;
  /* Shift by 1, 2, 4, ... up to half the actual width of unsigned int,
   * rather than hard-coding a 32-bit type. */
  for (unsigned int shift = 1; shift < sizeof x * CHAR_BIT; shift <<= 1)
    x |= x >> shift;
  return x + 1;
}
static int sa_eql(stralloc *a1, const char *x) { | |
size_t xlen = strlen(x) + 1; | |
if (a1->len != xlen) return 0; | |
return !!byte_diff(a1->s, xlen, x); | |
} | |
#define NN 4096
/*
 * Move buffered child output from x to buffer_1, removing what was written
 * from the front of x.
 *
 * delim == 0: everything in x is written.
 *
 * delim != 0: x is walked in windows of at most NN bytes. Each window is
 * written up to and including the last delim it contains, so records are
 * never cut in the middle. A window with no delim is still written whole if
 * it is a full NN bytes (a record too long to keep buffered). Otherwise the
 * short, incomplete trailing record stays in x until more output (or the
 * final flush) arrives. This prevents records from different children from
 * being spliced together on stdout.
 *
 * On a write failure a warning is printed and the unwritten data stays in x.
 */
static void sa_stdout_delim(stralloc *x, char delim) {
  size_t done = 0;
  while (done < x->len) {
    size_t left = x->len - done;
    size_t len = left > NN ? NN : left;
    if (delim) {
      size_t last = byte_rchr(x->s + done, len, delim);
      if (last < len)
        len = last + 1;   /* keep the delimiter with its record */
      else if (len < NN)
        break;            /* incomplete trailing record: hold it back */
      /* else: over-long record, has to be split anyway */
    }
    if (buffer_put(buffer_1, x->s + done, len) < 0) {
      strerr_warnwu1sys("write child stdout");
      break;
    }
    done += len;
  }
  /* Shift the unwritten tail to the front once, not once per window. */
  memmove(x->s, x->s + done, x->len - done);
  x->len -= done;
}

/* Write all of x to buffer_1 and leave it empty (unless writing fails). */
static inline void sa_stdout(stralloc *x) {
  sa_stdout_delim(x, 0);
}
/*
 * Append up to NN bytes read from fd to x.
 *
 * When lbuf is set (-l) and more than NN bytes are already buffered,
 * newline-terminated output is flushed to stdout first. This lets output
 * from different children interleave at line granularity.
 *
 * Returns the number of bytes read (0 at end of file), or -1 after printing
 * a warning (errno set).
 */
static ssize_t sa_read(int fd, stralloc *x, int lbuf) {
  if (lbuf && x->len > NN)
    sa_stdout_delim(x, '\n');
  /* Must succeed before reading: fd_read writes NN bytes past x->len. */
  if (!stralloc_readyplus(x, NN)) {
    strerr_warnwu1sys("allocate buffer for child output");
    return -1;
  }
  ssize_t got = fd_read(fd, x->s + x->len, NN);
  if (got < 0) {
    strerr_warnwu1sys("put child output into buffer");
    return got;
  }
  x->len += (size_t)got;
  return got;
}
#undef NN
struct state { | |
struct pollfd *pfds; | |
pid_t *pids; | |
stralloc *buffers; | |
int npids, line_buffered, keep_order, keep_stdin; | |
unsigned int spawned_count, max_count, done_count, live_count; | |
}; | |
/* Summary bits accumulated by free_slots() while scanning every slot. */
enum {
  SLOT_PID = 0x2, /* some slot has a child that has not been reaped */
  SLOT_IN  = 0x4, /* some slot has an output pipe that is still polled */
  SLOT_HUP = 0x8, /* some slot's pipe has hung up and awaits cleanup */
};
static int free_slots(struct state *o) { | |
for (;;) { | |
int r, wstat, chk = 0, dead_pipes = 0; | |
for (int i = 0; i < o->npids; i++) { | |
/* kids live */ | |
if (o->pids[i] > 0) chk |= SLOT_PID; | |
/* has pending data */ | |
if (o->pfds[i].fd > -1) chk |= SLOT_IN; | |
/* slot needs cleanup */ | |
if (o->pfds[i].fd < -1) chk |= SLOT_HUP; | |
} | |
if (chk == 0) | |
return 0; /* what am I waiting for? */ | |
/* chk = 4 means no new SIGCHLDs will come (no live kids) | |
* do not stall here then if so */ | |
r = chk == SLOT_HUP ? o->npids : poll(o->pfds, o->npids + 1, -1); | |
if (r < 0) break; | |
/* update outputs */ | |
for (nfds_t i = 0; i < o->npids; i++) { | |
if (o->pfds[i].revents & POLLIN) { | |
/* the output eat */ | |
if (sa_read(o->pfds[i].fd, &o->buffers[i], o->line_buffered) < 0) | |
return -1; | |
} else if (o->pfds[i].fd < -1 || o->pfds[i].revents & POLLHUP) { | |
/* disable for less syscalls */ | |
if (o->pfds[i].fd > 0) | |
o->pfds[i].fd *= -1; | |
if (o->pids[i] > 0) | |
continue; /* still alive */ | |
if (o->keep_order && (o->done_count & o->npids - 1) != i) | |
continue; /* wrong order */ | |
/* ded and pollhup - cleanup thyme */ | |
sa_stdout(&o->buffers[i]); | |
o->buffers[i].len = 0; | |
fd_close(abs(o->pfds[i].fd)); | |
o->pfds[i].fd = -1; | |
o->pids[i] = 0; /* return slot */ | |
dead_pipes++; | |
o->live_count--; | |
o->done_count++; | |
} | |
} | |
if (dead_pipes) | |
return dead_pipes; | |
if (chk == SLOT_HUP) | |
return 0; /* we'll just stall in the selfpipe reading */ | |
/* self pipe time */ | |
if (o->pfds[o->npids].revents & POLLIN) { | |
int sig = selfpipe_read(); | |
if (sig == -1) { | |
return -1; | |
} | |
if (sig == SIGCHLD) { | |
int i; | |
while ((i = wait_pids_nohang(o->pids, o->npids, &wstat)) > 0) { | |
i--; | |
o->pids[i] *= -1; | |
/* delay cleanup until we get pollhup */ | |
} | |
} | |
} | |
} | |
strerr_diefu1sys(111, "wait for children's outputs"); | |
} | |
static pid_t occupy_slot(int sl, char *const *argv, struct state *o) { | |
int pps[2]; | |
if (pipe(pps)) goto err0; | |
pid_t child = 0; | |
posix_spawn_file_actions_t fa; | |
if ((errno = posix_spawn_file_actions_init(&fa))) goto err1; | |
if (!o->keep_stdin && (errno = posix_spawn_file_actions_addopen(&fa, 0, "/dev/null", O_RDONLY, 0))) goto err2; | |
if ((errno = posix_spawn_file_actions_adddup2(&fa, pps[1], 1))) goto err2; | |
if ((errno = posix_spawn_file_actions_addclose(&fa, pps[1]))) goto err2; | |
if ((errno = posix_spawnp(&child, argv[0], &fa, NULL, argv, environ))) goto err2; | |
posix_spawn_file_actions_destroy(&fa); | |
fd_close(pps[1]); | |
o->pids[sl] = child; | |
o->pfds[sl].fd = pps[0]; | |
o->pfds[sl].events = POLLIN; | |
o->spawned_count++; | |
o->live_count++; | |
return child; | |
err2: | |
posix_spawn_file_actions_destroy(&fa); | |
err1: | |
fd_close(pps[0]); | |
fd_close(pps[1]); | |
err0: | |
return 0; | |
} | |
static int next_slot(struct state *o) { | |
int pos; | |
while (o->live_count >= o->max_count) | |
if (free_slots(o) < 0) | |
strerr_warnwu1sys("ensure right amount of job slots"); | |
if (o->keep_order) { | |
pos = o->spawned_count & o->npids - 1; | |
while (o->pids[pos] != 0) | |
if (free_slots(o) < 0) { | |
strerr_warnwu1sys("wait for ordered job slot"); | |
continue; | |
} | |
} else { | |
pos = 0; | |
while (o->pids[pos] != 0) { | |
pos++; | |
pos &= o->npids - 1; | |
if (pos == 0) | |
if (free_slots(o) < 0) | |
strerr_warnwu1sys("wait for any job slot"); | |
} | |
} | |
return pos; | |
} | |
int main(int argc, char **argv) { | |
PROG = "zorgs"; | |
int r; | |
struct state opts = { 0 }; | |
opts.max_count = 2; | |
char linebuf[BUFFER_INSIZE]; | |
buffer linesrc = BUFFER_INIT(fd_readv, 0, linebuf, BUFFER_INSIZE); | |
unsigned int max_lines = -1, delim = '\n'; | |
const char *eof_str = NULL, *USAGE = "-k -d [delimiter] -P [max parallel] -L [max lines] -E [EOF string] prog {}\n-j is an alias for -P; -0 is an alias for -d ''"; | |
#define DIEUSAGE strerr_dieusage(100, USAGE) | |
while ((r = lgetopt(argc, (const char* const*)argv, "kP:j:L:d:0E:la:")) != -1) { | |
switch (r) { | |
case 'P': | |
case 'j': | |
if (!uint0_scan(optarg, &opts.max_count) || opts.max_count == 0) DIEUSAGE; | |
break; | |
case '0': | |
optarg = ""; | |
/* fallthrough */ | |
case 'd': | |
delim = *optarg; | |
break; | |
case 'L': | |
if (!uint0_scan(optarg, &max_lines)) DIEUSAGE; | |
break; | |
case 'E': | |
if (strlen(optarg)) | |
eof_str = optarg; | |
else DIEUSAGE; | |
break; | |
case 'l': | |
opts.line_buffered = 1; | |
break; | |
case 'a': | |
opts.keep_stdin = 1; | |
linesrc.fd = open_read(optarg); | |
if (linesrc.fd < 0) | |
strerr_diefu2sys(111, "unable to open", optarg); | |
break; | |
case 'k': | |
opts.keep_order = 1; | |
break; | |
default: | |
DIEUSAGE; | |
} | |
} | |
argc -= optind; argv += optind; | |
if (argc == 0) DIEUSAGE; | |
/* Struct of VLAs moment */ | |
const unsigned int max_child_bit = nextpow2(opts.max_count); | |
if (max_child_bit == 0) return -1; | |
pid_t children[max_child_bit]; | |
struct pollfd children_outs[max_child_bit + 1]; /* +1 for selfpipe */ | |
stralloc children_buffers[max_child_bit]; | |
/* can't x[i] = { ... } in detroit */ | |
memset(children, 0, max_child_bit * sizeof(pid_t)); | |
memset(children_outs, 0, max_child_bit * sizeof(struct pollfd)); | |
memset(children_buffers, 0, max_child_bit * sizeof(stralloc)); | |
for (size_t i = 0; i < max_child_bit; i++) { | |
children_outs[i].fd = -1; | |
children_outs[i].events = POLLIN; | |
} | |
opts.pfds = children_outs; | |
opts.pids = children; | |
opts.buffers = children_buffers; | |
opts.npids = max_child_bit; | |
int sfd = selfpipe_init(); | |
if (sfd == -1) | |
strerr_diefu1sys(111, "create selfpipe"); | |
selfpipe_trap(SIGCHLD); | |
children_outs[max_child_bit].fd = sfd; | |
children_outs[max_child_bit].events = POLLIN; | |
char *spawn_args[argc+1]; | |
ssize_t replace_at = -1; | |
byte_copy(spawn_args, sizeof(char*)*(argc+1), argv); | |
for (size_t i = 0; i < argc; i++) { | |
if (!str_diff("{}", spawn_args[i])) { | |
if (replace_at == -1) { | |
replace_at = i; | |
} else { | |
strerr_dief1x(100, "{} specified twice in arguments"); | |
} | |
} | |
} | |
if (replace_at == -1) | |
strerr_dief1x(100, "no {} in arguments"); | |
do { | |
r = skagetln(&linesrc, &satmp, delim); | |
if (r < 0) { | |
if (errno == EPIPE) { | |
/* got delimiter'd, ig? | |
* TODO WRITE THIS DOCUMENTATION PAGE AAAA */ | |
r = 1; | |
} else { | |
strerr_die1sys(111, "read line"); | |
} | |
} | |
if (r > 0) { | |
/* replace newline with NUL */ | |
satmp.len--; | |
if (!stralloc_0(&satmp)) | |
strerr_diefu1sys(100, "NUL-terminate line"); | |
} else { | |
break; /* EOF */ | |
} | |
if (eof_str && sa_eql(&satmp, eof_str)) { | |
max_lines = 1; | |
break; | |
} | |
spawn_args[replace_at] = satmp.s; | |
int pos = next_slot(&opts); | |
if (!occupy_slot(pos, spawn_args, &opts)) | |
strerr_warnwu2sys("spawn child for argument ", satmp.s); | |
/* next line thyme */ | |
satmp.len = 0; | |
} while (--max_lines); | |
/* wait remaining children */ | |
if (opts.live_count) while (free_slots(&opts) > 0); | |
buffer_flush(buffer_1); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment