Skip to content

Instantly share code, notes, and snippets.

@chaoaretasty
Created May 7, 2015 13:58
Show Gist options
  • Save chaoaretasty/aecc06b6fecf17276fe8 to your computer and use it in GitHub Desktop.
Save chaoaretasty/aecc06b6fecf17276fe8 to your computer and use it in GitHub Desktop.

Temporary fix for HangfireIO/Hangfire#362.

This provides a FixedHangfireSqlStorage class and an extension method for GlobalConfiguration. The class uses FixedCountersAggregator, which contains the amended SQL command.

Because this uses reflection to get around the library's heavy use of the `internal` modifier, it comes with all the usual caveats of "works on my machine, with my version, and my setup".

using System;
using System.Collections.Generic;
using System.Data.SqlClient;
using System.Reflection;
using System.Threading;
using Dapper;
using Hangfire;
using Hangfire.Annotations;
using Hangfire.Logging;
using Hangfire.Server;
using Hangfire.SqlServer;
namespace MyApp
{
/// <summary>
/// A <see cref="SqlServerStorage"/> whose <see cref="GetComponents"/> returns an
/// expiration manager (created through reflection) followed by a
/// <see cref="FixedCountersAggregator"/>.
/// </summary>
public class FixedHangfireSqlStorage : SqlServerStorage
{
    // Full name of the type that is looked up by reflection in the assembly declaring SqlServerStorage.
    // NOTE(review): presumably reflection is needed because the type is not publicly accessible — confirm
    // against the Hangfire version in use.
    private const string ExpirationManagerTypeName = "Hangfire.SqlServer.ExpirationManager";

    private readonly SqlServerStorageOptions _options;

    /// <summary>
    /// Creates the storage and keeps a reference to <paramref name="options"/> so the
    /// component intervals can be read later in <see cref="GetComponents"/>.
    /// </summary>
    /// <param name="nameOrConnectionString">Passed through unchanged to the base constructor.</param>
    /// <param name="options">Storage options; also passed through to the base constructor.</param>
    /// <exception cref="ArgumentNullException"><paramref name="options"/> is null.</exception>
    public FixedHangfireSqlStorage(string nameOrConnectionString, SqlServerStorageOptions options)
        : base(nameOrConnectionString, options)
    {
        // Fail fast here instead of with a NullReferenceException when the components are enumerated.
        if (options == null) throw new ArgumentNullException("options");

        _options = options;
    }

    /// <summary>
    /// Lazily yields the server components for this storage: first the reflected
    /// expiration manager, then the <see cref="FixedCountersAggregator"/>.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// Thrown during enumeration when the expiration manager type or its expected
    /// constructor cannot be found, or the created object is not an <see cref="IServerComponent"/>.
    /// </exception>
    public override IEnumerable<IServerComponent> GetComponents()
    {
        yield return CreateExpirationManager();
        yield return new FixedCountersAggregator(this, _options.CountersAggregateInterval);
    }

    // Builds the expiration manager via reflection, reporting a descriptive error when the
    // expected type or constructor is not present in the loaded Hangfire.SqlServer assembly.
    private IServerComponent CreateExpirationManager()
    {
        var managerType = typeof(SqlServerStorage).Assembly.GetType(ExpirationManagerTypeName);
        if (managerType == null)
        {
            throw new InvalidOperationException(
                "Type '" + ExpirationManagerTypeName + "' was not found in the assembly that declares SqlServerStorage.");
        }

        var constructor = managerType.GetConstructor(
            BindingFlags.Public | BindingFlags.Instance,
            null,
            CallingConventions.HasThis,
            new Type[] { typeof(SqlServerStorage), typeof(TimeSpan) },
            null);
        if (constructor == null)
        {
            throw new InvalidOperationException(
                "Type '" + ExpirationManagerTypeName + "' has no public (SqlServerStorage, TimeSpan) constructor.");
        }

        var component = constructor.Invoke(new object[] { this, _options.JobExpirationCheckInterval }) as IServerComponent;
        if (component == null)
        {
            // A silent null component would only fail later, far away from the real cause.
            throw new InvalidOperationException(
                "Type '" + ExpirationManagerTypeName + "' does not implement IServerComponent.");
        }

        return component;
    }
}
/// <summary>
/// Server component that repeatedly runs the aggregation query returned by
/// <see cref="GetAggregationQuery"/>, which moves rows from [Hangfire].[Counter]
/// into [Hangfire].[AggregatedCounter].
/// </summary>
internal class FixedCountersAggregator : IServerComponent
{
    private static readonly ILog Logger = LogProvider.GetCurrentClassLogger();

    // Passed to the query as @count, i.e. the TOP limit of the DELETE in a single pass.
    private const int NumberOfRecordsInSinglePass = 1000;

    // Pause between two passes when the previous pass was a full batch.
    private static readonly TimeSpan DelayBetweenPasses = TimeSpan.FromMilliseconds(500);

    private readonly SqlServerStorage _storage;
    private readonly TimeSpan _interval;

    /// <summary>Creates the aggregator.</summary>
    /// <param name="storage">Storage used to obtain a connection for each pass.</param>
    /// <param name="interval">How long <see cref="Execute"/> waits after its last pass before returning.</param>
    /// <exception cref="ArgumentNullException"><paramref name="storage"/> is null.</exception>
    public FixedCountersAggregator(SqlServerStorage storage, TimeSpan interval)
    {
        if (storage == null) throw new ArgumentNullException("storage");

        _storage = storage;
        _interval = interval;
    }

    /// <summary>
    /// Runs aggregation passes until a pass affects fewer than
    /// <see cref="NumberOfRecordsInSinglePass"/> rows, then waits for the configured
    /// interval (or until cancellation is signalled) before returning.
    /// </summary>
    /// <param name="cancellationToken">Checked between passes and used to cut the waits short.</param>
    /// <exception cref="OperationCanceledException">Cancellation was requested between two passes.</exception>
    /// <exception cref="InvalidOperationException">
    /// The storage connection does not expose a readable SqlConnection through a "Connection" property.
    /// </exception>
    public void Execute(CancellationToken cancellationToken)
    {
        Logger.DebugFormat("Aggregating records in 'Counter' table...");

        int removedCount;
        do
        {
            using (var storageConnection = _storage.GetConnection())
            {
                var connection = GetSqlConnection(storageConnection);

                // `now` is not referenced by the query text below; it is kept so the
                // parameter object stays identical to what was sent before.
                removedCount = connection.Execute(
                    GetAggregationQuery(),
                    new { now = DateTime.UtcNow, count = NumberOfRecordsInSinglePass });
            }

            if (removedCount >= NumberOfRecordsInSinglePass)
            {
                // A full batch suggests more rows are pending: pause briefly, then go again.
                cancellationToken.WaitHandle.WaitOne(DelayBetweenPasses);
                cancellationToken.ThrowIfCancellationRequested();
            }
        } while (removedCount >= NumberOfRecordsInSinglePass);

        cancellationToken.WaitHandle.WaitOne(_interval);
    }

    public override string ToString()
    {
        return "SQL Counter Table Aggregator";
    }

    // Reads the "Connection" property of the storage connection via reflection and
    // fails with a descriptive error instead of a NullReferenceException when the
    // property is missing or does not hold a SqlConnection.
    private static SqlConnection GetSqlConnection(object storageConnection)
    {
        var connectionProperty = storageConnection.GetType().GetProperty("Connection");
        if (connectionProperty == null)
        {
            throw new InvalidOperationException(
                "Type '" + storageConnection.GetType().FullName + "' has no public 'Connection' property.");
        }

        var connection = connectionProperty.GetValue(storageConnection) as SqlConnection;
        if (connection == null)
        {
            throw new InvalidOperationException(
                "The 'Connection' property of '" + storageConnection.GetType().FullName + "' did not return a SqlConnection.");
        }

        return connection;
    }

    // Returns the T-SQL batch executed on every pass: rows deleted from [Hangfire].[Counter]
    // (at most @count, with the READPAST hint) are captured in a table variable, grouped by
    // key and merged into [Hangfire].[AggregatedCounter] inside a single transaction.
    private static string GetAggregationQuery()
    {
        return @"
DECLARE @RecordsToAggregate TABLE
(
[Key] NVARCHAR(100) NOT NULL,
[Value] SMALLINT NOT NULL,
[ExpireAt] DATETIME NULL
)
SET TRANSACTION ISOLATION LEVEL READ COMMITTED
BEGIN TRAN
DELETE TOP (@count) [Hangfire].[Counter] with (readpast)
OUTPUT DELETED.[Key], DELETED.[Value], DELETED.[ExpireAt] INTO @RecordsToAggregate
SET NOCOUNT ON;
MERGE [Hangfire].[AggregatedCounter] AS [Target]
USING (
SELECT [Key], SUM([Value]) as [Value], MAX([ExpireAt]) AS [ExpireAt] FROM @RecordsToAggregate
GROUP BY [Key]) AS [Source] ([Key], [Value], [ExpireAt])
ON [Target].[Key] = [Source].[Key]
WHEN MATCHED THEN UPDATE SET
[Target].[Value] = [Target].[Value] + [Source].[Value],
[Target].[ExpireAt] = (SELECT MAX([ExpireAt]) FROM (VALUES ([Source].ExpireAt), ([Target].[ExpireAt])) AS MaxExpireAt([ExpireAt]))
WHEN NOT MATCHED THEN INSERT ([Key], [Value], [ExpireAt]) VALUES ([Source].[Key], [Source].[Value], [Source].[ExpireAt]);
COMMIT TRAN";
    }
}
/// <summary>
/// Configuration helpers for registering <see cref="FixedHangfireSqlStorage"/>.
/// </summary>
public static class FixedSqlServerStorageExtensions
{
    /// <summary>
    /// Creates a <see cref="FixedHangfireSqlStorage"/> from the given arguments and
    /// registers it through <c>configuration.UseStorage</c>.
    /// </summary>
    /// <param name="configuration">The configuration to register the storage with.</param>
    /// <param name="nameOrConnectionString">Forwarded to the storage constructor.</param>
    /// <param name="options">Forwarded to the storage constructor.</param>
    /// <returns>The value returned by <c>UseStorage</c>.</returns>
    /// <exception cref="ArgumentNullException">Any of the three arguments is null.</exception>
    public static IGlobalConfiguration<SqlServerStorage> UseFixedSqlServerStorage(
        [NotNull] this IGlobalConfiguration configuration,
        [NotNull] string nameOrConnectionString,
        [NotNull] SqlServerStorageOptions options)
    {
        // Arguments are validated in declaration order, so the first null one is reported.
        if (configuration == null)
        {
            throw new ArgumentNullException("configuration");
        }

        if (nameOrConnectionString == null)
        {
            throw new ArgumentNullException("nameOrConnectionString");
        }

        if (options == null)
        {
            throw new ArgumentNullException("options");
        }

        return configuration.UseStorage(new FixedHangfireSqlStorage(nameOrConnectionString, options));
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment