Skip to content

Instantly share code, notes, and snippets.

@kenoma
Last active May 28, 2020 08:30
Show Gist options
  • Save kenoma/d854bbf121fe1bf377a4da9f58a4cde1 to your computer and use it in GitHub Desktop.
Save kenoma/d854bbf121fe1bf377a4da9f58a4cde1 to your computer and use it in GitHub Desktop.
MJPEG stream capture with .Net Core 3.0 System.IO.Pipelines features
using System;
using System.Buffers;
using System.IO;
using System.IO.Pipelines;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using CvLab.Framework.IO.Camera;
using CvLab.Framework.Standard.Extensions;
using Prometheus;
using Serilog;
namespace AnNamespace.VideoCapture
{
public class NetCoreMJPEGCapture
{
private static readonly Counter _framesProcessed =
Metrics.CreateCounter("railocr_frames_captured_frames", "Количество кадров, захваченных камерой");
private static readonly Counter _sourceErrors =
Metrics.CreateCounter("railocr_handler_videosource_errors", "Количество ошибок при работе с потоком камер");
private static readonly Histogram _frameProcessingHistogramm =
Metrics.CreateHistogram("railocr_handler_frame_capturing_seconds", "Время первичной обработки кадра (захват, распознавание метки времени)",
new HistogramConfiguration { Buckets = new[] { 1e-4, 1e-3, 1e-2, 1e-1, 1 } });
private static readonly Summary _averageFrameSize = Metrics
.CreateSummary("railocr_frame_size_bytes", "Средний размер кадра в байтах");
private const string _httpUserAgent = "Mozilla/5.0";
private const int _requestTimeout = 60000;
private readonly ILogger _logger;
private readonly TimestampExtractor _ocrCapture;
private readonly CameraInfo _camera;
private static readonly byte[] _jpegStart = new byte[] { 0xFF, 0xD8 };
private static readonly byte[] _jpegEnd = new byte[] { 0xFF, 0xD9 };
/// <summary>
/// Initializes a new instance of the <see cref="CameraCapture"/> class.
/// </summary>
/// <param name="logger"></param>
/// <param name="ocrCapture"></param>
/// <param name="cameraInfo"></param>
public NetCoreMJPEGCapture(CameraInfo cameraInfo, ILogger logger, TimestampExtractor ocrCapture)
{
_camera = cameraInfo ?? throw new ArgumentNullException(nameof(cameraInfo));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_ocrCapture = ocrCapture ?? throw new ArgumentNullException(nameof(ocrCapture));
if (string.IsNullOrWhiteSpace(_camera.Url))
throw new ArgumentException("Video source is not specified.");
}
public Task Start(CancellationToken token)
{
return WorkerThread(token);
}
private async Task WorkerThread(CancellationToken token)
{
while (!token.IsCancellationRequested)
{
await EstablshSessionWithCamera(token);
_logger.Warning("Stream with camera {@Camera} lost.", _camera);
_sourceErrors.Inc();
}
_logger.Warning("Capturing thread for camera {@Camera} finished", _camera);
}
private async Task EstablshSessionWithCamera(CancellationToken token)
{
try
{
_logger.Information("Requesting stream {Source} ...", _camera.Url);
var request = (HttpWebRequest)WebRequest.Create(_camera.Url);
request.UserAgent = _httpUserAgent;
request.Timeout = _requestTimeout;
request.ConnectionGroupName = Guid.NewGuid().ToString();
if (_camera.TryGetNetworkCredential(out NetworkCredential credential))
{
request.Credentials = credential;
var authInfo = $"{credential.UserName}:{credential.Password}";
authInfo = Convert.ToBase64String(Encoding.Default.GetBytes(authInfo));
request.Headers["Authorization"] = $"Basic {authInfo}";
}
using (var response = await request.GetResponseAsync())
using (var stream = response.GetResponseStream())
{
await ReadPipeAsync(stream, token);
}
}
catch (Exception ex)
{
_logger.Error(ex, "CameraCapture failed to get frame");
_sourceErrors.Inc();
}
}
private async Task ReadPipeAsync(Stream stream, CancellationToken cancellationToken = default(CancellationToken))
{
var pipeReader = PipeReader.Create(stream, new StreamPipeReaderOptions(bufferSize: 32));
while (!cancellationToken.IsCancellationRequested)
{
var result = await pipeReader.ReadAsync(cancellationToken);
var buffer = result.Buffer;
var position = ReadFrames(buffer);
if (result.IsCompleted)
break;
pipeReader.AdvanceTo(position, buffer.End);
}
pipeReader.Complete();
}
private SequencePosition ReadFrames(in ReadOnlySequence<byte> sequence)
{
var reader = new SequenceReader<byte>(sequence);
var delimStart = new ReadOnlySpan<byte>(_jpegStart);
var delimEnd = new ReadOnlySpan<byte>(_jpegEnd);
while (!reader.End)
{
if (reader.TryReadTo(out var _, delimStart, advancePastDelimiter: false) &&
reader.TryReadTo(out var itemBytes, delimEnd, advancePastDelimiter: true))
{
ProcessFrame(itemBytes);
}
else
{
break;
}
}
return reader.Position;
}
private void ProcessFrame(in ReadOnlySequence<byte> readOnlySequence)
{
using (_frameProcessingHistogramm.NewTimer())
{
var data = readOnlySequence.ToArray();
_ocrCapture.ProcessImage(data);
_framesProcessed.Inc();
_averageFrameSize.Observe(data.Length);
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment