Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Devices;
using Microsoft.Azure.Devices.Shared;
using Newtonsoft.Json;
namespace iothubapi.Iothub
{
public class IothubManager
{
// Queries to selects only the properties we need from twins
private const string QUERY_PREFIX = "SELECT deviceId, capabilities, status, lastActivityTime FROM devices";
private const string MODULE_QUERY_PREFIX = "SELECT deviceId, moduleId, lastActivityTime FROM devices.modules";
// Default conditions that are always used to limit the result
private const string DEVICE_ENABLED_QUERY = "status = 'enabled'";
private const string MODULE_ACTIVE_QUERY = "lastActivityTime > '0001-01-01T00:00:00Z'";
public IothubManager(IothubManagerConfigReader configReader)
{
var builder = IotHubConnectionStringBuilder.Create(configReader.IothubConnectionString);
Registry = RegistryManager.CreateFromConnectionString(builder.ToString());
IothubHostname = builder.HostName;
}
public IothubManager(RegistryManager registry, string iothubHostname)
{
Registry = registry;
IothubHostname = iothubHostname;
}
public RegistryManager Registry { get; }
public string IothubHostname { get; }
/// <inheritdoc />
public async Task<List<TwinActivity>> GetDeviceActivityAsync(
string deviceQuery = null, string moduleQuery = null, CancellationToken cancellationToken = default,
ILogger log = null)
{
var logger = (log ?? Log.Logger).ForContext<IothubManager>();
var queryEnabledOnly = CombinedQuery(deviceQuery, DEVICE_ENABLED_QUERY);
var dQuery = $"{QUERY_PREFIX} WHERE {queryEnabledOnly}";
var queryActiveModulesOnly = CombinedQuery(moduleQuery, MODULE_ACTIVE_QUERY);
var mQuery = $"{MODULE_QUERY_PREFIX} WHERE {queryActiveModulesOnly}";
var devicesTask = GetTwinsByQueryAsync(dQuery, null, -1, cancellationToken, logger);
var modulesTask = GetTwinsByQueryAsync(mQuery, null, -1, cancellationToken, logger);
// Parallel execution
await Task.WhenAll(devicesTask, modulesTask);
var (devices, modules) = (await devicesTask, await modulesTask);
var edgeDevices = GetEdgeDevices(devices.Result, modules.Result);
return devices.Result.Select(
twin => {
var foundEdgeModule = edgeDevices.TryGetValue(twin.DeviceId, out var edgeDevice);
return new TwinActivity
{
DeviceId = twin.DeviceId,
IsEdgeDevice = foundEdgeModule,
LatestActivity = foundEdgeModule
? edgeDevice.LastActiveModule.LastActivityTime
: twin.LastActivityTime
};
}
)
.ToList();
}
/// <summary>
/// From the lists of given device and module twins, get the ones that that identify having IoT Edge capability
/// and return the device and the last active module for that device.
/// </summary>
private static Dictionary<string, (Twin Twin, Twin LastActiveModule)>
GetEdgeDevices(IEnumerable<Twin> deviceTwins, IEnumerable<Twin> moduleTwins)
{
var devicesWithModuleActivity = moduleTwins.GroupBy(x => x.DeviceId)
.Select(g => g.OrderByDescending(x => x.LastActivityTime).First())
.ToDictionary(x => x.DeviceId, x => x);
return deviceTwins.Where(twin => twin.Capabilities?.IotEdge ?? false)
.Join(
devicesWithModuleActivity, x => x.DeviceId, x => x.Key,
(twin, pair) => (pair.Key, Twin: twin, ModuleTwin: pair.Value)
)
.ToDictionary(x => x.Key, x => (x.Twin, x.ModuleTwin));
}
/// <summary>
/// Get twin result by query.
/// </summary>
/// <param name="query">The query</param>
/// <param name="continuationToken">The continuationToken or null</param>
/// <param name="numberOfResults">The max result or -1</param>
/// <param name="ct">Cancellation token</param>
/// <param name="logger">Serilog logger</param>
/// <returns></returns>
private async Task<ResultWithContinuationToken<List<Twin>>> GetTwinsByQueryAsync(
string query, string continuationToken, int numberOfResults, CancellationToken ct, ILogger logger)
{
var twins = new List<Twin>();
var twinQuery = Registry.CreateQuery(query);
logger.Debug("Twin query: {Query}", query);
var options = new QueryOptions {ContinuationToken = continuationToken};
while (twinQuery.HasMoreResults && numberOfResults == -1 || twins.Count < numberOfResults)
{
ct.ThrowIfCancellationRequested();
var response = await twinQuery.GetNextAsJsonAsync(options);
options.ContinuationToken = response.ContinuationToken;
var convert = new TwinJsonConverter();
twins.AddRange(
response.Select(
x => {
using (var reader = new JsonTextReader(new StringReader(x)))
{
return reader.Read()
? convert.ReadJson(reader, typeof(Twin), null, JsonSerializer.CreateDefault())
: null;
}
}
)
.Where(x => x != null)
.Cast<Twin>()
);
}
return new ResultWithContinuationToken<List<Twin>>(
twins, twinQuery.HasMoreResults ? options.ContinuationToken : null
);
}
private class ResultWithContinuationToken<T>
{
public ResultWithContinuationToken(T queryResult, string continuationToken)
{
Result = queryResult;
ContinuationToken = continuationToken;
}
public T Result { get; }
public string ContinuationToken { get; }
}
// Helper method to combine query constraints with "AND"
private static string CombinedQuery(params string[] queries) =>
string.Join(" AND ", queries.Where(x => !string.IsNullOrEmpty(x)).ToArray());
}
public class IothubManagerConfigReader
{
public IothubManagerConfigReader()
{
_ = IothubConnectionString ?? throw new ApplicationException("IOTHUB_CONNECTION_STRING is null");
}
public string IothubConnectionString => Environment.GetEnvironmentVariable("IOTHUB_CONNECTION_STRING");
}
public class TwinActivity
{
public string DeviceId { get; set; }
public DateTime? LatestActivity { get; set; }
public string IothubHostname { get; set; }
public bool IsEdgeDevice { get; set; }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.