Skip to content

Instantly share code, notes, and snippets.

@djonasdev
Last active February 17, 2022 12:22
Show Gist options
  • Save djonasdev/a660b5181d980c2a070b1b730aba733c to your computer and use it in GitHub Desktop.
Save djonasdev/a660b5181d980c2a070b1b730aba733c to your computer and use it in GitHub Desktop.
SubscriptionManager for convertersystems / opc-ua-client
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Haprotec.Opc.Extensions;
using Haprotec.Status;
using NLog;
using Workstation.ServiceModel.Ua;
using Workstation.ServiceModel.Ua.Channels;
using static System.Int32;
namespace Haprotec.Opc.Ua
{
/// <summary>
/// Handles all subscriptions to the <see cref="UaTcpSessionChannel"/>
/// </summary>
public class SubscriptionManager : IDisposable
{
public Logger Logger { get; }
/// <summary>
/// Create a new subscription Manager linked to an <see cref="UaTcpSessionChannel"/>
/// </summary>
/// <param name="channel"></param>
/// <returns></returns>
public static SubscriptionManager FromChannel(UaTcpSessionChannel channel)
{
if (!Channels.ContainsKey(channel.ChannelId))
{
var manager = new SubscriptionManager(channel);
if (Channels.TryAdd(channel.ChannelId, manager) == false) // if it fails, retry recursively
return FromChannel(channel);
}
return Channels[channel.ChannelId];
}
private static readonly ConcurrentDictionary<uint, SubscriptionManager> Channels = new ConcurrentDictionary<uint, SubscriptionManager>();
// ##############################################################################################################################
// Properties
// ##############################################################################################################################
#region Properties
// ##########################################################################################
// Public Properties
// ##########################################################################################
// ##########################################################################################
// Private Properties
// ##########################################################################################
private readonly UaTcpSessionChannel _channel;
/// <summary>
/// Mapping between subscription interval and the id of the subscription
/// interval in ms / subscription-ID
/// </summary>
private readonly Dictionary<uint, uint> _intervalSubscriptionIds = new();
/// <summary>
/// Holds all monitored items of a subscription encapsulated in a subject
/// subscription-ID / notification stream
/// </summary>
private readonly Dictionary<uint, Subject<MonitoredItemNotification>> _subjects = new();
private readonly HashSet<uint> _subscribedNodeIds = new();
private readonly CompositeDisposable _compositeDisposable = new();
#endregion
// ##############################################################################################################################
// Constructor
// ##############################################################################################################################
#region Constructor
private SubscriptionManager(UaTcpSessionChannel channel)
{
Logger = LogManager.GetLogger($"{GetType().FullName}.{channel.GetLoggerIpSuffix()}");
_channel = channel;
_channel.Faulted += (sender, args) =>
{
Channels.TryRemove(channel.ChannelId, out var s);
Dispose();
};
_channel.Closed += (sender, args) =>
{
Channels.TryRemove(channel.ChannelId, out var s);
Dispose();
};
}
#endregion
// ##############################################################################################################################
// public methods
// ##############################################################################################################################
#region public methods
private class SubscriptionWrapper
{
private readonly TaskCompletionSource<HtReturnStatus<List<IObservable<DataValue>>>> _tcs = new();
public Task<HtReturnStatus<List<IObservable<DataValue>>>> Task => _tcs.Task;
public List<ReferenceDescriptionEx> References { get; }
public SubscriptionWrapper(List<ReferenceDescriptionEx> references)
{
References = references;
}
public void SetCompleted(HtReturnStatus<List<IObservable<DataValue>>> result) => _tcs.SetResult(result);
}
private class SubscriptionBuffer
{
private readonly IPropagatorBlock<SubscriptionWrapper, IList<SubscriptionWrapper>> _buffer;
private readonly ActionBlock<IList<SubscriptionWrapper>> _subscriber;
public SubscriptionBuffer(ActionBlock<IList<SubscriptionWrapper>> subscriber)
{
_subscriber = subscriber;
_buffer = CreateTimedBatchBlock<SubscriptionWrapper>(TimeSpan.FromMilliseconds(250), 400);
_buffer.LinkTo(_subscriber, new DataflowLinkOptions { PropagateCompletion = true });
}
public void Post(SubscriptionWrapper wrapper) => _buffer.Post(wrapper);
private IPropagatorBlock<TIn, IList<TIn>> CreateTimedBatchBlock<TIn>(TimeSpan timeSpan, int count)
{
var inBlock = new BufferBlock<TIn>();
var outBlock = new BufferBlock<IList<TIn>>();
var outObserver = outBlock.AsObserver();
inBlock.AsObservable()
.Buffer(timeSpan, count)
.Where(ins => ins.Count > 0)
.ObserveOn(TaskPoolScheduler.Default)
.Subscribe(outObserver);
return DataflowBlock.Encapsulate(inBlock, outBlock);
}
}
public async Task<HtReturnStatus<IObservable<DataValue>>> CreateSubscription(ReferenceDescriptionEx reference, uint publishingInterval = 500)
{
var result = await CreateMultipleSubscriptions(new List<ReferenceDescriptionEx> { reference }, publishingInterval);
if (result.HasErrors)
return HtReturnStatus.Failed<IObservable<DataValue>>(result);
return HtReturnStatus.Success(result.Result.First());
}
private readonly ConcurrentDictionary<uint, SubscriptionBuffer> _subscriptionBuffer = new();
public async Task<HtReturnStatus<List<IObservable<DataValue>>>> CreateMultipleSubscriptions(List<ReferenceDescriptionEx> references, uint publishingInterval = 500)
{
// get or add subscription buffer
var buffer = _subscriptionBuffer.GetOrAdd(publishingInterval, new SubscriptionBuffer(new ActionBlock<IList<SubscriptionWrapper>>(async wrappers =>
{
void SetError(HtReturnStatus<List<IObservable<DataValue>>> status)
{
foreach (var subscriptionWrapper in wrappers)
{
subscriptionWrapper.SetCompleted(status);
}
}
try
{
var r = wrappers.SelectMany(subscriptionWrapper => subscriptionWrapper.References).ToList();
var subscriptions = await Subscribe(r);
if (subscriptions.HasErrors)
{
SetError(HtReturnStatus.Failed<List<IObservable<DataValue>>>(subscriptions));
}
else
{
int index = 0;
for (int i = 0; i < wrappers.Count; i++)
{
var w = wrappers[i];
try
{
w.SetCompleted(HtReturnStatus.Success(subscriptions.Result.GetRange(index, w.References.Count)));
}
catch (Exception e)
{
w.SetCompleted(HtReturnStatus.Failed<List<IObservable<DataValue>>>(e));
}
index += w.References.Count;
}
}
}
catch (Exception e)
{
SetError(HtReturnStatus.Failed<List<IObservable<DataValue>>>(e));
}
})));
// create wrapper object
var wrapper = new SubscriptionWrapper(references);
// post into buffer
buffer.Post(wrapper);
// wait for completion
return await wrapper.Task;
}
private async Task<HtReturnStatus<List<IObservable<DataValue>>>> Subscribe(List<ReferenceDescriptionEx> references, uint publishingInterval = 500)
{
// generate a unique number for each nodeId to link new pushed values and nodeID subscriptions together
var mapping = references.Select(r => (reference: r, uniqueId: (uint)r.NodeId.NodeId.ToString().GetHashCode())).ToList();
// setup the channel subscription
var rGetId = await _GetSubscriptionIdAsync(publishingInterval);
if (rGetId.HasErrors)
return HtReturnStatus.Failed<List<IObservable<DataValue>>>(rGetId);
// Add new nodes to the channel subscription
var rSubscribe = await _SubscribeAsync(rGetId.Result, mapping);
// error occurred
if (rSubscribe.HasErrors)
return HtReturnStatus.Failed<List<IObservable<DataValue>>>(rSubscribe);
// create internal subscriptions
var subscriptions = mapping.Select(m => CreateInternalSubscription(rGetId.Result, m.reference, m.uniqueId)).ToList();
return HtReturnStatus.Success(subscriptions);
}
private IObservable<DataValue> CreateInternalSubscription(uint subscriptionId, ReferenceDescriptionEx reference, uint uniqueId)
{
return Observable
.Create<DataValue>(observer =>
{
// internal monitoring if a value is pushed after subscribing
// https://gitlab.ad.haprotec.de/dotnet/HtOPC/-/issues/99
var monitor = Observable.Timer(TimeSpan.FromSeconds(5)).Take(1).Subscribe(l =>
{
Logger.Error($"No value pushed after subscribing to {reference} on subscription ({subscriptionId}). Please restart (switch off energy) the CPU!");
});
_compositeDisposable.Add(monitor);
return _subjects[subscriptionId].Where(t => t.ClientHandle == uniqueId).Subscribe(
notification =>
{
monitor.Dispose(); // stop monitor
notification.Value.CheckForMissingUdtInjection(Logger, reference);
observer.OnNext(notification.Value);
});
}).Catch<DataValue, Exception>(ex =>
{
Logger.Error(ex);
return Observable.Empty<DataValue>();
});
}
private async Task<HtReturnStatus> _SubscribeAsync(uint id, List<(ReferenceDescriptionEx reference, uint uniqueId)> mapping)
{
// detect duplicate node subscription
var filteredMapping = mapping.Where(tuple =>
{
if (_subscribedNodeIds.Contains(tuple.uniqueId))
{
Logger.Warn($"duplicate subscription detected! '{tuple.reference}' has already been subscribed.");
return false;
}
else
{
_subscribedNodeIds.Add(tuple.uniqueId);
}
return true;
}).ToList();
if (filteredMapping.Count == 0)
return HtReturnStatus.Success(); // if only 1 reference was supplied and it was a duplicate, we can leave here
// create 'monitor item create request'
var itemsToCreate = filteredMapping.Select(m => new MonitoredItemCreateRequest
{
ItemToMonitor = new ReadValueId
{
NodeId = m.reference.NodeId.NodeId, AttributeId = AttributeIds.Value
},
MonitoringMode = MonitoringMode.Reporting,
RequestedParameters = new MonitoringParameters
{
ClientHandle = m.uniqueId, SamplingInterval = -1, QueueSize = 0, DiscardOldest = true
}
}).ToArray();
// add it into the mother request
var itemsRequest = new CreateMonitoredItemsRequest
{
SubscriptionId = id,
ItemsToCreate = itemsToCreate,
};
//Logger.Trace($"add ({filteredMapping.Count}) new monitoring items to the subscription ({id}): \n{string.Join("\n ", filteredMapping.Select(m => m.reference))}");
Logger.Trace($"add new monitoring items ({string.Join(", ", filteredMapping.Select(m => m.reference))}) to the subscription ({id})");
var itemsResponse = await _channel.CreateMonitoredItemsAsync(itemsRequest);
var result = itemsResponse.ToHtReturnStatus();
// log errors
if(result.HasErrors)
{
for (int i = 0; i < itemsToCreate.Length; i++)
{
Logger.Error($"error subscribing to \"{itemsToCreate[i].ItemToMonitor.NodeId.Identifier}\": {itemsResponse.Results[i].StatusCode.GetHtErrorCode()} - {itemsResponse.Results[i].StatusCode.GetErrorDescription()}");
}
return result;
}
// setup internal subscription
if (!_subjects.ContainsKey(id))
{
var observable = new Subject<MonitoredItemNotification>();
_subjects.Add(id, observable);
// use existing subscription for the nodeId if available
var sub = _channel.Catch<PublishResponse, Exception>(ex =>
{
Logger.Error($"subscription \"{id}\" terminated. Can happen on OPC Server reboot ({ex.Message})");
Logger.Trace(ex);
if (_intervalSubscriptionIds.ContainsKey(id))
{
_intervalSubscriptionIds.Remove(id);
}
return Observable.Empty<PublishResponse>();
}).Where(s => s.SubscriptionId == id).Subscribe(response =>
{
// loop thru all the data change notifications
foreach (DataChangeNotification notification in response.NotificationMessage.NotificationData
.OfType<DataChangeNotification>())
{
foreach (MonitoredItemNotification itemNotification in notification.MonitoredItems)
{
observable.OnNext(itemNotification);
}
}
});
_compositeDisposable.Add(observable);
_compositeDisposable.Add(sub);
}
return result;
}
private async Task<HtReturnStatus<uint>> _GetSubscriptionIdAsync(uint requestedPublishingInterval)
{
// first time. Fetch existing subscriptions from server
if (_intervalSubscriptionIds.Count == 0)
{
Logger.Debug("fetching existing subscriptions from server");
var result = await _channel.ReadSessionAndSubscriptionDiagnosticAsync();
if (!result.HasErrors)
{
var sessions = result.Result.Item1;
var subscriptions = result.Result.Item2;
var filteredSubscriptions = subscriptions.Where(s =>
s.SessionId.Identifier.ToString().Equals(sessions
.OrderByDescending(type =>type.ClientConnectionTime).First(se =>
se.ClientDescription.ApplicationName.Text.Equals(_channel.LocalDescription
.ApplicationName.Text)).SessionId.Identifier.ToString())).ToList();
foreach (var subscription in filteredSubscriptions)
{
Logger.Debug($"subscription with {(uint)subscription.PublishingInterval}ms ({subscription.SubscriptionId}) found");
_intervalSubscriptionIds.Add((uint)subscription.PublishingInterval, subscription.SubscriptionId);
}
}
}
// use existing channel subscription if available
// or create a new one
uint id;
if (!_intervalSubscriptionIds.ContainsKey(requestedPublishingInterval))
{
var subscriptionRequest = new CreateSubscriptionRequest
{
RequestedPublishingInterval = requestedPublishingInterval,
RequestedMaxKeepAliveCount = 30,
RequestedLifetimeCount = 30 * 3,
PublishingEnabled = true
};
Logger.Debug($"create subscription for interval {requestedPublishingInterval}ms");
var subscriptionResponse = await _channel.CreateSubscriptionAsync(subscriptionRequest);
// check error
if (subscriptionResponse.ResponseHeader.ServiceResult.Value != 0)
{
Logger.Error($"error creating subscription: {subscriptionResponse.ResponseHeader.ServiceResult.GetHtErrorCode()} - {subscriptionResponse.ResponseHeader.ServiceResult.GetErrorDescription()}");
return HtReturnStatus.Failed<uint>(subscriptionResponse.ResponseHeader.ServiceResult.GetHtErrorCode(), subscriptionResponse.ResponseHeader.ServiceResult.GetErrorDescription());
}
id = subscriptionResponse.SubscriptionId;
var revisedPublishingInterval = (uint) subscriptionResponse.RevisedPublishingInterval;
Logger.Trace($"subscription created with {revisedPublishingInterval}ms (id = '{id}')");
// check if server can handle the requested publishing interval
if (requestedPublishingInterval != revisedPublishingInterval)
{
Logger.Error($"server configuration error: minimum publishing interval was set to \"{revisedPublishingInterval}ms\" instead of \"{requestedPublishingInterval}ms\"");
// check if existing subscription fits our needs, we can use it and delete the new one
if (_intervalSubscriptionIds.ContainsKey(revisedPublishingInterval))
{
Logger.Debug($"found an existing subscription ({_intervalSubscriptionIds[revisedPublishingInterval]})");
_intervalSubscriptionIds.Add(requestedPublishingInterval, _intervalSubscriptionIds[revisedPublishingInterval]);
Logger.Debug($"delete the new created subscription (id = '{subscriptionResponse.SubscriptionId}')");
var subscriptionDeleteRequest = new DeleteSubscriptionsRequest
{
SubscriptionIds = new []{ subscriptionResponse.SubscriptionId }
};
var deleteResponse = await _channel.DeleteSubscriptionsAsync(subscriptionDeleteRequest);
// check error
if (deleteResponse.ResponseHeader.ServiceResult.Value != 0)
{
Logger.Error($"error deleting subscription: {deleteResponse.ResponseHeader.ServiceResult.GetHtErrorCode()} - {deleteResponse.ResponseHeader.ServiceResult.GetErrorDescription()}");
}
else
{
Logger.Trace("successfully deleted");
}
return HtReturnStatus.Success(_intervalSubscriptionIds[revisedPublishingInterval]);
}
// add the replacement publishing interval subscription to the collection
// example: we needed 200ms but we got 500ms => also add the 500ms as we could also use it later, otherwise a second 500ms subscription would be created
else
{
_intervalSubscriptionIds.Add(revisedPublishingInterval, id);
}
}
_intervalSubscriptionIds.Add(requestedPublishingInterval, id);
}
else
{
id = _intervalSubscriptionIds[requestedPublishingInterval];
}
return HtReturnStatus.Success(id);
}
#endregion
public void Dispose()
{
_compositeDisposable.Clear();
_subscribedNodeIds.Clear();
}
}
}
@djonasdev
Copy link
Author

@djonasdev
Copy link
Author

2022.02.10 added logic to detect duplicate subscriptions

@djonasdev
Copy link
Author

2022.02.17 Added TPL pattern to subscribe multiple nodes simultaneously using cache

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment