Skip to content

Instantly share code, notes, and snippets.

@cybermaxs
Last active January 28, 2020 13:20
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cybermaxs/75c5f1fb2bb4c492bf3e to your computer and use it in GitHub Desktop.
Save cybermaxs/75c5f1fb2bb4c492bf3e to your computer and use it in GitHub Desktop.
Adventures in storing Time Series with Microsoft Azure Table Storage.https://cybermaxs.wordpress.com/2015/04/29/store-time-series-in-microsoft-azure-table-storage/
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Table;
using System;
using System.Collections.Generic;
using System.Linq;
using Xunit;
namespace Syntheticks.Tests.Designs
{
public class TimeSeries_AdvancedDesign
{
const string TableName = "timeseriesadvanceddesign";
const int PartitionFactor = 60 * 60; // one hour
const int RowFactor = 4 * 60; // 4 minutes
readonly Random generator = new Random();
[Fact]
public void AddPoints()
{
var table = GetCloudTable(TableName, true);
var now = new DateTime(2015, 1, 1, 0, 0, 0);
var current = now;
var dyns = new List<DynamicTableEntity>();
foreach (var i in Enumerable.Range(0, 3600 * 24))
{
var partitionKey = ToRoundedSecondsTimestamp(current, PartitionFactor).ToString();
var rowLey = ToRoundedSecondsTimestamp(current, RowFactor).ToString();
var dyn = dyns.FirstOrDefault(d => d.RowKey == rowLey && d.PartitionKey == partitionKey);
if (dyn == null)
{
dyn = new DynamicTableEntity();
dyn.PartitionKey = partitionKey;
dyn.RowKey = rowLey;
dyns.Add(dyn);
}
dyn.Properties.Add("t_" + ToSecondsTimestamp(current), new EntityProperty(generator.Next(10000)));
current = current.AddSeconds(1);
}
var groups = dyns.GroupBy(d => d.PartitionKey);
foreach (var g in groups)
{
var batch = new TableBatchOperation();
foreach (var d in g)
batch.Add(TableOperation.InsertOrMerge(d));
var results = table.ExecuteBatchAsync(batch).Result;
Assert.All(results, tr => tr.HttpStatusCode = 200);
}
//now query
}
[Fact]
public void RunQuery()
{
var table = GetCloudTable(TableName, false);
// time range : first two hours of 2015
var from_ts = ToRoundedSecondsTimestamp(new DateTime(2015, 1, 1, 0, 0, 0), RowFactor);
var to_ts = ToRoundedSecondsTimestamp(new DateTime(2015, 1, 1, 2, 0, 0).AddSeconds(-1), RowFactor);
// generate all row keys in the time range
var nbRows = (to_ts - from_ts) / RowFactor + 1;
var rowKeys = new long[nbRows];
for (var i = 0; i < nbRows; i++)
{
rowKeys[i] = from_ts + i * RowFactor;
}
// group row keys by partition
var partitionKeys = rowKeys.GroupBy(r => ToRoundedTimestamp(r, PartitionFactor));
var partitionFilters = new List<string>();
foreach (var part in partitionKeys)
{
// PartitionKey = X and (RowKey=Y or RowKey=Y+1 or RowKey=Y+2 ...)
string partitionFilter = TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, part.Key.ToString());
string rowsFilter = string.Join(" " + TableOperators.Or + " ", part.Select(r => TableQuery.GenerateFilterCondition("RowKey", QueryComparisons.Equal, r.ToString())));
string combinedFilter = TableQuery.CombineFilters(partitionFilter, TableOperators.And, rowsFilter);
partitionFilters.Add(combinedFilter);
}
// combine all filters
string final = string.Join(" " + TableOperators.Or + " ", partitionFilters);
var query = new TableQuery<DynamicTableEntity>().Where(final);
var res = table.ExecuteQuery(query);
//do something with results...
//basic check
long nbpoints = 0;
foreach (var r in res)
{
nbpoints += r.Properties.Where(p => p.Key.Contains("t")).Count();
}
Assert.Equal(7200, nbpoints);
}
#region Private Helpers
private static CloudTable GetCloudTable(string tableName, bool drop)
{
CloudStorageAccount storageAccount = CloudStorageAccount.DevelopmentStorageAccount;
var tableClient = storageAccount.CreateCloudTableClient();
var table = tableClient.GetTableReference(tableName);
if (drop)
table.DeleteIfExists();
table.CreateIfNotExists();
return table;
}
public const long EpochTicks = 621355968000000000;
public const long TicksPeriod = 10000000;
/// <summary>
/// Number of seconds since epoch(1/1/1970).
/// </summary>
/// <param name="date">DateTime to convert</param>
/// <returns>Number of seconds since 1/1/1970 (Unix timestamp)</returns>
public static long ToSecondsTimestamp(DateTime date)
{
long ts = (date.Ticks - EpochTicks) / TicksPeriod;
return ts;
}
/// <summary>
/// Round a timestamp in seconds.
/// </summary>
/// <param name="date">DateTime to convert</param>
/// <param name="factor">Round factor in seconds.</param>
/// <returns>Rounded Timestamp in seconds.</returns>
public static long ToRoundedSecondsTimestamp(DateTime date, long factor)
{
return ((long)ToSecondsTimestamp(date) / factor) * factor;
}
/// <summary>
/// Round a timestamp in seconds to the desired precision.
/// </summary>
/// <param name="date"></param>
/// <param name="precision"></param>
/// <returns></returns>
public static long ToRoundedTimestamp(Int64 timestamp, long factor)
{
return ((long)timestamp / factor) * factor;
}
public static IEnumerable<List<T>> InSetsOf<T>(IEnumerable<T> source, int max)
{
List<T> toReturn = new List<T>(max);
foreach (var item in source)
{
toReturn.Add(item);
if (toReturn.Count == max)
{
yield return toReturn;
toReturn = new List<T>(max);
}
}
if (toReturn.Any())
{
yield return toReturn;
}
}
#endregion
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment