Skip to content

Instantly share code, notes, and snippets.

@capezotte
Created November 23, 2022 15:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save capezotte/1fed33feec1d5097322f50768e49f9d0 to your computer and use it in GitHub Desktop.
Save capezotte/1fed33feec1d5097322f50768e49f9d0 to your computer and use it in GitHub Desktop.
poor man's xargs (ft. skalibs)
#define SUBGETOPT_SHORT
#include <unistd.h>
#include <errno.h>
#include <fcntl.h>
#include <string.h>
#include <poll.h>
#include <stdlib.h>
#include <spawn.h>
#include <sys/wait.h>
#include <skalibs/types.h>
#include <skalibs/bytestr.h>
#include <skalibs/sgetopt.h>
#include <skalibs/selfpipe.h>
#include <skalibs/posixplz.h>
#include <skalibs/buffer.h>
#include <skalibs/bufalloc.h>
#include <skalibs/djbunix.h>
#include <skalibs/strerr2.h>
#include <skalibs/genalloc.h>
#include <skalibs/skamisc.h>
/* Round x up to the nearest power of two (bit-smearing trick, as found on
 * Stack Overflow). Values that are already a power of two are returned
 * unchanged. Returns 0 when x is 0 or when the result does not fit, which
 * the caller treats as an error. The shifts cover 32 bits. */
static inline unsigned int nextpow2(unsigned int x) {
    unsigned int v = x - 1u;

    /* Copy the highest set bit into every lower position. */
    for (unsigned int shift = 1; shift <= 16; shift <<= 1)
        v |= v >> shift;

    return v + 1u;
}
/* Compare the contents of a1 with the C string x.
 *
 * a1->len is expected to include the terminating NUL (the caller appends
 * it with stralloc_0), so the comparison covers strlen(x) + 1 bytes.
 *
 * Returns 1 when both hold the same string, 0 otherwise. */
static int sa_eql(stralloc *a1, const char *x) {
    size_t xlen = strlen(x) + 1;
    if (a1->len != xlen) return 0;
    /* byte_diff() is zero when the ranges match; the previous `!!` inverted
     * the result and reported a match only for *different* strings. */
    return byte_diff(a1->s, xlen, x) == 0;
}
/* Chunk size used for both flushing to stdout and reading from children. */
#define NN 4096
/* Move buffered child output from x to buffer_1 (stdout), at most NN bytes
 * per write, removing whatever was written from the front of x.
 *
 * delim == 0: flush everything.
 * delim != 0: flush whole records only. Each write ends right after the
 *             last delimiter inside the current window, and an incomplete
 *             trailing record is left in x for a later call. A full
 *             NN-byte window without any delimiter is flushed as is, so an
 *             overlong record cannot make the buffer grow without bound.
 *
 * Write failures are reported with a warning and end the flush early; the
 * unwritten data stays in x. */
static void sa_stdout_delim(stralloc *x, char delim) {
    while (x->len > 0) {
        size_t len = x->len > NN ? NN : x->len;

        if (delim) {
            /* byte_rchr() yields len when the delimiter is absent. */
            size_t end = byte_rchr(x->s, len, delim);
            if (end != len) {
                /* Include the delimiter itself: leaving it behind let
                 * another child's output be glued onto this line. */
                len = end + 1;
            } else if (len < NN) {
                break; /* partial record: wait for the rest */
            }
        }

        ssize_t written = buffer_put(buffer_1, x->s, len);
        if (written < 0) {
            strerr_warnwu1sys("write child stdout");
            break;
        }
        if (written == 0)
            break;

        /* Shift the unwritten remainder to the front of the buffer. */
        byte_copy(x->s, x->len - written, x->s + written);
        x->len -= written;
    }
}
/* Flush the whole buffer regardless of record boundaries. */
#define sa_stdout(x) sa_stdout_delim(x, 0)
/* Read up to NN bytes from fd and append them to x.
 *
 * fd:   descriptor to read from (a child's stdout pipe).
 * x:    accumulation buffer for that child.
 * lbuf: non-zero when lines from different children may be interleaved;
 *       complete lines are then pushed to stdout early once more than NN
 *       bytes are pending.
 *
 * Returns the number of bytes read (0 at end of file), or a negative value
 * after printing a warning when the buffer cannot grow or the read fails. */
static ssize_t sa_read(int fd, stralloc *x, int lbuf) {
    if (lbuf && x->len > NN) {
        sa_stdout_delim(x, '\n');
    }
    /* Make room first; reading into a buffer that failed to grow would
     * write through an invalid pointer. */
    if (!stralloc_readyplus(x, NN)) {
        strerr_warnwu1sys("grow child output buffer");
        return -1;
    }
    ssize_t got = fd_read(fd, x->s + x->len, NN);
    if (got < 0) {
        strerr_warnwu1sys("put child output into buffer");
        return got;
    }
    x->len += got;
    return got;
}
#undef NN
/* Bookkeeping shared by the job-slot functions. The three arrays are
 * indexed by slot number and are owned by main(). */
struct state {
    /* poll set: one entry per slot plus a trailing entry for the selfpipe.
     * fd == -1: slot unused; fd < -1: negated pipe fd awaiting cleanup. */
    struct pollfd *pfds;
    /* child pid per slot; 0 = free, negative = child already reaped */
    pid_t *pids;
    /* captured stdout per slot */
    stralloc *buffers;
    /* number of slots (a power of two, so it can serve as a bit mask) */
    int npids;
    /* set by -l: flush complete lines before the child has finished */
    int line_buffered;
    /* set by -k: emit outputs in the order the jobs were started */
    int keep_order;
    /* set by -a: children keep stdin instead of getting /dev/null */
    int keep_stdin;
    /* jobs started so far */
    unsigned int spawned_count;
    /* upper bound on simultaneously running jobs (-P / -j) */
    unsigned int max_count;
    /* jobs whose output has been emitted and whose slot was released */
    unsigned int done_count;
    /* jobs currently holding a slot */
    unsigned int live_count;
};
/* Bit flags summarising the state of all slots, built by free_slots(). */
enum {
    SLOT_PID = 0x2, /* some child is still running */
    SLOT_IN  = 0x4, /* some pipe is still being polled for output */
    SLOT_HUP = 0x8, /* some pipe has hung up and awaits cleanup */
};
/* Wait for child activity and release the slots of finished jobs.
 *
 * A slot is released once its child has been reaped AND its pipe has hung
 * up (and, with keep_order, once it is that job's turn). Releasing a slot
 * writes its buffered output to stdout, closes the pipe and marks the slot
 * free.
 *
 * Returns the number of slots released (> 0), 0 when there is nothing to
 * wait for or only slots that cannot be released yet remain, and -1 when
 * reading a child's output or the selfpipe fails. A poll() failure other
 * than EINTR is fatal. */
static int free_slots(struct state *o) {
    for (;;) {
        int wstat, chk = 0, dead_pipes = 0;
        for (int i = 0; i < o->npids; i++) {
            /* kids live */
            if (o->pids[i] > 0) chk |= SLOT_PID;
            /* has pending data */
            if (o->pfds[i].fd > -1) chk |= SLOT_IN;
            /* slot needs cleanup */
            if (o->pfds[i].fd < -1) chk |= SLOT_HUP;
        }
        if (chk == 0)
            return 0; /* what am I waiting for? */
        /* chk == SLOT_HUP means no child is alive and no pipe is open, so
         * neither SIGCHLD nor output can arrive: do not block in poll(). */
        if (chk != SLOT_HUP && poll(o->pfds, o->npids + 1, -1) < 0) {
            if (errno == EINTR)
                continue; /* interrupted by a signal: just retry */
            break;
        }
        /* update outputs */
        for (nfds_t i = 0; i < o->npids; i++) {
            /* Idle slot. When poll() was skipped its revents may still hold
             * bits from an earlier round; acting on them would "clean up"
             * the slot again and close fd 1 via abs(-1). */
            if (o->pfds[i].fd == -1)
                continue;
            int hup = o->pfds[i].fd < -1 || (o->pfds[i].revents & POLLHUP);
            if (o->pfds[i].revents & POLLIN) {
                /* the output eat */
                ssize_t got = sa_read(o->pfds[i].fd, &o->buffers[i], o->line_buffered);
                if (got < 0)
                    return -1;
                if (got > 0)
                    continue;
                /* A zero-byte read is end of file: handle it like a hangup,
                 * otherwise poll() would report POLLIN forever. */
                hup = 1;
            }
            if (!hup)
                continue;
            /* disable for less syscalls */
            if (o->pfds[i].fd > 0)
                o->pfds[i].fd *= -1;
            if (o->pids[i] > 0)
                continue; /* still alive */
            if (o->keep_order && (o->done_count & o->npids - 1) != i)
                continue; /* wrong order */
            /* ded and pollhup - cleanup thyme */
            sa_stdout(&o->buffers[i]);
            o->buffers[i].len = 0;
            fd_close(abs(o->pfds[i].fd));
            o->pfds[i].fd = -1;
            o->pids[i] = 0; /* return slot */
            dead_pipes++;
            o->live_count--;
            o->done_count++;
        }
        if (dead_pipes)
            return dead_pipes;
        if (chk == SLOT_HUP)
            return 0; /* we'll just stall in the selfpipe reading */
        /* self pipe time */
        if (o->pfds[o->npids].revents & POLLIN) {
            int sig = selfpipe_read();
            if (sig == -1) {
                return -1;
            }
            if (sig == SIGCHLD) {
                int i;
                /* wait_pids_nohang() returns the 1-based index of the
                 * reaped pid; negate it so it is no longer "alive". */
                while ((i = wait_pids_nohang(o->pids, o->npids, &wstat)) > 0) {
                    i--;
                    o->pids[i] *= -1;
                    /* delay cleanup until we get pollhup */
                }
            }
        }
    }
    strerr_diefu1sys(111, "wait for children's outputs");
}
/* Spawn argv as a child whose stdout is a fresh pipe and record it in
 * slot sl.
 *
 * Unless keep_stdin is set, the child's stdin is /dev/null. On success the
 * slot's pid and poll entry are filled in and the spawned/live counters
 * are incremented.
 *
 * Returns the child's pid, or 0 on failure with errno describing the
 * error (the caller reports it). */
static pid_t occupy_slot(int sl, char *const *argv, struct state *o) {
    int pps[2];
    int e;
    pid_t child = 0;
    posix_spawn_file_actions_t fa;

    if (pipe(pps)) return 0;
    /* The read end belongs to the parent only: without close-on-exec every
     * child started later would inherit it as well. */
    if (fcntl(pps[0], F_SETFD, FD_CLOEXEC) < 0) goto err1;
    if ((errno = posix_spawn_file_actions_init(&fa))) goto err1;
    if (!o->keep_stdin && (errno = posix_spawn_file_actions_addopen(&fa, 0, "/dev/null", O_RDONLY, 0))) goto err2;
    if ((errno = posix_spawn_file_actions_adddup2(&fa, pps[1], 1))) goto err2;
    if ((errno = posix_spawn_file_actions_addclose(&fa, pps[1]))) goto err2;
    if ((errno = posix_spawnp(&child, argv[0], &fa, NULL, argv, environ))) goto err2;
    posix_spawn_file_actions_destroy(&fa);
    fd_close(pps[1]);
    o->pids[sl] = child;
    o->pfds[sl].fd = pps[0];
    o->pfds[sl].events = POLLIN;
    o->spawned_count++;
    o->live_count++;
    return child;

err2:
    /* Keep the original error code for the caller's message. */
    e = errno;
    posix_spawn_file_actions_destroy(&fa);
    errno = e;
err1:
    e = errno;
    fd_close(pps[0]);
    fd_close(pps[1]);
    errno = e;
    return 0;
}
/* Block until a job slot may be used and return its index.
 *
 * First waits until fewer than max_count jobs are live. With keep_order
 * the slot is fixed by the number of jobs spawned so far (modulo the slot
 * count) and we wait for exactly that slot; otherwise the slots are
 * scanned round-robin, waiting for children after each full pass.
 * free_slots() failures are only reported, then the wait is retried. */
static int next_slot(struct state *o) {
    const int mask = o->npids - 1; /* npids is a power of two */
    int pos;

    while (o->live_count >= o->max_count) {
        if (free_slots(o) < 0)
            strerr_warnwu1sys("ensure right amount of job slots");
    }

    if (o->keep_order) {
        pos = o->spawned_count & mask;
        while (o->pids[pos] != 0) {
            if (free_slots(o) < 0)
                strerr_warnwu1sys("wait for ordered job slot");
        }
        return pos;
    }

    for (pos = 0; o->pids[pos] != 0; ) {
        pos = (pos + 1) & mask;
        /* Wrapped around without finding a free slot: wait for one. */
        if (pos == 0 && free_slots(o) < 0)
            strerr_warnwu1sys("wait for any job slot");
    }
    return pos;
}
/* zorgs: read delimiter-separated records and run the given command once
 * per record, with the single "{}" argument replaced by the record.
 *
 * Options:
 *   -P N, -j N  run at most N commands at once (default 2)
 *   -d C        record delimiter (first byte of C; default newline)
 *   -0          same as -d ''
 *   -L N        stop after N records
 *   -E STR      stop when a record equals STR
 *   -l          let complete output lines through before a job ends
 *   -a FILE     read records from FILE; children keep stdin
 *   -k          emit job outputs in the order the jobs were started
 *
 * Exits 100 on usage errors and 111 on system errors. */
int main(int argc, char **argv) {
    PROG = "zorgs";
    int r;
    struct state opts = { 0 };
    opts.max_count = 2;
    char linebuf[BUFFER_INSIZE];
    buffer linesrc = BUFFER_INIT(fd_readv, 0, linebuf, BUFFER_INSIZE);
    unsigned int max_lines = -1, delim = '\n';
    const char *eof_str = NULL, *USAGE = "-k -d [delimiter] -P [max parallel] -L [max lines] -E [EOF string] prog {}\n-j is an alias for -P; -0 is an alias for -d ''";
#define DIEUSAGE strerr_dieusage(100, USAGE)
    while ((r = lgetopt(argc, (const char* const*)argv, "kP:j:L:d:0E:la:")) != -1) {
        switch (r) {
        case 'P':
        case 'j':
            if (!uint0_scan(optarg, &opts.max_count) || opts.max_count == 0) DIEUSAGE;
            break;
        case '0':
            optarg = "";
            /* fallthrough */
        case 'd':
            delim = *optarg;
            break;
        case 'L':
            if (!uint0_scan(optarg, &max_lines)) DIEUSAGE;
            break;
        case 'E':
            if (strlen(optarg))
                eof_str = optarg;
            else DIEUSAGE;
            break;
        case 'l':
            opts.line_buffered = 1;
            break;
        case 'a':
            opts.keep_stdin = 1;
            linesrc.fd = open_read(optarg);
            if (linesrc.fd < 0)
                strerr_diefu2sys(111, "unable to open", optarg);
            break;
        case 'k':
            opts.keep_order = 1;
            break;
        default:
            DIEUSAGE;
        }
    }
    argc -= optind; argv += optind;
    if (argc == 0) DIEUSAGE;

    /* Struct of VLAs moment: the slot count is rounded up to a power of two
     * so that slot indices can be computed with a bit mask. */
    const unsigned int max_child_bit = nextpow2(opts.max_count);
    if (max_child_bit == 0) return -1;
    pid_t children[max_child_bit];
    struct pollfd children_outs[max_child_bit + 1]; /* +1 for selfpipe */
    stralloc children_buffers[max_child_bit];
    /* VLAs cannot take an initializer, so clear them by hand. */
    memset(children, 0, max_child_bit * sizeof(pid_t));
    memset(children_outs, 0, max_child_bit * sizeof(struct pollfd));
    memset(children_buffers, 0, max_child_bit * sizeof(stralloc));
    for (size_t i = 0; i < max_child_bit; i++) {
        children_outs[i].fd = -1;
        children_outs[i].events = POLLIN;
    }
    opts.pfds = children_outs;
    opts.pids = children;
    opts.buffers = children_buffers;
    opts.npids = max_child_bit;

    int sfd = selfpipe_init();
    if (sfd == -1)
        strerr_diefu1sys(111, "create selfpipe");
    selfpipe_trap(SIGCHLD);
    children_outs[max_child_bit].fd = sfd;
    children_outs[max_child_bit].events = POLLIN;

    /* Private copy of the command line (including the terminating NULL) in
     * which the "{}" entry is swapped for each record. */
    char *spawn_args[argc+1];
    ssize_t replace_at = -1;
    byte_copy(spawn_args, sizeof(char*)*(argc+1), argv);
    for (int i = 0; i < argc; i++) {
        if (!str_diff("{}", spawn_args[i])) {
            if (replace_at == -1) {
                replace_at = i;
            } else {
                strerr_dief1x(100, "{} specified twice in arguments");
            }
        }
    }
    if (replace_at == -1)
        strerr_dief1x(100, "no {} in arguments");

    do {
        int has_delim = 1;
        r = skagetln(&linesrc, &satmp, delim);
        if (r < 0) {
            if (errno != EPIPE)
                strerr_die1sys(111, "read line");
            /* EPIPE: end of input reached inside a record, i.e. the last
             * record has no trailing delimiter. It is still a record, but
             * there is no delimiter byte to strip from it. */
            has_delim = 0;
            r = 1;
        }
        if (r <= 0)
            break; /* EOF */
        /* replace the delimiter with NUL */
        if (has_delim)
            satmp.len--;
        if (!stralloc_0(&satmp))
            strerr_diefu1sys(100, "NUL-terminate line");
        if (eof_str && sa_eql(&satmp, eof_str))
            break;
        spawn_args[replace_at] = satmp.s;
        int pos = next_slot(&opts);
        if (!occupy_slot(pos, spawn_args, &opts))
            strerr_warnwu2sys("spawn child for argument ", satmp.s);
        /* next line thyme */
        satmp.len = 0;
    } while (--max_lines);

    /* wait remaining children */
    if (opts.live_count) while (free_slots(&opts) > 0);
    /* Do not report success when buffered output could not be written. */
    if (!buffer_flush(buffer_1))
        strerr_diefu1sys(111, "write to stdout");
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment