Skip to content

Instantly share code, notes, and snippets.

@xd009642
Created March 31, 2021 09:46
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save xd009642/fb8400d0b7d886e82efbeeec71b885ea to your computer and use it in GitHub Desktop.
Save xd009642/fb8400d0b7d886e82efbeeec71b885ea to your computer and use it in GitHub Desktop.
use bytes::{Buf, Bytes, BytesMut};
use opentelemetry::sdk;
use opentelemetry::sdk::propagation::TraceContextPropagator;
use opentelemetry::sdk::trace::{Builder, Sampler};
use opentelemetry::trace::TracerProvider;
use opentelemetry::{global::set_text_map_propagator, KeyValue};
use std::convert::TryFrom;
use std::env::var;
use std::net::SocketAddr;
use std::process::Stdio;
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::process::Command;
use tokio::sync::mpsc::*;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::{Stream, StreamExt};
use tracing::{debug, error, info, info_span, instrument, warn, Instrument};
use tracing_subscriber::EnvFilter;
use tracing_subscriber::{Layer, Registry};
use warp::Filter;
use warp::{http::StatusCode, reply, Rejection, Reply};
mod probe;
mod types;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let hnd = setup_logging()?;
let request = warp::path("assess").and(warp::post());
let upload = warp::body::stream()
.and_then(move |stream| process_body(stream).instrument(info_span!("audio_upload")));
let route = request.and(upload);
let addr: SocketAddr = "0.0.0.0:8080".parse().unwrap();
warp::serve(route).run(addr).await;
Ok(())
}
async fn process_body(
mut stream: impl Stream<Item = Result<impl Buf + Send, impl std::error::Error>> + Unpin + Send,
) -> Result<impl warp::Reply, std::convert::Infallible> {
let args = &[
"-i",
"pipe:0",
"-hide_banner",
"-v",
"error",
"-vn",
"-ar",
"8000",
"-acodec",
"pcm_s16le",
"-f",
"s16le",
"-af",
"pan=1|c0=c0",
"pipe:1",
]
.iter()
.map(|x| x.to_string())
.collect::<Vec<String>>();
let (audio_in, audio_out) = channel(256);
let (samples_in, samples_out) = channel::<Bytes>(256);
let audio_out = ReceiverStream::new(audio_out);
let mut samples_out = ReceiverStream::new(samples_out);
let write_fut = async move {
while let Some(Ok(mut data)) = stream.next().await {
let buf = data.copy_to_bytes(data.remaining());
audio_in.send(buf.clone()).await;
}
std::mem::drop(audio_in);
let res: Result<(), std::io::Error> = Ok(());
res
}
.instrument(info_span!("Receiving audio data"));
let sample_counter = async move {
let mut read: usize = 0;
while let Some(s) = samples_out.next().await {
read += s.len();
}
read
};
let samples_fut = get_samples_pipe(&args, audio_out, samples_in);
let (write, response, read) = tokio::join!(write_fut, samples_fut, sample_counter);
match response {
Ok(resp) => {
let resp = format!("Got {} bytes", read);
Ok(reply::with_status(resp, StatusCode::OK))
}
Err(e) => Ok(reply::with_status(
e.to_string(),
StatusCode::INTERNAL_SERVER_ERROR,
)),
}
}
#[instrument(skip(args, read, output))]
async fn get_samples_pipe<S, B>(
args: &[String],
mut read: S,
output: Sender<Bytes>,
) -> std::io::Result<()>
where
B: Buf,
S: Stream<Item = B> + Unpin,
{
debug!("Launching ffmpeg with args {:?}", args);
let mut child = Command::new("ffmpeg")
.args(args)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.spawn()?;
let mut stdin = child.stdin.take();
let write_fut = async move {
let in_handle = stdin.as_mut().ok_or_else(|| {
error!("Failed to get stdin for ffmpeg");
io::Error::new(io::ErrorKind::InvalidData, "Unable to parse audio")
})?;
while let Some(s) = read.next().await {
debug!("Starting to write {} bytes", s.remaining());
in_handle.write_all(s.chunk()).await?;
}
std::mem::drop(stdin);
let res: io::Result<()> = Ok(());
res
}
.in_current_span();
let mut stdout = child.stdout.take();
let read_fut = async move {
let out_handle = stdout.as_mut().ok_or_else(|| {
error!("Failed to get stdout for ffmpeg");
io::Error::new(io::ErrorKind::InvalidData, "Unable to parse audio")
})?;
let mut has_data = true;
const N_CHUNKS: usize = 256;
while has_data {
let mut bytes = BytesMut::with_capacity(64 * N_CHUNKS);
// ffmpeg writes out in chunks of 64 bytes. This is small and results in a lot of
// sends so lets just try and read say 256 chunks and send them
for _ in 0..N_CHUNKS {
let t = out_handle.read_buf(&mut bytes).await?;
if t == 0 {
debug!("Breaking read future");
has_data = false;
break;
}
debug!("Read {} bytes from stream", bytes.len());
}
if !bytes.is_empty() {
let bytes = bytes.freeze();
output.send(bytes).await.map_err(|_| {
io::Error::new(io::ErrorKind::BrokenPipe, "Output channel broken")
})?;
}
}
Ok::<(), io::Error>(())
}
.in_current_span();
info!("Extracting audio");
let (write, output) = tokio::join!(write_fut, read_fut);
info!("Audio extraction completed");
write.map_err(|e| {
error!("Write error={}", e);
io::Error::new(io::ErrorKind::InvalidData, "Failed to stream file")
})?;
output.map_err(|e| {
warn!("read error {}", e);
io::Error::new(
io::ErrorKind::ConnectionAborted,
format!("Failed to extract audio data {:?}", e),
)
})?;
let _ = child.wait_with_output().await;
Ok(())
}
fn setup_logging() -> Result<impl TracerProvider, Box<dyn std::error::Error>> {
let service_name = var("TRACER_SERVICE_NAME").unwrap_or_else(|_| "ffmpeg_loader".to_string());
let builder = opentelemetry_jaeger::new_pipeline()
.with_tags(vec![KeyValue::new("version", "0.1.0")])
.with_service_name(service_name);
let builder = match var("TRACER_ENDPOINT") {
Ok(s) => builder.with_agent_endpoint(s),
_ => builder,
};
let exporter = builder.init_exporter()?;
set_text_map_propagator(TraceContextPropagator::new());
let provider = Builder::default()
.with_simple_exporter(exporter)
.with_config(sdk::trace::Config {
default_sampler: Box::new(Sampler::AlwaysOn),
..Default::default()
})
.build();
let tracer = provider.get_tracer("p_project", None);
let opentelemetry = tracing_opentelemetry::layer().with_tracer(tracer);
let filter = match var("RUST_LOG") {
Ok(_) => EnvFilter::from_default_env(),
_ => EnvFilter::new("ffmpeg_loader=info"),
};
let fmt = tracing_subscriber::fmt::Layer::default();
let subscriber = filter
.and_then(fmt)
.and_then(opentelemetry)
.with_subscriber(Registry::default());
tracing::subscriber::set_global_default(subscriber)?;
info!("Opentelemetry enabled");
Ok(provider)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment