Skip to content

Instantly share code, notes, and snippets.

@rksm
Created March 9, 2024 12:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rksm/e6739610d9aba825ba395f8fe2fb5913 to your computer and use it in GitHub Desktop.
Save rksm/e6739610d9aba825ba395f8fe2fb5913 to your computer and use it in GitHub Desktop.
streaming rust command with tokio
async fn run_speechpipeline_command(
job: Job,
) -> Result<(
mpsc::Receiver<Either<String, Result<Response>>>,
tokio::sync::oneshot::Sender<()>,
)> {
let payload = serde_json::to_string(&job)?;
let payload = shellwords::escape(&payload);
let cmd = format!(
r#"
source /media/robert/LINUX_DATA/python-venv/podwriter-speechpipeline/bin/activate;
python /home/robert/projects/biz/podwriter/transcription-pipeline/main.py {payload}
"#
);
println!("cmd: {}", cmd);
let mut proc = Command::new("bash")
.arg("-c")
.arg(cmd)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()?;
let stdout = proc.stdout.take().expect("stdout");
let stderr = proc.stderr.take().expect("stderr");
let (tx, rx) = mpsc::channel(256);
let (exit_tx, mut exit_rx) = tokio::sync::oneshot::channel();
tokio::spawn(async move {
let mut stdout = tokio::io::BufReader::new(stdout).lines();
let mut stderr = tokio::io::BufReader::new(stderr).lines();
loop {
let (msg, cont) = tokio::select! {
status = proc.wait() => {
let err = eyre!("process exited unexpectedly with status: {status:?}");
error!("{err}");
(Some(either::Either::Right(Err(err))), false)
}
exit = &mut exit_rx => {
match exit {
Ok(()) => {
info!("received signal to kill process");
}
Err(_) => {
warn!("process/exit channel closed before process completed, killing process");
}
}
if let Err(err) = proc.kill().await {
error!("failed to kill process: {err}");
}
(None, false)
}
line = stdout.next_line() => {
if let Ok(Some(line)) = line {
if let Ok(output) = serde_json::from_str(&line) {
(Some(either::Either::Right(Ok(output))), true)
} else {
(Some(either::Either::Left(line)), true)
}
} else {
warn!("stdout closed");
(None, false)
}
}
line = stderr.next_line() => {
if let Ok(Some(line)) = line {
(Some(either::Either::Left(line)), true)
} else {
warn!("stderr closed");
(None, false)
}
}
};
if let Some(msg) = msg {
if (tx.send(msg).await).is_err() {
warn!("process output channel closed, exiting");
break;
}
}
if !cont {
break;
}
}
debug!("stop reading from process");
});
Ok((rx, exit_tx))
}
#[instrument(level = "debug", skip(input), fields(id = id.to_string(), task = input.name()))]
async fn spechpipeline_command_stream(
id: impl ToString,
input: Task,
) -> Result<impl Stream<Item = Either<SpeechpipelineUpdate, Response>>> {
let id = id.to_string();
let task = input.name();
info!(%id, %task, "starting task");
let (mut rx, exit_tx) = run_speechpipeline_command(Job {
id: id.clone(),
input,
})
.await?;
let mut record = false;
let mut recorded_output = String::new();
let stream = async_stream::stream! {
let _exit_tx = exit_tx; // keep the sender alive, when the stream is cancelled, the process will be killed
while let Some(item) = rx.recv().await {
match item {
either::Either::Left(line) if record || line.to_lowercase().contains("error") => {
record = true;
recorded_output.push_str(&line);
recorded_output.push('\n');
debug!("{line}");
}
either::Either::Left(line) => {
debug!("{line}");
}
either::Either::Right(Ok(response)) => {
trace!("got response");
yield Either::Right(response);
return;
}
either::Either::Right(Err(err)) => {
yield Either::Left(SpeechpipelineUpdate::Failed(err.into()));
}
}
}
warn!("unexpected end of stream: {recorded_output}");
yield Either::Left(SpeechpipelineUpdate::Failed(eyre!("unexpected end of stream: {recorded_output}").into()));
};
Ok(stream)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment