Last active
February 17, 2022 12:22
-
-
Save djonasdev/a660b5181d980c2a070b1b730aba733c to your computer and use it in GitHub Desktop.
SubscriptionManager for convertersystems / opc-ua-client
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Concurrent; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Reactive.Concurrency; | |
using System.Reactive.Disposables; | |
using System.Reactive.Linq; | |
using System.Reactive.Subjects; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using System.Threading.Tasks.Dataflow; | |
using Haprotec.Opc.Extensions; | |
using Haprotec.Status; | |
using NLog; | |
using Workstation.ServiceModel.Ua; | |
using Workstation.ServiceModel.Ua.Channels; | |
using static System.Int32; | |
namespace Haprotec.Opc.Ua | |
{ | |
/// <summary> | |
/// Handles all subscriptions to the <see cref="UaTcpSessionChannel"/> | |
/// </summary> | |
public class SubscriptionManager : IDisposable | |
{ | |
/// <summary>
/// Logger used by this manager.
/// </summary>
public Logger Logger { get; }

/// <summary>
/// Returns the <see cref="SubscriptionManager"/> registered for the given <see cref="UaTcpSessionChannel"/>,
/// creating and registering a new one when none exists for its channel id.
/// </summary>
/// <param name="channel">Channel whose <c>ChannelId</c> identifies the manager.</param>
/// <returns>The manager registered for <paramref name="channel"/>.</returns>
/// <exception cref="ArgumentNullException"><paramref name="channel"/> is <c>null</c>.</exception>
public static SubscriptionManager FromChannel(UaTcpSessionChannel channel)
{
    if (channel is null)
    {
        throw new ArgumentNullException(nameof(channel));
    }

    // GetOrAdd looks up and registers in one step. The previous ContainsKey / TryAdd / indexer
    // sequence could throw KeyNotFoundException when the entry was removed (channel faulted or
    // closed) between the check and the final read.
    return Channels.GetOrAdd(channel.ChannelId, _ => new SubscriptionManager(channel));
}

/// <summary>
/// Registered managers, keyed by channel id. Entries are removed when the channel faults or closes.
/// </summary>
private static readonly ConcurrentDictionary<uint, SubscriptionManager> Channels = new ConcurrentDictionary<uint, SubscriptionManager>();
// ############################################################################################################################## | |
// Properties | |
// ############################################################################################################################## | |
#region Properties | |
// ########################################################################################## | |
// Public Properties | |
// ########################################################################################## | |
// ########################################################################################## | |
// Private Properties | |
// ########################################################################################## | |
/// <summary>
/// Channel this manager sends its subscription requests through.
/// </summary>
private readonly UaTcpSessionChannel _channel; | |
/// <summary>
/// Mapping between a publishing interval and the id of the subscription serving it.
/// Key: interval in ms / value: subscription id.
/// </summary>
private readonly Dictionary<uint, uint> _intervalSubscriptionIds = new(); | |
/// <summary>
/// Notification stream of each subscription; all monitored items of a subscription share one subject.
/// Key: subscription id / value: notification stream.
/// </summary>
private readonly Dictionary<uint, Subject<MonitoredItemNotification>> _subjects = new(); | |
/// <summary>
/// Unique ids of the nodes that were already handed to a monitored-item request;
/// consulted to detect duplicate subscriptions.
/// </summary>
private readonly HashSet<uint> _subscribedNodeIds = new(); | |
/// <summary>
/// Collects the disposables created by this manager so that <see cref="Dispose"/> can release them together.
/// </summary>
private readonly CompositeDisposable _compositeDisposable = new(); | |
#endregion | |
// ############################################################################################################################## | |
// Constructor | |
// ############################################################################################################################## | |
#region Constructor | |
/// <summary>
/// Creates the manager for <paramref name="channel"/> and hooks the channel's
/// <c>Faulted</c> and <c>Closed</c> events so the manager unregisters and disposes itself.
/// </summary>
/// <param name="channel">Channel the manager is bound to.</param>
private SubscriptionManager(UaTcpSessionChannel channel)
{
    var loggerName = $"{GetType().FullName}.{channel.GetLoggerIpSuffix()}";
    Logger = LogManager.GetLogger(loggerName);
    _channel = channel;

    // Shared teardown for both channel events: drop the registry entry, then release resources.
    void Unregister()
    {
        Channels.TryRemove(channel.ChannelId, out _);
        Dispose();
    }

    _channel.Faulted += (sender, args) => Unregister();
    _channel.Closed += (sender, args) => Unregister();
}
#endregion | |
// ############################################################################################################################## | |
// public methods | |
// ############################################################################################################################## | |
#region public methods | |
/// <summary>
/// One pending subscription request: the references to subscribe and a task that
/// completes with the outcome once the batch containing the request was processed.
/// </summary>
private class SubscriptionWrapper
{
    // RunContinuationsAsynchronously keeps the awaiting caller's continuation from running
    // inline on the thread that completes the request (the batch processor).
    private readonly TaskCompletionSource<HtReturnStatus<List<IObservable<DataValue>>>> _tcs =
        new(TaskCreationOptions.RunContinuationsAsynchronously);

    /// <summary>Task that completes with the result passed to <see cref="SetCompleted"/>.</summary>
    public Task<HtReturnStatus<List<IObservable<DataValue>>>> Task => _tcs.Task;

    /// <summary>References requested by the caller.</summary>
    public List<ReferenceDescriptionEx> References { get; }

    public SubscriptionWrapper(List<ReferenceDescriptionEx> references)
    {
        References = references;
    }

    /// <summary>
    /// Completes <see cref="Task"/> with <paramref name="result"/>.
    /// Only the first call has an effect; later calls are ignored instead of throwing.
    /// </summary>
    /// <param name="result">Outcome to hand to the awaiting caller.</param>
    public void SetCompleted(HtReturnStatus<List<IObservable<DataValue>>> result) => _tcs.TrySetResult(result);
}
/// <summary>
/// Collects posted <see cref="SubscriptionWrapper"/> requests and forwards them in batches
/// (every 250 ms or as soon as 400 requests are queued) to the supplied subscriber block.
/// </summary>
private class SubscriptionBuffer
{
    private readonly IPropagatorBlock<SubscriptionWrapper, IList<SubscriptionWrapper>> _buffer;
    private readonly ActionBlock<IList<SubscriptionWrapper>> _subscriber;

    /// <param name="subscriber">Block that receives each non-empty batch.</param>
    public SubscriptionBuffer(ActionBlock<IList<SubscriptionWrapper>> subscriber)
    {
        _subscriber = subscriber;
        _buffer = CreateTimedBatchBlock<SubscriptionWrapper>(TimeSpan.FromMilliseconds(250), 400);

        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        _buffer.LinkTo(_subscriber, linkOptions);
    }

    /// <summary>Queues a request for the next batch.</summary>
    public void Post(SubscriptionWrapper wrapper)
    {
        _buffer.Post(wrapper);
    }

    /// <summary>
    /// Builds a propagator block that groups incoming items into lists, closing a list
    /// after <paramref name="timeSpan"/> or <paramref name="count"/> items; empty lists are dropped.
    /// </summary>
    private IPropagatorBlock<TIn, IList<TIn>> CreateTimedBatchBlock<TIn>(TimeSpan timeSpan, int count)
    {
        var source = new BufferBlock<TIn>();
        var target = new BufferBlock<IList<TIn>>();

        IObservable<IList<TIn>> batches = source.AsObservable()
            .Buffer(timeSpan, count)
            .Where(batch => batch.Count > 0)
            .ObserveOn(TaskPoolScheduler.Default);
        batches.Subscribe(target.AsObserver());

        return DataflowBlock.Encapsulate(source, target);
    }
}
/// <summary>
/// Subscribes to a single node by delegating to <see cref="CreateMultipleSubscriptions"/>.
/// </summary>
/// <param name="reference">Node to subscribe to.</param>
/// <param name="publishingInterval">Requested publishing interval in ms.</param>
/// <returns>The value stream of the node, or the failure reported for the request.</returns>
public async Task<HtReturnStatus<IObservable<DataValue>>> CreateSubscription(ReferenceDescriptionEx reference, uint publishingInterval = 500)
{
    var single = new List<ReferenceDescriptionEx> { reference };
    var batch = await CreateMultipleSubscriptions(single, publishingInterval);

    if (!batch.HasErrors)
    {
        return HtReturnStatus.Success(batch.Result.First());
    }

    return HtReturnStatus.Failed<IObservable<DataValue>>(batch);
}
/// <summary>
/// Request buffers, one per publishing interval (ms).
/// </summary>
private readonly ConcurrentDictionary<uint, SubscriptionBuffer> _subscriptionBuffer = new();

/// <summary>
/// Subscribes to several nodes. The request is queued in the buffer of the given publishing
/// interval and processed together with other queued requests; the returned task completes
/// when the batch containing this request was processed.
/// </summary>
/// <param name="references">Nodes to subscribe to.</param>
/// <param name="publishingInterval">Requested publishing interval in ms.</param>
/// <returns>
/// One value stream per entry of <paramref name="references"/> (same order), or a failed status.
/// </returns>
public async Task<HtReturnStatus<List<IObservable<DataValue>>>> CreateMultipleSubscriptions(List<ReferenceDescriptionEx> references, uint publishingInterval = 500)
{
    // Reject null here: inside a batch it would make the processing of every other
    // request in the same batch fail as well.
    if (references is null)
    {
        Exception error = new ArgumentNullException(nameof(references));
        return HtReturnStatus.Failed<List<IObservable<DataValue>>>(error);
    }

    // Factory overload: the buffer (and the timer it starts) is only created when the
    // interval has no buffer yet, not on every call.
    var buffer = _subscriptionBuffer.GetOrAdd(
        publishingInterval,
        interval => new SubscriptionBuffer(new ActionBlock<IList<SubscriptionWrapper>>(
            wrappers => ProcessBatchAsync(wrappers, interval))));

    // create wrapper object, post it into the buffer and wait for completion
    var wrapper = new SubscriptionWrapper(references);
    buffer.Post(wrapper);
    return await wrapper.Task;
}

/// <summary>
/// Subscribes all references of a batch in one go and hands each request its slice of the result.
/// </summary>
/// <param name="wrappers">Requests collected by the buffer.</param>
/// <param name="publishingInterval">Publishing interval (ms) of the buffer the batch came from.</param>
private async Task ProcessBatchAsync(IList<SubscriptionWrapper> wrappers, uint publishingInterval)
{
    try
    {
        var references = wrappers.SelectMany(w => w.References).ToList();
        var subscriptions = await Subscribe(references, publishingInterval);

        if (subscriptions.HasErrors)
        {
            var failure = HtReturnStatus.Failed<List<IObservable<DataValue>>>(subscriptions);
            foreach (var w in wrappers)
            {
                w.SetCompleted(failure);
            }

            return;
        }

        // The result list is ordered like the flattened references, so each request
        // owns the next References.Count entries.
        var offset = 0;
        foreach (var w in wrappers)
        {
            HtReturnStatus<List<IObservable<DataValue>>> outcome;
            try
            {
                outcome = HtReturnStatus.Success(subscriptions.Result.GetRange(offset, w.References.Count));
            }
            catch (Exception e)
            {
                outcome = HtReturnStatus.Failed<List<IObservable<DataValue>>>(e);
            }

            w.SetCompleted(outcome);
            offset += w.References.Count;
        }
    }
    catch (Exception e)
    {
        // Only fail requests that were not answered yet; completing a request twice must be avoided.
        var failure = HtReturnStatus.Failed<List<IObservable<DataValue>>>(e);
        foreach (var w in wrappers.Where(w => !w.Task.IsCompleted))
        {
            w.SetCompleted(failure);
        }
    }
}
/// <summary>
/// Client handle assigned to each node id (string form). A node always gets the same handle and
/// two different nodes never share one.
/// </summary>
private readonly ConcurrentDictionary<string, uint> _clientHandles = new();

/// <summary>Last handle value handed out by <see cref="GetClientHandle"/>.</summary>
private int _lastClientHandle;

/// <summary>
/// Resolves the channel subscription for the publishing interval, adds the given nodes to it
/// and creates one value stream per reference.
/// </summary>
/// <param name="references">Nodes to subscribe to.</param>
/// <param name="publishingInterval">Requested publishing interval in ms.</param>
/// <returns>One value stream per reference (same order), or a failed status.</returns>
private async Task<HtReturnStatus<List<IObservable<DataValue>>>> Subscribe(List<ReferenceDescriptionEx> references, uint publishingInterval = 500)
{
    // assign each node a unique number that links pushed values to the node's subscription
    var mapping = references
        .Select(r => (reference: r, uniqueId: GetClientHandle(r)))
        .ToList();

    // setup the channel subscription
    var rGetId = await _GetSubscriptionIdAsync(publishingInterval);
    if (rGetId.HasErrors)
    {
        return HtReturnStatus.Failed<List<IObservable<DataValue>>>(rGetId);
    }

    // add new nodes to the channel subscription
    var rSubscribe = await _SubscribeAsync(rGetId.Result, mapping);
    if (rSubscribe.HasErrors)
    {
        return HtReturnStatus.Failed<List<IObservable<DataValue>>>(rSubscribe);
    }

    // create internal subscriptions
    var subscriptions = mapping
        .Select(m => CreateInternalSubscription(rGetId.Result, m.reference, m.uniqueId))
        .ToList();
    return HtReturnStatus.Success(subscriptions);
}

/// <summary>
/// Returns the client handle of a node, allocating the next free number on first use.
/// A counter is used instead of the node id's string hash code because hash codes of
/// different node ids can collide, which would give two nodes the same handle.
/// </summary>
private uint GetClientHandle(ReferenceDescriptionEx reference)
{
    var key = reference.NodeId.NodeId.ToString();
    return _clientHandles.GetOrAdd(key, _ => unchecked((uint)Interlocked.Increment(ref _lastClientHandle)));
}
/// <summary>
/// Creates the value stream of one node: notifications of the subscription whose client handle
/// equals <paramref name="uniqueId"/> are forwarded to the observer. Errors raised by the stream
/// are logged and end the stream without an error.
/// </summary>
/// <param name="subscriptionId">Id of the subscription the node was added to.</param>
/// <param name="reference">Node the stream belongs to (used for logging and value checks).</param>
/// <param name="uniqueId">Client handle of the node's monitored item.</param>
private IObservable<DataValue> CreateInternalSubscription(uint subscriptionId, ReferenceDescriptionEx reference, uint uniqueId)
{
    return Observable
        .Create<DataValue>(observer =>
        {
            // Resolve the stream first: if the subscription has none, the lookup throws
            // before the watchdog below is started.
            var subject = _subjects[subscriptionId];

            // internal monitoring if a value is pushed after subscribing
            // https://gitlab.ad.haprotec.de/dotnet/HtOPC/-/issues/99
            var monitor = Observable.Timer(TimeSpan.FromSeconds(5)).Subscribe(l =>
            {
                Logger.Error($"No value pushed after subscribing to {reference} on subscription ({subscriptionId}). Please restart (switch off energy) the CPU!");
            });
            _compositeDisposable.Add(monitor);

            // Stops the watchdog once and releases it from the manager-wide collection,
            // so that collection does not keep one entry per subscriber forever.
            var monitorStopped = 0;
            void StopMonitor()
            {
                if (Interlocked.Exchange(ref monitorStopped, 1) != 0)
                {
                    return;
                }

                _compositeDisposable.Remove(monitor);
                monitor.Dispose();
            }

            var subscription = subject.Where(t => t.ClientHandle == uniqueId).Subscribe(
                notification =>
                {
                    StopMonitor();
                    notification.Value.CheckForMissingUdtInjection(Logger, reference);
                    observer.OnNext(notification.Value);
                });

            // Unsubscribing also stops the watchdog; otherwise it would report a missing
            // value for a stream nobody listens to any more.
            return Disposable.Create(() =>
            {
                subscription.Dispose();
                StopMonitor();
            });
        })
        .Catch<DataValue, Exception>(ex =>
        {
            Logger.Error(ex);
            return Observable.Empty<DataValue>();
        });
}
/// <summary>
/// Adds the not yet subscribed nodes of <paramref name="mapping"/> as monitored items to the
/// subscription <paramref name="id"/> and makes sure the subscription has a notification stream.
/// </summary>
/// <param name="id">Id of the subscription to extend.</param>
/// <param name="mapping">Nodes together with their client handles.</param>
/// <returns>Success (also when every node was a duplicate) or the failure of the create request.</returns>
private async Task<HtReturnStatus> _SubscribeAsync(uint id, List<(ReferenceDescriptionEx reference, uint uniqueId)> mapping)
{
    // detect duplicate node subscription (HashSet.Add returns false for a known id)
    var filteredMapping = new List<(ReferenceDescriptionEx reference, uint uniqueId)>();
    foreach (var entry in mapping)
    {
        if (_subscribedNodeIds.Add(entry.uniqueId))
        {
            filteredMapping.Add(entry);
        }
        else
        {
            Logger.Warn($"duplicate subscription detected! '{entry.reference}' has already been subscribed.");
        }
    }

    if (filteredMapping.Count == 0)
    {
        return HtReturnStatus.Success(); // if only 1 reference was supplied and it was a duplicate, we can leave here
    }

    var result = await RequestMonitoredItemsAsync(id, filteredMapping);
    if (result.HasErrors)
    {
        return result;
    }

    EnsureNotificationStream(id);
    return result;
}

/// <summary>
/// Sends the 'create monitored items' request for <paramref name="newItems"/>. Nodes whose item
/// was not created are removed from the duplicate detection again so a later attempt is not
/// discarded as a duplicate.
/// </summary>
private async Task<HtReturnStatus> RequestMonitoredItemsAsync(uint id, List<(ReferenceDescriptionEx reference, uint uniqueId)> newItems)
{
    // create 'monitor item create request'
    var itemsToCreate = newItems.Select(m => new MonitoredItemCreateRequest
    {
        ItemToMonitor = new ReadValueId
        {
            NodeId = m.reference.NodeId.NodeId, AttributeId = AttributeIds.Value
        },
        MonitoringMode = MonitoringMode.Reporting,
        RequestedParameters = new MonitoringParameters
        {
            ClientHandle = m.uniqueId, SamplingInterval = -1, QueueSize = 0, DiscardOldest = true
        }
    }).ToArray();

    // add it into the mother request
    var itemsRequest = new CreateMonitoredItemsRequest
    {
        SubscriptionId = id,
        ItemsToCreate = itemsToCreate,
    };

    Logger.Trace($"add new monitoring items ({string.Join(", ", newItems.Select(m => m.reference))}) to the subscription ({id})");

    try
    {
        var itemsResponse = await _channel.CreateMonitoredItemsAsync(itemsRequest);
        var result = itemsResponse.ToHtReturnStatus();

        if (result.HasErrors)
        {
            var itemResults = itemsResponse.Results;
            for (int i = 0; i < itemsToCreate.Length; i++)
            {
                // the response may carry fewer results than requested items
                var hasItemResult = itemResults != null && i < itemResults.Length;
                if (hasItemResult && itemResults[i].StatusCode.Value == 0)
                {
                    continue; // this item was created, keep it marked as subscribed
                }

                _subscribedNodeIds.Remove(newItems[i].uniqueId);

                if (hasItemResult)
                {
                    Logger.Error($"error subscribing to \"{itemsToCreate[i].ItemToMonitor.NodeId.Identifier}\": {itemResults[i].StatusCode.GetHtErrorCode()} - {itemResults[i].StatusCode.GetErrorDescription()}");
                }
            }
        }

        return result;
    }
    catch
    {
        // the request did not complete: none of the new nodes can be considered subscribed
        foreach (var item in newItems)
        {
            _subscribedNodeIds.Remove(item.uniqueId);
        }

        throw;
    }
}

/// <summary>
/// Creates the notification stream of subscription <paramref name="id"/> unless it already exists:
/// publish responses of the channel that belong to the subscription are split into their
/// monitored-item notifications and pushed into the subscription's subject.
/// </summary>
private void EnsureNotificationStream(uint id)
{
    if (_subjects.ContainsKey(id))
    {
        return;
    }

    var observable = new Subject<MonitoredItemNotification>();
    _subjects.Add(id, observable);

    var sub = _channel.Catch<PublishResponse, Exception>(ex =>
    {
        Logger.Error($"subscription \"{id}\" terminated. Can happen on OPC Server reboot ({ex.Message})");
        Logger.Trace(ex);

        // _intervalSubscriptionIds is keyed by interval, so the entries of the terminated
        // subscription have to be found by value
        var staleIntervals = _intervalSubscriptionIds
            .Where(pair => pair.Value == id)
            .Select(pair => pair.Key)
            .ToList();
        foreach (var interval in staleIntervals)
        {
            _intervalSubscriptionIds.Remove(interval);
        }

        return Observable.Empty<PublishResponse>();
    }).Where(s => s.SubscriptionId == id).Subscribe(response =>
    {
        // NOTE(review): defensive null checks; presumably responses without data changes
        // (e.g. keep-alives) can arrive without notification data - confirm against the client library
        var notificationData = response.NotificationMessage.NotificationData;
        if (notificationData is null)
        {
            return;
        }

        // loop thru all the data change notifications
        foreach (DataChangeNotification notification in notificationData.OfType<DataChangeNotification>())
        {
            if (notification.MonitoredItems is null)
            {
                continue;
            }

            foreach (MonitoredItemNotification itemNotification in notification.MonitoredItems)
            {
                observable.OnNext(itemNotification);
            }
        }
    });

    _compositeDisposable.Add(observable);
    _compositeDisposable.Add(sub);
}
/// <summary>
/// Returns the id of the channel subscription that serves the requested publishing interval.
/// An already known subscription is reused; otherwise a new one is created on the server.
/// When the server revises the interval, the revised interval is remembered as well and an
/// existing subscription with that interval is preferred over the newly created one.
/// </summary>
/// <param name="requestedPublishingInterval">Requested publishing interval in ms.</param>
/// <returns>The subscription id, or the failure reported when creating the subscription.</returns>
private async Task<HtReturnStatus<uint>> _GetSubscriptionIdAsync(uint requestedPublishingInterval)
{
    // first time. Fetch existing subscriptions from server
    if (_intervalSubscriptionIds.Count == 0)
    {
        await LoadExistingSubscriptionsAsync();
    }

    // use existing channel subscription if available
    if (_intervalSubscriptionIds.TryGetValue(requestedPublishingInterval, out var knownId))
    {
        return HtReturnStatus.Success(knownId);
    }

    // or create a new one
    var subscriptionRequest = new CreateSubscriptionRequest
    {
        RequestedPublishingInterval = requestedPublishingInterval,
        RequestedMaxKeepAliveCount = 30,
        RequestedLifetimeCount = 30 * 3,
        PublishingEnabled = true
    };
    Logger.Debug($"create subscription for interval {requestedPublishingInterval}ms");
    var subscriptionResponse = await _channel.CreateSubscriptionAsync(subscriptionRequest);

    // check error
    var serviceResult = subscriptionResponse.ResponseHeader.ServiceResult;
    if (serviceResult.Value != 0)
    {
        Logger.Error($"error creating subscription: {serviceResult.GetHtErrorCode()} - {serviceResult.GetErrorDescription()}");
        return HtReturnStatus.Failed<uint>(serviceResult.GetHtErrorCode(), serviceResult.GetErrorDescription());
    }

    var id = subscriptionResponse.SubscriptionId;
    var revisedPublishingInterval = (uint) subscriptionResponse.RevisedPublishingInterval;
    Logger.Trace($"subscription created with {revisedPublishingInterval}ms (id = '{id}')");

    // check if server can handle the requested publishing interval
    if (requestedPublishingInterval != revisedPublishingInterval)
    {
        Logger.Error($"server configuration error: minimum publishing interval was set to \"{revisedPublishingInterval}ms\" instead of \"{requestedPublishingInterval}ms\"");

        // check if existing subscription fits our needs, we can use it and delete the new one
        if (_intervalSubscriptionIds.TryGetValue(revisedPublishingInterval, out var reusableId))
        {
            Logger.Debug($"found an existing subscription ({reusableId})");
            _intervalSubscriptionIds[requestedPublishingInterval] = reusableId;
            await DeleteSubscriptionAsync(id);
            return HtReturnStatus.Success(reusableId);
        }

        // add the replacement publishing interval subscription to the collection
        // example: we needed 200ms but we got 500ms => also add the 500ms as we could also use it later, otherwise a second 500ms subscription would be created
        _intervalSubscriptionIds[revisedPublishingInterval] = id;
    }

    _intervalSubscriptionIds[requestedPublishingInterval] = id;
    return HtReturnStatus.Success(id);
}

/// <summary>
/// Reads the session and subscription diagnostics from the server and registers the
/// subscriptions of the most recently connected session whose application name equals the
/// channel's own. Nothing is registered when the read fails or no such session exists.
/// </summary>
private async Task LoadExistingSubscriptionsAsync()
{
    Logger.Debug("fetching existing subscriptions from server");
    var result = await _channel.ReadSessionAndSubscriptionDiagnosticAsync();
    if (result.HasErrors)
    {
        return;
    }

    var sessions = result.Result.Item1;
    var subscriptions = result.Result.Item2;

    // Determine the own session once instead of once per subscription; FirstOrDefault
    // avoids an exception when no session matches.
    var ownSession = sessions
        .OrderByDescending(type => type.ClientConnectionTime)
        .FirstOrDefault(se =>
            se.ClientDescription.ApplicationName.Text.Equals(_channel.LocalDescription.ApplicationName.Text));
    if (ownSession is null)
    {
        return;
    }

    var ownSessionId = ownSession.SessionId.Identifier.ToString();
    foreach (var subscription in subscriptions.Where(s => s.SessionId.Identifier.ToString().Equals(ownSessionId)))
    {
        var interval = (uint)subscription.PublishingInterval;
        Logger.Debug($"subscription with {interval}ms ({subscription.SubscriptionId}) found");

        // several subscriptions can share an interval; keep the first one found
        if (!_intervalSubscriptionIds.ContainsKey(interval))
        {
            _intervalSubscriptionIds.Add(interval, subscription.SubscriptionId);
        }
    }
}

/// <summary>
/// Deletes a subscription on the server; a failure is logged and otherwise ignored.
/// </summary>
private async Task DeleteSubscriptionAsync(uint subscriptionId)
{
    Logger.Debug($"delete the new created subscription (id = '{subscriptionId}')");
    var subscriptionDeleteRequest = new DeleteSubscriptionsRequest
    {
        SubscriptionIds = new []{ subscriptionId }
    };
    var deleteResponse = await _channel.DeleteSubscriptionsAsync(subscriptionDeleteRequest);

    // check error
    if (deleteResponse.ResponseHeader.ServiceResult.Value != 0)
    {
        Logger.Error($"error deleting subscription: {deleteResponse.ResponseHeader.ServiceResult.GetHtErrorCode()} - {deleteResponse.ResponseHeader.ServiceResult.GetErrorDescription()}");
    }
    else
    {
        Logger.Trace("successfully deleted");
    }
}
#endregion | |
/// <summary>
/// Disposes everything collected in the manager's disposable collection and resets the
/// bookkeeping of subscribed nodes, notification streams and known subscriptions.
/// </summary>
public void Dispose()
{
    _compositeDisposable.Clear();
    _subscribedNodeIds.Clear();

    // The subjects were disposed by the Clear() above and the cached subscription ids belong
    // to them. Dropping both keeps later calls from reusing a disposed stream.
    _subjects.Clear();
    _intervalSubscriptionIds.Clear();
}
} | |
} |
2022.02.10 added logic to detect duplicate subscriptions
2022.02.17 Added TPL pattern to subscribe multiple nodes simultaneously using cache
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
SubscriptionManager for https://github.com/convertersystems/opc-ua-client