Skip to content

Instantly share code, notes, and snippets.

@sbosell
Created December 22, 2016 15:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save sbosell/3831f5bb893b20e82c72467baf8aefea to your computer and use it in GitHub Desktop.
Save sbosell/3831f5bb893b20e82c72467baf8aefea to your computer and use it in GitHub Desktop.
Hangfire DisableConcurrentExecutionWithParameters Test
using Hangfire;
using Hangfire.Client;
using Hangfire.Common;
using Hangfire.Server;
using Hangfire.Storage;
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Security.Cryptography;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace HangfireTest
{
/// <summary>
/// Job filter that marks the creation of a job as canceled while a "fingerprint" for the same
/// job type, method and arguments is still present in storage.
/// </summary>
/// <remarks>
/// The fingerprint is a hash entry written when a job is being created and removed after the
/// job has been performed. A fingerprint older than <see cref="FingerprintTimeout"/> is
/// treated as stale and overwritten, so a fingerprint that was never removed does not block
/// new jobs forever.
/// </remarks>
public class DisableMultipleQueuedItemsFilter : JobFilterAttribute, IClientFilter, IServerFilter
{
    /// <summary>Maximum time to wait for the distributed lock guarding a fingerprint.</summary>
    private static readonly TimeSpan LockTimeout = TimeSpan.FromSeconds(5);

    /// <summary>Age after which a stored fingerprint is no longer considered valid.</summary>
    private static readonly TimeSpan FingerprintTimeout = TimeSpan.FromHours(1);

    /// <summary>Name of the hash field that stores the fingerprint creation time.</summary>
    private const string TimestampField = "Timestamp";

    /// <summary>
    /// Registers a fingerprint for the job being created; if a valid one already exists,
    /// flags the creating context as canceled.
    /// </summary>
    /// <param name="filterContext">Context of the job that is about to be created.</param>
    public void OnCreating(CreatingContext filterContext)
    {
        if (!AddFingerprintIfNotExists(filterContext.Connection, filterContext.Job))
        {
            filterContext.Canceled = true;
        }
    }

    /// <summary>
    /// Removes the job's fingerprint once the job has been performed.
    /// </summary>
    /// <param name="filterContext">Context of the job that has been performed.</param>
    public void OnPerformed(PerformedContext filterContext)
    {
        RemoveFingerprint(filterContext.Connection, filterContext.Job);
    }

    /// <summary>
    /// Writes a fresh fingerprint for <paramref name="job"/> unless a non-expired one exists.
    /// The check and the write happen under a distributed lock on the fingerprint.
    /// </summary>
    /// <returns>
    /// <c>true</c> when a new fingerprint was written; <c>false</c> when a valid fingerprint
    /// was already present.
    /// </returns>
    private static bool AddFingerprintIfNotExists(IStorageConnection connection, Job job)
    {
        // Compute the key once so the read and the write are guaranteed to target the same hash.
        var fingerprintKey = GetFingerprintKey(job);

        using (connection.AcquireDistributedLock(GetFingerprintLockKey(job), LockTimeout))
        {
            var fingerprint = connection.GetAllEntriesFromHash(fingerprintKey);

            string storedTimestamp;
            DateTimeOffset timestamp;
            if (fingerprint != null &&
                fingerprint.TryGetValue(TimestampField, out storedTimestamp) &&
                // The value is written in the round-trip ("o") format, which is culture
                // independent; parse it the same way instead of with the current culture.
                DateTimeOffset.TryParse(storedTimestamp, CultureInfo.InvariantCulture, DateTimeStyles.RoundtripKind, out timestamp) &&
                DateTimeOffset.UtcNow <= timestamp.Add(FingerprintTimeout))
            {
                // A still-valid fingerprint exists.
                return false;
            }

            // Fingerprint is missing, malformed (no parsable timestamp) or expired: (re)write it.
            connection.SetRangeInHash(fingerprintKey, new Dictionary<string, string>
            {
                { TimestampField, DateTimeOffset.UtcNow.ToString("o", CultureInfo.InvariantCulture) }
            });

            return true;
        }
    }

    /// <summary>
    /// Deletes the fingerprint hash of <paramref name="job"/> under the fingerprint's
    /// distributed lock.
    /// </summary>
    private static void RemoveFingerprint(IStorageConnection connection, Job job)
    {
        using (connection.AcquireDistributedLock(GetFingerprintLockKey(job), LockTimeout))
        using (var transaction = connection.CreateWriteTransaction())
        {
            transaction.RemoveHash(GetFingerprintKey(job));
            transaction.Commit();
        }
    }

    /// <summary>Builds the distributed lock resource name for the job's fingerprint.</summary>
    private static string GetFingerprintLockKey(Job job)
    {
        return String.Format("{0}:lock", GetFingerprintKey(job));
    }

    /// <summary>Builds the storage key of the hash that holds the job's fingerprint.</summary>
    private static string GetFingerprintKey(Job job)
    {
        return String.Format("fingerprint:{0}", GetFingerprint(job));
    }

    /// <summary>
    /// Computes a Base64-encoded SHA-256 hash over the job's type name, method name and
    /// arguments joined with '.'.
    /// </summary>
    /// <returns>
    /// The fingerprint, or an empty string when the job, its type or its method is <c>null</c>.
    /// </returns>
    private static string GetFingerprint(Job job)
    {
        if (job?.Type == null || job.Method == null)
        {
            return string.Empty;
        }

        // NOTE(review): a reader comment on the gist reports Job.Arguments as obsolete in newer
        // Hangfire versions in favour of Job.Args; switching would change the fingerprints
        // produced, so it is left as-is here.
        var parameters = job.Arguments != null
            ? string.Join(".", job.Arguments)
            : string.Empty;

        var payload = $"{job.Type.FullName}.{job.Method.Name}.{parameters}";

        // HashAlgorithm is IDisposable; release it deterministically instead of leaking it.
        using (var sha256 = SHA256.Create())
        {
            var hash = sha256.ComputeHash(Encoding.UTF8.GetBytes(payload));
            return Convert.ToBase64String(hash);
        }
    }

    /// <summary>No action is required after a job has been created.</summary>
    void IClientFilter.OnCreated(CreatedContext filterContext)
    {
    }

    /// <summary>No action is required before a job is performed.</summary>
    void IServerFilter.OnPerforming(PerformingContext filterContext)
    {
    }
}
/// <summary>
/// Server filter that holds a distributed lock while a job is performed. The lock resource is
/// derived from the job's type, method and arguments, so only jobs that share all three
/// contend for the same lock.
/// </summary>
public class DisableConcurrentExecutionWithParametersAttribute : JobFilterAttribute, IServerFilter
{
    /// <summary>Key under which the acquired lock is kept in the filter context items.</summary>
    private const string DistributedLockItemKey = "DistributedLock";

    private readonly int _timeoutInSeconds;

    /// <summary>
    /// Creates the filter.
    /// </summary>
    /// <param name="timeoutInSeconds">
    /// Timeout, in seconds, passed to the storage when acquiring the distributed lock.
    /// Must not be negative.
    /// </param>
    /// <exception cref="ArgumentException">
    /// <paramref name="timeoutInSeconds"/> is negative.
    /// </exception>
    public DisableConcurrentExecutionWithParametersAttribute(int timeoutInSeconds)
    {
        if (timeoutInSeconds < 0)
        {
            // Zero is accepted by the check, so the message says "negative" rather than
            // claiming the value must be greater than zero.
            throw new ArgumentException("Timeout argument value should not be negative.", nameof(timeoutInSeconds));
        }

        _timeoutInSeconds = timeoutInSeconds;
    }

    /// <summary>
    /// Acquires the distributed lock for the job and stores it in the context items so that
    /// <see cref="OnPerformed"/> can release it.
    /// </summary>
    /// <param name="filterContext">Context of the job that is about to be performed.</param>
    public void OnPerforming(PerformingContext filterContext)
    {
        var resource = GetResource(filterContext.BackgroundJob.Job);
        var timeout = TimeSpan.FromSeconds(_timeoutInSeconds);

        filterContext.Items[DistributedLockItemKey] =
            filterContext.Connection.AcquireDistributedLock(resource, timeout);
    }

    /// <summary>
    /// Releases the distributed lock acquired in <see cref="OnPerforming"/>.
    /// </summary>
    /// <param name="filterContext">Context of the job that has been performed.</param>
    /// <exception cref="InvalidOperationException">
    /// No lock was stored in the context items.
    /// </exception>
    public void OnPerformed(PerformedContext filterContext)
    {
        object distributedLock;
        if (!filterContext.Items.TryGetValue(DistributedLockItemKey, out distributedLock))
        {
            throw new InvalidOperationException("Can not release a distributed lock: it was not acquired.");
        }

        ((IDisposable)distributedLock).Dispose();
    }

    /// <summary>
    /// Computes a Base64-encoded SHA-256 hash over the job's type name, method name and
    /// arguments joined with '.'.
    /// </summary>
    /// <returns>
    /// The fingerprint, or an empty string when the job, its type or its method is <c>null</c>.
    /// </returns>
    private static string GetFingerprint(Job job)
    {
        if (job?.Type == null || job.Method == null)
        {
            return string.Empty;
        }

        // NOTE(review): a reader comment on the gist reports Job.Arguments as obsolete in newer
        // Hangfire versions in favour of Job.Args; switching would change the lock resource
        // names produced, so it is left as-is here.
        var parameters = job.Arguments != null
            ? string.Join(".", job.Arguments)
            : string.Empty;

        var payload = $"{job.Type.FullName}.{job.Method.Name}.{parameters}";

        // HashAlgorithm is IDisposable; release it deterministically instead of leaking it.
        using (var sha256 = SHA256.Create())
        {
            var hash = sha256.ComputeHash(Encoding.UTF8.GetBytes(payload));
            return Convert.ToBase64String(hash);
        }
    }

    /// <summary>Returns the distributed lock resource name for the job.</summary>
    private static string GetResource(Job job)
    {
        return GetFingerprint(job);
    }
}
/// <summary>
/// Sample job used to exercise <see cref="DisableConcurrentExecutionWithParametersAttribute"/>.
/// </summary>
public class TestProcess
{
    /// <summary>
    /// Prints <paramref name="id"/> and then blocks for ten seconds to simulate work.
    /// </summary>
    /// <param name="id">Value written to the console; also the job's only argument.</param>
    // The attribute applied here must be the filter declared in this file; the previously
    // referenced DisableConcurrentExecution2Attribute is not defined in it.
    // The attribute argument is a lock timeout in seconds, whereas Thread.Sleep takes milliseconds.
    [DisableConcurrentExecutionWithParameters(10000)]
    public void Go(int id)
    {
        Console.WriteLine(id);
        Thread.Sleep(10000);
    }
}
/// <summary>
/// Console host: configures SQL Server storage, starts a background job server and enqueues
/// three identical <c>TestProcess.Go</c> jobs for each of the ids 1, 2 and 3.
/// </summary>
class Program
{
    static void Main(string[] args)
    {
        GlobalConfiguration.Configuration.UseSqlServerStorage("localconnect");

        using (var jobServer = new BackgroundJobServer())
        {
            // Same nine jobs, in the same order: 1,1,1,2,2,2,3,3,3.
            foreach (var id in new[] { 1, 2, 3 })
            {
                for (var copy = 0; copy < 3; copy++)
                {
                    BackgroundJob.Enqueue<TestProcess>(x => x.Go(id));
                }
            }

            Console.WriteLine("Hangfire Server started. Press any key to exit...");
            Console.ReadKey();
        }
    }
}
}
@aethercowboy
Copy link

Since `Job.Arguments` is now obsolete, consider changing that part to the following:

if (job?.Args != null) {
    parameters = string.Join(".", job.Args.Select(JobHelper.ToJson));
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment