Skip to content

Instantly share code, notes, and snippets.

@nickwesselman
Created April 10, 2021 21:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nickwesselman/3176835929ee4fc5c80bafab49cf507c to your computer and use it in GitHub Desktop.
Save nickwesselman/3176835929ee4fc5c80bafab49cf507c to your computer and use it in GitHub Desktop.
Read from a multiplexed stream provided by Docker.DotNet for container logs. In parallel, read from the multiplexed streams using a StreamReader and write to the gRPC response. Currently this isn't working -- the gRPC call never returns.
public override async Task StreamContainerLogs(StreamContainerLogsRequest request, IServerStreamWriter<StreamContainerLogsResponse> responseStream, ServerCallContext context)
{
const int bufferSize = 81920;
var streamTask = DockerClient.Containers.GetContainerLogsAsync(request.Id, false, new ContainerLogsParameters
{
Follow = true,
Tail = "500",
ShowStdout = true,
ShowStderr = true
}, context.CancellationToken);
var buffer = ArrayPool<byte>.Shared.Rent(bufferSize);
try
{
using (var stream = await streamTask)
using (MemoryStream outStream = new MemoryStream(), errStream = new MemoryStream())
using (StreamReader outReader = new StreamReader(outStream), errReader = new StreamReader(errStream))
{
var inTask = Task.Run(async () =>
{
while (!context.CancellationToken.IsCancellationRequested)
{
var result = await stream.ReadOutputAsync(buffer, 0, buffer.Length, context.CancellationToken).ConfigureAwait(false);
if (result.EOF)
{
return;
}
Stream target;
switch (result.Target)
{
case MultiplexedStream.TargetStream.StandardOut:
target = outStream;
break;
case MultiplexedStream.TargetStream.StandardError:
target = errStream;
break;
default:
throw new InvalidOperationException($"Unexpected TargetStream: {result.Target}");
}
await target.WriteAsync(buffer, 0, result.Count, context.CancellationToken).ConfigureAwait(false);
}
});
Func<StreamReader, Task> readTheReader = async (StreamReader reader) =>
{
while (!context.CancellationToken.IsCancellationRequested)
{
var line = await reader.ReadLineAsync();
if (line == null)
{
return;
}
await responseStream.WriteAsync(new StreamContainerLogsResponse
{
Log = line
});
}
};
var outTask = Task.Run(() => readTheReader(outReader));
var errTask = Task.Run(() => readTheReader(errReader));
Task.WaitAll(new[] { inTask, outTask, errTask });
}
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment