Created
July 13, 2019 09:18
-
-
Save dsschneidermann/a9aaa560b8e77d49d17d6cc8f45564dd to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.IO; | |
using System.Linq; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using Microsoft.Azure.Devices; | |
using Microsoft.Azure.Devices.Shared; | |
using Newtonsoft.Json; | |
namespace iothubapi.Iothub | |
{ | |
public class IothubManager | |
{ | |
// Projections: select only the twin properties this class needs.
private const string QUERY_PREFIX =
    "SELECT deviceId, capabilities, status, lastActivityTime FROM devices";

private const string MODULE_QUERY_PREFIX =
    "SELECT deviceId, moduleId, lastActivityTime FROM devices.modules";

// Default conditions that are always applied to limit the result.
private const string DEVICE_ENABLED_QUERY = "status = 'enabled'";

// NOTE(review): 0001-01-01 is presumably the "never active" sentinel - confirm.
private const string MODULE_ACTIVE_QUERY = "lastActivityTime > '0001-01-01T00:00:00Z'";
/// <summary>
/// Creates a manager from the connection string exposed by <paramref name="configReader" />.
/// </summary>
/// <param name="configReader">Source of the IoT hub connection string.</param>
public IothubManager(IothubManagerConfigReader configReader)
{
    // Parse once so the host name can be read from the same builder
    // that produces the connection string for the registry client.
    var connectionString = configReader.IothubConnectionString;
    var parsed = IotHubConnectionStringBuilder.Create(connectionString);

    Registry = RegistryManager.CreateFromConnectionString(parsed.ToString());
    IothubHostname = parsed.HostName;
}
/// <summary>
/// Creates a manager around an already constructed registry client.
/// </summary>
/// <param name="registry">Registry client used for all twin queries.</param>
/// <param name="iothubHostname">Host name of the IoT hub the registry talks to.</param>
public IothubManager(RegistryManager registry, string iothubHostname)
    => (Registry, IothubHostname) = (registry, iothubHostname);
/// <summary>Registry client used to run twin queries against the IoT hub.</summary>
public RegistryManager Registry { get; }

/// <summary>Host name of the IoT hub, as passed to or parsed by the constructor.</summary>
public string IothubHostname { get; }
/// <summary>
/// Reports the latest activity time for every enabled device returned by the device query.
/// </summary>
/// <remarks>
/// The device and module queries run in parallel. A device is reported as an edge device
/// only when it has the IoT Edge capability and at least one module matched the module
/// query; its activity time is then taken from its most recently active module. For every
/// other device the device twin's own last activity time is used.
/// </remarks>
/// <param name="deviceQuery">Optional condition combined with the "enabled devices" filter.</param>
/// <param name="moduleQuery">Optional condition combined with the "active modules" filter.</param>
/// <param name="cancellationToken">Cancellation token passed on to both twin queries.</param>
/// <param name="log">Logger to use; the global <c>Log.Logger</c> is used when null.</param>
/// <returns>One <see cref="TwinActivity" /> per device twin returned by the device query.</returns>
public async Task<List<TwinActivity>> GetDeviceActivityAsync(
    string deviceQuery = null, string moduleQuery = null, CancellationToken cancellationToken = default,
    ILogger log = null)
{
    var logger = (log ?? Log.Logger).ForContext<IothubManager>();

    var queryEnabledOnly = CombinedQuery(deviceQuery, DEVICE_ENABLED_QUERY);
    var dQuery = $"{QUERY_PREFIX} WHERE {queryEnabledOnly}";

    var queryActiveModulesOnly = CombinedQuery(moduleQuery, MODULE_ACTIVE_QUERY);
    var mQuery = $"{MODULE_QUERY_PREFIX} WHERE {queryActiveModulesOnly}";

    // Start both queries before awaiting either one so they execute in parallel.
    var devicesTask = GetTwinsByQueryAsync(dQuery, null, -1, cancellationToken, logger);
    var modulesTask = GetTwinsByQueryAsync(mQuery, null, -1, cancellationToken, logger);
    await Task.WhenAll(devicesTask, modulesTask);

    var devices = (await devicesTask).Result;
    var modules = (await modulesTask).Result;
    var edgeDevices = GetEdgeDevices(devices, modules);

    return devices.Select(
            twin =>
            {
                var foundEdgeModule = edgeDevices.TryGetValue(twin.DeviceId, out var edgeDevice);
                return new TwinActivity
                {
                    DeviceId = twin.DeviceId,
                    // Previously left null even though the manager knows which hub it queried.
                    IothubHostname = IothubHostname,
                    IsEdgeDevice = foundEdgeModule,
                    LatestActivity = foundEdgeModule
                        ? edgeDevice.LastActiveModule.LastActivityTime
                        : twin.LastActivityTime
                };
            }
        )
        .ToList();
}
/// <summary>
/// From the given device and module twins, picks the devices that declare the IoT Edge
/// capability and have at least one module twin, keyed by device id. Each entry holds the
/// device twin and that device's module with the latest <c>LastActivityTime</c>.
/// </summary>
private static Dictionary<string, (Twin Twin, Twin LastActiveModule)>
    GetEdgeDevices(IEnumerable<Twin> deviceTwins, IEnumerable<Twin> moduleTwins)
{
    // Per device id: the module twin with the most recent activity.
    var latestModuleByDevice = moduleTwins
        .GroupBy(module => module.DeviceId)
        .Select(modulesOfDevice => modulesOfDevice.OrderByDescending(m => m.LastActivityTime).First())
        .ToDictionary(module => module.DeviceId, module => module);

    // Inner join: edge-capable devices without any module twin are dropped.
    var edgePairs =
        from device in deviceTwins
        where device.Capabilities?.IotEdge ?? false
        join entry in latestModuleByDevice on device.DeviceId equals entry.Key
        select (entry.Key, Device: device, Module: entry.Value);

    return edgePairs.ToDictionary(
        pair => pair.Key,
        pair => (Twin: pair.Device, LastActiveModule: pair.Module));
}
/// <summary>
/// Runs a twin query against the registry and collects the returned twins page by page.
/// </summary>
/// <param name="query">The query</param>
/// <param name="continuationToken">The continuationToken or null</param>
/// <param name="numberOfResults">
/// The max result or -1 for no limit. Whole pages are appended, so the returned list may
/// hold more than this many twins.
/// </param>
/// <param name="ct">Cancellation token, checked before each page is requested</param>
/// <param name="logger">Serilog logger</param>
/// <returns>
/// The collected twins, plus the continuation token of the last page when the query still
/// has more results (null otherwise).
/// </returns>
private async Task<ResultWithContinuationToken<List<Twin>>> GetTwinsByQueryAsync(
    string query, string continuationToken, int numberOfResults, CancellationToken ct, ILogger logger)
{
    var twins = new List<Twin>();
    var twinQuery = Registry.CreateQuery(query);
    logger.Debug("Twin query: {Query}", query);
    var options = new QueryOptions { ContinuationToken = continuationToken };

    // Loop-invariant helpers used to turn each JSON document into a Twin.
    var convert = new TwinJsonConverter();
    var serializer = JsonSerializer.CreateDefault();

    // The parentheses matter: without them "a && b || c" parses as "(a && b) || c", which
    // kept requesting pages after the query was exhausted whenever a positive limit had
    // not been reached yet.
    while (twinQuery.HasMoreResults && (numberOfResults == -1 || twins.Count < numberOfResults))
    {
        ct.ThrowIfCancellationRequested();

        var response = await twinQuery.GetNextAsJsonAsync(options);
        options.ContinuationToken = response.ContinuationToken;

        foreach (var json in response)
        {
            using (var reader = new JsonTextReader(new StringReader(json)))
            {
                // Nothing to read from this document: skip it.
                if (!reader.Read())
                {
                    continue;
                }

                var twin = (Twin) convert.ReadJson(reader, typeof(Twin), null, serializer);
                if (twin != null)
                {
                    twins.Add(twin);
                }
            }
        }
    }

    return new ResultWithContinuationToken<List<Twin>>(
        twins, twinQuery.HasMoreResults ? options.ContinuationToken : null
    );
}
/// <summary>
/// Pairs a query result with the continuation token that was stored alongside it.
/// </summary>
/// <typeparam name="T">Type of the query result.</typeparam>
private class ResultWithContinuationToken<T>
{
    /// <summary>The query result.</summary>
    public T Result { get; }

    /// <summary>The continuation token given at construction; may be null.</summary>
    public string ContinuationToken { get; }

    public ResultWithContinuationToken(T queryResult, string continuationToken)
        => (Result, ContinuationToken) = (queryResult, continuationToken);
}
// Helper method to combine query constraints with "AND".
// Null, empty and whitespace-only constraints are ignored. When two or more constraints
// remain, each one is wrapped in parentheses so that an "OR" inside one constraint cannot
// escape the others (e.g. "a OR b AND status = 'enabled'" would let rows matching only "a"
// bypass the status filter). A single remaining constraint is returned unchanged.
private static string CombinedQuery(params string[] queries)
{
    var constraints = queries.Where(x => !string.IsNullOrWhiteSpace(x)).ToList();

    if (constraints.Count == 0)
    {
        return string.Empty;
    }

    if (constraints.Count == 1)
    {
        return constraints[0];
    }

    return string.Join(" AND ", constraints.Select(x => $"({x})"));
}
} | |
/// <summary>
/// Reads the IoT hub connection string from the IOTHUB_CONNECTION_STRING environment variable.
/// </summary>
public class IothubManagerConfigReader
{
    /// <summary>
    /// Verifies at construction time that the environment variable is set.
    /// </summary>
    /// <exception cref="ApplicationException">The environment variable is not set.</exception>
    public IothubManagerConfigReader()
    {
        if (IothubConnectionString == null)
        {
            throw new ApplicationException("IOTHUB_CONNECTION_STRING is null");
        }
    }

    /// <summary>
    /// The connection string; the environment variable is read again on every access.
    /// </summary>
    public string IothubConnectionString
    {
        get { return Environment.GetEnvironmentVariable("IOTHUB_CONNECTION_STRING"); }
    }
}
/// <summary>
/// Activity summary for a single device.
/// </summary>
public class TwinActivity
{
    /// <summary>Id of the device this entry describes.</summary>
    public string DeviceId { get; set; }

    /// <summary>Latest known activity time for the device, or null when there is none.</summary>
    public DateTime? LatestActivity { get; set; }

    // NOTE(review): presumably the host name of the hub the device belongs to - confirm with callers.
    /// <summary>Host name of the IoT hub.</summary>
    public string IothubHostname { get; set; }

    /// <summary>Whether the device was identified as an IoT Edge device.</summary>
    public bool IsEdgeDevice { get; set; }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment