Last active
January 28, 2020 13:20
-
-
Save cybermaxs/75c5f1fb2bb4c492bf3e to your computer and use it in GitHub Desktop.
Adventures in storing Time Series with Microsoft Azure Table Storage. https://cybermaxs.wordpress.com/2015/04/29/store-time-series-in-microsoft-azure-table-storage/
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Microsoft.WindowsAzure.Storage; | |
using Microsoft.WindowsAzure.Storage.Table; | |
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using Xunit; | |
namespace Syntheticks.Tests.Designs | |
{ | |
public class TimeSeries_AdvancedDesign | |
{ | |
/// <summary>Name of the table shared by the tests in this class.</summary>
private const string TableName = "timeseriesadvanceddesign";

/// <summary>Partition key granularity, in seconds (one hour).</summary>
private const int PartitionFactor = 60 * 60;

/// <summary>Row key granularity, in seconds (four minutes).</summary>
private const int RowFactor = 4 * 60;

/// <summary>Source of the random values stored as data points.</summary>
private readonly Random generator = new Random();
/// <summary>
/// Recreates the table and writes one random data point per second for the whole of
/// 2015-01-01 (86,400 points). Points are packed as "t_&lt;unix seconds&gt;" properties
/// into one entity per <c>RowFactor</c> window; entities are keyed by the
/// <c>PartitionFactor</c> window they fall into.
/// </summary>
[Fact]
public void AddPoints()
{
    const int SecondsPerDay = 3600 * 24;
    // Batches are chunked to this size before being sent (entity group transaction limit).
    const int MaxBatchOperations = 100;

    var table = GetCloudTable(TableName, true);
    var current = new DateTime(2015, 1, 1, 0, 0, 0);

    // "entities" keeps insertion order for the grouping below; "entitiesByKey" makes finding the
    // entity for the current second a hash lookup instead of a scan over every entity so far.
    var entities = new List<DynamicTableEntity>();
    var entitiesByKey = new Dictionary<Tuple<string, string>, DynamicTableEntity>();

    for (var second = 0; second < SecondsPerDay; second++)
    {
        var partitionKey = ToRoundedSecondsTimestamp(current, PartitionFactor).ToString();
        var rowKey = ToRoundedSecondsTimestamp(current, RowFactor).ToString();
        var key = Tuple.Create(partitionKey, rowKey);

        DynamicTableEntity entity;
        if (!entitiesByKey.TryGetValue(key, out entity))
        {
            entity = new DynamicTableEntity();
            entity.PartitionKey = partitionKey;
            entity.RowKey = rowKey;
            entitiesByKey.Add(key, entity);
            entities.Add(entity);
        }

        entity.Properties.Add("t_" + ToSecondsTimestamp(current), new EntityProperty(generator.Next(10000)));
        current = current.AddSeconds(1);
    }

    // One batch per partition key, split into chunks so a smaller RowFactor cannot
    // push a single batch over the operation limit.
    foreach (var partition in entities.GroupBy(e => e.PartitionKey))
    {
        foreach (var chunk in InSetsOf(partition, MaxBatchOperations))
        {
            var batch = new TableBatchOperation();
            foreach (var entity in chunk)
            {
                batch.Add(TableOperation.InsertOrMerge(entity));
            }

            // Synchronous call, matching the other table calls in this class.
            var results = table.ExecuteBatch(batch);

            // The original lambda assigned 200 to HttpStatusCode instead of checking it.
            Assert.All(results, tr => Assert.InRange(tr.HttpStatusCode, 200, 299));
        }
    }
}
/// <summary>
/// Reads back every data point of the first two hours of 2015 by enumerating the row keys
/// that cover the range and querying them partition by partition.
/// </summary>
/// <remarks>
/// The table is opened without being dropped and nothing is written here, so the final
/// assertion only holds if the data written by <c>AddPoints</c> is already in the table.
/// </remarks>
[Fact]
public void RunQuery()
{
    const long ExpectedPoints = 7200; // two hours, one point per second

    var table = GetCloudTable(TableName, false);

    // time range : first two hours of 2015 (upper bound is the last second of the range)
    var fromTimestamp = ToRoundedSecondsTimestamp(new DateTime(2015, 1, 1, 0, 0, 0), RowFactor);
    var toTimestamp = ToRoundedSecondsTimestamp(new DateTime(2015, 1, 1, 2, 0, 0).AddSeconds(-1), RowFactor);

    // generate all row keys in the time range
    var rowCount = (toTimestamp - fromTimestamp) / RowFactor + 1;
    var rowKeys = new long[rowCount];
    // long index: keeps the multiplication below in 64-bit arithmetic
    for (long i = 0; i < rowCount; i++)
    {
        rowKeys[i] = fromTimestamp + i * RowFactor;
    }

    // group row keys by the partition they belong to
    var rowKeysByPartition = rowKeys.GroupBy(r => ToRoundedTimestamp(r, PartitionFactor));
    var partitionFilters = new List<string>();
    foreach (var partition in rowKeysByPartition)
    {
        // PartitionKey = X and (RowKey=Y or RowKey=Y+1 or RowKey=Y+2 ...)
        string partitionFilter = TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, partition.Key.ToString());
        string rowsFilter = string.Join(" " + TableOperators.Or + " ", partition.Select(r => TableQuery.GenerateFilterCondition("RowKey", QueryComparisons.Equal, r.ToString())));
        string combinedFilter = TableQuery.CombineFilters(partitionFilter, TableOperators.And, rowsFilter);
        partitionFilters.Add(combinedFilter);
    }

    // combine all partition filters into one query
    // NOTE(review): the resulting filter contains one comparison per row key; confirm this stays
    // within the number of comparisons the Table service accepts in a single filter.
    string final = string.Join(" " + TableOperators.Or + " ", partitionFilters);
    var query = new TableQuery<DynamicTableEntity>().Where(final);
    var entities = table.ExecuteQuery(query);

    // basic check: count only the data point properties, which AddPoints names "t_<timestamp>"
    long pointCount = 0;
    foreach (var entity in entities)
    {
        pointCount += entity.Properties.Count(p => p.Key.StartsWith("t_", StringComparison.Ordinal));
    }

    Assert.Equal(ExpectedPoints, pointCount);
}
#region Private Helpers | |
/// <summary>
/// Gets a reference to a table in the development storage account, creating it if needed.
/// </summary>
/// <param name="tableName">Name of the table.</param>
/// <param name="drop">When true, the table is deleted (if present) before being created.</param>
/// <returns>A reference to the table.</returns>
private static CloudTable GetCloudTable(string tableName, bool drop)
{
    var client = CloudStorageAccount.DevelopmentStorageAccount.CreateCloudTableClient();
    CloudTable cloudTable = client.GetTableReference(tableName);

    if (drop)
    {
        cloudTable.DeleteIfExists();
    }

    cloudTable.CreateIfNotExists();
    return cloudTable;
}
/// <summary>Value of <see cref="DateTime.Ticks"/> for 1/1/1970 00:00:00.</summary>
public const long EpochTicks = 621355968000000000;

/// <summary>Number of ticks in one second.</summary>
public const long TicksPeriod = 10000000;

/// <summary>
/// Converts a date to the number of whole seconds elapsed since 1/1/1970.
/// </summary>
/// <param name="date">DateTime to convert; its ticks are used as-is.</param>
/// <returns>Seconds since 1/1/1970; the sub-second part is dropped by the integer division.</returns>
public static long ToSecondsTimestamp(DateTime date)
{
    long ticksSinceEpoch = date.Ticks - EpochTicks;
    return ticksSinceEpoch / TicksPeriod;
}
/// <summary>
/// Converts a date to a timestamp in seconds, rounded to a multiple of <paramref name="factor"/>.
/// </summary>
/// <param name="date">DateTime to convert.</param>
/// <param name="factor">Round factor in seconds.</param>
/// <returns>Rounded timestamp in seconds.</returns>
public static long ToRoundedSecondsTimestamp(DateTime date, long factor)
{
    // Delegate the rounding so dates and raw timestamps always land in the same bucket.
    return ToRoundedTimestamp(ToSecondsTimestamp(date), factor);
}
/// <summary>
/// Rounds a timestamp in seconds down to the nearest multiple of <paramref name="factor"/>.
/// </summary>
/// <param name="timestamp">Timestamp in seconds.</param>
/// <param name="factor">Round factor in seconds; must be greater than zero.</param>
/// <returns>The largest multiple of <paramref name="factor"/> that is not greater than <paramref name="timestamp"/>.</returns>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="factor"/> is zero or negative.</exception>
public static long ToRoundedTimestamp(Int64 timestamp, long factor)
{
    if (factor <= 0)
    {
        throw new ArgumentOutOfRangeException("factor", factor, "The round factor must be greater than zero.");
    }

    // C# integer division truncates toward zero, which would put negative timestamps
    // in the bucket after them; normalise the remainder so the result is always rounded down.
    long remainder = timestamp % factor;
    if (remainder < 0)
    {
        remainder += factor;
    }

    return timestamp - remainder;
}
/// <summary>
/// Lazily splits a sequence into consecutive lists of <paramref name="max"/> items;
/// the last list holds whatever is left over.
/// </summary>
/// <param name="source">Sequence to split.</param>
/// <param name="max">Number of items per list.</param>
/// <returns>The lists, in source order; nothing is yielded for an empty source.</returns>
public static IEnumerable<List<T>> InSetsOf<T>(IEnumerable<T> source, int max)
{
    var chunk = new List<T>(max);
    foreach (T element in source)
    {
        chunk.Add(element);
        if (chunk.Count != max)
        {
            continue;
        }

        // chunk is full: hand it out and start a fresh one
        yield return chunk;
        chunk = new List<T>(max);
    }

    // leftover items that did not fill a whole chunk
    if (chunk.Count > 0)
    {
        yield return chunk;
    }
}
#endregion | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment