Created
March 31, 2021 09:46
-
-
Save xd009642/fb8400d0b7d886e82efbeeec71b885ea to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use bytes::{Buf, Bytes, BytesMut}; | |
use opentelemetry::sdk; | |
use opentelemetry::sdk::propagation::TraceContextPropagator; | |
use opentelemetry::sdk::trace::{Builder, Sampler}; | |
use opentelemetry::trace::TracerProvider; | |
use opentelemetry::{global::set_text_map_propagator, KeyValue}; | |
use std::convert::TryFrom; | |
use std::env::var; | |
use std::net::SocketAddr; | |
use std::process::Stdio; | |
use tokio::io::{self, AsyncReadExt, AsyncWriteExt}; | |
use tokio::process::Command; | |
use tokio::sync::mpsc::*; | |
use tokio_stream::wrappers::ReceiverStream; | |
use tokio_stream::{Stream, StreamExt}; | |
use tracing::{debug, error, info, info_span, instrument, warn, Instrument}; | |
use tracing_subscriber::EnvFilter; | |
use tracing_subscriber::{Layer, Registry}; | |
use warp::Filter; | |
use warp::{http::StatusCode, reply, Rejection, Reply}; | |
mod probe; | |
mod types; | |
#[tokio::main] | |
async fn main() -> Result<(), Box<dyn std::error::Error>> { | |
let hnd = setup_logging()?; | |
let request = warp::path("assess").and(warp::post()); | |
let upload = warp::body::stream() | |
.and_then(move |stream| process_body(stream).instrument(info_span!("audio_upload"))); | |
let route = request.and(upload); | |
let addr: SocketAddr = "0.0.0.0:8080".parse().unwrap(); | |
warp::serve(route).run(addr).await; | |
Ok(()) | |
} | |
async fn process_body( | |
mut stream: impl Stream<Item = Result<impl Buf + Send, impl std::error::Error>> + Unpin + Send, | |
) -> Result<impl warp::Reply, std::convert::Infallible> { | |
let args = &[ | |
"-i", | |
"pipe:0", | |
"-hide_banner", | |
"-v", | |
"error", | |
"-vn", | |
"-ar", | |
"8000", | |
"-acodec", | |
"pcm_s16le", | |
"-f", | |
"s16le", | |
"-af", | |
"pan=1|c0=c0", | |
"pipe:1", | |
] | |
.iter() | |
.map(|x| x.to_string()) | |
.collect::<Vec<String>>(); | |
let (audio_in, audio_out) = channel(256); | |
let (samples_in, samples_out) = channel::<Bytes>(256); | |
let audio_out = ReceiverStream::new(audio_out); | |
let mut samples_out = ReceiverStream::new(samples_out); | |
let write_fut = async move { | |
while let Some(Ok(mut data)) = stream.next().await { | |
let buf = data.copy_to_bytes(data.remaining()); | |
audio_in.send(buf.clone()).await; | |
} | |
std::mem::drop(audio_in); | |
let res: Result<(), std::io::Error> = Ok(()); | |
res | |
} | |
.instrument(info_span!("Receiving audio data")); | |
let sample_counter = async move { | |
let mut read: usize = 0; | |
while let Some(s) = samples_out.next().await { | |
read += s.len(); | |
} | |
read | |
}; | |
let samples_fut = get_samples_pipe(&args, audio_out, samples_in); | |
let (write, response, read) = tokio::join!(write_fut, samples_fut, sample_counter); | |
match response { | |
Ok(resp) => { | |
let resp = format!("Got {} bytes", read); | |
Ok(reply::with_status(resp, StatusCode::OK)) | |
} | |
Err(e) => Ok(reply::with_status( | |
e.to_string(), | |
StatusCode::INTERNAL_SERVER_ERROR, | |
)), | |
} | |
} | |
#[instrument(skip(args, read, output))] | |
async fn get_samples_pipe<S, B>( | |
args: &[String], | |
mut read: S, | |
output: Sender<Bytes>, | |
) -> std::io::Result<()> | |
where | |
B: Buf, | |
S: Stream<Item = B> + Unpin, | |
{ | |
debug!("Launching ffmpeg with args {:?}", args); | |
let mut child = Command::new("ffmpeg") | |
.args(args) | |
.stdin(Stdio::piped()) | |
.stdout(Stdio::piped()) | |
.spawn()?; | |
let mut stdin = child.stdin.take(); | |
let write_fut = async move { | |
let in_handle = stdin.as_mut().ok_or_else(|| { | |
error!("Failed to get stdin for ffmpeg"); | |
io::Error::new(io::ErrorKind::InvalidData, "Unable to parse audio") | |
})?; | |
while let Some(s) = read.next().await { | |
debug!("Starting to write {} bytes", s.remaining()); | |
in_handle.write_all(s.chunk()).await?; | |
} | |
std::mem::drop(stdin); | |
let res: io::Result<()> = Ok(()); | |
res | |
} | |
.in_current_span(); | |
let mut stdout = child.stdout.take(); | |
let read_fut = async move { | |
let out_handle = stdout.as_mut().ok_or_else(|| { | |
error!("Failed to get stdout for ffmpeg"); | |
io::Error::new(io::ErrorKind::InvalidData, "Unable to parse audio") | |
})?; | |
let mut has_data = true; | |
const N_CHUNKS: usize = 256; | |
while has_data { | |
let mut bytes = BytesMut::with_capacity(64 * N_CHUNKS); | |
// ffmpeg writes out in chunks of 64 bytes. This is small and results in a lot of | |
// sends so lets just try and read say 256 chunks and send them | |
for _ in 0..N_CHUNKS { | |
let t = out_handle.read_buf(&mut bytes).await?; | |
if t == 0 { | |
debug!("Breaking read future"); | |
has_data = false; | |
break; | |
} | |
debug!("Read {} bytes from stream", bytes.len()); | |
} | |
if !bytes.is_empty() { | |
let bytes = bytes.freeze(); | |
output.send(bytes).await.map_err(|_| { | |
io::Error::new(io::ErrorKind::BrokenPipe, "Output channel broken") | |
})?; | |
} | |
} | |
Ok::<(), io::Error>(()) | |
} | |
.in_current_span(); | |
info!("Extracting audio"); | |
let (write, output) = tokio::join!(write_fut, read_fut); | |
info!("Audio extraction completed"); | |
write.map_err(|e| { | |
error!("Write error={}", e); | |
io::Error::new(io::ErrorKind::InvalidData, "Failed to stream file") | |
})?; | |
output.map_err(|e| { | |
warn!("read error {}", e); | |
io::Error::new( | |
io::ErrorKind::ConnectionAborted, | |
format!("Failed to extract audio data {:?}", e), | |
) | |
})?; | |
let _ = child.wait_with_output().await; | |
Ok(()) | |
} | |
fn setup_logging() -> Result<impl TracerProvider, Box<dyn std::error::Error>> { | |
let service_name = var("TRACER_SERVICE_NAME").unwrap_or_else(|_| "ffmpeg_loader".to_string()); | |
let builder = opentelemetry_jaeger::new_pipeline() | |
.with_tags(vec![KeyValue::new("version", "0.1.0")]) | |
.with_service_name(service_name); | |
let builder = match var("TRACER_ENDPOINT") { | |
Ok(s) => builder.with_agent_endpoint(s), | |
_ => builder, | |
}; | |
let exporter = builder.init_exporter()?; | |
set_text_map_propagator(TraceContextPropagator::new()); | |
let provider = Builder::default() | |
.with_simple_exporter(exporter) | |
.with_config(sdk::trace::Config { | |
default_sampler: Box::new(Sampler::AlwaysOn), | |
..Default::default() | |
}) | |
.build(); | |
let tracer = provider.get_tracer("p_project", None); | |
let opentelemetry = tracing_opentelemetry::layer().with_tracer(tracer); | |
let filter = match var("RUST_LOG") { | |
Ok(_) => EnvFilter::from_default_env(), | |
_ => EnvFilter::new("ffmpeg_loader=info"), | |
}; | |
let fmt = tracing_subscriber::fmt::Layer::default(); | |
let subscriber = filter | |
.and_then(fmt) | |
.and_then(opentelemetry) | |
.with_subscriber(Registry::default()); | |
tracing::subscriber::set_global_default(subscriber)?; | |
info!("Opentelemetry enabled"); | |
Ok(provider) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment