Skip to content

Instantly share code, notes, and snippets.

@VictorKoenders
Last active November 17, 2023 07:13
Show Gist options
  • Save VictorKoenders/51e8ee6e67fdace99be37c1befdce1da to your computer and use it in GitHub Desktop.
Save VictorKoenders/51e8ee6e67fdace99be37c1befdce1da to your computer and use it in GitHub Desktop.
C# EF timescale
public class DataContext : DbContextBase<DataContext>
{
protected override string Schema => "Foo";
public DbSet<TimescaleEntry> TimescaleEntries { get; set; }
public DataContext(DbContextOptions<DataContext> options) : base(options)
{
}
}
public class TimescaleEntry : TimescaleEntity
{
[TimescalePartition]
public Guid ExternalId { get; set; }
public decimal Value { get; set; }
}
public abstract class DbContextBase<TDbContext> : DbContext
where TDbContext : DbContext
{
public DbContextBase(DbContextOptions<TDbContext> options) : base(options)
{
// Do not track retrieved entities by default
ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking;
}
protected abstract string Schema { get; }
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
modelBuilder.HasDefaultSchema(Schema);
base.OnModelCreating(modelBuilder);
}
public void UpdateHypertables()
{
Database.ExecuteSqlRaw("CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE;");
foreach (var type in Model.GetEntityTypes())
{
if (typeof(TimescaleEntity).IsAssignableFrom(type.ClrType))
{
var partition = type.ClrType
.GetProperties()
.Select(Property => new { Property, Attribute = Property.GetCustomAttribute<TimescalePartitionAttribute>() })
.FirstOrDefault(p => p.Attribute != null)
?? throw new Exception("Type " + type.ClrType.FullName + " is missing a field with a `[TimescalePartition]` attribute");
Debug.Assert(partition.Attribute != null);
var column = type.GetProperty(partition.Property.Name);
// Reference: https://docs.timescale.com/api/latest/hypertable/create_hypertable/
var query = $"SELECT * FROM create_hypertable('\"{Schema}\".\"{type.GetTableName()}\"', " +
$"'{nameof(TimescaleEntity.Timestamp)}', " +
$"partitioning_column => '{column.GetColumnName()}', " +
$"number_partitions => {partition.Attribute.NumberPartitions}, " +
$"chunk_time_interval => {partition.Attribute.ChunkTimeInterval}, " +
$"if_not_exists => {partition.Attribute.IfNotExists.ToString().ToUpper()});";
Database.ExecuteSqlRaw(query);
}
}
}
public async Task InsertTimescale<T>(
IEnumerable<T> entries,
int? batchSize = null
)
{
var model = Model.FindEntityType(typeof(T)) ?? throw new Exception("Type " + typeof(T).FullName + " not registered on the DataContext"); ;
NpgsqlConnection conn = (NpgsqlConnection)Database.GetDbConnection();
if (conn.State != System.Data.ConnectionState.Open)
await conn.OpenAsync();
var dbBatch = conn.CreateBatch();
var sql = INSERT_COMMANDS.GetOrAdd(typeof(T), _t => GenerateInsertQuery(model));
foreach (var entity in entries)
{
var dbCommand = new NpgsqlBatchCommand(sql);
foreach (var prop in model.GetProperties())
{
dbCommand.Parameters.Add(new NpgsqlParameter("@" + prop.GetColumnName(), prop.PropertyInfo!.GetValue(entity)));
}
dbBatch.BatchCommands.Add(dbCommand);
if (batchSize != null)
{
if (dbBatch.BatchCommands.Count >= batchSize)
{
await dbBatch.ExecuteNonQueryAsync();
dbBatch.BatchCommands.Clear();
}
}
}
if (dbBatch.BatchCommands.Count > 0)
{
await dbBatch.ExecuteNonQueryAsync();
}
static string GenerateInsertQuery(Microsoft.EntityFrameworkCore.Metadata.IEntityType model)
{
var sql = "INSERT INTO \"" + SCHEMA + "\".\"" + model.GetTableName() + "\" (";
var values = ") VALUES (";
var first = true;
foreach (var prop in model.GetProperties())
{
if (first) first = false;
else
{
sql += ", ";
values += ", ";
}
sql += "\"" + prop.GetColumnName() + "\"";
values += "@" + prop.GetColumnName();
}
sql += values + ")";
return sql;
}
}
}
/// <summary>
/// Timescale entity. Extend from this to use timescale data.
///
/// Note: this will create an entity without a key, so you can't use normal EF with this.
/// Note: You need to mark exactly 1 property with <see cref="TimescalePartitionAttribute"/>.
/// Note: DbContextBase has a method <see cref="DbContextBase{TDbContext}.InsertTimescale{T}(IEnumerable{T})"/> for inserting timescale entries.
/// </summary>
[Keyless]
public abstract class TimescaleEntity
{
public DateTimeOffset Timestamp { get; set; }
}
[AttributeUsage(AttributeTargets.Property)]
public class TimescalePartitionAttribute : Attribute
{
public TimescalePartitionAttribute(
int numberPartitions = 4,
string chunkTimeInterval = "INTERVAL '1 day'",
bool ifNotExists = true
)
{
NumberPartitions = numberPartitions;
ChunkTimeInterval = chunkTimeInterval;
IfNotExists = ifNotExists;
}
public int NumberPartitions { get; }
public string ChunkTimeInterval { get; }
public bool IfNotExists { get; }
}
var dbContext = scope.ServiceProvider.GetRequiredService<DataContext>();
await dbContext.Database.MigrateAsync();
dbContext.UpdateHypertables();
await dbContext.InsertTimescale(new List<TimescaleEntry> { ... });
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment