Last active
May 28, 2020 08:30
-
-
Save kenoma/d854bbf121fe1bf377a4da9f58a4cde1 to your computer and use it in GitHub Desktop.
MJPEG stream capture with .NET Core 3.0 System.IO.Pipelines features
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Buffers; | |
using System.IO; | |
using System.IO.Pipelines; | |
using System.Net; | |
using System.Text; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using CvLab.Framework.IO.Camera; | |
using CvLab.Framework.Standard.Extensions; | |
using Prometheus; | |
using Serilog; | |
namespace AnNamespace.VideoCapture | |
{ | |
/// <summary>
/// Captures an MJPEG stream over HTTP, splits it into individual JPEG frames with
/// System.IO.Pipelines and hands every frame to a <see cref="TimestampExtractor"/>.
/// The capture loop reconnects whenever the stream ends or fails, until cancelled.
/// </summary>
public class NetCoreMJPEGCapture
{
    // Counts frames handed to the timestamp extractor.
    private static readonly Counter _framesProcessed =
        Metrics.CreateCounter("railocr_frames_captured_frames", "Количество кадров, захваченных камерой");

    // Counts lost or failed camera sessions (one increment per session).
    private static readonly Counter _sourceErrors =
        Metrics.CreateCounter("railocr_handler_videosource_errors", "Количество ошибок при работе с потоком камер");

    // Time spent in ProcessFrame (copying the frame and extracting the timestamp).
    private static readonly Histogram _frameProcessingHistogramm =
        Metrics.CreateHistogram("railocr_handler_frame_capturing_seconds", "Время первичной обработки кадра (захват, распознавание метки времени)",
            new HistogramConfiguration { Buckets = new[] { 1e-4, 1e-3, 1e-2, 1e-1, 1 } });

    // Observed frame sizes in bytes.
    private static readonly Summary _averageFrameSize = Metrics
        .CreateSummary("railocr_frame_size_bytes", "Средний размер кадра в байтах");

    private const string _httpUserAgent = "Mozilla/5.0";
    private const int _requestTimeout = 60000;

    // Minimum size of the buffers rented by the pipe reader. JPEG frames are usually
    // tens of kilobytes, so tiny buffers only fragment the sequence and add overhead.
    private const int _pipeBufferSize = 32 * 1024;

    // Pause between reconnection attempts so an unreachable camera does not cause a hot loop.
    private static readonly TimeSpan _reconnectDelay = TimeSpan.FromSeconds(1);

    private readonly ILogger _logger;
    private readonly TimestampExtractor _ocrCapture;
    private readonly CameraInfo _camera;

    // JPEG start-of-image (SOI) and end-of-image (EOI) markers.
    private static readonly byte[] _jpegStart = new byte[] { 0xFF, 0xD8 };
    private static readonly byte[] _jpegEnd = new byte[] { 0xFF, 0xD9 };

    /// <summary>
    /// Initializes a new instance of the <see cref="NetCoreMJPEGCapture"/> class.
    /// </summary>
    /// <param name="cameraInfo">Camera description; its <c>Url</c> must be non-empty.</param>
    /// <param name="logger">Logger used for connection and error messages.</param>
    /// <param name="ocrCapture">Consumer that receives every captured JPEG frame.</param>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    /// <exception cref="ArgumentException">The camera URL is null, empty or whitespace.</exception>
    public NetCoreMJPEGCapture(CameraInfo cameraInfo, ILogger logger, TimestampExtractor ocrCapture)
    {
        _camera = cameraInfo ?? throw new ArgumentNullException(nameof(cameraInfo));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _ocrCapture = ocrCapture ?? throw new ArgumentNullException(nameof(ocrCapture));

        if (string.IsNullOrWhiteSpace(_camera.Url))
        {
            throw new ArgumentException("Video source is not specified.", nameof(cameraInfo));
        }
    }

    /// <summary>
    /// Starts capturing; the returned task completes after <paramref name="token"/> is cancelled.
    /// </summary>
    /// <param name="token">Token that stops the capture loop.</param>
    public Task Start(CancellationToken token)
    {
        return WorkerThread(token);
    }

    /// <summary>
    /// Reconnect loop: runs one camera session after another until cancellation is requested.
    /// </summary>
    private async Task WorkerThread(CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            await EstablishSessionWithCamera(token);

            // A session that ended because of cancellation is not a lost stream.
            if (token.IsCancellationRequested)
            {
                break;
            }

            _logger.Warning("Stream with camera {@Camera} lost.", _camera);
            _sourceErrors.Inc();

            try
            {
                await Task.Delay(_reconnectDelay, token);
            }
            catch (OperationCanceledException)
            {
                break;
            }
        }

        _logger.Warning("Capturing thread for camera {@Camera} finished", _camera);
    }

    /// <summary>
    /// Opens the HTTP stream of the camera and reads frames from it until the stream
    /// ends, fails or <paramref name="token"/> is cancelled. Never throws: failures are logged.
    /// </summary>
    private async Task EstablishSessionWithCamera(CancellationToken token)
    {
        try
        {
            _logger.Information("Requesting stream {Source} ...", _camera.Url);

            var request = (HttpWebRequest)WebRequest.Create(_camera.Url);
            request.UserAgent = _httpUserAgent;
            request.Timeout = _requestTimeout;
            request.ConnectionGroupName = Guid.NewGuid().ToString();

            if (_camera.TryGetNetworkCredential(out NetworkCredential credential))
            {
                request.Credentials = credential;

                // Send Basic credentials up front instead of waiting for a 401 challenge.
                var authInfo = $"{credential.UserName}:{credential.Password}";
                authInfo = Convert.ToBase64String(Encoding.Default.GetBytes(authInfo));
                request.Headers["Authorization"] = $"Basic {authInfo}";
            }

            // GetResponseAsync does not accept a token, so abort the request on cancellation.
            using (token.Register(() => request.Abort()))
            using (var response = await request.GetResponseAsync())
            using (var stream = response.GetResponseStream())
            {
                await ReadPipeAsync(stream, token);
            }
        }
        catch (Exception) when (token.IsCancellationRequested)
        {
            // Expected during shutdown (aborted request or cancelled read); not an error.
            _logger.Information("Capturing of stream {Source} cancelled.", _camera.Url);
        }
        catch (Exception ex)
        {
            // The caller counts the lost session in _sourceErrors, so it is not counted here too.
            _logger.Error(ex, "CameraCapture failed to get frame");
        }
    }

    /// <summary>
    /// Pumps <paramref name="stream"/> through a <see cref="PipeReader"/> and extracts JPEG
    /// frames until the stream completes or cancellation is requested.
    /// </summary>
    /// <param name="stream">Response stream carrying the MJPEG data.</param>
    /// <param name="cancellationToken">Token that interrupts pending reads.</param>
    private async Task ReadPipeAsync(Stream stream, CancellationToken cancellationToken = default(CancellationToken))
    {
        var pipeReader = PipeReader.Create(stream, new StreamPipeReaderOptions(bufferSize: _pipeBufferSize));
        try
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                var result = await pipeReader.ReadAsync(cancellationToken);
                var buffer = result.Buffer;

                // Frames already complete in the buffer are processed even on the final read.
                var consumed = ReadFrames(buffer);

                if (result.IsCompleted)
                {
                    break;
                }

                // Everything was examined; only the bytes before 'consumed' may be released.
                pipeReader.AdvanceTo(consumed, buffer.End);
            }
        }
        finally
        {
            // Always return rented buffers, including when ReadAsync or frame processing throws.
            pipeReader.Complete();
        }
    }

    /// <summary>
    /// Extracts every complete JPEG frame (SOI through EOI, both markers included) from
    /// <paramref name="sequence"/> and passes it to <see cref="ProcessFrame"/>.
    /// </summary>
    /// <param name="sequence">Bytes currently buffered by the pipe reader.</param>
    /// <returns>Position up to which the data has been consumed and may be discarded.</returns>
    private SequencePosition ReadFrames(in ReadOnlySequence<byte> sequence)
    {
        var reader = new SequenceReader<byte>(sequence);
        var delimStart = new ReadOnlySpan<byte>(_jpegStart);
        var delimEnd = new ReadOnlySpan<byte>(_jpegEnd);

        while (!reader.End)
        {
            // Skip to the next SOI marker without consuming the marker itself.
            if (!reader.TryReadTo(out ReadOnlySequence<byte> _, delimStart, advancePastDelimiter: false))
            {
                // No frame start buffered: discard the data, keeping the last byte because
                // it may be the first half of an SOI marker split across two reads.
                if (reader.Remaining > 1)
                {
                    reader.Advance(reader.Remaining - 1);
                }

                break;
            }

            var frameStart = reader.Position;

            // Incomplete frame: the reader stays at the SOI marker so the frame is retried
            // once more data has arrived.
            if (!reader.TryReadTo(out ReadOnlySequence<byte> _, delimEnd, advancePastDelimiter: true))
            {
                break;
            }

            // TryReadTo's output excludes the delimiter, so slice up to the reader position
            // to keep the EOI marker as part of the frame.
            ProcessFrame(sequence.Slice(frameStart, reader.Position));
        }

        return reader.Position;
    }

    /// <summary>
    /// Copies one frame into an array, passes it to the timestamp extractor and updates metrics.
    /// </summary>
    /// <param name="readOnlySequence">Bytes of a single JPEG frame.</param>
    private void ProcessFrame(in ReadOnlySequence<byte> readOnlySequence)
    {
        using (_frameProcessingHistogramm.NewTimer())
        {
            var data = readOnlySequence.ToArray();
            _ocrCapture.ProcessImage(data);
            _framesProcessed.Inc();
            _averageFrameSize.Observe(data.Length);
        }
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment